gopacket tcpassembly源码分析

简介: gopacket tcpassembly源码分析

调用

  参考示例example/httpassembly

  1. 自定义一个factory,实现New接口
// httpStream decodes the HTTP requests carried by one TCP stream.
// net and transport identify the flow; r is the reader stream that the run
// goroutine consumes and that New returns as the tcpassembly.Stream.
type httpStream struct {
    net, transport gopacket.Flow
    r              tcpreader.ReaderStream
}
// New is the factory method a custom stream factory must implement. It is
// shown here as a signature only; the complete body appears in the listing
// that follows.
func (h *httpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
}
  1. New接口保存一个tcpreader.NewReaderStream()流,启动处理流的协程,然后返回这个流

// New creates the httpStream for a new flow pair, starts the goroutine that
// drains it, and returns the embedded reader stream to the assembler.
func (h *httpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
    s := new(httpStream)
    s.net, s.transport = net, transport
    s.r = tcpreader.NewReaderStream()

    // The reader stream must always be consumed, so start the reader before
    // handing the stream out.
    go s.run()

    // ReaderStream implements tcpassembly.Stream, so a pointer to it is what
    // the assembler receives.
    return &s.r
}
  1. 流处理协程,建一个buf,从这个buf中读取数据,然后重组解析
// run reads HTTP requests from the reassembled stream until EOF, logging each
// request together with the size of its discarded body.
func (h *httpStream) run() {
    reader := bufio.NewReader(&h.r)
    for {
        req, err := http.ReadRequest(reader)
        switch {
        case err == io.EOF:
            // Reading all the way to EOF is required before returning.
            return
        case err != nil:
            log.Println("Error reading stream", h.net, h.transport, ":", err)
        default:
            n := tcpreader.DiscardBytesToEOF(req.Body)
            req.Body.Close()
            log.Println("Received request from stream", h.net, h.transport, ":", req, "with", n, "bytes in request body")
        }
    }
}
  1. 使用,和reassembly一样
// main captures packets from a live interface, feeds every TCP packet to a
// tcpassembly.Assembler, and flushes stale connections on each one-second
// tick.
func main() {
    defer util.Run()()
    // 1. Open the capture device.
    var handle *pcap.Handle
    var err error
    handle, err = pcap.OpenLive(*iface, int32(*snaplen), true, pcap.BlockForever)
    if err != nil {
        log.Fatal(err)
    }
    // Install the BPF filter.
    if err := handle.SetBPFFilter(*filter); err != nil {
        log.Fatal(err)
    }

    // 2. Set up the assembler: factory -> stream pool -> assembler.
    streamFactory := &httpStreamFactory{}
    streamPool := tcpassembly.NewStreamPool(streamFactory)
    assembler := tcpassembly.NewAssembler(streamPool)

    log.Println("reading in packets")
    // 3. Set up the packet source.
    packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
    packets := packetSource.Packets()
    ticker := time.Tick(time.Second)
    for {
        select {
        // 4. Read the next packet.
        case packet := <-packets:
            // A nil packet indicates the end of a pcap file.
            if packet == nil {
                return
            }
            if *logAllPackets {
                log.Println(packet)
            }
            if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || packet.TransportLayer().LayerType() != layers.LayerTypeTCP {
                log.Println("Unusable packet")
                continue
            }
            tcp := packet.TransportLayer().(*layers.TCP)
            // 5. Hand the TCP layer straight to the assembler.
            assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)

        case <-ticker:
            // 6. On every tick, flush connections older than two minutes.
            assembler.FlushOlderThan(time.Now().Add(time.Minute * -2))
        }
    }
}

Assembler

// AssemblerOptions holds the buffering limits applied by an Assembler.
type AssemblerOptions struct {
    // MaxBufferedPagesTotal is an upper limit on the total number of pages
    // buffered while waiting for out-of-order packets. Once the limit is
    // reached, the assembler degrades to flushing each connection it gets a
    // packet for. Ignored when <= 0.
    MaxBufferedPagesTotal int
    // MaxBufferedPagesPerConnection is an upper limit on the number of pages
    // buffered for a single connection. When the limit is reached, the
    // smallest sequence number is flushed along with any contiguous data.
    // Ignored when <= 0.
    MaxBufferedPagesPerConnection int
}

