Python 运用Dpkt库解析数据包

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: dpkt项目是一个python模块,用于快速、简单的数据包解析,并定义了基本TCP/IP协议,使用该库可以快速解析通过各类抓包工具抓到的数据包,从而提取分析包内的参数。

dpkt项目是一个python模块,用于快速、简单的数据包解析,并定义了基本TCP/IP协议,使用该库可以快速解析通过各类抓包工具抓到的数据包,从而提取分析包内的参数。

使用Dpkt分析数据包: 使用Dpkt发现URL中存在的.zip字样链接

#coding=utf-8
import dpkt
import socket

def FindPcapWord(pcap,WordKey):
    for ts,buf in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(buf)
            ip = eth.data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            tcp = ip.data
            http = dpkt.http.Request(tcp.data)
            if(http.method == "GET"):
                uri = http.uri.lower()
                if WordKey in uri:
                    print("[+] 源地址: {} --> 目标地址: {} 检索到URL中存在 {}".format(src,dst,uri))
        except Exception:
            pass

fp = open("D://aaa.pcap","rb")
pcap = dpkt.pcap.Reader(fp)
FindPcapWord(pcap,"wang.zip")

也可以使用dpkt解析本机数据包中是否包含后门。

#coding=utf-8
import dpkt
import socket

def FindPcapWord(pcap,WordKey):
    for timestamp,packet in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(packet)
            ip = eth.data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            tcp = ip.data
            http = dpkt.http.Request(tcp.data)
            if(http.method == "GET"):
                uri = http.uri.lower()
                if WordKey in uri:
                    print("[+] 源地址: {} --> 目标地址: {} 检索到URL中存在 {}".format(src,dst,uri))
        except Exception:
            pass

def Banner():
    print("  _          ____  _                _    ")
    print(" | |   _   _/ ___|| |__   __ _ _ __| | __")
    print(" | |  | | | \___ \| '_ \ / _` | '__| |/ /")
    print(" | |__| |_| |___) | | | | (_| | |  |   < ")
    print(" |_____\__, |____/|_| |_|\__,_|_|  |_|\_\\")
    print("       |___/                             \n")
    print("E-Mail: me@lyshark.com")

def FindHivemind(pcap):
    for timestamp,packet in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(packet)
            ip = eth.data
            tcp = ip.data

            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            sport = tcp.sport
            dport = tcp.dport
            # print("[+] 源地址: {}:{} --> 目标地址:{}:{}".format(src,sport,dst,dport))
            if dport == 80 and dst == "125.39.247.226":
                # 如果数据流中存在cmd等明文命令则说明可能存在后门
                if '[cmd]# ' in tcp.data.lower():
                    print("[+] {}:{}".format(dst,dport))
        except Exception:
            pass

Banner()
fp = open("D://aaa.pcap","rb")
pcap = dpkt.pcap.Reader(fp)
FindHivemind(pcap)

实时检测DDoS攻击: 主要通过设置检测不正常数据包数量的阈值来判断是否存在DDoS攻击。

#coding=utf-8
import dpkt
import socket

def FindPcapWord(pcap,WordKey):
    for timestamp,packet in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(packet)
            ip = eth.data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            tcp = ip.data
            http = dpkt.http.Request(tcp.data)
            if(http.method == "GET"):
                uri = http.uri.lower()
                if WordKey in uri:
                    print("[+] 源地址: {} --> 目标地址: {} 检索到URL中存在 {}".format(src,dst,uri))
        except Exception:
            pass

def FindHivemind(pcap):
    for timestamp,packet in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(packet)
            ip = eth.data
            tcp = ip.data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            sport = tcp.sport
            dport = tcp.dport
            # print("[+] 源地址: {}:{} --> 目标地址:{}:{}".format(src,sport,dst,dport))
            if dport == 80 and dst == "125.39.247.226":
                # 如果数据流中存在cmd等明文命令则说明可能存在后门
                if '[cmd]# ' in tcp.data.lower():
                    print("[+] {}:{}".format(dst,dport))
        except Exception:
            pass

def Banner():
    print("  _          ____  _                _    ")
    print(" | |   _   _/ ___|| |__   __ _ _ __| | __")
    print(" | |  | | | \___ \| '_ \ / _` | '__| |/ /")
    print(" | |__| |_| |___) | | | | (_| | |  |   < ")
    print(" |_____\__, |____/|_| |_|\__,_|_|  |_|\_\\")
    print("       |___/                             \n")
    print("E-Mail: me@lyshark.com")

