消息队列——延时消息应用解析及实践

简介: 在大部分场景下业务系统如果只需要实现异步解耦、削峰填谷等能力,常规的普通消息就可以满足此类需求。除此之外,在某些特殊的业务场景中,普通消息类型存在无法满足需求的情况。这就需要消息队列服务本身支持一些特殊的消息类型,或者开发者通过开发一些定制化的代码实现目的。

作者:阿里云解决方案架构师 鹿玄

前言

消息队列服务相信大家一定都不陌生了,在很多应用系统中,都有一些场景会使用到消息队列服务,简单来说,我们可以把消息队列比作是一个存放消息的容器,上游发送端将消息发送到消息队列,下游消费端从消息队列里消费消息。消息队列是分布式系统中重要的组件,核心作用可以帮助我们实现异步、解耦以及削峰,从而提高系统性能和稳定性。

在大部分场景下业务系统如果只需要实现异步解耦、削峰填谷等能力,常规的普通消息就可以满足此类需求。除此之外,在某些特殊的业务场景中,普通消息类型存在无法满足需求的情况。这就需要消息队列服务本身支持一些特殊的消息类型,或者开发者通过开发一些定制化的代码实现目的。这里我们列举在使用消息队列过程中几种特殊场景的例子:

顺序消费场景

生产者按照一定的先后顺序发布消息,消费者按照既定的先后顺序消费消息,即先发布的消息一定会先被客户端消费。

分布式事务场景

分布式架构下,随着系统的演进,数据库也进行了垂直拆分,如果选择使用消息队列进行上下游解耦的话,生产者和消费者需要保证数据一致性。

延时消费场景

生产者将消息发送到消息队列后,并不期望立马投递这条消息,而是推迟到某个时间点之后将消息投递给消费者进行消费。

对于顺序消息和事务消息,这里就不进行详细介绍了,大家有兴趣可以自行研究,本文后续内容会和大家一起详细讨论下延时消息更多的细节及应用场景。

延时消息介绍

延时(定时)消息的特点就是发送者成功发送一条消息后,这条消息并不会马上被消费者消费,而是在某个特定的时间或者延迟一段时间后,消息才被消费者可见并进行后续的消费,延时消息整个生命周期可以用如下示意图来表示:

image.png

  1. 消息发布者将一条延时消息发送到消息队列服务端;
  2. 在预计投递时间未到之前,消息对消费者不可见,消费者此时无法立刻消费;
  3. 投递时间到达后,消息才对消费者可见,消费者此时可以消费;
  4. 消费者获取此条消息并进行消费;
  5. 消费者成功消费后,进行确认,此条消息将不再被消费。

延时消息应用场景

交易场景

在生产者和消费者有时间窗口的要求下,我们可以考虑使用延时消息。如在电商交易场景下,交易中超时未支付的订单需要被关闭的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后,需要判断对应的订单是否已完成支付;如支付未完成,则关闭订单。

image.png

游戏场景

再比如在游戏社区里,游戏运营方经常会发起一些活动,玩家在活动期间内按照规则完成一系列任务,活动时间截止后,游戏后台根据玩家完成任务的情况进行判定,发送系统通知或者进行rank排名并派发奖励等。

image.png

此种场景也可以采用延时消息来实现,上游系统发布活动公告后,同时发送一条延时消息,延时时间设置为活动周期的时间。当活动截止后,下游系统可以随即消费消息并进行相应的逻辑处理。

其他场景

同时延时消息也可以广泛应用于信息提醒等比较通用的场景。

如何实现延时消息

介绍完延时消息的一些概念及应用场景后,我们接下来分析一下目前比较主流的几款开源消息中间件对延时消息的支持情况以及实现方式。

Kafka

原生Kafka默认是不支持延时消息的,需要开发者自己实现一层代理服务,比如发送端将消息发送到延时Topic,代理服务消费延时Topic的消息然后转存起来,代理服务通过一定的算法,计算延时消息所附带的延时时间是否到达,然后将延时消息取出来并发送到实际的Topic里面,消费端从实际的Topic里面进行消费。

RabbitMQ

