深入理解Disruptor

简介: Disruptor通过缓存行填充,利用CPU高速缓存,只是Disruptor“快”的一个因素,快的另一因素是“无锁”,尽可能发挥CPU本身的高速处理性能。

Disruptor通过缓存行填充,利用CPU高速缓存,只是Disruptor“快”的一个因素,快的另一因素是“无锁”,尽可能发挥CPU本身的高速处理性能。


1 缓慢的锁

Disruptor作为一个高性能的生产者-消费者队列系统,核心就是通过RingBuffer实现一个无锁队列。


Jdk像LinkedBlockingQueue队列库,比Disruptor RingBuffer慢很多。


1.1 链表数据在内存布局对高速缓存不友好

RingBuffer使用数组:

2.png


1.2 锁依赖

生产者-消费者模式里,可能有多个消费者,也可能多个生产者。

多个生产者都要往队尾指针添加新任务,产生多线程竞争。于是,做这事时,生产者就要拿到对队尾的锁。同样多个消费者去消费队头时,也就产生竞争。同样消费者也要拿到锁。


那只有一个生产者或一个消费者,是不是就没锁的竞争问题?No!生产者-消费者模式下,消费者比生产者快。不然,队列会积压,任务越堆越多:


越来越多的任务没能及时完成

内存也放不下

虽然生产者-消费者模型下,都有队列作为缓冲区,但大部分情况下,这缓冲区空。即使只有一个生产者和一个消费者,这生产者指向的队尾和消费者指向的队头是同一节点。于是,这两个生产者和消费者之间一样产生锁竞争。


在LinkedBlocking Queue锁机制通过ReentrantLock实现。用Java在JVM上直接实现的加锁机制,由JVM进行裁决。这锁的争夺,会把没有拿到锁的线程挂起等待,也需经过一次上下文切换(Context Switch)。


这上下文切换要做的和异常和中断里的一样。上下文切换过程,要把当前执行线程的寄存器等信息,保存到线程栈。即已加载到高速缓存的指令或数据,又回到主内存,会进步拖慢性能。


Disruptor的Benchmark测试:把一个long类型counter,从0自增到5亿


一种方式没任何锁

另外一个方式每次自增时都取一个锁

分别207毫秒和9603毫秒,性能差近50倍。


import java.util.concurrent.atomic.AtomicLong;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

public class LockBenchmark {

   public static void runIncrement() {

       long counter = 0;

       long max = 500000000L;

       long start = System.currentTimeMillis();

       while (counter < max) {

           counter++;

       }

       long end = System.currentTimeMillis();

       System.out.println("Time spent is " + (end-start) + "ms without lock");

   }

   public static void runIncrementWithLock() {

       Lock lock = new ReentrantLock();

       long counter = 0;

       long max = 500000000L;

       long start = System.currentTimeMillis();

       while (counter < max) {

           if (lock.tryLock()){

               counter++;

               lock.unlock();

           }

       }

       long end = System.currentTimeMillis();

       System.out.println("Time spent is " + (end-start) + "ms with lock");

   }

   public static void main(String[] args) {

       runIncrement();

       runIncrementWithLock();

   }

}


加锁和不加锁自增counter

Time spent is 207ms without lock

Time spent is 9603ms with lock

性能差出将近10倍


2 无锁的RingBuffer

加锁很慢,所以Disruptor“无锁”,即没有os层的锁。

Disruptor还利用CPU硬件支持的指令,CAS,Intel CPU对应指令 cmpxchg。


和直接在链表的头和尾加锁不同,RingBuffer创建一个Sequence对象,指向当前的RingBuffer的头和尾。这头和尾的标识不是通过指针实现,而是通过序号,类名叫Sequence。

1.png


RingBuffer中进行生产者和消费者之间的资源协调,是对比序号。

当Pro想往队列加新数据,它会把当前Pro的Sequence的序号,加上需要加入的新数据的数量,然后和实际的消费者所在的位置对比,看队列里是否有足够空间加入这些数据,而不会覆盖消费者还没处理完的数据。


Sequence就是通过CAS,即UNSAFE.compareAndSwapLong:


public boolean compareAndSet(final long expectedValue, final long newValue)

    {

        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);

    }

