victoriaMetrics之byteBuffer
VictoriaMetrics经常会处理数目庞大的指标,在处理的过程中会涉及指标的拷贝,如果在指标拷贝时都进行内存申请的话,其内存消耗和性能损耗都非常大。victoriaMetrics使用byteBuffer
来复用内存,提升性能,其核心就是用了sync.pool
。下面主要看下它是如何结合sync.pool
运作的。
ByteBuffer
的结构体如下,只包含一个切片:
type ByteBuffer struct { // B is the underlying byte slice. B []byte }
ByteBufferPool的用法
为了服用ByteBuffer
,victoriaMetrics用到了ByteBufferPool
,与常见的sync.Pool
用法相同,包含一个Get
和一个Put
函数。
// ByteBufferPool is a pool of ByteBuffers. type ByteBufferPool struct { p sync.Pool } // Get obtains a ByteBuffer from bbp. func (bbp *ByteBufferPool) Get() *ByteBuffer { bbv := bbp.p.Get() if bbv == nil { return &ByteBuffer{} } return bbv.(*ByteBuffer) } // Put puts bb into bbp. func (bbp *ByteBufferPool) Put(bb *ByteBuffer) { bb.Reset() bbp.p.Put(bb) }
Put
函数用于将ByteBuffer
返回给资源池,为了防止下次使用的时候出现无效数据,在返回给sync.Pool
之前需要清空切片内存,其使用的Reset
函数如下,bb.B = bb.B[:0]
也是一种常见的清空切片内容的方式:
func (bb *ByteBuffer) Reset() { bb.B = bb.B[:0] }
ByteBuffer
实现了io.Writer
和io.ReaderFrom
接口。
Writer接口实现
实现的write
接口如下,比较简单,只是简单地将入参数据添加到byteBuffer中。在append
的时候会增加切片的容量。
func (bb *ByteBuffer) Write(p []byte) (int, error) { bb.B = append(bb.B, p...) return len(p), nil }
ReaderFrom接口实现
ReaderFrom
中比较有意思的是看它是如何预分配容量,以及在容量不足的情况下,如何进行扩容。其核心思想是使用make
预先申请一块内存,而不是通过append
来让底层自动扩容。
- 首先获取b的长度,表示切片中已有的数据长度
- 由于
ByteBuffer
可能来自ByteBufferPool.Get
,因此,其切片容量可能无法满足数据读取的需要,此时用到了ResizeWithCopyMayOverallocate
,ResizeWithCopyMayOverallocate
确保切片的容量不小于n
字节,如果容量足够,则返回长度为n
的子切片,否则申请新的切片,并返回长度为n的子切片。roundToNearestPow2
会找出最接近n
的2的整倍数的数值,以此作为新切片的容量。
// ResizeNoCopyMayOverallocate resizes b to minimum n bytes and returns the resized buffer (which may be newly allocated). // // If newly allocated buffer is returned then b contents isn't copied to it. func ResizeNoCopyMayOverallocate(b []byte, n int) []byte { if n <= cap(b) { return b[:n] } nNew := roundToNearestPow2(n) bNew := make([]byte, nNew) return bNew[:n] } // roundToNearestPow2 rounds n to the nearest power of 2 // // It is expected that n > 0 func roundToNearestPow2(n int) int { pow2 := uint8(bits.Len(uint(n - 1))) return 1 << pow2 }
- 将b的长度等于容量
- 设置offset为b中已有的数据偏移量
- 获取剩余的容量
free
,如果剩余的容量不足一半(free < offset
),则将容量翻倍 - 将数据读取到
offset
之后的存储中,并增加偏移量 - 当
Read
操作返回错误时,将ByteBuffer
中的切片长度设置为b,如果返回错误为EOF,则视为数据读取完成。
// ReadFrom reads all the data from r to bb until EOF. func (bb *ByteBuffer) ReadFrom(r io.Reader) (int64, error) { b := bb.B bLen := len(b)//1 b = ResizeWithCopyMayOverallocate(b, 4*1024) //2 b = b[:cap(b)]//3 offset := bLen//4 for { if free := len(b) - offset; free < offset {//5 n := len(b) b = append(b, make([]byte, n)...) } n, err := r.Read(b[offset:])//6 offset += n if err != nil {//7 bb.B = b[:offset] if err == io.EOF { err = nil } return int64(offset - bLen), err//9 } } }
如果无需从io.Reader中获取数据,则可以使用如下Write方法写入sync.pool中。
package util import ( "sync" ) // ByteBuffer implements a simple byte buffer. type ByteBuffer struct { // B is the underlying byte slice. B []byte } // Reset resets bb. func (bb *ByteBuffer) Reset() { bb.B = bb.B[:0] } // Resize resizes b to n bytes and returns b (which may be newly allocated). func resize(b []byte, n int) []byte { if nn := n - cap(b); nn > 0 { b = append(b[:cap(b)], make([]byte, nn)...) } return b[:n] } // ReadFrom reads all the data from r to bb until EOF. func (bb *ByteBuffer) Write(data []byte) { bb.B = resize(bb.B, len(data)) copy(bb.B, data) } // ByteBufferPool is a pool of ByteBuffers. type ByteBufferPool struct { p sync.Pool } // Get obtains a ByteBuffer from bbp. func (bbp *ByteBufferPool) Get() *ByteBuffer { bbv := bbp.p.Get() if bbv == nil { return &ByteBuffer{} } return bbv.(*ByteBuffer) } // Put puts bb into bbp. func (bbp *ByteBufferPool) Put(bb *ByteBuffer) { bb.Reset() bbp.p.Put(bb) }
总结
后续可以使用该库来满足从io.Reader
中读取数据,而不用担心buffer不足,借助ByteBufferPool
可以有效地复用buffer。