微服务框架(十七)Dubbo协议及编码过程源码解析

简介:   此系列文章将会描述Java框架Spring Boot、服务治理框架Dubbo、应用容器引擎Docker,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。  本文为Dubbo协议、线程模型、和其基于Netty的NIO异步通讯机制及源码

  此系列文章将会描述Java框架Spring Boot、服务治理框架Dubbo、应用容器引擎Docker,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。
  本文为Dubbo协议、线程模型及协议编码过程源码

本系列文章中所使用的框架版本为Spring Boot 2.0.3-RELEASE,Spring 5.0.7-RELEASE,Dubbo 2.6.2。

Dubbo传输协议

Dubbo 缺省协议采用单一长连接和 NIO 异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。

反之,Dubbo 缺省协议不适合传送大数据量的服务,比如传文件,传视频等,除非请求量很低。

红色为dubbo协议缺省值

  • Transporter(协议的服务端和客户端实现类型): mina, netty, grizzy
  • Serialization(协议序列化方式): dubbo, hessian2, java, json
  • Dispatcher(协议的消息派发方式,用于指定线程模型): all, direct, message, execution, connection
  • ThreadPool(线程池类型): fixed, cached

报文格式

在这里插入图片描述
协议头是16字节的定长数据

  • 魔法数(2 byte)。用于识别dubbo协议数据包,魔法数为short类型的常量0xdabb
  • 消息标志位(1 byte)
    • 数据包类型(1 bit)。(0-Response,1-Request)
    • 调用方式(1 bit)。仅在第16位被设为Request的情况下有效(0-单向调用,1-双向调用)
    • 事件标识(1 bit)。(0-当前数据包是请求或响应包,1-当前数据包是心跳包)
    • 序列化器编号(5 bit)
  • 状态(1 byte)。当消息类型为响应时,设置请求响应状态,dubbo定义了一些响应的类型。具体类型见com.alibaba.dubbo.remoting.exchange.Response
  • 消息ID(8 byte)。long类型,请求的唯一识别id(由于采用异步通讯的方式,用以将请求和响应绑定)
  • 消息体长度(4 byte)。int类型,即记录Body Content有多少个字节

特性

缺省协议,使用基于 mina 1.1.7 和 hessian 3.2.1 的 tbremoting 交互。

  • 连接个数:单连接
  • 连接方式:长连接
  • 传输协议:TCP
  • 传输方式:NIO 异步传输
  • 序列化:Hessian 二进制序列化
  • 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串。
  • 适用场景:常规远程服务方法调用

Dubbo线程模型

如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度。

但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。

如果用 IO 线程处理事件,又在事件处理过程中发起新的 IO 请求,比如在连接事件中发起登录请求,会报“可能引发死锁”异常,但不会真死锁。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VwV5Ubpn-1668685558914)(http://dubbo.apache.org/docs/zh-cn/user/sources/images/dubbo-protocol.jpg)]

因此,需要通过不同的派发策略和不同的线程池配置的组合来应对不同的场景:

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

Dispatcher

  • all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。(缺省)
  • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
  • message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • execution 只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

ThreadPool

  • fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
  • cached 缓存线程池,空闲一分钟自动删除,需要时重建。
  • limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
  • eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)

Dubbo编码过程源码解析

  • 构造消息头,并将除消息体长度外的信息写入header
    1. 设置魔法数
    2. 设置数据包类型和序列化器编号
    3. 设置通信方式(单向/双向)
    4. 设置事件标识
    5. 设置请求编号
  • 创建序列化器,并将序列化后的数据写入ChannelBuffer
  • 将消息体长度(data length)写入消息头header
  • 将消息头header写入ChannelBuffer
public class ExchangeCodec extends TelnetCodec {
   
   

