2.5. Netty 的 Handler 组件
无论是服务端代码中自定义的 NettyServerHandler 还是客户端代码中自定义的 NettyClientHandler,都继承于 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter 又继承于 ChannelHandlerAdapter,ChannelHandlerAdapter 又实现了 ChannelHandler:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { ......
public abstract class ChannelHandlerAdapter implements ChannelHandler { ......
因此无论是服务端代码中自定义的 NettyServerHandler 还是客户端代码中自定义的 NettyClientHandler,都可以统称为 ChannelHandler。
Netty 中的 ChannelHandler 的作用是,在当前 ChannelHandler 中处理 IO 事件,并将其传递给 ChannelPipeline 中下一个 ChannelHandler 处理,因此多个 ChannelHandler 形成一个责任链,责任链位于 ChannelPipeline 中。
数据在基于 Netty 的服务器或客户端中的处理流程是:读取数据-->解码数据-->处理数据-->编码数据-->发送数据。其中的每个过程都用得到 ChannelHandler 责任链。
Netty 中的 ChannelHandler 体系如下(第一张图来源于网络):
其中:
- ChannelInboundHandler 用于处理入站 IO 事件
- ChannelOutboundHandler 用于处理出站 IO 事件
- ChannelInboundHandlerAdapter 用于处理入站 IO 事件
- ChannelOutboundHandlerAdapter 用于处理出站 IO 事件
ChannelPipeline 提供了 ChannelHandler 链的容器。以客户端应用程序为例,如果事件的方向是从客户端到服务器的,我们称事件是出站的,那么客户端发送给服务器的数据会通过 Pipeline 中的一系列 ChannelOutboundHandler 进行处理;如果事件的方向是从服务器到客户端的,我们称事件是入站的,那么服务器发送给客户端的数据会通过 Pipeline 中的一系列 ChannelInboundHandler 进行处理。
无论是服务端代码中自定义的 NettyServerHandler 还是客户端代码中自定义的 NettyClientHandler,都继承于 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter 提供的方法如下:
从方法名字可以看出,它们在不同的事件发生后被触发,例如注册 Channel 时执行 channelRegistred()、添加 ChannelHandler 时执行 handlerAdded()、收到入站数据时执行 channelRead()、入站数据读取完毕后执行 channelReadComplete()等等。
2.6. Netty 的 Pipeline 组件
上一节说到,Netty 的 ChannelPipeline,它维护了一个 ChannelHandler 责任链,负责拦截或者处理 inbound(入站)和 outbound(出站)的事件和操作。这一节给出更深层次的描述。
ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个 ChannelHandler 如何相互交互。
每个 Netty Channel 包含了一个 ChannelPipeline(其实 Channel 和 ChannelPipeline 互相引用),而 ChannelPipeline 又维护了一个由 ChannelHandlerContext 构成的双向循环列表,其中的每一个 ChannelHandlerContext 都包含一个 ChannelHandler。(前文描述的时候为了简便,直接说 ChannelPipeline 包含了一个 ChannelHandler 责任链,这里给出完整的细节。)
如下图所示(图片来源于网络):
还记得下面这张图吗?这是上文中基于 Netty 的 Server 程序的调试截图,可以从中看到 ChannelHandlerContext 中包含了哪些成分:
ChannelHandlerContext 除了包含 ChannelHandler 之外,还关联了对应的 Channel 和 Pipeline。可以这么来讲:ChannelHandlerContext、ChannelHandler、Channel、ChannelPipeline 这几个组件之间互相引用,互为各自的属性,你中有我、我中有你。
在处理入站事件的时候,入站事件及数据会从 Pipeline 中的双向链表的头 ChannelHandlerContext 流向尾 ChannelHandlerContext,并依次在其中每个 ChannelInboundHandler(例如解码 Handler)中得到处理;出站事件及数据会从 Pipeline 中的双向链表的尾 ChannelHandlerContext 流向头 ChannelHandlerContext,并依次在其中每个 ChannelOutboundHandler(例如编码 Handler)中得到处理。
2.7. Netty 的 EventLoopGroup 组件
在基于 Netty 的 TCP Server 代码中,包含了两个 EventLoopGroup——bossGroup 和 workerGroup,EventLoopGroup 是一组 EventLoop 的抽象。
追踪 Netty 的 EventLoop 的继承链,可以发现 EventLoop 最终继承于 JUC Executor,因此 EventLoop 本质就是一个 JUC Executor,即线程,JUC Executor 的源码为:
public interface Executor { /** * Executes the given command at some time in the future. */ void execute(Runnable command); }
Netty 为了更好地利用多核 CPU 的性能,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例,Selector 实例监听注册其上的 Channel 的 IO 事件。
EventLoopGroup 含有一个 next 方法,它的作用是按照一定规则从 Group 中选取一个 EventLoop 处理 IO 事件。
在服务端,通常 Boss EventLoopGroup 只包含一个 Boss EventLoop(单线程),该 EventLoop 维护者一个注册了 ServerSocketChannel 的 Selector 实例。该 EventLoop 不断轮询 Selector 得到 OP_ACCEPT 事件(客户端连接事件),然后将接收到的 SocketChannel 交给 Worker EventLoopGroup,Worker EventLoopGroup 会通过 next()方法选取一个 Worker EventLoop 并将这个 SocketChannel 注册到其中的 Selector 上,由这个 Worker EventLoop 负责该 SocketChannel 上后续的 IO 事件处理。整个过程如下图所示:
2.8. Netty 的 TaskQueue
在 Netty 的每一个 NioEventLoop 中都有一个 TaskQueue,设计它的目的是在任务提交的速度大于线程的处理速度的时候起到缓冲作用。或者用于异步地处理 Selector 监听到的 IO 事件。
Netty 中的任务队列有三种使用场景:
1)处理用户程序的自定义普通任务的时候
2)处理用户程序的自定义定时任务的时候
3)非当前 Reactor 线程调用当前 Channel 的各种方法的时候。
对于第一种场景,举个例子,2.4 节的基于 Netty 编写的服务端的 Handler 中,假如 channelRead 方法中执行的过程很耗时,那么以下的阻塞式处理方式无疑会降低当前 NioEventLoop 的并发度:
/** * 当通道有数据可读时执行 * * @param ctx 上下文对象 * @param msg 客户端发送的数据 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 借助休眠模拟耗时操作 Thread.sleep(LONG_TIME); ByteBuf byteBuf = (ByteBuf) msg; System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8)); }
改进方法就是借助任务队列,代码如下:
/** * 当通道有数据可读时执行 * * @param ctx 上下文对象 * @param msg 客户端发送的数据 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 假如这里的处理非常耗时,那么就需要借助任务队列异步执行 final Object finalMsg = msg; // 通过 ctx.channel().eventLoop().execute()将耗时 // 操作放入任务队列异步执行 ctx.channel().eventLoop().execute(new Runnable() { public void run() { // 借助休眠模拟耗时操作 try { Thread.sleep(LONG_TIME); } catch (InterruptedException e) { e.printStackTrace(); } ByteBuf byteBuf = (ByteBuf) finalMsg; System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8)); } }); // 可以继续调用 ctx.channel().eventLoop().execute() // 将更多操作放入队列 System.out.println("return right now."); }
断点跟踪这个函数的执行,可以发现该耗时任务确实被放入的当前 NioEventLoop 的 taskQueue 中了。
对于第二种场景,举个例子,2.4 节的基于 Netty 编写的服务端的 Handler 中,假如 channelRead 方法中执行的过程并不需要立即执行,而是要定时执行,那么代码可以这样写:
/** * 当通道有数据可读时执行 * * @param ctx 上下文对象 * @param msg 客户端发送的数据 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { final Object finalMsg = msg; // 通过 ctx.channel().eventLoop().schedule()将操作 // 放入任务队列定时执行(5min 之后才进行处理) ctx.channel().eventLoop().schedule(new Runnable() { public void run() { ByteBuf byteBuf = (ByteBuf) finalMsg; System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8)); } }, 5, TimeUnit.MINUTES); // 可以继续调用 ctx.channel().eventLoop().schedule() // 将更多操作放入队列 System.out.println("return right now."); }
断点跟踪这个函数的执行,可以发现该定时任务确实被放入的当前 NioEventLoop 的 scheduleTasjQueue 中了。
对于第三种场景,举个例子,比如在基于 Netty 构建的推送系统的业务线程中,要根据用户标识,找到对应的 SocketChannel 引用,然后调用 write 方法向该用户推送消息,这时候就会将这一 write 任务放在任务队列中,write 任务最终被异步消费。这种情形是对前两种情形的应用,且涉及的业务内容太多,不再给出示例代码,读者有兴趣可以自行完成,这里给出以下提示:
2.9. Netty 的 Future 和 Promise
Netty**对使用者提供的多数 IO 接口(即 Netty Channel 中的 IO 方法)**是异步的(即都立即返回一个 Netty Future,而 IO 过程异步进行),因此,调用者调用 IO 操作后是不能直接拿到调用结果的。要想得到 IO 操作结果,可以借助 Netty 的 Future(上面代码中的 ChannelFuture 就继承了 Netty Future,Netty Future 又继承了 JUC Future)查询执行状态、等待执行结果、获取执行结果等,使用过 JUC Future 接口的同学会非常熟悉这个机制,这里不再展开描述了。也可以通过 Netty Future 的 addListener()添加一个回调方法来异步处理 IO 结果,如下:
// 启动客户端去连接服务器端 // 由于 bootstrap.connect()是一个异步操作,因此用.sync()等待 // 这个异步操作完成 final ChannelFuture channelFuture = bootstrap.connect( "127.0.0.1", 8080).sync(); channelFuture.addListener(new ChannelFutureListener() { /** * 回调方法,上面的 bootstrap.connect()操作执行完之后触发 */ public void operationComplete(ChannelFuture future) throws Exception { if (channelFuture.isSuccess()) { System.out.println("client has connected to server!"); // TODO 其他处理 } else { System.out.println("connect to serverfail!"); // TODO 其他处理 } } });
Netty Future 提供的接口有:
注:会有一些资料给出这样的描述:“Netty 中所有的 IO 操作都是异步的”,这显然是错误的。Netty 基于 Java NIO,Java NIO 是同步非阻塞 IO。Netty 基于 Java NIO 做了封装,向使用者提供了异步特性的接口,因此本文说 Netty**对使用者提供的多数 IO 接口(即 Netty Channel 中的 IO 方法)**是异步的。例如在 io.netty.channel.ChannelOutboundInvoker(Netty Channel 的 IO 方法多继承于此)提供的多数 IO 接口都返回 Netty Future:
Promise 是可写的 Future,Future 自身并没有写操作相关的接口,Netty 通过 Promise 对 Future 进行扩展,用于设置 IO 操作的结果。Future 继承了 Future,相关的接口定义如下图所示,相比于上图 Future 的接口,它多出了一些 setXXX 方法:
Netty 发起 IO 写操作的时候,会创建一个新的 Promise 对象,例如调用 ChannelHandlerContext 的 write(Object object)方法时,会创建一个新的 ChannelPromise,相关代码如下:
@Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); } ...... @Override public ChannelPromise newPromise() { return new DefaultChannelPromise(channel(), executor()); } ......
当 IO 操作发生异常或者完成时,通过 Promise.setSuccess()或者 Promise.setFailure()设置结果,并通知所有 Listener。关于 Netty 的 Future/Promise 的工作原理,我将在下一篇文章中进行源码级的解析。
3. 结束语
我想,到此为止,读者再次看到这幅 Netty 的架构图会有不一样的感觉。它变得简洁、生动、优雅,因为你已经熟知了它的细节和运作流程。
参考资料:
- Netty官网文档,https://netty.io/wiki/all-documents.html
- 《Netty权威指南(第一版)》,李林锋
- 《Netty in Action》,Norman Maurer
- 《Scalable IO in Java》,Doug Lea
- 尚硅谷Netty系列教程,韩顺平主讲
最后欢迎大家关注我的公号,加我好友:「GG_Stone」,一起交流,共同进步!