Netty基础入门和基本使用-2

简介: 在Reactor反应器经典模型中,反应器查询到IO事件后,分发到Handler业务处理器,由Handler完成IO操作和业务处理。整个的IO处理操作包括:从通道读取数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端。前后两个环节,从通道读取数据包和由通道发送到对端由Netty的底层完成,不需要用户程序负责。

Netty基础入门和基本使用-1+https://developer.aliyun.com/article/1391142?spm=a2c6h.13148508.setting.22.671d4f0ezNOeJb

五、Handler业务处理器

在Reactor反应器经典模型中,反应器查询到IO事件后,分发到Handler业务处理器,由Handler完成IO操作和业务处理。整个的IO处理操作包括:从通道读取数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端。前后两个环节,从通道读取数据包和由通道发送到对端由Netty的底层完成,不需要用户程序负责。


用户程序主要在Handler业务处理器中,主要负责数据包解码、业务处理、目标数据编码、把数据包写入到通道中。前面两个环节属于入站处理器的工作,后面两个环节属于出站处理器的工作。


1、ChannelInboundHandler通道入站处理器

当数据或信息入站到Netty通道时,Netty将触发入站处理器ChannelInboundHandler所对应的入站API,进行入站处理操作。ChannelInboundHandler的主要操作如下:


  • channelRegistered:当通道注册完成后,Netty会调用fireChannelRegistered触发通道注册事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRegistered方法会被调用到
  • channelAlive:当通道激活完成后,Netty会调用fireChannelActive触发通道激活事件。通道会启动该入站处理器的流水线处理,在通道注册过的入站处理器Handler的channelActive方法会被调用到
  • channelRead:当通道缓冲区可读,Netty会调用fireChannelRead触发通道可读事件。通道会启动该入站处理器的流水线处理,在通道注册过的入站处理器Handler的channelRead方法会被调用到
  • channelReadComplete:当通道缓冲区读完,Netty会调用fireChannelReadComplete触发通道读完事件。通道会启动该入站处理器的流水线处理,在通道注册过的入站处理器Handler的channelReadComplete方法会被调用到
  • channelInactive:当连接被断开或者不可用,Netty会调用fireChannelInactive触发通道读完事件。通道会启动该入站处理器的流水线处理,在通道注册过的入站处理器Handler的channelInactive方法会被调用到
  • exceptionCaught:当通道处理过程发生异常时,Netty会调用fireExceptionCaught。通道会启动异常捕获的流水线处理,在通道注册过的处理器Handler的channelInactive方法会被调用到。这个方法是在通道处理器中的ChannelHandler定义的方法,入站处理器,出站处理接口都继承到了该方法


上述为ChannelInboundHandler的部分重要方法。在Netty中它的默认实现为ChannelInboundHandlerAdapter,在实际开发中只需要继承这个类的默认实现,重写自己需要的方法即可。


2、ChannelOutboundHandler通道出站处理器

当业务处理完成后,通过一系列的ChannelOutboundHandler通道出站处理器,完成Netty通道到底层通道的操作。包括建立底层连接、断开底层连接、写入底层Java NIO通道等。ChannelOutboundHandler定义了大部分的出站操作,具体如下:


  • bind(监听地址(IP+端口)绑定):完成底层Java IO通道的IP地址绑定
  • connect(连接服务端):完成底层Java IO通道的服务器端连接操作
  • write(写数据到底层):完成Netty通道向底层Java IO通道的数据写入操作,此方法仅仅是触发一下操作而已,并不是完成实际的数据写入操作
  • flush:腾空缓冲区中的数据,把数据写到对端
  • read(从底层读数据):完成Netty通道从Java IO通道的数据读取
  • disConnect(断开服务器连接):断开底层Java IO通道的服务器端连接
  • close:关闭底层的通道


上述为ChannelOutboundHandler的部分重要方法,在Netty中它的默认实现为ChannelOutboundHandlerAdapter,在实际开发中只需要继承这个类的默认实现,重写自己需要的方法即可。


3、ChannelInitializer通道初始化处理器

一条Netty的通道拥有一条Handler业务处理器流水线,负责装配自己的Handler业务处理器。装配Handler的工作发生在通道开始工作之前。那么如何向流水线中装配业务处理器呢?这就得借助通道的初始化类ChannelInitializer。


initChannel方法是ChannelInitializer定义的一个抽象方法,这个抽象方法需要开发人员实现。在父通道调用initChannel方法时,会将新接收的通道作为参数,传递给initChannel方法。initChannel方法内部大致的业务代码是拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。


六、Pipeline流水线

Netty的业务处理器流水线ChannelPipeline是基于责任链设计模式来设计的,内部是一个双向链表结构,能够支持动态地添加和删除Handler业务处理器。


1、Pipeline处理流程

public class InPipeline {
    static class SimpleInHandlerA extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("HandlerA");
            super.channelRead(ctx, msg);
        }
    }
    static class SimpleInHandlerB extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("HandlerB");
            super.channelRead(ctx, msg);
        }
    }
    static class SimpleInHandlerC extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("HandlerC");
            super.channelRead(ctx, msg);
        }
    }
    public static void main(String[] args) {
        ChannelInitializer<EmbeddedChannel> initializer = new ChannelInitializer<EmbeddedChannel>() {
            @Override
            protected void initChannel(EmbeddedChannel ch) throws Exception {
                ch.pipeline().addLast(new SimpleInHandlerA())
                        .addLast(new SimpleInHandlerB())
                        .addLast(new SimpleInHandlerC());
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(initializer);
        ByteBuf buf = Unpooled.buffer();
        buf.writeInt(1);
        channel.writeInbound(buf);
    }
}

在channelRead()方法中调用了父类的channelRead方法,父类的channelRead方法会自动调用下一个inBoundHandler的channelRead方法,并且会把当前inBoundHandler入站处理器中处理完毕的对象传递到下一个inBoundHandler入站处理器。


在入站/出站的过程中,如果由于业务条件不满足,需要阶段流水线的处理,不让流水线进入下一站。以channelRead为例,我们只需要删除掉子类的super.channelRead()方法,不在子类中调用父类的channelRead入站方法,即可实现截断。此外入站处理传入下一站还有一种方法是调用Context上下文的ctx.fireChannelRead()方法,所以想要截断还不能调用该方法。


入站通道处理器的调用顺序会按照加入流水线的顺序调用,而对于出站通道处理器而言,先加入的处理器会在最后被调用。


对于出站处理流程而言,只要开始执行,就不能被截断。强行阶段的话Netty会抛出异常。如果业务条件不满足,可以不启动出站处理。


2、ChannelHandlerContext

不管我们定义的是哪种类型的Handler业务处理器,最终它们都是以双向链表的方式保存在流水线中。这里流水线的节点类型,并不是前面的Handler业务处理器基类,而是一个新的Netty类型:ChannelHandlerContext,它代表了ChannelHandler通道处理器和ChannelPipeline通道流水线之间的关联。


ChannelHandlerContext中包含了很多方法,主要分为两类:第一类是获取上下文关联的Netty组件实例,如所关联的通道、所关联的流水线、上下文内部Handler业务处理器实例,第二类是入站和出站方法。


在Channel、ChannelPipeline、ChannelHandlerContext三个类中,会有同样的入栈和出站处理方法。如果通过Channel或ChannelPipeline的实例来调用这些方法,他们就会在整条流水线中传播。然而如果是通过ChannelHandlerContext通道处理器上下文来进行调用,就只会从当前节点开始执行Handler处理器,并传播到同类型的处理器的下一站。


Channel、Handler、ChannelHandlerContext三者的关系为:Channel通道拥有一条ChannelPipeline通道流水线,每一个流水线节点为一个ChannelHandlerContext通道处理器上下文对象,每一个上下文中报过了一个ChannelHandler通道处理器。在ChannelHandler通道处理器的入站和出站处理方法中,Netty都会传递一个Context上下文实例作为实际参数。通过Context实例的实惨,在业务处理中,可以获取ChannelPipeline通道流水线的实例或者Channel通道的实例。


3、Handler业务处理器的热拔插

在程序执行过程中,可以动态进行业务处理器的热拔插:动态地增加、删除流水线上的业务处理器。主要的热拔插方法声明在ChannelPipeline接口中,如下:


  1. ChannelPipeline addFirst(String name, ChannelHandler handler):在头部增加一个业务处理器,名字由name指定
  2. ChannelPipeline addLast(String name, ChannelHandler handler):在头部增加一个业务处理器,名字由name指定
  3. ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler):在baseName处理器前面增加一个业务处理器,名字由name指定
  4. ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler):在baseName处理器后面增加一个业务处理器,名字由name指定
  5. ChannelPipeline remove(ChannelHandler handler):删除一个业务处理器实例
  6. ChannelHandler remove(String handler):删除一个业务处理器
  7. ChannelHandler removeFirst():删除第一个业务处理器
  8. ChannelHandler removeLast():删除最后一个业务处理器


七、ByteBuf缓冲区

Netty提供了ByteBuf来替代Java NIO的ByteBuffer缓冲区,以操纵内存缓冲区。


1、优势

与Java NIO的ByteBuffer相比,ByteBuf的优势如下:


  • Pooling(池化,减少了内存复制和GC,提升了效率)
  • 复合缓冲区类型,支持零复制
  • 不需要使用flip方法去切换读/写模式
  • 扩展性好,例如StringBuffer
  • 可以自定义缓冲区类型
  • 读取和写入索引分开
  • 方法的链式调用
  • 可以进行引用计数,方便重复使用


2、逻辑部分

ByteBuf是一个字节容器,内部是一个字节数组。从逻辑上来分,字节容器内部可以分为四个部分:


  1. 废弃:已用字节,表示已经使用完的废弃的无效字节
  2. 可读:ByteBuf保存的有效数据,从ByteBuf读取的数据都来自这一部分
  3. 可写:写入到ByteBuf的数据都会写到这一部分中
  4. 可扩容:表示该ByteBuf最多还能扩容的大小


3、重要属性

ByteBuf通过三个整型的属性有效区分可读数据和可写数据,使得读写之间相互没有冲突。这三个属性定义在AbstractByteBuf抽象类中,分别是:


  • readerIndex:读指针,指示读取的起始位置。每读取一个字节就自动增加1,一旦和writerIndex相等则表示ByteBuf不可读了
  • writerIndex:写指针,指示写入的起始位置。没写入一个字节就自动增加1,一旦和capacity容量相等则表示ByteBuf不可写了
  • maxCapacity:最大容量,指示可扩容的最大容量。当向ByteBuf写数据的时候,如果容量不足,可以进行扩容。扩容的最大限度由maxCapacity的值来设定,超过就会报错


小于readerIndex指针的的部分为废弃部分


此外AbstractByteBuf中还有两个属性:markedWriterIndex和markedReaderIndex,相当于一个暂存属性


4、ByteBuf的三组方法

1.容量系列


  • capacity():ByteBuf的容量(废弃的字节数+可读的字节数+可写的字节数)
  • maxCapacity():表示ByteBuf最大能够容纳的最大字节数


2.写入系列


  • isWritable():表示ByteBuf是否可写。如果capacity容量大于writerIndex指针的位置则表示可写,返回false并不代表不能再往ByteBuf中写数据了,会自动扩容
  • writableBytes():取得可写入的字节数,值等于capacity减去writerIndex
  • maxWritableBytes():取得最大的可写入字节数,值等于maxCapacity减去writerIndex
  • writeBytes(byte[] src):把src字节数组中的数据全部写到ByteBuf
  • writeTYPE(TYPE value):写入基础数据类型的数据,TYPE表示基础数据类型,包含了8大基础数据类型
  • setTYPE(TYPE value):基础数据类型的设置,不改变writerIndex指针值
  • markWriterIndex():把当前的写指针属性的值保存在markedWriterIndex属性中
  • resetWriterIndex():把之前保存的markedWriterIndex的值恢复到写指针writerIndex属性中


3.读取系列


  • isReadable():表示ByteBuf是否可读。如果writerIndex指针的值大于readerIndex指针的位置则表示可读,否则表示不可读
  • readableBytes():取得可读取的字节数,值等于writerIndex减去readerIndex
  • readBytes(byte[] dst):读取ByteBuf中的数据,将数据从ByteBuf读取到dst字节数组中
  • readTYPE(TYPE value):读取基础数据类型的数据,TYPE表示基础数据类型,包含了8大基础数据类型
  • getTYPE(TYPE value):读取基础数据类型,不改变readerIndex指针值
  • markReaderIndex():把当前的读指针属性的值保存在markedReaderIndex属性中
  • resetReaderIndex():把之前保存的markedReaderIndex的值恢复到读指针readerIndex属性中


5、引用计数

Netty的ByteBuf的内存回收工作是通过引用计数的方式管理的。JVM中使用计数器(一种GC方法)来标记对象是否不可达进而回收,Netty也使用这种手段来对ByteBuf的引用进行计数。Netty采用计数器来追踪ByteBuf的生命周期,一是对Pooled ByteBuf的支持,二是能够尽快地发现那些可以回收的ByteBuf(非Pooled),以便提升ByteBuf的分配和销毁的效率。


什么是池化的ByteBuf缓冲区?

在通信程序的执行过程中,Buffer缓冲区实例会被频繁创建、使用、释放。频繁创建对象、内存分配、释放内存会使系统的开销大、性能低。因此Netty4开始新增了对象池化的机制。即创建一个Buffer对象池,将没有被引用的Buffer对象,放入对象缓冲池中。当需要时则重新从对象缓冲池中取出,而不需要重新创建。


在默认情况下,当创建完一个ByteBuf时它的引用为1。每次调用retain方法,它的引用就加1,每次调用release方法,它的引用就减1.如果引用为0,再次访问这个ByteBuf对象,将会抛出异常。如果引用为0,表示这个ByteBuf没有哪个进程引用它,它占用的内存需要回收。


为了确保引用计数不会混乱,在Netty的业务处理器开发过程中,应该坚持一个原则:retain和release方法应该结对使用。简单地说,在一个方法中,调用了retain就应该调用release。


如果retain和release这两个方法一次都不调用,那么在缓冲区使用完成后调用一次release就是释放一次。例如在Netty流水线上,中间所有的Handler业务处理器处理完ByteBuf之后直接传递给下一个,由最后一个Handler负责调用release来释放缓冲区的内存空间。


当引用计数已经为0,Netty会进行ByteBuf的回收。分为两种情况:


  1. Pooled池化的ByteBuf内存,回收方法是放入可以重新分配的ByteBuf池子,等待下一次分配


  1. Unpooled未池化的ByteBuf缓冲区,回收分两种情况:


  1. 如果是堆结构缓冲,会被JVM的垃圾回收机制回收
  2. 如果是Direct类型,调用本地方法释放外部内存(Unsafe.freeMemory)


6、Allocator分配器

Netty通过ByteBufAllocator分配器来创建缓冲区和分配内存空间。Netty提供了ByteBufAllocator的两种实现:PoolByteBufAllocator和UnpooledByteAllocator。


PoolByteBufAllocator将ByteBuf实例放入池中,提高了性能,将内存碎片减少到最小,这个池化分配器采用了jemalloc高效内存分配策略,该策略被好几种现代操作系统所采用。


UnpooledByteBufAllocator是普通的未池化ByteBuf分配器,它没有把ByteBuf放入池中,每次被调用时返回一个新的ByteBuf实例,通过Java的垃圾回收机制回收。


在Netty中默认的分配器为ByteBufAllocator.DEFAULT,可以通过Java系统参数的选项io.netty.allocator.type进行配置,配置时使用字符串值“unpooled”和“pooled”。不同Netty版本对于分配器的默认使用策略是不同的,在Netty4.0中默认的分配器为UnpooledByteBufAllocator,而在Netty4.1中默认的分配器为PooledByteBufAllocator。可以在Netty程序中设置启动器Bootstrap的时候将PooledByteBufAllocator设置为默认的分配器。


