今天偶然看到另一篇文章(原文在文末[1])。两篇文章原理相似:有一批工作任务(job),通过工作池(worker-pool)的方式,达到多worker
并发处理job
的效果。
他们还是有很多不同的点,实现上差别也是蛮大的。
首先上一篇文章我放了一张图片,大概就是上篇整体的工作流。
- 每个
worker
处理完任务就好,不关心结果,不对结果做进一步处理。 - 只要请求不停止,程序就不会停止,没有控制机制,除非宕机。
这篇文章不同点在于:
首先数据会从generate
(生产数据)->并发处理数据->处理结果聚合。
图大概是这样的,
然后它可以通过context.context
达到控制工作池停止工作的效果。
最后通过代码,你会发现它不是传统意义上的worker-pool
,后面会说明。
下图能清晰表达整体流程了。
顺便说一句,这篇文章实现的代码比使用 Go 每分钟处理百万请求的代码简单多了。
首先看job
。
这个可以简单过一下。最终每个job
处理完都会包装成Result
返回。
下面这段就是核心代码了。
整个WorkerPool
结构很简单。jobs
是一个缓冲channel
。每一个任务都会放入jobs
中等待处理woker
处理。
results
也是一个通道类型,它的作用是保存每个job
处理后产生的结果Result
。
首先通过New
初始化一个worker-pool
工作池,然后执行Run
开始运行。
初始化的时候传入worker
数,对应每个g
运行work(ctx,&wg,wp.jobs,wp.results)
,组成了worker-pool
。
同时通过sync.WaitGroup
,我们可以等待所有worker
工作结束,也就意味着work-pool
结束工作,当然可能是因为任务处理结束,也可能是被停止了。
每个job
数据源是如何来的?
对应每个worker
的工作,
每个 worker 都尝试从同一个jobs
获取数据,这是一个典型的fan-out
模式。当对应的g
获取到job
进行处理后,会把处理结果发送到同一个results channel
中,这又是一个fan-in
模式。
当然我们通过context.Context
可以对每个worker
做停止运行控制。
最后是处理结果集合,
那么整体的测试代码就是:
看了代码之后,我们知道,这并不是一个传统意义的worker-pool
。它并不像上篇这篇文章一样,初始化一个真正的worker-pool
,一旦接收到job
,就尝试从池中获取一个worker
,把对应的job
交给这个work
进行处理,等work
处理完毕,重新进行到工作池中,等待下一次被利用。
附录
[1]https://itnext.io/explain-to-me-go-concurrency-worker-pool-pattern-like-im-five-e5f1be71e2b0#fe56