每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习

简介: 每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习

1.来源


Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内部的内存队列的延迟问题,而不是分布式队列。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。


2.应用背景和介绍


据目前资料显示:应用Disruptor的知名项目有如下的一些:Storm, Camel, Log4j2,还有目前的美团点评技术团队也有很多不少的应用,或者说有一些借鉴了它的设计机制。

Disruptor是一个高性能的线程间异步通信的框架,即在同一个JVM进程中的多线程间消息传递。


Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。基于Disruptor开发的系统单线程能支撑每秒600万订单。


disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。

 官方也对disruptor和ArrayBlockingQueue的性能在不同的应用场景下做了对比,目测性能只有有5~10倍左右的提升。


队列


队列是属于一种数据结构,队列采用的FIFO(first in firstout),新元素(等待进入队列的元素)总是被插入到尾部,而读取的时候总是从头部开始读取。在计算中队列一般用来做排队(如线程池的等待排队,锁的等待排队),用来做解耦(生产者消费者模式),异步等等


在jdk中的队列都实现了java.util.Queue接口,在队列中又分为两类,一类是线程不安全的,ArrayDeque,LinkedList等等,还有一类都在java.util.concurrent包下属于线程安全,而在我们真实的环境中,我们的机器都是属于多线程,当多线程对同一个队列进行排队操作的时候,如果使用线程不安全会出现,覆盖数据,数据丢失等无法预测的事情,所以我们这个时候只能选择线程安全的队列。

其次还剩下ArrayBlockingQueue,LinkedBlockingQueue两个队列,他们两个都是用ReentrantLock控制的线程安全,他们两个的区别一个是数组,一个是链表,在队列中,一般获取这个队列元素之后紧接着会获取下一个元素,或者一次获取多个队列元素都有可能,而数组在内存中地址是连续的,在操作系统中会有缓存的优化(下面也会介绍缓存行),所以访问的速度会略胜一筹,我们也会尽量去选择ArrayBlockingQueue。而事实证明在很多第三方的框架中,比如早期的log4j异步,都是选择的ArrayBlockingQueue。


在jdk中提供的线程安全的队列下面简单列举部分队列:\

我们可以看见,我们无锁的队列是无界的,有锁的队列是有界的,这里就会涉及到一个问题,我们在真正的线上环境中,无界的队列,对我们系统的影响比较大,有可能会导致我们内存直接溢出,所以我们首先得排除无界队列,当然并不是无界队列就没用了,只是在某些场景下得排除。其次还剩下ArrayBlockingQueue,LinkedBlockingQueue两个队列,他们两个都是用ReentrantLock控制的线程安全,他们两个的区别一个是数组,一个是链表。

(LinkedBlockingQueue 其实也是有界队列,但是不设置大小时就时Integer.MAX_VALUE),ArrayBlockingQueue,LinkedBlockingQueue也有自己的弊端,就是性能比较低,为什么jdk会增加一些无锁的队列,其实就是为了增加性能,很苦恼,又需要无锁,又需要有界,答案就是Disruptor

Disruptor

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,并且是一个开源的并发框架,并获得2011Duke’s程序框架创新奖。能够在无锁的情况下实现网络的Queue并发操作,基于Disruptor开发的系统单线程能支撑每秒600万订单。目前,包括Apache Storm、Camel、Log4j2等等知名的框架都在内部集成了Disruptor用来替代jdk的队列,以此来获得高性能。


为什么这么牛逼?

在Disruptor中有三大杀器:

  • CAS
  • 消除伪共享
  • RingBuffer

 

3.1.1锁和CAS

我们ArrayBlockingQueue为什么会被抛弃的一点,就是因为用了重量级lock锁,在我们加锁过程中我们会把锁挂起,解锁后,又会把线程恢复,这一过程会有一定的开销,并且我们一旦没有获取锁,这个线程就只能一直等待,这个线程什么事也不能做。


CAS(compare and swap),顾名思义先比较在交换,一般是比较是否是老的值,如果是的进行交换设置,大家熟悉乐观锁的人都知道CAS可以用来实现乐观锁,CAS中没有线程的上下文切换,减少了不必要的开销

而我们的Disruptor也是基于CAS。


3.1.2伪共享


到了伪共享就不得不说计算机CPU缓存,缓存大小是CPU的重要指标之一,而且缓存的结构和大小对CPU速度的影响非常大,CPU内缓存的运行频率极高,一般是和处理器同频运作,工作效率远远大于系统内存和硬盘。实际工作时,CPU往往需要重复读取同样的数据块,而缓存容量的增大,可以大幅度提升CPU内部读取数据的命中率,而不用再到内存或者硬盘上寻找,以此提高系统性能。但是从CPU芯片面积和成本的因素来考虑,缓存都很小。

