NIO学习四-Selector

简介: 前面我们已经简单的学习了channel,知道channel作为通道,可以在通道中进行读写操作,同时知道ByteChannel是双向的。对于NIO的优势在于多路复用选择器上,在Nginx、Redis、Netty中都有多路复用的体现。因此学习Selector是有必要的。

前面我们已经简单的学习了channel,知道channel作为通道,可以在通道中进行读写操作,同时知道ByteChannel是双向的。对于NIO的优势在于多路复用选择器上,在Nginx、Redis、Netty中都有多路复用的体现。因此学习Selector是有必要的。

1.使用多路复用选择器的方式

/*** selector 选择器 多路复用,选择器结合selectable-channel实现非阻塞效果,提高效率* 可以将通道注册进选择器中,其主要注意是使用一个线程来对多个通道中的已就绪进行选择,然后就可以对选择* 的通道进行数据处理,属于一对多的关系*/publicclassSelectorTest {
publicstaticvoidmain(String[] args) throwsIOException {
//创建serverSocketChannel对象ServerSocketChannelserverSocketChannel=ServerSocketChannel.open();
//设置websocket通道为非阻塞方式serverSocketChannel.configureBlocking(false);
//获取websocketServerSocketserverSocket=serverSocketChannel.socket();
//进行绑定操作serverSocket.bind(newInetSocketAddress("localhost", 8888));
//核心代码开始Selectorselector=Selector.open();
SelectionKeykey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//核心代码结束System.out.println("selector="+selector);
System.out.println("key="+key);
serverSocket.close();
serverSocketChannel.close();
    }
}

通常的步骤是:打开ServerSocket通道,然后将通道配置成非阻塞模式,同时拿到socket进行绑定操作。然后打开选择器,将通道注册到选择器中,进行业务处理操作,然后关闭socket,如果需要长连接,此时就不关闭了。

2.判断当前是否向任何选择器进行了注册

/*** 判断注册的状态:判断当前是否向任何选择器进行了注册。可以看到新创建的通道总是未注册的*/publicclassSelectorTest1 {
publicstaticvoidmain(String[] args) throwsIOException {
//打开serverSocket通道,同时设置为非阻塞,拿到serverSocket,进行ip和端口绑定//将选择器打开,将选择器key进行注册,关闭socket和socket通道ServerSocketChannelserverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); //需要部分,通常需要将其设置为非阻塞ServerSocketserverSocket=serverSocketChannel.socket();
serverSocket.bind(newInetSocketAddress("localhost", 8888));
System.out.println("A isRegistered="+serverSocketChannel.isRegistered());
Selectorselector=Selector.open();
SelectionKeykey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("B isRegistered="+serverSocketChannel.isRegistered());
serverSocket.close();
serverSocketChannel.close();
    }
}

3.获取支持的socketOption列表

/*** 获取支持的socketOption列表* Set<SocketOption<?> supportedOption()方法:返回通道支持的Socket Option*/publicclassSelectorTest2 {
publicstaticvoidmain(String[] args) throwsIOException {
Threadt=newThread() {
publicvoidrun() {
try {
Thread.sleep(2000);
Socketsocket=newSocket("localhost", 8088);
socket.close();
                } catch (Exceptione) {
e.printStackTrace();
                }
            }
        };
t.start();
ServerSocketChannelserverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.bind(newInetSocketAddress("localhost", 8088));
SocketChannelsocketChannel=serverSocketChannel.accept();
Set<SocketOption<?>>set1=serverSocketChannel.supportedOptions();
Set<SocketOption<?>>set2=socketChannel.supportedOptions();
Iteratoriterator1=set1.iterator();
Iteratoriterator2=set2.iterator();
System.out.println("ServerSocketChannel supportedOptions:");
while (iterator1.hasNext()) {
SocketOptioneach= (SocketOption) iterator1.next();
System.out.println(each.name() +" "+each.getClass().getName());
        }
System.out.println();
System.out.println();
System.out.println("SocketChannel supportedOptions:");
while (iterator2.hasNext()) {
SocketOptioneach1= (SocketOption) iterator2.next();
System.out.println(each1.name() +" "+each1.getClass().getName());
        }
socketChannel.close();
serverSocketChannel.close();
    }
}

4.进行socket地址获取、设置阻塞模式

/*** 进行socket地址获取、设置阻塞模式*/publicclassSocketAddressTest {
publicstaticvoidmain(String[] args) throwsIOException {
ServerSocketChannelserverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.bind(newInetSocketAddress("localhost",8888));
InetSocketAddressaddress= (InetSocketAddress)serverSocketChannel.getLocalAddress();
//获取ip和端口System.out.println(address.getHostString());
System.out.println(address.getPort());
//查看阻塞模式System.out.println(serverSocketChannel.isBlocking());
serverSocketChannel.configureBlocking(false);
System.out.println(serverSocketChannel.isBlocking());
//获取选择器Selectorselector=Selector.open();
SelectionKeyselectionKey=serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("A = "+selectionKey+" "+selectionKey.hashCode());
SelectionKeyselectionKey1=serverSocketChannel.keyFor(selector);
System.out.println("B = "+selectionKey1.hashCode());
serverSocketChannel.close();
    }
}

5.SelectionKey不是同一个对象

/*** 相同的通道可以注册不同的选择器,返回的SelectionKey不是同一个对象*/publicclassSelectorKeyDemo {
publicstaticvoidmain(String[] args) throwsIOException {
//相同的通道可以注册不同的选择器,返回的SelectionKey不是同一个对象selectionKeyTest1();
selectionKeyTest2();
    }
privatestaticvoidselectionKeyTest1() throwsIOException {
//打开ServerSocketChannelServerSocketChannelserverSocketChannel=ServerSocketChannel.open();
//进行ip和端口绑定serverSocketChannel.bind(newInetSocketAddress("localhost",8888));
//配置非阻塞状态serverSocketChannel.configureBlocking(false);
//打开选择器Selectorselector1=Selector.open();
Selectorselector2=Selector.open();
//将通道注册到选择器中,返回keySelectionKeyselectionKey1=serverSocketChannel.register(selector1,SelectionKey.OP_ACCEPT);
System.out.println("SelectionKey1="+selectionKey1.hashCode());
SelectionKeyselectionKey2=serverSocketChannel.register(selector2,SelectionKey.OP_ACCEPT);
System.out.println("SelectionKey2="+selectionKey2.hashCode());
serverSocketChannel.close();
    }
//不同的通道注册到相同的选择器中,返回的SelectionKey不是同一个对象privatestaticvoidselectionKeyTest2() throwsIOException {
ServerSocketChannelserverSocketChannel1=ServerSocketChannel.open();
serverSocketChannel1.bind(newInetSocketAddress("localhost",8888));
serverSocketChannel1.configureBlocking(false);
ServerSocketChannelserverSocketChannel2=ServerSocketChannel.open();
serverSocketChannel2.bind(newInetSocketAddress("localhost",8888));
serverSocketChannel2.configureBlocking(false);
Selectorselector=Selector.open();
SelectionKeyselectionKey1=serverSocketChannel1.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("SelectionKey1="+selectionKey1.hashCode());
SelectionKeyselectionKey2=serverSocketChannel2.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("SelectionKey2="+selectionKey2.hashCode());
serverSocketChannel1.close();
serverSocketChannel2.close();
    }
}

6.获取selectorProvider

/*** 获取selectorProvider*/publicclassSelectorProviderTest {
publicstaticvoidmain(String[] args) throwsIOException {
SelectorProviderselectorProvider=SelectorProvider.provider();
System.out.println(selectorProvider);
ServerSocketChannelserverSocketChannel=null;
serverSocketChannel=serverSocketChannel.open();
SelectorProviderprovider=SelectorProvider.provider();
System.out.println(provider);
serverSocketChannel.close();
    }
}

学习了Selector,我们来学习应答模式案例

BIO模式下的客户端:

/*** BIO服务端*/publicclassBIOServer {
publicstaticvoidmain(String[] args) throwsIOException {
//创建一个ServerSocket对象,带端口ServerSocketserverSocket=newServerSocket(8888);
while(true){
//监听客户端,阻塞Socketsocket=serverSocket.accept();
//从serverSocket中拿到输入流,进行消息的接收,阻塞InputStreamis=socket.getInputStream();
byte[] b=newbyte[20];
is.read(b);
StringclientIp=socket.getInetAddress().getHostAddress();
System.out.println(clientIp+"说:"+newString(b).trim());
//从serverScoket中拿到输出流,进行消息的响应OutputStreamos=socket.getOutputStream();
os.write("你好,客户端".getBytes());
//关闭socketsocket.close();
        }
    }
}

BIO模式下的客户端

/*** BIO客户端*/publicclassBIOClient {
publicstaticvoidmain(String[] args) throwsIOException {
while (true){
//创建客户端socketSocketsocket=newSocket("localhost",8888);
//从客户端socket中拿到输出流,进行消息发送OutputStreamos=socket.getOutputStream();
System.out.println("输入信息:");
//你好,服务端Scannersc=newScanner(System.in);
Stringmsg=sc.nextLine();
os.write(msg.getBytes());
//从客户端socket中拿到输入流,进行消息回复InputStreamis=socket.getInputStream();
byte[] b=newbyte[20];
is.read(b);
System.out.println("服务端说:"+newString(b).trim());
        }
    }
}

运行:客户端输入

可以看到服务端

NIO的服务端

/*** NIO服务端*/publicclassNIOServer {
publicstaticvoidmain(String[] args) throwsIOException {
//开启ServerScoketChannelServerSocketChannelserverSocketChannel=ServerSocketChannel.open();
//开启selectorSelectorselector=Selector.open();
//绑定端口号serverSocketChannel.bind(newInetSocketAddress(8888));
//设置非阻塞模式serverSocketChannel.configureBlocking(false);
//将serverSocketChannel对象注册给Selector对象serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//进行操作while(true){
//如果在限定时间没有客户端的请求,则进行别的操作if(selector.select(2000)==0){
System.out.println("server:没有客户端信息需要处理,做别的事情");
continue;
            }
//拿到所以的selectionkey,进行迭代,获取SelectorKey,判断通道里的时间Iterator<SelectionKey>keyIterator=selector.selectedKeys().iterator();
while (keyIterator.hasNext()){
SelectionKeykey=keyIterator.next();
//可接收if(key.isAcceptable()){
System.out.println("OP_ACCEPT");
SocketChannelsocketChannel=serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }
//可读if (key.isReadable()){
SocketChannelchannel= (SocketChannel) key.channel();
ByteBufferbuffer= (ByteBuffer) key.attachment();
channel.read(buffer);
System.out.println("客户端发来请求:"+newString(buffer.array()));
                }
//移除所有的keykeyIterator.remove();
            }
        }
    }
}

NIO的客户端

/*** NIO客户端*/publicclassNIOClient {
publicstaticvoidmain(String[] args) throwsIOException {
//开启网络通道SocketChannelchannel=SocketChannel.open();////设置非阻塞channel.configureBlocking(false);
//绑定ip和端口InetSocketAddressaddress=newInetSocketAddress("localhost",8888);
if(!channel.connect(address)){
while (!channel.finishConnect()){
System.out.println("连接服务器socket进行对话,做别的事情");
            }
//获取缓冲区并存入数据Stringmsg="hello,l'm Client";
ByteBufferwiterBuffer=ByteBuffer.wrap(msg.getBytes());
//发送数据信息channel.write(witerBuffer);
System.in.read();
        }
    }
}

基于NIO的聊天:

服务器端

/*** 聊天室服务端*/publicclassChatServer {
privateServerSocketChannellistenerChannel; //监听通道  老大privateSelectorselector;//选择器对象  间谍privatestaticfinalintPORT=9999; //服务器端口//构造方法publicChatServer() {
try {
// 1. 得到监听通道listenerChannel=ServerSocketChannel.open();
// 2. 得到选择器selector=Selector.open();
// 3. 绑定端口listenerChannel.bind(newInetSocketAddress(PORT));
// 4. 设置为非阻塞模式listenerChannel.configureBlocking(false);
// 5. 将选择器绑定到监听通道并监听accept事件listenerChannel.register(selector, SelectionKey.OP_ACCEPT);
printInfo("Chat Server is ready.......");
        } catch (IOExceptione) {
e.printStackTrace();
        }
    }
