使用 EMQX 和 eKuiper 进行 MQTT 流处理:快速教程

简介: 了解如何使用 EMQX 和 eKuiper 进行 MQTT 流处理,实现实时数据处理、转换和分析。优化您的 MQTT 数据流处理能力,提高物联网应用的效率和性能。

引言

MQTT 协议是一种专为物联网应用而设计的轻量级消息传输协议。它具有简单、开放、易于实现的特点,是物联网应用的理想选择。MQTT 数据以连续实时的方式进行传输,非常适合由流处理引擎进行处理。

EMQX 是一款大规模分布式物联网 MQTT Broker,能够高效、可靠地连接海量的物联网设备,并实时处理和分发消息和事件流数据。eKuiper 是一个开源的流处理引擎,可以对流数据进行过滤、转换和聚合等操作。

本文将向您展示如何使用 eKuiper 实时流处理引擎来处理来自 EMQX 的 MQTT 数据。

MQTT Stream Processing with EMQX and eKuiper

场景描述

假设我们有个 MQTT 主题 demo/sensor,用于在 EMQX 中接收温度和湿度数据。我们希望使用 eKuiper 订阅该主题,并用流处理技术对数据进行处理和分析。然后,我们可以根据分析结果,触发用户的 HTTP 服务,或者将结果保存到外部存储中。

EMQX

由于 EMQX 支持标准的 MQTT 协议,所以 eKuiper 可以连接到任何版本的 EMQX。在这里,我们使用 EMQX Cloud 提供的免费公共 MQTT Broker 进行测试:

集群 集群地址 监听端口
emqx1 broker.emqx.io 1883

eKuiper

eKuiper 可以部署在边缘或云端。我们可以使用 Docker 进行快速安装。

docker run -p 9081:9081 -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVER=tcp://broker.emqx.io:1883 lfedge/ekuiper:1.10.0

我们可以用这个命令拉取并运行 eKuiper 1.10.0 版本的 docker 镜像。我们将 REST API 端口设置为 9081,在本教程中,我们将使用 REST API 来管理 eKuiper。我们还通过环境变量把默认的 MQTT Broker 地址指向了 EMQX Cloud 集群。

如果您想使用其他方法安装 eKuiper,请查看安装指南

EMQX ECP (EMQX Edge-to-Cloud Platform) 是专为云边协同而打造的高级 MQTT 平台。它提供了专业的 Web UI 让您可以方便地管理 eKuiper。在本教程中,您也可以使用 ECP 来管理 eKuiper。更多细节,请参考 [ECP 文档]

配置 eKuiper 订阅 MQTT 数据流

MQTT 数据是一种无界的、连续的流式数据。在 eKuiper 中,我们使用流的概念来映射这种类型的数据。要处理 MQTT 数据,我们首先要创建一个流来描述数据。

我们用 eKuiper REST API 来创建一个流:

POST http://127.0.0.1:9081/streams
Content-Type: application/json

{
  "sql": "CREATE STREAM demoMqttStream (temperature FLOAT, humidity FLOAT) WITH (TYPE=\"mqtt\", DATASOURCE=\"demo/sensor\", FORMAT=\"json\", SHARED=\"true\")"
}

用 Postman 等 HTTP 客户端发送上面的请求,将创建一个名为 demoMqttStream 的流,它是 MQTT 类型的数据源。datasource 属性的值是 demo/sensor,表示订阅 MQTT 的 demo/sensor 主题。数据格式是 JSON。SHARED 选项表示这个流可以被所有规则共享。

注意

我们运行 eKuiper docker 容器时,MQTT Broker 地址默认是 tcp://broker.emqx.io:1883。如果您用的是别的 MQTT Broker,请在安装时换成您的 Broker 地址。

如果您想改变 MQTT Broker 地址或其他 MQTT 连接参数,如认证相关配置,可以修改 data/mqtt_souce.yaml 文件里的设置。

您可以用 +# 通配符订阅多个主题,在 datasource 属性里使用这些通配符。比如,demo/+ 是订阅所有以 demo/ 开头的主题。demo/# 是订阅所有以 demo/ 开头的主题和 demo/ 下的所有子主题。

流处理 MQTT 数据

在 eKuiper 中,我们用规则来定义流处理的工作流程。规则是 SQL 语句,它规定了数据处理的方式和处理后执行的动作。除了连续的数据处理,像 eKuiper 这样的流处理引擎还支持有状态处理。我们将演示两个流处理和有状态处理的例子。

