前言
在大数据时代,数据已经渗透到当今每一个行业和业务职能领域,成为重要的生产要素。随着业务和数据量的不断增长,性能和成本的权衡变成了大数据系统设计面临的关键挑战,这里甚至会导致原有系统进行架构改造或者数据迁移。所以在架构设计之初,我们就需要把整套架构的成本考虑进来,这对应的就是数据的分层存储和存储计算引擎的选择。
Delta Lake是DataBricks公司推出的一种新型数据湖方案,围绕数据流入、数据组织管理、数据查询和数据流出,推出了一系列功能特性,同时提供了数据操作的ACID和CRUD。通过结合Delta Lake和上下游组件,可以搭建出一个便捷、易用、安全的数据湖架构。在数据湖架构设计中,通常我们会应用HTAP(Hybrid transaction/analytical processing)体系结构,通过合理的选择分层存储组件和计算引擎,既能支持海量数据分析和快速的事务更新写入,又能有效的进行冷热数据的分离进而降低成本。本文将主要围绕数据分层存储展开,更多架构设计的干货请参见文章:结构化大数据分析平台设计。
冷数据
数据按照实际访问的频率可以区分为热数据、温数据和冷数据,其中冷数据数据量较大,很少被访问,甚至整个生命周期都可能不会被访问,只是为了满足业务合规或者特定场景需求需要在一定时间内保存。通常我们有两个方式来区分冷热数据:
- 按数据的创建时间:常见于交易类数据,时序监控,IM聊天等场景,通常数据写入初期用户的关注度较高且访问频繁,而随着时间的推移,旧的数据的访问频率会越来越低,仅存在少量查询甚至完全不查询。
- 按照访问热度:有些数据的访问频率并非按时间,例如某些大V的旧博客由于某些原因突然大量被访问到,这样的冷数据也会变成热数据。这个时候就不应该再按时间区分,需要根据具体的业务和数据分布规律来区分冷热。
本文主要讨论按数据创建时间的冷热数据分层,而对于按访问热度的数据分层,通常可以采用业务打标或系统自识别(添加缓存)等手段。
冷热分层适用场景
从冷热数据的区分,可以看出,冷数据具备以下一些特点:
- 数据量大:不同于热数据,冷数据通常需要保存较长时间甚至是所有时间的数据。
- 成本敏感:数据量大且访问频率较低,不宜投入过多的成本。
- 性能要求不高:这里是相对的概念,相比于一般的TP请求不需要查询在毫秒级别返回,冷数据的查询可以容忍到数十秒甚至更长时间才出结果,或者可以进行异步处理。
- 业务场景较简单:对于冷数据基本都是批量的写入和删除,一般没有更新操作。在查询时,一般只需要读取指定条件的数据,且查询条件不会过于复杂。
针对于冷数据的这些特点,不难挖掘出一些冷热分层适用的场景:
时序类数据场景:时序数据天然具备时间属性,数据量大,且几乎只做追加操作。时序数据无处不在,常见于监控类数据、交易类数据、物联网数据、环境监测等场景。
- IM场景:如钉钉,用户一般会查阅最近的若干条聊天记录,历史的数据一般只有在有特殊需求的时候才会去查询。
- 监控场景: 如云监控,用户通常只会查看近期的监控,历史数据一般只有在调查问题或者制定报表时才会去查询。
- 账单场景:如支付宝,我们通常只会查询最近几天或者一个月内的账单,超过一年以上的账单基本不会去查询。
- 物联网场景:如IOT,通常设备近期上报的数据是热点数据,会经常被分析,而历史数据的分析频率都较低。
- 归档类场景:对于重写轻读的数据,可以将数据定期归档到成本更低的存储组件或更高压缩比的存储介质中,达到降成本的目的。
冷热数据方案设计
在冷热数据分层方案的设计中,我们考虑以下几个核心的问题:
- 冷热数据如何进行高效分离:传统的在线库定期投递和冷热库双写方式方式都存在一定的弊端,目前主流的有两种思路:基于日志的增量导出方案(e.g. CDC技术)和 存储产品本身构建多级冷热分层能力。
- 冷热数据如何进行异构:异构的目的主要是进行差异化存储,例如通过不同的存储格式(行存、列存),压缩算法,物理存储介质等手段进行分离,进而达到降成本的目的。
- 冷热数据如何进行查询:冷热分层架构查询的核心在于进行高效和准确的进行请求路由,通常可以借助于客户端请求提供的一些hint信息或者一些元信息(例如索引主键),进行路由决策,尽可能的去多拉取热数据享受速度红利。
海量结构化数据Delta Lake架构
针对于结构化冷热分层数据场景,表格存储(Tablestore)[注1]联合EMR团队推出了一个海量结构化数据的Delta Lake架构。针对于冷热数据方案设计的几个问题,都进行了很好的解决。基于表格存储的通道服务[注2],可以将原始数据利用CDC技术派生到多种存储组件中,例如Delta Lake和Tablestore引擎自身的列存中,进而完成冷热数据的分离和异构。同时表格存储提供灵活的上游数据入口和TTL功能,用户可以定制热数据的生命周期,将冷数据不断的实时投递到Delta Lake和列存中,达到降成本的目的。最后对于计算和查询层,Tablestore结合Spark可以完成对冷热数据的全增量一体的定制化计算,并可以最终将计算结果存入Tablestore的索引引擎中进行统一的查询。
注1: 表格存储(Tablestore)是阿里云自研的面向海量结构化数据存储的Serverless NoSQL多模型数据库,被广泛用于社交、物联网、人工智能、元数据和大数据等业务场景。提供兼容HBase的WideColumn模型、消息模型Timeline以及时空模型Timestream,可提供PB级存储、千万TPS以及毫秒级延迟的服务能力。
注2:通道服务(Tunnel Service)是基于表格存储数据接口之上的全增量一体化服务,通过为数据表建立数据通道,您可以简单地实现对表中历史存量和新增数据的消费处理。
冷热分层实战
本部分会结合Tablestore和Delta Lake进行数据湖冷热分层的具体实战。
数据源说明
数据源是一张简单的原始订单表OrderSource,表有两个主键UserId(用户ID)和OrderId(订单ID)和两个属性列price(价格)和timestamp(订单时间)。使用Tablestore SDK的BatchWrite接口进行订单数据的写入,订单的时间戳的时间范围为最近的90天(本文的模拟时间范围为2020-02-26~2020-05-26),共计写入3112400条,原始数据的样例如下图所示。
在模拟订单写入时,对应Tablestore表中的属性列的版本号也会被设置为相应的时间戳,这样通过配置表上的TTL属性,当写入数据的保留时长超过设置的TTL后,系统会自动清理对应版本号的数据。
实时流式投递
- 创建数据源表,同时在表格存储控制台上创建增量通道,利用通道提供的CDC(日志变更捕获)技术将新增的主表数据不断同步到Delta中,创建得到的通道ID将会用于后续的SQL配置。
- 在EMR集群的Header机器上启动streaming-sql交互式命令行。
streaming-sql --master yarn --use-emr-datasource --num-executors 16 --executor-memory 4g --executor-cores 4
在第2步启动的streaming-sql中运行以下命令,依次完成:
- Tablestore源表创建:本例中会创建
order_source
源表,其中OPTIONS参数中catalog为表字段的Schema定义(本例中对应UserId, OrderId, price和timestamp四列)。 - Delta Lake Sink表创建:本例中创建写完Delta的
delta_orders
目的表,LOCATION中指定的是Delta文件存储的位置。 - Tablestore源表上创建增量SCAN视图: 本例创建
incremental_orders
的流式视图,其中OPTIONS参数中tunnel.id为第1步创建的增量通道ID,maxoffsetsperchannel为通道每个分区(每个Spark微批)写的最大数据量。 - 启动Stream作业进行实时投递:本例中会根据Tablestore的主键列(UserId和OrderId)主键进行聚合,同时根据CDC日志的操作类型(PUT,UPDATE,DELETE)转化为对应的Delta操作。特别说明的是
__ots_record_type__
是Tablestore流式Source提供的预定义列,表示的是行操作类型。
- Tablestore源表创建:本例中会创建
// 源表和目的表
// 1. 创建源表
DROP TABLE IF EXISTS order_source;
CREATE TABLE order_source
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderSource",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"},"price": {"col": "price", "type": "double"}, "timestamp": {"col": "timestamp", "type": "long"}}}',
);
// 2. 创建Delta Lake Sink: delta_orders
DROP TABLE IF EXISTS delta_orders;
CREATE TABLE delta_orders(
UserId string,
OrderId string,
price double,
timestamp long
)
USING delta
LOCATION '/delta/orders';
// 3. 在源表上创建增量SCAN视图
CREATE SCAN incremental_orders ON order_source USING STREAM
OPTIONS(
tunnel.id="324c6bee-b10d-4265-9858-b829a1b71b4b",
maxoffsetsperchannel="10000");
// 4. 启动Stream作业,将Tablestore CDC数据实时同步到Delta Lake中。
CREATE STREAM orders_job
OPTIONS (
checkpointLocation='/delta/orders_checkpoint',
triggerIntervalMs='3000'
)
MERGE INTO delta_orders
USING incremental_orders AS delta_source
ON delta_orders.UserId=delta_source.UserId AND delta_orders.OrderId=delta_source.OrderId
WHEN MATCHED AND delta_source.__ots_record_type__='DELETE' THEN
DELETE
WHEN MATCHED AND delta_source.__ots_record_type__='UPDATE' THEN
UPDATE SET UserId=delta_source.UserId, OrderId=delta_source.OrderId, price=delta_source.price, timestamp=delta_source.timestamp
WHEN NOT MATCHED AND delta_source.__ots_record_type__='PUT' THEN
INSERT (UserId, OrderId, price, timestamp) values (delta_source.UserId, delta_source.OrderId, delta_source.price, delta_source.timestamp);
冷热数据查询
在实际的设计中,我们一般会把热数据保存在Tablestore表中进行高效的TP查询,冷数据或全量数据保存在Delta中。通过配置Tablestore表的生命周期(TTL),我们可以灵活的对热数据量进行控制。
首先,在配置主表的TTL之前,我们对源表(order_source)和目的表(delta_orders)进行一些查询,此时两边的查询结果保持一致。
接着,我们配置Tablestore的TTL为最近30天,这样Tablestore中的数据(热数据)只有最近的30天数据,而Delta中依旧保留的是全量数据,进而达到冷热分层的目的。
最后,展示一些分层之后的简单查询,具体的查询路由需要结合业务逻辑进行一些路由选择。分层之后热数据总条数为1017004条,冷数据(全量数据)保持不变依旧为3112400条。
结语
本文首先对冷热数据的特点和适用场景进行了一些介绍,接着对海量结构化数据的冷热分层进行了方案和架构的设计,最后通过Tablestore结合Delta Lake对冷热分层进行了生动的实战演示。希望通过海量结构化数据的冷热分层一体化架构和实践,让计算和存储的资源得到充分利用,进而让业务能够用更低的成本承载更优质的服务,
参考链接
写在最后
对大数据存储分析感兴趣的同学,随时欢迎同我们进行交流,E-MapReduce钉钉群号:21784001,Tablestore钉钉群号:11789671(1群)、23307953(2群)。