【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结

背景介绍


公司最近年底要对系统做一次大的体检,所以是不测不知道,一测吓一跳啊,出现了很多问题,其中最恶心的问题要数我们的ROCKETMQ消息队列的问题了,大家都知道消息队列是作为流量削峰的主要手段,负责系统健壮性和压力的最佳手段,谁知道,它竟然“生病”了,干不动活了。




问题现象


系统频繁出现:system busy 和 broker busy 解决方案:

com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 8
复制代码



Rocketmq发送控制流程

image.png

image.png

针对前4种 broker busy ,主要是由于 Broker 在追加消息时持有的锁时间超过了设置的1s,Broker 为了自我保护,会抛出错误,客户端会选择其他 broker 服务器进行重试。


如果对不是金融级服务,建议将 transientStorePoolEnable = true,可以有效避免前面 4 种 broker ,因为开启这个参数,消息首先会存储在堆外内存中,并且 RocketMQ 提供了内存锁定的功能,其追加性能能得到一定的保障,这样可以做到在内存使用层面的读写分离,即写消息是直接写入堆外内存,消费消息直接从 pagecache中读,然后定时将堆外内存的消息写入 pagecache。


但这种方案随之带来的就是可能存在消息丢失,如果对消息非常严谨的话,建议扩容集群,或迁移topic到新的集群。


可以看出来,抛出这种错误,在 broker 还没有发送“严重”的 pagecache 繁忙,即消息追加到内存中的最大时延没有超过 1s,通常追加是很快的,绝大部分都会低于1ms,但可能会由于出现一个超过200ms的追加时间,导致排队中的任务等待时间超过了200ms,则此时会触发broker 端的快速失败,让请求快速失败,便于客户端快速重试。但是这种请求并不是实时的,而是每隔10s 检查一遍。


值得注意的是,一旦出现 TIMEOUT_CLEAN_QUEUE,可能在一个点会有多个这样的错误信息,具体多少与当前积压在待发送队列中的个数有关。




Rocketmq 发送时异常


system busy 和 broker busy 解决方案


  • [REJECTREQUEST]system busy  too many requests and system thread pool busy
  • [PC_SYNCHRONIZED]broker busy
  • [PCBUSY_CLEAN_QUEUE]broker busy
  • [TIMEOUT_CLEAN_QUEUE]broker busy


之前写的解决方案,都是基于测试环境测试的.到生产环境之后,正常使用没有问题,生产环境压测时,又出现了system busy异常(简直崩溃)


com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 8
For more information, please visit the url, http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception
  at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:455)
  at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:272)
  at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:253)
  at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:215)
  at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:671)
  at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:440)
  at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1030)
  at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:989)
  at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:90)
  at 
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
复制代码



报错定位


  • cleanExpiredRequestInQueue会处理发送消息、拉取消息、心跳、事务消息队列中的数据,此次遇到的问题是发送Topic消息报出来的错误,所以接下来针对发送消息流程进行分析。


  • 报出此错误的源码位置为broker快速失败机制BrokerFastFailure.java类(该类在Broker启动时会启动一个定时任务,每10毫秒执行一次),报错位置代码如下:
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
        while (true) {
            try {
                if (!blockingQueue.isEmpty()) {
                    // 获取队列头元素
                    final Runnable runnable = blockingQueue.peek();
                    if (null == runnable) {
                        break;
                    }
                    final RequestTask rt = castRunnable(runnable);
                    if (rt == null || rt.isStopRun()) {
                        break;
                    }
                    final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                    // 如果头元素对应的任务处理时间超过设置的最大等待时间,则处理请求返回该错误,并移除掉该任务
                    if (behind >= maxWaitTimeMillsInQueue) {
                        if (blockingQueue.remove(runnable)) {
                            rt.setStopRun(true);
                            rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                        }
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }
    }
复制代码

这段代码是Broker快速失败机制的核心代码,如果一个等待队列的头元素(也就是第一个要处理或者正在处理的元素)等待时间超过该队列设置的最大等待时间,则丢弃该元素对象的任务,并对这个请求返回[TIMEOUT_CLEAN_QUEUE]broker busy异常信息。



发送Topic消息报该错误


sendThreadPoolQueue取出头元素,转换成对应的任务,判断任务在队列存活时间是否超过了队列设置的最大等待时间,如果超过了则组装处理返回对象response,response的code为RemotingSysResponseCode.SYSTEM_BUSY,内容为:


[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: [当前任务在队列存活时间], size of queue: [当前队列的长度]
复制代码



MQClientAPIImpl.processSendResponse处理返回response,根据response.getCode()的处理分支,最终返回MQBrokerException异常,response分支处理代码如下:

// 只有ResponseCode.SUCCESS的情况下返回结果,其他情况抛出MQBrokerException异常
private SendResult processSendResponse(
        final String brokerName,
        final Message msg,
        final RemotingCommand response
    ) throws MQBrokerException, RemotingCommandException {
        switch (response.getCode()) {
            case ResponseCode.FLUSH_DISK_TIMEOUT:
            case ResponseCode.FLUSH_SLAVE_TIMEOUT:
            case ResponseCode.SLAVE_NOT_AVAILABLE: {
            }
            case ResponseCode.SUCCESS: {
                // 省略部分代码
                return sendResult;
            }
            default:
                break;
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }
复制代码



消息发送客户端接收到MQBrokerException异常信息,捕获异常处理中不符合消息重试逻辑,直接抛出该异常,也就是用户看到的; // timesTotal为消息生产者设置的发送失败重试次数

for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // 省略部分代码
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        // 此处为MQBrokerException异常处理逻辑,RemotingSysResponseCode.SYSTEM_BUSY不符合分支条件,最终throw e抛出异常
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }
                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }
复制代码



生产环境各种参数:


  • broker busy异常: 可通过增大 waitTimeMillsInSendQueue 解决
  • system busy异常:可通过增大 osPageCacheBusyTimeOutMills 解决
#发送队列等待时间
waitTimeMillsInSendQueue=3000
#系统页面缓存繁忙超时时间(翻译),默认值 1000
osPageCacheBusyTimeOutMills=5000
复制代码



出现问题分析


出现异常的原因是因为我们同一台服务器部署的多个应用造成的。我们一台服务器上部署了 三个ES、八个redis、一个rocketmq ,压力测试时这些都在使用,虽然cpu、内存都还有很大剩余,但是磁盘io和内存频率毕竟只有那么多可能已经占满,或者还有其他都会有影响。


之前测试环境测试其他东西时,发现mq和redis同时大量使用时,redis速度会降低三到四倍,由此可见应用分服务器部署的重要性。以前知道会有影响,没想到影响这么大。


最终结解决方案:应该给rocketmq单独部署性能较高的服务器.


记一次 rocketmq 使用时的异常。



问题分析总结


  1. system busy , start flow control for a while

该异常会造成 消息丢失。

  1. broker busy , start flow control for a while

该异常不会造成消息丢失。




问题解决过程


1、最开始时候 ,测试发现在性能好的服务器上只会出现system busy,也就是说出现异常就会消息丢失。


所以:业务代码进行处理,出现异常就会重发到当前topic的bak队列,当时想的是既然这个topic busy了,就换到另外的topic去发,总不能都 busy吧。也算是临时解决了。


2、发现有消息重复的现象。不用想肯定是报broker busy异常,重发到topic的 bak队列了。又因为broker busy可能不会造成消息丢失,所以消息重复就出现了。



解决方案:


修改rocketmq配置文件:


  • 方案一:sendMessageThreadPoolNums 改成 1 ,没有的话新增一行。sendMessageThreadPoolNums=1
  • 方案二:useReentrantLockWhenPutMessage改成true,没有的话新增一行。
sendMessageThreadPoolNums=32
useReentrantLockWhenPutMessage=true
复制代码

sendMessageThreadPoolNums这个属性是发送线程池大小, rocketmq4.1版本之后默认为 1,之前版本默认什么不知道但是肯定大于1。这个属性改成1的话,就不用管useReentrantLockWhenPutMessage这个属性了;


如果改成大于1,就需要将useReentrantLockWhenPutMessage这个属性设置为 true;


目前测试 未发现这两个方案有什么区别,sendMessageThreadPoolNums=1 时也支持多线程发送,发送速度感觉和 sendMessageThreadPoolNums大于1没有区别,都能跑满100M的网卡。


感觉如果useReentrantLockWhenPutMessage=true的时候,就是打开锁,然后关键代码其实还是单线程处理;



解决方案


  1. 业务逻辑处理中进行异常捕获,如果捕获到异常为MQBrokerException并且responseCode为2则重发消息;
  2. 修改broker的默认发送消息任务队列等待时长waitTimeMillsInSendQueue(单位: 毫秒);


除此之外,还可以观察报错时磁盘的IO情况,出现这种错误很有可能是当时的磁盘IO很高,导致消息落盘时间变长。




RocketMQ的参数指南


NameServer配置属性

#broker名字,注意此处不同的配置文件填写的不一样
brokerClusterName=rocketmqcluster
brokerName=broker-a
#0 表示 Master, >0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#这个配置可解决双网卡,发送消息走外网的问题,这里配上内网ip就可以了
brokerIP1=10.30.51.149
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=8
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 0点
deleteWhen=03
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=1000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/data/rocketmq/data
#commitLog 存储路径
storePathCommitLog=/app/data/rocketmq/data/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/data/rocketmq/data/consumerqueue
#消息索引存储路径
storePathIndex=/app/data/rocketmq/data/index
#checkpoint 文件存储路径
storeCheckpoint=/app/data/rocketmq/data/checkpoint
#abort 文件存储路径
abortFile=/app/data/rocketmq/data/abort
#限制的消息大小 修改为16M
maxMessageSize=‭16777216‬
#发送队列等待时间
waitTimeMillsInSendQueue=3000
osPageCacheBusyTimeOutMills=5000
flushCommitLogLeastPages=12
flushConsumeQueueLeastPages=6
flushCommitLogThoroughInterval=30000
flushConsumeQueueThoroughInterval=180000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
sendMessageThreadPoolNums=80
#拉消息线程池数量
pullMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true
复制代码

官方资料



参考资料




相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
4月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
4月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
5月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
101 3
|
5月前
|
消息中间件 Linux API
centos7 安装rabbitmq自定义版本及配置
centos7 安装rabbitmq自定义版本及配置
|
5月前
|
消息中间件 Cloud Native Serverless
RabbitMQ 与云原生技术的融合
【8月更文第28天】随着微服务架构和容器化的普及,云原生技术已成为构建现代应用的标准方式。云原生应用程序利用了诸如容器化、微服务、声明式API等技术,以提高可伸缩性、可靠性和可维护性。消息队列作为服务间通信的关键组件,在云原生环境中扮演着重要角色。本文将探讨如何将RabbitMQ与云原生技术(如Service Mesh和Serverless平台)相结合,并通过具体的代码示例来展示其集成方法。
47 2
|
2月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
125 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
3月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
143 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
2月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
ly~
|
4月前
|
消息中间件 搜索推荐 大数据
一般情况下在 RocketMQ 中添加 access key 的步骤: 一、确定配置文件位置 RocketMQ 的配置文件通常位于安装目录下的 conf 文件夹中。你需要找到 broker.conf 或相关的配置文件。 二、编辑配置文件 打开配置文件,查找与 ACL(访问控制列表)相关的配置部分。 在配置文件中添加以下内容:
大数据广泛应用于商业、金融、医疗和政府等多个领域。在商业上,它支持精准营销、客户细分及流失预测,并优化供应链管理;金融领域则利用大数据进行风险评估、市场预测及欺诈检测;医疗行业通过大数据预测疾病、提供个性化治疗;政府运用大数据进行城市规划和公共安全管理;工业领域则借助大数据进行设备维护、故障预测及质量控制。
ly~
231 2

相关产品

  • 云消息队列 MQ