车联网场景下海量车辆状态数据存储实践

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 随着通信技术、计算机技术的不断发展,移动通信正在从人与人(H2H)向人与物(H2M)以及物与物(M2M)的方向发展,“万物互联”的概念正在逐步覆盖到各行各业中,例如智能家居、智能农业、智能交通、智能物流等领域。目前,车联网技术已经先行一步,在行车安全、交通管理、生活服务等方面得到充分应用。车联网技术包括了车辆终端、云端、无线通信等方面。车辆终端实时产生大量车辆状态数
作者:李欣

背景

随着通信技术、计算机技术的不断发展,移动通信正在从人与人(H2H)向人与物(H2M)以及物与物(M2M)的方向发展,“万物互联”的概念正在逐步覆盖到各行各业中,例如智能家居、智能农业、智能交通、智能物流等领域。目前,车联网技术已经先行一步,在行车安全、交通管理、生活服务等方面得到充分应用。

车联网技术包括了车辆终端、云端、无线通信等方面。车辆终端实时产生大量车辆状态数据,例如里程、油量、胎压、坐标、温度、速度和操作等等。通过对这些数据的检索、分析,可以在环境感知、驾驶决策、行驶安全、出行规划等各个方面发挥作用。但是在车联网场景下,技术实现上也会面临几个难题:
● 数据写入并发度高:数据写入并发数取决于行驶中的车辆数。
● 数据检索延迟敏感:若数据检索出现延迟,则无法感知到最新的车辆状态信息,可能出现无法预料的后果。
● 数据规模庞大:每个车辆终端会记录几十个甚至更多维度的状态数据,周期性保存车辆的状态信息获取车辆状态时序数据。数据规模轻松达到亿行,甚至百亿行以上。

方案

MySQL + LogStash + Elasticsearch 方案

MySQL 自身具备强事务,可作为车辆状态数据存储库。数据量在一定级别内时,可满足业务的写需求。然而 MySQL 中的多列索引需要满足前缀匹配才能发挥效果,当查询条件不符合多列索引匹配规则时,可能会退化成全表扫描。这样一个慢 SQL 会增加服务端负载,使得 MySQL 服务性能降低。因此很容易想到需要一个新的索引引擎来做检索类查询的分流,将一些复杂的查询、分析类需求放到索引引擎中来完成。

Elasticsearch 是一款强大开源搜索和分析引擎,支持丰富的索引类型,通过引入 Elasticsearch 来提供检索、分析的能力,可以有效降低 MySQL 服务端负载。MySQL 数据通过 LogStash 或 Canal 工具同步到 Elasticsearch中。

方案的整体架构如下图:
c1.png

  1. MySQL:作为数据存储主库,需能够支持高并发的状态数据变更写入,支持平台侧基于主键查询的能力。
  2. LogStash:作为 MySQL 与 Elasticsearch 之间的桥梁,负责将 mysql binlog 转换成 Elasticsearch 的数据结构并写入Elasticsearch。
  3. Elasticsearch:作为存储系统索引引擎,负责承载检索、分析聚合类的请求流量。

MySQL + Elasticsearch的方案很好地解决了检索、分析类的业务需求。不过整个存储系统仍然存在一些问题:

  1. Elasticsearch 集群搭建与运维复杂度高,一旦出现问题非常难排查。
  2. Elasticsearch 成本较高。需要根据业务规模预测机器数,无法做到弹性扩容。

MySQL + Canal + Tablestore 方案

Tablestore 是阿里云自研的一款多模型结构化数据存储,能够支持 PB 级存储、千万 TPS 写入以及强大的数据检索、分析能力。Tablestore 具备两种存储引擎,数据表基于 LSM-tree 架构能支持高并发低延迟的读写能力,多元索引基于倒排索引、空间索引能支持丰富的数据检索方式,例如多列组合查询、模糊查询、匹配查询、范围查询等等。

在车辆网场景中,Tablestore 的数据表可支持千万级的并发读写,可实现车辆状态实时更新。多元索引引擎提供了百亿行数据毫秒级的检索的能力,可实现根据车辆多种状态查询。Tablestore 服务端实现了数据表与多元索引之间自动数据同步,保证了两者的最终一致性。Tablestore 方案实现车联网存储系统的架构图如下:

c2.png

下面会重点给大家介绍下如何实现:MySQL + Canal + Tablestore 的方案。

实现

开通 Tablestore 服务和创建表

  1. 创建按量模式实例,填写实例名。详情请参考 如何开通表格存储服务、如何创建 Tablestore 实例

c3.png

  1. 创建 auto_mobile 表。保存车辆状态数据信息。详情请参考 如何创建 Tablestore 数据表

c4.png

  1. 创建 his_auto_moile 表。保存车辆状态数据时序信息。

c5.png

准备数据:car-ots-demo(可选步骤)

本文中测试场景中模拟创建了 auto_mobile、his_auto_mobile 两张表,若需要同步其他业务表,请忽略此步骤。

  1. 以linux为例,运行命令 shell tar -zxvf car-ots-demo.tar.gz 解压,解压后目录中包含 application.yml 和 test-1.0-SNAPSHOT.jar。
  2. 配置 application.yml 文件。

    1. server.port:压测项目运行端口号。如:8082
    2. spring.datasource.jdbc:MySQL数据源地址,如:rm-bp13dz7yedd2d9zyr.mysql.rds.aliyuncs.com。
    3. spring.datasource.user: MySQL 数据源账号。
    4. spring.datasource.password: MySQL 数据源密码。
  3. 运行命令 shell nohup java -jar -Dspring.config.location=application.yml test-1.0-SNAPSHOT.jar& 启动压测程序。
  4. 运行命令 shell curl "localhost:8082/car/prepareTable" -X POST 创建MySQL表。

MySQL 实时同步 Tablestore

可以通过两种方案来将 MySQL 数据实时同步到 Tablestore 中。

  1. 数据传输服务 DTS 同步

关于 DTS 迁移方案可参考文档 MySQL 同步至 Tablestore。

  1. Canal 工具同步。Canal 是阿里巴巴开源 CDC 工具,可基于解析 MySQL Binlog 将增量数据传输到下游,表格存储团队实现了 TablestoreAdapter 用于写入数据到 Tablestore。
    a. 下载同步工具 Canal。运行命令 shell unzip canal.zip 解压压缩包。
    b. 配置 deployer 和 canal-adapter。
    c. 启动 deployer 和 canal-adapter。

说明:Canal 同步任务配置参数不在本文章中介绍,关于同步任务的配置项、启动步骤说明。请参考 CarmobileExample中的 READ.ME。

MySQL 测试数据写入

数据压入 jar 包 car-ots-demo 中已提供了测试数据写入接口,运行命令 shell curl "localhost:8082/car/press?carNum=10&point=10" -X POST 即可。其中 carNum 表示车辆数,point 表示车辆状态记录次数。本文测试场景carNum=1000000, point=10。

  1. 运行写入命令:

c6.png

  1. 写入MySQL日志(car-ots-demo/logs/example.log):

c7.png

  1. Canel数据同步日志(canal-adapter/logs/adapter/adapter.log):

c8.png

  1. 之前已经建立了同步链路,这个时候增量数据会自动同步到 Tablestore:

c9.png

Tablestore 创建多元索引

登录表格存储控制台,进入索引管理页面,点击创建多元索引。多元索引创建后,数据表中的存量和增量数据将自动同步到索引中。更多关于创建多元索引的介绍请参考 创建和使用多元索引
c10.png

c11.png

Tablestore 查询功能展示

  • 案例一。查询油量小于 120.0 并且胎压小于 2.3 的车辆 ID,取 3 条记录。

查询代码:

//查询油量小于 120.0 并且胎压小于 2.3 的车辆ID,取 3 条记录
public static void searchDemo01(SyncClient client){
        SearchRequest searchRequest = SearchRequest.newBuilder()
                .tableName("auto_mobile")
                .indexName("auto_mobile_index")
                .searchQuery(SearchQuery.newBuilder()
                        .query(QueryBuilders.bool()
                                .must(QueryBuilders.range("oil").lessThan(120.0))
                                .must(QueryBuilders.range("typepressure").lessThan(2.3))
                        )
                        .limit(3)
                        .build())
                .addColumnsToGet(Arrays.asList("car_ID"))
                .build();
        long sys1 = System.currentTimeMillis();
        SearchResponse searchResponse = client.search(searchRequest);
        long sys2 = System.currentTimeMillis();
        System.out.println("cost time: " + (sys2-sys1) + "ms");
        for(Row row : searchResponse.getRows()){
            System.out.println(row);
        }
        client.shutdown();
    }

