引言
现如今的开发环境中,分布式/微服务架构大行其道,而分布式/微服务的根基在于网络编程,而Netty
恰恰是Java网络编程领域的无冕之王。Netty
这个框架相信大家定然听说过,其在Java网络编程中的地位,好比JavaEE
中的Spring
。
当然,这样去聊它大家可能无法实际感受出它的重要性,那先来看看基于Netty
构建的应用:
观察上述列出的开源组件,一眼望去几乎全是各个领域中大名鼎鼎的框架,而这些组件都是基于Netty
构建的,涵盖中间件、大数据、离线计算、分布式、RPC、No-SQL
等各个方向....,很明显的可感知出Netty
的地位之高,因此如果要打造一款Java
高性能的网络通信程序、想要真正熟知分布式架构的底层原理,Netty
成为了每个Java
开发进阶必须要掌握的核心技术之一。
Netty
的重要性不言而知,但网上相关的大部分视频、文章、书籍等资料却五花八门,很难真正帮助大家构建出一套完整的体系,本文的目的就是带诸位走入基于Netty
的网络世界,在真正意义上为诸君构建一套Netty
的知识储备。
本文会先从概念开始,到基础入门、核心组件依次展开,后续结合多个实战案例全面详解
Netty
的应用。但在学习之前,大家最好有Java-IO
体系、多路复用模型等相关知识的储备,如若未曾具备请先移步:《Java-IO机制全解》、《多路复用模型剖析》两文,前者是必须,后者则暂且无需掌握,因为在后续的《Netty
源码篇》中才会涉及。
一、初识Netty的基础概念与快速入门
注意看:上图中右边这位黑眼圈堪比熊猫眼的哥们,从他头顶的发量就能明显感受出其技术强度,他!!!旁边的这位才是Netty
框架的原作者Trustin Lee
(韩国人),同时他也是另一个著名网络框架Mina
的核心主程之一,现任职于Apple
苹果集团......,不过多介绍作者了,总之是一位网络方面的大牛。
重点来聊聊我们的主角:Netty
框架,其实这个框架是基于Java原生NIO
技术的进一步封装,在其中对Java-NIO
技术做了进一步增强,作者充分结合了Reactor
线程模型,将Netty
变为了一个基于异步事件驱动的网络框架,Netty
从诞生至今共发布了五个大版本,但目前最常用的反而并非是最新的5.x
系列,而是4.x
系列的版本,原因在于Netty
本身就是基于Java-NIO
封装的,而JDK
本身又很稳定,再加上5.x
版本并未有太大的性能差异,因此4.x
系列才是主流。
再回过头来思考一个问题:为什么Netty
要二次封装原生NIO
呢?相信看过NIO
源码的小伙伴都清楚,原生的NIO
设计的特别繁琐,而且还存在一系列安全隐患,因此Netty
则是抱着简化NIO
、解决隐患、提升性能等目的而研发的。
不过有意思的一点在于:
Netty
虽然是基于Java-NIO
封装的框架,但实际使用起来却跟之前聊到的Java-AIO(NIO2)
技术有些相似。
1.1、Netty的入门实例
上面扯了不少Netty
的概念,现在就直接先实操一番快速入门,毕竟编程讲究施展出真理,首先第一步则是添加对应的依赖,如下:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>
然后先创建NettyServer
服务端,代码如下:
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 创建两个EventLoopGroup,boss:处理连接事件,worker处理I/O事件
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
// 创建一个ServerBootstrap服务端(同之前的ServerSocket类似)
ServerBootstrap server = new ServerBootstrap();
try {
// 将前面创建的两个EventLoopGroup绑定在server上
server.group(boss,worker)
// 指定服务端的通道为Nio类型
.channel(NioServerSocketChannel.class)
// 为到来的客户端Socket添加处理器
.childHandler(new ChannelInitializer<NioSocketChannel>() {
// 这个只会执行一次(主要是用于添加更多的处理器)
@Override
protected void initChannel(NioSocketChannel ch) {
// 添加一个字符解码处理器:对客户端的数据解码
ch.pipeline().addLast(
new StringDecoder(CharsetUtil.UTF_8));
// 添加一个入站处理器,对收到的数据进行处理
ch.pipeline().addLast(
new SimpleChannelInboundHandler<String>() {
// 读取事件的回调方法
@Override
protected void channelRead0(ChannelHandlerContext
ctx,String msg) {
System.out.println("收到客户端信息:" + msg);
}
});
}
});
// 为当前服务端绑定IP与端口地址(sync是同步阻塞至连接成功为止)
ChannelFuture cf = server.bind("127.0.0.1",8888).sync();
// 关闭服务端的方法(之后不会在这里关闭)
cf.channel().closeFuture().sync();
}finally {
// 优雅停止之前创建的两个Group
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
紧接着再构建一个NettyClient
客户端,代码如下:
public class NettyClient {
public static void main(String[] args) {
// 由于无需处理连接事件,所以只需要创建一个EventLoopGroup
EventLoopGroup worker = new NioEventLoopGroup();
// 创建一个客户端(同之前的Socket、SocketChannel)
Bootstrap client = new Bootstrap();
try {
client.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc)
throws Exception {
// 添加一个编码处理器,对数据编码为UTF-8格式
sc.pipeline().addLast(new
StringEncoder(CharsetUtil.UTF_8));
}
});
// 与指定的地址建立连接
ChannelFuture cf = client.connect("127.0.0.1", 8888).sync();
// 建立连接成功后,向服务端发送数据
System.out.println("正在向服务端发送信息......");
cf.channel().writeAndFlush("我是<竹子爱熊猫>!");
} catch (Exception e){
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
先看运行结果吧,控制台输出如下:
NettyServer控制台输出:
收到客户端信息:我是<竹子爱熊猫>!
NettyClient控制台输出:
正在向服务端发送信息......
从结果中很容易看出这个案例中做了什么事情,其实无非就是利用Netty
实现了简单的对端通信,实现的功能很简单,但对于未学习过Netty
技术的小伙伴,在代码方面估计有些许懵,那么接下来简单的解释一下代码。
但在此之前先声明一点:
Netty
是支持链式编程的一个框架,也就是如上述中的代码调用,所有的方法都可以一直用.
连下去,所以在Netty
的应用中会见到大量的这类写法。
上述案例的代码,说复杂呢也其实并不难,相信认真看完了之前《Java-IO篇》的小伙伴多少都能看懂代码,不理解的地方估计就在于其中出现的几个新的概念:EventLoopGroup、ServerBootstrap、childHandler
,我们先对于这些概念做简单解释,后续会重点剖析:
EventLoopGroup
:可以理解成之前的Selector
选择器,但结合了线程池(后续详细分析)。ServerBootstrap/Bootstrap
:类似于之前的ServerSocketChannel/SocketChannel
。childHandler
:这个是新概念,可以理解成过滤器,在之前的Servlet
编程中,新请求到来都会经过一个个的过滤器,而这个处理器也类似于之前的过滤器,新连接到来时,也会经过添加好的一系列处理器。
OK~,对于上述几个新概念有了简单认知后,接着把上面案例的完整流程分析一下:
- ①先创建两个
EventLoopGroup
事件组,然后创建一个ServerBootstrap
服务端。 - ②将创建的两个事件组
boss、worker
绑定在服务端上,并指定服务端通道为NIO
类型。 - ③在
server
上添加处理器,对新到来的Socket
连接进行处理,在这里主要分为两类:ChannelInitializer
:连接到来时执行,主要是用于添加更多的处理器(只触发一次)。addLast()
:通过该方式添加的处理器不会立马执行,而是根据处理器类型择机执行。
- ④为创建好的服务端绑定
IP
及端口号,调用sync()
意思是阻塞至绑定成功为止。 - ⑤再创建一个
EventLoopGroup
事件组,并创建一个Bootstrap
客户端。 - ⑥将事件组绑定在客户端上,由于无需处理连接事件,所以只需要一个事件组。
- ⑦指定
Channel
通道类型为NIO
、添加处理器.....(同服务端类似) - ⑧与前面服务端绑定的地址建立连接,由于默认是异步的,也要调用
sync()
阻塞。 - ⑨建立连接后,客户端将数据写入到通道准备发送,首先会先经过添加好的编码处理器,将数据的格式设为
UTF-8
。 - ⑩服务器收到数据后,会先经过解码处理器,然后再去到入站处理,执行对应的
Read()
方法逻辑。 - ⑪客户端完成数据发送后,先关闭通道,再优雅关闭创建好的事件组。
- ⑫同理,服务端工作完成后,先关闭通道再停止事件组。
结合上述的流程,再去看一遍给出的案例源码,相信诸位应该可以彻底理解。不过需要注意的一点是:Netty
的大部分操作都是异步的,比如地址绑定、客户端连接等。好比调用connect()
方法与服务端建立连接时,主线程会把这个工作交给事件组中的线程去完成,所以此刻如果主线程直接去向通道中写入数据,有几率会出现报错,因为实际生产环境中,可能由于网络延迟导致连接建立的时间有些长,此时通道并未建立成功,因此尝试发送数据时就会有问题,这点与之前的Java-AIO
通信案例中,客户端建立连接要调用.get()
方法是同理。
到这里,你对
Netty
框架已经入门了,接着咱们重点聊聊Netty
中的一些核心组件。
二、Netty框架核心组件:启动器与事件组
对于Netty
有了基本的认知后,接下来慢慢的熟悉这个框架吧,先依次来看看其中的一些核心组件,了解这些组件及作用后,才能真正意义上的“玩转Netty
”。
2.1、启动器-ServerBootstrap、Bootstrap
ServerBootstrap、Bootstrap
这两个组件应该无需过多解释,上个表格对比大家就理解了:
对比项 | 服务端 | 客户端 |
---|---|---|
BIO |
ServerSocket |
Socket |
NIO |
ServerSocketChannel |
SocketChannel |
AIO |
AsynchronousServerSocketChannel |
AsynchronousSocketChannel |
Netty |
ServerBootstrap |
Bootstrap |
从上表中能明显感觉出它俩在Netty
中的作用,无非就是服务端与客户端换了个叫法而已。
2.2、事件组-EventLoopGroup、EventLoop
这两个东西比较重要,但同时也比较抽象,EventLoop
这东西翻译过来就是事件循环的意思,你可以把它理解成NIO
中的Selector
选择器,实际它本质上就是这玩意儿,因为内部会维护一个Selector
,然后由一条线程会循环处理Channel
通道上发生的所有事件,所以每个EventLoop
对象都可以看成一个单线程执行器。
EventLoopGroup
可以将其理解成AIO
中的AsynchronousChannelGroup
可能会更合适,在AIO
的ACG
(前面那玩意儿的缩写)中,我们需要手动指定一个线程池,然后AIO
的所有客户端工作都会使用线程池中的线程进行管理,而Netty
中的EventLoopGroup
就类似于AIO-ACG
这玩意儿,只不过不需要我们管理线程池了,而是Netty
内部维护。
对
EventLoopGroup、EventLoop
有了基本认知后,你再点进它们的源码实现,其实能够观测到:其实它们继承了两个类,一个是Netty
自己实现的有序线程池OrderedEventExecutor
类,另一个则JDK
提供的原生定时调度线程池ScheduledExecutorService
类(源码篇会详细分析,这里先简单了解)。
看过之前关于《JDK线程池》文章的小伙伴应该清楚,既然EventLoop/EventLoopGroup
继承自JDK
原生的定时线程池,那也就代表着:它拥有JDK
线程池中所有提供的方法,同时也应该会支持执行异步任务、定时任务的功能。那么实际情况是这样吗?答案是Yes
,如下:
public static void main(String[] args) {
EventLoopGroup threadPool = new NioEventLoopGroup();
// 递交Runnable类型的普通异步任务
threadPool.execute(()->{
System.out.println("execute()方法提交的任务....");
});
// 递交Callable类型的有返回异步任务
threadPool.submit(() -> {
System.out.println("submit()方法提交的任务....");
return "我是执行结果噢!";
});
// 递交Callable类型的延时调度任务
threadPool.schedule(()->{
System.out.println("schedule()方法提交的任务,三秒后执行....");
return "调度执行后我会返回噢!";
},3,TimeUnit.SECONDS);
// 递交Runnable类型的延迟间隔调度任务
threadPool.scheduleAtFixedRate(()->{
System.out.println("scheduleAtFixedRate()方法提交的任务....");
},3,1,TimeUnit.SECONDS);
}
/* ~~~~~~~~~~~~~~~~~~我是性感的分割线~~~~~~~~~~~~~~~~~~ */
执行结果如下:
立即执行:
execute()方法提交的任务....
submit()方法提交的任务....
延时三秒后执行:
schedule()方法提交的任务....
scheduleAtFixedRate()方法提交的任务....
之后没间隔一秒执行:
scheduleAtFixedRate()方法提交的任务....
scheduleAtFixedRate()方法提交的任务....
上述我们创建了一个EventLoopGroup
事件循环组,然后通过之前JDK
线程池提供的一系列的提交任务的方法,向其递交了几个异步任务,然后运行该程序,答案显而易见,EventLoopGroup
确实可以当做JDK
原生线程池来使用。
当然,这些并非分析的重点,重点来看看
EventLoopGroup
如何在Netty
中合理运用。
在了解它们的Netty
用法之前,先来看看除原生线程池之外所提供的方法:
EventLoop.inEventLoop(Thread)
:判断一个线程是否属于当前EventLoop
。EventLoop.parent()
:判断当前EventLoop
属于哪一个事件循环组。EventLoopGroup.next()
:获取当前事件组中的下一个EventLoop
(线程)。
这些方法我们简单了解即可,因为大多数情况下在Netty
源码中才会用到,暂且无需关注太多,我们先把目光移到前面给出的Netty
使用案例中,还记得最开始定义的两个事件组吗?
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
为什么在服务端要定义两个组呢?一个难道不行吗?其实也是可以的,但定义两个组的好处在于:可以让Group
中的每个EventLoop
分工更加明确,不同的Group
分别处理不同类型的事件,各司其职。
在前面案例中,为服务端绑定了两个事件循环组,也就代表着会根据
ServerSocketChannel
上触发的不同事件,将对应的工作分发到这两个Group
中处理,其中boss
主要负责客户端的连接事件,而worker
大多数情况下负责处理客户端的IO
读写事件。
当客户端的SocketChannel
连接到来时,首先会将这个注册事件的工作交给boss
处理,boss
会调用worker.register()
方法,将这条客户端连接注册到worker
工作组中的一个EventLoop
上。前面提到过:EventLoop
内部会维护一个Selector
选择器,因此实际上也就是将客户端通道注册到其内部中的选择器上。
注意:将一个
Socket
连接注册到一个EventLoop
上之后,这个客户端连接则会和这个EventLoop
绑定,以后这条通道上发生的所有事件,都会交由这个EventLoop
处理。
到这里大家应该也理解了为何要拆出两个EventLoopGroup
,主要目的就在于分工更为明细。当然,由于EventLoopGroup
本质上可以理解成一个线程池,其中存在的线程资源自然是有限的,那此时如果到来的客户端连接大于线程数量怎么办呢?这是不影响的,因为Netty
本身是基于Java-NIO
封装的,而NIO
底层又是基于多路复用模型实现的,天生就能实现一条线程管理多个连接的功能,所以就算连接数大于线程数,也完全可以Hold
住。
OK~,除开可以根据事件类型划分Group
之外,也可以根据为每个处理器划分不同的事件组,如下:
// 创建EventLoopGroup和JDK原生的线程池一样,可以指定线程数量
EventLoopGroup extra = new NioEventLoopGroup(2);
sc.pipeline().addLast(extra, new xxxChannelHandler());
这样做的好处在于什么呢?因为前面提到过:一个连接注册到EventLoop
,之后所有的工作都会由这个EventLoop
处理,而一个EventLoop
又有可能同时管理多个连接,因此假设一条连接上的某个处理器,执行过程非常耗时,此时必然就会影响到这个EventLoop
管理的其他连接,因此对于一些较为耗时的Handler
,可以专门指派给一个额外的extra
事件组处理,这样就不会影响到所管理的其他连接。
当然,这个功能其实也略微有些鸡肋,一般多个
Handler
之间都会存在耦合关系,下一个Handler
需要依赖上一个Handler
的处理结果执行,因此也很难拆出来单独放到另一个事件组中执行。
看到这里,相信你对于EventLoopGroup、EventLoop
这两个组件应该有了基本认知,简单来说可以EventLoop
理解成有一条线程专门维护的Selector
选择器,而EventLoopGroup
则可以理解成一个有序的定时调度线程池,负责管理所有的EventLoop
。举个生活案例来加深印象:
现在有个工厂,其中分为了不同的片区,一个片区中有很多条流水线,由每个工人负责一部分流水线的作业。开始工作后,流水线的传输带会源源不断的将货物传递过来,这些货物最终会等待工人进行加工。
在上述这个例子中,工厂就是ServerBootstrap
服务端,而一个个片区就是不同的EventLoopGroup
事件组,一条流水线则可以理解成一个SocketChannel
客户端通道,而负责多条流水线的工人就是EventLoop
单线程执行器,加工的动作其实就是处理通道上发生的事件。
大家可以将这个例子套进去想象一下,相信这会让你印象更加深刻。
三、Netty中的增强版通道(ChannelFuture)
对于通道这个概念,相信诸位都不陌生,这也是Java-NIO、AIO
中的核心组件之一,而在Netty
中也对其做了增强和拓展。首先来看看通道类型,Netty
根据不同的多路复用函数,分别拓展出了不同的通道类型:
NioServerSocketChannel
:通用的NIO
通道模型,也是Netty
的默认通道。EpollServerSocketChannel
:对应Linux
系统下的epoll
多路复用函数。KQueueServerSocketChannel
:对应Mac
系统下的kqueue
多路复用函数。OioServerSocketChannel
:对应原本的BIO
模型,用的较少,一般用原生的。
当然,对于客户端的通道也可以选择TCP、UDP...
类型的,就不再介绍了,重点来看看Netty
中是如何对于通道类做的增强。
其实在
Netty
中,主要结合了JDK
提供的Future
接口,对通道类做了进一步增强。
增强的方面主要是支持了异步,但并非Future
那种伪异步,而是跟之前聊到过的《CompletableFuture》有些类似,支持异步回调处理结果。还记得之前客户端如何连接服务端的嘛?如下:
Bootstrap client = new Bootstrap();
client.connect("127.0.0.1", 8888);
但这个connect()
连接方法,本质上是一个异步方法,返回的并不是Channel
对象,而是一个ChannelFuture
对象,如下:
public ChannelFuture connect(String inetHost, int inetPort);
也包括ServerBootstrap
绑定地址的bind()
也相同,返回的并非ServerChannel
,也是一个ChannelFuture
对象。这是因为在Netty
的机制中,绑定/连接工作都是异步的,因此如果要用Netty
创建一个客户端连接,为了确保连接建立成功后再操作,通常情况下都会再调用.sync()
方法同步阻塞,直到连接建立成功后再使用通道写入数据,如下:
// 与服务端建立连接
ChannelFuture cf = client.connect("127.0.0.1", 8888);
// 同步阻塞至连接建立成功为止
cf.sync();
// 连接建立成功后再获取对应的Socket通道写入数据
cf.channel().writeAndFlush("...");
上述这种方式能够确保连接建立成功后再写数据,但既然Netty
中的绑定、连接等这些操作都是异步的,有没有办法让整个过程都是异步的呢?
答案是当然有,如何操作呢?
我们可以向ChannelFuture
中添加回调处理器,然后异步处理,如下:
ChannelFuture cf = client.connect("127.0.0.1", 8888);
cf.addListener((ChannelFutureListener) cfl -> {
// 这里可以用cf,也可以用cfl,返回的都是同一个channel通道
cf.channel().writeAndFlush("...");
});
当通过connect()
方法与服务端建立连接时,Netty
会将这个任务交给当前Bootstrap
绑定的EventLoopGroup
中的线程执行,因此建立连接的过程是异步的,所以会返还一个ChannelFuture
对象给我们,而此时可以通过该对象的addListener()
方法编写成功回调逻辑,当连接建立成功后,会由对应的线程来执行其中的代码,因此可以实现全过程的异步操作。
这样做,似乎的确实现了整个过程的异步,甚至关闭通道的过程也可以换成异步的,如下:
// 异步关闭Channel通道
ChannelFuture closeCF = cf.channel().closeFuture();
// 通道关闭后,添加对应的回调函数
closeCF.addListener((ChannelFutureListener) cfl -> {
// 关闭前面创建的EventLoopGroup事件组,也可以在这里做其他善后工作
worker.shutdownGracefully();
});
那Netty
中为何要将大量的操作都抽象成异步执行呢?这不是反而让逻辑更加复杂化吗?让发起连接、建立连接、发送数据、接收数据、关闭连接等一系列操作,全部交由调用的那条线程执行不可以吗?答案是可以的,但异步能在一定程度上提升性能,尤其是并发越高,带来的优势更为明显。
对于这段话大家估计会有疑惑,为什么能提升性能呢?下面举个例子理解。
3.1、为何Netty所有API都是异步式操作?
相信大家一定在生活中见过这样的场景:医院看病/体检、银行开户、政府办事、法院起诉、保险公司买保险等等,各类办理业务的地方,都会拿号办理,然后经过一个个的窗口办理不同的业务,那为什么要这么做呢?就拿常规的医院看病来说,为什么会分为如下步骤呢?
- 导诊处:先说明大致情况,导诊人员根据你的病理,指导你挂什么科的号。
- 挂号处:去到对应的病理科排队挂号(暂且不考虑缴费,假设网上缴挂号费)。
- 诊断室:跟着挂的号找到对应的科室,医生根据你的情况进行诊断。
- 化验处:从你身上提取一些标本,然后去到化验处等待化验结果。
- 缴费处:医生根据化验结果分析病情,然后给出具体的治疗方案,让你来缴费。
- 拿药/治疗处:交完相关的费用后,根据治疗方案进行拿药/治疗等处理措施。
有上述这些步骤实际上并不奇怪,问题是在于每个步骤都分为了专门的科室处理,因此以上述流程为例,至少需要有六个医生提供服务,那么为什么不专门由这六位医生专门提供全系列服务呢?如下:
我们分析一下,假设此时每个步骤平均要五分钟,一个病人的完整流程下来就需要半小时,而下一批预约看病的其他病人,则需要等待半小时后才能被受理,而把这些步骤拆开之后再来看看:
此时有六位医生各司其职,每位医生负责单一的工作,这样做的好处在于:每个挂号的病人只需要等待五分钟,就能够被受理,通过这种方式就将之前批次式看病,转变为了流水线式看病。
而
Netty
框架中的异步处理方式,也具备异曲同工之妙,将API
的操作从批处理转变成了流式处理。套入实际的业务中,也就是主线程(调用API
的线程)无需等待操作完成后再执行,而是调用某个API
后可继续往下执行,相较而言,在并发情况下能很大程度上提升程序性能。
但上述这个例子估计有些小伙伴照旧会犯迷糊,那接着再举个更加形象化的例子,好比快递小哥送货,如果以同步模式工作,将一个货物送达指定地点后,需要等待客户签收才能去送下个货物,这无疑会让下个客户等很久很久,并且也极其影响快递小哥的工作效率。
而采用异步模式工作,快递小哥将一个货物送达指定地点后,给对应客户发个信息后,就立马赶往下个客户的货物地点,前面的客户拿到货物后,再给快递小哥回个信息即可。在这种异步工作模式中,小哥无需在原地“阻塞”等待客户签收,只需要将手中一个个货物送达指定地点就行,这在很大程度上提升了整体工作效率,每个客户之间拿到货物的时间也大大缩短了,
Netty
框架中的异步思想也是同理。
3.2、ChannelFuture、Netty-Future、JDK-Future的关系
当大家试图翻阅ChannelFuture
的实现时,会发现该类继承了Future
接口:
public interface ChannelFuture extends Future<Void> {
// 省略内部方法.....
}
但要注意,这个Future
接口并非是JDK
原生的Future
接口,而是Netty
框架中的Future
接口:
package io.netty.util.concurrent;
public interface Future<V> extends java.util.concurrent.Future<V> {
// 省略内部方法.....
}
此时会发现,Netty-Future
又继承自JDK-Future
接口,这也就意味着Netty-Future
拓展了JDK-Future
接口的功能,在之前《并发编程-异步任务》中,咱们曾详细聊到过JDK
原生的Future
类,虽说基于Future+Callable
可以实现异步回调,但这种方式实现的异步回调则是一种“伪异步”,为啥呢?先来看看JDK-Future
提供的核心方法:
方法名 | 方法作用 |
---|---|
isDone() |
判断当前异步任务是否结束 |
cancel() |
取消当前异步任务 |
isCancel() |
判断当前异步任务是否被取消 |
get() |
阻塞等待当前异步任务执行完成 |
在JDK-Future
接口中,想要获取一个异步任务的执行结果,此时只能调用get()
方法,但该方法是一个阻塞方法,调用后会阻塞主线程直到任务结束为止,这显然依旧会导致异步变为同步执行,所以这种方式是一种“伪异步”,此时再来看看Netty-Future
中增强的核心方法:
方法名 | 方法作用 |
---|---|
getNow() |
非阻塞式获取任务结果,任务未执行完成时返回null |
sync() |
阻塞等待至异步任务执行结束,执行出错时会抛出异常 |
await() |
阻塞等待至异步任务执行结束,执行出错时不会抛出异常 |
isSuccess() |
判断任务是否执行成功,如果为true 代表执行成功 |
cause() |
获取任务执行出错时的报错信息,如果执行未出错,则返回null |
addLinstener() |
添加回调方法,异步任务执行完成后会主动执行回调方法中的代码 |
在原生JDK-Future
的基础上,Netty-Future
新增了一个异常检测机制,当异步任务执行出错时,可以通过cause()
方法处理异常,同时也基于回调模式,可通过addLinstener()
方法添加异步执行后的回调逻辑,从而让主线程创建任务后永远不会阻塞,做到了真正意义上的异步执行。
当然,除开基本的Future
接口外,Netty
框架中还有一个Promise
接口,该接口继承自Netty-Future
接口:
public interface Promise<V> extends Future<V> {
// 省略内部方法.....
}
这个接口中主要多拓展了两个方法:
方法名 | 方法作用 |
---|---|
setSuccess() |
设置任务的执行状态为成功 |
setFailure() |
设置任务的执行状态为失败 |
这两个方法可以用来设置异步任务的执行状态,因此Promise
接口除开具备Netty-Future
的功能外,还能作为多个线程之间传递异步任务结果的容器。
3.3、不同Future的效果测试
public class FutureDemo {
// 测试JDK-Future的方法
public static void jdkFuture() throws Exception {
System.out.println("--------JDK-Future测试--------");
// 创建一个JDK线程池用于执行异步任务
ExecutorService threadPool = Executors.newSingleThreadExecutor();
System.out.println("主线程:步骤①");
// 向线程池提交一个带有返回值的Callable任务
java.util.concurrent.Future<String> task =
threadPool.submit(() ->
"我是JDK-Future任务.....");
// 输出获取到的任务执行结果(阻塞式获取)
System.out.println(task.get());
System.out.println("主线程:步骤②");
// 关闭线程池
threadPool.shutdownNow();
}
// 测试Netty-Future的方法
public static void nettyFuture(){
System.out.println("--------Netty-Future测试--------");
// 创建一个Netty中的事件循环组(本质是线程池)
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
System.out.println("主线程:步骤①");
// 向线程池中提交一个带有返回值的Callable任务
io.netty.util.concurrent.Future<String> task =
eventLoop.submit(() ->
"我是Netty-Future任务.....");
// 添加一个异步任务执行完成之后的回调方法
task.addListener(listenerTask ->
System.out.println(listenerTask.getNow()));
System.out.println("主线程:步骤②");
// 关闭事件组(线程池)
group.shutdownGracefully();
}
// 测试Netty-Promise的方法
public static void nettyPromise() throws Exception {
System.out.println("--------Netty-Promise测试--------");
// 创建一个Netty中的事件循环组(本质是线程池)
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
// 主动创建一个传递异步任务结果的容器
DefaultPromise<String> promise = new DefaultPromise<>(eventLoop);
// 创建一条线程执行,往结果中添加数据
new Thread(() -> {
try {
// 主动抛出一个异常
int i = 100 / 0;
// 如果异步任务执行成功,向容器中添加数据
promise.setSuccess("我是Netty-Promise容器:执行成功!");
}catch (Throwable throwable){
// 如果任务执行失败,将异常信息放入容器中
promise.setFailure(throwable);
}
}).start();
// 输出容器中的任务结果
System.out.println(promise.get());
}
public static void main(String[] args) throws Exception {
jdkFuture();
nettyFuture();
nettyPromise();
}
}
在上述的测试类中,存在三个测试方法:
jdkFuture()
:测试JDK-Future
的方法。nettyFuture()
:测试Netty-Future
的方法。nettyPromise()
:测试Netty-Promise
的方法。
接着启动对应的类,来看看控制台的输出结果:
--------JDK-Future测试--------
主线程:步骤①
我是JDK-Future任务.....
主线程:步骤②
--------Netty-Future测试--------
主线程:步骤①
主线程:步骤②
我是Netty-Future任务.....
--------Netty-Promise测试--------
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.ArithmeticException: / by zero
........
首先来对比一下JDK-Future、Netty-Future
两者之间的差别,在使用JDK-Future
时,想要获取异步任务的执行结果,调用get()
方法后会阻塞主线程,也就是主线程的步骤②,需要等到异步任务执行完成后才会继续执行,因此输出结果为:
--------JDK-Future测试--------
主线程:步骤①
我是JDK-Future任务.....
主线程:步骤②
但此时再来看看Netty-Future
,因为在内部咱们提交异步任务后,就立即通过addListener()
添加了一个回调,这个回调方法会在异步任务执行结束后调用,咱们将获取任务结果的工作,放入到了回调方法中完成,此时会观测到,获取Netty-Future
的执行结果并不会阻塞主线程:
--------Netty-Future测试--------
主线程:步骤①
主线程:步骤②
我是Netty-Future任务.....
而对于Netty-Promise
的使用就无需过多讲解,也就是可以根据异步任务的执行状态,向Promise
对象中设置不同的结果,在前面的多线程中,由于主动制造了异常,所以最终会进入catch
代码块,执行setFailure()
向容器中填充异常信息。
四、核心组件 - 通道处理器(Handler)
Handler
可谓是整个Netty
框架中最为重要的一部分,它的职责主要是用于处理Channel
通道上的各种事件,所有的处理器都可被大体分为两类:
- 入站处理器:一般都是
ChannelInboundHandlerAdapter
以及它的子类实现。 - 出站处理器:一般都是
ChannelOutboundHandlerAdapter
以及它的子类实现。
在系统中网络操作都通常会分为入站和出站两种,所谓的入站即是指接收请求,反之,所谓的出站则是指返回响应,而Netty
中的入站处理器,会在客户端消息到来时被触发,而出站处理器则会在服务端返回数据时被触发,接着来展开聊一聊。
4.1、入站处理器与出站处理器
前面讲明白了入站、出站的基本概念,接着来简单认识一下Netty
中的入站处理器,这里先上个案例:
// 服务端
public class HandlerServer {
public static void main(String[] args) {
// 0.准备工作:创建一个事件循环组、一个ServerBootstrap服务端
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server
// 1.绑定前面创建的事件循环组
.group(group)
// 2.声明通道类型为服务端NIO通道
.channel(NioServerSocketChannel.class)
// 3.通过ChannelInitializer完成通道的初始化工作
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nsc) throws Exception {
// 4.获取通道的ChannelPipeline处理器链表
ChannelPipeline pipeline = nsc.pipeline();
// 5.基于pipeline链表向通道上添加入站处理器
pipeline.addLast("In-①",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("俺是第一个入站处理器...");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("In-②",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("我是第二个入站处理器...");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("In-③",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("朕是第三个入站处理器...");
}
});
}
})
// 为当前启动的服务端绑定IP和端口地址
.bind("127.0.0.1",8888);
}
}
// 客户端
public class HandlerClient {
public static void main(String[] args) {
// 0.准备工作:创建一个事件循环组、一个Bootstrap启动器
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client
// 1.绑定事件循环组
.group(group)
// 2.声明通道类型为NIO客户端通道
.channel(NioSocketChannel.class)
// 3.初始化通道,添加一个UTF-8的编码器
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc)
throws Exception {
// 添加一个编码处理器,对数据编码为UTF-8格式
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
}
});
// 4.与指定的地址建立连接
ChannelFuture cf = client.connect("127.0.0.1", 8888).sync();
// 5.建立连接成功后,向服务端发送数据
System.out.println("正在向服务端发送信息......");
cf.channel().writeAndFlush("我是<竹子爱熊猫>!");
} catch (Exception e){
e.printStackTrace();
} finally {
// 6.最后关闭事件循环组
group.shutdownGracefully();
}
}
}
在上述案例的服务端代码中,启动服务端时为其添加了In-①、In-②、In-③
这三个入站处理器,接着编写了一个客户端,其内部主要是向服务端发送了一条数据,运行结果如下:
俺是In-①入站处理器...
我是In-②入站处理器...
朕是In-③入站处理器...
此时大家观察结果会发现,入站处理器的执行顺序,会按照添加的顺序执行,两个过滤器之间,依靠super.channelRead(ctx, msg);
这行代码来实现向下调用的逻辑,这和之前Servlet
中的过滤器相差无几。
除开上述重写的channelRead()
方法外,入站处理器中还有很多其他方法可以重写,每个方法都对应着一种事件,会在不同时机下被触发,如下:
// 会在当前Channel通道注册到选择器时触发(与EventLoop绑定时触发)
public void channelRegistered(ChannelHandlerContext ctx) ...
// 会在选择器移除当前Channel通道时触发(与EventLoop解除绑定时触发)
public void channelUnregistered(ChannelHandlerContext ctx) ...
// 会在通道准备就绪后触发(Pipeline处理器添加完成、绑定EventLoop后触发)
public void channelActive(ChannelHandlerContext ctx) ...
// 会在通道关闭时触发
public void channelInactive(ChannelHandlerContext ctx) ...
// 会在收到客户端数据时触发(每当有数据时都会调用该方法,表示有数据可读)
public void channelRead(ChannelHandlerContext ctx, Object msg) ...
// 会在一次数据读取完成后触发
public void channelReadComplete(ChannelHandlerContext ctx) ...
// 当通道上的某个事件被触发时,这个方法会被调用
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) ...
// 当通道的可写状态发生改变时被调用(一般在发送缓冲区超出限制时调用)
public void channelWritabilityChanged(ChannelHandlerContext ctx) ...
// 当通道在读取过程中抛出异常时,当前方法会被触发调用
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) ...
接着再来看看出站处理器,这回基于上述案例做些许改造即可,也就是再通过pipeline.addLast()
方法多添加几个处理器,但处理器的类型为ChannelOutboundHandlerAdapter
,如下:
// 基于pipeline链表向通道上添加出站处理器
pipeline.addLast("Out-A",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("在下是Out-A出站处理器...");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("Out-B",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("鄙人是Out-B出站处理器...");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("Out-C",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("寡人是Out-C出站处理器...");
super.write(ctx, msg, promise);
}
});
根据原本入站处理器的执行逻辑,是不是理论上执行顺序为Out-A、Out-B、Out-C
?先看运行结果:
此时观察结果可明显看到,在通道上添加的出站处理器压根没被触发呀,这是为何呢?这要说回前面聊到的出站概念,出站是指响应过程,也意味着出站处理器是在服务端返回数据时被触发的,而案例中并未向客户端返回数据,显然就不会触发出站处理器,所以此时咱们在In-③
入站处理器中,多加几行代码:
// 利用通道向客户端返回数据
ByteBuf resultMsg = ctx.channel().alloc().buffer();
resultMsg.writeBytes("111".getBytes());
nsc.writeAndFlush(resultMsg);
此时再运行案例,就会看到如下结果:
俺是In-①入站处理器...
我是In-②入站处理器...
朕是In-③入站处理器...
寡人是Out-C出站处理器...
鄙人是Out-B出站处理器...
在下是Out-A出站处理器...
此时注意看,结果和预料的不同,呈现的顺序并非Out-A、Out-B、Out-C
,而是Out-C、Out-B、Out-A
,这是啥原因呢?为什么与添加顺序反过来了?这其实跟pipeline
处理器链表有关,等会儿再聊聊pipeline
这个概念,先来看看出站处理器中的其他方法:
// 当通道调用bind()方法时触发(当Channel绑定端口地址时被调用,一般用于客户端通道)
public void bind(...) ...
// 当通道调用connect()方法,连接到远程节点/服务端时触发(一般也用于客户端通道)
public void connect(...) ...
// 当客户端通道调用disconnect()方法,与服务端断开连接时触发
public void disconnect(...) ...
// 当客户端通道调用close()方法,关闭连接时触发
public void close(...) ...
// 当通道与EventLoop解除绑定时触发
public void deregister(...) ...
// 当通道中读取多次数据时被调用触发
public void read(...) ...
// 当通道中写入数据时触发
public void write(...) ...
// 当通道中的数据被Flush给对端节点时调用
public void flush(...) ...
对于出站/入站处理器的这些其他方法/事件,大家可根据业务的不同,选择重写不同的方法,其中每个不同的方法,其触发时机也不同,因此可以在适当的位置重写方法,作为业务代码的切入点。
4.2、pipeline处理器链表
如果接触Netty
框架的小伙伴应该对这玩意儿不陌生,如果没接触过也无关紧要,其实它也并非是特别难懂的概念,一个处理器被称为Handler
,而一个Handler
添加到一个通道上之后,则被称之为ChannelHandler
,而一个通道上的所有ChannelHandler
全部连接起来,则被称之为ChannelPipeline
处理器链表。
以上述给出的案例来说,其内部形成的ChannelPipeline
链表如下:
pipeline
本质上是一个双向链表,同时具备head、tail
头尾节点,每当调用pipeline.addLast()
方法添加一个处理器时,就会将处理器封装成一个节点,然后加入pipeline
链表中:
- 当接收到客户端的数据时,
Netty
会从Head
节点开始依次往后执行所有入站处理器。 - 而当服务端返回数据时,
Netty
会从Tail
节点开始依次向前执行所有入站处理器。
理解上述过程后,大家应该就理解了之前出站处理器的执行顺序,为何是Out-C、Out-B、Out-A
,因为出站处理器是以Tail
尾节点开始,向前依次执行的原因造成的,那处理器的作用是干嘛的呢?举个例子大家就懂了。
这里假设
Netty
的服务端是一个饲料加工厂,客户端则是原料供应商,连接两者之间的通道就相当于一条条的流水线,而客户端发送的数据相当于原料。\
在一条流水线上,玉米、豆粕、小麦....等原料不可能啥也不干,直接从头传到尾,如果原料想要加工成某款私聊,显然需要经过一道道工序,而处理器则是这一道道工序。\
比如原料刚传进来时,首先要将其粉碎成颗粒,接着需要将其碾压成粉末,最后需要按照配方比例进行混合,才能形成按配方制成的饲料。在这个过程中,原料进入加工厂后,经过的一道道工序则可以被称为入站处理器。\
而原料被加工成饲料后,想要对外出售,还需要先装入一个个的饲料袋,然后将饲料袋进行封口,最后印上生产日期与厂家,才能打包成最终的商用饲料对外出售。而该过程中的一道道工序,则可被理解成是一个个出站处理器。
在上述的例子中,一个加工厂的流水线上,存在着一道道工序,经过依次处理后,能够将原料加工成最终商品。Netty
中亦是同理,对于客户端和服务端之间的数据,可以通过处理器,完成一系列核心处理,如转换编码格式、对数据进行序列化、对数据进行加/解密等操作。
4.3、自定义出/入站处理器
前面简单讲明白了一些关于Netty
处理器的知识,但实际开发过程中,为了更好的代码阅读性,以及代码的维护性,通常pipeline.addLast
并不会直接new
接口,而是自己定义处理器类,然后继承对应的父类,如下:
// 自定义的入站处理器
public class ZhuziHandler extends ChannelInboundHandlerAdapter {
public ZhuziHandler() {
super();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 在这里面编写处理入站msg的核心代码.....
// (如果要自定义msg的处理逻辑,请记住去掉下面这行代码)
super.channelRead(ctx, msg);
}
}
对于入站处理器而言,主要重写其channelRead()
方法即可,该方法会在消息入站时被调用,可以在其中完成对数据的复杂处理,而自定义处理器完成后,想要让该处理器生效,请记得将其绑定到对应的通道上,如下:
pipeline.addLast("In-X", new ZhuziHandler());
与入站处理器相反的出站处理器亦是同理,只不过将父类实现换成ChannelInboundHandlerAdapter
,并且重写其write()
方法即可,这样所有消息(数据)出站时,都会调用该方法。
最后,不仅仅处理器可以单独抽出来实现,而且对于通道的初始化器,也可以单独抽出来实现,如下:
// 自定义的通道初始化器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 设置编码器、解码器、处理器
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new ZhuziHandler());
}
}
这样写能够让代码的整洁性更强,并且可以统一管理通道上的所有出/入站处理器,而服务端的代码改成下述方式即可:
server
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerInitializer())
.bind("127.0.0.1",8888);
五、Netty重构后的缓冲区(ByteBuf)
在之前讲《JavaIO体系-NIO》的时候曾聊到过它的三大件,其中就包含了ByteBuffer
,其作用主要是用来作为服务端和客户端之间传输数据的容器,NIO
中的ByteBuffer
支持使用堆内存、本地(直接)内存来创建,而Netty-ByteBuf
也同样如此,如下:
ByteBufAllocator.DEFAULT.heapBuffer(cap)
:使用堆内存来创建ByteBuf
对象。ByteBufAllocator.DEFAULT.directBuffer(cap)
:使用本地内存来创建ByteBuf
对象。
基于堆内存创建的ByteBuf
对象会受到GC
机制管理,在发生GC
时需要来回移动Buffer
对象,同时之前在NIO
中也聊到过堆、本地内存的区别,如下:
通常本地内存的读写效率都会比堆内存高,因为OS
可以直接操作本地内存,而堆内存在读写数据时,则需要多出一步内存拷贝的动作,总结如下:
- 堆内存因为直接受到
JVM
管理,所以在Java
程序中创建时,分配效率较高,但读写效率低。 - 本地内存因为
OS
可直接操作,所以读写效率高,但由于创建时,需要向OS
额外申请,分配效率低。
但上述聊到的这些特征,NIO
的Buffer
也具备,那Netty
对于Buffer
缓冲区到底增强了什么呢?主要是三方面:Buffer
池化技术、动态扩容机制、零拷贝实现。
5.1、ByteBuf缓冲区池化技术
池化这个词汇大家应该都不陌生,Java
线程池、数据库连接池,这些都是池化思想的产物,一般系统中较为珍贵的资源,都会采用池化技术来缓存,以便于下次需要时可直接使用,而无需经过繁琐的创建过程。
前面聊到过,
Netty
默认会采用本地内存创建ByteBuf
对象,而本地内存因为不是操作系统分配给Java
程序使用的,所以基于本地内存创建对象时,则需要额外单独向OS
申请,这个过程自然开销较大,在高并发情况下,频繁的创建、销毁ByteBuf
对象,一方面会导致性能降低,同时还有可能造成OOM
的风险(使用完没及时释放,内存未归还给OS
的情况下会出现内存溢出)。
而使用池化技术后,一方面能有效避免OOM
问题产生,同时还可以省略等待创建缓冲区的时间,那Netty
中的池化技术,什么时候会开启呢?这个要分平台!
Android
系统默认会采用非池化技术,而其他系统,如Linux、Mac、Windows
等会默认启用。
但上述这条原则是Netty4.1
版本之后才加入的,因为4.1
之前的版本,其内部的池化技术还不够完善,所以4.1
之前的版本默认会禁用池化技术。当然,如果你在某些平台下想自行决定是否开启池化,可通过下述参数控制:
-Dio.netty.allocator.type=unpooled
:关闭池化技术。-Dio.netty.allocator.type=pooled
:开启池化技术。
这两个参数直接通过JVM
参数的形式,在启动Java
程序时指定即可。如果你想要查看自己创建的ByteBuf
对象,是否使用了池化技术,可直接打印对象的Class
即可,如下:
// 查看创建的缓冲区是否使用了池化技术
private static void byteBufferIsPooled(){
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
System.out.println(buffer.getClass());
}
public static void main(String[] args) {
byteBufferIsPooled();
}
/* *
* 输出结果:
* class io.netty.buffer.PooledUnsafeDirectByteBuf
* */
从输出的结果中的类名可看出,如果是以Pooled
开头的类名,则表示当前ByteBuf
对象使用池化技术,如若是以Unpooled
开头的类名,则表示未使用池化技术。
5.2、ByteBuf动态扩容机制
在前面聊《JavaNIO-Buffer缓冲区》的时候曾简单聊过NIO
的Buffer
源码,其内部的实现有些傻,每个Buffer
对象都拥有一根limit
指针,这根指针用于控制读取/写入模式,因此在使用NIO-Buffer
时,每次写完缓冲区后,都需要调用flip()
方法来反转指针,以此来确保NIO-Buffer
的正常读写。
由于Java-NIO
中的Buffer
设计有些缺德,因此在使用NIO
的原生Buffer
对象时,就显得额外麻烦,必须要遵从如下步骤:
- ①先创建对应类型的缓冲区
- ②通过
put
这类方法往缓冲区中写入数据 - ③调用
flip()
方法将缓冲区转换为读模式 - ④通过
get
这类方法从缓冲区中读取数据 - ⑤调用
clear()、compact()
方法清空缓冲区数据
而正是由于Java-NIO
原生的Buffer
设计的不合理,因此Netty
中直接重构了整个缓冲区组件,在Netty-ByteBuf
中,存在四个核心属性:
initialCapacity
:初始容量,创建缓冲区时指定的容量大小,默认为256
字节。maxCapacity
:最大容量,当初始容量不足以供给使用时,ByteBuf
的最大扩容限制。readerIndex
:读取指针,默认为0
,当读取一部分数据时,指针会随之移动。writerIndex
:写入指针,默认为0
,当写入一部分数据时,指针会随之移动。
首先来说说和NIO-Buffer
的两个主要区别:首先将原本一根指针变为了两根,分别对应读/写操作,这样就保障了使用ByteBuf
时,无需每次读写数据时手动翻转模式。同时加入了一个最大容量限制,在创建的ByteBuf
无法存下数据时,允许在最大容量的范围内,对ByteBuf
进行自动扩容,下面上个图理解:
上图中模拟了使用ByteBuf
缓冲区的过程,在创建时会先分配一个初始容量,这个容量可以自己指定,不指定默认为256
,接着会去创建出对应容量的缓冲区,最初读写指针都为0
,后续会随着使用情况不断变化。
这里重点观察最后一个状态,在真正使用过程中,一个ByteBuf
会被分为四个区域:
- 已废弃区域:这是指已经被读取过的数据区域,因为其中的数据已被使用,所以属于废弃区域。
- 可读取区域:这主要是指被写入过数据,但还未读取的区域,这块区域的数据都可被读取使用。
- 可写入区域:这主要是指写入指针和容量之间的区域,意味着这块区域是可以被写入数据的。
- 可扩容区域:这主要是指容量和最大容量之间的区域,代表当前缓冲区可扩容的范围。
ByteBuf
的主要实现位于AbstractByteBuf
这个子类中,但内部还有两根markedReaderIndex、markedWriterIndex
标记指针,这两根指针就类似于NIO-Buffer
中的mark
指针,这里就不做重复赘述。下面上个案例简单实验一下BtyeBuf
的自动扩容特性,代码如下:
// 测试Netty-ByteBuf自动扩容机制
private static void byteBufCapacityExpansion() {
// 不指定默认容量大小为16
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
System.out.println("测试前的Buffer容量:" + buffer);
// 使用StringBuffer来测试ByteBuf的自动扩容特性
StringBuffer sb = new StringBuffer();
// 往StringBuffer中插入17个字节的数据
for (int i = 0; i < 17; i++) {
sb.append("6");
}
// 将17个字节大小的数据写入缓冲区
buffer.writeBytes(sb.toString().getBytes());
printBuffer(buffer);
}
在这个测试自动扩容的方法中,最后用到了一个printBuffer()
方法来打印缓冲区,这是自定义的一个输出方法,也就基于Netty
自身提供的Dump
方法实现的,如下:
// 打印ByteBuf中数据的方法
private static void printBuffer(ByteBuf buffer) {
// 读取ByteBuffer已使用的字节数
int byteSize = buffer.readableBytes();
// 基于byteSize来计算显示的行数
int rows = byteSize / 16 + (byteSize % 15 == 0 ? 0 : 1) + 4;
// 创建一个StringBuilder用来显示输出
StringBuilder sb = new StringBuilder(rows * 80 * 2);
// 获取缓冲区的容量、读/写指针信息放入StringBuilder
sb.append("ByteBuf缓冲区信息:{");
sb.append("读取指针=").append(buffer.readerIndex()).append(", ");
sb.append("写入指针=").append(buffer.writerIndex()).append(", ");
sb.append("容量大小=").append(buffer.capacity()).append("}");
// 利用Netty框架自带的格式化方法、Dump方法输出缓冲区数据
sb.append(StringUtil.NEWLINE);
ByteBufUtil.appendPrettyHexDump(sb, buffer);
System.out.println(sb.toString());
}
接着在main
方法中调用并运行,如下:
public static void main(String[] args) {
byteBufCapacityExpansion();
}
/* * 运行结果:
*
* 测试前的Buffer容量:PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 16)
* ByteBuf缓冲区信息:{读取指针=0, 写入指针=17, 容量大小=64}
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 |6666666666666666|
* |00000010| 36 |6 |
* +--------+-------------------------------------------------+----------------+
* */
先来观察最初的容量:cap=16
,因为这是咱们显示指定的初始容量,接着向该ByteBuf
中插入17
个字节数据后,会发现容量自动扩展到了64
,但如果使用NIO-Buffer
来进行这样的操作,则会抛出异常。同时最后还把缓冲区中具体的数据打印出来了,这个是利用Netty
自带的appendPrettyHexDump()
方法实现的,中间是字节值,后面是具体的值,这里就不做过多阐述~
5.3、 Netty中的读写API
首先在讲述Netty-ByteBuf
的读写API
之前,咱们再说清楚一点与NIO-Buffer
的区别,不知大家是否还记得我在之前NIO
中聊到的一点:
其实这也是NIO-Buffer
设计不合理的一个地方,当你想要向缓冲区中写入不同类型的数据,要么得自己手动转换成Byte
字节类型,要么得new
一个对应的子实现,所以整个实现就较为臃肿,大家可以点进Java.nio
包看一下,你会看到下述场景:
这里的类关系,大家一眼看过去明显会感觉头大,基本上实现都大致相同,但针对于每个数据类型,都编写了对应的实现类,而Netty
的作者显然意识到了这点,因此并未提供多种数据类型的缓冲区,仅提供了ByteBuf
这一种缓冲区,Why
?
其实道理十分简单,因为计算机上的所有数据资源,在底层本质上都是
0、1
形成的字节数据,所以只提供Byte
类型的ByteBuf
缓冲区就够了,毕竟它能够存储所有类型的数据,同时为了便于写入其他类型的数据,如Int、boolean、long....
,Netty
框架中也对外提供了相关的写入API
,接着一起来看看。
// Netty-ByteBuf抽象类
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {
// 写入boolean数据的方法,内部使用一个字节表示,0=false、1=true
public abstract ByteBuf writeBoolean(boolean var1);
// 写入字节数据的方法
public abstract ByteBuf writeByte(int var1);
// 大端写入Short数据的方法
public abstract ByteBuf writeShort(int var1);
// 小端写入Short数据的方法
public abstract ByteBuf writeShortLE(int var1);
// 下述方法和写Short类型的方法仅类型不同,都区分了大小端,不再重复注释
public abstract ByteBuf writeMedium(int var1);
public abstract ByteBuf writeMediumLE(int var1);
public abstract ByteBuf writeInt(int var1);
public abstract ByteBuf writeIntLE(int var1);
public abstract ByteBuf writeLong(long var1);
public abstract ByteBuf writeLongLE(long var1);
public abstract ByteBuf writeChar(int var1);
public abstract ByteBuf writeFloat(float var1);
public ByteBuf writeFloatLE(float value) {
return this.writeIntLE(Float.floatToRawIntBits(value));
}
public abstract ByteBuf writeDouble(double var1);
public ByteBuf writeDoubleLE(double value) {
return this.writeLongLE(Double.doubleToRawLongBits(value));
}
// 将另一个ByteBuf对象写入到当前缓冲区
public abstract ByteBuf writeBytes(ByteBuf var1);
// 将另一个ByteBuf对象的前N个长度的数据,写入到当前缓冲区
public abstract ByteBuf writeBytes(ByteBuf var1, int var2);
// 将另一个ByteBuf对象的指定范围数据,写入到当前缓冲区
public abstract ByteBuf writeBytes(ByteBuf var1, int var2, int var3);
// 向缓冲区中写入一个字节数组
public abstract ByteBuf writeBytes(byte[] var1);
// 向缓冲区中写入一个字节数组中,指定范围的数据
public abstract ByteBuf writeBytes(byte[] var1, int var2, int var3);
// 将一个NIO的ByteBuffer数据写入到当前ByteBuf对象
public abstract ByteBuf writeBytes(ByteBuffer var1);
// 将一个输入流中的数据写入到当前缓冲区
public abstract int writeBytes(InputStream var1, int var2)
throws IOException;
// 将一个NIO的ScatteringByteChannel通道中的数据写入当前缓冲区
public abstract int writeBytes(ScatteringByteChannel var1, int var2)
throws IOException;
// 将一个NIO的文件通道中的数据写入当前缓冲区
public abstract int writeBytes(FileChannel var1, long var2, int var4)
throws IOException;
// 将一个任意字符类型的数据写入缓冲区(CharSequence是所有字符类型的老大)
public abstract int writeCharSequence(CharSequence var1, Charset var2);
// 省略其他写入数据的API方法........
}
上面列出了Netty-ByteBuf
中常用的写入方法,其实大家在这里就能明显观察出与NIO
的区别,NIO
是为不同数据类型提供了不同的实现类,而Netty
则仅仅只是为不同类型,提供了不同的API
方法,显然后者的做法更佳,因为整体的代码结构会更为优雅。
这里主要说一下大端写入和小端写入的区别,从前面的
API
列表中,大家可以看到,Netty
为每种数据类型,都提供了一个结尾带LE
的写入方法,这个带LE
的方法则是小端写入方法,那么大小端之间有何差异呢?
大小端写入是网络编程中的通用概念,因为网络数据传输过程中,所有的数据都是以二进制的字节格式传输的,而所谓的大端(Big Endian
)写入,是指先写高位,再写低位,高低位又是什么意思呢?
- 高位写入:指从前往后写,例如
1
这个数字,比特位形式为00000000 00000001
。 - 低位写入:指从后往前写,依旧是
1
这个数字,比特位形式为00000001 00000000
。
这里不了解的小伙伴又会疑惑:为啥高位写入时,1
在最后面呀?这是因为要先写0
,再写1
的原因导致的。而反过来。所谓的小端(Little Endian
)写入,也就是指先写低位,再写高位。默认情况下,网络通信会采用大端写入的模式。
简单了解Netty-ByteBuf
写入数据的API
后,接着再来看一些读取数据的API
方法,如下:
// 一系列read开头的读取方法,这种方式会改变读取指针(区分大小端)
public abstract boolean readBoolean();
public abstract byte readByte();
public abstract short readUnsignedByte();
public abstract short readShort();
public abstract short readShortLE();
public abstract int readUnsignedShort();
public abstract int readUnsignedShortLE();
public abstract int readMedium();
public abstract int readMediumLE();
public abstract int readUnsignedMedium();
public abstract int readUnsignedMediumLE();
public abstract int readInt();
public abstract int readIntLE();
public abstract long readUnsignedInt();
public abstract long readUnsignedIntLE();
public abstract long readLong();
public abstract long readLongLE();
public abstract char readChar();
public abstract float readFloat();
// 省略其他的read方法.....
// 一系列get开头的读取方法,这种方式不会改变读取指针(区分大小端)
public abstract boolean getBoolean(int var1);
public abstract byte getByte(int var1);
public abstract short getUnsignedByte(int var1);
public abstract short getShort(int var1);
public abstract short getShortLE(int var1);
public abstract int getUnsignedShort(int var1);
public abstract int getUnsignedShortLE(int var1);
public abstract int getMedium(int var1);
public abstract int getMediumLE(int var1);
public abstract int getUnsignedMedium(int var1);
public abstract int getUnsignedMediumLE(int var1);
public abstract int getInt(int var1);
public abstract int getIntLE(int var1);
public abstract long getUnsignedInt(int var1);
public abstract long getUnsignedIntLE(int var1);
public abstract long getLong(int var1);
public abstract long getLongLE(int var1);
public abstract char getChar(int var1);
public abstract float getFloat(int var1);
// 省略其他的get方法.....
在上面列出的一系列读取方法中,主要可分为read、get
两大类方法:
readXXX()
:这种方式读取数据后,会导致ByteBuf
内部的读取指针随之移动。getXXX()
:这种方式读取数据后,不会改变ByteBuf
内部的读取指针。
那么读取指针改变之后会出现什么影响呢?大家还记得前面聊到的ByteBuf
的四部分嘛?前面讲过,读取指针之前的数据部分,都会被标记为废弃部分,这也就意味着通过read
系列的方式读取一段数据后,会导致这些数据无法再次被读取到,这里来做个实验:
// 测试ByteBuf的read、get、mark功能
private static void bufferReader(){
// 分配一个初始容量为10的缓冲区
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
// 向缓冲区中写入10个字符(占位十个字节)
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
sb.append(i);
}
buffer.writeBytes(sb.toString().getBytes());
// 使用read方法读取前5个字节数据
printBuffer(buffer);
buffer.readBytes(5);
printBuffer(buffer);
// 再使用get方法读取后五个字节数据
buffer.getByte(5);
printBuffer(buffer);
}
public static void main(String[] args) {
bufferReader();
}
在上面的循环中,我是通过StringBuffer
来作为缓冲区的数据,但为何不直接写入int
数据呢?这是因为int
默认会占四个字节,而StringBuffer
底层是char
,一个字符只占用一个字节~,这里是一个小细节,接着来看看运行结果:
ByteBuf缓冲区信息:{读取指针=0, 写入指针=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 35 36 37 38 39 |0123456789 |
+--------+-------------------------------------------------+----------------+
ByteBuf缓冲区信息:{读取指针=5, 写入指针=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39 |56789 |
+--------+-------------------------------------------------+----------------+
ByteBuf缓冲区信息:{读取指针=5, 写入指针=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39 |56789 |
+--------+-------------------------------------------------+----------------+
从上述结果中可看出,使用readBytes()
方法读取五个字节后,读取指针会随之移动到5
,接着看看前后的数据变化,此时会发现数据从0123456789
变成了56789
,这是因为前面五个字节的数据,已经属于废弃部分了,所以printBuffer()
方法无法读取显示。
接着再看看后面,通过
getByte()
读取五个字节后,此时ByteBuf
对象的读取指针,显然不会随之移动,也就是通过get
系列方法读取缓冲区数据,并不会导致读过的数据废弃。
那如果使用read
系列方法读取数据后,后续依旧想要读取数据该怎么办呢?这里可以使用ByteBuf
内部的标记指针实现,如下:
// 在上述方法的最后继续追加下述代码:
// 使用mark标记一下读取指针,然后再使用read方法读取数据
buffer.markReaderIndex();
buffer.readBytes(5);
printBuffer(buffer);
// 此时再通过reset方法,使读取指针恢复到前面的标记位置
buffer.resetReaderIndex();
printBuffer(buffer);
此时再次查询运行结果,如下:
ByteBuf缓冲区信息:{读取指针=10, 写入指针=10, 容量大小=10}
ByteBuf缓冲区信息:{读取指针=5, 写入指针=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39 |56789 |
+--------+-------------------------------------------------+----------------+
从结果中可以明显看到,对读取指针做了标记后,再次使用read
系列方法读取数据,依旧会导致读过的部分变为废弃数据,但后续可以通过reset
方法,将读取指针恢复到前面的标记位置,然后再次查看缓冲区的数据,就会发现数据又可以重复被读取啦~
其实除开可以通过
markReaderIndex()、resetReaderIndex()
方法标记、恢复读取指针外,还可以通过markWriterIndex()、resetWriterIndex()
方法来标记、恢复写入指针。标记读取指针后,可以让缓冲区中的一段数据被多次read
读取,而标记写入指针后,可以让缓冲区的一段区间被反复写入,但每次后面的写入会覆盖前面写入的数据。
OK~,对于ByteBuf
的API
操作就介绍到这里,其实内部提供了一百多个API
方法,但我就不一一去做说明啦,大家点进源码后就能看到,感兴趣的小伙伴可以自行调试!
5.4、ByteBuf的内存回收
在前面聊到过,Netty-ByteBuf
在除安卓平台外,都会使用池化技术来创建,那一个已创建出的ByteBuf
对象,其占用的内存在什么情况下会归还给内存池呢?想要聊明白这点,得先理解ByteBuf
的引用释放。
学习过
JVM-GC
机制的小伙伴应该知道,JVM
中使用的对象存活判定法是根可达算法,而在此之前的一种常用算法被称之为《引用计数法》,但由于该算法存在循环引用的问题,所以并不适合作为自动判定存活的算法,但Netty-ByteBuf
中恰恰使用了这种算法。
首先来看看Netty-ByteBuf
的类关系:
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>
从上面的类定义中可明显看到,ByteBuf
实现了ReferenceCounted
接口,该接口翻译过来的含义则是引用计数,该接口中提供的方法列表如下:
public interface ReferenceCounted {
// 查看一个对象的引用计数统计值
int refCnt();
// 对一个对象的引用计数+1
ReferenceCounted retain();
// 对一个对象的引用计数+n
ReferenceCounted retain(int var1);
// 记录当前对象的当前访问位置,内存泄漏时会返回该方法记录的值
ReferenceCounted touch();
ReferenceCounted touch(Object var1);
// 对一个对象的引用计数-1
boolean release();
// 对一个对象的引用计数-n
boolean release(int var1);
}
重点关注retain()、release()
方法,这两个方法分别对应加/减一个对象的引用计数,把ByteBuf
套入进来,当一个缓冲区对象的引用计数为0
时,会清空当前缓冲区中的数据,并且将占用的内存归还给内存池,所有尝试再次访问该ByteBuf
对象的操作,都会被拒绝。简单来说,一句话总结就是:当一个ByteBuf
对象的引用计数变为0
时,该缓冲区就会变为外部不可访问的状态。
综上所述,在使用完一个ByteBuf
对象后,明确后续不会用到该对象时,一定要记得手动调用release()
清空引用计数,否则会导致该缓冲区长久占用内存,最终引发内存泄漏。
这里拓展一点小细节,似乎在
Netty-Channel
中,都会采用ByteBuf
来发送/接收数据,那这些通道传输数据用的ByteBuf
对象,其占用的内存会在何时回收呢?这会牵扯到前面的ChannelPipeline
链表。
还记得这幅通道处理器链表图嘛?在其中有两个特殊的处理器,即Head、Tail
处理器:
Head
处理器:- 如果通道上只有入站处理器,它会作为整个处理器链表的第一个处理器调用。
- 如果通道上只有出站处理器,它会作为整个处理器链表的最后一个处理器调用。
- 如果通道上入/出站处理器都有,它会作为入站的第一个处理调用,出站的最后一个处理器调用。
Tail
处理器:- 如果通道上只有入站处理器,
Tail
节点会作为整个链表的最后一个处理器调用。 - 如果通道上只有出站处理器,
Tail
节点会作为整个链表的第一个处理器调用。 - 如果通道上入/出站处理器都有,它会作为出站的第一个调用、入站的最后一个调用。
- 如果通道上只有入站处理器,
结合上面所说的内容,Head、Tail
处理器在任何情况下,其中至少会有一个,作为通道上的最后一个处理器调用,而在这两个头尾处理器中,会自动释放ByteBuf
的工作,先来看看Head
处理器,源码如下:
// ChannelPipeline处理器链表的默认实现类
public class DefaultChannelPipeline implements ChannelPipeline {
// Head处理器的实现类:同时实现了入站、出站处理器接口
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
// 作为入站链表第一个处理器时,会调用的方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 继续往下调用其他自定义的入站处理器
ctx.fireChannelRead(msg);
}
// 作为出站链表的最后一个处理器时,会调用的方法
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
// unsafe.write()最终会调用到AbstractUnsafe.write()方法
this.unsafe.write(msg, promise);
}
}
// 省略其他方法....
}
public abstract class AbstractChannel extends DefaultAttributeMap
implements Channel {
protected abstract class AbstractUnsafe implements Unsafe {
public final void write(Object msg, ChannelPromise promise) {
this.assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 这里先不需要理解,后续源码篇会聊
if (outboundBuffer == null) {
this.safeSetFailure(promise, this.newClosedChannelException(AbstractChannel.
this.initialCloseCause));
// 最终在这里,依旧调用了引用计数工具类的release方法
ReferenceCountUtil.release(msg);
} else {
int size;
try {
msg = AbstractChannel.this.filterOutboundMessage(msg);
size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable var6) {
this.safeSetFailure(promise, var6);
// 这里也会调用了引用计数工具类的release方法
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
}
// 省略其他方法....
}
// 省略其他类与方法....
}
// 引用计数工具类
public final class ReferenceCountUtil {
public static boolean release(Object msg) {
// 这里会先判断一下对应的msg对象是否实现了引用计数接口,
// 只有对应的msg实现了ReferenceCounted接口时,才会释放引用
return msg instanceof ReferenceCounted ?
((ReferenceCounted)msg).release() : false;
}
// 省略其他方法.....
}
Head
节点会作为出站链表的最后一个处理器调用,因此在所有自定义出站处理器执行完成后,最终调用该节点的write()
方法,在这个方法内部,最终调用了AbstractUnsafe.write()
方法,对应的方法实现中,咱们仅需关注ReferenceCountUtil.release(msg)
这行代码即可,最终会在该工具类中释放msg
对象的引用计数。
接着再来看看Tail
节点的实现源码:
// ChannelPipeline处理器链表的默认实现类
public class DefaultChannelPipeline implements ChannelPipeline {
// Tail处理器的实现类:实现了入站处理器接口,作为入站调用链最后的处理器
final class TailContext extends AbstractChannelHandlerContext
implements ChannelInboundHandler {
// 所有自定义的入站处理器执行完成后,会调用的方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
DefaultChannelPipeline.this.onUnhandledInboundMessage(ctx, msg);
}
// 省略其他方法.....
}
// 前面Tail、Head调用的释放方法
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
// 调用释放ByteBuf缓冲区的方法
this.onUnhandledInboundMessage(msg);
// 记录日志
if (logger.isDebugEnabled()) {
logger.debug("Discarded message pipeline :" +
"{}. Channel : {}.", ctx.pipeline().names(), ctx.channel());
}
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached " +
"at the tail of the pipeline. Please check your pipeline" +
"configuration.", msg);
} finally {
// 最终调用了引用计数工具类的release方法
ReferenceCountUtil.release(msg);
}
}
}
Tail
节点会作为入站链表的最后一个处理器调用,所以在执行Tail
处理器时,最终会调用它的channelRead()
方法,而在相应的方法内部,调用了onUnhandledInboundMessage()
方法,跟着源码继续走,此时也会发现,最终也调用了ReferenceCountUtil.release(msg)
方法来释放引用。
根据源码中的推断,似乎
Netty
框架发送/接收数据用的ByteBuf
,都会由头尾处理器来释放,但答案确实如此吗?NO
,为什么呢?再次将目光放到ReferenceCountUtil.release(msg)
这处代码:
// 引用计数工具类
public final class ReferenceCountUtil {
public static boolean release(Object msg) {
// 这里会先判断一下对应的msg对象是否实现了引用计数接口,
// 只有对应的msg实现了ReferenceCounted接口时,才会释放引用
return msg instanceof ReferenceCounted ?
((ReferenceCounted)msg).release() : false;
}
// 省略其他方法.....
}
// ByteBuf的类定义
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>
此时大家注意看,ReferenceCountUtil.release()
在执行前,会先判断一下当前的msg
是否实现了ReferenceCounted
接口,而ByteBuf
是实现了的,因此如果执行到Head/Tail
处理器时,msg
数据依旧为ByteBuf
类型,头尾处理器自然可以完成回收工作,但如若是下面这种情况呢?
pipeline.addLast("In-①",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("俺是In-①入站处理器...");
// 在第一个入站处理器中,将接收到的ByteBuf数据转换为String向下传递
ByteBuf buffer = (ByteBuf) msg;
String message = buffer.toString(Charset.defaultCharset());
super.channelRead(ctx, message);
}
});
pipeline.addLast("In-②",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("我是In-②入站处理器...");
super.channelRead(ctx, msg);
}
});
在上述这个案例中,咱们在第一个入站处理器中,将接收到的ByteBuf
数据转换为String
向下传递,也就意味着从In-②
处理器开始,后面所有的处理器收到的msg
都为String
类型,当自定义的两个处理器执行完成后,最终会调用Tail
处理器完成收尾工作,但问题来了!
因为在
In-①
中msg
类型发生了改变,所以当Tail
处理器中调用ReferenceCountUtil.release()
时,由于String
并未实现ReferenceCounted
接口,所以Tail
无法对该msg
进行释放,最终就会造成内存泄漏问题。
但此时内存泄漏,发生在哪个位置呢?答案是位于In-①
中,因为In-①
处理器中就已经将ByteBuf
用完了,将其中的数据转换成了String
类型,而ByteBuf
后续处理器都不会用到,因此该ByteBuf
占用的内存永远不会被释放,所以一定要注意:在使用处理器的过程中,如果明确ByteBuf
不会继续使用,那请一定要记得手动调用release()
方法释放引用,以上述案例说明:
ByteBuf buffer = (ByteBuf) msg;
String message = buffer.toString(Charset.defaultCharset());
buffer.release();
当明确不使用该ByteBuf
值时,请记住调用对应的release()
方法释放引用!这样能够有效避免内存泄漏的问题出现,有人也许会说,JVM
不是有GC
机制吗?为什么会出现内存泄漏呀?
关于上述问题的道理十分简单,因为
Netty
默认采用本地内存来创建缓冲区,并且会利用池化技术管理所有缓冲区,如果一个ByteBuf
对象的引用不为0
,那么该ByteBuf
会永久的占用内存资源,Netty
无法主动将其占用的内存回收到池中。
5.5、 Netty中的零拷贝技术
想要讲清楚Netty-ByteBuf
中的零拷贝技术,那首先得先明白零拷贝到底是个啥,因此咱们先讲明白零拷贝的概念,再讲清楚操作系统的零拷贝技术,然后再说说Java-NIO
中的零拷贝体现,最后再来聊Netty-ByteBuf
中的零拷贝技术。
六、随处可见的零拷贝技术
零拷贝这个词,在很多地方都有出现,例如Kafka、Nginx、Tomcat、RocketMQ...
的底层都使用了零拷贝的技术,那究竟什么叫做零拷贝呢?其实所谓的零拷贝,并不是不需要经过数据拷贝,而是减少内存拷贝的次数,上个例子来理解,比如Nginx
向客户端提供文件下载的功能。
客户端要下载的文件都位于Nginx
所在的服务器磁盘中,如果当一个客户端请求下载某个资源文件时,这时需要经过的步骤如下:
先来简单聊一聊文件下载时,Nginx
服务器内部的数据传输过程:
- ①客户端请求下载服务器上的某个资源,
Nginx
解析请求并得知客户端要下载的具体文件。 - ②
Nginx
向OS
发起系统IO
调用,调用内核read(fd)
函数,应用上下文切态至内核空间。 - ③
read()
函数通过DMA
控制器,将目标文件的数据从磁盘读取至内核缓冲区。 - ④
DMA
传输数据完成后,CPU
将数据从内核缓冲区拷贝至用户缓冲区(程序的内存空间)。 - ⑤
CPU
拷贝数据完成后,read()
调用结束并返回,上下文从内核态切回用户态。 - ⑥
Nginx
再次向OS
发起内核write(fd)
函数的系统调用,应用上下文再次切到内核态。 - ⑦接着
CPU
将用户缓冲区中的数据,写入到Socket
网络套接字的缓冲区。 - ⑧数据复制到
Socket
缓冲区后,DMA
控制器将Socket
缓冲区的数据传输到网卡设备。 - ⑨
DMA
控制器将数据拷贝至网卡设备后,write()
函数调用结束,再次切回用户态。 - ⑩文件数据抵达网卡后,
Nginx
准备向客户端响应数据,组装报文返回数据......
从上述流程大家可得知,一次文件下载传统的IO
流程,需要经过四次切态,四次数据拷贝(CPU、DMA
各两次),而所谓的零拷贝,并不是指不需要经过数据拷贝,而是指减少其中的数据拷贝次数。
6.1、操作系统中的零拷贝技术
我这里指的操作系统默认是Linux
,因为MacOS、Windows
系统相对闭源,因此对于这两个操作系统中的零拷贝技术个人并不熟悉。在Linux
中提供了多种零拷贝的实现:
- ①
MMAP
共享内存 +write()
系统函数。 - ②
sendfile()
内核函数。 - ③结合
DMA-Scatter/Gather Copy
收集拷贝功能实现的sendfile()
函数。 - ④
splice()
内核函数。
6.1.1、MMAP共享内存
先来聊聊第①种吧,MMAP
共享内存这个概念,在上篇关于《Linux-IO多路复用模型:select、poll、epoll源码分析》的文章结尾提到过,MMAP
共享内存是指:在内核空间和用户空间之间的一块共享内存,这块内存可被用户态和内核态直接访问,结构如下:
先看左边的图,这也是众多资料中流传的图,共享内存位于用户态和内核态之间,这样理解其实也并无大碍,但右边的图才更为准确,因为内核态和用户态本身是两个空间,各自之间并不存在真正的共享区域,MMAP
共享内存是通过虚拟内存机制实现的,也就是通过内存映射技术实现的。
什么又叫做内存映射技术呢?这个其实很好理解,就好比
Linux
中的软链接、Windows
中的快捷方式一样,拿大家熟悉的Windows
系统来说,一般在安装一个程序后,为了方便后续使用,通常都会默认在桌面上生成快捷方式(图标),这个快捷方式其实并不是一个真正的程序,而是指向安装目录下xxx.exe
的链接。
在Windows
系统上安装一个程序后,咱们可以通过点击桌面图标打开,亦可双击安装目录下的xxx.exe
文件启动,而操作系统中的共享内存也是同样的思路。
在主流操作系统中都有一种名为虚拟内存的机制,这是指可以分配多个虚拟内存地址,指向同一个物理内存地址,此时内核态程序和用户态程序,可以通过不同的虚拟地址,来操纵同一块物理内存,这也就是
MMAP
共享内存技术的真正实现。
MMAP
的系统定义如下:
void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
addr
:指定映射的虚拟内存地址。length
:映射的内存空间长度。prot
:映射内存的保护模式。flags
:指定映射的类型。fd
:进行映射的文件句柄。offset
:文件偏移量。
还是之前那幅图(不要问我为什么,因为懒的画~),重点看图中圈出来的区域:
如果内核缓冲区和用户缓冲区使用了MMAP
共享内存,那当DMA
控制器将数据拷贝至内核缓冲区时,因为这里的内核缓冲区,本质是一个虚拟内存地址指向用户缓冲区,所以DMA
会直接将磁盘数据拷贝至用户缓冲区,这就减少了一次内核缓冲区到用户缓冲区的CPU
拷贝过程,后续直接调用write()
函数把数据写到Socket
缓冲区即可,因此这也是一种零拷贝的体现。
6.1.2、sendfile()内核函数
sendfile()
是Linux2.1
版本中推出的一个内核函数,系统调用的原型如下:
ssize_t sendfile(int fd_in, int fd_out, off_t *offset, size_t count);
fd_in
:待写入数据的文件描述符(一般为Socket
网络套接字的描述符)。fd_out
:待读取数据的文件描述符(一般为磁盘文件的描述符)。offset
:磁盘文件的文件偏移量。count
:声明在fd_out
和fd_in
之间,要传输的字节数。
对于啥是文件描述符我就不重复赘述了,这依旧在上篇的《Linux多路复用函数源码分析-FD文件描述符》中聊到过,当调用sendfile()
函数传输数据时,将out_fd
指定为等待写入数据的网络套接字,将in_fd
指定为待读取数据的磁盘文件,就可以直接在内核缓冲区中完成传输过程,无需经过用户缓冲区,如下:
依旧以前面Nginx
下载文件的过程为例,完整流程如下:
- ①客户端请求下载服务器上的某个资源,
Nginx
解析请求并得知客户端要下载的具体文件。 - ②
Nginx
向OS
发起系统IO
调用,调用内核sendfile()
函数,上下文切态至内核空间。 - ③
sendfile()
函数通过DMA
控制器,将目标文件的数据从磁盘读取至内核缓冲区。 - ④
DMA
传输数据完成后,CPU
将数据从内核缓冲区拷贝至Socket
缓冲区。 - ⑤
CPU
拷贝数据完成后,DMA
控制器将数据从Socket
缓冲区拷贝至网卡设备。 - ⑥数据拷贝到网卡后,
sendfile()
调用结束,应用上下文切回用户态空间。 - ⑦
Nginx
准备向客户端响应数据,组装报文返回数据......
相较于原本的MMAP+write()
的方式,使用sendfile()
函数来处理IO
请求,这显然性能更佳,因为这里不仅仅减少了一次CPU
拷贝,而且还减少了两次切态的过程。
6.1.3、DMA-Scatter/Gather Copy - sendfile()函数
前面聊了Linux2.1
版本中的sendfile()
函数,而到了Linux2.4
版本中,又对sendfile()
做了升级,引入了S/G-DMA
技术支持,也就是在DMA
拷贝阶段,如果硬件支持的情况下,会加入Scatter/Gather
操作,这样就省去了仅有的一次CPU
拷贝过程,如下:
优化后的sendfile()
函数,拷贝数据时只需要告知out_fd、in_fd、count
即可,然后DMA
控制器会直接将数据从磁盘拷贝至网卡,而无需经过CPU
将数据拷贝至Socket
缓冲区这一步。
6.1.4、splice()内核函数
前面聊到的sendfile()
函数只适用于将数据从磁盘文件拷贝到Socket
套接字或网卡上,所以这也限制了它的使用范围,因此在Linux2.6
版本中,引入了splice()
函数,其系统调用的原型如下:
ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out,
size_t len, unsigned int flags);
fd_in
:等待写入数据的文件描述符。off_in
:如果fd_in
是一个管道文件(如Socket
),该值必须为NULL
,否则为文件的偏移量。fd_out
:等待读取数据的文件描述符。off_out
:作用同off_in
参数。len
:指定fd_in、fd_out
之间传输数据的长度。flags
:控制数据传输的模式:SPLICE_F_MOVE
:如果数据合适,按标准页大小移动数据(2.6.21
版本后被废弃)。SPLICE_F_NONBLOCK
:以非阻塞式模式执行splice()
,实际依旧会受FD
状态影响。SPLICE_F_MORE
:给内核一个提示,后续splice()
还会继续传输更多的数据。SPLICE_F_GIFT
:没有效果的选项。
使用splice
函数时,fd_in、fd_out
中必须至少有一个是管道文件描述符,套到网络编程中的含义即是指:必须要有一个文件描述符是Socket
类型,如果两个磁盘文件进行复制,则无法使用splice
函数。
splice()
函数的作用和DMA-Scatter/Gather
版的sendfile()
函数完全相同,但与其不同的是:splice()
函数不仅不需要硬件支持,而且能够做到两个文件描述符之间的数据零拷贝,实现的过程是基于一端的管道文件描述符,在两个FD
之间搭建pipeline
管道,从而实现两个FD
之间的数据零拷贝。
6.2、另类的零拷贝技术
前面聊到了四种Linux
系统中的零拷贝技术,而除开Linux
系统中的零拷贝技术外,还有一些另类的零拷贝实现,先来聊一聊缓冲区共享技术,然后再聊聊应用程序中的零拷贝体现。
6.2.1、缓冲区共享
缓冲区共享技术类似于Linux
中的MMAP
共享内存,但缓冲区共享则是真正意义上的内存共享技术,内核缓冲区和用户缓冲区共享同一块内存,如下:
操作系统一般为了系统的安全性,在运行期间都会分为用户态和内核态,无法直接访问用户态程序内核态空间,所以Linux
中的MMAP
是基于虚拟内存实现的,而想要实现真正意义上的内存共享,这也就意味着需要重写内核结构,目前比较成熟的只有Solaris
系统上的Fast Buffer
技术,但大家只需了解即可,因为这个也很少用到。
6.2.2、程序数据的零拷贝
前面聊到的零拷贝技术,都是在减少磁盘文件和网络套接字之间的数据拷贝次数,而程序中也会存在很多的数据拷贝过程,比如将一个大集合拆分为两个小集合、将多个小集合合并成一个大集合等等,传统的做法如下:
List<Integer> a = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> b = new ArrayList<>();
List<Integer> c = new ArrayList<>();
for (Integer num : a) {
int index = a.indexOf(num);
if (index < 5){
b.add(num);
} else {
c.add(num);
}
}
而这种做法显然会牵扯到数据拷贝,但上述这个做法,会从a
中将数据拷贝到b、c
集合中,而所谓的零拷贝,即是无需发生拷贝动作,也能够将a
拆分成b、c
两个集合。
关于具体如何实现,这点待会儿在
Netty-ByteBuf
中演示,因为Netty
中的零拷贝技术,也实现了程序数据的零拷贝。
6.3、Java-NIO中的零拷贝体现
Java-NIO
中,主要有三个方面用到了零拷贝技术:
MappedByteBuffer.map()
:底层调用了操作系统的mmap()
内核函数。DirectByteBuffer.allocateDirect()
:可以直接创建基于本地内存的缓冲区。FileChannel.transferFrom()/transferTo()
:底层调用了sendfile()
内核函数。
观察上述给出的三处位置,其实本质也就是在调用操作系统内核提供的零拷贝函数,以此减少数据的拷贝次数。
6.4、再聊Netty中的零拷贝体现
Netty
中的零拷贝与前面操作系统层面的零拷贝不同,它是一种用户进程级别的零拷贝体现,主要也包含三方面:
①
Netty
的发送、接收数据的ByteBuf
缓冲区,默认会使用堆外本地内存创建,采用直接内存进行Socket
读写,数据传输时无需经过二次拷贝。如果使用传统的堆内存进行Socket
网络数据读写,JVM
需要先将堆内存中的数据拷贝一份到直接内存,然后才写入Socket
缓冲区中,相较于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。②
Netty
的文件传输采用了transferTo()/transferFrom()
方法,它可以直接将文件缓冲区的数据发送到目标Channel(Socket)
,底层就是调用了sendfile()
内核函数,避免了文件数据的CPU
拷贝过程。③
Netty
提供了组合、拆解ByteBuf
对象的API
,咱们可以基于一个ByteBuf
对象,对数据进行拆解,也可以基于多个ByteBuf
对象进行数据合并,这个过程中不会出现数据拷贝,下面重点聊一聊这个!
其中前两条就不过多赘述了,毕竟前面都唠叨过好几次,重点说说第三种零拷贝技术,这是一种Java
级别的零拷贝技术,ByteBuf
中主要有slice()、composite()
这两个方法,用于拆分、合并缓冲区,先来聊聊拆分缓冲区的方法,案例如下:
// 测试Netty-ByteBuf的slice零拷贝方法
private static void sliceZeroCopy(){
// 分配一个初始容量为10的缓冲区
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
// 写入0~9十个字节数据
byte[] numData = {
'0','1','2','3','4','5','6','7','8','9'};
buffer.writeBytes(numData);
printBuffer(buffer);
// 从下标0开始,向后截取五个字节,拆分成一个新ByteBuf对象
ByteBuf b1 = buffer.slice(0, 5);
printBuffer(b1);
// 从下标5开始,向后截取五个字节,拆分成一个新ByteBuf对象
ByteBuf b2 = buffer.slice(5, 5);
printBuffer(b2);
// 证明切割出的两个ByteBuf对象,是共享第一个ByteBuf对象数据的
// 这里修改截取后的b1对象,然后查看最初的buffer对象
b1.setByte(0,'a');
printBuffer(buffer);
}
public static void main(String[] args) {
sliceZeroCopy();
}
在上述方法中,首先创建了一个buffer
对象,往其中写入了0~9
这十个字符,接着将其拆分成了b1、b2
这两个ByteBuf
对象,b1、b2
都具备独立的读写指针,但却并未真正的从buffer
中拷贝新的数据出来,而是基于buffer
这个对象,进行了数据截取,运行结果如下:
ByteBuf缓冲区信息:{读取指针=0, 写入指针=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 35 36 37 38 39 |0123456789 |
+--------+-------------------------------------------------+----------------+
ByteBuf缓冲区信息:{读取指针=0, 写入指针=5, 容量大小=5}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 |01234 |
+--------+-------------------------------------------------+----------------+
ByteBuf缓冲区信息:{读取指针=0, 写入指针=5, 容量大小=5}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39 |56789 |
+--------+-------------------------------------------------+----------------+
ByteBuf缓冲区信息:{读取指针=0, 写入指针=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 31 32 33 34 35 36 37 38 39 |a123456789 |
+--------+-------------------------------------------------+----------------+
观察上述第二、三个ByteBuf
缓冲区信息,与前面说的毫无差异,明显都具备独立的读写指针,但我为什么说:b1、b2
没有拷贝数据呢?接着看方法中的最后一步,我对b1
的第一个元素做了修改,然后输出了buffer
对象,看上述结果中的第四个ByteBuf
缓冲区信息,其实会发现:buffer
对象中下标为0
的数据,也被改成了a
!由此即可证明前面的观点。
不过这种零拷贝方式,虽然减少了数据复制次数,但也会有一定的局限性:\
①使用slice()
方法拆分出的ByteBuf
对象,不支持扩容,也就是切割的长度为5
,最大长度也只能是5
,超出长度时会抛出下标越界异常。\
②由于拆分出的ByteBuf
对象,其数据依赖于原ByteBuf
对象,因此当原始ByteBuf
对象被释放时,拆分出的缓冲区也会不可用,所以在使用slice()
方法时,要手动调用retain()/release()
来增加引用计数(这个后面细聊)。
除开上述的slice()
方法外,还有其他一个叫做duplicate()
的零拷贝方法,它的作用是完全克隆原有ByteBuf
对象,但读写指针都是独立的,并且支持自动扩容,大家感兴趣可以自行实验。
接着聊一聊合并ByteBuf
缓冲区的零拷贝方法,该方法的使用方式与前面的方法并不同,如下:
// 测试Netty-ByteBuf的composite零拷贝方法
private static void compositeZeroCopy(){
// 创建两个小的ByteBuf缓冲区,并往两个缓冲区中插入数据
ByteBuf b1 = ByteBufAllocator.DEFAULT.buffer(5);
ByteBuf b2 = ByteBufAllocator.DEFAULT.buffer(5);
byte[] data1 = {
'a','b','c','d','e'};
byte[] data2 = {
'n','m','x','y','z'};
b1.writeBytes(data1);
b2.writeBytes(data2);
// 创建一个合并缓冲区的CompositeByteBuf对象
CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
// 将前面两个小的缓冲区,合并成一个大的缓冲区
buffer.addComponents(true,b1,b2);
printBuffer(buffer);
}
public static void main(String[] args) {
compositeZeroCopy();
}
/* * 运行结果:
ByteBuf缓冲区信息:{读取指针=0, 写入指针=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 6e 6d 78 79 7a |abcdenmxyz |
+--------+-------------------------------------------------+----------------+
* */
案例中,想要将多个缓冲区合并成一个大的缓冲区,需要先创建一个CompositeByteBuf
对象,接着调用它的addComponent()/addComponents()
方法,将小的缓冲区添加进去即可。但在合并多个缓冲区时,addComponents()
方法中的第一个参数必须为true
,否则不会自动增长读写指针。
其实说到底,
Netty-ByteBuf
缓冲区的零拷贝方法,实际上也可以被称之为“一种特殊的浅拷贝”,与之对应的是“深拷贝”,而ByteBuf
中的“深拷贝”,则是一系列以Copy
开头的方法,通过这类方法复制缓冲区,会完全分配新的内存地址、读写指针。
最后,在Netty
内部还提供了一个名为Unpooled
的工具类,这主要是针对于非池化缓冲区的工具类,内部也提供了一系列wrappend
开头的方法,可以用来组合、包装多个ByteBuf
对象或字节数组,调用对应方法时,内部也不会发生拷贝动作,这也是一类零拷贝的方法。
七、Netty入门篇小结
经过上述一系列的叨叨絮絮后,对于Netty
框架的基本概念,以及Netty
框架中大多数核心组件做了介绍,但对于一些粘包、半包、解码器、长连接、心跳机制等内容未阐述,原本打算将这些内容一篇写完,但本章的字数实在太多,严重超出单章限制:
因此对于后续一些进阶的知识,会再开设一篇讲述,预计Netty
的文章会有4~5
篇左右,大体顺序为《Netty
入门篇》、《Netty
进阶篇》、《Netty
实战篇》、《Netty
应用篇》、《Netty
源码篇》,但具体的篇幅会在后续适当调整。
本篇的内容就到这里啦,如若对你有帮助,请记得点个小赞支持一下~