大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction

简介: 大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节完成了如下的内容:


FlinkDataStreamAPI 自定义输入源

非并行源介绍与代码

并行源介绍与代码

Rich并行源

基本介绍

在 Apache Flink 中,RichSourceFunction 是一种增强的源函数(Source Function),它允许开发者在定义源操作时,能够访问 Flink 的生命周期方法、状态管理、配置访问等更多功能。RichSourceFunction 是并行源的一个扩展,它继承自 RichFunction 接口,而 RichFunction 提供了更丰富的功能,比如访问运行时上下文、管理状态、以及在作业开始和结束时执行初始化或清理操作。


主要特点

生命周期方法:RichSourceFunction 提供了 open() 和 close() 方法,分别在作业开始时和结束时调用。这允许你在数据读取前进行初始化操作(如打开连接、加载配置),以及在作业结束时进行清理工作(如关闭连接、释放资源)。

访问运行时上下文:通过 getRuntimeContext() 方法,RichSourceFunction 可以访问 Flink 的运行时上下文,获取并行度信息、任务名称、指标管理器,以及与状态相关的操作。

状态管理:RichSourceFunction 可以结合 Flink 的状态管理机制,保存和恢复状态。这对于需要在流处理中维护中间状态的源函数非常有用,尤其是在故障恢复时,状态可以帮助恢复到故障前的状态。

并行执行:与普通的 SourceFunction 类似,RichSourceFunction 也可以通过设置并行度来并行执行,这使得它可以处理大规模的数据

源。

状态管理

RichFunction 与 Flink 的状态管理系统高度集成,允许你在分布式环境中维护和管理操作符的中间状态。Flink 支持两种主要类型的状态:ValueState 和 ListState,以及更复杂的 MapState 和 ReducingState。


ValueState: 适用于需要保存单个值的场景,如计数器、标志位等。

ListState: 适用于需要保存多个值的场景,如窗口计算中的中间结果。

MapState: 适用于需要维护键值对的场景,特别是在进行复杂的数据关联或聚合时。

ReducingState: 适用于需要持续聚合数据的场景,比如计数、求和等。

示例代码

以下是一个使用 RichParallelSourceFunction 的简单示例,展示了如何在 Flink 中实现一个并行的、具有生命周期管理的源函数:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class MyRichParallelSource extends RichParallelSourceFunction<String> {

    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 在任务开始时执行初始化操作
        System.out.println("Task " + getRuntimeContext().getTaskName() + " is starting.");
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // 模拟数据流的产生
        while (isRunning) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("Data from task " + getRuntimeContext().getIndexOfThisSubtask());
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        // 在任务结束时执行清理操作
        System.out.println("Task " + getRuntimeContext().getTaskName() + " is closing.");
    }
}

代码解析

open() 方法:在任务开始时调用,适用于进行连接初始化、参数设置等操作。在这个方法中,你可以访问 Flink 的配置和运行时上下文。

run() 方法:实现数据源的核心逻辑,这个方法会在源函数启动后被调用。可以使用 ctx.collect() 方法将生成的数据发送到下游处理。

cancel() 方法:用于取消任务。当作业被取消或停止时,Flink 会调用这个方法,可以在这里做一些清理工作或者安全地停止数据生成。

close() 方法:在任务结束时调用,用于释放资源和进行清理操作。

注意事项

状态一致性:在并行源中,如果需要维护状态,一定要注意状态的一致性和恢复机制,确保在作业恢复时可以正确地恢复数据源的状态。

并行度设置:RichParallelSourceFunction 作为并行源,可以通过 setParallelism 方法设置并行度,确保根据任务的需求合理分配并行实例的数量。

RichParallelSource

package icu.wzk;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class RichParallelSourceRich extends RichParallelSourceFunction<String> {

    private long count = 1L;
    private boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            count ++;
            ctx.collect(String.valueOf(count));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

RichParallelSourceTest

package icu.wzk;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;

public class RichParallelSourceRichTest {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.getJavaEnv().addSource(new RichParallelSourceRich());
        data.print();
        env.execute("RichParallelSourceRichTest");
    }

}

运行结果

3> 10
5> 10
8> 10
6> 10
2> 10
4> 10
7> 10
1> 10
6> 11
5> 11
8> 11
2> 11
3> 11
4> 11
7> 11
1> 11
2> 12
3> 12
...

控制台输出结果如下所示:

为什么 Rich 类使用广泛

生命周期管理:Rich 类提供了 open() 和 close() 方法,允许开发者在任务开始和结束时执行初始化和清理操作。这对于需要设置资源(如数据库连接、文件读写、外部服务连接)的操作非常有用。

运行时上下文访问:通过 getRuntimeContext(),Rich 类可以访问任务的并行度信息、任务名称、子任务索引、状态管理等。对于需要根据任务上下文调整行为或需要跨并行实例共享状态的场景,这些信息是至关重要的。

状态管理:RichFunction 可以方便地与 Flink 的状态管理结合使用。在状态丰富的应用场景(如需要维护中间计算结果、计数器、缓存等)的流处理中,Rich 类显得非常有用。

性能监控:Rich 类允许开发者在 open() 方法中注册 Flink 的度量指标(Metrics),帮助监控和优化作业的性能。

什么时候不用 Rich 类

简单操作:如果你只是需要进行简单的转换或过滤操作,没有复杂的初始化、状态管理或清理需求,那么 Rich 类的额外功能可能并不必要。

高性能要求的场景:在一些对性能要求极高的场景中,尽量减少复杂的操作和额外的上下文访问,直接使用轻量级的 MapFunction、FilterFunction 等可能会有更好的性能表现。


相关文章
|
26天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
3天前
|
人工智能 Rust Java
10月更文挑战赛火热启动,坚持热爱坚持创作!
开发者社区10月更文挑战,寻找热爱技术内容创作的你,欢迎来创作!
358 14
|
19天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
6天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
21天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
23天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2591 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
5天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
181 2
|
3天前
|
编译器 C#
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
105 65
|
6天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
330 2
|
23天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1580 17
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码