基于eBPF的云原生可观测性开源项目Kindling之Dubbo2 协议开发流程

本文涉及的产品
性能测试 PTS,5000VUM额度
可观测可视化 Grafana 版,10个用户账号 1个月
应用实时监控服务ARMS - 应用监控,每月50GB免费额度
简介: 本文主要介绍基于eBPF的开源项目Kindling之协议开发流程,并以dubbo2协议开发为例进行说明。

1 项目概览

Kindling collector项目作为Go端分析器,使用类似opentelmetry的pinpeline进行数据分析。其中涉及5个组件:

  • UdsReceiver - Unix Domain Socket接收器,接收探针事件并传递给后续的网络分析器。
  • NetworkAnalyzer - 网络事件分析器,将接收的Read / Write事件匹配成单次请求后,根据协议解析请求内容生成关键指标,传递给后续的分析器。
  • K8sMetadataProcessor - K8S指标处理器,补充K8S指标并传递给后续的聚合处理器
  • AggregateProcessor - 数据聚合处理器,将接收的指标数据生成Trace和Metric,传递给给后续的转发器。
  • OtelExporter - Opentelmetry转发器,将Trace / Metric数据转发给Prometheus进行展示。

其中协议解析流程主要在NetworkAnalyzer组件中进行,将接收的请求/响应事件成对匹配后,交由parseProtocols()函数解析出协议指标。

1.png

1.1 协议解析流程

NetworkAnnalyzer.parseProtocols()方法定义了整体解析流程,根据协议解析器分别解析请求和响应,当最终都成功时输出指标。

2.png

1.2 协议解析设计思路

正常的协议解析只负责逐帧解析指标功能。

3.png

现已支持5种协议解析,当协议越来越多时,遍历引起的解析会越来越耗时,那么需引入fastfail快速识别协议

4.png

对于复杂的多报文协议,如Kafka有不同的API报文,而相同API也有不同的版本报文。将所有报文解析逻辑都写在一起会使整个类过于臃肿且不易维护。为此引入树形多报文结构用于快速且低耦合地实现开发。

5.png

1.2.1 报文解析结构体

在树形报文解析过程中,有如下2个场景需要考虑

  • 当父协议解析了指标A,子协议解析可能会用到A指标,那么父子协议解析的指标需要透传。
  • 父协议已解析了部分报文长度的内容,那么子协议在开始解析时可直接跳过相应长度的内容进行解析,此处引入偏移量用于下一个协议跳过解析。

定义PayloadMessage,封装报文内容、读取偏移量和指标存储的Map。

type PayloadMessage struct {
    Data         []byte
    Offset       int
    attributeMap *model.AttributeMap
}

1.2.2 报文解析API

由于引入协议树,协议解析过程parse() (ok bool)将不再适用。协议树中的个协议的解析成功不表示整个协议解析成功,需解析整颗树的协议是否成功,将API扩展为parse() (ok bool, complete bool)。

  • 对于单层协议(HTTP),返回为(ok, true)

6.png

基于以上几点需求,设计树形结构的报文解析器PkgParser。PkgParser定义了fastFail(快速识别失败) 和parser(解析单个报文)函数;每个协议只需注册自身的PkgParser即可接入整套流程。

fastFail(message *PayloadMessage) (fail bool)

  • 声明协议是否识别失败,用于快速识别协议

parser(message *PayloadMessage) (ok bool, complete bool)

  • 解析协议,将解析出的指标存储到message的Attributes中  
  • 返回是2个参数:      

         -是否解析成功      

         -是否解析完成 (默认为true,当为false主要是用于嵌套解析过程,例如先定义一个头解析,再定义不同的消息体解析)。

1.3 请求 / 响应解析

ProtocolParser定义了请求和响应的解析器,并提供ParseRequest()和ParseResponse() API用于解析请求和响应 其中response的message携带了request解析出的attributes指标,用于匹配。eg. kafka的correlationId request和response报文需一致,且response报文解析用到了request的key。

7.png

2 开发流程

8.png

2.1 添加协议名

const (
    HTTP      = "http"
  ...
    XX        = "xx" // 此处替换具体协议名
    ...
)

2.2 创建协议

