Dubbo [DUBBO] disconected from 问题
重启 Dubbo provider(生产者)服务后,provider 端不断打印如下日志:
[INFO ] 2017-11-15 10:50:07,790--DubboServerHandler-10.255.242.97:20990-thread-517--[com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol] [DUBBO] disconected from /10.255.242.96:11582,url:dubbo://10.255.242.97:20990/com.tms.express.service.ServiceA?anyhost=true&application=tms-express-service&channel.readonly.sent=true&codec=dubbo&default.accepts=4000&default.buffer=8192&default.connections=20&default.exporter.listener=apiMonitorProviderExporterListener&default.loadbalance=random&default.payload=88388608&default.queues=0&default.retries=0&default.service.filter=apiMonitorProviderFilter,,&default.threadpool=fixed&default.threads=600&default.weight=100&dubbo=2.8.3.2&generic=false&heartbeat=60000&interface=com.tms.express.service.ServiceA&logger=slf4j&methods=handlePassbackDataSf,orderWeightSchedule&owner=nobody&pid=6694&revision=1.0-SNAPSHOT&side=provider&timestamp=1510563828892,dubbo version: 2.8.3.2, current host: 10.255.242.97
临时解决方案:重启服务调用端(消费者)即可。
在 dubbo 创建客户端连接服务端的时候,会同时创建一个心跳定时任务。该任务每隔 heartbeat 毫秒检查一次连接(上面日志中 provider 的配置为 heartbeat=60000,即 60 s),连接空闲超过一个心跳周期就发送心跳。如果服务端宕机,心跳将会超时,客户端会进行重连(重连任务每隔 2 s 执行一次,见下文)。
消费者的日志来源
HeaderExchangeClient#startHeatbeatTimer
消费者创建连接时会创建定时任务
/**
 * (Re)starts the heartbeat timer of this client.
 *
 * <p>Any previously started heartbeat timer is stopped first. When {@code heartbeat} is not
 * positive, no new timer is scheduled. Otherwise a {@code HeartBeatTask} that only inspects this
 * client's own channel is scheduled with a fixed delay of {@code heartbeat} milliseconds (used as
 * both the initial delay and the period).
 */
