Flink教程(10)- Flink批流一体API(其它)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink教程(10)- Flink批流一体API(其它)

01 引言

在前面的博客,我们已经对Flink的程序模型里的Connectors使用有了一定的了解了,有兴趣的同学可以参阅下:

到此,我们把Flink批流一体的API大致学完了,还剩余一些其它API没有讲,本文来讲解下。

02 累加器

2.1 相关API

Flink累加器Flink中的累加器,与Mapreduce counter的应用场景类似,可以很好地观察task在运行期间的数据变化,如在Flink job任务中的算子函数中操作累加器,在任务执行结束之后才能获得累加器的最终结果。

Flink有以下内置累加器,每个累加器都实现了Accumulator接口。

  • IntCounter
  • LongCounter
  • DoubleCounter

编码步骤:

1.创建累加器

private IntCounter numLines = new IntCounter();
• 1

2.注册累加器

getRuntimeContext().addAccumulator("num-lines", this.numLines);
• 1

3.使用累加器

this.numLines.add(1);
• 1

4.获取累加器的结果

myJobExecutionResult.getAccumulatorResult("num-lines")
• 1

2.2 示例代码

/**
 * 累加器
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:36 下午
 */
public class Accumulator {
    public static void main(String[] args) throws Exception {
        //1.env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataSource<String> dataDS = env.fromElements("aaa", "bbb", "ccc", "ddd");
        //3.Transformation
        MapOperator<String, String> result = dataDS.map(new RichMapFunction<String, String>() {
            //-1.创建累加器
            private IntCounter elementCounter = new IntCounter();
            Integer count = 0;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //-2注册累加器
                getRuntimeContext().addAccumulator("elementCounter", elementCounter);
            }
            @Override
            public String map(String value) throws Exception {
                //-3.使用累加器
                this.elementCounter.add(1);
                count += 1;
                System.out.println("不使用累加器统计的结果:" + count);
                return value;
            }
        }).setParallelism(2);
        //4.Sink
        result.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE);
        //5.execute
        //-4.获取加强结果
        JobExecutionResult jobResult = env.execute();
        int nums = jobResult.getAccumulatorResult("elementCounter");
        System.out.println("使用累加器统计的结果:" + nums);
    }
}

运行结果:

03 广播变量

3.1 原理

Flink支持广播:可以将数据广播到TaskManager上就可以供TaskManager中的SubTask/task去使用,数据存储到内存中,这样可以减少大量的shuffle操作,而不需要多次传递给集群节点;

比如:在数据join阶段,不可避免的就是大量的shuffle操作,我们可以把其中一个dataSet广播出去,一直加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降;

图解:

  • 可以理解广播就是一个公共的共享变量
  • 将一个数据集广播后,不同的Task都可以在节点上获取到
  • 每个节点只存一份
  • 如果不使用广播,每一个Task都会拷贝一份数据集,造成内存资源浪费

注意:

  • 广播变量是要把dataset广播到内存中,所以广播的数据量不能太大,否则会出现OOM
  • 广播变量的值不可修改,这样才能确保每个节点获取到的值都是一致的。

编码步骤:

  1. 广播数据:.withBroadcastSet(DataSet, "name");
  2. 获取广播的数据:Collection<> broadcastSet = getRuntimeContext().getBroadcastVariable("name");
  3. 使用广播数据

3.2 示例代码

需求:将studentDS(学号,姓名)集合广播出去(广播到各个TaskManager内存中)

然后使用scoreDS(学号,学科,成绩)和广播数据(学号,姓名)进行关联,得到这样格式的数据:(姓名,学科,成绩)

/**
 * Broadcast
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:43 下午
 */
