自顶向下学习 RocketMQ(九):回溯消费

简介: 回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

定义


“回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。”


Demo


我们仍然是利用 Spring Cloud Stream 的编程模型 + Spring Cloud Alibaba RocketMQ 来实现。


理论


在消费时,可以设置一个字段 ConsumeFromWhere(源码位置在:org.apache.rocketmq.common.consumer.ConsumeFromWhere),从哪开始消费。可选参数,去掉 Deprecated 的,剩下的就是


public enum ConsumeFromWhere {
  CONSUME_FROM_LAST_OFFSET,
  CONSUME_FROM_FIRST_OFFSET,
  CONSUME_FROM_TIMESTAMP,
}


  • CONSUME_FROM_LAST_OFFSET:从最后的偏移量开始消费
  • CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始消费
  • CONSUME_FROM_TIMESTAMP:从某个时间开始消费


我们需要设置从某个时间开始消费,即配置 CONSUME_FROM_TIMESTAMP 并设置好具体的时间点。


实现


首先还是看一下配置文件


server:
  port: 8080
  servlet:
    context-path: /mq-example
spring:
  application:
    name: mq-example
  cloud:
    stream:
      bindings:
        input-backtracking:
          content-type: application/json
          destination: test-topic3
          group: backtracking-consumer-group
        # 定义 name 为 output 的 binding 生产
        output-order:
          content-type: application/json
          destination: test-topic3
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          # 配置 rocketmq 的 nameserver 地址
          name-server: 127.0.0.1:9876
          group: rocketmq-group
        bindings:
          output-order:
            # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
            producer:
              #group: producer-group # 生产者分组
              sync: true # 是否同步发送消息,默认为 false 异步。
          input-backtracking: # 回溯消息配置
            # com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties
            consumer:
              consumeFromWhere: CONSUME_FROM_TIMESTAMP
              consumeTimestamp: 20220117110148
              enabled: true # 是否开启消费,默认为 true
              broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费


这里我们仍然用之前的 ouput-order 作为生产者,生产消息。


消息者配置上主要注意 input-backtracking 节点中的属性配置:


  • consumeFromWhere 即上文提到的从哪儿开始消费,这里我们指定时间消费
  • consumeTimestamp 即指定的时间点


程序入口:


@SpringBootApplication
@EnableBinding({ MySource.class})
public class MqBootstrapApplication {
    public static void main(String[] args) {
        SpringApplication.run(MqBootstrapApplication.class);
    }
}


要加上 @EnableBinding

MySource:


public interface MySource {
    @Output("output-order")
    MessageChannel output4Order();
    @Input("input-backtracking")
    MessageChannel inputBackTracking();
}


controller 生产消息:


@GetMapping("/produce")
    public void produceMsg() {
        Map<String, Object> headers = Maps.newHashMapWithExpectedSize(16);
        headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 2);
        headers.put(MessageConst.PROPERTY_TAGS, "test03");
        Order order = Order.builder().id(1L).desc("test").build();
        Message message = MessageBuilder.createMessage(order, new MessageHeaders(headers));
        mySource.output4Order().send(message);
    }


ReceiveService 消费消息:


@Service
@Log4j2
public class ReceiveService {
    @StreamListener("input-backtracking")
    public void receiveBackTrackingInput(String receiveMsg, GenericMessage message, @Headers Map headers) {
        log.info("接收到回溯消息:{}", receiveMsg);
    }
}


测试


可以先调用 controller 生产消息,或者不用 Demo 中的生产者生产消息,找一个之前发过消息的 topic , 看一下它的消息轨迹,找到存储时间


23.jpg


如果你用之前发过消息的 topic 记得修改配置文件中的 topic名称 :


24.jpg


确认找到的这条消息已经被消费过(因为要测回溯,至少是二次消费),将 consumeTimestamp 的时间配置在 存储时间之后。


这时启动项目,观察 ReceiveService 的输出:


接收到回溯消息:{"id":1,"desc":"test"}


证明消息回溯消费成功。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
2月前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
76 3
|
3月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
3月前
|
消息中间件
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
62 0
|
5月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
6月前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
58 0
|
6月前
|
消息中间件 存储 监控
写了10000字:全面学习RocketMQ中间件
以上是 V 哥在授课时整理的全部 RocketMQ 的内容,在学习时重点要理解其中的含义,正所谓知其然知其所以然,希望这篇文章可以帮助兄弟们搞清楚RocketMQ的来龙去脉,必竟这是一个非常常用的分布式应用的中间件,好了,今天的内容就分享到这,我靠!已经 00:36分,建议收藏起来,慢慢消化,创作不易,喜欢请点赞转发。
762 0
|
6月前
|
消息中间件 存储 缓存
消息队列学习之rocketmq
【4月更文挑战第1天】消息队列学习之rocketmq
45 0
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
767 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67766 2
3 张图带你彻底理解 RocketMQ 事务消息