ChannelHandler

简介: ChannelHandlernetty中的ChannelHandler用于处理Channel对应的事件,每一个ChannelHandler都会和一个channel绑定,ChannelHandler体系如下:ChannelHandler接口里面只定义了三个生命周期方法: void ha...

ChannelHandler

netty中的ChannelHandler用于处理Channel对应的事件,每一个ChannelHandler都会和一个channel绑定,ChannelHandler体系如下:

image

ChannelHandler接口里面只定义了三个生命周期方法:

    void handlerAdded(ChannelHandlerContext var1) throws Exception;

    void handlerRemoved(ChannelHandlerContext var1) throws Exception;

    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;

子接口ChannelInboundHandler和ChannelOutboundHandler对ChannelHandler进行了扩展,但netty框架提供了ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter和ChannelDuplexHandler三个适配类,实际使用时继承这些适配类即可。

此外还有简化类SimpleChannelInboundHandler,继承SimpleChannelInboundHandler类后,会在接收到数据后⾃动release掉数据占⽤的Bytebuffer资源,并且继承该类需要指定数据格式。⽽继承ChannelInboundHandlerAdapter则不会⾃动释放Bytebuffer资源,需要⼿动调⽤ReferenceCountUtil.release()等⽅法进⾏释放,并且继承该类不需要指定数据格式。

实际编写server端时,需要继承ChannelInboundHandlerAdapter,防⽌数据未处理完就⾃动释放了。此外server端可能有多个客户端连接,并且每⼀个客户端请求的数据格式都不⼀致,相比之下ChannelInboundHandlerAdapter更灵活。

客户端根据情况可以继承SimpleChannelInboundHandler类。好处是直接指定好传输的数据格式,就不需要再进⾏格式的转换了。

ChannelHandler中的三个生命周期方法分别对应如下场景:

当前ChannelHander加入ChannelHandlerContext中;

当前从ChannelHandlerContext中移除;

ChannelHandler回调方法出现异常时被回调.

ChannelInboundHandler和ChannelOutboundHandler

区别主要在于ChannelInboundHandler的channelRead和channelReadComplete回调和ChannelOutboundHandler的write和flush回调上,ChannelInboundHandler的channelRead回调负责执行入栈数据的decode逻辑,ChannelOutboundHandler的write负责执行出站数据的encode工作。

ChannelInboundHandler

ChannelInboundHandler定义了如下回调方法:

    void channelRegistered(ChannelHandlerContext var1) throws Exception;

    void channelUnregistered(ChannelHandlerContext var1) throws Exception;

    void channelActive(ChannelHandlerContext var1) throws Exception;

    void channelInactive(ChannelHandlerContext var1) throws Exception;

    void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;

    void channelReadComplete(ChannelHandlerContext var1) throws Exception;

    void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;

    void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;

    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;

其回调时机为:

    channelRegistered 当前channel注册到EventLoop;
    
    channelUnregistered 当前channel从EventLoop取消注册;
    
    channelActive 当前channel激活的时候;
    
    channelInactive 当前channel失活的时候;
    
    channelRead 当前channel从远端读取到数据;
    
    channelReadComplete channel read消费完读取的数据的时候被触发;
    
    userEventTriggered 用户事件触发的时候;
    
    channelWritabilityChanged channel的写状态变化的时候触发。

ChannelHandlerContext作为参数,在每个回调事件处理完成之后,使用ChannelHandlerContext的fireChannelXXX方法来传递给pipeline中下一个ChannelHandler,netty的codec模块和业务处理代码分离就用到了这个链路处理。

ChannelOutboundHandler

ChannelOutboundHandler定义了如下回调方法:

    void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;

    void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;

    void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void read(ChannelHandlerContext var1) throws Exception;

    void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;

    void flush(ChannelHandlerContext var1) throws Exception;

回调方法触发时机:

    bind bind操作执行前触发;
    
    connect connect 操作执行前触发;
    
    disconnect disconnect 操作执行前触发;
    
    close close操作执行前触发;
    
    deregister deregister操作执行前触发;
    
    read read操作执行前触发;
    
    write write操作执行前触发;
    
    flush flush操作执行前触发;

对于ChannelPromise这个参数,可以调用它的addListener注册监听,当回调方法所对应的操作完成后,会触发这个监听下面的代码。

同样添加监听器的还有ChannelFuture,而ChannelFuture也是ChannelPromise的父接口:

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
    ...
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> var1);
    ...
}    

例如:

ctx.writeAndFlush(toFullHttpResponse()).addListener(ChannelFutureListener.CLOSE);

ChannelFutureListener.CLOSE这个监听器就会在writeAndFlush完成之后被调用来关闭channel:

    ChannelFutureListener CLOSE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };

ChannelHandlerContext

