深入理解 Netty-Pipeline组件 (三)

简介: 深入理解 Netty-Pipeline组件 (三)

outbound事件的传播#


什么是outBound事件#


创建的outbound事件如: connect,disconnect,bind,write,flush,read,close,register,deregister, outbound类型事件更多的是服务端主动发起的事件,如给主动channel绑定上端口,主动往channel写数据,主动关闭用户的的连接


开始读源码#


最典型的outbound事件,就是服务端往客户端写数据,准备测试用例如下:


public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println( "hello OutBoundHandlerB");
        ctx.write(ctx, promise);
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        ctx.executor().schedule(()->{
            // todo 模拟给 客户端一个响应
            ctx.channel().write("Hello World");
            // 写法二 :  ctx.write("Hello World");
        },3, TimeUnit.SECONDS);
    }
}
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
    // todo  当服务端的channel绑定上端口之后,就是 传播 channelActive 事件
    // todo   事件传播到下面后,我们手动传播一个 channelRead事件
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println( "hello OutBoundHandlerA");
        ctx.write(ctx, promise);
    }
}
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println( "hello OutBoundHandlerC");
        ctx.write(ctx, promise);
    }
}


下面我们把断点调试,把断点打在OutBoundHandlerBhandlerAdded上, 模拟向客户端发送数据, 启动程序,大概的流程如下

  • 等待服务端的启动
  • 服务端Selector轮询服务端channel可能发生的感兴趣的事件
  • 使用telnet向服务端发送请求
  • 服务端创建客户端的channel,在给客户端的原生的chanenl注册到 Selector上
  • 通过invokeChannelAddedIfNeeded()将我们添加在Initializer中的handler添加到pipeline中
  • 挨个回调这些handler中的channelAdded()方法
  • 和我们添加进去的顺序相反
  • C --> B --->A
  • 这些childHandler,会添加在每一条客户端的channel的pipeline
  • 传播channel注册完成事件
  • 传播channelActive事件
  • readIfAutoRead() 完成二次注册netty可以处理的感兴趣的事件

此外,我们上面的write以定时任务的形式提交,当用ctx中的唯一的线程执行器三秒后去执行任务,所以程序会继续下去绑定端口, 过了三秒后把定时任务聚合到普通任务队列中,那时才会执行我们OutBoundHandlerB中的ctx.channel().write("Hello World");


outBound类型的handler添加顺序和执行顺序有什么关系#


因为Outbound类型的事件是从链表的tail开始传播的,所以执行的顺序和我们的添加进去的顺序相反

篇幅太长了,重写补一张图



ctx.channel().write("Hello World");开始跟源码, 鼠标直接跟进去,进入的是ChannelOutboundInvoker, 往channel中写,我们进入DefaultChannelPipeline的实现,源码如下


@Override
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}


再一次的验证了,出站的事件是从尾部往前传递的, 我们知道,tail节点是DefaultChannelHandlerContext类型的,所以我们看它的write()方法是如何实现的


@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}


其中msg-->我们要写会客户端的内容, newPromise()默认的promise()

,继续跟进本类方法write(msg, newPromise()),源码如下:


@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }
    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    write(msg, false, promise);
    return promise;
}


上面做了很多判断,其中我们只关心write(msg, false, promise); 源码如下:


private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }


我们可以看到,重要的逻辑findContextOutbound(); 它的源码如下, 从尾节点开始遍历链表,找到前一个outbound类型的handler


private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}


找到后,因为我们使用函数是write而不是writeAndFlush所以进入上面的else代码块invokeWrite


private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        write(msg, promise);
    }
}


继续跟进invokeWrite0(msg, promise); 终于看到了handler的write逻辑


private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}


其中:

  • (ChannelOutboundHandler) handler() -- 是tail前面的节点
  • 调用当前节点的write函数

实际上就是回调我们自己的添加的handler的write函数,我们跟进去,源码如下:


public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println( "hello OutBoundHandlerC");
        ctx.write(msg, promise);
    }
}


我们继续调用write, 按照相同的逻辑,msg会继续往前传递

一直传递到HeadContext节点, 因为这个节点也是Outbound类型的, 这就是Outbound事件的传播,我们直接看HeaderContext是如何收尾的, 源码如下:


@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}


Header使用了unsafe类,这没毛病,和数据读写有关的操作,最终都离不开unsafe


为什么Header节点是outBound类型的处理器?#


拿上面的write事件来说,msg经过这么多handler的加工,最终的目的是传递到客户端,所以netty把header设计为outBound类型的节点,由他完成往客户端的写


context.write()与context.channel().write()的区别#


  • context.write(),会从当前的节点开始往前传播
  • context.channel().write() 从尾节点开始依次往前传播


异常的传播#


netty中如果发生了异常的话,异常事件的传播和当前的节点是 入站和出站处理器是没关系的,一直往下一个节点传播,如果一直没有handler处理异常,最终由tail节点处理


最佳的异常处理解决方法#