// Assembler reassembles TCP packets into per-connection streams.
type Assembler struct {
    AssemblerOptions    // embedded buffering limits
    ret      []Reassembly    // reassembled chunks collected during one AssembleWithTimestamp call
    // pc supplies the pages used to buffer out-of-order data.
    pc       *pageCache
    // connPool is the stream pool passed to NewAssembler.
    connPool *StreamPool
}

// NewAssembler creates an Assembler that uses pool for its connections and
// registers itself as one more user of that pool. The pool may be shared
// between assemblers.
//
// The new Assembler starts with DefaultAssemblerOptions (per the original
// note: no limits).
// NOTE(review): the original author suggests reworking this with the
// functional-options pattern plus defaults.
func NewAssembler(pool *StreamPool) *Assembler {
    pool.mu.Lock()
    pool.users++
    pool.mu.Unlock()

    a := &Assembler{AssemblerOptions: DefaultAssemblerOptions}
    a.connPool = pool
    a.ret = make([]Reassembly, assemblerReturnValueInitialSize)
    a.pc = newPageCache()
    return a
}

AssembleWithTimestamp

func (a *Assembler) AssembleWithTimestamp(netFlow gopacket.Flow, t *layers.TCP, timestamp time.Time) {
    // 忽略空的数据包,比如keepalived
    // tcp握手时 t.SYN = 1 t.FIN = 0 t.RST = 0 len(t.LayerPayload()) == 0 
    // 即false && true && true && true
    // tcp挥手时 t.SYN = 0 t.FIN = 1 t.RST = 0 len(t.LayerPayload()) == 0 
    // 即true && false && true && true
    if !t.SYN && !t.FIN && !t.RST && len(t.LayerPayload()) == 0 {
        if *debugLog {
            log.Println("ignoring useless packet")
        }
        return
    }

    a.ret = a.ret[:0]
    // 4元组组成的key
    key := key{netFlow, t.TransportFlow()}

    var conn *connection
    // This for loop handles a race condition where a connection will close, lock
    // the connection pool, and remove itself, but before it locked the connection
    // pool it's returned to another Assemble statement.  This should loop 0-1
    // times for the VAST majority of cases.
    // 创建conn
    for {
        // tcp keepalive syn=0 payload=0
        // 即 true && true end 为true?
        conn = a.connPool.getConnection(key, !t.SYN && len(t.LayerPayload()) == 0, timestamp)
        if conn == nil {
            if *debugLog {
                log.Printf("%v got empty packet on otherwise empty connection", key)
            }
            return
        }
        conn.mu.Lock()
        if !conn.closed {
            break
        }
        conn.mu.Unlock()
    }
    if conn.lastSeen.Before(timestamp) {
        conn.lastSeen = timestamp
    }
    //type Sequence int64 提供Difference和Add函数
    seq, bytes := Sequence(t.Seq), t.Payload // seq:当前序号  bytes:tcp负载的数据
    // 校验序号
    if conn.nextSeq == invalidSequence {
        if t.SYN {
            if *debugLog {
                log.Printf("%v saw first SYN packet, returning immediately, seq=%v", key, seq)
            }
            // 添加 Reassembly重组后的对象
            a.ret = append(a.ret, Reassembly{
                Bytes: bytes,
                Skip:  0,
                Start: true,
                Seen:  timestamp,
            })
            // 下一个包的序号 = 当前的序号 + 字节数 + 1
            conn.nextSeq = seq.Add(len(bytes) + 1)
        } else {
            if *debugLog {
                log.Printf("%v waiting for start, storing into connection", key)
            }
            // 插入到数据到connection中
            a.insertIntoConn(t, conn, timestamp)
        }
    } else if diff := conn.nextSeq.Difference(seq); diff > 0 {
        if *debugLog {
            log.Printf("%v gap in sequence numbers (%v, %v) diff %v, storing into connection", key, conn.nextSeq, seq, diff)
        }
        // 插入到数据到connection中
        a.insertIntoConn(t, conn, timestamp)
    } else {=<0
        // 字节校准
        bytes, conn.nextSeq = byteSpan(conn.nextSeq, seq, bytes)
        if *debugLog {
            log.Printf("%v found contiguous data (%v, %v), returning immediately", key, seq, conn.nextSeq)
        }
        a.ret = append(a.ret, Reassembly{
            Bytes: bytes,
            Skip:  0,
            End:   t.RST || t.FIN,
            Seen:  timestamp,
        })
    }
    if len(a.ret) > 0 {
        a.sendToConnection(conn)
    }
    conn.mu.Unlock()
}

