SpringBoot整合RocketMQ发送延时消息

简介: 当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息

1. 延时等级
延时时长不支持随意时长的延迟,是通过特定的延迟等级来指定的,默认变量有1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,分别对应1~18等级。例如等级为3,对应于10s
如果需要自己定义延时等级,需要在broker加载的配置文件中配置messageDelayLevel

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

2. 延时消息实现原理
修改消息

  • 修改消息的Topic为SCHEDULE_TOPIC_XXXX
  • 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件
  • 修改消息索引单元内容。索引单元中的Message Tag HashCode部分原本存放的是消息的Tag的Hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原Topic后再次被写入到commitlog中的时间。投递时间 = 消息存储时间 + 延时等级时间。消息存储时间指的是消息被发送到Broker时的时间戳
  • 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中

投递延时消息
Broker内部有⼀个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的投递时间,将延时消息投递到⽬标Topic中。不过,在投递之前会从commitlog中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消息。然后再次将消息投递到目标Topic中。
将消息重新写入commitlog
延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索引条目,分发到相应Queue。

3. 生产者业务接口

public interface ScheduleMessageService {

    /**
     * 发送同步定时消息
     * @param id
     * @param message 消息内容
     * @param timeout 过期时间(毫秒)
     * @param delayLevel 延时级别为(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)其下标,从1开始
     */
    void sendSyncScheduleMessage(String id, String message, long timeout, int delayLevel);
}

4. 生产者业务接口实现类

@Service
public class ScheduleMessageServiceImpl implements ScheduleMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(ScheduleMessageServiceImpl.class);

    @Override
    public void sendSyncScheduleMessage(String id, String message, long timeout, int delayLevel) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        SendResult result = rocketMQTemplate.syncSend("schedule-message-topic:sync-tags", strMessage, timeout, delayLevel);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送同步定时消息成功!消息ID为:{},当前时间为:{}", result.getMsgId(), LocalDateTime.now());
        } else {
            logger.info("发送同步定时消息失败!消息ID为:{}", result.getMsgId());
        }
    }
}

5. 消费者类

@Component
@RocketMQMessageListener(topic = "schedule-message-topic", consumerGroup = "schedule-consumer-group")
public class ScheduleMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(ScheduleMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到定时消息:{},当前时间为:{}", message, LocalDateTime.now());
    }
}

5.6. 测试

@Test
void scheduleMessage() {
    String uuid = UUID.randomUUID().toString();
    scheduleMessageService.sendSyncScheduleMessage(uuid, "hello" + uuid, 100,3);
}
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Java Maven
一文搞懂Spring Boot整合RocketMQ
一文搞懂Spring Boot整合RocketMQ
731 0
|
消息中间件 Java RocketMQ
低版本的springboot(1.5.3)如何集成rocketmq
低版本的springboot(1.5.3)如何集成rocketmq
652 1
|
消息中间件 Java 程序员
SpringBoot整合RocketMQ,尝尝几大高级特性!
作为一名程序员,您一定熟悉RocketMQ的功能,包括支持事务、顺序和延迟消息等。在程序员界有一句名言,“Talk is cheap. Show me the code” 。本文将通过实际案例来引出解决方案,并通过代码实现,让您在学习本节的过程中能够确切地掌握实际编码技能
559 0
SpringBoot整合RocketMQ,尝尝几大高级特性!
|
消息中间件 Java Maven
|
消息中间件 Java Kafka
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
1623 1
QGS
|
NoSQL 关系型数据库 MySQL
手拉手Springboot+RocketMQ+Redis抢单实现10W级QPS
手拉手Springboot+RocketMQ+Redis抢单实现10W级QPS
QGS
461 3
|
消息中间件 人工智能 Java
Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
1020 1
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成