网络编程(Channel通信)入门详解(二)

简介: 网络编程(Channel通信)入门详解(二)

Channel(通道)

Buffer类(缓冲区)

java.nio.Buffer(抽象类):用于特定原始类型(基本类型)的数据的容器。Channel进行通信时,底层全部使用Buffer。

它的几个子类:

  • ByteBuffer:里面可以封装一个byte[]数组。【重点掌握】
  • ShortBuffer:里面可以封装一个short[]数组。
  • CharBuffer:里面可以封装一个char[]数组
  • IntBuffer:里面可以封装一个int[]数组。
  • LongBuffer:里面可以封装一个long[]数组。
  • FloatBuffer:里面可以封装一个float[]数组。
  • DoubleBuffer:里面可以封装一个double[]数组。
  • 没有boolean类型对应的Buffer


属性

  • capacity(容量):Buffer所能够包含的元素的最大数量。定义了Buffer后,容量是不可变的。

    int capacity()        //返回此缓冲区的容量
  • limit(限制):表示如果设置“限制为某一个位置,那么此位置及其后面的位置将不可用”。

    int limit()                    // 获取此缓冲区的限制
    Buffer limit(int newLimit)    // 设置此缓冲区的限制
  • position(位置):当前可写入的索引。位置不能小于0,并且不能大于"限制"。

    int position()                // 获取当前可写入位置索引
    Buffer position(int p)        // 更改当前可写入位置索引
  • mark(标记):当调用缓冲区的reset()方法时,会将缓冲区的position位置重置为该mark设置的索引。

    ​ 不能小于0,不能大于position。

    Buffer mark()         // 在此缓冲区的当前位置(索引)设置标记
    Buffer reset()         // 将此缓冲区的位置重置为以前标记的位置
                        /* 当我们调用缓冲区的reset方法时,会将缓冲区的position索引位置重置为mark标记的位置 */


创建ByteBuffer

java.nio.ByteBuffer:字节缓冲区,里边封装了一个byte类型的数组

创建对象的方式:使用静态方法

static ByteBuffer allocate(int capacity)        // 使用一个“容量”来创建一个“间接字节缓存区”——程序的“堆”空间中创建。
static ByteBuffer allocateDirect(int capacity)    // 使用一个“容量”来创建一个“直接字节缓存区”——系统内存。
                                                // 可以直接和系统交互,效率高
static ByteBuffer wrap(byte[] byteArray)        // 使用一个“byte[]数组”创建一个“间接字节缓存区”。


ByteBuffer成员方法

/* 向ByteBuffer添加数据 */
ByteBuffer put(byte b)                // 向当前可用位置添加数据,一次添加一个字节
ByteBuffer put(byte[] byteArray)    // 向当前可用位置添加一个byte[]数组
ByteBuffer put(byte[] byteArray, int offset, int len)        // 添加一个byte[]数组的一部分
    /* 参数:int offset:数组的开始索引,从哪个索引开始添加
            int len:添加个数
    */
ByteBuffer put(int index, byte b)     // 往指定索引处添加一个byte字节(替换)

byte[] array()        // 获取此缓冲区的 byte 数组

boolean isReadOnly()    // 获取当前缓冲区是否只读
boolean isDirect()        // 获取当前缓冲区是否为直接缓冲区
int remaining()            // 获取position与limit之间的元素数量
Buffer clear()            // 还原缓冲区的状态。
    /*     将position设置为0
        将限制limit设置为容量capacity
        丢弃标记mark
    */
Buffer flip()        // 缩小limit的范围。获取读取的有效数据0到position之间的数据
    /*     将limit设置为当前position位置; [0, 1, 2, 0, 0, 0, 0, 0, 0, 0]  position=3 limit=10
        将当前position位置设置为0;   position=0 limit=3  new String(bytes, 0, len)
        丢弃标记
    */

示例:

import java.nio.ByteBuffer;
import java.util.Arrays;

public class Demo02put {
    public static void main(String[] args) {
        //创建一个长度为10的ByteBuffer ==> 包含了一个长度为10的数组,默认值:{0,0,0,..0}
        ByteBuffer buffer = ByteBuffer.allocate(10);
        System.out.println(buffer);        //java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]

        //- byte[] array()获取此缓冲区的 byte 数组
        byte[] arr = buffer.array();
        System.out.println(Arrays.toString(arr));//[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

        //- public ByteBuffer put(byte b):向当前可用位置添加数据,一次添加一个字节
        buffer.put((byte)1);    //1默认是int类型,put方法的参数需要byte类型,需要强转
        byte b1 = 2;
        buffer.put(b1);
        System.out.println(Arrays.toString(arr));//[1, 2, 0, 0, 0, 0, 0, 0, 0, 0]

        //- public ByteBuffer put(byte[] byteArray):向当前可用位置添加一个byte[]数组
        byte[] bytes = {10,20,30,40,50};
        buffer.put(bytes);
        System.out.println(Arrays.toString(arr));//[1, 2, 10, 20, 30, 40, 50, 0, 0, 0]

        /*
            - public ByteBuffer put(byte[] byteArray,int offset,int len):添加一个byte[]数组的一部分
            int offset:数组的开始索引,从哪个索引开始添加
            int len:添加个数
         */
        buffer.put(bytes,3,2);//40,50
        System.out.println(Arrays.toString(arr));//[1, 2, 10, 20, 30, 40, 50, 40, 50, 0]

        //ByteBuffer put(int index, byte b) 往指定索引处添加一个byte字节(替换)
        buffer.put(1,(byte)88);
        System.out.println(Arrays.toString(arr));//[1, 88, 10, 20, 30, 40, 50, 40, 50, 0]
        
        
        //创建一个长度为10的ByteBuffer
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put((byte)0);
        buffer.put((byte)1);
        buffer.put((byte)2);
        System.out.println(Arrays.toString(buffer.array()));//[0, 1, 2, 0(positon), 0, 0, 0, 0, 0, 0(limit 10)]

        //public int remaining():获取position与limit之间的元素数量。
        System.out.println(buffer.remaining());//7[3-9]

        //public boolean isReadOnly():获取当前缓冲区是否只读。
        System.out.println(buffer.isReadOnly());//false:既能读,有能写 true:只能读,不能写(只读)

        //public boolean isDirect():获取当前缓冲区是否为直接缓冲区。
        System.out.println(buffer.isDirect());//false:间接字节缓冲区(堆) true:直接字节缓冲区(系统)
        System.out.println(ByteBuffer.allocateDirect(10).isDirect());//true

        buffer.limit(5);//设置限制为5
        System.out.println("位置:"+buffer.position()+" 限制:"+buffer.limit());//位置:3 限制:5

        /*
            public Buffer clear():还原缓冲区的状态。
            - 将position设置为:0
            - 将限制limit设置为容量capacity;
            - 丢弃标记mark。
         */
        //buffer.clear();
        //System.out.println("位置:"+buffer.position()+" 限制:"+buffer.limit());//位置:0 限制:10
        //System.out.println(Arrays.toString(buffer.array()));//[0, 1, 2, 0, 0, 0, 0, 0, 0, 0]

        /*
            public Buffer flip():缩小limit的范围。 获取读取的有效数据:0到position之间的数据
            - 将limit设置为当前position位置; [0, 1, 2, 0, 0, 0, 0, 0, 0, 0]  position=3 limit=10
            - 将当前position位置设置为0;   position=0 limit=3  new String(bytes,0,len)
            - 丢弃标记。
         */
        buffer.flip();
        System.out.println("位置:"+buffer.position()+" 限制:"+buffer.limit());//位置:0 限制:3
    }
}

Channel 概述

1).java.nio.channels.Channel(接口):用于 I/O 操作的连接。

  • 表示:通道。
  • 可以是“文件通道-FileChannel”、“网络通道-SocketChannel和ServerSockecChannel”。
  • 它类似于IO流,但比IO流更强大。read(byte[]) write(byte[])
  • IO流是“单向”的,Channel是“双向的”。

2).Channel全部使用Buffer实现读、写。read(ByteBuffer) write(ByteBuffer)


BIO / FileChannel(文件通道)

java.nio.channels.FileChannel:用于读取、写入、映射和操作文件的通道。

​ 类似IO流,用于读取文件,写入文件,复制文件

FileChannel是一个抽象类,无法直接创建对象,获取对象的方式:

/* 使用字节输入流(FileInputStream)中的方法获取读取文件的FileChannel */
FileChannel getChannel()     // 返回与此文件输入流有关的唯一 FileChannel 对象
/* 使用字节输出流(FileOutputStream)中的方法获取写入文件的FileChannel */
FileChannel getChannel()     // 返回与此文件输出流有关的唯一 FileChannel 对象

FileChannel的成员方法:

int read(ByteBuffer dst)      // 读取多个字节存储到ByteBuffer中,相当于FileInputStream中的read(byte[])
int write(ByteBuffer src)      // 将ByteBuffer中的数据写入到文件中,相当于FileOutputStream中的write(byte[])

