Netty原理篇-ChannelPipeline、ChannelHandler

简介: ChannelPipeline的职责与工作原理。 ChannelHandler的作用。 常用ChannelHandler的源码解读。


本文为《Netty权威指南》的读书笔记,读书过程中也伴随着一些源码阅读和其他文档阅读,所以内容和《Netty权威指南》会略有不同,请知晓。

Netty Api地址:http://netty.io/5.0/api/

 

一 内容概述

ChannelPipeline的职责与工作原理

ChannelHandler的作用。

常用ChannelHandler的源码解读。

二 设计理念

Netty将Channel的数据管道抽象为ChannelPipeline,消息在ChannelPipeline中流动和传递。ChannelPipeline持有IO事件拦截器ChannelHandler的链表,由ChannelHandler对IO事件进行拦截和处理,可以方便地通过新增、删除ChannelHandler来实现不同业务逻辑的定制,不需要对已有ChannelHandler进行修改,能够实现对修改封闭和对扩展支持。

三 ChannelPipeline

1. 事件处理

底层的SocketChannel#read方法读取ByteBuf,触发ChannelRead事件,由IO线程NioEventLoop调用ChannelPipeline#fireChannelRead方法,将消息(ByteBuf)传输到ChannelPipeline。接着消息依次被ChannelHandler1,ChannelHandler2, ……, ChannelHandlerN拦截处理。这个过程中任何ChannelHandler都可以中断当前的流程,结束消息的传递。

调用write方法发送消息时,消息从ChannelHandlerN依次传递到ChannelHandler1,最终被添加到消息发送缓冲区中等待刷新和发送。此过程中可以终端消息的传输,例如编码失败时,就需要终端流程,构造异常的Future返回。

 

2. Netty的事件类型

Netty的事件分为inbound事件和outbound事件。

1) inbound事件

inbound事件通常由IO线程触发,例如TCP链路建立事件、链路关闭事件、读事件、异常通知事件等。

触发inbound事件的方法如下,代码见ChannelHandlerContext中

fireChannelRegistered:Channel注册事件。

fireChannelActive:TCP链路建立成功,Channel激活事件。

fireChannelInactive:TCP链路关闭,链路不可用通知事件。

fireExceptionCaught;异常通知事件

fireUserEventTriggered:用户自定义事件

fireChannelRead:读事件

fireChannelReadComplete:读操作完成事件

fireChannelWritabilityChanged:Channel的可写状态改变事件。

 

2) outbound事件

outbound事件通常是用户主动发起的网络IO操作,IO线程触发,例如用户发起TCP连接操作、绑定操作,消息发送等操作。

触发outbound事件的方法如下,代码见ChannelHandlerContext中

bind:绑定本地地址事件

connect:连接服务端事件

disconnect:断开连接事件

close:关闭当前Channel事件

read:读事件

write:写事件

flush:刷新事件

writeAndFlush:写和刷新事件

 

 

3. ChannelPipeline主要特性

ChannelPipeline是线程安全的。注意ChannelHandler不是线程安全的。

ChannelPipeline支持运行态动态添加、删除ChannelHandler。

 

ChannelPipeline对ChannelHandler的管理主要体现在对ChannelHandler的增删改查上。新增ChannelHandler时会进行同名检查,如果名称相同则抛出IllegalArgumentException异常。示例代码如下:

pipeline.addLast(“timeClientHandler”,new TimeClientHandler());

 

 

 

四 ChannelHandler

大多数时候ChannelHandler会选择性的拦截和处理某个或某几个事件,其他的事件会忽略,由下一个ChannelHandler进行拦截和处理。这会导致一个问题:ChannelHandler是接口,用户必须事件所有的接口,导致代码冗余。为此Netty提供了ChannelHandlerAdapter类,用户继承此类,仅需要重写自己关心的事件即可。

1.ByteToMessageDecoder

将ByteBuf解码城业务的POJO对象。

注意:ByteToMessageDecoder未解决TCP粘包、组包等场景,所以使用时需要处理半包问题。

 

1) channelRead

首先判断msg是否为ByteBuf类型,如果是则解码;否则放弃解码。

如果cumulation为空,说明没有缓存半包信息需要处理,直接将需要解码的ByteBuf赋值给cumulation;如果cumulation非空,说明有尚未处理的半包信息没有完成解码,需要将msg复制到cumulation中,然后解码。

