开发者学堂课程【 RocketMQ 消息集成:多类型业务消息专题:多类型业务消息专题-普通消息】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/1161/detail/17322
多类型业务消息专题-普通消息
四、原理实践
1、原理实践:RocketMQ 普通消息x原理消息
普通消息,其实是 RocketMQ 的基本消息类型,提供高存储扩展低延迟异步通信的一种通讯能力。
普通消息所具备的通信的能力是基础的,生产者发送消息到服务端去存储,然后存储完之后,会把消息按照订阅关系的匹配推送给下游的消息销售方去做消费。
普通消息的特点:第一点就是原子性,消息是独立的事件。消息之间是没有关联关系,任何两条消息之间都是独立的。消息的处理也不用关心在第一条消息之后的一些约束和保护。
第二点是扩展性,可以通过 RocketMQ 存储的机架,把普通消息的主题的消息离散的分布在多个链。好处是可以实现单个主题的普通消息的吞吐能力和容量的无限的水平扩展。也就是单个物理节点的能力是有限的,或者存储空间是有限的。但是通过无限的加队列的方式,加在不同的节点上实现业务对于普通消息的调用,它的容量无限扩展。而且另外一点就是队列当前有些问题,可以接着使用另外队列去发消息,因为消息之间是没有任何关系的。
普通消息整个状态非常简单,链路也非常精简,可以实现很好的高纯度低延迟的风险。
2、原理实践:RocketMQ 普通消息x原理介绍
那普通消息从初始化发送到处理过程中到底是经历哪些状态和过程。
消息的生命周期,可以帮助去判断线上出现的这种问题的具体情况。第一个状态其实就是初始化的状态。在生产端把消息按照的业务诉求去化构建好之后,待发送的阶段。那第二个阶段,发送到服务端之后,对下游可见,也就是等待被消费,因为消费本身就是解耦的,生产速率和销售设备可以不一样。此时消息属于待消费的状态,没有被下游去消费。
第三个状态非常关键。当消费者获取消息之后,处理是有过程的。消息的状态是消费中。就是获取消息之后按照业务逻辑,因为很多业务逻辑,比如消费消息是要写订单表,过程是比较缓慢的。RocketMQ 在业务集团里面会保证消息的考核传输,如果很长时间之后,消息没有发送成功,还会进行重试处理。当然消费如果没有很慢,会出现两种情况。第一种是成功,消费提交。第二种消费失败。
消费者完成消费处理的时候,他会把应答事件提交到服务端。一段时间里,消费提交是成功还是失败。如果成功,服务端就会把它标记,继续后面的处理。消费失败也会标记,标记后物理消息并不会被立即删除。只是个逻辑标题,因为首先RocketMQ 有一对多的这种场面,为提高这种系统的分布并不会为每个消息独立副本。一方面节省成本,提高性能;另一方面能够撤销,假如业务逻辑因为自身问题处理异常,
可以通过回溯的重新去处理,RocketMQ 存储流式,也就是先进先出,最早的消息到一定的保存机制上最后会删除。那时候才是真正的物理删除。
消息的初始化、待消费到最终删除期间有很长的过程,在过程中也有很多状态。所以在再去使用 RocketMQ 的时候,要关注这些流程。
3、原理实践:RocketMQ 普通消息x使用场景
普通消息应用集中在服务间的结果调用,同时还有一些批量的数据的采集传输。举个例子,订单的产业上游发发订单,把订单转化成事件。然后,下游物流系统、积分系统关注订单的变更,会把消息的事件,去做业务逻辑的处理,比如微服务的调用、数据库的变更。这种场景下,下游处理完成,上游订单系统发送消息成功,就可以认为下单成功的。
第二点利用刚才这种海量的写入能力去做削峰填谷。比如下游的处理非常慢或者是能力不足,但是 MQ 会按照自适应的方式去做流控。下游会按照最大的能力来处理,上游发过来的大量的消息会堆在服务端,但是上游和下游之间不用做强保障,要完成整个的全周期的保障。第二点是实时数据传输。举个例子,采集者,前端采集日志或者其他。如果 MQ 去做异步方式,分发场景利用高吞吐传输完成消息的数据的传输。因为下游可能需要分发的数据库分析、分发,采用这种方式比较合适的。
4、原理实践:RocketMQ 普通消息x客户案例
举个客户案例。阿里内部电商的交易使用 RocketMQ 完成,然后那海关社区里面其他的客户使用在线事物处理的场景。
5、原理实践:RocketMQ 普通消息x使用示例
上图是普通消息的代码,发送普通消息示例中,注意 producer 中的 settopic 和setbody ,分别是主题和消息负载。还有两个额外的属性叫做 tag ,一种标签,针对订单里面消息的分类,比如有订单支付、退款或者类似场景,就是可以打标,指定 tag 。
Key 就是索引架,可以帮助快速定位。基于key就可以快速的找到消息的内容和消息的 id 。查询订单的id可以得到生命周期,非常方便的排查问题。
初始化消息之后,就可以去调生态的发送。发送完之后,返回发送结果。发送结记录消息的 id 、状态。如果消息抛异常,需要注意的是捕获异常。因为正常情况下发送消息之后,会在内部做重置,但是结果未必可靠。如果最终重置不成功,就会把异常抛给调用方,调用方去处理异常。
RocketMQ 支持主动获取的方式,也有被动的监听器推送的方式等多种方式。消费都是要把订阅关系关系转化完之后,利用比如监听器
,在监听器内部去处理逻辑,处理消息的过程。最终处理完善之后,一定要记得把消息的消费结果通过 RocketMQ 返回。比如消费成功和失败。如果消费失败,希望在做重投的话,返回失败的结果。虽然结果不好,但是返回就完成整个消费的过程。
对于这种普通消息获取的方式比较灵活,因为业务调用是按照自己的诉求,比如速率、并发,取消息。之后就来处理。比如日志处理完之后,调研要求主动消费的方式处理完后,要去主动的应答。比如消息成功消费。如果消费失败,取消热对方,会设置超时时间。就是失败等待一段时间之后,会自动的重置。
6、原理实践:RocketMQ 普通消息x约束&最佳实践
注意普通消息的约束和最佳实践。使用消息时,一定要注意消息类型。RocketMQ会有消息类型的检查。上传主题的时候注意是消息,然后再去做检查。
然后第二点就要注意消息的负载。消息去传输的时候,对消息事件的大小是有要求的。就并不推荐传到 1G、2G 的文件,每个消息都是有消息大小的限制,超过会被服务端拒绝,而且也会带来一些风险。
最佳实践第一点就是建议设置发送消息 key ,可以帮助快速去定位问题的。第二点发送重试,要去做兜底处理。
第三点,消费场景。在整个消费过程中会有应答机制。但是由于调用方和被调用方之间的交流存在一些超时,或者在容错的场景下,会有一些保守的策略产生少量的重做。业务消费的时候需要做消费幂等,保证消费的逻辑可重复。
五、产品预告
产品预告:八月份会发新的版本,5.0新版本的实验。降本提效还有运维等方面都会提升。同时会对原有的一些计费去做优化,给一些客户来带来成本的提效。
弹性能力会很大的增强,原先云盘是没法扩容缩容。扩容需要很长的周期,市场不灵活。在新版本里面,是按照实际消耗的空间去做计费,
突发弹性是很好的功能特性,超出的部分按量计费。
然后同时针对业务消息,可发射增强也会有很好的提升,从架构到集成都会有新的一些变化。