转自dbaplus社群公众号
作者:王康,唯品会数据平台高级开发工程师
GitHub 地址
https://github.com/apache/flink
欢迎大家给 Flink 点赞送 star~
自 2017 年起,为保障内部业务在平时和大促期间的平稳运行,唯品会就开始基于 Kubernetes 深入打造高性能、稳定、可靠、易用的实时计算平台,现在的平台支持 Flink、Spark、Storm 等主流框架。
本文将分为五个方面,分享唯品会 Flink 的容器化实践应用以及产品化经验:
- 发展概览
- Flink 容器化实践
- Flink SQL 平台化建设
- 应用案例
- 未来规划
一、发展概览
1、集群规模
在集群规模方面,我们有 2000+ 的物理机,主要部署 Kubernetes 异地双活的集群,利用 Kubernetes 的 namespaces,labels 和 taints 等实现业务隔离以及初步的计算负载隔离。
Flink 任务数、Flink SQL 任务数、Storm 任务数、Spark 任务数,这些线上实时应用加起来有 1000 多个。目前我们主要支持 Flink SQL 这一块,因为 SQL 化是一个趋势,所以我们要支持 SQL 任务的上线平台。
2、平台架构
我们从下往上进行解析实时计算平台的整体架构:
- 资源调度层(最底层)
实际上是用 deployment 的模式运行 Kubernetes 上,平台虽然支持 yarn 调度,但是 yarn 调度与批任务共享资源,所以主流任务还是运行在 Kubernetes 上的。并且,yarn 调度这一层主要是离线部署的一套 yarn 集群。在 2017 年的时候,我们自研了 Flink on Kubernetes 的一套方案,因为底层调度分了两层,所以在大促资源紧张的时候,实时跟离线就可以做一个资源的借调。
- 存储层
主要用来支持公司内部基于 Kafka 的实时数据 vms,基于 binlog 的 vdp 数据和原生 Kafka 作为消息总线,状态存储在 HDFS 上,数据主要存入 Redis、MySQL、HBase、Kudu、HDFS、ClickHouse 等。
- 计算引擎层
主要是 Flink、Storm、Spark,目前主推的是 Flink,每个框架会都会支持几个版本的镜像以满足不同的业务需求。
- 实时平台层
主要提供作业配置、调度、版本管理、容器监控、job 监控、告警、日志等功能,提供多租户的资源管理(quota,label 管理)以及 Kafka 监控。资源配置也分为大促日和平常日,大促的资源和平常的资源是不一样的,资源的权限管控也是不一样的。在 Flink 1.11 版本之前,平台自建元数据管理系统为 Flink SQL 管理 schema;从 1.11 版本开始,则是通过 Hive metastore 与公司元数据管理系统融合。
- 应用层
主要是支持实时大屏、推荐、实验平台、实时监控和实时数据清洗的一些场景。
二、Flink容器化实践
1、容器化方案
上面是实时平台 Flink 容器化的架构图。Flink 容器化其实是基于 Standalone 模式部署的。
我们的部署模式共有 Client、Job Manager、Task Manager 三个角色,每一个角色都会有一个 Deployment 来控制。
用户通过平台上传任务 jar 包、配置等,存储于 HDFS 上。同时由平台维护的配置、依赖等也存储在 HDFS 上,当 pod 启动时,就会进行拉取等初始化操作。
Client 中主进程是一个由 go 开发的 agent,当 Client 启动时,会首先检查集群状态,当集群准备好后,从 HDFS 上拉取 jar 包,再向这个集群提交任务。Client 的主要任务是做容错,它主要功能还有监控任务状态,做 savepoint 等操作。
通过部署在每台物理机上的 smart-agent 采集容器的指标写入 m3,以及通过 Flink 暴漏的接口将 metrics 写入 prometheus,结合 grafana 展示。同样通过部署在每台物理机上的 vfilebeat 采集挂载出来的相关日志写入 es,在 dragonfly 可以实现日志检索。
1)Flink 平台化
在实践过程中,一定要结合具体场景和易用性,再去考虑做平台化工作。
2)Flink 稳定性
在我们应用部署以及运行过程中,异常是不可避免的,这时候平台就需要做一些保证任务在出现异常状况后,依旧保持稳定性的一些策略。
- pod 的健康和可用:
由 livenessProbe 和 readinessProbe 检测,同时指定 pod 的重启策略,Kubernetes 本身可以做一个 pod 的拉起。
Flink 任务产生异常时:
- Flink 有自已本身的一套 restart 策略和 failover 机制,这是它的第一层保障。
- 在 Client 中会定时监控 Flink 状态,同时将最新的 checkpoint 地址更新到自己的缓存中,并汇报到平台,然后固化到 MySQL 中。当 Flink 无法再重启时,由 Client 重新从最新的成功 checkpoint 提交任务。这是它的第二层保障。
这一层将 checkpoint 固化到 MySQL 中后,就不再使用 Flink HA 机制了,少了 zk 的组件依赖。
- 当前两层无法重启时或集群出现异常时,由平台自动从固化到 MySQL 中的最新 checkpoint 重新拉起一个集群,提交任务,这是它的第三层保障。
机房容灾:
- 用户的 jar 包,checkpoint 都做了异地双 HDFS 存储。
- 异地双机房双集群。
2、Kafka 监控方案
Kafka 监控是任务监控里非常重要的一个环节,整体的流程如下:
平台提供监控 Kafka 堆积,用户在界面上,可以配置自己的 Kafka 监控,告知在怎样的集群,以及用户消费 message 等配置信息。可以从 MySQL 中将用户 Kafka 监控配置提取后,再通过 jmx 监控 Kafka,这样的信息采集之后,写入下游 Kafka,再通过另一个 Flink 任务实时监控告警,同时将这些数据同步写入 ck 里面,从而反馈给我们的用户(这里也可以不用 ck,用 Prometheus 去做监控也是可以的,但 ck 会更加适合),最后再用 Grafana 组件去展示给用户。
三、Flink SQL 平台化建设
有了前面 Flink 的容器化方案之后,就要开始 Flink SQL 平台化建设了。大家都知道,这样流式的 api 开发起来,还是有一定的成本的。 Flink 肯定是比 Storm 快的,也相对比较稳定、容易一些,但是对于一些用户,特别是 Java 开发的一些同学来说,做这个是有一定门槛的。
Kubernetes 的 Flink 容器化实现以后,方便了 Flink api 应用的发布,但是对于 Flink SQL 的任务仍然不够便利。于是平台提供了更加方便的在线编辑发布、SQL 管理等一栈式开发平台。
1、 Flink SQL 方案
平台的 Flink SQL 方案如上图所示,任务发布系统与元数据管理系统是完全解耦的。
1)Flink SQL 任务发布平台化
在实践过程中,需要考虑易用性,做平台化工作,主操作界面如下图所示:
- Flink SQL 的版本管理、语法校验、拓扑图管理等;
- UDF 通用和任务级别的管理,支持用户自定义 udf;
- 提供参数化的配置界面,方便用户上线任务。
下图是一个用户界面配置的例子:
下图是一个集群配置的范例:
2)元数据管理
平台在 1.11 之前通过构建自己的元数据管理系统 UDM,MySQL 存储 Kafka,Redis 等 schema,通过自定义 catalog 打通 Flink 与 UDM,从而实现元数据管理。
在 1.11 之后,Flink 集成 Hive 逐渐完善,平台重构了 Flink SQL 框架,并通过部署一个 SQL-gateway service 服务,中间调用自己维护的 SQL-Client jar 包,从而与离线元数据打通,实现了实时离线元数据的统一,为之后的流批一体打好了基础。
在元数据管理系统创建的 Flink 表操作界面如下图所示:创建 Flink 表的元数据,持久化到 Hive 里,Flink SQL 启动时从 Hive 里读取对应表的 table schema 信息。
2、Flink SQL 相关实践
平台对于官方原生支持或者不支持的 connector 进行整合和开发,镜像和 connector,format 等相关依赖进行解耦,可以快捷的进行更新与迭代。
1)Flink SQL 相关实践
Flink SQL 主要分为以下三层:
connector 层
- 支持 VDP connector 读取 source 数据源;
- 支持 Redis string、hash 等数据类型的 sink & 维表关联;
- 支持 kudu connector & catalog & 维表关联;
- 支持 protobuf format 解析实时清洗数据;
- 支持 vms connector 读取 source 数据源;
- 支持 ClickHouse connector sink 分布式表 & 本地表高 TPS 写入;
- Hive connector 支持数坊 Watermark Commit Policy 分区提交策略 & array、decimal 等复杂数据类型。
runtime 层
- 主要支持拓扑图执行计划修改;
- 维表关联 keyBy 优化 cache 提升查询性能;
- 维表关联延迟 join。
平台层
- Hive UDF;
- 支持 json HLL 相关处理函数;
- 支持 Flink 运行相关参数设置如 minibatch、聚合优化参数;
- Flink 升级 hadoop3。
2)拓扑图执行计划修改
针对现阶段 SQL 生成的 stream graph 并行度无法修改等问题,平台提供可修改的拓扑预览修改相关参数。平台会将解析后的 FlinkSQL 的 excution plan json 提供给用户,利用 uid 保证算子的唯一性,修改每个算子的并行度,chain 策略等,也为用户解决反压问题提供方法。例如针对 ClickHouse sink 小并发大批次的场景,我们支持修改 ClickHouse sink 并行度,source 并行度 = 72,sink 并行度 = 24,提高 ClickHouse sink tps。
3)维表关联 keyBy 优化 cache
针对维表关联的情况,为了降低 IO 请求次数,降低维表数据库读压力,从而降低延迟,提高吞吐,有以下三种措施:
下面是维表关联 KeyBy 优化 cache 的图:
在优化之前的时候,维表关联 LookupJoin 算子和正常算子 chain 在一起,优化之间维表关联 Lookup Join 算子和正常算子不 chain 在一起,将join key 作为 hash 策略的 key。
采用这种方式优化后,例如原来的 3000W 数据量维表,10 个 TM 节点,每个节点都要缓存 3000W 的数据,总共需要缓存 3 亿的量。而经过 keyBy 优化之后,每个 TM 节点只需要缓存 3000W/10 = 300W 的数据量,总共缓存的数据量只有 3000W,这非常大程度减少了缓存数据量。
4)维表关联延迟 join
维表关联中,有很多业务场景,在维表数据新增数据之前,主流数据已经发生 join 操作,会出现关联不上的情况。因此,为了保证数据的正确,将关联不上的数据进行缓存,进行延迟 join。
最简单的做法是,在维表关联的 function 里设置重试次数和重试间隔,这个方法会增大整个流的延迟,但主流 qps 不高的情况下,可以解决问题。
增加延迟 join 的算子,当 join 维表未关联时,先缓存起来,根据设置重试次数和重试间隔从而进行延迟的 join。
四、应用案例
1、实时数仓
1)实时数据入仓
实时数仓主要分为三个过程:
- 流量数据一级 Kafka 进行实时数据清洗后,可以写到二级清洗 Kafka,主要是 protobuf 格式,再通过 Flink SQL 写入 Hive 5min 表,以便做后续的准实时 ETL,加速 ods 层数据源的准备时间。
- MySQL 业务库的数据,通过 VDP 解析形成 binlog cdc 消息流,再通过 Flink SQL 写入 Hive 5min 表,同时会提交到自定义分区,再把分区状态汇报到服务接口,最后再做一个离线的调度。
- 业务系统通过 VMS API 产生业务 Kafka 消息流,通过 Flink SQL 解析之后写入 Hive 5min 表。可以支持 string、json、csv 等消息格式。
使用 Flink SQL 做流式数据入仓是非常方便的,而且 1.12 版本已经支持了小文件的自动合并,解决了大数据层一个非常普遍的痛点。
我们自定义分区提交策略,当前分区 ready 时候会调一下实时平台的分区提交 api,在离线调度定时调度通过这个 api 检查分区是否 ready。
采用 Flink SQL 统一入仓方案以后,我们可获得以下成果:
- 首先我们不仅解决了以往 Flume 方案不稳定的问题,用户也可以实现自助入仓,大大降低入仓任务的维护成本,稳定性也可以得到保障。
- 其次我们还提升了离线数仓的时效性,从小时级降低至 5min 粒度入仓,时效性可以增强。
2)实时指标计算
- 实时应用消费清洗后 Kafka,通过 Redis 维表、api 等方式关联,再通过 Flink window 增量计算 UV,持久化写到 HBase 里。
- 实时应用消费 VDP 消息流之后,通过 Redis 维表、api 等方式关联,再通过 Flink SQL 计算出销售额等相关指标,增量 upsert 到 kudu 里,方便根据 range 分区批量查询,最终通过数据服务对实时大屏提供最终服务。
以往指标计算通常采用 Storm 方式,这个方式需要通过 api 定制化开发,采用这样 Flink 方案以后,我们可以获得了以下成果:
- 将计算逻辑切到 Flink SQL 上,降低计算任务口径变化快,解决修改上线周期慢等问题;
- 切换至 Flink SQL 可以做到快速修改,并且实现快速上线,降低了维护的成本。
3)实时离线一体化ETL数据集成
具体的流程如下图所示:
Flink SQL 在最近的版本中持续强化了维表 join 的能力,不仅可以实时关联数据库中的维表数据,还能关联 Hive 和 Kafka 中的维表数据,能灵活满足不同工作负载和时效性的需求。
基于 Flink 强大的流式 ETL 的能力,我们可以统一在实时层做数据接入和数据转换,然后将明细层的数据回流到离线数仓中。
我们通过将 presto 内部使用的 HyperLogLog(后面简称 HLL)实现引入到 Spark UDAF 函数里,打通 HLL 对象在 Spark SQL 与 presto 引擎之间的互通。如 Spark SQL 通过 prepare 函数生成的 HLL 对象,不仅可以在 Spark SQL 里 merge 查询而且可以在 presto 里进行 merge 查询。
具体流程如下:
UV 近似计算示例:
2、实验平台(Flink 实时数据入 OLAP)
唯品会实验平台是通过配置多维度分析和下钻分析,提供海量数据的 A/B-test 实验效果分析的一体化平台。一个实验是由一股流量(比如用户请求)和在这股流量上进行的相对对比实验的修改组成。实验平台对于海量数据查询有着低延迟、低响应、超大规模数据(百亿级)的需求。
整体数据架构如下:
- 离线数据是通过 waterdrop 导入到 ClickHouse 里面去;
- 实时数据通过 Flink SQL 将 Kafka 里的数据清洗解析展开等操作之后,通过 Redis 维表关联商品属性,通过分布式表写入到 ClickHouse,然后通过数据服务 adhoc 查询,通过数据服务提供对外的接口。
业务数据流如下:
我们的实验平台有一个很重要的 ES 场景,我们上线一个应用场景后,如果我想看效果如何,包括上线产生的曝光、点击、加购、收藏是怎样的。我们需要把每一个数据的明细,比如说分流的一些数据,根据场景分区,写到 ck 里面去。
我们通过 Flink SQL Redis connector,支持 Redis 的 sink 、source 维表关联等操作,可以很方便地读写 Redis,实现维表关联,维表关联内可配置 cache ,极大提高应用的 TPS。通过 Flink SQL 实现实时数据流的 pipeline,最终将大宽表 sink 到 CK 里,并按照某个字段粒度做 murmurHash3_64 存储,保证相同用户的数据都存在同一 shard 节点组内,从而使得 ck 大表之间的 join 变成 local 本地表之间的 join,减少数据 shuffle 操作,提升 join 查询效率。
五、未来规划
1、提高Flink SQL易用性
Flink SQL 对于 Hive 用户来说,使用起来还是有一点不一样的地方。不管是 Hive,还是 Spark SQL,都是批量处理的一个场景。
所以当前我们的 Flink SQL 调试起来仍有很多不方便的地方,对于做离线 Hive 的用户来说还有一定的使用门槛,例如手动配置 Kafka 监控、任务的压测调优。所以如何能让用户的使用门槛降至最低,让用户只需要懂 SQL 或者懂业务,把 Flink SQL 里面的概念对用户屏蔽掉,简化用户的使用流程,是一个比较大的挑战。
将来我们考虑做一些智能监控,告诉用户当前任务存在的问题,不需要用户去学习太多的东西,尽可能自动化并给用户一些优化建议。
2、数据湖CDC分析方案落地
一方面,我们做数据湖主要是为了解决我们 binlog 实时更新的场景,目前我们的 VDP binlog 消息流,通过 Flink SQL 写入到 Hive ods 层,以加速 ods 层数据源的准备时间,但是会产生大量重复消息去重合并。我们会考虑 Flink + 数据湖的 cdc 入仓方案来做增量入仓。
另一方面我们希望通过数据湖,来替代我们 Kudu,我们这边一部分重要的业务在用 Kudu。虽然 Kudu 没有大量的使用,但鉴于 Kudu 的运维比一般的数据库运维复杂得多、比较小众,并且像订单打宽之后的 Kafka 消息流、以及聚合结果都需要非常强的实时 upsert 能力,所以我们就开始调研 CDC+数据湖这种解决方案,用这种方案的增量 upsert 能力来替换 kudu 增量 upsert 场景。
Q&A
Q1:vdp connector 是 MySQL binlog 读取吗?和 canal是一种工具吗?
A1 :vdp 是公司 binlog 同步的一个组件,将 binlog 解析之后发送到 Kafka。是基于 canal 二次开发的。我们定义了一个 cdc format 可以对接公司的 vdp Kafka 数据源,与 Canal CDC format 有点类似。目前没有开源,使我们公司用的 binlog 的一个同步方案。
Q2 : uv 数据输出到 HBase,销售数据输出到 kudu,输出到了不同的数据源,主要是因为什么采取的这种策略?
A2 :kudu 的应用场景没有 HBase 这么广泛。uv 实时写入的 TPS 比较高,HBase 比较适合单条查询的场景,写入 HBase 高吞吐 + 低延迟,小范围查询延迟低;kudu 的话具备一些 OLAP 的特性,可以存订单类明细,列存加速,结合 Spark、presto 等做 OLAP 分析。
Q3 : 请问一下,你们怎么解决的 ClickHouse 的数据更新问题?比如数据指标更新。
A3 : ck 的更新是异步 merge,只能在同一 shard 同一节点同一分区内异步 merge,是弱一致性。对于指标更新场景不太建议使用 ck。如果在 ck 里有更新强需求的场景,可以尝试 AggregatingMergeTree 解决方案,用 insert 替换 update,做字段级的 merge。
Q4:binlog 写入怎么保证数据的去重和一致性?
A4 : binlog 目前还没有写入 ck 的场景,这个方案看起来不太成熟。不建议这么做,可以用采用 CDC + 数据湖的解决方案。
Q5 : 如果 ck 各个节点写入不均衡,怎么去监控,怎么解决?怎么样看数据倾斜呢?
A5 :可以通过 ck 的 system.parts 本地表监控每台机器每个表每个分区的写入数据量以及 size,来查看数据分区,从而定位到某个表某台机器某个分区。
Q6 : 你们在实时平台是如何做任务监控或者健康检查的?又是如何在出错后自动恢复的?现在用的是 yarn-application 模式吗?存在一个 yarn application 对应多个 Flink job 的情况吗?
A6 : 对于 Flink 1.12+ 版本,支持了 PrometheusReporter 方式暴露一些 Flink metrics 指标,比如算子的 watermark、checkpoint 相关的指标如 size、耗时、失败次数等关键指标,然后采集、存储起来做任务监控告警。
Flink 原生的 restart 策略和 failover 机制,作为第一层的保证。
在 Client 中会定时监控 Flink 状态,同时将最新的 checkpoint 地址更新到自己的缓存中,并汇报到平台,固化到 MySQL 中。当 Flink 无法再重启时,由 Client 重新从最新的成功 checkpoint 提交任务。作为第二层保证。这一层将 checkpoint 固化到 MySQL 中后,就不再使用 Flink HA 机制了,少了 zk 的组件依赖。
当前两层无法重启时或集群出现异常时,由平台自动从固化到 MySQL 中的最新 chekcpoint 重新拉起一个集群,提交任务,作为第三层保证。
我们支持 yarn-per-job 模式,主要基于 Flink on Kubernetes 模式部署 standalone 集群。
Q7 : 目前你们大数据平台上所有的组件都是容器化的还是混合的?
A7 :目前我们实时这一块的组件 Flink、Spark 、Storm、Presto 等计算框架实现了容器化,详情可看上文 1.2 平台架构。
Q8 :kudu 不是在 Kubernetes 上跑的吧?
A8 :kudu 不是在 Kubernetes 上运行,这个目前还没有特别成熟的方案。并且 kudu 是基于 cloudera manager 运维的,没有上 Kubernetes 的必要。
Q9 : Flink 实时数仓维度表存到 ck 中,再去查询 ck,这样的方案可以吗?
A9:这是可以的,是可以值得尝试的。事实表与维度表数据都可以存,可以按照某个字段做哈希(比如 user_id),从而实现 local join 的效果。
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99元试用实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制T恤;另包3个月及以上还有85折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc