一文搞定Netty,打造单机百万连接测试!2

简介: 一文搞定Netty,打造单机百万连接测试!

4.Netty核心源码分析

4.1.Java NIO之Selector

1、什么是Selector

Selector(选择器)是java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

2、为什么使用Selector

仅用单线程来处理多个channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程来处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统一些资源(如内存),因此使用的线程越少越好。

3、Selector的创建

通过调用Selector.open()方法创建一个Selector,如下:

Selector selector = Selector.open();

4、向Selector注册通道

为了将Channel和Selector配合使用,必须将channel注册到selector上。通过SelectableChannel.register()方法来实现,如下:

ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
SelecttionKey key = channel.register(selector,SelectionKey.OP_READ);

与Selector一起使用时,Channel必须处于非阻塞模式下,这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道可以。

注意register()方法的第二个参数。这是一个interest集合,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:

  • Connect:SelectionKey.OP_CONNECT “连接就绪”
  • Accept:SelectionKey.OP_ACCEPT “接收就绪”
  • Read:SelectionKey.OP_READ “读就绪”
  • Write:SelectionKey.OP_WRITE “写就绪”

如果不止对一种事件感兴趣,可以用”位或“操作符将常量符拼接起来,如下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE

5、SelectionKey

当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。这个对象包含了一些属性:

(1)interest集合

interest集合就是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合,用“位与”操作interest 集合和给定的SelectionKey常量,可以确定某个确定的事件是否在interest 集合中。

int interestSet = selectionKey.interestOps();
//判断接收接续的事件是否在集合中
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
//判断连接就绪的事件是否在集合中
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
//判断读就绪事件是否在集合中
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
//判断写就绪事件是否在集合中
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

(2)ready集合

ready集合是通道已经准备就绪的操作的集合,在一次选择(Selection)之后,你会首先访问这个ready Set。

//获取读就绪事件的个数
int readySet = selectionKey.readyOps();

可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:

//判断接收就绪的事件
selectionKey.isAcceptable();
//判断连接就绪的事件
selectionKey.isConnectable();
//判断读就绪的事件
selectionKey.isReadable();
//判断写就绪事件
selectionKey.isWriteable();

(3)Channel+Selector

从SelectionKey访问Channel和Selector很简单.

//获取当前channel
Channel channel = selectionKey.channel();
//获取当前selector
Selector selector = selectionKey.selector();

(4)附加对象

可以将一个对象或者更多的信息附着在SelectionKey上,这样就能方便的识别某个给定的通道,例如,可以附加与通道一起使用的Buffer,或者包含聚集数据的某个对像。

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

还可以通过register()方法像Selector注册Channel的时候附加对象。如:

SelectionKey key = channel.register(selector,SelectionKey.OP_READ,thObject);

6、通过Selector选择通道

一旦向Selector注册了一个或者多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(连接、接受、读或者写)已经准备就绪的那些通道。

(1)int select()

阻塞到至少有一个通道在你注册的事件上就绪了。

(2)int select(long timeout)

和select一样,处理最长会阻塞timeout毫秒(参数)

(3)int selectNow()

不会阻塞,不管什么通道就绪都立即返回,如果没有通道可以选择,此方法直接返回零

select()方法返回的int值表示有多少通道已经准备就绪。

7、selectedKeys()

一旦调用select()方法后,并且返回值有一个或多个准备就绪了,然后就可以通过调用selector的selectedKeys()方法,访问已选择键集中的就绪通道。

Set selectedKeys = selector.selectedKeys();

当像Selector注册Channel时,Channel.register()方法会返回一个SelectionKey 对象。这个对象代表了注册到该Selector的通道。可以通过SelectionKey的selectedKeySet()方法访问这些对象。

Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
}

这个循环遍历已选择键集中的每个键,并检测各个键锁对应的通道的就绪事件。


注意每次迭代末尾的keyIterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。


SelectionKey.channel()方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。

8、wakeUp()

某个线程调用select()方法阻塞后,即使没有通道已经就绪,也有办法让其从select()中返回,只要让其他线程在第一个线程调用select()方法的哪个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。

如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。

9、close()

用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。

10.完整示例