结果展示:

cost time: 12ms
[PrimaryKey:]car_md5ID:0000104cd168386a335ba6bf6e32219d, car_ID:848775
[Columns:]
[PrimaryKey:]car_md5ID:000093856b4e947511870f3e10464129, car_ID:646434
[Columns:]
[PrimaryKey:]car_md5ID:0001181bf1ad8f82dcf59c7c18343bd5, car_ID:752608
[Columns:]

SQL查询:

select * from auto_mobile where oil < 120.0 and typepressure < 2.3 limit 3;

SQL查询结果:
c12.png

  1. 案例二。统计距离 car_md5ID = '0000104cd168386a335ba6bf6e32219d' 五公里内的车辆数。

查询代码:

//统计距离car_md5ID='0000104cd168386a335ba6bf6e32219d'五公里内的车辆数。
public static void searchDemo02(SyncClient client){
        SearchRequest searchRequest = SearchRequest.newBuilder()
                .tableName("auto_mobile")
                .indexName("auto_mobile_index")
                .searchQuery(SearchQuery.newBuilder()
                        .query(QueryBuilders.geoDistance("location")
                                .centerPoint("30.968772887652293,120.94512114660778")
                                .distanceInMeter(5000)
                        )
                        .getTotalCount(true)
                        .build())
                .build();
        long sys1 = System.currentTimeMillis();
        SearchResponse searchResponse = client.search(searchRequest);
        long sys2 = System.currentTimeMillis();
        System.out.println("cost time: " + (sys2-sys1) + "ms");
        System.out.println("车辆数:" + searchResponse.getTotalCount());
        client.shutdown();
    }

结果:

cost time: 18ms
车辆数:6026
  1. 案例三。查询所有车辆,统计车辆数。并按照里程段分组。

查询代码:

//查询所有车辆,统计车辆数。并按照里程段分组。
public static void searchDemo03(SyncClient client){
        SearchRequest searchRequest = SearchRequest.newBuilder()
                .tableName("auto_mobile")
                .indexName("auto_mobile_index")
                .searchQuery(SearchQuery.newBuilder()
                        .query(QueryBuilders.matchAll())
                        .addGroupBy(GroupByBuilders.groupByRange("groupByMileage","mileage")
                                .addRange(Double.MIN_VALUE,2000)
                                .addRange(2000,4000)
                                .addRange(6000,8000)
                                .addRange(8000,Double.MAX_VALUE)
                        )
                        .build())
                .build();
        long sys1 = System.currentTimeMillis();
        SearchResponse searchResponse = client.search(searchRequest);
        long sys2 = System.currentTimeMillis();
        System.out.println("cost time: " + (sys2-sys1) + "ms");
        GroupByRangeResult groupByRangeResult =  searchResponse.getGroupByResults().getAsGroupByRangeResult("groupByMileage");
        for(GroupByRangeResultItem groupByRangeResultItem : groupByRangeResult.getGroupByRangeResultItems()){
            System.out.println("["+groupByRangeResultItem.getFrom()+","+groupByRangeResultItem.getTo()+"] : "+groupByRangeResultItem.getRowCount());
        }
        client.shutdown();
    }

结果展示:

cost time: 98ms
[-Infinity,2000.0] : 199192
[2000.0,4000.0] : 200569
[4000.0,6000.0] : 199379
[6000.0,8000.0] : 200708
[8000.0,Infinity] : 200152

最后

