开发者社区> 问答> 正文

关于event-time的定义与产生时间戳位置的问题。

hi,all: event time这个时间戳是在什么时候打到数据上面去的,看api是在flink source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。 那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗? 不知道有哪里是我理解不对的地方望指教! 祝好~*来自志愿者整理的flink邮件归档

展开
收起
小阿怪 2021-12-07 22:21:39 731 0
1 条回答
写回答
取消 提交回答
  • Hi, 可以看下事件时间戳的生成,https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html 下面例子里时间戳都是来自element里面的时间字段。还有一个AscendingTimestampExtractor。 /** * This generator generates watermarks assuming that elements arrive out of order, * but only to a certain degree. The latest elements for a certain timestamp t will arrive * at most n milliseconds after the earliest elements for timestamp t. /publicclassBoundedOutOfOrdernessGeneratorimplementsAssignerWithPeriodicWatermarks {privatefinallongmaxOutOfOrderness=3500;// 3.5 secondsprivatelongcurrentMaxTimestamp;@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){longtimestamp=element.getCreationTime();currentMaxTimestamp=Math.max(timestamp,currentMaxTimestamp);returntimestamp;}@OverridepublicWatermarkgetCurrentWatermark(){// return the watermark as current highest timestamp minus the out-of-orderness boundreturnnewWatermark(currentMaxTimestamp-maxOutOfOrderness);}}/ * * This generator generates watermarks that are lagging behind processing time by a fixed amount. * It assumes that elements arrive in Flink after a bounded delay. */publicclassTimeLagWatermarkGeneratorimplementsAssignerWithPeriodicWatermarks {privatefinallongmaxTimeLag=5000;// 5 seconds@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){returnelement.getCreationTime();}@OverridepublicWatermarkgetCurrentWatermark(){// return the watermark as current time minus the maximum time lagreturnnewWatermark(System.currentTimeMillis()-maxTimeLag);}} publicclassPunctuatedAssignerimplementsAssignerWithPunctuatedWatermarks {@OverridepubliclongextractTimestamp(MyEventelement,longpreviousElementTimestamp){returnelement.getCreationTime();}@OverridepublicWatermarkcheckAndGetNextWatermark(MyEventlastElement,longextractedTimestamp){returnlastElement.hasWatermarkMarker()?newWatermark(extractedTimestamp):null;}} 希望能有所帮助。

    DataStream和kafkaSource后面都可以调用assignTimestampsAndWatermarks。

    kafkaSource.assignTimestampsAndWatermarks(newAscendingTimestampExtractor (){@OverridepubliclongextractAscendingTimestamp(MyTypeelement){returnelement.eventTimestamp();}}); *来自志愿者整理的flink邮件归档

    2021-12-08 10:39:19
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Window_Time_Watermark 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载