车联网场景下海量车辆状态数据存储实践

本文涉及的产品
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 随着通信技术、计算机技术的不断发展,移动通信正在从人与人(H2H)向人与物(H2M)以及物与物(M2M)的方向发展,“万物互联”的概念正在逐步覆盖到各行各业中,例如智能家居、智能农业、智能交通、智能物流等领域。目前,车联网技术已经先行一步,在行车安全、交通管理、生活服务等方面得到充分应用。车联网技术包括了车辆终端、云端、无线通信等方面。车辆终端实时产生大量车辆状态数
作者:李欣

背景

随着通信技术、计算机技术的不断发展,移动通信正在从人与人(H2H)向人与物(H2M)以及物与物(M2M)的方向发展,“万物互联”的概念正在逐步覆盖到各行各业中,例如智能家居、智能农业、智能交通、智能物流等领域。目前,车联网技术已经先行一步,在行车安全、交通管理、生活服务等方面得到充分应用。

车联网技术包括了车辆终端、云端、无线通信等方面。车辆终端实时产生大量车辆状态数据,例如里程、油量、胎压、坐标、温度、速度和操作等等。通过对这些数据的检索、分析,可以在环境感知、驾驶决策、行驶安全、出行规划等各个方面发挥作用。但是在车联网场景下,技术实现上也会面临几个难题:
● 数据写入并发度高:数据写入并发数取决于行驶中的车辆数。
● 数据检索延迟敏感:若数据检索出现延迟,则无法感知到最新的车辆状态信息,可能出现无法预料的后果。
● 数据规模庞大:每个车辆终端会记录几十个甚至更多维度的状态数据,周期性保存车辆的状态信息获取车辆状态时序数据。数据规模轻松达到亿行,甚至百亿行以上。

方案

MySQL + LogStash + Elasticsearch 方案

MySQL 自身具备强事务,可作为车辆状态数据存储库。数据量在一定级别内时,可满足业务的写需求。然而 MySQL 中的多列索引需要满足前缀匹配才能发挥效果,当查询条件不符合多列索引匹配规则时,可能会退化成全表扫描。这样一个慢 SQL 会增加服务端负载,使得 MySQL 服务性能降低。因此很容易想到需要一个新的索引引擎来做检索类查询的分流,将一些复杂的查询、分析类需求放到索引引擎中来完成。

Elasticsearch 是一款强大开源搜索和分析引擎,支持丰富的索引类型,通过引入 Elasticsearch 来提供检索、分析的能力,可以有效降低 MySQL 服务端负载。MySQL 数据通过 LogStash 或 Canal 工具同步到 Elasticsearch中。

方案的整体架构如下图:
c1.png

  1. MySQL:作为数据存储主库,需能够支持高并发的状态数据变更写入,支持平台侧基于主键查询的能力。
  2. LogStash:作为 MySQL 与 Elasticsearch 之间的桥梁,负责将 mysql binlog 转换成 Elasticsearch 的数据结构并写入Elasticsearch。
  3. Elasticsearch:作为存储系统索引引擎,负责承载检索、分析聚合类的请求流量。

MySQL + Elasticsearch的方案很好地解决了检索、分析类的业务需求。不过整个存储系统仍然存在一些问题:

  1. Elasticsearch 集群搭建与运维复杂度高,一旦出现问题非常难排查。
  2. Elasticsearch 成本较高。需要根据业务规模预测机器数,无法做到弹性扩容。

MySQL + Canal + Tablestore 方案

Tablestore 是阿里云自研的一款多模型结构化数据存储,能够支持 PB 级存储、千万 TPS 写入以及强大的数据检索、分析能力。Tablestore 具备两种存储引擎,数据表基于 LSM-tree 架构能支持高并发低延迟的读写能力,多元索引基于倒排索引、空间索引能支持丰富的数据检索方式,例如多列组合查询、模糊查询、匹配查询、范围查询等等。

