消息队列——延时消息应用解析及实践

简介: 在大部分场景下业务系统如果只需要实现异步解耦、削峰填谷等能力,常规的普通消息就可以满足此类需求。除此之外,在某些特殊的业务场景中,普通消息类型存在无法满足需求的情况。这就需要消息队列服务本身支持一些特殊的消息类型,或者开发者通过开发一些定制化的代码实现目的。

作者:阿里云解决方案架构师 鹿玄

前言

消息队列服务相信大家一定都不陌生了,在很多应用系统中,都有一些场景会使用到消息队列服务,简单来说,我们可以把消息队列比作是一个存放消息的容器,上游发送端将消息发送到消息队列,下游消费端从消息队列里消费消息。消息队列是分布式系统中重要的组件,核心作用可以帮助我们实现异步、解耦以及削峰,从而提高系统性能和稳定性。

在大部分场景下业务系统如果只需要实现异步解耦、削峰填谷等能力,常规的普通消息就可以满足此类需求。除此之外,在某些特殊的业务场景中,普通消息类型存在无法满足需求的情况。这就需要消息队列服务本身支持一些特殊的消息类型,或者开发者通过开发一些定制化的代码实现目的。这里我们列举在使用消息队列过程中几种特殊场景的例子:

顺序消费场景

生产者按照一定的先后顺序发布消息,消费者按照既定的先后顺序消费消息,即先发布的消息一定会先被客户端消费。

分布式事务场景

分布式架构下,随着系统的演进,数据库也进行了垂直拆分,如果选择使用消息队列进行上下游解耦的话,生产者和消费者需要保证数据一致性。

延时消费场景

生产者将消息发送到消息队列后,并不期望立马投递这条消息,而是推迟到某个时间点之后将消息投递给消费者进行消费。

对于顺序消息和事务消息,这里就不进行详细介绍了,大家有兴趣可以自行研究,本文后续内容会和大家一起详细讨论下延时消息更多的细节及应用场景。

延时消息介绍

延时(定时)消息的特点就是发送者成功发送一条消息后,这条消息并不会马上被消费者消费,而是在某个特定的时间或者延迟一段时间后,消息才被消费者可见并进行后续的消费,延时消息整个生命周期可以用如下示意图来表示:

image.png

  1. 消息发布者将一条延时消息发送到消息队列服务端;
  2. 在预计投递时间未到之前,消息对消费者不可见,消费者此时无法立刻消费;
  3. 投递时间到达后,消息才对消费者可见,消费者此时可以消费;
  4. 消费者获取此条消息并进行消费;
  5. 消费者成功消费后,进行确认,此条消息将不再被消费。

延时消息应用场景

交易场景

在生产者和消费者有时间窗口的要求下,我们可以考虑使用延时消息。如在电商交易场景下,交易中超时未支付的订单需要被关闭的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后,需要判断对应的订单是否已完成支付;如支付未完成,则关闭订单。

image.png

游戏场景

再比如在游戏社区里,游戏运营方经常会发起一些活动,玩家在活动期间内按照规则完成一系列任务,活动时间截止后,游戏后台根据玩家完成任务的情况进行判定,发送系统通知或者进行rank排名并派发奖励等。

image.png

此种场景也可以采用延时消息来实现,上游系统发布活动公告后,同时发送一条延时消息,延时时间设置为活动周期的时间。当活动截止后,下游系统可以随即消费消息并进行相应的逻辑处理。

其他场景

同时延时消息也可以广泛应用于信息提醒等比较通用的场景。

如何实现延时消息

介绍完延时消息的一些概念及应用场景后,我们接下来分析一下目前比较主流的几款开源消息中间件对延时消息的支持情况以及实现方式。

Kafka

原生Kafka默认是不支持延时消息的,需要开发者自己实现一层代理服务,比如发送端将消息发送到延时Topic,代理服务消费延时Topic的消息然后转存起来,代理服务通过一定的算法,计算延时消息所附带的延时时间是否到达,然后将延时消息取出来并发送到实际的Topic里面,消费端从实际的Topic里面进行消费。

RabbitMQ

RabbitMQ实现延时消息有两种方案,第一种是采用rabbitmq-delayed-message-exchange 插件实现,第二种则是利用DLX(Dead Letter Exchanges)+ TTL(消息存活时间)来间接实现。大致的实现思路如下:

  1. 创建一个普通队列delay_queue,为此队列设置死信交换机 (通过x-dead-letter-exchange参数) 和 RoutingKey (通过x-dead-letter-routing-key参数),生产者将向delay_queue发送延时消息。
  2. 创建步骤1中设置的死信交换机,同时创建一个目标队列 target_queue,并使用步骤1中设置的RoutingKey将两者绑定起来。消费者将从target_queue里面消费延时消息。
  3. 设置消息的存活时间TTL,可以在步骤1中设置到队列级别delay_queue的消息存活时间,或者在发送消息时动态设置消息级别的存活时间。

RocketMQ

开源RocketMQ支持延迟消息,但是不支持秒级精度。默认支持18个level的延迟消息,这是通过broker端的messageDelayLevel配置项确定的
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

消息队列服务在启动时,会创建一个内部topic:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列。生产者发送消息时可以设置延时等级,示例代码:

Message msg=new Message();
msg.setTopic("TopicA");
msg.setBody("this is a delay message".getBytes());
//设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);

发送的消息会暂存在Broker对应的内部topic中,再通过定时任务从内部topic中拉取数据,如果延迟时间到了,就会把消息转发到目标topic下,消费者从目标topic消费消息。

阿里云消息队列RocketMQ版

通过上一章节的讨论,我们可以看出目前几款主流的开源消息队列服务,在支持延时消息的场景下或多或少有些不完美的地方。主要体现在以下几点:

  1. Kafka不支持延时消息,需要完全开发代理服务来实现,工作量大。
  2. RabbitMQ需要额外的插件,或者利用DLX+TTL的方式进行中转,实现不是非常直观。
  3. RocketMQ支持延时消息,但是无法支持秒级延时。

那么有没有一款消息队列服务,能够完美的支持延时(定时)消息。本节我们将介绍阿里云消息队列RocketMQ版。

阿里云消息队列RocketMQ版基于Apache RocketMQ构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。同时支持丰富的消息类型包括普通消息、顺序消息、事务消息以及我们本文讨论的延时消息。接下来我们看下阿里云RocketMQ为延时消息提供的能力及优势:

  1. 支持秒级的延时(定时)消息,同时延时时间可以最大设置为40天,基本满足所有场景。
  2. 延时(定时)消息的投递精度可控制在1~2秒之内。
  3. 延时(定时)消息在某段时间内是对消费者不可见的,从另一个维度看也属于积压的消息,阿里云消息队列RocketMQ版的不同实例规格可以支持亿级的消息积压。
  4. 提供了多语言支持,包括Java、.NET、CC++、GO、Python、PHP、Node.js等

使用阿里云消息队列RocketMQ版收发延时(定时)消息,只需要在控制台创建Topic的时候选择定时/延时消息类型,既可以使用TCP或者http协议进行消息收发。

控制台创建定时/延时Topic

image.png

Java语言示例代码(TCP协议)

  • 发送定时消息
// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如2020-03-07 16:21:00投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2020-03-07 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
// 发送消息,只要不抛异常就是成功。
SendResult sendResult = producer.send(msg);
  • 发送延时消息
// 延时消息,单位毫秒(ms),在指定延迟时间(当前时间之后)进行投递,例如消息在3秒后投递。
long delayTime = System.currentTimeMillis() + 3000;
// 设置消息需要被投递的时间。
msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);

同时订阅延时消息的逻辑无需任何改造,完全可以按照订阅普通消息的方式,没有任何的代码侵入性。

结束语

到此我们讨论了延时消息的特性、应用场景,对比了各类消息队列对延时消息的支持情况,同时也向大家介绍了阿里云消息队列RocketMQ版。我们在对消息中间件进行选型时,也会考虑到多方面的因素。除了消息中间件本身所能提供的能力外,也包括服务性能、稳定性、可扩展能力,以及需要结合开发团队自身的技术栈等情况。最后如果大家想了解更多阿里云消息队列RocketMQ版。可以参考下面的链接:
https://help.aliyun.com/product/29530.html?spm=a2c4g.11186623.6.540.2add192aWINJ9c

