Flink分布式程序的异常处理

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink分布式程序的异常处理

在我们的数据平台产品中,为了简化开发,对Flink做了一层封装,定义了Job和Flow的抽象。一个Job其实就是Flink的一个作业,每个Job可以定义多个Flow,一个Flow可以理解为是Flink的一个DataStream,利用Job传递的StreamExecutionEnvironment可以在Flow中添加包括Source与Sink的多个算子。


Job与Flow之间的关系可以利用自定义的@JobFlow注解进行配置,如此就可以在执行抽象的AbstractJob的run()方法时,利用反射获得该Job下的所有Flow,遍历执行每个Flow的run()方法。在Flow的run()方法中,才会真正根据StreamExecutionEnvironment执行多个算子。


Flink为了保证计算的稳定性,提供了不同的重启策略。例如,当我们将重启策略设置为失败率(failure-rate)时,如果执行的任务出错次数达到了失败率配置的要求,Flink的Worker节点的TaskManager就会重启。如果超过重启次数,Task Manager就会停止运行。


失败的原因可能有很多,例如资源不足、网络通信出现故障等Flink集群环境导致的故障,但是也可能是我们编写的作业在处理流式数据时,因为处理数据不当抛出了业务异常,使得Flink将其视为一次失败。


为了减少因为业务原因抛出异常导致Task Manager的不必要重启,需要规定我们编写的Flink程序的异常处理机制。由于封装了Flink的Job,从一开始,我就考虑一劳永逸地解决业务异常的问题,即在AbstractJob的run()方法中,捕获我们自定义的业务异常,在日志记录了错误信息后,把该异常“吃”掉,避免异常的抛出导致执行失败,造成TaskManager的重启,如:

public abstract class AbstractFlow implements Flow {  
    public void run() {
        try {
            runBare();
        } catch (DomainException ex) {
            //...
        }
    }
    protected abstract void runBare();
}

哪知道这一处理机制压根儿就无法捕获业务异常!为什么呢?这就要从Flink的分布式机制说起了。


在Flink集群上执行任务,需要Client将作业提交给Flink集群的Master节点。Master的Dispatcher接收到Job并启动JobManager,通过解析Job的逻辑视图,了解Job对资源的要求,然后向ResourceManager(Standalone模式,如果是YARN,则由YARN管理和调度资源)申请本次Job需要的资源。JobManager将Job的逻辑视图转换为物理视图,并将计算任务分发部署到Flink集群的TaskManager上。整个执行过程如下图所示:


image.png


我们封装的一个Flow,在物理视图中,其实就是一个作业,即前面所说的计算任务。一个作业可以包含多个算子。如果相邻算子之间不存在数据Shuffle、并行度相同,则会合并为算子链(Operator Chain)。每个算子或算子链组成一个JobVertex,在执行时作为一个任务(Task)。根据并行度的设置,每个任务包含并行度数目的子任务(SubTask),这些子任务就是作业调度的最小逻辑单元,对应于进程资源中的一个线程,在Flink中,就是一个Slot(如果不考虑Slot共享的话)。


假定Flink环境的并行度设置为1,作业的前面两个算子满足合并算子链的要求,且并行度设置为2;之后,通过keyBy()之类的算子完成了数据的Shuffle,然后再合并到同一个Sink中。那么它们的关系如下图所示:


image.png

显然,Flink集群在执行作业时,会对作业进行划分,并将划分后的各个子任务分发到TaskManager中的每个Slot。一个TaskManager就是一个JVM,Slot则是进程中的一个线程。


答案不言而喻。AbstractFlow之所以无法捕获到各个算子执行任务时抛出的业务异常,是因为它们根本就没有执行在一个JVM上,也没有运行在同一个线程中。这正是分布式开发与本地开发的本质区别。如果不了解Flink的执行原理,可能就会困惑Java的异常处理机制为何不生效。在进行分布式开发时,如果还是照搬本地开发的经验,可能真的会撞得头碰血流才会看清真相。因此,正确的做法是在每个算子的实现中捕获各自的异常,也就是要保证每个算子自身都是健壮的,如此才能保证作业尽可能健壮。


当然,分布式开发与本地开发的本质区别不只限于此,例如分布式开发跨进程调用对序列化的要求,对数据一致性的不同要求,对异步通信机制以及阻塞调用的认识,都可能给程序员带来不同的体验。归根结底,了解分布式开发或分布式系统的底层原理,可以让我们尽早看到真相,避免调到坑里而不自知。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
消息中间件 分布式计算 大数据
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
43 0
|
1月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
61 3
|
1月前
|
分布式计算 监控 大数据
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
43 0
|
3月前
|
Java 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何在程序因故停掉后能从之前的Binlog位置继续读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
38 0
|
3月前
|
存储 Java 流计算
Flink 分布式快照,神秘机制背后究竟隐藏着怎样的惊人奥秘?快来一探究竟!
【8月更文挑战第26天】Flink是一款开源框架,支持有状态流处理与批处理任务。其核心功能之一为分布式快照,通过“检查点(Checkpoint)”机制确保系统能在故障发生时从最近的一致性状态恢复,实现可靠容错。Flink通过JobManager触发检查点,各节点暂停接收新数据并保存当前状态至稳定存储(如HDFS)。采用“异步屏障快照(Asynchronous Barrier Snapshotting)”技术,插入特殊标记“屏障(Barrier)”随数据流传播,在不影响整体流程的同时高效完成状态保存。例如可在Flink中设置每1000毫秒进行一次检查点并指定存储位置。
61 0
|
4月前
|
SQL Java 数据库
实时计算 Flink版产品使用问题之Spring Boot集成Flink可以通过什么方式实现通过接口启动和关闭Flink程序
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
Java 调度 流计算
基于多线程的方式优化 FLink 程序
这篇内容介绍了线程的基本概念和重要性。线程是程序执行的最小单位,比进程更细粒度,常用于提高程序响应性和性能。多线程可以实现并发处理,利用多核处理器,实现资源共享和复杂逻辑。文章还讨论了线程的五种状态(NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING和TERMINATED)以及如何在Java中创建和停止线程。最后提到了两种停止线程的方法:使用标识和中断机制。
158 5
|
6月前
|
分布式计算 Hadoop 大数据
分布式计算框架比较:Hadoop、Spark 与 Flink
【5月更文挑战第31天】Hadoop是大数据处理的开创性框架,专注于大规模批量数据处理,具有高扩展性和容错性。然而,它在实时任务上表现不足。以下是一个简单的Hadoop MapReduce的WordCount程序示例,展示如何统计文本中单词出现次数。
195 0