Disruptor - 工作流程(2)

简介: 开篇 整个博文希望能够讲清楚Disruptor的producer和consumer的处理过程以及两者之间的消息通知机制。工作过程 Disruptor本质上是一个内存消息队列,适合生产者消费者模型,所以它的整个工作过程其实也就分三个大过程,分别是Disruptor本身启动、Disruptor生产者工作过程、Disruptor消费者工作过程。

开篇

 整个博文希望能够讲清楚Disruptor的producer和consumer的处理过程以及两者之间的消息通知机制。


工作过程

 Disruptor本质上是一个内存消息队列,适合生产者消费者模型,所以它的整个工作过程其实也就分三个大过程,分别是Disruptor本身启动、Disruptor生产者工作过程、Disruptor消费者工作过程。整个Disruptor工作流程图如下图:

img_e423df9adeef466f751c7b710f7cb21f.png
Disruptor工作流程图


Disruptor对象关系图

img_98c26587bcbabc3312c1743908cf7b37.png
Disruptor对象关系图

说明:

  • Disruptor包含:RingBuffer对象、Executor对象。
  • RingBuffer包含:Sequencer sequencer对象、int bufferSize变量、Object[] entries变量。
  • Sequencer类分为MultiProducerSequencer和SingleProducerSequencer两类。
  • Disruptor的整个启动过程就是上面变量的初始化过程。


Disruptor启动过程

Disruptor初始化过程

  • 1、创建Disruptor对象。
  • 2、创建RingBuffer对象。
  • 3、Sequencer分为MultiProducerSequencer和SingleProducerSequencer两类。
  • 4、整个过程按照Disruptor->RingBuffer->Sequencer的顺序创建核心对象。
 Disruptor<LongEvent> d = new Disruptor<LongEvent>(
            LongEvent.FACTORY, 2048, DaemonThreadFactory.INSTANCE,
            producerType, new SleepingWaitStrategy());


public class Disruptor<T>
{
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();

    @Deprecated
    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor)
    {
        // RingBuffer.createMultiProducer创建RingBuffer对象
        this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor);
    }

    @Deprecated
    public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final Executor executor,
        final ProducerType producerType,
        final WaitStrategy waitStrategy)
    {
         // RingBuffer.create创建RingBuffer对象
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
    }

    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
    {
         // RingBuffer.createMultiProducer创建RingBuffer对象
        this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
    }

    public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final ThreadFactory threadFactory,
            final ProducerType producerType,
            final WaitStrategy waitStrategy)
    {
        // RingBuffer.createMultiProducer创建RingBuffer对象
        this(
            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
    }

    // 真正核心的构造函数参数包括一个RingBuffer对象和Executor对象,前者由于保存数据,后者用于执行消费者函数
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
    {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
}



创建Sequencer对象,根据单生产者还是多生产者分为SingleProducerSequencer和MultiProducerSequencer两类

abstract class RingBufferPad
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad
{
    //用于填充的对象引用,为什么填充不知道?
    private static final int BUFFER_PAD;
    //entry存储位置相对与array起始位置的偏移量,用于UNSAFE内存操作时进行寻址,注意这个偏移量加上了用于填充的BUFFER_PAD大小
    private static final long REF_ARRAY_BASE;
    //对应对象引用占用内存大小,计算出来的相对位移数,比如对象引用大小是4byte,那么REF_ELEMENT_SHIFT=2,因为2的2次方=4;
    private static final int REF_ELEMENT_SHIFT;
    private static final Unsafe UNSAFE = Util.getUnsafe();

    static
    {
        final int scale = UNSAFE.arrayIndexScale(Object[].class);
        if (4 == scale)
        {
            REF_ELEMENT_SHIFT = 2;
        }
        else if (8 == scale)
        {
            REF_ELEMENT_SHIFT = 3;
        }
        else
        {
            throw new IllegalStateException("Unknown pointer size");
        }

        BUFFER_PAD = 128 / scale;
        REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
    }

    private final long indexMask;
    // RingBuffer存储数据对象
    private final Object[] entries;
    // RingBuffer数组大小
    protected final int bufferSize;
    // Sequencer对象
    protected final Sequencer sequencer;

    RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.indexMask = bufferSize - 1;
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;

    RingBuffer(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        super(eventFactory, sequencer);
    }


    public static <E> RingBuffer<E> createMultiProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        // 创建Sequencer对象
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }


    // 创建Sequencer对象的过程
    public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize)
    {
        return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
    }


    public static <E> RingBuffer<E> createSingleProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        // 创建Sequencer对象
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }


    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize)
    {
        return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
    }


    public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        switch (producerType)
        {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }
}