    // header length.
    protected static final int HEADER_LENGTH = 16;
    // magic header.
    protected static final short MAGIC = (short) 0xdabb;
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    // message flag.
    protected static final byte FLAG_REQUEST = (byte) 0x80;
    protected static final byte FLAG_TWOWAY = (byte) 0x40;
    protected static final byte FLAG_EVENT = (byte) 0x20;
    protected static final int SERIALIZATION_MASK = 0x1f;
    private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
   
   
        if (msg instanceof Request) {
   
   
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
   
   
            encodeResponse(channel, buffer, (Response) msg);
        } else {
   
   
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
   
   
        Serialization serialization = getSerialization(channel);

        // 创建消息头字节数组,长度为 16
        byte[] header = new byte[HEADER_LENGTH];

        // 设置魔法数
        Bytes.short2bytes(MAGIC, header);

        // 设置数据包类型(Request/Response)和序列化器编号
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        // 设置通信方式(单向/双向)
        if (req.isTwoWay()) {
   
   
            header[2] |= FLAG_TWOWAY;
        }

        // 设置事件标识
        if (req.isEvent()) {
   
   
            header[2] |= FLAG_EVENT;
        }

        // 设置请求编号,8个字节,从第4个字节开始设置
        Bytes.long2bytes(req.getId(), header, 4);

        // 获取 buffer 当前的写位置
        int savedWriteIndex = buffer.writerIndex();
        // 更新 writerIndex,为消息头预留 16 个字节的空间
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // 创建序列化器,比如 Hessian2ObjectOutput
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
   
   
            // 对事件数据进行序列化操作
            encodeEventData(channel, out, req.getData());
        } else {
   
   
            // 对请求数据进行序列化操作
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
   
   
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();

        // 获取写入的字节数,也就是消息体长度
        int len = bos.writtenBytes();
        checkPayload(channel, len);

        // 将消息体长度写入到消息头中
        Bytes.int2bytes(len, header, 12);

        // 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
        buffer.writerIndex(savedWriteIndex);
        // 从 savedWriteIndex 下标处写入消息头
        buffer.writeBytes(header);
        // 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }
}

请求数据的序列化操作:

public class DubboCodec extends ExchangeCodec implements Codec2 {
   
   

    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
   
   
        RpcInvocation inv = (RpcInvocation) data;

        // 依次序列化 dubbo version、path、version
        out.writeUTF(version);
        out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
        out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

        // 序列化调用方法名
        out.writeUTF(inv.getMethodName());
        // 将参数类型转换为字符串,并进行序列化
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null)
            for (int i = 0; i < args.length; i++) {
   
   
                // 对运行时参数进行序列化
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }

        // 序列化 attachments
        out.writeObject(inv.getAttachments());
    }
}

参考资料:

  1. 线程模型
  2. Dubbo 关于同步/异步调用的几种方式
  3. Netty在dubbo中的应用浅析
  4. Dubbo学习笔记
  5. 服务调用过程
相关文章
|
11月前
|
网络协议
为何UDP协议不可靠?DNS为何选择UDP?
总的来说,UDP和TCP各有优势,选择哪种协议取决于应用的具体需求。UDP可能不如TCP可靠,但其简单、快速的特性使其在某些场景下成为更好的选择。而DNS就是这样的一个例子,它利用了UDP的优势,以实现快速、高效的名字解析服务。
609 14
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
1195 29
|
编解码 监控 网络协议
RTSP协议规范与SmartMediaKit播放器技术解析
RTSP协议是实时流媒体传输的重要规范,大牛直播SDK的rtsp播放器基于此构建,具备跨平台支持、超低延迟(100-300ms)、多实例播放、高效资源利用、音视频同步等优势。它广泛应用于安防监控、远程教学等领域,提供实时录像、快照等功能,优化网络传输与解码效率,并通过事件回调机制保障稳定性。作为高性能解决方案,它推动了实时流媒体技术的发展。
640 5
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
495 4
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
设计模式 Java API
微服务架构演变与架构设计深度解析
【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
790 6
|
设计模式 Java API
微服务架构演变与架构设计深度解析
【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
409 1

热门文章

最新文章

推荐镜像

更多
  • DNS