MQX + PolarDB-X 构建一站式物联网数据解决方案

简介: 本文详细介绍了如何使用开源分布式物联网MQTT消息服务器EMQX与云原生分布式数据库PolarDB-X打造集成方案,以实现关键物联网数据的一站式采集、传输、存储。

前言

虽然物联网的应用场景很多,但总结来看各类场景都离不开对数据的采集-传输-存储-分析。按照数据特性和业务需求不同,物联网数据可以分为不同的种类:

  • 元数据:设备最新的状态数据,如在线状态、当前传感器数值;
  • 消息数据:设备发布的消息,包括上报数据和下发指令;
  • 时序数据:持续变化的元数据和消息数据。

在物联网应用中,数据存储需求无处不在,数据只有经过传输、存储,再经由业务系统查询并处理才能实现各类业务需求,进一步发挥数据的价值。

物联网设备规模庞大,往往整体数据量也非常大,实践中通常有以下选型:

  • 元数据需要频繁插入更新,并且支持结构化查询,所以推荐使用关系数据库进行存储。
  • 消息数据不需要全量存储,只需要记录关键操作,或者提取消息数据的关键数据,应当根据业务情况适当的选型。
  • 时序数据的特点是数据修改频次低,对写入速度和存储压缩比敏感,查询需求多样,因此推荐使用时序数据库。

数据库存储选型不是绝对的,用户可以综合数据特性、数据量以及后端业务需求来选择适合的数据库,在各种因素中达到平衡。

EMQX 是由 EMQ 开发的一款大规模分布式物联网 MQTT 消息服务器。作为专门针对低带宽和不稳定网络环境的物联网应用设计的协议,MQTT 基于发布/订阅模式,具有简单易实现、支持 QoS、报文小等特点。完整支持 MQTT 协议的 EMQX 则可以连接海量物联网设备,提供高可靠、高性能的实时数据移动、处理和集成,充当设备与设备、设备与应用之间的桥梁。

PolarDB-X 是由阿里巴巴自主研发的云原生分布式数据库,是一款基于云架构理念,并同时支持在线事务处理与在线分析处理 (Hybrid Transactional and Analytical Processing, HTAP)的融合型分布式数据库。其具备金融级数据高可用、分布式水平扩展、混合负载、低成本存储和极致弹性等能力,专注解决海量数据存储、超高并发吞吐、大表瓶颈以及复杂计算效率等数据库瓶颈难题。

本文将介绍开源版 EMQX 与 PolarDB-X 打造的集成方案,可以实现关键物联网数据的一站式采集、传输、存储。

EMQX+PolarDB-X 集成方案详情

使用 EMQX 接入物联网设备,通过 EMQX 数据集成组件来处理并分发数据到 MQTT 汇聚主题,第三方脚本通过 MQTT 订阅的方式从汇聚主题中获取数据,并代理写入到 PolarDB-X,以下是整体架构图:

数据集成简介

数据集成是 EMQX 在发布订阅模型的基础之上的数据处理与分发组件,通过简单的可视化的配置,即可实时处理 EMQX 的消息以及设备事件并将其与 Kafka/RabbitMQ 等消息中间件、以及各类 SQL/NoSQL/时序数据库等数据系统集成。

其中消息是指设备端的上报或者云端的下发消息,设备生命周期事件是指整个设备在运行过程中的事件,这些事件对物联网的应用开发有很大作用,围绕此可以实现设备管理、安全审计、设备影子等业务,以及更精细化的控制 MQTT 消息传输过程。

共享订阅简介

MQTT 共享订阅是在多个订阅者之间实现负载均衡的订阅方式,相当于订阅端的负载均衡功能。

例如在 EMQX 集群中,如果某个节点挂了,利用共享订阅功能同时订阅多个节点则可以避免某个节点故障导致数据丢失。

用户可以通过使用 $share/{group}/{topic} 或 $queue/{topic} 格式的主题,发起共享订阅。

