从Go channel中批量读取数据

简介: 从Go channel中批量读取数据

有时候批量积攒一批数据集中处理,是一个高效的提高程序性能的方法,比如我们可以批量写入数据库,批量发送消息到 kafka,批量写入网络数据等等。 批量把数据收集出来,我们常用 channel 类型,此时 channel 的功能就是一个 buffer,多个生产者把数据写入到 channel 中,消费者从 channel 中读取数据,但是 Go 的 channel 并没有提供批量读取的方法,我们需要自己实现一个。



github.com/smallnest/exp/chanx 库


当然我已经实现了一个 batch 库,你可以直接拿来用,本文主要介绍它的功能、使用方法以及设计原理和考量:github.com/smallnest/exp/chanx[1]


我们可以使用这个库的Batch方法来批量读取数据,它的定义如下:


func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any"))


  • 第一个参数是Context,可以让调用者主动取消或者超时控制
  • 第二个参数是 channel,我们从这个 channel 中读取数据。channel 可以在外部被关闭
  • 第三个参数是批处理的大小,也就是我们从 channel 中读取一批数据的最大量
  • 第四个参数是一个函数,我们把从 channel 中读取的一批数据传递给这个函数,由这个函数来处理这批数据


举一个例子:


func TestBatch(t *testing.T) {
 ch := make(chan int, 10)
 for i := 0; i < 10; i++ {
  ch <- i
 }
 count := 0
 go Batch[int](context.Background( "int"), ch, 5, func(batch []int) {
  if len(batch) != 5 {
   assert.Fail(t, "expected batch size 5, got %d", len(batch))
  }
  count += len(batch)
 })
 time.Sleep(time.Second)
 close(ch)
 assert.Equal(t, 10, count)
}


这个例子一开始我们把 10 个数据写入到一个 channel 中,然后我们从 channel 中批量读取,每次读取 5 个,然后把这 5 个数据传递给一个函数来处理,我们可以看到,我们读取了两次,每次读取 5 个,总共读取了 10 个数据。

我们还可以使用FlatBatch方法来批量读取批量数据,它的定义如下:


func FlatBatch[T any](ctx context.Context, ch <-chan []T, batchSize int, fn func([]T "T any"))


这个函数和Batch类似,只不过它的 channel 中的数据是一个切片,每次从 channel 中读取到一个切片后,把这个切片中的数据展开放入到一批数据中,最后再传递给处理函数。所以它有FlatBatch两个功能。

举一个例子:


func TestFlatBatch(t *testing.T) {
 ch := make(chan []int, 10)
 for i := 0; i < 10; i++ {
  ch <- []int{i, i}
 }
 count := 0
 go FlatBatch[int](context.Background( "int"), ch, 5, func(batch []int) {
  assert.NotEmpty(t, batch)
  count += len(batch)
 })
 time.Sleep(time.Second)
 close(ch)
 assert.Equal(t, 20, count)
}


在这个例子中,我们把 10 个切片写入到 channel 中,每个切片中有两个元素,然后我们从 channel 中批量读取并展开,放入到一个 batch 中,如果 batch 中的数据大于货等于 5 个,就把这 5 个数据传递给一个函数来处理,我们可以看到,我们读取了两次,每次读取 5 个,总共读取了 10 个数据。


实现原理和考量


想要从 channel 中批量读取数据,我们需要考虑以下几个问题:


  1. 我们需要设定一个批处理的大小,不能无限制的读取而不处理,否则会把消费者饿死,内存也会爆表
  2. 从 channel 中读取数据的时候,如果 channel 中没有数据,我们需要等待,直到 channel 中有数据,或者 channel 被关闭。
  3. 不能无限制的等待,或者长时间的等待,否则消费者会饥饿,而且时延太长业务不允许


我先举一个简单但是不太好的实现方式,我们在它的基础上做优化:


func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
 var batch = make([]T, 0, batchSize)
 for {
  select {
  case <-ctx.Done():
   if len(batch) > 0 {
    fn(batch)
   }
   return
  case v, ok := <-ch:
   if !ok { // closed
    fn(batch)
    return
   }
   batch = append(batch, v)
   if len(batch) == batchSize { // full
    fn(batch)
    batch = make([]T, 0, batchSize) // reset
   }
  }
 }
}


这个实现中我们使用了一个batch变量来保存从 channel 中读取的数据,当batch中的数据量达到batchSize时,我们就把这个batch传递给处理函数,然后清空batch,继续读取数据。


这个实现的一个最大的问题就是,如果 channel 中没有数据,并且当前 batch 的数量还未达到预期, 我们就会一直等待,直到 channel 中有数据,或者 channel 被关闭,这样会导致消费者饥饿。


我们可以使用select语句来解决这个问题,我们可以在select语句中加入一个default分支,当 channel 中没有数据的时候,就会执行default分支以便在 channel 中没有数据的时候,我们能够把已读取到的数据也能交给函数 fn 去处理。


func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
    var batch = make([]T, 0, batchSize)
    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                fn(batch)
            }
            return
        case v, ok := <-ch:
            if !ok { // closed
                fn(batch)
                return
            }
            batch = append(batch, v)
            if len(batch) == batchSize { // full
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        default:
            if len(batch) > 0 {
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        }
    }
}


这个实现貌似解决了消费者饥饿的问题,但是也会带来一个新的问题,如果 channel 中总是没有数据,那么我们总是落入default分支中,导致 CPU 空转,这个 goroutine 可能导致 CPU 占用 100%, 这样也不行。


有些人会使用time.After来解决这个问题,我们可以在select语句中加入一个time.After分支,当 channel 中没有数据的时候,就会执行time.After分支,这样我们就可以在 channel 中没有数据的时候,等待一段时间,如果还是没有数据,就把已读取到的数据也能交给函数 fn 去处理。


func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
    var batch = make([]T, 0, batchSize)
    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                fn(batch)
            }
            return
        case v, ok := <-ch:
            if !ok { // closed
                fn(batch)
                return
            }
            batch = append(batch, v)
            if len(batch) == batchSize { // full
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        case <-time.After(100 * time.Millisecond):
            if len(batch) > 0 {
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        }
    }
}


这样貌似解决了 CPU 空转的问题,如果你测试这个实现,生产者在生产数据很慢的时候,程序的 CPU 的确不会占用 100%。 但是正如有经验的 Gopher 意识到的那样,这个实现还是有问题的,如果生产者生产数据的速度很快,而消费者处理数据的速度很慢,那么我们就会产生大量的Timer,这些 Timer 不能及时的被回收,可能导致大量的内存占用,而且如果有大量的 Timer,也会导致 Go 运行时处理 Timer 的性能。


这里我提出一个新的解决办法,在这个库中实现了,我们不应该使用time.After,因为time.After既带来了性能的问题,还可能导致它在休眠的时候不能及时读取 channel 中的数据,导致业务时延增加。


最终的实现如下:


func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
 var batch = make([]T, 0, batchSize)
 for {
  select {
  case <-ctx.Done():
   if len(batch) > 0 {
    fn(batch)
   }
   return
  case v, ok := <-ch:
   if !ok { // closed
    fn(batch)
    return
   }
   batch = append(batch, v)
   if len(batch) == batchSize { // full
    fn(batch)
    batch = make([]T, 0, batchSize) // reset
   }
  default:
   if len(batch) > 0 { // partial
    fn(batch)
    batch = make([]T, 0, batchSize) // reset
   } else { // empty
    // wait for more
    select {
    case <-ctx.Done():
     if len(batch) > 0 {
      fn(batch)
     }
     return
    case v, ok := <-ch:
     if !ok {
      return
     }
     batch = append(batch, v)
    }
   }
  }
 }
}


这个实现的巧妙之处在于default出来。


如果代码运行落入到default分支,说明当前 channel 中没有数据可读。那么它会检查当前的batch中是否有数据,如果有,就把这个batch传递给处理函数,然后清空batch,继续读取数据。这样已读取的数据能够及时得到处理。


如果当前的batch中没有数据,那么它会再次进入select语句,等待 channel 中有数据,或者 channel 被关闭,或者ctx被取消。如果 channel 中没有数据,那么它会被阻塞,直到 channel 中有数据,或者 channel 被关闭,或者ctx被取消。这样就能够及时的读取 channel 中的数据,而不会导致 CPU 空转。


通过在default分支中的特殊处理,我们就可以低时延高效的从 channel 中批量读取数据了。


参考资料


[1]

