文章首发于:
火线Zone社区(https://zone.huoxian.cn/)
引言
host碰撞,懂的都懂。这里主要讲下脚本实现逻辑。
host碰撞逻辑
ip与host组合,进行尝试,修改请求头中的host。本脚本主要是多线程先进行DNS解析检测,排除外网域名,提高扫描准确度,再进行host碰撞扫描。
脚本实现逻辑
2、实现函数:
domainCheckThreadMain
逻辑:首先会对host_file里的域名进行dns解析检测,如果能直接解析的,这种域名就不属于内网域名。
可以直接排除掉(这里有个困惑点,通过一些工具收集资产时,会发现解析地址为内网地址,所以会检测解析的地址是否为内网地址,如果是的话,那么依然进行碰撞检测),过滤后的域名结果会保存在self.new_host_save_file
3、实现函数:hostScanThreadMain
逻辑:对ip(会拼接http、https组合成url)、host进行组合,存入到队列中,方便多线程取数据,只保存状态码400以下后的(这里也可以直接修改成状态码为200的),因为感觉不够了解,不知道会不会有什么其他特殊的情况,所以这里没有去直接匹配200状态码。
4、最后的结果会保存为xlsx表,因为xlsx表方便去筛选。结果说明,目前发现比如nginx默认页面,可能host怎么改,状态码都是200,所以对于结果的筛选主要是通过响应包长度、title快速排查。
示例代码需要安装的库
pip install requests
pip install openpyxl
pip install dnspython
贴实现代码:
import
os
import
sys
import
time
import
requests
import
threading
import
queue
import
random
import
re
import
dns.
resolver
import
openpyxl
class
HostScan
(object)
:
def __
init__
(self,ip_file,host_file,userAgent_file,thread_num,proxies=None,isDomainCheck=True)
:
'''
:param ip_file:
:param host_file:
:param userAgent_file:
:param thread_num:
:param proxies:
:param isDomainCheck: 是否先进行域名解析检测,如果在外网能解析的对象,则可以直接过滤
'''
self.ip_list
= self.load_ip(ip_file)
self.host_list = self.load_host(host_file)
self.userAgent_list = self.load_userAgent(userAgent_file)
self.domain_queue =
queue
.Queue(maxsize=
0
)
self.thread_num = thread_num
self.proxies = proxies
self.dns_resolver = dns.resolver.Resolver()
self.dns_resolver.nameservers = [
"114.114.114.114"
,
"8.8.8.8"
]
self.search_title_re = re.compile(
"<title>([sS]*?)</title>"
)
self.floder_path = os.path.dirname(os.path.abspath(sys.argv[
0
]))
self.result_save_file = os.path.join(self.floder_path,
'hostScanResult.txt'
)
self.result_list = []
self.result_xlsx_file = os.path.join(self.floder_path,
'hostScanResult.xlsx'
)
self.new_host_save_file = os.path.join(self.floder_path,
'new_host.txt'
)
self.isDomainCheck = isDomainCheck
def load_ip(self,file):
data_set =
set
()
with open(file,
"r"
, encoding=
"utf-8"
) as f:
for
row in f:
row = row.strip()
if
row:
row = row.split()[
0
]
data_set.add(row)
return
list
(data_set)
def load_host(self,file):
data_set =
set
()
with open(file,
"r"
, encoding=
"utf-8"
) as f:
for
row in f:
row = row.strip()
if
row:
row = row.split()[
0
]
data_set.add(row)
return
list
(data_set)
def check_inside_ip(self,ip):
'''
判断是否为内网ip
:param ip: 要检测的ip
:
return
: True 则为内网ip,False则为外网ip
'''
inside_ip_segment = [
"192.168.0.0/16"
,
"172.16.0.0/12"
,
"10.0.0.0/8"
]
for
ip_segment in inside_ip_segment:
if
self.check_ip_by_network_segment(ip, ip_segment):
return
True
if
ip in [
"127.0.0.1"
,
"0.0.0.0"
]:
return
True
return
False
def check_ip_by_network_segment(self,ip, ip_segment):
'''
判断ip与ip_segment是否为同网段
判断思路,将ip的
4
个部分,每个部分都变成
2
进制形式,如果不满
8
位,则在后面补
0
将ip网段同理分成
2
进制
然后比较前掩码个数,比如掩码为
25
,那么就比较前
25
位是否相同,如果相同,则说明同网段
:param ip:
172.16
.1
.1
:param ip_segment:
172.16
.0
.0
/
24
:
return
: 同网段,返回True,不同段网返回False
'''
ip_list = ip.split(
"."
)
ip_two =
""
for
ip_num in ip_list:
ip_2 = bin(
int
(ip_num)).replace(
"0b"
,
""
)
ip_2 =
"0"
* (
8
- len(ip_2)) + ip_2
ip_two += ip_2
ip_segment_net, ip_segment_ym = ip_segment.split(
"/"
)
ip_segment_ym =
int
(ip_segment_ym)
ip_segment_net_list = ip_segment_net.split(
"."
)
ip_segment_net_2 =
""
for
ip_num in ip_segment_net_list:
ip_2 = bin(
int
(ip_num)).replace(
"0b"
,
""
)
ip_2 =
"0"
* (
8
- len(ip_2)) + ip_2
ip_segment_net_2 += ip_2
if
ip_segment_net_2[:ip_segment_ym] == ip_two[:ip_segment_ym]:
return
True
else
:
return
False
def load_userAgent(self,file):
userAgent_set =
set
()
with open(file,
"r"
, encoding=
"utf-8"
) as f:
for
row in f:
row = row.strip()
if
row:
userAgent_set.add(row)
return
list
(userAgent_set)
def create_ip_host_queue(self,ip_list,host_list):
'''
生成ip与
queue
的组合
:
return
:
'''
q =
queue
.Queue(maxsize=
0
)
for
ip in ip_list:
for
host in host_list:
q.put((
"http://"
+ip,host),timeout=
1
)
q.put((
"https://"
+ ip, host), timeout=
1
)
return
q
def result_save(self,msg):
with open(self.result_save_file,
"a+"
,encoding=
"utf-8"
) as f:
f.write(msg+
"n"
)
def checkDomainStatus(self,domain):
'''
检测域名是否可在外网解析,需要排除解析成内网域名的情况
可解析 返回True 不可解析返回False
解析的IP只要有一个IP是内网域名,就认为不可解析 返回False
:param domain:
:
return
:
'''
try
:
result = self.dns_resolver.query(domain,
"A"
)
for
i in result.response.answer:
if
i.rdtype ==
1
:
for
j in i.items:
if
self.check_inside_ip(j.to_text()):
# 内网域名
return
False
return
True
except Exception as e:
return
False
def hostScanRun(self):
'''
实际扫描函数
:
return
:
'''
while
not
self.data_queue.empty():
try
:
url,host = self.data_queue.get(timeout=
1
)
except Exception as e:
return
None
headers = {
"user-agent"
: random.choice(self.userAgent_list),
"host"
: host,
}
try
:
resp = requests.get(url, headers=headers, proxies=self.proxies, verify=False, allow_redirects=False,
timeout=
5
)
except Exception as e:
continue
content_length = len(resp.content)
result = self.search_title_re.search(resp.text)
if
result:
title = result.groups()[
0
]
else
:
title =
""
msg = f
'u
rl:{url}thost:{host}tstatus_code:{resp.status_code}tlength:{content_length}ttitle:{title}'
if
resp.status_code <
400
:
print(msg)
self.result_save(msg)
self.result_list.append([url,host,resp.status_code,content_length,title])
def domainCheckRun(self):
while
not
self.domain_queue.empty():
try
:
domain= self.domain_queue.get(timeout=
1
)
except Exception as e:
break
if
not
self.checkDomainStatus(domain):
# 不可解析的加入到检测列表中
self.host_list.append(domain)
def domainCheckThreadMain(self):
print(f
"域名外网解析检测,检测前共有域名数:{len(self.host_list)}"
)
for
host in self.host_list:
self.domain_queue.put(host,timeout=
0.5
)
self.host_list = [] # 清空
td_list = []
td = threading.Thread(target=self.showProgress,args=(
"域名解析检测进度"
,self.domain_queue,self.domain_queue.qsize()) )
td_list.append(td)
for
i in range(self.thread_num):
td = threading.Thread(target=self.domainCheckRun, )
td_list.append(td)
for
td in td_list:
td.start()
for
td in td_list:
td.join()
print(f
"域名外网解析检测结束,发现不可解析域名数:{len(self.host_list)}"
)
def showProgress(self,title,queue_object,count_init,time_show=
3
):
'''
进度条展示
:
return
:
'''
while
not
queue_object.empty():
print(f
"r{title}:{count_init-queue_object.qsize()}/{count_init}"
,end=
""
)
time.sleep(time_show)
print(
""
)
def hostScanThreadMain(self):
'''
异步扫描,主函数
:
return
:
'''
td_list = []
td = threading.Thread(target=self.showProgress, args=(
"ip、host检测进度"
, self.data_queue, self.data_queue.qsize()))
td_list.append(td)
for
i in range(self.thread_num):
td = threading.Thread(target=self.hostScanRun,)
td_list.append(td)
for
td in td_list:
td.start()
for
td in td_list:
td.join()
# 结果转xlsx
self.result_list = sorted(self.result_list,key=lambda x:x[
3
],reverse=True)
wb = openpyxl.Workbook()
ws = wb.active
ws.title =
"host碰撞结果"
ws.append([
"url"
,
"host"
,
"status_code"
,
"content_length"
,
"title"
])
for
row in self.result_list:
ws.append(row)
wb.save(self.result_xlsx_file)
def runMain(self):
if
self.isDomainCheck:
self.domainCheckThreadMain()
new_host_list = [ host+
"n"
for
host in self.host_list]
with open(self.new_host_save_file,
"w"
,encoding=
"utf-8"
) as f:
f.writelines(new_host_list)
self.data_queue = self.create_ip_host_queue(self.ip_list, self.host_list)
self.hostScanThreadMain()
if
__name__ ==
'__main__'
:
ip_file =
"ip.txt"
host_file =
'host.txt'
userAgent_file =
"userAgent.txt"
thread_num =
10
proxies = {
"http"
:
"http://127.0.0.1:8080"
,
"https"
:
"http://127.0.0.1:8080"
,
}
h = HostScan(ip_file,host_file,userAgent_file,thread_num,proxies=None,isDomainCheck=True)
h.runMain()
【火线—白帽技术交流群】
进群可以与技术大佬互相交流
进群有机会免费领取节假日礼品
进群可以免费观看技术分享直播
识别二维码回复【交流群】进群
【火线zone社区周激励】
2022.1.4 ~ 2022.1.8公告
【相关精选文章】
微信号
huoxian_zone
点击阅读原文,加入社区,共建一个有技术氛围的优质社区!
原文始发于微信公众号(火线Zone):host碰撞python3脚本实现
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论