private void startHeatbeatTimer() {
    stopHeartbeatTimer();
    if (heartbeat <= 0) {
        // Heartbeat disabled: leave the timer stopped.
        return;
    }
    // The heartbeat task only ever looks at this single client channel.
    HeartBeatTask.ChannelProvider selfOnly = new HeartBeatTask.ChannelProvider() {
        public Collection<Channel> getChannels() {
            return Collections.<Channel>singletonList(HeaderExchangeClient.this);
        }
    };
    HeartBeatTask task = new HeartBeatTask(selfOnly, heartbeat, heartbeatTimeout);
    heatbeatTimer = scheduled.scheduleWithFixedDelay(task, heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
定时任务(HeartBeatTask#run)的核心逻辑有以下几点:
- 当前时间减去最后一次读或写的时间戳,大于心跳间隔 heartbeat,则发送心跳。
- 当前时间减去最后一次读的时间戳,大于心跳超时时间 heartbeatTimeout,则认为通道已经断开:客户端通道会执行重连,其他通道直接关闭。
public void run() { try { long now = System.currentTimeMillis(); for ( Channel channel : channelProvider.getChannels() ) { if (channel.isClosed()) { continue; } try { Long lastRead = ( Long ) channel.getAttribute( HeaderExchangeHandler.KEY_READ_TIMESTAMP ); Long lastWrite = ( Long ) channel.getAttribute( HeaderExchangeHandler.KEY_WRITE_TIMESTAMP ); // 当前时间戳,减去最后操作的时间戳大于心跳时间,则发送心跳 if ( ( lastRead != null && now - lastRead > heartbeat ) || ( lastWrite != null && now - lastWrite > heartbeat ) ) { Request req = new Request(); req.setVersion( "2.0.0" ); req.setTwoWay( true ); req.setEvent( Request.HEARTBEAT_EVENT ); // 发送心跳 channel.send( req ); if ( logger.isDebugEnabled() ) { logger.debug( "Send heartbeat to remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms" ); } } //当前时间戳 减去 最后操作的时间戳不仅大于心跳时间, 还大于了心跳超时时间, 那么可以任务通道已经被关闭, 开始尝试重连 if ( lastRead != null && now - lastRead > heartbeatTimeout ) { logger.warn( "Close channel " + channel + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms" ); if (channel instanceof Client) { try { // 当连接超时, 执行此方法进行重连 ((Client)channel).reconnect(); }catch (Exception e) { //do nothing } } else { channel.close(); } } } catch ( Throwable t ) { logger.warn( "Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t ); } } } catch ( Throwable t ) { logger.warn( "Unhandled exception when heartbeat, cause: " + t.getMessage(), t ); } }
AbstractClient#reconnect 超时重新连接
重连时先断开连接,然后重新连接服务端(生产者)
/**
 * Re-establishes the connection: first tears the current connection down, then connects again.
 *
 * @throws RemotingException if connecting again fails
 */
public void reconnect() throws RemotingException {
    // Order matters: release the old connection before opening a new one.
    this.disconnect();
    this.connect();
}
AbstractClient#connect
创建连接
- 第一步,调用 initConnectStatusCheckCommand 初始化重连任务:该方法的主要逻辑是创建一个定时任务,每隔 reconnect 毫秒(默认 2 秒)检查一次连接状态,若未连接则调用 connect() 方法尝试重连服务端。
- 第二步,doConnect()方法的主要逻辑是去连接服务端。
/**
 * Connects this client to the server while holding {@code connectLock}. It does nothing if the
 * client is already connected.
 *
 * <p>Before dialing, it makes sure the periodic reconnect task exists
 * ({@code initConnectStatusCheckCommand()}). On success it resets the reconnect failure counter
 * and the "error already logged" flag.
 *
 * <p>NOTE(review): this method is also called from inside the reconnect task itself. If
 * {@code disconnect()} cancelled that task just before this call, the call below schedules a
 * fresh reconnect task.
 *
 * @throws RemotingException if the client is still not connected after {@code doConnect()}, or
 *         wrapping any other failure raised while connecting
 */
protected void connect() throws RemotingException {
    connectLock.lock();
    try {
        if (isConnected()) {
            return;
        }
        // Step 1: ensure the periodic reconnect task is scheduled.
        initConnectStatusCheckCommand();
        // Step 2: open the actual transport connection.
        doConnect();
        if (! isConnected()) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
        } else {
            if (logger.isInfoEnabled()){
                logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", channel is " + this.getChannel());
            }
        }
        // Connected: reset the throttling state used by the reconnect task's error logging.
        reconnect_count.set(0);
        reconnect_error_log_flag.set(false);
    } catch (RemotingException e) {
        throw e;
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                + ", cause: " + e.getMessage(), e);
    } finally {
        connectLock.unlock();
    }
}
AbstractClient#initConnectStatusCheckCommand
/**
 * Schedules the periodic reconnect task, unless reconnect is disabled or a task that has not
 * been cancelled already exists.
 *
 * <p>The task runs with a fixed delay of {@code reconnect} milliseconds (used as both the initial
 * delay and the period). Each run calls {@code connect()} when the client is disconnected;
 * otherwise it records {@code lastConnectedTime}. Reconnect failures are logged in a throttled
 * way:
 * <ul>
 *   <li>once more than {@code shutdown_timeout} has passed since the last successful connection,
 *       a single error is logged (guarded by {@code reconnect_error_log_flag});</li>
 *   <li>otherwise a warning is logged on every {@code reconnect_warning_period}-th failure.</li>
 * </ul>
 *
 * <p>NOTE(review): this method is re-entered from inside the scheduled task (task -> connect() ->
 * here). Suppose {@code destroyConnectStatusCheckCommand()} cancels {@code reconnectExecutorFuture}
 * while the task is about to call {@code connect()}. Then the {@code isCancelled()} check below is
 * true and a brand-new task is scheduled, which nothing cancels afterwards. One possible fix is to
 * skip scheduling when the client is already closed; this is safe because the check runs under
 * {@code connectLock}, the same lock {@code disconnect()} takes. TODO: confirm against the upstream
 * Dubbo fix.
 */
private synchronized void initConnectStatusCheckCommand(){
    // reconnect=false to close reconnect (a non-positive value disables the reconnect task)
    int reconnect = getReconnectParam(getUrl());
    if(reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())){
        Runnable connectStatusCheckCommand = new Runnable() {
            public void run() {
                try {
                    if (! isConnected()) {
                        // Reconnect from inside the scheduled task. connect() calls back into
                        // initConnectStatusCheckCommand(), which re-creates reconnectExecutorFuture
                        // if it has been cancelled in the meantime.
                        connect();
                    } else {
                        lastConnectedTime = System.currentTimeMillis();
                    }
                } catch (Throwable t) {
                    String errorMsg = "client reconnect to "+getUrl().getAddress()+" find error . url: "+ getUrl();
                    // wait registry sync provider list
                    if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout){
                        // Log the long-lasting failure as an error only once.
                        if (!reconnect_error_log_flag.get()){
                            reconnect_error_log_flag.set(true);
                            logger.error(errorMsg, t);
                            return ;
                        }
                    }
                    // Otherwise warn only on every reconnect_warning_period-th failure.
                    if ( reconnect_count.getAndIncrement() % reconnect_warning_period == 0){
                        logger.warn(errorMsg, t);
                    }
                }
            }
        };
        // Try to reconnect every `reconnect` milliseconds.
        reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
    }
}
NettyClient#doConnect
/**
 * Opens a new Netty connection to the server, waiting at most {@code getConnectTimeout()} ms.
 *
 * <p>On success the new channel replaces the previous one, and the previous one is closed. If
 * this client has already been closed, the freshly opened channel is closed immediately and the
 * {@code channel} field is cleared instead. Whatever the outcome, a connect attempt that did not
 * leave the client connected is cancelled in the outer {@code finally}.
 *
 * <p>NOTE(review): a reconnect task that keeps running after the client was closed always takes
 * the "client closed" branch below. Each attempt therefore opens a channel and closes it right
 * away, which the provider presumably observes as a connect followed by a disconnect.
 *
 * @throws RemotingException if the connect attempt fails with a cause or times out
 */
protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    try{
        boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

        if (ret && future.isSuccess()) {
            Channel newChannel = future.getChannel();
            newChannel.setInterestOps(Channel.OP_READ_WRITE);
            try {
                // Close the old connection, if any.
                Channel oldChannel = NettyClient.this.channel; // copy reference
                if (oldChannel != null) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                        }
                        oldChannel.close();
                    } finally {
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                // If this Netty client is already closed, close the newly created channel too.
                if (NettyClient.this.isClosed()) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                        }
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                } else {
                    NettyClient.this.channel = newChannel;
                }
            }
        } else if (future.getCause() != null) {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
        } else {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + " client-side timeout "
                    + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
        }
    }finally{
        // Do not leave a pending connect attempt behind when we ended up not connected.
        if (! isConnected()) {
            future.cancel();
        }
    }
}
AbstractClient#disconnect
断开连接时,会先执行 destroyConnectStatusCheckCommand 方法,其主要逻辑是取消(cancel)connect() 方法执行时创建的重连任务 reconnectExecutorFuture。
/**
 * Disconnects this client. While holding {@code connectLock} (the same lock {@code connect()}
 * takes), it cancels the periodic reconnect task, closes the current channel, and then runs the
 * transport-specific {@code doDisConnect()}. Failures while closing are logged, not thrown.
 */
