代码仓库
会同步代码到 GitHub
https://github.com/turbo-duck/flink-demo
当前章节
继续上一节的内容:https://blog.csdn.net/w776341482/article/details/139875037
上一节中,我们需要使用 nc 或者 telnet 等工具来手动模拟 Socket 流。这一节我们编写一个 ServerSocket 来代替这些手动操作,让数据自动地持续写入流中。
POM.xml
与上一节一致,不需要修改
编写代码
还是和上一节一样的 Socket 流,这里略去其他的代码
// Open a text stream from the socket at ip:port, using "\n" as the delimiter between records.
DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n");
FlinkServer
继承 Thread
启动一个线程来运行 Flink 作业(从 Socket 读取数据并进行窗口统计)
package icu.wzk.demo03; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class FlinkServer extends Thread { @Override public void run() { String ip = "0.0.0.0"; int port = 9999; StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n"); SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception { String[] splits = s.split("\\s"); for (String word : splits) { collector.collect(Tuple2.of(word, 1L)); } } }); SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator .keyBy(new KeySelector<Tuple2<String, Long>, Object>() { @Override public Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception { return stringLongTuple2.f0; } }) .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))) .sum(1); System.out.println("wait word print()"); word.print(); try { streamExecutionEnvironment.execute("stream!"); } catch (Exception e) { throw new RuntimeException(e); } } }
RandomNumClient
使用 ServerSocket
实现一个持续的流输出
package icu.wzk.demo03;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

/**
 * Thread that feeds random numbers to a single socket peer.
 *
 * <p>Despite the name, this class is the <em>listening</em> side: it binds a
 * {@link ServerSocket} on port 9999, accepts exactly one connection and writes up to 500
 * random integers in the range 1..10, one per line, pausing 200 ms between lines.
 */
public class RandomNumClient extends Thread {

    /** Wildcard address: listen on all local interfaces. */
    private static final String BIND_ADDRESS = "0.0.0.0";

    private static final int PORT = 9999;

    /** Number of values sent before the connection is closed. */
    private static final int MESSAGE_COUNT = 500;

    /** Pause between two values, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 200L;

    /**
     * Accepts one connection and streams random numbers to it.
     *
     * <p>All sockets and the writer are closed when the method exits, so the peer observes
     * end-of-stream instead of a connection that stays open forever.
     *
     * @throws UncheckedIOException if binding, accepting or opening the output stream fails
     */
    @Override
    public void run() {
        try (ServerSocket serverSocket = new ServerSocket()) {
            serverSocket.bind(new InetSocketAddress(BIND_ADDRESS, PORT));

            try (Socket socket = serverSocket.accept();
                 OutputStream output = socket.getOutputStream();
                 // autoFlush = true: every println is pushed to the peer immediately.
                 PrintWriter writer = new PrintWriter(output, true)) {

                Random random = new Random();
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    int randomNumber = random.nextInt(10) + 1;
                    writer.println(randomNumber);

                    // PrintWriter never throws on write failure; checkError() is the only
                    // way to notice that the peer has gone away.
                    if (writer.checkError()) {
                        System.out.println("ServerSocket peer disconnected, stop sending");
                        break;
                    }

                    System.out.println("ServerSocket Send To Flink: " + randomNumber);
                    Thread.sleep(SEND_INTERVAL_MILLIS);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and end the thread quietly.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            throw new UncheckedIOException("Socket feeder on port " + PORT + " failed", e);
        }
    }
}
StartApp
将上述的两个类组装起来
package icu.wzk.demo03;

/**
 * Entry point that wires the demo together: one thread serves random numbers over a socket
 * and another thread runs the Flink job that consumes them.
 */
public class StartApp {

    /**
     * Starts the socket feeder and then the Flink job, each on its own thread.
     *
     * @param args unused
     * @throws Exception declared for compatibility; nothing in this method throws a checked exception
     */
    public static void main(String[] args) throws Exception {
        RandomNumClient randomNumClient = new RandomNumClient();
        FlinkServer flinkServer = new FlinkServer();

        // Start the listening side first so the port is being bound before the Flink source
        // tries to connect to it. NOTE(review): this narrows but does not strictly eliminate
        // the startup race, since start() returns before the socket is actually bound.
        randomNumClient.start();
        flinkServer.start();
    }
}