Flink开发
Flink应用程序结构主要包含三部分,Source/Transformation/Sink,如下图所示 :
Source:数据源,Flink在流处理和批处理上的source大概有4类:
①、基于本地集合的Source
②、基于文件的Source
③、基于网络套接字的Source
④、自定义的Source,自定义Source常见的有Apache Kafka,Amazon Kinbesis Streams,RabbitMQ,Twitter Streaming API,Apache NIFI等。当然也可以自定义自己的Source。
Transformation:数据转换的各种操作:有Map/flatMap/filter/KeyBy/Reduce/Fold/Aggregations/Window/WindowAll/Union/Window Join/Split/Select/Project等,操作很多,可以将数据转换计算成你想要的数据。
Sink:接收器,Flink将转换计算后的数据发送的地点,可能需要存储下来,Flink常见的Sink大概有如下几类:
①、写入文件 ②、打印输出 ③、写入Socket ④、自定义的sink,自定义的sink常见的有Apache Kafka,RabbitMQ,MySQL,ElasticSearch,Apache Cassandra,Hadoop FilSystem 等,也可以自定义自己的Sink.
Flink连接器
在实际生产环境中,数据通常分布在各种不同的系统中,包括文件系统,数据库,消息队列等。Flink作为一个大数据处理框架,需要与这些外部系统进行数据交互,以实现数据的输入,处理和输出。在Flink中,Source和Sink是两个关键模块,它们扮演着与外部系统进行数据连接和交互的重要角色,被称为外部连接器(Connector).
①、Source数据源:Source是Flink作业的输入模块,用于从外部系统中读取数据并将其转换为Flink的数据流,Source负责实现与不同数据源的交互逻辑,将外部数据源的数据逐条或批量读取到Flink的数据流中,以便后续的数据处理。常见的Source包括从文件中读取数据,从消息队列如Kafka,RabbitMQ中消费数据,从数据库中读取数据等。
②、Sink数据接收器:Sink是Flink作业的输出模块,用于将Flink计算的结果输出到外部系统中,Sink负责实现将Flink数据流中的数据写入到外部数据源,以便后续的持久化存储,展示或者其他处理。Sink的实现需要考虑数据的可靠性,一致性以及可能的事务性要求。常见的Sink包括数据写入文件,将数据写入数据,将数据写入到消息队列等。
外部链接器在Flink中的作用非常关键,它们使得Flink作业可以与各种不同类型的数据源和数据目的地的进行交换,实现了数据的流入和流出。这种灵活的连接机制使得Flink在处理大数据时能够更好地集成已有的系统和数据,实现复杂的数据流处理和分析任务。
Source
Flink在批处理中常见的Source主要有两大类:
①、基于本地集合的source ②、基于文件的source
基于本地集合的source
在flink中最常见的创建本地集合的DataSet方式有三种:
A:使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。
B:使用env.fromCollection(),这种方式支持多种Collection的具体类型。
public class BatchFromCollection { public static void main(String[] args) throws Exception { // 获取flink执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 0.用element创建DataSet(fromElements) DataSet<String> ds0 = env.fromElements("spark", "flink"); ds0.print(); // 1.用Tuple创建DataSet(fromElements) DataSet<Tuple2<Integer, String>> ds1 = env.fromElements( new Tuple2<>(1, "spark"), new Tuple2<>(2, "flink") ); ds1.print(); // 2.用Array创建DataSet DataSet<String> ds2 = env.fromCollection(new ArrayList<String>() {{ add("spark"); add("flink"); }}); ds2.print(); // 3.用ArrayDeque创建DataSet DataSet<String> ds3 = env.fromCollection(new ArrayDeque<String>() {{ add("spark"); add("flink"); }}); ds3.print(); // 8.用Stack创建DataSet DataSet<String> ds8 = env.fromCollection(new Stack<String>() {{ add("spark"); add("flink"); }}); ds8.print(); // 9.用Stream创建DataSet(Stream相当于lazy List,避免在中间过程中生成不必要的集合) DataSet<String> ds9 = env.fromCollection(Stream.of("spark", "flink")); ds9.print(); // 15.用HashMap创建DataSet DataSet<Tuple2<Integer, String>> ds15 = env.fromCollection(new HashMap<Integer, String>() {{ put(1, "spark"); put(2, "flink"); }}.entrySet()); ds15.print(); // 16.用Range创建DataSet DataSet<Integer> ds16 = env.fromCollection(IntStream.rangeClosed(1, 8).boxed().collect(Collectors.toList())); ds16.print(); // 17.用generateSequence创建DataSet DataSet<Long> ds17 = env.generateSequence(1, 9); ds17.print(); } }
基于文件的Source
Flink支持直接从外部文件存储系统中读取文件的方式来创建Source数据源,Flink支持的方式有以下几种:
①、读取本地文件数据;②、读取HDFS文件数据;③、读取CSV文件数据;④、读取压缩文件;⑤、遍历目录
下面分别介绍每个数据源的加载方式
读取本地文件
package com.demo.batch; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; public class BatchFromFile { public static void main(String[] args) throws Exception { // 使用readTextFile读取本地文件 // 初始化环境 ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); // 加载数据 DataSet<String> datas = environment.readTextFile("data.txt"); // 触发程序执行 datas.print(); } }
读取HDFS文件数据
package com.demo.batch; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; public class BatchFromFile { public static void main(String[] args) throws Exception { // 使用readTextFile读取本地文件 // 初始化环境 ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); // 加载数据 DataSet<String> datas = environment.readTextFile("hdfs://node01:8020/README.txt"); // 触发程序执行 datas.print(); } }
读取CSV文件数据
package com.demo.batch; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.common.functions.MapFunction; public class BatchFromCsvFile { public static void main(String[] args) throws Exception { // 初始化环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 用于映射CSV文件的POJO class public static class Student { public int id; public String name; public Student() {} public Student(int id, String name) { this.id = id; this.name = name; } @Override } } } public String toString() { return "Student(" + id + ", " + name + ")"; } // 读取CSV文件DataSet<Student> csvDataSet = env.readCsvFile("./data/input/student.csv") .ignoreFirstLine() .pojoType(Student.class, "id", "name"); csvDataSet.print(); } }
读取压缩文件
对于以下压缩类型,不需要指定任何额外的inputFormat方法,flink可以自动识别并且解压缩,但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。
压缩格式 |
扩展名 |
并行化 |
DEFALTE | .defkate |
no |
GZIP |
.gz;.gzip |
no |
Bzip2 |
.bz2 |
no |
XZ |
.xz |
no |
public class BatchFromCompressFile { public static void main(String[] args) throws Exception { // 初始化环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 触发程序执行 result.print(); } }
遍历目录:
flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。对于从文件中读取数据,当读取到数个文件夹的时候,嵌套的文件默认是不会读取的,只会读取第一个文件,其他的都会被忽略,所以我们需要使用recursive.file.enumeration进行递归读取。
public class CustomFileInputFormatExample { public static void main(String[] args) throws Exception { // 初始化环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置目录路径 String directoryPath = "D:\\BaiduNetdiskDownload\\data"; // 自定义输入格式 TextInputFormat format = new TextInputFormat(new Path(directoryPath)); format.setFilePath(directoryPath); // 使用自定义输入格式读取数据 DataStream<String> text = env.createInput(format); // 打印结果 text.print(); // 执行作业 env.execute("Custom File Input Format Example"); } }
public class StreamFromKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers","teacher2:9092"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String> ("mytopic2", new SimpleStringSchema(), properties); DataStreamSource<String> data = env.addSource(consumer); SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { for (String word : s.split(" ")) { collector.collect(Tuple2.of(word, 1)); } } });SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1); result.print(); env.execute(); } }
自定义source
private static class SimpleSource implements SourceFunction<Tuple2<String, Integer>> { private int offset = 0; private boolean isRunning = true; @Override public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { while (isRunning) { Thread.sleep(500); ctx.collect(new Tuple2<>("" + offset, offset)); offset++; if (offset == 1000) { isRunning = false; } } } @Override public void cancel() { isRunning = false; }}
自定义source,从0开始计数,将数据发送到下游在主逻辑中调用这个source
DataStream<Tuple2<String, Integer>> countStream = env.addSource(new SimpleSource());
sink
flink在批处理中常见的sink
①、基于本地集合的sink;②、基于文件的sink
基于本地集合的sink
目标:基于下列数据,分别进行打印输出,error输出,collect()
(19, "zhangsan", 178.8), (17, "lisi", 168.8), (18, "wangwu", 184.8), (21, "zhaoliu", 164.8)
代码:
public class BatchSinkCollection { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); List<Tuple3<Integer, String, Double>> stuData = new ArrayList<>(); stuData.add(new Tuple3<>(19, "zhangsan", 178.8)); stuData.add(new Tuple3<>(17, "lisi", 168.8)); stuData.add(new Tuple3<>(18, "wangwu", 184.8)); stuData.add(new Tuple3<>(21, "zhaoliu", 164.8)); DataSet<Tuple3<Integer, String, Double>> stu = env.fromCollection(stuData); stu.print(); stu.printToErr(); stu.collect().forEach(System.out::println); env.execute() } }
基于文件的sink
①、flink支持多种存储设备上的文件,包括本地文件,hdfs文件等。
②、flink支持多种文件的存储格式,包括text文件,csv文件等。
③、writeAsText():TextOutPutFormat:将元素作为字符串写入行,字符串是通过调用每个元素的toString()方法获得。
将数据写入本地文件
目标:基于下列数据,写入到文件中
Map(1 -> "spark", 2 -> "flink")
代码:
public class BatchSinkFile { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Map<Integer, String> data1 = new HashMap<>(); data1.put(1, "spark"); data1.put(2, "flink"); DataSet<Map<Integer, String>> ds1 = env.fromElements(data1); ds1.setParallelism(1) .writeAsText("test/data1/aa", FileSystem.WriteMode.OVERWRITE) .setParallelism(1); env.execute(); } }
将数据写入到HDFS
public class BatchSinkFile { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Map<Integer, String> data1 = new HashMap<>(); data1.put(1, "spark"); data1.put(2, "flink"); DataSet<Map<Integer, String>> ds1 = env.fromElements(data1); ds1.setParallelism(1) .writeAsText("hdfs://bigdata1:9000/a", FileSystem.WriteMode.OVERWRITE) .setParallelism(1); env.execute(); } }
Flink API
Flink的API层提供了DataStream API和DataSet API,分别用于流式处理和批处理,这两个API允许开发者使用各种操作符合转换来处理数据,包括转换,连接,聚合,窗口等计算任务
在Flink中,根据不同的场景(流处理和批处理),需要设置不同的执行环境,在批处理场景下,需要使用DataSet API,并设置批处理执行环境,在流处理场景下,需要使用DataStream API,并设置流处理执行环境。
以下是在不同场景下设置执行环境的示例代码,分别展示了批处理和流处理的情况,包括scala和java语言
批处理场景-设置DataSet API的批处理执行环境(Java)
import org.apache.flink.api.java.ExecutionEnvironment; public class BatchJobExample { public static void main(String[] args) throws Exception { // 创建批处理执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 在这里添加批处理作业的代码逻辑 // ... } }
流处理场景-设置DataStream API的流处理执行环境(Java)
public class StreamJobExample { public static void main(String[] args) throws Exception { // 创建流处理执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 在这里添加流处理作业的代码逻辑 // ... // 执行作业 env.execute("Stream Job Example"); } }
批处理场景-设置DataSet API的批处理执行环境(Scala)
import org.apache.flink.api.scala._ object BatchJobExample { def main(args: Array[String]): Unit = { // 创建批处理执行环境 val env = ExecutionEnvironment.getExecutionEnvironment // 在这里添加批处理作业的代码逻辑 // ... // 执行作业 env.execute("Batch Job Example") } }
流处理场景-设置DataStream API的批处理执行环境(Scala)
import org.apache.flink.streaming.api.scala._ object StreamJobExample { def main(args: Array[String]): Unit = { // 创建流处理执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 在这里添加流处理作业的代码逻辑 // ... // 执行作业 env.execute("Stream Job Example") } }
以下是一些常用的API函数和操作,以表格形式提供:
API类型 |
常用函数和操作 |
描述 |
DataStream API |
map,flatM | 对数据流中的每个元素进行映射和扁平化操作。 |
filter |
过滤出满足条件的元素 |
|
keyBy |
按指定的字段或键对数据流进行分区 |
|
window |
将数据流按照时间窗口或计数窗口划分 |
|
reduce,fold |
在窗口内对元素进行聚合操作 |
|
union |
合并多个数据流 |
|
connect,coMap,coFlatMap |
连接两个不同类型的数据流并应用相应的函数 |
|
timeWindow,countWindow |
定义时间窗口或计数窗口 |
|
process |
自定义处理函数,实现更复杂的流处理逻辑。 |
DataSet API |
map,flatMap |
对数据流中的每个元素进行映射和扁平化操作 |
filter |
过滤出满足条件的元素 | |
group by |
按指定的字段或键对数据集进行分组 | |
reduce,fold | 对分组后的数据集进行聚合操作 |
|
join,coGroup |
对两个数据集进行内连接和外连接操作 |
|
cross,cartesian |
对两个数据集进行笛卡尔积操作 |
|
distinct |
去除数据集中的重复数据 |
|
groupBy ,aggregate |
分组并对分组后的数据集进行聚合操作 |
|
first,min,max |
获取数据集中的第一个,最小或者最大元素 |
|
sum,avg |
计算数据集中的元素的和或平均值。 |
collect |
将数据集中的元素收集到本地的集合中 |
这些API函数和操作涵盖了Flink中流处理和批处理常见操作,可以帮助用户实现各种复杂的数据处理和分析任务,根据实际需求,可以选择适合的API函数和操作来构建Flink作业。下面是一些常见的API的说明:
map
将DataSet中的每一个元素转换为另外一个元素
示例:使用map操作,将以下数据
"1,张三", "2,李四", "3,王五", "4,赵六"
转换为一个scala的样例类
步骤:
①、获取ExecutionEnvironment环境
②、使用fromCollection构建数据源
③、创建一个User样例类
④、使用map操作执行转换
⑤、打印测试
public class User { public String id; public String name; public User() {} public User(String id, String name) { this.id = id; this.name = name; } @Override public String toString() { return "User(" + id + ", " + name + ")"; } public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> textDataSet = env.fromCollection( Arrays.asList("1,张三", "2,李四", "3,王五", "4,赵六") ); DataSet<User> userDataSet = textDataSet.map(new MapFunction<String, User>() { @Override public User map(String text) throws Exception { String[] fieldArr = text.split(","); return new User(fieldArr[0], fieldArr[1]); } }); userDataSet.print(); } }