4. 使用Disruptor开发
4.1handleEventsWith,handleEventsWithWorkerPool方法的联系及区别
在disruptor框架调用start方法之前,往往需要将消息的消费者指定给disruptor框架。
常用的方法是:disruptor.handleEventsWith(EventHandler ... handlers),将多个EventHandler的实现类传入方法,封装成一个EventHandlerGroup,实现多消费者消费。
disruptor的另一个方法是:disruptor.handleEventsWithWorkerPool(WorkHandler ... handlers),将多个WorkHandler的实现类传入方法,封装成一个EventHandlerGroup实现多消费者消费。
两者共同点都是,将多个消费者封装到一起,供框架消费消息。
不同点在于,
4.1.1. 对于某一条消息m,handleEventsWith方法返回的EventHandlerGroup,Group中的每个消费者都会对m进行消费,各个消费者之间不存在竞争。handleEventsWithWorkerPool方法返回的EventHandlerGroup,Group的消费者对于同一条消息m不重复消费;也就是,如果c0消费了消息m,则c1不再消费消息m。
4.1.2. 传入的形参不同。对于独立消费的消费者,应当实现EventHandler接口。对于不重复消费的消费者,应当实现WorkHandler接口。
因此,根据消费者集合是否独立消费消息,可以对不同的接口进行实现。也可以对两种接口同时实现,具体消费流程由disruptor的方法调用决定。
API示例
4.2应用场景
4.2.1车辆入场案例
入场后需要存入数据库,需要发送kafka消息,两步执行完后,给用户发送短信。代码实现如下:(经过自己实测单生产,单消费者的模式如果性能瓶颈在写入数据库那么引入disruptor也不能明显提高性能)
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; /** * 测试 P1生产消息,C1,C2消费消息,C1和C2会共享所有的event元素! C3依赖C1,C2处理结果 * @author lzhcode * */ public class Main { public static void main(String[] args) throws InterruptedException { long beginTime = System.currentTimeMillis(); //最好是2的n次方 int bufferSize = 1024; // Disruptor交给线程池来处理,共计 p1,c1,c2,c3四个线程 ExecutorService executor = Executors.newFixedThreadPool(4); // 构造缓冲区与事件生成 Disruptor<InParkingDataEvent> disruptor = new Disruptor<InParkingDataEvent>( new EventFactory<InParkingDataEvent>() { @Override public InParkingDataEvent newInstance() { return new InParkingDataEvent(); } }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // 使用disruptor创建消费者组C1,C2 EventHandlerGroup<InParkingDataEvent> handlerGroup = disruptor.handleEventsWith(new ParkingDataToKafkaHandler(), new ParkingDataInDbHandler()); ParkingDataSmsHandler smsHandler = new ParkingDataSmsHandler(); // 声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3 handlerGroup.then(smsHandler); disruptor.start();// 启动 CountDownLatch latch = new CountDownLatch(1); // 生产者准备 executor.submit(new InParkingDataEventPublisher(latch, disruptor)); latch.await();// 等待生产者结束 disruptor.shutdown(); executor.shutdown(); System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime)); } }
4.2.2 大文件内容hash后输入到小文件的案例
文件中存放50亿个url,每个url各占64字节,内存限制是4G。按照每个url64字节来算,每个文件有50亿个url,那么每个文件大小为5G*64=320G。320G远远超出内存限定的4G,分给四个线程分别处理,每个线程处理80G文件内容,单线程内需要循环处理20次。每次处理完后,根据url的hash值输出到对应小文件,然后进行下一次处理。
这样的方法有两个弊端:
同一个线程内,读写相互依赖,互相等待
不同线程可能争夺同一个输出文件,需要lock同步
于是改为如下方法,四个线程读取数据,计算hash值,将信息写入相应disruptor。每个线程对应disruptor的一个消费者,将disruptor中的信息落盘持久化(使用disruptor的多生产者单消费者模型)。对于四个读取线程(生产者)而言,只有读取文件操作,没有写文件操作,因此不存在读写互相依赖的问题。对于disruptor消费线程而言,只存在写文件操作,没有读文件,因此也不存在读写互相依赖的问题。同时disruptor的单消费者又很好的解决了多个线程互相竞争同一个文件的问题(disruptor的一个消费者是相当于一个线程),因此可以大大提高程序的吞吐率。
优化后的文件内容hash方案示意图
核心框架代码如下:
import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.ExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.WorkerPool; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.ProducerType; /** * 多生产者多消费者模型 * @author lzhcode * */ public class Main { public static void main(String[] args) throws InterruptedException { //1 创建RingBuffer RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, new EventFactory<Order>() { public Order newInstance() { return new Order(); } }, 1024*1024, new YieldingWaitStrategy()); //2 通过ringBuffer 创建一个屏障 SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //3 创建一个消费者: Consumer consumer = new Consumer; //4 构建多消费者工作池 WorkerPool<Order> workerPool = new WorkerPool<Order>( ringBuffer, sequenceBarrier, new EventExceptionHandler(), consumer ); //5 设置消费者的sequence序号 用于单独统计消费进度, 并且设置到ringbuffer中 ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); //6 启动workerPool workerPool .start(Executors.newFixedThreadPool(5)); final CountDownLatch latch = new CountDownLatch(1); for(int i = 0; i < 100; i++) { final Producer producer = new Producer(ringBuffer); new Thread(new Runnable() { public void run() { try { latch.await(); } catch (Exception e) { e.printStackTrace(); } for(int j = 0; j<100; j++) { producer.sendData(UUID.randomUUID().toString()); } } }).start(); } Thread.sleep(2000); System.err.println("----------线程创建完毕,开始生产数据----------"); latch.countDown(); Thread.sleep(10000); System.err.println("任务总数:" + consumers[2].getCount()); } static class EventExceptionHandler implements ExceptionHandler<Order> { public void handleEventException(Throwable ex, long sequence, Order event) { } public void handleOnStartException(Throwable ex) { } public void handleOnShutdownException(Throwable ex) { } } }
4.2.3 Netty整合并发编程框架Disruptor实战百万长链接服务构建案例
对于一个server,我们一般考虑他所能支撑的qps,但有那么一种应用, 我们需要关注的是它能支撑的连接数个数,而并非qps,当然qps也是我们需要考虑的性能点之一。这种应用常见于消息推送系统,比如聊天室或即时消息推送系统等。c对于这类系统,因为很多消息需要到产生时才推送给客户端,所以当没有消 息产生时,就需要hold住客户端的连接,这样,当有大量的客户端时,就需要hold住大量的连接,这种连接我们称为长连接。
首先,我们分析一下,对于这类服务,需消耗的系统资源有:cpu、网络、内存。所以,想让系统性能达到最佳,我们先找到系统的瓶颈所在。这样的长连 接,往往我们是没有数据发送的,所以也可以看作为非活动连接。对于系统来说,这种非活动连接,并不占用cpu与网络资源,而仅仅占用系统的内存而已。所以,我们假想,只要系统内存足够,系统就能够支持我们想达到的连接数,那么事实是否真的如此?
在 Linux 内核配置上,默认的配置会限制全局最大打开文件数(Max Open Files)还会限制进程数。 所以需要对 Linux 内核配置进行一定的修改才可以。具体如何修改这里不做讨论
java 中用的是非阻塞 IO(NIO 和 AIO 都算),那么它们都可以用单线程来实现大量的 Socket 连接。 不会像 BIO 那样为每个连接创建一个线程,因为代码层面不会成为瓶颈,最主要的是把业务代码用disruptor来进行解耦
TCP握手原理和Netty线程组的组合
构建Netty高性能核心架构图(集成Disruptor)
参考代码:
https://gitee.com/lzhcode/maven-parent/tree/master/lzh-disruptor/lzh-disruptor-netty-server
https://gitee.com/lzhcode/maven-parent/tree/master/lzh-disruptor/lzh-disruptor-netty-com
https://gitee.com/lzhcode/maven-parent/tree/master/lzh-disruptor/lzh-disruptor-netty-client