ChannelHandler
netty中的ChannelHandler用于处理Channel对应的事件,每一个ChannelHandler都会和一个channel绑定,ChannelHandler体系如下:
ChannelHandler接口里面只定义了三个生命周期方法:
void handlerAdded(ChannelHandlerContext var1) throws Exception;
void handlerRemoved(ChannelHandlerContext var1) throws Exception;
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
子接口ChannelInboundHandler和ChannelOutboundHandler对ChannelHandler进行了扩展,但netty框架提供了ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter和ChannelDuplexHandler三个适配类,实际使用时继承这些适配类即可。
此外还有简化类SimpleChannelInboundHandler,继承SimpleChannelInboundHandler类后,会在接收到数据后⾃动release掉数据占⽤的Bytebuffer资源,并且继承该类需要指定数据格式。⽽继承ChannelInboundHandlerAdapter则不会⾃动释放Bytebuffer资源,需要⼿动调⽤ReferenceCountUtil.release()等⽅法进⾏释放,并且继承该类不需要指定数据格式。
实际编写server端时,需要继承ChannelInboundHandlerAdapter,防⽌数据未处理完就⾃动释放了。此外server端可能有多个客户端连接,并且每⼀个客户端请求的数据格式都不⼀致,相比之下ChannelInboundHandlerAdapter更灵活。
客户端根据情况可以继承SimpleChannelInboundHandler类。好处是直接指定好传输的数据格式,就不需要再进⾏格式的转换了。
ChannelHandler中的三个生命周期方法分别对应如下场景:
当前ChannelHander加入ChannelHandlerContext中;
当前从ChannelHandlerContext中移除;
ChannelHandler回调方法出现异常时被回调.
ChannelInboundHandler和ChannelOutboundHandler
区别主要在于ChannelInboundHandler的channelRead和channelReadComplete回调和ChannelOutboundHandler的write和flush回调上,ChannelInboundHandler的channelRead回调负责执行入栈数据的decode逻辑,ChannelOutboundHandler的write负责执行出站数据的encode工作。
ChannelInboundHandler
ChannelInboundHandler定义了如下回调方法:
void channelRegistered(ChannelHandlerContext var1) throws Exception;
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
void channelActive(ChannelHandlerContext var1) throws Exception;
void channelInactive(ChannelHandlerContext var1) throws Exception;
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
其回调时机为:
channelRegistered 当前channel注册到EventLoop;
channelUnregistered 当前channel从EventLoop取消注册;
channelActive 当前channel激活的时候;
channelInactive 当前channel失活的时候;
channelRead 当前channel从远端读取到数据;
channelReadComplete channel read消费完读取的数据的时候被触发;
userEventTriggered 用户事件触发的时候;
channelWritabilityChanged channel的写状态变化的时候触发。
ChannelHandlerContext作为参数,在每个回调事件处理完成之后,使用ChannelHandlerContext的fireChannelXXX方法来传递给pipeline中下一个ChannelHandler,netty的codec模块和业务处理代码分离就用到了这个链路处理。
ChannelOutboundHandler
ChannelOutboundHandler定义了如下回调方法:
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
void read(ChannelHandlerContext var1) throws Exception;
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
void flush(ChannelHandlerContext var1) throws Exception;
回调方法触发时机:
bind bind操作执行前触发;
connect connect 操作执行前触发;
disconnect disconnect 操作执行前触发;
close close操作执行前触发;
deregister deregister操作执行前触发;
read read操作执行前触发;
write write操作执行前触发;
flush flush操作执行前触发;
对于ChannelPromise这个参数,可以调用它的addListener注册监听,当回调方法所对应的操作完成后,会触发这个监听下面的代码。
同样添加监听器的还有ChannelFuture,而ChannelFuture也是ChannelPromise的父接口:
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
...
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> var1);
...
}
例如:
ctx.writeAndFlush(toFullHttpResponse()).addListener(ChannelFutureListener.CLOSE);
ChannelFutureListener.CLOSE这个监听器就会在writeAndFlush完成之后被调用来关闭channel:
ChannelFutureListener CLOSE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
ChannelHandlerContext
当ChannelHandler加入到ChannelPipeline的时候,会创建一个对应的ChannelHandlerContext并绑定,ChannelPipeline实际维护的是和ChannelHandlerContext的关系,例如在DefaultChannelPipeline:
public class DefaultChannelPipeline implements ChannelPipeline {
...
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
}
DefaultChannelPipeline会保存第一个ChannelHandlerContext以及最后一个ChannelHandlerContext的引用。
而AbstractChannelHandlerContext中维护了next和prev指针:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
...
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
}
这样ChannelHandlerContext之间形成了双向链表。
ChannelPipeline
在Channel创建的时候,会同时创建ChannelPipeline:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...
protected AbstractChannel(Channel parent) {
this.parent = parent;
this.id = this.newId();
this.unsafe = this.newUnsafe();
this.pipeline = this.newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
}
在ChannelPipeline中也会持有Channel的引用,ChannelPipeline会维护一个ChannelHandlerContext的双向链表,链表的头尾有默认实现:
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
protected DefaultChannelPipeline(Channel channel) {
this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
this.voidPromise = new VoidChannelPromise(channel, true);
this.tail = new DefaultChannelPipeline.TailContext(this);
this.head = new DefaultChannelPipeline.HeadContext(this);
this.head.next = this.tail;
this.tail.prev = this.head;
}
}
我们添加的自定义ChannelHandler会插入到head和tail之间,以addLast为例:
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return this.addLast((EventExecutorGroup)null, name, handler);
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
...
this.addLast0(newCtx);
...
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = this.tail.prev;
newCtx.prev = prev;
newCtx.next = this.tail;
prev.next = newCtx;
this.tail.prev = newCtx;
}
如果是ChannelInboundHandler的回调,根据插入的顺序从head向tail进行链式调用,ChannelOutboundHandler则相反:
值得注意的是,整条链路的调用需要通过Channel接口直接触发,如果使用ChannelContextHandler的接口方法间接触发,链路会从该ChannelContextHandler对应的ChannelHandler开始,而不是从头或尾开始。
ChannelPipeline入口在NioEventLoop的processSelectedKey():
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {...}
}
当为OP_READ事件时,调用unsafe.read():
@Override
public final void read() {
final ChannelPipeline pipeline = pipeline();
...
try {
do {
...
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
....
} while (allocHandle.continueReading());
} catch (Throwable t) {...}
}
pipeline.fireChannelRead(byteBuf)是入栈入口,实际上是pipeline中ChannelHandlerContext的head节点进行fireChannelRead的同语义操作。
ChannelPipeline相关元素之间的关系如下:
1. 每个Channel都会绑定且只绑定一个ChannelPipeline,ChannelPipeline中也会持有Channel的引用;
2. ChannelPipeline持有ChannelHandlerContext链路;
3. 每个ChannelHandlerContext对应一个ChannelHandler;
4. ChannelHandlerContext同时也会持有ChannelPipeline引用,也就间接持有Channel引用;
5. ChannelHandler链路会根据Handler的类型,分为InBound和OutBound两条链路,inbound处理入栈数据,outbound处理出栈数据。