//创建ServerSocketChannel对象
ServerSocketChannel channel = ServerSocketChannel.open();
//创建selector对像
Selector selector = Selector.open();
//设置非阻塞
channel.configureBlocking(false);
//绑定通道,返回读就绪的事件
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
   //查询当前已经有多少读就绪的事件
  int readyChannels = selector.select();
  //如果都就绪事件为0推出当前循环
  if(readyChannels == 0) continue;
  //调用select方法后,只要有就绪的通道,就开始遍历通道
  Set selectedKeys = selector.selectedKeys();
  Iterator keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    //判断是否是那四种事件类型,做除响应的处理
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
  }
}

4.2.EventLoop和EventLoopGroup

1、EventLoop和EventLoopGroup简介

为解决系统在运行中频繁切换上下文带来的性能的损耗。

而且要考虑并发下数据的安全。Netty采用了串行化的设计理念,从消息的读取、编码以及后续ChannelHandler的执行,始终都由IO线程EventLoop负责,这就意味着整个流程不会进行线程上下文的切换,

数据也不会面临被并发修改的风险。

EventLoopGroup是一组EventLoop的抽象,一个EventLoopGroup当中包含一个或者多个EventLoop,EventLoopGroup提供next接口,可以从一组EventLoop里卖弄按照一定的规则获取其中一个EventLoop来进行工作。

2、分析这两块代码

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

这里创建了两个EventLoopGroup对象bossGroup和workGroup,bossGroup主要用来处理server socket监听client socket的连接请求,server socket接收了新的连接后,会把connection socket放到workGroup中进行处理,也就是说workGroup主要用来处理connection socket的网络IO和相关的业务逻辑。workGroup会由next选择其中一个EventLoop来讲这个SocketChannel注册到其维护的Selector并对后续的IO事件进行处理。


ChannelPipeline中每一个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件的。所以至关重要不要阻塞这个线程。

总结:

  • NioEventLoopGroup实际上就是个线程池,一个EventLoopGroup包含一个或者多个EventLoop
  • 一个EventLoop在它的生命周期内只和一个Thread绑定
  • 所有的EventLoop处理的I/O事件都将在它专有的Thread上被处理
  • 一个Channel在它的生命周期内只注册一个EventLoop
  • 每一个EventLoop负责处理一个或者多个Channel
  • 一个EventLoop维护一个Selector

3、跟踪NioEventLoopGroup构造函数

(1)NioEventLoopGroup

 public NioEventLoopGroup(int nThreads) {
     this(nThreads, (Executor) null);
 }
 public NioEventLoopGroup() {
     this(0);
 }

(2)进入到NioEventLoopGroup的父类MultithreadEventLoopGroup

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

可以看见NioEventGroup的构造函数如果nThreads为非0值时,则为传入的实际值,如果为0的话或者没有参数,则为DEFAULT_EVENT_LOOP_THREADS,DEFAULT_EVENT_LOOP_THREADS为系统内核的2倍。

    static {
        //获取系统的内核*2付给DEFAULT_EVENT_LOOP_THREADS,静态块加载
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }


9e6afc5e4ea34d259711fca6c5771c03.jpg


aea00433c60643e480a29f46b430ed2e.jpg


44cb4ddacda84df4bbc738fc9695eaad.jpg

4.3.Netty的启动引导类

Netty的启动类分为客户端启动类和服务端启动类,分别是Bootstrap和ServerBootStrap。他们都是AbstractBootStrap的子类,总的来说他们都是Netty中的辅助类,提供了链式配置方法,方便Channel的引导和启动。

1、服务端启动引导类ServerBootStrap

(1)group:设置线程模型,Reactor线程模型对比EventLoopGroup

  • 先看一下ServerBootStrap的group方法
  • 双参group方法

044562af8ab343949de6dd02fe916221.jpg

  • 单参group方法
  • 044562af8ab343949de6dd02fe916221.jpg
  • 单线程方式

bb3d656994524c218434c2e2f7607cc8.jpg

  • 多线程方式8cbb4d4163e4420dba5a674b2fca056e.jpg


  • 主从线程方式a2f992a5088242d783322b30ea4667ca.jpg


