死磕flink(六)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 死磕flink(六)

Flink开发

Flink应用程序结构主要包含三部分,Source/Transformation/Sink,如下图所示 :

Source:数据源,Flink在流处理和批处理上的source大概有4类:

①、基于本地集合的Source

②、基于文件的Source

③、基于网络套接字的Source

④、自定义的Source,自定义Source常见的有Apache Kafka,Amazon Kinbesis Streams,RabbitMQ,Twitter Streaming API,Apache NIFI等。当然也可以自定义自己的Source。

Transformation:数据转换的各种操作:有Map/flatMap/filter/KeyBy/Reduce/Fold/Aggregations/Window/WindowAll/Union/Window Join/Split/Select/Project等,操作很多,可以将数据转换计算成你想要的数据。

Sink:接收器,Flink将转换计算后的数据发送的地点,可能需要存储下来,Flink常见的Sink大概有如下几类:

①、写入文件  ②、打印输出  ③、写入Socket   ④、自定义的sink,自定义的sink常见的有Apache Kafka,RabbitMQ,MySQL,ElasticSearch,Apache Cassandra,Hadoop FilSystem 等,也可以自定义自己的Sink.

Flink连接器

在实际生产环境中,数据通常分布在各种不同的系统中,包括文件系统,数据库,消息队列等。Flink作为一个大数据处理框架,需要与这些外部系统进行数据交互,以实现数据的输入,处理和输出。在Flink中,Source和Sink是两个关键模块,它们扮演着与外部系统进行数据连接和交互的重要角色,被称为外部连接器(Connector).

①、Source数据源:Source是Flink作业的输入模块,用于从外部系统中读取数据并将其转换为Flink的数据流,Source负责实现与不同数据源的交互逻辑,将外部数据源的数据逐条或批量读取到Flink的数据流中,以便后续的数据处理。常见的Source包括从文件中读取数据,从消息队列如Kafka,RabbitMQ中消费数据,从数据库中读取数据等。

②、Sink数据接收器:Sink是Flink作业的输出模块,用于将Flink计算的结果输出到外部系统中,Sink负责实现将Flink数据流中的数据写入到外部数据源,以便后续的持久化存储,展示或者其他处理。Sink的实现需要考虑数据的可靠性,一致性以及可能的事务性要求。常见的Sink包括数据写入文件,将数据写入数据,将数据写入到消息队列等。

外部链接器在Flink中的作用非常关键,它们使得Flink作业可以与各种不同类型的数据源和数据目的地的进行交换,实现了数据的流入和流出。这种灵活的连接机制使得Flink在处理大数据时能够更好地集成已有的系统和数据,实现复杂的数据流处理和分析任务。

Source

Flink在批处理中常见的Source主要有两大类:

①、基于本地集合的source  ②、基于文件的source

基于本地集合的source

在flink中最常见的创建本地集合的DataSet方式有三种:

A:使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。

B:使用env.fromCollection(),这种方式支持多种Collection的具体类型。


