核桃编程Delta Lake实时数仓应用实践

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文简述了核桃编程应用EMR建设Delta Lake实时数仓的实践。

作者:
卢圣刚,核桃编程数据架构师,拥有多年的大数据开发和架构经验。曾担任易观数据挖掘工程师,熊猫TV大数据架构师。


核桃编程简介

核桃编程成立于2017年8月9日,作为少儿编程教育行业的领导者,始终秉持“让每个孩子爱学习、会学习,让优质的教育触手可及”的使命,致力于以科技手段促进编程教育,凭借首创的AI人机双师教学模式与十级进阶课程体系,实现规模化因材施教,“启发中国孩子的学习力”。截止2019年8月,核桃编程已经成为付费学员规模最大的少儿编程教育机构,帮助超过65万名孩子收获学习兴趣,锻炼编程技能,养成良好思维习惯,学员复购率超91%,学员完课率高达98%,在线原创作品1873万份。

1.业务现状

业务需求

  • 业务上固定时间开课,在开课时间内,班主任需要实时/准实时地知道学生的学习情况
  • 数据统计维度一般都是按班级,学期汇总,时间范围可能是几个月,甚至一年
  • 业务变化快,需要及时响应业务变化带来的指标逻辑变更

数据源

image.png

架构改造前方案

现有指标都是将Kafka/Mysql等的数据写入HDFS,使用Hive离线批处理,每10分钟执行一次,循环统计历史累计指标,再定时把数据同步到Mysql,提供给数据后台查询。如下图所示:
image.png

遇到的问题

随着计算的数据量越来越大,逐渐不能满足业务的更新频率要求。

  • 使用Apache Sqoop做全量数据同步,会对业务Mysql库/HDFS造成压力。
  • 使用Apache Sqoop做增量同步,一般只能使用某个时间字段(例如update time)来同步新修改的数据。这样在做分区表时,需要比较复杂的离线合并。
  • 随着数据越来越大,同步以及处理时间会越来越长,满足不了业务实时性需求。

2.实时数仓方案调研

离线的同步方案已经不能满足业务需求,计划迁移到实时方案上来,并做了一些调研。

迁移流式计算的问题

开发周期长

现有离线任务基本都是动辄几百行SQL,逻辑复杂,把所有逻辑迁移到流式计算,开发难度和改造成本都比较大。
例如离线增量同步,需要先同步全量base数据

sqoop import  \
--hive-import \
--hive-overwrite \
--connect jdbc:mysql://<mysqlurl>  \ 
--table <mysqltable> \
--hive-table <table_base> \
--hive-partition-key <parcolumn> \
--hive-partition-value <par1>

再消费增量binlog数据,流式写入到hive外部表,最后将两个表合并

insert overwrite table <result_storage_table>select
<col1>,
      
<col2>,
       <colN>
  from(select
row_number() over(partition by t.<primary_key_column>
 order by record_id
desc, after_flag desc) as row_number, record_id, operation_flag, after_flag,
<col1>, <col2>, <colN>
  from(select
incr.record_id, incr.operation_flag, incr.after_flag, incr.<col1>,
incr.<col2>,incr.<colN>
  from
<table_log> incr
 where
utc_timestamp< <timestamp>
 union all select 0
as record_id, 'I' as operation_flag, 'Y' as after_flag, base.<col1>,
base.<col2>,base.<colN>
  from
<table_base> base) t) gtwhere record_num=1 
  and
after_flag='Y'

而应用Delta Lake只需要一个streaming sql即可实现实时增量同步。

CREATE SCAN <SCAN_TABLE> on <STREAM> using
stream;
CREATE STREAM job
OPTIONS(
checkpointLocation='/cdc',
triggerInterval=30000
)
MERGE INTO <CDC_TABLE> as target
USING (
 SELECT
 from_unixtime(<col2>,'yyyyMMdd') as
par_date,
 <col1>
 FROM(
  SELECT 
  recordId, 
  recordType, 
  CAST(before.id as
LONG) as before_id,
  CAST(after.id as
LONG) as id,
 
after.<col1>,
  after.ctime,
  dense_rank() OVER
(PARTITION BY coalesce(before.id,after.id) ORDER BY recordId DESC) as rank
  FROM (
   SELECT 
   recordId, 
   recordType, 
  
from_json(CAST(beforeImages as STRING), 'id STRING, <col1>
<coltype1>,ctime string') as before,
  
from_json(CAST(afterImages as STRING), 'id STRING, <col1>
<coltype1>,ctime string') as after
   FROM (
    select
from_avro(value) as (recordID, source, dbTable, recordType, recordTimestamp,
extraTags, fields, beforeImages, afterImages) from <SCAN_TABLE>
   ) binlog WHERE
recordType != 'INIT'
  ) binlog_wo_init
 ) binlog_extract
WHERE rank=1
) as source
ON target.id = source.before_id
WHEN MATCHED AND source.recordType='UPDATE' THEN
UPDATE SET *
WHEN MATCHED AND source.recordType='DELETE' THEN
DELETE
WHEN NOT MATCHED AND (source.recordType='INSERT' OR
source.recordType='UPDATE') THEN
INSERT *;

数据恢复困难

对离线任务来说数据恢复只需要重新执行任务就行。

但对流式计算,当数据异常,或者逻辑变更,需要重新跑全量数据的时候,只能离线补历史数据,再union实时数据。因为Kafka不可能存所有历史数据,而且从头消费追数据时间也会很久。

而为了满足快速恢复的需求,所有指标都需要从一开始准备离线和实时两套代码,类似Lambda架构。

数据验证困难

Kafka在大数据架构中一般充当消息队列的角色,数据保存周期较短。全量历史数据,会消费Kafka写到HDFS。如果一个指标计算了一个月,发现计算结果有异常,很难追溯是当时Kafka数据有问题,还是计算逻辑有问题。HDFS数据虽然可以用来排查,但是HDFS里的数据和当时Kafka的数据是否一致,是不能保证的。

希望满足的功能

正因为迁移流式作业会有一些迁移成本和问题,所以对实时计算方案提出了一些功能要求。

开发灵活

互联网公司业务发展速度快,人力资源比较紧张,需要更低成本更快捷的开发新指标,满足业务敏捷性的要求。

重跑历史数据方便

业务指标的定义经常发生变更,一旦变更,或者有新的数据指标就需要从最早开始消费。但是历史数据通常非常多,而且一般实时数据源Kafka也不可能存历史所有数据。

数据异常时容易排查问题

以离线数仓为例,几百行的SQL,可以分段执行,来逐步排查。Flink可以埋metrics获取中间过程。

3.基于Delta Lake实时数仓方案

Delta Lake

Delta Lake是美国Databricks开源的数据湖技术,基于Apache Parquet丰富了数据管理功能,如元数据管理/事务/数据更新/数据版本回溯等。使用Delta Lake可以很方便的将流处理和批处理串联起来,快速构建Near-RealTime的Data Pipeline.
image.png

目前阿里巴巴E-MapReduce(简称“EMR”)团队对Delta Lake做了很多功能和性能上的优化,并和Spark做了深度集成,主要以下方面,更多信息详见EMR官方文档

  • SparkSQL支持Update/Delete/Merge Into/Optimize/Vacuum等语法来操作Delta Lake
  • 自研SparkStreaming SQL,支持Delta Lake的相关DML操作
  • Hive&Presto On Delta Lake
  • Delta Lake On OSS(阿里云对象存储)
  • Delta Lake事务冲突检测优化
  • DataSkipping & Zorder性能优化

image.png

SparkStreaming SQL

阿里巴巴EMR团队在StructStreaming基础上自研了SparkStreaming SQL,用户可以很方便的使用SQL来写流式作业的逻辑,大大降低了开发门槛, 详见 SparkStreaming SQL官方文档。
image.png

  • 批流统一引擎

可以复用底层SparkSQL/SparkCore的优化

  • 丰富的SQL支持

CREATE TABLE / CREATE SCAN / CREAT STREAM / CTAS
INSERT INTO / MERGE INTO
SELECT / WHERE/ GROUP BY / JOIN / UNION ALL

  • 丰富的UDF支持

Hive UDF / 窗口函数

  • 丰富的数据源支持

Delta/Kudu/Druid/HBase/MySQL/Redis/SLS/Datahub/TableStore
并且支持Kafka的Exactly Once
github: https://github.com/aliyun/aliyun-emapreduce-sdk

  • Delta Lake深度集成

结合Delta Lake的使用场景,新增了一些功能的支持(比如流式写动态分区表)

实时数仓方案

架构方案

基于Delta Lake+SparkStreaming SQL可以快速构建实时数仓的pipeline,如下所示:
image.png

  • ODS层

ODS的数据主要是实时埋点数据,CDC中的binlog日志等

  • DIM维表
  • DW层

DW层主要是一部分轻度汇总数据,例如用户维度的课程,作业等信息。

 主要复用的是dw层数据,因此针对每一个指标,需要综合考虑是否聚合,聚合到哪一个维度,是否关联维表。

DW层分为两种
a.业务简单,基本不会变化。直接写入Kafka。
b.业务逻辑复杂,数据可能<频繁>变化,写入Delta Lake。实践上看,直接写入Kafka是最容易的方案,但是灵活性很低,历史数据无法追溯,也无法修改。DW层通过引入Delta Lake,可以实现流批统一数据源,历史分区数据恢复等功能。

  • DM层
    DM层就是最后的报表展示指标了,可以将DW层delta表做为数据源,再次汇总后sink到展示用的DataBase。

备注:
EMR团队提供了流式Merge Into功能,可以通过写SparkStreaming SQL的方式来做CDC回放binlog到Delta表。
详见CDC同步文档

问题的优化

在使用Delta Lake的过程中,我们也发现了一些问题,详细的解决方案和建议如下:

小文件多

CDC流式Merge回放binlog的过程中,会不断产生小文件,需要对小文件进行一些处理,EMR提供了一些优化方案

  • 新增串行auto compaction的功能

在CDC流式作业运行过程中,根据一定的策略对小文件进行合并compact操作

  • 使用Adaptive Execution

打开自适应执行开关,可以有效减少Merge过程产生的小文件,如单个batch从100个小文件减少到1~2个文件。

Compact冲突问题

如果不使用串行Compact功能,需要定期手工对Delta表进行Compact合并小文件,但是经常碰到Compact在事务提交的时候和CDC流作业事务提交产生冲突,是的CDC流或者Compact失败,这块也提供了一些优化以及建议:

  • 优化Delta内核冲突机制,使得CDC流能够稳定运行,不会因为Compact挂掉
  • 使用分区表,批量对分区进行Compact,减少冲突概率
  • 在数据库表update/delete操作很少的时候进行Compact(可以使用EMR工作流调度)
  • 使用EMR工作流中的作业重试功能,当遇到Compact事务提交失败时进行重试

架构方案进一步说明

• 为什么不直接从ODS计算

以核桃的到课指标为例,数据源是kafka的埋点topic,需要计算的指标有个人维度到课数据,学期维度,班级维度,学期维度,市场渠道维度。
每个维度都需要消费所有的埋点数据,从中挑出到课相关的事件。并且每个维度的计算程序都需要查询HBase/Mysql关联相关的学期,班级,unit等维表。
一旦有整体逻辑的调整,例如过滤测试班数据,不可能从ods层就把数据过滤掉(这样从底层就开始丢失数据,后期无法追查),那么所有程序都需要重新调整,添加这个过滤逻辑。

• 怎么恢复数据

理想情况是,实时与离线使用同一套SQL,同一套计算逻辑,同一个数据源,这样随时可以用离线脚本重跑历史数据。但是现实是没有哪个框架支持。所谓流批一体,都是在引擎层面,例如Spark的streaming和SQL都是batch的方式,流只是更小的批。而Flink则希望用流的方式去处理批数据,批只是有边界的流。针对高阶的SQL API,流批都有很大的区别。基于Delta Lake的分区表,将dw层的实时数据按时间分区,这样可以随时用离线作业恢复历史分区的数据。而DW之上的汇总因为数据量相对较小,恢复之后可以用流作业从头消费。

4. 业务效果

Delta Lake实时数仓在核桃编程部分数据仓库生产环境上线后,部分业务统计指标已基于新架构产出,指标更新延迟从几十分钟,提升到1分钟以内。班主任可以更快获取学生的学习状态,及时跟进学习进度,从而显著提升了教学质量。
在CDC应用后,数据同步延迟从半小时提升到30秒,同时解决了Sqoop高并发同步时对业务数据库的影响。数据分析人员Ad-Hoc查询时,可以获取实时的业务数据,明显提升了数据分析效果,并且可以更及时的指导业务发展。

5. 后续计划

根据目前的业务应用效果,后续大数据团队会继续梳理业务范围所有实时指标,进一步优化实时数仓各层的结构,推进全面应用基于Delta Lake的实时数仓建设。
基于Delta Lake模式执行、时间旅行等特性,进一步推进机器学习场景下对Delta的应用,构造更可靠、易扩展的Data Pipeline。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
相关文章
|
4月前
|
存储 运维 搜索推荐
实时数仓Hologres发展问题之Hologres在无人车送货场景中的应用如何解决
实时数仓Hologres发展问题之Hologres在无人车送货场景中的应用如何解决
51 2
|
1月前
|
SQL 运维 网络安全
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
|
13天前
|
DataWorks 数据挖掘 大数据
方案实践测评 | DataWorks集成Hologres构建一站式高性能的OLAP数据分析
DataWorks在任务开发便捷性、任务运行速度、产品使用门槛等方面都表现出色。在数据处理场景方面仍有改进和扩展的空间,通过引入更多的智能技术、扩展数据源支持、优化任务调度和可视化功能以及提升团队协作效率,DataWorks将能够为企业提供更全面、更高效的数据处理解决方案。
|
1月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
2月前
|
SQL 分布式计算 数据挖掘
加速数据分析:阿里云Hologres在实时数仓中的应用实践
【10月更文挑战第9天】随着大数据技术的发展,企业对于数据处理和分析的需求日益增长。特别是在面对海量数据时,如何快速、准确地进行数据查询和分析成为了关键问题。阿里云Hologres作为一个高性能的实时交互式分析服务,为解决这些问题提供了强大的支持。本文将深入探讨Hologres的特点及其在实时数仓中的应用,并通过具体的代码示例来展示其实际应用。
253 0
|
3月前
|
存储 监控 算法
Hologres 在 BI 场景中的应用
【9月更文第1天】随着企业对实时数据分析的需求不断增加,传统的批处理方式已经无法满足现代业务决策的速度要求。Hologres,作为一款专为在线分析处理(OLAP)设计的实时数仓解决方案,提供了高性能的查询能力,能够支持大规模数据集的实时分析需求。本文将探讨 Hologres 在商业智能(BI)场景中的应用,包括如何集成 BI 工具以提供实时数据洞察,并加速决策过程。
79 3
|
3月前
|
消息中间件 SQL 大数据
Hologres 在大数据实时处理中的应用
【9月更文第1天】随着大数据技术的发展,实时数据处理成为企业获取竞争优势的关键。传统的批处理框架虽然在处理大量历史数据时表现出色,但在应对实时数据流时却显得力不从心。阿里云的 Hologres 是一款全托管、实时的交互式分析服务,它不仅支持 SQL 查询,还能够与 Kafka、MaxCompute 等多种数据源无缝对接,非常适合于实时数据处理和分析。
135 2
|
4月前
|
运维 监控 搜索推荐
Hologres的应用场景有哪些?
【8月更文挑战第24天】Hologres的应用场景有哪些?
82 2
|
4月前
|
SQL 监控 大数据
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
|
4月前
|
消息中间件 监控 关系型数据库
Serverless 应用的监控与调试问题之实时离线数仓一体化常用的解决方案有什么问题
Serverless 应用的监控与调试问题之实时离线数仓一体化常用的解决方案有什么问题