def FindDDosAttack(pcap):
    pktCount = {}
    for timestamp,packet in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(packet)
            ip = eth.data
            tcp = ip.data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            sport = tcp.sport
            # 累计判断各个src地址对目标地址80端口访问次数
            if dport == 80:
                stream = src + ":" + dst
                if pktCount.has_key(stream):
                    pktCount[stream] = pktCount[stream] + 1
                else:
                    pktCount[stream] = 1
        except Exception:
            pass
    for stream in pktCount:
        pktSent = pktCount[stream]
        # 如果超过设置的检测阈值500,则判断为DDOS攻击行为
        if pktSent > 500:
            src = stream.split(":")[0]
            dst = stream.split(":")[1]
            print("[+] 源地址: {} 攻击: {} 流量: {} pkts.".format(src,dst,str(pktSent)))

if __name__ == "__main__":
    Banner()
    fp = open("D://data.pcap","rb")
    pcap = dpkt.pcap.Reader(fp)
    FindPcapWord(pcap,"wang.zip")

DPKT动态抓包解析: 首先使用scapy动态抓包,然后调用不同的函数对抓到的数据包进行处理提取出想要的数据.

import os,argparse,dpkt
from scapy.all import *
pkts=[]
count=0

# 检查数据包的IP层,提取出IP和TTL字段的值
def Get_TTL(pkt):
    try:
        if pkt.haslayer(IP):
            ip_src = pkt.getlayer(IP).src
            ip_sport = pkt.getlayer(IP).sport
            ip_dst = pkt.getlayer(IP).dst
            ip_dport = pkt.getlayer(IP).dport
            ip_ttl = str(pkt.ttl)
            print("[+] 源地址: %-15s:%-5s --> 目标地址: %-15s:%-5s --> TTL: %-5s"%(ip_src,ip_sport,ip_dst,ip_dport,ip_ttl))
    except Exception:
        pass

# 获取本机发送出去的DNS请求所对应的网站地址
def Get_DNSRR(pkt):
    if pkt.haslayer(DNSRR):
        rrname = pkt.getlayer(DNSRR).rrname
        rdata = pkt.getlayer(DNSRR).rdata
        ttl = pkt.getlayer(DNSRR).ttl
        print("[+] 域名: {} --> 别名: {} --> TTL: {}".format(rrname,rdata,ttl))

# 解析网页的DNS查询记录
def Get_DNSQR(pkt):
    # 判断是否含有DNSRR且存在UDP端口53
    if pkt.haslayer(DNSRR) and pkt.getlayer(UDP).sport == 53:
        rcode = pkt.getlayer(DNS).rcode
        qname = pkt.getlayer(DNSQR).qname
        # 若rcode为3,则表示该域名不存在
        if rcode == 3:
            print("[-] 域名解析不存在")
        else:
            print("[+] 解析DNSQR存在:" + str(qname))

# 检测主机是否被DDOS攻击了
def FindDDosAttack(pcap):
    pktCount = {}
    for timestamp,packet in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(packet)
            ip = eth.data
            tcp = ip.data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            sport = tcp.sport
            # 累计判断各个src地址对目标地址80端口访问次数
            if dport == 80:
                stream = src + ":" + dst
                if pktCount.has_key(stream):
                    pktCount[stream] = pktCount[stream] + 1
                else:
                    pktCount[stream] = 1
        except Exception:
            pass
    for stream in pktCount:
        pktSent = pktCount[stream]
        # 如果超过设置的检测阈值500,则判断为DDOS攻击行为
        if pktSent > 500:
            src = stream.split(":")[0]
            dst = stream.split(":")[1]
            print("[+] 源地址: {} 攻击: {} 流量: {} pkts.".format(src,dst,str(pktSent)))

# FindPcapURL 监控提取数据包中的所有URL
def FindPcapURL(pcap):
    Url = []
    for timestamp,packet in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(packet)
            ip = eth.data
            src = socket.inet_ntoa(ip.src)
            tcp = ip.data
            http = dpkt.http.Request(tcp.data)
            if(http.method == "GET"):
                UrlHead = http.headers
                for key,value in UrlHead.items():
                    url = re.findall('^https*://.*',str(value))
                    if url:
                        print("[+] 源地址: %10s --> 访问URL: %-80s"%(src, url[0]))
        except Exception:
            pass
    return set(Url)

