Netty之JavaNIO编程模型介绍02

简介: NIO 非阻塞 网络编程相关的(Selector、SelectionKey、ServerScoketChannel和SocketChannel) 关系梳理图


  因为篇幅问题我们继续上一篇的内容继续。

一、NIO网络编程原理分析

  NIO 非阻塞 网络编程相关的(Selector、SelectionKey、ServerScoketChannel和SocketChannel) 关系梳理图

image.png

对上图的说明:

  1. 当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel
  2. Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
  3. 将socketChannel注册到Selector上, register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel
  4. 注册后返回一个 SelectionKey, 会和该Selector 关联(集合)
  5. 进一步得到各个 SelectionKey (有事件发生)
  6. 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
  7. 可以通过 得到的 channel , 完成业务处理

二、NIO网络编程快速入门

  接下来我们通过具体的案例代码来实现网络通信

服务端:

package com.dpb.netty.nio;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * @program: netty4demo
 * @description: Nio的服务端
 * @author: 波波烤鸭
 * @create: 2019-12-28 14:17
 */
public class NioServer {
    public static void main(String[] args) throws Exception{
        //创建ServerSocketChannel -> ServerSocket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //得到一个Selecor对象
        Selector selector = Selector.open();
        //绑定一个端口6666, 在服务器端监听
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));
        //设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        //把 serverSocketChannel 注册到  selector 关心 事件为 OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); // 1
        //循环等待客户端连接
        while (true) {
            //这里我们等待1秒,如果没有事件发生, 返回
            if(selector.select(1000) == 0) { //没有事件发生
                System.out.println("服务器等待了1秒,无连接");
                continue;
            }
            //如果返回的>0, 就获取到相关的 selectionKey集合
            //1.如果返回的>0, 表示已经获取到关注的事件
            //2. selector.selectedKeys() 返回关注事件的集合
            //   通过 selectionKeys 反向获取通道
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            System.out.println("selectionKeys 数量 = " + selectionKeys.size());
            //遍历 Set<SelectionKey>, 使用迭代器遍历
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while (keyIterator.hasNext()) {
                //获取到SelectionKey
                SelectionKey key = keyIterator.next();
                //根据key 对应的通道发生的事件做相应处理
                if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接
                    //该该客户端生成一个 SocketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());
                    //将  SocketChannel 设置为非阻塞
                    socketChannel.configureBlocking(false);
                    //将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel
                    //关联一个Buffer
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                    System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); //2,3,4..
                }
                if(key.isReadable()) {  //发生 OP_READ
                    //通过key 反向获取到对应channel
                    SocketChannel channel = (SocketChannel)key.channel();
                    //获取到该channel关联的buffer
                    ByteBuffer buffer = (ByteBuffer)key.attachment();
                    channel.read(buffer);
                    System.out.println("form 客户端 " + new String(buffer.array(), CharsetUtil.UTF_8));
                }
                //手动从集合中移动当前的selectionKey, 防止重复操作
                keyIterator.remove();
            }
        }
    }
}

客户端代码

package com.dpb.netty.nio;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
 * @program: netty4demo
 * @description: NIO的客户端
 * @author: 波波烤鸭
 * @create: 2019-12-28 14:18
 */
public class NioClient {
    public static void main(String[] args) throws Exception{
        //得到一个网络通道
        SocketChannel socketChannel = SocketChannel.open();
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //提供服务器端的ip 和 端口
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
        //连接服务器
        if (!socketChannel.connect(inetSocketAddress)) {
            while (!socketChannel.finishConnect()) {
                System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");
            }
        }
        //...如果连接成功,就发送数据
        String str = "hello, bobo烤鸭~";
        //Wraps a byte array into a buffer
        ByteBuffer buffer = ByteBuffer.wrap(str.getBytes(CharsetUtil.UTF_8));
        //发送数据,将 buffer 数据写入 channel
        socketChannel.write(buffer);
        System.in.read();
    }
}

效果

image.png

三、SelectionKey

  SelectionKey,表示 Selector 和网络通道的注册关系, 共四种

int OP_ACCEPT:有新的网络连接可以 accept,值为 16

int OP_CONNECT:代表连接已经建立,值为 8

int OP_READ:代表读操作,值为 1

int OP_WRITE:代表写操作,值为 4

源码中:

public static final int OP_READ = 1 << 0; 
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

SelectionKey相关方法

