inbound事件的传播#
什么是inbound事件#
inbound事件其实就是客户端主动发起事件,比如说客户端请求连接,连接后客户端有主动的给服务端发送需要处理的有效数据等,只要是客户端主动发起的事件,都算是Inbound事件,特征就是事件触发类型,当channel处于某个节点,触发服务端传播哪些动作
netty如何对待inbound#
netty为了更好的处理channel中的数据,给jdk原生的channel添加了pipeline组件,netty会把原生jdk的channel中的数据导向这个pipeline,从pipeline中的header开始 往下传播, 用户对这个过程拥有百分百的控制权,可以把数据拿出来处理, 也可以往下传播,一直传播到tail节点,tail节点会进行回收,如果在传播的过程中,最终没到尾节点,自己也没回收,就会面临内存泄露的问题
一句话总结,面对Inbound的数据, 被动传播
netty知道客户端发送过来的数据是啥类型吗?#
比如一个聊天程序,客户端可能发送的是心跳包,也可能发送的是聊天的内容,netty又不是人,他是不知道数据是啥的,他只知道来了数据,需要进一步处理,怎么处理呢? 把数据导向用户指定的handler链条
开始读源码#
这里书接上一篇博客的尾部,事件的传播
重点步骤如下
第一步: 等待服务端启动完成
第二步: 使用telnet模拟发送请求 --- > 新连接的接入逻辑
第三步: register0(ChannelPromise promise)
方法中会传播channel激活事件 --> 目的是二次注册端口,
第三个也是我们程序的入手点: fireChannelActive()
源码如下:
@Override public final ChannelPipeline fireChannelActive() { // todo ChannelActive从head开始传播 AbstractChannelHandlerContext.invokeChannelActive(head); return this; }
调用了AbstractChannelHandlerContext
的invokeChannelActive
方法
在这里,我觉得特别有必须有必要告诉自己
AbstractChannelHandlerContext
的重要性,现在的DefaultChannelPipeline
中的每一个节点,包括header,tail,以及我们自己的添加的,都是AbstractChannelHandlerContext
类型,事件的传播围绕着AbstractChannelHandlerContext
的方法开始,温习它的继承体系如下图
接着回到AbstractChannelHandlerContext.invokeChannelActive(head);
, 很显然,这是个静态方法, 跟进去,源码如下:
// todo 来这 static void invokeChannelActive(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); ... }
- 第一点: inbound类型的事件是从header开始传播的 , next --> HeaderContext
- 第二点: HeaderContext其实就是
AbstractChannelHandlerContext
类型的,所以invokeChannelActive()
其实是当前类的方法
ok,跟进入看看他干啥了,源码:
// todo 使Channel活跃 private void invokeChannelActive() { // todo 继续进去 ((ChannelInboundHandler) handler()).channelActive(this); }
我们看, 上面的代码做了如下几件事
- handler() -- 返回当前的 handler, 就是从HandlerContext中拿出handler
- 强转成
ChannelInboundHandler
类型的,因为他是InBound类型的处理器
如果我们用鼠标点击channelActive(this)
, 毫无疑问会进入ChannelInboundHandler
,看到的是抽象方法
那么问题来了, 谁实现的它?
其实是headerContext
头结点做的, 之前说过,Inbound事件,是从header开始传播的,继续跟进去, 看源码:
// todo 来到这里, 分两步, 1. 把ChannelActive事件继续往下传播, 传播结束之后,做第二件事 // todo 2. readIfIsAutoRead(); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // todo fireChannelActive是在做了实际的端口绑定之后才触发回调 ctx.fireChannelActive(); // todo 默认方式注册一个read事件 // todo 跟进去, readIfIsAutoRead 作用是 把已经注册进selector上的事件, 重新注册绑定上在初始化NioServerSocketChannel时添加的Accept事件 // todo 目的是 新连接到来时, selector轮询到accept事件, 使得netty可以进一步的处理这个事件 readIfIsAutoRead(); }
其实这里有两种重要的事情 , 上面我们也看到了:
- 向下传播
channelActive()
目的是让header后面的用户添加的handler中的channelActive()
被回调 readIfIsAutoRead();
就是去注册Netty能看懂的感兴趣的事件
下面我们看它的事件往下传播, 于是重新回到了AbstractChannelHandlerContext
, 源码如下:
public ChannelHandlerContext fireChannelActive() { invokeChannelActive(findContextInbound()); return this; }
findContextInbound()
找出下一个Inbound类型的处理器, 我们去看他的实现,源码如下:
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
是不是明明白白的? 从当前节点开始,往后变量整个链表, 下一个节点是谁呢? 在新链接接入的逻辑中,调用的ChannelInitializer
我手动 批量添加了三个InboundHandler,按照我添加的顺序,他们会依次被找到
继续跟进本类方法invokeChannelActive(findContextInbound())
,源码如下
// todo 来这 static void invokeChannelActive(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); ...
一开始的next--> HeaderContext
现在的 next就是header之后,我手动添加的Inbound的handler
同样是调用本类方法invokeChannelActive()
,源码如下:
// todo 使Channel活跃 private void invokeChannelActive() { // todo 继续进去 ((ChannelInboundHandler) handler()).channelActive(this);
再次看到,回调, 我添加的handler.channelActive(this);
,进入查看
public class MyServerHandlerA extends ChannelInboundHandlerAdapter { // todo 当服务端的channel绑定上端口之后,就是 传播 channelActive 事件 // todo 事件传播到下面后,我们手动传播一个 channelRead事件 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().pipeline().fireChannelRead("hello MyServerHandlerA"); }
在我处理器中,继续往下传播手动添加的数据"hello MyServerHandlerA"
同样她会按找上面的顺序依次传播下去
最终她会来到tail , 在tail做了如下的工作, 源码如下
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // todo channelRead onUnhandledInboundMessage(msg); } protected void onUnhandledInboundException(Throwable cause) { try { logger.warn( "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + "It usually means the last handler in the pipeline did not handle the exception.", cause); } finally { ReferenceCountUtil.release(cause); } }
为什么Tail节点是 Inbound 类型的处理器?#
上一步就说明了为什么Tail为什么设计成Inbound, channel中的数据,无论服务端有没有使用,最终都要被释放掉,tail可以做到收尾的工作, 清理内存