Flink 事件时间的陷进及解决思路

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介:

0x1 摘要

大家都知道Flink引入了事件时间(eventTime)这个重要概念,来提升数据统计的准确性,但引入事件时间后在具体业务实现时存在一些问题必需要合理去解决,否则会造成非常严重的问题。

0x2 Flink 时间概念介绍

Flink 支持不同的时间概念,包括:

  • Event Time :事件时间
  • Processing Time :处理时间
  • Ingestion Time :消息提取时间

参考下图可以清晰的知道这三者的关系:
时间概念图
Ingestion Time是介于Event TimeProcessing Time之间的概念。
程序中可以通过env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);指定使用时间类型。

0x3 事件时间存在的问题

事件时间存在什么样的问题呢?下面先看一个简单的业务场景。
比如:要统计APP上搜索按钮每1分钟的点击次数。
前端埋点数据结构:

字段名 字段类型 描述
eventCode String 事件编码
clickTime Long 点击时间

基于以上数据结构我们可设计如下水印处理器:

public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

 private long currentMaxTimestamp = 0L;

 @Override
 public Watermark getCurrentWatermark() {
  return new Watermark(currentMaxTimestamp -3000);
 }

 @Override
 public long extractTimestamp(Tuple2<String, Long> tuple, long previousElementTimestamp) {
  long eventTime = tuple.f1;
  currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
  return eventTime;
 }
}

extractTimestamp方法会拿事件时间和上一次事件时间比较,并取较大值来更新当前水印值。
假设前端发送了以下这些数据,方便直观看数据clickTime直接采用格式化后的值,并以逗号分隔数据。

001,2018-12-17 13:30:00
001,2018-12-17 13:30:01
001,2018-12-17 13:30:02
001,2018-12-18 13:30:00
001,2018-12-17 13:30:03
001,2018-12-17 13:30:04
001,2018-12-17 13:30:05

正常数据都是17号,突然来了一条18号的数据,再结合上面的水印逻辑,一旦出现这种问题数据,直接导致水位上升到18号,后面再来17号的数据全部无法处理。针对业务来讲这样的错误是致命的,统计结果出现断层。

0x4 解决思路

针对以上问题我们可以对水印实现类做如下改造:

public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

 private long currentMaxTimestamp = 0L;

 @Override
 public Watermark getCurrentWatermark() {
  return new Watermark(currentMaxTimestamp -3000);
 }

 @Override
 public long extractTimestamp(Tuple2<String, Long> tuple, long previousElementTimestamp) {
  long eventTime = tuple.f1;
  if((currentMaxTimestamp == 0) || (eventTime - currentMaxTimestamp < MESSAGE_FORWARD_TIME)) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
        }
  return eventTime;
 }
}

MESSAGE_FORWARD_TIME变量是自定义的消息最大跳跃时间,如果超出这个范围则不更新最大水印时间。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
程序员 API 数据安全/隐私保护
Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)
Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
615 9
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
分布式计算 监控 大数据
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
278 0
|
消息中间件 存储 Kafka
【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生的事件,如何消费旧事件呢?
【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生的事件,如何消费旧事件呢?
150 1
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何确保多并发sink同时更新Redis值时,数据能按事件时间有序地更新并且保持一致性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之同步MySQL数据并EP(复杂事件处理)时,编译报错,如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
210 1
|
运维 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在处理MySQL表新增数据记录时,没有正确触发变更事件,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
关系型数据库 数据库 流计算
实时计算 Flink版操作报错合集之在使用Flink CDC TiDB Connector时,无法获取到事件,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
504 0
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
【2月更文挑战第12天】在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
134 3

热门文章

最新文章