开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink中这个问题那个大佬知道吗,我用TO_TIMESTAMP_LTZ函数返回的字段作为水位线字?

Flink中这个问题那个大佬知道吗,我用TO_TIMESTAMP_LTZ函数返回的字段作为水位线字段就没有问题?image.png

展开
收起
cuicuicuic 2023-06-05 18:38:36 108 0
6 条回答
写回答
取消 提交回答
  • 当用处理时间转化为时间字段时,使用 TIMESTAMPADD 增加 8个小时。

    1、source 端创表增加处理时间列

    `proctime` as PROCTIME(),
    

    2、 窗口函数改为使用处理时间

     开窗函数使用处理时间
     TUMBLE(`proctime`, INTERVAL '1' HOUR ),
    

    3、使用处理时间转化为时间字段时,加 8个小时。

        -- 时区加 8个小时
        date_format_local(TIMESTAMPADD(HOUR, 8, cast(TUMBLE_START(`proctime`, INTERVAL '1' HOUR) as TIMESTAMP)),'yyyyMMddHHmmss') as `state_time`,
        date_format_local(TIMESTAMPADD(HOUR, 8, cast(TUMBLE_START(`proctime`, INTERVAL '1' HOUR)as TIMESTAMP))   ,'yyyyMMddHHmmss') as `stat_time_start`,
        date_format_local(TIMESTAMPADD(SECOND,-1, TIMESTAMPADD(HOUR, 8, cast(TUMBLE_END(`proctime`, INTERVAL '1' HOUR) as TIMESTAMP))  ),'yyyyMMddHHmmss') as `stat_time_end`,
    

    ——参考链接

    2024-01-25 22:40:01
    赞同 1 展开评论 打赏
  • 数据流来自外部数据源。可能存在与时间戳相关的问题。可以检查一下数据源头。微信截图_20231028103855.png

    2024-01-21 21:05:44
    赞同 展开评论 打赏
  • 某政企事业单位运维工程师,主要从事系统运维工作,同时兼顾教育行业、企业等src安全漏洞挖掘,曾获全国行业网络安全大赛二等奖。

    从图中可以看出,您已经设法解决了部分问题,但似乎仍有一些困惑之处有关于时间戳转换和Watermark生成的部分。

    为了使事情变得更清晰,让我们回顾一下几个重要的概念:

    • Watermark:Watermark是一个指示当前系统状态的概念,它是由每个事件携带的一个特殊标记,表明此事件到达的时间点。

    • Event Time:Event Time是指事件产生的时间,这是事件自身包含的信息。

    • Processing Time:Processing Time是指处理事件的节点的当前时间。

    • Ingestion Time:Ingestion Time指的是记录进入Flink系统的时刻,这通常是集群内部的时间,即集群所在的时区。

    在创建这张表的过程中,您选择了ts列作为Watermark字段,并将其设置为延迟'0'秒。这样就保证了Watermark总等于当前的Event Time,也就是ts列的值。

    但是,当您查看实时监控视图的时候,显示的ts值却晚了八小时。这说明Flink在处理时间戳时采用了错误的时区。

    幸运的是,您提到了使用TO_TIMESTAMP_LTZ()函数的情况,这种情况下就不会出现问题。这是因为TO_TIMESTAMP_LTZ()函数已内置了对时区的考量,它可以根据预设的TableConfig.localTimeZone参数来正确展示时间。

    因此,针对您的问题解决方案如下:

    • 在调用from_unixtime_unary_16bit函数时,不要忘记添加WITH LOCAL TIME ZONE关键字来获取本地化时间。

    • 使用to_timestamp_ltz函数替代to_timestamp函数,后者对于非UTC时间戳来说不够精确。

    • 确保您的TableConfig实例已经正确设置了localTimeZone参数,使之对应预期的目的地时区。

    按照这些指导原则调整代码后,您应该能够在实时监控视图中看到正常的时间戳。

    2024-01-20 18:57:02
    赞同 展开评论 打赏
  • 在 Flink 中设置本地时区并创建带有时间戳和水位线的表时,您可能遇到的问题是:尽管设置了正确的时区(如 Asia/Shanghai),但生成的时间戳或水位线仍然存在八小时的差异。这可能是由于以下原因:

    1. 日期格式不正确:
      您使用的日期格式字符串 "yyyy-MM-dd HH:mm:ss" 可能会导致错误的结果。请确保使用正确的日期/时间格式。
    2. 系统默认时区未被覆盖:
      尽管您已经通过 tableEnv.getConfig().setLocalTimeZone(TimeZone.of("Asia/Shanghai")); 设置了时区,但在某些情况下,系统的默认时区可能会覆盖这个配置。检查您的环境变量或者 Java 运行时参数是否包含任何与时区相关的设定。
    3. 数据源中的时间戳处理不当:
      如果输入的数据流来自外部数据源(例如 Kafka 或其他消息队列),那么这些数据源本身可能存在与时间戳相关的问题。请确认数据源头处的时间戳已经被转换为期望的时区。
    4. 输出结果展示问题:
      当查看最终结果时,请注意显示工具(如控制台、数据库等)是否会自动调整时间以适应不同的时区。

    为了更好地诊断此问题,建议您提供更多的上下文信息以及代码片段,并尝试进行一些调试步骤来确定具体的原因。如果上述方法无法解决问题,您可以考虑将 TO_TIMESTAMP_LTZ 函数应用于整个过程,看看是否有帮助。在执行 SQL 查询之前打印出原始事件的时间戳值,以便了解它们是如何从数据源获取到的。
    image.png

    2024-01-15 15:41:33
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    您可能想要创建一个带有时间戳字段和水位线字段的表。这里有一些建议来优化您的代码:

    1. 使用正确的语法和关键字。在创建表时,请确保使用正确的关键字和语法。例如,您应该使用CREATE TABLE而不是CREATE TABLE。
    2. 确保您正确地定义了水位线字段。在创建表时,您应该定义水位线字段,例如:

    WATERMARK FOR ts AS INTERVAL SECOND '1'
    CopyCopy

    这将创建一个每秒更新一次的水位线字段。

    1. 如果您使用的是Flink 1.13或更高版本,您还可以使用新的时间戳和水位线API。例如,您可以使用以下代码创建表:

    CREATE TABLE cityTemper (
    id STRING,
    temperature DECIMAL(32.2),
    currTimestamp TIMESTAMP,
    yyyy-MM-dd DATE,
    ts TIMESTAMP(currTimestamp, 1000) WATERMARK AS ts - INTERVAL SECOND '1'
    );
    CopyCopy

    这将创建一个表,其中包含一个名为ts的时间戳字段和一个每秒更新一次的水位线。

    2024-01-12 21:28:41
    赞同 展开评论 打赏
  • 你的Flink版本支持TO_TIMESTAMP_LTZ函数吗?
    这个函数是在Flink 1.13.0版本中引入的。
    image.png

    https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/concepts/timezone/

    2024-01-12 14:54:47
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载