既然异常的传播和入站和出站类型的处理器没关系,那么我们就在pipeline的最后,也就是tail之前,添加我们的统一异常处理器就好了, 就像下面:


public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
      // todo 异常处理的最佳实践, 最pipeline的最后添加异常处理handler
      channelPipeline.addLast(new myExceptionCaughtHandler());
    }
}
public class myExceptionCaughtHandler extends ChannelInboundHandlerAdapter {
// 最终全部的异常都会来到这里
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    if (cause instanceof 自定义异常1){
    }else if(cause instanceof  自定义异常2){
    }
    // todo 下面不要往下传播了
     // super.exceptionCaught(ctx, cause);
}
}


SimpleChannelInboundHandler 的特点#


通过前面的分析,我们知道如果客户端的msg一味的往后传播,最终会传播到tail节点,由tail节点处理释放,从而避免了内存的泄露

如果我们的handler使用了msg之后没有往后传递就要倒霉了,时间久了就会出现内存泄露的问题

netty人性化的为我们提供的指定泛型的 SimpleChannelInboundHandler ,可以为我们自动的释放内存,我们看他是如何做到的


/ todo 直接继承于ChanelInboundHandlerAdapter的实现 抽象类
// todo 我们自己的处理器, 同样可以继承SimpleChannelInboundHandler适配器,达到相同的效果
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
    private final TypeParameterMatcher matcher;
    private final boolean autoRelease;
    protected SimpleChannelInboundHandler() {
        this(true);
    }
    protected SimpleChannelInboundHandler(boolean autoRelease) {
        matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");
        this.autoRelease = autoRelease;
    }
    protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
        this(inboundMessageType, true);
    }
    protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {
        matcher = TypeParameterMatcher.get(inboundMessageType);
        this.autoRelease = autoRelease;
    }
    public boolean acceptInboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }
    // todo  channelRead 完全被改写了
    // todo 这其实又是一种设计模式 ,   模板方法设计模式
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                // todo 把消息进行了强转
                I imsg = (I) msg;
                //  todo channelRead0()在他的父类中是抽象的,因此我们自己写handler时,需要重写它的这个抽象的 方法 , 在下面
                // todo 这其实又是一种设计模式 ,   模板方法设计模式
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {// todo 对msg的计数减一, 表示对消息的引用减一. 也就意味着我们不要在任何
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
    protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
}


  • 它本身是抽象类,抽象方法是channelRead0,意味着我们需要重写这个方法
  • 他继承了ChannelInboundHandlerAdapter 这是个适配器类,使他可以仅实现部分自己需要的方法就ok

我们看它实现的channelRead, 模板方法设计模式 主要做了如下三件事

  • 将msg 强转成特定的泛型类型的数据
  • 将ctx和msg传递给自己的chanenlRead0使用msg和ctx(ctx,msg)
  • chanenlRead0使用msg和ctx
  • 在finally代码块中,将msg释放
相关文章
|
Java
由浅入深Netty组件实战3
由浅入深Netty组件实战3
61 0
|
前端开发 算法 Java
由浅入深Netty组件实战2
由浅入深Netty组件实战2
224 0
|
缓存 安全 Java
由浅入深Netty基础知识NIO三大组件原理实战 2
由浅入深Netty基础知识NIO三大组件原理实战
74 0
|
Java
由浅入深Netty基础知识NIO三大组件原理实战 1
由浅入深Netty基础知识NIO三大组件原理实战
101 0
|
前端开发 安全 Java
由浅入深Netty组件实战1
由浅入深Netty组件实战1
85 0
|
6月前
|
设计模式 前端开发 网络协议
面试官:说说Netty的核心组件?
Netty 核心组件是指 Netty 在执行过程中所涉及到的重要概念,这些核心组件共同组成了 Netty 框架,使 Netty 框架能够正常的运行。 Netty 核心组件包含以下内容: 1. 启动器 Bootstrap/ServerBootstrap 2. 事件循环器 EventLoopGroup/EventLoop 3. 通道 Channel 4. 通道处理器 ChannelHandler 5. 通道管道 ChannelPipeline 这些组件的交互流程如下: ![image.png](https://cdn.nlark.com/yuque/0/2024/png/92791/1716
41 0
面试官:说说Netty的核心组件?
|
6月前
|
前端开发 Java 网络安全
【Netty 网络通信】Netty 核心组件
【1月更文挑战第9天】【Netty 网络通信】Netty 核心组件
|
6月前
|
前端开发 网络协议 Java
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
341 0
|
6月前
|
网络协议 前端开发 Java
Netty Review - 核心组件扫盲
Netty Review - 核心组件扫盲
99 0
|
编解码 前端开发 Java
源码分析Netty:核心组件及启动过程分析
本篇从实例出发,了解Netty核心组件的概念、作用及串联过程。从概念到设计原理,再到深入了解实现细节,从而能够清晰地掌握Netty的技术细节甚至存在的问题,才能最终更好地支持我们实际的各项业务。
351 0