基于 EMQX + Tablestore 打造车辆元数据管理平台

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 车辆网场景中的云端架构分享与案例实践。

物联网数据分类

近些年来物联网技术高速发展,广泛应用到了诸如智慧出行、智慧工业、智能家居等场景中,无论是何种场景的应用,都离不开对数据的“采集-处理-存储-分析”这套流程。但是不同的应用,其自身的数据特性和业务需求大不相同,那么对于实现上述流程所需的产品组件也会有所区别。总体来说,我们可以按照应用和数据特点分为三类场景:时序数据、消息数据、元数据

上图展示了物联网场景的三大类数据,每类数据有着不同的数据特点和应用场景,先看一下如何区分不同类型的数据场景。


以车联网场景为例,车辆定时会更新当前的最新状态信息,例如发动机当前转速、当前车速等,这些描述车辆最新状态信息的数据我们称之为元数据。在智能汽车行驶过程中,车辆的状态数据会随着时间的变化而变化,例如车辆一段时间内的车速、胎压等,这些描述车辆历史状态信息的数据我们称之为时序数据。还有一种数据场景是对车辆行为进行控制的指令消息,例如调节车辆空调温度指令下发以及车辆执行命令后的结果反馈,这些控制指令的上下行我们称之为消息数据


数据特点和应用场景决定了对存储系统的差异化需求,选择与之对应的合适的存储架构对业务应用系统的性能有着非常重要的影响。本文章主要以车辆网场景为例,分享元数据的存储架构。下面我们从元数据的应用场景出发,先来看看元数据场景中的有哪些具体的业务需求,以及如何选择合适的存储组件和实现方案。


车联网元数据场景需求

车辆在行驶的过程中会定时上报大量的状态数据,比如车架号、行驶速度、发动机转速、车内温度等等。业务上基于这些车辆状态信息,用于车辆行为分析、行驶状态监控、轨迹分析、车辆状态检索等。要实现这些需求,就必须对车辆的元数据信息进行持久化存储,车辆元数据包含了车辆的固有属性最新状态数据,比如 车架号、外壳颜色、发动机型号  等就属于元数据中的固有属性;而 车辆当前 GPS 坐标、发动机的当前转速、车辆当前运行速度 这些就属于元数据中的最新状态数据。通过存储车辆元数据,可以实现复杂的业务需求,比如统计某一个区域的车辆数用于判断道路拥挤程度,便于车辆调度。那么就需要存储所有车辆的最新 GPS 坐标,这个坐标是随着车辆的运动频繁变化的;再比如通过监测车速、发动机转速等最新数据的实时变化,可用于判断车辆行为或者状态是否出现了异常,做到及时告警处理。总结来看大致可分为如下几类基本业务需求:

  1. 车辆元数据定时上报到数据网关,并且转发到下游存储。
  2. 车辆元数据会频繁低延迟更新。
  3. 根据车架号查询某辆汽车的当前状态。
  4. 按照一个或多个车辆属性(例如 gps 坐标、车速)等条件检索车辆,或者大批量导出车辆信息(车辆圈选)。
  5. 实时监测到车辆状态的变化,并且及时处理。并且保存异常行驶日志以供后续调查。


存储侧的要求

根据上述的场景需求,我们可以总结成对存储侧的几个要求:

  1. 大规模数据存储。车辆元数据场景中,一辆汽车对应着一条元数据,而车辆数通常会达到千万或者亿级。
  2. 高并发实时更新。车辆状态的变化对应的是元数据的属性更新,而整体的写入并发会对存储侧有非常大的压力。
  3. 任意字段组合检索。要求存储侧能够支持丰富索引,可以按照车辆的参数来检索。
  4. 并发检索。对应的是车辆圈选场景,在大批量导出车辆信息的场景下,需要支持并发检索能力用于提升导出速度。
  5. 元数据更新实时计算。能够实时探测数据的变化,并能够对更新后的数据进行实时计算。

存储选型

