Flink 必知必会经典课程8:Flink Connector 详解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 关于Flink Connector的详解,本文将通过四部分展开介绍:1. 连接器;2. Source API;3. Sink API;4. Collector的未来发展。

作者|任庆盛

关于Flink Connector的详解,本文将通过四部分展开介绍:

  1. 连接器
  2. Source API
  3. Sink API
  4. Collector的未来发展

一. 连接器Connecter的概述-Flink与外部系统的桥梁

1. 连接器 Connector

image.png

Flink的数据重要的来源和去向

连接器是Flink与外部系统间沟通的桥梁。如:我们需要从Kafka里读取数据,在Flink里把数据处理之后再重新写回到HIVE、elastic search这样的外部系统里去。

处理流程中的事件控制:事件处理水印(watermark),检查点对齐记录
负载均衡:根据不同并发的负载对数据分区进行合理的分配
数据解析与序列化:我们的数据在外部系统里可能是以二进制的形式存储的,在数据库里可能是以各种列的形式来存储的。我们再把它读到Flink里后,需要对他进行一个解析,之后才能够进行后面的数据处理。所以我们同样在写回外部系统的时候也需要对数据进行一个序列化的操作-把它转换成外部系统里对应的存储格式来进行存储。

image.png

上图显示的是一项十分典型的例子。
我们首先从kafka里通过Source读取其中的部分记录。然后把这些记录送到Flink当中的一些算子进行对应的运算,再通过Sink写出到elastic search当中去,所以Source和Sink在这个Flink作业的两端起到了一个接口的作用。

二. Source API- Flink数据的入口

1. Source 接口演进

image.png

Source在Flink 1.10版本之前是左侧的这两个接口:SourceFunction API(用来处理流式数据),InputFormat API( 用来处理批式数据)。在Flink 1.10之后,社区引入了一个新的Source API,对整个的Source进行了重构。那么为什么我们社区要做这样的一个工作呢?

批流实现不一致:生态不断壮大的过程中,旧的API暴露出来一些问题。其中最直观的问题就是批流实现的不一致。
接口简单但实现复杂:之前的API可能接口实现比较简单,但实际上对于开发者来讲,在实现这个接口的时候,所有的逻辑、所有的操作实现起来是非常复杂的,对于开发者来讲也不够友好。
因此,基于这些问题,在FLIP-27中提出了一个新的Source API的设计。其特点有二:
批流统一:流式数据处理和批式数据处理不需要再维护两套代码,用一套代码就够了。
实现简单:Source API定义了很多概念上的抽象,虽然说这些抽象看起来会比较复杂,但是实际上是简化了开发者操作的开发者开发工作。

2. 核心抽象

image.png

1) 记录分片(Split)

有编号的记录集合

以Kafka来举例子。Kafka的分片既可以定义成一整个分区,也可以定义成一个分区里的某一部分。比如说我从 offset 为 100的数据开始消费,到200号之间我们定义为一个分片;201~300定义成另外一个分片,这样也是可以的。只要他是一个记录的集合、我们给他一个唯一的编号,我们就可以定义这样的一个记录分片。

进度可追踪

我们需要在这个分片当中记录现在处理到了哪一个位置,我们在记录检查点的时候需要知道当前处理了哪些东西,便于一旦出现了故障,可以直接从故障中恢复起来。

记录分片的所有信息

以Kafka举例来讲,一个分区的起始和终止位点等信息是都要包含在整个记录分片里的。因为我们在做Checkpoint的时候也是以记录分片为单位的,所以说记录分辨里的信息也应该是自洽的。

2) 记录分片枚举器(Split Enumerator)

发现记录分片:检测外部系统中所存在的分片
分配记录分片:Enumerator是处于一个协调者的角色存在的。它需要给我们的Source读取器分配任务。
协调Source读取器:例如某些读取器的进度可能太快了,此时便要告诉他稍微慢一点儿来保证watermark大致是一致的。

