Continued from the previous article: https://developer.aliyun.com/article/1622681?spm=a2c6h.13148508.setting.17.27ab4f0ek8nPMY
Running the test
Result data
Open word-count/word-count-result.csv to see the following content:
```
Stateful 1
any 1
common 1
computations 2
on 1
setup 1
state 1
streams. 1
unbounded 1
& 3
Data 2
DataStream 1
High-availability 1
for 1
perform 1
run 1
to 1
Event-time 1
Flexible 1
Sophisticated 1
framework 1
is 1
scale. 1
Exactly-once 1
ProcessFunction 1
Stream 1
a 1
been 1
handling 1
in 1
late 1
processing 2
Batch 1
DataSet 1
at 2
bounded 1
consistency 1
deployment 1
distributed 1
engine 1
has 1
API 2
Apache 1
Flink 2
SQL 1
Streams 1
all 1
designed 1
over 2
Computations 1
Savepoints 1
and 3
data 2
environments, 1
in-memory 1
speed 1
stateful 1
(Time 1
Correctness 1
State) 1
cluster 1
guarantees 1
```
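Word count (streaming data)
Requirements
Simulate words being sent in real time over a socket, and use Flink to receive them as they arrive. Aggregate the data within a given time window (e.g. 5 seconds), recompute the aggregate every 1 second, and print the result for each window.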
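A 5-second window that slides every second means each element belongs to several overlapping windows at once. The sketch below shows how a timestamp is mapped to those windows. It is plain Java, not Flink code. The arithmetic follows what Flink's sliding window assigners do with a zero offset, as far as I understand them, so treat it as an illustration rather than the library source. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Flink's source): maps one timestamp to every
// sliding window [start, start + size) that contains it, with offset 0.
class SlidingWindowMath {

    /** Start of the latest slide-aligned window containing ts (assumes ts >= 0). */
    static long lastWindowStart(long ts, long slide) {
        return ts - (ts % slide);
    }

    /** Returns {start, end} pairs for all windows of length size that contain ts. */
    static List<long[]> assign(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Walk backwards one slide at a time while the window still covers ts
        for (long start = lastWindowStart(ts, slide); start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 5 s windows sliding every 1 s, timestamps in milliseconds
        for (long[] w : assign(12_300L, 5_000L, 1_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [12000, 17000), [11000, 16000), [10000, 15000), [9000, 14000), [8000, 13000)
    }
}
```

With size / slide = 5, every element lands in 5 windows. This is why the same word can be printed several times in the results.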
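Writing the code
Server side
Write a socket server that supplies a stream of data. It sends 1,000 random integers between 0 and 99, one per line, and pauses 100 to 999 ms between them. Each integer plays the role of a word.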
```java
package icu.wzk;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class WordCountServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        String ip = "localhost";
        int port = 9999;
        Random random = new Random();

        ServerSocket serverSocket = new ServerSocket();
        InetSocketAddress address = new InetSocketAddress(ip, port);
        serverSocket.bind(address);

        // Blocks until a single client (the Flink job) connects
        Socket socket = serverSocket.accept();
        OutputStream outputStream = socket.getOutputStream();
        // autoFlush = true: each println is sent immediately
        PrintWriter writer = new PrintWriter(outputStream, true);

        for (int i = 0; i < 1000; i++) {
            // Send a random "word" (an integer in [0, 100)), one per line
            int number = random.nextInt(100);
            System.out.println(number);
            writer.println(number);
            // Wait 100-999 ms before sending the next word
            Thread.sleep(random.nextInt(900) + 100);
        }

        socket.close();
        serverSocket.close();
    }
}
```
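Before starting the Flink job, you may want to confirm that the server really emits newline-delimited lines, which is the framing that `socketTextStream(ip, port, "\n")` expects. The client below is a hypothetical helper that is not part of the original article. It uses only the JDK, and it assumes the server is listening on localhost:9999.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical debugging client: reads newline-delimited text from a socket.
class LineClient {

    /** Reads up to maxLines lines, or fewer if the server closes the connection. */
    static List<String> readLines(String host, int port, int maxLines) throws IOException {
        List<String> lines = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while (lines.size() < maxLines && (line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Assumes WordCountServer is running on localhost:9999
        for (String line : readLines("localhost", 9999, 10)) {
            System.out.println("received: " + line);
        }
    }
}
```

`WordCountServer` calls `accept()` only once. After checking it with this client, restart the server before launching the Flink job.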
Flink side
The job connects to the server above and counts each word over a 5-second window that slides every second.
```java
package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount2 {

    public static void main(String[] args) throws Exception {
        String ip = "localhost";
        int port = 9999;

        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read newline-delimited text from the socket server
        DataStreamSource<String> textStream = env.socketTextStream(ip, port, "\n");

        // Split each line into words and emit (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = textStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String word : value.split("\\s+")) {
                            if (!word.isEmpty()) {  // skip the empty token produced by leading whitespace
                                out.collect(new Tuple2<>(word, 1));
                            }
                        }
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Integer>> word = wordCount
                // Group by the word itself (field f0)
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                // 5-second windows sliding every 1 second, on processing time.
                // timeWindow(size, slide) is deprecated since Flink 1.12 and follows the
                // job's time characteristic (event time by default from 1.12 on),
                // so the processing-time assigner is named explicitly here.
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
                // Sum the counts (field f1) within each window
                .sum(1);

        // Print the results and run the job
        word.print();
        env.execute("Word Count");
    }
}
```
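The `flatMap` tokenizing step can be checked without a Flink cluster. The plain-Java sketch below splits on runs of whitespace (`\\s+`) and skips empty tokens. `AbstractMap.SimpleEntry` stands in for Flink's `Tuple2`, and the `Tokenizer` class is hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the flatMap step: one (word, 1) pair per token.
class Tokenizer {

    static List<Map.Entry<String, Integer>> tokenize(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            // Leading whitespace yields an empty first token; drop it
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("35"));                    // [35=1]
        System.out.println(tokenize("  Apache Flink  Flink ")); // [Apache=1, Flink=1, Flink=1]
    }
}
```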
Observing the results
Server side
```
35
18
84
72
24
51
15
13
65
98
55
68
22
84
17
```
Flink side
```
3> (35,1)
4> (18,1)
3> (35,1)
5> (84,1)
4> (18,1)
6> (72,1)
3> (35,1)
5> (84,1)
5> (24,1)
3> (35,1)
6> (72,1)
4> (18,1)
7> (51,1)
5> (24,1)
5> (84,1)
4> (15,1)
6> (72,1)
7> (51,1)
3> (35,1)
4> (15,1)
4> (18,1)
```
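Two things stand out in this output. First, the `N>` prefix comes from `print()`: when the sink runs with parallelism greater than 1, each line is tagged with the number of the subtask that printed it. Second, a word such as `35` is reported as `(35,1)` several times, because each of the overlapping 5-second windows that contain it fires once. The simulation below reproduces this effect in plain Java. The arrival times are invented for illustration and are not taken from the run above. Empty windows print nothing, which matches Flink, where a window exists only once an element has been assigned to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative simulation of a sliding-window word count (not Flink code).
// Each window covers [end - size, end); one window closes every `slide` ms.
class SlidingCountSim {

    static final class Event {
        final long timeMs;   // hypothetical arrival time in milliseconds
        final String word;

        Event(long timeMs, String word) {
            this.timeMs = timeMs;
            this.word = word;
        }
    }

    /** Per-word counts for the window [end - size, end), sorted by word. */
    static Map<String, Integer> countWindow(List<Event> events, long end, long size) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            if (e.timeMs >= end - size && e.timeMs < end) {
                counts.merge(e.word, 1, Integer::sum);
            }
        }
        return counts;
    }

    /** Emits "(word,count)" for every non-empty window ending in [firstEnd, lastEnd]. */
    static List<String> fire(List<Event> events, long size, long slide, long firstEnd, long lastEnd) {
        List<String> out = new ArrayList<>();
        for (long end = firstEnd; end <= lastEnd; end += slide) {
            for (Map.Entry<String, Integer> c : countWindow(events, end, size).entrySet()) {
                out.add("(" + c.getKey() + "," + c.getValue() + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(10_500, "35"),
                new Event(11_700, "18"),
                new Event(12_600, "84"));
        // 5 s windows sliding by 1 s: each word appears in 5 consecutive windows
        fire(events, 5_000, 1_000, 11_000, 18_000).forEach(System.out::println);
    }
}
```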
A screenshot of the run is shown below:
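Summary of the process
- Obtain an execution environment
- Load or create the initial data
- Specify the transformation operators to apply to the data
- Specify where the result data goes
- Call execute() to trigger the program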
Note: Flink programs are evaluated lazily. The steps above only build the job; it actually runs when execute() is called at the end.
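As an analogy only (this is not Flink), `java.util.stream` pipelines are lazy in a similar way. Intermediate operations such as `peek` and `map` only describe the work, and nothing runs until a terminal operation like `collect` is invoked. This is much like a Flink job that runs only when `env.execute()` is called. The `LazyDemo` class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy for lazy evaluation: building the pipeline does no work;
// the terminal operation (collect) triggers it.
class LazyDemo {

    static final List<String> log = new ArrayList<>();

    static Stream<String> buildPipeline() {
        return Stream.of("Apache", "Flink", "Flink")
                .peek(w -> log.add("saw " + w))   // side effect records when work happens
                .map(String::toLowerCase);
    }

    public static void main(String[] args) {
        Stream<String> pipeline = buildPipeline();
        System.out.println("after building: " + log.size()); // 0
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println("after collect: " + log.size());  // 3
        System.out.println(result);                           // [apache, flink, flink]
    }
}
```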