analyzer/network/protocol目录下创建文件夹xx,xx替换为具体协议,并创建3个文件xx_parser.go、xx_request.go 和 xx_response.go

analyzer/network/protocol/xx
├── xx_parser.go           协议解析器
├── xx_request.go          实现请求解析流程
└── xx_response.go          实现响应解析流程

2.2.1 xx_request.go

实现fastfail()和parser()函数

func fastfailXXRequest() protocol.FastFailFn {
    return func(message *protocol.PayloadMessage) bool {
        // 根据报文实现具体的fastFail()函数
        return false
    }
}
func parseXXRequest() protocol.ParsePkgFn {
    return func(message *protocol.PayloadMessage) (bool, bool) {
        // 解析报文内容
        contentKey := getContentKey(message.Data)
        if contentKey == "" {
            // 第一个参数false 表示解析失败,第二个参数表示报文解析完成
            return false, true
        }
        // 通过AddStringAttribute() 或 AttIntAttribute() 存储解析出的属性
        message.AddStringAttribute(constlabels.ContentKey, contentKey)
        // 解析成功
        return true, true
    }
}

2.2.2 xx_response.go

实现fastfail()和parser()函数

func fastfailXXResponse() protocol.FastFailFn {
    return func(message *protocol.PayloadMessage) bool {
        // 根据报文实现具体的fastFail()函数
        return false
    }
}
func parseXXResponse() protocol.ParsePkgFn {
    return func(message *protocol.PayloadMessage) (bool, bool) {
        // 通过GetStringAttribute() 或 GetIntAttribute() 读取request解析后的参数
        contentKey := message.GetStringAttribute(constlabels.ContentKey)
        // 解析响应报文
        errorCode := getErrorCode(message)
        if errorCode > 20 {
            // 有errorCode或errorMsg等常见,需定义IsError为true用于后续processor生成Metric
            message.AddBoolAttribute(constlabels.IsError, true)
            message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))
        }
        message.AddStringAttribute(constlabels.XXErrorCode, errorCode)
        // 解析成功
        return true, true
    }
}

2.2.3 xx_parser.go

定义协议解析器

func NewXXParser() *protocol.ProtocolParser {
    requestParser := protocol.CreatePkgParser(fastfailXXRequest(), parseXXRequest())
    // 当存在嵌套的协议解析 eg. 解析头 + 解析各类不同报文
    // 可通过 Add()添加子协议,生成一颗协议树解析,顶部是公共部分解析,分叉是各个不同报文解析
    //             Header
    //             / | \
    //         API1 API2 API3
    //         /|\
    //       v1 v2 v3
    responseParser := protocol.CreatePkgParser(fastfailXXResponse(), parseXXResponse())
    return protocol.NewProtocolParser(protocol.XX, requestParser, responseParser, nil)
}

2.2.4 factory.go

注册协议解析器

var (
    ...
    xx_parser   *protocol.ProtocolParser = xx.NewXXParser()
)
func GetParser(key string) *protocol.ProtocolParser {
    switch key {
        ...
        case protocol.XX:
            return xx_parser
        ...
        default:
            return nil
    }
}

2.2.5 kindling-collector-config.yml

配置新增协议

analyzers:
  networkanalyzer:
    protocol_parser: [ http, mysql, dns, redis, kafka, xx ]

3 开发案例 - Dubbo2协议

3.1 dubbo2协议分析

dubbo_protocol_header.png

根据官网提供的协议规范,解析网络抓包的数据。

  • 前2个byte为魔数,可用于fastfail()方法
  • 第3个byte包含Req/Resp、序列化方式等信息,可用于解析协议中判断是否合法报文。
  • 第4个byte用于返回报文的错误码
  • 第16个byte开始需通过指定的序列化方式解析报文内容,service name + method name可用于contentKey标识该请求的URL

mmexport1654568098519.png

3.2 声明协议名

const (
    ...
    DUBBO2    = "dubbo2"
    ...
)

3.3 实现dubbo2解析

创建协议相关文件

kindling/collector/analyzer/network/protocol/dubbo2
├── dubbo2_parser.go            Dubbo2解析器
├── dubbo2_request.go           实现请求解析流程
├── dubbo2_response.go          实现响应解析流程
└── dubbo2_serialize.go         Dubbo2反序列化器

3.3.1 dubbo2_request.go

声明request请求的fastFail函数

  • dubbo2有魔数0xdabb可快速识别
func fastfailDubbo2Request() protocol.FastFailFn {
    return func(message *protocol.PayloadMessage) bool {
        return len(message.Data) < 16 || message.Data[0] != MagicHigh || message.Data[1] != MagicLow
    }
}

声明request请求的解析函数

  • 将解析出 服务/方法作为 类似于URL的Key
  • 存储报文内容
func parseDubbo2Request() protocol.ParsePkgFn {
    return func(message *protocol.PayloadMessage) (bool, bool) {
        contentKey := getContentKey(message.Data)
        if contentKey == "" {
            return false, true
        }
        message.AddStringAttribute(constlabels.ContentKey, contentKey)
        message.AddStringAttribute(constlabels.Dubbo2RequestPayload, getAsciiString(message.GetData(16, protocol.GetDubbo2PayLoadLength())))
        return true, true
    }
}

解析Dubbo2请求

  • 过滤非法请求
  • 考虑到dubbo2存在单向和心跳请求,这些请求不做解析
  • 根据报文结构解析相应指标
func getContentKey(requestData []byte) string {
    serialID := requestData[2] & SerialMask
    if serialID == Zero {
        return ""
    }
    if (requestData[2] & FlagEvent) != Zero {
        return "Heartbeat"
    }
    if (requestData[2] & FlagRequest) == Zero {
        // Invalid Data
        return ""
    }
    if (requestData[2] & FlagTwoWay) == Zero {
        // Ignore Oneway Data
        return "Oneway"
    }
    serializer := GetSerializer(serialID)
    if serializer == serialUnsupport {
        // Unsupport Serial. only support hessian and fastjson.
        return "UnSupportSerialFormat"
    }
    var (
        service string
        method  string
    )
    // version
    offset := serializer.eatString(requestData, 16)
    // service name
    offset, service = serializer.getStringValue(requestData, offset)
    // service version
    offset = serializer.eatString(requestData, offset)
    // method name
    _, method = serializer.getStringValue(requestData, offset)
    return service + "#" + method
}

3.3.2 dubbo2_serialize.go

由于dubbo2内置了多套序列化方式,先定义接口dubbo2Serializer

type dubbo2Serializer interface {
    eatString(data []byte, offset int) int
    getStringValue(data []byte, offset int) (int, string)
}
dubbo2默认的序列化方式是hessian2,此处实现hessian2方式
type dubbo2Hessian struct{}
func (dh *dubbo2Hessian) eatString(data []byte, offset int) int {
    dataLength := len(data)
    if offset >= dataLength {
        return dataLength
    }
    tag := data[offset]
    if tag >= 0x30 && tag <= 0x33 {
        if offset+1 == dataLength {
            return dataLength
        }
        // [x30-x34] <utf8-data>
        return offset + 2 + int(tag-0x30)<<8 + int(data[offset+1])
    } else {
        return offset + 1 + int(tag)
    }
}
func (dh *dubbo2Hessian) getStringValue(data []byte, offset int) (int, string) {
    dataLength := len(data)
    if offset >= dataLength {
        return dataLength, ""
    }
    var stringValueLength int
    tag := data[offset]
    if tag >= 0x30 && tag <= 0x33 {
        if offset+1 == dataLength {
            return dataLength, ""
        }
        // [x30-x34] <utf8-data>
        stringValueLength = int(tag-0x30)<<8 + int(data[offset+1])
        offset += 2
    } else {
        stringValueLength = int(tag)
        offset += 1
    }
    if offset+stringValueLength >= len(data) {
        return dataLength, string(data[offset:])
    }
    return offset + stringValueLength, string(data[offset : offset+stringValueLength])
}

对外暴露公共方法,用于获取序列化方式

var (
    serialHessian2  = &dubbo2Hessian{}
    serialUnsupport = &dubbo2Unsupport{}
)
func GetSerializer(serialID byte) dubbo2Serializer {
    switch serialID {
    case SerialHessian2:
        return serialHessian2
    default:
        return serialUnsupport
    }
}

3.3.3 dubbo2_response.go

声明response响应的fastFail函数

  • 与request类似,采用魔数0xdabb可快速识别
