flink1.18 SqlGateway 的使用和原理分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: # 了解flink1.18 sqlGateway 的安装和使用步骤# 启动sqlgateway 流程,了解核心的结构# sql提交流程,了解sql 的流转逻辑# select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑

一:安装 sqlgateway 环境

1.1. on session

 1.1 先启动session

设置环境变量:HADOOP_CLASSPATH

export HADOOP_CLASSPATH=`hadoop classpath`

./bin/yarn-session.sh -d -s 2 -jm 2048 -tm 2048

1.2.on perjob


启动SqlGateway:

./bin/sql-gateway.sh start  -Dsql-gateway.endpoint.rest.address=localhost

默认绑定端口8083


二:调用

1.beeline 调用

1.1 需要下载flink-sql-jdbc-driver-bundle-1.18.0.jar 并放到

{HIVE_HOME}/lib下

2.jdbc  客户端

使用./bin/beeline

!connect jdbc:flink://localhost:8083

链接到已经启动的flink sql gateway


三:流程分析,了解和熟悉

1.启动sqlgateway 流程,了解核心的结构

2.sql提交流程,了解sql 的流转逻辑

3.select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑


1.启动sqlgateway 流程

image.png

(1)关注SessionManager,维护了一个session的map

private final Map<SessionHandle, Session> sessions;

在调用openSesson() 会构建一个基于uuid 分配sessionId 的SessionContext;

后续的操作都是基于这个sessionId 获取到对应的SessionContext进行操作;

 (2) 通过beenline 提交 sql

eg: select * from order;

 经过handler 之后的处理流程

image.png


基于TableEnvrionment   获取parser 解析sql ,生成List<Operation>, 然后根据获取到的operator 类型进行处理,不同的operation 有对应的处理逻辑;

我们接下来分析下QueryOperation 的处理逻辑

1)sql 如何被改写的(eg: select * from table)

     -- PlannerBase.translate

     -- PlannerBase.translateTorel

         当匹配到 CollectModifyOperation,构建DynamicSinkUtils.convertCollectToRel

         convertCollectToRel 根据input的consumerDataType 构建CollectDynamicSink ,并拼接到collectionModifyOperation 上;

         关键代码如下:

final CollectDynamicSink tableSink =
        new CollectDynamicSink(
                contextResolvedTable.getIdentifier(),
                consumedDataType,
                configuration.get(CollectSinkOperatorFactory.MAX_BATCH_SIZE),
                configuration.get(CollectSinkOperatorFactory.SOCKET_TIMEOUT),
                classLoader,
                zoneId,
                configuration
                        .get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR)
                        .isEnabled());
collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider());
collectModifyOperation.setConsumedDataType(consumedDataType);
return convertSinkToRel(
        relBuilder,
        input,
        Collections.emptyMap(), // dynamicOptions
        contextResolvedTable,
        Collections.emptyMap(), // staticPartitions
        null, // targetColumns
        false,
        tableSink);

2)sql 如何提交到session 上?

    -- StreamExecutionEnvirionment 基于Configuration(execution.target) 构建PipelineExecutor

    -- 当前sqlgateway 的Configuration 的参数(execution.target=yarn-session),所以生成YarnSessionClusterExecutor

    -- YarnSessionClusterExecutor.execute(streamGraph, configuration, userClassLoader) 进行任务的提交和执行

       org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor#execute

3)  client 如何获取到session 执行的 Result.

     -- sql 任务提交完成之后,构建TableResultImpl

     -- TableResultImpl 的参数包括之前构建的resultProvider, resultProvider 关联了jobClient,用于后续进行fetcher 操作

ResultProvider resultProvider = sinkOperation.getSelectResultProvider();
resultProvider.setJobClient(jobClient);
return TableResultImpl.builder()
        .jobClient(jobClient)
        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
        .schema(operation.getResolvedSchema())
        .resultProvider(resultProvider)
        .setPrintStyle(
                PrintStyle.tableauWithTypeInferredColumnWidths(
                        // sinkOperation.getConsumedDataType() handles legacy types
                        DataTypeUtils.expandCompositeTypeToSchema(
                                sinkOperation.getConsumedDataType()),
                        resultProvider.getRowDataStringConverter(),
                        getConfig().get(TableConfigOptions.DISPLAY_MAX_COLUMN_WIDTH),
                        false,
                        isStreamingMode))
        .build();

       -- fetcher 操作

       * rest 请求

         /v2/sessions/3aa182db-cb6b-431a-86f5-4432b36776aa/operations/e822ef45-e5a3-460a-b636-9c03318763bc/result/0

       * 调用FetchResultsHandler

      *  基于request uri 获取到sessionHander. operationHandle, 也就是url 里的uuid;

      *  调用ResultFetcher 对结果集进行获取;

          -- ResultStore 对结果进行缓存和迭代获取;

          -- 封装了CollectDynamicSink 的iterator。用于迭代获取数据,其中iterator 通过封装CollectResultFetcher 来获取远程数据结果并缓存到buffer;

         -- CollectResultFetcher  属性包括JobClient, CoordinationRequestGateway;

              1) 如果任务已经结束,通过accumulator 获取指定的accumulatorName的数据;

              2)

                   2.1 如果任务未结束,则通过向CoordinationRequestGateway 发送请求来获取;

CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset);
return (CollectCoordinationResponse)
        gateway.sendCoordinationRequest(operatorId, request).get();

                  2.2 服务端收到请求处理CollectSink request

                       org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator#handleCoordinationRequest

image.png

           -- org.apache.flink.table.gateway.service.result.ResultStore#retrieveRecords 获取缓存在buffer 中的数据rowdata;





相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
21天前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
57 5
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
181 2
|
21天前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
40 1
|
21天前
|
存储 SQL 分布式计算
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
14 0
|
21天前
|
存储 消息中间件 大数据
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
48 0
|
21天前
|
存储 SQL 分布式计算
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
37 0
|
21天前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
86 0
|
25天前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
46 0
|
3月前
|
消息中间件 数据挖掘 Kafka
揭秘数据洪流中的救世主:Confluent与Flink的实时分析奇迹!
【8月更文挑战第9天】在现代数据处理中,实时数据分析至关重要。Confluent Platform与Apache Flink的组合提供了一套高效的数据流处理方案。Confluent Platform基于Apache Kafka构建,负责数据的收集、传输及管理;而Flink则擅长实时处理这些数据流,进行复杂分析。首先需配置Confluent Platform,包括设置Zookeeper、Kafka brokers及相关服务。
58 1
|
3月前
|
流计算
Flink执行原理
Flink执行原理
35 0