【Netty】 异步任务调度 ( TaskQueue | ScheduleTaskQueue | SocketChannel 管理 )

简介: 【Netty】 异步任务调度 ( TaskQueue | ScheduleTaskQueue | SocketChannel 管理 )

文章目录

一、 任务队列 TaskQueue

二、 处理器 Handler 同步异步操作

三、 异步任务 ( 用户自定义任务 )

四、 异步任务 ( 用户自定义定时任务 )

五、 异步任务 ( 其它线程向本线程调度任务 )





一、 任务队列 TaskQueue


任务队列 TaskQueue 的任务 Task 应用场景 :



① 自定义任务 : 自己开发的任务 , 然后将该任务提交到任务队列中 ;


② 自定义定时任务 : 自己开发的任务 , 然后将该任务提交到任务队列中 , 同时可以指定任务的执行时间 ;


③ 其它线程调度任务 : 上面的任务都是在当前的 NioEventLoop ( 反应器 Reactor 线程 ) 中的任务队列中排队执行 , 在其它线程中也可以调度本线程的 Channel 通道与该线程对应的客户端进行数据读写 ;






二、 处理器 Handler 同步异步操作


在之前的 Netty 服务器与客户端项目中 , 用户自定义的 Handler 处理器 , 该处理器继承了 ChannelInboundHandlerAdapter 类 , 在重写的 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 方法中 , 执行的业务逻辑要注意以下两点 :


同步操作 : 如果在该业务逻辑中只执行一个短时间的操作 , 那么可以直接执行 ;

异步操作 : 如果在该业务逻辑中执行访问数据库 , 访问网络 , 读写本地文件 , 执行一系列复杂计算等耗时操作 , 肯定不能在该方法中处理 , 这样会阻塞整个线程 ; 正确的做法是将耗时的操作放入任务队列 TaskQueue , 异步执行 ;

在 ChannelInboundHandlerAdapter 的 channelRead 方法执行时 , 客户端与服务器端的反应器 Reactor 线程 NioEventLoop 是处于阻塞状态的 , 此时服务器端与客户端同时都处于阻塞状态 , 这样肯定不行 , 因为 NioEventLoop 需要为多个客户端服务 , 不能因为与单一客户端交互而产生阻塞 ;






三、 异步任务 ( 用户自定义任务 )


1 . 用户自定义任务流程 :



① 获取通道 : 首先获取 通道 Channel ;


② 获取线程 : 获取通道对应的 EventLoop 线程 , 就是 NioEventLoop , 该 NioEventLoop 中封装了任务队列 TaskQueue ;


③ 任务入队 : 向任务队列 TaskQueue 中放入异步任务 Runnable , 调用 NioEventLoop 线程的 execute 方法 , 即可将上述 Runnable 异步任务放入任务队列 TaskQueue ;




2 . 多任务执行 : 如果用户连续向任务队列中放入了多个任务 , NioEventLoop 会按照顺序先后执行这些任务 , 注意任务队列中的任务 是先后执行 , 不是同时执行 ;


顺序执行任务 ( 不是并发 ) : 任务队列任务执行机制是顺序执行的 ; 先执行第一个 , 执行完毕后 , 从任务队列中获取第二个任务 , 执行完毕之后 , 依次从任务队列中取出任务执行 , 前一个任务执行完毕后 , 才从任务队列中取出下一个任务执行 ;




3 . 代码示例 : 监听到客户端上传数据后 , channelRead 回调 , 执行 获取通道 -> 获取线程 -> 异步任务调度 流程 ;


/**
 * Handler 处理者, 是 NioEventLoop 线程中处理业务逻辑的类
 *
 * 继承 : 该业务逻辑处理者 ( Handler ) 必须继承 Netty 中的 ChannelInboundHandlerAdapter 类
 * 才可以设置给 NioEventLoop 线程
 *
 * 规范 : 该 Handler 类中需要按照业务逻辑处理规范进行开发
 */
public class ServerHandr extends ChannelInboundHandlerAdapter {
    /**
     * 读取数据 : 在服务器端读取客户端发送的数据
     * @param ctx
     *      通道处理者上下文对象 : 封装了 管道 ( Pipeline ) , 通道 ( Channel ), 客户端地址信息
     *      管道 ( Pipeline ) : 注重业务逻辑处理 , 可以关联很多 Handler
     *      通道 ( Channel ) : 注重数据读写
     * @param msg
     *      客户端上传的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 1 . 从 ChannelHandlerContext ctx 中获取通道
        Channel channel = ctx.channel();
        // 2 . 获取通道对应的事件循环
        EventLoop eventLoop = channel.eventLoop();
        // 3 . 在 Runnable 中用户自定义耗时操作, 异步执行该操作, 该操作不能阻塞在此处执行
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                //执行耗时操作
            }
        });
    }
}





四、 异步任务 ( 用户自定义定时任务 )



1 . 用户自定义定时任务 与 用户自定义任务流程基本类似 , 有以下两个不同之处 :



① 调度方法 :


定时异步任务使用 schedule 方法进行调度 ;

普通异步任务使用 execute 方法进行调度 ;

② 任务队列 :


定时异步任务提交到 ScheduleTaskQueue 任务队列中 ;

普通异步任务提交到 TaskQueue 任务队列中 ;



2 . 用户自定义定时任务流程 :



① 获取通道 : 首先获取 通道 Channel ;


② 获取线程 : 获取通道对应的 EventLoop 线程 , 就是 NioEventLoop , 该 NioEventLoop 中封装了任务队列 TaskQueue ;


③ 任务入队 : 向任务队列 ScheduleTaskQueue 中放入异步任务 Runnable , 调用 NioEventLoop 线程的 schedule 方法 , 即可将上述 Runnable 异步任务放入任务队列 ScheduleTaskQueue ;




3 . 代码示例 : 监听到客户端上传数据后 , channelRead 回调 , 执行 获取通道 -> 获取线程 -> 异步任务调度 流程 ;


/**
 * Handler 处理者, 是 NioEventLoop 线程中处理业务逻辑的类
 *
 * 继承 : 该业务逻辑处理者 ( Handler ) 必须继承 Netty 中的 ChannelInboundHandlerAdapter 类
 * 才可以设置给 NioEventLoop 线程
 *
 * 规范 : 该 Handler 类中需要按照业务逻辑处理规范进行开发
 */
public class ServerHandr extends ChannelInboundHandlerAdapter {
    /**
     * 读取数据 : 在服务器端读取客户端发送的数据
     * @param ctx
     *      通道处理者上下文对象 : 封装了 管道 ( Pipeline ) , 通道 ( Channel ), 客户端地址信息
     *      管道 ( Pipeline ) : 注重业务逻辑处理 , 可以关联很多 Handler
     *      通道 ( Channel ) : 注重数据读写
     * @param msg
     *      客户端上传的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 1 . 从 ChannelHandlerContext ctx 中获取通道
        Channel channel = ctx.channel();
        // 2 . 获取通道对应的事件循环
        EventLoop eventLoop = channel.eventLoop();
        // 3 . 在 Runnable 中用户自定义耗时操作, 异步执行该操作, 该操作不能阻塞在此处执行
        // schedule(Runnable command, long delay, TimeUnit unit)
        // Runnable command 参数 : 异步任务
        // long delay 参数 : 延迟执行时间
        // TimeUnit unit参数 : 延迟时间单位, 秒, 毫秒, 分钟
        eventLoop.schedule(new Runnable() {
            @Override
            public void run() {
                //执行耗时操作
            }
        }, 100, TimeUnit.MILLISECONDS);
    }
}






五、 异步任务 ( 其它线程向本线程调度任务 )


1 . 获取通道 Channel 即可调度异步任务 : 由上面的任务调度流程可知 , 只要获取到了本 NioEventLoop 线程对应的 Channel 通道 , 就可以获取该 NioEventLoop 线程的 EventLoop 事件调度器 , 向 ScheduleTaskQueue 或 TaskQueue 任务队列中加入异步任务 ;




2 . Channel 通道获取与管理 :



① Channel 通道获取 : 在服务器启动设置 ServerBootstrap 中 , 会设置 ChannelInitializer , 在与客户端的连接建立成功后 , 会回调 initChannel 方法 , 此时就会得到该客户端连接对应的通道 SocketChannel ;


② Channel 通道管理 : 在服务器中使用 Map 集合管理该 Channel 通道 , 需要时根据用户标识信息 , 获取该通道 , 向该客户端通道对应的 NioEventLoop 线程中调度任务 ;




3 . 代码示例 : 这里只展示一下 ChannelInitializer 的回调位置 , 不再详细描述怎么维护集合的过程了 , 自己定义 Map 集合维护 ;


// 服务器启动对象, 需要为该对象配置各种参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 设置 主从 线程组 , 分别对应 主 Reactor 和 从 Reactor
        .channel(NioServerSocketChannel.class)  // 设置 NIO 网络套接字通道类型
        .option(ChannelOption.SO_BACKLOG, 128)  // 设置线程队列维护的连接个数
        .childOption(ChannelOption.SO_KEEPALIVE, true)  // 设置连接状态行为, 保持连接状态
        .childHandler(  // 为 WorkerGroup 线程池对应的 NioEventLoop 设置对应的事件 处理器 Handler
                new ChannelInitializer<SocketChannel>() {// 创建通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 该方法在服务器与客户端连接建立成功后会回调
                        // 为 管道 Pipeline 设置处理器 Hanedler
                        ch.pipeline().addLast(new ServerHandr());
                    }
                }
        );



目录
相关文章
|
8月前
|
缓存 网络协议 Dubbo
异步编程 - 12 异步、基于事件驱动的网络编程框架 Netty
异步编程 - 12 异步、基于事件驱动的网络编程框架 Netty
52 0
|
6月前
|
缓存 编解码 网络协议
一文带你由浅入深Netty异步非阻塞世界
一文带你由浅入深Netty异步非阻塞世界
一文带你由浅入深Netty异步非阻塞世界
|
12天前
|
监控 网络协议 Java
Java一分钟之-Netty:高性能异步网络库
【6月更文挑战第11天】Netty是Java的高性能异步网络框架,基于NIO,以其高吞吐量、低延迟、灵活性和安全性受到青睐。常见问题包括内存泄漏、ChannelHandler滥用和异常处理不当。要规避这些问题,需正确释放ByteBuf,精简ChannelPipeline,妥善处理异常,并深入理解Netty原理。通过代码审查、遵循最佳实践和监控日志,可提升代码质量和性能。掌握Netty,打造高效网络服务。
18 2
|
1月前
|
前端开发 网络协议
启动异步之旅:探索Netty中Bootstrap的神奇世界
启动异步之旅:探索Netty中Bootstrap的神奇世界
19 0
|
1月前
|
前端开发 网络协议 Java
Netty入门指南:从零开始的异步网络通信
Netty入门指南:从零开始的异步网络通信
41 0
|
9月前
|
弹性计算 Java Unix
搭稳Netty开发的地基,用漫画帮你分清同步异步阻塞非阻塞
Netty Netty是一款非常优秀的网络编程框架,是对NIO的二次封装,本文将重点剖析Netty客户端的启动流程,深入底层了解如何使用NIO编程客户端。 Linux网络编程5种IO模型 根据UNIX网络编程对于IO模型的分类,UNIX提供了5种IO模型,分别是 阻塞IO 、 非阻塞IO、 IO复用 、 信号驱动IO 、 异步IO 。这几种IO模型在《UNIX网络编程》中有详解,这里作者只简单介绍,帮助大家回忆一下这几种模型。 对于Linux来说,所有的操作都是基于文件的,也就是我们非常熟悉的fd,在缺省的情况下,基于文件的操作都是 阻塞的 。下面就通过系统调用 recvfrom 来回顾下
75 0
|
1月前
|
前端开发 Java API
构建异步高并发服务器:Netty与Spring Boot的完美结合
构建异步高并发服务器:Netty与Spring Boot的完美结合
|
1月前
Netty Review - 借助SimpleTalkRoom初体验异步网络编程的魅力
Netty Review - 借助SimpleTalkRoom初体验异步网络编程的魅力
48 0
|
11月前
|
前端开发 JavaScript
Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊
Netty异步NIO框架(二)websocket 前端后端聊天 私聊及群聊
|
11月前
|
网络协议 前端开发 Java
Netty异步NIO框架(一)java服务端与客户端实现聊天 websocket通道
Netty异步NIO框架(一)java服务端与客户端实现聊天 websocket通道