Flink1.4 HDFS Connector

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 原文来源于:Flink1.4 HDFS Connector此连接器提供一个 Sink,将分区文件写入 Hadoop FileSystem 支持的任何文件系统。

原文来源于:Flink1.4 HDFS Connector

此连接器提供一个 Sink,将分区文件写入 Hadoop FileSystem 支持的任何文件系统。要使用此连接器,添加以下依赖项:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.10</artifactId>
  <version>1.4-SNAPSHOT</version>
</dependency>

备注

streaming 连接器目前还不是二进制发布包的一部分,请参阅此处来了解有关如何将程序与Libraries打包以进行集群执行的信息。

文件分桶的Sink(Bucketing File Sink)

分桶(Bucketing)行为以及写入数据操作都可以配置,我们稍后会讲到。下面展示了如何通过默认配置创建分桶的Sink,输出到按时间切分的滚动文件中:

Java版本:

DataStream<String> input = ...;
input.addSink(new BucketingSink<String>("/base/path"));


Scala版本:

val input: DataStream[String] = ...
input.addSink(new BucketingSink[String]("/base/path"))


这里唯一必需的参数是这些分桶文件存储的基本路径 /base/path。可以通过指定自定义 bucketerwriter 和 batch 大小来进一步配置 sink

默认情况下,分桶 sink 根据元素到达时当前系统时间来进行切分,并使用 yyyy-MM-dd--HH 时间格式来命名这些分桶。这个时间格式传递给当前的系统时间的 SimpleDateFormat 来命名桶的路径。每当遇到一个新的时间就会创建一个新的桶。例如,如果你有一个包含分钟的最细粒度时间格式,那么你将会每分钟获得一个新桶。每个桶本身就是一个包含 part 文件的目录:Sink的每个并行实例都将创建自己的 part 文件,当 part 文件变得太大时,会紧挨着其他文件创建一个新的 part 文件。当一个桶在最近没有被写入数据时被视为非活跃的。当桶变得不活跃时,打开的 part 文件将被刷新(flush)并关闭。默认情况下,sink 每分钟都会检查非活跃的桶,并关闭一分钟内没有写入数据的桶。可以在 BucketingSink上 使用 setInactiveBucketCheckInterval() 和 setInactiveBucketThreshold() 配置这些行为。

你还可以使用 BucketingSink上 的 setBucketer() 指定自定义 bucketer。如果需要,bucketer 可以使用元素或元组的属性来确定 bucket目录。

默认的 writer 是StringWriter。对传入的元素调用 toString(),并将它们写入 part 文件,用换行符分隔。要在 BucketingSink 上指定一个自定义的 writer,使用 setWriter() 方法即可。如果要写入 Hadoop SequenceFiles 文件中,可以使用提供的 SequenceFileWriter,并且可以配置使用压缩格式。

最后一个配置选项是 batch 大小。这指定何时关闭 part 文件,并开启一个新文件。(默认part文件大小为384MB)。

Java版本:

DataStream<Tuple2<IntWritable,Text>> input = ...;

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,

input.addSink(sink);


Scala版本:

val input: DataStream[Tuple2[IntWritable, Text]] = ...

val sink = new BucketingSink[String]("/base/path")
sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,

input.addSink(sink)


上面例子将创建一个sink,写入遵循下面格式的分桶文件中:

/base/path/{date-time}/part-{parallel-task}-{count}


其中 date-time 是从日期/时间格式获得的字符串, parallel-task 是并行 sink 实例的索引,count 是由于 batch大小而创建的part文件的运行编号。

备注:

Sink版本:1.4

原文:https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/filesystem_sink.html

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
12天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何配置Connector来保持与MySOL一致
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2天前
|
Prometheus 监控 Cloud Native
实时计算 Flink版产品使用问题之怎么关闭HDFS的Web界面
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2天前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5天前
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
|
5天前
|
搜索推荐 流计算
美团 Flink 大作业部署问题之HDFS 在 Flink 作业中面临什么压力
美团 Flink 大作业部署问题之HDFS 在 Flink 作业中面临什么压力
|
2月前
|
消息中间件 Kafka 数据处理
实时计算 Flink版操作报错合集之使用kafka connector时,报错:java.lang.ClassNotFoundException,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
分布式计算 Hadoop 关系型数据库
实时计算 Flink版操作报错合集之Hadoop在将文件写入HDFS时,无法在所有指定的数据节点上进行复制,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之独立集群与hdfs集群不在一起,何配置checkpoint目录为hdfs
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL JSON 数据处理
实时计算 Flink版产品使用问题之把hdfs集群里的core-site.xml hdfs.xml两个文件放到flink/conf/目录下,启动集群说找不到hdfs,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
关系型数据库 数据库 流计算
实时计算 Flink版操作报错合集之在使用Flink CDC TiDB Connector时,无法获取到事件,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
395 0
下一篇
云函数