Flink之DataSet语义注释、广播变量、分布式缓存及参数传递

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)语义注释


在 Flink批量数据处理过程中,往往传入函数的对象中可能含有很多字段,其中有些字段是Function计算会用到的,但有些字段在进入 Funciton后并没有参与到实际计算过程中。针对这种情况,Fink提出了语义注解的功能,将这些字段在Function中通过注解的形式标记出来,区分出哪些是需要参与函数计算的字段,哪些是直接输出的字段。Flink Runtime在执行算子过程中,会对注解的字段进行判别,对于不需要函数处理的字段直接转发到Output对象中,以减少数据传输过程中所消耗的网络10或者不必要的排序操作等,以提升整体应用的处理效率。语义标注目的是优化我们的程序


Forwarded Fileds注解

转发字段(Forwarded Fileds)代表数据从Function进入后,对指定为Forwarded的Fileds不进行修改,且不参与函数的计算逻辑,而是根据设定的规则表达式,将Fields直接推送到Output对象中的相同位置或者指定位置上。

package com.aikfk.flink.dataset.semantic;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/9 9:22 下午
 */
public class ForwardedFields {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<String,String,Integer>> csvSource = env
                .readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .types(String.class,String.class,Integer.class);
        csvSource.map(new MyMapFunction()).print();
    }
    @FunctionAnnotation.ForwardedFields("f0 -> f3")
    public static class MyMapFunction implements MapFunction<Tuple3<String,String,Integer>, Tuple4<String,String,Integer,String>>{
        @Override
        public Tuple4<String, String, Integer, String> map(
                Tuple3<String, String, Integer> tuple3) throws Exception {
            return new Tuple4<>(tuple3.f0, tuple3.f1, tuple3.f2,tuple3.f0);
        }
    }
}

运行结果:

(200,jone,6700,200)
(100,lili,8000,100)
(100,alex,15000,100)
(300,cherry,12000,300)
(100,jack,34000,100)
(200,tony,5000,200)
(400,lucy,7800,400)

Non-Forwarded Fileds注解

和前面提到的Forwarded Fileds相反,Non-Forwarded Fileds用于指定不转发的字段,也就是说除了某些字段不转发在输出Tuple相应的位置上,其余字段全部放置在输出Tuple中相同的位置上,对于被Non-Forwarded Fileds指定的字段将必须参与到函数计算过程中,并产生新的结果进行输出。

package com.aikfk.flink.dataset.semantic;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/9 9:22 下午
 */
public class NonForwardedFields {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<String,String,Integer>> csvSource = env
                .readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .types(String.class,String.class,Integer.class);
        csvSource.map(new MyMapFunction()).print();
    }
    @FunctionAnnotation.NonForwardedFields("f2")
    public static class MyMapFunction implements MapFunction<Tuple3<String,String,Integer>, Tuple3<String,String,Integer>>{
        @Override
        public Tuple3<String, String, Integer> map(
                Tuple3<String, String, Integer> tuple3) throws Exception {
            return new Tuple3<>(tuple3.f0, tuple3.f1, tuple3.f2 * 2);
        }
    }
}

运行结果:

(200,tony,10000)
(300,cherry,24000)
(100,lili,16000)
(100,jack,68000)
(200,jone,13400)
(100,alex,30000)
(400,lucy,15600)

Read Fileds注解

读取字段(Read Fileds)注解用来指定Function中需要读取以及参与函数计算的字段,在注解中被指定的字段将全部参与当前函数结果的运算过程,如条件判断、数值计算等。

package com.aikfk.flink.dataset.semantic;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/9 9:22 下午
 */
public class ReadFields {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<String,String,Integer>> csvSource = env
                .readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .types(String.class,String.class,Integer.class);
        csvSource.map(new MyMapFunction()).print();
    }
    @FunctionAnnotation.ReadFields("f0;f1;f2")
    public static class MyMapFunction implements MapFunction<Tuple3<String,String,Integer>, Tuple3<String,String,Integer>>{
        @Override
        public Tuple3<String, String, Integer> map(
                Tuple3<String, String, Integer> tuple3) throws Exception {
            return new Tuple3<>(tuple3.f0, tuple3.f1, tuple3.f2 * 2);
        }
    }
}

运行结果:

(200,tony,10000)
(100,lili,16000)
(100,alex,30000)
(300,cherry,24000)
(200,jone,13400)
(100,jack,68000)
(400,lucy,15600)


(2)广播变量


广播变量是分布式计算框架中经常会用到的一种数据共享方式,目的是对小数据集采用网络传输的方式,在每个并行的计算节点的实例内存中存储一份该数据集,所在的计算节点实例均可以在本地内存中直接读取被广播的数据集,这样能够避免在数据计算过程中多次通过远程的方式从其他节点中读取小数据集,从而提升整体任务的计算性能


