《Java I/O 模型》Java NIO

简介: 《Java I/O 模型》Java NIO

🚀1. Java NIO 介绍

🎁Java NIO(New IO) 又被称为 Java Non-Blocking IO,是在 Java 1.4 开始引入的一个新的 IO API. NIO 支持面向缓冲区的、基于通道的 IO 操作,以更高效的方式进行文件的读写操作。传统 IO 的读写操作只能阻塞执行,线程在读写期间不能干其他事情。例如,调用 socket.read() 时,如果服务器一直没有数据传输过来,线程就一直阻塞,而 NIO 可以配置 socket 为非阻塞模式。


NIO 的非阻塞模式详解


NIO 非阻塞读,使一个线程从某个通道发送请求或读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变为可读取之前,该线程可以继续做其他的事情。

NIO 非阻塞写,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做其他事情。

NIO 和 BIO 的比较


BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,IO 块比 IO 流效率更高。

BIO 是阻塞的,NIO 是非阻塞的。

BIO 基于字节流和字符流进行操作,NIO 基于通道(Channel)和缓存区(Buffer)进行操作,数据总是从通道读取到缓存区中,或者从缓存区写入到通道中。

NIO BIO
面向缓存区(Buffer) 面向流(Stream)
非阻塞IO(Non-Blocking IO) 阻塞IO(Blocking IO)
选择器(Selector)

NIO 有三大核心组件

通道(Channel):Java NIO 的通道类似流,但又有些不同,既可以从通道中读取数据,又可以写数据到通道中,但流的读写通常都是单向的。通道可以非阻塞读取和写入数据,通道支持读取或写入缓存区,也支持异步地读写。

缓存区(Buffer):缓存区本质上是一块可以写入数据并从中读取数据的内存,这块内存被包装成 NIO Buffer 对象,并提供了一组方法用来方便的访问这块内存。

选择器(Selector):可以监听一个或多个 NIO 通道,并确定哪些通道已经准备好进行读取或写入,单个线程可以管理多个通道,从而管理多个网络连接提高效率。2e718f6de6d94987ae2f2182c528d9c8.png

🚀2. 缓存区(Buffer)

缓存区是一个用于特定基本类型的容器,由 java.nio 包定义,所有缓存区都是 Buffer 抽象类的子类。主要用于与 NIO 通道进行交互,数据从通道读入缓存区或者从缓存区写入通道中。2e718f6de6d94987ae2f2182c528d9c8.png

缓存区就像一个数组,可以保存多个相同类型的数据,根据数据类型划分,有以下 Buffer 常用子类

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

虽然上述 Buffer 类各自管理的数据类型不同,但是都是采用类似的方式管理数据,通过下面的方法获取一个 Buffer 对象

static xxxBuffer allocate(int capacity):创建一个容量为capacity的xxxBuffer对象

缓存区的基本属性


容量(capacity):作为一个内存块,Buffer 具有固定的大小,也成为“容量”,缓冲区容量不能为负,并且创建后不能更改。

限制(limit):表示缓存区中可以操作数据的大小(limit 后数据不能进行读写),缓存区的限制不能为负,并且不能大于其容量。写入模式下,限制等于 buffer 的容量,读取模式下,limit 等于写入的数据量。

位置(position):下一个要读取或写入的数据的索引,缓存区的位置不能为负,并且不能大于其限制。

标记(mark)与重置(reset):标记是一个索引,通过 Buffer 中的 mark() 方法指定 Buffer 中一个特定的位置,之后通过调用 reset() 方法恢复到这个位置。标记、位置、限制、容量遵循不变式0<=mark<=position<=limit<=capacity2e718f6de6d94987ae2f2182c528d9c8.png

缓存区常见方法

