Flink计算每10秒的时间点某个状态的数据有多少咋弄啊?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
博主可以参考下这个代码
public class StreamWordCount {
public static void main(String[] args) throws Exception {
//创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//DataSource操作
DataStreamSource<String> sourceStream = env.socketTextStream("192.168.153.10", 6666);
//通过匿名内部类的方式实现flatMap算子
final SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapStream = sourceStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
final String[] words = line.split(" ");
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}
});
//keyBy分组操作
final KeyedStream<Tuple2<String, Integer>, String> keyedStream = flatMapStream.keyBy(value -> value.f0);
//每隔5秒,统计最近10秒的窗口数据
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
//sum求和操作
final SingleOutputStreamOperator<Tuple2<String, Integer>> result = window.sum(1);
//输出结果
result.print();
//执行程序
env.execute("StreamWordCount");
}
}
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。