Seata 高性能RPC通信的实现基石-Netty篇

简介: Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。

一、Netty 简述

Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。从下方所列举的特性中不难发现 Netty 优点很多。

学习 Netty 需要从了解与 Netty 相关的几个关键类开始,如BootstrapServerBootstrapChannelSelectorChannelFutureEventLoopEventLoopGroupChannelHandlerPipeline 等。这些类是 Netty 对网络编程抽象的代表,也是 Netty 的精髓。

二、Bootstrap 和 ServerBootstrap

BootstrapServerBootstrap 作为 Netty 的引导类,提供配置 Netty 组件的接口,开发者通过这些接口来定制搭配 Netty 的各个组件,组装出一个健壮、高性能的网络通信模块。

BootstrapNetty 的客户端引导类,引导客户端进程连接到另一个运行在某个指定主机的指定端口上的服务端进程后进行网络通信。

ServerBootstrapNetty 的服务端引导类,引导一个服务端进程绑定到某个指定的端口,接收来自客户端的网络连接后进行网络通信。

三、Channel

Channel 是 Java NIO 的一个基本构造,从网络编程视角看可把Channel 理解成是对 Socket 操作的封装,所提供的如端口绑定、建立连接、数据读写等 API 降低了直接使用 Socket 的复杂度;Channel具备以下特性:

  • 可获得当前网络连接的通道状态
  • 可获得网络连接的配置参数(缓冲区大小等)
  • 提供异步的⽹络 I/O 操作,⽐如建⽴连接、绑定端⼝、数据读写等
  • 获得 ChannelFuture 实例,并在其上注册监听器⽤于监听 I/O 操作成功、失败、取消时的事件回调。
  • 不同协议、不同 I/O 类型的连接都有不同的 Channel 类型与之对应

四、Selector

java.nio.channels.Selector 是 Java 非阻塞 I/O 实现的关键。Selector 管理一组非阻塞 socket,当这些 socket 中有已就绪可进行 I/O 相关操作的时候,会进行事件通知。使用非阻塞 I/O 比用阻塞 I/O 来处理大量事件相比,处理更快速、更经济。

Selector 被称作多路复⽤器,正是因为借助它可以实现用一个线程监视多个文件句柄,在网络场景中即是一个线程监视多个 socket 句柄。

Netty 中即是一个 Selector 可以监视多个 Channel ,监听 I/O 事件,如 OP_ACCEPT(接收连接事件)、OP_CONNECT(连接事件)、OP_READ(读事件)、OP_WRITE(写事件),还可以不断的查询已注册 Channel 是否处于就绪状态,通过一个线程中管理一个Selector,一个Selector监视多个 Channel,继而达到用少量的线程管理大量的 Channel

五、ChannelFuture

Netty 中所有的 I/O 操作都是异步的。异步操作会立即返回,但操作结果可能不会立即返回,获取结果有同步和异步两种方式:

  • 异步方式,即需要一种在操作执行之后的某个时间点通知用户其结果的方法。ChannelFuture 可通过 addListener()方法注册了一个或多个 ChannelFutureListener,当操作完成时(无论是否成功)监听器的operationComplete(ChannelFuture channelFuture)方法会被回调执行,若是异常可通过channelFuture.cause()来获得对应的Throwable对象。
  • 同步方式,需借助ChannelFuture#sync() ⽅法达到同步执⾏的效果。

六、EventLoop 和 EventLoopGroup

Netty具有用少量的线程管理大量的 Channel的能力的基础是一个线程可管理一个可监听多个 Channel中 I/O 事件 的 Selector,那从开发者视角出发,如何提供线程,如何关注事件并提供对应的处理逻辑,并尽量少的关注线程安全问题?Netty 是了解开发者的,提供的这个组件就是EventLoop

EventLoop 内创建一个线程并管理一个 Selector,每个 Channel 被创建后就会被分配给一个 SelectorSelector 会监听注册在其上的多个 Channel 的 I/O 事件,EventLoop 会在这个内部线程中通过Selector检测到多个 Channel 里发生的 I/O 事件,并将 I/O 事件派发给对应ChannelChannelHandler。所以的一个 Channel 的所有 I/O 事件都在EventLoop 内的这个线程中被处理。EventLoop并不独立存在,在 Netty 中是被池化管理的,这个管理者就是 EventLoopGroup,因为每个EventLoop内都有一个线程,所以通常也把EventLoopGroup类比为线程池,参考下图:

通过上边的介绍不难看出 EventLoop 的能力封装将 Selector透明化了,因此通常 Netty 的资料尝尝仅介绍 ChannelEventLoopEventLoopGroup 之间的关系:

  • 一个 EventLoopGroup 包含 n 个 EventLoop
  • EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop,在当前实现中,使用 round-robin(顺序循环)的方式进行分配以获取一个均衡的分布
  • 一个 EventLoop 在它的生命周期内只和一个线程绑定;且线程是按需创建
  • 所有由 EventLoop 处理的 I/O 事件都将在它专有的线程上被处理;
  • 一个 Channel 在它的生命周期内只注册于一个 EventLoop
  • 多个 Channel 会被分配给同一个 EventLoop

