【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)

简介: 今天,我要向大家实现一个基于Netty实现的高性能远程通信框架!这个框架利用了 Netty 的强大功能,提供了快速、可靠的远程通信能力。无论是构建大规模微服务架构还是实现分布式计算,这个分布式通信框架都是一个不可或缺的利器。

前提介绍

今天,我要向大家实现一个基于Netty实现的高性能远程通信框架!这个框架利用了 Netty 的强大功能,提供了快速、可靠的远程通信能力。

无论是构建大规模微服务架构还是实现分布式计算,这个分布式通信框架都是一个不可或缺的利器。

回顾Dubbo

相信大家都指导Dubbo(Dubbo3)这个非常著名的RPC框架对吧,如果你忘记了,那么我给您先垫垫底,可以看到下面就是Dubbo的借本架构图,当然Dubbo3会更加复杂,我们先按照基础的Dubbo架构进行回顾
在这里插入图片描述
无论是在分布式系统、微服务架构还是其他需要跨网络进行通信的场景下,这个框架都能够帮助你实现高效的数据传输和通信。它具备出色的性能和可扩展性,能够满足各种复杂的通信需求。
在这里插入图片描述
但是无论是从层次化和结构化而言,Dubbo/Dubbo3都过于的复杂了,我们起始未必会用到那么复杂以及扩展性那么强的功能,因此我们来实现一个属于我们自己的一个可靠且高性能的远程通信解决方案。

分布式通信框架

分布式通信框架是一种卓越的高性能远程通信解决方案,它基于 Netty 实现了 TCP 通信的底层细节,并对上层进行了封装,以提供简单易用和高度可扩展的能力。在这里插入图片描述
这个框架能够帮助开发者轻松构建分布式系统,并实现可靠的跨网络通信。通过利用 Netty 的强大功能,该框架能够提供出色的性能和可靠性,同时还具备灵活的扩展性,可以满足各种复杂的通信需求。

组成元素

先介绍一下网络通信的两个最基本的元素和属性,如下所示。
在这里插入图片描述

  • Channel:可以理解为一个通道,即一条连接线路的概念。它承载着数据、信息或者信号的传输功能。

  • ChannelGroup:由多个通道组合而成的一个概念。它将多条通道有机地集合在一起,形成一个整体,以便更高效地进行数据、信息或者信号的传输。

    程序执行流程

下图自上而下分别为boss接受连接、channel、dispatcher、event listener和service。
在这里插入图片描述
这五个部分各自承载着独特的任务,又彼此协作,形成了一个系统化、高效化的运行流程。
在这里插入图片描述

  • Boss线程:接受连接流程,主要负责接受外部请求,这些请求可能是来自用户的操作或是其他服务的调用。一旦接收到请求,boss会进行必要的处理,然后将请求分发给下面的线程池worker进行处理。

  • Worker线程:系统中的工作执行者,负责接收boss分发的任务,然后执行具体的业务逻辑。这些任务可能涉及到数据的处理、服务的调用等。线程池worker通过channel与boss进行通信,确保任务能够准确无误地传递。

  • dispatcher机制:在worker执行任务的过程中,需要有一个机制来调度和分配任务。这就是dispatcher的作用。

dispatcher根据一定的策略和规则,将任务分配给合适的worker线程进行处理。这一环节保证了系统的负载均衡和高效运行。

  • EventListener:基于在每个worker线程内部,eventListener发挥着关键作用。它负责监听和处理线程中的事件,比如任务的完成、异常等。通过eventListener,系统能够及时响应各种事件,进行必要的处理和反馈。

  • Service业务逻辑实现:它代表了整个系统的核心业务逻辑。service接收并处理来自worker线程的任务,完成具体的业务操作。这些操作可能涉及到数据的处理、服务的调用等。

消息协议设计

消息协议这里是指对消息编码和解码的规范的一种定义,通信内置的消息协议采用如下结构:其中包含了三个部分:ID、Length 和 Content。
在这里插入图片描述

  1. ID:

    • 长度:1 字节
    • 用途:表示 Content 部分是否被压缩,其中 1 表示 Content 部分被压缩,0 表示未被压缩。
  2. Length:

    • 长度:4 字节
    • 用途:表示 ID 和 Content 的总长度。这通常用于消息分片或分批传输,确保接收方可以正确地重新组装消息。
  3. Content:
    • 长度:不定(由 Length 字段决定)
    • 用途:真实的消息内容。根据 ID 的值,它可能是压缩的或未压缩的。

如果 ID 为 1,则 Content 部分可能会被某种算法(如gzip)压缩,以减少存储或传输的空间需求。Length 字段确保了数据的完整性,因为接收方可以根据这个长度字段正确地读取和重组数据。

在实际应用中,这种结构通常用于网络通信、文件存储或数据库存储等场景,其中需要对数据进行有效且紧凑的表示。

实现机制

Netty框架原生提供了一个处理器链,该链用于对事件进行处理。每个处理器都实现了 ChannelHandler 接口。ChannelHandler 接口是一个空接口,其中:ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter
在这里插入图片描述
我们主要关注这两个接口,因为它们被用于处理读取输入和写入输出的消息。

ChannelInboundHandlerAdapter

ChannelInboundHandlerAdapterNetty框架中用于处理从网络到应用程序的事件的组件。它是一种特殊的ChannelHandler,主要负责处理读取操作。
在这里插入图片描述

当网络通道接收到数据时,ChannelInboundHandlerAdapter会被触发,然后开发者可以通过重写其中的方法来执行需要的操作。常见的操作包括数据的解码、解压或反序列化等。

自定义事件处理

ChannelInboundHandlerAdapterNetty中实现业务逻辑的关键组件,它提供了丰富的方法来处理不同的事件,例如通道激活、数据读取和异常处理等,下面是对应的源码:

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
   
   

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelRegistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelUnregistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelActive();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelInactive();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
   
   
        ctx.fireChannelRead(msg);
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelReadComplete()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelReadComplete();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
   
   
        ctx.fireUserEventTriggered(evt);
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelWritabilityChanged()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.fireChannelWritabilityChanged();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
     * to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
   
   
        ctx.fireExceptionCaught(cause);
    }
}

通过自定义ChannelInboundHandlerAdapter,开发者可以灵活地处理从网络到应用程序的数据传输过程,稍后回进行分析介绍。

ChannelOutboundHandlerAdapter

ChannelOutboundHandlerAdapter也是一种特殊的ChannelHandler,用于处理从应用程序到网络的事件,主要包括写出操作。它是Netty框架中的一个关键组件,负责将应用程序的数据写入网络通道中,以便发送给对应的接收端。
在这里插入图片描述
使用ChannelOutboundHandlerAdapter可以实现对写出事件的定制化处理,例如数据的编码、压缩或序列化等操作,以满足具体业务需求。它可以直接扩展ChannelOutboundHandlerAdapter类,并重写其中的方法来实现特定的功能。