第三方脚本

由于 EMQX 开源版中不具备直接写入 PolarDB-X 的能力,因此需要提供第三方的脚本来实现数据写入。

EMQX - 脚本进程之间可以使用 MQTT 共享订阅连接,避免消息量过大时单个 MQTT 连接的处理能力成为方案瓶颈,同时充分使用连接池与批处理技术,提高写入 PolarDB-X 的吞吐。

利用 EMQX 海量设备数据接入能力和数据集成能力,以及 PolarDB-X 的超高并发写入和海量存储支持能力,这一集成方案可以应对连接数规模与采集点数量庞大的物联网场景。

EMQX + PolarDB-X 方案搭建步骤

接下来我们通过几个场景看一下如何进行方案搭建。

场景介绍

存储设备在线状态

在上下线时更新设备状态,并记录上线/下线时间,如果设备是初次连接则先将设备信息插入数据库。

需要使用 EMQX 的客户端上下线事件,对应的事件主题为 $event/client_connected 与 $event/client_disconnected。

PolarDB-X 表结构如下:

-- 设备在线状态表
create table clients (
  `id` bigint(11) auto_increment NOT NULL,
  `clientid` varchar(255) DEFAULT NULL,
  `username` varchar(255) DEFAULT NULL,
  `ip_address` varchar(255) DEFAULT NULL,
  `status` tinyint(1) DEFAULT 0,
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  primary key (`id`),
  unique key(`clientid`)
) engine = InnoDB default charset = utf8 partition by hash(id) partitions 2;

记录设备事件

记录设备上下线历史、订阅/取消订阅事件,需要记录设备客户端 ID、事件发生时间等关键新轩逸,订阅/取消订阅事件还需要记录操作的主题。

EMQX 中对应的事件主题为$event/client_connected 、$event/client_disconnected 、$event/session_subscribed 、$event/session_unsubscribed。

PolarDB-X 表结构如下:

-- 设备历史事件表
create table client_events (
  `id` bigint(11) auto_increment NOT NULL,
  `clientid` varchar(255) DEFAULT NULL,
  `event` char(20) DEFAULT NULL,
  `topic` varchar(255) DEFAULT NULL,
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  primary key (`id`)
) engine = InnoDB default charset = utf8 partition by hash(id) partitions 2;

存储设备消息

将指定主题的消息存储到数据库当中,需要记录设备客户端 ID、消息主题、消息 Payload 信息。

PolarDB-X 表结构如下:

-- IoT 数据记录表
create table messages (
  `id` bigint(11) auto_increment NOT NULL,
  `clientid` varchar(255) DEFAULT NULL,
  `topic` varchar(255) DEFAULT NULL,
  `payload` BLOB DEFAULT NULL,
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  primary key (`id`)
) engine = InnoDB default charset = utf8 partition by hash(id) partitions 2;

操作步骤

具体的操作步骤如下:

1.首先需要部署 PolarDB-X,按照文档中的快速入门指南部署即可。

2.通过 MySQL Client 连接到 PolarDB-X 并使用场景中提供的 SQL 创建数据表。

3.在本地启动 EMQX,启动成功之后,访问 http://localhost:18083 打开 Dashboard。如果需要中文显示可以到 System → Settings 中将语言更改为简体中文。

4.在数据集成中创建规则,创建一条能够处理场景中所需事件以及 t/# 消息主题的规则。

EMQX 的客户端事件通过事件主题触发,客户端会在连接成功时触发执行规则 SQL,规则可以从上下文中获取当前客户端中相关信息,包括 客户端 ID、事件名称、连接时间、连接属性等,通过调试功能可以查看规则执行结果。

规则 SQL:

SELECT
  *
FROM
  "$events/client_connected",
  "$events/client_disconnected",
  "$events/session_subscribed",
  "$events/session_unsubscribed",
  "t/#"

