outbound事件的传播#
什么是outBound事件#
创建的outbound事件如: connect,disconnect,bind,write,flush,read,close,register,deregister, outbound类型事件更多的是服务端主动发起的事件,如给主动channel绑定上端口,主动往channel写数据,主动关闭用户的的连接
开始读源码#
最典型的outbound事件,就是服务端往客户端写数据,准备测试用例如下:
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println( "hello OutBoundHandlerB"); ctx.write(ctx, promise); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { ctx.executor().schedule(()->{ // todo 模拟给 客户端一个响应 ctx.channel().write("Hello World"); // 写法二 : ctx.write("Hello World"); },3, TimeUnit.SECONDS); } } public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter { // todo 当服务端的channel绑定上端口之后,就是 传播 channelActive 事件 // todo 事件传播到下面后,我们手动传播一个 channelRead事件 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println( "hello OutBoundHandlerA"); ctx.write(ctx, promise); } } public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println( "hello OutBoundHandlerC"); ctx.write(ctx, promise); } }
下面我们把断点调试,把断点打在OutBoundHandlerB
的handlerAdded
上, 模拟向客户端发送数据, 启动程序,大概的流程如下
- 等待服务端的启动
- 服务端Selector轮询服务端channel可能发生的感兴趣的事件
- 使用telnet向服务端发送请求
- 服务端创建客户端的channel,在给客户端的原生的chanenl注册到 Selector上
- 通过
invokeChannelAddedIfNeeded()
将我们添加在Initializer中的handler添加到pipeline中
- 挨个回调这些handler中的
channelAdded()
方法
- 和我们添加进去的顺序相反
- C --> B --->A
- 这些childHandler,会添加在每一条客户端的channel的pipeline
- 传播channel注册完成事件
- 传播channelActive事件
- readIfAutoRead() 完成二次注册netty可以处理的感兴趣的事件
此外,我们上面的write以定时任务的形式提交,当用ctx中的唯一的线程执行器三秒后去执行任务,所以程序会继续下去绑定端口, 过了三秒后把定时任务聚合到普通任务队列中,那时才会执行我们OutBoundHandlerB
中的ctx.channel().write("Hello World");
outBound类型的handler添加顺序和执行顺序有什么关系#
因为Outbound类型的事件是从链表的tail开始传播的,所以执行的顺序和我们的添加进去的顺序相反
篇幅太长了,重写补一张图
从ctx.channel().write("Hello World");
开始跟源码, 鼠标直接跟进去,进入的是ChannelOutboundInvoker
, 往channel中写,我们进入DefaultChannelPipeline
的实现,源码如下
@Override public final ChannelFuture write(Object msg) { return tail.write(msg); }
再一次的验证了,出站的事件是从尾部往前传递的, 我们知道,tail节点是DefaultChannelHandlerContext
类型的,所以我们看它的write()
方法是如何实现的
@Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); }
其中msg-->我们要写会客户端的内容, newPromise()默认的promise()
,继续跟进本类方法write(msg, newPromise())
,源码如下:
@Override public ChannelFuture write(final Object msg, final ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } try { if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return promise; } } catch (RuntimeException e) { ReferenceCountUtil.release(msg); throw e; } write(msg, false, promise); return promise; }
上面做了很多判断,其中我们只关心write(msg, false, promise);
源码如下:
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); }
我们可以看到,重要的逻辑findContextOutbound();
它的源码如下, 从尾节点开始遍历链表,找到前一个outbound类型的handler
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
找到后,因为我们使用函数是write
而不是writeAndFlush
所以进入上面的else代码块invokeWrite
private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } }
继续跟进invokeWrite0(msg, promise);
终于看到了handler的write逻辑
private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
其中:
- (ChannelOutboundHandler) handler() -- 是tail前面的节点
- 调用当前节点的write函数
实际上就是回调我们自己的添加的handler的write函数,我们跟进去,源码如下:
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println( "hello OutBoundHandlerC"); ctx.write(msg, promise); } }
我们继续调用write, 按照相同的逻辑,msg会继续往前传递
一直传递到HeadContext节点, 因为这个节点也是Outbound类型的, 这就是Outbound事件的传播,我们直接看HeaderContext是如何收尾的, 源码如下:
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
Header使用了unsafe类,这没毛病,和数据读写有关的操作,最终都离不开unsafe
为什么Header节点是outBound类型的处理器?#
拿上面的write事件来说,msg经过这么多handler的加工,最终的目的是传递到客户端,所以netty把header设计为outBound类型的节点,由他完成往客户端的写
context.write()与context.channel().write()的区别#
context.write()
,会从当前的节点开始往前传播context.channel().write()
从尾节点开始依次往前传播
异常的传播#
netty中如果发生了异常的话,异常事件的传播和当前的节点是 入站和出站处理器是没关系的,一直往下一个节点传播,如果一直没有handler处理异常,最终由tail节点处理
最佳的异常处理解决方法#
既然异常的传播和入站和出站类型的处理器没关系,那么我们就在pipeline的最后,也就是tail之前,添加我们的统一异常处理器就好了, 就像下面:
public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // todo 异常处理的最佳实践, 最pipeline的最后添加异常处理handler channelPipeline.addLast(new myExceptionCaughtHandler()); } } public class myExceptionCaughtHandler extends ChannelInboundHandlerAdapter { // 最终全部的异常都会来到这里 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof 自定义异常1){ }else if(cause instanceof 自定义异常2){ } // todo 下面不要往下传播了 // super.exceptionCaught(ctx, cause); } }
SimpleChannelInboundHandler 的特点#
通过前面的分析,我们知道如果客户端的msg一味的往后传播,最终会传播到tail节点,由tail节点处理释放,从而避免了内存的泄露
如果我们的handler使用了msg之后没有往后传递就要倒霉了,时间久了就会出现内存泄露的问题
netty人性化的为我们提供的指定泛型的 SimpleChannelInboundHandler
,可以为我们自动的释放内存,我们看他是如何做到的
/ todo 直接继承于ChanelInboundHandlerAdapter的实现 抽象类 // todo 我们自己的处理器, 同样可以继承SimpleChannelInboundHandler适配器,达到相同的效果 public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter { private final TypeParameterMatcher matcher; private final boolean autoRelease; protected SimpleChannelInboundHandler() { this(true); } protected SimpleChannelInboundHandler(boolean autoRelease) { matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I"); this.autoRelease = autoRelease; } protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) { this(inboundMessageType, true); } protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) { matcher = TypeParameterMatcher.get(inboundMessageType); this.autoRelease = autoRelease; } public boolean acceptInboundMessage(Object msg) throws Exception { return matcher.match(msg); } // todo channelRead 完全被改写了 // todo 这其实又是一种设计模式 , 模板方法设计模式 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") // todo 把消息进行了强转 I imsg = (I) msg; // todo channelRead0()在他的父类中是抽象的,因此我们自己写handler时,需要重写它的这个抽象的 方法 , 在下面 // todo 这其实又是一种设计模式 , 模板方法设计模式 channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally {// todo 对msg的计数减一, 表示对消息的引用减一. 也就意味着我们不要在任何 if (autoRelease && release) { ReferenceCountUtil.release(msg); } } } protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception; }
- 它本身是抽象类,抽象方法是
channelRead0
,意味着我们需要重写这个方法 - 他继承了
ChannelInboundHandlerAdapter
这是个适配器类,使他可以仅实现部分自己需要的方法就ok
我们看它实现的channelRead
, 模板方法设计模式 主要做了如下三件事
- 将msg 强转成特定的泛型类型的数据
- 将ctx和msg传递给自己的chanenlRead0使用msg和ctx(ctx,msg)
- chanenlRead0使用msg和ctx
- 在finally代码块中,将msg释放