开发者社区> 问答> 正文

我可以使用KSQL生成处理时间超时吗?

我试图使用KSQL在一个时间限制内做任何处理,并在该时间限制内获得结果。有关使用Apache Beam说明的相同想法,请参阅“处理时间计时器”下的使用Apache Beam的及时(和有状态)处理。

鉴于:

具有唯一键的事务流;
在同一个流中更新这些事务; 和
在第一个流中出现事务后,希望在特定超时(例如20秒)接收更新事务的下游处理器。
从概念上讲,我正在考虑创建第一个流的KTable来保存最新的事务状态,并使用KSQL通过查询KTable来创建输出流(create_time + timeout)

我还没有找到在KSQL文档中执行此操作的方法,即使有内置的current_time,我也不确定它是否会被评估,直到另一条记录出现在流中。

我怎么能在KSQL中做到这一点?我需要自定义UDF吗?如果在KSQL中无法完成,我可以在KStreams中完成吗?

=====

更新:看起来KStreams今天不支持这一点 - Apache Flink似乎是这个用例的方式(和许多其他用户)。如果您知道解决KStreams限制的巧妙方法,请告诉我!

展开
收起
flink小助手 2018-12-13 14:43:17 2155 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    看看Kafka Streams punctuate()的Processor API中的功能,这可能就是您正在寻找的功能。您可以将punctuate()与stream-time(默认值:event-time)以及处理时间(via PunctuationType.WALL_CLOCK_TIME)一起使用。在这里,您将根据您的需要实现一个Processor或一个用于超时功能的。Transformerpunctuate()

    提示:您也可以在Kafka Streams的DSL中使用这样的处理器/变换器。这意味着您可以继续使用更方便的DSL,如果您愿意,只需要在基于DSL的代码中的正确位置插入处理器/变换器。

    2019-07-17 23:20:42
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载