Flink 中支持的事件时间属性数据类型必须为 TIMESTAMP 或者 TIMESTAMP_LTZ 2.flinK sql代码里我也设置了时区tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")) 2.currTimestamp是13位时间错 二、然后会有以下问题: 1.其中使用TO_TIMESTAMP(FROM_UNIXTIME(currTimestamp / 1000, 'yyyy-MM-dd HH:mm:ss'))的字段设置为水位线,水位线会比实际数据的时间早8小时;
这个问题可能是由于Flink在处理事件时间时,对于TIMESTAMP和TIMESTAMP_LTZ类型的数据,会自动减去一个固定的偏移量(8小时)。这个偏移量是由Flink的系统配置决定的,即使你在代码中设置了时区,也无法改变这个偏移量。
你可以尝试以下方法解决这个问题:
将数据类型从TIMESTAMP改为TIMESTAMP_LTZ,这样Flink就不会自动减去8小时的偏移量了。
如果你不能改变数据类型,那么你需要在代码中手动减去8小时的偏移量。例如,你可以使用以下代码来获取正确的水位线:
SELECT TO_TIMESTAMP(FROM_UNIXTIME(currTimestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) - INTERVAL '8' HOUR AS watermark
FROM your_table;
- 如果以上方法都不能解决你的问题,那么你可能需要考虑使用其他的时间处理库,如Java 8的java.time包,它提供了更灵活的时间处理功能。