RocketMQ—一次连接namesvr失败的案例分析

本文涉及的产品
应用实时监控服务-用户体验监控,每月100OCU免费额度
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。

问题描述

项目组在使用RocketMQ的过程中,遇到RocketMQ Consumer连接namesvr失败的情况,异常线程栈如下:

2024-03-20 15:06:27,674 ERROR RocketmqClient - updateTopicRouteInfoFromNameServer Exception
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [10.2.96.43:30286] failed
  at org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateNameserverChannel(NettyRemotingClient.java:445)
  at org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateChannel(NettyRemotingClient.java:400)
  at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:369)
  at com.xx.yy.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1394)
  at com.xx.yy.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1384)
  at com.xx.yy.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:627)
  at com.xx.yy.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:514)
  at com.xx.yy.impl.factory.MQClientInstance.findConsumerIdList(MQClientInstance.java:1084)
  at com.xx.yy.impl.consumer.RebalanceImpl.rebalanceByTopic(RebalanceImpl.java:261)
  at com.xx.yy.impl.consumer.RebalanceImpl.doRebalance(RebalanceImpl.java:224)
  at com.xx.yy.impl.consumer.DefaultMQPushConsumerImpl.doRebalance(DefaultMQPushConsumerImpl.java:1021)
  at com.xx.yy.impl.factory.MQClientInstance.doRebalance(MQClientInstance.java:981)
  at com.xx.yy.impl.consumer.RebalanceService.run(RebalanceService.java:41)
  at java.lang.Thread.run(Thread.java:748)

问题分析

getAndCreateNameserverChannel

从异常线程栈可以看出在调用org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateNameserverChannel的时候抛出了异常,该方法代码如下:

private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException {
  String addr = this.namesrvAddrChoosed.get();
  if (addr != null) {
    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
      return cw.getChannel();
    }
  }
  final List<String> addrList = this.namesrvAddrList.get();
  if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
    try {
      addr = this.namesrvAddrChoosed.get();
      if (addr != null) {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
          return cw.getChannel();
        }
      }
      if (addrList != null && !addrList.isEmpty()) {
        for (int i = 0; i < addrList.size(); i++) {
          int index = this.namesrvIndex.incrementAndGet();
          index = Math.abs(index);
          index = index % addrList.size();
          String newAddr = addrList.get(index);
          this.namesrvAddrChoosed.set(newAddr);
          log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
          Channel channelNew = this.createChannel(newAddr);
          if (channelNew != null) {
            return channelNew;
          }
        }
        throw new RemotingConnectException(addrList.toString());
      }
    } finally {
      this.namesrvChannelLock.unlock();
    }
  } else {
    log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
  }
  return null;
}

结合arthas vmtool一步步分析以上代码的执行路径:

vmtool --action getInstances --className org.apache.rocketmq.remoting.netty.NettyRemotingClient --express 'instances[0].namesrvAddrChoosed'

最终分析到程序执行this.createChannel(newAddr)返回的是null,所以最终抛出了RemotingConnectException异常。

watch org.apache.rocketmq.remoting.netty.NettyRemotingClient createChannel "{params,returnObj}" -x 2

createChannel

createChannel为什么会返回null呢?为了弄清楚这个问题,需要知道createChannel的执行逻辑。借助arthas trace命令来确定代码的执行逻辑。

trace org.apache.rocketmq.remoting.netty.NettyRemotingClient createChannel

通过以上调用过程和createChannel代码逻辑,来定位具体异常。

private Channel createChannel(final String addr) throws InterruptedException {
  ChannelWrapper cw = this.channelTables.get(addr);
  if (cw != null && cw.isOK()) {
    return cw.getChannel();
  }
  if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
    try {
      boolean createNewConnection;
      cw = this.channelTables.get(addr);
      if (cw != null) {
        if (cw.isOK()) {
          return cw.getChannel();
        } else if (!cw.getChannelFuture().isDone()) {
          createNewConnection = false;
        } else {
          this.channelTables.remove(addr);
          createNewConnection = true;
        }
      } else {
        createNewConnection = true;
      }
      if (createNewConnection) {
        ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
        log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
        cw = new ChannelWrapper(channelFuture);
        this.channelTables.put(addr, cw);
      }
    } catch (Exception e) {
      log.error("createChannel: create channel exception", e);
    } finally {
      this.lockChannelTables.unlock();
    }
  } else {
    log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
  }
  if (cw != null) {
    ChannelFuture channelFuture = cw.getChannelFuture();
    if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
      if (cw.isOK()) {
        log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
        return cw.getChannel();
      } else {
        log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
      }
    } else {
      log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
        channelFuture.toString());
    }
  }
  return null;
}

从以上分析可以定位到代码执行到了503行,但是503行的异常日志并没有打印出来。借助arthas watch把异常栈打印出来。

watch org.apache.rocketmq.logging.InternalLogger warn "{params[1]}" -x 2

从异常栈可以看出在初始化io.netty.buffer.ByteBufAllocator的时候出错了,接下来看看io.netty.buffer.ByteBufAllocator和io.netty.channel.DefaultChannelConfig分别来自哪个jar包。

sc -d io.netty.buffer.ByteBufAllocator

sc -d io.netty.channel.DefaultChannelConfig

以上可以,io.netty.buffer.ByteBufAllocator加载自netty-buffer-4.1.68.Final.jar,io.netty.channel.DefaultChannelConfig加载自netty-all-4.0.42.Final.jar。

分析到这里,猜测大概率netty包有冲突,查看下应用有哪些netty包:

1(1).png

解决方法

排除掉冲突的netty包,只保留netty-all-4.0.41.Final版本。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
307 2
|
7月前
|
消息中间件 存储 开发工具
消息队列 MQ产品使用合集之C++如何使用Paho MQTT库进行连接、发布和订阅消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 数据可视化 Go
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
54 0
|
5月前
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
133 1
|
5月前
|
物联网 C# 智能硬件
智能家居新篇章:WPF与物联网的智慧碰撞——通过MQTT协议连接与控制智能设备,打造现代科技生活的完美体验
【8月更文挑战第31天】物联网(IoT)技术的发展使智能家居设备成为现代家庭的一部分。通过物联网,家用电器和传感器可以互联互通,实现远程控制和状态监测等功能。本文将探讨如何在Windows Presentation Foundation(WPF)应用中集成物联网技术,通过具体示例代码展示其实现过程。文章首先介绍了MQTT协议及其在智能家居中的应用,并详细描述了使用Wi-Fi连接方式的原因。随后,通过安装Paho MQTT客户端库并创建MQTT客户端实例,演示了如何编写一个简单的WPF应用程序来控制智能灯泡。
168 0
|
6月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ使用问题之一直连接master失败,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java 物联网
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
|
6月前
|
消息中间件 JavaScript Linux
消息队列 MQ操作报错合集之客户端在启动时遇到了连接错误,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 RocketMQ
消息队列 MQ使用问题之如何使用SockJS和Stomp与RabbitMQ建立连接
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
203 1

相关产品

  • 云消息队列 MQ