Flink中这个问题那个大佬知道吗,我用TO_TIMESTAMP_LTZ函数返回的字段作为水位线字段就没有问题?
当用处理时间转化为时间字段时,使用 TIMESTAMPADD 增加 8个小时。
1、source 端创表增加处理时间列
`proctime` as PROCTIME(),
2、 窗口函数改为使用处理时间
开窗函数使用处理时间
TUMBLE(`proctime`, INTERVAL '1' HOUR ),
3、使用处理时间转化为时间字段时,加 8个小时。
-- 时区加 8个小时
date_format_local(TIMESTAMPADD(HOUR, 8, cast(TUMBLE_START(`proctime`, INTERVAL '1' HOUR) as TIMESTAMP)),'yyyyMMddHHmmss') as `state_time`,
date_format_local(TIMESTAMPADD(HOUR, 8, cast(TUMBLE_START(`proctime`, INTERVAL '1' HOUR)as TIMESTAMP)) ,'yyyyMMddHHmmss') as `stat_time_start`,
date_format_local(TIMESTAMPADD(SECOND,-1, TIMESTAMPADD(HOUR, 8, cast(TUMBLE_END(`proctime`, INTERVAL '1' HOUR) as TIMESTAMP)) ),'yyyyMMddHHmmss') as `stat_time_end`,
——参考链接。
从图中可以看出,您已经设法解决了部分问题,但似乎仍有一些困惑之处有关于时间戳转换和Watermark生成的部分。
为了使事情变得更清晰,让我们回顾一下几个重要的概念:
Watermark:Watermark是一个指示当前系统状态的概念,它是由每个事件携带的一个特殊标记,表明此事件到达的时间点。
Event Time:Event Time是指事件产生的时间,这是事件自身包含的信息。
Processing Time:Processing Time是指处理事件的节点的当前时间。
Ingestion Time:Ingestion Time指的是记录进入Flink系统的时刻,这通常是集群内部的时间,即集群所在的时区。
在创建这张表的过程中,您选择了ts列作为Watermark字段,并将其设置为延迟'0'秒。这样就保证了Watermark总等于当前的Event Time,也就是ts列的值。
但是,当您查看实时监控视图的时候,显示的ts值却晚了八小时。这说明Flink在处理时间戳时采用了错误的时区。
幸运的是,您提到了使用TO_TIMESTAMP_LTZ()函数的情况,这种情况下就不会出现问题。这是因为TO_TIMESTAMP_LTZ()函数已内置了对时区的考量,它可以根据预设的TableConfig.localTimeZone参数来正确展示时间。
因此,针对您的问题解决方案如下:
在调用from_unixtime_unary_16bit函数时,不要忘记添加WITH LOCAL TIME ZONE关键字来获取本地化时间。
使用to_timestamp_ltz函数替代to_timestamp函数,后者对于非UTC时间戳来说不够精确。
确保您的TableConfig实例已经正确设置了localTimeZone参数,使之对应预期的目的地时区。
按照这些指导原则调整代码后,您应该能够在实时监控视图中看到正常的时间戳。
在 Flink 中设置本地时区并创建带有时间戳和水位线的表时,您可能遇到的问题是:尽管设置了正确的时区(如 Asia/Shanghai),但生成的时间戳或水位线仍然存在八小时的差异。这可能是由于以下原因:
tableEnv.getConfig().setLocalTimeZone(TimeZone.of("Asia/Shanghai"));
设置了时区,但在某些情况下,系统的默认时区可能会覆盖这个配置。检查您的环境变量或者 Java 运行时参数是否包含任何与时区相关的设定。为了更好地诊断此问题,建议您提供更多的上下文信息以及代码片段,并尝试进行一些调试步骤来确定具体的原因。如果上述方法无法解决问题,您可以考虑将 TO_TIMESTAMP_LTZ 函数应用于整个过程,看看是否有帮助。在执行 SQL 查询之前打印出原始事件的时间戳值,以便了解它们是如何从数据源获取到的。
您可能想要创建一个带有时间戳字段和水位线字段的表。这里有一些建议来优化您的代码:
WATERMARK FOR ts AS INTERVAL SECOND '1'
CopyCopy
这将创建一个每秒更新一次的水位线字段。
CREATE TABLE cityTemper (
id STRING,
temperature DECIMAL(32.2),
currTimestamp TIMESTAMP,
yyyy-MM-dd DATE,
ts TIMESTAMP(currTimestamp, 1000) WATERMARK AS ts - INTERVAL SECOND '1'
);
CopyCopy
这将创建一个表,其中包含一个名为ts的时间戳字段和一个每秒更新一次的水位线。
你的Flink版本支持TO_TIMESTAMP_LTZ函数吗?
这个函数是在Flink 1.13.0版本中引入的。
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/concepts/timezone/
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。