gopacket API

简介: gopacket API

gopacket

PacketSource

  PacketSource可以从 PacketDataSource 中读取数据包,对其进行解码并返回它们。其定义入下

type PacketSource struct {
   
    // 数据源
    source  PacketDataSource
    // 解码器
    decoder Decoder
    // 解码选项设置,可以在外部改变
    DecodeOptions
    // 数据通道
    c chan Packet
}

// 数据源接口
type PacketDataSource interface {
   
    ReadPacketData() (data []byte, ci CaptureInfo, err error)
}

// 解码器接口
type Decoder interface {
   
    Decode([]byte, PacketBuilder) error
}

// 解码器选项
type DecodeOptions struct {
   
    // 是否延迟解码
    //注意在并发时要小心因为*可能会改变数据包,导致并发函数调用互斥
    Lazy bool
    // 不拷贝到缓冲区
    // 如果能够保证传递给NewPacket的片的底层自己不会被修改,那么这可能更快
    // 如果可能被修改,这可能使数据无效
    NoCopy bool
    // 在包解码时跳过recover恢复
    // 一般在发生panic时,将会被recover捕获,
    //并在详细描述问题的包添加DecodeFailure层
    SkipDecodeRecovery bool
    // 可以在TCP解码器中实现应用层路由
    // 默认false,因为重报应该在解码TCP负载后
    DecodeStreamsAsDatagrams bool
}

  我们可以通过NewPacketSource方法来构造一个PacketSouce源

// source:源,需要实现ReadPacketData接口
// decorder:解码器需要实现Decodeer接口
func NewPacketSource(source PacketDataSource, decoder Decoder) *PacketSource {
   
    return &PacketSource{
   
        source:  source,
        decoder: decoder,
    }
}

  目前有两种不同的方法可以通过 PacketSource 读取数据包:

  • Packets[^1]
  • NextPacket[^3]

Packets

  内部启用一个协程调用NextPacket进行读Packet[^2],并将其写入到返回的管道中

func (p *PacketSource) Packets() chan Packet {
   
    if p.c == nil {
   
        p.c = make(chan Packet, 1000)
        go p.packetsToChannel()
    }
    return p.c
}

  使用示例:

for packet := range packetSource.Packets() {
   
  ...
}

NextPacket

  返回下一个数据包Packet[^2]

func (p *PacketSource) NextPacket() (Packet, error) {
   
    // 读取元数据
    data, ci, err := p.source.ReadPacketData()
    if err != nil {
   
        return nil, err
    }
    // 根据数据基础Packet对象
    packet := NewPacket(data, p.decoder, p.DecodeOptions)
    m := packet.Metadata()
    m.CaptureInfo = ci
    m.Truncated = m.Truncated || ci.CaptureLength < ci.Length
    return packet, nil
}

  使用示例:

for {
   
  packet, err := packetSource.NextPacket()
  if err == io.EOF {
   
    break
  } else if err != nil {
   
    log.Println("Error:", err)
    continue
  }
  handlePacket(packet)  // Do something with each packet.
}

Packet

  packet是gopacket的主要对象,其主要由Decoder创建,一个数据包由一组数据组成,数据在解码时被分解为多个层。

type Packet interface {
   
    // 输出可读性的字符串
    String() string
    // 使用LayerDump输出所有layer的16进制
    Dump() string
    // 返回所有layer
    Layers() []Layer
    // 返回指定层的数据,如果nil则返回第一层数据
    Layer(LayerType) Layer
    // 返回指
    LayerClass(LayerClass) Layer

    // 返回链路层
    LinkLayer() LinkLayer
    // 返回网络层
    NetworkLayer() NetworkLayer
    // 返回传输层
    TransportLayer() TransportLayer
    /// 返回应用层
    ApplicationLayer() ApplicationLayer
    // 返回错误层的信息
    ErrorLayer() ErrorLayer

    // 数据
    Data() []byte
    // 元数据
    Metadata() *PacketMetadata
}

Layer

  Layer表示单个解码数据包层(使用层的 OSI 或 TCP/IP 定义)。解码时,数据包的数据被分解为多个层。调用者可以调用 LayerType() 来确定他们从数据包中接收到的层类型。或者,他们可以使用类型断言来获取实际的层类型,以便对数据进行深入检查。

type Layer interface {
   
    // layer类型
    LayerType() LayerType
    // layer的内容
    LayerContents() []byte
    // layer payload
    LayerPayload() []byte
}

