Flink实战(八) - Streaming Connectors 编程(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink实战(八) - Streaming Connectors 编程(下)

3.6 Kafka生产者

Flink的Kafka Producer被称为FlinkKafkaProducer011(或010 对于Kafka 0.10.0.x版本。或者直接就是FlinkKafkaProducer,对于Kafka>=1.0.0的版本来说)。

它允许将记录流写入一个或多个Kafka主题。


自应用

Pro

image.png

确保启动端口

2.png

Pro端生产消息

3.png

消费端接收

4.png

Example

  • Java
  • 5.png
  • Scala

1.png

上面的示例演示了创建Flink Kafka Producer以将流写入单个Kafka目标主题的基本用法。对于更高级的用法,还有其他构造函数变体允许提供以下内容:


提供自定义属性

生产者允许为内部的KafkaProducer提供自定义属性配置。

自定义分区程序

将记录分配给特定分区,可以为FlinkKafkaPartitioner构造函数提供实现。将为流中的每个记录调用此分区程序,以确定应将记录发送到的目标主题的确切分区。

高级序列化模式

与消费者类似,生产者还允许使用调用的高级序列化模式KeyedSerializationSchema,该模式允许单独序列化键和值。它还允许覆盖目标主题,以便一个生产者实例可以将数据发送到多个主题。

3.8 Kafka消费者开始位置配置

Flink Kafka Consumer允许配置如何确定Kafka分区的起始位置。


Java

image.png

Scala

3.png

Flink Kafka Consumer的所有版本都具有上述明确的起始位置配置方法。


setStartFromGroupOffsets(默认行为)

从group.idKafka代理(或Zookeeper for Kafka 0.8)中的消费者组(在消费者属性中设置)提交的偏移量开始读取分区。如果找不到分区的偏移量,auto.offset.reset将使用属性中的设置。

setStartFromEarliest()/ setStartFromLatest()

从最早/最新记录开始。在这些模式下,Kafka中的承诺偏移将被忽略,不会用作起始位置。

setStartFromTimestamp(long)

从指定的时间戳开始。对于每个分区,时间戳大于或等于指定时间戳的记录将用作起始位置。如果分区的最新记录早于时间戳,则只会从最新记录中读取分区。在此模式下,Kafka中的已提交偏移将被忽略,不会用作起始位置。

还可以指定消费者应从每个分区开始的确切偏移量:


Java

image.png

Scala

5.png

上面的示例将使用者配置为从主题的分区0,1和2的指定偏移量开始myTopic。偏移值应该是消费者应为每个分区读取的下一条记录。请注意,如果使用者需要读取在提供的偏移量映射中没有指定偏移量的分区,则它将回退到setStartFromGroupOffsets()该特定分区的默认组偏移行为(即)。


请注意,当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个Kafka分区的起始位置由存储在保存点或检查点中的偏移量确定。


3.9 Kafka生产者和容错

Kafka 0.8

在0.9之前,Kafka没有提供任何机制来保证至少一次或恰好一次的语义。


Kafka 0.9和0.10

启用Flink的检查点时,FlinkKafkaProducer09和FlinkKafkaProducer010 能提供至少一次传输保证。


除了开启Flink的检查点,还应该配置setter方法:


setLogFailuresOnly(boolean)

默认为false。启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们。这大体上就是计数已成功的记录,即使它从未写入目标Kafka主题。这必须设为false对于确保 至少一次

setFlushOnCheckpoint(boolean)

默认为true。启用此函数后,Flink的检查点将在检查点成功之前等待检查点时的任何动态记录被Kafka确认。这可确保检查点之前的所有记录都已写入Kafka。必须开启,对于确保 至少一次

总之,默认情况下,Kafka生成器对版本0.9和0.10具有至少一次保证,即

setLogFailureOnly设置为false和setFlushOnCheckpoint设置为true。


默认情况下,重试次数设置为“0”。这意味着当setLogFailuresOnly设置为时false,生产者会立即失败,包括Leader更改。

默认情况下,该值设置为“0”,以避免重试导致目标主题中出现重复消息。对于经常更改代理的大多数生产环境,建议将重试次数设置为更高的值。


Kafka目前没有生产者事务,因此Flink在Kafka主题里无法保证恰好一次交付


Kafka >= 0.11

启用Flink的检查点后,FlinkKafkaProducer011


对于Kafka >= 1.0.0版本是FlinkKafkaProduce


可以提供准确的一次交付保证。


除了启用Flink的检查点,还可以通过将适当的语义参数传递给FlinkKafkaProducer011,选择三种不同的算子操作模式


Semantic.NONE

6.png

  • Flink啥都不保证。生成的记录可能会丢失,也可能会重复。
  • Semantic.AT_LEAST_ONCE(默认设置)
  • 7.png
  • 类似于setFlushOnCheckpoint(true)在 FlinkKafkaProducer010。这可以保证不会丢失任何记录(尽管它们可以重复)。
  • Semantic.EXACTLY_ONCE

8.png

使用Kafka事务提供恰好一次的语义。每当您使用事务写入Kafka时,不要忘记为任何从Kafka消费记录的应用程序设置所需的isolation.level(read_committed 或read_uncommitted- 后者为默认值)。

注意事项

Semantic.EXACTLY_ONCE 模式依赖于在从所述检查点恢复之后提交在获取检查点之前启动的事务的能力。如果Flink应用程序崩溃和完成重启之间的时间较长,那么Kafka的事务超时将导致数据丢失(Kafka将自动中止超过超时时间的事务)。考虑到这一点,请根据预期的停机时间适当配置事务超时。


Kafka broker默认 transaction.max.timeout.ms 设置为15分钟。此属性不允许为生产者设置大于其值的事务超时。

FlinkKafkaProducer011默认情况下,将transaction.timeout.msproducer config中的属性设置为1小时,因此transaction.max.timeout.ms在使用 Semantic.EXACTLY_ONCE 模式之前应该增加 该属性。


在read_committed模式中KafkaConsumer,任何未完成的事务(既不中止也不完成)将阻止来自给定Kafka主题的所有读取超过任何未完成的事务。换言之,遵循以下事件顺序:


用户事务1开启并写记录

用户事务2开启并写了一些其他记录

用户提交事务2

即使事务2已经提交了记录,在事务1提交或中止之前,消费者也不会看到它们。这有两个含义:


首先,在Flink应用程序的正常工作期间,用户可以预期Kafka主题中生成的记录的可见性会延迟,等于已完成检查点之间的平均时间。

其次,在Flink应用程序失败的情况下,读者将阻止此应用程序编写的主题,直到应用程序重新启动或配置的事务超时时间过去为止。此注释仅适用于有多个代理/应用程序写入同一Kafka主题的情况。

Semantic.EXACTLY_ONCE 模式为每个FlinkKafkaProducer011实例使用固定大小的KafkaProducers池。每个检查点使用其中一个生产者。如果并发检查点的数量超过池大小,FlinkKafkaProducer011 将引发异常并将使整个应用程序失败。请相应地配置最大池大小和最大并发检查点数。


Semantic.EXACTLY_ONCE 采取所有可能的措施,不要留下任何阻碍消费者阅读Kafka主题的延迟事务,这是必要的。但是,如果Flink应用程序在第一个检查点之前失败,则在重新启动此类应用程序后,系统中没有关于先前池大小的信息。因此,在第一个检查点完成之前按比例缩小Flink应用程序是不安全的 FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR。


3.10 Kafka消费者及其容错

启用Flink的检查点后,Flink Kafka Consumer将使用主题中的记录,并以一致的方式定期检查其所有Kafka偏移以及其他 算子操作的状态。如果作业失败,Flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用来自Kafka的记录。


因此,绘制检查点的间隔定义了程序在发生故障时最多可以返回多少。


检查点常用参数

enableCheckpointing

启用流式传输作业的检查点。 将定期快照流式数据流的分布式状态。 如果发生故障,流数据流将从最新完成的检查点重新启动。


该作业在给定的时间间隔内定期绘制检查点。 状态将存储在配置的状态后端。


此刻未正确支持检查点迭代流数据流。 如果“force”参数设置为true,则系统仍将执行作业。

9.png

setCheckpointingMode

10.png

setCheckpointTimeout

11.png

setMaxConcurrentCheckpoints

12.png

要使用容错的Kafka使用者,需要在运行环境中启用拓扑的检查点:

  • Scala
  • 13.png
  • Java

14.png

另请注意,如果有足够的处理插槽可用于重新启动拓扑,则Flink只能重新启动拓扑。因此,如果拓扑由于丢失了TaskManager而失败,那么之后仍然必须有足够的可用插槽。YARN上的Flink支持自动重启丢失的YARN容器。


如果未启用检查点,Kafka使用者将定期向Zookeeper提交偏移量。


参考

https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html

http://kafka.apache.org/documentation/

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
9月前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
232 3
|
3月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
231 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
522 2
探索Flink动态CEP:杭州银行的实战案例
|
3月前
|
SQL 分布式计算 数据处理
Structured Streaming和Flink实时计算框架的对比
本文对比了Structured Streaming和Flink两大流处理框架。Structured Streaming基于Spark SQL,具有良好的可扩展性和容错性,支持多种数据源和输出格式。Flink则以低延迟、高吞吐和一致性著称,适合毫秒级的流处理任务。文章详细分析了两者在编程模型、窗口操作、写入模式、时间语义、API和库、状态管理和生态系统等方面的优劣势。
|
6月前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
113 1
|
6月前
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
137 0
|
9月前
|
分布式计算 大数据 数据处理
【Flink】Flink跟Spark Streaming的区别?
【4月更文挑战第17天】【Flink】Flink跟Spark Streaming的区别?
|
9月前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即"Top N"问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
222 1
|
9月前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
106 5
|
9月前
|
存储 SQL 消息中间件
Flink作业问题之取消Flink Streaming作业失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

热门文章

最新文章