java的nio之:java的nio的服务器实现模型

简介: 【nio服务端序列图】 一:nio服务器启动类 1 package com.yeepay.sxf.testnio; 2 /** 3 * nio创建的的timerServer服务器 4 * 5 * @author sxf 6 * 7 */ 8 p...

【nio服务端序列图】

一:nio服务器启动类

 1 package com.yeepay.sxf.testnio;
 2 /**
 3  * nio创建的的timerServer服务器
 4  * 
 5  * @author sxf
 6  *
 7  */
 8 public class NIOTimerServer {
 9     
10     /**
11      * nio服务器启动的入口
12      * @param args
13      */
14     public static void main(String[] args) {
15         //启动服务器绑定的端口号
16         int port=8000;
17         //获取端口号
18         if(args!=null && args.length>0){
19             try {
20                 port=Integer.valueOf(args[0]);
21             } catch (Exception e) {
22                 e.printStackTrace();
23             }
24         }
25         
26         //新建nio服务器类
27         MultiplexerTimerServer timerServer=new MultiplexerTimerServer(port);
28         
29         //启动服务类的主线程
30         new Thread(timerServer,"NIO-MultiplexerTimerServer-001").start();
31     }
32 }
View Code

二:nio服务器

  1 package com.yeepay.sxf.testnio;
  2 
  3 import java.io.BufferedReader;
  4 import java.io.IOException;
  5 import java.net.InetSocketAddress;
  6 import java.nio.ByteBuffer;
  7 import java.nio.channels.SelectionKey;
  8 import java.nio.channels.Selector;
  9 import java.nio.channels.ServerSocketChannel;
 10 import java.nio.channels.SocketChannel;
 11 import java.util.Date;
 12 import java.util.Iterator;
 13 import java.util.Set;
 14 
 15 import com.sun.org.apache.xml.internal.utils.StopParseException;
 16 
 17 /**
 18  * nio的时间服务器
 19  * @author sxf
 20  *
 21  */
 22 public class MultiplexerTimerServer implements Runnable {
 23     
 24     //选择器
 25     private Selector selector;
 26 
 27     //
 28     private ServerSocketChannel serverSocketChannel;
 29     
 30     private volatile boolean stop;
 31     
 32     //启动服务
 33     public MultiplexerTimerServer(int port){
 34         try {
 35             //初始化多路复用器
 36             selector=Selector.open();
 37             //初始化socket通道
 38             serverSocketChannel=ServerSocketChannel.open();
 39             //设置通道为非阻塞模式
 40             serverSocketChannel.configureBlocking(false);
 41             //将该通道绑定地址和端口号
 42             serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
 43             //将该通道注册到多路复用器,并注册链接请求事件
 44             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
 45             System.out.println("The time server is start in port:"+port);
 46         } catch (Exception e) {
 47             // TODO: handle exception
 48             e.printStackTrace();
 49             System.exit(1);
 50         }
 51     }
 52     
 53     /**
 54      * 停止服务器
 55      */
 56     public void stop(){
 57         this.stop=true;
 58     }
 59     
 60     
 61     /**
 62      * 服务器运行主体
 63      */
 64     @Override
 65     public void run() {
 66         while(!stop){
 67             try {
 68                 System.out.println("MultiplexerTimerServer.run()");
 69                 //select()阻塞到至少有一个通道在你注册的事件上就绪了。
 70                 selector.select();
 71                 //获取注册在这个多路复用器上的已经就绪的通道的集合
 72                 Set<SelectionKey> selectionKeys=selector.selectedKeys();
 73                 //循环迭代已经就绪的通道集合
 74                 Iterator<SelectionKey> it=selectionKeys.iterator();
 75                 SelectionKey key=null;
 76                 while(it.hasNext()){
 77                     key=it.next();
 78                     //防止重复执行通道事件
 79                     it.remove();
 80                     //处理该通道上的事件
 81                     try {
 82                         handleInput(key);
 83                     } catch (Exception e) {
 84                         if(key!=null){
 85                             key.cancel();
 86                             if(key.channel()!=null){
 87                                 key.channel().close();
 88                             }
 89                         }
 90                     }
 91                 }
 92                 
 93             } catch (Exception e) {
 94                 e.printStackTrace();
 95             }
 96         
 97             
 98         }
 99     }
100 
101     
102     /**
103      * 处理请求的事件
104      * @param key
105      * @throws IOException
106      */
107     private void handleInput(SelectionKey key) throws IOException{
108         if(key.isValid()){
109             //处理新接入的请求消息
110             if(key.isAcceptable()){
111                 //请求链接事件就绪
112                 ServerSocketChannel ssc=(ServerSocketChannel) key.channel();
113                 SocketChannel  sc=ssc.accept();
114                 sc.configureBlocking(false);
115                 //在多路复用器上注册一个soketChannel,当有读事件则触发
116                 sc.register(selector, SelectionKey.OP_READ);
117             }
118             
119             if(key.isReadable()){
120                 //读事件就绪
121                 SocketChannel sc=(SocketChannel) key.channel();
122                 //声明一个缓冲区
123                 ByteBuffer readBuffer=ByteBuffer.allocate(1024);
124                 //从通道里读取数据写入缓冲区
125                 int readBytes=sc.read(readBuffer);
126                 //readBytes>0:表示读到了字节,对字节进行编解码。
127                 //readBytes=0:没有读取到字节,属于正常场景,忽略
128                 //readBytes=-1;链路已经关闭,需要关闭socketChannel,释放资源
129                 if(readBytes>0){
130                     //将ByteBuffer的limit设置为position,position设置为0
131                     readBuffer.flip();
132                     //编解码数据
133                     byte[] bytes=new byte[readBuffer.remaining()];
134                     //将数据从缓冲区复制到数组里
135                     readBuffer.get(bytes);
136                     //翻译请求的内容
137                     String body=new String(bytes,"UTF-8");
138                     //打印请求的内容
139                     System.out.println("the timerserver receive order:"+body);
140                 
141                     //处理请求内容
142                     String currentTime=null;
143                     if("shangxiaofei".equals(body)){
144                         currentTime=new Date().toString();
145                     }else{
146                         currentTime="request param is error";
147                     }
148                     
149                     //将处理的结果响应给客户端
150                     doWrite(sc, currentTime);
151                 }else if(readBytes<0){
152                     //对链路进行关闭
153                     key.cancel();
154                     sc.close();
155                 }else{
156                     //忽略
157                 }
158             }
159         }
160     }
161     
162     /**
163      * 响应请求的内容
164      * @param channel
165      * @param response
166      * @throws IOException 
167      */
168     private void doWrite(SocketChannel channel,String response) throws IOException{
169         if(response!=null&&response.trim().length()>0){
170             //将响应的内容转化成byte[]
171             byte[] bytes=response.getBytes();
172             //声明缓冲区
173             ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);
174             //将数据写入缓冲区
175             writeBuffer.put(bytes);
176             //修改ByteBuffer的imit设置为position,position设置为0
177             writeBuffer.flip();
178             //将数据从缓冲区写入通道
179             channel.write(writeBuffer);
180         }
181     }
182     
183     
184 }
View Code

