flink1.18 SqlGateway 的使用和原理分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: # 了解flink1.18 sqlGateway 的安装和使用步骤# 启动sqlgateway 流程,了解核心的结构# sql提交流程,了解sql 的流转逻辑# select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑

一:安装 sqlgateway 环境

1.1. on session

 1.1 先启动session

设置环境变量:HADOOP_CLASSPATH

export HADOOP_CLASSPATH=`hadoop classpath`

./bin/yarn-session.sh -d -s 2 -jm 2048 -tm 2048

1.2.on perjob


启动SqlGateway:

./bin/sql-gateway.sh start  -Dsql-gateway.endpoint.rest.address=localhost

默认绑定端口8083


二:调用

1.beeline 调用

1.1 需要下载flink-sql-jdbc-driver-bundle-1.18.0.jar 并放到

{HIVE_HOME}/lib下

2.jdbc  客户端

使用./bin/beeline

!connect jdbc:flink://localhost:8083

链接到已经启动的flink sql gateway


三:流程分析,了解和熟悉

1.启动sqlgateway 流程,了解核心的结构

2.sql提交流程,了解sql 的流转逻辑

3.select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑


1.启动sqlgateway 流程

image.png

(1)关注SessionManager,维护了一个session的map

private final Map<SessionHandle, Session> sessions;

在调用openSesson() 会构建一个基于uuid 分配sessionId 的SessionContext;

后续的操作都是基于这个sessionId 获取到对应的SessionContext进行操作;

 (2) 通过beenline 提交 sql

eg: select * from order;

 经过handler 之后的处理流程

image.png


基于TableEnvrionment   获取parser 解析sql ,生成List<Operation>, 然后根据获取到的operator 类型进行处理,不同的operation 有对应的处理逻辑;

我们接下来分析下QueryOperation 的处理逻辑

1)sql 如何被改写的(eg: select * from table)

     -- PlannerBase.translate

     -- PlannerBase.translateTorel

         当匹配到 CollectModifyOperation,构建DynamicSinkUtils.convertCollectToRel

         convertCollectToRel 根据input的consumerDataType 构建CollectDynamicSink ,并拼接到collectionModifyOperation 上;

         关键代码如下:

final CollectDynamicSink tableSink =
        new CollectDynamicSink(
                contextResolvedTable.getIdentifier(),
                consumedDataType,
                configuration.get(CollectSinkOperatorFactory.MAX_BATCH_SIZE),
                configuration.get(CollectSinkOperatorFactory.SOCKET_TIMEOUT),
                classLoader,
                zoneId,
                configuration
                        .get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR)
                        .isEnabled());
collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider());
collectModifyOperation.setConsumedDataType(consumedDataType);
return convertSinkToRel(
        relBuilder,
        input,
        Collections.emptyMap(), // dynamicOptions
        contextResolvedTable,
        Collections.emptyMap(), // staticPartitions
        null, // targetColumns
        false,
        tableSink);

2)sql 如何提交到session 上?

    -- StreamExecutionEnvirionment 基于Configuration(execution.target) 构建PipelineExecutor

    -- 当前sqlgateway 的Configuration 的参数(execution.target=yarn-session),所以生成YarnSessionClusterExecutor

    -- YarnSessionClusterExecutor.execute(streamGraph, configuration, userClassLoader) 进行任务的提交和执行

       org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor#execute

3)  client 如何获取到session 执行的 Result.

     -- sql 任务提交完成之后,构建TableResultImpl

     -- TableResultImpl 的参数包括之前构建的resultProvider, resultProvider 关联了jobClient,用于后续进行fetcher 操作

