手写了个 BOSS 来了的摸鱼神器!

admin 2022年8月28日09:30:40评论16 views字数 21665阅读72分13秒阅读模式

作者:小小明
原文链接:
https://blog.csdn.net/as604049322/article/details/120110098
本文为读者投稿

大家好,我是小小明。

前段时间,我写了篇水文《获取当前局域网下所有连接设备的ip地址和mac地址》链接:https://blog.csdn.net/as604049322/article/details/118442299,但是没有想到的是居然上了热榜,也是我个人第一篇上热榜的文章,阅读量瞬间飙升💥。然而我的硬核技术文却几乎没有人看到。既然有很多人对这个话题感兴趣,那么我们就继续对相关原理深挖,最好能自己实现,理解透彻。

首先我们回顾一下前文,在前文中我介绍了windows下获取ip地址和arp映射表的命令,通过分析最新arp映射表知道当前网段下在线或下线的设备⭐。

文章使用的技术是通过python调用系统ping命令,实现arp表的更新。然而系统自带的ping命令访问整个网段的ip时,耗时达到了2分钟,后面通过多线程加速,最终也只能提速到最快25秒。这个速度实在延时过大,无法应用于更高级的应用😇。

今天我们的目标是就是将Ping整个网段IP的总耗时降低到5秒以内,这样我们就能够在5秒内知道指定mac地址设备的上下线,例如开发一个BOSS来了的摸鱼神器,只要老板的手机一连上wifi,这边在5秒内收到通知,立马停止摸鱼,就保证了平时放心大胆的摸鱼⚡。

手写了个 BOSS 来了的摸鱼神器!

那么如何提速呢?经过我几天的苦思冥想,并在学习了一些网络知识后,自己实现了PING命令,成功的实现了放心大胆的摸鱼。于是,在我看了几本书,写了几千行代码,踩了几百个坑后,终于把相关知识理解透了。下面是我将涉及到的核心知识点总结成了这篇文章,所以这篇文章都是非常精简的干货,强烈❤️建议收藏❤️。

学完本文,你的力量将不仅仅止于此,还能够底层化开发任何基于IP协议的自定义协议,当然这要看你自己是否具有举一反三的能力。甚至你还能继续自己深挖,去研究开发比IP协议更底层的协议。

手写了个 BOSS 来了的摸鱼神器!

渴望吗?渴望那就学起来吧⁉️下面是本文的知识点目录:

手写了个 BOSS 来了的摸鱼神器!

01

socket 套接字核心知识

socket 简介

进程间通信指运行的程序之间的数据共享,在1台电脑上可以通过进程号(PID)来唯一标识一个进程进行通信。

在网络中,TCP/IP协议族网络层的“ip地址”可以唯一标识网络中的主机,而传输层的“协议+端口”可以唯一标识主机中的应用进程(进程)。网络中的进程通信就可以通过ip地址,协议,端口这个标志与其它进程进行交互。

socket(简称 套接字) 就是实现网络进程间通信的一种方式,网络上各种各样的服务大多都是基于 Socket 来完成通信的。为了建立通信通道,网络通信的每个端点拥有一个socket套接字对象,它们允许程序接受并进行连接,如发送和接受数据。

socket 链接

在 Python 中 使用socket 模块的函数 socket 就可以完成:

import socket
socket.socket(family=-1, type=-1, proto=-1, fileno=None)

参数说明:

family为指定的地址族,主要有三种:

  • socket.AF_UNIX :用于同一台机器进程间通信

  • socket.AF_INET :基于ipv4协议的Internet 进程间通信

  • socket.AF_INET6 :基于ipv6协议的Internet 进程间通信

更多的地址族还包括,socket.AF_BLUETOOTH蓝牙相关、socket.AF_VSOCK虚拟机通信、socket.AF_PACKET直连网络设备底层接口等。

type为指定的套接字类型,主要有三种:

  • socket.SOCK_STREAM :流式套接字,使用面向连接的TCP协议实现字节流的传输

  • socket.SOCK_DGRAM :数据报套接字,使用面向非连接的UDP实现数据报套接字

  • socket.SOCK_RAW:原始套接字,该套接字允许对较低层协议(如 IP或 ICMP)进行直接访问