public class BatchFromCollection {    public static void main(String[] args) throws Exception {        // 获取flink执行环境        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        // 0.用element创建DataSet(fromElements)        DataSet<String> ds0 = env.fromElements("spark", "flink");        ds0.print();        // 1.用Tuple创建DataSet(fromElements)        DataSet<Tuple2<Integer, String>> ds1 = env.fromElements(            new Tuple2<>(1, "spark"),            new Tuple2<>(2, "flink")        );        ds1.print();        // 2.用Array创建DataSet        DataSet<String> ds2 = env.fromCollection(new ArrayList<String>() {{            add("spark");            add("flink");        }});        ds2.print();        // 3.用ArrayDeque创建DataSet        DataSet<String> ds3 = env.fromCollection(new ArrayDeque<String>() {{            add("spark");            add("flink");        }});        ds3.print();        // 8.用Stack创建DataSet        DataSet<String> ds8 = env.fromCollection(new Stack<String>() {{            add("spark");            add("flink");        }});        ds8.print();       // 9.用Stream创建DataSet(Stream相当于lazy List,避免在中间过程中生成不必要的集合)        DataSet<String> ds9 = env.fromCollection(Stream.of("spark", "flink"));        ds9.print();        // 15.用HashMap创建DataSet        DataSet<Tuple2<Integer, String>> ds15 = env.fromCollection(new HashMap<Integer, String>() {{            put(1, "spark");            put(2, "flink");        }}.entrySet());        ds15.print();        // 16.用Range创建DataSet        DataSet<Integer> ds16 = env.fromCollection(IntStream.rangeClosed(1, 8).boxed().collect(Collectors.toList()));        ds16.print();        // 17.用generateSequence创建DataSet        DataSet<Long> ds17 = env.generateSequence(1, 9);        ds17.print();    } }

基于文件的Source

Flink支持直接从外部文件存储系统中读取文件的方式来创建Source数据源,Flink支持的方式有以下几种:

①、读取本地文件数据;②、读取HDFS文件数据;③、读取CSV文件数据;④、读取压缩文件;⑤、遍历目录

下面分别介绍每个数据源的加载方式

读取本地文件


package com.demo.batch; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; public class BatchFromFile {    public static void main(String[] args) throws Exception {        // 使用readTextFile读取本地文件        // 初始化环境        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();        // 加载数据        DataSet<String> datas = environment.readTextFile("data.txt");        // 触发程序执行        datas.print();    } }

读取HDFS文件数据


package com.demo.batch; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; public class BatchFromFile {    public static void main(String[] args) throws Exception {        // 使用readTextFile读取本地文件        // 初始化环境        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();        // 加载数据        DataSet<String> datas = environment.readTextFile("hdfs://node01:8020/README.txt");        // 触发程序执行        datas.print();    } }

读取CSV文件数据


package com.demo.batch; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.common.functions.MapFunction; public class BatchFromCsvFile {    public static void main(String[] args) throws Exception {        // 初始化环境        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        // 用于映射CSV文件的POJO class        public static class Student {            public int id;            public String name;            public Student() {}            public Student(int id, String name) {                this.id = id;                this.name = name;            }            @Override        }      } } public String toString() {     return "Student(" + id + ", " + name + ")"; } // 读取CSV文件DataSet<Student> csvDataSet = env.readCsvFile("./data/input/student.csv") .ignoreFirstLine() .pojoType(Student.class, "id", "name");    csvDataSet.print();    } }

读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputFormat方法,flink可以自动识别并且解压缩,但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

压缩格式
扩展名
并行化
DEFALTE .defkate
no
GZIP
.gz;.gzip
no
Bzip2
.bz2
no
XZ
.xz
no


public class BatchFromCompressFile { public static void main(String[] args) throws Exception { // 初始化环境     ExecutionEnvironment env =      ExecutionEnvironment.getExecutionEnvironment(); // 触发程序执行     result.print();    } }

遍历目录:

flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。对于从文件中读取数据,当读取到数个文件夹的时候,嵌套的文件默认是不会读取的,只会读取第一个文件,其他的都会被忽略,所以我们需要使用recursive.file.enumeration进行递归读取。


public class CustomFileInputFormatExample {    public static void main(String[] args) throws Exception {        // 初始化环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 设置目录路径        String directoryPath = "D:\\BaiduNetdiskDownload\\data";        // 自定义输入格式        TextInputFormat format = new TextInputFormat(new Path(directoryPath));        format.setFilePath(directoryPath);                // 使用自定义输入格式读取数据        DataStream<String> text = env.createInput(format);        // 打印结果        text.print();        // 执行作业        env.execute("Custom File Input Format Example");    } }


public class StreamFromKafka {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        Properties properties = new Properties();        properties.setProperty("bootstrap.servers","teacher2:9092");        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String> ("mytopic2", new SimpleStringSchema(), properties);        DataStreamSource<String> data = env.addSource(consumer);        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {                for (String word : s.split(" ")) {                    collector.collect(Tuple2.of(word, 1));                }            }        });SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);   result.print();    env.execute();  } }

自定义source


private static class SimpleSource  implements SourceFunction<Tuple2<String, Integer>> {    private int offset = 0;    private boolean isRunning = true;    @Override    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {    while (isRunning) {      Thread.sleep(500);      ctx.collect(new Tuple2<>("" + offset, offset));      offset++;      if (offset == 1000) {       isRunning = false;     }    } }     @Override     public void cancel() {     isRunning = false;    }}

自定义source,从0开始计数,将数据发送到下游在主逻辑中调用这个source


DataStream<Tuple2<String, Integer>> countStream = env.addSource(new SimpleSource());

sink

flink在批处理中常见的sink

①、基于本地集合的sink;②、基于文件的sink

基于本地集合的sink

目标:基于下列数据,分别进行打印输出,error输出,collect()


(19, "zhangsan", 178.8), (17, "lisi", 168.8), (18, "wangwu", 184.8), (21, "zhaoliu", 164.8)

代码:


public class BatchSinkCollection { public static void main(String[] args) throws Exception { ExecutionEnvironment env =     ExecutionEnvironment.getExecutionEnvironment();     List<Tuple3<Integer, String, Double>> stuData = new ArrayList<>(); stuData.add(new Tuple3<>(19, "zhangsan", 178.8)); stuData.add(new Tuple3<>(17, "lisi", 168.8)); stuData.add(new Tuple3<>(18, "wangwu", 184.8)); stuData.add(new Tuple3<>(21, "zhaoliu", 164.8)); DataSet<Tuple3<Integer, String, Double>> stu =  env.fromCollection(stuData); stu.print(); stu.printToErr(); stu.collect().forEach(System.out::println); env.execute()    } }

基于文件的sink

①、flink支持多种存储设备上的文件,包括本地文件,hdfs文件等。

②、flink支持多种文件的存储格式,包括text文件,csv文件等。

③、writeAsText():TextOutPutFormat:将元素作为字符串写入行,字符串是通过调用每个元素的toString()方法获得。

将数据写入本地文件

目标:基于下列数据,写入到文件中


Map(1 -> "spark", 2 -> "flink")

代码:


public class BatchSinkFile {   public static void main(String[] args) throws Exception {     ExecutionEnvironment env =      ExecutionEnvironment.getExecutionEnvironment();     Map<Integer, String> data1 = new HashMap<>();     data1.put(1, "spark");     data1.put(2, "flink");     DataSet<Map<Integer, String>> ds1 = env.fromElements(data1);     ds1.setParallelism(1)     .writeAsText("test/data1/aa", FileSystem.WriteMode.OVERWRITE)     .setParallelism(1);     env.execute();    } }

将数据写入到HDFS


public class BatchSinkFile {   public static void main(String[] args) throws Exception {   ExecutionEnvironment env =    ExecutionEnvironment.getExecutionEnvironment();   Map<Integer, String> data1 = new HashMap<>();   data1.put(1, "spark");   data1.put(2, "flink");   DataSet<Map<Integer, String>> ds1 = env.fromElements(data1);   ds1.setParallelism(1)   .writeAsText("hdfs://bigdata1:9000/a",    FileSystem.WriteMode.OVERWRITE)   .setParallelism(1);   env.execute();   } }

Flink API

Flink的API层提供了DataStream API和DataSet API,分别用于流式处理和批处理,这两个API允许开发者使用各种操作符合转换来处理数据,包括转换,连接,聚合,窗口等计算任务

在Flink中,根据不同的场景(流处理和批处理),需要设置不同的执行环境,在批处理场景下,需要使用DataSet API,并设置批处理执行环境,在流处理场景下,需要使用DataStream API,并设置流处理执行环境。

以下是在不同场景下设置执行环境的示例代码,分别展示了批处理和流处理的情况,包括scala和java语言

3b4f721c87991a4c4f90cd7c78868684.png

批处理场景-设置DataSet API的批处理执行环境(Java)


import org.apache.flink.api.java.ExecutionEnvironment; public class BatchJobExample {     public static void main(String[] args) throws Exception {      // 创建批处理执行环境    ExecutionEnvironment env =        ExecutionEnvironment.getExecutionEnvironment();      // 在这里添加批处理作业的代码逻辑      // ...   } }

流处理场景-设置DataStream API的流处理执行环境(Java)


public class StreamJobExample {    public static void main(String[] args) throws Exception {       // 创建流处理执行环境       StreamExecutionEnvironment env =          StreamExecutionEnvironment.getExecutionEnvironment();      // 在这里添加流处理作业的代码逻辑      // ...      // 执行作业      env.execute("Stream Job Example");   } }

批处理场景-设置DataSet API的批处理执行环境(Scala)


import org.apache.flink.api.scala._ object BatchJobExample {   def main(args: Array[String]): Unit = {    // 创建批处理执行环境   val env = ExecutionEnvironment.getExecutionEnvironment   // 在这里添加批处理作业的代码逻辑   // ...   // 执行作业   env.execute("Batch Job Example")   } }

流处理场景-设置DataStream API的批处理执行环境(Scala)


import org.apache.flink.streaming.api.scala._   object StreamJobExample {   def main(args: Array[String]): Unit = {   // 创建流处理执行环境   val env = StreamExecutionEnvironment.getExecutionEnvironment   // 在这里添加流处理作业的代码逻辑   // ...    // 执行作业    env.execute("Stream Job Example")    } }

以下是一些常用的API函数和操作,以表格形式提供:

API类型
常用函数和操作
描述
DataStream API
map,flatM 对数据流中的每个元素进行映射和扁平化操作。

filter
过滤出满足条件的元素

keyBy
按指定的字段或键对数据流进行分区

window
将数据流按照时间窗口或计数窗口划分

reduce,fold
在窗口内对元素进行聚合操作

union
合并多个数据流

connect,coMap,coFlatMap
连接两个不同类型的数据流并应用相应的函数

timeWindow,countWindow
定义时间窗口或计数窗口

process
自定义处理函数,实现更复杂的流处理逻辑。
DataSet API
map,flatMap
对数据流中的每个元素进行映射和扁平化操作

filter
过滤出满足条件的元素

group by
按指定的字段或键对数据集进行分组

reduce,fold 对分组后的数据集进行聚合操作

join,coGroup
对两个数据集进行内连接和外连接操作

cross,cartesian
对两个数据集进行笛卡尔积操作

distinct
去除数据集中的重复数据

groupBy ,aggregate
分组并对分组后的数据集进行聚合操作

first,min,max
获取数据集中的第一个,最小或者最大元素

sum,avg
计算数据集中的元素的和或平均值。

collect
将数据集中的元素收集到本地的集合中

这些API函数和操作涵盖了Flink中流处理和批处理常见操作,可以帮助用户实现各种复杂的数据处理和分析任务,根据实际需求,可以选择适合的API函数和操作来构建Flink作业。下面是一些常见的API的说明:

map

将DataSet中的每一个元素转换为另外一个元素

示例:使用map操作,将以下数据


"1,张三", "2,李四", "3,王五", "4,赵六"

转换为一个scala的样例类

步骤:

①、获取ExecutionEnvironment环境

②、使用fromCollection构建数据源

③、创建一个User样例类

④、使用map操作执行转换

⑤、打印测试


public class User {    public String id;    public String name;    public User() {}    public User(String id, String name) {        this.id = id;        this.name = name;    }    @Override    public String toString() {        return "User(" + id + ", " + name + ")";    }    public static void main(String[] args) throws Exception {        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        DataSet<String> textDataSet = env.fromCollection(            Arrays.asList("1,张三", "2,李四", "3,王五", "4,赵六")        );        DataSet<User> userDataSet = textDataSet.map(new MapFunction<String, User>() {            @Override            public User map(String text) throws Exception {                String[] fieldArr = text.split(",");                return new User(fieldArr[0], fieldArr[1]);            }        });        userDataSet.print();    } }
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
消息中间件 API 数据处理
Flink常见面试问题(附答案)
Apache Flink是开源的流批处理框架,提供低延迟、高吞吐的数据处理。与Hadoop不同,Flink专注于实时数据流。其核心特性包括事件时间和处理时间的概念,事件时间通过水印处理乱序事件。Flink通过检查点实现容错,支持滚动、滑动和会话窗口进行流数据处理。状态后端用于管理应用程序状态,水印用于处理延迟数据。Flink与Kafka集成能保证事件顺序,支持多种连接器如Kafka、JDBC等。其处理延迟数据、乱序事件的能力,以及Exactly-Once语义,使其在大规模数据处理中具有优势。Flink还支持表格API和DataStream API,以及多种容错和性能优化策略。
242 2
Flink常见面试问题(附答案)
|
2天前
|
消息中间件 存储 分布式计算
死磕-kafka(三)
死磕-kafka(三)
|
2天前
|
消息中间件 存储 算法
死磕-kafka(二)
死磕-kafka(二)
|
2天前
|
消息中间件 Kafka 调度
死磕-kafka(一)
死磕-kafka(一)
|
2天前
|
分布式计算 大数据 数据处理
死磕Flink(一)
死磕Flink(一)
|
2天前
|
SQL 资源调度 Kubernetes
死磕flink(五)
死磕flink(五)
|
2天前
|
存储 分布式计算 大数据
死磕Flink(二)
死磕Flink(二)
|
2天前
|
资源调度 流计算 Docker
死磕flink(七)
死磕flink(七)
|
2天前
|
流计算 Docker 容器
死磕flink(八)
死磕flink(八)
|
2天前
|
Linux 流计算
死磕flink(四)
死磕flink(四)