Flink1.4 HDFS Connector-阿里云开发者社区

开发者社区> sjf0115> 正文

Flink1.4 HDFS Connector

简介: 原文来源于:Flink1.4 HDFS Connector 此连接器提供一个 Sink,将分区文件写入 Hadoop FileSystem 支持的任何文件系统。
+关注继续查看

原文来源于:Flink1.4 HDFS Connector

此连接器提供一个 Sink,将分区文件写入 Hadoop FileSystem 支持的任何文件系统。要使用此连接器,添加以下依赖项:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.10</artifactId>
  <version>1.4-SNAPSHOT</version>
</dependency>

备注

streaming 连接器目前还不是二进制发布包的一部分,请参阅此处来了解有关如何将程序与Libraries打包以进行集群执行的信息。

文件分桶的Sink(Bucketing File Sink)

分桶(Bucketing)行为以及写入数据操作都可以配置,我们稍后会讲到。下面展示了如何通过默认配置创建分桶的Sink,输出到按时间切分的滚动文件中:

Java版本:

DataStream<String> input = ...;
input.addSink(new BucketingSink<String>("/base/path"));


Scala版本:

val input: DataStream[String] = ...
input.addSink(new BucketingSink[String]("/base/path"))


这里唯一必需的参数是这些分桶文件存储的基本路径 /base/path。可以通过指定自定义 bucketerwriter 和 batch 大小来进一步配置 sink

默认情况下,分桶 sink 根据元素到达时当前系统时间来进行切分,并使用 yyyy-MM-dd--HH 时间格式来命名这些分桶。这个时间格式传递给当前的系统时间的 SimpleDateFormat 来命名桶的路径。每当遇到一个新的时间就会创建一个新的桶。例如,如果你有一个包含分钟的最细粒度时间格式,那么你将会每分钟获得一个新桶。每个桶本身就是一个包含 part 文件的目录:Sink的每个并行实例都将创建自己的 part 文件,当 part 文件变得太大时,会紧挨着其他文件创建一个新的 part 文件。当一个桶在最近没有被写入数据时被视为非活跃的。当桶变得不活跃时,打开的 part 文件将被刷新(flush)并关闭。默认情况下,sink 每分钟都会检查非活跃的桶,并关闭一分钟内没有写入数据的桶。可以在 BucketingSink上 使用 setInactiveBucketCheckInterval() 和 setInactiveBucketThreshold() 配置这些行为。

你还可以使用 BucketingSink上 的 setBucketer() 指定自定义 bucketer。如果需要,bucketer 可以使用元素或元组的属性来确定 bucket目录。

默认的 writer 是StringWriter。对传入的元素调用 toString(),并将它们写入 part 文件,用换行符分隔。要在 BucketingSink 上指定一个自定义的 writer,使用 setWriter() 方法即可。如果要写入 Hadoop SequenceFiles 文件中,可以使用提供的 SequenceFileWriter,并且可以配置使用压缩格式。

最后一个配置选项是 batch 大小。这指定何时关闭 part 文件,并开启一个新文件。(默认part文件大小为384MB)。

Java版本:

DataStream<Tuple2<IntWritable,Text>> input = ...;

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,

input.addSink(sink);


Scala版本:

val input: DataStream[Tuple2[IntWritable, Text]] = ...

val sink = new BucketingSink[String]("/base/path")
sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,

input.addSink(sink)


上面例子将创建一个sink,写入遵循下面格式的分桶文件中:

/base/path/{date-time}/part-{parallel-task}-{count}


其中 date-time 是从日期/时间格式获得的字符串, parallel-task 是并行 sink 实例的索引,count 是由于 batch大小而创建的part文件的运行编号。

备注:

Sink版本:1.4

原文:https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/filesystem_sink.html

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
10052 0
阿里云服务器ECS远程登录用户名密码查询方法
阿里云服务器ECS远程连接登录输入用户名和密码,阿里云没有默认密码,如果购买时没设置需要先重置实例密码,Windows用户名是administrator,Linux账号是root,阿小云来详细说下阿里云服务器远程登录连接用户名和密码查询方法
11596 0
windows server 2008阿里云ECS服务器安全设置
最近我们Sinesafe安全公司在为客户使用阿里云ecs服务器做安全的过程中,发现服务器基础安全性都没有做。为了为站长们提供更加有效的安全基础解决方案,我们Sinesafe将对阿里云服务器win2008 系统进行基础安全部署实战过程! 比较重要的几部分 1.
9156 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
13865 0
阿里云服务器ECS登录用户名是什么?系统不同默认账号也不同
阿里云服务器Windows系统默认用户名administrator,Linux镜像服务器用户名root
4497 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,云吞铺子总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系统盘、创建快照、配置安全组等操作如何登录ECS云服务器控制台? 1、先登录到阿里云ECS服务器控制台 2、点击顶部的“控制台” 3、通过左侧栏,切换到“云服务器ECS”即可,如下图所示 通过ECS控制台的远程连接来登录到云服务器 阿里云ECS云服务器自带远程连接功能,使用该功能可以登录到云服务器,简单且方便,如下图:点击“远程连接”,第一次连接会自动生成6位数字密码,输入密码即可登录到云服务器上。
22375 0
阿里云ECS云服务器初始化设置教程方法
阿里云ECS云服务器初始化是指将云服务器系统恢复到最初状态的过程,阿里云的服务器初始化是通过更换系统盘来实现的,是免费的,阿里云百科网分享服务器初始化教程: 服务器初始化教程方法 本文的服务器初始化是指将ECS云服务器系统恢复到最初状态,服务器中的数据也会被清空,所以初始化之前一定要先备份好。
7359 0
+关注
sjf0115
Stay Hungry, Stay Foolish---我们必须用谦虚者的自觉,饥饿者的渴望的求职态度,来拥抱我们的未来。
788
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载