Python工具开发 | 端口扫描

admin 2024年11月24日12:15:44评论12 views字数 7976阅读26分35秒阅读模式

web安全

原创声明:转载本文请标注出处和作者,望尊重作者劳动成果!感谢!

前言:打算以python语言为主,0 到 1 写一个信息收集工具开发系列。本文主要讲讲端口工具开发思路,会涉及到“多线程”、“协程”、“socket”相关的知识。

前置知识

socket

在Python中,socket模块提供了一组底层网络通信的API,它是基于 Berkeley sockets API的一个封装,使得Python能够实现网络编程。通过socket模块,Python程序可以使用TCP或UDP等协议进行网络通信。

多线程

在Python中,使用线程可以并发地执行多个任务,提高程序的运行效率。Python提供了一个内置的threading模块,可以方便地创建和管理线程。在 Python 中实现多线程通常有两种方式:继承Thread类和 使用Thread对象。其中,继承Thread类是一种更加面向对象的方式 ,可以让我们自定义线程类,重写其方法,实现更加灵活的线程控制

线程池

在python中,使用线程池相对于使用多线程有以下优点:

  1. 控制线程数量:使用线程池可以限制并发线程数量,防止线程数量过多导致系统资源耗尽。

  2. 减少线程创建和销毁的开销:线程池中的线程是事先创建好的,当有任务需要处理时,只需要从池中获取一个线程即可,不需要反复创建和销毁线程,从而减少了线程创建和销毁的开销。

  3. 简化线程编程:线程池把线程管理的细节封装起来,对于开发者来说,只需要提交任务到线程池,不需要关心线程的创建和销毁等细节,从而简化了线程编程。

  4. 避免线程饥饿:线程池可以避免因为某个线程长时间占用CPU资源而导致其它线程无法得到执行的情况,从而避免了线程饥饿问题。

综上所述,使用线程池相对于使用多线程,可以更加方便地管理线程,并且减少线程创建和销毁的开销,从而提高了系统的性能和稳定性。

协程

协程(Coroutine)是一种轻量级的并发编程方式,不同于线程和进程,协程在同一线程内执行,使用协程可以避免线程切换的开销,提高程序的执行效率。Python提供了asyncio标准库来支持协程编程。

实现端口扫描

单线程版本

先看一个简单实现单个端口扫描的例子:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import socket


def tcp_scan(host: str, port: int):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建套接字
sock.settimeout(3) # 设置超时时间
res = sock.connect_ex((host, port)) # 连接主机和端口
sock.close() # 关闭套接字
if res == 0:
print(f"{host}:{port} open")
else:
# 端口关闭
pass


host: str = "127.0.0.1"
port: int = 135
tcp_scan(host, port)

上面的代码中我们定义了一个方法“tcp_scan”,它主要使用了socket模块来发送来进行端口探测,并设置了响应的超时时间用于防止扫描时间过长。如果我们需要扫描多个端口,可以通过列表遍历目标的形式实现:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import socket
import time


def tcp_scan(host: str, port: int):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建套接字
sock.settimeout(3) # 设置超时时间
res = sock.connect_ex((host, port)) # 连接主机和端口
sock.close() # 关闭套接字
if res == 0:
print(f"{host}:{port} open")
else:
# 端口关闭
pass

start = time.time()
host: str = "127.0.0.1"
port_list: int = [22,23,135,445,3306]
for port in port_list:
tcp_scan(host, port)
end = time.time()
print(f"Total time:{end - start}")

通过测试,程序运行时间大约6秒,显然我们对这个速度是不满意的,如果我们要扫描一个IP的大量端口就需要用到python的并发编程。

Python工具开发 | 端口扫描

多线程版本

我们先看例子,要如何使用多线程优化端口扫描:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import socket
import threading
import time


class MyThread(threading.Thread):
def __init__(self, host: str, port: int):
threading.Thread.__init__(self)
self.host = host
self.port = port

def run(self):
tcp_scan(host, self.port)


def tcp_scan(host: str, port: int):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建套接字
sock.settimeout(3) # 设置超时时间
res = sock.connect_ex((host, port)) # 连接主机和端口
sock.close() # 关闭套接字
if res == 0:
print(f"{host}:{port} open")
else:
# 端口关闭
pass