//6.业务处理,首先匹配selectorkey的状态,是连接请求事件还是读取数据事件//如果是连接请求事件,则进行key的迭代,进行连接请求操作,否者进行数据的读取//读取完成或者请求之后,将selectorkey进行删除,避免重复处理publicvoidstart() throwsException{
try {
while (true) { //不停监控if (selector.select(2000) ==0) {
System.out.println("Server:没有客户端找我, 我就干别的事情");
continue;
                }
Iterator<SelectionKey>iterator=selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKeykey=iterator.next();
if (key.isAcceptable()) { //连接请求事件SocketChannelsc=listenerChannel.accept();
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
System.out.println(sc.getRemoteAddress().toString().substring(1)+"上线了...");
                    }
if (key.isReadable()) { //读取数据事件readMsg(key);
                    }
//一定要把当前key删掉,防止重复处理iterator.remove();
                }
            }
        } catch (IOExceptione) {
e.printStackTrace();
        }
    }
//读取客户端发来的消息并广播出去publicvoidreadMsg(SelectionKeykey) throwsException{
SocketChannelchannel=(SocketChannel) key.channel();
ByteBufferbuffer=ByteBuffer.allocate(1024);
intcount=channel.read(buffer);
if(count>0){
Stringmsg=newString(buffer.array());
printInfo(msg);
//发广播broadCast(channel,msg);
        }
    }
//给所有的客户端发广播publicvoidbroadCast(SocketChannelexcept,Stringmsg) throwsException{
System.out.println("服务器发送了广播...");
for(SelectionKeykey:selector.keys()){
ChanneltargetChannel=key.channel();
if(targetChannelinstanceofSocketChannel&&targetChannel!=except){
SocketChanneldestChannel=(SocketChannel)targetChannel;
ByteBufferbuffer=ByteBuffer.wrap(msg.getBytes());
destChannel.write(buffer);
            }
        }
    }
privatevoidprintInfo(Stringstr) { //往控制台打印消息SimpleDateFormatsdf=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("["+sdf.format(newDate()) +"] -> "+str);
    }
publicstaticvoidmain(String[] args) throwsException {
newChatServer().start();
    }
}

客户端

//聊天程序客户端publicclassChatClient {
privatefinalStringHOST="127.0.0.1"; //服务器地址privateintPORT=9999; //服务器端口privateSocketChannelsocketChannel; //网络通道privateStringuserName; //聊天用户名//构造方法publicChatClient() throwsIOException {
//1. 得到一个网络通道socketChannel=SocketChannel.open();
//2. 设置非阻塞方式socketChannel.configureBlocking(false);
//3. 提供服务器端的IP地址和端口号InetSocketAddressaddress=newInetSocketAddress(HOST,PORT);
//4. 连接服务器端if(!socketChannel.connect(address)){
while(!socketChannel.finishConnect()){  //nio作为非阻塞式的优势System.out.println("Client:连接服务器端的同时,我还可以干别的一些事情");
            }
        }
//5. 得到客户端IP地址和端口信息,作为聊天用户名使用userName=socketChannel.getLocalAddress().toString().substring(1);
System.out.println("---------------Client("+userName+") is ready---------------");
    }
//向服务器端发送数据publicvoidsendMsg(Stringmsg) throwsException{
if(msg.equalsIgnoreCase("bye")){
socketChannel.close();
return;
        }
msg=userName+"说:"+msg;
ByteBufferbuffer=ByteBuffer.wrap(msg.getBytes());
socketChannel.write(buffer);
    }
//从服务器端接收数据publicvoidreceiveMsg() throwsException{
ByteBufferbuffer=ByteBuffer.allocate(1024);
intsize=socketChannel.read(buffer);
if(size>0){
Stringmsg=newString(buffer.array());
System.out.println(msg.trim());
        }
    }
}
//启动聊天程序客户端
public class TestChat {
    public static void main(String[] args) throws Exception {
        ChatClient chatClient=new ChatClient();
        new Thread(){
            public void run(){
                while(true){
                    try {
                        chatClient.receiveMsg();
                        Thread.sleep(2000);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }.start();
        Scanner scanner=new Scanner(System.in);
        while (scanner.hasNextLine()){
            String msg=scanner.nextLine();
            chatClient.sendMsg(msg);
        }
    }
}

启动运行:

客户端输入信息和服务端看到的信息


微信图片_20221214022717.png

微信图片_20221214022723.png


目录
相关文章
|
9月前
|
Java
NIO之Selector解读
NIO之Selector解读
|
4月前
|
监控 网络协议 Java
NIO - 灵魂核心之选择器Selector(多路复用器)
NIO - 灵魂核心之选择器Selector(多路复用器)
75 1
|
9月前
|
存储 网络协议 Java
Netty入门到超神系列-Java NIO 三大核心(selector,channel,buffer)
选择器,也叫多路复用器,Java的NIO通过selector实现一个线程处理多个客户端链接,多个channel可以注册到同一个Selector,Selector能够监测到channel上是否有读/写事件发生,从而获取事件和对事件进行处理,所以Selector切到哪个channel是由事件决定的。当线程从某个客户端通道未读取到数据时,可以把空闲时间用来做其他任务,性能得到了提升。
100 0
|
9月前
|
弹性计算 Java API
Netty入门到超神系列-Java NIO 三大核心(selector,channel,buffer)
理解Selector 和 Channel Selector 选择器,也叫多路复用器,可以同时处理多个客户端连接,多路复用器采用轮询机制来选择有读写事件的客户端链接进行处理。 通过 Selector ,一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。 由于它的读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
191 0
|
缓存 网络协议 Java
Java NIO学习(二):Channel通道
Java NIO 的通道类似流,但又有些不同:
143 0
Java NIO学习(二):Channel通道
|
设计模式 缓存 网络协议
Java NIO学习(一):Java NIO概述
IO 的操作方式通常分为几种:同步阻塞 BIO、同步非阻塞 NIO、异步非阻塞 AIO。
129 0
Java NIO学习(一):Java NIO概述
|
Java 测试技术 容器
NIO 下的 ByteBuffer简单学习
NIO 下的 ByteBuffer简单学习
99 0
|
Java API
NIO学习三-Channel
在学习NIO时,ByteBuffer、Channel、Selector三个组件是必须了解的。前面我们说到ByteBuffer是作为缓冲区进行数据的存放或者获取。通常我们需要进行flip翻转操作,但是这个在Netty中,有一个更为强大的类可以替代ByteBuf,其不需要进行翻转,也可以进行读写的双向操作。要将数据打包到缓冲区中,通常需要使用通道,而通道作为传输数据的载体,也即它可以使数据从一端到另一端,因此就必须进行了解。 Channel中,我们也看到其子类有很多,通常都是用于读写操作的。其中ByteChannel可以进行读写操作,也即可以进行双向操作。 操作过程:首先创建流对象,有了流对象获取
62 0
NIO学习三-Channel
NIO学习二-ByteBuffer
前面我们已经了解到Buffer中,0<=mark<=postion<=limit<=capacity。其中mark是标记,如果为-1时丢弃。postion是当前位置,limit是限制,也即上界。capacity是容量。同时了解了直接缓冲区与缓冲区的底层实现是不同的,缓冲区是基于数组的,而直接缓冲区是基于内存的。同时可以基于反射,拿到cleaner,进而拿到clean进行清理。同时clear是还原缓冲区的状态,flip是反转缓冲区的,rewind重绕缓冲区,标记清除。remianing对剩余元素的个数记录。offset获取偏移量。 ByteBuffer是Buffer的子类,可以在缓冲区以字节为单
81 0
NIO学习二-ByteBuffer
|
存储 索引
NIO学习一
NIO相比普通IO提供了功能更为强大、处理数据更快的解决方案。 常用于高性能服务器上。NIO实现高性能处理的原理是使用较少的线程来处理更多的任务 常规io使用的byte[]、char[]进行封装,而NIO采用ByteBuffer类来操作数据,再结合 针对File或socket技术的channel,采用同步非阻塞技术来实现高性能处理,而Netty 正是采用ByteBuffer(缓冲区)、Channel(通道)、Selector(选择器)进行封装的。 因此我们需要先了解NIO相关的知识。
84 0
NIO学习一