本文为《Netty权威指南》的读书笔记,读书过程中也伴随着一些源码阅读和其他文档阅读,所以内容和《Netty权威指南》会略有不同,请知晓。
Netty Api地址:http://netty.io/5.0/api/
一 内容概述
ChannelPipeline的职责与工作原理。
ChannelHandler的作用。
常用ChannelHandler的源码解读。
二 设计理念
Netty将Channel的数据管道抽象为ChannelPipeline,消息在ChannelPipeline中流动和传递。ChannelPipeline持有IO事件拦截器ChannelHandler的链表,由ChannelHandler对IO事件进行拦截和处理,可以方便地通过新增、删除ChannelHandler来实现不同业务逻辑的定制,不需要对已有ChannelHandler进行修改,能够实现对修改封闭和对扩展支持。
三 ChannelPipeline
1. 事件处理
底层的SocketChannel#read方法读取ByteBuf,触发ChannelRead事件,由IO线程NioEventLoop调用ChannelPipeline#fireChannelRead方法,将消息(ByteBuf)传输到ChannelPipeline。接着消息依次被ChannelHandler1,ChannelHandler2, ……, ChannelHandlerN拦截处理。这个过程中任何ChannelHandler都可以中断当前的流程,结束消息的传递。
调用write方法发送消息时,消息从ChannelHandlerN依次传递到ChannelHandler1,最终被添加到消息发送缓冲区中等待刷新和发送。此过程中可以终端消息的传输,例如编码失败时,就需要终端流程,构造异常的Future返回。
2. Netty的事件类型
Netty的事件分为inbound事件和outbound事件。
1) inbound事件
inbound事件通常由IO线程触发,例如TCP链路建立事件、链路关闭事件、读事件、异常通知事件等。
触发inbound事件的方法如下,代码见ChannelHandlerContext中
fireChannelRegistered:Channel注册事件。
fireChannelActive:TCP链路建立成功,Channel激活事件。
fireChannelInactive:TCP链路关闭,链路不可用通知事件。
fireExceptionCaught;异常通知事件
fireUserEventTriggered:用户自定义事件
fireChannelRead:读事件
fireChannelReadComplete:读操作完成事件
fireChannelWritabilityChanged:Channel的可写状态改变事件。
2) outbound事件
outbound事件通常是用户主动发起的网络IO操作,IO线程触发,例如用户发起TCP连接操作、绑定操作,消息发送等操作。
触发outbound事件的方法如下,代码见ChannelHandlerContext中
bind:绑定本地地址事件
connect:连接服务端事件
disconnect:断开连接事件
close:关闭当前Channel事件
read:读事件
write:写事件
flush:刷新事件
writeAndFlush:写和刷新事件
3. ChannelPipeline主要特性
ChannelPipeline是线程安全的。注意ChannelHandler不是线程安全的。
ChannelPipeline支持运行态动态添加、删除ChannelHandler。
ChannelPipeline对ChannelHandler的管理主要体现在对ChannelHandler的增删改查上。新增ChannelHandler时会进行同名检查,如果名称相同则抛出IllegalArgumentException异常。示例代码如下:
pipeline.addLast(“timeClientHandler”,new TimeClientHandler());
四 ChannelHandler
大多数时候ChannelHandler会选择性的拦截和处理某个或某几个事件,其他的事件会忽略,由下一个ChannelHandler进行拦截和处理。这会导致一个问题:ChannelHandler是接口,用户必须事件所有的接口,导致代码冗余。为此Netty提供了ChannelHandlerAdapter类,用户继承此类,仅需要重写自己关心的事件即可。
1.ByteToMessageDecoder
将ByteBuf解码城业务的POJO对象。
注意:ByteToMessageDecoder未解决TCP粘包、组包等场景,所以使用时需要处理半包问题。
1) channelRead
首先判断msg是否为ByteBuf类型,如果是则解码;否则放弃解码。
如果cumulation为空,说明没有缓存半包信息需要处理,直接将需要解码的ByteBuf赋值给cumulation;如果cumulation非空,说明有尚未处理的半包信息没有完成解码,需要将msg复制到cumulation中,然后解码。
示例代码如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
2) callDecode
业务解码器需要遵循Netty的契约,才能使解码器正常工作:如果业务解码器认为当期的字节缓冲区无法完成业务层的解码,需要将readIndex复位,告诉Netty解码器条件不满足应该退出解码,继续读取数据报再次尝试解码。
此方法的主要逻辑是:
decode需要子类根据业务实现,解码后如果out的长度没变化,说明此次解码什么也没做,应该继续读取数据包,再次尝试解码。
如果解码出内容来,但是可读字节无变化(即oldInputLength == in.readableBytes())是非法的。
如果是isSingleDecode,那么解码完以后,退出循环。
代码如下:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
int oldInputLength = in.readableBytes();
decode(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
2. MessageToMessageDecoder
MessageToMessageDecoder是Netty的二次解码器,负责讲一个对象解码为其他对象。
示例如下:
public class StringToIntegerDecoder extends MessageToMessageDecoder<String> {
@Override
public void decode(ChannelHandlerContext ctx, String message, List<Object> out) throws Exception {
//将Message解码为Message的长度
out.add(message.length());
}
}
3. LengthFieldBasedFrameDecoder
如果消息是通过长度进行区分的,那么LengthFieldBasedFrameDecoder可以自动处理粘包、半包问题。
1) 区分整包消息
通常有以下四种方法,区分一个整包消息。
ÃÂ 固定长度,例如每100个字节代表一个整包消息,不足的前面补零,解码器在处理这类消息时,每次读到指定长度的自己后进行解码。
ÃÂ 通过回车换行符区分消息,例如FTP协议。多用于文本协议。
ÃÂ 通过分隔符区分整包消息
ÃÂ 通过指定长度来表示整包消息。
2) 原理
使用LengthFieldBasedFrameDecoder解码是,一般我们只需要关注一下四个字段:
lengthFieldOffset:长度字段的偏移量
lengthFieldLength:长度字段的长度
lengthAdjustment:长度调整值
initialBytesToStrip:解码时跳过的字节长度
3) 解码规则
a) 长度字段+消息体,全部解码
lengthFieldOffset = 0 长度字段offset=0
lengthFieldLength = 2 长度字段的长度为2
lengthAdjustment = 0 消息长度不调整
initialBytesToStrip = 0 不跳过任何内容
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
b) 长度字段+消息体,解码消息体
lengthFieldOffset = 0长度字段offset=0
lengthFieldLength = 2长度字段的长度为2
lengthAdjustment = 0消息长度不调整
initialBytesToStrip = 2 跳过2字节,所以解析出来的是消息体
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
c) 长度字段+消息体,长度大于消息体长度,全部解码
lengthFieldOffset = 0长度字段offset=0
lengthFieldLength = 2 长度字段的长度为2
lengthAdjustment = -2 长度调整-2,长度字段的值是14,而消息体长度为12,所以解码时需要-2
initialBytesToStrip = 0
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
d) header+长度字段+消息体,全部解码
lengthFieldOffset = 2长度字段offset=2
lengthFieldLength = 3 长度字段的长度为3
lengthAdjustment = 0消息长度不调整
initialBytesToStrip = 0不跳过任何内容
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
| 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
e) header+长度字段+消息体,全部解码
lengthFieldOffset = 0长度字段offset=0
lengthFieldLength = 3长度字段的长度为3
lengthAdjustment = 2长度调整2,因为length后Header 1占2字节
initialBytesToStrip = 0不跳过任何内容
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
| 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
f) header+长度字段+Header2+消息体,解码Header2+消息体
lengthFieldOffset = 1长度字段offset=0
lengthFieldLength = 2长度字段的长度为2
lengthAdjustment = 1长度调整1,因为length后HDR2占1字节
initialBytesToStrip = 3跳过3字节(HDR1+ Length),解码HDR2+消息体
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
g) header+长度字段+Header2+消息体,解码Header2+消息体
lengthFieldOffset = 1长度字段offset=1
lengthFieldLength = 2长度字段的长度为2
lengthAdjustment = -3长度调整-3,因为消息体长度12,HDR2长度1,Length值是16,所以调整长度是12+1-16=-3
initialBytesToStrip = 3
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
4) decode
通过discardingTooLongFrame判断是否应该丢弃当前刻度的字节缓冲区。
丢弃的字节长度不能大于当前缓冲区可读的字节数,计算丢弃字节后,通过in.skipBytes跳过这些字节。跳过这些字节以后需要重置discardingTooLongFrame。
如果当前可读字节数小于lengthFieldEndOffset,说明当前缓冲区的数据报不够,需要继续读取数据报,所以放弃decode。
计算出实际字段的长度,并通过索引值读取报文数据。具体规则参考“解码规则”部分。
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (discardingTooLongFrame) {
long bytesToDiscard = this.bytesToDiscard;
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
in.skipBytes(localBytesToDiscard);
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
failIfNecessary(false);
}
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"negative pre-adjustment length field: " + frameLength);
}
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than lengthFieldEndOffset: " + lengthFieldEndOffset);
}
if (frameLength > maxFrameLength) {
long discard = frameLength - in.readableBytes();
tooLongFrameLength = frameLength;
if (discard < 0) {
// buffer contains more bytes then the frameLength so we can discard all now
in.skipBytes((int) frameLength);
} else {
// Enter the discard mode and discard everything received so far.
discardingTooLongFrame = true;
bytesToDiscard = discard;
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
return null;
}
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
if (in.readableBytes() < frameLengthInt) {
return null;
}
if (initialBytesToStrip > frameLengthInt) {
in.skipBytes(frameLengthInt);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than initialBytesToStrip: " + initialBytesToStrip);
}
in.skipBytes(initialBytesToStrip);
// extract frame
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip;
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
4. MessageToByteEncoder
将POJO转换成ByteBuf。和ByteToMessageDecoder对应。
1) Write
首先当前编码器判断是否支持需要发送的消息,不支持则透传,支持则判断缓冲区的类型类型。直接内存分配ioBuffer(堆外内存),堆内存分配heapBuffer。
然后进行编码。
通过ReferenceCountUtil#release释放编码对象。这里是保证以后对象可以及时回收。
代码如下:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
if (preferDirect) {
buf = ctx.alloc().ioBuffer();
} else {
buf = ctx.alloc().heapBuffer();
}
try {
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
5. MessageToMessageEncoder
二次解码器,和MessageToMessageDecoder对应
6. LengthFieldPrepender
通过LengthFieldPrepender可以将待发送消息的长度写入到ByteBuf的前两个字节,编码后的消息组成为:长度字段+消息。
1) 编码规则
LengthFieldPrepender提供了四个构造方法,主要分为以下两种。
a) 长度+消息
构造函数如下:
public LengthFieldPrepender(int lengthFieldLength) {
this(lengthFieldLength, false);
}
使用new LengthFieldPrepender(2)编码
BEFORE ENCODE (12 bytes) AFTER ENCODE (length=12)
+----------------+ +--------+----------------+
| "HELLO, WORLD" |-----> + 0x000C | "HELLO, WORLD" |
+----------------+ +--------+----------------+
b) 长度(包括调整值)+消息
public LengthFieldPrepender(int lengthFieldLength, int lengthAdjustment) {
this(lengthFieldLength, lengthAdjustment, false);
}
使用new LengthFieldPrepender(2,2)编码
BEFORE ENCODE (12 bytes) AFTER ENCODE (length=12+2)
+----------------+ +--------+----------------+
| "HELLO, WORLD" |-----> + 0x000E | "HELLO, WORLD" |
+----------------+ +--------+----------------+