死磕flink(六)

简介: 死磕flink(六)

Flink开发

Flink应用程序结构主要包含三部分,Source/Transformation/Sink,如下图所示 :

Source:数据源,Flink在流处理和批处理上的source大概有4类:

①、基于本地集合的Source

②、基于文件的Source

③、基于网络套接字的Source

④、自定义的Source,自定义Source常见的有Apache Kafka,Amazon Kinbesis Streams,RabbitMQ,Twitter Streaming API,Apache NIFI等。当然也可以自定义自己的Source。

Transformation:数据转换的各种操作:有Map/flatMap/filter/KeyBy/Reduce/Fold/Aggregations/Window/WindowAll/Union/Window Join/Split/Select/Project等,操作很多,可以将数据转换计算成你想要的数据。

Sink:接收器,Flink将转换计算后的数据发送的地点,可能需要存储下来,Flink常见的Sink大概有如下几类:

①、写入文件  ②、打印输出  ③、写入Socket   ④、自定义的sink,自定义的sink常见的有Apache Kafka,RabbitMQ,MySQL,ElasticSearch,Apache Cassandra,Hadoop FilSystem 等,也可以自定义自己的Sink.

Flink连接器

在实际生产环境中,数据通常分布在各种不同的系统中,包括文件系统,数据库,消息队列等。Flink作为一个大数据处理框架,需要与这些外部系统进行数据交互,以实现数据的输入,处理和输出。在Flink中,Source和Sink是两个关键模块,它们扮演着与外部系统进行数据连接和交互的重要角色,被称为外部连接器(Connector).

①、Source数据源:Source是Flink作业的输入模块,用于从外部系统中读取数据并将其转换为Flink的数据流,Source负责实现与不同数据源的交互逻辑,将外部数据源的数据逐条或批量读取到Flink的数据流中,以便后续的数据处理。常见的Source包括从文件中读取数据,从消息队列如Kafka,RabbitMQ中消费数据,从数据库中读取数据等。

②、Sink数据接收器:Sink是Flink作业的输出模块,用于将Flink计算的结果输出到外部系统中,Sink负责实现将Flink数据流中的数据写入到外部数据源,以便后续的持久化存储,展示或者其他处理。Sink的实现需要考虑数据的可靠性,一致性以及可能的事务性要求。常见的Sink包括数据写入文件,将数据写入数据,将数据写入到消息队列等。

外部链接器在Flink中的作用非常关键,它们使得Flink作业可以与各种不同类型的数据源和数据目的地的进行交换,实现了数据的流入和流出。这种灵活的连接机制使得Flink在处理大数据时能够更好地集成已有的系统和数据,实现复杂的数据流处理和分析任务。

Source

Flink在批处理中常见的Source主要有两大类:

①、基于本地集合的source  ②、基于文件的source

基于本地集合的source

在flink中最常见的创建本地集合的DataSet方式有三种:

A:使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。

B:使用env.fromCollection(),这种方式支持多种Collection的具体类型。


public class BatchFromCollection {    public static void main(String[] args) throws Exception {        // 获取flink执行环境        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        // 0.用element创建DataSet(fromElements)        DataSet<String> ds0 = env.fromElements("spark", "flink");        ds0.print();        // 1.用Tuple创建DataSet(fromElements)        DataSet<Tuple2<Integer, String>> ds1 = env.fromElements(            new Tuple2<>(1, "spark"),            new Tuple2<>(2, "flink")        );        ds1.print();        // 2.用Array创建DataSet        DataSet<String> ds2 = env.fromCollection(new ArrayList<String>() {{            add("spark");            add("flink");        }});        ds2.print();        // 3.用ArrayDeque创建DataSet        DataSet<String> ds3 = env.fromCollection(new ArrayDeque<String>() {{            add("spark");            add("flink");        }});        ds3.print();        // 8.用Stack创建DataSet        DataSet<String> ds8 = env.fromCollection(new Stack<String>() {{            add("spark");            add("flink");        }});        ds8.print();       // 9.用Stream创建DataSet(Stream相当于lazy List,避免在中间过程中生成不必要的集合)        DataSet<String> ds9 = env.fromCollection(Stream.of("spark", "flink"));        ds9.print();        // 15.用HashMap创建DataSet        DataSet<Tuple2<Integer, String>> ds15 = env.fromCollection(new HashMap<Integer, String>() {{            put(1, "spark");            put(2, "flink");        }}.entrySet());        ds15.print();        // 16.用Range创建DataSet        DataSet<Integer> ds16 = env.fromCollection(IntStream.rangeClosed(1, 8).boxed().collect(Collectors.toList()));        ds16.print();        // 17.用generateSequence创建DataSet        DataSet<Long> ds17 = env.generateSequence(1, 9);        ds17.print();    } }

基于文件的Source

Flink支持直接从外部文件存储系统中读取文件的方式来创建Source数据源,Flink支持的方式有以下几种:

①、读取本地文件数据;②、读取HDFS文件数据;③、读取CSV文件数据;④、读取压缩文件;⑤、遍历目录

下面分别介绍每个数据源的加载方式

读取本地文件


package com.demo.batch; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; public class BatchFromFile {    public static void main(String[] args) throws Exception {        // 使用readTextFile读取本地文件        // 初始化环境        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();        // 加载数据        DataSet<String> datas = environment.readTextFile("data.txt");        // 触发程序执行        datas.print();    } }

读取HDFS文件数据


package com.demo.batch; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; public class BatchFromFile {    public static void main(String[] args) throws Exception {        // 使用readTextFile读取本地文件        // 初始化环境        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();        // 加载数据        DataSet<String> datas = environment.readTextFile("hdfs://node01:8020/README.txt");        // 触发程序执行        datas.print();    } }

读取CSV文件数据


package com.demo.batch; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.common.functions.MapFunction; public class BatchFromCsvFile {    public static void main(String[] args) throws Exception {        // 初始化环境        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        // 用于映射CSV文件的POJO class        public static class Student {            public int id;            public String name;            public Student() {}            public Student(int id, String name) {                this.id = id;                this.name = name;            }            @Override        }      } } public String toString() {     return "Student(" + id + ", " + name + ")"; } // 读取CSV文件DataSet<Student> csvDataSet = env.readCsvFile("./data/input/student.csv") .ignoreFirstLine() .pojoType(Student.class, "id", "name");    csvDataSet.print();    } }

读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputFormat方法,flink可以自动识别并且解压缩,但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

压缩格式
扩展名
并行化
DEFALTE .defkate
no
GZIP
.gz;.gzip
no
Bzip2
.bz2
no
XZ
.xz
no


public class BatchFromCompressFile { public static void main(String[] args) throws Exception { // 初始化环境     ExecutionEnvironment env =      ExecutionEnvironment.getExecutionEnvironment(); // 触发程序执行     result.print();    } }

遍历目录:

flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。对于从文件中读取数据,当读取到数个文件夹的时候,嵌套的文件默认是不会读取的,只会读取第一个文件,其他的都会被忽略,所以我们需要使用recursive.file.enumeration进行递归读取。


public class CustomFileInputFormatExample {    public static void main(String[] args) throws Exception {        // 初始化环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 设置目录路径        String directoryPath = "D:\\BaiduNetdiskDownload\\data";        // 自定义输入格式        TextInputFormat format = new TextInputFormat(new Path(directoryPath));        format.setFilePath(directoryPath);                // 使用自定义输入格式读取数据        DataStream<String> text = env.createInput(format);        // 打印结果        text.print();        // 执行作业        env.execute("Custom File Input Format Example");    } }


public class StreamFromKafka {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        Properties properties = new Properties();        properties.setProperty("bootstrap.servers","teacher2:9092");        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String> ("mytopic2", new SimpleStringSchema(), properties);        DataStreamSource<String> data = env.addSource(consumer);        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {                for (String word : s.split(" ")) {                    collector.collect(Tuple2.of(word, 1));                }            }        });SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);   result.print();    env.execute();  } }

自定义source


private static class SimpleSource  implements SourceFunction<Tuple2<String, Integer>> {    private int offset = 0;    private boolean isRunning = true;    @Override    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {    while (isRunning) {      Thread.sleep(500);      ctx.collect(new Tuple2<>("" + offset, offset));      offset++;      if (offset == 1000) {       isRunning = false;     }    } }     @Override     public void cancel() {     isRunning = false;    }}

自定义source,从0开始计数,将数据发送到下游在主逻辑中调用这个source


DataStream<Tuple2<String, Integer>> countStream = env.addSource(new SimpleSource());

sink

flink在批处理中常见的sink

①、基于本地集合的sink;②、基于文件的sink

基于本地集合的sink

目标:基于下列数据,分别进行打印输出,error输出,collect()


(19, "zhangsan", 178.8), (17, "lisi", 168.8), (18, "wangwu", 184.8), (21, "zhaoliu", 164.8)

代码:


public class BatchSinkCollection { public static void main(String[] args) throws Exception { ExecutionEnvironment env =     ExecutionEnvironment.getExecutionEnvironment();     List<Tuple3<Integer, String, Double>> stuData = new ArrayList<>(); stuData.add(new Tuple3<>(19, "zhangsan", 178.8)); stuData.add(new Tuple3<>(17, "lisi", 168.8)); stuData.add(new Tuple3<>(18, "wangwu", 184.8)); stuData.add(new Tuple3<>(21, "zhaoliu", 164.8)); DataSet<Tuple3<Integer, String, Double>> stu =  env.fromCollection(stuData); stu.print(); stu.printToErr(); stu.collect().forEach(System.out::println); env.execute()    } }

基于文件的sink

①、flink支持多种存储设备上的文件,包括本地文件,hdfs文件等。

②、flink支持多种文件的存储格式,包括text文件,csv文件等。

③、writeAsText():TextOutPutFormat:将元素作为字符串写入行,字符串是通过调用每个元素的toString()方法获得。

将数据写入本地文件

目标:基于下列数据,写入到文件中


Map(1 -> "spark", 2 -> "flink")

代码:


public class BatchSinkFile {   public static void main(String[] args) throws Exception {     ExecutionEnvironment env =      ExecutionEnvironment.getExecutionEnvironment();     Map<Integer, String> data1 = new HashMap<>();     data1.put(1, "spark");     data1.put(2, "flink");     DataSet<Map<Integer, String>> ds1 = env.fromElements(data1);     ds1.setParallelism(1)     .writeAsText("test/data1/aa", FileSystem.WriteMode.OVERWRITE)     .setParallelism(1);     env.execute();    } }

将数据写入到HDFS


public class BatchSinkFile {   public static void main(String[] args) throws Exception {   ExecutionEnvironment env =    ExecutionEnvironment.getExecutionEnvironment();   Map<Integer, String> data1 = new HashMap<>();   data1.put(1, "spark");   data1.put(2, "flink");   DataSet<Map<Integer, String>> ds1 = env.fromElements(data1);   ds1.setParallelism(1)   .writeAsText("hdfs://bigdata1:9000/a",    FileSystem.WriteMode.OVERWRITE)   .setParallelism(1);   env.execute();   } }

Flink API

Flink的API层提供了DataStream API和DataSet API,分别用于流式处理和批处理,这两个API允许开发者使用各种操作符合转换来处理数据,包括转换,连接,聚合,窗口等计算任务

在Flink中,根据不同的场景(流处理和批处理),需要设置不同的执行环境,在批处理场景下,需要使用DataSet API,并设置批处理执行环境,在流处理场景下,需要使用DataStream API,并设置流处理执行环境。

以下是在不同场景下设置执行环境的示例代码,分别展示了批处理和流处理的情况,包括scala和java语言

3b4f721c87991a4c4f90cd7c78868684.png

批处理场景-设置DataSet API的批处理执行环境(Java)


import org.apache.flink.api.java.ExecutionEnvironment; public class BatchJobExample {     public static void main(String[] args) throws Exception {      // 创建批处理执行环境    ExecutionEnvironment env =        ExecutionEnvironment.getExecutionEnvironment();      // 在这里添加批处理作业的代码逻辑      // ...   } }

流处理场景-设置DataStream API的流处理执行环境(Java)


public class StreamJobExample {    public static void main(String[] args) throws Exception {       // 创建流处理执行环境       StreamExecutionEnvironment env =          StreamExecutionEnvironment.getExecutionEnvironment();      // 在这里添加流处理作业的代码逻辑      // ...      // 执行作业      env.execute("Stream Job Example");   } }

批处理场景-设置DataSet API的批处理执行环境(Scala)


import org.apache.flink.api.scala._ object BatchJobExample {   def main(args: Array[String]): Unit = {    // 创建批处理执行环境   val env = ExecutionEnvironment.getExecutionEnvironment   // 在这里添加批处理作业的代码逻辑   // ...   // 执行作业   env.execute("Batch Job Example")   } }

流处理场景-设置DataStream API的批处理执行环境(Scala)


import org.apache.flink.streaming.api.scala._   object StreamJobExample {   def main(args: Array[String]): Unit = {   // 创建流处理执行环境   val env = StreamExecutionEnvironment.getExecutionEnvironment   // 在这里添加流处理作业的代码逻辑   // ...    // 执行作业    env.execute("Stream Job Example")    } }

以下是一些常用的API函数和操作,以表格形式提供:

API类型
常用函数和操作
描述
DataStream API
map,flatM 对数据流中的每个元素进行映射和扁平化操作。

filter
过滤出满足条件的元素

keyBy
按指定的字段或键对数据流进行分区

window
将数据流按照时间窗口或计数窗口划分

reduce,fold
在窗口内对元素进行聚合操作

union
合并多个数据流

connect,coMap,coFlatMap
连接两个不同类型的数据流并应用相应的函数

timeWindow,countWindow
定义时间窗口或计数窗口

process
自定义处理函数,实现更复杂的流处理逻辑。
DataSet API
map,flatMap
对数据流中的每个元素进行映射和扁平化操作

filter
过滤出满足条件的元素

group by
按指定的字段或键对数据集进行分组

reduce,fold 对分组后的数据集进行聚合操作

join,coGroup
对两个数据集进行内连接和外连接操作

cross,cartesian
对两个数据集进行笛卡尔积操作

distinct
去除数据集中的重复数据

groupBy ,aggregate
分组并对分组后的数据集进行聚合操作

first,min,max
获取数据集中的第一个,最小或者最大元素

sum,avg
计算数据集中的元素的和或平均值。

collect
将数据集中的元素收集到本地的集合中

这些API函数和操作涵盖了Flink中流处理和批处理常见操作,可以帮助用户实现各种复杂的数据处理和分析任务,根据实际需求,可以选择适合的API函数和操作来构建Flink作业。下面是一些常见的API的说明:

map

将DataSet中的每一个元素转换为另外一个元素

示例:使用map操作,将以下数据


"1,张三", "2,李四", "3,王五", "4,赵六"

转换为一个scala的样例类

步骤:

①、获取ExecutionEnvironment环境

②、使用fromCollection构建数据源

③、创建一个User样例类

④、使用map操作执行转换

⑤、打印测试


public class User {    public String id;    public String name;    public User() {}    public User(String id, String name) {        this.id = id;        this.name = name;    }    @Override    public String toString() {        return "User(" + id + ", " + name + ")";    }    public static void main(String[] args) throws Exception {        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        DataSet<String> textDataSet = env.fromCollection(            Arrays.asList("1,张三", "2,李四", "3,王五", "4,赵六")        );        DataSet<User> userDataSet = textDataSet.map(new MapFunction<String, User>() {            @Override            public User map(String text) throws Exception {                String[] fieldArr = text.split(",");                return new User(fieldArr[0], fieldArr[1]);            }        });        userDataSet.print();    } }
相关文章
|
8天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
4天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2464 14
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
4天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1503 14
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
|
1月前
|
运维 Cloud Native Devops
一线实战:运维人少,我们从 0 到 1 实践 DevOps 和云原生
上海经证科技有限公司为有效推进软件项目管理和开发工作,选择了阿里云云效作为 DevOps 解决方案。通过云效,实现了从 0 开始,到现在近百个微服务、数百条流水线与应用交付的全面覆盖,有效支撑了敏捷开发流程。
19274 29
|
1月前
|
人工智能 自然语言处理 搜索推荐
阿里云Elasticsearch AI搜索实践
本文介绍了阿里云 Elasticsearch 在AI 搜索方面的技术实践与探索。
18822 20
|
1月前
|
Rust Apache 对象存储
Apache Paimon V0.9最新进展
Apache Paimon V0.9 版本即将发布,此版本带来了多项新特性并解决了关键挑战。Paimon自2022年从Flink社区诞生以来迅速成长,已成为Apache顶级项目,并广泛应用于阿里集团内外的多家企业。
17515 13
Apache Paimon V0.9最新进展
|
6天前
|
编解码 JSON 自然语言处理
通义千问重磅开源Qwen2.5,性能超越Llama
击败Meta,阿里Qwen2.5再登全球开源大模型王座
368 11
|
1月前
|
存储 人工智能 前端开发
AI 网关零代码解决 AI 幻觉问题
本文主要介绍了 AI Agent 的背景,概念,探讨了 AI Agent 网关插件的使用方法,效果以及实现原理。
18697 16
|
2天前
|
算法 Java
JAVA并发编程系列(8)CountDownLatch核心原理
面试中的编程题目“模拟拼团”,我们通过使用CountDownLatch来实现多线程条件下的拼团逻辑。此外,深入解析了CountDownLatch的核心原理及其内部实现机制,特别是`await()`方法的具体工作流程。通过详细分析源码与内部结构,帮助读者更好地理解并发编程的关键概念。
|
2天前
|
SQL 监控 druid
Druid连接池学习
Druid学习笔记,使用Druid进行密码加密。参考文档:https://github.com/alibaba/druid
195 82