开发者学堂课程【RocketMQ 知识精讲与项目实战(第二阶段):线程池优化消息发送】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/703/detail/12438
线程池优化消息发送
第五步是在发送消息,第六步是等待消息发送的结果,如果发送成功,删除数据库,持久化的消息内容,势必要等到mq接收消息成功,第六步进行数据库的处理,两步会耗费一定的时间,如果当前支付的请求比较多,第三方的支付平台在回调时,会遭到请求的堆积,这两步处理都是在主线程中进行处理的,所以可以创建线程池进行优化处理。
//5.发送消息到MQ
sendResult result = sendMessage(topic, tag, String.
valueOf(tradePay . getPayId()),
if(result . getSendStatus() . equals(SendStatus .SEND_ OK))
{
Log. info("消息发送成功");
//6.等待发送结果, 如果MQ接受到消息,删除发送成功的消息
mqProducerTempMapper .deleteByPrimaryKey(tradeMqProducerTemp . getId());
Log. info("持久化到数据库的消息删除");
}
在 spring 容器中创建线程池 bean 对象,创建 getThreadPool,设置当前的属性信息,默认线程池4个,最大有8个线程,队列的大小是100。
@Bean
public ThreadPoolTaskExecutor getThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() ;
executor. setCorePoolsize (4);
executor. setMaxPoolSize(8);
executor . setQueueCapacity(100);
executor. setKeepAliveSeconds (60);
executor . setThreadNamePrefix("Pool-A");
executor. setRejectedExecutionHandler (new ThreadPool Executor. callerRunsPolicy());
executor. initialize();
return executor;
}
把 bean 注入到 spring 容器,就可以进行优化,当前对象的创建是在入口类进行处理。
@Autowired
Private ThreadPoolTaskExecutor threadPoolTaskExecutor;
使用线程池优化第五步和第六步。
//
在线程池中进行处理
threadPoolTaskExecutor . submit(new Runnable() {
@Override
public void
run( ){
//5.发送消息到MQ
sendResult result = sendMessage(topic, tag, String.
valueOf(tradePay . getPayId()),
if(result . getSendStatus() . equals(SendStatus .SEND_ OK))
{
Log. info("消息发送成功");
//6.等待发送结果, 如果MQ接受到消息,删除发送成功的消息
mqProducerTempMapper .deleteByPrimaryKey(tradeMqProducerTemp . getId());
Log. info("持久化到数据库的消息删除");
}
}
}}
整个业务没有变,只是对线程池进行了优化。
报错是因为当前项目在运行时才会注入对应的Mapper,不是代码的问题,是编译器的问题,不用管。
@Autowired
private TradePayMapper tradePayMapper;
@Autowired
Private TradeMqProducerTempMapper mqProducerTempMapper;