字节跳动基于 Flink 的 MQ-Hive 实时数据集成

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 在数据中台建设过程中,一个典型的数据集成场景是将 MQ (Message Queue,例如 Kafka、RocketMQ 等)的数据导入到 Hive 中,以供下游数仓建设以及指标统计。由于 MQ-Hive 是数仓建设第一层,因此对数据的准确性以及实时性要求比较高。

在数据中台建设过程中,一个典型的数据集成场景是将 MQ (Message Queue,例如 Kafka、RocketMQ 等)的数据导入到 Hive 中,以供下游数仓建设以及指标统计。由于 MQ-Hive 是数仓建设第一层,因此对数据的准确性以及实时性要求比较高。

本文主要围绕 MQ-Hive 场景,针对目前字节跳动内已有解决方案的痛点,提出基于 Flink 的实时解决方案,并介绍新方案在字节跳动内部的使用现状。

已有方案及痛点

字节跳动内已有解决方案如下图所示,主要分了两个步骤:

  1. 通过 Dump 服务将 MQ 的数据写入到 HDFS 文件
  2. 再通过 Batch ETL 将 HDFS 数据导入到 Hive 中,并添加 Hive 分区

1.jpg

痛点

  1. 任务链较长,原始数据需要经过多次转换最终才能进入 Hive
  2. 实时性比较差,Dump Service、Batch ETL 延迟都会导致最终数据产出延迟
  3. 存储、计算开销大,MQ 数据重复存储和计算
  4. 基于原生 Java 打造,数据流量持续增长后,存在单点故障和机器负载不均衡等问题
  5. 运维成本较高,架构上无法复用公司内 Hadoop/Flink/Yarn 等现有基础设施
  6. 不支持异地容灾

基于 Flink 实时解决方案

优势

针对目前公司传统解决方案的痛点,我们提出基于 Flink 的实时解决方案,将 MQ 的数据实时写入到 Hive,并支持事件时间以及 Exactly Once 语义。相比老方案,新方案优势如下所示:

  1. 基于流式引擎 Flink 开发,支持 Exactly Once 语义
  2. 实时性更高,MQ 数据直接进入 Hive,无中间计算环节
  3. 减少中间存储,整个流程数据只会落地一次
  4. 支撑 Yarn 部署模式,方便用户迁移
  5. 资源管理弹性,方便扩容以及运维
  6. 支持双机房容灾

整体架构

整体架构如下图所示,主要包括 DTS(Data Transmission Service) Source、DTS Core、DTS Sink 三大模块,具体功能如下:

  1. DTS Source 接入不同 MQ 数据源,支持 Kafka、RocketMQ 等
  2. DTS Sink 将数据输出到目标数据源,支持 HDFS、Hive 等
  3. DTS Core 贯穿整个数据同步流程,通过 Source 读取源端数据,经过 DTS Framework 处理,最后通过 Sink 将数据输出到目标端。
  4. DTS Framework 集成类型系统、文件切分、Exactly Once、任务信息采集、事件时间、脏数据收集等核心功能
  5. 支持 Yarn 部署模式,资源调度、管理比较弹性

2.jpg
(DTS Dump架构图)

Exactly Once

Flink 框架通过 Checkpoint 机制,能够提供 Exactly Once 或者 At Least Once 语义。为了实现 MQ-Hive 全链路支持 Exactly-once 语义,还需要 MQ Source、Hive Sink 端支持 Exactly Once 语义。本文通过 Checkpoint + 2PC 协议实现,具体过程如下:

  1. 数据写入时,Source 端从上游 MQ 拉取数据并发送到 Sink 端;Sink 端将数据写入到临时目录中
  2. Checkpoint Snapshot 阶段,Source 端将 MQ Offset 保存到 State 中;Sink 端关闭写入的文件句柄,并保存当前 Checkpoint ID 到 State 中;
  3. Checkpoint Complete 阶段,Source 端 Commit MQ Offset;Sink 端将临时目录中的数据移动到正式目录下
  4. Checkpoint Recover 阶段,加载最新一次成功的 Checkpoint 目录并恢复 State 信息,其中 Source 端将 State 中保存的 MQ Offset 作为起始位置;Sink 端恢复最新一次成功的 Checkpoint ID,并将临时目录的数据移动到正式目录下