(2)channel:设置channel通道类型NioServerSocketChannel、OioServerSocketChannel

  • 用来设置交互模型的,5种IO模型

(3)option:作用于每个新建立的channel,设置TCP连接中的一些参数

  • ChannelOption.SO_BACKLOG:存放已完成三次握手的请求的等待队列的最大长度。
  • 设置ChannelOption.SO_BACKLOG参数

5168c79a64db4d128312499fe8aeaf50.jpg

  • Linux服务器TCP连接底层:
  • syn queue:半连接队列,tcp_max_syn_backlog
  • accept_queue:全连接队列,net.core.somaxconn
  • 系统默认的somaxconn参数要足够大,如果backlog比somaxconn大,则会优先用后者。


97d6e017e6034668997ff7557cfa53a9.jpg

  • ChannelOption.TCP_NODELAY:为解决Nagle算法的问题,默认是false,要求高实时性,有数据时马上发送,就将该选项设置为true打开Nagle算法。

(4)childOption: 作用于被accept之后的连接

(5)childHandler:用于对每个通道里面的数据处理

06fa5d46846749ecbbb6dbd356bea271.jpg

2、客户端启动引导类Bootstrap

67460bc686894741a7ab9b8a8ae797c6.jpg

3、客户端与服务端的引导过程

(1)引导服务器


f56f6c25a7c54882a7ce87f6e859c9a1.jpg

(2)引导客户端


f48f1899374d41fdb58c32c994e42ad4.jpg

4.4.Netty核心组件Channel

  • 什么是Channel:客户端和服务器建立连接的一个连接通道
  • 什么是ChannelHandler:负责Channel的逻辑处理
  • 什么是ChannelPipeline:负责管理ChannelHandler的有序容器
  • 三者的关系:
  • 一个Channel包含一个ChannelPipeline,所有ChannelHandler都会顺序加入到
  • ChannelPipeline中创建Channel时会自动创建一个ChannelPipeline,每个Channel都有一个管理它的pipeline,这关联是永久性的。
  • Channel当状态出现的变化,就会触发对应的事件
  • channelRegistered:channel注册到一个EventLoop
  • channelActive:变为活跃状态(连接到了远程主机),可以接收和发送数据
  • channelInactive:channel处于非活跃状态,没有连接到远程主机
  • channelUnregistered:channel已经创建,但是未注册到一个EventLoop里面,也就是没有和Selector绑定。


4.5.ChannelHander接口详解

1、ChannelHandler的生命周期

接口ChannelHandler定义的生命周期相关定义如下表。这些操作在ChannelHandler被添加到一个ChannelPipline,或者从一个ChannelPipeline中移除时被调用,每一个方法都有一个ChannelHandlerContext作为参数。

类型 描述
handlerAdded 当ChannelHandler被添加到一个ChannelPipeline时被调用
handlerRemoved 当ChannelHandler从一个ChannelPipeline中移除时被调用


exceptionCaught 处理过程中ChannelPipeline中发生错误时被调用

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JP1ksSGR-1667215373972)(images/4.2(2).jpg)]

Netty定义了下面两个重要的ChannelHandler子接口

  • ChannelInboundHandler:处理输入数据和所有类型的状态变化
  • ChannelOutboundHandler:处理输出数据,可以拦截所有操作。
  • 2、ChannelInboundHandler接口

ChannelInboundHandler方法

类型 描述
channelRegistered 当一个Channel注册到EventLoop上调用的方法
channelUnregistered 当一个Channel从它的EventLoop上解除注册,不再处理I/O时被调用
channelActive 当Channel变成活跃状态时被调用;Channel是连接/绑定、就绪的
channelInactive 当Channel离开活跃状态,不再连接到某个远端时被调用
channelReadComplete 当Channel上某个读操作完成时调用
channelRead 当从Channel中读数据时被调用
userEventTriggered 因某个POJO穿过ChannelPipeline引发ChannelnboundHandler.fireUserEventTriggered()时被调用

示例:

f9c9dfaa4d00469bbbe8a8d5db75d7c4.jpg

当一个ChannelInboundHandler实现类重写channelRead()方法时,它负责释放ByteBuf的内存。Netty为此提供了工具方法,ReferenceCountUtil.release()。

e0e93ed17813463381b19e1ff14c9567.jpg

对未释放的资源,Netty会打印一个警告(WARN-level)的日志消息,让你很方便的查找代码中的问题,但是用这种方式管理资源有些麻烦,还有一个更简单的替代方法就是用SimpleChannelInboundHandler


f86957ede41b44528a6e13a134da4fac.jpg

因为SimpleChannelInboundHandler自动释放资源,任何对消息的引用都会变成无效,所以你不能保存这些引用在后面的ChannelHandler中再次使用。

3、ChannelOutboundHandler接口

输出的操作和数据由ChannelOutBoundHandler处理。它的方法可以被Channel,ChannelPipeline和ChannelHandlerContext调用。

ChannelOutboundHandler有一个强大的功能,可可以按需推迟一个操作,这使得处理请求可以用到更为复杂的策略。比如,如果写数据到远端被暂停,你可以推迟flush操作,稍后再试。

ChannelOutboundHandler方法


image.png


ChannelOutboundHander的大部分方法都用了一个ChannelPromise输入参数,用于当操作完成时收到通知。ChannelPromise是ChannelFuture的子接口,定义了可写的方法,比如setSuccess(),或者setFailure(),而ChannelFuture则是不可变的对象。

4、ChannelHandler适配器类

刚开始尝试些写你自己的ChannelHandler时,你可以用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter这两个类。

这两个适配器类分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现。它们继承了共同的父接口ChannelHandler的方法,扩展了抽象类ChannelHandlerAdapter。

ChannelHandlerAdapter类层级关系


d5e6004872884218b57c9c94a8aa0000.jpg

ChannelHandlerAdapter还提供了一个工具方法isSharable()。如果实现类带有@Sharable注解,那么这个方法就会返回true,意味着这个对象可以被添加到多个ChannelPipeline中。

ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中的方法调用相关ChannelHandlerContext中的等效方法,一次将事件转发到管道中的下一个ChannelHandler。

5、资源管理

无论何时你对数据操作ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write(),你需要确保没有资源泄露。Netty采用计数来处理ByteBuf池。所以在你用完一个ByteBuf后,调整引用计数的值是很重要的。


为了帮助检测潜在问题,Netty提供了ResourceLeakDetector类,它通过采样应用程序1%的buffer分配来检查是否有内存泄露。这个过程开销是很小的。


如果泄露被检测到,会产生类似下面的日志消息:

99be3a9013d74eafafebeef2ea3d124b.jpg

内存泄露检测级别

级别 描述
DISABLED 关闭内存泄露检测。只有在大量测试后,才能引用这个级别。
SIMPLE 报告默认的1%采样率中发现的任何泄露。
ADVANCED 报告发现的泄露和消息的位置。使用默认的采样率。
PARANOID 类似ADVANCED级别,但是每个消息的获取都被检测采样。这对性能有很大影响,只能在调试阶段使用。

在你实现ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()时,你怎样用这个诊断工具来防止内存泄露呢。让我们来看看当前handler没有通过ChannelContext.fireChannelRead()把消息传递到下一个ChannelInboundHandler。下面的代码说明了如何释放这条消息占用的内存。


845f7d3ba0f4451faf41a6922f51e93e.jpg


因为读取和释放输入消息是一个非常罕见的任务,Netty提供了一个特殊的ChannelInboundHandler实现类-SimpleChannelInboundHandler。一条消息在被SimplateChannelInboundHandler的channelRead0()读取后,会被自动释放资源。


在输出方向,如果你处理一个write()操作并且丢弃一条消息(没有写入Channel),你就应该负责释放这条消息。下面代码实现丢弃所有待写的数据。

50de25837b1f409cbf25ece76d5ed6af.jpg



重要的是,不仅要释放资源,而且要通知ChannelPromise,否则会出现某个ChannelFutureListener没有被告知消息已被处理的情况。


总之,如果一个消息被“消费”或者丢弃,没有送到ChannelPipeline中的下一个ChannelOutboundHandler,用户就要负责调用ReferenceCountUtil.release()。如果消息到达了真正的传输层,在它被写到socket中或者Channel关闭时,会被自动释放(这种情况下用户就不用管了)。

4.6.ChannelPipeline接口详解

1、ChannelPipeline接口简介

如果你把ChannelPipeline看成是一ChannelHandler实例,连接穿过Channel的输入输出event,那么就很容易理解,每个新创建的Channel都会分配一个新的ChannelPipeline,这个关系是恒定的,Channel不可以换别的ChannelPipeline,也不可以解除当前分配的ChannelPipeline。

根据来源一个event可以被一个ChannelInboundHandler或者ChannelOutboundHandler处理,接下来通过调用ChannelHandlerContext的方法,

它会被转发到下一个同类型的handler。

2、ChannelHandlerContext

**ChannelHandlerContext让一个ChannelHandler可以和它的ChannelPipeline和其他的Handler交互。**通过ChannelHandlerContext,一个handler可以通知ChannelPipeline中的下一个ChannelHandler。甚至动态改变他所属ChannelPipeline。


277eda71807545f1b52a259e621160b1.jpg

3、修改一个ChannelPipeline

一个ChannelHandler可以通过增加,删除或者替换其他ChannelHandler来改变一个ChannelPipeline的布局。这是ChannelHandler最重要的特性之一。

改动ChannelPipeline的ChannelHandler方法

方法名 描述
addFirst / addBefore / addAfter / addLast 添加一个ChannelHandler到ChannelPipeline
remove 从ChannelPipeline中删除一个ChannelHandler
replace 用一个ChannelHandler替换ChannelPipeline中的另一个ChannelHandler


933b947d775c44529a74e5860b2d5f35.jpg

4、ChannelPipeline获取ChannelHandler的操作

方法名 描述
get 通过类型或者名字返回一个ChannelHandler
context 返回ChannelHandler绑定的ChannelHandlerContext
names 返回ChannelPipeline中所有ChannelHandler的名字

5、ChannelPipeline输入方法

ChannelPipeline API提供了一些调用输入和输出操作的额外方法。用来通知ChannelInboundHandler在ChannelPipline发生的event。

image.png



535ddbf0141a4fcd9f9b73846b27e424.jpg

实现类为DefaultChannelPipeline中具体实现

011337d318874947a916ec9008019da2.jpg

6、ChannelPipeline输出方法

image.png

7、总结

  • ChannelPipeline保存了所有Channel相关的ChannelHandler
  • 可以通过增加和减少ChannelHandler来动态修改ChannelPipeline
  • ChannelPipeline有大量的API,用来对输入输出做出响应行动

4.7.ChannelHandlerContext接口详解

1、ChannelHandlerContext接口简介

ChannelHandlerContext代表了一个ChannelHandler和一个ChannelPipeline之间的关系,它在ChannelHandler被添加到ChannelPipeline时被创建。ChannelHandlerContext的主要功能是管理它对应的ChannelHandler和属于同一个ChannelPipeline的其他ChannelHandler之间的交互。


ChannelHandlerContext有很多方法,其中一些方法和Channel与ChannelPipeline中相同,但是还是有些区别的,如果你在Channel或者ChannelPipeline实例上调用这些方法,他们会贯穿整个pipeline,但是ChannelHandlerContext中调用相同的方法,仅仅从当前ChannelHandler开始,走到pipeline中下一个可以处理这个event的ChannelHandler。


一个ChannelHandler绑定的ChannelHandlerContext 永远不会改变,所以把它的引用缓存起来是安全的。

2、ChannelHandlerContext的使用

25584cb9c9ee4d01890b9b26584cef7e.jpg

通过Channel或者ChannelPipeline传递event

8ece67f5fd3847e087a9ea2c955e7ac2.jpg


通过ChannelHandlerContext触发event流操作

3c54b1a706d848909299a9b3a1b660f4.jpg

4.8.多个入站出战ChannelHandler的执行顺序

1、一般项目中,InboundHandler和OutboundHandler有多个,在Pipeline中执行顺序?

  • InboundHandler顺序执行,OutboundHandler逆序执行。
  • 2、测试执行顺序