LinkLayer

type LinkLayer interface {
   
    Layer
    LinkFlow() Flow
}

NetworkLayer

type NetworkLayer interface {
   
    Layer
    NetworkFlow() Flow
}

TransportLayer

type TransportLayer interface {
   
    Layer
    TransportFlow() Flow
}

ApplicationLayer

type ApplicationLayer interface {
   
    Layer
    Payload() []byte
}

ErrorLayer

  数据包解码失败时创建的数据包层。它的有效载荷是我们无法解码的所有字节,返回的错误详细说明了解码失败的原因。

type ErrorLayer interface {
   
    Layer
    Error() error
}

Flow

  Flow表示数据包层的流量方向,作为源端点和目的端点。Flow可用作映射键。

type Flow struct {
   
    typ        EndpointType
    slen, dlen int
    src, dst   [MaxEndpointSize]byte
}

NewFlow

  创建一个Flow,src和dst长度必须小于16,否则会panic

func NewFlow(t EndpointType, src, dst []byte) (f Flow)

公共方法

// src Endpoint
func (f Flow) Src() (src Endpoint)
// dst Endpoint
func (f Flow) Dst() (dst Endpoint)
// Endpoint类型
func (f Flow) EndpointType() EndpointType
// src 和 dst Endpoint
func (f Flow) Endpoints() (src, dst Endpoint)
// 快速散列函数
func (f Flow) FastHash() (h uint64)
// 将src与dst转换
func (f Flow) Reverse() Flow
// 输出
func (f Flow) String() string

Enpoint

  Enpoint是用于在各个层寻址数据包的一组字节。请参阅 LinkLayer[^4]、NetworkLayer[^5]和 TransportLayer[^6]规范。Enpoint可用作映射键。

type Endpoint struct {
   
    typ EndpointType
    len int
    raw [MaxEndpointSize]byte
}
// 新建Endpoint,raw的大小必须小于MaxEndpointSize
func NewEndpoint(typ EndpointType , raw [] byte ) (e Endpoint )
// EndpointType
func (a Endpoint) EndpointType() EndpointType
// 快速散列函数
func (a Endpoint ) FastHash() (h uint64 )
// 返回原始byte
func (a Endpoint) Raw() []byte 
// 为所有端点提供稳定的排序。
// 1. 根据端点的 EndpointType 进行排序
// 2. 然后根据该端点的原始字节进行排序。
// 排序首先基于端点类型,然后基于原始端点字节。端点字节按字典顺序排序。
func (a Endpoint) LessThan(b Endpoint) bool

func (a Endpoint) String() string

  ‍

Pcap

Handle

OpenLive

// device:指定网络要捕获的网络设备名称,可以是FindAllDevs返回的设备的Name
// snaplen:每个数据包读取的最大长度
// promisc:是否将网口设置为混杂模式,即是否接收目的地址不为本机的包
// timeout:设置抓到包返回的超时。如果设置成30s,那么每30s才会刷新一次数据包;设置成负数,会立刻刷新数据包,即不做等待
func OpenLive(device string, snaplen int32, promisc bool, timeout time.Duration) (handle *Handle, _ error)

OpenOffline

  打开一个存储libpcap支持的文件

func OpenOffline(file string) (handle *Handle, err error)

OpenOfflineFile

  与OpenOffline类似,需要我们手动打开文件后返回的文件句柄作为参数

func OpenOfflineFile(file * os . File ) (handle * Handle , err error )

InactiveHandle

NewInactiveHandle

  创建一个pcap句柄,但是需要调用Active进行激活,使用或者你可以更加方便的更改其参数

func NewInactiveHandle(device string) (*InactiveHandle, error)

pcapgo

Writer

  Writer 封装了一个底层的 io.Writer 来以 PCAP 格式写入数据包数据。

type Writer struct {
   
    w        io.Writer
    tsScaler int
    buf [16]byte
}

NewWriter与NewWriterNanos

  返回一个新的 writer 对象,用于将数据包数据写入给定的 writer。如果这是一个新的空写入器(与追加相反),则必须在 WritePacket 之前调用 WriteFileHeader[^7]。数据包时间戳以微秒精度写入。

func NewWriter(w io . Writer ) * Writer
func NewWriterNanos(w io . Writer ) * Writer

  示例:

