死磕flink(六)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 死磕flink(六)

Flink开发

Flink应用程序结构主要包含三部分,Source/Transformation/Sink,如下图所示 :

Source:数据源,Flink在流处理和批处理上的source大概有4类:

①、基于本地集合的Source

②、基于文件的Source

③、基于网络套接字的Source

④、自定义的Source,自定义Source常见的有Apache Kafka,Amazon Kinbesis Streams,RabbitMQ,Twitter Streaming API,Apache NIFI等。当然也可以自定义自己的Source。

Transformation:数据转换的各种操作:有Map/flatMap/filter/KeyBy/Reduce/Fold/Aggregations/Window/WindowAll/Union/Window Join/Split/Select/Project等,操作很多,可以将数据转换计算成你想要的数据。

Sink:接收器,Flink将转换计算后的数据发送的地点,可能需要存储下来,Flink常见的Sink大概有如下几类:

①、写入文件  ②、打印输出  ③、写入Socket   ④、自定义的sink,自定义的sink常见的有Apache Kafka,RabbitMQ,MySQL,ElasticSearch,Apache Cassandra,Hadoop FilSystem 等,也可以自定义自己的Sink.

Flink连接器

在实际生产环境中,数据通常分布在各种不同的系统中,包括文件系统,数据库,消息队列等。Flink作为一个大数据处理框架,需要与这些外部系统进行数据交互,以实现数据的输入,处理和输出。在Flink中,Source和Sink是两个关键模块,它们扮演着与外部系统进行数据连接和交互的重要角色,被称为外部连接器(Connector).

①、Source数据源:Source是Flink作业的输入模块,用于从外部系统中读取数据并将其转换为Flink的数据流,Source负责实现与不同数据源的交互逻辑,将外部数据源的数据逐条或批量读取到Flink的数据流中,以便后续的数据处理。常见的Source包括从文件中读取数据,从消息队列如Kafka,RabbitMQ中消费数据,从数据库中读取数据等。

②、Sink数据接收器:Sink是Flink作业的输出模块,用于将Flink计算的结果输出到外部系统中,Sink负责实现将Flink数据流中的数据写入到外部数据源,以便后续的持久化存储,展示或者其他处理。Sink的实现需要考虑数据的可靠性,一致性以及可能的事务性要求。常见的Sink包括数据写入文件,将数据写入数据,将数据写入到消息队列等。

外部链接器在Flink中的作用非常关键,它们使得Flink作业可以与各种不同类型的数据源和数据目的地的进行交换,实现了数据的流入和流出。这种灵活的连接机制使得Flink在处理大数据时能够更好地集成已有的系统和数据,实现复杂的数据流处理和分析任务。

Source

Flink在批处理中常见的Source主要有两大类:

①、基于本地集合的source  ②、基于文件的source

基于本地集合的source

在flink中最常见的创建本地集合的DataSet方式有三种:

A:使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。

B:使用env.fromCollection(),这种方式支持多种Collection的具体类型。


