👨🏻🎓博主介绍:大家好,我是芝士味的椒盐,一名在校大学生,热爱分享知识,很高兴在这里认识大家🌟
🌈擅长领域:Java、大数据、运维、电子
🙏🏻如果本文章各位小伙伴们有帮助的话,🍭关注+👍🏻点赞+🗣评论+📦收藏,相应的有空了我也会回访,互助!!!
🤝另本人水平有限,旨在创作简单易懂的文章,在文章描述时如有错,恳请各位大佬指正,在此感谢!!!
网络编程概念
- 两个程序通过一个双向的通信连接实现数据的交换,连接的一端称为一个socket
- Socket是一个语言无标准,可以实现网络编程语言都有Socket
- 通过·IP+Port通信
- BIO、NIO、AIO适用场景
- BIO:连接数少且固定的框架
- NIO: 连接数多且连接时间短
- AIO(NIO.2): 连接数多且连接时间长
Java IO流程图
BIO NIO Netty详解 | ProcessOn免费在线作图,在线流程图,在线思维导图 |
Socket连接步骤
- 服务器监听
- 客户端请求
- 连接确定
- Tips:连接的时候三次握手,断开连接四次挥手
同步和异步(OS底层操作)
- 同步:使用同步IO时,Java自己处理IO读写
- 异步:使用异步Io时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小给OS(用户数据),OS需要支持异步IO操作API
阻塞和非阻塞(程序阻塞代码块)
- 阻塞:使用阻塞IO时,Java调用会一直阻塞到读写完成才返回。
- 非阻塞:使用非阻塞IO时,如果不能读写Java调用会马上返回,当IO事件分发器通知可读写时再继续进行读写,不循环直到读写完成。
BIO编程
- Blocking IO:同步阻塞的编程方式
- BIO编程方式通常是在JDK1.4版本之前常用的编程方式。
- 编程实现过程:首先服务端启动一个ServerSocket来监听网络请求,客户端启动Socket发起网络请求,默认情况下ServerSocket会建立一个线程来处理这个请求,如果服务端没有线程可用,客户端会阻塞等待或遭到拒绝。(可以加入线程池)
- 改善
- 实例代码:
- Server:
package com.anxpp.io.calculator.bio; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /** * BIO服务端源码 * @author shenj * @version 1.0 */ public final class ServerNormal { //默认的端口号 private static int DEFAULT_PORT = 12345; //单例的ServerSocket private static ServerSocket server; //根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值 public static void start() throws IOException{ //使用默认值 start(DEFAULT_PORT); } //这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了 public synchronized static void start(int port) throws IOException{ if(server != null) return; try{ //通过构造函数创建ServerSocket //如果端口合法且空闲,服务端就监听成功 server = new ServerSocket(port); System.out.println("服务器已启动,端口号:" + port); //通过无线循环监听客户端连接 //如果没有客户端接入,将阻塞在accept操作上。 while(true){ Socket socket = server.accept(); //当有新的客户端接入时,会执行下面的代码 //然后创建一个新的线程处理这条Socket链路 new Thread(new ServerHandler(socket)).start(); } }finally{ //一些必要的清理工作 if(server != null){ System.out.println("服务器已关闭。"); server.close(); server = null; } } } }
- ServerHandler
package com.anxpp.io.calculator.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import com.anxpp.io.utils.Calculator; /** * 客户端线程 * @author shenj * 用于处理一个客户端的Socket链路 */ public class ServerHandler implements Runnable{ private Socket socket; public ServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try{ in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); String expression; String result; while(true){ //通过BufferedReader读取一行 //如果已经读到输入流尾部,返回null,退出循环 //如果得到非空值,就尝试计算结果并返回 if((expression = in.readLine())==null) break; System.out.println("服务器收到消息:" + expression); try{ result = Calculator.cal(expression).toString(); }catch(Exception e){ result = "计算错误:" + e.getMessage(); } out.println(result); } }catch(Exception e){ e.printStackTrace(); }finally{ //一些必要的清理工作 if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null; } if(out != null){ out.close(); out = null; } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
- client
package com.anxpp.io.calculator.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; /** * 阻塞式I/O创建的客户端 * @author shenj * @version 1.0 */ public class Client { //默认的端口号 private static int DEFAULT_SERVER_PORT = 12345; private static String DEFAULT_SERVER_IP = "127.0.0.1"; public static void send(String expression){ send(DEFAULT_SERVER_PORT,expression); } public static void send(int port,String expression){ System.out.println("算术表达式为:" + expression); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try{ socket = new Socket(DEFAULT_SERVER_IP,port); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println(expression); System.out.println("___结果为:" + in.readLine()); }catch(Exception e){ e.printStackTrace(); }finally{ //一下必要的清理工作 if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null; } if(out != null){ out.close(); out = null; } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
- 测试代码,放在一个程序中
package com.anxpp.io.calculator.bio; import java.io.IOException; import java.util.Random; /** * 测试方法 * @author shenj * @version 1.0 */ public class Test { //测试主方法 public static void main(String[] args) throws InterruptedException { //运行服务器 new Thread(new Runnable() { @Override public void run() { try { ServerBetter.start(); } catch (IOException e) { e.printStackTrace(); } } }).start(); //避免客户端先于服务器启动前执行代码 Thread.sleep(100); //运行客户端 char operators[] = {'+','-','*','/'}; Random random = new Random(System.currentTimeMillis()); new Thread(new Runnable() { @SuppressWarnings("static-access") @Override public void run() { while(true){ //随机产生算术表达式 String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1); Client.send(expression); try { Thread.currentThread().sleep(random.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
NIO编程
- Ubblocking(New IO):同步阻塞的编程方式
- 主要解决BIO的大并发问题,NIO基于Reactor,面向Buffer(缓存区)编程
- 不能完全解决BIO上的问题,当并发上来的话,还是会有BIO一样的问题
- 同步非阻塞,服务器实现模式为一个请求一个通道,即客户端发送的连接请求都会注册到多路复用器器上,多路复用器轮询到有I/O操作请求时才启动一个线程进行处理
- NIO方式使用于连接数目多且比较短(轻操作)的架构,比如:聊天服务器,并发局限于应用中,编程复杂,JDK1.8开始支持
- NIO核心三大组件:Selector(多路复用器)、Channel(通道)、Buffer(缓冲区)
- NIO核心三大组件之间的关系
1)每个 channel都会对应一个 Buffer 2)Selector对应一个线程,一个线程对应多个 channel(连接) 3)该图反应了有三个 channel?注册到该 selector/程序 4)程序切换到哪个 channel是有事件决定的, Event就是一个重要的概念 5)Selector会根据不同的事件,在各个通道上切换 6)Buffer就是一个内存块,底层是有一个数组 7)数据的读取写入是通过 Buffer,这个和BIO,BIO中要么是输入流,或者是 输出流,不能双向,但是NIO的 Buffer是可以读也可以写,需要ip方法切换 8)channel是双向的,可以返回底层操作系统的情况,比如 Linux,底层的操作系 通道就是双向的
- Buffer有7个子类(
没有BooleanBuffer
):ByteBuffer、IntBuffer、CharBuffer、LongBuffer、DoubleBuffer、FloatBuffer、ShortBuffer
- 最常用的是ByteBuffer
- Buffer重要的四个属性:
- mark:标志位
- position:下标指针
- limit:当前缓冲区的终点
- capacity:缓冲区容量
- flip通过改变这四个属性的值达到反转Buffer状态的功能
- Buffer常用方法:
- ByteBuffer常用方法:
- 缓冲区代码实例:
package icu.lookyousmileface.nio.basic; import java.nio.IntBuffer; /** * @author shenj * @title: NioBuffer * @projectName NettyPro * @date 2020/11/30 14:41 */ public class NioBuffer { public static void main(String[] args) { //IntBuffer.allocate(5);表示Buffer的空间为5,并且Buffer缓冲区类型为Int IntBuffer intBuffer = IntBuffer.allocate(5); //intBuffer.capacity()表示获得Buffer的大小 for(int i = 0;i< intBuffer.capacity();i++){ intBuffer.put(i*2); } //buffer进行过写操作之后需要读操作的时候需要flip进行状态反转 intBuffer.flip(); //hasRemaining()返回剩余的可用长度 while (intBuffer.hasRemaining()){ System.out.println(intBuffer.get()); } } }
- Channel(通道):
- FileChannel:
- TransferTo拷贝的比较快底层实现是零拷贝
重点:掌握三种io的特点与改进措施,及其架构 难点:暂无
- ByteBuffer+FileChannel实现文字输入到文件中:
/** * @author shenj * @title: NioBufferChannelFileWrite * @projectName NettyPro * @date 2020/11/30 18:05 */ public class NioBufferChannelFileWrite { private static final String msg = "怒发冲冠,凭栏处、潇潇雨歇。抬望眼、仰天长啸,壮怀激烈。三十功名尘与土,八千里路云和月。莫等闲、白了少年头,空悲切。"; public static void main(String[] args) { try { FileOutputStream fileOutputStream = new FileOutputStream(new File("src/main/resources/filedata/data1.txt")); FileChannel fileOutputStreamChannel = fileOutputStream.getChannel(); ByteBuffer fileDataBuffer = ByteBuffer.allocate(1024); ByteBuffer putData = fileDataBuffer.put(msg.getBytes()); //反转 putData.flip(); fileOutputStreamChannel.write(putData); fileOutputStream.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
- 实现的流程图
- ByteBuffer+FileChannel实现从文件中读取到控制台:
/** * @author shenj * @title: NioBufferChannelRead * @projectName NettyPro * @date 2020/11/30 18:55 */ public class NioBufferChannelRead { private static final String filePath = "src/main/resources/filedata/data1.txt"; public static void main(String[] args) { File file = new File(filePath); try { FileInputStream fileInputStream = new FileInputStream(file); FileChannel fileInputStreamChannel = fileInputStream.getChannel(); //获取file的大小,避免浪费内存 ByteBuffer byteDataBuffer = ByteBuffer.allocate((int) file.length()); fileInputStreamChannel.read(byteDataBuffer); //byteDataBuffer.array()将byteBuffer缓冲区中的data变成数组 System.out.println(new String(byteDataBuffer.array())); fileInputStream.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
- 实现的流程图
- Buffer的分散和聚焦:Scattering:将数据写入到buffer时,可以采取buffer数组,依次写入【 分散】Gathering:从buffer读取数据时,可以采用buffer数组,依次读取【聚焦】
- 实例代码(Kotlin):
/** * @title: NioScatingAndGething * @projectName NettyPro * @author shenj * @date 2020/12/1 13:23 */ fun main(args: Array<String>): Unit { val serverSocketChannel = ServerSocketChannel.open(); val inetSocketAddress = InetSocketAddress(9948); //绑定端口到socket上,并启动 serverSocketChannel.socket().bind(inetSocketAddress); //buffer数组,NIO会自动将数据放到数组中,无需操心 val byteBuffer = arrayOfNulls<ByteBuffer>(2) byteBuffer[0] = ByteBuffer.allocate(5) byteBuffer[1] = ByteBuffer.allocate(3) val scoketChannel = serverSocketChannel.accept(); var messageLight = 8; while (true) { var byteRead: Int = 0 while (byteRead < messageLight) { var read = scoketChannel.read(byteBuffer) println("byteRead:" + read) byteRead += read.toInt() Arrays.asList<ByteBuffer>(*byteBuffer).stream().map { buffer: ByteBuffer -> "potion:" + buffer.position() + "limit:" + buffer.limit() }.forEach { x: String? -> println(x) } } Arrays.asList<ByteBuffer>(*byteBuffer).forEach(Consumer { buffer: ByteBuffer -> buffer.flip() }) var byteWrite: Long = 0; while (byteWrite < messageLight) { var write = scoketChannel.write(byteBuffer) byteWrite += write } Arrays.asList<ByteBuffer>(*byteBuffer).forEach(Consumer { buffer: ByteBuffer -> buffer.clear() }) println("byteRead:" + byteRead + "byteWrite:" + byteWrite + "messagelenght:" + messageLight) } }
- 一个Buffer实现文件的复制:
/** * @author shenj * @title: NioBufferOnlyOneWriteAndRead * @projectName NettyPro * @date 2020/11/30 19:18 */ public class NioBufferOnlyOneWriteAndRead { private static final String filePath1 = "src/main/resources/filedata/data1.txt"; private static final String filePath2 = "src/main/resources/filedata/data2.txt"; public static void main(String[] args) { File file1 = new File(filePath1); File file2 = new File(filePath2); try { FileInputStream fileInputStream = new FileInputStream(file1); FileOutputStream fileOutputStream = new FileOutputStream(file2); FileChannel fileInputStreamChannel = fileInputStream.getChannel(); FileChannel fileOutputStreamChannel = fileOutputStream.getChannel(); ByteBuffer dataBuffer = ByteBuffer.allocate(512); while (-1 != fileInputStreamChannel.read(dataBuffer)) { //读和写之间需要切换 dataBuffer.flip(); fileOutputStreamChannel.write(dataBuffer); //清空Buffer缓冲区的数据 dataBuffer.clear(); } fileInputStream.close(); fileOutputStream.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
- 利用transferFrom拷贝文件:
/** * @author shenj * @title: NioBufferTransorf * @projectName NettyPro * @date 2020/11/30 20:18 */ public class NioBufferTransorf { private static final String filepath1 = "src/main/resources/filedata/data1.txt"; private static final String filepath2 = "src/main/resources/filedata/data2.txt"; public static void main(String[] args) { try { FileInputStream fileInputStream = new FileInputStream(new File(filepath1)); FileOutputStream fileOutputStream = new FileOutputStream(new File(filepath2)); FileChannel fileInputStreamChannel = fileInputStream.getChannel(); FileChannel fileOutputStreamChannel = fileOutputStream.getChannel(); //将目标的通道的数据复制到当前通道,Channel自带数据有效长度size获取 fileOutputStreamChannel.transferFrom(fileInputStreamChannel, 0, fileInputStreamChannel.size()); fileInputStreamChannel.close(); fileOutputStreamChannel.close(); fileInputStream.close(); fileOutputStream.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
- 利用transferFrom拷贝Image(Kotlin):
fun main(args: Array<String>):Unit { val imagePath = "src/main/resources/filedata/sky.jpg"; val copyImageToPath = "src/main/resources/filedata/sky_copy.jpg" val accessFile = RandomAccessFile(imagePath, "rw") val accessFile_copy = RandomAccessFile(copyImageToPath, "rw") val accessFile_channle = accessFile.channel val accesssFile_copy_channel = accessFile_copy.channel accesssFile_copy_channel.transferFrom(accessFile_channle,0,accessFile_channle.size()) }
- Buffer和Channel的注意事项:
- 实例代码:
- NioServer.java
package bigdata.studynio; public class NioServer { public static void main(String[] args) { int port = 8080; if(args != null && args.length < 0){ //port = Integer.valueOf(args[0]); } //创建服务器线程 NioServerWork nioServerWork = new NioServerWork(port); new Thread(nioServerWork, "server").start(); } }
- NioServerWork.java
package bigdata.studynio; import java.io.BufferedReader; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class NioServerWork implements Runnable { //多路复用器 Selector会对注册在其上面的channel进行;轮询,当某个channel发生读写操作时, //就会处于相应的就绪状态,通过SelectionKey的值急性IO 操作 private Selector selector;//多路复用器 private ServerSocketChannel channel; private volatile boolean stop; /** * @param port * 构造函数 */ public NioServerWork(int port) { try { selector = Selector.open();//打开多路复用器 channel = ServerSocketChannel.open();//打开socketchannel channel.configureBlocking(false);//配置通道为非阻塞的状态 channel.socket().bind(new InetSocketAddress(port), 1024);//通道socket绑定地址和端口 channel.register(selector, SelectionKey.OP_ACCEPT);//将通道channel在多路复用器selector上注册为接收操作 System.out.println("NIO 服务启动 端口: "+ port); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void stop(){ this.stop=true; } @Override public void run() {//线程的Runnable程序 System.out.println("NIO 服务 run()"); while(!stop){ try { selector.select(1000);//最大阻塞时间1s //获取多路复用器的事件值SelectionKey,并存放在迭代器中 Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); SelectionKey key =null; //System.out.println("NIO 服务 try"); while(iterator.hasNext()){ System.out.println("NIO 服务 iterator.hasNext()"); key = iterator.next(); iterator.remove();//获取后冲迭代器中删除此值 try { handleinput(key);//根据SelectionKey的值进行相应的读写操作 } catch (Exception e) { if(key!=null){ key.cancel(); if(key.channel()!=null) key.channel().close(); } } } } catch (IOException e) { System.out.println("NIO 服务 run catch IOException"); e.printStackTrace(); System.exit(1); } } } /** * @param key * @throws IOException * 根据SelectionKey的值进行相应的读写操作 */ private void handleinput(SelectionKey key) throws IOException { System.out.println("NIO 服务 handleinput"); if(key.isValid()){//判断所传的SelectionKey值是否可用 if(key.isAcceptable()){//在构造函数中注册的key值为OP_ACCEPT,,在判断是否为接收操作 ServerSocketChannel ssc = (ServerSocketChannel)key.channel();//获取key值所对应的channel SocketChannel sc = ssc.accept();//设置为接收非阻塞通道 sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ);//并把这个通道注册为OP_READ } if(key.isReadable()){//判断所传的SelectionKey值是否为OP_READ,通过上面的注册后,经过轮询后就会是此操作 SocketChannel sc = (SocketChannel)key.channel();//获取key对应的channel ByteBuffer readbuf = ByteBuffer.allocate(1024); int readbytes = sc.read(readbuf);//从channel中读取byte数据并存放readbuf if(readbytes > 0){ readbuf.flip();//检测时候为完整的内容,若不是则返回完整的 byte[] bytes = new byte[readbuf.remaining()]; readbuf.get(bytes); String string = new String(bytes, "UTF-8");//把读取的数据转换成string System.out.println("服务器接受到命令 :"+ string); //"查询时间"就是读取的命令,此字符串要与客户端发送的一致,才能获取当前时间,否则就是bad order String currenttime = "查询时间".equalsIgnoreCase(string) ? new java.util.Date(System.currentTimeMillis()).toString() : "bad order"; dowrite(sc,currenttime);//获取到当前时间后,就需要把当前时间的字符串发送出去 }else if (readbytes < 0){ key.cancel(); sc.close(); }else{} } } } /** * @param sc * @param currenttime * @throws IOException * 服务器的业务操作,将当前时间写到通道内 */ private void dowrite(SocketChannel sc, String currenttime) throws IOException { System.out.println("服务器 dowrite currenttime"+ currenttime); if(currenttime !=null && currenttime.trim().length()>0){ byte[] bytes = currenttime.getBytes();//将当前时间序列化 ByteBuffer writebuf = ByteBuffer.allocate(bytes.length); writebuf.put(bytes);//将序列化的内容写入分配的内存 writebuf.flip(); sc.write(writebuf); //将此内容写入通道 } } }
- NioClient.java
package bigdata.studynio; public class NioClient { public static void main(String[] args) { int port = 8080; if(args !=null && args.length > 0){ try { //port = Integer.valueOf(args[0]); } catch (Exception e) { // TODO: handle exception } } //创建客户端线程 new Thread(new NioClientWork("127.0.0.1",port),"client").start(); } }
- NioClientWork.java
package bigdata.studynio; import java.io.BufferedReader; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class NioClientWork implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; /** * @param string * @param port * 构造函数 */ public NioClientWork(String string, int port) { this.host = string == null ? "127.0.0.1":string; this.port = port; try { selector= Selector.open();//打开多路复用器 socketChannel=SocketChannel.open();//打开socketchannel socketChannel.configureBlocking(false); System.out.println("NIO 客户端启动 端口: "+ port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } /* (non-Javadoc) * @see java.lang.Runnable#run() */ @Override public void run() { try { doConnect();//客户端线程需要连接服务器 } catch (Exception e) { e.printStackTrace(); System.exit(1); } while(!stop){ try { selector.select(1000);//最大阻塞时间1s //获取多路复用器的事件值SelectionKey,并存放在迭代器中 Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); SelectionKey key =null; while (iterator.hasNext()) { key = iterator.next(); iterator.remove(); try { handleinput(key);//获取多路复用器的事件值SelectionKey,并存放在迭代器中 } catch (Exception e) { if(key == null){ key.cancel(); if(socketChannel ==null) socketChannel.close(); } } } } catch (IOException e) { e.printStackTrace(); System.exit(1); } } if(selector !=null){ try { selector.close(); } catch (Exception e) { e.printStackTrace(); } } } /** * @throws IOException * 线程连接服务器,并注册操作 * */ private void doConnect() throws IOException { if(socketChannel.connect(new InetSocketAddress(host, port))){//检测通道是否连接到服务器 System.out.println("NIO 客户端 idoConnect OP_READ "); socketChannel.register(selector, SelectionKey.OP_READ);//如果已经连接到了服务器,就把通道在selector注册为OP_READ dowrite(socketChannel); }else{ System.out.println("NIO 客户端 doConnect OP_CONNECT "); socketChannel.register(selector, SelectionKey.OP_CONNECT);//如果客户端未连接到服务器,则将通道注册为OP_CONNECT操作 } } /** * @param key * @throws IOException * 根据SelectionKey的值进行相应的读写操作 */ private void handleinput(SelectionKey key) throws IOException { //System.out.println("NIO 客户端 handleinput "); if(key.isValid()){//判断所传的SelectionKey值是否可用 //System.out.println("NIO 客户端 isValid() "); SocketChannel sc = (SocketChannel) key.channel(); if(key.isConnectable()){//一开始的时候,客户端需要连接服务器操作,所以检测是否为连接状态 System.out.println("NIO 客户端 isConnectable "); if(sc.finishConnect()){//是否完成连接 System.out.println("NIO 客户端 finishConnect "); dowrite(sc);//向通道内发送数据,就是“查询时间” 的命令,读写通道与通道注册事件类型无关,注册事件只是当有事件来了,就会去处理相应事件 sc.register(selector, SelectionKey.OP_READ);//如果完成了连接,就把通道注册为 OP_READ操作,用于接收服务器出过来的数据 }else{ System.out.println("NIO 客户端 not finishConnect "); System.exit(1); } } if(key.isReadable()){//根据上面注册的selector的通道读事件,进行操作 System.out.println("NIO 客户端 isReadable() "); ByteBuffer readbuf = ByteBuffer.allocate(1024); int readbytes = sc.read(readbuf);//获取通道从服务器发过来的数据,并反序列化 if(readbytes > 0){ readbuf.flip(); byte[] bytes=new byte[readbuf.remaining()]; readbuf.get(bytes); String string = new String(bytes, "UTF-8"); System.out.println("时间是: " + string); this.stop=true; //操作完毕后,关闭所有的操作 }else if (readbytes < 0){ key.cancel(); sc.close(); }else{} } } } private void dowrite(SocketChannel sc) throws IOException { byte[] req = "查询时间".getBytes(); ByteBuffer writebuf = ByteBuffer.allocate(req.length); writebuf.put(req); writebuf.flip(); sc.write(writebuf); if(!writebuf.hasRemaining()){ System.out.println("向服务器发送命令成功 "); } } }
- asReadOnlyBuffer例子:
/** * @author shenj * @title: NioBufferOnlyRead * @projectName NettyPro * @date 2020/12/1 9:54 */ public class NioBufferOnlyRead { public static void main(String[] args) { ByteBuffer dataBuffer = ByteBuffer.allocate(5); for (int i = 0; i < dataBuffer.capacity() - 1; i++) { dataBuffer.put((byte) (i * 2)); } dataBuffer.flip(); //可以从一个创建的Buffer获取OnlyReadBuffer ByteBuffer onlyByteBuffer = dataBuffer.asReadOnlyBuffer(); while (onlyByteBuffer.hasRemaining()) { System.out.println(onlyByteBuffer.get()); } //无法往OnlyReadBuffer写数据 // onlyByteBuffer.put((byte)(2)); //可以往dataBuffer中写数据 dataBuffer.put((byte) (2)); } }
- 指定Buffer的读写操作,会生成一个新的Buffer
- MappedByteBuffer:可以实现文件在内存(堆外内存)直接修改,而操作系统无需再拷贝一份,实例代码:
/** * @author shenj * @title: NioBufferAsReadOnlyBuffer * @projectName NettyPro * @date 2020/12/1 11:35 */ public class NioBufferAsReadOnlyBuffer { public static void main(String[] args) { try { RandomAccessFile randomAccessFile = new RandomAccessFile("src/main/resources/filedata/data1.txt", "rw"); FileChannel randomAccessFileChannel = randomAccessFile.getChannel(); MappedByteBuffer map = randomAccessFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, randomAccessFileChannel.size()); map.put(0, (byte) ('H')); randomAccessFile.close(); randomAccessFileChannel.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
- RandomAccessFile处理文本,mode的源码:
String name = (file != null ? file.getPath() : null); int imode = -1; if (mode.equals("r")) imode = O_RDONLY; else if (mode.startsWith("rw")) { imode = O_RDWR; rw = true; if (mode.length() > 2) { if (mode.equals("rws")) imode |= O_SYNC; else if (mode.equals("rwd")) imode |= O_DSYNC; else imode = -1; }
- 由源码可知mode的四种模式对应的功能如下:
Untitled
- MappedByteBuffer属性解析:
public abstract MappedByteBuffer map(MapMode mode, long position, long size) throws IOException;
- mode的三种模式:
/** * Mode for a read-only mapping. */ public static final MapMode READ_ONLY = new MapMode("READ_ONLY"); /** * Mode for a read/write mapping. */ public static final MapMode READ_WRITE = new MapMode("READ_WRITE"); /** * Mode for a private (copy-on-write) mapping. */ public static final MapMode PRIVATE = new MapMode("PRIVATE");
- Selector(多路复用器)1)Java的Nio,用到非阻塞的IO方式。可以用一个线程,处理多个客户端连接,就会使用到Selector(选择器)2)Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式注册到同一个Selector),如果有事件发生,便获取时间让后针对每个事件进行相应的处理,这样就可以只用一个单线程区管理多个通道,也就是管理多个连接和请求
- Selector相关方法
//实现Closeable接口表示拥有自动关闭流的功能 public abstract class Selector implements Closeable { //表示获得一个Selector实例 public static Selector open() throws IOException { //表示设置超时时间,非阻塞,当有事件发生的时候将将注册到相应的SelectionLey中 public abstract int select(long timeout) //表示获得已经注册了所有的Selectkey public abstract Set<SelectionKey> selectedKeys(); //阻塞的方法 public abstract int select() throws IOException; //在未超过select(long timeout)的范围中,可以wakeup唤醒selector public abstract Selector wakeup(); //不阻塞,立即返回 public abstract int selectNow() throws IOException;
- 获得了SelectionKey相当于获得了对应的Channel,SelectionKey和Channel之间是注册关系
- Selectort、SelectionKey、ServerSocketChannel、SocketChannel
- 对上图的说明:
1.当客户端连接时,会通过ServerSocketChannel得到SocketChannel 2.将socketChannel注册到Selector上,register(Selector sel, int ops)一个selector上可以注册多个SocketChannel 3.注册后返回一个SelectionKey,会和该Selector关联(集合) 4.Selector进行监听select方法,返回有事件发生的通道的个数 5.进一步得到各个SelectionKey(有事件发生) 6.在通过SelectionKey反向获取SocketChannel,方法channel() 7.可以通过得到channel,完成业务处理
- 使用三大核心组件编写Server-Client实现上图业务逻辑:
- Server(Java):
/** * @author shenj * @title: NioServer * @projectName NettyPro * @date 2020/12/1 17:57 */ public class NioServer { public static void main(String[] args) { try { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); Selector selector = Selector.open(); serverSocketChannel.socket().bind(new InetSocketAddress(7798)); //设置未非阻塞 serverSocketChannel.configureBlocking(false); //注册selector,设置关注事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { if (selector.select(1000) == 0) { System.out.println("超时1s!"); continue; } //selector>0表示触发了关注事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); iterator.forEachRemaining(s -> { //监听连接事件 if (s.isAcceptable()) { try { SocketChannel socketChannel = serverSocketChannel.accept(); //非阻塞 socketChannel.configureBlocking(false); System.out.println("一个客户端连接成功"+socketChannel.hashCode()); socketChannel.register(selector, OP_READ, ByteBuffer.allocate(1024)); } catch (IOException e) { e.printStackTrace(); } } //监听读事件 if (s.isReadable()) { SocketChannel socketChannel = (SocketChannel)s.channel(); ByteBuffer dataBuffer = (ByteBuffer)s.attachment(); try { socketChannel.read(dataBuffer); System.out.println("来自客户端:"+new String(dataBuffer.array())); } catch (IOException e) { e.printStackTrace(); } dataBuffer.clear(); } iterator.remove(); }); } } catch (IOException e) { e.printStackTrace(); } } }
- Client(Kotlin):
/** * @title: NioClient * @projectName NettyPro * @author shenj * @date 2020/12/1 18:57 */ fun main(args: Array<String>):Unit { val data = "我深深地熟悉你脚步的韵律,它在我心中敲击." val socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); val inetSocketAddress = InetSocketAddress("127.0.0.1",7798) if(!socketChannel.connect(inetSocketAddress)){ while (!socketChannel.finishConnect()){ System.out.println("连接服务器,需要时间....出去溜达会吧~") } } val dataBuffer = ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)) socketChannel.write(dataBuffer) System.`in`.read() }
- SelectionKey API
//注册channel对应的SelectionKey public abstract class SelectionKey { //得到与之与之关联的selector public abstract Selector selector(); //得到与之关联的channel public abstract SelectableChannel channel(); //是否可以读 public final boolean isReadable() //是否可以写 public final boolean isWritable() //是否可以accept public final boolean isAcceptable() //获得与之关联的数据 public final Object attachment() //改变监听事件 public abstract SelectionKey interestOps(int ops);
- SelectionKey的四个重要属性,<<表示位运算向左移动
public static final int OP_READ = 1 << 0;//1 读操作 public static final int OP_WRITE = 1 << 2;//4 写操作 public static final int OP_CONNECT = 1 << 3;//8 已经连接 public static final int OP_ACCEPT = 1 << 4;//16 有新的网络可以accept
- SocketChannel API
- SocketChannel,网络IO通道,具体负责进行读写操作,NIO把缓冲区的数据写入通道,或者把通道的数据读到缓冲区
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel { //获得一个SocketChannel通道 public static SocketChannel open() throws IOException { //设置阻塞或非阻塞 public abstract boolean isConnectionPending(); //连接服务器 public abstract boolean connect(SocketAddress remote) throws IOException; //如果connect方法连接失败,接下来通过该方法进行完成连接操作 public abstract boolean finishConnect() throws IOException; //往通道内写数据 public abstract int write(ByteBuffer src) throws IOException; //往通道里读数据 public abstract int read(ByteBuffer dst) throws IOException; //注册一个选择器斌设置监听事件。最后一个参数可以设置共享数据 public final SelectionKey register(Selector sel, int ops, Object att)
- 多人聊天室
- Server
/** * @author shenj * @title: GroupToChatWithServer * @projectName NettyPro * @date 2020/12/1 22:53 */ public class GroupToChatWithServer { private Selector selector; private ServerSocketChannel serverSocketChannel; private static final int Port = 7799; /** * @author shenj * @title: GroupToChatWithServer * @date 2020/12/2 14:18 * 无参构造 * */ public GroupToChatWithServer() { try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(Port)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } /** * @author shenj * @title: listen * @date 2020/12/2 14:13 *多路复用器监听channel通道的事件 */ public void listen() { try { while (true) { int status = selector.select(); if (status > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> key_Iterator = selectionKeys.iterator(); key_Iterator.forEachRemaining(s -> { if (s.isAcceptable()) { try { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); System.out.println(socketChannel.getRemoteAddress()+"上线了"); } catch (IOException e) { e.printStackTrace(); } } if(s.isReadable()){ readData(s); } key_Iterator.remove(); }); } else { // System.out.println("等待中...."); } } } catch (Exception e) { e.printStackTrace(); } finally { } } /** * @author shenj * @title: readData * @date 2020/12/2 14:16 * Listen监听到通道的read事件,读取信息 */ private void readData(SelectionKey key){ SocketChannel socketChannel = null; try{ socketChannel = (SocketChannel) key.channel(); ByteBuffer dataBuffer = ByteBuffer.allocate(1024); int readCount = socketChannel.read(dataBuffer); if (readCount>0){ String msg = new String(dataBuffer.array()); System.out.println("from 客户端"+msg); sendInfo(msg,socketChannel); } }catch (Exception e){ try { System.out.println(socketChannel.getRemoteAddress()+"下线了"); key.cancel(); socketChannel.close(); } catch (IOException e2) { e2.printStackTrace(); } } } /** * @author shenj * @title: sendInfo * @date 2020/12/2 14:17 * 发送信息到其他的客户端 */ private void sendInfo(String msg, SocketChannel self){ System.out.println("服务器发送信息中..."); selector.selectedKeys().stream().forEach(s->{ Channel targetChannel = s.channel(); //排除自己发送 if (targetChannel instanceof SocketChannel && targetChannel != self){ SocketChannel dest = (SocketChannel)targetChannel; ByteBuffer dataBuffer = ByteBuffer.wrap(msg.getBytes()); try { dest.write(dataBuffer); } catch (IOException e) { e.printStackTrace(); } } }); } /** * @author shenj * @title: main * @date 2020/12/2 14:19 * 主方法 */ public static void main(String[] args) { GroupToChatWithServer chatWithServer = new GroupToChatWithServer(); chatWithServer.listen(); } }
- Client
/** * @author shenj * @title: GroupClient * @projectName NettyPro * @date 2020/12/2 0:04 */ public class GroupClient { private final String host = "127.0.0.1"; private final int port = 7799; private Selector selector; private SocketChannel socketChannel; private String username; /** * @author shenj * @title: GroupClient * @date 2020/12/2 14:20 * 无参构造 */ public GroupClient() { try { selector = Selector.open(); socketChannel = SocketChannel.open(new InetSocketAddress(host,port)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username+"is ok!!!"); } catch (IOException e) { e.printStackTrace(); } } /** * @author shenj * @title: sendInfo * @date 2020/12/2 14:12 * 发送信息到服务端 */ public void sendInfo(String info){ info = username+"说:"+info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); } catch (IOException e) { e.printStackTrace(); } } /** * @author shenj * @title: readInfo * @date 2020/12/2 14:11 * 读取服务器发送的信息 */ public void readInfo(){ try { int status = selector.select(); if (status>0){ Iterator<SelectionKey> key_iterator = selector.selectedKeys().iterator(); while (key_iterator.hasNext()) { SelectionKey s = key_iterator.next(); if (s.isReadable()) { SocketChannel socketChannel = (SocketChannel) s.channel(); ByteBuffer dataBuffer = ByteBuffer.allocate(1024); try { socketChannel.read(dataBuffer); String msg = new String(dataBuffer.array()); System.out.println(msg.trim()); } catch (IOException e) { e.printStackTrace(); } } } key_iterator.remove(); }else { // System.out.println("没有可用的通道..."); } } catch (IOException e) { e.printStackTrace(); } } /** * @author shenj * @title: main * @date 2020/12/2 14:20 * 主方法 */ public static void main(String[] args) { GroupClient chatClient = new GroupClient(); new Thread(){ @Override public void run() { while (true){ chatClient.readInfo(); try { sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); //发送给服务端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String str = scanner.nextLine(); chatClient.sendInfo(str); } } }
- 零拷贝:是指没有CPU拷贝
- 实在网络传输优化的重要手段,从OS角度来看,内核缓冲区只有唯一的一份数据,不存在重复,更少使用CPU缓存伪共享以及无CPU校验和计算
- 常用两种零拷贝mmap(内存映射)和sendFile
- 传统io
- 三次上下文切换
- Hard driver→kernel buffer→user buffer→socket buffer→protocol engine
- kernel buffer、user buffer、socket buffer(两次cpu拷贝)
- mmao优化
- 两次上下文切换
- Hard driver→kernel buffer→socket buffer→protocol engine
- mmap直接将Hard driver文件映射到kernel buffer上,kernel buffer和user buffer共享数据,通过直接操作kernel buffer(内核内存)数据,实现文件的操作
- kernel buffer、socket buffer(一次cpu拷贝)
- sendFile
- Linux2.1
- 两次上文切换
- Hard driver→kernel buffer→socket buffer→protocol engine
- kernel buffer 、socket buffer(一次cpu拷贝(拷贝的是元数据,比如:数据长度等))
- CPU copy:cpu拷贝,DMA copy:内存拷贝
- Linux2.4
- Hard driver →kernel buffer—→(socket buffer(copy kernel buffer元数据,比如lenght等,可以忽略))→protocol engine
- kernel buffer 可以直接通过内存拷贝到协议栈protocol engin,但是还是有少量数据需要cpu copy到socket buffer上
- 使用
.transferTo
零拷贝传输文件
- Server
/** * @author shenj * @title: NioServer * @projectName NettyPro * @date 2020/12/2 10:47 */ public class NioServer implements Serializable { private static final String host = "127.0.0.1"; private static final int port = 8899; public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(host, port)); ByteBuffer dataBuffer = ByteBuffer.allocate(4098); while (true) { SocketChannel socketChannel = serverSocketChannel.accept(); int readCount = 0; while (-1 != readCount) { try { //统计读取字节 readCount = socketChannel.read(dataBuffer); }catch (IOException e){ break; } //可以让dataBuffer复用,使position = 0;mark = -1作废 dataBuffer.rewind(); } } } }
- .rewind:表示buffer的倒带,也就是position=0,mark标志位无效
- Client:
/** * @author shenj * @title: NioClient * @projectName NettyPro * @date 2020/12/2 14:38 */ public class NioClient implements Serializable { private static final String fileName = "protoc-3.6.1-win32.zip"; private static final String host = "127.0.0.1"; private static final int port = 8899; public static void main(String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(); //客户端使用connect连接 socketChannel.connect(new InetSocketAddress(host,port)); //文件传输通道 FileChannel fileChannel = new FileInputStream(new File(fileName)).getChannel(); long start = System.currentTimeMillis(); long byteNum = fileChannel.transferTo(0, fileChannel.size(), socketChannel); System.out.println("传输的字节数:"+byteNum+"耗时:"+(System.currentTimeMillis()-start)); fileChannel.close(); } }
AIO编程
- Asynchronous IO:异步非阻塞的编程方式
- 与NIO不同,当进行读写操作时,只需调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并告知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。即可理解为,read/write方法都是异步的,完成后会主动调用回调函数,
- 实例代码:
- Server.java
package com.anxpp.io.calculator.aio.server; /** * AIO服务端 * @author shenj * @version 1.0 */ public class Server { private static int DEFAULT_PORT = 12345; private static AsyncServerHandler serverHandle; public volatile static long clientCount = 0; public static void start(){ start(DEFAULT_PORT); } public static synchronized void start(int port){ if(serverHandle!=null) return; serverHandle = new AsyncServerHandler(port); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ Server.start(); } }
- AsyncServerHandler:
package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.CountDownLatch; public class AsyncServerHandler implements Runnable { public CountDownLatch latch; public AsynchronousServerSocketChannel channel; public AsyncServerHandler(int port) { try { //创建服务端通道 channel = AsynchronousServerSocketChannel.open(); //绑定端口 channel.bind(new InetSocketAddress(port)); System.out.println("服务器已启动,端口号:" + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //CountDownLatch初始化 //它的作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞 //此处,让现场在此阻塞,防止服务端执行完成后退出 //也可以使用while(true)+sleep //生成环境就不需要担心这个问题,以为服务端是不会退出的 latch = new CountDownLatch(1); //用于接收客户端的连接 channel.accept(this,new AcceptHandler()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
- AcceptHandler:
package com.anxpp.io.calculator.aio.server; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; //作为handler接收客户端连接 public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> { @Override public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) { //继续接受其他客户端的请求 Server.clientCount++; System.out.println("连接的客户端数:" + Server.clientCount); serverHandler.channel.accept(serverHandler, this); //创建新的Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //异步读 第三个参数为接收消息回调的业务Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); } }
- ReadHandler:
package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //用于读取半包消息和发送应答 private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } //读取到消息后的处理 @Override public void completed(Integer result, ByteBuffer attachment) { //flip操作 attachment.flip(); //根据 byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("服务器收到消息: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); }catch(Exception e){ calrResult = "计算错误:" + e.getMessage(); } //向客户端发送消息 doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //发送消息 private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //异步写数据 参数与前面的read一样 channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //如果没有发送完,就继续发送直到完成 if (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //创建新的Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //异步读 第三个参数为接收消息回调的业务Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } }
- Client:
package com.anxpp.io.calculator.aio.client; import java.util.Scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //向服务器发送消息 public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("请输入请求消息:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); } }
- AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; private int port; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //创建异步的客户端通道 clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //创建CountDownLatch等待 latch = new CountDownLatch(1); //发起异步连接操作,回调参数就是这个类本身,如果连接成功会回调completed方法 clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //连接服务器成功 //意味着TCP三次握手完成 @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("客户端成功连接到服务器..."); } //连接服务器失败 @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("连接服务器失败..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } //向服务器发送消息 public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //异步写 clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); } }
- WriteHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //完成全部数据的写入 if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //读取数据 ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("数据发送失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
- ReadHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("客户端收到结果:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("数据读取失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
- Test.java
package com.anxpp.io.calculator.aio; import java.util.Scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * 测试方法 * @author shenj * @version 1.0 */ public class Test { //测试主方法 @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //运行服务器 Server.start(); //避免客户端先于服务器启动前执行代码 Thread.sleep(100); //运行客户端 Client.start(); System.out.println("请输入请求消息:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); } }
BIO、NIO、AIO对照表
Netty介绍
- Netty是由JBOSS提供的一个Java开源框架
- 开发工作量和难度都非常大,例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。JDK NIO的Bug:例如臭名昭著的Epoll Bug,会导致空轮询,最终CPU被拉满进程到100%。
- Netty是一个异步基于事件驱动的网络应用框架,所以快速开发高性能、高可靠性的网络IO程序
- Netty主要针对在TCP程序,面向Client端的高并发应用,或者Peer-to-Peer场景下的大量数据传输的应用
- Netty3.x(老版)(JDK5)、Netty4.x(稳定版)(JDK6)、Netty5.x(重大Bug被废弃)
- Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景
- 要透彻理解Netty,要先学习NIO
- 同步与异步
- Netty基础架构
- 适用场景
- 分布式系统中,各节点远程调用,高性能RPC框架必不可少,异步高性能的Netty必不可少
- 阿里的分布式服务框架Dubbo的RPC也使用Netty,节点间通信
- 线程模型
- 目前存在的线程模型有:
- 传统阻塞的I/O服务模型
- Reactor模式Reactor对应的叫法:
- 反应器模式
- 分发者模式(Dispatch)
- 通知者模式(notifies)
- 根据Reactor的数量和处理资源线程池的数量不同,有三种典型的实现
- 单Reactor单线程
- 优点:模型简单,没有多线程、进程通信、竞争的问题,全部在一个线程中完成
- 缺点:模型问题,只有一个线程,无法完全发挥多核CPU的性能。Handler在某处个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈,还有伴随可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能read/write,造成节点故障
- 使用场景:客户端有限,业务处理非常快速,比如Redis在处理的时间复杂度O(1)的情况
- 单Reactor多线程
- 工作原理
1) Reactor对象通过 select 监控客户端请求 事件,收到事件后,通过 dispatch进行分发 2)如果建立连接请求,则右 Acceptor通过 accept处理连接请求,然后创建一个 Handler对象 处理完成连接后的各种事件 3)如果不是连接请求,则由 reactor分发调用连接对 应的 handler来处理 4) handler只负责响应事件,不做具体的业务处理 通过read读取数据后,会分发给后面的 worker线程池的某个线程处理业务 5) worker线程池会分配独立线程完成真正的业务并将结果返回给 handler 6) handler收到响应后,通过send将结果返回给client
- 优点:可以充分利用多核CPU的处理能力
- 缺点:多线程数据共享和访问比较复杂,reactor处理所有的事件的监听和响应,在单线程运行。在高并发场景容易出现性能瓶颈
- 主从Reactor多线程
- 工作原理
1) Reactor主线程 MainReactor 对象通过 select监听连接事件,收 到事件后,通过 Acceptor处理连接事件 2)当 Acceptor处理连接事件后, MainReactor将连接分配给 Subreactor 3) subreactor将连接加入到连接队列进行监听,并创建 handler 进行各种事件处理 4)当有新事件发生时, subreactor就会调用对应的 handler处理 5) handler通过read读取数据,分发给后面的 worker线程处理 6) worker线程池分配独立的 worker线程进行业务处理,并返 回结果 7) handler收到响应的结果后,再通过send将结果返回给 client 8) Reactor主线程可以对应多个 Reactor子程,即 Main Recator 可以关联多个 Subreactot
- 优点:父线程与子线程的数据简单职责明确,父线程只需要接受新连接,子线程完成后续的业务处理;父线程和子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据
- 缺点:编程复杂度较高
- Netty线程模式(基于主从Reactor多线程模型做了一定的改进,其中主从Reactor线程有多个Reactor)
- Reactor基础设计模型设计思路
- Reactor:Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序对IO事件做出反应。
- Handlers:处理程序执行IO事件要完成的实际事件,Reactor通过调度适当的处理程序来响应IO事件,处理程序执行非阻塞操作。
- Netty架构简易版
- 原理:
1) Bossgroup线程维护 Selector,只关注 Accecpt 2)当接收到 Accept:事件,获取到对应的 Socket Channel,封装成 Nioscoket-并 注册到 Worker线程(事件循环),并进行维护 3)当 Worker线程监听到 selector中通道发生自 己感兴趣的事件后,就进行处理(就由 handler),注意 handler已经加入到通道
- Netty是基于主从Reactor多线程的
- Netty架构详细版
- 原理:
1)Nett抽象出两组线程池 Bossgroup专门负责接收客户端的连接, Workergroup专门负责网络的读写 2) Bossgroup和 Workergroup类型都是 Nioeventloopgroup 3) Nioeventloop Group相当于一个事件循环组,这个组中含有多个事件 循环,每一个事件循环是 Nioeventloop 4) Nioeventloop表示一个不断循环的执行处理任务的线程,每 Nioeventloop都有一个 selector,用于监听绑定在其上的 socket的网 络通讯 5) Nioeventloopgroup可以有多个线程,即可以含有多个 Nioeventloop 6)每个 Boss Nioeventloop 1循环执行的步骤有3步 1.轮询 accept事件 2.处理 accept事件,与 client建立连接,生成 Nioscocket Channel,并将 其注册到某个 worker Nioeventloop上的 selector 3.处理任务队列的任务,即 runalltasks 기每个 Worker Nioeventlod循环执行的步骤 1.轮询read, write事件 2.处理o事件,即read, write事件,在对应 Nioscocket Channel A处理 3.处理任务队列的任务,即 runalltasks 8)每个 Worker Nioeventloop处理业务时,会使用 pipeline(管道 pipeline中包含了 channel,即通过 pipeline可以获取到对应通道,管道 中维护了很多的处理器
- Netty入门案例:
- Server(Java):
/** * @author shenj * @title: NettyServer * @projectName NettyPro * @date 2020/12/3 16:27 */ public class NettyServer { public static void main(String[] args) throws Exception { //轮询组 EventLoopGroup bossEventLoop = new NioEventLoopGroup(); EventLoopGroup workerEventLoop = new NioEventLoopGroup(); try{ //创建Server startApplicationConfiguration对象 ServerBootstrap startApplicationConfiguration = new ServerBootstrap(); //线程组 startApplicationConfiguration.group(bossEventLoop, workerEventLoop) //通道选择NioServerSocketChannel作为通道的实现 .channel(NioServerSocketChannel.class) //队列连接个数 .option(ChannelOption.SO_BACKLOG,128) //保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE,true) //创建一个匿名的通道测试对 .childHandler(new ChannelInitializer<SocketChannel>() { //pipeline设置处理器 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyHandler()); } }); System.out.println("Server is ready...."); //绑定服务器端口并同步,sync表示同步,且开启服务器,生成channelFuture对象 ChannelFuture channelFuture = startApplicationConfiguration.bind(6668).sync(); //对通道关闭进行监听,并且同步 channelFuture.channel().closeFuture().sync(); }finally { //出现异常,优雅的关闭 bossEventLoop.shutdownGracefully(); workerEventLoop.shutdownGracefully(); } } }
- Server(Kotlin):
/** * @title: NettyServer * @projectName NettyPro * @author shenj * @date 2020/12/3 16:15 */ fun main() { //获得MainReactor和WorkerReactor的无限轮询 val mainEventGroup: EventLoopGroup = NioEventLoopGroup() val workerEventGroup: EventLoopGroup = NioEventLoopGroup() try { //创建Server StartApplication 对象 val startApplicationConfiguration = ServerBootstrap() //设置startApplication对象的参数 startApplicationConfiguration.group(mainEventGroup, workerEventGroup) .channel(NioServerSocketChannel::class.java) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(object : ChannelInitializer<io.netty.channel.socket.SocketChannel>() { override fun initChannel(ch: io.netty.channel.socket.SocketChannel) { ch.pipeline().addLast(NettyHandler()) } }) println("Server is ready....") val channelFuture: ChannelFuture = startApplicationConfiguration.bind(7788).sync() // 监听通道关闭事件,同步 channelFuture.channel().closeFuture().sync() } finally { mainEventGroup.shutdownGracefully() workerEventGroup.shutdownGracefully() } }
- Server-Handler
/** * @author shenj * @title: NettyHandler * @projectName NettyPro * @date 2020/12/3 19:50 * 注意:要指定自己的Handler就需要继承netty规定好的某个HandlerAdapter(规范) * 此时的Handler才能称之为一个Handler */ public class NettyHandler extends ChannelInboundHandlerAdapter implements Serializable { /** * @author shenj * @title: channelRead * @date 2020/12/3 19:54 * 1、ChannelHandlerContext:上下文对象,包含pipeline(管道)、channel、地址 * msg:指的是客户端发送的数据,默认Object对象 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Server ctx:"+ctx); //ByteBuf是netty特有的Buffer性能比NIO的ByteBuffer更好 ByteBuf dataBuf = (ByteBuf) msg; System.out.println("客户端发送来的消息是:"+dataBuf.toString(CharsetUtil.UTF_8)); System.out.println("客户端的地址是:"+ctx.channel().remoteAddress()); } /** * @author shenj * @title: channelReadComplete * @date 2020/12/4 1:12 * channelRead数据读取完毕的操作 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush=write+Flush,写入缓冲区并且刷新,使用write只是写到缓冲区 //非池化的Unpooled,Unpooled.copiedBuffer拷贝到buffer区指定字符集 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~",CharsetUtil.UTF_8)); } /** * @author shenj * @title: exceptionCaught * @date 2020/12/4 1:18 * 处理异常,一般关闭通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
- Client:
/** * @title: NettyClient * @projectName NettyPro * @author shenj * @date 2020/12/4 1:38 */ fun main() { val host = "127.0.0.1" val port = 6668 val nioEventLoopGroup:EventLoopGroup = NioEventLoopGroup() try { //客户端使用Bootstrap val startClientApplicationConfiguration = Bootstrap() //客户端只需要channel handler startClientApplicationConfiguration.group(nioEventLoopGroup) .channel(NioSocketChannel::class.java) .handler(object : ChannelInitializer<io.netty.channel.socket.SocketChannel>() { override fun initChannel(ch: io.netty.channel.socket.SocketChannel?) { ch?.pipeline()?.addLast(NettyClientHandler()) } }) System.out.println("Client is OK!!!") var channelFuture : ChannelFuture = startClientApplicationConfiguration.connect(host, port).sync() channelFuture.channel().closeFuture().sync() }finally { nioEventLoopGroup.shutdownGracefully() } }
- Client-Handler:
/** * @title: NettyClientHandler * @projectName NettyPro * @author shenj * @date 2020/12/4 1:52 *继承自入栈适配器 */ class NettyClientHandler: ChannelInboundHandlerAdapter() { /** * @author shenj * @title: channelActive * @date 2020/12/4 2:04 * 客户端发送数据给服务器 */ override fun channelActive(ctx: ChannelHandlerContext?) { println("Client ctx:"+ctx) ctx?.writeAndFlush(Unpooled.copiedBuffer("Hi,Server!!!!", CharsetUtil.UTF_8)) } /** * @author shenj * @title: channelRead * @date 2020/12/4 2:04 * 接收服务器发送的数据并显示 */ override fun channelRead(ctx: ChannelHandlerContext?, msg: Any?) { val dataBuf = msg as ByteBuf println("服务器回复的信息:"+dataBuf.toString(CharsetUtil.UTF_8)) println("服务器地址:"+ctx?.channel()?.remoteAddress()) } /** * @author shenj * @title: exceptionCaught * @date 2020/12/4 2:11 * 异常处理 */ override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) { cause?.printStackTrace() ctx?.close() } }
- 任务队列的3种经典场合
- 用户自定义的普通任务
/** * @author shenj * @title: ServerHandler * @projectName NettyPro * @date 2020/12/4 8:18 */ public class ServerHandler extends ChannelInboundHandlerAdapter { /** * @author shenj * @title: channelRead * @date 2020/12/4 8:24 * read */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.channel().eventLoop().execute(()->{ try { Thread.sleep(5*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("来自服务器的第二条信息", CharsetUtil.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); } }); ctx.channel().eventLoop().execute(()->{ try { Thread.sleep(5*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("来自服务器的第三条信息", CharsetUtil.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); } }); ctx.channel().eventLoop().execute(()->{ { try { Thread.sleep(5*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("来自服务器的第四条信息", CharsetUtil.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println("This is debug"); } /** * @author shenj * @title: channelReadComplete * @date 2020/12/4 8:24 * channelReadComplete send Info */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("来自服务器的第一条信息",CharsetUtil.UTF_8)); } /** * @author shenj * @title: exceptionCaught * @date 2020/12/4 8:25 * try Bug */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
- 使用idae的Debug可以查看在ctx→pipeline→channel→eventLoop→taskQueue(size)
- 注意:新版的idae2020.3debug查看可能没有数值,右击任意一个变量,点击New Class Level Watch才可以查看详细value
- 用户自定义定时任务
/** * @author shenj * @title: ServerHandler * @projectName NettyPro * @date 2020/12/4 8:18 */ public class ServerHandler extends ChannelInboundHandlerAdapter { /** * @author shenj * @title: channelRead * @date 2020/12/4 8:24 * read */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.channel().eventLoop().execute(()->{ try { Thread.sleep(5*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("来自服务器的第二条信息", CharsetUtil.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); } }); ctx.channel().eventLoop().execute(()->{ try { Thread.sleep(5*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("来自服务器的第三条信息", CharsetUtil.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); } }); ctx.channel().eventLoop().schedule(()-> { try { Thread.sleep(10*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("来自服务器的第四条信息", CharsetUtil.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); } //TimeUint.SECOND是10的单位(s) },10,TimeUnit.SECONDS); System.out.println("This is debug"); } /** * @author shenj * @title: channelReadComplete * @date 2020/12/4 8:24 * channelReadComplete send Info */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("来自服务器的第一条信息",CharsetUtil.UTF_8)); } /** * @author shenj * @title: exceptionCaught * @date 2020/12/4 8:25 * try Bug */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
- scheduleTaskQueue并不是普通的TaskQueue,它一种定时的Task,可用通过Debug查看
- 使用schedulerTaskQueue的注意事项
ctx.channel().eventLoop().schedule(()-> { try { Thread.sleep(10*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("来自服务器的第四条信息", CharsetUtil.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); } },10,TimeUnit.SECONDS);
- 其中schedule中的delay的时间应该和程序中的线程延时时间保持一致,若设了delay没有进行线程延时到对应的delay将以线程的延时为准。
- 非当前Reactor线程调用Channel的各种方法
try { ServerBootstrap severStartApplicationConfiguration = new ServerBootstrap(); severStartApplicationConfiguration.group(mainLoop, workerLoop) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //可在此处命名一个集合用于装每个客户端的SocketChannel,然后通过Handler的ctx.channel().eventLoop().execute/Scheduler //进行用户标识投放广告 System.out.println("新加入的客户端HasgCode;"+ch.hashCode()); ch.pipeline().addLast(new ServerHandler()); } });
- 异步模型
Netty的异步模型是建立在 future和 callback的之上的。 callback就是回调。重点说 Future,它的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待fun返回 显然不合适。那么可以在调用fun的时候,立马返回一个 Future,后续可以通过 Future去监控方法fun的处理过程(即: Future-listener机制)
- Future
1)表示异步的执行结果,可以通过它提供的方法来检测执行是否完成,比如检索计算等 等 2) Channelfuture是一个接口: public interface Channelfuture extends Future<Void 我们可以添加监听器,当监听的事件发生时,就会通知到监听器。
- 示意图
- 说明
1)在使用Nety进行编程时,拦截操作和转换出入站数据只需要提供 callback或利用 future即可。这使得链式操作简单、高效,并有利于编写可重用的、通用的代码。 2) Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来
- Future-Listener机制
1)当 Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的 Channelfuture 来获取操作执行的状态,注册监听函数来执行完成后的操作 常见有如下操作 通过 isDone方法来判断当前操作是否完成 通过 inSuccess方法来判断已完成的当前操作是否成功 通过 getCause方法来获取已完成的当前操作失败的原因 通过 isCancelled方法来判断已完成的当前操作是否被取消 通过 addlistener方法来注册监听器,当操作已完成( isdone方法返回完成),将会通知 指定的监听器;如果 Future对象已完成,则通知指定的监听器
- 例子
ChannelFuture channelFuture = severStartApplicationConfiguration.bind(5566).sync(); //添加监听器,监听事件 channelFuture.addListener((channelFutures) ->{ System.out.println(channelFuture.isSuccess() ? "监听5566端口成功":"监听5566端口失败"); }); channelFuture.channel().closeFuture().sync();
- 认识Http请求
- 说明
1、HTTP Request 第一部分是包含的头信息 2、HttpContent 里面包含的是数据,可以后续有多个 HttpContent 部分 3、LastHttpContent 标记是 HTTP request 的结束,同时可能包含头的尾部信息 4、完整的 HTTP request,由1,2,3组成
- 说明
1、HTTP response 第一部分是包含的头信息 2、HttpContent 里面包含的是数据,可以后续有多个 HttpContent 部分 3、LastHttpContent 标记是 HTTP response 的结束,同时可能包含头的尾部信息 4、完整的 HTTP response,由1,2,3组成 从request的介绍我们可以看出来,一次http请求并不是通过一次对话完成的,他中间可能有很次的连接。netty每一次对话都会建立一个channel,并且一个ChannelInboundHandler一般是不会同时去处理多个Channel的。 如何在一个Channel里面处理一次完整的Http请求?这就要用到我们上图提到的FullHttpRequest,我们只需要在使用netty处理channel的时候,只处理消息是FullHttpRequest的Channel,这样我们就能在一个ChannelHandler中处理一个完整的Http请求了。
- Channel知识点补充
- Netty网络通信的组件,能够用于执行网络I/O操作
- 通过Channel可获得当前网络连接的通道状态
- 通过Channel可获得网络连接的配置参数(例如接受缓冲区大小)
- Channel提供异步的网络I/O操作(例如连接操作,读写,绑定端口),异步调用意味着任何I/O调用都将立即返回,并且不保证在调用结束时所请求的I/O操作已完成
- 调用立即返回一个ChannelFuture实例,通过注册监听器到ChannelFuture上,可以I/O操作成功、失败或取消时调用通知调用方
- 支持关联I/O操作对应的处理程序
- 不同协议、不同的阻塞类型的链接都有不同的Channel类型与之对应,常用的Channel类型:
- NioSocketChannel:异步的客户端TCP Socket连接
- NioServerSocketChannel:异步的服务器端TCP Socket连接
- NioDatagramChannel:异步的UDP连接
- NioSctpChannel;异步的客户端Sctp连接
- NioSctpServerChannel:异步的Sctp服务器端连接,这些通道涵盖了UDP和TCP网络IO以及文件IO
- ChannelHandler及其实现类
- ChannelHandler是一个结构,处理IO事件或拦截IO操作,并转发到其他ChannelPipline(业务处理链)中的下一个处理程序
- ChannelHandler及其实现类一览图
- ChannelPipeline提供ChannelHandler链的容器,以客户端程序为例,如果事件的运动方向是从客户端到服务端,则称这些事件为出站,数据会通过pipeline(管道)一些列ChannelOutboundHandler,被Handler处理,反过来称之为入站。
- ChannelInboundHandlerAdapter
//通道注册之后 public void channelRegistered(ChannelHandlerContext ctx) //注销通道 public void channelUnregistered(ChannelHandlerContext ctx) //通道就绪 public void channelActive(ChannelHandlerContext ctx) //通道未处于活动 public void channelInactive(ChannelHandlerContext ctx) //通道读取事件 public void channelRead(ChannelHandlerContext ctx, Object msg) //通道读取事件之后 public void channelReadComplete(ChannelHandlerContext ctx) //异常处理 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- ChannelPipeline是ChannelHandler的List,ChannelPipeline是实现高级的拦截器过滤模式,完全控制事件。
- 在Netty中每个Channel都有有且仅有一个ChannelPipeline与之对应
- 入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的handler,出站事件会从链表tail往前传递到最前一个出站的handler,两种类型的handler会不干扰
- 一个 Channel包含了一个 Channelpipeline,而 Channelpipeline中又维护了一个由 ChannelhandlerContext组成的双向链表,并且每个 Channelhandlercontext中又关联着一个 Channelhandle
- Pipeline和ChannelPipeline
- 常用方法
//添加到双链表的首个 ChannelPipeline addFirst(String name, ChannelHandler handler); //添加到双链表的尾部 ChannelPipeline addLast(String name, ChannelHandler handler);
- EventLoopGroup和其实现子类NioEventLoopGroup
public NioEventLoopGroup() //继承自多线程组MultithreadEventExecutorGroup的线程关闭 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
- Unpooled类
- Netty提供一个专门用来操作缓冲区(即Netty的数据容器)的工具类
- 常用方法如下
- 注意Netty中的ByteBuf不像普通的ByteBuffer一样需要flip(缓冲区状态反转)
- 原因:ByteBuf底层维护了readerIndex(下一个读的位置)和writeIndex(下一个写的位置)
- 当使用readByte读取的时候是使用readerIndex,也就是readerIndex会变化
for (int i = 0; i <dataBuf.capacity() ; i++) { System.out.println(dataBuf.readByte()); }
- 当使用getByte(index i)读取时候,readerIndex不会有变化
for (int i = 0; i < dataBuf.capacity(); i++) { dataBuf.writeByte(i); }
- ByteBuf常用方法
//获取ByteBuf的容量大小 public abstract int capacity(); //在原有Buf基础上设置新的容量,并返回一个Buf,注意:原来的Buf也将变化 public abstract ByteBuf capacity(int newCapacity); //最大可设置 public abstract int maxCapacity(); //read的索引 public abstract int readerIndex(); //在原有Buf基础上设置readIndex,并返回一个Buf,注意:原来的Buf也将变化 public abstract ByteBuf readerIndex(int readerIndex); //在原有Buf基础上设置writeIndex,并返回一个Buf,注意:原来的Buf也将变化 public abstract ByteBuf writerIndex(int writerIndex); //读取Buf中从指定位置,读取指定长度且指定字符集的字符序列String public abstract CharSequence getCharSequence(int index, int length, Charset charset);
- 指定字符集的方式
Charset.forName("UTF-8") CharsetUtil.UTF_8
- Netty群聊案例
- 服务端
@SuppressWarnings("ALL") public class GroupChatServer { /** * @author shenj * @title: variable * @date 2020/12/7 14:29 * 注意:channelGroup用于存储handlerAdded中的客户端的channel */ private int port; private static final EventLoopGroup mainLoop = new NioEventLoopGroup(); private static final EventLoopGroup workerLoop = new NioEventLoopGroup(); private static final ServerBootstrap startApplicaiontConfiguration = new ServerBootstrap(); private static ChannelFuture channelFuture = null; private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * @author shenj * @title: GroupChatServer * @date 2020/12/7 14:32 * constructor */ public GroupChatServer(@NotNull int portIn) { this.port = portIn; } /** * @author shenj * @title: run * @date 2020/12/7 14:33 * body */ public void run() throws InterruptedException { try { //配置channel、option、chilHandler startApplicaiontConfiguration.group(mainLoop, workerLoop) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //加入编解码器 pipeline.addLast("stringDecoder", new StringDecoder()); pipeline.addLast("stringEncoder", new StringEncoder()); //SimpleChannelInboundHandler<I>中的I表示发送数据类型【需要记忆】 pipeline.addLast("ServerHandler", new SimpleChannelInboundHandler<String>() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * @author shenj * @title: handlerAdded * @date 2020/12/7 14:17 * 客户一旦加入就执行的的方法 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //先发送再加入自己加入组,就可以避免发送给自己 channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new Date()) + " \\n"); channelGroup.add(channel); } /** * @author shenj * @title: channelActive * @date 2020/12/7 14:38 * 通道被激活表示上线了,在服务端显示 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("[客户端]" + ctx.channel().remoteAddress() + " 上线了~"); } /** * @author shenj * @title: channelInactive * @date 2020/12/7 14:48 * channel不活跃、在服务端显示离线 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("[客户端]" + ctx.channel().remoteAddress() + " 离线了~"); } /** * @author shenj * @title: handlerRemoved * @date 2020/12/7 14:50 * 发送给其他在线的所有客户端,当handlerRemoved触发将会从channleGroup的移除 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { channelGroup.writeAndFlush("[客户端]" + ctx.channel().remoteAddress() + " 离开了~"); } /** * @author shenj * @title: channelRead0 * @date 2020/12/7 14:18 * 读取方法 */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); channelGroup.forEach(customer -> { customer.writeAndFlush(customer != channel ? "[客户端]" + customer.remoteAddress() + "发送了消息:" + msg + "\\n" : "[自己]发送发送了消息:" + msg + "\\n"); }); } /** * @author shenj * @title: exceptionCaught * @date 2020/12/7 14:44 * 异常处理 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }); } }); System.out.println("Server is runing ~"); channelFuture = startApplicaiontConfiguration.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } finally { mainLoop.shutdownGracefully(); workerLoop.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new GroupChatServer(8881).run(); } }
- 客户端
/** * @author shenj * @title: GroupClient * @projectName NettyPro * @date 2020/12/7 15:17 */ public class GroupClient { private String ip; private int port; private static final EventLoopGroup workerLoop = new NioEventLoopGroup(); private static final Bootstrap startApplicationClient = new Bootstrap(); public GroupClient(String ipIn, int portIn){ this.ip = ipIn; this.port = portIn; } public void run() throws InterruptedException { try { startApplicationClient.group(workerLoop) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("stringDecoder", new StringDecoder()); pipeline.addLast("stringEncoder", new StringEncoder()); pipeline.addLast("ClientHandler", new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg.trim()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }); } }); ChannelFuture channelFuture = startApplicationClient.connect(ip, port).sync(); Channel channel = channelFuture.channel(); System.out.println("------------"+channel.remoteAddress()+"------------"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String msg = scanner.next(); channel.writeAndFlush(msg+"\\r\\n"); } }finally { workerLoop.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new GroupClient("127.0.0.1", 8881).run(); } }
- 知识补充
handlerAdded(服务端向客户端发送信息) channelActive(服务端Console控制台打印Info) channelInactive(服务端Console控制台打印Info) handlerRemoved(服务端向客户端发送信息)
- 从客户端连接服务器到关闭客户端和服务器的链接,是以上方法四个先后顺序。
- Netty心跳检测机制
.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("Decoder", new StringDecoder()); pipeline.addLast("Encoder", new StringEncoder()); /** * IdleStateHandler 处理空闲的处理器 * readerIdleTime 表示多久没有读,就会发送一个心跳包进行检测是否连接 * writeIdleTime 表示多久没有写,就会发送一个心跳包进行检测是否连接 * alIdleTIme 表示既没有读有没有写,就会发送一个心跳包进行检测是否连接 * 当IdleStateHandler触发之后,就会传递给管道pipeline下一个handler处理,通过回调下一个handler的 * userEventTiggered,在该方法中处理 */ pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS)); pipeline.addLast(new HeartCheckHandler()); } });
- HeartCheckHandler
/** * @author shenj * @title: HeartCheckHandler * @projectName NettyPro * @date 2020/12/8 2:11 */ public class HeartCheckHandler extends ChannelInboundHandlerAdapter { /** * @author shenj * @title: userEventTriggered * @date 2020/12/8 2:12 * ctx山下文,evt事件 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; String msg = null; switch (event.state()){ case READER_IDLE: msg = "读空闲"; break; case WRITER_IDLE: msg = "写空闲"; break; case ALL_IDLE: msg = "读写空闲"; break; default: break; } System.out.println(ctx.channel().remoteAddress()+"----时间超时----"+msg); //服务器做相应的处理... //关闭通道就是失去心跳检测 ctx.channel().close(); } } }
- 需要使用心跳检测的事件处理就要重写userEventTriggered方法
- 添加日志处理器
//加入日志处理器LoggingHandler,且日志级别为LogLevel.INFO .handler(new LoggingHandler(LogLevel.INFO))
- WebSocket长连接
- ChannelInitializer<SocketChannel>()
protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //http编解码器 pipeline.addLast("codec", new HttpServerCodec()); //以块的方式写,http是分段的 pipeline.addLast("chunkwrite", new ChunkedWriteHandler()); //段的聚合,将分段的数据聚合,这就http请求多次的原因 pipeline.addLast("aggregator", new HttpObjectAggregator(8192)); /** * 1.WebSocket以帧的形式传输 * 2.WebSocketFrame六个子类 * 3.浏览器请求时ws://localhost:port/msg请求的uri * 4.WebSocketServerProtocolHandler将协议升级为ws协议 */ pipeline.addLast("", new WebSocketServerProtocolHandler("/msg"));
- 使用WebSocketServerProtocolHandler
1.WebSocket以帧的形式传输 2.WebSocketFrame六个子类 3.浏览器请求时ws://localhost:port/msg请求的uri 4.WebSocketServerProtocolHandler将协议升级为ws协议
- new SimpleChannelInboundHandler<TextWebSocketFrame>()
@Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { System.out.println("服务器收到的消息:"+msg.text()); ctx.writeAndFlush(new TextWebSocketFrame("服务器时间:"+ LocalDate.now()+"回送信息"+msg.text())); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //asLogText表示全局唯一,asShortText()不是全局的可能重复 System.out.println("handlerAdded 方法被调用"+ctx.channel().id().asLongText()); System.out.println("handlerAdded 方法被调用"+ctx.channel().id().asShortText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved 方法被调用"+ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常发生"+cause.getMessage()); ctx.close(); }
- 注意获取ChannelId的时候获取asLogText表示全局唯一,asShortText不是全局唯一的可能会有重复
- WebSocket案例
- WebServer1
/** * @author shenj * @title: WebServer1 * @projectName NettyPro * @date 2020/12/8 9:00 */ public class WebServer1 { private int port; private static final EventLoopGroup masterGroup = new NioEventLoopGroup(); private static final EventLoopGroup slaveGroup = new NioEventLoopGroup(); private static final ServerBootstrap service = new ServerBootstrap(); public WebServer1(int port) { this.port = port; } public void run() throws InterruptedException { try{ service.group(masterGroup, slaveGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //http编解码器 pipeline.addLast("codec", new HttpServerCodec()); //以块的方式写,http是分段的 pipeline.addLast("chunkwrite", new ChunkedWriteHandler()); //段的聚合,将分段的数据聚合,这就http请求多次的原因 pipeline.addLast("aggregator", new HttpObjectAggregator(8192)); /** * 1.WebSocket以帧的形式传输 * 2.WebSocketFrame六个子类 * 3.浏览器请求时ws://localhost:port/msg请求的uri * 4.WebSocketServerProtocolHandler将协议升级为ws协议 */ pipeline.addLast("", new WebSocketServerProtocolHandler("/msg")); //WebSocketFrame页面送回的数据类型 pipeline.addLast("myhandler", new SimpleChannelInboundHandler<TextWebSocketFrame>() { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //不使用msg.text()而使用toString()是无法正常数据的 System.out.println("服务器收到的消息:"+msg.text()); ctx.writeAndFlush(new TextWebSocketFrame("服务器时间:"+ LocalDate.now()+"回送信息"+msg.text())); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //asLogText表示全局唯一,asShortText()不是全局的可能重复 System.out.println("handlerAdded 方法被调用"+ctx.channel().id().asLongText()); System.out.println("handlerAdded 方法被调用"+ctx.channel().id().asShortText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved 方法被调用"+ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常发生"+cause.getMessage()); ctx.close(); } }); } }); System.out.println("Server is ready......"); ChannelFuture channelFuture = service.bind(this.port).sync(); channelFuture.channel().closeFuture().sync(); }finally { masterGroup.shutdownGracefully(); slaveGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new WebServer1(6633).run(); } }
- index.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>WebSocketTmp</title> </head> <body> <script> var socket; //检查浏览器是否支持websocket if (window.WebSocket) { socket = new WebSocket("ws://localhost:6633/msg"); socket.onmessage = function (event) { var readText = document.getElementById("showText"); readText.value = readText.value + "\\n" + event.data; } //感知连接的开启 socket.onopen = function (event) { var readText = document.getElementById("showText"); readText.value = "连接开启!"; } //感知连接关闭 socket.onclose = function (event) { var readText = document.getElementById("showText"); readText.value = readText.value + "\\n" + "连接关闭~"; } } else { alert("当前浏览器不支持WebSocket~~") } //获取send中infomation值 function send(infomation) { if (!window.socket) { return; } if (socket.readyState == WebSocket.OPEN) { socket.send(infomation) } else { alert("连接没有开启哦~") } } </script> //指定表单不自动提交return false <form onsubmit="return false"> <textarea name="infomation" style="height: 300px;width: 300px"></textarea> <input type="button" value="发送信息" onclick="send(this.form.infomation.value)"> <textarea id="showText" style="height: 300px;width: 300px"></textarea> <input type="button" value="清空内容" onclick="document.getElementById('showText').value=''"> </form> </body> </html>
- Netty编解码机制
- 网络中传输 二进制传输
- codec(编解码器)组成部分有两个:decoder(解码器)和encoder(编码器)
- Netty自身提供的一些codec(StringEncoder、ObjectEncoder、StringDEcoder、ObjectDecoder)存在许多问题
- 无法跨语言
- 序列化后的体积太大,是二进制的5倍
- 序列化性能太低
- 引出新的解决方案【Google的Protobuf】
- Protobuf
- Google开源项目,是一种轻便高效的数据化存储格式,可以用于结构化数据串行化(序列化),适合做数据存储或RPC(远程过程调用)数据交换格式
- http+json→tcp+protobuf
- 跨语言、跨平台
- 高性能、高可靠
- 仓库坐标
<!-- <https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java> --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.6.1</version> </dependency>
- 不同版本规范不一样
- 使用protoc.exe —java_out=. *.proto
- 使用protobuf生成类(idea安装protobuf的插件,语法提示)
- 首先先编写Student.proto
syntax = "proto3";//对应proto的版本 option java_outer_classname = "StudentPoJo"; //外部类的名称 //protobuf使用message存储数据 message Student{ //生成内部类Student,是真正发送的对象 //int32、string是proto的数据类型,需要查看官网的对应java的数据类型 int32 id = 1;//并不是表示赋值,而是表示其的序号为1,1 string name =2; }
- 使用Student.proto生成类
protoc.exe —java_out=. Student.proto
- 将生成StudentPoJo.java,其中部分内容如下
package icu.lookyousmileface.netty.simple.codec;// Generated by the protocol buffer compiler. DO NOT EDIT! // source: Student.proto public final class StudentPoJo { private StudentPoJo() {} public static void registerAllExtensions( com.google.protobuf.ExtensionRegistryLite registry) { } public static void registerAllExtensions( com.google.protobuf.ExtensionRegistry registry) { registerAllExtensions( (com.google.protobuf.ExtensionRegistryLite) registry); } public interface StudentOrBuilder extends // @@protoc_insertion_point(interface_extends:Student) com.google.protobuf.MessageOrBuilder { ................
- 案例[使用protobuf生成的类封装数据通信]:
- Server:
/** * @author shenj * @title: CodecServer * @projectName NettyPro * @date 2020/12/8 18:04 */ public class CodecServerSimp { private static int port = 4466; public static void main(String[] args) throws InterruptedException { EventLoopGroup masterGroup = new NioEventLoopGroup(); EventLoopGroup slaveGroup = new NioEventLoopGroup(); try { ServerBootstrap service = new ServerBootstrap() .group(masterGroup, slaveGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .handler(new LoggingHandler(LogLevel.INFO)) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //指定对象进行解码 pipeline.addLast("protobufDecoder", new ProtobufDecoder(StudentPoJo.Student.getDefaultInstance())); pipeline.addLast("myHandler", new SimpleChannelInboundHandler<StudentPoJo.Student>() { @Override protected void channelRead0(ChannelHandlerContext ctx, StudentPoJo.Student msg) throws Exception { System.out.println("[客户端] id = "+msg.getId()+" name = "+msg.getName()); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("你好客户端", Charset.forName("UTF-8"))); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }); } }); ChannelFuture channelFuture = service.bind(port).sync(); System.out.println("Server is runing ~"); channelFuture.channel().closeFuture().sync(); } finally { masterGroup.shutdownGracefully(); slaveGroup.shutdownGracefully(); } } }
- 其中服务器在平时代码的基础上多了如下:
- protobuf的解码
pipeline.addLast("protobufDecoder", new ProtobufDecoder(StudentPoJo.Student.getDefaultInstance()));
- 使用new SimpleChannelInboundHandler<StudentPoJo.Student>(),使方法中获取对象的参数更加友好
@Override protected void channelRead0(ChannelHandlerContext ctx, StudentPoJo.Student msg) throws Exception { System.out.println("[客户端] id = "+msg.getId()+" name = "+msg.getName()); }
- Client:
/** * @author shenj * @title: CodecClient * @projectName NettyPro * @date 2020/12/8 18:28 */ public class CodecClientSimp { public static void main(String[] args) throws InterruptedException { EventLoopGroup slaveGroup = new NioEventLoopGroup(); try { Bootstrap service = new Bootstrap() .group(slaveGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); pipeline.addLast(new SimpleChannelInboundHandler<StudentPoJo.Student>() { @Override protected void channelRead0(ChannelHandlerContext ctx, StudentPoJo.Student msg) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf dataBuf = (ByteBuf) msg; System.out.println("[服务器]:" + dataBuf.toString(CharsetUtil.UTF_8)); System.out.println("[服务器]地址:" + ctx.channel().remoteAddress()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { StudentPoJo.Student finker = StudentPoJo.Student.newBuilder().setId(1024).setName("Finker").build(); ctx.writeAndFlush(finker); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }); } }); ChannelFuture channelFuture = service.connect("127.0.0.1", 4466).sync(); channelFuture.channel().closeFuture().sync(); } finally { slaveGroup.shutdownGracefully(); } } }
- 相比平时的客户端多了如下:
- protobuf的编码
pipeline.addLast("protobufEncoder", new ProtobufEncoder());
- protobuf生成类的对象的创建与数据的传输,使用内部类
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { StudentPoJo.Student finker = StudentPoJo.Student.newBuilder().setId(1024).setName("Finker").build(); ctx.writeAndFlush(finker); }
- 使用Protobuf传输多种数据类型
- CrateClass.proto
syntax = "proto3"; option optimize_for = SPEED;//加速解析 //option java_package = "icu.lookyousmileface.netty.simple.codec2";//指定生成到那个包底下 option java_outer_classname = "MyDataInfo";//外部类名称 //使用message管理其他的message message MyMessage{ enum DataType{ StudentType = 0;//在proto3要求enum的编号从0开始 WorkerType = 1; } //用data_type来标识是那一个枚举类型 DataType data_type = 1; //表示每次枚举醉倒只能出现其中一个,节省空间 oneof dataBody{ Student student = 2; Worker worker = 3; } } message Student{ int32 id = 1; string name = 2; } message Worker{ string name = 1; int32 age = 2; }
- 生成外部类MyDataInfo.java
package icu.lookyousmileface.netty.simple.codec2;// Generated by the protocol buffer compiler. DO NOT EDIT! // source: CrateClass.proto public final class MyDataInfo { private MyDataInfo() {} public static void registerAllExtensions( com.google.protobuf.ExtensionRegistryLite registry) { }.........
- 在protobuf的案例的基础上如下修改
- Client
//数据类型为MyDataInfo.MyMessage pipeline.addLast(new SimpleChannelInboundHandler<MyDataInfo.MyMessage>() {
public void channelActive(ChannelHandlerContext ctx) throws Exception { int num = new Random().nextInt(3); if (0 == num){//student ctx.writeAndFlush(MyDataInfo.MyMessage.newBuilder() .setDataType(MyDataInfo.MyMessage.DataType.StudentType) .setStudent(MyDataInfo.Student.newBuilder().setId(20192020).setName("Jack").build()).build()); System.out.println("[客户端]类型:学生"); }else{ ctx.writeAndFlush(MyDataInfo.MyMessage.newBuilder() .setDataType(MyDataInfo.MyMessage.DataType.WorkerType) .setWorker(MyDataInfo.Worker.newBuilder().setAge(88).setName("Mack").build()).build()); System.out.println("[客户端]类型:工作者"); } }
- Server
//设置服务端解码对象 pipeline.addLast("protobufDecoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
//设置服务端的数据对象 pipeline.addLast("myHandler", new SimpleChannelInboundHandler<MyDataInfo.MyMessage>() {
@Override protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception { MyDataInfo.MyMessage.DataType dataType = msg.getDataType(); if (dataType == MyDataInfo.MyMessage.DataType.StudentType) { System.out.println("[客户端属性:学生]:id = " + msg.getStudent().getId() + " name = " + msg.getStudent().getName()); ctx.writeAndFlush(Unpooled.copiedBuffer("您好,你传输过来的类型是student类型", Charset.forName("UTF-8"))); } else if (dataType == MyDataInfo.MyMessage.DataType.WorkerType) { System.out.println("[客户端属性:工作者]:age = " + msg.getWorker().getAge() +" name = "+msg.getWorker().getName()); ctx.writeAndFlush(Unpooled.copiedBuffer("您好,你传输过来的类型是worker类型", Charset.forName("UTF-8"))); }else { System.out.println("客户端传输的类型错误!!!"); ctx.writeAndFlush(Unpooled.copiedBuffer("xxx客户端你好,您传输的数据类型有误!" , Charset.forName("UTF-8"))); } }
- 基本说明
- Netty的组件设计:主要组件:Channel、EventLoop、ChannelFuture、ChannelHandler、Channelpipeline等
- ChannelHandler充当处理入站和出站数据的应用程序逻辑的容器。
- ChannelPipeline提供ChannelHandler链的容器
- 编解码器&Handler链
- Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHandler或者ChannelOutboundHandler接口。对于每个入站Channel读取的信息,已经重写的channelRead方法就会被调用,之后解码器提供的decod()方法就进行解码,并讲已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler
- 例如:ByteToMessageDecoder
- 关系继承图
- 由于可能知道远程接待你是否会一次性发送一个完整的信息,tcp会出现粘包拆包的问题,这类就会对入站数据进行缓冲,直到它准备好被处理。
- 处理方法
- 案例(客户端→服务器)
- Server
/** * @author shenj * @title: NettyServer * @projectName NettyPro * @date 2020/12/9 15:18 */ public class NettyServer { private int port; public NettyServer(int port) { this.port = port; } public void run() throws InterruptedException { EventLoopGroup master = new NioEventLoopGroup(); EventLoopGroup slave = new NioEventLoopGroup(); try { ServerBootstrap service = new ServerBootstrap() .group(master, slave) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("MyDecoder", new ByteToMessageDecoder() { private int count = 0; /** * @author shenj * @title: decode * @date 2020/12/9 16:03 * ctx:上下文对象 * in:客户端传送过来的data * out:结果List * decode会根据接收的数据,被调用多次,直到确定没有新的元素被添加到list,或者ByteBuf没有更多的可读字节位置 * 如果List out 不为空,就会讲list的内容传递给下一个channelinboundhandler处理,该处理器方法也可能被调用多次 */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //需要判断8不然会编码混乱 if (in.readableBytes() >= 8) { out.add(in.readLong()); } count += 1; System.out.println("decode被调用 :" + count); } }); //这里的泛型用decode中的类型Long pipeline.addLast("MyHandler", new SimpleChannelInboundHandler<Long>() { private int count = 0; @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { count += 1; System.out.println("[客户端" + count + "]:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }); } }); System.out.println("Server is Runing~"); ChannelFuture channelFuture = service.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } finally { master.shutdownGracefully(); slave.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new NettyServer(1100).run(); } }
- 注意:
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //需要判断8不然会编码混乱 if (in.readableBytes() >= 8) { out.add(in.readLong()); } count += 1; System.out.println("decode被调用 :" + count); }
- 其中若in.readableByte不大于等于8就会出现编码混乱
- decode会根据接收的数据,被调用多次,直到确定没有新的元素被添加到list,或者ByteBuf没有更多的可读字节位置,如果List out 不为空,就会讲list的内容传递给下一个channelinboundhandler处理,该处理器方法也可能被调用多次。
pipeline.addLast("MyHandler", new SimpleChannelInboundHandler<Long>()
- MyHandler中的泛型数据类型要对应不然会无法输出
- Client
/** * @author shenj * @title: NettyClient * @projectName NettyPro * @date 2020/12/9 15:18 */ public class NettyClient { private String address; private int port; public NettyClient(String address, int port) { this.address = address; this.port = port; } public void run() throws InterruptedException{ EventLoopGroup slave = new NioEventLoopGroup(); try { Bootstrap service = new Bootstrap() .group(slave) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("MyEncode", new MessageToByteEncoder<Long>() { @Override protected void encode(ChannelHandlerContext ctx,Long msg, ByteBuf out) throws Exception { System.out.println("调用encode!"); out.writeLong(msg); } }); pipeline.addLast("MyHandler", new SimpleChannelInboundHandler<Long>() { @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { } /** * @author shenj * @title: channelActive * @date 2020/12/9 18:19 * 1.bcadbcabcabcbacbac 18字节 * 2.这个处理器的上一个处理器是内部类(MyEncode)处理器, * 兵器上一个内部类(MyEncode)处理器的父类是MessageToByteEncoder, * 并且有一个wire方法如下: * public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { * ByteBuf buf = null; * try { * if (acceptOutboundMessage(msg)) { * 判读是不是传输的类型(Long)的不是就ctx.write(msg, promise);跳过直接把msg发出 * 是的话就是调用 encode(ctx, cast, buf);编码 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(123456L); // ctx.writeAndFlush(Unpooled.copiedBuffer("bcadbcabcabcbacbac", Charset.forName("UTF-8"))); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }); } }); ChannelFuture channelFuture = service.connect(address, port).sync(); channelFuture.channel().closeFuture().sync(); }finally { slave.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new NettyClient("127.0.0.1", 1100).run(); } }
- 注意:
pipeline.addLast("MyEncode", new MessageToByteEncoder<Long>()
- 数据类型要对应
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(123456L); //tx.writeAndFlush(Unpooled.copiedBuffer("bcadbcabcabcbacbac", Charset.forName("UTF-8"))); }
- 这个方法需要追溯到上一个处理器(MyEncode)的父类中的write中if (acceptOutboundMessage(msg)) {方法 对传输数据类型的判断,从而选择是否执行实现类的encode方法。
- 使用ReplayingDecoder这类作为Decode,就不必调用readableByte方法,参数S指定了用户状态管理的类型,其中Void表示不需要状态管理
//解码,Void表示不需要自动管理 pipeline.addLast("MyDecoder", new ReplayingDecoder<Void>() { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { out.add(in.readLong()); System.out.println("被调用Decode"); } });
- ReplayingDecoder虽使用方便,但是有局限性,并不是所有的ByteBuf操作都是支持的,调用了一个不被支持的方法就会抛出UnsupportedOperationException,ReplayingDecoder在某些场景可能慢于ByteToMessageDecoder,比如网站缓慢,消息格式复杂时,就会白拆成多个碎片,速度变慢。
- 其他解码器
- LineBasedFrameDecode:它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据
- DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
- HttpObjectDecoder:一个Http传输的解码器
- LengthFieldBasedFrameDecoder:通过指定的长度来标识整包消息,资源就可以自动的处理粘包和半包信息。
........
- 整合Log4j日志管理
- build.gradle
compile group: 'log4j', name: 'log4j', version: '1.2.17' compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.26' compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.26'
- log4j.properties
log4j.rootLogger=DEBUG, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%p] %c{1} - %m%n
- Tcp粘包和拆包
- Tcp是面向连接的,面向流的,提供高可靠服务。会根据优化算法(Nagle算法),将多次间隔较小且数据量小的数据,合成一个大的数据块,进行封包,这样虽然提高了效率,但是接收端就难以辨别完整的包,因为面向流的通信四无消息保护边界的。
- 因为是无消息边界的,就需要在接收端处理消息边界的问题,如下就是粘包、拆包的问题
- TCP粘包和拆包的解决方案
- 使用自定义协议+编解码器来解决
- 关键就是要解决服务器每次读取数据长度的问题。
- 将每一段的数据和其数据的长度分装到一个对象的属性中使用对象传输。
- 案例:
- Client.java
pipeline.addLast("MyHandler", new SimpleChannelInboundHandler<PacksData>() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { String msg = "怒发冲冠,凭栏处、潇潇雨歇。抬望眼、仰天长啸,壮怀激烈。三十功名尘与土,八千里路云和月。莫等闲、白了少年头,空悲切。"; byte[] bytes = msg.getBytes(CharsetUtil.UTF_8); int length = msg.getBytes(CharsetUtil.UTF_8).length; //创建协议包对象 PacksData packsData = new PacksData(); packsData.setLen(length); packsData.setConnect(bytes); ctx.writeAndFlush(packsData); } }
pipeline.addLast("MyEncode", new MessageToByteEncoder<PacksData>() { @Override protected void encode(ChannelHandlerContext ctx, PacksData msg, ByteBuf out) throws Exception { System.out.println("调用客户端的encode方法!"); out.writeInt(msg.getLen()); out.writeBytes(msg.getConnect()); } });
- Server.java
pipeline.addLast("MyDecode", new ReplayingDecoder<PacksData>() { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //获得数据长度 int lenght = in.readInt(); //根据数据长度获取数完整包的数据块 byte[] bytes = new byte[lenght]; in.readBytes(bytes); PacksData packsData = new PacksData(); packsData.setLen(lenght); packsData.setConnect(bytes); out.add(packsData); }
pipeline.addLast("MyHandler", new SimpleChannelInboundHandler<PacksData>() { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, PacksData msg) throws Exception { System.out.println("[客户端]发送的数据长度:"+msg.getLen()); System.out.println("[客户端]发送的数据是:"+new String(msg.getConnect(), CharsetUtil.UTF_8)); System.out.println("[客户端]传递的数据次数为:"+(++this.count)); ctx.writeAndFlush(msg); }
- PacksData.java
public class PacksData { private int len; private byte[] connect; public PacksData() { }.................................
- Netty服务器启动剖析
- io.netty.example下有很多的源码案例
- doBindNetty进阶学习(三):Netty#doBind()的理解_sanding的博客-CSDN博客
- 重点:
@Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
@Override protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false));
- 掌握了一种新的无限循环模式:for(;;)=while(true)
- Netty启动过程梳理
3. Netty-Netty 启动过程 - Netty接收客户端请求
Netty接收客户端请求过程源码分析_弄懂原理+良好代码功底=无敌-CSDN博客
@Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
- Pipeline Handler HandlerContext创建源码剖析
Netty源码阅读--Pipeline、Handler HandlerContext创建源码剖析_弄懂原理+良好代码功底=无敌-CSDN博客 - Pipeline调度handler剖析
Netty源码阅读-ChannelPipeline调度Handler的源码剖析_弄懂原理+良好代码功底=无敌-CSDN博客 - Netty心跳(CheckHeart)服务源码剖析Netty源码--心跳源码阅读_弄懂原理+良好代码功底=无敌-CSDN博客
- 无论是否是数据传输过慢,第一次都会超时。
- 纳秒是秒的十亿分之一。
- Netty核心组件EventLoop源码剖析Netty核心技术及源码剖析-EventLoop源码剖析
- selector的select方法,默认是阻塞一秒钟,如有定时任务,则定时任务剩余时间的基础上加0.5秒进行阻塞。当执行execute方法的时候,也就是添加任务的时候,唤醒selector,防止selector阻塞时间过长。
for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // If a task was submitted when wakenUp value was true, the task didn't get a chance to call // Selector#wakeup. So we need to check task queue again before executing select operation. // If we don't, the task might be pended until select operation was timed out. // It might be pended until idle timeout if IdleStateHandler existed in pipeline. if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; }
- 当processSelectedKeys方法执行结束后,则按照ioReatio的比例执行runAllTasks方法,默认是IO时间和非IO任务时间是相同的,可以自行根据应用特点调优,如非IO任务较多,那么就见ioRatio调小一点,这样非IO任务就能执行长一点。防止队列积攒过多的任务。
- Handler中加入线程池和Context中加入线程池源码剖析
- 在Netty中做耗时的,不可预料的操作,比如数据库,网络请求,会严重影响Netty对Socket的处理速度
- 解决方法就是将耗时任务加入到异步线程池中,但就添加线程池这部操作来见可以有两种方式,而且区别也较大。
- 处理耗时业务的第一种方式—-handler中加入线程池
- 例子
public class ServerHandler extends ChannelInboundHandlerAdapter { //创建异步线程 static final EventExecutorGroup exectorGroup = new DefaultEventExecutorGroup(16); /** * @author shenj * @title: channelRead * @date 2020/12/4 8:24 * read */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("线程id"+Thread.currentThread().getName()); exectorGroup.submit(new Callable<Object>() { @Override public Object call() throws Exception { try { Thread.sleep(5*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("来自服务器的第二条信息" +Thread.currentThread().getName(), CharsetUtil.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); } return null; } });
- 在Handler中添加指定数量的线程池,使用线程池的submit方法在重写call方法的时候加入业务即可完成,注意这时候call的线程已经和外面的线程不是一个线程了。
- 处理耗时业务的第二种方式—-Context中加入线程池
- 例子
public class NettyServer { //创建异步线程池 static final EventExecutorGroup executorGroup = new DefaultEventExecutorGroup(2); public static void main(String[] args) throws InterruptedException { EventLoopGroup mainLoop = new NioEventLoopGroup();
.childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //可在此处命名一个集合用于装每个客户端的SocketChannel,然后通过Handler的ctx.channel().eventLoop().execute/Scheduler //进行用户标识投放广告 System.out.println("新加入的客户端HasgCode;"+ch.hashCode()); //将Handler和创建的异步xain ch.pipeline().addLast(executorGroup ,new ServerHandler()); } });
- 两种方式的比较
- 第一种方式在Handler中添加异步,比较自由,比如访问数据库,那就异步,如果不需要,那就不异步,异步会拖长接口响应时间,业务需要将任务放进mpscTask中。如果IO时间短,task很多,可能一个循环下来,都没时间执行整个task,导致响应时间达不到标。
- 第二种方式是Netty标准方式(即加入到队列),但是,这么做将整个Context都交给业务线程池,不论耗时不耗时,都不够灵活。
- Netty实现Dubbo RPC
- RPC基本
- RPC(Remote Procedure Call)远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。
- 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样
- RPC远程调图如下:
- 注意:在RPC 中, Client 叫服务消费者,Server 叫服务提供者
- 常见的 RPC 框架有: 比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift, Spring 旗下的 Spring Cloud
- 案例模拟实现Dubbo的RPC:
- 由于工程代码过多托管到GItee上了,连接如下:
Admin/NettyPro - 架构原理图
- 👋2020/12/9总结