因为篇幅问题我们继续上一篇的内容继续。
一、NIO网络编程原理分析
NIO 非阻塞 网络编程相关的(Selector、SelectionKey、ServerScoketChannel和SocketChannel) 关系梳理图
对上图的说明:
- 当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel
- Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
- 将socketChannel注册到Selector上, register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel
- 注册后返回一个 SelectionKey, 会和该Selector 关联(集合)
- 进一步得到各个 SelectionKey (有事件发生)
- 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
- 可以通过 得到的 channel , 完成业务处理
二、NIO网络编程快速入门
接下来我们通过具体的案例代码来实现网络通信
服务端:
package com.dpb.netty.nio; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * @program: netty4demo * @description: Nio的服务端 * @author: 波波烤鸭 * @create: 2019-12-28 14:17 */ public class NioServer { public static void main(String[] args) throws Exception{ //创建ServerSocketChannel -> ServerSocket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //得到一个Selecor对象 Selector selector = Selector.open(); //绑定一个端口6666, 在服务器端监听 serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //设置为非阻塞 serverSocketChannel.configureBlocking(false); //把 serverSocketChannel 注册到 selector 关心 事件为 OP_ACCEPT serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); // 1 //循环等待客户端连接 while (true) { //这里我们等待1秒,如果没有事件发生, 返回 if(selector.select(1000) == 0) { //没有事件发生 System.out.println("服务器等待了1秒,无连接"); continue; } //如果返回的>0, 就获取到相关的 selectionKey集合 //1.如果返回的>0, 表示已经获取到关注的事件 //2. selector.selectedKeys() 返回关注事件的集合 // 通过 selectionKeys 反向获取通道 Set<SelectionKey> selectionKeys = selector.selectedKeys(); System.out.println("selectionKeys 数量 = " + selectionKeys.size()); //遍历 Set<SelectionKey>, 使用迭代器遍历 Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()) { //获取到SelectionKey SelectionKey key = keyIterator.next(); //根据key 对应的通道发生的事件做相应处理 if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接 //该该客户端生成一个 SocketChannel SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode()); //将 SocketChannel 设置为非阻塞 socketChannel.configureBlocking(false); //将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel //关联一个Buffer socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); //2,3,4.. } if(key.isReadable()) { //发生 OP_READ //通过key 反向获取到对应channel SocketChannel channel = (SocketChannel)key.channel(); //获取到该channel关联的buffer ByteBuffer buffer = (ByteBuffer)key.attachment(); channel.read(buffer); System.out.println("form 客户端 " + new String(buffer.array(), CharsetUtil.UTF_8)); } //手动从集合中移动当前的selectionKey, 防止重复操作 keyIterator.remove(); } } } }
客户端代码
package com.dpb.netty.nio; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; /** * @program: netty4demo * @description: NIO的客户端 * @author: 波波烤鸭 * @create: 2019-12-28 14:18 */ public class NioClient { public static void main(String[] args) throws Exception{ //得到一个网络通道 SocketChannel socketChannel = SocketChannel.open(); //设置非阻塞 socketChannel.configureBlocking(false); //提供服务器端的ip 和 端口 InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666); //连接服务器 if (!socketChannel.connect(inetSocketAddress)) { while (!socketChannel.finishConnect()) { System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作.."); } } //...如果连接成功,就发送数据 String str = "hello, bobo烤鸭~"; //Wraps a byte array into a buffer ByteBuffer buffer = ByteBuffer.wrap(str.getBytes(CharsetUtil.UTF_8)); //发送数据,将 buffer 数据写入 channel socketChannel.write(buffer); System.in.read(); } }
效果
三、SelectionKey
SelectionKey
,表示 Selector 和网络通道的注册关系, 共四种
int OP_ACCEPT:有新的网络连接可以 accept,值为 16
int OP_CONNECT:代表连接已经建立,值为 8
int OP_READ:代表读操作,值为 1
int OP_WRITE:代表写操作,值为 4
源码中:
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;
SelectionKey相关方法
public abstract class SelectionKey { public abstract Selector selector();//得到与之关联的 Selector 对象 public abstract SelectableChannel channel();//得到与之关联的通道 public final Object attachment();//得到与之关联的共享数据 public abstract SelectionKey interestOps(int ops);//设置或改变监听事件 public final boolean isAcceptable();//是否可以 accept public final boolean isReadable();//是否可以读 public final boolean isWritable();//是否可以写 }
四、ServerSocketChannel
ServerSocketChannel 在服务器端监听
新的客户端 Socket 连接
相关方法如下
public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel{ //得到一个 ServerSocketChannel 通道 public static ServerSocketChannel open(); //设置服务器端端口号 public final ServerSocketChannel bind(SocketAddress local); //设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式 public final SelectableChannel configureBlocking(boolean block); //接受一个连接,返回代表这个连接的通道对象 public SocketChannel accept(); //注册一个选择器并设置监听事件 public final SelectionKey register(Selector sel, int ops); }
五、SocketChannel
SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。
相关方法如下
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel{ //得到一个 SocketChannel 通道 public static SocketChannel open(); //设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式 public final SelectableChannel configureBlocking(boolean block); //连接服务器 public boolean connect(SocketAddress remote); //如果上面的方法连接失败,接下来就要通过该方法完成连接操作 public boolean finishConnect(); public int write(ByteBuffer src);//往通道里写数据 public int read(ByteBuffer dst);//从通道里读数据 //注册一个选择器并设置监听事件,最后一个参数可以设置共享数据 public final SelectionKey register(Selector sel, int ops, Object att); public final void close();//关闭通道 }
六、群聊系统
接下来提供一个群聊系统的案例的简单代码。
- 编写一个 NIO 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
实现多人群聊- 服务器端:可以监测用户上线,离线,并实现消息转发功能
- 客户端:通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
- 目的:进一步理解NIO非阻塞网络编程机制
服务端代码
package com.dpb.netty.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; public class GroupChatServer { //定义属性 private Selector selector; private ServerSocketChannel listenChannel; private static final int PORT = 6667; //构造器 //初始化工作 public GroupChatServer() { try { //得到选择器 selector = Selector.open(); //ServerSocketChannel listenChannel = ServerSocketChannel.open(); //绑定端口 listenChannel.socket().bind(new InetSocketAddress(PORT)); //设置非阻塞模式 listenChannel.configureBlocking(false); //将该listenChannel 注册到selector listenChannel.register(selector, SelectionKey.OP_ACCEPT); }catch (IOException e) { e.printStackTrace(); } } //监听 public void listen() { System.out.println("监听线程: " + Thread.currentThread().getName()); try { //循环处理 while (true) { int count = selector.select(); if(count > 0) {//有事件处理 //遍历得到selectionKey 集合 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { //取出selectionkey SelectionKey key = iterator.next(); //监听到accept if(key.isAcceptable()) { SocketChannel sc = listenChannel.accept(); sc.configureBlocking(false); //将该 sc 注册到seletor sc.register(selector, SelectionKey.OP_READ); //提示 System.out.println(sc.getRemoteAddress() + " 上线 "); } if(key.isReadable()) { //通道发送read事件,即通道是可读的状态 //处理读 (专门写方法..) readData(key); } //当前的key 删除,防止重复处理 iterator.remove(); } } else { System.out.println("等待...."); } } }catch (Exception e) { e.printStackTrace(); }finally { //发生异常处理.... } } //读取客户端消息 private void readData(SelectionKey key) { //取到关联的channle SocketChannel channel = null; try { //得到channel channel = (SocketChannel) key.channel(); //创建buffer ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); //根据count的值做处理 if(count > 0) { //把缓存区的数据转成字符串 String msg = new String(buffer.array()); //输出该消息 System.out.println("form 客户端: " + msg); //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理 sendInfoToOtherClients(msg, channel); } }catch (IOException e) { try { System.out.println(channel.getRemoteAddress() + " 离线了.."); //取消注册 key.cancel(); //关闭通道 channel.close(); }catch (IOException e2) { e2.printStackTrace();; } } } //转发消息给其它客户(通道) private void sendInfoToOtherClients(String msg, SocketChannel self ) throws IOException{ System.out.println("服务器转发消息中..."); System.out.println("服务器转发数据给客户端线程: " + Thread.currentThread().getName()); //遍历 所有注册到selector 上的 SocketChannel,并排除 self for(SelectionKey key: selector.keys()) { //通过 key 取出对应的 SocketChannel Channel targetChannel = key.channel(); //排除自己 if(targetChannel instanceof SocketChannel && targetChannel != self) { //转型 SocketChannel dest = (SocketChannel)targetChannel; //将msg 存储到buffer ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); //将buffer 的数据写入 通道 dest.write(buffer); } } } public static void main(String[] args) { //创建服务器对象 GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); } } //可以写一个Handler class MyHandler { public void readData() { } public void sendInfoToOtherClients(){ } }
客户端代码
package com.dpb.netty.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; public class GroupChatClient { //定义相关的属性 private final String HOST = "127.0.0.1"; // 服务器的ip private final int PORT = 6667; //服务器端口 private Selector selector; private SocketChannel socketChannel; private String username; //构造器, 完成初始化工作 public GroupChatClient() throws IOException { selector = Selector.open(); //连接服务器 socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT)); //设置非阻塞 socketChannel.configureBlocking(false); //将channel 注册到selector socketChannel.register(selector, SelectionKey.OP_READ); //得到username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + " is ok..."); } //向服务器发送消息 public void sendInfo(String info) { info = username + " 说:" + info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); }catch (IOException e) { e.printStackTrace(); } } //读取从服务器端回复的消息 public void readInfo() { try { int readChannels = selector.select(); if(readChannels > 0) {//有可以用的通道 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if(key.isReadable()) { //得到相关的通道 SocketChannel sc = (SocketChannel) key.channel(); //得到一个Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //读取 sc.read(buffer); //把读到的缓冲区的数据转成字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); } } iterator.remove(); //删除当前的selectionKey, 防止重复操作 } else { //System.out.println("没有可以用的通道..."); } }catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { //启动我们客户端 GroupChatClient chatClient = new GroupChatClient(); //启动一个线程, 每个3秒,读取从服务器发送数据 new Thread() { public void run() { while (true) { chatClient.readInfo(); try { Thread.currentThread().sleep(3000); }catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); //发送数据给服务器端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String s = scanner.nextLine(); chatClient.sendInfo(s); } } }
效果
七、NIO与零拷贝
零拷贝基本介绍
零拷贝是网络编程的关键,很多性能优化都离不开。
在 Java 程序中,常用的零拷贝有 mmap(内存映射) 和 sendFile。那么,他们在 OS 里,到底是怎么样的一个的设计?我们分析 mmap 和 sendFile 这两个零拷贝
另外我们看下NIO 中如何使用零拷贝
传统IO数据读写
Java 传统 IO 和 网络编程的一段代码
File file = new File("test.txt"); RandomAccessFile raf = new RandomAccessFile(file, "rw"); byte[] arr = new byte[(int) file.length()]; raf.read(arr); Socket socket = new ServerSocket(8080).accept(); socket.getOutputStream().write(arr);
DMA: direct
memory access
直接内存拷贝(不使用CPU)
mmap 优化
mmap
通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据。这样,在进行网络传输时,就可以减少内核空间到用户控件的拷贝次数。如下图
mmap示意图
sendFile 优化
Linux 2.1 版本 提供了 sendFile 函数,其基本原理如下:数据根本不经过用户态,直接从内核缓冲区进入到 Socket Buffer,同时,由于和用户态完全无关,就减少了一次上下文切换
示意图和小结
提示:零拷贝从操作系统角度,是没有cpu 拷贝
Linux 在 2.4 版本中,做了一些修改,避免了从内核缓冲区拷贝到 Socket buffer 的操作,直接拷贝到协议栈,从而再一次减少了数据拷贝。具体如下图和小结:
这里其实有 一次cpu 拷贝
kernel buffer -> socket buffer
但是,拷贝的信息很少,比如
lenght , offset , 消耗低,可以忽略
零拷贝的再次理解
我们说零拷贝,是从操作系统的角度来说的。因为内核缓冲区之间,没有数据是重复的(只有 kernel buffer 有一份数据)。
零拷贝不仅仅带来更少的数据复制,还能带来其他的性能优势,例如更少的上下文切换,更少的 CPU 缓存伪共享以及无 CPU 校验和计算。
mmap 和 sendFile 的区别
- mmap 适合小数据量读写,sendFile 适合大文件传输。
- mmap 需要 4 次上下文切换,3 次数据拷贝;sendFile 需要 3 次上下文切换,最少 2 次数据拷贝。
- sendFile 可以利用 DMA 方式,减少 CPU 拷贝,mmap 则不能(必须从内核拷贝到 Socket 缓冲区)。
NIO 零拷贝案例
NewIOServer
package com.dpb.netty.nio.zerocopy; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; //服务器 public class NewIOServer { public static void main(String[] args) throws Exception { InetSocketAddress address = new InetSocketAddress(7001); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.bind(address); //创建buffer ByteBuffer byteBuffer = ByteBuffer.allocate(4096); while (true) { SocketChannel socketChannel = serverSocketChannel.accept(); int readcount = 0; while (-1 != readcount) { try { readcount = socketChannel.read(byteBuffer); }catch (Exception ex) { // ex.printStackTrace(); break; } // byteBuffer.rewind(); //倒带 position = 0 mark 作废 } } } }
NewIOClient
package com.dpb.netty.nio.zerocopy; import java.io.FileInputStream; import java.net.InetSocketAddress; import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; public class NewIOClient { public static void main(String[] args) throws Exception { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("localhost", 7001)); String filename = "protoc-3.6.1-win32.zip"; //得到一个文件channel FileChannel fileChannel = new FileInputStream(filename).getChannel(); //准备发送 long startTime = System.currentTimeMillis(); //在linux下一个transferTo 方法就可以完成传输 //在windows 下 一次调用 transferTo 只能发送8m , 就需要分段传输文件, 而且要主要 //传输时的位置 =》 课后思考... //transferTo 底层使用到零拷贝 long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel); System.out.println("发送的总的字节数 =" + transferCount + " 耗时:" + (System.currentTimeMillis() - startTime)); //关闭 fileChannel.close(); } }
好了本文就介绍到此~