【Netty】Netty 异步任务模型 及 Future-Listener 机制

简介: 【Netty】Netty 异步任务模型 及 Future-Listener 机制

文章目录

一、 Netty 模型

二、 异步模型

三、 Future-Listener 机制

四、 Future-Listener 机制代码示例





一、 Netty 模型


以服务器端为例




1 . 线程池 : Netty 模型核心就是两个线程池 , BossGroup 线程池 和 WorkerGroup 线程池 ;



① BossGroup 线程池 : 负责维护客户端连接操作 ;


② WorkerGroup 线程池 : 负责与客户端进行数据交互 ;


③ 线程池类型 : 上述两个线程池 ( BossGroup / WorkerGroup ) 都是 NioEventLoopGroup 类型的 ;


④ 线程池中的线程 : NioEventLoopGroup 线程池中维护了多个 NioEventLoop 线程 ;




2 . 线程池中的线程 : NioEventLoopGroup 线程池中维护了若干 NioEventLoop 线程 , 这相当于主从反应器 ( Reactor ) 模型中的反应器 , 每个 NioEventLoop 中都有一个 选择器 ( Selector ) , 用于监听 Socket IO 事件 , 如 建立连接 , 数据读写 等 ;




3 . NioEventLoop 工作流程 :



NioEventLoop 中可以按照一定顺序进行数据处理 , 如数据到来后 , 按照下面的流程执行一系列操作 ;


读取数据 -> 数据解码 -> 业务逻辑处理 -> 数据编码 -> 数据发送




4 . NioEventLoop 中封装内容 :


选择器 Selector

任务队列 TaskQueue

调度任务队列 ScheduleTaskQueue

NIO 通道 NioChannel

管道 ChannelPipeline



上面是 Netty 的模型的总体架构 , 下面重点介绍 Netty 模型中的异步模型 , Netty 中的每次绑定端口 , 连接远程端口 , 读写数据都要涉及到异步操作 ;






二、 异步模型


1 . 异步操作概念 : 调用者调用一个异步操作后 , 并不能马上知道该操作的返回值 , 该操作也不会马上执行完成 , 该操作完成后 , 会通过回调机制 , 如 通知 , 注册的回调函数等机制通知调用者 ;




2 . Netty 中的异步操作与 ChannelFuture 返回值 :



① 异步操作 : Netty 模型中凡是关于 IO 的操作 , 如绑定端口 ( Bind ) , 远程连接 ( Connect ) , 读取数据 ( Read ) , 写出数据 ( Write ) 等操作都是异步操作 ;


② 异步操作返回值 : 上述 IO 操作返回值都是 ChannelFuture 类型实例 , ChannelFuture 是异步 IO 操作的返回结果 ;


③ 在服务器端绑定端口号时 , 调用 Bootstrap 的 bind 方法 , 会返回 ChannelFuture 对象 ;


④ 在客户端调用 Bootstrap 的 connect 方法 , 也会返回 ChannelFuture 对象 ;




3 . Netty 中的异步操作机制 :



① Future-Listener 机制 : Future 表示当前不知道结果 , 在未来的某个时刻才知道结果 , Listener 表示监听操作 , 监听返回的结果 ;


② Netty 异步模型的两个基础 : Future ( ChannelFuture 未来知道结果 ) , Callback ( 监听回调 ) ;




4 . 以客户端写出数据到服务器端为例 :



客户端写出数据 : 客户端调用写出数据方法 ChannelFuture writeAndFlush(Object msg) , 向服务器写出数据 ;


操作耗时 : 假设在服务器中接收到该数据后 , 要执行一个非常耗时的操作才能返回结果 , 就是操作非常耗时 ;


客户端不等待 : 客户端这里写出了数据 , 肯定不能阻塞等待写出操作的结果 , 需要立刻执行下面的操作 , 因此该方法是异步的 ;


客户端监听 : writeAndFlush 方法返回一个 ChannelFuture 对象 , 如果客户端需要该操作的返回结果 , 那么通过 ChannelFuture 可以监听该写出方法是否成功 ;




5 . 异步操作返回结果 :



① 返回结果 : Future 表示异步 IO 操作执行结果 , 通过该 Future 提供的 检索 , 计算 等方法检查异步操作是否执行完成 ;


② 常用接口 : ChannelFuture 继承了 Future , 也是一个接口 , 可以为该接口对象注册监听器 , 当异步任务完成后会回调该监听器方法 ;


public interface ChannelFuture extends Future<Void>




6 . Future 链式操作 : 这里以读取数据 , 处理后返回结果为例 ;



数据读取操作 ;


对读取的数据进行解码处理 ;


执行业务逻辑


将数据编码 ;


将编码后的数据写出 ;


上述 5 55 个步骤 , 每个数据处理操作 , 都有与之对应的 Handler 处理器 ;


异步机制 : 在 Handler 处理器中需要实现异步机制 , 一般使用 Callback 回调 , 或 Future 机制 ;



链式操作优势 : 上述的链式操作 , 简洁 , 高效 , 可以让开发者快速开发高性能 , 高可靠性服务器 , 只关注业务逻辑 , 不用过多的将精力浪费在网络基础功能开发上 ;


这里的网络基础功能就是高可靠性 , 高性能的网络传输模块 ;






三、 Future-Listener 机制


1 . Future-Listener 机制 :



① Future 返回值 : 在 Netty 中执行 IO 操作 , 如 bind , read , write , connect 等方法 , 会立刻返回 ChannelFuture 对象 ;


② ChannelFuture 返回时状态 : 调用 IO 方法后 , 立刻返回 ChannelFuture 对象 , 此时该操作未完成 ;


③ 注册监听器 : ChannelFuture 可以设置 ChannelFutureListener 监听器 , 监听该 IO 操作完成状态 , 如果 IO 操作完成 , 那么会回调其 public void operationComplete(ChannelFuture future) throws Exception 接口实现方法 ;


④ IO 操作执行状态判定 : 在 operationComplete 方法中通过 调用 ChannelFuture future 参数的如下方法 , 判定当前 IO 操作完成状态 ;


future.isDone() : IO 操作是否完成 ;

future.isSuccess() : IO 操作是否成功 ; ( 常用 )

future.isCancelled() : IO 操作是否被取消 ;

future.cause() : IO 操作的失败原因 ;



2 . IO 操作的同步与异步 :



① 同步 IO 操作 : BIO 中的同步 IO 操作 , 会阻塞当前的线程 , IO 操作返回前 , 处于阻塞状态 , 不能执行其它操作 ;


② 异步 IO 操作 : 异步 IO 操作不会阻塞当前的线程 , 调用 IO 操作之后 , 可以立即执行其它操作 , 不会阻塞当前线程 , 该机制非常适用于高并发的场景 , 开发稳定 , 并发 , 高吞吐量 的服务器 ;




3 . 核心代码示例 :


// 监听绑定操作的结果
// 添加 ChannelFutureListener 监听器, 监听 bind 操作的结果
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if(future.isDone()){
            System.out.println("绑定端口完成");
        }
        if(future.isSuccess()){
            System.out.println("绑定端口成功");
        }else{
            System.out.println("绑定端口失败");
        }
        if(future.isCancelled()){
            System.out.println("绑定端口取消");
        }
        System.out.println("失败原因 : " + future.cause());
    }
});



四、 Future-Listener 机制代码示例


1 . 代码示例 : 这里以服务器程序为例 , 客户端程序就不贴了 ;


package kim.hsl.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Netty 案例服务器端
 */
public class Server {
    public static void main(String[] args) {
        // 1. 创建 BossGroup 线程池 和 WorkerGroup 线程池, 其中维护 NioEventLoop 线程
        //     NioEventLoop 线程中执行无限循环操作
        // BossGroup 线程池 : 负责客户端的连接
        // 指定线程个数 : 客户端个数很少, 不用很多线程维护, 这里指定线程池中线程个数为 1
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // WorkerGroup 线程池 : 负责客户端连接的数据读写
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 2. 服务器启动对象, 需要为该对象配置各种参数
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup) // 设置 主从 线程组 , 分别对应 主 Reactor 和 从 Reactor
                .channel(NioServerSocketChannel.class)  // 设置 NIO 网络套接字通道类型
                .option(ChannelOption.SO_BACKLOG, 128)  // 设置线程队列维护的连接个数
                .childOption(ChannelOption.SO_KEEPALIVE, true)  // 设置连接状态行为, 保持连接状态
                .childHandler(  // 为 WorkerGroup 线程池对应的 NioEventLoop 设置对应的事件 处理器 Handler
                        new ChannelInitializer<SocketChannel>() {// 创建通道初始化对象
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                // 该方法在服务器与客户端连接建立成功后会回调
                                // 为 管道 Pipeline 设置处理器 Hanedler
                                // 这里暂时设置为 null , 执行不会失败 , 服务器绑定端口会成功
                                ch.pipeline().addLast(null);
                            }
                        }
                );
        System.out.println("服务器准备完毕 ...");
        ChannelFuture channelFuture = null;
        try {
            // 绑定本地端口, 进行同步操作 , 并返回 ChannelFuture
            channelFuture = bootstrap.bind(8888).sync();
            System.out.println("服务器开始监听 8888 端口 ...");
    // ( 本次示例核心代码 ) ----------------------------------------------------------
            // 监听绑定操作的结果 ( 本次示例核心代码 )
            // 添加 ChannelFutureListener 监听器, 监听 bind 操作的结果
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if(future.isDone()){
                        System.out.println("绑定端口完成");
                    }
                    if(future.isSuccess()){
                        System.out.println("绑定端口成功");
                    }else{
                        System.out.println("绑定端口失败");
                    }
                    if(future.isCancelled()){
                        System.out.println("绑定端口取消");
                    }
                    System.out.println("失败原因 : " + future.cause());
                }
            });
            // ( 本次示例核心代码 ) ----------------------------------------------------------
            // 关闭通道 , 开始监听操作
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 出现异常后, 优雅的关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}




2 . 执行结果 : 执行上述服务器 , 由此可见 绑定 bind 操作执行完成 , 并且执行成功 , 没有失败 , 因此失败原因为 null ;

image.png


目录
相关文章
|
缓存 网络协议 Dubbo
异步编程 - 12 异步、基于事件驱动的网络编程框架 Netty
异步编程 - 12 异步、基于事件驱动的网络编程框架 Netty
82 0
|
3月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
2月前
|
存储 机器人 Linux
Netty(二)-服务端网络编程常见网络IO模型讲解
Netty(二)-服务端网络编程常见网络IO模型讲解
|
5月前
|
监控 网络协议 Java
Java一分钟之-Netty:高性能异步网络库
【6月更文挑战第11天】Netty是Java的高性能异步网络框架,基于NIO,以其高吞吐量、低延迟、灵活性和安全性受到青睐。常见问题包括内存泄漏、ChannelHandler滥用和异常处理不当。要规避这些问题,需正确释放ByteBuf,精简ChannelPipeline,妥善处理异常,并深入理解Netty原理。通过代码审查、遵循最佳实践和监控日志,可提升代码质量和性能。掌握Netty,打造高效网络服务。
86 2
|
4月前
|
安全 NoSQL Java
网络安全-----Redis12的Java客户端----客户端对比12,Jedis介绍,使用简单安全性不足,lettuce(官方默认)是基于Netty,支持同步,异步和响应式,并且线程是安全的,支持R
网络安全-----Redis12的Java客户端----客户端对比12,Jedis介绍,使用简单安全性不足,lettuce(官方默认)是基于Netty,支持同步,异步和响应式,并且线程是安全的,支持R
|
6月前
|
消息中间件 监控 Java
滴滴面试:谈谈你对Netty线程模型的理解?
Netty 线程模型是指 Netty 框架为了提供高性能、高并发的网络通信,而设计的管理和利用线程的策略和机制。 **Netty 线程模型被称为 Reactor(响应式)模型/模式,它是基于 NIO 多路复用模型的一种升级,它的核心思想是将 IO 事件和业务处理进行分离,使用一个或多个线程来执行任务的一种机制。** ## 1.**Reactor三大组件** Reactor 包含以下三大组件: ![image.png](https://cdn.nlark.com/yuque/0/2024/png/92791/1717079218890-89000a00-48bc-4a1a-b87e-e1b6
66 2
|
6月前
|
存储 缓存 运维
时间轮奇妙旅程:深度解析Netty中的时间轮机制
时间轮奇妙旅程:深度解析Netty中的时间轮机制
251 1
|
6月前
|
前端开发 网络协议
启动异步之旅:探索Netty中Bootstrap的神奇世界
启动异步之旅:探索Netty中Bootstrap的神奇世界
40 0
|
6月前
|
前端开发 网络协议 Java
Netty入门指南:从零开始的异步网络通信
Netty入门指南:从零开始的异步网络通信
180 0
|
弹性计算 Java Unix
搭稳Netty开发的地基,用漫画帮你分清同步异步阻塞非阻塞
Netty Netty是一款非常优秀的网络编程框架,是对NIO的二次封装,本文将重点剖析Netty客户端的启动流程,深入底层了解如何使用NIO编程客户端。 Linux网络编程5种IO模型 根据UNIX网络编程对于IO模型的分类,UNIX提供了5种IO模型,分别是 阻塞IO 、 非阻塞IO、 IO复用 、 信号驱动IO 、 异步IO 。这几种IO模型在《UNIX网络编程》中有详解,这里作者只简单介绍,帮助大家回忆一下这几种模型。 对于Linux来说,所有的操作都是基于文件的,也就是我们非常熟悉的fd,在缺省的情况下,基于文件的操作都是 阻塞的 。下面就通过系统调用 recvfrom 来回顾下
110 0