Flink实战(八) - Streaming Connectors 编程(上)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: Flink实战(八) - Streaming Connectors 编程(上)

1 概览

1.1 预定义的源和接收器

Flink内置了一些基本数据源和接收器,并且始终可用。该预定义的数据源包括文件,目录和插socket,并从集合和迭代器摄取数据。该预定义的数据接收器支持写入文件和标准输入输出及socket。


1.2 绑定连接器

连接器提供用于与各种第三方系统连接的代码。目前支持这些系统:


Apache Kafka (source/sink)

Apache Cassandra (sink)

Amazon Kinesis Streams (source/sink)

Elasticsearch (sink)

Hadoop FileSystem (sink)

RabbitMQ (source/sink)

Apache NiFi (source/sink)

Twitter Streaming API (source)

Google PubSub (source/sink)

要在应用程序中使用其中一个连接器,通常需要其他第三方组件,例如数据存储或消息队列的服务器。


虽然本节中列出的流连接器是Flink项目的一部分,并且包含在源版本中,但它们不包含在二进制分发版中。


1.3 Apache Bahir中的连接器

Flink的其他流处理连接器正在通过Apache Bahir发布,包括:


Apache ActiveMQ (source/sink)

Apache Flume (sink)

Redis (sink)

Akka (sink)

Netty (source)

1.4 其他连接到Flink的方法

1.4.1 通过异步I / O进行数据渲染

使用连接器不是将数据输入和输出Flink的唯一方法。一种常见的模式是在一个Map或多个FlatMap 中查询外部数据库或Web服务以渲染主数据流。

Flink提供了一个用于异步I / O的API, 以便更有效,更稳健地进行这种渲染。


1.4.2 可查询状态

当Flink应用程序将大量数据推送到外部数据存储时,这可能会成为I / O瓶颈。如果所涉及的数据具有比写入更少的读取,则更好的方法可以是外部应用程序从Flink获取所需的数据。在可查询的状态界面,允许通过Flink被管理的状态,按需要查询支持这个。


2 HDFS连接器

此连接器提供一个Sink,可将分区文件写入任一Hadoop文件系统支持的文件系统 。


要使用此连接器,请将以下依赖项添加到项目中:

1.png

2.png

请注意,流连接器当前不是二进制发布的一部分

2.1 Bucketing File Sink

可以配置分段行为以及写入,但我们稍后会介绍。这是可以创建一个默认情况下汇总到按时间拆分的滚动文件的存储槽的方法

  • Java
  • 3.png
  • Scala

4.png

唯一必需的参数是存储桶的基本路径。可以通过指定自定义bucketer,写入器和批量大小来进一步配置接收器。


默认情况下,当数据元到达时,分段接收器将按当前系统时间拆分,并使用日期时间模式"yyyy-MM-dd–HH"命名存储区。这种模式传递给 DateTimeFormatter使用当前系统时间和JVM的默认时区来形成存储桶路径。用户还可以为bucketer指定时区以格式化存储桶路径。每当遇到新日期时,都会创建一个新存储桶。


例如,如果有一个包含分钟作为最精细粒度的模式,将每分钟获得一个新桶。每个存储桶本身都是一个包含多个部分文件的目录:接收器的每个并行实例将创建自己的部件文件,当部件文件变得太大时,接收器也会在其他文件旁边创建新的部件文件。当存储桶变为非活动状态时,将刷新并关闭打开的部件文件。如果存储桶最近未写入,则视为非活动状态。默认情况下,接收器每分钟检查一次非活动存储桶,并关闭任何超过一分钟未写入的存储桶。setInactiveBucketCheckInterval()并 setInactiveBucketThreshold()在一个BucketingSink。


也可以通过指定自定义bucketer setBucketer()上BucketingSink。如果需要,bucketer可以使用数据元或元组的属性来确定bucket目录。


默认编写器是StringWriter。这将调用toString()传入的数据元并将它们写入部分文件,由换行符分隔。在a setWriter() 上指定自定义编写器使用BucketingSink。如果要编写Hadoop SequenceFiles,可以使用提供的 SequenceFileWriter,也可以配置为使用压缩。


有两个配置选项指定何时应关闭零件文件并启动新零件文件:


通过设置批量大小(默认部件文件大小为384 MB)

通过设置批次滚动时间间隔(默认滚动间隔为Long.MAX_VALUE)

当满足这两个条件中的任何一个时,将启动新的部分文件。看如下例子:


Java

5.png

Scala

6.png

这将创建一个接收器,该接收器将写入遵循此模式的存储桶文件:

7.png

Java

8.png

生成结果

9.png

  • date-time是我们从日期/时间格式获取的字符串
  • parallel-task是并行接收器实例的索引
  • count是由于批处理大小或批处理翻转间隔而创建的部分文件的运行数
  • 10.png

然而这种方式创建了太多小文件,不适合HDFS!仅供娱乐!


3 Apache Kafka连接器

3.1 简介

此连接器提供对Apache Kafka服务的事件流的访问。


Flink提供特殊的Kafka连接器,用于从/向Kafka主题读取和写入数据。Flink Kafka Consumer集成了Flink的检查点机制,可提供一次性处理语义。为实现这一目标,Flink并不完全依赖Kafka的消费者群体偏移跟踪,而是在内部跟踪和检查这些偏移。


为用例和环境选择一个包(maven artifact id)和类名。对于大多数用户来说,FlinkKafkaConsumer08(部分flink-connector-kafka)是合适的。

11.png

然后,导入maven项目中的连接器:

12.png

https://blog.csdn.net/qq_33589510/article/details/97064338

3.2 ZooKeeper安装及配置

配置系统环境

13.png

修改配置数据存储路径

14.png

15.png

启动

16.png

3.3 Kafka部署及测试

假设你刚刚开始并且没有现有的Kafka或ZooKeeper数据


由于Kafka控制台脚本对于基于Unix和Windows的平台不同,因此在Windows平台上使用bin \ windows \而不是bin /,并将脚本扩展名更改为.bat。


Step 1:下载代码

下载:https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz

17.png

解压

18.png

配置环境变量

19.png

20.png

21.png

配置服务器属性

image.png

修改日志存储路径

23.png

修改主机名

24.png

Step 2: 启动服务器

Kafka使用ZooKeeper,因此如果还没有ZooKeeper服务器,则需要先启动它。

  • 后台模式启动
  • 25.png

Step 3: 创建一个主题

  • 创建topic
  • 26.png

Step 4: 发送一些消息

Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。 默认情况下,每行将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息以发送到服务器。

  • 启动生产者

27.png

Step 5: 启动一个消费者

Kafka还有一个命令行使用者,它会将消息转储到标准输出。

  • 分屏,新建消费端

28.png

在不同的终端中运行上述每个命令,那么现在应该能够在生产者终端中键入消息并看到它们出现在消费者终端中

29.png

所有命令行工具都有其他选项; 运行不带参数的命令将显示更详细地记录它们的使用信息。

3.4 Kafka 1.0.0+ Connector

从Flink 1.7开始,有一个新的通用Kafka连接器,它不跟踪特定的Kafka主要版本。 相反,它在Flink发布时跟踪最新版本的Kafka。


如果您的Kafka代理版本是1.0.0或更高版本,则应使用此Kafka连接器。 如果使用旧版本的Kafka(0.11,0.10,0.9或0.8),则应使用与代理版本对应的连接器。


兼容性

通过Kafka客户端API和代理的兼容性保证,通用Kafka连接器与较旧和较新的Kafka代理兼容。 它与版本0.11.0或更高版本兼容,具体取决于所使用的功能。


将Kafka Connector从0.11迁移到通用(V1.10新增)

要执行迁移,请参阅升级作业和Flink版本指南和


在整个过程中使用Flink 1.9或更新版本。

不要同时升级Flink和操作符。

确保您作业中使用的Kafka Consumer和/或Kafka Producer分配了唯一标识符(uid):

使用stop with savepoint功能获取保存点(例如,使用stop --withSavepoint)CLI命令。

用法

要使用通用Kafka连接器,请为其添加依赖关系:

30.png

然后实例化新源(FlinkKafkaConsumer)

31.png

Flink Kafka Consumer是一个流数据源,可以从Apache Kafka中提取并行数据流。 使用者可以在多个并行实例中运行,每个实例都将从一个或多个Kafka分区中提取数据。

Flink Kafka Consumer参与了检查点,并保证在故障期间没有数据丢失,并且计算处理元素“恰好一次”。(注意:这些保证自然会假设Kafka本身不会丢失任何数据。)


请注意,Flink在内部将偏移量作为其分布式检查点的一部分进行快照。 承诺给Kafka的抵消只是为了使外部的进展观与Flink对进展的看法同步。 这样,监控和其他工作可以了解Flink Kafka消费者在多大程度上消耗了一个主题。


和接收器(FlinkKafkaProducer)。

除了从模块和类名中删除特定的Kafka版本之外,API向后兼容Kafka 0.11连接器。

3.5 Kafka消费者

Flink的Kafka消费者被称为FlinkKafkaConsumer08(或09Kafka 0.9.0.x等)。它提供对一个或多个Kafka主题的访问。


构造函数接受以下参数:


主题名称/主题名称列表

DeserializationSchema / KeyedDeserializationSchema用于反序列化来自Kafka的数据

Kafka消费者的属性。需要以下属性:

“bootstrap.servers”(以逗号分隔的Kafka经纪人名单)

“zookeeper.connect”(逗号分隔的Zookeeper服务器列表)(仅Kafka 0.8需要)

“group.id”消费者群组的ID

33.png

34.png

35.png

36.png

  • 上述程序注意配置ip主机映射
  • 虚拟机hosts
  • 37.png
  • 本地机器 hosts

38.png

发送消息

39.png

运行程序消费消息

40.png

  • Example:
  • Java
  • 41.png
  • Scala
  • 42.png

The DeserializationSchema

Flink Kafka Consumer需要知道如何将Kafka中的二进制数据转换为Java / Scala对象。

在 DeserializationSchema允许用户指定这样的一个架构。T deserialize(byte[] message) 为每个Kafka消息调用该方法,从Kafka传递值。


从它开始通常很有帮助AbstractDeserializationSchema,它负责将生成的Java / Scala类型描述为Flink的类型系统。实现vanilla的用户DeserializationSchema需要自己实现该getProducedType(…)方法。


为了访问Kafka消息的键和值,KeyedDeserializationSchema具有以下deserialize方法T deserialize(byte [] messageKey,byte [] message,String topic,int partition,long offset)。


为方便起见,Flink提供以下模式:


TypeInformationSerializationSchema(和TypeInformationKeyValueSerializationSchema)创建基于Flink的模式TypeInformation。如果Flink编写和读取数据,这将非常有用。此模式是其他通用序列化方法的高性能Flink替代方案。

JsonDeserializationSchema(和JSONKeyValueDeserializationSchema)将序列化的JSON转换为ObjectNode对象,可以使用objectNode.get(“field”)作为(Int / String / …)()从中访问字段。KeyValue objectNode包含一个“key”和“value”字段,其中包含所有字段,以及一个可选的“元数据”字段,用于公开此消息的偏移量/分区/主题。

AvroDeserializationSchema它使用静态提供的模式读取使用Avro格式序列化的数据。它可以从Avro生成的类(AvroDeserializationSchema.forSpecific(…))中推断出模式,也可以GenericRecords 使用手动提供的模式(with AvroDeserializationSchema.forGeneric(…))。此反序列化架构要求序列化记录不包含嵌入式架构。

还有一个可用的模式版本,可以在Confluent Schema Registry中查找编写器的模式(用于编写记录的 模式)。使用这些反序列化模式记录将使用从模式注册表中检索的模式进行读取,并转换为静态提供的模式(通过 ConfluentRegistryAvroDeserializationSchema.forGeneric(…)或ConfluentRegistryAvroDeserializationSchema.forSpecific(…))。

要使用此反序列化模式,必须添加以下附加依赖项:


当遇到因任何原因无法反序列化的损坏消息时,有两个选项 - 从deserialize(…)方法中抛出异常将导致作业失败并重新启动,或者返回null以允许Flink Kafka使用者以静默方式跳过损坏的消息。请注意,由于使用者的容错能力(请参阅下面的部分以获取更多详细信息),因此对损坏的消息执行失败将使消费者尝试再次反序列化消息。因此,如果反序列化仍然失败,则消费者将在该损坏的消息上进入不间断重启和失败循环。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
9月前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
232 3
|
3月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
231 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
522 2
探索Flink动态CEP:杭州银行的实战案例
|
3月前
|
SQL 分布式计算 数据处理
Structured Streaming和Flink实时计算框架的对比
本文对比了Structured Streaming和Flink两大流处理框架。Structured Streaming基于Spark SQL,具有良好的可扩展性和容错性,支持多种数据源和输出格式。Flink则以低延迟、高吞吐和一致性著称,适合毫秒级的流处理任务。文章详细分析了两者在编程模型、窗口操作、写入模式、时间语义、API和库、状态管理和生态系统等方面的优劣势。
|
6月前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
113 1
|
6月前
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
137 0
|
9月前
|
分布式计算 大数据 数据处理
【Flink】Flink跟Spark Streaming的区别?
【4月更文挑战第17天】【Flink】Flink跟Spark Streaming的区别?
|
9月前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即"Top N"问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
222 1
|
9月前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
106 5
|
9月前
|
存储 SQL 消息中间件
Flink作业问题之取消Flink Streaming作业失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

热门文章

最新文章