开发者社区> Java技术进阶> 正文

Dubbo NettyServer 消息分发策略

简介:  这篇文章的目标是分析清楚Dubbo NettyServer的消息分发策略,会分析Handler的封装和调用过程,最后分析Dubbo NettyServer支持的所有分发策略。 前半部分会讲解清楚Handler的封装流程和调用过程,具体的关系如下图。

 这篇文章的目标是分析清楚Dubbo NettyServer的消息分发策略,会分析Handler的封装和调用过程,最后分析Dubbo NettyServer支持的所有分发策略。

 前半部分会讲解清楚Handler的封装流程和调用过程,具体的关系如下图。我们关注 MultiMessageHandler => HeartbeatHandler => AllChannelHandler的封装过程及对应的调用过程。

 后半部分会分析Dubbo NettyServer支持的所有分发策略及对应处理方法,这里会涉及到Dubbo业务线程池的解析。
Handler封装流程图


ChannelHandler封装过程

  • Dubbo Protocol 服务发布篇我们已经分析了NettyServer创建的过程,这里分析NettyServer创建过程中关于ChannelHandler的封装过程。
public class NettyServer extends AbstractServer implements Server {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    private Map<String, Channel> channels; // <ip:port, channel>

    private ServerBootstrap bootstrap;

    private org.jboss.netty.channel.Channel channel;

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}
  • NettyServer中关注ChannelHandlers.wrap()过程,这里是ChannelHandler的封装入口,这里的handler对象是DecodeHandler。


public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}
  • ChannelHandlers的wrap()方法内部实现了MultiMessageHandler => HeartbeatHandler => AllChannelHandler的封装。
  • AllChannelHandler从获取通过动态代理获取方式获取。ExtensionLoader.getExtensionLoader().getAdaptiveExtension().dispatch(handler, url)
  • 关注下Dispatcher$Adaptive的实现。内部通过SPI获取AllChannelHandler对象。


public class Dispatcher$Adaptive implements Dispatcher {
    public ChannelHandler dispatch(ChannelHandler channelHandler, URL uRL) {
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }

        URL uRL2 = uRL;
        String string = uRL2.getParameter("dispatcher",
                uRL2.getParameter("dispather",
                    uRL2.getParameter("channel.handler", "all")));

        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append(
                    "Failed to get extension (org.apache.dubbo.remoting.Dispatcher) name from url (")
                                                              .append(uRL2.toString())
                                                              .append(") use keys([dispatcher, dispather, channel.handler])")
                                                              .toString());
        }

        Dispatcher dispatcher = (Dispatcher) ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                                            .getExtension(string);

        return dispatcher.dispatch(channelHandler, uRL);
    }
}


public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}
  • Dispatcher$Adaptive内部获取扩展名然后获取Dispatcher对象。
  • uRL2.getParameter("dispatcher",
    uRL2.getParameter("dispather",

uRL2.getParameter("channel.handler", "all"))过程记录原来错别字dispather,默认值是all,返回的AllDispatcher对象

  • 至此ChannelHandler的封装过程已经分析清楚。


Handler分析

public class MultiMessageHandler extends AbstractChannelHandlerDelegate {

    public MultiMessageHandler(ChannelHandler handler) {
        super(handler);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }
}
  • MultiMessageHandler为AbstractChannelHandlerDelegate子类,重载received()方法。
  • 其他connected()、disconnected()等方法都继承自AbstractChannelHandlerDelegate类。
  • MultiMessageHandler类的handler对象是HeartbeatHandler,所有调用都会调用HeartbeatHandler对应的方法。
    MultiMessageHandler的handler的定义在父类AbstractChannelHandlerDelegate。


public class HeartbeatHandler extends AbstractChannelHandlerDelegate {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);

    public static final String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";

    public static final String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";

    public HeartbeatHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        setReadTimestamp(channel);
        setWriteTimestamp(channel);
        handler.connected(channel);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        clearReadTimestamp(channel);
        clearWriteTimestamp(channel);
        handler.disconnected(channel);
    }
}
  • HeartbeatHandler为AbstractChannelHandlerDelegate子类,重载connected()/disconnected()等方法。
  • HeartbeatHandler的handler对象是AllChannelHandler,所有调用都会调用AllChannelHandler对应的方法。
  • HeartbeatHandler的handler的定义在父类AbstractChannelHandlerDelegate。


public abstract class AbstractChannelHandlerDelegate implements ChannelHandlerDelegate {
    // handler变量保存
    protected ChannelHandler handler;