更多套接字类型还包括socket.SOCK_RDMsocket.SOCK_SEQPACKET等。

TCP 与 UDP 通信模型

对于tcp或udp套接字可以直接使用以下方式进行创建:

import socket

# 创建tcp的套接字
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 创建udp的套接字
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 不用的时候,关闭套接字
s.close()

UDP通信模型:在通信开始之前,不需要建立相关的链接,只需要发送数据即可,类似于写信

手写了个 BOSS 来了的摸鱼神器!

UDP服务端示例代码:

from socket import *
# 创建套接字
udp_socket = socket(AF_INET, SOCK_DGRAM)
# 绑定本地的相关信息,不绑定系统会随机分配
udp_socket.bind(('0.0.0.0'8080))
# 等待接收对方发送的数据
recv_data = udp_socket.recvfrom(1024#  1024表示本次接收的最大字节数
# 显示接收到的数据,第1个元素是对方发送的数据,第2个元素是对方的ip和端口
print(recv_data[0].decode('u8'))
# 关闭套接字
udp_socket.close()

UDP客户端示例代码:

from socket import *
# 创建udp套接字
udp_socket = socket(AF_INET, SOCK_DGRAM)
# 发送数据到指定的电脑上的指定程序中
udp_socket.sendto("你好,服务器~".encode('u8'), ('192.168.1.103'8080))
# 关闭套接字
udp_socket.close()

TCP通信模型:在通信开始之前,一定要先建立相关的链接,才能发送数据,类似于打电话

手写了个 BOSS 来了的摸鱼神器!

TCP服务端示例代码:

from socket import *

# 创建socket
tcp_server_socket = socket(AF_INET, SOCK_STREAM)
# 服务器绑定本机ip和端口
tcp_server_socket.bind(('0.0.0.0'8080))
# 监听端口,128表示最大同时接收128个客户端链接
tcp_server_socket.listen(128)
# 如果有新的客户端来链接服务器,那么就产生一个新的套接字专门为这个客户端服务
client_socket, clientAddr = tcp_server_socket.accept()
# 接收对方发送过来的数据
recv_data = client_socket.recv(1024)  # 接收1024个字节
print('接收到的数据为:', recv_data.decode('u8'))
# 发送一些数据到客户端
client_socket.send("你好客户端!".encode('u8'))
# 关闭为这个客户端服务的套接字
client_socket.close()

TCP客户端示例代码:

from socket import *

# 创建socket
tcp_client_socket = socket(AF_INET, SOCK_STREAM)
# 链接服务器
tcp_client_socket.connect(('192.168.3.31'8080))
tcp_client_socket.send("测试发送的内容".encode("u8"))
# 接收对方发送过来的数据,最大接收1024个字节
recvData = tcp_client_socket.recv(1024)
print('接收到的数据为:', recvData.decode('u8'))
# 关闭套接字
tcp_client_socket.close()

SOCK_RAW 原始套接字

上述两种套接字是常规的套接字模式,第三个参数省略或为零(IP协议)会自动选择正确的协议(TCP协议和UDP协议)。

当我们指定套接字类型为socket.SOCK_RAW原始套接字时,第三个参数就需要指定proto协议号。

python的socket库预定义的协议号有:

  • socket.IPPROTO_TCP:TCP传输协议,值为6

  • socket.IPPROTO_UDP:UDP传输协议,值为17

  • socket.IPPROTO_ICMP:ICMP协议,值为1

  • socket.IPPROTO_IP:IP协议,值为0

  • socket.IPPROTO_RAW:可自行构建IP头部构建更底层的协议,值为1

也可以通过协议名称获取协议号常量:

import socket

print(socket.IPPROTO_ICMP, socket.getprotobyname("icmp"),
      socket.IPPROTO_ICMP == socket.getprotobyname("icmp"))
1 1 True

可以看到两种方式获取协议号均可。

通过原始套接字我们可以使用ICMP或更底层的协议进行通讯从而实现更高级的功能。

我们需要使用ICMP协议进行网络通信就可以使用SOCK_RAW原始套接字:

icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)

socket 模块和对象的其他常用方法

socket模块的其他常用方法:

socket.gethostbyname:将主机名转换为IPv4地址格式。IPv4地址以字符串形式返回

