【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)

简介: 经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。

前提介绍

经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。

整体框架如下图所示:
在这里插入图片描述
对应的组件的基本功能和功能实现范畴。
在这里插入图片描述
在上一节,我们主要讲对应的Dispatcher上面之前的逻辑操作实现,进行了对应的介绍和分析:
在这里插入图片描述

  • Boss线程:接受连接流程,主要负责接受外部请求,这些请求可能是来自用户的操作或是其他服务的调用。一旦接收到请求,boss会进行必要的处理,然后将请求分发给下面的线程池worker进行处理。

  • Worker线程:系统中的工作执行者,负责接收boss分发的任务,然后执行具体的业务逻辑。这些任务可能涉及到数据的处理、服务的调用等。线程池worker通过channel与boss进行通信,确保任务能够准确无误地传递。

  • ChannelHandler处理器 :ChannelHandler 接口是一个空接口,其中:ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter是我们首先要实现和操作的基础。

    本节重点

本节内容的重点是针对于Dispatcher分配和调度以及之后的操作流程的介绍和分析。

  • dispatcher机制:在worker执行任务的过程中,需要有一个机制来调度和分配任务。这就是dispatcher的作用。
    在这里插入图片描述
  • EventListener:基于在每个worker线程内部,eventListener发挥着关键作用。它负责监听和处理线程中的事件,比如任务的完成、异常等。通过eventListener,系统能够及时响应各种事件,进行必要的处理和反馈。

  • Service业务逻辑实现:它代表了整个系统的核心业务逻辑。service接收并处理来自worker线程的任务,完成具体的业务操作。这些操作可能涉及到数据的处理、服务的调用等。

    Dispatcher(分派调度器)

Dispatcher根据一定的策略和规则,将任务分配给合适的worker线程进行处理。这一环节保证了系统的负载均衡和高效运行。
在这里插入图片描述
消息经过Pipline链处理后,将由Dispatcher转发,并进入EventListener链进行处理。Dispatcher内部使用了两个线程池:channelExecutor和dataExecutor。
在这里插入图片描述

  • netExecutor用于处理通道事件和异常事件。由于通道事件可能需要同步调用远程服务,因此该线程池没有设定上限,因为同步调用会阻塞当前线程。

  • dataExecutor用于处理消息事件。根据经验值,默认的最大线程数为150,但可以通过选项参数进行修改。

    EventListener

    ChannelEventListener

ChannelInboundHandler接口定义了一系列方法,用于处理Channel的入站事件。这些方法负责处理数据从外部系统(如网络)流入Channel的过程。这些方法都是将对应的事件(channelRegistered、channelUnregistered、channelActive、channelInactive)转发给ChannelPipeline中的下一个ChannelInboundHandler,如下面的源码所示:

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelRegistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelUnregistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelActive();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelInactive();
    }
AI 代码解读

Channel通道事件

channelRegistered、channelUnregistered、channelActive和channelInactive这几个方法是用于处理不同类型的通道事件。下面分别对这几个方法进行详细分析:

  • channelRegistered(ChannelHandlerContext ctx): 这个方法在通道被注册到EventLoop(事件循环)后被调用。它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。

  • channelUnregistered(ChannelHandlerContext ctx):这个方法在通道从EventLoop中注销后被调用。它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。

  • channelActive(ChannelHandlerContext ctx):这个方法在通道变为活跃状态后被调用。它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。

  • channelInactive(ChannelHandlerContext ctx):这个方法在通道变为非活跃状态后被调用。 它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。
    在这里插入图片描述
    我们只需将相应的实现注入并发布 ChannelActionEvent 对象模型事件。这样,ChannelActionEvent 对象的消费者就能够监听事件并执行相应的逻辑操作。通过这种方式,我们实现了事件的发布与订阅机制,以便实现松耦合的组件间通信,并能根据实际需求对事件进行灵活地处理和扩展。

定义ChannelActionEvent

首先,定义一个自定义事件类 ChannelActionEventextends ,继承自 ApplicationEvent,主要作为通道变化的处理器事件。

public class ChannelActionEvent extends ApplicationEvent {
   
   
    private Object data;
    public ChannelActionEventextends (Object source, String data) {
   
   
        super(source);
        this.data = data;
    }
    public String getData() {
   
   
        return data;
    }
}

@Component
public class ChannelActionEventListener extends implements ApplicationListener<ChannelActionEvent > {
   
   

    @Override
    public void onApplicationEvent(MyEvent event) {
   
   
        Object data = event.getData();
        // 执行对应的逻辑操作
        System.out.println("Received event with data: " + data);
    }
}
AI 代码解读

