死磕flink(四)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 死磕flink(四)

Flink的快速应用

通过一个单词统计的案例,快速上手应用Flink,进行流处理(Streaming)和批处理(Batch)

实操1:单词统计案例(批数据)

1.1:需求:统计一个文件中各个单词出现的次数,把统计结果输出到文件

步骤

1:读取数据源  


// 输入路径和输出路径通过程序的参数传入  // 第一个参数是输入路径(文本文件的路径),第二个参数是输出路径(结果保存的路径)  String inPath = args[0];  String outPath = args[1];  // 获取 Flink 的批处理执行环境  // 批处理执行环境用于管理 Flink 的数据流和算子  ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();  // 从指定的输入路径读取文本文件中的数据  // readTextFile 方法返回一个 DataSet,其中每一条记录是文本文件中的一行 DataSet<String> text = executionEnvironment.readTextFile(inPath);

2:处理数据源

  a:将读到的数据源文件中的每一行根据空格切分

  b:将切分好的每个单词拼接1

  c:根据单词聚合(将相同的单词放在一起)

  d:累加相同的单词(单词后面的1进行累加)


// 对读取的数据进行处理// 1. 使用 FlatMapFunction 对每一行进行处理,将每行拆分成单词,并标记每个单词出现的次数(1)// 2. 使用 groupBy 按照单词进行分组// 3. 使用 sum 聚合每个单词的出现次数// 结果是一个 DataSet,其中包含每个单词及其出现的总次数DataSet<Tuple2<String, Integer>> dataSet = text.flatMap(new LineSplitter()).groupBy(0).sum(1);


// 自定义 FlatMapFunction 实现类,用于将每行文本拆分为单词,并创建 (单词, 1) 的 Tuple2 对象static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {       @Override   public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {      // 按空格拆分每行文本,得到每个单词      for (String word : line.split(" ")) {          // 将每个单词与计数 1 组成一个 Tuple2 对象,并收集        collector.collect(new Tuple2<String, Integer>(word, 1));       }    }  }

3:保存结果


// 将处理结果写入到指定的输出路径// writeAsCsv 方法将结果以 CSV 格式写入到文件中// \n 表示行分隔符,"" 表示字段分隔符// setParallelism(1) 设置并行度为 1,确保所有结果写入到一个文件中 dataSet.writeAsCsv(outPath, "\n", "").setParallelism(1);// 触发作业的执行// execute 方法启动 Flink 作业,并使用指定的名称executionEnvironment.execute("wordcount batch process");

实操2:单词统计案例(流数据)

nc:netcat的缩写,有着网络界的瑞士军刀美誉,因为它短小精悍,功能实用,被设计为一个简单,可靠的网络工具。flink开发的时候,经常使用Socket作为source,使用linux/mac环境开发,可以在终端中开启nc -l 9000开启netcat程序,作为服务端,发送数据;nc的作用:数据传输;文件传输;机器之间网络测速。

2.1 需求

Socket模拟实时发送单词,使用Flink实时接收数据,对指定时间窗口内(如5s)的数据进行聚合统计,每隔1s汇总计算一次,并且把时间窗口内计算结果打印出来。

2.2:Flink程序开发的流程

①:获得一个执行环境

②:加载/创建初始化数据

③:指定数据操作的算子

④:指定结果数据存放位置

⑤:调用execute()触发执行程序

Flink程序是延迟计算的,只有最后调用execute()方法时才会真正触发执行程序。

2.3:代码实现


import java.io.IOException;import java.io.OutputStream;import java.net.ServerSocket;import java.net.Socket;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Scanner;import java.util.concurrent.TimeUnit;
public class NcServer {
    // 定义端口号    private static final int PORT = 9000;
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(PORT);        Socket socket = server.accept();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        System.out.println("[" + simpleDateFormat.format(new Date()) + "]" + socket.getInetAddress() + "已建立连接!");
        //监控连接是否断开线程        new Thread(new CheckClientThread(socket)).start();
        //输出流        OutputStream outputStream = socket.getOutputStream();
        //控制台输入        Scanner in = new Scanner(System.in);
        while (true) {            String str = in.nextLine() + "\n";
            outputStream.write(str.getBytes());            outputStream.flush();        }
    }
    // 监控连接程序是否断开 线程类    static class CheckClientThread implements Runnable {
        private Socket socketClient;
        public CheckClientThread(Socket socketClient) {            this.socketClient = socketClient;        }
        public void run() {            while (true) {                try {                    TimeUnit.SECONDS.sleep(1);//                    socketClient.sendUrgentData(0xFF);                } catch (Exception e) {                    e.printStackTrace();                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");                    System.out.println("[" + simpleDateFormat.format(new Date()) + "]" + socketClient.getInetAddress() + "连接已关闭!");                    // 断开后退出程序                    System.exit(0);                }            }        }    }}


import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;
/** *   Socket模拟实时发送单词,使用Flink实时接收数据, *   对指定时间窗口内(如5s)的数据进行聚合统计,每隔1s汇总计算一次,并且把时间窗口内计算结果打印出来。 */public class WordCountStreamWindow {
    public static void main(String[] args) throws Exception {        // 监听的 IP 和端口号,默认值为 localhost 和 9000        // 可以通过命令行参数传入 IP 和端口号        String ip = "127.0.0.1";        int port = 9000;
        // 获取 Flink 的流执行环境        // StreamExecutionEnvironment 是 Flink 流处理作业的核心环境        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从指定的 IP 和端口创建一个数据流源        // socketTextStream 方法返回一个 DataStream,其中每条记录是通过 socket 接收到的一行文本        DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n");
        // 使用 FlatMapFunction 对每行文本进行处理        // 将每行文本拆分成单词,并为每个单词生成一个 (单词, 1) 的 Tuple2 对象        SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {            @Override            public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {                // 按空格拆分每行文本,得到每个单词                String[] splits = s.split("\\s");                // 遍历每个单词,将每个单词与计数 1 组成一个 Tuple2 对象,并收集                for (String word : splits) {                    collector.collect(Tuple2.of(word, 1L));                }            }        });
        // 按单词进行分组,并对每个单词的计数进行聚合        // keyBy(0) 按照 Tuple2 的第 0 个元素(即单词)进行分组        // timeWindow(Time.seconds(2), Time.seconds(1)) 使用 2 秒的时间窗口,每秒钟滑动一次窗口        // sum(1) 对每个分组中的第 1 个元素(即计数)进行求和        SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(0)                .timeWindow(Time.seconds(2), Time.seconds(1))  // 使用 2 秒的时间窗口,每秒钟滑动一次窗口                .sum(1);
        // 打印计算结果到标准输出        // 输出每个时间窗口的单词计数结果        word.print();
        // 启动 Flink 作业,提交到 Flink 执行环境进行处理        streamExecutionEnvironment.execute("wordcount stream process");    }}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
消息中间件 存储 分布式计算
死磕-kafka(三)
死磕-kafka(三)
|
7月前
|
消息中间件 API 数据处理
Flink常见面试问题(附答案)
Apache Flink是开源的流批处理框架,提供低延迟、高吞吐的数据处理。与Hadoop不同,Flink专注于实时数据流。其核心特性包括事件时间和处理时间的概念,事件时间通过水印处理乱序事件。Flink通过检查点实现容错,支持滚动、滑动和会话窗口进行流数据处理。状态后端用于管理应用程序状态,水印用于处理延迟数据。Flink与Kafka集成能保证事件顺序,支持多种连接器如Kafka、JDBC等。其处理延迟数据、乱序事件的能力,以及Exactly-Once语义,使其在大规模数据处理中具有优势。Flink还支持表格API和DataStream API,以及多种容错和性能优化策略。
305 2
Flink常见面试问题(附答案)
|
3月前
|
资源调度 流计算 Docker
死磕flink(七)
死磕flink(七)
|
3月前
|
消息中间件 存储 算法
死磕-kafka(二)
死磕-kafka(二)
|
3月前
|
消息中间件 Kafka 调度
死磕-kafka(一)
死磕-kafka(一)
|
3月前
|
SQL 算法 API
死磕flink(三)
死磕flink(三)
|
3月前
|
存储 分布式计算 大数据
死磕Flink(二)
死磕Flink(二)
|
3月前
|
SQL 资源调度 Kubernetes
死磕flink(五)
死磕flink(五)
|
3月前
|
流计算 Docker 容器
死磕flink(八)
死磕flink(八)
|
3月前
|
消息中间件 存储 API
死磕flink(六)
死磕flink(六)