使用流式计算引擎 eKuiper 处理 Protocol Buffers 数据

本文涉及的产品
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
数据传输服务 DTS,数据同步 1个月
简介: 本文将讲解如何在流式计算引擎eKuiper中处理Protocol Buffers格式数据,从而实现高效的云边协同数据传输,缓解云边传输带宽紧张问题。

Protocol Buffers (Protobuf) 是一种语言中立、平台中立的可扩展机制,用于序列化结构化数据的二进制传输格式。相比常规数据传输格式(如 JSON 或 XML),Protobuf 更加高效和快速并节省传输带宽,因此得到了广泛的应用。

在云边协同架构中,往往既需要发送数据到云端,同时也需要接收云端发送过来的数据,进行云边协同计算。大规模的云边协同计算传输的数据总量巨大,在公网带宽资源有限而且昂贵的情况下,采用更紧凑的数据传输格式显得尤为重要。

LF Edge eKuiper 是适合部署于资源受限的边缘端的超轻量物联网边缘数据流式分析引擎,可通过 source 和 sink 连接 MQTT、HTTP 等各种通信协议的外部系统。eKuiper 支持配置 source/sink 的传输数据的编解码格式,目前可支持 JSON、ProtoBuf 和 Binary 格式。

本文将以 Protobuf 格式为例,讲解如何在 eKuiper 中设置编解码格式,通过 source 读入并解析该格式的数据以及在 sink 中使用该格式编码写入,从而实现高效的云边协同数据传输,缓解云边传输带宽紧张问题。

本教程采用 eKuiper Manager 进行规则的创建和管理,请参考 UI 教程。您也可以采用 REST API 或者在 eKuiper 运行的边端运行 命令行工具来完成相同的规则管理操作。

环境准备

开始动手操作之前,需要准备以下环境:

  • MQTT 服务器用于数据传输。 本教程使用位于 tcp://broker.emqx.io:1883 的 MQTT 服务器, broker.emqx.io 是一个由 EMQX Cloud 提供的公共 MQTT 服务器。若本地运行 eKuiper,需要更改 etc/mqtt_source.yaml,配置项 server 改为"tcp://broker.emqx.io:1883";若使用 docker 启动,应设置环境变量 MQTT_SOURCEDEFAULTSERVER="tcp://broker.emqx.io:1883"。
  • 为了方便观察运行结果,我们需要安装一个 MQTT 客户端,例如 MQTT X

模式注册(Schema Registry)

相比于无模式的 JSON 格式,Protobuf 需要提前定义数据结构,即模式。在 proto 文件中,可以包含多个 message 以及其他实体的定义,但是在编解码格式的配置中,只有 message 的定义可以被使用。 本教程中,我们使用以下模式进行数据结构的定义。该文件定义了一个名为 Book 的 message 结构,其中包含字符串类型的 title 和整型的 price。传输的数据将依据此结构对书籍数据进行二进制数据的编解码。

message Book {
  required string title = 1; 
  required int32 price = 2;
}
  1. 注册模式。在管理控制台中,打开配置->模式,点击创建模式。

    eKuiper 注册模式.png

  2. 在模式创建窗口中,如下图所示填写。其中,模式类型选择protobuf;模式名称可输入自定义的不重复的名称作为后续规则创建中模式的标识 id;模式内容可采用文件或者文本内容填写。选择 file 的情况下,需要填写文件所在的 url;本教程使用的模式较为简单,因此可选择 content,然后在内容框中填入 proto 文件的文本。

    在模式创建窗口中.png

  3. 点击提交。在模式列表中应当能够看到新创建的模式。后续可使用操作栏中的按钮进行修改或删除的操作。

    点击提交.png

至此,我们已经注册了名为schema1的模式,其中定义了Book这种类型,在规则的 source 和 sink 中可以使用该注册的模式。用户也可以继续在此界面进行更多的模式注册和管理工作。

读取 Protobuf 数据

本节中,我们以 MQTT source 为例,介绍如何接入并解析基于 Protobuf 编码传输的数据,使之可以在 eKuiper 中进行规则的计算。需要注意的是,在 Source 中,编码格式与传输协议并不是绑定的。任何的 source 类型如 MQTT, httpPull 等都可以搭配不同的编码格式,例如 ProtoBuf 和 JSON 等。

