【安全工具】dark_link_detector暗链检测工具

admin 2025年3月24日23:15:57评论27 views字数 11001阅读36分40秒阅读模式

一、工具简介

dark_link_detector.py 一款专门用于检测网页中暗链的 Python 异步爬虫工具。在如今的网络环境中,暗链作为一种隐蔽且潜在危害较大的网络元素,常被恶意利用来引导用户访问恶意网站、传播不良信息或实施其他有害行为。本工具旨在通过自动化的方式,深入挖掘网站页面及其相关资源,准确识别并报告可能存在的暗链情况,为网站的安全性和合规性提供有力保障。该工具通过递归爬取指定网站的页面,深入分析页面内容和外部资源(如 JavaScript 文件),并依据预设的关键词规则来检测是否存在暗链。

二、功能特性

  1. URL 处理:为确保在处理不同格式和来源的 URL 时保持一致性和准确性,工具提供了 normalize_url 方法,能够去除 URL 中不必要的参数、锚点等信息,将其转换为标准格式,方便后续的爬取和分析操作。
  2. 规则加载:从指定文件加载关键词规则,用于暗链检测。
  3. 异步爬取:采用 asyncio 和 aiohttp 库实现异步爬取机制,显著提高了爬取效率。相较于传统的同步爬取方式,异步爬取可以在等待网络请求响应的同时处理其他任务,大大缩短了整体的爬取时间,尤其适用于大规模网站的检测场景。
  4. 内容分析:analyze_content 方法负责对页面内容和 JavaScript 文件进行细致分析。它会遍历文本内容,查找与预设规则匹配的关键词,并记录匹配规则和相关上下文信息。对于外部资源(如 JavaScript 文件),工具还提供了专门的处理方法,确保能够全面检测其中隐藏的暗链。
  5. 重定向跟踪:部分暗链可能会通过 URL 重定向的方式来隐藏真实目标,为了应对这种情况,工具实现了 track_redirects 方法,能够跟踪 URL 的重定向过程,获取最终的跳转目标,从而准确判断是否存在暗链。
  6. 报告生成:将检测结果输出到指定文件,方便用户查看。

三、安装依赖

在运行代码之前,需要安装所需的 Python 库。可以使用以下命令进行安装:

pip install aiohttp beautifulsoup4 tldextract chardet

四、使用方法

准备规则文件

创建一个文本文件rules.txt,每行包含一个关键词规则。例如:

【安全工具】dark_link_detector暗链检测工具

将该文件保存为一个已知的路径,后续在运行代码时需要指定该文件的路径。

五、运行代码

命令行参数说明

  • --base_url
    :必需参数,指定要爬取的基础 URL。
  • --rules_file
    :必需参数,指定包含关键词规则的文件路径。
  • --output_file
    :可选参数,指定输出报告的文件路径,默认为 report.txt
  • --max_depth
    :可选参数,指定最大爬取深度,默认为 5。
  • --max_pages
    :可选参数,指定最大爬取页面数,默认为 2000。
  • --timeout
    :可选参数,指定请求超时时间,默认为 15 秒。

可以通过命令行参数来指定基础 URL、规则文件、输出文件、最大深度、最大页面数和超时时间。以下是一个示例命令:

python dark_link_detector.py --base_url https://example.com --rules_file rules.txt --output_file report.txt --max_depth 5 --max_pages 2000 --timeout 15

或者使用默认输出文件、最大深度、最大页面数和超时时间

python dark_link_detector.py --base_url https://example.com --rules_file rules.txt 

六、查看报告

爬取完成后,检测结果将保存到指定的输出文件中。可以打开该文件查看匹配的规则和上下文信息。

【安全工具】dark_link_detector暗链检测工具
【安全工具】dark_link_detector暗链检测工具

七、应用场景

本工具适用于各类网站的安全检测和监控,可以使用该工具定期对网站进行暗链检测,及时发现并处理潜在的安全隐患,维护网站的正常运行和用户的合法权益。

工具代码:

import asyncio
import aiohttp
import re
import time
import random
import argparse
from urllib.parse import urlparse, urljoin, quote, parse_qs
from bs4 import BeautifulSoup
import tldextract
import chardet
from collections import deque
from typing import List, Dict, Tuple


