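(1) Queryable State Architecture
Before showing how to use Queryable State, here is a brief description of the components that make up the feature. There are three:
QueryableStateClient, which by default runs outside the Flink cluster and submits the user's queries;
QueryableStateClientProxy, which runs on each TaskManager (that is, inside the Flink cluster), receives the client's queries, fetches the requested state from the TaskManager responsible for it, and returns it to the client;
QueryableStateServer, which runs on each TaskManager and serves the locally stored state.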
(2) Queryable State Cluster Configuration
In /opt/modules/flink/conf/flink-conf.yaml, set the parameter queryable-state.enable to true (the proxy port is also set explicitly here):
queryable-state.enable: true
queryable-state.proxy.ports: 9069
Then copy flink-queryable-state-runtime_2.11-1.12.0.jar from Flink's opt/ directory to its lib/ directory.
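A quick way to double-check the two entries above before restarting the cluster is to read them back programmatically. The following is only a minimal sketch in plain Java: the class QueryableStateConfCheck and its methods are hypothetical helpers, not part of Flink, and the parser assumes flat "key: value" lines like the ones shown above rather than full YAML.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Flink class): reads flat "key: value" entries
// such as the ones written to flink-conf.yaml above.
class QueryableStateConfCheck {

    // Parse flat "key: value" lines, skipping blank lines and '#' comments.
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int sep = line.indexOf(": ");
            if (sep < 0) {
                continue; // not a flat key/value entry
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        return conf;
    }

    // A missing entry is treated as disabled.
    static boolean queryableStateEnabled(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault("queryable-state.enable", "false"));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "queryable-state.enable: true",
                "queryable-state.proxy.ports: 9069");
        Map<String, String> conf = parse(lines);
        System.out.println("enabled = " + queryableStateEnabled(conf));
        System.out.println("proxy ports = " + conf.get("queryable-state.proxy.ports"));
    }
}
```

In practice the lines would be read from the configuration file itself, for example with java.nio.file.Files.readAllLines.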
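(3) Making State Queryable
After enabling queryable state on the cluster, a state must still be marked as queryable before it becomes visible externally. There are two ways to do this:
Create a QueryableStateStream, which acts as a sink and exposes the incoming data as queryable state;
Call stateDescriptor.setQueryable(String queryableStateName) to make the keyed state represented by the state descriptor queryable.
The following sections explain both approaches in detail.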
(3.1) Queryable State Stream
Calling .asQueryableState(stateName, stateDescriptor) on a KeyedStream returns a QueryableStateStream, which exposes the stream's values as queryable state. Depending on the state type, asQueryableState() has the following variants:
// ValueState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ValueStateDescriptor stateDescriptor)

// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)

// ReducingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ReducingStateDescriptor stateDescriptor)
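Note: there is no queryable ListState sink, because the list would grow continuously and might never be cleaned up, eventually consuming a large amount of memory.
The returned QueryableStateStream can be seen as a sink and cannot be transformed further. Internally, a QueryableStateStream is translated into an operator that uses the incoming records to update the queryable state. How the state is updated is determined by the StateDescriptor passed to asQueryableState. In the following code, every record of the keyed stream updates the state through ValueState.update(value):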
stream.keyBy(value -> value.f0).asQueryableState("query-name")
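To make the update semantics concrete, here is a minimal plain-Java model of what a ValueState-backed queryable stream holds: one slot per key, overwritten by each new record. It is a sketch only. The class LatestValuePerKey is hypothetical, it uses a HashMap as a stand-in for Flink's keyed state, and it stores a simple long rather than the whole record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for keyed ValueState: one slot per key.
class LatestValuePerKey {
    private final Map<String, Long> state = new HashMap<>();

    // Models ValueState.update(value) for the record's key:
    // the previous value for that key is overwritten.
    void update(String key, long value) {
        state.put(key, value);
    }

    // What a query for this key would observe; null if the key was never seen.
    Long query(String key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        LatestValuePerKey s = new LatestValuePerKey();
        s.update("a", 1L);
        s.update("b", 2L);
        s.update("a", 3L); // replaces the earlier value for key "a"
        System.out.println(s.query("a")); // prints 3
        System.out.println(s.query("c")); // prints null
    }
}
```

Because each record replaces the previous value, a query only ever sees the most recent record for a key; accumulating behaviour needs the ReducingState variant or managed keyed state.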
(3.2) Managed Keyed State
Managed keyed state of an operator can be made queryable by calling StateDescriptor.setQueryable(String queryableStateName) on its state descriptor, as shown below:
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
                "average", // the state name
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
descriptor.setQueryable("query-name"); // queryable state name
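Note: the queryableStateName parameter can be chosen freely and is used only for queries; it does not have to match the state's own name.
This approach places no restriction on the state type: any ValueState, ReducingState, ListState, MapState, AggregatingState, or the deprecated FoldingState can be made queryable.
(4) Testing State Queries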
package com.aikfk.flink.datastream.state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

/**
 * @author caizhengjie
 * @date 2021/3/31 4:04 PM
 */
public class QueryState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> dataStream = env.addSource(new MySource())
                // Parse "key,number" lines into (key, number) tuples.
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String line) throws Exception {
                        String[] words = line.split(",");
                        return new Tuple2<>(words[0], Long.parseLong(words[1]));
                    }
                })
                .keyBy(value -> value.f0)
                .flatMap(new CountFunction());

        dataStream.print();
        env.execute("KeyedState");
    }

    static class CountFunction extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
        // ValueState holding (key, number of records seen) for the current key.
        private ValueState<Tuple2<String, Long>> keyCount;

        /** Registers the state and exposes it under the queryable name "query-name". */
        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple2<String, Long>> descriptor =
                    new ValueStateDescriptor<>(
                            "keycount",
                            TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));
            descriptor.setQueryable("query-name");
            keyCount = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(Tuple2<String, Long> input, Collector<Tuple2<String, Long>> collector) throws Exception {
            // Read the state; a key seen for the first time starts from an empty count.
            Tuple2<String, Long> currentValue = keyCount.value();
            if (currentValue == null) {
                currentValue = new Tuple2<>("", 0L);
            }
            // Count this record (the numeric field of the input is not added up).
            currentValue.f0 = input.f0;
            currentValue.f1++;
            // Write the state back and emit it.
            keyCount.update(currentValue);
            collector.collect(currentValue);
        }
    }

    public static class MySource implements SourceFunction<String> {
        // Cleared by cancel() so that the source loop can terminate.
        private volatile boolean running = true;

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            String data = "a,4";
            // Emit the same record once per second until cancelled.
            while (running) {
                Thread.sleep(1000);
                ctx.collect(data);
            }
        }
    }
}
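The counting logic of CountFunction can be tried out without a cluster. The sketch below is a plain-Java model of the job's map and flatMap steps, with a HashMap standing in for the keyed ValueState; the class KeyCountModel is hypothetical and exists only for illustration. It shows that the state tracks how many records arrived per key, not the sum of the numeric field.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the job's per-key counting; the HashMap stands in
// for Flink's keyed ValueState.
class KeyCountModel {
    private final Map<String, Long> counts = new HashMap<>();

    // Takes a "key,number" line, as the job's source emits, and returns the
    // updated count for that key. The numeric field is ignored by the count,
    // just as in CountFunction.
    long accept(String line) {
        String key = line.split(",")[0];
        return counts.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        KeyCountModel model = new KeyCountModel();
        for (int i = 0; i < 3; i++) {
            System.out.println("(a," + model.accept("a,4") + ")");
        }
        // prints (a,1), (a,2), (a,3) on separate lines
    }
}
```

With the source emitting "a,4" once per second, the count for key a therefore grows by one per second.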
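Package the program above, upload it to the cluster, and run it. Note the ID of the running job, because the client needs it.
Query the state with the QueryStateClient client: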
package com.aikfk.flink.datastream.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

/**
 * @author caizhengjie
 * @date 2021/4/5 1:53 PM
 */
public class QueryStateClient {
    public static void main(String[] args) throws Exception {
        // Host of a TaskManager and the proxy port configured in flink-conf.yaml.
        QueryableStateClient client = new QueryableStateClient("bigdata-pro-m07", 9069);

        // Same state name and type information as in the job.
        ValueStateDescriptor<Tuple2<String, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "keycount",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));

        // Arguments: job ID, queryable state name, key, key type, state descriptor.
        CompletableFuture<ValueState<Tuple2<String, Long>>> resultFuture =
                client.getKvState(
                        JobID.fromHexString("bea88812593e6af84cbf26affa630d5e"),
                        "query-name",
                        "a",
                        BasicTypeInfo.STRING_TYPE_INFO,
                        descriptor);

        // Block until the result arrives, then print the state value.
        System.out.println(resultFuture.get().value());
    }
}
(a,2874)
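The client above blocks on resultFuture.get() without a time limit, so a query that fails or never completes either throws or hangs. One possible way to guard against that, sketched here in plain Java, is to wait for a bounded time and fall back to a default. The class QueryResultHandling and the method awaitOrDefault are hypothetical names, and the futures in main are stand-ins for the one returned by getKvState.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: bounded wait on a query result with a fallback value.
class QueryResultHandling {

    // Waits at most timeoutMillis for the future. Returns the fallback if the
    // query failed, timed out, or the waiting thread was interrupted.
    static <T> T awaitOrDefault(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | TimeoutException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a query that succeeded.
        CompletableFuture<String> ok = CompletableFuture.completedFuture("(a,2874)");

        // Stand-in for a query that failed, for example because the key is unknown.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("query failed"));

        System.out.println(awaitOrDefault(ok, 1000, "n/a"));     // prints (a,2874)
        System.out.println(awaitOrDefault(failed, 1000, "n/a")); // prints n/a
    }
}
```

Swallowing the failure keeps the sketch short; a real client would probably log the exception before returning the fallback.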