使用分脾气分配ByteBuf的方法有多种:


  1. ByteBufAllocator.DEFAULT.buffer(9,100):分配器默认分配初始容量为9,最大容量100的缓冲区
  2. ByteBufAllocator.DEAULT.buffer():分配器默认分配初始容量256,最大容量Integer.MAX_VALUE的缓冲区
  3. UnpooledByteBufAllocator.DEFAULT.heapBuffer():分池化分配器,分配基于Java的堆结构内存缓冲区
  4. PooledByteBufAllocator.DEFAULT.directBuffer():池化分配器,分配基于操作系统管理的直接内存缓冲区


7、缓冲区的类型

根据内存的管理方式不同,分为堆缓冲区和直接缓冲区,也就是Heap ByteBuf和Direct ByteBuf。另外为了方便缓冲区进行组合,提供了一种组合缓冲区:


1.Heap ByteBuf:内部数据为一个Java数组,存储在JVM的堆空间中,通过hasArray来判断是不是堆缓冲区。


  • 优点:未使用池化的情况下能提供快速的分配和释放
  • 缺点:写入底层传输通道之前都会复制到直接缓冲区


2.Direct ByteBuf:内部数据存储在操作系统的物理内存中


  • 优点:能获取超过JVM堆限制大小的内存空间,写入传输通道比堆缓冲区更快
  • 缺点:释放和分配空间昂贵(因为使用系统的方法),在Java中操作时需要复制一次到堆上


3.CompositeBuffer:多个缓冲区的组合表示


  • 优点:方便一次操作多个缓冲区实例


上面三种缓冲区的类型,无论哪一种,都可以通过池化、非池化两种分配器来创建和分配内存空间。


Direct Memory(直接内存)不属于Java堆内存,所分配的内存其实是调用操作系统malloc函数来获得的,由Netty的本地内存堆Native堆进行管理。Direct Memory容量可以通过-XX:MaxDirectMemorySize来指定,如果不指定则默认与Java对的最大值-Xmx一样。

Direct Memory的使用避免了Java堆和Native堆之间来回复制数据,在某些应用场景中提高了性能。

在需要频繁创建缓冲区的场合,由于创建和销毁Direct Buffer的代价比较高昂,因此不宜使用Direct Buffer,也就是说Direct Buffer尽量在池化分配器中分配和回收。如果能将Direct Buffer进行复用,在读写频繁的情况下可以大幅度改善性能。

在Java的垃圾回收机制回收Java堆时,Netty框架也会释放不再使用的Direct Buffer缓冲区,因为它的内存为堆外内存,所以清理的工作不会为Java虚拟机带来压力。注意一下垃圾回收的应用场景:

  1. 垃圾回收仅在Java堆被填满以至于无法为新的堆分配请求提供服务时发生
  2. 在Java应用程序中调用System.gc()函数来释放内存


对比Heap ByteBuf和Direct ByteBuf两类缓冲区的使用:


  1. 创建的方法不同:Heap ByteBuf通过调用分配器的heapBuffer()方法来创建,而Direct ByteBuf的创建时通过调用分配器的directBuffer()方法,如果调用buffer()方法创建,在Netty4.1中默认创建的是Direct Buffer
  2. Heap ByteBuf缓冲区可以直接通过array()方法读取内部数组,而Direct ByteBuf缓冲区不能读取内部数组
  3. 可以调用hasArray()方法来判断是否为Heap ByteBuf类型的缓冲区,如果返回true则表示是堆缓冲,否则不是(可能是Direct缓冲或CompositeByteBuf缓冲区)
  4. Direct ByteBuf尧都区缓冲数据进行业务处理需要通过getBytes/readBytes等方法先将数据复制到Java的堆内存然后进行其他的计算
public static void main(String[] args) {
    ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer();
    heapBuf.writeBytes("你好".getBytes(Charset.forName("UTF-8")));
    if (!heapBuf.hasArray()) {
        // 获取内部数组
        byte[] array = heapBuf.array();
        int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
        int length = heapBuf.readableBytes();
        System.out.println(new String(array, offset, length, Charset.forName("UTF-8")));
    }
    heapBuf.release();
    ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer();
    directBuffer.writeBytes("你好Direct".getBytes());
    if (!directBuffer.hasArray()) {
        int length = directBuffer.readableBytes();
        byte[] array = new byte[length];
        // 把数据读到堆内存
        directBuffer.getBytes(directBuffer.readerIndex(), array);
        System.out.println(new String(array));
    }
    directBuffer.release();
}