insertIntoConn

// insertIntoConn copies the packet's payload into pages, links those pages
// into the connection's list at the position given by the packet's sequence
// number, and pops the first buffered page when a buffering limit is reached.
func (a *Assembler) insertIntoConn(t *layers.TCP, conn *connection, ts time.Time) {
    if conn.first != nil && conn.first.seq == conn.nextSeq {
        panic("wtf")
    }
    // head/tail are the first and last page built from this packet.
    head, tail, added := a.pagesFromTCP(t, ts)

    // Find the neighbours between which the new pages belong.
    before, after := conn.traverseConn(Sequence(t.Seq))
    conn.pushBetween(before, after, head, tail)
    conn.pages += added

    // A limit is only enforced when it is configured as a positive number.
    perConnHit := a.MaxBufferedPagesPerConnection > 0 && conn.pages >= a.MaxBufferedPagesPerConnection
    totalHit := a.MaxBufferedPagesTotal > 0 && a.pc.used >= a.MaxBufferedPagesTotal
    if !perConnHit && !totalHit {
        return
    }
    if *debugLog {
        log.Printf("%v hit max buffer size: %+v, %v, %v", conn.key, a.AssemblerOptions, conn.pages, a.pc.used)
    }
    a.addNextFromConn(conn)
}

pagesFromTCP

  从TCP数据包创建一个page(或一组page)。

  注意此函数不应该接受SYN包,因为它不能正确处理seq。

  返回双向page链表中的第一个和最后一个page。

// pagesFromTCP copies the packet's payload into one or more pages taken from
// the page cache and links them into a doubly linked chain.
//
// It returns the first page, the last page, and the number of pages used.
// At least one page is always produced, even for an empty payload.
func (a *Assembler) pagesFromTCP(t *layers.TCP, ts time.Time) (p, p2 *page, numPages int) {
    seq, rest := Sequence(t.Seq), t.Payload
    head := a.pc.next(ts)
    tail := head
    numPages = 1
    for {
        n := min(len(rest), pageBytes)
        // Copy this page's share of the payload and record where it starts.
        tail.Bytes = tail.buf[:n]
        copy(tail.Bytes, rest)
        tail.seq = seq

        rest = rest[n:]
        if len(rest) == 0 {
            break
        }
        // The payload is larger than one page. Per the original note this is
        // rare, because the MTU normally keeps TCP segments smaller.
        seq = seq.Add(n)
        following := a.pc.next(ts)
        following.prev = tail
        tail.next = following
        tail = following
        numPages++
    }
    // Only the last page carries the end-of-stream marker.
    tail.End = t.RST || t.FIN
    return head, tail, numPages
}

addNextFromConn

  弹出第一页

// addNextFromConn pops the first buffered page off the connection, appends
// its Reassembly to a.ret, and returns the page to the page cache.
// The caller must make sure conn.first is not nil.
func (a *Assembler) addNextFromConn(conn *connection) {
    head := conn.first
    if conn.nextSeq == invalidSequence {
        // No start was seen, so the number of skipped bytes is unknown.
        head.Skip = -1
    } else if gap := conn.nextSeq.Difference(head.seq); gap > 0 {
        head.Skip = int(gap)
    }
    head.Bytes, conn.nextSeq = byteSpan(conn.nextSeq, head.seq, head.Bytes)
    if *debugLog {
        log.Printf("%v   adding from conn (%v, %v)", conn.key, head.seq, conn.nextSeq)
    }
    a.ret = append(a.ret, head.Reassembly)
    a.pc.replace(head)
    if head == conn.last {
        // That was the only page: the list is now empty.
        conn.first, conn.last = nil, nil
    } else {
        conn.first = head.next
        conn.first.prev = nil
    }
    conn.pages--
}

