PostgreSQL sharding extensino citus 优化器 Query Processing 之 - Subquery/CTE Push-Pull Execution

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: 标签 PostgreSQL , citus , sharding , push , pull , 优化器 背景 citus 是postgresql的sharding 开源中间件,2018年被微软收购,插件依旧开源。 在处理非常复杂的SQL时,CITUS使用推拉模型,支持跨节点的数据交换,用以处理复杂SQL。 中间结果的push,pull过程: push : shard ->

标签

PostgreSQL , citus , sharding , push , pull , 优化器


背景

citus 是postgresql的sharding 开源中间件,2018年被微软收购,插件依旧开源。

在处理非常复杂的SQL时,CITUS使用推拉模型,支持跨节点的数据交换,用以处理复杂SQL。

中间结果的push,pull过程:

push : shard -> coordinator

pull : coordinator -> worker(同一个worker包含多个shard, shard共享FILE pull from coordinator)

shard, coordinator, worker使用COPY交互,使用FILE保存中间结果,node executor使用intermediate_result访问FILE(中间结果)。

例子

If necessary Citus can gather results from subqueries and CTEs into the coordinator node and then push them back across workers for use by an outer query. This allows Citus to support a greater variety of SQL constructs, and even mix executor types between a query and its subqueries.

For example, having subqueries in a WHERE clause sometimes cannot execute inline at the same time as the main query, but must be done separately. Suppose a web analytics application maintains a visits table partitioned by page_id. To query the number of visitor sessions on the top twenty most visited pages, we can use a subquery to find the list of pages, then an outer query to count the sessions.

以下请求,subquery中的请求需要独立执行,将结果推到外层。

SELECT page_id, count(distinct session_id)  
FROM visits  
WHERE page_id IN (  
  SELECT page_id  
  FROM visits  
  GROUP BY page_id  
  ORDER BY count(*) DESC  
  LIMIT 20  
)  
GROUP BY page_id;  

The real-time executor would like to run a fragment of this query against each shard by page_id, counting distinct session_ids, and combining the results on the coordinator. However the LIMIT in the subquery means the subquery cannot be executed as part of the fragment. By recursively planning the query Citus can run the subquery separately, push the results to all workers, run the main fragment query, and pull the results back to the coordinator. The “push-pull” design supports a subqueries like the one above.

Let’s see this in action by reviewing the EXPLAIN output for this query. It’s fairly involved:

完整执行计划:

GroupAggregate  (cost=0.00..0.00 rows=0 width=0)  
  Group Key: remote_scan.page_id  
  ->  Sort  (cost=0.00..0.00 rows=0 width=0)  
    Sort Key: remote_scan.page_id  
    ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
      ->  Distributed Subplan 6_1  
        ->  Limit  (cost=0.00..0.00 rows=0 width=0)  
          ->  Sort  (cost=0.00..0.00 rows=0 width=0)  
            Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC  
            ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)  
              Group Key: remote_scan.page_id  
              ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
                Task Count: 32  
                Tasks Shown: One of 32  
                ->  Task  
                  Node: host=localhost port=5433 dbname=postgres  
                  ->  Limit  (cost=1883.00..1883.05 rows=20 width=12)  
                    ->  Sort  (cost=1883.00..1965.54 rows=33017 width=12)  
                      Sort Key: (count(*)) DESC  
                      ->  HashAggregate  (cost=674.25..1004.42 rows=33017 width=12)  
                        Group Key: page_id  
                        ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=4)  
      Task Count: 32  
      Tasks Shown: One of 32  
      ->  Task  
        Node: host=localhost port=5433 dbname=postgres  
        ->  HashAggregate  (cost=734.53..899.61 rows=16508 width=8)  
          Group Key: visits.page_id, visits.session_id  
          ->  Hash Join  (cost=17.00..651.99 rows=16508 width=8)  
            Hash Cond: (visits.page_id = intermediate_result.page_id)  
            ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=8)  
            ->  Hash  (cost=14.50..14.50 rows=200 width=4)  
              ->  HashAggregate  (cost=12.50..14.50 rows=200 width=4)  
                Group Key: intermediate_result.page_id  
                ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=4)  

Let’s break it apart and examine each piece.

