11-TDengine集成EMQX:通过规则引擎实现设备数据直接入库

简介: 11-TDengine集成EMQX:通过规则引擎实现设备数据直接入库

背景


曾使用过 IoTDB 自带的 MQTT Broker 实现了设备数据入库,那么使用 TDengine 时,我们可以借助 EMQX (一款优秀的国产开源 MQTT Broker )的规则引擎结合 TDengineRESTful API 完成设备数据的路由与入库。


  • 用到的工具


  1. TDengine RESTful API
  2. EMQX 规则引擎
  3. TDengine GUI图形化管理工具
  4. Node.js下的MQTT客户端
  5. 虚拟机CentOS操作系统


  • 版本信息


  1. TDengine: 2.2.0.0
  2. EMQX: 4.2.4
  3. Node.js: 12.16.1
  4. CentOS: 7


TDengine创建数据库表


create database if not exists ok;
create stable if not exists ok.power(ts timestamp, voltage int, current float, temperature float) tags(sn int, city nchar(64), groupid int);
create table if not exists ok.device1 using ok.power tags(1, "太原", 1);
create table if not exists ok.device2 using ok.power tags(2, "西安", 2);
insert into ok.device1 values("2021-09-04 21:03:38.734", 1, 1.0, 1.0);
insert into ok.device2 values("2021-09-04 21:03:40.734", 2, 2.0, 2.0);

初始数据如下:

image.png


EMQX创建资源


所谓的资源就是将要连接的数据库、中间件等,这里便是 TDengine 的连接,通过其 RESTful API 建立连接,在规则引擎的动作响应中会用到这里的资源。

image.png

image.png

其中关于头信息中的 Authorization 通过以下方式获得。

# 获取token
cxzx-t580@Heartsuit MINGW64 /d/IoT
$ curl hadoop1:6041/rest/login/root/taosdata
{"status":"succ","code":0,"desc":"/KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04"}
# 测试:附加自定义token在头信息,正常响应
cxzx-t580@Heartsuit MINGW64 /d/IoT
$ curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'select * from ok.power' hadoop1:6041/rest/sql
{"status":"succ","head":["ts","voltage","current","temperature","sn","city","groupid"],"column_meta":[["ts",9,8],["voltage",4,4],["current",6,4],["temperature",6,4],["sn",4,4],["city",10,64],["groupid",4,4]],"data":[["2021-09-04 21:03:38.734",1,1.00000,1.00000,1,"太原",1],["2021-09-04 21:03:40.734",2,2.00000,2.00000,2,"西安",2]],"rows":2}


EMQX创建规则


  • 创建规则:这里直接从主题device/sn中获取payload,结果命名为power

image.png

  • 测试规则:模拟一条数据,经过测试,定义的规则成功命中。

image.png


EMQX创建动作响应


