11-TDengine集成EMQX:通过规则引擎实现设备数据直接入库

简介: 11-TDengine集成EMQX:通过规则引擎实现设备数据直接入库

背景


曾使用过 IoTDB 自带的 MQTT Broker 实现了设备数据入库,那么使用 TDengine 时,我们可以借助 EMQX (一款优秀的国产开源 MQTT Broker )的规则引擎结合 TDengineRESTful API 完成设备数据的路由与入库。


  • 用到的工具


  1. TDengine RESTful API
  2. EMQX 规则引擎
  3. TDengine GUI图形化管理工具
  4. Node.js下的MQTT客户端
  5. 虚拟机CentOS操作系统


  • 版本信息


  1. TDengine: 2.2.0.0
  2. EMQX: 4.2.4
  3. Node.js: 12.16.1
  4. CentOS: 7


TDengine创建数据库表


create database if not exists ok;
create stable if not exists ok.power(ts timestamp, voltage int, current float, temperature float) tags(sn int, city nchar(64), groupid int);
create table if not exists ok.device1 using ok.power tags(1, "太原", 1);
create table if not exists ok.device2 using ok.power tags(2, "西安", 2);
insert into ok.device1 values("2021-09-04 21:03:38.734", 1, 1.0, 1.0);
insert into ok.device2 values("2021-09-04 21:03:40.734", 2, 2.0, 2.0);

初始数据如下:

image.png


EMQX创建资源


所谓的资源就是将要连接的数据库、中间件等,这里便是 TDengine 的连接,通过其 RESTful API 建立连接,在规则引擎的动作响应中会用到这里的资源。

image.png

image.png

其中关于头信息中的 Authorization 通过以下方式获得。

# 获取token
cxzx-t580@Heartsuit MINGW64 /d/IoT
$ curl hadoop1:6041/rest/login/root/taosdata
{"status":"succ","code":0,"desc":"/KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04"}
# 测试:附加自定义token在头信息,正常响应
cxzx-t580@Heartsuit MINGW64 /d/IoT
$ curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'select * from ok.power' hadoop1:6041/rest/sql
{"status":"succ","head":["ts","voltage","current","temperature","sn","city","groupid"],"column_meta":[["ts",9,8],["voltage",4,4],["current",6,4],["temperature",6,4],["sn",4,4],["city",10,64],["groupid",4,4]],"data":[["2021-09-04 21:03:38.734",1,1.00000,1.00000,1,"太原",1],["2021-09-04 21:03:40.734",2,2.00000,2.00000,2,"西安",2]],"rows":2}


EMQX创建规则


  • 创建规则:这里直接从主题device/sn中获取payload,结果命名为power

image.png

  • 测试规则:模拟一条数据,经过测试,定义的规则成功命中。

image.png


EMQX创建动作响应


