指北君就让小 B 用 CyclicBarrier 。CyclicBarrier 是一个循环的栅栏,在多个线程完成各自的任务之后,主线程才可以开始执行任务。小 B 的情况就适用于多个线程并行查询数据库,然后写入 excel 的各个 sheet 页,在所有操作完成之后执行汇总数据的算法并将结果写入汇总的 sheet 页。
下面用一个小 demo,对 CyclicBarrier 有一个初步的印象。
这个 demo 中一共 3 个线程,每个线程都随机获取一个数字(在实际生产代码中会有更复杂的操作),最后将每个线程获取的数字相加后打印最后的结果。
源码分析
内部类
CyclicBarrier 的一个内部类,Generation 被翻译成为“代”。当这一代的所有线程都到达栅栏后可以开启下一代,所以才被成为循环栅栏。broken 属性表示栅栏是否被打破了。
属性与构造函数
从上面的内容和 demo 粗略的可以看出,CyclicBarrier 在初始化时设置了线程数量 parties,必须等待所有的线程都到栅栏处 cyclicBarrier.await() 时才可以运行 barrierCommand 方法。
如果还有线程没有到达栅栏处,会将先到达栅栏处的线程放入 trip 条件队列中等待最后一个线程到达。
await()
外部调用 await() 方法,等待线程到达栅栏后一起执行后续的操作。await() 可以被复用,每多调用一次 await() 就表示多增加一代,第一次调用是一代、第二次调用是二代、第三次调用是三代...。
dowait()
dowait() 是 CyclicBarrier 的核心方法。
dowait() 的运行被分成了 2 部分:
- 最后一个线程的时候,进入运行 barrierCommand 方法的流程,并且进入下一代。
- 前面的其他线程都进入循环中,将线程添加到 trip 的条件队列中,等待最后一个线程将它们唤醒。
nextGeneration()
CyclicBarrier 为什么会一代结束后可以开始下一代,就靠这个 nextGeneration() 方法,它干了三件事:
- trip.signalAll() 方法将 trip() 条件队列中的线程全部转移到 AQS 队列中去。AQS 队列中出队是在 lock.unlock() 的时候。
- 将线程的数量重置。
- 初始化一个新的代。
总结
CyclicBarrier 使用了两个队列,一个条件队列,一个 AQS 队列,在 trip.await() 出进入条件队列。当最后一个线程到达栅栏出的时候,条件队列中的线程全部移动到 AQS 队列中,要注意的是最后一个线程并没有进入 AQS 队列中。在 lock.unlock() 的时候 AQS 队列中的线程出队。
CyclicBarrier 基于 ReentrantLock 和 Condition 实现同步线程的逻辑。
我是指北君,操千曲而后晓声,观千剑而后识器。感谢各位人才的:点赞、收藏和评论,我们下期更精彩!