Flink之DataSet语义注释、广播变量、分布式缓存及参数传递

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)语义注释


在 Flink批量数据处理过程中,往往传入函数的对象中可能含有很多字段,其中有些字段是Function计算会用到的,但有些字段在进入 Funciton后并没有参与到实际计算过程中。针对这种情况,Fink提出了语义注解的功能,将这些字段在Function中通过注解的形式标记出来,区分出哪些是需要参与函数计算的字段,哪些是直接输出的字段。Flink Runtime在执行算子过程中,会对注解的字段进行判别,对于不需要函数处理的字段直接转发到Output对象中,以减少数据传输过程中所消耗的网络10或者不必要的排序操作等,以提升整体应用的处理效率。语义标注目的是优化我们的程序


Forwarded Fileds注解

转发字段(Forwarded Fileds)代表数据从Function进入后,对指定为Forwarded的Fileds不进行修改,且不参与函数的计算逻辑,而是根据设定的规则表达式,将Fields直接推送到Output对象中的相同位置或者指定位置上。

package com.aikfk.flink.dataset.semantic;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/9 9:22 下午
 */
public class ForwardedFields {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<String,String,Integer>> csvSource = env
                .readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .types(String.class,String.class,Integer.class);
        csvSource.map(new MyMapFunction()).print();
    }
    @FunctionAnnotation.ForwardedFields("f0 -> f3")
    public static class MyMapFunction implements MapFunction<Tuple3<String,String,Integer>, Tuple4<String,String,Integer,String>>{
        @Override
        public Tuple4<String, String, Integer, String> map(
                Tuple3<String, String, Integer> tuple3) throws Exception {
            return new Tuple4<>(tuple3.f0, tuple3.f1, tuple3.f2,tuple3.f0);
        }
    }
}

运行结果:

(200,jone,6700,200)
(100,lili,8000,100)
(100,alex,15000,100)
(300,cherry,12000,300)
(100,jack,34000,100)
(200,tony,5000,200)
(400,lucy,7800,400)

Non-Forwarded Fileds注解

和前面提到的Forwarded Fileds相反,Non-Forwarded Fileds用于指定不转发的字段,也就是说除了某些字段不转发在输出Tuple相应的位置上,其余字段全部放置在输出Tuple中相同的位置上,对于被Non-Forwarded Fileds指定的字段将必须参与到函数计算过程中,并产生新的结果进行输出。

package com.aikfk.flink.dataset.semantic;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/9 9:22 下午
 */
public class NonForwardedFields {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<String,String,Integer>> csvSource = env
                .readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .types(String.class,String.class,Integer.class);
        csvSource.map(new MyMapFunction()).print();
    }
    @FunctionAnnotation.NonForwardedFields("f2")
    public static class MyMapFunction implements MapFunction<Tuple3<String,String,Integer>, Tuple3<String,String,Integer>>{
        @Override
        public Tuple3<String, String, Integer> map(
                Tuple3<String, String, Integer> tuple3) throws Exception {
            return new Tuple3<>(tuple3.f0, tuple3.f1, tuple3.f2 * 2);
        }
    }
}

运行结果:

(200,tony,10000)
(300,cherry,24000)
(100,lili,16000)
(100,jack,68000)
(200,jone,13400)
(100,alex,30000)
(400,lucy,15600)

Read Fileds注解

读取字段(Read Fileds)注解用来指定Function中需要读取以及参与函数计算的字段,在注解中被指定的字段将全部参与当前函数结果的运算过程,如条件判断、数值计算等。

package com.aikfk.flink.dataset.semantic;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/9 9:22 下午
 */
public class ReadFields {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<String,String,Integer>> csvSource = env
                .readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .types(String.class,String.class,Integer.class);
        csvSource.map(new MyMapFunction()).print();
    }
    @FunctionAnnotation.ReadFields("f0;f1;f2")
    public static class MyMapFunction implements MapFunction<Tuple3<String,String,Integer>, Tuple3<String,String,Integer>>{
        @Override
        public Tuple3<String, String, Integer> map(
                Tuple3<String, String, Integer> tuple3) throws Exception {
            return new Tuple3<>(tuple3.f0, tuple3.f1, tuple3.f2 * 2);
        }
    }
}

运行结果:

(200,tony,10000)
(100,lili,16000)
(100,alex,30000)
(300,cherry,24000)
(200,jone,13400)
(100,jack,68000)
(400,lucy,15600)


(2)广播变量


广播变量是分布式计算框架中经常会用到的一种数据共享方式,目的是对小数据集采用网络传输的方式,在每个并行的计算节点的实例内存中存储一份该数据集,所在的计算节点实例均可以在本地内存中直接读取被广播的数据集,这样能够避免在数据计算过程中多次通过远程的方式从其他节点中读取小数据集,从而提升整体任务的计算性能


注意:


广播出去的变量是存在于每个节点的内存中的,所以,这个数据量不宜太大,因为广播出去的数据会常驻内存,除非程序停止。

广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。

建议在数据集大几十兆或者几百兆的大小的时候进行广播,如果数据上G,就不建议采用广播变量。


广播: 广播变量通过withBroadcastSet(DataSet, String)方法进行注册。

访问: 可通过getRuntimeContext().getBroadcastVariable(String)对变量进行访问。

package com.aikfk.flink.dataset.broadcast;
import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import java.util.List;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 2:04 下午
 */
public class BroadcastExample {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 读取csv数据(方式一:映射POJO类对象)
        DataSet<EmployeePOJO> dataSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class,"deptId","name","salary");
        DataSet<EmployeePOJO> broadcastDataSet = env.fromElements(new EmployeePOJO("100","alex",12000));
        /**
         * EmployeePOJO -> filter() -> 过滤出广播变量
         */
        DataSet<EmployeePOJO> filterSource =  dataSource.filter(new RichFilterFunction<EmployeePOJO>() {
            List<EmployeePOJO> broadList = null;
            @Override
            public boolean filter(EmployeePOJO employeePOJO) throws Exception {
                for (EmployeePOJO broad : broadList){
                    return ! employeePOJO.name.equals(broad.name);
                }
                return false;
            }
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 对广播变量进行访问
                broadList = getRuntimeContext().getBroadcastVariable("broadcastDataSet");
            }
            // 注册广播变量
        }).withBroadcastSet(broadcastDataSet,"broadcastDataSet");
        filterSource.print();
    }
}

运行结果:

EmployeePOJO{deptId='200', name='tony', salary=5000}
EmployeePOJO{deptId='300', name='cherry', salary=12000}
EmployeePOJO{deptId='400', name='lucy', salary=7800}
EmployeePOJO{deptId='200', name='jone', salary=6700}
EmployeePOJO{deptId='100', name='jack', salary=34000}
EmployeePOJO{deptId='100', name='lili', salary=8000}

(3)分布式缓存


在批计算中,需要处理的数据集大部分来自于文件,对于某些文件尽管是放在类于HDFS之上的分布式文件系统中,但由于Flink并不像MapReduce一样让计算随着数据所在位置上进行,因此多数情况下会出现通过网络频繁地复制文件的情况。因此对于有些高频使用的文件可以通过分布式缓存的方式,将其放置在每台计算节点实例的本地Task内存中,这样就能够避免因为读取某些文件而必须通过网络远程获取文件的情况进而提升整个任务的执行效率。

分存式缓存在 Execution Environment中直接注册文件或文件夹,Flink在启动任务的程中将会把指定的文件同步到task所在计算节点的本地文件系统中,目前支持本地文件、HDFS、S3等文件系统,另外可以通过 Boolean参数来指定文件是否可执行。


分布式缓存的用法 一 注册缓存文件:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

分布式缓存的用法 一 访问缓存文件:

// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {
    @Override
    public void open(Configuration config) {
      // access cached file via RuntimeContext and DistributedCache
      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
      // read the file (or navigate the directory)
      ...
    }
    @Override
    public Integer map(String value) throws Exception {
      // use content of cached file
      ...
    }
}


(4)参数传递


(1)通过构造函数传递

DataSet<Integer> toFilter = env.fromElements(1, 2, 3);
toFilter.filter(new MyFilter(2));
private static class MyFilter implements FilterFunction<Integer> {
  private final int limit;
  public MyFilter(int limit) {
    this.limit = limit;
  }
  @Override
  public boolean filter(Integer value) throws Exception {
    return value > limit;
  }
}

(2)通过withParameters(Configuration)函数传递

package com.aikfk.flink.dataset.parameters;
import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 2:04 下午
 */