表格存储 Tablestore 中多元索引功能可支持在大规模数据场景下实现毫秒级检索,以及轻量级分析的能力。通过表格存储控制台一键索引,自动完成全增量数据同步,省去了索引集群运维的烦恼。通过多元索引的范围查询、多条件组合查询、分组聚合等能力,实现了根据车辆多种状态检索聚合的功能。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
8月前
|
数据采集 监控 数据挖掘
如何更有价值采集电商数据,高效分析数据?
大数据,就是在一定时间范围内用常规工具软件对历史数据捕捉、处理,加以分析,进而改善决策和管理。在大数据时代,企业必须用大数据分析方法来做电商。
|
存储 NoSQL 应用服务中间件
如何高效存储海量GPS数据
GPS数据使用越来越广,但如何高性能存储海量GPS数据仍然具有挑战,本文会介绍一种非常适合存储GPS数据的存储系统:阿里云NoSQL数据库TableStore,同时会介绍多个不同场景的技术方案。
23166 0
|
3月前
|
传感器 存储 监控
TDengine 签约北微传感,实现海量传感器数据的秒级响应
在当今物联网(IoT)快速发展的背景下,传感器技术已成为各个行业数字化转型的关键组成部分。随着设备数量的激增和数据生成速度的加快,如何高效地管理和分析这些数据,成为企业实现智能化运营的重要挑战。
59 0
|
8月前
|
存储 边缘计算 物联网
未来数据存储技术发展趋势分析
随着数字化时代的到来,数据量不断增长,传统存储技术面临挑战。本文探讨未来数据存储技术的发展趋势,包括分布式存储、云存储、边缘计算等新兴技术的应用前景。
|
8月前
|
NoSQL 安全 物联网
检索时间减少83%!部署MongoDB后,通用电气医疗集团狠狠提升了物联网设备的利用效率!
作为医疗技术领域的全球领导者,通用电气医疗集团选择了 MongoDB由其管理旗下物联网设备,从部署(生命周期初期,即 BoL)到报废(生命周期结束,即 EoL)的整个生命周期
1951 3
检索时间减少83%!部署MongoDB后,通用电气医疗集团狠狠提升了物联网设备的利用效率!
|
8月前
|
供应链 NoSQL 物联网
链接全球数十亿台设备!物联网行业如何应对数据管理、实时分析和供应链优化的挑战?
物联网已成为面向未来的解决方案的关键组成部分,且其所蕴含的巨大经济价值潜力有待挖掘
1552 0
链接全球数十亿台设备!物联网行业如何应对数据管理、实时分析和供应链优化的挑战?
|
8月前
|
传感器 算法 安全
虹软智能驾驶技术:实现低成本高性能L2+级别ADAS功能
虹软正在积极推动面向舱外的智能驾驶视觉解决方案,以满足不断增长的智能汽车市场需求。随着虹软ADAS域控解决方案的逐渐成熟,将更进一步增强驾驶的安全性、便捷性和智能化水平,为智能汽车的未来发展注入新的动力。
|
8月前
|
存储 数据采集 监控
智慧工地整体方案,实现现场各类工况数据采集、存储、分析与应用
“智慧工地整体方案”以智慧工地物联网云平台为核心,基于智慧工地物联网云平台与现场多个子系统的互联,实现现场各类工况数据采集、存储、分析与应用。通过接入智慧工地物联网云平台的多个子系统板块,根据现场管理实际需求灵活组合,实现一体化、模块化、智能化、网络化的施工现场过程全面感知、协同工作、智能分析、风险预控、知识共享、互联互通等业务,全面满足建筑施工企业精细化管理的业务需求,智能化地辅助建筑施工企业进行科学决策,促进施工企业监管水平的全面提高。
395 0
|
存储
《交易风控小微金融业务跨平台数据共享与处理数据的海量存储与多种离线计算处理》电子版地址
交易风控小微金融业务跨平台数据共享与处理数据的海量存储与多种离线计算处理
69 0
《交易风控小微金融业务跨平台数据共享与处理数据的海量存储与多种离线计算处理》电子版地址
|
消息中间件 前端开发 Java
实时即未来,车联网项目之车辆驾驶行为分析【五】
单次行驶里程区间分布、单次行程消耗soc区间分布、最大里程分布、充电行程占比、平均行驶里程分布、周行驶里程分布、最大行驶里程分段统计、常用行驶里程、全国-每日平均行驶里程(近4周)、全国-单车日均行驶里程分布(近一年)、各车系单次最大行驶里程分布、不同里程范围内车辆占比情况。
374 0
实时即未来,车联网项目之车辆驾驶行为分析【五】