开篇
整个博文希望能够讲清楚Disruptor的producer和consumer的处理过程以及两者之间的消息通知机制。
工作过程
Disruptor本质上是一个内存消息队列,适合生产者消费者模型,所以它的整个工作过程其实也就分三个大过程,分别是Disruptor本身启动、Disruptor生产者工作过程、Disruptor消费者工作过程。整个Disruptor工作流程图如下图:
Disruptor对象关系图
说明:
- Disruptor包含:RingBuffer对象、Executor对象。
- RingBuffer包含:Sequencer sequencer对象、int bufferSize变量、Object[] entries变量。
- Sequencer类分为MultiProducerSequencer和SingleProducerSequencer两类。
- Disruptor的整个启动过程就是上面变量的初始化过程。
Disruptor启动过程
Disruptor初始化过程
- 1、创建Disruptor对象。
- 2、创建RingBuffer对象。
- 3、Sequencer分为MultiProducerSequencer和SingleProducerSequencer两类。
- 4、整个过程按照Disruptor->RingBuffer->Sequencer的顺序创建核心对象。
Disruptor<LongEvent> d = new Disruptor<LongEvent>(
LongEvent.FACTORY, 2048, DaemonThreadFactory.INSTANCE,
producerType, new SleepingWaitStrategy());
public class Disruptor<T>
{
private final RingBuffer<T> ringBuffer;
private final Executor executor;
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
private final AtomicBoolean started = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();
@Deprecated
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor)
{
// RingBuffer.createMultiProducer创建RingBuffer对象
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor);
}
@Deprecated
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final Executor executor,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
// RingBuffer.create创建RingBuffer对象
this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
}
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
// RingBuffer.createMultiProducer创建RingBuffer对象
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
// RingBuffer.createMultiProducer创建RingBuffer对象
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
// 真正核心的构造函数参数包括一个RingBuffer对象和Executor对象,前者由于保存数据,后者用于执行消费者函数
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
this.ringBuffer = ringBuffer;
this.executor = executor;
}
}
创建Sequencer对象,根据单生产者还是多生产者分为SingleProducerSequencer和MultiProducerSequencer两类。
abstract class RingBufferPad
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields<E> extends RingBufferPad
{
//用于填充的对象引用,为什么填充不知道?
private static final int BUFFER_PAD;
//entry存储位置相对与array起始位置的偏移量,用于UNSAFE内存操作时进行寻址,注意这个偏移量加上了用于填充的BUFFER_PAD大小
private static final long REF_ARRAY_BASE;
//对应对象引用占用内存大小,计算出来的相对位移数,比如对象引用大小是4byte,那么REF_ELEMENT_SHIFT=2,因为2的2次方=4;
private static final int REF_ELEMENT_SHIFT;
private static final Unsafe UNSAFE = Util.getUnsafe();
static
{
final int scale = UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale)
{
REF_ELEMENT_SHIFT = 2;
}
else if (8 == scale)
{
REF_ELEMENT_SHIFT = 3;
}
else
{
throw new IllegalStateException("Unknown pointer size");
}
BUFFER_PAD = 128 / scale;
REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
}
private final long indexMask;
// RingBuffer存储数据对象
private final Object[] entries;
// RingBuffer数组大小
protected final int bufferSize;
// Sequencer对象
protected final Sequencer sequencer;
RingBufferFields(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.indexMask = bufferSize - 1;
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
}
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
protected long p1, p2, p3, p4, p5, p6, p7;
RingBuffer(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
super(eventFactory, sequencer);
}
public static <E> RingBuffer<E> createMultiProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
// 创建Sequencer对象
MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
// 创建Sequencer对象的过程
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize)
{
return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
}
public static <E> RingBuffer<E> createSingleProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
// 创建Sequencer对象
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize)
{
return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
}
public static <E> RingBuffer<E> create(
ProducerType producerType,
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
switch (producerType)
{
case SINGLE:
return createSingleProducer(factory, bufferSize, waitStrategy);
case MULTI:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}
}
BasicExecutor只是简单的实现了Executor接口,用于解决没有传递Executor对象的时候使用默认的BasicExecutor即可,可以理解就是默认提供的线程池对象。
public class BasicExecutor implements Executor
{
private final ThreadFactory factory;
private final Queue<Thread> threads = new ConcurrentLinkedQueue<Thread>();
public BasicExecutor(ThreadFactory factory)
{
this.factory = factory;
}
@Override
public void execute(Runnable command)
{
final Thread thread = factory.newThread(command);
if (null == thread)
{
throw new RuntimeException("Failed to create thread to run: " + command);
}
thread.start();
threads.add(thread);
}
}
绑定消费者接口
整个绑定消费者接口主要完成了以下几个步骤:
- 1、根据消费者接口创建BatchEventProcessor对象
- 2、将BatchEventProcessor添加到consumerRepository当中
- 3、updateGatingSequencesForNextInChain内通过ringBuffer.addGatingSequences()方法添加消费者的Sequence到gatingSequences,完成了消费者和生产者之间的关联
// 创建Disruptor对象并通过handleEventsWith绑定消费者Handler对象
public void shouldBatch() throws Exception
{
Disruptor<LongEvent> d = new Disruptor<LongEvent>(
LongEvent.FACTORY, 2048, DaemonThreadFactory.INSTANCE,
producerType, new SleepingWaitStrategy());
ParallelEventHandler handler1 = new ParallelEventHandler(1, 0);
ParallelEventHandler handler2 = new ParallelEventHandler(1, 1);
d.handleEventsWith(handler1, handler2);
RingBuffer<LongEvent> buffer = d.start();
}
// handleEventsWith用于绑定消费者对象
public class Disruptor<T>
{
// step1 根据消费者接口创建消费者对象createEventProcessors
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
public EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
{
final Sequence[] barrierSequences = new Sequence[0];
return createEventProcessors(barrierSequences, eventProcessorFactories);
}
// 根据消费者接口创建批量处理对象BatchEventProcessor
EventHandlerGroup<T> createEventProcessors(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();
// 每个消费者配备一个Sequence对象,应该需要申请一个Sequence数组
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
// 所有消费者公用一个SequenceBarrier对象,这个SequenceBarrier会传递到消费者当中
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
// 根据eventHandlers的消费接口的个数,为每个消费接口创建一个BatchEventProcessor对象
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i];
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
// 增加消费者对象到消费者数组consumerRepository当中
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
// processorSequences保存每个消费者的Sequence对象,每个消费者配备一个Sequence对象
processorSequences[i] = batchEventProcessor.getSequence();
}
// 内部主要是关联所有consumer的Sequence到RingBuffer的gatingSequences数组当中
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
if (processorSequences.length > 0)
{
// 增加消费者关联的Sequence到RingBuffer当中
ringBuffer.addGatingSequences(processorSequences);
// 这里不是特别懂,绑定接口的时候barrierSequences的数组为空数组
for (final Sequence barrierSequence : barrierSequences)
{
ringBuffer.removeGatingSequence(barrierSequence);
}
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
}
// RingBuffer->Sequencer
public void addGatingSequences(Sequence... gatingSequences)
{
// RingBuffer当中的sequencer为RingBuffer中唯一的值
// 通过addGatingSequences将所有消费者的Sequenece和生产者关联起来,
// 这样生产者可以检测消费者进度避免覆盖未消费数据
sequencer.addGatingSequences(gatingSequences);
}
}
// 保存消费者信息
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
new IdentityHashMap<Sequence, ConsumerInfo>();
private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
public void add(
final EventProcessor eventprocessor,
final EventHandler<? super T> handler,
final SequenceBarrier barrier)
{
final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
eventProcessorInfoByEventHandler.put(handler, consumerInfo);
eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
consumerInfos.add(consumerInfo);
}
}
AbstractSequencer的addGatingSequences()方法负责把消费者的Sequence添加到RingBuffer的Sequencer当中gatingSequences数组当中。内部通过调用SequenceGroups.addSequences()实现。
public abstract class AbstractSequencer implements Sequencer
{
// SEQUENCE_UPDATER指向gatingSequences变量的内存地址
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
protected final int bufferSize;
protected final WaitStrategy waitStrategy;
/**
* 当前RingBuffer对应的油表位置
*/
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
/**
* 各个消费者持有的取数sequence数组
*/
protected volatile Sequence[] gatingSequences = new Sequence[0];
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.bufferSize = bufferSize;
this.waitStrategy = waitStrategy;
}
@Override
public final void addGatingSequences(Sequence... gatingSequences)
{
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
@Override
public boolean removeGatingSequence(Sequence sequence)
{
return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}
SequenceGroups的addSequences()方法内部通过把新增的sequencesToAdd添加到Sequencer当中的gatingSequences数组中。
class SequenceGroups
{
static <T> void addSequences(
final T holder,
final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
final Cursored cursor,
final Sequence... sequencesToAdd)
{
long cursorSequence;
Sequence[] updatedSequences;
Sequence[] currentSequences;
do
{
currentSequences = updater.get(holder);
updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
cursorSequence = cursor.getCursor();
int index = currentSequences.length;
for (Sequence sequence : sequencesToAdd)
{
sequence.set(cursorSequence);
updatedSequences[index++] = sequence;
}
}
while (!updater.compareAndSet(holder, currentSequences, updatedSequences));
cursorSequence = cursor.getCursor();
for (Sequence sequence : sequencesToAdd)
{
sequence.set(cursorSequence);
}
}
}
SequenceBarrier的创建过程,所有消费者依靠SequenceBarrier的cursor进行消费。
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
// protected final Sequencer sequencer
// sequencer这里指SingleProducerSequencer或MultiProducerSequencer
return sequencer.newBarrier(sequencesToTrack);
}
}
public abstract class AbstractSequencer implements Sequencer
{
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
// 以SingleProducerSequencer为例子,这里的curosr是Sequencer的curosr对象。
return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}
}
final class ProcessingSequenceBarrier implements SequenceBarrier
{
private final WaitStrategy waitStrategy;
private final Sequence dependentSequence;
private volatile boolean alerted = false;
// 记录生产者现在生产的数据到哪个下标了,用于生产者和消费者之间的消息同步
private final Sequence cursorSequence;
private final Sequencer sequencer;
public ProcessingSequenceBarrier(
final Sequencer sequencer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence,
final Sequence[] dependentSequences)
{
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
// 这里创建ProcessingSequenceBarrier对象的时候传入的cursorSequence
//
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length)
{
dependentSequence = cursorSequence;
}
else
{
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
}
Disruptor启动
Disruptor启动的内容主要是通过线程池对象运行消费者任务,消费者任务进入While循环进行循环消费。整个启动过程如下:
- 1、遍历所有消费者由consumerRepository保存依次启动,ConsumerRepository的iterator()方法会遍历Collection<ConsumerInfo> consumerInfos。
- 2、ConsumerInfo中保存的是EventProcessorInfo对象,EventProcessorInfo的start()方法执行 executor.execute(eventprocessor)提交任务到线程池当中。
- 3、EventProcessorInfo中的EventProcessor eventprocessor对象是我们绑定消费函数指定的BatchEventProcessor对象。
- 4、BatchEventProcessor的内部方法run()方法由executor负责执行并启动,从而让消费者进入了消费的循环阶段。
public class Disruptor<T>
{
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
// 负责启动所有的消费者对象,ConsumerInfo对象是EventProcessorInfo
for (final ConsumerInfo consumerInfo : consumerRepository)
{
// 通过executor启动consumer对象,
consumerInfo.start(executor);
}
return ringBuffer;
}
}
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
new IdentityHashMap<Sequence, ConsumerInfo>();
private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
public void add(
final EventProcessor eventprocessor,
final EventHandler<? super T> handler,
final SequenceBarrier barrier)
{
final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
eventProcessorInfoByEventHandler.put(handler, consumerInfo);
eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
consumerInfos.add(consumerInfo);
}
@Override
public Iterator<ConsumerInfo> iterator()
{
return consumerInfos.iterator();
}
}
class EventProcessorInfo<T> implements ConsumerInfo
{
private final EventProcessor eventprocessor;
private final EventHandler<? super T> handler;
private final SequenceBarrier barrier;
private boolean endOfChain = true;
EventProcessorInfo(
final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier)
{
this.eventprocessor = eventprocessor;
this.handler = handler;
this.barrier = barrier;
}
@Override
public void start(final Executor executor)
{
executor.execute(eventprocessor);
}
}
public final class BatchEventProcessor<T>
implements EventProcessor
{
private final AtomicBoolean running = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
private final DataProvider<T> dataProvider;
// 保存sequenceBarrier对象,批量增加的消费者会公用一个sequenceBarrier对象。
private final SequenceBarrier sequenceBarrier;
private final EventHandler<? super T> eventHandler;
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final TimeoutHandler timeoutHandler;
private final BatchStartAware batchStartAware;
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler)
{
this.dataProvider = dataProvider;
this.sequenceBarrier = sequenceBarrier;
this.eventHandler = eventHandler;
if (eventHandler instanceof SequenceReportingEventHandler)
{
((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
}
batchStartAware =
(eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
timeoutHandler =
(eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
}
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
}
producer工作过程
producer工作过程如下:
- 获取下一个可用的序号用于生成数据 ringBuffer.next()
- 生成数据并保存到循环数组当中 ringBuffer.get(sequence)
- 更新RingBuffer当中待消费数据的位移 ringBuffer.publish(sequence)
- SingleProducerSequencer的next()方法逻辑都在注释代码当中,自行阅读。
- ringBuffer.publish(sequence)会唤醒消费者如果消费者在等待状态
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();//请求下一个事件序号;
try {
LongEvent event = ringBuffer.get(sequence);//获取该序号对应的事件对象;
long data = getEventData();//获取要通过事件传递的业务数据;
event.set(data);
} finally{
ringBuffer.publish(sequence);//发布事件;
}
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
protected long p1, p2, p3, p4, p5, p6, p7;
// 获取下一个可用的位置
public long next()
{
return sequencer.next();
}
// 数据放置到环形数组当中
public void publish(long sequence)
{
sequencer.publish(sequence);
}
}
next()方法获取下一个可用的可写的位置,publish()方法用于唤醒消费者
public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
protected long p1, p2, p3, p4, p5, p6, p7;
public SingleProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
public long next()
{
return next(1);
}
public long next(int n)
{
if (n < 1) //n表示此次生产者期望获取多少个序号,通常是1
{
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue;
// 生产者当前序号值+期望获取的序号数量后达到的序号值
long nextSequence = nextValue + n;
// 减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’
// 因为是环形数组的设计,所以生产者最多比消费者快一圈
// 所以减去bufferSize后比较看是否追上
long wrapPoint = nextSequence - bufferSize;
// 从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前‘消费者中最小序号值’,
// 而是上次程序进入到下面的if判定代码段是,被赋值的当时的‘消费者中最小序号值’
// 这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。
// 只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。
long cachedGatingSequence = this.cachedValue;
// (wrapPoint > cachedGatingSequence) :
// 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),
// 需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’
// (cachedGatingSequence > nextValue) : 生产者和消费者均为顺序递增的,且生产者的seq“先于”消费者的seq,注意是‘先于’而不是‘大于’。
// 当nextValue>Long.MAXVALUE时,nextValue+1就会变成负数,wrapPoint也会变成负数,这时候必然会是:cachedGatingSequence > nextValue
// 这个变化的过程会持续bufferSize个序号,这个区间,由于getMinimumSequence()得到的虽然是名义上的‘消费者中最小序号值’,但是不代表是走在‘最后面’的消费者
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
// 将生产者的cursor值更新到主存,以便对所有的消费者线程可见。cursor表示现在生产那个节点了
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
// 生产者停下来,等待消费者消费,直到‘覆盖’现象清除。
// 这里会获取所有消费者消费最慢的消费者的消费位移保证不覆盖。
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
// 通知消费者进行消费,然后自旋
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
public void publish(long sequence)
{
// 设置当前生产者的位置便于消费者消费
cursor.set(sequence);
// 通知消费者进行消费
waitStrategy.signalAllWhenBlocking();
}
consumer工作过程
consumer在绑定消费者的过程中创建的BatchEventProcessor对象,核心逻辑在于run()方法:
- 获取待消费的数据的位置 sequenceBarrier.waitFor(nextSequence)
- 遍历待消费的数据进行消费eventHandler.onEvent()
- 设置该消费者的消费的位移sequence.set(availableSequence)
public final class BatchEventProcessor<T>
implements EventProcessor
{
private final AtomicBoolean running = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
private final DataProvider<T> dataProvider;
// 保存sequenceBarrier对象,这个sequenceBarrier在创建Consumer对象传递
private final SequenceBarrier sequenceBarrier;
private final EventHandler<? super T> eventHandler;
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final TimeoutHandler timeoutHandler;
private final BatchStartAware batchStartAware;
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler)
{
this.dataProvider = dataProvider;
this.sequenceBarrier = sequenceBarrier;
this.eventHandler = eventHandler;
if (eventHandler instanceof SequenceReportingEventHandler)
{
((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
}
batchStartAware =
(eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
timeoutHandler =
(eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
}
// 消费者进行消费,会在while循环当中一直消费
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
// 等待可消费的Sequence,在sequenceBarrier.waitFor()内部会执行waitStrategy.waitFor()方法实现生产者和消费者之间的通信。
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
// 进行数据消费
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// 设置消费者当前的消费进度至sequence
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
}
ProcessingSequenceBarrier的waitFor()方法内部调用waitStrategy.waitFor()获取当前可消费的数据的位置,不同的waitStrategy由不同的策略,在内部通过信号量进行通知。
final class ProcessingSequenceBarrier implements SequenceBarrier
{
private final WaitStrategy waitStrategy;
private final Sequence dependentSequence;
private volatile boolean alerted = false;
// 记录生产者现在生产的数据到哪个下标了,用于生产者和消费者之间的消息同步
private final Sequence cursorSequence;
private final Sequencer sequencer;
public ProcessingSequenceBarrier(
final Sequencer sequencer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence,
final Sequence[] dependentSequences)
{
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length)
{
dependentSequence = cursorSequence;
}
else
{
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
// Wait for the given sequence to be available。
// 这里并不保证返回值availableSequence一定等于 given sequence,他们的大小关系取决于采用的WaitStrategy。
// 1、YieldingWaitStrategy在自旋100次尝试后,会直接返回dependentSequence的最小seq,这时并不保证返回值>=given sequence
// 2、BlockingWaitStrategy则会阻塞等待given sequence可用为止,可用并不是说availableSequence == given sequence,而应当是指 >=
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
{
return availableSequence;
}
// 获取消费者可以消费的最大的可用序号,支持批处理效应,提升处理效率。
// 当availableSequence > sequence时,需要遍历 sequence --> availableSequence,找到最前一个准备就绪,可以被消费的event对应的seq。
// 最小值为:sequence-1
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
}