大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节我们完成了如下的内容:


Flink 状态存储

MemoryStateBackend

FsStateBackend

RocksDBStateBackend

KeyedState

Operator State

上节进度

上节我们到了:

使用ManageOperatorState

(这里以及后续放到下一篇:大数据-127 Flink)


接下来我们继续上节的内容


使用ManageOperatorState

我们可以通过实现CheckpointedFunction或ListCheckpointed接口来使用 ManagedOperatorState。

CheckpointFunction

CheckpointFunction接口提供了访问 non-keyed state的方法,需要实现如下两个方法:


void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

进行Checkpoint时会调用snapshotState(),用户自定义函数化时会调用 initializeState(),初始化包括第一次自定义函数初始化和从之前的Checkpoint恢复。因此 initializeState(),不仅是定义不同的状态类型初始化的地方,也需要包括状态恢复的逻辑。

当前,ManagedOperatorState以list的形式存在,这些状态是一个可序列化对象的集合List,彼此独立,方便在改变后进行状态的重新分派,换句话说,这些对象是重新分配non-keyed state的最新粒度,根据状态不同访问方式,有如下几种重新分配的模式:


Event-split redistribution:每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成,当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。比如说,算子A的并发读为1,包含两个元素element1和element2,当并发增加为2时,element1会被分发到并发0上,element2会被分发到并发1上。

Union redistribution:每个算子保存一个列表形式的状态集合,整个状态由所有列表拼接而成,当作业恢复或重新分配时,每个算子都将获得所有的状态数据。

ListCheckpointed

ListCheckpointed 接口是 CheckpointedFunction的精简版,仅支持 even-split redistribution 的list state,同样需要实现下面两个方法:


List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;


snapshotState()需要返回一个将写入到checkpoint的对象列表,restoreState则需要处理恢复回来的对象列表,如果状态不可切分,则可以在snapshotState()中返回,Collections.singletonList(MY_STATE)。


StateBackend 如何保存

上面我们介绍了三种 StateBackend:


MemoryStateBackend

FsStateBackend

RocksDBStateBackend

在Flink的实际实现中,对于同一种StateBackend,不同的State在运行时会有细分的StateBackend托管,例如:MemoryStateBackend,就有DefaultOperatorStateBackend管理OperatorState,HeapKeyedStateBackend管理KeyedState。


我们看到MemoryStateBackend和FsStateBackend对于KeyedState和OperatorState的存储都符合我们之前的理解,运行时State数据保存于内存,checkpoint的保存位置需要注意下,并不是在RocksDB中,而是通过DefaultOperatorStateBackend保存于TaskManager内存。创建的源码如下:

// RocksDBStateBackend.java
// 创建 keyed statebackend
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(...){
    ...
    return new RocksDBKeyedStateBackend<>(
    ...);
}
// 创建 Operator statebackend
public OperatorStateBackend createOperatorStateBackend(
    Environment env, String operatorIdentifier) throws Exception {
        //the default for RocksDB; eventually there can be a operator state
        backend based on RocksDB, too.
        final boolean asyncSnapshots = true;
        return new DefaultOperatorStateBackend(
    ...);
}

源码中也标注了,未来会提供基于RocksDB存储的OperatorState,所以当前即使使用RocksDBStateBackend,OperatorState也会超过内存限制。


Operator State 在内存中对应两种数据结构:

数据结构1:ListState 对应的实际实现类为 PartitionableListState,创建并注册的代码如下:

// DefaultOperatorStateBackend.java
private <S> ListState<S> getListState(...){
    partitionableListState = new PartitionableListState<>(
        new RegisteredOperatorStateBackendMetaInfo<>(
            name,
            partitionStateSerializer,
            mode));
    registeredOperatorStates.put(name, partitionableListState);
}

PartitionableListState中通过ArrayList来保存State数据:

// PartitionableListState.java
/**
* The internal list the holds the elements of the state
*/
private final ArrayList<S> internalList;

数据结构2:BroadcastState 对应的实际实现类为 HeapBroadcastState

创建并注册的代码如下:

public <K, V> BroadcastState<K, V> getBroadcastState(...) {
    broadcastState = new HeapBroadcastState<>(
        new RegisteredBroadcastStateBackendMetaInfo<>(
            name,
            OperatorStateHandle.Mode.BROADCAST,
            broadcastStateKeySerializer,
            broadcastStateValueSerializer));
    registeredBroadcastStates.put(name, broadcastState);
}

HeapBroadcastState中通过HashMap来保存State数据:

/**
* The internal map the holds the elements of the state.
*/
private final Map<K, V> backingMap;
HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
    this(stateMetaInfo, new HashMap<>());
}

配置StateBackend

我们知道Flink提供了三个StateBackend,那么如何配置使用某个StateBackend呢?默认的配置在conf/flink-conf.yaml文件中 state.backend 指定,如果没有配置该值,就会使用 MemoryStateBackend,默认的是StateBackend可以被代码中的配置覆盖。


Per-job设置

我们可以通过StreamExecutionEnvironment设置:


StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new
FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

如果想使用RocksDBStateBackend,你需要将相关依赖加入你的Flink中:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

默认设置

如果没有在程序中指定,Flink将使用 conf/flink-conf.yaml文件中的 state.backend 指定的 StateBackend,这个值有三种配置:


JobManager(代表 MemoryStateBackend)

FileSystem(代表FsStateBackend)

RocksDB(代表RocksDBStateBackend)

开启Checkpoint

