Tags
PostgreSQL, citus, sharding, push, pull, optimizer
Background
Citus is an open-source sharding extension for PostgreSQL; Citus Data was acquired by Microsoft in 2019, and the extension remains open source.
When processing very complex SQL, Citus uses a push-pull model that exchanges intermediate data across nodes in order to handle such queries.
The push/pull flow of intermediate results:
push : shard -> coordinator
pull : coordinator -> worker (a single worker hosts multiple shards; those shards share the file pulled from the coordinator)
Shards, the coordinator, and the workers exchange data via COPY; intermediate results are stored as files, and the node executor accesses those files (the intermediate results) through the read_intermediate_result function.
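Because these intermediate results are shipped over COPY and materialized as files on the coordinator and workers, their size matters. A minimal sketch of capping it, assuming your Citus version exposes the citus.max_intermediate_result_size setting (an integer in KB; the value below is only an example):

-- Limit how large a single set of push-pull intermediate results may grow.
-- Assumes the citus.max_intermediate_result_size GUC (in KB) is available;
-- queries exceeding the limit fail instead of spilling arbitrarily large files.
SHOW citus.max_intermediate_result_size;
SET citus.max_intermediate_result_size = 1048576;  -- 1 GB, expressed in KB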
Example
If necessary Citus can gather results from subqueries and CTEs into the coordinator node and then push them back across workers for use by an outer query. This allows Citus to support a greater variety of SQL constructs, and even mix executor types between a query and its subqueries.
For example, having subqueries in a WHERE clause sometimes cannot execute inline at the same time as the main query, but must be done separately. Suppose a web analytics application maintains a visits table partitioned by page_id. To query the number of visitor sessions on the top twenty most visited pages, we can use a subquery to find the list of pages, then an outer query to count the sessions.
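As a rough sketch of the schema this example assumes (the column types and the distribution call are assumptions inferred from the query, not taken from the original post):

-- Hypothetical visits table, hash-distributed across the workers by page_id.
CREATE TABLE visits (
    page_id    int,
    session_id int
);
SELECT create_distributed_table('visits', 'page_id');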
In the following query, the subquery has to be executed independently and its result pushed to the outer query.
SELECT page_id, count(distinct session_id)
FROM visits
WHERE page_id IN (
    SELECT page_id
    FROM visits
    GROUP BY page_id
    ORDER BY count(*) DESC
    LIMIT 20
)
GROUP BY page_id;
The real-time executor would like to run a fragment of this query against each shard by page_id, counting distinct session_ids, and combining the results on the coordinator. However, the LIMIT in the subquery means the subquery cannot be executed as part of the fragment. By recursively planning the query, Citus can run the subquery separately, push the results to all workers, run the main fragment query, and pull the results back to the coordinator. This “push-pull” design supports subqueries like the one above.
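Conceptually, the recursive plan behaves as if the subquery were materialized once and the main query then joined against that materialized result. On a plain, non-distributed PostgreSQL table the two stages could be written by hand roughly as follows (a sketch for illustration only; on Citus the decomposition happens automatically and the materialized rows live in intermediate-result files rather than a temp table):

-- Stage 1: compute the top-20 pages once (the subquery).
CREATE TEMP TABLE top_pages AS
SELECT page_id
FROM visits
GROUP BY page_id
ORDER BY count(*) DESC
LIMIT 20;

-- Stage 2: restrict the distinct-session count to those pages (the outer query).
SELECT v.page_id, count(DISTINCT v.session_id)
FROM visits v
JOIN top_pages t USING (page_id)
GROUP BY v.page_id;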
Let’s see this in action by reviewing the EXPLAIN output for this query. It’s fairly involved:
Full execution plan:
GroupAggregate  (cost=0.00..0.00 rows=0 width=0)
  Group Key: remote_scan.page_id
  ->  Sort  (cost=0.00..0.00 rows=0 width=0)
        Sort Key: remote_scan.page_id
        ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)
              ->  Distributed Subplan 6_1
                    ->  Limit  (cost=0.00..0.00 rows=0 width=0)
                          ->  Sort  (cost=0.00..0.00 rows=0 width=0)
                                Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
                                ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)
                                      Group Key: remote_scan.page_id
                                      ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)
                                            Task Count: 32
                                            Tasks Shown: One of 32
                                            ->  Task
                                                  Node: host=localhost port=5433 dbname=postgres
                                                  ->  Limit  (cost=1883.00..1883.05 rows=20 width=12)
                                                        ->  Sort  (cost=1883.00..1965.54 rows=33017 width=12)
                                                              Sort Key: (count(*)) DESC
                                                              ->  HashAggregate  (cost=674.25..1004.42 rows=33017 width=12)
                                                                    Group Key: page_id
                                                                    ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=4)
              Task Count: 32
              Tasks Shown: One of 32
              ->  Task
                    Node: host=localhost port=5433 dbname=postgres
                    ->  HashAggregate  (cost=734.53..899.61 rows=16508 width=8)
                          Group Key: visits.page_id, visits.session_id
                          ->  Hash Join  (cost=17.00..651.99 rows=16508 width=8)
                                Hash Cond: (visits.page_id = intermediate_result.page_id)
                                ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=8)
                                ->  Hash  (cost=14.50..14.50 rows=200 width=4)
                                      ->  HashAggregate  (cost=12.50..14.50 rows=200 width=4)
                                            Group Key: intermediate_result.page_id
                                            ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=4)
Let’s break it apart and examine each piece.
GroupAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.page_id
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: remote_scan.page_id
The above is the root of the plan: the outermost GROUP BY page_id, executed as a GroupAggregate rather than a HashAggregate.
The root of the tree is what the coordinator node does with the results from the workers. In this case it is grouping them, and GroupAggregate requires they be sorted first.
->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)
      ->  Distributed Subplan 6_1
Because of the subquery, the whole statement is split into two independently executed parts; the first part is the subquery.
The custom scan has two large sub-trees, starting with a “distributed subplan.”
->  Limit  (cost=0.00..0.00 rows=0 width=0)
      ->  Sort  (cost=0.00..0.00 rows=0 width=0)
            Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
            ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)
                  Group Key: remote_scan.page_id
                  ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)
                        Task Count: 32
                        Tasks Shown: One of 32
                        ->  Task
                              Node: host=localhost port=5433 dbname=postgres
                              ->  Limit  (cost=1883.00..1883.05 rows=20 width=12)
                                    ->  Sort  (cost=1883.00..1965.54 rows=33017 width=12)
                                          Sort Key: (count(*)) DESC
                                          ->  HashAggregate  (cost=674.25..1004.42 rows=33017 width=12)
                                                Group Key: page_id
                                                ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=4)
The above is the execution plan for the subquery. Each shard computes its own per-page counts (count + group); those partial results are gathered on the coordinator, which sums them per page_id to obtain the final counts and applies LIMIT 20. The output of this first independent execution step becomes the intermediate results.
Worker nodes run the above for each of the thirty-two shards (Citus is choosing one representative for display). We can recognize all the pieces of the IN (…) subquery: the sorting, grouping and limiting. When all workers have completed this query, they send their output back to the coordinator which puts it together as “intermediate results.”
Task Count: 32
Tasks Shown: One of 32
->  Task
      Node: host=localhost port=5433 dbname=postgres
      ->  HashAggregate  (cost=734.53..899.61 rows=16508 width=8)
            Group Key: visits.page_id, visits.session_id
            ->  Hash Join  (cost=17.00..651.99 rows=16508 width=8)
                  Hash Cond: (visits.page_id = intermediate_result.page_id)
The above is the second independently executed part of the whole query. The coordinator sends the intermediate results to the workers over the PostgreSQL COPY protocol (where they are stored as files); the read_intermediate_result function then loads the intermediate results from those files so they can be used in the join.
Citus starts another real-time job in this second subtree. It’s going to count distinct sessions in visits. It uses a JOIN to connect with the intermediate results. The intermediate results will help it restrict to the top twenty pages.
->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=8)
->  Hash  (cost=14.50..14.50 rows=200 width=4)
      ->  HashAggregate  (cost=12.50..14.50 rows=200 width=4)
            Group Key: intermediate_result.page_id
            ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=4)
The above shows the hash join that runs on each shard, still within the second independently executed part of the query.
The worker internally retrieves intermediate results using a read_intermediate_result function which loads data from a file that was copied in from the coordinator node.
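The SQL that Citus sends to the workers references this function roughly as follows (a sketch: the result id '6_1' is the subplan id from the plan above, while the binary format and the column definition list are assumptions about what Citus generates and may differ between versions):

-- Hypothetical worker-side fragment: read the pushed-down intermediate
-- result file for subplan 6_1 and expose it as a set of page_id values.
SELECT intermediate_result.page_id
FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format)
     AS intermediate_result(page_id integer);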
This example showed how Citus executed the query in multiple steps with a distributed subplan, and how you can use EXPLAIN to learn about distributed query execution.
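As a side note, EXPLAIN prints only one representative task per Custom Scan ("Tasks Shown: One of 32" above). If your Citus version exposes the citus.explain_all_tasks setting, you can ask for every shard's task instead (a sketch):

-- Show the plan for every task/shard rather than a single representative one.
-- Assumes the citus.explain_all_tasks GUC is available in your Citus version.
SET citus.explain_all_tasks = on;
-- then re-run EXPLAIN on the query above to see all 32 tasks per Custom Scan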
Summary
When a complex SQL statement cannot be completed within a single worker executor process, Citus falls back to push-pull execution, running the statement in multiple stages so that a wider range of complex SQL can be supported.
The subquery in this article is exactly such an independent execution step: it runs on its own, and its output is sent to the coordinator over the PostgreSQL COPY protocol as intermediate results. Before the next independent execution step, the intermediate results are sent from the coordinator to the worker nodes (again via COPY) and stored there as temporary files; in that next step, the read_intermediate_result function reads these files (appearing in the plan as a PostgreSQL Function Scan) so their contents can be joined against.
References
http://docs.citusdata.com/en/v8.1/develop/reference_processing.html#citus-query-processing
《PostgreSQL sharding : citus 系列6 - count(distinct xx) 加速 (use 估值插件 hll|hyperloglog)》
《PostgreSQL sharding : citus 系列5 - worker节点网络优化》
《PostgreSQL sharding : citus 系列4 - DDL 操作规范 (新增DB,TABLE,SCHEMA,UDF,OP,用户等)》
《PostgreSQL 11 相似图像搜索插件 imgsmlr 性能测试与优化 3 - citus 8机128shard (4亿图像)》
《Deepgreen(Greenplum) 多机部署测试 , TPC-H VS citus》
《PostgreSQL sharding : citus 系列3 - 窗口函数调用限制 与 破解之法(套用gpdb执行树,分步执行)》
《PostgreSQL sharding : citus 系列2 - TPC-H》
《PostgreSQL citus, Greenplum 分布式执行计划 DEBUG》
《PostgreSQL sharding : citus 系列1 - 多机部署(含OLTP(TPC-B)测试)》