MQX + PolarDB-X 构建一站式物联网数据解决方案

简介: 本文详细介绍了如何使用开源分布式物联网MQTT消息服务器EMQX与云原生分布式数据库PolarDB-X打造集成方案,以实现关键物联网数据的一站式采集、传输、存储。

前言

虽然物联网的应用场景很多,但总结来看各类场景都离不开对数据的采集-传输-存储-分析。按照数据特性和业务需求不同,物联网数据可以分为不同的种类:

  • 元数据:设备最新的状态数据,如在线状态、当前传感器数值;
  • 消息数据:设备发布的消息,包括上报数据和下发指令;
  • 时序数据:持续变化的元数据和消息数据。

在物联网应用中,数据存储需求无处不在,数据只有经过传输、存储,再经由业务系统查询并处理才能实现各类业务需求,进一步发挥数据的价值。

物联网设备规模庞大,往往整体数据量也非常大,实践中通常有以下选型:

  • 元数据需要频繁插入更新,并且支持结构化查询,所以推荐使用关系数据库进行存储。
  • 消息数据不需要全量存储,只需要记录关键操作,或者提取消息数据的关键数据,应当根据业务情况适当的选型。
  • 时序数据的特点是数据修改频次低,对写入速度和存储压缩比敏感,查询需求多样,因此推荐使用时序数据库。

数据库存储选型不是绝对的,用户可以综合数据特性、数据量以及后端业务需求来选择适合的数据库,在各种因素中达到平衡。

EMQX 是由 EMQ 开发的一款大规模分布式物联网 MQTT 消息服务器。作为专门针对低带宽和不稳定网络环境的物联网应用设计的协议,MQTT 基于发布/订阅模式,具有简单易实现、支持 QoS、报文小等特点。完整支持 MQTT 协议的 EMQX 则可以连接海量物联网设备,提供高可靠、高性能的实时数据移动、处理和集成,充当设备与设备、设备与应用之间的桥梁。

PolarDB-X 是由阿里巴巴自主研发的云原生分布式数据库,是一款基于云架构理念,并同时支持在线事务处理与在线分析处理 (Hybrid Transactional and Analytical Processing, HTAP)的融合型分布式数据库。其具备金融级数据高可用、分布式水平扩展、混合负载、低成本存储和极致弹性等能力,专注解决海量数据存储、超高并发吞吐、大表瓶颈以及复杂计算效率等数据库瓶颈难题。

本文将介绍开源版 EMQX 与 PolarDB-X 打造的集成方案,可以实现关键物联网数据的一站式采集、传输、存储。

EMQX+PolarDB-X 集成方案详情

使用 EMQX 接入物联网设备,通过 EMQX 数据集成组件来处理并分发数据到 MQTT 汇聚主题,第三方脚本通过 MQTT 订阅的方式从汇聚主题中获取数据,并代理写入到 PolarDB-X,以下是整体架构图:

数据集成简介

数据集成是 EMQX 在发布订阅模型的基础之上的数据处理与分发组件,通过简单的可视化的配置,即可实时处理 EMQX 的消息以及设备事件并将其与 Kafka/RabbitMQ 等消息中间件、以及各类 SQL/NoSQL/时序数据库等数据系统集成。

其中消息是指设备端的上报或者云端的下发消息,设备生命周期事件是指整个设备在运行过程中的事件,这些事件对物联网的应用开发有很大作用,围绕此可以实现设备管理、安全审计、设备影子等业务,以及更精细化的控制 MQTT 消息传输过程。

共享订阅简介

MQTT 共享订阅是在多个订阅者之间实现负载均衡的订阅方式,相当于订阅端的负载均衡功能。

例如在 EMQX 集群中,如果某个节点挂了,利用共享订阅功能同时订阅多个节点则可以避免某个节点故障导致数据丢失。

用户可以通过使用 $share/{group}/{topic} 或 $queue/{topic} 格式的主题,发起共享订阅。

第三方脚本

