MQTT Broker 规则引擎入门:快速指南

简介: EMQX MQTT Broker 的规则引擎功能在 MQTT 消息转换和数据集成方面起着重要作用。本文将提供一份快速入门指南,通过实例帮助您快速上手 MQTT 规则引擎。

引言

规则引擎是一种能够根据输入数据按照预设规则进行决策或执行动作的软件系统。本文将向您介绍 EMQX MQTT Broker 的规则引擎功能,并阐述其在 MQTT 消息转换和数据集成方面的重要作用。同时,我们还将提供一份快速入门指南,通过实例帮助您快速上手 MQTT 规则引擎。

MQTT 的规则引擎是什么?

MQTT 是一种高效可靠的消息传输协议,特别适用于低带宽、高延迟网络(在物联网领域十分常见)。在 MQTT 的发布/订阅模型中,MQTT Broker 扮演着关键角色,负责接收发布者发送的消息并将其可靠高效地分发给订阅者,确保消息的顺利传递。

MQTT 规则引擎是一种可以针对 MQTT 消息制定和执行规则的组件。该规则引擎具备提取、过滤、加工和转换 MQTT 消息的能力,并且可以在满足特定条件时触发相应的动作。通过规则引擎的应用,可以减少人工干预,提高数据集成和应用开发的效率。

MQTT Broker 中规则引擎的应用场景

规则引擎在 MQTT Broker 中有着广泛的应用,可以实现任务自动化、系统监控、效率提升和安全保障。例如:

  • 在智能家居自动化中,规则引擎可以实现一些任务的自动化,比如根据人员进出房间的情况自动开关灯,或者根据不同时间段调节室温。既可以节省能源,也可以让居住者享受更加便捷的生活体验。
  • 在工业物联网应用中,它可以监控和控制复杂的系统,比如制造流程或电网。通过制定相应的规则,规则引擎可以发现和处理异常,从而避免设备故障,提高系统性能。
  • 在医疗保健行业中,它可以监测患者的健康状态,并及时向医护人员报告可能存在的问题,帮助医护人员更好地关注患者的健康状况,及时采取必要的治疗措施,提高医疗保健的效果和质量。

EMQX 开箱即用的内置规则引擎

EMQX 是一个开源、高度可扩展的 MQTT Broker,内置了规则引擎组件。它让用户可以用低代码的方式快速构建数据处理的业务逻辑,从而降低了软件架构的复杂度。

我们选择在 Broker 内部嵌入规则引擎功能,而不是依赖 Broker 外部的独立的规则引擎,有两个原因:

  • 首先,Broker 内部的规则引擎可以实现更高效和流畅的通信。规则引擎可以直接获取 MQTT 消息,无需额外的通信渠道或协议,这降低了延迟,提升了系统性能。
  • 其次,这可以让整个系统的部署和管理更加简单。将规则引擎和 Broker 作为统一的组件部署在一起,无需分别进行集成和管理,从而简化了部署过程,减少了系统复杂度。

规则引擎快速入门

这里我们将以 EMQX Enterprise 为例,以便使用企业版提供的各种数据集成动作和资源。

下载 EMQX Enterprise

根据您的操作系统类型,在这里下载 EMQX Enterprise。

在下拉列表中选择最新版的 EMQX Enterprise 4.4:

下载 EMQX Enterprise

EMQX Enterprise 的默认许可证支持最多 10 个 MQTT 连接,对于我们体验规则引擎的功能来说已经足够了。如果您需要更多的 MQTT 连接,请点击“下载”按钮下方的链接申请 15 天免费试用。

这里我们以 Ubuntu 20.04 为例,下载 EMQX Enterprise 4.4.17 安装包:

$ wget https://www.emqx.com/en/downloads/enterprise/4.4.17/emqx-ee-4.4.17-otp24.3.4.2-1-ubuntu20.04-amd64.zip
$ unzip emqx-ee-4.4.17-otp24.3.4.2-1-ubuntu20.04-amd64.zip

接下来,我们启动 EMQX Broker:

$ cd emqx
$ ./bin/emqx start

使用 EMQX Cloud 体验我们的云上 MQTT Broker

如果您不想安装 Linux 操作系统、下载和安装 EMQX,或者执行任何 Linux 命令的话,您可以使用 EMQX Cloud 来体验规则引擎功能。您只需在网页上进行一些简单的配置即可。

