从同步走向异步-RocketMQ实践

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RocketMQ用户实践

概述

RocketMQ是什么在此不再赘述,官网参考:https://rocketmq.apache.org/,目前是apache的顶级项目。架构图如下:
image.png
作者从2016年就开始接触RocketMQ了,原因很简单,原有交易系统并发能力不够,竞争对手系统能力较强,市场压力巨大。经过多轮研讨,大部专家认为拆分事务,增加并行处理能力可能是一个有效途径,把引入MQ作为一个备选方案进行POC。我们对当时几款MQ做了对比,选型矩阵如下:
image.png
作为一个开源的,高并发能力的消息中间件,我们最终选择了RocketMQ,其实最重要的是有阿里做背书,架构团队比较放心。从2016年的3.X版本到现在稳定运行的4.5.X版本(我们没升到最新),一路走来,解决了我们很多问题,也趟了很多坑。

典型场景

大事务拆分

我们做了一个典型的优化场景,把一个交易大事务拆分,具体实现思路如下:
image.png
首先,交易订单经过初步检查后,直接落盘,请求返回给用户,提示订单提交成功。经过RocketMQ之后,进行库存,扣款操作,如果出现库存不足或者扣款失败,通过短信通知用户。交易的提交是一笔一笔提交,交易的处理则是批量处理。现在的眼光来看,这个处理过程其实并不十分友好,但当时获得的极大的系统能力提升。
整个系统的数据处理过程如下:
image.png
基于RocketMQ的特点,我们做了一些优化:

  1. 我们构建了发送失败日志和消费失败日志,存储发送和消费失败的消息
  2. 我们构建了失败日志的轮询重试机制,发送失败的重新发送,消费失败的,重新消费
  3. 在交易处理中构建了以主键作为去重手段的幂等去重
  4. 因为我们会有多套环境,在MQ的接口层加了白名单机制

多系统复制

大型系统中,或者建设时间比较久的业务系统中,都会存在数据变更的同步,现在通常叫做CDC,但我们的业务不需要整库的复制,而且需要传播的也不是落库数据,需要经过一个处理,所以我们基于RocketMQ,构建一个多系统数据同步机制,架构如下:
image.png
当消费端的多个系统不是一个厂商维护的时候,RocketMQ是存在问题的,因为但是版本的RocketMQ是无法方便的查询消费轨迹的,也没有保存历史数据消费情况。此时如果出现消费端生成没说到消息我们就很被动。

大文件同步

我们的系统也存在一些批量数据复制的场景,需要传输的是一些数据文件,我们基于RocketMQ研发了一套文件传输系统,架构如下:
image.png
数据文件,首先进行签名,其次按照一堆规则拆分、压缩,要求压缩后的消息体小于512k,通过RocketMQ把文件签名,小文件数量,小文件传输到接受端,接收端解压、合并、验证签名。通过这个过程是为了避免网路抖动造成文件传输失败,整个传输过程,我们也设计了很多失败重试的机制,保证数据完整传输到对端。

经典配置

我们一般双机互备部署架构,如下图:
image.png
准备两台机器,一般是虚拟机,要求SSD磁盘,每台机器上部署一个NameSrv、一个Broker,在另一台机器上部署此Broker的备节点。此架构,如果掉了一台机器,是不会造成数据丢失的。
我们一般有两个配置:

  • 8C16G 两台
  • 16C32G 四台

我们最大的MQ集群是16C32G 四台,基本上一天数据流量在4个亿左右(生产和消费合计)。普通的集群就是8C16G 两台,一般的几十万的消息(生产和消费合计)传递是没问题的。

几个坑

主从同步差异导致的消费回退

这是个巨坑,先描述情况:
image.png
这是初始状态,Broker-A-S出现底层磁盘错误,导致消费进度无法同步到Broker-A-S。
image.png
这是异常情况,此时因为某些原因,机器A丢失了或者网络被隔离了,原来在Broker-A上的请求转移到Broker-A-S上,此时消费从500开始消费,消费端系统需要从500读到1001,我们的真实场景是从500万读到了1000万,消费端崩溃。
我们面对过好几次这样的场景,尝试升级SSD盘后都得到了缓解,最终我们修改消费端源码,禁止了消费进度的回退,才最终解决了问题。

