回顾下Nio的组成由以下构成,既然Netty是基于Nio的,那么肯定也包含下面那么多组件。但具体Netty有哪些组件呢,其作用是什么呢?和Nio的关系又是什么呢?接下来的篇章中,会一 一为大家解答。
一、Netty事件响应机制
1.1 Netty的事件响应机制
/** * 作者:DarkKing * 创建日期:2019/10/02 * 类说明:netty服务端 * */ public class NettyServer { private final int port; public NettyServer(int port) { this.port = port; } public static void main(String[] args) throws InterruptedException { int port = 9999; NettyServer echoServer = new NettyServer(port); System.out.println("服务器启动"); echoServer.start(); System.out.println("服务器关闭"); } public void start() throws InterruptedException { final NettyServerHandler serverHandler = new NettyServerHandler(); /*线程组*/ EventLoopGroup group = new NioEventLoopGroup(); try { /*服务端启动必须*/ ServerBootstrap b = new ServerBootstrap(); b.group(group)/*将线程组传入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/ .localAddress(new InetSocketAddress(port))/*指定服务器监听端口*/ /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel, 所以下面这段代码的作用就是为这个子channel增加handle*/ .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { /*添加到该子channel的pipeline的尾部*/ ch.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/ f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/ } finally { group.shutdownGracefully().sync();/*优雅关闭线程组*/ } } }
在回顾下上篇文章的demo。上面程序中,基本上包含了netty重要的一些概念和组件。
事件回调
一个回调其实就是一个方法,一个指向已经被提供给另外一个方法的方法的引用。这使得后者可以在适当的时候调用前者。回调在广泛的编程场景中都有应用,而且也是在操作完成后通知相关方最常见的方式之一。
Netty 在内部使用了回调来处理事件;当一个回调被触发时,相关的事件可以被一个interface-ChannelHandler 的实现处理。
Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。
JDK 预置了interface java.util.concurrent.Future,但是其所提供的实现,只允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以Netty提供了它自己的实现——ChannelFuture,用于在执行异步操作的时候使用。
ChannelFuture提供了几种额外的方法,这些方法使得我们能够注册一个或者多个ChannelFutureListener实例。监听器的回调方法operationComplete(),将会在对应的操作完成时被调用。然后监听器可以判断该操作是成功地完成了还是出错了。如果是后者,我们可以检索产生的Throwable。简而言之,由ChannelFutureListener提供的通知机制消除了手动检查对应的操作是否完成的必要。
每个Netty 的出站I/O 操作都将返回一个ChannelFuture。
事件
Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。
Netty事件是按照它们与入站或出站数据流的相关性进行分类的。可能由入站数据或者相关的状态更改而触发的事件包括:连接已被激活或者连接失活;数据读取;用户事件;错误事件。
出站事件是未来将会触发的某个动作的操作结果,这些动作包括:打开或者关闭到远程节点的连接;将数据写到或者冲刷到套接字。
每个事件都可以被分发给ChannelHandler 类中的某个用户实现的方法。可以认为每个ChannelHandler 的实例都类似于一种为了响应特定事件而被执行的回调。
Netty 提供了大量预定义的可以开箱即用的ChannelHandler 实现,包括用于各种协议(如HTTP 和SSL/TLS)的ChannelHandler。
二、 Netty重要组件介绍
2.1 Channel、EventLoop(Group)和ChannelFuture
其中每个组件作用如下
Channel 类似于JAVA中的Socket,用于客户端连接,数据交换;
EventLoop用于控制流、多线程处理以及并发处理;
ChannelFuture 异步事件通知。
Channel和EventLoop关系如图:
2.1.1 Channel
基本的I/O 操作(bind()、connect()、read()和write())依赖于底层网络传输所提供的原语。在基于Java 的网络编程中,其基本的构造是类Socket。Netty 的Channel 接口所提供的API,被用于所有的I/O 操作。大大地降低了直接使用Socket 类的复杂性。此外,Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根。
由于Channel 是独一无二的,所以为了保证顺序将Channel 声明为java.lang.Comparable 的一个子接口。因此,如果两个不同的Channel 实例都返回了相同的散列码,那么AbstractChannel 中的compareTo()方法的实现将会抛出一个Error。
Channel 的生命周期状态
状态 | 描述 |
ChannelUnregistered | Channel 已经被创建,但还未注册到EventLoop |
ChannelRegistered | Channel 已经被注册到了EventLoop |
ChannelActive | Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了 |
ChannelInactive | Channel 没有连接到远程节点 |
当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline 中的ChannelHandler,其可以随后对它们做出响应。
重要Channel 的方法
方法 | 描述 |
eventLoop | 返回分配给Channel 的EventLoop |
pipeline | 返回分配给Channel 的ChannelPipeline |
isActive | 如果Channel 是活动的,则返回true。活动的意义可能依赖于底层的传输。例如,一个Socket 传输一旦连接到了远程节点便是活动的,而一个Datagram 传输一旦被打开便是活动的。 |
localAddress | 返回本地的SokcetAddress |
remoteAddress | 返回远程的SocketAddress |
write | 将数据写到远程节点。这个数据将被传递给ChannelPipeline,并且排队直到它被冲刷 |
flush | 将之前已写的数据冲刷到底层传输,如一个Socket |
writeAndFlush | 一个简便的方法,等同于调用write()并接着调用flush() |
2.1.2 EventLoop和EventLoopGroup
回想一下我们在NIO中是如何处理我们关心的事件的?
//NIO处理时间事件方式 @Override public void run() { //循环遍历selector while (started) { try { //阻塞,只有当至少一个注册的事件发生的时候才会继续. selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Throwable t) { t.printStackTrace(); } } //selector关闭后会自动释放里面管理的资源 if (selector != null) try { selector.close(); } catch (Exception e) { e.printStackTrace(); } }
在一个while循环中select出事件,然后依次处理每种事件。我们可以把它称为事件循环,这就是EventLoop。interface io.netty.channel. EventLoop 定义了Netty 的核心抽象,用于处理网络连接的生命周期中所发生的事件。
io.netty.util.concurrent 包构建在JDK 的java.util.concurrent 包上。而,io.netty.channel 包中的类,为了与Channel 的事件进行交互,扩展了这些接口/类。一个EventLoop 将由一个永远都不会改变的Thread 驱动,同时任务(Runnable 或者Callable)可以直接提交给EventLoop 实现,以立即执行或者调度执行。
根据配置和可用核心的不同,可能会创建多个EventLoop 实例用以优化资源的使用,并且单个EventLoop 可能会被指派用于服务多个Channel。
Netty的EventLoop在继承了ScheduledExecutorService的同时,只定义了一个方法,parent()。在Netty 4 中,所有的I/O操作和事件都由已经被分配给了EventLoop的那个Thread来处理。
任务调度
偶尔,你将需要调度一个任务以便稍后(延迟)执行或者周期性地执行。例如,你可能想要注册一个在客户端已经连接了5 分钟之后触发的任务。一个常见的用例是,发送心跳消息到远程节点,以检查连接是否仍然还活着。如果没有响应,你便知道可以关闭该Channel 了。
线程管理
在内部,当提交任务到如果(当前)调用线程正是支撑EventLoop 的线程,那么所提交的代码块将会被(直接)执行。否则,EventLoop 将调度该任务以便稍后执行,并将它放入到内部队列中。当EventLoop下次处理它的事件时,它会执行队列中的那些任务/事件。
线程的分配
服务于Channel 的I/O 和事件的EventLoop 则包含在EventLoopGroup 中。
异步传输实现只使用了少量的EventLoop(以及和它们相关联的Thread),而且在当前的线程模型中,它们可能会被多个Channel 所共享。这使得可以通过尽可能少量的Thread 来支撑大量的Channel,而不是每个Channel 分配一个Thread。EventLoopGroup 负责为每个新创建的Channel 分配一个EventLoop。在当前实现中,使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且相同的EventLoop可能会被分配给多个Channel。
一旦一个Channel 被分配给一个EventLoop,它将在它的整个生命周期中都使用这个EventLoop(以及相关联的Thread)。请牢记这一点,因为它可以使你从担忧你的ChannelHandler 实现中的线程安全和同步问题中解脱出来。
需要注意,EventLoop 的分配方式对ThreadLocal 的使用的影响。因为一个EventLoop 通常会被用于支撑多个Channel,所以对于所有相关联的Channel 来说,ThreadLocal 都将是一样的。这使得它对于实现状态追踪等功能来说是个糟糕的选择。然而,在一些无状态的上下文中,它仍然可以被用于在多个Channel 之间共享一些重度的或者代价昂贵的对象,甚至是事件。
2.1.3 ChannelFuture
Netty 中所有的I/O 操作都是异步的。因为一个操作可能不会立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此,Netty 提供了ChannelFuture 接口,其addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。
可以将ChannelFuture 看作是将来要执行的操作的结果的占位符。它究竟什么时候被执行则可能取决于若干的因素,因此不可能准确地预测,但是可以肯定的是它将会被执行。
2.2 ChannelHandler、ChannelPipeline和ChannelHandlerContext
2.2.1 ChannelHandler
从应用程序开发人员的角度来看,Netty 的主要组件是ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler 的方法是由网络事件触发的。事实上,ChannelHandler 可专门用于几乎任何类型的动作,例如将数据从一种格式转换为另外一种格式,例如各种编解码,或者处理转换过程中所抛出的异常。
举例来说,ChannelInboundHandler 是一个你将会经常实现的子接口。这种类型的ChannelHandler 接收入站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所处理。当你要给连接的客户端发送响应时,也可以从ChannelInboundHandler 直接冲刷数据然后输出到对端。应用程序的业务逻辑通常实现在一个或者多个ChannelInboundHandler 中。
这种类型的ChannelHandler 接收入站事件和数据,这些数据随后将会被应用程序的业务逻辑所处理。
ChannelHandler 的生命周期
接口 ChannelHandler 定义的生命周期操作,在ChannelHandler被添加到ChannelPipeline 中或者被从ChannelPipeline 中移除时会调用这些操作。这些方法中的每一个都接受一个ChannelHandlerContext 参数。
生命周期 | 描述 |
handlerAdded | 当把ChannelHandler 添加到ChannelPipeline 中时被调用 |
handlerRemoved | 当从ChannelPipeline 中移除ChannelHandler 时被调用 |
exceptionCaught | 当处理过程中在ChannelPipeline 中有错误产生时被调用 |
Netty 定义了下面两个重要的ChannelHandler 子接口:
ChannelInboundHandler——处理入站数据以及各种状态变化;
ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作。