RocketMQ 5.0中的消费者类型和最佳实践|学习笔记

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 快速学习 RocketMQ 5.0中的消费者类型和最佳实践

开发者学堂课程消息队列 RocketMQ 5.0 云原生架构升级课程RocketMQ 5.0 中的消费者类型和最佳实践学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/1234/detail/18403


Rocket MQ5.0中消费者类型和最佳实践

 

内容介绍

一、课前介绍

二、消费者类型介绍

三、Push consumer

四、Simple consumer

五、Pull Consumer

六、三种消费者类型的异处

 

一、课前介绍

这里是阿里云消息队列公开课,然后今天的议题是Rocket MQ的消费者类型以及最佳实践。

首先会向大家去简单的介绍一下就是Rocket MQ5.0中现存的三种不同的 consumer 类型,对此有一个抽象的大概的了解,然后再具体而言,每个版块再具体介绍三个不同的consumer的具体的使用方法以及最佳实践,那接下来就开始具体的内容。

 

二、消费者类型介绍

1、总述

Rocket MQ5.0中,去强化了三种不同的消费者类型,其实在现在的Rocket MQ4.0中也是有消费者类型这样的一个概念的。比如很熟悉的这个 Push Consumer 和 Pull Consumer 已经存在的消费者类型Rocket MQ5.0就更加去强化这个消费者类型的一个概念。 Rocket MQ本身设计是面向一种多样的复杂业务场景,不同的customer类型其实是用来满足不同的业务场景的。

2、Push Consumer

Push Consumer是一种全托管的消费类型,用户其实只需要去注册应的消息监听器就可以了,也是与业务集成最为普遍的一个消费者类型在使用Push Consumer的时候,用户只需要去注册一个 Listener 就可以了, listener 接口去完成自己的消费逻辑。对于从服务端到达客户端具体的消息的情况用户不需要关注,所有的逻辑都已经在客户端本身完成封装了。

3、Simple consumer

Simple Consumer 其实 Rocket MQ5.0中新提出来的一种 customer 类型它去解耦了消息的消费以及消费进度的同步,用户需要自己去主动接受来自于服务端的消息,然后手动去消息进行确认。然后它与 Push Consumer 比较类似的一点是它也是由服务端来托管消费进度的,即同一个 consumer group 下这个 Simple Consumer 的消息消费到何处也是由服务端去进行记录的然后它适用用户需要自主去控制消费速率的这种业务场景。举例而言,其实消息在进入 Listener 的时候是逐条进行投递的,而在 Simple Consumer 用户主动 receive 的时候,可以去批量的进行 receive 的操作因此在用户需要自主去控制速率这种场景下,Simple Consumer 是非常有用处的

4、Pull Consumer

Pull Consumer 在welcomes也是有的,在 Pull Consumer 中用户是以队列的维度去操作这些消息的Rocket MQ中队列是最小的一个逻辑组成单位, Pull Consumer 中所有的消息都是按照队列进行接收的,这样同时又需要用户去关心整个消息的位点和进度的同步,当然具体而言也可以选择自动或者手动的方式来进行提交。相而言 Pull Consumer 是一个需要用户自己去管理的消费者类型,它是在流处理框架中更加常用的一种消费者类型。

接下来会具体深入到每一个Push Consumer给大家展示一下不同的消费者类型,包括它是如何使用的以及它最佳的实践方式。

 

三、Push Consumer

1、基本介绍

Push Consumer 可能是大家最熟悉的一个 consumer 类型,此前也提到它是一种全托管的消费者,全托管的意思就是用户其实是不需要去关心整个消息的接收的,只需要关心本身消息接收过程消费过程就可以了。

image.png

2、使用简介

除此之外,所有逻辑都已经在消费者实践中完成封装,用户只需要根据每一条收到的消息返回不同的消费结果就可以了。所以它是一种最为朴实的消费者类型,用户可以根据自己业务逻辑处理结果的不同,进而返回 success 或者 filer。前者就表现为消费成功,后者表现为消费失败。其实一个 Consumer Group 会有最大消费次数的概念如果消费失败了,按照正常逻辑来说,它会有一个重投的逻辑,会隔一段时间之后重新来到这个消费队列。如果消费次数超过了最大限度的话,就不会再投到 Listener,而是相反就会进入死信队列 Push Consumer 中用户只需要实现 Listener 的消费逻辑就可以了,具体而言其实就是实现这个 Message Listener 的这个 consumer 接口就可以了。

 

