子域名扫描工具
利用ping命令扫描
import argparse
import platform #引用判断是不是window系统还是linux系统库
import subprocess #引用系统操作命令的库
from concurrent.futures import ThreadPoolExecutor #引用多线程的内容的库
ASCII_ART = r'''
,---,. ,-. ____
,' .' | ,--/ /| ,---, ,' , `. ,--,
,---.' | ,--. :/ | ,---.'| ,---. ,-+-,.' _ | ,--.'| ,---,
| | .' : : ' / | | : ' ,' ,-+-. ; , || | |, ,-+-. / |
: : : ,--.--. ,---. | ' / | | | / / | ,--.'|' | || ,--.--. `--'_ ,--.'|' |
: | |-, / / ' | : ,--.__| |. ; ,. :| | ,', | |,/ ,' ,'| | | ,"' |
| : ;/|.--. .-. | / / ' | | / ,' |' | |: :| | / | |--'.--. .-. | ' | | | | / | |
| | .' __/: . .. ' / ' : |. . ' / |' | .; :| : | | , __/: . . | | : | | | | |
' : ' ," .--.; |' ; :__ | | ' ' ; |: || : || : | |/ ," .--.; | ' : |__ | | | |/
| | | / / ,. |' | '.'|' : |--' | | '/ ' / | | |`-' / / ,. | | | '.'|| | |--'
| : ; : .' : :; |,' | : :| `----' | ;/ ; : .' ; : ;| |/
| | ,' | , .-./ / '--' / '---' | , .-./| , / '---'
`----' `--`---' `----' `----' `--`---' ---`-'
--Version 1.0 --Author 嵩艺
'''
def is_subdomain_valid(subdomain, domain):
"""判断子域名是否有效"""
full_domain = f"{subdomain}.{domain}"
system = platform.system()
param = ['-n', '1', '-w', '1000'] if system == 'Windows' else ['-c', '1', '-W', '1']
try:
result = subprocess.run(
['ping'] + param + [full_domain],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
if system == 'Windows' and 'TTL=' in result.stdout: #windows 系统ping返回正确的结果 是会有 TTL=
return True
elif system != 'Windows' and 'bytes from' in result.stdout: # linux 系统ping返回正确的结果是会有 bytes from
return True
return False
except Exception:
return False
def main():
"""主函数:处理命令行参数并执行扫描"""
print(ASCII_ART) # 打印嵌入的艺术字标题
parser = argparse.ArgumentParser(
description='子域名扫描工具 - 通过ping命令检测有效子域名',
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument('-d', '--domain', required=True, help='主域名 (例如: example.com)')
parser.add_argument('-list', '--wordlist', required=True, help='子域名字典文件路径 (相对或绝对路径)')
parser.add_argument('-o', '--output', help='输出结果文件 (例如: results.txt)')
parser.add_argument('-t', '--threads', type=int, default=50, help='并发线程数 (默认: 50)')
args = parser.parse_args()
try:
# 读取子域名字典
with open(args.wordlist, 'r', encoding='utf-8') as f:
subdomains = [line.strip() for line in f if line.strip()]
#新列表 = [表达式 for 变量 in 可迭代对象 if 条件]
#1.表达式:对每个元素进行处理的逻辑。2.for 变量 in 可迭代对象:遍历可迭代对象中的每个元素。3.if 条件(可选):过滤不符合条件的元素
print(f"开始扫描 {args.domain} 的子域名,使用 {args.threads} 个线程...")
valid_subdomains = []
# 并发执行扫描
with ThreadPoolExecutor(max_workers=args.threads) as executor:
results = list(executor.map(
lambda sub: (sub, is_subdomain_valid(sub, args.domain)), #这里有点绕就是 匿名函数使用方法 加上 execuror map函数的使用方法结合了
subdomains
))
# 收集有效子域名
for subdomain, is_valid in results:
if is_valid:
full_domain = f"{subdomain}.{args.domain}"
valid_subdomains.append(full_domain)
print(f"[+] 发现有效子域名: {full_domain}")
# 输出结果到文件
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
f.write('n'.join(valid_subdomains))
print(f"[✓] 结果已保存到 {args.output},共发现 {len(valid_subdomains)} 个有效子域名")
except FileNotFoundError as e:
print(f"错误:文件不存在 - {e.filename}")
except Exception as e:
print(f"程序异常: {str(e)}")
if __name__ == "__main__":
main()
效果
代码分析
with ThreadPoolExecutor(max_workers=args.threads) as executor:
results = list(executor.map(
lambda sub: (sub, is_subdomain_valid(sub, args.domain)), #这里有点绕就是 匿名函数使用方法 加上 execuror map函数的使用方法结合了
subdomains
))
第一个板块
map逻辑就是
subdomains 取到子域名字典中一行内容,然后给到sub 这个效果是 map函数实现,传参
第二个板块
lambda sub: (sub, is_subdomain_valid(sub, args.domain))逻辑是
匿名函数lambda sub () sub 是匿名函数的参数
第三个板块
(sub, is_subdomain_valid(sub, args.domain))
逻辑是这个括号lambda (x,y)返回的结果是元组(x,y),(sub,经过域名存活判断函数返回ture或者是flas)
===》 [('www', True), ('admin', False), ('mail', True)]大概就是这样
lambda sub: (sub, fun(sub+sub)) 如果 sub=3 则这个匿名函数返回的结果是 (3,6) 个元组
for subdomain, is_valid in results:
if is_valid:
full_domain = f"{subdomain}.{args.domain}"
valid_subdomains.append(full_domain)
print(f"[+] 发现有效子域名: {full_domain}")
这个results 返回的结果是 这种 [('www', True), ('admin', False), ('mail', True)]
意思是将 元组第一个元素给到 subdomain 第二个元素给到 is_valid 对吧
装逼
这种太慢了,受不了。虽然写的有线程池但是还是太慢了,所以得换个思路。
利用DNS解析扫描
如果对方服务器禁止ping我们就需要这种
基于 Socket 的 DNS 解析 的子域名扫描,其核心原理是直接与 DNS 服务器通信获取解析记录,不经过目标 Web 服务器,因此 WAF(Web 应用防火墙)无法拦截这类请求。
-
WAF 是 七层(应用层)设备,仅监控 HTTP/HTTPS 协议流量(如请求头、URL、Payload 等),用于防御 SQL 注入、XSS 等 Web 层攻击。 -
DNS 解析属于三层(网络层)和四层(传输层)操作,WAF 无法解析和拦截 DNS 查询数据包。
原理
就是向NDS服务器发送一个域名的请求,如果存在这个域名,NDS服务器会返回给主机一个ip地址,通过这个逻辑来判断是否存在。
DNS服务器就是用来将域名解析成IP地址的,方便我们电脑去访问一个域名的时候,走正确的ip地址。
初始代码
import socket
def socket_domain():
# 读取子域名列表
with open('admindir-1.2w.txt', 'r', encoding='utf-8') as f:
subdomains = [line.strip() for line in f if line.strip()]
# 遍历子域名列表进行解析
for subdomain in subdomains:
try:
# 拼接完整域名并进行DNS解析
full_domain = f'{subdomain}.xxxxx.edu.cn'
ip = socket.gethostbyname(full_domain)
print(f'{full_domain},{ip}')
except socket.gaierror as e:
# 处理DNS解析错误
pass
except Exception as e:
# 处理其他异常
pass
if __name__ == '__main__':
socket_domain()
优化
import socket
import argparse
import os
import concurrent.futures
import time
import sys
import logging
import signal
from threading import Lock
ASCII_ART = r'''
,---,. ,-. ____
,' .' | ,--/ /| ,---, ,' , `. ,--,
,---.' | ,--. :/ | ,---.'| ,---. ,-+-,.' _ | ,--.'| ,---,
| | .' : : ' / | | : ' ,' ,-+-. ; , || | |, ,-+-. / |
: : : ,--.--. ,---. | ' / | | | / / | ,--.'|' | || ,--.--. `--'_ ,--.'|' |
: | |-, / / ' | : ,--.__| |. ; ,. :| | ,', | |,/ ,' ,'| | | ,"' |
| : ;/|.--. .-. | / / ' | | / ,' |' | |: :| | / | |--'.--. .-. | ' | | | | / | |
| | .' __/: . .. ' / ' : |. . ' / |' | .; :| : | | , __/: . . | | : | | | | |
' : ' ," .--.; |' ; :__ | | ' ' ; |: || : || : | |/ ," .--.; | ' : |__ | | | |/
| | | / / ,. |' | '.'|' : |--' | | '/ ' / | | |`-' / / ,. | | | '.'|| | |--'
| : ; : .' : :; |,' | : :| `----' | ;/ ; : .' ; : ;| |/
| | ,' | , .-./ / '--' / '---' | , .-./| , / '---'
`----' `--`---' `----' `----' `--`---' ---`-'
--Version 2.0 --Author 嵩艺
'''
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('subdomain_scanner')
# 全局变量用于进度跟踪
processed_count = 0 #全局变量,记录处理子域名的数量
success_count = 0 #全局变量, 记录成功解析的子域名的数量
start_time = time.time() #全局变量,记录程序开始的时间戳
print_lock = Lock() #创建一个索对象 用于控制打印操作i的线程安全
def resolve_subdomain(subdomain, base_domain, timeout=3):
"""解析单个子域名并返回结果"""
try:
full_domain = f"{subdomain}.{base_domain}"
# 设置DNS解析超时
socket.setdefaulttimeout(timeout) #时间限制
ip = socket.gethostbyname(full_domain) #获取域名的ip地址
return full_domain, ip
except (socket.gaierror, socket.timeout):
# DNS解析失败或超时
return None, None
except Exception as e:
# 记录其他异常
logger.error(f"解析 {subdomain}.{base_domain} 时出错: {str(e)}")
return None, None
def update_progress(total):
"""更新进度显示"""
global processed_count, success_count
with print_lock:
elapsed = time.time() - start_time
processed_count += 1
progress = (processed_count / total) * 100
# 每秒更新一次进度
if processed_count % max(1, total // 100) == 0 or processed_count == total:
sys.stdout.write(
f"r进度: {processed_count}/{total} ({progress:.1f}%) | "
f"成功: {success_count} | 耗时: {elapsed:.1f}s | "
f"速度: {processed_count / max(1, elapsed):.1f} req/s"
)
sys.stdout.flush()
if processed_count == total:
print() # 完成时换行
def process_subdomains(subdomains, base_domain, output_file, max_workers=50, timeout=3):
"""使用线程池处理子域名列表"""
global success_count, processed_count
total = len(subdomains)
# 重置计数器
processed_count = 0
success_count = 0
# 创建输出目录(如果需要)
if output_file:
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
# 使用ThreadPoolExecutor处理子域名
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 准备写入文件
output_handle = open(output_file, 'w', encoding='utf-8') if output_file else None
try:
# 批量提交任务
futures = {executor.submit(resolve_subdomain, sub, base_domain, timeout): sub for sub in subdomains}
# 处理完成的任务
for future in concurrent.futures.as_completed(futures): #按照顺序进行,谁先完成谁先排序
subdomain = futures[future]
full_domain, ip = future.result()
# 更新进度
update_progress(total)
if ip:
success_count += 1
result_line = f"{full_domain},{ip}"
# 写入文件
if output_handle:
output_handle.write(result_line + 'n')
output_handle.flush() # 确保及时写入
# 打印结果
with print_lock:
logger.info(result_line)
except Exception as e:
logger.error(f"处理过程中发生错误: {str(e)}")
finally:
if output_handle:
output_handle.close()
return success_count
def signal_handler(sig, frame):
"""处理Ctrl+C信号"""
print("nn检测到中断信号,正在退出...")
logger.warning("用户中断扫描")
sys.exit(0)
def main():
"""主函数"""
global start_time
print(ASCII_ART) # 打印嵌入的艺术字标题
# 注册信号处理
signal.signal(signal.SIGINT, signal_handler)
# 解析命令行参数
parser = argparse.ArgumentParser(
description="高级子域名扫描工具 - 基于多线程DNS解析",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
'-list',
required=True,
help="子域名字典文件路径(相对或绝对路径)"
)
parser.add_argument(
'-o',
default=None,
help="输出结果文件路径(可选),若提供则结果写入文件"
)
parser.add_argument(
'-t', # 改为 -t 表示线程数
'--threads',
type=int,
default=100,
help="线程数"
)
parser.add_argument(
'-d', # 改为 -d 表示域名
'--domain',
default="zzuit.edu.cn",
help="基础域名"
)
parser.add_argument(
'--timeout',
type=float,
default=2.0,
help="DNS解析超时时间(秒)"
)
parser.add_argument(
'--verbose',
action='store_true',
help="显示详细输出"
)
args = parser.parse_args()
# 设置详细日志
if args.verbose:
logger.setLevel(logging.DEBUG) #当用户在命令端输入 --verbose就能看到日志信息
# 检查文件是否存在
if not os.path.exists(args.list):
logger.error(f"字典文件 '{args.list}' 不存在")
return
# 读取子域名列表
try:
with open(args.list, 'r', encoding='utf-8') as f:
subdomains = [line.strip() for line in f if line.strip()]
logger.info(f"成功读取 {len(subdomains)} 个子域名")
except Exception as e:
logger.error(f"读取字典文件时出错: {str(e)}")
return
# 显示配置信息
logger.info(f"目标域名: {args.domain}")
logger.info(f"线程数: {args.threads}")
logger.info(f"超时设置: {args.timeout}s")
if args.o:
logger.info(f"结果将保存到: {args.o}")
# 处理子域名
start_time = time.time()
success = process_subdomains(
subdomains,
args.domain,
args.o,
max_workers=args.threads,
timeout=args.timeout
)
# 显示摘要信息
elapsed = time.time() - start_time
logger.info(f"扫描完成! 共处理 {len(subdomains)} 个子域名")
logger.info(f"成功解析: {success} 个 ({success / max(1, len(subdomains)):.1%})")
logger.info(f"总耗时: {elapsed:.2f} 秒")
logger.info(f"平均速度: {len(subdomains) / max(1, elapsed):.1f} 请求/秒")
logger.info("扫描完成")
if __name__ == '__main__':
main()
效果
处理九万个字典
原文始发于微信公众号(嵩艺):子域名扫描工具开发
免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论