45 张图深度解析 Netty 架构与原理(五)

简介: 作为一个学 Java 的,如果没有研究过 Netty,那么你对 Java 语言的使用和理解仅仅停留在表面水平,会点 SSH 写几个 MVC,访问数据库和缓存,这些只是初等 Java 程序员干的事。如果你要进阶,想了解 Java 服务器的深层高阶知识,Netty 绝对是一个必须要过的门槛。 接下来我们会学习一个 Netty 系列教程,Netty 系列由「架构与原理」,「源码」,「架构」三部分组成,今天我们先来看看第一部分:Netty 架构与原理初探,大纲如下:

2.5. Netty 的 Handler 组件

无论是服务端代码中自定义的 NettyServerHandler 还是客户端代码中自定义的 NettyClientHandler,都继承于 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter 又继承于 ChannelHandlerAdapter,ChannelHandlerAdapter 又实现了 ChannelHandler:

public class ChannelInboundHandlerAdapter 
    extends ChannelHandlerAdapter 
    implements ChannelInboundHandler {
    ......
public abstract class ChannelHandlerAdapter 
    implements ChannelHandler {
    ......

因此无论是服务端代码中自定义的 NettyServerHandler 还是客户端代码中自定义的 NettyClientHandler,都可以统称为 ChannelHandler。

Netty 中的 ChannelHandler 的作用是,在当前 ChannelHandler 中处理 IO 事件,并将其传递给 ChannelPipeline 中下一个 ChannelHandler 处理,因此多个 ChannelHandler 形成一个责任链,责任链位于 ChannelPipeline 中。

数据在基于 Netty 的服务器或客户端中的处理流程是:读取数据-->解码数据-->处理数据-->编码数据-->发送数据。其中的每个过程都用得到 ChannelHandler 责任链。

125.jpg

Netty 中的 ChannelHandler 体系如下(第一张图来源于网络):

126.jpg127.jpg

其中:

  • ChannelInboundHandler 用于处理入站 IO 事件
  • ChannelOutboundHandler 用于处理出站 IO 事件
  • ChannelInboundHandlerAdapter 用于处理入站 IO 事件
  • ChannelOutboundHandlerAdapter 用于处理出站 IO 事件

ChannelPipeline 提供了 ChannelHandler 链的容器。以客户端应用程序为例,如果事件的方向是从客户端到服务器的,我们称事件是出站的,那么客户端发送给服务器的数据会通过 Pipeline 中的一系列 ChannelOutboundHandler 进行处理;如果事件的方向是从服务器到客户端的,我们称事件是入站的,那么服务器发送给客户端的数据会通过 Pipeline 中的一系列 ChannelInboundHandler 进行处理。

128.jpg

无论是服务端代码中自定义的 NettyServerHandler 还是客户端代码中自定义的 NettyClientHandler,都继承于 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter 提供的方法如下:

129.jpg

从方法名字可以看出,它们在不同的事件发生后被触发,例如注册 Channel 时执行 channelRegistred()、添加 ChannelHandler 时执行 handlerAdded()、收到入站数据时执行 channelRead()、入站数据读取完毕后执行 channelReadComplete()等等。

2.6. Netty 的 Pipeline 组件

上一节说到,Netty 的 ChannelPipeline,它维护了一个 ChannelHandler 责任链,负责拦截或者处理 inbound(入站)和 outbound(出站)的事件和操作。这一节给出更深层次的描述。

ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个 ChannelHandler 如何相互交互。

每个 Netty Channel 包含了一个 ChannelPipeline(其实 Channel 和 ChannelPipeline 互相引用),而 ChannelPipeline 又维护了一个由 ChannelHandlerContext 构成的双向循环列表,其中的每一个 ChannelHandlerContext 都包含一个 ChannelHandler。(前文描述的时候为了简便,直接说 ChannelPipeline 包含了一个 ChannelHandler 责任链,这里给出完整的细节。)

如下图所示(图片来源于网络):

image.gif130.jpg

还记得下面这张图吗?这是上文中基于 Netty 的 Server 程序的调试截图,可以从中看到 ChannelHandlerContext 中包含了哪些成分:

131.jpg

ChannelHandlerContext 除了包含 ChannelHandler 之外,还关联了对应的 Channel 和 Pipeline。可以这么来讲:ChannelHandlerContext、ChannelHandler、Channel、ChannelPipeline 这几个组件之间互相引用,互为各自的属性,你中有我、我中有你。

在处理入站事件的时候,入站事件及数据会从 Pipeline 中的双向链表的头 ChannelHandlerContext 流向尾 ChannelHandlerContext,并依次在其中每个 ChannelInboundHandler(例如解码 Handler)中得到处理;出站事件及数据会从 Pipeline 中的双向链表的尾 ChannelHandlerContext 流向头 ChannelHandlerContext,并依次在其中每个 ChannelOutboundHandler(例如编码 Handler)中得到处理。

132.jpg

2.7. Netty 的 EventLoopGroup 组件

在基于 Netty 的 TCP Server 代码中,包含了两个 EventLoopGroup——bossGroup 和 workerGroup,EventLoopGroup 是一组 EventLoop 的抽象。

追踪 Netty 的 EventLoop 的继承链,可以发现 EventLoop 最终继承于 JUC Executor,因此 EventLoop 本质就是一个 JUC Executor,即线程,JUC Executor 的源码为:

public interface Executor {
    /**
     * Executes the given command at some time in the future.
     */
    void execute(Runnable command);
}

Netty 为了更好地利用多核 CPU 的性能,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例,Selector 实例监听注册其上的 Channel 的 IO 事件。

EventLoopGroup 含有一个 next 方法,它的作用是按照一定规则从 Group 中选取一个 EventLoop 处理 IO 事件。

在服务端,通常 Boss EventLoopGroup 只包含一个 Boss EventLoop(单线程),该 EventLoop 维护者一个注册了 ServerSocketChannel 的 Selector 实例。该 EventLoop 不断轮询 Selector 得到 OP_ACCEPT 事件(客户端连接事件),然后将接收到的 SocketChannel 交给 Worker EventLoopGroup,Worker EventLoopGroup 会通过 next()方法选取一个 Worker EventLoop 并将这个 SocketChannel 注册到其中的 Selector 上,由这个 Worker EventLoop 负责该 SocketChannel 上后续的 IO 事件处理。整个过程如下图所示:

133.jpg

2.8. Netty 的 TaskQueue

在 Netty 的每一个 NioEventLoop 中都有一个 TaskQueue,设计它的目的是在任务提交的速度大于线程的处理速度的时候起到缓冲作用。或者用于异步地处理 Selector 监听到的 IO 事件。

134.jpg

Netty 中的任务队列有三种使用场景:

1)处理用户程序的自定义普通任务的时候

2)处理用户程序的自定义定时任务的时候