CPU缓存可以分为一级缓存,二级缓存,如今主流CPU还有三级缓存,甚至有些CPU还有四级缓存。每一级缓存中所储存的全部数据都是下一级缓存的一部分,这三种缓存的技术难度和制造成本是相对递减的,所以其容量也是相对递增的。

每一次你听见intel发布新的cpu什么,比如i7-7700k,8700k,都会对cpu缓存大小进行优化,感兴趣可以自行下来搜索,这些的发布会或者发布文章。

Martin和Mike的 QConpresentation演讲中给出了一些每个缓存时间:

缓存行


在cpu的多级缓存中,并不是以独立的项来保存的,而是类似一种pageCahe的一种策略,以缓存行来保存,而缓存行的大小通常是64字节,在Java中Long是8个字节,所以可以存储8个Long,举个例子,你访问一个long的变量的时候,他会把帮助再加载7个,我们上面说为什么选择数组不选择链表,也就是这个原因,在数组中可以依靠缓冲行得到很快的访问。

 

缓存行是万能的吗?NO,因为他依然带来了一个缺点,我在这里举个例子说明这个缺点,可以想象有个数组队列,ArrayQueue,他的数据结构如下:

对于maxSize是我们一开始就定义好的,数组的大小,对于currentIndex,是标志我们当前队列的位置,这个变化比较快,可以想象你访问maxSize的时候,是不是把currentIndex也加载进来了,这个时候,其他线程更新currentIndex,就会把cpu中的缓存行置位无效,请注意这是CPU规定的,他并不是只吧currentIndex置位无效,如果此时又继续访问maxSize他依然得继续从内存中读取,但是MaxSize却是我们一开始定义好的,我们应该访问缓存即可,但是却被我们经常改变的currentIndex所影响。

Padding的魔法

为了解决上面缓存行出现的问题,在Disruptor中采用了Padding的方式

其中的Value就被其他一些无用的long变量给填充了。这样你修改Value的时候,就不会影响到其他变量的缓存行。


最后顺便一提,在jdk8中提供了@Contended的注解,当然一般来说只允许Jdk中内部,如果你自己使用那就得配置Jvm参数 -RestricContentended = fase,将限制这个注解置位取消。很多文章分析了ConcurrentHashMap,但是都把这个注解给忽略掉了,在ConcurrentHashMap中就使用了这个注解,在ConcurrentHashMap每个桶都是单独的用计数器去做计算,而这个计数器由于时刻都在变化,所以被用这个注解进行填充缓存行优化,以此来增加性能。




作者:tracy_668

链接:https://www.jianshu.com/p/bad7b4b44e48

来源:简书

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

 

下面的例子是测试利用cache line的特性和不利用cache line的特性的效果对比.

 

什么是伪共享

ArrayBlockingQueue有三个成员变量:

这三个变量很容易放到一个缓存行中, 但是之间修改没有太多的关联. 所以每次修改, 都会使之前缓存的数据失效, 从而不能完全达到共享的效果.

如上图所示, 当生产者线程put一个元素到ArrayBlockingQueue时, putIndex会修改, 从而导致消费者线程的缓存中的缓存行无效, 需要从主存中重新读取.

这种无法充分使用缓存行特性的现象, 称为伪共享

3.1.3RingBuffer

ringbuffer到底是什么

它是一个环(首尾相接的环),你可以把它用做在不同上下文(线程)间传递数据的buffer。

