开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink sql/table watermark 迟到数据有办法获取到吗 ,有老师能给指点?

flink sql/table watermark 迟到数据有办法获取到吗 ,有老师能给指点指点吗?

展开
收起
真的很搞笑 2023-07-02 17:54:23 136 0
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink SQL/Table API 中,可以使用 Watermark 来处理事件时间(Event Time)数据流。当一个 Watermark 到达 Flink 中间件时,它会告诉 Flink 所有小于该 Watermark 的事件都已经到达,因此 Flink 可以在此时触发一些操作,例如窗口的关闭和计算结果的输出。然而,如果一个事件的时间戳比当前 Watermark 还要晚,那么这个事件就会被视为迟到数据(Late Data)。
    对于迟到数据,Flink SQL/Table API 提供了一些处理方式,您可以根据实际情况选择合适的方式:
    丢弃迟到数据:在某些场景下,迟到数据可能已经没有意义,可以直接将其丢弃。在 Flink SQL/Table API 中,可以通过设置 WITH (DROP_LATE_EVENT = true) 来丢弃迟到数据。例如:
    scheme
    Copy
    CREATE TABLE my_table (
    ...
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
    'connector.type' = 'kafka',
    ...
    'format.type' = 'json',
    'format.derive-schema' = 'true',
    'sink.partitioner' = 'round-robin',
    'sink.properties.bootstrap.servers' = 'localhost:9092',
    'update-mode' = 'append',
    'with (DROP_LATE_EVENT = true)'
    );
    在上述示例中,WITH (DROP_LATE_EVENT = true) 表示丢弃迟到数据。
    将迟到数据放入侧输出流:在某些场景下,迟到数据可能仍然有用,可以将其放入侧输出流(Side Output)中。在 Flink SQL/Table API 中,可以通过在 WATERMARK 语句中设置 DELAY 参数来定义迟到数据的延迟时间,然后使用 LATE 语句将迟到数据放入侧输出流。例如:
    scheme
    Copy
    CREATE TABLE my_table (
    ...
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND - DELAY '1' MINUTE
    ) WITH (
    'connector.type' = 'kafka',
    ...
    'format.type' = 'json',
    'format.derive-schema' = 'true',
    'sink.partitioner' = 'round-robin',
    'sink.properties.bootstrap.servers' = 'localhost:9092',
    'update-mode' = 'append'
    );

    CREATE TABLE my_side_output (
    ...
    ) WITH (
    'connector.type' = 'kafka',
    ...
    'format.type' = 'json',
    'format.derive-schema' = 'true',
    'sink.partitioner' = 'round-robin',
    'sink.properties.bootstrap.servers' = 'localhost:9092',
    'update-mode' = 'append'
    );

    INSERT INTO my_output
    SELECT ...
    FROM my_table
    LATE AS my_side_output
    在上述示例中,DELAY '1' MINUTE 表示迟到数据的延迟时间为 1 分钟

    2023-07-30 09:36:09
    赞同 展开评论 打赏
  • 在 Flink SQL/Table API 中,Watermark 主要用于处理事件时间数据,并确定事件时间窗口的边界。当数据到达时,Flink 会根据 Watermark 来触发窗口计算并输出结果。

    对于迟到的数据(即事件时间晚于 Watermark 的数据),默认情况下,Flink 会将其丢弃,不参与窗口计算。然而,你可以通过使用 Flink 提供的延迟数据处理机制来获取迟到的数据。

    具体来说,你可以使用窗口函数中的 side output 功能,将迟到的数据发送到一个侧输出流(Side Output)中。然后,你可以在该侧输出流上定义处理逻辑,例如将迟到的数据保存到外部存储或进行特定的处理操作。

    以下是一个示例代码片段,展示了如何使用 Flink SQL/Table API 处理迟到的数据:

    -- 创建输入表
    CREATE TABLE inputTable (
      eventTime TIMESTAMP,
      value INT
    ) WITH (
      'connector' = ...,
      -- 其他连接器配置
      'watermark.ingestion.timestamp.assigner' = '...'
    
    );
    
    -- 创建窗口表并指定 Watermark
    CREATE TABLE windowedTable AS
    SELECT TUMBLE_START(eventTime, INTERVAL '1' HOUR) as wStart,
           TUMBLE_END(eventTime, INTERVAL '1' HOUR) as wEnd,
           COUNT(value) as cnt
    FROM inputTable
    GROUP BY TUMBLE(eventTime, INTERVAL '1' HOUR)
    HAVING eventTime < TUMBLE_END(eventTime, INTERVAL '1' HOUR)
       OR WATERMARK_FOR(eventTime) IS NULL;
    
    -- 定义侧输出表
    CREATE TABLE lateDataOutput (
      eventTime TIMESTAMP,
      value INT
    ) WITH (
      'connector' = ...,
      -- 其他连接器配置
    );
    
    -- 将迟到的数据发送到侧输出表
    INSERT INTO lateDataOutput
    SELECT eventTime, value
    FROM inputTable
    WHERE eventTime >= TUMBLE_END(eventTime, INTERVAL '1' HOUR);
    
    -- 执行计划
    EXPLAIN INSERT INTO windowedTable SELECT ...
    

    在上述代码中,我们首先创建了一个输入表 inputTable,并为其指定了 Watermark 的生成方式。然后,我们创建了窗口表 windowedTable,通过指定条件来过滤掉迟到的数据。接下来,我们定义了一个侧输出表 lateDataOutput,并使用 INSERT INTO 语句将迟到的数据插入到该表中。最后,我们执行计划,将结果插入到 windowedTable 中。

    请注意,以上示例代码仅演示了一种处理迟到数据的方法。具体的实现逻辑可能会因你的业务需求和场景而有所不同。你可以根据实际情况调整代码,并参考 Flink 的官方文档和社区资源进一步了解延迟数据处理的更多细节

    2023-07-30 09:39:15
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    SQL Server在电子商务中的应用与实践 立即下载
    GeoMesa on Spark SQL 立即下载
    原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载