Python 抓取并解码原始数据包

简介: 应用Python支持的混杂模式,抓取流经网卡的数据包,并对IP以及ICMP数据包进行拆包,打印出我们所需要的字段信息。
+关注继续查看

应用Python支持的混杂模式,抓取流经网卡的数据包,并对IP以及ICMP数据包进行拆包,打印出我们所需要的字段信息。

抓取原始数据包: Python中默认的Socket模块就可以实现对原始数据包的解包操作,如下代码.

需要注意这段代码只能在Windows平台使用,因为我们需要开启网卡的IOCTL混杂模式,这是Win平台特有的.

import socket
import uuid

# 获取本机MAC地址
def GetHostMAC():
    mac=uuid.UUID(int = uuid.getnode()).hex[-12:]
    this_mac = ":".join([mac[e:e+2] for e in range(0,11,2)])
    this_name = socket.getfqdn(socket.gethostname())
    print("本机名: {} --> 本机MAC: {}".format(this_name,this_mac))

# 获取本机IP地址
def GetHostAddress():
    try:
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8',80))
        address =sock.getsockname()[0]
    finally:
        return address
        address.close()

# 开始跟踪原始数据包
def SnifferIOSock(address):
    # 创建原始套接字,然后绑定在公开接口上
    socket_protocol = socket.IPPROTO_IP

    # 开启原始数据包模式
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((address,0))

    # 设置在捕获的数据包中包含IP头
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    # Windows平台需要设置IOCTL以启动混杂模式
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 读取单个数据包
    while True:
        print(sniffer.recvfrom(65565))

    # 退出时,关闭混杂模式
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
    GetHostMAC()
    address = GetHostAddress()
    print("本机IP: {}".format(address))
    SnifferIOSock(address)

解码IP数据包头: 解码方法同样运用的是上方的方法,只不过这里我们需要找到完整的IP地址的包头封装格式,然后根据特定的包头格式对数据包进行解包操作即可.

import socket
import os
import struct
from ctypes import *

# 定义IP头部结构体
class IP(Structure):
    _fields_ = [
        ("ihl",            c_ubyte, 4),
        ("version",        c_ubyte, 4),
        ("tos",            c_ubyte),
        ("len",            c_ushort),
        ("id",            c_ushort),
        ("offset",        c_ushort),
        ("ttl",            c_ubyte),
        ("protocol_num",    c_ubyte),
        ("sum",            c_ushort),
        ("src",            c_ulong),      # linux 需要变为 c_uint32
        ("dst",            c_ulong)       # linux 需要变为 c_uint32
    ]
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        # 定义协议序号与名称对应关系
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # 将数据包解包为地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
        self.this_ttl = self.ttl

        # self.src_address = socket.inet_ntoa(struct.pack("@I",self.src))
        # self.dst_address = socket.inet_ntoa(struct.pack("@I",self.dst))

        # 协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

# 获取本机IP地址
def GetHostAddress():
    try:
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8',80))
        address =sock.getsockname()[0]
    finally:
        return address
        address.close()

# 执行解包过程,并输出
def SnifferIPAddress(address):

    # 平台选择,nt代表Windows
    if os.name == "nt":
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    # 开启原始套接字模式
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((address,0))
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 循环接受数据包并解包
    try:
        while True:
            # 读取数据包
            raw_buffer = sniffer.recvfrom(65565)[0]

            # 将缓冲区的前20个字节按IP头进行解析
            ip_header = IP(raw_buffer[0:20])

            # 输出协议和通信双方IP地址
            print("传输协议: {} --> 原地址: {} --> 传输到: {} --> 生存周期: {}".format(ip_header.protocol,ip_header.src_address,ip_header.dst_address,ip_header.this_ttl))

    # 如果按下Ctrl+C则退出
    except KeyboardInterrupt:
        # 关闭混杂模式
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
    address = GetHostAddress()
    SnifferIPAddress(address)

解码ICMP数据包头: 原理与解包IP头相同,但需要注意,由于ICMP头在IP头的下方,所以我们需要先解析出IP头数据包,然后根据IP头中的protocol_num判断如果是ICMP则将其传入ICMP结构做进一步解包即可.

import socket
import os
import struct
from ctypes import *

# 定义IP头部结构体
class IP(Structure):
    _fields_ = [
        ("ihl",            c_ubyte, 4),
        ("version",        c_ubyte, 4),
        ("tos",            c_ubyte),
        ("len",            c_ushort),
        ("id",            c_ushort),
        ("offset",        c_ushort),
        ("ttl",            c_ubyte),
        ("protocol_num",    c_ubyte),
        ("sum",            c_ushort),
        ("src",            c_ulong),      # linux 需要变为 c_uint32
        ("dst",            c_ulong)       # linux 需要变为 c_uint32
    ]
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        # 定义协议序号与名称对应关系
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # 将数据包解包为地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
        self.this_ttl = self.ttl

        # self.src_address = socket.inet_ntoa(struct.pack("@I",self.src))
        # self.dst_address = socket.inet_ntoa(struct.pack("@I",self.dst))

        # 协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