3、代码事例

这里是一个 Push Consumer 的一个代码事例,大家可以看到首先只需要去设置对应的订阅关系,其中 topic 以及订阅的是否为 tag 订阅,然后去注册 Message Listener 这样的接口,启动 Push Consumer 其实就可以了

实际上在 Push Consumer 的实现,用户其实不需要关心消息的拉取,但是这并不代表拉取逻辑是不存在的实际上对于无法确定如何实现这一方面,它是将这个逻辑给删除了,同时为了保证消息到达客户端的及时性其实本身客户端也做了一些服务端消息的缓存,比如消息可能是先从服务端拉到客户端在客户端进行缓存,缓存完之后再进入 Listener 进行消费的。

这个时候为了避免缓存的消息过多而导致客户端内存泄露,也提供了一些参数,用户可以自己去调节比如可以是消息本身在客户端里面缓存最多的条数,也可以最多缓存字节数量,这个都是用户可以自行调整的然后可以看到 Push Consumer Build对应的方法比如Set Max Cached 的大小、数量以及 Bytes,一个是最多能缓存多少条消息,一个是最多能缓存多少自己的消息只要获得其中一个条件,客户端就不会再去缓存消息了,直到这个消息被消费成功,缓存有所减少,它才会重新的去缓存来自于服务端的消息。

image.png

 

4、最佳实践

(1)首先用户只需要实现这个 Listener 中的逻辑就可以了在绝大数情况下,Message Listener 其实是应该返回消费成功的,而如果有大量的消费失败,这是一个不会被 Push Consumer 所推崇的做法。

(2)另外一点就是尽量去捕获任何可能在消息消费过程中发生的一个异常,让一系列行为变得可预期

(3)不宜长期 block 消费线程,因为实际上在 Push Consumer 消费线程的数目也是有限的。对于一些消费耗时长的业务场景,比如有的用户可能到一条消息之后,可能就会经历一个比较长的消费逻辑去操作,这时候可以先行去提交消费成功,再一步的进行业务处理。即在拿到一条消息之后,可以把这条消息丢到一个单独的业务形成池里面进行处理,而 Push Consumer Listener返回一个消费成功本身这条消息不会占用这个消费线程,同时这个消息也能得到比较好的处理

(4)最后一点就是在 Push Consumer ,其实消费线程也是可以进行调整的。对于一些 IO 密集型的应用,比如可能没办法去做,也有可能没有办法去做一个很好的一化处理。这个时候可以适当的去调大一些消费线程数,比如消费时间就是比较长,调大了消费线程数之后,就可以很大程度上提高消费速度,但这个要根据用户自己的业务需求以及机器的配置来确定具体的消费线程数。

 

四、Simple consumer

image.png

1、基本介绍

Simple consumer 是在 Rocket MQ5.0中新引入的一种消费者类型,它是一种半托管的消费者,需要使用者自行去调用 Simple consumer#receive 方法获取来自于服务端的消息,并且根据自己的业务需求来选择是调用 ack 方法还是 change Invisible Duration 的方法。对于ack 的方法和change Invisible Duration 的方法,会进行介绍进而帮助大家理解。

2、方法介绍

receive方法是通过长轮询接来自于服务端的消息,具体的长轮询时间可以使用 simple computer builder#set Await Duration来进行设置。

而且用户在收到对应的消息之后,需要根据自己业务处理结果的不同消息进行不同的处理,也就是对应上面的 ack 和进行change Invisible Duration 的方法相比较于Push Consumer 而言Simple Consumer 用户可以自主的去控制整个消息接收的节奏比如用户 receive 的时候可以选择应的 batch 大小,用户可以批量的进行处理选择。

