开发者学堂课程【开源 Flink 极速上手教程:Flink Ecosystems】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/331/detail/3713
Flink Ecosystems(二)
二.常用 Connector
1. Kafka Connector
这个是使用最多的,因为 Flink 是一个流计算的引擎,所以 Kafka 是最流行的消息队列,所以过去 Flink 的用户大部分都使用 Kafka。如果要创建 Kafka 的表,需要指定一些特定的参数比如connector 字段指定成 Kafka, kafka 后面还可以带一个版本号,topic 代表对应的 kafka 的 topic 是什么,bootstrap.servers 是一个 Kafka brokers 的地址,group.id 是读一张 Kafka 表的时候需要的 consumer group,format 是表里源数据格式,startup.mode 表示希望从什么点位开始消费。这并不是所有的参数,如果想具体了解,还是参考官方文档。
要使用 Kafka Connector 需要一些依赖的 jar 包,根据使用的 Kafka 版本不一样,所使用的 Jar 包也不一样。大部分用户可能使用 universal 这个版本就可以了,0.11和0.10的可以用这两个版本特定的 Jar 包,jar 包都可以在官网上下载到。
2. Elasticsearch Connector
Elasticsearch Connector 只实现了 Sink,所以只能写不能读。connector 类型指定成elasticsearch-6 或者是 elasticsearch-7,hosts 就是指定各个节点,通过域名加版本号的形式,index 指定的写,index大约类似于传统数据库中的一张表,需要去写哪一个 index,相当于写一张表,document-type 类似于传统数据库中表中的某一行,写这张表时指定一个document-type,在 elasticsearch-6 中需要指定,在 elasticsearch-7 中这个参数不需要指定。
单独强调一点使用 Elasticsearch Connector 时 Key 的处理问题,Elasticsearch 的 Sink 支持 append 模式和 upset 模式,如果这张表指定了primary key,那么 Sink 会以 upset 模式工作,如果没有指定,则以 append 模式工作。在 upset 模式工作时,也就是指定了 PK,Sink 会用所有的 PK 字段生成document ID,document ID 是一个用 PK 字段拼出来的字符串。同时可以指定delimiter,多个字段的时候可以指定一个分隔符。要生成 document ID 字符串,有些类型因为不是很好转成 string 格式,所以它不适合作为 PK,像 ROW, MAP 这种复杂类型一般不太适合作为PK。使用 Elasticsearch 需要指定一些额外的依赖加入,也是根据 Elasticsearch的版本添加不同版本的 elasticsearch Connector。
3.FileSystem Connector
顾名思义这个 Connector 对接的是文件系统,读写的就是这个文件系统的文件,这里的 FileSystem 指的是 Flink 的FileSystem 抽象,支持很多种不同的实现比如 Local、Hadoop、S3、OSS等。在Flink1.11中也做了一些升级,比如支持了分区,分区格式与 Hive 比较相似,表目录是一级目录,子目录对应的是分区目录,每一个分区最终对应的相当于是表目录下面的一个子目录,对应的是分区的数据。但是与 Hive 不同的一点是 FileSystem Connector 的分区信息是不需要注册到一个 Catalog 中的,是通过目录结构自动的推断分区信息,Hive 必须要传一条关于分区的元数据信息。FileSystem 支持 Streaming Sink,也就是说流式的数据可以写入到文件系统的文件中,暂时不支持 Streaming Source。FileSystem Connector 本身不需要添加额外的依赖,但是如果要对接特定的FileSystem 实现的依赖还是要添加,或者需要使用特定的 Format,比如文件是ORC 格式,那么 ORC 的 jar 包需要添加。
例子:
schema 信息可以看到这里指定了两个分区字段,分别是 column_name1 和column_name2,位置参数中 connector 指定成 filesystem, path 指定的是文件的表目录的路径,这个表具体使用的哪种filesystem 是通过路径中的 stream 决定的,比如这里是 file 则表示这个表是本地的文件系统,如果写成 hdfs 则指定为这张表在 hdfs 上。format 表的数据格式,
partition.default-name 含义是当partition 字段的值为 null,写出来的目录名字应该叫什么,sink.shuffle-by-partition.enable 主要是用来在分区时指定分区数据需不需要做shuffle。
4. Hive Connector
Hive 作为 Hadoop 生态圈应该是最早的SQL 引擎,也是用户使用比较多,在批处理的场景上大部分用户都在使用 Hive, Flink SQL 为了更好的支持批处理场景,也提供了 Hive 的能力。Hive 分为两个层面,首先在元数据上,通过HiveCatalog 对接 Hive 元数据。同时提供 HiveTableSource、HiveTableSink 读写Hive 表数据。
而且提供了与 Hive 非常好的兼容性,首先是支持多个 Hive 版本,从1.0.0到3.1.2,基本上涵盖了目前所有的主流的 Hive 版本,同时支持了多种文件格式,比如 text、ORC等,类型的支持也比较全,另外通过 Hive Dialect 提供了 Hive 风格的语法,用户开启 Hive Dialect 以后可以在 Flink 中写 Hive 的 SQL 语句。
在Flink1.11中也尝试了把 Hive 的离线数仓模式跟流计算的能力进行整合,得出了一个 Hive 批流一体数仓的概念,主要增强 Hive 的实时性,比如支持 Hive 的流式写入,还支持流式的读,流式的读就是可以实时监控 Hive 的数据,当 Hive 表中有新数据加入可以通过增量的方式读取出来。同时支持 Hive 表的 Lookup Join,Lookup Join 的含义是用一张 Hive 的表来做一个离线的表与线上的一个表进行关联。
例子展示如何指定 HiveCatalog:
通过 Flink SQL 的 yaml 文件来指定,yaml 文件中指定 HiveCatalog 如何配置。
使用 Hive Connector 需要添加额外的一些依赖,在 Flink1.11中提供了预先打好的四个 Hive 依赖的 jar 包,可以根据使用的版本选择对应的 jar 包下载使用。
以上介绍这些 Flink 常用的 Connector 对于刚接触的同学可能会说功能很强大可以对接外部系统,但是刚开始用没有这些外部系统,要用 Flink 系统还得先有个 Hive 等是很麻烦的,为了解决这个问题,提供了几个内置的 Connector,内置的 Connector 的作用一方面是帮助新的用户能够尽快上手,能够更快的体验 Flink 强大的功能,另一个也是为了帮助 Flink 的开发人员去做一些代码的调试或者性能测试等。
5.DataGen Connector
DataGen 实际就是一个数据生成器
比如创建了 DataGen 的一张表,指定了几个字段,那么 connector 的类型指定成 datagen,这时如果写 SQL 语句读这张 DataGen 的表,这个 connector 会负责生成数据,也就是说数据是生成出来的,并不是之前存储在某个地方。用户可以做一些比较细腻度的控制,比如可以指定每秒钟生成多少数据,整型的字段可以指定通过 sequence 的方式来创建,sequence 是从小到大来创建,比如从一到一千,那就是一共生成一千条数据,也可以指定通过 random 的方式创建,random 方式就是随机的指定数值,random 的最大最小值也可以指定,还可以指定长度是多少。
6.Print Connector
Print Connector 提供 Sink 的功能,Sink就是负责把所有数据打印到标准输出或者错误输出上,打印格式:$row_kind(f0,f1, f2)表示这条记录是upset、append 等等。创建 print 表的时候,只需要把 connector 类型指定成print 就可以。
7.BlackHole Connector
这是黑洞 Connector,这个 Connector 也是做 Sink,会丢弃掉所有的数据,也就是说数据写过来,什么都不做就丢掉,主要可以用来做性能的测试,在不关心数据实际写出去的时候可以使用来做一个性能测试,排除掉写数据对性能的影响。创建 BlackHole 只需要把 connector 类型指定成 blackhole。