《阿里云原生产品手册》上线,本电子书聚焦云原生12款核心产品,覆盖容器产品、微服务产品、消息中间件产品、Serverless产品等,内容包括每款产品的核心亮点、解决问题、客户案例、常见问题等,展示最全面的云原生产品与行业应用,为企业云原生上云和容器化改造提供思路和指引。
下载链接:https://developer.aliyun.com/topic/download?id=1000

【更多精彩】

1.中间件爆款一折起,还有阿里巴巴十年最佳实践深度解密,点击马上了解:https://www.aliyun.com/activity/daily/commercial?spm=5176.20960838.0.0.6a54305etoEn4D

2.【填问卷领淘公仔】点击马上填写问卷:
https://survey.aliyun.com/apps/zhiliao/YmW95Gk8bU

【加入行业实战交流钉钉群】

阿里云专门成立了“互联网架构升级实战课”钉钉群,每周邀请一位阿里云专家在群内进行行业最佳实践直播,每天分享行业前沿干货,钉钉扫码马上加入。

image.png

相关文章
RS-485网络中的标准端接与交流电端接应用解析
RS-485,作为一种广泛应用的差分信号传输标准,因其传输距离远、抗干扰能力强、支持多点通讯等优点,在工业自动化、智能建筑、交通运输等领域得到了广泛应用。在构建RS-485网络时,端接技术扮演着至关重要的角色,它直接影响到网络的信号完整性、稳定性和通信质量。
|
12天前
|
编解码 前端开发 UED
探索无界:前端开发中的响应式设计深度解析与实践####
【10月更文挑战第29天】 本文深入探讨了响应式设计的核心理念,即通过灵活的布局、媒体查询及弹性图片等技术手段,使网站能够在不同设备上提供一致且优质的用户体验。不同于传统摘要概述,本文将以一次具体项目实践为引,逐步剖析响应式设计的关键技术点,分享实战经验与避坑指南,旨在为前端开发者提供一套实用的响应式设计方法论。 ####
37 4
|
11天前
|
自然语言处理 并行计算 数据可视化
免费开源法律文档比对工具:技术解析与应用
这款免费开源的法律文档比对工具,利用先进的文本分析和自然语言处理技术,实现高效、精准的文档比对。核心功能包括文本差异检测、多格式支持、语义分析、批量处理及用户友好的可视化界面,广泛适用于法律行业的各类场景。
|
13天前
|
安全 编译器 PHP
PHP 8新特性解析与实践应用####
————探索PHP 8的创新功能及其在现代Web开发中的实际应用
|
5天前
|
存储 供应链 物联网
深入解析区块链技术的核心原理与应用前景
深入解析区块链技术的核心原理与应用前景
|
5天前
|
存储 供应链 安全
深度解析区块链技术的核心原理与应用前景
深度解析区块链技术的核心原理与应用前景
13 0
|
9天前
|
SQL 监控 安全
员工上网行为监控软件:SQL 在数据查询监控中的应用解析
在数字化办公环境中,员工上网行为监控软件对企业网络安全和管理至关重要。通过 SQL 查询和分析数据库中的数据,企业可以精准了解员工的上网行为,包括基础查询、复杂条件查询、数据统计与分析等,从而提高网络管理和安全防护的效率。
22 0
|
12天前
|
前端开发 中间件 PHP
PHP框架深度解析:Laravel的魔力与实战应用####
【10月更文挑战第31天】 本文作为一篇技术深度好文,旨在揭开PHP领域璀璨明星——Laravel框架的神秘面纱。不同于常规摘要的概括性介绍,本文将直接以一段引人入胜的技术剖析开场,随后通过具体代码示例和实战案例,逐步引导读者领略Laravel在简化开发流程、提升代码质量及促进团队协作方面的卓越能力。无论你是PHP初学者渴望深入了解现代开发范式,还是经验丰富的开发者寻求优化项目架构的灵感,本文都将为你提供宝贵的见解与实践指导。 ####
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

推荐镜像

更多