文章目录
一、 任务队列 TaskQueue
二、 处理器 Handler 同步异步操作
三、 异步任务 ( 用户自定义任务 )
四、 异步任务 ( 用户自定义定时任务 )
五、 异步任务 ( 其它线程向本线程调度任务 )
一、 任务队列 TaskQueue
任务队列 TaskQueue 的任务 Task 应用场景 :
① 自定义任务 : 自己开发的任务 , 然后将该任务提交到任务队列中 ;
② 自定义定时任务 : 自己开发的任务 , 然后将该任务提交到任务队列中 , 同时可以指定任务的执行时间 ;
③ 其它线程调度任务 : 上面的任务都是在当前的 NioEventLoop ( 反应器 Reactor 线程 ) 中的任务队列中排队执行 , 在其它线程中也可以调度本线程的 Channel 通道与该线程对应的客户端进行数据读写 ;
二、 处理器 Handler 同步异步操作
在之前的 Netty 服务器与客户端项目中 , 用户自定义的 Handler 处理器 , 该处理器继承了 ChannelInboundHandlerAdapter 类 , 在重写的 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 方法中 , 执行的业务逻辑要注意以下两点 :
同步操作 : 如果在该业务逻辑中只执行一个短时间的操作 , 那么可以直接执行 ;
异步操作 : 如果在该业务逻辑中执行访问数据库 , 访问网络 , 读写本地文件 , 执行一系列复杂计算等耗时操作 , 肯定不能在该方法中处理 , 这样会阻塞整个线程 ; 正确的做法是将耗时的操作放入任务队列 TaskQueue , 异步执行 ;
在 ChannelInboundHandlerAdapter 的 channelRead 方法执行时 , 客户端与服务器端的反应器 Reactor 线程 NioEventLoop 是处于阻塞状态的 , 此时服务器端与客户端同时都处于阻塞状态 , 这样肯定不行 , 因为 NioEventLoop 需要为多个客户端服务 , 不能因为与单一客户端交互而产生阻塞 ;
三、 异步任务 ( 用户自定义任务 )
1 . 用户自定义任务流程 :
① 获取通道 : 首先获取 通道 Channel ;
② 获取线程 : 获取通道对应的 EventLoop 线程 , 就是 NioEventLoop , 该 NioEventLoop 中封装了任务队列 TaskQueue ;
③ 任务入队 : 向任务队列 TaskQueue 中放入异步任务 Runnable , 调用 NioEventLoop 线程的 execute 方法 , 即可将上述 Runnable 异步任务放入任务队列 TaskQueue ;
2 . 多任务执行 : 如果用户连续向任务队列中放入了多个任务 , NioEventLoop 会按照顺序先后执行这些任务 , 注意任务队列中的任务 是先后执行 , 不是同时执行 ;
顺序执行任务 ( 不是并发 ) : 任务队列任务执行机制是顺序执行的 ; 先执行第一个 , 执行完毕后 , 从任务队列中获取第二个任务 , 执行完毕之后 , 依次从任务队列中取出任务执行 , 前一个任务执行完毕后 , 才从任务队列中取出下一个任务执行 ;
3 . 代码示例 : 监听到客户端上传数据后 , channelRead 回调 , 执行 获取通道 -> 获取线程 -> 异步任务调度 流程 ;
/** * Handler 处理者, 是 NioEventLoop 线程中处理业务逻辑的类 * * 继承 : 该业务逻辑处理者 ( Handler ) 必须继承 Netty 中的 ChannelInboundHandlerAdapter 类 * 才可以设置给 NioEventLoop 线程 * * 规范 : 该 Handler 类中需要按照业务逻辑处理规范进行开发 */ public class ServerHandr extends ChannelInboundHandlerAdapter { /** * 读取数据 : 在服务器端读取客户端发送的数据 * @param ctx * 通道处理者上下文对象 : 封装了 管道 ( Pipeline ) , 通道 ( Channel ), 客户端地址信息 * 管道 ( Pipeline ) : 注重业务逻辑处理 , 可以关联很多 Handler * 通道 ( Channel ) : 注重数据读写 * @param msg * 客户端上传的数据 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 1 . 从 ChannelHandlerContext ctx 中获取通道 Channel channel = ctx.channel(); // 2 . 获取通道对应的事件循环 EventLoop eventLoop = channel.eventLoop(); // 3 . 在 Runnable 中用户自定义耗时操作, 异步执行该操作, 该操作不能阻塞在此处执行 eventLoop.execute(new Runnable() { @Override public void run() { //执行耗时操作 } }); } }
四、 异步任务 ( 用户自定义定时任务 )
1 . 用户自定义定时任务 与 用户自定义任务流程基本类似 , 有以下两个不同之处 :
① 调度方法 :
定时异步任务使用 schedule 方法进行调度 ;
普通异步任务使用 execute 方法进行调度 ;
② 任务队列 :
定时异步任务提交到 ScheduleTaskQueue 任务队列中 ;
普通异步任务提交到 TaskQueue 任务队列中 ;
2 . 用户自定义定时任务流程 :
① 获取通道 : 首先获取 通道 Channel ;
② 获取线程 : 获取通道对应的 EventLoop 线程 , 就是 NioEventLoop , 该 NioEventLoop 中封装了任务队列 TaskQueue ;
③ 任务入队 : 向任务队列 ScheduleTaskQueue 中放入异步任务 Runnable , 调用 NioEventLoop 线程的 schedule 方法 , 即可将上述 Runnable 异步任务放入任务队列 ScheduleTaskQueue ;
3 . 代码示例 : 监听到客户端上传数据后 , channelRead 回调 , 执行 获取通道 -> 获取线程 -> 异步任务调度 流程 ;
/** * Handler 处理者, 是 NioEventLoop 线程中处理业务逻辑的类 * * 继承 : 该业务逻辑处理者 ( Handler ) 必须继承 Netty 中的 ChannelInboundHandlerAdapter 类 * 才可以设置给 NioEventLoop 线程 * * 规范 : 该 Handler 类中需要按照业务逻辑处理规范进行开发 */ public class ServerHandr extends ChannelInboundHandlerAdapter { /** * 读取数据 : 在服务器端读取客户端发送的数据 * @param ctx * 通道处理者上下文对象 : 封装了 管道 ( Pipeline ) , 通道 ( Channel ), 客户端地址信息 * 管道 ( Pipeline ) : 注重业务逻辑处理 , 可以关联很多 Handler * 通道 ( Channel ) : 注重数据读写 * @param msg * 客户端上传的数据 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 1 . 从 ChannelHandlerContext ctx 中获取通道 Channel channel = ctx.channel(); // 2 . 获取通道对应的事件循环 EventLoop eventLoop = channel.eventLoop(); // 3 . 在 Runnable 中用户自定义耗时操作, 异步执行该操作, 该操作不能阻塞在此处执行 // schedule(Runnable command, long delay, TimeUnit unit) // Runnable command 参数 : 异步任务 // long delay 参数 : 延迟执行时间 // TimeUnit unit参数 : 延迟时间单位, 秒, 毫秒, 分钟 eventLoop.schedule(new Runnable() { @Override public void run() { //执行耗时操作 } }, 100, TimeUnit.MILLISECONDS); } }
五、 异步任务 ( 其它线程向本线程调度任务 )
1 . 获取通道 Channel 即可调度异步任务 : 由上面的任务调度流程可知 , 只要获取到了本 NioEventLoop 线程对应的 Channel 通道 , 就可以获取该 NioEventLoop 线程的 EventLoop 事件调度器 , 向 ScheduleTaskQueue 或 TaskQueue 任务队列中加入异步任务 ;
2 . Channel 通道获取与管理 :
① Channel 通道获取 : 在服务器启动设置 ServerBootstrap 中 , 会设置 ChannelInitializer , 在与客户端的连接建立成功后 , 会回调 initChannel 方法 , 此时就会得到该客户端连接对应的通道 SocketChannel ;
② Channel 通道管理 : 在服务器中使用 Map 集合管理该 Channel 通道 , 需要时根据用户标识信息 , 获取该通道 , 向该客户端通道对应的 NioEventLoop 线程中调度任务 ;
3 . 代码示例 : 这里只展示一下 ChannelInitializer 的回调位置 , 不再详细描述怎么维护集合的过程了 , 自己定义 Map 集合维护 ;
// 服务器启动对象, 需要为该对象配置各种参数 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) // 设置 主从 线程组 , 分别对应 主 Reactor 和 从 Reactor .channel(NioServerSocketChannel.class) // 设置 NIO 网络套接字通道类型 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列维护的连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置连接状态行为, 保持连接状态 .childHandler( // 为 WorkerGroup 线程池对应的 NioEventLoop 设置对应的事件 处理器 Handler new ChannelInitializer<SocketChannel>() {// 创建通道初始化对象 @Override protected void initChannel(SocketChannel ch) throws Exception { // 该方法在服务器与客户端连接建立成功后会回调 // 为 管道 Pipeline 设置处理器 Hanedler ch.pipeline().addLast(new ServerHandr()); } } );