MQX + PolarDB-X 构建一站式物联网数据解决方案

简介: 本文详细介绍了如何使用开源分布式物联网MQTT消息服务器EMQX与云原生分布式数据库PolarDB-X打造集成方案,以实现关键物联网数据的一站式采集、传输、存储。

前言

虽然物联网的应用场景很多,但总结来看各类场景都离不开对数据的采集-传输-存储-分析。按照数据特性和业务需求不同,物联网数据可以分为不同的种类:

  • 元数据:设备最新的状态数据,如在线状态、当前传感器数值;
  • 消息数据:设备发布的消息,包括上报数据和下发指令;
  • 时序数据:持续变化的元数据和消息数据。

在物联网应用中,数据存储需求无处不在,数据只有经过传输、存储,再经由业务系统查询并处理才能实现各类业务需求,进一步发挥数据的价值。

物联网设备规模庞大,往往整体数据量也非常大,实践中通常有以下选型:

  • 元数据需要频繁插入更新,并且支持结构化查询,所以推荐使用关系数据库进行存储。
  • 消息数据不需要全量存储,只需要记录关键操作,或者提取消息数据的关键数据,应当根据业务情况适当的选型。
  • 时序数据的特点是数据修改频次低,对写入速度和存储压缩比敏感,查询需求多样,因此推荐使用时序数据库。

数据库存储选型不是绝对的,用户可以综合数据特性、数据量以及后端业务需求来选择适合的数据库,在各种因素中达到平衡。

EMQX 是由 EMQ 开发的一款大规模分布式物联网 MQTT 消息服务器。作为专门针对低带宽和不稳定网络环境的物联网应用设计的协议,MQTT 基于发布/订阅模式,具有简单易实现、支持 QoS、报文小等特点。完整支持 MQTT 协议的 EMQX 则可以连接海量物联网设备,提供高可靠、高性能的实时数据移动、处理和集成,充当设备与设备、设备与应用之间的桥梁。

PolarDB-X 是由阿里巴巴自主研发的云原生分布式数据库,是一款基于云架构理念,并同时支持在线事务处理与在线分析处理 (Hybrid Transactional and Analytical Processing, HTAP)的融合型分布式数据库。其具备金融级数据高可用、分布式水平扩展、混合负载、低成本存储和极致弹性等能力,专注解决海量数据存储、超高并发吞吐、大表瓶颈以及复杂计算效率等数据库瓶颈难题。

本文将介绍开源版 EMQX 与 PolarDB-X 打造的集成方案,可以实现关键物联网数据的一站式采集、传输、存储。

EMQX+PolarDB-X 集成方案详情

使用 EMQX 接入物联网设备,通过 EMQX 数据集成组件来处理并分发数据到 MQTT 汇聚主题,第三方脚本通过 MQTT 订阅的方式从汇聚主题中获取数据,并代理写入到 PolarDB-X,以下是整体架构图:

数据集成简介

数据集成是 EMQX 在发布订阅模型的基础之上的数据处理与分发组件,通过简单的可视化的配置,即可实时处理 EMQX 的消息以及设备事件并将其与 Kafka/RabbitMQ 等消息中间件、以及各类 SQL/NoSQL/时序数据库等数据系统集成。

其中消息是指设备端的上报或者云端的下发消息,设备生命周期事件是指整个设备在运行过程中的事件,这些事件对物联网的应用开发有很大作用,围绕此可以实现设备管理、安全审计、设备影子等业务,以及更精细化的控制 MQTT 消息传输过程。

共享订阅简介

MQTT 共享订阅是在多个订阅者之间实现负载均衡的订阅方式,相当于订阅端的负载均衡功能。

例如在 EMQX 集群中,如果某个节点挂了,利用共享订阅功能同时订阅多个节点则可以避免某个节点故障导致数据丢失。

用户可以通过使用 $share/{group}/{topic} 或 $queue/{topic} 格式的主题,发起共享订阅。

第三方脚本

由于 EMQX 开源版中不具备直接写入 PolarDB-X 的能力,因此需要提供第三方的脚本来实现数据写入。

EMQX - 脚本进程之间可以使用 MQTT 共享订阅连接,避免消息量过大时单个 MQTT 连接的处理能力成为方案瓶颈,同时充分使用连接池与批处理技术,提高写入 PolarDB-X 的吞吐。

利用 EMQX 海量设备数据接入能力和数据集成能力,以及 PolarDB-X 的超高并发写入和海量存储支持能力,这一集成方案可以应对连接数规模与采集点数量庞大的物联网场景。

