首先我们知道,在NIO网络编程模型中,IO操作直接和channel相关,比如客户端的请求连接,或者向服务端发送数据, 服务端都要从客户端的channel获取这个数据
那么channelPipeline是什么?
其实,这个channelPepiline是Netty增加给原生的channel的组件,在ChannelPipeline
接口的上的注解阐述了channelPipeline
的作用,这个channelPipeline是高级过滤器的实现,netty将chanenl中数据导向channelPipeline,进而给了用户对channel中数据的百分百的控制权, 此外,channelPipeline数据结构是双向链表,每一个节点都是channelContext
,channelContext
里面维护了对应的handler和pipeline的引用, 大概总结一下: 通过chanelPipeline,用户客户轻松的往channel写数据,从channel读数据
创建pipeline#
通过前面几篇博客的追踪,我们知道无论我们是通过反射创建出服务端的channel也好,还是直接new创建客户端的channel也好,随着父类构造函数的逐层调用,最终我们都会在Channel体系的顶级抽象类AbstractChannel
中,创建出Channel的一大组件 channelPipeline
于是我们程序的入口,AbstractChannel
的pipeline = newChannelPipeline();
,跟进去,看到他的源码如下:
protected DefaultChannelPipeline newChannelPipeline() { // todo 跟进去 return new DefaultChannelPipeline(this); }
可以看到,它创建了一个DefaultChannelPipeline(thisChannel)
DefaultChannelPipeline
是channelPipeline的默认实现,他有着举足轻重的作用,我们看一下下面的 Channel
ChannelContext
ChannelPipeline
的继承体系图,我们可以看到图中两个类,其实都很重要,
他们之间有什么关系呢?
当我们看完了DefaultChannelPipeline()
构造中做了什么自然就知道了
// todo 来到这里 protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); // todo 把当前的Channel 保存起来 succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); // todo 这两方法很重要 // todo 设置尾 tail = new TailContext(this); // todo 设置头 head = new HeadContext(this); // todo 双向链表关联 head.next = tail; tail.prev = head; }
主要做了如下几件事:
- 初始化succeededFuture
- 初始化voidPromise
- 创建尾节点
- 创建头节点
- 关联头尾节点
其实,到现在为止,pipiline的初始化已经完成了,我们接着往下看
此外,我们看一下DefaultChannelPipeline
的内部类和方法,如下图()
我们关注我圈出来的几部分
- 两个重要的内部类
- 头结点 HeaderContext
- 尾节点 TailContext
- PendingHandlerAddedTask 添加完handler之后处理的任务
- PendingHandlerCallBack 添加完handler的回调
- PengdingHandlerRemovedTask 移除Handler之后的任务
- 大量的addXXX方法,
final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail;
跟进它的封装方法:
TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } // todo 来到这里 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); // todo 为ChannelContext的pipeline附上值了 this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; }
如下图是HeaderContext和TailContext的声明截图:
我们可以看到,这个tail节点是inbound类型的处理器,一开始确实很纳闷,难道header不应该是Inbound类型的吗?我也不买关子了,直接说为啥
是的,header确实是Inbound类型的处理器, 同时也是出站处理器 (评论区有个老哥说的也很清楚,可以瞅瞅)
因为,对netty来说用发送过来的数据,要就从header节点开始往后传播,怎么传播呢? 因为是双向链表,直接找后一个节点,什么类型的节点呢? inbound类型的,于是数据msg就从header之后的第一个结点往后传播,如果说,一直到最后,都只是传播数据而没有任何处理就会传播到tail节点,因为tail也是inbound类型的, tail节点会替我们释放掉这个msg,防止内存泄露,当然如果我们自己使用了msg,而没往后传播,也没有释放,内存泄露是早晚的时,这就是为啥tail是Inbound类型的, header节点和它相反,在下面说
ok,现在知道了ChannelPipeline的创建了吧
Channelpipeline与ChannelHandler和ChannelHandlerContext之间的关系#
它三者的关系也直接说了, 在上面pipeline
的创建的过程中, DefaultChannelPipeline
中的头尾节点都是ChannelHandlerContext
, 这就意味着, 在pipeline双向链表的结构中,每一个节点都是一个ChannelHandlerContext
, 而且每一个 ChannelHandlerContext
维护一个handler
,这一点不信可以看上图,ChannelHandlerContext
的实现类DefaultChannelHandlerContext
的实现类, 源码如下:
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { // todo Context里面有了 handler的引用 private final ChannelHandler handler; // todo 创建默认的 ChannelHandlerContext, DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler;
ChannelHandlerContext
接口同时继承ChannelOutBoundInvoker
和和ChannelInBoundInvoker
使得他同时拥有了传播入站事件和出站事件的能力, ChannelHandlerContext
把事件传播之后,是谁处理的呢? 当然是handler
下面给出ChannelHandler
的继承体系图,可以看到针对入站出来和出站处理ChannelHandler
有不同的继承分支应对
添加一个新的节点:#
一般我们都是通过ChanelInitialezer
动态的一次性添加多个handler, 下面就去看看,在服务端启动过程中,ServerBootStrap
的init()
,如下源码:解析我写在代码下面
// todo 这是ServerBootStrapt对 他父类初始化 channel的实现, 用于初始化 NioServerSocketChannel @Override void init(Channel channel) throws Exception { // todo ChannelOption 是在配置 Channel 的 ChannelConfig 的信息 final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { // todo 把 NioserverSocketChannel 和 options Map传递进去, 给Channel里面的属性赋值 // todo 这些常量值全是关于和诸如TCP协议相关的信息 setChannelOptions(channel, options, logger); } // todo 再次一波 给Channel里面的属性赋值 attrs0()是获取到用户自定义的业务逻辑属性 -- AttributeKey final Map<AttributeKey<?>, Object> attrs = attrs0(); // todo 这个map中维护的是 程序运行时的 动态的 业务数据 , 可以实现让业务数据随着netty的运行原来存进去的数据还能取出来 synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } // todo------- options attrs : 都可以在创建BootStrap时动态的传递进去 // todo ChannelPipeline 本身 就是一个重要的组件, 他里面是一个一个的处理器, 说他是高级过滤器,交互的数据 会一层一层经过它 // todo 下面直接就调用了 p , 说明,在channel调用pipeline方法之前, pipeline已经被创建出来了!, // todo 到底是什么时候创建出来的 ? 其实是在创建NioServerSocketChannel这个通道对象时,在他的顶级抽象父类(AbstractChannel)中创建了一个默认的pipeline对象 /// todo 补充: ChannelHandlerContext 是 ChannelHandler和Pipeline 交互的桥梁 ChannelPipeline p = channel.pipeline(); // todo workerGroup 处理IO线程 final EventLoopGroup currentChildGroup = childGroup; // todo 我们自己添加的 Initializer final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; // todo 这里是我们在Server类中添加的一些针对新连接channel的属性设置, 这两者属性被acceptor使用到!!! synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } // todo 默认 往NioServerSocketChannel的管道里面添加了一个 ChannelInitializer , // todo ( 后来我们自己添加的ChildHandler 就继承了的这个ChannelInitializer , 而这个就继承了的这个ChannelInitializer 实现了ChannelHandler) p.addLast(new ChannelInitializer<Channel>() { // todo 进入addlast // todo 这个ChannelInitializer 方便我们一次性往pipeline中添加多个处理器 @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); // todo 获取bootStrap的handler 对象, 没有返回空 // todo 这个handler 针对bossgroup的Channel , 给他添加上我们在server类中添加的handler()里面添加处理器 ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // todo ServerBootstrapAcceptor 接收器, 是一个特殊的chanelHandler ch.eventLoop().execute(new Runnable() { @Override public void run() { // todo !!! -- 这个很重要,在ServerBootStrap里面,netty已经为我们生成了接收器 --!!! // todo 专门处理新连接的接入, 把新连接的channel绑定在 workerGroup中的某一条线程上 // todo 用于处理用户的请求, 但是还有没搞明白它是怎么触发执行的 pipeline.addLast(new ServerBootstrapAcceptor( // todo 这些参数是用户自定义的参数 // todo NioServerSocketChannel, worker线程组 处理器 关系的事件 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
这个函数真的是好长,但是我们的重点放在ChannelInitializer
身上, 现在的阶段, 当前的channel还没有注册上EventLoop上的Selector中
还有不是分析怎么添加handler? 怎么来这里了? 其实下面的 ServerBootstrapAcceptor就是一个handler
我们看一下上面的代码做了啥
ch.eventLoop().execute(new Runnable() { @Override public void run() { // todo !!! -- 这个很重要,在ServerBootStrap里面,netty已经为我们生成了接收器 --!!! // todo 专门处理新连接的接入, 把新连接的channel绑定在 workerGroup中的某一条线程上 // todo 用于处理用户的请求, 但是还有没搞明白它是怎么触发执行的 pipeline.addLast(new ServerBootstrapAcceptor( // todo 这些参数是用户自定义的参数 // todo NioServerSocketChannel, worker线程组 处理器 关系的事件 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } });
懵逼不? 当时真的是给我整蒙圈了, 还没有关联上 EventLoop呢!!! 哪来的ch.eventLoop()....
后来整明白了,这其实是一个回调,netty提供给用户在任意时刻都可以往pipeline中添加handler的实现手段
那么在哪里回调呢? 其实是在 jdk原生的channel注册进EventLoop中的Selector后紧接着回调的,源码如下
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // todo 进入这个方法doRegister() // todo 它把系统创建的ServerSocketChannel 注册进了选择器 doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. // todo 确保在 notify the promise前调用 handlerAdded(...) // todo 这是必需的,因为用户可能已经通过ChannelFutureListener中的管道触发了事件。 // todo 如果需要的话,执行HandlerAdded()方法 // todo 正是这个方法, 回调了前面我们添加 Initializer 中添加 Accpter的重要方法 pipeline.invokeHandlerAddedIfNeeded();
回调函数在pipeline.invokeHandlerAddedIfNeeded();
, 看它的命名, 如果需要的话,执行handler已经添加完成了操作 哈哈,我们现在当然需要,刚添加了个ServerBootstrapAcceptor
在跟进入看源码之间,注意,方法是pipeline调用的, 哪个pipeline呢? 就是上面我们说的DefaultChannelPipeline
, ok,跟进源码,进入 DefaultChannelPipeline
// todo 执行handler的添加,如果 需要的话 final void invokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { firstRegistration = false; // todo 现在我们的channel已经注册在bossGroup中的eventLoop上了, 是时候回调执行那些在注册前添加的 handler了 callHandlerAddedForAllHandlers(); } }
调用本类方法callHandlerAddedForAllHandlers();
继续跟进下
// todo 回调原来在没有注册完成之前添加的handler private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered; // This Channel itself was registered. registered = true; pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; // Null out so it can be GC'ed. this.pendingHandlerCallbackHead = null; } PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null) { task.execute(); task = task.next; } }
我们它的动作task.execute();
其中的task是谁? pendingHandlerCallbackHead
这是DefaultChannelPipeline
的内部类, 它的作用就是辅助完成 添加handler之后的回调, 源码如下:
private abstract static class PendingHandlerCallback implements Runnable { final AbstractChannelHandlerContext ctx; PendingHandlerCallback next; PendingHandlerCallback(AbstractChannelHandlerContext ctx) { this.ctx = ctx; } abstract void execute(); }
我们跟进上一步的task.execute()
就会看到它的抽象方法,那么是谁实现的呢? 实现类是PendingHandlerAddedTask
同样是DefaultChannelPipeline
的内部类, 既然不是抽象类了, 就得同时实现他父类PendingHandlerCallback
的抽象方法,其实有两个一是个excute()
另一个是run()
--Runable
我们进入看它是如何实现excute
,源码如下:
@Override void execute() { EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { callHandlerAdded0(ctx); } else { try { executor.execute(this); } catch (RejectedExecutionException e) { if (logger.isWarnEnabled()) { logger.warn( "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", executor, ctx.name(), e); } remove0(ctx); ctx.setRemoved(); } }
HandlerAdded()
的回调时机#
我们往下追踪, 调用类本类方法callHandlerAdded0(ctx);
源码如下:
// todo 重点看看这个方法 , 入参是刚才添加的 Context private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { // todo 在channel关联上handler之后并且把Context添加到了 Pipeline之后进行调用!!! ctx.handler().handlerAdded(ctx); // todo 他是诸多的回调方法中第一个被调用的 ctx.setAddComplete(); // todo 修改状态 } ...
继续往下追踪
- ctx.handler() -- 获取到了当前的channel
- 调用channel的
.handlerAdded(ctx);
这个handlerAdded()
是定义在ChannelHandler中的回调方法, 什么时候回调呢? 当handler添加后回调, 因为我们知道,当服务端的channel在启动时,会通过 channelInitializer 添加那个ServerBootstrapAcceptor
,所以ServerBootstrapAcceptor
的handlerAdded()
的回调时机就在上面代码中的ctx.handler().handlerAdded(ctx);
如果直接点击去这个函数,肯定就是ChannelHandler
接口中去; 那么 新的问题来了,谁是实现类? 答案是抽象类 ChannelInitializer`` 就在上面我们添加ServerBootstrapAcceptor
就创建了一个ChannelInitializer
的匿名对象
它的继承体系图如下:
介绍这个ChannelInitializer
他是Netty提供的辅助类,用于提供针对channel的初始化工作,什么工作呢? 批量初始化channel
这个中有三个重要方法,如下
- 重写的channel的
handlerAdded()
, 这其实也是handlerAdded()
的回调的体现 - 自己的
initChannel()
- 自己的
remove()
继续跟进我们上面的handlerAdded(ChannelHandlerContext ctx)
源码如下:
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { initChannel(ctx); // todo 这个方法在上面, 进入 可以在 finally中 找到移除Initializer的逻辑 } }
调用本类的initChannel(ctx);
源码如下:
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { // todo remove(ctx); 删除 ChannelInitializer remove(ctx); } return true; } return false; }
两个点
- 第一: 继续调用本类的抽象方法
initChannel((C) ctx.channel());
- 第二: 移除了
remove(ctx);
分开进行第一步
initChannel((C) ctx.channel());
初始化channel,这个函数被设计成了抽象的, 问题来了, 实现类是谁? 实现类其实刚才说了,就是netty在添加ServerBootStrapAcceptor
时创建的那个匿名内部类,我们跟进去看他的实现: 源码如下:
@Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); // todo 获取bootStrap的handler 对象, 没有返回空 // todo 这个handler 针对bossgroup的Channel , 给他添加上我们在server类中添加的handler()里面添加处理器 ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // todo ServerBootstrapAcceptor 接收器, 是一个特殊的chanelHandler ch.eventLoop().execute(new Runnable() { @Override public void run() { // todo !!! -- 这个很重要,在ServerBootStrap里面,netty已经为我们生成了接收器 --!!! // todo 专门处理新连接的接入, 把新连接的channel绑定在 workerGroup中的某一条线程上 // todo 用于处理用户的请求, 但是还有没搞明白它是怎么触发执行的 pipeline.addLast(new ServerBootstrapAcceptor( // todo 这些参数是用户自定义的参数 // todo NioServerSocketChannel, worker线程组 处理器 关系的事件 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
实际上就是完成了一次方法的回调,成功添加了ServerBootstrapAcceptor
处理器
删除一个节点#
回来看第二步
remove(ctx);
删除一个节点, 把Initializer
删除了? 是的, 把这个初始化器删除了, 为啥要把它删除呢, 说了好多次, 其实他是一个辅助类, 目的就是通过他往channel中一次性添加多个handler, 现在handler也添加完成了, 留着他也没啥用,直接移除了
我们接着看它的源码
// todo 删除当前ctx 节点 private void remove(ChannelHandlerContext ctx) { try { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } finally { initMap.remove(ctx); } }
从pipeline中移除, 一路看过去,就会发现底层删除链表节点的操作
private static void remove0(AbstractChannelHandlerContext ctx) { AbstractChannelHandlerContext prev = ctx.prev; AbstractChannelHandlerContext next = ctx.next; prev.next = next; next.prev = prev; }