有时候批量积攒一批数据集中处理,是一个高效的提高程序性能的方法,比如我们可以批量写入数据库,批量发送消息到 kafka,批量写入网络数据等等。 批量把数据收集出来,我们常用 channel 类型,此时 channel 的功能就是一个 buffer,多个生产者把数据写入到 channel 中,消费者从 channel 中读取数据,但是 Go 的 channel 并没有提供批量读取的方法,我们需要自己实现一个。
github.com/smallnest/exp/chanx 库
当然我已经实现了一个 batch 库,你可以直接拿来用,本文主要介绍它的功能、使用方法以及设计原理和考量:github.com/smallnest/exp/chanx[1]。
我们可以使用这个库的Batch
方法来批量读取数据,它的定义如下:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any"))
- 第一个参数是
Context
,可以让调用者主动取消或者超时控制 - 第二个参数是 channel,我们从这个 channel 中读取数据。channel 可以在外部被关闭
- 第三个参数是批处理的大小,也就是我们从 channel 中读取一批数据的最大量
- 第四个参数是一个函数,我们把从 channel 中读取的一批数据传递给这个函数,由这个函数来处理这批数据
举一个例子:
func TestBatch(t *testing.T) { ch := make(chan int, 10) for i := 0; i < 10; i++ { ch <- i } count := 0 go Batch[int](context.Background( "int"), ch, 5, func(batch []int) { if len(batch) != 5 { assert.Fail(t, "expected batch size 5, got %d", len(batch)) } count += len(batch) }) time.Sleep(time.Second) close(ch) assert.Equal(t, 10, count) }
这个例子一开始我们把 10 个数据写入到一个 channel 中,然后我们从 channel 中批量读取,每次读取 5 个,然后把这 5 个数据传递给一个函数来处理,我们可以看到,我们读取了两次,每次读取 5 个,总共读取了 10 个数据。
我们还可以使用FlatBatch
方法来批量读取批量数据,它的定义如下:
func FlatBatch[T any](ctx context.Context, ch <-chan []T, batchSize int, fn func([]T "T any"))
这个函数和Batch
类似,只不过它的 channel 中的数据是一个切片,每次从 channel 中读取到一个切片后,把这个切片中的数据展开放入到一批数据中,最后再传递给处理函数。所以它有Flat
和Batch
两个功能。
举一个例子:
func TestFlatBatch(t *testing.T) { ch := make(chan []int, 10) for i := 0; i < 10; i++ { ch <- []int{i, i} } count := 0 go FlatBatch[int](context.Background( "int"), ch, 5, func(batch []int) { assert.NotEmpty(t, batch) count += len(batch) }) time.Sleep(time.Second) close(ch) assert.Equal(t, 20, count) }
在这个例子中,我们把 10 个切片写入到 channel 中,每个切片中有两个元素,然后我们从 channel 中批量读取并展开,放入到一个 batch 中,如果 batch 中的数据大于货等于 5 个,就把这 5 个数据传递给一个函数来处理,我们可以看到,我们读取了两次,每次读取 5 个,总共读取了 10 个数据。
实现原理和考量
想要从 channel 中批量读取数据,我们需要考虑以下几个问题:
- 我们需要设定一个批处理的大小,不能无限制的读取而不处理,否则会把消费者饿死,内存也会爆表
- 从 channel 中读取数据的时候,如果 channel 中没有数据,我们需要等待,直到 channel 中有数据,或者 channel 被关闭。
- 不能无限制的等待,或者长时间的等待,否则消费者会饥饿,而且时延太长业务不允许
我先举一个简单但是不太好的实现方式,我们在它的基础上做优化:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return } batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } } } }
这个实现中我们使用了一个batch
变量来保存从 channel 中读取的数据,当batch
中的数据量达到batchSize
时,我们就把这个batch
传递给处理函数,然后清空batch
,继续读取数据。
这个实现的一个最大的问题就是,如果 channel 中没有数据,并且当前 batch 的数量还未达到预期, 我们就会一直等待,直到 channel 中有数据,或者 channel 被关闭,这样会导致消费者饥饿。
我们可以使用select
语句来解决这个问题,我们可以在select
语句中加入一个default
分支,当 channel 中没有数据的时候,就会执行default
分支以便在 channel 中没有数据的时候,我们能够把已读取到的数据也能交给函数 fn 去处理。
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return } batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } default: if len(batch) > 0 { fn(batch) batch = make([]T, 0, batchSize) // reset } } } }
这个实现貌似解决了消费者饥饿的问题,但是也会带来一个新的问题,如果 channel 中总是没有数据,那么我们总是落入default
分支中,导致 CPU 空转,这个 goroutine 可能导致 CPU 占用 100%, 这样也不行。
有些人会使用time.After
来解决这个问题,我们可以在select
语句中加入一个time.After
分支,当 channel 中没有数据的时候,就会执行time.After
分支,这样我们就可以在 channel 中没有数据的时候,等待一段时间,如果还是没有数据,就把已读取到的数据也能交给函数 fn 去处理。
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return } batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } case <-time.After(100 * time.Millisecond): if len(batch) > 0 { fn(batch) batch = make([]T, 0, batchSize) // reset } } } }
这样貌似解决了 CPU 空转的问题,如果你测试这个实现,生产者在生产数据很慢的时候,程序的 CPU 的确不会占用 100%。 但是正如有经验的 Gopher 意识到的那样,这个实现还是有问题的,如果生产者生产数据的速度很快,而消费者处理数据的速度很慢,那么我们就会产生大量的Timer
,这些 Timer 不能及时的被回收,可能导致大量的内存占用,而且如果有大量的 Timer,也会导致 Go 运行时处理 Timer 的性能。
这里我提出一个新的解决办法,在这个库中实现了,我们不应该使用time.After
,因为time.After
既带来了性能的问题,还可能导致它在休眠的时候不能及时读取 channel 中的数据,导致业务时延增加。
最终的实现如下:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return } batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } default: if len(batch) > 0 { // partial fn(batch) batch = make([]T, 0, batchSize) // reset } else { // empty // wait for more select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { return } batch = append(batch, v) } } } } }
这个实现的巧妙之处在于default
出来。
如果代码运行落入到default
分支,说明当前 channel 中没有数据可读。那么它会检查当前的batch
中是否有数据,如果有,就把这个batch
传递给处理函数,然后清空batch
,继续读取数据。这样已读取的数据能够及时得到处理。
如果当前的batch
中没有数据,那么它会再次进入select
语句,等待 channel 中有数据,或者 channel 被关闭,或者ctx
被取消。如果 channel 中没有数据,那么它会被阻塞,直到 channel 中有数据,或者 channel 被关闭,或者ctx
被取消。这样就能够及时的读取 channel 中的数据,而不会导致 CPU 空转。
通过在default
分支中的特殊处理,我们就可以低时延高效的从 channel 中批量读取数据了。
参考资料
[1]
github.com/smallnest/exp/chanx: https://github.com/smallnest/exp/blob/master/chanx/batcher.go