Buffer clear() //清空缓存区并返回对缓存区的引用
Buffer flip() //为将缓存区的界限设置为当前位置,并将当前位置重置为0
int capacity() //返回Buffer的capacity大小 
boolean hasRemaining()//判断缓存区中是否还有元素 
int limit() //返回缓存区的界限(limit)的位置 
Buffer limit(int n)//将设置缓存区界限为n,并返回一个具有新limit的缓存区对象 
Buffer mark() //对缓存区设置标记 
int position() //返回缓存区的当前位置position 
Buffer position(int n)//将设置缓存区的当前位置为n,并返回修改后的缓存区对象 
int remaining() //返回position和limit之间的元素个数 
Buffer reset() //将位置position转到以前设置的mark所在的位置 
Buffer rewind() //将位置设为为0.取消设置的mark

缓存区的数据操作

Buffer  所有子类提供了两个用于数据操作的方法:get() put() 方法获取 Buffer 中的数据
get(): 读取单个字节
get(byte[] dst): 批量读取多个字节到 dst 中
get(int index): 读取指定索引位置的字节(不会移动position)
放入数据到 Buffer 中
put(byte b): 将单个字节写入缓存区的当前位置
put(byte[] src): 将src中的字节写入缓存区的当前位置
put(int index, byte b): 将指定字节写入缓存区的索引位置(不会移动position)

使用缓存区读写数据一般遵循以下四个步骤

  1. 写入数据到缓存区
  2. 调用 buffer.flip() 方法,转换为读取模式
  3. 从缓存区中读取数据
  4. 调用 buffer.clear() 方法或 buffer.compact() 方法清除缓存区

案例演示

public class BufferTest {
    @Test
    public void test01() {
        //1.分配一个缓存区,容量设置为10
        ByteBuffer buffer = ByteBuffer.allocate(10);
        System.out.println(buffer.position()); //0
        System.out.println(buffer.limit());   //10
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");
        //put往缓存区中添加数据
        String name = "java nio";
        buffer.put(name.getBytes());
        System.out.println(buffer.position()); //8
        System.out.println(buffer.limit());    //10
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");
        //3.flip()为将缓存区的界限设置为当前位置,并将当前位置设置为0 可读模式
        buffer.flip();
        System.out.println(buffer.position()); //0
        System.out.println(buffer.limit());    //8
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");
        //4.get数据的获取
        char ch = (char) buffer.get();
        System.out.println(ch);  //j
        System.out.println(buffer.position()); //1
        System.out.println(buffer.limit());    //8
        System.out.println(buffer.capacity()); //10
    }
    @Test
    public void test02() {
        //分配一个缓存区,容量设置为10
        ByteBuffer buffer = ByteBuffer.allocate(10);
        System.out.println(buffer.position()); //0
        System.out.println(buffer.limit());   //10
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");
        //put往缓存区中添加数据
        String name = "java nio";
        buffer.put(name.getBytes());
        System.out.println(buffer.position()); //8
        System.out.println(buffer.limit());    //10
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");
        //clear清除缓存区中的数据
        buffer.clear();
        System.out.println(buffer.position()); //0
        System.out.println(buffer.limit());    //10
        System.out.println(buffer.capacity()); //10
        System.out.println((char)buffer.get()); //j, 数据并没有被恢复,只是恢复了position的位置
        System.out.println("----------------------");
        //定义一个缓存区
        ByteBuffer buf = ByteBuffer.allocate(10);
        String n = "javanio";
        buf.put(n.getBytes());
        buf.flip();
        //读取数据
        byte[] b = new byte[2];
        buf.get(b);
        String rs = new String(b);
        System.out.println(rs); //ja
        System.out.println(buffer.position()); //1
        System.out.println(buffer.limit());    //10
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");
        buf.mark(); //标记此刻这个位置:2
        byte[] b2 = new byte[3];
        buf.get(b2);
        System.out.println(new String(b2)); //van
        System.out.println(buffer.position()); //1
        System.out.println(buffer.limit());    //10
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");
        buf.reset();  //回到标记位置
        if (buf.hasRemaining()) {
            System.out.println(buf.remaining()); //5
        }
    }
}

直接内存与非直接内存


byte buffer 可以是两种类型,一种是基于直接内存(也就是非堆内存),另一种是非直接内存(也就是堆内存)。对于直接内存来说,JVM 将会在 IO 操作上具有更高的性能,因为它是直接作用于本地系统的 IO 操作;而非直接内存,如果要进行 IO 操作,会先从本进程内存复制到直接内存,再利用本地 IO 处理。

