RocketMQ 常见问题

简介: 至此,我们已经介绍完了 RocketMQ 的所有实现细节,最后我们简单地介绍一下使用 RocketMQ 时常见的问题。

引言

至此,我们已经介绍完了 RocketMQ 的所有实现细节,最后我们简单地介绍一下使用 RocketMQ 时常见的问题,更多关于 RocketMQ 的文章均收录于<RocketMQ系列文章>;

常见问题

顺序消息方案

顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生成、付款、发货,这 3 个消息必须按顺序处理才行。顺序消息分为全局顺序消息和部分顺序消息,全局顺序消息指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可,比如上面订单消息的例子,只要保证同一个订单 ID 的三个消息能按顺序消费即可。

  • 全局顺序消息:一个 topic 只留一个队列,并且顺序消费,这时高并发、高吞吐量的功能完全用不上了。
  • 部分顺序消息:在发送端,要做到把同一业务 ID 的消息发送到同一个 Message Queue(hash 映射);在消费过程中,要做到从同一个 Message Queue 读取的消息不被并发处理,Consumer 使用 MessageListenerOrderly 的时候,这样才能达到部分有序。

消息重复问题

解决消息重复有两种方法:第一种方法是保证消费逻辑的幂等性(多次调用和一次调用效果相同);另一种方法是维护一个巳消费消息的记录(例如,和实际消费过程在同一个事务中写入 DB),消费前查询这个消息是否被消费过。这两种方法都需要使用者自己实现。

动态增减机器

NameServer 增删

