前言
Dubbo是一个支持大量并发请求的网络框架,单机TPS能够达到1w,这种并发处理请求的能力和它的线程模型是分不开的。
在提供者处理请求这一端,Dubbo通过多线程同时处理多个客户端请求。
Dubbo底层是使用netty作为通信组件的,了解Dubbo的线程模型之前我们先了解下Netty的线程模型,在Dubbo中使用的是netty的主从 Reactor 多线程模式, 如下图:
在这种模式中,客户端的连接事件和读写数据的事件由不同的线程处理,一般连接事件使用1个线程处理,读写数据的事件交给线程池处理。在dubbo的源码中可以看出使用的是这种模式。
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
//处理连接事件的线程
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
//处理读写数据事件的线程池
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
上面讲的是Netty的线程池模型,而在Dubbo中,Dubbo自身框架也有一套线程池模型,它和Netty的线程池模型是有一定关系的。
上图是Dubbo的线程派发流程,在Netty的线程池把请求转发到Dubbo的handler时候,会进行请求分发,这个时候就可能会通过Dubbo自身的线程来处理业务请求了。
Dubbo中的线程派发策略
dubbo总共有5类线程分发器,不同的线程分发器代表不同的线程派发策略,表示哪类消息会使用dubbo自身的线程池处理,默认使用AllDispatcher。
all
所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。默认的direct
所有消息都不派发到线程池,全部在 IO 线程上直接执行。message
只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。execution
只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。connection
在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
比如AllDispatcher分发器使用AllChannelHandler处理消息,这种处理器会使用dubbo自身线程池处理请求,响应,连接,端口,心跳所有的和客户端交互的事件。源码如下:
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
//连接事件
@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
//断开事件
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
}
}
//接收读事件
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
//异常事件
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
}
Dubbo线程池分类
总共4种线程池,默认使用固定大小的线程池。
fixed
固定大小线程池,启动时建立线程,不关闭,一直持有。(默认)cached
缓存线程池,空闲一分钟自动删除,需要时重建。limited
可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。eager
优先创建Worker
线程池。在任务数量大于corePoolSize
但是小于maximumPoolSize
时,优先创建Worker
来处理任务。当任务数量大于maximumPoolSize
时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException
。(相比cached
:cached
在任务数量超过maximumPoolSize
时直接抛出异常而不是将任务放入阻塞队列)
如果线程如果满了,dubbo会抛异常,按如下提示打印日志。
总结
本文介绍了netty线程池模型和dubbo线程池模型的关系,也分析了dubbo线程池模型分发的策略,最后对dubbo线程池分类进行了分析总结。