前面给大家分享了Rocket大体架构设计和Spring快速集成RocketMQ。看了前面的文章的小伙伴把RocketMQ集成进项目以及发送消息和消费消息问题应该不大,有问题的可以私信我一起学习解决问题。
今天给大家梳理下RocketMQ的延时消息如何使用以及如何实现的,包括我的一些改进想法,是不是有点飘了居然想修改RocketMQ的源码。
延时消息基本概念
延时消息:顾名思义就是消息不是实时处理的,可以在延时设置时候后消息才能被消费者消费。
以下使用场景不一定使用延时消息是最好的方案,但延时消息是适用于以下场景的:
1、30分钟取消订单,商城用户下单后如果在设定时候后还没支付就将订单状态置为取消,并恢复商品库存
2、超时自动审批,有的系统审批流程可以设置为超过设定时间后自动执行通过或者拒绝流程
3、短信或提醒,比如注册账号后三天不登陆就发短信提醒
RocketMq如何使用延时消息
Message msg = new Message("TopicTest" ,"TagA" ,("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) );
// 设置延时消息的级别
msg.setDelayTimeLevel(2);
是不是发现了RocketMQ发送延时消息非常简单,只需要在消息上设置delayTimeLevel属性即可。但是看属性的字面意思可以发现这个属性值好像不是具体多少秒,感觉像是一个等级的意思,那这个延时时间等级是怎么样个效果呢?请接着往下看
RocketMQ延时消息等级
RocketMQ的延时消息其实并非是精确的一个时间,而是采用延时等级来定义的。在MessageStoreConfig
类上有对这个延时消息等级的定义,定义了18个等级最低1s,最长2h
// RocketMQ延时消息的定义
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
我们看看RocketMQ是如何使用这18个等级来实现延时消息的。
Broker初始化流程-延时消息部分
在Broker初始化流程中有这样的逻辑,流程如下
这里解析了18个等级为秒然后放入到delayLevelTable中存储起来。
Broker启动流程-延时消息读部分
接着来看看延时任务在Broker启动的时候怎么创建的,可以看到每个等级都创建了一个定时任务,初始设定都是秒调度执行一次
那关键执行逻辑就是DeliverDelayedMessageTimerTask的run方法了
org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
//有这样的一个代码,可以看到RocketMQ把延时消息都放到了SCHEDULE_TOPIC_XXXX队列按照level分到了不同的queue。这个Topic是内部写死的
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
//然后从这个对象中根据offset来取buffer,主要是用来判断当前offset的消息时间戳和当前时间戳作对比
// 在executeOnTimeup这个方法的中间部分
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
// 如果时间戳比当前时间戳小证明消息到期了
// 会调用这个方法把真实的Topic设置到消息上
# DeliverDelayedMessageTimerTask#messageTimeup
...
// 组装真实的topic消息
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
// 写到真实的消息commitLog中
PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
} else {
// 如果没到期则进行下一轮检测
}
延时消息写入commitLog流程
当我们需要发送延时消息时我们会在message上设置setDelayTimeLevel(), 那从上面读取部分我们反推下写入流程,应该是在存储时判断消息体上是否有延时消息的level属性,如果有就应该把消息放在Topic为SCHEDULE_TOPIC_XXXX的队列中,我们来看看是否是我们猜想的呢?
当Broker启动时会作为服务器启动netty,当有接到消息时会触发NettyServerHandler
,跟踪下发送消息的流程
org.apache.rocketmq.store.CommitLog#asyncPutMessage
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
// 这里将topic设置为SCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// queueId设置为 msg.getDelayTimeLevel()-1
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
// 将真实的topic存储起来,以便在消息到期时替换为真实的topic发送到对应的队列上去
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
小结
从上面的代码我们可以梳理出RocketMQ为我们内置了18个延时,同时在启动Broker时启动18个定时任务分别扫描各自等级的消息逻辑。当需要延时消息时可在发送时在消息上指定等级,消息到达Broker后将topic替换为SCHEDULE_TOPIC_XXXX,将真实的topic存放在propertis中。当定时器定时扫描各自等级的队列,如果根据offset取出有消息并判断时间戳消息当前时间戳就将topic替换为真实topic发送的对应topic的队列中。
从上面的逻辑我们可以了解到开源版RocketMQ的延时消息其实并不是精准延时的。商业版的RocketMQ是阿里云的ons,ons是能满足精准延时的。滴滴也基于开源版的RocketMQ开发出了精准延时的模块,有兴趣的可以去了解下 DDMQ
本次分享就到这了,关于commitLog本篇文章梳理的不是很清楚的,下次咱们专门针对RocketMQ的存储再梳理下,好了本篇关于开源版RocketMQ的延时消息实现梳理对你有帮助可以关注下我 Java极客帮