NameServer 是各自独立的,在添加或删除 NameServer 后,只能能通过一些机制更新其他角色的 NameServer 地址列表即可:

  • 通过代码设置,比如Producer.setNamesrvAddr (”name-server1-ip:port;name-server2即:port”
  • 使用 Java 启动参数设置
  • 通过 Linux 环境变量设置
  • 使用者定时通过 HTTP 服务查询最新的 NameServer 列表

Broker 增删

由于业务增长,需要对集群进行扩容的时候,可以动态增加 Broker 角色的机器。只增加 Broker 不会对原有的 Topic 产生影响,原来创建好的 Topic 中数据的读写依然在原来的那些 Broker 上进行。

减少 Broker 要看是否有持续运行的 Producer,当一个 Topic 只有一个 Master Broker,停掉这个 Broker 后,消息的发送肯定会受到影响,需要在停止这个 Broker 前,停止发送消息。

当某个 Topic 有多个 Master Broker,停了其中一个,这时候是否会丢失消息呢? 答案和 Producer 使用的发送消息的方式有关,如果使用同步方式 send (msg)发送,在 DefaultMQProducer 内部有个自动重试逻辑,其中一个 Broker 停了,会自动向另一个 Broker 发消息,不会发生丢消息现象。 如果使用异步方式发送 send (msg, callback),或者用 sendOneWay 方式,会丢失切换过程中的消息。 因为在异步和 sendOneWay 这两种发送方式下,发送失败不会重试。DefaultMQProducer 默认每 30 秒到 NameServer 请求最新的路由消息,Producer 如果获取不到已停止的 Broker 下的队列信息,后续就自动不再向这些队列发送消息。

如果 Producer 程序能够暂停,在有一个 Master 和一个 Slave 的情况下也可以顺利切换。可以关闭 Producer 后关闭 Master Broker,这个时候所有的读取都会被定向到 Slave 机器,消费消息不受影响。把 MasterBroker 机器置换完后,基于原来的数据启动这个 Master Broker,然后再启动 Producer 程序正常发送消息。

用 Linux 的 kill pid 命令就可以正确地关闭 Broker, BrokerController 下有个 shutdown 函数,这个函数被加到了 ShutdownHook 里,当用 Linux 的 kill 命令时(不能用 kill -9 ), shutdown 函数会先被执行。也可以通过 RocketMQ 提供的工具(mqshutdown broker)来关闭 Broker,它们的原理是一样的。

各种故障的影响

我们期望消息队列集群一直可靠稳定地运行,但有时候故障是难免的,我们列出可能的故障情况,看看如何处理:

  1. Broker 正常关闭,启动
  2. Broker 异常 Crash,然后启动
  3. OS Crash,重启
  4. 机器断电,但能马上恢复供电
  5. 磁盘损坏
  6. CPU、主板、内存等关键设备损坏

假设现有的 RocketMQ 集群,每个 Topic 都配有多 Master 角色的 Broker 供写入,并且每个 Master 都至少有一个 Slave 机器(用两台物理机就可以实现上述配置),我们来看看在上述情况下消息的可靠性情况。

第 1 种情况属于可控的软件问题,内存中的数据不会丢失。如果重启过程中有持续运行的 Consumer, Master机器出故障后,Consumer会自动重连到对应的 Slave 机器,不会有消息丢失和偏差。当 Master 角色的机器重启以后,Consumer又会重新连接到 Master 机器(注意在启动 Master 机器的时候,如果 Consumer 正在从 Slave 消费消息,不要停止 Consumer。 假如此时先停止 Consumer 后再启动 Master机器,然后再启动 Consumer,这个时候 Consumer 就会去读 Master 机器上已经滞后的 offset 值,还记得集群模式 offset 存在 Master Broker 吗,这样造成消息大量重复)。

如果第 1 种情况出现时有持续运行的 Producer,一台 Master 出故障后,Producer 只能向 Topic 下其他的 Master 机器发送消息,如果 Producer 采用同步发送方式,不会有消息丢失。

第2、3、4种情况属于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同,如果 Master、Slave 都配置成 SYNC_FLUSH,可以达到和第 1 种情况相同的效果。

第 5、6 种情况属于硬件故障,发生第 5、6 种情况的故障,原有机器的磁盘数据可能会丢失。如果 Master 和 Slave 机器间配置成同步复制方式,某一台机器发生 5 或 6 的故障,也可以达到消息不丢失的效果。如果机器间是异步复制,两次 Sync 间的消息会丢失。

总的来说,当设置成:

  • 多 Master,每个 Master 带有 Slave
  • 主从之间设置成 SYNC_MASTER
  • Producer 用同步方式写
  • 刷盘策略设置成 SYNC_FLUSH

就可以消除单点依赖,即使某台机器出现极端故障也不会丢消息。

消息优先级

RocketMQ 是个先人先出的队列,不支持消息级别或者 Topic 级别的优先级。业务中简单的优先级需求,可以通过间接的方式解决:

  • 拆分出多个 topic 分配不同的 Consumer 去消费
  • 一个 topic 拆分多个队列,每个队列分别消费
  • 强优先级:拆分多个 topic A,B,C,消费者在 A 消费完的情况下才会消费 B,同样 B 消费完的情况下,才去消费 C

吞吐优先方案

  • 过滤感兴趣的消息,减少网络传输 TAG、SQL、FilterServer
  • 增加 Consumer 并发度
  • 检测延时情况,跳过非重要消息
  • 增加 topic 队列
  • 并发 Producer
  • 推荐使用 EXT4 文件系统,IO 调度算法使用 deadline算法

file-system

deadline算法大致思想如下: 实现四个队列,其中两个处理正常的 read 和 write 操作,另外两个处理超时的 read 和 write 操作。正常的 read 和 write 队列中,元素按扇区号排序,进行正常的 IO 合并处理以提高吞吐量。因为 IO 请求可能会集中在某些磁盘位置,这样会导致新来的请求一直被合并,可能会有其他磁盘位置的 IO 请求被饿死。超时的 read 和 write 的队列中,元素按请求创建时间排序,如果有超时的请求出现,就放进这两个队列,调度算法保证超时(达到最终期限时间)的队列中的 IO 请求会优先被处理。

调优

这里讨论的系统是指能完成某项功能的软硬件整体,比如我们用 RocketMQ ,加上自己写的 Producer、Consumer 程序,部署到一台服务器上,组成一个消息处理系统。

首先是搭建测试环境,查看硬件利用率。把测试系统搭建好以后,要想办法模拟实际使用时的情况,并且逐步增大请求量,同时检测系统的 TPS。在请求量增大到一定程度时,系统的 QPS 达到峰值,这个时候维持这种请求量,保持系统在峰值状态下运行。然后查看此时系统的硬件使用情况:

  • 使用 top 命令查看 CPU 和内存的利用率
  • 使用 sar -n DEV 2 10 命令查看网卡使用情况
  • 使用 netstat -t 查看网卡的连接情况, 看是否有大量连接造成堵塞
  • 使用 iostat -xdm 1 查看磁盘的使用情况

经过上面的一系列检查,应该能够找到系统的瓶颈。比如瓶颈是在CPU、网卡还是磁盘?

还有一种情况是这三者都没有到使用极限,这也是一种比较常见而且有优化空间的情况,这种情况说明 CPU 利用率没有发挥出来,比如可能是锁的机制有 bug,造成线程阻塞。

对于 Java 程序来说,接下来可以用 Java 的 profiling 工具来找出程序的具体问题,比如 jvisualvm、 jstack、 perfJ 等。通过上面这些工具,可以逐步定位出是哪些 Java 线程比较慢,那个函数占用的时间多,是否因为存在锁造成了忙等的情况,然后通过不断的更改测试,找到影响性能的关键代码,最终解决问题。

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1]《RocketMQ技术内幕》
[2]《RocketMQ实战与原理解析》
[3] 老生常谈——利用消息队列处理分布式事务
[4] RocketMQ架构解析
[5] MappedByteBuffer VS FileChannel 孰强孰弱?
[6] 海量数据处理之Bloom Filter详解
[7] rocketmq GitHub Wiki

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 Java Kafka
RocketMQ快速入门
RocketMQ快速入门
6915 0
RocketMQ快速入门
|
7月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
4月前
|
消息中间件 存储 Java
如何使用RocketMQ
【8月更文挑战第29天】如何使用RocketMQ
204 1
|
6月前
|
消息中间件 弹性计算 运维
对比阿里云的SofaMQ与RocketMQ
对比阿里云的SofaMQ与RocketMQ
797 2
|
6月前
|
消息中间件 Java RocketMQ
【已解决】RocketMq使用报错
【已解决】RocketMq使用报错
304 0
|
7月前
|
消息中间件 存储 Java
RocketMQ部署文档
RocketMQ部署文档
|
7月前
|
消息中间件 存储 Java
RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构
RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构
198 0
|
消息中间件 SQL 负载均衡
RocketMQ快速入门 2
RocketMQ快速入门
202 0
|
消息中间件 Java Apache
RocketMQ快速入门 1
RocketMQ快速入门
186 0
|
消息中间件 存储 Java
RocketMQ极简入门-MQ概述&RocketMQ安装
1.MQ是什么 MQ全称为Message Queue,即消息队列 ,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生 产、存储、消费全过程的软件系统,遵循FIFO原则。在高并发的分布式系统中使用居多。
285 0