【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)

简介: 经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。

前提介绍

经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。

整体框架如下图所示:
在这里插入图片描述
对应的组件的基本功能和功能实现范畴。
在这里插入图片描述
在上一节,我们主要讲对应的Dispatcher上面之前的逻辑操作实现,进行了对应的介绍和分析:
在这里插入图片描述

  • Boss线程:接受连接流程,主要负责接受外部请求,这些请求可能是来自用户的操作或是其他服务的调用。一旦接收到请求,boss会进行必要的处理,然后将请求分发给下面的线程池worker进行处理。

  • Worker线程:系统中的工作执行者,负责接收boss分发的任务,然后执行具体的业务逻辑。这些任务可能涉及到数据的处理、服务的调用等。线程池worker通过channel与boss进行通信,确保任务能够准确无误地传递。

  • ChannelHandler处理器 :ChannelHandler 接口是一个空接口,其中:ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter是我们首先要实现和操作的基础。

    本节重点

本节内容的重点是针对于Dispatcher分配和调度以及之后的操作流程的介绍和分析。

  • dispatcher机制:在worker执行任务的过程中,需要有一个机制来调度和分配任务。这就是dispatcher的作用。
    在这里插入图片描述
  • EventListener:基于在每个worker线程内部,eventListener发挥着关键作用。它负责监听和处理线程中的事件,比如任务的完成、异常等。通过eventListener,系统能够及时响应各种事件,进行必要的处理和反馈。

  • Service业务逻辑实现:它代表了整个系统的核心业务逻辑。service接收并处理来自worker线程的任务,完成具体的业务操作。这些操作可能涉及到数据的处理、服务的调用等。

    Dispatcher(分派调度器)

Dispatcher根据一定的策略和规则,将任务分配给合适的worker线程进行处理。这一环节保证了系统的负载均衡和高效运行。
在这里插入图片描述
消息经过Pipline链处理后,将由Dispatcher转发,并进入EventListener链进行处理。Dispatcher内部使用了两个线程池:channelExecutor和dataExecutor。
在这里插入图片描述

  • netExecutor用于处理通道事件和异常事件。由于通道事件可能需要同步调用远程服务,因此该线程池没有设定上限,因为同步调用会阻塞当前线程。

  • dataExecutor用于处理消息事件。根据经验值,默认的最大线程数为150,但可以通过选项参数进行修改。

    EventListener

    ChannelEventListener

ChannelInboundHandler接口定义了一系列方法,用于处理Channel的入站事件。这些方法负责处理数据从外部系统(如网络)流入Channel的过程。这些方法都是将对应的事件(channelRegistered、channelUnregistered、channelActive、channelInactive)转发给ChannelPipeline中的下一个ChannelInboundHandler,如下面的源码所示:

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelRegistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelUnregistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelActive();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelInactive();
    }

Channel通道事件

channelRegistered、channelUnregistered、channelActive和channelInactive这几个方法是用于处理不同类型的通道事件。下面分别对这几个方法进行详细分析:

  • channelRegistered(ChannelHandlerContext ctx): 这个方法在通道被注册到EventLoop(事件循环)后被调用。它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。

  • channelUnregistered(ChannelHandlerContext ctx):这个方法在通道从EventLoop中注销后被调用。它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。

  • channelActive(ChannelHandlerContext ctx):这个方法在通道变为活跃状态后被调用。它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。

  • channelInactive(ChannelHandlerContext ctx):这个方法在通道变为非活跃状态后被调用。 它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。
    在这里插入图片描述
    我们只需将相应的实现注入并发布 ChannelActionEvent 对象模型事件。这样,ChannelActionEvent 对象的消费者就能够监听事件并执行相应的逻辑操作。通过这种方式,我们实现了事件的发布与订阅机制,以便实现松耦合的组件间通信,并能根据实际需求对事件进行灵活地处理和扩展。

定义ChannelActionEvent

首先,定义一个自定义事件类 ChannelActionEventextends ,继承自 ApplicationEvent,主要作为通道变化的处理器事件。

