一、Netty之深入Hotspot源码与Linux内核理解NIO与Epoll
(一)什么是IO模型
IO模型就是说用什么样的通道进行数据的发送和接收,Java共支持3种网络编程IO模式:BIO,NIO,AIO。
(二)BIO(Blocking IO)
BIO,Blocking IO的简称,同步阻塞模型,一个客户端连接对应一个处理线程,其模型图如下:
每个客户端连接都需要一个处理线程
1.BIO的使用
服务端
创建一个BIO的服务端大致需要如下几步:
1.使用ServerSocket的构造函数创建一个ServerSocket对象
ServerSocket serverSocket = new ServerSocket(9000);
2.执行ServerSocket的accept()方法获取一个Socket对象
Socket clientSocket = serverSocket.accept();
要注意的是,如果没有客户端连接到服务端这个方法会一致阻塞
3.获取Socket中的输入流对象,通过read()方法将客户端发送的信息读取到字节数组中
byte[] bytes = new byte[1024];
int read = clientSocket.getInputStream().read(bytes);
要注意的实是,有客户端连接到服务端,但是客户端没有发送消息到服务端这个方法也会阻塞。
4.获取Socket的输出流对象,通过write方法将将消息发送到客户端
clientSocket.getOutputStream().write("HelloClient".getBytes());
注意:第2、3、4一般会放在一个while循环中去操作,第3、4步一般会放在一个子线程中去操作
服务端代码示例:
public class SocketServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
System.out.println("等待连接。。");
//阻塞方法
Socket clientSocket = serverSocket.accept();
System.out.println("有客户端连接了。。");
handler(clientSocket);
/*new Thread(new Runnable() {
@Override
public void run() {
try {
handler(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();*/
}
}
private static void handler(Socket clientSocket) throws IOException {
byte[] bytes = new byte[1024];
System.out.println("准备read。。");
//接收客户端的数据,阻塞方法,没有数据可读时就阻塞
int read = clientSocket.getInputStream().read(bytes);
System.out.println("read完毕。。");
if (read != -1) {
System.out.println("接收到客户端的数据:" + new String(bytes, 0, read));
}
clientSocket.getOutputStream().write("HelloClient".getBytes());
clientSocket.getOutputStream().flush();
}
}
客户端
创建一个客户端大致需要如下几步:
1.根据据IP和端口创建
Socket socket = new Socket("localhost", 9000);
2.获取Socket中的输出流,通过其write()方法向服务端发送数据
socket.getOutputStream().write("HelloServer".getBytes());
3.获取Socket中的输入流,通过其read()方法将服务端发送过来的数据读取到字节数组中
byte[] bytes = new byte[1024];
socket.getInputStream().read(bytes);
客户端代码示例
public class SocketClient {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("localhost", 9000);
//向服务端发送数据
socket.getOutputStream().write("HelloServer".getBytes());
socket.getOutputStream().flush();
System.out.println("向服务端发送数据结束");
byte[] bytes = new byte[1024];
//接收服务端回传的数据
socket.getInputStream().read(bytes);
System.out.println("接收到服务端的数据:" + new String(bytes));
socket.close();
}
}
2.BIO缺点与适应场景
BIO缺点:
1.C10k问题:BIO是个阻塞IO,serverSocket.accept()
用于接收客户端连接,若将这个操作同后续的读取客户消息及往客户端发送消息放在同一线程下,在所有操作之前执行结束之前,其他所以的客户请求都会阻塞等待,所以后续的服务端的读取数据及往客户端写消息会放在子线程中去处理,如果客户端连接太多,处理客户端线程很多,会导致服务器线程太多,压力太大,比如C10K问题。
2.浪费资源:IO代码里read操作是阻塞操作,如果连接不做数据读写操作会导致线程阻塞,浪费资源
适用场景:
BIO 方式适用于连接数目比较小且固定的架构, 这种方式对服务器资源要求比较高, 但程序简单易理解。
(三)NIO(Non Blocking IO)
NIO(Non Blocking IO),JDK1.4引入的同步非阻塞模式,服务器实现模式为一个线程可以处理多个请求(连接)。客户端发送的连接请求都会注册到多路复用(一个线程搞定所有通路)器selector上,多路复用器轮询到的连接有IO请求,就进行处理。
NIO能够极致的压榨线程,一个线程将所有的事情都做了
1.BIO的使用
1.1无多路复用器使用
1)打开一个ServerSocketChannel
ServerSocketChannel serverSocket = ServerSocketChannel.open();
2)绑定服务地址
serverSocket.socket().bind(new InetSocketAddress(9000));
3)设置ServerSocketChannel是否阻塞,即设置执行accept()获取SocketChannel时是否阻塞
serverSocket.configureBlocking(false);
将ServerSocketChannel设置为非阻塞
4)执行ServerSocketChannel的accept()方法 ,获取SocketChannel对象
SocketChannel socketChannel = serverSocket.accept();
若第三步设置为true,执行这一步时,若没有客户端连接则会阻塞
5)设置SocketChannel是否阻塞, 即设置读取客户发送的消息时是否阻塞
socketChannel.configureBlocking(false);
6)通过SocketChannel的read方法,将客户端发送的消息读取到ByteBuffer中
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
int len = sc.read(byteBuffer);
非阻塞模式read方法不会阻塞,否则会阻塞
代码实例如下
public class NioServer {
// 保存客户端连接
static List<SocketChannel> channelList = new ArrayList<>();
public static void main(String[] args) throws IOException {
// 创建NIO ServerSocketChannel,与BIO的serverSocket类似
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
// 设置ServerSocketChannel为非阻塞
serverSocket.configureBlocking(false);
System.out.println("服务启动成功");
while (true) {
// 非阻塞模式accept方法不会阻塞,否则会阻塞
// NIO的非阻塞是由操作系统内部实现的,底层调用了linux内核的accept函数
SocketChannel socketChannel = serverSocket.accept();
if (socketChannel != null) { // 如果有客户端进行连接
System.out.println("连接成功");
// 设置SocketChannel为非阻塞
socketChannel.configureBlocking(false);
// 保存客户端连接在List中
channelList.add(socketChannel);
}
// 遍历连接进行数据读取
Iterator<SocketChannel> iterator = channelList.iterator();
while (iterator.hasNext()) {
SocketChannel sc = iterator.next();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
// 非阻塞模式read方法不会阻塞,否则会阻塞
int len = sc.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
} else if (len == -1) { // 如果客户端断开,把socket从集合中去掉
iterator.remove();
System.out.println("客户端断开连接");
}
}
}
}
}
存在的问题:如果连接数太多的话,会有大量的无效遍历,假如有10000个连接,其中只有1000 个连接有写数据,但是由于其他9000个连接并没有断开,我们还是要每次轮询遍历一万 次,其中有十分之九的遍历都是无效的,这显然不是一个让人很满意的状态。
1.2引入多路复用器
引入多路复用器Selector后, 每次只处理有事件响应的客户端
1)打开一个ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
2)绑定服务地址
serverSocketChannel.socket().bind(new InetSocketAddress(9000));
3)设置ServerSocketChannel是否阻塞,即设置执行accept()获取SocketChannel时是否阻塞
serverSocketChannel.configureBlocking(false);
将ServerSocketChannel设置为非阻塞
4)打开Selector处理Channel,也就是创建epoll, 然后把ServerSocketChannel注册到selector上,让selector对客户端accept连接操作感兴趣
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
5)阻塞等待需要处理的事件发生, 这里的事件即可以是客户连接事件, 也可以是客户端发送消息事件, 若没有事件发生则一直阻塞
selector.select();
6)有事件发生时, 执行Selector的selectedKeys()获取selector中产生的全部事件的 SelectionKey 实例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
7)遍历SelectionKey集合, 处理相应的事件
(1)事件为OP_ACCEPT事件, 也就是客户端连接事件时,主要进行的操作为从SelectionKey中获取到ServerSocketChannel, 然后从ServerSocketChannel中获取SocketChannel, 然后把SocketChannel注册到selector上, 让selector对客户端的发送消息感兴趣.
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
// 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件
SelectionKey selKey = socketChannel.register(selector, SelectionKey.OP_READ);
(2)事件为OP_READ事件, 也就是客户端往服务端发送数据时, SelectionKey获取到的channel为SocketChannel, 这个时候会进行将客户端发送的消息读取到ByteBuffer中的操作.
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(128);
int len = socketChannel.read(byteBuffer);
代码实例如下
public class NioSelectorServer {
public static void main(String[] args) throws IOException {
// 创建NIO ServerSocketChannel
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
// 设置ServerSocketChannel为非阻塞
serverSocket.configureBlocking(false);
// 打开Selector处理Channel,即创建epoll
Selector selector = Selector.open();
// 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务启动成功");
while (true) {
// 阻塞等待需要处理的事件发生
selector.select();
// 获取selector中注册的全部事件的 SelectionKey 实例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍历SelectionKey对事件进行处理
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 如果是OP_ACCEPT事件,则进行连接获取和事件注册
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
// 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件
SelectionKey selKey = socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端连接成功");
} else if (key.isReadable()) { // 如果是OP_READ事件,则进行读取和打印
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(128);
int len = socketChannel.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
} else if (len == -1) { // 如果客户端断开连接,关闭Socket
System.out.println("客户端断开连接");
socketChannel.close();
}
}
//从事件集合里删除本次处理的key,防止下次select重复处理
iterator.remove();
}
}
}
}
2.多路复用器感知事件原理
2.1NIO 有三大核心组件及感知原理
NIO 有三大核心组件: Channel(通道), Buffer(缓冲区),Selector(多路复用器)
1、channel 类似于流,每个 channel 对应一个 buffer缓冲区,buffer 底层就是个数组
2、channel 会注册到 selector 上,由 selector 根据 channel 读写事件的发生将其交由某个空闲的线程处理
3、NIO 的 Buffer 和 channel 都是既可以读也可以写
感知事件原理:
JDK1.4版本是用linux的内核函数select()或poll()来实现,跟上面的不引入多路复用器Selector的NioServer代码类似,selector每次都会轮询所有的sockchannel, 看下哪个channel有读写事件,有的话就处理,没有就继续遍历, 所以1.4版本有问题
JDK1.5开始引入了epoll基于事件响应机制来优化NIO。epol内部l有一个就绪事件列表rdlist,用于存放要处理的事件。epoll借助操作系统的中断程序,调用内部内核的回调函数,把有事件的channel放入到rdilst。当有事件时,就会产生一个系统中断(操作系统做的,由操作系统硬中断的程序做的),将事件放入到epoll的就绪事件列表中.
select |
poll |
epoll(jdk 1.5及以上) |
|
操作方式 |
遍历(轮训每个 sockChannel) |
遍历(轮训每个 sockChanne) |
回调(借助操作系统的中断程 序,调用内部内核的回调函数,把有事件的channel放入 到epoll内部的rdilst) |
底层实现 |
数组 |
链表 |
哈希表 |
IO效率 |
每次调用都进行线 性遍历,时间复杂 度为O(n) |
每次调用都进行 线性遍历,时间 复杂度为O(n) |
事件通知方式,每当有IO事件 就绪,系统注册的回调函数就 会被调用,时间复杂度O(1) |
最大连接 |
有上限(限制是 1025) |
无上限 |
无上限 |
2.2 基于Linux的Hotspot源码的感知事件原理
1.创建ServerSocketChannel对象
执行ServerSocketChannel serverSocket = ServerSocketChannel.open()
会执行SelectorProvider的抽象SelectorProviderImpl#openServerSocketChannel()
创建一个SocketChannelImpl
对象。SocketChannelImpl中会报存Sokect的文件描述符对象及其文件描述符值(Sokect文件描述符通道调用本地方法socket()调用linux内核函数socket0创建)
2.创建多路复用Selector
执行Selector selector = Selector.open()
时,会执行SelectorProvider在Linux中的实现EPollSelectorProvider#openSelector
实例化一个EPollSelectorImpl
。EPollSelectorImpl中会保存一个epoll的包装对象EPollArrayWrapper
对象,在EPollArrayWrapper实例化过程中会执行本地方法epfd = epollCreate();
,从而执行linux内核函数int epfd = epoll_create(256);
创建一个创建epoll实例并返回其文件描述符,文件描述符就保存在EPoll包装对象中。
文件描述(fd,即file descriptor)是Linux内核为了高效的管理已打开的“文件”而创建的索引,用索引可以找到文件。linux系统一切皆文件,创建的所有东西,最终都是以文件的形式描述的。(相当于对象在操作系统创建的索引,根据文件描述符可以找到在操作系统对应的epoll对象)。
3.注册Channel到Selector上并设置其感兴趣的操作
这一步主要是保存channel中监听对象soket的文件描述符到EPollArrayWrapper的数组updateDescriptors
中,同时将监听对象fd及其对应的事件存放到map集合eventsHigh中,过程大致如下
1.执行SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_CONNECT);
完成Channel的注册及感兴趣的key的设置。这个过程会构建一个保存了channel和selector的SelectionKey的实现SelectionKeyImpl
,然后通过执行它的interestOps(var2)
设置感兴趣的操作。
2.interestOps其底层执行的是channel的方法this.channel.translateAndSetInterestOps(var1, this)
,在linux的实现中,会先判断SelectionKey是否为SelectionKey.OP_ACCEPT,若是会执行执行newOps |= PollArrayWrapper.POLLIN;
计算出fd对应的事件。
3.然后进一步执行selector的方法sk.selector.putEventOps(sk, newOps);
,再进一步的执行poll包装对象的方法pollWrapper.setInterest(ch.getFDVal(), ops);
。
4.setInterest会将ServerSocketChannel中需要监听的文件描述符(也就是socket对应的文件描述符)fd存放在EPollArrayWrapper的数组updateDescriptors
中,同时更新map集合eventsHigh
中fd对应的事件值。
4.阻塞等待事件响应
执行selector.select();
阻塞等待事件响应,底层会执行EPollArrayWrapper#poll
完成相应的操作,主要有两步:
1.执行updateRegistrations();
方法,将fd(socket)及其监听的事件(event)注册到epoll,完成epoll与channel的关联。这个方法中会变量fd的数组updateDescriptors
,然后根据fd找到其对应事件,然后再执行本地方法 epollCtl(epfd, opcode, fd, events);
,底层调用linux内核方法epoll_ctl(epfd, (int)opcode, (int)fd, &event)
,将fd(socket)及其监听的事件(event)注册到epoll,或更新epoll中fd对应的事件。
2.执行本地方法调用本地方法epollWait等待有事件发生。执行updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
底层会调用Linux的内核函数epoll_wait(epfd, events, numfds, -1)
等待文件描述符epfd上的事件,若没事件就阻塞,有事件就跳出阻塞。
在epoll中有一个就绪事件列表rdlist,存放的是要处理的事件。epoll借助操作系统的中断程序,当有事件时,就会由操作系统硬中断的程序产生一个系统中断,调用内部内核的回调函数,把有事件的channel放入到epoll内部的rdilst中。
当socket收到数据后,中断程序调用回调函数,会给epoll实例的事件就绪列表rdlist里添加该socket弓I用 ,当程序执行到epoll_wait时,如果rdlist为空阻塞进程,如果rdlist已经有socket引用了,那么epoll_wait跳出阻塞直接返回。
二、Netty之基础使用与线程模型
(一)Netty初探
Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,是一个高性能事件驱动的异步的非堵塞的IO(NIO)框架,用于建立TCP等底层的连接,基于Netty可以建立高性能的Http服务器。支持HTTP、 WebSocket 、Protobuf、 Binary TCP 和UDP 协议。
1为什么要使用Netty
JDK自带的NIO存在的问题
NIO 的类库和 API 繁杂, 使用麻烦: 需要熟练掌握Selector、 ServerSocketChannel、 SocketChannel、 ByteBuffer等。
开发工作量和难度都非常大: 例如客户端面临断线重连、 网络闪断、心跳处理、半包读 写、 网络拥塞和异常流的处理等等。
Netty的优势
Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了上述问题。且Netty拥有高性能、 吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。
tips:Netty 现在都在用的是4.x,5.x版本已经废弃,Netty 4.x 需要JDK 6以上版本支持
2.Netty的使用场景
1)互联网行业
在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,而Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用,典型的应用有:
阿里分布式服务框架Dubbo的RPC框架有个Dubbo协议用于节点间通信,而Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。
Rocketmq底层也是用的Netty作为基础通信组件。
2)游戏行业
无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
3)大数据领域
经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
netty相关开源项目:https://netty.io/wiki/related-projects.html
netty用的很多,这些都对它做了封装,没有感知到。一般不做游戏行业的开发,用到netty的机会不是很多
(二)Netty通讯示例
1.netty依赖的引入
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.35.Final</version>
</dependency>
2.服务端
Server
// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
// 创建服务器端的引导类对象--服务端代表
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程来配置参数,配置整 Netty程序,串联各个组件
//设置两个线程组
bootstrap.group(bossGroup, workerGroup)
// 使用NioServerSocketChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
//设置相关属性
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
//设置ServerBootstrap的childHandler,用于为Channel的请求提供服
.childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对workerGroup的SocketChannel设置处理器
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start。。");
// 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
// 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture cf = bootstrap.bind(9000).sync();
// 给cf注册监听器,监听我们关心的事件
/*cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});*/
// 等待服务端监听端口关闭,closeFuture是异步操作
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
ServerHandler
/**
* 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("客户端连接通道建立完成");
}
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//Channel channel = ctx.channel();
//ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
//将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到客户端的消息:" + buf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕处理方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 处理异常, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
3.客户端
Client
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是ServerBootstrap而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("netty client start。。");
//启动客户端去连接服务器端
ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
//对通道关闭进行监听
cf.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
ClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
//当通道有读取事件时会触发,即服务端发送数据给客户端
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Netty框架的目标就是让你的业务逻辑从网络基础应用编码中(如数据交互,网络处理,异常处理)分离出来,让你可以专注业务的开发(拿到msg进行处理),而不需写一大堆类似NIO的网络处理操作。
(三)Reactor响应式编程及Netty线程模型
1.《Scalable IO in Java》IO处理模式
在了解Netty的线程模型之前可以先理解下由李二狗编写的《Scalable IO in Java》这篇文章里说的一些IO处理模式。下面只是对文章中集中IO处理模式做简单的介绍,更详细的可以参考其原文http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
1.1经典的服务设计模型(Classic Service Designs)
Classis Service Designs ,经典的服务设计模型,当然你也可以把它翻译为传统的服务设计。在一般的网络服务当中都会为每一个连接的处理开启一个新的线程,我们可以看下大致的示意图:
而BIO模型就是一个经典的服务设计模型
1.2基于事件驱动的Events in AWT
AWT是Java提供的用来建立和设置Java的图形用户界面的基本工具,JAVA早期的时候用的还比较多,目前来讲已经很少使用了。这个设计模型是给按钮添加事件,点击按钮后就会触发后端监听按钮的程序。这个模式下通常不需要为每一个客户端建立一个线程,线程开销、上下文切换、锁互斥较少,但任务的调度可能会慢一些,而且通常实现的复杂度也会增加,相关功能必须分解成简单的非阻塞操作。
1.3Reactor 基于事件响应的编程模型
Reactor设计模式是一种事件处理模式,用于处理由一个或多个输入并发交付的服务请求。服务处理程序将传入的请求多路复用,并将它们同步地分派给相关的请求处理程序。其特点为:通过分配适当的handler(处理程序)来响应IO事件,每个handler执行非阻塞的操作,通过将handler绑定到事件进行管理。
1.3.1Basic Reactor Pattern
基础的Reactor模型,也就是就是单线程下基本的Reactor设计模式,一个线程处理所有的请求。NIO底层的原理就是单线程的Reactor,可以把Reactor理解为selector。如下图
1.3.2Worker Thread Pool
Reactor模型多线程模型NIO多线程版本,为了更快的提升它的性能,可以提供一个线程池来处理读写事件。
NIO中连接事件处理很快,可以使用一个线程取处理,但是读写事件比较耗时,当读写事件非常多的时候,再使用一个线程去处理就很慢了,所以可以使用一个线程池去处理读写事件。
1.3.2Multiple Reactors
连接事件很多的时候,读写要等一段时间时可以采用多个Reactors的处理模式,类似NIO中的多个selector。这种Reactor模式为主从从Reactor模式,可以时一主一从,一组多从模式。其中mainReacor只处理连接事件,subReactor处理读写事件。
2.Netty线程模型
Netty的线程模型为Multiple Reactors的一主多从,多主多从模式,线程模型如下图所示:
模型解释:
1) Netty 抽象出两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写
2) BossGroup和WorkerGroup类型都是NioEventLoopGroup
3) NioEventLoopGroup 相当于一个事件循环线程组, 这个组中含有多个事件循环线程 , 每一个事件循环线程是NioEventLoop
4) 每个NioEventLoop都有一个selector , 用于监听注册在其上的socketChannel的网络通讯
5) 每个Boss NioEventLoop线程内部循环执行的步骤有 3 步
处理accept事件 , 与client 建立连接 , 生成 NioSocketChannel
将NioSocketChannel注册到某个worker NIOEventLoop上的selector 处理任务队列的任务 , 即runAllTasks
6) 每个worker NIOEventLoop线程循环执行的步骤
轮询注册到自己selector上的所有NioSocketChannel 的read, write事件
处理 I/O 事件, 即read , write 事件, 在对应NioSocketChannel 处理业务
runAllTasks处理任务队列TaskQueue的任务 ,一些耗时的业务处理一般可以放入TaskQueue中慢慢处理,这样不影响数据在 pipeline 中的流动处理
7) 每个worker NIOEventLoop处理NioSocketChannel业务时,会使用 pipeline (管道),管道中维护了很多 handler 处理器用来处理 channel 中的数据
(四)Netty模块组件
1.【Bootstrap与ServerBootstrap】
Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。
2.【Future、ChannelFuture】
在 Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理,但是可以过一会等它执行完成或者直接注册一个监听。注册监听的具体的实现就是通过 Future 和 ChannelFutures注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。如下:
等待执行完成
可以加上sync()
方法等待执行完成
ChannelFuture cf = bootstrap.bind(9000);
cf.sync();
注册监听
ChannelFuture cf = bootstrap.bind(9000);
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});
3.【Channel】
Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:
1)当前网络连接的通道的状态(例如是否打开?是否已连接?)
2)网络连接的配置参数 (例如接收缓冲区大小)
3)提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。
4)调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以I/O 操作成功、失败或 取消时回调通知调用方。
5)支持关联 I/O 操作与对应的处理程序。
不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。下面是一些常用的 Channel 类型:
NioSocketChannel,异步的客户端 TCP Socket 连接。
NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
NioDatagramChannel,异步的 UDP 连接。
NioSctpChannel,异步的客户端 Sctp 连接。
NioSctpServerChannel,异步的 Sctp 服务器端连接。
这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。
4. 【Selector】
Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的Channel 事件。
当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。
5.【NioEventLoop】
NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:
I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由processSelectedKeys 方法触发。
非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks方法触发。
6.【NioEventLoopGroup】
NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个Channel 只对应于一个线程。
7.【ChannelHandler】
ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其ChannelPipeline(业务处理链)中的下一个处理程序。
ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:
1 ChannelInboundHandler 用于处理入站 I/O 事件。
2 ChannelOutboundHandler 用于处理出站 I/O 操作。
或者使用以下适配器类:
1 ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。
2 ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。
8.【ChannelHandlerContext】
保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。
9.【ChannelPipline】
保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作。ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。
在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:
一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。
read事件(入站事件)和write事件(出站事件)在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。
(五)ByteBuf详解
从结构上来说,ByteBuf 由一串字节数组构成,数组中每个字节用来存放信息。ByteBuf 提供了两个索引,一个用于读取数据,一个用于写入数据。这两个索引通过在字节数组中移动,来定位需要读或者写信息的位置。 当从 ByteBuf 读取时,它的 readerIndex(读索引)将会根据读取的字节数递增。 同样,当写 ByteBuf 时,它的 writerIndex 也会根据写入的字节数进行递增。
需要注意的是极限的情况是 readerIndex 刚好读到了 writerIndex 写入的地方。如果 readerIndex 超过了 writerIndex 的时候,Netty 会抛出 IndexOutOf- BoundsException 异常。
ByteBuf的简单使用实例
public class NettyByteBuf {
public static void main(String[] args) {
// 创建byteBuf对象,该对象内部包含一个字节数组byte[10]
// 通过readerindex和writerIndex和capacity,将buffer分成三个区域
// 已经读取的区域:[0,readerindex)
// 可读取的区域:[readerindex,writerIndex)
// 可写的区域: [writerIndex,capacity)
ByteBuf byteBuf = Unpooled.buffer(1);
System.out.println("byteBuf=" + byteBuf);
for (int i = 0; i < 8; i++) {
byteBuf.writeByte(i);
}
System.out.println("byteBuf=" + byteBuf);
for (int i = 0; i < 5; i++) {
System.out.println(byteBuf.getByte(i));
}
System.out.println("byteBuf=" + byteBuf);
for (int i = 0; i < 5; i++) {
System.out.println(byteBuf.readByte());
}
System.out.println("byteBuf=" + byteBuf);
//用Unpooled工具类创建ByteBuf
ByteBuf byteBuf2 = Unpooled.copiedBuffer("hello,zhuge!", CharsetUtil.UTF_8);
//使用相关的方法
if (byteBuf2.hasArray()) {
byte[] content = byteBuf2.array();
//将 content 转成字符串
System.out.println(new String(content, CharsetUtil.UTF_8));
System.out.println("byteBuf2=" + byteBuf2);
System.out.println(byteBuf2.getByte(0)); // 获取数组0这个位置的字符h的ascii码,h=104
int len = byteBuf2.readableBytes(); //可读的字节数 12
System.out.println("len=" + len);
//使用for取出各个字节
for (int i = 0; i < len; i++) {
System.out.println((char) byteBuf2.getByte(i));
}
//范围读取
System.out.println(byteBuf2.getCharSequence(0, 6, CharsetUtil.UTF_8));
System.out.println(byteBuf2.getCharSequence(6, 6, CharsetUtil.UTF_8));
}
}
}
三、Netty核心功能
(一)Netty编解码
1. 编解码相关组件
Netty中,发出的数据要是字节数组形式,需要将数据编码成字节数组的形式,收到的数据也是字节数组,需要将字节数组节解码成我们希望的数据。
Netty涉及到编解码的组件有Channel、ChannelHandler、ChannelPipe等,先大概了解下这几个组件的作用:
ChannelHandler
ChannelHandler充当了处理入站和出站数据的应用程序逻辑容器。例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据随后会被你的应用程序的业务逻辑处理,而你的业务逻辑通常写在一个或者多个ChannelInboundHandler中。当你要给连接的客户端发送响应时,也可以从ChannelInboundHandler冲刷数据(channel.writeAndFlush(buf))。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的。
ChannelPipeline
ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler(ChannelOutboundHandler调用是从tail到head方向逐个调用每个handler的逻辑),并被这些Handler处理,反之则称为入站的,入站只调用pipeline里的ChannelInboundHandler逻辑(ChannelInboundHandler调用是从head到tail方向逐个调用每个handler的逻辑)。如下图
2. 编码解码器
当你通过Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站(服务端到客户端)消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站(服务端到客户端)消息,它会被编码成字节。
2.1Netty提供的编解码器
Netty提供了一系列实用的编码解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由已知解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。比如编解码字符串的StringEncoder和StringDecoder,编解码对象的ObjectEncoder(基于JDK的序列化,效率不高)和ObjectDecoder(基于JDK的反序列化,效率不高)等。添加方式
客户端或服务端添加String编解码器
//向pipeline加入解码器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入编码器
pipeline.addLast("encoder", new StringEncoder());
2.2高效的编解码protobuf
如果要实现高效的编解码可以用protobuf,但是protobuf需要维护大量的proto文件比较麻烦,现在一般可以使用protostuff。protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用我们写.proto文件来实现序列化。使用它也非常简单,代码如下:
引入依赖
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-api</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.0.10</version>
</dependency>
创建编解码工具类
package com.tuling.netty.codec;
import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* protostuff 序列化工具类,基于protobuf封装
*/
public class ProtostuffUtil {
private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
private static <T> Schema<T> getSchema(Class<T> clazz) {
@SuppressWarnings("unchecked")
Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
if (schema == null) {
schema = RuntimeSchema.getSchema(clazz);
if (schema != null) {
cachedSchema.put(clazz, schema);
}
}
return schema;
}
/**
* 序列化
*
* @param obj
* @return
*/
public static <T> byte[] serializer(T obj) {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(clazz);
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
/**
* 反序列化
*
* @param data
* @param clazz
* @return
*/
public static <T> T deserializer(byte[] data, Class<T> clazz) {
try {
T obj = clazz.newInstance();
Schema<T> schema = getSchema(clazz);
ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
public static void main(String[] args) {
byte[] userBytes = ProtostuffUtil.serializer(new User(1, "zhuge"));
User user = ProtostuffUtil.deserializer(userBytes, User.class);
System.out.println(user);
}
}
定义客户端Handler
继承ChannelInboundHandlerAdapter,重新channelActive方法,往客户端发送消息时使用编码(序列化)方法
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到服务器消息:" + msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler发送数据");
//ctx.writeAndFlush("测试String编解码");
//测试对象编解码
//ctx.writeAndFlush(new User(1,"zhuge"));
//测试用protostuff对对象编解码
ByteBuf buf = Unpooled.copiedBuffer(ProtostuffUtil.serializer(new User(1, "zhuge")));
ctx.writeAndFlush(buf);
}
}
定义服务端handler
继承ChannelInboundHandlerAdapter,重新channelRead方法,读取客户端发送的消息时使用解码(反序列化)方法
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//System.out.println("从客户端读取到String:" + msg.toString());
//System.out.println("从客户端读取到Object:" + ((User)msg).toString());
//测试用protostuff对对象编解码
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
System.out.println("从客户端读取到Object:" + ProtostuffUtil.deserializer(bytes, User.class));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
TIPS:
无论是Netty提供的编解码器,还是第三方工具提供的编解码,发出的是二进制数组,收到的也是二进制数组,基于此我们可以实现自己的编解码器。
(二)Netty粘包拆包
TCP是一个流协议,就是没有界限的一长串二进制数据,面向流的通信是无消息保护边界的。客户端发送数据时不是直接发送到服务端,而是先发送到缓存中,TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
如下图所示,client发了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。
解决方案
1)消息定长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格
2)在数据包尾部添加特殊分隔符,比如下划线,中划线等,这种方法简单易行,但选择分隔符的时候一定要注意每条数据的内部一定不能出现分隔符。
3)发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束。
Netty提供了多个解码器,可以进行分包的操作,如下:
- LineBasedFrameDecoder (回车换行分包)
- DelimiterBasedFrameDecoder(特殊分隔符分包)
- FixedLengthFrameDecoder(固定长度报文来分包)
- 继承MessageToByteEncoder自定义编解码器(根据自定义消息协议,将消息的长度,内容写出去)
样例如下:
自定义协议包
public class MyMessageProtocol {
//定义一次发送包体长度
private int len;
//一次发送包体内容
private byte[] content;
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
客户端
1.写数据包装
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i = 0; i< 2; i++) {
String msg = "你好,我是张三!";
//创建协议包对象
MyMessageProtocol messageProtocol = new MyMessageProtocol();
messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);
messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(messageProtocol);
}
}
2.编码
public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MyMessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder encode 方法被调用");
out.writeInt(msg.getLen());
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
服务端
解码
public class MyMessageDecoder extends ByteToMessageDecoder {
int length = 0;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println();
System.out.println("MyMessageDecoder decode 被调用");
//需要将得到二进制字节码-> MyMessageProtocol 数据包(对象)
System.out.println(in);
if(in.readableBytes() >= 4) {
if (length == 0){
length = in.readInt();
}
if (in.readableBytes() < length) {
System.out.println("当前可读数据不够,继续等待。。");
return;
}
byte[] content = new byte[length];
if (in.readableBytes() >= length){
in.readBytes(content);
//封装成MyMessageProtocol对象,传递到下一个handler业务处理
MyMessageProtocol messageProtocol = new MyMessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
out.add(messageProtocol);
}
length = 0;
}
}
}
读数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
System.out.println("====服务端接收到消息如下====");
System.out.println("长度=" + msg.getLen());
System.out.println("内容=" + new String(msg.getContent(), CharsetUtil.UTF_8));
System.out.println("服务端接收到消息包数量=" + (++this.count));
}
(三)Netty服务端心跳检测机制
所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.
在Netty中, 实现心跳机制的关键是IdleStateHandler, 看下它的构造器:
public IdleStateHandler(
int readerIdleTimeSeconds,int writerIdleTimeSeconds,int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
}
这里解释下三个参数的含义:
readerIdleTimeSeconds: 读超时. 即当在指定的时间间隔内没有从Channel读取到数据时, 会触发一个READER_IDLE的 IdleStateEvent 事件.
writerIdleTimeSeconds: 写超时. 即当在指定的时间间隔内没有数据写入到Channel时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
allIdleTimeSeconds: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个ALL_IDLE 的 IdleStateEvent 事件
注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
}
this.observeOutput = observeOutput;
if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
}
if (writerIdleTime <= 0) {
writerIdleTimeNanos = 0;
} else {
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
}
if (allIdleTime <= 0) {
allIdleTimeNanos = 0;
} else {
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
}
1.IdleStateHandler解析
要实现Netty服务端心跳检测机制需要在服务器端的SocketChannel的ChannelPipeline中加入心态机制相关管的Handler对象--IdleStateHandler
实例,需要在服务端的ChannelInitializer中加上如下代码:
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
1.1 channelRead方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
reading = true;
firstReaderIdleEvent = firstAllIdleEvent = true;
}
ctx.fireChannelRead(msg);
}
上述代码中主要执行了ctx.fireChannelRead(msg)
,表示该方法只是进行了透传,不做任何业务逻辑处理,让channelPipe中的下一个handler处理channelRead方法 。
1.2channelActive方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// This method will be invoked only if this handler was added
// before channelActive() event is fired. If a user adds this handler
// after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx);
super.channelActive(ctx);
}
可以看到核心为:initialize(ctx)
initialize
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
可以看出,会触发三个Task:ReaderIdleTimeoutTask、WriterIdleTimeoutTask、AllIdleTimeoutTask,我们看一下ReaderIdleTimeoutTask的run方法
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
nextDelay -= ticksInNanos() - lastReadTime;
中ticksInNanos() - lastReadTime;
是用当前时间减去最后一次channelRead方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead已经是6s之前的事情了,你设置的超时事件是5s,那么nextDelay最后结果则为-1,说明读超时了,那么就会创建一个事件,然后执行channelIdle(ctx, event)
触发下一个handler的 userEventTriggered方法(心跳超时处理):
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
WriterIdleTimeoutTask与AllIdleTimeoutTask的业务逻辑和ReaderIdleTimeoutTask差不多,这边就不再赘述了。
2.Netty心跳检测示例
实现Netty服务端心跳检测机制需要在服务器端的SocketChannel的ChannelPipeline中加入IdleStateHandler外还必须在其后添加一个自定义Handeler,如下:
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new HeartBeatServerHandler());
在解析IdleStateHandler中我们知道了,IdleStateHandler的channelRead方法或触发其后面的Handler的channelRead方法,有超时发生时会触发IdleStateHandler后面Handler的userEventTriggered方法,因此重写的Handler需要直接或间接的重新channelRead和userEventTriggered方法,上面的HeartBeatHandler继承了SimpleChannelInboundHandler,重写了channelRead0和userEventTriggered方法,如下:
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
int readIdleTimes = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(" ====== > [server] message received : " + s);
if ("Heartbeat Packet".equals(s)) {
ctx.channel().writeAndFlush("ok");
} else {
System.out.println(" 其他信息处理 ... ");
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
readIdleTimes++; // 读空闲的计数加1
break;
case WRITER_IDLE:
eventType = "写空闲";
// 不处理
break;
case ALL_IDLE:
eventType = "读写空闲";
// 不处理
break;
}
System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
if (readIdleTimes > 3) {
System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
ctx.channel().writeAndFlush("idle close");
ctx.channel().close();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
}
}
(四)Netty断线自动重连实现
1.客户端连接服务端失败重连
客户端连接服务端失败重连。客户端启动连接服务端时,如果网络或服务端有问题,客户端连接失败,可以重连,重连的逻辑加在客户端。
客户端通过bootstrap.connect(host, port)
进行连接,这个动作是个异步操作,返回的是一个ChannelFuture
,可以后面加上sync()
等待连接完成,或者是加上一个监听,在监听的回调方法中查看连接的结果。
客户端重连可以是连接时添加一个监听,在回调方法中判断是否连接成功,若未成功则通过延时任务,加上递归调用再次执行连接动作,实现重连,具体操作如下面 connect()
中代码:
public void connect() throws Exception {
System.out.println("netty client start。。");
//启动客户端去连接服务器端
ChannelFuture cf = bootstrap.connect(host, port);
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
//重连交给后端线程执行
future.channel().eventLoop().schedule(() -> {
System.err.println("重连服务端...");
try {
connect();
} catch (Exception e) {
e.printStackTrace();
}
}, 3000, TimeUnit.MILLISECONDS);
} else {
System.out.println("服务端连接成功...");
}
}
});
//对通道关闭进行监听
cf.channel().closeFuture().sync();
}
2.客户端与服务端断开连接重连
系统运行过程中客户端与服务端断开连接重连。系统运行过程中网络故障或服务端故障,导致客户端与服务端断开连接了也需要重连,可以在客户端处理数据的Handler的channelInactive方法中进行重连。
可以将boostrap直接或间接的放入ChannelInboundHandlerAdapter的实现类中,重连时执行连接客户端代码即可。如下中,boostrap被封装在NettyClient的实例中,执行connect()
会执行boostrap.connect(host, port)
的动作
private NettyClient nettyClient;
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println("运行中断开重连。。。");
nettyClient.connect();
}
四、Netty高并发高性能架构设计精髓与空轮询
(一)Netty高并发高性能架构设计精髓
对于Netty高并发高性能架构设计精髓包括如下:
高并发方面:主从Reactor线程模型、NIO多路复用非阻塞、无锁串行化设计思想
高性能方面:支持高性能序列化协议、零拷贝、ByteBuf内存池设计、灵活的TCP参数配置能力、并发优化
1.主从Reactor线程模型
Reactor模型是基于事件的响应式编程模型,Netty对JDK自带的NIO的API进行了良好的封装,其底层就是基于epoll的事件响应机制,与观察者模式很类似。Netty的线程模型为主从Reactor多线程模型,主线程组用做连接处理,从线程组用于读写处理。
2.NIO多路复用非阻塞
Netty对JDK自带的NIO的API进行了良好的封装
3.无锁串行化设计思想
在大多数场景下,并行多线程处理可以提升系统的并发性能。但是,如果对于共享资源的并发访问处理不当,会带来严重的锁竞争,这最终会导致性能的下降。为了尽可能的避免锁竞争带来的性能损耗,可以通过串行化设计,即消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。NIO的多路复用就是一种无锁串行化的设计思想。
为了尽可能提升性能,Netty采用了串行无锁化设计,在IO线程内部进行串行操作,避免多线程竞争导致的性能下降。表面上看,串行化设计似乎CPU利用率不高,并发程度不够,但是,Netty通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列-多个工作线程模型性能更优。
4.支持高性能序列化协议
可以使用Netty自带的编解码器,如编解码字符串的StringEncoder和StringDecoder,及编解码对象的ObjectEncoder和ObjectDecoder;可以使用第三方提供编解码工具如protobuf实现高效的编解码,但是protobuf需要维护大量的proto文件比较麻烦,现在一般可以使用protostuff;当然也可以自定义编解码器。
5.零拷贝(直接内存的使用)
5.1.直接内存
直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域,它对应的是机器的物理内存,某些情况下这部分内存也会被频繁地使用,而且也可能导致OutOfMemoryError异常出现。java里用DirectByteBuffer可以分配一块直接内存(堆外内存),元空间对应的内存也叫作直接内存,它们对应的都是机器的物理内存。
注意:方法区只是一种规范,在jdk8中,它位于元空间(有元空间实现),并且不再与堆连续,存在于本地内存。
5.1.1直接内存与堆内存的分配API
NIO堆内存分配API:
ByteBuffer buffer = ByteBuffer.allocate(1000);
NIO直接内存分配API:
ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
Netty堆内存分配API:
ByteBuf buf = Unpooled.copiedBuffer("hl".getBytes(CharsetUtil.UTF_8));
Netty直接内存分配API:
ByteBuf buf = Unpooled.directBuffer()
5.1.2直接内存与堆内存原理
NIO堆内存分配原理
由图中可以看到,NIO中通过HeapByteBuffer
类分配一个堆内存空间,而在HeapByteBuffer的父类ByteBuffer中有一个,byte数组final byte[] hb;
,而数据就存放在这个数组中,由此来说,NIO中堆内存实际是一个byte数组。
NIO直接内存分配原理
NIO中通过DirectByteBuffer
类分配一个直接内存,通过Unsafe的本地方法allocateMemory即base = unsafe.allocateMemory(size);
分配一个直接内存,通过Unsafe_AllocateMemory搜索,我们可以找到这个本地方法在openjdk8中的底层源码为:
UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
UnsafeWrapper("Unsafe_AllocateMemory");
size_t sz = (size_t)size;
if (sz != (julong)size || size < 0) {
THROW_0(vmSymbols::java_lang_IllegalArgumentException());
}
if (sz == 0) {
return 0;
}
sz = round_to(sz, HeapWordSize);
void* x = os::malloc(sz, mtInternal);
if (x == NULL) {
THROW_0(vmSymbols::java_lang_OutOfMemoryError());
}
//Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
return addr_to_java(x);
UNSAFE_END
从底层源码中我们有这么一步操作void* x = os::malloc(sz, mtInternal);
,这步操作就是执行c语言实现的操作系统函数,直接从操作系统划出一块内存。内存分配完毕返回内存地址,然后将内存地址保存到Buffer类的long address;
中。综上可以看出,直接内存的分配实际上是在操作系统中划分指定大小的物理内存,然后将内存的地址存放在ByteBuffer中
Netty堆内存与直接内存分配原理
Netty堆内存和NIO的堆内存原理相同,底层实际上也是一个byte数组。当然Netty的直接内存实际上也是在操作系统中划分指定大小的物理内存,然后将内存的地址存放在ByteBuf中,不同的是Netty直接内存分配方法中封装了NIO直接内存方法。
5.1.3直接内存的回收
直接内存回收过程为:
ByteBuffer的gcroot变量被销毁后,其直接引用对象被回收,然后直接内存的引用就没有了,然后直接内存将被回收。
直接内存回收原理
我们看以下NIO的直接内存类DirectByteBuffer的构造方法源码:
DirectByteBuffer(int cap) { // package-private
super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
//判断是否有足够的直接内存空间分配,可通过-XX:MaxDirectMemorySize=<size>参数指定直接内存最大可分配空间,如果不指定默认为最大堆内存大小,
//在分配直接内存时如果发现空间不够会显示调用System.gc()触发一次full gc回收掉一部分无用的直接内存的引用对象,同时直接内存也会被释放掉
//如果释放完分配空间还是不够会抛出异常java.lang.OutOfMemoryError
Bits.reserveMemory(size, cap);
long base = 0;
try {
// 调用unsafe本地方法分配直接内存
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
// 分配失败,释放内存
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
// Round up to page boundary
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
// 使用Cleaner机制注册内存回收处理函数,当直接内存引用对象被GC清理掉时,
// 会提前调用这里注册的释放直接内存的Deallocator线程对象的run方法
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
从源码中可以看到这一步操作Bits.reserveMemory(size, cap);
,这一段代码的作用为判断是否有足够的直接内存空间分配,如果发现空间不够会显示调用System.gc()触发一次full gc回收掉一部分无用的直接内存的引用对象,这样直接内存也会被释放掉 ,如果释放完分配空间还是不够会抛出异常java.lang.OutOfMemoryError 。注意可通过-XX:MaxDirectMemorySize=
参数指定直接内存最大可分配空间,如果不指定默认为最大堆内存大小。
在调用unsafe的本地方法后面有这么一步操作cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
,其作用是注册内存回收处理函数,使用Cleaner机制注册内存回收处理函数,当直接内存引用对象被GC清理掉时,会提前调用这里注册的释放直接内存的Deallocator线程对象的run方法。
5.1.4使用直接内存的优缺点
直接内存申请较慢,但访问效率高,而对内存分配相对快一点,但访问效率低。
优点:
- 不占用堆内存空间,减少了发生GC的可能
- java虚拟机实现上,本地IO会直接操作直接内存(直接内存=>系统调用=>硬盘/网卡),而非直接内存则需要二次拷贝(堆内存=>直接内存=>系统调用=>硬盘/网卡)
缺点:
- 初始分配较慢
- 没有JVM直接帮助管理内存,容易发生内存溢出。为了避免一直没有FULL GC,最终导致直接内存把物理内存耗完。我们可以指定直接内存的最大值,通过-XX:MaxDirectMemorySize来指定,当达到阈值的时候,调用system.gc来进行一次FULL GC,间接把那些没有被使用的直接内存回收掉。
5.2.零拷贝
客户端发送数据到服务端,服务端把数据修改后回写到客户端过程如下:
堆内存操作数据过程
客户端发送数据到操作系统socket缓存区,然后数据会被拷贝到操作系统内存(即直接内存),接着数据会拷贝到JVM内存进行修改,修改后的数据又从JVM内存拷贝到直接内存,接着再拷贝到操作系统socket缓存区,最后数据被写回到客户端,这个过程经历了四次拷贝。
然而,client发送数据为用户空间,直接内存为内核空间,将数据拷贝到直接内存要由用户空间切换到内核空间,把数据从直接内存拷贝到JVM内存要由内核空间切换切换到用户空间,反过来服务端往客户端发送数据也要经过两次用户空间和内核空间的切换,空间切换实际上是非常重型的操作,这样一来性能就会很低。
直接内存操作数据过程
客户端发送数据到操作系统socket缓存区,然后数据会被拷贝到操作系统内存(即直接内存),JVM通过地址的引用直接操作直接内存中数据,修改后的数据数据再从直接内存拷贝到操作系统socket缓存区最后数据被写回到客户端,这个过程只经历了两次拷贝,并且少了两次的空间切换,性能会提升很多。
零拷贝
那什么又是零拷贝呢?零拷贝指的并不是没有拷贝,而是减少拷贝次数,通过使用直接内存,在JVM中通过直接内存的引用,直接操作直接内存中的数据,减少了数据在直接内存和JVM内存之间拷贝的次数,减少了用户态和内核态之间相互切换的次数,从而是的性能大大的提升。
零拷贝在Netty中的使用
我们看看AbstractNioByteChannel.NioByteUnsafe#read
这个方法,这个方法就是服务端处理客户端发送消息事件SelectionKey.OP_READ
时要执行的方法,如下:
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
方法中 byteBuf = allocHandle.allocate(allocator);
操作会构建一个ByteBuf对象,如果往深处查看这个方法,会发现这个ByteBuf默认分配的是直接内存,后面pipeline.fireChannelRead(byteBuf);
操作会调用我们定义的处理业务的Handler的ChannelRead方法,例如:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//Channel channel = ctx.channel();
//ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
//将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到客户端的消息:" + buf.toString(CharsetUtil.UTF_8));
}
方法中的入参Object msg
即为fireChannelRead传入的ByteBuf byteBuf
,也就是说netty在读取客户端发送的消息时,利用的是零拷贝技术
6.ByteBuf内存池设计
随着JVM虚拟机和JIT即时编译技术的发展,对象的分配和回收是个非常轻量级的工作。但是对于缓冲区Buffer(相当于一个内存块),情况却稍有不同,特别是对于堆外直接内存的分配和回收,是一件耗时的操作,因为内存碎片化很严重,想找一块连续的相对较大的空间可能会花费很长时间,临时分配空间就很耗时。为了尽量重用缓冲区,Netty提供了基于ByteBuf内存池的缓冲区重用机制,也就是在内存池中事先分配多个ByteBuf,需要的时候直接从池子里获取ByteBuf使用即可,使用完毕之后就重新放回到池子里去。
Netty ByteBuf的实现:
7.灵活的TCP参数配置能力
合理设置TCP参数在某些场景下对于性能的提升可以起到显著的效果,例如接收缓冲区SO_RCVBUF和发送缓冲区SO_SNDBUF。如果设置不当,对性能的影响是非常大的。通常建议值为128K或者256K。
Netty在启动辅助类ChannelOption中可以灵活的配置TCP参数,满足不同的用户场景。
8.并发优化
高性能中间件一定会用到大量的并发,要把性能压榨到极致,同样在Netty中也使用了大量的并发编程,如:
- volatile的大量、正确使用;
- CAS和原子类的广泛使用;
- 线程安全容器的使用;
- 通过读写锁提升并发性能。
(二)ByteBuf扩容机制
如果我们需要了解ByteBuf的扩容,我们需要先了解ByteBuf中定义的几个成员变量,再从源码的角度来分析扩容。
- minNewCapacity:表用户需要写入的值大小,写入数据后writerIndex的值
- threshold:阈值,为Bytebuf内部设定容量的最大值
- maxCapacity:Netty最大能接受的容量大小,一般为int的最大值
ByteBuf核心扩容方法
进入ByteBuf源码中,深入分析其扩容方法,查看ByteBuf.writeByte()源码:
Netty的ByteBuf需要动态扩容来满足需要,扩容过程:
默认门限阈值为4MB(这个阈值是一个经验值,不同场景,可能取值不同),当需要的容量等于门限阈值,使用阈值作为新的缓存区容量 目标容量,如果大于阈值,采用在目标容量的基础上每次步进4MB的方式进行内存扩张((需要扩容值/4MB)*4MB),扩张后需要和最大内存(maxCapacity)进行比较,大于maxCapacity的话就用maxCapacity,否则使用扩容值 目标容量,如果小于阈值,采用倍增的方式,以64(字节)作为基本数值,每次翻倍增长64 -->128 --> 256,直到倍增后的结果刚好大于或等于需要的容量值。
(三)空轮询问题
空轮询问题
在NIO的API中有这么一个操作selector.select()
,执行这个操作后会阻塞当前线程,然后等待有事件响应返回,但是偶尔会出现没事件响应的时候也会返回的bug,而这个BUG是epoll的BUG,在JDK1.8中未得到解决。当遇到空轮询返回时,下次while循环到这个方法时,没有事件就不会阻塞直接返回,这样就一直循环话,就会导致cpu百分百,而解决的方式主要为重启。
空轮询问题解决
netty的解决空轮询的原理为:设置一个阈值,当空轮询次数达到阈值后就创建一个新的selector替换旧的select,然后将旧的selector上的事件转移到新的selector上,最后把旧的selector关闭掉。
如NioEventLoop#select
方法,这个方法的作用是等待事件的响应,其源码如下
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
//获取当前是否有事件就绪,该方法立即返回结果,不会阻塞;如果返回值>0,则代表存在一个或多个
selector.selectNow();
selectCnt = 1;
}
break;
}
//队列中有任务返回
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
//阻塞指定时长
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
//有事件或
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
//
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
在以上的代码中给,我们可以看到在int selectedKeys = selector.select(timeoutMillis);
阻塞指定时长等待事件响应的后面有这么一个逻辑selectCnt ++;
,其作用是记录select的次数。再往后看有这么一块逻辑:
if(..){
....
}else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
当select次数到SELECTOR_AUTO_REBUILD_THRESHOLD
(默认为512次),时会执行 selector = selectRebuildSelector(selectCnt);
创建一个新的selector替换旧的select.旧的selector上的事件会转移到新的selector上,最后把旧的selector关闭掉
(四)NettyServer端源码解析