    protected AbstractChannelHandlerDelegate(ChannelHandler handler) {
        Assert.notNull(handler, "handler == null");
        this.handler = handler;
    }

    @Override
    public ChannelHandler getHandler() {
        if (handler instanceof ChannelHandlerDelegate) {
            return ((ChannelHandlerDelegate) handler).getHandler();
        }
        return handler;
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        handler.connected(channel);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        handler.disconnected(channel);
    }

    @Override
    public void sent(Channel channel, Object message) throws RemotingException {
        handler.sent(channel, message);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        handler.received(channel, message);
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        handler.caught(channel, exception);
    }
}
  • AbstractChannelHandlerDelegate类实现ChannelHandlerDelegate接口。
  • AbstractChannelHandlerDelegate包含ChannelHandler handler对象。
  • HeartbeatHandler和MultiMessageHandler的handler是在AbstractChannelHandlerDelegate中定义的。


public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() 
                                           + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() +
                 " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information 
            //can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, 
            // and causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + 
                                  ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}


public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
    protected static final ExecutorService SHARED_EXECUTOR = 
       Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class)
                          .getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
}
  • AllChannelHandler继承WrappedChannelHandler类,重载如connected()等方法。
  • AllChannelHandler内部的executor对象在WrappedChannelHandler类中定义。
  • ExtensionLoader.getExtensionLoader().getAdaptiveExtension().getExecutor()
    生成executor对象。
  • ExtensionLoader.getExtensionLoader().getAdaptiveExtension()返回ThreadPool$Adaptive。

public class ThreadPool$Adaptive implements ThreadPool {
    public Executor getExecutor(URL uRL) {
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }
        URL uRL2 = uRL;
        String string = uRL2.getParameter("threadpool", "fixed");
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().
             append("Failed to get extension (org.apache.dubbo.common.threadpool.ThreadPool) name from url (")
             .append(uRL2.toString()).append(") use keys([threadpool])").toString());
        }
        ThreadPool threadPool = (ThreadPool)ExtensionLoader.getExtensionLoader(ThreadPool.class)
                   .getExtension(string);
        return threadPool.getExecutor(uRL);
    }
}
  • ThreadPool$Adaptive的通过ExtensionLoader.getExtensionLoader().getExtension()获取ThreadPool对象。
  • 返回的ThreadPool对象是FixedThreadPool对象。
  • WrappedChannelHandler中的executor对象是FixedThreadPool对象。
  • WrappedChannelHandler中的executor对象是业务线程池,是FixedThreadPool类型。

至此,我们了解到默认的消息分发策略为AllChannelHandler,AllChannelHandler内部的线程池executor为FixedThreadPool类型。继续分析消息分发策略


消息分发策略

  • Dubbo中NettyServer的消息分发策略如下图所示,各类消息分发策略有对应说明。
    消息分发策略
  • 针对不同的分发策略,我们从源码角度进行分析下。
org.apache.dubbo.remoting.Dispatcher文件

all=org.apache.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=org.apache.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=org.apache.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=org.apache.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=org.apache.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher


name handler desc
all AllChannelHandler 将所有请求、连接等事件都分发到ThreadPool扩展点指定的线程池中,如果ThreadPool指定的线程池已关闭或不存在则使用默认的一个cached线程池
connection ConnectionOrderedChannelHandler 将所有请求、异常都分发到ThreadPool扩展点指定的线程池中。而连接等事件发送到只有一个工作线程的线程池内按调用接收顺序逐个执行
direct 不使用多线程,在当前的接收线程(IO线程)上直接处理所有事件、请求
execution ExecutionChannelHandler 只把请求类消息派发到业务线程池处理,但是响应和其它连接断开事件,心跳等消息直接在IO线程上执行
message MessageOnlyChannelHandler 只将所有请求都分发到ThreadPool扩展点指定的线程池中,事件直接由接收线程(IO线程)处理


WrappedChannelHandler

public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
    protected static final ExecutorService SHARED_EXECUTOR = 
       Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    // 核心的业务线程池ExecutorService对象
    protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class)
             .getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

    public ExecutorService getExecutorService() {
        return executor == null || executor.isShutdown() ? SHARED_EXECUTOR : executor;
    }
}
  • WrappedChannelHandler内部包含业务处理线程池executor。
  • 业务线程池通过适配器方法返回的FixedThreadPool对象。


AllDispatcher

  • all : (AllDispatcher类)所有消息都派发到业务线程池,这些消息包括请求/响应/连接事件/断开事件/心跳等,这些线程模型如下图:

AllDispatcher

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}
  • AllDispatcher将所有请求、事件都分发到ThreadPool扩展点指定的线程池中,如果ThreadPool指定的线程池已关闭或不存在则使用默认的一个cached线程池。
  • AllDispatcher内部返回AllChannelHandler()对象。
  • AllDispatcher作为Dubbo默认的消息分发策略。


public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + 
                    " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + 
                  " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information 
            //can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and 
            //causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + 
                                       ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}
  • AllChannelHandler重载了WrappedChannelHandler方法。
  • AllChannelHandler将所有请求、事件都分发到ThreadPool扩展点指定的线程池中,如果ThreadPool指定的线程池已关闭或不存在则使用默认的一个cached线程池。
  • AllChannelHandler处理了所有的请求、事件,包括连接等事件和具体的消息处理等。


MessageOnlyDispatcher

  • message : (MessageOnlyDispatcher类)只有请求响应消息派发到业务线程池,其他连接断开事件/心跳等消息,直接在IO线程上执行,模型图如下:

MessageOnlyDispatcher

public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new MessageOnlyChannelHandler(handler, url);
    }
}
  • MessageOnlyDispatcher返回的是MessageOnlyChannelHandler对象。


public class MessageOnlyChannelHandler extends WrappedChannelHandler {

    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

}
  • MessageOnlyDispatcher只将所有请求都分发到ThreadPool扩展点指定的线程池中,连接等事件直接由接收线程(IO线程)处理。
  • MessageOnlyDispatcher重载received()方法,用于处理所有请求。


ExecutionDispatcher

  • execution:(ExecutionDispatcher类)只把请求类消息派发到业务线程池处理,但是响应和其它连接断开事件,心跳等消息直接在IO线程上执行,模型如下图:

ExecutionDispatcher

public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }

}
  • ExecutionDispatcher返回的是ExecutionChannelHandler对象。


/**
 * Only request message will be dispatched to thread pool. Other messages like response, connect, disconnect,
 * heartbeat will be directly executed by I/O thread.
 */
public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        if (message instanceof Request) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                // this scenario from happening, but a better solution should be considered later.
                if (t instanceof RejectedExecutionException) {
                    Request request = (Request) message;
                    if (request.isTwoWay()) {
                        String msg = "Server side(" + url.getIp() + "," + url.getPort()
                                + ") thread pool is exhausted, detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
            }
        } else {
            handler.received(channel, message);
        }
    }
}
  • ExecutionChannelHandler只把请求类消息派发到业务线程池处理,但是响应和其它连接断开事件,心跳等消息直接在IO线程上执行。
  • ExecutionChannelHandler重载received()方法,只处理Request类型的Message。


ConnectionOrderedDispatcher

  • connection:(ConnectionOrderedDispatcher类)在IO线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池处理,模型如下图:

ConnectionOrderedDispatcher

public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }
}
  • ConnectionOrderedDispatcher返回ConnectionOrderedChannelHandler对象。


public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() 
                 + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            // fix, reject exception can not be sent to consumer 
            // because thread pool is full, resulting in consumers waiting till timeout.
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + 
                         ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " 
               + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}
  • ConnectionOrderedChannelHandler在IO线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池处理。
  • ConnectionOrderedChannelHandler包含两个线程池,一个是IO线程池,一个是业务线程池。这里的IO线程池是区别于NettyServer的Worker线程池。
  • IO线程池connectionExecutor是单个执行线程的线程池实现事件的有序处理。
  • ConnectionOrderedChannelHandler重载received()/connected()等事件。


DirectDispatcher

  • direct : (DirectDispacher类)所有消息都不派发到业务线程池,全部在IO线程上直接执行,模型如下图:

DirectDispatcher

public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }
}
  • DirectDispatcher内部没有任何线程池对象,所有的请求事件都由NettyServer的worker线程进行处理。


参考

Dubbo 线程池策略和线程模型

版权声明:本文中所有内容均属于阿里云开发者社区所有,任何媒体、网站或个人未经阿里云开发者社区协议授权不得转载、链接、转贴或以其他方式复制发布/发表。申请授权请邮件developerteam@list.alibaba-inc.com,已获得阿里云开发者社区协议授权的媒体、网站,在转载使用时必须注明"稿件来源:阿里云开发者社区,原文作者姓名",违者本社区将依法追究责任。 如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:developer2020@service.aliyun.com 进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
上一篇:DubboProtocol 服务发布 下一篇:Dubbo ZookeeperRegistry分析
官网链接