start = time.time()
host: str = "127.0.0.1"
port_list: int = range(1, 65536)
thread_task = [MyThread(host, port) for port in port_list] # 定义一个线程任务池
for t in thread_task:
t.start()
for t in thread_task:
t.join()
end = time.time()
print(f"Total time:{end - start}")

上面的代码主要是通过继承线程类,然后调用多线程执行任务。过测试,多线程端口扫描本地全端口,运行时间大约21秒。

Python工具开发 | 端口扫描

扫描一个外网IP的时间,大概是36秒。

Python工具开发 | 端口扫描

线程池版本

线程池版本比起多线程,前者运行会更稳定,而且支持自定义设置并发数:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import time
import concurrent.futures
import socket


def scan_port(host: str, port: int) -> None:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(3)
result = s.connect_ex((host, port))
if result == 0:
print(f"{host}:{port}")
except:
pass


def main(host: str, port_list: list, thread_count: int) -> None:
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as executor:
futures = [executor.submit(scan_port, host, port) for port in port_list]
for future in concurrent.futures.as_completed(futures):
pass


start = time.time()
host: str = "127.0.0.1"
port_list: list = range(1, 65536)
thread_count: int = 5000 # 设置线程中的并发数
main(host, port_list, thread_count)
end = time.time()
print(f"Total time:{end - start}")

通过测试,线程池版本的扫描本地全端口,并发数设置在5000时,运行时间大约28秒左右。

Python工具开发 | 端口扫描

扫描一个外网IP的时间,大概是43秒左右。

Python工具开发 | 端口扫描

协程版本

协程版本的实现比较简单,主要就是asyncio库的使用。需要注意的是python异步协程的写法,如果你不了解异步,可能需要自行补充相关的知识。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from time import time
import asyncio


async def tcp_scan(host:str, port: int) -> None:
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=3) # 创建TCP套接字
print(f"{host}:{port}")
writer.close()
await writer.wait_closed()
await asyncio.sleep(0.1)
except:
# 端口不可用
pass

async def main(host:str, port_list: list) -> None:
tasks = [tcp_scan(host, port) for port in port_list]
await asyncio.gather(*tasks) # 开启并发任务


start = time()
asyncio.run(main('127.0.0.1', range(1, 65535)))
end = time()
print(f"Total time:{end - start}")

通过测试,协程版本的扫描本地全端口,运行时间大约3秒左右。

Python工具开发 | 端口扫描

扫描一个外网IP的时间,大概是6秒左右。

Python工具开发 | 端口扫描

协程并发控制版本

我们知道python的标准库中并没有协程池的用法,不过提供了asyncio.Semaphore来帮助我们控制同一时间协程并发的数量。通过控制协程的并发数量,我们更加精准地扫描开放的端口。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from time import time
import asyncio


async def tcp_scan(host:str, port: int) -> None:
try:
async with sem:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=3) # 创建TCP套接字
print(f"{host}:{port}")
writer.close()
await asyncio.sleep(0.1)
except:
# 端口不可用
pass

async def main(host:str, port_list: list) -> None:
tasks = [tcp_scan(host, port) for port in port_list]
await asyncio.gather(*tasks) # 开启并发任务


start = time()
sem = asyncio.Semaphore(5000) # 设置并发数
asyncio.run(main('127.0.0.1', range(1, 65535)))
end = time()
print(f"Total time:{end - start}")

经过测试,设置协程并发数为5000后扫描本地全端口,运行时间大约29秒左右:

Python工具开发 | 端口扫描

扫描一个外网IP的时间,大概是44秒左右。

Python工具开发 | 端口扫描

那么实现端口扫描的方法那么多,我们该怎么去选择呢?从多线程、协程的结果来看,如果抛开结果准确度的问题,显然使用协程的方式扫描速度会更快,因为协程比线程的开销会更小。

实现命令行接口

在python的标准库中,存在一个专门用于处理命令行接口的库: argparse 。使用方法也很简单,可以自行阅读官方文档获取帮助:https://docs.python.org/3/library/argparse.html

下面是添加了命令行接口的版本:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from time import time
import asyncio
import argparse
import rich


