45 张图深度解析 Netty 架构与原理(五)

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 作为一个学 Java 的,如果没有研究过 Netty,那么你对 Java 语言的使用和理解仅仅停留在表面水平,会点 SSH 写几个 MVC,访问数据库和缓存,这些只是初等 Java 程序员干的事。如果你要进阶,想了解 Java 服务器的深层高阶知识,Netty 绝对是一个必须要过的门槛。 接下来我们会学习一个 Netty 系列教程,Netty 系列由「架构与原理」,「源码」,「架构」三部分组成,今天我们先来看看第一部分:Netty 架构与原理初探,大纲如下:

2.5. Netty 的 Handler 组件

无论是服务端代码中自定义的 NettyServerHandler 还是客户端代码中自定义的 NettyClientHandler,都继承于 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter 又继承于 ChannelHandlerAdapter,ChannelHandlerAdapter 又实现了 ChannelHandler:

public class ChannelInboundHandlerAdapter 
    extends ChannelHandlerAdapter 
    implements ChannelInboundHandler {
    ......
public abstract class ChannelHandlerAdapter 
    implements ChannelHandler {
    ......

因此无论是服务端代码中自定义的 NettyServerHandler 还是客户端代码中自定义的 NettyClientHandler,都可以统称为 ChannelHandler。

Netty 中的 ChannelHandler 的作用是,在当前 ChannelHandler 中处理 IO 事件,并将其传递给 ChannelPipeline 中下一个 ChannelHandler 处理,因此多个 ChannelHandler 形成一个责任链,责任链位于 ChannelPipeline 中。

数据在基于 Netty 的服务器或客户端中的处理流程是:读取数据-->解码数据-->处理数据-->编码数据-->发送数据。其中的每个过程都用得到 ChannelHandler 责任链。

125.jpg

Netty 中的 ChannelHandler 体系如下(第一张图来源于网络):

126.jpg127.jpg

其中:

  • ChannelInboundHandler 用于处理入站 IO 事件
  • ChannelOutboundHandler 用于处理出站 IO 事件
  • ChannelInboundHandlerAdapter 用于处理入站 IO 事件
  • ChannelOutboundHandlerAdapter 用于处理出站 IO 事件

ChannelPipeline 提供了 ChannelHandler 链的容器。以客户端应用程序为例,如果事件的方向是从客户端到服务器的,我们称事件是出站的,那么客户端发送给服务器的数据会通过 Pipeline 中的一系列 ChannelOutboundHandler 进行处理;如果事件的方向是从服务器到客户端的,我们称事件是入站的,那么服务器发送给客户端的数据会通过 Pipeline 中的一系列 ChannelInboundHandler 进行处理。

128.jpg

无论是服务端代码中自定义的 NettyServerHandler 还是客户端代码中自定义的 NettyClientHandler,都继承于 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter 提供的方法如下:

129.jpg

从方法名字可以看出,它们在不同的事件发生后被触发,例如注册 Channel 时执行 channelRegistred()、添加 ChannelHandler 时执行 handlerAdded()、收到入站数据时执行 channelRead()、入站数据读取完毕后执行 channelReadComplete()等等。

2.6. Netty 的 Pipeline 组件

上一节说到,Netty 的 ChannelPipeline,它维护了一个 ChannelHandler 责任链,负责拦截或者处理 inbound(入站)和 outbound(出站)的事件和操作。这一节给出更深层次的描述。

ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个 ChannelHandler 如何相互交互。

每个 Netty Channel 包含了一个 ChannelPipeline(其实 Channel 和 ChannelPipeline 互相引用),而 ChannelPipeline 又维护了一个由 ChannelHandlerContext 构成的双向循环列表,其中的每一个 ChannelHandlerContext 都包含一个 ChannelHandler。(前文描述的时候为了简便,直接说 ChannelPipeline 包含了一个 ChannelHandler 责任链,这里给出完整的细节。)

如下图所示(图片来源于网络):

image.gif130.jpg

还记得下面这张图吗?这是上文中基于 Netty 的 Server 程序的调试截图,可以从中看到 ChannelHandlerContext 中包含了哪些成分:

131.jpg

ChannelHandlerContext 除了包含 ChannelHandler 之外,还关联了对应的 Channel 和 Pipeline。可以这么来讲:ChannelHandlerContext、ChannelHandler、Channel、ChannelPipeline 这几个组件之间互相引用,互为各自的属性,你中有我、我中有你。

在处理入站事件的时候,入站事件及数据会从 Pipeline 中的双向链表的头 ChannelHandlerContext 流向尾 ChannelHandlerContext,并依次在其中每个 ChannelInboundHandler(例如解码 Handler)中得到处理;出站事件及数据会从 Pipeline 中的双向链表的尾 ChannelHandlerContext 流向头 ChannelHandlerContext,并依次在其中每个 ChannelOutboundHandler(例如编码 Handler)中得到处理。

132.jpg

2.7. Netty 的 EventLoopGroup 组件

在基于 Netty 的 TCP Server 代码中,包含了两个 EventLoopGroup——bossGroup 和 workerGroup,EventLoopGroup 是一组 EventLoop 的抽象。

追踪 Netty 的 EventLoop 的继承链,可以发现 EventLoop 最终继承于 JUC Executor,因此 EventLoop 本质就是一个 JUC Executor,即线程,JUC Executor 的源码为:

public interface Executor {
    /**
     * Executes the given command at some time in the future.
     */
    void execute(Runnable command);
}

Netty 为了更好地利用多核 CPU 的性能,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例,Selector 实例监听注册其上的 Channel 的 IO 事件。

EventLoopGroup 含有一个 next 方法,它的作用是按照一定规则从 Group 中选取一个 EventLoop 处理 IO 事件。

在服务端,通常 Boss EventLoopGroup 只包含一个 Boss EventLoop(单线程),该 EventLoop 维护者一个注册了 ServerSocketChannel 的 Selector 实例。该 EventLoop 不断轮询 Selector 得到 OP_ACCEPT 事件(客户端连接事件),然后将接收到的 SocketChannel 交给 Worker EventLoopGroup,Worker EventLoopGroup 会通过 next()方法选取一个 Worker EventLoop 并将这个 SocketChannel 注册到其中的 Selector 上,由这个 Worker EventLoop 负责该 SocketChannel 上后续的 IO 事件处理。整个过程如下图所示:

133.jpg

2.8. Netty 的 TaskQueue

在 Netty 的每一个 NioEventLoop 中都有一个 TaskQueue,设计它的目的是在任务提交的速度大于线程的处理速度的时候起到缓冲作用。或者用于异步地处理 Selector 监听到的 IO 事件。

134.jpg

Netty 中的任务队列有三种使用场景:

1)处理用户程序的自定义普通任务的时候

2)处理用户程序的自定义定时任务的时候