BasicExecutor只是简单的实现了Executor接口,用于解决没有传递Executor对象的时候使用默认的BasicExecutor即可,可以理解就是默认提供的线程池对象。

public class BasicExecutor implements Executor
{
    private final ThreadFactory factory;
    private final Queue<Thread> threads = new ConcurrentLinkedQueue<Thread>();

    public BasicExecutor(ThreadFactory factory)
    {
        this.factory = factory;
    }

    @Override
    public void execute(Runnable command)
    {
        final Thread thread = factory.newThread(command);
        if (null == thread)
        {
            throw new RuntimeException("Failed to create thread to run: " + command);
        }

        thread.start();

        threads.add(thread);
    }
}


绑定消费者接口

 整个绑定消费者接口主要完成了以下几个步骤:

  • 1、根据消费者接口创建BatchEventProcessor对象
  • 2、将BatchEventProcessor添加到consumerRepository当中
  • 3、updateGatingSequencesForNextInChain内通过ringBuffer.addGatingSequences()方法添加消费者的Sequence到gatingSequences,完成了消费者和生产者之间的关联
// 创建Disruptor对象并通过handleEventsWith绑定消费者Handler对象
public void shouldBatch() throws Exception
{
    Disruptor<LongEvent> d = new Disruptor<LongEvent>(
            LongEvent.FACTORY, 2048, DaemonThreadFactory.INSTANCE,
            producerType, new SleepingWaitStrategy());

    ParallelEventHandler handler1 = new ParallelEventHandler(1, 0);
    ParallelEventHandler handler2 = new ParallelEventHandler(1, 1);
    
    d.handleEventsWith(handler1, handler2);
    RingBuffer<LongEvent> buffer = d.start();
}


// handleEventsWith用于绑定消费者对象
public class Disruptor<T>
{
    // step1 根据消费者接口创建消费者对象createEventProcessors
    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    {
        return createEventProcessors(new Sequence[0], handlers);
    }

    public EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
    {
        final Sequence[] barrierSequences = new Sequence[0];
        return createEventProcessors(barrierSequences, eventProcessorFactories);
    }



    // 根据消费者接口创建批量处理对象BatchEventProcessor
    EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();

        // 每个消费者配备一个Sequence对象,应该需要申请一个Sequence数组
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        // 所有消费者公用一个SequenceBarrier对象,这个SequenceBarrier会传递到消费者当中
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        // 根据eventHandlers的消费接口的个数,为每个消费接口创建一个BatchEventProcessor对象
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            final EventHandler<? super T> eventHandler = eventHandlers[i];

            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);

            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }
            
            // 增加消费者对象到消费者数组consumerRepository当中
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            // processorSequences保存每个消费者的Sequence对象,每个消费者配备一个Sequence对象
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        // 内部主要是关联所有consumer的Sequence到RingBuffer的gatingSequences数组当中
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
    }



    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
    {
        if (processorSequences.length > 0)
        {
            // 增加消费者关联的Sequence到RingBuffer当中
            ringBuffer.addGatingSequences(processorSequences);

            // 这里不是特别懂,绑定接口的时候barrierSequences的数组为空数组
            for (final Sequence barrierSequence : barrierSequences)
            {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

    // RingBuffer->Sequencer
    public void addGatingSequences(Sequence... gatingSequences)
    {
        // RingBuffer当中的sequencer为RingBuffer中唯一的值
        // 通过addGatingSequences将所有消费者的Sequenece和生产者关联起来,
        // 这样生产者可以检测消费者进度避免覆盖未消费数据
        sequencer.addGatingSequences(gatingSequences);
    }
}


// 保存消费者信息
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
        new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
        new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

    public void add(
        final EventProcessor eventprocessor,
        final EventHandler<? super T> handler,
        final SequenceBarrier barrier)
    {
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
        eventProcessorInfoByEventHandler.put(handler, consumerInfo);
        eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }
}



  AbstractSequencer的addGatingSequences()方法负责把消费者的Sequence添加到RingBuffer的Sequencer当中gatingSequences数组当中。内部通过调用SequenceGroups.addSequences()实现。

public abstract class AbstractSequencer implements Sequencer
{
    // SEQUENCE_UPDATER指向gatingSequences变量的内存地址
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");

    protected final int bufferSize;
    protected final WaitStrategy waitStrategy;
    /**
     * 当前RingBuffer对应的油表位置
     */
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    /**
     * 各个消费者持有的取数sequence数组
     */
    protected volatile Sequence[] gatingSequences = new Sequence[0];

    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
    {
        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.bufferSize = bufferSize;
        this.waitStrategy = waitStrategy;
    }

    @Override
    public final void addGatingSequences(Sequence... gatingSequences)
    {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }

    @Override
    public boolean removeGatingSequence(Sequence sequence)
    {
        return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
    }



 SequenceGroups的addSequences()方法内部通过把新增的sequencesToAdd添加到Sequencer当中的gatingSequences数组中。

class SequenceGroups
{
    static <T> void addSequences(
        final T holder,
        final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
        final Cursored cursor,
        final Sequence... sequencesToAdd)
    {
        long cursorSequence;
        Sequence[] updatedSequences;
        Sequence[] currentSequences;

        do
        {
            currentSequences = updater.get(holder);
            updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
            cursorSequence = cursor.getCursor();

            int index = currentSequences.length;
            for (Sequence sequence : sequencesToAdd)
            {
                sequence.set(cursorSequence);
                updatedSequences[index++] = sequence;
            }
        }
        while (!updater.compareAndSet(holder, currentSequences, updatedSequences));

        cursorSequence = cursor.getCursor();
        for (Sequence sequence : sequencesToAdd)
        {
            sequence.set(cursorSequence);
        }
    }
}



 SequenceBarrier的创建过程,所有消费者依靠SequenceBarrier的cursor进行消费。

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        // protected final Sequencer sequencer
        // sequencer这里指SingleProducerSequencer或MultiProducerSequencer
        return sequencer.newBarrier(sequencesToTrack);
    }
}


