Flink state 详解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink state 详解

checkPoint简介

  • 为了保证state的容错性,Flink需要对state进行checkpoint。
  • Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常
  • Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提:
  • 持久化的source,它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)
  • 用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)

checkPoint配置

  • 默认checkpoint功能是disabled的,想要使用的时候需要先启用
  • checkpoint开启之后,默认的checkPointMode是Exactly-once
  • checkpoint的checkPointMode有两种,Exactly-once和At-least-once
  • Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)
  • 默认checkpoint功能是disabled的,想要使用的时候需要先启用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000); 
// 高级选项:
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); 
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000); 
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint

State Backend(状态的后端存储)

  • 默认情况下,state会保存在taskmanager的内存中,checkpoint会存储在JobManager的内存中。
  • state 的store和checkpoint的位置取决于State Backend的配置
  • env.setStateBackend(…)
  • 一共有三种State Backend
  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend
  • MemoryStateBackend
  • state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中
  • 基于内存的Memory state backend在生产环境下不建议使用
  • FsStateBackend
  • state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中
  • 可以使用hdfs等分布式文件系统
  • RocksDBStateBackend
  • RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地
  • RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用

State Backend使用方式

修改State Backend的两种方式

  • 第一种:单任务调整
  • 修改当前任务代码
  • env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
  • 或者new MemoryStateBackend()
  • 或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
  • 第二种:全局调整
  • 修改flink-conf.yaml
state.backend: filesystem 
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints 
  • 注意:state.backend的值可以是下面几种:
  • jobmanager(MemoryStateBackend)
  • filesystem(FsStateBackend)
  • rocksdb(RocksDBStateBackend)

State backend演示

第一种:单任务调整

启动连接socket zzy:9001的程序

 

./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001

 

[iknow@data-5-63 flink-1.7.2]$ ./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001
2019-03-06 12:03:15,057 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-03-06 12:03:15,057 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-03-06 12:03:15,325 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at /0.0.0.0:8032
2019-03-06 12:03:15,415 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-06 12:03:15,415 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-06 12:03:15,421 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument yn is deprecated in will be ignored.
2019-03-06 12:03:15,421 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument yn is deprecated in will be ignored.
2019-03-06 12:03:15,511 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
2019-03-06 12:03:15,819 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/home/iknow/zhangzhiyong/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-06 12:03:16,386 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1551789318445_0004
2019-03-06 12:03:16,412 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1551789318445_0004
2019-03-06 12:03:16,412 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-03-06 12:03:16,414 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-03-06 12:03:19,940 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
Starting execution of program

如果zzy上未开启9001端口,到jobManager的web ui上看到会报下面的错

代码里设置了checkpoint

 

//获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认checkpoint功能是disabled的,想要使用的时候需要先启用;每隔10000ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(10000);
// 高级选项:
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
//设置statebackend
//env.setStateBackend(new MemoryStateBackend());
//env.setStateBackend(new FsStateBackend("hdfs://zzy:9000/flink/checkpoints"));
//rocksDB需要引入依赖flink-statebackend-rocksdb_2.11
//env.setStateBackend(new RocksDBStateBackend("hdfs://zzy:9000/flink/checkpoints",true));
env.setStateBackend(new FsStateBackend("hdfs://192.168.5.63:9000/flink/checkpoints"));

但是JobManager的web ui上checkpoint并未触发

报错如下,应该是连接不到zzy 9001,识别不了zzy

 

选择监听50.63上的9001端口,如果没有nc命令,用

 

yum install -y nc

安装下,用下面的命令启动flink程序,采用flink on yarn的方式

 

./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001

 

2019-03-06 16:00:24,680 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

如果一直出现Deployment xxx,此时可能是集群上没有资源了,

这里杀掉application_1551789318445_0007和application_1551789318445_0008(这两台是测试机器,资源很紧张)

然后再次重启程序

 

注意yarn是不是successfully.的状态

 

Yarn上启动了应用application_1551789318445_0009

点击AM进去jobManager的web ui界面

 

Checkpoint的UI

可以看到每隔10s进行一次checkpoint

Hdfs上查看checkpoint数据,看到保存了最近10次的checkpoint数据

 

95d75e802ba1eceefeaf98636e907883跟job ID是对应的

 

说明flink配置文件conf/flink-conf.yaml里的配置生效了

flink可以保存多个checkpoint,添加如下配置,指定最多需要保存Checkpoint的个数

state.checkpoints.num-retained: 10

If nothing else is configured, the system will use the MemoryStateBackend.

https://www.jianshu.com/p/3cd2ab1dd311

https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL Java API
flink问题之state过期设置如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
369 0
|
2月前
|
SQL 消息中间件 分布式数据库
Flink问题之State 0点清除如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 分布式数据库 Apache
Flink问题之实现state定时输出如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
存储 消息中间件 Kafka
Flink state best practice| 学习笔记
快速学习 Flink state best practice。
159 0
Flink state best practice| 学习笔记
|
测试技术 API 数据库
Flink 通过 State Processor API 实现状态的读取和写入
在 1.9 版本之前,Flink 运行时的状态对于用户来说是一个黑盒,我们是无法访问状态数据的,从 Flink-1.9 版本开始,官方提供了 State Processor API 这让用户读取和更新状态成为了可能,我们可以通过 State Processor API 很方便的查看任务的状态,还可以在任务第一次启动的时候基于历史数据做状态冷启动。从此状态对于用户来说是透明的。下面就来看一下 State Processor API 的使用。
Flink 通过 State Processor API 实现状态的读取和写入
|
存储 消息中间件 缓存
学习flink的state
Apache Flink® — Stateful Computations over Data Streams,数据流上的状态计算。可以看出flink默认它是一个默认就有状态的分析引擎,State一般指一个具体的 Task/Operator 的状态,State数据默认保存在 Java 的堆内存中。 假设一个 Task 在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义(At -least-once 和 Exactly-once)上来说,Flink引入了State 和 CheckPoint。
364 2
学习flink的state
|
存储 搜索推荐 算法
Flink / Scala - DataStream Broadcast State 模式示例详解
一.引言 上一篇文章Flink / Scala - DataSet 应用 Broadcast Variables介绍了 DataSet 场景下 Broadcast 的使用,这一边
496 0
Flink / Scala - DataStream Broadcast State 模式示例详解
|
Linux 流计算
Flink - 本地执行 Failed to start the Queryable State Data Server
Flink 本地执行任务报错 Failed to start the Queryable State Data Server 以及 Unable to start Queryable State Server. All ports in provided range are occupied. 根据报错分析是因为本地端口被占用,没有足够端口供 Flink 本地客户端启动,所以解决方法就是处理被占用的端口。...
126 0
Flink - 本地执行 Failed to start the Queryable State Data Server
|
存储 缓存 监控
Flink State - Backend Improvements and Evolution in 2021
李钰 (绝顶)、唐云 (茶干) 在 FFA 2021 核心技术专场的分享
Flink State - Backend Improvements and Evolution in 2021