一、Java Nio
1.传统bio通信案例
服务器端
//创建线程池 ExecutorService executorService= Executors.newCachedThreadPool(); try { //服务器 ServerSocket serverSocket=new ServerSocket(10086); while(true){ //连接到服务器的客户端--堵塞方法 Socket client = serverSocket.accept(); new Thread(()->{ System.out.print("线程id:"+Thread.currentThread().getId()); System.out.println("线程名称:"+Thread.currentThread().getName()); try { int len=0; byte[] byteArr=new byte[1024]; //读取来自客户端的数据 InputStream inputStream = client.getInputStream(); while((len=inputStream.read(byteArr))!=-1){ String msg=new String(byteArr); System.out.println("来自客户端的消息:"+msg); } } catch (IOException e) { e.printStackTrace(); } }).start(); } } catch (IOException e) { e.printStackTrace(); }
客户端
try { //创建客户端 Socket client=new Socket("127.0.0.1", 10086); String msg="hi,dude"; OutputStream outputStream = client.getOutputStream(); outputStream.write(msg.getBytes(), 0, msg.length()); outputStream.close(); System.in.read();//目的是让客户端保持与服务器的连接 } catch (IOException e) { e.printStackTrace(); }
2.Java NIO(全称java non-blockingIO): 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。
缓冲区 Buffer
缓冲区(Buffffer) :缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存,这块内存被包装成NIO Buffer对象,可以理解成是一个容器,是一个特殊的数组,该对象提供了一组方法,用来方便的访问该块内存。
Channel提供从文件或网络读取数据的渠道,但是读取或者写入的数据都是经过Buffer。
Buffer中的属性
capacity:容量;即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变。
position:位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变该值,为下
次读写作准备。
limit:表示缓冲区的当前终点 ,不能对缓冲区的超过极限的位置进行读写操作。写模式下,
limit等于Buffer的capacity。当切换Buffer到读模式时, limit表示你最多能读到多少
数据。因此,当切换Buffer到读模式时,limit会被设置成写模式下的position值。简
而言之,你能读到之前写入的所有数据(limit被设置成已写数据的数量,这个值在写
模式下就是position)
Buffer中的方法
Buffer的基本用法
1、创建缓冲区,写入数据到Buffer
2、调flip()方法将缓冲区改成读取模式
3、从Buffer中读取数据
4、调用clear()方法或者compact()方法
虽然java中的基本数据类型都有对应的Buffffer类型与之对应(Boolean除外),但是使用频率最高的是
ByteBuffffer类。所以先介绍一下ByteBuffffer中的常用方法。
ByteBuffer中常用方法
allocate(int):创建间接缓冲区:在堆中开辟,易于管理,垃圾回收器可以回收,空间有限,读写文件 速度较慢。
allocateDirect(int):创建直接缓冲区:不在堆中,物理内存中开辟空间,空间比较大,读写文件速度 快,缺点:不受垃圾回收器控制,创建和销毁耗性能。
案例一
clear(): position将被置为0,limit被设置成capacity的值。可以理解为Buffer被清空了,但是Buffer中的数据并未清除,只是这些标记告诉我们可以从哪里开始往Buffer里写数据。如果Buffer中 有一些未读的数据,调用clear()方法,未读数据将“被遗忘”,意味着不再有任何标记会告诉你哪些数据被读过,哪些还没有.
compact(): 将所有未读的数据拷贝到Buffer起始处。然后将position设到最后一个未读元素正后面。limit属性依然像clear()方法一样,设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据。
// 1、创建缓冲区,写入数据到Buffer ByteBuffer buffer=ByteBuffer.allocate(1024);//创建指定容量的间接缓冲区 // ByteBuffer buffer1=ByteBuffer.allocateDirect(1024);//创建指定容量的直接缓冲区 //写入数据的方式1 //buffer.put("hi,dude".getBytes()); //写入数据的方式2 buffer.put((byte) 'h'); // 2、调flip()方法将缓冲区改成读取模式 buffer.flip(); // 3、从Buffer中读取数据的方式1:单个自己的读取 /*while(buffer.hasRemaining()) { byte b = buffer.get(); System.out.println((char) b); }*/ //读取数据的方式2: byte[] data=new byte[buffer.limit()]; buffer.get(data); System.out.println(new String(data)); // 4、调用clear()方法或者compact()方法 buffer.clear(); buffer.compact();
案例二
ByteBuffffer 支持类型化的put和get, put放入的是什么数据类型,get取出来依然是什么类型,
否则可能出现BufffferUnderflflowException 异常。
//1、创建buffer ByteBuffer buffer=ByteBuffer.allocate(1024); //2、写入数据:按照类型化方式 buffer.putChar('K'); buffer.putLong(1024L); buffer.putInt(512); buffer.putShort((short) 0); //3、读写切换 buffer.flip(); //4、读取数据 System.out.println(buffer.getChar()); System.out.println(buffer.getLong()); System.out.println(buffer.getInt()); System.out.println(buffer.getShort());
//创建一个Buffer ByteBuffer buffer=ByteBuffer.allocate(64); //循环放入数据 for (int i=0;i<buffer.capacity();i++){ buffer.put((byte) i); } //读写切换 buffer.flip(); //得到一个只读的buffer ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
通道 Channel
通道(Channel) :类似于BIO中的stream,例如FileInputStream对象,用来建立到目标(文件,网络套接
字,硬件设备等)的一个连接,但是也有区别:
1.既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
2.通道可以异步地读写。
3.通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。
Channel的实现
常用的Channel类有: FileChannel、DatagramChannel、ServerSocketChannel 和SocketChannel.
FileChannel 从文件中读写数据。
DatagramChannel 能通过UDP读写网络中的数据。
SocketChannel 能通过TCP读写网络中的数据。
ServerSocketChannel可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创
建一个SocketChannel。
FileChannel主要用来对本地文件进行读写操作,但是FileChannel是一个抽象类,所以我们实际用的更
多的是其子类FileChannelImpl,
FileChannel
写入数据到文件
String msg="hi,dude"; String fileName="channel01.txt"; //创建一个输出流 FileOutputStream fileOutputStream=new FileOutputStream(fileName); //获取同一个通道--channel的实际类型是FileChannelImpl FileChannel channel=fileOutputStream.getChannel(); //创建一个缓冲区 ByteBuffer buffer=ByteBuffer.allocate(1024); //将信息写入缓冲区中 buffer.put(msg.getBytes()); //对缓冲区读写切换 buffer.flip(); //将缓冲区中的数据写到到通道中 int num=channel.write(buffer); System.out.println("写入完毕!"+num); fileOutputStream.close();
读取文件中的数据
File file=new File("channel01.txt"); //创建输入流 FileInputStream fileInputStream=new FileInputStream(file); //获取通道 FileChannel channel = fileInputStream.getChannel(); //创建缓冲区 ByteBuffer buffer=ByteBuffer.allocate((int) file.length()); //将通道中的数据读取buffer中 channel.read(buffer); //将buffer中的字节数组转换为字符串输出 System.out.println(new String(buffer.array())); fileInputStream.close();
文件的复制
//准备好要复制的源文件和目标文件 File file=new File("cat.jpg"); File fileCopy=new File("catCopy.jpg"); //创建输入和输出流 FileInputStream fileInputStream=new FileInputStream(file); FileOutputStream fileOutputStream=new FileOutputStream(fileCopy); //获取两个通道 FileChannel inChannel = fileInputStream.getChannel(); FileChannel outChannel = fileOutputStream.getChannel(); //创建缓冲区 ByteBuffer buffer=ByteBuffer.allocate(1024); int len=0; /* //使用transferFrom复制--两个方式适合大文件的复制 outChannel.transferFrom(inChannel, 0,inChannel.size()); //使用transferTo复制---注意方向 //inChannel.transferTo(0, inChannel.size(),outChannel); */ while(true){ //将标志位重置 buffer.clear(); len=inChannel.read(buffer); System.out.println("len="+len); if(len==-1){ break; }//读写切换 buffer.flip(); //将buffer中的数据写入到了通道中 outChannel.write(buffer); } inChannel.close(); outChannel.close(); fileInputStream.close(); fileOutputStream.close();
选择器Selector
Selector 一般称 为选择器 ,当然你也可以翻译为 多路复用器 。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。
Selector类的select的三种不同形式:
1、无参的select():
Selector类的select()方法会无限阻塞等待,直到有信道准备好了IO操作,或另一个线程唤醒了它(调用了该选择器的wakeup())返回SelectionKey
2、带有超时参数的select(long time):
当需要限制线程等待通道就绪的时间时使用, 如果在指定的超时时间(以毫秒计算)内没有通道就绪时,它将返回0。
将超时参数设为0表示将无限期等待,那么它就等价于select( )方法了。
3、selectNow()是完全非阻塞的:
该方法执行就绪检查过程,但不阻塞。如果当前没有通道就绪,它将立即返回0
SelectionKey:
一个SelectionKey键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关
系。这种注册的关系共有四种:
SelectableChannel
configureBlocking()方法:设置阻塞或非阻塞模式
SelectableChannel抽象类的confifigureBlocking()
方法是由 AbstractSelectableChannel抽象类
实现的,SocketChannel、ServerSocketChannel、DatagramChannel都是直接继承了
AbstractSelectableChannel抽象类 。
register()方法 注册一个选择器并设置监听事件
register() 方法的第二个参数是一个interset集合 ,指通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件.
通道触发了一个事件意思是该事件已经就绪。比如某个Channel成功连接到另一个服务器称为“ 连接就绪 ”。一个ServerSockeChannel准备好接收新进入的连接称为“ 接收就绪 ”。一个有数据可读的通道可以说是“ 读就绪 ”。等待写数据的通道可以说是“ 写就绪 ”。
如果你对不止一种事件感兴趣,使用或运算符即可,如下:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
ServerSocketChannel
ServerSocketChannel用来在服务器端监听新的客户端Socket连接.
常用的方法除了两个从SelectableChannel类中继承而来的两个方法confifigureBlocking()和register()方法外,还有以下要记住的方法:
SocketChannel
SocketChannel,网络IO通道,具体负责读写操作。NIO总是把缓冲区的数据写入通道,或者把通道里的
数据读出到缓冲区(buffer) 。常用方法如下所示:常用的方法除了两个从SelectableChannel类中继承而来的两个方法confifigureBlocking()和register()
方法外,还有以下要记住的方法:
NIO服务器端
//创建一个服务器 ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); //绑定服务器端口 serverSocketChannel.bind(new InetSocketAddress(10086)); //将通道设置为非堵塞 serverSocketChannel.configureBlocking(false); //创建Selector Selector selector=Selector.open(); //将通道注册到selector中,并监听的请求连接事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //循环等待客户端的连接 while(true){ if(selector.select(3000)==0){ System.out.println("Server: 等的花儿都谢了.....我也先忙一会儿"); continue; } //获取SelectionKey的集合 Set<SelectionKey> selectionKeys = selector.selectedKeys(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); if(key.isAcceptable()){ //如果是连接事件 //获取连接到服务器的客户端 SocketChannel socketChannel=serverSocketChannel.accept(); //设置为非堵塞 socketChannel.configureBlocking(false); System.out.println("有客户端连接!连接的socketchannel:"+socketChannel.hashCode()); //注册 socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(10)); } if(key.isReadable()){//如果是OP_READ事件 SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer= (ByteBuffer) key.attachment(); socketChannel.read(buffer); for (var i:buffer.array()){ System.out.println((char) i); } System.out.println("来自客户端的消息是:"+new String(buffer.array())); iterator.remove(); } } }
NIO客户端
//创建一个客户端 SocketChannel socketChannel=SocketChannel.open(); //通道都要设置为非堵塞 socketChannel.configureBlocking(false); //准备要连接的服务器名称和端口号 InetSocketAddress inetSocketAddress=new InetSocketAddress("127.0.0.1", 10086); //连接服务器 if(!socketChannel.connect(inetSocketAddress)){ while(!socketChannel.finishConnect()){ System.out.println("Client:努力连接中.....可以先做点其他事情......."); } } //连接成功之后发消息给服务器 String msg="hi,dude"; ByteBuffer buffer=ByteBuffer.wrap(msg.getBytes()); socketChannel.write(buffer); System.in.read();
服务器端
private ServerSocketChannel serverSocketChannel; private Selector selector; private static final int PORT=10086; public ChatServer() { //完成属性的初始化 try { serverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(PORT)); serverSocketChannel.configureBlocking(false); selector=Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } //监听事件 public void listening(){ try { while(true){ int num=selector.select(3000); if(num>0){ Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key=iterator.next(); if(key.isAcceptable()){//处理客户端的连接请求 //连接到服务器的客户端 SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false);//设置为非堵塞 //将连接成功的客户端注册到了selector socketChannel.register(selector, SelectionKey.OP_READ); System.out.println(socketChannel.getRemoteAddress()+"连接成功!进入聊天室!"); } if(key.isReadable()){//处理客户端的通信请求 //处理数据的读取和转发给除了自己之外的客户端 handleReadData(key); } //避免一致处理同一个通道的事件 iterator.remove(); } }else{ System.out.println("server: waiting......."); } } } catch (IOException e) { e.printStackTrace(); } } //处理数据的读取 private void handleReadData(SelectionKey key){ SocketChannel socketChannel=null; try { //服务器接收来自客户端的消息 socketChannel= (SocketChannel) key.channel(); ByteBuffer buffer=ByteBuffer.allocate(1024); int len=socketChannel.read(buffer); if(len>0){ String msg=new String(buffer.array()); System.out.println("来自客户端的消息是:"+msg); //转发消息给其他的客户端 transferMessage(msg,socketChannel); } } catch (IOException e) { try { System.out.println(socketChannel.getRemoteAddress()+"离开了聊天室!"); key.cancel();//取消注册 socketChannel.close();//关闭通道 } catch (IOException ioException) { ioException.printStackTrace(); } // e.printStackTrace(); } } //服务器端转发消息 public void transferMessage(String msg,SocketChannel socketChannel) throws IOException { System.out.println("server转发消息ing......."); for (SelectionKey selectionKey : selector.keys()) { SelectableChannel channel = selectionKey.channel(); if(channel instanceof SocketChannel && channel!=socketChannel){ SocketChannel client= (SocketChannel) channel; ByteBuffer buffer1=ByteBuffer.wrap(msg.getBytes()); client.write(buffer1); } } } public static void main(String[] args) { ChatServer chatServer=new ChatServer(); chatServer.listening(); }
客户端
private final String HOSTNAME="127.0.0.1"; private final int PORT=10086; private SocketChannel socketChannel; private Selector selector; private String userName; public ChatClient() { try { //初始化 socketChannel=SocketChannel.open(new InetSocketAddress(HOSTNAME, PORT)); socketChannel.configureBlocking(false); selector=Selector.open(); socketChannel.register(selector, SelectionKey.OP_READ); userName=socketChannel.getLocalAddress().toString(); System.out.println(userName+" is ready!"); } catch (IOException e) { e.printStackTrace(); } } public void sendInfo(String info){ try { info=userName+":"+info; socketChannel.write(ByteBuffer.wrap(info.getBytes())); } catch (IOException e) { e.printStackTrace(); } public void readInfo(){ try { int readyChannel = selector.select(); if(readyChannel>0){ Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); if(key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); socketChannel.read(buffer); String msg = new String(buffer.array()); System.out.println(msg); } iterator.remove(); } }else{ System.out.println("没有准备就绪的通道!"); } } catch (IOException e) { e.printStackTrace(); } public static void main(String[] args) { ChatClient chatClient=new ChatClient(); new Thread(new Runnable() { @Override public void run() { while(true){ chatClient.readInfo(); } } }).start(); Scanner input=new Scanner(System.in); while(input.hasNextLine()){ String msg=input.nextLine(); chatClient.sendInfo(msg); } }
二、netty
Reactor 模式
即 I/O 多了复用统一监听事件,收到事件后分发(Dispatch 给某进程),是编写高性能网络服务器的必备技术之一。
Reactor 模式中有 2 个关键组成:
Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对
IO 事件做出反应。 它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系
人;
Handlers:处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际
负责人。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。
根据 Reactor 的数量和Handler的数量不同,有 3 种典型的实现:
1)单 Reactor 单线程;
2)多线程Reactor;
3)主从 Reactor 多线程。
可以这样理解,Reactor 就是一个执行 while (true) { selector.select(); …} 循环的线程,会源源不
断的产生新的事件,称作反应堆很贴切。
单线程Reactor
1)Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发;
2)如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler
对象处理连接完成后的后续业务处理;
3)如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应;
4)Handler 会完成 Read→业务处理→Send 的完整业务流程。
多线程Reactor
1)Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发;
2)如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler
对象处理连接完成后续的各种事件;
3)如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应;
4)Handler 只负责响应事件,不做具体业务处理,通过 Read 读取数据后,会分发给后面的
Worker 线程池进行业务处理;
5)Worker 线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给 Handler 进行处
理;
6)Handler 收到响应结果后通过 Send 将响应结果返回给 Client。
主从多线程Reactor
1)Reactor 主线程 MainReactor 对象通过 Select 监控建立连接事件,收到事件后通过 Acceptor
接收,处理建立连接事件;
2)Acceptor 处理建立连接事件后,MainReactor 将连接分配 Reactor 子线程给 SubReactor 进
行处理;
3)SubReactor 将连接加入连接队列进行监听,并创建一个 Handler 用于处理各种连接事件;
4)当有新的事件发生时,SubReactor 会调用连接对应的 Handler 进行响应;
5)Handler 通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理;
6)Worker 线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给 Handler 进行处
理;
7)Handler 收到响应结果后通过 Send 将响应结果返回给 Client。
Netty线程模型
1.BossGroup负责接收客户端的连接
WorkerGroup负责网络的读写,他
们的类型都是NioEventLoopGroup;
2.NioEventLoopGroup 相当于一个事件循环组,主要管理 eventLoop 的生命周期,可以理解为一个
线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一
个 Channel 只对应于一个线程;
3.NioEventLoop 表示一个不断循环的执行处理任务的线程,每个NioEventLoop 都有一个selector
, 用于监听绑定在其上的socket的网络通讯;NioEventLoop 中维护了一个线程和任务队列,支持
异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务
I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,
由 processSelectedKeys 方法触发。
非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks方法触发。
4.每个Boss NioEventLoop 循环执行的步骤
1、轮询监听IO 事件(accept),
2、处理监听到的连接就绪IO事件(accept) ,与client建立连接 , 生成NioScocketChannel ,
并将其注册到某个worker NIOEventLoop 上的 selector,
3、执行任务队列(taskQueue/delayTaskQueue)中的非IO任务
5. 每个 Worker NIOEventLoop 循环执行的步骤 :
1、轮询监听IO事件(read, write ),
2、处理监听到的IO事件,在对应NioScocketChannel 处理,
3、执行任务队列(taskQueue/delayTaskQueue)中的非IO任务
6. 每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道), pipeline 中包含了 channel , 即
通过pipeline 可以获取到对应通道, 管道中维护了很多的处理器。
public class NettyServer { public static void main(String[] args) throws InterruptedException { //1、创建两个线程组 bossGroup workGroup //bossGroup线程组负责客户端连接 EventLoopGroup bossGroup=new NioEventLoopGroup(); //workGroup线程组负责网络读写操作 EventLoopGroup workGroup=new NioEventLoopGroup(); //2、创建服务器启动助手来配置参数--创建辅助的工具类,用于服务器通道的一些列配置 ServerBootstrap serverBootstrap=new ServerBootstrap(); //链式编程 serverBootstrap.group(bossGroup,workGroup)//绑定两个线程组 .channel(NioServerSocketChannel.class)//指定NIO模式 .option(ChannelOption.SO_BACKLOG,512)//设置TCP缓冲区 .childOption(ChannelOption.SO_KEEPALIVE,true)//保持连接 .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception {//数据接收方法的处理 ch.pipeline().addLast(new NettyServerHandler());//具体业务的处理 } }); System.out.println("Server : 准备就绪!!!"); //3 绑定端口,设置非堵塞 ChannelFuture cf = serverBootstrap.bind(8765).sync(); ChannelFuture cf2 = serverBootstrap.bind(8766).sync();//--绑定多个端口,开口变大,但是处理能力不变 System.out.println("Server : 启动!!!"); //4 关闭通道 cf.channel().closeFuture().sync();//等待关闭 cf2.channel().closeFuture().sync();//等待关闭 System.out.println("Server--关闭通道!!!"); //关闭线程组 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); System.out.println("Server--关闭线程组!!!"); } }
public class NettyServerHandler extends ChannelInboundHandlerAdapter { //数据的读取事件 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("serverHandler--ctx:"+ctx); ByteBuf buffer= (ByteBuf) msg; System.out.println("来自客户端的消息:"+buffer.toString(CharsetUtil.UTF_8)); } //数据读取完毕事件 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("hi,client,我收到你的消息啦!", CharsetUtil.UTF_8)); //添加监听事件:数据发送完毕之后,直接断开客户端的连接 channelFuture.addListener(ChannelFutureListener.CLOSE); } //异常捕捉事件 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
public class NettyClient { public void run() throws InterruptedException { EventLoopGroup group=new NioEventLoopGroup(); //创建客户端的启动助手 Bootstrap bootstrap=new Bootstrap(); //开始配置 bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler());//具体业务处理 } }); System.out.println("Client: 准备就绪!!!"); //启动客户端去连接服务器 ChannelFuture cf = bootstrap.connect("127.0.0.1", 8765).sync(); ChannelFuture cf2 = bootstrap.connect("127.0.0.1", 8766).sync(); cf.channel().closeFuture().sync();//等待关闭 cf2.channel().closeFuture().sync();//等待关闭 System.out.println("Client--关闭通道!!!"); group.shutdownGracefully(); System.out.println("Client--关闭线程组!!!"); } public static void main(String[] args) { try { new NettyClient().run(); } catch (InterruptedException e) { e.printStackTrace(); } } }
public class NettyClientHandler extends ChannelInboundHandlerAdapter { //通道就绪 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client: ctx="+ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hi,server,这是来自客户端的招呼!", CharsetUtil.UTF_8)); } //数据的读取事件 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf= (ByteBuf) msg; System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8)); } }
三、netty与unity游戏引擎跨语言通信
1.netty服务器端
publicenumRequestType { GetChatRecord,Say; publicstaticRequestTypevalue(intkey) { RequestType[] keyTypes=RequestType.values(); returnkeyTypes[key]; } }
publicclassRequest { privateRequestTypetype; privateStringvalue; //返回枚举常量位置publicintgetType() { returntype.ordinal(); } publicRequestTypeGetRequestType() { returntype; } publicvoidSetRequestType(RequestTypetype) { this.type=type; } publicvoidsetType(inttype) { this.type=RequestType.value(type); } publicStringgetValue() { returnvalue; } publicvoidsetValue(Stringvalue) { this.value=value; } publicRequest(inttype, Stringvalue) { setType(type); setValue(value); } publicRequest(RequestTypetype, Stringvalue) { this.type=type; setValue(value); } publicRequest() { } publicStringtoString() { return"Request [type="+type+", value="+value+"]"; } }
publicclassChatRecord { publicstaticList<String>chatRecord=newArrayList<String>(); publicstaticvoidSay() { System.out.println("list长度:"+chatRecord.size()); for(StringRecord: chatRecord) { System.out.println(Record); } } }
publicclassChatServer { privateintport; publicChatServer(intport) { this.port=port; } publicvoidstart(){ //配置服务端的NIO线程组//两个Reactor一个用于服务端接收客户端的连接,另一个用于进行SocketChannel的网络读写EventLoopGroupbossGroup=newNioEventLoopGroup(); EventLoopGroupworkerGroup=newNioEventLoopGroup(); try { //ServerBootstrap对象是Netty用于启动NIO服务端的辅助启动类,目的是降低服务端开发的复杂度ServerBootstrapbootstrap=newServerBootstrap(); bootstrap.group(bossGroup, workerGroup) //指定NIO模式 .channel(NioServerSocketChannel.class) //回调请求 .childHandler(newChannelInitializer<SocketChannel>(){ protectedvoidinitChannel(SocketChannelsocketChannel) throwsException { System.out.println("客户端连接:"+socketChannel.remoteAddress()); //ChannelPipeline类似于一个管道,管道中存放的是一系列对读取数据进行业务操作的ChannelHandler。ChannelPipelinepipeline=socketChannel.pipeline(); pipeline.addLast("frame",newDelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decode",newStringDecoder());//解码器pipeline.addLast("encode",newStringEncoder()); pipeline.addLast("handler",newChatServerHandler()); } }) //配置NioServerSocketChannel的TCP参数 .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_KEEPALIVE,true); } }) //配置NioServerSocketChannel的TCP参数 .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_KEEPALIVE,true); // 绑定端口,设置非堵塞ChannelFuturefuture=bootstrap.bind(port).sync(); System.out.println("服务器开始监听:"); //关闭通道// 等待关闭future.channel().closeFuture().sync(); } catch (InterruptedExceptione) { e.printStackTrace(); }finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } System.out.println("服务器结束监听:"); } }
publicclassChatServerHandlerextendsSimpleChannelInboundHandler<String> { publicstaticChannelGroupchannels=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE); /*** 当有客户端连接时,handlerAdded会执行,就把该客户端的通道记录下来,加入队列* @param ctx* @throws Exception*/publicvoidhandlerAdded(ChannelHandlerContextctx) throwsException { ChannelinComing=ctx.channel();//获得客户端通道//通知其他客户端有新人进入for (Channelchannel : channels){ if (channel!=inComing) channel.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,"[欢迎: "+inComing.remoteAddress() +"] 进入聊天室!\n"))); channel.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,"[欢迎: "+inComing.remoteAddress() +"] 进入聊天室!\n"))); } ChatRecord.chatRecord.add("[欢迎: "+inComing.remoteAddress() +"] 进入聊天室!\n"); channels.add(inComing);//加入队列 } protectedvoidmessageReceived(ChannelHandlerContextctx, Stringmsg) throwsException { ChannelinComing=ctx.channel(); Requestrequest=JSON.parseObject(msg, Request.class); System.out.println("[用户"+inComing.remoteAddress() +request.GetRequestType()+"]"+request.getValue()); switch (request.GetRequestType()) { //请求获取聊天记录caseGetChatRecord: System.out.println("开始传输聊天记录"); ChatRecord.Say(); Stringvalue=""; booleanget=true; for(StringRecord: ChatRecord.chatRecord) { if(get) { value+=Record+"\n"; get=false; }else { get=true; } } inComing.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,value+"..更早记录请翻看聊天档案..\n"))); break; //说话caseSay: default: for (Channelchannel : channels){ if (channel!=inComing){ channel.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,inComing.remoteAddress() +":"+request.getValue()))); }else { channel.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,"我:"+request.getValue()))); ChatRecord.chatRecord.add(inComing.remoteAddress() +":"+request.getValue()); } } break; } }protectedvoidmessageReceived(ChannelHandlerContextctx, Stringmsg) throwsException { ChannelinComing=ctx.channel(); Requestrequest=JSON.parseObject(msg, Request.class); System.out.println("[用户"+inComing.remoteAddress() +request.GetRequestType()+"]"+request.getValue()); switch (request.GetRequestType()) { //请求获取聊天记录caseGetChatRecord: System.out.println("开始传输聊天记录"); ChatRecord.Say(); Stringvalue=""; booleanget=true; for(StringRecord: ChatRecord.chatRecord) { value+=Record+"\n"; } inComing.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,value+"..更早记录请翻看聊天档案..\n"))); inComing.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,value+"..更早记录请翻看聊天档案..\n"))); break; //说话caseSay: default: for (Channelchannel : channels){ if (channel!=inComing){ channel.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,inComing.remoteAddress() +":"+request.getValue()))); channel.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,inComing.remoteAddress() +":"+request.getValue()))); }else { channel.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,"我:"+request.getValue()))); channel.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,"我:"+request.getValue()))); ChatRecord.chatRecord.add(inComing.remoteAddress() +":"+request.getValue()); } } break; } } /*** 断开连接* @param ctx* @throws Exception*/publicvoidhandlerRemoved(ChannelHandlerContextctx) throwsException { ChanneloutComing=ctx.channel();//获得客户端通道//通知其他客户端有人离开for (Channelchannel : channels){ if (channel!=outComing) channel.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,"[再见: ]"+outComing.remoteAddress() +" 离开聊天室!\n"))); channel.writeAndFlush(JSON.toJSONString(newRequest(RequestType.Say,"[再见: ]"+outComing.remoteAddress() +" 离开聊天室!\n"))); } ChatRecord.chatRecord.add(outComing.remoteAddress() +" 离开聊天室!\n"); channels.remove(outComing); } /*** 当服务器监听到客户端活动时* @param ctx* @throws Exception*/publicvoidchannelActive(ChannelHandlerContextctx) throwsException { ChannelinComing=ctx.channel(); System.out.println("["+inComing.remoteAddress() +"]: 在线"); } /*** 离线* @param ctx* @throws Exception*/publicvoidchannelInactive(ChannelHandlerContextctx) throwsException { ChannelinComing=ctx.channel(); System.out.println("["+inComing.remoteAddress() +"]: 离线"); } publicvoidexceptionCaught(ChannelHandlerContextctx, Throwablecause) throwsException { ChannelinComing=ctx.channel(); System.out.println(inComing.remoteAddress() +"通讯异常!"); ctx.close(); } }
2.unity客户端
publicclassNettyClient{ publicstringIP="127.0.0.1"; publicintPort=7397; publicboolisConnected; publicList<Request>myLink=newList<Request>(); //信息接收进程privateThread_ReceiveThread=null; //网络检测进程privateThread_connectionDetectorThread=null; privateSocketclientSocket=null; privatestaticbyte[] result=newbyte[1024]; //单例模式privatestaticNettyClientinstance; publicstaticNettyClientGetInstance(stringip, intport) { if (instance==null) { instance=newNettyClient(ip, port); } returninstance; } publicvoidSend() { stringkey=JsonConvert.SerializeObject(newRequest(RequestType.GetChatRecord, "")); Debug.Log(key); for (inti=0; i<2; i++) { //UTF8编码clientSocket.Send(System.Text.Encoding.UTF8.GetBytes(key+"\n")); } } //自定义服务器IP地址构造函数publicNettyClient(stringip, intport) { IP=ip; Port=port; startConnect(); //初始化网络检测线程_connectionDetectorThread=newThread(newThreadStart(connectionDetector)); //开启网络检测线程[用于检测是否正在连接,否则重新连接]_connectionDetectorThread.Start(); } privatevoidstartConnect() { //创建Socket对象, 这里我的连接类型是TCPclientSocket=newSocket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); //服务器IP地址IPAddressipAddress=IPAddress.Parse(IP); //服务器端口IPEndPointipEndpoint=newIPEndPoint(ipAddress, Port); //这是一个异步的建立连接,当连接建立成功时调用connectCallback方法IAsyncResultresult=clientSocket.BeginConnect(ipEndpoint, newAsyncCallback(connectCallback), clientSocket); //这里做一个超时的监测,当连接超过5秒还没成功表示超时boolsuccess=result.AsyncWaitHandle.WaitOne(5000, true); if (!success) { //超时clientSocket.Close(); Debug.Log("connect Time Out"); if (_ReceiveThread!=null) _ReceiveThread.Abort(); // Closed(); } else { //如果连接成功则开启接受进程,发送信息if (clientSocket.Connected) { this.isConnected=true; //初始化线程_ReceiveThread=newThread(newThreadStart(Receive)); //开启线程[用于接收数据]_ReceiveThread.Start(); Send(); } } } /// 发送数据/// </summary>publicvoidSendText(stringstr) { stringkey=JsonConvert.SerializeObject(newRequest(RequestType.Say, str)); Debug.Log(key); clientSocket.Send(System.Text.Encoding.UTF8.GetBytes(key+"\n")); } //连接-回调privatevoidconnectCallback(IAsyncResultasyncConnect) { Console.Write("连接完成"); } //关闭SocketpublicvoidClosed() { try { if (clientSocket!=null&&clientSocket.Connected) { clientSocket.Shutdown(SocketShutdown.Both); clientSocket.Close(); } clientSocket=null; //关闭线程_ReceiveThread.Abort(); _connectionDetectorThread.Abort(); Debug.Log("已关闭Socket"); } catch (Exceptione) { throwe; } } /// <summary>/// 接收数据线程/// </summary>publicvoidReceive() { intreceiveLength=0; try { while (true) { if (!clientSocket.Connected) { //与服务器断开连接跳出循环Debug.Log("Failed to clientSocket server."); clientSocket.Close(); break; } try { //Receive方法中会一直等待服务端回发消息//如果没有回发会一直在这里等着。inti=clientSocket.Receive(result); if (i<=0) { clientSocket.Close(); _ReceiveThread.Abort(); Debug.Log("断开连接"); break; } if ((receiveLength=clientSocket.Receive(result)) >0) { //UTF8解码myLink.Add(JsonConvert.DeserializeObject<Request>(Encoding.UTF8.GetString(result, 0, receiveLength))); Debug.Log(Encoding.UTF8.GetString(result, 0, receiveLength)); } } catch (Exceptionex) { Debug.Log("Failed to clientSocket error."+ex); clientSocket.Close(); } } } catch (Exception) { throw; } } /// <summary>/// 重新连接线程/// </summary>publicvoidconnectionDetector() { try { intconnectTime=0; while (true) { try { if (clientSocket.Connected) { Debug.Log("网络检测中,连接状态为:"+clientSocket.Connected); connectTime=0; } elseif (!clientSocket.Connected) { Debug.Log("网络检测中,连接状态为:False"); this.isConnected=false; //尝试重连Debug.Log("正在尝试第"+connectTime.ToString() +"次重连"); //连接startConnect(); //每5秒执行一次重连Thread.Sleep(5000); connectTime+=1; } } catch (Exception ) { } } } catch (Exception) { throw; } } }
[Serializable] publicclassRequest{ publicRequestTypetype; publicstringvalue; publicRequest(RequestType_type, string_value) { type=_type; value=_value; } publicoverridestringToString() { return"Request [type:"+type+",value:"+value+"]"; } } publicenumRequestType { GetChatRecord, Say}
publicclassNettyComponent : MonoBehaviour{ publicstringIP="127.0.0.1"; publicintPort=8888; publicTexttext; NettyClientclient; // Start is called before the first frame updatevoidStart() { //获得nettyClient实例client=NettyClient.GetInstance(IP, Port); } voidUpdate() { if (Input.GetKeyDown(KeyCode.Escape)) client.Closed(); if (client.myLink.Count() >0) { stringvalue=""; for (int_i=0; _i<client.myLink.Count(); _i++) { value+=client.myLink[_i] +"\n"; } text.text=value; } } privatevoidOnDestroy() { client.Closed(); } privatevoidOnDisable() { client.Closed(); } publicvoidSend() { stringxinxi=GameObject.Find("InputField").GetComponent<InputField>().text; GameObject.Find("InputField").GetComponent<InputField>().text=""; client.SendText(xinxi); } }