示例代码如下:


public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }
 
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

 

2) callDecode

业务解码器需要遵循Netty的契约,才能使解码器正常工作:如果业务解码器认为当期的字节缓冲区无法完成业务层的解码,需要将readIndex复位,告诉Netty解码器条件不满足应该退出解码,继续读取数据报再次尝试解码。

此方法的主要逻辑是:

decode需要子类根据业务实现,解码后如果out的长度没变化,说明此次解码什么也没做,应该继续读取数据包,再次尝试解码。

如果解码出内容来,但是可读字节无变化(即oldInputLength == in.readableBytes())是非法的。

如果是isSingleDecode,那么解码完以后,退出循环。

代码如下:


protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();
                int oldInputLength = in.readableBytes();
                decode(ctx, in, out);
 
   // Check if this handler was removed before continuing the loop.
   // If it was removed, it is not safe to continue to operate on the buffer.
   // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }
 
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }
 
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }
 
                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }
 

 

2. MessageToMessageDecoder

MessageToMessageDecoder是Netty的二次解码器,负责讲一个对象解码为其他对象。

示例如下:

public class StringToIntegerDecoder extends MessageToMessageDecoder<String> {
 
    @Override
    public void decode(ChannelHandlerContext ctx, String message, List<Object> out) throws Exception {
        //将Message解码为Message的长度
        out.add(message.length());
    }
}
 


3. LengthFieldBasedFrameDecoder

如果消息是通过长度进行区分的,那么LengthFieldBasedFrameDecoder可以自动处理粘包、半包问题。

1) 区分整包消息

通常有以下四种方法,区分一个整包消息。

Ø  固定长度,例如每100个字节代表一个整包消息,不足的前面补零,解码器在处理这类消息时,每次读到指定长度的自己后进行解码。

Ø  通过回车换行符区分消息,例如FTP协议。多用于文本协议。

Ø  通过分隔符区分整包消息

Ø  通过指定长度来表示整包消息。

 

2)   原理

使用LengthFieldBasedFrameDecoder解码是,一般我们只需要关注一下四个字段:

lengthFieldOffset:长度字段的偏移量

lengthFieldLength:长度字段的长度

lengthAdjustment:长度调整值

initialBytesToStrip:解码时跳过的字节长度

3) 解码规则
a) 长度字段+消息体,全部解码
 lengthFieldOffset   = 0 长度字段offset=0
 lengthFieldLength   = 2 长度字段的长度为2
 lengthAdjustment    = 0 消息长度不调整
 initialBytesToStrip = 0 不跳过任何内容
 BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 +--------+----------------+      +--------+----------------+
 | Length | Actual Content |----->| Length | Actual Content |
 | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
 +--------+----------------+      +--------+----------------+

 

b) 长度字段+消息体,解码消息体
 lengthFieldOffset   = 0长度字段offset=0
 lengthFieldLength   = 2长度字段的长度为2
 lengthAdjustment    = 0消息长度不调整
 initialBytesToStrip = 2 跳过2字节,所以解析出来的是消息体
 BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
 +--------+----------------+      +----------------+
 | Length | Actual Content |----->| Actual Content |
 | 0x000C | "HELLO, WORLD" |       | "HELLO, WORLD" |
 +--------+----------------+      +----------------+

 

c) 长度字段+消息体,长度大于消息体长度,全部解码
 lengthFieldOffset   =  0长度字段offset=0
 lengthFieldLength   =  2 长度字段的长度为2
 lengthAdjustment    = -2 长度调整-2,长度字段的值是14,而消息体长度为12,所以解码时需要-2
 initialBytesToStrip =  0
 BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 +--------+----------------+      +--------+----------------+
 | Length | Actual Content |----->| Length | Actual Content |
 | 0x000E | "HELLO, WORLD" |       | 0x000E | "HELLO, WORLD" |
 +--------+----------------+      +--------+----------------+

 

d) header+长度字段+消息体,全部解码
 lengthFieldOffset   = 2长度字段offset=2
 lengthFieldLength   = 3 长度字段的长度为3
 lengthAdjustment    = 0消息长度不调整
 initialBytesToStrip = 0不跳过任何内容
 BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
 +----------+----------+----------------+      +----------+----------+----------------+
 | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
 |  0xCAFE  | 0x00000C | "HELLO, WORLD" |       |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
 +----------+----------+----------------+      +----------+----------+----------------+

 

e) header+长度字段+消息体,全部解码
 lengthFieldOffset  = 0长度字段offset=0
 lengthFieldLength  = 3长度字段的长度为3
 lengthAdjustment   = 2长度调整2,因为lengthHeader 12字节
 initialBytesToStrip = 0不跳过任何内容
 BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
 +----------+----------+----------------+      +----------+----------+----------------+
 |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
 | 0x00000C |  0xCAFE  | "HELLO, WORLD" |       | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
 +----------+----------+----------------+      +----------+----------+----------------+

 

f) header+长度字段+Header2+消息体,解码Header2+消息体
 lengthFieldOffset   = 1长度字段offset=0
 lengthFieldLength   = 2长度字段的长度为2
 lengthAdjustment    = 1长度调整1,因为lengthHDR21字节
 initialBytesToStrip = 3跳过3字节(HDR1+ Length),解码HDR2+消息体
 BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
 +------+--------+------+----------------+      +------+----------------+
 | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
 | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
 +------+--------+------+----------------+      +------+----------------+

  

g) header+长度字段+Header2+消息体,解码Header2+消息体
 lengthFieldOffset   =  1长度字段offset=1
 lengthFieldLength   =  2长度字段的长度为2
 lengthAdjustment    = -3长度调整-3,因为消息体长度12HDR2长度1Length值是16,所以调整长度是12+1-16=-3
 initialBytesToStrip =  3
 BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
 +------+--------+------+----------------+      +------+----------------+
 | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
 | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
 +------+--------+------+----------------+      +------+----------------+

 

 

 

4) decode

通过discardingTooLongFrame判断是否应该丢弃当前刻度的字节缓冲区。

丢弃的字节长度不能大于当前缓冲区可读的字节数,计算丢弃字节后,通过in.skipBytes跳过这些字节。跳过这些字节以后需要重置discardingTooLongFrame。

如果当前可读字节数小于lengthFieldEndOffset,说明当前缓冲区的数据报不够,需要继续读取数据报,所以放弃decode。

计算出实际字段的长度,并通过索引值读取报文数据。具体规则参考“解码规则”部分。

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (discardingTooLongFrame) {
            long bytesToDiscard = this.bytesToDiscard;
            int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
            in.skipBytes(localBytesToDiscard);
            bytesToDiscard -= localBytesToDiscard;
            this.bytesToDiscard = bytesToDiscard;
 
            failIfNecessary(false);
        }
 
        if (in.readableBytes() < lengthFieldEndOffset) {
            return null;
        }
 
        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
 
        if (frameLength < 0) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException(
                    "negative pre-adjustment length field: " + frameLength);
        }
 
        frameLength += lengthAdjustment + lengthFieldEndOffset;
 
        if (frameLength < lengthFieldEndOffset) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException(
                    "Adjusted frame length (" + frameLength + ") is less " +
                    "than lengthFieldEndOffset: " + lengthFieldEndOffset);
        }
 
        if (frameLength > maxFrameLength) {
            long discard = frameLength - in.readableBytes();
            tooLongFrameLength = frameLength;
 
            if (discard < 0) {
                // buffer contains more bytes then the frameLength so we can discard all now
                in.skipBytes((int) frameLength);
            } else {
                // Enter the discard mode and discard everything received so far.
                discardingTooLongFrame = true;
                bytesToDiscard = discard;
                in.skipBytes(in.readableBytes());
            }
            failIfNecessary(true);
            return null;
        }
 
        // never overflows because it's less than maxFrameLength
        int frameLengthInt = (int) frameLength;
        if (in.readableBytes() < frameLengthInt) {
            return null;
        }
 
        if (initialBytesToStrip > frameLengthInt) {
            in.skipBytes(frameLengthInt);
            throw new CorruptedFrameException(
                    "Adjusted frame length (" + frameLength + ") is less " +
                    "than initialBytesToStrip: " + initialBytesToStrip);
        }
        in.skipBytes(initialBytesToStrip);
 
        // extract frame
        int readerIndex = in.readerIndex();
        int actualFrameLength = frameLengthInt - initialBytesToStrip;
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }

 