Simple Comer 实际上每次消息的接收都是按照 topic 的虚拟分区来发起请求的,比如用户群体发起一次 receive请求,可能是针一个分区的具体分区可能会是有一个算法。如果实际的 topic 分区会比较多,这个时候其实比较建议使用 Simple Comer 的时候去并发的调用这个 receive 方法,来保证消息的及时性有的消息可能分布在这个分区,有的消息可能分布在另一个分区,如果这个时候去并发调用的话,可以保证比较好的及时性。另外的话就是在并发 Simple consume  receive 方法的时候,也需要考虑到整个业务逻辑处理能力,所以具体到底开多少的并发度,需要综合自己的业务处理能力。

image.png 

在这里 simple consumer ack 方法和 change Invisible Duration 方法做一个大致的讲解。比如有一个simple consumer, Consumer Group 于消息最大的消费数是三条,用户在使用receive 方法收到第一条消息之后,这个消息的消费数当然是一了receive 方法其实本质上会有一个 Invisible Duration 的设置,在调研 receive 方法的时候其实是可以看到的它实际上就是一个消息的一个不可见时间,如果用户在不可见时间之内没有做任何操作的话,这个消息就会进行二次的重

重投之后消息的消费次数就变成了二,用户这个时候可以选择 ack 消息,或者是选择调用 change Invisible Duration 的方法如果用户这个时候选择了ack的话,就代表这条消息被消费成功了,那它的位点也就会被同步到服务端。如果这个时候用户选择了 change Invisible Duration 的话,相当于这个消息的这个不可见时间又给用户修改,直到这个时间过完之后,消息才会进行下一次的重投。

假如用户这个时候使用了 change Invisible Duration 的方法,然后消息已经重到了第三次,这时候的消费次数是三的话,可以注意到消息的最大的消费次数已经三条了,这个时候用户也可以选择去调用 ack 方法或者是调用 change Invisible Duration 的方法,如果用户这时候选择用ack 方法的话,相当于被消费成功了,这条消息的生命周期也就结束了。如果用户这个时候选择调用change Invisible Duration 的方法,这时候其实它的效果PushConsumer里面的返回消费失败是类似的,这时候相当于就要进入死队列了因为这个时候其实已经是消息的第三次投递了,Consumer Group 设计的最大消费次数已经是相同的了,所以可以看到用户使用 change Invisible Duration 接口里面的Invisible Duration 其实和 receive 里面的 Invisible Duration 是一个概念,本质上是消息一个不可见的时间窗口超过了这个时间窗口,如果消息还没有到达它的最大的消费次数的话,这条消息就会进行再一次的重投,否则的话它就会进入队列。

这就是simple consumer 里面的ack 方法和 change Invisible Duration方法的一个简单的介绍。那接下来总结一下simple consumer 的最佳实践。

3、最佳实践

(1)Simple Consumer 需要收到的消息进行逐条的确认或者修改下次可见时间关于这里的逐条确认或者修改下次可见时间可以把它认为是 Push Consumer 的消费成功或者失败来进行处理

(2)关于每条消息 ack 方法和change Invisible Duration,需要在消息最新的可见时间到来之前进行调用。此前说到这个Invisible Duration,其实它相当于一个不可见的时间窗口,ack 方法和 Invisible Duration,如果是针当前收到这一次消息进行调用的话,需要在最新的可见时间到来之前进行调用。

(3)第三点就是前面提到的simple consumer,其实 receive 的时候是会按照分区来进行调用的。如果想要保证消息接的及时性,需要去有针性的去增大调用 receive 方法的并发度,但具体而言要根据这个消息的处理能力进行并发调用。

(4)最后一点,就是 Simple Consumer 于一些用户需要自主去控制消息获取速率或者批量处理消息业务场景里面,它会是一个比 Push Consumer 更好的选择。因为这个Push Consumer里面的 Listener 是面向于单调消息去做的,而Simple Consumer 用户可以去控制bytes,而且是自主的去进行消息接受的

 

五、Pull Consumer

1、基本介绍

Pull Consumer 其实也是在Rocket MQ4.0里面就已经存在的一种消费者类型,也是一直以来都支持的一种消费者类型。Rocket MQ5.0全新的 Pull Consumer API还在演进中,敬情期待接下来的内容会Rocket MQ4.0里面的 Pull Consumer API来进行讲述。