从数据流的角度,非直接内存是下面这样的作用链

本地IO---->直接内存---->非直接内存---->直接内存---->本地IO

而直接内存的作用链为

本地IO---->直接内存---->本地IO

在做 IO 处理时,例如通过网络发送大量数据,直接内存具有更高的效率,因为直接内存使用 allocateDirect 创建。虽然它比申请普通的堆内存需要耗费更高的性能,但是这部分的数据是在 JVM 之外的,它不会占用应用的内存。


因此,如果有很大的数据需要缓存,并且数据生命周期很长,那么使用直接内存比较合适。如果不能带来很明显的性能提升,还是推荐直接使用堆内存,字节缓冲区是直接缓冲区还是非字节缓冲区,可以通过调用 isDirect() 方法确定。

@Test
 public void test03() {
     //创建一个非直接内存的缓存区
     ByteBuffer buffer = ByteBuffer.allocate(1024);
     //buffer.isDirect()用于判断是否为直接内存
     System.out.println(buffer.isDirect()); //false
     System.out.println("----------------------");
     //创建一个直接内存的缓存区
     ByteBuffer buffer2 = ByteBuffer.allocateDirect(1024);
     System.out.println(buffer2.isDirect()); //true
 }

直接内存使用场景

  • 数据量很大并且这些数据的生命周期很长
  • 频繁的 IO 操作,例如网络并发场景

🚀3. 通道(Channel)

通道(Channel)由 java.nio.channels 包定义,表示 IO 源与目标打开的连接,它类似于传统的“流”,只不过通道(Channel)本身不能直接访问数据,只能与 Buffer 进行交互。


NIO 的通道类似于流,但有些区别如下


通道可以同时进行读写,而流只能读或者只能写

通道可以实现异步读写数据,流只能同步读或同步写

通道可以从缓存读数据,也可以写数据到缓存

通道(Channel)在 NIO 中是一个接口

public interface Channel extends Closeable()

常用的 Channel 实现类


FileChannel:用于读取、写入、映射和操作文件的通道

DatagramChannel:通过 UDP 读写网络中的数据通道

SocketChannel:通过 TCP 读写网络中的数据通道

ServerSocketChannel:可以监听新建立的 TCP 连接,对每一个新建立的连接都会创建一个 SocketChannel

对于 FileChannel 类,获取通道的一种方式是对支持通道的对象调用 getChannel() 方法,支持通道的类如下


FileInputStream

FileOutputStream

RandomAccessFile

DatagramSocket

Socket

ServerSocket

获取通道的其他方式是使用 Files 类的静态方法 newByteChannel() 获取字节通道或通过通道的静态方法 open() 打开并返回指定通道。


FileChannel 的常用方法

int read(ByteBuffer dst)  从Channel当中读取数据至ByteBuffer
long read(ByteBuffer[] dsts) 将Channel当中的数据“分散”至ByteBuffer[]
int write(ByteBuffer src)   将ByteBuffer当中的数据写入到Channel
long write(ByteBuffer[] srcs) 将ByteBuffer[]当中的数据“聚集”到Channel
long position()   返回此通道的文件位置
FileChannel position(long p)  设置此通道的文件位置
long size()   返回此通道的文件的当前大小
FileChannel truncate(long s)   将此通道的文件截取为给定大小
void force(boolean meteData)   强制将所有对此通道的文件更新写入到存储设备中

案例1-本地文件写数据