从运行机制来说EventLoop是一种事件等待和处理的程序模型,如 Node.js 就是采用 EventLoop 的运行机制,这种机制可以解决多线程资源消耗高的问题。每当事件发生时,应用程序都会将产生的事件放入事件队列当中,然后会轮询从队列中取出事件执行或者将事件分发给相应的事件监听者执行。事件执行的方式通常分为立即执行、延后执行、定期执行几种,Netty 中的事件执行方式也是这样,只是事件名称上稍有差异。

Netty 中 EventLoop 的实现大概是这样,当EventLoop首次收到任务后,在其内部实例化一个线程,这个线程run()方法的主体逻辑是 for 循环来处理事件(Selector上监听到的 I/O 任务)和 异步任务(⾮ I/O 任务,每个 EventLoop 都拥有它自已的异步任务队列):

  • 事件(I/O 任务):如 OP_ACCEPT(接收连接事件)、OP_CONNECT(连接事件)、OP_READ(读事件)、OP_WRITE(写事件)等,由 processSelectedKeys() ⽅法触发。
  • 异步任务(⾮ I/O 任务):如 register0、bind0 等任务,以及其他显式提交的调度任务最终将会被添加到 taskQueue 任务队列中,由 runAllTasks ⽅法触发。

事件和异步任务是以先进先出(FIFO)的顺序执行的。这样可以通过保证字节内容总是按正确的顺序被处理。一个异步任务提交的小细节,当调用 execute() 或者submit()方法提交异步任务的线程刚好是EventLoop中的线程,则任务会被立即执行,而无需在投入到taskQueue中。

EventLoop中除了可以提交这种普通异步任务,还可以提交定时任务(也算一种特殊的异步任务),定时任务是调度一个任务在稍后(延迟)执行或者周期性地执行。例如,定时发送心跳消息到服务端,以检查连接是否仍然还活着。如果没有响应,你便知道可以关闭该 Channel 了。定时任务和普通异步任务在EventLoop中的执行时机基本类似,其他区别之处在于:

  1. 提交⽅法 :
  • 定时异步任务使⽤ scheduleAtFixedRate() 或者 scheduleWithFixedDelay() ⽅法提交任务
  • 普通异步任务使⽤ execute() 或者submit()方法提交任务
  1. 任务队列 :
  • 定时异步任务提交到 ScheduleTaskQueue 任务队列中
  • 普通异步任务提交到 TaskQueue 任务队列中

EventLoop 中 I/O 事件的处理优先级是高于taskQueue中的异步任务,优先级的管控可通过ioRatio微调(请读者老师自省查阅)。优先级的控制方法是限制runAllTasks(xxx)这个方法中异步任务的处理时长。在runAllTasks(xxx),会提取定时任务队列ScheduleTaskQueue中到时间点需要被执行的任务,转移到taskQueue排队,之后从taskQueue里面逐个取出任务并执行,当本次runAllTasks()中处理耗时超过限定时间后终止,转去继续处理 I/O 事件,如此形成循环。

NioEventLoop#run()的核心逻辑是处理完轮询到的 key 之后, 首先记录下耗时, 然后通过 runAllTasks(ioTime \* (100 - ioRatio) / ioRatio),限时执行taskQueue中的任务

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    //轮询io事件(1)
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            //默认是50
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                //记录下开始时间
                final long ioStartTime = System.nanoTime();
                try {
                    //处理轮询到的key(2)
                    processSelectedKeys();
                } finally {
                    //计算耗时
                    final long ioTime = System.nanoTime() - ioStartTime;
                    //执行task(3)
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        //代码省略
    }
}

runAllTasks(xxx)异步任务的限时处理环节,会提取定时任务队列ScheduleTaskQueue中到时间点需要被执行的任务,转移到taskQueue排队,之后从taskQueue里面逐个取出任务并执行,处理耗时超过限定时间后终止任务处理,退出方法。

