ChromeDriver can cope with aizhan's anti-crawling measures, and the tool uses multithreading. As white hats, the main purpose of this tool is to let you batch-query domain weights: the weight information on the Butian vulnerability platform is based primarily on aizhan, so being able to look up domain information quickly during vulnerability hunting helps maintain cyberspace security.
- Use chromedriver to simulate a normal user's behavior
- Extract the target fields from the page with XPath
- Some data may be missed during crawling, so any domain that returns no data is queried once more to avoid gaps
- Use tqdm to show a progress bar and improve the user experience
- Use multithreading to speed up crawling
- Normalize every entry to its main (registrable) domain and deduplicate, filtering out IP addresses, to streamline the crawl
- To implement the ideas above, we need to design a few functions:
- The user supplies a .txt file naming the domains to crawl; read it and store the entries in a list
- Process the data in the list (main-domain extraction, IP removal, deduplication; see the sketch after this list)
- Create a thread pool for the crawl (the threads invoke the query functions below)
- Design the domain query function around chromedriver; there are two of them, and when the first fails to fetch data the second is called
- Finally, write the query results to a .csv file
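Before moving to the skeleton, here is a minimal sketch of the normalization step from the list above (assuming the tldextract package; the helper name `normalize_domains` is my own, not part of the tool):

```python
import re
import tldextract

# Simple IPv4 check used to drop address-only entries after extraction.
IPV4_RE = re.compile(r'^(?:\d{1,3}\.){3}\d{1,3}$')

def normalize_domains(lines):
    """Reduce each entry to its main (registrable) domain, drop IPs, dedupe."""
    seen, result = set(), []
    for line in lines:
        ext = tldextract.extract(line.strip())
        candidate = f"{ext.domain}.{ext.suffix}".strip('.')
        if not candidate or IPV4_RE.match(candidate):
            continue
        if candidate not in seen:
            seen.add(candidate)
            result.append(candidate)
    return result
```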
The overall skeleton looks like this:
```python
import argparse
import sys

def file_Read(domain):
    pass

def data_Processing(domain):
    pass

def Multi_threading(domain):
    pass

def Weight_query(domain):
    pass

def Weight_query_B(domain):
    pass

def Data_writing(Weight_data):
    pass

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="网站权重查询")
    parser.add_argument('-u', '--url', type=str, help='查询单个域名权重!')
    parser.add_argument('-f', '--file', type=str, help='批量查询域名权重!')
    args = parser.parse_args()
    if '-u' in sys.argv:
        pass
    if '-f' in sys.argv:
        Weight_data = []  # list that collects the crawled results
        datas = file_Read(args.file)
        domains = data_Processing(datas)
        domains = list(set(domains))  # deduplicate
        Multi_threading(domains)  # create the thread pool; the query functions are invoked from inside it
        Data_writing(Weight_data)
```
Here we mainly analyze how the thread pool is created and how chromedriver is invoked for the crawl; the other parts are straightforward, and the full code is provided at the end.
- The thread pool is capped at 10 worker threads. chromedriver is fairly resource-hungry, and in practice 10 turned out to be the sweet spot; you can of course adjust this to suit your own machine.
```python
def Multi_threading(domains):
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            # Use tqdm to display a progress bar
            with tqdm(total=len(domains), desc="域名权重查询中") as progress_bar:
                # Submit all tasks to the pool and collect the Future objects
                futures = [executor.submit(Weight_query, domain) for domain in domains]
                # Advance the progress bar as each task completes
                for future in concurrent.futures.as_completed(futures):
                    progress_bar.update(1)
    except Exception as e:
        pass
```
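One design note: every worker thread appends its result line to the shared Weight_data list. A single list.append is atomic in CPython thanks to the GIL, so the code above works, but if the per-result handling ever grows beyond one append, guarding the shared state with a lock is the safer pattern. A minimal sketch (the lock is my addition and is not in the original tool; Query_results and Weight_data are the tool's own names):

```python
import threading

write_lock = threading.Lock()

def Query_results(result):
    # Serialize access to the shared list so concurrent workers
    # cannot interleave multi-step updates.
    with write_lock:
        Weight_data.append(result)
```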
- The key point in the crawling code is the fallback at the end. Because re-queuing work from inside the thread pool is awkward, and we also want to avoid ending up in an endless loop, the two functions differ only in their final step: when Weight_query fails it calls Weight_query_B, and when Weight_query_B also fails it appends the domain to a list of failures that is reported back to the user at the end.
```python
def Weight_query(domain):
    try:
        target = 'https://www.aizhan.com/cha/' + domain
        service = Service(r'./chromedriver.exe', service_log_path=logFilename)
        driver = webdriver.Chrome(service=service, options=chrome_options)
        driver.get(target)
        # Explicitly wait (up to 10s) until the Baidu rank element has rendered
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, 'baidurank_br')))
        page_source = driver.page_source
        driver.quit()
        tree = etree.HTML(page_source)
        baidu = tree.xpath('//*[@id="baidurank_br"]/img/@alt')[0]
        yidong = tree.xpath('//*[@id="baidurank_mbr"]/img/@alt')[0]
        three60 = tree.xpath('//*[@id="360_pr"]/img/@alt')[0]
        SM = tree.xpath('//*[@id="sm_pr"]/img/@alt')[0]
        sogou = tree.xpath('//*[@id="sogou_pr"]/img/@alt')[0]
        google = tree.xpath('//*[@id="google_pr"]/img/@alt')[0]
        ICP_ICP = tree.xpath('//*[@id="icp_icp"]')[0]
        ICP_company = tree.xpath('//*[@id="icp_company"]')[0]
        line_to_write = f"{domain},{ICP_ICP.text},{ICP_company.text},{baidu},{yidong},{three60},{SM},{sogou},{google}"
        line_to_write = line_to_write.replace('\n', '') + "\n"  # strip stray newlines, then terminate the CSV row
        Query_results(line_to_write)
    except Exception as e:
        Weight_query_B(domain)  # first failure: fall back to the second query function

def Weight_query_B(domain):
    try:
        target = 'https://www.aizhan.com/cha/' + domain
        service = Service(r'./chromedriver.exe', service_log_path=logFilename)
        driver = webdriver.Chrome(service=service, options=chrome_options)
        driver.get(target)
        # Explicitly wait (up to 10s) until the Baidu rank element has rendered
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, 'baidurank_br')))
        page_source = driver.page_source
        driver.quit()
        tree = etree.HTML(page_source)
        baidu = tree.xpath('//*[@id="baidurank_br"]/img/@alt')[0]
        yidong = tree.xpath('//*[@id="baidurank_mbr"]/img/@alt')[0]
        three60 = tree.xpath('//*[@id="360_pr"]/img/@alt')[0]
        SM = tree.xpath('//*[@id="sm_pr"]/img/@alt')[0]
        sogou = tree.xpath('//*[@id="sogou_pr"]/img/@alt')[0]
        google = tree.xpath('//*[@id="google_pr"]/img/@alt')[0]
        ICP_ICP = tree.xpath('//*[@id="icp_icp"]')[0]
        ICP_company = tree.xpath('//*[@id="icp_company"]')[0]
        line_to_write = f"{domain},{ICP_ICP.text},{ICP_company.text},{baidu},{yidong},{three60},{SM},{sogou},{google}"
        line_to_write = line_to_write.replace('\n', '') + "\n"  # strip stray newlines, then terminate the CSV row
        Query_results(line_to_write)
    except Exception as e:
        err_domains.append(domain)  # second failure: remember the domain for the final report
```
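Since the two functions are identical except for their failure handling, one possible refactor is a single function with a retry budget. This is only a sketch under the assumption that the chromedriver logic above is pulled out into a hypothetical helper `query_once`; neither name exists in the tool itself:

```python
def fetch_weight(domain, retries=1):
    """Try the query up to retries + 1 times, then record the failure."""
    for _ in range(retries + 1):
        try:
            query_once(domain)  # hypothetical helper wrapping the chromedriver logic above
            return
        except Exception:
            continue
    err_domains.append(domain)  # same failure list the tool reports at the end
```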
chromedriver option configuration: chromedriver produces a lot of log output while running, so a few options are set to keep things quiet and improve the experience.
```python
chrome_options = webdriver.ChromeOptions()  # configure ChromeOptions so Chrome runs in the background
# chrome_options = Options()
chrome_options.add_argument('--headless')    # do not show a browser window
chrome_options.add_argument('--no-sandbox')  # disable the sandbox (needed on Linux)
chrome_options.add_argument('--log-level=3')
chrome_options.add_experimental_option('excludeSwitches', ['enable-logging'])
```
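One hedged tweak: recent Chrome builds (roughly 109 and later) replaced the old headless implementation, and the rewritten mode is selected with an explicit flag. Whether you need it depends on your Chrome version, so treat it as something to verify locally rather than part of the tool's tested configuration:

```python
# On newer Chrome builds, opt into the rewritten headless mode instead of plain '--headless'
chrome_options.add_argument('--headless=new')
```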
- The tool is built on chromedriver, so your local Chrome version must match the chromedriver version. Check your browser version first; a ChromeDriver download link is provided here. Put the driver in the same directory as these files.
- Before running the tool, verify that you can reach aizhan (a quick check is sketched below)
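A quick way to run that check is a plain HTTP request before starting the crawl. This is a sketch, not part of the tool; the helper name `aizhan_reachable` and the User-Agent string are my own choices:

```python
import requests

def aizhan_reachable(timeout=5):
    """Return True if https://www.aizhan.com answers within the timeout."""
    try:
        resp = requests.get('https://www.aizhan.com',
                            headers={'User-Agent': 'Mozilla/5.0'},
                            timeout=timeout)
        return resp.status_code == 200
    except requests.RequestException:
        return False

if not aizhan_reachable():
    print('aizhan.com is unreachable; check your network before crawling.')
```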
Please give us a follow ^_^ The complete code is below. There is certainly still plenty of room for optimization, and suggestions from fellow white hats are welcome. I have also packaged the code as an .exe file to spare you the environment setup: 爱站爬取exe版 (aizhan crawler, exe edition).
```python
import argparse
import random
import re
import sys
from datetime import datetime
from tld import get_fld
from rich import print as rprint
import requests
from lxml import etree
import threading
from queue import Queue
from tqdm import tqdm
import concurrent.futures
import tldextract
from fake_useragent import UserAgent
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException
import time
import logging

# Configure ChromeOptions so Chrome runs in the background
chrome_options = webdriver.ChromeOptions()
# chrome_options = Options()
chrome_options.add_argument('--headless')    # do not show a browser window
chrome_options.add_argument('--no-sandbox')  # disable the sandbox (needed on Linux)
chrome_options.add_argument('--log-level=3')
chrome_options.add_experimental_option('excludeSwitches', ['enable-logging'])

BANNER = r'''
   _____ __     __ __          __  __  __
  / ____|\ \   / / \ \        / / |  \/  |
 | |      \ \_/ /   \ \  /\  / /  | \  / |
 | |       \   /     \ \/  \/ /   | |\/| |
 | |____    | |       \  /\  /    | |  | |
  \_____|   |_|        \/  \/     |_|  |_|
'''

# Formatted console output
def error(date, body):
    rprint("[[bold green]" + date + "[/bold green]] [[bold red]Error[/bold red]] > " + body)

def info(date, body):
    rprint("[[bold green]" + date + "[/bold green]] [[bold blue]Info[/bold blue]] > " + body)

def prompt(date, body):
    rprint("[[bold green]" + date + ": " + "[[bold blue]" + body + "[/bold blue]]")

def file_Read(path):
    try:
        datas = []
        with open(path, 'r', encoding='utf-8') as file:
            while True:
                data = file.readline()
                if not data:  # an empty string means the file has been read to the end
                    break
                else:
                    datas.append(data)
        return datas
    except Exception as e:
        error(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), '文件读取出错,请检查传入路径及文件类型!')
        info(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), '程序已退出')
        sys.exit()

def data_Processing(datas):
    domains_one = []
    for data in tqdm(datas, desc="数据初始化"):
        try:
            domain_name = tldextract.extract(data)
            domain_name = f"{domain_name.domain}.{domain_name.suffix}"
            domains_one.append(domain_name)
        except Exception as e:
            continue
    # print(domains_one)
    ip_pattern = re.compile(
        r'(?:\d{1,3}\.){3}\d{1,3}'                    # IPv4
        r'|(?:[A-Fa-f0-9]{1,4}:){7}[A-Fa-f0-9]{1,4}'  # simplified IPv6 (no compressed form)
        r'|(?:[A-Fa-f0-9]{1,4}:){1,7}:?'              # IPv6 with compression (partial match)
    )

    def contains_ip_address(s):
        return bool(ip_pattern.search(s))

    domains = [item for item in domains_one if not contains_ip_address(item)]
    return domains

def Weight_query_B(domain):
    try:
        target = 'https://www.aizhan.com/cha/' + domain
        service = Service(r'./chromedriver.exe', service_log_path=logFilename)
        driver = webdriver.Chrome(service=service, options=chrome_options)
        driver.get(target)
        # Explicitly wait (up to 10s) until the Baidu rank element has rendered
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, 'baidurank_br')))
        page_source = driver.page_source
        driver.quit()
        tree = etree.HTML(page_source)
        baidu = tree.xpath('//*[@id="baidurank_br"]/img/@alt')[0]
        yidong = tree.xpath('//*[@id="baidurank_mbr"]/img/@alt')[0]
        three60 = tree.xpath('//*[@id="360_pr"]/img/@alt')[0]
        SM = tree.xpath('//*[@id="sm_pr"]/img/@alt')[0]
        sogou = tree.xpath('//*[@id="sogou_pr"]/img/@alt')[0]
        google = tree.xpath('//*[@id="google_pr"]/img/@alt')[0]
        ICP_ICP = tree.xpath('//*[@id="icp_icp"]')[0]
        ICP_company = tree.xpath('//*[@id="icp_company"]')[0]
        line_to_write = f"{domain},{ICP_ICP.text},{ICP_company.text},{baidu},{yidong},{three60},{SM},{sogou},{google}"
        line_to_write = line_to_write.replace('\n', '') + "\n"  # strip stray newlines, then terminate the CSV row
        Query_results(line_to_write)
    except Exception as e:
        err_domains.append(domain)  # second failure: remember the domain for the final report

def Weight_query(domain):
    try:
        target = 'https://www.aizhan.com/cha/' + domain
        service = Service(r'./chromedriver.exe', service_log_path=logFilename)
        driver = webdriver.Chrome(service=service, options=chrome_options)
        driver.get(target)
        # Explicitly wait (up to 10s) until the Baidu rank element has rendered
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, 'baidurank_br')))
        page_source = driver.page_source
        driver.quit()
        tree = etree.HTML(page_source)
        baidu = tree.xpath('//*[@id="baidurank_br"]/img/@alt')[0]
        yidong = tree.xpath('//*[@id="baidurank_mbr"]/img/@alt')[0]
        three60 = tree.xpath('//*[@id="360_pr"]/img/@alt')[0]
        SM = tree.xpath('//*[@id="sm_pr"]/img/@alt')[0]
        sogou = tree.xpath('//*[@id="sogou_pr"]/img/@alt')[0]
        google = tree.xpath('//*[@id="google_pr"]/img/@alt')[0]
        ICP_ICP = tree.xpath('//*[@id="icp_icp"]')[0]
        ICP_company = tree.xpath('//*[@id="icp_company"]')[0]
        line_to_write = f"{domain},{ICP_ICP.text},{ICP_company.text},{baidu},{yidong},{three60},{SM},{sogou},{google}"
        line_to_write = line_to_write.replace('\n', '') + "\n"  # strip stray newlines, then terminate the CSV row
        Query_results(line_to_write)
    except Exception as e:
        Weight_query_B(domain)  # first failure: fall back to the second query function

def Query_results(result):
    Weight_data.append(result)

def Multi_threading(domains):
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            # Use tqdm to display a progress bar
            with tqdm(total=len(domains), desc="域名权重查询中") as progress_bar:
                # Submit all tasks to the pool and collect the Future objects
                futures = [executor.submit(Weight_query, domain) for domain in domains]
                # Advance the progress bar as each task completes
                for future in concurrent.futures.as_completed(futures):
                    progress_bar.update(1)
    except Exception as e:
        pass

def Data_writing(Weight_data):
    try:
        filename = formatted_now + '.csv'
        with open(filename, 'a', encoding='utf-8') as file:
            file.write("domain_name,ICP,ICP_company,Baidu_weight,yidong_weight,360_weight,SM_weight,sogou_weight,google_weight\n")
            for line in tqdm(Weight_data, desc=f"正在将结果写入文件: {filename}"):
                file.write(line)
        info(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), f'数据写入完成,请查看文件: {filename}')
    except Exception as e:
        error(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), '结果写入出错了!')

def logFile():
    try:
        with open(logFilename, 'a', encoding='utf-8') as file:
            file.write("日志文件")
    except Exception as e:
        sys.exit()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="网站权重查询")
    parser.add_argument('-u', '--url', type=str, help='查询单个域名权重!')
    parser.add_argument('-f', '--file', type=str, help='批量查询域名权重!')
    args = parser.parse_args()
    if '-u' in sys.argv:
        now = datetime.now()
        formatted_now = now.strftime("%Y%m%d%H%M%S")
        logFilename = formatted_now + '.log'
        err_domains = []  # used by Weight_query_B's failure path
        info(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), '欢迎使用权重查询系统!当前是3.0版本!')
        print(BANNER)
        domain_name = args.url
        try:
            domain_name = tldextract.extract(domain_name)
            domain_name = f"{domain_name.domain}.{domain_name.suffix}"
        except Exception as e:
            error(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), '域名处理出错,请检查传入的域名!')
        try:
            Weight_data = []
            Weight_query(domain_name)
            Weight_data = Weight_data[0].replace('\n', '')
            Weight_data = Weight_data.split(',')
            print(Weight_data)
            info(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), '查询结果如下:')
            prompt('域名', Weight_data[0])
            prompt('ICP', Weight_data[1])
            prompt('企业名称', Weight_data[2])
            prompt('百度权重', Weight_data[3])
            prompt('移动权重', Weight_data[4])
            prompt('360权重', Weight_data[5])
            prompt('神马', Weight_data[6])
            prompt('搜狗', Weight_data[7])
            prompt('谷歌PR', Weight_data[8])
        except Exception as e:
            error(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), '啊哦!好像遇到点问题··· ···')
    elif '-f' in sys.argv:
        print(BANNER)
        info(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), '欢迎使用权重查询系统!当前是3.0版本!')
        err_domains = []
        now = datetime.now()
        formatted_now = now.strftime("%Y%m%d%H%M%S")
        logFilename = formatted_now + '.log'
        # results_queue = Queue()
        # thread_lock = threading.Lock()
        Weight_data = []
        datas = file_Read(args.file)
        domains = data_Processing(datas)
        domains = list(set(domains))
        Multi_threading(domains)
        info(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), '域名: ' + str(err_domains) + '查询不到结果!')
        Data_writing(Weight_data)
    else:
        print(BANNER)
        error(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), '不是这样用的! --help 看看……')
```
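Typical invocations look like this (assuming the script is saved as aizhan_weight.py, a placeholder filename of my own):

```
# query a single domain
python aizhan_weight.py -u example.com

# batch-query domains listed one per line in a file
python aizhan_weight.py -f domains.txt
```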
Originally published on the WeChat public account 神农Sec: "Crawler Project: An Efficient ChromeDriver-Driven Tool for Precise aizhan Information Retrieval and Use".