【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)

简介: 经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。

前提介绍

经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。

整体框架如下图所示:
在这里插入图片描述
对应的组件的基本功能和功能实现范畴。
在这里插入图片描述
在上一节,我们主要讲对应的Dispatcher上面之前的逻辑操作实现,进行了对应的介绍和分析:
在这里插入图片描述

  • Boss线程:接受连接流程,主要负责接受外部请求,这些请求可能是来自用户的操作或是其他服务的调用。一旦接收到请求,boss会进行必要的处理,然后将请求分发给下面的线程池worker进行处理。

  • Worker线程:系统中的工作执行者,负责接收boss分发的任务,然后执行具体的业务逻辑。这些任务可能涉及到数据的处理、服务的调用等。线程池worker通过channel与boss进行通信,确保任务能够准确无误地传递。

  • ChannelHandler处理器 :ChannelHandler 接口是一个空接口,其中:ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter是我们首先要实现和操作的基础。

    本节重点

本节内容的重点是针对于Dispatcher分配和调度以及之后的操作流程的介绍和分析。

  • dispatcher机制:在worker执行任务的过程中,需要有一个机制来调度和分配任务。这就是dispatcher的作用。
    在这里插入图片描述
  • EventListener:基于在每个worker线程内部,eventListener发挥着关键作用。它负责监听和处理线程中的事件,比如任务的完成、异常等。通过eventListener,系统能够及时响应各种事件,进行必要的处理和反馈。

  • Service业务逻辑实现:它代表了整个系统的核心业务逻辑。service接收并处理来自worker线程的任务,完成具体的业务操作。这些操作可能涉及到数据的处理、服务的调用等。

    Dispatcher(分派调度器)

Dispatcher根据一定的策略和规则,将任务分配给合适的worker线程进行处理。这一环节保证了系统的负载均衡和高效运行。
在这里插入图片描述
消息经过Pipline链处理后,将由Dispatcher转发,并进入EventListener链进行处理。Dispatcher内部使用了两个线程池:channelExecutor和dataExecutor。
在这里插入图片描述

  • netExecutor用于处理通道事件和异常事件。由于通道事件可能需要同步调用远程服务,因此该线程池没有设定上限,因为同步调用会阻塞当前线程。

  • dataExecutor用于处理消息事件。根据经验值,默认的最大线程数为150,但可以通过选项参数进行修改。

    EventListener

    ChannelEventListener

ChannelInboundHandler接口定义了一系列方法,用于处理Channel的入站事件。这些方法负责处理数据从外部系统(如网络)流入Channel的过程。这些方法都是将对应的事件(channelRegistered、channelUnregistered、channelActive、channelInactive)转发给ChannelPipeline中的下一个ChannelInboundHandler,如下面的源码所示:

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelRegistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelUnregistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelActive();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelInactive();
    }

Channel通道事件

channelRegistered、channelUnregistered、channelActive和channelInactive这几个方法是用于处理不同类型的通道事件。下面分别对这几个方法进行详细分析:

  • channelRegistered(ChannelHandlerContext ctx): 这个方法在通道被注册到EventLoop(事件循环)后被调用。它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。

  • channelUnregistered(ChannelHandlerContext ctx):这个方法在通道从EventLoop中注销后被调用。它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。

  • channelActive(ChannelHandlerContext ctx):这个方法在通道变为活跃状态后被调用。它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。

  • channelInactive(ChannelHandlerContext ctx):这个方法在通道变为非活跃状态后被调用。 它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。
    在这里插入图片描述
    我们只需将相应的实现注入并发布 ChannelActionEvent 对象模型事件。这样,ChannelActionEvent 对象的消费者就能够监听事件并执行相应的逻辑操作。通过这种方式,我们实现了事件的发布与订阅机制,以便实现松耦合的组件间通信,并能根据实际需求对事件进行灵活地处理和扩展。

定义ChannelActionEvent

首先,定义一个自定义事件类 ChannelActionEventextends ,继承自 ApplicationEvent,主要作为通道变化的处理器事件。

public class ChannelActionEvent extends ApplicationEvent {
   
   
    private Object data;
    public ChannelActionEventextends (Object source, String data) {
   
   
        super(source);
        this.data = data;
    }
    public String getData() {
   
   
        return data;
    }
}

@Component
public class ChannelActionEventListener extends implements ApplicationListener<ChannelActionEvent > {
   
   

    @Override
    public void onApplicationEvent(MyEvent event) {
   
   
        Object data = event.getData();
        // 执行对应的逻辑操作
        System.out.println("Received event with data: " + data);
    }
}

