#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
import sys
import argparse
import socket
import struct
import select
import time
import IPy
import argparse
#线程组件
from concurrent.futures import ThreadPoolExecutor,as_completed
import time
class MyThreadPool():
def __init__(self, my_func,my_list,thread_num):
self.my_func = my_func
self.my_list = my_list
self.thread_num = thread_num
def start(self):
with ThreadPoolExecutor(max_workers=self.thread_num) as executor:
all_task = [executor.submit(self.my_func, (test)) for test in self.my_list]
for future in as_completed(all_task):
pass
# data = future.result()
# print('the result is {}'.format(data))
#ping组件
ICMP_ECHO_REQUEST = 8 # Platform specific
DEFAULT_TIMEOUT = 0.1
DEFAULT_COUNT = 4
class Pinger(object):
""" Pings to a host -- the Pythonic way"""
def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
self.target_host = target_host
self.count = count
self.timeout = timeout
def do_checksum(self, source_string):
""" Verify the packet integritity """
sum = 0
max_count = (len(source_string)/2)*2
count = 0
while count < max_count:
val = source_string[count + 1]*256 + source_string[count]
sum = sum + val
sum = sum & 0xffffffff
count = count + 2
if max_count<len(source_string):
sum = sum + ord(source_string[len(source_string) - 1])
sum = sum & 0xffffffff
sum = (sum >> 16) + (sum & 0xffff)
sum = sum + (sum >> 16)
answer = ~sum
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer
def receive_pong(self, sock, ID, timeout):
"""
Receive ping from the socket.
"""
time_remaining = timeout
while True:
start_time = time.time()
readable = select.select([sock], [], [], time_remaining)
time_spent = (time.time() - start_time)
if readable[0] == []: # Timeout
return
time_received = time.time()
recv_packet, addr = sock.recvfrom(1024)
icmp_header = recv_packet[20:28]
type, code, checksum, packet_ID, sequence = struct.unpack(
"bbHHh", icmp_header
)
if packet_ID == ID:
bytes_In_double = struct.calcsize("d")
time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0]
return time_received - time_sent
time_remaining = time_remaining - time_spent
if time_remaining <= 0:
return
def send_ping(self, sock, ID):
"""
Send ping to the target host
"""
target_addr = socket.gethostbyname(self.target_host)
my_checksum = 0
# Create a dummy heder with a 0 checksum.
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
bytes_In_double = struct.calcsize("d")
data = (192 - bytes_In_double) * "Q"
data = struct.pack("d", time.time()) + bytes(data.encode('utf-8'))
# Get the checksum on the data and the dummy header.
my_checksum = self.do_checksum(header + data)
header = struct.pack(
"bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
)
packet = header + data
sock.sendto(packet, (target_addr, 1))
def ping_once(self):
"""
Returns the delay (in seconds) or none on timeout.
"""
icmp = socket.getprotobyname("icmp")
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
except socket.error as e:
if e.errno == 1:
# Not superuser, so operation not permitted
e.msg += "ICMP messages can only be sent from root user processes"
raise socket.error(e.msg)
except Exception as e:
print("Exception: %s" %(e))
my_ID = os.getpid() & 0xFFFF
self.send_ping(sock, my_ID)
delay = self.receive_pong(sock, my_ID, self.timeout)
sock.close()
return delay
def ping(self):
"""
Run the ping process
"""
for i in range(self.count):
print ("Ping to %s..." % self.target_host,)
try:
delay = self.ping_once()
except socket.gaierror as e:
print ("Ping failed. (socket error: '%s')" % e[1])
break
if delay == None:
print ("Ping failed. (timeout within %ssec.)" % self.timeout)
else:
delay = delay * 1000
print("Get pong in %0.4fms" % delay)
def ping(host):
host = str(host)
pinger = Pinger(target_host=host)
delay = pinger.ping_once()
if delay == None:
print("Ping %s Failed, timed out for 2 seconds" % host)
else:
print("Ping %s = %s ms" % (host, round(delay * 1000, 4)))
alive.append(host)
# time.sleep(0.5)
def getFileType(file_path):
#判断文件类型gbk、utf-8
FileType = "gbk"
try:
htmlf = open(file_path, 'r', encoding=FileType)
htmlf.read()
except UnicodeDecodeError:
FileType = "utf-8"
else:
htmlf.close()
return FileType
def readFile2List(filename):
result_list = []
#读取文件到列表
FileType = getFileType(filename)
with open(filename,'r',encoding=FileType) as f:
for line in f.readlines():
linestr = line.strip()
if linestr!='':
result_list.append(linestr)
return result_list
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.description="Native MultiThreading Ping scan, does not dependent System Ping -- by NOVASEC"
parser.add_argument("-l", "--ipList", help="Specify the target IP segment , eg:192.168.1.1,192.168.0.1/24" , default=None)
parser.add_argument("-f", "--ipFile", help="Specify the target IP file, one IP per line , eg: ipfile.txt" , default=None)
parser.add_argument("-t", "--thread", help="Specify Ping thread" , type=int , default=5 )
parser.add_argument("-o", "--output", help="Specified result output file" , default= 'result.txt' )
args = parser.parse_args()
ipList = args.ipList
ipFile = args.ipFile
result= args.output
thread= args.thread
###########################
alive = []
if ipList != None or ipFile !=None :
if ipList != None:
ip_list = IPy.IP(ipList,make_net=1)
if ipFile != None:
ip_list = readFile2List(ipFile)
###########################
last_time = time.time()
thread_num = thread
myThreadPool = MyThreadPool(ping,ip_list,thread_num) #init
myThreadPool.start()
print('All Scan use time is {}'.format(time.time() - last_time)) #线程time is 3.0822317600250244
###########################
print('Found alive host :', len(alive))
if alive !=[]:
print('Write result to file :', result)
f_result=open(result , "w+")
f_result.writelines('n'.join(alive))
f_result.close()
else:
print('Not Found alive host ')
###########################
else:
print('None Input target')
###########################
python3 Ping-thread.py -h
usage: Ping-thread.exe [-h] [-l IPLIST] [-f IPFILE] [-t THREAD] [-o OUTPUT]
Native MultiThreading Ping scan, does not dependent System Ping -- by NOVASEC
optional arguments:
--help show this help message and exit
IPLIST, --ipList IPLIST
Specify the target IP segment ,
eg:192.168.1.1,192.168.0.1/24
IPFILE, --ipFile IPFILE
Specify the target IP file, one IP per line , eg:
ipfile.txt
THREAD, --thread THREAD
Specify Ping thread
OUTPUT, --output OUTPUT
Specified result output file
# 扫描192.168.2.0/24
python3 Ping-thread.py -l 192.168.2.0/24
# 扫描 ip.txt内所有IP
python3 Ping-thread.py -f ip.txt
# 使用5个线程扫描192.168.2.0/24
python3 Ping-thread.py -l 192.168.2.0/24 -t 5
# 使用10个线程扫描ip.txt内所有IP并输出到result.txt
python3 Ping-thread.py -f ip.txt -t 10 -o result.txt
参考:
https://mp.weixin.qq.com/s/Pi5eBkKc5fdMcadk1hRrgQ
END
本文始发于微信公众号(NOVASEC):Python 原生 ping批量扫描器
免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论