前言
学过计算机网络协议的人应该都深有体会,网络协议知识点多而复杂,小编在刚开始学习的时候也是被各种协议搞得晕头转向,各层协议,各种协议报文格式,各个字段代表的含义...,小编目前所接触到的学习网络协议的方法有以下三种:
(1)通过看书和看视频。这是很普遍的方式了,学校教育基本这种,适合考试
(2)除了看书和视频,利用抓包软件(wireshark,科来网络分析系统)抓包,看看数据包到底长啥样,再对照书和视频来理解。有一定实操,适合工作,是比较有效的方式
(3)就是小编最近参加的一个星球活动,直接手写一个抓包软件,要求对计算机网络协议有一定了解且有一定的编程基础。个人觉得这是史上最强的学习网络协议的方式了,适合想对网络协议有更深理解或者想提升编程能力的人。
活动形式是这样的:星主把整个任务分成了三个阶段,每个阶段的任务再合理的细分到每天的量,每天发布学习任务和对应的资料,再布置作业,然后参加者根据给出的资料自己动手、思考完成作业并在星球提交。目前已结束第一阶段。
本系列文章将记录参与活动的整个过程(希望能全程跟下来),作为学习笔记和自己成长历程的记录。由于本人非科班专业出身,故选择易学的python作为编程语言(本人代码水平很烂,大佬勿喷)
附上一张星主此次活动的宣传海报:
大家可以关注下大佬的微信公众号:编程技术宇宙
内容绝对干货,以故事形式给你说清网络协议、操作系统、CPU这些计算机中枯燥晦涩的知识点,关注快两年了,推荐给大家。
第一阶段历时三周(周六周日不另外发布任务,自己用来总结和回顾,完善代码),13天的作业布置,外加最后的收官竞赛(神仙打架):
https://mp.weixin.qq.com/s/cQKq6H983Xy9cuYaVpdDeg
如果有和我一样想对网络协议有更深理解且想提升编程能力的师傅们,欢迎加入进来,人多才好玩~(嘿哈)
今天的代码解析的是ARP协议和IPV4网络下的UDP协议,相对来讲难度没那么大。
day8-9—解析ARP协议报文
代码:
import mmap,binascii,time,os
def big_small_convert(data): #小端模式转化为大端模式
return binascii.hexlify(binascii.unhexlify(data)[::-1])
def mac_address_format(start,end): #格式化MAC地址
j = 1
s = ""
for i in range(start,end):
if j % 6 == 0:
s += (m.read(1).hex().upper() + "")
else:
s += (m.read(1).hex().upper() + "-")
j += 1
i += 1
return s
def eth_protocol_identify(start): #识别协议类型
type_code = m[start:start + 2].hex()
if type_code == "0800":
s = "IPV4"
elif type_code == "0806":
s = "ARP"
elif type_code == "0835":
s = "RARP"
elif type_code == "86dd":
s = "IPV6"
else:
s = "unknow"
return s
def ip_convert(start,end): #将十六进制ip转换为点分十进制
j = 1
s = ""
for i in range(start,end):
if j % 4 == 0:
hex_num = m.read(1).hex()
num = int(hex_num,16)
s += (str(num) + "")
else:
hex_num = m.read(1).hex()
num = int(hex_num,16)
s += (str(num) + ".")
j += 1
i += 1
return s
if __name__ == '__main__':
size = os.path.getsize('day8.pcap')
with open('day8.pcap','rb') as f:
with mmap.mmap(f.fileno(),0,access=mmap.ACCESS_READ) as m:
type_index = 52
total = 0 #记录总数据包数
arp_number = 0 #记录arp协议数据包数
dict = {} #创建字典,记录IP和MAC地址的映射关系
while True:
total += 1
protocol_type = eth_protocol_identify(type_index)
if (protocol_type != "ARP"): #判断当前数据包有无ARP报文,没有的话读取包大小,跳到下一个数据包
packet_filed_index = type_index - 20
packet_length = m[packet_filed_index:packet_filed_index + 4].hex() #读取包头中Caplen(4B)字段
packet_length = big_small_convert(packet_length) #小端模式转化为大端模式
packet_length = int(packet_length,16) #16进制转十进制
type_index = type_index + packet_length + 16
continue
else:
arp_number += 1
packet_filed_index = type_index - 20 #当前数据包包头中Caplen(4B)字段所在的起始位置
smac_index = type_index + 10 #当前数据包目的MAC、源MAC、目的IP、源IP的起始位置
src_ip_index = smac_index + 6
dmac_index = src_ip_index + 4
dst_ip_index = dmac_index + 6
opcode_index = type_index + 8 #ARP包的操作类型字段的起始位置
packet_length = m[packet_filed_index:packet_filed_index + 4].hex() #读取包头中Caplen(4B)字段
packet_length = big_small_convert(packet_length) #小端模式转化为大端模式
packet_length = int(packet_length,16) #16进制转十进制
m.seek(smac_index) #读取目的MAC、源MAC、目的IP、源IP并按格式输出
smac_address = mac_address_format(smac_index,src_ip_index)
src_ip = ip_convert(src_ip_index,dmac_index)
dmac_address = mac_address_format(dmac_index,dst_ip_index)
dst_ip = ip_convert(dst_ip_index,dst_ip_index + 4)
opcode = m[opcode_index:opcode_index + 2].hex() #读取操作类型字段的值
opcode = int(opcode,16)
if opcode == 1: #值为1,表示为ARP请求
print("> [ARP请求] {}({}) 查询 {} 的MAC地址在哪里".format(
src_ip,smac_address,dst_ip))
if src_ip not in dict:
dict[src_ip] = smac_address
else: #值为0,表示为ARP响应
print("> [ARP响应] {}({}) 回复 {}({}):{} 的MAC地址在我这里".format(
src_ip,smac_address,dst_ip,dmac_address,src_ip))
if src_ip not in dict: #响应包中源、目IP的MAC值都有,故在此处获取映射关系
dict[src_ip] = smac_address
if dst_ip not in dict:
dict[dst_ip] = dmac_address
type_index = type_index + packet_length + 16
if type_index >= size:
break
print("nIP和MAC地址映射表:n> - IP地址tMAC地址")
for key in dict:
print("> - " + key + "t" + dict[key])
f.close()
print("n> - 总计{}个数据包,其中ARP协议数据包{}个".format(str(total),str(arp_number)))
运行结果:
day10—解析UDP协议报文
代码:
import mmap,binascii,time,os
def big_small_convert(data): #小端模式转化为大端模式
return binascii.hexlify(binascii.unhexlify(data)[::-1])
def time_stamp_convert(hexstr): #16进制字符串时间戳转化为年月日时分秒
stamp = int(hexstr,16)
timeArray = time.localtime(stamp)
otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
return otherStyleTime
def mac_address_format(start,end): #格式化MAC地址
j = 1
s = ""
for i in range(start,end):
if j % 6 == 0:
s += (m.read(1).hex().upper() + "")
else:
s += (m.read(1).hex().upper() + "-")
j += 1
i += 1
return s
def eth_protocol_identify(start): #识别协议类型
type_code = m[start:start + 2].hex()
if type_code == "0800":
s = "IPV4"
elif type_code == "0806":
return "ARP"
elif type_code == "0835":
s = "RARP"
elif type_code == "86dd":
s = "IPV6"
else:
s = "unknow"
return s
def ip_convert(start,end): #将十六进制ip转换为点分十进制
j = 1
s = ""
for i in range(start,end):
if j % 4 == 0:
hex_num = m.read(1).hex()
num = int(hex_num,16)
s += (str(num) + "")
else:
hex_num = m.read(1).hex()
num = int(hex_num,16)
s += (str(num) + ".")
j += 1
i += 1
return s
if __name__ == '__main__':
size = os.path.getsize('day8.pcap')
with open('day8.pcap','rb') as f:
with mmap.mmap(f.fileno(),0,access=mmap.ACCESS_READ) as m:
type_index = 52
total = 0 #记录总数据包数
ipv4_udp_number = 0 #记录IPV4的UDP协议数据包数
while True:
total += 1
protocol_type = eth_protocol_identify(type_index)
ipv4_protocol_index = type_index + 11
ipv4_protocol = m[ipv4_protocol_index:ipv4_protocol_index + 1].hex()
if (protocol_type != "IPV4" or ipv4_protocol != "11"): #判断当前数据包有无IPV4的UDP报文,没有的话读取包大小,跳到下一个数据包
packet_filed_index = type_index - 20
packet_length = m[packet_filed_index:packet_filed_index + 4].hex() #读取包头中Caplen(4B)字段
packet_length = big_small_convert(packet_length) #小端模式转化为大端模式
packet_length = int(packet_length,16) #16进制转十进制
type_index = type_index + packet_length + 16
if type_index < size:
continue
else:
break
else:
ipv4_udp_number += 1
time_filed_index = type_index - 28 #当前数据包包头中Timestamp(4B)字段所在的起始位置
packet_filed_index = time_filed_index + 8 #当前数据包包头中Caplen(4B)字段所在的起始位置
dmac_index = packet_filed_index + 8 #当前数据包包头目的MAC、源MAC的起始位置
smac_index = dmac_index + 6
src_ip_index = smac_index + 20
dst_ip_index = src_ip_index + 4
src_port_index = dst_ip_index + 4
dst_port_index = src_port_index + 2
udp_length_index = dst_port_index + 2
data = m[time_filed_index:time_filed_index + 4].hex() #读取包头中Timestamp(4B)字段
result = big_small_convert(data) #小端模式转化为大端模式
standard_time = time_stamp_convert(result) #16进制字符时间戳转化为年月日时分秒
packet_length = m[packet_filed_index:packet_filed_index + 4].hex() #读取包头中Caplen(4B)字段
packet_length = big_small_convert(packet_length) #小端模式转化为大端模式
packet_length = int(packet_length,16) #16进制转十进制
m.seek(dmac_index)
dmac_address = mac_address_format(dmac_index,smac_index)
smac_address = mac_address_format(smac_index,smac_index + 6)
m.seek(src_ip_index)
src_ip = ip_convert(src_ip_index,dst_ip_index)
dst_ip = ip_convert(dst_ip_index,src_port_index)
src_port = m[src_port_index:dst_port_index].hex()
src_port = int(src_port,16)
dst_port = m[dst_port_index:udp_length_index].hex()
dst_port = int(dst_port,16)
udp_legth = m[udp_length_index:udp_length_index + 2].hex()
udp_legth = int(str(udp_legth),16)
print("> - [{}] {} Bytes {} {} {} {} {} {} {}".format(
standard_time,str(packet_length),dmac_address,smac_address,src_ip,
dst_ip,str(src_port),str(dst_port),str(udp_legth)))
type_index = type_index + packet_length + 16
if type_index >= size:
break
f.close()
print("> - 总计{}个数据包,其中IPV4的UDP协议数据包{}个".format(str(total),str(ipv4_udp_number)))
小插曲:这里有一处代码和之前的不太一样了:
加上红框这几句,让代码变得比之前更健壮了。
原因是:原先这里就一句continue,如果当前这个非UDP包为pcap文件的最后一个包,跳到下一个包的packet_length对应字段就为空,继续执行的话就会报packet_length无效的错:
这个bug让我找了足足一个多小时:
但最终运行成果,瞬间有种喜悦感和成就感:
最后,明天解析的将是本次活动中最后一个也是这次最难解析的协议——DNS协议
如果喜欢小编的文章,记得点赞+关注支持一下哦~
原文始发于微信公众号(沃克学安全):跟着轩辕之风大佬手写一个抓包软件|第一阶段:解析静态网络数据包文件(day8-day10)
- 左青龙
- 微信扫一扫
- 右白虎
- 微信扫一扫
评论