原创文章web安全渗透技术
原创声明:转载本文请标注出处和作者,望尊重作者劳动成果!感谢!
前言:打算以python语言为主,0 到 1 写一个信息收集工具开发系列。本文主要讲讲端口工具开发思路,会涉及到“多线程”、“协程”、“socket”相关的知识。
前置知识
socket
在Python中,socket模块提供了一组底层网络通信的API,它是基于 Berkeley sockets API的一个封装,使得Python能够实现网络编程。通过socket模块,Python程序可以使用TCP或UDP等协议进行网络通信。
多线程
在Python中,使用线程可以并发地执行多个任务,提高程序的运行效率。Python提供了一个内置的threading模块,可以方便地创建和管理线程。在 Python 中实现多线程通常有两种方式:继承Thread类和 使用Thread对象。其中,继承Thread类是一种更加面向对象的方式 ,可以让我们自定义线程类,重写其方法,实现更加灵活的线程控制
线程池
在python中,使用线程池相对于使用多线程有以下优点:
-
控制线程数量:使用线程池可以限制并发线程数量,防止线程数量过多导致系统资源耗尽。
-
减少线程创建和销毁的开销:线程池中的线程是事先创建好的,当有任务需要处理时,只需要从池中获取一个线程即可,不需要反复创建和销毁线程,从而减少了线程创建和销毁的开销。
-
简化线程编程:线程池把线程管理的细节封装起来,对于开发者来说,只需要提交任务到线程池,不需要关心线程的创建和销毁等细节,从而简化了线程编程。
-
避免线程饥饿:线程池可以避免因为某个线程长时间占用CPU资源而导致其它线程无法得到执行的情况,从而避免了线程饥饿问题。
综上所述,使用线程池相对于使用多线程,可以更加方便地管理线程,并且减少线程创建和销毁的开销,从而提高了系统的性能和稳定性。
协程
协程(Coroutine)是一种轻量级的并发编程方式,不同于线程和进程,协程在同一线程内执行,使用协程可以避免线程切换的开销,提高程序的执行效率。Python提供了asyncio标准库来支持协程编程。
实现端口扫描
单线程版本
先看一个简单实现单个端口扫描的例子:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import socket
def tcp_scan(host: str, port: int):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建套接字
sock.settimeout(3) # 设置超时时间
res = sock.connect_ex((host, port)) # 连接主机和端口
sock.close() # 关闭套接字
if res == 0:
print(f"{host}:{port} open")
else:
# 端口关闭
pass
host: str = "127.0.0.1"
port: int = 135
tcp_scan(host, port)
上面的代码中我们定义了一个方法“tcp_scan”,它主要使用了socket
模块来发送来进行端口探测,并设置了响应的超时时间用于防止扫描时间过长。如果我们需要扫描多个端口,可以通过列表遍历目标的形式实现:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import socket
import time
def tcp_scan(host: str, port: int):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建套接字
sock.settimeout(3) # 设置超时时间
res = sock.connect_ex((host, port)) # 连接主机和端口
sock.close() # 关闭套接字
if res == 0:
print(f"{host}:{port} open")
else:
# 端口关闭
pass
start = time.time()
host: str = "127.0.0.1"
port_list: int = [22,23,135,445,3306]
for port in port_list:
tcp_scan(host, port)
end = time.time()
print(f"Total time:{end - start}")
通过测试,程序运行时间大约6秒,显然我们对这个速度是不满意的,如果我们要扫描一个IP的大量端口就需要用到python的并发编程。
多线程版本
我们先看例子,要如何使用多线程优化端口扫描:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import socket
import threading
import time
class MyThread(threading.Thread):
def __init__(self, host: str, port: int):
threading.Thread.__init__(self)
self.host = host
self.port = port
def run(self):
tcp_scan(host, self.port)
def tcp_scan(host: str, port: int):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建套接字
sock.settimeout(3) # 设置超时时间
res = sock.connect_ex((host, port)) # 连接主机和端口
sock.close() # 关闭套接字
if res == 0:
print(f"{host}:{port} open")
else:
# 端口关闭
pass
start = time.time()
host: str = "127.0.0.1"
port_list: int = range(1, 65536)
thread_task = [MyThread(host, port) for port in port_list] # 定义一个线程任务池
for t in thread_task:
t.start()
for t in thread_task:
t.join()
end = time.time()
print(f"Total time:{end - start}")
上面的代码主要是通过继承线程类,然后调用多线程执行任务。过测试,多线程端口扫描本地全端口,运行时间大约21秒。
扫描一个外网IP的时间,大概是36秒。
线程池版本
线程池版本比起多线程,前者运行会更稳定,而且支持自定义设置并发数:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import time
import concurrent.futures
import socket
def scan_port(host: str, port: int) -> None:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(3)
result = s.connect_ex((host, port))
if result == 0:
print(f"{host}:{port}")
except:
pass
def main(host: str, port_list: list, thread_count: int) -> None:
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as executor:
futures = [executor.submit(scan_port, host, port) for port in port_list]
for future in concurrent.futures.as_completed(futures):
pass
start = time.time()
host: str = "127.0.0.1"
port_list: list = range(1, 65536)
thread_count: int = 5000 # 设置线程中的并发数
main(host, port_list, thread_count)
end = time.time()
print(f"Total time:{end - start}")
通过测试,线程池版本的扫描本地全端口,并发数设置在5000时,运行时间大约28秒左右。
扫描一个外网IP的时间,大概是43秒左右。
协程版本
协程版本的实现比较简单,主要就是asyncio
库的使用。需要注意的是python异步协程的写法,如果你不了解异步,可能需要自行补充相关的知识。
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from time import time
import asyncio
async def tcp_scan(host:str, port: int) -> None:
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=3) # 创建TCP套接字
print(f"{host}:{port}")
writer.close()
await writer.wait_closed()
await asyncio.sleep(0.1)
except:
# 端口不可用
pass
async def main(host:str, port_list: list) -> None:
tasks = [tcp_scan(host, port) for port in port_list]
await asyncio.gather(*tasks) # 开启并发任务
start = time()
asyncio.run(main('127.0.0.1', range(1, 65535)))
end = time()
print(f"Total time:{end - start}")
通过测试,协程版本的扫描本地全端口,运行时间大约3秒左右。
扫描一个外网IP的时间,大概是6秒左右。
协程并发控制版本
我们知道python的标准库中并没有协程池的用法,不过提供了asyncio.Semaphore来帮助我们控制同一时间协程并发的数量。通过控制协程的并发数量,我们更加精准地扫描开放的端口。
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from time import time
import asyncio
async def tcp_scan(host:str, port: int) -> None:
try:
async with sem:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=3) # 创建TCP套接字
print(f"{host}:{port}")
writer.close()
await asyncio.sleep(0.1)
except:
# 端口不可用
pass
async def main(host:str, port_list: list) -> None:
tasks = [tcp_scan(host, port) for port in port_list]
await asyncio.gather(*tasks) # 开启并发任务
start = time()
sem = asyncio.Semaphore(5000) # 设置并发数
asyncio.run(main('127.0.0.1', range(1, 65535)))
end = time()
print(f"Total time:{end - start}")
经过测试,设置协程并发数为5000后扫描本地全端口,运行时间大约29秒左右:
扫描一个外网IP的时间,大概是44秒左右。
那么实现端口扫描的方法那么多,我们该怎么去选择呢?从多线程、协程的结果来看,如果抛开结果准确度的问题,显然使用协程的方式扫描速度会更快,因为协程比线程的开销会更小。
实现命令行接口
在python的标准库中,存在一个专门用于处理命令行接口的库: argparse 。使用方法也很简单,可以自行阅读官方文档获取帮助:https://docs.python.org/3/library/argparse.html
下面是添加了命令行接口的版本:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from time import time
import asyncio
import argparse
import rich
async def tcp_scan(host:str, port: int) -> None:
try:
async with sem:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=3) # 创建TCP套接字
print(f"{host}:{port}")
writer.close()
await asyncio.sleep(0.1)
except:
# 端口不可用
pass
async def main(host:str, port_list: list) -> None:
tasks = [tcp_scan(host, port) for port in port_list]
await asyncio.gather(*tasks) # 开启并发任务
def get_portlist(ip_range: str) -> list:
"""
用于处理格式: 80,135,445,500-65535
"""
def fuc(data) -> list:
ran = data.split("-")
s = int(ran[0])
e = int(ran[1])
return [n for n in range(s, e+1)]
if "," in ip_range:
tmp: list = ip_range.split(",")
result = []
for i in tmp:
if "-" in i:
result += fuc(i)
else:
result.append(i)
return(result)
elif "-" in ip_range:
return fuc(ip_range)
else:
return [ip_range]
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--host', help='127.0.0.1')
parser.add_argument('--port', help='1-65535')
args = parser.parse_args()
args_dict = vars(args) # 将解析出来的Namespace对象转换为一个包含所有参数名称和对应值的字典
start = time()
sem = asyncio.Semaphore(5000) # 设置并发数
host = args_dict['host']
portlist = get_portlist(args_dict['port'])
asyncio.run(main(host, portlist))
end = time()
print(f"Total time:{end - start}")
上面新添加了一个get_portlist方法,用于处理命令输入的端口范围。看看效果:
实现进度条功能
推荐一个强大的终端的UI库:rich,后续系列的文章也会大量用到这个库。安装: pip install rich
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from time import time
import asyncio
import argparse
from rich.progress import Progress
async def tcp_scan(host:str, port: int, progress, task) -> None:
try:
async with sem:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=3) # 创建TCP套接字
print(f"{host}:{port}")
writer.close()
await asyncio.sleep(0.1)
except:
# 端口不可用
pass
finally:
# 进度条渲染
progress.update(task, advance=1)
async def main(host:str, port_list: list, progress, task) -> None:
tasks = [tcp_scan(host, port, progress, task) for port in port_list]
await asyncio.gather(*tasks) # 开启并发任务
def get_portlist(ip_range: str) -> list:
"""
用于处理格式: 80,135,445,500-65535
"""
def fuc(data) -> list:
ran = data.split("-")
s = int(ran[0])
e = int(ran[1])
return [n for n in range(s, e+1)]
if "," in ip_range:
tmp: list = ip_range.split(",")
result = []
for i in tmp:
if "-" in i:
result += fuc(i)
else:
result.append(i)
return(result)
elif "-" in ip_range:
return fuc(ip_range)
else:
return [ip_range]
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--host', help='127.0.0.1')
parser.add_argument('--port', help='1-65535')
args = parser.parse_args()
args_dict = vars(args) # 将解析出来的Namespace对象转换为一个包含所有参数名称和对应值的字典
start = time()
sem = asyncio.Semaphore(5000) # 设置并发数
host = args_dict['host']
portlist = get_portlist(args_dict['port'])
with Progress() as progress:
task = progress.add_task("[green]Processing...", total=len(portlist)) # 定义一个进度条对象
while not progress.finished:
asyncio.run(main(host, portlist, progress, task)) # 需要把进度条渲染的两个参数带入
end = time()
print(f"Total time:{end - start}")
最后进度条显示的效果如下:
实现批量扫描和逻辑优化
新增了一个可以控制并发数的接口,然后添加了批量扫描和结果导出的功能,最终的成品如下:
通过在同目录下,把需要批量扫描的IP放到targets.txt文件下,调用--file接口即可批量扫描。最后会在同目录下生成结果文件 results.txt。
END
后续会继续更新相关系列文章,感兴趣的可以持续关注一下。
公众号回复“20230321”,获取原格式PDF文章和相关工具或源码。
原文始发于微信公众号(Fighter安全团队):Python工具开发 | 端口扫描
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论