public abstract class AbstractSequencer implements Sequencer
{
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        // 以SingleProducerSequencer为例子,这里的curosr是Sequencer的curosr对象。
        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }
}


final class ProcessingSequenceBarrier implements SequenceBarrier
{
    private final WaitStrategy waitStrategy;
    private final Sequence dependentSequence;
    private volatile boolean alerted = false;
    // 记录生产者现在生产的数据到哪个下标了,用于生产者和消费者之间的消息同步
    private final Sequence cursorSequence;
    private final Sequencer sequencer;

    public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        // 这里创建ProcessingSequenceBarrier对象的时候传入的cursorSequence
       //
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length)
        {
            dependentSequence = cursorSequence;
        }
        else
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }
}


Disruptor启动

 Disruptor启动的内容主要是通过线程池对象运行消费者任务,消费者任务进入While循环进行循环消费。整个启动过程如下:

  • 1、遍历所有消费者由consumerRepository保存依次启动,ConsumerRepository的iterator()方法会遍历Collection<ConsumerInfo> consumerInfos。
  • 2、ConsumerInfo中保存的是EventProcessorInfo对象,EventProcessorInfo的start()方法执行 executor.execute(eventprocessor)提交任务到线程池当中。
  • 3、EventProcessorInfo中的EventProcessor eventprocessor对象是我们绑定消费函数指定的BatchEventProcessor对象。
  • 4、BatchEventProcessor的内部方法run()方法由executor负责执行并启动,从而让消费者进入了消费的循环阶段。
public class Disruptor<T>
{
    public RingBuffer<T> start()
    {
        checkOnlyStartedOnce();
        // 负责启动所有的消费者对象,ConsumerInfo对象是EventProcessorInfo
        for (final ConsumerInfo consumerInfo : consumerRepository)
        {
            // 通过executor启动consumer对象,
            consumerInfo.start(executor);
        }

        return ringBuffer;
    }
}



class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
        new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
        new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

    public void add(
        final EventProcessor eventprocessor,
        final EventHandler<? super T> handler,
        final SequenceBarrier barrier)
    {
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
        eventProcessorInfoByEventHandler.put(handler, consumerInfo);
        eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }

    @Override
    public Iterator<ConsumerInfo> iterator()
    {
        return consumerInfos.iterator();
    }
}

class EventProcessorInfo<T> implements ConsumerInfo
{
    private final EventProcessor eventprocessor;
    private final EventHandler<? super T> handler;
    private final SequenceBarrier barrier;
    private boolean endOfChain = true;

    EventProcessorInfo(
        final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier)
    {
        this.eventprocessor = eventprocessor;
        this.handler = handler;
        this.barrier = barrier;
    }

    @Override
    public void start(final Executor executor)
    {
        executor.execute(eventprocessor);
    }
}