由于 EMQX 开源版中不具备直接写入 PolarDB-X 的能力,因此需要提供第三方的脚本来实现数据写入。

EMQX - 脚本进程之间可以使用 MQTT 共享订阅连接,避免消息量过大时单个 MQTT 连接的处理能力成为方案瓶颈,同时充分使用连接池与批处理技术,提高写入 PolarDB-X 的吞吐。

利用 EMQX 海量设备数据接入能力和数据集成能力,以及 PolarDB-X 的超高并发写入和海量存储支持能力,这一集成方案可以应对连接数规模与采集点数量庞大的物联网场景。

EMQX + PolarDB-X 方案搭建步骤

接下来我们通过几个场景看一下如何进行方案搭建。

场景介绍

存储设备在线状态

在上下线时更新设备状态,并记录上线/下线时间,如果设备是初次连接则先将设备信息插入数据库。

需要使用 EMQX 的客户端上下线事件,对应的事件主题为 $event/client_connected 与 $event/client_disconnected。

PolarDB-X 表结构如下:

-- 设备在线状态表
create table clients (
  `id` bigint(11) auto_increment NOT NULL,
  `clientid` varchar(255) DEFAULT NULL,
  `username` varchar(255) DEFAULT NULL,
  `ip_address` varchar(255) DEFAULT NULL,
  `status` tinyint(1) DEFAULT 0,
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  primary key (`id`),
  unique key(`clientid`)
) engine = InnoDB default charset = utf8 partition by hash(id) partitions 2;

记录设备事件

记录设备上下线历史、订阅/取消订阅事件,需要记录设备客户端 ID、事件发生时间等关键新轩逸,订阅/取消订阅事件还需要记录操作的主题。

EMQX 中对应的事件主题为$event/client_connected 、$event/client_disconnected 、$event/session_subscribed 、$event/session_unsubscribed。

PolarDB-X 表结构如下:

-- 设备历史事件表
create table client_events (
  `id` bigint(11) auto_increment NOT NULL,
  `clientid` varchar(255) DEFAULT NULL,
  `event` char(20) DEFAULT NULL,
  `topic` varchar(255) DEFAULT NULL,
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  primary key (`id`)
) engine = InnoDB default charset = utf8 partition by hash(id) partitions 2;

存储设备消息

将指定主题的消息存储到数据库当中,需要记录设备客户端 ID、消息主题、消息 Payload 信息。

PolarDB-X 表结构如下:

-- IoT 数据记录表
create table messages (
  `id` bigint(11) auto_increment NOT NULL,
  `clientid` varchar(255) DEFAULT NULL,
  `topic` varchar(255) DEFAULT NULL,
  `payload` BLOB DEFAULT NULL,
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  primary key (`id`)
) engine = InnoDB default charset = utf8 partition by hash(id) partitions 2;

操作步骤

具体的操作步骤如下:

1.首先需要部署 PolarDB-X,按照文档中的快速入门指南部署即可。

2.通过 MySQL Client 连接到 PolarDB-X 并使用场景中提供的 SQL 创建数据表。

3.在本地启动 EMQX,启动成功之后,访问 http://localhost:18083 打开 Dashboard。如果需要中文显示可以到 System → Settings 中将语言更改为简体中文。

4.在数据集成中创建规则,创建一条能够处理场景中所需事件以及 t/# 消息主题的规则。

EMQX 的客户端事件通过事件主题触发,客户端会在连接成功时触发执行规则 SQL,规则可以从上下文中获取当前客户端中相关信息,包括 客户端 ID、事件名称、连接时间、连接属性等,通过调试功能可以查看规则执行结果。

规则 SQL:

SELECT
  *
FROM
  "$events/client_connected",
  "$events/client_disconnected",
  "$events/session_subscribed",
  "$events/session_unsubscribed",
  "t/#"

5.添加动作,选择消息重发布动作,将规则处理结果转发至 emqx_polardb 主题由第三方脚本处理。除此之外,可以添加一个控制台输出动作用于调试,在企业版中还可以可以配置将结果直接写入 PolarDB-X。

