Flink Ecosystems(二)|学习笔记

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: 快速学习 Flink Ecosystems(二)

开发者学堂课程【开源 Flink 极速上手教程:Flink Ecosystems】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/331/detail/3713


Flink Ecosystems(二)


二.常用 Connector

1. Kafka Connector

这个是使用最多的,因为 Flink 是一个流计算的引擎,所以 Kafka 是最流行的消息队列,所以过去 Flink 的用户大部分都使用 Kafka。如果要创建 Kafka 的表,需要指定一些特定的参数比如connector 字段指定成 Kafka, kafka 后面还可以带一个版本号,topic 代表对应的 kafka 的 topic 是什么,bootstrap.servers 是一个 Kafka brokers 的地址,group.id 是读一张 Kafka 表的时候需要的 consumer group,format 是表里源数据格式,startup.mode 表示希望从什么点位开始消费。这并不是所有的参数,如果想具体了解,还是参考官方文档。

图片50.png

要使用 Kafka Connector 需要一些依赖的 jar 包,根据使用的 Kafka 版本不一样,所使用的 Jar 包也不一样。大部分用户可能使用 universal 这个版本就可以了,0.11和0.10的可以用这两个版本特定的 Jar 包,jar 包都可以在官网上下载到。

图片51.png

2. Elasticsearch Connector

图片52.png

Elasticsearch Connector 只实现了 Sink,所以只能写不能读。connector 类型指定成elasticsearch-6 或者是 elasticsearch-7,hosts 就是指定各个节点,通过域名加版本号的形式,index 指定的写,index大约类似于传统数据库中的一张表,需要去写哪一个 index,相当于写一张表,document-type 类似于传统数据库中表中的某一行,写这张表时指定一个document-type,在 elasticsearch-6 中需要指定,在 elasticsearch-7 中这个参数不需要指定。

单独强调一点使用 Elasticsearch Connector 时 Key 的处理问题,Elasticsearch 的 Sink 支持 append 模式和 upset 模式,如果这张表指定了primary key,那么 Sink 会以 upset 模式工作,如果没有指定,则以 append 模式工作。在 upset 模式工作时,也就是指定了 PK,Sink 会用所有的 PK 字段生成document ID,document ID 是一个用 PK 字段拼出来的字符串。同时可以指定delimiter,多个字段的时候可以指定一个分隔符。要生成 document ID 字符串,有些类型因为不是很好转成 string 格式,所以它不适合作为 PK,像 ROW, MAP 这种复杂类型一般不太适合作为PK。使用 Elasticsearch 需要指定一些额外的依赖加入,也是根据 Elasticsearch的版本添加不同版本的 elasticsearch Connector。

3.FileSystem Connector

顾名思义这个 Connector 对接的是文件系统,读写的就是这个文件系统的文件,这里的 FileSystem 指的是 Flink 的FileSystem 抽象,支持很多种不同的实现比如 Local、Hadoop、S3、OSS等。在Flink1.11中也做了一些升级,比如支持了分区,分区格式与 Hive 比较相似,表目录是一级目录,子目录对应的是分区目录,每一个分区最终对应的相当于是表目录下面的一个子目录,对应的是分区的数据。但是与 Hive 不同的一点是 FileSystem Connector 的分区信息是不需要注册到一个 Catalog 中的,是通过目录结构自动的推断分区信息,Hive 必须要传一条关于分区的元数据信息。FileSystem 支持 Streaming Sink,也就是说流式的数据可以写入到文件系统的文件中,暂时不支持 Streaming Source。FileSystem Connector 本身不需要添加额外的依赖,但是如果要对接特定的FileSystem 实现的依赖还是要添加,或者需要使用特定的 Format,比如文件是ORC 格式,那么 ORC 的 jar 包需要添加。

例子:

图片53.png

schema 信息可以看到这里指定了两个分区字段,分别是 column_name1 和column_name2,位置参数中 connector 指定成 filesystem, path 指定的是文件的表目录的路径,这个表具体使用的哪种filesystem 是通过路径中的 stream 决定的,比如这里是 file 则表示这个表是本地的文件系统,如果写成 hdfs 则指定为这张表在 hdfs 上。format 表的数据格式,

partition.default-name 含义是当partition 字段的值为 null,写出来的目录名字应该叫什么,sink.shuffle-by-partition.enable 主要是用来在分区时指定分区数据需不需要做shuffle。

4. Hive Connector

Hive 作为 Hadoop 生态圈应该是最早的SQL 引擎,也是用户使用比较多,在批处理的场景上大部分用户都在使用 Hive, Flink SQL 为了更好的支持批处理场景,也提供了 Hive 的能力。Hive 分为两个层面,首先在元数据上,通过HiveCatalog 对接 Hive 元数据。同时提供 HiveTableSource、HiveTableSink 读写Hive 表数据。

而且提供了与 Hive 非常好的兼容性,首先是支持多个 Hive 版本,从1.0.0到3.1.2,基本上涵盖了目前所有的主流的 Hive 版本,同时支持了多种文件格式,比如 text、ORC等,类型的支持也比较全,另外通过 Hive Dialect 提供了 Hive 风格的语法,用户开启 Hive Dialect 以后可以在 Flink 中写 Hive 的 SQL 语句。

在Flink1.11中也尝试了把 Hive 的离线数仓模式跟流计算的能力进行整合,得出了一个 Hive 批流一体数仓的概念,主要增强 Hive 的实时性,比如支持 Hive 的流式写入,还支持流式的读,流式的读就是可以实时监控 Hive 的数据,当 Hive 表中有新数据加入可以通过增量的方式读取出来。同时支持 Hive 表的 Lookup Join,Lookup Join 的含义是用一张 Hive 的表来做一个离线的表与线上的一个表进行关联。

例子展示如何指定 HiveCatalog:

图片54.png

通过 Flink SQL 的 yaml 文件来指定,yaml 文件中指定 HiveCatalog 如何配置。

使用 Hive Connector 需要添加额外的一些依赖,在 Flink1.11中提供了预先打好的四个 Hive 依赖的 jar 包,可以根据使用的版本选择对应的 jar 包下载使用。

以上介绍这些 Flink 常用的 Connector 对于刚接触的同学可能会说功能很强大可以对接外部系统,但是刚开始用没有这些外部系统,要用 Flink 系统还得先有个 Hive 等是很麻烦的,为了解决这个问题,提供了几个内置的 Connector,内置的 Connector 的作用一方面是帮助新的用户能够尽快上手,能够更快的体验 Flink 强大的功能,另一个也是为了帮助 Flink 的开发人员去做一些代码的调试或者性能测试等。

5.DataGen Connector

DataGen 实际就是一个数据生成器

图片55.png

比如创建了 DataGen 的一张表,指定了几个字段,那么 connector 的类型指定成 datagen,这时如果写 SQL 语句读这张 DataGen 的表,这个 connector 会负责生成数据,也就是说数据是生成出来的,并不是之前存储在某个地方。用户可以做一些比较细腻度的控制,比如可以指定每秒钟生成多少数据,整型的字段可以指定通过 sequence 的方式来创建,sequence 是从小到大来创建,比如从一到一千,那就是一共生成一千条数据,也可以指定通过 random 的方式创建,random 方式就是随机的指定数值,random 的最大最小值也可以指定,还可以指定长度是多少。

6.Print Connector

Print Connector 提供 Sink 的功能,Sink就是负责把所有数据打印到标准输出或者错误输出上,打印格式:$row_kind(f0,f1, f2)表示这条记录是upset、append 等等。创建 print 表的时候,只需要把 connector 类型指定成print 就可以。

图片56.png

7.BlackHole Connector

这是黑洞 Connector,这个 Connector 也是做 Sink,会丢弃掉所有的数据,也就是说数据写过来,什么都不做就丢掉,主要可以用来做性能的测试,在不关心数据实际写出去的时候可以使用来做一个性能测试,排除掉写数据对性能的影响。创建 BlackHole 只需要把 connector 类型指定成 blackhole。

图片57.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
SQL 关系型数据库 MySQL
Hive跨集群和版本迁移
Hive跨集群和版本迁移
|
数据处理 流计算
Flink CDC 开启事物精准一致之后也会有存在丢数据的情况
Flink CDC 开启事物精准一致之后也会有存在丢数据的情况嘛?
568 1
|
SQL 缓存 Java
flink cdc 同步问题之如何同步多张库表
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
5月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
887 1
|
9月前
|
消息中间件 缓存 负载均衡
php怎么解决高并发的问题
在实际应用中,应根据具体需求和应用场景,选择合适的优化方案,并进行持续监控和优化,确保系统的高效稳定运行。
443 6
|
消息中间件 canal 关系型数据库
Maxwell:binlog 解析器,轻松同步 MySQL 数据
Maxwell:binlog 解析器,轻松同步 MySQL 数据
1388 11
|
SQL 关系型数据库 数据库
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(2)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
分布式计算 API 数据处理
Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
【2月更文挑战第15天】Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
877 1
|
SQL 关系型数据库 MySQL
MySQL数据库——视图-介绍及基本语法(创建、查询、修改、删除、演示示例)
MySQL数据库——视图-介绍及基本语法(创建、查询、修改、删除、演示示例)
489 0
|
消息中间件 SQL Oracle
实时计算 Flink版产品使用合集之增量同步速度较慢,导致延迟增加,该如何优化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。