public abstract class SelectionKey {
    public abstract Selector selector();//得到与之关联的 Selector 对象
  public abstract SelectableChannel channel();//得到与之关联的通道
  public final Object attachment();//得到与之关联的共享数据
  public abstract SelectionKey interestOps(int ops);//设置或改变监听事件
  public final boolean isAcceptable();//是否可以 accept
  public final boolean isReadable();//是否可以读
  public final boolean isWritable();//是否可以写
}

image.png

四、ServerSocketChannel

  ServerSocketChannel 在服务器端监听新的客户端 Socket 连接

相关方法如下

public abstract class ServerSocketChannel    extends AbstractSelectableChannel    implements NetworkChannel{
//得到一个 ServerSocketChannel 通道
public static ServerSocketChannel open();
//设置服务器端端口号
public final ServerSocketChannel bind(SocketAddress local);
//设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
public final SelectableChannel configureBlocking(boolean block);
//接受一个连接,返回代表这个连接的通道对象
public SocketChannel accept();
//注册一个选择器并设置监听事件
public final SelectionKey register(Selector sel, int ops);
}

image.png

五、SocketChannel

  SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。

相关方法如下

public abstract class SocketChannel    extends AbstractSelectableChannel    implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel{
//得到一个 SocketChannel 通道
public static SocketChannel open();
//设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
public final SelectableChannel configureBlocking(boolean block);
//连接服务器
public boolean connect(SocketAddress remote);
//如果上面的方法连接失败,接下来就要通过该方法完成连接操作
public boolean finishConnect();
public int write(ByteBuffer src);//往通道里写数据
public int read(ByteBuffer dst);//从通道里读数据
//注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
public final SelectionKey register(Selector sel, int ops, Object att);
public final void close();//关闭通道
}

image.png

六、群聊系统

  接下来提供一个群聊系统的案例的简单代码。

  1. 编写一个 NIO 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
    实现多人群聊
  2. 服务器端:可以监测用户上线,离线,并实现消息转发功能
  3. 客户端:通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
  4. 目的:进一步理解NIO非阻塞网络编程机制

服务端代码

package com.dpb.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class GroupChatServer {
    //定义属性
    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int PORT = 6667;
    //构造器
    //初始化工作
    public GroupChatServer() {
        try {
            //得到选择器
            selector = Selector.open();
            //ServerSocketChannel
            listenChannel =  ServerSocketChannel.open();
            //绑定端口
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            //设置非阻塞模式
            listenChannel.configureBlocking(false);
            //将该listenChannel 注册到selector
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);
        }catch (IOException e) {
            e.printStackTrace();
        }
    }
    //监听
    public void listen() {
        System.out.println("监听线程: " + Thread.currentThread().getName());
        try {
            //循环处理
            while (true) {
                int count = selector.select();
                if(count > 0) {//有事件处理
                    //遍历得到selectionKey 集合
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        //取出selectionkey
                        SelectionKey key = iterator.next();
                        //监听到accept
                        if(key.isAcceptable()) {
                            SocketChannel sc = listenChannel.accept();
                            sc.configureBlocking(false);
                            //将该 sc 注册到seletor
                            sc.register(selector, SelectionKey.OP_READ);
                            //提示
                            System.out.println(sc.getRemoteAddress() + " 上线 ");
                        }
                        if(key.isReadable()) { //通道发送read事件,即通道是可读的状态
                            //处理读 (专门写方法..)
                            readData(key);
                        }
                        //当前的key 删除,防止重复处理
                        iterator.remove();
                    }
                } else {
                    System.out.println("等待....");
                }
            }
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            //发生异常处理....
        }
    }
    //读取客户端消息
    private void readData(SelectionKey key) {
        //取到关联的channle
        SocketChannel channel = null;
        try {
           //得到channel
            channel = (SocketChannel) key.channel();
            //创建buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = channel.read(buffer);
            //根据count的值做处理
            if(count > 0) {
                //把缓存区的数据转成字符串
                String msg = new String(buffer.array());
                //输出该消息
                System.out.println("form 客户端: " + msg);
                //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
                sendInfoToOtherClients(msg, channel);
            }
        }catch (IOException e) {
            try {
                System.out.println(channel.getRemoteAddress() + " 离线了..");
                //取消注册
                key.cancel();
                //关闭通道
                channel.close();
            }catch (IOException e2) {
                e2.printStackTrace();;
            }
        }
    }
    //转发消息给其它客户(通道)
    private void sendInfoToOtherClients(String msg, SocketChannel self ) throws  IOException{
        System.out.println("服务器转发消息中...");
        System.out.println("服务器转发数据给客户端线程: " + Thread.currentThread().getName());
        //遍历 所有注册到selector 上的 SocketChannel,并排除 self
        for(SelectionKey key: selector.keys()) {
            //通过 key  取出对应的 SocketChannel
            Channel targetChannel = key.channel();
            //排除自己
            if(targetChannel instanceof  SocketChannel && targetChannel != self) {
                //转型
                SocketChannel dest = (SocketChannel)targetChannel;
                //将msg 存储到buffer
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                //将buffer 的数据写入 通道
                dest.write(buffer);
            }
        }
    }
    public static void main(String[] args) {
        //创建服务器对象
        GroupChatServer groupChatServer = new GroupChatServer();
        groupChatServer.listen();
    }
}
//可以写一个Handler
class MyHandler {
    public void readData() {
    }
    public void sendInfoToOtherClients(){
    }
}