EMQX + PolarDB-X 方案搭建步骤

接下来我们通过几个场景看一下如何进行方案搭建。

场景介绍

存储设备在线状态

在上下线时更新设备状态,并记录上线/下线时间,如果设备是初次连接则先将设备信息插入数据库。

需要使用 EMQX 的客户端上下线事件,对应的事件主题为 $event/client_connected 与 $event/client_disconnected。

PolarDB-X 表结构如下:

-- 设备在线状态表
create table clients (
  `id` bigint(11) auto_increment NOT NULL,
  `clientid` varchar(255) DEFAULT NULL,
  `username` varchar(255) DEFAULT NULL,
  `ip_address` varchar(255) DEFAULT NULL,
  `status` tinyint(1) DEFAULT 0,
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  primary key (`id`),
  unique key(`clientid`)
) engine = InnoDB default charset = utf8 partition by hash(id) partitions 2;

记录设备事件

记录设备上下线历史、订阅/取消订阅事件,需要记录设备客户端 ID、事件发生时间等关键新轩逸,订阅/取消订阅事件还需要记录操作的主题。

EMQX 中对应的事件主题为$event/client_connected 、$event/client_disconnected 、$event/session_subscribed 、$event/session_unsubscribed。

PolarDB-X 表结构如下:

-- 设备历史事件表
create table client_events (
  `id` bigint(11) auto_increment NOT NULL,
  `clientid` varchar(255) DEFAULT NULL,
  `event` char(20) DEFAULT NULL,
  `topic` varchar(255) DEFAULT NULL,
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  primary key (`id`)
) engine = InnoDB default charset = utf8 partition by hash(id) partitions 2;

存储设备消息

将指定主题的消息存储到数据库当中,需要记录设备客户端 ID、消息主题、消息 Payload 信息。

PolarDB-X 表结构如下:

-- IoT 数据记录表
create table messages (
  `id` bigint(11) auto_increment NOT NULL,
  `clientid` varchar(255) DEFAULT NULL,
  `topic` varchar(255) DEFAULT NULL,
  `payload` BLOB DEFAULT NULL,
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  primary key (`id`)
) engine = InnoDB default charset = utf8 partition by hash(id) partitions 2;

操作步骤

具体的操作步骤如下:

1.首先需要部署 PolarDB-X,按照文档中的快速入门指南部署即可。

2.通过 MySQL Client 连接到 PolarDB-X 并使用场景中提供的 SQL 创建数据表。

3.在本地启动 EMQX,启动成功之后,访问 http://localhost:18083 打开 Dashboard。如果需要中文显示可以到 System → Settings 中将语言更改为简体中文。

4.在数据集成中创建规则,创建一条能够处理场景中所需事件以及 t/# 消息主题的规则。

EMQX 的客户端事件通过事件主题触发,客户端会在连接成功时触发执行规则 SQL,规则可以从上下文中获取当前客户端中相关信息,包括 客户端 ID、事件名称、连接时间、连接属性等,通过调试功能可以查看规则执行结果。

规则 SQL:

SELECT
  *
FROM
  "$events/client_connected",
  "$events/client_disconnected",
  "$events/session_subscribed",
  "$events/session_unsubscribed",
  "t/#"

5.添加动作,选择消息重发布动作,将规则处理结果转发至 emqx_polardb 主题由第三方脚本处理。除此之外,可以添加一个控制台输出动作用于调试,在企业版中还可以可以配置将结果直接写入 PolarDB-X。

6.接下来,使用 Dashboard 上的 WebSocket 客户端进行测试,订阅 emqx_polardb 主题确认是否可以从中获取到数据。

7.在第三方脚本中实现 PolarDB-X 数据插入逻辑,关键代码如下:

// 消息处理函数
async function handleMessage(topic, payload) {
  let data = null
  try {
    data = JSON.parse(payload)
  } catch (e) {
    console.log('message not a JSON')
  }
  if (data === null) {
    return
  }
  // 根据事件类型处理
  const event = data.event
  // 上下线记录
  if (event === 'client.connected' || event === 'client.disconnected') {
    const status = event === 'client.connected' ? 1 : 0

    // 更新设备在线状态
    // 借助 ON DUPLICATE KEY UPDATE 特性实现
    await connection.execute(
      `INSERT INTO clients (clientid, username, ip_address, status) VALUES
    ('${data.clientid}', '${data.username}', '${data.sockname}', ${status}) ON DUPLICATE KEY UPDATE status = ${status}, ip_address = '${data.sockname}'`)

    // 保存设备上下线记录
    await connection.execute(`INSERT INTO client_events (clientid, event, topic) VALUES
    (?, ?, '')`, [data.clientid, data.event])

  } else if (event === 'message.publish') {

    // 保存设备消息
    await connection.execute(`INSERT INTO messages(clientid, topic, payload) VALUES
       (?, ?, ?);`, [data.clientid, data.topic, data.payload])
  } else {

    // 保存设备订阅/取消订阅记录,记录操作的主题
    await connection.execute(`INSERT INTO client_events (clientid, event, topic) VALUES
    (?, ?, ?)`, [data.clientid, data.event, data.topic])
  }
  console.log(`event ${event} saved ok`)
}