当命中数据后,我们的目标是将其存入数据库,那么我们一开始定义的 TDengine 资源就派上用场了。


  1. Action选择Data to Web Server表示我们要将数据发送至Web服务(即 TDengineRESTful API


  1. Resource选择我们创建好的资源


  1. 最后填写Payload Template,写入数据表的SQL语句,这里支持插值:insert into ok.device${power.sn} values ('${power.ts}', ${power.voltage}, ${power.currente}, ${power.temperature})

image.png


Node.js模拟MQTT客户端


这里通过 Node.js 模拟一个设备,向主题 device/sn 随机发布数据,完成数据上报,当然也可以借助其他客户端来实现。

image.png


EMQX查看规则引擎Metrics


点击 Rule 菜单下的规则引擎 ID ,可查看已配置的规则详情,还可以看到多少消息被规则命中的度量信息(需刷新页面)。

image.png


TDengine客户端查看数据


数据库中确认写入两条新数据:

image.png


规则引擎扩展


开源版的 EMQX Broker 除了全面支持 MQTT5 新特性、多协议支持外,更强大的地方在于其围绕 MQTT 周边提供了一系列的 WebHookHTTP API 接口以及最为核心的规则引擎。上面我们只是通过主题选择了数据进行规则匹配,其实规则引擎还可以结合一系列的内部事件,编写规则时以$开头,包括客户端连接事件、断开事件、消息确认事件、消息发布事件、订阅事件、取消订阅事件等。

image.png

EMQX Broker 一开始的定位就是物联网消息中间件,目前开源版本功能已经非常强大,而企业版本与Cloud版本更是提供了高阶功能,全托管、更稳定、更可靠,技术支持更及时。以下是我试用的Cloud版本。

image.png


可能遇到的问题


  • 端口开放问题


因为通过宿主机访问虚拟机,所以记得关闭防火墙或者开放对应的端口,这里涉及到的端口有:


  1. 6041:TDengine的RESTful API默认端口
  2. 1883:EMQX的MQTT默认端口
  3. 18083:EMQX的Dashboard默认端口
# 关闭防火墙
[root@hadoop1 ~]# systemctl stop firewalld.service
# 放行端口
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 6041 -j ACCEPT
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 1883 -j ACCEPT
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 18083 -j ACCEPT


  • 主题名称不匹配导致规则无法命中


作为约定俗成的实践,一般在编码时 MQTT 的主题不以 / 开头,即写作 device/sn ,而不是 /device/sn


刚开始我在 MQTT 客户端发送数据时主题名为 /device/sn ,而规则引擎中的主题为 device/sn ,导致无法匹配。


  • SQL模板中的字符串


这里的ts以字符串形式发送,因此需要将插值用引号括起来:'${power.ts}'。否则 TDengine 日志报错:

[root@hadoop1 taos]# tailf ./log/taosdlog.0
09/23 08:42:11.707621 00001702 TSC ERROR 0x8e async result callback, code:Syntax error in SQL
09/23 08:42:11.707675 00001696 HTP ERROR context:0x7f5f880008c0, fd:30, user:root, query error, code:Syntax error in SQL, sqlObj:0x7f5f74000c10
09/23 08:42:11.725687 00001696 HTP ERROR context:0x7f5f880008c0, fd:30, code:400, error:Syntax error in SQL
  • 规则引擎的Metrics计数与实际发送数据不符


这与客户端发送数据指定的 QoS 相关,如果 QoS = 1 ,则MQTT协议的重发机制可能导致数据重复发送。


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
26天前
|
数据采集 供应链 搜索推荐
数据集成:融合不同来源的数据
【6月更文挑战第4天】数据集成在企业中发挥关键作用,连接数据孤岛,促进信息流动,提升决策能力。通过抽取、清洗、转换和加载(ETL)不同来源、格式的数据,整合到统一框架,进行深度分析。以零售商为例,集成销售、客户和供应链数据可优化库存管理。数据清洗确保质量,转换满足分析需求,最终加载到数据仓库。Python和pandas库是实现这一过程的工具之一。随着技术进步,数据集成将推动企业向智能化和个性化发展。
46 2
|
2月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
86 2
|
2天前
|
传感器 安全 物联网
物联网(IoT)设备的硬件选型与集成技术博文
【6月更文挑战第28天】物联网设备硬件选型与集成聚焦关键要素:功能匹配、性能稳定性、兼容扩展及成本效益。嵌入式系统、通信协议、数据处理和安全性技术确保集成效果,支撑高效、智能的IoT系统,驱动家居、城市与工业自动化变革。
|
11天前
|
easyexcel Java API
SpringBoot集成EasyExcel 3.x:高效实现Excel数据的优雅导入与导出
SpringBoot集成EasyExcel 3.x:高效实现Excel数据的优雅导入与导出
35 1
|
20天前
|
SQL DataWorks 安全
DataWorks产品使用合集之在进行测试数据集成时,目标库的数据是源库数据的3倍量,是什么导致的
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
DataWorks产品使用合集之在进行测试数据集成时,目标库的数据是源库数据的3倍量,是什么导致的
|
2天前
|
SQL 数据采集 DataWorks
DataWorks操作报错合集之数据集成里面的数据调度独享资源组测试通过了,但是数据地图里无法通过,该如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6天前
|
SQL API 调度
Springboot2.4.5集成Quartz实现动态任务数据持久化-不怕重启服务
Springboot2.4.5集成Quartz实现动态任务数据持久化-不怕重启服务
11 0
|
6天前
|
存储 并行计算 算法
TDengine 3.2.3.0 集成英特尔 AVX512!快来看看为你增添了哪些助力
TDengine 3.2.3.0 引入AVX-512,提升时序数据库性能。通过SIMD指令集,AVX-512加速数据处理,尤其在Simple8B和ZigZag算法优化中,实现并行计算,减少指令数。在支持AVX-512的CPU上运行,解码性能提升1.82倍(对比软件解码)和1.28倍(对比AVX2)。蒋锴,英特尔专家,对优化工作贡献卓著。需使用gcc 9+编译并确保CPU支持AVX-512。
14 0
|
10天前
|
监控 安全 机器人
|
22天前
|
分布式计算 DataWorks 关系型数据库
MaxCompute产品使用合集之DataWorks是否支持通过SQL方式在MaxCompute中查询数据,并通过数据集成服务将查询结果同步至MySQL数据库
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。