并发编程之Disruptor框架介绍和高阶运用(二)

简介: 并发编程之Disruptor框架介绍和高阶运用

4.  使用Disruptor开发

4.1handleEventsWith,handleEventsWithWorkerPool方法的联系及区别

在disruptor框架调用start方法之前,往往需要将消息的消费者指定给disruptor框架。

常用的方法是:disruptor.handleEventsWith(EventHandler ... handlers),将多个EventHandler的实现类传入方法,封装成一个EventHandlerGroup,实现多消费者消费。

disruptor的另一个方法是:disruptor.handleEventsWithWorkerPool(WorkHandler ... handlers),将多个WorkHandler的实现类传入方法,封装成一个EventHandlerGroup实现多消费者消费。

两者共同点都是,将多个消费者封装到一起,供框架消费消息。

不同点在于,

4.1.1. 对于某一条消息m,handleEventsWith方法返回的EventHandlerGroup,Group中的每个消费者都会对m进行消费,各个消费者之间不存在竞争。handleEventsWithWorkerPool方法返回的EventHandlerGroup,Group的消费者对于同一条消息m不重复消费;也就是,如果c0消费了消息m,则c1不再消费消息m。

4.1.2. 传入的形参不同。对于独立消费的消费者,应当实现EventHandler接口。对于不重复消费的消费者,应当实现WorkHandler接口

因此,根据消费者集合是否独立消费消息,可以对不同的接口进行实现。也可以对两种接口同时实现,具体消费流程由disruptor的方法调用决定。

API示例

https://www.cnblogs.com/pku-liuqiang/p/8544700.html

4.2应用场景

4.2.1车辆入场案例

入场后需要存入数据库,需要发送kafka消息,两步执行完后,给用户发送短信。代码实现如下:(经过自己实测单生产,单消费者的模式如果性能瓶颈在写入数据库那么引入disruptor也不能明显提高性能)

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
/**
 * 测试 P1生产消息,C1,C2消费消息,C1和C2会共享所有的event元素! C3依赖C1,C2处理结果
 * @author lzhcode
 *
 */
public class Main {
  public static void main(String[] args) throws InterruptedException {
    long beginTime = System.currentTimeMillis();
    //最好是2的n次方
    int bufferSize = 1024;
    // Disruptor交给线程池来处理,共计 p1,c1,c2,c3四个线程
    ExecutorService executor = Executors.newFixedThreadPool(4);
    // 构造缓冲区与事件生成
    Disruptor<InParkingDataEvent> disruptor = new Disruptor<InParkingDataEvent>(
        new EventFactory<InParkingDataEvent>() {
          @Override
          public InParkingDataEvent newInstance() {
            return new InParkingDataEvent();
          }
        }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
    // 使用disruptor创建消费者组C1,C2
    EventHandlerGroup<InParkingDataEvent> handlerGroup = disruptor.handleEventsWith(new ParkingDataToKafkaHandler(),
        new ParkingDataInDbHandler());
    ParkingDataSmsHandler smsHandler = new ParkingDataSmsHandler();
    // 声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3
    handlerGroup.then(smsHandler);
    disruptor.start();// 启动
    CountDownLatch latch = new CountDownLatch(1);
    // 生产者准备
    executor.submit(new InParkingDataEventPublisher(latch, disruptor));
    latch.await();// 等待生产者结束
    disruptor.shutdown();
    executor.shutdown();
    System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime));
  }
}

完整代码 https://gitee.com/lzhcode/maven-parent/tree/master/lzh-disruptor/lzh-disruptor-api/src/main/java/com/lzhsite/disruptor/quickstart/inParking

4.2.2 大文件内容hash后输入到小文件的案例

文件中存放50亿个url,每个url各占64字节,内存限制是4G。按照每个url64字节来算,每个文件有50亿个url,那么每个文件大小为5G*64=320G。320G远远超出内存限定的4G,分给四个线程分别处理,每个线程处理80G文件内容,单线程内需要循环处理20次。每次处理完后,根据url的hash值输出到对应小文件,然后进行下一次处理。

这样的方法有两个弊端:

同一个线程内,读写相互依赖,互相等待

不同线程可能争夺同一个输出文件,需要lock同步

于是改为如下方法,四个线程读取数据,计算hash值,将信息写入相应disruptor。每个线程对应disruptor的一个消费者,将disruptor中的信息落盘持久化(使用disruptor的多生产者单消费者模型)。对于四个读取线程(生产者)而言,只有读取文件操作,没有写文件操作,因此不存在读写互相依赖的问题。对于disruptor消费线程而言,只存在写文件操作,没有读文件,因此也不存在读写互相依赖的问题。同时disruptor的单消费者又很好的解决了多个线程互相竞争同一个文件的问题(disruptor的一个消费者是相当于一个线程),因此可以大大提高程序的吞吐率。

优化后的文件内容hash方案示意图

核心框架代码如下:

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
/**
 * 多生产者多消费者模型
 * @author lzhcode
 *
 */
public class Main {
  public static void main(String[] args) throws InterruptedException {
    //1 创建RingBuffer
    RingBuffer<Order> ringBuffer =
        RingBuffer.create(ProducerType.MULTI,
            new EventFactory<Order>() {
              public Order newInstance() {
                return new Order();
              }
            },
            1024*1024,
            new YieldingWaitStrategy());
    //2 通过ringBuffer 创建一个屏障
    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    //3 创建一个消费者:
    Consumer consumer = new Consumer;
    //4 构建多消费者工作池
    WorkerPool<Order> workerPool = new WorkerPool<Order>(
        ringBuffer,
        sequenceBarrier,
        new EventExceptionHandler(),
        consumer );
    //5 设置消费者的sequence序号 用于单独统计消费进度, 并且设置到ringbuffer中
    ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
    //6 启动workerPool
    workerPool
    .start(Executors.newFixedThreadPool(5));
    final CountDownLatch latch = new CountDownLatch(1);
    for(int i = 0; i < 100; i++) {
      final Producer producer = new Producer(ringBuffer);
      new Thread(new Runnable() {
        public void run() {
          try {
            latch.await();
          } catch (Exception e) {
            e.printStackTrace();
          }
          for(int j = 0; j<100; j++) {
            producer.sendData(UUID.randomUUID().toString());
          }
        }
      }).start();
    }
    Thread.sleep(2000);
    System.err.println("----------线程创建完毕,开始生产数据----------");
    latch.countDown();
    Thread.sleep(10000);
    System.err.println("任务总数:" + consumers[2].getCount());
  }
  static class EventExceptionHandler implements ExceptionHandler<Order> {
    public void handleEventException(Throwable ex, long sequence, Order event) {
    }
    public void handleOnStartException(Throwable ex) {
    }
    public void handleOnShutdownException(Throwable ex) {
    }
  }
}

4.2.3 Netty整合并发编程框架Disruptor实战百万长链接服务构建案例

对于一个server,我们一般考虑他所能支撑的qps,但有那么一种应用, 我们需要关注的是它能支撑的连接数个数,而并非qps,当然qps也是我们需要考虑的性能点之一。这种应用常见于消息推送系统,比如聊天室或即时消息推送系统等。c对于这类系统,因为很多消息需要到产生时才推送给客户端,所以当没有消 息产生时,就需要hold住客户端的连接,这样,当有大量的客户端时,就需要hold住大量的连接,这种连接我们称为长连接。

首先,我们分析一下,对于这类服务,需消耗的系统资源有:cpu、网络、内存。所以,想让系统性能达到最佳,我们先找到系统的瓶颈所在。这样的长连 接,往往我们是没有数据发送的,所以也可以看作为非活动连接。对于系统来说,这种非活动连接,并不占用cpu与网络资源,而仅仅占用系统的内存而已。所以,我们假想,只要系统内存足够,系统就能够支持我们想达到的连接数,那么事实是否真的如此?

在 Linux 内核配置上,默认的配置会限制全局最大打开文件数(Max Open Files)还会限制进程数。 所以需要对 Linux 内核配置进行一定的修改才可以。具体如何修改这里不做讨论

java 中用的是非阻塞 IO(NIO 和 AIO 都算),那么它们都可以用单线程来实现大量的 Socket 连接。 不会像 BIO 那样为每个连接创建一个线程,因为代码层面不会成为瓶颈,最主要的是把业务代码用disruptor来进行解耦

TCP握手原理和Netty线程组的组合

构建Netty高性能核心架构图(集成Disruptor)

参考代码:

https://gitee.com/lzhcode/maven-parent/tree/master/lzh-disruptor/lzh-disruptor-netty-server

https://gitee.com/lzhcode/maven-parent/tree/master/lzh-disruptor/lzh-disruptor-netty-com

https://gitee.com/lzhcode/maven-parent/tree/master/lzh-disruptor/lzh-disruptor-netty-client

相关文章
|
11月前
|
存储 缓存 算法
并发编程系列教程(12) - Disruptor框架
并发编程系列教程(12) - Disruptor框架
92 0
|
存储 缓存 JavaScript
深入浅出 RxJS 核心原理(响应式编程篇)
在最近的项目中,我们面临了一个需求:监听异步数据的更新,并及时通知相关的组件模块进行相应的处理。传统的事件监听和回调函数方式可能无法满足我们的需求,因此决定采用响应式编程的方法来解决这个问题。在实现过程中发现 RxJS 这个响应式编程库,可以很高效、可维护地实现数据的监听和组件通知。
329 0
深入浅出 RxJS 核心原理(响应式编程篇)
|
5月前
|
存储 关系型数据库 MySQL
纯c协程框架NtyCo实现与原理
纯c协程框架NtyCo实现与原理
140 1
|
消息中间件 存储 缓存
|
前端开发 Java Maven
响应式编程实战(08)-WebFlux,使用注解编程模式构建异步非阻塞服务
响应式编程实战(08)-WebFlux,使用注解编程模式构建异步非阻塞服务
160 0
|
存储 Java
并发编程(十)线程池核心原理与源码剖析
并发编程(十)线程池核心原理与源码剖析
110 0
并发编程(十二)ForkJoin框架使用
并发编程(十二)ForkJoin框架使用
84 0
|
JavaScript 前端开发 Java
响应式编程简介之:Reactor
响应式编程简介之:Reactor
响应式编程简介之:Reactor