注意:


广播出去的变量是存在于每个节点的内存中的,所以,这个数据量不宜太大,因为广播出去的数据会常驻内存,除非程序停止。

广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。

建议在数据集大几十兆或者几百兆的大小的时候进行广播,如果数据上G,就不建议采用广播变量。


广播: 广播变量通过withBroadcastSet(DataSet, String)方法进行注册。

访问: 可通过getRuntimeContext().getBroadcastVariable(String)对变量进行访问。

package com.aikfk.flink.dataset.broadcast;
import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import java.util.List;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 2:04 下午
 */
public class BroadcastExample {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 读取csv数据(方式一:映射POJO类对象)
        DataSet<EmployeePOJO> dataSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class,"deptId","name","salary");
        DataSet<EmployeePOJO> broadcastDataSet = env.fromElements(new EmployeePOJO("100","alex",12000));
        /**
         * EmployeePOJO -> filter() -> 过滤出广播变量
         */
        DataSet<EmployeePOJO> filterSource =  dataSource.filter(new RichFilterFunction<EmployeePOJO>() {
            List<EmployeePOJO> broadList = null;
            @Override
            public boolean filter(EmployeePOJO employeePOJO) throws Exception {
                for (EmployeePOJO broad : broadList){
                    return ! employeePOJO.name.equals(broad.name);
                }
                return false;
            }
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 对广播变量进行访问
                broadList = getRuntimeContext().getBroadcastVariable("broadcastDataSet");
            }
            // 注册广播变量
        }).withBroadcastSet(broadcastDataSet,"broadcastDataSet");
        filterSource.print();
    }
}

运行结果:

EmployeePOJO{deptId='200', name='tony', salary=5000}
EmployeePOJO{deptId='300', name='cherry', salary=12000}
EmployeePOJO{deptId='400', name='lucy', salary=7800}
EmployeePOJO{deptId='200', name='jone', salary=6700}
EmployeePOJO{deptId='100', name='jack', salary=34000}
EmployeePOJO{deptId='100', name='lili', salary=8000}

(3)分布式缓存


在批计算中,需要处理的数据集大部分来自于文件,对于某些文件尽管是放在类于HDFS之上的分布式文件系统中,但由于Flink并不像MapReduce一样让计算随着数据所在位置上进行,因此多数情况下会出现通过网络频繁地复制文件的情况。因此对于有些高频使用的文件可以通过分布式缓存的方式,将其放置在每台计算节点实例的本地Task内存中,这样就能够避免因为读取某些文件而必须通过网络远程获取文件的情况进而提升整个任务的执行效率。

分存式缓存在 Execution Environment中直接注册文件或文件夹,Flink在启动任务的程中将会把指定的文件同步到task所在计算节点的本地文件系统中,目前支持本地文件、HDFS、S3等文件系统,另外可以通过 Boolean参数来指定文件是否可执行。


分布式缓存的用法 一 注册缓存文件:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

分布式缓存的用法 一 访问缓存文件:

// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {
    @Override
    public void open(Configuration config) {
      // access cached file via RuntimeContext and DistributedCache
      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
      // read the file (or navigate the directory)
      ...
    }
    @Override
    public Integer map(String value) throws Exception {
      // use content of cached file
      ...
    }
}


(4)参数传递


(1)通过构造函数传递

DataSet<Integer> toFilter = env.fromElements(1, 2, 3);
toFilter.filter(new MyFilter(2));
private static class MyFilter implements FilterFunction<Integer> {
  private final int limit;
  public MyFilter(int limit) {
    this.limit = limit;
  }
  @Override
  public boolean filter(Integer value) throws Exception {
    return value > limit;
  }
}

(2)通过withParameters(Configuration)函数传递

package com.aikfk.flink.dataset.parameters;
import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 2:04 下午
 */
public class PassingParameters {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 读取csv数据(方式一:映射POJO类对象)
        DataSet<EmployeePOJO> dataSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class,"deptId","name","salary");
        // 设置参数
        Configuration config = new Configuration();
        config.setInteger("limit", 10000);
        /**
         * EmployeePOJO -> filter()
         */
        DataSet<EmployeePOJO> filterSource =  dataSource.filter(new RichFilterFunction<EmployeePOJO>() {
            public int limit;
            @Override
            public boolean filter(EmployeePOJO employeePOJO) throws Exception {
                // 过滤出比参数1000大的值
                return employeePOJO.salary > limit;
            }
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                limit = parameters.getInteger("limit",0);
            }
            // 通过withParameters(config)传入参数
        }).withParameters(config);
        filterSource.print();
    }
}

运行结果:

EmployeePOJO{deptId='300', name='cherry', salary=12000}
EmployeePOJO{deptId='100', name='jack', salary=34000}
EmployeePOJO{deptId='100', name='alex', salary=15000}

(3)通过全局自定义配置


Flink允许将自定义配置值传递到ExecutionConfig,以供自定义配置全局可用。


注意:

GlobalConfig与params的区别在于,后者只是在局部代码中设置的Config,并没有影响到env,所以在代码的后面要加上withParameters (config),但是GlobalConfig不同,它改变了env的环境里面的config。所以,在代码后面,不需要显式的指定加上config.

package com.aikfk.flink.dataset.parameters;
import com.aikfk.flink.base.EmployeePOJO;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 2:04 下午
 */
public class GlobalParameters {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 读取csv数据(方式一:映射POJO类对象)
        DataSet<EmployeePOJO> dataSource = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")
                .pojoType(EmployeePOJO.class,"deptId","name","salary");
        // 设置全局参数
        Configuration config = new Configuration();
        config.setInteger("limit", 10000);
        env.getConfig().setGlobalJobParameters(config);
        /**
         * EmployeePOJO -> filter()
         */
        DataSet<EmployeePOJO> filterSource =  dataSource.filter(new RichFilterFunction<EmployeePOJO>() {
            public int limit;
            @Override
            public boolean filter(EmployeePOJO employeePOJO) throws Exception {
                // 过滤出比参数1000大的值
                return employeePOJO.salary > limit;
            }
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 传入全局参数
                ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
                Configuration globConf = (Configuration) globalParams;
                limit = globConf.getInteger("limit", 0);
            }
        });
        filterSource.print();
    }
}

运行结果:

EmployeePOJO{deptId='300', name='cherry', salary=12000}
EmployeePOJO{deptId='100', name='jack', salary=34000}
EmployeePOJO{deptId='100', name='alex', salary=15000}




相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8月前
|
数据处理 Apache 流计算
Flink Watermark和时间语义
Flink Watermark和时间语义
79 2
|
8月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
791 5
|
8月前
|
存储 缓存 编译器
【CMake 命令相关知识】深入理解 CMake命令中的 内置缓存变量
【CMake 命令相关知识】深入理解 CMake命令中的 内置缓存变量
151 0
|
缓存 NoSQL Java
Java实现redis缓存效果变量过期
Java实现redis缓存效果变量过期
109 0
|
8月前
|
Apache 流计算
【Flink】Flink的三种时间语义
【4月更文挑战第19天】【Flink】Flink的三种时间语义
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
113 3
|
程序员 API 数据安全/隐私保护
Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)
Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)
|
5月前
|
存储 Java 流计算
Flink 分布式快照,神秘机制背后究竟隐藏着怎样的惊人奥秘?快来一探究竟!
【8月更文挑战第26天】Flink是一款开源框架,支持有状态流处理与批处理任务。其核心功能之一为分布式快照,通过“检查点(Checkpoint)”机制确保系统能在故障发生时从最近的一致性状态恢复,实现可靠容错。Flink通过JobManager触发检查点,各节点暂停接收新数据并保存当前状态至稳定存储(如HDFS)。采用“异步屏障快照(Asynchronous Barrier Snapshotting)”技术,插入特殊标记“屏障(Barrier)”随数据流传播,在不影响整体流程的同时高效完成状态保存。例如可在Flink中设置每1000毫秒进行一次检查点并指定存储位置。
92 0
|
8月前
|
SQL 分布式计算 HIVE
基于 Kyuubi 实现分布式 Flink SQL 网关
本文整理自网易互娱资深开发工程师、Apache Kyuubi Committer 林小铂的《基于 Kyuubi 实现分布式 Flink SQL 网关》分享。
104803 64
基于 Kyuubi 实现分布式 Flink SQL 网关
|
6月前
|
存储 缓存 算法
同时使用线程本地变量以及对象缓存的问题
【7月更文挑战第15天】同时使用线程本地变量和对象缓存需小心处理以避免数据不一致、竞争条件及内存泄漏等问题。线程本地变量使各线程拥有独立存储,但若与对象缓存关联,可能导致多线程环境下访问旧数据。缺乏同步机制时,多线程并发修改缓存中的共享对象还会引起数据混乱。此外,若线程结束时未释放对象引用,可能导致内存泄漏。例如,在Web服务器场景下,若一更新缓存而另一线程仍获取旧数据,则可能返回错误信息;在图像处理应用中,若多线程无序修改算法对象则可能产生错误处理结果。因此,需确保数据一致性、避免竞争条件并妥善管理内存。