前言
本地生活服务近年来越来越让我们的自身生活变得便捷。包括社区服务、生活配送、同城跑腿、餐饮、电影票务、休闲娱乐、智慧出现等。已经有大量的互联网企业在其中发现商机,帮忙用户改善原有的生活方式。而这所有的服务都是基于位置的服务(Location Based Services,LBS)。位置信息是提供本地差异化服务的关键信息。如何基于位置与时间信息来帮助更好的关联用户与本地服务是提供架构设计的核心。
原有架构
原有架构的问题:
- 系统扩展性差:基于 Hadoop 体系,存储计算一体化,无法根据计算与存储动态调整比例。需要机器级别来进行扩容。同时还需要花大量精力维护整体服务。
- 数据分析效率过慢:位置信息一般存储在类似 HBase 这样的架构中,但由于 HBase 本质来说是一个只支持简单查询的行存。一次分析请求往往需要扫描全表才能出结构。千亿规模下,需要小时级别才能出结果。
- 数据存储成本过高:历史数据的价值往往较低,单一的存储方案无法为历史数据的提供更好的成本节省方案。
本文将针对本地生活用户圈选的场景下,介绍如何通过 Tablestore 进行数据分析。
Tablestore 是什么
Tablestore 启发自 Google 的 Bigtable 论文,从 2009 年开始,在阿里云的飞天团队内,开始孵化。经过10年的锤炼,如今在集团、公有云和专有云上积累了各式各样的客户和场景。
Tablestore 是一款 Serverless 云原生存储引擎,Serverless 相比实例售卖类型的产品,在业务有波峰波谷时天生就有较大的优势,基于 Bigtable 的主存储采用行的方式进行存储,可以支撑单表亿级别的 QPS。下面列了一些 Tablestore 的核心特性:
Tablestore 除了有强大的主存储满足海量业务的实时读写外,基于主存储的分布式日志提供了完整的数据派生能力(详情参考),海量实时写入 Tablestore 的数据,可以实时订阅进行消费。这样就满足了我们的实时计算需求。
Lambda 架构中除了实时数据写入,实时计算之前,全量数据需要提供高性能扫描能力,Tablestore 采用行列混合,双引擎的架构,在主存储之外内部通过通道服务实时构建一个列存储,支撑 PB 级别数据的高吞吐扫描。同时在海量的数据场景下,我们相信数据是需要分层存储,所以在构建自身列存的同时,我们会帮助用户构建推送云上数据湖的链路,通过全托管的数据湖投递,降低用户的存储成本。
基于 Tablestore 的 Lambda 架构
Tablestore 在专注于打造一款极致性能和成本的存储引擎同时,更加关注完整的计算生态,伴随产品核心功能迭代的过程中,我们和阿里云的几大核心计算引擎做了完善的对接具体包括:
- MaxCompute 的对接,支持 MaxCompute 计算引擎通过外表的方式直读写 Tablestore
- EMR Spark 对接,支持流批源表读,流批结果表写,集团内第一款全 Connector 支持的 KV 存储引擎
- Blink 对接,支持流批源表读,流批结果表写,维表读,集团内第一款全 Connector 支持的 KV 存储引擎
- DLA 对接,支持 SQL 直接读写 Tablestore 的数据
- FC 对接,支持流式增量触发器
数据与圈选场景定义
前言中介绍了圈选场景的大致概念,在本章中定义一个较为简单的圈选场景,供本文的后续内容使用,具体如下:
- 数据:用户的数据规模约1亿条;数据格式有三列,其中包括id, location, locationTime,分别代表用户ID,用户发生行为时的地点(通过经纬度表示),以及用户发生行为的时间。
- 圈选场景:
a. 通过时间范围进行行为圈选用户
b. 通过时间范围+地理区域进行用户数统计
热数据分析
前提条件 - 开通 EMR 服务
- 已创建 E-MapReduce Hadoop 集群。具体操作,请参见创建集群。
创建集群时,请确保打开挂载公网开关,将集群挂载到公网,用于Shell远程登录服务器。
说明:本文使用Shell命令演示,如果需要使用 E-MapReduce 的图形化页面进行数据开发。具体操作,请参见数据开发。
通过时间圈选 - 主键查询
步骤一 创建 Tablestore 表
本文中所创建出来的表名为 workshop_location, 其中 locationTime、id 作为数据表的主键,分别代表用户ID,用户发生行为的时间,location 为数据表的预定义列,代表用户发生行为时的地点,数据示例如下图所示。
步骤二 创建 Tablestore 在 EMR 的外表
- 登录 EMR Header 服务器。
- 执行如下命令启动 spark-sql 命令行,用于 Spark 外表创建和后续的 SQL 实战操作。
其中 Spark 的资源参数为--num-executors 2 --executor-memory 4g --executor-cores 1,可以根据具体的集群配置进行自定义调整。 表示上传 jar 包的版本信息,请根据实际填写,例如 2.2.0-SNAPSHOT。
spark-sql --driver-class-path emr-datasources_shaded_2.11-<Version>.jar \
--jars emr-datasources_shaded_2.11-<Version>.jar \
--master yarn --num-executors 2 --executor-memory 2g --executor-cores 1
- 创建EMR外表workshop_location(对应表格存储的workshop_location表)。
DROP TABLE IF EXISTS workshop_location;
CREATE TABLE workshop_station
USING tablestore
OPTIONS(
endpoint="http://instance_name.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="instance_name",
table.name="workshop_location",
catalog='{"columns": {"locationTime": {"type": "long"}, "id": {"type": "long"},"location": {"type": "string"}}}'
);
参数说明:
参数 | 说明 |
---|---|
endpoint | 表格存储实例访问地址,EMR 集群中使用 VPC 地址 |
access.key.id | 阿里云账号的 AccessKey ID |
access.key.secret | 阿里云账号的 AccessKey Secret |
instance.name | 实例名称 |
table.name | 表格存储的数据表名称 |
catalog | 数据表的 Schema 定义。后续版本中会支持 DDL 上配置字段 Schema,到时本参数将不再必要 |
步骤三 数据分析
查询1:统计总数据条数
select count(1) from workshop_location;
查询结果:
查询2:统计 11.11 日 0:00-0:05 的行为个数
select count(1) from workshop_location
where locationTime >= 1605024000000 and locationTime <= 1605024300000;
查询结果:
通过地理信息圈选 - 非主键查询 & 时空数据查询
步骤一 创建 Tablestore 多元索引
Tablestore 多元索引的详细使用方式请参考 官方文档 - tablestore多元索引,本文中所创建出来的表名为 workshop_location_index, 其中 location 配置成地理位置。具体字段显示如下:
注意:- 地理位置在tablestore中需配置成string类型,格式为 "latitude,longitude",如"31.12,121.30"代表上海。
步骤二 创建 Tablestore 在 EMR 的外表
登录 EMR header 机器和启动 spark sql 的方式和前文相同,不再赘述。
创建使用多元索引的外表方式如下:
DROP TABLE IF EXISTS workshop_location_index;
create TABLE workshop_location_index
USING tablestore
OPTIONS(
endpoint="http://instance_name.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="instance_name",
table.name="workshop_location",
catalog='{"columns": {"locationTime": {"type": "long"}, "id": {"type": "long"},"location": {"type": "string"}}}',search.index.name="workshop_location_index"
);
多元索引相关参数说明:
参数 | 说明 |
---|---|
search.index.name | 表格存储数据表的多元索引名 |
步骤三 数据分析
查询1:统计距离 (50,50) 1 米的行为个数
select count(1) from workshop_location_index
where location = '{\"centerPoint\":\"50,50\", \"distanceInMeter\":1.0}';
查询结果:
查询2:列举 11.11 日 0:00-0:05 距离 (50,50) 1 米的用户列表
select id,location,locationTime from workshop_location_index
where location = '{\"centerPoint\":\"50,50\", \"distanceInMeter\":1.0}'
and (locationTime >= 1605024000000 and locationTime <= 1605024300000);
查询结果:
冷数据分析与归档
Tablestore 提供数据湖投递的能力,将主表数据同步到 OSS,以 parquet 文件的方式存储。利用数据湖投递可以实现如下场景需求:
- 冷热数据分层数据湖投递结合表格存储的数据生命周期功能,可以快速实现 OSS 低成本存储全量数据,表格存储提供热数据的低延迟查询和分析的需求。
- 全量数据备份数据湖投递可以自动将表格存储的全表数据投递到 OSS Bucket 中,作为备份归档数据。
- 大规模实时数据分析数据湖投递可以实时(每2分钟)投递增量的表格存储数据到 OSS,投递的数据支持按系统时间分区、Parquet 列存格式存储;再利用 OSS 的高读带宽和列存面向扫描场景优化实现高效实时数据分析。
- 加速 SQL 分析性能当表格存储数据未建立多元索引且查询条件中不包含主键列的过滤条件时,可以通过数据投递自动同步数据到 OSS,再利用 DLA+OSS 数据扫描实现 SQL 分析加速。
数据投递到 OSS
Tablestore 原生提供将主表数据投递到 OSS 的能力,详细文档请参考官方文档 - 数据湖投递。
本文中创建了任务 workshop_x,将数据投递到 OSS 上。
OSS上文件如下图所示:
用 DLA 分析
开通 DLA 服务的方式详见官方文档 - DLA。
具体步骤为:
- 配置元数据爬取。注意:配置完元数据爬取后,DLA 将自动创建外表,供后续分析使用。
- 进行数据分析:
用 EMR + JindoFS 分析
开通 EMR 与 JindoFS 服务的方式详见 官方文档 - EMR JindoFS。
具体分析步骤如下:
- 配置 SmartData 中 JindoFS 规则:namespace 和 OSS 挂载点,开启缓存。在配置完后注意重启 Jindo Namespace Service 以生效配置。
- 创建外表,并修复表结构。注意在每次数据存在改动后,请务必执行下 msck repair table 进行新数据感知。
DROP TABLE IF EXISTS workshop_oss;
CREATE EXTERNAL TABLE `workshop_oss`(`locationTime` bigint, `id` bigint, `location` string)
PARTITIONED BY (`year` int, `month` int, `day` int)
STORED AS PARQUET LOCATION 'jfs://test/';
msck repair table workshop_oss;
- 进行数据分析
select count(1) as count from workshop_oss;
由于 JindoFS 存在缓存加速,所以数据查询越多,数据返回越快,数据结果如下所示。
第一次查询:
第二次查询:
第三次查询:
后语
本文将针对本地生活用户圈选的场景下,介绍了如何通过 Tablestore 进行冷热数据分析。
热数据分析建议使用计算引擎(如本文介绍的 EMR Spark)+ Tablestore 主表或者多元索引。Tablestore 可以提供流批一体存储能力,同时可以提供分析与查询两种能力。
冷数据分析建议使用数据湖投递,通过 OSS 存储冷数据,再基于计算引擎(如本文介绍的 EMR Spark 和 DLA)进行分析。
欢迎加入
表格存储 Tablestore 推出了很多贴近用户场景的文章与示例代码,欢迎大家加入我们的钉钉公开交流群一起讨论,群号:23307953。(1 群已满员,欢迎加入 2 群)