public final class BatchEventProcessor<T>
    implements EventProcessor
{
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T> dataProvider;
    // 保存sequenceBarrier对象,批量增加的消费者会公用一个sequenceBarrier对象。
    private final SequenceBarrier sequenceBarrier;
    private final EventHandler<? super T> eventHandler;
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final TimeoutHandler timeoutHandler;
    private final BatchStartAware batchStartAware;

    public BatchEventProcessor(
        final DataProvider<T> dataProvider,
        final SequenceBarrier sequenceBarrier,
        final EventHandler<? super T> eventHandler)
    {
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;

        if (eventHandler instanceof SequenceReportingEventHandler)
        {
            ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
        }

        batchStartAware =
                (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
        timeoutHandler =
                (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }



    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        T event = null;
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                    if (batchStartAware != null)
                    {
                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                    }

                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }

                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }
}


producer工作过程

 producer工作过程如下:

  • 获取下一个可用的序号用于生成数据 ringBuffer.next()
  • 生成数据并保存到循环数组当中 ringBuffer.get(sequence)
  • 更新RingBuffer当中待消费数据的位移 ringBuffer.publish(sequence)
  • SingleProducerSequencer的next()方法逻辑都在注释代码当中,自行阅读。
  • ringBuffer.publish(sequence)会唤醒消费者如果消费者在等待状态
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();//请求下一个事件序号;
    
try {
    LongEvent event = ringBuffer.get(sequence);//获取该序号对应的事件对象;
    long data = getEventData();//获取要通过事件传递的业务数据;
    event.set(data);
} finally{
    ringBuffer.publish(sequence);//发布事件;
}



public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;

    // 获取下一个可用的位置
    public long next()
    {
        return sequencer.next();
    }

    // 数据放置到环形数组当中
    public void publish(long sequence)
    {
        sequencer.publish(sequence);
    }
}

 next()方法获取下一个可用的可写的位置,publish()方法用于唤醒消费者

public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }

    public long next()
    {
        return next(1);
    }

    public long next(int n) 
    {
        if (n < 1) //n表示此次生产者期望获取多少个序号,通常是1
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long nextValue = this.nextValue;
        
        // 生产者当前序号值+期望获取的序号数量后达到的序号值
        long nextSequence = nextValue + n;  
        // 减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’
        // 因为是环形数组的设计,所以生产者最多比消费者快一圈
        // 所以减去bufferSize后比较看是否追上
        long wrapPoint = nextSequence - bufferSize; 

        // 从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前‘消费者中最小序号值’,
        // 而是上次程序进入到下面的if判定代码段是,被赋值的当时的‘消费者中最小序号值’
        // 这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。
        // 只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。
        long cachedGatingSequence = this.cachedValue;  

        // (wrapPoint > cachedGatingSequence) : 
        // 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),
        // 需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’

        // (cachedGatingSequence > nextValue) : 生产者和消费者均为顺序递增的,且生产者的seq“先于”消费者的seq,注意是‘先于’而不是‘大于’。
        // 当nextValue>Long.MAXVALUE时,nextValue+1就会变成负数,wrapPoint也会变成负数,这时候必然会是:cachedGatingSequence > nextValue
        // 这个变化的过程会持续bufferSize个序号,这个区间,由于getMinimumSequence()得到的虽然是名义上的‘消费者中最小序号值’,但是不代表是走在‘最后面’的消费者
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) 
        {
            // 将生产者的cursor值更新到主存,以便对所有的消费者线程可见。cursor表示现在生产那个节点了
            cursor.setVolatile(nextValue);  // StoreLoad fence

            long minSequence;
            // 生产者停下来,等待消费者消费,直到‘覆盖’现象清除。
            // 这里会获取所有消费者消费最慢的消费者的消费位移保证不覆盖。
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                // 通知消费者进行消费,然后自旋
                waitStrategy.signalAllWhenBlocking();
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }

            this.cachedValue = minSequence;
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }

    public void publish(long sequence)
    {
        // 设置当前生产者的位置便于消费者消费
        cursor.set(sequence);
        // 通知消费者进行消费
        waitStrategy.signalAllWhenBlocking();
    }


consumer工作过程

 consumer在绑定消费者的过程中创建的BatchEventProcessor对象,核心逻辑在于run()方法:

  • 获取待消费的数据的位置 sequenceBarrier.waitFor(nextSequence)
  • 遍历待消费的数据进行消费eventHandler.onEvent()
  • 设置该消费者的消费的位移sequence.set(availableSequence)