3)非当前 Reactor 线程调用当前 Channel 的各种方法的时候。

对于第一种场景,举个例子,2.4 节的基于 Netty 编写的服务端的 Handler 中,假如 channelRead 方法中执行的过程很耗时,那么以下的阻塞式处理方式无疑会降低当前 NioEventLoop 的并发度:

/**
 * 当通道有数据可读时执行
 *
 * @param ctx 上下文对象
 * @param msg 客户端发送的数据
 * @throws Exception
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
    // 借助休眠模拟耗时操作
    Thread.sleep(LONG_TIME);
    ByteBuf byteBuf = (ByteBuf) msg;
    System.out.println("data from client: "
            + byteBuf.toString(CharsetUtil.UTF_8));
}

改进方法就是借助任务队列,代码如下:

/**
 * 当通道有数据可读时执行
 *
 * @param ctx 上下文对象
 * @param msg 客户端发送的数据
 * @throws Exception
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
    // 假如这里的处理非常耗时,那么就需要借助任务队列异步执行
    final Object finalMsg = msg;
    // 通过 ctx.channel().eventLoop().execute()将耗时
    // 操作放入任务队列异步执行
    ctx.channel().eventLoop().execute(new Runnable() {
        public void run() {
            // 借助休眠模拟耗时操作
            try {
                Thread.sleep(LONG_TIME);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ByteBuf byteBuf = (ByteBuf) finalMsg;
            System.out.println("data from client: "
                    + byteBuf.toString(CharsetUtil.UTF_8));
        }
    });
    // 可以继续调用 ctx.channel().eventLoop().execute()
    // 将更多操作放入队列
    System.out.println("return right now.");
}

断点跟踪这个函数的执行,可以发现该耗时任务确实被放入的当前 NioEventLoop 的 taskQueue 中了。

135.jpg

对于第二种场景,举个例子,2.4 节的基于 Netty 编写的服务端的 Handler 中,假如 channelRead 方法中执行的过程并不需要立即执行,而是要定时执行,那么代码可以这样写:

/**
 * 当通道有数据可读时执行
 *
 * @param ctx 上下文对象
 * @param msg 客户端发送的数据
 * @throws Exception
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
    final Object finalMsg = msg;
    // 通过 ctx.channel().eventLoop().schedule()将操作
    // 放入任务队列定时执行(5min 之后才进行处理)
    ctx.channel().eventLoop().schedule(new Runnable() {
        public void run() {
            ByteBuf byteBuf = (ByteBuf) finalMsg;
            System.out.println("data from client: "
                    + byteBuf.toString(CharsetUtil.UTF_8));
        }
    }, 5, TimeUnit.MINUTES);
    // 可以继续调用 ctx.channel().eventLoop().schedule()
    // 将更多操作放入队列
    System.out.println("return right now.");
}

断点跟踪这个函数的执行,可以发现该定时任务确实被放入的当前 NioEventLoop 的 scheduleTasjQueue 中了。

136.png

对于第三种场景,举个例子,比如在基于 Netty 构建的推送系统的业务线程中,要根据用户标识,找到对应的 SocketChannel 引用,然后调用 write 方法向该用户推送消息,这时候就会将这一 write 任务放在任务队列中,write 任务最终被异步消费。这种情形是对前两种情形的应用,且涉及的业务内容太多,不再给出示例代码,读者有兴趣可以自行完成,这里给出以下提示:

137.jpg

2.9. Netty 的 Future 和 Promise

Netty**对使用者提供的多数 IO 接口(即 Netty Channel 中的 IO 方法)**是异步的(即都立即返回一个 Netty Future,而 IO 过程异步进行),因此,调用者调用 IO 操作后是不能直接拿到调用结果的。要想得到 IO 操作结果,可以借助 Netty 的 Future(上面代码中的 ChannelFuture 就继承了 Netty Future,Netty Future 又继承了 JUC Future)查询执行状态、等待执行结果、获取执行结果等,使用过 JUC Future 接口的同学会非常熟悉这个机制,这里不再展开描述了。也可以通过 Netty Future 的 addListener()添加一个回调方法来异步处理 IO 结果,如下:

// 启动客户端去连接服务器端
// 由于 bootstrap.connect()是一个异步操作,因此用.sync()等待
// 这个异步操作完成
final ChannelFuture channelFuture = bootstrap.connect(
        "127.0.0.1",
        8080).sync();
channelFuture.addListener(new ChannelFutureListener() {
    /**
     * 回调方法,上面的 bootstrap.connect()操作执行完之后触发
     */
    public void operationComplete(ChannelFuture future)
            throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("client has connected to server!");
            // TODO 其他处理
        } else {
            System.out.println("connect to serverfail!");
            // TODO 其他处理
        }
    }
});

