这 6 个场景下 RocketMQ 会找不到 Broker

简介: 这 6 个场景下 RocketMQ 会找不到 Broker

大家好,我是君哥。

今天来分享一个最近生产环境遇到的一个 RocketMQ 异常:

微信图片_20221213112535.png

首先,我们回顾一下 RockemtMQ 的架构:

微信图片_20221213112557.png

Broker 的主从节点都会注册到 Name Server 集群,Name Server 集群保存了 Broker 相关信息。RocketMQ client 会在本地维护一份 topic 和 Broker 地址的映射关系,放在 MQClientInstance#brokerAddrTable。

发送消息

RocketMQ client 在发送消息时,会根据 topic 首先从本地缓存(brokerAddrTable)获取 Broker,如果获取不到,就会到 Name Server 集群中获取。

//DefaultMQProducerImpl 类
private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        //从 Name Server 中获取
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    if (brokerAddr != null) {
        //省略处理逻辑
    }
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

从上面的代码可以看出,如果本地缓存和 Name Server 都没有保存 Broker 信息,则会抛出 Broker 不存在的异常。这种情况解决思路就是从 Broker 启动时是否注册成功来着手分析。

消息偏移量

获取偏移量

客户端获取消息偏移量(Consume Offset)的时候,也可能会抛出这个异常:

//RemoteBrokerOffsetStore 类
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        switch (type) {
            case MEMORY_FIRST_THEN_STORE:
            case READ_FROM_MEMORY: {
                //省略实现逻辑
            }
            case READ_FROM_STORE: {
                    //省略
                    long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
                    //省略
            }
            default:
                break;
        }
    }
    return -1;
}

从上面的代码中可以看到:获取偏移量的方式有 3 种:


  • MEMORY_FIRST_THEN_STORE:先从内存中获取,如果获取不到,再从 Broker 请求;
  • READ_FROM_MEMORY:直接从内存中获取;
  • READ_FROM_STORE:直接从 Broker 请求。


从 Broker 请求的代码如下:

//RemoteBrokerOffsetStore 类
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
    }
    if (findBrokerResult != null) {
        //忽略处理逻辑
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

这段代码跟上一节发送消息时获取 Broker 地址的代码一样,首先从本地内存中获取,如果过去不到,就从 Name Server 中获取,如果取不到,就抛出 Broker 不存在的异常。

其他获取偏移量方法

除了上面的获取偏移量的方法外,还有 3 个获取偏移量的方法,在 MQAdminImpl 类:


  • searchOffset:从 Broker 获取 Message-Queue 偏移量,跟上面方法类似;
  • maxOffset:从 Broker 获取 MessageQ-ueue 最大偏移量;
  • minOffset:从 Broker 获取 MessageQu-eue 最小偏移量。

这些方法的使用都在源码【rocketmq-tools】这个模块中。

获取最早消息的保存时间

还有一个跟偏移量相关的方法,获取最早的一条消息的保存时间,代码如下:

//RemoteBrokerOffsetStore 类
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    //省略处理逻辑
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

这个方法是获取一个 MessageQueue 中最小偏移量消息的保存时间。

这些方法的使用都在源码【rocketmq-tools】这个模块中。

拉取消息

正常拉取消息

拉取消息的核心代码如下:

// PullAPIWrapper 类
public PullResult pullKernelImpl(
    //省略参数
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                                                          this.recalculatePullFromWhichNode(mq), false);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                                                              this.recalculatePullFromWhichNode(mq), false);
    }
    if (findBrokerResult != null) {
        //省略处理逻辑
    }
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

从上面的代码可以看到,客户端从 Broker 拉取消息之前,首先会从本地缓存获取 Broker 地址,如果获取不到,就从 Name Server 获取 Broker 地址,如果获取失败,则抛出 Broker 不存在的异常。

偏移量不合法

如果拉取消息时返回偏移量不合法(OFFSET_ILLEGAL),这时就需要重新处理偏移量。客户端代码的调用关系如下:

微信图片_20221213112633.png

这个发生在事务消息的场景,RocketMQ client 向 Broker 拉取消息时,如果 Broker 返回 PULL_OFFSET_MOVED,client 就会通过异步线程(定时 10s 后执行)通知 Broker 更新 offset 为 nextPullOffset(上次 pull 消息时 broker 返回)。代码如下:

public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
    }
    if (findBrokerResult != null) {
        //省略业务代码
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

总结

今天根据之前遇到的一次生产环境的异常日志研究了出现【The broker[xxx] not exis】的 6 个场景,每个场景都类似,首先从本地缓存获取 Broker 地址,如果获取不到,就从 Name Server 获取。

出现这种情况一般有下面三个原因:


  • Broker 挂了,客户端定时任务会判断到 Broker 离线,就会从本地缓存中移除(MQClientInstance#cleanOfflineBroker);
  • Broker 网络异常;
  • Broker 发生了主备切换,客户端获取 Broker 地址时切换还没有完成。


这些场景其实也有定时任务刷新本地缓存,见下面代码:

//MQClientInstance 类
private void startScheduledTask() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
}

    ·············· END ··············

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 存储 Java
MQ线上消息乱序问题处理及场景详解
【11月更文挑战第22天】在现代分布式系统中,消息队列(MQ)作为核心组件,承担着异步处理、削峰填谷和系统解耦的重任。
68 1
|
5月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 网络协议 RocketMQ
消息队列 MQ产品使用合集之broker开启proxy,启动之后producer生产消息始终都只到一个broker,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
2月前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
125 0
|
7月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
4月前
|
消息中间件 SQL 监控
RocketMQ 5.3.0 版本中 Broker IP 配置为 IPv6 的情况
【8月更文第28天】RocketMQ 是一款分布式消息中间件,支持多种消息发布和订阅模式。在 RocketMQ 5.3.0 版本中,Broker 的配置文件 `broker.conf` 允许配置 IPv6 地址。当 Broker 的 `brokerIP1` 配置为 IPv6 地址时,会对 Broker 的启动、消息推送和状态监控等方面产生影响。本文将探讨如何在 RocketMQ 中配置 IPv6 地址,并检查 Broker 的状态。
276 0
|
5月前
|
消息中间件 运维 RocketMQ
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
|
4月前
|
消息中间件 固态存储 RocketMQ
RocketMQ消息堆积常见场景与处理方案
文章分析了在使用RocketMQ时消息堆积的常见场景,如消费者注册失败或消费速度慢于生产速度,并提供了相应的处理方案,包括提高消费并行度、批量消费、跳过非重要消息以及优化消费代码业务逻辑等。
|
6月前
|
消息中间件 监控 应用服务中间件
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
195 1
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的