public class ChannelActionEvent extends ApplicationEvent {
   
   
    private Object data;
    public ChannelActionEventextends (Object source, String data) {
   
   
        super(source);
        this.data = data;
    }
    public String getData() {
   
   
        return data;
    }
}

@Component
public class ChannelActionEventListener extends implements ApplicationListener<ChannelActionEvent > {
   
   

    @Override
    public void onApplicationEvent(MyEvent event) {
   
   
        Object data = event.getData();
        // 执行对应的逻辑操作
        System.out.println("Received event with data: " + data);
    }
}

因此,根据同样的逻辑,ExceptionEvent事件也可以通过方法处理器的exceptionCaught方法进行处理。
在这里插入图片描述

 @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
   
   
        ctx.fireExceptionCaught(cause);
    }

DataEvent可以覆盖对应的channelRead、channelReadComplete的方法进行发布对应的事件处理即可。
在这里插入图片描述

      @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
   
   
        ctx.fireChannelRead(msg);
    }
    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelReadComplete();
    }

框架会预先在 XXEventListener 链末端注册 ServiceMessageEventListener,该 Listener 负责调用被注册的 Service,并将
返回值或异常回传。
在这里插入图片描述

Heartbeat、超时及重连机制

Netty提供了读空闲和写空闲的功能来处理网络连接的空闲状态。

读空闲(Read Idle):当连接在指定的时间内没有接收到任何数据时,就会触发读空闲事件。这个事件可以用来检测连接是否处于空闲状态,或者判断通信对方是否还与服务器保持连接。通过设置ChannelOption.READ_IDLE_TIME参数来定义读空闲的时间。

写空闲(Write Idle):当连接在指定的时间内没有发送任何数据时,就会触发写空闲事件。这个事件可以用来定期发送心跳消息或其他需要保持连接的数据。通过设置ChannelOption.WRITE_IDLE_TIME参数来定义写空闲的时间。

在这里插入图片描述
在Netty中,可以通过ChannelOption设置读空闲和写空闲的时间,然后通过ChannelHandler的回调方法来处理空闲事件。常用的回调方法包括:

  • channelIdle(ChannelHandlerContext ctx, IdleStateEvent stateEvent):当发生空闲事件时调用该方法,可以在该方法中执行相应的逻辑操作。

通常,通过在管道中配置IdleStateHandler来启用空闲事件的检测和处理。

IdleStateHandler是Netty提供的一个特殊的ChannelHandler,用于检测并处理读空闲和写空闲事件。例如,可以在初始化管道时添加以下代码:

pipeline.addLast(new IdleStateHandler(0, 0, idleTime)); // 设置读写空闲时间
pipeline.addLast(new MyIdleHandler()); // 自定义的空闲事件处理器

在自定义的空闲事件处理器中,可以根据读空闲或写空闲事件执行相应的操作。例如,发送心跳消息、关闭连接等。