GroupAggregate  (cost=0.00..0.00 rows=0 width=0)  
  Group Key: remote_scan.page_id  
  ->  Sort  (cost=0.00..0.00 rows=0 width=0)  
    Sort Key: remote_scan.page_id  

以上,根节点,最外层GROUP BY page_id;,采用的是group agg,而非hash agg.

The root of the tree is what the coordinator node does with the results from the workers. In this case it is grouping them, and GroupAggregate requires they be sorted first.

    ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
      ->  Distributed Subplan 6_1  
.  

以上,由于有subquery的存在,所以整个SQL拆成两个独立部分执行,第一个部分为subquery。

The custom scan has two large sub-trees, starting with a “distributed subplan.”

        ->  Limit  (cost=0.00..0.00 rows=0 width=0)  
          ->  Sort  (cost=0.00..0.00 rows=0 width=0)  
            Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC  
            ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)  
              Group Key: remote_scan.page_id  
              ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
                Task Count: 32  
                Tasks Shown: One of 32  
                ->  Task  
                  Node: host=localhost port=5433 dbname=postgres  
                  ->  Limit  (cost=1883.00..1883.05 rows=20 width=12)  
                    ->  Sort  (cost=1883.00..1965.54 rows=33017 width=12)  
                      Sort Key: (count(*)) DESC  
                      ->  HashAggregate  (cost=674.25..1004.42 rows=33017 width=12)  
                        Group Key: page_id  
                        ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=4)  
.  

以上,SUBQUERY的执行计划。每个SHARD执行count group,汇总到coordinator使用sum group得到最终的count group,LIMIT 20。 第一个独立执行过程产生的中间结果为intermediate results

Worker nodes run the above for each of the thirty-two shards (Citus is choosing one representative for display). We can recognize all the pieces of the IN (…) subquery: the sorting, grouping and limiting. When all workers have completed this query, they send their output back to the coordinator which puts it together as “intermediate results.”

      Task Count: 32  
      Tasks Shown: One of 32  
      ->  Task  
        Node: host=localhost port=5433 dbname=postgres  
        ->  HashAggregate  (cost=734.53..899.61 rows=16508 width=8)  
          Group Key: visits.page_id, visits.session_id  
          ->  Hash Join  (cost=17.00..651.99 rows=16508 width=8)  
            Hash Cond: (visits.page_id = intermediate_result.page_id)  
.  

以上,整个QUERY的为第二个独立执行部分。coordinator 将intermediate results 通过PG COPY协议发送到worker (存储为FILE),intermediate_result 函数从FILE中加载中间件结果,用于JOIN。

Citus starts another real-time job in this second subtree. It’s going to count distinct sessions in visits. It uses a JOIN to connect with the intermediate results. The intermediate results will help it restrict to the top twenty pages.

            ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=8)  
            ->  Hash  (cost=14.50..14.50 rows=200 width=4)  
              ->  HashAggregate  (cost=12.50..14.50 rows=200 width=4)  
                Group Key: intermediate_result.page_id  
                ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=4)  
.  

以上,整个QUERY的为第二个独立执行部分。发生在shard上的hash join。

The worker internally retrieves intermediate results using a read_intermediate_result function which loads data from a file that was copied in from the coordinator node.

This example showed how Citus executed the query in multiple steps with a distributed subplan, and how you can use EXPLAIN to learn about distributed query execution.

小结

当一个复杂SQL无法在一个worker executor process 中独立完成时,citus 使用push-pull的方式,实现SQL的多阶段执行,以支持更多复杂SQL。

例如本文提到的subquery,实际上就是一个独立的execute过程,这个subquery独立执行,中间结果使用PG COPY协议发送到coordinator。作为intermediate results。进入下一个独立执行过程后,intermediate results将从coordinator发送到worker节点(同样使用PG COPY协议),在worker节点中存储为临时FILE,在下一个独立执行过程中,read_intermediate_result这个函数来读取这些FILE并使用它们(即PG的Function Scan)。

参考

http://docs.citusdata.com/en/v8.1/develop/reference_processing.html#citus-query-processing

《PostgreSQL sharding : citus 系列7 - topn 加速(count(*) group by order by count(*) desc limit x) (use 估值插件 topn)》

《PostgreSQL sharding : citus 系列6 - count(distinct xx) 加速 (use 估值插件 hll|hyperloglog)》