sendToConnection

// sendToConnection appends any buffered pages that have become contiguous,
// delivers everything in a.ret to the connection's stream, and closes the
// connection when the last delivered chunk is marked End.
// The caller must make sure a.ret is not empty.
func (a *Assembler) sendToConnection(conn *connection) {
    a.addContiguous(conn)
    if conn.stream == nil {
        panic("why?")
    }
    conn.stream.Reassembled(a.ret)
    if last := len(a.ret) - 1; a.ret[last].End {
        a.closeConnection(conn)
    }
}

addContiguous

// addContiguous keeps popping the first buffered page for as long as that
// page starts at or before the connection's next expected sequence number.
func (a *Assembler) addContiguous(conn *connection) {
    for {
        if conn.first == nil || conn.nextSeq.Difference(conn.first.seq) > 0 {
            return
        }
        a.addNextFromConn(conn)
    }
}

addNextFromConn

  弹出第一页添加到数组中

// addNextFromConn pops the first buffered page off the connection, appends
// its Reassembly to a.ret, and returns the page to the page cache.
// The caller must make sure conn.first is not nil.
func (a *Assembler) addNextFromConn(conn *connection) {
    head := conn.first
    if conn.nextSeq == invalidSequence {
        // No start was seen, so the number of skipped bytes is unknown.
        head.Skip = -1
    } else if gap := conn.nextSeq.Difference(head.seq); gap > 0 {
        head.Skip = int(gap)
    }
    head.Bytes, conn.nextSeq = byteSpan(conn.nextSeq, head.seq, head.Bytes)
    if *debugLog {
        log.Printf("%v   adding from conn (%v, %v)", conn.key, head.seq, conn.nextSeq)
    }
    a.ret = append(a.ret, head.Reassembly)
    a.pc.replace(head)
    if head == conn.last {
        // That was the only page: the list is now empty.
        conn.first, conn.last = nil, nil
    } else {
        conn.first = head.next
        conn.first.prev = nil
    }
    conn.pages--
}

closeConnection

// closeConnection tells the stream that reassembly is complete, marks the
// connection closed, removes it from the pool, and hands every page still
// buffered on it back to the page cache.
func (a *Assembler) closeConnection(conn *connection) {
    if *debugLog {
        log.Printf("%v closing", conn.key)
    }
    conn.stream.ReassemblyComplete()
    conn.closed = true
    a.connPool.remove(conn)

    pg := conn.first
    for pg != nil {
        a.pc.replace(pg)
        pg = pg.next
    }
}

StreamPool

  管理流的连接池,初始连接池分配1024个

// StreamPool stores the connections of one or more Assemblers and recycles
// connection structs through a free list.
type StreamPool struct {
    // conns maps a flow key to its live connection.
    conns              map[key]*connection
    // users is incremented by NewAssembler for each assembler using the pool.
    users              int
    // mu is held around accesses to conns and free.
    mu                 sync.RWMutex
    // factory creates the Stream for each new connection.
    factory            StreamFactory
    // free holds connection structs available for reuse.
    free               []*connection
    // all keeps every batch of connections allocated by grow.
    all                [][]connection
    // nextAlloc is the size of the next batch; grow doubles it each time.
    nextAlloc          int
    // newConnectionCount counts newConnection calls while memLog is enabled.
    newConnectionCount int64
}

// NewStreamPool creates a StreamPool that builds the stream for each new
// connection with factory. The connection map and the free list are
// pre-sized to initialAllocSize, which is also the first batch size.
func NewStreamPool(factory StreamFactory) *StreamPool {
    pool := new(StreamPool)
    pool.factory = factory
    pool.nextAlloc = initialAllocSize
    pool.conns = make(map[key]*connection, initialAllocSize)
    pool.free = make([]*connection, 0, initialAllocSize)
    return pool
}

grow

  分配连接

// grow allocates a batch of nextAlloc connections, puts every one of them on
// the free list, and doubles the size of the next batch.
func (p *StreamPool) grow() {
    batch := make([]connection, p.nextAlloc)
    p.all = append(p.all, batch)
    for i := 0; i < len(batch); i++ {
        p.free = append(p.free, &batch[i])
    }
    if *memLog {
        log.Println("StreamPool: created", p.nextAlloc, "new connections")
    }
    p.nextAlloc *= 2
}

newConnection

  创建连接

// newConnection takes a connection struct off the free list, growing the
// pool first when the list is empty, and resets it for key k, stream s and
// creation time ts.
func (p *StreamPool) newConnection(k key, s Stream, ts time.Time) (c *connection) {
    if *memLog {
        p.newConnectionCount++
        // Log pool statistics once every 0x8000 requests.
        if p.newConnectionCount&0x7FFF == 0 {
            log.Println("StreamPool:", p.newConnectionCount, "requests,", len(p.conns), "used,", len(p.free), "free")
        }
    }
    if len(p.free) == 0 {
        p.grow()
    }
    // Pop from the end of the free list.
    last := len(p.free) - 1
    c = p.free[last]
    p.free = p.free[:last]
    c.reset(k, s, ts)
    return c
}

getConnection

// getConnection returns the connection registered for k.
//
// When no connection exists and end is false, a new stream is created
// through the factory and a new connection is registered and returned.
// When no connection exists and end is true, nil is returned.
func (p *StreamPool) getConnection(k key, end bool, ts time.Time) *connection {
    p.mu.RLock()
    conn := p.conns[k]
    p.mu.RUnlock()
    if end || conn != nil {
        return conn
    }
    // The factory is called without holding the pool lock.
    s := p.factory.New(k[0], k[1])
    p.mu.Lock()
    defer p.mu.Unlock()
    // Another caller may have registered k between the RUnlock above and this
    // Lock. Check before taking a connection off the free list, so the losing
    // side of that race does not strand a pooled connection.
    // NOTE(review): the stream s created above is still discarded on this
    // path; confirm whether it should be completed/closed.
    if existing := p.conns[k]; existing != nil {
        return existing
    }
    conn = p.newConnection(k, s, ts)
    p.conns[k] = conn
    return conn
}

remove

  删除某个连接

// remove unregisters conn from the pool and puts the struct back on the free
// list for reuse.
func (p *StreamPool) remove(conn *connection) {
    p.mu.Lock()
    defer p.mu.Unlock()

    delete(p.conns, conn.key)
    p.free = append(p.free, conn)
}

connections

  返回所有的连接

// connections returns a snapshot slice of every connection currently
// registered in the pool.
func (p *StreamPool) connections() []*connection {
    p.mu.RLock()
    defer p.mu.RUnlock()

    snapshot := make([]*connection, 0, len(p.conns))
    for k := range p.conns {
        snapshot = append(snapshot, p.conns[k])
    }
    return snapshot
}

connection

// connection holds the reassembly state of one direction of a TCP flow.
type connection struct {
    // key identifies the flow this connection belongs to.
    key               key
    // pages is the number of pages currently buffered.
    pages             int
    // first and last are the ends of the doubly linked list of buffered pages.
    first, last       *page
    // nextSeq is the next expected sequence number, or invalidSequence while
    // none has been established.
    nextSeq           Sequence
    // created is set by reset; lastSeen is advanced by AssembleWithTimestamp.
    created, lastSeen time.Time
    // stream receives the reassembled data.
    stream            Stream
    // closed is set by closeConnection.
    closed            bool
    // mu is held by AssembleWithTimestamp while it works on the connection.
    mu                sync.Mutex
}

reset

  因为连接是预先分配的,所以需要重置,相当于初始化