class AdvancedDarkLinkDetector:
    def __init__(self, base_url: str, rules_file: str, output_file: str = "report.txt",
                 max_depth: int = 5, max_pages: int = 2000, timeout: int = 15):
        self.base_url = self.normalize_url(base_url)
        self.output_file = output_file
        self.rules = self.load_rules(rules_file)
        self.max_depth = max_depth
        self.max_pages = max_pages
        self.timeout = timeout

        # 增强移动端 User-Agent
        self.USER_AGENTS = [
            # 手机端
            'Mozilla/5.0 (Linux; Android 13; SM-S901B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Mobile Safari/537.36',
            'Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Mobile/15E148 Safari/604.1',
            'Mozilla/5.0 (Linux; Android 12; SM-G9910) AppleWebKit/537.36 (KHTML, like Gecko) Quark/5.9.3.232 Mobile Safari/537.36',
            'Mozilla/5.0 (Linux; Android 13; V2148A) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Mobile Safari/537.36',
            'Mozilla/5.0 (Linux; Android 12; HUAWEI MLA-AL10) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.61 Mobile Safari/537.36',

            # 电脑端
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15',
        ]
        self.MOBILE_UA_WEIGHT = 0.8  # 提高移动端请求比例

        self.base_domain = self.get_base_domain(self.base_url)
        self.visited = set()
        self.queue = deque([(self.base_url, 1)])
        self.results = []
        self.crawled_count = 0
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=timeout),
            headers=self.get_random_headers()
        )

    def normalize_url(self, url: str) -> str:
        """标准化 URL 格式"""
        if not url.startswith(('http://', 'https://')):
            return 'http://' + url
        return url.rstrip('/')

    def get_base_domain(self, url: str) -> str:
        """提取主域名"""
        extracted = tldextract.extract(url)
        return f"{extracted.domain}.{extracted.suffix}"

    def load_rules(self, rule_file: str) -> List[str]:
        """加载关键词规则"""
        try:
            with open(rule_file, 'r', encoding='utf-8') as f:
                return [line.strip().lower() for line in f if line.strip()]
        except FileNotFoundError:
            print(f"规则文件 {rule_file} 未找到")
            return []
        except Exception as e:
            print(f"规则加载失败: {str(e)}")
            return []

    def get_random_headers(self) -> Dict[str, str]:
        """生成随机请求头"""
        return {
            'User-Agent': self.get_random_user_agent(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Referer': random.choice(['https://www.baidu.com', self.base_url]),
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive'
        }

    def get_random_user_agent(self):
        return random.choice(self.USER_AGENTS) if random.random() < self.MOBILE_UA_WEIGHT else random.choice(
            self.USER_AGENTS)

    async def track_redirects(self, url: str):
        redirect_chain = []
        current_url = url
        for _ in range(5):  # 增加跳转跟踪深度
            try:
                async with self.session.get(current_url, allow_redirects=False) as response:
                    redirect_chain.append((response.status, current_url))
                    if response.status in [301, 302, 307, 308]:
                        new_url = response.headers.get('Location')
                        if new_url:
                            current_url = urljoin(current_url, new_url)
                        else:
                            break
                    else:
                        break
            except Exception as e:
                print(f"跳转跟踪失败: {e}")
                break
        return redirect_chain

    async def analyze_content(self, url: str) -> List[Tuple[str, str]]:
        """分析内容并返回匹配规则和上下文"""
        try:
            async with self.session.get(url) as response:
                content = await response.read()
                detected_encoding = chardet.detect(content)['encoding']
                if detected_encoding:
                    text = content.decode(detected_encoding, errors='ignore')
                else:
                    text = content.decode('utf-8', errors='ignore')
                lower_text = text.lower()
                matches = []
                for rule in self.rules:
                    pattern = re.compile(rf'\b{re.escape(rule)}\b', re.IGNORECASE)
                    for match in pattern.finditer(lower_text):
                        start = max(0, match.start() - 100)
                        end = min(len(lower_text), match.end() + 100)
                        context = text[start:end].replace('\n', ' ')
                        matches.append((rule, context))
                return matches
        except Exception as e:
            print(f"内容分析失败: {e}")
            return []

    async def process_page(self, url: str, depth: int):
        try:
            redirect_chain = await self.track_redirects(url)
            final_url = redirect_chain[-1][1] if redirect_chain else url

            if final_url != url:
                self.results.append({
                    'type': 'redirect',
                    'source': url,
                    'destination': final_url,
                    'chain': redirect_chain
                })

            if final_url in self.visited:
                return
            self.visited.add(final_url)
            self.crawled_count += 1

            async with self.session.get(final_url) as response:
                content = await response.read()
                detected_encoding = chardet.detect(content)['encoding']
                if detected_encoding:
                    page_text = content.decode(detected_encoding, errors='ignore')
                else:
                    page_text = content.decode('utf-8', errors='ignore')
                content_type = response.headers.get('Content-Type', '')

                if 'html' in content_type:
                    soup = BeautifulSoup(page_text, 'html.parser')

                    # 提取并处理资源
                    for tag in soup.find_all(['a', 'script', 'link']):
                        href = tag.get('href') or tag.get('src')
                        if href:
                            absolute_url = urljoin(final_url, href)
                            if self.get_base_domain(absolute_url) != self.base_domain:
                                await self.process_external_resource(absolute_url, final_url, depth)

                elif 'javascript' in content_type:
                    await self.process_js_file(final_url, depth)

        except Exception as e:
            print(f"页面处理失败: {e}")

    async def process_external_resource(self, url: str, source_page: str, depth: int):
        """处理外部资源"""
        if url in self.visited or depth >= self.max_depth:
            return
        self.visited.add(url)
        self.crawled_count += 1

        matches = await self.analyze_content(url)
        if matches:
            self.results.append({
                'type': 'external_resource',
                'source': source_page,
                'url': url,
                'matches': matches
            })

        content_type = await self.get_content_type(url)
        if 'javascript' in content_type:
            await self.process_js_file(url, depth + 1)

    async def get_content_type(self, url: str):
        try:
            async with self.session.head(url) as response:
                return response.headers.get('Content-Type', '')
        except Exception as e:
            print(f"获取内容类型失败: {e}")
            return ''

    async def process_js_file(self, url: str, depth: int):
        """处理 JS 文件"""
        try:
            async with self.session.get(url) as response:
                content = await response.read()
                detected_encoding = chardet.detect(content)['encoding']
                if detected_encoding:
                    js_content = content.decode(detected_encoding, errors='ignore')
                else:
                    js_content = content.decode('utf-8', errors='ignore')
                # 提取 JS 文件中的 URL
                urls_in_js = self.extract_urls_from_js(js_content, url)
                for new_url in urls_in_js:
                    if self.get_base_domain(new_url) != self.base_domain:
                        await self.process_external_resource(new_url, url, depth + 1)
                        # 对提取出的 URL 进行规则匹配
                        js_url_matches = await self.analyze_content(new_url)
                        if js_url_matches:
                            self.results.append({
                                'type': 'js_file_url',
                                'source': url,
                                'url': new_url,
                                'matches': js_url_matches
                            })

                # 分析 JS 文件内容
                matches = await self.analyze_content(url)
                if matches:
                    self.results.append({
                        'type': 'js_file',
                        'source': url,
                        'url': url,
                        'matches': matches
                    })
        except Exception as e:
            print(f"处理 JS 文件失败: {e}")

    def extract_urls_from_js(self, js_content: str, base_url: str) -> List[str]:
        """从 JS 文件中提取 URL"""
        # 再次优化正则表达式
        url_pattern = re.compile(r'(?:["\'\[\(,]|=)\s*(https?://[^\s,"\'\)\]<>\?]+(?:\?[^\s,"\'\)\]<>]*)?)\s*(?:["\'\]\),;]|$)')
        found_urls = url_pattern.findall(js_content)
        absolute_urls = []
        for url in found_urls:
            absolute_url = urljoin(base_url, url)
            absolute_urls.append(absolute_url)
        return absolute_urls

    async def crawl(self):
        while self.queue and self.crawled_count < self.max_pages:
            current_url, depth = self.queue.popleft()
            print(f"正在扫描 [{depth}/{self.max_depth}] {current_url}")
            await self.process_page(current_url, depth)
            await asyncio.sleep(random.uniform(0.5, 2))

    def generate_report(self):
        with open(self.output_file, 'w', encoding='utf-8') as f:
            f.write("=" * 80 + "\n")
            f.write(f"暗链检测报告 - {self.base_url}\n")
            f.write(f"检测时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"扫描页面: {self.crawled_count}\n")
            f.write("=" * 80 + "\n")

            # 跳转检测统计
            redirects = [r for r in self.results if r['type'] == 'redirect']
            f.write(f"[跳转检测] 发现跳转: {len(redirects)}\n")
            for item in redirects:
                f.write(f"来源: {item['source']}\n")
                f.write(f"目标: {item['destination']}\n")
                f.write("跳转路径:\n")
                for status, url in item['chain']:
                    f.write(f"  {status} → {url}\n")
                f.write("-" * 80 + "\n")

            # 外部资源检测
            external_resources = [r for r in self.results if r['type'] == 'external_resource']
            f.write(f"[外部资源] 命中规则: {len(external_resources)}\n")
            for item in external_resources:
                f.write(f"来源: {item['source']}\n")
                f.write(f"URL: {item['url']}\n")
                for rule, context in item['matches']:
                    f.write(f"匹配规则: {rule}\n")
                    f.write(f"上下文: {context}\n")
                f.write("-" * 80 + "\n")

            # JS 文件检测
            js_files = [r for r in self.results if r['type'] == 'js_file']
            f.write(f"[JS 文件检测] 命中规则: {len(js_files)}\n")
            for item in js_files:
                f.write(f"来源: {item['source']}\n")
                f.write(f"URL: {item['url']}\n")
                for rule, context in item['matches']:
                    f.write(f"匹配规则: {rule}\n")
                    f.write(f"上下文: {context}\n")
                f.write("-" * 80 + "\n")

            # JS 文件中 URL 检测
            js_file_urls = [r for r in self.results if r['type'] == 'js_file_url']
            f.write(f"[JS 文件中 URL 检测] 命中规则: {len(js_file_urls)}\n")
            for item in js_file_urls:
                f.write(f"来源 JS 文件: {item['source']}\n")
                f.write(f"URL: {item['url']}\n")
                for rule, context in item['matches']:
                    f.write(f"匹配规则: {rule}\n")
                    f.write(f"上下文: {context}\n")
                f.write("-" * 80 + "\n")

    async def close_session(self):
        """关闭会话"""
        await self.session.close()


async def main():
    parser = argparse.ArgumentParser(description='高级暗链检测工具')
    parser.add_argument('url', help='目标网站 URL')
    parser.add_argument('-o', '--output', default='report.txt', help='输出文件名')
    parser.add_argument('-r', '--rules', default='rules.txt', help='规则文件路径')
    parser.add_argument('--max-depth', type=int, default=5, help='最大爬取深度')
    parser.add_argument('--max-pages', type=int, default=2000, help='最大爬取页面数')
    parser.add_argument('--timeout', type=int, default=15, help='请求超时时间')

    args = parser.parse_args()

    detector = AdvancedDarkLinkDetector(
        base_url=args.url,
        rules_file=args.rules,
        output_file=args.output,
        max_depth=args.max_depth,
        max_pages=args.max_pages,
        timeout=args.timeout
    )

    print("启动检测引擎...")
    await detector.crawl()
    detector.generate_report()
    print(f"检测完成,结果保存至: {args.output}")
    await detector.close_session()


if __name__ == "__main__":
    asyncio.run(main())

‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍

 

原文始发于微信公众号(魔都安全札记):【安全工具】dark_link_detector暗链检测工具

免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2025年3月24日23:15:57
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   【安全工具】dark_link_detector暗链检测工具https://cn-sec.com/archives/3874593.html
                  免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉.

发表评论

匿名网友 填写信息