Disruptor通过缓存行填充,利用CPU高速缓存,只是Disruptor“快”的一个因素,快的另一因素是“无锁”,尽可能发挥CPU本身的高速处理性能。
1 缓慢的锁
Disruptor作为一个高性能的生产者-消费者队列系统,核心就是通过RingBuffer实现一个无锁队列。
Jdk像LinkedBlockingQueue队列库,比Disruptor RingBuffer慢很多。
1.1 链表数据在内存布局对高速缓存不友好
RingBuffer使用数组:
1.2 锁依赖
生产者-消费者模式里,可能有多个消费者,也可能多个生产者。
多个生产者都要往队尾指针添加新任务,产生多线程竞争。于是,做这事时,生产者就要拿到对队尾的锁。同样多个消费者去消费队头时,也就产生竞争。同样消费者也要拿到锁。
那只有一个生产者或一个消费者,是不是就没锁的竞争问题?No!生产者-消费者模式下,消费者比生产者快。不然,队列会积压,任务越堆越多:
越来越多的任务没能及时完成
内存也放不下
虽然生产者-消费者模型下,都有队列作为缓冲区,但大部分情况下,这缓冲区空。即使只有一个生产者和一个消费者,这生产者指向的队尾和消费者指向的队头是同一节点。于是,这两个生产者和消费者之间一样产生锁竞争。
在LinkedBlocking Queue锁机制通过ReentrantLock实现。用Java在JVM上直接实现的加锁机制,由JVM进行裁决。这锁的争夺,会把没有拿到锁的线程挂起等待,也需经过一次上下文切换(Context Switch)。
这上下文切换要做的和异常和中断里的一样。上下文切换过程,要把当前执行线程的寄存器等信息,保存到线程栈。即已加载到高速缓存的指令或数据,又回到主内存,会进步拖慢性能。
Disruptor的Benchmark测试:把一个long类型counter,从0自增到5亿
一种方式没任何锁
另外一个方式每次自增时都取一个锁
分别207毫秒和9603毫秒,性能差近50倍。
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockBenchmark {
public static void runIncrement() {
long counter = 0;
long max = 500000000L;
long start = System.currentTimeMillis();
while (counter < max) {
counter++;
}
long end = System.currentTimeMillis();
System.out.println("Time spent is " + (end-start) + "ms without lock");
}
public static void runIncrementWithLock() {
Lock lock = new ReentrantLock();
long counter = 0;
long max = 500000000L;
long start = System.currentTimeMillis();
while (counter < max) {
if (lock.tryLock()){
counter++;
lock.unlock();
}
}
long end = System.currentTimeMillis();
System.out.println("Time spent is " + (end-start) + "ms with lock");
}
public static void main(String[] args) {
runIncrement();
runIncrementWithLock();
}
}
加锁和不加锁自增counter
Time spent is 207ms without lock
Time spent is 9603ms with lock
性能差出将近10倍
2 无锁的RingBuffer
加锁很慢,所以Disruptor“无锁”,即没有os层的锁。
Disruptor还利用CPU硬件支持的指令,CAS,Intel CPU对应指令 cmpxchg。
和直接在链表的头和尾加锁不同,RingBuffer创建一个Sequence对象,指向当前的RingBuffer的头和尾。这头和尾的标识不是通过指针实现,而是通过序号,类名叫Sequence。
RingBuffer中进行生产者和消费者之间的资源协调,是对比序号。
当Pro想往队列加新数据,它会把当前Pro的Sequence的序号,加上需要加入的新数据的数量,然后和实际的消费者所在的位置对比,看队列里是否有足够空间加入这些数据,而不会覆盖消费者还没处理完的数据。
Sequence就是通过CAS,即UNSAFE.compareAndSwapLong:
public boolean compareAndSet(final long expectedValue, final long newValue)
{
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
public long addAndGet(final long increment)
{
long currentValue;
long newValue;
do
{
currentValue = get();
newValue = currentValue + increment;
}
while (!compareAndSet(currentValue, newValue));
return newValue;
Sequence源码中的addAndGet,若CAS失败,会不断忙等待重试。
CAS不是基础库函数,也不是os实现的一个系统调用,而是一个CPU硬件支持的机器指令。Intel CPU的cmpxchg指令,compxchg [ax] (隐式参数,EAX累加器), [bx] (源操作数地址), [cx] (目标操作数地址):
第一个操作数不在指令里面出现,是一个隐式的操作数,也就是EAX累加寄存器里面的值
第二个操作数就是源操作数,并且指令会对比这个操作数和上面的累加寄存器里面的值
若值相同,CPU会把ZF(条件码寄存器里零标志位的值)置1,再把第三个操作数(即目标操作数)设置到源操作数的地址。
不相等,就会把源操作数里的值,设置到累加器寄存器。
对应伪代码:
IF [ax]< == [bx] THEN [ZF] = 1, [bx] = [cx]
ELSE [ZF] = 0, [ax] = [bx]
单指令是原子的,即CAS时,无需再加锁,直接调用。无锁,CPU就像在赛道上行驶,不会遇到需上下文切换红灯而停下来。虽会遇到CAS这样复杂机器指令,就好像赛道上会有U型弯,不过不用完全停等待,CPU运行起来仍快得多。
3 CAS到底多快
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockBenchmark {
public static void runIncrementAtomic()
{
AtomicLong counter = new AtomicLong(0);
long max = 500000000L;
long start = System.currentTimeMillis();
while (counter.incrementAndGet() < max) {
}
long end = System.currentTimeMillis();
System.out.println("Time spent is " + (end-start) + "ms with cas");
}
public static void main(String[] args) {
runIncrementAtomic();
}
}
Time spent is 3867ms with cas
incrementAndGet最终到CPU指令层面,就是CAS操作。它所花费时间,虽比没任何锁的操作慢一个数量级,但比使用ReentrantLock这样的操作系统锁的机制,还是减少一半时间。
4 总结
Java基础库里面的BlockingQueue,都要通过显示地加锁来保障生产者之间、消费者之间,乃至生产者和消费者之间,不会发生锁冲突的问题。
但加锁会大大拖慢性能。获取锁时,CPU没有执行计算相关指令,而要等待os或JVM进行锁竞争裁决。那些没有拿到锁而被挂起等待的线程,则需上下文切换。这上下文切换,会把挂起线程的寄存器里的数据放到线程的程序栈。即加载到高速缓存里面的数据也失效了,程序就变得更慢。
RingBuffer采用无锁方案,通过CAS进行序号自增和对比,使CPU无需获取os锁。而能继续顺序执行CPU指令。无上下文切换、os锁,程序就快。不过因为采用CAS忙等待(Busy-Wait),会使得CPU始终满负荷运转,消耗更多电,小缺点。