基本来说,ringbuffer拥有一个序号,这个序号指向数组中下一个可用的元素。(如下图右边的图片表示序号,这个序号指向数组的索引4的位置

 

 

随着你不停地填充这个buffer(可能也会有相应的读取),这个序号会一直增长,直到绕过这个环。

要找到数组中当前序号指向的元素,可以通过sequence & (array length-1) = array index,比如一共有8槽,3&(8-1)=3,HashMap就是用这个方式来定位数组元素的,这种方式比取模的速度更快。

常用的队列之间的区别

  • 没有尾指针。只维护了一个指向下一个可用位置的序号。
  • 不删除buffer中的数据,也就是说这些数据一直存放在buffer中,直到新的数据覆盖他们

ringbuffer采用这种数据结构原因

  • 因为它是数组,所以要比链表快,数组内元素的内存地址的连续性存储的。这是对CPU缓存友好的—也就是说,在硬件级别,数组中的元素是会被预加载的,因此在ringbuffer当中,cpu无需时不时去主存加载数组中的下一个元素。因为只要一个元素被加载到缓存行,其他相邻的几个元素也会被加载进同一个缓存行。
  • 其次,你可以为数组预先分配内存,使得数组对象一直存在(除非程序终止)。这就意味着不需要花大量的时间用于垃圾回收。此外,不像链表那样,需要为每一个添加到其上面的对象创造节点对象—对应的,当删除节点时,需要执行相应的内存清理操作。

如何从Ringbuffer读取


消费者(Consumer)是一个想从Ring Buffer里读取数据的线程,它可以访问ConsumerBarrier对象——这个对象由RingBuffer创建并且代表消费者与RingBuffer进行交互。就像Ring Buffer显然需要一个序号才能找到下一个可用节点一样,消费者也需要知道它将要处理的序号——每个消费者都需要找到下一个它要访问的序号。在上面的例子中,消费者处理完了Ring Buffer里序号8之前(包括8)的所有数据,那么它期待访问的下一个序号是9。

费者可以调用ConsumerBarrier对象的waitFor()方法,传递它所需要的下一个序号.

 

final long availableSeq = consumerBarrier.waitFor(nextSequence);

ConsumerBarrier返回RingBuffer的最大可访问序号——在上面的例子中是12。ConsumerBarrier有一个WaitStrategy方法来决定它如何等待这个序号.

接下来

接下来,消费者会一直逛来逛去,等待更多数据被写入 Ring Buffer。并且,写入数据后消费者会收到通知——节点 9,10,11 和 12 已写入。现在序号 12 到了,消费者可以指示 ConsumerBarrier 去拿这些序号里的数据了。


在Disruptor中采用了数组的方式保存了我们的数据,上面我们也介绍了采用数组保存我们访问时很好的利用缓存,但是在Disruptor中进一步选择采用了环形数组进行保存数据,也就是RingBuffer。在这里先说明一下环形数组并不是真正的环形数组,在RingBuffer中是采用取余的方式进行访问的,比如数组大小为 10,0访问的是数组下标为0这个位置,其实10,20等访问的也是数组的下标为0的这个位置。


实际上,在这些框架中取余并不是使用%运算,都是使用的&与运算,这就要求你设置的大小一般是2的N次方也就是,10,100,1000等等,这样减去1的话就是,1,11,111,就能很好的使用index & (size -1),这样利用位运算就增加了访问速度。

如果在Disruptor中你不用2的N次方进行大小设置,他会抛出buffersize必须为2的N次方异常。

  • Producer会向这个RingBuffer中填充元素,填充元素的流程是首先从RingBuffer读取下一个Sequence,之后在这个Sequence位置的槽填充数据,之后发布。
  • Consumer消费RingBuffer中的数据,通过SequenceBarrier来协调不同的Consumer的消费先后顺序,以及获取下一个消费位置Sequence。
  • Producer在RingBuffer写满时,会从头开始继续写替换掉以前的数据。但是如果有SequenceBarrier指向下一个位置,则不会覆盖这个位置,阻塞到这个位置被消费完成。Consumer同理,在所有Barrier被消费完之后,会阻塞到有新的数据进来。

Disruptor的设计方案

Disruptor通过以下设计来解决队列速度慢的问题:

  • 环形数组结构
    为了避免垃圾回收, 采用数组而非链表. 同时, 数组对处理器的缓存机制更加友好.
  • 元素位置定位
    数组长度2^n, 通过位运算, 加快定位的速度. 下标采取递增的形式. 不用担心index溢出的问题. index是long类型, 即使100万QPS的处理速度, 也需要30万年才能用完.
  • 无锁设计
    每个生产者或者消费者线程, 会先申请可以操作的元素在数组中的位置, 申请到之后, 直接在该位置写入或者读取数据.

下面忽略数组的环形结构, 介绍一下如何实现无锁设计. 整个过程通过原子变量CAS, 保证操作的线程安全.

一个生产者

生产者单线程写数据的流程比较简单:

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入, 则返回最大的序列号. 这儿主要判断是否会覆盖未读的元素
  3. 若是返回的正确, 则生产者开始写入元素.

多个生产者

多个生产者的情况下, 会遇到“如何防止多个线程重复写同一个元素”的问题. Disruptor的解决方法是, 每个线程获取不同的一段数组空间进行操作. 这个通过CAS很容易达到. 只需要在分配元素的时候, 通过CAS判断一下这段空间是否已经分配出去即可.

但是会遇到一个新问题: 如何防止读取的时候, 读到还未写的元素. Disruptor在多个生产者的情况下, 引入了一个与Ring Buffer大小相同的buffer: available Buffer. 当某个位置写入成功的时候, 便把availble Buffer相应的位置置位, 标记为写入成功. 读取的时候, 会遍历available Buffer, 来判断元素是否已经就绪.


读数据

生产者多线程写入的情况会复杂很多:

  1. 申请读取到序号n;
  2. 若writer cursor >= n, 这时仍然无法确定连续可读的最大下标. 从reader cursor开始读取available Buffer, 一直查到第一个不可用的元素, 然后返回最大连续可读元素的位置;
  3. 消费者读取元素.

如下图所示, 读线程读到下标为2的元素, 三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据, 写线程被分配到的最大元素下标是11.

读线程申请读取到下标从3到11的元素, 判断writer cursor>=11. 然后开始读取availableBuffer, 从3开始, 往后读取, 发现下标为7的元素没有生产成功, 于是WaitFor(11)返回6.

然后, 消费者读取下标从3到6共计4个元素.


 

 

写数据

多个生产者写入的时候:

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入, 则返回最大的序列号. 每个生产者会被分配一段独享的空间;
  3. 生产者写入元素, 写入元素的同时设置available Buffer里面相应的位置, 以标记自己哪些位置是已经写入成功的.

如下图所示, Writer1和Writer2两个线程写入数组, 都申请可写的数组空间. Writer1被分配了下标3到下表5的空间, Writer2被分配了下标6到下标9的空间.

Writer1写入下标3位置的元素, 同时把available Buffer相应位置置位, 标记已经写入成功, 往后移一位, 开始写下标4位置的元素. Writer2同样的方式. 最终都写入完成.

 

防止不同生产者对同一段空间写入的代码, 如下所示:

通过do/while循环的条件cursor.compareAndSet(current, next), 来判断每次申请的空间是否已经被其他生产者占据. 假如已经被占据, 该函数会返回失败, While循环重新执行, 申请写入空间.


消费者的流程与生产者非常类似, 这儿就不多描述了. Disruptor通过精巧的无锁设计实现了在高并发情形下的高性能.

3.2Disruptor怎么使用

package concurrent;
 
import sun.misc.Contended;
 
import java.util.concurrent.ThreadFactory;
 
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
 
/**
 * @Description:
 * @Created on 2019-10-04
 */
public class DisruptorTest {
    public static void main(String[] args) throws Exception {
        // 队列中的元素
        class Element {
            @Contended
            private String value;
 
 
            public String getValue() {
                return value;
            }
 
            public void setValue(String value) {
                this.value = value;
            }
        }
 
        // 生产者的线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            int i = 0;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread" + String.valueOf(i++));
            }
        };
 
        // RingBuffer生产工厂,初始化RingBuffer的时候使用
        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };
 
        // 处理Event的handler
        EventHandler<Element> handler = new EventHandler<Element>() {
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch) throws InterruptedException {
                System.out.println("Element: " + Thread.currentThread().getName() + ": " + element.getValue() + ": " + sequence);
//                Thread.sleep(10000000);
            }
        };
 
 
        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();
 
        // 指定RingBuffer的大小
        int bufferSize = 8;
 
        // 创建disruptor,采用单生产者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
 
        // 设置EventHandler
        disruptor.handleEventsWith(handler);
 
        // 启动disruptor的线程
        disruptor.start();
        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent((element, sequence) -> {
                System.out.println("之前的数据" + element.getValue() + "当前的sequence" + sequence);
                element.setValue("我是第" + sequence + "个");
            });
 
        }
    }
}