示例:

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class Demo01FileChannel {
    public static void main(String[] args) throws IOException {
        //1.创建FileInputStream对象,构造方法中绑定要读取的数据源
        FileInputStream fis = new FileInputStream("c:\\1.jpg");
        //2.创建FileOutputStream对象,构造方法中绑定要写入的目的地
        FileOutputStream fos = new FileOutputStream("d:\\1.jpg");
        //3.使用FileInputStream对象中的方法getChannel,获取读取文件的FileChannel对象
        FileChannel fisChannel = fis.getChannel();
        //4.使用FileOutputStream对象中的方法getChannel,获取写入文件的FIleChannel对象
        FileChannel fosChannel = fos.getChannel();
        //一读一写复制文件
        //5.使用读取文件的FileChannel对象中的方法read,读取文件 int read(ByteBuffer dst)
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int len = 0;
        while ((len = fisChannel.read(buffer)) != -1){
            //6.使用写入文件的FIleChannel对象中的方法write,把读取到的数据写入到文件中 int write(ByteBuffer src)
            //使用flip方法缩小limit的范围:最后一次读取的不一定是1024个字节
            System.out.println("flip前:position的位置:" + buffer.position() + ",limit:" + buffer.limit());
            buffer.flip();
            System.out.println("flip后:position的位置:" + buffer.position() + ",limit:" + buffer.limit());
            //write方法是从position(0)开始往目的地写数据,写到limit(有效字节),写完之后positon会变的
            System.out.println("write方法写数据:从" + buffer.position() + "写到:" + buffer.limit());
            fosChannel.write(buffer);//position(0)-limit(有效字节)之间的数据
            //初始化ByteBuffer的状态
            System.out.println("clear前:position的位置:" + buffer.position() + ",limit:" + buffer.limit());
            buffer.clear();//将position设置为0,将limit设置为容量(1024)
            System.out.println("clear后:position的位置:" + buffer.position()+",limit:" + buffer.limit());
            System.out.println("-----------------------------------------------");
        }
        //7.释放资源
        fosChannel.close();
        fisChannel.close();
        fos.close();
        fis.close();
    }
}


MappedByteBuffer:高效读写

java.io.RandomAccessFile类:可以设置读、写模式的IO流类

构造方法:

public RandomAccessFile(String name, String mode)
/* 参数:
        String name;要读取的数据源,或者写入的目的地
        String mode:设置流的读写模式
            "r":只读,必须是小写
            "rw":读写,必须是小写
*/

使用 RandomAccessFile 类中的方法 getChannel 获取 FileChannel

FileChannel getChannel()     // 返回与此文件关联的唯一 FileChannel 对象

使用 FileChannel 类中的方法 map 获取 MappedByteBuffer

MappedByteBuffer map(FileChannel.MapMode mode, long position, long size)  // 将此通道的文件区域直接映射到内存中
/* 参数:
        FileChannel.MapMode mode:设置读写的模式
            READ_ONLY:只读映射模式
            READ_WRITE:读取/写入映射模式
       long position:文件中的位置,映射区域从此位置开始,一般都是从0开始
       long size:要映射的区域大小,就是要复制文件的大小,单位字节
*/

java.nio.MappedByteBuffer:它可以创建“直接缓存区”,将文件的磁盘数据映射到内存。

​ 系统内存中,和系统直接交互数据,速度快,效率高

注意:

  • 它最大可以映射:Integer.MAX_VALUE个字节(2G)左右,直接复制的文件不能超过2G,超过了就需要分块复制。
  • 磁盘和内存实时映射:内存中直接缓冲区中的文件改变,映射的硬盘中的文件也会实时改变。效率高(内存和内存进行读写)

MappedByteBuffer中的成员方法:

byte get(int index)                  // 获取缓冲区中指定索引处的字节
ByteBuffer put(int index, byte b)    // 把字节写入到指定的索引处