相关文章
|
7天前
|
负载均衡 测试技术 调度
大模型分布式推理:张量并行与流水线并行技术
本文深入探讨大语言模型分布式推理的核心技术——张量并行与流水线并行。通过分析单GPU内存限制下的模型部署挑战,详细解析张量并行的矩阵分片策略、流水线并行的阶段划分机制,以及二者的混合并行架构。文章包含完整的分布式推理框架实现、通信优化策略和性能调优指南,为千亿参数大模型的分布式部署提供全面解决方案。
111 4
|
4月前
|
消息中间件 负载均衡 中间件
⚡ 构建真正的高性能即时通讯服务:基于 Netty 集群的架构设计与实现
本文介绍了如何基于 Netty 构建分布式即时通讯集群。随着用户量增长,单体架构面临性能瓶颈,文章对比了三种集群方案:Nginx 负载均衡、注册中心服务发现与基于 ZooKeeper 的消息路由架构。最终选择第三种方案,通过 ZooKeeper 实现服务注册发现与消息路由,并结合 RabbitMQ 支持跨服务器消息广播。文中还详细讲解了 ZooKeeper 搭建、Netty 集群改造、动态端口分配、服务注册、负载均衡及消息广播的实现,构建了一个高可用、可水平扩展的即时通讯系统。
401 0
|
2月前
|
消息中间件 监控 Java
Apache Kafka 分布式流处理平台技术详解与实践指南
本文档全面介绍 Apache Kafka 分布式流处理平台的核心概念、架构设计和实践应用。作为高吞吐量、低延迟的分布式消息系统,Kafka 已成为现代数据管道和流处理应用的事实标准。本文将深入探讨其生产者-消费者模型、主题分区机制、副本复制、流处理API等核心机制,帮助开发者构建可靠、可扩展的实时数据流处理系统。
256 4
|
29天前
|
机器学习/深度学习 监控 PyTorch
68_分布式训练技术:DDP与Horovod
随着大型语言模型(LLM)规模的不断扩大,从早期的BERT(数亿参数)到如今的GPT-4(万亿级参数),单卡训练已经成为不可能完成的任务。分布式训练技术应运而生,成为大模型开发的核心基础设施。2025年,分布式训练技术已经发展到相当成熟的阶段,各种优化策略和框架不断涌现,为大模型训练提供了强大的支持。
|
2月前
|
JSON 监控 Java
Elasticsearch 分布式搜索与分析引擎技术详解与实践指南
本文档全面介绍 Elasticsearch 分布式搜索与分析引擎的核心概念、架构设计和实践应用。作为基于 Lucene 的分布式搜索引擎,Elasticsearch 提供了近实时的搜索能力、强大的数据分析功能和可扩展的分布式架构。本文将深入探讨其索引机制、查询 DSL、集群管理、性能优化以及与各种应用场景的集成,帮助开发者构建高性能的搜索和分析系统。
171 0
|
8月前
|
Cloud Native 关系型数据库 分布式数据库
登顶TPC-C|云原生数据库PolarDB技术揭秘:Limitless集群和分布式扩展篇
阿里云PolarDB云原生数据库在TPC-C基准测试中以20.55亿tpmC的成绩刷新世界纪录,展现卓越性能与性价比。其轻量版满足国产化需求,兼具高性能与低成本,适用于多种场景,推动数据库技术革新与发展。
|
2月前
|
监控 前端开发 安全
Netty 高性能网络编程框架技术详解与实践指南
本文档全面介绍 Netty 高性能网络编程框架的核心概念、架构设计和实践应用。作为 Java 领域最优秀的 NIO 框架之一,Netty 提供了异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。本文将深入探讨其 Reactor 模型、ChannelPipeline、编解码器、内存管理等核心机制,帮助开发者构建高性能的网络应用系统。
190 0
|
6月前
|
安全 JavaScript 前端开发
HarmonyOS NEXT~HarmonyOS 语言仓颉:下一代分布式开发语言的技术解析与应用实践
HarmonyOS语言仓颉是华为专为HarmonyOS生态系统设计的新型编程语言,旨在解决分布式环境下的开发挑战。它以“编码创造”为理念,具备分布式原生、高性能与高效率、安全可靠三大核心特性。仓颉语言通过内置分布式能力简化跨设备开发,提供统一的编程模型和开发体验。文章从语言基础、关键特性、开发实践及未来展望四个方面剖析其技术优势,助力开发者掌握这一新兴工具,构建全场景分布式应用。
590 35
|
7月前
|
Cloud Native 关系型数据库 分布式数据库
登顶TPC-C|云原生数据库PolarDB技术揭秘:Limitless集群和分布式扩展篇
云原生数据库PolarDB技术揭秘:Limitless集群和分布式扩展篇
|
6月前
|
算法 Java 容器
Netty源码—4.客户端接入流程
本文主要介绍了关于Netty客户端连接接入问题整理、Reactor线程模型和服务端启动流程、Netty新连接接入的整体处理逻辑、新连接接入之检测新连接、新连接接入之创建NioSocketChannel、新连接接入之绑定NioEventLoop线程、新连接接入之注册Selector和注册读事件、注册Reactor线程总结、新连接接入总结

热门文章

最新文章

下一篇
开通oss服务