深入理解 Netty-Pipeline组件 (二)

简介: 深入理解 Netty-Pipeline组件 (二)

inbound事件的传播#


什么是inbound事件#


inbound事件其实就是客户端主动发起事件,比如说客户端请求连接,连接后客户端有主动的给服务端发送需要处理的有效数据等,只要是客户端主动发起的事件,都算是Inbound事件,特征就是事件触发类型,当channel处于某个节点,触发服务端传播哪些动作


netty如何对待inbound#


netty为了更好的处理channel中的数据,给jdk原生的channel添加了pipeline组件,netty会把原生jdk的channel中的数据导向这个pipeline,从pipeline中的header开始 往下传播, 用户对这个过程拥有百分百的控制权,可以把数据拿出来处理, 也可以往下传播,一直传播到tail节点,tail节点会进行回收,如果在传播的过程中,最终没到尾节点,自己也没回收,就会面临内存泄露的问题

一句话总结,面对Inbound的数据, 被动传播


netty知道客户端发送过来的数据是啥类型吗?#


比如一个聊天程序,客户端可能发送的是心跳包,也可能发送的是聊天的内容,netty又不是人,他是不知道数据是啥的,他只知道来了数据,需要进一步处理,怎么处理呢? 把数据导向用户指定的handler链条


开始读源码#


这里书接上一篇博客的尾部,事件的传播

重点步骤如下

第一步: 等待服务端启动完成

第二步: 使用telnet模拟发送请求 --- > 新连接的接入逻辑

第三步: register0(ChannelPromise promise)方法中会传播channel激活事件 --> 目的是二次注册端口,

第三个也是我们程序的入手点: fireChannelActive() 源码如下:


@Override
public final ChannelPipeline fireChannelActive() {
    // todo ChannelActive从head开始传播
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}


调用了AbstractChannelHandlerContextinvokeChannelActive方法


在这里,我觉得特别有必须有必要告诉自己 AbstractChannelHandlerContext的重要性,现在的DefaultChannelPipeline中的每一个节点,包括header,tail,以及我们自己的添加的,都是AbstractChannelHandlerContext类型,事件的传播围绕着AbstractChannelHandlerContext的方法开始,温习它的继承体系如下图


接着回到AbstractChannelHandlerContext.invokeChannelActive(head); , 很显然,这是个静态方法, 跟进去,源码如下:


// todo 来这
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
    next.invokeChannelActive();
...
}


  • 第一点: inbound类型的事件是从header开始传播的 , next --> HeaderContext
  • 第二点: HeaderContext其实就是 AbstractChannelHandlerContext类型的,所以 invokeChannelActive()其实是当前类的方法

ok,跟进入看看他干啥了,源码:


// todo 使Channel活跃
private void invokeChannelActive() {
// todo 继续进去
((ChannelInboundHandler) handler()).channelActive(this);
}


我们看, 上面的代码做了如下几件事

  • handler() -- 返回当前的 handler, 就是从HandlerContext中拿出handler
  • 强转成ChannelInboundHandler类型的,因为他是InBound类型的处理器

如果我们用鼠标点击channelActive(this), 毫无疑问会进入ChannelInboundHandler,看到的是抽象方法

那么问题来了, 谁实现的它?

其实是headerContext 头结点做的, 之前说过,Inbound事件,是从header开始传播的,继续跟进去, 看源码:


// todo 来到这里, 分两步, 1. 把ChannelActive事件继续往下传播, 传播结束之后,做第二件事
// todo                  2.     readIfIsAutoRead();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// todo  fireChannelActive是在做了实际的端口绑定之后才触发回调
ctx.fireChannelActive();
// todo 默认方式注册一个read事件
// todo 跟进去, readIfIsAutoRead 作用是 把已经注册进selector上的事件, 重新注册绑定上在初始化NioServerSocketChannel时添加的Accept事件
// todo 目的是 新连接到来时, selector轮询到accept事件, 使得netty可以进一步的处理这个事件
readIfIsAutoRead();
}