ResultProvider resultProvider = sinkOperation.getSelectResultProvider();
resultProvider.setJobClient(jobClient);
return TableResultImpl.builder()
        .jobClient(jobClient)
        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
        .schema(operation.getResolvedSchema())
        .resultProvider(resultProvider)
        .setPrintStyle(
                PrintStyle.tableauWithTypeInferredColumnWidths(
                        // sinkOperation.getConsumedDataType() handles legacy types
                        DataTypeUtils.expandCompositeTypeToSchema(
                                sinkOperation.getConsumedDataType()),
                        resultProvider.getRowDataStringConverter(),
                        getConfig().get(TableConfigOptions.DISPLAY_MAX_COLUMN_WIDTH),
                        false,
                        isStreamingMode))
        .build();

       -- fetcher 操作

       * rest 请求

         /v2/sessions/3aa182db-cb6b-431a-86f5-4432b36776aa/operations/e822ef45-e5a3-460a-b636-9c03318763bc/result/0

       * 调用FetchResultsHandler

      *  基于request uri 获取到sessionHander. operationHandle, 也就是url 里的uuid;

      *  调用ResultFetcher 对结果集进行获取;

          -- ResultStore 对结果进行缓存和迭代获取;

          -- 封装了CollectDynamicSink 的iterator。用于迭代获取数据,其中iterator 通过封装CollectResultFetcher 来获取远程数据结果并缓存到buffer;

         -- CollectResultFetcher  属性包括JobClient, CoordinationRequestGateway;

              1) 如果任务已经结束,通过accumulator 获取指定的accumulatorName的数据;

              2)

                   2.1 如果任务未结束,则通过向CoordinationRequestGateway 发送请求来获取;

CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset);
return (CollectCoordinationResponse)
        gateway.sendCoordinationRequest(operatorId, request).get();

                  2.2 服务端收到请求处理CollectSink request

                       org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator#handleCoordinationRequest

image.png

           -- org.apache.flink.table.gateway.service.result.ResultStore#retrieveRecords 获取缓存在buffer 中的数据rowdata;





相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
3月前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
159 3
|
11天前
|
消息中间件 数据挖掘 Kafka
揭秘数据洪流中的救世主:Confluent与Flink的实时分析奇迹!
【8月更文挑战第9天】在现代数据处理中,实时数据分析至关重要。Confluent Platform与Apache Flink的组合提供了一套高效的数据流处理方案。Confluent Platform基于Apache Kafka构建,负责数据的收集、传输及管理;而Flink则擅长实时处理这些数据流,进行复杂分析。首先需配置Confluent Platform,包括设置Zookeeper、Kafka brokers及相关服务。
26 1
|
2月前
|
消息中间件 分布式计算 Kafka
深度分析:Apache Flink及其在大数据处理中的应用
Apache Flink是低延迟、高吞吐量的流处理框架,以其状态管理和事件时间处理能力脱颖而出。与Apache Spark Streaming相比,Flink在实时性上更强,但Spark生态系统更丰富。Apache Storm在低延迟上有优势,而Kafka Streams适合轻量级流处理。选型考虑延迟、状态管理、生态系统和运维成本。Flink适用于实时数据分析、复杂事件处理等场景,使用时注意资源配置、状态管理和窗口操作的优化。
|
3月前
|
运维 监控 Java
面经:Storm实时计算框架原理与应用场景
【4月更文挑战第11天】本文是关于Apache Storm实时流处理框架的面试攻略和核心原理解析。文章分享了面试常见主题,包括Storm的架构与核心概念(如Spout、Bolt、Topology、Tuple和Ack机制),编程模型与API,部署与运维,以及应用场景与最佳实践。通过代码示例展示了如何构建一个简单的WordCountTopology,强调理解和运用Storm的关键知识点对于面试和实际工作的重要性。
153 4
面经:Storm实时计算框架原理与应用场景
|
3月前
|
存储 NoSQL 分布式数据库
【Flink】Flink分布式快照的原理是什么?
【4月更文挑战第21天】【Flink】Flink分布式快照的原理是什么?
|
3月前
|
SQL Prometheus Kubernetes
实时计算 Flink版产品使用合集之时间戳读取的原理是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
346 0
|
3月前
|
SQL 并行计算 大数据
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
关于Flink服务的搭建与部署,由于其涉及诸多实战操作而理论部分相对较少,小编打算采用一个独立的版本和环境来进行详尽的实战讲解。考虑到文字描述可能无法充分展现操作的细节和流程,我们决定以视频的形式进行分析和介绍。因此,在本文中,我们将暂时不涉及具体的搭建和部署步骤。
538 3
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
|
3月前
|
消息中间件 Kafka API
【极数系列】ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter & 详细分析解决
【极数系列】ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter & 详细分析解决
233 0