■ 实现优化

在实际使用场景中,特别是大并发场景下,HDFS 写入延迟容易有毛刺,因为个别 Task Snapshot 超时或者失败,导致整个 Checkpoint 失败的问题会比较明显。因此针对 Checkpoint 失败,提高系统的容错性以及稳定性就比较重要。

这里充分利用 Checkpoint ID 严格单调递增的特性,每一次做 Checkpoint 时,当前 Checkpoint ID 一定比以前大,因此在 Checkpoint Complete 阶段,可以提交小于等于当前 Checkpoint ID 的临时数据。具体优化策略如下:

  1. Sink 端临时目录为{dump_path}/{next_cp_id},这里 next_cp_id 的定义是当前最新的 cp_id + 1
  2. Checkpoint Snapshot 阶段,Sink 端保存当前最新 cp_id 到 State,同时更新 next_cp_id 为 cp_id + 1
  3. Checkpoint Complete 阶段,Sink 端将临时目录中所有小于等于当前 cp_id 的数据移动到正式目录下
  4. Checkpoint Recover 阶段,Sink 端恢复最新一次成功的 cp_id,并将临时目录中小于等于当前 cp_id 的数据移动到正式目录下

类型系统

由于不同数据源支持的数据类型不一样,为了解决不同数据源间的数据同步以及不同类型转换兼容的问题,我们支持了 DTS 类型系统,DTS 类型可细化为基础类型和复合类型,其中复合类型支持类型嵌套,具体转换流程如下:

  1. 在 Source 端,将源数据类型,统一转成系统内部的 DTS 类型
  2. 在 Sink 端,将系统内部的 DTS 类型转换成目标数据源类型
  3. 其中 DTS 类型系统支持不同类型间的相互转换,比如 String 类型与 Date 类型的相互转换

3.jpg
(DTS Dump架构图)

Rolling Policy

Sink 端是并发写入,每个 Task 处理的流量不一样,为了避免生成太多的小文件或者生成的文件过大,需要支持自定义文件切分策略,以控制单个文件的大小。目前支持三种文件切分策略:文件大小、文件最长未更新时间、Checkpoint。

■ 优化策略

Hive 支持 Parquet、Orc、Text 等多种存储格式,不同的存储格式数据写入过程不太一样,具体可以分为两大类:

  1. RowFormat:基于单条写入,支持按照 Offset 进行 HDFS Truncate 操作,例如 Text 格式
  2. BulkFormat:基于 Block 写入,不支持 HDFS Truncate 操作,例如 Parquet、ORC 格式

为了保障 Exactly Once 语义,并同时支持 Parquet、Orc、Text 等多种格式,在每次 Checkpoint 时,强制做文件切分,保证所有写入的文件都是完整的,Checkpoint 恢复时不用做 Truncate 操作。

容错处理

理想情况下流式任务会一直运行不需要重启,但实际不可避免会遇到以下几个场景:

  • Flink 计算引擎升级,需要重启任务
  • 上游数据增加,需要调整任务并发度
  • Task Failover

■ 并发度调整

目前 Flink 原生支持 State Rescale。具体实现中,在 Task 做 Checkpoint Snapshot 时,将 MQ Offset 保存到 ListState 中;Job 重启后,Job Master 会根据 Operator 并发度,将 ListState 平均分配到各个 Task 上。

■ Task Failover

由于网络抖动、写入超时等外部因素的影响,Task 不可避免会出现写入失败,如何快速、准确的做 Task Failover 就显得比较重要。目前 Flink 原生支持多种 Task Failover 策略,本文使用 Region Failover 策略,将失败 Task 所在 Region 的所有 Task 都重启。

异地容灾

■ 背景