public class ChannelTest {
    @Test
    public void write() {
        try {
            //1.字节输出流通向目标文件
            FileOutputStream fos = new FileOutputStream("data01.txt");
            //2.得到字节输出流对应的通道Channel
            FileChannel channel = fos.getChannel();
            //3.分配缓存区
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.put("hello, friends".getBytes());
            //4.把缓存区切换为写模式
            buffer.flip();
            channel.write(buffer);
            channel.close();
            System.out.println("写数据到文件中!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果截图2e718f6de6d94987ae2f2182c528d9c8.png

案例2-本地文件读数据

@Test
public void read() throws Exception {
    //1.定义一个文件字节输入流与源文件接通
    FileInputStream is = new FileInputStream("data01_txt");
    //2.需要得到文件字节输入流的文件通道
    FileChannel channel = is.getChannel();
    //3.定义一个缓存区
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    //4.读取数据到缓存区
    channel.read(buffer);
    buffer.flip(); //归位
    //5.读取出缓存区中的数据并输出即可
    String rs = new String(buffer.array(), 0, buffer.remaining());
    System.out.println(rs);
}

运行结果如下

hello, friends

案例3-使用 Buffer 完成文件复制

@Test
public void copy() throws Exception {
//源文件
File srcFile = new File("C:\\Users\\Desktop\\1.jpg");
File destFile = new File("C:\\Users\\Desktop\\1_copy.jpg");
//得到一个字节输出流、字节输入流
FileInputStream fis = new FileInputStream(srcFile);
FileOutputStream fos = new FileOutputStream(destFile);
//得到文件通道
FileChannel fisChannel = fis.getChannel();
FileChannel fosChannel = fos.getChannel();
//分配缓存区
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
    //必须先清空缓存区然后再写入数据到缓存区
    buffer.clear();
    //开始读取一次数据
    int flag = fisChannel.read(buffer);
    if (flag == -1) {
        break;
    }
    //已经读取了数据,把缓存区的模式切换为可读模式
    buffer.flip();
    fosChannel.write(buffer);
}
fisChannel.close();
fosChannel.close();
System.out.println("复制完成");
}

案例4-分散(Scatter)和聚集(Gatter)

  • 分散读取(Scatter):把 Channel 通道的数据读取到多个缓存区中
  • 聚集写入(Gathering):是指将多个 Buffer 中的数据聚集到 Channel
@Test
public void test() throws Exception {
   //1.字节输入管道
   FileInputStream is = new FileInputStream("data01.txt");
   FileChannel isChannel = is.getChannel();
   //2.字节输出管道
   FileOutputStream os = new FileOutputStream("data02.txt");
   FileChannel osChannel = os.getChannel();
   //3.定义多个缓存区做数据分散
   ByteBuffer buffer1 = ByteBuffer.allocate(4);
   ByteBuffer buffer2 = ByteBuffer.allocate(1024);
   ByteBuffer[] buffers = {buffer1, buffer2};
   //4.从通道中读取数据分散到多个缓存区
   isChannel.read(buffers);
   //5.从每个缓存区中查询是否有数据读取到
   for (ByteBuffer buffer : buffers) {
       buffer.flip(); //切换到读数据模式
       System.out.println(new String(buffer.array(), 0, buffer.remaining()));
   }
   //6.聚集写入到通道
   osChannel.write(buffers);
   isChannel.close();
   osChannel.close();
   System.out.println("文件复制");
}

运行结果截图如下2e718f6de6d94987ae2f2182c528d9c8.png

案例5-从目标通道中去复制原通道数据

@Test
public void test02() throws Exception {
    //1.字节输入通道
    FileInputStream is = new FileInputStream("data01.txt");
    FileChannel isChannel = is.getChannel();
    //2.字节输出管道
    FileOutputStream os = new FileOutputStream("data03.txt");
    FileChannel osChannel = os.getChannel(); //目标通道
    //3.复制数据
    osChannel.transferFrom(isChannel, isChannel.position(), isChannel.size());
    isChannel.close();
    osChannel.close();
    System.out.println("复制完成");
}

运行结果截图如下

2e718f6de6d94987ae2f2182c528d9c8.png案例6-把原通道数据复制到目标通道

@Test
public void test03() throws Exception{
    //1.字节输入通道
    FileInputStream is = new FileInputStream("data01.txt");
    FileChannel isChannel = is.getChannel();
    //2.字节输出管道
    FileOutputStream os = new FileOutputStream("data04.txt");
    FileChannel osChannel = os.getChannel();
    //3.复制数据
    isChannel.transferTo(isChannel.position(), isChannel.size(), osChannel);
    isChannel.close();
    osChannel.close();
    System.out.println("复制完成");
}

运行结果截图如下2e718f6de6d94987ae2f2182c528d9c8.png

🚀4. 选择器(Selector)

选择器(Selector) 是 SelectableChannel 对象的多路复用器,它可以同时监控多个SelectableChannel 的 IO 状况,利用选择器(Selector) 可以使一个单独的线程管理多个通道(Channel).


Selector 是非阻塞 IO 的核心


Java 的 NIO 使用非阻塞的 IO 方式,可以用一个线程处理多个客户端的连接,就会使用到选择器。

选择器能够检测到多个注册的通道,通道上若有事件发生便获取事件然后针对每个事件进行相应的处理,这样便可以只用一个单线程去管理多个通道,即管理多个连接和请求。

只有在连接的通道真正有读写事件发生时,才会进行读写,而不用为每个连接都创建一个线程,不用去维护多个线程避免了多线程之间的上下文切换带来的开销2e718f6de6d94987ae2f2182c528d9c8.png

创建 Selector:通过 Selector.open() 方法创建一个 Selector

Selector selector = Selector.open()

向选择器注册通道SelectableChannel.register(Selector sel, int ops)

 //1.获取通道
 ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2.切换非阻塞模式
 ssChannel.configureBlocking(false);
 //3.绑定连接
 ssChannel.bind(new InetSocketAddress(9898));
 //4.获取选择器
 Selector selector = Selector.open();
 //5.将通道注册到选择器上,并指定"监听接收事件"
 ssChannel.register(selector, SelectionKey.OP_ACCEPT);

当调用 register(Selector sel, mt ops) 向通道注册选择器时,通过第二个参数 ops 指定选择器对通道的监听事件,可以监听的事件类型有


读:SelectionKey.OP_READ(1)

写:SelectionKey.OP_WRITE(4)

连接:SelectionKey.OP_CONNECT(8)

接收:SelectionKey.OP_ACCEPT(16)

若注册时不止监听一个事件,则可以使用“位或”操作符连接

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE

🚀5. NIO 非阻塞式网络通信原理分析

Java NIO 的组件-----选择器(Selector)能够实现一个 IO 线程可以并发处理 N 个客户端连接和读写操作,从根本上解决了传统同步阻塞 IO 的一个连接一个线程的模型在性能上的缺陷问题,提高了架构的性能、弹性伸缩能力和可靠性。2e718f6de6d94987ae2f2182c528d9c8.png

服务端流程

1、当客户端连接服务端时,服务端会通过 ServerSocketChannel 得到 SocketChannel

ServerSocketChannel ssChannel = ServerSocketChannel.open();

2、切换阻塞模式

ss.Channel.configureBlocking(false);

3、绑定连接

ssChannel.bind(new InetSocketAddress(9999));

4、获取选择器

Selector selector = Selector.open();

5、将通道注册到选择器上,并且指定“监听接收事件”

ssChannel.register(seletor, SelectionKey.OP_ACCEPT);

6、轮询式的获取选择器上已经“准备就绪”的事件

//轮询式的获取选择器上已经"准备就绪"的事件
while (selector.select() > 0) {
 System.out.println("轮询");
 //获取当前选择器中所有注册的"选择键(已就绪的监听事件)"
 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
 while (it.hasNext()) {
     //获取准备“就绪”的事件
     SelectionKey sk = it.next();
     //判断具体是什么事件准备就绪
     if (sk.isAcceptable()) {
         //若“接收就绪”,获取客户端连接
         SocketChannel sChannel = ssChannel.accept();
         //切换非阻塞模式
         sChannel.configureBlocking(false);
         //将该通道注册到选择器上
         sChannel.register(selector, SelectionKey.OP_READ);
     } else if (sk.isReadable()) {
         //获取当前选择器上"读就绪"状态的通道
         SocketChannel sChannel = (SocketChannel) sk.channel();
         //读取数据
         ByteBuffer buf = ByteBuffer.allocate(1024);
         int len = 0;
         while ((len = sChannel.read(buf)) > 0) {
             buf.flip();
             System.out.println(new String(buf.array(), 0, len));
             System.out.println();
             buf.clear();
         }
     }
 }
 //取消选择键SelectionKey
 it.remove();

客户端流程

1、获取通道

SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));

2、切换非阻塞模式

sChannel.configureBlocking(false);

3、分配指定大小的缓存区

ByteBuffer buf = ByteBuffer.allocate(1024);

4、发送数据给服务端

Scanner scan = new Scanner(System.in);
while (scan.hasNext()) {
    String str = scan.nextLine();
    buf.put((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(System.currentTimeMillis())
    + "\n" + str).getBytes());
    buf.flip();
    sChannel.write(buf);
    buf.clear();
}
//关闭通道
sChannel.close();

🚀6. NIO 非阻塞式网络通信案例

服务端接收客户端的连接请求,并接收多个客户端发送过来的事件。

客户端代码如下

public class Client {
    public static void main(String[] args) throws IOException {
        //1.获取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
        //2.切换为非阻塞模式
        sChannel.configureBlocking(false);
        //3.分配指定大小缓存区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //4.发送数据给服务端
        Scanner sc = new Scanner(System.in);
        while (true) {
            System.out.print("请输入:");
            String msg = sc.nextLine();
            buffer.put(("华仔仔:"+msg).getBytes());
            buffer.flip();
            sChannel.write(buffer);
            buffer.clear();
        }
    }
}

服务端代码如下

public class Server {
    public static void main(String[] args) throws IOException {
        System.out.println("---------服务端启动-----------");
        //1.获取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        //2.切换为非阻塞模式
        ssChannel.configureBlocking(false);
        //3.绑定连接的端口
        ssChannel.bind(new InetSocketAddress(9999));
        //4.获取选择器
        Selector selector = Selector.open();
        //5.将通道都注册到选择器上去,并且开始指定监听接收事件
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);
        //6.使用Selector选择器轮询已经就绪好的事件
        while (selector.select() > 0) {
            System.out.println("开始一轮事件处理...");
            //7.获取选择器中的所有注册的通道中已经就绪好的事件
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            //8.开始遍历这些准备好的事件
            while (it.hasNext()) {
                //提取当前这个事件
                SelectionKey sk = it.next();
                //9.判断这个事件具体是什么事件
                if (sk.isAcceptable()) {
                    //10.直接获取当前接入的客户端通道
                    SocketChannel sChannel = ssChannel.accept();
                    //11.将客户端通道也设置为非阻塞式的
                    sChannel.configureBlocking(false);
                    //12.将客户端通道也注册到选择器Selector上
                    sChannel.register(selector, SelectionKey.OP_READ);
                } else if (sk.isReadable()) {
                    //13.获取当前选择器上的"读就绪事件"
                    SocketChannel sChannel = (SocketChannel) sk.channel();
                    //14.开始读取数据
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int len = 0;
                    while ((len = sChannel.read(buffer)) > 0) {
                        buffer.flip();
                        System.out.println(new String(buffer.array(), 0, len));
                        buffer.clear();  //清除之前的数据
                    }
                }
                //处理完毕当前事件后,需要移除当前事件,否则会重复处理
                it.remove();
            }
        }
    }
}

运行结果如下

---------服务端启动-----------
开始一轮事件处理...
开始一轮事件处理...
开始一轮事件处理...
开始一轮事件处理...
华仔仔:你好,我是client1
开始一轮事件处理...
华仔仔:你好,我是client2
开始一轮事件处理...
华仔仔:你好,我是client3
--------client1-----------
请输入:你好,我是client1
--------client2-----------
请输入:你好,我是client2
--------client3-----------
请输入:你好,我是client3

🚀7. NIO 网络编程实现群聊系统应用

需求说明

编写一个 NIO 群聊系统,实现客户端与客户端的通信需求(非阻塞)

服务端:可以检测用户上线、离线,并实现消息转发功能

客户端:通过通道(Channel) 可以无阻塞发送信息给其他所有客户端用户,同时可以接受其他客户端用户通过服务端转发的消息

服务端代码实现

public class Server {
    private Selector selector;
    private ServerSocketChannel ssChannel;
    private static final int PORT = 9999;
    //初始化工作
    public Server() {
        try {
            //1.创建选择器
            selector = Selector.open();
            //2.获取通道
            ssChannel = ServerSocketChannel.open();
            //3.切换为非阻塞模式
            ssChannel.configureBlocking(false);
            //4.绑定连接的端口
            ssChannel.bind(new InetSocketAddress(PORT));
            //5.将通道都注册到选择器上去,并且开始指定监听接收事件
            ssChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //监听
    public void listen() {
        try {
            while (selector.select() > 0) {
                //获取选择器中所有注册通道的就绪事件
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                //开始遍历这个事件
                while (it.hasNext()) {
                    //提取这个事件
                    SelectionKey sk = it.next();
                    //判断这个事件
                    if (sk.isAcceptable()) {
                        //客户端接入请求
                        //获取当前客户端通道
                        SocketChannel schannel = ssChannel.accept();
                        //注册成非阻塞模式
                        schannel.configureBlocking(false);
                        //注册给选择器,监听读数据的事件
                        schannel.register(selector, SelectionKey.OP_READ);
                    } else if (sk.isReadable()) {
                        //处理这个客户端的消息接收它,然后实现转发逻辑
                        readClientData(sk);
                    }
                    it.remove(); //处理完毕之后,需要移除当前事件
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //接收当前客户端消息,转发给其他全部客户端通道
    private void readClientData(SelectionKey sk) {
        SocketChannel sChannel = null;
        try {
            //直接得到当前客户端通道
            sChannel = (SocketChannel) sk.channel();
            //创建缓存区对象,开始接收客户端通道的数据
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = sChannel.read(buffer);
            if (count > 0) {
                buffer.flip();
                //提取读取到的信息
                String msg = new String(buffer.array(), 0, buffer.remaining());
                System.out.println("接收到了客户端消息:"+msg);
                //把这个消息推送给全部客户端接收
                sendMsgToAllClient(msg, sChannel);
            }
        } catch (Exception e) {
            try {
                System.out.println("有人离线了:"+sChannel.getRemoteAddress());
                //当前客户端离线
                sk.cancel(); //取消注册
                sChannel.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }
    //把当前客户端的消息推送给当前全部在线注册的channel
    private void sendMsgToAllClient(String msg, SocketChannel sChannel) throws IOException {
        System.out.println("服务端开始转发这个消息,当前处理的线程"+ Thread.currentThread().getName());
        for (SelectionKey key : selector.keys()) {
            Channel channel = key.channel();
            if (channel instanceof SocketChannel && channel != sChannel) {
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                ((SocketChannel) channel).write(buffer);
            }
        }
    }
    public static void main(String[] args) {
        //创建服务端对象
        Server server = new Server();
        //开始监听客户端的各种消息事件:连接、群聊消息、离线消息
        server.listen();
    }
}

客户端代码实现

public class Client {
    private Selector selector;
    private static int PORT = 9999;
    private SocketChannel socketChannel;
    //初始化客户端信息
    public Client() {
        try {
            //创建选择器
            selector = Selector.open();
            //连接服务器
             socketChannel= SocketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
            //设置非阻塞模式
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            System.out.println("当前客户端准备完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private void sendToServer(String s) {
        try {
            socketChannel.write(ByteBuffer.wrap(("华仔说:"+s).getBytes()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private void readInfo() throws IOException {
        while (selector.select() > 0) {
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                if (key.isReadable()) {
                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    sc.read(buffer);
                    System.out.println(new String(buffer.array()).trim());
                    System.out.println("-dsd---------------------");
                }
                it.remove();
            }
        }
    }
    public static void main(String[] args) {
        Client client = new Client();
        //定义一个线程,专门负责监听服务端发送过来的读消息事件
        new Thread(() -> {
            try {
                client.readInfo();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
        //发消息
        Scanner sc = new Scanner(System.in);
        while (sc.hasNextLine()) {
            System.out.println("-----------------------");
            String s = sc.nextLine();
            client.sendToServer(s);
        }
    }
}
相关文章
|
12天前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
24天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
16 1
|
1月前
|
Java
让星星⭐月亮告诉你,Java NIO之Buffer详解 属性capacity/position/limit/mark 方法put(X)/get()/flip()/compact()/clear()
这段代码演示了Java NIO中`ByteBuffer`的基本操作,包括分配、写入、翻转、读取、压缩和清空缓冲区。通过示例展示了`position`、`limit`和`mark`属性的变化过程,帮助理解缓冲区的工作原理。
28 2
|
2月前
|
存储 网络协议 Java
Java NIO 开发
本文介绍了Java NIO(New IO)及其主要组件,包括Channel、Buffer和Selector,并对比了NIO与传统IO的优势。文章详细讲解了FileChannel、SocketChannel、ServerSocketChannel、DatagramChannel及Pipe.SinkChannel和Pipe.SourceChannel等Channel实现类,并提供了示例代码。通过这些示例,读者可以了解如何使用不同类型的通道进行数据读写操作。
Java NIO 开发
|
3月前
|
Java
"揭秘Java IO三大模式:BIO、NIO、AIO背后的秘密!为何AIO成为高并发时代的宠儿,你的选择对了吗?"
【8月更文挑战第19天】在Java的IO编程中,BIO、NIO与AIO代表了三种不同的IO处理机制。BIO采用同步阻塞模型,每个连接需单独线程处理,适用于连接少且稳定的场景。NIO引入了非阻塞性质,利用Channel、Buffer与Selector实现多路复用,提升了效率与吞吐量。AIO则是真正的异步IO,在JDK 7中引入,通过回调或Future机制在IO操作完成后通知应用,适合高并发场景。选择合适的模型对构建高效网络应用至关重要。
83 2
|
3月前
|
网络协议 Java 关系型数据库
16 Java网络编程(计算机网络+网络模型OSI/TCP/IP+通信协议等)
16 Java网络编程(计算机网络+网络模型OSI/TCP/IP+通信协议等)
85 2
|
3月前
|
网络协议 C# 开发者
WPF与Socket编程的完美邂逅:打造流畅网络通信体验——从客户端到服务器端,手把手教你实现基于Socket的实时数据交换
【8月更文挑战第31天】网络通信在现代应用中至关重要,Socket编程作为其实现基础,即便在主要用于桌面应用的Windows Presentation Foundation(WPF)中也发挥着重要作用。本文通过最佳实践,详细介绍如何在WPF应用中利用Socket实现网络通信,包括创建WPF项目、设计用户界面、实现Socket通信逻辑及搭建简单服务器端的全过程。具体步骤涵盖从UI设计到前后端交互的各个环节,并附有详尽示例代码,助力WPF开发者掌握这一关键技术,拓展应用程序的功能与实用性。
115 0
|
4月前
|
安全 Java Linux
(七)Java网络编程-IO模型篇之从BIO、NIO、AIO到内核select、epoll剖析!
IO(Input/Output)方面的基本知识,相信大家都不陌生,毕竟这也是在学习编程基础时就已经接触过的内容,但最初的IO教学大多数是停留在最基本的BIO,而并未对于NIO、AIO、多路复用等的高级内容进行详细讲述,但这些却是大部分高性能技术的底层核心,因此本文则准备围绕着IO知识进行展开。
165 1
|
3月前
|
存储 网络协议 Java
【Netty 神奇之旅】Java NIO 基础全解析:从零开始玩转高效网络编程!
【8月更文挑战第24天】本文介绍了Java NIO,一种非阻塞I/O模型,极大提升了Java应用程序在网络通信中的性能。核心组件包括Buffer、Channel、Selector和SocketChannel。通过示例代码展示了如何使用Java NIO进行服务器与客户端通信。此外,还介绍了基于Java NIO的高性能网络框架Netty,以及如何用Netty构建TCP服务器和客户端。熟悉这些技术和概念对于开发高并发网络应用至关重要。
70 0
|
4月前
|
Java Linux
Java演进问题之1:1线程模型对于I/O密集型任务如何解决
Java演进问题之1:1线程模型对于I/O密集型任务如何解决