【视频】普通消息 | 学习笔记

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 快速学习【视频】普通消息

开发者学堂课程【消息队列 RocketMQ 消息集成【视频】普通消息】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/1189/detail/18098


【视频】普通消息

 

内容介绍:

一、方向探讨

二、原理实践

三、产品预告

 

一、方向探讨


RocketMQetMQ 消息集成多类型业务消息专题,今天主要的重点是普通消息。那什么是业务集成,什么是数据集成,这里列了一个对比,

image.png

首先业务消息以应用为中心,在做业务的核心架构的时候,其实很多时候都面向的上层需求去完成逻辑的设计,比如以电商交易产品为例,更多的是关注用户买了一个订单的一个流转过程,这个时候通过微服务的拆分,可能在整个链路中会拆成很多个环节。这个时候不同应用之间,它通过消息去集成的时候更多人关注业务逻辑处理,然后比一下,数据集成它是以数据为中心。其实它更多的是关注于刚才学习的业务基层所产生的数据的一个价值分析,它根本不关心数据是从哪里来的,它只关心数据本身的一个属性和它的一个数据之间的关系。

第二点说链路的一个多样性,在业务阶层里面随着的一个企业的业务逻辑,或者是业务领域的布局,拓宽和复杂度的一个提升,其实在调用和被调用方之间的耦合性,或者是说链路的拓扑也会变得越来越复杂。经常会出现一条消息的上游是另一条消息的下游,一个服务可能既是发送方也是消费方等等。同时在数据集成的场景里面,它更多的是关注数据多样性,也就是说。在做数据集成分析。它更多的是从各种异构的数据源里去提取去汇聚这些数据,比如说会从常说的日志,数据库或者是一些三方的系统接口的一个调用的情况,去把这些业务的系统数据聚合在一起做清洗,清洗之后,然后汇聚成需要去做结构化的分析,或者是一些报表之类的东西,它更多是关注数据的异构系统多样性。

同时第三点,也是业务进程和数据集成里面非常 SVR 或者服务的一个质量的一个重点的一个差异,就是说业务阶层里面可能很容易理解。就是它更多的是一种在线的一个逻辑,或者是巧实施逻辑,也就是说在业务集成领域可能服务之间的调用,无论你是同步调用还是异步调用,都调用和被调用之间的响应协同机制有一定的要求,举个例子,一个订单的处理必须肯定是要在毫秒级完成的,否则的话,客户的体验会非常的差,然后调用的延迟就非常敏感,但是在数据集成领域,更多的可能是近实时甚至是离线非实时的场景,也就是说通过批、实时流或近实时流的场景去爬取数据之后做分析,具体链路于用户来说并不是可见的,这也是数据集成和业务集成侧重点的差异。那么今天学习 RocketMQetMQ 作为业务集成的一个属性。

image.png

首先,就是业务集成,如上图,也是在企业提升模式的一个书里面,图中有参考链接。图里面了解说消息队列是企业业务进程的一个主要模式之一,就是它是一种应用模式,应用模式其实就是提供高可靠可观测的通讯能力。非常复杂的业务的一个集成的业务里面使用消息之后会是什么一个现象。比如上图就是一个比较典型的上层的应用链路,从应用A到下层的应用B的一个单链路,通过发送初始化或者结构化一个消息,作为调用事件发送到事件通道,这个通道就是消息系统,比如RocketMQetMQRabbitMQ 等。在时间通道里存储后通过过滤路由的分发组件匹配到下游,然后推送处理。与此同时,还会有可观测、运维、监控的一些体系去支撑这个链路的可靠运行。那在链路中,它是个非常抽象的链路,它整个消息的一个系统的一个要求是什么。

这里提了四个比较核心的观点,第一点就是它消息类型,或者说支持传输方式有很多多样化的诉求。它并不是简单来说就只是传出的信息就完事。很多时候,随着链路的一个复杂或者是业务及产品的需求,比如说它会延伸出一种。比如延时投递、定时消息,还有一些说为了保证说消息的处理和一些业务逻辑处理是不是一些事物的一个一致性的一个保障,同时于某些场景里面,它需要不同的两个消息事件之间可能需要比如说订单的一个事件,比如支付和和发货,类似逻辑或者是一个用户之间必须要有顺序的相互性,如果顺序乱了下游的业务处理是非常麻烦的,其实第一个点就是说,如果在做业务集成的领域里面传输类型,数据类型的要求非常多,如果消息系统不能做,那就业务系统自己去做。其实是非常大的一个集成的一个成本。