当ChannelHandler加入到ChannelPipeline的时候,会创建一个对应的ChannelHandlerContext并绑定,ChannelPipeline实际维护的是和ChannelHandlerContext的关系,例如在DefaultChannelPipeline:

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
}

DefaultChannelPipeline会保存第一个ChannelHandlerContext以及最后一个ChannelHandlerContext的引用。

而AbstractChannelHandlerContext中维护了next和prev指针:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    ...
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
}

这样ChannelHandlerContext之间形成了双向链表。

ChannelPipeline

在Channel创建的时候,会同时创建ChannelPipeline:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    ...
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        this.id = this.newId();
        this.unsafe = this.newUnsafe();
        this.pipeline = this.newChannelPipeline();
    }
    
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
}

在ChannelPipeline中也会持有Channel的引用,ChannelPipeline会维护一个ChannelHandlerContext的双向链表,链表的头尾有默认实现:

public class DefaultChannelPipeline implements ChannelPipeline {

    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    private final Channel channel;

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
        this.voidPromise = new VoidChannelPromise(channel, true);
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }
}

我们添加的自定义ChannelHandler会插入到head和tail之间,以addLast为例:

    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return this.addLast((EventExecutorGroup)null, name, handler);
    }

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            ...
            this.addLast0(newCtx);
            ...
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = this.tail.prev;
        newCtx.prev = prev;
        newCtx.next = this.tail;
        prev.next = newCtx;
        this.tail.prev = newCtx;
    }

如果是ChannelInboundHandler的回调,根据插入的顺序从head向tail进行链式调用,ChannelOutboundHandler则相反:
image

值得注意的是,整条链路的调用需要通过Channel接口直接触发,如果使用ChannelContextHandler的接口方法间接触发,链路会从该ChannelContextHandler对应的ChannelHandler开始,而不是从头或尾开始。

ChannelPipeline入口在NioEventLoop的processSelectedKey():

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...
        try {
            ...
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {...}
    }

当为OP_READ事件时,调用unsafe.read():

        @Override
        public final void read() {
            final ChannelPipeline pipeline = pipeline();
            ...
            try {
                do {
                    ...
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                    ....
                } while (allocHandle.continueReading());
            } catch (Throwable t) {...}
        }

pipeline.fireChannelRead(byteBuf)是入栈入口,实际上是pipeline中ChannelHandlerContext的head节点进行fireChannelRead的同语义操作。

ChannelPipeline相关元素之间的关系如下:

image

1. 每个Channel都会绑定且只绑定一个ChannelPipeline,ChannelPipeline中也会持有Channel的引用;

2. ChannelPipeline持有ChannelHandlerContext链路;

3. 每个ChannelHandlerContext对应一个ChannelHandler;

4. ChannelHandlerContext同时也会持有ChannelPipeline引用,也就间接持有Channel引用;

5. ChannelHandler链路会根据Handler的类型,分为InBound和OutBound两条链路,inbound处理入栈数据,outbound处理出栈数据。
目录
相关文章
|
3天前
|
编解码 开发者
Netty Review - 深入理解Netty: ChannelHandler的生命周期与事件处理机制
Netty Review - 深入理解Netty: ChannelHandler的生命周期与事件处理机制
64 0
|
10月前
|
存储 缓存 安全
Netty实战(六)ChannelHandler和ChannelPipeline
ChannelHandler 的生命周期发生在ChannelHandler被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时。这些方法中的每一个都接受一个 ChannelHandlerContext 参数
114 0
|
10月前
|
监控 安全 Java
Netty之EventLoop 解读
Netty之EventLoop 解读
|
前端开发 算法 Java
Netty组件Future、Promise、Handler、Pipline、ByteBuf
Netty组件Future、Promise、Handler、Pipline、ByteBuf
57 0
|
安全
Netty之EventLoop
EventLoop本质是一个单线程执行器(同时维护了一个Selector),里面有run方法处理channel上源源不断的io事件。
Netty之EventLoop
|
安全
关于netty的EventLoop
关于netty的EventLoop
关于netty的EventLoop
|
缓存 移动开发 网络协议
第 8 章 Netty 编解码器和 Handler 调用机制
第 8 章 Netty 编解码器和 Handler 调用机制
140 0
|
缓存 前端开发 网络协议
Netty4 ChannelHandler 概述(通道篇)
Netty4 ChannelHandler 概述(通道篇)
Netty4 ChannelHandler 概述(通道篇)
|
缓存 Java API
【Netty】ChannelHandler和ChannelPipeline
前面学习了Netty的ByteBuf,接着学习ChannelHandler和ChannelPipeline。
120 0
【Netty】ChannelHandler和ChannelPipeline
netty系列之:channelPipeline详解
netty系列之:channelPipeline详解