Dubbo线程模型设计解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 该文章主要介绍了Dubbo线程模型的设计解析,包括Dubbo作为一个支持大量并发请求的网络框架的特点,以及其线程模型的工作原理。

前言

Dubbo是一个支持大量并发请求的网络框架,单机TPS能够达到1w,这种并发处理请求的能力和它的线程模型是分不开的。

在提供者处理请求这一端,Dubbo通过多线程同时处理多个客户端请求。

Dubbo底层是使用netty作为通信组件的,了解Dubbo的线程模型之前我们先了解下Netty的线程模型,在Dubbo中使用的是netty的主从 Reactor 多线程模式, 如下图:

在这种模式中,客户端的连接事件和读写数据的事件由不同的线程处理,一般连接事件使用1个线程处理,读写数据的事件交给线程池处理。在dubbo的源码中可以看出使用的是这种模式。

       @Override
    protected void doOpen() throws Throwable {
   
   
        bootstrap = new ServerBootstrap();
        //处理连接事件的线程
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        //处理读写数据事件的线程池
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();
​
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
   
   
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
   
   
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }

上面讲的是Netty的线程池模型,而在Dubbo中,Dubbo自身框架也有一套线程池模型,它和Netty的线程池模型是有一定关系的。

上图是Dubbo的线程派发流程,在Netty的线程池把请求转发到Dubbo的handler时候,会进行请求分发,这个时候就可能会通过Dubbo自身的线程来处理业务请求了。

Dubbo中的线程派发策略

dubbo总共有5类线程分发器,不同的线程分发器代表不同的线程派发策略,表示哪类消息会使用dubbo自身的线程池处理,默认使用AllDispatcher。

  • all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。默认的
  • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
  • message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • execution 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

比如AllDispatcher分发器使用AllChannelHandler处理消息,这种处理器会使用dubbo自身线程池处理请求,响应,连接,端口,心跳所有的和客户端交互的事件。源码如下:

public class AllChannelHandler extends WrappedChannelHandler {
   
