Feature overview
The State Processor API, newly added in Flink 1.9, gives users direct access to the state stored inside Flink. The API makes it easy to read, modify, and even rebuild entire state. Its power shows in several ways. The first is flexibly reading external data: for example, reading from a database to build a Savepoint by hand, which solves the job cold-start problem so you no longer have to rerun all the data from N days back.
Applicable scenarios
- Asynchronous validation, or inspecting the state at a given stage. A Flink job's final results are usually persisted somewhere, but when a problem shows up, how do you determine which stage went wrong? The State Processor API offers one way: verify whether the data held in state matches expectations.
- Correcting dirty data. If a dirty record has polluted the state, the State Processor API can be used to repair and correct the state.
- State migration. When the job logic has been changed but most of the original job's state should be reused, or the structure of the state needs to be upgraded, this API can do the corresponding work.
- Solving the job cold-start problem, so the job does not have to rerun all the data from N days back.
Limitations
- Window state cannot be modified for now
- Every stateful operator must be assigned a uid by hand (see the sketch after this list)
- The metadata (the existing operator ids) cannot be obtained by simply reading a savepoint
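For illustration, a minimal sketch of what the uid assignment looks like in the streaming job. `words` and `SummarizeFunction` are hypothetical stand-ins for the actual stream and stateful function; the point is the `.uid(...)` call, which is what the State Processor API later uses to address the operator's state:

// Assign an explicit, stable uid to every stateful operator; the
// State Processor API addresses operator state by this uid.
DataStream<Tuple2<String, Long>> counted = words
        .keyBy(t -> t.f0)
        .process(new SummarizeFunction()) // hypothetical KeyedProcessFunction holding the ValueState
        .uid("keyby_summarize");          // same uid the demos below use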
Related concepts
State falls into two categories: Operator State and Keyed State.
When reading state, the read method has to match the state's type (see the sketch after the table):
| Operator States | Keyed States |
| --- | --- |
| readListState | readKeyedState |
| readUnionState | |
| readBroadcastState | |
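A sketch of the corresponding calls on an ExistingSavepoint. The uids, state names, and element types here are placeholders; readKeyedState together with a KeyedStateReaderFunction is shown in full in the demos below:

ExistingSavepoint savepoint = Savepoint.load(bEnv, savePointPath, backend);

// Operator state is addressed by uid + state name + type information
DataSet<Integer> listState = savepoint.readListState("my-uid", "list-state", Types.INT);
DataSet<Integer> unionState = savepoint.readUnionState("my-uid", "union-state", Types.INT);
DataSet<Tuple2<String, Integer>> broadcastState =
        savepoint.readBroadcastState("my-uid", "broadcast-state", Types.STRING, Types.INT);

// Keyed state additionally needs a KeyedStateReaderFunction
DataSet<KeyedValueState> keyedState = savepoint.readKeyedState("my-uid", new StateReaderFunc());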
Hot-loading data in batch to generate a Savepoint, and modifying Savepoint state
The two corresponding demos are given at the end. The basic flow is quite similar in both cases (a minimal skeleton follows the list):
- Hot-loading data in batch:
  1. Read the data with a batch job to obtain a DataSet (e.g. reading a text file).
  2. Apply the business logic and get the transformed DataSet (here, processing the text into a Tuple2<key, num>).
  3. Convert the result into state via a KeyedStateBootstrapFunction.
  4. Generate the external Savepoint (mind the uid assignment and the choice of StateBackend).
- Modifying Savepoint state:
  1. Call Savepoint.load to load the already existing Savepoint (the StateBackend must match the job that produced it).
  2. Call readKeyedState on the returned ExistingSavepoint; the result is a DataSet.
  3. Apply batch business logic to adjust that DataSet (e.g. delete an element); the result is still a DataSet.
  4. Convert the result back into state with a custom KeyedStateBootstrapFunction.
  5. Generate the external Savepoint (again mind the uid and the StateBackend choice).
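Both flows end in the same write path; a minimal skeleton, where every name (`MyStatePojo`, `MyReaderFunction`, the paths) is a placeholder for the pieces built in the demos:

// Bootstrap a brand-new savepoint from batch data
Savepoint.create(stateBackend, maxParallelism)
        .withOperator(uid, transformation)
        .write(newSavepointPath);

// Modify an existing one: load it, read the state into a DataSet,
// adjust it, bootstrap it again, and write a new savepoint as above
ExistingSavepoint existing = Savepoint.load(env, oldSavepointPath, stateBackend);
DataSet<MyStatePojo> state = existing.readKeyedState(uid, new MyReaderFunction());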
Demo: rebuilding the stream's state from batch
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.util.Collector;

import java.util.Iterator;

public class BuildSavepointFromBatch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the external offline data source
        DataSource<String> textSource = env.readTextFile("D:\\sources\\data.txt");
        DataSet<Tuple2<String, Integer>> sourceDataSet = textSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] strArr = value.split(",");
                for (String str : strArr) {
                    out.collect(new Tuple2<>(str, 1));
                }
            }
        });

        // Compute the historical state: count the occurrences of each word
        DataSet<ReadAndModifyState.KeyedValueState> dataSet = sourceDataSet
                .groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, ReadAndModifyState.KeyedValueState>() {
                    @Override
                    public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<ReadAndModifyState.KeyedValueState> out) throws Exception {
                        Iterator<Tuple2<String, Integer>> iterator = values.iterator();
                        Long countNum = 0L;
                        String wordKey = null;
                        while (iterator.hasNext()) {
                            Tuple2<String, Integer> info = iterator.next();
                            if (wordKey == null) {
                                wordKey = info.f0;
                            }
                            countNum++;
                        }
                        ReadAndModifyState.KeyedValueState keyedValueState = new ReadAndModifyState.KeyedValueState();
                        keyedValueState.key = new Tuple1<>(wordKey);
                        keyedValueState.countNum = countNum;
                        out.collect(keyedValueState);
                    }
                });

        // Turn the historical results into state and write them out as a savepoint on HDFS
        BootstrapTransformation<ReadAndModifyState.KeyedValueState> transformation = OperatorTransformation
                .bootstrapWith(dataSet)
                .keyBy(new KeySelector<ReadAndModifyState.KeyedValueState, Tuple1<String>>() {
                    @Override
                    public Tuple1<String> getKey(ReadAndModifyState.KeyedValueState value) throws Exception {
                        return value.key;
                    }
                })
                .transform(new ReadAndModifyState.KeyedValueStateBootstrapper());

        String uid = "keyby_summarize";
        String savePointPath = "hdfs://ns1/user/xc/savepoint-from-batch";
        StateBackend rocksDBBackEnd = new RocksDBStateBackend("hdfs://ns1/user/xc");
        Savepoint.create(rocksDBBackEnd, 128)
                .withOperator(uid, transformation)
                .write(savePointPath);

        env.execute("batch build save point");
        System.out.println("-------end------------");
    }
}
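Once this job has run, the streaming job can be cold-started from the generated savepoint with the standard `-s` flag of the Flink CLI; assuming a hypothetical entry class and jar, something like:

flink run -s hdfs://ns1/user/xc/savepoint-from-batch -c com.example.StreamJob stream-job.jar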
Demo: reading and modifying state
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadAndModifyState {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        String savePointPath = "hdfs://ns1/user/xc/savepoint-61b8e1-bbee958b3087";
        StateBackend rocksDBBackEnd = new RocksDBStateBackend("hdfs://ns1/user/xc");
        ExistingSavepoint savepoint = Savepoint.load(bEnv, savePointPath, rocksDBBackEnd);

        // Read the keyed state registered under the uid
        String uid = "keyby_summarize";
        DataSet<KeyedValueState> keyState = savepoint.readKeyedState(uid, new StateReaderFunc());

        // Modify: here every count is simply doubled
        DataSet<KeyedValueState> dataSet = keyState.flatMap((FlatMapFunction<KeyedValueState, KeyedValueState>) (value, out) -> {
            value.countNum = value.countNum * 2;
            out.collect(value);
        }).returns(KeyedValueState.class);

        BootstrapTransformation<KeyedValueState> transformation = OperatorTransformation
                .bootstrapWith(dataSet)
                // Note: the keyBy key must be exactly the same as in the original job
                .keyBy(new KeySelector<KeyedValueState, Tuple1<String>>() {
                    @Override
                    public Tuple1<String> getKey(KeyedValueState value) throws Exception {
                        return value.key;
                    }
                })
                .transform(new KeyedValueStateBootstrapper());

        Savepoint.create(rocksDBBackEnd, 128)
                .withOperator(uid, transformation)
                .write("hdfs://ns1/user/xc/savepoint-after-modify3");

        bEnv.execute("read and modify the keyed state");
        System.out.println("-----end------------");
    }

    public static class StateReaderFunc extends KeyedStateReaderFunction<Tuple1<String>, KeyedValueState> {

        private static final long serialVersionUID = -3616180524951046897L;
        private transient ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            // The descriptor name and type must match the state registered by the original job
            ValueStateDescriptor<Long> currentCountDescriptor = new ValueStateDescriptor<>("currentCountState", Long.class);
            state = getRuntimeContext().getState(currentCountDescriptor);
        }

        @Override
        public void readKey(Tuple1<String> key, Context ctx, Collector<KeyedValueState> out) throws Exception {
            System.out.println(key.f0 + ":" + state.value());
            KeyedValueState keyedValueState = new KeyedValueState();
            keyedValueState.key = new Tuple1<>(key.f0);
            keyedValueState.countNum = state.value();
            out.collect(keyedValueState);
        }
    }

    public static class KeyedValueState {
        public Tuple1<String> key;
        public Long countNum;
    }

    public static class KeyedValueStateBootstrapper extends KeyedStateBootstrapFunction<Tuple1<String>, KeyedValueState> {

        private static final long serialVersionUID = 1893716139133502118L;
        private ValueState<Long> currentCount = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Long> currentCountDescriptor = new ValueStateDescriptor<>("currentCountState", Long.class, 0L);
            currentCount = getRuntimeContext().getState(currentCountDescriptor);
        }

        @Override
        public void processElement(KeyedValueState value, Context ctx) throws Exception {
            // Write the prepared value into keyed state for the current key
            currentCount.update(value.countNum);
        }
    }
}
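One design note on the modify demo: Savepoint.create writes a completely new savepoint containing only this one operator. If the loaded savepoint also carries state for other operators, the 1.9 API can instead replace just the modified operator on the loaded ExistingSavepoint, keeping the rest intact. A sketch, assuming removeOperator/withOperator behave as documented:

// Drop the old state registered under the uid, register the adjusted
// state in its place, and write everything (including the untouched
// operators) out as a new savepoint.
savepoint
        .removeOperator(uid)
        .withOperator(uid, transformation)
        .write("hdfs://ns1/user/xc/savepoint-after-modify3");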