你肯定不知道RocketMQ生产者是如何规避故障Broker的

简介: 你肯定不知道RocketMQ生产者是如何规避故障Broker的

前言

在消息发送过程中,生产者从NameServer中获取到了指定Topic对应的Broker信息,在同步发送消息的代码中,如果消息发送失败,生产者默认是会重试两次的。那么Broker有问题的情况下,无论重试多少次都是没有意义的,消息生产者是如何规避这些故障Broker的呢?

收集故障Broker

我们在所有的发送消息源码中都可以找到这样一段代码,可在DefaultMQProducerImpl类中查找:

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
复制代码

无论是发送成功还是失败,RocketMQ生产者客户端都会做这一步操作:

// 发送成功的话,isolation传false,失败isolation传true
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    return 0;
}
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
复制代码

如果Broker产生故障,那么会创建一个FaultItem对象记录故障的Broker,并把结果放进故障规避表faultItemTable中,数据格式如下:

"broker-a": {
  // broker名称
  "name": "broker-a",
  "currentLatency": 发送消息消耗的时间,毫秒值之差,
  // 解除规避的时间,绝对时间
  "startTimestamp": 时间戳毫秒值
},
"broker-b": {
  // broker名称
  "name": "broker-b",
  "currentLatency": 发送消息消耗的时间,毫秒值之差,
  // 解除规避的时间,绝对时间
  "startTimestamp": 时间戳毫秒值
}
复制代码

发送成功的Broker设置的故障规避时间为0,发送失败的Broker将被设置为规避30秒;

选择Broker

MQFaultStrategy.selectOneMessageQueue()方法中,我们分三部分来分析如何选择Broker。

  • 轮询选择一个可用的Broker
// 轮询的基本套路,一个自增变量
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
     // 通过对队列数量取模,获取选定的Broker所在的位置
     int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
     if (pos < 0)
         pos = 0;
     MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
     // 判断Broker是否在规避时间内,如果不在规避时间内,就选择这个Broker,否则继续循环直至所有Broker都在规避时间内
     if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
         return mq;
}
复制代码

1.轮询的基本套路都是通过一个自增变量来对所有的Broker数量取模,这样就可以命中一个Broker;

2.针对命中的Broker判断是否在规避时间范围内,不在规避时间内就可以返回;否则只能进入第二个方案;

  • 选择一个相对延迟低的Broker
// 把所有规避列表中的Broker按延迟高低排序,并从延迟低的Broker中选择一个
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 判断该Broker是否允许写消息
if (writeQueueNums > 0) {
    final MessageQueue mq = tpInfo.selectOneMessageQueue();
    if (notBestBroker != null) {
        mq.setBrokerName(notBestBroker);
        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
    }
    // 返回选中的Broker
    return mq;
}
复制代码

1.从规避列表中找到延时比较低的Broker;

2.判断该Broker是否允许写消息,允许写消息的话就直接返回,否则再进入下一个方案;

  • 默认的选择
return tpInfo.selectOneMessageQueue();
复制代码

最后直接轮询一个Broker直接返回:

public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.incrementAndGet();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
复制代码

该方案是默认方案,没有开启故障规避配置的话,所有Broker的选择都是使用的该方案;

小结

RocketMQ通过设置故障规避表的方式,把所有的Broker的延迟数据都保留在故障规避表中,根据该列表制定了以下几种策略:

1.优先选择不在规避时间范围内的Broker

2.如果所有Broker都在规避时间内,优先选择延迟低的Broker

3.如果依然没有选中合适的Broker,那么就直接挑一个Broker来用;


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
9月前
|
消息中间件 Java Apache
RocketMQ5.0 搭建 Name Server And Broker+Proxy 同进程部署、搭建RocketMQ控制台图形化界面
RocketMQ5.0 搭建 Name Server And Broker+Proxy 同进程部署、搭建RocketMQ控制台图形化界面
990 0
|
15天前
|
消息中间件 网络协议 RocketMQ
消息队列 MQ产品使用合集之broker开启proxy,启动之后producer生产消息始终都只到一个broker,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
15天前
|
消息中间件 网络安全 开发工具
消息队列 MQ产品使用合集之使用grpc proxy,生产者心跳并没有发送至Default中,如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
12天前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
18 2
|
12天前
|
消息中间件 负载均衡 Apache
【RocketMQ系列七】消费者和生产者的实现细节
【RocketMQ系列七】消费者和生产者的实现细节
16 1
|
1月前
|
消息中间件 监控 应用服务中间件
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
|
26天前
|
消息中间件 监控 网络安全
在RocketMQ中,生产者提交数据导致连接不上问题
【6月更文挑战第19天】在RocketMQ中,生产者提交数据导致连接不上问题
75 1
|
1月前
|
消息中间件 设计模式 网络安全
消息队列 MQ操作报错合集之broker启用controller配置时,遇到报错,是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
15天前
|
消息中间件 Java Shell
消息队列 MQ产品使用合集之启动broker&proxy的时候会报错,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。