// reset initialises a pooled connection for reuse with key k, stream s and
// creation time ts. Connections are pre-allocated and recycled, so every
// field describing the previous occupant has to be overwritten here.
func (c *connection) reset(k key, s Stream, ts time.Time) {
    c.key = k
    c.pages = 0
    c.first, c.last = nil, nil
    c.nextSeq = invalidSequence
    c.created = ts
    // Also reset lastSeen. AssembleWithTimestamp only ever moves it forward,
    // so without this a recycled connection would keep the previous
    // occupant's timestamp whenever that one is later than ts.
    c.lastSeen = ts
    c.stream = s
    c.closed = false
}

traverseConn

  遍历双向链接page列表获取正确的放置给定序号的位置。

  注意它是向后遍历的,从最大的序号开始往前查找,因为我们假设常见的情况是同一个流的TCP数据包按顺序到达,丢包或乱序很少。

  遍历双向链接的page列表,以找到放置给定序号的正确位置。

  注意,它是向后遍历的,从最高的序号开始,一直向下,因为我们假设通常的情况是,流的TCP包将按顺序出现,损失或包重排序最小。

// traverseConn finds where a page with sequence number seq belongs in the
// connection's page list. It walks backwards from the last page and returns
// the two neighbours the new page should sit between; either may be nil.
func (c *connection) traverseConn(seq Sequence) (prev, current *page) {
    for prev = c.last; prev != nil; prev = prev.prev {
        if prev.seq.Difference(seq) >= 0 {
            break
        }
        current = prev
    }
    return prev, current
}

pushBetween

  pushBetween把双向链表first-…-last插入到另一个双向链表中prev与next两个节点之间。如果prev为nil,则first成为连接page链表的新首页;如果next为nil,则last成为新的末页。first和last可以指向同一个page。

// pushBetween splices the chain first..last into the connection's page list
// between prev and next. With no next (or an empty list) last becomes the
// list's last page; with no prev (or an empty list) first becomes its first
// page. first and last may be the same page.
func (c *connection) pushBetween(prev, next, first, last *page) {
    // Tail side.
    if next != nil && c.last != nil {
        last.next = next
        next.prev = last
    } else {
        c.last = last
    }
    // Head side.
    if prev != nil && c.first != nil {
        first.prev = prev
        prev.next = first
    } else {
        c.first = first
    }
}

Reassembly

  Reassembly由assembler传入stream中。

// Reassembly is one chunk of reassembled data passed from the assembler to a
// stream.
type Reassembly struct {
    // Bytes is the TCP payload; it may be empty.
    Bytes []byte
    // Skip is non-zero when bytes were skipped between this Reassembly and
    // the previous one. It is -1 for the first chunk of a connection whose
    // start was not seen, because the number of skipped bytes is unknown;
    // otherwise it is the number of bytes skipped.
    Skip int
    // Start is set when this chunk came with a TCP SYN.
    Start bool
    // End is set when this chunk came with a TCP FIN or RST.
    End bool
    // Seen is the timestamp at which this chunk was pulled off the wire.
    Seen time.Time
}

pageCache

  pageCache是一个非并发安全的page对象缓存,用来尽量避免内存分配。它只会增长,不会收缩。

// pageCache hands out page objects from pre-allocated batches and takes them
// back for reuse.
type pageCache struct {
    // free holds the pages available to hand out.
    free         []*page
    // pcSize is the size of the next batch; grow doubles it each time.
    pcSize       int
    // size is the total number of pages allocated; used is the number
    // currently handed out.
    size, used   int
    // pages keeps every batch allocated by grow.
    pages        [][]page
    // pageRequests counts next calls while memLog is enabled.
    pageRequests int64
}
// newPageCache creates a pageCache and allocates its first batch of
// initialAllocSize pages.
func newPageCache() *pageCache {
    cache := new(pageCache)
    cache.pcSize = initialAllocSize
    cache.free = make([]*page, 0, initialAllocSize)
    cache.grow()
    return cache
}

grow

  分配空间

// grow allocates a batch of pcSize pages, adds every one of them to the free
// list, and doubles the size of the next batch.
func (c *pageCache) grow() {
    batch := make([]page, c.pcSize)
    c.pages = append(c.pages, batch)
    c.size += c.pcSize
    for i := 0; i < len(batch); i++ {
        c.free = append(c.free, &batch[i])
    }
    if *memLog {
        log.Println("PageCache: created", c.pcSize, "new pages")
    }
    c.pcSize *= 2
}