有状态的报警规则

第一个流处理例子是监测温度和湿度数据,温度上升超过 0.5 或湿度上升超过 1 就触发报警。这要求处理引擎能够记住前一条数据的状态,并和当前数据比较。

假设我们有个 URL 为 http://yourhost/alert 的 HTTP webhook,用来接收报警数据。我们首先用下面的 HTTP 请求创建一个规则。

###
POST http://{{host}}/rules
Content-Type: application/json

{
  "id": "rule1",
  "sql": "SELECT temperature, humidity FROM demoMqttStream WHERE temperature - LAG(temperature) > 0.5 OR humidity - LAG(humidity) > 1",
  "actions": [{
    "rest": {
      "url": "http://yourhost/alert",
      "method": "post",
      "sendSingle": true
    }
  }]
}

上述请求创建了一个名为 rule1 的规则,该规则对应的 SQL 语句如下:

SELECT temperature, humidity 
FROM demoMqttStream 
WHERE 
  temperature - LAG(temperature) > 0.5 
  OR humidity - LAG(humidity) > 1

这个 SQL 从 demoMqttStream 里选出变化达到我们条件的温度和湿度数据。LAG 函数用来获取前一条数据。

actions 属性规定了规则触发后的动作。这里,我们用 rest 动作把数据发送到 http://yourhost/alert 。发送的是 SQL 筛选出的数据,以 JSON 格式发送。所以,发送的数据是这样的:

{
  "temperature": 25.5,
  "humidity": 60.5
}

测试规则

我们可以用 MQTTX 或者其他 MQTT 客户端来发布 MQTT 数据到 demo/sensor 主题。规则会处理这些数据。比如,我们发送以下数据到主题:

{"temperature": 25.5, "humidity": 60.5}
{"temperature": 26.1, "humidity": 62}
{"temperature": 25.9, "humidity": 62.1}
{"temperature": 26.5, "humidity": 62.3}

我们将在 HTTP 警报服务中收到下列数据:

{"temperature": 26.1, "humidity": 62}
{"temperature": 26.5, "humidity": 62.3}

这是因为只有第二和第四条消息,温度上升超 0.5 或湿度上升超 1。

时间窗口聚合规则

第二个例子是计算每分钟的平均温度和湿度,并把它发送回 EMQX。这涉及到一个经典的流处理概念,叫做时间窗口。我们可以用以下 HTTP 请求来创建一个规则。

###
POST http://{{host}}/rules
Content-Type: application/json

{
  "id": "rule2",
  "sql": "SELECT 
  trunc(avg(temperature), 2) as avg_temperature, trunc(avg(humidity), 2) as avg_humidity, window_end() as ts FROM demoMqttStream GROUP BY TumblingWindow(mi, 1)",
  "actions": [{
    "mqtt": {
      "server": "tcp://broker.emqx.io:1883",
      "topic": "result/aggregation",
      "sendSingle": true
    }
  }]
}

上述请求创建了一个名为 rule2 的规则,该规则对应的 SQL 语句如下:

SELECT 
  trunc(avg(temperature), 2) as avg_temperature, 
  trunc(avg(humidity), 2) as avg_humidity,
  window_end() as ts
FROM demoMqttStream
GROUP BY TumblingWindow(mi, 1)

这个 SQL 会选出每分钟的温度和湿度平均值。时间窗口在 GROUP BY 子句中用 TumblingWindow 定义。这种窗口类型把 MQTT 数据分成固定长度的窗口。在 SELECT 子句中,我们用聚合函数 avg 来计算时间窗口内温度和湿度的平均值。window_end() 函数用来获取时间窗口的结束时间,这样我们就能知道这些平均值对应的时间段。trunc 函数用来把平均值四舍五入到两位小数。

actions 属性规定了规则触发后的动作。这里,我们用 mqtt 动作发送数据到 EMQX 的 result/aggregation 主题。发送的是 SQL 筛选出的数据,以 JSON 格式发送。所以,发送到主题的数据是这样的:

{
  "avg_temperature": 25.5,
  "avg_humidity": 60.5,
  "ts": 1621419600000
}

测试规则