因此,根据同样的逻辑,ExceptionEvent事件也可以通过方法处理器的exceptionCaught方法进行处理。
在这里插入图片描述

 @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
   
   
        ctx.fireExceptionCaught(cause);
    }

DataEvent可以覆盖对应的channelRead、channelReadComplete的方法进行发布对应的事件处理即可。
在这里插入图片描述

      @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
   
   
        ctx.fireChannelRead(msg);
    }
    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelReadComplete();
    }

框架会预先在 XXEventListener 链末端注册 ServiceMessageEventListener,该 Listener 负责调用被注册的 Service,并将
返回值或异常回传。
在这里插入图片描述

Heartbeat、超时及重连机制

Netty提供了读空闲和写空闲的功能来处理网络连接的空闲状态。

读空闲(Read Idle):当连接在指定的时间内没有接收到任何数据时,就会触发读空闲事件。这个事件可以用来检测连接是否处于空闲状态,或者判断通信对方是否还与服务器保持连接。通过设置ChannelOption.READ_IDLE_TIME参数来定义读空闲的时间。

写空闲(Write Idle):当连接在指定的时间内没有发送任何数据时,就会触发写空闲事件。这个事件可以用来定期发送心跳消息或其他需要保持连接的数据。通过设置ChannelOption.WRITE_IDLE_TIME参数来定义写空闲的时间。

在这里插入图片描述
在Netty中,可以通过ChannelOption设置读空闲和写空闲的时间,然后通过ChannelHandler的回调方法来处理空闲事件。常用的回调方法包括:

  • channelIdle(ChannelHandlerContext ctx, IdleStateEvent stateEvent):当发生空闲事件时调用该方法,可以在该方法中执行相应的逻辑操作。

通常,通过在管道中配置IdleStateHandler来启用空闲事件的检测和处理。

IdleStateHandler是Netty提供的一个特殊的ChannelHandler,用于检测并处理读空闲和写空闲事件。例如,可以在初始化管道时添加以下代码:

pipeline.addLast(new IdleStateHandler(0, 0, idleTime)); // 设置读写空闲时间
pipeline.addLast(new MyIdleHandler()); // 自定义的空闲事件处理器

在自定义的空闲事件处理器中,可以根据读空闲或写空闲事件执行相应的操作。例如,发送心跳消息、关闭连接等。

相关文章
|
16天前
|
存储 人工智能 PyTorch
基于PyTorch/XLA的高效分布式训练框架
基于PyTorch/XLA的高效分布式训练框架
26 2
|
29天前
|
消息中间件 算法 Java
【亿级数据专题】「分布式服务框架」 盘点本年度我们探索服务的保障容量的三大关键方案实现
【亿级数据专题】「分布式服务框架」 盘点本年度我们探索服务的保障容量的三大关键方案实现
187 0
|
16天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
29天前
|
设计模式 安全 Java
【分布式技术专题】「Tomcat技术专题」 探索Tomcat技术架构设计模式的奥秘(Server和Service组件原理分析)
【分布式技术专题】「Tomcat技术专题」 探索Tomcat技术架构设计模式的奥秘(Server和Service组件原理分析)
33 0
|
3天前
|
Dubbo Java 应用服务中间件
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
|
29天前
|
存储 监控 安全
金石推荐 | 【分布式技术专题】「单点登录技术架构」一文带领你好好认识以下Saml协议的运作机制和流程模式
金石推荐 | 【分布式技术专题】「单点登录技术架构」一文带领你好好认识以下Saml协议的运作机制和流程模式
66 1
|
29天前
|
存储 Java 应用服务中间件
【分布式技术专题】「架构实践于案例分析」盘点互联网应用服务中常用分布式事务(刚性事务和柔性事务)的原理和方案
【分布式技术专题】「架构实践于案例分析」盘点互联网应用服务中常用分布式事务(刚性事务和柔性事务)的原理和方案
52 0
|
29天前
|
canal 消息中间件 关系型数据库
【分布式技术专题】「分布式技术架构」MySQL数据同步到Elasticsearch之N种方案解析,实现高效数据同步
【分布式技术专题】「分布式技术架构」MySQL数据同步到Elasticsearch之N种方案解析,实现高效数据同步
79 0
|
29天前
|
缓存 应用服务中间件 数据库
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
34 1
|
7月前
|
监控 Java Linux
由浅入深Netty基础知识NIO网络编程1
由浅入深Netty基础知识NIO网络编程
40 0