示例:使用FileChannel结合MappedByteBuffer实现高效读写复制2g以上的文件

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class Demo03FileChannel {
    public static void main(String[] args) throws IOException {
        long s = System.currentTimeMillis();
        //1.创建读取文件的RandomAccessFile对象,构造方法中封装要读取的数据源和设置只读模式("r")
        RandomAccessFile inRAF = new RandomAccessFile("c:\\2g.rar", "r");
        //2.创建写入文件的RandomAccessFile对象,构造方法中封装要写入的目的地和设置读写模式("rw")
        RandomAccessFile outRAF = new RandomAccessFile("d:\\2g.rar", "rw");
        //3.使用读取文件的RandomAccessFile对象中的方法getChannel,获取读取文件的FileChannel对象
        FileChannel inRAFChannel = inRAF.getChannel();
        //4.使用写入文件的RandomAccessFile对象中的方法getChannel,获取读取写入的FileChannel对象
        FileChannel outRAFChannel = outRAF.getChannel();
        //5.使用读取文件的FileChannel对象中的方法size,获取读取文件的大小(单位字节)
        long size = inRAFChannel.size();
        System.out.println(size);//2355126731 字节

        //定义复制文件需要使用的变量
        long count = 1;  //复制文件的块数,默认值是1
        long startIndex = 0;  //每次复制每块文件的开始索引
        long everySize = 512*1024*1024;  //分块,每块的大小 512M
        long copySize = size;  //每次复制文件的大小,默认等于文件的总大小

        //判断要复制的文件大小是否大于每块文件的大小
        if(size > everySize){
            //复制的文件大于512M,进行分块
            //计算复制文件可以分成几块  2242.56M/512M
            count = size%everySize==0 ? size/everySize : size/everySize+1;
            System.out.println("文件的大小:" + size + "字节,可以分成:" + count + "块");

            //第一次复制文件的大小等于每块的大小
            copySize = everySize;
        }

        //定义一个for循环,分几块就循环复制几次
        for (int i = 0; i < count; i++) {
            //6.使用读取文件的FileChannel对象中的方法map,创建读取文件的直接缓冲区MappedByteBuffer对象
            MappedByteBuffer inMap = inRAFChannel.map(FileChannel.MapMode.READ_ONLY, startIndex, copySize);
            //7.使用写入文件的FileChannel对象中的方法map,创建写入文件的直接缓冲区MappedByteBuffer对象
            MappedByteBuffer outMap = outRAFChannel.map(
                FileChannel.MapMode.READ_WRITE, startIndex, copySize);
            System.out.println("每块文件的开始复制的索引:" + startIndex);
            System.out.println("每块文件的大小:" + copySize + "字节");
            System.out.println("--------------------------------------------");
            //8.创建for循环,循环size次
            for (int j = 0; j < copySize; j++) {
                //9.使用取文件的直接缓冲区MappedByteBuffer对象中的方法get,读取数据源指定索引处的文件
                byte b = inMap.get(j);
                //10.使用写入文件的直接缓冲区MappedByteBuffer对象中的方法put,把读取到的字节写入到目的地指定的索引处
                outMap.put(j, b);
            }

            //复制完每块文件,重新计算startIndex和copySize的大小
            startIndex += copySize;
            copySize = size-startIndex>everySize ? everySize : size-startIndex;
        }

        //11.释放资源
        outRAFChannel.close();
        inRAFChannel.close();
        outRAF.close();
        inRAF.close();
        long e = System.currentTimeMillis();
        System.out.println("复制文件共耗时:"+(e-s)+"毫秒!");//复制文件共耗时:4912毫秒!
    }
}


NIO / SocketChannel(网络通道)

服务器端

相关的类:java.nio.channels.ServerSocketChannel:用于面向流的侦听套接字的可选通道。

获取对象的方式:使用静态方法open

static ServerSocketChannel open()         // 打开服务器插槽通道

成员方法:

ServerSocketChannel bind(SocketAddress local)         // 给服务器绑定指定的端口号
SocketChannel accept()         // 监听客户端的请求
SelectableChannel configureBlocking(boolean block)         // 设置服务器的阻塞模式 true:阻塞(不写默认) false:非阻塞

客户端

相关的类:java.nio.channels.SocketChannel:用于面向流的连接插座的可选通道。

获取对象的方法:使用静态方法open

static SocketChannel open()         // 打开套接字通道

成员方法:

boolean connect(SocketAddress remote)         // 根据服务器的ip地址和端口号连接服务器
/* 参数:
        SocketAddress remote:封装服务器的ip地址和端口号,用的时候直接new
            返回值:boolean
                连接服务器成功:true
                连接服务器失败:false
*/
SelectableChannel configureBlocking(boolean block)     // 设置客户端的阻塞模式。true:阻塞(不写默认) false:非阻塞
/*
    1.客户端设置为阻塞模式:connect方法会多次尝试连接服务器
        connect连接成功服务器,返回true
        connect连接服务器失败,会抛出连接异常ConnectException: Connection refused: connect
    2.客户端设置为非阻塞模式:connect方法只会连接一次服务器
        connect方法无论连接是否成功还是失败都会false
        所以客户端设置为非阻塞模式没有意义,结束不了轮询
*/
    
int write(ByteBuffer src)         // 给服务器发送数据
int read(ByteBuffer dst)         // 读取服务器回写的数据


示例

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/*
    实现同步非阻塞的服务器:轮询监听客户端的请求
 */
public class NIOTCPServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        //1.使用open方法获取ServerSocketChannel对象
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //2.使用ServerSocketChannel对象中的方法bind给服务器绑定指定的端口号
        serverSocketChannel.bind(new InetSocketAddress(8888));

        //SelectableChannel configureBlocking(boolean block) 设置服务器的阻塞模式 true:阻塞(不写默认) false:非阻塞
        serverSocketChannel.configureBlocking(false);

        //轮询监听客户端的请求 ==> 死循环一直执行,监听客户端
        while (true){
            //3.使用ServerSocketChannel对象中的方法accept监听客户端的请求
            System.out.println("服务器等待客户端的连接...");
            SocketChannel socketChannel = serverSocketChannel.accept();//accpet:非阻塞 不会等待客户端请求

            //对客户端SocketChannel对象进行一个非空判断,没有客户端连接服务器,accpet方法返回null
            if(socketChannel != null){
                System.out.println("有客户端连接服务器,服务器读取客户端发送的数据,给客户端回写数据...");

                //int read(ByteBuffer dst) 读取客户端发送的数据
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int len = socketChannel.read(buffer);
                String msg = new String(buffer.array(), 0, len);
                System.out.println("服务器读取客户端发送的数据:" + msg);

                //int write(ByteBuffer src) 服务器给客户端发送数据
                socketChannel.write(ByteBuffer.wrap("收到,谢谢".getBytes()));

                System.out.println("服务器读写数据完成,结束轮询...");
                break;//结束轮询
            }else{
                System.out.println("没有客户端连接服务器,休息2秒钟,干点其他事情,在继续下一次轮询监听客户端连接...");
                Thread.sleep(2000);
            }
        }

        //释放资源
        serverSocketChannel.close();
    }
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/*
    客户端:轮询连接服务器。客户端轮询连接服务器成功,给服务器发送数据,读取服务器回写的数据
 */
public class NIOTCPClient {
    public static void main(String[] args) {
        //客户端轮询连接服务器,连接成功,结束轮询
        while (true){
            try {
                //1.使用open方法获取客户端SocketChannel对象
                SocketChannel socketChannel = SocketChannel.open();
                System.out.println("客户端开始连接服务器...");

                //2.使用SocketChannel对象中的方法connect根据服务器的ip地址和端口号连接服务器
                boolean b = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
                System.out.println(b);
                System.out.println("客户端连接服务器成功,给服务器发送数据,读取服务器回写的数据...");
                //int write(ByteBuffer src) 给服务器发送数据
                ByteBuffer buffer = ByteBuffer.wrap("你好服务器".getBytes());
                System.out.println("容量:"+buffer.capacity());
                System.out.println("索引:"+buffer.position());
                System.out.println("限定:"+buffer.limit());
                socketChannel.write(buffer);

                //int read(ByteBuffer dst) 读取服务器回写的数据
                ByteBuffer buffer2 = ByteBuffer.allocate(1024);
                int len = socketChannel.read(buffer2);
                //len:读取的有效字节个数
                //System.out.println("客户端读取服务器发送的数据:" + new String(buffer2.array(), 0, len));

                buffer2.flip();//缩小limit的范围: position=0 limit=position(读取的有效字节个数)
                System.out.println("客户端读取服务器发送的数据:"+new String(buffer2.array(),0,buffer2.limit()));

                System.out.println("客户端读写数据完毕,结束轮询...");
                break;
            } catch (IOException e) {
                System.out.println("客户端connect方法连接服务器失败,休息2秒钟,干点其他事情,在继续下一次连接服务器...");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
}


多路复用 / 选择器:Selector

多路复用的概念

选择器Selector是NIO中的重要技术之一。它与SelectableChannel联合使用实现了非阻塞的多路复用。使用它可以节省CPU资源,提高程序的运行效率。

"多路"是指:服务器端同时监听多个“端口”的情况。每个端口都要监听多个客户端的连接。

  • 服务器端的非多路复用效果

    11.png

如果不使用“多路复用”,服务器端需要开很多线程处理每个端口的请求。如果在高并发环境下,造成系统性能下降。

  • 服务器端的多路复用效果

    12.png

使用了多路复用,只需要一个线程就可以处理多个通道,降低内存占用率,减少CPU切换时间,在高并发、高频段业务环境下有非常重要的优势


选择器Selector_服务器端实现多路注册

相关的类: java.nio.channels.Selector:SelectableChannel对象的多路复用器。

获取Selector对象的方式:

static Selector open()         // 打开选择器

注册Channel(服务器通道)到Selector上:使用ServerSocketChannel中的方法

SelectionKey register(Selector sel, int ops)         // 使用给定的选择器注册此频道,返回一个选择键。
/* 参数:
       Selector sel:传递要注册的选择器对象
       int ops:传递对应的事件
           使用SelectionKey中的常量:SelectionKey.OP_ACCEPT(固定写法,把服务器通道注册到选择器上)
           OP_ACCEPT:监听客户端件事
*/


选择器Selector_常用方法

// 返回一个Set<SelectionKey>集合,表示:已注册通道的集合。每个已注册通道封装为一个SelectionKey对象
Set<SelectionKey> keys()

// 返回一个Set<SelectionKey>集合,表示:当前已连接的通道的集合。每个已连接通道同一封装为一个SelectionKey对象
Set<SelectionKey> selectedKeys()
    
int select()    // 会阻塞,直到至少有1个客户端连接。返回一个int值,表示有几个客户端连接了服务器。


示例:多路信息接收

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/*
    服务器
 */
public class TCPServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        //1.创建3个ServerSocketChannel服务器对象
        ServerSocketChannel channel01 = ServerSocketChannel.open();
        ServerSocketChannel channel02 = ServerSocketChannel.open();
        ServerSocketChannel channel03 = ServerSocketChannel.open();
        //2.分别给3个ServerSocketChannel服务器对象绑定不同的端口号
        channel01.bind(new InetSocketAddress(7777));
        channel02.bind(new InetSocketAddress(8888));
        channel03.bind(new InetSocketAddress(9999));
        //3.设置3个ServerSocketChannel服务器对象为非阻塞模式(只要使用Selector选择器,服务器必须是非阻塞的)
        channel01.configureBlocking(false);
        channel02.configureBlocking(false);
        channel03.configureBlocking(false);
        //4.获取Selector对象
        Selector selector = Selector.open();
        //5.使用服务器ServerSocketChannel对象中的方法register,把3个服务器通道注册到选择器Selector上
        channel01.register(selector, SelectionKey.OP_ACCEPT);
        channel02.register(selector, SelectionKey.OP_ACCEPT);
        channel03.register(selector, SelectionKey.OP_ACCEPT);

        //Selector的keys()方法 此方法返回一个Set<SelectionKey>集合,表示:已注册通道的集合。每个已注册通道封装为一个SelectionKey对象。
        Set<SelectionKey> keys = selector.keys();
        System.out.println("已注册服务器通道的数量:" + keys.size());    //3

        //服务器轮询监听客户端的请求
        while (true){
            //Selector的select()方法:获取客户端连接的数量,没有客户端连接服务器,此方法会一直阻塞
            int select = selector.select();
            System.out.println("连接服务器的客户端数量:" + select);

            //Selector的selectedKeys()方法:获取当前已经连接的通道集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            System.out.println("已经到服务器的通道数量:" + selectionKeys.size());

            //处理Selector监听到客户端的请求事件:遍历Set集合,获取每一个SelectionKey对象
            Iterator<SelectionKey> it = selectionKeys.iterator();
            while (it.hasNext()){
                SelectionKey selectionKey = it.next();
                //获取SelectionKey里边封装的服务器ServerSocketChannel对象
                ServerSocketChannel channel = (ServerSocketChannel)selectionKey.channel();
                System.out.println("获取当前通道ServerSocketChannel监听的端口号:" + channel.getLocalAddress());
                //处理监听的accept事件==>获取请求服务器的客户单SocketChannel对象
                SocketChannel socketChannel = channel.accept();
                //读取客户端SocketChannel发送的数据
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int len = socketChannel.read(buffer);
                System.out.println("服务器读取到客户端发送的数据:" + new String(buffer.array(), 0, len));

                //处理完SelectionKey监听到的事件,要在Set集合中移除已经处理完的SelectionKey对象
                it.remove();//使用迭代器对象移除集合中的元素,不会抛出并发修改异常(移除的就是it.next()方法取出的对象)
            }

            //获取完一个客户端的连接,睡眠2秒,在进行下一次轮询
            Thread.sleep(2000);
        }
    }
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/*
    开启三个线程,每个线程分别创建一个客户端对象,连接服务器的三个端口
 */
public class TCPClient {
    public static void main(String[] args) {
        new Thread(()->{
            //创建客户端对象,轮询连接服务器
            while (true){
                try(SocketChannel socketChannel = SocketChannel.open();) {
                    System.out.println("客户端开始连接7777端口...");
                    socketChannel.connect(new InetSocketAddress("127.0.0.1",7777));

                    System.out.println("客户端连接7777端口成功,给服务器发送数据");
                    socketChannel.write(ByteBuffer.wrap("你好服务器,我是连接7777端口号的客户端对象!".getBytes()));

                    System.out.println("客户端7777发送数据完毕,结束轮询...");
                    break;
                } catch (IOException e) {
                    System.out.println("客户端连接7777端口异常");
                }
            }
        }).start();

        new Thread(()->{
            //创建客户端对象,轮询连接服务器
            while (true){
                try(SocketChannel socketChannel = SocketChannel.open();) {
                    System.out.println("客户端开始连接8888端口...");
                    socketChannel.connect(new InetSocketAddress("127.0.0.1",8888));

                    System.out.println("客户端连接8888端口成功,给服务器发送数据");
                    socketChannel.write(ByteBuffer.wrap("你好服务器,我是连接8888端口号的客户端对象!".getBytes()));

                    System.out.println("客户端8888发送数据完毕,结束轮询...");
                    break;
                } catch (IOException e) {
                    System.out.println("客户端连接8888端口异常");
                }
            }
        }).start();

        new Thread(()->{
            //创建客户端对象,轮询连接服务器
            while (true){
                try(SocketChannel socketChannel = SocketChannel.open();) {
                    System.out.println("客户端开始连接9999端口...");
                    socketChannel.connect(new InetSocketAddress("127.0.0.1",9999));

                    System.out.println("客户端连接9999端口成功,给服务器发送数据");
                    socketChannel.write(ByteBuffer.wrap("你好服务器,我是连接9999端口号的客户端对象!".getBytes()));

                    System.out.println("客户端9999发送数据完毕,结束轮询...");
                    break;
                } catch (IOException e) {
                    System.out.println("客户端连接9999端口异常");
                }
            }
        }).start();
    }
}


AIO / 异步通道:AsynchronousServerSocketChannel

概述

服务器端:java.nio.channels.AsynchronousServerSocketChannel:用于面向流的侦听套接字的异步通道。

获取对象的方法:

static AsynchronousServerSocketChannel open()     // 打开异步服务器套接字通道

成员方法:

AsynchronousServerSocketChannel bind(SocketAddress local)     // 给服务器绑定指定的端口号
void accept(A attachment, CompletionHandler<?> handler)     // 监听客户端的请求,默认就是非阻塞的
/* 参数:
        A attachment:附件,可以传递null
        CompletionHandler<?> handler:事件处理的接口,用于处理accept方法监听到的事件
*/
    
Future<Integer> write(ByteBuffer src)             // 给客户端发送数据。阻塞方法,会一直等待客户端发送数据
// 读取客户端发送的数据。非阻塞的读取方法
Future<Integer> read(ByteBuffer dst)
void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<?> handler)
/* 参数:
        ByteBuffer dst: 用来存储读取到的数据
        long timeout: 完成I / O操作的最长时间
        TimeUnit unit: timeout参数的时间单位(TimeUnit.SECONDS:秒)
        A attachment: 要附加到I / O操作的对象; 可以是null
        CompletionHandler<?> handler: 消费结果的处理程序,是一个回调函数
*/

java.nio.channels.CompletionHandler<V,A>接口:用于消除异步I / O操作结果的处理程序。

CompletionHandler 也叫回调函数,accept或read方法执行成功,服务器就会自动执行这个回调函数,来读取客户端发送的数据

接口中的方法:

void completed(V result, A attachment)         // 主方法执行成功,执行的方法
void failed(Throwable exc, A attachment)     // 主方法执行失败,执行的方法


客户端: java.nio.channels.AsynchronousSocketChannel:用于面向流的连接插座的异步通道。

获取对象的方法:

static AsynchronousSocketChannel open()         // 打开异步套接字通道

成员方法:

Future<Void> connect(SocketAddress remote)         // 连接服务器的方法,参数传递服务器的ip地址和端口号
/* 注意:
        connect是一个非阻塞的方法,不会等待方法运行完毕,连接服务器成功在执行下边的代码
        客户端连接服务器需要时间的,如果没有连接成功,就给服务器使用write方法发送数据,会抛出异常
*/

Future<Integer> write(ByteBuffer src)             // 给服务器发送数据
Future<Integer> read(ByteBuffer dst)             // 读取服务器发送的数据

java.util.concurrent.Future接口

接口中的方法:

boolean isDone()             // 如果此任务完成,则返回 true
/*    返回true:连接服务器成功
    返回false:还没有连接上服务器(客户端连接服务器是需要时间的)
*/


示例:AIO异步连接,异步阻塞读写

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/*
    服务器端
*/
public class AIOTCPServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        //1.获取异步非阻塞的服务器AsynchronousServerSocketChannel对象
        AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
        //2.使用bind方法给AsynchronousServerSocketChannel对象绑定指定的端口号
        serverSocketChannel.bind(new InetSocketAddress(8888));
        //3.使用AsynchronousServerSocketChannel对象中的方法accept监听客户端的请求
        System.out.println("accept方法开始执行了...");
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {
                System.out.println("客户端连接服务器成功...");
                //读取客户端发送的数据  Future<Integer> read(ByteBuffer dst)
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //read方法是一个阻塞的方法,会一直等待客户端发送数据
                Future<Integer> future = result.read(buffer);
                Integer len = 0;
                try {
                    len = future.get();//获取客户端发送数据长度
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                System.out.println("服务器读取客户端发送的数据:"+new String(buffer.array(),0,len));
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("客户端连接服务器失败...");
            }
        });
        System.out.println("accept方法执行结束了...");
        //accept方法是一个非阻塞的方法,我们执行完accept方法,可以去做其他的事情
        //死循环的目的不让程序停止(工作中:写一些具体的需求);当有客户端请求服务器,会自动执行回调函数中的方法
        while (true){
            System.out.println("正在忙其他的事情!");
            Thread.sleep(2000);
        }
    }
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;

/*
    客户端
 */
public class AIOTCPClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        //1.创建异步非阻塞的客户端AsynchronousSocketChannel对象
        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
        //2.使用AsynchronousSocketChannel对象中的方法connect连接服务器
        Future<Void> future = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
        System.out.println(future.isDone());    //false:还未连接上服务器
        System.out.println(111);
        //休眠5秒钟,等待客户端连接服务器成功,再给服务器发送数据
        Thread.sleep(5000);
        System.out.println(future.isDone());    //true:已经连接服务器成功
        if(future.isDone()){
            socketChannel.write(ByteBuffer.wrap("你好服务器".getBytes()));
        }
        //释放资源
        socketChannel.close();
    }
}


示例:AIO异步连接,异步非阻塞读写

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/*
    服务器端
*/
public class AIOTCPServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        //1.获取异步非阻塞的服务器AsynchronousServerSocketChannel对象
        AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
        //2.使用bind方法给AsynchronousServerSocketChannel对象绑定指定的端口号
        serverSocketChannel.bind(new InetSocketAddress(8888));
        //3.使用AsynchronousServerSocketChannel对象中的方法accept监听客户端的请求
        System.out.println("accept方法开始执行了...");
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {
                System.out.println("客户端连接服务器成功...");
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //当客户端连接服务器成功,read方法只会等待客户端10秒钟,10秒钟发送了数据,执行completed方法;10秒钟之后还没有发送数据,执行failed
                result.read(buffer, 10, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
                    @Override
                    public void completed(Integer result, Object attachment) {
                        System.out.println("服务器读取客户端发送数据成功,执行completed方法");
                        //服务器读取客户端发送的数据
                        String msg = new String(buffer.array(), 0, result);//把读取到的有效数据,转换为字符串
                        System.out.println("服务器读取客户端发送的数据为:"+msg);
                    }

                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("服务器读取客户端发送数据失败,执行failed方法");
                    }
                });

            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("客户端连接服务器失败...");
            }
        });
        System.out.println("accept方法执行结束了...");
        //accept方法是一个非阻塞的方法,我们执行完accept方法,可以去做其他的事情
        //死循环的目的不让程序停止(工作中:写一些具体的需求);当有客户端请求服务器,会自动执行回调函数中的方法
        while (true){
            System.out.println("正在忙其他的事情!");
            Thread.sleep(2000);
        }
    }
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;

