java的nio之:java的nio的服务器实现模型

简介: 【nio服务端序列图】 一:nio服务器启动类 1 package com.yeepay.sxf.testnio; 2 /** 3 * nio创建的的timerServer服务器 4 * 5 * @author sxf 6 * 7 */ 8 p...

【nio服务端序列图】

一:nio服务器启动类

 1 package com.yeepay.sxf.testnio;
 2 /**
 3  * nio创建的的timerServer服务器
 4  * 
 5  * @author sxf
 6  *
 7  */
 8 public class NIOTimerServer {
 9     
10     /**
11      * nio服务器启动的入口
12      * @param args
13      */
14     public static void main(String[] args) {
15         //启动服务器绑定的端口号
16         int port=8000;
17         //获取端口号
18         if(args!=null && args.length>0){
19             try {
20                 port=Integer.valueOf(args[0]);
21             } catch (Exception e) {
22                 e.printStackTrace();
23             }
24         }
25         
26         //新建nio服务器类
27         MultiplexerTimerServer timerServer=new MultiplexerTimerServer(port);
28         
29         //启动服务类的主线程
30         new Thread(timerServer,"NIO-MultiplexerTimerServer-001").start();
31     }
32 }
View Code

二:nio服务器

  1 package com.yeepay.sxf.testnio;
  2 
  3 import java.io.BufferedReader;
  4 import java.io.IOException;
  5 import java.net.InetSocketAddress;
  6 import java.nio.ByteBuffer;
  7 import java.nio.channels.SelectionKey;
  8 import java.nio.channels.Selector;
  9 import java.nio.channels.ServerSocketChannel;
 10 import java.nio.channels.SocketChannel;
 11 import java.util.Date;
 12 import java.util.Iterator;
 13 import java.util.Set;
 14 
 15 import com.sun.org.apache.xml.internal.utils.StopParseException;
 16 
 17 /**
 18  * nio的时间服务器
 19  * @author sxf
 20  *
 21  */
 22 public class MultiplexerTimerServer implements Runnable {
 23     
 24     //选择器
 25     private Selector selector;
 26 
 27     //
 28     private ServerSocketChannel serverSocketChannel;
 29     
 30     private volatile boolean stop;
 31     
 32     //启动服务
 33     public MultiplexerTimerServer(int port){
 34         try {
 35             //初始化多路复用器
 36             selector=Selector.open();
 37             //初始化socket通道
 38             serverSocketChannel=ServerSocketChannel.open();
 39             //设置通道为非阻塞模式
 40             serverSocketChannel.configureBlocking(false);
 41             //将该通道绑定地址和端口号
 42             serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
 43             //将该通道注册到多路复用器,并注册链接请求事件
 44             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
 45             System.out.println("The time server is start in port:"+port);
 46         } catch (Exception e) {
 47             // TODO: handle exception
 48             e.printStackTrace();
 49             System.exit(1);
 50         }
 51     }
 52     
 53     /**
 54      * 停止服务器
 55      */
 56     public void stop(){
 57         this.stop=true;
 58     }
 59     
 60     
 61     /**
 62      * 服务器运行主体
 63      */
 64     @Override
 65     public void run() {
 66         while(!stop){
 67             try {
 68                 System.out.println("MultiplexerTimerServer.run()");
 69                 //select()阻塞到至少有一个通道在你注册的事件上就绪了。
 70                 selector.select();
 71                 //获取注册在这个多路复用器上的已经就绪的通道的集合
 72                 Set<SelectionKey> selectionKeys=selector.selectedKeys();
 73                 //循环迭代已经就绪的通道集合
 74                 Iterator<SelectionKey> it=selectionKeys.iterator();
 75                 SelectionKey key=null;
 76                 while(it.hasNext()){
 77                     key=it.next();
 78                     //防止重复执行通道事件
 79                     it.remove();
 80                     //处理该通道上的事件
 81                     try {
 82                         handleInput(key);
 83                     } catch (Exception e) {
 84                         if(key!=null){
 85                             key.cancel();
 86                             if(key.channel()!=null){
 87                                 key.channel().close();
 88                             }
 89                         }
 90                     }
 91                 }
 92                 
 93             } catch (Exception e) {
 94                 e.printStackTrace();
 95             }
 96         
 97             
 98         }
 99     }
100 
101     
102     /**
103      * 处理请求的事件
104      * @param key
105      * @throws IOException
106      */
107     private void handleInput(SelectionKey key) throws IOException{
108         if(key.isValid()){
109             //处理新接入的请求消息
110             if(key.isAcceptable()){
111                 //请求链接事件就绪
112                 ServerSocketChannel ssc=(ServerSocketChannel) key.channel();
113                 SocketChannel  sc=ssc.accept();
114                 sc.configureBlocking(false);
115                 //在多路复用器上注册一个soketChannel,当有读事件则触发
116                 sc.register(selector, SelectionKey.OP_READ);
117             }
118             
119             if(key.isReadable()){
120                 //读事件就绪
121                 SocketChannel sc=(SocketChannel) key.channel();
122                 //声明一个缓冲区
123                 ByteBuffer readBuffer=ByteBuffer.allocate(1024);
124                 //从通道里读取数据写入缓冲区
125                 int readBytes=sc.read(readBuffer);
126                 //readBytes>0:表示读到了字节,对字节进行编解码。
127                 //readBytes=0:没有读取到字节,属于正常场景,忽略
128                 //readBytes=-1;链路已经关闭,需要关闭socketChannel,释放资源
129                 if(readBytes>0){
130                     //将ByteBuffer的limit设置为position,position设置为0
131                     readBuffer.flip();
132                     //编解码数据
133                     byte[] bytes=new byte[readBuffer.remaining()];
134                     //将数据从缓冲区复制到数组里
135                     readBuffer.get(bytes);
136                     //翻译请求的内容
137                     String body=new String(bytes,"UTF-8");
138                     //打印请求的内容
139                     System.out.println("the timerserver receive order:"+body);
140                 
141                     //处理请求内容
142                     String currentTime=null;
143                     if("shangxiaofei".equals(body)){
144                         currentTime=new Date().toString();
145                     }else{
146                         currentTime="request param is error";
147                     }
148                     
149                     //将处理的结果响应给客户端
150                     doWrite(sc, currentTime);
151                 }else if(readBytes<0){
152                     //对链路进行关闭
153                     key.cancel();
154                     sc.close();
155                 }else{
156                     //忽略
157                 }
158             }
159         }
160     }
161     
162     /**
163      * 响应请求的内容
164      * @param channel
165      * @param response
166      * @throws IOException 
167      */
168     private void doWrite(SocketChannel channel,String response) throws IOException{
169         if(response!=null&&response.trim().length()>0){
170             //将响应的内容转化成byte[]
171             byte[] bytes=response.getBytes();
172             //声明缓冲区
173             ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);
174             //将数据写入缓冲区
175             writeBuffer.put(bytes);
176             //修改ByteBuffer的imit设置为position,position设置为0
177             writeBuffer.flip();
178             //将数据从缓冲区写入通道
179             channel.write(writeBuffer);
180         }
181     }
182     
183     
184 }
View Code