元数据存储场景对数据库的规模、性能、查询能力等各个方面都有较高的要求。通常来说,关系型数据库 MySQL 都是作为存储选型的第一选项,这是因为 MySQL 是最为通用,大家也最为熟悉的数据库产品。然而 MySQL 一般只在小规模的数据存储(千万级)和低并发的数据更新(一万内 QPS)场景下表现优异,当规模变大时,MySQL 的性能会急剧下降,这显然不满足元数据存储的要求。到这里,大家可能会想到使用开源数据库 HBase,因为 HBase 的分布式架构能够支持大规模数据存储和写入,在这一点上是可以满足元数据存储需求的。但是 HBase 只支持了基于主键(RowKey)查询,无法在属性列上建索引查询,所以在车辆检索、圈选时的效率极低,极端情况下可能会以全表扫描加数据过滤的这种方式来查询数据,这一点上无法满足上述的需求。最后我们来看下表格存储 Tablestore,Tablestore 是一款云原生 Serverless 的结构化数据存储,原生具备大规模的数据存储和低延迟数据更新,同时提供了多元索引功能,能够支持任意字段组合检索,是物联网平台底层依赖的核心数据存储系统,支撑了亿级车辆的元数据管理。下面对上述几个存储产品在实现元数据存储场景中的能力做一个总结:


存储规模

实时更新

任意字段组合检索

并发检索

数据更新实时计算

MySQL

支持(难以扩展)

支持(单列索引)

不支持

支持

HBase

支持(水平扩展)

不支持

不支持

支持

Tablestore

支持(水平扩展)

支持

支持

支持


Tablestore 技术介绍

功能模块

表格存储 Tablestore 是一款云上的结构化数据存储产品,具备极为丰富的产品功能和生态,提供了物联网存储 Iotstore、宽表引擎、多元索引、通道等功能,可以满足元数据、时序数据、消息数据场景基本业务需求。对于时序数据场景和消息数据场景的解决方案不会在本文章中展开讲解,先来看一下 Tablestore 的功能模块分别可以实现元数据场景中的哪些需求。



上图中展示了 Tablestore 的三个主要的功能模块。可以看出 Tablestore 采用了双存储引擎的架构,不同的存储引擎负责解决的读写需求侧重点不同。宽表引擎是一个分布式的 schema less 的数据表,主要负责了车辆元数据的存储与更新。而索引引擎自动同步了宽表引擎的全增量数据,采用倒排索引、空间索引等结构,提供数据检索、分析等能力,主要负责车辆检索与圈选。通道建立在宽表上,提供了宽表中全量、增量数据的订阅消费能力,Tablestore 基于通道服务实现了 Flink source connector ,从而实现车辆实时监控的需求。


技术原理

宽表引擎如何满足高并发元数据更新

上图描述了一条元数据写入请求在 Tablestore 服务端的处理过程。写入请求到达 Tablestore 服务端后,路由节点会按照写入数据的主键值进行范围分区(range-partitioning)。不同范围的数据将被转发到不同的分区节点进行处理和存储。而当某个分区节点出现负载过高时,服务端将自动进行分区分裂(spliting),使得服务的读写能力水平扩展,整个过程无需人工干预,从而极大地提升了整体服务的吞吐量。

索引引擎如何加速大规模车辆圈选

车辆圈选的一个重要的特点就是查询返回的数据结果集比较大,通常会达到百万行级别。在这种场景下,如果按照常规的 SQL 查询,那只能采用 Limit/Offset 这种连续翻页的方式来返回数据,而这种单并发的串行返回的性能无疑是非常低的,所以 Tablestore 针对此场景设计了一个并发数据导出的接口 ParallelScan。ParallelScan 接口是基于多元索引查询并且在服务端检索数据处理逻辑做了优化,如下图所示。

首先客户端在圈选数据之前会调用 ComputeSplit 接口,这个接口会按照索引中的数据存储量返回一个并发值。客户端启动多个异步线程以相同的条件来检索数据,唯一的区别是每个线程都带有一个并发 ID,服务端会根据并发 ID 来检索部分数据分区。这样做的好处是每个线程检索的数据量将大幅度减少,并且将原本一个比较大的数据结果集拆分成了多个子集,每个线程负责一个或者是多个子集的查询返回。从而实现车辆元数据圈选速度的提升。


EMQX + Tablestore + Flink 方案

基于云上搭建的元数据管理平台架构如下图所示



上述的架构图中包含了四个模块:EMQX Cloud/Enterprise ,Kafka,Tablestore,Flink。各个模块在架构中承担的角色功能如下:

  • EMQX Cloud/Enterprise负责车辆的接入、消息桥接到下游应用 Kafka。EMQX Enterprise 是 EMQ 推出的企业级云原生分布式物联网接入平台,通过一体化的分布式 MQTT 消息服务和强大的 IoT 规则引擎,为高可靠、高性能的物联网实时数据移动、处理和集成提供动力(随处运行、无限连接、任意集成),助力企业快速构建关键业务的 IoT 平台与应用。EMQX Cloud 是全托管的 MQTT 公有云服务,在提供可靠、实时的海量物联网设备连接、事件消息处理、IoT 数据侨接等能力基础上,免除用户基础设施管理维护负担,加速物联网应用开发。本文采用的是 EMQX Cloud。
  • Kafka。作为消息处理的中间件,负责数据 ETL 并转发到下游存储 Tablestore。
  • Tablestore。负责了车辆元数据的存储、更新、查询、圈选,提供增量通道服务与 Flink 进行对接。
  • Flink。负责实时消费 Tablestore 的车辆元数据表变化,判断和处理异常数据,将异常数据回写到 Tablestore 中进行存储。


方案实现

案例介绍

假定某汽车厂商管理一千万台智能汽车,现需要将车辆信息接入到云端,并借助云端的架构来打造一个车辆管理平台。实现状态汇报、状态检索、车辆查询、车辆圈选、车辆监控这些业务需求。我们使用上述 EMQX Cloud + Tablestore + Flink 的方案架构来实现。车辆元数据规模与数据结构如下:

元数据规模 : 一千万行

数据结构 : 每辆车定时上报不同维度的属性(里程、坐标等),schema如下

//car metadata schema sample
data = {
  "vin_id" : STRING ,         # 车架号,表示唯一车辆
  "mileage" : DOUBLE ,        # 当前里程数
  "speed" : DOUBLE ,          # 当前速度
  "pressure_level" : STRING , # 压力值水平
  "engine_rpm" : INTEGER ,     # 发动机转速
  "inner_temp" : DOUBLE ,     # 车内温度
  "outer_temp" : DOUBLE ,     # 车外温度
  "gps" : STRING ,            # 车辆 gps 坐标,如 30.23,52.78
  "ignition_state" : STRING , # 点火状态,0-未点火,1-已点火
  "bms_soc" : DOUBLE ,        # 排放值 
}


开通服务

我们使用 EMQX Cloud + Kafka + Tablestore + Flink 组合架构来实现上述场景,首先开通如下服务:

  1. 开通并部署 EMQX Cloud 专业版项目。
  2. 开通阿里云 Kafka 消息队列服务。
  3. 开通阿里云表格存储 Tablestore 服务。
  4. 开通阿里云实时计算 Flink 服务。


设备接入和数据上报

车辆设备需要基于 MQTT 协议上报车辆元数据,可以使用 EMQX SDK 或者 MQTT X 客户端工具 开进行数据上报。本文中采用了 MQTT X  来模拟车辆设备上报数据的过程。我们使用 MQTT X 客户端创建一个 mqtt 连接到 EMQX Cloud 云平台,这个过程模拟了一台车辆设备连接到了 EMQX 上。

然后基于此 MQTT 连接来上报一条车辆元数据信息。例如下图中将车辆的一条元数据 data 上报到了主题名为 t 的 topic 中。topic 作为消息的载体,可以用于将不同的车辆设备消息进行分类处理。


数据转发到消息中间件

车辆元数据 data 上报到 EMQX Cloud 后,我们需要配置控制数据流转的规则引擎,将指定 topic 中的数据转发到消息中间件 Kafka 中。数据流转的过程如下图所示:

image.png

规则引擎包含了两个部分,分别是 规则 SQL响应动作。以本文的案例来说:

  • 规则 SQL。包含了车辆元数据字段和消息主题 topic,例如从 topic =t 的主题中,获取车辆所有属性字段,推送到响应动作中进行处理。
SELECT payload.vin_id as vin_id, payload.mileage as mileage, ...
FROM"t/#"
  • 响应动作。配置数据转发的下游资源。本案例中创建了阿里云 Kafka 资源,并将数据转发到 Kafka 里主题为 car_data_topic  中。如下


Kafka 转发数据到 Tablestore 存储

Kafka 接收到来自 EMQX Cloud 的车辆元数据后,需要转发到下游 Tablestore 中进行持久化存储,用于后续的数据检索和数据分析。在此过程中,可以根据业务的具体需求对数据进行格式转换。Tablestore 团队提供了与 Kafka 实现自动对接的 tablestore sink connector,在阿里云 Kafka 控制台中可以一键配置 tablestore connector 来实现数据转发存储。Kafka 转发数据到 Tablestore 的过程如下图所示 :

本案例中将 EMQX Cloud 中的车辆元数据转发到了阿里云 Kakfa 的 car_data_topic 中,并以 car_data_topic 作为 connector 的数据源,将数据写入到 Tablestore 的车辆元数据表 car_metatable 里进行持久化存储。如下图所示:


车辆查询