6.接下来,使用 Dashboard 上的 WebSocket 客户端进行测试,订阅 emqx_polardb 主题确认是否可以从中获取到数据。

7.在第三方脚本中实现 PolarDB-X 数据插入逻辑,关键代码如下:

// 消息处理函数
async function handleMessage(topic, payload) {
  let data = null
  try {
    data = JSON.parse(payload)
  } catch (e) {
    console.log('message not a JSON')
  }
  if (data === null) {
    return
  }
  // 根据事件类型处理
  const event = data.event
  // 上下线记录
  if (event === 'client.connected' || event === 'client.disconnected') {
    const status = event === 'client.connected' ? 1 : 0

    // 更新设备在线状态
    // 借助 ON DUPLICATE KEY UPDATE 特性实现
    await connection.execute(
      `INSERT INTO clients (clientid, username, ip_address, status) VALUES
    ('${data.clientid}', '${data.username}', '${data.sockname}', ${status}) ON DUPLICATE KEY UPDATE status = ${status}, ip_address = '${data.sockname}'`)

    // 保存设备上下线记录
    await connection.execute(`INSERT INTO client_events (clientid, event, topic) VALUES
    (?, ?, '')`, [data.clientid, data.event])

  } else if (event === 'message.publish') {

    // 保存设备消息
    await connection.execute(`INSERT INTO messages(clientid, topic, payload) VALUES
       (?, ?, ?);`, [data.clientid, data.topic, data.payload])
  } else {

    // 保存设备订阅/取消订阅记录,记录操作的主题
    await connection.execute(`INSERT INTO client_events (clientid, event, topic) VALUES
    (?, ?, ?)`, [data.clientid, data.event, data.topic])
  }
  console.log(`event ${event} saved ok`)
}

8.启动第三方脚本,将整个方案运行起来。

$ node worker.js
client 0 ready...
event session.subscribed saved ok
client 1 ready...
event client.connected saved ok
event session.subscribed saved ok
client 2 ready...
event client.connected saved ok
event session.subscribed saved ok
client 3 ready...
event session.subscribed saved ok
event client.connected saved ok
client 4 ready...
event client.connected saved ok
event session.subscribed saved ok
client 5 ready...
event client.connected saved ok
client 6 ready...
event client.connected saved ok
event session.subscribed saved ok
event session.subscribed saved ok
client 7 ready...
event client.connected saved ok
event session.subscribed saved ok
event client.connected saved ok
client 8 ready...
event session.subscribed saved ok
event client.connected saved ok
client 9 ready...
event session.subscribed saved ok

附录:第三方脚本完整示例

此处使用 Node.js 来实现消息的订阅与 PoarDB-X 插入,第三方脚本可以是任意语言编写的应用,建议与 EMQX、PolarDB-X 部署在同一个内网网络。

// worker.js
const mqtt = require('mqtt')
const mysql = require('mysql2')

const config = {
  host: 'localhost',
  port: 10743,
  username: 'polardbx_root',
  password: '****',
  database: 'emqx_iot'
}

// 建立与 PolarDB-X 的连接
const connection = mysql.createConnection({
  host: config.host,
  user: config.username,
  password: config.password,
  database: config.database,
  port: config.port,
})

function createClient() {
  return new Promise((resolve, reject) => {
    const client = mqtt.connect('mqtt://localhost:1883')
    client.on('connect', () => {
      resolve(client)
    })
  })
}

async function handleMessage(topic, payload) {
  let data = null
  try {
    data = JSON.parse(payload)
  } catch (e) {
    console.log('message not a JSON')
  }
  if (data === null) {
    return
  }
  const event = data.event
  // 上下线记录
  if (event === 'client.connected' || event === 'client.disconnected') {
    const status = event === 'client.connected' ? 1 : 0
    // 写入 or 更新设备表
    await connection.execute(
      `INSERT INTO clients (clientid, username, ip_address, status) VALUES
    ('${data.clientid}', '${data.username}', '${data.sockname}', ${status}) ON DUPLICATE KEY UPDATE status = ${status}, ip_address = '${data.sockname}'`)
    // 插入历史表
    await connection.execute(`INSERT INTO client_events (clientid, event, topic) VALUES
    (?, ?, '')`, [data.clientid, data.event])
  } else if (event === 'message.publish') {
    // 插入消息表
    await connection.execute(`INSERT INTO messages(clientid, topic, payload) VALUES
       (?, ?, ?);`, [data.clientid, data.topic, data.payload])
  } else {
    // 订阅/取消订阅记录
    await connection.execute(`INSERT INTO client_events (clientid, event, topic) VALUES
    (?, ?, ?)`, [data.clientid, data.event, data.topic])
  }
  console.log(`event ${event} saved ok`)
}

async function main() {
  // 初始化 10 个订阅客户端
  for (let i = 0; i < 10; i++) {
    const client = await createClient()
    client.subscribe('$share/1/emqx_polardb')
    client.on('message', handleMessage)
    console.log(`client ${i} ready...`)
  }
}

main()

结语

PolarDB-X 的分布式特性以及存储计算分离架构为其带来了水平扩展、分布式事务、混合负载等能力,与同样是分布式的 MQTT 消息服务器 EMQX 结合使用,可以打造真正的可伸缩物联网应用,应对从数千到数千万的设备接入。

除了本文分享的开源版方案, EMQX 企业版以及全托管的云服务版本EMQX cloud中还提供了 MySQL 数据集成能力,可以直接通过 PolarDB-X 兼容语法完成数据集成,更加简单高效地实现物联网数据一站式连接、移动与存储分析。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/build-a-one-stop-iot-data-solution-with-emqx-and-polardb-x