数据落盘错误导致的数据丢失

此问题的表面现象是生产消息正常,消费无法正常进行,消费按照Broker比例消失,例如有四个Broker,消费出来的消息就像是少了四分之一。经sh mqadmin topicstatus -t topic检查,发现Last Updated时间为空,如下图:
image.png
此时一般日志里面都会发现一些IO异常,基本上时间为空的队列中的数据都丢失了。我们应对方式是清空掉发生问题的Broker的commitlog,基本上就恢复了。尝试升级SSD盘后也会有所缓解。

使用容器部署导致的机制失效

2019年之后,我们尝试在容器中部署RocketMQ,发现了一个问题,如下:
image.png
所有的NameSrv和Broker都在容器中部署,我们以为主备同步时从A到B,但是却是:
image.png
找了很久没找到原因,最后我们只能是去掉了备库。

未来的期望

在升级到4.5.X版本后,上述问题也都没再发生,证明新版本还是很稳定的。但是对于轨迹,对于历史消息的查找方面还是不太方便,在出现问题时,会比较麻烦,希望在下一步增强这方面的功能。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
13天前
|
消息中间件 存储 监控
活动实践 | 快速体验云消息队列RocketMQ版
本方案介绍如何使用阿里云消息队列RocketMQ版Serverless实例进行消息管理。主要步骤包括获取接入点、创建Topic和订阅组、收发消息、查看消息轨迹及仪表盘监控。通过这些操作,用户可以轻松实现消息的全生命周期管理,确保消息收发的高效与可靠。此外,还提供了消费验证、下载消息等功能,方便用户进行详细的消息处理与调试。
|
3月前
|
消息中间件 监控 Java
RocketMQ 同步发送、异步发送和单向发送,如何选择?
本文详细分析了 RocketMQ 中同步发送、异步发送和单向发送三种消息发送方式的原理、优缺点及适用场景。同步发送可靠性高但延迟较大,适合订单系统等场景;异步发送非阻塞且延迟低,适用于实时数据处理等场景;单向发送高效但可靠性低,适用于日志收集等场景。文章还提供了示例代码和核心源码分析,帮助读者更好地理解每种发送方式的特点。
420 4
|
3月前
|
消息中间件 存储 Serverless
【实践】快速学会使用阿里云消息队列RabbitMQ版
云消息队列 RabbitMQ 版是一款基于高可用分布式存储架构实现的 AMQP 0-9-1协议的消息产品。云消息队列 RabbitMQ 版兼容开源 RabbitMQ 客户端,解决开源各种稳定性痛点(例如消息堆积、脑裂等问题),同时具备高并发、分布式、灵活扩缩容等云消息服务优势。
140 2
|
1月前
|
消息中间件 Java 开发工具
【实践】快速学会使用云消息队列RabbitMQ版
本次分享的主题是快速学会使用云消息队列RabbitMQ版的实践。内容包括:如何创建和配置RabbitMQ实例,如Vhost、Exchange、Queue等;如何通过阿里云控制台管理静态用户名密码和AccessKey;以及如何使用RabbitMQ开源客户端进行消息生产和消费测试。最后介绍了实验资源的回收步骤,确保资源合理利用。通过详细的操作指南,帮助用户快速上手并掌握RabbitMQ的使用方法。
106 10
|
3月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
120 13
|
3月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
3月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
3月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
3月前
|
消息中间件 存储 弹性计算
云消息队列 RabbitMQ 版实践解决方案评测
随着企业业务的增长,对消息队列的需求日益提升。阿里云的云消息队列 RabbitMQ 版通过架构优化,解决了消息积压、内存泄漏等问题,并支持弹性伸缩和按量计费,大幅降低资源和运维成本。本文从使用者角度详细评测这一解决方案,涵盖实践原理、部署体验、实际优势及应用场景。
|
3月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
84 4