# 动态保存pcap文件(每1024字节保存一次pcap文件),并读取出其中的网址解析出来
def write_cap(pkt):
    global pkts
    global count
    pkts.append(pkt)
    count += 1
    if count == 1024:
        wrpcap("data.pcap",pkts)
        fp = open("./data.pcap","rb")
        pcap = dpkt.pcap.Reader(fp)
        FindPcapURL(pcap)
        fp.close()
        pkts,count = [],0

def Banner():
    print("  _          ____  _                _    ")
    print(" | |   _   _/ ___|| |__   __ _ _ __| | __")
    print(" | |  | | | \___ \| '_ \ / _` | '__| |/ /")
    print(" | |__| |_| |___) | | | | (_| | |  |   < ")
    print(" |_____\__, |____/|_| |_|\__,_|_|  |_|\_\\")
    print("       |___/                             \n")
    print("E-Mail: me@lyshark.com")

if __name__ == "__main__":
    Banner()
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode",dest="mode",help="模式选择<TTL/DNSRR/DNSQR/URL>")
    args = parser.parse_args()
    if args.mode == "TTL":
        print("[*] 开始抓取本机TTL流量")
        sniff(prn=Get_TTL,store=0)
    elif args.mode == "DNSRR":
        print("[*] 开始抓取本机发送出去的DNS查询请求所对应的网站URL")
        sniff(prn=Get_DNSRR,store=0)
    elif args.mode == "DNSQR":
        print("[*] 解析网页的DNS查询记录")
        sniff(prn=Get_DNSQR,store=0)
    elif args.mode == "URL":
        print("[+] 开始抓包,pcap文件并读取出其中的网址")
        sniff(prn=write_cap,store=0)
    else:
        parser.print_help()

Geoip2定位IP来源: 首先提取出Pcpa格式的数据包文件,然后通过使用离线数据库查询出指定IP地址的地理位置.

# pip install geoip2
# github地址下载:https://github.com/maxmind/GeoIP2-python
# 离线数据库:https://www.maxmind.com/en/accounts/current/geoip/downloads
import argparse
import socket,dpkt
import geoip2.database

def AnalysisPace(DpktPack,Filter):
    respon = []
    with open(DpktPack,"rb") as fp:
        pcap = dpkt.pcap.Reader(fp)
        for timestamp, packet in pcap:
            try:
                eth = dpkt.ethernet.Ethernet(packet)
                # 解析过滤出网络层(三层)中的IP数据包
                if eth.data.__class__.__name__ == "IP":
                    ip = eth.data
                    src = socket.inet_ntoa(ip.src)
                    dst = socket.inet_ntoa(ip.dst)
                    # 解析过滤出传输层(四层)中的TCP数据包
                    if eth.data.data.__class__.__name__ == "TCP":
                        sport = eth.data.data.sport
                        dport = eth.data.data.dport
                        # 过滤出源地址是192.168.1.2且目的端口是80或者443的流量
                        # if src == "192.168.1.2" and dport == 80 or dport == 443:
                        if eval(Filter):
                            dic = { "src":"None","sport":0 , "dst":"None","dport":0 }
                            #print("[+] 时间戳: %-17s 源地址: %-14s:%-2s ---> 目标地址: %-16s:%-2s" %(timestamp,src, sport, dst, dport))
                            RecvData = eth.data.data.data
                            if len(RecvData) and b"GET" in RecvData:
                                #print("[*] 时间戳: {} 源地址: {} <--- 访问网页: {}".format(timestamp,src,bytes.decode(RecvData).split("\n")[1]))
                                pass
                            dic['src'] = src
                            dic['dst'] = dst
                            dic['sport'] = sport
                            dic['dport'] = dport
                            respon.append(dic)
            except Exception:
                pass
    return respon

