Netty 线程模型与基本使用

简介: Netty 线程模型与基本使用


原创
Se7enSe7en的架构笔记 2021-03-27 20:52

为什么使用 Netty

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能和高伸缩性的服务器和客户端。Netty 拥有高性能,吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。

Netty 和 NIO

NIO 的缺点

  • NIO 的类库和 API 繁杂,学习成本高,你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
  • 需要熟悉 Java 多线程编程。这是因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能写出高质量的 NIO 程序。
  • 臭名昭著的 epoll bug。它会导致 Selector 空轮询,最终导致 CPU 使用率飙升至 100%。直到 JDK1.7 版本依然没得到根本性的解决。

Netty 的优点

  • Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,API使用简单,学习成本低。
  • 功能强大,内置了多种解码编码器,支持多种协议。
  • 性能高,对比其他主流的 NIO 框架,Netty 的性能最优。
  • 社区活跃,发现 BUG 会及时修复,迭代版本周期短,不断加入新的功能。Dubbo、Elasticsearch 都采用了 Netty,质量得到验证。

Netty线程模型

image.png模型解释

  • 1.Netty 抽象出两组线程池 BossGroup 和 WorkerGroup,BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写。
  • 2.BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup。NioEventLoopGroup 相当于是一个事件循环线程组,这个组内含有多个事件循环线程,每一个事件循环线程都是 NioEventLoop。
  • 3.每个 NioEventLoop 都有一个 Selector,用于监听注册在其上的 SocketChannel 的网络通讯。
  • 4.每个 Boss NioEventLoop 线程内部循环执行的3个步骤:
  • 处理 Accept 事件,与客户端建立连接,生成 NioSocketChannel。
  • 将 NioSocketChannel 注册到某个 Worker NioEventLoop 上的 Selector。
  • 处理任务队列的任务,即 runAllTask。
  • 5.每个 Worker NioEventLoop 线程内部循环执行的3个步骤:
  • 轮询注册到自己 Selector 上的所有 NioSocketChannel 的 read,write 事件。
  • 处理 I/O 事件,即 read,write 事件,在对应的 NioSocketChannel 处理业务。
  • runAllTasks 处理任务队列 TaskQueue 的任务,一些耗时的业务处理一般可以放入 TaskQueue 中慢慢处理,这样不影响数据在 Pipeline 中的流动处理。
  • 6.每个 Worker NioEventLoop 处理 NioSocketChannel 业务时,会使用 Pipeline(管道),Pipeline 中维护了很多的 handler 处理器用来处理 NioSocketChannel 中的数据。

Netty 客户端 & 服务器开发

创建并配置服务器启动器

Bootstrap、ServerBootstrap

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。

Bootstrap 和 ServerBootStrap 是 Netty 提供的一个创建客户端和服务端启动器的工厂类,使用这个工厂类非常便利地创建启动类。image.png

group()

服务端要使用两个线程组:

  • bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到 workerGroup 的 Selector 中。
  • workerGroup 用于处理每一个连接发生的读写事件。

一般创建线程组直接使用以下new就完事了:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

默认的线程数是cpu核数的两倍。假设想自定义线程数,可以使用有参构造器:

//设置bossGroup线程数为1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//设置workerGroup线程数为8
EventLoopGroup workerGroup = new NioEventLoopGroup(8);

channel()

这个方法用于设置通道类型,当建立连接后,会根据这个设置创建对应的 Channel 实例。

常用的就是这两个通道类型,因为是异步非阻塞的。所以是首选:

  • NioSocketChannel:异步非阻塞的客户端 TCP Socket 连接。
  • NioServerSocketChannel:异步非阻塞的服务器端 TCP Socket 连接。

还有就是同步阻塞的,一般没什么人用:

  • OioSocketChannel:同步阻塞的客户端 TCP Socket 连接。
  • OioServerSocketChannel:同步阻塞的服务器端 TCP Socket 连接。

option() 与 childOption()

option() 设置的是服务端用于接收进来的连接,也就是 boosGroup 线程。option() 常用参数:

SO_BACKLOG //Socket 参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows 为200,其他为128。

childOption() 是提供给父管道接收到的连接,也就是 workerGroup 线程。childOption() 常用的参数:

SO_RCVBUF  //Socket 参数,TCP 数据接收缓冲区大小。
TCP_NODELAY //TCP 参数,立即发送数据,默认值为 True。
SO_KEEPALIVE //Socket 参数,连接保活,默认值为 False。启用该功能时,TCP 会主动探测空闲连接的有效性。

pipeline(ChannelPipeline)

ChannelPipeline 是 Netty 处理请求的责任链,ChannelHandler 则是具体处理请求的处理器。在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:image.png处理器 Handler 分为两种:ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)。

一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler,通过 ChannelHandlerContext 上下文对象,就可以拿到 Channel、Pipeline 等对象,就可以进行读写等操作。

read事件(入站事件)和write事件(出站事件)在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 Handler。两种类型的 Handler 互不干扰,相同类型的 Handler 的处理顺序是有影响的。

在 Bootstrap 中 childHandler() 方法需要初始化通道,实例化一个 ChannelInitializer,这时候需要重写 initChannel() 初始化通道的方法,装配流水线就是在这个地方进行。代码如下:

//使用匿名内部类的形式初始化通道对象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { //创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
    //对 workerGroup的SocketChannel 设置处理器,调用我们自定义的 NettyServerHandler
      ch.pipeline().addLast(new NettyServerHandler());
   }
 });

自定义 NettyServerHandler 代码如下:

package com.chengzw.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 * 自定义 Handler 需要继承 netty 规定好的某个 HandlerAdapter(规范)
 * @author 程治玮
 * @since 2021/3/25 9:31 下午
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 读取客户端发送的数据
     *
     * @param ctx 上下文对象, 含有通道 channel,管道 pipeline
     * @param msg 就是客户端发送的数据
     * @throws Exception
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程 " + Thread.currentThread().getName());
        //将 msg 转成一个 ByteBuf,类似 NIO 的 ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8));
    }
    /**
     * 数据读取完毕后的处理方法
     *
     * @param ctx 上下文对象, 含有通道 channel,管道 pipeline
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }
    /**
     * 处理异常, 一般是需要关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

bind()

提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动,如果加上 sync() 方法则是同步。

// sync 同步
ChannelFuture channelFuture = bootstrap.bind(9000).sync();
// 异步
// 给cf注册监听器,监听我们关心的事件
channelFuture.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
          if (channelFuture.isSuccess()) {
             System.out.println("监听端口9000成功");
          } else {
             System.out.println("监听端口9000失败");
          }
      }
 });

优雅地关闭 EventLoopGroup

//释放掉所有的资源,包括创建的线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();

服务端启动器完整代码

package com.chengzw.netty.base;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Netty 服务端
 * @author 程治玮
 * @since 2021/3/25 9:31 下午
 */
public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        // 创建两个线程组 bossGroup 和 workerGroup, 含有的子线程 NioEventLoop 的个数默认为cpu核数的两倍
        // bossGroup 只是处理连接请求 ,真正的和客户端业务处理,会交给 workerGroup 完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); //1个线程
        EventLoopGroup workerGroup = new NioEventLoopGroup(8); //8个线程
        try {
            // 创建服务器端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 使用链式编程来配置参数
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    // 使用 NioServerSocketChannel 作为服务器的通道实现,该类用于实例化新的 Channel 来接收客户端的连接
                    .channel(NioServerSocketChannel.class)
                    // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
                    // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() { //创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //对 workerGroup的SocketChannel 设置处理器,调用我们自定义的 NettyServerHandler
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start ...");
            // 绑定一个端口并且同步, 生成了一个 ChannelFuture 异步对象,通过 isDone() 等方法可以判断异步事件的执行情况
            // 启动服务器(并绑定端口),bind 是异步操作,sync 方法是等待异步操作执行完毕
            // sync 同步
            ChannelFuture channelFuture = bootstrap.bind(9000).sync();
            // 异步
            // 给cf注册监听器,监听我们关心的事件
            /*channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (channelFuture.isSuccess()) {
                        System.out.println("监听端口9000成功");
                    } else {
                        System.out.println("监听端口9000失败");
                    }
                }
            });*/
            // 等待服务端监听端口关闭,closeFuture是异步操作
            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
            // 在这里面cf.channel().closeFuture().sync();这个语句的主要目的是,如果缺失上述代码,则main方法所在的线程,
            // 即主线程会在执行完bind().sync()方法后,会进入finally 代码块,之前的启动的nettyserver也会随之关闭掉,整个程序都结束了。
            // 原文的例子有英文注释:
            // Wait until the server socket is closed,In this example, this does not happen, but you can do that to gracefully shut down your server.
            // 线程进入wait状态,也就是main线程暂时不会执行到finally里面,nettyserver也持续运行,如果监听到关闭事件,可以优雅的关闭通道和nettyserver,
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 资源优雅释放
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

创建并配置客户端启动器

客户端只需要一个 NioEventLoopGroup,其余代码和服务器类似。首先自定义 NettyClientHandler 用于处理客户端 ChannelPipeline 的业务。

package com.chengzw.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 * 自定义 Handler 需要继承 netty 规定好的某个 HandlerAdapter(规范)
 * @author 程治玮
 * @since 2021/3/25 9:50 下午
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 当客户端连接服务器完成就会触发该方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }
    /**
     * 当通道有读取事件时会触发,即服务端发送数据给客户端
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
    }
    /**
     * 处理异常, 一般是需要关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

然后配置客户端启动器:

package com.chengzw.netty.base;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Netty 客户端
 * @author 程治玮
 * @since 2021/3/25 9:52 下午
 */
public class NettyClient {
    public static void main(String[] args) throws Exception {
        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建客户端启动对象
            //注意客户端使用的不是ServerBootstrap而是Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //加入处理器
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("netty client start..");
            //启动客户端去连接服务器端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            //对通道关闭进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

参考链接

目录
相关文章
|
3月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
21天前
|
并行计算 JavaScript 前端开发
单线程模型
【10月更文挑战第15天】
|
22天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
15 1
|
2月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
96 20
剖析 Redis List 消息队列的三种消费线程模型
|
1月前
|
NoSQL Redis 数据库
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
本文解释了Redis为什么采用单线程模型,以及为什么Redis单线程模型的效率和速度依然可以非常高,主要原因包括Redis操作主要访问内存、核心操作简单、单线程避免了线程竞争开销,以及使用了IO多路复用机制epoll。
44 0
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
|
1月前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
2月前
|
存储 机器人 Linux
Netty(二)-服务端网络编程常见网络IO模型讲解
Netty(二)-服务端网络编程常见网络IO模型讲解
|
1月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
23 0
|
4月前
|
缓存 编译器 Go
开发与运维线程问题之Go语言的goroutine基于线程模型实现如何解决
开发与运维线程问题之Go语言的goroutine基于线程模型实现如何解决
55 3
|
4月前
|
算法 调度 人工智能
人工智能线程问题之无锁化编程如何解决
人工智能线程问题之无锁化编程如何解决
48 2