在Disruptor中有几个比较关键的:


ThreadFactory:这是一个线程工厂,用于我们Disruptor中生产、消费的时候需要的线程。

EventFactory:事件工厂,用于产生我们队列元素的工厂。在Disruptor中,他会在初始化的时候直接填充满RingBuffer,一次到位。

EventHandler:用于处理Event的handler,这里一个EventHandler可以看做是一个消费者,但是多个EventHandler他们都是独立消费的队列。

WorkHandler:也是用于处理Event的handler,和上面区别在于,多个消费者都是共享同一个队列。

WaitStrategy:等待策略,在Disruptor中有多种策略,来决定消费者在消费时,如果没有数据采取的策略是什么?下面简单列举一下Disruptor中的部分策略

BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。

BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu

YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu

相关文章
|
7月前
|
算法 安全 编译器
并发的三大特性
并发的三大特性
82 1
|
2月前
|
缓存 安全 算法
高性能无锁并发框架Disruptor,太强了!
高性能无锁并发框架Disruptor,太强了!
高性能无锁并发框架Disruptor,太强了!
|
5月前
|
机器学习/深度学习 Java 数据挖掘
线程操纵术之更优雅的并行策略问题之并发和并行有区别问题如何解决
线程操纵术之更优雅的并行策略问题之并发和并行有区别问题如何解决
|
6月前
|
负载均衡 并行计算 Java
分布式系统中,利用并行和并发来提高整体的处理能力
分布式系统中,利用并行和并发来提高整体的处理能力
|
架构师
架构系列——架构师必备基础:并发、并行与多线程关系
架构系列——架构师必备基础:并发、并行与多线程关系
|
消息中间件 SQL 资源调度
|
Java
并发三大特性
并发三大特性
41 0
|
设计模式 算法 安全
并发 并行 同步 异步 你分清了吗
并发 并行 同步 异步 你分清了吗
高性能无锁并发框架Disruptor,太强了
Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操作,并号称能够在一个线程里每秒处理6百万笔订单

相关实验场景

更多