一、工具简介
dark_link_detector.py
一款专门用于检测网页中暗链的 Python 异步爬虫工具。在如今的网络环境中,暗链作为一种隐蔽且潜在危害较大的网络元素,常被恶意利用来引导用户访问恶意网站、传播不良信息或实施其他有害行为。本工具旨在通过自动化的方式,深入挖掘网站页面及其相关资源,准确识别并报告可能存在的暗链情况,为网站的安全性和合规性提供有力保障。该工具通过递归爬取指定网站的页面,深入分析页面内容和外部资源(如 JavaScript 文件),并依据预设的关键词规则来检测是否存在暗链。
二、功能特性
- URL 处理:为确保在处理不同格式和来源的 URL 时保持一致性和准确性,工具提供了 normalize_url 方法,能够去除 URL 中不必要的参数、锚点等信息,将其转换为标准格式,方便后续的爬取和分析操作。
- 规则加载:从指定文件加载关键词规则,用于暗链检测。
- 异步爬取:采用 asyncio 和 aiohttp 库实现异步爬取机制,显著提高了爬取效率。相较于传统的同步爬取方式,异步爬取可以在等待网络请求响应的同时处理其他任务,大大缩短了整体的爬取时间,尤其适用于大规模网站的检测场景。
- 内容分析:analyze_content 方法负责对页面内容和 JavaScript 文件进行细致分析。它会遍历文本内容,查找与预设规则匹配的关键词,并记录匹配规则和相关上下文信息。对于外部资源(如 JavaScript 文件),工具还提供了专门的处理方法,确保能够全面检测其中隐藏的暗链。
- 重定向跟踪:部分暗链可能会通过 URL 重定向的方式来隐藏真实目标,为了应对这种情况,工具实现了 track_redirects 方法,能够跟踪 URL 的重定向过程,获取最终的跳转目标,从而准确判断是否存在暗链。
- 报告生成:将检测结果输出到指定文件,方便用户查看。
三、安装依赖
在运行代码之前,需要安装所需的 Python 库。可以使用以下命令进行安装:
pip install aiohttp beautifulsoup4 tldextract chardet
四、使用方法
准备规则文件
创建一个文本文件rules.txt,每行包含一个关键词规则。例如:
将该文件保存为一个已知的路径,后续在运行代码时需要指定该文件的路径。
五、运行代码
命令行参数说明
--base_url
:必需参数,指定要爬取的基础 URL。 --rules_file
:必需参数,指定包含关键词规则的文件路径。 --output_file
:可选参数,指定输出报告的文件路径,默认为 report.txt
。--max_depth
:可选参数,指定最大爬取深度,默认为 5。 --max_pages
:可选参数,指定最大爬取页面数,默认为 2000。 --timeout
:可选参数,指定请求超时时间,默认为 15 秒。
可以通过命令行参数来指定基础 URL、规则文件、输出文件、最大深度、最大页面数和超时时间。以下是一个示例命令:
python dark_link_detector.py --base_url https://example.com --rules_file rules.txt --output_file report.txt --max_depth 5 --max_pages 2000 --timeout 15
或者使用默认输出文件、最大深度、最大页面数和超时时间
python dark_link_detector.py --base_url https://example.com --rules_file rules.txt
六、查看报告
爬取完成后,检测结果将保存到指定的输出文件中。可以打开该文件查看匹配的规则和上下文信息。
七、应用场景
本工具适用于各类网站的安全检测和监控,可以使用该工具定期对网站进行暗链检测,及时发现并处理潜在的安全隐患,维护网站的正常运行和用户的合法权益。
工具代码:
import asyncio import aiohttp import re import time import random import argparse from urllib.parse import urlparse, urljoin, quote, parse_qs from bs4 import BeautifulSoup import tldextract import chardet from collections import deque from typing import List, Dict, Tuple class AdvancedDarkLinkDetector: def __init__(self, base_url: str, rules_file: str, output_file: str = "report.txt", max_depth: int = 5, max_pages: int = 2000, timeout: int = 15): self.base_url = self.normalize_url(base_url) self.output_file = output_file self.rules = self.load_rules(rules_file) self.max_depth = max_depth self.max_pages = max_pages self.timeout = timeout # 增强移动端 User-Agent self.USER_AGENTS = [ # 手机端 'Mozilla/5.0 (Linux; Android 13; SM-S901B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Mobile Safari/537.36', 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Mobile/15E148 Safari/604.1', 'Mozilla/5.0 (Linux; Android 12; SM-G9910) AppleWebKit/537.36 (KHTML, like Gecko) Quark/5.9.3.232 Mobile Safari/537.36', 'Mozilla/5.0 (Linux; Android 13; V2148A) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Mobile Safari/537.36', 'Mozilla/5.0 (Linux; Android 12; HUAWEI MLA-AL10) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.61 Mobile Safari/537.36', # 电脑端 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15', ] self.MOBILE_UA_WEIGHT = 0.8 # 提高移动端请求比例 self.base_domain = self.get_base_domain(self.base_url) self.visited = set() self.queue = deque([(self.base_url, 1)]) self.results = [] self.crawled_count = 0 self.session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=timeout), headers=self.get_random_headers() ) def normalize_url(self, url: str) -> str: """标准化 URL 格式""" if not url.startswith(('http://', 'https://')): return 'http://' + url return url.rstrip('/') def get_base_domain(self, url: str) -> str: """提取主域名""" extracted = tldextract.extract(url) return f"{extracted.domain}.{extracted.suffix}" def load_rules(self, rule_file: str) -> List[str]: """加载关键词规则""" try: with open(rule_file, 'r', encoding='utf-8') as f: return [line.strip().lower() for line in f if line.strip()] except FileNotFoundError: print(f"规则文件 {rule_file} 未找到") return [] except Exception as e: print(f"规则加载失败: {str(e)}") return [] def get_random_headers(self) -> Dict[str, str]: """生成随机请求头""" return { 'User-Agent': self.get_random_user_agent(), 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Referer': random.choice(['https://www.baidu.com', self.base_url]), 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' } def get_random_user_agent(self): return random.choice(self.USER_AGENTS) if random.random() < self.MOBILE_UA_WEIGHT else random.choice( self.USER_AGENTS) async def track_redirects(self, url: str): redirect_chain = [] current_url = url for _ in range(5): # 增加跳转跟踪深度 try: async with self.session.get(current_url, allow_redirects=False) as response: redirect_chain.append((response.status, current_url)) if response.status in [301, 302, 307, 308]: new_url = response.headers.get('Location') if new_url: current_url = urljoin(current_url, new_url) else: break else: break except Exception as e: print(f"跳转跟踪失败: {e}") break return redirect_chain async def analyze_content(self, url: str) -> List[Tuple[str, str]]: """分析内容并返回匹配规则和上下文""" try: async with self.session.get(url) as response: content = await response.read() detected_encoding = chardet.detect(content)['encoding'] if detected_encoding: text = content.decode(detected_encoding, errors='ignore') else: text = content.decode('utf-8', errors='ignore') lower_text = text.lower() matches = [] for rule in self.rules: pattern = re.compile(rf'\b{re.escape(rule)}\b', re.IGNORECASE) for match in pattern.finditer(lower_text): start = max(0, match.start() - 100) end = min(len(lower_text), match.end() + 100) context = text[start:end].replace('\n', ' ') matches.append((rule, context)) return matches except Exception as e: print(f"内容分析失败: {e}") return [] async def process_page(self, url: str, depth: int): try: redirect_chain = await self.track_redirects(url) final_url = redirect_chain[-1][1] if redirect_chain else url if final_url != url: self.results.append({ 'type': 'redirect', 'source': url, 'destination': final_url, 'chain': redirect_chain }) if final_url in self.visited: return self.visited.add(final_url) self.crawled_count += 1 async with self.session.get(final_url) as response: content = await response.read() detected_encoding = chardet.detect(content)['encoding'] if detected_encoding: page_text = content.decode(detected_encoding, errors='ignore') else: page_text = content.decode('utf-8', errors='ignore') content_type = response.headers.get('Content-Type', '') if 'html' in content_type: soup = BeautifulSoup(page_text, 'html.parser') # 提取并处理资源 for tag in soup.find_all(['a', 'script', 'link']): href = tag.get('href') or tag.get('src') if href: absolute_url = urljoin(final_url, href) if self.get_base_domain(absolute_url) != self.base_domain: await self.process_external_resource(absolute_url, final_url, depth) elif 'javascript' in content_type: await self.process_js_file(final_url, depth) except Exception as e: print(f"页面处理失败: {e}") async def process_external_resource(self, url: str, source_page: str, depth: int): """处理外部资源""" if url in self.visited or depth >= self.max_depth: return self.visited.add(url) self.crawled_count += 1 matches = await self.analyze_content(url) if matches: self.results.append({ 'type': 'external_resource', 'source': source_page, 'url': url, 'matches': matches }) content_type = await self.get_content_type(url) if 'javascript' in content_type: await self.process_js_file(url, depth + 1) async def get_content_type(self, url: str): try: async with self.session.head(url) as response: return response.headers.get('Content-Type', '') except Exception as e: print(f"获取内容类型失败: {e}") return '' async def process_js_file(self, url: str, depth: int): """处理 JS 文件""" try: async with self.session.get(url) as response: content = await response.read() detected_encoding = chardet.detect(content)['encoding'] if detected_encoding: js_content = content.decode(detected_encoding, errors='ignore') else: js_content = content.decode('utf-8', errors='ignore') # 提取 JS 文件中的 URL urls_in_js = self.extract_urls_from_js(js_content, url) for new_url in urls_in_js: if self.get_base_domain(new_url) != self.base_domain: await self.process_external_resource(new_url, url, depth + 1) # 对提取出的 URL 进行规则匹配 js_url_matches = await self.analyze_content(new_url) if js_url_matches: self.results.append({ 'type': 'js_file_url', 'source': url, 'url': new_url, 'matches': js_url_matches }) # 分析 JS 文件内容 matches = await self.analyze_content(url) if matches: self.results.append({ 'type': 'js_file', 'source': url, 'url': url, 'matches': matches }) except Exception as e: print(f"处理 JS 文件失败: {e}") def extract_urls_from_js(self, js_content: str, base_url: str) -> List[str]: """从 JS 文件中提取 URL""" # 再次优化正则表达式 url_pattern = re.compile(r'(?:["\'\[\(,]|=)\s*(https?://[^\s,"\'\)\]<>\?]+(?:\?[^\s,"\'\)\]<>]*)?)\s*(?:["\'\]\),;]|$)') found_urls = url_pattern.findall(js_content) absolute_urls = [] for url in found_urls: absolute_url = urljoin(base_url, url) absolute_urls.append(absolute_url) return absolute_urls async def crawl(self): while self.queue and self.crawled_count < self.max_pages: current_url, depth = self.queue.popleft() print(f"正在扫描 [{depth}/{self.max_depth}] {current_url}") await self.process_page(current_url, depth) await asyncio.sleep(random.uniform(0.5, 2)) def generate_report(self): with open(self.output_file, 'w', encoding='utf-8') as f: f.write("=" * 80 + "\n") f.write(f"暗链检测报告 - {self.base_url}\n") f.write(f"检测时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"扫描页面: {self.crawled_count}\n") f.write("=" * 80 + "\n") # 跳转检测统计 redirects = [r for r in self.results if r['type'] == 'redirect'] f.write(f"[跳转检测] 发现跳转: {len(redirects)}\n") for item in redirects: f.write(f"来源: {item['source']}\n") f.write(f"目标: {item['destination']}\n") f.write("跳转路径:\n") for status, url in item['chain']: f.write(f" {status} → {url}\n") f.write("-" * 80 + "\n") # 外部资源检测 external_resources = [r for r in self.results if r['type'] == 'external_resource'] f.write(f"[外部资源] 命中规则: {len(external_resources)}\n") for item in external_resources: f.write(f"来源: {item['source']}\n") f.write(f"URL: {item['url']}\n") for rule, context in item['matches']: f.write(f"匹配规则: {rule}\n") f.write(f"上下文: {context}\n") f.write("-" * 80 + "\n") # JS 文件检测 js_files = [r for r in self.results if r['type'] == 'js_file'] f.write(f"[JS 文件检测] 命中规则: {len(js_files)}\n") for item in js_files: f.write(f"来源: {item['source']}\n") f.write(f"URL: {item['url']}\n") for rule, context in item['matches']: f.write(f"匹配规则: {rule}\n") f.write(f"上下文: {context}\n") f.write("-" * 80 + "\n") # JS 文件中 URL 检测 js_file_urls = [r for r in self.results if r['type'] == 'js_file_url'] f.write(f"[JS 文件中 URL 检测] 命中规则: {len(js_file_urls)}\n") for item in js_file_urls: f.write(f"来源 JS 文件: {item['source']}\n") f.write(f"URL: {item['url']}\n") for rule, context in item['matches']: f.write(f"匹配规则: {rule}\n") f.write(f"上下文: {context}\n") f.write("-" * 80 + "\n") async def close_session(self): """关闭会话""" await self.session.close() async def main(): parser = argparse.ArgumentParser(description='高级暗链检测工具') parser.add_argument('url', help='目标网站 URL') parser.add_argument('-o', '--output', default='report.txt', help='输出文件名') parser.add_argument('-r', '--rules', default='rules.txt', help='规则文件路径') parser.add_argument('--max-depth', type=int, default=5, help='最大爬取深度') parser.add_argument('--max-pages', type=int, default=2000, help='最大爬取页面数') parser.add_argument('--timeout', type=int, default=15, help='请求超时时间') args = parser.parse_args() detector = AdvancedDarkLinkDetector( base_url=args.url, rules_file=args.rules, output_file=args.output, max_depth=args.max_depth, max_pages=args.max_pages, timeout=args.timeout ) print("启动检测引擎...") await detector.crawl() detector.generate_report() print(f"检测完成,结果保存至: {args.output}") await detector.close_session() if __name__ == "__main__": asyncio.run(main())
原文始发于微信公众号(魔都安全札记):【安全工具】dark_link_detector暗链检测工具
免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论