【Netty】Netty 入门案例分析 ( Netty 模型解析 | Netty 服务器端代码 | Netty 客户端代码 )

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 【Netty】Netty 入门案例分析 ( Netty 模型解析 | Netty 服务器端代码 | Netty 客户端代码 )

文章目录

一、 Netty 模型代码解析

二、 Netty 案例服务器端代码

1 . 服务器主程序

2 . 服务器自定义 Handler 处理者

三、 Netty 案例客户端代码

1 . 客户端主程序

2 . 客户端自定义 Handler 处理者

四、 Netty 案例运行





一、 Netty 模型代码解析


1 . 线程池 NioEventLoopGroup :



① NioEventLoopGroup 线程池使用场景 : Netty 模型中的 BossGroup 和 WorkerGroup 都是 NioEventLoopGroup 类型的线程池 ;


② NioEventLoopGroup 默认线程个数 : 系统默认每个线程池中的 NioEventLoop 线程数是 CPU 核数 × \times× 2 , 下面的代码可以获取运行 Netty 程序的设备的 CPU 核数 ;


// 获取设备的 CPU 核数
NettyRuntime.availableProcessors()


③ 指定 NioEventLoopGroup 线程个数 : 如果不想使用 Netty 线程池的默认线程个数 , 可以在 NioEventLoopGroup 构造函数中子星设定线程数 ;


// BossGroup 线程池 : 负责客户端的连接
// 指定线程个数 : 客户端个数很少, 不用很多线程维护, 这里指定线程池中线程个数为 1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);



2 . NioEventLoopGroup 线程池线程分配 :



以客户端连接完成后 , 数据读写场景举例 ;


在 双核 CPU 的服务器上 , NioEventLoopGroup 默认有 4 44 个线程 ; 按照顺序循环分配 , 为第 n nn 个客户端分配第 n % 4 n \% 4n%4 个 NioEventLoop 线程 ;


客户端 0 00 与服务器进行数据交互在 NioEventLoop 0 00 线程中 ;

客户端 1 11 与服务器进行数据交互在 NioEventLoop 1 11 线程中 ;

客户端 2 22 与服务器进行数据交互在 NioEventLoop 2 22 线程中 ;

客户端 3 33 与服务器进行数据交互在 NioEventLoop 3 33 线程中 ;

客户端 4 44 与服务器进行数据交互在 NioEventLoop 0 00 线程中 ;

客户端 5 55 与服务器进行数据交互在 NioEventLoop 1 11 线程中 ;

客户端 6 66 与服务器进行数据交互在 NioEventLoop 2 22 线程中 ;

客户端 7 77 与服务器进行数据交互在 NioEventLoop 3 33 线程中 ;



3 . NioEventLoopGroup 线程池封装内容 :



① NioEventLoopGroup 中的若干个 NioEventLoop 线程都封装在 children 中 , 线程个数是 CPU 核数 2 倍 ;


② 每个 NioEventLoop 线程中封装了如下内容 :


选择器 ( Selector ) , 用于监听客户端的读写 IO 事件 ;

任务队列 ( taskQueue ) , 用于存储事件对应的业务逻辑任务 ;

线程执行器 ( executor ) , 用于执行线程 ;



4 . ChannelHandlerContext 通道处理者上下文对象封装内容 :



① 用户自定义的 处理者 ( Handler ) , 这里指的是 服务器端的 ServerHandr ( 自定义 ) , 客户端的 ClientHandler ( 自定义 ) ;


② 管道 ( ChannelPipeline ) : 其本质是双向链表 , 该 ChannelHandlerContext 可以获取该链表的前一个 ( prev ) , 后一个管道对象 ( next ) ;


③ 管道 与 通道 :


二者都可以通过 通道处理者上下文 ( ChannelHandlerContext ) 获取 ;

管道 与 通道 都可以互相从对方获取 ;

Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline();
channel = pipeline.channel();
pipeline = channel.pipeline();


④ 管道 ( Pipeline ) 与 通道 ( Channel ) 关联 : 通过管道可以获取通道 , 通过通道也可以获取其对应的管道 ;




5 . 处理者 ( Handler ) :



① 设置 Handler : 给 WorkerGroup 线程池中的 EventLoop 线程对应的管道设置 处理器 ( Handler ) ;


② 自定义 Handler : 一般这个 Handler 都是用户自定义的类 , 继承 ChannelInboundHandlerAdapter 类 ;


③ 运行机制 : 在 BossGroup 中连接客户端成功后 , 将 NioSocketChannel 注册给 WorkerGroup 中的 EventLoop 中的 选择器 ( Selector ) , 如果监听到客户端数据 IO 事件 , 就会调用 管道 ( Pipeline ) 处理该事件 , 管道 ( Pipeline ) 中调用 处理器 ( Handler ) 处理相应的事件 , 该 处理器 ( Handler ) 可以是 Netty 提供的 , 也可以是开发者自定义的 ;



特别注意 : 自定义 Handler 中 , 重写的 ChannelInboundHandlerAdapter 方法 , 将 super() 语句都删除 ;






二、 Netty 案例服务器端代码



1 . 服务器主程序

package kim.hsl.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Netty 案例服务器端
 */
public class Server {
    public static void main(String[] args) {
        // 1. 创建 BossGroup 线程池 和 WorkerGroup 线程池, 其中维护 NioEventLoop 线程
        //     NioEventLoop 线程中执行无限循环操作
        // BossGroup 线程池 : 负责客户端的连接
        // 指定线程个数 : 客户端个数很少, 不用很多线程维护, 这里指定线程池中线程个数为 1
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // WorkerGroup 线程池 : 负责客户端连接的数据读写
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 2. 服务器启动对象, 需要为该对象配置各种参数
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup) // 设置 主从 线程组 , 分别对应 主 Reactor 和 从 Reactor
                .channel(NioServerSocketChannel.class)  // 设置 NIO 网络套接字通道类型
                .option(ChannelOption.SO_BACKLOG, 128)  // 设置线程队列维护的连接个数
                .childOption(ChannelOption.SO_KEEPALIVE, true)  // 设置连接状态行为, 保持连接状态
                .childHandler(  // 为 WorkerGroup 线程池对应的 NioEventLoop 设置对应的事件 处理器 Handler
                        new ChannelInitializer<SocketChannel>() {// 创建通道初始化对象
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                // 为 管道 Pipeline 设置处理器 Hanedler
                                ch.pipeline().addLast(new ServerHandr());
                            }
                        }
                );
        System.out.println("服务器准备完毕 ...");
        ChannelFuture cf = null;
        try {
            // 绑定本地端口, 进行同步操作 , 并返回 ChannelFuture
            cf = bootstrap.bind(8888).sync();
            System.out.println("服务器开始监听 8888 端口 ...");
            // 关闭通道 , 开始监听操作
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 出现异常后, 优雅的关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}



2 . 服务器自定义 Handler 处理者

package kim.hsl.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
/**
 * Handler 处理者, 是 NioEventLoop 线程中处理业务逻辑的类
 *
 * 继承 : 该业务逻辑处理者 ( Handler ) 必须继承 Netty 中的 ChannelInboundHandlerAdapter 类
 * 才可以设置给 NioEventLoop 线程
 *
 * 规范 : 该 Handler 类中需要按照业务逻辑处理规范进行开发
 */