第二点,也就是说它是有一个丰富的路由分发的一个诉求。就像图的开关说的,其实很多时候并不是说都是点点的通信,比如就上游直接明确发给下游,是说上游和下游做一个深度的解耦,也就是上游发完了数据到管道里面去之后,下游的人关心的人来订阅,不关心的人可以不管,就需要的消息系统提供一种非常丰富的过滤匹配的能力。很多时候不仅仅是基于消息的主题。更多时候可能是基于消息的一些属性。去做一些非常复杂的计算,比如说不仅仅是说正相匹配,甚至反相匹配。或者包含关系,或者排它关系,或者是一些判断条件,去做一个一一或者一多的分发,

那第三点的话就是说消费方式上面也有一个诉求就是通过消息的一个集成的时候,给下游去取消息,它可能会有多样化的需求。举个例子,最简单的一种场景,可能就说下游通过一种被动的推送式的方式,也就是事件通知的方式去处理。数据来了之后,自然就推送给去处理,但这种方式也有它的弊端,比如自身的速率的匹配,或者是它只能同步去处理。怎么异步去处理。这些都是一些多样化的消费的或者是发送方式处理的一个需求,

第四类也是业务消息领域集成里面的非常重要的一点,就是因为刚才讲业务领域它的集成,它更多的是偏向于的一个,比如说每个公司或者每个行业的业务的一个逻辑,很多时候传出的消息都是比如说是订单,支付之类的信息。它是非常高价值的信号和实践。这个时候于整个链路的一种可观测的体系是需要非常完善的,也就是说能够很好地诊断出链路的一个异常。比如一条消息没有正确的处理。是不是知道它在哪里出问题了,那时候如果没有这些体系,那在订单的处理或者业务处理过程中会非常的困难,这时候出现了问题,也很难去排查,那于最终的服务的客户的体验感非常的差,所以在业务集成领域,想说可观测的体系就是说典型的三大件就是MetricsTraceEvents。事件轨迹和指标三大件的一个信息的分析非常重要。那这些核心能力,其实说它是对于业务继承消息中间这样一个强诉求。

image.png

RocketMQetMQ 产品的话题,其实可能知道就是 Apache RocketMQetMQ 最早其实诞生于阿里巴巴的一个电商的系统内部,然后现在是 Apache 社区一个非常活跃的一个销售文件。那么为什么说它是一个业务消息集成的首选方案呢?其实就是对于刚才的这几个核心能力。RocketMQetMQ 提供非常丰富的功能特性,就是消息类型,订阅模式,同时,它还能满足刚才说的业务技术对高性能低延迟高吞吐的常见的一个诉求,以及说刚才说可观测运维的整个体系的一个支撑。

 

原理实践

今天开一个新的专题,就是业务消息类型,对于刚才的集成类型。

image.png

RocketMQetMQ 也是提供了非常多的消息类型,包括普通消息,定时消息,顺序和事务等多种信息的类型,就方便的业务集成方去做它集成模式的一种开发,通过开箱即用的功能的业务就不需要自己再去封装它自己的一种诉求,那今天重点的是普通消息。说到普通消息,名字其实可能并不是特别恰当,它其实并不普通。它其实是 RocketMQ 。它提供高吞吐,可扩展低延迟异步的通信能力。为什么说它是一个基本类型。其实可以说普通消息这个类型,它所具备的通信能力,它是一个基础的,更多的后面的专题会有定时消息,它是在普通消息能力基础上,再叠加了自己的一个独有的高级的一个特性或者是特定的一个使用的一种方式,普通消息的典型错误,就是其实也就是跟刚才消息队列的产品一样,就是生产者发送消息。发送普通消息到服务端去存储,然后存储完了之后,会把消息按照刚才说的订阅关系的匹配,然后推送给下游的消息的消费方做消费,那这时候重点说普通消息自己有什么样的特点。

第一点就是原子性,或者简单点描述一下就是它的消息是一个独立的一个事件。比如说普通消息的任何两条消息之间是没有什么关联关系的。它是任何两条消息都是独立的。那这时候对消息相应的处理不用关心处理第一条消息之后和处理第二条消息之间是不是要去做一些约束和保护,它就不需要,那这个时候它可以很零散很随意,就是这样,拓展性非常强,

第二点就是说拓展性,拓展性指的是什么。就是这张图可以看到,就是可以通过RocketMQ 的存储的一个基调把一个普通消息主体的消息,离散地分布在多个物理的一个存储的队列里面去,这样的好处就是它可以实现单个主题的普通消息的吞吐能力和容量的能力是一个无限的水平扩展,也就是说可能说单个队列都得在一个物理节点。能力是有限的或者存储空间是有限的,但是可以通过无限的加队列方式,加在不同的节点上实现业务对于普通消息的调用。它的容量是不是无限扩展,这个时候可以实现高并发,