客户端代码

package com.dpb.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class GroupChatClient {
    //定义相关的属性
    private final String HOST = "127.0.0.1"; // 服务器的ip
    private final int PORT = 6667; //服务器端口
    private Selector selector;
    private SocketChannel socketChannel;
    private String username;
    //构造器, 完成初始化工作
    public GroupChatClient() throws IOException {
        selector = Selector.open();
        //连接服务器
        socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //将channel 注册到selector
        socketChannel.register(selector, SelectionKey.OP_READ);
        //得到username
        username = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(username + " is ok...");
    }
    //向服务器发送消息
    public void sendInfo(String info) {
        info = username + " 说:" + info;
        try {
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
        }catch (IOException e) {
            e.printStackTrace();
        }
    }
    //读取从服务器端回复的消息
    public void readInfo() {
        try {
            int readChannels = selector.select();
            if(readChannels > 0) {//有可以用的通道
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if(key.isReadable()) {
                        //得到相关的通道
                       SocketChannel sc = (SocketChannel) key.channel();
                       //得到一个Buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        //读取
                        sc.read(buffer);
                        //把读到的缓冲区的数据转成字符串
                        String msg = new String(buffer.array());
                        System.out.println(msg.trim());
                    }
                }
                iterator.remove(); //删除当前的selectionKey, 防止重复操作
            } else {
                //System.out.println("没有可以用的通道...");
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws Exception {
        //启动我们客户端
        GroupChatClient chatClient = new GroupChatClient();
        //启动一个线程, 每个3秒,读取从服务器发送数据
        new Thread() {
            public void run() {
                while (true) {
                    chatClient.readInfo();
                    try {
                        Thread.currentThread().sleep(3000);
                    }catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
        //发送数据给服务器端
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String s = scanner.nextLine();
            chatClient.sendInfo(s);
        }
    }
}

效果

image.png

七、NIO与零拷贝

零拷贝基本介绍

  零拷贝是网络编程的关键,很多性能优化都离不开。

  在 Java 程序中,常用的零拷贝有 mmap(内存映射) 和 sendFile。那么,他们在 OS 里,到底是怎么样的一个的设计?我们分析 mmap 和 sendFile 这两个零拷贝

  另外我们看下NIO 中如何使用零拷贝

传统IO数据读写

  Java 传统 IO 和 网络编程的一段代码

File file = new File("test.txt");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
byte[] arr = new byte[(int) file.length()];
raf.read(arr);
Socket socket = new ServerSocket(8080).accept();
socket.getOutputStream().write(arr);

image.png

DMA: direct

memory access

直接内存拷贝(不使用CPU)

mmap 优化

  mmap 通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据。这样,在进行网络传输时,就可以减少内核空间到用户控件的拷贝次数。如下图

mmap示意图

image.png

sendFile 优化

  Linux 2.1 版本 提供了 sendFile 函数,其基本原理如下:数据根本不经过用户态,直接从内核缓冲区进入到 Socket Buffer,同时,由于和用户态完全无关,就减少了一次上下文切换

示意图和小结

image.png

提示:零拷贝从操作系统角度,是没有cpu 拷贝

Linux 在 2.4 版本中,做了一些修改,避免了从内核缓冲区拷贝到 Socket buffer 的操作,直接拷贝到协议栈,从而再一次减少了数据拷贝。具体如下图和小结:

image.png

这里其实有 一次cpu 拷贝

kernel buffer -> socket buffer

但是,拷贝的信息很少,比如

lenght , offset , 消耗低,可以忽略

零拷贝的再次理解

我们说零拷贝,是从操作系统的角度来说的。因为内核缓冲区之间,没有数据是重复的(只有 kernel buffer 有一份数据)。

零拷贝不仅仅带来更少的数据复制,还能带来其他的性能优势,例如更少的上下文切换,更少的 CPU 缓存伪共享以及无 CPU 校验和计算。

mmap 和 sendFile 的区别

  1. mmap 适合小数据量读写,sendFile 适合大文件传输。
  2. mmap 需要 4 次上下文切换,3 次数据拷贝;sendFile 需要 3 次上下文切换,最少 2 次数据拷贝。
  3. sendFile 可以利用 DMA 方式,减少 CPU 拷贝,mmap 则不能(必须从内核拷贝到 Socket 缓冲区)。

NIO 零拷贝案例

NewIOServer

package com.dpb.netty.nio.zerocopy;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
//服务器
public class NewIOServer {
    public static void main(String[] args) throws Exception {
        InetSocketAddress address = new InetSocketAddress(7001);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(address);
        //创建buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            int readcount = 0;
            while (-1 != readcount) {
                try {
                    readcount = socketChannel.read(byteBuffer);
                }catch (Exception ex) {
                   // ex.printStackTrace();
                    break;
                }
                //
                byteBuffer.rewind(); //倒带 position = 0 mark 作废
            }
        }
    }
}

NewIOClient

package com.dpb.netty.nio.zerocopy;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
public class NewIOClient {
    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 7001));
        String filename = "protoc-3.6.1-win32.zip";
        //得到一个文件channel
        FileChannel fileChannel = new FileInputStream(filename).getChannel();
        //准备发送
        long startTime = System.currentTimeMillis();
        //在linux下一个transferTo 方法就可以完成传输
        //在windows 下 一次调用 transferTo 只能发送8m , 就需要分段传输文件, 而且要主要
        //传输时的位置 =》 课后思考...
        //transferTo 底层使用到零拷贝
        long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
        System.out.println("发送的总的字节数 =" + transferCount + " 耗时:" + (System.currentTimeMillis() - startTime));
        //关闭
        fileChannel.close();
    }
}

好了本文就介绍到此~


相关文章
|
4天前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
3月前
|
消息中间件 监控 Java
滴滴面试:谈谈你对Netty线程模型的理解?
Netty 线程模型是指 Netty 框架为了提供高性能、高并发的网络通信,而设计的管理和利用线程的策略和机制。 **Netty 线程模型被称为 Reactor(响应式)模型/模式,它是基于 NIO 多路复用模型的一种升级,它的核心思想是将 IO 事件和业务处理进行分离,使用一个或多个线程来执行任务的一种机制。** ## 1.**Reactor三大组件** Reactor 包含以下三大组件: ![image.png](https://cdn.nlark.com/yuque/0/2024/png/92791/1717079218890-89000a00-48bc-4a1a-b87e-e1b6
46 2
|
网络协议
netty编程实战02-创建一个带有连接重试的tcp客户端程序
netty编程实战02-创建一个带有连接重试的tcp客户端程序
188 0
|
10月前
|
缓存 网络协议 前端开发
从BIO到NIO在到Netty线程模型详解
从BIO到NIO在到Netty线程模型详解
157 1
|
10月前
|
Java 容器
【深入研究NIO与Netty线程模型的源码】
【深入研究NIO与Netty线程模型的源码】
|
监控 网络协议 Dubbo
Netty入门到超神系列-Netty介绍和线程模型
经过前面章节的学习你应该能感受到NIO的问题,就是类比较多,方法也比较多,而且复杂,开发工作量和难度都非常大,还需要考虑网络问题、数据丢包和异常流的处理等等。 NIO是底层API,它的实现依赖于操作系统针对IO操作的APIs,使用NIO会经常发现代码在Linux上正常运行,但在Windows上就会出现问题。JDK的NIO还有一个Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。 总之直接使用Java NIO会面临一系列的问题,Netty的其出现就是为了解决NIO的不足
108 0
|
12月前
|
Java
Netty中提供了哪些线程模型?
最近,我更新了一些Netty相关的内容,于是有很多粉丝开始私信问我一些关于Netty的问题。今天,给大家分享一个大家问得比较多问题,Netty中提供了哪些线程模型?
60 0
|
缓存 Java 测试技术
Netty实战(七)EventLoop和线程模型
简单地说,线程模型指定了操作系统、编程语言、框架或者应用程序的上下文中的线程管理的关键方面。
137 0
|
网络协议 Java
netty编程实战01-创建一个tcp服务端程序
netty编程实战01-创建一个tcp服务端程序
230 0
|
编解码 弹性计算 缓存
Netty源码和Reactor模型
Netty源码和Reactor模型
85 0