调用
参考示例example/httpassembly
- 自定义一个factory,实现 `New` 接口
// httpStream holds the state of one reassembled TCP stream whose contents are
// parsed as HTTP requests by run.
type httpStream struct {
// net and transport are the network and transport flows passed to the
// factory's New; they are only used to label log output.
net, transport gopacket.Flow
// r is the reader stream that is returned to the assembler and consumed by run.
r tcpreader.ReaderStream
}
// New is the method a stream factory has to provide: it receives the network
// and transport flows of a stream and returns the tcpassembly.Stream that
// will be given that stream's reassembled data.
// Only the signature is shown in this excerpt; the body is left empty.
func (h *httpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
}
`New` 接口保存一个 `tcpreader.NewReaderStream()` 流,启动处理该流的协程,然后返回这个流。
// New creates the per-stream state for the given flows, starts the goroutine
// that consumes the stream, and returns the embedded reader stream so the
// assembler can deliver data to it.
func (h *httpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
	stream := &httpStream{net: net, transport: transport}
	stream.r = tcpreader.NewReaderStream()

	// The reader stream must always be drained, so the consumer is started
	// before the stream is handed back.
	go stream.run()

	// tcpreader.ReaderStream satisfies tcpassembly.Stream, so a pointer to the
	// embedded value can be returned directly.
	return &stream.r
}
- 流处理协程,建一个buf,从这个buf中读取数据,然后重组解析
// run parses HTTP requests from the reader stream until it reports io.EOF.
// Every successfully parsed request has its body drained and closed and is
// then logged; parse errors other than io.EOF are logged and parsing continues.
func (h *httpStream) run() {
	reader := bufio.NewReader(&h.r)
	for {
		req, err := http.ReadRequest(reader)
		switch {
		case err == io.EOF:
			// The stream has to be read all the way to EOF before returning.
			return
		case err != nil:
			log.Println("Error reading stream", h.net, h.transport, ":", err)
		default:
			discarded := tcpreader.DiscardBytesToEOF(req.Body)
			req.Body.Close()
			log.Println("Received request from stream", h.net, h.transport, ":", req, "with", discarded, "bytes in request body")
		}
	}
}
- 使用,和reassembly一样
// main captures packets from a live interface, hands every TCP segment to a
// tcpassembly.Assembler and, once per second, flushes connections that have
// been idle for more than two minutes. It returns when the packet source
// yields a nil packet.
func main() {
	defer util.Run()()

	// 1. Open the capture device.
	handle, err := pcap.OpenLive(*iface, int32(*snaplen), true, pcap.BlockForever)
	if err != nil {
		log.Fatal(err)
	}
	// Release the capture handle when main returns.
	defer handle.Close()

	// Install the BPF filter.
	if err := handle.SetBPFFilter(*filter); err != nil {
		log.Fatal(err)
	}

	// 2. Set up the assembler: factory -> stream pool -> assembler.
	streamFactory := &httpStreamFactory{}
	streamPool := tcpassembly.NewStreamPool(streamFactory)
	assembler := tcpassembly.NewAssembler(streamPool)

	log.Println("reading in packets")

	// 3. Set up the packet source.
	packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
	packets := packetSource.Packets()

	// A stoppable ticker drives the periodic flush.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		// 4. Read the next packet.
		case packet := <-packets:
			// A nil packet indicates the end of a pcap file.
			if packet == nil {
				return
			}
			if *logAllPackets {
				log.Println(packet)
			}
			// Only packets with a network layer and a TCP transport layer can be
			// reassembled. The checked assertion also covers a missing transport
			// layer (nil interface), in which case isTCP is false.
			netLayer := packet.NetworkLayer()
			tcp, isTCP := packet.TransportLayer().(*layers.TCP)
			if netLayer == nil || !isTCP {
				log.Println("Unusable packet")
				continue
			}
			// 5. Hand the TCP segment straight to the assembler.
			assembler.AssembleWithTimestamp(netLayer.NetworkFlow(), tcp, packet.Metadata().Timestamp)
		case <-ticker.C:
			// 6. Periodically flush connections idle for more than two minutes.
			assembler.FlushOlderThan(time.Now().Add(time.Minute * -2))
		}
	}
}
Assembler
// AssemblerOptions limits how many pages an Assembler may buffer while
// waiting for out-of-order data.
type AssemblerOptions struct {
// MaxBufferedPagesTotal is the upper limit on the total number of pages
// buffered across all connections. Once the page cache's used count reaches
// it, insertIntoConn pops the first buffered page of the connection it just
// inserted into. Ignored if <= 0.
MaxBufferedPagesTotal int
// MaxBufferedPagesPerConnection is the upper limit on the number of pages
// buffered for a single connection. Once a connection reaches it,
// insertIntoConn pops that connection's first buffered page. Ignored if <= 0.
MaxBufferedPagesPerConnection int
}
// Assembler reorders TCP segments per connection and delivers the resulting
// Reassembly values to the connection's stream.
type Assembler struct {
AssemblerOptions // embedded buffering limits
ret []Reassembly // scratch slice holding the reassemblies produced for the segment being processed
// pc supplies the page objects used to buffer out-of-order data.
pc *pageCache
// connPool is the stream pool in which connections are looked up and created.
connPool *StreamPool
}
// NewAssembler creates an Assembler that uses pool for its connections and
// registers itself as one more user of that pool. The pool may be shared
// between assemblers. Buffering limits start out as DefaultAssemblerOptions.
//
// Refactoring idea: the functional-options pattern with defaults would fit
// this constructor.
func NewAssembler(pool *StreamPool) *Assembler {
	assembler := &Assembler{
		AssemblerOptions: DefaultAssemblerOptions, // package defaults (presumably "no limit" - confirm against its definition)
		connPool:         pool,
		pc:               newPageCache(),
		ret:              make([]Reassembly, assemblerReturnValueInitialSize),
	}

	// Count this assembler as a user of the pool.
	pool.mu.Lock()
	pool.users++
	pool.mu.Unlock()

	return assembler
}
AssembleWithTimestamp
func (a *Assembler) AssembleWithTimestamp(netFlow gopacket.Flow, t *layers.TCP, timestamp time.Time) {
// 忽略空的数据包,比如keepalived
// tcp握手时 t.SYN = 1 t.FIN = 0 t.RST = 0 len(t.LayerPayload()) == 0
// 即false && true && true && true
// tcp挥手时 t.SYN = 0 t.FIN = 1 t.RST = 0 len(t.LayerPayload()) == 0
// 即true && false && true && true
if !t.SYN && !t.FIN && !t.RST && len(t.LayerPayload()) == 0 {
if *debugLog {
log.Println("ignoring useless packet")
}
return
}
a.ret = a.ret[:0]
// 4元组组成的key
key := key{netFlow, t.TransportFlow()}
var conn *connection
// This for loop handles a race condition where a connection will close, lock
// the connection pool, and remove itself, but before it locked the connection
// pool it's returned to another Assemble statement. This should loop 0-1
// times for the VAST majority of cases.
// 创建conn
for {
// tcp keepalive syn=0 payload=0
// 即 true && true end 为true?
conn = a.connPool.getConnection(key, !t.SYN && len(t.LayerPayload()) == 0, timestamp)
if conn == nil {
if *debugLog {
log.Printf("%v got empty packet on otherwise empty connection", key)
}
return
}
conn.mu.Lock()
if !conn.closed {
break
}
conn.mu.Unlock()
}
if conn.lastSeen.Before(timestamp) {
conn.lastSeen = timestamp
}
//type Sequence int64 提供Difference和Add函数
seq, bytes := Sequence(t.Seq), t.Payload // seq:当前序号 bytes:tcp负载的数据
// 校验序号
if conn.nextSeq == invalidSequence {
if t.SYN {
if *debugLog {
log.Printf("%v saw first SYN packet, returning immediately, seq=%v", key, seq)
}
// 添加 Reassembly重组后的对象
a.ret = append(a.ret, Reassembly{
Bytes: bytes,
Skip: 0,
Start: true,
Seen: timestamp,
})
// 下一个包的序号 = 当前的序号 + 字节数 + 1
conn.nextSeq = seq.Add(len(bytes) + 1)
} else {
if *debugLog {
log.Printf("%v waiting for start, storing into connection", key)
}
// 插入到数据到connection中
a.insertIntoConn(t, conn, timestamp)
}
} else if diff := conn.nextSeq.Difference(seq); diff > 0 {
if *debugLog {
log.Printf("%v gap in sequence numbers (%v, %v) diff %v, storing into connection", key, conn.nextSeq, seq, diff)
}
// 插入到数据到connection中
a.insertIntoConn(t, conn, timestamp)
} else {=<0
// 字节校准
bytes, conn.nextSeq = byteSpan(conn.nextSeq, seq, bytes)
if *debugLog {
log.Printf("%v found contiguous data (%v, %v), returning immediately", key, seq, conn.nextSeq)
}
a.ret = append(a.ret, Reassembly{
Bytes: bytes,
Skip: 0,
End: t.RST || t.FIN,
Seen: timestamp,
})
}
if len(a.ret) > 0 {
a.sendToConnection(conn)
}
conn.mu.Unlock()
}
insertIntoConn
// insertIntoConn buffers the payload of t in conn's page list at the position
// given by its sequence number. If a buffering limit is reached afterwards,
// the connection's first buffered page is popped into a.ret.
//
// It panics if the first buffered page already starts at the next expected
// sequence number.
func (a *Assembler) insertIntoConn(t *layers.TCP, conn *connection, ts time.Time) {
	if conn.first != nil && conn.first.seq == conn.nextSeq {
		panic("wtf")
	}

	// head/tail: first and last page built from the segment; added: page count.
	head, tail, added := a.pagesFromTCP(t, ts)
	// Locate the neighbours between which the new pages belong.
	before, after := conn.traverseConn(Sequence(t.Seq))
	conn.pushBetween(before, after, head, tail)
	conn.pages += added

	// Check both buffering limits; a limit <= 0 is disabled.
	connFull := a.MaxBufferedPagesPerConnection > 0 && conn.pages >= a.MaxBufferedPagesPerConnection
	cacheFull := a.MaxBufferedPagesTotal > 0 && a.pc.used >= a.MaxBufferedPagesTotal
	if !connFull && !cacheFull {
		return
	}

	if *debugLog {
		log.Printf("%v hit max buffer size: %+v, %v, %v", conn.key, a.AssemblerOptions, conn.pages, a.pc.used)
	}
	// Pop the first buffered page to make room.
	a.addNextFromConn(conn)
}
pagesFromTCP
从TCP数据包创建一个page(负载超过单页容量时创建一组相连的page)。
注意此函数不应该接受SYN包,因为它不能正确处理seq。
返回双连接的page列表中的第一个和最后一个页面。
// pagesFromTCP copies the payload of t into one or more pages taken from the
// page cache and links them into a doubly linked list.
//
// It returns the first page, the last page and the number of pages used. The
// End flag of the last page is set when t carries RST or FIN.
func (a *Assembler) pagesFromTCP(t *layers.TCP, ts time.Time) (p, p2 *page, numPages int) {
	head := a.pc.next(ts)
	tail := head
	numPages = 1

	nextSeq, rest := Sequence(t.Seq), t.Payload
	for {
		// Fill the current page with at most pageBytes bytes of payload.
		n := min(len(rest), pageBytes)
		tail.Bytes = tail.buf[:n]
		copy(tail.Bytes, rest)
		tail.seq = nextSeq

		rest = rest[n:]
		if len(rest) == 0 {
			break
		}

		// Payload left over: chain another page behind the current one.
		// (Only reached when the payload exceeds one page.)
		nextSeq = nextSeq.Add(n)
		fresh := a.pc.next(ts)
		fresh.prev = tail
		tail.next = fresh
		tail = fresh
		numPages++
	}

	tail.End = t.RST || t.FIN
	return head, tail, numPages
}
addNextFromConn
弹出第一页
// addNextFromConn pops the first buffered page of conn, appends its
// Reassembly to a.ret, returns the page to the page cache and advances
// conn.nextSeq past the popped data.
//
// Skip is set to -1 when no sequence number was established yet, or to the
// size of the gap when the page starts beyond the next expected byte.
func (a *Assembler) addNextFromConn(conn *connection) {
	head := conn.first

	if conn.nextSeq == invalidSequence {
		head.Skip = -1
	} else if gap := conn.nextSeq.Difference(head.seq); gap > 0 {
		head.Skip = int(gap)
	}

	// Trim already-delivered bytes and move the expected sequence forward.
	head.Bytes, conn.nextSeq = byteSpan(conn.nextSeq, head.seq, head.Bytes)
	if *debugLog {
		log.Printf("%v adding from conn (%v, %v)", conn.key, head.seq, conn.nextSeq)
	}

	a.ret = append(a.ret, head.Reassembly)
	a.pc.replace(head)

	// Unlink the popped page from the connection's list.
	if head == conn.last {
		conn.first, conn.last = nil, nil
	} else {
		conn.first = head.next
		conn.first.prev = nil
	}
	conn.pages--
}
sendToConnection
// sendToConnection appends any buffered pages that have become contiguous to
// a.ret, delivers a.ret to the connection's stream and closes the connection
// if the last delivered Reassembly is marked End.
//
// It panics if the connection has no stream. Callers must ensure a.ret is
// non-empty.
func (a *Assembler) sendToConnection(conn *connection) {
	// Pull in buffered data that now follows directly.
	a.addContiguous(conn)

	stream := conn.stream
	if stream == nil {
		panic("why?")
	}
	stream.Reassembled(a.ret)

	if last := &a.ret[len(a.ret)-1]; last.End {
		a.closeConnection(conn)
	}
}
addContiguous
// addContiguous pops buffered pages from conn into a.ret for as long as the
// first buffered page starts at or before the next expected sequence number.
func (a *Assembler) addContiguous(conn *connection) {
	for {
		if conn.first == nil || conn.nextSeq.Difference(conn.first.seq) > 0 {
			return
		}
		a.addNextFromConn(conn)
	}
}
addNextFromConn
弹出第一页添加到数组中
// addNextFromConn removes the first buffered page from conn and appends its
// Reassembly to a.ret. The page goes back to the page cache and conn.nextSeq
// is advanced past the data just taken.
//
// Skip becomes -1 if the connection has no established sequence number, or
// the gap size if the page begins beyond the next expected byte.
func (a *Assembler) addNextFromConn(conn *connection) {
	pg := conn.first

	if conn.nextSeq == invalidSequence {
		pg.Skip = -1
	} else if skipped := conn.nextSeq.Difference(pg.seq); skipped > 0 {
		pg.Skip = int(skipped)
	}

	// Drop bytes that were already delivered and update the expected sequence.
	pg.Bytes, conn.nextSeq = byteSpan(conn.nextSeq, pg.seq, pg.Bytes)
	if *debugLog {
		log.Printf("%v adding from conn (%v, %v)", conn.key, pg.seq, conn.nextSeq)
	}

	a.ret = append(a.ret, pg.Reassembly)
	a.pc.replace(pg)

	// Detach the page from the connection's doubly linked list.
	if pg == conn.last {
		conn.first, conn.last = nil, nil
	} else {
		conn.first = pg.next
		conn.first.prev = nil
	}
	conn.pages--
}
closeConnection
// closeConnection notifies the connection's stream that reassembly is
// complete, marks the connection closed, removes it from the stream pool and
// returns every page it still buffers to the page cache.
func (a *Assembler) closeConnection(conn *connection) {
	if *debugLog {
		log.Printf("%v closing", conn.key)
	}

	conn.stream.ReassemblyComplete()
	conn.closed = true
	a.connPool.remove(conn)

	// Hand the remaining buffered pages back to the cache.
	pg := conn.first
	for pg != nil {
		a.pc.replace(pg)
		pg = pg.next
	}
}
StreamPool
管理流的连接池,初始连接池分配1024个
// StreamPool stores the connections known to one or more assemblers and
// recycles connection objects through a free list.
type StreamPool struct {
// conns maps a connection key to its live connection.
conns map[key]*connection
// users is incremented by NewAssembler for every assembler using the pool.
users int
// mu is held around accesses to conns and free in getConnection, remove
// and connections.
mu sync.RWMutex
// factory creates the Stream for every new connection.
factory StreamFactory
// free holds connection objects available for (re)use.
free []*connection
// all keeps every slab of connections allocated by grow.
all [][]connection
// nextAlloc is the size of the next slab; grow doubles it after each allocation.
nextAlloc int
// newConnectionCount counts newConnection calls; only maintained when memLog is set.
newConnectionCount int64
}
// NewStreamPool creates a StreamPool that uses factory to build the Stream of
// every new connection. The map and the free list are pre-sized to
// initialAllocSize; no connections are allocated until they are needed.
func NewStreamPool(factory StreamFactory) *StreamPool {
	pool := new(StreamPool)
	pool.factory = factory
	pool.nextAlloc = initialAllocSize
	pool.conns = make(map[key]*connection, initialAllocSize)
	pool.free = make([]*connection, 0, initialAllocSize)
	return pool
}
grow
分配连接
// grow allocates a new slab of p.nextAlloc connections, puts every one of
// them on the free list and doubles the size of the next slab.
func (p *StreamPool) grow() {
	count := p.nextAlloc
	slab := make([]connection, count)
	p.all = append(p.all, slab)

	for i := 0; i < len(slab); i++ {
		p.free = append(p.free, &slab[i])
	}

	if *memLog {
		log.Println("StreamPool: created", count, "new connections")
	}
	p.nextAlloc = count * 2
}
newConnection
创建连接
// newConnection takes a connection object from the free list (growing the
// pool first if the list is empty) and initialises it for key k, stream s and
// creation time ts. With memLog set it also logs pool statistics every
// 0x8000 requests.
func (p *StreamPool) newConnection(k key, s Stream, ts time.Time) (c *connection) {
	if *memLog {
		p.newConnectionCount++
		if p.newConnectionCount&0x7FFF == 0 {
			log.Println("StreamPool:", p.newConnectionCount, "requests,", len(p.conns), "used,", len(p.free), "free")
		}
	}

	if len(p.free) == 0 {
		p.grow()
	}

	// Pop the last entry of the free list.
	top := len(p.free) - 1
	c = p.free[top]
	p.free = p.free[:top]

	c.reset(k, s, ts)
	return c
}
getConnection
// getConnection returns the connection registered for k.
//
// If no connection exists and end is false, a new one is created with a
// stream from the factory and registered under k. If no connection exists and
// end is true, nil is returned and nothing is created. ts becomes the
// creation time of a new connection.
func (p *StreamPool) getConnection(k key, end bool, ts time.Time) *connection {
	p.mu.RLock()
	conn := p.conns[k]
	p.mu.RUnlock()
	if end || conn != nil {
		return conn
	}

	// The stream is created before the write lock is taken.
	s := p.factory.New(k[0], k[1])

	p.mu.Lock()
	defer p.mu.Unlock()

	// Another caller may have registered k between the read lock above and the
	// write lock here. Check before taking a connection from the free list, so
	// that a lost race does not leave a pooled connection unreachable.
	if existing := p.conns[k]; existing != nil {
		// NOTE(review): the stream s created above is unused on this path;
		// whether it needs explicit cleanup depends on the factory - confirm.
		return existing
	}

	conn = p.newConnection(k, s, ts)
	p.conns[k] = conn
	return conn
}
remove
删除某个连接
// remove unregisters conn from the pool and puts the connection object back
// on the free list so it can be reused.
func (p *StreamPool) remove(conn *connection) {
	p.mu.Lock()
	defer p.mu.Unlock()

	delete(p.conns, conn.key)
	p.free = append(p.free, conn)
}
connections
返回所有的连接
// connections returns a snapshot of all connections currently registered in
// the pool. The slice is a fresh copy; the order is unspecified.
func (p *StreamPool) connections() []*connection {
	p.mu.RLock()
	defer p.mu.RUnlock()

	snapshot := make([]*connection, 0, len(p.conns))
	for k := range p.conns {
		snapshot = append(snapshot, p.conns[k])
	}
	return snapshot
}
connection
// connection is the per-stream reassembly state kept in a StreamPool.
type connection struct {
// key identifies the connection in the pool's map.
key key
// pages is the number of pages currently buffered in the list below.
pages int
// first and last are the ends of the doubly linked list of buffered pages.
first, last *page
// nextSeq is the next expected sequence number, or invalidSequence while
// no data has been emitted yet.
nextSeq Sequence
// created is the timestamp passed to reset; lastSeen is the latest packet
// timestamp seen for this connection.
created, lastSeen time.Time
// stream receives the reassembled data.
stream Stream
// closed is set by closeConnection.
closed bool
// mu is locked by AssembleWithTimestamp while it works on the connection.
mu sync.Mutex
}
reset
因为连接是预先分配的,所以需要重置,相当于初始化
// reset initialises a (possibly recycled) connection object for key k and
// stream s. ts is recorded as the creation time and as the initial lastSeen
// time, so that a connection taken from the free list does not keep the
// lastSeen value of its previous use.
func (c *connection) reset(k key, s Stream, ts time.Time) {
	c.key = k
	c.pages = 0
	c.first, c.last = nil, nil
	c.nextSeq = invalidSequence
	c.created = ts
	// Without this, a recycled connection would retain the previous
	// connection's lastSeen, because callers only ever move lastSeen forward.
	c.lastSeen = ts
	c.stream = s
	c.closed = false
}
traverseConn
遍历双向链接page列表获取正确的放置给定序号的位置。
注意它是从后向前遍历的:从序号最大的page开始,逐个向前查找。这样做是因为我们假设常见情况下同一个流的TCP数据包按序到达,丢包和乱序都很少。
遍历双向链接的page列表,以找到放置给定序号的正确位置。
注意,它是向后遍历的,从最高的序号开始,一直向下,因为我们假设通常的情况是,流的TCP包将按顺序出现,损失或包重排序最小。
// traverseConn finds the position in the connection's page list at which a
// page with sequence number seq belongs. It walks backwards from the last
// page while the page under inspection starts after seq.
//
// It returns the two neighbours of the insertion point: prev is the page that
// should precede the new data (nil if it goes first) and current the page
// that should follow it (nil if it goes last).
func (c *connection) traverseConn(seq Sequence) (prev, current *page) {
	for prev = c.last; prev != nil && prev.seq.Difference(seq) < 0; prev = prev.prev {
		current = prev
	}
	return prev, current
}
pushBetween
将双向链表 first-…-last 插入到另一个双向链表的节点 prev 与 next 之间。如果 prev 为 nil,则 first 成为连接链表中新的第一页;如果 next 为 nil,则 last 成为链表中新的最后一页。first 和 last 可以指向同一个页面。
// pushBetween splices the page list first..last into the connection's list
// between prev and next. When next is nil or the connection has no last page,
// last becomes the connection's last page; when prev is nil or the connection
// has no first page, first becomes the connection's first page. first and
// last may be the same page.
func (c *connection) pushBetween(prev, next, first, last *page) {
	// Tail side: link to the following page, or become the new end.
	if next != nil && c.last != nil {
		last.next = next
		next.prev = last
	} else {
		c.last = last
	}

	// Head side: link to the preceding page, or become the new start.
	if prev != nil && c.first != nil {
		first.prev = prev
		prev.next = first
	} else {
		c.first = first
	}
}
Reassembly
Reassembly由assembler传入stream中。
// Reassembly is one chunk of reassembled data passed from the assembler to a
// stream.
type Reassembly struct {
// Bytes is the TCP stream payload; it may be empty.
Bytes []byte
// Skip is non-zero when bytes were skipped between this and the previous
// Reassembly. It is -1 when this is the first data of a connection whose
// start was not seen, so the number of skipped bytes is unknown; otherwise
// it is the number of bytes skipped.
Skip int
// Start is set if this set of bytes carried a TCP SYN.
Start bool
// End is set if this set of bytes carried a TCP FIN or RST.
End bool
// Seen is the timestamp at which this set of bytes was taken off the network.
Seen time.Time
}
pageCache
pageCache是一个非并发安全的page对象缓存,用于避免频繁的内存分配。它只会增长,不会收缩。
// pageCache hands out page objects from pre-allocated slabs and takes them
// back for reuse.
type pageCache struct {
// free holds the pages currently available.
free []*page
// pcSize is the number of pages the next grow allocates; grow doubles it.
pcSize int
// size is the total number of pages allocated so far; used is the number
// currently handed out (incremented by next, decremented by replace).
size, used int
// pages keeps every slab allocated by grow.
pages [][]page
// pageRequests counts calls to next; only maintained when memLog is set.
pageRequests int64
}
// newPageCache creates a page cache and allocates its first slab of
// initialAllocSize pages.
func newPageCache() *pageCache {
	cache := new(pageCache)
	cache.pcSize = initialAllocSize
	cache.free = make([]*page, 0, initialAllocSize)
	cache.grow()
	return cache
}
grow
分配空间
// grow allocates a new slab of c.pcSize pages, adds every page to the free
// list, updates the total size and doubles the size of the next slab.
func (c *pageCache) grow() {
	count := c.pcSize
	slab := make([]page, count)
	c.pages = append(c.pages, slab)
	c.size += count

	for i := 0; i < len(slab); i++ {
		c.free = append(c.free, &slab[i])
	}

	if *memLog {
		log.Println("PageCache: created", count, "new pages")
	}
	c.pcSize = count * 2
}
next
返回一个干净的(已重置的)page对象
// next takes a page from the free list (growing the cache first if the list
// is empty), clears its links, resets its Reassembly to an empty payload with
// Seen set to ts, and counts it as used. With memLog set it also logs cache
// statistics every 0x10000 requests.
func (c *pageCache) next(ts time.Time) (p *page) {
	if *memLog {
		c.pageRequests++
		if c.pageRequests&0xFFFF == 0 {
			log.Println("PageCache:", c.pageRequests, "requested,", c.used, "used,", len(c.free), "free")
		}
	}

	if len(c.free) == 0 {
		c.grow()
	}

	// Pop the last entry of the free list.
	top := len(c.free) - 1
	p = c.free[top]
	c.free = c.free[:top]

	// Reset the page so nothing from its previous use leaks through.
	p.prev, p.next = nil, nil
	p.Reassembly = Reassembly{Bytes: p.buf[:0], Seen: ts}

	c.used++
	return p
}
replace
将一个page归还到空闲列表
// replace returns p to the free list and decrements the used-page count.
func (c *pageCache) replace(p *page) {
	c.free = append(c.free, p)
	c.used--
}
page
用来存储未处理的TCP数据包(无序包)。
未使用的page存储在页面缓存中,并从pageCache中返回,避免内存分配。
使用的page存储在connection中的双向链表中。
// page buffers one piece of TCP payload that could not be delivered yet.
// Pages are obtained from a pageCache and, while in use, linked into a
// connection's doubly linked list.
type page struct {
// Reassembly is the value appended to the assembler's result when the page
// is popped; its Bytes field is a slice of buf.
Reassembly
// seq is the sequence number of the first byte stored in the page.
seq Sequence
// index is not referenced by the code shown here.
index int
// prev and next link the page into the connection's list.
prev, next *page
// buf is the backing storage for the buffered payload.
buf [pageBytes]byte
}
byteSpan
// byteSpan decides which part of bytes, received at sequence number received,
// still has to be delivered when expected is the next expected sequence
// number, and computes the sequence number that follows the delivered data.
//
//   - expected is invalidSequence, or the data does not start before
//     expected: all of bytes is returned and next is received+len(bytes).
//   - the data lies entirely before expected: nil is returned and next stays
//     at expected.
//   - otherwise the already-seen prefix is cut off and next is advanced by the
//     length of the remainder.
func byteSpan(expected, received Sequence, bytes []byte) (toSend []byte, next Sequence) {
	if expected == invalidSequence {
		return bytes, received.Add(len(bytes))
	}

	// overlap is the number of leading bytes that precede expected.
	overlap := int(received.Difference(expected))
	switch {
	case overlap <= 0:
		return bytes, received.Add(len(bytes))
	case overlap > len(bytes):
		return nil, expected
	default:
		return bytes[overlap:], expected.Add(len(bytes) - overlap)
	}
}