【nio客户端序列图】

 

三:nio服务器客户端启动类

 1 package com.yeepay.sxf.testnio;
 2 
 3 
 4 /**
 5  * 向TimerServer发送请求的客户端
 6  * @author sxf
 7  *
 8  */
 9 public class NIOTimerClient {
10     
11     public static void main(String[] args) {
12         int port=8000;
13         
14         if(args!=null&&args.length>0){
15             port=Integer.valueOf(args[0]);
16         }
17         new Thread(new TimerClientHandler("127.0.0.1", port),"TimeClient-001").start();
18     }
19 }
View Code

四:nio服务器的客户端

  1 package com.yeepay.sxf.testnio;
  2 
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.nio.ByteBuffer;
  6 import java.nio.channels.SelectionKey;
  7 import java.nio.channels.Selector;
  8 import java.nio.channels.SocketChannel;
  9 import java.util.Iterator;
 10 import java.util.Set;
 11 
 12 /**
 13  * timerclient请求线程
 14  * @author sxf
 15  *
 16  */
 17 public class TimerClientHandler implements Runnable{
 18     //链接timer服务器的ip地址
 19     private String host;
 20     //链接timer服务器服务的端口号
 21     private int port;
 22     //多路复用器
 23     private Selector selector;
 24     //通道
 25     private SocketChannel socketChannel;
 26     //当前请求线程是否停止
 27     private volatile boolean stop;
 28     
 29     
 30     public TimerClientHandler(String host,int port) {
 31         this.host=host==null?"127.0.0.1":host;
 32         this.port=port;
 33         try {
 34             this.selector=Selector.open();
 35             this.socketChannel=SocketChannel.open();
 36             socketChannel.configureBlocking(false);
 37         } catch (Exception e) {
 38             e.printStackTrace();
 39             System.exit(1);
 40         }
 41     }
 42     
 43     /**
 44      * 链接时间服务器
 45      * @throws IOException 
 46      */
 47     private void doConnect() throws IOException{
 48         if(socketChannel.connect(new InetSocketAddress(host, port))){
 49             socketChannel.register(selector, SelectionKey.OP_READ);
 50             //doWrite(socketChannel);
 51         }else{
 52             socketChannel.register(selector, SelectionKey.OP_CONNECT);
 53         }
 54     }
 55     
 56     /**
 57      * 向时间服务器发送请求
 58      * @param sc
 59      * @throws IOException 
 60      */
 61     private void doWrite(SocketChannel sc) throws IOException{
 62         //发送请求的请求内容
 63         byte[] req="shangxiaofei".getBytes();
 64         //声明缓冲区
 65         ByteBuffer writeBuffer=ByteBuffer.allocate(req.length);
 66         //将请求体写入缓冲区
 67         writeBuffer.put(req);
 68         //设置limit
 69         writeBuffer.flip();
 70         //将缓冲区的内容写入通道
 71         sc.write(writeBuffer);
 72         if(!writeBuffer.hasRemaining()){
 73             System.out.println("send order to server success........");
 74         }
 75         
 76     }
 77     
 78     
 79     private void handleInput(SelectionKey key) throws IOException{
 80         if(key.isValid()){
 81             //判断链接是否成功
 82             SocketChannel sc=(SocketChannel) key.channel();
 83         
 84                 //链接事件就绪
 85                 if(sc.finishConnect()){
 86                     //是否链接完成
 87                     sc.register(selector, SelectionKey.OP_READ);
 88                     doWrite(sc);
 89                 }else{
 90                     //链接失败,进程退出
 91                     System.exit(1);
 92                 }
 93                 
 94                 if(key.isReadable()){
 95                     //读事件就绪
 96                     ByteBuffer readBuffer=ByteBuffer.allocate(1024);
 97                     int readBytes=sc.read(readBuffer);
 98                     if(readBytes>0){
 99                         readBuffer.flip();
100                         byte[] bytes=new byte[readBuffer.remaining()];
101                         readBuffer.get(bytes);
102                         String body=new String(bytes,"UTF-8");
103                         System.out.println("TimerServer response:"+body);
104                         this.stop=true;
105                     }else if(readBytes<0){
106                         //对端链路关闭
107                         key.cancel();
108                         sc.close();
109                     }else{
110                         //读到0字节,忽略
111                     }
112                 }
113             
114         }
115     }
116     
117     @Override
118     public void run() {
119         try {
120             //链接并发送请求
121             doConnect();
122         } catch (Exception e) {
123             // TODO: handle exception
124             e.printStackTrace();
125         }
126         
127         while(!stop){
128             try {
129                 //等待响应
130                 selector.select();
131                 //获取已经就绪的通道事件集合,在这个多路复用器上
132                 Set<SelectionKey> selectedKeys=selector.selectedKeys();
133                 //循环迭代处理事件集合
134                 Iterator<SelectionKey> it=selectedKeys.iterator();
135                 SelectionKey key=null;
136                 while (it.hasNext()) {
137                     key=it.next();
138                     it.remove();
139                     try {
140                         handleInput(key);
141                     } catch (Exception e) {
142                         e.printStackTrace();
143                     }
144                     
145                 }
146             } catch (Exception e) {
147                 e.printStackTrace();
148             }
149         }
150         
151         //多路复用器关闭后,所有注册在上面的channel和Pipe等资源都会被自动去注册并关闭
152         //所以不需要重复释放资源
153 //        if(selector!=null){
154 //            try {
155 //                selector.close();
156 //            } catch (Exception e) {
157 //                e.printStackTrace();
158 //            }
159 //        }
160         
161     }
162     
163 
164 }
View Code

 

