作者:李欣
背景
随着通信技术、计算机技术的不断发展,移动通信正在从人与人(H2H)向人与物(H2M)以及物与物(M2M)的方向发展,“万物互联”的概念正在逐步覆盖到各行各业中,例如智能家居、智能农业、智能交通、智能物流等领域。目前,车联网技术已经先行一步,在行车安全、交通管理、生活服务等方面得到充分应用。
车联网技术包括了车辆终端、云端、无线通信等方面。车辆终端实时产生大量车辆状态数据,例如里程、油量、胎压、坐标、温度、速度和操作等等。通过对这些数据的检索、分析,可以在环境感知、驾驶决策、行驶安全、出行规划等各个方面发挥作用。但是在车联网场景下,技术实现上也会面临几个难题:
● 数据写入并发度高:数据写入并发数取决于行驶中的车辆数。
● 数据检索延迟敏感:若数据检索出现延迟,则无法感知到最新的车辆状态信息,可能出现无法预料的后果。
● 数据规模庞大:每个车辆终端会记录几十个甚至更多维度的状态数据,周期性保存车辆的状态信息获取车辆状态时序数据。数据规模轻松达到亿行,甚至百亿行以上。
方案
MySQL + LogStash + Elasticsearch 方案
MySQL 自身具备强事务,可作为车辆状态数据存储库。数据量在一定级别内时,可满足业务的写需求。然而 MySQL 中的多列索引需要满足前缀匹配才能发挥效果,当查询条件不符合多列索引匹配规则时,可能会退化成全表扫描。这样一个慢 SQL 会增加服务端负载,使得 MySQL 服务性能降低。因此很容易想到需要一个新的索引引擎来做检索类查询的分流,将一些复杂的查询、分析类需求放到索引引擎中来完成。
Elasticsearch 是一款强大开源搜索和分析引擎,支持丰富的索引类型,通过引入 Elasticsearch 来提供检索、分析的能力,可以有效降低 MySQL 服务端负载。MySQL 数据通过 LogStash 或 Canal 工具同步到 Elasticsearch中。
方案的整体架构如下图:
- MySQL:作为数据存储主库,需能够支持高并发的状态数据变更写入,支持平台侧基于主键查询的能力。
- LogStash:作为 MySQL 与 Elasticsearch 之间的桥梁,负责将 mysql binlog 转换成 Elasticsearch 的数据结构并写入Elasticsearch。
- Elasticsearch:作为存储系统索引引擎,负责承载检索、分析聚合类的请求流量。
MySQL + Elasticsearch的方案很好地解决了检索、分析类的业务需求。不过整个存储系统仍然存在一些问题:
- Elasticsearch 集群搭建与运维复杂度高,一旦出现问题非常难排查。
- Elasticsearch 成本较高。需要根据业务规模预测机器数,无法做到弹性扩容。
MySQL + Canal + Tablestore 方案
Tablestore 是阿里云自研的一款多模型结构化数据存储,能够支持 PB 级存储、千万 TPS 写入以及强大的数据检索、分析能力。Tablestore 具备两种存储引擎,数据表基于 LSM-tree 架构能支持高并发低延迟的读写能力,多元索引基于倒排索引、空间索引能支持丰富的数据检索方式,例如多列组合查询、模糊查询、匹配查询、范围查询等等。
在车辆网场景中,Tablestore 的数据表可支持千万级的并发读写,可实现车辆状态实时更新。多元索引引擎提供了百亿行数据毫秒级的检索的能力,可实现根据车辆多种状态查询。Tablestore 服务端实现了数据表与多元索引之间自动数据同步,保证了两者的最终一致性。Tablestore 方案实现车联网存储系统的架构图如下:
下面会重点给大家介绍下如何实现:MySQL + Canal + Tablestore 的方案。
实现
开通 Tablestore 服务和创建表
- 创建按量模式实例,填写实例名。详情请参考 如何开通表格存储服务、如何创建 Tablestore 实例。
- 创建 auto_mobile 表。保存车辆状态数据信息。详情请参考 如何创建 Tablestore 数据表。
- 创建 his_auto_moile 表。保存车辆状态数据时序信息。
准备数据:car-ots-demo(可选步骤)
本文中测试场景中模拟创建了 auto_mobile、his_auto_mobile 两张表,若需要同步其他业务表,请忽略此步骤。
- 以linux为例,运行命令
shell tar -zxvf car-ots-demo.tar.gz
解压,解压后目录中包含 application.yml 和 test-1.0-SNAPSHOT.jar。 配置 application.yml 文件。
- server.port:压测项目运行端口号。如:8082
- spring.datasource.jdbc:MySQL数据源地址,如:rm-bp13dz7yedd2d9zyr.mysql.rds.aliyuncs.com。
- spring.datasource.user: MySQL 数据源账号。
- spring.datasource.password: MySQL 数据源密码。
- 运行命令
shell nohup java -jar -Dspring.config.location=application.yml test-1.0-SNAPSHOT.jar&
启动压测程序。 - 运行命令
shell curl "localhost:8082/car/prepareTable" -X POST
创建MySQL表。
MySQL 实时同步 Tablestore
可以通过两种方案来将 MySQL 数据实时同步到 Tablestore 中。
- 数据传输服务 DTS 同步
关于 DTS 迁移方案可参考文档 MySQL 同步至 Tablestore。
- Canal 工具同步。Canal 是阿里巴巴开源 CDC 工具,可基于解析 MySQL Binlog 将增量数据传输到下游,表格存储团队实现了 TablestoreAdapter 用于写入数据到 Tablestore。
a. 下载同步工具 Canal。运行命令shell unzip canal.zip
解压压缩包。
b. 配置 deployer 和 canal-adapter。
c. 启动 deployer 和 canal-adapter。
说明:Canal 同步任务配置参数不在本文章中介绍,关于同步任务的配置项、启动步骤说明。请参考 CarmobileExample中的 READ.ME。
MySQL 测试数据写入
数据压入 jar 包 car-ots-demo 中已提供了测试数据写入接口,运行命令 shell curl "localhost:8082/car/press?carNum=10&point=10" -X POST
即可。其中 carNum 表示车辆数,point 表示车辆状态记录次数。本文测试场景carNum=1000000, point=10。
- 运行写入命令:
- 写入MySQL日志(car-ots-demo/logs/example.log):
- Canel数据同步日志(canal-adapter/logs/adapter/adapter.log):
- 之前已经建立了同步链路,这个时候增量数据会自动同步到 Tablestore:
Tablestore 创建多元索引
登录表格存储控制台,进入索引管理页面,点击创建多元索引。多元索引创建后,数据表中的存量和增量数据将自动同步到索引中。更多关于创建多元索引的介绍请参考 创建和使用多元索引。
Tablestore 查询功能展示
- 案例一。查询油量小于 120.0 并且胎压小于 2.3 的车辆 ID,取 3 条记录。
查询代码:
//查询油量小于 120.0 并且胎压小于 2.3 的车辆ID,取 3 条记录
public static void searchDemo01(SyncClient client){
SearchRequest searchRequest = SearchRequest.newBuilder()
.tableName("auto_mobile")
.indexName("auto_mobile_index")
.searchQuery(SearchQuery.newBuilder()
.query(QueryBuilders.bool()
.must(QueryBuilders.range("oil").lessThan(120.0))
.must(QueryBuilders.range("typepressure").lessThan(2.3))
)
.limit(3)
.build())
.addColumnsToGet(Arrays.asList("car_ID"))
.build();
long sys1 = System.currentTimeMillis();
SearchResponse searchResponse = client.search(searchRequest);
long sys2 = System.currentTimeMillis();
System.out.println("cost time: " + (sys2-sys1) + "ms");
for(Row row : searchResponse.getRows()){
System.out.println(row);
}
client.shutdown();
}
结果展示:
cost time: 12ms
[PrimaryKey:]car_md5ID:0000104cd168386a335ba6bf6e32219d, car_ID:848775
[Columns:]
[PrimaryKey:]car_md5ID:000093856b4e947511870f3e10464129, car_ID:646434
[Columns:]
[PrimaryKey:]car_md5ID:0001181bf1ad8f82dcf59c7c18343bd5, car_ID:752608
[Columns:]
SQL查询:
select * from auto_mobile where oil < 120.0 and typepressure < 2.3 limit 3;
SQL查询结果:
- 案例二。统计距离 car_md5ID = '0000104cd168386a335ba6bf6e32219d' 五公里内的车辆数。
查询代码:
//统计距离car_md5ID='0000104cd168386a335ba6bf6e32219d'五公里内的车辆数。
public static void searchDemo02(SyncClient client){
SearchRequest searchRequest = SearchRequest.newBuilder()
.tableName("auto_mobile")
.indexName("auto_mobile_index")
.searchQuery(SearchQuery.newBuilder()
.query(QueryBuilders.geoDistance("location")
.centerPoint("30.968772887652293,120.94512114660778")
.distanceInMeter(5000)
)
.getTotalCount(true)
.build())
.build();
long sys1 = System.currentTimeMillis();
SearchResponse searchResponse = client.search(searchRequest);
long sys2 = System.currentTimeMillis();
System.out.println("cost time: " + (sys2-sys1) + "ms");
System.out.println("车辆数:" + searchResponse.getTotalCount());
client.shutdown();
}
结果:
cost time: 18ms
车辆数:6026
- 案例三。查询所有车辆,统计车辆数。并按照里程段分组。
查询代码:
//查询所有车辆,统计车辆数。并按照里程段分组。
public static void searchDemo03(SyncClient client){
SearchRequest searchRequest = SearchRequest.newBuilder()
.tableName("auto_mobile")
.indexName("auto_mobile_index")
.searchQuery(SearchQuery.newBuilder()
.query(QueryBuilders.matchAll())
.addGroupBy(GroupByBuilders.groupByRange("groupByMileage","mileage")
.addRange(Double.MIN_VALUE,2000)
.addRange(2000,4000)
.addRange(6000,8000)
.addRange(8000,Double.MAX_VALUE)
)
.build())
.build();
long sys1 = System.currentTimeMillis();
SearchResponse searchResponse = client.search(searchRequest);
long sys2 = System.currentTimeMillis();
System.out.println("cost time: " + (sys2-sys1) + "ms");
GroupByRangeResult groupByRangeResult = searchResponse.getGroupByResults().getAsGroupByRangeResult("groupByMileage");
for(GroupByRangeResultItem groupByRangeResultItem : groupByRangeResult.getGroupByRangeResultItems()){
System.out.println("["+groupByRangeResultItem.getFrom()+","+groupByRangeResultItem.getTo()+"] : "+groupByRangeResultItem.getRowCount());
}
client.shutdown();
}
结果展示:
cost time: 98ms
[-Infinity,2000.0] : 199192
[2000.0,4000.0] : 200569
[4000.0,6000.0] : 199379
[6000.0,8000.0] : 200708
[8000.0,Infinity] : 200152
最后
表格存储 Tablestore 中多元索引功能可支持在大规模数据场景下实现毫秒级检索,以及轻量级分析的能力。通过表格存储控制台一键索引,自动完成全增量数据同步,省去了索引集群运维的烦恼。通过多元索引的范围查询、多条件组合查询、分组聚合等能力,实现了根据车辆多种状态检索聚合的功能。