public class BatchFromCollection {    public static void main(String[] args) throws Exception {        // 获取flink执行环境        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        // 0.用element创建DataSet(fromElements)        DataSet<String> ds0 = env.fromElements("spark", "flink");        ds0.print();        // 1.用Tuple创建DataSet(fromElements)        DataSet<Tuple2<Integer, String>> ds1 = env.fromElements(            new Tuple2<>(1, "spark"),            new Tuple2<>(2, "flink")        );        ds1.print();        // 2.用Array创建DataSet        DataSet<String> ds2 = env.fromCollection(new ArrayList<String>() {{            add("spark");            add("flink");        }});        ds2.print();        // 3.用ArrayDeque创建DataSet        DataSet<String> ds3 = env.fromCollection(new ArrayDeque<String>() {{            add("spark");            add("flink");        }});        ds3.print();        // 8.用Stack创建DataSet        DataSet<String> ds8 = env.fromCollection(new Stack<String>() {{            add("spark");            add("flink");        }});        ds8.print();       // 9.用Stream创建DataSet(Stream相当于lazy List,避免在中间过程中生成不必要的集合)        DataSet<String> ds9 = env.fromCollection(Stream.of("spark", "flink"));        ds9.print();        // 15.用HashMap创建DataSet        DataSet<Tuple2<Integer, String>> ds15 = env.fromCollection(new HashMap<Integer, String>() {{            put(1, "spark");            put(2, "flink");        }}.entrySet());        ds15.print();        // 16.用Range创建DataSet        DataSet<Integer> ds16 = env.fromCollection(IntStream.rangeClosed(1, 8).boxed().collect(Collectors.toList()));        ds16.print();        // 17.用generateSequence创建DataSet        DataSet<Long> ds17 = env.generateSequence(1, 9);        ds17.print();    } }

基于文件的Source

Flink支持直接从外部文件存储系统中读取文件的方式来创建Source数据源,Flink支持的方式有以下几种:

①、读取本地文件数据;②、读取HDFS文件数据;③、读取CSV文件数据;④、读取压缩文件;⑤、遍历目录

下面分别介绍每个数据源的加载方式

读取本地文件


package com.demo.batch; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; public class BatchFromFile {    public static void main(String[] args) throws Exception {        // 使用readTextFile读取本地文件        // 初始化环境        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();        // 加载数据        DataSet<String> datas = environment.readTextFile("data.txt");        // 触发程序执行        datas.print();    } }

读取HDFS文件数据


package com.demo.batch; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; public class BatchFromFile {    public static void main(String[] args) throws Exception {        // 使用readTextFile读取本地文件        // 初始化环境        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();        // 加载数据        DataSet<String> datas = environment.readTextFile("hdfs://node01:8020/README.txt");        // 触发程序执行        datas.print();    } }

读取CSV文件数据


package com.demo.batch; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.common.functions.MapFunction; public class BatchFromCsvFile {    public static void main(String[] args) throws Exception {        // 初始化环境        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        // 用于映射CSV文件的POJO class        public static class Student {            public int id;            public String name;            public Student() {}            public Student(int id, String name) {                this.id = id;                this.name = name;            }            @Override        }      } } public String toString() {     return "Student(" + id + ", " + name + ")"; } // 读取CSV文件DataSet<Student> csvDataSet = env.readCsvFile("./data/input/student.csv") .ignoreFirstLine() .pojoType(Student.class, "id", "name");    csvDataSet.print();    } }

读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputFormat方法,flink可以自动识别并且解压缩,但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

压缩格式
扩展名
并行化
DEFALTE .defkate
no
GZIP
.gz;.gzip
no
Bzip2
.bz2
no
XZ
.xz
no


public class BatchFromCompressFile { public static void main(String[] args) throws Exception { // 初始化环境     ExecutionEnvironment env =      ExecutionEnvironment.getExecutionEnvironment(); // 触发程序执行     result.print();    } }

遍历目录:

flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。对于从文件中读取数据,当读取到数个文件夹的时候,嵌套的文件默认是不会读取的,只会读取第一个文件,其他的都会被忽略,所以我们需要使用recursive.file.enumeration进行递归读取。


public class CustomFileInputFormatExample {    public static void main(String[] args) throws Exception {        // 初始化环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 设置目录路径        String directoryPath = "D:\\BaiduNetdiskDownload\\data";        // 自定义输入格式        TextInputFormat format = new TextInputFormat(new Path(directoryPath));        format.setFilePath(directoryPath);                // 使用自定义输入格式读取数据        DataStream<String> text = env.createInput(format);        // 打印结果        text.print();        // 执行作业        env.execute("Custom File Input Format Example");    } }


public class StreamFromKafka {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        Properties properties = new Properties();        properties.setProperty("bootstrap.servers","teacher2:9092");        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String> ("mytopic2", new SimpleStringSchema(), properties);        DataStreamSource<String> data = env.addSource(consumer);        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {                for (String word : s.split(" ")) {                    collector.collect(Tuple2.of(word, 1));                }            }        });SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);   result.print();    env.execute();  } }

自定义source


private static class SimpleSource  implements SourceFunction<Tuple2<String, Integer>> {    private int offset = 0;    private boolean isRunning = true;    @Override    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {    while (isRunning) {      Thread.sleep(500);      ctx.collect(new Tuple2<>("" + offset, offset));      offset++;      if (offset == 1000) {       isRunning = false;     }    } }     @Override     public void cancel() {     isRunning = false;    }}

自定义source,从0开始计数,将数据发送到下游在主逻辑中调用这个source


DataStream<Tuple2<String, Integer>> countStream = env.addSource(new SimpleSource());

sink

flink在批处理中常见的sink

①、基于本地集合的sink;②、基于文件的sink

基于本地集合的sink

目标:基于下列数据,分别进行打印输出,error输出,collect()


(19, "zhangsan", 178.8), (17, "lisi", 168.8), (18, "wangwu", 184.8), (21, "zhaoliu", 164.8)

代码:


public class BatchSinkCollection { public static void main(String[] args) throws Exception { ExecutionEnvironment env =     ExecutionEnvironment.getExecutionEnvironment();     List<Tuple3<Integer, String, Double>> stuData = new ArrayList<>(); stuData.add(new Tuple3<>(19, "zhangsan", 178.8)); stuData.add(new Tuple3<>(17, "lisi", 168.8)); stuData.add(new Tuple3<>(18, "wangwu", 184.8)); stuData.add(new Tuple3<>(21, "zhaoliu", 164.8)); DataSet<Tuple3<Integer, String, Double>> stu =  env.fromCollection(stuData); stu.print(); stu.printToErr(); stu.collect().forEach(System.out::println); env.execute()    } }

基于文件的sink

①、flink支持多种存储设备上的文件,包括本地文件,hdfs文件等。

②、flink支持多种文件的存储格式,包括text文件,csv文件等。

③、writeAsText():TextOutPutFormat:将元素作为字符串写入行,字符串是通过调用每个元素的toString()方法获得。

将数据写入本地文件

目标:基于下列数据,写入到文件中


Map(1 -> "spark", 2 -> "flink")

代码:


public class BatchSinkFile {   public static void main(String[] args) throws Exception {     ExecutionEnvironment env =      ExecutionEnvironment.getExecutionEnvironment();     Map<Integer, String> data1 = new HashMap<>();     data1.put(1, "spark");     data1.put(2, "flink");     DataSet<Map<Integer, String>> ds1 = env.fromElements(data1);     ds1.setParallelism(1)     .writeAsText("test/data1/aa", FileSystem.WriteMode.OVERWRITE)     .setParallelism(1);     env.execute();    } }

将数据写入到HDFS


public class BatchSinkFile {   public static void main(String[] args) throws Exception {   ExecutionEnvironment env =    ExecutionEnvironment.getExecutionEnvironment();   Map<Integer, String> data1 = new HashMap<>();   data1.put(1, "spark");   data1.put(2, "flink");   DataSet<Map<Integer, String>> ds1 = env.fromElements(data1);   ds1.setParallelism(1)   .writeAsText("hdfs://bigdata1:9000/a",    FileSystem.WriteMode.OVERWRITE)   .setParallelism(1);   env.execute();   } }

Flink API

Flink的API层提供了DataStream API和DataSet API,分别用于流式处理和批处理,这两个API允许开发者使用各种操作符合转换来处理数据,包括转换,连接,聚合,窗口等计算任务

在Flink中,根据不同的场景(流处理和批处理),需要设置不同的执行环境,在批处理场景下,需要使用DataSet API,并设置批处理执行环境,在流处理场景下,需要使用DataStream API,并设置流处理执行环境。

以下是在不同场景下设置执行环境的示例代码,分别展示了批处理和流处理的情况,包括scala和java语言

3b4f721c87991a4c4f90cd7c78868684.png

批处理场景-设置DataSet API的批处理执行环境(Java)


import org.apache.flink.api.java.ExecutionEnvironment; public class BatchJobExample {     public static void main(String[] args) throws Exception {      // 创建批处理执行环境    ExecutionEnvironment env =        ExecutionEnvironment.getExecutionEnvironment();      // 在这里添加批处理作业的代码逻辑      // ...   } }

流处理场景-设置DataStream API的流处理执行环境(Java)


public class StreamJobExample {    public static void main(String[] args) throws Exception {       // 创建流处理执行环境       StreamExecutionEnvironment env =          StreamExecutionEnvironment.getExecutionEnvironment();      // 在这里添加流处理作业的代码逻辑      // ...      // 执行作业      env.execute("Stream Job Example");   } }

批处理场景-设置DataSet API的批处理执行环境(Scala)


import org.apache.flink.api.scala._ object BatchJobExample {   def main(args: Array[String]): Unit = {    // 创建批处理执行环境   val env = ExecutionEnvironment.getExecutionEnvironment   // 在这里添加批处理作业的代码逻辑   // ...   // 执行作业   env.execute("Batch Job Example")   } }

流处理场景-设置DataStream API的批处理执行环境(Scala)


import org.apache.flink.streaming.api.scala._   object StreamJobExample {   def main(args: Array[String]): Unit = {   // 创建流处理执行环境   val env = StreamExecutionEnvironment.getExecutionEnvironment   // 在这里添加流处理作业的代码逻辑   // ...    // 执行作业    env.execute("Stream Job Example")    } }

以下是一些常用的API函数和操作,以表格形式提供:

API类型
常用函数和操作
描述
DataStream API
map,flatM 对数据流中的每个元素进行映射和扁平化操作。

filter
过滤出满足条件的元素

keyBy
按指定的字段或键对数据流进行分区

window
将数据流按照时间窗口或计数窗口划分

reduce,fold
在窗口内对元素进行聚合操作

union
合并多个数据流

connect,coMap,coFlatMap
连接两个不同类型的数据流并应用相应的函数

timeWindow,countWindow
定义时间窗口或计数窗口

process
自定义处理函数,实现更复杂的流处理逻辑。
DataSet API
map,flatMap
对数据流中的每个元素进行映射和扁平化操作

filter
过滤出满足条件的元素

group by
按指定的字段或键对数据集进行分组

reduce,fold 对分组后的数据集进行聚合操作

join,coGroup
对两个数据集进行内连接和外连接操作

cross,cartesian
对两个数据集进行笛卡尔积操作

distinct
去除数据集中的重复数据

groupBy ,aggregate
分组并对分组后的数据集进行聚合操作

first,min,max
获取数据集中的第一个,最小或者最大元素

sum,avg
计算数据集中的元素的和或平均值。

collect
将数据集中的元素收集到本地的集合中

这些API函数和操作涵盖了Flink中流处理和批处理常见操作,可以帮助用户实现各种复杂的数据处理和分析任务,根据实际需求,可以选择适合的API函数和操作来构建Flink作业。下面是一些常见的API的说明:

map

将DataSet中的每一个元素转换为另外一个元素

示例:使用map操作,将以下数据


"1,张三", "2,李四", "3,王五", "4,赵六"

转换为一个scala的样例类

步骤:

①、获取ExecutionEnvironment环境

②、使用fromCollection构建数据源

③、创建一个User样例类

④、使用map操作执行转换

⑤、打印测试


public class User {    public String id;    public String name;    public User() {}    public User(String id, String name) {        this.id = id;        this.name = name;    }    @Override    public String toString() {        return "User(" + id + ", " + name + ")";    }    public static void main(String[] args) throws Exception {        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        DataSet<String> textDataSet = env.fromCollection(            Arrays.asList("1,张三", "2,李四", "3,王五", "4,赵六")        );        DataSet<User> userDataSet = textDataSet.map(new MapFunction<String, User>() {            @Override            public User map(String text) throws Exception {                String[] fieldArr = text.split(",");                return new User(fieldArr[0], fieldArr[1]);            }        });        userDataSet.print();    } }
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
8月前
|
SQL 存储 大数据
Flink 基础详解:大数据处理的强大引擎
Apache Flink 是一个分布式流批一体化的开源平台,专为大规模数据处理设计。它支持实时流处理和批处理,具有高吞吐量、低延迟特性。Flink 提供统一的编程抽象,简化大数据应用开发,并在流处理方面表现卓越,广泛应用于实时监控、金融交易分析等场景。其架构包括 JobManager、TaskManager 和 Client,支持并行度、水位线、时间语义等基础属性。Flink 还提供了丰富的算子、状态管理和容错机制,如检查点和 Savepoint,确保作业的可靠性和一致性。此外,Flink 支持 SQL 查询和 CDC 功能,实现实时数据捕获与同步,广泛应用于数据仓库和实时数据分析领域。
3744 32
|
JavaScript
vue 报错 Cannot find package ‘unplugin-vue-components‘ 解决
vue 报错 Cannot find package ‘unplugin-vue-components‘ 解决
773 0
|
12月前
|
资源调度 流计算 Docker
死磕flink(七)
死磕flink(七)
|
分布式计算 Hadoop 网络安全
Hadoop 集群启动后,从节点的NodeManager没有启动解决
1.slaves节点报错,报的是启动nodemanager 所需内存不足 解决: a: 修改 yarn-site.
5562 0
|
12月前
|
SQL 算法 API
死磕flink(三)
死磕flink(三)
|
11月前
|
缓存 应用服务中间件 PHP
502错误是nginx返回的吗(502错误和nginx有关系吗)
本文详细介绍了Nginx出现502 Bad Gateway错误的原因及解决方法,包括缓冲区错误、Header过大和PHP-CGI进程不足等问题,并提供了增大缓冲区、调整Header大小及增加PHP-CGI进程数量的具体步骤。此外,还解释了502错误的含义及其可能原因,如上游服务器故障、网络故障和配置错误,并给出了检查上游服务器、代理配置及联系网络管理员等多种解决方案。以上内容仅供参考,具体操作需根据实际情况调整。
3361 4
|
安全 网络安全 网络架构
掌握traceroute:网络工程师解决路由问题的利器
【8月更文挑战第22天】`traceroute`是网络工程师的关键工具,用于追踪数据包从源到目的地的路径,帮助诊断网络问题并优化性能。通过向目标发送具有特定生存时间(TTL)值的数据包,`traceroute`能揭示每跳路由器的信息及延迟,便于识别瓶颈与故障。其基本用法为`traceroute [options] hostname/IP`。
381 1
|
流计算
Flink CDC在运行过程中遇到"Could not upload job files"的问题
Flink CDC在运行过程中遇到"Could not upload job files"的问题
395 1
|
12月前
|
设计模式 存储 Java
spring源码设计模式分析(一)
spring源码设计模式分析(一)
|
12月前
|
流计算 Docker 容器
死磕flink(八)
死磕flink(八)

热门文章

最新文章