public long addAndGet(final long increment)

   {

       long currentValue;

       long newValue;

       do

       {

           currentValue = get();

           newValue = currentValue + increment;

       }

       while (!compareAndSet(currentValue, newValue));

       return newValue;



Sequence源码中的addAndGet,若CAS失败,会不断忙等待重试。

CAS不是基础库函数,也不是os实现的一个系统调用,而是一个CPU硬件支持的机器指令。Intel CPU的cmpxchg指令,compxchg [ax] (隐式参数,EAX累加器), [bx] (源操作数地址), [cx] (目标操作数地址):


第一个操作数不在指令里面出现,是一个隐式的操作数,也就是EAX累加寄存器里面的值

第二个操作数就是源操作数,并且指令会对比这个操作数和上面的累加寄存器里面的值

若值相同,CPU会把ZF(条件码寄存器里零标志位的值)置1,再把第三个操作数(即目标操作数)设置到源操作数的地址。


不相等,就会把源操作数里的值,设置到累加器寄存器。


对应伪代码:


IF [ax]< == [bx] THEN [ZF] = 1, [bx] = [cx]

                ELSE [ZF] = 0, [ax] = [bx]

单指令是原子的,即CAS时,无需再加锁,直接调用。无锁,CPU就像在赛道上行驶,不会遇到需上下文切换红灯而停下来。虽会遇到CAS这样复杂机器指令,就好像赛道上会有U型弯,不过不用完全停等待,CPU运行起来仍快得多。


3 CAS到底多快

import java.util.concurrent.atomic.AtomicLong;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

public class LockBenchmark {

   public static void runIncrementAtomic()

   {

       AtomicLong counter = new AtomicLong(0);

       long max = 500000000L;

       long start = System.currentTimeMillis();

       while (counter.incrementAndGet() < max) {

       }

       long end = System.currentTimeMillis();

       System.out.println("Time spent is " + (end-start) + "ms with cas");

   }

   public static void main(String[] args) {

       runIncrementAtomic();

   }

}



Time spent is 3867ms with cas


incrementAndGet最终到CPU指令层面,就是CAS操作。它所花费时间,虽比没任何锁的操作慢一个数量级,但比使用ReentrantLock这样的操作系统锁的机制,还是减少一半时间。


4 总结

Java基础库里面的BlockingQueue,都要通过显示地加锁来保障生产者之间、消费者之间,乃至生产者和消费者之间,不会发生锁冲突的问题。


但加锁会大大拖慢性能。获取锁时,CPU没有执行计算相关指令,而要等待os或JVM进行锁竞争裁决。那些没有拿到锁而被挂起等待的线程,则需上下文切换。这上下文切换,会把挂起线程的寄存器里的数据放到线程的程序栈。即加载到高速缓存里面的数据也失效了,程序就变得更慢。


RingBuffer采用无锁方案,通过CAS进行序号自增和对比,使CPU无需获取os锁。而能继续顺序执行CPU指令。无上下文切换、os锁,程序就快。不过因为采用CAS忙等待(Busy-Wait),会使得CPU始终满负荷运转,消耗更多电,小缺点。

目录
相关文章
|
存储 安全 Java
【无锁并发框架Disruptor】
【无锁并发框架Disruptor】
162 0
|
算法 Java
Disruptor使用笔记
Disruptor使用笔记
58 0
|
消息中间件 Java Kafka
Java工具篇之Disruptor高性能队列
disruptor适用于多个线程之间的消息队列,`作用与ArrayBlockingQueue有相似之处`,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。
1420 1
Java工具篇之Disruptor高性能队列
|
缓存 安全 Java
Java多线程-Disruptor性能用测
Java多线程框架Disruptor
349 1
Java多线程-Disruptor性能用测
|
存储 消息中间件 安全
Disruptor - 介绍(1)
开篇  Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。
1915 0
|
消息中间件 监控 Java
强如 Disruptor 也发生内存溢出?(下)
OutOfMemoryError 问题相信很多朋友都遇到过,相对于常见的业务异常(数组越界、空指针等)来说这类问题是很难定位和解决的。 本文以最近碰到的一次线上内存溢出的定位、解决问题的方式展开;希望能对碰到类似问题的同学带来思路和帮助。 主要从表现-->排查-->定位-->解决 四个步骤来分析和解决问题。
|
消息中间件 运维 监控
强如 Disruptor 也发生内存溢出?(上)
OutOfMemoryError 问题相信很多朋友都遇到过,相对于常见的业务异常(数组越界、空指针等)来说这类问题是很难定位和解决的。 本文以最近碰到的一次线上内存溢出的定位、解决问题的方式展开;希望能对碰到类似问题的同学带来思路和帮助。 主要从表现-->排查-->定位-->解决 四个步骤来分析和解决问题。
|
消息中间件 存储 缓存
Disruptor介绍与基本使用
Disruptor是英国外汇交易公司LMAX开发的高性能的并发框架,研发的初衷是解决内存队列的延迟问题,它是线程间通信的高效低延时的内存消息组件,它最大的特点是高性能。下面我就来给大家从各个维度去介绍一下这个组件
774 0