5.添加动作,选择消息重发布动作,将规则处理结果转发至 emqx_polardb 主题由第三方脚本处理。除此之外,可以添加一个控制台输出动作用于调试,在企业版中还可以可以配置将结果直接写入 PolarDB-X。

6.接下来,使用 Dashboard 上的 WebSocket 客户端进行测试,订阅 emqx_polardb 主题确认是否可以从中获取到数据。

7.在第三方脚本中实现 PolarDB-X 数据插入逻辑,关键代码如下:

// 消息处理函数
async function handleMessage(topic, payload) {
  let data = null
  try {
    data = JSON.parse(payload)
  } catch (e) {
    console.log('message not a JSON')
  }
  if (data === null) {
    return
  }
  // 根据事件类型处理
  const event = data.event
  // 上下线记录
  if (event === 'client.connected' || event === 'client.disconnected') {
    const status = event === 'client.connected' ? 1 : 0

    // 更新设备在线状态
    // 借助 ON DUPLICATE KEY UPDATE 特性实现
    await connection.execute(
      `INSERT INTO clients (clientid, username, ip_address, status) VALUES
    ('${data.clientid}', '${data.username}', '${data.sockname}', ${status}) ON DUPLICATE KEY UPDATE status = ${status}, ip_address = '${data.sockname}'`)

    // 保存设备上下线记录
    await connection.execute(`INSERT INTO client_events (clientid, event, topic) VALUES
    (?, ?, '')`, [data.clientid, data.event])

  } else if (event === 'message.publish') {

    // 保存设备消息
    await connection.execute(`INSERT INTO messages(clientid, topic, payload) VALUES
       (?, ?, ?);`, [data.clientid, data.topic, data.payload])
  } else {

    // 保存设备订阅/取消订阅记录,记录操作的主题
    await connection.execute(`INSERT INTO client_events (clientid, event, topic) VALUES
    (?, ?, ?)`, [data.clientid, data.event, data.topic])
  }
  console.log(`event ${event} saved ok`)
}

8.启动第三方脚本,将整个方案运行起来。

$ node worker.js
client 0 ready...
event session.subscribed saved ok
client 1 ready...
event client.connected saved ok
event session.subscribed saved ok
client 2 ready...
event client.connected saved ok
event session.subscribed saved ok
client 3 ready...
event session.subscribed saved ok
event client.connected saved ok
client 4 ready...
event client.connected saved ok
event session.subscribed saved ok
client 5 ready...
event client.connected saved ok
client 6 ready...
event client.connected saved ok
event session.subscribed saved ok
event session.subscribed saved ok
client 7 ready...
event client.connected saved ok
event session.subscribed saved ok
event client.connected saved ok
client 8 ready...
event session.subscribed saved ok
event client.connected saved ok
client 9 ready...
event session.subscribed saved ok

附录:第三方脚本完整示例

此处使用 Node.js 来实现消息的订阅与 PoarDB-X 插入,第三方脚本可以是任意语言编写的应用,建议与 EMQX、PolarDB-X 部署在同一个内网网络。

// worker.js
const mqtt = require('mqtt')
const mysql = require('mysql2')

const config = {
  host: 'localhost',
  port: 10743,
  username: 'polardbx_root',
  password: '****',
  database: 'emqx_iot'
}

// 建立与 PolarDB-X 的连接
const connection = mysql.createConnection({
  host: config.host,
  user: config.username,
  password: config.password,
  database: config.database,
  port: config.port,
})

function createClient() {
  return new Promise((resolve, reject) => {
    const client = mqtt.connect('mqtt://localhost:1883')
    client.on('connect', () => {
      resolve(client)
    })
  })
}