def AnalysisIP_To_Address(PcapFile,MmdbFile):
    IPDict = AnalysisPace(PcapFile,"dport ==80 or dport == 443")
    NoRepeat = []

    for item in range(len(IPDict)):
        NoRepeat.append(IPDict[item].get("dst"))
    NoRepeat = set(NoRepeat)

    reader = geoip2.database.Reader(MmdbFile)
    for item in NoRepeat:
        try:
            response = reader.city(item)
            print("[+] IP地址: %-16s --> " %item,end="")
            print("网段: %-16s --> " %response.traits.network,end="")
            print("经度: %-10s 纬度: %-10s --> " %(response.location.latitude, response.location.longitude),end="")
            print("定位: {} {} {}".format(response.country.names["zh-CN"],response.subdivisions.most_specific.name,response.city.name),end="\n")
        except Exception:
            print("定位: None None None")
            pass

def Banner():
    print("  _          ____  _                _    ")
    print(" | |   _   _/ ___|| |__   __ _ _ __| | __")
    print(" | |  | | | \___ \| '_ \ / _` | '__| |/ /")
    print(" | |__| |_| |___) | | | | (_| | |  |   < ")
    print(" |_____\__, |____/|_| |_|\__,_|_|  |_|\_\\")
    print("       |___/                             \n")
    print("E-Mail: me@lyshark.com\n")

if __name__ == '__main__':
    Banner()
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--pcap", dest="pcap", help="设置抓到的数据包 *.pcap")
    parser.add_argument("-d", "--mmdb", dest="mmdb", help="设置城市数据库 GeoLite2-City.mmdb")
    
    args = parser.parse_args()
    # 使用方式: main.py -p data.pcap -d GeoLite2-City.mmdb (分析数据包中IP)
    if args.pcap and args.mmdb:
        AnalysisIP_To_Address(args.pcap,args.mmdb)
    else:
        parser.print_help()
相关文章
|
27天前
|
XML JSON 数据库
Python的标准库
Python的标准库
167 77
|
21天前
|
数据采集 JSON API
如何利用Python爬虫淘宝商品详情高级版(item_get_pro)API接口及返回值解析说明
本文介绍了如何利用Python爬虫技术调用淘宝商品详情高级版API接口(item_get_pro),获取商品的详细信息,包括标题、价格、销量等。文章涵盖了环境准备、API权限申请、请求构建和返回值解析等内容,强调了数据获取的合规性和安全性。
|
19天前
|
数据挖掘 vr&ar C++
让UE自动运行Python脚本:实现与实例解析
本文介绍如何配置Unreal Engine(UE)以自动运行Python脚本,提高开发效率。通过安装Python、配置UE环境及使用第三方插件,实现Python与UE的集成。结合蓝图和C++示例,展示自动化任务处理、关卡生成及数据分析等应用场景。
84 5
|
28天前
|
XML JSON 数据库
Python的标准库
Python的标准库
48 11
|
28天前
|
数据可视化 Python
以下是一些常用的图表类型及其Python代码示例,使用Matplotlib和Seaborn库。
通过这些思维导图和分析说明表,您可以更直观地理解和选择适合的数据可视化图表类型,帮助更有效地展示和分析数据。
66 8
|
1月前
|
存储 缓存 Python
Python中的装饰器深度解析与实践
在Python的世界里,装饰器如同一位神秘的魔法师,它拥有改变函数行为的能力。本文将揭开装饰器的神秘面纱,通过直观的代码示例,引导你理解其工作原理,并掌握如何在实际项目中灵活运用这一强大的工具。从基础到进阶,我们将一起探索装饰器的魅力所在。
|
1月前
|
Android开发 开发者 Python
通过标签清理微信好友:Python自动化脚本解析
微信已成为日常生活中的重要社交工具,但随着使用时间增长,好友列表可能变得臃肿。本文介绍了一个基于 Python 的自动化脚本,利用 `uiautomator2` 库,通过模拟用户操作实现根据标签批量清理微信好友的功能。脚本包括环境准备、类定义、方法实现等部分,详细解析了如何通过标签筛选并删除好友,适合需要批量管理微信好友的用户。
55 7
|
1月前
|
安全 API 文件存储
Yagmail邮件发送库:如何用Python实现自动化邮件营销?
本文详细介绍了如何使用Yagmail库实现自动化邮件营销。Yagmail是一个简洁强大的Python库,能简化邮件发送流程,支持文本、HTML邮件及附件发送,适用于数字营销场景。文章涵盖了Yagmail的基本使用、高级功能、案例分析及最佳实践,帮助读者轻松上手。
36 4
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
92 2
|
15天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析