开发者社区> 问答> 正文

我可以使用KSQL生成处理时间超时吗?

flink小助手 2018-12-13 14:43:17 385

我试图使用KSQL在一个时间限制内做任何处理,并在该时间限制内获得结果。有关使用Apache Beam说明的相同想法,请参阅“处理时间计时器”下的使用Apache Beam的及时(和有状态)处理。

鉴于:

具有唯一键的事务流;
在同一个流中更新这些事务; 和
在第一个流中出现事务后,希望在特定超时(例如20秒)接收更新事务的下游处理器。
从概念上讲,我正在考虑创建第一个流的KTable来保存最新的事务状态,并使用KSQL通过查询KTable来创建输出流(create_time + timeout)

我还没有找到在KSQL文档中执行此操作的方法,即使有内置的current_time,我也不确定它是否会被评估,直到另一条记录出现在流中。

我怎么能在KSQL中做到这一点?我需要自定义UDF吗?如果在KSQL中无法完成,我可以在KStreams中完成吗?

=====

更新:看起来KStreams今天不支持这一点 - Apache Flink似乎是这个用例的方式(和许多其他用户)。如果您知道解决KStreams限制的巧妙方法,请告诉我!

测试技术 Apache 流计算
分享到
取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:20:42

    看看Kafka Streams punctuate()的Processor API中的功能,这可能就是您正在寻找的功能。您可以将punctuate()与stream-time(默认值:event-time)以及处理时间(via PunctuationType.WALL_CLOCK_TIME)一起使用。在这里,您将根据您的需要实现一个Processor或一个用于超时功能的。Transformerpunctuate()

    提示:您也可以在Kafka Streams的DSL中使用这样的处理器/变换器。这意味着您可以继续使用更方便的DSL,如果您愿意,只需要在基于DSL的代码中的正确位置插入处理器/变换器。

    0 0
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题
推荐课程