4.Netty核心源码分析
4.1.Java NIO之Selector
1、什么是Selector
Selector(选择器)是java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。
2、为什么使用Selector
仅用单线程来处理多个channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程来处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统一些资源(如内存),因此使用的线程越少越好。
3、Selector的创建
通过调用Selector.open()方法创建一个Selector,如下:
Selector selector = Selector.open();
4、向Selector注册通道
为了将Channel和Selector配合使用,必须将channel注册到selector上。通过SelectableChannel.register()方法来实现,如下:
ServerSocketChannel channel = ServerSocketChannel.open(); channel.configureBlocking(false); SelecttionKey key = channel.register(selector,SelectionKey.OP_READ);
与Selector一起使用时,Channel必须处于非阻塞模式下,这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道可以。
注意register()方法的第二个参数。这是一个interest集合,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:
- Connect:SelectionKey.OP_CONNECT “连接就绪”
- Accept:SelectionKey.OP_ACCEPT “接收就绪”
- Read:SelectionKey.OP_READ “读就绪”
- Write:SelectionKey.OP_WRITE “写就绪”
如果不止对一种事件感兴趣,可以用”位或“操作符将常量符拼接起来,如下:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE
5、SelectionKey
当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。这个对象包含了一些属性:
(1)interest集合
interest集合就是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合,用“位与”操作interest 集合和给定的SelectionKey常量,可以确定某个确定的事件是否在interest 集合中。
int interestSet = selectionKey.interestOps(); //判断接收接续的事件是否在集合中 boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT; //判断连接就绪的事件是否在集合中 boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; //判断读就绪事件是否在集合中 boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; //判断写就绪事件是否在集合中 boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
(2)ready集合
ready集合是通道已经准备就绪的操作的集合,在一次选择(Selection)之后,你会首先访问这个ready Set。
//获取读就绪事件的个数 int readySet = selectionKey.readyOps();
可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:
//判断接收就绪的事件 selectionKey.isAcceptable(); //判断连接就绪的事件 selectionKey.isConnectable(); //判断读就绪的事件 selectionKey.isReadable(); //判断写就绪事件 selectionKey.isWriteable();
(3)Channel+Selector
从SelectionKey访问Channel和Selector很简单.
//获取当前channel Channel channel = selectionKey.channel(); //获取当前selector Selector selector = selectionKey.selector();
(4)附加对象
可以将一个对象或者更多的信息附着在SelectionKey上,这样就能方便的识别某个给定的通道,例如,可以附加与通道一起使用的Buffer,或者包含聚集数据的某个对像。
selectionKey.attach(theObject); Object attachedObj = selectionKey.attachment();
还可以通过register()方法像Selector注册Channel的时候附加对象。如:
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,thObject);
6、通过Selector选择通道
一旦向Selector注册了一个或者多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(连接、接受、读或者写)已经准备就绪的那些通道。
(1)int select()
阻塞到至少有一个通道在你注册的事件上就绪了。
(2)int select(long timeout)
和select一样,处理最长会阻塞timeout毫秒(参数)
(3)int selectNow()
不会阻塞,不管什么通道就绪都立即返回,如果没有通道可以选择,此方法直接返回零
select()方法返回的int值表示有多少通道已经准备就绪。
7、selectedKeys()
一旦调用select()方法后,并且返回值有一个或多个准备就绪了,然后就可以通过调用selector的selectedKeys()方法,访问已选择键集中的就绪通道。
Set selectedKeys = selector.selectedKeys();
当像Selector注册Channel时,Channel.register()方法会返回一个SelectionKey 对象。这个对象代表了注册到该Selector的通道。可以通过SelectionKey的selectedKeySet()方法访问这些对象。
Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); }
这个循环遍历已选择键集中的每个键,并检测各个键锁对应的通道的就绪事件。
注意每次迭代末尾的keyIterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。
SelectionKey.channel()方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。
8、wakeUp()
某个线程调用select()方法阻塞后,即使没有通道已经就绪,也有办法让其从select()中返回,只要让其他线程在第一个线程调用select()方法的哪个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。
如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。
9、close()
用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。
10.完整示例
//创建ServerSocketChannel对象 ServerSocketChannel channel = ServerSocketChannel.open(); //创建selector对像 Selector selector = Selector.open(); //设置非阻塞 channel.configureBlocking(false); //绑定通道,返回读就绪的事件 SelectionKey key = channel.register(selector, SelectionKey.OP_READ); while(true) { //查询当前已经有多少读就绪的事件 int readyChannels = selector.select(); //如果都就绪事件为0推出当前循环 if(readyChannels == 0) continue; //调用select方法后,只要有就绪的通道,就开始遍历通道 Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); //判断是否是那四种事件类型,做除响应的处理 if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); } }
4.2.EventLoop和EventLoopGroup
1、EventLoop和EventLoopGroup简介
为解决系统在运行中频繁切换上下文带来的性能的损耗。
而且要考虑并发下数据的安全。Netty采用了串行化的设计理念,从消息的读取、编码以及后续ChannelHandler的执行,始终都由IO线程EventLoop负责,这就意味着整个流程不会进行线程上下文的切换,
数据也不会面临被并发修改的风险。
EventLoopGroup是一组EventLoop的抽象,一个EventLoopGroup当中包含一个或者多个EventLoop,EventLoopGroup提供next接口,可以从一组EventLoop里卖弄按照一定的规则获取其中一个EventLoop来进行工作。
2、分析这两块代码
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
这里创建了两个EventLoopGroup对象bossGroup和workGroup,bossGroup主要用来处理server socket监听client socket的连接请求,server socket接收了新的连接后,会把connection socket放到workGroup中进行处理,也就是说workGroup主要用来处理connection socket的网络IO和相关的业务逻辑。workGroup会由next选择其中一个EventLoop来讲这个SocketChannel注册到其维护的Selector并对后续的IO事件进行处理。
ChannelPipeline中每一个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件的。所以至关重要不要阻塞这个线程。
总结:
- NioEventLoopGroup实际上就是个线程池,一个EventLoopGroup包含一个或者多个EventLoop
- 一个EventLoop在它的生命周期内只和一个Thread绑定
- 所有的EventLoop处理的I/O事件都将在它专有的Thread上被处理
- 一个Channel在它的生命周期内只注册一个EventLoop
- 每一个EventLoop负责处理一个或者多个Channel
- 一个EventLoop维护一个Selector
3、跟踪NioEventLoopGroup构造函数
(1)NioEventLoopGroup
public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } public NioEventLoopGroup() { this(0); }
(2)进入到NioEventLoopGroup的父类MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
可以看见NioEventGroup的构造函数如果nThreads为非0值时,则为传入的实际值,如果为0的话或者没有参数,则为DEFAULT_EVENT_LOOP_THREADS,DEFAULT_EVENT_LOOP_THREADS为系统内核的2倍。
static { //获取系统的内核*2付给DEFAULT_EVENT_LOOP_THREADS,静态块加载 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } }
4.3.Netty的启动引导类
Netty的启动类分为客户端启动类和服务端启动类,分别是Bootstrap和ServerBootStrap。他们都是AbstractBootStrap的子类,总的来说他们都是Netty中的辅助类,提供了链式配置方法,方便Channel的引导和启动。
1、服务端启动引导类ServerBootStrap
(1)group:设置线程模型,Reactor线程模型对比EventLoopGroup
- 先看一下ServerBootStrap的group方法
- 双参group方法
- 单参group方法
- 单线程方式
- 多线程方式
- 主从线程方式
(2)channel:设置channel通道类型NioServerSocketChannel、OioServerSocketChannel
- 用来设置交互模型的,5种IO模型
(3)option:作用于每个新建立的channel,设置TCP连接中的一些参数
- ChannelOption.SO_BACKLOG:存放已完成三次握手的请求的等待队列的最大长度。
- 设置ChannelOption.SO_BACKLOG参数
- Linux服务器TCP连接底层:
- syn queue:半连接队列,tcp_max_syn_backlog
- accept_queue:全连接队列,net.core.somaxconn
- 系统默认的somaxconn参数要足够大,如果backlog比somaxconn大,则会优先用后者。
- ChannelOption.TCP_NODELAY:为解决Nagle算法的问题,默认是false,要求高实时性,有数据时马上发送,就将该选项设置为true打开Nagle算法。
(4)childOption: 作用于被accept之后的连接
(5)childHandler:用于对每个通道里面的数据处理
2、客户端启动引导类Bootstrap
3、客户端与服务端的引导过程
(1)引导服务器
(2)引导客户端
4.4.Netty核心组件Channel
- 什么是Channel:客户端和服务器建立连接的一个连接通道
- 什么是ChannelHandler:负责Channel的逻辑处理
- 什么是ChannelPipeline:负责管理ChannelHandler的有序容器
- 三者的关系:
- 一个Channel包含一个ChannelPipeline,所有ChannelHandler都会顺序加入到
- ChannelPipeline中创建Channel时会自动创建一个ChannelPipeline,每个Channel都有一个管理它的pipeline,这关联是永久性的。
- Channel当状态出现的变化,就会触发对应的事件
- channelRegistered:channel注册到一个EventLoop
- channelActive:变为活跃状态(连接到了远程主机),可以接收和发送数据
- channelInactive:channel处于非活跃状态,没有连接到远程主机
- channelUnregistered:channel已经创建,但是未注册到一个EventLoop里面,也就是没有和Selector绑定。
4.5.ChannelHander接口详解
1、ChannelHandler的生命周期
接口ChannelHandler定义的生命周期相关定义如下表。这些操作在ChannelHandler被添加到一个ChannelPipline,或者从一个ChannelPipeline中移除时被调用,每一个方法都有一个ChannelHandlerContext作为参数。
类型 | 描述 |
handlerAdded | 当ChannelHandler被添加到一个ChannelPipeline时被调用 |
handlerRemoved | 当ChannelHandler从一个ChannelPipeline中移除时被调用 |
exceptionCaught | 处理过程中ChannelPipeline中发生错误时被调用 |
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JP1ksSGR-1667215373972)(images/4.2(2).jpg)]
Netty定义了下面两个重要的ChannelHandler子接口
- ChannelInboundHandler:处理输入数据和所有类型的状态变化
- ChannelOutboundHandler:处理输出数据,可以拦截所有操作。
- 2、ChannelInboundHandler接口
ChannelInboundHandler方法
类型 | 描述 |
channelRegistered | 当一个Channel注册到EventLoop上调用的方法 |
channelUnregistered | 当一个Channel从它的EventLoop上解除注册,不再处理I/O时被调用 |
channelActive | 当Channel变成活跃状态时被调用;Channel是连接/绑定、就绪的 |
channelInactive | 当Channel离开活跃状态,不再连接到某个远端时被调用 |
channelReadComplete | 当Channel上某个读操作完成时调用 |
channelRead | 当从Channel中读数据时被调用 |
userEventTriggered | 因某个POJO穿过ChannelPipeline引发ChannelnboundHandler.fireUserEventTriggered()时被调用 |
示例:
当一个ChannelInboundHandler实现类重写channelRead()方法时,它负责释放ByteBuf的内存。Netty为此提供了工具方法,ReferenceCountUtil.release()。
对未释放的资源,Netty会打印一个警告(WARN-level)的日志消息,让你很方便的查找代码中的问题,但是用这种方式管理资源有些麻烦,还有一个更简单的替代方法就是用SimpleChannelInboundHandler。
因为SimpleChannelInboundHandler自动释放资源,任何对消息的引用都会变成无效,所以你不能保存这些引用在后面的ChannelHandler中再次使用。
3、ChannelOutboundHandler接口
输出的操作和数据由ChannelOutBoundHandler处理。它的方法可以被Channel,ChannelPipeline和ChannelHandlerContext调用。
ChannelOutboundHandler有一个强大的功能,可可以按需推迟一个操作,这使得处理请求可以用到更为复杂的策略。比如,如果写数据到远端被暂停,你可以推迟flush操作,稍后再试。
ChannelOutboundHandler方法
ChannelOutboundHander的大部分方法都用了一个ChannelPromise输入参数,用于当操作完成时收到通知。ChannelPromise是ChannelFuture的子接口,定义了可写的方法,比如setSuccess(),或者setFailure(),而ChannelFuture则是不可变的对象。
4、ChannelHandler适配器类
刚开始尝试些写你自己的ChannelHandler时,你可以用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter这两个类。
这两个适配器类分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现。它们继承了共同的父接口ChannelHandler的方法,扩展了抽象类ChannelHandlerAdapter。
ChannelHandlerAdapter类层级关系
ChannelHandlerAdapter还提供了一个工具方法isSharable()。如果实现类带有@Sharable注解,那么这个方法就会返回true,意味着这个对象可以被添加到多个ChannelPipeline中。
ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中的方法调用相关ChannelHandlerContext中的等效方法,一次将事件转发到管道中的下一个ChannelHandler。
5、资源管理
无论何时你对数据操作ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write(),你需要确保没有资源泄露。Netty采用计数来处理ByteBuf池。所以在你用完一个ByteBuf后,调整引用计数的值是很重要的。
为了帮助检测潜在问题,Netty提供了ResourceLeakDetector类,它通过采样应用程序1%的buffer分配来检查是否有内存泄露。这个过程开销是很小的。
如果泄露被检测到,会产生类似下面的日志消息:
内存泄露检测级别
级别 | 描述 |
DISABLED | 关闭内存泄露检测。只有在大量测试后,才能引用这个级别。 |
SIMPLE | 报告默认的1%采样率中发现的任何泄露。 |
ADVANCED | 报告发现的泄露和消息的位置。使用默认的采样率。 |
PARANOID | 类似ADVANCED级别,但是每个消息的获取都被检测采样。这对性能有很大影响,只能在调试阶段使用。 |
在你实现ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()时,你怎样用这个诊断工具来防止内存泄露呢。让我们来看看当前handler没有通过ChannelContext.fireChannelRead()把消息传递到下一个ChannelInboundHandler。下面的代码说明了如何释放这条消息占用的内存。
因为读取和释放输入消息是一个非常罕见的任务,Netty提供了一个特殊的ChannelInboundHandler实现类-SimpleChannelInboundHandler。一条消息在被SimplateChannelInboundHandler的channelRead0()读取后,会被自动释放资源。
在输出方向,如果你处理一个write()操作并且丢弃一条消息(没有写入Channel),你就应该负责释放这条消息。下面代码实现丢弃所有待写的数据。
重要的是,不仅要释放资源,而且要通知ChannelPromise,否则会出现某个ChannelFutureListener没有被告知消息已被处理的情况。
总之,如果一个消息被“消费”或者丢弃,没有送到ChannelPipeline中的下一个ChannelOutboundHandler,用户就要负责调用ReferenceCountUtil.release()。如果消息到达了真正的传输层,在它被写到socket中或者Channel关闭时,会被自动释放(这种情况下用户就不用管了)。
4.6.ChannelPipeline接口详解
1、ChannelPipeline接口简介
如果你把ChannelPipeline看成是一串ChannelHandler实例,连接穿过Channel的输入输出event,那么就很容易理解,每个新创建的Channel都会分配一个新的ChannelPipeline,这个关系是恒定的,Channel不可以换别的ChannelPipeline,也不可以解除当前分配的ChannelPipeline。
根据来源一个event可以被一个ChannelInboundHandler或者ChannelOutboundHandler处理,接下来通过调用ChannelHandlerContext的方法,
它会被转发到下一个同类型的handler。
2、ChannelHandlerContext
**ChannelHandlerContext让一个ChannelHandler可以和它的ChannelPipeline和其他的Handler交互。**通过ChannelHandlerContext,一个handler可以通知ChannelPipeline中的下一个ChannelHandler。甚至动态改变他所属ChannelPipeline。
3、修改一个ChannelPipeline
一个ChannelHandler可以通过增加,删除或者替换其他ChannelHandler来改变一个ChannelPipeline的布局。这是ChannelHandler最重要的特性之一。
改动ChannelPipeline的ChannelHandler方法
方法名 | 描述 |
addFirst / addBefore / addAfter / addLast | 添加一个ChannelHandler到ChannelPipeline |
remove | 从ChannelPipeline中删除一个ChannelHandler |
replace | 用一个ChannelHandler替换ChannelPipeline中的另一个ChannelHandler |
4、ChannelPipeline获取ChannelHandler的操作
方法名 | 描述 |
get | 通过类型或者名字返回一个ChannelHandler |
context | 返回ChannelHandler绑定的ChannelHandlerContext |
names | 返回ChannelPipeline中所有ChannelHandler的名字 |
5、ChannelPipeline输入方法
ChannelPipeline API提供了一些调用输入和输出操作的额外方法。用来通知ChannelInboundHandler在ChannelPipline发生的event。
实现类为DefaultChannelPipeline中具体实现
6、ChannelPipeline输出方法
7、总结
- ChannelPipeline保存了所有Channel相关的ChannelHandler
- 可以通过增加和减少ChannelHandler来动态修改ChannelPipeline
- ChannelPipeline有大量的API,用来对输入输出做出响应行动
4.7.ChannelHandlerContext接口详解
1、ChannelHandlerContext接口简介
ChannelHandlerContext代表了一个ChannelHandler和一个ChannelPipeline之间的关系,它在ChannelHandler被添加到ChannelPipeline时被创建。ChannelHandlerContext的主要功能是管理它对应的ChannelHandler和属于同一个ChannelPipeline的其他ChannelHandler之间的交互。
ChannelHandlerContext有很多方法,其中一些方法和Channel与ChannelPipeline中相同,但是还是有些区别的,如果你在Channel或者ChannelPipeline实例上调用这些方法,他们会贯穿整个pipeline,但是ChannelHandlerContext中调用相同的方法,仅仅从当前ChannelHandler开始,走到pipeline中下一个可以处理这个event的ChannelHandler。
一个ChannelHandler绑定的ChannelHandlerContext 永远不会改变,所以把它的引用缓存起来是安全的。
2、ChannelHandlerContext的使用
通过Channel或者ChannelPipeline传递event
通过ChannelHandlerContext触发event流操作
4.8.多个入站出战ChannelHandler的执行顺序
1、一般项目中,InboundHandler和OutboundHandler有多个,在Pipeline中执行顺序?
- InboundHandler顺序执行,OutboundHandler逆序执行。
- 2、测试执行顺序
(1)按照InboundHandler1、InboundHandler2、OutboundHandler1、OutboundHandler2顺序存放
socketChannel.pipeline().addLast(new InboundHandler1()); socketChannel.pipeline().addLast(new InboundHandler2()); socketChannel.pipeline().addLast(new OutboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler2());
ChannelHandlerContext不会贯穿整个pipeline,寻找到要处理的Handler开始处理。
(2)按照OutboundHandler1、OutboundHandler2、InboundHandler1、InboundHandler2顺序存放
socketChannel.pipeline().addLast(new OutboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler2()); socketChannel.pipeline().addLast(new InboundHandler1()); socketChannel.pipeline().addLast(new InboundHandler2());
(3)按照InboundHandler1、InboundHandler2、OutboundHandler1、OutboundHandler2顺序存放,用pipeline调用
socketChannel.pipeline().addLast(new InboundHandler1()); socketChannel.pipeline().addLast(new InboundHandler2()); socketChannel.pipeline().addLast(new OutboundHandler1()); socketChannel.pipeline().addLast(new OutboundHandler2());
3、结论
- InboundHandler顺序执行,OutboundHandler逆序执行
- InboundHandler顺序执行,OutboundHandler逆序执行
InboundHandler之间传递数据,通过ctx.fireChannelRead(msg)
InboundHandler通过ctx.write(msg),则会传递outboudnHandler
使用ctx.write(msg)传递消息,Inbound需要放在结尾,在Outbound之后,不然outboundhandler会不执行,但是使用channel.write(msg)、pipeline.write(msg)情况会不一致,都会执行。
outBound和Inbound谁先执行,针对客户端和服务端而言,客户端是发起请求在接收数据,先outbound在inbound,服务端则相反。
4.9.Netty异步回调模式-Future和Promise
1、Future简介
我们知道Netty的I/O操作都是异步的,例如bind、connect、write等操作,会返回一个Future接口。Netty源码中大量使用了异步回调处理模式。在做Netty应用开发时,我们也会用到,所以了解Netty的异步监听,无论是做Netty应用开发还是阅读源码都是十分重要的。
2、Future接口剖析
可知:ChannelFuture继承了Netty的Future,Netty的Future继承了JDK的Future
JDK的Future,用的最多的就是线程池ThreadPoolExecutor,通过submit方法提交任务返回一个Future实例,通过它来查询任务的执行状态和执行结果,最常用的方法就是isDone()和get()。
Netty的Future,在继承JDK的Future基础上,扩展了自己的方法:
public interface Future<V> extends java.util.concurrent.Future<V> { // 任务执行成功,返回true boolean isSuccess(); // 任务被取消,返回true boolean isCancellable(); // 支付执行失败,返回异常 Throwable cause(); // 添加Listener,异步非阻塞处理回调结果 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 移除Listener Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 同步阻塞等待任务结束;执行失败话,会将“异常信息”重新抛出来 Future<V> sync() throws InterruptedException; Future<V> syncUninterruptibly(); // 同步阻塞等待任务结束,和sync方法一样,只不过不会抛出异常信息 Future<V> await() throws InterruptedException; Future<V> awaitUninterruptibly(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException; boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); // 非阻塞,获取执行结果 V getNow(); // 取消任务 @Override boolean cancel(boolean mayInterruptIfRunning); }
我们知道JDK的Future的任务结果获取需要主动查询,而Netty的Future通过添加监听器Listener,可以做到异步非阻塞处理任务结果,可以称为被动回调。
同步阻塞有两种方式:sync和await(),区别:sync()方法在任务失败后,会把异常信息抛出,await()方法对异常信息不做任何处理,当我们不关心异常信息的时候可以用await(),通过阅读源码我们知道sync()方法其实里面调的就是await()方法。
Future接口没有和IO操作关联在一起,Future的子接口ChannelFuture,它和IO操作中的channel通道关联在一起了,用于异步处理channel事件,这个接口用的最多。
public interface ChannelFuture extends Future<Void> { // 获取channel通道 Channel channel(); @Override ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture sync() throws InterruptedException; @Override ChannelFuture syncUninterruptibly(); @Override ChannelFuture await() throws InterruptedException; @Override ChannelFuture awaitUninterruptibly(); // 标记Futrue是否为Void,如果ChannelFuture是一个void的Future,不允许调// 用addListener(),await(),sync()相关方法 boolean isVoid(); }
ChannelFuture接口相比父类Future接口,就增加了**channel()和isVoid()**两个方法,其他方法都是覆盖父类的接口,没有别的扩展。
了解一下ChannelFuture的状态转换:
ChannelFutue就两种状态Uncompleted(未完成)和Completed(完成),Completed包括三种,执行成功,执行失败和任务取消。注意:执行失败和任务取消都属于Completed。
让我们来看一下Future的另一个子接口Promise,它是个可写的Future
public interface Promise<V> extends Future<V> { // 执行成功,设置返回值,并通知所有listener,如果已经设置,则会抛出异常 Promise<V> setSuccess(V result); // 设置返回值,如果已经设置,返回false boolean trySuccess(V result); // 执行失败,设置异常,并通知所有listener Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); // 标记Future不可取消 boolean setUncancellable(); @Override Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> await() throws InterruptedException; @Override Promise<V> awaitUninterruptibly(); @Override Promise<V> sync() throws InterruptedException; @Override Promise<V> syncUninterruptibly(); }
Future接口只提供了获取返回值的get()方法,不可设置返回值。
Promise接口在Future基础上,还提供了设置返回值和异常信息,并立即通知listeners。
而且,一旦setSuccess()或setFailure()后,那些await()或者sync()的线程就会从等待中返回。
接下来,让我们来看看ChannelFuture的可写的子接口ChannelPromise
public interface ChannelPromise extends ChannelFuture, Promise<Void> { // 覆盖ChannelFuture接口 @Override Channel channel(); // 覆盖Promise接口 @Override ChannelPromise setSuccess(Void result); ChannelPromise setSuccess(); boolean trySuccess(); @Override ChannelPromise setFailure(Throwable cause); @Override ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise sync() throws InterruptedException; @Override ChannelPromise syncUninterruptibly(); @Override ChannelPromise await() throws InterruptedException; @Override ChannelPromise awaitUninterruptibly(); ChannelPromise unvoid(); }
ChannelPromise接口只是综合了ChannelFuture和Promise接口,没有新增的功能。
3、Promise的实现类
DefaultPromise的主要属性
// setSuccess设置result为null时,设置成SUCCESS private static final Object SUCCESS = new Object(); // 不可取消值 private static final Object UNCANCELLABLE = new Object(); // 取消异常信息持有者 private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder( StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)")); //执行结果,使用volatile修饰,保证可见性 private volatile Object result; // 当Promise执行完成时需要通知Listener,此时就使用这个executor private final EventExecutor executor; // 要通知的listener,因为它可能是不同的类型,所以定义为Object类型,使用时再判断类型 private Object listeners; // 要使用wait()/notifyAll()机制,这个变量记录了waiter的数量 private short waiters; // 是否正在通知listener private boolean notifyingListeners;
从属性中可以看出, DefaultPromise 通过 Object 的 wait/notify 机制实现线程间的同步,通过 volatile 属性保证线程间的可见性。
@Override public Promise<V> setSuccess(V result) { // 第一次成功设置返回值后,返回Promise对象 if (setSuccess0(result)) { return this; } // 否则抛出异常 throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(V result) { // result为null,设置为SUCCESS return setValue0(result == null ? SUCCESS : result); } private boolean setValue0(Object objResult) { // CAS设置result值,只有当result为null或者UNCANCELLABLE,才可以执行成功 if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { // 唤醒等待的waiter,同时判断是否存在listener if (checkNotifyWaiters()) { // 通知所有的listener notifyListeners(); } return true; } return false; } private synchronized boolean checkNotifyWaiters() { if (waiters > 0) { // 通知所有waiters notifyAll(); } // 判断是否添加了监听者listeners return listeners != null; } private void notifyListeners() { EventExecutor executor = executor(); // 是否是同一个线程 if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } // 用自己的executor执行 safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); }
DefaultPromise的方法实现逻辑挺简单,不再全部讲解了,只需知道通过 Object 的 wait/notify 机制实现线程间的同步和观察者设计模式进行通知就可以了。
DefaultPromise还有个子类DefaultChannelPromise,这个类在Netty中用的最多的,其内部逻辑调用的都是父类DefaultPromise的方法。DefaultChannelPromise类层次结构图如下:
Netty的Future继承JDK的Future,通过Object的wait/notify机制,实现了线程间的同步,使用观察者设计模式,实现了异步非阻塞回调处理。