4. MessageToByteEncoder

将POJO转换成ByteBuf。和ByteToMessageDecoder对应。

 

1) Write

首先当前编码器判断是否支持需要发送的消息,不支持则透传,支持则判断缓冲区的类型类型。直接内存分配ioBuffer(堆外内存),堆内存分配heapBuffer。

然后进行编码。

通过ReferenceCountUtil#release释放编码对象。这里是保证以后对象可以及时回收。

代码如下:

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                if (preferDirect) {
                    buf = ctx.alloc().ioBuffer();
                } else {
                    buf = ctx.alloc().heapBuffer();
                }
                try {
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
 
                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }

5. MessageToMessageEncoder

二次解码器,和MessageToMessageDecoder对应

 

6. LengthFieldPrepender

通过LengthFieldPrepender可以将待发送消息的长度写入到ByteBuf的前两个字节,编码后的消息组成为:长度字段+消息。

  

 

1) 编码规则

LengthFieldPrepender提供了四个构造方法,主要分为以下两种。

a) 长度+消息

构造函数如下:

    public LengthFieldPrepender(int lengthFieldLength) {
        this(lengthFieldLength, false);
    }

使用new LengthFieldPrepender(2)编码
BEFORE ENCODE (12 bytes)   AFTER ENCODE (length=12)
 +----------------+         +--------+----------------+
 | "HELLO, WORLD" |-----> + 0x000C | "HELLO, WORLD" |
 +----------------+         +--------+----------------+

 

b) 长度(包括调整值)+消息

    public LengthFieldPrepender(int lengthFieldLength, int lengthAdjustment) {
        this(lengthFieldLength, lengthAdjustment, false);
    }

使用new LengthFieldPrepender(2,2)编码
  BEFORE ENCODE (12 bytes)   AFTER ENCODE (length=12+2)
 +----------------+         +--------+----------------+
 | "HELLO, WORLD" |-----> + 0x000E | "HELLO, WORLD" |
 +----------------+         +--------+----------------+
 

 

 

相关文章
|
6月前
|
网络协议
【Netty 网络通信】Socket 通信原理
【1月更文挑战第9天】【Netty 网络通信】Socket 通信原理
|
6月前
|
Java Unix Linux
【Netty技术专题】「原理分析系列」Netty强大特性之Native transports扩展开发实战
当涉及到网络通信和高性能的Java应用程序时,Netty是一个强大的框架。它提供了许多功能和组件,其中之一是JNI传输。JNI传输是Netty的一个特性,它为特定平台提供了高效的网络传输。 在本文中,我们将深入探讨Netty提供的特定平台的JNI传输功能,分析其优势和适用场景。我们将介绍每个特定平台的JNI传输,并讨论其性能、可靠性和可扩展性。通过了解这些特定平台的JNI传输,您将能够更好地选择和配置适合您应用程序需求的网络传输方式,以实现最佳的性能和可靠性。
141 7
【Netty技术专题】「原理分析系列」Netty强大特性之Native transports扩展开发实战
|
3月前
|
Java 调度
Netty运行原理问题之ChannelHandler在Netty中扮演什么角色
Netty运行原理问题之ChannelHandler在Netty中扮演什么角色
|
3月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
3月前
|
编解码 网络协议 开发者
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
|
3月前
|
调度
Netty运行原理问题之事件调度工作的问题如何解决
Netty运行原理问题之事件调度工作的问题如何解决
|
3月前
|
开发者
Netty运行原理问题之Netty高性能实现的问题如何解决
Netty运行原理问题之Netty高性能实现的问题如何解决
|
3月前
|
API 开发者
Netty运行原理问题之Netty实现低开发门槛的问题如何解决
Netty运行原理问题之Netty实现低开发门槛的问题如何解决
|
6月前
|
编解码 开发者
Netty Review - 深入理解Netty: ChannelHandler的生命周期与事件处理机制
Netty Review - 深入理解Netty: ChannelHandler的生命周期与事件处理机制
142 0
|
6月前
|
前端开发 UED
Netty Review - Netty自动重连机制揭秘:原理与最佳实践
Netty Review - Netty自动重连机制揭秘:原理与最佳实践
211 0