在车辆网场景中,Tablestore 的数据表可支持千万级的并发读写,可实现车辆状态实时更新。多元索引引擎提供了百亿行数据毫秒级的检索的能力,可实现根据车辆多种状态查询。Tablestore 服务端实现了数据表与多元索引之间自动数据同步,保证了两者的最终一致性。Tablestore 方案实现车联网存储系统的架构图如下:

c2.png

下面会重点给大家介绍下如何实现:MySQL + Canal + Tablestore 的方案。

实现

开通 Tablestore 服务和创建表

  1. 创建按量模式实例,填写实例名。详情请参考 如何开通表格存储服务、如何创建 Tablestore 实例

c3.png

  1. 创建 auto_mobile 表。保存车辆状态数据信息。详情请参考 如何创建 Tablestore 数据表

c4.png

  1. 创建 his_auto_moile 表。保存车辆状态数据时序信息。

c5.png

准备数据:car-ots-demo(可选步骤)

本文中测试场景中模拟创建了 auto_mobile、his_auto_mobile 两张表,若需要同步其他业务表,请忽略此步骤。

  1. 以linux为例,运行命令 shell tar -zxvf car-ots-demo.tar.gz 解压,解压后目录中包含 application.yml 和 test-1.0-SNAPSHOT.jar。
  2. 配置 application.yml 文件。

    1. server.port:压测项目运行端口号。如:8082
    2. spring.datasource.jdbc:MySQL数据源地址,如:rm-bp13dz7yedd2d9zyr.mysql.rds.aliyuncs.com。
    3. spring.datasource.user: MySQL 数据源账号。
    4. spring.datasource.password: MySQL 数据源密码。
  3. 运行命令 shell nohup java -jar -Dspring.config.location=application.yml test-1.0-SNAPSHOT.jar& 启动压测程序。
  4. 运行命令 shell curl "localhost:8082/car/prepareTable" -X POST 创建MySQL表。

MySQL 实时同步 Tablestore

可以通过两种方案来将 MySQL 数据实时同步到 Tablestore 中。

  1. 数据传输服务 DTS 同步

关于 DTS 迁移方案可参考文档 MySQL 同步至 Tablestore。

  1. Canal 工具同步。Canal 是阿里巴巴开源 CDC 工具,可基于解析 MySQL Binlog 将增量数据传输到下游,表格存储团队实现了 TablestoreAdapter 用于写入数据到 Tablestore。
    a. 下载同步工具 Canal。运行命令 shell unzip canal.zip 解压压缩包。
    b. 配置 deployer 和 canal-adapter。
    c. 启动 deployer 和 canal-adapter。

说明:Canal 同步任务配置参数不在本文章中介绍,关于同步任务的配置项、启动步骤说明。请参考 CarmobileExample中的 READ.ME。

MySQL 测试数据写入

数据压入 jar 包 car-ots-demo 中已提供了测试数据写入接口,运行命令 shell curl "localhost:8082/car/press?carNum=10&point=10" -X POST 即可。其中 carNum 表示车辆数,point 表示车辆状态记录次数。本文测试场景carNum=1000000, point=10。

  1. 运行写入命令:

c6.png

  1. 写入MySQL日志(car-ots-demo/logs/example.log):

c7.png

  1. Canel数据同步日志(canal-adapter/logs/adapter/adapter.log):

c8.png

  1. 之前已经建立了同步链路,这个时候增量数据会自动同步到 Tablestore:

c9.png

Tablestore 创建多元索引

登录表格存储控制台,进入索引管理页面,点击创建多元索引。多元索引创建后,数据表中的存量和增量数据将自动同步到索引中。更多关于创建多元索引的介绍请参考 创建和使用多元索引
c10.png

c11.png

Tablestore 查询功能展示

  • 案例一。查询油量小于 120.0 并且胎压小于 2.3 的车辆 ID,取 3 条记录。

查询代码:

//查询油量小于 120.0 并且胎压小于 2.3 的车辆ID,取 3 条记录
public static void searchDemo01(SyncClient client){
        SearchRequest searchRequest = SearchRequest.newBuilder()
                .tableName("auto_mobile")
                .indexName("auto_mobile_index")
                .searchQuery(SearchQuery.newBuilder()
                        .query(QueryBuilders.bool()
                                .must(QueryBuilders.range("oil").lessThan(120.0))
                                .must(QueryBuilders.range("typepressure").lessThan(2.3))
                        )
                        .limit(3)
                        .build())
                .addColumnsToGet(Arrays.asList("car_ID"))
                .build();
        long sys1 = System.currentTimeMillis();
        SearchResponse searchResponse = client.search(searchRequest);
        long sys2 = System.currentTimeMillis();
        System.out.println("cost time: " + (sys2-sys1) + "ms");
        for(Row row : searchResponse.getRows()){
            System.out.println(row);
        }
        client.shutdown();
    }

结果展示:

cost time: 12ms
[PrimaryKey:]car_md5ID:0000104cd168386a335ba6bf6e32219d, car_ID:848775
[Columns:]
[PrimaryKey:]car_md5ID:000093856b4e947511870f3e10464129, car_ID:646434
[Columns:]
[PrimaryKey:]car_md5ID:0001181bf1ad8f82dcf59c7c18343bd5, car_ID:752608
[Columns:]

SQL查询:

select * from auto_mobile where oil < 120.0 and typepressure < 2.3 limit 3;

SQL查询结果:
c12.png

  1. 案例二。统计距离 car_md5ID = '0000104cd168386a335ba6bf6e32219d' 五公里内的车辆数。

查询代码:

//统计距离car_md5ID='0000104cd168386a335ba6bf6e32219d'五公里内的车辆数。
public static void searchDemo02(SyncClient client){
        SearchRequest searchRequest = SearchRequest.newBuilder()
                .tableName("auto_mobile")
                .indexName("auto_mobile_index")
                .searchQuery(SearchQuery.newBuilder()
                        .query(QueryBuilders.geoDistance("location")
                                .centerPoint("30.968772887652293,120.94512114660778")
                                .distanceInMeter(5000)
                        )
                        .getTotalCount(true)
                        .build())
                .build();
        long sys1 = System.currentTimeMillis();
        SearchResponse searchResponse = client.search(searchRequest);
        long sys2 = System.currentTimeMillis();
        System.out.println("cost time: " + (sys2-sys1) + "ms");
        System.out.println("车辆数:" + searchResponse.getTotalCount());
        client.shutdown();
    }

结果:

cost time: 18ms
车辆数:6026
  1. 案例三。查询所有车辆,统计车辆数。并按照里程段分组。

查询代码:

//查询所有车辆,统计车辆数。并按照里程段分组。
public static void searchDemo03(SyncClient client){
        SearchRequest searchRequest = SearchRequest.newBuilder()
                .tableName("auto_mobile")
                .indexName("auto_mobile_index")
                .searchQuery(SearchQuery.newBuilder()
                        .query(QueryBuilders.matchAll())
                        .addGroupBy(GroupByBuilders.groupByRange("groupByMileage","mileage")
                                .addRange(Double.MIN_VALUE,2000)
                                .addRange(2000,4000)
                                .addRange(6000,8000)
                                .addRange(8000,Double.MAX_VALUE)
                        )
                        .build())
                .build();
        long sys1 = System.currentTimeMillis();
        SearchResponse searchResponse = client.search(searchRequest);
        long sys2 = System.currentTimeMillis();
        System.out.println("cost time: " + (sys2-sys1) + "ms");
        GroupByRangeResult groupByRangeResult =  searchResponse.getGroupByResults().getAsGroupByRangeResult("groupByMileage");
        for(GroupByRangeResultItem groupByRangeResultItem : groupByRangeResult.getGroupByRangeResultItems()){
            System.out.println("["+groupByRangeResultItem.getFrom()+","+groupByRangeResultItem.getTo()+"] : "+groupByRangeResultItem.getRowCount());
        }
        client.shutdown();
    }

结果展示:

cost time: 98ms
[-Infinity,2000.0] : 199192
[2000.0,4000.0] : 200569
[4000.0,6000.0] : 199379
[6000.0,8000.0] : 200708
[8000.0,Infinity] : 200152

最后