开启CheckPoint后,StateBackend管理的TaskManager上的状态数据才会被定期备份到JobManager或外部存储,这些状态数据在作业失败恢复时会用到。我们可以通过以下代码开启和配置CheckPoint:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
//env.getConfig().disableSysoutLogging();
//每 30 秒触发一次 checkpoint,checkpoint 时间应该远小于(该值 + MinPauseBetweenCheckpoints),否则程序会一直做checkpoint,影响数据处理速度
env.enableCheckpointing(30000); // create a checkpoint every 30 seconds
// set mode to exactly-once (this is the default)
// flink 框架内保证 EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 30 s of progress happen between checkpoints
// 两个 checkpoints之间最少有 30s 间隔(上一个checkpoint完成到下一个checkpoint开始,默认 为0,这里建议设置为非0值)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
// checkpoints have to complete within one minute, or are discarded
// checkpoint 超时时间(默认 600 s)
env.getCheckpointConfig().setCheckpointTimeout(600000);
// allow only one checkpoint to be in progress at the same time
// 同时只有一个checkpoint运行(默认)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
// 取消作业时是否保留 checkpoint (默认不保留)
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// checkpoint失败时 task 是否失败( 默认 true, checkpoint失败时,task会失败)
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
// 对 FsStateBackend 刷出去的文件进行文件压缩,减小 checkpoint 体积
env.getConfig().setUseSnapshotCompression(true);

FsStateBackend 和 RocksDBStateBackend CheckPoint完成后最终保存到下面的目录:

hdfs:///your/checkpoint/path/{JOB_ID}/chk-{CHECKPOINT_ID}/

JOB_ID是应用的唯一ID,CHECK_POINT_ID 是每次 CheckPoint时自增的数字ID,我们可以从备份的CheckPoint数据恢复当时的作业状态。

flink-1x.x/bin/flink run -s hdfs:///your/checkpoint/path/{JOB_ID}/chk-{CHECKPOINT_ID}/ path/to//your/jar

我们可以实现 CheckpointedFunction 方法,在程序初始化的时候修改状态:

public class StatefulProcess extends KeyedProcessFunction<String, KeyValue, KeyValue> implements CheckpointedFunction {
    ValueState<Integer> processedInt;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }
    @Override
    public void processElement(KeyValue keyValue, Context context,
        Collector<KeyValue> collector) throws Exception {
        try{
            Integer a = Integer.parseInt(keyValue.getValue());
            processedInt.update(a);
            collector.collect(keyValue);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    @Override
    public void initializeState(FunctionInitializationContext
        functionInitializationContext) throws Exception {
        processedInt = functionInitializationContext.getKeyedStateStore().getState(new ValueStateDescriptor<>("processedInt", Integer.class));
        if(functionInitializationContext.isRestored()){
            //Apply logic to restore the data
        }
    }
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        processedInt.clear();
    }
}
```


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
27天前
|
Cloud Native 大数据 Java
大数据新视界--大数据大厂之大数据时代的璀璨导航星:Eureka 原理与实践深度探秘
本文深入剖析 Eureka 在大数据时代分布式系统中的关键作用。涵盖其原理,包括服务注册、续约、发现及自我保护机制;详述搭建步骤、两面性;展示在大数据等多领域的应用场景、实战案例及代码演示。Eureka 如璀璨导航星,为分布式系统高效协作指引方向。
|
7月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
454 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
6月前
|
存储 SQL 缓存
Flink 2.0 存算分离状态存储 — ForSt DB 
本文整理自阿里云技术专家兰兆千在Flink Forward Asia 2024上的分享,主要介绍Flink 2.0的存算分离架构、全新状态存储内核ForSt DB及工作进展与未来展望。Flink 2.0通过存算分离解决了本地磁盘瓶颈、检查点资源尖峰和作业恢复速度慢等问题,提升了云原生部署能力。ForSt DB作为嵌入式Key-value存储内核,支持远端读写、批量并发优化和快速检查点等功能。性能测试表明,ForSt在异步访问和本地缓存支持下表现卓越。未来,Flink将继续完善SQL Operator的异步优化,并引入更多流特性支持。
731 88
Flink 2.0 存算分离状态存储 — ForSt DB 
|
4月前
|
机器学习/深度学习 数据采集 分布式计算
大数据分析中的机器学习基础:从原理到实践
大数据分析中的机器学习基础:从原理到实践
188 3
|
5月前
|
SQL 存储 大数据
Flink 基础详解:大数据处理的强大引擎
Apache Flink 是一个分布式流批一体化的开源平台,专为大规模数据处理设计。它支持实时流处理和批处理,具有高吞吐量、低延迟特性。Flink 提供统一的编程抽象,简化大数据应用开发,并在流处理方面表现卓越,广泛应用于实时监控、金融交易分析等场景。其架构包括 JobManager、TaskManager 和 Client,支持并行度、水位线、时间语义等基础属性。Flink 还提供了丰富的算子、状态管理和容错机制,如检查点和 Savepoint,确保作业的可靠性和一致性。此外,Flink 支持 SQL 查询和 CDC 功能,实现实时数据捕获与同步,广泛应用于数据仓库和实时数据分析领域。
1396 32
|
5月前
|
存储 SQL 数据挖掘
深入理解 Flink 中的 State
Flink 的 State(状态)是其四大核心之一,为流处理和批处理任务提供强大支持。本文深入探讨 Flink 中的状态管理,涵盖 State 在 HDFS 中的存储格式、存在形式(如 ValueState、ListState 等)、使用方法、过期时间 TTL 和清除策略,并介绍 Table API 和 SQL 模块中的状态管理。通过实际案例,帮助读者理解如何在电商订单处理、实时日志统计等场景中有效利用状态管理功能。
416 16
zdl
|
7月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
316 56
|
6月前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
246 16
|
7月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
177 1
|
1月前
|
存储 机器学习/深度学习 人工智能
数据与生命的对话:当大数据遇上生物信息学
数据与生命的对话:当大数据遇上生物信息学
71 17

热门文章

最新文章