Flink之Queryable State的使用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)Queryable State架构


在展示如何使用 Queryable State 之前,先简单描述一下该特性的组成部分,主要包括以下三部分:


QueryableStateClient,默认运行在 Flink 集群外部,负责提交用户的查询请求;

QueryableStateClientProxy,运行在每个 TaskManager 上(即 Flink 集群内部),负责接收客户端的查询请求,从所负责的 Task Manager 获取请求的 state,并返回给客户端;

QueryableStateServer, 运行在 TaskManager 上,负责服务本地存储的 state。


(2)Queryable State集群配置


在/opt/modules/flink/conf/flink-conf.yaml文件中

将参数 queryable-state.enable 设置为 true

queryable-state.enable: true
queryable-state.proxy.ports: 9069

将 flink-queryable-state-runtime_2.11-1.12.0.jar 从 Flink的 opt/ 目录拷贝到 lib/ 目录


(3)将 state 设置为可查询


激活集群的 queryable state 功能后,还要将 state 设置为可查询的才能对外可见,可以通过以下两种方式进行设置:


创建 QueryableStateStream,它会作为一个 sink,并将输入数据转化为 queryable state;

通过 stateDescriptor.setQueryable(String queryableStateName) 将 state 描述符所表示的 keyed state 设置成可查询的。

接下来的部分将详细解释这两种方式。


(3.1)Queryable State Stream

在 KeyedStream 上调用 .asQueryableState(stateName, stateDescriptor) 将会返回一个 QueryableStateStream, 它会将流数据转化为 queryable state。 对应不同的 state 类型,asQueryableState() 有以下一些方法变体:

// ValueState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// ReducingState
QueryableStateStream asQueryableState(
    String queryableStateName,
    ReducingStateDescriptor stateDescriptor)

注意: 没有可查询的 ListState sink,因为这种情况下 list 会不断增长,并且可能不会被清理,最终会消耗大量的内存。


返回的 QueryableStateStream 可以被视作一个sink,而且不能再被进一步转换。在内部实现上,一个 QueryableStateStream 被转换成一个 operator,使用输入的数据来更新 queryable state。state 如何更新是由 asQueryableState 提供的 StateDescriptor 来决定的。在下面的代码中, keyed stream 的所有数据将会通过 ValueState.update(value) 来更新状态:

stream.keyBy(value -> value.f0).asQueryableState("query-name")

(3.2)Managed Keyed State

operator 中的 Managed keyed state 可以通过 StateDescriptor.setQueryable(String queryableStateName) 将 state descriptor 设置成可查询的,从而使 state 可查询,如下所示:

ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
                "average", // the state name
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
descriptor.setQueryable("query-name"); // queryable state name

注意: 参数 queryableStateName 可以任意选取,并且只被用来进行查询,它可以和 state 的名称不同。


这种方式不会限制 state 类型,即任意的 ValueState、ReduceState、ListState、MapState、AggregatingState 以及已弃用的 FoldingState 均可作为 queryable state。


(4)测试查询 state


package com.aikfk.flink.datastream.state;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/31 4:04 下午
 */
public class QueryState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String,Long>> dataStream = env.addSource(new MySource())
                .map(new MapFunction<String, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String line) throws Exception {
                        String[] words = line.split(",");
                        return new Tuple2<>(words[0],Long.parseLong(words[1]));
                    }
                })
                .keyBy(value -> value.f0)
                .flatMap(new CountFunction());
        dataStream.print();
        env.execute("KeyedState");
    }
    static class CountFunction extends RichFlatMapFunction<Tuple2<String,Long>,Tuple2<String,Long>>{
        // 定义状态ValueState
        private ValueState<Tuple2<String,Long>> keyCount;
        /**
         * 初始化
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple2<String,Long>> descriptor =
                    new ValueStateDescriptor<Tuple2<String, Long>>("keycount",
                            TypeInformation.of(new TypeHint<Tuple2<String,Long>>() {}));
            descriptor.setQueryable("query-name");
            keyCount = getRuntimeContext().getState(descriptor);
        }
        @Override
        public void flatMap(Tuple2<String, Long> input,
                            Collector<Tuple2<String, Long>> collector) throws Exception {
            // 使用状态
            Tuple2<String, Long> currentValue =
                    (keyCount.value() == null) ? new Tuple2<>("", 0L) : keyCount.value();
            // 累加数据
            currentValue.f0 = input.f0;
            currentValue.f1 ++;
            // 更新状态
            keyCount.update(currentValue);
            collector.collect(keyCount.value());
        }
    }
    public static class MySource implements SourceFunction<String> {
        @Override
        public void cancel() {
        }
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            String data = "a,4";
            while (true) {
                Thread.sleep(1000);
                ctx.collect(data);
            }
        }
    }
}

将上面程序打包上传到集群运行:

60.png61.png


测试QueryStateClient客户端查询

package com.aikfk.flink.datastream.state;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import java.util.concurrent.CompletableFuture;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 1:53 下午
 */
public class QueryStateClient {
    public static void main(String[] args) throws Exception {
        QueryableStateClient client =
                new QueryableStateClient("bigdata-pro-m07",9069);
        ValueStateDescriptor<Tuple2<String,Long>> descriptor =
                new ValueStateDescriptor<Tuple2<String, Long>>
                        ("keycount",
                                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
                                }));
        CompletableFuture<ValueState<Tuple2<String, Long>>> resultFuture =
                client.getKvState(JobID.fromHexString("bea88812593e6af84cbf26affa630d5e"),
                        "query-name",
                        "a",
                        BasicTypeInfo.STRING_TYPE_INFO,
                        descriptor);
        System.out.println(resultFuture.get().value());
    }
}

测试QueryStateClient客户端查询

package com.aikfk.flink.datastream.state;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import java.util.concurrent.CompletableFuture;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 1:53 下午
 */
public class QueryStateClient {
    public static void main(String[] args) throws Exception {
        QueryableStateClient client =
                new QueryableStateClient("bigdata-pro-m07",9069);
        ValueStateDescriptor<Tuple2<String,Long>> descriptor =
                new ValueStateDescriptor<Tuple2<String, Long>>
                        ("keycount",
                                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
                                }));
        CompletableFuture<ValueState<Tuple2<String, Long>>> resultFuture =
                client.getKvState(JobID.fromHexString("bea88812593e6af84cbf26affa630d5e"),
                        "query-name",
                        "a",
                        BasicTypeInfo.STRING_TYPE_INFO,
                        descriptor);
        System.out.println(resultFuture.get().value());
    }
}
(a,2874)
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8月前
|
SQL Java API
flink问题之state过期设置如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
443 0
|
8月前
|
SQL 消息中间件 分布式数据库
Flink问题之State 0点清除如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
143 0
|
3月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
88 5
|
3月前
|
存储 SQL 分布式计算
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
26 0
|
3月前
|
存储 消息中间件 大数据
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
79 0
|
3月前
|
存储 SQL 分布式计算
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
54 0
|
5月前
|
消息中间件 应用服务中间件 API
Flink四大基石——3.State
Flink四大基石——3.State
72 1
|
5月前
|
SQL 流计算
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
56 1
|
8月前
|
SQL 分布式数据库 Apache
Flink问题之实现state定时输出如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
8月前
|
存储 消息中间件 资源调度
Flink state 详解
Flink state 详解
76 0