同样,我们可以用 MQTTX 或者其他 MQTT 客户端来发布 MQTT 数据到 demo/sensor 主题。规则会处理这些数据。比如,我们每 30 秒发送一条数据到主题,两分钟的数据如下所示:

{"temperature": 25.5, "humidity": 60.5}
{"temperature": 26.1, "humidity": 62}
{"temperature": 25.9, "humidity": 62.1}
{"temperature": 26.5, "humidity": 62.3}

我们将在 HTTP 警报服务中收到下列数据:

{"avg_temperature": 25.8, "avg_humidity": 61.25, "ts": 1621419600000}
{"avg_temperature": 26.2, "avg_humidity": 62.2, "ts": 1621419660000}

我们发送了两分钟的数据,所以得到了两个每分钟的平均值。

结语

在本教程中,我们学习了如何使用 eKuiper 处理 MQTT 数据。通过本教程,您能够:

  • 通过订阅 EMQX MQTT Broker 主题接收 MQTT 数据
  • 制定规则来处理 MQTT 数据
  • 将处理后的数据反馈给 EMQX Broker

我们用两个示例展示了 eKuiper 对 MQTT 数据的流处理能力。eKuiper 强大的流处理能力可以应用于多种流式数据源。欢迎您探索 eKuiper 的各种功能,构建实时高效的 MQTT 数据处理通道。

版权声明: 本文为 EMQ 原创,转载请注明出处。
原文链接: https://www.emqx.com/zh/blog/mqtt-stream-processing-with-emqx-and-ekuiper
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
5月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
2月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
141 2
|
2月前
|
消息中间件 监控 Ubuntu
RabbitMQ安装配置,超详细版教程
以上步骤为您提供了在Linux环境下安装RabbitMQ的详细过程。安装Erlang作为基础,然后通过添加官方源并安装RabbitMQ本身,最后对服务进行配置并启用Web管理界面。这些步骤操作简单直观,只需要跟随上述指南,即可在短时间内将RabbitMQ服务器运行起来,并进行进一步的配置和管理。不要忘记硬件和网络资源对性能的影响,确保RabbitMQ能够满足您的应用需求。
99 0
|
5月前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
4月前
|
消息中间件 Java RocketMQ
教程:Spring Boot整合RocketMQ的配置与优化
教程:Spring Boot整合RocketMQ的配置与优化
|
5月前
|
消息中间件 存储 Java
RocketMQ实战教程之NameServer与BrokerServer
这是一个关于RocketMQ实战教程的概要,主要讨论NameServer和BrokerServer的角色。NameServer负责管理所有BrokerServer,而BrokerServer存储和传输消息。生产者和消费者通过NameServer找到合适的Broker进行交互,不需要直接知道Broker的具体信息。工作流程包括生产者向NameServer查询后发送消息到Broker,以及消费者同样通过NameServer获取消息进行消费。这种设计类似于服务注册中心的概念,便于系统扩展和集群管理。
|
4月前
|
消息中间件 Java Spring
最新spingboot整合rabbitmq详细教程
最新spingboot整合rabbitmq详细教程
|
5月前
|
消息中间件 中间件 Java
RocketMQ实战教程之几种MQ优缺点以及选型
该文介绍了几种主流消息中间件,包括ActiveMQ、RabbitMQ、RocketMQ和Kafka。ActiveMQ和RabbitMQ是较老牌的选择,前者在中小企业中常见,后者因强大的并发能力和活跃社区而流行。RocketMQ是阿里巴巴的开源产品,适用于大规模分布式系统,尤其在数据可靠性方面进行了优化。Kafka最初设计用于大数据日志处理,强调高吞吐量。在选择MQ时,考虑因素包括性能、功能、开发语言、社区支持、学习难度、稳定性和集群功能。小型公司推荐使用RabbitMQ,而大型公司则可在RocketMQ和Kafka之间根据具体需求抉择。
|
5月前
|
消息中间件 Cloud Native 自动驾驶
RocketMQ实战教程之MQ简介
Apache RocketMQ 是一个云原生的消息流平台,支持消息、事件和流处理,适用于云边端一体化场景。官网提供详细文档和下载资源:[RocketMQ官网](https://rocketmq.apache.org/zh/)。示例中提到了RocketMQ在物联网(如小米台灯)和自动驾驶等领域的应用。要开始使用,可从[下载页面](https://rocketmq.apache.org/zh/download)获取软件。
下一篇
无影云桌面