01 引言
在前面的博客,我们已经对Flink
的程序模型里的Connectors
使用有了一定的了解了,有兴趣的同学可以参阅下:
- 《Flink教程(01)- Flink知识图谱》
- 《Flink教程(02)- Flink入门》
- 《Flink教程(03)- Flink环境搭建》
- 《Flink教程(04)- Flink入门案例》
- 《Flink教程(05)- Flink原理简单分析》
- 《Flink教程(06)- Flink批流一体API(Source示例)》
- 《Flink教程(07)- Flink批流一体API(Transformation示例)》
- 《Flink教程(08)- Flink批流一体API(Sink示例)》
- 《Flink教程(09)- Flink批流一体API(Connectors示例)》
到此,我们把Flink
批流一体的API
大致学完了,还剩余一些其它API
没有讲,本文来讲解下。
02 累加器
2.1 相关API
Flink累加器:Flink
中的累加器,与Mapreduce counter
的应用场景类似,可以很好地观察task
在运行期间的数据变化,如在Flink job
任务中的算子函数中操作累加器,在任务执行结束之后才能获得累加器的最终结果。
Flink
有以下内置累加器,每个累加器都实现了Accumulator
接口。
- IntCounter
- LongCounter
- DoubleCounter
编码步骤:
1.创建累加器
private IntCounter numLines = new IntCounter(); • 1
2.注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines); • 1
3.使用累加器
this.numLines.add(1); • 1
4.获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num-lines") • 1
2.2 示例代码
/** * 累加器 * * @author : YangLinWei * @createTime: 2022/3/7 5:36 下午 */ public class Accumulator { public static void main(String[] args) throws Exception { //1.env ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //2.Source DataSource<String> dataDS = env.fromElements("aaa", "bbb", "ccc", "ddd"); //3.Transformation MapOperator<String, String> result = dataDS.map(new RichMapFunction<String, String>() { //-1.创建累加器 private IntCounter elementCounter = new IntCounter(); Integer count = 0; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //-2注册累加器 getRuntimeContext().addAccumulator("elementCounter", elementCounter); } @Override public String map(String value) throws Exception { //-3.使用累加器 this.elementCounter.add(1); count += 1; System.out.println("不使用累加器统计的结果:" + count); return value; } }).setParallelism(2); //4.Sink result.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE); //5.execute //-4.获取加强结果 JobExecutionResult jobResult = env.execute(); int nums = jobResult.getAccumulatorResult("elementCounter"); System.out.println("使用累加器统计的结果:" + nums); } }
运行结果:
03 广播变量
3.1 原理
Flink支持广播:可以将数据广播到TaskManager
上就可以供TaskManager
中的SubTask/task
去使用,数据存储到内存中,这样可以减少大量的shuffle
操作,而不需要多次传递给集群节点;
比如:在数据
join
阶段,不可避免的就是大量的shuffle
操作,我们可以把其中一个dataSet
广播出去,一直加载到taskManager
的内存中,可以直接在内存中拿数据,避免了大量的shuffle
,导致集群性能下降;
图解:
- 可以理解广播就是一个公共的共享变量
- 将一个数据集广播后,不同的
Task
都可以在节点上获取到 - 每个节点只存一份
- 如果不使用广播,每一个
Task
都会拷贝一份数据集,造成内存资源浪费
注意:
- 广播变量是要把
dataset
广播到内存中,所以广播的数据量不能太大,否则会出现OOM
; - 广播变量的值不可修改,这样才能确保每个节点获取到的值都是一致的。
编码步骤:
- 广播数据:
.withBroadcastSet(DataSet, "name");
- 获取广播的数据:
Collection<> broadcastSet = getRuntimeContext().getBroadcastVariable("name");
- 使用广播数据
3.2 示例代码
需求:将studentDS
(学号,姓名)集合广播出去(广播到各个TaskManager
内存中)
然后使用scoreDS(
学号,学科,成绩)和广播数据(学号,姓名)进行关联,得到这样格式的数据:(姓名,学科,成绩)
/** * Broadcast * * @author : YangLinWei * @createTime: 2022/3/7 5:43 下午 */ public class Broadcast { public static void main(String[] args) throws Exception { //1.env ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //2.Source //学生数据集(学号,姓名) DataSource<Tuple2<Integer, String>> studentDS = env.fromCollection( Arrays.asList(Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "王五")) ); //成绩数据集(学号,学科,成绩) DataSource<Tuple3<Integer, String, Integer>> scoreDS = env.fromCollection( Arrays.asList(Tuple3.of(1, "语文", 50), Tuple3.of(2, "数学", 70), Tuple3.of(3, "英文", 86)) ); //3.Transformation //将studentDS(学号,姓名)集合广播出去(广播到各个TaskManager内存中) //然后使用scoreDS(学号,学科,成绩)和广播数据(学号,姓名)进行关联,得到这样格式的数据:(姓名,学科,成绩) MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = scoreDS.map( new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() { //定义一集合用来存储(学号,姓名) Map<Integer, String> studentMap = new HashMap<>(); //open方法一般用来初始化资源,每个subtask任务只被调用一次 @Override public void open(Configuration parameters) throws Exception { //-2.获取广播数据 List<Tuple2<Integer, String>> studentList = getRuntimeContext().getBroadcastVariable("studentInfo"); for (Tuple2<Integer, String> tuple : studentList) { studentMap.put(tuple.f0, tuple.f1); } //studentMap = studentList.stream().collect(Collectors.toMap(t -> t.f0, t -> t.f1)); } @Override public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception { //-3.使用广播数据 Integer stuID = value.f0; String stuName = studentMap.getOrDefault(stuID, ""); //返回(姓名,学科,成绩) return Tuple3.of(stuName, value.f1, value.f2); } //-1.广播数据到各个TaskManager }).withBroadcastSet(studentDS, "studentInfo"); //4.Sink result.print(); } }
运行结果:
04 分布式缓存
4.1 原理
Flink
提供了一个类似于Hadoop
的分布式缓存,让并行运行实例的函数可以在本地访问,这个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等。
注意 :广播变量是将变量分发到各个TaskManager
节点的内存上,分布式缓存是将文件缓存到各个TaskManager
节点上;
编码步骤:
- 注册一个分布式缓存文件:`env.registerCachedFile(“hdfs:///path/file”, “cachefilename”)
- 访问分布式缓存文件中的数据:
File myFile = getRuntimeContext().getDistributedCache().getFile("cachefilename");
- 使用
4.2 示例代码
需求:将scoreDS
(学号, 学科, 成绩)中的数据和分布式缓存中的数据(学号,姓名)关联,得到这样格式的数据: (学生姓名,学科,成绩)
/** * DistributedCache * * @author : YangLinWei * @createTime: 2022/3/7 5:49 下午 */ public class DistributedCache { public static void main(String[] args) throws Exception { //1.env ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //2.Source //注意:先将本地资料中的distribute_cache_student文件上传到HDFS //-1.注册分布式缓存文件 //env.registerCachedFile("hdfs://node01:8020/distribute_cache_student", "studentFile"); env.registerCachedFile("data/input/distribute_cache_student", "studentFile"); //成绩数据集(学号,学科,成绩) DataSource<Tuple3<Integer, String, Integer>> scoreDS = env.fromCollection( Arrays.asList(Tuple3.of(1, "语文", 50), Tuple3.of(2, "数学", 70), Tuple3.of(3, "英文", 86)) ); //3.Transformation //将scoreDS(学号, 学科, 成绩)中的数据和分布式缓存中的数据(学号,姓名)关联,得到这样格式的数据: (学生姓名,学科,成绩) MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = scoreDS.map( new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() { //定义一集合用来存储(学号,姓名) Map<Integer, String> studentMap = new HashMap<>(); @Override public void open(Configuration parameters) throws Exception { //-2.加载分布式缓存文件 File file = getRuntimeContext().getDistributedCache().getFile("studentFile"); List<String> studentList = FileUtils.readLines(file); for (String str : studentList) { String[] arr = str.split(","); studentMap.put(Integer.parseInt(arr[0]), arr[1]); } } @Override public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception { //-3.使用分布式缓存文件中的数据 Integer stuID = value.f0; String stuName = studentMap.getOrDefault(stuID, ""); //返回(姓名,学科,成绩) return Tuple3.of(stuName, value.f1, value.f2); } }); //4.Sink result.print(); } }
05 文末
本文主要讲解了Flink
批流一体的其它API
,即累加器、广播和分布式缓存,谢谢大家的阅读,本文完!