3)非当前 Reactor 线程调用当前 Channel 的各种方法的时候。

对于第一种场景,举个例子,2.4 节的基于 Netty 编写的服务端的 Handler 中,假如 channelRead 方法中执行的过程很耗时,那么以下的阻塞式处理方式无疑会降低当前 NioEventLoop 的并发度:

/**
 * 当通道有数据可读时执行
 *
 * @param ctx 上下文对象
 * @param msg 客户端发送的数据
 * @throws Exception
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
    // 借助休眠模拟耗时操作
    Thread.sleep(LONG_TIME);
    ByteBuf byteBuf = (ByteBuf) msg;
    System.out.println("data from client: "
            + byteBuf.toString(CharsetUtil.UTF_8));
}

改进方法就是借助任务队列,代码如下:

/**
 * 当通道有数据可读时执行
 *
 * @param ctx 上下文对象
 * @param msg 客户端发送的数据
 * @throws Exception
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
    // 假如这里的处理非常耗时,那么就需要借助任务队列异步执行
    final Object finalMsg = msg;
    // 通过 ctx.channel().eventLoop().execute()将耗时
    // 操作放入任务队列异步执行
    ctx.channel().eventLoop().execute(new Runnable() {
        public void run() {
            // 借助休眠模拟耗时操作
            try {
                Thread.sleep(LONG_TIME);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ByteBuf byteBuf = (ByteBuf) finalMsg;
            System.out.println("data from client: "
                    + byteBuf.toString(CharsetUtil.UTF_8));
        }
    });
    // 可以继续调用 ctx.channel().eventLoop().execute()
    // 将更多操作放入队列
    System.out.println("return right now.");
}

断点跟踪这个函数的执行,可以发现该耗时任务确实被放入的当前 NioEventLoop 的 taskQueue 中了。

135.jpg

对于第二种场景,举个例子,2.4 节的基于 Netty 编写的服务端的 Handler 中,假如 channelRead 方法中执行的过程并不需要立即执行,而是要定时执行,那么代码可以这样写:

/**
 * 当通道有数据可读时执行
 *
 * @param ctx 上下文对象
 * @param msg 客户端发送的数据
 * @throws Exception
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
    final Object finalMsg = msg;
    // 通过 ctx.channel().eventLoop().schedule()将操作
    // 放入任务队列定时执行(5min 之后才进行处理)
    ctx.channel().eventLoop().schedule(new Runnable() {
        public void run() {
            ByteBuf byteBuf = (ByteBuf) finalMsg;
            System.out.println("data from client: "
                    + byteBuf.toString(CharsetUtil.UTF_8));
        }
    }, 5, TimeUnit.MINUTES);
    // 可以继续调用 ctx.channel().eventLoop().schedule()
    // 将更多操作放入队列
    System.out.println("return right now.");
}

断点跟踪这个函数的执行,可以发现该定时任务确实被放入的当前 NioEventLoop 的 scheduleTasjQueue 中了。

136.png

对于第三种场景,举个例子,比如在基于 Netty 构建的推送系统的业务线程中,要根据用户标识,找到对应的 SocketChannel 引用,然后调用 write 方法向该用户推送消息,这时候就会将这一 write 任务放在任务队列中,write 任务最终被异步消费。这种情形是对前两种情形的应用,且涉及的业务内容太多,不再给出示例代码,读者有兴趣可以自行完成,这里给出以下提示:

137.jpg

2.9. Netty 的 Future 和 Promise

Netty**对使用者提供的多数 IO 接口(即 Netty Channel 中的 IO 方法)**是异步的(即都立即返回一个 Netty Future,而 IO 过程异步进行),因此,调用者调用 IO 操作后是不能直接拿到调用结果的。要想得到 IO 操作结果,可以借助 Netty 的 Future(上面代码中的 ChannelFuture 就继承了 Netty Future,Netty Future 又继承了 JUC Future)查询执行状态、等待执行结果、获取执行结果等,使用过 JUC Future 接口的同学会非常熟悉这个机制,这里不再展开描述了。也可以通过 Netty Future 的 addListener()添加一个回调方法来异步处理 IO 结果,如下:

// 启动客户端去连接服务器端
// 由于 bootstrap.connect()是一个异步操作,因此用.sync()等待
// 这个异步操作完成
final ChannelFuture channelFuture = bootstrap.connect(
        "127.0.0.1",
        8080).sync();
channelFuture.addListener(new ChannelFutureListener() {
    /**
     * 回调方法,上面的 bootstrap.connect()操作执行完之后触发
     */
    public void operationComplete(ChannelFuture future)
            throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("client has connected to server!");
            // TODO 其他处理
        } else {
            System.out.println("connect to serverfail!");
            // TODO 其他处理
        }
    }
});