async function handleMessage(topic, payload) {
  let data = null
  try {
    data = JSON.parse(payload)
  } catch (e) {
    console.log('message not a JSON')
  }
  if (data === null) {
    return
  }
  const event = data.event
  // 上下线记录
  if (event === 'client.connected' || event === 'client.disconnected') {
    const status = event === 'client.connected' ? 1 : 0
    // 写入 or 更新设备表
    await connection.execute(
      `INSERT INTO clients (clientid, username, ip_address, status) VALUES
    ('${data.clientid}', '${data.username}', '${data.sockname}', ${status}) ON DUPLICATE KEY UPDATE status = ${status}, ip_address = '${data.sockname}'`)
    // 插入历史表
    await connection.execute(`INSERT INTO client_events (clientid, event, topic) VALUES
    (?, ?, '')`, [data.clientid, data.event])
  } else if (event === 'message.publish') {
    // 插入消息表
    await connection.execute(`INSERT INTO messages(clientid, topic, payload) VALUES
       (?, ?, ?);`, [data.clientid, data.topic, data.payload])
  } else {
    // 订阅/取消订阅记录
    await connection.execute(`INSERT INTO client_events (clientid, event, topic) VALUES
    (?, ?, ?)`, [data.clientid, data.event, data.topic])
  }
  console.log(`event ${event} saved ok`)
}

async function main() {
  // 初始化 10 个订阅客户端
  for (let i = 0; i < 10; i++) {
    const client = await createClient()
    client.subscribe('$share/1/emqx_polardb')
    client.on('message', handleMessage)
    console.log(`client ${i} ready...`)
  }
}

main()

结语

PolarDB-X 的分布式特性以及存储计算分离架构为其带来了水平扩展、分布式事务、混合负载等能力,与同样是分布式的 MQTT 消息服务器 EMQX 结合使用,可以打造真正的可伸缩物联网应用,应对从数千到数千万的设备接入。

除了本文分享的开源版方案, EMQX 企业版以及全托管的云服务版本EMQX cloud中还提供了 MySQL 数据集成能力,可以直接通过 PolarDB-X 兼容语法完成数据集成,更加简单高效地实现物联网数据一站式连接、移动与存储分析。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/build-a-one-stop-iot-data-solution-with-emqx-and-polardb-x

