我正在尝试创建一个将我的数据源转储到parquet文件的接收器。
我的代码如下:
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1);
streamEnv.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);
val sink = StreamingFileSink.forBulkFormat(outputPath, ParquetAvroWriters.forReflectRecord(classOf[MyClass])).build()
testSource.addSink(sink)
但它仍然没有产生正确的输出。我目前正在获得一个包含4B数据的单个.part-xxx文件。此流中有大约20,000条记录,因此看起来不对。
在我开始编写这个问题之前,我在第84行的ParquetAvroWriters.java中找到了一个未找到异常的方法。该代码如下所示:
return AvroParquetWriter.<T>builder(out)
.withSchema(schema)
.withDataModel(dataModel)
.build();
AvroParquetWriter方法签名是:
public static Builder builder(Path file)
但是ParquetAvroWriters.java调用它时的参数是StreamOutputFile
我正在使用flink1.6.1和parquet-hadoop / parquet-avro 1.10.0。我究竟应该如何设置拼写文件?
Flink StreamingFileSink使用批量格式自动使用OnCheckpointRollingPolicy。这意味着只有在检查点完成时才会实现结果。这需要提供一次加工保证。
我假设您使用a CollectionSource作为测试输入,并且此输入的处理小于指定的处理100ms。因此,没有检查点可以完成,也不会写入任何结果。一旦输入完全消耗,Flink将不会触发检查点。因此,最后一个完成的检查点之后的所有事件都将不可见。
尝试减少检查点间隔,增加CollectionSource或编写的元素数量,该元素的TestingSource extends SourceFunction运行时间至少与单个检查点间隔一样长(例如,使用睡眠)。这样,Flink应该能够完成一个检查点,从而将结果写出到指定的目录。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。