自顶向下学习 RocketMQ(六):定时消息

简介: 定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的 topic。

定义和原理


定时消息(延迟队列) 是指消息发送到 broker 后,不会立即被消费,等待特定时间投递给真正的 topic。


broker 有配置项 messageDelayLevel,默认值为 1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h  18 个 level , 可以配置自定义 messageDelayLevel。


注意,messageDelayLevel 是 broker 的属性,不属于某个 topic。发消息时,设置 delayLevel 等级即可:msg.setDelayLevel(level)。level 有以下三种情况:


  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如 level==1,延迟 1s
  • level > maxLevel,则 level== maxLevel,例如 level==20,延迟 2h


定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的 topic。


RocketMQ 暂时不支持任意时间的定时


简化一个实现原理方案示意图:


16.jpg


分为两个部分:


  • 消息的写入
  • 消息的 Schedule


消息写入中:


  1. 在写入 CommitLog 之前,如果是延迟消息,替换掉消息的 Topic 和 queueId(被替换为延迟消息特定的 Topic,queueId 则为延迟级别对应的 id)


  1. 消息写入 CommitLog 之后,提交 dispatchRequest 到 DispatchService


  1. 因为在第①步中 Topic 和 QueueId 被替换了,所以写入的 ConsumeQueue 实际上非真正消息应该所属的 ConsumeQueue,而是写入到 ScheduledConsumeQueue 中(这个特定的 Queue 存放不会被消费)


Schedule 过程中:


  1. 给每个 Level 设置定时器,从 ScheduledConsumeQueue 中读取信息


  1. 如果 ScheduledConsumeQueue 中的元素已近到时,那么从 CommitLog 中读取消息内容,恢复成正常的消息内容写入 CommitLog


  1. 写入 CommitLog 后提交 dispatchRequest 给 DispatchService


  1. 因为在写入 CommitLog 前已经恢复了 Topic 等属性,所以此时 DispatchService 会将消息投递到正确的 ConsumeQueue 中


Demo


配置


由于 spring cloud alibaba 低版本的 rocketmq 定时消息功能有问题,不能实现,所以必须换高版本的,下面是我使用的版本信息:


<spring.boot.version>2.3.12.RELEASE</spring.boot.version>
<spring.cloud.version>Hoxton.SR12</spring.cloud.version>
<spring.cloud.alibaba.version>2.2.7.RELEASE</spring.cloud.alibaba.version>


引入的是 starter


<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>


分享一下我的配置文件:


spring:
  application:
    name: mq-example
  cloud:
    stream:
      bindings:
        # 定义 name 为 input 的 binding 消费
        input:
          content-type: application/json
          destination: test-topic3
          group: consumer-group
        # 定义 name 为 output 的 binding 生产
        output-order:
          content-type: application/json
          destination: test-topic3
          # Producer 配置项,对应 ProducerProperties 类
          #producer:
            # partition-key-expression: payload['id'] # 分区 key 表达式。该表达式基于 Spring EL,从消息中获得分区 key。
            #partitionCount: 3  # 分区数量
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          # 配置 rocketmq 的 nameserver 地址
          name-server: 127.0.0.1:9876
          group: rocketmq-group
        bindings:
          output-order:
            # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
            producer:
              #group: producer-group # 生产者分组
              sync: true # 是否同步发送消息,默认为 false 异步。
          input:
            # RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
            consumer:
              #group: consumer-group # 消费者分组
              enabled: true # 是否开启消费,默认为 true
              broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费
              orderly: false # 是否顺序消费,默认为 false 并发消费。


17.jpg


这里注意红框部分在低版本有说要改成 true 才可以发送定时消息,我在高版本测试不用,true 和 false 都可以。


发送消息


消息发送部分几乎和之前一样,只是多加一了个 header PROPERTY_DELAY_TIME_LEVEL, 这里我写的是 2,即延迟 5 秒。


Map<String, Object> headers = Maps.newHashMapWithExpectedSize(16);
        headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 2);
        headers.put(MessageConst.PROPERTY_TAGS, "test03");
        Order order = Order.builder().id(1L).desc("test").build();
        Message message = MessageBuilder.createMessage(order, new MessageHeaders(headers));
        mySource.output4Order().send(message);


接收消息


接收和之前没什么区别


@Service
public class ReceiveService {
    /**
     * 订阅消息
     *
     * @param receiveMsg
     */
    @StreamListener("input")
    public void receiveInput1(String receiveMsg, GenericMessage message, @Headers Map headers) {
        System.out.println(message.toString());
        System.out.println("线程 ID: " + Thread.currentThread().getId() + " 接受到消息 input receive: " + receiveMsg);
    }


效果


18:00:16.673 [http-nio-8080-exec-1] INFO   the message has sent, ......
18:00:21.685 [ConsumeMessageThread_1] INFO   接收到消息:{"id":1,"desc":"test"}


可以看到发送到接收,相距 5 秒。



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
3月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
3月前
|
消息中间件
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
62 0
|
5月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
6月前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
58 0
|
6月前
|
消息中间件 存储 监控
写了10000字:全面学习RocketMQ中间件
以上是 V 哥在授课时整理的全部 RocketMQ 的内容,在学习时重点要理解其中的含义,正所谓知其然知其所以然,希望这篇文章可以帮助兄弟们搞清楚RocketMQ的来龙去脉,必竟这是一个非常常用的分布式应用的中间件,好了,今天的内容就分享到这,我靠!已经 00:36分,建议收藏起来,慢慢消化,创作不易,喜欢请点赞转发。
773 0
|
6月前
|
消息中间件 存储 缓存
消息队列学习之rocketmq
【4月更文挑战第1天】消息队列学习之rocketmq
46 0
|
25天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
66 6
|
19天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
23天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!