packetbeat包流转流程

简介: packetbeat包流转流程

packetbeat包流转流程

概览

  1. sniffer 从捕获器中捕获包转接给decode的
  2. d‍ecoder.Onpacket输入包,进行以太网层、IP层、传输层解析
  3. 由TCP、UDP接口转发至应用层

包捕获

sniffer Run

func (s *Sniffer) Run() error {
    handle, err := s.open()
    if err != nil {
        return fmt.Errorf("failed to start sniffer: %w", err)
    }
    defer handle.Close()

    var w *pcapgo.Writer
    if s.config.Dumpfile != "" {
        f, err := os.Create(s.config.Dumpfile)
        if err != nil {
            return err
        }
        defer f.Close()

        w = pcapgo.NewWriterNanos(f)
        err = w.WriteFileHeader(65535, handle.LinkType())
        if err != nil {
            return fmt.Errorf("failed to write dump file header to %s: %w", s.config.Dumpfile, err)
        }
    }

    decoder, err := s.decoders(handle.LinkType())
    if err != nil {
        return err
    }

    // Mark inactive sniffer as active. In case of the sniffer/packetbeat closing
    // before/while Run is executed, the state will be snifferClosing.
    // => return if state is already snifferClosing.
    if !s.state.CAS(snifferInactive, snifferActive) {
        return nil
    }
    defer s.state.Store(snifferInactive)

    var packets int
    for s.state.Load() == snifferActive {
        if s.config.OneAtATime {
            fmt.Fprintln(os.Stdout, "Press enter to read packet")
            fmt.Scanln()
        }

        data, ci, err := handle.ReadPacketData()
        if err == pcap.NextErrorTimeoutExpired || isAfpacketErrTimeout(err) { //nolint:errorlint // pcap.NextErrorTimeoutExpired is not wrapped.
            logp.Debug("sniffer", "timedout")
            continue
        }

        if err != nil {
            // ignore EOF, if sniffer was driven from file
            if err == io.EOF && s.config.File != "" { //nolint:errorlint // io.EOF should never be wrapped.
                return nil
            }

            s.state.Store(snifferInactive)
            return fmt.Errorf("sniffing error: %w", err)
        }

        if len(data) == 0 {
            // Empty packet, probably timeout from afpacket.
            continue
        }

        packets++

        if w != nil {
            err = w.WritePacket(ci, data)
            if err != nil {
                return fmt.Errorf("failed to write packet %d: %w", packets, err)
            }
        }

        logp.Debug("sniffer", "Packet number: %d", packets)
        // 解析包
        decoder.OnPacket(data, &ci)
    }

    return nil
}

decoder OnPacket

func (d *Decoder) OnPacket(data []byte, ci *gopacket.CaptureInfo) {
    defer logp.Recover("packet decoding failed")

    d.truncated = false

    current := d.linkLayerDecoder
    currentType := d.linkLayerType

    packet := protos.Packet{Ts: ci.Timestamp}

    debugf("decode packet data")
    processed := false

    if d.flowID != nil {
        d.flowID.Reset(d.flowIDBufferBacking[:0])

        // suppress flow stats snapshots while processing packet
        d.flows.Lock()
        defer d.flows.Unlock()
    }

    for len(data) > 0 {
        err := current.DecodeFromBytes(data, d)
        if err != nil {
            logp.Info("packet decode failed with: %v", err)
            break
        }

        nextType := current.NextLayerType()
        data = current.LayerPayload()
        // 分发
        processed, err = d.process(&packet, currentType)
        if err != nil {
            logp.Info("Error processing packet: %v", err)
            break
        }
        if processed {
            break
        }

        // choose next decoding layer
        next, ok := d.decoders[nextType]
        if !ok {
            break
        }

        // jump to next layer
        current = next
        currentType = nextType
    }

    // add flow s.tats
    if d.flowID != nil {
        debugf("flow id flags: %v", d.flowID.Flags())
    }

    if d.flowID != nil && d.flowID.Flags() != 0 {
        flow := d.flows.Get(d.flowID)
        d.statPackets.Add(flow, 1)
        d.statBytes.Add(flow, uint64(ci.Length))
    }
}

decoder process

