1 概览
1.1 预定义的源和接收器
Flink内置了一些基本数据源和接收器,并且始终可用。该预定义的数据源包括文件,目录和插socket,并从集合和迭代器摄取数据。该预定义的数据接收器支持写入文件和标准输入输出及socket。
1.2 绑定连接器
连接器提供用于与各种第三方系统连接的代码。目前支持这些系统:
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Google PubSub (source/sink)
要在应用程序中使用其中一个连接器,通常需要其他第三方组件,例如数据存储或消息队列的服务器。
虽然本节中列出的流连接器是Flink项目的一部分,并且包含在源版本中,但它们不包含在二进制分发版中。
1.3 Apache Bahir中的连接器
Flink的其他流处理连接器正在通过Apache Bahir发布,包括:
Apache ActiveMQ (source/sink)
Apache Flume (sink)
Redis (sink)
Akka (sink)
Netty (source)
1.4 其他连接到Flink的方法
1.4.1 通过异步I / O进行数据渲染
使用连接器不是将数据输入和输出Flink的唯一方法。一种常见的模式是在一个Map或多个FlatMap 中查询外部数据库或Web服务以渲染主数据流。
Flink提供了一个用于异步I / O的API, 以便更有效,更稳健地进行这种渲染。
1.4.2 可查询状态
当Flink应用程序将大量数据推送到外部数据存储时,这可能会成为I / O瓶颈。如果所涉及的数据具有比写入更少的读取,则更好的方法可以是外部应用程序从Flink获取所需的数据。在可查询的状态界面,允许通过Flink被管理的状态,按需要查询支持这个。
2 HDFS连接器
此连接器提供一个Sink,可将分区文件写入任一Hadoop文件系统支持的文件系统 。
要使用此连接器,请将以下依赖项添加到项目中:
请注意,流连接器当前不是二进制发布的一部分
2.1 Bucketing File Sink
可以配置分段行为以及写入,但我们稍后会介绍。这是可以创建一个默认情况下汇总到按时间拆分的滚动文件的存储槽的方法
- Java
- Scala
唯一必需的参数是存储桶的基本路径。可以通过指定自定义bucketer,写入器和批量大小来进一步配置接收器。
默认情况下,当数据元到达时,分段接收器将按当前系统时间拆分,并使用日期时间模式"yyyy-MM-dd–HH"命名存储区。这种模式传递给 DateTimeFormatter使用当前系统时间和JVM的默认时区来形成存储桶路径。用户还可以为bucketer指定时区以格式化存储桶路径。每当遇到新日期时,都会创建一个新存储桶。
例如,如果有一个包含分钟作为最精细粒度的模式,将每分钟获得一个新桶。每个存储桶本身都是一个包含多个部分文件的目录:接收器的每个并行实例将创建自己的部件文件,当部件文件变得太大时,接收器也会在其他文件旁边创建新的部件文件。当存储桶变为非活动状态时,将刷新并关闭打开的部件文件。如果存储桶最近未写入,则视为非活动状态。默认情况下,接收器每分钟检查一次非活动存储桶,并关闭任何超过一分钟未写入的存储桶。setInactiveBucketCheckInterval()并 setInactiveBucketThreshold()在一个BucketingSink。
也可以通过指定自定义bucketer setBucketer()上BucketingSink。如果需要,bucketer可以使用数据元或元组的属性来确定bucket目录。
默认编写器是StringWriter。这将调用toString()传入的数据元并将它们写入部分文件,由换行符分隔。在a setWriter() 上指定自定义编写器使用BucketingSink。如果要编写Hadoop SequenceFiles,可以使用提供的 SequenceFileWriter,也可以配置为使用压缩。
有两个配置选项指定何时应关闭零件文件并启动新零件文件:
通过设置批量大小(默认部件文件大小为384 MB)
通过设置批次滚动时间间隔(默认滚动间隔为Long.MAX_VALUE)
当满足这两个条件中的任何一个时,将启动新的部分文件。看如下例子:
Java
Scala
这将创建一个接收器,该接收器将写入遵循此模式的存储桶文件:
Java
生成结果
- date-time是我们从日期/时间格式获取的字符串
- parallel-task是并行接收器实例的索引
- count是由于批处理大小或批处理翻转间隔而创建的部分文件的运行数
然而这种方式创建了太多小文件,不适合HDFS!仅供娱乐!
3 Apache Kafka连接器
3.1 简介
此连接器提供对Apache Kafka服务的事件流的访问。
Flink提供特殊的Kafka连接器,用于从/向Kafka主题读取和写入数据。Flink Kafka Consumer集成了Flink的检查点机制,可提供一次性处理语义。为实现这一目标,Flink并不完全依赖Kafka的消费者群体偏移跟踪,而是在内部跟踪和检查这些偏移。
为用例和环境选择一个包(maven artifact id)和类名。对于大多数用户来说,FlinkKafkaConsumer08(部分flink-connector-kafka)是合适的。
然后,导入maven项目中的连接器:
https://blog.csdn.net/qq_33589510/article/details/97064338
3.2 ZooKeeper安装及配置
配置系统环境
修改配置数据存储路径
启动
3.3 Kafka部署及测试
假设你刚刚开始并且没有现有的Kafka或ZooKeeper数据
由于Kafka控制台脚本对于基于Unix和Windows的平台不同,因此在Windows平台上使用bin \ windows \而不是bin /,并将脚本扩展名更改为.bat。
Step 1:下载代码
下载:https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz
解压
配置环境变量
配置服务器属性
修改日志存储路径
修改主机名
Step 2: 启动服务器
Kafka使用ZooKeeper,因此如果还没有ZooKeeper服务器,则需要先启动它。
- 后台模式启动
Step 3: 创建一个主题
- 创建topic
Step 4: 发送一些消息
Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。 默认情况下,每行将作为单独的消息发送。
运行生产者,然后在控制台中键入一些消息以发送到服务器。
- 启动生产者
Step 5: 启动一个消费者
Kafka还有一个命令行使用者,它会将消息转储到标准输出。
- 分屏,新建消费端
在不同的终端中运行上述每个命令,那么现在应该能够在生产者终端中键入消息并看到它们出现在消费者终端中
所有命令行工具都有其他选项; 运行不带参数的命令将显示更详细地记录它们的使用信息。
3.4 Kafka 1.0.0+ Connector
从Flink 1.7开始,有一个新的通用Kafka连接器,它不跟踪特定的Kafka主要版本。 相反,它在Flink发布时跟踪最新版本的Kafka。
如果您的Kafka代理版本是1.0.0或更高版本,则应使用此Kafka连接器。 如果使用旧版本的Kafka(0.11,0.10,0.9或0.8),则应使用与代理版本对应的连接器。
兼容性
通过Kafka客户端API和代理的兼容性保证,通用Kafka连接器与较旧和较新的Kafka代理兼容。 它与版本0.11.0或更高版本兼容,具体取决于所使用的功能。
将Kafka Connector从0.11迁移到通用(V1.10新增)
要执行迁移,请参阅升级作业和Flink版本指南和
在整个过程中使用Flink 1.9或更新版本。
不要同时升级Flink和操作符。
确保您作业中使用的Kafka Consumer和/或Kafka Producer分配了唯一标识符(uid):
使用stop with savepoint功能获取保存点(例如,使用stop --withSavepoint)CLI命令。
用法
要使用通用Kafka连接器,请为其添加依赖关系:
然后实例化新源(FlinkKafkaConsumer)
Flink Kafka Consumer是一个流数据源,可以从Apache Kafka中提取并行数据流。 使用者可以在多个并行实例中运行,每个实例都将从一个或多个Kafka分区中提取数据。
Flink Kafka Consumer参与了检查点,并保证在故障期间没有数据丢失,并且计算处理元素“恰好一次”。(注意:这些保证自然会假设Kafka本身不会丢失任何数据。)
请注意,Flink在内部将偏移量作为其分布式检查点的一部分进行快照。 承诺给Kafka的抵消只是为了使外部的进展观与Flink对进展的看法同步。 这样,监控和其他工作可以了解Flink Kafka消费者在多大程度上消耗了一个主题。
和接收器(FlinkKafkaProducer)。
除了从模块和类名中删除特定的Kafka版本之外,API向后兼容Kafka 0.11连接器。
3.5 Kafka消费者
Flink的Kafka消费者被称为FlinkKafkaConsumer08(或09Kafka 0.9.0.x等)。它提供对一个或多个Kafka主题的访问。
构造函数接受以下参数:
主题名称/主题名称列表
DeserializationSchema / KeyedDeserializationSchema用于反序列化来自Kafka的数据
Kafka消费者的属性。需要以下属性:
“bootstrap.servers”(以逗号分隔的Kafka经纪人名单)
“zookeeper.connect”(逗号分隔的Zookeeper服务器列表)(仅Kafka 0.8需要)
“group.id”消费者群组的ID
- 上述程序注意配置ip主机映射
- 虚拟机hosts
- 本地机器 hosts
发送消息
运行程序消费消息
- Example:
- Java
- Scala
The DeserializationSchema
Flink Kafka Consumer需要知道如何将Kafka中的二进制数据转换为Java / Scala对象。
在 DeserializationSchema允许用户指定这样的一个架构。T deserialize(byte[] message) 为每个Kafka消息调用该方法,从Kafka传递值。
从它开始通常很有帮助AbstractDeserializationSchema,它负责将生成的Java / Scala类型描述为Flink的类型系统。实现vanilla的用户DeserializationSchema需要自己实现该getProducedType(…)方法。
为了访问Kafka消息的键和值,KeyedDeserializationSchema具有以下deserialize方法T deserialize(byte [] messageKey,byte [] message,String topic,int partition,long offset)。
为方便起见,Flink提供以下模式:
TypeInformationSerializationSchema(和TypeInformationKeyValueSerializationSchema)创建基于Flink的模式TypeInformation。如果Flink编写和读取数据,这将非常有用。此模式是其他通用序列化方法的高性能Flink替代方案。
JsonDeserializationSchema(和JSONKeyValueDeserializationSchema)将序列化的JSON转换为ObjectNode对象,可以使用objectNode.get(“field”)作为(Int / String / …)()从中访问字段。KeyValue objectNode包含一个“key”和“value”字段,其中包含所有字段,以及一个可选的“元数据”字段,用于公开此消息的偏移量/分区/主题。
AvroDeserializationSchema它使用静态提供的模式读取使用Avro格式序列化的数据。它可以从Avro生成的类(AvroDeserializationSchema.forSpecific(…))中推断出模式,也可以GenericRecords 使用手动提供的模式(with AvroDeserializationSchema.forGeneric(…))。此反序列化架构要求序列化记录不包含嵌入式架构。
还有一个可用的模式版本,可以在Confluent Schema Registry中查找编写器的模式(用于编写记录的 模式)。使用这些反序列化模式记录将使用从模式注册表中检索的模式进行读取,并转换为静态提供的模式(通过 ConfluentRegistryAvroDeserializationSchema.forGeneric(…)或ConfluentRegistryAvroDeserializationSchema.forSpecific(…))。
要使用此反序列化模式,必须添加以下附加依赖项:
当遇到因任何原因无法反序列化的损坏消息时,有两个选项 - 从deserialize(…)方法中抛出异常将导致作业失败并重新启动,或者返回null以允许Flink Kafka使用者以静默方式跳过损坏的消息。请注意,由于使用者的容错能力(请参阅下面的部分以获取更多详细信息),因此对损坏的消息执行失败将使消费者尝试再次反序列化消息。因此,如果反序列化仍然失败,则消费者将在该损坏的消息上进入不间断重启和失败循环。