Python 抓取并解码原始数据包

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: 应用Python支持的混杂模式,抓取流经网卡的数据包,并对IP以及ICMP数据包进行拆包,打印出我们所需要的字段信息。

应用Python支持的混杂模式,抓取流经网卡的数据包,并对IP以及ICMP数据包进行拆包,打印出我们所需要的字段信息。

抓取原始数据包: Python中默认的Socket模块就可以实现对原始数据包的解包操作,如下代码.

需要注意这段代码只能在Windows平台使用,因为我们需要开启网卡的IOCTL混杂模式,这是Win平台特有的.

import socket
import uuid

# 获取本机MAC地址
def GetHostMAC():
    mac=uuid.UUID(int = uuid.getnode()).hex[-12:]
    this_mac = ":".join([mac[e:e+2] for e in range(0,11,2)])
    this_name = socket.getfqdn(socket.gethostname())
    print("本机名: {} --> 本机MAC: {}".format(this_name,this_mac))

# 获取本机IP地址
def GetHostAddress():
    try:
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8',80))
        address =sock.getsockname()[0]
    finally:
        return address
        address.close()

# 开始跟踪原始数据包
def SnifferIOSock(address):
    # 创建原始套接字,然后绑定在公开接口上
    socket_protocol = socket.IPPROTO_IP

    # 开启原始数据包模式
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((address,0))

    # 设置在捕获的数据包中包含IP头
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    # Windows平台需要设置IOCTL以启动混杂模式
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 读取单个数据包
    while True:
        print(sniffer.recvfrom(65565))

    # 退出时,关闭混杂模式
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
    GetHostMAC()
    address = GetHostAddress()
    print("本机IP: {}".format(address))
    SnifferIOSock(address)

解码IP数据包头: 解码方法同样运用的是上方的方法,只不过这里我们需要找到完整的IP地址的包头封装格式,然后根据特定的包头格式对数据包进行解包操作即可.

import socket
import os
import struct
from ctypes import *

# 定义IP头部结构体
class IP(Structure):
    _fields_ = [
        ("ihl",            c_ubyte, 4),
        ("version",        c_ubyte, 4),
        ("tos",            c_ubyte),
        ("len",            c_ushort),
        ("id",            c_ushort),
        ("offset",        c_ushort),
        ("ttl",            c_ubyte),
        ("protocol_num",    c_ubyte),
        ("sum",            c_ushort),
        ("src",            c_ulong),      # linux 需要变为 c_uint32
        ("dst",            c_ulong)       # linux 需要变为 c_uint32
    ]
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        # 定义协议序号与名称对应关系
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # 将数据包解包为地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
        self.this_ttl = self.ttl

        # self.src_address = socket.inet_ntoa(struct.pack("@I",self.src))
        # self.dst_address = socket.inet_ntoa(struct.pack("@I",self.dst))

        # 协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

# 获取本机IP地址
def GetHostAddress():
    try:
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8',80))
        address =sock.getsockname()[0]
    finally:
        return address
        address.close()

# 执行解包过程,并输出
def SnifferIPAddress(address):

    # 平台选择,nt代表Windows
    if os.name == "nt":
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    # 开启原始套接字模式
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((address,0))
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 循环接受数据包并解包
    try:
        while True:
            # 读取数据包
            raw_buffer = sniffer.recvfrom(65565)[0]

            # 将缓冲区的前20个字节按IP头进行解析
            ip_header = IP(raw_buffer[0:20])

            # 输出协议和通信双方IP地址
            print("传输协议: {} --> 原地址: {} --> 传输到: {} --> 生存周期: {}".format(ip_header.protocol,ip_header.src_address,ip_header.dst_address,ip_header.this_ttl))

    # 如果按下Ctrl+C则退出
    except KeyboardInterrupt:
        # 关闭混杂模式
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
    address = GetHostAddress()
    SnifferIPAddress(address)