配置一条简单规则将 MQTT 消息转发到其它主题

需求

我们假设有个用户名为 “Steve” 的设备周期性地向 notify 主题发送 JSON 格式的消息。下面是该设备发送的一条示例消息:

{"city": "Stockholm", "value": 21}

我们希望从原始消息中提取 value 字段值,并用它构造一条带有 temperature 字段的新消息,然后根据原始消息中的 city 字段值,将新消息动态转发到相应的 MQTT 主题,比如,上述示例消息将被转发到主题 city/Stockholm

{"temperature": 21}

为规则编写 SQL

既然我们已经明确了需求,就让我们打开 EMQX Dashboard(http://localhost:18083)来创建一个规则吧(仪表板的默认用户名/密码是 admin/public)。

在 EMQX Dashboard 中创建规则

为了实现上述需求,我们需要编写如下 SQL 语句:

SELECT
   payload.city as city,
   payload.value as val
FROM
   "notify"
WHERE
   username = 'Steve'

我们只关注主题为 notify 的消息,因此在 FROM 子句中指定了它。WHERE 子句用于筛选出 username 字段等于 Steve 的消息。

SELECT 语句将原始消息中 cityvalue 字段的值分别赋给了两个新变量:cityval。这两个变量将在接下来创建动作时使用。

为规则绑定 Republish 动作

接下来,我们需要为规则绑定 Republish 动作,并设置动作的参数:

  • 目标主题:city/${city}
  • 载荷模板:{"temperature": ${val}}

为规则绑定 Republish 动作

在前一节中,SELECT 语句将 payload.citypayload.value 的值分别赋给了两个新变量:cityval。我们的 Republish 动作使用 ${city}` 和 `${val} 占位符来引用这些变量,规则引擎会在执行时将变量的值替换到相应的位置。

测试规则

规则和动作创建完毕后,我们可以向 Broker 发送满足条件的消息来检验规则是否生效。

我们将使用 MQTTX 这款 MQTT 客户端工具来进行测试。您可以从这里下载并安装它。

首先,我们建立一个 MQTT 连接,将用户名设为 “Steve”。

使用 MQTTX 客户端建立一个 MQTT 连接

然后,我们订阅目标主题 city/Stockholm,并向 notify 主题发送一条消息:

发布 MQTT 消息

成功了!我们收到了由规则转发过来的消息:

成功接收 MQTT 消息

结语

MQTT 规则引擎是一种强大的工具,可以根据物联网应用中的特定条件或事件自动触发动作。通过本文的快速入门指南和示例,您可以轻松上手 EMQX 规则引擎,在物联网项目中充分发挥它的优势。

EMQX Enterprise 规则引擎提供了丰富的数据集成动作和资源,可以快速高效地完成大部分与数据处理相关的业务逻辑。我们强烈推荐您自己探索和体验。有关规则引擎 SQL 语法的详细文档,请参考 Rule SQL 文档

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/getting-started-with-rule-engine-in-mqtt-broker

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 网络协议 RocketMQ
消息队列 MQ产品使用合集之broker开启proxy,启动之后producer生产消息始终都只到一个broker,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 Java Kafka
RabbitMQ 入门
RabbitMQ 入门
|
8月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
6月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
178 19
|
5月前
|
消息中间件 存储 Java
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
60 0
分享一下rocketmq入门小知识
|
5月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
252 2
|
5月前
|
消息中间件 SQL 监控
RocketMQ 5.3.0 版本中 Broker IP 配置为 IPv6 的情况
【8月更文第28天】RocketMQ 是一款分布式消息中间件,支持多种消息发布和订阅模式。在 RocketMQ 5.3.0 版本中,Broker 的配置文件 `broker.conf` 允许配置 IPv6 地址。当 Broker 的 `brokerIP1` 配置为 IPv6 地址时,会对 Broker 的启动、消息推送和状态监控等方面产生影响。本文将探讨如何在 RocketMQ 中配置 IPv6 地址,并检查 Broker 的状态。
289 0
|
6月前
|
消息中间件 运维 RocketMQ
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
EMQ
|
8月前
|
安全 网络性能优化
MQTT 5.0 报文(Packets)入门指南
MQTT 控制报文是 MQTT 数据传输的最小单元。MQTT 客户端和服务端通过交换控制报文来完成它们的工作,比如订阅主题和发布消息。
EMQ
733 16
MQTT 5.0 报文(Packets)入门指南