Netty基础入门和基本使用-1+https://developer.aliyun.com/article/1391142?spm=a2c6h.13148508.setting.22.671d4f0ezNOeJb
五、Handler业务处理器
在Reactor反应器经典模型中,反应器查询到IO事件后,分发到Handler业务处理器,由Handler完成IO操作和业务处理。整个的IO处理操作包括:从通道读取数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端。前后两个环节,从通道读取数据包和由通道发送到对端由Netty的底层完成,不需要用户程序负责。
用户程序主要在Handler业务处理器中,主要负责数据包解码、业务处理、目标数据编码、把数据包写入到通道中。前面两个环节属于入站处理器的工作,后面两个环节属于出站处理器的工作。
1、ChannelInboundHandler通道入站处理器
当数据或信息入站到Netty通道时,Netty将触发入站处理器ChannelInboundHandler所对应的入站API,进行入站处理操作。ChannelInboundHandler的主要操作如下:
- channelRegistered:当通道注册完成后,Netty会调用fireChannelRegistered触发通道注册事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRegistered方法会被调用到
- channelAlive:当通道激活完成后,Netty会调用fireChannelActive触发通道激活事件。通道会启动该入站处理器的流水线处理,在通道注册过的入站处理器Handler的channelActive方法会被调用到
- channelRead:当通道缓冲区可读,Netty会调用fireChannelRead触发通道可读事件。通道会启动该入站处理器的流水线处理,在通道注册过的入站处理器Handler的channelRead方法会被调用到
- channelReadComplete:当通道缓冲区读完,Netty会调用fireChannelReadComplete触发通道读完事件。通道会启动该入站处理器的流水线处理,在通道注册过的入站处理器Handler的channelReadComplete方法会被调用到
- channelInactive:当连接被断开或者不可用,Netty会调用fireChannelInactive触发通道读完事件。通道会启动该入站处理器的流水线处理,在通道注册过的入站处理器Handler的channelInactive方法会被调用到
- exceptionCaught:当通道处理过程发生异常时,Netty会调用fireExceptionCaught。通道会启动异常捕获的流水线处理,在通道注册过的处理器Handler的channelInactive方法会被调用到。这个方法是在通道处理器中的ChannelHandler定义的方法,入站处理器,出站处理接口都继承到了该方法
上述为ChannelInboundHandler的部分重要方法。在Netty中它的默认实现为ChannelInboundHandlerAdapter,在实际开发中只需要继承这个类的默认实现,重写自己需要的方法即可。
2、ChannelOutboundHandler通道出站处理器
当业务处理完成后,通过一系列的ChannelOutboundHandler通道出站处理器,完成Netty通道到底层通道的操作。包括建立底层连接、断开底层连接、写入底层Java NIO通道等。ChannelOutboundHandler定义了大部分的出站操作,具体如下:
- bind(监听地址(IP+端口)绑定):完成底层Java IO通道的IP地址绑定
- connect(连接服务端):完成底层Java IO通道的服务器端连接操作
- write(写数据到底层):完成Netty通道向底层Java IO通道的数据写入操作,此方法仅仅是触发一下操作而已,并不是完成实际的数据写入操作
- flush:腾空缓冲区中的数据,把数据写到对端
- read(从底层读数据):完成Netty通道从Java IO通道的数据读取
- disConnect(断开服务器连接):断开底层Java IO通道的服务器端连接
- close:关闭底层的通道
上述为ChannelOutboundHandler的部分重要方法,在Netty中它的默认实现为ChannelOutboundHandlerAdapter,在实际开发中只需要继承这个类的默认实现,重写自己需要的方法即可。
3、ChannelInitializer通道初始化处理器
一条Netty的通道拥有一条Handler业务处理器流水线,负责装配自己的Handler业务处理器。装配Handler的工作发生在通道开始工作之前。那么如何向流水线中装配业务处理器呢?这就得借助通道的初始化类ChannelInitializer。
initChannel方法是ChannelInitializer定义的一个抽象方法,这个抽象方法需要开发人员实现。在父通道调用initChannel方法时,会将新接收的通道作为参数,传递给initChannel方法。initChannel方法内部大致的业务代码是拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。
六、Pipeline流水线
Netty的业务处理器流水线ChannelPipeline是基于责任链设计模式来设计的,内部是一个双向链表结构,能够支持动态地添加和删除Handler业务处理器。
1、Pipeline处理流程
public class InPipeline { static class SimpleInHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("HandlerA"); super.channelRead(ctx, msg); } } static class SimpleInHandlerB extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("HandlerB"); super.channelRead(ctx, msg); } } static class SimpleInHandlerC extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("HandlerC"); super.channelRead(ctx, msg); } } public static void main(String[] args) { ChannelInitializer<EmbeddedChannel> initializer = new ChannelInitializer<EmbeddedChannel>() { @Override protected void initChannel(EmbeddedChannel ch) throws Exception { ch.pipeline().addLast(new SimpleInHandlerA()) .addLast(new SimpleInHandlerB()) .addLast(new SimpleInHandlerC()); } }; EmbeddedChannel channel = new EmbeddedChannel(initializer); ByteBuf buf = Unpooled.buffer(); buf.writeInt(1); channel.writeInbound(buf); } }
在channelRead()方法中调用了父类的channelRead方法,父类的channelRead方法会自动调用下一个inBoundHandler的channelRead方法,并且会把当前inBoundHandler入站处理器中处理完毕的对象传递到下一个inBoundHandler入站处理器。
在入站/出站的过程中,如果由于业务条件不满足,需要阶段流水线的处理,不让流水线进入下一站。以channelRead为例,我们只需要删除掉子类的super.channelRead()方法,不在子类中调用父类的channelRead入站方法,即可实现截断。此外入站处理传入下一站还有一种方法是调用Context上下文的ctx.fireChannelRead()方法,所以想要截断还不能调用该方法。
入站通道处理器的调用顺序会按照加入流水线的顺序调用,而对于出站通道处理器而言,先加入的处理器会在最后被调用。
对于出站处理流程而言,只要开始执行,就不能被截断。强行阶段的话Netty会抛出异常。如果业务条件不满足,可以不启动出站处理。
2、ChannelHandlerContext
不管我们定义的是哪种类型的Handler业务处理器,最终它们都是以双向链表的方式保存在流水线中。这里流水线的节点类型,并不是前面的Handler业务处理器基类,而是一个新的Netty类型:ChannelHandlerContext,它代表了ChannelHandler通道处理器和ChannelPipeline通道流水线之间的关联。
ChannelHandlerContext中包含了很多方法,主要分为两类:第一类是获取上下文关联的Netty组件实例,如所关联的通道、所关联的流水线、上下文内部Handler业务处理器实例,第二类是入站和出站方法。
在Channel、ChannelPipeline、ChannelHandlerContext三个类中,会有同样的入栈和出站处理方法。如果通过Channel或ChannelPipeline的实例来调用这些方法,他们就会在整条流水线中传播。然而如果是通过ChannelHandlerContext通道处理器上下文来进行调用,就只会从当前节点开始执行Handler处理器,并传播到同类型的处理器的下一站。
Channel、Handler、ChannelHandlerContext三者的关系为:Channel通道拥有一条ChannelPipeline通道流水线,每一个流水线节点为一个ChannelHandlerContext通道处理器上下文对象,每一个上下文中报过了一个ChannelHandler通道处理器。在ChannelHandler通道处理器的入站和出站处理方法中,Netty都会传递一个Context上下文实例作为实际参数。通过Context实例的实惨,在业务处理中,可以获取ChannelPipeline通道流水线的实例或者Channel通道的实例。
3、Handler业务处理器的热拔插
在程序执行过程中,可以动态进行业务处理器的热拔插:动态地增加、删除流水线上的业务处理器。主要的热拔插方法声明在ChannelPipeline接口中,如下:
- ChannelPipeline addFirst(String name, ChannelHandler handler):在头部增加一个业务处理器,名字由name指定
- ChannelPipeline addLast(String name, ChannelHandler handler):在头部增加一个业务处理器,名字由name指定
- ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler):在baseName处理器前面增加一个业务处理器,名字由name指定
- ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler):在baseName处理器后面增加一个业务处理器,名字由name指定
- ChannelPipeline remove(ChannelHandler handler):删除一个业务处理器实例
- ChannelHandler remove(String handler):删除一个业务处理器
- ChannelHandler removeFirst():删除第一个业务处理器
- ChannelHandler removeLast():删除最后一个业务处理器
七、ByteBuf缓冲区
Netty提供了ByteBuf来替代Java NIO的ByteBuffer缓冲区,以操纵内存缓冲区。
1、优势
与Java NIO的ByteBuffer相比,ByteBuf的优势如下:
- Pooling(池化,减少了内存复制和GC,提升了效率)
- 复合缓冲区类型,支持零复制
- 不需要使用flip方法去切换读/写模式
- 扩展性好,例如StringBuffer
- 可以自定义缓冲区类型
- 读取和写入索引分开
- 方法的链式调用
- 可以进行引用计数,方便重复使用
2、逻辑部分
ByteBuf是一个字节容器,内部是一个字节数组。从逻辑上来分,字节容器内部可以分为四个部分:
- 废弃:已用字节,表示已经使用完的废弃的无效字节
- 可读:ByteBuf保存的有效数据,从ByteBuf读取的数据都来自这一部分
- 可写:写入到ByteBuf的数据都会写到这一部分中
- 可扩容:表示该ByteBuf最多还能扩容的大小
3、重要属性
ByteBuf通过三个整型的属性有效区分可读数据和可写数据,使得读写之间相互没有冲突。这三个属性定义在AbstractByteBuf抽象类中,分别是:
- readerIndex:读指针,指示读取的起始位置。每读取一个字节就自动增加1,一旦和writerIndex相等则表示ByteBuf不可读了
- writerIndex:写指针,指示写入的起始位置。没写入一个字节就自动增加1,一旦和capacity容量相等则表示ByteBuf不可写了
- maxCapacity:最大容量,指示可扩容的最大容量。当向ByteBuf写数据的时候,如果容量不足,可以进行扩容。扩容的最大限度由maxCapacity的值来设定,超过就会报错
小于readerIndex指针的的部分为废弃部分
此外AbstractByteBuf中还有两个属性:markedWriterIndex和markedReaderIndex,相当于一个暂存属性
4、ByteBuf的三组方法
1.容量系列
- capacity():ByteBuf的容量(废弃的字节数+可读的字节数+可写的字节数)
- maxCapacity():表示ByteBuf最大能够容纳的最大字节数
2.写入系列
- isWritable():表示ByteBuf是否可写。如果capacity容量大于writerIndex指针的位置则表示可写,返回false并不代表不能再往ByteBuf中写数据了,会自动扩容
- writableBytes():取得可写入的字节数,值等于capacity减去writerIndex
- maxWritableBytes():取得最大的可写入字节数,值等于maxCapacity减去writerIndex
- writeBytes(byte[] src):把src字节数组中的数据全部写到ByteBuf
- writeTYPE(TYPE value):写入基础数据类型的数据,TYPE表示基础数据类型,包含了8大基础数据类型
- setTYPE(TYPE value):基础数据类型的设置,不改变writerIndex指针值
- markWriterIndex():把当前的写指针属性的值保存在markedWriterIndex属性中
- resetWriterIndex():把之前保存的markedWriterIndex的值恢复到写指针writerIndex属性中
3.读取系列
- isReadable():表示ByteBuf是否可读。如果writerIndex指针的值大于readerIndex指针的位置则表示可读,否则表示不可读
- readableBytes():取得可读取的字节数,值等于writerIndex减去readerIndex
- readBytes(byte[] dst):读取ByteBuf中的数据,将数据从ByteBuf读取到dst字节数组中
- readTYPE(TYPE value):读取基础数据类型的数据,TYPE表示基础数据类型,包含了8大基础数据类型
- getTYPE(TYPE value):读取基础数据类型,不改变readerIndex指针值
- markReaderIndex():把当前的读指针属性的值保存在markedReaderIndex属性中
- resetReaderIndex():把之前保存的markedReaderIndex的值恢复到读指针readerIndex属性中
5、引用计数
Netty的ByteBuf的内存回收工作是通过引用计数的方式管理的。JVM中使用计数器(一种GC方法)来标记对象是否不可达进而回收,Netty也使用这种手段来对ByteBuf的引用进行计数。Netty采用计数器来追踪ByteBuf的生命周期,一是对Pooled ByteBuf的支持,二是能够尽快地发现那些可以回收的ByteBuf(非Pooled),以便提升ByteBuf的分配和销毁的效率。
什么是池化的ByteBuf缓冲区?
在通信程序的执行过程中,Buffer缓冲区实例会被频繁创建、使用、释放。频繁创建对象、内存分配、释放内存会使系统的开销大、性能低。因此Netty4开始新增了对象池化的机制。即创建一个Buffer对象池,将没有被引用的Buffer对象,放入对象缓冲池中。当需要时则重新从对象缓冲池中取出,而不需要重新创建。
在默认情况下,当创建完一个ByteBuf时它的引用为1。每次调用retain方法,它的引用就加1,每次调用release方法,它的引用就减1.如果引用为0,再次访问这个ByteBuf对象,将会抛出异常。如果引用为0,表示这个ByteBuf没有哪个进程引用它,它占用的内存需要回收。
为了确保引用计数不会混乱,在Netty的业务处理器开发过程中,应该坚持一个原则:retain和release方法应该结对使用。简单地说,在一个方法中,调用了retain就应该调用release。
如果retain和release这两个方法一次都不调用,那么在缓冲区使用完成后调用一次release就是释放一次。例如在Netty流水线上,中间所有的Handler业务处理器处理完ByteBuf之后直接传递给下一个,由最后一个Handler负责调用release来释放缓冲区的内存空间。
当引用计数已经为0,Netty会进行ByteBuf的回收。分为两种情况:
- Pooled池化的ByteBuf内存,回收方法是放入可以重新分配的ByteBuf池子,等待下一次分配
- Unpooled未池化的ByteBuf缓冲区,回收分两种情况:
- 如果是堆结构缓冲,会被JVM的垃圾回收机制回收
- 如果是Direct类型,调用本地方法释放外部内存(Unsafe.freeMemory)
6、Allocator分配器
Netty通过ByteBufAllocator分配器来创建缓冲区和分配内存空间。Netty提供了ByteBufAllocator的两种实现:PoolByteBufAllocator和UnpooledByteAllocator。
PoolByteBufAllocator将ByteBuf实例放入池中,提高了性能,将内存碎片减少到最小,这个池化分配器采用了jemalloc高效内存分配策略,该策略被好几种现代操作系统所采用。
UnpooledByteBufAllocator是普通的未池化ByteBuf分配器,它没有把ByteBuf放入池中,每次被调用时返回一个新的ByteBuf实例,通过Java的垃圾回收机制回收。
在Netty中默认的分配器为ByteBufAllocator.DEFAULT,可以通过Java系统参数的选项io.netty.allocator.type进行配置,配置时使用字符串值“unpooled”和“pooled”。不同Netty版本对于分配器的默认使用策略是不同的,在Netty4.0中默认的分配器为UnpooledByteBufAllocator,而在Netty4.1中默认的分配器为PooledByteBufAllocator。可以在Netty程序中设置启动器Bootstrap的时候将PooledByteBufAllocator设置为默认的分配器。
使用分脾气分配ByteBuf的方法有多种:
- ByteBufAllocator.DEFAULT.buffer(9,100):分配器默认分配初始容量为9,最大容量100的缓冲区
- ByteBufAllocator.DEAULT.buffer():分配器默认分配初始容量256,最大容量Integer.MAX_VALUE的缓冲区
- UnpooledByteBufAllocator.DEFAULT.heapBuffer():分池化分配器,分配基于Java的堆结构内存缓冲区
- PooledByteBufAllocator.DEFAULT.directBuffer():池化分配器,分配基于操作系统管理的直接内存缓冲区
7、缓冲区的类型
根据内存的管理方式不同,分为堆缓冲区和直接缓冲区,也就是Heap ByteBuf和Direct ByteBuf。另外为了方便缓冲区进行组合,提供了一种组合缓冲区:
1.Heap ByteBuf:内部数据为一个Java数组,存储在JVM的堆空间中,通过hasArray来判断是不是堆缓冲区。
- 优点:未使用池化的情况下能提供快速的分配和释放
- 缺点:写入底层传输通道之前都会复制到直接缓冲区
2.Direct ByteBuf:内部数据存储在操作系统的物理内存中
- 优点:能获取超过JVM堆限制大小的内存空间,写入传输通道比堆缓冲区更快
- 缺点:释放和分配空间昂贵(因为使用系统的方法),在Java中操作时需要复制一次到堆上
3.CompositeBuffer:多个缓冲区的组合表示
- 优点:方便一次操作多个缓冲区实例
上面三种缓冲区的类型,无论哪一种,都可以通过池化、非池化两种分配器来创建和分配内存空间。
Direct Memory(直接内存)不属于Java堆内存,所分配的内存其实是调用操作系统malloc函数来获得的,由Netty的本地内存堆Native堆进行管理。Direct Memory容量可以通过-XX:MaxDirectMemorySize来指定,如果不指定则默认与Java对的最大值-Xmx一样。
Direct Memory的使用避免了Java堆和Native堆之间来回复制数据,在某些应用场景中提高了性能。
在需要频繁创建缓冲区的场合,由于创建和销毁Direct Buffer的代价比较高昂,因此不宜使用Direct Buffer,也就是说Direct Buffer尽量在池化分配器中分配和回收。如果能将Direct Buffer进行复用,在读写频繁的情况下可以大幅度改善性能。
在Java的垃圾回收机制回收Java堆时,Netty框架也会释放不再使用的Direct Buffer缓冲区,因为它的内存为堆外内存,所以清理的工作不会为Java虚拟机带来压力。注意一下垃圾回收的应用场景:
- 垃圾回收仅在Java堆被填满以至于无法为新的堆分配请求提供服务时发生
- 在Java应用程序中调用System.gc()函数来释放内存
对比Heap ByteBuf和Direct ByteBuf两类缓冲区的使用:
- 创建的方法不同:Heap ByteBuf通过调用分配器的heapBuffer()方法来创建,而Direct ByteBuf的创建时通过调用分配器的directBuffer()方法,如果调用buffer()方法创建,在Netty4.1中默认创建的是Direct Buffer
- Heap ByteBuf缓冲区可以直接通过array()方法读取内部数组,而Direct ByteBuf缓冲区不能读取内部数组
- 可以调用hasArray()方法来判断是否为Heap ByteBuf类型的缓冲区,如果返回true则表示是堆缓冲,否则不是(可能是Direct缓冲或CompositeByteBuf缓冲区)
- Direct ByteBuf尧都区缓冲数据进行业务处理需要通过getBytes/readBytes等方法先将数据复制到Java的堆内存然后进行其他的计算
public static void main(String[] args) { ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer(); heapBuf.writeBytes("你好".getBytes(Charset.forName("UTF-8"))); if (!heapBuf.hasArray()) { // 获取内部数组 byte[] array = heapBuf.array(); int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); int length = heapBuf.readableBytes(); System.out.println(new String(array, offset, length, Charset.forName("UTF-8"))); } heapBuf.release(); ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer(); directBuffer.writeBytes("你好Direct".getBytes()); if (!directBuffer.hasArray()) { int length = directBuffer.readableBytes(); byte[] array = new byte[length]; // 把数据读到堆内存 directBuffer.getBytes(directBuffer.readerIndex(), array); System.out.println(new String(array)); } directBuffer.release(); }
8、ByteBuf的自动释放
Netty的Reactor反应器线程会在底层的Java NIO通道读数据,也就是AbstractNioByteChannel.NioByteUnsafe.read(),调用ByteBufAllocator方法,创建ByteBuf实例,从操作系统缓冲区把数据读取到ByteBuf实例中,然后调用pipeline.fireChannelRead(byteBuf)方法将读取到的数据包送入到入站处理流水线中。
以上是ByteBuf的创建方式,而入站的ByteBuf的释放方式有以下两种:
1)TailHandler自动释放
Netty默认会在ChannelPipeline通道流水线的最添加一个TailHeader末尾处理器,它实现了默认的处理方法,在这些方法中会帮助完成ByteBuf内存释放的工作。在默认情况下,如果每个InboundHandler入站处理器,把最初的ByteBuf数据包一路往下传,那么TailHandler默认处理器会自动释放入站的ByteBuf实例。
总体来说如果自定义的InboundHandler入站处理器继承自ChannelInboundHandlerAdapter适配器,那么可以调用以下两种方法来释放ByteBuf内存:
- 手动式发那个ByteBuf,具体的方式为调用byteBuf.release()
- 调用父类的入站方法将msg向后传递,依赖后面的处理器释放ByteBuf。具体的方式为调用基类的入站处理方法super.channelRead
2)SimpleChannelInboundHandler自动释放
如果Handler业务处理器需要阶段流水线的处理流程,不将ByteBuf数据包送入后边的InboundHandler入站处理器,这是流水线末端的TailHandler末尾处理器自动释放缓冲区的工作自然就失效了。
在这种场景下,Handler业务处理器有两种选择:
- 手动释放ByteBuf实例
- 继承SimpleChannelInboundHandler,利用它的自动释放功能
以入站读数据为例,Handler业务处理器必须继承自SimpleChannelInboundHandler基类,并且业务处理器
的代码必须移动到重写的channelRead0方法中。SimpleChannelInboundHandler类的channelRead等入站处理方法,会在调用完实际的channelRead方法后帮忙释放ByteBuf实例。
至于出站处理,在出站处理流程中,申请分配到的ByteBuf主要是通过HeadHandler完成自动释放的。出站处理用到的ByteBuf缓冲区一般是要发送的消息,通常由Handler业务处理器所申请而分配的,在每一个出站Handler业务处理器的处理完成后,最后数据包回来到出站的最后一棒HeadHandler,在数据输出完成后,ByteBuf会被释放一次,如果计数器为0,将被彻底释放掉。
在Netty开发中,必须密切关注ByteBuf缓冲区的释放,如果释放的不及时,会造成Netty的内存泄露,最终导致内存耗尽。
八、ByteBuf浅层复制的高级使用方式
浅层复制是一种非常重要的操作,可以很大程度地避免内存复制。ByteBuf的浅层复制分为切片浅层复制和整体浅层复制两种。
1、slice切片浅层复制
ByteBuf的slice方法可以获取到一个ByteBuf的一个切片。一个ByteBuf可以进行多次的切片浅层复制,多次切片后的ByteBuf对象可以共享一个存储区域。
slice方法有两个重载版本:
- public ByteBuf slice()
- public ByteBuf slice(int index, int length)
第一个是不带参数的slice方法,在内部是调用了buf.slice(buf.readerIndex(), buf.readableBytes()),也就是说第一个无参数slice方法的返回值是ByteBuf实例中可读部分的切片。
调用slice()方法后,返回的切片是一个新的ByteBuf对象,该对象的几个重要属性,大致如下:
- readerIndex(读指针)的值为0
- writerIndex(写指针)的值为源ByteBuf的readableBytes可读字节数
- maxCapacity(最大容量)的值为源ByteBuf的readableBytes可读字节数
切片后的心ByteBuf有两个特点:
- 切片不可以写入,原因是maxCapacity与writerIndex值相同
- 切片和源ByteBuf的可读字节数相同
切片后的新ByteBuf和源ByteBuf的关联性:
- 切片不会复制源ByteBuf的底层数据,底层数据和源ByteBuf的底层数组是同一个
- 切片不会改变源ByteBuf的引用计数
从根本上说,slice无参数方法所生成的切片就是源ByteBuf可读部分的浅层复制
2、duplicate整体浅层复制
和slice切片不同,duplicate返回的源ByteBuf的整个对象的一个浅层复制,包括以下内容:
- duplicate的读写指针、最大容量值
- duplicate不会改变源的ByteBuf的应用技术
- duplicate不会复制源的ByteBuf的底层数据
duplicate和slice方法都是浅层复制,不同的是slice方法是切取一段的浅层复制,而duplicate是整体的浅层复制
3、浅层复制的问题
浅层复制方法不会实际去复制数据,也不会改变ByteBuf的引用计数,这就会导致一个问题:在源ByteBuf调用release之后,一旦引用计数为零就比拿的不能访问了。在这种场景下,源ByteBuf的所有浅层复制实例也不能进行读写了,如果强行对浅层复制实例进行读写则会报错。
因此在调用千层复制实例时,可以通过一次retain方法来增加引用,表示它们对应的底层内存多了一次引用。
九、实例
1、服务器端
public class NettyEchoServer { private final int serverPort; ServerBootstrap bootstrap = new ServerBootstrap(); public NettyEchoServer(int serverPort) { this.serverPort = serverPort; } public void runServer() { // 创建反应器线程组 EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1); EventLoopGroup workerLoopGroup = new NioEventLoopGroup(); try { // 1.设置反应器线程组 bootstrap.group(bossLoopGroup, workerLoopGroup); // 2.设置nio类型的通道 bootstrap.channel(NioServerSocketChannel.class); // 3.设置监听端口 bootstrap.localAddress(serverPort); // 4.设置通道的参数 bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); // 5.装配子通道流水线 bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { // 有连接到达时会创建一个通道 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 流水线管理子通道中的Handler处理器 // 向子通道流水线添加一个handler处理器 socketChannel.pipeline().addLast(NettyEchoServerHandler.INSTANCE); } }); // 6.开始绑定服务器 // 通过调用sync同步方法阻塞直到绑定成功 ChannelFuture channelFuture = bootstrap.bind().sync(); // 7.等待通道关闭的异步任务结束 // 服务监听通道会一直等待通道关闭的异步任务结束 ChannelFuture closeFuture = channelFuture.channel().closeFuture(); closeFuture.sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 8.关闭EventLoopGroup // 释放掉所有资源包括创建的线程 workerLoopGroup.shutdownGracefully(); bossLoopGroup.shutdownGracefully(); } } public static void main(String[] args) { new NettyEchoServer(8819).runServer(); } }
2、服务端处理器
@ChannelHandler.Sharable public class NettyEchoServerHandler extends ChannelInboundHandlerAdapter { public static final NettyEchoServerHandler INSTANCE = new NettyEchoServerHandler(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.println("msg type:" + ((in.hasArray()) ? "堆内存": "直接内存")); int len = in.readableBytes(); byte[] arr = new byte[len]; in.getBytes(0, arr); System.out.println("server received:" + new String(arr, "UTF-8")); System.out.println("写回前, msg.refCnt:" + ((ByteBuf) msg).refCnt()); ChannelFuture f = ctx.writeAndFlush(msg); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { System.out.println("写回后,msg.refCnt:" + ((ByteBuf) msg).refCnt()); } }); } }
这里的NettyEchoServerHandler在前面加了一个特殊的Netty注解:@ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享,也就是说这个通道的流水线可以加入同一个Handler业务处理器实例,而这种操作Netty默认是不允许的。
但是如果一个服务器处理很多的通道,每个通道都新建很多重复的Handler实例,就需要很多重复的Handler实例,这就会浪费很多空间。所以在Handler实例中没有与特定通道强相关的数据或者状态,建议设计成共享的模式,即在前面加上注解@ChannelHandler.Sharable。反过来如果没有加@ChannelHandler.Sharable注解,试图将一个Handler实例添加到多个ChannelPipeline通道流水线时,Netty将会抛出异常。
默认同一个通道上的所有业务处理器,只能被同一个线程处理。所以不是@Sharable共享类型的业务处理器,在线程的层面是安全的,不需要进行线程的同步控制。而不同的通道可能绑定到多个不同的EventLoop反应器线程,因此加上@ChannelHandler.Sharable注解后的共享业务处理器的实例,可能被多个线程并发执行。这样就会导致一个结果@Sharable共享实例不是线程层面安全的,因此@Sharable共享的业务处理器,如果需要操作的数据不仅仅是局部变量,则需要进行线程的同步控制,以保证操作是线程层面安全的。
ChannelHandlerAdapater提供了使用方法isSharable(),如果其对应的实现加上了@Sharable注解,那么这个方法将返回true,表示它可以被添加到多个ChannelPipeline通道流水线中。
3、客户端
public class NettyEchoClient { private int serverPort; private String serverIp; Bootstrap b = new Bootstrap(); public NettyEchoClient(String ip, int port) { this.serverPort = port; this.serverIp = ip; } public void runClient() { EventLoopGroup workerLoopGroup = new NioEventLoopGroup(); try { b.group(workerLoopGroup); b.channel(NioSocketChannel.class); b.remoteAddress(serverIp, serverPort); b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE); } }); ChannelFuture f = b.connect(); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (f.isSuccess()) { System.out.println("连接成功"); } else { System.out.println("连接失败"); } } }); f.sync(); Channel channel = f.channel(); Scanner scanner = new Scanner(System.in); System.out.println("请输入发送内容:"); while (scanner.hasNext()) { String next = scanner.next(); ByteBuf buffer = channel.alloc().buffer(); buffer.writeBytes(next.getBytes("UTF-8")); channel.writeAndFlush(buffer); System.out.println("请输入发送内容:"); } } catch (InterruptedException e) { throw new RuntimeException(e); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } finally { workerLoopGroup.shutdownGracefully(); } } }
4、客户端处理器
public class NettyEchoClientHandler extends ChannelInboundHandlerAdapter { public static final NettyEchoClientHandler INSTANCE = new NettyEchoClientHandler(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; int len = buf.readableBytes(); byte[] arr = new byte[len]; buf.getBytes(0, arr); System.out.println("client received:" + new String(arr, "UTF-8")); buf.release(); } }