ChannelHandler

简介: ChannelHandlernetty中的ChannelHandler用于处理Channel对应的事件,每一个ChannelHandler都会和一个channel绑定,ChannelHandler体系如下:ChannelHandler接口里面只定义了三个生命周期方法: void ha...

ChannelHandler

netty中的ChannelHandler用于处理Channel对应的事件,每一个ChannelHandler都会和一个channel绑定,ChannelHandler体系如下:

image

ChannelHandler接口里面只定义了三个生命周期方法:

    void handlerAdded(ChannelHandlerContext var1) throws Exception;

    void handlerRemoved(ChannelHandlerContext var1) throws Exception;

    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;

子接口ChannelInboundHandler和ChannelOutboundHandler对ChannelHandler进行了扩展,但netty框架提供了ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter和ChannelDuplexHandler三个适配类,实际使用时继承这些适配类即可。

此外还有简化类SimpleChannelInboundHandler,继承SimpleChannelInboundHandler类后,会在接收到数据后⾃动release掉数据占⽤的Bytebuffer资源,并且继承该类需要指定数据格式。⽽继承ChannelInboundHandlerAdapter则不会⾃动释放Bytebuffer资源,需要⼿动调⽤ReferenceCountUtil.release()等⽅法进⾏释放,并且继承该类不需要指定数据格式。

实际编写server端时,需要继承ChannelInboundHandlerAdapter,防⽌数据未处理完就⾃动释放了。此外server端可能有多个客户端连接,并且每⼀个客户端请求的数据格式都不⼀致,相比之下ChannelInboundHandlerAdapter更灵活。

客户端根据情况可以继承SimpleChannelInboundHandler类。好处是直接指定好传输的数据格式,就不需要再进⾏格式的转换了。

ChannelHandler中的三个生命周期方法分别对应如下场景:

当前ChannelHander加入ChannelHandlerContext中;

当前从ChannelHandlerContext中移除;

ChannelHandler回调方法出现异常时被回调.

ChannelInboundHandler和ChannelOutboundHandler

区别主要在于ChannelInboundHandler的channelRead和channelReadComplete回调和ChannelOutboundHandler的write和flush回调上,ChannelInboundHandler的channelRead回调负责执行入栈数据的decode逻辑,ChannelOutboundHandler的write负责执行出站数据的encode工作。

ChannelInboundHandler

ChannelInboundHandler定义了如下回调方法:

    void channelRegistered(ChannelHandlerContext var1) throws Exception;

    void channelUnregistered(ChannelHandlerContext var1) throws Exception;

    void channelActive(ChannelHandlerContext var1) throws Exception;

    void channelInactive(ChannelHandlerContext var1) throws Exception;

    void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;

    void channelReadComplete(ChannelHandlerContext var1) throws Exception;

    void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;

    void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;

    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;

其回调时机为:

    channelRegistered 当前channel注册到EventLoop;
    
    channelUnregistered 当前channel从EventLoop取消注册;
    
    channelActive 当前channel激活的时候;
    
    channelInactive 当前channel失活的时候;
    
    channelRead 当前channel从远端读取到数据;
    
    channelReadComplete channel read消费完读取的数据的时候被触发;
    
    userEventTriggered 用户事件触发的时候;
    
    channelWritabilityChanged channel的写状态变化的时候触发。

ChannelHandlerContext作为参数,在每个回调事件处理完成之后,使用ChannelHandlerContext的fireChannelXXX方法来传递给pipeline中下一个ChannelHandler,netty的codec模块和业务处理代码分离就用到了这个链路处理。

ChannelOutboundHandler

ChannelOutboundHandler定义了如下回调方法:

    void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;

    void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;

    void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void read(ChannelHandlerContext var1) throws Exception;

    void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;

    void flush(ChannelHandlerContext var1) throws Exception;

回调方法触发时机:

    bind bind操作执行前触发;
    
    connect connect 操作执行前触发;
    
    disconnect disconnect 操作执行前触发;
    
    close close操作执行前触发;
    
    deregister deregister操作执行前触发;
    
    read read操作执行前触发;
    
    write write操作执行前触发;
    
    flush flush操作执行前触发;

对于ChannelPromise这个参数,可以调用它的addListener注册监听,当回调方法所对应的操作完成后,会触发这个监听下面的代码。

同样添加监听器的还有ChannelFuture,而ChannelFuture也是ChannelPromise的父接口:

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
    ...
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> var1);
    ...
}    

例如:

ctx.writeAndFlush(toFullHttpResponse()).addListener(ChannelFutureListener.CLOSE);

ChannelFutureListener.CLOSE这个监听器就会在writeAndFlush完成之后被调用来关闭channel:

    ChannelFutureListener CLOSE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };

ChannelHandlerContext

当ChannelHandler加入到ChannelPipeline的时候,会创建一个对应的ChannelHandlerContext并绑定,ChannelPipeline实际维护的是和ChannelHandlerContext的关系,例如在DefaultChannelPipeline:

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
}

DefaultChannelPipeline会保存第一个ChannelHandlerContext以及最后一个ChannelHandlerContext的引用。

而AbstractChannelHandlerContext中维护了next和prev指针:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    ...
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
}

这样ChannelHandlerContext之间形成了双向链表。

ChannelPipeline

在Channel创建的时候,会同时创建ChannelPipeline:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    ...
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        this.id = this.newId();
        this.unsafe = this.newUnsafe();
        this.pipeline = this.newChannelPipeline();
    }
    
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
}

在ChannelPipeline中也会持有Channel的引用,ChannelPipeline会维护一个ChannelHandlerContext的双向链表,链表的头尾有默认实现:

public class DefaultChannelPipeline implements ChannelPipeline {

    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    private final Channel channel;

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
        this.voidPromise = new VoidChannelPromise(channel, true);
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }
}

我们添加的自定义ChannelHandler会插入到head和tail之间,以addLast为例:

    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return this.addLast((EventExecutorGroup)null, name, handler);
    }

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            ...
            this.addLast0(newCtx);
            ...
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = this.tail.prev;
        newCtx.prev = prev;
        newCtx.next = this.tail;
        prev.next = newCtx;
        this.tail.prev = newCtx;
    }

如果是ChannelInboundHandler的回调,根据插入的顺序从head向tail进行链式调用,ChannelOutboundHandler则相反:
image

值得注意的是,整条链路的调用需要通过Channel接口直接触发,如果使用ChannelContextHandler的接口方法间接触发,链路会从该ChannelContextHandler对应的ChannelHandler开始,而不是从头或尾开始。

ChannelPipeline入口在NioEventLoop的processSelectedKey():

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...
        try {
            ...
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {...}
    }

当为OP_READ事件时,调用unsafe.read():

        @Override
        public final void read() {
            final ChannelPipeline pipeline = pipeline();
            ...
            try {
                do {
                    ...
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                    ....
                } while (allocHandle.continueReading());
            } catch (Throwable t) {...}
        }

pipeline.fireChannelRead(byteBuf)是入栈入口,实际上是pipeline中ChannelHandlerContext的head节点进行fireChannelRead的同语义操作。

ChannelPipeline相关元素之间的关系如下:

image

1. 每个Channel都会绑定且只绑定一个ChannelPipeline,ChannelPipeline中也会持有Channel的引用;

2. ChannelPipeline持有ChannelHandlerContext链路;

3. 每个ChannelHandlerContext对应一个ChannelHandler;

4. ChannelHandlerContext同时也会持有ChannelPipeline引用,也就间接持有Channel引用;

5. ChannelHandler链路会根据Handler的类型,分为InBound和OutBound两条链路,inbound处理入栈数据,outbound处理出栈数据。
目录
相关文章
|
前端开发 JavaScript
Jupyter Notebook自动补全代码配置
Jupyter Notebook自动补全代码配置
2125 0
Jupyter Notebook自动补全代码配置
|
存储 JSON 关系型数据库
基于GeoTools的GeoJson导入到PostGis实战
GeoJson是一种对各种地理数据结构进行编码的格式,基于json的地理空间信息数据交换格式。GeoJson对象可以用来表示几何,特征或者特征集合。支持地理点、线、面、多点、多线、多面及几何集合。GeoJson不是本文的重点,因此不再赘述。
2096 0
基于GeoTools的GeoJson导入到PostGis实战
|
4月前
|
存储 算法
飞桨x昇腾生态适配方案:09_Paddle转ONNX
本节主要介绍如何将 PP-OCRv4 模型转化为 ONNX 模型,包括环境准备、模型下载、训练模型转 inference 模型及最终转为 ONNX 格式的过程。首先需安装 Paddle2ONNX 和 ONNXRuntime,接着下载并解压训练模型。通过 `export_model.py` 脚本将训练模型转化为 inference 模型,生成包含结构和参数的文件。最后使用 Paddle2ONNX 工具完成到 ONNX 格式的转换,并可选地使用 onnxslim 进行模型优化。各步骤均提供详细命令与参数说明,便于实际操作与部署。
221 9
|
安全 Java
java线程之List集合并发安全问题及解决方案
java线程之List集合并发安全问题及解决方案
1280 1
|
Rust API Go
API 网关 OpenID Connect 实战:单点登录(SSO)如此简单
单点登录(SSO)可解决用户在多系统间频繁登录的问题,OIDC 因其标准化、简单易用及安全性等优势成为实现 SSO 的优选方案,本文通过具体步骤示例对 Higress 中开源的 OIDC Wasm 插件进行了介绍,帮助用户零代码实现 SSO 单点登录。
1364 114
|
10月前
|
安全 API 开发者
微信开发者工具里面没有企业微信模式
企业微信与普通微信在应用场景和开发体系上存在本质区别,主要体现在身份认证、功能丰富性和开放能力等方面。企业微信开发需使用特定的API和工具,本文介绍了企业微信开发的基本步骤、特点及开发进度安排,帮助开发者更好地理解和应用企业微信的开发环境。
bootstrap+fileinput插件实现可预览上传照片功能
bootstrap+fileinput插件实现可预览上传照片功能
246 0
|
JavaScript 前端开发 搜索推荐
推荐5款免费、开箱即用的Vue后台管理系统模板
推荐5款免费、开箱即用的Vue后台管理系统模板
628 1
|
XML JavaScript 前端开发
vue实战——图标,请使用SVG!(含插件vue-svg-icon的使用)
vue实战——图标,请使用SVG!(含插件vue-svg-icon的使用)
846 1

热门文章

最新文章