这一讲,我们主要来讲延迟消息。
这一次我们结合业务来讲。
业务背景
在电商中,下单后,有一些用户付款后,会主动退款。也有一些用户下单后,未付款。但是这部分未付款的订单,会占用着商品库存。
我们目前的电商App,下单后,会在订单表创建对应的订单数据。这些订单的状态,有一些是未付款的,但是未付款的订单占用着商品库存。为了让商品库存能正常恢复,我们现在的处理方案是:
- 启动一个定时任务,每30分钟,定时扫描一遍订单表
- 如果订单是已付款,则跳过,不处理
- 如果订单是未付款,但未超过30分钟,不处理
- 如果订单是未付款,且超过30分钟,就取消订单 (补充:取消订单,其实就是下单的逆向流程)
方案缺点
这个方案有什么缺点?
- 第一,每次定时任务去扫描全部订单,但是订单未付款且超时30分钟的只有一小部分。就是做很多无用功。
- 第二,如果订单表的数量超级超级大,这个时候,扫描的时间巨长,浪费cpu资源。
- 第三,这样子频繁查询数据库,给数据库造成很多不必要的压力。
解决方案
那针对上述的缺点,我们有没有好的解决方案:
- 第一,避免扫描全表
- 第二,谁没付款,就去取消谁,不要做多余的动作
- 第三,要保证近实时取消订单。(近实时:1s左右)
说了这么多,我摊牌了,不装了,就是为了引入RocketMQ的延迟消息
简单总结一下:创建订单的时候,发送一条延时30分钟的消息。到30分钟后,消费者拿到信息,再去判断订单是否已付款,如果付款就跳过不处理,没付款,那就取消订单。
这种方案:没有多余的扫描数据库操作;谁没付款,就去取消谁。多好呀!在生产上,赶紧用起来。
生产者
上面,介绍的都是方法论,下面就是具体的实操环节了。
下面,简单用一个demo介绍一下生产者
public class Producer { public static void main(String[] args) throws Exception{ //生产者组 DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group"); //设置nameserver producer.setNamesrvAddr("localhost:9876"); //启动生产者 producer.start(); //构建消息 Message message = new Message("delayTopic","TagA","delayMessage".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 重点:设置延迟级别 message.setDelayTimeLevel(3); // 发送消息 SendResult sendResult = producer.send(message); // 打印发送结果 System.out.println("发送结果:"+sendResult); // 关闭生产者 producer.shutdown(); } }
这里强调一下,不是延迟发送哈,是延迟消费。发送是立马就发送的,只是消费的时候,延迟30分钟。
补充知识点
延迟级别是从1开始的,不是从0开始。然后你可能会发现,最多延迟2小时。如果你想延迟3小时,对不起,RocketMQ不支持。告辞!!!
消费者
public class Consumer { public static void main(String[] args) throws Exception { // 消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer_group"); //注册nameserver consumer.setNamesrvAddr("localhost:9876"); // 订阅主题 consumer.subscribe("delayTopic","TagA"); // 开启消费offset consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (int i = 0; i < list.size(); i++) { MessageExt messageExt = list.get(i); String msg = new String(messageExt.getBody()); //这里主要打印延迟时间≈当前时间-消息的生产时间 System.out.println(msg+" 时间="+(System.currentTimeMillis()-messageExt.getBornTimestamp())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
总结:延迟消费者和普通的消费者相同,一毛一样。延迟消息的核心点:生产者多了一个延迟级别。
知其然知其所以然
上面,你已经知道怎么使用了。
如果面试官问你:RocketMQ的延迟消息底层原理是什么?
那你接着看下去。
看图说话。
- 第一,生产者发送的消息,因为带了延迟级别,因此会被分发到叫SCHEDULE_TOPIC_XXXX的Topic中。里面有18的队列,一个队列对应着一个延迟级别。比如queueId=delayLevel-1。
- 第二,定时器,每100毫秒,扫描所有延迟级别里面的延迟消息message,如果消费时间已经大于当前时间,那定时器就会把延迟消息message,发送到真正的topic(就是代码写的topic,比如上面代码的:delayTopic),根据负载均衡策略,把message发送到具体某个队列。
- 第三,有消息后,消费者进行消息和后续处理。
上面这里,是一个总体流程图。
然后,我们对照代码,来进一步深刻认识一下。其实,就是加深理解。
第一步:生产者发送的消息到SCHEDULE_TOPIC_XXXX的topic
org.apache.rocketmq.store.CommitLog#putMessage
//真正的topic String topic = msg.getTopic(); //真正的队列Id int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // 延迟级别大于0 if (msg.getDelayTimeLevel() > 0) { // 如果延迟级别大于最大延迟级别,那就把延迟级别设置为最大延迟级别 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } // 延迟topicSCHEDULE_TOPIC_XXXX topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; // 根据延迟级别,获取队列id queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); // 消息的topic设置为延迟topic,不是设置真正的topic msg.setTopic(topic); msg.setQueueId(queueId); } } 省略部分封装msg的代码.. //最后把msg追加到mappedFile上,mappedFile这个后续再讲,在这里你把它当做一个文件即可 result = mappedFile.appendMessage(msg, this.appendMessageCallback);
第二步:定时器扫描信息
- 1,org.apache.rocketmq.store.schedule.ScheduleMessageService#start
public void start() { //通过AtomicBoolean 来确保 有且仅有一次执行start方法 if (started.compareAndSet(false, true)) { this.timer = new Timer("ScheduleMessageTimerThread", true); // 遍历所有 延迟级别 for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { // key为延迟级别 Integer level = entry.getKey(); // value 为 毫秒数 Long timeDelay = entry.getValue(); // 根据延迟级别 ,获取对应的offset Long offset = this.offsetTable.get(level); // if (null == offset) { offset = 0L; } // 为每个延迟级别创建定时任务,开始执行定时任务,1S后开始执行 if (timeDelay != null) { // 第二步:具体核心执行逻辑在DeliverDelayedMessageTimerTask-->executeOnTimeup() this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 延迟10秒后执行定时任务,flushDelayOffsetInterval=10s,周期也是10秒执行一次 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { //持久化每个队列的消费offset if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); } }
2,org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
public void executeOnTimeup() { //根据延迟级别和topic:RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";来找到对应的ConsumeQueue ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); // 消费偏移量 long failScheduleOffset = offset; if (cq != null) { // 根据消费偏移量从消息队列中获取所有有效消息 SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ != null) { try { long nextOffset = offset; int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); // 遍历所有消息 for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 获取消息的物理偏移量 long offsetPy = bufferCQ.getByteBuffer().getLong(); // 获取消息的物理长度 int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); //当前时间 long now = System.currentTimeMillis(); //消费时间 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); //下一个偏移量 nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); //如果消费时间<当前时间,说明应该被消费了 long countdown = deliverTimestamp - now; if (countdown <= 0) { //根据物理偏移量和长度,获取消息 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); if (msgExt != null) { try { //构建真正 的消息 MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); // 重新把消息发送到真正的消息队列上 PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner); ...省略一堆不太重要的代码 } //这里又重新添加一个新的任务,这次是100毫秒 ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE); }
第三步: 消费者后续处理(略)后用一张图来总结
最后用一张图来总结
好了,写完了,下期见,拜拜。
有问题的话,欢迎留言交流。
每日一问
RocketMQ不支持自定义延迟时间,那Kafka支持延迟消息吗?如果支持,支持自定义延迟时间吗?如要你实现自定义延迟时间,你会怎么实现?说说你的思路
欢迎留言
后续文章
- RocketMQ-入门(已更新)
- RocketMQ-架构和角色(已更新)
- RocketMQ-消息发送(已更新)
- RocketMQ-消费信息
- RocketMQ-消费者的广播模式和集群模式(已更新)
- RocketMQ-顺序消息(已更新)
- RocketMQ-延迟消息(已更新)
- RocketMQ-批量消息
- RocketMQ-过滤消息
- RocketMQ-事务消息
- RocketMQ-消息存储
- RocketMQ-高可用
- RocketMQ-高性能
- RocketMQ-主从复制
- RocketMQ-刷盘机制
- RocketMQ-幂等性
- RocketMQ-消息重试
- RocketMQ-死信队列...
欢迎各位入(guan)股(zhu),后续文章干货多多。
—本文转载自「大头菜技术」公众号