Netty Future 提供的接口有:

138.jpg

注:会有一些资料给出这样的描述:“Netty 中所有的 IO 操作都是异步的”,这显然是错误的。Netty 基于 Java NIO,Java NIO 是同步非阻塞 IO。Netty 基于 Java NIO 做了封装,向使用者提供了异步特性的接口,因此本文说 Netty**对使用者提供的多数 IO 接口(即 Netty Channel 中的 IO 方法)**是异步的。例如在 io.netty.channel.ChannelOutboundInvoker(Netty Channel 的 IO 方法多继承于此)提供的多数 IO 接口都返回 Netty Future:

139.jpg

Promise 是可写的 Future,Future 自身并没有写操作相关的接口,Netty 通过 Promise 对 Future 进行扩展,用于设置 IO 操作的结果。Future 继承了 Future,相关的接口定义如下图所示,相比于上图 Future 的接口,它多出了一些 setXXX 方法:

140.jpg

Netty 发起 IO 写操作的时候,会创建一个新的 Promise 对象,例如调用 ChannelHandlerContext 的 write(Object object)方法时,会创建一个新的 ChannelPromise,相关代码如下:

@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}
......
@Override
public ChannelPromise newPromise() {
    return new DefaultChannelPromise(channel(), executor());
}
......

当 IO 操作发生异常或者完成时,通过 Promise.setSuccess()或者 Promise.setFailure()设置结果,并通知所有 Listener。关于 Netty 的 Future/Promise 的工作原理,我将在下一篇文章中进行源码级的解析。

