Netty入门到超神系列-Java NIO 三大核心(selector,channel,buffer)

简介: 理解Selector 和 ChannelSelector 选择器,也叫多路复用器,可以同时处理多个客户端连接,多路复用器采用轮询机制来选择有读写事件的客户端链接进行处理。通过 Selector ,一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。由于它的读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。

前言

上一章节我们理解了Java NIO三大核心,以及重点讲解了Buffer的原理和几个使用场景,其中也用到了channel。这一章我们来理解一下selector,结合channel来做一个c/s通信。

理解Selector 和 Channel

Selector 选择器,也叫多路复用器,可以同时处理多个客户端连接,多路复用器采用轮询机制来选择有读写事件的客户端链接进行处理。

通过 Selector ,一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

由于它的读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。

这里在整理一下它的工作原理,如图:

Selector :多路复用器(也叫选择器)的作用就是提供给SocketChannel通道来注册,然后Selector会轮询的去监听通道通道的读写事件从而做出相应的IO处理,它 的工作流程如下:

  1. 首先需要创建一个ServerSocketChannel,它类似于(ServerSocket)需要指定一个监听的IP和Port。需要注意的是ServerSocketChannel为了兼容BIO,默认是阻塞的,可以通过ServerSocketChannel#configureBlocking(false)来指定为NIO模式。
    通过ServerSocketChannel 可以获取一个客户端的SocketChannel ,客户端的SocketChannel 需要注册到Selector上,然后每个通道都会对应一个SelectionKey
  2. 选择器可以通过Selector.open() 创建,然后将 ServerSocketChannel 注册给Selector 。选择器的 Selector#select() 方法 可以选择有事件的通道(SocketChannel ),并返回已就绪的通道数量.事件类型包括:“连接”,“接收” ,“读”,“写”。
  3. 如果 Selector#select() 返回值大于0代表某些通道有事件发生,可以通过 selector.selectedKeys() 来得到所有有事件通道的SelectionKey。
    然后可以通过SelectionKey方向拿到SocketChannel , 从而将SocketChannel 中的数据读取到Buffer中,完成IO操作。

ServerSocketChannel

ServerSocketChannel 是服务端用来用来监听客户端Socket链接,通过accept方法可以获取客户端SocketChannel,从而将 SocketChannel 注册到Selector,相关方法如下

  • open : 创建一个 ServerSocketChannel 通道
  • bind(SocketAddress local):设置服务器端监听的地址和端口号
  • configureBlocking(boolean block) : 设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
  • accept() :接受一个连接,返回代表这个连接的通道对象SocketChannel
  • register(Selector sel, int ops): 把当前通道注册到选择器,并设置监听事件

SocketChannel

一个客户端链接服务端就会产生通道:ServerChannel ,需要注册到Selector,被Selector监听通道的读写事件。ServerChannel 负责具体的读写,把缓冲区的数据写入通道,或者把通道中的数据写入缓冲区。

  • open() : 得到一个 SocketChannel 通道
  • configureBlocking(boolean block):设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
  • connect(SocketAddress remote) :连接远程服务器,通过SocketAddress 指定IP和端口
  • finishConnect(): 如果connect连接失败,就要通过finishConnect方法完成连接操作
  • write(ByteBuffer src) : 把ByteBuffer中的数据,往通道里写
  • read(ByteBuffer dst) :从通道里读数据,写入ByteBuffer
  • register(Selector sel, int ops, Object att): 把该通道注册到selector,并设置监听事件(OPS),最后一个参数可以设置共享数据,该方法会返回一个 SelectionKey ,这个key对应了该通道。
  • close() :关闭通道

SelectionKey

当 ServerChannel 注册到Selector就会产生SelectionKey , 通过SelectionKey可以反向获得SocketChannel通道对象,从而进行IO读写操作。

  • selector(): 通过SelectionKey 获取得到与之关联的 Selector 对象
  • channel():得到与之关联的通道
  • attachment():得到与之关联的共享数据
  • interestOps(int ops):设置或改变监听事件
  • isAcceptable():是否可以 accept,“接收就绪”事件
  • isReadable():是否可以读
  • isWritable():是否可以写

事件包括:

