《重学Java高并发》Disruptor使用实战

本文涉及的产品
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: 《重学Java高并发》Disruptor使用实战

上文已经详细介绍了disruptor,也体会了并发编程的奥妙,接下来将理论结合实战,本文和大家分享一下disruptor的使用,加深对disruptor工具包对理解。


1、 disruptor常用类一览


disruptor的常用类体系如下图所示:

6ed5d300b84bc4fcabfd1e3afb91d2ac.png

其职责说明如下:


  • RingBuffer 环形队列,disruptor中的核心存储类
  • Sequencer 序号实现器,维护发送者发送的序号生成逻辑、消费方获取可消费的序号,是无锁化访问的核心实现类,共有两个实现类,MultiProducerSequencer为多生产者实现类、SingleProducerSequencer单生产者实现类。
  • WaitStrategy 等待策略,消息发送者在容器已写满时、消费者在无消费数据时的等待策略,disruptor提供了N种实现类:

WaitStrategy在创建RingBuffer时指定,默认为BlockingWaitStrategy

  • BlockingWaitStrategy 基于juc的阻塞等待。
  • LiteTimeoutBlockingWaitStrategy 带超时时间的阻塞等待。
  • YieldingWaitStrategy 先自旋100次,如果还不满足条件,则每次调用yield方法,让出CPU,重新参与调度。
  • BusySpinWaitStrategy 自旋,直到满足条件,生产环境慎用
  • TimeoutBlockingWaitStrategy 带超时时间的阻塞等待,与LiteTimeoutBlockingWaitStrategy的区别是TimeoutBlockingWaitStrategy等待时间必须严格低于设定的值。
  • SleepingWaitStrategy 前100次自旋每一次都调用yield,然后阻塞1ms ,继续重试。
  • PhasedBackoffWaitStrategy 组合策略,可以指定上述策略,然后退化为 yield自旋。
  • SequenceBarrier 序号栈栏。在流水线上有多个步骤,后一个步骤必须依赖前一个步骤的完成,栈栏的作用就在于此。
  • EventFactory
    事件生成器工厂类,RingBuffer的设计为力避免频繁的垃圾回收,在RingBuffer中存储的值会预先创建,生产者获取一个Event对象,并填充具体的值,故通常事件对象通常创建的事一个包装类。
  • EventProcessor 事件处理器,disruptor中提供了两类事件处理器WorkProcessorBatchEventProcessor(批处理),它的职责是从RingBuffer中获取可消费的事件,然后调用EventHandler的onEvent方法。
  • EventHandler事件处理器在获取一个可处理的事件后调用EventHandler的onEvent方法,这也是用户自定处理程序的入口,即编写用户业务代码的扩展点
  • ExceptionHandler 异常处理策略。


2、disruptor在canal中的运用


首先以笔者在工作中遇到一个经典使用场景来和大家观摩一下disruptor的基本使用。

在互联网行业中有一种经典的读写分离架构:数据异构,以物流下单为示例,通常关系型数据库只负责订单的创建业务,而关于订单查询、订单轨迹查询等查询类业务,通常会去查询es,依此来降低数据库压力,但接踵而来的问题是如何将数据库的数据准实时同步到Es呢?canal闪亮登场,其核心理念就是订阅并解析binlog,其基本的流程如下:

9f06c8c06cb1d92ca388de6f40215892.png

在示例中解析binlog的目的是提取数据的变化,即DML语句(插入、更新、删除),将这些数据变更在目标端进行重放,为了提高性能,采用disruptor框架提高性能,该如何实现呢?


  • 将解析动作分解为两步,第一步判断事件是否是dml事件,即是否需要解析。
  • 解析dml


为什么要这样拆分呢?一是将粒度降低,解耦,灵活提供不一样的并发度。

接下来我们看一下canal中是如何使用disruptor来解决该问题的。


2.1 创建EventFactory


首先需要创建一个EventFactory,用于填充RingBuffer中的对象,避免过多垃圾回收。

74e86fdb5949a5af46e993ba1fd0420a.png


2.2 创建RingBuffer


根据bingo dump协议,mysql的解析线程创建一个,故该场景下的事件发送者只有一个,创建一个单生产者的RingBuffer,其代码如下:

1ecf9674179fd420774623b714cc3fa8.png


2.3 创建相关的业务Handler


在该场景中需要定义两个handler,由于是具体的业务逻辑,这里不做详细介绍,简单截图说明如下:

8be169dab7c4ba724b98e7551e66ac16.png


2.4 创建栅栏(顺序性保证)


由于binlog解析场景有一个特殊的场景:并发解析但不能破坏顺序性语义。

9b8b711a78255a35f0851540e87c59ea.png


2.5创建事件处理器


6e951275fca482f1ac1891bca7248880.png

在解析事件类型、元数据时采用了BatchEventProcessor,但使用了批处理机制;而在解析dml具体数据时采用了WorkProcessor,支持多线程并发解析。


值得注意的是dml解析器必须依赖元数据解析器,故这里需要引入栈栏,具体是利用RingBuffer的addGatingSequences方法依次将自身处理器的sequences加入到RingBuffer中。


2.6 生产者端代码模板


生长者这边主要是将数据写入到RingBuffer中,从而让下游消费。

9c349def520523f6d2a9cc550a4f9bd5.png

上述代码的特点:


  • 使用 do while 循环,持续放入。
  • 首先调用 RingBuffer的tryNext,尝试获取一个可写的序号,如果获取不到,则重试。
  • 获取一个可写序号号,将值进行填充,然后调用publish方法进行发布,让消费端可感知。
相关文章
|
3月前
|
存储 Java 开发者
Java Map实战:用HashMap和TreeMap轻松解决复杂数据结构问题!
【10月更文挑战第17天】本文深入探讨了Java中HashMap和TreeMap两种Map类型的特性和应用场景。HashMap基于哈希表实现,支持高效的数据操作且允许键值为null;TreeMap基于红黑树实现,支持自然排序或自定义排序,确保元素有序。文章通过具体示例展示了两者的实战应用,帮助开发者根据实际需求选择合适的数据结构,提高开发效率。
88 2
|
22天前
|
缓存 NoSQL Java
高并发场景秒杀抢购超卖Bug实战重现
在电商平台的秒杀活动中,高并发场景下的抢购超卖Bug是一个常见且棘手的问题。一旦处理不当,不仅会引发用户投诉,还会对商家的信誉和利益造成严重损害。本文将详细介绍秒杀抢购超卖Bug的背景历史、业务场景、底层原理以及Java代码实现,旨在帮助开发者更好地理解和解决这一问题。
54 12
|
17天前
|
Java
Java基础却常被忽略:全面讲解this的实战技巧!
本次分享来自于一道Java基础的面试试题,对this的各种妙用进行了深度讲解,并分析了一些关于this的常见面试陷阱,主要包括以下几方面内容: 1.什么是this 2.this的场景化使用案例 3.关于this的误区 4.总结与练习
|
1月前
|
Java 程序员
Java基础却常被忽略:全面讲解this的实战技巧!
小米,29岁程序员,分享Java中`this`关键字的用法。`this`代表当前对象引用,用于区分成员变量与局部变量、构造方法间调用、支持链式调用及作为参数传递。文章还探讨了`this`在静态方法和匿名内部类中的使用误区,并提供了练习题。
34 1
|
2月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
69 6
|
2月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
3月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
2月前
|
缓存 监控 Java
Java 线程池在高并发场景下有哪些优势和潜在问题?
Java 线程池在高并发场景下有哪些优势和潜在问题?
|
3月前
|
开发框架 Java 程序员
揭开Java反射的神秘面纱:从原理到实战应用!
本文介绍了Java反射的基本概念、原理及应用场景。反射允许程序在运行时动态获取类的信息并操作其属性和方法,广泛应用于开发框架、动态代理和自定义注解等领域。通过反射,可以实现更灵活的代码设计,但也需注意其性能开销。
63 1
|
3月前
|
设计模式 缓存 Java
Java高并发处理机制
Java高并发处理机制
34 1