我FlinkKafkaConsumer011订阅了一个主题。我希望apply在每个kafka consumer消息上进行process(),因此custom FooTrigger会返回TriggerResult.FIRE每个元素。
下面的代码工作,我只是感到困惑timeWindowAll(Time.minutes(1))。看起来我做错了什么。
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// create a Kafka consumer
FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011<>(
"topic",
new Foo.FooSchema(),
props); // Properties object
// create Kafka consumer data source
DataStream trades = env.addSource(consumer)
.timeWindowAll(Time.minutes(1))
.trigger(new FooTrigger())
.evictor(new FooEvictor())
.apply(new CreateFoos());
如果您的目标是将一个函数应用于流中的每个事件,ProcessFunction那么在Flink中执行此操作将更简单。或者您可以使用Map或FlatMapFunction或其丰富的变体,即RichMapFunction或RichFlatMapFunction - 这一切都取决于您尝试做什么。
使用map或flatmap,您可以执行无状态一对一或一对多转换,其丰富的变体可以使用键控状态,ProcessFunction可以使用状态和计时器(前提是已对键进行了键控)。
timeWindowAll用于流未按键分区的情况,并且您希望按持续时间定义的批量进行非并行处理(对于键控,并行窗口,请改为使用timeWindow)。如果您只想在数据到达时处理数据,那么窗口会增加不必要的复杂性。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。