概述
Disruptor是一种高性能的并发编程框架,它由LMAX Exchange公司开发,并于2011年开源。Disruptor旨在解决传统多线程编程中的性能瓶颈和并发问题,特别适用于需要高度并发处理的场景。
Disruptor采用了一种称为"无锁编程"的机制,通过使用环形缓冲区(Ring Buffer)和事件驱动的方式实现高效的消息传递和处理。它的核心思想是将消息(事件)在生产者和消费者之间进行无锁的、高效的交换,以减少线程间的竞争和上下文切换。
在Disruptor中,所有的事件都被放置在一个环形缓冲区中,生产者将事件写入缓冲区,而消费者则从缓冲区中读取事件进行处理。这种设计避免了线程间的锁竞争,使得多个线程可以同时进行读写操作,从而提高了整体的处理能力。
Github主页
https://github.com/LMAX-Exchange/disruptor
核心组件
Disruptor的核心组件包括:
- Ring Buffer(环形缓冲区):它是Disruptor的核心数据结构,用于存储事件。环形缓冲区的大小是固定的,所有的事件都按照顺序存储在缓冲区中,生产者和消费者可以通过索引来读写事件。
- Sequence Barrier(序列屏障):它是用于控制事件的发布和消费顺序的屏障。生产者在发布事件之前必须等待所有消费者的序列达到某个特定值,而消费者在消费事件之前必须等待生产者的序列达到某个特定值。
- Event Processor(事件处理器):它是Disruptor中的消费者,用于处理事件。每个消费者都会维护一个序列,表示它已经处理过的事件位置,消费者可以并行地处理事件。
Disruptor的使用方式相对复杂,需要开发者熟悉其核心概念和编程模型。但是,一旦正确使用,Disruptor可以显著提高并发处理的性能,并减少线程间的竞争和开销。它在高性能领域,如金融交易系统、消息队列等场景中得到了广泛的应用。
概念解读
Ring Buffer
Ring Buffer(环形缓冲区)在Disruptor框架中起着关键作用,它是用来存储事件(消息)的数据结构。环形缓冲区的作用主要有以下几个方面:
- 存储事件:环形缓冲区提供了一个固定大小的存储空间,用于存放生产者产生的事件。生产者将事件写入环形缓冲区的空闲位置,而不需要进行锁操作或等待其他线程的释放,从而避免了锁竞争的开销。
- 支持并发读写:环形缓冲区的结构支持多个生产者和多个消费者同时进行读写操作。这意味着多个线程可以并发地向环形缓冲区写入事件或从中读取事件,提高了整体的处理能力和并发性能。
- 解耦生产者和消费者:环形缓冲区作为生产者和消费者之间的中间介质,实现了生产者和消费者的解耦。生产者将事件写入缓冲区后,可以继续进行其他操作,而不需要等待消费者的处理。消费者则可以根据自身的处理能力和节奏从缓冲区中读取事件进行处理。
- 循环利用空间:环形缓冲区的大小是固定的,并且采用循环的方式利用空间。当缓冲区的末尾被写满后,新的事件会从缓冲区的开头重新写入,实现了对空间的循环利用。这样可以避免频繁地进行内存分配和释放,提高了系统的效率和性能。
总的来说,环形缓冲区在Disruptor中扮演了存储和传递事件的角色,它通过无锁的方式实现高效的并发读写操作,并提供了解耦和循环利用空间的特性,从而在高性能的并发编程场景中发挥重要作用。
Sequence Disruptor
在 Disruptor 中,Sequence(序列)是一种用于跟踪生产者和消费者处理事件的进度的机制。Sequence Disruptor 是指使用 Sequence 来优化 Disruptor 的一种方式。
Sequence Disruptor 的作用主要有以下几个方面:
- 解决数据依赖:在 Disruptor 中,消费者处理事件的速度通常比生产者产生事件的速度快。为了保证事件的正确处理顺序,消费者必须等待生产者生产的事件,这会导致一定程度的阻塞。使用 Sequence Disruptor 可以解决这个问题,通过让消费者根据生产者的进度决定是否处理事件,避免了不必要的阻塞。
- 提高并发性能:Sequence Disruptor 可以在多个消费者之间实现更细粒度的任务划分。每个消费者通过维护自己的 Sequence,表示自己已经处理过的事件位置,可以独立地处理未被其他消费者处理的事件,从而提高了并发性能。
- 避免竞争和锁:使用 Sequence 来控制事件的处理顺序避免了多线程之间的竞争和锁操作。每个消费者都通过自己的 Sequence 来判断是否可以处理某个事件,而不需要使用传统的锁机制,减少了线程间的竞争,提高了系统的性能。
- 确保顺序性:在某些应用场景中,事件的处理顺序非常重要,Sequence Disruptor 可以保证事件的处理顺序与生产者的生产顺序一致,确保了数据的准确性和一致性。
总的来说,Sequence Disruptor 通过使用 Sequence 来解决数据依赖、提高并发性能、避免竞争和锁以及确保顺序性,从而优化了 Disruptor 的性能和效率,在高性能并发编程场景中得到广泛应用。
它是如何解决CPU缓存伪共享问题的
Sequence Disruptor 在一定程度上可以缓解 CPU 缓存伪共享(Cache False Sharing)问题。CPU 缓存伪共享是指多个线程同时访问并修改不同但共享同一缓存行(Cache Line)的数据,由于缓存一致性协议的原因,会导致频繁的缓存无效和刷新操作,从而降低了多线程并发执行的性能。
在 Disruptor 中,每个消费者通过维护自己的 Sequence 来表示已经处理过的事件位置。Sequence 通常会被不同的线程同时访问和修改,因此可能存在缓存伪共享问题。为了减少缓存伪共享带来的性能影响,Sequence Disruptor 可以采用以下策略:
- 缓存对齐(Cache Alignment):在实现 Sequence 的数据结构时,可以使用编程技巧来保证不同 Sequence 的实例位于不同的缓存行中,避免了多个 Sequence 位于同一缓存行的情况,从而减少了缓存伪共享的可能性。
- Padded Padding):在 Sequence 的数据结构中,可以使用填充字段(Padding)来确保每个 Sequence 之间有足够的空间,避免它们共享同一缓存行。填充字段可以增加数据结构的大小,但可以有效地避免缓存伪共享问题。
通过采用上述策略,Sequence Disruptor 可以将不同 Sequence 实例之间的数据在缓存中互相隔离,减少了缓存伪共享的概率。这样一来,当不同的线程对不同的 Sequence 进行操作时,它们访问的数据通常位于不同的缓存行,减少了缓存无效和刷新操作,提高了多线程并发执行的性能。
需要注意的是,虽然 Sequence Disruptor 可以缓解 CPU 缓存伪共享问题,但并不能完全消除。在特定的硬件和应用场景下,仍可能存在缓存伪共享带来的性能影响。因此,在实际使用中,仍需综合考虑其他优化技术和硬件特性,以最大程度地减少缓存伪共享的影响。
Sequencer
Sequencer(序列器)是 Disruptor 框架中的核心组件之一,它在生产者和消费者之间起着关键的协调作用。Sequencer 的主要作用如下:
- 控制事件发布顺序:Sequencer 确保生产者发布事件的顺序,并为每个事件分配递增的序列号。生产者通过 Sequencer 将事件写入环形缓冲区,并且保证事件按照顺序被消费者处理。这样可以确保事件的顺序性,避免乱序或错乱的情况发生。
- 确保消费者进度:Sequencer 跟踪消费者的处理进度,即消费者处理事件的序列号。通过跟踪最小消费者序列号和最大消费者序列号,Sequencer 可以确保所有消费者都已经处理了指定序列号之前的所有事件。这种机制保证了事件处理的完整性,消费者不会丢失或跳过任何事件。
- 提供等待策略:Sequencer 提供了多种等待策略,用于控制生产者和消费者之间的协调和等待行为。等待策略可以根据系统的需求进行配置,例如自旋等待、阻塞等待、超时等待等,以最大程度地平衡生产者和消费者之间的速度差异。
- 支持批量操作:Sequencer 还支持批量操作,即生产者和消费者可以一次性处理多个事件。通过指定批量大小,生产者可以连续写入多个事件,而消费者可以一次性处理多个事件,从而减少了线程间的通信开销和上下文切换。
此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
总的来说,Sequencer 在 Disruptor 框架中起着控制事件发布顺序、确保消费者进度、提供等待策略和支持批量操作等关键作用。它通过协调生产者和消费者之间的行为,保证事件的顺序和完整性,并提高了并发处理的效率和性能。
Sequence Barrier
Sequence Barrier(序列屏障)是 Disruptor 框架中的另一个重要组件,用于控制事件的发布和消费顺序,确保消费者在处理事件时按照正确的顺序进行。
Sequence Barrier 有两个主要作用:
- 控制事件发布顺序:在 Disruptor 中,生产者通过 Sequencer 将事件写入环形缓冲区,并分配递增的序列号。而消费者通过 Sequence 来表示自己已经处理的事件位置。Sequence Barrier 用于控制消费者可以读取的事件的范围。每个消费者都要在 Sequence Barrier 上等待,直到它们的序列号达到 Sequence Barrier 的值,才能继续处理事件。这样确保了消费者按照正确的顺序处理事件,避免了乱序和错乱。
- 避免事件丢失和覆盖:通过 Sequence Barrier 的等待机制,确保了所有消费者都已经处理了指定序列号之前的所有事件。这样可以防止事件的丢失或覆盖,消费者不会跳过未处理的事件,从而保证了数据的完整性。
Sequence Barrier 可以看作是一个事件的处理控制点,它定义了消费者可以读取事件的边界。当消费者在 Sequence Barrier 上等待时,如果事件的序列号还没有达到 Sequence Barrier 的值,消费者将被阻塞,直到满足条件。
Disruptor 提供了不同的 Sequence Barrier 实现,以支持不同的等待策略,如自旋等待、阻塞等待、超时等待等。开发者可以根据具体的场景需求选择合适的等待策略,以平衡生产者和消费者之间的速度差异,实现高效的事件处理。
Wait Strategy
等待策略(Wait Strategy)是 Disruptor 框架中用于控制消费者在没有可用事件时的等待行为的策略。
在 Disruptor 中,当消费者无法获取到新的可用事件时,需要采取适当的等待策略,以平衡生产者和消费者之间的速度差异,避免资源浪费和性能下降。等待策略定义了消费者在等待期间的行为,包括等待的方式、时间间隔等。
Disruptor 提供了多种等待策略可供选择,常见的等待策略包括:
- BlockingWaitStrategy:阻塞等待策略,在消费者没有可用事件时,将会一直阻塞等待,直到有新的事件可用。这种策略可以保证消费者尽快得到新的事件,但会占用系统的线程资源。
- SleepingWaitStrategy:休眠等待策略,在消费者没有可用事件时,会定期进行短暂的休眠,然后再检查是否有新的事件可用。通过休眠可以释放 CPU 资源,减少资源占用,但会增加事件处理的延迟。
- YieldingWaitStrategy:让步等待策略,在消费者没有可用事件时,会调用线程的 yield() 方法,将 CPU 时间让给其他线程。这种策略可以减少资源占用,并且具有一定的自旋效果,但也可能会导致 CPU 自旋消耗较高。
- BusySpinWaitStrategy:忙等待策略,在消费者没有可用事件时,会进行自旋等待,不进行休眠或让步。这种策略可以实现最低的延迟,但也会持续消耗 CPU 资源。
等待策略的选择应根据具体应用场景和需求进行权衡。例如,对于对低延迟和高吞吐量要求较高的场景,可以选择 BusySpinWaitStrategy 等策略;而对于对系统资源占用要求较高的场景,可以选择 SleepingWaitStrategy 或 YieldingWaitStrategy 等策略。
需要注意的是,选择合适的等待策略是优化 Disruptor 性能的重要因素之一,不同的等待策略在不同的硬件和应用场景下可能会产生不同的效果,开发者需要根据具体情况进行测试和评估。
Event
在 Disruptor 中,Event(事件)是传递的数据单元,它是实际需要在生产者和消费者之间传递的信息。Disruptor 是一个高性能的并发框架,它主要用于在多个线程之间高效地传递事件(Event),以实现高并发处理。
在 Disruptor 中,事件通常是一个简单的数据结构,例如一个包含若干字段的 Java 对象。在创建 Disruptor 时,需要预先定义事件的数据结构,并为其分配内存空间。然后,生产者可以在这块预分配的内存中填充事件的数据,而消费者则可以从中读取和处理事件的数据。
事件的数据结构通常由应用程序开发者根据具体业务需求定义,可以根据实际情况包含所需的字段和信息。例如,如果 Disruptor 用于实现实时交易系统,事件可能包含交易时间、交易类型、交易金额等字段;如果 Disruptor 用于处理日志信息,事件可能包含日志记录时间、日志内容等字段。
在 Disruptor 的环形缓冲区中,事件被生产者写入并按序存储。然后,消费者从环形缓冲区中读取事件,并进行相应的处理。通过合理地定义事件的数据结构,可以实现高效的并发处理,避免线程之间的竞争和锁操作,从而提高系统的性能和吞吐量。
总的来说,Event 在 Disruptor 中代表着实际需要传递的数据单元,它是 Disruptor 实现高性能并发处理的基础。通过合理定义和使用事件,可以充分发挥 Disruptor 框架的优势,实现高效、低延迟的事件处理。
Event Processor
在 Disruptor 框架中,EventProcessor(事件处理器)是消费者(Consumer)的抽象表示,用于处理从环形缓冲区中读取的事件(Event)。
EventProcessor 主要有两个重要的子类:
BatchEventProcessor:这是最常见的事件处理器类型。它负责从环形缓冲区中读取事件,并将事件批量地传递给一个或多个 EventHandlers(事件处理器)。BatchEventProcessor 会维护一个 Sequence,用于跟踪其处理的事件序列号,以及一个 SequenceBarrier,用于控制其和其他消费者的事件处理顺序。
WorkProcessor:这是另一种事件处理器类型,与 BatchEventProcessor 不同,WorkProcessor 只会将事件传递给单个 WorkHandler(工作处理器)。WorkProcessor 在处理事件时不需要维护 Sequence,因为它只会处理来自 SequenceBarrier 的单个事件。
EventProcessor 的主要作用是将 Disruptor 中的事件从环形缓冲区中读取出来,并将其传递给相应的事件处理器进行处理。这样,Disruptor 可以将事件的生产和消费解耦,实现高效的并发处理。
需要注意的是,Disruptor 中的事件处理器是单线程的,每个 EventProcessor 都会在自己的线程中独立执行。这样可以避免多个消费者之间的竞争和锁操作
EventHandler
EventHandler(事件处理器)是 Disruptor 框架中用于处理事件的实际组件,它实现了对事件的具体处理逻辑。
在 Disruptor 中,EventHandler 是 EventProcessor 的一种具体实现。每个 EventHandler 都会被绑定到一个或多个 EventProcessor 上,负责实际处理从环形缓冲区中读取到的事件。
EventHandler 的主要作用如下:
- 事件处理逻辑:EventHandler 实现了具体的事件处理逻辑。当 EventProcessor 从环形缓冲区读取到事件时,会将事件传递给 EventHandler 进行处理。EventHandler 可以根据事件的数据和业务逻辑,执行相应的处理操作。
- 并发处理:Disruptor 框架中可以有多个 EventHandler,每个 EventHandler 在不同的线程中独立执行。这样可以实现事件的并发处理,提高系统的吞吐量和性能。
- 事件依赖处理:在 Disruptor 中,可以将多个 EventHandler 进行组合,形成事件处理链。每个 EventHandler 处理完事件后,将事件传递给下一个 EventHandler 进行处理。这样可以实现事件的依赖处理,将复杂的业务逻辑拆分为多个独立的 EventHandler 进行处理。
- 状态管理:EventHandler 可以维护自己的状态,用于跟踪和管理事件处理过程中的状态信息。它可以记录已处理的事件数量、统计数据、错误处理等,以支持事件处理的状态管理和监控。
通过定义和实现 EventHandler,可以将事件处理逻辑与 Disruptor 框架进行结合,实现高效、可扩展的事件处理。EventHandler 应该根据具体的业务需求进行设计和实现,确保处理逻辑正确、高效,并满足系统的性能要求。
总结起来,EventHandler 是 Disruptor 框架中用于处理事件的组件,负责实现具体的事件处理逻辑。通过并发处理、事件依赖和状态管理,EventHandler 可以实现高效、可扩展的事件处理,提高系统的吞吐量和性能。
Producer
在 Disruptor 框架中,Producer(生产者)是指负责向环形缓冲区(Ring Buffer)中发布事件(Event)的组件或角色。
生产者的主要任务是将事件写入环形缓冲区,使得消费者能够读取和处理这些事件。生产者通常与一个或多个 EventProcessor 绑定,以确保事件按正确的顺序发布和消费。
以下是生产者的一些关键概念和作用:
- 写入事件:生产者负责将事件数据写入环形缓冲区。它将事件填充到缓冲区的空闲位置,并更新环形缓冲区的序列号,表示已经写入的事件的位置。
- 序列号分配:生产者使用 Sequencer(顺序器)来分配事件的序列号。Sequencer 确保生产者写入事件的顺序,并为每个事件分配唯一的序列号。消费者通过序列号来确定事件的处理顺序。
- 批量写入:生产者可以选择批量写入事件,即一次性写入多个事件。这可以减少写入操作的开销,并提高性能。Disruptor 提供了支持批量写入的方法,例如批量发布(publishEvents)。
- 并发发布:生产者可以与其他生产者并发地向环形缓冲区发布事件。Disruptor 的并发模型允许多个生产者同时写入事件,而无需加锁或同步。这可以提高系统的并发性能。
- 等待策略:在环形缓冲区已满或无可用空间时,生产者可能需要等待。等待策略定义了生产者在等待期间的行为,例如阻塞等待、自旋等待或超时等待。
通过合理配置和使用生产者,可以实现高效、低延迟的事件发布。生产者负责将事件写入环形缓冲区,确保事件按正确的顺序发布和消费,从而实现高性能的并发事件处理。
Code
Step 0 . POM 依赖
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.4</version> </dependency>
Step 1 . 消息实体类
package com.artisan; import lombok.Data; /** * @author 小工匠 * @version 1.0 * @description: Step1 消息体Model * @date 2022/4/9 7:28 * @mark: show me the code , change the world */ @Data public class ArtisanMessage { private String message; }
Step 2 . 构建 EventFactory
package com.artisan; import com.lmax.disruptor.EventFactory; /** * @author 小工匠 * @version 1.0 * @description: Step2 构造EventFactory * @date 2022/4/9 7:28 * @mark: show me the code , change the world */ public class MyEventFactory implements EventFactory<ArtisanMessage> { @Override public ArtisanMessage newInstance() { return new ArtisanMessage(); } }
Step 3 构建 EventHandler(消费者)
package com.artisan; import com.lmax.disruptor.EventHandler; import lombok.extern.slf4j.Slf4j; /** * @author 小工匠 * @version 1.0 * @description: Step3 构造EventHandler-消费者 * @date 2022/4/9 7:31 * @mark: show me the code , change the world */ @Slf4j public class MyEventHandler implements EventHandler<ArtisanMessage> { @Override public void onEvent(ArtisanMessage event, long sequence, boolean endOfBatch) throws Exception { try { // 停止3000ms是为了模拟消费消息是异步的 Thread.sleep(3000); log.info("开始处理消息 biubiubiu"); if (event != null) { log.info("收到消息 :{} " ,event); } } catch (Exception e) { log.error("消费者处理消息失败,{}",e.getMessage()); } log.info("消费者处理消息结束"); } }
Step 4 . 构建Bean
package com.artisan; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @author 小工匠 * @version 1.0 * @description: Step4 构造BeanManager * @date 2022/4/9 7:44 * @mark: show me the code , change the world */ @Component public class MyBeanManager implements ApplicationContextAware { private static ApplicationContext applicationContext = null; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } public static ApplicationContext getApplicationContext() { return applicationContext; } public static Object getBean(String name) { return applicationContext.getBean(name); } public static <T> T getBean(Class<T> clazz) { return applicationContext.getBean(clazz); } }
Step 5 . 构建MQManager
package com.artisan; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author 小工匠 * @version 1.0 * @description: Step5 构造MQManager * @date 2022/4/9 7:47 * @mark: show me the code , change the world */ @Configuration public class MQManager<T> { @Bean("messageRingBuffer") public RingBuffer<T> messageRingBuffer() { //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理 ExecutorService executor = Executors.newFixedThreadPool(2); //指定事件工厂 MyEventFactory factory = new MyEventFactory(); //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率 int bufferSize = 1024 * 256; //单线程模式,获取额外的性能 Disruptor<ArtisanMessage> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy()); //设置事件业务处理器---消费者 disruptor.handleEventsWith(new MyEventHandler()); // 启动disruptor线程 disruptor.start(); //获取ringbuffer环,用于接取生产者生产的事件 RingBuffer<T> ringBuffer = (RingBuffer<T>) disruptor.getRingBuffer(); return ringBuffer; } }
Step 6 . 构建Mqservice和实现类-生产者
package com.artisan; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2022/4/9 7:49 * @mark: show me the code , change the world */ public interface DisruptorMQService { /** * 消息 * @param message */ void testSendMessage(String message); }
【实现类】
package com.artisan; import com.lmax.disruptor.RingBuffer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2022/4/9 7:50 * @mark: show me the code , change the world */ @Slf4j @Component @Service public class DisruptorServiceImpl implements DisruptorMQService { @Autowired private RingBuffer<ArtisanMessage> artisanMessageRingBuffer; @Override public void testSendMessage(String message) { log.info("将要发送的消息为: {} ", message); //获取下一个Event槽的下标 long sequence = artisanMessageRingBuffer.next(); try { //给Event填充数据 ArtisanMessage event = artisanMessageRingBuffer.get(sequence); event.setMessage(message); log.info("向消息队列中添加消息: {} ", event); } catch (Exception e) { log.error("failed to add event to messageModelRingBuffer for : e = {},{}", e, e.getMessage()); } finally { //发布Event,激活观察者去消费,将sequence传递给改消费者 //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer artisanMessageRingBuffer.publish(sequence); log.info("发布Event结束,等待消费...." ); } } }
Step 7 . 测试
我们写一个单测来跑一下
package com.artisan; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2022/4/9 7:55 * @mark: show me the code , change the world */ @Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes = DisruptorMQApplcaiton.class) public class DisruptorMQTest { @Autowired private DisruptorMQService disruptorMqService; /** * 项目内部使用Disruptor做消息队列 * * @throws Exception */ @Test public void sayHelloMqTest() throws Exception { disruptorMqService.testSendMessage("消息FROM DISRUPTOR!"); log.info("消息队列已发送完毕"); //这里停止1000ms是为了确定是处理消息是异步的 Thread.sleep(10000); } }