并发编程之Disruptor框架介绍和高阶运用(一)

简介: 并发编程之Disruptor框架介绍和高阶运用

1.   Disruptor是什么

1.1   技术背景

LMAX是在英国注册并受到FCA监管(监管号码为509778)的外汇黄金交易所, LMAX架构是LMAX内部研发并应用到交易系统的一种技术。它之所以引起人们的关注,是因为它是一个非常高性能系统,这个系统是建立在JVM平台上,核心是一个业务逻辑处理器,官方号称它能够在一个线程里每秒处理6百万订单.

一个仅仅部署在4台服务器上的服务,每秒向Database写入数据超过100万行数据,每分钟产生超过1G的数据。而每台服务器(8核12G)上CPU占用不到100%,load不超过5。

1.2   对比阻塞队列

现实编程过程中,加锁通常会严重地影响性能。线程会因为竞争不到锁而被挂起,等锁被释放的时候,线程又会被恢复,这个过程中存在着很大的开销,并且通常会有较长时间的中断,因为当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,例如发生了缺页错误、调度延迟或者其它类似情况,那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,就会发生优先级反转。

Disruptor论文中讲述了一个实验:

这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。 机器环境:2.4G 6核 运算: 64位的计数器累加5亿次

Method Time (ms)
Single thread 300
Single thread with CAS 5,700
Single thread with lock 10,000
Single thread with volatile write 4,700
Two threads with CAS 30,000
Two threads with lock 224,000

CAS操作比单线程无锁慢了1个数量级;有锁且多线程并发的情况下,速度比单线程无锁慢3个数量级。可见无锁速度最快。 单线程情况下,不加锁的性能 > CAS操作的性能 > 加锁的性能。 在多线程情况下,为了保证线程安全,必须使用CAS或锁,这种情况下,CAS的性能超过锁的性能,前者大约是后者的8倍。

 可以和BlockingQueue做对比,不过disruptor除了能完成同样的工作场景外,能做更多的事,效率也更高。业务逻辑处理器完全是运行在内存中(in-memory),使用事件源驱动方式(event sourcing). 业务逻辑处理器的核心是Disruptors,这是一个并发组件,能够在无锁的情况下实现Queue并发安全操作

1.3   Disruptor构成

disruptor核心UML图

先介绍几个相关的核心概念。

①环形队列ringbuffer

数据缓冲区,不同线程之间传递数据的BUFFER。RingBuffer是存储消息的地方,通过一个名为cursor的Sequence对象指示队列的头,协调多个生产者向RingBuffer中添加消息,并用于在消费者端判断RingBuffer是否为空。巧妙的是,表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的好处是多个消费者处理消息的方式更加灵活,可以在一个RingBuffer上实现消息的单播,多播,流水线以及它们的组合。在RingBuffer中维护了一个名为gatingSequences的Sequence数组来跟踪相关Seqence。

②Producer/Consumer

Producer即生产者,比如下图中的P1. 只是泛指调用 Disruptor 发布事件(我们把写入缓冲队列的一个元素定义为一个事件)的用户代码。

Consumer和EventProcessor是一个概念,新的版本中由EventProcessor概念替代了Consumer

有两种实现策略,一个是SingleThreadedStrategy(单线程策略)另一个是 MultiThreadedStrategy(多线程策略),两种策略对应的实现类为SingleProducerSequencer、MultiProducerSequencer【都实现了Sequencer类,之所以叫Sequencer是因为他们都是通过Sequence来实现数据写,Sequence的概念参见③】 ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。具体使用哪个根据自己的场景来定,[多线程的策略使用了AtomicLong(Java提供的CAS操作),而单线程的使用long,没有锁也没有CAS。这意味着单线程版本会非常快,因为它只有一个生产者,不会产生序号上的冲突]

Producer生产event数据,EventHandler作为消费者消费event并进行逻辑处理。消费消息的进度通过Sequence来控制。

c0d56e02d151246091dac4a036a93d25_20181219233541392.png

③Sequence

Sequence是一个递增的序号,说白了就是计数器;其次,由于需要在线程间共享,所以Sequence是引用传递,并且是线程安全的;再次,Sequence支持CAS操作;最后,为了提高效率,Sequence通过padding来避免伪共享,关于解决伪共享的问题,可以参见下面章节详细的介绍。

通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。生产者对RingBuffer的互斥访问,生产者与消费者之间的协调以及消费者之间的协调,都是通过Sequence实现。几乎每一个重要的组件都包含Sequence。

说明:虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。

④Sequence Barrier

用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。SequenceBarrier用来在消费者之间以及消费者和RingBuffer之间建立依赖关系。在Disruptor中,依赖关系实际上指的是Sequence的大小关系,消费者A依赖于消费者B指的是消费者A的Sequence一定要小于等于消费者B的Sequence,这种大小关系决定了处理某个消息的先后顺序。因为所有消费者都依赖于RingBuffer,所以消费者的Sequence一定小于等于RingBuffer中名为cursor的Sequence,即消息一定是先被生产者放到Ringbuffer中,然后才能被消费者处理。不好理解的话,可以看下面章节事例配合了解。

SequenceBarrier在初始化的时候会收集需要依赖的组件的Sequence,RingBuffer的cursor会被自动的加入其中。需要依赖其他消费者和/或RingBuffer的消费者在消费下一个消息时,会先等待在SequenceBarrier上,直到所有被依赖的消费者和RingBuffer的Sequence大于等于这个消费者的Sequence。当被依赖的消费者或RingBuffer的Sequence有变化时,会通知SequenceBarrier唤醒等待在它上面的消费者。

⑤Wait Strategy

当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择:

BusySpinWaitStrategy : 自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。

BlockingWaitStrategy : 使用锁和条件变量。CPU资源的占用少,延迟大。

SleepingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。

YieldingWaitStrategy : 是一种充分压榨 CPU 的策略,使用 自旋+yield的方式来提高性能。 当消费线程(Event Handler threads)的数量小于 CPU 核心数时推荐使用该策略。(多个消费者且大于CPU核数可能导致CPU接近100%,需要谨慎使用)

PhasedBackoffWaitStrategy : 上面多种策略的综合,CPU资源的占用少,延迟大。

⑥Event

在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

  1.3 .6.1 EventProcessor

EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。通过把EventProcessor提交到线程池来真正执行,有两类Processor:

其中一类消费者是BatchEvenProcessor。每个BatchEvenProcessor有一个Sequence,来记录自己消费RingBuffer中消息的情况。所以,一个消息必然会被每一个BatchEvenProcessor消费。

另一类消费者是WorkProcessor。每个WorkProcessor也有一个Sequence,多个WorkProcessor还共享一个Sequence用于互斥的访问RingBuffer。一个消息被一个WorkProcessor消费,就不会被共享一个Sequence的其他WorkProcessor消费。这个被WorkProcessor共享的Sequence相当于尾指针

  1.3 6.2 EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。开发者实现EventHandler,然后作为入参传递给EventProcessor的实例。

综上所述,附官方类图:

34305aeb8571549403739a1b0b30af3e_20181219151311869.png

2.  Disruptor什么时候用

disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。下面以两个简单场景举例:

例如场景一:

停车批量入场数据上报,数据上报后需要对每条入场数据存入DB,还需要发送kafka消息给其他业务系统。如果执行完所有的操作,再返回,那么接口耗时比较长,我们可以批量上报后验证数据正确性,通过后按单条入场数据写入环形队列,然后直接返回成功。

实现方式一:启 动2个消费者线程,一个消费者去执行db入库,一个消费者去发送kafka消息。

实现方式二:启动4个消费者,2个消费者并发执行db入库,两个消费者并发发送kafka消息,充分利用cpu多核特性,提高执行效率。

实现方式三:如果要求写入DB和kafka后,需要给用户发送短信。那么可以启动三个消费者线程,一个执行db插入,一个执行kafka消息发布,最后一个依赖前两个线程执行成功,前两个线程都执行成功后,该线程执行短信发送。

例如场景二:

你在网上使用信用卡下订单。一个简单的零售系统将获取您的订单信息,使用信用卡验证服务,以检查您的信用卡号码,然后确认您的订单 – 所有这些都在一个单一过程中操作。当进行信用卡有效性检查时,服务器这边的线程会阻塞等待,当然这个对于用户来说停顿不会太长。

在MAX架构中,你将此单一操作过程分为两个,第一部分将获取订单信息,然后输出事件(请求信用卡检查有效性的请求事件)给信用卡公司. 业务逻辑处理器将继续处理其他客户的订单,直至它在输入事件中发现了信用卡已经检查有效的事件,然后获取该事件来确认该订单有效。

3.  Disruptor为什么快

3.1数组实现

用数组实现, 解决了链表节点分散, 不利于cache预读问题,可以预分配用于存储事件内容的内存空间;并且解决了节点每次需要分配和释放, 需要大量的垃圾回收GC问题 (数组内元素的内存地址的连续性存储的,在硬件级别,数组中的元素是会被预加载的,因为只要一个元素被加载到缓存行,其他相邻的几个元素也会被加载进同一个缓存行)

3.2 批量预读

相比链表队列,实现数组预读,减少结点操作空间释放和申请,(事先把数据空间申请出来,但不赋值)从而减少gc次数。生产者支持单生产,多生产者模式,单生产者cursor使用普通long实现,无锁加快速度,多生产者才使用Sequence(AtomicLong)生产和消费元素支持单线程批量操作数据。

RingBuffer的批量预读源码:

    private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

3.3求余操作优化

求余操作本身也是一种高耗费的操作, 所以ringbuffer的size设成2的n次方(不要太大,否则会造成oom), 可以利用位操作来高效实现求余。要找到数组中当前序号指向的元素,可以通过mod操作,正常通过sequence mod array length = array index,优化后可以通过:sequence & (array length-1) = array index实现。比如一共有8槽,3&(8-1)=3,HashMap就是用这个方式来定位数组元素的,这种方式比取模的速度更快。

3.4 Lock-Free

如果只有一个生产者,那么系统中的每一个序列号只会由一个线程写入。这意味着没有竞争、不需要锁、甚至不需要CAS。在ClaimStrategy中,如果存在多个生产者,唯一会被多线程竞争写入的序号就是 ClaimStrategy 对象里的那个。

那么是采用什么样的方式竞争写入呢?

disruptor不使用锁, 使用CAS(Compare And Swap/Set),严格意义上说仍然是使用锁, 因为CAS本质上也是一种乐观锁, 只不过是CPU级别指令, 不涉及到操作系统, 所以效率很高(AtomicLong实现Sequence)

这就是我们所说的“分离竞争点问题”或者队列的“合并竞争点问题”。通过将所有的东西都赋予私有的序列号,并且只允许一个消费者读Entry对象中的变量来消除竞争,Disruptor 唯一需要处理访问冲突的地方,是多个生产者写入 Ring Buffer 的场景

为什么队列不能胜任这个工作?

- 节点分散, 不利于cache预读

- 节点每次需要分配和释放, 需要大量的垃圾回收, 低效

- 不利于批量读取

- 竞争点较多, head指针, tail指针, size

  由于producer和consumer很难同步, 所以大部分queue都是满或空状态, 这样会导致大量的竞争, 比较低效

- 而且习惯的编程方式导致head指针, tail指针, size常常在一个cacheline中, 造成伪共享问题

用数组实现, 可以部分解决前3点问题, 但仍然无法解决竞争点问题, 以及由于数组的fix size, 带来扩展性问题

3.5 解决伪共享{False Sharing}

缓存系统是以缓存行cache-line为存储单位的,大小一般为2的整数次幂一般为64字节

当多线程互相修改独立的变量时,如果这些变量共享一个缓存行,就会影响彼此的性能,这就是伪共享

缓存行通常是64字节, 一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量. 缓存行是缓存更新的基本单位, 就算你只读一个变量, 系统也会预读其余7个, 并cache这一行, 并且这行中的任一变量发生改变, 都需要重新加载整行, 而非仅仅重新加载一个变量.

解决伪共享的办法是填充一些无用的字段p1,p2,p3,p4,p5,p6,p7再考虑到对象头也占用8bit, 刚好把对象占用的内存扩展到刚好占64bytes(或者64bytes的整数倍)

注:有可能处理器的缓存行是128字节,那么使用64字节填充还是会存在伪共享问题

在Disruptor里我们对RingBuffer的cursor和BatchEventProcessor的序列进行了缓存行填充,如下:

abstract class RingBufferPad
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

jdk消除伪共享的策略

http://www.cnblogs.com/Binhua-Liu/p/5620339.html

https://gitee.com/lzhcode/maven-parent/tree/master/lzh-disruptor/lzh-disruptor-api/src/main/java/com/lzhsite/disruptor/com.lzhsite.disruptor.optimization

3.6 使用内存屏障

内存屏障本身不是一种优化方式, 而是你使用lock-free(CAS)的时候, 必须要配合使用内存屏障,因为CPU和memory之间有多级cache, CPU core只会更新cache-line, 而cache-line什么时候flush到memory, 这个是有一定延时的 ,在这个延时当中, 其他CPU core是无法得知你的更新的, 因为只有把cache-line flush到memory后, 其他core中的相应的cache-line才会被置为过期数据,所以如果要保证使用CAS能保证线程间互斥, 即乐观锁, 必须当一个core发生更新后, 其他所有core立刻知道并把相应的cache-line设为过期, 否则在这些core上执行CAS读到的都是过期数据.

内存屏障 = “立刻将cache-line flush到memory, 没有延时”

注:可参考java中volatile的原理和 happen-before语义 同样实现了内存屏障。

3.7 序号栅栏

    生产者序号wrapPoint,比消费者序号的最小值minSequence大就不断自旋

disruptor的多生产者多消费者模型

多生产者多消费者完整代码:

https://gitee.com/lzhcode/maven-parent/tree/master/lzh-disruptor/lzh-disruptor-api/src/main/java/com/lzhsite/disruptor/heigh/multi


相关文章
|
11月前
|
存储 缓存 算法
并发编程系列教程(12) - Disruptor框架
并发编程系列教程(12) - Disruptor框架
92 0
|
存储 缓存 JavaScript
深入浅出 RxJS 核心原理(响应式编程篇)
在最近的项目中,我们面临了一个需求:监听异步数据的更新,并及时通知相关的组件模块进行相应的处理。传统的事件监听和回调函数方式可能无法满足我们的需求,因此决定采用响应式编程的方法来解决这个问题。在实现过程中发现 RxJS 这个响应式编程库,可以很高效、可维护地实现数据的监听和组件通知。
329 0
深入浅出 RxJS 核心原理(响应式编程篇)
|
5月前
|
存储 关系型数据库 MySQL
纯c协程框架NtyCo实现与原理
纯c协程框架NtyCo实现与原理
140 1
|
消息中间件 网络协议 Java
|
前端开发 Java Maven
响应式编程实战(08)-WebFlux,使用注解编程模式构建异步非阻塞服务
响应式编程实战(08)-WebFlux,使用注解编程模式构建异步非阻塞服务
160 0
|
存储 Java
并发编程(十)线程池核心原理与源码剖析
并发编程(十)线程池核心原理与源码剖析
110 0
并发编程(十二)ForkJoin框架使用
并发编程(十二)ForkJoin框架使用
84 0
|
JavaScript 前端开发 Java
响应式编程简介之:Reactor
响应式编程简介之:Reactor
响应式编程简介之:Reactor