1 AnalyticDB MySQL中sql语句的执行过程
AnalyticDB MySQL采用三层架构,分为接入层、计算层和存储层,这个分层和MySQL等数据库也是基本一致的,由于AnalyticDB MySQL是用于数据仓库的分析型数据库,其底层的实现同MySQL完全不同。
在AnalyticDB MySQ,这三本层次都是分布式的,接入层主要负责协议层接入、SQL解析和优化、实时写入Sharding、数据调度和查询调度,是sql执行计划中的控制节点;计算层是阿里的羲和分析计算引擎,具备分布式MPP和DAG融合执行能力,结合智能
优化器,可支持高并发和复杂SQL混合负载,存储层是阿里的玄武分析存储引擎,基于Raft协议实现的分布式实时强一致高可用存储引擎,通过数据分片和Multi-Raft实现并行。阿里官网的架构图如下:
对应这三个层次,在AnalyticDB MySQ中,一条语句的执行流程是这样的:
1 客户端将SQL语句提交到AnalyticDB MySQL版的前端接入节点(即Controller节点)。Controller节点中解析器(Parser)会对SQL语句进行解析优化并生成最终的逻辑执行计划(Plan),并根据数据是否需要在网络间传输来决定是否需要对计划进行切分成多个stage。逻辑执行计划中也会规定特定的执行处理方式,例如Join类型、Join顺序、聚合方式以及数据重分布方式等。
执行计划任务的节点(即Executor节点)会接收最终的逻辑执行计划并将其转化成物理执行计划。物理执行计划由Stage和算子(Operator)组成,在计算层进行执行,stage在每一个计算节点上的执行被称为任务。任务可以有一个或多个算子组成。任务的执行中,计算节点从存储节点获取数据,也可能会将过滤等操作下推到存储节点执行。
Executor节点将数据处理的最终结果返回到客户端,或者写入AnalyticDB MySQL版集群的内部表以及其它外部存储系统(如OSS)中。
2 AnalyticDB MySQL 执行计划的基本概念
AnalyticDB MySQL中的语句大多是并行执行的,其执行计划具有分布式和并行执行的特点,执行计划中有几个基本概念也和MySQL不同,读懂执行计划的前提是对这些基本概念要清楚,主要有stage、Task和算子三个基本概念。
Stage(执行阶段),AnalyticDB MySQL版中的查询会首先被切分为多个Stage来执行,一个Stage就是执行计划中某一部分的物理实体。Stage的数据来源可以是底层存储系统中的数据或者网络中传输的数据,一个Stage由分布在不同Executor节点上相同类型的Task组成,多个Task会并行处理数据。
Task是一个Stage在某个Executor节点上的执行实体,多个同类型的Task组成一个Stage,在集群内部并行处理数据。
算子(Operator)
算子是AnalyticDB MySQL版的基本数据处理单元。AnalyticDB MySQL版会根据算子所表达的语义或算子间的依赖关系,决定使用并行还是串行执行来处理数据。
这三个概念的之间的关系如下图所示:
3 AnalyticDB MySQL中stage
stage是执行计划的重要阶段,通过一个具体的stage的例子来看一下就明白了。
上面的是来源是阿里官网,是AnalyticDB MySQL版的SQL诊断功能以树形图的形式展现SQL查询的执行计划的stage层执行计划树。stage是下网上执行的,先由具有扫描算子的Stage进行数据扫描,再经过中间Stage节点的层层处理后,再由最上层的根节点将查询结果返回客户端。
数据由下层流向上层的有三种方式,如果不理解这三种方式,就很难理解执行计划。这三种方式是广播(broadcast),重分区(repartition),和汇集(gather)。看一下官网的三个图,对这三种方式就一目了然了。
图里的上游下游是时间上的概念,而不是空间或层次上的概念,数据总是从上游流转到下游。
3.1 广播
上游的表被复制到下游的每个节点
3.2 重分区
上游的分区分布和下游要求的分区分布方式不同,根据下游的要求重新在每个节点上分布分区。
3.3 汇集
上游的数据被汇集到一个下游节点上进行汇总计算。
4 算子
执行计划中的任务(task)不必多做解释,一个阶段(stage)在一个节点上的执行就是任务。
执行计划里的算子因为涉及到分布式和并行,出现了几个在MySQL中没有的算子,需要说明一下。
4.1 分布式和并行相关算子
RemoteSource
该算子用来表示当前Stage的输入数据是通过网络从远程节点传输过来的。
RemoteExchange
该算子用来表示上游向下游Stage传输数据时所用的方法。上下游Stage间传输数据的方法有如下几种:
A)Broadcast:表示上游Stage中每个计算节点的数据都会复制到所有下游Stage的计算节点。
B)Repartition:表示上游Stage中每个节点的数据会按照固定的规则切分后,再分发到下游Stage的指定计算节点。
C)Gather:表示上游Stage中每个节点的数据会集中到下游Stage中某一个特定的计算节点。
可以看出来,RemoteExchange三个方法和stage间传递数据的三种方式相对应。
4.2 其它算子
其它算子比较好理解,同MySQL数据库执行计划中的操作基本类似,如Aggregation算子通过sum()、count()、avg()等函数对数据进行聚合或分组聚合操作DistinctLimit算子对应SQL语句中的DISTINCT LIMIT,MarkDistinct
对应SQL语句中的count(DISTINCT)操作,Project对应SQL语句中对特定字段的投影操作。
有一个算子也是Analytic DB MySQL特有的,这个就是StageOutput算子,
这个算子用于将当前Stage处理后的数据通过网络传输到下游Stage的节点。
4.3 算子的执行
算子怎样组成任务可以从下面图中看出来。
5 解释和分析执行计划
有了上面的准备,我们可以解释和分析AnalyticDB MySQL的执行计划了,下面这个例子来自官网。要分析的sql语句先列出来。
SELECTcount(*)FROM nation, region, customer WHERE c_nationkey = n_nationkey AND n_regionkey = r_regionkey AND r_name ='ASIA';
这条sql语句并不复杂,是一个三个表的内连接,获得区域为亚洲的用户的数量,限制条件加载region表上。这条语句的执行计划看起来要复杂一些
Output[count(*)] │ Outputs:[count:bigint] │ Estimates:{rows:1(8B)} │ count(*):=count └─ Aggregate(FINAL) │ Outputs:[count:bigint] │ Estimates:{rows:1(8B)} │ count:=count(`count_1`) └─ LocalExchange[SINGLE]() │ Outputs:[count_0_1:bigint] │ Estimates:{rows:1(8B)} └─ RemoteExchange[GATHER] │ Outputs:[count_0_2:bigint] │ Estimates:{rows:1(8B)} └─ Aggregate(PARTIAL) │ Outputs:[count_0_4:bigint] │ Estimates:{rows:1(8B)} │ count_4 :=count(*) └─ InnerJoin[(`c_nationkey` = `n_nationkey`)][$hashvalue, $hashvalue_0_6] │ Outputs:[] │ Estimates:{rows:302035(4.61MB)} │ Distribution: REPLICATED ├─ Project[] │ │ Outputs:[c_nationkey:integer, $hashvalue:bigint] │ │ Estimates:{rows:1500000(5.72MB)} │ │ $hashvalue := `combine_hash`(BIGINT'0', COALESCE(`$operator$hash_code`(`c_nationkey`),0)) │ └─ RuntimeFilter │ │ Outputs:[c_nationkey:integer] │ │ Estimates:{rows:1500000(5.72MB)} │ ├─ TableScan[adb:AdbTableHandle{schema=tpch, tableName=customer, partitionColumnHandles=[c_custkey]}] │ │ Outputs:[c_nationkey:integer] │ │ Estimates:{rows:1500000(5.72MB)} │ │ c_nationkey := AdbColumnHandle{columnName=c_nationkey, type=4, isIndexed=true} │ └─ RuntimeCollect │ │ Outputs:[n_nationkey:integer] │ │ Estimates:{rows:5(60B)} │ └─ LocalExchange[ROUND_ROBIN]() │ │ Outputs:[n_nationkey:integer] │ │ Estimates:{rows:5(60B)} │ └─ RuntimeScan │ Outputs:[n_nationkey:integer] │ Estimates:{rows:5(60B)} └─ LocalExchange[HASH][$hashvalue_0_6]("n_nationkey") │ Outputs:[n_nationkey:integer, $hashvalue_0_6:bigint] │ Estimates:{rows:5(60B)} └─ Project[] │ Outputs:[n_nationkey:integer, $hashvalue_0_10:bigint] │ Estimates:{rows:5(60B)} │ $hashvalue_10 := `combine_hash`(BIGINT'0', COALESCE(`$operator$hash_code`(`n_nationkey`),0)) └─ RemoteExchange[REPLICATE] │ Outputs:[n_nationkey:integer] │ Estimates:{rows:5(60B)} └─ InnerJoin[(`n_regionkey` = `r_regionkey`)][$hashvalue_0_7, $hashvalue_0_8] │ Outputs:[n_nationkey:integer] │ Estimates:{rows:5(60B)} │ Distribution: REPLICATED ├─ Project[] │ │ Outputs:[n_nationkey:integer, n_regionkey:integer, $hashvalue_0_7:bigint] │ │ Estimates:{rows:25(200B)} │ │ $hashvalue_7 := `combine_hash`(BIGINT'0', COALESCE(`$operator$hash_code`(`n_regionkey`),0)) │ └─ RuntimeFilter │ │ Outputs:[n_nationkey:integer, n_regionkey:integer] │ │ Estimates:{rows:25(200B)} │ ├─ TableScan[adb:AdbTableHandle{schema=tpch, tableName=nation, partitionColumnHandles=[]}] │ │ Outputs:[n_nationkey:integer, n_regionkey:integer] │ │ Estimates:{rows:25(200B)} │ │ n_nationkey := AdbColumnHandle{columnName=n_nationkey, type=4, isIndexed=true} │ │ n_regionkey := AdbColumnHandle{columnName=n_regionkey, type=4, isIndexed=true} │ └─ RuntimeCollect │ │ Outputs:[r_regionkey:integer] │ │ Estimates:{rows:1(4B)} │ └─ LocalExchange[ROUND_ROBIN]() │ │ Outputs:[r_regionkey:integer] │ │ Estimates:{rows:1(4B)} │ └─ RuntimeScan │ Outputs:[r_regionkey:integer] │ Estimates:{rows:1(4B)} └─ LocalExchange[HASH][$hashvalue_0_8]("r_regionkey") │ Outputs:[r_regionkey:integer, $hashvalue_0_8:bigint] │ Estimates:{rows:1(4B)} └─ ScanProject[table= adb:AdbTableHandle{schema=tpch, tableName=region, partitionColumnHandles=[]}] Outputs:[r_regionkey:integer, $hashvalue_0_9:bigint] Estimates:{rows:1(4B)}/{rows:1(B)} $hashvalue_9 := `combine_hash`(BIGINT'0', COALESCE(`$operator$hash_code`(`r_regionkey`),0)) r_regionkey := AdbColumnHandle{columnName=r_regionkey, type=4, isIndexed=true}
执行计划的localExchange是在本地节点进行的对数据的操作,RuntimeCollect是从前面的算子获得的数据。
执行计划从下往上看,最下面一个算子是ScanProject,扫描的是region表,扫描后的数据由LocalExchange[HASH]算子进行哈希计算,然后和nation表进行哈希join,
join的键是n_regionkey和r_regionkey,join的输出是n_nationkey,join操作的执行计划看起来比较复杂,可以看到对nation表的TableScan操作,以及执行中获得的对region表哈希操作后的数据,这个数据只有一行。这个join的结果再和用户表进行join后再每个节点上进行部分聚合(Aggregate(PARTIAL),最后汇集(RemoteExchange[GATHER])到一个节点上进行最后聚合(Aggregate(FINAL)),然后输出(Output[count(*)])。