public void disconnect() {
    connectLock.lock();
    try {
        destroyConnectStatusCheckCommand();
        try {
            Channel channel = getChannel();
            if (channel != null) {
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            doDisConnect();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    } finally {
        connectLock.unlock();
    }
}

/**
 * Cancels the reconnect task scheduled by {@code initConnectStatusCheckCommand()}, if it has not
 * completed yet. Failures are logged, not thrown.
 */
private synchronized void destroyConnectStatusCheckCommand(){
    try {
        if (reconnectExecutorFuture != null && ! reconnectExecutorFuture.isDone()){
            /*
             * Cancel the reconnect task so that no further reconnects are attempted.
             * Bug: this only holds if, when reconnectExecutorFuture.cancel(true) runs, the reconnect
             * task thread has not yet reached connect(). Otherwise connect() schedules a new reconnect
             * task (initConnectStatusCheckCommand() sees the cancelled future). Because zookeeper
             * notifies only once, so this cancellation is requested only once, that new task is never
             * cancelled and the client keeps reconnecting forever.
             */
            reconnectExecutorFuture.cancel(true);
            // Release resources held by the cancelled task: purge() removes cancelled tasks from
            // the executor's queue (presumably a ScheduledThreadPoolExecutor; confirm).
            reconnectExecutorService.purge();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
}
消费者调用服务过程可能出现什么问题?
关键看下面这段代码:
/**
 * Disconnects this client under {@code connectLock}: cancels the reconnect task, closes the
 * current channel and runs {@code doDisConnect()}. Failures are logged, not thrown.
 */
public void disconnect() {
    connectLock.lock();
    try {
        // Cancel the periodic reconnect task first.
        destroyConnectStatusCheckCommand();
        try {
            Channel channel = getChannel();
            if (channel != null) {
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            doDisConnect();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    } finally {
        connectLock.unlock();
    }
}

/**
 * Cancels the reconnect task if it is still pending. It is only effective if the task is not
 * about to call connect(), because connect() would schedule a new task.
 */
private synchronized void destroyConnectStatusCheckCommand(){
    try {
        if (reconnectExecutorFuture != null && ! reconnectExecutorFuture.isDone()){
            reconnectExecutorFuture.cancel(true);
            // Remove the cancelled task from the executor's queue.
            reconnectExecutorService.purge();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
}
通过查看 disconnect 调用链可以看到如下:
在关闭连接和重连的时候都会调用 disconnect 方法,这样就存在一个问题:假如线程 A(重连定时任务)正在进行重连,线程 B 在关闭连接。此时线程 A 已经执行到了调用 connect 方法的位置,但还未进入;线程 B 执行到了 reconnectExecutorFuture.cancel(true),将重连任务取消了。随后线程 A 进入 connect 方法,在执行其中的 initConnectStatusCheckCommand 方法时,有如下判断:
if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled() ))
由于线程 B 已经将 reconnectExecutorFuture 取消了,上面的判断返回 true,因此会执行如下代码:
reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
这意味着又会启动一个新的重连任务。由于 zookeeper 的节点变更事件只会通知一次,之后不会再执行 disconnect 中的 destroyConnectStatusCheckCommand() 方法,因此这个重连定时任务会一直执行下去。
- 由于定时重连任务一直存在,每执行一次重连任务,都会创建一个新的channel, 此时消费者可以连接到服务提供者。
- 其次,zookeeper 发送节点变更通知时,会关闭已经失去连接的旧 NettyClient(服务端重启后,会创建一个新的 NettyClient 去连接服务器),并将该客户端的关闭标识 closed 设置为 true。因此,残留的重连任务每次在 doConnect() 中新建的 channel,都会因为客户端已关闭而被立即关闭。客户端 channel 关闭后,服务端感知到连接断开,就会打印 disconected from xxx 日志。
服务提供者日志来源
DubboProtocol协议
public class DubboProtocol extendsAbstractProtocol { ......省略部分代码 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throwsRemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation)message; Invoker<?> invoker =getInvoker(channel, inv); //如果是callback 需要处理高版本调用低版本的问题 if(Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){ String methodsStr =invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null ||methodsStr.indexOf(",") == -1) { hasMethod =inv.getMethodName().equals(methodsStr); } else { String[] methods =methodsStr.split(","); for (String method :methods) { if(inv.getMethodName().equals(method)) { hasMethod =true; break; } } } if (!hasMethod) { logger.warn(newIllegalStateException("The methodName " + inv.getMethodName() +" not found in callback service interface ,invoke will be ignored. pleaseupdate the api interface. url is:" + invoker.getUrl()) + ",invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " +message == null ? 
null : (message.getClass().getName() + ": " +message) + ", channel: consumer: " + channel.getRemoteAddress() +" --> provider: " + channel.getLocalAddress()); } @Override public void received(Channel channel, Object message) throwsRemotingException { if (message instanceof Invocation) { reply((ExchangeChannel)channel, message); } else { super.received(channel,message); } } @Override public void connected(Channel channel) throws RemotingException { invoke(channel, Constants.ON_CONNECT_KEY); } @Override public void disconnected(Channel channel) throws RemotingException { if (logger.isInfoEnabled()) { logger.info("disconected from " + channel.getRemoteAddress() + ",url:" +channel.getUrl()); } invoke(channel, Constants.ON_DISCONNECT_KEY); } private void invoke(Channel channel, String methodKey) { Invocation invocation = createInvocation(channel, channel.getUrl(),methodKey); if (invocation != null) { try { received(channel,invocation); } catch (Throwable t) { logger.warn("Failed toinvoke event method " + invocation.getMethodName() + "(), cause:" + t.getMessage(), t); } } } private Invocation createInvocation(Channel channel, URL url, StringmethodKey) { String method = url.getParameter(methodKey); if (method == null || method.length() == 0) { return null; } RpcInvocation invocation = new RpcInvocation(method, newClass<?>[0], new Object[0]); invocation.setAttachment(Constants.PATH_KEY, url.getPath()); invocation.setAttachment(Constants.GROUP_KEY,url.getParameter(Constants.GROUP_KEY)); invocation.setAttachment(Constants.INTERFACE_KEY,url.getParameter(Constants.INTERFACE_KEY)); invocation.setAttachment(Constants.VERSION_KEY,url.getParameter(Constants.VERSION_KEY)); if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { invocation.setAttachment(Constants.STUB_EVENT_KEY,Boolean.TRUE.toString()); } return invocation; } } ...省略部分代码 private void openServer(URL url) { // find server. 
String key = url.getAddress(); //client 也可以暴露一个只有server可以调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { serverMap.put(key,createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } } private ExchangeServer createServer(URL url) { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,Boolean.TRUE.toString()); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY,String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY,Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str +", url: " + url); url = url.addParameter(Constants.CODEC_KEY,Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url +") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes =ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw newRpcException("Unsupported client type: " + str); } } return server; } }
requestHandler 绑定到了 provider 的 url 上(见 DubboProtocol 的 openServer/createServer 方法),用于响应 dubbo 的连接、断开、调用等请求。如果 consumer 到这个 provider 的连接断开了,就会调用 requestHandler 的 disconnected 方法,输出 disconected from 日志。
总结
根本原因在于服务调用者(消费者)中残留的重连任务在不断重连:channel 不断被新建,又立即被关闭。服务提供者(生产者)每次感知到连接断开,都会调用 requestHandler 的 disconnected 方法,因此不断打印 disconected from xxx 日志。