2、使用原理

Rocket MQ 中无论是消息的发送和接收,都是通过队列来进行的一个 topic若干个队列组成,消息本身也是按照队列的形式来进行组织的队列天然它会有这种先进先出的特性,同一个队列中的消息会拥有不同的位点,先到达这个队列的消息就会拥有比后到达这个队列消息更小的位点。

所以说位点的大小是跟随消息到达服务的时间逐次递增的,而且本质上不同 Consumer Group 在服务端的消费进度就是一个个立中的位点信息,客户端将自己的消费进度同步给服务端,本质上就是在提交一个个消息的位点。

3、对应接口方法

接下来来看一下就是Rocket MQ4.0 pull consumer 对应的接口中主要的方法。

(1)注册消息路由

首先它会有一个注册消息路由监听器的接口,在这个接口里面,用户需要填应的 topic,然后以及实现应的这样的一个界定级方法。

这个方法就是说用户如果想要去订阅某一个 topic 的消息,这个时候因为所有的这种请求都是针于队列进行的,用户需要去感知这个队列的变化,这个时候用户就可以通过使用这个接口来达到目的。

(2)assign 方法

另外的话它还会有 assign 方法,就相当于说拿到队列之后,这个队列绑定到目前的 consumer如果要消费哪个队的消息,就将这个队列绑定到当前的 consumer,然后再使用这个 pull方法来进行消息详细的过序。因为这些消息需要去同步位点,如果想去查询服务端现在队列应的消费位点已经到了哪一步,这时候就可以调用应的 Commit 方法来获取每一个队列提交到服务端的位点。同步也未必是全部手动的,也可以将它设置成自动的模式,这个时候可以使用 set Auto Commit 方法去选择自动提交位点所谓的自动提交位点就是说消息一旦到达客户端就认为位点就被提交了,被客户端消费了,也可以去选择去自己手动的提交位点,这时候也就是个Commit Sync 的方法。

所以 Pull Consumer 是根据队列来获取消息,然后再通过位点去更新消费进度的消费者类型。在 Pull Consumer 里面,将列这个概念完整地暴露给客户,用户可以根据自己关系的topic设置鉴定器感知路由的变化,然后去将它绑定给当前的消费者。当用户使用pull这个方法的时候,它其实就是在从已经 assign好的队列里面去获取消息。如果设置Auto Commit 的话,客户端就会自动进行位点的提交消费进度这样的同步,否则用户也可以选择去手动进行提交。

4、总结

可以看到整个 Pull Consumer 它是面向于队列去设计的,也利用到了队列先进先出特性,整个复杂度相来说比 Push Consumer  Simple Consumer还更高的,所以 Pull Consumer相来说,它会在这个流计算框架里面应用的比较广泛的。

Pull Consumer是用户自己进行管理的一种消费类型,需要手动的去获取消息并提交消费进度。而且 Pull Consumer 是以流计算框架进行集成的一个首选的消费类型,获取消息以及为您提交更多的实际是由计算框架来进行决定的。

 

六、三种消费者类型的异处

通过今天的内容了解到 Rocket MQ 三种不同的消费者类型,分别是Push ConsumerPull Consumer 以及 Simple Consumer每一种消费者类型的设计其实都是为了满足不同的业务场景。

比如Push Consumer 它就是最朴实的一种业务处理场景Simple Consumer更面向于用户需要自己去控制消费进度消费数据的业务场景,Pull Consumer它更适用于一种与计算相结合的这样的业务场景,每一种不同的 Consumer 都有自己独到的业务场景。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
22天前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4天前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4天前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
5天前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
15 0
|
5天前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
13 0
|
5天前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
11 0
|
5天前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
10 0
|
5天前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
14 0
|
5天前
|
消息中间件 RocketMQ
RocketMQ - 生产者最佳实践总结
RocketMQ - 生产者最佳实践总结
11 0
|
22天前
|
消息中间件 API RocketMQ
消息队列 MQ使用问题之消息在没有消费者的情况下丢失,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。