8.启动第三方脚本,将整个方案运行起来。

$ node worker.js
client 0 ready...
event session.subscribed saved ok
client 1 ready...
event client.connected saved ok
event session.subscribed saved ok
client 2 ready...
event client.connected saved ok
event session.subscribed saved ok
client 3 ready...
event session.subscribed saved ok
event client.connected saved ok
client 4 ready...
event client.connected saved ok
event session.subscribed saved ok
client 5 ready...
event client.connected saved ok
client 6 ready...
event client.connected saved ok
event session.subscribed saved ok
event session.subscribed saved ok
client 7 ready...
event client.connected saved ok
event session.subscribed saved ok
event client.connected saved ok
client 8 ready...
event session.subscribed saved ok
event client.connected saved ok
client 9 ready...
event session.subscribed saved ok

附录:第三方脚本完整示例

此处使用 Node.js 来实现消息的订阅与 PoarDB-X 插入,第三方脚本可以是任意语言编写的应用,建议与 EMQX、PolarDB-X 部署在同一个内网网络。

// worker.js
const mqtt = require('mqtt')
const mysql = require('mysql2')

const config = {
  host: 'localhost',
  port: 10743,
  username: 'polardbx_root',
  password: '****',
  database: 'emqx_iot'
}

// 建立与 PolarDB-X 的连接
const connection = mysql.createConnection({
  host: config.host,
  user: config.username,
  password: config.password,
  database: config.database,
  port: config.port,
})

function createClient() {
  return new Promise((resolve, reject) => {
    const client = mqtt.connect('mqtt://localhost:1883')
    client.on('connect', () => {
      resolve(client)
    })
  })
}

async function handleMessage(topic, payload) {
  let data = null
  try {
    data = JSON.parse(payload)
  } catch (e) {
    console.log('message not a JSON')
  }
  if (data === null) {
    return
  }
  const event = data.event
  // 上下线记录
  if (event === 'client.connected' || event === 'client.disconnected') {
    const status = event === 'client.connected' ? 1 : 0
    // 写入 or 更新设备表
    await connection.execute(
      `INSERT INTO clients (clientid, username, ip_address, status) VALUES
    ('${data.clientid}', '${data.username}', '${data.sockname}', ${status}) ON DUPLICATE KEY UPDATE status = ${status}, ip_address = '${data.sockname}'`)
    // 插入历史表
    await connection.execute(`INSERT INTO client_events (clientid, event, topic) VALUES
    (?, ?, '')`, [data.clientid, data.event])
  } else if (event === 'message.publish') {
    // 插入消息表
    await connection.execute(`INSERT INTO messages(clientid, topic, payload) VALUES
       (?, ?, ?);`, [data.clientid, data.topic, data.payload])
  } else {
    // 订阅/取消订阅记录
    await connection.execute(`INSERT INTO client_events (clientid, event, topic) VALUES
    (?, ?, ?)`, [data.clientid, data.event, data.topic])
  }
  console.log(`event ${event} saved ok`)
}

async function main() {
  // 初始化 10 个订阅客户端
  for (let i = 0; i < 10; i++) {
    const client = await createClient()
    client.subscribe('$share/1/emqx_polardb')
    client.on('message', handleMessage)
    console.log(`client ${i} ready...`)
  }
}

main()

结语

PolarDB-X 的分布式特性以及存储计算分离架构为其带来了水平扩展、分布式事务、混合负载等能力,与同样是分布式的 MQTT 消息服务器 EMQX 结合使用,可以打造真正的可伸缩物联网应用,应对从数千到数千万的设备接入。

除了本文分享的开源版方案, EMQX 企业版以及全托管的云服务版本EMQX cloud中还提供了 MySQL 数据集成能力,可以直接通过 PolarDB-X 兼容语法完成数据集成,更加简单高效地实现物联网数据一站式连接、移动与存储分析。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/build-a-one-stop-iot-data-solution-with-emqx-and-polardb-x

