如何使用Disruptor(二)如何从Ringbuffer读取

简介:

作者:Trisha  译者:古圣昌  校对:方腾飞

上一篇文章中我们都了解了什么是Ring Buffer以及它是如何的特别。但遗憾的是,我还没有讲述如何使用DisruptorRing Buffer写数据和从Ring Buffer中读取数据。

ConsumerBarrier与消费者

这里我要稍微反过来介绍,因为总的来说读取数据这一过程比写数据要容易理解。假设通过一些“魔法”已经把数据写入到 Ring Buffer了,怎样从 Ring Buffer读出这些数据呢?

(好,我开始后悔使用Paint/Gimp 了。尽管这是个购买绘图板的好借口,如果我继续写下去的话… UML界的权威们大概也在诅咒我的名字了。)

消费者(Consumer)是一个想从Ring Buffer里读取数据的线程,它可以访问ConsumerBarrier对象——这个对象由RingBuffer创建并且代表消费者与RingBuffer进行交互。就像Ring Buffer显然需要一个序号才能找到下一个可用节点一样,消费者也需要知道它将要处理的序号——每个消费者都需要找到下一个它要访问的序号。在上面的例子中,消费者处理完了Ring Buffer里序号8之前(包括8)的所有数据,那么它期待访问的下一个序号是9

消费者可以调用ConsumerBarrier对象的waitFor()方法,传递它所需要的下一个序号.

1 final long availableSeq = consumerBarrier.waitFor(nextSequence);

ConsumerBarrier返回RingBuffer的最大可访问序号——在上面的例子中是12ConsumerBarrier有一个WaitStrategy方法来决定它如何等待这个序号,我现在不会去描述它的细节,代码的注释里已经概括了每一种WaitStrategy的优点和缺点 。

接下来怎么做?

接下来,消费者会一直原地停留,等待更多数据被写入Ring Buffer。并且,一旦数据写入后消费者会收到通知——节点9101112 已写入。现在序号12到了,消费者可以让ConsumerBarrier去拿这些序号节点里的数据了。

拿到了数据后,消费者(Consumer)会更新自己的标识(cursor)。

你应该已经感觉得到,这样做是怎样有助于平缓延迟的峰值了——以前需要逐个节点地询问“我可以拿下一个数据吗?现在可以了么?现在呢?”,消费者(Consumer)现在只需要简单的说“当你拿到的数字比我这个要大的时候请告诉我”,函数返回值会告诉它有多少个新的节点可以读取数据了。因为这些新的节点的确已经写入了数据(Ring Buffer本身的序号已经更新),而且消费者对这些节点的唯一操作是读而不是写,因此访问不用加锁。这太好了,不仅代码实现起来可以更加安全和简单,而且不用加锁使得速度更快。

另一个好处是——你可以用多个消费者(Consumer)去读同一个RingBuffer ,不需要加锁,也不需要用另外的队列来协调不同的线程(消费者)。这样你可以在Disruptor的协调下实现真正的并发数据处理。

BatchConsumer代码是一个消费者的例子。如果你实现了BatchHandler, 你可以用BatchConsumer来完成上面我提到的复杂工作。它很容易对付那些需要成批处理的节点(例如上文中要处理的9-12节点)而不用单独地去读取每一个节点。

更新:注意Disruptor 2.0版本使用了与本文不一样的命名。如果你对类名感到困惑,请阅读我的变更总结

目录
相关文章
|
6月前
|
存储 消息中间件 NoSQL
Redisson源码(二)延迟队列RDelayedQueue的使用及原理分析
Redisson源码(二)延迟队列RDelayedQueue的使用及原理分析
1354 3
Redisson源码(二)延迟队列RDelayedQueue的使用及原理分析
|
存储 canal Java
两个例子带你入门 Disruptor
Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列。很多知名开源项目里,比如 canal 、log4j2、 storm 都是用了 Disruptor 以提升系统性能 。 这篇文章,我们通过两个例子一步一个脚印帮助同学们入门 Disruptor 。
两个例子带你入门 Disruptor
系统编程之高级文件IO(十二)——阻塞和非阻塞方式读取
系统编程之高级文件IO(十二)——阻塞和非阻塞方式读取
169 0
系统编程之高级文件IO(十二)——阻塞和非阻塞方式读取
|
JSON 缓存 前端开发
【并发技术系列】「Web请求读取系列」如何构建一个可重复读取的Request的流机制
【并发技术系列】「Web请求读取系列」如何构建一个可重复读取的Request的流机制
255 0
【并发技术系列】「Web请求读取系列」如何构建一个可重复读取的Request的流机制
|
消息中间件 存储 缓存
Disruptor介绍与基本使用
Disruptor是英国外汇交易公司LMAX开发的高性能的并发框架,研发的初衷是解决内存队列的延迟问题,它是线程间通信的高效低延时的内存消息组件,它最大的特点是高性能。下面我就来给大家从各个维度去介绍一下这个组件
775 0
Giraph 源码分析(五)—— 加载数据+同步总结
作者|白松 关于Giraph 共有九个章节,本文第五个章节。 环境:在单机上(机器名:giraphx)启动了2个workers。 输入:SSSP文件夹,里面有1.txt和2.txt两个文件。 1、在Worker向Master汇报健康状况后,就开始等待Master创建InputSplit。