第一章:引言
一:什么是Netty
Netty是一个异步的事件驱动的网络应用框架,用于维快速开发可维护的高性能协议服务器和客户端
1:事件驱动
服务端监控不同的事件:Accept、Read 、Write事件,只有包含对应的事件,我们才会有所操作,如果没有对应的事件,我们就在那里阻塞,此操作核心依赖的就是Selector
2:异步处理
异步:一个服务端面临多个客户端,当我们的客户端链接上服务端之后,就要进行相应的io通信,这个Worker线程可能就要去处理各种各样的业务数据处理,如果我们等到所有的业务逻辑都处理完之后再返回给客户端的话,这样的时间就会比较久,丧失了为别人服务的可能了。
所以我们有了如下的处理:
当客户端访问客户端进行IO通信的时候,Netty立刻给你响应,启用别的线程辅助去做这个事,不影响Worker处理其他客户端的请求,处理完了在返回给客户端。这个效率会更高。
Netty、Ngnix、Node.js 处理服务端操作的时候,都采用异步处理,将接收到的数据交给后续线程进行处理。他就可以服务更多的客户端,并发性更好。
1):异步处理引发的问题
实际操作的线程如何将处理完的结果返回给客户端呢?
在我们前端学习当中,我们学过了Ajax技术在运行的过程中,他就是发了请求之后,你们后台自己玩,我先处理自己的事,你们完事了把结果给我。这就是Ajax,这跟异步处理的逻辑是一摸一样的。Ajax是如何做的呢?通过了一个最普通的回调用的方式。把结果拿回来。回调,
当我们的助手线程处理完之后,进行回调,将数据返回给客户端,作为调用着,把回调给传给辅助线程,让辅助线程处理完成业务之后能进行回调将结果返回。助手线程多了,降低了Worker的压力,Worker可以接待更多的客户,助手线程将处理完事的结果给到Worker,返回给前端。
二:为什么使用Netty
为什么使用Netty不用原生的NIO?
Netty是完成网络通信的框架底层封装了NIO
NIO存API 复杂难用,尤其是ByteBuffer的指针切来切去的,在Netty当中封装成了好用的ByteBuf
NIO自设计的可靠性不易保证保证,断线重连、半包粘包、网络拥塞统统需要自己考虑
空轮询会导致CPU飙高。
补充说明:
我们用的都是Tomcat服务器,服务器自己底层都做了网络通信的封装了,我们只做上层Web应用开发,我们接触不到网络通信的。
当前很多的分布式的技术,只要是涉及到跨进程的都需要通信,通信基本上都需要Netty。
Spring cloud Gateway封装了Spring WebFlux,此技术底层就是基于Netty通信的。为什么这些技术都是使用netty,他们都是搭建集群了,都是进程间的通信,都是网络通信,网络通信基于Netty很好。
分布式系统值得就是多进程的系统架构。多进程可以放到一台物理机上,也可以放到多台物理机上。进程间通信是要走网络的,是要使用TCP协议的,同一个机器上的网络通信也是要走网络的,也是基于TCP协议。
后期我们要研究的Dubbo也是在Netty的基础之上 在此封装了一层。
三:谁在用Netty
1. 框架,GRPC、Dubbo、Spring WebFlux、Spring Cloud Gateway 2. 大数据,Spark、Hadoop、Flink 3. 消息队列,RocketMQ、ActiveMQ 4. 搜索引擎,Elasticsearch 5. 分布式协调器,Zookeeper 6. 数据库,Cassandra、Neo4j 7. 负载均衡,Ribbon
第二章:Netty的第一个应用
一:依赖添加
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.45.Final</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.32</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.9</version> </dependency>
二:Netty与NIO对照映射
ServerBootStrap相当于是封装了这main函数中的这个服务类
SocketChannel和ServerSocketChannel是Netty对这个玩意做了封装新的叫做NioServerSocketChannel+NIOSocketChannel。封装了之后,可以对Mac epoll做了很好的支持。
EventLoopGroup(事件循环组)这个是一个非常核心的组,通过死循环方式监控的Event: NIO_Accept_Read_Write。EventLoop 对应其中的一个线程,这个是一个包含单线程的线程池。
监控:Accept事件的死循环(Boss线程 对应一个线程)
监控:其他事件死循环(worker线程 对应多个线程)
Group对应一个组线程,对应一组线程或者一组worker这样的线程。group是多个线程的管理者。
Handler处理器
当我们监控到了读写操作之后,进行的处理,这个后置处理都是Handler处理的。
解码:ByteBuffer转字符串,编码:字符串数据转ByteBuffer,也包括业务处理。
Netty对于Handler封装,在设计类的过程中,各司其职,各个类型职责单一一个类型只做一件事。
编码解码和业务处理等操作都设计成了一个一个的类:
编码一个类:Deconder
解码一个类:Enconder
业务处理类:一个类或者几个类:
PipeLine流水线
多个Handler配合使用,完成业务处理。
三:第一个程序
public class MyNettyServer { public static void main(String[] args) { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); //创建了一组线程 通过死循环 监控状态 accept read write serverBootstrap.group(new NioEventLoopGroup());//创建了一组,后边会进行指定个数 //ServerSocketChannel SocketChannel //childHandler 配置hanlder,ChannelInitializer需要配置一个管道的初始化器。这个初始化器初始化的是SocketChannel //为什么要为这个管道做初始化?SocketChannel已经接通了,状态已经监控了,该进行后续通过流水线用Handler对管道进行处理了。 //拿到每一个管道, serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override /* channel 接通 监控 accept rw 处理 通过流水线 用handler进行处理 。 */ protected void initChannel(NioSocketChannel ch) throws Exception { //ByteBuf 字节--->字符 //ch.pipeline().addLast(new StringDecoder()); //任何读取数据的Handler都叫做 ChannelInboundHandlerAdapter,他给我们提供了适配器设计模式,我们就直接使用了。 // ch.pipeline().addLast(new StringDecoder());这个Handler处理完之后,就会将数据传递给channelRead中的msg当中。 //基于适配器模式,提供给我们的自定义的Handler ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override //msg是将上部解码之后得到的,传递过来的。 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //可以在这里使用各种JSON库转JAva对象啦。 //经过第一步之后,反过来的数据是解码的字符串数据 System.out.println("result = " + msg); } }); } }); serverBootstrap.bind(8000); } }
public class MyNettyClient { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); //Netty客户端为什么引入多线程呢?也要使用这个NioEventLoopGroup //因为在客户端当中,客户端做的也是多线程,异步的。链接服务器也做成了一个线程,通信也做成了一个线程 //基于这样异步化的处理,性能更好。 //异步处理IO操作。 //避免我第一次通信的时间过长,导致后续通信的问题。 //链接服务器和IO操作,进行独立了。为什么要独立呢?因为某一次IO操作,时间比较长,会影响后续操作 //通道用的是一个通道,因为链接了一次,基于多个线程,可以保证后续IO不受先前IO影响。 // 为什么client 引入事件 循环 组 // client Netty 做多线程 异步 // 连接服务端 一个线程 // 通信 做成一个线程 // 异步处理 连接 ---> // IO操作 bootstrap.group(new NioEventLoopGroup()); //增加一个Handler,我们基于这个处理数据,或者发送数据。 bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { //只涉及一个编码 ch.pipeline().addLast(new StringEncoder()); } }); //链接服务端。这是一个异步的处理,先连上之后, ChannelFuture connect = bootstrap.connect(new InetSocketAddress(8000)); //这一步操作就是:等你连上,在进行通讯。 connect.sync();//先让这个线程进行阻塞,等待这个线程连上服务器。阻塞的目的就是必须等到服务器连上之后,才能进行后续的两部的操作。 //创建了新的线程 进行写操作 Channel channel = connect.channel(); channel.writeAndFlush("hello suns"); } }
服务端不用解码器,我们获取到的是什么,说明,肯定是ByteBuffer,因为没有经过解码,一定是在ByteBuffer当中。
public class MyNettyServer { public static void main(String[] args) { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); //创建了一组线程 通过死循环 监控状态 accept read write serverBootstrap.group(new NioEventLoopGroup());//创建了一组,后边会进行指定个数 //ServerSocketChannel SocketChannel //childHandler 配置hanlder,ChannelInitializer需要配置一个管道的初始化器。这个初始化器初始化的是SocketChannel //为什么要为这个管道做初始化?SocketChannel已经接通了,状态已经监控了,该进行后续通过流水线用Handler对管道进行处理了。 //拿到每一个管道, serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override /* channel 接通 监控 accept rw 处理 通过流水线 用handler进行处理 。 */ protected void initChannel(NioSocketChannel ch) throws Exception { //ByteBuf 字节--->字符 //ch.pipeline().addLast(new StringDecoder()); //任何读取数据的Handler都叫做 ChannelInboundHandlerAdapter,他给我们提供了适配器设计模式,我们就直接使用了。 // ch.pipeline().addLast(new StringDecoder());这个Handler处理完之后,就会将数据传递给channelRead中的msg当中。 //基于适配器模式,提供给我们的自定义的Handler ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override //msg是将上部解码之后得到的,传递过来的。 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //可以在这里使用各种JSON库转JAva对象啦。 ByteBuf buffer = (ByteBuf) msg; String result = ((ByteBuf) msg).toString(Charset.defaultCharset()); System.out.println("result = " + result); } }); } }); serverBootstrap.bind(8000); } }
服务端日志:
2022-11-26 23:12:14.223 [nioEventLoopGroup-2-2] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@19a8d3a 2022-11-26 23:12:14.226 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x62734baf, L:/192.168.1.4:8000 - R:/192.168.1.4:57136] READ: 10B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f 20 73 75 6e 73 |hello suns | +--------+-------------------------------------------------+----------------+ result= hello suns 2022-11-26 23:12:14.227 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x62734baf, L:/192.168.1.4:8000 - R:/192.168.1.4:57136] READ COMPLETE
pipeline当中一定是有顺序的,不想用Netty提供的,在自定义Buffer当中去自己进行处理即可。
第三章:Netty关键组件
一:这是一个回顾
上节课代码进行回顾:
1:代码回顾
public class MyNettyServer { public static void main(String[] args) { //了解套路,细化内容,为了源码做准备。 //获取服务端 ServerBootstrap serverBootstrap = new ServerBootstrap(); //获取设置他的Channel serverBootstrap.channel(NioServerSocketChannel.class); //设置他的GROUP,我们吧有的监控对象Accept,write,read进行多线程的设计 //NioEventLoopGroup是一个管理单位。管理:EventLoop -- > 实际上就是Worker,对应一个线程 //他里边的每一个线程都有一个Selector serverBootstrap.group(new NioEventLoopGroup()); //当我们接收到数据,或者写数据之前,我们通过handler来进行处理。提供pipeLie管理多个Handler //后续Handler的使用是一个核心内容。与客户端通信是Handler解决的。 serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { //专门进行读写操作的channel,不负责简历链接。 @Override protected void initChannel(NioSocketChannel ch) throws Exception { //首先添加解码的操作。 ch.pipeline().addLast(new StringDecoder());//内置Handler //自定义开发的Handler,基于适配器模式。 //接收数据是inbound,写数据是outBound //当我们开发某一个Handler的时候,我们只需要关注某一个方法,这个时候适配器设计模式就很方便了。 ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //我们实现一个方法,read,这个时候,上边会把梳理好的数据给到我们,这里是一个典型的接口回调。 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //我们进行一个数据处理 System.out.println("msg = " + msg); } }); //我们还可以这样写 -- > ch.pipeline().addLast(xxx,xxx,xxx,xxx,..); ch.pipeline().addLast(new StringDecoder(),new ChannelInboundHandlerAdapter(){ //我们实现一个方法,read,这个时候,上边会把梳理好的数据给到我们,这里是一个典型的接口回调。 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //我们进行一个数据处理 System.out.println("msg = " + msg); } }); } }); serverBootstrap.bind(8000); } }
客户端代码:
public class MyNettyClient { public static void main(String[] args) throws InterruptedException { //他的这种编程模式,和传统的模式会有一些区别 //1:统一变成模型 /* 1:变成模型:变成的一套路,编程的一个模板 * 2:为什么统一变成模型?可以简化开发 * Netty来讲也是统一了变成模型,后续方便我们学习记忆 */ Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(new NioEventLoopGroup()); bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); } }); //connet方法使用了一个新的线程,来开辟连接, ChannelFuture future = bootstrap.connect(new InetSocketAddress(8000)); // 为什么要同步呢?阻塞等待 //我们只有在这里等待链接返回之后,我门才能够进行操作,所以这里必须阻塞一下,等着链接返回,这个过程是同步的。 //所以,这里必须阻塞一下。 //拿取链接的时候才是真正异步化操作。 future.sync(); Channel channel = future.channel(); channel.writeAndFlush("hello suns"); } }
前端控制台的输出日志:
"C:\Program Files\Java\jdk1.8.0_241\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2022.1.1\lib\idea_rt.jar=53955:C:\Program Files\JetBrains\IntelliJ IDEA 2022.1.1\bin" -Dfile.encoding=GBK -classpath "C:\Program Files\Java\jdk1.8.0_241\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_241\jre\lib\rt.jar;D:\code\study\netty-core-02\target\test-classes;C:\Users\Administrator\.m2\repository\io\netty\netty-all\4.1.45.Final\netty-all-4.1.45.Final.jar;C:\Users\Administrator\.m2\repository\junit\junit\4.13.2\junit-4.13.2.jar;C:\Users\Administrator\.m2\repository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.32\slf4j-api-1.7.32.jar;C:\Users\Administrator\.m2\repository\ch\qos\logback\logback-classic\1.2.9\logback-classic-1.2.9.jar;C:\Users\Administrator\.m2\repository\ch\qos\logback\logback-core\1.2.9\logback-core-1.2.9.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-webmvc\5.3.14\spring-webmvc-5.3.14.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-aop\5.3.14\spring-aop-5.3.14.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-beans\5.3.14\spring-beans-5.3.14.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-context\5.3.14\spring-context-5.3.14.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-core\5.3.14\spring-core-5.3.14.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-jcl\5.3.14\spring-jcl-5.3.14.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-expression\5.3.14\spring-expression-5.3.14.jar;C:\Users\Administrator\.m2\repository\org\springframework\spring-web\5.3.14\spring-web-5.3.14.jar" com.suns.Netty02Copy.MyNettyClient 2022-10-30 16:58:51.281 [main] DEBUG i.n.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework 2022-10-30 16:58:51.295 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16 2022-10-30 16:58:51.318 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024 2022-10-30 16:58:51.318 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096 2022-10-30 16:58:51.322 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false 2022-10-30 16:58:51.322 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512 2022-10-30 16:58:51.336 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows 2022-10-30 16:58:51.338 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false 2022-10-30 16:58:51.338 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8 2022-10-30 16:58:51.339 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available 2022-10-30 16:58:51.339 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available 2022-10-30 16:58:51.339 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available 2022-10-30 16:58:51.339 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available 2022-10-30 16:58:51.340 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true 2022-10-30 16:58:51.340 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9 2022-10-30 16:58:51.340 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available 2022-10-30 16:58:51.340 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available 2022-10-30 16:58:51.341 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\ADMINI~1\AppData\Local\Temp (java.io.tmpdir) 2022-10-30 16:58:51.341 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model) 2022-10-30 16:58:51.342 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3771203584 bytes 2022-10-30 16:58:51.342 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1 2022-10-30 16:58:51.342 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available 2022-10-30 16:58:51.342 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false 2022-10-30 16:58:51.348 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available 2022-10-30 16:58:51.683 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 4328 (auto-detected) 2022-10-30 16:58:51.687 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false 2022-10-30 16:58:51.687 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false 2022-10-30 16:58:51.874 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1) 2022-10-30 16:58:51.874 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200 2022-10-30 16:58:52.092 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 2c:6d:c1:ff:fe:77:07:9f (auto-detected) 2022-10-30 16:58:52.101 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple 2022-10-30 16:58:52.102 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true 2022-10-30 16:58:52.127 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023 2022-10-30 16:58:52.133 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled 2022-10-30 16:58:52.133 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0 2022-10-30 16:58:52.133 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384 2022-10-30 16:58:52.159 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096 2022-10-30 16:58:52.159 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2 2022-10-30 16:58:52.159 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16 2022-10-30 16:58:52.159 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8 2022-10-30 16:58:52.168 [nioEventLoopGroup-2-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true 2022-10-30 16:58:52.168 [nioEventLoopGroup-2-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true 2022-10-30 16:58:52.169 [nioEventLoopGroup-2-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@7833ea7f
2:回顾代码中的日志问题
我们并没有在项目中添加Logback的配置文件,那么为什么可以打印出来日志呢?
只要我们在POM文件中添加了门面和Logback的jar包,Netty会自动识别出来,基于默认的配置将日志i打印出来,当然我们可以通过添加的配置进行干预。
添加一个配置文件。
<?xml version="1.0" encoding="UTF-8"?> <configuration> <!-- 控制台输出 --> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符--> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> </encoder> </appender> <!-- <logger name="com.suns" level="DEBUG" additivity="false"> <appender-ref ref="STDOUT"/> </logger>--> <root level="DEBUG"> <appender-ref ref="STDOUT"/> </root> </configuration>
通过这个外部的配置文件对整个项目的日志配置进行干预,从外部覆盖Netty配置的。
通过配置Logger标签控制日志数据,name控制的是哪个包下的日志按照此控制输出,level日志级别,additvity是否向上传导向上不向上,false只关注本块内容,true的话,把root的也带上。
二:EventLoop
1:EventLoop关系映射
EventLoop: 1:时间(Accept READ WRITE) 循环(死循环 ) 2:曾经讲过的worker 特点一:一定是一个独立的线程,在线程的run()方法中执行操作 特点二:通过死循环 监控状态进行操作 while(true){ selector.select SelectedKes进行遍历 } 特点三:Select一定会被封装到里边,所以我们可以把它看作worker即可。
2:NioEventLoop继承关系
如何快速了解一个类是做什么的?
1:查看这个类的注释
2:查看这个类的继承关系
如何快速查看一个类的继承关系
查看一个类继承关系的快捷键(打开他的UML结构的快捷键):光标在这个类上+Ctrl+Shift+Alt+U,打开之后,最下边的就是这个类了,右键使用Copy Diagram to Cliapboard即可以图片的形式粘贴出来
继承关系说明:
1:Executor:代表这是一个线程池
2:SingleThreadEventExecutor:单个线程的线程池,在线程池当中只会创建一个线程,咱们Worker那个类也是封装了一个线程,本质上来讲,跟这个是一样的。
3:EventLoop:UML图中选中这个类,F4可以查看代码,是一个接口注册后处理Channel所有的IO操作,一个EventLoop实例通常会处理多个Channel,单着可能取决于具体的实现和内部结构
从上述两个继承的父类,就可以知道咱们这个子类的功能是什么。
3:NioEventLoop源码初析
既然是一个线程一定是有Run方法,在run方法执行死循环操作,Alt+7找到Run方法
@Override protected void run() { int selectCnt = 0; for (;;) { ....... } }
run方法当中确实有死循环。从代码的角度剖析就可以知道现在的EventLoop就等同于咱们的Worker,当然也等同于咱们的Boss
Woker线程:select —> READ WRITE
Boss线程 :select —> Accept
这就呼应了,为什么我们在编写代码层面上只提供了一个组。因为组里边都是这些EventLoop就可以完成Boss和Workder的活,赶上谁就干谁的活。
ServerBootstrap serverBootstrap = new ServerBootstrap(); //获取设置他的Channel serverBootstrap.channel(NioServerSocketChannel.class); //设置他的GROUP,我们吧有的监控对象Accept,write,read进行多线程的设计 //NioEventLoopGroup是一个管理单位。管理:EventLoop -- > 实际上就是Worker,对应一个线程 //他里边的每一个线程都有一个Selector serverBootstrap.group(new NioEventLoopGroup());
1):获取NioEventLoop
基于构造方法可以吗?不可以!
default修饰的构造方法,包内进行访问(public允许外部使用,default包内访问)
如何创建NioEventLoop对象?
1:不会以构造法方法的形式让程序员创建。
2:通过EventLoopGroup创建
3:EventLoopGroup这个东西才是Netty使用者在编程过程中真正使用的东西
3:EventLoopGroup
使用Netty编程时的API接口,EventLoopGroup来创建和管理EventLoop,他是EventLoop的工厂
创建EventLoopGroup(一个线程的线程池,就是一个线程),多个EventLoop(多个线程)
管理EventLoopGroup
1)EventLoopGroup使用
package com.suns.netty02; import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestEventLoopGroup { private static final Logger log = LoggerFactory.getLogger(TestEventLoopGroup.class); public static void main(String[] args) { //创建多个EventLoop(线程),并存起来 //1 通过构造方法 可以指定创建EventLoop的个数(线程个数) //2 默认无参构造 内置创建EventLoop 无参构造,创建多少个EventLoop // 默认创建 (cpu核数 x 2) /* EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2); //拿到一个个EventLoop进行工作。next拿到的是下一个。 EventLoop el1 = eventLoopGroup.next(); EventLoop el2 = eventLoopGroup.next(); EventLoop el3 = eventLoopGroup.next(); System.out.println("el1 = " + el1); System.out.println("el2 = " + el2); System.out.println("el3 = " + el3); */ } }
代码做了点啥
serverBootstrap.group(new NioEventLoopGroup());
Netty内部进行一次又一次的Next方法获取我们的EventLoop来进行时间监听。坚听到之后,调用我们的Handler来进行后续的操作。
4:DefaultEventLoop继承关系
与NioEventLoop一样
4:DefaultEventLoop和NioEventLoop区别
1):run方法区别
DefaultEventLoop方法非常简单,就是在死循环运行Task,就等同于一个空的线程,这个线程的内容是有我们开发来进行设置的。
NioEventLoop的run方法是Selector.select监控IO专题,SelectionKeys各种状态,去各式各样的操作。
后续开发,如果想使用多线程如果是IO监控,应该用NioEventLoop,如果用的是普通的多线程代码那么就是DefaultEventLoop,如果涉及到Selector设计到状态监控,那么就应该用NioEventLoop
区别总结:
1:NIO 是一个线程,IO Write Read 事件监控
2:DefaultEventLoop 就是一个普通的线程,工作内容可以由程序员决定,不做IO监控
注意事项:后续Netty进行多线程开发,推荐大家优先考虑DefaultEventLoop
5:DefaultEventLoop使用
EventLoopGroup defaultEventLoopGroups = new DefaultEventLoopGroup(); EventLoop defaultEventLoop = defaultEventLoopGroups.next(); //submit就是我们要干的事。 defaultEventLoop.submit(() -> { log.debug("hello"); });
2022-10-30 23:52:24.960 [defaultEventLoopGroup-2-1] DEBUG com.suns.netty02.TestEventLoopGroup - hello
从日志中可以看到,确实不是main线程。
6:注意事项
1.EventLoop是会绑定Channel,当客户端请求到达一个EventLoop之后, 后续所有的交互都会由他完成,不会在交由其他的EventLoop完成,所以肯定就绑定了Channel,客户端访问了服务端与任何一个EventLoop建立联系之后,后续所有操作都会对应这个EventLoop。
2.EventLoop支持多个Channel访问,也就是多个客户端对一个EventLoop
3.服务端进行EventLoop的分工,主从式的Reactor模式(设计模式可以用Nio、Netty…实现,我们注重的是思想),Reactor模式当中只能有一个Boss(正在工作的只能有一个),也有多个Worker,我们可以如下进行切分。
服务端代码:
package com.suns.netty02; public class MyNettyServer { private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class); public static void main(String[] args) { //主 accept 1 EventLoopGroup bossEventGroup = new NioEventLoopGroup(1); //从 worker IO 多个 EventLoopGroup workerEventGroup = new NioEventLoopGroup(3); //获得DefaultEventLoop DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); //select accept write read 多线程的设置 //线程池 EventLoop --> worker //serverBootstrap.group(new NioEventLoopGroup()); //这行是关键........ serverBootstrap.group(bossEventGroup, workerEventGroup); //接受到 写数据前 //handler进行处理。piplline--handler响应处理 //handler使用 核心内容 serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override //read write channel // channel ---> accept 建立连接的操作 protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(defaultEventLoopGroup, new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("{}", msg); } }); } }); serverBootstrap.bind(8000); } }
客户端代码:
public class MyNettyClient { private static final Logger log = LoggerFactory.getLogger(MyNettyClient.class); public static void main(String[] args) throws InterruptedException { log.debug("myNettyClientStarter------"); Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(new NioEventLoopGroup()); bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); } }); //connect使用了一个新的线程,用于连接... ChannelFuture future = bootstrap.connect(new InetSocketAddress(8000)); //为什么要做同步,阻塞。 future.sync(); Channel channel = future.channel(); channel.writeAndFlush("hello suns"); //System.out.println("------------------------------"); } }
4:客户端请求,对于worker来讲操作非常费时,影响新的客户端使用这个worker,吞吐量减少了。只做接入,费时操作交给其他的线程。提高吞吐量。这在Netty体系来讲,这叫异步化的。也就是将非IO的操作转移到其他线程当中。腾出来这个应用线程资源去提升应用吞吐量。
A:开辟新的线程执行费IO操作
DefaultEventLoop
B:怎么获得非IO操作(如何获取数据呢?)
Netty当中获取数据是通过Handler获取的。通过新线程DefaultEventLoop处理Handler的数据
核心API:
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
服务端代码编写:
public class MyNettyServer { private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class); public static void main(String[] args) { //主 accept 1 EventLoopGroup bossEventGroup = new NioEventLoopGroup(1); //从 worker IO 多个 EventLoopGroup workerEventGroup = new NioEventLoopGroup(3); //获得DefaultEventLoop DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); //select accept write read 多线程的设置 //线程池 EventLoop --> worker //serverBootstrap.group(new NioEventLoopGroup()); serverBootstrap.group(bossEventGroup, workerEventGroup); //接受到 写数据前 //handler进行处理。piplline--handler响应处理 //handler使用 核心内容 serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override //read write channel // channel ---> accept 建立连接的操作 protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); //通过新线程DefaultEventLoop处理Handler的数据 ch.pipeline().addLast(defaultEventLoopGroup, new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("{}", msg); } }); } }); serverBootstrap.bind(8000); } }
运行结果当中会有三个线程:
解码和监听读写有啥区别?
上节课回顾
1:EventLoop是包含一个线程的线程池,集成了Selector负责IO操作,监控Wirte和Read动作。
2:EventLoopGroup是EventLoop的创建和管理工厂,默认创建的EventLoop是Cpu的核心 x 2我们也可以自定义ELG的创建EventLoop的创建个数
今天依旧是细化的工作,今天将的是异步的概念,
三:异步
异步这个概念的理解:Netty中理解异步概念十分重要,异步就代表了多线程,Netty通过异步帮我们做了什么事情?
之前的客户端的代码:
public class MyNettyClient { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(new NioEventLoopGroup()); bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); } }); //connet方法使用了一个新的线程,来开辟连接, ChannelFuture future = bootstrap.connect(new InetSocketAddress(8000)); // 为什么要同步呢?阻塞等待 //我们只有在这里等待链接返回之后,我门才能够进行操作,所以这里必须阻塞一下,等着链接返回,这个过程是同步的。 //所以,这里必须阻塞一下。 future.sync(); Channel channel = future.channel(); channel.writeAndFlush("hello suns"); } }
我们在这里要分析和学习的是这样代码:
ChannelFuture future = bootstrap.connect(new InetSocketAddress(8000)); future.sync();
这行代码做的事是:这个操作开启了新的线程,之后,在这里阻塞了。这行嗲吗要跟服务器进行链接。Netty在这里采用了异步阻塞的形式来获取链接,这行代码会开启新的线程来进行去获取连接。我们把这行阻塞的代码注释掉之后
我们的程序是自上而下运行的,他的程序运行在主线程里边,之后,到了咱们研究的这行代码,这行代码获取连接是异步的操作,这样代码对应到了一个新的线程的开启,通过一个新的线程与服务器端的连接,这个线程NioEventLoopGroup-2-1做的这个事,做完会后,这个代码如果在不阻塞处理下,这个线程会继续往下走,我们的Chnnel连接操作没有完成,可能刚刚启动,哈没有处理完获取连接的操作,这个时候,获取到的Channel是一个没有意义的channel,这个时候,我们去发送数据就发不出去数据。大概率是我们的主线程走的最快,导致我们的数据没发送出去。
解决的方式,就是我们在这里添加阻塞(阻塞我们的主线程),当我们在这里获取到实际的连接对象之后,也就是上一步的连接真正建立起来之后,运行后续的代码。
新的线程:异步进行网络连接
阻塞当前线程:等待上一步连接真正的简历起来,运行后续的功能。
我觉的在这里没意义,这样不和同步一样吗?
同步的这个建立连接的过程都是线程都是非常耗时的,主线程做或者是分出来一个线程来做不都是耗时嘛,所以这么做有意义吗?这里仿佛没有意义。但是Netty采用的还是这样做的,后续,我们会分析这样做的好处。
异步和多线程之间是什么关系?以及有什么区别?
需要明确的是:
1:异步也是多线程编程
2:所谓的异步编程和我们的多线程编程在应用过程中也是有一些区别的
多线程编程:JavaWeb中多个客户端对应的后台线程是平等的关系。
异步多线程:异步多线程来讲,他有一个线程是主要的,除却主要的部分,有复杂的和耗时的操作交给另一个线程来做,这两个线程配合的去完成一些列的操作。而且往往这个主线程还需要这个辅助线程的将结果交给他。
咱们的上述操作就是异步操作。辅助线程完事后还需要返回处理结果给主线程。
除了咱们这种Sync这种异步阻塞的方式还有没有其他的异步,获取辅助线程处理结果的方式呢?
public class MyNettyClient { private static final Logger log2 = LoggerFactory.getLogger(MyNettyClient.class); private static final Logger log1 = LoggerFactory.getLogger(MyNettyClient.class); private static final Logger log = LoggerFactory.getLogger(MyNettyClient.class); public static void main(String[] args) throws InterruptedException { log.debug("myNettyClientStarter------"); Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(new NioEventLoopGroup()); bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); } }); //异步 阻塞方式进行 与服务器端连接 ChannelFuture future = bootstrap.connect(new InetSocketAddress(8000)); //future.sync(); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { log1.debug(" add Listerner ....."); Channel channel = future.channel(); channel.writeAndFlush("hello suns"); } }); System.out.println("----------------------------------"); } }
我们通过的是异步回调的方式来处理后续的问题,我们这样怎么能实现后续操作的呢?
建立连接的时候会去创建一个新的线程会获取连接,拿到连接之后,主线程当中完成一个addLister(加监听的操作)这样的操作。新的线程获取到连接之后,在进行一个operationComplete的回调。
这样将获取连接和使用连接进行操作放到一个线程当中,就能保证同步行,这里边使用的是观察者模式。
总结:
1:阻塞主线程 完成异步操作的配合 future.sync(); 2:异步处理 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { log1.debug(" add Listerner ....."); Channel channel = future.channel(); channel.writeAndFlush("hello suns"); } });
只要是异步的操作,只能通过上述的一种来进行处理,后续还有那些操作是异步的呢?这个是Netty当中自己设计的。在Netty当中只要设计到网络或者IO的操作那么Netty都会设计到异步处理?
channel.writeAndFlush()这一步操作也是异步完成的。//所以,如果后续有关联性操作的话,这里也应该sync一下。 channel.connect() channel.close();
所以,这两种都需要使用以上的两种方式来进行处理,把耗时操作交给其他线程来做,可以让当前线程效率更高一点。更能释放主线程一点。所以我们更推荐第二种。
Netty为什么把网络的、IO的做成异步的?
1:提高系统的吞吐量
2:效率上的提高 但是决不是1+1 = 2的,可能是1+1 = 1.5
什么叫做提高吞吐量呢?或者为什么就提高吞吐量了呢?
Reactor模式的主从版的架构:
正常来讲是我们客户端发请求,找到Worker来处理,最种返回给客户端,如果这次请求客户端的操作十分耗时的话,这样我们Worker可以服务客户端的能力就降低了。
但是现在我们客户端请求过来之后,我们Worker线程进行处理,将耗时操作交给其他的线程进行处理。这样Worker可以复用更多的人了,所以异步化最大的有点就是提高了吞吐量。
Netty是异步的么?
开篇就讲过Netty是异步的,很多操作都是异步的。所有涉及到网络和Io的相关操作都会进行异步处理。对于这些操作我们需要执行两种操作,要么主线程阻塞,要么新线程执行回调。
1:Netty中的异步设计(内部原理)
Netty使用了JDK当中完成了对于异步操作的设计(Future) Netty当中也提供了一个Future,再次基础之上,Netty当中又引入了一个新的技术Promise
我们编程中用的最多的就是ChannelFuture,当我们知道一个操作如果返回值是这个,那么就意味着这个操作是异步化的。
在Netty的内部来讲,他遵循了一个体系,首先集成了JDK的Future,在此基础上又对Promise做了拓展。在后续的Netty实践中Netty用的最多的是这个Promise因为这个是最下层,他的功能是最全的,是最强大的。所以Netty实践中使用的最多的就是Promise,很多时候,变量都是基于ChannelFuture进行声明,但是实际上赋值的都是Promise,Promise的功能是Netty异步化封装过程中最强大的,他是整个继承链当中的最下层,所以他的用的最多。
JDK Future Netty Future Netty Promise
2:JDK Future
JDK的Future往往会伴随线程池使用,什么时候考虑使用JDK的Future呢?就是完成异步化处理,而且Futrue将处理完的记过返回给主线程,让主线程进行分析使用。我们在submit当中提交一个Cabble接口的实现方法,而不要去实现Runnable因为这个没有返回值,这个只能由Callable才有返回值,里边的泛型是返回值的类型。
通过Future当中的get方法获取返回值,在主线程或者调用这线程当中处理返回值。JDK Future处理过程中 异步操作的处理 只能通过阻塞的方式完成。
开启一个线程执行异步操作的时候,新线程极有可能先执行完,所以我们的future.get()是一个阻塞的操作。只要是异步处理一定会伴随着阻塞(这个得加上一个获取异步化处理结果的操作,这样才一定是阻塞的)。
public class TestJDKFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { /* JDK什么情况下考虑使用Future 异步化的工作 Future 启动一个新的线程进行处理。最后把处理结果返回给主线程(调用者线程) JDK Future处理过程中 异步操作的处理 只能通过阻塞的方式完成。 */ ExecutorService executorService = Executors.newFixedThreadPool(2); Future<Integer> future = executorService.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("异步的工作处理..."); TimeUnit.SECONDS.sleep(10);//单位能直接设计成秒 return 10; } }); log.debug("可以处理结果了..."); log.debug("处理结果 {}",future.get());//阻塞的操作.. log.debug("------------------------------------"); } }
执行结果:
2022-10-31 22:23:52.831 [main] DEBUG com.suns.netty03.TestJDKFuture - 可以处理结果了... 2022-10-31 22:23:52.831 [pool-1-thread-1] DEBUG com.suns.netty03.TestJDKFuture - 异步的工作处理... 2022-10-31 22:24:02.846 [main] DEBUG com.suns.netty03.TestJDKFuture - 处理结果 10 2022-10-31 22:24:02.849 [main] DEBUG com.suns.netty03.TestJDKFuture - ------------------------------------
以上的流程是JDK为我们提供的最为原始化的异步处理。Netty中的sync和JDK中的Future中的get都是阻塞的。但是在JDK的这个处理过程当中,这个异步操作的处理只能通过阻塞的方式完成。那个基于回调的形式,JDK的Future帮我们完成不了。
3:Netty Future
Netty当中的拓展之后的Future不仅仅支持阻塞的异步处理,还支持异步监听处理。这个就是他比原始Future强大的地方。
我们JDK的Future案例当中我们使用了JDK的内置线程池,但是在我们Netty的案例当中,我们无需如此因为Netty对于线程池做了很好的封装。就是我们的EventLoopGroup,当前我们仅仅是简单使用没有必要使用NioEventLoop,我们使用DefaultEventLoop即可。
我们获取到其中的一个线程之后(单线程的线程池),我们使用这个对象提交一个异步操作,异步操作有结果回传,回传对象是一个Future对象(io.netty.util.concurrent.Future)使用方式和JDK的基本一致,都是异步处理之后,获取异步线程的处理结果基于返回值。
基于Netty的Future进行的阻塞异步化的异步处理:(这里孙哥老说:阻塞同步处理,其实也没毛病,因为实际效果确实同步)
get方法是阻塞的
public class TestNettyFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { // EventLoopGroup 使用Netty Future---> EventLoopGroup -- NioEventLoopGroup--> selector 事件的监听 // 异步工作的处理 ,启动一个新的线程 完成操作 DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup(2); EventLoop eventLoop = defaultEventLoopGroup.next(); //如何证明 10 主线程 获得结果. 但是不能证明 这个结果是正常的还是失败呢 // 异步处理的2个问题 // runable接口 ---》 主线程(调用者线程)返回结果 // callable接口 ---》 返回值 也不能准确的表达 结果 成功 还是 失败了。 //io.netty.util.concurrent.Future Future<Integer> future = eventLoop.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { try { log.debug("异步操作处理..."); TimeUnit.SECONDS.sleep(10); return 10; } catch (InterruptedException e) { e.printStackTrace(); return -1; } } }); //log.debug("可以接受异步处理"); //同步阻塞 处理 log.debug("异步处理的结果...{} ",future.get()); } }
基于Netty的Future监听的方式来实现异步处理:
public class TestNettyFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { // EventLoopGroup 使用Netty Future---> EventLoopGroup -- NioEventLoopGroup--> selector 事件的监听 // 异步工作的处理 ,启动一个新的线程 完成操作 DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup(2); EventLoop eventLoop = defaultEventLoopGroup.next(); //如何证明 10 主线程 获得结果. 但是不能证明 这个结果是正常的还是失败呢 // 异步处理的2个问题 // runable接口 ---》 主线程(调用者线程)返回结果 // callable接口 ---》 返回值 也不能准确的表达 结果 成功 还是 失败了。 //io.netty.util.concurrent.Future Future<Integer> future = eventLoop.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { try { log.debug("异步操作处理..."); TimeUnit.SECONDS.sleep(10); return 10; } catch (InterruptedException e) { e.printStackTrace(); return -1; } } }); //log.debug("可以接受异步处理"); //同步阻塞 处理 //log.debug("异步处理的结果...{} ",future.get()); //log.debug("--------------------------------------------"); future.addListener(new GenericFutureListener<Future<? super Integer>>() { @Override public void operationComplete(Future<? super Integer> future) throws Exception { log.debug("异步处理的结果...{} ",future.get()); } }); log.debug("---------------------------------------"); } }
异步监听的这中形式更加的底层。这种方式并没有阻塞我们主线程,这段代码是我们客户端代码的一个底层。客户端代码就是这么实现的。
当Netty拓展了JDK的Future之后,Netty的Future既可以通过同步阻塞的方式处理异步结果,也可以通过监听的这种并不需要阻塞主线程的方式去处理异步结果。我们现在将的这个Future是Netty异步处理的底层。
到这我们就完成了Netty的Future的学习,他已经完成了两种处理方式集成。原始的Future只能通过同步阻塞的这种方式去处理异步结果。
4:Netty Promise
Promise继承了Future接口,获取了他的所有的能力,天然便能具有两种方式处理异步结果的功能,他再次基础之上,又做了新的扩充。
1:增强了获取异步处理结果的功能 – > runnable接口的线程能够获取到结果
在JDK和Netty当中,他们都是通过get方法获取call方法的返回值来获取的,这种方法好不好呢?通过callable接口实现的call方法获取了异步的返回的结果有如下几个问题:没有callable编写异步,那就没结果可获取了。如果Runnable接口我就想让他返回返回值,该怎么办呢?我们在Promise当中实现了这个功能。Callable和Runnable都能获取返回值,这个就是Promise的拓展核心。
2:返回值 不能精确的表达,子线程是否是成功执行还是执行失败了。
Netty当中使用了Promise来解决的这个返回结果的封装。承诺到底是失败了还是成功了。
这个玩意在Runnable接口这种没有返回值的情况下都能搞出来返回值。
Promise在获取异步化的操作过程中的编程:
EventLoop eventLoop = new DefaultEventLoop().next(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
我们的Promise对象在Runnable接口的异步线程当中获取返回值结果:
public class TestPromise { public static void main(String[] args) throws ExecutionException, InterruptedException { EventLoop eventLoop = new DefaultEventLoop().next(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop); new Thread(()->{ log.debug("异步处理...."); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { throw new RuntimeException(e); } promise.setSuccess(10); }).start(); /*log.debug("等待异步处理的结果"); log.debug("结果是 {}",promise.get()); log.debug("----------------------");*/ promise.addListener(new GenericFutureListener<Future<? super Integer>>() { @Override public void operationComplete(Future<? super Integer> future) throws Exception { log.debug("等待异步处理的结果"); log.debug("结果是 {}",promise.get()); } }); log.debug("---------------------------------"); } }
上述代码采用的是线程回调和同步阻塞的方式来获取异步线程执行的结果。而且这个异步线程还是使用Runable接口来实现的。并且这个结果能标识他的返回结果的一个增强。
执行结果:
2022-10-31 23:22:41.828 [Thread-0] DEBUG com.suns.netty03.TestPromise - 异步处理.... 2022-10-31 23:22:41.830 [main] DEBUG com.suns.netty03.TestPromise - --------------------------------- 2022-10-31 23:22:51.847 [defaultEventLoop-1-1] DEBUG com.suns.netty03.TestPromise - 等待异步处理的结果 2022-10-31 23:22:51.847 [defaultEventLoop-1-1] DEBUG com.suns.netty03.TestPromise - 结果是 10
处理结果分析:
为什么横线能在最前边打印?
因为他是同步的,在主线程当中,大概率先执行。
以上代码才是真正的Netty底层处理异步操作的核心,他就是借用了Promise的这种方式。完成了相应的处理。
客户端获取连接也是采用Promise的方式获取了连接。Promise不仅仅帮我们获取到了连接,还告诉了我们这个操作是成功的。
咱们后续编程基本用不到Promise,因为Netty内部大量用的是Promise,所以我们必须研究。但是后续我们编程使用的是Handler,这是我们以后工作的重心。
总结:
IO /网络 /Socket Netty设计的都是异步
异步的处理有两种:同步阻塞 or 异步回调,通过Promise做的。封装了上述两种功能,并对返回结果做了,增强。
异步操作
1:什么是异步操作,和多线程的关系
2:Netty的异步操作是如何体现的?IO 操作和Socket都设计成了异步操作,可以提高系统吞吐量
3:Netty处理异步的返回?sync和addLister
4:Future --> Future (sync,addListener) --> promise
我们在这里加监听,是为了我这个异步线程监听,但是,会不会有这种情况,异步线程先执行,后运行了主线程当中添加监听器的这个操作,这个异步线程已经完成了,之后我们的主线程才执行添加监听的操作。这不就晚了吗,还能起到对应的效果吗?
这种情况是可能发生的,但是我们的结论是,即使我们的异步线程先与我们的添加监听的操作运行完了,我们在之后再为其添加监听的操作,这样是可以的是没有问题的。Netty已经帮我们处理的非常好了,addListener源码中,查看Promise这个实现类(debug断点到这查看下类型,F5执行快捷键计算),我们找到DefaultLiatener当中的listener里边有两行很重要的代码:
@Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); //加监听器,监听异步处理,在异步线程处理完成之后,触发特定函数operationComplete synchronized (this) { addListener0(listener); } //异步线程如果执行完成了 isDone返回我们的TRUE。唤起我们的监听器把我们的操作走一遍。 if (isDone()) { notifyListeners(); } return this; }
四: Channel
1:Channel回顾
ServerSocketChannel SocketChannel这两个Channel都是JDK为我们提供的,Netty将原有中的JDK中的Channel进行了统一的封装。
封装的概念,或者说为什么要进行封装?
1:对原有功能进行增强,Netty对于Channel结合自身框架特点,对Channel进行了增强
2:屏蔽差异化,
Netty去封装Channel主要有两个原因:
1:统一了编程接口,统一了编程模型。通过他的封装不在让客户区分SocketChannel和ServerSocketChannel,统一编程模型这个事在框架设计中十分的重要。如果提供一个新的功能就是一个新的套路,这个框架用起来没有任何生命力,包括版本向下的兼容都是统一编程模型。这个谁在整个社区当中做的最好?Spring,另外Spring也统一了变成模型,我们如果对一个Bean进行加工的话我们的用BeanPostProcessor来去做加工,作为Spring自己辣酱,他自己做Aop的时候,直接也用这个,这就是变形模型的统一。Netty当中也统一了编程模型,这样在客户端和服务器端都是通过一样的变成方式来讲我们的Channel添加进去。这就叫做统一的编程模型。
2:Netty一旦封装了Channel之后,可以更好的和Netty框架结合起来,比如他自己的IO模型,比如他自己的PipeLine可以非常好的进行整合,甚至可以进行配置一些Channel的Tcp的参数,包括缓冲区和滑动窗口,这些在原有的两个Channel是没有办法设置的。这就是封装了的好处。
2:Channel提供的API
1):writeAndFlush和write
1:Netty封装了Channel之后,提供了那些API(方法)
channel.writeAndFlush() //写出去之后,会刷新我们的缓冲区,数据写出去了。 channel.write() //调用完之后不会立即发出去,会存在缓冲区当中,等到我们手动flush之后
代码如下:
客户端代码:
public class MyNettyClient { private static final Logger log = LoggerFactory.getLogger(MyNettyClient.class); public static void main(String[] args) throws InterruptedException { log.debug("myNettyClientStarter------"); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); Bootstrap group = bootstrap.group(eventLoopGroup);//32 ---> 1 IO操作 31线程 bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new StringEncoder()); } }); Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel(); channel.write("xiaohei"); } }
服务端代码:
public class MyNettyServer { private static final Logger log2 = LoggerFactory.getLogger(MyNettyServer.class); private static final Logger log1 = LoggerFactory.getLogger(MyNettyServer.class); private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class); public static void main(String[] args) { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(new NioEventLoopGroup()); serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override // protected void initChannel(NioSocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("handler1", new ChannelInboundHandlerAdapter() { @Override //ByteBuf //自己开发了一个StringDecoder public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log1.debug("handler1"); ByteBuf buf = (ByteBuf)msg; String s = buf.toString(Charset.defaultCharset()); System.out.println(s); //super.channelRead(ctx, s); ctx.fireChannelRead(s); } }); pipeline.addLast("handler2", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log1.debug("handler2 ctx ... {} ",msg); super.channelRead(ctx, msg); } }); pipeline.addLast("handler3", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log1.debug("handler3"); //super.channelRead(ctx, msg); } }); } }); serverBootstrap.bind(8000); } }
测试时发生的一个非常有意义的事情分析?
当我们在某一处打断点,并且讲所有的断点的阻塞设置为All的时候,我们主线程位置上某一处断点出进行吧停顿,也就是阻塞状态,然后我们拿到Channel对象之后,我们执行channel.wirte(),经过多伦的write之后,我们最后在expression当中我们执行flush,数据理论上是服务端可以收到的,但是实际上没有收到,这是为什么?因为我们设置了线程阻塞是基于All的,这样我们主线程阻塞了,我们的write和flush这些操作都是异步处理的,换句话说在Netty当中都会重新开启一个线程去进行处理,这样我们阻塞这主线程,异步处理线程也会处于阻塞状态当中,所以,这些操作服务端根本就执行不了。
我们从这里获取的教训,调试Netty的时候,我们一定要基于Thread的阻塞方式。另外我们的write方法确实将数据写入到了缓冲区,然后flush之后才会将数据写出。
2):close()
作用:使用完了,就会将Channel的资源释放掉。释放socket资源。
客户端代码:
public static void main(String[] args) throws InterruptedException { log.debug("myNettyClientStarter------"); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); Bootstrap group = bootstrap.group(eventLoopGroup);//32 ---> 1 IO操作 31线程 bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new StringEncoder()); } }); ChannelFuture future = bootstrap.connect(new InetSocketAddress(8000)); future.sync(); Channel channel = future.channel(); //channel.writeAndFlush("xiaohei").sync(); channel.writeAndFlush("xiaohei"); //System.out.println("-----------------------------"); channel.close(); System.out.println("---------关闭执行之后的一些操作-------"); }
服务端代码同上:略
我们往往需要在Close方法执行完毕之后,会有其他的资源的释放,做其他的事。都得有一个前提,必须得在close方法执行之后,那么这个channel的关闭在主线程关闭可以不可以。
我们知道,Netty当中网络IO这样的操作,Netty都会进行异步的处理,我们Close的操作也是网络相关的操作,这样也必须进行异步的操作。如果异步化操作之后,这样就启动了一个新的线程,然后后边的代码是在主线程当中完成的,这样Channel还没操作呢,后边操作就执行了,这样是不允许的。
关CHannel这件事是一件很简单的事,但是关闭完成之后,其他的操作需要动点脑筋了:同步阻塞或者异步回调。
public static void main(String[] args) throws InterruptedException { log.debug("myNettyClientStarter------"); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); Bootstrap group = bootstrap.group(eventLoopGroup);//32 ---> 1 IO操作 31线程 bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { //这个hander可以监控Socket所做的事。 ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new StringEncoder()); } }); ChannelFuture future = bootstrap.connect(new InetSocketAddress(8000)); future.sync(); Channel channel = future.channel(); //channel.writeAndFlush("xiaohei").sync(); channel.writeAndFlush("xiaohei"); //System.out.println("-----------------------------"); ChannelFuture close = channel.close();//异步化操作 启动一个新的线程 //其他资源的释放,其他事,close()方法执行完之后,运行后面这些代码 //main主线程完成 //close.sync(); close.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { log.debug("channel.close()执行完后,操作后续的一些工作...");//不行 } }); //优雅的关闭。 //结束当前的client //所有的线程都结束处理之后,才会关闭client //32 1 channel操作 31 //linux kill -9 eventLoopGroup.shutdownGracefully(); }
我们使用 控制台选中某一行日志的一部分,邮件grep可以进行筛出来。日志截图如下:
从这里看到Channel关闭之后,阻塞才解除,然后main线程在执行后续操作。
我们使用异步回调也是同样的效果:给这个异步线程加异步回调,这样一定能保证是在子线程当中执行完close方法之后,会回到这个回调方法。这样在子线程当中完成了一个顺序性操作的保证。
我们可以清楚的在两种方式中,操作是在哪个线程当中完成是不一样的。具体看截图就好。
另外,当一个方法的返回值是xxxFuture的时候,这就意味着这一定是一个异步的操作。
以后我们如果必须要在writeAndFlush之后来执行一个什么什么操作的话,我们可以借鉴这两种方式。这是我们的正常操作。
Close就是当我们现在已经没有必要跟服务端进行通讯了,我们就必须执行一个close方法,关闭这个通道。释放Socket资源。当我们关闭之后,只是意味着我们的客户端不能和我们的服务端进行通讯了,我们的客户端还是再跑,这是因为我们后台还有线程在运行。我们可以通过关闭Debug窗口中的红灯的方式来关闭彻底关闭这个客户端,
为什么我们的客户端不像之前的main函数那样运行完代码之后就直接程序运行结束了,那是因为在Netty的设计当中他会考虑到这样的一个因素,在整个Netty当中维护了一个个线程池,当我们的线程池分配给我们一个线程来进行IO操作,这个时候还有会31个线程(16核),这些线程资源有可能做一些其他的操作,所以这是程序还在运行的原因。如果我确实想关闭我们的客户端怎么办
2):shutdownGracefully
优雅关闭。等eventLoopGroup 当中所有的线程都处理完成之后才会关闭整个的客户端,并且保证所有的线程都结束操作。
优雅就是所有的事我都能兼顾的到。
Linux当中关闭进程的命令:kill -9 进程号,idea当中的关闭就是优雅的,不然idea关闭了,但是Tomcat当中的实例海活着,这样在启动的时候不就占用了端口了。
五: Handler
Handler对于我们使用Netty进行开发的人员是最重要的
1:什么叫做Handler
当我们创建了服务器,设置了SSC之后,设置了我们的NIOEventLoopGroup之后,就通过这里边提供的线程为拿到了一个Boss和Worker来接收客户端的链接和读写,服务端就跑起来了。客户端一点进行了连接之后,后续就需要将工作交给Handler来处理了,实际上当我们的客户端连上我么你的服务端之后,实际上后面要进行IO操作,IO操作是有Handler来操作,Hadndler是一组,基于一种各司其职的思想,不论是客户端还是服务端,数据流过来之后,我们经过一系列的数据的处理,我们就可以进行后续的业务的开发。
我们的一系列Handler基于PipeLine整合在一起,PipeLine是他的管理者
作为我们的Handler来讲他是有方向的:读入数据的Handler和写出数据的Handler
读入数据的Handler:ChannelInboundHandlerAdapter都是他的子类
写出数据的Handler:ChannelOutboundHandlerAdapter我们使用他的子类
2:读数据多个Handler配合使用
package com.suns.netty04; public class MyNettyServer { private static final Logger log1 = LoggerFactory.getLogger(MyNettyServer.class); private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class); public static void main(String[] args) { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(new NioEventLoopGroup()); serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override // protected void initChannel(NioSocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("handler1", new ChannelInboundHandlerAdapter() { @Override //ByteBuf //自己开发了一个StringDecoder public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("handler1"); ByteBuf buf = (ByteBuf) msg; String s = buf.toString(Charset.defaultCharset()); System.out.println(s); //super.channelRead(ctx, s); ctx.fireChannelRead(s); } }); pipeline.addLast("handler2", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("handler2 ctx ... {} ", msg); super.channelRead(ctx, msg); } }); pipeline.addLast("hadler4", new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log1.debug("handler4"); super.write(ctx, msg, promise); } }); pipeline.addLast("hadler5", new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log1.debug("handler5"); super.write(ctx, msg, promise); } }); pipeline.addLast("handler3", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("handler3"); //super.channelRead(ctx, msg); //ch.writeAndFlush("hello suns"); ctx.writeAndFlush("hello xiaojr"); } }); pipeline.addLast("hadler6", new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log1.debug("handler6"); super.write(ctx, msg, promise); } }); } }); serverBootstrap.bind(8000); } }
以上都是我们通过适配器模式来完成的一个个自定义的Handler
Handler是有EventLoopGroup当中的线程执行的,而且同一个客户端还会过来找同一个线程对象。
接口回调来帮我们做的,这个数据是MSG就是客户端帮我们写过来的数据。只不过这时候,歇过来的数据,我们接收的时候还是ByteBuffer当中,当然在Netty当中,我们叫做ByteBuf
多个Handler之间如何进行数据传递?
super.channelRead(ctx, s);//将把一个Handler处理完的数据交给第二个Handler
在我们多个Handler之间,我们第一个Handler的数据会交给第二个Handler。这个数据是怎么传递的呢?super.的这个方法就是把一个Handler处理完的数据交给第二个Handler,第一个参数是Handler整个的上下文一个环境数据,第二个参数msg代表着我要带给下一个Handler的数据,而下一个Handler是通过他Handler当中的Object msg接收。
那为什么下一个还是byteBuf呢?因为我们传递的就是原始的ByteBuf。
最后一个Handler是不需要调用super的。
这是一个典型的责任链的设计模式,把一个工作分作一个链条。把数据进行不同的加工。最开始我们接触这种责任链是在过滤器当中,还有拦截器。包括大数据中的数据清洗。
Handler的作用
1:用于处理接收或者发送前的数据。
2:通过PipeLine把多个Handler有机的整合成了一个整体。
读取数据:ChannelInBoundHandler 子类
写出数据:ChannelOutBoundHandler 子类
3:PipeLine中 执行Handler有固定的顺序(类似一个栈)双向链表。
4:Handler当中我们如果想要传递数据,我们使用super.channelRead来传递数据,最后一个无需传递时,无需调用。
5:super.channelRead()方法的真正操作是这个方法。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); }
只有这个ctx才具有传递数据的能力,因为这个Ctx是上下文是环境,什么是环境:环境指的就是,这些Handler他们生活的一个环境,也管ByteBuf,也管Channel
6:多个Handler依次顺序执行,在Netty执行的时候默认会为我们添加两个Hander分别是head和tail也就是:head --> handler1 --> handler2 – handler3 --> tail
写模式:
Handler读数据还是写数据都就是由两个类型来决定的。
作为PipeLine来讲我们对于Handler的执行顺序是有顺序的,第一个添加的第一个执行。我们在Handler调用过程当中是有数据的传递的。通过调用父类的方法执行super.channelRead(ctx,msg);完成数据的传递,本质上都是通过上下文进行传递的。我们如果想要传递数据,使用两种方法哪个都行,如果我们是最后一个HANDLer无需传递数据,那么就不用调用这个方法了。Netty在我们执行Handler的时候除了会执行我们自写的Handler还会加一个Head和tail,这是在Inbound当中的顺序是这个样子的。
我们今天讲的是往外写的时候,我们的outBoundHandler的写发放和作用。
在实现他的过程中我们需要实现outBoundHandler,实现他的写方法。每一个outBoundHandler都是具体的去细化这些Handler。负责一块内容,当我们的Handler创建好之后,我们会发现我们代码只走了我们的输入的InboundHandler并没有执行我们的out的Handler这是因为我们没有对应的写的操作。
我们需要在最后的一个读的Handler当中执行我们的写的操作。也就是执行wirteAndFlush()这个时候就会往客户端写数据。通过OutBoundHandler这一系列的组合去写数据。输出的Handler输出的顺序并不是我们添加的顺序,而是倒着输出的。6.5.4这样一个反向的顺序,这跟配置的顺序是完全相反的。OutBoundHandler他的编码顺序(配置顺序)和运行顺序是相反的。
我们接下来,我们甚至将输入和输出的Handler的顺序进行打乱,这样会发生什么?这样运行时没有任何问题的。这样操作完运行顺序依旧是:123,654这样的一个顺序。
如果我们在第二个读数据的Handler我们就不往下传数据了,而且直接执行writeAndFlush这样就会造成我们第三个Handler不执行。直接走我们的写的Handler。这样就成了12-654,相当于我们的wirteAndFlush就是我们执行写的Handler的钥匙。如果我们在第二个Handler处我们既不传值也不执行写的操作,那么只会执行:12没有任何后续的操作。
InboundHandler换位置一定会改变顺序,而同种Handler位置不变,不同种顺序改变是不会改变我们的执行顺序的。
我们基于CH(SC)调用我们的writeAndFlush和ctx.writeAndFlush有什么区别?如果我们使用sc这种方式,会全局查找我们的handler而另外一种是从当前handler位置,向前或者叫向上查找输出的Handler有的话执行,没有的话不执行。这是他们的区别。
我们在编写Handler的时候,需要new ChannelInboundHandlerAdapter然后我们去实现他的很多方法,在这里边的方法当中,我们会有可以实现很多方法,我们看这个方法的时候,发现有很多registerd,unregistered这样的方法,这些方法都是用来对channel不同状态的监控,意味着通过在不同channel状态的时机切入到我们的业务代码的当中,去完成一系列的操作。这里边的方法都是一些列的回调方法,我们在监听到状态之后进行回调,切入到我们的业务需求代码当中。
Netty服务端开发和客户端开发区别
客户端:
ServerBootstrap ServerBootstrap = new ServerBootstrap () 这行代码就是创建服务端的启动器。作为这个启动器这是NEtty为我们启动好的,服务端都是围绕这个展开的,这个启动器就会构建整个服务端。 ServerBootstrap.channel(NioServerSocketChannel.class); 设置我们服务端的channel:监听Accept事件,接收客户端的连接。 ServerBootstrap.group(new NioEventLoopGroup()); 创建了多个线程,一方面做获取连接操作,一方面获取读写操作监听,对应BOSS线程和Worker线程。这里边引入了多线程模型只不过是封装好的, 将一个个多线程模型封装程。 ServerBootstrap.childHandler(new ChannelInboundHandlerAdapter(){....}) **在服务端当中childHandler和handler有什么区别?** 我们的服务端要完成两项工作:1.接收请求建立连接2.建立管道进行IO通信。这是我们的服务端特有的两个东西。handler处理的是ServerSocketChannel (泛型也要写成SSC的)而childHandler专门处理的是SocketHandler(泛型是SC的),前者无法替换后者,在我们的服务端体系当中Handler处理的是SC, 所以分配给他NioEventLoop就相当于是BOSS线程,分配给childHandler处理的sc,进行读写操作,相当于是worker线程。 我们原来可以不写handler方法一点也不影响服务器执行为什么,说明这个handler方法已经Netty写好了。也会建立PipeLine,PipeLine当中也可以写 Handler只不过这里边操作,这里边的Netty默认都给我们做了,但是后续有复杂开发的话,我们一定要去指定我们的Handler方法具体的pipeLine操作。 **我们这个childHandler能不能不写?** 不能一定要写,这玩意都不写,这是与客户端实际建立的连接,也是最真实的数据通信,他是核心。很多Netty提供的API当中很多是加child对应的还有没有 加child的,我们就知道加child的一定是为sc服务的 **PipeLine是只会创建一个么?** 在整个Netty体系当中,一个SC会对应一份PipeLine。他们彼此之间是互不干扰的。
客户端:
Bootstrap Bootstrap = new Bootstrap () 站在客户端角度就无需SSC就不需要ServerBootstrap 这个启动器而是使用一个Bootstrap 这个启动器就行了,这两个是父子关系。 Server是在父类上做了扩展,可以支持SSC的处理。 客户端的多线程体现在1.建立连接2.等待操作。
一个EventLoop支持多个SC访问,一旦连接上之后,SC都只会找一个EventLoop。
如何快速优雅的测试服务端?
我们对一些东西做mock,对一些东西做模拟测试。Handler的测试我们也可以基于Mock做测试?
public class TestEmbededHandler { private static final Logger log = LoggerFactory.getLogger(TestEmbededHandler.class); public static void main(String[] args) { ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() { @Override //ByteBuf public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("h1 {}",msg); super.channelRead(ctx, msg); } }; ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("h2"); super.channelRead(ctx, msg); } }; ChannelInboundHandlerAdapter h3 = new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("h3"); super.channelRead(ctx, msg); } }; ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("h4"); super.write(ctx, msg, promise); } }; ChannelOutboundHandlerAdapter h5 = new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("h5"); super.write(ctx, msg, promise); } }; ChannelOutboundHandlerAdapter h6 = new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("h6"); super.write(ctx, msg, promise); } }; EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4, h5, h6); //channel.writeInbound("xiaohei"); //channel.writeOutbound("xiaojr"); channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("sunshuai".getBytes())); } }
ByteBuf 是对ByteBuffer的封装,封装就意味着功能的增强
1:自动扩容
2:读写的指针方便操作,ByteBuffer没有读写指针的,也不需要切换我们的读模式和写模式。
3:ByteBuf的引入了内存的池化(线程池,连接池)可以复用内存,不然创建内存池销毁内存池是很浪费时间的
4:0copy的相关内容。
1)没有虚拟机内容进行参与,直接由读写的高速缓存直接到
2)严重依赖于Linux版本。Netty当中0拷贝赋予了更多了含义,尽可能的少占用内存。
六:ByteBuf
Netty当中的ByteBuf如何使用:
public class TestByteBuf { public static void main(String[] args) { //如何获得ByteBuf ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10); //默认的ByteBuf 默认256 最大的内存空间Integer最大值 21亿 //ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer(); buffer.writeByte('a'); buffer.writeInt(10); buffer.writeInt(11); buffer.writeInt(12); //13个字节 buffer.writeInt(13); System.out.println(buffer); System.out.println(ByteBufUtil.prettyHexDump(buffer)); } }
扩容的最大值就是Integer的最大值Byte。ByteBuf提供了一个调试类,prettyHexDump以16进制的方式将数据打印出来。
我们通过toString方法看到的具体的值。整数用四个字节来存。double使用八个字节来存。所以:
bufferwirteByte(‘a’);buffer.write(10)占用了五个字节空间。
buffer.write('a') buffer.write(10) buffer.write(10) buffer.write(10)
这样就占用13个字节了,占用13个字节就需要扩容了。扩大到了16,默认的扩充最大值是Integer的最大值。
占用空间说明:我们往Byte当中写什么数据,占用多少空间是看我们的Java的数据类型的
Byte:1
short:2
int :4
long:8
String:得看具体的字符编码集
我们可以自定义N多个Handler,Netty会为我们家头和尾巴
在PipeLine当中运行的时候会不会都会帮我们执行呢?
输入的时候会不会走head和tail,输出的时候会不会走head和tail?输入的情况下head和tail都会执行,输出的情况下,只走head
tail的那个handler继承了ChannelInBoundHandler,是一个内部类,TailContext,所以输入的时候一定会走tail但是输出的时候,一定不会走tail,当我们输出的时候不走这个
head对应的handler叫做HeandContext,是一个内部类,HeadContext,Head实现了两个接口:ChannelOutBoundHaneler和
ChannelInboundHandler,输入的时候第一个走他,输出的时候最后一个走他。
因为这两个类只在Pipeline当中使用,并不需要给其他的服务暴露,所以这两个类设计成了内部类。
ByteBuf来讲是我们涉及到的数据的存储,Netty网络通信果过程中底层数据存储ByteBuf
ByteBuf是对ByteBuffer的封装。
1:自动扩容
2:读写指针分离方便操作
3:内存的池化
4:0拷贝
1:ByteBuf基础使用
我们的ByteBuf是不能直接去new的,都是通过工厂的方式来创建的。
1:自动扩容
2:可以指定初始化长度(10Byte)
3:不指定的话默认值是256的Byte
4:最大的空间是Integer.MAX,当然这只默认值,我们可以在构造方法中去指定这个值。这个值我们最好去指定,
一个客户端对应一个ByteBuf,我们一定好指定这个最大值,一个没事并发上来之后,多个客户端都动态库容,我们的
JVM内存很快就消耗没了。
ByteBuf buffer = ByteBufAllocator.Default.buffer(10);
###2 :自动扩容的规律
4的N次方,初始长度是10的话,我们往数据当中去写数据内容。主要指的是:4 – 16 – 64 这里的规律是4的N次方,超过
64之后就是在这个基础上X2进行,也就是翻倍,这就是Netty的扩容规律。
ByteBufUtil.prettyHexDump(buffer)获取当前长度。
这种机制也是为了前期担心扩容不够,后期担心扩容太狠,娶了这么一个折中的办法。自动扩容,一定是扩充到最大的ByteBuf的值就完了。
2:ByteBuf与内存关系
原始的ByteBuffer有两种内存使用方式,一种是堆内存和直接内容使用,这两个被Netty的ByteBuf继承过来了。尤其是直接内存来讲,
直接内存会减少一次内存和用户内容的切换和数据拷贝,直接做了内存映射,做了0拷贝这样效率会很高。直接内存的创建和销毁的代价
是很高的,因为他需要沟通操作系统,脱离了虚拟机,对我们的GC压力小了很多,我们的堆内存创建和销毁代价相对较小,读写效率低,多一次数据拷贝,同时呗GC管理,GC压力较大。
3:堆内存和直接内存的创建
ByteBuf buffer = ByteBufAllocator.Default.buffer(10);我们把这个对象打印一下。
这里创建的是直接内存。还有其他的方式ByteBufAllocator.Default.directBuffer()这两种方式都可以创建直接内存。
ByteBufAllocator.Default.headBuffer(10);这种方式创建的是堆内存。默认情况下,推荐我们使用直接内存。
public class TestByteBuf { public static void main(String[] args) { //如何获得ByteBuf 1. 支持自动扩容 2. 制定byteBuf初始化大小 3.256 3. 最大值 Integer.max,在构造方法中指定最大值 ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10); System.out.println("buffer = " + buffer); //默认的ByteBuf 默认256 最大的内存空间Integer最大值 21亿 //ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer(); /* buffer.writeByte('a'); buffer.writeInt(10); buffer.writeInt(11); buffer.writeInt(12); //13个字节 */ /* for (int i = 0; i < 513; i++) { buffer.writeByte(1); } System.out.println(buffer); System.out.println(ByteBufUtil.prettyHexDump(buffer));*/ } @Test public void test1() { //为了测试 堆内存 和 直接内存创建 //创建的是直接内存 ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10); System.out.println("buffer = " + buffer); ByteBuf byteBuf = ByteBufAllocator.DEFAULT.directBuffer(); System.out.println("byteBuf = " + byteBuf); ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.heapBuffer(); System.out.println("byteBuf1 = " + byteBuf1); } }
4:堆内存和直接内存的池化
以上可以看到我们的ByteBuf获取到的内存是池化的,池化的思想可以提升创建效率,服务启动的时候就已经创建好了。
用户请求过来之后无需进行创建,拿来用即可。对于调用者来讲,他的效率提高了。使用了池化的对象之后,我们不用了
会帮我们放回池子里,合理的使用了资源。
Netty在设计ByteBuf的使用他也使用了一些c语言的高效算法来进行内存分配,jemailoc,例如我们的redis也需要一些更高效的内存分配器,可以较少内存碎片。
如果使用了池化,尤其是对ByteBuf使用了池化,可以减少我们的内存溢出的风险(内存不够用),也就代表了我们的内存使用过多,
基于池化思想,来管理ByteBuf,就创建了这么多内存,使用过程中上限内直接拿来用就行,如果不够用那么就需要对你进行控制,这个
跟连接池的控制差不多的。(控制连接的创建次数)使用ByteBuf可以提供很多的好处
总结一下:
1:站在调用者角度,提升了创建的效率
2:合理的使用了资源
3:减少了内存溢出的风险
Netty池化4.1版本后是默认开启的,之前的都是关闭的,也可以通过配置虚拟机启动参数来实现池化的开启和关闭。-D这中参数的配置
在Java体系中很常见,使用应用代码很少见这种配置。
5:ByteBuf内存结构
ByteBuffer读写模式切换,我们需要执行clear(),compact(),flip();
ByteBuf这个体系当中并不是单指针操作了,而是双指针,使用读写指针进行操作,直接操作无需关心读写。
首先有两个概念,初始容量和最大容量,初始容量一般是256,初始容量到我恩的最大容量之间叫做可扩容空间。
另外我们的读写指针都是在我们的0位置。我们写数据的时候我们的写指针往后走,读数据的时候我们的读指针往
后走,读指针能读的范围是读指针到写指针的范围,这就是我们的读写指针,另外一个我们需要格外关注的是,ByteBuf
默认情况下只能读一遍数据,读完了想重复度默认情况下不行,也有其他的办法。
读过的数据叫做被废弃的数据,被废弃的数据在ByteBuf当中还存不存在?是存在的。只不过是不可到达的状态。如果有
新数据的话,就会把老数据给覆盖掉。所以有些MQ的日志是追加日志,因为删除的话,我们首先要查找,找到之后吧内存抹掉,这就
很耗费时间。
6:ByteBuf的API使用
double shift查找类的时候,我们只能在我们的项目中查找到类,这是因为project place如何切换到All place呢?执行一个
ctrl + shift +p进行切换
我们往ByteBuf当中写数据我们可以使用write方法,最常用的是wirteBytes写的是字节,还有一个就是writeInt的时候,占用的是四个字节,double占用的是8个字节。第二种还用set方法,这个set方法可以指定具体位置,覆盖对应的内容。set对应的内容
是不改变写指针的。
读数据的API就是readBytes会把数据读到,get方法也是可以快速读取一个位置的顺序,不改变读指针。
如何重复读取,我们先打一个标记(markReaderIndex),然后重置读指针就可以重新进行读取了(resetReaderIndex)。
重置指针之后,我们就可以重复读了。重复读写数据都可以通过mark和reset开头的方法去重置读写数据。
当我们经验特别多了之后,我们的学东西会越来也越快,这是一个积累的过程。我们ByteBuf提供的read一系列的方法和我们的get方法区别是什么?read和get都能读,read是顺序读,读指针会向下移动,get不是顺序读,不会动指针。我们写数据也是一个意思,write和set方法。write顺序写。
7:ByteBuf内存释放
内存释放是不是内存回收了?清空了,销毁了?
如果我们当前我们的这个ByteBuf是池化的,我们当前的这个内存释放就是byteBuf池子。可以让别人用了
即使不是ByteBuf池化的,释放之后也不一定立即销毁,如果使用的是堆内存,会涉及到垃圾回收。不能拿简单的认为垃圾回收就是销毁。
Netty涉及到内存释放的时候,设计了一种统一的内存释放方式。当然这个统一的内存释放方式是对应编程使用人员来讲的。Netty同意了内存释放接口(Api)
ReferenceCounted(引用计数器) 内存每被引用一次就会加1,创建ByteBuf的时候,ByteBuf是1,使用这个byteBuf的时候就会调用retain方法+1,release方法会导致这法-1。
ByteBuf被实例化的时候,他从引用技术1开始,retain()方法会增加引用技术,release()方法会导致引用技术-1,如果引用计数减少到0,对象会被显示的释放,也就是会被回收,他是通过这样的一种方式统一变成接口的。
ByteBuf释放的时候我们需要关注一个问题,什么时候需要释放呢?在整个Netty的过程当中我们什么时候使用这个ByteBuf呢?谁使用ByteBuf呢?是我们的channel。每一个SC对应一个ByteBuf,映射到我们的Netty当中的时候就是一个Sc对应一个PipeLine,Handler是处理程序,(最好的翻译工具DeepL)Handler处理的是什么东西呀不就是数据吗。实际上就是Handler处理ByteBuf,而且这里边有一个原则,ByteBuf不能够直接传递给我们后续的业务代码。需要转换成一个通用的数据类型,例如:String。也就是ByteBuf只能用在我们的ByteBuf只能用在我们的Handler当中,所以他的创建和销毁都是在我们的Handler当中。他和我们的PipeLine是验证绑定在一起的。
当我们一个数据读进来,我们会经过heand-h1-h2-h3-跳过写出到tail当中。这个时候,我们的ByteBuf当中已经没用了,所以应该在tail当中也就是TailContext当中完成ByteBuf的释放。这还是站在读数据的角度。
那当写数据的会后呢,我们从h6-h5-h4-head这样的一个顺序,所以在我们的HeadContext当中也应该有这样的一个释放内存的顺序。ByteBuf一定只能应用在PipeLine中在Handler中进行释放数据释放最为理想且稳妥,对于tailContext数据会对读到的数据进行ByteBuf的释放,headContext会对写的数据进行ByteBug的释放。结论就是:程序员后续无需关心ByteBuf的释放。真的是这样吗?
有这样的一个问题,这里可以执行release方法的原因就是这里的msg得是ByteBuf,因为只有我们的ByteBuf实现了ReferecConted接口,但是我们这里的不能保证这里一定是ByteBuf,例如我们以写入的时候为例(上边这个图就变成了tailContext的释放代码):我们可能h1就是一个StringDecoder这样的话,我们的msg已经成了String类型的数据。并且还有一个问题,我们在往pipeLine当中进行下游数据传递的时候,我们是不是必须调用一个ctx.fireChannelRead方法进行一个传递。如果我们压根就没有调用这个方法呢?这样这个数据就不往下传递了。
那么这里这个方法如何释放呢?基于这两个原因,要知道tailContext和HeadContext有这个功能,但是我们要记得最后一次使用ByteBuf的时候做这个ByteBuf的释放。tail和head有这个释放的功能,但是我们不一定能走到那。我们程序员需要进行控制。我们指着tail和head 的可能会有问题。所以这个释放还是要程序员去参与,程序员去释放。
创建ByteBuf的时候内存池是怎么申请的呀?
内存没有吃的概念,ByteBuf是池的概念,直接内存的操作会涉及到Native本地方法的使用。为什么集成JemaiLoc来操作呀,这个是谷歌开源的C语言编程的内存管理器,他为我们申请内存释放内存。这个Java程序员是干预不了的,到目前为止Netty当中的ByteBuf的内存池会使用这个东西。redis也会使用这个东西,redis的安装是源码编译的安装。编译redis的时候,make install和make是两个概念。make是做源码编译的。
Linux下安装软件:
1:直接使用yum 或者 rpm安装即可。centos的系统,这是他的应用管理器。
2:二进制的安装,已经编译好的可执行文件,直接解压缩就行了,比如安装tomacat
3:源码编译安装:configure+make(编译c编译程二进制),然后make install,make之后就已经可以执行了,但是只能在当前目录执行,而make install就是将编译好的命令提交到系统的bin里边,可以在系统任意位置执行,这里就是做了一个拷贝。make就是编译,make就是已经可以执行使用了。这就是跟Java的Javac一样,Jdk安装好之后,就可以进行Javac了,但是我们配置了环境变量之后,就可以在任何一个目录执行Javac。
8: ByteBuf分片机制
分片的思路是什么样的一个思路,分片是分成了多个空间,有十个空间,存了10个数,现在我们有一个需求,我们想使用这个ByteBuf当中的部分数据。如果按照常规数据来讲,我们想使用前六个,我们可以copy到新的ByteBuf当中。为了减少copy次数,ByteBuf提供了一个分片机制,不涉及创建新的ByteBuf机制,这样效率不就更高了吗。这就是分片的作用。
实际上所谓的切片实际上就是为我们锁需要的数据提供了一套独立的读写指针。仅此而已。
public class TestByteBuf3 { public static void main(String[] args) { //0copy ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10); buf.writeBytes(new byte[]{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0'}); System.out.println(buf); System.out.println(ByteBufUtil.prettyHexDump(buf)); ByteBuf s1 = buf.slice(0, 6); s1.retain(); System.out.println(s1); ByteBuf s2 = buf.slice(6, 4); s2.retain(); System.out.println(s2); buf.release(); System.out.println(ByteBufUtil.prettyHexDump(s1)); System.out.println(ByteBufUtil.prettyHexDump(s2)); } }
这个切片使用了都是同一片内存(并没有真正的生成了一个新的ByteBuf),如果上某个直接做了释放,下边的话也会收到影响。所以,我们要记得每次使用一个bytebBuf我们要使用一次retain方法。