gopacket
PacketSource
PacketSource可以从 PacketDataSource 中读取数据包,对其进行解码并返回它们。其定义入下
type PacketSource struct {
// 数据源
source PacketDataSource
// 解码器
decoder Decoder
// 解码选项设置,可以在外部改变
DecodeOptions
// 数据通道
c chan Packet
}
// 数据源接口
type PacketDataSource interface {
ReadPacketData() (data []byte, ci CaptureInfo, err error)
}
// 解码器接口
type Decoder interface {
Decode([]byte, PacketBuilder) error
}
// 解码器选项
type DecodeOptions struct {
// 是否延迟解码
//注意在并发时要小心因为*可能会改变数据包,导致并发函数调用互斥
Lazy bool
// 不拷贝到缓冲区
// 如果能够保证传递给NewPacket的片的底层自己不会被修改,那么这可能更快
// 如果可能被修改,这可能使数据无效
NoCopy bool
// 在包解码时跳过recover恢复
// 一般在发生panic时,将会被recover捕获,
//并在详细描述问题的包添加DecodeFailure层
SkipDecodeRecovery bool
// 可以在TCP解码器中实现应用层路由
// 默认false,因为重报应该在解码TCP负载后
DecodeStreamsAsDatagrams bool
}
我们可以通过NewPacketSource方法来构造一个PacketSouce源
// source:源,需要实现ReadPacketData接口
// decorder:解码器需要实现Decodeer接口
func NewPacketSource(source PacketDataSource, decoder Decoder) *PacketSource {
return &PacketSource{
source: source,
decoder: decoder,
}
}
目前有两种不同的方法可以通过 PacketSource 读取数据包:
- Packets[^1]
- NextPacket[^3]
Packets
内部启用一个协程调用NextPacket进行读Packet[^2],并将其写入到返回的管道中
func (p *PacketSource) Packets() chan Packet {
if p.c == nil {
p.c = make(chan Packet, 1000)
go p.packetsToChannel()
}
return p.c
}
使用示例:
for packet := range packetSource.Packets() {
...
}
NextPacket
返回下一个数据包Packet[^2]
func (p *PacketSource) NextPacket() (Packet, error) {
// 读取元数据
data, ci, err := p.source.ReadPacketData()
if err != nil {
return nil, err
}
// 根据数据基础Packet对象
packet := NewPacket(data, p.decoder, p.DecodeOptions)
m := packet.Metadata()
m.CaptureInfo = ci
m.Truncated = m.Truncated || ci.CaptureLength < ci.Length
return packet, nil
}
使用示例:
for {
packet, err := packetSource.NextPacket()
if err == io.EOF {
break
} else if err != nil {
log.Println("Error:", err)
continue
}
handlePacket(packet) // Do something with each packet.
}
Packet
packet是gopacket的主要对象,其主要由Decoder创建,一个数据包由一组数据组成,数据在解码时被分解为多个层。
type Packet interface {
// 输出可读性的字符串
String() string
// 使用LayerDump输出所有layer的16进制
Dump() string
// 返回所有layer
Layers() []Layer
// 返回指定层的数据,如果nil则返回第一层数据
Layer(LayerType) Layer
// 返回指
LayerClass(LayerClass) Layer
// 返回链路层
LinkLayer() LinkLayer
// 返回网络层
NetworkLayer() NetworkLayer
// 返回传输层
TransportLayer() TransportLayer
/// 返回应用层
ApplicationLayer() ApplicationLayer
// 返回错误层的信息
ErrorLayer() ErrorLayer
// 数据
Data() []byte
// 元数据
Metadata() *PacketMetadata
}
Layer
Layer表示单个解码数据包层(使用层的 OSI 或 TCP/IP 定义)。解码时,数据包的数据被分解为多个层。调用者可以调用 LayerType() 来确定他们从数据包中接收到的层类型。或者,他们可以使用类型断言来获取实际的层类型,以便对数据进行深入检查。
type Layer interface {
// layer类型
LayerType() LayerType
// layer的内容
LayerContents() []byte
// layer payload
LayerPayload() []byte
}
LinkLayer
type LinkLayer interface {
Layer
LinkFlow() Flow
}
NetworkLayer
type NetworkLayer interface {
Layer
NetworkFlow() Flow
}
TransportLayer
type TransportLayer interface {
Layer
TransportFlow() Flow
}
ApplicationLayer
type ApplicationLayer interface {
Layer
Payload() []byte
}
ErrorLayer
数据包解码失败时创建的数据包层。它的有效载荷是我们无法解码的所有字节,返回的错误详细说明了解码失败的原因。
type ErrorLayer interface {
Layer
Error() error
}
Flow
Flow表示数据包层的流量方向,作为源端点和目的端点。Flow可用作映射键。
type Flow struct {
typ EndpointType
slen, dlen int
src, dst [MaxEndpointSize]byte
}
NewFlow
创建一个Flow,src和dst长度必须小于16,否则会panic
func NewFlow(t EndpointType, src, dst []byte) (f Flow)
公共方法
// src Endpoint
func (f Flow) Src() (src Endpoint)
// dst Endpoint
func (f Flow) Dst() (dst Endpoint)
// Endpoint类型
func (f Flow) EndpointType() EndpointType
// src 和 dst Endpoint
func (f Flow) Endpoints() (src, dst Endpoint)
// 快速散列函数
func (f Flow) FastHash() (h uint64)
// 将src与dst转换
func (f Flow) Reverse() Flow
// 输出
func (f Flow) String() string
Enpoint
Enpoint是用于在各个层寻址数据包的一组字节。请参阅 LinkLayer[^4]、NetworkLayer[^5]和 TransportLayer[^6]规范。Enpoint可用作映射键。
type Endpoint struct {
typ EndpointType
len int
raw [MaxEndpointSize]byte
}
// 新建Endpoint,raw的大小必须小于MaxEndpointSize
func NewEndpoint(typ EndpointType , raw [] byte ) (e Endpoint )
// EndpointType
func (a Endpoint) EndpointType() EndpointType
// 快速散列函数
func (a Endpoint ) FastHash() (h uint64 )
// 返回原始byte
func (a Endpoint) Raw() []byte
// 为所有端点提供稳定的排序。
// 1. 根据端点的 EndpointType 进行排序
// 2. 然后根据该端点的原始字节进行排序。
// 排序首先基于端点类型,然后基于原始端点字节。端点字节按字典顺序排序。
func (a Endpoint) LessThan(b Endpoint) bool
func (a Endpoint) String() string
Pcap
Handle
OpenLive
// device:指定网络要捕获的网络设备名称,可以是FindAllDevs返回的设备的Name
// snaplen:每个数据包读取的最大长度
// promisc:是否将网口设置为混杂模式,即是否接收目的地址不为本机的包
// timeout:设置抓到包返回的超时。如果设置成30s,那么每30s才会刷新一次数据包;设置成负数,会立刻刷新数据包,即不做等待
func OpenLive(device string, snaplen int32, promisc bool, timeout time.Duration) (handle *Handle, _ error)
OpenOffline
打开一个存储libpcap支持的文件
func OpenOffline(file string) (handle *Handle, err error)
OpenOfflineFile
与OpenOffline类似,需要我们手动打开文件后返回的文件句柄作为参数
func OpenOfflineFile(file * os . File ) (handle * Handle , err error )
InactiveHandle
NewInactiveHandle
创建一个pcap句柄,但是需要调用Active进行激活,使用或者你可以更加方便的更改其参数
func NewInactiveHandle(device string) (*InactiveHandle, error)
pcapgo
Writer
Writer 封装了一个底层的 io.Writer 来以 PCAP 格式写入数据包数据。
type Writer struct {
w io.Writer
tsScaler int
buf [16]byte
}
NewWriter与NewWriterNanos
返回一个新的 writer 对象,用于将数据包数据写入给定的 writer。如果这是一个新的空写入器(与追加相反),则必须在 WritePacket 之前调用 WriteFileHeader[^7]。数据包时间戳以微秒精度写入。
func NewWriter(w io . Writer ) * Writer
func NewWriterNanos(w io . Writer ) * Writer
示例:
// 创建一个空文件
f, _ := os.Create("/tmp/file.pcap")
// 新建一个Writer对象
w := pcapgo.NewWriter(f)
// 新文件,必须写入头
w.WriteFileHeader(65536, layers.LinkTypeEthernet)
// 写入包
w.WritePacket(gopacket.CaptureInfo{
...}, data1)
f.Close()
// 追加
f2, _ := os.OpenFile("/tmp/file.pcap", os.O_APPEND, 0700)
// 创建一个writer
w2 := pcapgo.NewWriter(f2)
//
w2.WritePacket(gopacket.CaptureInfo{
...}, data2)
f2.Close()
WriteFileHeader
写入文件头通过链路层类型,每次新建文件时必须首先调用此函数
func (w *Writer) WriteFileHeader(snaplen uint32, linktype layers.LinkType) error
WritePacket
func (w *Writer) WritePacket(ci gopacket.CaptureInfo, data []byte) error
ip4defrag - IPV4分片重组
包ip4defrag实现了IPV4分片重组。
IP4Defragmenter
IP4Defragmenter嵌入一个map用来存储IPV4分片
type IPv4Defragmenter struct {
sync.RWMutex
ipFlows map[ipv4]*fragmentList
}
NewIPv4Defragmenter
构造一个IPv4Defragmenter
func NewIPv4Defragmenter() *IPv4Defragmenter {
return &IPv4Defragmenter{
ipFlows: make(map[ipv4]*fragmentList),
}
}
DefragIPv4
DefragIPv4 接收带有片段有效负载的 IPv4 数据包。
它不会修改 IPv4 层,“in”保持不变它返回一个准备好使用的 IPv4 层。
如果传入的 IPv4 层没有分片,它会立即返回而不修改该层。
如果 IPv4 层是一个片段并且我们没有所有片段,它将返回 nil 并存储最终对数据包进行碎片整理所需的任何内部信息。
如果 IPv4 层是重建数据包所需的最后一个片段,则将返回一个新的 IPv4 层,并将设置为整个碎片整理的数据包,
func (d * IPv4Defragmenter ) DefragIPv4(in * layers . IPv4 )
(* layers . IPv4 , error )
使用示例
func HandlePacket(in *layers.IPv4) err {
defragger := ip4defrag.NewIPv4Defragmenter()
in, err := defragger.DefragIPv4(in)
if err != nil {
return err
} else if in == nil {
return nil // packet fragment, we don't have whole packet yet.
}
// At this point, we know that 'in' is defragmented.
//It may be the same 'in' passed to
// HandlePacket, or it may not, but we don't really care :)
... do stuff to 'in' ...
}
DefragIPv4WithTimestamp
DefragIPv4WithTimestamp 提供 DefragIPv4 的功能,带有一个额外的时间戳参数,用于丢弃旧片段而不是 time.Now()
这在操作 pcap 文件而不是实时捕获的数据时很有用
func (d *IPv4Defragmenter) DefragIPv4WithTimestamp(in *layers.IPv4, t time.Time) (*layers.IPv4, error)
DiscardOlderThan
DiscardOlderThan 会忘记自时间 t 以来没有任何活动的所有数据包。它返回 FragmentList 的数量,也就是它丢弃的片段数据包的数量。
func (d * IPv4Defragmenter ) DiscardOlderThan(t time.Time ) int
layers- 为许多常见协议提供解码
layers为许多常见协议提供解码layer。
layers包含许多不同类型的数据包layer的解码实现。
要查看 gopacket/layers 当前能够解码的协议集,请查看变量部分中定义的 LayerTypes 集。layers包还定义了许多具有与其相关联的src/dst地址的常见数据包层的端点,例如 IPv4/6 (IP) 和 TCP/UDP (端口)。最后,layers包含许多有用的枚举(IPProtocol、EthernetType、LinkType、PPPType 等)。其中许多实现了 gopacket.Decoder 接口,因此可以将它们作为解码器传递给 gopacket。
reassembly - 提供TCP包重组
reassembly包提供了TCP流重组。
Assembler 使用用户提供的 StreamFactory 创建用户定义的 Stream 接口,然后按流顺序将数据包数据传递给该对象。并发安全的 StreamPool 会跟踪所有当前正在重组的 Stream,因此可以同时运行多个 Assembler 来组装数据包,同时利用多个内核。
Assembler
FlushWithOption
func (a * Assembler ) FlushWithOptions(opt FlushOptions ) (flushed, closed int )
FlushWithOptions 发现任何等待数据包的数据包早于给定时间 T 的任何流,并推送他们拥有的数据(IE:告诉他们停止等待并跳过他们正在等待的数据)。
它还关闭早于 TC 的流(可以设置为零,以保持长期流的活动状态,但无论如何都要刷新数据)。
每个 Stream 都维护一个包含零个或多个乱序接收的字节集的列表。例如,如果它已经处理到序列号 10,它的列表中可能有字节 [15-20)、[20-25)、[30,50)。每组字节还具有最初查看的时间戳。刷新调用将查看最小的后续字节集,在本例中为 [15-20),如果它的时间戳早于传入的时间,它将把它和所有连续的字节集推送到 Stream 的 Reassembled功能。在这种情况下,它将推动 [15-20),但也会推动 [20-25),因为这是连续的。如果它的时间戳也早于传入的时间,它只会推送 [30-50),否则它将等到下一个 FlushCloseOlderThan 来查看字节 [25-30) 是否进来。
返回刷新的连接数,以及其中因刷新而关闭的连接数。
Stream
type Stream interface {
//告诉TCP数据包是否应该被接受,start可以修改为强制启动,即使没有看到SYN
Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir TCPFlowDirection, nextSeq Sequence, start *bool, ac AssemblerContext) bool
// ReassembledSG被调用0次或多次。
// ScatterGather在每次重组调用后被重用,
// 所以把你需要的东西都复制出来是很重要的,
// 特别是bytes(或使用KeepFrom())
ReassembledSG(sg ScatterGather, ac AssemblerContext)
//ReassemblyComplete会在程序集决定此流没有更多数据时调用,
//原因可能是看到了FIN或RST数据包,
//也可能是流超时而没有任何新数据包数据
//(原因是调用FlushCloseOlderThan)。
//如果它想查看Accept()的后续数据包,
//例如查看FIN-ACK,以便进行更深入的状态机分析,则返回false。
ReassemblyComplete(ac AssemblerContext) bool
}
ScatterGather
ScatterGather用于通过ReassembledSG将重新组装的数据和重新组装的数据包的元数据传递给流
type ScatterGather interface {
// 返回可用的字节数和保存的字节数
Lengths() (int, int)
// 返回数据最大为可用字节数
Fetch(length int) []byte
// 告诉防止偏移
KeepFrom(offset int)
// 返回对应于给定偏移量的数据包的CaptureInfo
CaptureInfo(offset int) gopacket.CaptureInfo
// 重组块的信息
// direction
Info() (direction TCPFlowDirection, start bool, end bool, skip int)
// 流的stats
Stats() TCPAssemblyStats
}
[^1]: ### Packets
[^2]: ## Packet
[^3]: ### NextPacket
[^4]: ## LinkLayer
[^5]: ## NetworkLayer
[^6]: ## TransportLayer
[^7]: ### WriteFileHeader