其实这里有两种重要的事情 , 上面我们也看到了:

  • 向下传播channelActive() 目的是让header后面的用户添加的handler中的channelActive()被回调
  • readIfIsAutoRead(); 就是去注册Netty能看懂的感兴趣的事件

下面我们看它的事件往下传播, 于是重新回到了AbstractChannelHandlerContext, 源码如下:


public ChannelHandlerContext fireChannelActive() {
        invokeChannelActive(findContextInbound());
        return this;
    }


  • findContextInbound()找出下一个Inbound类型的处理器, 我们去看他的实现,源码如下:


private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}


是不是明明白白的? 从当前节点开始,往后变量整个链表, 下一个节点是谁呢? 在新链接接入的逻辑中,调用的ChannelInitializer我手动 批量添加了三个InboundHandler,按照我添加的顺序,他们会依次被找到

继续跟进本类方法invokeChannelActive(findContextInbound()),源码如下


// todo 来这
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
...


一开始的next--> HeaderContext

现在的 next就是header之后,我手动添加的Inbound的handler

同样是调用本类方法invokeChannelActive(),源码如下:


// todo 使Channel活跃
private void invokeChannelActive() {
    // todo 继续进去
    ((ChannelInboundHandler) handler()).channelActive(this);


再次看到,回调, 我添加的handler.channelActive(this); ,进入查看


public class MyServerHandlerA extends ChannelInboundHandlerAdapter {
// todo  当服务端的channel绑定上端口之后,就是 传播 channelActive 事件
// todo   事件传播到下面后,我们手动传播一个 channelRead事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.channel().pipeline().fireChannelRead("hello MyServerHandlerA");
}


在我处理器中,继续往下传播手动添加的数据"hello MyServerHandlerA"

同样她会按找上面的顺序依次传播下去

最终她会来到tail , 在tail做了如下的工作, 源码如下


@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // todo channelRead
    onUnhandledInboundMessage(msg);
}
protected void onUnhandledInboundException(Throwable cause) {
try {
    logger.warn(
            "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                    "It usually means the last handler in the pipeline did not handle the exception.",
            cause);
} finally {
    ReferenceCountUtil.release(cause);
}
}


为什么Tail节点是 Inbound 类型的处理器?#


上一步就说明了为什么Tail为什么设计成Inbound, channel中的数据,无论服务端有没有使用,最终都要被释放掉,tail可以做到收尾的工作, 清理内存


相关文章
|
7月前
|
监控 前端开发 网络协议
Netty核心组件详解
Netty核心组件详解
33 0
|
8月前
|
前端开发 Java 调度
Netty中有哪些核心组件?
最近又有粉丝问我这样一个问题,说Netty中最核心的组件有哪些?它们都起什么作用?今天,给大家详细聊一聊
53 0
|
9月前
|
编解码 前端开发 网络协议
Netty实战(三)Netty的组件和设计
对于 Channel、EventLoop 和 ChannelFuture 类进行的讨论
80 0
|
11月前
|
安全 前端开发 Java
Netty组件
Netty组件
38 0
|
设计模式 前端开发 测试技术
深入理解 Netty-Pipeline组件 (三)
深入理解 Netty-Pipeline组件 (三)
114 0
|
消息中间件 分布式计算 前端开发
03、Netty学习笔记—(Netty组件学习)
03、Netty学习笔记—(Netty组件学习)
03、Netty学习笔记—(Netty组件学习)
|
Java
深入理解 Netty-Pipeline组件 (一)
深入理解 Netty-Pipeline组件 (一)
171 0
|
存储 前端开发 Java
【Netty】Netty核心组件介绍
前篇博文体验了Netty的第一个示例,下面接着学习Netty的组件和其设计。
252 0
【Netty】Netty核心组件介绍
【Netty】Netty 核心组件 ( Pipeline | ChannelPipeline )
【Netty】Netty 核心组件 ( Pipeline | ChannelPipeline )
149 0
【Netty】Netty 核心组件 ( Pipeline | ChannelPipeline )