socket.gethostname:返回包含Python解释器当前正在执行的机器的主机名的字符串

socket.gethostbyaddr:根据IP地址获取主机名

socket.getprotobyname:将Internet协议名称转换为协议号常量

在主机字节顺序与网络字节顺序不相同的机器上,使用以下方法转换:

网络顺序转换为主机字节顺序 主机顺序转换为网络字节顺序
32位正整数
4字节的交换操作
socket.ntohl socket.htonl
16位正整数
2字节的交换操作
socket.ntohs socket.htons

在主机字节顺序与网络字节顺序相同的机器上,执行以上方法是无操作的。

socket.inet_aton:将字符串格式的IPv4地址打包为32位4字节的字节对象

获取本机ip地址方法1:先获取本机主机名,再通过主机名获取ip

import socket

ip = socket.gethostbyname(socket.gethostname())
print(ip)
192.168.3.31

获取本机所有网卡的IP:

ips = socket.gethostbyname_ex(socket.gethostname())[-1]
print(ips)
['192.168.3.31']

⚠️注意:如果本机没有正确设置主机名时可能无法获取本机ip地址。

socket套接字对象的公用函数套接字函数:

  • s.getpeername()  :返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)

  • s.getsockname()  :返回套接字自己的地址。通常是一个元组(ipaddr,port)

  • s.setsockopt(level,optname,value)  :设置给定套接字选项的值。

  • s.getsockopt(level,optname[.buflen])  :返回套接字选项的值。

  • s.settimeout(timeout)  :设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如connect)

  • s.gettimeout()  :返回当前超时期的值,单位是秒,如果没有设置超时期,则返回None

  • s.fileno()  :返回套接字的文件描述符

  • s.setblocking(flag)  :如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常。

  • s.makefile()  :创建一个与该套接字相关联的文件。

获取本机ip地址方法2:向任意网络地址发送一个无状态的UDP请求后,再通过套接字对象获取自己的地址从而获取本机地址

import socket

def get_local_ip():
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('1.1.1.1'80))
        ip, port = s.getsockname()
        return ip
# 获取本机IP
ip = get_local_ip()
print(ip)
192.168.3.31

✅即使无法连接Internet目标地址无法访问(发出报文会丢失),也可以使用该方法获取本机ip地址。

struct 二进制数据的转换

Python提供了一个struct模块来解决bytes和其他二进制数据类型的转换。

struct的pack函数把任意数据类型变成bytes。

import struct
print(struct.pack('>I'10240099))
b'x00x9c@c'

pack 的第一个参数是处理指令:

  • >:表示字节顺序是 big-endian,也就是网络序

  • I:表示 4 字节无符号整数

  • H:2 字节无符号整数。

后面的参数字节个数要和处理指令一致。

unpack 把 bytes 变成相应的数据类型:

>>> struct.unpack('>IH', b'xf0xf0xf0xf0x80x80')
(404232216032896)

struct模块定义的数据类型可以参考Python官方文档:

https://docs.python.org/zh-cn/3/library/struct.html#format-characters

格式 C 类型 Python 类型 标准大小 注释
x 填充字节
c char 长度为 1 的字节串 1
b signed char 整数 1 (1), (2)
B unsigned char 整数 1 -2
? _Bool bool 1 -1
h short 整数 2 -2
H unsigned short 整数 2 -2
i int 整数 4 -2
I unsigned int 整数 4 -2
l long 整数 4 -2
L unsigned long 整数 4 -2
q long long 整数 8 -2
Q unsigned long long 整数 8 -2
n ssize_t 整数 -3
N size_t 整数 -3
e -6 浮点数 2 -4
f float 浮点数 4 -4
d double 浮点数 8 -4
s char [] 字节串
p char [] 字节串
P void * 整数 -5

02

Ping 的工作原理

ping 基于 ICMP 协议工作的,ICMP 全称是 Internet Control Message Protocol,也就是互联网控制报文协议。ping 发出的ICMP 报文实际上是以侦察网络状态的形式实现了控制,反馈网络状态,从而调整传输策略以此控制整个局面。