func (d *Decoder) process(
    packet *protos.Packet,
    layerType gopacket.LayerType,
) (bool, error) {
    withFlow := d.flowID != nil

    switch layerType {
    case layers.LayerTypeEthernet:
        if withFlow {
            d.flowID.AddEth(d.eth.SrcMAC, d.eth.DstMAC)
        }

    case layers.LayerTypeDot1Q:
        d1q := &d.d1q[d.stD1Q.i]
        d.stD1Q.next()
        if withFlow {
            d.flowID.AddVLan(d1q.VLANIdentifier)
        }

    case layers.LayerTypeIPv4:
        debugf("IPv4 packet")
        ip4 := &d.ip4[d.stIP4.i]
        d.stIP4.next()

        if withFlow {
            d.flowID.AddIPv4(ip4.SrcIP, ip4.DstIP)
        }

        packet.Tuple.SrcIP = ip4.SrcIP
        packet.Tuple.DstIP = ip4.DstIP
        packet.Tuple.IPLength = 4

    case layers.LayerTypeIPv6:
        debugf("IPv6 packet")
        ip6 := &d.ip6[d.stIP6.i]
        d.stIP6.next()

        if withFlow {
            d.flowID.AddIPv6(ip6.SrcIP, ip6.DstIP)
        }

        packet.Tuple.SrcIP = ip6.SrcIP
        packet.Tuple.DstIP = ip6.DstIP
        packet.Tuple.IPLength = 16

    case layers.LayerTypeICMPv4:
        debugf("ICMPv4 packet")
        d.onICMPv4(packet)
        return true, nil

    case layers.LayerTypeICMPv6:
        debugf("ICMPv6 packet")
        d.onICMPv6(packet)
        return true, nil

    case layers.LayerTypeUDP:
        debugf("UDP packet")
        d.onUDP(packet)
        return true, nil

    case layers.LayerTypeTCP:
        debugf("TCP packet")
        d.onTCP(packet)
        return true, nil
    }

    return false, nil
}

TCP

onTcp

func (d *Decoder) onTCP(packet *protos.Packet) {
    src := uint16(d.tcp.SrcPort)
    dst := uint16(d.tcp.DstPort)

    id := d.flowID
    if id != nil {
        id.AddTCP(src, dst)
    }

    packet.Tuple.SrcPort = src
    packet.Tuple.DstPort = dst
    packet.Payload = d.tcp.Payload

    if id == nil && len(packet.Payload) == 0 && !d.tcp.FIN {
        // We have no use for this atm.
        debugf("Ignore empty non-FIN packet")
        return
    }
    packet.Tuple.ComputeHashables()
    d.tcpProc.Process(id, &d.tcp, packet)
}

tcp Process

func (tcp *TCP) Process(id *flows.FlowID, tcphdr *layers.TCP, pkt *protos.Packet) {
    // This Recover should catch all exceptions in
    // protocol modules.
    defer logp.Recover("Process tcp exception")

    tcp.expiredConns.notifyAll()

    stream, created := tcp.getStream(pkt)
    if stream.conn == nil {
        return
    }

    conn := stream.conn
    if id != nil {
        id.AddConnectionID(uint64(conn.id))
    }

    if isDebug {
        logp.Debug("tcp", "tcp flow id: %p", id)
    }

    if len(pkt.Payload) == 0 && !tcphdr.FIN {
        // return early if packet is not interesting. Still need to find/create
        // stream first in order to update the TCP stream timer
        return
    }

    tcpStartSeq := tcphdr.Seq
    tcpSeq := tcpStartSeq + uint32(len(pkt.Payload))
    lastSeq := conn.lastSeq[stream.dir]
    if isDebug {
        logp.Debug("tcp", "pkt.start_seq=%v pkt.last_seq=%v stream.last_seq=%v (len=%d)",
            tcpStartSeq, tcpSeq, lastSeq, len(pkt.Payload))
    }

    if len(pkt.Payload) > 0 && lastSeq != 0 {
        if tcpSeqBeforeEq(tcpSeq, lastSeq) {
            if isDebug {
                logp.Debug("tcp", "Ignoring retransmitted segment. pkt.seq=%v len=%v stream.seq=%v",
                    tcphdr.Seq, len(pkt.Payload), lastSeq)
            }
            return
        }

        switch tcpSeqCompare(lastSeq, tcpStartSeq) {
        case seqLT: // lastSeq < tcpStartSeq => Gap in tcp stream detected
            if created {
                break
            }

            gap := int(tcpStartSeq - lastSeq)
            logp.Debug("tcp", "Gap in tcp stream. last_seq: %d, seq: %d, gap: %d", lastSeq, tcpStartSeq, gap)
            drop := stream.gapInStream(gap)
            if drop {
                if isDebug {
                    logp.Debug("tcp", "Dropping connection state because of gap")
                }
                droppedBecauseOfGaps.Add(1)

                // drop application layer connection state and
                // update stream_id for app layer analysers using stream_id for lookups
                conn.id = tcp.getID()
                conn.data = nil
            }

        case seqGT:
            // lastSeq > tcpStartSeq => overlapping TCP segment detected. shrink packet
            delta := lastSeq - tcpStartSeq

            if isDebug {
                logp.Debug("tcp", "Overlapping tcp segment. last_seq %d, seq: %d, delta: %d",
                    lastSeq, tcpStartSeq, delta)
            }

            pkt.Payload = pkt.Payload[delta:]
            tcphdr.Seq += delta
        }
    }

    conn.lastSeq[stream.dir] = tcpSeq
    stream.addPacket(pkt, tcphdr)
}