   public AllChannelHandler(ChannelHandler handler, URL url) {
   
   
        super(handler, url);
    }
    //连接事件
    @Override
    public void connected(Channel channel) throws RemotingException {
   
   
        ExecutorService executor = getExecutorService();
        try {
   
   
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
   
   
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
    //断开事件
    @Override
    public void disconnected(Channel channel) throws RemotingException {
   
   
        ExecutorService executor = getExecutorService();
        try {
   
   
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
   
   
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }
    //接收读事件
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
   
   
        ExecutorService executor = getExecutorService();
        try {
   
   
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
   
   
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
          if(message instanceof Request && t instanceof RejectedExecutionException){
   
   
            Request request = (Request)message;
            if(request.isTwoWay()){
   
   
              String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
              Response response = new Response(request.getId(), request.getVersion());
              response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
              response.setErrorMessage(msg);
              channel.send(response);
              return;
            }
          }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
    //异常事件
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
   
   
        ExecutorService executor = getExecutorService();
        try {
   
   
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
   
   
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

Dubbo线程池分类

总共4种线程池,默认使用固定大小的线程池。

  • fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(默认)
  • cached 缓存线程池,空闲一分钟自动删除,需要时重建。
  • limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
  • eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)

如果线程如果满了,dubbo会抛异常,按如下提示打印日志。

总结

本文介绍了netty线程池模型和dubbo线程池模型的关系,也分析了dubbo线程池模型分发的策略,最后对dubbo线程池分类进行了分析总结。

相关文章
|
19天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
13天前
|
机器学习/深度学习 存储 人工智能
让模型评估模型:构建双代理RAG评估系统的步骤解析
在当前大语言模型(LLM)应用开发中,评估模型输出的准确性成为关键问题。本文介绍了一个基于双代理的RAG(检索增强生成)评估系统,使用生成代理和反馈代理对输出进行评估。文中详细描述了系统的构建过程,并展示了基于四种提示工程技术(ReAct、思维链、自一致性和角色提示)的不同结果。实验结果显示,ReAct和思维链技术表现相似,自一致性技术则呈现相反结果,角色提示技术最为不稳定。研究强调了多角度评估的重要性,并提供了系统实现的详细代码。
41 10
让模型评估模型:构建双代理RAG评估系统的步骤解析
|
23天前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
67 20
剖析 Redis List 消息队列的三种消费线程模型
|
13天前
|
缓存 Java 应用服务中间件
Java虚拟线程探究与性能解析
本文主要介绍了阿里云在Java-虚拟-线程任务中的新进展和技术细节。
|
10天前
|
缓存 负载均衡 Dubbo
Dubbo技术深度解析及其在Java中的实战应用
Dubbo是一款由阿里巴巴开源的高性能、轻量级的Java分布式服务框架,它致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案。
32 6
|
1月前
|
机器学习/深度学习 数据采集 存储
一文读懂蒙特卡洛算法:从概率模拟到机器学习模型优化的全方位解析
蒙特卡洛方法起源于1945年科学家斯坦尼斯劳·乌拉姆对纸牌游戏中概率问题的思考,与约翰·冯·诺依曼共同奠定了该方法的理论基础。该方法通过模拟大量随机场景来近似复杂问题的解,因命名灵感源自蒙特卡洛赌场。如今,蒙特卡洛方法广泛应用于机器学习领域,尤其在超参数调优、贝叶斯滤波等方面表现出色。通过随机采样超参数空间,蒙特卡洛方法能够高效地找到优质组合,适用于处理高维度、非线性问题。本文通过实例展示了蒙特卡洛方法在估算圆周率π和优化机器学习模型中的应用,并对比了其与网格搜索方法的性能。
163 1
|
2月前
|
机器学习/深度学习 数据挖掘
机器学习模型的选择与评估:技术深度解析
【8月更文挑战第21天】机器学习模型的选择与评估是一个复杂而重要的过程。通过深入理解问题、选择合适的评估指标和交叉验证方法,我们可以更准确地评估模型的性能,并选择出最适合当前问题的模型。然而,机器学习领域的发展日新月异,新的模型和评估方法不断涌现。因此,我们需要保持对新技术的学习和关注,不断优化和改进我们的模型选择与评估策略。
|
2月前
|
机器学习/深度学习 自然语言处理 负载均衡
揭秘混合专家(MoE)模型的神秘面纱:算法、系统和应用三大视角全面解析,带你领略深度学习领域的前沿技术!
【8月更文挑战第19天】在深度学习领域,混合专家(Mixture of Experts, MoE)模型通过整合多个小型专家网络的输出以实现高性能。从算法视角,MoE利用门控网络分配输入至专家网络,并通过组合机制集成输出。系统视角下,MoE需考虑并行化、通信开销及负载均衡等优化策略。在应用层面,MoE已成功应用于Google的BERT模型、Facebook的推荐系统及Microsoft的语音识别系统等多个场景。这是一种强有力的工具,能够解决复杂问题并提升效率。
55 2
|
2月前
|
开发者 算法 虚拟化
惊爆!Uno Platform 调试与性能分析终极攻略,从工具运用到代码优化,带你攻克开发难题成就完美应用
【8月更文挑战第31天】在 Uno Platform 中,调试可通过 Visual Studio 设置断点和逐步执行代码实现,同时浏览器开发者工具有助于 Web 版本调试。性能分析则利用 Visual Studio 的性能分析器检查 CPU 和内存使用情况,还可通过记录时间戳进行简单分析。优化性能涉及代码逻辑优化、资源管理和用户界面简化,综合利用平台提供的工具和技术,确保应用高效稳定运行。
39 0
|
2月前
|
机器学习/深度学习 搜索推荐 数据挖掘
【深度解析】超越RMSE和MSE:揭秘更多机器学习模型性能指标,助你成为数据分析高手!
【8月更文挑战第17天】本文探讨机器学习模型评估中的关键性能指标。从均方误差(MSE)和均方根误差(RMSE)入手,这两种指标对较大预测偏差敏感,适用于回归任务。通过示例代码展示如何计算这些指标及其它如平均绝对误差(MAE)和决定系数(R²)。此外,文章还介绍了分类任务中的准确率、精确率、召回率和F1分数,并通过实例说明这些指标的计算方法。最后,强调根据应用场景选择合适的性能指标的重要性。
150 0

推荐镜像

更多
下一篇
无影云桌面