github.com/smallnest/exp/chanx: https://github.com/smallnest/exp/blob/master/chanx/batcher.go

相关文章
|
2月前
|
存储 Go 开发者
Go语言中的并发编程与通道(Channel)的深度探索
本文旨在深入探讨Go语言中并发编程的核心概念和实践,特别是通道(Channel)的使用。通过分析Goroutines和Channels的基本工作原理,我们将了解如何在Go语言中高效地实现并行任务处理。本文不仅介绍了基础语法和用法,还深入讨论了高级特性如缓冲通道、选择性接收以及超时控制等,旨在为读者提供一个全面的并发编程视角。
|
5月前
|
数据采集 网络协议 测试技术
使用Go Validator在Go应用中有效验证数据
使用Go Validator在Go应用中有效验证数据
|
2月前
|
安全 Go 数据处理
Go语言中的并发编程:掌握goroutine和channel的艺术####
本文深入探讨了Go语言在并发编程领域的核心概念——goroutine与channel。不同于传统的单线程执行模式,Go通过轻量级的goroutine实现了高效的并发处理,而channel作为goroutines之间通信的桥梁,确保了数据传递的安全性与高效性。文章首先简述了goroutine的基本特性及其创建方法,随后详细解析了channel的类型、操作以及它们如何协同工作以构建健壮的并发应用。此外,还介绍了select语句在多路复用中的应用,以及如何利用WaitGroup等待一组goroutine完成。最后,通过一个实际案例展示了如何在Go中设计并实现一个简单的并发程序,旨在帮助读者理解并掌
|
2月前
|
安全 Go 调度
探索Go语言的并发模型:goroutine与channel
在这个快节奏的技术世界中,Go语言以其简洁的并发模型脱颖而出。本文将带你深入了解Go语言的goroutine和channel,这两个核心特性如何协同工作,以实现高效、简洁的并发编程。
|
2月前
|
Go 调度 开发者
探索Go语言中的并发模式:goroutine与channel
在本文中,我们将深入探讨Go语言中的核心并发特性——goroutine和channel。不同于传统的并发模型,Go语言的并发机制以其简洁性和高效性著称。本文将通过实际代码示例,展示如何利用goroutine实现轻量级的并发执行,以及如何通过channel安全地在goroutine之间传递数据。摘要部分将概述这些概念,并提示读者本文将提供哪些具体的技术洞见。
|
3月前
|
安全 Go 调度
探索Go语言的并发之美:goroutine与channel
在这个快节奏的技术时代,Go语言以其简洁的语法和强大的并发能力脱颖而出。本文将带你深入Go语言的并发机制,探索goroutine的轻量级特性和channel的同步通信能力,让你在高并发场景下也能游刃有余。
|
3月前
|
存储 安全 Go
探索Go语言的并发模型:Goroutine与Channel
在Go语言的多核处理器时代,传统并发模型已无法满足高效、低延迟的需求。本文深入探讨Go语言的并发处理机制,包括Goroutine的轻量级线程模型和Channel的通信机制,揭示它们如何共同构建出高效、简洁的并发程序。
|
3月前
|
存储 Go 调度
深入理解Go语言的并发模型:goroutine与channel
在这个快速变化的技术世界中,Go语言以其简洁的并发模型脱颖而出。本文将带你穿越Go语言的并发世界,探索goroutine的轻量级特性和channel的同步机制。摘要部分,我们将用一段对话来揭示Go并发模型的魔力,而不是传统的介绍性文字。
|
3月前
|
安全 Go 调度
探索Go语言的并发模型:Goroutine与Channel的魔力
本文深入探讨了Go语言的并发模型,不仅解释了Goroutine的概念和特性,还详细讲解了Channel的用法和它们在并发编程中的重要性。通过实际代码示例,揭示了Go语言如何通过轻量级线程和通信机制来实现高效的并发处理。
|
3月前
|
安全 Go 调度
探索Go语言的并发之美:goroutine与channel的实践指南
在本文中,我们将深入探讨Go语言的并发机制,特别是goroutine和channel的使用。通过实际的代码示例,我们将展示如何利用这些工具来构建高效、可扩展的并发程序。我们将讨论goroutine的轻量级特性,channel的同步通信能力,以及它们如何共同简化并发编程的复杂性。