如何在 go 中实现一个 worker-pool?

简介: 如何在 go 中实现一个 worker-pool?

今天偶然看到另一篇文章(原文在文末[1])。两篇文章原理相似:有一批工作任务(job)通过工作池(worker-pool)的方式达到多worker并发处理job的效果


他们还是有很多不同的点,实现上差别也是蛮大的。

首先上一篇文章我放了一张图片,大概就是上篇整体的工作流。

1668506288789.jpg

  • 每个worker处理完任务就好,不关心结果,不对结果做进一步处理
  • 只要请求不停止,程序就不会停止,没有控制机制,除非宕机

这篇文章不同点在于:

首先数据会从generate(生产数据)->并发处理数据->处理结果聚合
图大概是这样的,

1668506298849.jpg

然后它可以通过context.context达到控制工作池停止工作的效果

最后通过代码,你会发现它不是传统意义上的worker-pool后面会说明

下图能清晰表达整体流程了

1668506306971.jpg

顺便说一句,这篇文章实现的代码比使用 Go 每分钟处理百万请求的代码简单多了

首先看job

1668506317252.jpg

这个可以简单过一下。最终每个job处理完都会包装成Result返回

下面这段就是核心代码了

1668506326327.jpg

整个WorkerPool结构很简单jobs是一个缓冲channel每一个任务都会放入jobs中等待处理woker处理

results也是一个通道类型,它的作用是保存每个job处理后产生的结果Result

首先通过New初始化一个worker-pool工作池,然后执行Run开始运行

1668506336145.jpg

初始化的时候传入worker数,对应每个g运行work(ctx,&wg,wp.jobs,wp.results),组成了worker-pool

同时通过sync.WaitGroup,我们可以等待所有worker工作结束,也就意味着work-pool结束工作,当然可能是因为任务处理结束,也可能是被停止了

每个job数据源是如何来的

1668506347263.jpg

对应每个worker的工作

1668506355712.jpg

每个 worker 都尝试从同一个jobs获取数据,这是一个典型的fan-out模式当对应的g获取到job进行处理后,会把处理结果发送到同一个results channel中,这又是一个fan-in模式


当然我们通过context.Context可以对每个worker做停止运行控制

最后是处理结果集合

1668506365110.jpg

那么整体的测试代码就是:

1668506374703.jpg

看了代码之后,我们知道,这并不是一个传统意义的worker-pool它并不像上篇这篇文章一样,初始化一个真正的worker-pool一旦接收到job,就尝试从池中获取一个worker把对应的job交给这个work进行处理,等work处理完毕,重新进行到工作池中,等待下一次被利用。


附录


[1]https://itnext.io/explain-to-me-go-concurrency-worker-pool-pattern-like-im-five-e5f1be71e2b0#fe56

相关文章
|
Go 数据库 数据安全/隐私保护
Go实现随机加盐密码认证
Go实现随机加盐密码认证
307 0
|
存储 缓存 人工智能
基于Go的缓存实现
缓存是架构设计中的常用概念,本文基于Go实现了一个简单的缓存组件,支持最基本的缓存操作。
277 0
基于Go的缓存实现
|
存储 缓存 NoSQL
一文搞懂Go整合captcha实现验证码功能
一文搞懂Go整合captcha实现验证码功能
|
XML JSON Java
RPC框架之Thrift—实现Go和Java远程过程调用
RPC框架之Thrift—实现Go和Java远程过程调用
|
监控 测试技术 Go
用 Go 从零实现日志包 - 第零篇 序言
设计一个日志包,需要考虑的基础功能有日志级别设置、标准输出和文件、输出格式配置、日志的时间戳、文件与打印行号、正文。高级功能有按级别分类输出、支持结构化日志、支持日志轮转。
134 0
|
Go 数据安全/隐私保护
Go 实现 AES 加密 CBC 模式|Go主题月
密码分组链接模式 CBC (Cipher Block Chaining),这种模式是先将明文切分成若干小段,然后每一小段与初始块或者上一段的密文段进行异或运算后,再与密钥进行加密。
341 0
|
Go
【Golang】panic和recover底层逻辑实现|Go主题月
在每个goroutine也有一个指针指向_panic链表表头,然后每增加一个panic就会在链表头部加入一个_panic结构体。当所有的defer执行完后,_panic链表就会从尾部开始打印panic信息了,也就是说先发生的panic先打印信息。
236 0
|
Go 索引 Python
Go 和Python中的闭包实现及使用
Go 和Python中的闭包实现及使用
121 0
go语言实现【队列】|二叉树的【先序遍历】【创建】
go语言实现【队列】|二叉树的【先序遍历】【创建】
go语言实现【队列】|二叉树的【先序遍历】【创建】
|
Java Go
Go语言实现多态
Go语言实现多态
247 0
Go语言实现多态