假设我们有一个 MQTT 主题 demo,出于节省传输带宽的目的,里面传输的数据为 Protobuf 编码的二进制数据。接下来,我们将配置 eKuiper 数据源,接入这个主题的数据并进行处理。

  1. 创建数据流:在管理控制台中,选择源管理->流管理,点击创建流。
  2. 配置数据流及其格式:流名称可设置为自定义的不重复的名称;数据源为要监听的 MQTT 主题;流类型设置为 mqtt;流格式选择 protobuf;模式名称选择上一步注册的 schema1;模式消息设置为 proto 文件里定义的 message Book。该配置表示数据流 protoDemo 将监听 MQTT 主题 protoDemo,收到二进制数据后将采用 schema1 中的 Book 的格式进行 protobuf 解码。点击提交,在流列表中应当列出新创建的流。

    配置数据流及其格式.png

  3. 创建规则:选择规则,点击新建规则,进入规则创建界面。如下图所示,右上角点击进入文本模式,输入自定义的规则ID,规则名字,在文本内容中输入规则的 JSON 文本。该规则表示选择流 protoDemo 中的内容,发送到 MQTT 主题 result/protobuf 中。

    {
       "id": "ruleDecode",
       "sql": "SELECT * FROM protoDemo",
       "actions": [{
         "mqtt": {
           "server": "tcp://broker.emqx.io:1883",
           "topic": "result/protobuf",
           "sendSingle": true
         }
       }]
    }

    创建规则.png

  4. 发送数据并查看结果:我们将使用 MQTTX 发送 Protobuf 编码后的二进制数据到 protoDemo 主题中,观察收到的结果是否是解码后的正确数据。

    1. 打开 MQTT X,连接到云端 tcp://broker.emqx.io:1883
    2. 订阅主题上文规则发送结果的主题 result/protobuf,便于观察结果。
    3. 在消息发送窗格中,设置主题为 protoDemo,Payload 格式为 Hex, 发送根据 schema1 中 Book 格式编码的二进制数据,例如 0a1073747265616d696e672073797374656d107b

      在消息发送窗格中设置主题.png

    4. 确保接收窗口收到正确的 JSON 数据,如下图所示。

      接收 JSON 数据.png

至此,我们完成了 Protobuf 数据的读取和解码并用简单的规则进行处理输出。用户像处理普通 JSON 格式数据一样创建各种各样的规则。若未得到预期结果,可在管理控制台的规则列表页面,查看规则状态,确保规则数据入出的指标符合预期。

写入 Protobuf 数据

本节中,我们将展示读取 JSON 格式数据进行处理后采用 Protobuf 格式发送到云端 MQTT broker 的用法。在物联网边云协同的场景中,该用法可节省边云传输的带宽开销。部署在边缘端的 eKuiper 接入本地的 MQTT broker 无需消耗带宽,可通过处理较快的 JSON 格式接入。规则运算之后,计算结果需要发送到云端 MQTT broker 时,可使用 Protobuf 编码节省带宽。

  1. 创建数据流:在管理控制台中,选择源管理->流管理,点击创建流。如下图所示,创建一个连入 demo 主题,JSON 格式数据的流。

    创建数据流.png

  2. 创建规则,使用 Protobuf 格式发送到云端。

    1. 点击新建规则,输入自定义的 Rule ID 和名称,输入 SQL SELECT * FROM demo
    2. 点击动作右边的新建按钮,配置 MQTT 动作。其中,MQTT 服务器地址配置为云端 broker 地址,MQTT 主题为 result/protobufOut;数据按条发送配置为 true,确保收到的为单条数据以匹配格式配置;流格式配置为 protobuf,模式名称为第一节注册的 schema1,模式消息为 Book。该规则将读取 JSON 数据,然后按照 Book 的格式编码成二进制数据发往 result/protobufOut 主题。点击提交,完成动作配置。

      配置 MQTT 动作.png

      配置 MQTT 动作1.png

    3. 每个规则可以有多个动作,每个动作使用的编码格式是独立的。用户可以继续配置其余动作。全部配置完成后,点击提交,完成规则的创建。
  3. 发送数据并查看结果,该流程与上一节类似。本次我们将向 demo 主题发送 JSON 数据,并期望在订阅的 result/protobufOut 主题中查看到 protobuf 编码的二进制数据。如下图所示,注意数据格式的配置以免显示乱码。

    发送数据并查看结果.png

总结

本教程介绍了如何在 eKuiper 中进行 Protobuf 数据的读取和写入。ProtoBuf 格式是 eKuiper 对外连接的格式的一种,各种格式之间可以任意组合,接入系统后使用的都是内部的格式表示。首先,用户需要先定义 Protobuf 的模式;之后在流的创建和动作的创建中可配置 Protobuf 格式,并选择已定义的模式进行数据的编解码。

