五.RocketMQ极简入门-RocketMQ延迟消息

简介: RocketMQ极简入门-RocketMQ延迟消息

使用场景

我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费性能。我们可以考虑使用延迟消息来解决。

概述

延迟消息即:把消息写到Broker后需要延迟一定时间才能被消费 , 在RocketMQ中消息的延迟时间不能任意指定,而是由特定的等级(1 到 18)来指定,分别有:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

可以通过修改配置来增加级别,比如在mq安装目录的 broker.conf 文件中增加如:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 2d

增加了2d 两天,这个时候总共就有19个level。

延迟消息工作原理

延迟队列工作流程图
在这里插入图片描述
RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;如果有就走延迟队列,执行下面的流程

  1. 修改消息Topic的名字为SCHEDULE_TOPIC_XXXX

  2. 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId
    目录与consumequeue文件

  3. 修改消息索引单元,计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,投递时间 = 消息存储时间 + 延时等级时间 。下面是CosumeQueue单个存储单元组成结构如下
    在这里插入图片描述

  • Commit Log Offset:记录在CommitLog中的位置。
  • Size:记录消息的大小
  • Message Tag HashCode:记录消息Tag的哈希值,用于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳。
  1. 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中

  2. Broker内部有⼀个延迟消息服务类ScheuleMessageService,根据延迟级别数,创建对应数量的定时器Timer,定时消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。

  3. 在将消息到期后,队列的Level等级改为0,作为一条普通消息,投递到目标Topic。

延迟消息实战

消息发送方

给消息设置延迟级别,API:message.setDelayTimeLevel(3);

public class Producer {
   
   

    //演示消息同步发送
    public static void main(String[] args) throws Exception {
   
   
        //生产者
        DefaultMQProducer producer = new DefaultMQProducer("syn-producerGroup-delay");

        //设置name server地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //启动
        producer.start();

        for (long i = 0 ; i < 4 ; i++){
   
   


            Order order = new Order(i,"订单"+i,"创建");

            //添加内容
            byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
            //创建消息,指定:TOPIC 和 TAG
            Message message = new Message("topic-order-delay","product-order-delay",bytes);

            //延迟级别 3,代表 10s延迟
            message.setDelayTimeLevel(3);

            message.setKeys("key-"+i);

            //执行发送
            SendResult result = producer.send(message);

            System.out.println("发送时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

            System.out.println(result);
        }

        producer.shutdown();
    }
}

消息消费方

public class Consumer {
   
   
    public static void main(String[] args) throws Exception {
   
   
        //创建消费者
        DefaultMQPushConsumer defaultMQPushConsumer = 
                                    new DefaultMQPushConsumer("syn-consumerGroup-delay");
        //设置name server 地址
        defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");

        //从开始位置消费
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //订阅
        defaultMQPushConsumer.subscribe("topic-order-delay","product-order-delay");

        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
   
   
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
   
   

                list.forEach(message->{
   
   
                    System.out.println("消费时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                    System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
                });

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        defaultMQPushConsumer.start();
    }
}

从消费者消费结果时间来看,消息是延迟了10s后才收到。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
12天前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
16 1
|
15天前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ产品使用合集之是否支持任意时间延迟的消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
20天前
|
消息中间件 数据库
03.RabbitMQ延迟队列
03.RabbitMQ延迟队列
24 0
|
20天前
|
消息中间件 存储 Kafka
01.RabbitMQ入门
01.RabbitMQ入门
34 0
|
1月前
|
消息中间件 存储 前端开发
RabbitMQ在Java中的完美实现:从入门到精通
本文由木头左介绍如何在Java项目中使用RabbitMQ。RabbitMQ是开源的AMQP实现,支持多种客户端,适合分布式系统中的消息传递。首先需安装Erlang和RabbitMQ,接着在Java项目中添加RabbitMQ客户端库依赖。通过创建连接工厂和连接,建立与RabbitMQ的通信,并展示了创建连接和通道的代码示例。
|
2月前
|
消息中间件 监控 Linux
RabbitMQ轻松入门:从零开始的部署与安装指南
RabbitMQ轻松入门:从零开始的部署与安装指南
69 0
RabbitMQ轻松入门:从零开始的部署与安装指南
|
2月前
|
消息中间件 Docker 微服务
RabbitMQ入门指南(十一):延迟消息-延迟消息插件
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了DelayExchange插件、延迟消息插件实现延迟消息等内容。
335 0
|
2月前
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
95 0
|
2月前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
102 0
RabbitMQ入门指南(九):消费者可靠性
|
2月前
|
消息中间件 Java 微服务
RabbitMQ入门指南(七):生产者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消息丢失的可能性、生产者可靠性中的生产者重试机制和生产者确认机制等内容。
73 0
RabbitMQ入门指南(七):生产者可靠性