(1)按照InboundHandler1、InboundHandler2、OutboundHandler1、OutboundHandler2顺序存放

socketChannel.pipeline().addLast(new InboundHandler1());
socketChannel.pipeline().addLast(new InboundHandler2());
socketChannel.pipeline().addLast(new OutboundHandler1());
socketChannel.pipeline().addLast(new OutboundHandler2());

98563f62603d44189aebd986154b9f47.jpg

ChannelHandlerContext不会贯穿整个pipeline,寻找到要处理的Handler开始处理。

9dbd00f87bda4ca1895aece6ea216f5e.jpg

(2)按照OutboundHandler1、OutboundHandler2、InboundHandler1、InboundHandler2顺序存放

socketChannel.pipeline().addLast(new OutboundHandler1());
socketChannel.pipeline().addLast(new OutboundHandler2());
socketChannel.pipeline().addLast(new InboundHandler1());
socketChannel.pipeline().addLast(new InboundHandler2());

92d8e570e9ba4d7d9218a74a0ce5c167.jpg

(3)按照InboundHandler1、InboundHandler2、OutboundHandler1、OutboundHandler2顺序存放,用pipeline调用

socketChannel.pipeline().addLast(new InboundHandler1());
socketChannel.pipeline().addLast(new InboundHandler2());
socketChannel.pipeline().addLast(new OutboundHandler1());
socketChannel.pipeline().addLast(new OutboundHandler2());

963712672257435c99e9f56f123ad424.jpg

3、结论

  • InboundHandler顺序执行,OutboundHandler逆序执行
  • InboundHandler顺序执行,OutboundHandler逆序执行

InboundHandler之间传递数据,通过ctx.fireChannelRead(msg)

InboundHandler通过ctx.write(msg),则会传递outboudnHandler

使用ctx.write(msg)传递消息,Inbound需要放在结尾,在Outbound之后,不然outboundhandler会不执行,但是使用channel.write(msg)、pipeline.write(msg)情况会不一致,都会执行。

outBound和Inbound谁先执行,针对客户端和服务端而言,客户端是发起请求在接收数据,先outbound在inbound,服务端则相反。

4.9.Netty异步回调模式-Future和Promise

1、Future简介


我们知道Netty的I/O操作都是异步的,例如bind、connect、write等操作,会返回一个Future接口。Netty源码中大量使用了异步回调处理模式。在做Netty应用开发时,我们也会用到,所以了解Netty的异步监听,无论是做Netty应用开发还是阅读源码都是十分重要的。

2、Future接口剖析


ed53c3b7332547ccbefdbb3b7b3f5a37.jpg

可知:ChannelFuture继承了Netty的Future,Netty的Future继承了JDK的Future

JDK的Future,用的最多的就是线程池ThreadPoolExecutor,通过submit方法提交任务返回一个Future实例,通过它来查询任务的执行状态和执行结果,最常用的方法就是isDone()和get()。

Netty的Future,在继承JDK的Future基础上,扩展了自己的方法:

public interface Future<V> extends java.util.concurrent.Future<V> {
    // 任务执行成功,返回true
    boolean isSuccess();
    // 任务被取消,返回true
    boolean isCancellable();
    // 支付执行失败,返回异常
    Throwable cause();
    // 添加Listener,异步非阻塞处理回调结果
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // 移除Listener
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // 同步阻塞等待任务结束;执行失败话,会将“异常信息”重新抛出来
    Future<V> sync() throws InterruptedException;
    Future<V> syncUninterruptibly();
    // 同步阻塞等待任务结束,和sync方法一样,只不过不会抛出异常信息
    Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);
    // 非阻塞,获取执行结果
    V getNow();
    // 取消任务
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

我们知道JDK的Future的任务结果获取需要主动查询,而Netty的Future通过添加监听器Listener,可以做到异步非阻塞处理任务结果,可以称为被动回调

同步阻塞有两种方式:sync和await(),区别:sync()方法在任务失败后,会把异常信息抛出,await()方法对异常信息不做任何处理,当我们不关心异常信息的时候可以用await(),通过阅读源码我们知道sync()方法其实里面调的就是await()方法。



0821e2ca6fe5473d821f681250a8ca25.jpg

