一、Netty的异步回调模式
Netty继承和扩展了JDK Future系列异步回调的API,定义了自身的Futrue系列接口和类,实现了异步任务的监控、异步执行结果的获取。总体来说Netty对Java Future异步任务的扩展如下:
- 继承Java的Future接口,得到了一个新的属于Netty自己的Future异步任务接口,该接口对原有接口进行了增强,使得Netty异步任务能够以非阻塞的方式处理回调的结果
- 引入了一个新街口——GenericFutureListener,用于表示异步执行完成的监听器。这个Netty使用了监听器的模式,异步任务的执行完成后的回调逻辑抽象成了Listener监听器接口。可以将Netty的GenericFutureListener监听器接口加入Netty异步任务Future中,实现对异步任务执行状态的事件监听
1、GenericFutureListener接口
public interface GenericFutureListener<F extends Future<?>> extends EventListener { // 监听器的回调方法 void operationComplete(F var1) throws Exception; }
GenericFutureListener拥有一个回调方法operationComplete,表示异步任务操作完成。在Future异步任务执行完成后将回调执行此方法。在大多数情况下,Netty的异步回调代码编写在GenericFutureListener接口的实现类中的operationComplete方法中。
它的父接口EventListener是一个空接口,没有任何的抽象方法,是一个仅仅具有标识作用的接口。
2、Future接口
Netty也定义了自己的Future接口,对JDK原有的Future接口进行了扩展:
public interface Future<V> extends java.util.concurrent.Future<V> { boolean isSuccess(); // 判断异步执行是否成功 boolean isCancellable(); // 判断异步执行是否取消 Throwable cause(); // 获取异步任务异常的原因 // 增加异步任务执行完成与否的监听器 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); // 溢出异步任务执行完成与否的监听器 Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); }
Netty的Future接口一般不会直接使用,而是会使用子接口。Netty有一系列的子接口,代表不同类型的异步任务,如ChannelFuture接口。
3、ChannelFuture的使用
在Netty的网络编程中,网络连接通道的输入和输出处理都是异步进行的,都会返回一个ChannelFuture接口的实例。通过返回的异步任务实例,可以为它增加异步回调的监听器。在异步任务真正完成后,回调才会执行。
// connect是异步的,仅提交异步任务 ChannelFuture future = bootstrap.conncect(new InetSocketAddress("www.wanning.com", 80)); // connect的异步任务真正执行完成后,future回调监听器才会执行 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if(channelFuture.isSuccess()) { ... } } });
GenericFutureListener接口在Netty中是一个基础类型接口。在网络编程的异步回调中,一般使用Netty中提供的某个接口,如ChannelFutureListener接口。
二、Netty中的Reactor反应器模式
1、Channel通道组件
Channel通道主键是Netty中非常重要的主键,反应器模式和通道紧密相关,反应器的查询和分发的IO事件都来自于Channel通道组件。
Netty中不直接使用Java NIO的Channel通道组件,对Channel通道组件进行了自己的封装。在Netty中,有一系列的Channel通道组件,为了支持多种通信协议,或者说对于每一种通信连接协议,Netty都实现了自己的通道。
另外一点就是除了Java的NIO,Netty还能处理Java的面向流的OIO。总的来说Netty中每一种协议的通道,都有NIO和OIO两个版本。
- NioSocketChannel:异步非阻塞TCP Socket传输通道
- NioServerSocketChannel:异步非阻塞TCP Socket服务器端监听通道
- NioDatagramChannel:异步非阻塞的UDP传输通道
- NioSctpChannel:异步非阻塞Sctp传输通道
- NioSctpServerChannel:异步非阻塞Sctp服务器端监听通道
- OioSocketChannel:同步阻塞式TCP Socket传输通道
- OioServerSocketChannel:同步阻塞式TCP Socket服务器端监听通道
- OioDatagramChannel:同步阻塞式UDP传输通道
- OioSctpChannel:同步阻塞式Sctp传输通道
- OioSctpServerChannel:同步阻塞式Sctp服务器端监听通道
一般来说,服务器端编程用到最多的通信协议还是TCP协议,对应的传输通道类型是NioSocketChannel,服务器监听类为NioServerSocketChannel。在主要使用的方法上,其他的通道类型和这个NioSocketChannel类在原理上基本是相通的。在Netty的NioSocketChannel内部封装了一个Java NIO的SelectableChannel成员,通过这个内部的Java NIO通道,Netty的NioSocketChannel通道上的IO操作,最终会落地到Java NIO的SelectableChanne底层通道。
2、Reactor反应器
在反应器模式中,一个反应器会负责一个事件处理线程,不断地轮询,通过Selector选择器不断查询注册过的IO事件(选择键)。如果查询到IO事件,则分发给Handler业务处理器。
Netty中的反应器有多个实现类,与Channel通道类有关系。对应于NioSocketChannel通道,Netty的反应器类为NioEventLoop。NioEventLoop类绑定了两个重要的Java成员属性:一个是Thread线程类的成员,一个是Java NIO选择器的成员属性。
也就是说一个NioEventLoop拥有一个Thread线程,负责一个Java NIO Selector选择器的IO事件轮询。在Netty中,一个EventLoop反应器和Channel通道是一对多的关系,一个反应器可以注册成千上万的通道。
3、Handler处理器
Java NIO可供选择器监控的通道IO事件类型包括以下4种:
- 可读:SelectionKey.OP_READ
- 可写:SelectionKey.OP_WRITE
- 连接:SelectionKey.OP_CONNECT
- 接收:SelectionKey.OP_ACCEPT
在Netty中,EventLoop反应器内部有一个Java NIO选择器成员执行以上事件的查询,然后进行对应的事件分发。事件分发的目标就是Netty自己的Handler处理器。
**Netty的Handler处理器分为两大类,第一类是ChannelInboundHandler通道入站处理器,第二类ChannelOutboundHandler通道出站处理器。二者都继承了ChannelHandler处理器接口。**Netty中的入站处理,不仅仅是OP_READ输入事件的处理,还是从通道底层触发,由Netty通过层层传递,调用ChannelInboundHandler通道入站处理器进行的某个处理。以底层的Java NIO中的OP_READ输入事件为例:在通道中发生了OP_READ事件后会被EventLoop查询到,然后分发给ChannelInboundHandler通道入站处理器,调用它的入站处理的方法read。在ChannelInboundHandler通道入站处理器内部的read方法可以从通道中读取数据。
Netty中的入站处理,触发的方向为:从通道到ChannelInboundHandler通道入站处理器。
Netty中的出站处理,本来就包括Java NIO的OP_WRITE可写事件。不过OP_WRITE可写事件是Java NIO的底层概念,它和Netty的出站处理的概念不是一个维度的,Netty的出站处理是应用层维度的,具体指的是从ChannelOutboundHandler通道出站处理器到通道的某次IO操作。在应用程序完成业务处理后,可以通过ChannelOutboundHandler通道出站处理器将处理的结果写入底层通道,它的最常用的一个方法就是write()方法,把数据写入到通道。
这两个业务处理接口都有各自的默认实现:ChannelInboundHandler的默认实现为ChannelInboundHandlerApater,叫作通道入站处理适配器。ChannelOutboundHandler的默认实现ChannelOutboundHandlerAdapter,叫作通道出站处理适配器。这两个默认的通道处理器适配器,分别实现了入站操作和出站的基本功能。如果要实现自己的业务处理器,不需要从零开始去实现处理器的接口,只需要继承通道处理器适配器即可。
4、Netty的流水线(Pipeline)
首先梳理一下Netty反应器模式中各个组件之间的关系:
- 反应器和通道之间是一对多的关系,一个反应器可以查询很多个通道的IO事件
- 通道和Handler处理器实例之间是多对多的关系,一个通道的IO事件被多个的Handler实例处理器,一个Handler处理器实例也可以绑定到很多的通道,处理多个通道的IO事件。
问题是通道和Handler处理器实例之间的绑定关系,Netty是如何组织的呢?
Netty设计了一个特殊的组件,叫做ChannelPipeline(通道流水线),它像一条管道,将绑定到一个通道的多个Handler处理器实例,串在一起形成一条流水线。ChannelPipeline的默认实现实际上被设计成一个双向链表。所有的Handler处理器实例被包装成了双向链表的节点,被加入到了ChannelPipeline中。也就是说一个Netty通道拥有一条Handler处理器流水线,成员的名称叫作pipeline。
以入站处理为例,每一个来自通道的IO事件,都会进入一次ChannelPipeline通道流水线,在进入第一个Handler处理器后,这个IO事件将按照既定的从前往后次序,在流水线上不断地向后流动,流向一个Handler处理器。在向后流动的过程中,会出现3种情况:
- 如果后面还有其他Handler入站处理器,那么IO事件可以交给下一个Handler处理器向后流动
- 如果后面没有其他的入站处理器,就意味着这个IO事件在此次流水线中的处理结束了
- 如果在流水线中间需要终止流动,可以选择不将IO事件交给下一个Handler处理器,流水线的执行也就被终止了
总之流水线是通道的大管家,为通道管理好了它的一大堆Handler。
三、Bootstrap启动器类
Bootstrap类是Netty提供的一个便利的工厂类,可以通过它来完成Netty的客户端或服务器端的Netty组件的组装以及Netty程序的初始化。当然我们也可以不用这个Bootstrap启动器,但是一点点去手动创建通道、完成各种设置和启动、并且注册到EventLoop,这个过程会非常麻烦。
在Netty中有两个启动器类,分别用在服务器和客户端,这两个启动器仅仅是使用的地方不同,它们大致的配置和使用方法都是相同的。
1、父子通道
在Netty中每一个NioSocketChannel通道所封装的是Java NIO通道,再往下就对应到操作系统底层的socket描述符。理论上来说,操作系统底层的socket描述符分为两类:连接监听类型和传输数据类型。在Netty中,异步非阻塞的服务器端监听通道NioServerSocketChannel,封装在Linux底层的描述符,是连接监听类型的socket描述符。而NioSocketChannel异步非阻塞TCP Socket传输通道,封装在底层Linux的描述符,是数据传输类型的socket描述符。
在Netty中,NioServerSocketChannel负责服务器连接监听和接收,也叫父通道。对于每一个接收到的NioSocketChannel传输类通道,也叫子通道。
2、EventLoopGroup线程组
Netty的Reactor反应器模式是多线程版本的反应器模式,在Netty中,一个EventLoop相当于一个子反应器,一个NioEventLoop子反应器拥有了一个线程,同时拥有一个Java NIO选择器。多个EventLoop线程组成一个EventLoopGroup线程组。
反过来说,Netty的EventLoopGroup线程组就是一个多线程版本的反应器。Netty的程序开发不会直接使用单个EventLoop线程,而是使用EventLoopGroup线程组。EventLoopGroup的构造函数有一个参数,用于指定内部的线程数。在构造器初始化时,会按照传入的线程数量,在内部构造多个Thread线程和多个EventLoop子反应器,进行多线程的IO时间查询和分发。如果使用无参构造函数或传入线程数为0,那么默认EventLoopGroup内部线程数为最大可用CPU处理器数量的两倍。
在服务器端一般有两个独立的反应器,一个负责新连接的监听和接收,一个负责IO事件处理。对应到Netty服务器程序中则是设置两个EventLoopGroup线程组。负责新连接的监听和接受的线程组查询父通道的IO事件,称为Boss线程组。另一个线程组负责查询所有子通道的IO事件,并且执行Handler处理器中的业务处理,例如数据的输入和输出,称为Worker线程组。
3、Bootstrap启动流程
1.创建反应器线程组,并赋值给ServerBootstrap启动器实例
- 通过NioEventLoopGroup创建两个线程组
- 通过bootstrap.group配置线程组,可以只配置一个反应器线程组,这种模式下连接监听IO事件和数据传输IO事件在同一个线程中处理,会导致连接的接受被更加耗时的数据传输或业务处理所阻塞
2.设置通道的IO类型
- Netty不止支持Java NIO,也支持OIO,因此需要进行配置
- 通过bootstrap.channel()方法,传入通道的class类文件,如bootstrap.channel(NioServerSocketChannel.class)
3.设置监听端口:bootstrap.localAddress(new InetSocketAddress(port))
4.设置传输通道的配置选项:bootstrap.option用于给父通道接收连接通道设置一些选项,bootstrap.childOption设置子通道设置一些通道选项
- bootstrap.option(ChannelOption.SO_KEEPALIVE, true):开启心跳机制
- bootstrap.option(ChannelOption.SO_ALLOCATOR, PooledByteBufAllocator.DEFAULT)
5.装配子通道的Pipeline流水线:装配子通道的Handler流水线调用childHandler()方法,传递一个ChannelInitializer通道初始化类的实例。在父通道成功接收一个连接,并创建成功一个子通道后,就会初始化子通道,这里配置的ChannelInitializer实例就会被调用
- 在ChannelInitializer通道初始化类的事例中,有一个initChannel初始化方法,在子通道创建后被执行到,向子通道流水线增加业务处理器
- 也可以调用handler为付通道设置ChannelInitializer初始化器,但是父通道接受新连接后除了初始化子通道一般不需要特别的配置
// 裝配子通道流水綫 bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { // 有连接到达时会创建一个通道的子通道,并初始化 protected void initChannel(SocketChannel ch) throws Exception { // 流水线管理子通道中的Handler业务处理器 // 向子通道流水线添加一个Handler业务处理器 ch.pipeline().addLast(new NettyDiscardHandler()); } });
6.开始绑定服务器连接的监听端口
- 开始绑定端口,通过调用sync同步方法阻塞直到绑定成功:bootstrap.bind().sync()
- bind返回一个端口绑定Netty的异步任务channelFuture,在Netty中所有的IO操作都是异步执行的,可以通过自我阻塞一直到ChannelFuture异步任务执行完成或者为ChannelFuture增加事件监听器的两种方式以获得Netty中的IO操作的真正结果
7.自我阻塞,直到通道关闭
- channelFuture.channel().closeFuture().sync()
- 如果要阻塞当前线程直到通道关闭,可以使用通道的closeFuture方法,以获取通道关闭的异步任务,当通道被关闭时,closeFuture实例的sync方法会返回
8.关闭EventLoopGroup
- loopGroup.shutdownGracefully
- 关闭Reactor反应器线程组,同时会关闭内部的子反应器线程,也会关闭内部的Selector选择器、内部的轮询线程以及负责查询的所有的子通道。在子通道关闭后会释放所有的资源,如TCP Socket文件描述符等
4、ChannelOption通道选项
无论是对于NioServerSocketChannel父通道类型,还是对于NioSocketChannel子通道类型,都可以设置一系列的ChannelOption选项,在ChannelOption类中,定义了一些通道选项:
- SO_RCVBUF,SO_SNDBUF:此为TCP参数,每个TCP套接字在内核中都有一个发送缓冲区和一个接收缓冲区,这两个选项就是用来设置TCP连接的这两个缓冲区大小的
- TCP_NODELAY:此为TCP参数,表示立即发送数据,默认值为true(Netty默认值为true,而操作系统默认值为false),该值用于设置Nagle算法的启用,该算法将小的碎片数据连接成更大的报文,来最小化所发送报文的数量。如果需要发送一些娇小的报文,则需要禁用该算法。
- SO_KEEPALIVE:此为TCP参数,表示底层TCP协议的心跳机制,true为连接保持心跳,默认值为false。启用该功能时TCP会主动探测空闲连接的有效性(默认心跳间隔为7200s)。Netty默认关闭该功能
- 5.SO_REUSEADDR:此为TCP参数,设置为true时表示地址复用,默认值为false。由四种情况需要用到这个参数设置:
- 当有一个由相同本地地址和端口的socket处于TIME_WAIT状态时,而我们希望启动的程序socket2要占用该地址和端口,例如在重启服务且保持先前端口时
- 有多块网卡或用IP Alias技术的机器在同一端口启动多个进程,但每个进程绑定的本地IP地址不能相同
- 单个进程绑定相同的端口到多个socket上,但每个socket绑定的IP地址不同
- 完全相同的地址和端口的重复绑定,但这只用于UDP的多播,不用于TCP
5.SO_LINGER:此为TCP参数,表示关闭socket的延迟时间,默认值为-1,表示禁用该功能。-1表示socke.close方法立即返回,但操作系统底层会将发送缓冲区全部发送到对端。0表示socket.close方法立即放回,操作系统放弃发送缓冲区的数据,直接向对端发送RST包,对端收到复位错误。非0整数值表示调用socket.close方法的线程被阻塞,直到延迟时间到来、发送缓冲区的数据发送完毕,若超时,则对端会收到服务复位错误。
6.SO_BACKLOG:此为TCP参数,表示服务器端接收连接的队列长度,如果队列已满,客户端连接将被拒绝。在Windows中默认值为200,其他操作系统为128.如果连接建立频繁,服务器处理新连接较慢,可以适当调大这个参数
7.SO_BROADCAST:此为TCP参数,表示设置广播模式
四、Channel通道
1、主要成员和方法
在Netty中,通道是其中的核心概念之一,代表着网络连接。它负责同对端进行网路通信,可以写入数据到对端,也可以从对端读取数据。
通道的抽象类AbstractChannel的构造函数如下:
protected abstract Channel(Channel parent) { this.parent = parent; // 父通道 id = newId(); unsafe = newUnsafe() // 底层NIO通道,完成实际的IO操作 pipeline = new ChannelPipeline(); // 一条通道,拥有一条流水线 }
AbstractChannel内部有一个pipeline属性,表示处理器的流水线。Netty在对通道进行初始化的时候,会将pipeline属性初始化为DefaultChannelPipeline的实例。
AbstractChannel内部有一个parent属性,表示通道的父通道。对于连接监听通道来说,其父通道为null,而对于每一个传输通道,其parent属性的值为接收到该连接的服务器连接监听通道。
几乎所有的通道实现类都继承了AbstractChannel抽象类,都拥有上面的parent和pipeline两个属性成员。
再来看一下,在通道接口中所定义的几个重要方法:
- ChannelFuture connect(SocketAddress address):连接远程服务器,方法的参数为远程服务器的地址,调用后会立即返回,返回值为负责连接操作的异步任务ChannelFuture。此方法在客户端的传输通道使用
- ChannelFuture bind(SocketAddress address):绑定监听地址,开始监听新的客户端连接,此方法在服务器的新连接监听和接收通道使用
- ChannelFuture close():关闭通道连接,返回连接关闭的ChannelFuture异步任务。如果需要在连接正式关闭后执行其他操作,则需要为异步任务设置回调方法,或者调用ChannelFuture异步任务的sync()方法来阻塞当前线程,一直等到通道关闭的异步任务执行完毕
- Channel read():读取通道数据,并且启动入站处理。具体来说从内部的Java NIO Channel通道读取数据,然后启动内部的Pipeline流水线,开始数据读取的入站处理,此方法的返回通道自身用于链式调用
- ChannelFuture write(Object obj):启程出站流水处理,把处理后的最终数据写到底层Java NIO通道。此方法的返回值为出站处理的一部处理任务。
- Channel flush():将缓冲区中的数据立即写出到对端。并不是每一次write操作都是将数据直接写出到对端,write操作的作用大部分情况下仅仅是写入到操作系统的缓冲区,操作系统会根据缓冲区的情况决定什么时候把数据写到对端,而执行flush()方法立即将缓冲区的数据写到对端。
2、EmbeddedChannel嵌入式通道
在Netty的实际开发中,通信的基础工作已经由Netty完成了。实际上大量的工作时设计和开发ChannelHandler通道业务处理器,而不是开发Outbound出站处理器,换句话就是开发Inbound入站处理器。开发完成之后需要投入单元测试,将Handler业务处理器加入到通道的Pipeline流水线中,然后启动Netty服务器、客户端程序,相互发送消息,测试业务处理器的效果。如果每开发一个业务处理器,都进行服务器和客户端的重复启动,那么是非常繁琐和浪费时间的。
因此Netty提供了一个专用通道EmbededChannel,它仅仅是模拟入站和出站的操作,底层不进行实际的传输,不需要启动Netty服务器和客户端。除了不进行传输之外,EmbeddedChannel的其他的事件机制和处理流程和真正的传输通道是一模一样的。因此开发人员可以在开发过程中方便快捷地进行ChannelHandler业务处理器的单元测试。具体提供的方法如下:
- writeInbound:向通道写入inbound入站数据,模拟通道收到数据。也就是说这些写入的数据会被流水线上的入站处理器处理
- readInbound:从EmbeddedChannel中读取入站数据,返回经过流水线最后一个入站处理器完成之后的入站数据,如果没有数据则返回null
- writeOutbound:向通道写入outbound出站数据,模拟通道发送数据。也就是说这些写入的数据会被流水线上的出站处理器处理
- readOutbound:从EmbeddedChannel中读取出站数据,返回经过流水线最后一个出站处理器处理之后的出站数据,如果没有数据则返回null
- finish:结束EmbeddedChannel,它会调用通道的close反复噶
最重要的两个方法为writeInbound和readOutbound方法
Netty基础入门和基本使用-2+https://developer.aliyun.com/article/1391143?spm=a2c6h.13148508.setting.21.671d4f0ezNOeJb