解码ICMP数据包头: 原理与解包IP头相同,但需要注意,由于ICMP头在IP头的下方,所以我们需要先解析出IP头数据包,然后根据IP头中的protocol_num判断如果是ICMP则将其传入ICMP结构做进一步解包即可.

import socket
import os
import struct
from ctypes import *

# 定义IP头部结构体
class IP(Structure):
    _fields_ = [
        ("ihl",            c_ubyte, 4),
        ("version",        c_ubyte, 4),
        ("tos",            c_ubyte),
        ("len",            c_ushort),
        ("id",            c_ushort),
        ("offset",        c_ushort),
        ("ttl",            c_ubyte),
        ("protocol_num",    c_ubyte),
        ("sum",            c_ushort),
        ("src",            c_ulong),      # linux 需要变为 c_uint32
        ("dst",            c_ulong)       # linux 需要变为 c_uint32
    ]
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        # 定义协议序号与名称对应关系
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # 将数据包解包为地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
        self.this_ttl = self.ttl

        # self.src_address = socket.inet_ntoa(struct.pack("@I",self.src))
        # self.dst_address = socket.inet_ntoa(struct.pack("@I",self.dst))

        # 协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

# 定义ICMP结构包头
class ICMP(Structure):
    _fields_ = [
        ("type",            c_ubyte),
        ("code",            c_ubyte),
        ("checksum",        c_ushort),
        ("unused",          c_ushort),
        ("next_hop_mtu",    c_ushort)
    ]

    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        self.icmp_type = self.type
        self.icmp_code = self.code
        self.icmp_checksum = self.checksum


# 获取本机IP地址
def GetHostAddress():
    try:
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8',80))
        address =sock.getsockname()[0]
    finally:
        return address
        address.close()

# 执行解包过程,并输出
def SnifferIPAddress(address):

    # 平台选择,nt代表Windows
    if os.name == "nt":
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    # 开启原始套接字模式
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((address,0))
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 循环接受数据包并解包
    try:
        while True:
            # 读取数据包
            raw_buffer = sniffer.recvfrom(65565)[0]

            # 将缓冲区的前20个字节按IP头进行解析
            ip_header = IP(raw_buffer[0:20])

            # 判断协议类型是否为ICMP
            if ip_header.protocol == "ICMP":

                # 计算ICMP包的起始位置
                offset = ip_header.ihl * 4
                buf = raw_buffer[offset:offset + sizeof(ICMP)]

                #解析ICMP数据
                icmp_header = ICMP(buf)
                print("原地址: {} --> 发送到: {} --> 解包协议: {} --> 解包代码: {} --> 校验和: {}".format(ip_header.src_address, ip_header.dst_address, icmp_header.icmp_type,icmp_header.icmp_code,icmp_header.icmp_checksum))

    # 如果按下Ctrl+C则退出
    except KeyboardInterrupt:
        # 关闭混杂模式
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
    address = GetHostAddress()
    SnifferIPAddress(address)

我们运行上方的代码,然后在本机Ping测试一下www.lyshark.com此时即可看到,本机与对端的来往数据包.