【nio客户端序列图】

 

三:nio服务器客户端启动类

 1 package com.yeepay.sxf.testnio;
 2 
 3 
 4 /**
 5  * 向TimerServer发送请求的客户端
 6  * @author sxf
 7  *
 8  */
 9 public class NIOTimerClient {
10     
11     public static void main(String[] args) {
12         int port=8000;
13         
14         if(args!=null&&args.length>0){
15             port=Integer.valueOf(args[0]);
16         }
17         new Thread(new TimerClientHandler("127.0.0.1", port),"TimeClient-001").start();
18     }
19 }
View Code

四:nio服务器的客户端

  1 package com.yeepay.sxf.testnio;
  2 
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.nio.ByteBuffer;
  6 import java.nio.channels.SelectionKey;
  7 import java.nio.channels.Selector;
  8 import java.nio.channels.SocketChannel;
  9 import java.util.Iterator;
 10 import java.util.Set;
 11 
 12 /**
 13  * timerclient请求线程
 14  * @author sxf
 15  *
 16  */
 17 public class TimerClientHandler implements Runnable{
 18     //链接timer服务器的ip地址
 19     private String host;
 20     //链接timer服务器服务的端口号
 21     private int port;
 22     //多路复用器
 23     private Selector selector;
 24     //通道
 25     private SocketChannel socketChannel;
 26     //当前请求线程是否停止
 27     private volatile boolean stop;
 28     
 29     
 30     public TimerClientHandler(String host,int port) {
 31         this.host=host==null?"127.0.0.1":host;
 32         this.port=port;
 33         try {
 34             this.selector=Selector.open();
 35             this.socketChannel=SocketChannel.open();
 36             socketChannel.configureBlocking(false);
 37         } catch (Exception e) {
 38             e.printStackTrace();
 39             System.exit(1);
 40         }
 41     }
 42     
 43     /**
 44      * 链接时间服务器
 45      * @throws IOException 
 46      */
 47     private void doConnect() throws IOException{
 48         if(socketChannel.connect(new InetSocketAddress(host, port))){
 49             socketChannel.register(selector, SelectionKey.OP_READ);
 50             //doWrite(socketChannel);
 51         }else{
 52             socketChannel.register(selector, SelectionKey.OP_CONNECT);
 53         }
 54     }
 55     
 56     /**
 57      * 向时间服务器发送请求
 58      * @param sc
 59      * @throws IOException 
 60      */
 61     private void doWrite(SocketChannel sc) throws IOException{
 62         //发送请求的请求内容
 63         byte[] req="shangxiaofei".getBytes();
 64         //声明缓冲区
 65         ByteBuffer writeBuffer=ByteBuffer.allocate(req.length);
 66         //将请求体写入缓冲区
 67         writeBuffer.put(req);
 68         //设置limit
 69         writeBuffer.flip();
 70         //将缓冲区的内容写入通道
 71         sc.write(writeBuffer);
 72         if(!writeBuffer.hasRemaining()){
 73             System.out.println("send order to server success........");
 74         }
 75         
 76     }
 77     
 78     
 79     private void handleInput(SelectionKey key) throws IOException{
 80         if(key.isValid()){
 81             //判断链接是否成功
 82             SocketChannel sc=(SocketChannel) key.channel();
 83         
 84                 //链接事件就绪
 85                 if(sc.finishConnect()){
 86                     //是否链接完成
 87                     sc.register(selector, SelectionKey.OP_READ);
 88                     doWrite(sc);
 89                 }else{
 90                     //链接失败,进程退出
 91                     System.exit(1);
 92                 }
 93                 
 94                 if(key.isReadable()){
 95                     //读事件就绪
 96                     ByteBuffer readBuffer=ByteBuffer.allocate(1024);
 97                     int readBytes=sc.read(readBuffer);
 98                     if(readBytes>0){
 99                         readBuffer.flip();
100                         byte[] bytes=new byte[readBuffer.remaining()];
101                         readBuffer.get(bytes);
102                         String body=new String(bytes,"UTF-8");
103                         System.out.println("TimerServer response:"+body);
104                         this.stop=true;
105                     }else if(readBytes<0){
106                         //对端链路关闭
107                         key.cancel();
108                         sc.close();
109                     }else{
110                         //读到0字节,忽略
111                     }
112                 }
113             
114         }
115     }
116     
117     @Override
118     public void run() {
119         try {
120             //链接并发送请求
121             doConnect();
122         } catch (Exception e) {
123             // TODO: handle exception
124             e.printStackTrace();
125         }
126         
127         while(!stop){
128             try {
129                 //等待响应
130                 selector.select();
131                 //获取已经就绪的通道事件集合,在这个多路复用器上
132                 Set<SelectionKey> selectedKeys=selector.selectedKeys();
133                 //循环迭代处理事件集合
134                 Iterator<SelectionKey> it=selectedKeys.iterator();
135                 SelectionKey key=null;
136                 while (it.hasNext()) {
137                     key=it.next();
138                     it.remove();
139                     try {
140                         handleInput(key);
141                     } catch (Exception e) {
142                         e.printStackTrace();
143                     }
144                     
145                 }
146             } catch (Exception e) {
147                 e.printStackTrace();
148             }
149         }
150         
151         //多路复用器关闭后,所有注册在上面的channel和Pipe等资源都会被自动去注册并关闭
152         //所以不需要重复释放资源
153 //        if(selector!=null){
154 //            try {
155 //                selector.close();
156 //            } catch (Exception e) {
157 //                e.printStackTrace();
158 //            }
159 //        }
160         
161     }
162     
163 
164 }
View Code

 