public class ServerHandr extends ChannelInboundHandlerAdapter {
    /**
     * 读取数据 : 在服务器端读取客户端发送的数据
     * @param ctx
     *      通道处理者上下文对象 : 封装了 管道 ( Pipeline ) , 通道 ( Channel ), 客户端地址信息
     *      管道 ( Pipeline ) : 注重业务逻辑处理 , 可以关联很多 Handler
     *      通道 ( Channel ) : 注重数据读写
     * @param msg
     *      客户端上传的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 查看 ChannelHandlerContext 中封装的内容
        System.out.println("channelRead : ChannelHandlerContext ctx = " + ctx);
        // 将客户端上传的数据转为 ByteBuffer
        // 这里注意该类是 Netty 中的 io.netty.buffer.ByteBuf 类
        // 不是 NIO 中的 ByteBuffer
        // io.netty.buffer.ByteBuf 性能高于 java.nio.ByteBuffer
        ByteBuf byteBuf = (ByteBuf) msg;
        // 将 ByteBuf 缓冲区数据转为字符串, 打印出来
        System.out.println(ctx.channel().remoteAddress() + " 接收到客户端发送的数据 : " + 
          byteBuf.toString(CharsetUtil.UTF_8));
    }
    /**
     * 服务器端读取数据完毕后回调的方法
     * @param ctx
     *      通道处理者上下文对象 : 封装了 管道 ( Pipeline ) , 通道 ( Channel ), 客户端地址信息
     *      *      管道 ( Pipeline ) : 注重业务逻辑处理 , 可以关联很多 Handler
     *      *      通道 ( Channel ) : 注重数据读写
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 数据编码 : 将字符串编码, 存储到 io.netty.buffer.ByteBuf 缓冲区中
        ByteBuf byteBuf = Unpooled.copiedBuffer("Hello Client", CharsetUtil.UTF_8);
        // 写出并刷新操作 : 写出数据到通道的缓冲区 ( write ), 并执行刷新操作 ( flush )
        ctx.writeAndFlush(byteBuf);
    }
    /**
     * 异常处理 , 上面的方法中都抛出了 Exception 异常, 在该方法中进行异常处理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("通道异常, 关闭通道");
        //如果出现异常, 就关闭该通道
        ctx.close();
    }
}







三、 Netty 案例客户端代码



1 . 客户端主程序

package kim.hsl.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
    public static void main(String[] args) {
        // 客户端只需要一个 时间循环组 , 即 NioEventLoopGroup 线程池
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        // 客户端启动对象
        Bootstrap bootstrap = new Bootstrap();
        // 设置相关参数
        bootstrap.group(eventLoopGroup)     // 设置客户端的线程池
                .channel(NioSocketChannel.class)    // 设置客户端网络套接字通道类型
                .handler(   // 设置客户端的线程池对应的 NioEventLoop 设置对应的事件处理器 Handler
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ClientHandr());
                            }
                        }
                );
        try {
            // 开始连接服务器, 并进行同步操作
            // ChannelFuture 类分析 , Netty 异步模型
            // sync 作用是该方法不会再次阻塞
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
            System.out.println("客户端连接服务器成功 ...");
            // 关闭通道, 开始监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 优雅的关闭
            eventLoopGroup.shutdownGracefully();
        }
    }
}





2 . 客户端自定义 Handler 处理者

package kim.hsl.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 * Handler 处理者, 是 NioEventLoop 线程中处理业务逻辑的类
 *
 * 继承 : 该业务逻辑处理者 ( Handler ) 必须继承 Netty 中的 ChannelInboundHandlerAdapter 类
 * 才可以设置给 NioEventLoop 线程
 *
 * 规范 : 该 Handler 类中需要按照业务逻辑处理规范进行开发
 */
public class ClientHandr extends ChannelInboundHandlerAdapter {
    /**
     * 通道就绪后触发该方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 查看 ChannelHandlerContext 中封装的内容
        System.out.println("channelActive : ChannelHandlerContext ctx = " + ctx);
        // 数据编码 : 将字符串编码, 存储到 io.netty.buffer.ByteBuf 缓冲区中
        ByteBuf byteBuf = Unpooled.copiedBuffer("Hello Server", CharsetUtil.UTF_8);
        // 写出并刷新操作 : 写出数据到通道的缓冲区 ( write ), 并执行刷新操作 ( flush )
        ctx.writeAndFlush(byteBuf);
        System.out.println("客户端向服务器端发送 Hello Server 成功");
    }
    /**
     * 读取数据 : 在服务器端读取客户端发送的数据
     * @param ctx
     *      通道处理者上下文对象 : 封装了 管道 ( Pipeline ) , 通道 ( Channel ), 客户端地址信息
     *      管道 ( Pipeline ) : 注重业务逻辑处理 , 可以关联很多 Handler
     *      通道 ( Channel ) : 注重数据读写
     * @param msg
     *      服务器返回的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 查看 ChannelHandlerContext 中封装的内容
        System.out.println("channelRead : ChannelHandlerContext ctx = " + ctx);
        // 将服务器下发的数据转为 ByteBuffer
        // 这里注意该类是 Netty 中的 io.netty.buffer.ByteBuf 类
        // 不是 NIO 中的 ByteBuffer
        // io.netty.buffer.ByteBuf 性能高于 java.nio.ByteBuffer
        ByteBuf byteBuf = (ByteBuf) msg;
        // 将 ByteBuf 缓冲区数据转为字符串, 打印出来
        System.out.println(ctx.channel().remoteAddress() + " 服务器返回的数据 : " + 
          byteBuf.toString(CharsetUtil.UTF_8));
    }
    /**
     * 异常处理 , 上面的方法中都抛出了 Exception 异常, 在该方法中进行异常处理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("通道异常, 关闭通道");
        //如果出现异常, 就关闭该通道
        ctx.close();
    }
}







四、 Netty 案例运行


1 . 运行服务器端 : 服务器启动 , 监听 8888 端口 ;


image.png



2 . 运行客户端 : 客户端连接服务器的 8888 端口 , 并向服务器端写出 Hello Server 字符串 , 之后便接到服务器端回送的 Hello Client 字符串信息 ;

image.png



3 . 查看客户端 : 服务器端接收到客户端信息 , 向客户端写出 Hello Client 字符串 ;

image.png


目录
相关文章
|
30天前
|
Oracle 关系型数据库 数据库
服务器数据恢复—服务器raid5阵列数据恢复案例
一台服务器上的8块硬盘组建了一组raid5磁盘阵列。上层安装windows server操作系统,部署了oracle数据库。 raid5阵列中有2块硬盘的硬盘指示灯显示异常报警。服务器操作系统无法启动,ORACLE数据库也无法启动。
60 17
|
1月前
|
运维 数据挖掘 Windows
服务器数据恢复—服务器硬盘指示灯亮黄灯的数据恢复案例
服务器硬盘指示灯闪烁黄灯是一种警示,意味着服务器硬盘出现故障即将下线。发现这种情况建议及时更换硬盘。 一旦服务器上有大量数据频繁读写,硬盘指示灯会快速闪烁。服务器上某个硬盘的指示灯只有黄灯亮着,而其他颜色的灯没有亮的话,通常表示这块硬盘出现故障,这时候更换新硬盘同步数据即可。 如果没有及时发现硬盘损坏或者更换硬盘失败导致服务器崩溃,应该如何恢复数据呢?下面通过一个真实案例讲解一下服务器硬盘指示灯亮黄色的数据恢复案例。
|
19天前
|
存储 运维 资源调度
阿里云服务器经济型e实例解析:性能、稳定性与兼顾成本
阿里云经济型e云服务器以其高性价比、稳定可靠的性能以及灵活多样的配置选项,成为了众多企业在搭建官网时的首选。那么,阿里云经济型e云服务器究竟怎么样?它是否能够满足企业官网的搭建需求?本文将从性能表现、稳定性与可靠性、成本考虑等多个方面对阿里云经济型e云服务器进行深入剖析,以供大家参考选择。
|
19天前
|
安全 网络协议 网络安全
解析HTTP代理服务器不稳定致使掉线的关键原因
随着数字化发展,网络安全和隐私保护成为核心需求。HTTP代理服务器掉线原因主要包括:1. 网络问题,如本地网络不稳定、路由复杂;2. 服务器质量差、IP资源不稳定;3. 用户配置错误、超时或请求频率异常;4. IP失效或协议不兼容。这些问题会影响连接稳定性。
50 8
|
24天前
|
数据挖掘 数据库
服务器数据恢复—Zfs文件系统下误删除数据的恢复案例
服务器数据恢复环境&故障: 一台zfs文件系统的服务器,管理员误操作删除了服务器上的数据。
|
26天前
|
存储 数据挖掘 数据库
服务器数据恢复—EMC UNITY 400存储卷被误删除的数据恢复案例
EMC Unity 400存储连接了2台硬盘柜。2台硬盘柜上一共有21块硬盘(520字节)。21块盘组建了2组RAID6:一组有11块硬盘,一组有10块硬盘。 在存储运行过程中,管理员误操作删除了 2组POOL上的部分数据卷。
|
1月前
|
存储 算法 数据挖掘
服务器数据恢复—nas中raid6阵列失效,存储无法访问的数据恢复案例
一台nas上共有14块硬盘组建了一组raid6磁盘阵列。 该nas在工作过程中,raid6阵列中硬盘出现故障离线,导致raid6阵列失效,nas无法正常访问。
|
2月前
|
存储 数据挖掘 数据库
服务器数据恢复—OceanStor存储数据恢复案例
华为OceanStor T系列某型号存储中有一组由24块机械硬盘组建的一组RAID5阵列。 运行过程中该存储设备RAID5阵列上多块硬盘出现故障离线,阵列失效,存储中数据无法访问。
|
1月前
|
存储 数据挖掘
服务器数据恢复—zfs文件系统服务器数据恢复案例
一台配有32块硬盘的服务器在运行过程中突然崩溃不可用。经过初步检测,基本上确定服务器硬件不存在物理故障。管理员重启服务器后问题依旧。需要恢复该服务器中的数据。
|
2月前
|
运维 数据挖掘 索引
服务器数据恢复—Lustre分布式文件系统服务器数据恢复案例
5台节点服务器,每台节点服务器上有一组RAID5阵列。每组RAID5阵列上有6块硬盘(其中1块硬盘设置为热备盘,其他5块硬盘为数据盘)。上层系统环境为Lustre分布式文件系统。 机房天花板漏水导致这5台节点服务器进水,每台服务器都有至少2块硬盘出现故障。每台服务器中的RAID5阵列短时间内同时掉线2块或以上数量的硬盘,导致RAID崩溃,服务器中数据无法正常读取。

推荐镜像

更多