public class Broadcast {
    public static void main(String[] args) throws Exception {
        //1.env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        //学生数据集(学号,姓名)
        DataSource<Tuple2<Integer, String>> studentDS = env.fromCollection(
                Arrays.asList(Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "王五"))
        );
        //成绩数据集(学号,学科,成绩)
        DataSource<Tuple3<Integer, String, Integer>> scoreDS = env.fromCollection(
                Arrays.asList(Tuple3.of(1, "语文", 50), Tuple3.of(2, "数学", 70), Tuple3.of(3, "英文", 86))
        );
        //3.Transformation
        //将studentDS(学号,姓名)集合广播出去(广播到各个TaskManager内存中)
        //然后使用scoreDS(学号,学科,成绩)和广播数据(学号,姓名)进行关联,得到这样格式的数据:(姓名,学科,成绩)
        MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = scoreDS.map(
                new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() {
                    //定义一集合用来存储(学号,姓名)
                    Map<Integer, String> studentMap = new HashMap<>();
                    //open方法一般用来初始化资源,每个subtask任务只被调用一次
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //-2.获取广播数据
                        List<Tuple2<Integer, String>> studentList = getRuntimeContext().getBroadcastVariable("studentInfo");
                        for (Tuple2<Integer, String> tuple : studentList) {
                            studentMap.put(tuple.f0, tuple.f1);
                        }
                        //studentMap = studentList.stream().collect(Collectors.toMap(t -> t.f0, t -> t.f1));
                    }
                    @Override
                    public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
                        //-3.使用广播数据
                        Integer stuID = value.f0;
                        String stuName = studentMap.getOrDefault(stuID, "");
                        //返回(姓名,学科,成绩)
                        return Tuple3.of(stuName, value.f1, value.f2);
                    }
                    //-1.广播数据到各个TaskManager
                }).withBroadcastSet(studentDS, "studentInfo");
        //4.Sink
        result.print();
    }
}

运行结果:

04 分布式缓存

4.1 原理

Flink提供了一个类似于Hadoop分布式缓存,让并行运行实例的函数可以在本地访问,这个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等。

注意 :广播变量是将变量分发到各个TaskManager节点的内存上,分布式缓存是将文件缓存到各个TaskManager节点上;