相关文章
|
2月前
|
安全 Java 调度
Java编程时多线程操作单核服务器可以不加锁吗?
Java编程时多线程操作单核服务器可以不加锁吗?
40 2
|
5天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
8 1
|
1月前
|
Java Linux
java读取linux服务器下某文档的内容
java读取linux服务器下某文档的内容
33 3
java读取linux服务器下某文档的内容
|
16天前
|
Java
让星星⭐月亮告诉你,Java NIO之Buffer详解 属性capacity/position/limit/mark 方法put(X)/get()/flip()/compact()/clear()
这段代码演示了Java NIO中`ByteBuffer`的基本操作,包括分配、写入、翻转、读取、压缩和清空缓冲区。通过示例展示了`position`、`limit`和`mark`属性的变化过程,帮助理解缓冲区的工作原理。
20 2
|
16天前
|
运维 Java Linux
【运维基础知识】Linux服务器下手写启停Java程序脚本start.sh stop.sh及详细说明
### 启动Java程序脚本 `start.sh` 此脚本用于启动一个Java程序,设置JVM字符集为GBK,最大堆内存为3000M,并将程序的日志输出到`output.log`文件中,同时在后台运行。 ### 停止Java程序脚本 `stop.sh` 此脚本用于停止指定名称的服务(如`QuoteServer`),通过查找并终止该服务的Java进程,输出操作结果以确认是否成功。
22 1
|
2月前
|
存储 网络协议 Java
Java NIO 开发
本文介绍了Java NIO(New IO)及其主要组件,包括Channel、Buffer和Selector,并对比了NIO与传统IO的优势。文章详细讲解了FileChannel、SocketChannel、ServerSocketChannel、DatagramChannel及Pipe.SinkChannel和Pipe.SourceChannel等Channel实现类,并提供了示例代码。通过这些示例,读者可以了解如何使用不同类型的通道进行数据读写操作。
Java NIO 开发
|
22天前
|
分布式计算 资源调度 Hadoop
大数据-01-基础环境搭建 超详细 Hadoop Java 环境变量 3节点云服务器 2C4G XML 集群配置 HDFS Yarn MapRedece
大数据-01-基础环境搭建 超详细 Hadoop Java 环境变量 3节点云服务器 2C4G XML 集群配置 HDFS Yarn MapRedece
56 4
|
22天前
|
Java Shell Maven
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
78 4
|
2月前
|
Java
java小工具util系列5:java文件相关操作工具,包括读取服务器路径下文件,删除文件及子文件,删除文件夹等方法
java小工具util系列5:java文件相关操作工具,包括读取服务器路径下文件,删除文件及子文件,删除文件夹等方法
45 4
|
2月前
|
Kubernetes Java Maven
揭秘无服务器革命:Quarkus如何让Java应用在云端“零”负担起飞?
本文介绍如何使用Quarkus从零开始开发无服务器应用,通过示例代码和详细步骤引导读者掌握这一技术。无服务器架构让开发者无需管理服务器,具有自动扩展和成本效益等优势。Quarkus作为Kubernetes Native Java框架,优化了Java应用的启动速度和内存使用,适合无服务器环境。文章涵盖环境搭建、项目创建及部署全流程,并介绍了Quarkus的扩展性和监控工具,助力高效开发与应用性能提升。
41 9