packetbeat包流转流程

简介: packetbeat包流转流程

packetbeat包流转流程

概览

  1. sniffer 从捕获器中捕获包转接给decode的
  2. d‍ecoder.Onpacket输入包,进行以太网层、IP层、传输层解析
  3. 由TCP、UDP接口转发至应用层

包捕获

sniffer Run

func (s *Sniffer) Run() error {
    handle, err := s.open()
    if err != nil {
        return fmt.Errorf("failed to start sniffer: %w", err)
    }
    defer handle.Close()

    var w *pcapgo.Writer
    if s.config.Dumpfile != "" {
        f, err := os.Create(s.config.Dumpfile)
        if err != nil {
            return err
        }
        defer f.Close()

        w = pcapgo.NewWriterNanos(f)
        err = w.WriteFileHeader(65535, handle.LinkType())
        if err != nil {
            return fmt.Errorf("failed to write dump file header to %s: %w", s.config.Dumpfile, err)
        }
    }

    decoder, err := s.decoders(handle.LinkType())
    if err != nil {
        return err
    }

    // Mark inactive sniffer as active. In case of the sniffer/packetbeat closing
    // before/while Run is executed, the state will be snifferClosing.
    // => return if state is already snifferClosing.
    if !s.state.CAS(snifferInactive, snifferActive) {
        return nil
    }
    defer s.state.Store(snifferInactive)

    var packets int
    for s.state.Load() == snifferActive {
        if s.config.OneAtATime {
            fmt.Fprintln(os.Stdout, "Press enter to read packet")
            fmt.Scanln()
        }

        data, ci, err := handle.ReadPacketData()
        if err == pcap.NextErrorTimeoutExpired || isAfpacketErrTimeout(err) { //nolint:errorlint // pcap.NextErrorTimeoutExpired is not wrapped.
            logp.Debug("sniffer", "timedout")
            continue
        }

        if err != nil {
            // ignore EOF, if sniffer was driven from file
            if err == io.EOF && s.config.File != "" { //nolint:errorlint // io.EOF should never be wrapped.
                return nil
            }

            s.state.Store(snifferInactive)
            return fmt.Errorf("sniffing error: %w", err)
        }

        if len(data) == 0 {
            // Empty packet, probably timeout from afpacket.
            continue
        }

        packets++

        if w != nil {
            err = w.WritePacket(ci, data)
            if err != nil {
                return fmt.Errorf("failed to write packet %d: %w", packets, err)
            }
        }

        logp.Debug("sniffer", "Packet number: %d", packets)
        // 解析包
        decoder.OnPacket(data, &ci)
    }

    return nil
}
AI 代码解读

decoder OnPacket

func (d *Decoder) OnPacket(data []byte, ci *gopacket.CaptureInfo) {
    defer logp.Recover("packet decoding failed")

    d.truncated = false

    current := d.linkLayerDecoder
    currentType := d.linkLayerType

    packet := protos.Packet{Ts: ci.Timestamp}

    debugf("decode packet data")
    processed := false

    if d.flowID != nil {
        d.flowID.Reset(d.flowIDBufferBacking[:0])

        // suppress flow stats snapshots while processing packet
        d.flows.Lock()
        defer d.flows.Unlock()
    }

    for len(data) > 0 {
        err := current.DecodeFromBytes(data, d)
        if err != nil {
            logp.Info("packet decode failed with: %v", err)
            break
        }

        nextType := current.NextLayerType()
        data = current.LayerPayload()
        // 分发
        processed, err = d.process(&packet, currentType)
        if err != nil {
            logp.Info("Error processing packet: %v", err)
            break
        }
        if processed {
            break
        }

        // choose next decoding layer
        next, ok := d.decoders[nextType]
        if !ok {
            break
        }

        // jump to next layer
        current = next
        currentType = nextType
    }

    // add flow s.tats
    if d.flowID != nil {
        debugf("flow id flags: %v", d.flowID.Flags())
    }

    if d.flowID != nil && d.flowID.Flags() != 0 {
        flow := d.flows.Get(d.flowID)
        d.statPackets.Add(flow, 1)
        d.statBytes.Add(flow, uint64(ci.Length))
    }
}
AI 代码解读

decoder process

func (d *Decoder) process(
    packet *protos.Packet,
    layerType gopacket.LayerType,
) (bool, error) {
    withFlow := d.flowID != nil

    switch layerType {
    case layers.LayerTypeEthernet:
        if withFlow {
            d.flowID.AddEth(d.eth.SrcMAC, d.eth.DstMAC)
        }

    case layers.LayerTypeDot1Q:
        d1q := &d.d1q[d.stD1Q.i]
        d.stD1Q.next()
        if withFlow {
            d.flowID.AddVLan(d1q.VLANIdentifier)
        }

    case layers.LayerTypeIPv4:
        debugf("IPv4 packet")
        ip4 := &d.ip4[d.stIP4.i]
        d.stIP4.next()

        if withFlow {
            d.flowID.AddIPv4(ip4.SrcIP, ip4.DstIP)
        }

        packet.Tuple.SrcIP = ip4.SrcIP
        packet.Tuple.DstIP = ip4.DstIP
        packet.Tuple.IPLength = 4

    case layers.LayerTypeIPv6:
        debugf("IPv6 packet")
        ip6 := &d.ip6[d.stIP6.i]
        d.stIP6.next()

        if withFlow {
            d.flowID.AddIPv6(ip6.SrcIP, ip6.DstIP)
        }

        packet.Tuple.SrcIP = ip6.SrcIP
        packet.Tuple.DstIP = ip6.DstIP
        packet.Tuple.IPLength = 16

    case layers.LayerTypeICMPv4:
        debugf("ICMPv4 packet")
        d.onICMPv4(packet)
        return true, nil

    case layers.LayerTypeICMPv6:
        debugf("ICMPv6 packet")
        d.onICMPv6(packet)
        return true, nil

    case layers.LayerTypeUDP:
        debugf("UDP packet")
        d.onUDP(packet)
        return true, nil

    case layers.LayerTypeTCP:
        debugf("TCP packet")
        d.onTCP(packet)
        return true, nil
    }

    return false, nil
}
AI 代码解读

TCP

onTcp

func (d *Decoder) onTCP(packet *protos.Packet) {
    src := uint16(d.tcp.SrcPort)
    dst := uint16(d.tcp.DstPort)

    id := d.flowID
    if id != nil {
        id.AddTCP(src, dst)
    }

    packet.Tuple.SrcPort = src
    packet.Tuple.DstPort = dst
    packet.Payload = d.tcp.Payload

    if id == nil && len(packet.Payload) == 0 && !d.tcp.FIN {
        // We have no use for this atm.
        debugf("Ignore empty non-FIN packet")
        return
    }
    packet.Tuple.ComputeHashables()
    d.tcpProc.Process(id, &d.tcp, packet)
}
AI 代码解读

tcp Process