# 定义ICMP结构包头
class ICMP(Structure):
    _fields_ = [
        ("type",            c_ubyte),
        ("code",            c_ubyte),
        ("checksum",        c_ushort),
        ("unused",          c_ushort),
        ("next_hop_mtu",    c_ushort)
    ]

    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        self.icmp_type = self.type
        self.icmp_code = self.code
        self.icmp_checksum = self.checksum


# 获取本机IP地址
def GetHostAddress():
    try:
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8',80))
        address =sock.getsockname()[0]
    finally:
        return address
        address.close()

# 执行解包过程,并输出
def SnifferIPAddress(address):

    # 平台选择,nt代表Windows
    if os.name == "nt":
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    # 开启原始套接字模式
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((address,0))
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 循环接受数据包并解包
    try:
        while True:
            # 读取数据包
            raw_buffer = sniffer.recvfrom(65565)[0]

            # 将缓冲区的前20个字节按IP头进行解析
            ip_header = IP(raw_buffer[0:20])

            # 判断协议类型是否为ICMP
            if ip_header.protocol == "ICMP":

                # 计算ICMP包的起始位置
                offset = ip_header.ihl * 4
                buf = raw_buffer[offset:offset + sizeof(ICMP)]

                #解析ICMP数据
                icmp_header = ICMP(buf)
                print("原地址: {} --> 发送到: {} --> 解包协议: {} --> 解包代码: {} --> 校验和: {}".format(ip_header.src_address, ip_header.dst_address, icmp_header.icmp_type,icmp_header.icmp_code,icmp_header.icmp_checksum))

    # 如果按下Ctrl+C则退出
    except KeyboardInterrupt:
        # 关闭混杂模式
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
    address = GetHostAddress()
    SnifferIPAddress(address)

我们运行上方的代码,然后在本机Ping测试一下www.lyshark.com此时即可看到,本机与对端的来往数据包.

目录
相关文章
|
7月前
|
算法 Python
程序解码错误-由python的requests.post 请求结果乱码引起的思考
最近,在使用python的requests.post的时候,不论结果如何处理,得到的都是乱码。乱码的原因是什么?Accept-Encoding和Content-Encoding的本质是什么?
194 0
|
8月前
|
机器学习/深度学习 移动开发 安全
Base64编码和Python解码
Base64编码和Python解码
139 0
Base64编码和Python解码
|
9月前
|
网络协议 网络安全 定位技术
Python 运用Dpkt库解析数据包
dpkt项目是一个python模块,用于快速、简单的数据包解析,并定义了基本TCP/IP协议,使用该库可以快速解析通过各类抓包工具抓到的数据包,从而提取分析包内的参数。
476 0
|
10月前
|
存储 C++ Python
LeetCode每日一题题解:394. 字符串解码-题解-python && C++源代码
LeetCode每日一题题解:394. 字符串解码-题解-python && C++源代码
|
Python
Python编程:URL网址链接中的中文编码与解码
Python编程:URL网址链接中的中文编码与解码
159 0
|
测试技术 数据安全/隐私保护 Python
力扣每日一题:1720.解码异或后的数组 python异或操作
力扣每日一题:1720.解码异或后的数组 python异或操作
64 0
|
编解码 数据安全/隐私保护 Python
Python 中 base64 编码与解码
Python 中 base64 编码与解码
145 0
|
JavaScript 前端开发 Python
Python 技术篇 - 使用unicode_escape对js的escape()方法编码后的字符串进行解码实例演示
Python 技术篇 - 使用unicode_escape对js的escape()方法编码后的字符串进行解码实例演示
173 0
Python 技术篇 - 使用unicode_escape对js的escape()方法编码后的字符串进行解码实例演示
|
API 语音技术 Python
Python 技术篇-用base64库对音频、图片等文件进行base64编码和解码实例演示
Python 技术篇-用base64库对音频、图片等文件进行base64编码和解码实例演示
774 0
Python 技术篇-用base64库对音频、图片等文件进行base64编码和解码实例演示
|
Python
python笔记6-%u60A0和\u60a0类似unicode解码
前言 有时候从接口的返回值里面获取到的是类似"%u4E0A%u6D77%u60A0%u60A0"这种格式的编码,不是python里面的unicode编码。 python里面的unicode编码应该是这种格式:\u4e0a\u6d77\u60a0\u60a0 unicode编码-python2 1.
1678 0
相关产品
云迁移中心
推荐文章
更多