编码步骤:

  1. 注册一个分布式缓存文件:`env.registerCachedFile(“hdfs:///path/file”, “cachefilename”)
  2. 访问分布式缓存文件中的数据:File myFile = getRuntimeContext().getDistributedCache().getFile("cachefilename");
  3. 使用

4.2 示例代码

需求:将scoreDS(学号, 学科, 成绩)中的数据和分布式缓存中的数据(学号,姓名)关联,得到这样格式的数据: (学生姓名,学科,成绩)

/**
 * DistributedCache
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:49 下午
 */
public class DistributedCache {
    public static void main(String[] args) throws Exception {
        //1.env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        //注意:先将本地资料中的distribute_cache_student文件上传到HDFS
        //-1.注册分布式缓存文件
        //env.registerCachedFile("hdfs://node01:8020/distribute_cache_student", "studentFile");
        env.registerCachedFile("data/input/distribute_cache_student", "studentFile");
        //成绩数据集(学号,学科,成绩)
        DataSource<Tuple3<Integer, String, Integer>> scoreDS = env.fromCollection(
                Arrays.asList(Tuple3.of(1, "语文", 50), Tuple3.of(2, "数学", 70), Tuple3.of(3, "英文", 86))
        );
        //3.Transformation
        //将scoreDS(学号, 学科, 成绩)中的数据和分布式缓存中的数据(学号,姓名)关联,得到这样格式的数据: (学生姓名,学科,成绩)
        MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = scoreDS.map(
                new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() {
                    //定义一集合用来存储(学号,姓名)
                    Map<Integer, String> studentMap = new HashMap<>();
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //-2.加载分布式缓存文件
                        File file = getRuntimeContext().getDistributedCache().getFile("studentFile");
                        List<String> studentList = FileUtils.readLines(file);
                        for (String str : studentList) {
                            String[] arr = str.split(",");
                            studentMap.put(Integer.parseInt(arr[0]), arr[1]);
                        }
                    }
                    @Override
                    public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
                        //-3.使用分布式缓存文件中的数据
                        Integer stuID = value.f0;
                        String stuName = studentMap.getOrDefault(stuID, "");
                        //返回(姓名,学科,成绩)
                        return Tuple3.of(stuName, value.f1, value.f2);
                    }
                });
        //4.Sink
        result.print();
    }
}

05 文末

本文主要讲解了Flink批流一体的其它API,即累加器、广播和分布式缓存,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
7天前
|
API
车牌号归属地查询免费API接口教程
本接口用于根据车牌号查询社会车辆的归属地,不支持军车、使馆等特殊车牌。请求地址为 `https://cn.apihz.cn/api/other/chepai.php`,支持 POST 和 GET 请求。请求参数包括 `id`、`key` 和 `words`,返回数据包含车牌归属地信息。示例请求:`https://cn.apihz.cn/api/other/chepai.php?id=88888888&key=88888888&words=川B1234`。
41 21
|
5天前
|
API
获取网页重定向地址免费API接口教程
该API用于获取网页重定向跳转后的最终地址。请求地址为`https://cn.apihz.cn/api/wangzhan/tiaozhuan.php`,支持POST或GET方式。请求参数包括`id`、`key`和`url`,返回数据包含状态码`code`和最终URL`url`。示例返回:`{&quot;code&quot;:200,&quot;url&quot;:&quot;https://www.baidu.com/&quot;}`。
47 29
|
10天前
|
API
将秒数转换为时间免费API接口教程
该API用于将指定秒数转换为年、日、时、分、秒。支持指定转换类型。请求地址为 `https://cn.apihz.cn/api/time/stime.php`,需提供ID、密钥、类型和秒数参数。返回结果包含转换后的年、日、时、分、秒等信息。示例请求:`https://cn.apihz.cn/api/time/stime.php?id=88888888&key=88888888&type=1&s=123456`。更多详情见 [文档](https://www.apihz.cn/api/timestime.html)。
将秒数转换为时间免费API接口教程
|
6天前
|
网络协议 API
检测指定TCP端口开放状态免费API接口教程
该API用于检测目标主机指定TCP端口是否开放,适用于检测连通状态等场景。支持指定大陆、美国、香港等检测节点。请求地址为 `https://cn.apihz.cn/api/wangzhan/port.php`,支持POST和GET请求方式。请求参数包括 `id`、`key`、`type`、`host` 和 `port`。返回参数包含检测结果和状态码。示例请求:`https://cn.apihz.cn/api/wangzhan/port.php?id=88888888&key=88888888&type=1&host=49.234.56.78&port=80`。
|
5天前
|
API 数据安全/隐私保护
抖音视频,图集无水印直链解析免费API接口教程
该接口用于解析抖音视频和图集的无水印直链地址。请求地址为 `https://cn.apihz.cn/api/fun/douyin.php`,支持POST或GET请求。请求参数包括用户ID、用户KEY和视频或图集地址。返回参数包括状态码、信息提示、作者昵称、标题、视频地址、封面、图集和类型。示例请求和返回数据详见文档。
|
10天前
|
API
图片压缩+格式转换免费API接口教程
这是一个免费的图片压缩和格式转换API接口,支持GET和POST请求。请求地址为 `https://cn.apihz.cn/api/img/yasuo.php`,需提供 `id`、`key`、`img` 等参数。返回数据包含处理后的图片URL和其他相关信息。更多详情请参考:https://www.apihz.cn/api/imgyasuo.html
|
9天前
|
API
天气预报-腾讯天气-7天-IP查询版免费API接口教程
根据IP地址自动查询该IP归属地7天天气预报的腾讯天气API。请求地址为`https://cn.apihz.cn/api/tianqi/tengxunip.php`,支持GET和POST请求。需提供ID、Key和IP地址作为参数。返回数据包含天气预报信息。
|
8天前
|
前端开发 JavaScript API
取网页纯文本内容免费API接口教程
该API用于获取指定网页的纯文本内容,去除HTML标签、CSS和JS等元素。支持POST和GET请求,需提供ID、Key、URL等参数。请求示例:https://cn.apihz.cn/api/wangzhan/getyuan.php?id=88888888&key=88888888&url=www.apihz.cn&dy=1。返回纯文本数据。
|
1月前
|
API 微服务
Traefik 微服务 API 网关教程(全)
Traefik 微服务 API 网关教程(全)
|
2月前
|
存储 JSON API
实战派教程!Python Web开发中RESTful API的设计哲学与实现技巧,一网打尽!
在数字化时代,Web API成为连接前后端及构建复杂应用的关键。RESTful API因简洁直观而广受欢迎。本文通过实战案例,介绍Python Web开发中的RESTful API设计哲学与技巧,包括使用Flask框架构建一个图书管理系统的API,涵盖资源定义、请求响应设计及实现示例。通过准确使用HTTP状态码、版本控制、错误处理及文档化等技巧,帮助你深入理解RESTful API的设计与实现。希望本文能助力你的API设计之旅。
65 3

热门文章

最新文章