目录
相关文章
|
1月前
|
人工智能 监控 物联网
深度探索人工智能与物联网的融合:构建未来智能生态系统###
在当今这个数据驱动的时代,人工智能(AI)与物联网(IoT)的深度融合正引领着一场前所未有的技术革命。本文旨在深入剖析这一融合背后的技术原理、探讨其在不同领域的应用实例及面临的挑战与机遇,为读者描绘一幅关于未来智能生态系统的宏伟蓝图。通过技术创新的视角,我们不仅揭示了AI与IoT结合的强大潜力,也展望了它们如何共同塑造一个更加高效、可持续且互联的世界。 ###
|
1月前
|
传感器 安全 算法
物联网发布者在数据传输过程中如何防止数据被篡改
在物联网数据传输中,为防止数据被篡改,可采用加密技术、数字签名、数据完整性校验等方法,确保数据的完整性和安全性。
|
1月前
|
存储 安全 算法
物联网发布者在发送数据时如何保证数据的安全性和完整性
数据加密、密钥管理和数据完整性验证是物联网安全的重要组成部分。对称加密(如AES)和非对称加密(如RSA)分别适用于大量数据和高安全需求的场景。密钥需安全存储并定期更新。数据完整性通过MAC(如HMAC-SHA256)和数字签名(如RSA签名)验证。通信协议如MQTT over TLS/SSL和CoAP over DTLS增强传输安全,确保数据在传输过程中的机密性和完整性。
|
1月前
|
传感器 监控 安全
创新无限:物联网技术在智慧城市构建中的前沿探索
【10月更文挑战第29天】在这个信息爆炸的时代,物联网(IoT)技术正重塑我们对城市的认知。智慧城市已从科幻走向现实,物联网通过连接各种设备和传感器,收集、分析数据,提升城市运行效率和居民生活质量。从智慧城管、智能交通、智慧水务到智能电网,物联网的应用正逐步实现城市的智能化、互联化和可持续发展。
83 1
|
2月前
|
传感器 机器学习/深度学习 存储
物联网设备精细化管理系统解决方案
随着科技的进步,物联网技术作为新一代信息技术的核心部分,正在深刻改变各行业的生产和管理方式。其在资产管理、智慧城市、能源管理和智慧医疗等多个领域的广泛应用,不仅提高了运营效率,还促进了资源优化配置和精细化管理。本文详细介绍了物联网的基础概念及其在设备精细化管理系统中的具体应用方案,展示了如何通过智能感知层建设、数据处理分析平台以及精细化管理应用,实现设备的实时监控、预测性维护和能耗管理等功能,从而帮助企业提升竞争力,降低成本,并推动社会向更智能化、绿色化的方向发展。
99 2
物联网设备精细化管理系统解决方案
|
2月前
|
存储 监控 物联网
医疗物联网设备精细化管理系统解决方案
华汇数据智慧医院物联网管理系统解决方案是一种集物联网、云计算、大数据和人工智能等先进技术于一体的综合性解决方案,旨在提升医院的运营效率、医疗质量和患者满意度。
84 3
|
2月前
|
存储 边缘计算 物联网
阿里云物联网平台:推动万物互联的智能化解决方案
随着物联网技术的快速发展,阿里云物联网平台为企业提供了一体化的解决方案,包括设备接入、数据管理和智能应用等核心功能。平台支持海量设备接入、实时数据采集与存储、边缘计算,并具备大规模设备管理、高安全性和开放生态等优势。广泛应用于智能制造、智慧城市和智能家居等领域,助力企业实现数字化转型。
257 5
|
3月前
|
存储 安全 物联网
.NET 跨平台工业物联网网关解决方案
【9月更文挑战第28天】本文介绍了利用 .NET 构建跨平台工业物联网网关的解决方案。通过 .NET Core 和多种通信协议(如 MQTT 和 Modbus),实现工业设备的高效接入和数据采集。系统架构包括设备接入层、数据处理层、通信层、应用层和数据库层,确保数据的准确采集、实时处理和安全传输。此外,还详细阐述了设备身份认证、数据加密及安全审计等机制,确保系统的安全性。该方案适用于不同操作系统和工业环境,具备高度灵活性和扩展性。
|
2月前
|
存储 安全 物联网
|
1月前
|
存储 安全 物联网
政府在推动物联网技术标准和规范的统一方面可以发挥哪些作用?
政府在推动物联网技术标准和规范的统一方面可以发挥哪些作用?
102 50

热门文章

最新文章

相关产品

  • 物联网平台