开发者社区> 问答> 正文

适用于所有输入

我FlinkKafkaConsumer011订阅了一个主题。我希望apply在每个kafka consumer消息上进行process(),因此custom FooTrigger会返回TriggerResult.FIRE每个元素。

下面的代码工作,我只是感到困惑timeWindowAll(Time.minutes(1))。看起来我做错了什么。

// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

// create a Kafka consumer
FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011<>(

"topic",
new Foo.FooSchema(),
props);   // Properties object

// create Kafka consumer data source
DataStream trades = env.addSource(consumer)

.timeWindowAll(Time.minutes(1))
.trigger(new FooTrigger())
.evictor(new FooEvictor())
.apply(new CreateFoos());

展开
收起
flink小助手 2018-12-10 10:38:08 1822 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    如果您的目标是将一个函数应用于流中的每个事件,ProcessFunction那么在Flink中执行此操作将更简单。或者您可以使用Map或FlatMapFunction或其丰富的变体,即RichMapFunction或RichFlatMapFunction - 这一切都取决于您尝试做什么。

    使用map或flatmap,您可以执行无状态一对一或一对多转换,其丰富的变体可以使用键控状态,ProcessFunction可以使用状态和计时器(前提是已对键进行了键控)。

    timeWindowAll用于流未按键分区的情况,并且您希望按持续时间定义的批量进行非并行处理(对于键控,并行窗口,请改为使用timeWindow)。如果您只想在数据到达时处理数据,那么窗口会增加不必要的复杂性。

    2019-07-17 23:19:04
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载