ICMP 主要的功能包括:确认 IP 包是否成功送达目标地址、报告发送过程中 IP 包被废弃的原因和改善网络设置等。ICMP 协议主要负责在 IP 通信中通知某个 IP 包未能达到目标地址的原因。

ICMP 报文格式

Ping命令发出的ICMP 报文封装在 IP 包里面的,结构如下:

手写了个 BOSS 来了的摸鱼神器!

上述报文格式中,左边的IP头部分不需要太关心,因为我们使用socket的原始套接字模式会自动帮我们封装IP头部分,右边的ICMP报文才是我们需要关心的部分。

⚠️注意:相比原生的 ICMP,Ping命令发出的ICMP报文多出了标识符和序号两个字段。

对于ICMP报文的类型,有两大类:

  1. 查询报文类型:用于诊断的查询消息

  2. 差错报文类型:通知出错原因的错误消息

不过咱们使用的PING只需要使用查询报文类型中的回送应答和回送请求。

常见的 ICMP 类型包括:

手写了个 BOSS 来了的摸鱼神器!

ICMP 查询报文类型

回送消息:0表示回送应答,8表示回送请求。用于进行通信的主机或路由器之间,判断所发送的数据包是否已经成功到达对端的一种消息。

ping 命令是通过ICMP协议的回送消息实现的:

手写了个 BOSS 来了的摸鱼神器!

发送端主机向接收端主机发送一个回送请求(ICMP Echo Request Message,类型 8),只要正常接收到接收端返回的回送响应(ICMP Echo Reply Message,类型 0),则代表发送端主机到接收端主机可达。

ICMP 差错报文类型

对于差错报文类型,在本次编码中不会用到,无需深究,简单了解一下即可。

ICMP 常见差错报文:

  • 目标不可达消息 —— 类型 为 3

  • 原点抑制消息 —— 类型 4

  • 重定向消息 —— 类型 5

  • 超时消息 —— 类型 11

目标不可达消息(Destination Unreachable Message):

IP 路由器无法将 IP 数据包发送给目标地址时,会给发送端主机返回一个目标不可达的 ICMP 消息,并在这个消息中显示不可达的具体原因,原因记录在 ICMP 包头的代码字段。

由此,根据 ICMP 不可达的具体消息,发送端主机也就可以了解此次发送不可达的具体原因

目标不可达的原因有:

  • 网络不可达代码为 0

  • 主机不可达代码为 1

  • 协议不可达代码为 2

  • 端口不可达代码为 3

  • 需要进行分片但设置了不分片位代码为 4

原点抑制消息(ICMP Source Quench Message):

ICMP 原点抑制消息的目是为了缓和网络拥堵的问题,当路由器向低速线路发送数据时,其发送队列的缓存变为零而无法发送出去时,可以向 IP 包的源地址发送一个 ICMP 原点抑制消息

但是收到这种 ICMP 消息的主机并不见得真的会增大 IP 包的传输间隔,还可能会引起不公平的网络通信,所以一般不被使用。

重定向消息(ICMP Redirect Message):

在路由器持有更好的路由信息时,发现发送端主机使用了不是最优的路径发送数据,那么路由器会返回一个 ICMP 重定向消息给这个主机。这个消息中包含了最合适的路由信息和源数据,发送端下次可以发给另外一个更近的路由器。

超时消息(ICMP Time Exceeded Message):

IP 包中有一个8位的字段叫做 TTLTime To Live,生存周期),它的值随着每经过一次路由器就会减 1,直到减到 0 时该 IP 包会被丢弃。

此时,IP 路由器将会发送一个ICMP超时消息给发送端主机,并通知该包已被丢弃。设置 IP 包生存周期的主要目的是为了在路由控制遇到问题发生循环状况时,避免 IP 包无休止地在网络上被转发。

也可以通过设置一个较小的 TTL 值 控制包的到达范围。

03

socket 原始套接字实现 ping 命令

学了这么多基础的网络知识,我们最终为了什么?就是为了能够自己实现PING命令。相关的网络知识还有很多,但对于我们实现PING命令并没有太大关系,就暂不做深究。

下面我们从实战出现,一步步调试继续深挖PING命令的实现原理。

首先我们创建ICMP协议的原始套接字链接:

import socket

icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)

发送回送请求

然后需要向目标发送一个回送请求,结构如下:

手写了个 BOSS 来了的摸鱼神器!

下面开始组织报文数据(对于系列号,我们可以自行决定要发送的值):

import os
import time
import struct

# 校验需要后面再计算,这里先设置为0
ICMP_ECHO_REQUEST, code, checksum, identifier, serial_num = 8, 0, 0, os.getpid() & 0xFFFF, 0
# 初步打包ICMP头部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
                     checksum, identifier, serial_num)
# 打包选项数据,包含当前时间戳,后面用Q补齐到192位
data = struct.pack("d", time.time()).ljust(192, b"Q")

计算校验和的规则这里我已经写成代码,大家可以直接看代码:

def calc_checksum(src_bytes):
    """用于计算ICMP报文的校验和"""
    total = 0
    max_count = len(src_bytes)
    count = 0
    while count < max_count:
        val = src_bytes[count + 1]*256 + src_bytes[count]
        total = total + val
        total = total & 0xffffffff
        count = count + 2

    if max_count < len(src_bytes):
        total = total + ord(src_bytes[len(src_bytes) - 1])
        total = total & 0xffffffff

    total = (total >> 16) + (total & 0xffff)
    total = total + (total >> 16)
    answer = ~total
    answer = answer & 0xffff
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return socket.htons(answer)

⚠️注意:最终返回时通过socket.htons方法将数据从主机序转换为网络序。

然后就可以计算出校验和重新打包header:

checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
                     checksum, identifier, serial_num)

然后就可以发送了:

# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
target_addr = "192.168.3.31"
icmp_socket.sendto(header + data, (target_addr, 1))

⚠️注意:虽然发送给了1号端口,但其实发送给任意端口都可以。

接收回送响应

回送响应与回送请求结构一致:

手写了个 BOSS 来了的摸鱼神器!

发送完消息后,我们就可以接收回送相应:

# 接收回送请求
recv_packet, addr = icmp_socket.recvfrom(1024)
# 前20字节是ip协议的ip头
icmp_header = recv_packet[20:28]
data = recv_packet[28:]
ICMP_Echo_Reply, code, checksum, identifier, serial_num = struct.unpack(
    "bbHHh", icmp_header
)
time_sent, = struct.unpack("d", data[:struct.calcsize("d")])

⚠️注意:我们接收的回送请求中包含了前20自己的IP头。

从选项数据中可解析出了这个包发送的时间(之前发出时写入的时间)。

完善 ping 命令的开发

虽然标准的PING命令是用以上协议规则实现的,但我们并不需要完全按照上述规范,例如标识符可以发送任何16位的值,序号可以从任意数值开始,选项数据192位的空间也可以用来存放任何数据。

我们在接收回送响应时需要检查包的标识符,确定是自己发出的包才接收。

最终封装出如下方法:

import struct
import time
import os
import socket
import select


def calc_checksum(src_bytes):
    """用于计算ICMP报文的校验和"""
    total = 0
    max_count = len(src_bytes)
    count = 0
    while count < max_count:
        val = src_bytes[count + 1]*256 + src_bytes[count]
        total = total + val
        total = total & 0xffffffff
        count = count + 2

    if max_count < len(src_bytes):
        total = total + ord(src_bytes[len(src_bytes) - 1])
        total = total & 0xffffffff

    total = (total >> 16) + (total & 0xffff)
    total = total + (total >> 16)
    answer = ~total
    answer = answer & 0xffff
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return socket.htons(answer)


def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
              serial_num=0data=None):
    # 校验需要后面再计算,这里先设置为0
    ICMP_ECHO_REQUEST, code, checksum = 800
    # 初步打包ICMP头部
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
                         checksum, identifier, serial_num)
    # 打包选项数据
    if data:
        data = data.ljust(192, b"Q")
    else:
        data = struct.pack("d", time.time()).ljust(192, b"Q")
    checksum = calc_checksum(header + data)
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
                         checksum, identifier, serial_num)
    # 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
    icmp_socket.sendto(header + data, (target_addr, 1))


