Python 原生 ping批量扫描器

  • A+
所属分类:安全开发

Python 原生 ping批量扫描器

△△△点击上方“蓝字”关注我们了解更多精彩




0x00 Preface [前言/简介]


1、基于python原生组件的ping批量工具,不依赖系统ping。

2、默认情况下,一个C段扫描花费3s左右。

3、可修改内置结构,精简所需依赖




0x01 所有代码
#!/usr/bin/python3# -*- coding: utf-8 -*-
import osimport sysimport argparseimport socketimport structimport selectimport timeimport IPyimport argparse
#线程组件from concurrent.futures import ThreadPoolExecutor,as_completedimport time
class MyThreadPool(): def __init__(self, my_func,my_list,thread_num): self.my_func = my_func self.my_list = my_list self.thread_num = thread_num
def start(self): with ThreadPoolExecutor(max_workers=self.thread_num) as executor: all_task = [executor.submit(self.my_func, (test)) for test in self.my_list] for future in as_completed(all_task): pass # data = future.result() # print('the result is {}'.format(data)) #ping组件ICMP_ECHO_REQUEST = 8 # Platform specificDEFAULT_TIMEOUT = 0.1DEFAULT_COUNT = 4
class Pinger(object): """ Pings to a host -- the Pythonic way"""
def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT): self.target_host = target_host self.count = count self.timeout = timeout

def do_checksum(self, source_string): """ Verify the packet integritity """ sum = 0 max_count = (len(source_string)/2)*2 count = 0 while count < max_count:
val = source_string[count + 1]*256 + source_string[count] sum = sum + val sum = sum & 0xffffffff count = count + 2
if max_count<len(source_string): sum = sum + ord(source_string[len(source_string) - 1]) sum = sum & 0xffffffff
sum = (sum >> 16) + (sum & 0xffff) sum = sum + (sum >> 16) answer = ~sum answer = answer & 0xffff answer = answer >> 8 | (answer << 8 & 0xff00) return answer
def receive_pong(self, sock, ID, timeout): """ Receive ping from the socket. """ time_remaining = timeout while True: start_time = time.time() readable = select.select([sock], [], [], time_remaining) time_spent = (time.time() - start_time) if readable[0] == []: # Timeout return
time_received = time.time() recv_packet, addr = sock.recvfrom(1024) icmp_header = recv_packet[20:28] type, code, checksum, packet_ID, sequence = struct.unpack( "bbHHh", icmp_header ) if packet_ID == ID: bytes_In_double = struct.calcsize("d") time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0] return time_received - time_sent
time_remaining = time_remaining - time_spent if time_remaining <= 0: return

def send_ping(self, sock, ID): """ Send ping to the target host """ target_addr = socket.gethostbyname(self.target_host)
my_checksum = 0
# Create a dummy heder with a 0 checksum. header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1) bytes_In_double = struct.calcsize("d") data = (192 - bytes_In_double) * "Q" data = struct.pack("d", time.time()) + bytes(data.encode('utf-8'))
# Get the checksum on the data and the dummy header. my_checksum = self.do_checksum(header + data) header = struct.pack( "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1 ) packet = header + data sock.sendto(packet, (target_addr, 1))

def ping_once(self): """ Returns the delay (in seconds) or none on timeout. """ icmp = socket.getprotobyname("icmp") try: sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp) except socket.error as e: if e.errno == 1: # Not superuser, so operation not permitted e.msg += "ICMP messages can only be sent from root user processes" raise socket.error(e.msg) except Exception as e: print("Exception: %s" %(e))
my_ID = os.getpid() & 0xFFFF
self.send_ping(sock, my_ID) delay = self.receive_pong(sock, my_ID, self.timeout) sock.close() return delay

