spring-boot项目整合Disruptor的初步使用

简介: Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。

1.在项目的pom文件中配置

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>r09</version>
        </dependency>

2.创建BaseQueueHelper

/**
 * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布。
 *
 * 调用init()时才真正启动线程开始处理 系统退出自动清理资源.

 */
public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {
 
    /**
     * 记录所有的队列,系统退出时统一清理资源
     */
    private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
    /**
     * Disruptor 对象
     */
    private Disruptor<E> disruptor;
    /**
     * RingBuffer
     */
    private RingBuffer<E> ringBuffer;
    /**
     * initQueue
     */
    private List<D> initQueue = new ArrayList<D>();
 
    /**
     * 队列大小
     *
     * @return 队列长度,必须是2的幂
     */
    protected abstract int getQueueSize();
 
    /**
     * 事件工厂
     *
     * @return EventFactory
     */
    protected abstract EventFactory<E> eventFactory();
 
    /**
     * 事件消费者
     *
     * @return WorkHandler[]
     */
    protected abstract WorkHandler[] getHandler();
 
    /**
     * 初始化
     */
    public void init() {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();
        disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());
        disruptor.setDefaultExceptionHandler(new MyHandlerException());
        disruptor.handleEventsWithWorkerPool(getHandler());
        ringBuffer = disruptor.start();
 
        //初始化数据发布
        for (D data : initQueue) {
            ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
                @Override
                public void translateTo(E event, long sequence, D data) {
                    event.setValue(data);
                }
            }, data);
        }
 
        //加入资源清理钩子
        synchronized (queueHelperList) {
            if (queueHelperList.isEmpty()) {
                Runtime.getRuntime().addShutdownHook(new Thread() {
                    @Override
                    public void run() {
                        for (BaseQueueHelper baseQueueHelper : queueHelperList) {
                            baseQueueHelper.shutdown();
                        }
                    }
                });
            }
            queueHelperList.add(this);
        }
    }
 
    /**
     * 如果要改变线程执行优先级,override此策略. YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU,
     * 慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景.
     *
     * @return WaitStrategy
     */
    protected abstract WaitStrategy getStrategy();
 
    /**
     * 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理.
     */
    public synchronized void publishEvent(D data) {
        if (ringBuffer == null) {
            initQueue.add(data);
            return;
        }
        ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
            @Override
            public void translateTo(E event, long sequence, D data) {
                event.setValue(data);
            }
        }, data);
    }
 
    /**
     * 关闭队列
     */
    public void shutdown() {
        disruptor.shutdown();
    }
}

3.创建MyHandlerException


public class MyHandlerException implements ExceptionHandler {

    private Logger logger = LoggerFactory.getLogger(MyHandlerException.class);

    /*
     * (non-Javadoc) 运行过程中发生时的异常
     *
     * @see
     * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
     * , long, java.lang.Object)
     */
    @Override
    public void handleEventException(Throwable ex, long sequence, Object event) {
        ex.printStackTrace();
        logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
    }

    /*
     * (non-Javadoc) 启动时的异常
     *
     * @see
     * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
     * Throwable)
     */
    @Override
    public void handleOnStartException(Throwable ex) {
        logger.error("start disruptor error ==[{}]!", ex.getMessage());
    }

    /*
     * (non-Javadoc) 关闭时的异常
     *
     * @see
     * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
     * .Throwable)
     */
    @Override
    public void handleOnShutdownException(Throwable ex) {
        logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
    }

}

4.创建ValueWrapper

public abstract class ValueWrapper<T> {
 
    private T value;
 
    public ValueWrapper() {}
 
    public ValueWrapper(T value) {
        this.value = value;
    }
 
    public T getValue() {
        return value;
    }
 
    public void setValue(T value) {
        this.value = value;
    }
}

5.创建EventFactory

public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {
 
 
    @Override
    public SeriesDataEvent newInstance() {
        return new SeriesDataEvent();
    }
}
  1. 创建DisruptorConfig
@Configuration
@ComponentScan(value = {"com.demo.disruptor"})
public class DisruptorConfig {
 