package io.netty.channel;
import io.netty.channel.ChannelHandlerMask.Skip;
import java.net.SocketAddress;
/**
 * Skeleton implementation of a {@link ChannelOutboundHandler}. This implementation just forwards each method call via
 * the {@link ChannelHandlerContext}.
 */
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
   
   

    /**
     * Calls {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
   
   
        ctx.bind(localAddress, promise);
    }

    /**
     * Calls {@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
   
   
        ctx.connect(remoteAddress, localAddress, promise);
    }

    /**
     * Calls {@link ChannelHandlerContext#disconnect(ChannelPromise)} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
   
   
        ctx.disconnect(promise);
    }

    /**
     * Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
   
   
        ctx.close(promise);
    }

    /**
     * Calls {@link ChannelHandlerContext#deregister(ChannelPromise)} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
   
   
        ctx.deregister(promise);
    }

    /**
     * Calls {@link ChannelHandlerContext#read()} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.read();
    }

    /**
     * Calls {@link ChannelHandlerContext#write(Object, ChannelPromise)} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
   
   
        ctx.write(msg, promise);
    }

    /**
     * Calls {@link ChannelHandlerContext#flush()} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
   
   
        ctx.flush();
    }
}

编(解)码处理器

编码(解码)处理器、压缩(解压)处理器以及序列化(反序列化)处理器等都是直接或间接用于实现ChannelHandler的组件。

编码过程阶段

编码过程由三个Handler组合完成,分别为序列化,压缩数据以及编码处理。
在这里插入图片描述

ChannelOutboundHandlerAdapter序列化实现

当你需要实现序列化数据的发送时,可以基于ChannelOutboundHandlerAdapter接口进行实现。下面是一个简单的示例代码,展示了如何使用write方法将序列化后的数据发送到网络:

public class SerializationHandler extends ChannelOutboundHandlerAdapter {
   
   

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
   
   
        // 进行数据序列化操作,这里假设使用Java内置的序列化方式
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        oos.flush();
        byte[] serializedData = bos.toByteArray();
        // 将序列化后的数据写入网络通道
        ByteBuf byteBuf = ctx.alloc().buffer();
        byteBuf.writeBytes(serializedData);
        ctx.write(byteBuf, promise);
    }
}

重写了write方法,在该方法中进行了数据的序列化操作。具体来说,我们使用Java内置的序列化方式将msg对象序列化为字节数组serializedData,然后将序列化后的数据写入网络通道。

ChannelOutboundHandlerAdapter压缩实现

要通过数据压缩进行处理,基于ChannelOutboundHandlerAdapter接口实现一个压缩处理器。使用DeflaterOutputStream进行数据压缩并发送到网络:

public class CompressionHandler extends ChannelOutboundHandlerAdapter {
   
   

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
   
   
        // 创建输出流,使用DeflaterOutputStream进行数据压缩
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DeflaterOutputStream dos = new DeflaterOutputStream(bos);

        // 压缩数据
        dos.write((byte[]) msg);
        dos.finish();

        // 获取压缩后的数据
        byte[] compressedData = bos.toByteArray();

        // 将压缩后的数据写入网络通道
        ByteBuf byteBuf = ctx.alloc().buffer();
        byteBuf.writeBytes(compressedData);
        ctx.write(byteBuf, promise);
    }
}

同样也是重写了write方法,在该方法中进行了数据的压缩操作。我们使用DeflaterOutputStream将原始数据(byte[]) msg进行压缩,然后将压缩后的数据写入网络通道。

LengthBasedEncoder编码器

要实现Netty中的编码器,你可以自定义一个类并实现MessageToByteEncoder接口。展示了如何编写一个基于字符串的编码器:

public class LengthBasedEncoder extends MessageToByteEncoder<String> {
   
   

    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
   
   
        byte[] data = msg.getBytes(StandardCharsets.UTF_8);
        int length = data.length;
        out.writeInt(length);
        out.writeBytes(data);
    }
}

encode方法中,我们首先将字符串转换为字节数组,使用UTF-8字符集进行编码。然后,我们获取字节数组的长度,并将其写入输出ByteBuf。最后,我们将字节数组写入输出缓冲区。

注意,上述示例中使用的是字符串编码器,你可以根据实际需求替换成其他类型的编码器。同时,也请确保在创建ByteBuf对象时使用适当的Allocator,以获取更高效的内存分配和释放。

通过将自定义的编码器StringEncoder添加到NettyChannelPipeline中,作为ChannelOutboundHandler使用,你就可以在数据发送前将字符串编码为字节并写入网络通道中了。

解码过程阶段

解码的代码和编码的代码就是一个镜像操作和处理,在这里就进行赘余了,相信小伙伴都可以实现,如果真的有不会实现的,可以评论区留言告诉我,我把完整代码给你们。
在这里插入图片描述
对于 TCP 通信而言,粘包是很正常的现象,因此 decoder 必须处理粘包问题。LengthFrameDecoder 是一个支持粘包处理的decoder 类抽象,可基于基于长度的解码器的实现方式进行控制。

处理器链的建立

通过处理器链,Netty框架可以非常灵活地处理不同类型的事件,在Netty中,我们可以通过ChannelPipeline来建立处理器链。ChannelPipeline是一个用于管理和执行处理器的容器,它负责处理入站和出站的事件,并将这些事件传递给适当的处理器。

创建ChannelPipeline对象

ChannelPipeline pipeline = channel.pipeline();

ChannelPipeline中添加处理器

pipeline.addLast("handler1", new Handler1());
pipeline.addLast("handler2", new Handler2());

这里的 "handler1""handler2" 是处理器的名称,可以根据需要进行命名。

添加的顺序形成处理器链

数据将按照顺序在处理器之间传递。最后一个添加的处理器将是数据的出站处理器,第一个添加的处理器将是数据的入站处理器。

ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new MyHandler());

通过建立处理器链,可以根据需要按照一定的顺序和逻辑处理数据。

未完待续

由于篇幅过长,本文就到这里为止。下一篇文章将继续介绍《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)》,并详细说明剩下的内容。敬请期待!

相关文章
|
14天前
|
机器学习/深度学习 自然语言处理 并行计算
DeepSpeed分布式训练框架深度学习指南
【11月更文挑战第6天】随着深度学习模型规模的日益增大,训练这些模型所需的计算资源和时间成本也随之增加。传统的单机训练方式已难以应对大规模模型的训练需求。
58 3
|
18天前
|
监控 算法 网络协议
|
18天前
|
机器学习/深度学习 并行计算 Java
谈谈分布式训练框架DeepSpeed与Megatron
【11月更文挑战第3天】随着深度学习技术的不断发展,大规模模型的训练需求日益增长。为了应对这种需求,分布式训练框架应运而生,其中DeepSpeed和Megatron是两个备受瞩目的框架。本文将深入探讨这两个框架的背景、业务场景、优缺点、主要功能及底层实现逻辑,并提供一个基于Java语言的简单demo例子,帮助读者更好地理解这些技术。
42 2
|
1月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
42 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
30天前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
47 1
|
2月前
|
数据采集 分布式计算 MaxCompute
MaxCompute 分布式计算框架 MaxFrame 服务正式商业化公告
MaxCompute 分布式计算框架 MaxFrame 服务于北京时间2024年09月27日正式商业化!
83 3
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
47 0
|
1月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
3月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
110 2
基于Redis的高可用分布式锁——RedLock