/*
    客户端
 */
public class AIOTCPClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        //1.创建异步非阻塞的客户端AsynchronousSocketChannel对象
        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
        //2.使用AsynchronousSocketChannel对象中的方法connect连接服务器
        Future<Void> future = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
        System.out.println(future.isDone());//false:还未连接上服务器
        System.out.println(111);
        //休眠5秒钟,等待客户端连接服务器成功,再给服务器发送数据
        Thread.sleep(15000);
        System.out.println(future.isDone());//true:已经连接服务器成功
        if(future.isDone()){
            socketChannel.write(ByteBuffer.wrap("你好服务器".getBytes()));
        }
        //释放资源
        socketChannel.close();
    }
}
相关文章
|
8天前
|
机器学习/深度学习 自然语言处理 语音技术
【Python 机器学习专栏】Python 深度学习入门:神经网络基础
【4月更文挑战第30天】本文介绍了Python在深度学习中应用于神经网络的基础知识,包括神经网络概念、基本结构、训练过程,以及Python中的深度学习库TensorFlow和PyTorch。通过示例展示了如何使用Python实现神经网络,并提及优化技巧如正则化和Dropout。最后,概述了神经网络在图像识别、语音识别和自然语言处理等领域的应用,并强调掌握这些知识对深度学习的重要性。随着技术进步,神经网络的应用将持续扩展,期待更多创新。
|
10天前
|
机器学习/深度学习 Python
【深度学习入门】- 神经网络
【深度学习入门】- 神经网络
|
12天前
|
安全 数据安全/隐私保护 智能硬件
|
13天前
|
存储 NoSQL Linux
Redis入门到通关之Redis5种网络模型详解
Redis入门到通关之Redis5种网络模型详解
30 1
|
13天前
|
NoSQL Ubuntu 关系型数据库
Redis入门到通关之Redis网络模型-用户空间和内核态空间
Redis入门到通关之Redis网络模型-用户空间和内核态空间
22 1
|
15天前
|
存储 网络协议 关系型数据库
Python从入门到精通:2.3.2数据库操作与网络编程——学习socket编程,实现简单的TCP/UDP通信
Python从入门到精通:2.3.2数据库操作与网络编程——学习socket编程,实现简单的TCP/UDP通信
|
15天前
|
关系型数据库 MySQL 数据库
Python从入门到精通:2.3.1数据库操作与网络编程:使用Python连接和操作数据库
Python从入门到精通:2.3.1数据库操作与网络编程:使用Python连接和操作数据库
|
19天前
|
监控 安全 API
《计算机网络简易速速上手小册》第5章:无线网络和移动通信(2024 最新版)
《计算机网络简易速速上手小册》第5章:无线网络和移动通信(2024 最新版)
23 2
|
23天前
|
安全 测试技术 网络安全
基于计算机网络的毕业设计 - 实现高效网络服务与安全通信
本毕业设计旨在利用计算机网络技术设计并实现一个综合性系统,以解决特定领域的问题或提供特定的服务。设计包括网络通信协议的优化、网络安全机制的设计、数据传输与处理的优化以及用户界面设计等方面。通过本设计,旨在提高学生对计算机网络及相关技术的理解和应用能力,并为用户提供高效、可靠的网络服务。
|
27天前
|
前端开发 网络协议 Java
Netty入门指南:从零开始的异步网络通信
Netty入门指南:从零开始的异步网络通信
26 0