Flink的快速应用
通过一个单词统计的案例,快速上手应用Flink,进行流处理(Streaming)和批处理(Batch)
实操1:单词统计案例(批数据)
1.1:需求:统计一个文件中各个单词出现的次数,把统计结果输出到文件
步骤
1:读取数据源
// 输入路径和输出路径通过程序的参数传入 // 第一个参数是输入路径(文本文件的路径),第二个参数是输出路径(结果保存的路径) String inPath = args[0]; String outPath = args[1]; // 获取 Flink 的批处理执行环境 // 批处理执行环境用于管理 Flink 的数据流和算子 ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); // 从指定的输入路径读取文本文件中的数据 // readTextFile 方法返回一个 DataSet,其中每一条记录是文本文件中的一行 DataSet<String> text = executionEnvironment.readTextFile(inPath);
2:处理数据源
a:将读到的数据源文件中的每一行根据空格切分
b:将切分好的每个单词拼接1
c:根据单词聚合(将相同的单词放在一起)
d:累加相同的单词(单词后面的1进行累加)
// 对读取的数据进行处理// 1. 使用 FlatMapFunction 对每一行进行处理,将每行拆分成单词,并标记每个单词出现的次数(1)// 2. 使用 groupBy 按照单词进行分组// 3. 使用 sum 聚合每个单词的出现次数// 结果是一个 DataSet,其中包含每个单词及其出现的总次数DataSet<Tuple2<String, Integer>> dataSet = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
// 自定义 FlatMapFunction 实现类,用于将每行文本拆分为单词,并创建 (单词, 1) 的 Tuple2 对象static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception { // 按空格拆分每行文本,得到每个单词 for (String word : line.split(" ")) { // 将每个单词与计数 1 组成一个 Tuple2 对象,并收集 collector.collect(new Tuple2<String, Integer>(word, 1)); } } }
3:保存结果
// 将处理结果写入到指定的输出路径// writeAsCsv 方法将结果以 CSV 格式写入到文件中// \n 表示行分隔符,"" 表示字段分隔符// setParallelism(1) 设置并行度为 1,确保所有结果写入到一个文件中 dataSet.writeAsCsv(outPath, "\n", "").setParallelism(1);// 触发作业的执行// execute 方法启动 Flink 作业,并使用指定的名称executionEnvironment.execute("wordcount batch process");
实操2:单词统计案例(流数据)
nc:netcat的缩写,有着网络界的瑞士军刀美誉,因为它短小精悍,功能实用,被设计为一个简单,可靠的网络工具。flink开发的时候,经常使用Socket作为source,使用linux/mac环境开发,可以在终端中开启nc -l 9000开启netcat程序,作为服务端,发送数据;nc的作用:数据传输;文件传输;机器之间网络测速。
2.1 需求
Socket模拟实时发送单词,使用Flink实时接收数据,对指定时间窗口内(如5s)的数据进行聚合统计,每隔1s汇总计算一次,并且把时间窗口内计算结果打印出来。
2.2:Flink程序开发的流程
①:获得一个执行环境
②:加载/创建初始化数据
③:指定数据操作的算子
④:指定结果数据存放位置
⑤:调用execute()触发执行程序
Flink程序是延迟计算的,只有最后调用execute()方法时才会真正触发执行程序。
2.3:代码实现
import java.io.IOException;import java.io.OutputStream;import java.net.ServerSocket;import java.net.Socket;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Scanner;import java.util.concurrent.TimeUnit; public class NcServer { // 定义端口号 private static final int PORT = 9000; public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(PORT); Socket socket = server.accept(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("[" + simpleDateFormat.format(new Date()) + "]" + socket.getInetAddress() + "已建立连接!"); //监控连接是否断开线程 new Thread(new CheckClientThread(socket)).start(); //输出流 OutputStream outputStream = socket.getOutputStream(); //控制台输入 Scanner in = new Scanner(System.in); while (true) { String str = in.nextLine() + "\n"; outputStream.write(str.getBytes()); outputStream.flush(); } } // 监控连接程序是否断开 线程类 static class CheckClientThread implements Runnable { private Socket socketClient; public CheckClientThread(Socket socketClient) { this.socketClient = socketClient; } public void run() { while (true) { try { TimeUnit.SECONDS.sleep(1);// socketClient.sendUrgentData(0xFF); } catch (Exception e) { e.printStackTrace(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("[" + simpleDateFormat.format(new Date()) + "]" + socketClient.getInetAddress() + "连接已关闭!"); // 断开后退出程序 System.exit(0); } } } }}
import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector; /** * Socket模拟实时发送单词,使用Flink实时接收数据, * 对指定时间窗口内(如5s)的数据进行聚合统计,每隔1s汇总计算一次,并且把时间窗口内计算结果打印出来。 */public class WordCountStreamWindow { public static void main(String[] args) throws Exception { // 监听的 IP 和端口号,默认值为 localhost 和 9000 // 可以通过命令行参数传入 IP 和端口号 String ip = "127.0.0.1"; int port = 9000; // 获取 Flink 的流执行环境 // StreamExecutionEnvironment 是 Flink 流处理作业的核心环境 StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); // 从指定的 IP 和端口创建一个数据流源 // socketTextStream 方法返回一个 DataStream,其中每条记录是通过 socket 接收到的一行文本 DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n"); // 使用 FlatMapFunction 对每行文本进行处理 // 将每行文本拆分成单词,并为每个单词生成一个 (单词, 1) 的 Tuple2 对象 SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception { // 按空格拆分每行文本,得到每个单词 String[] splits = s.split("\\s"); // 遍历每个单词,将每个单词与计数 1 组成一个 Tuple2 对象,并收集 for (String word : splits) { collector.collect(Tuple2.of(word, 1L)); } } }); // 按单词进行分组,并对每个单词的计数进行聚合 // keyBy(0) 按照 Tuple2 的第 0 个元素(即单词)进行分组 // timeWindow(Time.seconds(2), Time.seconds(1)) 使用 2 秒的时间窗口,每秒钟滑动一次窗口 // sum(1) 对每个分组中的第 1 个元素(即计数)进行求和 SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(0) .timeWindow(Time.seconds(2), Time.seconds(1)) // 使用 2 秒的时间窗口,每秒钟滑动一次窗口 .sum(1); // 打印计算结果到标准输出 // 输出每个时间窗口的单词计数结果 word.print(); // 启动 Flink 作业,提交到 Flink 执行环境进行处理 streamExecutionEnvironment.execute("wordcount stream process"); }}