public class PassingParameters {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 读取csv数据(方式一:映射POJO类对象)
        DataSet<EmployeePOJO> dataSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class,"deptId","name","salary");
        // 设置参数
        Configuration config = new Configuration();
        config.setInteger("limit", 10000);
        /**
         * EmployeePOJO -> filter()
         */
        DataSet<EmployeePOJO> filterSource =  dataSource.filter(new RichFilterFunction<EmployeePOJO>() {
            public int limit;
            @Override
            public boolean filter(EmployeePOJO employeePOJO) throws Exception {
                // 过滤出比参数1000大的值
                return employeePOJO.salary > limit;
            }
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                limit = parameters.getInteger("limit",0);
            }
            // 通过withParameters(config)传入参数
        }).withParameters(config);
        filterSource.print();
    }
}

运行结果:

EmployeePOJO{deptId='300', name='cherry', salary=12000}
EmployeePOJO{deptId='100', name='jack', salary=34000}
EmployeePOJO{deptId='100', name='alex', salary=15000}

(3)通过全局自定义配置


Flink允许将自定义配置值传递到ExecutionConfig,以供自定义配置全局可用。


注意:

GlobalConfig与params的区别在于,后者只是在局部代码中设置的Config,并没有影响到env,所以在代码的后面要加上withParameters (config),但是GlobalConfig不同,它改变了env的环境里面的config。所以,在代码后面,不需要显式的指定加上config.

package com.aikfk.flink.dataset.parameters;
import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 2:04 下午
 */
public class GlobalParameters {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 读取csv数据(方式一:映射POJO类对象)
        DataSet<EmployeePOJO> dataSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class,"deptId","name","salary");
        // 设置全局参数
        Configuration config = new Configuration();
        config.setInteger("limit", 10000);
        env.getConfig().setGlobalJobParameters(config);
        /**
         * EmployeePOJO -> filter()
         */
        DataSet<EmployeePOJO> filterSource =  dataSource.filter(new RichFilterFunction<EmployeePOJO>() {
            public int limit;
            @Override
            public boolean filter(EmployeePOJO employeePOJO) throws Exception {
                // 过滤出比参数1000大的值
                return employeePOJO.salary > limit;
            }
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 传入全局参数
                ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
                Configuration globConf = (Configuration) globalParams;
                limit = globConf.getInteger("limit", 0);
            }
        });
        filterSource.print();
    }
}

运行结果:

EmployeePOJO{deptId='300', name='cherry', salary=12000}
EmployeePOJO{deptId='100', name='jack', salary=34000}
EmployeePOJO{deptId='100', name='alex', salary=15000}




相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
数据处理 Apache 流计算
Flink Watermark和时间语义
Flink Watermark和时间语义
27 2
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
482 5
|
4天前
|
Apache 流计算
【Flink】Flink的三种时间语义
【4月更文挑战第19天】【Flink】Flink的三种时间语义
|
6月前
|
程序员 API 数据安全/隐私保护
Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)
Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)
|
1天前
|
存储 NoSQL 分布式数据库
【Flink】Flink分布式快照的原理是什么?
【4月更文挑战第21天】【Flink】Flink分布式快照的原理是什么?
|
1月前
|
SQL 关系型数据库 MySQL
Flink CDC产品常见问题之读分布式mysql报连接超时如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
SQL 分布式计算 HIVE
基于 Kyuubi 实现分布式 Flink SQL 网关
本文整理自网易互娱资深开发工程师、Apache Kyuubi Committer 林小铂的《基于 Kyuubi 实现分布式 Flink SQL 网关》分享。
104456 64
基于 Kyuubi 实现分布式 Flink SQL 网关
|
3月前
|
存储 SQL 分布式数据库
OceanBase X Flink 基于原生分布式数据库构建实时计算解决方案
OceanBase X Flink 基于原生分布式数据库构建实时计算解决方案
|
3月前
|
消息中间件 Java Kafka
Flink中的DataStream和DataSet有什么区别?请解释其概念和用途。
Flink中的DataStream和DataSet有什么区别?请解释其概念和用途。
36 0
|
3月前
|
Java 数据处理 分布式数据库
Flink中的Exactly-Once语义是什么?请解释其作用和实现原理。
Flink中的Exactly-Once语义是什么?请解释其作用和实现原理。
25 0

热门文章

最新文章