目录
相关文章
|
3月前
|
存储 人工智能 Cloud Native
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
在9月20日2024云栖大会上,阿里云智能集团副总裁,数据库产品事业部负责人,ACM、CCF、IEEE会士(Fellow)李飞飞发表《从数据到智能:Data+AI驱动的云原生数据库》主题演讲。他表示,数据是生成式AI的核心资产,大模型时代的数据管理系统需具备多模处理和实时分析能力。阿里云瑶池将数据+AI全面融合,构建一站式多模数据管理平台,以数据驱动决策与创新,为用户提供像“搭积木”一样易用、好用、高可用的使用体验。
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
|
3月前
|
人工智能 关系型数据库 分布式数据库
拥抱Data+AI|“全球第一”雅迪如何实现智能营销?DMS+PolarDB注入数据新活力
针对雅迪“云销通App”的需求与痛点,本文将介绍阿里云瑶池数据库DMS+PolarDB for AI提供的一站式Data+AI解决方案,助力销售人员高效用数,全面提升销售管理效率。
|
7天前
|
SQL 关系型数据库 分布式数据库
PolarDB 开源基础教程系列 7.1 快速构建“海量逼真”数据
本文介绍了如何使用PostgreSQL和PolarDB快速生成“海量且逼真”的测试数据,以满足不同业务场景的需求。传统数据库测试依赖标准套件(如TPC-C、TPC-H),难以生成符合特定业务特征的复杂数据。通过自定义函数(如`gen_random_int`、`gen_random_string`等)、SRF函数(如`generate_series`)和pgbench工具,可以高效生成大规模、高仿真度的数据,并进行压力测试。文中还提供了多个示例代码展示.
25 7
|
1月前
|
存储 关系型数据库 分布式数据库
PolarDB PG 版冷热数据分层功能介绍
本文介绍了云原生数据库PolarDB PG版的冷热数据分层存储功能,涵盖其原理、特性及最佳实践。冷热分层存储通过将冷数据归档至OSS(对象存储服务),实现低成本高效存储,同时保持SQL操作透明性和性能优化。支持多种分层模式,如表与索引分层、大字段独立归档等,并提供压缩和缓存机制以提升访问速度。此外,还介绍了如何通过DDL语句轻松转存数据至OSS,以及一系列最佳实践,包括自动冷热分层、无锁表转存和一键转存等功能。
133 36
|
4月前
|
存储 人工智能 Cloud Native
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
阿里云瑶池在2024云栖大会上重磅发布由Data+AI驱动的多模数据管理平台DMS:OneMeta+OneOps,通过统一、开放、多模的元数据服务实现跨环境、跨引擎、跨实例的统一治理,可支持高达40+种数据源,实现自建、他云数据源的无缝对接,助力业务决策效率提升10倍。
|
5月前
|
存储 人工智能 Cloud Native
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
阿里云数据库重磅升级!元数据服务OneMeta + OneOps统一管理多模态数据
|
5月前
|
存储 物联网 关系型数据库
PolarDB在物联网(IoT)数据存储中的应用探索
【9月更文挑战第6天】随着物联网技术的发展,海量设备数据对实时存储和处理提出了更高要求。传统数据库在扩展性、性能及实时性方面面临挑战。阿里云推出的PolarDB具备高性能、高可靠及高扩展性特点,能有效应对这些挑战。它采用分布式存储架构,支持多副本写入优化、并行查询等技术,确保数据实时写入与查询;多副本存储架构和数据持久化存储机制保证了数据安全;支持动态调整数据库规模,适应设备和数据增长。通过API或SDK接入IoT设备,实现数据实时写入、分布式存储与高效查询,展现出在IoT数据存储领域的巨大潜力。
121 1
|
6月前
|
存储 SQL Cloud Native
揭秘!PolarDB-X存储引擎如何玩转“时间魔术”?Lizard多级闪回技术让你秒回数据“黄金时代”!
【8月更文挑战第25天】PolarDB-X是一款由阿里巴巴自主研发的云原生分布式数据库,以其高性能、高可用性和出色的可扩展性著称。其核心竞争力之一是Lizard存储引擎的多级闪回技术,能够提供高效的数据恢复与问题诊断能力。本文通过一个电商公司的案例展示了一级与二级闪回技术如何帮助快速恢复误删的大量订单数据,确保业务连续性不受影响。一级闪回通过维护最近时间段内历史数据版本链,支持任意时间点查询;而二级闪回则通过扩展数据保留时间并采用成本更低的存储方式,进一步增强了数据保护能力。多级闪回技术的应用显著提高了数据库的可靠性和灵活性,为企业数据安全保驾护航。
86 1
|
6月前
|
数据库 Windows
超详细步骤解析:从零开始,手把手教你使用 Visual Studio 打造你的第一个 Windows Forms 应用程序,菜鸟也能轻松上手的编程入门指南来了!
【8月更文挑战第31天】创建你的第一个Windows Forms (WinForms) 应用程序是一个激动人心的过程,尤其适合编程新手。本指南将带你逐步完成一个简单WinForms 应用的开发。首先,在Visual Studio 中创建一个“Windows Forms App (.NET)”项目,命名为“我的第一个WinForms 应用”。接着,在空白窗体中添加一个按钮和一个标签控件,并设置按钮文本为“点击我”。然后,为按钮添加点击事件处理程序`button1_Click`,实现点击按钮后更新标签文本为“你好,你刚刚点击了按钮!”。
527 0
|
1月前
|
Cloud Native 关系型数据库 分布式数据库
让PolarDB更了解您--PolarDB云原生数据库核心功能体验馆
让PolarDB更了解您——PolarDB云原生数据库核心功能体验馆,由阿里云数据库产品事业部负责人宋震分享。内容涵盖PolarDB技术布局、开源进展及体验馆三大部分。技术布局包括云计算加速数据库演进、数据处理需求带来的变革、软硬协同优化等;开源部分介绍了兼容MySQL和PostgreSQL的两款产品;体验馆则通过实际操作让用户直观感受Serverless、无感切换、SQL2Map等功能。
107 7

相关产品

  • 物联网平台