Flink 1.14.0 全新的 Kafka Connector

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 1.14.0 全新的 Kafka Connector

Apache Kafka Connector#

Flink 提供了一个 Apache Kafka 连接器,用于从 Kafka Topic 读取数据和向 Kafka Topic 写入数据,并保证恰好一次次语义。

Dependency#

Apache Flink 附带了一个通用的 Kafka 连接器,它试图跟踪最新版本的 Kafka 客户端。它使用的客户端版本可能会在 Flink 版本之间发生变化。最近的 Kafka 客户端向后兼容 broker 版本 0.10.0 或更高版本。关于 Kafka 兼容性的详细信息,请参考 Kafka 官方 文档。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.14.0</version>
</dependency>

Flink 的流连接器目前不是二进制发行版的一部分。在此处 查看如何与它们链接以进行集群执行。

Kafka Source#

本部分介绍基于新 数据源 API 的 Kafka Source。


Usage#

Kafka Source 提供了一个 builder 类来构建 KafkaSource 的实例。下面的代码片段展示了如何构建一个 KafkaSource 来消费来自主题 “input-topic” 最早偏移量的消息,消费者组是“my-group”,并且仅将消息的值反序列化为字符串。

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

构建 KafkaSource 「需要」以下属性:

  • Bootstrap servers,通过 setBootstrapServers(String)来配置
  • Topics / partitions to subscribe,请参阅以下 主题-分区订阅 以了解更多详细信息。
  • Deserializer to parse Kafka messages,更多详细信息请参见以下 Deserializer。

Topic-partition Subscription#

Kafka 源码提供了 3 种 topic-partition 订阅方式:

  • 主题列表,订阅主题列表中所有分区的消息。例如:
KafkaSource.builder().setTopics("topic-a", "topic-b")
  • 主题模式,从名称与提供的正则表达式匹配的所有主题订阅消息。例如:
KafkaSource.builder().setTopicPattern("topic.*")
  • 分区集,订阅提供的分区集中的分区。例如:
final HashSet<TopicPartition> partitionSet = new

Deserializer#

解析 Kafka 消息需要一个反序列化器。Deserializer(反序列化模式)可以通过 配置setDeserializer(KafkaRecordDeserializationSchema),其中KafkaRecordDeserializationSchema定义了如何反序列化一个 Kafka ConsumerRecord

如果只需要 Kafka ConsumerRecord的值,可以使用 setValueOnlyDeserializer(DeserializationSchema)在 builder 中使用,其中DeserializationSchema定义了如何反序列化 Kafka 消息值的二进制文件。

你还可以使用 Kafka Deserializer 来反序列化 Kafka 消息值. 例如使用 StringDeserializer 将 Kafka 消息值反序列化为字符串:

import org.apache.kafka.common.serialization.StringDeserializer;
KafkaSource.<String>builder()
        .setDeserializer(KafkaRecordDeserializationSchema.valueO

Starting Offset#

Kafka Source 能够通过指定 OffsetsInitializer来消费从不同偏移量开始的消息。内置的初始值设定项包括:

KafkaSource.builder()
    // Start from committed offset of the consuming group, without reset strategy
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    // Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    // Start from the first record whose timestamp is greater than or equals a timestamp
    .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
    // Start from earliest offset
    .setStartingOffsets(OffsetsInitializer.earliest())
    // Start from latest offset
    .setStartingOffsets(OffsetsInitializer.latest())

如果上面的内置初始值设定项无法满足你的要求,您还可以实现自定义偏移量初始值设定项。

如果未指定 offsets 初始值设定项,「则」默认使用 「OffsetsInitializer.earliest()」


Boundedness#

Kafka Source 旨在支持流式和批量运行模式。默认情况下,KafkaSource 设置为以流式方式运行,因此永远不会停止,直到 Flink 作业失败或被取消。您可以使用setBounded(OffsetsInitializer)指定停止偏移量并设置以批处理模式运行的源。当所有分区都达到它们的停止偏移量时,Source 将退出。

您还可以将 KafkaSource 设置为在流模式下运行,但仍然使用setUnbounded(OffsetsInitializer). 当所有分区达到其指定的停止偏移量时,Source 将退出。

Additional Properties #

除了上述属性外,您还可以使用setProperties(Properties)和为 KafkaSource 和 KafkaConsumer 设置任意属性setProperty(String, String)。KafkaSource 有以下配置选项:

  • client.id.prefix 定义用于 Kafka 消费者的客户端 ID 的前缀
  • partition.discovery.interval.ms定义 Kafka 源发现新分区的时间间隔 im 毫秒。有关更多详细信息,请参阅下面的 动态分区发现。
  • register.consumer.metrics 指定是否在 Flink 指标组中注册 KafkaConsumer 的指标
  • commit.offsets.on.checkpoint 指定是否在检查点向 Kafka broker 提交消费偏移量

KafkaConsumer 的配置可以参考 Apache Kafka文档 了解更多。

请注意,即使配置了以下键,构建器也会覆盖它:

  • key.deserializer 始终设置为 ByteArrayDeserializer
  • value.deserializer 始终设置为 ByteArrayDeserializer
  • auto.offset.reset.strategy被起始偏移量覆盖OffsetsInitializer#getAutoOffsetResetStrategy()
  • partition.discovery.interval.ms被调用时被覆盖为 -1setBounded(OffsetsInitializer)

下面的代码片段显示了配置 KafkaConsumer 以使用“PLAIN”作为 SASL 机制并提供 JAAS 配置:

KafkaSource.builder()
    .setProperty("sasl.mechanism", "PLAIN")
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.secu

Dynamic Partition Discovery#

为了在不重启 Flink 作业的情况下处理主题扩展或主题创建等场景,可以将 Kafka 源配置为在提供的主题-分区订阅模式下定期发现新分区。要启用分区发现,请为 property 设置一个非负值partition.discovery.interval.ms

KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000") // discover new partitions per 10 seconds

默认情况下「禁用」分区发现。您需要明确设置分区发现间隔才能启用此功能。


Event Time and Watermarks#

默认情况下,记录将使用嵌入在 Kafka 中的时间戳ConsumerRecord作为事件时间。您可以定义自己WatermarkStrategy的从记录本身提取事件时间,并在下游发出水印:

env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

本文档 描述了有关如何定义WatermarkStrategy.

Consumer Offset Committing#

Kafka source 在 checkpoint 「完成」时提交当前消费的 offset ,以保证 Flink 的 checkpoint 状态和 Kafka brokers 上的 commit offset 的一致性。

如果未启用检查点,则 Kafka 源依赖于 Kafka 消费者内部的自动定期偏移提交逻辑,由Kafka 消费者的属性配置enable.auto.commit并在其属性中配置auto.commit.interval.ms

需要注意的是 Kafka source 不依赖提交偏移量来实现容错。提交偏移量只是为了暴露消费者和消费组的进度以供监控。


Monitoring#

Kafka Source 在各自的 范围内 公开以下指标。

Scope of Metric#

Scope Metrics User Variables Description Type
Operator currentEmitEventTimeLag n/a 从记录事件时间戳到源连接器发出记录的时间跨度¹: currentEmitEventTimeLag = EmitTime - EventTime. Gauge
Operator watermarkLag n/a 水印滞后于墙时钟时间的时间跨度: watermarkLag = CurrentTime - Watermark Gauge
Operator sourceIdleTime n/a 源没有处理任何记录的时间跨度: sourceIdleTime = CurrentTime - LastRecordProcessTime Gauge
Operator pendingRecords n/a 源尚未提取的记录数。例如 Kafka 分区中消费者偏移后的可用记录。 Gauge
Operator KafkaSourceReader.commitsSucceeded n/a 如果偏移提交被打开并且检查点被开启,那么成功的偏移提交到 Kafka 的总数。 Counter
Operator KafkaSourceReader.commitsFailed n/a 如果打开偏移提交并启用检查点,则向 Kafka 提交偏移提交失败的总数。请注意,将偏移量提交回 Kafka 只是暴露消费者进度的一种方式,因此提交失败不会影响 Flink 的检查点分区偏移量的完整性。 Counter
Operator KafkaSourceReader.committedOffsets topic, partition 对于每个分区,最后一次成功提交到 Kafka 的偏移量。可以通过主题名称和分区 id 指定特定分区的指标。 Gauge
Operator KafkaSourceReader.currentOffsets topic, partition 每个分区的消费者当前读取偏移量。可以通过主题名称和分区 id 指定特定分区的指标。 Gauge

¹ 该指标是为最后处理的记录记录的瞬时值。提供此指标是因为延迟直方图可能很昂贵。瞬时延迟值通常足以很好地指示延迟。

Kafka Consumer Metrics#

Kafka 消费者的所有指标也都注册在 group 下KafkaSourceReader.KafkaConsumer。例如,Kafka 消费者指标“records-consumed-total”将在指标中报告:.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total

您可以通过配置 option 来配置是否注册 Kafka 消费者的指标register.consumer.metrics。默认情况下,此选项将设置为 true。

对于 Kafka 消费者的指标,您可以参考 Apache Kafka 文档 了解更多详细信息。


Behind the Scene#

如果您对 Kafka Source 在新数据源 API 的设计下如何工作感兴趣,您可能需要阅读此部分作为参考。有关新数据源 API 的详细信息,数据源 文档 和FLIP-27 提供了更多描述性讨论。

在新数据源 API 的抽象下,Kafka Source 由以下组件组成:

在新数据源 API 的抽象下,Kafka Source 由以下组件组成:

Source Split#

Kafka Source 中的一个源拆分代表 Kafka 主题的一个分区。Kafka Source 拆分包括:

  • TopicPartition 分裂代表
  • 分区的起始偏移量
  • 停止分区的偏移量,仅在源以有界模式运行时可用

Kafka source split 的状态也存储了partition 的当前消费 offset,当 Kafka source reader 为 snapshot 时,状态会转换为 immutable split,将当前 offset 赋值给immutable split 的起始偏移量。

您可以查看类 KafkaPartitionSplitKafkaPartitionSplitState`了解更多详情。

Split Enumerator#

Kafka 的拆分枚举器负责在提供的主题分区订阅模式下发现新的拆分(分区),并将拆分分配给读者,以循环方式均匀分布在子任务中。请注意,Kafka Source 的拆分枚举器会急切地将拆分推送到源阅读器,因此它不需要处理来自源阅读器的拆分请求。

Source Reader#

Kafka source 的 source reader 扩展了提供的SourceReaderBase,并使用单线程多路复用线程模型,该模型读取多个分配的拆分(分区),一个 KafkaConsumer 由一个 驱动SplitReader。消息在从 Kafka 中获取后立即反序列化SplitReader。拆分的状态或消息消费的当前进度由 更新KafkaRecordEmitter,它还负责在记录向下游发出时分配事件时间。


Kafka SourceFunction #

FlinkKafkaConsumer已弃用,将随 Flink 1.15 一起删除,请使用 KafkaSource

对于较旧的参考,您可以查看 Flink 1.13文档。

对于较旧的参考,您可以查看 Flink 1.13文档。

Kafka Sink#

KafkaSink 允许将记录流写入一个或多个 Kafka 主题。


Usage#

Kafka sink 提供了一个 builder 类来构造一个 KafkaSink 的实例。下面的代码片段显示了如何将字符串记录写入 Kafka 主题,并保证至少一次交付。

DataStream<String> stream = ...
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build()
        )
        .build();
stream.sinkTo(sink);

构建 KafkaSink 「需要」以下属性:

  • Bootstrap servers, setBootstrapServers(String)
  • Record serializer, setRecordSerializer(KafkaRecordSerializationSchema)
  • 如果您配置交货保证 DeliveryGuarantee.EXACTLY_ONCE 你也必须设置 setTransactionalIdPrefix(String)`


Serializer#

你始终需要提供一个KafkaRecordSerializationSchema以将传入元素从数据流转换为 Kafka 生产者记录。Flink 提供了一个模式构建器来提供一些常见的构建块,即键/值序列化、主题选择、分区。您也可以自行实现接口以施加更多控制。

KafkaRecordSerializationSchema.builder()
    .setTopicSelector((element) -> {<your-topic-selection-logic>})
    .setValueSerializationSchema(new SimpleStringSchema())
    .setKeySerializationSchema(new SimpleStringSchema())
    .setPartitioner(new FlinkFixedPartitioner())
    .build();

「需要」始终设定的值序列化方法和一个主题(选择方法)。此外,还可以通过使用setKafkaKeySerializer(Serializer)或来使用 Kafka 序列化器代替 Flink 序列化器setKafkaValueSerializer(Serializer)


Fault Tolerance#

总的来说,KafkaSink支持三种不同的DeliveryGuarantees。ForDeliveryGuarantee.AT_LEAST_ONCEDeliveryGuarantee.EXACTLY_ONCEFlink 的检查点必须启用。默认情况下KafkaSink使用DeliveryGuarantee.NONE. 您可以在下面找到对不同保证的解释。

  • DeliveryGuarantee.NONE 不提供任何保证:如果 Kafka broker 出现问题,消息可能会丢失,如果 Flink 故障,消息可能会重复。
  • DeliveryGuarantee.AT_LEAST_ONCE:接收器将等待 Kafka 缓冲区中所有未完成的记录在检查点上由 Kafka 生产者确认。如果 Kafka 代理出现任何问题,则不会丢失任何消息,但是当 Flink 重新启动时,消息可能会重复,因为 Flink 会重新处理旧的输入记录。
  • DeliveryGuarantee.EXACTLY_ONCE:在这种模式下,KafkaSink 将写入 Kafka 事务中的所有消息,这些消息将在检查点上提交给 Kafka。因此,如果消费者只读取提交的数据(参见 Kafka 消费者配置隔离级别),则在 Flink 重启的情况下不会看到重复数据。但是,这会有效地延迟记录可见性,直到写入检查点,因此相应地调整检查点持续时间。请确保在同一 Kafka 集群上运行的应用程序中使用唯一的 transactionalIdPrefix,这样多个正在运行的作业不会干扰它们的事务!此外,强烈建议调整 Kafka 事务超时(请参阅 Kafka 生产者 transaction.timeout.ms)» 最大检查点持续时间 + 最大重启持续时间或当 Kafka 未提交的事务到期时可能会发生数据丢失。


Monitoring#

Kafka sink 在各自的 scope 中 公开以下指标。

Scope Metrics User Variables Description Type
Operator currentSendTime n/a 发送最后一条记录所花费的时间。这个度量是为最后处理的记录记录的瞬时值。 Gauge


Kafka Producer#

FlinkKafkaProducer已弃用,将随 Flink 1.15 一起删除,请使用 KafkaSink

对于较旧的参考,您可以查看 Flink 1.13文档。

Kafka Connector Metrics#

Flink 的 Kafka 连接器通过 Flink 的指标系统提供了一些指标来分析连接器的行为。生产者和消费者通过 Flink 的所有支持版本的指标系统导出 Kafka 的内部指标。Kafka 文档在其文档中列出了所有导出的指标。

也可以register.consumer.metrics通过本节概述的 KafkaSource 配置或在使用 KafkaSink 时禁用 Kafka 指标的转发,您可以通过生产者属性将配置设置register.producer.metrics为 false。

Enabling Kerberos Authentication#

Flink 通过 Kafka 连接器提供一流的支持,以对为 Kerberos 配置的 Kafka 安装进行身份验证。只需配置 Flinkflink-conf.yaml即可为 Kafka 启用 Kerberos 身份验证,如下所示:

  1. 通过设置以下内容来配置 Kerberos 凭据 -
  • security.kerberos.login.use-ticket-cache:默认情况下,这是trueFlink 将尝试在由kinit. 请注意,在 YARN 上部署的 Flink 作业中使用 Kafka 连接器时,使用票证缓存的 Kerberos 授权将不起作用。
  • security.kerberos.login.keytabsecurity.kerberos.login.principal:要改用 Kerberos 密钥表,请为这两个属性设置值。
  1. 附加KafkaClientsecurity.kerberos.login.contexts:这告诉 Flink 将配置的 Kerberos 凭据提供给 Kafka 登录上下文以用于 Kafka 身份验证。

启用基于 Kerberos 的 Flink 安全性后,您可以使用 Flink Kafka Consumer 或 Producer 向 Kafka 进行身份验证,只需在传递给内部 Kafka 客户端的提供的属性配置中包含以下两个设置:

  • 设置security.protocolSASL_PLAINTEXT(默认NONE):用于与 Kafka 代理通信的协议。使用独立的 Flink 部署时,也可以使用SASL_SSL; 请在此处查看如何为 SSL 配置 Kafka 客户端。
  • 设置sasl.kerberos.service.namekafka(默认kafka):此值应与用于 Kafka 代理配置的值相匹配sasl.kerberos.service.name。客户端和服务器配置之间的服务名称不匹配将导致身份验证失败。

有关 Kerberos 安全性的 Flink 配置的更多信息,请参阅此处。您还可以在此处找到有关 Flink 如何在内部设置基于 Kerberos 的安全性的更多详细信息。

Upgrading to the Latest Connector Version#

升级作业和 Flink 版本指南中概述了通用升级步骤。对于 Kafka,您还需要执行以下步骤:

  • 请勿同时升级 Flink 和 Kafka Connector 版本。
  • 确保您为您的消费者配置了一个group.id
  • 在消费者上设置setCommitOffsetsOnCheckpoints(true),以便将读取偏移量提交给 Kafka。在停止并获取保存点之前执行此操作很重要。您可能必须在旧的连接器版本上执行停止/重新启动循环才能启用此设置。
  • 在消费者上设置setStartFromGroupOffsets(true),以便我们从 Kafka 获得读取偏移量。这只有在 Flink 状态下没有读取偏移时才会生效,这也是下一步非常重要的原因。
  • 更改源/接收器的分配uid。这确保新的源/接收器不会从旧的源/接收器操作符读取状态。
  • 开始新作业,--allow-non-restored-state因为我们在保存点中仍然拥有先前连接器版本的状态。

Troubleshooting#

如果您在使用 Flink 时遇到 Kafka 问题,请记住,Flink 只包装了KafkaConsumer或KafkaProducer,您的问题可能与 Flink 无关,有时可以通过升级 Kafka brokers、重新配置 Kafka brokers 或重新配置KafkaConsumerKafkaProducerin Flink 来解决。下面列出了一些常见问题的示例。


Data loss#

根据您的 Kafka 配置,即使在 Kafka 确认写入之后,您仍然可能会遇到数据丢失的情况。特别要记住 Kafka 配置中的以下属性:

  • acks
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.flush.*

上述选项的默认值很容易导致数据丢失。更多解释请参考 Kafka 文档。

UnknownTopicOrPartitionException #

此错误的一个可能原因是正在进行新的领导者选举时,例如在重新启动 Kafka Broker 之后或期间。这是一个可重试的异常,因此 Flink 作业应该能够重新启动并恢复正常运行。它也可以通过更改retries生产者设置中的属性来规避。然而,这可能会导致消息重新排序,反过来,如果不需要,可以通过设置为 1 来规避max.in.flight.requests.per.connection

ProducerFencedException #

此异常的原因很可能是代理端的事务超时。随着KAFKA-6119的实施,(producerId, epoch)将在事务超时后被隔离,并且其所有挂起的事务都被中止(每个transactional.id都映射到一个单独的事务producerId;这在下面的博客文章中有更详细的描述)。

推荐阅读

Flink 任务实时监控最佳实践 Flink on yarn 实时日志收集最佳实践


相关文章
|
3月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
241 0
|
3月前
|
消息中间件 Java Kafka
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
48 7
|
3月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
79 4
|
3月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
219 0
|
3月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
63 0
|
3月前
|
消息中间件 NoSQL Java
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
51 0
|
3月前
|
消息中间件 NoSQL Kafka
Flink-05 Flink Java 3分钟上手 Redis FlinkJedisPoolConfig 从Kafka写入Redis FlinkKafkaConsumer消费 结果写入Redis
Flink-05 Flink Java 3分钟上手 Redis FlinkJedisPoolConfig 从Kafka写入Redis FlinkKafkaConsumer消费 结果写入Redis
60 0
|
消息中间件 SQL Java
Flink自定义Connector
Flink自定义Connector
504 0
|
8月前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版产品使用合集之如果想自定义connector和pipeline要如何入手
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL 存储 NoSQL
Flink SQL 自定义 redis connector
一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connector 并不能满足所有用户的需求,这个时候就需要我们自定义 connector,这篇文章的重点就是介绍一下如何实现自定义 Flink SQL connector ? 先来看一下官网的一张 connector 架构图:
Flink SQL 自定义 redis connector

热门文章

最新文章