开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink统计的数值感觉像flink开窗的时候,判断现在的时间是8小时前,该如何解决?

理论上来说,Flink如果是水印显示问题,应该统计的数值是实际时间的统计才对,现在的问题是:统计的值确实是8小时前的数据,比如说我8小时前的点击有120,实际时间(8小时后)的点击有200,目前到实际时间16点结果的统计确实是120,而不是200,比如现在16点,有200点击,8点的时候有120点击,按天统计点击量。这个开窗结果是0:00-8:00有120点击,而不是0:00-8:00有200点击,而到了24点的时候,会变成0:00-16:00有200点击
就是开窗结果是对的,但是window_end的会一直落后实际时间8小时,也就是说8小时后我能得到正确的结果
感觉像flink开窗的时候,判断现在的时间是8小时前,该如何解决?

展开
收起
2401。 2023-09-17 21:16:37 78 0
6 条回答
写回答
取消 提交回答
  • 可以尝试以下三种方案:

    1、flink端不做处理。也即是在读取数据的时候加上8小时的offset。

    例如通过注入时间来解决:

    CREATE VIEW view_table AS
    SELECT
       id,
       -- 通过注入时间解决
       -- 加上东八区的时间偏移量,设置注入时间为时间戳列
       CAST(CURRENT_TIMESTAMP AS BIGINT) * 1000 + 8 * 60 * 60 * 1000 as ingest_time
    FROM 
       source_table;
    

    2、使用udf等算子给时间戳加上8小时的offset。

    sink端处理

    import org.apache.flink.table.functions.ScalarFunction;
    
    import java.sql.Timestamp;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;
    
    public class UTC2Local extends ScalarFunction {
        public Timestamp eval(Timestamp s) {
            long timestamp = s.getTime() + 28800000;
            return new Timestamp(timestamp);
        }
    
    }
    

    注册udf

    tEnv.registerFunction("utc2local",new UTC2Local());
    

    使用udf

    Table table1 = tEnv.sqlQuery("select count(number),utc2local(TUMBLE_END(proctime, INTERVAL '1' HOUR)) from res group by TUMBLE(proctime, INTERVAL '1' HOUR)");
    

    3、sink内部做处理。

    sink端的实现也比较简单,主要是判断输出字段类型,然后加上8小时offset即可。可以参考blink的printtablesink的实现。

    override def invoke(in: JTuple2[JBool, Row]): Unit = {
        val sb = new StringBuilder
        val row = in.f1
        for (i <- 0 to row.getArity - 1) {
          if (i > 0) sb.append(",")
          val f = row.getField(i)
          if (f.isInstanceOf[Date]) {
            sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd", tz))
          } else if (f.isInstanceOf[Time]) {
            sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "HH:mm:ss", tz))
          } else if (f.isInstanceOf[Timestamp]) {
            sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime,
              "yyyy-MM-dd HH:mm:ss.SSS", tz))
          } else {
            sb.append(StringUtils.arrayAwareToString(f))
          }
        }
    
        if (in.f0) {
          System.out.println(prefix + "(+)" + sb.toString())
        } else {
          System.out.println(prefix + "(-)" + sb.toString())
        }
      }
    

    ——参考链接

    2024-01-24 17:43:49
    赞同 1 展开评论 打赏
  • 可以使用基于时间窗口(Time Window)的功能。以下是一个简单的示例,假设我们有一个包含事件时间和数值的DataStream,要统计每个小时内8小时前的数据累计值

    2024-01-21 21:29:59
    赞同 展开评论 打赏
  • 您描述的情况表明,Flink作业中的窗口计算基于事件时间,并且水印机制没有正确地反映实时时间,而是落后了8个小时。这意味着窗口结束时间(window_end)并没有及时推进,因此在实际时间到达某个窗口结束点时,窗口尚未触发计算,所以统计的结果还停留在8小时之前的数据上。要解决这个问题,您需要检查以下几个关键点:

    1. 水印策略:

      • 水印(Watermark)应当根据数据中的最大事件时间戳减去一个合理的延迟阈值来设定。例如,如果所有事件都有时间戳,并且您知道事件最多延迟8小时到达,那么水印可以设置为当前最小事件时间戳减去8小时(但应留一些余裕以应对极端情况)。
      DataStreamSource<T> source = ...
      WatermarkStrategy<T> watermarkStrategy = Watermarks.periodicBoundedOutOfOrderness(Duration.ofHours(8));
      SingleOutputStreamOperator<T> withTimestampAndWatermark = source.assignTimestampsAndWatermarks(watermarkStrategy);
      
    2. 窗口定义:

      • 确保窗口边界基于事件时间而非处理时间,并与水印策略协调一致。例如,如果您要做的是按天统计,则窗口应该是每天的开始到结束。
      .keyBy(...) // 基于所需字段分组
      .window(TumblingEventTimeWindows.of(Time.days(1))) // 每天一个窗口
      
    3. 系统时区同步:

      • 虽然您之前提到了时区调整,但如果事件时间戳本身不是UTC时间且未正确处理时区差异,也可能导致问题。确认所有时间戳都被统一转换成了 UTC 或者作业运行环境中预期的时区。
    4. 延迟数据处理:

      • 对于确实迟到了的数据,确保Flink作业配置了合理的迟到数据处理策略,如允许迟到数据进入窗口进行计算,或者有特殊的迟到数据处理逻辑。

    只有当上述设置正确无误,Flink作业才能正确识别实时时间窗口并准确统计相应窗口内的数据。如果您发现水印总是滞后8小时,那就很可能是水印策略设置不当造成的,需要针对性地调整水印生成策略。

    2024-01-15 14:19:32
    赞同 展开评论 打赏
  • 某政企事业单位运维工程师,主要从事系统运维及大数据开发工作,多次获得阿里云、华为云、腾讯云征文比赛一二等奖;CTF选手,白帽,全国交通行业网络安全大赛二等奖,手握多张EDU、CNVD、CNNVD证书

    在Flink中,窗口通常是基于事件时间的,而不是全局时间。

    换句话说,窗口的结束时间不是固定的,而是随着新事件的到来不断移动。这对于许多用例是非常有用的,因为它可以让用户灵活地处理不同类型的事件,而不必担心全局时间的变化。

    然而,正如您指出的,有时这会导致问题,特别是当窗口的结束时间比实际时间早很多的情况下。在这种情况下,窗口的结束时间总是提前8个小时,尽管实际时间已经超过了那个时间。

    为了解决这些问题,您可以尝试将窗口的结束时间设置为实际时间(8小时后),而不是8小时前。这样,窗口的结束时间就会与实际时间保持一致,从而得到正确的结果。

    例如,您可以将窗口的结束时间设置为当前事件时间加上8小时:

    DataStream stream = ...; stream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.hours(8))) .sum("click_count");
    

    这样,窗口的结束时间就会是当前事件时间加上8小时,从而得到正确的结果。

    2024-01-15 11:07:42
    赞同 展开评论 打赏
  • 这个问题可能是由于Flink的窗口计算逻辑导致的。在Flink中,窗口计算是基于事件时间的,也就是说,窗口的结束时间是相对于当前事件的时间来计算的。因此,当你的窗口结束时间设置为8小时前时,实际上窗口的结束时间是在当前事件时间的基础上减去8小时。

    要解决这个问题,你可以尝试将窗口的结束时间设置为实际时间(8小时后),而不是8小时前。这样,窗口的结束时间就会与实际时间保持一致,从而得到正确的结果。

    例如,你可以将窗口的结束时间设置为当前事件时间加上8小时:

    DataStream<ClickEvent> stream = ...;
    stream.keyBy(...)
          .window(TumblingEventTimeWindows.of(Time.hours(8)))
          .sum("click_count");
    

    这样,窗口的结束时间就会是当前事件时间加上8小时,从而得到正确的结果。

    2024-01-13 20:23:28
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    Flink 可能使用了基于时间戳的窗口,并且在处理时使用了某种时间戳偏移。这可能是由于您使用的是基于 event-time 语义的窗口,而 Flink 默认使用的是处理时间(processing time)。
    要解决这个问题,您可以尝试以下方法:

    1. 更改窗口语义:确保您的窗口是基于 event-time 语义的。例如,如果您使用的是 TumblingEventTimeWindows,那么窗口结束时,Flink 会基于事件的时间戳来确定窗口结束时间。如果您使用的是 SlidingEventTimeWindows,那么 Flink 会基于事件的时间戳来确定窗口结束时间,并且窗口会随着时间的推移而滑动。
    2. 调整时间戳偏移:如果您确实需要使用处理时间,并且需要将窗口结束时间偏移 8 小时,那么您可以尝试调整 window.trigger。例如,您可以使用 AfterWatermark 作为窗口触发器,并将 delay 参数设置为 8 小时。这样,Flink 会在接收到事件的时间戳之后等待 8 小时才关闭窗口。
      以下是使用 AfterWatermark 作为窗口触发器的示例:

    DataStream input = ...;
    // 创建一个基于 event-time 的窗口
    Window window =
    Window.tumblingEventTime(Time.seconds(8))
    .trigger(AfterWatermark.pastEndOfWindow());
    // 使用窗口进行处理
    input
    .keyBy(event -> event.getKey())
    .window(window)
    .reduce((event1, event2) -> {
    // 处理两个事件
    });

    请注意,这只是一个示例,您可能需要根据您的具体需求进行调整。

    2024-01-12 22:23:33
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载