Dubbo线程模型设计解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: 该文章主要介绍了Dubbo线程模型的设计解析,包括Dubbo作为一个支持大量并发请求的网络框架的特点,以及其线程模型的工作原理。

前言

Dubbo是一个支持大量并发请求的网络框架,单机TPS能够达到1w,这种并发处理请求的能力和它的线程模型是分不开的。

在提供者处理请求这一端,Dubbo通过多线程同时处理多个客户端请求。

Dubbo底层是使用netty作为通信组件的,了解Dubbo的线程模型之前我们先了解下Netty的线程模型,在Dubbo中使用的是netty的主从 Reactor 多线程模式, 如下图:

在这种模式中,客户端的连接事件和读写数据的事件由不同的线程处理,一般连接事件使用1个线程处理,读写数据的事件交给线程池处理。在dubbo的源码中可以看出使用的是这种模式。

       @Override
    protected void doOpen() throws Throwable {
   
   
        bootstrap = new ServerBootstrap();
        //处理连接事件的线程
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        //处理读写数据事件的线程池
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();
​
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
   
   
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
   
   
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }

上面讲的是Netty的线程池模型,而在Dubbo中,Dubbo自身框架也有一套线程池模型,它和Netty的线程池模型是有一定关系的。

上图是Dubbo的线程派发流程,在Netty的线程池把请求转发到Dubbo的handler时候,会进行请求分发,这个时候就可能会通过Dubbo自身的线程来处理业务请求了。

Dubbo中的线程派发策略

dubbo总共有5类线程分发器,不同的线程分发器代表不同的线程派发策略,表示哪类消息会使用dubbo自身的线程池处理,默认使用AllDispatcher。

  • all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。默认的
  • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
  • message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • execution 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

比如AllDispatcher分发器使用AllChannelHandler处理消息,这种处理器会使用dubbo自身线程池处理请求,响应,连接,端口,心跳所有的和客户端交互的事件。源码如下:

public class AllChannelHandler extends WrappedChannelHandler {
   
   public AllChannelHandler(ChannelHandler handler, URL url) {
   
   
        super(handler, url);
    }
    //连接事件
    @Override
    public void connected(Channel channel) throws RemotingException {
   
   
        ExecutorService executor = getExecutorService();
        try {
   
   
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
   
   
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
    //断开事件
    @Override
    public void disconnected(Channel channel) throws RemotingException {
   
   
        ExecutorService executor = getExecutorService();
        try {
   
   
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
   
   
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }
    //接收读事件
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
   
   
        ExecutorService executor = getExecutorService();
        try {
   
   
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
   
   
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
          if(message instanceof Request && t instanceof RejectedExecutionException){
   
   
            Request request = (Request)message;
            if(request.isTwoWay()){
   
   
              String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
              Response response = new Response(request.getId(), request.getVersion());
              response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
              response.setErrorMessage(msg);
              channel.send(response);
              return;
            }
          }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
    //异常事件
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
   
   
        ExecutorService executor = getExecutorService();
        try {
   
   
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
   
   
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

Dubbo线程池分类

总共4种线程池,默认使用固定大小的线程池。

  • fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(默认)
  • cached 缓存线程池,空闲一分钟自动删除,需要时重建。
  • limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
  • eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)

如果线程如果满了,dubbo会抛异常,按如下提示打印日志。

总结

本文介绍了netty线程池模型和dubbo线程池模型的关系,也分析了dubbo线程池模型分发的策略,最后对dubbo线程池分类进行了分析总结。

相关文章
|
19天前
|
机器学习/深度学习 人工智能 PyTorch
Transformer模型变长序列优化:解析PyTorch上的FlashAttention2与xFormers
本文探讨了Transformer模型中变长输入序列的优化策略,旨在解决深度学习中常见的计算效率问题。文章首先介绍了批处理变长输入的技术挑战,特别是填充方法导致的资源浪费。随后,提出了多种优化技术,包括动态填充、PyTorch NestedTensors、FlashAttention2和XFormers的memory_efficient_attention。这些技术通过减少冗余计算、优化内存管理和改进计算模式,显著提升了模型的性能。实验结果显示,使用FlashAttention2和无填充策略的组合可以将步骤时间减少至323毫秒,相比未优化版本提升了约2.5倍。
35 3
Transformer模型变长序列优化:解析PyTorch上的FlashAttention2与xFormers
|
20天前
|
缓存 Java 调度
多线程编程核心:上下文切换深度解析
在现代计算机系统中,多线程编程已成为提高程序性能和响应速度的关键技术。然而,多线程编程中一个不可避免的概念就是上下文切换(Context Switching)。本文将深入探讨上下文切换的概念、原因、影响以及优化策略,帮助你在工作和学习中深入理解这一技术干货。
37 10
|
21天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
50 12
|
20天前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
38 4
|
20天前
|
算法 调度 开发者
多线程编程核心:上下文切换深度解析
在多线程编程中,上下文切换是一个至关重要的概念,它直接影响到程序的性能和响应速度。本文将深入探讨上下文切换的含义、原因、影响以及如何优化,帮助你在工作和学习中更好地理解和应用多线程技术。
31 4
|
20天前
|
Java 调度 Android开发
安卓与iOS开发中的线程管理差异解析
在移动应用开发的广阔天地中,安卓和iOS两大平台各自拥有独特的魅力。如同东西方文化的差异,它们在处理多线程任务时也展现出不同的哲学。本文将带你穿梭于这两个平台之间,比较它们在线程管理上的核心理念、实现方式及性能考量,助你成为跨平台的编程高手。
|
24天前
|
存储 缓存 监控
Java中的线程池深度解析####
本文深入探讨了Java并发编程中的核心组件——线程池,从其基本概念、工作原理、核心参数解析到应用场景与最佳实践,全方位剖析了线程池在提升应用性能、资源管理和任务调度方面的重要作用。通过实例演示和性能对比,揭示合理配置线程池对于构建高效Java应用的关键意义。 ####
|
29天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
1月前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
25天前
|
机器学习/深度学习 人工智能 自然语言处理
探索深度学习与自然语言处理的前沿技术:Transformer模型的深度解析
探索深度学习与自然语言处理的前沿技术:Transformer模型的深度解析
76 0

推荐镜像

更多
下一篇
DataWorks