在 Tablestore 中基于唯一车架号 vin_id 来查询车辆。Tablestore 提供了 SQL、SDK 和控制台等多种数据访问方式。下面通过 SQL 来查询车辆:

  • 查询 vin_id = “000000fe-79bc-43ac-9afb-c0cfce5a0b3c” 的车辆最新状态信息
select*from `car_metatable` where vin_id ='000000fe-79bc-43ac-9afb-c0cfce5a0b3c';


车辆检索和圈选

车辆检索

车辆检索和圈选依赖 Tablestore 的多元索引能力,分别采用 search 和 parallelScan 接口来实现。先来说车辆检索,车辆检索通常不会返回比较大的数据结果集,即使符合查询条件的数据量很大,也会采取分页的策略返回。针对于此场景,Tablestore 的多元索引功能采用了倒排索引、空间索引等数据结构来加速数据检索速度,例如下面一个例子:

  • 检索车辆当前行驶速度大于 80.0 km/h ,并且排放值大于 2.0 mg 的车辆位置
select vin_id,gps from `car_metatable` where speed >80.0and bms_soc >2.0 limit 10;

车辆圈选

车辆圈选通常返回的数据量比较大,如果使用 SQL 只能单并发查询和返回,这样性能显然是不高的。针对这个场景,Tablestore 推出了 parallelScan 接口来实现。上文介绍 Tablestore 技术原理提到了 parallelScan 接口的执行过程。例如圈选以某一个坐标为中心点,周围一定范围内所有的车辆信息。

  • 圈选以 gps 坐标点为 73.00 123.00 中心,半径 5 公里范围内所有车辆信息