相关文章
|
2天前
|
网络协议 Dubbo Java
【网络编程】理解客户端和服务器并使用Java提供的api实现回显服务器
【网络编程】理解客户端和服务器并使用Java提供的api实现回显服务器
8 0
|
3天前
|
关系型数据库 MySQL Java
通过使用阿里云服务器,搭建Java程序的运行环境
通过使用阿里云服务器,搭建Java程序的运行环境
|
3天前
|
网络协议 Ubuntu Java
如何使用MCSM搭建我的世界Java版服务器并实现远程联机游戏
如何使用MCSM搭建我的世界Java版服务器并实现远程联机游戏
15 0
|
9天前
|
Java
如何解决使用若依前后端分离打包部署到服务器上后主包无法找到从包中的文件的问题?如何在 Java 代码中访问 jar 包中的资源文件?
如何解决使用若依前后端分离打包部署到服务器上后主包无法找到从包中的文件的问题?如何在 Java 代码中访问 jar 包中的资源文件?
46 0
|
12天前
|
弹性计算 运维 Java
Serverless 应用引擎产品使用之在Serverless 应用引擎中,将 Java 应用从 ECS 迁移到 SAE如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
40 2
|
17天前
|
缓存 Java API
Java NIO和IO之间的区别
NIO(New IO),这个库是在JDK1.4中才引入的。NIO和IO有相同的作用和目的,但实现方式不同,NIO主要用到的是块,所以NIO的效率要比IO高很多。在Java API中提供了两套NIO,一套是针对标准输入输出NIO,另一套就是网络编程NIO。
16 1
|
23小时前
|
安全 Java
【JAVA进阶篇教学】第十篇:Java中线程安全、锁讲解
【JAVA进阶篇教学】第十篇:Java中线程安全、锁讲解
|
23小时前
|
安全 Java
【JAVA进阶篇教学】第六篇:Java线程中状态
【JAVA进阶篇教学】第六篇:Java线程中状态
|
23小时前
|
缓存 Java
【JAVA进阶篇教学】第五篇:Java多线程编程
【JAVA进阶篇教学】第五篇:Java多线程编程
|
1天前
|
Java
【JAVA基础篇教学】第十二篇:Java中多线程编程
【JAVA基础篇教学】第十二篇:Java中多线程编程