Flink落HDFS数据按事件时间分区解决方案

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 0x1 摘要Hive离线数仓中为了查询分析方便,几乎所有表都会划分分区,最为常见的是按天分区,Flink通过以下配置把数据写入HDFS,BucketingSink<Object> sink = new BucketingSink<>(path);//通过这样的方式来实现数据跨天分区sink.

0x1 摘要

Hive离线数仓中为了查询分析方便,几乎所有表都会划分分区,最为常见的是按天分区,Flink通过以下配置把数据写入HDFS,

BucketingSink<Object> sink = new BucketingSink<>(path);
//通过这样的方式来实现数据跨天分区
sink.setBucketer(new DateTimeBucketer<>("yyyy/MM/dd"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 256L);
sink.setBatchRolloverInterval(30 * 60 * 1000L);
sink.setInactiveBucketThreshold(3 * 60 * 1000L);
sink.setInactiveBucketCheckInterval(30 * 1000L);
sink.setInProgressSuffix(".in-progress");
sink.setPendingSuffix(".pending");
AI 代码解读

0x2 问题点

如果要做到数据完全正确的落到相应分区,那必须用eventTime来划分,我们先来看看DateTimeBucketer桶实现代码,

public class DateTimeBucketer<T> implements Bucketer<T> {
 private static final long serialVersionUID = 1L;
 private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
 private final String formatString;
 private final ZoneId zoneId;
 private transient DateTimeFormatter dateTimeFormatter;

 /**
  * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"} using JVM's default timezone.
  */
 public DateTimeBucketer() {
  this(DEFAULT_FORMAT_STRING);
 }

 /**
  * Creates a new {@code DateTimeBucketer} with the given date/time format string using JVM's default timezone.
  *
  * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
  * the bucket path.
  */
 public DateTimeBucketer(String formatString) {
  this(formatString, ZoneId.systemDefault());
 }

 /**
  * Creates a new {@code DateTimeBucketer} with the given date/time format string using the given timezone.
  *
  * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
  * the bucket path.
  * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket path.
  */
 public DateTimeBucketer(String formatString, ZoneId zoneId) {
  this.formatString = Preconditions.checkNotNull(formatString);
  this.zoneId = Preconditions.checkNotNull(zoneId);

  this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(zoneId);
 }

 @Override
 public Path getBucketPath(Clock clock, Path basePath, T element) {
  //分桶关键代码在这里,通过clock获取当前时间戳后格式
  String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(clock.currentTimeMillis()));
  return new Path(basePath + "/" + newDateTimeString);
 }
}
AI 代码解读

以上代码clock实例是在BucketingSink#open方法中实例化,代码如下:

this.clock = new Clock() {
 @Override
 public long currentTimeMillis() {
  //直接返回当前处理时间
  return processingTimeService.getCurrentProcessingTime();
 }
};
AI 代码解读

结合以上源码分析发现,使用DateTimeBucketer分桶是采用当前处理时间,采用当前处理时间必然会跟事件事件存在差异,因此会导致数据跨分区落入HDFS文件,举个例子,假设有一条数据事件时间是2019-09-29 23:59:58,那这条数据应该落在2019/09/29分区,但由于这条数据延迟了3秒过来,当处理过来时当前处理时间已经是2019-09-30 00:00:01,所以这条数据会被落到2019/09/30分区,针对一些重要场景数据这样的结果是不可接受的。

0x3 解决方案

从以上第二节源码分析可以看出,解决问题的核心在getBucketPath方法中时间的获取,只要把这里的时间改为事件即可,而正好这个方法的第三参数就是element,代表每一条记录,只要记录中有事件时间就可以获取。既然现有的实现源码不好改,那我们可以自己基于Bucketer接口实现一个EventTimeBucketer分桶器,实现源码如下:

public class EventTimeBucketer implements Bucketer<BaseCountVO> {
    private static final String DEFAULT_FORMAT_STRING = "yyyy/MM/dd";

    private final String formatString;

    private final ZoneId zoneId;
    private transient DateTimeFormatter dateTimeFormatter;

    public EventTimeBucketer() {
        this(DEFAULT_FORMAT_STRING);
    }

    public EventTimeBucketer(String formatString) {
        this(formatString, ZoneId.systemDefault());
    }

    public EventTimeBucketer(ZoneId zoneId) {
        this(DEFAULT_FORMAT_STRING, zoneId);
    }

    public EventTimeBucketer(String formatString, ZoneId zoneId) {
        this.formatString = formatString;
        this.zoneId = zoneId;
        this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(this.zoneId);
    }

    //记住,这个方法一定要加,否则dateTimeFormatter对象会是空,此方法会在反序列的时候调用,这样才能正确初始化dateTimeFormatter对象
    //那有的人问了,上面构造函数不是初始化了吗?反序列化的时候是不走构造函数的
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();

        this.dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }

    @Override
    public Path getBucketPath(Clock clock, Path basePath, BaseCountVO element) {
        String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(element.getTimestamp()));
        return new Path(basePath + "/" + newDateTimeString);
    }
}
AI 代码解读

大家实际项目中可以把BaseCountVO改成自己的实体类即可,使用的时候只要换一下setBucketer值,代码如下:

BucketingSink<Object> sink = new BucketingSink<>(path);
//通过这样的方式来实现数据跨天分区
sink.setBucketer(new EventTimeBucketer<>("yyyy/MM/dd"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 256L);
sink.setBatchRolloverInterval(30 * 60 * 1000L);
sink.setInactiveBucketThreshold(3 * 60 * 1000L);
sink.setInactiveBucketCheckInterval(30 * 1000L);
sink.setInProgressSuffix(".in-progress");
sink.setPendingSuffix(".pending");
AI 代码解读
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
1
8
分享
相关文章
Flink + Doris 实时湖仓解决方案
本文整理自SelectDB技术副总裁陈明雨在Flink Forward Asia 2024的分享,聚焦Apache Doris与湖仓一体解决方案。内容涵盖三部分:一是介绍Apache Doris,一款高性能实时分析数据库,支持多场景应用;二是基于Doris、Flink和Paimon的湖仓解决方案,解决批流融合与数据一致性挑战;三是Doris社区生态及云原生发展,包括存算分离架构与600多位贡献者的活跃社区。文章深入探讨了Doris在性能、易用性及场景支持上的优势,并展示了其在多维分析、日志分析和湖仓分析中的实际应用案例。
74 17
Flink + Doris 实时湖仓解决方案
Flink基于Paimon的实时湖仓解决方案的演进
本文整理自阿里云智能集团苏轩楠老师在Flink Forward Asia 2024论坛的分享,涵盖流式湖仓架构的背景介绍、技术演进和未来发展规划。背景部分介绍了ODS、DWD、DWS三层数据架构及关键组件Flink与Paimon的作用;技术演进讨论了全量与增量数据处理优化、宽表构建及Compaction操作的改进;发展规划则展望了Range Partition、Materialized Table等新功能的应用前景。通过这些优化,系统不仅简化了复杂度,还提升了实时与离线处理的灵活性和效率。
417 3
Flink基于Paimon的实时湖仓解决方案的演进
Flink基于Paimon的实时湖仓解决方案的演进
Flink基于Paimon的实时湖仓解决方案的演进
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
293 9
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
88 4
|
6月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
76 2
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
126 0
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
192 0
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
80 0
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
121 0

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等