开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

问下 flink DateTimeBucketAssigner 想自定义时间格式的目录类似于这种 d

问下 flink DateTimeBucketAssigner 想自定义时间格式的目录类似于这种 dt=2022-06-30 该怎么操作啊

展开
收起
游客3oewgrzrf6o5c 2022-07-03 17:42:39 592 0
1 条回答
写回答
取消 提交回答
  • 十分耕耘,一定会有一分收获!

    楼主你好,可以通过继承 DateTimeBucketAssigner 并重写 getBucketPrefix 方法实现自定义时间格式的目录。具体实现可以参考以下代码:

    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.functions.sink.filesystem.DateTimeBucketAssigner;
    
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;
    
    public class CustomDateTimeBucketAssigner<T> extends DateTimeBucketAssigner<T> {
    
        private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dt=yyyy-MM-dd")
                .withZone(ZoneId.systemDefault());
    
        public CustomDateTimeBucketAssigner(String formatString, Time zoneOffset) {
            super(formatString, zoneOffset);
        }
    
        @Override
        public String getBucketPrefix(final T element, final String context) {
            return formatter.format(getDateTime(element));
        }
    }
    

    在上面的代码中,我们继承了 DateTimeBucketAssigner 类并重写了 getBucketPrefix 方法,将时间格式化成了 dt=yyyy-MM-dd 的形式,并返回了格式化后的字符串。

    然后在使用时,只需要将自定义的 CustomDateTimeBucketAssigner 类替换掉原来的 DateTimeBucketAssigner 类即可。例如:

    DataStream<Tuple2<String, Long>> inputStream = ...
    
    DateTimeBucketAssigner<Tuple2<String, Long>> bucketAssigner =
            new CustomDateTimeBucketAssigner<>("yyyy-MM-dd", Time.hours(8));
    
    StreamingFileSink<Tuple2<String, Long>> sink = StreamingFileSink
            .forRowFormat(new Path("hdfs:///output/"), new SimpleStringEncoder<>())
            .withBucketAssigner(bucketAssigner)
            .withBucketCheckInterval(Time.minutes(1))
            .build();
    
    inputStream.addSink(sink);
    

    在上面的代码中,我们使用自定义的 CustomDateTimeBucketAssigner 类创建了 bucketAssigner 对象,并将其传入 StreamingFileSink 中。这样就可以实现自定义时间格式的目录了。

    2023-08-22 16:41:04
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载