相关文章
|
8月前
修正flowable的发起流程中根据用户信息流转不同的流程
修正flowable的发起流程中根据用户信息流转不同的流程
84 0
|
前端开发 Java Spring
源码浅析SpringMVC请求的流转过程
Spring MVC框架使用了其”模型-视图-控制器”( Model-View-Controller )架构方式,用于开发灵活且松散耦合的 Web 应用程序。我们都使用过SpringMVC来处理信息,并渲染视图到Browser。但需要注意的是,在现在的架构中,大都采用了前后端分离的情况,而我们在使用SpringMVC的时候,只需要关注M(Model),C(Controller)这两个部分,而视图渲染的部分则交给了前端。
330 0
源码浅析SpringMVC请求的流转过程
|
数据采集 分布式计算 监控
网站流量日志分析--工作流调度--预处理调度--程序打包 job 编写 | 学习笔记
快速学习网站流量日志分析--工作流调度--预处理调度--程序打包 job 编写
网站流量日志分析--工作流调度--预处理调度--程序打包 job 编写 | 学习笔记
|
数据采集 资源调度 监控
网站流量日志分析--工作流调度--预处理调度--功能实现 | 学习笔记
快速学习网站流量日志分析--工作流调度--预处理调度--功能实现
网站流量日志分析--工作流调度--预处理调度--功能实现 | 学习笔记
|
XML 移动开发 网络协议
mPaas-H5离线包常用排查工具
工欲善其事,必先利其器。应用开发和问题排查的方法论完全不一样,应用开发强调的是从无到有的构建方法,而问题排查强调的是抽丝剥茧的分析之法。问题排查的基础是建立在对程序”预期行为“的理解和掌握上,任何偏离预期的行为都是解决问题的线索。程序的”行为“活动除了问题本身的症状表象,更多的内容则记录和体现在内部或外部日志中,从日志中观察异常行为,再作出合理推测是排查过程的基本要素。在离线包相关问题的排查中,内部日志主要是app控制台日志,H5页面控制台日志;外部日志主要包括:HTTP应用层网络包和TCP层网络包。根据症状正确地使用工具去捕获到合适的日志是问题分析的基石。这一章节主要介绍三个实用工具的具体使
776 0
mPaas-H5离线包常用排查工具
|
NoSQL 数据可视化 关系型数据库
Swoole Tracker v3.3.0 版本发布,支持链路追踪上报到 Zipkin
Tracker 此版本修改为了 Zend 扩展,所以需要通过zend_extension=swoole_tracker的方式进行加载
576 0
Swoole Tracker v3.3.0 版本发布,支持链路追踪上报到 Zipkin
|
JSON 数据格式
数据集成模块流程组件之条件分发介绍
在数据集成的过程中,在一些场景下,需要对上游数据进行分发操作,条件分发组件可对上游数据根据配置条件进行分发,本文将介绍如何进行条件分发组件的配置。
332 0
数据集成模块流程组件之条件分发介绍
|
移动开发 编解码 缓存
mPaas H5离线包优化指南
在移动互联网时代的今天,市场上绝大多数终端App都在使用H5展示页面,且随着终端技术迭代更新和市场多变性,H5页面在App中的占比越来越重要。同时也暴露出一个所有App的共性问题,即性能优化。同样的H5页面的性能优化也是重点问题。 在mPaaS团队中虽然已将H5页面资源等打包做离线包了,但在复杂的客户环境、开发环境、市场环境下,客户端的H5离线包仍有性能优化问题,这里整理简单了集团下对H5离线包的优化策略方案,以供参考。
2879 0
mPaas H5离线包优化指南
|
Apache
Apache SkyWalking 告警动态配置源码简析
AlarmModuleProvider实现了ModuleProvider接口,通过SPI的方式被加载进来。AlarmModuleProvider的prepare方法先被调用,做一些预处理:
313 0
|
存储 算法 搜索推荐
一文了解EPaxos核心协议流程
EPaxos(Egalitarian Paxos)作为工业界备受瞩目的下一代分布式一致性算法,具有广阔的应用前景。但纵观业内,至今仍未出现一个EPaxos的工程实现,甚至都没看到一篇能把EPaxos讲得通俗一点的文章。EPaxos算法理论虽好,但由于其实在晦涩难懂,工程实现上也有很多挑战,实际应用落地尚未成熟。本文将用通俗易懂的语言介绍EPaxos算法,由浅入深、一步一步的让只有Paxos或Raft等分布式一致性算法基础的同学都能轻易看懂EPaxos,真正将晦涩难懂的EPaxos,变的平易近人,带入千万家。
一文了解EPaxos核心协议流程