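Hi all: when is the event-time timestamp actually attached to the data? From the API, it looks like it is assigned after the Flink source receives the data, rather than being carried over from the real data source (for example, a mobile phone). With the Kafka source, according to the documentation, the timestamp Kafka carries is likewise just the time at which Kafka received the data. That raises a question, using Kafka as an example: when data reaches the queue it is stamped "in order", so data that arrives "out of order" still receives "increasing timestamps". If all subsequent event-time processing is based on these timestamps, doesn't that lose the real-world meaning of event time? I'd appreciate it if someone could point out where my understanding is wrong! Best regards~ *From the volunteer-compiled Flink mailing list archive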
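As a hypothetical illustration of the scenario in the question, the plain-Java sketch below (not Kafka or Flink code; the `Record` class and its fields are made up for this example) stamps records with increasing broker-side times in arrival order. Ordering by that stamp only reproduces the arrival order, while ordering by a time written by the device recovers the real-world order. Note that a Kafka record timestamp is not always the broker's receive time: with the topic setting `message.timestamp.type=CreateTime` (the default) it is set by the producer, and only `LogAppendTime` makes it the broker's append time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the question's scenario; not Kafka or Flink code.
public class AppendTimeVsEventTimeDemo {

    static final class Record {
        final String id;
        final long eventTime;  // written by the device when the event happened
        final long appendTime; // written by the broker when the record arrived

        Record(String id, long eventTime, long appendTime) {
            this.id = id;
            this.eventTime = eventTime;
            this.appendTime = appendTime;
        }
    }

    /** Stamps records in arrival order with strictly increasing append times, 1 ms apart. */
    static List<Record> append(String[] ids, long[] eventTimes, long brokerClock) {
        List<Record> log = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            log.add(new Record(ids[i], eventTimes[i], brokerClock + i));
        }
        return log;
    }

    /** Returns the ids sorted by either the device's event time or the broker's append time. */
    static List<String> order(List<Record> log, boolean byEventTime) {
        List<Record> copy = new ArrayList<>(log);
        copy.sort((a, b) -> Long.compare(
                byEventTime ? a.eventTime : a.appendTime,
                byEventTime ? b.eventTime : b.appendTime));
        List<String> ids = new ArrayList<>();
        for (Record r : copy) {
            ids.add(r.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        // "late" happened first but reached the broker last (e.g. the phone was offline).
        String[] ids = {"b", "c", "late"};
        long[] eventTimes = {2_000, 3_000, 1_000};
        List<Record> log = append(ids, eventTimes, 50_000);
        System.out.println(order(log, false)); // [b, c, late] (arrival order)
        System.out.println(order(log, true));  // [late, b, c] (real-world order)
    }
}
```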
Hi, have a look at how event timestamps are generated: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html In the examples below, every timestamp is taken from a time field of the element itself (`element.getCreationTime()`), so it reflects when the event was created rather than when it reached Kafka or Flink. There is also an `AscendingTimestampExtractor` for timestamps that only ever increase.

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// MyEvent is the user's own event type, assumed to expose getCreationTime()
// and hasWatermarkMarker(). Each public class would normally live in its own .java file.

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        // emit a watermark only when the element carries a watermark marker
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}
```

Hope this helps.
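The plain-Java sketch below mimics, without a Flink dependency, what `BoundedOutOfOrdernessGenerator` does with out-of-order input: timestamps come from the event itself (the `Event` class is hypothetical), and the watermark trails the highest timestamp seen so far by 3.5 s. As a simplification it evaluates the watermark before every element, whereas Flink calls `getCurrentWatermark()` periodically. Events whose timestamp is already at or below the watermark when they arrive are reported as late; events that are only slightly out of order keep their true timestamps and are not affected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, Flink-free model of bounded-out-of-orderness watermarking.
public class BoundedOutOfOrdernessDemo {

    // Hypothetical event: creationTime is set by the device that produced it.
    static final class Event {
        final String id;
        final long creationTime;

        Event(String id, long creationTime) {
            this.id = id;
            this.creationTime = creationTime;
        }
    }

    static final long MAX_OUT_OF_ORDERNESS = 3500; // 3.5 seconds, as in the docs example

    /** Returns ids of events whose timestamp is at or behind the watermark when they arrive. */
    static List<String> lateEvents(List<Event> arrivalOrder) {
        List<String> late = new ArrayList<>();
        // Start high enough that the first watermark is Long.MIN_VALUE instead of underflowing.
        long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;
        for (Event e : arrivalOrder) {
            long watermark = currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
            if (e.creationTime <= watermark) {
                late.add(e.id);
            }
            currentMaxTimestamp = Math.max(currentMaxTimestamp, e.creationTime);
        }
        return late;
    }

    public static void main(String[] args) {
        List<Event> arrivals = Arrays.asList(
                new Event("a", 10_000),
                new Event("b", 8_000),   // 2 s out of order: within the bound, not late
                new Event("c", 4_000),   // 6 s out of order: behind the watermark, late
                new Event("d", 12_000));
        System.out.println(lateEvents(arrivals)); // [c]
    }
}
```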
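`assignTimestampsAndWatermarks` can be called both on a DataStream and directly on the Kafka source (the consumer), for example:

```java
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

// kafkaSource is the Kafka consumer producing MyType records
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
    @Override
    public long extractAscendingTimestamp(MyType element) {
        // use the event time carried inside the record itself
        return element.eventTimestamp();
    }
});
```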
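`AscendingTimestampExtractor` only fits data whose timestamps never decrease, which is more plausible within a single Kafka partition; assigning it on the Kafka consumer rather than on a downstream DataStream lets Flink generate watermarks per partition and merge them. The plain-Java sketch below is a simplified model of the ascending-timestamp idea, not Flink's class: the watermark is the highest timestamp seen minus 1 ms, and any element older than that highest timestamp counts as a violation of the assumption.

```java
// Simplified, Flink-free model of an ascending-timestamp assigner.
public class AscendingTimestampsDemo {

    /** Counts elements older than the highest timestamp seen before them. */
    static int countViolations(long[] timestamps) {
        int violations = 0;
        long currentTimestamp = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (ts < currentTimestamp) {
                violations++;          // out-of-order element: the ascending assumption is broken
            } else {
                currentTimestamp = ts; // timestamps are still ascending, so advance
            }
        }
        return violations;
    }

    /** Watermark after all timestamps: highest timestamp minus 1 ms, or Long.MIN_VALUE if none. */
    static long watermark(long[] timestamps) {
        long currentTimestamp = Long.MIN_VALUE;
        for (long ts : timestamps) {
            currentTimestamp = Math.max(currentTimestamp, ts);
        }
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    public static void main(String[] args) {
        long[] singlePartition = {1_000, 2_000, 3_000};
        long[] shuffled = {1_000, 3_000, 2_000};
        System.out.println(countViolations(singlePartition)); // 0
        System.out.println(countViolations(shuffled));        // 1
        System.out.println(watermark(shuffled));              // 2999
    }
}
```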