def receive_pong(icmp_socket, identifier=os.getpid() & 0xFFFF, serial_num=0timeout=2):
    icmp_socket.settimeout(timeout)
    time_remaining = timeout
    while True:
        start_time = time.time()
        # 接收回送请求
        recv_packet, (ip, port) = icmp_socket.recvfrom(1024)
        time_received = time.time()
        time_spent = time_received-start_time
        # 前20字节是ip协议的ip头
        icmp_header = recv_packet[20:28]
        data = recv_packet[28:]
        ICMP_Echo_Reply, code, checksum, identifier_reciver, serial_num_reciver = struct.unpack(
            "bbHHh", icmp_header
        )
        if identifier_reciver != identifier or serial_num != serial_num_reciver:
            # 不是当前自己发的包则忽略
            time_remaining -= time_spent
            if time_remaining <= 0:
                raise socket.timeout
            continue
        time_sent, = struct.unpack("d"data[:struct.calcsize("d")])
        return int((time_received - time_sent)*1000), ip

192.168.3.31是我当前本机的局域网IP地址,测试一下:

icmp_socket = socket.socket(
    socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
ip = '192.168.3.31'
sent_ping(icmp_socket, ip)
try:
    delay, ip_received = receive_pong(icmp_socket, timeout=2)
    print(f"延迟:{delay}ms,对方ip:{ip_received}")
except socket.timeout as e:
    print("超时")
延迟:0ms,对方ip:192.168.3.31

然后再批量ping一下指定当前网段的所有IP:

def get_local_ip():
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('1.1.1.1'80))
        ip, port = s.getsockname()
        return ip


icmp_socket = socket.socket(
    socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)

local_ip = get_local_ip()
net_segment = local_ip[:local_ip.rfind(".")]
ips = []
for i in range(1255):
    ip = f"{net_segment}.{i}"
    sent_ping(icmp_socket, ip)
    print("ping", ip, end=" ")
    try:
        delay, ip_received = receive_pong(icmp_socket, timeout=0.1)
        print(f"延迟:{delay}ms,对方ip:{ip_received}")
        ips.append(ip)
    except socket.timeout as e:
        print("超时")
print(ips)
icmp_socket.close()

超时时间0.1秒时,总耗时30秒:

手写了个 BOSS 来了的摸鱼神器!

超时时间设置为0.01秒时,总耗时则为2.59秒。

借助 arp 表获取当前网段在线设备

如何尽量快的获取到当前在线的设备?经过测试发现,被ping后,ping不通的机器,arp表能够自动删除对应的条目,那么思路1就是快速的向全网段发送回送请求不等待回送响应,然后2秒后去查arp表,即可看到最新的在线设备。

实现思路1:

import struct
import time
import os
import re
import socket
import pandas as pd


def calc_checksum(src_bytes):
    """用于计算ICMP报文的校验和"""
    total = 0
    max_count = len(src_bytes)
    count = 0
    while count < max_count:
        val = src_bytes[count + 1]*256 + src_bytes[count]
        total = total + val
        total = total & 0xffffffff
        count = count + 2

    if max_count < len(src_bytes):
        total = total + ord(src_bytes[len(src_bytes) - 1])
        total = total & 0xffffffff

    total = (total >> 16) + (total & 0xffff)
    total = total + (total >> 16)
    answer = ~total
    answer = answer & 0xffff
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return socket.htons(answer)


def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
              serial_num=0, data=None)
:

    # 校验需要后面再计算,这里先设置为0
    ICMP_ECHO_REQUEST, code, checksum = 800
    # 初步打包ICMP头部
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
                         checksum, identifier, serial_num)
    # 打包选项数据
    if data:
        data = data.ljust(192b"Q")
    else:
        data = struct.pack("d", time.time()).ljust(192b"Q")
    checksum = calc_checksum(header + data)
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
                         checksum, identifier, serial_num)
    # 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
    icmp_socket.sendto(header + data, (target_addr, 1))