func fastfailDubbo2Response() protocol.FastFailFn {
    return func(message *protocol.PayloadMessage) bool {
        return len(message.Data) < 16 || message.Data[0] != MagicHigh || message.Data[1] != MagicLow
    }
}

声明response响应的解析函数

  • 根据 status解析出对应的errorCode
  • 存储报文内容
func parseDubbo2Response() protocol.ParsePkgFn {
    return func(message *protocol.PayloadMessage) (bool, bool) {
        errorCode := getErrorCode(message.Data)
        if errorCode == -1 {
            return false, true
        }
        message.AddIntAttribute(constlabels.Dubbo2ErrorCode, errorCode)
        if errorCode > 20 {
            // 有errorCode或errorMsg等常见,需定义IsError为true用于后续processor生成Metric
            message.AddBoolAttribute(constlabels.IsError, true)
            message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))
        }
        message.AddStringAttribute(constlabels.Dubbo2ResponsePayload, getAsciiString(message.GetData(16, protocol.GetDubbo2PayLoadLength())))
        return true, true
    }
}

解析Dubbo2响应

  • 过滤非法响应
  • 根据报文结构解析相应指标
func getErrorCode(responseData []byte) int64 {
    SerialID := responseData[2] & SerialMask
    if SerialID == Zero {
        return -1
    }
    if (responseData[2] & FlagEvent) != Zero {
        return 20
    }
    if (responseData[2] & FlagRequest) != Zero {
        // Invalid Data
        return -1
    }
    return int64(responseData[3])
}

3.3.4 dubbo2_parser.go

声明dubbo2解析器

  • 通过CreatePkgParser()分别定义Reques / Response解析器
  • 通过NewProtocolParser()将Request / Response解析器生成Dubbo2解析器
func NewDubbo2Parser() *protocol.ProtocolParser {
    requestParser := protocol.CreatePkgParser(fastfailDubbo2Request(), parseDubbo2Request())
    responseParser := protocol.CreatePkgParser(fastfailDubbo2Response(), parseDubbo2Response())
    return protocol.NewProtocolParser(protocol.DUBBO2, requestParser, responseParser, nil)
}

3.4 注册dubbo2解析器

在factory.go中注册dubbo2协议的解析器

var (
    ...
    dubbo2_parser   *protocol.ProtocolParser = dubbo2.NewDubbo2Parser()
)
func GetParser(key string) *protocol.ProtocolParser {
    switch key {
        ...
        case protocol.DUBBO2:
            return dubbo2_parser
        ...
        default:
            return nil
    }
}

3.5 声明支持协议

在deploy/kindling-collector-config.yml中声明dubbo2协议

analyzers:
  networkanalyzer:
    protocol_parser: [ http, mysql, dns, redis, kafka, dubbo2 ]
    protocol_config:
      - key: "dubbo2"
        payload_length: 200



Kindling是一款基于eBPF的云原生可观测性开源工具,旨在帮助用户更好更快的定界云原生系统问题,并致力于打造云原生全故障域的定界能力。

Kindling项目地址:Kindling

在云可观测性方面有任何疑问欢迎与我们联系:Kindling官网

