Dubbo线程模型设计解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 该文章主要介绍了Dubbo线程模型的设计解析,包括Dubbo作为一个支持大量并发请求的网络框架的特点,以及其线程模型的工作原理。

前言

Dubbo是一个支持大量并发请求的网络框架,单机TPS能够达到1w,这种并发处理请求的能力和它的线程模型是分不开的。

在提供者处理请求这一端,Dubbo通过多线程同时处理多个客户端请求。

Dubbo底层是使用netty作为通信组件的,了解Dubbo的线程模型之前我们先了解下Netty的线程模型,在Dubbo中使用的是netty的主从 Reactor 多线程模式, 如下图:

在这种模式中,客户端的连接事件和读写数据的事件由不同的线程处理,一般连接事件使用1个线程处理,读写数据的事件交给线程池处理。在dubbo的源码中可以看出使用的是这种模式。

       @Override
    protected void doOpen() throws Throwable {
   
   
        bootstrap = new ServerBootstrap();
        //处理连接事件的线程
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        //处理读写数据事件的线程池
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();
​
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
   
   
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
   
   
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }

上面讲的是Netty的线程池模型,而在Dubbo中,Dubbo自身框架也有一套线程池模型,它和Netty的线程池模型是有一定关系的。

上图是Dubbo的线程派发流程,在Netty的线程池把请求转发到Dubbo的handler时候,会进行请求分发,这个时候就可能会通过Dubbo自身的线程来处理业务请求了。

Dubbo中的线程派发策略

dubbo总共有5类线程分发器,不同的线程分发器代表不同的线程派发策略,表示哪类消息会使用dubbo自身的线程池处理,默认使用AllDispatcher。

  • all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。默认的
  • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
  • message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • execution 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

比如AllDispatcher分发器使用AllChannelHandler处理消息,这种处理器会使用dubbo自身线程池处理请求,响应,连接,端口,心跳所有的和客户端交互的事件。源码如下:

public class AllChannelHandler extends WrappedChannelHandler {
   
   public AllChannelHandler(ChannelHandler handler, URL url) {
   
   
        super(handler, url);
    }
    //连接事件
    @Override
    public void connected(Channel channel) throws RemotingException {
   
   
        ExecutorService executor = getExecutorService();
        try {
   
   
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
   
   
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
    //断开事件
    @Override
    public void disconnected(Channel channel) throws RemotingException {
   
   
        ExecutorService executor = getExecutorService();
        try {
   
   
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
   
   
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }
    //接收读事件
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
   
   
        ExecutorService executor = getExecutorService();
        try {
   
   
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
   
   
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
          if(message instanceof Request && t instanceof RejectedExecutionException){
   
   
            Request request = (Request)message;
            if(request.isTwoWay()){
   
   
              String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
              Response response = new Response(request.getId(), request.getVersion());
              response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
              response.setErrorMessage(msg);
              channel.send(response);
              return;
            }
          }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
    //异常事件
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
   
   
        ExecutorService executor = getExecutorService();
        try {
   
   
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
   
   
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

Dubbo线程池分类

总共4种线程池,默认使用固定大小的线程池。

  • fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(默认)
  • cached 缓存线程池,空闲一分钟自动删除,需要时重建。
  • limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
  • eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)

如果线程如果满了,dubbo会抛异常,按如下提示打印日志。

总结

本文介绍了netty线程池模型和dubbo线程池模型的关系,也分析了dubbo线程池模型分发的策略,最后对dubbo线程池分类进行了分析总结。

相关文章
|
21天前
|
机器学习/深度学习 人工智能 算法
模型无关的局部解释(LIME)技术原理解析及多领域应用实践
在当前数据驱动的商业环境中,人工智能(AI)和机器学习(ML)已成为各行业决策的关键工具,但随之而来的是“黑盒”问题:模型内部机制难以理解,引发信任缺失、监管合规难题及伦理考量。LIME(局部可解释模型无关解释)应运而生,通过解析复杂模型的个别预测,提供清晰、可解释的结果。LIME由华盛顿大学的研究者于2016年提出,旨在解决AI模型的透明度问题。它具有模型无关性、直观解释和局部保真度等优点,在金融、医疗等领域广泛应用。LIME不仅帮助企业提升决策透明度,还促进了模型优化和监管合规,是实现可解释AI的重要工具。
58 9
|
10天前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
76 0
|
5天前
|
开发框架 供应链 监控
并行开发模型详解:类型、步骤及其应用解析
在现代研发环境中,企业需要在有限时间内推出高质量的产品,以满足客户不断变化的需求。传统的线性开发模式往往拖慢进度,导致资源浪费和延迟交付。并行开发模型通过允许多个开发阶段同时进行,极大提高了产品开发的效率和响应能力。本文将深入解析并行开发模型,涵盖其类型、步骤及如何通过辅助工具优化团队协作和管理工作流。
|
15天前
|
机器学习/深度学习 搜索推荐 大数据
深度解析:如何通过精妙的特征工程与创新模型结构大幅提升推荐系统中的召回率,带你一步步攻克大数据检索难题
【10月更文挑战第2天】在处理大规模数据集的推荐系统项目时,提高检索模型的召回率成为关键挑战。本文分享了通过改进特征工程(如加入用户活跃时段和物品相似度)和优化模型结构(引入注意力机制)来提升召回率的具体策略与实现代码。严格的A/B测试验证了新模型的有效性,为改善用户体验奠定了基础。这次实践加深了对特征工程与模型优化的理解,并为未来的技术探索提供了方向。
50 2
深度解析:如何通过精妙的特征工程与创新模型结构大幅提升推荐系统中的召回率,带你一步步攻克大数据检索难题
|
7天前
|
机器学习/深度学习 算法 Python
深度解析机器学习中过拟合与欠拟合现象:理解模型偏差背后的原因及其解决方案,附带Python示例代码助你轻松掌握平衡技巧
【10月更文挑战第10天】机器学习模型旨在从数据中学习规律并预测新数据。训练过程中常遇过拟合和欠拟合问题。过拟合指模型在训练集上表现优异但泛化能力差,欠拟合则指模型未能充分学习数据规律,两者均影响模型效果。解决方法包括正则化、增加训练数据和特征选择等。示例代码展示了如何使用Python和Scikit-learn进行线性回归建模,并观察不同情况下的表现。
66 3
|
1月前
|
缓存 Java 应用服务中间件
Java虚拟线程探究与性能解析
本文主要介绍了阿里云在Java-虚拟-线程任务中的新进展和技术细节。
|
18天前
|
NoSQL Redis 数据库
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
本文解释了Redis为什么采用单线程模型,以及为什么Redis单线程模型的效率和速度依然可以非常高,主要原因包括Redis操作主要访问内存、核心操作简单、单线程避免了线程竞争开销,以及使用了IO多路复用机制epoll。
35 0
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
|
6天前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
1月前
|
缓存 负载均衡 Dubbo
Dubbo技术深度解析及其在Java中的实战应用
Dubbo是一款由阿里巴巴开源的高性能、轻量级的Java分布式服务框架,它致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案。
58 6
|
17天前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
18 0

推荐镜像

更多