Java多线程-Disruptor性能用测

简介: Java多线程框架Disruptor

1. 背景

好像业界有个说法,就是在单机多线程场景下 Disruptor 号称是无敌的存在,性能可以甩 JDK 原生的多线程框架好几条街。我也是秉着探索的脚步,想要证实下这个结论。因此为验证 阻塞队列 和 Disruptor 的性能差距,这里并不做过多的严谨测试,仅仅按照 1千万、5千万、1亿三个维度的数量级作为采样测试。

首先为了将这三个数量级封装为一个抽象接口,

  • 字典类

public interface Constants {

    /**
     * 一亿
     */
    int EVENT_NUM_OHM = 100000000;

    /**
     * 五千万
     */
    int EVENT_NUM_FM = 50000000;

    /**
     * 一千万
     */
    int EVENT_NUM_OM = 10000000;
}
  • 数据类:用以模拟真实场景下的复杂对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class DataCase implements Serializable {
    private Long id ;
    private String name;
}
  • 有界阻塞队列:为了有个 JDK 原生性能作为参考,这里利用队列的写入和取出元素,通过一个有界阻塞队列,实现生产者和消费者。

public class ArrayBlockingQueueDemo {

    public static void main(String[] args) {
        ArrayBlockingQueueDemo demo = new ArrayBlockingQueueDemo();
        demo.testCostTime();
    }

    public void testCostTime(){
        // 1、手工创建线程池
        ThreadPoolExecutor pool = ThreadPoolsUtil.doCreate(2,2,"disruptor");
        // 2、声明队列的大小
        final ArrayBlockingQueue<DataCase> queue = new ArrayBlockingQueue<>(100000000);
        final long startTime = System.currentTimeMillis();
        // 3、提交生产者队列
        pool.submit(new ProvideRunable(queue));
        // 4、提交消费者队列
        pool.submit(new CoustmRunable(queue,startTime));
        // 4、关闭队列
        pool.shutdown();
    }

    class ProvideRunable implements Runnable{
        private ArrayBlockingQueue<DataCase> queue;
        // 3.1、构造函数,将队列引入进来
        ProvideRunable(ArrayBlockingQueue<DataCase> queue){
            this.queue=queue;
        }

        @Override
        public void run() {
            long i = 0;
            // 3.2、循环
            while(i < Constants.EVENT_NUM_FM) {
                try {
                    // 3.3、放入元素
                    queue.put(new DataCase(i, "c"+i));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                i++;
            }
        }
    }

    class CoustmRunable implements Runnable{
        private ArrayBlockingQueue<DataCase> queue;
        private long startTime;

        // 4.1、构造函数,引入队列和开始时间
        CoustmRunable(ArrayBlockingQueue<DataCase> queue,long startTime){
            this.queue=queue;
            this.startTime=startTime;
        }

        @Override
        public void run() {
            long k = 0;
            // 4.2、遍历循环
            while (k < Constants.EVENT_NUM_FM) {
                try {
                    // 4.3、取出元素
                    queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                k++;
            }
            long endTime = System.currentTimeMillis();
            System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms");
        }
    }
}

在数量为 一亿的场景下,执行耗费时间是 21830 毫秒

运行结果

2. Disruptor

2.1. 核心参数

2.1.1. RingBuffer

类似环形队列的数组,随着不停地填充这个 buffer ,这个序号会一直增长,直到绕过这个环。如果想要找到数组中当前序号指向的元素,可以通过取模操作:


sequence mod array length = array index

2019120618005050-2

如果觉得想要更深入了解 原文链接中文翻译链接

2.1.2. Sequence

发布事件的递增序列号 value 前后各增加 7 个长整型的变量,保证 value 变量在一个单独的 64 字节的缓存行中,不会与其它变量在同一个缓存行,避免伪共享。

通过 sun.misc.Unsafe 类实现了对 value 变量的 CAS 操作。

2.1.3. Sequencer

主要针对生产者的模式,分为单生产者模式和多生产者模式。

  • 单生产者模式

在获取下一个可用序列号时,不会存在多线程竞争,用长整型变量 nextSequence ,而没有使用 Sequence 从而可以提高处理速度。

  • 多生产者模式

存在多个生产者同时获取下一个可用序列号的情况,存在多线程竞争,所以使用 Sequence 类型变量 cursor ,利用 Sequence 类提供的 volatile 修饰的长整型变量 valueUnsafe 提供的 CAS 操作,保证在多线程环境中序列号更新的线程安全,但影响了处理速度。

2.1.4. WaitStrategy

决定一个消费者如何等待生产者将 Event 置入 Disruptor 的策略,都是针对消费者线程的。

主要策略有:

  • BlockingWaitStrategy:最低效且折中的策略,但其对CPU的消耗最小,并且在各种部署环境中能提供更加一致的性能表现;
  • SleepingWaitStrategy:也是一种折中方案,与 BlockingWaitStrategy 类似
  • BusySpinWaitStrategy:可能出现当没有可用序列号时,长期占用CPU,不释放CPU使用权,导致其它线程无法获取CPU使用权。
  • YieldingWaitStrategy:通过 Thread.yield 方法,实现生产者和消费者之间的同步,与等待策略相比,该策略100%使用CPU
  • TimeoutBlockingWaitStrategy:

2.2. 性能测试

2.2.1. 生产者单核


public class DisruptorDemo {

