PostgreSQL sharding extension citus optimizer Query Processing - Subquery/CTE Push-Pull Execution


Tags

PostgreSQL , citus , sharding , push , pull , optimizer


Background

Citus is an open-source sharding extension for PostgreSQL. It was acquired by Microsoft in 2018, and the extension remains open source.

When processing very complex SQL, Citus uses a push-pull model that exchanges data across nodes.

The push/pull flow for intermediate results:

push : shard -> coordinator

pull : coordinator -> worker (one worker hosts multiple shards; the shards share the file pulled from the coordinator)

Shards, the coordinator, and workers exchange data via COPY; intermediate results are stored as files, and the node executor accesses those files (the intermediate results) through the read_intermediate_result function.

Example

If necessary Citus can gather results from subqueries and CTEs into the coordinator node and then push them back across workers for use by an outer query. This allows Citus to support a greater variety of SQL constructs, and even mix executor types between a query and its subqueries.

For example, having subqueries in a WHERE clause sometimes cannot execute inline at the same time as the main query, but must be done separately. Suppose a web analytics application maintains a visits table partitioned by page_id. To query the number of visitor sessions on the top twenty most visited pages, we can use a subquery to find the list of pages, then an outer query to count the sessions.
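The article does not show the table definition. Below is a minimal sketch of how such a visits table could be created and distributed, using the standard create_distributed_table() API; the column types are assumptions, and the shard count of 32 is chosen to match the Task Count seen in the plans later on.

CREATE TABLE visits (  
    page_id    int,  
    session_id int  
);  
  
-- Distribute the table by page_id across the workers; 32 shards matches  
-- the "Task Count: 32" shown in the EXPLAIN output below.  
SET citus.shard_count = 32;  
SELECT create_distributed_table('visits', 'page_id');  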

In the following query, the subquery has to be executed independently and its result pushed up to the outer query.

SELECT page_id, count(distinct session_id)  
FROM visits  
WHERE page_id IN (  
  SELECT page_id  
  FROM visits  
  GROUP BY page_id  
  ORDER BY count(*) DESC  
  LIMIT 20  
)  
GROUP BY page_id;  

The real-time executor would like to run a fragment of this query against each shard by page_id, counting distinct session_ids, and combining the results on the coordinator. However, the LIMIT in the subquery means the subquery cannot be executed as part of the fragment. By recursively planning the query, Citus can run the subquery separately, push the results to all workers, run the main fragment query, and pull the results back to the coordinator. The “push-pull” design supports subqueries like the one above.

Let’s see this in action by reviewing the EXPLAIN output for this query. It’s fairly involved:
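The plan below is ordinary EXPLAIN output (no ANALYZE is needed to see the distributed plan shape). As a side note not used in the original article, Citus prints only one representative task per Custom Scan by default; the citus.explain_all_tasks setting would list all of them. A sketch of how to reproduce it:

-- A plain EXPLAIN is enough to see the distributed plan structure.  
-- citus.explain_all_tasks = on would print all 32 tasks instead of one.  
SET citus.explain_all_tasks = off;   -- the default  
  
EXPLAIN  
SELECT page_id, count(distinct session_id)  
FROM visits  
WHERE page_id IN (  
  SELECT page_id  
  FROM visits  
  GROUP BY page_id  
  ORDER BY count(*) DESC  
  LIMIT 20  
)  
GROUP BY page_id;  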

The full execution plan:

GroupAggregate  (cost=0.00..0.00 rows=0 width=0)  
  Group Key: remote_scan.page_id  
  ->  Sort  (cost=0.00..0.00 rows=0 width=0)  
    Sort Key: remote_scan.page_id  
    ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
      ->  Distributed Subplan 6_1  
        ->  Limit  (cost=0.00..0.00 rows=0 width=0)  
          ->  Sort  (cost=0.00..0.00 rows=0 width=0)  
            Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC  
            ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)  
              Group Key: remote_scan.page_id  
              ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
                Task Count: 32  
                Tasks Shown: One of 32  
                ->  Task  
                  Node: host=localhost port=5433 dbname=postgres  
                  ->  Limit  (cost=1883.00..1883.05 rows=20 width=12)  
                    ->  Sort  (cost=1883.00..1965.54 rows=33017 width=12)  
                      Sort Key: (count(*)) DESC  
                      ->  HashAggregate  (cost=674.25..1004.42 rows=33017 width=12)  
                        Group Key: page_id  
                        ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=4)  
      Task Count: 32  
      Tasks Shown: One of 32  
      ->  Task  
        Node: host=localhost port=5433 dbname=postgres  
        ->  HashAggregate  (cost=734.53..899.61 rows=16508 width=8)  
          Group Key: visits.page_id, visits.session_id  
          ->  Hash Join  (cost=17.00..651.99 rows=16508 width=8)  
            Hash Cond: (visits.page_id = intermediate_result.page_id)  
            ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=8)  
            ->  Hash  (cost=14.50..14.50 rows=200 width=4)  
              ->  HashAggregate  (cost=12.50..14.50 rows=200 width=4)  
                Group Key: intermediate_result.page_id  
                ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=4)  

Let’s break it apart and examine each piece.

GroupAggregate  (cost=0.00..0.00 rows=0 width=0)  
  Group Key: remote_scan.page_id  
  ->  Sort  (cost=0.00..0.00 rows=0 width=0)  
    Sort Key: remote_scan.page_id  

Above: the root of the plan. The outermost GROUP BY page_id uses a GroupAggregate rather than a HashAggregate.

The root of the tree is what the coordinator node does with the results from the workers. In this case it is grouping them, and GroupAggregate requires they be sorted first.

    ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
      ->  Distributed Subplan 6_1  

Above: because of the subquery, the whole SQL statement is split into two independently executed parts; the first part is the subquery.

The custom scan has two large sub-trees, starting with a “distributed subplan.”
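Conceptually, this distributed subplan is nothing more than the IN (...) subquery executed as a standalone query, roughly:

-- Roughly what Distributed Subplan 6_1 computes on its own:  
-- the twenty most visited pages.  
SELECT page_id  
FROM visits  
GROUP BY page_id  
ORDER BY count(*) DESC  
LIMIT 20;  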

        ->  Limit  (cost=0.00..0.00 rows=0 width=0)  
          ->  Sort  (cost=0.00..0.00 rows=0 width=0)  
            Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC  
            ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)  
              Group Key: remote_scan.page_id  
              ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
                Task Count: 32  
                Tasks Shown: One of 32  
                ->  Task  
                  Node: host=localhost port=5433 dbname=postgres  
                  ->  Limit  (cost=1883.00..1883.05 rows=20 width=12)  
                    ->  Sort  (cost=1883.00..1965.54 rows=33017 width=12)  
                      Sort Key: (count(*)) DESC  
                      ->  HashAggregate  (cost=674.25..1004.42 rows=33017 width=12)  
                        Group Key: page_id  
                        ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=4)  

Above: the execution plan of the subquery. Each shard computes a per-shard count(*) ... GROUP BY; the coordinator then combines these partial counts with sum ... GROUP BY to obtain the final counts and applies LIMIT 20. The output of this first independent execution step becomes the intermediate results.

Worker nodes run the above for each of the thirty-two shards (Citus is choosing one representative for display). We can recognize all the pieces of the IN (…) subquery: the sorting, grouping and limiting. When all workers have completed this query, they send their output back to the coordinator which puts it together as “intermediate results.”
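After recursive planning, the outer part of the query effectively references that intermediate result instead of re-running the subquery. A rough sketch of the rewritten form follows; the result id '6_1' comes from the plan above, while the 'binary' format and the column definition list are assumptions for illustration.

-- Sketch of the recursively planned outer query: the IN (...) subquery is  
-- replaced by a scan over the intermediate result of Distributed Subplan 6_1.  
SELECT page_id, count(distinct session_id)  
FROM visits  
WHERE page_id IN (  
  SELECT page_id  
  FROM read_intermediate_result('6_1', 'binary'::citus_copy_format)  
         AS intermediate_result(page_id int)  
)  
GROUP BY page_id;  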

      Task Count: 32  
      Tasks Shown: One of 32  
      ->  Task  
        Node: host=localhost port=5433 dbname=postgres  
        ->  HashAggregate  (cost=734.53..899.61 rows=16508 width=8)  
          Group Key: visits.page_id, visits.session_id  
          ->  Hash Join  (cost=17.00..651.99 rows=16508 width=8)  
            Hash Cond: (visits.page_id = intermediate_result.page_id)  

Above: the second independently executed part of the whole query. The coordinator sends the intermediate results to the workers over the PostgreSQL COPY protocol (where they are stored as files), and the read_intermediate_result function loads the intermediate results from those files so they can be used in the join.

Citus starts another real-time job in this second subtree. It’s going to count distinct sessions in visits. It uses a JOIN to connect with the intermediate results. The intermediate results will help it restrict to the top twenty pages.

            ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=8)  
            ->  Hash  (cost=14.50..14.50 rows=200 width=4)  
              ->  HashAggregate  (cost=12.50..14.50 rows=200 width=4)  
                Group Key: intermediate_result.page_id  
                ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=4)  

Above: still the second independently executed part of the query, namely the hash join that runs on each shard.

The worker internally retrieves intermediate results using a read_intermediate_result function which loads data from a file that was copied in from the coordinator node.
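As an aside not covered in the original article, the total size of intermediate results a query may materialize this way is capped by the citus.max_intermediate_result_size setting (default 1GB); a query exceeding the cap is cancelled with an error. A quick sketch, with an illustrative value:

-- Cap the intermediate results (subqueries and CTEs that cannot be pushed  
-- down) that a single query may transfer between coordinator and workers.  
SET citus.max_intermediate_result_size TO '256MB';  
SHOW citus.max_intermediate_result_size;  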

This example showed how Citus executed the query in multiple steps with a distributed subplan, and how you can use EXPLAIN to learn about distributed query execution.

Summary

When a complex SQL statement cannot be completed within a single worker executor process, Citus uses the push-pull approach to execute it in multiple stages, which allows it to support a wider range of complex SQL.

For example, the subquery discussed in this article is executed as an independent step: it runs on its own, and its result is sent to the coordinator over the PostgreSQL COPY protocol as intermediate results. When the next independent step begins, the intermediate results are sent from the coordinator to the worker nodes (again via COPY) and stored there as temporary files; the read_intermediate_result function then reads those files so the next step can use them (as a PostgreSQL Function Scan).

References

http://docs.citusdata.com/en/v8.1/develop/reference_processing.html#citus-query-processing

《PostgreSQL sharding : citus 系列7 - topn 加速(count(*) group by order by count(*) desc limit x) (use 估值插件 topn)》

《PostgreSQL sharding : citus 系列6 - count(distinct xx) 加速 (use 估值插件 hll|hyperloglog)》

《PostgreSQL sharding : citus 系列5 - worker节点网络优化》

《PostgreSQL sharding : citus 系列4 - DDL 操作规范 (新增DB,TABLE,SCHEMA,UDF,OP,用户等)》

《PostgreSQL 11 相似图像搜索插件 imgsmlr 性能测试与优化 3 - citus 8机128shard (4亿图像)》

《Deepgreen(Greenplum) 多机部署测试 , TPC-H VS citus》

《PostgreSQL sharding : citus 系列3 - 窗口函数调用限制 与 破解之法(套用gpdb执行树,分步执行)》

《PostgreSQL sharding : citus 系列2 - TPC-H》

《PostgreSQL citus, Greenplum 分布式执行计划 DEBUG》

《PostgreSQL sharding : citus 系列1 - 多机部署(含OLTP(TPC-B)测试)》

 