目录
相关文章
|
16天前
|
Cloud Native API C#
.NET云原生应用实践(一):从搭建项目框架结构开始
.NET云原生应用实践(一):从搭建项目框架结构开始
|
2月前
|
JSON Dubbo Java
【Dubbo协议指南】揭秘高性能服务通信,选择最佳协议的终极攻略!
【8月更文挑战第24天】在分布式服务架构中,Apache Dubbo作为一款高性能的Java RPC框架,支持多种通信协议,包括Dubbo协议、HTTP协议及Hessian协议等。Dubbo协议是默认选择,采用NIO异步通讯,适用于高要求的内部服务通信。HTTP协议通用性强,利于跨语言调用;Hessian协议则在数据传输效率上有优势。选择合适协议需综合考虑性能需求、序列化方式、网络环境及安全性等因素。通过合理配置,可实现服务性能最优化及系统可靠性提升。
46 3
|
2月前
|
Kubernetes 监控 Cloud Native
eBPF技术大揭秘:一张全景图彻底改变Kubernetes问题排查,助你成为云原生时代的超级英雄!
【8月更文挑战第8天】在云原生时代,Kubernetes作为容器编排的标准,其问题排查变得日益复杂。eBPF技术无需改动内核即可编写高效、安全的内核程序,实现系统细粒度观测与控制。近期发布的基于eBPF的Kubernetes问题排查全景图,展示了如何利用eBPF监控资源使用、网络性能及调度策略等,例如通过eBPF程序监控CPU使用率。此全景图有助于快速定位如高CPU使用率等问题所在Pod,进而优化配置或调整调度。
92 8
|
2月前
|
C# 开发者 Windows
勇敢迈出第一步:手把手教你如何在WPF开源项目中贡献你的第一行代码,从选择项目到提交PR的全过程解析与实战技巧分享
【8月更文挑战第31天】本文指导您如何在Windows Presentation Foundation(WPF)相关的开源项目中贡献代码。无论您是初学者还是有经验的开发者,参与这类项目都能加深对WPF框架的理解并拓展职业履历。文章推荐了一些适合入门的项目如MvvmLight和MahApps.Metro,并详细介绍了从选择项目、设置开发环境到提交代码的全过程。通过具体示例,如添加按钮点击事件处理程序,帮助您迈出第一步。此外,还强调了提交Pull Request时保持专业沟通的重要性。参与开源不仅能提升技能,还能促进社区交流。
39 0
|
3月前
|
存储 监控 Cloud Native
kubevela可观测体系问题之KubeVela云原生时代可观测性挑战的问题如何解决
kubevela可观测体系问题之KubeVela云原生时代可观测性挑战的问题如何解决
|
3月前
|
存储 Kubernetes Cloud Native
云原生周刊:Score 成为 CNCF 沙箱项目
以下是内容的摘要,格式为Markdown: 开源项目: - [Trident]:NetApp维护的开源存储解决方案,支持容器化应用的持久化存储,兼容CSI接口。 - [Monokle]:Kubernetes YAML编辑器,简化配置创建、分析和部署。 - [Platform Aware Scheduling]:模块化策略驱动的Kubernetes调度器扩展,考虑平台特性。 - [cdebug]):容器和Pod故障排查工具,提供端口转发、文件系统导出等功能。
|
4月前
|
弹性计算 监控 Cloud Native
构建多模态模型,生成主机观测指标,欢迎来战丨2024天池云原生编程挑战赛
本次比赛旨在如何通过分析 ECS 性能数据和任务信息,综合利用深度学习、序列分析等先进技术,生成特定机器的性能指标。参赛者的解决方案将为云资源管理和优化决策提供重要参考,助力云计算资源的高效稳定运行和智能化调度。
641 14
|
5月前
|
自然语言处理 监控 Cloud Native
对话阿里云云原生产品负责人李国强:推进可观测产品与OpenTelemetry开源生态全面融合
阿里云宣布多款可观测产品全面升级,其中,应用实时监控服务 ARMS 在业内率先推进了与 OpenTelemetry 开源生态的全面融合,极大丰富了可观测的数据类型及规模,大幅增强了 ARMS 核心能力。本次阿里云 ARMS 产品全面升级的背景是什么?为什么会产生围绕 OpenTelemetry 进行产品演进的核心策略?在云原生、大模型等新型应用架构类型层出不穷的今天,又将如何为企业解决新的挑战?阿里云云原生应用平台产品负责人李国强接受采访解答了这些疑问,点击本文走进全新升级的阿里云可观测产品。
42003 11
|
4月前
|
人工智能 监控 Cloud Native
多款可观测产品全面升级丨阿里云云原生 5 月产品月报
多款可观测产品全面升级丨阿里云云原生 5 月产品月报。
|
5月前
|
存储 Prometheus 运维
【阿里云云原生专栏】云原生下的可观测性:阿里云 ARMS 与 Prometheus 集成实践
【5月更文挑战第25天】阿里云ARMS与Prometheus集成,为云原生环境的可观测性提供强大解决方案。通过集成,二者能提供全面精准的应用监控,统一管理及高效告警,助力运维人员及时应对异常。集成示例代码展示配置方式,但需注意数据准确性、监控规划等问题。这种集成将在云原生时代发挥关键作用,不断进化以优化用户体验,推动业务稳定发展。
217 0