一、ChannelHandler
1.1 Channel 的生命周期
Channel 主要有四个生命周期向下表所示:
状 态 | 描 述 |
---|---|
ChannelUnregistered | Channel 已经被创建,但还未注册到 EventLoop |
ChannelRegistered | Channel 已经被注册到了 EventLoop |
ChannelActive | Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了 |
ChannelInactive | Channel 没有连接到远程节点 |
当channel的生命周期发生改变后,会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。如图示:
1.2 ChannelHandler 的生命周期
ChannelHandler 的生命周期发生在ChannelHandler被添加到 ChannelPipeline 中或者被从
ChannelPipeline 中移除时。这些方法中的每一个都接受一个 ChannelHandlerContext 参数。
类 型 | 描 述 | |
---|---|---|
handlerAdded | 当把 ChannelHandler 添加到 ChannelPipeline 中时被调用 | |
handlerRemoved | 当从 ChannelPipeline 中移除 ChannelHandler 时被调用 | |
exceptionCaught | 当处理过程中在 ChannelPipeline 中有错误产生时被调用 |
1.3 ChannelInboundHandler 接口
ChannelInboundHandler用以处理入站数据以及各种状态变化。
下面是一些常用的ChannelInboundHandler方法,它们一般在数据被接收时或者与其对应的 Channel 状态发生改变时被调用。
类 型 | 描 述 |
---|---|
channelRegistered | 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用 |
channelUnregistered | 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调用 |
channelActive | 当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪 |
channelInactive | 当 Channel 离开活动状态并且不再连接它的远程节点时被调用 |
channelReadComplete | 当Channel上的一个读操作完成时被调用 |
channelRead | 当从 Channel 读取数据时被调用 |
ChannelWritabilityChanged | 当 Channel 的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生 OutOfMemoryError)或者可以在 Channel 变为再次可写时恢复写入。可以通过调用 Channel 的 isWritable()方法来检测Channel 的可写性。与可写性相关的阈值可以通过 Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()方法来设置 |
userEventTriggered | 当 ChannelnboundHandler.fireUserEventTriggered |
当某个 ChannelInboundHandler 的实现重写 channelRead()方法时,它将负责显式地释放与池化的ByteBuf 实例相关的内存。Netty 为此提供了一个实用方法 ReferenceCountUtil.release()。
如下面代码所示:
@Sharable
//扩展ChannelInboundHandlerAdapter
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//丢弃读取到的消息
ReferenceCountUtil.release(msg);
}
}
Netty 将使用 WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用 SimpleChannelInboundHandler。
像下面这个代码:
@Sharable
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
// 不需要任何显式的资源释放
}
}
由于 SimpleChannelInboundHandler 会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将会失效。(PS:Netty实战(二)中我们提到过客户端为什么继承它的原因就是因为它会自动的释放资源)
1.4 ChannelOutboundHandler 接口
ChannelOutboundHandler用以处理出站数据并且允许拦截所有的操作。
当它处理出站操作和数据时它的方法将被 Channel、ChannelPipeline 以及 ChannelHandlerContext 调用。
ChannelOutboundHandler 的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷操作并在稍后继续。
下面是ChannelOutboundHandler本身所定义的所有方法:
类 型 | 描 述 |
---|---|
bind(ChannelHandlerContext,SocketAddress,ChannelPromise) | 当请求将 Channel 绑定到本地地址时被调用 |
connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise) | 当请求将 Channel 连接到远程节点时被调用 |
disconnect(ChannelHandlerContext,ChannelPromise) | 当请求将 Channel 从远程节点断开时被调用 |
close(ChannelHandlerContext,ChannelPromise) | 当请求关闭 Channel 时被调用 |
deregister(ChannelHandlerContext,ChannelPromise) | 当请求将 Channel 从它的 EventLoop 注销时被调用 |
read(ChannelHandlerContext) | 当请求从 Channel 读取更多的数据时被调用 |
flush(ChannelHandlerContext) | 当请求通过 Channel 将入队数据冲刷到远程节点时被调用 |
write(ChannelHandlerContext,Object,ChannelPromise) | 当请求通过 Channel 将数据写到远程节点时被调用 |
ChannelPromise与ChannelFuture ChannelOutboundHandler中的大部分方法都需要一个
ChannelPromise参数,以便在操作完成时得到通知。ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(),从而使ChannelFuture不可变。
1.5 ChannelHandler 适配器
我们可以使用ChannelInboundHandlerAdapter 和
ChannelOutboundHandlerAdapter类作为自己的 ChannelHandler 的起始点
。
这两个适配器分别提供了 ChannelInboundHandler和 ChannelOutboundHandler 的基本实现。通过扩展抽象类 ChannelHandlerAdapter,它们获得了它们共同的超接口 ChannelHandler 的方法。
像下面这样:
ChannelHandlerAdapter 还提供了实用方法 isSharable()。如果其对应的实现被标注为 Sharable,那么这个方法将返回 true,表示它可以被添加到多个 ChannelPipeline中。
在 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 中所提供的方法体调用了其相关联的 ChannelHandlerContext 上的等效方法,从而将事件转发到了 ChannelPipeline 中的下一个ChannelHandler 中。
当我们要想在自己的 ChannelHandler 中使用这些适配器类时,只需要简单地扩展它们,并且重写那些你想要自定义的方法。
1.6 资源管理
每当通过调用 ChannelInboundHandler.channelRead()或者
ChannelOutboundHandler.write()方法来处理数据时,都需要确保没有任何的资源泄漏。
Netty 使用引用计数来处理池化的 ByteBuf。所以在完全使用完某个ByteBuf 后,调整其引用计数是很重要的。
Netty提供了class ResourceLeakDetector 帮助我们诊断潜在的(资源泄漏)问题。它将对你应用程序的缓冲区分配做大约 1%的采样来检测内存泄露。相关的开销是非常小的。
如果检测到了内存泄露,将会产生类似于下面的日志消息:
LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable
advanced leak reporting to find out where the leak occurred. To enable
advanced leak reporting, specify the JVM option
'-Dio.netty.leakDetectionLevel=ADVANCED' or call
ResourceLeakDetector.setLevel().
Netty 目前定义了 4 种泄漏检测级别,像下面这样:
级 别 | 描 述 |
---|---|
DISABLED | 禁用泄漏检测。只有在详尽的测试之后才应设置为这个值 |
SIMPLE | 使用 1%的默认采样率检测并报告任何发现的泄露。这是默认级别,适合绝大部分的情况 |
ADVANCED | 使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置 |
PARANOID | 类似于 ADVANCED,但是其将会对每次(对消息的)访问都进行采样。这对性能将会有很大的影响,应该只在调试阶段使用 |
内存泄露检测级别可以通过将下面的 Java 系统属性设置为表中的一个值来定义:
java -Dio.netty.leakDetectionLevel=ADVANCED
如果带着该 JVM 选项重新启动你的应用程序,你将看到自己的应用程序最近被泄漏的缓冲
区被访问的位置。下面是一个典型的由单元测试产生的泄漏报告:
Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 1
#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(
AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(
XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(
XmlFrameDecoderTest.java:133)
...
那么实现 ChannelInboundHandler.channelRead()和 ChannelOutboundHandler.write()方法时,应该如何使用这个诊断工具来防止泄露呢?
1、消费入站消息的简单方式:由于消费入站数据是一项常规任务,所以 Netty 提供了一个特殊的被
称为 SimpleChannelInboundHandler 的 ChannelInboundHandler 实现。这个实现会在消息被 channelRead0()方法消费之后自动释放消息。
看下面代码:
@Sharable
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {
@Override
//通过调用 ReferenceCountUtil.release() ChannelInboundandlerAdapter方法释放资源
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}
2、在出站方向这边,如果你处理了 write()操作并丢弃了一个消息,那么你也应该负责释放它。
看下面的代码:
@Sharable
public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx,Object msg, ChannelPromise promise) {
//通过使用 ReferenceCountUtil.realse(...)方法释放资源
ReferenceCountUtil.release(msg);
//通知资源已经释放了(不仅要释放资源,还要通知 ChannelPromise。否则可能会出现 ChannelFutureListener 收不到某个消息已经被处理了的通知的情况。)
promise.setSuccess();
}
}
如果一个消息被消费或者丢弃了,且没有传递给 ChannelPipeline 中的下个ChannelOutboundHandler,那么用户就有责任调用 ReferenceCountUtil.release()。
如果消息到达了实际的传输层,那么当它被写入时或者 Channel 关闭时,都将被自动释放。
二、ChannelPipeline 接口
ChannelPipeline是一个拦截流经Channel的入站和出站事件的ChannelHandler 实例链。每一个新创建的
Channel 都将会被分配一个新的 ChannelPipeline。这项关联是永久性的;
Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。
根据事件的起源,事件将会被 ChannelInboundHandler 或者 ChannelOutboundHandler处理。随后,通过调用 ChannelHandlerContext 实现,它将被转发给同一超类型的下一个ChannelHandler。
ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler 交互。 ChannelHandler 可以通知其所属的 ChannelPipeline 中的下一 个ChannelHandler,甚至可以动态修改它所属的ChannelPipeline(这里指修改 ChannelPipeline 中的 ChannelHandler 的编排)。ChannelHandlerContext 具有丰富的用于处理事件和执行 I/O 操作的 API。
在 ChannelPipeline 传播事件时,它会测试 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline 将跳过该ChannelHandler 并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。(ChannelHandler 也可以同时实现 ChannelInboundHandler 接口和 ChannelOutboundHandler 接口。)
2.1 修改 ChannelPipeline
ChannelHandler 可以通过添加、删除或者替换其他的 ChannelHandler 来实时地修改ChannelPipeline
的布局。(它也可以将它自己从 ChannelPipeline 中移除。)
下面是一些ChannelHandler修改 ChannelPipeline 的方法:
名 称 | 描 述 |
---|---|
AddFirstaddBefore,addAfteraddLast | 将一个ChannelHandler 添加到ChannelPipeline 中 |
remove | 将一个ChannelHandler 从ChannelPipeline 中移除 |
replace | 将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler |
展示以下如何修改 ChannelPipeline:
ChannelPipeline pipeline = ..;
//创建一个FirstHandler 实例
FirstHandler firstHandler = new FirstHandler();
//将该实例作为handler1添加到ChannelPipeline
pipeline.addLast("handler1", firstHandler);
//将一个 SecondHandler的实例作为"handler2"添加到 ChannelPipeline的第一个槽中。
//这意味着它将被放置在已有的"handler1"之前
pipeline.addFirst("handler2", new SecondHandler());
//同理,添加到了最后一个槽中
pipeline.addLast("handler3", new ThirdHandler());
...
//通过名称移除"handler3"
pipeline.remove("handler3");
//通过引用移除FirstHandler(它是唯一的,所以不需要它的名称)
pipeline.remove(firstHandler);
//将 SecondHandler("handler2")替换为 FourthHandler:"handler4"
pipeline.replace("handler2", "handler4", new ForthHandler());
ChannelHandler 的执行和阻塞
通常 ChannelPipeline 中的每一个 ChannelHandler 都是通过它的 EventLoop(I/O 线程)来处理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的 I/O 处理产生负面的影响。但有时可能需要与那些使用阻塞 API 的遗留代码进行交互。对于这种情况,ChannelPipeline 有一些接受一个 EventExecutorGroup 的 add()方法。如果一个事件被传递给一个自定义的 EventExecutorGroup,它将被包含在这个 EventExecutorGroup 中的某个 EventExecutor 所处理,从而被从该Channel 本身的 EventLoop 中移除。对于这种用例,Netty 提供了一个叫 DefaultEventExecutorGroup 的默认实现
ChannelPipeline 的用于访问 ChannelHandler 的操作:
名 称 | 描 述 |
---|---|
get | 通过类型或者名称返回 ChannelHandler |
context | 返回和 ChannelHandler 绑定的 ChannelHandlerContext |
names | 返回 ChannelPipeline 中所有 ChannelHandler 的名称 |
2.2 触发事件
首先是入站:
ChannelPipeline 用于通知 ChannelInboundHandler 在 ChannelPipeline 中所发生的事件的入站操作:
名 称 | 描 述 |
---|---|
fireChannelRegistered | 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelRegistered(ChannelHandlerContext)方法 |
fireChannelUnregistered | 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelUnregistered(ChannelHandlerContext)方法 |
fireChannelActive | 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelActive(ChannelHandlerContext)方法 |
fireChannelInactive | 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelInactive(ChannelHandlerContext)方法 |
fireExceptionCaught | 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的exceptionCaught(ChannelHandlerContext, Throwable)方法 |
fireUserEventTriggered | 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的userEventTriggered(ChannelHandlerContext, Object)方法 |
fireChannelRead | 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelRead(ChannelHandlerContext, Object msg)方法 |
fireChannelReadComplete | 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelReadComplete(ChannelHandlerContext)方法 |
fireChannelWritabilityChanged | 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelWritabilityChanged(ChannelHandlerContext)方法 |
接着我们看出站:
在出站这边,处理事件将会导致底层的套接字上发生一系列的动作。
ChannelPipeline 的出站操作:
名 称 | 描 述 |
---|---|
bind | 将 Channel 绑定到一个本地地址,这将调用 ChannelPipeline 中的下一个ChannelOutboundHandler 的 bind(ChannelHandlerContext, SocketAddress, ChannelPromise)方法 |
connect | 将 Channel 连接到一个远程地址,这将调用 ChannelPipeline 中的下一个ChannelOutboundHandler 的 connect(ChannelHandlerContext, SocketAddress, ChannelPromise)方法 |
disconnect | 将Channel 断开连接。这将调用ChannelPipeline 中的下一个ChannelOutboundHandler 的 disconnect(ChannelHandlerContext, Channel Promise)方法 |
close | 将 Channel 关闭。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 close(ChannelHandlerContext, ChannelPromise)方法 |
deregister | 将 Channel 从它先前所分配的 EventExecutor(即 EventLoop)中注销。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 deregister(ChannelHandlerContext, ChannelPromise)方法 |
flush | 冲刷Channel所有挂起的写入。这将调用ChannelPipeline中的下一个ChannelOutboundHandler 的 flush(ChannelHandlerContext)方法 |
write | 将消息写入 Channel。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler的write(ChannelHandlerContext, Object msg, ChannelPromise)方法。注意:这并不会将消息写入底层的 Socket,而只会将它放入队列中。要将它写入 Socket,需要调用 flush()或者 writeAndFlush()方法 |
writeAndFlush | 这是一个先调用 write()方法再接着调用 flush()方法的便利方法 |
read | 请求从 Channel 中读取更多的数据。这将调用 ChannelPipeline 中的下一个ChannelOutboundHandler 的 read(ChannelHandlerContext)方法 |
总结一下:
- ChannelPipeline 保存了与 Channel 相关联的 ChannelHandler;
- ChannelPipeline 可以根据需要,通过添加或者删除 ChannelHandler 来动态地修改;
- ChannelPipeline 有着丰富的 API 用以被调用,以响应入站和出站事件。
三、ChannelHandlerContext 接口
ChannelHandlerContext介绍:
- ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关联。
- 每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext。
- ChannelHandlerContext 的主要功能是管理它所关联的 ChannelHandler 和在同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互。
ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。
1、如果调用 Channel 或者 ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播。
2、而调用位于 ChannelHandlerContext上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler。
ChannelHandlerContext 的 API汇总:
名 称 | 描 述 |
---|---|
alloc | 返回和这个实例相关联的Channel 所配置的 ByteBufAllocator |
bind | 绑定到给定的 SocketAddress,并返回 ChannelFuture |
channel | 返回绑定到这个实例的 Channel |
close | 关闭 Channel,并返回 ChannelFuture |
connect | 连接给定的 SocketAddress,并返回 ChannelFuture |
deregister | 从之前分配的 EventExecutor 注销,并返回 ChannelFuture |
disconnect | 从远程节点断开,并返回 ChannelFuture |
executor | 返回调度事件的 EventExecutor |
fireChannelActive | 触发对下一个 ChannelInboundHandler 上的channelActive()方法(已连接)的调用 |
fireChannelInactive | 触发对下一个 ChannelInboundHandler 上的channelInactive()方法(已关闭)的调用 |
fireChannelRead | 触发对下一个 ChannelInboundHandler 上的channelRead()方法(已接收的消息)的调用 |
fireChannelReadComplete | 触发对下一个ChannelInboundHandler上的channelReadComplete()方法的调用 |
fireChannelRegistered | 触发对下一个 ChannelInboundHandler 上的fireChannelRegistered()方法的调用 |
fireChannelUnregistered | 触发对下一个 ChannelInboundHandler 上的fireChannelUnregistered()方法的调用 |
fireChannelWritabilityChanged | 触发对下一个 ChannelInboundHandler 上的fireChannelWritabilityChanged()方法的调用 |
fireExceptionCaught | 触发对下一个 ChannelInboundHandler 上的fireExceptionCaught(Throwable)方法的调用 |
fireUserEventTriggered | 触发对下一个 ChannelInboundHandler 上的fireUserEventTriggered(Object evt)方法的调用 |
handler | 返回绑定到这个实例的 ChannelHandlerisRemoved 如果所关联的 ChannelHandler 已经被从 ChannelPipeline中移除则返回 true |
name | 返回这个实例的唯一名称 |
pipeline | 返回这个实例所关联的 ChannelPipeline |
read | 将数据从Channel读取到第一个入站缓冲区;如果读取成功则触发一个channelRead事件,并(在最后一个消息被读取完成后通知ChannelInboundHandler 的 channelReadComplete(ChannelHandlerContext)方法 |
write | 通过这个实例写入消息并经过 ChannelPipeline |
writeAndFlush | 通过这个实例写入并冲刷消息并经过 ChannelPipeline |
当使用 ChannelHandlerContext 的 API 的时候,请牢记以下两点:
- ChannelHandlerContext 和 ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;
- 相对于其他类的同名方法,ChannelHandler Context的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。
3.1 使用 ChannelHandlerContext
ChannelHandlerContext、Channel 和 ChannelPipeline的关系如图:
从上面我们可以知道ChannelPipeline 包含着ChannelHandlerContext、Channel。ChannelHandler之间是用ChannelHandlerContext连接。
那么从 ChannelHandlerContext 如何访问 Channel?
看下面代码:
//先获取获取到与 ChannelHandlerContext相关联的 Channel 的引用
ChannelHandlerContext ctx = ..;
Channel channel = ctx.channel();
//通过 Channel 写入"Netty in Action" 到缓冲区
channel.write(Unpooled.copiedBuffer("Netty in Action",CharsetUtil.UTF_8));
那么从 ChannelHandlerContext 如何访问 ChannelPipeline?
看下面代码:
//先获取获取到与 ChannelHandlerContext相关联的 ChannelPipeline 的引用
ChannelHandlerContext ctx = ..;
ChannelPipeline pipeline = ctx.pipeline();
//通过 ChannelPipeline 写入"Netty in Action" 到缓冲区
pipeline.write(Unpooled.copiedBuffer("Netty in Action",CharsetUtil.UTF_8));
可以看到上面两段代码的事件流是一样的,需要注意的是虽然被调用的 Channel 或 ChannelPipeline 上的 write()方法一直传播事件通过整个 ChannelPipeline,但是在 ChannelHandler 的级别上,事件从一个ChannelHandler到下一个 ChannelHandler 的移动是由 ChannelHandlerContext 上的调用完成的。
通过 Channel 或者 ChannelPipeline 进行的事件传播示例:
Channel 或者 ChannelPipeline事件的传播都是通过ChannelHandlerContext来进行,要想调用从某个特定的 ChannelHandler 开始的处理过程,必须获取到在(ChannelPipeline)该 ChannelHandler 之前的 ChannelHandler 所关联的 ChannelHandlerContext。这个 ChannelHandlerContext 将调用和它所关联的 ChannelHandler 之后的ChannelHandler。
例如我们调用 ChannelHandlerContext 的 write()方法。
代码很简单:
//获取ChannelHandlerContext
ChannelHandlerContext ctx = ..;
//write()方法将把缓冲区数据发送到下一个 ChannelHandler
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
那么它内部是咋操作的呢?
如图消息将从下一个 ChannelHandler 开始流经 ChannelPipeline,绕过了所有前面的 ChannelHandler。
3.2 ChannelHandler 和 ChannelHandlerContext 的高级用法
前面我们说了基本的通过调用 ChannelHandlerContext上的pipeline()方法来获得被封闭ChannelPipeline 的引用,在运行时得以操作ChannelPipeline 的ChannelHandler。
除了这些我们还可以通过将 ChannelHandler 添加到 ChannelPipeline 中来实现动态的协议切换或者将引用缓存到ChannelHandlerContext以供稍后使用。
1、将引用缓存到 ChannelHandlerContext
public class WriteHandler extends ChannelHandlerAdapter {
private ChannelHandlerContext ctx;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
//存储到 ChannelHandlerContext的引用以供稍后使用
this.ctx = ctx;
}
public void send(String msg) {
//使用之前存储的到 ChannelHandlerContext的引用来发送消息
ctx.writeAndFlush(msg);
}
}
因为一个 ChannelHandler 可以从属于多个 ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext 实例。
在多个 ChannelPipeline 中共享同一个 ChannelHandler,对应的 ChannelHandler 必须要使用@Sharable 注解标注;否则,试图将它添加到多个 ChannelPipeline 时将会触发异常(多个Channel使用则必须保证线程安全)。
看个可共享ChannelHandler的例子:
@Sharable
public class SharableHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("Channel read message: " + msg);
//记录方法,发送给下一个ChannelHandler的channelRead()
ctx.fireChannelRead(msg);
}
}
==注意:拥有状态时@Sharable注解将不能保证线程安全!只应该在确定了你的 ChannelHandler 是线程安全的时才使用@Sharable 注解。==
像下面就是一个错误的示例:
@Sharable
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//这里并不能保证状态的写入是安全的
count++;
System.out.println("channelRead(...) called the "+ count + " time");
ctx.fireChannelRead(msg);
}
}
我们共享ChannelHandler的一个常见的原因是用于收集跨越多个 Channel 的统计信息。
四、异常处理
4.1 处理入站异常
如果在处理入站事件的过程中有异常被抛出,那么它将从它在 ChannelInboundHandler里被触发的那一点开始流经 ChannelPipeline。要想处理这种类型的入站异常,你需要在你的 ChannelInboundHandler 实现中重写下面的方法。
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
一个简单的处理例子:
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
因为异常将会继续按照入站方向流动(就像所有的入站事件一样),所以实现了前面所示逻辑的 ChannelInboundHandler 通常位于 ChannelPipeline 的最后。这确保了所有的入站异常都总是会被处理,无论它们可能会发生在 ChannelPipeline 中的什么位置。
总结一下:
- ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline 中的下一个 ChannelHandler;
- 如果异常到达了 ChannelPipeline 的尾端,它将会被记录为未被处理;
- 要想定义自定义的处理逻辑,你需要重写 exceptionCaught()方法。然后你需要决定是否需要将该异常传播出去
4.2 处理出站异常
用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制。
- 每个出站操作都将返回一个 ChannelFuture。注册到 ChannelFuture 的 ChannelFutureListener 将在操作完成时被通知该操作是成功了还是出错了。
- 几乎所有的 ChannelOutboundHandler 上的方法都会传入一个 ChannelPromise的实例。
作为 ChannelFuture 的子类,ChannelPromise 也可以被分配用于异步通知的监听器。但是,ChannelPromise 还具有提供立即通知的可写方法:
ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);
添加 ChannelFutureListener 只需要调用 ChannelFuture 实例上的 addListener(ChannelFutureListener)方法,并且有两种不同的方式可以做到这一点:
1、调用出站操作(如 write()方法)所返回的 ChannelFuture 上的 addListener()方法。
ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
}
})
2、将 ChannelFutureListener 添加到即将作为参数传递给 ChannelOutboundHandler 的方法的ChannelPromise。
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg,ChannelPromise promise) {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
}
});
}
}
ChannelPromise 的可写方法
通过调用 ChannelPromise 上的 setSuccess()和 setFailure()方法,可以使一个操作的状态在ChannelHandler 的方法返回给其调用者时便即刻被感知到
如果你的 ChannelOutboundHandler 本身抛出了异常会发生什么呢?
这种情况下Netty 本身会通知任何已经注册到对应 ChannelPromise 的监听器
PS:在调用出站操作时添加 ChannelFutureListener 更合适。