因此,根据同样的逻辑,ExceptionEvent事件也可以通过方法处理器的exceptionCaught方法进行处理。
在这里插入图片描述

 @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
   
   
        ctx.fireExceptionCaught(cause);
    }
AI 代码解读

DataEvent可以覆盖对应的channelRead、channelReadComplete的方法进行发布对应的事件处理即可。
在这里插入图片描述

      @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
   
   
        ctx.fireChannelRead(msg);
    }
    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelReadComplete();
    }
AI 代码解读

框架会预先在 XXEventListener 链末端注册 ServiceMessageEventListener,该 Listener 负责调用被注册的 Service,并将
返回值或异常回传。
在这里插入图片描述

Heartbeat、超时及重连机制

Netty提供了读空闲和写空闲的功能来处理网络连接的空闲状态。

读空闲(Read Idle):当连接在指定的时间内没有接收到任何数据时,就会触发读空闲事件。这个事件可以用来检测连接是否处于空闲状态,或者判断通信对方是否还与服务器保持连接。通过设置ChannelOption.READ_IDLE_TIME参数来定义读空闲的时间。

写空闲(Write Idle):当连接在指定的时间内没有发送任何数据时,就会触发写空闲事件。这个事件可以用来定期发送心跳消息或其他需要保持连接的数据。通过设置ChannelOption.WRITE_IDLE_TIME参数来定义写空闲的时间。

在这里插入图片描述
在Netty中,可以通过ChannelOption设置读空闲和写空闲的时间,然后通过ChannelHandler的回调方法来处理空闲事件。常用的回调方法包括:

  • channelIdle(ChannelHandlerContext ctx, IdleStateEvent stateEvent):当发生空闲事件时调用该方法,可以在该方法中执行相应的逻辑操作。

通常,通过在管道中配置IdleStateHandler来启用空闲事件的检测和处理。

IdleStateHandler是Netty提供的一个特殊的ChannelHandler,用于检测并处理读空闲和写空闲事件。例如,可以在初始化管道时添加以下代码:

pipeline.addLast(new IdleStateHandler(0, 0, idleTime)); // 设置读写空闲时间
pipeline.addLast(new MyIdleHandler()); // 自定义的空闲事件处理器
AI 代码解读

在自定义的空闲事件处理器中,可以根据读空闲或写空闲事件执行相应的操作。例如,发送心跳消息、关闭连接等。

目录
打赏
0
0
0
0
379
分享
相关文章
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
分布式爬虫框架Scrapy-Redis实战指南
登顶TPC-C|云原生数据库PolarDB技术揭秘:Limitless集群和分布式扩展篇
阿里云PolarDB云原生数据库在TPC-C基准测试中以20.55亿tpmC的成绩刷新世界纪录,展现卓越性能与性价比。其轻量版满足国产化需求,兼具高性能与低成本,适用于多种场景,推动数据库技术革新与发展。
常见的分布式定时任务调度框架
分布式定时任务调度框架用于在分布式系统中管理和调度定时任务,确保任务按预定时间和频率执行。其核心概念包括Job(任务)、Trigger(触发器)、Executor(执行器)和Scheduler(调度器)。这类框架应具备任务管理、任务监控、良好的可扩展性和高可用性等功能。常用的Java生态中的分布式任务调度框架有Quartz Scheduler、ElasticJob和XXL-JOB。
1260 66
DeepSeek进阶开发与应用4:DeepSeek中的分布式训练技术
随着深度学习模型和数据集规模的扩大,单机训练已无法满足需求,分布式训练技术应运而生。DeepSeek框架支持数据并行和模型并行两种模式,通过将计算任务分配到多个节点上并行执行,显著提高训练效率。本文介绍DeepSeek中的分布式训练技术,包括配置与启动方法,帮助用户轻松实现大规模模型训练。数据并行通过`MirroredStrategy`同步梯度,适用于大多数模型;模型并行则通过`ParameterServerStrategy`异步处理大模型。DeepSeek简化了分布式环境配置,支持单机多卡和多机多卡等场景。
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
127 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
203 7
从零到一:分布式缓存技术初探
分布式缓存通过将数据存储在多个节点上,利用负载均衡算法提高访问速度、降低数据库负载并增强系统可用性。常见产品有Redis、Memcached等。其优势包括性能扩展、高可用性、负载均衡和容错性,适用于页面缓存、应用对象缓存、状态缓存、并行处理、事件处理及极限事务处理等多种场景。
301 1
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
147 2
Seata框架和其他分布式事务框架有什么区别
Seata框架和其他分布式事务框架有什么区别
75 1
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
489 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁

热门文章

最新文章