Netty 使用
对 Netty 有了大概的认识之后,接下来我们用 Netty 来编写一个基础的通讯服务器,它包含两个端:服务器端和客户端,客户端负责发送消息,服务器端负责接收并打印消息,具体的实现步骤如下。
1.添加 Netty 框架
首先我们需要先添加 Netty 框架的支持,如果是 Maven 项目添加如下配置即可:
<!-- 添加 Netty 框架 --> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.56.Final</version> </dependency>
Netty 版本说明
Netty 的 3.x 和 4.x 为主流的稳定版本,而最新的 5.x 已经是放弃的测试版了,因此推荐使用 Netty 4.x 的最新稳定版。
2. 服务器端实现代码
按照官方的推荐,这里将服务器端的代码分为以下 3 个部分:
- MyNettyServer:服务器端的核心业务代码;
- ServerInitializer:服务器端通道(Channel)初始化;
- ServerHandler:服务器端接收到信息之后的处理逻辑。
PS:Channel 字面意思为“通道”,它是网络通信的载体。Channel 提供了基本的 API 用于网络 I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 自己实现的 Channel 是以 JDK NIO Channel 为基础的,相比较于 JDK NIO,Netty 的 Channel 提供了更高层次的抽象,同时屏蔽了底层 Socket 的复杂性,赋予了 Channel 更加强大的功能,你在使用 Netty 时基本不需要再与 Java Socket 类直接打交道。
服务器端的实现代码如下:
// 定义服务器的端口号 static final int PORT = 8007; /** * 服务器端 */ static class MyNettyServer { public static void main(String[] args) { // 创建一个线程组,用来负责接收客户端连接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 创建另一个线程组,用来负责 I/O 的读写 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建一个 Server 实例(可理解为 Netty 的入门类) ServerBootstrap b = new ServerBootstrap(); // 将两个线程池设置到 Server 实例 b.group(bossGroup, workerGroup) // 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器) .channel(NioServerSocketChannel.class) // 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类) .childHandler(new ServerInitializer()); // 绑定端口并且进行同步 ChannelFuture future = b.bind(PORT).sync(); // 对关闭通道进行监听 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 资源关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * 服务端通道初始化 */ static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 服务器端连接之后的执行器(自定义的类) private static final ServerHandler SERVER_HANDLER = new ServerHandler(); /** * 初始化通道的具体执行方法 */ @Override public void initChannel(SocketChannel ch) { // 通道 Channel 设置 ChannelPipeline pipeline = ch.pipeline(); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 服务器端连接之后的执行器,接收到消息之后的业务处理 pipeline.addLast(SERVER_HANDLER); } } /** * 服务器端接收到消息之后的业务处理类 */ static class ServerHandler extends SimpleChannelInboundHandler<String> { /** * 读取到客户端的消息 */ @Override public void channelRead0(ChannelHandlerContext ctx, String request) { if (!request.isEmpty()) { System.out.println("接到客户端的消息:" + request); } } /** * 数据读取完毕 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } /** * 异常处理,打印异常并关闭通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }