(1) Semantic Annotations
In Flink batch processing, the objects passed into a function often contain many fields. Some of these fields are used in the Function's computation, while others take no part in the actual computation once inside the Function. For this situation, Flink provides semantic annotations: these fields are marked on the Function through annotations, distinguishing the fields that participate in the computation from the fields that are simply passed through to the output. While executing an operator, the Flink runtime inspects the annotated fields and forwards those that need no processing directly to the Output object, reducing the network I/O consumed during data transfer and avoiding unnecessary sort operations, thereby improving the overall processing efficiency of the application. In short, the purpose of semantic annotations is to help Flink optimize our programs.
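All of the examples in this section read the same employee.csv file. Its exact contents are not shown here; reconstructed from the printed results below, the file would look roughly like this (columns: deptId, name, salary; row order may differ):

```csv
100,lili,8000
100,alex,15000
100,jack,34000
200,tony,5000
200,jone,6700
300,cherry,12000
400,lucy,7800
```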
Forwarded Fields annotation
Forwarded fields indicate that once data enters the Function, the fields marked as forwarded are not modified and do not participate in the function's computation logic; instead, according to the configured rule expression, they are pushed directly to the same position, or to a specified position, in the Output object.
package com.aikfk.flink.dataset.semantic;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;

/**
 * @author caizhengjie
 * @date 2021/3/9 9:22 PM
 */
public class ForwardedFields {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple3<String, String, Integer>> csvSource = env
                .readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .types(String.class, String.class, Integer.class);

        csvSource.map(new MyMapFunction()).print();
    }

    // Input field f0 is copied unchanged to position f3 of the output
    @FunctionAnnotation.ForwardedFields("f0 -> f3")
    public static class MyMapFunction implements
            MapFunction<Tuple3<String, String, Integer>, Tuple4<String, String, Integer, String>> {
        @Override
        public Tuple4<String, String, Integer, String> map(Tuple3<String, String, Integer> tuple3) throws Exception {
            return new Tuple4<>(tuple3.f0, tuple3.f1, tuple3.f2, tuple3.f0);
        }
    }
}
Output:

(200,jone,6700,200)
(100,lili,8000,100)
(100,alex,15000,100)
(300,cherry,12000,300)
(100,jack,34000,100)
(200,tony,5000,200)
(400,lucy,7800,400)
Non-Forwarded Fields annotation
In contrast to the forwarded fields described above, Non-Forwarded Fields specifies the fields that are not forwarded: all fields except the listed ones are placed at the same positions in the output tuple, while the fields named in the annotation must participate in the function's computation and produce new values in the output.
package com.aikfk.flink.dataset.semantic;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3;

/**
 * @author caizhengjie
 * @date 2021/3/9 9:22 PM
 */
public class NonForwardedFields {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple3<String, String, Integer>> csvSource = env
                .readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .types(String.class, String.class, Integer.class);

        csvSource.map(new MyMapFunction()).print();
    }

    // f2 is modified by the function; all other fields keep their positions
    @FunctionAnnotation.NonForwardedFields("f2")
    public static class MyMapFunction implements
            MapFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> {
        @Override
        public Tuple3<String, String, Integer> map(Tuple3<String, String, Integer> tuple3) throws Exception {
            return new Tuple3<>(tuple3.f0, tuple3.f1, tuple3.f2 * 2);
        }
    }
}
Output:

(200,tony,10000)
(300,cherry,24000)
(100,lili,16000)
(100,jack,68000)
(200,jone,13400)
(100,alex,30000)
(400,lucy,15600)
Read Fields annotation
The Read Fields annotation specifies the fields that the Function reads and evaluates: all fields listed in the annotation participate in computing the function's result, for example in conditional checks or arithmetic.
package com.aikfk.flink.dataset.semantic;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3;

/**
 * @author caizhengjie
 * @date 2021/3/9 9:22 PM
 */
public class ReadFields {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple3<String, String, Integer>> csvSource = env
                .readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .types(String.class, String.class, Integer.class);

        csvSource.map(new MyMapFunction()).print();
    }

    // All three fields are read and may influence the result
    @FunctionAnnotation.ReadFields("f0;f1;f2")
    public static class MyMapFunction implements
            MapFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> {
        @Override
        public Tuple3<String, String, Integer> map(Tuple3<String, String, Integer> tuple3) throws Exception {
            return new Tuple3<>(tuple3.f0, tuple3.f1, tuple3.f2 * 2);
        }
    }
}
Output:

(200,tony,10000)
(100,lili,16000)
(100,alex,30000)
(300,cherry,24000)
(200,jone,13400)
(100,jack,68000)
(400,lucy,15600)
(2) Broadcast Variables
Broadcast variables are a data-sharing mechanism commonly used in distributed computing frameworks. The idea is to ship a small dataset over the network once and keep a copy of it in the memory of every parallel computing node instance, so that each instance can read the broadcast dataset directly from local memory. This avoids repeatedly fetching the small dataset from remote nodes during computation, improving the overall performance of the job.
Note:

A broadcast variable lives in the memory of every node, so the dataset should not be too large: the broadcast data stays resident in memory until the program stops.

A broadcast variable does not support modification after it has been initialized and broadcast; this guarantees that every node sees identical data.

Broadcasting is recommended for datasets of tens to a few hundred megabytes; for datasets in the gigabyte range, broadcast variables are not advisable.
Broadcast: a broadcast variable is registered via the withBroadcastSet(DataSet, String) method.

Access: the variable can be accessed via getRuntimeContext().getBroadcastVariable(String).
package com.aikfk.flink.dataset.broadcast;

import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

import java.util.List;

/**
 * @author caizhengjie
 * @date 2021/3/10 2:04 PM
 */
public class BroadcastExample {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the CSV data (approach 1: map rows to a POJO class)
        DataSet<EmployeePOJO> dataSource = env
                .readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class, "deptId", "name", "salary");

        DataSet<EmployeePOJO> broadcastDataSet = env.fromElements(new EmployeePOJO("100", "alex", 12000));

        /**
         * EmployeePOJO -> filter() -> drop the employees contained in the broadcast variable
         */
        DataSet<EmployeePOJO> filterSource = dataSource.filter(new RichFilterFunction<EmployeePOJO>() {

            List<EmployeePOJO> broadList = null;

            @Override
            public boolean filter(EmployeePOJO employeePOJO) throws Exception {
                // Keep only employees whose name does not appear in the broadcast dataset
                for (EmployeePOJO broad : broadList) {
                    if (employeePOJO.name.equals(broad.name)) {
                        return false;
                    }
                }
                return true;
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Access the broadcast variable
                broadList = getRuntimeContext().getBroadcastVariable("broadcastDataSet");
            }
            // Register the broadcast variable
        }).withBroadcastSet(broadcastDataSet, "broadcastDataSet");

        filterSource.print();
    }
}
Output:

EmployeePOJO{deptId='200', name='tony', salary=5000}
EmployeePOJO{deptId='300', name='cherry', salary=12000}
EmployeePOJO{deptId='400', name='lucy', salary=7800}
EmployeePOJO{deptId='200', name='jone', salary=6700}
EmployeePOJO{deptId='100', name='jack', salary=34000}
EmployeePOJO{deptId='100', name='lili', salary=8000}
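The broadcast and parameter-passing examples import an EmployeePOJO class whose source is not shown. A minimal sketch consistent with the pojoType(...) field list, the constructor call, and the printed toString output might look like this (field names and types are inferred, not taken from the original source):

```java
// Hypothetical reconstruction of com.aikfk.flink.base.EmployeePOJO
public class EmployeePOJO {

    // Flink POJO rules: a public no-argument constructor and public fields
    public String deptId;
    public String name;
    public int salary;

    public EmployeePOJO() {
    }

    public EmployeePOJO(String deptId, String name, int salary) {
        this.deptId = deptId;
        this.name = name;
        this.salary = salary;
    }

    @Override
    public String toString() {
        return "EmployeePOJO{deptId='" + deptId + "', name='" + name + "', salary=" + salary + "}";
    }
}
```

The no-argument constructor and public fields matter: Flink's CSV reader instantiates the POJO reflectively and then assigns the fields named in pojoType(...).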
(3) Distributed Cache
In batch processing, most of the data to be processed comes from files. Even when those files live on a distributed file system such as HDFS, Flink, unlike MapReduce, does not move the computation to where the data resides, so in many cases files end up being copied repeatedly over the network. For frequently used files, the distributed cache can place a copy in the local file system of every computing node instance, accessible to each Task, which avoids fetching those files remotely over the network and thus improves the execution efficiency of the whole job.
The distributed cache registers files or directories directly on the ExecutionEnvironment. When the job starts, Flink copies the specified files to the local file system of the nodes where the tasks run. Local files, HDFS, S3 and other file systems are currently supported, and a Boolean parameter can mark a file as executable.
Using the distributed cache, step 1: register the cached files.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true);

// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();
Using the distributed cache, step 2: access the cached files.
// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration config) {
        // access the cached file via the RuntimeContext and DistributedCache
        File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
        // read the file (or navigate the directory)
        ...
    }

    @Override
    public Integer map(String value) throws Exception {
        // use the content of the cached file
        ...
    }
}
(4) Parameter Passing
(1) Passing parameters via the constructor
DataSet<Integer> toFilter = env.fromElements(1, 2, 3);
toFilter.filter(new MyFilter(2));

private static class MyFilter implements FilterFunction<Integer> {

    private final int limit;

    public MyFilter(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean filter(Integer value) throws Exception {
        return value > limit;
    }
}
(2) Passing parameters via withParameters(Configuration)
package com.aikfk.flink.dataset.parameters;

import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

/**
 * @author caizhengjie
 * @date 2021/3/10 2:04 PM
 */
public class PassingParameters {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the CSV data (approach 1: map rows to a POJO class)
        DataSet<EmployeePOJO> dataSource = env
                .readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class, "deptId", "name", "salary");

        // Set the parameter
        Configuration config = new Configuration();
        config.setInteger("limit", 10000);

        /**
         * EmployeePOJO -> filter()
         */
        DataSet<EmployeePOJO> filterSource = dataSource.filter(new RichFilterFunction<EmployeePOJO>() {

            public int limit;

            @Override
            public boolean filter(EmployeePOJO employeePOJO) throws Exception {
                // Keep salaries greater than the limit parameter (10000)
                return employeePOJO.salary > limit;
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                limit = parameters.getInteger("limit", 0);
            }
            // Pass the parameters in via withParameters(config)
        }).withParameters(config);

        filterSource.print();
    }
}
Output:

EmployeePOJO{deptId='300', name='cherry', salary=12000}
EmployeePOJO{deptId='100', name='jack', salary=34000}
EmployeePOJO{deptId='100', name='alex', salary=15000}
(3) Via a global custom configuration

Flink allows custom configuration values to be passed into the ExecutionConfig, making the custom configuration available globally.
Note:

The difference between the global config and withParameters is that the latter is a Configuration set only locally for a single operator and does not affect the env, which is why withParameters(config) must be appended to the operator; the global config, by contrast, changes the config inside the env's ExecutionConfig, so no config has to be attached to the operator explicitly.
package com.aikfk.flink.dataset.parameters;

import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

/**
 * @author caizhengjie
 * @date 2021/3/10 2:04 PM
 */
public class GlobalParameters {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the CSV data (approach 1: map rows to a POJO class)
        DataSet<EmployeePOJO> dataSource = env
                .readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class, "deptId", "name", "salary");

        // Set the global parameters
        Configuration config = new Configuration();
        config.setInteger("limit", 10000);
        env.getConfig().setGlobalJobParameters(config);

        /**
         * EmployeePOJO -> filter()
         */
        DataSet<EmployeePOJO> filterSource = dataSource.filter(new RichFilterFunction<EmployeePOJO>() {

            public int limit;

            @Override
            public boolean filter(EmployeePOJO employeePOJO) throws Exception {
                // Keep salaries greater than the limit parameter (10000)
                return employeePOJO.salary > limit;
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Read the global parameters
                ExecutionConfig.GlobalJobParameters globalParams =
                        getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
                Configuration globConf = (Configuration) globalParams;
                limit = globConf.getInteger("limit", 0);
            }
        });

        filterSource.print();
    }
}
Output:

EmployeePOJO{deptId='300', name='cherry', salary=12000}
EmployeePOJO{deptId='100', name='jack', salary=34000}
EmployeePOJO{deptId='100', name='alex', salary=15000}