    public static void main(String[] args) {
        int ringBufferSize = 65536;
        // 1、自定义 ThreadFactory
        ThreadFactory threadFactory = new PippinThreadFactory("disruptor");
        // 2、Disruptor 构造函数
        final Disruptor<DataCase> disruptor = new Disruptor<>(
                () -> new DataCase(),
                // 2.1 定义环形数组的大小
                ringBufferSize,
                threadFactory,
                // 2.2 单生产者模式
                ProducerType.SINGLE,
                // 2.3 CPU饱和的模式
                new YieldingWaitStrategy()
        );

        // 3、处理器
        ConsumerCase consumerCase = new ConsumerCase();
        disruptor.handleEventsWith(consumerCase);
        disruptor.start();

        // 4、手工创建线程池
        ThreadPoolExecutor pool = ThreadPoolsUtil.doCreate(2,2,"Pool");

        // 5、提交线程池
        pool.submit(()->{
            RingBuffer<DataCase> ringBuffer = disruptor.getRingBuffer();
            for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {
                long seq = ringBuffer.next();
                DataCase data = ringBuffer.get(seq);
                data.setId(i);
                data.setName("c" + i);
                ringBuffer.publish(seq);
            }
        });

        pool.shutdown();
    }

}

class ConsumerCase implements EventHandler<DataCase> {
    private long startTime;
    private int i;

    public ConsumerCase() {
        this.startTime = System.currentTimeMillis();
    }

    @Override
    public void onEvent(DataCase event, long sequence, boolean endOfBatch) throws Exception {
        i++;
        if (i == Constants.EVENT_NUM_OHM) {
            long endTime = System.currentTimeMillis();
            System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");
        }
    }
}

20220314152827

2.2.2. 生产者多核

目录
相关文章
|
7天前
|
Java 开发者
奇迹时刻!探索 Java 多线程的奇幻之旅:Thread 类和 Runnable 接口的惊人对决
【8月更文挑战第13天】Java的多线程特性能显著提升程序性能与响应性。本文通过示例代码详细解析了两种核心实现方式:Thread类与Runnable接口。Thread类适用于简单场景,直接定义线程行为;Runnable接口则更适合复杂的项目结构,尤其在需要继承其他类时,能保持代码的清晰与模块化。理解两者差异有助于开发者在实际应用中做出合理选择,构建高效稳定的多线程程序。
28 7
|
6天前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
6天前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
1天前
|
Java
java开启线程的四种方法
这篇文章介绍了Java中开启线程的四种方法,包括继承Thread类、实现Runnable接口、实现Callable接口和创建线程池,每种方法都提供了代码实现和测试结果。
java开启线程的四种方法
|
4天前
|
存储 缓存 安全
深度剖析Java HashMap:源码分析、线程安全与最佳实践
深度剖析Java HashMap:源码分析、线程安全与最佳实践
|
6天前
|
缓存 NoSQL Redis
一天五道Java面试题----第九天(简述MySQL中索引类型对数据库的性能的影响--------->缓存雪崩、缓存穿透、缓存击穿)
这篇文章是关于Java面试中可能会遇到的五个问题,包括MySQL索引类型及其对数据库性能的影响、Redis的RDB和AOF持久化机制、Redis的过期键删除策略、Redis的单线程模型为何高效,以及缓存雪崩、缓存穿透和缓存击穿的概念及其解决方案。
|
6天前
|
安全 Java
Java模拟生产者-消费者问题。生产者不断的往仓库中存放产品,消费者从仓库中消费产品。其中生产者和消费者都可以有若干个。在这里,生产者是一个线程,消费者是一个线程。仓库容量有限,只有库满时生产者不能存
该博客文章通过Java代码示例演示了生产者-消费者问题,其中生产者在仓库未满时生产产品,消费者在仓库有产品时消费产品,通过同步机制确保多线程环境下的线程安全和有效通信。
|
5天前
|
缓存 前端开发 JavaScript
一篇文章助你搞懂java中的线程概念!纯干货,快收藏!
【8月更文挑战第11天】一篇文章助你搞懂java中的线程概念!纯干货,快收藏!
13 0
一篇文章助你搞懂java中的线程概念!纯干货,快收藏!
|
7天前
|
缓存 监控 Java
Java性能优化:从单线程执行到线程池管理的进阶实践
在Java开发中,随着应用规模的不断扩大和用户量的持续增长,性能优化成为了一个不可忽视的重要课题。特别是在处理大量并发请求或执行耗时任务时,单线程执行模式往往难以满足需求,这时线程池的概念便应运而生。本文将从应用场景举例出发,探讨Java线程池的使用,并通过具体案例和核心代码展示其在实际问题解决中的强大作用。
22 1
|
4天前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践