Problem Description
While using RocketMQ, the project team found that a RocketMQ Consumer failed to connect to the NameServer (namesrv). The exception stack was:
2024-03-20 15:06:27,674 ERROR RocketmqClient - updateTopicRouteInfoFromNameServer Exception
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [10.2.96.43:30286] failed
	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateNameserverChannel(NettyRemotingClient.java:445)
	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateChannel(NettyRemotingClient.java:400)
	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:369)
	at com.xx.yy.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1394)
	at com.xx.yy.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1384)
	at com.xx.yy.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:627)
	at com.xx.yy.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:514)
	at com.xx.yy.impl.factory.MQClientInstance.findConsumerIdList(MQClientInstance.java:1084)
	at com.xx.yy.impl.consumer.RebalanceImpl.rebalanceByTopic(RebalanceImpl.java:261)
	at com.xx.yy.impl.consumer.RebalanceImpl.doRebalance(RebalanceImpl.java:224)
	at com.xx.yy.impl.consumer.DefaultMQPushConsumerImpl.doRebalance(DefaultMQPushConsumerImpl.java:1021)
	at com.xx.yy.impl.factory.MQClientInstance.doRebalance(MQClientInstance.java:981)
	at com.xx.yy.impl.consumer.RebalanceService.run(RebalanceService.java:41)
	at java.lang.Thread.run(Thread.java:748)
Problem Analysis
getAndCreateNameserverChannel
The stack shows that the exception was thrown from org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateNameserverChannel. Its code is:
private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException {
    // Fast path: reuse the channel to the currently chosen name server if it is usable
    String addr = this.namesrvAddrChoosed.get();
    if (addr != null) {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
    }
    final List<String> addrList = this.namesrvAddrList.get();
    if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            // Re-check under the lock: another thread may have connected meanwhile
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }
            if (addrList != null && !addrList.isEmpty()) {
                // Try each name server at most once, round-robin
                for (int i = 0; i < addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    index = Math.abs(index);
                    index = index % addrList.size();
                    String newAddr = addrList.get(index);
                    this.namesrvAddrChoosed.set(newAddr);
                    log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null) {
                        return channelNew;
                    }
                }
                // Every createChannel() call returned null
                throw new RemotingConnectException(addrList.toString());
            }
        } finally {
            this.namesrvChannelLock.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }
    return null;
}
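The failover loop above can be modelled in a few lines. This is a simplified sketch, not RocketMQ code: the class NameServerFailoverSketch is hypothetical, connections are plain strings, IllegalStateException stands in for RemotingConnectException, and the connect function plays the role of createChannel (returning null on failure). It shows that each address is tried at most once and that the exception message is built from the whole address list, which suggests that the bracketed [10.2.96.43:30286] in the log above was the complete (single-entry) name server list.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * Hypothetical, simplified model of the failover loop in getAndCreateNameserverChannel.
 * "Channels" are Strings; connect returns null on failure, mirroring createChannel.
 */
public class NameServerFailoverSketch {
    private final AtomicInteger namesrvIndex = new AtomicInteger(0);

    /** Tries each address at most once, round-robin; throws if every attempt returns null. */
    public String choose(List<String> addrList, Function<String, String> connect) {
        if (addrList == null || addrList.isEmpty()) {
            // The original method also falls through to "return null" in this case
            return null;
        }
        for (int i = 0; i < addrList.size(); i++) {
            // Same index arithmetic as the original: abs(counter) modulo list size
            int index = Math.abs(namesrvIndex.incrementAndGet()) % addrList.size();
            String channel = connect.apply(addrList.get(index));
            if (channel != null) {
                return channel;
            }
        }
        // Stand-in for RemotingConnectException: the message embeds the whole list
        throw new IllegalStateException("connect to " + addrList + " failed");
    }
}
```

With a single configured address and a connect function that always returns null, choose throws with the message "connect to [10.2.96.43:30286] failed", the same shape as the logged exception.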
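Use Arthas vmtool to inspect the live NettyRemotingClient instance and follow the execution path of the code above step by step, for example by reading the currently chosen name server address: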
vmtool --action getInstances --className org.apache.rocketmq.remoting.netty.NettyRemotingClient --express 'instances[0].namesrvAddrChoosed'
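This analysis shows that this.createChannel(newAddr) returned null, so the loop ended by throwing RemotingConnectException. An Arthas watch on createChannel confirms this by printing its arguments and return value: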
watch org.apache.rocketmq.remoting.netty.NettyRemotingClient createChannel "{params,returnObj}" -x 2
createChannel
Why does createChannel return null? Answering that requires knowing its execution path, which the Arthas trace command reveals:
trace org.apache.rocketmq.remoting.netty.NettyRemotingClient createChannel
Combining that call trace with the createChannel code below pinpoints where the failure happens:
private Channel createChannel(final String addr) throws InterruptedException {
    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }
    if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            boolean createNewConnection;
            cw = this.channelTables.get(addr);
            if (cw != null) {
                if (cw.isOK()) {
                    return cw.getChannel();
                } else if (!cw.getChannelFuture().isDone()) {
                    createNewConnection = false;
                } else {
                    this.channelTables.remove(addr);
                    createNewConnection = true;
                }
            } else {
                createNewConnection = true;
            }
            if (createNewConnection) {
                // Asynchronous connect; the outcome is checked after the lock is released
                ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
                log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                cw = new ChannelWrapper(channelFuture);
                this.channelTables.put(addr, cw);
            }
        } catch (Exception e) {
            log.error("createChannel: create channel exception", e);
        } finally {
            this.lockChannelTables.unlock();
        }
    } else {
        log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }
    if (cw != null) {
        ChannelFuture channelFuture = cw.getChannelFuture();
        if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
            if (cw.isOK()) {
                log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                return cw.getChannel();
            } else {
                // Connect completed but failed: the root cause is only handed to this warn call
                log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
            }
        } else {
            log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), channelFuture.toString());
        }
    }
    return null;
}
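The tail of createChannel follows a common pattern: start an asynchronous connect, wait with a timeout, and on failure log the cause and return null. The sketch below models that pattern, assuming CompletableFuture as a stand-in for Netty's ChannelFuture; the class name AwaitConnectSketch and the lastWarn field (a stand-in for log.warn) are hypothetical. It illustrates why the caller only ever sees null, while the root cause survives only in the warning log.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of createChannel's "connect asynchronously, then wait" tail. */
public class AwaitConnectSketch {
    /** Stand-in for log.warn: the last warning message "logged". */
    public static String lastWarn;

    public static String awaitChannel(CompletableFuture<String> future, long timeoutMillis) {
        try {
            // Roughly awaitUninterruptibly(timeout) followed by a successful cw.isOK()
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Connect finished but failed: the cause is recorded here and nowhere else
            lastWarn = "connect failed: " + e.getCause();
            return null;
        } catch (TimeoutException e) {
            lastWarn = "connect timeout " + timeoutMillis + "ms";
            return null;
        } catch (InterruptedException e) {
            // Unlike awaitUninterruptibly, get() is interruptible; restore the flag
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

If the future fails with, say, a hypothetical NoClassDefFoundError, awaitChannel returns null and the error text appears only in lastWarn, which mirrors why the failure here was invisible until the warn call was watched.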
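This places execution at line 503 of NettyRemotingClient in the version in use, the "connect remote host[...] failed" warning that passes channelFuture.cause() to the logger, yet that warning never appeared in the application log. Use Arthas watch to print the exception passed to the logger: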
watch org.apache.rocketmq.logging.InternalLogger warn "{params[1]}" -x 2
The exception stack shows that the failure occurred while initializing io.netty.buffer.ByteBufAllocator. Next, check which jar each of io.netty.buffer.ByteBufAllocator and io.netty.channel.DefaultChannelConfig was loaded from:
sc -d io.netty.buffer.ByteBufAllocator
sc -d io.netty.channel.DefaultChannelConfig
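The output shows that io.netty.buffer.ByteBufAllocator was loaded from netty-buffer-4.1.68.Final.jar, whereas io.netty.channel.DefaultChannelConfig was loaded from netty-all-4.0.42.Final.jar, so classes from two different Netty release lines (4.1 and 4.0) are mixed in the same application.
At this point a Netty jar conflict is the most likely cause, so list the Netty jars the application contains: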
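Outside Arthas, the location that sc -d reports as the code source can also be read in plain Java via Class.getProtectionDomain().getCodeSource().getLocation(). A minimal sketch with the hypothetical class name ClassOrigin; it looks classes up with Class.forName(name, false, loader), so the class is located without being initialized, which matters here because initializing ByteBufAllocator is exactly what fails.

```java
import java.net.URL;
import java.security.CodeSource;
import java.security.ProtectionDomain;

/** Hypothetical helper: reports the jar or directory a class was loaded from. */
public class ClassOrigin {
    public static String locationOf(Class<?> clazz) {
        ProtectionDomain pd = clazz.getProtectionDomain();
        CodeSource cs = (pd == null) ? null : pd.getCodeSource();
        URL url = (cs == null) ? null : cs.getLocation();
        // JDK bootstrap classes have no code source
        return (url == null) ? "<bootstrap/unknown>" : url.toString();
    }

    /** Finds a class by name WITHOUT running its static initializer, then reports its origin. */
    public static String locationOfName(String className) throws ClassNotFoundException {
        Class<?> clazz = Class.forName(className, false, ClassOrigin.class.getClassLoader());
        return locationOf(clazz);
    }
}
```

Inside the affected application, locationOfName("io.netty.buffer.ByteBufAllocator") and locationOfName("io.netty.channel.DefaultChannelConfig") would be expected to point at two different jars, matching the sc -d output above.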
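One quick way to list the Netty jars is to scan the java.class.path system property. The sketch below is a file-name heuristic only (the class NettyJarScan and its methods are hypothetical): it extracts the version from names such as netty-all-4.0.42.Final.jar and flags a classpath that mixes Netty major.minor lines such as 4.0 and 4.1. It does not see jars nested inside a fat jar and does not match classifier jars such as native transports; for those cases the build tool's dependency view (for Maven, mvn dependency:tree) is more reliable.

```java
import java.io.File;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: finds netty jars on a classpath string by file name. */
public class NettyJarScan {
    // Matches e.g. netty-all-4.0.42.Final.jar or netty-buffer-4.1.68.Final.jar
    private static final Pattern NETTY_JAR =
            Pattern.compile("netty-[a-z0-9-]+?-(\\d+\\.\\d+\\.\\d+\\.[A-Za-z0-9]+)\\.jar");

    /** Returns jar file name -> version for every netty jar in the classpath string. */
    public static Map<String, String> nettyJars(String classpath) {
        Map<String, String> found = new TreeMap<>();
        for (String entry : classpath.split(Pattern.quote(File.pathSeparator))) {
            String name = new File(entry).getName();
            Matcher m = NETTY_JAR.matcher(name);
            if (m.matches()) {
                found.put(name, m.group(1));
            }
        }
        return found;
    }

    /** True if the versions span more than one major.minor line, e.g. 4.0.x and 4.1.x. */
    public static boolean mixesMinorVersions(Map<String, String> jars) {
        return jars.values().stream()
                .map(v -> v.substring(0, v.indexOf('.', v.indexOf('.') + 1)))
                .distinct()
                .count() > 1;
    }

    public static void main(String[] args) {
        Map<String, String> jars = nettyJars(System.getProperty("java.class.path"));
        System.out.println(jars + " mixed=" + mixesMinorVersions(jars));
    }
}
```

Keying the result by file name rather than artifact keeps two versions of the same artifact (for example two netty-all jars) from overwriting each other.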
Solution
Exclude the conflicting Netty jars (such as netty-buffer-4.1.68.Final) and keep only netty-all-4.0.42.Final.