相关文章
|
3月前
|
Python
"揭秘!Python如何运用神秘的正则表达式,轻松穿梭于网页迷宫,一键抓取隐藏链接?"
【8月更文挑战第21天】Python凭借其强大的编程能力,在数据抓取和网页解析领域表现出高效与灵活。通过结合requests库进行网页请求及正则表达式进行复杂文本模式匹配,可轻松提取网页信息。本示例展示如何使用Python和正则表达式解析网页链接。首先确保已安装requests库,可通过`pip install requests`安装。接着,利用requests获取网页内容,并使用正则表达式提取所有`&lt;a&gt;`标签的`href`属性。
46 0
|
29天前
|
数据采集 JSON 数据处理
抓取和分析JSON数据:使用Python构建数据处理管道
在大数据时代,电商网站如亚马逊、京东等成为数据采集的重要来源。本文介绍如何使用Python结合代理IP、多线程等技术,高效、隐秘地抓取并处理电商网站的JSON数据。通过爬虫代理服务,模拟真实用户行为,提升抓取效率和稳定性。示例代码展示了如何抓取亚马逊商品信息并进行解析。
抓取和分析JSON数据:使用Python构建数据处理管道
|
26天前
|
数据采集 Python
python爬虫抓取91处理网
本人是个爬虫小萌新,看了网上教程学着做爬虫爬取91处理网www.91chuli.com,如果有什么问题请大佬们反馈,谢谢。
28 4
|
27天前
|
数据采集 Java Python
如何用Python同时抓取多个网页:深入ThreadPoolExecutor
在信息化时代,实时数据的获取对体育赛事爱好者、数据分析师和投注行业至关重要。本文介绍了如何使用Python的`ThreadPoolExecutor`结合代理IP和请求头设置,高效稳定地抓取五大足球联赛的实时比赛信息。通过多线程并发处理,解决了抓取效率低、请求限制等问题,提供了详细的代码示例和解析方法。
如何用Python同时抓取多个网页:深入ThreadPoolExecutor
|
2月前
|
数据采集 存储 JavaScript
构建您的第一个Python网络爬虫:抓取、解析与存储数据
【9月更文挑战第24天】在数字时代,数据是新的金矿。本文将引导您使用Python编写一个简单的网络爬虫,从互联网上自动抓取信息。我们将介绍如何使用requests库获取网页内容,BeautifulSoup进行HTML解析,以及如何将数据存储到文件或数据库中。无论您是数据分析师、研究人员还是对编程感兴趣的新手,这篇文章都将为您提供一个实用的入门指南。拿起键盘,让我们开始挖掘互联网的宝藏吧!
|
3月前
|
数据采集 Python
如何用Python Selenium和WebDriver抓取LinkedIn数据并保存登录状态
本文介绍了使用Python Selenium和WebDriver库抓取LinkedIn数据的方法。首先,安装Selenium库和对应的WebDriver,然后配置爬虫代理IP以避免频繁请求被检测。接下来,设置user-agent和cookies以模拟真实用户行为,实现登录并保持状态。登录后,使用WebDriver抓取目标页面数据,如用户名、年龄、性别和简历信息。最后,强调了优化代码、处理异常和遵守使用条款的重要性,以提高效率并避免账号被封禁。
如何用Python Selenium和WebDriver抓取LinkedIn数据并保存登录状态
|
2月前
|
数据采集 JavaScript 前端开发
构建简易Python爬虫:抓取网页数据入门指南
【8月更文挑战第31天】在数字信息的时代,数据抓取成为获取网络资源的重要手段。本文将引导你通过Python编写一个简单的网页爬虫,从零基础到实现数据抓取的全过程。我们将一起探索如何利用Python的requests库进行网络请求,使用BeautifulSoup库解析HTML文档,并最终提取出有价值的数据。无论你是编程新手还是有一定基础的开发者,这篇文章都将为你打开数据抓取的大门。
|
3月前
|
Python
【python】python抓取古诗文内容保存(源码)【独一无二】
【python】python抓取古诗文内容保存(源码)【独一无二】
|
3月前
|
算法 Python
【Leetcode刷题Python】百分号解码
深信服公司的算法笔试题.
38 1
|
3月前
|
数据采集 JavaScript 前端开发
构建你的第一个Python爬虫:抓取网页数据入门指南
【8月更文挑战第31天】在数字时代,数据是新的石油。本文将引导初学者通过简单的步骤,使用Python编程语言创建一个基础的网络爬虫程序。我们将探索如何从网络上提取信息,并理解背后的原理。无论你是编程新手还是想要扩展你的技术工具箱,这篇文章都将为你提供一条清晰的道路,让你学会编写能够自动获取网络数据的脚本。准备好开始你的网络数据抓取之旅了吗?让我们现在就开始吧!
下一篇
无影云桌面