RabbitMQ实现延时消息有两种方案,第一种是采用rabbitmq-delayed-message-exchange 插件实现,第二种则是利用DLX(Dead Letter Exchanges)+ TTL(消息存活时间)来间接实现。大致的实现思路如下:

  1. 创建一个普通队列delay_queue,为此队列设置死信交换机 (通过x-dead-letter-exchange参数) 和 RoutingKey (通过x-dead-letter-routing-key参数),生产者将向delay_queue发送延时消息。
  2. 创建步骤1中设置的死信交换机,同时创建一个目标队列 target_queue,并使用步骤1中设置的RoutingKey将两者绑定起来。消费者将从target_queue里面消费延时消息。
  3. 设置消息的存活时间TTL,可以在步骤1中设置到队列级别delay_queue的消息存活时间,或者在发送消息时动态设置消息级别的存活时间。

RocketMQ

开源RocketMQ支持延迟消息,但是不支持秒级精度。默认支持18个level的延迟消息,这是通过broker端的messageDelayLevel配置项确定的
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

消息队列服务在启动时,会创建一个内部topic:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列。生产者发送消息时可以设置延时等级,示例代码:

Message msg=new Message();
msg.setTopic("TopicA");
msg.setBody("this is a delay message".getBytes());
//设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);

发送的消息会暂存在Broker对应的内部topic中,再通过定时任务从内部topic中拉取数据,如果延迟时间到了,就会把消息转发到目标topic下,消费者从目标topic消费消息。

阿里云消息队列RocketMQ版

通过上一章节的讨论,我们可以看出目前几款主流的开源消息队列服务,在支持延时消息的场景下或多或少有些不完美的地方。主要体现在以下几点:

  1. Kafka不支持延时消息,需要完全开发代理服务来实现,工作量大。
  2. RabbitMQ需要额外的插件,或者利用DLX+TTL的方式进行中转,实现不是非常直观。
  3. RocketMQ支持延时消息,但是无法支持秒级延时。

那么有没有一款消息队列服务,能够完美的支持延时(定时)消息。本节我们将介绍阿里云消息队列RocketMQ版。

阿里云消息队列RocketMQ版基于Apache RocketMQ构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。同时支持丰富的消息类型包括普通消息、顺序消息、事务消息以及我们本文讨论的延时消息。接下来我们看下阿里云RocketMQ为延时消息提供的能力及优势:

  1. 支持秒级的延时(定时)消息,同时延时时间可以最大设置为40天,基本满足所有场景。
  2. 延时(定时)消息的投递精度可控制在1~2秒之内。
  3. 延时(定时)消息在某段时间内是对消费者不可见的,从另一个维度看也属于积压的消息,阿里云消息队列RocketMQ版的不同实例规格可以支持亿级的消息积压。
  4. 提供了多语言支持,包括Java、.NET、CC++、GO、Python、PHP、Node.js等

使用阿里云消息队列RocketMQ版收发延时(定时)消息,只需要在控制台创建Topic的时候选择定时/延时消息类型,既可以使用TCP或者http协议进行消息收发。

控制台创建定时/延时Topic

image.png

Java语言示例代码(TCP协议)

  • 发送定时消息
// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如2020-03-07 16:21:00投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2020-03-07 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
// 发送消息,只要不抛异常就是成功。
SendResult sendResult = producer.send(msg);
  • 发送延时消息
// 延时消息,单位毫秒(ms),在指定延迟时间(当前时间之后)进行投递,例如消息在3秒后投递。
long delayTime = System.currentTimeMillis() + 3000;
// 设置消息需要被投递的时间。
msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);

同时订阅延时消息的逻辑无需任何改造,完全可以按照订阅普通消息的方式,没有任何的代码侵入性。

结束语

到此我们讨论了延时消息的特性、应用场景,对比了各类消息队列对延时消息的支持情况,同时也向大家介绍了阿里云消息队列RocketMQ版。我们在对消息中间件进行选型时,也会考虑到多方面的因素。除了消息中间件本身所能提供的能力外,也包括服务性能、稳定性、可扩展能力,以及需要结合开发团队自身的技术栈等情况。最后如果大家想了解更多阿里云消息队列RocketMQ版。可以参考下面的链接:
https://help.aliyun.com/product/29530.html?spm=a2c4g.11186623.6.540.2add192aWINJ9c

《阿里云原生产品手册》上线,本电子书聚焦云原生12款核心产品,覆盖容器产品、微服务产品、消息中间件产品、Serverless产品等,内容包括每款产品的核心亮点、解决问题、客户案例、常见问题等,展示最全面的云原生产品与行业应用,为企业云原生上云和容器化改造提供思路和指引。
下载链接:https://developer.aliyun.com/topic/download?id=1000

【更多精彩】

1.中间件爆款一折起,还有阿里巴巴十年最佳实践深度解密,点击马上了解:https://www.aliyun.com/activity/daily/commercial?spm=5176.20960838.0.0.6a54305etoEn4D

2.【填问卷领淘公仔】点击马上填写问卷:
https://survey.aliyun.com/apps/zhiliao/YmW95Gk8bU

【加入行业实战交流钉钉群】

阿里云专门成立了“互联网架构升级实战课”钉钉群,每周邀请一位阿里云专家在群内进行行业最佳实践直播,每天分享行业前沿干货,钉钉扫码马上加入。

image.png

相关文章
|
4天前
|
机器学习/深度学习 人工智能 自然语言处理
思通数科AI平台在尽职调查中的技术解析与应用
思通数科AI多模态能力平台结合OCR、NLP和深度学习技术,为IPO尽职调查、融资等重要交易环节提供智能化解决方案。平台自动识别、提取并分类海量文档,实现高效数据核验与合规性检查,显著提升审查速度和精准度,同时保障敏感信息管理和数据安全。
35 11
|
1天前
|
编解码 前端开发 UED
探索无界:前端开发中的响应式设计深度解析与实践####
【10月更文挑战第29天】 本文深入探讨了响应式设计的核心理念,即通过灵活的布局、媒体查询及弹性图片等技术手段,使网站能够在不同设备上提供一致且优质的用户体验。不同于传统摘要概述,本文将以一次具体项目实践为引,逐步剖析响应式设计的关键技术点,分享实战经验与避坑指南,旨在为前端开发者提供一套实用的响应式设计方法论。 ####
20 4
|
2天前
|
安全 编译器 PHP
PHP 8新特性解析与实践应用####
————探索PHP 8的创新功能及其在现代Web开发中的实际应用
|
4天前
|
机器学习/深度学习 人工智能 自然语言处理
医疗行业的语音识别技术解析:AI多模态能力平台的应用与架构
AI多模态能力平台通过语音识别技术,实现实时转录医患对话,自动生成结构化数据,提高医疗效率。平台具备强大的环境降噪、语音分离及自然语言处理能力,支持与医院系统无缝集成,广泛应用于门诊记录、多学科会诊和急诊场景,显著提升工作效率和数据准确性。
|
5天前
|
机器学习/深度学习 人工智能 安全
TPAMI:安全强化学习方法、理论与应用综述,慕工大、同济、伯克利等深度解析
【10月更文挑战第27天】强化学习(RL)在实际应用中展现出巨大潜力,但其安全性问题日益凸显。为此,安全强化学习(SRL)应运而生。近日,来自慕尼黑工业大学、同济大学和加州大学伯克利分校的研究人员在《IEEE模式分析与机器智能汇刊》上发表了一篇综述论文,系统介绍了SRL的方法、理论和应用。SRL主要面临安全性定义模糊、探索与利用平衡以及鲁棒性与可靠性等挑战。研究人员提出了基于约束、基于风险和基于监督学习等多种方法来应对这些挑战。
14 2
|
4天前
|
前端开发 JavaScript
JavaScript新纪元:ES6+特性深度解析与实战应用
【10月更文挑战第29天】本文深入解析ES6+的核心特性,包括箭头函数、模板字符串、解构赋值、Promise、模块化和类等,结合实战应用,展示如何利用这些新特性编写更加高效和优雅的代码。
13 0
|
8天前
|
存储 人工智能 大数据
拼多多详情API的价值与应用解析
拼多多作为中国电商市场的重要参与者,其开放平台提供的商品详情API接口为电商行业带来了新的机遇和挑战。该接口允许开发者通过编程方式获取商品的详细信息,包括标题、价格、描述、图片、规格参数和库存等,推动了电商运营的智能化和高效化。本文将深入解析拼多多详情API的价值与应用,帮助商家和开发者更好地理解和利用这一宝贵资源。
17 0
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

推荐镜像

更多