8、ByteBuf的自动释放

Netty的Reactor反应器线程会在底层的Java NIO通道读数据,也就是AbstractNioByteChannel.NioByteUnsafe.read(),调用ByteBufAllocator方法,创建ByteBuf实例,从操作系统缓冲区把数据读取到ByteBuf实例中,然后调用pipeline.fireChannelRead(byteBuf)方法将读取到的数据包送入到入站处理流水线中。


以上是ByteBuf的创建方式,而入站的ByteBuf的释放方式有以下两种:


1)TailHandler自动释放

Netty默认会在ChannelPipeline通道流水线的最添加一个TailHeader末尾处理器,它实现了默认的处理方法,在这些方法中会帮助完成ByteBuf内存释放的工作。在默认情况下,如果每个InboundHandler入站处理器,把最初的ByteBuf数据包一路往下传,那么TailHandler默认处理器会自动释放入站的ByteBuf实例。


总体来说如果自定义的InboundHandler入站处理器继承自ChannelInboundHandlerAdapter适配器,那么可以调用以下两种方法来释放ByteBuf内存:


  1. 手动式发那个ByteBuf,具体的方式为调用byteBuf.release()
  2. 调用父类的入站方法将msg向后传递,依赖后面的处理器释放ByteBuf。具体的方式为调用基类的入站处理方法super.channelRead


2)SimpleChannelInboundHandler自动释放

如果Handler业务处理器需要阶段流水线的处理流程,不将ByteBuf数据包送入后边的InboundHandler入站处理器,这是流水线末端的TailHandler末尾处理器自动释放缓冲区的工作自然就失效了。


在这种场景下,Handler业务处理器有两种选择:


  1. 手动释放ByteBuf实例
  2. 继承SimpleChannelInboundHandler,利用它的自动释放功能


以入站读数据为例,Handler业务处理器必须继承自SimpleChannelInboundHandler基类,并且业务处理器

的代码必须移动到重写的channelRead0方法中。SimpleChannelInboundHandler类的channelRead等入站处理方法,会在调用完实际的channelRead方法后帮忙释放ByteBuf实例。


至于出站处理,在出站处理流程中,申请分配到的ByteBuf主要是通过HeadHandler完成自动释放的。出站处理用到的ByteBuf缓冲区一般是要发送的消息,通常由Handler业务处理器所申请而分配的,在每一个出站Handler业务处理器的处理完成后,最后数据包回来到出站的最后一棒HeadHandler,在数据输出完成后,ByteBuf会被释放一次,如果计数器为0,将被彻底释放掉。


在Netty开发中,必须密切关注ByteBuf缓冲区的释放,如果释放的不及时,会造成Netty的内存泄露,最终导致内存耗尽。


八、ByteBuf浅层复制的高级使用方式

浅层复制是一种非常重要的操作,可以很大程度地避免内存复制。ByteBuf的浅层复制分为切片浅层复制和整体浅层复制两种。


1、slice切片浅层复制

ByteBuf的slice方法可以获取到一个ByteBuf的一个切片。一个ByteBuf可以进行多次的切片浅层复制,多次切片后的ByteBuf对象可以共享一个存储区域。


slice方法有两个重载版本:


  1. public ByteBuf slice()
  2. public ByteBuf slice(int index, int length)


第一个是不带参数的slice方法,在内部是调用了buf.slice(buf.readerIndex(), buf.readableBytes()),也就是说第一个无参数slice方法的返回值是ByteBuf实例中可读部分的切片。


调用slice()方法后,返回的切片是一个新的ByteBuf对象,该对象的几个重要属性,大致如下:


  • readerIndex(读指针)的值为0
  • writerIndex(写指针)的值为源ByteBuf的readableBytes可读字节数
  • maxCapacity(最大容量)的值为源ByteBuf的readableBytes可读字节数


切片后的心ByteBuf有两个特点:


  1. 切片不可以写入,原因是maxCapacity与writerIndex值相同
  2. 切片和源ByteBuf的可读字节数相同


切片后的新ByteBuf和源ByteBuf的关联性:


  • 切片不会复制源ByteBuf的底层数据,底层数据和源ByteBuf的底层数组是同一个
  • 切片不会改变源ByteBuf的引用计数


从根本上说,slice无参数方法所生成的切片就是源ByteBuf可读部分的浅层复制


2、duplicate整体浅层复制

和slice切片不同,duplicate返回的源ByteBuf的整个对象的一个浅层复制,包括以下内容:


  • duplicate的读写指针、最大容量值
  • duplicate不会改变源的ByteBuf的应用技术
  • duplicate不会复制源的ByteBuf的底层数据


duplicate和slice方法都是浅层复制,不同的是slice方法是切取一段的浅层复制,而duplicate是整体的浅层复制


3、浅层复制的问题

浅层复制方法不会实际去复制数据,也不会改变ByteBuf的引用计数,这就会导致一个问题:在源ByteBuf调用release之后,一旦引用计数为零就比拿的不能访问了。在这种场景下,源ByteBuf的所有浅层复制实例也不能进行读写了,如果强行对浅层复制实例进行读写则会报错。


因此在调用千层复制实例时,可以通过一次retain方法来增加引用,表示它们对应的底层内存多了一次引用。


九、实例

1、服务器端

public class NettyEchoServer {
    private final int serverPort;
    ServerBootstrap bootstrap = new ServerBootstrap();
    public NettyEchoServer(int serverPort) {
        this.serverPort = serverPort;
    }
    public void runServer() {
        // 创建反应器线程组
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
        try {
            // 1.设置反应器线程组
            bootstrap.group(bossLoopGroup, workerLoopGroup);
            // 2.设置nio类型的通道
            bootstrap.channel(NioServerSocketChannel.class);
            // 3.设置监听端口
            bootstrap.localAddress(serverPort);
            // 4.设置通道的参数
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            // 5.装配子通道流水线
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                // 有连接到达时会创建一个通道
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // 流水线管理子通道中的Handler处理器
                    // 向子通道流水线添加一个handler处理器
                    socketChannel.pipeline().addLast(NettyEchoServerHandler.INSTANCE);
                }
            });
            // 6.开始绑定服务器
            // 通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = bootstrap.bind().sync();
            // 7.等待通道关闭的异步任务结束
            // 服务监听通道会一直等待通道关闭的异步任务结束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8.关闭EventLoopGroup
            // 释放掉所有资源包括创建的线程
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        new NettyEchoServer(8819).runServer();
    }
}

2、服务端处理器

@ChannelHandler.Sharable
public class NettyEchoServerHandler extends ChannelInboundHandlerAdapter {
    public static final NettyEchoServerHandler INSTANCE = new NettyEchoServerHandler();
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("msg type:" + ((in.hasArray()) ? "堆内存": "直接内存"));
        int len = in.readableBytes();
        byte[] arr = new byte[len];
        in.getBytes(0, arr);
        System.out.println("server received:" + new String(arr, "UTF-8"));
        System.out.println("写回前, msg.refCnt:" + ((ByteBuf) msg).refCnt());
        ChannelFuture f = ctx.writeAndFlush(msg);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                System.out.println("写回后,msg.refCnt:" + ((ByteBuf) msg).refCnt());
            }
        });
    }
}

这里的NettyEchoServerHandler在前面加了一个特殊的Netty注解:@ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享,也就是说这个通道的流水线可以加入同一个Handler业务处理器实例,而这种操作Netty默认是不允许的。


但是如果一个服务器处理很多的通道,每个通道都新建很多重复的Handler实例,就需要很多重复的Handler实例,这就会浪费很多空间。所以在Handler实例中没有与特定通道强相关的数据或者状态,建议设计成共享的模式,即在前面加上注解@ChannelHandler.Sharable。反过来如果没有加@ChannelHandler.Sharable注解,试图将一个Handler实例添加到多个ChannelPipeline通道流水线时,Netty将会抛出异常。


