SpringBoot整合RocketMQ发送延时消息

简介: 当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息

1. 延时等级
延时时长不支持随意时长的延迟,是通过特定的延迟等级来指定的,默认变量有1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,分别对应1~18等级。例如等级为3,对应于10s
如果需要自己定义延时等级,需要在broker加载的配置文件中配置messageDelayLevel

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

2. 延时消息实现原理
修改消息

  • 修改消息的Topic为SCHEDULE_TOPIC_XXXX
  • 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件
  • 修改消息索引单元内容。索引单元中的Message Tag HashCode部分原本存放的是消息的Tag的Hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原Topic后再次被写入到commitlog中的时间。投递时间 = 消息存储时间 + 延时等级时间。消息存储时间指的是消息被发送到Broker时的时间戳
  • 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中

投递延时消息
Broker内部有⼀个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的投递时间,将延时消息投递到⽬标Topic中。不过,在投递之前会从commitlog中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消息。然后再次将消息投递到目标Topic中。
将消息重新写入commitlog
延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索引条目,分发到相应Queue。

3. 生产者业务接口

public interface ScheduleMessageService {

    /**
     * 发送同步定时消息
     * @param id
     * @param message 消息内容
     * @param timeout 过期时间(毫秒)
     * @param delayLevel 延时级别为(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)其下标,从1开始
     */
    void sendSyncScheduleMessage(String id, String message, long timeout, int delayLevel);
}

4. 生产者业务接口实现类

@Service
public class ScheduleMessageServiceImpl implements ScheduleMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(ScheduleMessageServiceImpl.class);

    @Override
    public void sendSyncScheduleMessage(String id, String message, long timeout, int delayLevel) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        SendResult result = rocketMQTemplate.syncSend("schedule-message-topic:sync-tags", strMessage, timeout, delayLevel);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送同步定时消息成功!消息ID为:{},当前时间为:{}", result.getMsgId(), LocalDateTime.now());
        } else {
            logger.info("发送同步定时消息失败!消息ID为:{}", result.getMsgId());
        }
    }
}

5. 消费者类

@Component
@RocketMQMessageListener(topic = "schedule-message-topic", consumerGroup = "schedule-consumer-group")
public class ScheduleMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(ScheduleMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到定时消息:{},当前时间为:{}", message, LocalDateTime.now());
    }
}

5.6. 测试

@Test
void scheduleMessage() {
    String uuid = UUID.randomUUID().toString();
    scheduleMessageService.sendSyncScheduleMessage(uuid, "hello" + uuid, 100,3);
}
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 Java Maven
一文搞懂Spring Boot整合RocketMQ
一文搞懂Spring Boot整合RocketMQ
95 0
|
消息中间件 Java RocketMQ
SpringBoot整合RocketMQ发送批量消息
SpringBoot整合RocketMQ发送批量消息
|
6月前
|
消息中间件 Java RocketMQ
低版本的springboot(1.5.3)如何集成rocketmq
低版本的springboot(1.5.3)如何集成rocketmq
153 1
|
3月前
|
消息中间件 数据库 RocketMQ
Springboot+RocketMQ通过事务消息优雅的实现订单支付功能
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。
114 0
|
3月前
|
消息中间件 Java RocketMQ
Springboot整合RocketMQ 基本消息处理
1. 同步消息 2. 异步消息 3. 单向消息 4. 延迟消息 5. 批量消息 6. 顺序消息 7. Tag过滤 8. 广播消息
|
8月前
|
消息中间件 Java 程序员
SpringBoot整合RocketMQ,尝尝几大高级特性!
作为一名程序员,您一定熟悉RocketMQ的功能,包括支持事务、顺序和延迟消息等。在程序员界有一句名言,“Talk is cheap. Show me the code” 。本文将通过实际案例来引出解决方案,并通过代码实现,让您在学习本节的过程中能够确切地掌握实际编码技能
144 0
SpringBoot整合RocketMQ,尝尝几大高级特性!
|
8月前
|
消息中间件 物联网 大数据
SpringBoot3集成RocketMq
RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;
237 0
|
9月前
|
消息中间件 Java API
RocketMQ极简入门-在SpringBoot中使用RocketMQ
现在开发项目都是基于SpringBoot,新项目很少使用Spring,所以我们学习一门技术除了要会原生API,还不得不考虑和SpringBoot集成,本篇文章为SpirngBoot整合RocketMQ案例
396 0
|
9月前
|
消息中间件 Java API
八.RocketMQ极简入门-在SpringBoot中使用RocketMQ
RocketMQ极简入门-在SpringBoot中使用RocketMQ
|
9月前
|
Java
SpringBoot整合RocketMQ
SpringBoot整合RocketMQ
303 0