甚至是另外一点就是说一个队列当前有一些问题,这时候没关系,它可以接着使用另外一个队列发消息,因为它并不纠结每条消息到底存储在哪里,消息之间是没有任何关系的,这样的话,普通消息可以看到它的一个链路是非常的简单,同时就是整个的状态也非常简单,链路也非常的精简,可以实现很好的高吞吐低延迟的通信。这是它的一个非常好的一个优势,所以也其实也是讲普通消息,它的应用是非常的广泛的,它是可能是 RocketMQ 应用最多的一种消息。那么再来看普通消息刚才讲了它整个链路。这个时候可能也会想说那普通消息从它的初始化发送开始到它最终处理过程中到底经历了哪几个状态和过程,了解生命的消息周期可以帮助判断到底是什么情况,接下来该怎么处理。那这里列了状态。

image.png

一般简单来说,把它抽象成五个状态,第一个状态其实就是初始化的一个状态,就是在生产端,消费的生产端,它把消息按照的业务诉求初始化构建好之后,还没有发送到服务端,这个时候它是一个待发生的一个阶段,

第二个阶段,其实就是可能是说发送到服务端了之后,服务端存下来这个时候,下游就是说等待被消费,就等于给下游可见了,那时候为什么会出现状态呢?因为说消费的本身就是解耦的。生产的速率和消费速率是可以不一样的,也许发送的比较快,这个时候这些消息就处于待消费的状态,还没有来得及被下游去消费,

第三个状态也是非常关键的,就是说当消费者,知道有这条消息的时候,来取这条消息之后,它的处理是有一个过程的,过程中其实这条消息的状态就叫做消费中,过程中其实就是一个客户端的一种行为,就是拿了消息之后,它是需要按照的业务逻辑,因为很多业务逻辑,比如说消费消息是要写出数据库的一个价格表,过程比较缓慢的,这个时候服务端会去等待消费完成,那这个时候也是 RocketMQ 作为业务集成里面的一个关键点,就是会保证消息的一个可靠传输的时候,它是有个应答机制的,比如说会等类似一个拆分表,就是说会等消费看是不是处理,如果实在到一个很长的时间之后都没有处理成功,那这个时候其实就把它当做一个保守的一个状态,就是说可能认为消费就没有成功,那时候就还会进行重试处理。

当然了,消费如果没有很慢,那时候它会出现两种情况,第一种就是。就是成功,就是消费提交,第二个就是消费失败,那就不管怎么样,就是说它都是提交的一个状态,那时候也就是消费者完成消费处理的时候,它会把应答事件提交到服务端去,应答事件会告诉说是成功还是失败,那这个时候如果是成功的话,那服务端就会把它标记一下说消息被处理了。

可以接着处理后面的消息,那这个时候是就能实现可靠的传输,如果它消费失败了,它也会去标记,不管成功失败标记完之后,刚才说的这条物理消息并不会立即删除。那它只是个逻辑标记,RocketMQ 为什么这么设计呢?它是这样的一个设计方式,就是因为首先第一点。RocketMQ 它有一对多的常见,这个时候为了提高系统的吞吐能力,或者是这个时候它并不会为每个消息再存一份副本,那可能都存的是一份副本。之后状态标记去标记出每个消费者是不是处理独立的一个消费状态,好处是一方面节省了成本提高了性能,第二点就是说它能够保证说给一个后悔药的一个计划,就是说因为你是逻辑比较如果业务逻辑因为自身的问题处理时有异常,它想重新处理消息的时候,它还可以通过回溯的一个重置未来的方式重新去处理。那就是业务提供的兜底的一个机制,那最后不管怎么样,那消息难道一直不删除。其实也不是。然后用 RocketMQ 的消息,它是按照存储,它是一个流逝的,也就是说它会先进先出,也就是最早的消息,到一定的的保障机制到达,比如说按时间保存的时候,到达一个保存时间之后,它就会滚动删除。才是真正的物理删除。

可以看从消息的一个初始化发送的服务,进入到待销费,最终删除期间其实有很长的一个过程,在这个过程中有很多的状态,所以在去使用客服消息的时候,要关注这些状态去做业务的,比如诊断或者发现问题的一些过程。