next

  返回一个干净、可直接使用的page对象

// next takes a page off the free list, growing the cache first when the list
// is empty, and returns it with its links cleared and its Reassembly reset
// to an empty payload seen at ts.
func (c *pageCache) next(ts time.Time) (p *page) {
    if *memLog {
        c.pageRequests++
        // Log cache statistics once every 0x10000 requests.
        if c.pageRequests&0xFFFF == 0 {
            log.Println("PageCache:", c.pageRequests, "requested,", c.used, "used,", len(c.free), "free")
        }
    }
    if len(c.free) == 0 {
        c.grow()
    }
    // Pop from the end of the free list.
    last := len(c.free) - 1
    p = c.free[last]
    c.free = c.free[:last]

    p.prev, p.next = nil, nil
    p.Reassembly = Reassembly{Bytes: p.buf[:0], Seen: ts}
    c.used++
    return p
}

replace

  把一个page归还到pageCache中(放回空闲列表)

// replace returns page p to the cache: it goes back on the free list and the
// count of pages in use drops by one.
func (c *pageCache) replace(p *page) {
    c.free = append(c.free, p)
    c.used--
}

page

  用来存储未处理的TCP数据包(无序包)。

  未使用的page存储在页面缓存中,并从pageCache中返回,避免内存分配。

  使用的page存储在connection中的双向链表中。

// page stores TCP payload that has been buffered and not yet delivered.
// Unused pages live in the pageCache; used pages are linked into a
// connection's doubly linked list.
type page struct {
    // Reassembly is the chunk delivered to the stream; its Bytes slice
    // points into buf.
    Reassembly
    // seq is the sequence number of the first byte stored in this page.
    seq        Sequence
    // NOTE(review): index is not referenced in the code shown here.
    index      int
    // prev and next link the page into a connection's list.
    prev, next *page
    // buf is the fixed-size backing storage for the payload.
    buf        [pageBytes]byte
}

byteSpan

// byteSpan trims bytes, which start at sequence number received, against the
// sequence number that is expected next.
//
// It returns the part of bytes that still has to be delivered, together with
// the sequence number expected afterwards:
//   - nothing expected yet, or received is not behind expected: all of bytes;
//   - bytes lies entirely before expected: nil, and expected is unchanged;
//   - otherwise: bytes without the leading part that precedes expected.
func byteSpan(expected, received Sequence, bytes []byte) (toSend []byte, next Sequence) {
    if expected != invalidSequence {
        overlap := int(received.Difference(expected))
        switch {
        case overlap > len(bytes):
            return nil, expected
        case overlap > 0:
            return bytes[overlap:], expected.Add(len(bytes) - overlap)
        }
    }
    return bytes, received.Add(len(bytes))
}

  ‍

相关文章
|
JSON Kubernetes 数据格式
K8S client-go Patch example
我在本文中主要会介绍使用client-go的Patch方式,主要包括strategic merge patch和json-patch
|
3月前
|
存储 人工智能 安全
go sync.Map 设计与实现
go sync.Map 设计与实现
35 1
|
3月前
|
存储 缓存 安全
go sync.Pool 设计与实现
go sync.Pool 设计与实现
37 2
|
3月前
|
缓存 安全 测试技术
深入理解 go sync.Map - 基本原理
深入理解 go sync.Map - 基本原理
35 0
|
3月前
|
安全 编译器 数据库连接
深入理解 go sync.Once
深入理解 go sync.Once
38 0
|
11月前
|
存储 缓存 网络协议
gopacket reassembly源码分析
gopacket reassembly源码分析
|
11月前
|
Linux Go Windows
gopacket使用
gopacket使用
|
11月前
|
存储 网络协议 安全
gopacket API
gopacket API
|
安全 Java Go
GO通道和 sync 包的分享
GO通道和 sync 包的分享
|
存储 机器学习/深度学习 Unix
Go源码解析之format.go(2)
Go源码解析之format.go(2)
115 0