目录
相关文章
|
3月前
|
物联网 数据管理 Apache
拥抱IoT浪潮,Apache IoTDB如何成为你的智能数据守护者?解锁物联网新纪元的数据管理秘籍!
【8月更文挑战第22天】随着物联网技术的发展,数据量激增对数据库提出新挑战。Apache IoTDB凭借其面向时间序列数据的设计,在IoT领域脱颖而出。相较于传统数据库,IoTDB采用树形数据模型高效管理实时数据,具备轻量级结构与高并发能力,并集成Hadoop/Spark支持复杂分析。在智能城市等场景下,IoTDB能处理如交通流量等数据,为决策提供支持。IoTDB还提供InfluxDB协议适配器简化迁移过程,并支持细致的权限管理确保数据安全。综上所述,IoTDB在IoT数据管理中展现出巨大潜力与竞争力。
106 1
|
4天前
|
人工智能 监控 物联网
深度探索人工智能与物联网的融合:构建未来智能生态系统###
在当今这个数据驱动的时代,人工智能(AI)与物联网(IoT)的深度融合正引领着一场前所未有的技术革命。本文旨在深入剖析这一融合背后的技术原理、探讨其在不同领域的应用实例及面临的挑战与机遇,为读者描绘一幅关于未来智能生态系统的宏伟蓝图。通过技术创新的视角,我们不仅揭示了AI与IoT结合的强大潜力,也展望了它们如何共同塑造一个更加高效、可持续且互联的世界。 ###
|
6天前
|
传感器 安全 算法
物联网发布者在数据传输过程中如何防止数据被篡改
在物联网数据传输中,为防止数据被篡改,可采用加密技术、数字签名、数据完整性校验等方法,确保数据的完整性和安全性。
|
6天前
|
存储 安全 算法
物联网发布者在发送数据时如何保证数据的安全性和完整性
数据加密、密钥管理和数据完整性验证是物联网安全的重要组成部分。对称加密(如AES)和非对称加密(如RSA)分别适用于大量数据和高安全需求的场景。密钥需安全存储并定期更新。数据完整性通过MAC(如HMAC-SHA256)和数字签名(如RSA签名)验证。通信协议如MQTT over TLS/SSL和CoAP over DTLS增强传输安全,确保数据在传输过程中的机密性和完整性。
|
13天前
|
传感器 监控 安全
创新无限:物联网技术在智慧城市构建中的前沿探索
【10月更文挑战第29天】在这个信息爆炸的时代,物联网(IoT)技术正重塑我们对城市的认知。智慧城市已从科幻走向现实,物联网通过连接各种设备和传感器,收集、分析数据,提升城市运行效率和居民生活质量。从智慧城管、智能交通、智慧水务到智能电网,物联网的应用正逐步实现城市的智能化、互联化和可持续发展。
33 1
|
1月前
|
存储 安全 物联网
|
2月前
|
传感器 搜索推荐 物联网
5G与物联网:构建万物互联的未来世界
【9月更文挑战第11天】5G与物联网的融合正引领我们进入一个万物互联的未来世界。在这个世界中,各种设备将通过网络紧密相连,实现数据的实时传输和处理。这不仅将极大地方便人们的生活和工作,还将推动社会向智能化、数字化迈进。我们有理由相信,在不久的将来,一个更加智能、便捷、高效的世界将呈现在我们面前。
|
2月前
|
存储 物联网 关系型数据库
PolarDB在物联网(IoT)数据存储中的应用探索
【9月更文挑战第6天】随着物联网技术的发展,海量设备数据对实时存储和处理提出了更高要求。传统数据库在扩展性、性能及实时性方面面临挑战。阿里云推出的PolarDB具备高性能、高可靠及高扩展性特点,能有效应对这些挑战。它采用分布式存储架构,支持多副本写入优化、并行查询等技术,确保数据实时写入与查询;多副本存储架构和数据持久化存储机制保证了数据安全;支持动态调整数据库规模,适应设备和数据增长。通过API或SDK接入IoT设备,实现数据实时写入、分布式存储与高效查询,展现出在IoT数据存储领域的巨大潜力。
66 1
|
2月前
|
传感器 自动驾驶 物联网
5G+物联网:构建智慧城市的新基石
【9月更文挑战第4天】5G+物联网作为构建智慧城市的新基石,正引领着城市管理的智能化变革。随着技术的不断成熟和应用场景的不断拓展,智慧城市将变得更加智能、高效和可持续。然而,我们也应看到,智慧城市的建设还面临着数据安全、隐私保护等挑战。未来,我们需要加强技术创新和法规建设,确保智慧城市在快速发展的同时,也能够保障人民的安全和权益。
|
3月前
|
存储 SQL Cloud Native
揭秘!PolarDB-X存储引擎如何玩转“时间魔术”?Lizard多级闪回技术让你秒回数据“黄金时代”!
【8月更文挑战第25天】PolarDB-X是一款由阿里巴巴自主研发的云原生分布式数据库,以其高性能、高可用性和出色的可扩展性著称。其核心竞争力之一是Lizard存储引擎的多级闪回技术,能够提供高效的数据恢复与问题诊断能力。本文通过一个电商公司的案例展示了一级与二级闪回技术如何帮助快速恢复误删的大量订单数据,确保业务连续性不受影响。一级闪回通过维护最近时间段内历史数据版本链,支持任意时间点查询;而二级闪回则通过扩展数据保留时间并采用成本更低的存储方式,进一步增强了数据保护能力。多级闪回技术的应用显著提高了数据库的可靠性和灵活性,为企业数据安全保驾护航。
44 1

相关产品

  • 物联网平台