开发者社区> 问答> 正文

流媒体到parquet文件不匹配flink 1.6.1

flink小助手 2018-12-10 10:18:44 828

我正在尝试创建一个将我的数据源转储到parquet文件的接收器。

我的代码如下:

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1);
streamEnv.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);
val sink = StreamingFileSink.forBulkFormat(outputPath, ParquetAvroWriters.forReflectRecord(classOf[MyClass])).build()
testSource.addSink(sink)
但它仍然没有产生正确的输出。我目前正在获得一个包含4B数据的单个.part-xxx文件。此流中有大约20,000条记录,因此看起来不对。

在我开始编写这个问题之前,我在第84行的ParquetAvroWriters.java中找到了一个未找到异常的方法。该代码如下所示:

return AvroParquetWriter.<T>builder(out)
        .withSchema(schema)
        .withDataModel(dataModel)
        .build();

AvroParquetWriter方法签名是:

public static Builder builder(Path file)
但是ParquetAvroWriters.java调用它时的参数是StreamOutputFile

我正在使用flink1.6.1和parquet-hadoop / parquet-avro 1.10.0。我究竟应该如何设置拼写文件?

流计算
分享到
取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:19:03

    Flink StreamingFileSink使用批量格式自动使用OnCheckpointRollingPolicy。这意味着只有在检查点完成时才会实现结果。这需要提供一次加工保证。

    我假设您使用a CollectionSource作为测试输入,并且此输入的处理小于指定的处理100ms。因此,没有检查点可以完成,也不会写入任何结果。一旦输入完全消耗,Flink将不会触发检查点。因此,最后一个完成的检查点之后的所有事件都将不可见。

    尝试减少检查点间隔,增加CollectionSource或编写的元素数量,该元素的TestingSource extends SourceFunction运行时间至少与单个检查点间隔一样长(例如,使用睡眠)。这样,Flink应该能够完成一个检查点,从而将结果写出到指定的目录。

    0 0
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题