protected boolean runAllTasks(long timeoutNanos) {
    //定时任务队列中提到点取需执行任务
    fetchFromScheduledTaskQueue();
    //从普通taskQ里面拿一个任务
    Runnable task = pollTask();
    //task为空, 则直接返回
    if (task == null) {
        //跑完所有的任务执行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果队列不为空
    //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //执行每一个任务
    for (;;) {
        safeExecute(task);
        //标记当前跑完的任务
        runTasks ++;
        //当跑完64个任务的时候, 会计算一下当前时间
        if ((runTasks & 0x3F) == 0) {
            //定时任务初始化到当前的时间
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超过截止时间则不执行(nanoTime()是耗时的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果没有超过这个时间, 则继续从普通任务队列拿任务
        task = pollTask();
        //直到没有任务执行
        if (task == null) {
            //记录下最后执行时间
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

fetchFromScheduledTaskQueue()这个方法将定时任务中提取到点取需执行的定时任务添加到 taskQueue

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    //从定时任务队列中抓取第一个定时任务
    //寻找截止时间为nanoTime的任务
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    //如果该定时任务队列不为空, 则塞到普通任务队列里面
    while (scheduledTask != null) {
        //如果添加到普通任务队列过程中失败
        if (!taskQueue.offer(scheduledTask)) {
            //则重新添加到定时任务队列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        //继续从定时任务队列中拉取任务
        //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
        scheduledTask = pollScheduledTask(nanoTime);
    }
    return true;
}

EventLoop 将负责在同一个线程中处理一个或多个 Channel 的整个生命周期内的所有事件。这个情况也有弊端:

  1. 若事件处理耗时很长,将导致 I/O 流量下降。所以需考虑任务处理对系统性能的影响,选择合适的 Netty 线程模型,配置合理的线程数
  2. EventLoop 被多个 Channel 复用,那么这些 ChannelThreadLocal 都将是一样的。

七、ChannelPipelineChannelHandler

前文提到Eventloop 中将Channel中的 I/O 事件派发给 ChannelHandler 处理,开发人员在ChannelHandler中添加对应事件的处理逻辑,从 NettyChannelHandler 的组织管理来说,开发者的视角是用ChannelPipelineChannelHandler以链表的方式串联起来。如果一个完整的 I/O 处理流程是由 解码构建消息->接收处理消息回执发送消息->消息编码发送 这3个步骤组成,那就用 3 个ChannelHandler分别实现这 3 步骤的逻辑,这种链式处理层次分明、代码清晰。

但从源码实现的角度是这样的,Channel中有个ChannelPipeline属性,创建Channel时,同时实例化ChannelPipeline属性。ChannelPipeline串起的其实是ChannelHandlerContext,为什么资料常说ChannelPipelineChannelHandler以链表的方式串联起来呢?原因是被串起的 ctx 中有个属性是 ChannelHandler

ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler 交互。

八、最后说一句

我是石页兄,如果这篇文章对您有帮助,或者有所启发的话,欢迎关注笔者的微信公众号【架构染色】进行交流和学习,您的支持是我坚持写作最大的动力。

另外技术群中也有许多热心的大佬交流、互助;可通过点击【架构染色】公众号主页右下角【交流探讨】获得笔者微信二维码,加微时备注“加群”即可,期待并欢迎您的加入。

参考并感谢

相关文章
|
8月前
|
网络协议 Java Maven
基于Netty实现TCP通信
基于Netty实现TCP通信
117 0
|
8月前
|
网络协议
【Netty 网络通信】Socket 通信原理
【1月更文挑战第9天】【Netty 网络通信】Socket 通信原理
|
5月前
|
前端开发 网络协议
Netty实战巅峰:从零构建高性能IM即时通讯系统,解锁并发通信新境界
【8月更文挑战第3天】Netty是一款高性能、异步事件驱动的网络框架,适用于开发高并发网络应用,如即时通讯(IM)系统。本文将指导你利用Netty从零构建高性能IM程序,介绍Netty基础及服务器/客户端设计。服务器端使用`ServerBootstrap`启动,客户端通过`Bootstrap`连接服务器。示例展示了简单的服务器启动过程。通过深入学习,可进一步实现用户认证等功能,打造出更完善的IM系统。
219 1
|
5月前
|
XML 存储 JSON
(十二)探索高性能通信与RPC框架基石:Json、ProtoBuf、Hessian序列化详解
如今这个分布式风靡的时代,网络通信技术,是每位技术人员必须掌握的技能,因为无论是哪种分布式技术,都离不开心跳、选举、节点感知、数据同步……等机制,而究其根本,这些技术的本质都是网络间的数据交互。正因如此,想要构建一个高性能的分布式组件/系统,不得不思考一个问题:怎么才能让数据传输的速度更快?
134 1
|
8月前
|
网络协议
Netty实现TCP通信
Netty实现TCP通信
113 0
|
8月前
|
Dubbo Java 应用服务中间件
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)
今天,我要向大家实现一个基于Netty实现的高性能远程通信框架!这个框架利用了 Netty 的强大功能,提供了快速、可靠的远程通信能力。 无论是构建大规模微服务架构还是实现分布式计算,这个分布式通信框架都是一个不可或缺的利器。
161 2
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)
|
8月前
|
移动开发 网络协议 Java
通信密码学:探秘Netty中解码器的神奇力量
通信密码学:探秘Netty中解码器的神奇力量
231 0
|
8月前
|
前端开发 API 开发者
通信的枢纽:探秘Netty中神奇的Channel
通信的枢纽:探秘Netty中神奇的Channel
164 0
|
8月前
|
消息中间件 缓存 API
|
8月前
|
负载均衡 Java 调度
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)
经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。
102 0
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)