flink1.18 SqlGateway 的使用和原理分析

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: # 了解flink1.18 sqlGateway 的安装和使用步骤# 启动sqlgateway 流程,了解核心的结构# sql提交流程,了解sql 的流转逻辑# select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑

一:安装 sqlgateway 环境

1.1. on session

 1.1 先启动session

设置环境变量:HADOOP_CLASSPATH

export HADOOP_CLASSPATH=`hadoop classpath`

./bin/yarn-session.sh -d -s 2 -jm 2048 -tm 2048

1.2.on perjob


启动SqlGateway:

./bin/sql-gateway.sh start  -Dsql-gateway.endpoint.rest.address=localhost

默认绑定端口8083


二:调用

1.beeline 调用

1.1 需要下载flink-sql-jdbc-driver-bundle-1.18.0.jar 并放到

{HIVE_HOME}/lib下

2.jdbc  客户端

使用./bin/beeline

!connect jdbc:flink://localhost:8083

链接到已经启动的flink sql gateway


三:流程分析,了解和熟悉

1.启动sqlgateway 流程,了解核心的结构

2.sql提交流程,了解sql 的流转逻辑

3.select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑


1.启动sqlgateway 流程

image.png

(1)关注SessionManager,维护了一个session的map

private final Map<SessionHandle, Session> sessions;

在调用openSesson() 会构建一个基于uuid 分配sessionId 的SessionContext;

后续的操作都是基于这个sessionId 获取到对应的SessionContext进行操作;

 (2) 通过beenline 提交 sql

eg: select * from order;

 经过handler 之后的处理流程

image.png


基于TableEnvrionment   获取parser 解析sql ,生成List<Operation>, 然后根据获取到的operator 类型进行处理,不同的operation 有对应的处理逻辑;

我们接下来分析下QueryOperation 的处理逻辑

1)sql 如何被改写的(eg: select * from table)

     -- PlannerBase.translate

     -- PlannerBase.translateTorel

         当匹配到 CollectModifyOperation,构建DynamicSinkUtils.convertCollectToRel

         convertCollectToRel 根据input的consumerDataType 构建CollectDynamicSink ,并拼接到collectionModifyOperation 上;

         关键代码如下:

final CollectDynamicSink tableSink =
        new CollectDynamicSink(
                contextResolvedTable.getIdentifier(),
                consumedDataType,
                configuration.get(CollectSinkOperatorFactory.MAX_BATCH_SIZE),
                configuration.get(CollectSinkOperatorFactory.SOCKET_TIMEOUT),
                classLoader,
                zoneId,
                configuration
                        .get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR)
                        .isEnabled());
collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider());
collectModifyOperation.setConsumedDataType(consumedDataType);
return convertSinkToRel(
        relBuilder,
        input,
        Collections.emptyMap(), // dynamicOptions
        contextResolvedTable,
        Collections.emptyMap(), // staticPartitions
        null, // targetColumns
        false,
        tableSink);

2)sql 如何提交到session 上?

    -- StreamExecutionEnvirionment 基于Configuration(execution.target) 构建PipelineExecutor

    -- 当前sqlgateway 的Configuration 的参数(execution.target=yarn-session),所以生成YarnSessionClusterExecutor

    -- YarnSessionClusterExecutor.execute(streamGraph, configuration, userClassLoader) 进行任务的提交和执行

       org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor#execute

3)  client 如何获取到session 执行的 Result.

     -- sql 任务提交完成之后,构建TableResultImpl

     -- TableResultImpl 的参数包括之前构建的resultProvider, resultProvider 关联了jobClient,用于后续进行fetcher 操作

ResultProvider resultProvider = sinkOperation.getSelectResultProvider();
resultProvider.setJobClient(jobClient);
return TableResultImpl.builder()
        .jobClient(jobClient)
        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
        .schema(operation.getResolvedSchema())
        .resultProvider(resultProvider)
        .setPrintStyle(
                PrintStyle.tableauWithTypeInferredColumnWidths(
                        // sinkOperation.getConsumedDataType() handles legacy types
                        DataTypeUtils.expandCompositeTypeToSchema(
                                sinkOperation.getConsumedDataType()),
                        resultProvider.getRowDataStringConverter(),
                        getConfig().get(TableConfigOptions.DISPLAY_MAX_COLUMN_WIDTH),
                        false,
                        isStreamingMode))
        .build();

       -- fetcher 操作

       * rest 请求

         /v2/sessions/3aa182db-cb6b-431a-86f5-4432b36776aa/operations/e822ef45-e5a3-460a-b636-9c03318763bc/result/0

       * 调用FetchResultsHandler

      *  基于request uri 获取到sessionHander. operationHandle, 也就是url 里的uuid;

      *  调用ResultFetcher 对结果集进行获取;

          -- ResultStore 对结果进行缓存和迭代获取;

          -- 封装了CollectDynamicSink 的iterator。用于迭代获取数据,其中iterator 通过封装CollectResultFetcher 来获取远程数据结果并缓存到buffer;

         -- CollectResultFetcher  属性包括JobClient, CoordinationRequestGateway;

              1) 如果任务已经结束,通过accumulator 获取指定的accumulatorName的数据;

              2)

                   2.1 如果任务未结束,则通过向CoordinationRequestGateway 发送请求来获取;

CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset);
return (CollectCoordinationResponse)
        gateway.sendCoordinationRequest(operatorId, request).get();

                  2.2 服务端收到请求处理CollectSink request

                       org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator#handleCoordinationRequest

image.png

           -- org.apache.flink.table.gateway.service.result.ResultStore#retrieveRecords 获取缓存在buffer 中的数据rowdata;





相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
5月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
360 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
5月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
178 11
|
12月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
217 5
|
10月前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
389 16
|
11月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
986 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
11月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
12月前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
215 1
|
12月前
|
存储 SQL 分布式计算
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
106 0
|
12月前
|
存储 消息中间件 大数据
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
228 0
|
12月前
|
存储 SQL 分布式计算
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
153 0

热门文章

最新文章