RocketMQ极简入门-RocketMQ延迟消息

简介: 我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费性能。我们可以考虑使用延迟消息来解决。

使用场景

我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费性能。我们可以考虑使用延迟消息来解决。

概述

延迟消息即:把消息写到Broker后需要延迟一定时间才能被消费 , 在RocketMQ中消息的延迟时间不能任意指定,而是由特定的等级(1 到 18)来指定,分别有:

messageDelayLevel=1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h

可以通过修改配置来增加级别,比如在mq安装目录的 broker.conf 文件中增加如:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 2d 
• 1

增加了2d 两天,这个时候总共就有19个level。

延迟消息工作原理

延迟队列工作流程图

RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;如果有就走延迟队列,执行下面的流程

  1. 修改消息Topic的名字为SCHEDULE_TOPIC_XXXX
  2. 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId
    目录与consumequeue文件
  3. 修改消息索引单元,计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,投递时间 = 消息存储时间 + 延时等级时间 。下面是CosumeQueue单个存储单元组成结构如下

  • Commit Log Offset:记录在CommitLog中的位置。
  • Size:记录消息的大小
  • Message Tag HashCode:记录消息Tag的哈希值,用于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳。
  1. 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中
  2. Broker内部有⼀个延迟消息服务类ScheuleMessageService,根据延迟级别数,创建对应数量的定时器Timer,定时消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。
  3. 在将消息到期后,队列的Level等级改为0,作为一条普通消息,投递到目标Topic。

延迟消息实战

消息发送方

给消息设置延迟级别,API:message.setDelayTimeLevel(3);

publicclassProducer {
//演示消息同步发送publicstaticvoidmain(String[] args) throwsException {
//生产者DefaultMQProducerproducer=newDefaultMQProducer("syn-producerGroup-delay");
//设置name server地址producer.setNamesrvAddr("127.0.0.1:9876");
//启动producer.start();
for (longi=0 ; i<4 ; i++){
Orderorder=newOrder(i,"订单"+i,"创建");
//添加内容byte[] bytes= (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
//创建消息,指定:TOPIC 和 TAGMessagemessage=newMessage("topic-order-delay","product-order-delay",bytes);
//延迟级别 3,代表 10s延迟message.setDelayTimeLevel(3);
message.setKeys("key-"+i);
//执行发送SendResultresult=producer.send(message);
System.out.println("发送时间:"+newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(newDate()));
System.out.println(result);
        }
producer.shutdown();
    }
}

消息消费方

publicclassConsumer {
publicstaticvoidmain(String[] args) throwsException {
//创建消费者DefaultMQPushConsumerdefaultMQPushConsumer=newDefaultMQPushConsumer("syn-consumerGroup-delay");
//设置name server 地址defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
//从开始位置消费defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅defaultMQPushConsumer.subscribe("topic-order-delay","product-order-delay");
defaultMQPushConsumer.registerMessageListener(newMessageListenerConcurrently() {
@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>list, ConsumeConcurrentlyContextconsumeConcurrentlyContext) {
list.forEach(message->{
System.out.println("消费时间:"+newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(newDate()));
System.out.println(message+" ; "+newString(message.getBody(), CharsetUtil.UTF_8));
                });
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
defaultMQPushConsumer.start();
    }
}

从消费者消费结果时间来看,消息是延迟了10s后才收到。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
2174 8
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
5月前
|
消息中间件 NoSQL 大数据
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
259 99
EMQ
|
安全 网络性能优化
MQTT 5.0 报文(Packets)入门指南
MQTT 控制报文是 MQTT 数据传输的最小单元。MQTT 客户端和服务端通过交换控制报文来完成它们的工作,比如订阅主题和发布消息。
EMQ
1080 93
MQTT 5.0 报文(Packets)入门指南
|
11月前
|
消息中间件 Java Kafka
RabbitMQ 入门
RabbitMQ 入门
190 0
|
消息中间件 存储 Java
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
197 0
分享一下rocketmq入门小知识
|
消息中间件 存储 RocketMQ
2分钟看懂RocketMQ延迟消息核心原理
本文从源码层面解析了RocketMQ延迟消息的实现原理,包括延迟消息的使用、Broker端处理机制以及定时任务对延迟消息的处理流程。
2分钟看懂RocketMQ延迟消息核心原理
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
728 2
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
321 0
说说RabbitMQ延迟队列实现原理?