3. 结束语

我想,到此为止,读者再次看到这幅 Netty 的架构图会有不一样的感觉。它变得简洁、生动、优雅,因为你已经熟知了它的细节和运作流程。

141.jpg


参考资料:

  1. Netty官网文档,https://netty.io/wiki/all-documents.html
  2. 《Netty权威指南(第一版)》,李林锋
  3. 《Netty in Action》,Norman Maurer
  4. 《Scalable IO in Java》,Doug Lea
  5. 尚硅谷Netty系列教程,韩顺平主讲

最后欢迎大家关注我的公号,加我好友:「GG_Stone」,一起交流,共同进步!

相关文章
|
2天前
|
缓存 自然语言处理 JavaScript
万字长文深度解析JDK序列化原理及Fury高度兼容的极致性能实现
Fury是一个基于JIT动态编译的高性能多语言原生序列化框架,支持Java/Python/Golang/C++/JavaScript等语言,提供全自动的对象多语言/跨语言序列化能力,以及相比于别的框架最高20~200倍的性能。
|
8天前
|
存储 芯片
【期末不挂科-单片机考前速过系列P11】(第十一章:15题速过串行口的工作原理和应用)经典例题盘点(带图解析)
【期末不挂科-单片机考前速过系列P11】(第十一章:15题速过串行口的工作原理和应用)经典例题盘点(带图解析)
【期末不挂科-单片机考前速过系列P10】(第十章:11题中断系统的工作原理及应用)经典例题盘点(带图解析)
【期末不挂科-单片机考前速过系列P10】(第十章:11题中断系统的工作原理及应用)经典例题盘点(带图解析)
|
8天前
|
C语言 C++
【期末不挂科-单片机考前速过系列P1】(第一章:27题搞定单片机&其工作原理)经典例题盘点【选择题&判断题&填空题】(带图解析)
【期末不挂科-单片机考前速过系列P1】(第一章:27题搞定单片机&其工作原理)经典例题盘点【选择题&判断题&填空题】(带图解析)
|
8天前
|
负载均衡 Java 开发者
Spring Cloud:一文读懂其原理与架构
Spring Cloud 是一套微服务解决方案,它整合了Netflix公司的多个开源框架,简化了分布式系统开发。Spring Cloud 提供了服务注册与发现、配置中心、消息总线、负载均衡、熔断机制等工具,让开发者可以快速地构建一些常见的微服务架构。
|
8天前
|
前端开发 测试技术 数据处理
安卓开发中的MVP架构模式深度解析
【4月更文挑战第30天】在移动应用开发领域,模型-视图-呈现器(Model-View-Presenter, MVP)是一种广泛采用的架构模式。它旨在通过解耦组件间的直接交互来提高代码的可维护性和可测试性。本文将深入探讨MVP在安卓开发中的应用,揭示其如何促进代码的模块化,提升用户界面的响应性,并简化单元测试过程。我们将从理论概念出发,逐步过渡到实践案例,为读者提供一套行之有效的MVP实施策略。
|
9天前
|
JavaScript 前端开发 算法
vue生命周期函数原理解析,vue阻止事件冒泡方法实现
vue生命周期函数原理解析,vue阻止事件冒泡方法实现
|
9天前
|
芯片
EDA设计:原理、实践与代码深度解析
EDA设计:原理、实践与代码深度解析
24 2
|
9天前
|
算法 计算机视觉 Python
DSP技术深度解析:原理、实践与应用
DSP技术深度解析:原理、实践与应用
19 1
|
11天前
|
机器学习/深度学习 人工智能 算法
AI作画原理及相关理论解析
本文探讨了AI作画,特别是深度学习技术如何驱动这一艺术形式的发展。AI作画基于卷积神经网络(CNN),通过学习艺术作品风格和内容生成新作品。流程包括数据收集、模型训练、风格迁移和后处理。文章介绍了风格迁移理论,包括内容损失和风格损失,以及生成对抗网络(GAN)的基本概念。提供的代码示例展示了使用TensorFlow和Keras实现风格迁移的简化过程。为了优化结果,可以调整优化器、权重参数、模型选择及图像处理技术。

推荐镜像

更多