那了解完简单的原理和介绍之后来说普通消息到底用在哪里?就是普通消息RocketMQ 应用最广泛,规模最大。它主要集中在服务间的接口调用,同时还有一些批量的数据的采集传输。这两种产品的比如说订单的场景,上游发一个订单,把订单转化成一个事件,事件转消息,然后到时候 RocketMQ 之后下游比如说的物流系统,积分系统。它是关注订单的变更,那它就会把消息的事件然后去做它的业务逻辑的处理,这个处理就是刚才说业务的一个微服务的调用,或者是一个数据库的一个变更。这个场景下主要利用的解耦能力。通过这个方式,上游它不需要等着下一次处理完成。在这里发消息调用成功之后,就可以认为下单成功了,这样,链路的流程就非常的短,就是提高了响应。

第二点的话也是利用普通消息海量的写入能力,去做一个削峰填谷,比如说下游的时候,它处理的非常慢,然后卡住了,或者是它自己能力不足也没关系,因为 MQ会按照自适应的方式,给它去做一个流通。这个时候不会打垮下游,按照它的最大能力来处理,那时候多了一份消息,也就是说上游发过来的,比如说大促的零点。大量的消息,如果削峰填谷的能力堆在客户端,那这个时候的上游和下游之间,不用说去做一个强硬的容量保障,就能完成整个的一个大促的生命周期的一种保障

image.png

那第二点,就是说的实时数据传输这块,这块跟说的业务不是那么相关,但是它也是一个非常典型的产品,那时候举个例子就是它就是说的采集日志。是前端的采集日志或者埋点日志通过 MQ 去做异步的高吞吐的分发,它也是能够主要利用的是高吞吐的传输能力,然后去完成消息的一个数据的传输。因为下游可能需要分发的数据库、分析,各种场景里面去做分发,使用消息队列是非常合适的。

 

那么举一个客户案例

image.png

那其实阿里内部其实电商的核心交易不是说订单。产品的过程全部都是用RocketMQ来完成的,那的很多社区里面其它的一些客户,公司,很多都是用在线的事物的处理场景。那么说了这么多场景和案例,直接看一下的代码怎么用。

二、原理实践


image.png

首先看发送,其实当初始化生产的注册之后,看到输入消息就可以看。普通消息里面最核心的信息是 topic body topic 就是主题,body 就是消息负载,那还有两个额外的属性,叫做 key tag tag 是一种标签,它就用来完成刚才说的分类就是订单里面消息里面可以有订单支付,然后订单的退款,或者类似于场景。就可以打个标,下游比如说某些业务去关注某一个子类型的消息的时候,它就可以直接用某一个 tag 。然后 key 是一个索引键,索引键是可以帮助快速去定位消息,支持索引查询也就是说基于 key 可以快速的找到消息的内容和消息的 ID ,很多时候,消息系统提供的 ID 是一个没有语义的。那这个时候业务可能就是说能不能查一个订单所关联的所有消息。把订单的 ID 也是一个比较唯一的 ID,放进去之后,就能查出订单的 ID 的生命周期的相关的消息,这样也是一个非常方便的问题排查的一个场景。那初始化消息之后,就可以去调事态的发送,发送完了之后呢,它会给你一个回执就是发送结果,通过发送结果你就可以记录出来这条消息的 ID 状态是什么,如果发送异常需要注意的是需要去把异常捕获了,因为正常情况下有可能 SDK它发消息之后,它在如果有局部节点影响,它会在内部去做重试,当然了重试是未必能够完全的保证一定很好的,时候如果最终重试不成功的时候,会把异常抛给的调动方,业务调动方如果说消息不能丢。需要保证自己去处理一下异常,去做一个兜底。可以看一下,就是发送是非常简单的。

来看一下消费,消费刚才也讲了,就是 RocketMQ 好多种消费方式,有主动获取的方式,也有被动的等着鉴定器推送的方式,那这个时候可以看一下,不管怎么样,消费都是要把你的订阅关系,就是说要订阅刚才 topic,把订阅关系搞上来之后,然后它就会比如说鉴定器的方式,它就只要注册一个鉴定器。然后把在鉴定器内部去处理逻辑就是说处理消息的过程,就是比如是打印或者写数据库,最终处理完之后,一定要记得把消息的一个消费的结果告诉 RocketMQ 返回,比如说消费成功和失败,那如果消费失败,希望让 RocketMQ 再做,从头的话就要返回一个失败的结果,抛异常也是失败。要反复的就完成了整个消费的过程。然后,对于普通获取的方式,其实就是说它会稍微灵活一点,就是并不是RocketMQ帮你去取条件,而是由业务调用,按照自己的诉求,比如按照你自己的速率或者并发去取消息,比如说这里说取一条消息来处理,比如说打个日志,处理完之后,然后记得有一个调用,就是说主动消费的方式,处理完消息之后,它要去主动的应答。就是说刚才这条消息是成功了。如果说这个时候消费失败了就不用管了,失败再取消它会设置一个超时时间,就说如果失败的话,等时间之后 MQ 会自动的给你重置。通过一个简单的一个发送和消费的一个 DEMO 看一下,就是普通消息使用简单的过程。

