Flink Ecosystems(二)|学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 快速学习 Flink Ecosystems(二)

开发者学堂课程【开源 Flink 极速上手教程:Flink Ecosystems】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/331/detail/3713


Flink Ecosystems(二)


二.常用 Connector

1. Kafka Connector

这个是使用最多的,因为 Flink 是一个流计算的引擎,所以 Kafka 是最流行的消息队列,所以过去 Flink 的用户大部分都使用 Kafka。如果要创建 Kafka 的表,需要指定一些特定的参数比如connector 字段指定成 Kafka, kafka 后面还可以带一个版本号,topic 代表对应的 kafka 的 topic 是什么,bootstrap.servers 是一个 Kafka brokers 的地址,group.id 是读一张 Kafka 表的时候需要的 consumer group,format 是表里源数据格式,startup.mode 表示希望从什么点位开始消费。这并不是所有的参数,如果想具体了解,还是参考官方文档。

图片50.png

要使用 Kafka Connector 需要一些依赖的 jar 包,根据使用的 Kafka 版本不一样,所使用的 Jar 包也不一样。大部分用户可能使用 universal 这个版本就可以了,0.11和0.10的可以用这两个版本特定的 Jar 包,jar 包都可以在官网上下载到。

图片51.png

2. Elasticsearch Connector

图片52.png

Elasticsearch Connector 只实现了 Sink,所以只能写不能读。connector 类型指定成elasticsearch-6 或者是 elasticsearch-7,hosts 就是指定各个节点,通过域名加版本号的形式,index 指定的写,index大约类似于传统数据库中的一张表,需要去写哪一个 index,相当于写一张表,document-type 类似于传统数据库中表中的某一行,写这张表时指定一个document-type,在 elasticsearch-6 中需要指定,在 elasticsearch-7 中这个参数不需要指定。

单独强调一点使用 Elasticsearch Connector 时 Key 的处理问题,Elasticsearch 的 Sink 支持 append 模式和 upset 模式,如果这张表指定了primary key,那么 Sink 会以 upset 模式工作,如果没有指定,则以 append 模式工作。在 upset 模式工作时,也就是指定了 PK,Sink 会用所有的 PK 字段生成document ID,document ID 是一个用 PK 字段拼出来的字符串。同时可以指定delimiter,多个字段的时候可以指定一个分隔符。要生成 document ID 字符串,有些类型因为不是很好转成 string 格式,所以它不适合作为 PK,像 ROW, MAP 这种复杂类型一般不太适合作为PK。使用 Elasticsearch 需要指定一些额外的依赖加入,也是根据 Elasticsearch的版本添加不同版本的 elasticsearch Connector。

3.FileSystem Connector

顾名思义这个 Connector 对接的是文件系统,读写的就是这个文件系统的文件,这里的 FileSystem 指的是 Flink 的FileSystem 抽象,支持很多种不同的实现比如 Local、Hadoop、S3、OSS等。在Flink1.11中也做了一些升级,比如支持了分区,分区格式与 Hive 比较相似,表目录是一级目录,子目录对应的是分区目录,每一个分区最终对应的相当于是表目录下面的一个子目录,对应的是分区的数据。但是与 Hive 不同的一点是 FileSystem Connector 的分区信息是不需要注册到一个 Catalog 中的,是通过目录结构自动的推断分区信息,Hive 必须要传一条关于分区的元数据信息。FileSystem 支持 Streaming Sink,也就是说流式的数据可以写入到文件系统的文件中,暂时不支持 Streaming Source。FileSystem Connector 本身不需要添加额外的依赖,但是如果要对接特定的FileSystem 实现的依赖还是要添加,或者需要使用特定的 Format,比如文件是ORC 格式,那么 ORC 的 jar 包需要添加。

例子:

图片53.png

schema 信息可以看到这里指定了两个分区字段,分别是 column_name1 和column_name2,位置参数中 connector 指定成 filesystem, path 指定的是文件的表目录的路径,这个表具体使用的哪种filesystem 是通过路径中的 stream 决定的,比如这里是 file 则表示这个表是本地的文件系统,如果写成 hdfs 则指定为这张表在 hdfs 上。format 表的数据格式,

partition.default-name 含义是当partition 字段的值为 null,写出来的目录名字应该叫什么,sink.shuffle-by-partition.enable 主要是用来在分区时指定分区数据需不需要做shuffle。

4. Hive Connector

Hive 作为 Hadoop 生态圈应该是最早的SQL 引擎,也是用户使用比较多,在批处理的场景上大部分用户都在使用 Hive, Flink SQL 为了更好的支持批处理场景,也提供了 Hive 的能力。Hive 分为两个层面,首先在元数据上,通过HiveCatalog 对接 Hive 元数据。同时提供 HiveTableSource、HiveTableSink 读写Hive 表数据。

