你好。 在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample,
我是这样用的, 1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 2. 在connector里指定watermark,其中transTime是消息里的字段 " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, 'yyyy-MM-dd HH:mm:ss')), \n " + " WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND \n" +
但在运行时会报以下exception, 已经 在connector里定义了,还需要assign吗? java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
即使我在datastream里定义了strategy , ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)));
也还是报上面一样的错。*来自志愿者整理的flink邮件归档
Hi marble, 使用 Datastream 开发的话,Kafka connector 的使用可参考文献1[1];EventTime以及WaterMark的使用可以参考文献2[2]。 对应的中文文档对应在文献3和4. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html [3] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html [4] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_timestamps_watermarks.html*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。