rocketmq实现延迟队列思路探讨

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 本文介绍了两种实现RocketMQ延迟消息的方法。非任意时间延迟可通过在服务器端配置`messageDelayLevel`实现,但需重启服务。任意时间延迟则分为两种策略:一是结合原生逻辑和时间轮,利用RocketMQ的默认延迟等级组合支持任意延迟,但可能丢失1分钟内的数据;二是使用存储介质(如Redis)加时间轮,消息存储和定时发送结合,能处理数据不一致和丢失问题,但涉及更多组件。推荐项目[civism-rocket](https://github.com/civism/civism-rocket)作为参考。

一、非任意时间

1、修改

在服务器端(rocketmq-broker端)的属性配置文件中加入以下行:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

缺点:需要重启rocketmq的服务端

二、任意时间

1、借鉴原生的逻辑

先建立多个时间范围的level,依靠一个定时任务搬运,到一天以内的时候建立时间轮的方式建立时分秒三个表来查着着几个区间的数据,上一个级别的查到才会注册下一个级别的定时任务,执行完成后取消注册,时间轮有HashedWheelTimer,需要考虑持久化问题

2、时间轮加rocketmq

1.rocketMQ默认支持18个等级 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

2.支持任意延时,设计逻辑为拆分法,用默认的rocketMQ去支持 任意时间都能够通过上面的时间组装出来

3.在1分中内需要发送的采用时间轮方式,发送出去

4.延时会走 guava-topic,不会走业务需要的topic,只有当真正需要发送的时候才会走业务topic

5.考虑1分钟内由于强制重启等所带来的数据丢失问题. (TODO)

代码

https://github.com/civism/civism-rocket

优点

不需要依赖除rocketMQ以外的任何中间间,可以算是0侵入

支持任意时间纬度的延时

实现简单,浅显易懂,安全与否取决于rocketMQ

rocketMQ所有的优点

缺点

极端情况下会有一分钟的数据丢失(服务重启并且满足刚好进入时间轮)

增大了rocketMQ的自带的延时压力

rocketMQ所有的缺点

3、存储介质加时间轮:

生产延迟消息:延迟消息由两部分组成–该笔消息的订单号key+业务数据value;

存储消息:当把延迟消息组装好之后,把该消息(key,value)放入redis中并设置一定的超时时间同时存入时间轮数据结构中;

取出消息:当该消息在时间轮数据结构中到期时,取出key,然后根据这个key去redis中取value;

通过RocketMQ的生产者线程,把消息发送出去,若发送成功,则把redis中该key删除;若是发送失败,则记录日志,人工补偿;

每部分的作用是:

HashedWheelTimer:存储消息的key,key到期时,自动弹出—起到一个定时器的作用;

Redis:将完整的延迟消息存储到内存中时,还把数据持久化到硬盘,当redis重启时,基本不丢数据;

RocketMQ:发送延迟消息;

这里有几个问题需要注意:

当系统突然宕机,服务器重启后,时间轮HashedWheelTimer中的key都将消失,并且很难恢复,此时丢失的key对应在Redis中的value只能等待时间到期,这种情况怎么办,即数据丢失问题?也可以不使用Redis存储完整的消息,把完整的消息直接放入时间轮数据结构中或放入延迟队列DelayQueue中;用这种方式也会存在数据丢失的问题:即系统突然宕机,服务器重启后,未到期的数据都将丢失,因为对数据没有进行持久化;

当key从HashedWheelTimer中取出后,根据该key在Redis中没取到数据,这种情况该怎么办,即数据不一致的问题?

当消息到期后,用RocketMQ发送时,发送好几次都失败了,这时候除了记录日志,人工进行补偿之外,还有什么好的解决方案?–解决办法之一是:把这些发送失败的消息,存入数据库表中;然后启动一个定时任务,定时把发送失败的消息,通过RocketMQ再次发送出去,若发送成功,将该消息从数据库中删除;若这次还是发送失败,则下次定时任务执行时,再继续尝试发送。

这里的HashedWheelTimer可以用Delayqueue代替,它两相比较而言,HashedWheelTimer的时间复杂度比Delayqueue要好些。

参考借鉴:

1、https://github.com/civism/civism-rocket

2、https://blog.csdn.net/zhaoming19870124/article/details/94152008

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6天前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
99 0
|
6天前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
**摘要:** 本文讨论了RabbitMQ中的幂等性、优先级队列和惰性队列。幂等性确保了重复请求不会导致副作用,关键在于消费端的幂等性保障,如使用唯一ID和Redis的原子性操作。优先级队列适用于处理不同重要性消息,如大客户订单优先处理,通过设置`x-max-priority`属性实现。惰性队列自3.6.0版起提供,用于延迟将消息加载到内存,适合大量消息存储和消费者延迟消费的场景。
29 4
|
6天前
|
消息中间件 Java API
RabbitMQ入门指南(五):Java声明队列、交换机以及绑定
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了Java声明队列、交换机以及绑定队列和交换机等内容。
37 0
|
6天前
|
消息中间件 存储 监控
解析RocketMQ:高性能分布式消息队列的原理与应用
RocketMQ是阿里开源的高性能分布式消息队列,具备低延迟、高吞吐和高可靠性,广泛应用于电商、金融等领域。其核心概念包括Topic、Producer、Consumer、Message和Name Server/Broker。RocketMQ支持异步通信、系统解耦、异步处理和流量削峰。关键特性有分布式架构、顺序消息、高可用性设计和消息事务。提供发布/订阅和点对点模型,以及消息过滤功能。通过集群模式、存储方式、发送和消费方式的选择进行性能优化。RocketMQ易于部署,可与Spring集成,并与Kafka等系统对比各有优势,拥有丰富的生态系统。
173 4
|
6天前
|
消息中间件 Java Maven
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
|
6天前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
52 1
|
6天前
|
消息中间件 监控 数据挖掘
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
53 0
|
6天前
|
消息中间件 Docker 容器
docker构建rabbitmq并配置延迟队列插件
docker构建rabbitmq并配置延迟队列插件
49 0
|
6天前
|
消息中间件
rabbitmq动态创建队列
rabbitmq动态创建队列
40 0
|
6天前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总: