【Netty】Netty 核心组件 ( Future | Channel | Selector | ChannelHandler )

简介: 【Netty】Netty 核心组件 ( Future | Channel | Selector | ChannelHandler )

文章目录

一、 Future / ChannelFuture 异步操作监听组件

二、 Channel 通道组件

三、 Selector 选择器组件

四、 ChannelHandler 通道处理器组件





一、 Future / ChannelFuture 异步操作监听组件


1 . Netty 中的 IO 操作 : Netty 中的 IO 操作 , 如 数据读取 Read , 数据写出 Write , 接受客户端连接 Accept , 连接服务器 Connect 等 4 44 种 IO 操作 ;



2 . 异步操作 : 这些 IO 操作都是异步的 , 调用相应的 IO 方法后 , 相应的操作异步执行 , 调用 IO 方法的代码位置不产生阻塞 ;



3 . 获取执行结果 : IO 方法调用后 , 不能立刻得到执行的结果 , 只返回一个 Future 对象 , 通过该 Future 对象可以异步监听 IO 操作的结果 ;



4 . 注册监听 : 为 Future 对象添加 ChannelFutureListener 监听器 , 当异步 IO 操作执行完毕后 , 会回调监听器的 operationComplete 方法 ;


// 监听绑定操作的结果
// 添加 ChannelFutureListener 监听器, 监听 bind 操作的结果
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if(future.isDone()){
            System.out.println("绑定端口完成");
        }
        if(future.isSuccess()){
            System.out.println("绑定端口成功");
        }else{
            System.out.println("绑定端口失败");
        }
        if(future.isCancelled()){
            System.out.println("绑定端口取消");
        }
        System.out.println("失败原因 : " + future.cause());
    }
});



5 . 获取通道 : 通过调用 ChannelFuture 对象的 channel 方法可以获取到当前 IO 操作对应的通道 ;


// 获取并关闭通道 , 开始监听操作
channelFuture.channel().closeFuture().sync();


6 . 等待异步操作完成 : 调用 ChannelFuture 对象的 sync 方法 , 可以等待该异步操作完成后 , 在执行之后的操作 , 相当于将异步操作变成了同步操作 ;


// 绑定本地端口, 进行同步操作 , 并返回 ChannelFuture
ChannelFuture channelFuture = channelFuture = bootstrap.bind(8888).sync();





二、 Channel 通道组件


1 . 注意与 NIO 通道区分 : 该 Channel 组件不是 NIO 中的通道 , 是 Netty 中的 io.netty.channel 包的类 ;



2 . Channel 通道组件作用 : 执行 IO 操作 , 获取通道状态 , 获取通道配置参数 ;



① 执行 Netty 中的 IO 操作 , 如数据写出 , 读取 , 连接 , 接受连接 等操作 ;


② Channel 通道组件获取通道状态 ;


isOpen : 通道是否打开 ;

isRegistered : 是否注册 ;

isWritable : 是否可写 ;

③ Channel 通道组件获取网络配置参数 , 如发送和接收的缓冲区等 ;




3 . Channel 通道组件提供的异步操作 :



① 提供的异步 IO 操作 : Channel 通道提供的 IO 操作都是异步的 , 如 数据读取 Read , 数据写出 Write , 接受客户端连接 Accept , 连接服务器 Connect ;


② 异步操作结果获取 : IO 操作调用后 , 立刻返回 ChannelFuture 对象 , 此时不知道是否执行成功 , 也不知道执行结果 , 可以给 ChannelFuture 对象设置监听器 , 获取执行结果 ;




4 . Channel 通道关联处理器 : Channel 中的 IO 操作可以关联 Handler 处理器 ;




5 . 常用的 Channel 组件类型 : 可以根据 协议 , 阻塞类型 , 选择合适的 Channel 组件 ;


Channel 通道组件类型 协议类型 阻塞类型 ( 同步 / 异步 ) 位置 ( 服务器 / 客户端 )

NioServerSocketChannel 异步 TCP 服务器

NioSocketChannel 异步 TCP 客户端

NioDatagramChannel 异步 UDP 服务器 / 客户端

NioSctpChannel 异步 SCTP 客户端

NioSctpServerChannel 异步 SCTP 服务器

其中 SCTP 协议包含 UDP , TCP , 文件 IO 等相关协议和操作 ;




6 . Channel 类型设置 : 调用 ServerBootstrap 的 channel 方法 , 可以设置通道类型 ;


ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) 
         // 核心代码 ---------------------------------------------------------
        .channel(NioServerSocketChannel.class)  // 设置 NIO 网络套接字通道类型
         // 核心代码 ---------------------------------------------------------
        .option(ChannelOption.SO_BACKLOG, 128)  
        .childOption(ChannelOption.SO_KEEPALIVE, true)  
        .handler(null)
        .childHandler(null);


7 . 获取 Channel : BootStrap 启动后 , 返回 ChannelFuture 对象 , 调用 ChannelFuture 对象的 channel() 方法即可获取对应的通道 ;


// 1. 绑定本地端口, 进行同步操作 , 并返回 ChannelFuture
ChannelFuture channelFuture = bootstrap.bind(8888).sync();
// 2. 获取通道
Channel Channel = channelFuture.channel();





三、 Selector 选择器组件


1 . Netty 中的 Selector 选择器组件 :



① 实现多路复用 : Selector 选择器是 Netty 中实现 多路 IO 复用的最重要的手段 ;


② 在 NioEventLoop 线程中维护 : 选择器 Selector 在 NioEventLoopGroup 线程池中的 NioEventLoop 线程中维护 ;


③ 单线程监听多通道 : 借助 Selector 选择器 , 可以实现 一个 NioEventLoop 线程 , 监听多个客户端连接对应的 Channel 通道事件 ;




2 . 选择器 Selector 运行机制 :



① 注册通道 : 注册 Channel 通道到 Selector 选择器 ;


② 监听事件 : 选择器 Selector 调用 select 方法 , 监听该 Selector 注册的通道 , 查看是否有 IO 事件触发 ;


③ 可触发的 IO 事件列举 : 数据读取 Read , 数据写出 Write , 接受客户端连接 Accept , 连接服务器 Connect 等 4 44 种可触发的 IO 事件 ;



使用上述 Selector 选择器监听 Channel 通道事件机制 , 可以在单个 NioEventLoop 线程中 , 实现了多个客户端 IO 操作的管理 ;






四、 ChannelHandler 通道处理器组件


1 . ChannelHandler 通道处理器组件 :



① ChannelHandler 作用 : 其实现类主要作用是 处理 或 拦截 IO 事件 , 将其转给对应的 ChannelPipeline 管道进行业务逻辑的处理 ;


② ChannelHandler 使用 : ChannelHandler 是接口 , 不能直接使用 , 使用的时候 , 需要使用 ChannelHandler 接口的 实现类 , 常用的类下面会介绍 ;




2 . 入站 和 出站 概念 :



① 入站 : 从管道读取数据 , 相当于有数据进来 ;


② 出站 : 向管道输出数据 , 相当于写出数据 ;




3 . 常用的 ChannelHandler 类列举 :



ChannelInboundHandler : 处理数据入站事件 , 即其它设备向本设备发送数据 ;


ChannelOutboundHandler : 处理数据出站事件 , 即本设备写出数据到其它设备 ;


ChannelDuplexHandler ( 不推荐使用 ) : 该类继承了 ChannelInboundHandler , 实现了 ChannelOutboundHandler 接口 , 因此该类既可以处理数据入站 , 又可以处理数据出站 ; 但是一般情况下不使用该类 , 容易产生混淆 ;


ChannelInboundHandlerAdapter : 入站 IO 事件处理器适配器 ;


ChannelOutboundHandlerAdapter : 出站 IO 事件处理器适配器 ;




4 . ChannelInboundHandlerAdapter 常用方法 :



① 通道就绪 : 通道就绪后回调该函数 ;


public void channelActive(ChannelHandlerContext ctx) throws Exception

1


② 数据读取 : 当有数据入站时 , 回调该函数 ;


public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception


③ 数据读取完毕 : 当数据读取完毕后 , 回调该函数 ;


public void channelReadComplete(ChannelHandlerContext ctx) throws Exception



④ 异常回调 : 发生异常时 , 回调该函数 ;


public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception



5 . 代码示例 : 这是之前服务器端的 ChannelInboundHandlerAdapter 子类示例 , 用于处理服务器端的业务逻辑 ;


package kim.hsl.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
/**
 * Handler 处理者, 是 NioEventLoop 线程中处理业务逻辑的类
 *
 * 继承 : 该业务逻辑处理者 ( Handler ) 必须继承 Netty 中的 ChannelInboundHandlerAdapter 类
 * 才可以设置给 NioEventLoop 线程
 *
 * 规范 : 该 Handler 类中需要按照业务逻辑处理规范进行开发
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 读取数据 : 在服务器端读取客户端发送的数据
     * @param ctx
     *      通道处理者上下文对象 : 封装了 管道 ( Pipeline ) , 通道 ( Channel ), 客户端地址信息
     *      管道 ( Pipeline ) : 注重业务逻辑处理 , 可以关联很多 Handler
     *      通道 ( Channel ) : 注重数据读写
     * @param msg
     *      客户端上传的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 查看 ChannelHandlerContext 中封装的内容
        System.out.println("channelRead : ChannelHandlerContext ctx = " + ctx);
        // 从 ChannelHandlerContext ctx 中获取通道
        Channel channel = ctx.channel();
        // 获取通道对应的事件循环
        EventLoop eventLoop = channel.eventLoop();
        // 在 Runnable 中用户自定义耗时操作, 异步执行该操作, 该操作不能阻塞在此处执行
        // schedule(Runnable command, long delay, TimeUnit unit)
        // Runnable command 参数 : 异步任务
        // long delay 参数 : 延迟执行时间
        // TimeUnit unit参数 : 延迟时间单位, 秒, 毫秒, 分钟
        eventLoop.schedule(new Runnable() {
            @Override
            public void run() {
                //执行耗时操作
            }
        }, 100, TimeUnit.MILLISECONDS);
        // 将客户端上传的数据转为 ByteBuffer
        // 这里注意该类是 Netty 中的 io.netty.buffer.ByteBuf 类
        // 不是 NIO 中的 ByteBuffer
        // io.netty.buffer.ByteBuf 性能高于 java.nio.ByteBuffer
        ByteBuf byteBuf = (ByteBuf) msg;
        // 将 ByteBuf 缓冲区数据转为字符串, 打印出来
        System.out.println(ctx.channel().remoteAddress() + " 接收到客户端发送的数据 : " + 
              byteBuf.toString(CharsetUtil.UTF_8));
    }
    /**
     * 服务器端读取数据完毕后回调的方法
     * @param ctx
     *      通道处理者上下文对象 : 封装了 管道 ( Pipeline ) , 通道 ( Channel ), 客户端地址信息
     *      *      管道 ( Pipeline ) : 注重业务逻辑处理 , 可以关联很多 Handler
     *      *      通道 ( Channel ) : 注重数据读写
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 数据编码 : 将字符串编码, 存储到 io.netty.buffer.ByteBuf 缓冲区中
        ByteBuf byteBuf = Unpooled.copiedBuffer("Hello Client", CharsetUtil.UTF_8);
        // 写出并刷新操作 : 写出数据到通道的缓冲区 ( write ), 并执行刷新操作 ( flush )
        ctx.writeAndFlush(byteBuf);
    }
    /**
     * 异常处理 , 上面的方法中都抛出了 Exception 异常, 在该方法中进行异常处理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("通道异常, 关闭通道");
        //如果出现异常, 就关闭该通道
        ctx.close();
    }
}


目录
相关文章
|
8月前
|
编解码 开发者
Netty Review - 深入理解Netty: ChannelHandler的生命周期与事件处理机制
Netty Review - 深入理解Netty: ChannelHandler的生命周期与事件处理机制
160 0
|
前端开发 Java 数据处理
【Netty】Netty 异步任务模型 及 Future-Listener 机制
【Netty】Netty 异步任务模型 及 Future-Listener 机制
837 0
【Netty】Netty 异步任务模型 及 Future-Listener 机制
|
监控 安全 Java
Netty之EventLoop 解读
Netty之EventLoop 解读
|
安全
Netty之EventLoop
EventLoop本质是一个单线程执行器(同时维护了一个Selector),里面有run方法处理channel上源源不断的io事件。
Netty之EventLoop
|
安全
关于netty的EventLoop
关于netty的EventLoop
关于netty的EventLoop
|
Java API 调度
Netty组件EventLoopGroup和EventLoop源码分析
Netty组件EventLoopGroup和EventLoop源码分析
84 0
|
前端开发
netty系列之:channel,ServerChannel和netty中的实现
netty系列之:channel,ServerChannel和netty中的实现
netty系列之:channel,ServerChannel和netty中的实现
|
存储 前端开发
netty系列之:EventLoop,EventLoopGroup和netty的默认实现
netty系列之:EventLoop,EventLoopGroup和netty的默认实现
netty系列之:EventLoop,EventLoopGroup和netty的默认实现
|
存储 监控 API
netty系列之:netty中的Channel详解
netty系列之:netty中的Channel详解
netty系列之:netty中的Channel详解