SelectionKey.OP_CONNECT : 16,连接就绪,比如:一个channel连接到一个服务器SelectionKey.OP_ACCEPT8,接收就绪,比如:ServerSocketChannel准备好接入新的连接SelectionKey.OP_READ:值1,“读就绪”,通道有可以读数据可以说是SelectionKey.OP_WRITE4,“写就绪”,等待写数据的通道可以说

Selector

负责采用轮询方式监听通道,当通道有读写事件就进行IO操作。

  • open() :得到一个选择器对象
  • select(long timeout):选择有事件的通道,将其对应的 SelectionKey 加入到内部集合中,返回通道的数量
  • selectedKeys() : 从内部集合中得到所有有事件 SelectionKey
  • keys() : 从内部集合中得到所有SelectionKey
  • close:关闭选择器

综合案例

使用selector ,ServerSocketChannel,SocketChannel完成一个 C/S 案例,

服务端代码

//通道@TestpublicvoidserverSocketChannelTest() throwsIOException {
//创建服务端通道ServerSocketChannelserverSocketChannel=ServerSocketChannel.open();
//socket监听地址和端口SocketAddresssocketAddress=newInetSocketAddress("127.0.0.1",5000);
//和某个SocketAddress绑定serverSocketChannel.bind(socketAddress);
//NIO默认采用阻塞,为了兼容BIOserverSocketChannel.configureBlocking(false);
//创建选择器Selectorselector=Selector.open();
//通道注册到选择器,事件类型为:OP_ACCEPT “接受”serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//==选择器轮询=======================================================================while(true){
//select,选择有事件的通道,返回有事件发生通道的key的个数  ,超时时间 1sif(selector.select(1000) ==0){
System.out.println("无连接...轮询等待...\");continue;
       }
//有事件发生,得到有事件的通道的key的集合Set<SelectionKey>selectionKeys=selector.selectedKeys();
Iterator<SelectionKey>iterator=selectionKeys.iterator();
//遍历key的集合while (iterator.hasNext()){
//拿到每个通道的keySelectionKeykey=iterator.next();
//如果当前通道事件是: OP_ACCEPT ,就注册通道if(key.isAcceptable()){
//接收一个socketChannelSocketChannelsocketChannel=serverSocketChannel.accept();
System.out.println("客户端链接成功...");
socketChannel.configureBlocking(false);
//把socketChannel注册到选择器 ,并给通道绑定一个buffersocketChannel.register(selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024));
           }
//如果通道事件是: OP_READ,说明通道有数据if(key.isReadable()){
//通过key得到SocketChannelSocketChannelchannel= (SocketChannel)key.channel();
//得到channel绑定的bufferByteBufferbyteBuffer= (ByteBuffer)key.attachment();
//从通道把数据读取到bufferchannel.read(byteBuffer);
System.out.println(newString(byteBuffer.array()));
           }
//删除当前keyiterator.remove();
       }
   }
}

客户端代码

//通道@TestpublicvoidsocketChannelTest() throwsIOException {
//创建一个SocketChannelSocketChannelsocketChannel=SocketChannel.open();
//使用非阻塞模式socketChannel.configureBlocking(false);
//链接的地址和端口InetSocketAddressinetSocketAddress=newInetSocketAddress("127.0.0.1", 5000);
//尝试链接,如果使用的异步,那么需要使用 socketChannel.finishConnect() 来确保连接成功。if(!socketChannel.connect(inetSocketAddress)){
//如果没链接成功,会通过while循环,直到 finishConnect 链接成功,跳出whilewhile(!socketChannel.finishConnect()){
System.out.println("还未完成链接...等待中...");
            }
        }
//链接成功,把数据写出去socketChannel.write(ByteBuffer.wrap("你好".getBytes()));
System.out.println("向服务端发送数据...");
//防止客户端结束,所以使用read()阻塞System.in.read();
    }

总结

本篇文章介绍了Selector ,ServerSocketChannel ,SocketChannel的作用,场景API,和三种的工作原理,并通过一个C/S综合案例来演示三者之间的关系。

目录
相关文章
|
13天前
|
Java 数据库连接 API
2025 更新必看:Java 编程基础入门级超级完整版指南
本教程为2025更新版Java编程基础入门指南,涵盖开发环境搭建(SDKMAN!管理JDK、VS Code配置)、Java 17+新特性(文本块、Switch表达式增强、Record类)、面向对象编程(接口默认方法、抽象类与模板方法)、集合框架深度应用(Stream API高级操作、并发集合)、模式匹配与密封类等。还包括学生成绩管理系统实战项目,涉及Maven构建、Lombok简化代码、JDBC数据库操作及JavaFX界面开发。同时提供JUnit测试、日志框架使用技巧及进阶学习资源推荐,助你掌握Java核心技术并迈向高级开发。
84 5
|
2月前
|
存储 安全 Java
【Java并发】【原子类】适合初学体质的原子类入门
什么是CAS? 说到原子类,首先就要说到CAS: CAS(Compare and Swap) 是一种无锁的原子操作,用于实现多线程环境下的安全数据更新。 CAS(Compare and Swap) 的
85 15
【Java并发】【原子类】适合初学体质的原子类入门
|
4月前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
276 60
【Java并发】【线程池】带你从0-1入门线程池
|
2月前
|
缓存 安全 Java
【Java并发】【ConcurrentHashMap】适合初学体质的ConcurrentHashMap入门
ConcurrentHashMap是Java中线程安全的哈希表实现,支持高并发读写操作。相比Hashtable,它通过分段锁(JDK1.7)或CAS+synchronized(JDK1.8)实现更细粒度锁控制,提升性能与安全性。本文详细介绍其构造方法、添加/获取/删除元素等常用操作,并对比JDK1.7和1.8的区别,帮助开发者深入理解与使用ConcurrentHashMap。欢迎关注,了解更多!
128 5
【Java并发】【ConcurrentHashMap】适合初学体质的ConcurrentHashMap入门
|
2月前
|
Java
【源码】【Java并发】【LinkedBlockingQueue】适合中学体质的LinkedBlockingQueue入门
前言 有了前文对简单实用的学习 【Java并发】【LinkedBlockingQueue】适合初学体质的LinkedBlockingQueue入门 聪明的你,一定会想知道更多。哈哈哈哈哈,下面主播就...
65 6
【源码】【Java并发】【LinkedBlockingQueue】适合中学体质的LinkedBlockingQueue入门
|
2月前
|
安全 Java
【Java并发】【ArrayBlockingQueue】适合初学体质的ArrayBlockingQueue入门
什么是ArrayBlockingQueue ArrayBlockingQueue是 Java 并发编程中一个基于数组实现的有界阻塞队列,属于 java.util.concurrent 包,实现了 Bl...
75 6
【Java并发】【ArrayBlockingQueue】适合初学体质的ArrayBlockingQueue入门
|
2月前
|
监控 Java API
【Java并发】【ReentrantLock】适合初学体质的ReentrantLock入门
前言 什么是ReentrantLock? ReentrantLock 是 Java 并发包 (java.util.concurrent.locks) 中的一个类,它实现了 Lock 接口,提供了与
125 10
【Java并发】【ReentrantLock】适合初学体质的ReentrantLock入门
|
2月前
|
安全 Java
【Java并发】【LinkedBlockingQueue】适合初学体质的LinkedBlockingQueue入门
前言 你是否在线程池工具类里看到过它的身影? 你是否会好奇LinkedBlockingQueue是啥呢? 没有关系,小手手点上关注,跟上主播的节奏。 什么是LinkedBlockingQueue? ...
73 1
【Java并发】【LinkedBlockingQueue】适合初学体质的LinkedBlockingQueue入门
|
3月前
|
设计模式 存储 安全
【Java并发】【AQS】适合初学者体质的AQS入门
AQS这是灰常重要的哈,很多JUC下的框架的核心,那都是我们的AQS,所以这里,我们直接开始先研究AQS。 那说到研究AQS,那我们应该,使用开始说起🤓 入门 什么是AQS? AQS(Abst
97 8
【Java并发】【AQS】适合初学者体质的AQS入门
|
3月前
|
缓存 安全 Java
【Java并发】【synchronized】适合初学者体质入门的synchronized
欢迎来到我的Java线程同步入门指南!我不是外包员工,梦想是写高端CRUD。2025年我正在沉淀中,博客更新速度加快,欢迎点赞、收藏、关注。 本文介绍Java中的`synchronized`关键字,适合初学者。`synchronized`用于确保多个线程访问共享资源时不会发生冲突,避免竞态条件、保证内存可见性、防止原子性破坏及协调多线程有序访问。
108 8
【Java并发】【synchronized】适合初学者体质入门的synchronized