1 介绍
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html https://blog.csdn.net/u013220482/article/details/100901471
1.1 场景描述
StreamingFileSink是Flink1.7中推出的新特性,是为了解决如下的问题:大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中。
StreamingFileSink就可以用来将分区文件写入到支持 Flink FileSystem 接口的文件系统中,支持Exactly-Once语义。这种sink实现的Exactly-Once都是基于Flink checkpoint来实现的两阶段提交模式来保证的,主要应用在实时数仓、topic拆分、基于小时分析处理等场景下。
1.2 Bucket和SubTask、PartFile
⚫ Bucket
StreamingFileSink可向由Flink FileSystem抽象支持的文件系统写入分区文件(因为是流式写入,数据被视为无界)。该分区行为可配,默认按时间,具体来说每小时写入一个Bucket,该Bucket包括若干文件,内容是这一小时间隔内流中收到的所有record。
⚫ PartFile
每个Bukcket内部分为多个PartFile来存储输出数据,该Bucket生命周期内接收到数据的sink的每个
子任务至少有一个PartFile。而额外文件滚动由可配的滚动策略决定,默认策略是根据文件大小和打开超时(文件可以被打开的最大持续时间)以及文件最大不活动超时等决定是否滚动。Bucket和SubTask、PartFile关系如图所示
2 案例演示
⚫ 需求
编写Flink程序,接收socket的字符串数据,然后将接收到的数据流式方式存储到hdfs
⚫ 开发步骤
1.初始化流计算运行环境
2.设置Checkpoint(10s)周期性启动
3.指定并行度为1
4.接入socket数据源,获取数据
5.指定文件编码格式为行编码格式
6.设置桶分配策略
7.设置文件滚动策略
8.指定文件输出配置
9.将streamingfilesink对象添加到环境
10.执行任务
⚫ 实现代码
package cn.oldlu.extend; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import java.util.concurrent.TimeUnit; public class StreamFileSinkDemo { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10)); env.setStateBackend(new FsStateBackend("file:///D:/ckp")); //2.source DataStreamSource<String> lines = env.socketTextStream("node1", 9999); //3.sink //设置sink的前缀和后缀 //文件的头和文件扩展名 //prefix-xxx-.txt OutputFileConfig config = OutputFileConfig .builder() .withPartPrefix("prefix") .withPartSuffix(".txt") .build(); //设置sink的路径 String outputPath = "hdfs://node1:8020/FlinkStreamFileSink/parquet"; //创建StreamingFileSink final StreamingFileSink<String> sink = StreamingFileSink .forRowFormat( new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")) /** * 设置桶分配政策 * DateTimeBucketAssigner --默认的桶分配政策,默认基于时间的分配器,每小时产生一个桶,格式如下yyyy-MM-dd--HH * BasePathBucketAssigner :将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶) */ .withBucketAssigner(new DateTimeBucketAssigner<>()) /** * 有三种滚动政策 * CheckpointRollingPolicy * DefaultRollingPolicy * OnCheckpointRollingPolicy */ .withRollingPolicy( /** * 滚动策略决定了写出文件的状态变化过程 * 1. In-progress :当前文件正在写入中 * 2. Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态 * 3. Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态 * * 观察到的现象 * 1.会根据本地时间和时区,先创建桶目录 * 2.文件名称规则:part-<subtaskIndex>-<partFileIndex> * 3.在macos中默认不显示隐藏文件,需要显示隐藏文件才能看到处于In-progress和Pending状态的文件,因为文件是按照.开头命名的 * */ DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.SECONDS.toMillis(2)) //设置滚动间隔 .withInactivityInterval(TimeUnit.SECONDS.toMillis(1)) //设置不活动时间间隔 .withMaxPartSize(1024 * 1024 * 1024) // 最大尺寸 .build()) .withOutputFileConfig(config) .build(); lines.addSink(sink).setParallelism(1); env.execute(); } }
3 File Sink介绍
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html
新的 Data Sink API (Beta)之前发布的 Flink 版本中[1],已经支持了 source connector 工作在流批两种模式下,因此在Flink 1.12 中,社区着重实现了统一的 Data Sink API(FLIP-143)。新的抽象引入了write/commit 协议和一个更加模块化的接口。Sink 的实现者只需要定义 what 和 how:SinkWriter,用于写数据,并输出需要 commit 的内容(例如,committables);Committer 和GlobalCommitter,封装了如何处理 committables。框架会负责 when 和 where:即在什么时间,以及在哪些机器或进程中 commit。
这种模块化的抽象允许为 BATCH 和 STREAMING 两种执行模式,实现不同的运行时策略,以达到仅使用一种 sink 实现,也可以使两种模式都可以高效执行。Flink 1.12 中,提供了统一的FileSink connector,以替换现有的 StreamingFileSink connector (FLINK-19758)。其它的connector 也将逐步迁移到新的接口。Flink 1.12的 FileSink 为批处理和流式处理提供了一个统一的接收器,它将分区文件写入Flink文件系统抽象所支持的文件系统。这个文件系统连接器为批处理和流式处理提供了相同的保证,它是现有流式文件接收器的一种改进。
3.1 案例演示
package cn.oldlu.extend; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import java.util.concurrent.TimeUnit; /** * Author oldlu * Desc */ public class FileSinkDemo { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10)); env.setStateBackend(new FsStateBackend("file:///D:/ckp")); //2.source DataStreamSource<String> lines = env.socketTextStream("node1", 9999); //3.sink //设置sink的前缀和后缀 //文件的头和文件扩展名 //prefix-xxx-.txt OutputFileConfig config = OutputFileConfig .builder() .withPartPrefix("prefix") .withPartSuffix(".txt") .build(); //设置sink的路径 String outputPath = "hdfs://node1:8020/FlinkFileSink/parquet"; final FileSink<String> sink = FileSink .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")) .withBucketAssigner(new DateTimeBucketAssigner<>()) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) .withMaxPartSize(1024 * 1024 * 1024) .build()) .withOutputFileConfig(config) .build(); lines.sinkTo(sink).setParallelism(1); env.execute(); } }