    /**
     * smsParamEventHandler1
     *
     * @return SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler1() {
        return new SeriesDataEventHandler();
    }
 
    /**
     * smsParamEventHandler2
     *
     * @return SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler2() {
        return new SeriesDataEventHandler();
    }
 
    /**
     * smsParamEventHandler3
     *
     * @return SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler3() {
        return new SeriesDataEventHandler();
    }
 
 
    /**
     * smsParamEventHandler4
     *
     * @return SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler4() {
        return new SeriesDataEventHandler();
    }
 
    /**
     * smsParamEventHandler5
     *
     * @return SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler5() {
        return new SeriesDataEventHandler();
    }


    /**
     * smsParamEventHandler5
     *
     * @return SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler6() {
        return new SeriesDataEventHandler();
    }
 
 
}

7.创建SeriesData

public class SeriesData {
 
    private String deviceInfoStr;
 
    public SeriesData() {
    }
 
    public SeriesData(String deviceInfoStr) {
        this.deviceInfoStr = deviceInfoStr;
    }
 
    public String getDeviceInfoStr() {
        return deviceInfoStr;
    }
 
    public void setDeviceInfoStr(String deviceInfoStr) {
        this.deviceInfoStr = deviceInfoStr;
    }
 
    @Override
    public String toString() {
        return "SeriesData{" +
                "deviceInfoStr='" + deviceInfoStr + '\'' +
                '}';
    }
}

8.创建SeriesDataEvent

public class SeriesDataEvent extends ValueWrapper<SeriesData> {
 
 
}

9.创建SeriesDataEventQueueHelper

@Component
public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean {
 
    private static final int QUEUE_SIZE = 1024;
 
    @Autowired
    private List<SeriesDataEventHandler> seriesDataEventHandler;
 
 
    @Override
    protected int getQueueSize() {
        return QUEUE_SIZE;
    }
 
    @Override
    protected com.lmax.disruptor.EventFactory eventFactory() {
        return new EventFactory();
    }
 
    @Override
    protected WorkHandler[] getHandler() {
        int size = seriesDataEventHandler.size();
        SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]);
        return paramEventHandlers;
    }
 
    @Override
    protected WaitStrategy getStrategy() {
        return new BlockingWaitStrategy();
        //return new YieldingWaitStrategy();
    }
 
    @Override
    public void afterPropertiesSet() throws Exception {
        this.init();
    }
}

10.创建SeriesDataEventHandler

public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {

    private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);
    @Autowired
    private SocketService socketService;
    @Override
    public void onEvent(SeriesDataEvent event) {
        if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
            logger.warn("receiver series data is empty!");
        }
         logger.error("hello word!");
    }

}
 

11.使用操作

@Autowired
    private SeriesDataEventQueueHelper seriesDataEventQueueHelper;
    @Test
    public void demo(){
        seriesDataEventQueueHelper.publishEvent(new SeriesData("hello word"));
    }

12.后面推出Disruptor的各种操作

相关文章
|
1月前
|
存储 运维 安全
Spring运维之boot项目多环境(yaml 多文件 proerties)及分组管理与开发控制
通过以上措施,可以保证Spring Boot项目的配置管理在专业水准上,并且易于维护和管理,符合搜索引擎收录标准。
42 2
|
5月前
|
Java 测试技术 数据库
Spring Boot中的项目属性配置
本节课主要讲解了 Spring Boot 中如何在业务代码中读取相关配置,包括单一配置和多个配置项,在微服务中,这种情况非常常见,往往会有很多其他微服务需要调用,所以封装一个配置类来接收这些配置是个很好的处理方式。除此之外,例如数据库相关的连接参数等等,也可以放到一个配置类中,其他遇到类似的场景,都可以这么处理。最后介绍了开发环境和生产环境配置的快速切换方式,省去了项目部署时,诸多配置信息的修改。
|
2月前
|
设计模式 前端开发 Java
Spring MVC——项目创建和建立请求连接
MVC是一种软件架构设计模式,将应用分为模型、视图和控制器三部分。Spring MVC是基于MVC模式的Web框架,通过`@RequestMapping`等注解实现URL路由映射,支持GET和POST请求,并可传递参数。创建Spring MVC项目与Spring Boot类似,使用`@RestController`注解标记控制器类。
43 1
Spring MVC——项目创建和建立请求连接
|
2月前
|
Java 关系型数据库 MySQL
Maven——创建 Spring Boot项目
Maven 是一个项目管理工具,通过配置 `pom.xml` 文件自动获取所需的 jar 包,简化了项目的构建和管理过程。其核心功能包括项目构建和依赖管理,支持创建、编译、测试、打包和发布项目。Maven 仓库分为本地仓库和远程仓库,远程仓库包括中央仓库、私服和其他公共库。此外,文档还介绍了如何创建第一个 SpringBoot 项目并实现简单的 HTTP 请求响应。
169 1
Maven——创建 Spring Boot项目
|
2月前
|
Java 关系型数据库 MySQL
如何使用 maven 创建一个 Spring Boot项目
Maven 是一个强大的项目管理工具,通过配置 `pom.xml` 文件自动获取所需的 jar 包,提高开发效率。其核心功能包括项目构建和依赖管理。项目构建支持编译、测试、打包和发布等流程,而依赖管理则通过中央仓库、本地仓库和私有服务器获取和管理项目依赖。示例中展示了如何创建第一个 SpringBoot 项目并实现简单接口。
48 1
如何使用 maven 创建一个 Spring Boot项目
|
2月前
|
Java 应用服务中间件 Android开发
Eclipse创建Spring项目
本文介绍了在Eclipse中创建Spring项目的步骤,包括如何配置Tomcat服务器、创建项目、部署项目到Tomcat以及添加Spring框架所需的JAR包。
81 1
Eclipse创建Spring项目
|
2月前
|
Java Spring
ij社区版如何创建spring项目
如何在IntelliJ IDEA社区版中创建Spring项目,包括安装Spring Boot Helper插件的步骤和创建过程。
85 1
ij社区版如何创建spring项目
|
2月前
|
Java Apache Maven
Java/Spring项目的包开头为什么是com?
本文介绍了 Maven 项目的初始结构,并详细解释了 Java 包命名惯例中的域名反转规则。通过域名反转(如 `com.example`),可以确保包名的唯一性,避免命名冲突,提高代码的可读性和逻辑分层。文章还讨论了域名反转的好处,包括避免命名冲突、全球唯一性、提高代码可读性和逻辑分层。最后,作者提出了一个关于包名的问题,引发读者思考。
Java/Spring项目的包开头为什么是com?
|
2月前
|
前端开发 Java 应用服务中间件
【Spring】Spring MVC的项目准备和连接建立
【Spring】Spring MVC的项目准备和连接建立
64 2
|
2月前
|
XML Java 应用服务中间件
【Spring】运行Spring Boot项目,请求响应流程分析以及404和500报错
【Spring】运行Spring Boot项目,请求响应流程分析以及404和500报错
223 2
下一篇
DataWorks