intsplitsSize=client.computeSplits(computeSplitsRequest).getSplitsSize();//获取并发数//多线程执行run(){
ParallelScanRequestparallelScanRequest=ParallelScanRequest.newBuilder().tableName(tableName).indexName(indexName)
     .scanQuery(ScanQuery.newBuilder()
                        .query(QueryBuilders.geoDistance("gps")//字段名                                            .centerPoint("73.00,123.00")//中心点坐标                                            .distanceInMeter(5000.00))//周围5公里                        .maxParallel(splitsSize)
             .currentParallelId(currentParallelId)//当前并发ID            .build())
RowIteratoriterator=client.createParallelScanIterator(parallelScanRequest);//返回当前并发命中的结果集}
//多线程执行

车辆圈选的结果被分为了四个异步线程来返回,每个线程包含了部分结果,如下图所示:


车辆状态变化监控与记录

车辆实时监控依赖于 Tablestore 的通道服务能力,通道服务可以直接对接实时计算 Flink 来实现元数据流计算,如下图所示:

在实时计算中创建了源表、结果表和流计算作业。其中源表依赖 Tablestore 中车辆元数据中创建增量通道服务,通道服务将车辆元数据的实时变化数据推送到 Flink 计算引擎中进行流计算作业。而本案例中流计算作业将异常的车辆状态数据保存到了 Tablestore 的车辆异常结果表中,后续可通过异常结果表中查询某台汽车的异常记录信息。针对于异常数据的处理还有一种业务场景是推送到 Kafka 等消息队列中进行处理,如设置告警机制、短信通知驾驶员等。

为了更直观地看到流计算作业效果,我们可以在车辆元数据表中更新一条车辆元数据:

update `car_metatable` set `speed` =130.0where `vin_id` ="000000fe-79bc-43ac-9afb-c0cfce5a0b3c"

经过流计算引擎处理后,会检测到当前车辆已经超速,并超速记录信息自动写入到异常结果数据表中。查询异常结果表:

select*from `car_error_record` where `vin_id` ="000000fe-79bc-43ac-9afb-c0cfce5a0b3c"


总结

本文分享了车辆元数据的场景特点和架构实践。设备元数据场景中业务需求对存储组件的存储规模、查询方式以及计算性能有很高的要求, 从存储、查询、计算几个维度对比 MySQL、HBase、Tablestore 三款产品,最终选择了采用表格存储 Tablestore 作为元数据场景核心存储库,并运用 Tablestore 上下游生态,打造出 EMQX Cloud + Tablestore + 实时计算 Flink 组合作为实现车辆元数据存储的云端架构。


联系我们

关于元数据场景的相关介绍就到这里了,后续我们还会分享物联网中时序数据、消息数据的场景与方案。若对本文内容有疑问或希望进一步了解物联网场景里的各种数据存储的方案实现,欢迎加入钉钉群:“物联网存储 IoTstore 开发者交流群”。群内提供免费的在线专家服务,欢迎扫码加入,群号:44327024,或扫下方二维码加入。

目录
相关文章
|
7月前
|
传感器 数据可视化 数据管理
数据管理平台Splunk Enterprise本地部署结合内网穿透实现远程访问
数据管理平台Splunk Enterprise本地部署结合内网穿透实现远程访问
80 0
|
1月前
|
人工智能 Cloud Native 数据管理
媒体声音|阿里云王远:一站式数据管理平台的智能化跃迁
在DTCC 2024大会上,阿里云数据库产品管理与技术架构部负责人王远与IT168 & ITPUB特约嘉宾薛晓刚就数据库与AI技术的融合、云原生数据库的新趋势及向量数据库的支撑能力等热点话题进行了深入探讨。王远认为,Data+AI不仅是一个概念,已进入实际落地阶段。在智能化时代,单一数据库引擎难以满足多元业务需求,需要构建统一的数据管理能力,以支持不同工作负载。阿里云通过“瑶池”数据库品牌,提供云原生、平台化、一体化和智能化的数据库解决方案,助力用户应对复杂的数据管理挑战。
|
2月前
|
数据采集 DataWorks 数据管理
DataWorks不是Excel,它是一个数据集成和数据管理平台
【10月更文挑战第10天】随着大数据技术的发展,企业对数据处理的需求日益增长。阿里云推出的DataWorks是一款强大的数据集成和管理平台,提供从数据采集、清洗、加工到应用的一站式解决方案。本文通过电商平台案例,详细介绍了DataWorks的核心功能和优势,展示了如何高效处理大规模数据,帮助企业挖掘数据价值。
134 1
|
2月前
|
数据采集 SQL DataWorks
DataWorks不是Excel,它是一个数据集成和数据管理平台
【10月更文挑战第5天】本文通过一家电商平台的案例,详细介绍了阿里云DataWorks在数据处理全流程中的应用。从多源数据采集、清洗加工到分析可视化,DataWorks提供了强大的一站式解决方案,显著提升了数据分析效率和质量。通过具体SQL示例,展示了如何构建高效的数据处理流程,突显了DataWorks相较于传统工具如Excel的优势,为企业决策提供了有力支持。
128 3
|
4月前
|
存储 人工智能 数据管理
OSS&Tablestore 向量检索能力全新升级,重塑AI时代数据管理
阿里云 OSS Indexing 发布了向量索引和检索能力。该功能除了可以对 OSS Meta 进行检索之外,还可以对多媒体数据元信息、用户自定义元数据以及向量语义进行检索。OSS Indexing 功能,是依托阿里云表格存储 TableStore 提供的索引存储和检索能力而构建的。表格存储针对成本、规模、召回率等挑战,发布了低成本、大规模、高性能、高召回率的向量检索服务,能以较低成本支持千亿规模数据的存储和检索。
243 8
|
4月前
|
存储 人工智能 NoSQL
OSS&Tablestore 向量检索能力全新升级,重塑AI时代数据管理
近日,阿里云成功举办了“AI驱动:数据管理的进化与创新 ”线上新品发布会。发布会上,阿里云存储产品向量检索能力全新升级,重塑AI时代数据管理。
|
4月前
|
机器学习/深度学习 前端开发 数据挖掘
基于Python Django的房价数据分析平台,包括大屏和后台数据管理,有线性、向量机、梯度提升树、bp神经网络等模型
本文介绍了一个基于Python Django框架开发的房价数据分析平台,该平台集成了多种机器学习模型,包括线性回归、SVM、GBDT和BP神经网络,用于房价预测和市场分析,同时提供了前端大屏展示和后台数据管理功能。
124 9
|
5月前
|
Java 数据管理 API
构建基于Spring Boot的数据管理平台
构建基于Spring Boot的数据管理平台
|
7月前
|
关系型数据库 Linux Docker
datahub元数据管理平台从安装到使用一站式指南(未完)_datahub安装
datahub元数据管理平台从安装到使用一站式指南(未完)_datahub安装
datahub元数据管理平台从安装到使用一站式指南(未完)_datahub安装
|
7月前
|
数据管理 关系型数据库 MySQL
数据管理DMS产品使用合集之DMS可以接入其他平台的MySQL数据库,是否还支持无感知变更功能
阿里云数据管理DMS提供了全面的数据管理、数据库运维、数据安全、数据迁移与同步等功能,助力企业高效、安全地进行数据库管理和运维工作。以下是DMS产品使用合集的详细介绍。