public final class BatchEventProcessor<T>
    implements EventProcessor
{
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T> dataProvider;
    // 保存sequenceBarrier对象,这个sequenceBarrier在创建Consumer对象传递
    private final SequenceBarrier sequenceBarrier;
    private final EventHandler<? super T> eventHandler;
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final TimeoutHandler timeoutHandler;
    private final BatchStartAware batchStartAware;

    public BatchEventProcessor(
        final DataProvider<T> dataProvider,
        final SequenceBarrier sequenceBarrier,
        final EventHandler<? super T> eventHandler)
    {
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;

        if (eventHandler instanceof SequenceReportingEventHandler)
        {
            ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
        }

        batchStartAware =
                (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
        timeoutHandler =
                (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }


    // 消费者进行消费,会在while循环当中一直消费
    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        T event = null;
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    // 等待可消费的Sequence,在sequenceBarrier.waitFor()内部会执行waitStrategy.waitFor()方法实现生产者和消费者之间的通信。
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                    if (batchStartAware != null)
                    {
                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                    }
                    
                    // 进行数据消费
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }
                    // 设置消费者当前的消费进度至sequence
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }
}



 ProcessingSequenceBarrier的waitFor()方法内部调用waitStrategy.waitFor()获取当前可消费的数据的位置,不同的waitStrategy由不同的策略,在内部通过信号量进行通知。

final class ProcessingSequenceBarrier implements SequenceBarrier
{
    private final WaitStrategy waitStrategy;
    private final Sequence dependentSequence;
    private volatile boolean alerted = false;
    // 记录生产者现在生产的数据到哪个下标了,用于生产者和消费者之间的消息同步
    private final Sequence cursorSequence;
    private final Sequencer sequencer;

    public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length)
        {
            dependentSequence = cursorSequence;
        }
        else
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }


    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

        // Wait for the given sequence to be available。
        // 这里并不保证返回值availableSequence一定等于 given sequence,他们的大小关系取决于采用的WaitStrategy。
        // 1、YieldingWaitStrategy在自旋100次尝试后,会直接返回dependentSequence的最小seq,这时并不保证返回值>=given sequence
        // 2、BlockingWaitStrategy则会阻塞等待given sequence可用为止,可用并不是说availableSequence == given sequence,而应当是指 >=
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        // 获取消费者可以消费的最大的可用序号,支持批处理效应,提升处理效率。
        // 当availableSequence > sequence时,需要遍历 sequence --> availableSequence,找到最前一个准备就绪,可以被消费的event对应的seq。
        // 最小值为:sequence-1
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
}


参考文章

Disruptor3.0的实现细节

目录
相关文章
|
6月前
|
消息中间件 分布式计算 Java
探究Kafka原理-3.生产者消费者API原理解析(上)
探究Kafka原理-3.生产者消费者API原理解析
67 0
|
6月前
|
消息中间件 缓存 Kafka
探究Kafka原理-3.生产者消费者API原理解析(下)
探究Kafka原理-3.生产者消费者API原理解析
170 0
|
消息中间件 存储 缓存
|
消息中间件 网络协议 Java
|
消息中间件 Java Kafka
Java工具篇之Disruptor高性能队列
disruptor适用于多个线程之间的消息队列,`作用与ArrayBlockingQueue有相似之处`,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。
1410 1
Java工具篇之Disruptor高性能队列
一些关于Netty的工作架构流程的问题
一些关于Netty的工作架构流程的问题
|
存储 缓存 NoSQL
并发编程框架Disruptor之高性能设计(上)
并发编程框架Disruptor之高性能设计(上)
152 0
并发编程框架Disruptor之高性能设计(上)
|
缓存 算法 Java
并发编程框架Disruptor之高性能设计(下)
并发编程框架Disruptor之高性能设计(下)
148 0
并发编程框架Disruptor之高性能设计(下)
|
存储 Cloud Native Java
蚂蚁金服分布式链路跟踪组件 SOFATracer 中 Disruptor 实践(含源码)
本文对 SOFATracer 中使用 Disruptor 来进行日志输出的代码进行了简单的分析。
蚂蚁金服分布式链路跟踪组件 SOFATracer 中 Disruptor 实践(含源码)
|
Java 调度 Android开发
08.RxJava运作流程源码分析
RxJava线程切换非常方便,只要调用subscribeOn(Schedules.io())就可以使前边的操作运行于子线程,调用obsersableOn(AndroidSchedules.
1071 0