async def tcp_scan(host:str, port: int) -> None:
try:
async with sem:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=3) # 创建TCP套接字
print(f"{host}:{port}")
writer.close()
await asyncio.sleep(0.1)
except:
# 端口不可用
pass


async def main(host:str, port_list: list) -> None:
tasks = [tcp_scan(host, port) for port in port_list]
await asyncio.gather(*tasks) # 开启并发任务


def get_portlist(ip_range: str) -> list:
"""
用于处理格式: 80,135,445,500-65535
"""

def fuc(data) -> list:
ran = data.split("-")
s = int(ran[0])
e = int(ran[1])
return [n for n in range(s, e+1)]

if "," in ip_range:
tmp: list = ip_range.split(",")
result = []
for i in tmp:
if "-" in i:
result += fuc(i)
else:
result.append(i)
return(result)
elif "-" in ip_range:
return fuc(ip_range)
else:
return [ip_range]

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--host', help='127.0.0.1')
parser.add_argument('--port', help='1-65535')
args = parser.parse_args()
args_dict = vars(args) # 将解析出来的Namespace对象转换为一个包含所有参数名称和对应值的字典
start = time()
sem = asyncio.Semaphore(5000) # 设置并发数
host = args_dict['host']
portlist = get_portlist(args_dict['port'])
asyncio.run(main(host, portlist))
end = time()
print(f"Total time:{end - start}")

上面新添加了一个get_portlist方法,用于处理命令输入的端口范围。看看效果:

Python工具开发 | 端口扫描

实现进度条功能

推荐一个强大的终端的UI库:rich,后续系列的文章也会大量用到这个库。安装: pip install rich

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from time import time
import asyncio
import argparse
from rich.progress import Progress


async def tcp_scan(host:str, port: int, progress, task) -> None:
try:
async with sem:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=3) # 创建TCP套接字
print(f"{host}:{port}")
writer.close()
await asyncio.sleep(0.1)
except:
# 端口不可用
pass
finally:
# 进度条渲染
progress.update(task, advance=1)


async def main(host:str, port_list: list, progress, task) -> None:
tasks = [tcp_scan(host, port, progress, task) for port in port_list]
await asyncio.gather(*tasks) # 开启并发任务


def get_portlist(ip_range: str) -> list:
"""
用于处理格式: 80,135,445,500-65535
"""

def fuc(data) -> list:
ran = data.split("-")
s = int(ran[0])
e = int(ran[1])
return [n for n in range(s, e+1)]

if "," in ip_range:
tmp: list = ip_range.split(",")
result = []
for i in tmp:
if "-" in i:
result += fuc(i)
else:
result.append(i)
return(result)
elif "-" in ip_range:
return fuc(ip_range)
else:
return [ip_range]


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--host', help='127.0.0.1')
parser.add_argument('--port', help='1-65535')
args = parser.parse_args()
args_dict = vars(args) # 将解析出来的Namespace对象转换为一个包含所有参数名称和对应值的字典
start = time()
sem = asyncio.Semaphore(5000) # 设置并发数
host = args_dict['host']
portlist = get_portlist(args_dict['port'])
with Progress() as progress:
task = progress.add_task("[green]Processing...", total=len(portlist)) # 定义一个进度条对象
while not progress.finished:
asyncio.run(main(host, portlist, progress, task)) # 需要把进度条渲染的两个参数带入
end = time()
print(f"Total time:{end - start}")

最后进度条显示的效果如下:

Python工具开发 | 端口扫描

实现批量扫描和逻辑优化

新增了一个可以控制并发数的接口,然后添加了批量扫描和结果导出的功能,最终的成品如下:

Python工具开发 | 端口扫描通过在同目录下,把需要批量扫描的IP放到targets.txt文件下,调用--file接口即可批量扫描。最后会在同目录下生成结果文件 results.txt

END

后续会继续更新相关系列文章,感兴趣的可以持续关注一下。

公众号回复“20230321”,获取原格式PDF文章和相关工具或源码。

原文始发于微信公众号(Fighter安全团队):Python工具开发 | 端口扫描

免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2024年11月24日12:15:44
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   Python工具开发 | 端口扫描https://cn-sec.com/archives/1618993.html
                  免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉.

发表评论

匿名网友 填写信息