def get_arp_ip_mac():
    header = None
    with os.popen("arp -a"as res:
        for line in res:
            line = line.strip()
            if not line or line.startswith("接口"):
                continue
            if header is None:
                header = re.split(" {2,}", line.strip())
                break
        df = pd.read_csv(res, sep=" {2,}",
                         names=header, header=0, engine='python')
    return df


def ping_net_segment_all(net_segment):
    with socket.socket(
            socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) as icmp_socket:
        for i in range(1255):
            ip = f"{net_segment}.{i}"
            sent_ping(icmp_socket, ip)


net_segment = "192.168.3"
ping_net_segment_all(net_segment)
# 等待回送响应的到来,预计1秒之内
time.sleep(1)
# 读取最新的arp表
df = get_arp_ip_mac()
df

于是我们获取到了当前网段在线的设备列表:

手写了个 BOSS 来了的摸鱼神器!

双线程获取指定网段的在线设备

不过使用arp表查看有个缺陷,只能查看当前网段的,跨网段的在线设备似乎看不到。经分析我使用的台式机通过有线连接到3网段,而手机通过WiFi连接到2网段,所以必须能够分析2网段设备的在线设备才有意义。

思路2:用两个线程一个线程专门发回送请求,一个线程专门接收回送响应,可以通过回送响应获取IP地址,于是就可以得到指定网段的当前在线的设备的ip。

先完成获取在线设备列表:

from concurrent.futures import ThreadPoolExecutor
import _thread
import struct
import time
import os
import re
import socket
import pandas as pd


def calc_checksum(src_bytes):
    """用于计算ICMP报文的校验和"""
    total = 0
    max_count = len(src_bytes)
    count = 0
    while count < max_count:
        val = src_bytes[count + 1]*256 + src_bytes[count]
        total = total + val
        total = total & 0xffffffff
        count = count + 2

    if max_count < len(src_bytes):
        total = total + ord(src_bytes[len(src_bytes) - 1])
        total = total & 0xffffffff

    total = (total >> 16) + (total & 0xffff)
    total = total + (total >> 16)
    answer = ~total
    answer = answer & 0xffff
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return socket.htons(answer)


def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
              serial_num=0, data=None)
:

    # 校验需要后面再计算,这里先设置为0
    ICMP_ECHO_REQUEST, code, checksum = 800
    # 初步打包ICMP头部
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
                         checksum, identifier, serial_num)
    # 打包选项数据
    if data:
        data = data.ljust(192b"Q")
    else:
        data = struct.pack("d", time.time()).ljust(192b"Q")
    checksum = calc_checksum(header + data)
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
                         checksum, identifier, serial_num)
    # 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
    icmp_socket.sendto(header + data, (target_addr, 1))


def receive_pong(icmp_socket, net_segment, timeout=2):
    icmp_socket.settimeout(timeout)
    ips = set()
    while True:
        start_time = time.time()
        try:
            recv_packet, (ip, port) = icmp_socket.recvfrom(1024)
            if ip.startswith(net_segment):
                ips.add(ip)
        except socket.timeout as e:
            break
    return ips


def ping_net_segment_all(icmp_socket, net_segment):
    for i in range(1255):
        ip = f"{net_segment}.{i}"
        sent_ping(icmp_socket, ip)


icmp_socket = socket.socket(
    socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
with ThreadPoolExecutor() as p:
    p.submit(ping_net_segment_all, icmp_socket, "192.168.2")
    future = p.submit(receive_pong, icmp_socket, "192.168.2"3)
    ips = future.result()

ips

运行结果,目前我的手机ip为192.168.2.122,运行后被顺利检测到:

{'192.168.2.1',
 '192.168.2.122',
 '192.168.2.17',
 '192.168.2.18',
 '192.168.2.19',
 '192.168.2.20',
 '192.168.2.21',
 '192.168.2.22',
 '192.168.2.23',
 '192.168.2.49'}

关闭手机WiFi后,再次运行,顺利看到该IP的下线。

完成 BOSS 来了的摸鱼神器

在已经将更新时间缩短到5秒以内时,咱们就可以PING指定网段,最后完成分析设备上下线的功能,从而达到最终的目的完成BOSS来了的摸鱼神器。

from concurrent.futures import ThreadPoolExecutor
import _thread
import struct
import time
import os
import re
import socket
import pandas as pd


def calc_checksum(src_bytes):
    """用于计算ICMP报文的校验和"""
    total = 0
    max_count = len(src_bytes)
    count = 0
    while count < max_count:
        val = src_bytes[count + 1]*256 + src_bytes[count]
        total = total + val
        total = total & 0xffffffff
        count = count + 2

    if max_count < len(src_bytes):
        total = total + ord(src_bytes[len(src_bytes) - 1])
        total = total & 0xffffffff

    total = (total >> 16) + (total & 0xffff)
    total = total + (total >> 16)
    answer = ~total
    answer = answer & 0xffff
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return socket.htons(answer)


def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
              serial_num=0, data=None)
