【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

简介: 【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

痛点背景


业务场景

假设有这么一个需求,用户下单后如果30分钟未支付,则该订单需要被关闭。你会怎么做?


之前方案

最简单的做法,可以服务端启动个定时器,隔个几秒扫描数据库中待支付的订单,如果(当前时间-订单创建时间)>30分钟,则关闭订单。


方案评估


  • 优点:是实现简单,缺点呢?
  • 缺点:定时扫描意味着隔个几秒就得查一次数据库,频率高的情况下,如果数据库中订单总量特别大,这种高频扫描会对数据库带来一定压力,待付款订单特别多时(做个爆品秒杀活动,或者啥促销活动),若一次性查到内存中,容易引起宕机,需要分页查询,多少也会有一定数据库层面压力。



延时队列出现


  • 能够在指定时间间隔后触发某个业务操作
  • 能够应对业务数据量特别大的特殊场景


RocketMQ延时消息能够完美的解决上述需求,正常的消息在投递后会立马被消费者所消费,而延时消息在投递时,需要设置指定的延时级别(不同延迟级别对应不同延迟时间),即等到特定的时间间隔后消息才会被消费者消费,这样就将数据库层面的压力转移到了MQ中,也不需要手写定时器,降低了业务复杂度,同时MQ自带削峰功能,能够很好的应对业务高峰。




功能特点


  • RocketMQ支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
  • 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
  • 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
  • broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。




Broker处理延迟消息


延时队列生产者端:


延时消息的关键点在于Producer生产者需要给消息设置特定延时级别,消费端代码与正常消费者没有差别。


public class Producer {
  private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        //设置namesrv地址
        producer.setNamesrvAddr("111.231.110.149:9876");
        //启动生产者
        producer.start();
        //发送10条消息
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("test message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //设置消息延时级别  3对应10秒后发送
                //延时级别1对应延时1秒后发送消息
                //延时级别2对应延时5秒后发送消息
                //延时级别3对应延时10秒后发送消息
                //以此类推。
                msg.setDelayTimeLevel(3);
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();
    }
}
复制代码


初始化


DefaultMessageStore在启动时,会调用ScheduleMessageService#load()方法来加载消息消费进度和初始化延迟级别对应map,然后调用ScheduleMessageService#start()方法来启动类


load方法

public boolean load() {
        boolean result = super.load();
        result = result && this.parseDelayLevel();
        return result;
}
复制代码



ScheduleMessageService继承自ConfigManager类,super.load()方法对应

public boolean load() {
        String fileName = null;
        try {
            fileName = this.configFilePath();
            String jsonString = MixAll.file2String(fileName);
            if (null == jsonString || jsonString.length() == 0) {
                return this.loadBak();
            } else {
                this.decode(jsonString);
                log.info("load " + fileName + " OK");
                return true;
            }
        } catch (Exception e) {
            log.error("load " + fileName + " failed, and try to load backup file", e);
            return this.loadBak();
        }
}
复制代码



延时队列源码分析:


先从延时消息延迟级别设置与broker端消息持久化入手。


具体实现

RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早。


image.png

启动延迟消息定时任务


如果想要深入了解的可以看一下ScheduleMessageService这个类

image.png

内部变量含义


延时消息定时投递相关具体实现代码在ScheduleMessageService中,先看下变量定义

image.png


  • delayLevelTable定义了延迟级别和延迟时间的对应关系
  • offsetTable存放延延迟级别对应的队列消费的offset
ScheduleMessageService.start()

image.png

延迟消息投递

image.png


其中根据,delayLevel获取消费队列id的方法如下,即queueId = delayLevel-1

public static int delayLevel2QueueId(final int delayLevel) {
        return delayLevel - 1;
}



image.png


核心逻辑就是取出tagCode(延时消息持久化时,tagsCode存储的是消息投递时间),解析成消息投递时间,与当前时间戳做差,判断是否应该进行消息投递,具体进行消息投递的方法,在if (countdown <= 0)中,看下代码


image.png

每个扫描任务主要是把队列中所有到期的消息都拿出来,并发送到指定的topic下,并把延迟队列中的消息删除


重新投递实现


重新构建投递消息的关键点在于messageTimeup中,其构建了一个新的消息,并从延时消息属性中恢复出了原有的topic,queueId,再调用putMessage重新进行投递。


image.png



总结


  • 优点:设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性
  • 缺点:定时器采用了timer,timer是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况
  • 改进点:可以在每个延迟队列上各采用一个timer,或者使用timer进行扫描,加一个线程池对消息进行处理,这样可以提供效率



基本思路已经介绍完,梳理下延时消息实现思路


  • producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别
  • broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别-1
  • mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列
  • 根据消费偏移量offset从commitLog中解析出对应消息
  • 从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递
  • 若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递













相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
因为一个问题、我新学了一门技术 ElasticSearch 分布式搜索
这篇文章讲述了作者因为一个检索问题而学习了ElasticSearch技术,并分享了排查和解决ElasticSearch检索结果与页面展示不符的过程。
因为一个问题、我新学了一门技术 ElasticSearch 分布式搜索
|
4天前
|
消息中间件 存储 RocketMQ
2分钟看懂RocketMQ延迟消息核心原理
本文从源码层面解析了RocketMQ延迟消息的实现原理,包括延迟消息的使用、Broker端处理机制以及定时任务对延迟消息的处理流程。
2分钟看懂RocketMQ延迟消息核心原理
|
17天前
|
存储 负载均衡 中间件
构建可扩展的分布式数据库:技术策略与实践
【8月更文挑战第3天】构建可扩展的分布式数据库是一个复杂而具有挑战性的任务。通过采用数据分片、复制与一致性模型、分布式事务管理和负载均衡与自动扩展等关键技术策略,并合理设计节点、架构模式和网络拓扑等关键组件,可以构建出高可用性、高性能和可扩展的分布式数据库系统。然而,在实际应用中还需要注意解决数据一致性、故障恢复与容错性以及分布式事务的复杂性等挑战。随着技术的不断发展和创新,相信分布式数据库系统将在未来发挥更加重要的作用。
|
1月前
|
算法 数据库 OceanBase
共识协议的技术变迁问题之Raft协议对分布式系统有什么贡献
共识协议的技术变迁问题之Raft协议对分布式系统有什么贡献
42 8
|
1月前
|
存储 人工智能 前端开发
共识协议的技术变迁问题之分布式系统的基础目标是什么
共识协议的技术变迁问题之分布式系统的基础目标是什么
|
21天前
|
数据采集 存储 NoSQL
Redis 与 Scrapy:无缝集成的分布式爬虫技术
Redis 与 Scrapy:无缝集成的分布式爬虫技术
|
22天前
|
消息中间件 存储 RocketMQ
消息队列 MQ使用问题之进行超过3天的延迟消息投递,采用多次投递的策略是否有风险
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
30天前
|
存储 数据管理 数据库
现代数据库技术中的分布式一致性问题与解决方案探讨
分布式系统在现代数据库技术中扮演着重要角色,但分布式环境下的数据一致性问题始终是挑战之一。本文深入探讨了分布式一致性的核心概念、各种一致性模型的特点及其在实际应用中的优缺点,旨在为技术从业者提供全面的视角和实用的解决方案。
|
22天前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
22天前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。