而且提供了与 Hive 非常好的兼容性,首先是支持多个 Hive 版本,从1.0.0到3.1.2,基本上涵盖了目前所有的主流的 Hive 版本,同时支持了多种文件格式,比如 text、ORC等,类型的支持也比较全,另外通过 Hive Dialect 提供了 Hive 风格的语法,用户开启 Hive Dialect 以后可以在 Flink 中写 Hive 的 SQL 语句。

在Flink1.11中也尝试了把 Hive 的离线数仓模式跟流计算的能力进行整合,得出了一个 Hive 批流一体数仓的概念,主要增强 Hive 的实时性,比如支持 Hive 的流式写入,还支持流式的读,流式的读就是可以实时监控 Hive 的数据,当 Hive 表中有新数据加入可以通过增量的方式读取出来。同时支持 Hive 表的 Lookup Join,Lookup Join 的含义是用一张 Hive 的表来做一个离线的表与线上的一个表进行关联。

例子展示如何指定 HiveCatalog:

图片54.png

通过 Flink SQL 的 yaml 文件来指定,yaml 文件中指定 HiveCatalog 如何配置。

使用 Hive Connector 需要添加额外的一些依赖,在 Flink1.11中提供了预先打好的四个 Hive 依赖的 jar 包,可以根据使用的版本选择对应的 jar 包下载使用。

以上介绍这些 Flink 常用的 Connector 对于刚接触的同学可能会说功能很强大可以对接外部系统,但是刚开始用没有这些外部系统,要用 Flink 系统还得先有个 Hive 等是很麻烦的,为了解决这个问题,提供了几个内置的 Connector,内置的 Connector 的作用一方面是帮助新的用户能够尽快上手,能够更快的体验 Flink 强大的功能,另一个也是为了帮助 Flink 的开发人员去做一些代码的调试或者性能测试等。

5.DataGen Connector

DataGen 实际就是一个数据生成器

图片55.png

比如创建了 DataGen 的一张表,指定了几个字段,那么 connector 的类型指定成 datagen,这时如果写 SQL 语句读这张 DataGen 的表,这个 connector 会负责生成数据,也就是说数据是生成出来的,并不是之前存储在某个地方。用户可以做一些比较细腻度的控制,比如可以指定每秒钟生成多少数据,整型的字段可以指定通过 sequence 的方式来创建,sequence 是从小到大来创建,比如从一到一千,那就是一共生成一千条数据,也可以指定通过 random 的方式创建,random 方式就是随机的指定数值,random 的最大最小值也可以指定,还可以指定长度是多少。

6.Print Connector

Print Connector 提供 Sink 的功能,Sink就是负责把所有数据打印到标准输出或者错误输出上,打印格式:$row_kind(f0,f1, f2)表示这条记录是upset、append 等等。创建 print 表的时候,只需要把 connector 类型指定成print 就可以。

图片56.png

7.BlackHole Connector

这是黑洞 Connector,这个 Connector 也是做 Sink,会丢弃掉所有的数据,也就是说数据写过来,什么都不做就丢掉,主要可以用来做性能的测试,在不关心数据实际写出去的时候可以使用来做一个性能测试,排除掉写数据对性能的影响。创建 BlackHole 只需要把 connector 类型指定成 blackhole。

图片57.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
SQL 存储 API
Flink教程(20)- Flink高级特性(双流Join)
Flink教程(20)- Flink高级特性(双流Join)
143 0
|
1月前
|
Java 数据库连接 数据库
实时计算 Flink版操作报错合集之flink jdbc写入数据时,长时间没写入后报错,是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
49 9
|
1月前
|
Oracle 关系型数据库 数据库
实时计算 Flink版操作报错合集之报错 NoResourceAvailableException 是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
31 0
|
1月前
|
SQL 分布式计算 资源调度
实时计算 Flink版产品使用合集之Flink CDCRangeMap该怎么使用
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
存储 消息中间件 分布式计算
flink的常见知识点总结(一)
flink的常见知识点总结(一)
|
7月前
|
SQL 存储 API
Flink教程(25)- Flink高级特性(FlinkSQL整合Hive)
Flink教程(25)- Flink高级特性(FlinkSQL整合Hive)
498 0
|
7月前
|
SQL 关系型数据库 MySQL
Flink教程(19)- Flink高级特性(BroadcastState)
Flink教程(19)- Flink高级特性(BroadcastState)
28 0
|
7月前
|
存储 缓存 分布式计算
Flink教程(02)- Flink入门(下)
Flink教程(02)- Flink入门(下)
81 0
|
7月前
|
SQL 消息中间件 API
Flink教程(02)- Flink入门(上)
Flink教程(02)- Flink入门(上)
133 0
|
7月前
|
资源调度 Java 调度
Flink教程(05)- Flink原理简单分析
Flink教程(05)- Flink原理简单分析
82 0