默认同一个通道上的所有业务处理器,只能被同一个线程处理。所以不是@Sharable共享类型的业务处理器,在线程的层面是安全的,不需要进行线程的同步控制。而不同的通道可能绑定到多个不同的EventLoop反应器线程,因此加上@ChannelHandler.Sharable注解后的共享业务处理器的实例,可能被多个线程并发执行。这样就会导致一个结果@Sharable共享实例不是线程层面安全的,因此@Sharable共享的业务处理器,如果需要操作的数据不仅仅是局部变量,则需要进行线程的同步控制,以保证操作是线程层面安全的。


ChannelHandlerAdapater提供了使用方法isSharable(),如果其对应的实现加上了@Sharable注解,那么这个方法将返回true,表示它可以被添加到多个ChannelPipeline通道流水线中。


3、客户端

public class NettyEchoClient {
    private int serverPort;
    private String serverIp;
    Bootstrap b = new Bootstrap();
    public NettyEchoClient(String ip, int port) {
        this.serverPort = port;
        this.serverIp = ip;
    }
    public void runClient() {
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
        try {
            b.group(workerLoopGroup);
            b.channel(NioSocketChannel.class);
            b.remoteAddress(serverIp, serverPort);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE);
                }
            });
            ChannelFuture f = b.connect();
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (f.isSuccess()) {
                        System.out.println("连接成功");
                    } else {
                        System.out.println("连接失败");
                    } 
                }
            });
            f.sync();
            Channel channel = f.channel();
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入发送内容:");
            while (scanner.hasNext()) {
                String next = scanner.next();
                ByteBuf buffer = channel.alloc().buffer();
                buffer.writeBytes(next.getBytes("UTF-8"));
                channel.writeAndFlush(buffer);
                System.out.println("请输入发送内容:");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        } finally {
            workerLoopGroup.shutdownGracefully();
        }
    }
}

4、客户端处理器

public class NettyEchoClientHandler extends ChannelInboundHandlerAdapter {
    public static final NettyEchoClientHandler INSTANCE = new NettyEchoClientHandler();
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        int len = buf.readableBytes();
        byte[] arr = new byte[len];
        buf.getBytes(0, arr);
        System.out.println("client received:" + new String(arr, "UTF-8"));
        buf.release();
    }
}


相关文章
|
4天前
|
缓存 网络协议 算法
Netty的基础入门(上)
Netty的基础入门(上)
93 0
|
4天前
|
缓存 网络协议 算法
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析
151 0
|
4天前
|
存储 消息中间件 缓存
Netty的基础入门(下)
Netty的基础入门(下)
46 0
|
4天前
|
消息中间件 缓存 Java
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty
106 0
|
4天前
|
缓存 网络协议 算法
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析(二)
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析
65 1
|
4天前
|
设计模式 网络协议 算法
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析(一)
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析(一)
94 1
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析(一)
|
4天前
|
消息中间件 缓存 Java
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty(二)
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty
116 1
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty(二)
|
4天前
|
缓存 Java 数据挖掘
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty(一)
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty
95 0
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty(一)
|
4天前
|
编解码 网络协议 Java
Netty基础入门学习
Netty基础入门学习
34 0
|
4天前
|
监控 网络协议 前端开发
Netty基础入门和基本使用-1
Netty继承和扩展了JDK Future系列异步回调的API,定义了自身的Futrue系列接口和类,实现了异步任务的监控、异步执行结果的获取。总体来说Netty对Java Future异步任务的扩展如下: 继承Java的Future接口,得到了一个新的属于Netty自己的Future异步任务接口,该接口对原有接口进行了增强,使得Netty异步任务能够以非阻塞的方式处理回调的结果 引入了一个新街口——GenericFutureListener,用于表示异步执行完成的监听器。这个Netty使用了监听器的模式,异步任务的执行完成后的回调逻辑抽象成了Listener监听器接口。可以将Netty的Ge