大数据时代,数据的准确性和实时性显得尤为重要。本文提供多机房部署及异地容灾解决方案,当主机房因为断网、断电、地震、火灾等原因暂时无法对外提供服务时,能快速将服务切换到备灾机房,并同时保障 Exactly Once 语义。

■ 容灾组件

整体解决方案需要多个容灾组件一起配合实现,容灾组件如下图所示,主要包括 MQ、YARN、HDFS,具体如下:

  1. MQ 需要支持多机房部署,当主机房故障时,能将 Leader 切换到备机房,以供下游消费
  2. Yarn 集群在主机房、备机房都有部署,以便 Flink Job 迁移
  3. 下游 HDFS 需要支持多机房部署,当主机房故障时,能将 Master 切换到备机房
  4. Flink Job 运行在 Yarn 上,同时任务 State Backend 保存到 HDFS,通过 HDFS 的多机房支持保障 State Backend 的多机房

4.jpg

■ 容灾过程

整体容灾过程如下所示:

  1. 正常情况下,MQ Leader 以及 HDFS Master 部署在主机房,并将数据同步到备机房。同时 Flink Job 运行在主机房,并将任务 State 写入到 HDFS 中,注意 State 也是多机房部署模式
  2. 灾难情况下,MQ Leader 以及 HDFS Master 从主机房迁移到备灾机房,同时 Flink Job 也迁移到备灾机房,并通过 State 恢复灾难前的 Offset 信息,以提供 Exactly Once 语义

5.jpg
6.jpg

事件时间归档

■ 背景

在数仓建设中,处理时间(Process Time)和事件时间(Event Time)的处理逻辑不太一样,对于处理时间会将数据写到当前系统时间所对应的时间分区下;对于事件时间,则是根据数据的生产时间将数据写到对应时间分区下,本文也简称为归档。

在实际场景中,不可避免会遇到各种上下游故障,并在持续一段时间后恢复,如果采用 Process Time 的处理策略,则事故期间的数据会写入到恢复后的时间分区下,最终导致分区空洞或者数据漂移的问题;如果采用归档的策略,会按照事件时间写入,则没有此类问题。

由于上游数据事件时间会存在乱序,同时 Hive 分区生成后就不应该再继续写入,因此实际写入过程中不可能做到无限归档,只能在一定时间范围内归档。归档的难点在于如何确定全局最小归档时间以及如何容忍一定的乱序。

■ 全局最小归档时间

Source 端是并发读取,并且一个 Task 可能同时读取多个 MQ Partition 的数据,对于 MQ 的每一个 Parititon 会保存当前分区归档时间,取分区中最小值作为 Task 的最小归档时间,最终取 Task 中最小值,作为全局最小归档时间。

7.jpg

■ 乱序处理

为了支持乱序的场景,会支持一个归档区间的设置,其中 Global Min Watermark 为全局最小归档时间,Partition Watermark 为分区当前归档时间,Partition Min Watermark 为分区最小归档时间,只有当事件时间满足以下条件时,才会进行归档:

  1. 事件时间大于全局最小归档时间
  2. 事件时间大于分区最小归档时间

8.jpg

Hive 分区生成

■ 原理

Hive 分区生成的难点在于如何确定分区的数据是否就绪以及如何添加分区。由于 Sink 端是并发写入,同时会有多个 Task 写同一个分区数据,因此只有当所有 Task 分区数据写入完成,才能认为分区数据是就绪,本文解决思路如下:

  1. 在 Sink 端,对于每个 Task 保存当前最小处理时间,需要满足单调递增的特性
  2. 在 Checkpoint Complete 时,Task 上报最小处理时间到 JM 端
  3. JM 拿到所有 Task 的最小处理时间后,可以得到全局最小处理时间,并以此作为 Hive 分区的最小就绪时间
  4. 当最小就绪时间更新时,可判断是否添加 Hive 分区

10.jpg

■ 动态分区

动态分区是根据上游输入数据的值,确定数据写到哪个分区目录,而不是写到固定分区目录,例如 date={date}/hour={hour}/app={app}的场景,根据分区时间以及 app 字段的值确定最终的分区目录,以实现每个小时内,相同的 app 数据在同一个分区下。

在静态分区场景下,每个 Task 每次只会写入一个分区文件,但在动态分区场景下,每个 Task 可能同时写入多个分区文件。对于 Parque 格式的写入,会先将数据写到做本地缓存,然后批次写入到 Hive,当 Task 同时处理的文件句柄过多时,容易出现 OOM。为了防止单 Task OOM,会周期性对文件句柄做探活检测,及时释放长时间没有写入的文件句柄。

11.jpg

Messenger

Messenger 模块用于采集 Job 运行状态信息,以便衡量 Job 健康度以及大盘指标建设。

■ 元信息采集

元信息采集的原理如下所示,在 Sink 端通过 Messenger 采集 Task 的核心指标,例如流量、QPS、脏数据、写入 Latency、事件时间写入效果等,并通过 Messenger Collector 汇总。其中脏数据需要输出到外部存储中,任务运行指标输出到 Grafana,用于大盘指标展示。

12.jpg

■ 脏数据收集

数据集成场景下,不可避免会遇到脏数据,例如类型配置错误、字段溢出、类型转换不兼容等场景。对于流式任务来说,由于任务会一直运行,因此需要能够实时统计脏数据流量,并且将脏数据保存到外部存储中以供排查,同时在运行日志中采样输出。

■ 大盘监控

大盘指标覆盖全局指标以及单个 Job 指标,包括写入成功流量和 QPS、写入 Latency、写入失败流量和 QPS、归档效果统计等,具体如下图所示:

13.jpg
14.jpg

未来规划

基于 Flink 实时解决方案目前已在公司上线和推广,未来主要关注以下几个方面:

  1. 数据集成功能增强,支持更多数据源的接入,支持用户自定义数据转换逻辑等
  2. Data Lake 打通,支持 CDC 数据实时导入
  3. 流批架构统一,支持全量、增量场景数据集成
  4. 架构升级,支持更多部署环境,比如 K8S
  5. 服务化完善,降低用户接入成本

总结

随着字节跳动业务产品逐渐多元化快速发展,字节跳动内部一站式大数据开发平台功能也越来越丰富,并提供离线、实时、全量、增量场景下全域数据集成解决方案,从最初的几百个任务规模增长到数万级规模,日处理数据达到 PB 级,其中基于 Flink 实时解决方案目前已在公司内部大力推广和使用,并逐步替换老的 MQ-Hive 链路。

参考文献:

  1. Real-time Exactly-once ETL with Apache Flink
    http://shzhangji.com/blog/2018/12/23/real-time-exactly-once-etl-with-apache-flink/
  2. Implementing the Two-Phase Commit Operator in Flink
    https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
  3. A Deep Dive into Rescalable State in Apache Flink
    https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
  4. Data Streaming Fault Tolerance
    https://ci.apache.org/projects/flink/flink-docs-release-1.9/internals/stream_checkpointing.html
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
7月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
722 0
|
11月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
735 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
7月前
|
SQL DataX HIVE
【YashanDB知识库】DataX迁移Hive到崖山分布式
本文来自YashanDB官网,介绍通过DataX将Hive数据迁移到YashanDB的实现方法。源环境为Hive 3.1.3,目标为YashanDB 23.2.3.100。文章提供了Hive与YashanDB的建表脚本、数据类型映射及DataX配置示例,包含reader和writer插件参数设置,并通过`datax.py`执行同步任务。内容详尽展示了数据迁移的全流程。
【YashanDB知识库】DataX迁移Hive到崖山分布式
|
8月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
1936 45
|
8月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
639 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
8月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
650 12
Flink CDC YAML:面向数据集成的 API 设计
|
7月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
312 6
|
7月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
228 5
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
3334 2
Flink CDC:新一代实时数据集成框架
|
11月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
297 1

相关产品

  • 实时计算 Flink版
  • 下一篇
    oss教程