Channel(通道)
Buffer类(缓冲区)
java.nio.Buffer(抽象类):用于特定原始类型(基本类型)的数据的容器。Channel进行通信时,底层全部使用Buffer。
它的几个子类:
- ByteBuffer:里面可以封装一个byte[]数组。【重点掌握】
- ShortBuffer:里面可以封装一个short[]数组。
- CharBuffer:里面可以封装一个char[]数组
- IntBuffer:里面可以封装一个int[]数组。
- LongBuffer:里面可以封装一个long[]数组。
- FloatBuffer:里面可以封装一个float[]数组。
- DoubleBuffer:里面可以封装一个double[]数组。
- 没有boolean类型对应的Buffer
属性
capacity(容量):Buffer所能够包含的元素的最大数量。定义了Buffer后,容量是不可变的。
int capacity() //返回此缓冲区的容量
limit(限制):表示如果设置“限制为某一个位置,那么此位置及其后面的位置将不可用”。
int limit() // 获取此缓冲区的限制 Buffer limit(int newLimit) // 设置此缓冲区的限制
position(位置):当前可写入的索引。位置不能小于0,并且不能大于"限制"。
int position() // 获取当前可写入位置索引 Buffer position(int p) // 更改当前可写入位置索引
mark(标记):当调用缓冲区的reset()方法时,会将缓冲区的position位置重置为该mark设置的索引。
不能小于0,不能大于position。
Buffer mark() // 在此缓冲区的当前位置(索引)设置标记 Buffer reset() // 将此缓冲区的位置重置为以前标记的位置 /* 当我们调用缓冲区的reset方法时,会将缓冲区的position索引位置重置为mark标记的位置 */
创建ByteBuffer
java.nio.ByteBuffer:字节缓冲区,里边封装了一个byte类型的数组
创建对象的方式:使用静态方法
static ByteBuffer allocate(int capacity) // 使用一个“容量”来创建一个“间接字节缓存区”——程序的“堆”空间中创建。
static ByteBuffer allocateDirect(int capacity) // 使用一个“容量”来创建一个“直接字节缓存区”——系统内存。
// 可以直接和系统交互,效率高
static ByteBuffer wrap(byte[] byteArray) // 使用一个“byte[]数组”创建一个“间接字节缓存区”。
ByteBuffer成员方法
/* 向ByteBuffer添加数据 */
ByteBuffer put(byte b) // 向当前可用位置添加数据,一次添加一个字节
ByteBuffer put(byte[] byteArray) // 向当前可用位置添加一个byte[]数组
ByteBuffer put(byte[] byteArray, int offset, int len) // 添加一个byte[]数组的一部分
/* 参数:int offset:数组的开始索引,从哪个索引开始添加
int len:添加个数
*/
ByteBuffer put(int index, byte b) // 往指定索引处添加一个byte字节(替换)
byte[] array() // 获取此缓冲区的 byte 数组
boolean isReadOnly() // 获取当前缓冲区是否只读
boolean isDirect() // 获取当前缓冲区是否为直接缓冲区
int remaining() // 获取position与limit之间的元素数量
Buffer clear() // 还原缓冲区的状态。
/* 将position设置为0
将限制limit设置为容量capacity
丢弃标记mark
*/
Buffer flip() // 缩小limit的范围。获取读取的有效数据0到position之间的数据
/* 将limit设置为当前position位置; [0, 1, 2, 0, 0, 0, 0, 0, 0, 0] position=3 limit=10
将当前position位置设置为0; position=0 limit=3 new String(bytes, 0, len)
丢弃标记
*/
示例:
import java.nio.ByteBuffer;
import java.util.Arrays;
public class Demo02put {
public static void main(String[] args) {
//创建一个长度为10的ByteBuffer ==> 包含了一个长度为10的数组,默认值:{0,0,0,..0}
ByteBuffer buffer = ByteBuffer.allocate(10);
System.out.println(buffer); //java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]
//- byte[] array()获取此缓冲区的 byte 数组
byte[] arr = buffer.array();
System.out.println(Arrays.toString(arr));//[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
//- public ByteBuffer put(byte b):向当前可用位置添加数据,一次添加一个字节
buffer.put((byte)1); //1默认是int类型,put方法的参数需要byte类型,需要强转
byte b1 = 2;
buffer.put(b1);
System.out.println(Arrays.toString(arr));//[1, 2, 0, 0, 0, 0, 0, 0, 0, 0]
//- public ByteBuffer put(byte[] byteArray):向当前可用位置添加一个byte[]数组
byte[] bytes = {10,20,30,40,50};
buffer.put(bytes);
System.out.println(Arrays.toString(arr));//[1, 2, 10, 20, 30, 40, 50, 0, 0, 0]
/*
- public ByteBuffer put(byte[] byteArray,int offset,int len):添加一个byte[]数组的一部分
int offset:数组的开始索引,从哪个索引开始添加
int len:添加个数
*/
buffer.put(bytes,3,2);//40,50
System.out.println(Arrays.toString(arr));//[1, 2, 10, 20, 30, 40, 50, 40, 50, 0]
//ByteBuffer put(int index, byte b) 往指定索引处添加一个byte字节(替换)
buffer.put(1,(byte)88);
System.out.println(Arrays.toString(arr));//[1, 88, 10, 20, 30, 40, 50, 40, 50, 0]
//创建一个长度为10的ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put((byte)0);
buffer.put((byte)1);
buffer.put((byte)2);
System.out.println(Arrays.toString(buffer.array()));//[0, 1, 2, 0(positon), 0, 0, 0, 0, 0, 0(limit 10)]
//public int remaining():获取position与limit之间的元素数量。
System.out.println(buffer.remaining());//7[3-9]
//public boolean isReadOnly():获取当前缓冲区是否只读。
System.out.println(buffer.isReadOnly());//false:既能读,有能写 true:只能读,不能写(只读)
//public boolean isDirect():获取当前缓冲区是否为直接缓冲区。
System.out.println(buffer.isDirect());//false:间接字节缓冲区(堆) true:直接字节缓冲区(系统)
System.out.println(ByteBuffer.allocateDirect(10).isDirect());//true
buffer.limit(5);//设置限制为5
System.out.println("位置:"+buffer.position()+" 限制:"+buffer.limit());//位置:3 限制:5
/*
public Buffer clear():还原缓冲区的状态。
- 将position设置为:0
- 将限制limit设置为容量capacity;
- 丢弃标记mark。
*/
//buffer.clear();
//System.out.println("位置:"+buffer.position()+" 限制:"+buffer.limit());//位置:0 限制:10
//System.out.println(Arrays.toString(buffer.array()));//[0, 1, 2, 0, 0, 0, 0, 0, 0, 0]
/*
public Buffer flip():缩小limit的范围。 获取读取的有效数据:0到position之间的数据
- 将limit设置为当前position位置; [0, 1, 2, 0, 0, 0, 0, 0, 0, 0] position=3 limit=10
- 将当前position位置设置为0; position=0 limit=3 new String(bytes,0,len)
- 丢弃标记。
*/
buffer.flip();
System.out.println("位置:"+buffer.position()+" 限制:"+buffer.limit());//位置:0 限制:3
}
}
Channel 概述
1).java.nio.channels.Channel(接口):用于 I/O 操作的连接。
- 表示:通道。
- 可以是“文件通道-FileChannel”、“网络通道-SocketChannel和ServerSockecChannel”。
- 它类似于IO流,但比IO流更强大。read(byte[]) write(byte[])
- IO流是“单向”的,Channel是“双向的”。
2).Channel全部使用Buffer实现读、写。read(ByteBuffer) write(ByteBuffer)
BIO / FileChannel(文件通道)
java.nio.channels.FileChannel:用于读取、写入、映射和操作文件的通道。
类似IO流,用于读取文件,写入文件,复制文件
FileChannel是一个抽象类,无法直接创建对象,获取对象的方式:
/* 使用字节输入流(FileInputStream)中的方法获取读取文件的FileChannel */
FileChannel getChannel() // 返回与此文件输入流有关的唯一 FileChannel 对象
/* 使用字节输出流(FileOutputStream)中的方法获取写入文件的FileChannel */
FileChannel getChannel() // 返回与此文件输出流有关的唯一 FileChannel 对象
FileChannel的成员方法:
int read(ByteBuffer dst) // 读取多个字节存储到ByteBuffer中,相当于FileInputStream中的read(byte[])
int write(ByteBuffer src) // 将ByteBuffer中的数据写入到文件中,相当于FileOutputStream中的write(byte[])
示例:
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class Demo01FileChannel {
public static void main(String[] args) throws IOException {
//1.创建FileInputStream对象,构造方法中绑定要读取的数据源
FileInputStream fis = new FileInputStream("c:\\1.jpg");
//2.创建FileOutputStream对象,构造方法中绑定要写入的目的地
FileOutputStream fos = new FileOutputStream("d:\\1.jpg");
//3.使用FileInputStream对象中的方法getChannel,获取读取文件的FileChannel对象
FileChannel fisChannel = fis.getChannel();
//4.使用FileOutputStream对象中的方法getChannel,获取写入文件的FIleChannel对象
FileChannel fosChannel = fos.getChannel();
//一读一写复制文件
//5.使用读取文件的FileChannel对象中的方法read,读取文件 int read(ByteBuffer dst)
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = 0;
while ((len = fisChannel.read(buffer)) != -1){
//6.使用写入文件的FIleChannel对象中的方法write,把读取到的数据写入到文件中 int write(ByteBuffer src)
//使用flip方法缩小limit的范围:最后一次读取的不一定是1024个字节
System.out.println("flip前:position的位置:" + buffer.position() + ",limit:" + buffer.limit());
buffer.flip();
System.out.println("flip后:position的位置:" + buffer.position() + ",limit:" + buffer.limit());
//write方法是从position(0)开始往目的地写数据,写到limit(有效字节),写完之后positon会变的
System.out.println("write方法写数据:从" + buffer.position() + "写到:" + buffer.limit());
fosChannel.write(buffer);//position(0)-limit(有效字节)之间的数据
//初始化ByteBuffer的状态
System.out.println("clear前:position的位置:" + buffer.position() + ",limit:" + buffer.limit());
buffer.clear();//将position设置为0,将limit设置为容量(1024)
System.out.println("clear后:position的位置:" + buffer.position()+",limit:" + buffer.limit());
System.out.println("-----------------------------------------------");
}
//7.释放资源
fosChannel.close();
fisChannel.close();
fos.close();
fis.close();
}
}
MappedByteBuffer:高效读写
java.io.RandomAccessFile类:可以设置读、写模式的IO流类
构造方法:
public RandomAccessFile(String name, String mode)
/* 参数:
String name;要读取的数据源,或者写入的目的地
String mode:设置流的读写模式
"r":只读,必须是小写
"rw":读写,必须是小写
*/
使用 RandomAccessFile 类中的方法 getChannel 获取 FileChannel
FileChannel getChannel() // 返回与此文件关联的唯一 FileChannel 对象
使用 FileChannel 类中的方法 map 获取 MappedByteBuffer
MappedByteBuffer map(FileChannel.MapMode mode, long position, long size) // 将此通道的文件区域直接映射到内存中
/* 参数:
FileChannel.MapMode mode:设置读写的模式
READ_ONLY:只读映射模式
READ_WRITE:读取/写入映射模式
long position:文件中的位置,映射区域从此位置开始,一般都是从0开始
long size:要映射的区域大小,就是要复制文件的大小,单位字节
*/
java.nio.MappedByteBuffer:它可以创建“直接缓存区”,将文件的磁盘数据映射到内存。
系统内存中,和系统直接交互数据,速度快,效率高
注意:
- 它最大可以映射:Integer.MAX_VALUE个字节(2G)左右,直接复制的文件不能超过2G,超过了就需要分块复制。
- 磁盘和内存实时映射:内存中直接缓冲区中的文件改变,映射的硬盘中的文件也会实时改变。效率高(内存和内存进行读写)
MappedByteBuffer中的成员方法:
byte get(int index) // 获取缓冲区中指定索引处的字节
ByteBuffer put(int index, byte b) // 把字节写入到指定的索引处
示例:使用FileChannel结合MappedByteBuffer实现高效读写复制2g以上的文件
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
public class Demo03FileChannel {
public static void main(String[] args) throws IOException {
long s = System.currentTimeMillis();
//1.创建读取文件的RandomAccessFile对象,构造方法中封装要读取的数据源和设置只读模式("r")
RandomAccessFile inRAF = new RandomAccessFile("c:\\2g.rar", "r");
//2.创建写入文件的RandomAccessFile对象,构造方法中封装要写入的目的地和设置读写模式("rw")
RandomAccessFile outRAF = new RandomAccessFile("d:\\2g.rar", "rw");
//3.使用读取文件的RandomAccessFile对象中的方法getChannel,获取读取文件的FileChannel对象
FileChannel inRAFChannel = inRAF.getChannel();
//4.使用写入文件的RandomAccessFile对象中的方法getChannel,获取读取写入的FileChannel对象
FileChannel outRAFChannel = outRAF.getChannel();
//5.使用读取文件的FileChannel对象中的方法size,获取读取文件的大小(单位字节)
long size = inRAFChannel.size();
System.out.println(size);//2355126731 字节
//定义复制文件需要使用的变量
long count = 1; //复制文件的块数,默认值是1
long startIndex = 0; //每次复制每块文件的开始索引
long everySize = 512*1024*1024; //分块,每块的大小 512M
long copySize = size; //每次复制文件的大小,默认等于文件的总大小
//判断要复制的文件大小是否大于每块文件的大小
if(size > everySize){
//复制的文件大于512M,进行分块
//计算复制文件可以分成几块 2242.56M/512M
count = size%everySize==0 ? size/everySize : size/everySize+1;
System.out.println("文件的大小:" + size + "字节,可以分成:" + count + "块");
//第一次复制文件的大小等于每块的大小
copySize = everySize;
}
//定义一个for循环,分几块就循环复制几次
for (int i = 0; i < count; i++) {
//6.使用读取文件的FileChannel对象中的方法map,创建读取文件的直接缓冲区MappedByteBuffer对象
MappedByteBuffer inMap = inRAFChannel.map(FileChannel.MapMode.READ_ONLY, startIndex, copySize);
//7.使用写入文件的FileChannel对象中的方法map,创建写入文件的直接缓冲区MappedByteBuffer对象
MappedByteBuffer outMap = outRAFChannel.map(
FileChannel.MapMode.READ_WRITE, startIndex, copySize);
System.out.println("每块文件的开始复制的索引:" + startIndex);
System.out.println("每块文件的大小:" + copySize + "字节");
System.out.println("--------------------------------------------");
//8.创建for循环,循环size次
for (int j = 0; j < copySize; j++) {
//9.使用取文件的直接缓冲区MappedByteBuffer对象中的方法get,读取数据源指定索引处的文件
byte b = inMap.get(j);
//10.使用写入文件的直接缓冲区MappedByteBuffer对象中的方法put,把读取到的字节写入到目的地指定的索引处
outMap.put(j, b);
}
//复制完每块文件,重新计算startIndex和copySize的大小
startIndex += copySize;
copySize = size-startIndex>everySize ? everySize : size-startIndex;
}
//11.释放资源
outRAFChannel.close();
inRAFChannel.close();
outRAF.close();
inRAF.close();
long e = System.currentTimeMillis();
System.out.println("复制文件共耗时:"+(e-s)+"毫秒!");//复制文件共耗时:4912毫秒!
}
}
NIO / SocketChannel(网络通道)
服务器端
相关的类:java.nio.channels.ServerSocketChannel:用于面向流的侦听套接字的可选通道。
获取对象的方式:使用静态方法open
static ServerSocketChannel open() // 打开服务器插槽通道
成员方法:
ServerSocketChannel bind(SocketAddress local) // 给服务器绑定指定的端口号
SocketChannel accept() // 监听客户端的请求
SelectableChannel configureBlocking(boolean block) // 设置服务器的阻塞模式 true:阻塞(不写默认) false:非阻塞
客户端
相关的类:java.nio.channels.SocketChannel:用于面向流的连接插座的可选通道。
获取对象的方法:使用静态方法open
static SocketChannel open() // 打开套接字通道
成员方法:
boolean connect(SocketAddress remote) // 根据服务器的ip地址和端口号连接服务器
/* 参数:
SocketAddress remote:封装服务器的ip地址和端口号,用的时候直接new
返回值:boolean
连接服务器成功:true
连接服务器失败:false
*/
SelectableChannel configureBlocking(boolean block) // 设置客户端的阻塞模式。true:阻塞(不写默认) false:非阻塞
/*
1.客户端设置为阻塞模式:connect方法会多次尝试连接服务器
connect连接成功服务器,返回true
connect连接服务器失败,会抛出连接异常ConnectException: Connection refused: connect
2.客户端设置为非阻塞模式:connect方法只会连接一次服务器
connect方法无论连接是否成功还是失败都会false
所以客户端设置为非阻塞模式没有意义,结束不了轮询
*/
int write(ByteBuffer src) // 给服务器发送数据
int read(ByteBuffer dst) // 读取服务器回写的数据
示例
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/*
实现同步非阻塞的服务器:轮询监听客户端的请求
*/
public class NIOTCPServer {
public static void main(String[] args) throws IOException, InterruptedException {
//1.使用open方法获取ServerSocketChannel对象
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2.使用ServerSocketChannel对象中的方法bind给服务器绑定指定的端口号
serverSocketChannel.bind(new InetSocketAddress(8888));
//SelectableChannel configureBlocking(boolean block) 设置服务器的阻塞模式 true:阻塞(不写默认) false:非阻塞
serverSocketChannel.configureBlocking(false);
//轮询监听客户端的请求 ==> 死循环一直执行,监听客户端
while (true){
//3.使用ServerSocketChannel对象中的方法accept监听客户端的请求
System.out.println("服务器等待客户端的连接...");
SocketChannel socketChannel = serverSocketChannel.accept();//accpet:非阻塞 不会等待客户端请求
//对客户端SocketChannel对象进行一个非空判断,没有客户端连接服务器,accpet方法返回null
if(socketChannel != null){
System.out.println("有客户端连接服务器,服务器读取客户端发送的数据,给客户端回写数据...");
//int read(ByteBuffer dst) 读取客户端发送的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = socketChannel.read(buffer);
String msg = new String(buffer.array(), 0, len);
System.out.println("服务器读取客户端发送的数据:" + msg);
//int write(ByteBuffer src) 服务器给客户端发送数据
socketChannel.write(ByteBuffer.wrap("收到,谢谢".getBytes()));
System.out.println("服务器读写数据完成,结束轮询...");
break;//结束轮询
}else{
System.out.println("没有客户端连接服务器,休息2秒钟,干点其他事情,在继续下一次轮询监听客户端连接...");
Thread.sleep(2000);
}
}
//释放资源
serverSocketChannel.close();
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/*
客户端:轮询连接服务器。客户端轮询连接服务器成功,给服务器发送数据,读取服务器回写的数据
*/
public class NIOTCPClient {
public static void main(String[] args) {
//客户端轮询连接服务器,连接成功,结束轮询
while (true){
try {
//1.使用open方法获取客户端SocketChannel对象
SocketChannel socketChannel = SocketChannel.open();
System.out.println("客户端开始连接服务器...");
//2.使用SocketChannel对象中的方法connect根据服务器的ip地址和端口号连接服务器
boolean b = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
System.out.println(b);
System.out.println("客户端连接服务器成功,给服务器发送数据,读取服务器回写的数据...");
//int write(ByteBuffer src) 给服务器发送数据
ByteBuffer buffer = ByteBuffer.wrap("你好服务器".getBytes());
System.out.println("容量:"+buffer.capacity());
System.out.println("索引:"+buffer.position());
System.out.println("限定:"+buffer.limit());
socketChannel.write(buffer);
//int read(ByteBuffer dst) 读取服务器回写的数据
ByteBuffer buffer2 = ByteBuffer.allocate(1024);
int len = socketChannel.read(buffer2);
//len:读取的有效字节个数
//System.out.println("客户端读取服务器发送的数据:" + new String(buffer2.array(), 0, len));
buffer2.flip();//缩小limit的范围: position=0 limit=position(读取的有效字节个数)
System.out.println("客户端读取服务器发送的数据:"+new String(buffer2.array(),0,buffer2.limit()));
System.out.println("客户端读写数据完毕,结束轮询...");
break;
} catch (IOException e) {
System.out.println("客户端connect方法连接服务器失败,休息2秒钟,干点其他事情,在继续下一次连接服务器...");
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}
}
多路复用 / 选择器:Selector
多路复用的概念
选择器Selector是NIO中的重要技术之一。它与SelectableChannel联合使用实现了非阻塞的多路复用。使用它可以节省CPU资源,提高程序的运行效率。
"多路"是指:服务器端同时监听多个“端口”的情况。每个端口都要监听多个客户端的连接。
- 服务器端的非多路复用效果
如果不使用“多路复用”,服务器端需要开很多线程处理每个端口的请求。如果在高并发环境下,造成系统性能下降。
- 服务器端的多路复用效果
使用了多路复用,只需要一个线程就可以处理多个通道,降低内存占用率,减少CPU切换时间,在高并发、高频段业务环境下有非常重要的优势
选择器Selector_服务器端实现多路注册
相关的类: java.nio.channels.Selector:SelectableChannel对象的多路复用器。
获取Selector对象的方式:
static Selector open() // 打开选择器
注册Channel(服务器通道)到Selector上:使用ServerSocketChannel中的方法
SelectionKey register(Selector sel, int ops) // 使用给定的选择器注册此频道,返回一个选择键。
/* 参数:
Selector sel:传递要注册的选择器对象
int ops:传递对应的事件
使用SelectionKey中的常量:SelectionKey.OP_ACCEPT(固定写法,把服务器通道注册到选择器上)
OP_ACCEPT:监听客户端件事
*/
选择器Selector_常用方法
// 返回一个Set<SelectionKey>集合,表示:已注册通道的集合。每个已注册通道封装为一个SelectionKey对象
Set<SelectionKey> keys()
// 返回一个Set<SelectionKey>集合,表示:当前已连接的通道的集合。每个已连接通道同一封装为一个SelectionKey对象
Set<SelectionKey> selectedKeys()
int select() // 会阻塞,直到至少有1个客户端连接。返回一个int值,表示有几个客户端连接了服务器。
示例:多路信息接收
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/*
服务器
*/
public class TCPServer {
public static void main(String[] args) throws IOException, InterruptedException {
//1.创建3个ServerSocketChannel服务器对象
ServerSocketChannel channel01 = ServerSocketChannel.open();
ServerSocketChannel channel02 = ServerSocketChannel.open();
ServerSocketChannel channel03 = ServerSocketChannel.open();
//2.分别给3个ServerSocketChannel服务器对象绑定不同的端口号
channel01.bind(new InetSocketAddress(7777));
channel02.bind(new InetSocketAddress(8888));
channel03.bind(new InetSocketAddress(9999));
//3.设置3个ServerSocketChannel服务器对象为非阻塞模式(只要使用Selector选择器,服务器必须是非阻塞的)
channel01.configureBlocking(false);
channel02.configureBlocking(false);
channel03.configureBlocking(false);
//4.获取Selector对象
Selector selector = Selector.open();
//5.使用服务器ServerSocketChannel对象中的方法register,把3个服务器通道注册到选择器Selector上
channel01.register(selector, SelectionKey.OP_ACCEPT);
channel02.register(selector, SelectionKey.OP_ACCEPT);
channel03.register(selector, SelectionKey.OP_ACCEPT);
//Selector的keys()方法 此方法返回一个Set<SelectionKey>集合,表示:已注册通道的集合。每个已注册通道封装为一个SelectionKey对象。
Set<SelectionKey> keys = selector.keys();
System.out.println("已注册服务器通道的数量:" + keys.size()); //3
//服务器轮询监听客户端的请求
while (true){
//Selector的select()方法:获取客户端连接的数量,没有客户端连接服务器,此方法会一直阻塞
int select = selector.select();
System.out.println("连接服务器的客户端数量:" + select);
//Selector的selectedKeys()方法:获取当前已经连接的通道集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("已经到服务器的通道数量:" + selectionKeys.size());
//处理Selector监听到客户端的请求事件:遍历Set集合,获取每一个SelectionKey对象
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()){
SelectionKey selectionKey = it.next();
//获取SelectionKey里边封装的服务器ServerSocketChannel对象
ServerSocketChannel channel = (ServerSocketChannel)selectionKey.channel();
System.out.println("获取当前通道ServerSocketChannel监听的端口号:" + channel.getLocalAddress());
//处理监听的accept事件==>获取请求服务器的客户单SocketChannel对象
SocketChannel socketChannel = channel.accept();
//读取客户端SocketChannel发送的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = socketChannel.read(buffer);
System.out.println("服务器读取到客户端发送的数据:" + new String(buffer.array(), 0, len));
//处理完SelectionKey监听到的事件,要在Set集合中移除已经处理完的SelectionKey对象
it.remove();//使用迭代器对象移除集合中的元素,不会抛出并发修改异常(移除的就是it.next()方法取出的对象)
}
//获取完一个客户端的连接,睡眠2秒,在进行下一次轮询
Thread.sleep(2000);
}
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/*
开启三个线程,每个线程分别创建一个客户端对象,连接服务器的三个端口
*/
public class TCPClient {
public static void main(String[] args) {
new Thread(()->{
//创建客户端对象,轮询连接服务器
while (true){
try(SocketChannel socketChannel = SocketChannel.open();) {
System.out.println("客户端开始连接7777端口...");
socketChannel.connect(new InetSocketAddress("127.0.0.1",7777));
System.out.println("客户端连接7777端口成功,给服务器发送数据");
socketChannel.write(ByteBuffer.wrap("你好服务器,我是连接7777端口号的客户端对象!".getBytes()));
System.out.println("客户端7777发送数据完毕,结束轮询...");
break;
} catch (IOException e) {
System.out.println("客户端连接7777端口异常");
}
}
}).start();
new Thread(()->{
//创建客户端对象,轮询连接服务器
while (true){
try(SocketChannel socketChannel = SocketChannel.open();) {
System.out.println("客户端开始连接8888端口...");
socketChannel.connect(new InetSocketAddress("127.0.0.1",8888));
System.out.println("客户端连接8888端口成功,给服务器发送数据");
socketChannel.write(ByteBuffer.wrap("你好服务器,我是连接8888端口号的客户端对象!".getBytes()));
System.out.println("客户端8888发送数据完毕,结束轮询...");
break;
} catch (IOException e) {
System.out.println("客户端连接8888端口异常");
}
}
}).start();
new Thread(()->{
//创建客户端对象,轮询连接服务器
while (true){
try(SocketChannel socketChannel = SocketChannel.open();) {
System.out.println("客户端开始连接9999端口...");
socketChannel.connect(new InetSocketAddress("127.0.0.1",9999));
System.out.println("客户端连接9999端口成功,给服务器发送数据");
socketChannel.write(ByteBuffer.wrap("你好服务器,我是连接9999端口号的客户端对象!".getBytes()));
System.out.println("客户端9999发送数据完毕,结束轮询...");
break;
} catch (IOException e) {
System.out.println("客户端连接9999端口异常");
}
}
}).start();
}
}
AIO / 异步通道:AsynchronousServerSocketChannel
概述
服务器端:java.nio.channels.AsynchronousServerSocketChannel:用于面向流的侦听套接字的异步通道。
获取对象的方法:
static AsynchronousServerSocketChannel open() // 打开异步服务器套接字通道
成员方法:
AsynchronousServerSocketChannel bind(SocketAddress local) // 给服务器绑定指定的端口号
void accept(A attachment, CompletionHandler<?> handler) // 监听客户端的请求,默认就是非阻塞的
/* 参数:
A attachment:附件,可以传递null
CompletionHandler<?> handler:事件处理的接口,用于处理accept方法监听到的事件
*/
Future<Integer> write(ByteBuffer src) // 给客户端发送数据。阻塞方法,会一直等待客户端发送数据
// 读取客户端发送的数据。非阻塞的读取方法
Future<Integer> read(ByteBuffer dst)
void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<?> handler)
/* 参数:
ByteBuffer dst: 用来存储读取到的数据
long timeout: 完成I / O操作的最长时间
TimeUnit unit: timeout参数的时间单位(TimeUnit.SECONDS:秒)
A attachment: 要附加到I / O操作的对象; 可以是null
CompletionHandler<?> handler: 消费结果的处理程序,是一个回调函数
*/
java.nio.channels.CompletionHandler<V,A>接口:用于消除异步I / O操作结果的处理程序。
CompletionHandler 也叫回调函数,accept或read方法执行成功,服务器就会自动执行这个回调函数,来读取客户端发送的数据
接口中的方法:
void completed(V result, A attachment) // 主方法执行成功,执行的方法
void failed(Throwable exc, A attachment) // 主方法执行失败,执行的方法
客户端: java.nio.channels.AsynchronousSocketChannel:用于面向流的连接插座的异步通道。
获取对象的方法:
static AsynchronousSocketChannel open() // 打开异步套接字通道
成员方法:
Future<Void> connect(SocketAddress remote) // 连接服务器的方法,参数传递服务器的ip地址和端口号
/* 注意:
connect是一个非阻塞的方法,不会等待方法运行完毕,连接服务器成功在执行下边的代码
客户端连接服务器需要时间的,如果没有连接成功,就给服务器使用write方法发送数据,会抛出异常
*/
Future<Integer> write(ByteBuffer src) // 给服务器发送数据
Future<Integer> read(ByteBuffer dst) // 读取服务器发送的数据
java.util.concurrent.Future接口
接口中的方法:
boolean isDone() // 如果此任务完成,则返回 true
/* 返回true:连接服务器成功
返回false:还没有连接上服务器(客户端连接服务器是需要时间的)
*/
示例:AIO异步连接,异步阻塞读写
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/*
服务器端
*/
public class AIOTCPServer {
public static void main(String[] args) throws IOException, InterruptedException {
//1.获取异步非阻塞的服务器AsynchronousServerSocketChannel对象
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
//2.使用bind方法给AsynchronousServerSocketChannel对象绑定指定的端口号
serverSocketChannel.bind(new InetSocketAddress(8888));
//3.使用AsynchronousServerSocketChannel对象中的方法accept监听客户端的请求
System.out.println("accept方法开始执行了...");
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println("客户端连接服务器成功...");
//读取客户端发送的数据 Future<Integer> read(ByteBuffer dst)
ByteBuffer buffer = ByteBuffer.allocate(1024);
//read方法是一个阻塞的方法,会一直等待客户端发送数据
Future<Integer> future = result.read(buffer);
Integer len = 0;
try {
len = future.get();//获取客户端发送数据长度
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("服务器读取客户端发送的数据:"+new String(buffer.array(),0,len));
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("客户端连接服务器失败...");
}
});
System.out.println("accept方法执行结束了...");
//accept方法是一个非阻塞的方法,我们执行完accept方法,可以去做其他的事情
//死循环的目的不让程序停止(工作中:写一些具体的需求);当有客户端请求服务器,会自动执行回调函数中的方法
while (true){
System.out.println("正在忙其他的事情!");
Thread.sleep(2000);
}
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;
/*
客户端
*/
public class AIOTCPClient {
public static void main(String[] args) throws IOException, InterruptedException {
//1.创建异步非阻塞的客户端AsynchronousSocketChannel对象
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
//2.使用AsynchronousSocketChannel对象中的方法connect连接服务器
Future<Void> future = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
System.out.println(future.isDone()); //false:还未连接上服务器
System.out.println(111);
//休眠5秒钟,等待客户端连接服务器成功,再给服务器发送数据
Thread.sleep(5000);
System.out.println(future.isDone()); //true:已经连接服务器成功
if(future.isDone()){
socketChannel.write(ByteBuffer.wrap("你好服务器".getBytes()));
}
//释放资源
socketChannel.close();
}
}
示例:AIO异步连接,异步非阻塞读写
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/*
服务器端
*/
public class AIOTCPServer {
public static void main(String[] args) throws IOException, InterruptedException {
//1.获取异步非阻塞的服务器AsynchronousServerSocketChannel对象
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
//2.使用bind方法给AsynchronousServerSocketChannel对象绑定指定的端口号
serverSocketChannel.bind(new InetSocketAddress(8888));
//3.使用AsynchronousServerSocketChannel对象中的方法accept监听客户端的请求
System.out.println("accept方法开始执行了...");
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println("客户端连接服务器成功...");
ByteBuffer buffer = ByteBuffer.allocate(1024);
//当客户端连接服务器成功,read方法只会等待客户端10秒钟,10秒钟发送了数据,执行completed方法;10秒钟之后还没有发送数据,执行failed
result.read(buffer, 10, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
System.out.println("服务器读取客户端发送数据成功,执行completed方法");
//服务器读取客户端发送的数据
String msg = new String(buffer.array(), 0, result);//把读取到的有效数据,转换为字符串
System.out.println("服务器读取客户端发送的数据为:"+msg);
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("服务器读取客户端发送数据失败,执行failed方法");
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("客户端连接服务器失败...");
}
});
System.out.println("accept方法执行结束了...");
//accept方法是一个非阻塞的方法,我们执行完accept方法,可以去做其他的事情
//死循环的目的不让程序停止(工作中:写一些具体的需求);当有客户端请求服务器,会自动执行回调函数中的方法
while (true){
System.out.println("正在忙其他的事情!");
Thread.sleep(2000);
}
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;
/*
客户端
*/
public class AIOTCPClient {
public static void main(String[] args) throws IOException, InterruptedException {
//1.创建异步非阻塞的客户端AsynchronousSocketChannel对象
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
//2.使用AsynchronousSocketChannel对象中的方法connect连接服务器
Future<Void> future = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
System.out.println(future.isDone());//false:还未连接上服务器
System.out.println(111);
//休眠5秒钟,等待客户端连接服务器成功,再给服务器发送数据
Thread.sleep(15000);
System.out.println(future.isDone());//true:已经连接服务器成功
if(future.isDone()){
socketChannel.write(ByteBuffer.wrap("你好服务器".getBytes()));
}
//释放资源
socketChannel.close();
}
}