相关实践学习
自建数据库迁移到云数据库
本场景将引导您将网站的自建数据库平滑迁移至云数据库RDS。通过使用RDS,您可以获得稳定、可靠和安全的企业级数据库服务,可以更加专注于发展核心业务,无需过多担心数据库的管理和维护。
Sqoop 企业级大数据迁移方案实战
Sqoop是一个用于在Hadoop和关系数据库服务器之间传输数据的工具。它用于从关系数据库(如MySQL,Oracle)导入数据到Hadoop HDFS,并从Hadoop文件系统导出到关系数据库。 本课程主要讲解了Sqoop的设计思想及原理、部署安装及配置、详细具体的使用方法技巧与实操案例、企业级任务管理等。结合日常工作实践,培养解决实际问题的能力。本课程由黑马程序员提供。
目录
相关文章
|
数据可视化 PyTorch 算法框架/工具
零一万物Yi-34B-Chat 微调模型及量化版开源!魔搭社区最佳实践教程!
11月24日,零一万物基正式发布并开源微调模型 Yi-34B-Chat,可申请免费商用。同时,零一万物还为开发者提供了 4bit/8bit 量化版模型,Yi-34B-Chat 4bit 量化版模型可以直接在消费级显卡(如RTX3090)上使用。魔搭社区已支持下载、推理训练体验,并推出相关教程,欢迎大家来玩!
|
存储 关系型数据库 MySQL
Nacos 配置数据持久化
Nacos 不仅仅可以作为注册中,还带有配置中心的功能。Nacos 配置默认存储在内存中,对于一些项目往往配置项有成百上千个这些都是非常重要的。在这个场景下我们可以将配置保存到 MySQL 中。来持久化我们的配置,保证 Nacos 重新启动或者服务节点挂掉后配置不会丢失。 环境介绍:nacos-server-1.4.2
1567 0
Nacos 配置数据持久化
使用pip时报错:No module named ‘chardet‘ 的解决办法
使用pip时报错:No module named ‘chardet‘ 的解决办法
2342 0
使用pip时报错:No module named ‘chardet‘ 的解决办法
|
数据库管理
【SQLite】解决unrecognized token:“‘“
【SQLite】解决unrecognized token:“‘“
1482 0
|
开发工具 git Windows
文件过大无法git pull/git clone解决办法
由于公司无线限速, windows下拉代码会比较慢, 导致过大的仓库无法clone/pull下来, 可以尝试以下方法, 将一次拉取的size缩小, 然后再fetch 效率云中代码库过大时, 会有限制, 出现无法pull或者clone的情况, 如下图 首先以shallow模式克隆 例如:  git clone http://gaoyuan03_iwaimai.
6635 0
|
Ubuntu Java Linux
alpine Linux与基于alpine制作JDK8镜像
Docker commit 命令 1.下载基础镜像 2.使用此基础镜像创建/启动/进入容器 3.在容器安装自己需要的软件 4.将保存配置完成的容器提交成镜像 语法如下 docker commit [OPTIONS] CONTAINER [REPOSITORY[:TAG]] OPTIONS说明: -a :提交的镜像作者; -c :使用Dockerfile指令来创建镜像; -m :提交时的说明文字; -p :在commit时,将容器暂停。 实例:将容器a404c6c174a2 保存为新的镜像,并添加提交人信息和说明
|
存储 数据挖掘 大数据
详解阿里云数据中台,一篇文章全面了解大数据“网红”
一直想写一篇关于数据中台正面文章,现在有闲时做些总结,想充分诠释一下DT内部人如何看待数据中台。 数据中台的概念是最早由阿里巴巴首次提出,是为了应对内部众多业务部门千变万化的数据需求和高速时效性的要求而成长起来的,它既要满足业务部门日常性的多个业务前台的数据需求,又要满足像双十一,六一八这样的业务高峰、应对大规模数据的线性可扩展问题、应对复杂活动场景业务系统的解耦问题,而在技术、组织架构等方面采取的一些变革。
26693 0
|
SQL 数据库 索引
内连接(INNER JOIN)在SQL中的简单应用与技巧
在SQL查询中,内连接(INNER JOIN)是一种基本且常用的连接类型,用于从两个或多个表中检索匹配的记录
1047 0
|
机器学习/深度学习 算法
概率分布深度解析:PMF、PDF和CDF的技术指南
本文将深入探讨概率分布,详细阐述概率质量函数(PMF)、概率密度函数(PDF)和累积分布函数(CDF)这些核心概念,并通过实际示例进行说明。
1104 15
概率分布深度解析:PMF、PDF和CDF的技术指南
|
12月前
|
存储 关系型数据库 MySQL
MySQL中的Redo Log、Undo Log和Binlog:深入解析
【10月更文挑战第21天】在数据库管理系统中,日志是保障数据一致性和完整性的关键机制。MySQL作为一种广泛使用的关系型数据库管理系统,提供了多种日志类型来满足不同的需求。本文将详细介绍MySQL中的Redo Log、Undo Log和Binlog,从背景、业务场景、功能、底层实现原理、使用措施等方面进行详细分析,并通过Java代码示例展示如何与这些日志进行交互。
982 0