Netty Future 提供的接口有:

138.jpg

注:会有一些资料给出这样的描述:“Netty 中所有的 IO 操作都是异步的”,这显然是错误的。Netty 基于 Java NIO,Java NIO 是同步非阻塞 IO。Netty 基于 Java NIO 做了封装,向使用者提供了异步特性的接口,因此本文说 Netty**对使用者提供的多数 IO 接口(即 Netty Channel 中的 IO 方法)**是异步的。例如在 io.netty.channel.ChannelOutboundInvoker(Netty Channel 的 IO 方法多继承于此)提供的多数 IO 接口都返回 Netty Future:

139.jpg

Promise 是可写的 Future,Future 自身并没有写操作相关的接口,Netty 通过 Promise 对 Future 进行扩展,用于设置 IO 操作的结果。Future 继承了 Future,相关的接口定义如下图所示,相比于上图 Future 的接口,它多出了一些 setXXX 方法:

140.jpg

Netty 发起 IO 写操作的时候,会创建一个新的 Promise 对象,例如调用 ChannelHandlerContext 的 write(Object object)方法时,会创建一个新的 ChannelPromise,相关代码如下:

@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}
......
@Override
public ChannelPromise newPromise() {
    return new DefaultChannelPromise(channel(), executor());
}
......

当 IO 操作发生异常或者完成时,通过 Promise.setSuccess()或者 Promise.setFailure()设置结果,并通知所有 Listener。关于 Netty 的 Future/Promise 的工作原理,我将在下一篇文章中进行源码级的解析。

