Flink - FLIP

Introduction:

https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

 

FLIP-1: Fine Grained Recovery from Task Failures

 

When a task fails during execution, Flink currently resets the entire execution graph and triggers complete re-execution from the last completed checkpoint. This is more expensive than just re-executing the failed tasks.

If a task fails, the whole job currently has to be stopped and restored, which is clearly too heavyweight.

Proposal

A simple approach: if a task fails, restart the entire pipeline connected to it. But if every node is connected to that task, the whole job still has to be restarted.

image

 

But this approach is rather naive; can the restart scope be reduced further?

To restart only the failed task and the tasks downstream of it, without restarting the sources, caching is the only option:
each node caches the Intermediate Result it sends out. When a node's task fails, the upstream node simply re-sends the cached Intermediate Result, so a restart from the source is avoided.
How the Intermediate Result is cached (in memory, on disk, or elsewhere) is just a difference in implementation.

Caching Intermediate Result

This type of data stream caches all elements since the latest checkpoint, possibly spilling them to disk, if the data exceeds the memory capacity.

When a downstream operator restarts from that checkpoint, it can simply re-read that data stream without requiring the producing operator to restart. Applicable to both batch (bounded) and streaming (unbounded) operations. When no checkpoints are used (batch), it needs to cache all data.

Memory-only caching Intermediate Result

Similar to the caching intermediate result, but discards sent data once the memory buffering capacity is exceeded. Acts as a “best effort” helper for recovery: it bounds recovery when checkpoints are frequent enough that the data between checkpoints fits in memory. On the other hand, it comes absolutely for free; it simply uses memory that would otherwise not be used anyway.
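
To make the memory-only variant concrete, here is a minimal sketch of its semantics, assuming a simple record-count budget; all class and method names are illustrative, not Flink's actual classes:

import java.util.ArrayDeque;

// Sketch only: buffer records sent since the last checkpoint, and give up
// (discard everything) once a fixed budget is exceeded. Recovery then has
// to fall back to restarting the producer as well.
class BestEffortRecordCache<T> {

    private final int capacity;                          // budget, in records
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private boolean overflowed = false;                  // budget exceeded

    BestEffortRecordCache(int capacity) { this.capacity = capacity; }

    // Called for every record sent downstream.
    void onSend(T record) {
        if (overflowed) return;                          // already gave up
        if (buffer.size() >= capacity) {
            buffer.clear();                              // "best effort": drop all
            overflowed = true;
        } else {
            buffer.add(record);
        }
    }

    // A completed checkpoint makes the buffered records unnecessary.
    void onCheckpointComplete() {
        buffer.clear();
        overflowed = false;
    }

    // Records to replay to a restarted consumer; null means the producer
    // has to be restarted too.
    Iterable<T> replayable() {
        return overflowed ? null : buffer;
    }
}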

Blocking Intermediate Result

This is applicable only to bounded intermediate results (batch jobs). It means that the consuming operator starts only after the entire bounded result has been produced. This bounds the cancellations/restarts downstream in batch jobs.

image

 

FLIP-2: Extending Window Function Metadata

Right now, in Flink a WindowFunction does not get a lot of information when a window fires. 

The signature of WindowFunction is this:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out);
}

 

I.e., the user code only has access to the key for which the window fired, the window for which it fired, and the data of the window itself. In the future, we might like to extend the information available to the user function. We initially propose this as additional information:

  • Why/when did the window fire? Did it fire on time, i.e., when the watermark passed the end of the window? Did it fire early because of a speculative early trigger, or did it fire on late-arriving data?

  • How many times did we fire before for the current window? This would probably be an increasing index, such that each firing for a window can be uniquely identified.

The information currently exposed to window functions is insufficient; more metadata should be provided, such as why and when the window fired.
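
For illustration, the extra metadata could be surfaced roughly as below; Flink later moved in this direction with ProcessWindowFunction, but the FiringReason/FiringContext shapes here are assumptions, not the final API:

import java.io.Serializable;
import org.apache.flink.util.Collector;

// Illustrative only: an extended window function that also learns why the
// window fired and how often it has fired before.
public interface RichWindowFunction<IN, OUT, KEY, W> extends Serializable {

    enum FiringReason { ON_TIME, EARLY, LATE }

    interface FiringContext<W> {
        W window();
        FiringReason reason();  // on time, speculative early firing, or late data
        long firingIndex();     // 0 for the first firing, then increasing
    }

    void apply(KEY key, FiringContext<W> ctx, Iterable<IN> input, Collector<OUT> out);
}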

FLIP-3: Organization of Documentation

 

FLIP-4: Enhance Window Evictor

Right now, the ability of the Window Evictor is limited:

  • The Evictor is called only before the WindowFunction. (There can be use cases where elements have to be evicted after the WindowFunction is applied.)
  • Elements are evicted only from the beginning of the Window. (There can be cases where we need to allow eviction of elements from anywhere within the Window, as per the eviction logic the user wishes to implement.)

Currently the Evictor runs only before the WindowFunction; can it also run after the WindowFunction?

The current interface also evicts only from the beginning of the Window; can elements be evicted from an arbitrary position?
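
A sketch of the enhanced interface along the lines the FLIP proposes (two hooks instead of one; the shape matches what later shipped in Flink 1.2, but treat the exact signatures as assumptions):

import java.io.Serializable;

// Sketch only. The Iterable's Iterator.remove() is what allows evicting
// elements from any position in the window, not just the beginning.
public interface EnhancedEvictor<T, W> extends Serializable {

    // Called before the WindowFunction is applied.
    void evictBefore(Iterable<T> elements, int size, W window);

    // New hook: called after the WindowFunction has been applied.
    void evictAfter(Iterable<T> elements, int size, W window);
}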

 

FLIP-5: Only send data to each taskmanager once for broadcasts

Problem:

We experience an unexpected increase in data sent over the network for broadcasts as the number of slots per TaskManager increases.

Reduce the redundant data sent during broadcasts.

The current situation:

image

The desired behavior:

image

Each TaskManager receives the broadcast data set only once, instead of once per slot. For example, with 4 TaskManagers of 8 slots each, the broadcast set is currently shipped 32 times, but only 4 times under the proposal.

 

FLIP-6: Flink Deployment and Process Model - Standalone, Yarn, Mesos, Kubernetes, etc.

The core idea is to split the JobManager's responsibilities apart.

Two new components are introduced: the ResourceManager and the Dispatcher.

The ResourceManager (introduced in Flink 1.1) is the cluster-manager-specific component. There is a generic base class, and specific implementations for:

  • YARN

  • Mesos

  • Standalone-multi-job (Standalone mode)

  • Self-contained-single-job (Docker/Kubernetes)

Clearly, each resource-management platform then only needs its own ResourceManager implementation.

image

The relationship between the JobManager, TaskManager, and ResourceManager then becomes the following:

The TaskManager registers with the ResourceManager and periodically reports the status of its slots.

The JobManager requests slots from the ResourceManager; the ResourceManager selects a TaskManager and tells it to offer slots to that JobManager.

That TaskManager then contacts the JobManager directly to offer the slots.

The JobManager also keeps a slot pool holding the slots it has acquired.

The SlotPool is a modification of what is currently the InstanceManager.

This way, even if the ResourceManager dies, the JobManager can keep using the slots it has already acquired. The sketch below summarizes the protocol.
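
This is an illustrative sketch only; Flink's actual RPC gateways differ in names and signatures:

// Illustrative sketch of the FLIP-6 slot protocol.
interface ResourceManagerGateway {
    void registerTaskManager(String taskManagerId, int numSlots);   // TM -> RM
    void reportSlotStatus(String taskManagerId, int freeSlots);     // periodic report
    void requestSlot(String jobManagerId, String allocationId);     // JM -> RM
}

interface TaskManagerGateway {
    // RM -> TM: offer one of your slots to the given JobManager.
    void requestSlotForJob(String jobManagerId, String allocationId);
}

interface JobManagerGateway {
    // TM -> JM: the direct slot offer; accepted slots land in the JM's
    // SlotPool and therefore survive a ResourceManager failure.
    boolean offerSlot(String taskManagerId, String allocationId);
}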

 

The new design includes the concept of a Dispatcher. The dispatcher accepts job submissions from clients and starts the jobs on their behalf on a cluster manager.

image

In the future, the dispatcher will also help with the following aspects:

  • The dispatcher is a cross-job service that can run a long-lived web dashboard

  • Future versions of the dispatcher should receive only HTTP calls and thus can act as a bridge in firewalled clusters

  • The dispatcher never executes code and can thus be viewed as a trusted process. It can run with higher privileges (superuser credentials) and spawn jobs on behalf of other users (acquiring their authentication tokens). Building on that, the dispatcher can manage user authentications

The benefits of separating the Dispatcher from the JobManager:

First, the dispatcher is a cross-job service hosting a long-lived web dashboard; if a cluster or JobManager later dies, a job can simply be spawned on another one.

Second, client-to-dispatcher communication is HTTP-based, so it easily passes through firewalls.

Third, the dispatcher can act as a kind of proxy, e.g., for handling authentication.

The concrete architecture for each cluster manager then looks as follows.

YARN:

Compared to the state in Flink 1.1, the new Flink-on-YARN architecture offers the following benefits:

  • The client directly starts the Job in YARN, rather than bootstrapping a cluster and after that submitting the job to that cluster. The client can hence disconnect immediately after the job was submitted

  • All user code libraries and config files are directly in the Application Classpath, rather than in the dynamic user code class loader

  • Containers are requested as needed and will be released when not used any more

  • The “as needed” allocation of containers allows for different profiles of containers (CPU / memory) to be used for different operators

image

 

This architecture hosts the entire Flink runtime inside YARN.

Benefits:

You no longer need to bring up a Flink cluster first and then submit a job to it; the job is submitted directly. YARN's ResourceManager first launches the ApplicationMaster, which contains the Flink ResourceManager and JobManager; when the Flink ResourceManager needs resources, it asks the YARN ResourceManager, which creates containers hosting the TaskManagers.

 

Mesos:

This architecture is similar to the YARN one.

image

Mesos-specific Fault Tolerance Aspects

ResourceManager and JobManager run inside a regular Mesos container. The Dispatcher is responsible for monitoring and restarting those containers in case they fail. The Dispatcher itself must be made highly available by a Mesos service like Marathon.

 

Standalone:

 

The Standalone Setup should keep compatibility with current Standalone Setups.

The role of the long-running JobManager is now played by a “local dispatcher” process that internally spawns JobManagers with their jobs. The ResourceManager lives across jobs and handles TaskManager registration.

For highly available setups, there are multiple dispatcher processes competing for leadership, similar to what the JobManagers currently do.

image

 

Component Design and Details

In more concrete steps:

image

 

FLIP-7: Expose metrics to WebInterface

With the introduction of the metric system it is now time to make it easily accessible to users. As the WebInterface is the first stop for users for any details about Flink, it seems appropriate to expose the gathered metrics there as well.

The changes can be roughly broken down into 4 steps:

  1. Create a data structure on the Job-/TaskManager containing a metrics snapshot
  2. Transfer this snapshot to the WebInterface back-end
  3. Store the snapshot in the WebRuntimeMonitor in an easily accessible way
  4. Expose the stored metrics to the WebInterface via a REST API
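
As a sketch of step 1, the snapshot can be as simple as a flattened scope-to-value map; the layout below is an assumption, not Flink's actual MetricDump format:

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Sketch only: a cheap-to-serialize snapshot of all metrics on one
// Job-/TaskManager, keyed by the metric's full scope string.
class MetricSnapshot implements Serializable {

    final long timestamp = System.currentTimeMillis();
    final Map<String, String> metrics = new HashMap<>();

    // e.g. add("taskmanager.job.task.numRecordsIn", 42L)
    void add(String scope, Object value) {
        metrics.put(scope, String.valueOf(value));
    }
}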

 

FLIP-8: Rescalable Non-Partitioned State

The problem to solve: how to handle state during dynamic scaling.

Without state, dynamic scaling only requires redistributing the traffic across the operator's new parallel instances.

With state, however, increasing the parallelism requires splitting the state, and decreasing it requires merging state.

That is considerably harder.

In Flink, state falls into three categories: operator state, function state, and key-value states.

The solution for key-value states is comparatively simple: https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#

The basic idea here is finer-grained checkpoints.
Previously the granularity was the task: a checkpoint could only be loaded back as one task, so if one task is scaled out into two, the task-level checkpoint would itself have to be split.
Storing finer-grained checkpoints independently, without tying them to tasks, lets state be scheduled independently of tasks.

For key-value state, for instance, a concept called key groups is introduced, and a key group is the unit of checkpointing.

In order to efficiently distribute key-value states across the cluster, they should be grouped into key groups. Each key group represents a subset of the key space and is checkpointed as an independent unit. The key groups can then be re-assigned to different tasks if the DOP changes.

When the operator's parallelism changes, key groups simply need to be re-assigned as units to the new operator instances, each of which restores the corresponding checkpointed key groups, as shown in the figure; a sketch of the assignment follows.

image
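
A sketch of this assignment logic (Flink's real implementation lives in KeyGroupRangeAssignment and uses a murmur hash; the plain modulo below is a simplifying assumption):

// Sketch: a key maps to a fixed key group (stable across rescaling), and
// key groups are mapped onto the currently running operator instances.
final class KeyGroupAssignment {

    // Fixed for the lifetime of the job, as long as maxParallelism is fixed.
    static int keyToKeyGroup(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    // Re-evaluated whenever the operator's parallelism (DOP) changes.
    static int keyGroupToOperator(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}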

Next, how is this problem solved for non-partitioned operator and function state?

Take, for example, a Kafka source with 4 partitions and a source parallelism of 2.

image

After scaling down, we get the situation on the left of the figure below: only task S1 remains, and it loads only its own checkpoint, so S2's previous checkpoint is orphaned.
In theory, we want to reach the situation on the right.

image

After scaling up, the case on the left of the figure below also occurs: S1 and S2 load their old checkpoints, even though partitions 3 and 4 are no longer assigned to S2.

image

 

The idea is the same as before: make the checkpoint granularity finer and independent of tasks, as the figure and the sketch below show.

image
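
A sketch of the same finer-grained idea for operator/function state: each task checkpoints a list of independent units (e.g., one Kafka partition offset per entry), and the union of all lists is re-split across the new tasks. All names are illustrative:

import java.util.ArrayList;
import java.util.List;

// Sketch only: round-robin redistribution of all checkpointed state units
// across the new parallelism, instead of loading old task-level blobs.
final class RepartitionableState {

    static <T> List<List<T>> redistribute(List<T> allUnits, int newParallelism) {
        List<List<T>> perTask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perTask.add(new ArrayList<>());
        }
        for (int i = 0; i < allUnits.size(); i++) {
            perTask.get(i % newParallelism).add(allUnits.get(i));
        }
        return perTask;
    }
}

In the scale-down example above, redistribute(offsets, 1) hands all four partition offsets to S1, which is exactly the desired right-hand-side behavior.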

 

FLIP-9: Trigger DSL

The currently supported trigger mechanisms are not flexible enough, and late elements can only be dropped; a more flexible and expressive DSL is needed for describing trigger policies.
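
Purely as an illustration of the direction, such a DSL might let users compose a policy from on-time, early, and late clauses; every name in this sketch is hypothetical, not Flink API:

// Hypothetical trigger DSL: declare when a window fires instead of
// hand-writing a Trigger class.
class TriggerDsl {

    enum Accumulation { ACCUMULATING, DISCARDING }

    static TriggerSpec afterEndOfWindow() { return new TriggerSpec(); }

    static final class TriggerSpec {
        long earlyFiringIntervalMs = -1;   // speculative firings before the end
        long allowedLatenessMs = 0;        // re-fire on late data instead of dropping it
        Accumulation mode = Accumulation.DISCARDING;

        TriggerSpec withEarlyFiringsEvery(long ms) { earlyFiringIntervalMs = ms; return this; }
        TriggerSpec withLateFiringsFor(long ms)    { allowedLatenessMs = ms;     return this; }
        TriggerSpec accumulating()                 { mode = Accumulation.ACCUMULATING; return this; }
    }

    // Usage (hypothetical):
    // TriggerDsl.afterEndOfWindow().withEarlyFiringsEvery(30_000).withLateFiringsFor(60_000).accumulating();
}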

 

FLIP-10: Unify Checkpoints and Savepoints

Currently checkpoints and savepoints are handled in slightly different ways with respect to storing and restoring them. The main differences are that savepoints 1) are manually triggered, 2) persist checkpoint meta data, and 3) are not automatically discarded.

With this FLIP, I propose to unify checkpoints and savepoints by allowing savepoints to be triggered automatically.

 

FLIP-11: Table API Stream Aggregations

The Table API is a declarative API to define queries on static and streaming tables. So far, only projection, selection, and union are supported operations on streaming tables. This FLIP proposes to add support for different types of aggregations on top of streaming tables. In particular, we seek to support:

  • Group-window aggregates, i.e., aggregates which are computed for a group of elements. A (time or row-count) window is required to bound the infinite input stream into a finite group.

  • Row-window aggregates, i.e., aggregates which are computed for each row, based on a window (range) of preceding and succeeding rows.

Each type of aggregate shall be supported on keyed/grouped or non-keyed/grouped data streams for streaming tables as well as batch tables.

Since time-windowed aggregates will be the first operation that require the definition of time, this FLIP does also discuss how the Table API handles time characteristics, timestamps, and watermarks.
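
For a feel of the API, a group-window aggregate might look like the following in the string-expression Table API of that era (a sketch assuming the Flink 1.3-style API; the exact syntax shifted in later releases):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;

// Sketch: per-user sums over 10-minute tumbling event-time windows.
// `orders` is assumed to have fields user, amount, and a rowtime attribute.
class GroupWindowExample {
    static Table tumblingSum(Table orders) {
        return orders
            .window(Tumble.over("10.minutes").on("rowtime").as("w"))  // bound the stream
            .groupBy("w, user")                                       // key + window
            .select("user, w.start, amount.sum");                     // windowed aggregate
    }
}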

 

FLIP-12: Asynchronous I/O Design and Implementation

I/O access is, in most cases, a time-consuming process, making the TPS of a single operator much lower than in-memory computing, particularly for streaming jobs, where low latency is a big concern for users. Starting multiple threads may be an option to handle this problem, but the drawbacks are obvious: the programming model for end users becomes more complicated, as they have to implement a threading model in the operator, and they have to take care to coordinate with checkpointing.

A stream eventually hits external storage, and that creates an I/O bottleneck: writing to a database, for example, can block the entire stream.

How can this be solved? Multi-threading is an option, but it makes the programming model complicated; frankly, it is not elegant.

So the solution is essentially the reactor model, the classic way of hiding I/O waits.

AsyncFunction: Async I/O will be triggered in the AsyncFunction.

AsyncWaitOperator: A StreamOperator which invokes the AsyncFunction.

AsyncCollector: For each input stream record, an AsyncCollector is created and passed into the user's callback to receive the async I/O result.

AsyncCollectorBuffer: A buffer that keeps all AsyncCollectors.

Emitter Thread: A working thread in the AsyncCollectorBuffer, signalled when AsyncCollectors have finished their async I/O, which emits their results to the following operators.

An ordinary operator calls the user function and then sends the data out through the collector.

The AsyncWaitOperator, however, cannot obtain the result directly, so the AsyncCollector is passed into the callback, and the data is emitted only when the user function triggers that callback.

This raises a problem: the emit order now depends on execution speed, which is acceptable only if the emit order does not matter.

But to emulate synchronous behavior, the emit order should in principle match the arrival order.

That requires a buffer caching all AsyncCollectors: the AsyncCollectorBuffer.

When a callback runs, its AsyncCollector is filled with data and marked as ready to emit.

Whether it is actually emitted, though, depends on a separate emitter.

The emitter checks whether an AsyncCollector may really be emitted, e.g., whether all collectors before it have already been emitted; if not, it has to wait.

Watermarks also need consideration: a watermark may only be emitted after all records before it have been successfully emitted; this is what guarantees consistency.
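
Putting it together, a user function in this model fires the request and completes the AsyncCollector from the client's callback thread. A minimal sketch, assuming the Flink 1.2-era AsyncFunction/AsyncCollector API and a hypothetical non-blocking DatabaseClient:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

class AsyncDbLookup implements AsyncFunction<String, String> {

    // Hypothetical async client; stands in for any non-blocking connector.
    interface DatabaseClient {
        CompletableFuture<String> get(String key);
    }

    // In a real job this would be initialized in open() (e.g. RichAsyncFunction).
    private transient DatabaseClient client;

    @Override
    public void asyncInvoke(String key, AsyncCollector<String> collector) {
        client.get(key).whenComplete((value, error) -> {
            if (error != null) {
                collector.collect(error);                        // fail the record
            } else {
                collector.collect(Collections.singleton(value)); // emit the result
            }
        });
    }
}

The stream would then be wrapped with something like AsyncDataStream.orderedWait(input, new AsyncDbLookup(), 1000, TimeUnit.MILLISECONDS): orderedWait preserves the arrival order discussed above, while unorderedWait emits results as they complete.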
