从同步走向异步-RocketMQ实践

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RocketMQ用户实践

概述

RocketMQ是什么在此不再赘述,官网参考:https://rocketmq.apache.org/,目前是apache的顶级项目。架构图如下:
image.png
作者从2016年就开始接触RocketMQ了,原因很简单,原有交易系统并发能力不够,竞争对手系统能力较强,市场压力巨大。经过多轮研讨,大部专家认为拆分事务,增加并行处理能力可能是一个有效途径,把引入MQ作为一个备选方案进行POC。我们对当时几款MQ做了对比,选型矩阵如下:
image.png
作为一个开源的,高并发能力的消息中间件,我们最终选择了RocketMQ,其实最重要的是有阿里做背书,架构团队比较放心。从2016年的3.X版本到现在稳定运行的4.5.X版本(我们没升到最新),一路走来,解决了我们很多问题,也趟了很多坑。

典型场景

大事务拆分

我们做了一个典型的优化场景,把一个交易大事务拆分,具体实现思路如下:
image.png
首先,交易订单经过初步检查后,直接落盘,请求返回给用户,提示订单提交成功。经过RocketMQ之后,进行库存,扣款操作,如果出现库存不足或者扣款失败,通过短信通知用户。交易的提交是一笔一笔提交,交易的处理则是批量处理。现在的眼光来看,这个处理过程其实并不十分友好,但当时获得的极大的系统能力提升。
整个系统的数据处理过程如下:
image.png
基于RocketMQ的特点,我们做了一些优化:

  1. 我们构建了发送失败日志和消费失败日志,存储发送和消费失败的消息
  2. 我们构建了失败日志的轮询重试机制,发送失败的重新发送,消费失败的,重新消费
  3. 在交易处理中构建了以主键作为去重手段的幂等去重
  4. 因为我们会有多套环境,在MQ的接口层加了白名单机制

多系统复制

大型系统中,或者建设时间比较久的业务系统中,都会存在数据变更的同步,现在通常叫做CDC,但我们的业务不需要整库的复制,而且需要传播的也不是落库数据,需要经过一个处理,所以我们基于RocketMQ,构建一个多系统数据同步机制,架构如下:
image.png
当消费端的多个系统不是一个厂商维护的时候,RocketMQ是存在问题的,因为但是版本的RocketMQ是无法方便的查询消费轨迹的,也没有保存历史数据消费情况。此时如果出现消费端生成没说到消息我们就很被动。

大文件同步

我们的系统也存在一些批量数据复制的场景,需要传输的是一些数据文件,我们基于RocketMQ研发了一套文件传输系统,架构如下:
image.png
数据文件,首先进行签名,其次按照一堆规则拆分、压缩,要求压缩后的消息体小于512k,通过RocketMQ把文件签名,小文件数量,小文件传输到接受端,接收端解压、合并、验证签名。通过这个过程是为了避免网路抖动造成文件传输失败,整个传输过程,我们也设计了很多失败重试的机制,保证数据完整传输到对端。

经典配置

我们一般双机互备部署架构,如下图:
image.png
准备两台机器,一般是虚拟机,要求SSD磁盘,每台机器上部署一个NameSrv、一个Broker,在另一台机器上部署此Broker的备节点。此架构,如果掉了一台机器,是不会造成数据丢失的。
我们一般有两个配置:

  • 8C16G 两台
  • 16C32G 四台

我们最大的MQ集群是16C32G 四台,基本上一天数据流量在4个亿左右(生产和消费合计)。普通的集群就是8C16G 两台,一般的几十万的消息(生产和消费合计)传递是没问题的。

几个坑

主从同步差异导致的消费回退

这是个巨坑,先描述情况:
image.png
这是初始状态,Broker-A-S出现底层磁盘错误,导致消费进度无法同步到Broker-A-S。
image.png
这是异常情况,此时因为某些原因,机器A丢失了或者网络被隔离了,原来在Broker-A上的请求转移到Broker-A-S上,此时消费从500开始消费,消费端系统需要从500读到1001,我们的真实场景是从500万读到了1000万,消费端崩溃。
我们面对过好几次这样的场景,尝试升级SSD盘后都得到了缓解,最终我们修改消费端源码,禁止了消费进度的回退,才最终解决了问题。

数据落盘错误导致的数据丢失

此问题的表面现象是生产消息正常,消费无法正常进行,消费按照Broker比例消失,例如有四个Broker,消费出来的消息就像是少了四分之一。经sh mqadmin topicstatus -t topic检查,发现Last Updated时间为空,如下图:
image.png
此时一般日志里面都会发现一些IO异常,基本上时间为空的队列中的数据都丢失了。我们应对方式是清空掉发生问题的Broker的commitlog,基本上就恢复了。尝试升级SSD盘后也会有所缓解。

使用容器部署导致的机制失效

2019年之后,我们尝试在容器中部署RocketMQ,发现了一个问题,如下:
image.png
所有的NameSrv和Broker都在容器中部署,我们以为主备同步时从A到B,但是却是:
image.png
找了很久没找到原因,最后我们只能是去掉了备库。

未来的期望

在升级到4.5.X版本后,上述问题也都没再发生,证明新版本还是很稳定的。但是对于轨迹,对于历史消息的查找方面还是不太方便,在出现问题时,会比较麻烦,希望在下一步增强这方面的功能。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
12月前
|
消息中间件 存储 缓存
服务异步通信--RabbitMQ
服务异步通信--RabbitMQ
69 0
|
10天前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
31 3
|
3月前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
2月前
|
消息中间件 运维 RocketMQ
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
|
3月前
|
消息中间件 存储 Java
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
23 0
|
4月前
|
消息中间件 存储 JSON
服务器的异步通信——RabbitMQ2
服务器的异步通信——RabbitMQ
34 0
|
4月前
|
消息中间件 缓存 中间件
服务器的异步通信——RabbitMQ1
服务器的异步通信——RabbitMQ
29 0
|
4月前
|
安全 物联网 测试技术
C++ 构建通用的MQTT接口:从理论到实践
C++ 构建通用的MQTT接口:从理论到实践
857 2
|
4月前
|
消息中间件 监控 负载均衡
RocketMQ实践问题收集
RocketMQ实践问题收集
下一篇
DDNS