当命中数据后,我们的目标是将其存入数据库,那么我们一开始定义的 TDengine 资源就派上用场了。


  1. Action选择Data to Web Server表示我们要将数据发送至Web服务(即 TDengineRESTful API


  1. Resource选择我们创建好的资源


  1. 最后填写Payload Template,写入数据表的SQL语句,这里支持插值:insert into ok.device${power.sn} values ('${power.ts}', ${power.voltage}, ${power.currente}, ${power.temperature})

image.png


Node.js模拟MQTT客户端


这里通过 Node.js 模拟一个设备,向主题 device/sn 随机发布数据,完成数据上报,当然也可以借助其他客户端来实现。

image.png


EMQX查看规则引擎Metrics


点击 Rule 菜单下的规则引擎 ID ,可查看已配置的规则详情,还可以看到多少消息被规则命中的度量信息(需刷新页面)。

image.png


TDengine客户端查看数据


数据库中确认写入两条新数据:

image.png


规则引擎扩展


开源版的 EMQX Broker 除了全面支持 MQTT5 新特性、多协议支持外,更强大的地方在于其围绕 MQTT 周边提供了一系列的 WebHookHTTP API 接口以及最为核心的规则引擎。上面我们只是通过主题选择了数据进行规则匹配,其实规则引擎还可以结合一系列的内部事件,编写规则时以$开头,包括客户端连接事件、断开事件、消息确认事件、消息发布事件、订阅事件、取消订阅事件等。

image.png

EMQX Broker 一开始的定位就是物联网消息中间件,目前开源版本功能已经非常强大,而企业版本与Cloud版本更是提供了高阶功能,全托管、更稳定、更可靠,技术支持更及时。以下是我试用的Cloud版本。

image.png


可能遇到的问题


  • 端口开放问题


因为通过宿主机访问虚拟机,所以记得关闭防火墙或者开放对应的端口,这里涉及到的端口有:


  1. 6041:TDengine的RESTful API默认端口
  2. 1883:EMQX的MQTT默认端口
  3. 18083:EMQX的Dashboard默认端口
# 关闭防火墙
[root@hadoop1 ~]# systemctl stop firewalld.service
# 放行端口
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 6041 -j ACCEPT
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 1883 -j ACCEPT
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 18083 -j ACCEPT


  • 主题名称不匹配导致规则无法命中


作为约定俗成的实践,一般在编码时 MQTT 的主题不以 / 开头,即写作 device/sn ,而不是 /device/sn


刚开始我在 MQTT 客户端发送数据时主题名为 /device/sn ,而规则引擎中的主题为 device/sn ,导致无法匹配。


  • SQL模板中的字符串


这里的ts以字符串形式发送,因此需要将插值用引号括起来:'${power.ts}'。否则 TDengine 日志报错:

[root@hadoop1 taos]# tailf ./log/taosdlog.0
09/23 08:42:11.707621 00001702 TSC ERROR 0x8e async result callback, code:Syntax error in SQL
09/23 08:42:11.707675 00001696 HTP ERROR context:0x7f5f880008c0, fd:30, user:root, query error, code:Syntax error in SQL, sqlObj:0x7f5f74000c10
09/23 08:42:11.725687 00001696 HTP ERROR context:0x7f5f880008c0, fd:30, code:400, error:Syntax error in SQL
  • 规则引擎的Metrics计数与实际发送数据不符


这与客户端发送数据指定的 QoS 相关,如果 QoS = 1 ,则MQTT协议的重发机制可能导致数据重复发送。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
JSON API 数据处理
Winform管理系统新飞跃:无缝集成SqlSugar与Web API,实现数据云端同步的革新之路!
【8月更文挑战第3天】在企业应用开发中,常需将Winform桌面应用扩展至支持Web API调用,实现数据云端同步。本文通过实例展示如何在已有SqlSugar为基础的Winform系统中集成HTTP客户端调用Web API。采用.NET的`HttpClient`处理请求,支持异步操作。示例包括创建HTTP辅助类封装请求逻辑及在Winform界面调用API更新UI。此外,还讨论了跨域与安全性的处理策略。这种方法提高了系统的灵活性与扩展性,便于未来的技术演进。
252 2
|
1月前
|
存储 JSON Ubuntu
时序数据库 TDengine 支持集成开源的物联网平台 ThingsBoard
本文介绍了如何结合 Thingsboard 和 TDengine 实现设备管理和数据存储。Thingsboard 中的“设备配置”与 TDengine 中的超级表相对应,每个设备对应一个子表。通过创建设备配置和设备,实现数据的自动存储和管理。具体操作包括创建设备配置、添加设备、写入数据,并展示了车辆实时定位追踪和车队维护预警两个应用场景。
57 3
|
4月前
|
SQL 运维 监控
SLS 数据加工全面升级,集成 SPL 语法
在系统开发、运维过程中,日志是最重要的信息之一,其最大的优点是简单直接。SLS 数据加工功能旨在解决非结构化的日志数据处理,当前全面升级,集成 SPL 语言、更强的数据处理性能、更优的使用成本。
18180 139
|
3月前
|
监控 数据安全/隐私保护 异构计算
借助PAI-EAS一键部署ChatGLM,并应用LangChain集成外部数据
【8月更文挑战第8天】借助PAI-EAS一键部署ChatGLM,并应用LangChain集成外部数据
96 1
|
3月前
|
JSON 数据管理 关系型数据库
【Dataphin V3.9】颠覆你的数据管理体验!API数据源接入与集成优化,如何让企业轻松驾驭海量异构数据,实现数据价值最大化?全面解析、实战案例、专业指导,带你解锁数据整合新技能!
【8月更文挑战第15天】随着大数据技术的发展,企业对数据处理的需求不断增长。Dataphin V3.9 版本提供更灵活的数据源接入和高效 API 集成能力,支持 MySQL、Oracle、Hive 等多种数据源,增强 RESTful 和 SOAP API 支持,简化外部数据服务集成。例如,可轻松从 RESTful API 获取销售数据并存储分析。此外,Dataphin V3.9 还提供数据同步工具和丰富的数据治理功能,确保数据质量和一致性,助力企业最大化数据价值。
171 1
|
3月前
|
开发框架 .NET 数据库连接
闲话 Asp.Net Core 数据校验(三)EF Core 集成 FluentValidation 校验数据例子
闲话 Asp.Net Core 数据校验(三)EF Core 集成 FluentValidation 校验数据例子
|
3月前
|
Java 测试技术 容器
从零到英雄:Struts 2 最佳实践——你的Web应用开发超级变身指南!
【8月更文挑战第31天】《Struts 2 最佳实践:从设计到部署的全流程指南》深入介绍如何利用 Struts 2 框架从项目设计到部署的全流程。从初始化配置到采用 MVC 设计模式,再到性能优化与测试,本书详细讲解了如何构建高效、稳定的 Web 应用。通过最佳实践和代码示例,帮助读者掌握 Struts 2 的核心功能,并确保应用的安全性和可维护性。无论是在项目初期还是后期运维,本书都是不可或缺的参考指南。
50 0
|
3月前
|
SQL 存储 数据管理
掌握SQL Server Integration Services (SSIS)精髓:从零开始构建自动化数据提取、转换与加载(ETL)流程,实现高效数据迁移与集成——轻松上手SSIS打造企业级数据管理利器
【8月更文挑战第31天】SQL Server Integration Services (SSIS) 是 Microsoft 提供的企业级数据集成平台,用于高效完成数据提取、转换和加载(ETL)任务。本文通过简单示例介绍 SSIS 的基本使用方法,包括创建数据包、配置数据源与目标以及自动化执行流程。首先确保安装了 SQL Server Data Tools (SSDT),然后在 Visual Studio 中创建新的 SSIS 项目,通过添加控制流和数据流组件,实现从 CSV 文件到 SQL Server 数据库的数据迁移。
189 0
|
5月前
|
数据采集 DataWorks 安全
DataWorks产品使用合集之选择独享调度,数据集成里可以使用,但是数据地图里面测试无法通过,是什么原因导致的
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
53 0
DataWorks产品使用合集之选择独享调度,数据集成里可以使用,但是数据地图里面测试无法通过,是什么原因导致的
|
4月前
|
SQL DataWorks 关系型数据库
DataWorks产品使用合集之数据集成时源头提供数据库自定义函数调用返回数据,数据源端是否可以写自定义SQL实现
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。

热门文章

最新文章

下一篇
无影云桌面