那么这地方也要注意几个点。使用约束和最佳实践,比如使用普通消息的时候一定要注意,就是消息类型,因为 RocketMQ 的版本里面它会有一些消息的类型的一个检查,就是说创建 topic 主题的时候,要关注说是不是发的普通消息,然后对普通消息做一个检查。五点零版本之后都会做强的类型检查。然后早期它们可能没有但这样的话也会有些混乱。新版本的话会有检查。然后第二点的话就一定要注意消息的负载。就是说消息你去传输的时候一般都是事件。对消息事件的大小有要求。不推荐用 RocketMQ 去传输文件。比如说传个1G2G的文件,那每个消息都是有一个消息大小的限制的,所以超过会被拒绝。而且也会带来一些风险。

最佳实践这块,比较关键的就是第一点就是建议发送方案去设置消息的key。这样可以帮助快速去定位问题和追踪这些问题。第二点是重试,要去做兜底处理。第三点,也是针对消费场景,因为就是在整个消费过程中会有应答机制,但是由于调用方和被调用方之间的一个远程调用存在一些超时一些未知的一些机制在容错的场景下,会有一些保守的策略,就是为了保证可靠,可能会产生少量的重复。那这个时候,业务消费的时候需要做幂等,也就是说能够保证消费的逻辑是可重入的,也就是说如果这条消息推送了两次,或者消费两次对于业务需要没有影响,一般可以利用数据库的软件,或者是放一些KV的方式去做一个判筹。

 

三、产品预告

image.png

阿里云关 于RocketMQ 的一个产品预告。就是会在八月份发一个新的版本,就是5.0新版本,这一代的新实例,相比当前看到阿里云 RocketMQ 的实例会提供更好的一个服务,弹性能力,降本提效,还有运维方面都会有很好的一个提升,会带来一个新的版本系列的一个售卖,然后会对原有的一些计费体系做很大的优化,大部分情况下会给客户带来更好的一个成本提效。同时弹性能力会有很好的增强。比如说新版本的存储是完全 serverless 的,就是说相比自己去搞部署的时候,你会发现你的云盘是没法扩容的,或者是没法缩容的,然后这个时候扩容需要很长的周期。是很不灵活的,而在新版本里面,存储是按照你实际消耗的空间去做一个计费。非常弹性,而且计算这块也会提供突发弹性的情况。可能很多时候都会遇到说今天偶尔有个毛刺的时候可能会把系统给影响,但是为了毛刺去扩容,其实也不划算,突发的弹性其实是个很好的一个功能。可以满足说就支持突发弹性之后,你就不用去为它留麻烦。然后超出的这一部分就按照计费,然后同时刚才讲了,就是业务消息集成里面可观测增强这块也会有很好的提升,就是架构到运维的一个集成都会有领域的一些变化。

相关文章
|
5月前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
91 1
|
消息中间件 缓存 RocketMQ
消息发送5-总结|学习笔记
快速学习消息发送5-总结
消息发送5-总结|学习笔记
|
编解码 Java 测试技术
消息类型-普通消息|学习笔记
快速学习消息类型-普通消息
174 0
消息类型-普通消息|学习笔记
|
消息中间件 RocketMQ 开发者
消息消费初探|学习笔记
快速学习消息消费初探
消息消费初探|学习笔记
|
消息中间件 RocketMQ 开发者
消息发送4发送消息|学习笔记
快速学习消息发送4发送消息
消息发送4发送消息|学习笔记
|
消息中间件 物联网 Linux
Msgrcv 接收消息|学习笔记
快速学习 Msgrcv 接收消息
Msgrcv 接收消息|学习笔记
|
消息中间件 缓存 负载均衡
消息发送3-选择队列|学习笔记
快速学习消息发送3-选择队列
消息发送3-选择队列|学习笔记
|
消息中间件 RocketMQ 开发者
消息消费方准备工作|学习笔记
快速学习消息消费方准备工作
消息消费方准备工作|学习笔记
|
消息中间件 RocketMQ 开发者
消息发送1-消息校验|学习笔记
快速学习消息发送1-消息校验
消息发送1-消息校验|学习笔记
|
存储 消息中间件 固态存储
RocketMQ 消息存储和发送性能保证|学习笔记
快速学习 RocketMQ 消息存储和发送性能保证
RocketMQ 消息存储和发送性能保证|学习笔记
下一篇
DataWorks