表格存储 Tablestore 中多元索引功能可支持在大规模数据场景下实现毫秒级检索,以及轻量级分析的能力。通过表格存储控制台一键索引,自动完成全增量数据同步,省去了索引集群运维的烦恼。通过多元索引的范围查询、多条件组合查询、分组聚合等能力,实现了根据车辆多种状态检索聚合的功能。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
3月前
|
供应链 NoSQL 物联网
链接全球数十亿台设备!物联网行业如何应对数据管理、实时分析和供应链优化的挑战?
物联网已成为面向未来的解决方案的关键组成部分,且其所蕴含的巨大经济价值潜力有待挖掘
1496 0
链接全球数十亿台设备!物联网行业如何应对数据管理、实时分析和供应链优化的挑战?
|
3月前
|
存储 数据采集 监控
智慧工地整体方案,实现现场各类工况数据采集、存储、分析与应用
“智慧工地整体方案”以智慧工地物联网云平台为核心,基于智慧工地物联网云平台与现场多个子系统的互联,实现现场各类工况数据采集、存储、分析与应用。通过接入智慧工地物联网云平台的多个子系统板块,根据现场管理实际需求灵活组合,实现一体化、模块化、智能化、网络化的施工现场过程全面感知、协同工作、智能分析、风险预控、知识共享、互联互通等业务,全面满足建筑施工企业精细化管理的业务需求,智能化地辅助建筑施工企业进行科学决策,促进施工企业监管水平的全面提高。
|
消息中间件 前端开发 Java
实时即未来,车联网项目之车辆驾驶行为分析【五】
单次行驶里程区间分布、单次行程消耗soc区间分布、最大里程分布、充电行程占比、平均行驶里程分布、周行驶里程分布、最大行驶里程分段统计、常用行驶里程、全国-每日平均行驶里程(近4周)、全国-单车日均行驶里程分布(近一年)、各车系单次最大行驶里程分布、不同里程范围内车辆占比情况。
318 0
实时即未来,车联网项目之车辆驾驶行为分析【五】
|
JSON 关系型数据库 MySQL
实时即未来,车联网项目之电子围栏分析【六】
翻滚窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect
612 0
实时即未来,车联网项目之电子围栏分析【六】
|
存储 SQL 关系型数据库
海量物联网设备元数据存储引擎选型和演进
# 前言 之前参加DataFun社区举办的线上活动(2022.3.26 DataFunSummit大数据存储架构峰会),作为讲师分享了一站式物联网存储产品的架构设计的主题,本文是当天的演讲稿整理,在ATA上面也记录一下。感谢DataFun社区和整理编辑李瑶。 # 演讲内容 ## 导读 本文将结合一个常见的物联网场景分享其存储引擎架构和设计。通过对团队在过去一段时间承接的一些设备元数据案例进行总
816 12
海量物联网设备元数据存储引擎选型和演进
|
人工智能 监控 自动驾驶
物联网对 5G 的指标要求 | 带你读《5G时代的承载网》之九
未来移动互联网主要面向以人为主体的通信,注重提供更好的用户体验,进一步改变人类社会信息交互方式,为用户提供增强现实、虚拟现实、超高清视频、云端办公、休闲 娱乐等更加身临其境的极致业务体验。
物联网对 5G 的指标要求  | 带你读《5G时代的承载网》之九
|
存储 NoSQL 关系型数据库
实时即未来,车联网项目之远程诊断实时故障分析【七】
geohash 就是将地图上位置(经纬度)转换成偶数位是经度、奇数数是维度,新的二进制字节,转换成字符串,用字符串代表某一个地理位置。
457 0
|
存储 分布式计算 算法
海量电力设备监测数据的存储和特征分析
海量电力设备监测数据的存储和特征分析
157 0
|
存储 SQL 弹性计算
深度|物联网海量时序数据存储有哪些挑战?
随着 IoT 技术的快速发展,物联网设备产生的数据呈爆炸式增长,数据的总量(Volume)、数据类型越来越多(Variety)、访问速度要求越来越快(Velocity)、对数据价值(Value)的挖掘越来越重视。
782 0
深度|物联网海量时序数据存储有哪些挑战?