一:安装 sqlgateway 环境
1.1. on session
1.1 先启动session
设置环境变量:HADOOP_CLASSPATH
export HADOOP_CLASSPATH=`hadoop classpath`
./bin/yarn-session.sh -d -s 2 -jm 2048 -tm 2048
1.2.on perjob
启动SqlGateway:
./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost
默认绑定端口8083
二:调用
1.beeline 调用
1.1 需要下载flink-sql-jdbc-driver-bundle-1.18.0.jar 并放到
{HIVE_HOME}/lib下
2.jdbc 客户端
使用./bin/beeline
!connect jdbc:flink://localhost:8083
链接到已经启动的flink sql gateway
三:流程分析,了解和熟悉
1.启动sqlgateway 流程,了解核心的结构
2.sql提交流程,了解sql 的流转逻辑
3.select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑
1.启动sqlgateway 流程
(1)关注SessionManager,维护了一个session的map
private final Map<SessionHandle, Session> sessions;
在调用openSesson() 会构建一个基于uuid 分配sessionId 的SessionContext;
后续的操作都是基于这个sessionId 获取到对应的SessionContext进行操作;
(2) 通过beenline 提交 sql
eg: select * from order;
经过handler 之后的处理流程
基于TableEnvrionment 获取parser 解析sql ,生成List<Operation>, 然后根据获取到的operator 类型进行处理,不同的operation 有对应的处理逻辑;
我们接下来分析下QueryOperation 的处理逻辑
1)sql 如何被改写的(eg: select * from table)
-- PlannerBase.translate
-- PlannerBase.translateTorel
当匹配到 CollectModifyOperation,构建DynamicSinkUtils.convertCollectToRel
convertCollectToRel 根据input的consumerDataType 构建CollectDynamicSink ,并拼接到collectionModifyOperation 上;
关键代码如下:
final CollectDynamicSink tableSink = new CollectDynamicSink( contextResolvedTable.getIdentifier(), consumedDataType, configuration.get(CollectSinkOperatorFactory.MAX_BATCH_SIZE), configuration.get(CollectSinkOperatorFactory.SOCKET_TIMEOUT), classLoader, zoneId, configuration .get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR) .isEnabled()); collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider()); collectModifyOperation.setConsumedDataType(consumedDataType); return convertSinkToRel( relBuilder, input, Collections.emptyMap(), // dynamicOptions contextResolvedTable, Collections.emptyMap(), // staticPartitions null, // targetColumns false, tableSink);
2)sql 如何提交到session 上?
-- StreamExecutionEnvirionment 基于Configuration(execution.target) 构建PipelineExecutor
-- 当前sqlgateway 的Configuration 的参数(execution.target=yarn-session),所以生成YarnSessionClusterExecutor
-- YarnSessionClusterExecutor.execute(streamGraph, configuration, userClassLoader) 进行任务的提交和执行
org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor#execute
3) client 如何获取到session 执行的 Result.
-- sql 任务提交完成之后,构建TableResultImpl
-- TableResultImpl 的参数包括之前构建的resultProvider, resultProvider 关联了jobClient,用于后续进行fetcher 操作
ResultProvider resultProvider = sinkOperation.getSelectResultProvider(); resultProvider.setJobClient(jobClient); return TableResultImpl.builder() .jobClient(jobClient) .resultKind(ResultKind.SUCCESS_WITH_CONTENT) .schema(operation.getResolvedSchema()) .resultProvider(resultProvider) .setPrintStyle( PrintStyle.tableauWithTypeInferredColumnWidths( // sinkOperation.getConsumedDataType() handles legacy types DataTypeUtils.expandCompositeTypeToSchema( sinkOperation.getConsumedDataType()), resultProvider.getRowDataStringConverter(), getConfig().get(TableConfigOptions.DISPLAY_MAX_COLUMN_WIDTH), false, isStreamingMode)) .build();
-- fetcher 操作
* rest 请求
/v2/sessions/3aa182db-cb6b-431a-86f5-4432b36776aa/operations/e822ef45-e5a3-460a-b636-9c03318763bc/result/0
* 调用FetchResultsHandler
* 基于request uri 获取到sessionHander. operationHandle, 也就是url 里的uuid;
* 调用ResultFetcher 对结果集进行获取;
-- ResultStore 对结果进行缓存和迭代获取;
-- 封装了CollectDynamicSink 的iterator。用于迭代获取数据,其中iterator 通过封装CollectResultFetcher 来获取远程数据结果并缓存到buffer;
-- CollectResultFetcher 属性包括JobClient, CoordinationRequestGateway;
1) 如果任务已经结束,通过accumulator 获取指定的accumulatorName的数据;
2)
2.1 如果任务未结束,则通过向CoordinationRequestGateway 发送请求来获取;
CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset); return (CollectCoordinationResponse) gateway.sendCoordinationRequest(operatorId, request).get();
2.2 服务端收到请求处理CollectSink request
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator#handleCoordinationRequest
-- org.apache.flink.table.gateway.service.result.ResultStore#retrieveRecords 获取缓存在buffer 中的数据rowdata;