开发者学堂课程【消息队列 RocketMQ 消息集成:【视频】顺序消息】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/1189/detail/18101
【视频】顺序消息
内容介绍:
一、背景介绍:RocketMQ 顺序消息简介
二、功能原理:RocketMQ 顺序消息的原理及实践
三、最佳实践:RocketMQ 顺序消息应用实例
四、实战:顺序消息收发样例
一、背景介绍:RocketMQ 顺序消息简介
顺序消息也是多类型业务消息中比较重要的一块。
顺序消息是消息队列 RocketMQ 版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的 Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。同一 MessageGroup 的消息按照严格的先进先出(FIFO) 原则进行发布和消费。同一 MessageGroup 的消息保证顺序,不同 MessageGroup 之间的消息顺序不做要求。实际上可以看到这其实是一个偏序。因此需做到两点,发送的顺序性和消费的顺序性。
Producer 发送A1,B1,A2,B2。
这里的 A,B 指的是 MessageGroup1 和 MessageGroup2。也就是 A1,A2是一组,B1,B2则是另外一组。Consumer 就会按照这两组,并且会按照组内的顺序进行消费。
其实发送顺序主要是包括两块,一个是发送,另一个则是消费。会围绕这两块方面进行阐述。
二、功能原理:RocketMQ 顺序消息的原理及实践
1、生命周期
在详细介绍顺序消费的发送和消费之前,首先来了解它的生命周期。它的生命周期其实和普通消息的生命周期比较相似,都是包括五块内容。
(1)初始化:顺序消息被生产者构建初始化完成,待发送至到服务端的状态。
(2)待消费:消息被传输到服务端,对下游可见,等待消费费者获取处理的状态。
(3)消费中:消息被消费者获取,并按照业务逻辑处理过程程,此时服务端会等待消费完成,如果一定时间后没有
收到消费提交的事件,消息还会重试处理。
(4)消费提交:消费者完成消息处理,并提交应答事件到月服务端,服务端标记当前消息已经被处理(包括消费成
功和失败)。RocketMQ 默认支持所有消息保留,此时消息数据并不会立即被删除,只是逻辑标记完成,在消息被物理删除之前,消费者仍然可以回溯重新处理消消息。
(5)消息删除: RocketMQ 按照消息保存时间机制滚动清理最早的消息数据,将消息从物理文件中删除。
这里与普通消息不同的点就在于顺序消息是一条或者是一批地被拉取到客户,拉取到消费者客户端。当前一批消息未被完全的处理完时,后一批消息并不能被推送到消费者,也就是说拉取和推送是取决于消费完成的结果的。
2、RocketMQ 顺序消息的发送
对于顺序消息的发送,可以先谈论在分布式环境下顺序的概念。
在分布式环境下,保证消息的全局顺序性是十分困难的, 例如两个 RocketMQ ProducerA 与 ProducerB,它们在没有沟通的情况下各自向 RocketMO 服务端发送消息 a 和消息 b,由于分布式系统的限制,我们无法保证a和b的顺序。
无法定义 a 在 b 之前,或 b 在 a 之前。这个值对于客户来说是未知的。
因此业界消息系统通常保证的是分区的顺序性,即保证带有同一属性的消息的顺序,我们将该属性称之为 MessageGroup。
如下图所示,ProducerA 会发送A1,A2,B1.B2; ProducerB 发送 C1,C2。
同时,对于同一 MessageGroup,为了保证其发送顺序的先后性,比较简单的做法是构造一个单线程的场景,即不同的 MessageGroup 由不同线程下的 Producer 进行处理,并且对于每一个 Producer 而言,顺序消息是同步发送的。同步发送的好处是显而易见的,在客户端先发送一条A1。在得到上一条消息A1的发送结果后再发送下一条A2。通过这样的服务端返回给客户端的结果,客户端就知道了A1和A2的逻辑关系。此时发送A2,如果A2成功了,那么A2就在A1之后。即能准确保证发送顺序,若使用异步发送或多线程则很难保证这一点。实际上是通过同步发送的机制去保证发送的顺序性。分 MessageGroup 实际上是提高了它的并发程度和可突然性,也就是限制只是对同一个 MessageGroup,对于不同的 MessageGroup 显然是不需要同步反馈的结果。
因此可以看到,虽然在底层原理上,顺序消息发送和普通消息发送并无二异,但是为了保证顺序消息的发送顺序性,同步发送的方式实际上隐式降低了消息吞吐。
3、RocketMQ 顺序消息的消费
顺序消息的消费需要保证同一时刻只有一个客户端或者说只有客户端的一个线程来消费同一个 MessaaeGroup 的消息。并且在该条消息被确认消费完成之前(或者进入死信队列),消费者无法消费同一 MessageGroup 的下一条消息,如果可以一步并发下面的消息的话,消费的顺序性将得不到保证。
因此这里存在着一个消费瓶颈,该瓶颈取决于用户自身的业务处理逻辑。极端情况下当某一 MessageGroup 的消息过多时,就可能导致消费堆积。
如图所示,consumer 如果有两个线程,那么它会分别消费 MessageGroup C 和 MessageGroup D。可以看到 C 和 D 之间其实是一个异步并发消费,并不存在顺序的关联性。MessageGroup 下的顺序其实是一个偏序。如果拿普通消息进行对比的话,普通消息实际上对消费没有任何的限制,都是完全的异步并发的。顺序消息顾名思义,顺序的特点就是要必须保证一个先后顺序消费的意义。
4、RocketMQ 顺序消息总结
对此把顺序消费和普通消费进行一个对比。
|
顺序消息 |
普通消息 |
原子性 |
消息之间存在偏序性 |
消息之间没有关联关系 |
顺序性 |
严格有序 |
大致有序 |
扩展性 |
具备一定的可拓展性 |
可拓展性较强 |
吞吐 |
理论较高,实际受到收发限制 |
较高 |
并发单元 |
同一 MessageGroup 的消息 |
单条消息 |
编码要求 |
相对较高 |
低 |
(1)原子性
首先是普通消息没有任何关联关系,顺利消息的消息之间存在一个偏序关系。也就是说同一个 MessageGroup下保证一个顺序。
(2)顺序性
其次是顺序性。顺序消息当然是严格有序的,普通消息实际上会保证一个大概有序,但是大致有序的是没有一个严格保证的,在一些情况下会被破坏掉。
(3)拓展性
由于之前提到的一个 MessageGroup 的一个限制,它的可拓展性相比较普通消息的较差,但是由于不同 MessageGroup 之间它是变更一个逻辑关系的,所以它还能够具备一定的可拓展性。
(4)吞吐
其实理论上顺序消息的吞吐量还是挺高的,实际上刚才提到的一个发送和消费的一个介绍,可以看到顺序消费增量实际上是受到收发的一个限制发送的就是这个同步发送的限制,消费只是取决于用户自身的一个消费逻辑,如果用自身消费逻辑卡的比较久的话,那么它的消费吞吐量也会受到表达限制。普通消息的由于没有任何的一个拓展逻辑和关联关系,所以它的吞吐量是较高的。
(5)并发单元
接下来是并发单元,可以看到顺应消息的一个并发单元,实际上就是同一个 MessageGroup 或者同一组 MessageGroup 是一个地方单元,也就是说一个 MessageGroup下它是一个顺序单线程的。而对于普通消息可以看到,可以认为普通消息的并发基本单元是单独要写,或者说它是一种特殊的,每条消息都有不同的 MessageGroup 的顺序消息,是一种特殊性消息。
因此综上所述,顺利消息其实对用户的编码要求会比较高,对于那个普通消息的相对较低。之所以这么说,可以后面结合案例和这家实验来做一个具体的看。这里的编码要求,实际上更多的是指的是一个 MessageGroup 的选择以及消费逻辑的一个设计。
(6)总结
最后对于发送和消费在功能原理上做一个总结。无论对于发送还是消费,都会通过 MessageGroup 来将相息进行分组,这个分组实际上就是构建了一个单线程的一个环境来保证消息的这个发送和消费的一个顺序性。实际上也是构建了一个逻辑上的一个先入先出的一个分子概念。然后剩余消息的并发的基本单元是MessageGroup,不同的 MessageGroup 可以并发的发送和消费,从而一定程度的具备了可突然性。当然可突然性也就是说一定程度支持多队列存储、水平拆分、支持并发消费,并且可以不受到影响。
三、最大实践:RocketMQ 顺序消息应用实例
1、经典案例
做完了功能和原理的介绍,现在来看一个最大实践。首先来看两个比较经典的案例。
(1)用户注册
用户注册需要发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
首先要注册,然后再登录,这就是会有一个发布的先后顺序。当然不同的用户之间的顺序当然是无所谓的,因为是以用户 ID 作为 MessageGroup。
(2)电商的订单创建
其次是电商的一个订单创建,通过下面这个图也可以看出来。
假设以订单 ID 作为 MessageGroup,那么同一个订单相关的创建支付,退款,物流,这当然是一个逻辑关系的,它的顺序都会按照发布的先后顺序来消费。
2、最大实践
(1)合理设置 MessageGroup
首先就是需要合理的去设置的 MessageGroup,在实际的 case 上 MessageGroup 会有很多错误的选择。以某电商平台的一个使用为例子。某电商平台会将商家的 ID 作为 MessageGroup,那么可以看到商家规模是有大有小的,部分规模较大的商家都会产生较多订单,那么这批较多订单就会强行保证一个顺序。那么由于下游的一个消费的一个限制,因为要一条一条进行处理,那么这部分商家对应的订单就会发生严重的堆积。问题出来之后,并不能够将商家的ID来作为 MessageGroup,而应当将订单号来作为 MessageGroup。而且站在背后的业务逻辑上来说,本来就是同一个订单才有一个顺序要求,同一个商家的不同订单实际上是可以并发去进行处理的。因此可以看到合理选择 MessageGroup。如何去合理选择?
首先最好就是 MessageGroup 的生命周期比较短暂。比如订单ID,它是一个比较持续十几分钟或者持续一段时间的一个生命周期的例子。还有一点就是不同的MessageGroupgroup 的数量应该尽量均衡,也就是它们的数量不能差太多。像之前那个错误 case,商家 ID 显然不适合做一个合理的 MessageGroup,因为不同 MessageGroup 的数量会差比较多,比如较大的商家可能不同的 MessageGroup 的消息比较多。
(2)同步发送和消息重试
其次也提到了一定要使用同步发送和发送重试。
①同步发送
使用同步发送,才能保证同一 MessageGroup 的消息是能够同时发送。如果使用多线程发送,实际上是不能保证发送的顺序性的。
②发送重试
其次是发送重试,发送重试当然要犯错误。如果不做发送重试,消费失败不进行处理的话,当然消息都未能成功地发送到服务端,那也就无从保证发送的顺序性了。
(3)消费幂等
相信常见的可能会出现少量重复,如果业务不能正常地去处理这个重复,顺序性可能会得到破坏,因此业务消费首先要做好消费幂等,避免重复处理带来的一些风险。所以看到顺序消息并不是表面上看到的是一个普通消息的完全加强版,实际上它是在普通消息的基础上做过取舍和权衡的。因此还是需要合理的根据自己的业务场景来对消息序列进行选型。同时对于用户的编码有些要求。比如不能错误地使用那些一直不发送,消费逻辑也应该注意。因为如果一旦上面的逻辑卡住的话,同一 MessageGroup 下的顺序都会同样被卡住。因此就会造成一些影响拉取、影响消费、影响吞吐量的一些后果。
四、实战:顺序消息发送样例
在例子上首先设置了一个配置,其次设置了对应的 topic,但最重要的是总体的 MessageGroup。因为 MessageGroup 是核心概念,决定了消息的发送顺序。可以看到这里采用的是同步发送,当发送者和客户端拿到服务端响应的结果之后,才会进行下一步的处理。
这是一个顺序消费的样例,首先要进行拉取。其实这是被业务逻辑时忽略掉的。当要拉取消息,并且对这个消息进行主场的处理,但是为一条条的处理。消息处理之后拿到消息然后会丢到自己的一个业务处理逻辑中。处理完之后会执行一个全逻辑,标记上消息以及正常的消费。至此整个消费流程就完成了。
发送的代码:
publicclassProducerFifoMessageExample{
privatestaticfinalLoggerLOGGER=
LoggerFactory.getLogger(ProducerFifoMessageExample.class);
privateProducerFifoMessageExample(){
}
publicstaticvoidmain(String[]args)throwsClientException,
IOException{
finalClientServiceProviderprovider=
ClientServiceProvider.loadService();
//Credentialproviderisoptionalforclientconfiguration.
StringaccessKey="yourAccessKey";
StringsecretKey="yourSecretKey";
SessionCredentialsProvidersessionCredentialsProvider=
newStaticSessionCredentialsProvider(accessKey,
secretKey);
Stringendpoints="foobar.com:8080";
ClientConfigurationclientConfiguration=
ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.setCredentialProvider(sessionCredentialsProvider)
.build();
Stringtopic="yourFifoTopic";
finalProducerproducer=provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
//Setthetopicname(s),whichisoptional.Itmakesproducer
couldprefetchthetopicroutebefore
//messagepublishing.
.setTopics(topic)
//Maythrow{@linkClientException}iftheproducerisnot
initialized.
.build();
//Defineyourmessagebody.
消费的代码:
publicclassSimpleConsumerExample{
privatestaticfinalLoggerLOGGER=
LoggerFactory.getLogger(SimpleConsumerExample.class);
privateSimpleConsumerExample(){
}
publicstaticvoidmain(String[]args)throwsClientException,
IOException{
finalClientServiceProviderprovider=
ClientServiceProvider.loadService();
//Credentialproviderisoptionalforclientconfiguration.
StringaccessKey="yourAccessKey";
StringsecretKey="yourSecretKey";
SessionCredentialsProvidersessionCredentialsProvider=
newStaticSessionCredentialsProvider(accessKey,
secretKey);
Stringendpoints="foobar.com:8080";
ClientConfigurationclientConfiguration=
ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.setCredentialProvider(sessionCredentialsProvider)
.build();
StringconsumerGroup="yourConsumerGroup";
DurationawaitDuration=Duration.ofSeconds(30);
Stringtag="yourMessageTagA";
Stringtopic="yourTopic";
FilterExpressionfilterExpression=newFilterExpression(tag,
FilterExpressionType.TAG);
SimpleConsumerconsumer=provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//Settheconsumergroupname.
.setConsumerGroup(consumerGroup)
//setawaitdurationforlong-polling.
.setAwaitDuration(awaitDuration)
//Setthesubscriptionfortheconsumer.
.setSubscriptionExpressions(Collections.singletonMap(topi
c,filterExpression))
.build();
//Maxmessagenumforeachlongpolling.
intmaxMessageNum=16;
//Setmessageinvisibledurationafteritisreceived.
DurationinvisibleDuration=Duration.ofSeconds(5);
finalList<MessageView>messages=
consumer.receive(maxMessageNum,invisibleDuration);
for(MessageViewmessage:messages){
try{
consumer.ack(message);
}catch(Throwablet){
LOGGER.error("Failedtoacknowledgemessage,
messageId={}",message.getMessageId(),t);