func (tcp *TCP) Process(id *flows.FlowID, tcphdr *layers.TCP, pkt *protos.Packet) {
    // This Recover should catch all exceptions in
    // protocol modules.
    defer logp.Recover("Process tcp exception")

    tcp.expiredConns.notifyAll()

    stream, created := tcp.getStream(pkt)
    if stream.conn == nil {
        return
    }

    conn := stream.conn
    if id != nil {
        id.AddConnectionID(uint64(conn.id))
    }

    if isDebug {
        logp.Debug("tcp", "tcp flow id: %p", id)
    }

    if len(pkt.Payload) == 0 && !tcphdr.FIN {
        // return early if packet is not interesting. Still need to find/create
        // stream first in order to update the TCP stream timer
        return
    }

    tcpStartSeq := tcphdr.Seq
    tcpSeq := tcpStartSeq + uint32(len(pkt.Payload))
    lastSeq := conn.lastSeq[stream.dir]
    if isDebug {
        logp.Debug("tcp", "pkt.start_seq=%v pkt.last_seq=%v stream.last_seq=%v (len=%d)",
            tcpStartSeq, tcpSeq, lastSeq, len(pkt.Payload))
    }

    if len(pkt.Payload) > 0 && lastSeq != 0 {
        if tcpSeqBeforeEq(tcpSeq, lastSeq) {
            if isDebug {
                logp.Debug("tcp", "Ignoring retransmitted segment. pkt.seq=%v len=%v stream.seq=%v",
                    tcphdr.Seq, len(pkt.Payload), lastSeq)
            }
            return
        }

        switch tcpSeqCompare(lastSeq, tcpStartSeq) {
        case seqLT: // lastSeq < tcpStartSeq => Gap in tcp stream detected
            if created {
                break
            }

            gap := int(tcpStartSeq - lastSeq)
            logp.Debug("tcp", "Gap in tcp stream. last_seq: %d, seq: %d, gap: %d", lastSeq, tcpStartSeq, gap)
            drop := stream.gapInStream(gap)
            if drop {
                if isDebug {
                    logp.Debug("tcp", "Dropping connection state because of gap")
                }
                droppedBecauseOfGaps.Add(1)

                // drop application layer connection state and
                // update stream_id for app layer analysers using stream_id for lookups
                conn.id = tcp.getID()
                conn.data = nil
            }

        case seqGT:
            // lastSeq > tcpStartSeq => overlapping TCP segment detected. shrink packet
            delta := lastSeq - tcpStartSeq

            if isDebug {
                logp.Debug("tcp", "Overlapping tcp segment. last_seq %d, seq: %d, delta: %d",
                    lastSeq, tcpStartSeq, delta)
            }

            pkt.Payload = pkt.Payload[delta:]
            tcphdr.Seq += delta
        }
    }

    conn.lastSeq[stream.dir] = tcpSeq
    stream.addPacket(pkt, tcphdr)
}
AI 代码解读

相关文章
构建离线应用:Apollo与本地状态管理
构建离线应用:Apollo与本地状态管理
RocketMQ5 PopAck源码拆解
分享RocketMQ5.X Pop,Ack源码解读。内容较多建议PC上对照代码查看,手机你可能会晕
270 0
RocketMQ5 PopAck源码拆解
07-rsync企业真实项目备份案例实战(需求收集--服务器配置---客户端配置---报警机制---数据校验---邮件告警)
2.需求描述 客户端需求: 1.客户端每天凌晨1点在服务器本地打包备份(系统配置文件、日志文件、其他目录、应用配置文件) 2.客户端备份的数据必须存放至以主机名IP地址当前时间命名的目录中,例如/backup/nfs_192.168.81.210_2020-05-26,其实更好的备份方式/backup/192.168.81.210/nfs_2020-05-26,一会采用后者,要求不只是备份文件,要求连上级目录一并拷过来
204 0
07-rsync企业真实项目备份案例实战(需求收集--服务器配置---客户端配置---报警机制---数据校验---邮件告警)
【非广告】Gitbook 接入 Gitlab Webhook 功能,实现文档实时在线更新(下)
Hello,大家好,我是阿粉,对接文档是每个开发人员不可避免都要写的,友好的文档可以大大的提升工作效率。阿粉最近将项目的文档基于 Gitbook 和 Gitlab 的 Webhook 功能的在内网部署了一套实时的,使用起来特方便了。跟着阿粉的步骤,教你部署自己的文档服务。
【非广告】Gitbook 接入 Gitlab Webhook 功能,实现文档实时在线更新(下)
【非广告】Gitbook 接入 Gitlab Webhook 功能,实现文档实时在线更新(上)
Hello,大家好,我是阿粉,对接文档是每个开发人员不可避免都要写的,友好的文档可以大大的提升工作效率。阿粉最近将项目的文档基于 Gitbook 和 Gitlab 的 Webhook 功能的在内网部署了一套实时的,使用起来特方便了。跟着阿粉的步骤,教你部署自己的文档服务。
【非广告】Gitbook 接入 Gitlab Webhook 功能,实现文档实时在线更新(上)
数据集成模块流程组件之条件分发介绍
在数据集成的过程中,在一些场景下,需要对上游数据进行分发操作,条件分发组件可对上游数据根据配置条件进行分发,本文将介绍如何进行条件分发组件的配置。
332 0
数据集成模块流程组件之条件分发介绍
flowable 从zip压缩包 部署流程定义
flowable 从zip压缩包 部署流程定义
179 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等