:

    # 校验需要后面再计算,这里先设置为0
    ICMP_ECHO_REQUEST, code, checksum = 800
    # 初步打包ICMP头部
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
                         checksum, identifier, serial_num)
    # 打包选项数据
    if data:
        data = data.ljust(192b"Q")
    else:
        data = struct.pack("d", time.time()).ljust(192b"Q")
    checksum = calc_checksum(header + data)
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
                         checksum, identifier, serial_num)
    # 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
    icmp_socket.sendto(header + data, (target_addr, 1))


def receive_pong(icmp_socket, net_segment, timeout=2):
    icmp_socket.settimeout(timeout)
    ips = set()
    while True:
        start_time = time.time()
        try:
            recv_packet, (ip, port) = icmp_socket.recvfrom(1024)
            if ip.startswith(net_segment):
                ips.add(ip)
        except socket.timeout as e:
            break
    return ips


def ping_net_segment_all(icmp_socket, net_segment):
    for i in range(1255):
        ip = f"{net_segment}.{i}"
        sent_ping(icmp_socket, ip)


last = None
while 1:
    icmp_socket = socket.socket(
        socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    with ThreadPoolExecutor() as p:
        p.submit(ping_net_segment_all, icmp_socket, "192.168.2")
        future = p.submit(receive_pong, icmp_socket, "192.168.2")
        ips = future.result()
    if last is None:
        print("当前在线设备:", ips)
    if last:
        up = ips-last
        if up:
            print("r新上线设备:", up, end=" "*100)
        down = last-ips
        if down:
            print("r刚下线设备:", down, end=" "*100)
    last = ips
    time.sleep(3)

结果示例:

当前在线设备: {'192.168.2.122''192.168.2.18''192.168.2.20''192.168.2.1''192.168.2.23''192.168.2.49''192.168.2.21''192.168.2.17''192.168.2.22''192.168.2.19'}
刚下线设备: {'192.168.2.122'}  

经测试,手工关闭或打开手机WiFi能够顺利看到设备IP的打印信息。这种方法虽然无法获取MAC地址,但是经测试,同一台机器都会被分配同一个IP,在我当前的网络下是满足要求的,只需要知道老板手机连接的IP就行了。或者观察一下,老板走之后,到底哪个IP下线了,专门去监控这个IP。

更安全的做法就是每看到有新的IP上线都额外警惕一点,如果你是win10系统可以使用如下方法实现系统通知:

from win10toast import ToastNotifier

toaster = ToastNotifier()
toaster.show_toast("通知标题""通知内容!", duration=10)

上述三个参数分别是通知标题,通知的内容和通知持续的时间,对于摸鱼这种事持续时间可以调大掉,再手工关闭通知,通过pip install win10toast安装。

04

总结

总算做成了这个摸鱼神器,不过虽然我上面一本正经的讲的津津有味,但不会真有人打算拿这个代码去应用于实际去对付老板吧⁉️不会吧,不会吧⁉️

真打算做摸鱼神器的童鞋,我个人推荐搞个网络摄像头,写个人物图像识别的代码,发现有人进来了都自动提醒,这样才可以更放心的摸鱼。万一老板没连wifi就过来了,这就有点坑。

开发摸鱼神器不是本文本身的目的,学习网络知识自主实现网络协议,从通过实际例子理解网络协议才是本文真正的目的。为了构思本文,我也是苦思冥想了几天几夜了,小小明在这里在线求大家一个3连可以吗?💖

我是小小明,咱们下期再见~别忘了点亮小红心噢~

推荐阅读

你电脑被挖矿了吗?

【收藏】10大常用恶意软件检测分析平台

手写了个 BOSS 来了的摸鱼神器!

手写了个 BOSS 来了的摸鱼神器!

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2022年8月28日09:30:40
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   手写了个 BOSS 来了的摸鱼神器!http://cn-sec.com/archives/580792.html

发表评论

匿名网友 填写信息