正文
一、什么是netty
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。
二、BIO &NIO &AIO的概念
BIO
所谓IO即input和output的缩写,是对数据的流入和流出的一种抽象。我们平时说的IO大多指的是BIO。BIO(BlockingIO)是一个同步阻塞的IO,是面向流传输的。阻塞的IO当没有获取到数据时(没有读写操作)会一直处于等待阻塞状态,占用系统资源。虽然可以使用多线程解决,但是频繁(来一个请求创建一个线程)的创建线程对我们的CPU就很不友好,于是NIO就应运而出。
NIO
NIO是同步非阻塞IO(NonBlocking IO),是面向缓存区,非阻塞的IO并不会等待直到获取到数据之后返回,而是不管有没有数据都会立即响应结果。
NIO的三个组件
Channel(通道):通常NIO的操作都是由通道开始的,channel类似于流,每个channel对应一个buffer缓冲区,buffer底层是个数组。Channel会注册到Selector(选择器),由Selector根据channel读写事件的发生将其交给空闲的线程处理。
Buffer(缓冲区):buffer就是一个数组,本质上相当于一个内存区,读取数据时并不会一个字节一个字节的读取,而是将数据先写入到buffer(缓存区),再统一的写到硬盘上。
Selector(选择器):选择器也可以叫做多路复用器(选择那个线程执行),可以对应一个或多个线程。NIO 的Buffer和Channel都是既可以读也可以写的。
AIO
AIO是一个异步非阻塞IO,由操作系统完成后回调通知服务端程序启用线程去处理,一般使用于连接数较多且连接时间长的应用。
三者区别
老张爱喝茶,废话不说,煮开水。 出场人物:老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。
老张把水壶放到火上,立等水开。(同步阻塞BIO) 老张觉得自己有点傻
老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞NIO) 老张还是觉得自己有点傻,于是变高端了,买了把会响笛的那种水壶。水开之后,能大声发出嘀~~~~的噪音。
老张把响水壶放到火上,立等水开。(异步阻塞) 老张觉得这样傻等意义不大
老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞AIO) 老张觉得自己聪明了。
三、为什么使用Netty
Jdk的Nio API复杂,而且还有空轮训的bug。
Netty封装了NIO的API,API简单易用,开发门槛低。
内置很多编解码功能,支持多种主流协议。
性能高,社区活跃。
四、代码
BIO实现服务器&客户端
package com.xiaojie.bio; import java.io.IOException; import java.io.PrintWriter; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @Description: bio服务器模拟 * @author: xiaojie * @date: 2021.07.29 */ public class BioServer { static int count = 0; public static void main(String[] args) throws Exception { //创建一个线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //创建serversocket ServerSocket serverSocket = new ServerSocket(); //绑定端口号 serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 9999)); System.out.println("等待发送消息。。。。。进入阻塞状态。。。。。"); while (true) { final Socket accept = serverSocket.accept(); final PrintWriter printWriter = new PrintWriter(accept.getOutputStream()); executorService.submit(() -> { count++; System.out.println("count..............." + count); //获取OutputStream try { byte[] bytes = new byte[1024]; if (accept.getInputStream().read(bytes) > 0) { System.out.println("接收到客户端的消息是" + new String(bytes).trim()); } //服务端响应 String responMsg = "我是服务端响应信息:hello client"; printWriter.write(responMsg); printWriter.flush(); } catch (IOException e) { e.printStackTrace(); } finally { /* try { serverSocket.close(); accept.close(); printWriter.close(); } catch (IOException e) { e.printStackTrace(); }*/ } }); } } }
package com.xiaojie.bio; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; /** * @Description:客户端 * @author: xiaojie * @date: 2021.07.29 */ public class BioClient { public static void main(String[] args) throws Exception { //创建socket Socket socket = new Socket(); //连接 socket.connect( new InetSocketAddress(InetAddress.getLocalHost(),9999)); //创建PrintWriter PrintWriter printWriter = new PrintWriter(socket.getOutputStream()); //读回数据 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //发送的请求数据 String requeMsg="我是客户端发送请求:hello server"; //客户端写出数据 printWriter.write(requeMsg); printWriter.flush(); //读回数据 // String readLine = bufferedReader.readLine(); byte[] bytes = new byte[1024]; int read = socket.getInputStream().read(bytes); if (read>0){ System.out.println("服务端返回消息是:"+new String(bytes).trim()); } while(true){ } //关闭连接 // socket.close(); // printWriter.close(); // bufferedReader.close(); } }
NIO实现服务器&客户端
nio执行过程
创建一个ServerSocketChannel和Selector,将serverSocketChannel注册到Selector上
selector通过select()方法监听channel事件,当客户端连接时selector监听到连接事件,获取到ServerSocketChannel注册时绑定的selectionKey
selectionKey通过channel()方法可以获取绑定的ServerSocketChannel
ServerSocketChannel通过accept()方法得到SocketChannel
将SocketChannel注册到Selector上,关心read事件
注册后返回一个SelectionKey,会和该SocketChannel关联
selector继续通过select()方法监听事件,当客户端发送数据给服务端,selector监听到read事件,获取到SocketChannel注册时绑定的selectionKey
selectionKey通过channel()方法可以获取绑定的socketChannel
将socketChannel里的数据读取出来
用socketChannel将服务端数据写回客户端
package com.xiaojie.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; /* nio执行过程 1、创建一个ServerSocketChannel和Selector,将serverSocketChannel注册到Selector上 2、selector通过select()方法监听channel事件,当客户端连接时selector监听到连接事件,获取到ServerSocketChannel注册时绑定的selectionKey 3、selectionKey通过channel()方法可以获取绑定的ServerSocketChannel 4、ServerSocketChannel通过accept()方法得到SocketChannel 5、将SocketChannel注册到Selector上,关心read事件 6、注册后返回一个SelectionKey,会和该SocketChannel关联 7、selector继续通过select()方法监听事件,当客户端发送数据给服务端,selector监听到read事件,获取到SocketChannel注册时绑定的selectionKey 8、selectionKey通过channel()方法可以获取绑定的socketChannel 9、将socketChannel里的数据读取出来 10、用socketChannel将服务端数据写回客户端 */ /** * @Description: * @author: xiaojie * @date: 2021.07.29 */ public class NioServer { /* * 初始化 * @param port * @todo * @author xiaojie * @date 2021/7/29 14:17 * @return void */ private Selector selector; public void initServer(int port) throws IOException { //创建ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //设置通道为非阻塞 serverSocketChannel.configureBlocking(false); //绑定端口号 serverSocketChannel.bind(new InetSocketAddress(port)); //获取通道管理器 this.selector = Selector.open(); /*将通道管理器和通道绑定,并为通道注册SelectionKey.OP_ACCEPT事件, *当该事件到达时,selector.select()会返回,如果该事件没有到达selector.select()会一直阻塞 */ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } /* * 监听 * @todo * @author xiaojie * @date 2021/7/29 14:26 * @return void */ public void listen() throws IOException { while (true) { int select = selector.select(); if (select == 0) { continue; } //获取选中的迭代器 Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); //删除防止重复处理 iterator.remove(); if (key.isAcceptable()) { //客户端请求连接事件 ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); //获取客户端连接的通道 SocketChannel channel = serverSocketChannel.accept(); //设置非阻塞 channel.configureBlocking(false); //在与客户端连接成功之后,给通道设置权限 channel.register(this.selector, SelectionKey.OP_READ); } else if (key.isReadable()) { read(key); } } } } /* * 读取数据 并返回结果 * @todo * @author xiaojie * @date 2021/7/29 14:41 * @return void */ public void read(SelectionKey key) throws IOException { //服务器可读取消息,得到事件发生的通道 SocketChannel socketChannel = (SocketChannel) key.channel(); //创建缓冲区 ByteBuffer byteBuffer = ByteBuffer.allocate(512); socketChannel.read(byteBuffer); byte[] array = byteBuffer.array(); String msg = new String(array); System.out.println("服务端获取数据:" + msg.trim()); String res = "我是服务端响应信息:hello client"; ByteBuffer buffer = ByteBuffer.wrap(res.getBytes("utf-8")); //将消息发给客户端 socketChannel.write(buffer); } public static void main(String[] args) throws IOException { NioTcpServer server = new NioServer(); server.initServer(9999); server.listen(); } }
package com.xiaojie.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; public class NioClient { Selector selector; public static void main(String[] args) throws IOException { NioClient nioClient = new NioClient(); nioClient.initClient("127.0.0.1",9999); nioClient.connection(); } public void initClient(String ip,int port) throws IOException { // 获取一个socket 通道 SocketChannel socketChannel = SocketChannel.open(); // 设置通道为非阻塞 socketChannel.configureBlocking(false); // 获取一个通道管理器 this.selector = Selector.open(); // 客户端连接服务器,其实方法执行并没实现连接,需要在listen()方法中 // 调用channel.finishConnection才能完成连接 socketChannel.connect(new InetSocketAddress(ip,port)); // 将管道管理器和通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件 socketChannel.register(this.selector, SelectionKey.OP_CONNECT); } public void connection() throws IOException{ // 轮询访问selector while(true){ // 选择一组可以进行I/O操作的事件,放在selector中,客户端该方法不会阻塞 // 这里和服务端的方法不一样,查看api注释可以知道,服务端当至少一个通道被选中时 // selector的wakeup方法被调用,方法返回,而对于客户端来说,通道是一直被选中的 this.selector.select(); Iterator<SelectionKey> it = this.selector.selectedKeys().iterator(); while (it.hasNext()){ SelectionKey key = it.next(); it.remove(); // 连接事件发生 if(key.isConnectable()){ SocketChannel socketChannel =(SocketChannel) key.channel(); // 如果正在连接则完成连接 if(socketChannel.isConnectionPending()){ socketChannel.finishConnect(); } // 设置成非阻塞 socketChannel.configureBlocking(false); // 向服务器发送信息 ByteBuffer byteBuffer = ByteBuffer.wrap("我是客户端发送请求:hello server".getBytes()); socketChannel.write(byteBuffer); // 连接成功之后注册读取服务器信息事件 socketChannel.register(this.selector,SelectionKey.OP_READ); }else if(key.isReadable()){ read(key); } } } } public void read(SelectionKey key) throws IOException{ SocketChannel channel = (SocketChannel)key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int len = channel.read(byteBuffer); if(len!=-1){ System.out.println("接收到服务端信息:"+new String(byteBuffer.array(),0,len)); } } }
Netty实现服务器&客户端
1、创建ServerBootStrap实例。 2、设置并绑定Reactor线程池:EventLoopGroup,EventLoop就是处理所有注册到本线程的Selector上面的Channel。 3、设置并绑定服务端的channel。 4、创建处理网络事件的ChannelPipeline和handler,网络时间以流的形式在其中流转,handler完成多数的功能定制:比如编解码 SSl安全认证。 5、绑定并启动监听端口。 6、当轮训到准备就绪的channel后,由Reactor线程:NioEventLoop执行pipline中的方法,最终调度并执行channelHandler。
package com.xiaojie.netty.server; import com.sun.webkit.EventLoop; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /* * 1、创建ServerBootStrap实例 2、设置并绑定Reactor线程池:EventLoopGroup,EventLoop就是处理所有注册到本线程的Selector上面的Channel 3、设置并绑定服务端的channel 4、5、创建处理网络事件的ChannelPipeline和handler,网络时间以流的形式在其中流转,handler完成多数的功能定制:比如编解码 SSl安全认证 6、绑定并启动监听端口 7、当轮训到准备就绪的channel后,由Reactor线程:NioEventLoop执行pipline中的方法,最终调度并执行channelHandler */ /** * @Description: * @author: xiaojie * @date: 2021.07.28 */ public class Nettyserver { public static void main(String[] args) { //创建主线程池,接受请求 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); //创建work线程池,处理请求 NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { //创建serverbootstrap ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class).group(bossGroup, workGroup) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LoggingHandler(LogLevel.INFO)) .addLast(new ServerHandler()); } }); ChannelFuture future =serverBootstrap.bind(8090).sync(); System.out.println("starting..................."); //wait until server is closed future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //关闭线程池 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
package com.xiaojie.netty.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * @Description: * @author: xiaojie * @date: 2021.07.28 */ public class ServerHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception { // 接受我们的数据 ByteBuf byteBuf = (ByteBuf) o; String request = byteBuf.toString(CharsetUtil.UTF_8); System.out.println("获取到客户端的请求:" + request); // 响应内容: ctx.writeAndFlush(Unpooled.copiedBuffer("我是服务端响应信息:hello client", CharsetUtil.UTF_8)); } }
package com.xiaojie.netty.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * @Description: netty客户端 * @author: xiaojie * @date: 2021.07.29 */ public class NettyClient { public static void main(String[] args) { //创建线程池 NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); //创建bootstrap Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class) .group(eventLoopGroup).remoteAddress("127.0.0.1",8090) .handler(new ChannelInitializer<SocketChannel>() { //处理消息 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new ClientHandler()); } }); //开始连接 try { ChannelFuture channelFuture = bootstrap.connect().sync(); ChannelFuture sync = channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //关闭 eventLoopGroup.shutdownGracefully(); } } }
package com.xiaojie.netty.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * @Description: 客户端处理handler * @author: xiaojie * @date: 2021.07.29 */ public class ClientHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf=(ByteBuf) msg; String respon = byteBuf.toString(CharsetUtil.UTF_8); System.out.println("获取到服务器端响应数据是:"+respon); } /* * * @param ctx * @在建立连接并准备传输数据时调用 * @author xiaojie * @date 2021/7/30 10:00 * @return void */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i=0;i<10;i++){ ctx.writeAndFlush(Unpooled.copiedBuffer("我是客户端发送请求:hello server", CharsetUtil.UTF_8)); } } }
四、粘包和拆包
在netty服务器的代码中运行后结果如下图,这种现象我们叫粘包/拆包。产生的原因是应用层面使用了Netty,但是对于操作系统来说,只认TCP协议,尽管我们的应用层是按照 ByteBuf 为 单位来发送数据,server按照Bytebuf读取,但是到了底层操作系统仍然是按照字节流发送数据,因此,数据到了服务端,也是按照字节流的方式读入,然后到了 Netty 应用层面,重新拼装成 ByteBuf,而这里的 ByteBuf 与客户端按顺序发送的 ByteBuf 可能是不对等的。因此,我们需要在客户端根据自定义协议来组装我们应用层的数据包,然后在服务端根据我们的应用层的协议来组装数据包,这个过程通常在服务端称为拆包,而在客户端称为粘包。拆包和粘包是相对的,一端粘了包,另外一端就需要将粘过的包拆开。说直白点就是ByteBuf(缓存区的锅),当我们发送的数据小于ByteBuf(缓存区)的大小时,就会发生粘包(发送的数据多条合并成一条),当发送的数据大于ByteBuf(缓存区)的大小时,就会发生拆包(一次发送的数据没有完全包含我们要发送的数据)。
解决粘包拆包Netty自带四中拆包器
LineBasedFrameDecoder: 行拆包器,在发送数据时以换行符(\n)作为分隔,接收端通过LineBasedFrameDecoder 将粘包的byteBuf进行拆包。
FixedLengthFrameDecoder:固定长度的拆包器,顾名思义就是针对每个数据包长度固定。
DelimiterBasedFrameDecoder:分隔符拆包器,可以自定义分隔符。
LengthFieldBasedFrameDecoder:基于长度域的拆包器。
代码如下:
参考:
https://baike.baidu.com/item/Netty/10061624?fr=aladdin
https://blog.csdn.net/Zev_java/article/details/113932466
https://blog.csdn.net/u012250875/article/details/78341874
https://www.cnblogs.com/shineman-zhang/articles/13884407.html