// 创建一个空文件
f, _ := os.Create("/tmp/file.pcap")
// 新建一个Writer对象
w := pcapgo.NewWriter(f)
// 新文件,必须写入头
w.WriteFileHeader(65536, layers.LinkTypeEthernet)  
// 写入包
w.WritePacket(gopacket.CaptureInfo{
   ...}, data1)
f.Close()
// 追加
f2, _ := os.OpenFile("/tmp/file.pcap", os.O_APPEND, 0700)
// 创建一个writer
w2 := pcapgo.NewWriter(f2)
// 
w2.WritePacket(gopacket.CaptureInfo{
   ...}, data2)
f2.Close()

WriteFileHeader

  写入文件头通过链路层类型,每次新建文件时必须首先调用此函数

func (w *Writer) WriteFileHeader(snaplen uint32, linktype layers.LinkType) error

WritePacket

func (w *Writer) WritePacket(ci gopacket.CaptureInfo, data []byte) error

ip4defrag - IPV4分片重组

  包ip4defrag实现了IPV4分片重组。

IP4Defragmenter

  IP4Defragmenter嵌入一个map用来存储IPV4分片

type IPv4Defragmenter struct {
   
    sync.RWMutex
    ipFlows map[ipv4]*fragmentList
}

NewIPv4Defragmenter

  构造一个IPv4Defragmenter

func NewIPv4Defragmenter() *IPv4Defragmenter {
   
    return &IPv4Defragmenter{
   
        ipFlows: make(map[ipv4]*fragmentList),
    }
}

DefragIPv4

  DefragIPv4 接收带有片段有效负载的 IPv4 数据包。

  它不会修改 IPv4 层,“in”保持不变它返回一个准备好使用的 IPv4 层。

  如果传入的 IPv4 层没有分片,它会立即返回而不修改该层。

  如果 IPv4 层是一个片段并且我们没有所有片段,它将返回 nil 并存储最终对数据包进行碎片整理所需的任何内部信息。

  如果 IPv4 层是重建数据包所需的最后一个片段,则将返回一个新的 IPv4 层,并将设置为整个碎片整理的数据包,

func (d * IPv4Defragmenter ) DefragIPv4(in * layers . IPv4 ) 
        (* layers . IPv4 , error )

  使用示例

func HandlePacket(in *layers.IPv4) err {
   
    defragger := ip4defrag.NewIPv4Defragmenter()
    in, err := defragger.DefragIPv4(in)
    if err != nil {
   
        return err
    } else if in == nil {
   
        return nil  // packet fragment, we don't have whole packet yet.
    }
    // At this point, we know that 'in' is defragmented.
    //It may be the same 'in' passed to
       // HandlePacket, or it may not, but we don't really care :)
       ... do stuff to 'in' ...
}

DefragIPv4WithTimestamp

  DefragIPv4WithTimestamp 提供 DefragIPv4 的功能,带有一个额外的时间戳参数,用于丢弃旧片段而不是 time.Now()

  这在操作 pcap 文件而不是实时捕获的数据时很有用

func (d *IPv4Defragmenter) DefragIPv4WithTimestamp(in *layers.IPv4, t time.Time) (*layers.IPv4, error)

DiscardOlderThan

  DiscardOlderThan 会忘记自时间 t 以来没有任何活动的所有数据包。它返回 FragmentList 的数量,也就是它丢弃的片段数据包的数量。

func (d * IPv4Defragmenter ) DiscardOlderThan(t time.Time ) int

layers- 为许多常见协议提供解码

  layers为许多常见协议提供解码layer。

  layers包含许多不同类型的数据包layer的解码实现。

  要查看 gopacket/layers 当前能够解码的协议集,请查看变量部分中定义的 LayerTypes 集。layers包还定义了许多具有与其相关联的src/dst地址的常见数据包层的端点,例如 IPv4/6 (IP) 和 TCP/UDP (端口)。最后,layers包含许多有用的枚举(IPProtocol、EthernetType、LinkType、PPPType 等)。其中许多实现了 gopacket.Decoder 接口,因此可以将它们作为解码器传递给 gopacket。

reassembly - 提供TCP包重组

  reassembly包提供了TCP流重组。

  Assembler 使用用户提供的 StreamFactory 创建用户定义的 Stream 接口,然后按流顺序将数据包数据传递给该对象。并发安全的 StreamPool 会跟踪所有当前正在重组的 Stream,因此可以同时运行多个 Assembler 来组装数据包,同时利用多个内核。

Assembler

FlushWithOption

func (a * Assembler ) FlushWithOptions(opt FlushOptions ) (flushed, closed int )

  FlushWithOptions 发现任何等待数据包的数据包早于给定时间 T 的任何流,并推送他们拥有的数据(IE:告诉他们停止等待并跳过他们正在等待的数据)。

  它还关闭早于 TC 的流(可以设置为零,以保持长期流的活动状态,但无论如何都要刷新数据)。

  每个 Stream 都维护一个包含零个或多个乱序接收的字节集的列表。例如,如果它已经处理到序列号 10,它的列表中可能有字节 [15-20)、[20-25)、[30,50)。每组字节还具有最初查看的时间戳。刷新调用将查看最小的后续字节集,在本例中为 [15-20),如果它的时间戳早于传入的时间,它将把它和所有连续的字节集推送到 Stream 的 Reassembled功能。在这种情况下,它将推动 [15-20),但也会推动 [20-25),因为这是连续的。如果它的时间戳也早于传入的时间,它只会推送 [30-50),否则它将等到下一个 FlushCloseOlderThan 来查看字节 [25-30) 是否进来。

  返回刷新的连接数,以及其中因刷新而关闭的连接数。

Stream

type Stream interface {
   
    //告诉TCP数据包是否应该被接受,start可以修改为强制启动,即使没有看到SYN
    Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir TCPFlowDirection, nextSeq Sequence, start *bool, ac AssemblerContext) bool

    // ReassembledSG被调用0次或多次。
    // ScatterGather在每次重组调用后被重用,
    // 所以把你需要的东西都复制出来是很重要的,
    // 特别是bytes(或使用KeepFrom())
    ReassembledSG(sg ScatterGather, ac AssemblerContext)

    //ReassemblyComplete会在程序集决定此流没有更多数据时调用,
    //原因可能是看到了FIN或RST数据包,
    //也可能是流超时而没有任何新数据包数据
    //(原因是调用FlushCloseOlderThan)。
    //如果它想查看Accept()的后续数据包,
    //例如查看FIN-ACK,以便进行更深入的状态机分析,则返回false。
    ReassemblyComplete(ac AssemblerContext) bool
}

ScatterGather

  ScatterGather用于通过ReassembledSG将重新组装的数据和重新组装的数据包的元数据传递给流

type ScatterGather interface {
   
    // 返回可用的字节数和保存的字节数
    Lengths() (int, int)
    // 返回数据最大为可用字节数
    Fetch(length int) []byte
    // 告诉防止偏移
    KeepFrom(offset int)
    // 返回对应于给定偏移量的数据包的CaptureInfo
    CaptureInfo(offset int) gopacket.CaptureInfo
    // 重组块的信息
    // direction
    Info() (direction TCPFlowDirection, start bool, end bool, skip int)
    // 流的stats
    Stats() TCPAssemblyStats
}

  ‍

[^1]: ### Packets

[^2]: ## Packet

[^3]: ### NextPacket

[^4]: ## LinkLayer

[^5]: ## NetworkLayer

[^6]: ## TransportLayer

[^7]: ### WriteFileHeader

相关文章
|
JSON Kubernetes 数据格式
K8S client-go Patch example
我在本文中主要会介绍使用client-go的Patch方式,主要包括strategic merge patch和json-patch
|
1月前
|
NoSQL Go Redis
关于kratos proto 生成pb.go的一些报错,问题
关于kratos proto 生成pb.go的一些报错,问题
|
6月前
|
程序员 Go
|
3月前
|
算法 搜索推荐 Unix
快速指南: Go 1.19功能
快速指南: Go 1.19功能
|
3月前
|
缓存 JavaScript 前端开发
为开源项目 go-gin-api 增加 WebSocket 模块
为开源项目 go-gin-api 增加 WebSocket 模块
46 2
|
Java Go 调度
Go Runtime功能初探
Go Runtime功能初探
96 2
|
11月前
|
Linux Go Windows
gopacket使用
gopacket使用
|
11月前
|
存储 缓存 网络协议
gopacket reassembly源码分析
gopacket reassembly源码分析
|
11月前
|
存储 缓存 网络协议
gopacket tcpassembly源码分析
gopacket tcpassembly源码分析
|
JavaScript 前端开发 Go
Go WASM:如何在 Go 中访问 DOM API?
Go WASM:如何在 Go 中访问 DOM API?
192 0