我试图使用KSQL在一个时间限制内做任何处理,并在该时间限制内获得结果。有关使用Apache Beam说明的相同想法,请参阅“处理时间计时器”下的使用Apache Beam的及时(和有状态)处理。
鉴于:
具有唯一键的事务流;
在同一个流中更新这些事务; 和
在第一个流中出现事务后,希望在特定超时(例如20秒)接收更新事务的下游处理器。
从概念上讲,我正在考虑创建第一个流的KTable来保存最新的事务状态,并使用KSQL通过查询KTable来创建输出流(create_time + timeout)
我还没有找到在KSQL文档中执行此操作的方法,即使有内置的current_time,我也不确定它是否会被评估,直到另一条记录出现在流中。
我怎么能在KSQL中做到这一点?我需要自定义UDF吗?如果在KSQL中无法完成,我可以在KStreams中完成吗?
=====
更新:看起来KStreams今天不支持这一点 - Apache Flink似乎是这个用例的方式(和许多其他用户)。如果您知道解决KStreams限制的巧妙方法,请告诉我!
看看Kafka Streams punctuate()的Processor API中的功能,这可能就是您正在寻找的功能。您可以将punctuate()与stream-time(默认值:event-time)以及处理时间(via PunctuationType.WALL_CLOCK_TIME)一起使用。在这里,您将根据您的需要实现一个Processor或一个用于超时功能的。Transformerpunctuate()
提示:您也可以在Kafka Streams的DSL中使用这样的处理器/变换器。这意味着您可以继续使用更方便的DSL,如果您愿意,只需要在基于DSL的代码中的正确位置插入处理器/变换器。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。