前面我们已经学习了NIO的简单知识,三大组件:ByteBuffer、Channel、Selector。知道ByteBufffer是数据,而Channel是数据的载体通道,selector为多路复用。如果说线程池为线程提供了重复利用的途径,而Selector则为起到了调度线程的目的,也即高效率的使用线程。下面我们开始Netty的学习。
首先,我们来了解一下mmap、sendFile、零拷贝。在java中,由于传统的IO读写需要进行四次拷贝、四次切换(如图),因此效率上,通常在传输大文件的时候比较低。因此引入了mmap和sendFile进行优化。同时这里,我们就需要了解DMA,在计算机原理中,我们可以看到它的身影,全称Direct Memory Access,翻译出来就是直接内存拷贝(不使用CPU)。也即它是相对于操作系统而言的。
那mmap和sendFile做了哪些优化呢?
mmap通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据。这样,在进行网络传输时,可以减少内核空念到用户空间的拷贝次数。
而sendFile则体现在Linux2.1到Linux2.4的优化sendFile的操作:
从图中可以看到Linux2.4的sendFile操作实现了零拷贝操作,也即只在系统层面进行操作。同时从上面的图中可以看到拷贝的次数:mmap经过了3次拷贝、3次切换。而在sendFile中,经过了2次拷贝2次切换。
零拷贝:从操作系统角度来说的,因为内核缓冲区之间,没有数据是重复的(只有内核缓冲区有一份数据)。同时零拷贝带来了更少的数据赋值,还带来了性能上的优势,减少了上下文切换。
Reactor模式和Proactor模式:
1.Reactor模式:主动模式,所谓主动,是指应用程序不断轮询,询问操作系统或者网络框架,IO是否就绪。其中java的NIO就属于这种模式。在这种模式下,实际的IO操作还是应用程序执行的。2.Procator模式:被动模式,应用程序的读写函数操作交给操作系统或者网络框架,实际IO操作由操作系统或者网络框架完成,之后再回调应用程序。微软的asio库就是这种模式。
事件驱动模型:通常我们设计一个事件处理模型的程序有两种思路:
1.采用轮询的方式:线程不断轮询询问相关事件发生源有没有发生事件,有发生事件就调用事件处理逻辑2.事件驱动方式:发生事件,主线程把事件加入到事件队列,在另外线程不断循环消费事件列表的事件,调用事件对应的处理逻辑事件。事件驱动方式也被称为消息通知方式,其实是设计模式中观察者模式的思路。基于事件驱动的优点:可扩展性好,高性能。
Reactor线程模型中2个关键组成:
Reactor:Reactor在一个单独的线程运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。Handlers:处理程序执行I/O事件要完成的实际事件。
线程模型reactor
Reactor的三种模型:单Reactor单线程、单Reactor多线程、主从Reactor多线程。
单 Reactor 单线程,前台接待员和服务员是同一个人,全程为顾客服 单 Reactor 多线程,1 个前台接待员,多个服务员,接待员只负责接待 主从 Reactor 多线程,多个前台接待员,多个服务生
Netty的线程模型:
Netty主要基于主从Reactor多线程模型,其中主从Reactor主从模型有多个Reactor:
1)MainReactor负责客户端的连接请求,并将请求转交给SubReactor2)SubReactor负责相应通道的IO读写请求3)非IO请求(具体业务逻辑处理)的任务则会直接写入队列,等待worker线程进行处理。EventLoopGroupbossGroup=newNioEventLoopGroup(); EvenLoopGroupworkerGroup=newNioEventLoopGroup(); ServerBootstrapserver=newServerBootstrap(); server.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class);
bossGroup线程池则只是在bind某个端口后,获得其中一个线程作为MainReactor,专门处理端口的Accept事件,每个端口对应一个Boss线程,workerGroup线程池会被各个SubReactor和Worker线程充分利用。
NioEventLoop:维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:
I/O任务:即selectionKey中的ready的事件,如accept、connect、read、write等,由processSelectKeys方法触发。非I/O任务:添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。
下面我们来看一下Netty的简单案例,下载Netty的源码,进行编译:
包括服务器端和客户端,其中服务器端包括服务端引导 、服务端处理器,客户端包括客户端引导、客户端处理器
服务器端引导:
/*** Echoes back any received data from a client.* Netty的服务端引导*/publicfinalclassEchoServer { staticfinalbooleanSSL=System.getProperty("ssl") !=null; //端口staticfinalintPORT=Integer.parseInt(System.getProperty("port", "8007")); publicstaticvoidmain(String[] args) throwsException { // Configure SSL.finalSslContextsslCtx; if (SSL) { SelfSignedCertificatessc=newSelfSignedCertificate(); sslCtx=SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx=null; } // Configure the server.boss和worker线程组EventLoopGroupbossGroup=newNioEventLoopGroup(1); EventLoopGroupworkerGroup=newNioEventLoopGroup(); //创建服务端业务处理器对象,进行业务处理,添加线程组,同时开启通道添加so_backlog//同时添加childHandler,重写initChannel方法finalEchoServerHandlerserverHandler=newEchoServerHandler(); try { //服务端引导ServerBootstrapb=newServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(newLoggingHandler(LogLevel.INFO)) .childHandler(newChannelInitializer<SocketChannel>() { publicvoidinitChannel(SocketChannelch) throwsException { //流水线ChannelPipelinep=ch.pipeline(); if (sslCtx!=null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //添加日志信息//p.addLast(new LoggingHandler(LogLevel.INFO));//添加serverHandlerp.addLast(serverHandler); } }); // Start the server.//启动服务器ChannelFuturef=b.bind(PORT).sync(); // Wait until the server socket is closed.f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads.bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
服务端处理器:
/*** Handler implementation for the echo server.* Netty服务端业务处理器,继承ChannelInboundHandlerAdapter,* 同时重写channelRead、ChannelReadComplete、exceptionCaught三个方法*/publicclassEchoServerHandlerextendsChannelInboundHandlerAdapter { publicvoidchannelRead(ChannelHandlerContextctx, Objectmsg) { ctx.write(msg); } publicvoidchannelReadComplete(ChannelHandlerContextctx) { ctx.flush(); } publicvoidexceptionCaught(ChannelHandlerContextctx, Throwablecause) { // Close the connection when an exception is raised.cause.printStackTrace(); ctx.close(); } }
客户端引导
/*** Sends one message when a connection is open and echoes back any received* data to the server. Simply put, the echo client initiates the ping-pong* traffic between the echo client and server by sending the first message to* the server.* Netty客户端引导*/publicfinalclassEchoClient { staticfinalbooleanSSL=System.getProperty("ssl") !=null; //ip、端口号staticfinalStringHOST=System.getProperty("host", "127.0.0.1"); staticfinalintPORT=Integer.parseInt(System.getProperty("port", "8007")); staticfinalintSIZE=Integer.parseInt(System.getProperty("size", "256")); publicstaticvoidmain(String[] args) throwsException { // Configure SSL.gitfinalSslContextsslCtx; if (SSL) { sslCtx=SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx=null; } // Configure the client.EventLoopGroupgroup=newNioEventLoopGroup(); try { Bootstrapb=newBootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(newChannelInitializer<SocketChannel>() { publicvoidinitChannel(SocketChannelch) throwsException { ChannelPipelinep=ch.pipeline(); if (sslCtx!=null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(newEchoClientHandler()); } }); // Start the client.ChannelFuturef=b.connect(HOST, PORT).sync(); // Wait until the connection is closed.f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads.group.shutdownGracefully(); } } }
客户端处理:
/*** Handler implementation for the echo client. It initiates the ping-pong* traffic between the echo client and server by sending the first message to* the server.* Netty客户端业务处理器,重写四个方法:channelActive、channelRead、channelReadComplete、exceptionCaught*/publicclassEchoClientHandlerextendsChannelInboundHandlerAdapter { privatefinalByteBuffirstMessage; /*** Creates a client-side handler.* 创建客户端处理器*/publicEchoClientHandler() { //使用类似byteBuffer方式创建bufferfirstMessage=Unpooled.buffer(EchoClient.SIZE); for (inti=0; i<firstMessage.capacity(); i++) { firstMessage.writeByte((byte) i); } } publicvoidchannelActive(ChannelHandlerContextctx) { ctx.writeAndFlush(firstMessage); } publicvoidchannelRead(ChannelHandlerContextctx, Objectmsg) { ctx.write(msg); } publicvoidchannelReadComplete(ChannelHandlerContextctx) { ctx.flush(); } publicvoidexceptionCaught(ChannelHandlerContextctx, Throwablecause) { // Close the connection when an exception is raised.cause.printStackTrace(); ctx.close(); } }
启动服务器端和客户端,可以看到运行结果:
同时标注部分是发送的消息
今天就学到这里了!