Future接口没有和IO操作关联在一起,Future的子接口ChannelFuture,它和IO操作中的channel通道关联在一起了,用于异步处理channel事件,这个接口用的最多。

public interface ChannelFuture extends Future<Void> {
    // 获取channel通道
    Channel channel();
    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture sync() throws InterruptedException;
    @Override
    ChannelFuture syncUninterruptibly();
    @Override
    ChannelFuture await() throws InterruptedException;
    @Override
    ChannelFuture awaitUninterruptibly();
    // 标记Futrue是否为Void,如果ChannelFuture是一个void的Future,不允许调// 用addListener(),await(),sync()相关方法
    boolean isVoid();
}

ChannelFuture接口相比父类Future接口,就增加了**channel()isVoid()**两个方法,其他方法都是覆盖父类的接口,没有别的扩展。

了解一下ChannelFuture的状态转换


fbef2b44e8354c33a70fbe257aa747d3.jpg

ChannelFutue就两种状态Uncompleted(未完成)Completed(完成),Completed包括三种,执行成功,执行失败和任务取消。注意:执行失败和任务取消都属于Completed


让我们来看一下Future的另一个子接口Promise,它是个可写的Future

public interface Promise<V> extends Future<V> {
    // 执行成功,设置返回值,并通知所有listener,如果已经设置,则会抛出异常
    Promise<V> setSuccess(V result);
    // 设置返回值,如果已经设置,返回false
    boolean trySuccess(V result);
    // 执行失败,设置异常,并通知所有listener
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    // 标记Future不可取消
    boolean setUncancellable();
    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> await() throws InterruptedException;
    @Override
    Promise<V> awaitUninterruptibly();
    @Override
    Promise<V> sync() throws InterruptedException;
    @Override
    Promise<V> syncUninterruptibly();
}

Future接口只提供了获取返回值的get()方法,不可设置返回值。

Promise接口在Future基础上,还提供了设置返回值和异常信息并立即通知listeners

而且,一旦setSuccess()或setFailure()后,那些await()或者sync()的线程就会从等待中返回。

接下来,让我们来看看ChannelFuture的可写的子接口ChannelPromise

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
    // 覆盖ChannelFuture接口
    @Override
    Channel channel();
    // 覆盖Promise接口
    @Override
    ChannelPromise setSuccess(Void result);
    ChannelPromise setSuccess();
    boolean trySuccess();
    @Override
    ChannelPromise setFailure(Throwable cause);
    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelPromise sync() throws InterruptedException;
    @Override
    ChannelPromise syncUninterruptibly();
    @Override
    ChannelPromise await() throws InterruptedException;
    @Override
    ChannelPromise awaitUninterruptibly();
    ChannelPromise unvoid();
}

ChannelPromise接口只是综合了ChannelFuture和Promise接口,没有新增的功能。

1dc3ab592ab640fb92f72a78acba7afb.jpg

3、Promise的实现类

DefaultPromise的主要属性

// setSuccess设置result为null时,设置成SUCCESS
private static final Object SUCCESS = new Object();
// 不可取消值
private static final Object UNCANCELLABLE = new Object();
// 取消异常信息持有者
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
        StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
//执行结果,使用volatile修饰,保证可见性
private volatile Object result;
// 当Promise执行完成时需要通知Listener,此时就使用这个executor
private final EventExecutor executor;
// 要通知的listener,因为它可能是不同的类型,所以定义为Object类型,使用时再判断类型
private Object listeners;
// 要使用wait()/notifyAll()机制,这个变量记录了waiter的数量
private short waiters;
// 是否正在通知listener
private boolean notifyingListeners;

从属性中可以看出, DefaultPromise 通过 Object 的 wait/notify 机制实现线程间的同步,通过 volatile 属性保证线程间的可见性。

