核桃编程Delta Lake实时数仓应用实践

简介: 本文简述了核桃编程应用EMR建设Delta Lake实时数仓的实践。

作者:
卢圣刚,核桃编程数据架构师,拥有多年的大数据开发和架构经验。曾担任易观数据挖掘工程师,熊猫TV大数据架构师。


核桃编程简介

核桃编程成立于2017年8月9日,作为少儿编程教育行业的领导者,始终秉持“让每个孩子爱学习、会学习,让优质的教育触手可及”的使命,致力于以科技手段促进编程教育,凭借首创的AI人机双师教学模式与十级进阶课程体系,实现规模化因材施教,“启发中国孩子的学习力”。截止2019年8月,核桃编程已经成为付费学员规模最大的少儿编程教育机构,帮助超过65万名孩子收获学习兴趣,锻炼编程技能,养成良好思维习惯,学员复购率超91%,学员完课率高达98%,在线原创作品1873万份。

1.业务现状

业务需求

  • 业务上固定时间开课,在开课时间内,班主任需要实时/准实时地知道学生的学习情况
  • 数据统计维度一般都是按班级,学期汇总,时间范围可能是几个月,甚至一年
  • 业务变化快,需要及时响应业务变化带来的指标逻辑变更

数据源

image.png

架构改造前方案

现有指标都是将Kafka/Mysql等的数据写入HDFS,使用Hive离线批处理,每10分钟执行一次,循环统计历史累计指标,再定时把数据同步到Mysql,提供给数据后台查询。如下图所示:
image.png

遇到的问题

随着计算的数据量越来越大,逐渐不能满足业务的更新频率要求。

  • 使用Apache Sqoop做全量数据同步,会对业务Mysql库/HDFS造成压力。
  • 使用Apache Sqoop做增量同步,一般只能使用某个时间字段(例如update time)来同步新修改的数据。这样在做分区表时,需要比较复杂的离线合并。
  • 随着数据越来越大,同步以及处理时间会越来越长,满足不了业务实时性需求。

2.实时数仓方案调研

离线的同步方案已经不能满足业务需求,计划迁移到实时方案上来,并做了一些调研。

迁移流式计算的问题

开发周期长

现有离线任务基本都是动辄几百行SQL,逻辑复杂,把所有逻辑迁移到流式计算,开发难度和改造成本都比较大。
例如离线增量同步,需要先同步全量base数据

sqoop import  \
--hive-import \
--hive-overwrite \
--connect jdbc:mysql://<mysqlurl>  \ 
--table <mysqltable> \
--hive-table <table_base> \
--hive-partition-key <parcolumn> \
--hive-partition-value <par1>

再消费增量binlog数据,流式写入到hive外部表,最后将两个表合并

insert overwrite table <result_storage_table>select
<col1>,
      
<col2>,
       <colN>
  from(select
row_number() over(partition by t.<primary_key_column>
 order by record_id
desc, after_flag desc) as row_number, record_id, operation_flag, after_flag,
<col1>, <col2>, <colN>
  from(select
incr.record_id, incr.operation_flag, incr.after_flag, incr.<col1>,
incr.<col2>,incr.<colN>
  from
<table_log> incr
 where
utc_timestamp< <timestamp>
 union all select 0
as record_id, 'I' as operation_flag, 'Y' as after_flag, base.<col1>,
base.<col2>,base.<colN>
  from
<table_base> base) t) gtwhere record_num=1 
  and
after_flag='Y'

而应用Delta Lake只需要一个streaming sql即可实现实时增量同步。

CREATE SCAN <SCAN_TABLE> on <STREAM> using
stream;
CREATE STREAM job
OPTIONS(
checkpointLocation='/cdc',
triggerInterval=30000
)
MERGE INTO <CDC_TABLE> as target
USING (
 SELECT
 from_unixtime(<col2>,'yyyyMMdd') as
par_date,
 <col1>
 FROM(
  SELECT 
  recordId, 
  recordType, 
  CAST(before.id as
LONG) as before_id,
  CAST(after.id as
LONG) as id,
 
after.<col1>,
  after.ctime,
  dense_rank() OVER
(PARTITION BY coalesce(before.id,after.id) ORDER BY recordId DESC) as rank
  FROM (
   SELECT 
   recordId, 
   recordType, 
  
from_json(CAST(beforeImages as STRING), 'id STRING, <col1>
<coltype1>,ctime string') as before,
  
from_json(CAST(afterImages as STRING), 'id STRING, <col1>
<coltype1>,ctime string') as after
   FROM (
    select
from_avro(value) as (recordID, source, dbTable, recordType, recordTimestamp,
extraTags, fields, beforeImages, afterImages) from <SCAN_TABLE>
   ) binlog WHERE
recordType != 'INIT'
  ) binlog_wo_init
 ) binlog_extract
WHERE rank=1
) as source
ON target.id = source.before_id
WHEN MATCHED AND source.recordType='UPDATE' THEN
UPDATE SET *
WHEN MATCHED AND source.recordType='DELETE' THEN
DELETE
WHEN NOT MATCHED AND (source.recordType='INSERT' OR
source.recordType='UPDATE') THEN
INSERT *;

数据恢复困难

对离线任务来说数据恢复只需要重新执行任务就行。

但对流式计算,当数据异常,或者逻辑变更,需要重新跑全量数据的时候,只能离线补历史数据,再union实时数据。因为Kafka不可能存所有历史数据,而且从头消费追数据时间也会很久。

而为了满足快速恢复的需求,所有指标都需要从一开始准备离线和实时两套代码,类似Lambda架构。

数据验证困难

Kafka在大数据架构中一般充当消息队列的角色,数据保存周期较短。全量历史数据,会消费Kafka写到HDFS。如果一个指标计算了一个月,发现计算结果有异常,很难追溯是当时Kafka数据有问题,还是计算逻辑有问题。HDFS数据虽然可以用来排查,但是HDFS里的数据和当时Kafka的数据是否一致,是不能保证的。

希望满足的功能

正因为迁移流式作业会有一些迁移成本和问题,所以对实时计算方案提出了一些功能要求。

开发灵活

互联网公司业务发展速度快,人力资源比较紧张,需要更低成本更快捷的开发新指标,满足业务敏捷性的要求。

重跑历史数据方便

业务指标的定义经常发生变更,一旦变更,或者有新的数据指标就需要从最早开始消费。但是历史数据通常非常多,而且一般实时数据源Kafka也不可能存历史所有数据。

数据异常时容易排查问题

以离线数仓为例,几百行的SQL,可以分段执行,来逐步排查。Flink可以埋metrics获取中间过程。

3.基于Delta Lake实时数仓方案

Delta Lake

Delta Lake是美国Databricks开源的数据湖技术,基于Apache Parquet丰富了数据管理功能,如元数据管理/事务/数据更新/数据版本回溯等。使用Delta Lake可以很方便的将流处理和批处理串联起来,快速构建Near-RealTime的Data Pipeline.
image.png

目前阿里巴巴E-MapReduce(简称“EMR”)团队对Delta Lake做了很多功能和性能上的优化,并和Spark做了深度集成,主要以下方面,更多信息详见EMR官方文档

  • SparkSQL支持Update/Delete/Merge Into/Optimize/Vacuum等语法来操作Delta Lake
  • 自研SparkStreaming SQL,支持Delta Lake的相关DML操作
  • Hive&Presto On Delta Lake
  • Delta Lake On OSS(阿里云对象存储)
  • Delta Lake事务冲突检测优化
  • DataSkipping & Zorder性能优化
    image.png

SparkStreaming SQL

阿里巴巴EMR团队在StructStreaming基础上自研了SparkStreaming SQL,用户可以很方便的使用SQL来写流式作业的逻辑,大大降低了开发门槛, 详见 SparkStreaming SQL官方文档
image.png

  • 批流统一引擎
    可以复用底层SparkSQL/SparkCore的优化
  • 丰富的SQL支持
    CREATE TABLE / CREATE SCAN / CREAT STREAM / CTAS

INSERT INTO / MERGE INTO
SELECT / WHERE/ GROUP BY / JOIN / UNION ALL

  • 丰富的UDF支持
    Hive UDF / 窗口函数
  • 丰富的数据源支持
    Delta/Kudu/Druid/HBase/MySQL/Redis/SLS/Datahub/TableStore

并且支持Kafka的Exactly Once
github: https://github.com/aliyun/aliyun-emapreduce-sdk

  • Delta Lake深度集成
    结合Delta Lake的使用场景,新增了一些功能的支持(比如流式写动态分区表)

实时数仓方案

架构方案

基于Delta Lake+SparkStreaming SQL可以快速构建实时数仓的pipeline,如下所示:
image.png

  • ODS层
    ODS的数据主要是实时埋点数据,CDC中的binlog日志等
  • DIM维表
  • DW层
    DW层主要是一部分轻度汇总数据,例如用户维度的课程,作业等信息。

     主要复用的是dw层数据,因此针对每一个指标,需要综合考虑是否聚合,聚合到哪一个维度,是否关联维表。

    DW层分为两种

a.业务简单,基本不会变化。直接写入Kafka。
b.业务逻辑复杂,数据可能<频繁>变化,写入Delta Lake。实践上看,直接写入Kafka是最容易的方案,但是灵活性很低,历史数据无法追溯,也无法修改。DW层通过引入Delta Lake,可以实现流批统一数据源,历史分区数据恢复等功能。

  • DM层

     DM层就是最后的报表展示指标了,可以将DW层delta表做为数据源,再次汇总后sink到展示用的DataBase。
    

备注:
EMR团队提供了流式Merge Into功能,可以通过写SparkStreaming SQL的方式来做CDC回放binlog到Delta表。
详见CDC同步文档

问题的优化

在使用Delta Lake的过程中,我们也发现了一些问题,详细的解决方案和建议如下:

小文件多

CDC流式Merge回放binlog的过程中,会不断产生小文件,需要对小文件进行一些处理,EMR提供了一些优化方案

  • 新增串行auto compaction的功能
    在CDC流式作业运行过程中,根据一定的策略对小文件进行合并compact操作
  • 使用Adaptive Execution
    打开自适应执行开关,可以有效减少Merge过程产生的小文件,如单个batch从100个小文件减少到1~2个文件。

Compact冲突问题

如果不使用串行Compact功能,需要定期手工对Delta表进行Compact合并小文件,但是经常碰到Compact在事务提交的时候和CDC流作业事务提交产生冲突,是的CDC流或者Compact失败,这块也提供了一些优化以及建议:

  • 优化Delta内核冲突机制,使得CDC流能够稳定运行,不会因为Compact挂掉
  • 使用分区表,批量对分区进行Compact,减少冲突概率
  • 在数据库表update/delete操作很少的时候进行Compact(可以使用EMR工作流调度)
  • 使用EMR工作流中的作业重试功能,当遇到Compact事务提交失败时进行重试

架构方案进一步说明

• 为什么不直接从ODS计算

以核桃的到课指标为例,数据源是kafka的埋点topic,需要计算的指标有个人维度到课数据,学期维度,班级维度,学期维度,市场渠道维度。
每个维度都需要消费所有的埋点数据,从中挑出到课相关的事件。并且每个维度的计算程序都需要查询HBase/Mysql关联相关的学期,班级,unit等维表。
一旦有整体逻辑的调整,例如过滤测试班数据,不可能从ods层就把数据过滤掉(这样从底层就开始丢失数据,后期无法追查),那么所有程序都需要重新调整,添加这个过滤逻辑。

• 怎么恢复数据

理想情况是,实时与离线使用同一套SQL,同一套计算逻辑,同一个数据源,这样随时可以用离线脚本重跑历史数据。但是现实是没有哪个框架支持。所谓流批一体,都是在引擎层面,例如Spark的streaming和SQL都是batch的方式,流只是更小的批。而Flink则希望用流的方式去处理批数据,批只是有边界的流。针对高阶的SQL API,流批都有很大的区别。基于Delta Lake的分区表,将dw层的实时数据按时间分区,这样可以随时用离线作业恢复历史分区的数据。而DW之上的汇总因为数据量相对较小,恢复之后可以用流作业从头消费。

4. 业务效果

Delta Lake实时数仓在核桃编程部分数据仓库生产环境上线后,部分业务统计指标已基于新架构产出,指标更新延迟从几十分钟,提升到1分钟以内。班主任可以更快获取学生的学习状态,及时跟进学习进度,从而显著提升了教学质量。
在CDC应用后,数据同步延迟从半小时提升到30秒,同时解决了Sqoop高并发同步时对业务数据库的影响。数据分析人员Ad-Hoc查询时,可以获取实时的业务数据,明显提升了数据分析效果,并且可以更及时的指导业务发展。

5. 后续计划

根据目前的业务应用效果,后续大数据团队会继续梳理业务范围所有实时指标,进一步优化实时数仓各层的结构,推进全面应用基于Delta Lake的实时数仓建设。
基于Delta Lake模式执行、时间旅行等特性,进一步推进机器学习场景下对Delta的应用,构造更可靠、易扩展的Data Pipeline。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

相关实践学习
基于Hologres+PAI+计算巢,5分钟搭建企业级AI问答知识库
本场景采用阿里云人工智能平台PAI、Hologres向量计算和计算巢,搭建企业级AI问答知识库。通过本教程的操作,5分钟即可拉起大模型(PAI)、向量计算(Hologres)与WebUI资源,可直接进行对话问答。
相关文章
|
SQL 存储 OLAP
阿里CCO基于Hologres的亿级明细BI探索分析实践
阿里CCO基于Hologres的亿级明细BI探索分析实践。
1018 0
阿里CCO基于Hologres的亿级明细BI探索分析实践
|
2月前
|
存储 JSON BI
友盟+Hologres:千亿级多维分析平台建设实践
Hologres 在友盟+统计分析、营销等多个产品线使用,很好地满足了用户行为分析、人群圈选与洞察场景的多维度分析、灵活下钻、快速人群预估和圈选等分析需求,提供客户更流畅的数据查询和分析体验。
|
2月前
|
存储 运维 监控
飞书深诺基于Flink+Hudi+Hologres的实时数据湖建设实践
通过对各个业务线实时需求的调研了解到,当前实时数据处理场景是各个业务线基于Java服务独自处理的。各个业务线实时能力不能复用且存在计算资源的扩展性问题,而且实时处理的时效已不能满足业务需求。鉴于当前大数据团队数据架构主要解决离线场景,无法承接更多实时业务,因此我们需要重新设计整合,从架构合理性,复用性以及开发运维成本出发,建设一套通用的大数据实时数仓链路。本次实时数仓建设将以游戏运营业务为典型场景进行方案设计,综合业务时效性、资源成本和数仓开发运维成本等考虑,我们最终决定基于Flink + Hudi + Hologres来构建阿里云云原生实时湖仓,并在此文中探讨实时数据架构的具体落地实践。
飞书深诺基于Flink+Hudi+Hologres的实时数据湖建设实践
|
4月前
|
存储 分布式计算 MaxCompute
Hologres RoaringBitmap实践:千亿级画像数据秒级分析
Hologres RoaringBitmap实践:千亿级画像数据秒级分析
394 1
|
5月前
|
SQL Cloud Native 关系型数据库
陈长城:NineData面向Doris实时数仓集成的技术实践
在刚刚过去的北京Doris Summit Asia 2023,玖章算术技术副总裁陈长城受邀参加并做了《NineData面向Doris实时数仓集成的技术实践》报告。
1010 1
|
7月前
|
Cloud Native 关系型数据库 MySQL
下一代企业级云原生实时数仓的创新和实践(一)
下一代企业级云原生实时数仓的创新和实践(一)
222 1
|
7月前
|
存储 分布式计算 MaxCompute
Hologres RoaringBitmap实践,千亿级画像数据秒级分析
本文将会分享Hologres RoaringBitmap 方案在画像分析的应用实践,实现更快更准的画像分析。
|
7月前
|
存储 SQL 关系型数据库
AnalyticDB PostgreSQL构建一站式实时数仓实践
本文介绍通过 AnalyticDB PostgreSQL 版基于实时物化视图,构建流批一体的一站式实时数仓解决方案,实现一套系统、一份数据、一次写入,即可在数仓内完成实时数据源头导入到实时分析全流程。
1870 5
AnalyticDB PostgreSQL构建一站式实时数仓实践
|
8月前
|
SQL 存储 消息中间件
芒果 TV 基于 Flink 的实时数仓建设实践
基于 Flink 技术的特点,芒果 TV 在未来的数仓建设中将注重实现湖仓一体的架构,以实现对数据的全面管理和利用。
17868 35
芒果 TV 基于 Flink 的实时数仓建设实践
|
9月前
|
存储 消息中间件 SQL
Flink CDC & MongoDB 联合实时数仓的探索实践
XTransfer 技术专家, Flink CDC Maintainer 孙家宝,在 Flink Forward Asia 2022 数据集成专场的分享。
1045 0
Flink CDC & MongoDB 联合实时数仓的探索实践