3. 结束语

我想,到此为止,读者再次看到这幅 Netty 的架构图会有不一样的感觉。它变得简洁、生动、优雅,因为你已经熟知了它的细节和运作流程。

141.jpg


参考资料:

  1. Netty官网文档,https://netty.io/wiki/all-documents.html
  2. 《Netty权威指南(第一版)》,李林锋
  3. 《Netty in Action》,Norman Maurer
  4. 《Scalable IO in Java》,Doug Lea
  5. 尚硅谷Netty系列教程,韩顺平主讲

最后欢迎大家关注我的公号,加我好友:「GG_Stone」,一起交流,共同进步!

相关文章
|
8天前
|
设计模式 Java API
微服务架构演变与架构设计深度解析
【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
41 6
|
8天前
|
设计模式 Java API
微服务架构演变与架构设计深度解析
【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
24 1
|
8天前
|
Kubernetes Cloud Native 云计算
云原生技术深度解析:重塑企业IT架构的未来####
本文深入探讨了云原生技术的核心理念、关键技术组件及其对企业IT架构转型的深远影响。通过剖析Kubernetes、微服务、容器化等核心技术,本文揭示了云原生如何提升应用的灵活性、可扩展性和可维护性,助力企业在数字化转型中保持领先地位。 ####
|
9天前
|
运维 Kubernetes Cloud Native
Kubernetes云原生架构深度解析与实践指南####
本文深入探讨了Kubernetes作为领先的云原生应用编排平台,其设计理念、核心组件及高级特性。通过剖析Kubernetes的工作原理,结合具体案例分析,为读者呈现如何在实际项目中高效部署、管理和扩展容器化应用的策略与技巧。文章还涵盖了服务发现、负载均衡、配置管理、自动化伸缩等关键议题,旨在帮助开发者和运维人员掌握利用Kubernetes构建健壮、可伸缩的云原生生态系统的能力。 ####
|
13天前
|
机器学习/深度学习 人工智能 自然语言处理
医疗行业的语音识别技术解析:AI多模态能力平台的应用与架构
AI多模态能力平台通过语音识别技术,实现实时转录医患对话,自动生成结构化数据,提高医疗效率。平台具备强大的环境降噪、语音分离及自然语言处理能力,支持与医院系统无缝集成,广泛应用于门诊记录、多学科会诊和急诊场景,显著提升工作效率和数据准确性。
|
3天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
14 2
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
66 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
52 0
|
1月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
60 0
|
1月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
80 0

推荐镜像

更多