RocketMQ进阶-延时消息

简介: 在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像这类在某事件触发后一段时间内执行的需求任务我们称之为 延时任务。

前言

在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像这类在某事件触发后一段时间内执行的需求任务我们称之为 延时任务


那么如何实现延迟任务呢?


第一反应是利用cron方案来实现:


1.png


启动一个cron定时任务,每隔一段时间执行一次,比如30分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页update,这将是一个for循环更新操作。


cron方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:


  • 当数据量大的时候轮询效率低;


  • 时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到1小时;


  • 如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;


既然cron方案不是很理想,那就请出我们今天的主角,使用RocketMQ的延时消息解决。在创建订单的时候发送一条延时消息到RocketMQ,30分钟后消费者消费消息去检查订单的状态,如果发现订单未支付则取消订单释放库存。


实现


RocketMQ延迟队列的核心思路是:所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。


注意:RocketMQ不支持任意时间的延时,只支持以下几个固定的延时等级

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";


下面我们结合SprintBoot利用RocketMQ发送延时消息


  • 引入RocketMQ组件


<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependency>


  • 增加RocketMQ的配置


rocketmq:
name-server: 172.31.0.44:9876producer:
group: delay-group


  • 编写生产者


@Component@Slf4jpublicclassDelayProduce {
@AutowiredprivateRocketMQTemplaterocketMQTemplatet;
publicvoidsendDelayMessage(Stringtopic,Stringmessage,intdelayLevel){
SendResultsendResult=rocketMQTemplatet.syncSend(topic, MessageBuilder.withPayload(message).build(), 2000, delayLevel);
log.info("sendtime is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));
log.info("sendResult is{}",sendResult);
    }
}


  • 编写消费者


@Slf4j@Component@RocketMQMessageListener(
topic="delay-topic",
consumerGroup="delay-group")
publicclassDelayConsumerimplementsRocketMQListener<String> {
@OverridepublicvoidonMessage(Stringmessage) {
log.info("received message time is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));
log.info("received message is {}",message);
    }
}


  • 测试


@RunWith(SpringRunner.class)
@SpringBootTestpublicclassDelayProduceTest {
@AutowiredprivateDelayProducedelayProduce;
@TestpublicvoidsendDelayMessage() {
delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",5);
    }
}


这里delayLevel设置成5,对应RocketMQ的延时等级就是1分钟后投递消息。


  • 运行结果


2.png


  • 发送时间


3.png


消费时间


修改延时级别


RocketMQ的延迟等级可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持1天的延迟,修改最后一个level的值为1d,这个时候依然是18个level;也可以增加一个1d,这个时候总共就有19个level。


  • 打开RocketMQ的配置文件,修改 messageDelayLevel 属性


brokerClusterName=DefaultClusterbrokerName=broker-abrokerId=0deleteWhen=04fileReservedTime=48brokerRole=ASYNC_MASTERflushDiskType=ASYNC_FLUSHstorePathRootDir=/app/rocketmq/datamessageDelayLevel=90s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h


这次将延时等级1修改成了90s,生产者发送消息后需要90s后再进行消息投递。修改完成后重启RocketMQ。nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &


  • 使用延时等级1发送消息


publicvoidsendDelayMessage() {
delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",1);
}


  • 测试


4.png


发送时间


5.png


消费时间


通过比对发送时间与消费时间证明延时等级修改生效。


RocketMQ 相关文章

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
607 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 存储 数据可视化
【RocketMq-生产者】消息发送者参数详解
首先注意本次讨论的RokcetMq源码版本为 4.9.4,距离5.0发布 的没有多久。 这一节针对RocketMq的生产者请求发送的部分细节进行阐述,主要包含了下面的内容:DefaultMQProducer 为生产者默认对象,这个对象继承自 ClientConfig,里面包含了请求者的通用配置,所以可以拆分为两个部分进行理解,第一部分为ClientConfig,第二部分为DefaultMQProducer。
618 0
|
消息中间件 Apache RocketMQ
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
万亿级数据洪峰下的消息引擎——Apache RocketMQ
311 0
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
|
消息中间件 存储 缓存
RocketMQ Schema——让消息成为流动的结构化数据
RocketMQ Schema 提供了对消息的数据结构托管服务,同时为原生客户端提供了较为丰富的序列化/反序列化 SDK ,补齐了 RocketMQ 在数据治理和业务上下游解耦方面的短板,让数据成为流动的结构化数据,那么快来了解下实现原理吧~
468 0
RocketMQ Schema——让消息成为流动的结构化数据
|
消息中间件 存储 缓存
简述RocketMQ消息拉取过程【二】
简述RocketMQ消息拉取过程【二】
903 1
|
消息中间件 RocketMQ
简述RocketMQ消息拉取过程【一】
简述RocketMQ消息拉取过程【一】
634 0
|
消息中间件 缓存 负载均衡
RocketMQ消息生产者是如何选择Broker的
RocketMQ消息生产者是如何选择Broker的
474 1
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
484 1
|
消息中间件 存储 Java
10 张图告诉你 RocketMQ 是怎样保存消息的
10 张图告诉你 RocketMQ 是怎样保存消息的
198 0
10 张图告诉你 RocketMQ 是怎样保存消息的
|
消息中间件 存储 uml
5 张图带你彻底理解 RocketMQ 轨迹消息
5 张图带你彻底理解 RocketMQ 轨迹消息
401 0
5 张图带你彻底理解 RocketMQ 轨迹消息