3) Source读取器(Source Reader)
从记录分片读取数据:根据枚举器分配的记录分片来读取数据
事件时间水印处理:需要从我们从外部系统中读下来的数据里提取事件时间,然后做出对应的水印发送的操作。
数据解析:对从外部系统中读取到的数据进行反序列化,发送至下游算子

3. 枚举器-读取器架构

image.png

分片枚举器是运行在Job Master上面的,Source读取器是运行在Task Executor上面的。因此,枚举器是领导者、协调者的角色,读取器是执行者的角色。
他们的检查点存储也是各自分开的,但之间会存在一些通信。比如说枚举器是需要给读取器来分配任务,也要通知读取器后续没有更多的分片需要处理。由于一个运行环境不一样,他们两个之间也不可避免地会存在一些网络通信。便有了如下通讯栈的定义。

image.png

这个通讯栈上面确定了一些event来提供给开发者进行自己的实现。

首先,最上面这层是Source Event,留给开发者自己去定义一些客户化的操作。比如假使现在设计的一个Source,可能reader在某些条件下可能要暂停读取,那么SplitEnumerator可以通过这种Source event的方式发送给Source Reader。

其次,再下面一层分别是叫Operator Coordinator,算子的协调者。它和真正去执行任务的算子通过Operator Event算子事件进行沟通的。我们已经事先定义好了一些算子事件,如添加分片、通知我们的leader没有新的分片了等。这些对于所有的Source都通用的事件,是在Operator Event这一层来进行抽象的。

Address Lookup是用来定位消息应该发送给哪一个Operator的。因为Flink整个作业执行起来后会有一个加一个有向无环图的。不同的算子可能运行在不同的Task Manager上面,那么怎么去找到对应的task、对应的算子便是这一层的任务。

由于网络通信的存在,Job Master和Task Executor之间有一个RPC Gateway。所有的Event最终都会通过RPC Gateway、通过RPC调用的方式来进行网络传输。

4. Source读取器设计

为了简化Source读取器实践步骤,减少开发者工作,社区已经为大家提供了SourceReaderBase。用户在开发的时候可以直接继承SourceReaderBase类,从而大大简化开发者的一些开发工作。那么我们接下来对 SourceReaderBase进行分析。看上去好像这张图里有非常多的组件,但实际上我们可以把它拆成两部分来理解。

image.png

以中间elementQueue队列作为界限,队列左侧用蓝色标出来的部分是需要和外部系统打交道的组件,在elementQueue的右侧用橙色标出来的部分是和Flink的引擎侧打交道的部分。

首先,左侧是由一个或者是多个分片的读取器构成的,每一个reader通过一个Fetcher来驱动,多个Fetcher会统一由一个Fetcher Manager来管理。这里的实现也有非常多种,比如说可以只开一个线程、只开这一个SplitReader,通过这一个读取器来消费多个分区。此外,我们也可以根据需求,开多个线程-一个线程运行一个feature,进行一个reader,每个reader负责一个分区来并行的去消费数据。这些完全取决于用户的实现、选择。
出于性能考虑,每次SplitReader会从外部系统中取一批数据,把它们放到elementQueue里。如图所示,在这个蓝色框子里的是每次取下来的一批数据,而后橙色框是这一批数据下面的每条数据。

其次,elementQueue的右侧是由RecordEmitter和SourceOutput组成的。RecordEmitter把每条记录发送给下游的另外一个SourceOutput会把记录输出出去。每次RecordEmitter会从中间elementQueue里拿一批数据下来,把它们一条一条发送到下游。由于RecordEmitter是由主线程来驱动的,该主线程现在的设计里是用了一个无锁的mailbox模型,它会把需要执行工作分成一个一个mail,每次工作线程从mailbox里取出来一个mail然后来进行工作,所以我们应该注意,这里的实现一定要是无阻塞的。

RecordEmitter每次往下游发送数据的同时会向下游汇报-后面会不会还有后续的数据需要处理。与此同时呢,我们也会把当前这个分片的处理进度记录在SplitStates当中,记录它当前的状态、处理到了什么位置。

当SplitEnumerator在外部系统当中发现了新的分片,它需要通过RPC调用addSplits方法将新的分片添加读取器。在SplitFetchermanager这一侧会根据之前用户已经选定的线程模型把新分片分配出去(如只有一个线程,那便会给这个线程分配一个新任务,再让reader去读取这个新的分片。如果整体是多线程的实现的,那便新建一个线程,新建一个reader来单独去处理分片。同样我们也要在SplitStates中记录当前处理的这个进度是怎么样的。

5. 创建检查点

image.png

接下来我们来看一下在新的Source API当中是怎么处理检查点的。
首先,左侧我们的协调者,分片枚举器。图中所示,它目前手中还有一个分片(Split_5)没有分配出去。中间箭头部分是正在传输路上的一些分片。虚线是这个检查点的边界。我们可以看到二号分片已经在检查点前面了,四号分片在检查点后面,最下方的reader正在向SplitEnumerator请求一个新的分片。再看reader,三个reader分别已经分配到了某一些Split、也进行了一些处理,已经有Position了。那我们分别来看一下枚举器和读取器需要在检查点的时候存储哪些东西
· 枚举器:未分配记录分片(Split_5),已分配未存入检查点记录分片(Split_4)
· 读取器:已被分配记录分片(Split_0,1,3),记录分配状态(Split_2)

6. 三步简单实现Source

1) Split/SplitState

  • Split:外部系统分片
  • SplitSerializer:序列化/反序列化Split传递给SourceReader
  • SplitState:Split状态,用于Checkpoint与恢复

2) SplitEnumerator

  • 发现与订阅Split
  • EnumState:Enumerator的状态,用于Checkpoint与恢复
  • EnumStateSerializer:序列化/反序列化EnumState

3) SourceReader

  • SplitReader:与外部系统进行数据交互的接口
  • FetcherManager:选择线程模型(目前已有)
  • RecordEmitter:转换消息类型与处理事件时间

如果我们仔细去想一下就会发现,其实这些东西绝大多数都是和外部系统打交道的,也就是说和Flink引擎本身打交道的部分很少,用户不再需要去担心 checkpoint 锁的问题,多线程的问题等等,能够把更多的开发精力来集中在开发和外部系统交互的部分上。所以说,新的Source API是通过这些抽象来大大的简化了开发者的开发。

三. Sink API- Flink数据的出口

如果对Flink有一定的了解的话会发现它可以做到精确一次的语义,数据既不重复也不丢失。那么为了实现这个“精确一次”Flink也做了很多的工作,其中非常重要的一点就是在Sink端实现了二阶段提交。

image.png

1. 预提交阶段

在预提交阶段里,由于我们的这个分布式系统一般是存在这种“协调者1+执行者n”的模式,那么在预提交的预提交阶段里,首先我们的协调者是需要请求提交的,也就是说他需要给所有的执行者来发送请求提交的消息,从而来开始整个的二阶段提交。
当执行者收到了请求提交的消息,他会做一些提交的准备工作。在所有的准备工作都做完之后,他所有的执行者会向这个协调者回复说明现在已经准备好进行下一步的提交工作了。当协调者收到了所有执行者的“可继续”请求后,预提交阶段结束,进入我们提交第二阶段-提交执行阶段。

2. 提交执行阶段

image.png

提交者会向执行者发送决定提交的消息,执行者会把刚刚准备好的提交相关的东西来进行一个处理,来真正的去执行一个提交的动作。在完成之后会向协调者汇报一个回复的结果,反馈提交是否正常执行。

一旦协调者决定进入第二个提交执行阶段,所有的执行者必须要不打折扣地把命令执行下去。也就是说如果某个协调者在这一阶段出了问题的话,他在恢复起来之后还是要把这个决定执行下去的。也就是说一旦决定提交,执行者便必须要把提交这一个动作贯彻下去。

image.png

如果在预提交阶段某一个执行者准备提交的时候可能出现了一些故障等、没有做正确的提交动作,那么他可能向协调者会回应了一个错误,比如网络断了,也可能经过一段时间超时之后协调者没有收到这个三号执行者的回应请求,那么协调者就会触发第二阶段的回滚动作。也就是会告诉所有的执行者“这次提交尝试失败了,需要大家回滚到之前的状态”。而后我们的执行者便会出现一个回滚动作,撤销上一步操作。

3. 二阶段提交在Flink中的做法

1)预提交阶段

image.png

以这个文件系统的Sink来举个例子。
文件系统的Sink在接收到了检查点边界之后做预提交动作(把当前的数据落盘写到硬盘上的某一个临时文件里),当预提交阶段完成之后,所有的operator会向我们的协调者回复 “已经准备好进行提交”的信息。

2) 提交执行阶段
image.png

第二个阶段,提交执行阶段开启。JobManager会向所有的算子发送提交执行的指令,Sink在接收到这个指令之后,便会真正的去做最后的提交动作。
我们还是以文件系统来来举例子,那么刚刚我们已经说过了,在预提交的阶段数据被写到了一个临时文件里,那么在真正的进行提交的时候,临时文件会被按照我们事先定义好的这个名字规范重命名,相当于实现了提交。
这里要注意,临时文件这一设置并非无用,它对后续可能发生的回滚等状况具有铺垫性的作用。我们是巧妙利用了二阶段提交的机制来保障精确一次的语义。

4. Sink模型

image.png

1) Writer:负责在写入或预提交的阶段,把上游源源不断的数据写到中间的某一个状态里去。
2) Committable:上述所说的“中间的状态”,是可以进行这个提交操作的元件。
3) Committer:把Committable真正的去提交上去
4) Global Commiter:全局提交器。这个组件是可选的、取决于你的外部系统。例:Iceberg。

四. 未来发展

image.png

  • 完善新Source

因为Source和Sink刚刚推出不久,所以说相对来讲还是存在一些问题的。有些开发者可能会有一些新的需求、需要新的更新与提升。目前已经算一个相对稳定的状态,但还是需要去不断地完善。

  • 迁移现有连接器至新API

随着流批一体连接器的不断推进,所有的连接器会迁移到新的API上。

  • 连接器测试框架

连接器测试框架尝试去给所有的connector提供一个相对来讲比较一致、统一的测试标准。测试开发者不再需要去自己写一些case、考虑各种各样的测试环境、测试场景等等。让我们的开发者能够像搭积木一样快速的用不同的场景,不同的用例来测试自己的代码,从而把更多的开发精力集中在开发这个本身的逻辑上面,大大减少开发者的测试负担。这也是Source API,Sink API和后续的framework研发的一致目标。是为了让连接器开发更加简单、门槛更低,从而吸引更多的开发者为Flink生态做贡献。

活动推荐:

仅需99元即可体验阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版!点击下方链接了解活动详情:https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何配置Connector来保持与MySOL一致
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 Kafka 数据处理
实时计算 Flink版操作报错合集之使用kafka connector时,报错:java.lang.ClassNotFoundException,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
7月前
|
NoSQL 关系型数据库 Java
实时计算 Flink版产品使用问题之如何使用Flink MongoDB Connector连接MongoDB
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
关系型数据库 数据库 流计算
实时计算 Flink版操作报错合集之在使用Flink CDC TiDB Connector时,无法获取到事件,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
416 0
|
7月前
|
SQL 存储 API
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(5)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
7月前
|
SQL 消息中间件 Java
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(4)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
7月前
|
SQL Java API
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(3)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
7月前
|
SQL 关系型数据库 数据库
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(2)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

相关产品

  • 实时计算 Flink版