Flink落HDFS数据按事件时间分区解决方案

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 0x1 摘要Hive离线数仓中为了查询分析方便,几乎所有表都会划分分区,最为常见的是按天分区,Flink通过以下配置把数据写入HDFS,BucketingSink<Object> sink = new BucketingSink<>(path);//通过这样的方式来实现数据跨天分区sink.

0x1 摘要

Hive离线数仓中为了查询分析方便,几乎所有表都会划分分区,最为常见的是按天分区,Flink通过以下配置把数据写入HDFS,

BucketingSink<Object> sink = new BucketingSink<>(path);
//通过这样的方式来实现数据跨天分区
sink.setBucketer(new DateTimeBucketer<>("yyyy/MM/dd"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 256L);
sink.setBatchRolloverInterval(30 * 60 * 1000L);
sink.setInactiveBucketThreshold(3 * 60 * 1000L);
sink.setInactiveBucketCheckInterval(30 * 1000L);
sink.setInProgressSuffix(".in-progress");
sink.setPendingSuffix(".pending");
AI 代码解读

0x2 问题点

如果要做到数据完全正确的落到相应分区,那必须用eventTime来划分,我们先来看看DateTimeBucketer桶实现代码,

public class DateTimeBucketer<T> implements Bucketer<T> {
 private static final long serialVersionUID = 1L;
 private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
 private final String formatString;
 private final ZoneId zoneId;
 private transient DateTimeFormatter dateTimeFormatter;

 /**
  * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"} using JVM's default timezone.
  */
 public DateTimeBucketer() {
  this(DEFAULT_FORMAT_STRING);
 }

 /**
  * Creates a new {@code DateTimeBucketer} with the given date/time format string using JVM's default timezone.
  *
  * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
  * the bucket path.
  */
 public DateTimeBucketer(String formatString) {
  this(formatString, ZoneId.systemDefault());
 }

 /**
  * Creates a new {@code DateTimeBucketer} with the given date/time format string using the given timezone.
  *
  * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
  * the bucket path.
  * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket path.
  */
 public DateTimeBucketer(String formatString, ZoneId zoneId) {
  this.formatString = Preconditions.checkNotNull(formatString);
  this.zoneId = Preconditions.checkNotNull(zoneId);

  this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(zoneId);
 }

 @Override
 public Path getBucketPath(Clock clock, Path basePath, T element) {
  //分桶关键代码在这里,通过clock获取当前时间戳后格式
  String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(clock.currentTimeMillis()));
  return new Path(basePath + "/" + newDateTimeString);
 }
}
AI 代码解读

以上代码clock实例是在BucketingSink#open方法中实例化,代码如下:

this.clock = new Clock() {
 @Override
 public long currentTimeMillis() {
  //直接返回当前处理时间
  return processingTimeService.getCurrentProcessingTime();
 }
};
AI 代码解读

结合以上源码分析发现,使用DateTimeBucketer分桶是采用当前处理时间,采用当前处理时间必然会跟事件事件存在差异,因此会导致数据跨分区落入HDFS文件,举个例子,假设有一条数据事件时间是2019-09-29 23:59:58,那这条数据应该落在2019/09/29分区,但由于这条数据延迟了3秒过来,当处理过来时当前处理时间已经是2019-09-30 00:00:01,所以这条数据会被落到2019/09/30分区,针对一些重要场景数据这样的结果是不可接受的。

0x3 解决方案

从以上第二节源码分析可以看出,解决问题的核心在getBucketPath方法中时间的获取,只要把这里的时间改为事件即可,而正好这个方法的第三参数就是element,代表每一条记录,只要记录中有事件时间就可以获取。既然现有的实现源码不好改,那我们可以自己基于Bucketer接口实现一个EventTimeBucketer分桶器,实现源码如下:

public class EventTimeBucketer implements Bucketer<BaseCountVO> {
    private static final String DEFAULT_FORMAT_STRING = "yyyy/MM/dd";

    private final String formatString;

    private final ZoneId zoneId;
    private transient DateTimeFormatter dateTimeFormatter;

    public EventTimeBucketer() {
        this(DEFAULT_FORMAT_STRING);
    }

    public EventTimeBucketer(String formatString) {
        this(formatString, ZoneId.systemDefault());
    }

    public EventTimeBucketer(ZoneId zoneId) {
        this(DEFAULT_FORMAT_STRING, zoneId);
    }

    public EventTimeBucketer(String formatString, ZoneId zoneId) {
        this.formatString = formatString;
        this.zoneId = zoneId;
        this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(this.zoneId);
    }

    //记住,这个方法一定要加,否则dateTimeFormatter对象会是空,此方法会在反序列的时候调用,这样才能正确初始化dateTimeFormatter对象
    //那有的人问了,上面构造函数不是初始化了吗?反序列化的时候是不走构造函数的
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();

        this.dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }

    @Override
    public Path getBucketPath(Clock clock, Path basePath, BaseCountVO element) {
        String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(element.getTimestamp()));
        return new Path(basePath + "/" + newDateTimeString);
    }
}
AI 代码解读

大家实际项目中可以把BaseCountVO改成自己的实体类即可,使用的时候只要换一下setBucketer值,代码如下:

BucketingSink<Object> sink = new BucketingSink<>(path);
//通过这样的方式来实现数据跨天分区
sink.setBucketer(new EventTimeBucketer<>("yyyy/MM/dd"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 256L);
sink.setBatchRolloverInterval(30 * 60 * 1000L);
sink.setInactiveBucketThreshold(3 * 60 * 1000L);
sink.setInactiveBucketCheckInterval(30 * 1000L);
sink.setInProgressSuffix(".in-progress");
sink.setPendingSuffix(".pending");
AI 代码解读
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
1
8
分享
相关文章
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
331 43
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
396 61
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
143 1
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
98 0
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
2113 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
264 0
Flink CDC 在阿里云实时计算Flink版的云上实践
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等