packetbeat包流转流程
概览
- sniffer 从捕获器中读取数据包,并转交给 decoder 的 OnPacket
- decoder.OnPacket 接收数据包,依次进行以太网层、IP 层、传输层解析
- 由TCP、UDP接口转发至应用层
包捕获
sniffer Run
// Run opens the capture handle and passes every non-empty packet it reads to
// the decoder selected for the handle's link type. The loop keeps going while
// the sniffer state is snifferActive. When s.config.Dumpfile is set, each
// packet is also written to that file in pcap format.
//
// Run returns nil when the state cannot be switched from inactive to active,
// when the loop ends because the state changed, or when io.EOF is hit while
// s.config.File is set. Any other failure is returned as an error.
func (s *Sniffer) Run() error {
	handle, err := s.open()
	if err != nil {
		return fmt.Errorf("failed to start sniffer: %w", err)
	}
	defer handle.Close()

	// Optional pcap dump of everything that is read from the handle.
	var dump *pcapgo.Writer
	if dumpPath := s.config.Dumpfile; dumpPath != "" {
		out, err := os.Create(dumpPath)
		if err != nil {
			return err
		}
		defer out.Close()

		dump = pcapgo.NewWriterNanos(out)
		if err := dump.WriteFileHeader(65535, handle.LinkType()); err != nil {
			return fmt.Errorf("failed to write dump file header to %s: %w", dumpPath, err)
		}
	}

	decoder, err := s.decoders(handle.LinkType())
	if err != nil {
		return err
	}

	// Mark inactive sniffer as active. In case of the sniffer/packetbeat closing
	// before/while Run is executed, the state will be snifferClosing.
	// => return if state is already snifferClosing.
	if !s.state.CAS(snifferInactive, snifferActive) {
		return nil
	}
	defer s.state.Store(snifferInactive)

	count := 0
	for s.state.Load() == snifferActive {
		if s.config.OneAtATime {
			fmt.Fprintln(os.Stdout, "Press enter to read packet")
			fmt.Scanln()
		}

		data, ci, err := handle.ReadPacketData()
		switch {
		case err == pcap.NextErrorTimeoutExpired || isAfpacketErrTimeout(err): //nolint:errorlint // pcap.NextErrorTimeoutExpired is not wrapped.
			logp.Debug("sniffer", "timedout")
			continue
		case err == io.EOF && s.config.File != "": //nolint:errorlint // io.EOF should never be wrapped.
			// ignore EOF, if sniffer was driven from file
			return nil
		case err != nil:
			s.state.Store(snifferInactive)
			return fmt.Errorf("sniffing error: %w", err)
		}

		if len(data) == 0 {
			// Empty packet, probably timeout from afpacket.
			continue
		}

		count++
		if dump != nil {
			if err := dump.WritePacket(ci, data); err != nil {
				return fmt.Errorf("failed to write packet %d: %w", count, err)
			}
		}
		logp.Debug("sniffer", "Packet number: %d", count)

		// Decode the packet.
		decoder.OnPacket(data, &ci)
	}

	return nil
}
decoder OnPacket
// OnPacket decodes one captured packet layer by layer, starting at the
// configured link layer. After each layer is decoded it is passed to
// d.process. Decoding stops when a layer fails to decode, process reports an
// error or reports the packet as processed, no decoder is registered for the
// next layer type, or no payload is left.
//
// When flow tracking is enabled (d.flowID != nil) the flows table stays locked
// for the whole call, and the packet and byte counters of the matching flow
// are updated at the end if the flow ID has any flags set.
func (d *Decoder) OnPacket(data []byte, ci *gopacket.CaptureInfo) {
	defer logp.Recover("packet decoding failed")

	d.truncated = false

	layer := d.linkLayerDecoder
	layerType := d.linkLayerType

	packet := protos.Packet{Ts: ci.Timestamp}

	debugf("decode packet data")

	if d.flowID != nil {
		d.flowID.Reset(d.flowIDBufferBacking[:0])

		// suppress flow stats snapshots while processing packet
		d.flows.Lock()
		defer d.flows.Unlock()
	}

	for len(data) > 0 {
		if err := layer.DecodeFromBytes(data, d); err != nil {
			logp.Info("packet decode failed with: %v", err)
			break
		}

		upcomingType := layer.NextLayerType()
		data = layer.LayerPayload()

		// Dispatch the layer that was just decoded.
		done, err := d.process(&packet, layerType)
		if err != nil {
			logp.Info("Error processing packet: %v", err)
			break
		}
		if done {
			break
		}

		// choose next decoding layer
		upcoming, found := d.decoders[upcomingType]
		if !found {
			break
		}

		// jump to next layer
		layer, layerType = upcoming, upcomingType
	}

	// add flow stats
	if d.flowID == nil {
		return
	}
	debugf("flow id flags: %v", d.flowID.Flags())
	if d.flowID.Flags() != 0 {
		flow := d.flows.Get(d.flowID)
		d.statPackets.Add(flow, 1)
		d.statBytes.Add(flow, uint64(ci.Length))
	}
}
decoder process
// process handles one decoded layer of the current packet.
//
// For Ethernet, 802.1Q, IPv4 and IPv6 layers it adds the layer's addressing
// data to the flow ID (when flow tracking is enabled) and, for the IP layers,
// fills in the packet's IP tuple. For ICMPv4, ICMPv6, UDP and TCP layers it
// calls the matching handler.
//
// The boolean result is true only for the ICMP/UDP/TCP layers; OnPacket stops
// decoding further layers when it is true. The returned error is always nil.
func (d *Decoder) process(
	packet *protos.Packet,
	layerType gopacket.LayerType,
) (bool, error) {
	trackFlow := d.flowID != nil
	handled := false

	switch layerType {
	case layers.LayerTypeEthernet:
		if trackFlow {
			d.flowID.AddEth(d.eth.SrcMAC, d.eth.DstMAC)
		}

	case layers.LayerTypeDot1Q:
		// Take the current slot before advancing the multi-layer cursor.
		vlan := &d.d1q[d.stD1Q.i]
		d.stD1Q.next()
		if trackFlow {
			d.flowID.AddVLan(vlan.VLANIdentifier)
		}

	case layers.LayerTypeIPv4:
		debugf("IPv4 packet")
		hdr := &d.ip4[d.stIP4.i]
		d.stIP4.next()
		if trackFlow {
			d.flowID.AddIPv4(hdr.SrcIP, hdr.DstIP)
		}
		packet.Tuple.SrcIP = hdr.SrcIP
		packet.Tuple.DstIP = hdr.DstIP
		packet.Tuple.IPLength = 4

	case layers.LayerTypeIPv6:
		debugf("IPv6 packet")
		hdr := &d.ip6[d.stIP6.i]
		d.stIP6.next()
		if trackFlow {
			d.flowID.AddIPv6(hdr.SrcIP, hdr.DstIP)
		}
		packet.Tuple.SrcIP = hdr.SrcIP
		packet.Tuple.DstIP = hdr.DstIP
		packet.Tuple.IPLength = 16

	case layers.LayerTypeICMPv4:
		debugf("ICMPv4 packet")
		d.onICMPv4(packet)
		handled = true

	case layers.LayerTypeICMPv6:
		debugf("ICMPv6 packet")
		d.onICMPv6(packet)
		handled = true

	case layers.LayerTypeUDP:
		debugf("UDP packet")
		d.onUDP(packet)
		handled = true

	case layers.LayerTypeTCP:
		debugf("TCP packet")
		d.onTCP(packet)
		handled = true
	}

	return handled, nil
}
TCP
onTCP
// onTCP copies the ports and payload of the decoded TCP header into packet,
// adds the ports to the flow ID when flow tracking is enabled, and passes the
// packet to the TCP processor.
//
// Without flow tracking, a segment that has no payload and no FIN flag is
// dropped before it reaches the TCP processor.
func (d *Decoder) onTCP(packet *protos.Packet) {
	flowID := d.flowID
	srcPort, dstPort := uint16(d.tcp.SrcPort), uint16(d.tcp.DstPort)

	if flowID != nil {
		flowID.AddTCP(srcPort, dstPort)
	}

	packet.Tuple.SrcPort, packet.Tuple.DstPort = srcPort, dstPort
	packet.Payload = d.tcp.Payload

	emptySegment := len(packet.Payload) == 0
	if flowID == nil && emptySegment && !d.tcp.FIN {
		// We have no use for this atm.
		debugf("Ignore empty non-FIN packet")
		return
	}

	packet.Tuple.ComputeHashables()
	d.tcpProc.Process(flowID, &d.tcp, packet)
}
tcp Process
// Process looks up (or creates) the stream for pkt, tags the flow ID with the
// connection ID when id is non-nil, and adds the segment to the stream.
//
// Segments with no payload and no FIN flag return right after the stream
// lookup. For segments with payload, once a last sequence number is recorded
// for the direction:
//   - a segment ending at or before the last sequence number is ignored as a
//     retransmission;
//   - a segment starting after it is reported to the stream as a gap (unless
//     the stream was just created), and the connection's ID and data are
//     reset if the stream asks to drop;
//   - a segment starting before it has the overlapping prefix cut from its
//     payload, and tcphdr.Seq is advanced by the same amount.
//
// Note that pkt.Payload and tcphdr.Seq may be modified in place.
func (tcp *TCP) Process(id *flows.FlowID, tcphdr *layers.TCP, pkt *protos.Packet) {
	// This Recover should catch all exceptions in
	// protocol modules.
	defer logp.Recover("Process tcp exception")

	tcp.expiredConns.notifyAll()

	stream, created := tcp.getStream(pkt)
	conn := stream.conn
	if conn == nil {
		return
	}

	if id != nil {
		id.AddConnectionID(uint64(conn.id))
	}

	if isDebug {
		logp.Debug("tcp", "tcp flow id: %p", id)
	}

	payloadLen := len(pkt.Payload)
	if payloadLen == 0 && !tcphdr.FIN {
		// return early if packet is not interesting. Still need to find/create
		// stream first in order to update the TCP stream timer
		return
	}

	startSeq := tcphdr.Seq
	endSeq := startSeq + uint32(payloadLen)
	prevSeq := conn.lastSeq[stream.dir]
	if isDebug {
		logp.Debug("tcp", "pkt.start_seq=%v pkt.last_seq=%v stream.last_seq=%v (len=%d)",
			startSeq, endSeq, prevSeq, len(pkt.Payload))
	}

	if payloadLen > 0 && prevSeq != 0 {
		if tcpSeqBeforeEq(endSeq, prevSeq) {
			if isDebug {
				logp.Debug("tcp", "Ignoring retransmitted segment. pkt.seq=%v len=%v stream.seq=%v",
					tcphdr.Seq, len(pkt.Payload), prevSeq)
			}
			return
		}

		switch tcpSeqCompare(prevSeq, startSeq) {
		case seqLT:
			// prevSeq < startSeq => Gap in tcp stream detected.
			// A freshly created stream has no history to compare against.
			if !created {
				gap := int(startSeq - prevSeq)
				logp.Debug("tcp", "Gap in tcp stream. last_seq: %d, seq: %d, gap: %d", prevSeq, startSeq, gap)
				if stream.gapInStream(gap) {
					if isDebug {
						logp.Debug("tcp", "Dropping connection state because of gap")
					}
					droppedBecauseOfGaps.Add(1)

					// drop application layer connection state and
					// update stream_id for app layer analysers using stream_id for lookups
					conn.id = tcp.getID()
					conn.data = nil
				}
			}

		case seqGT:
			// prevSeq > startSeq => overlapping TCP segment detected. shrink packet
			overlap := prevSeq - startSeq

			if isDebug {
				logp.Debug("tcp", "Overlapping tcp segment. last_seq %d, seq: %d, delta: %d",
					prevSeq, startSeq, overlap)
			}

			pkt.Payload = pkt.Payload[overlap:]
			tcphdr.Seq += overlap
		}
	}

	conn.lastSeq[stream.dir] = endSeq
	stream.addPacket(pkt, tcphdr)
}