《PostgreSQL sharding : citus 系列5 - worker节点网络优化》

《PostgreSQL sharding : citus 系列4 - DDL 操作规范 (新增DB,TABLE,SCHEMA,UDF,OP,用户等)》

《PostgreSQL 11 相似图像搜索插件 imgsmlr 性能测试与优化 3 - citus 8机128shard (4亿图像)》

《Deepgreen(Greenplum) 多机部署测试 , TPC-H VS citus》

《PostgreSQL sharding : citus 系列3 - 窗口函数调用限制 与 破解之法(套用gpdb执行树,分步执行)》

《PostgreSQL sharding : citus 系列2 - TPC-H》

《PostgreSQL citus, Greenplum 分布式执行计划 DEBUG》

《PostgreSQL sharding : citus 系列1 - 多机部署(含OLTP(TPC-B)测试)》

 

免费领取阿里云RDS PostgreSQL实例、ECS虚拟机

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
2月前
|
关系型数据库 分布式数据库 数据库
PostgreSQL+Citus分布式数据库
PostgreSQL+Citus分布式数据库
83 15
|
SQL 存储 运维
从Citus深度解密如何基于PostgreSQL做分布式数据库
从源码级别揭秘Citus如何基于PostgreSQL做一款分布式数据库,解决分布式场景的数据分片、分布式SQL、分布式事务、数据倾斜、数据迁移等难点问题,理解分布式领域设计的“取”与“舍”。
1911 3
从Citus深度解密如何基于PostgreSQL做分布式数据库
|
SQL 存储 关系型数据库
分布式 PostgreSQL 集群(Citus)官方示例 - 多租户应用程序实战
分布式 PostgreSQL 集群(Citus)官方示例 - 多租户应用程序实战
527 0
分布式 PostgreSQL 集群(Citus)官方示例 - 多租户应用程序实战
|
存储 SQL 安全
分布式 PostgreSQL,Citus(11.x) 效用函数
分布式 PostgreSQL,Citus(11.x) 效用函数
716 0
|
SQL 分布式计算 关系型数据库
「PostgreSQL技巧」Citus实时执行程序如何并行化查询
「PostgreSQL技巧」Citus实时执行程序如何并行化查询
|
8月前
|
SQL 关系型数据库 分布式数据库
从Citus深度解密如何基于PostgreSQL做分布式数据库
前言分布式数据库能够解决海量数据存储、超高并发吞吐、大表瓶颈以及复杂计算效率等单机数据库瓶颈难题,当业务体量即将突破单机数据库承载极限和单表过大导致性能、维护问题时,分布式数据库是解决上述问题的高性价比方案。数据库作为分布式改造的最大难点,就是"和使用单机数据库一样使用分布式数据库",这也一直是广大...
3316 0
从Citus深度解密如何基于PostgreSQL做分布式数据库
|
人工智能 分布式计算 前端开发
更高效的Cascades优化器 - Columbia Query Optimizer
在较早的文章中介绍了些Volcano/Cascades优化器框架的设计理念和实现思路,基本是基于论文的解读:VolcanoCascades虽然cascades号称目前最为先进的优化器搜索框架,但不得不说这2篇paper的内容,实在是让人看起来有些吃力。尤其是后篇,说是从工程实现的角度来描述,但讲解的不够详尽,而且有些地方既模糊又抽象。此外工业界并没有一款优化器是完全基于paper的框架去实现的,这
2004 0
更高效的Cascades优化器 - Columbia Query Optimizer
|
SQL 缓存 关系型数据库
分布式 PostgreSQL,Citus 11.x SQL 参考(中文手册)
分布式 PostgreSQL,Citus 11.x SQL 参考(中文手册)
738 0
|
网络协议 Ubuntu 关系型数据库
分布式 PostgreSQL 集群(Citus)官方安装指南
分布式 PostgreSQL 集群(Citus)官方安装指南
1342 0
|
SQL 关系型数据库 PostgreSQL
Citus 11(分布式 PostgreSQL) 文档贡献与本地运行
Citus 11(分布式 PostgreSQL) 文档贡献与本地运行
209 0
Citus 11(分布式 PostgreSQL) 文档贡献与本地运行