@Override
public Promise<V> setSuccess(V result) {
    // 第一次成功设置返回值后,返回Promise对象
    if (setSuccess0(result)) {
        return this;
    }
    // 否则抛出异常
    throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
    // result为null,设置为SUCCESS
    return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
    // CAS设置result值,只有当result为null或者UNCANCELLABLE,才可以执行成功
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        // 唤醒等待的waiter,同时判断是否存在listener
        if (checkNotifyWaiters()) {
            // 通知所有的listener
            notifyListeners();
        }
        return true;
    }
    return false;
}
private synchronized boolean checkNotifyWaiters() {
    if (waiters > 0) {
        // 通知所有waiters
        notifyAll();
    }
    // 判断是否添加了监听者listeners
    return listeners != null;
}
private void notifyListeners() {
    EventExecutor executor = executor();
    // 是否是同一个线程
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }
    // 用自己的executor执行
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

DefaultPromise的方法实现逻辑挺简单,不再全部讲解了,只需知道通过 Object 的 wait/notify 机制实现线程间的同步观察者设计模式进行通知就可以了。

DefaultPromise还有个子类DefaultChannelPromise,这个类在Netty中用的最多的,其内部逻辑调用的都是父类DefaultPromise的方法。DefaultChannelPromise类层次结构图如下:

b6a54b88f9654b0e92e0f1cfafd21158.jpg

Netty的Future继承JDK的Future,通过Object的wait/notify机制,实现了线程间的同步,使用观察者设计模式,实现了异步非阻塞回调处理。


相关文章
|
2月前
|
关系型数据库 MySQL 数据库
6-2|测试连接数据库的命令
6-2|测试连接数据库的命令
|
3月前
|
消息中间件 测试技术 RocketMQ
docker部署RockerMQ单机测试环境
docker部署RockerMQ单机测试环境
百万并发连接的实践测试02
百万并发连接的实践测试02
|
3月前
|
网络协议 Ubuntu
百万并发连接的实践测试01
百万并发连接的实践测试01
|
3月前
|
JavaScript 前端开发 应用服务中间件
【qkl】JavaScript连接web3钱包,实现测试网络中的 Sepolia ETH余额查询、转账功能
【区块链】JavaScript连接web3钱包,实现测试网络中的 Sepolia ETH余额查询、转账功能
|
5月前
|
NoSQL Redis 数据安全/隐私保护
连接测试服务器redis
连接测试服务器redis
|
5月前
|
关系型数据库 分布式数据库 数据库
PolarDB产品使用问题之如何解决测试连接时出现2003-Can't connect的问题
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
22天前
|
JSON 算法 数据可视化
测试专项笔记(一): 通过算法能力接口返回的检测结果完成相关指标的计算(目标检测)
这篇文章是关于如何通过算法接口返回的目标检测结果来计算性能指标的笔记。它涵盖了任务描述、指标分析(包括TP、FP、FN、TN、精准率和召回率),接口处理,数据集处理,以及如何使用实用工具进行文件操作和数据可视化。文章还提供了一些Python代码示例,用于处理图像文件、转换数据格式以及计算目标检测的性能指标。
36 0
测试专项笔记(一): 通过算法能力接口返回的检测结果完成相关指标的计算(目标检测)
|
2月前
|
移动开发 JSON Java
Jmeter实现WebSocket协议的接口测试方法
WebSocket协议是HTML5的一种新协议,实现了浏览器与服务器之间的全双工通信。通过简单的握手动作,双方可直接传输数据。其优势包括极小的头部开销和服务器推送功能。使用JMeter进行WebSocket接口和性能测试时,需安装特定插件并配置相关参数,如服务器地址、端口号等,还可通过CSV文件实现参数化,以满足不同测试需求。
200 7
Jmeter实现WebSocket协议的接口测试方法
|
2月前
|
JSON 移动开发 监控
快速上手|HTTP 接口功能自动化测试
HTTP接口功能测试对于确保Web应用和H5应用的数据正确性至关重要。这类测试主要针对后台HTTP接口,通过构造不同参数输入值并获取JSON格式的输出结果来进行验证。HTTP协议基于TCP连接,包括请求与响应模式。请求由请求行、消息报头和请求正文组成,响应则包含状态行、消息报头及响应正文。常用的请求方法有GET、POST等,而响应状态码如2xx代表成功。测试过程使用Python语言和pycurl模块调用接口,并通过断言机制比对实际与预期结果,确保功能正确性。
214 3
快速上手|HTTP 接口功能自动化测试