def ping(self): """ Run the ping process """ for i in range(self.count): print ("Ping to %s..." % self.target_host,) try: delay = self.ping_once() except socket.gaierror as e: print ("Ping failed. (socket error: '%s')" % e[1]) break
if delay == None: print ("Ping failed. (timeout within %ssec.)" % self.timeout) else: delay = delay * 1000 print("Get pong in %0.4fms" % delay)
def ping(host): host = str(host) pinger = Pinger(target_host=host) delay = pinger.ping_once() if delay == None: print("Ping %s Failed, timed out for 2 seconds" % host) else: print("Ping %s = %s ms" % (host, round(delay * 1000, 4))) alive.append(host) # time.sleep(0.5)
def getFileType(file_path): #判断文件类型gbk、utf-8 FileType = "gbk" try: htmlf = open(file_path, 'r', encoding=FileType) htmlf.read() except UnicodeDecodeError: FileType = "utf-8" else: htmlf.close() return FileType def readFile2List(filename): result_list = [] #读取文件到列表 FileType = getFileType(filename) with open(filename,'r',encoding=FileType) as f: for line in f.readlines(): linestr = line.strip() if linestr!='': result_list.append(linestr) return result_list if __name__ == '__main__': parser = argparse.ArgumentParser() parser.description="Native MultiThreading Ping scan, does not dependent System Ping -- by NOVASEC" parser.add_argument("-l", "--ipList", help="Specify the target IP segment , eg:192.168.1.1,192.168.0.1/24" , default=None) parser.add_argument("-f", "--ipFile", help="Specify the target IP file, one IP per line , eg: ipfile.txt" , default=None) parser.add_argument("-t", "--thread", help="Specify Ping thread" , type=int , default=5 ) parser.add_argument("-o", "--output", help="Specified result output file" , default= 'result.txt' ) args = parser.parse_args() ipList = args.ipList ipFile = args.ipFile result= args.output thread= args.thread ########################### alive = [] if ipList != None or ipFile !=None : if ipList != None: ip_list = IPy.IP(ipList,make_net=1) if ipFile != None: ip_list = readFile2List(ipFile) ########################### last_time = time.time() thread_num = thread myThreadPool = MyThreadPool(ping,ip_list,thread_num) #init myThreadPool.start() print('All Scan use time is {}'.format(time.time() - last_time)) #线程time is 3.0822317600250244 ########################### print('Found alive host :', len(alive)) if alive !=[]: print('Write result to file :', result) f_result=open(result , "w+") f_result.writelines('n'.join(alive)) f_result.close() else: print('Not Found alive host ') ########################### else: print('None Input target')    ###########################




0x02 所有参数
python3 Ping-thread.py -husage: Ping-thread.exe [-h] [-l IPLIST] [-f IPFILE] [-t THREAD] [-o OUTPUT]
Native MultiThreading Ping scan, does not dependent System Ping -- by NOVASEC
optional arguments: -h, --help show this help message and exit -l IPLIST, --ipList IPLIST Specify the target IP segment , eg:192.168.1.1,192.168.0.1/24 -f IPFILE, --ipFile IPFILE Specify the target IP file, one IP per line , eg: ipfile.txt -t THREAD, --thread THREAD Specify Ping thread -o OUTPUT, --output OUTPUT Specified result output file




0x04 常用命令
# 扫描192.168.2.0/24python3 Ping-thread.py -l 192.168.2.0/24
# 扫描 ip.txt内所有IPpython3 Ping-thread.py -f ip.txt 
# 使用5个线程扫描192.168.2.0/24 python3 Ping-thread.py -l 192.168.2.0/24 -t 5
# 使用10个线程扫描ip.txt内所有IP并输出到result.txtpython3 Ping-thread.py -f ip.txt -t 10 -o result.txt




0x09 Summary 总结

NOVASEC公众号后台回复【共享】,获取ping-thread源码及Win可执行文件

参考:https://mp.weixin.qq.com/s/Pi5eBkKc5fdMcadk1hRrgQ

END



如您有任何投稿、问题、建议、需求、合作、后台留言NOVASEC公众号!

Python 原生 ping批量扫描器

或添加NOVASEC-MOYU 以便于及时回复。

Python 原生 ping批量扫描器


感谢大哥们的对NOVASEC的支持点赞和关注

加入我们与萌新一起成长吧!


本团队任何技术及文件仅用于学习分享,请勿用于任何违法活动,感谢大家的支持!



本文始发于微信公众号(NOVASEC):Python 原生 ping批量扫描器

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: