为啥我这个窗口函数里面的 log一直没数据呢。上面那个有输出。 logger.info("kafkaStream Window....."); 没有。 看起来是执行了。
/** *todo: 2023/3/20 23:14 九师兄 * * 测试数据 {"id":"11","name":"akka","password":"123","password1":"123","age":1} * 输出结果:1 分钟内收集到 student 的数据条数是:7 **/ public void timeWindowAll() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //2.定义加载或创建数据源(source),监听9000端口的socket消息 DataStream textStream9000 = env.socketTextStream("localhost", 9992, "\n");
SingleOutputStreamOperator<Student> student = textStream9000.map(string -> JSONObject.parseObject(string, Student.class));
student.timeWindowAll(Time.seconds(5)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
ArrayList<Student> students = Lists.newArrayList(values);
if (students.size() > 0) {
System.out.println("1 分钟内收集到 student 的数据条数是:" + students.size());
out.collect(students);
}
}
}).print();
env.execute("flink learning connectors kafka");
}我测试是可以的 你对比一下。此回答整理自钉群“【③群】Apache Flink China社区”
Flink中如何使用TimeWindowAll https://www.yisu.com/zixun/525527.html
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。