citus对join的支持

简介: citus对join的支持 前言 citus对支持的SQL有一定的限制,其中包括最常见的join,具体如下 inner join 无限制。

citus对join的支持

前言

citus对支持的SQL有一定的限制,其中包括最常见的join,具体如下

inner join

无限制。根据情况会以下面几种方式之一支持

  1. 亲和表
    即2个表的分片规则完全相同,且join列即为分片列值
  2. 参考表
    join的2个表中,其中有一个表不分片且每个worker上都存一份副本的表,即"参考表"
  3. 小表广播
    分片数小于citus.large_table_shard_count的表被认为是小表(默认值为4),citus会将小表的分片广播到所有worker上并缓存。小表机制容易产生bug不建议使用,比如之后更新小表时不会更新缓存从而导致数据不一致,建议改用参考表代替。
  4. 数据重分布
    使用task-tracker执行器可支持数据重分布,然后以MapMerge的方式支持。

outer join

与inner join的主要不同是不支持数据重分布,因此无法支持两个分片规则不一致的大表的outer join。 另外,参考表只有出现在left join的右边或right join的左边才被支持。

实验

下面演示2个大表的join

环境

  • CentOS release 6.5 x64物理机(16C/128G/3TB SSD)
  • PostgreSQL 9.6.2
  • citus 6.1.0
  • pgbouncer 1.7.2

master和worker都在1台机器上,端口号不同

  • master :60001
  • worker1:60002
  • worker2:60003

worker设置

在master上添加worker节点

SELECT * from master_add_node('/tmp', 60002);
SELECT * from master_add_node('/tmp', 60003); 

数据定义

postgres=# create table tb1(id int,k int);
CREATE TABLE
postgres=# create table tb2(id int,k int);
CREATE TABLE
postgres=# select create_distributed_table('tb1','id');
 create_distributed_table 
--------------------------

(1 行记录)

postgres=# select create_distributed_table('tb2','id');
 create_distributed_table 
--------------------------

(1 行记录) 

数据导入

postgres=# create unlogged table tbx as select  id,id ss from ( select generate_series(1,1000000) id) a;
SELECT 1000000
时间:414.776 ms
postgres=# copy tb1 from PROGRAM 'psql -p60001 -Atc "copy tbx to STDOUT"';
COPY 1000000
时间:748.383 ms
postgres=# copy tb2 from PROGRAM 'psql -p60001 -Atc "copy tbx to STDOUT"';
COPY 1000000
时间:757.981 ms 

执行查询

inner join(分片列相同)

postgres=# select count(1) from tb1 join tb2 on(tb1.id=tb2.id);
  count  
---------
 1000000
(1 行记录)

时间:1889.941 ms 

执行计划

postgres=# explain select count(1) from tb1 join tb2 on(tb1.id=tb2.id);
                                            QUERY PLAN                                            
--------------------------------------------------------------------------------------------------
 Distributed Query into pg_merge_job_0033
   Executor: Task-Tracker
   Task Count: 32
   Tasks Shown: One of 32
   ->  Task
         Node: host=/tmp port=60002 dbname=postgres
         ->  Aggregate  (cost=1818.06..1818.07 rows=1 width=8)
               ->  Hash Join  (cost=849.88..1739.19 rows=31550 width=0)
                     Hash Cond: (tb1.id = tb2.id)
                     ->  Seq Scan on tb1_102008 tb1  (cost=0.00..455.50 rows=31550 width=4)
                     ->  Hash  (cost=455.50..455.50 rows=31550 width=4)
                           ->  Seq Scan on tb2_102040 tb2  (cost=0.00..455.50 rows=31550 width=4)
 Master Query
   ->  Aggregate  (cost=0.00..0.00 rows=0 width=0)
         ->  Seq Scan on pg_merge_job_0033  (cost=0.00..0.00 rows=0 width=0)
(15 行记录)

时间:14.952 ms 

inner join(分片列不同)

postgres=# select count(1) from tb1 join tb2 on(tb1.id=tb2.k);
ERROR:  cannot use real time executor with repartition jobs
提示:  Set citus.task_executor_type to "task-tracker".
时间:16.238 ms 

默认的real-time执行器不支持这种join,先设置执行器为'task-tracker'

set citus.task_executor_type='task-tracker' 

再执行SQL

postgres=# select count(1) from tb1 join tb2 on(tb1.id=tb2.k);
  count  
---------
 1000000
(1 行记录)

时间:16339.376 ms
postgres=# select count(1) from tb1 join tb2 on(tb1.k=tb2.k);
  count  
---------
 1000000
(1 行记录)

时间:16263.971 ms 

16秒完成2个100w大表的join效率也不低了。

执行计划如下:

postgres=# explain select count(1) from tb1 join tb2 on(tb1.k=tb2.k);
                                 QUERY PLAN                                  
-----------------------------------------------------------------------------
 Distributed Query into pg_merge_job_0036
   Executor: Task-Tracker
   Task Count: 8
   Tasks Shown: None, not supported for re-partition queries
   ->  MapMergeJob
         Map Task Count: 32
         Merge Task Count: 8
   ->  MapMergeJob
         Map Task Count: 32
         Merge Task Count: 8
 Master Query
   ->  Aggregate  (cost=0.00..0.00 rows=0 width=0)
         ->  Seq Scan on pg_merge_job_0036  (cost=0.00..0.00 rows=0 width=0)
(13 行记录)

时间:22.865 ms
postgres=# explain select count(1) from tb1 join tb2 on(tb1.k=tb2.k);
                                 QUERY PLAN                                  
-----------------------------------------------------------------------------
 Distributed Query into pg_merge_job_0039
   Executor: Task-Tracker
   Task Count: 8
   Tasks Shown: None, not supported for re-partition queries
   ->  MapMergeJob
         Map Task Count: 32
         Merge Task Count: 8
   ->  MapMergeJob
         Map Task Count: 32
         Merge Task Count: 8
 Master Query
   ->  Aggregate  (cost=0.00..0.00 rows=0 width=0)
         ->  Seq Scan on pg_merge_job_0039  (cost=0.00..0.00 rows=0 width=0)
(13 行记录)

时间:21.905 ms 

left join

join列和分片列一致时可以支持

postgres=# select count(1) from tb1 left join tb2 on(tb1.id=tb2.id);
  count  
---------
 1000000
(1 行记录)

时间:1929.182 ms 

join列和分片列不一致时不支持

postgres=# select count(1) from tb1 left join tb2 on(tb1.id=tb2.k);
ERROR:  cannot run outer join query if join is not on the partition column
描述:  Outer joins requiring repartitioning are not supported.
时间:0.268 ms 

和参考表的outer join

创建参考表tb3

postgres=# create table tb3(id int,k int);
CREATE TABLE
时间:0.758 ms
postgres=# select create_reference_table('tb3');
 create_reference_table 
------------------------

(1 行记录)

时间:28.051 ms 

参考表在left join右边时可以支持

postgres=# select count(1) from tb1 left join tb3 on(tb1.k=tb3.k);
  count  
---------
 1000000
(1 行记录)

时间:1942.156 ms 

参考表在left join左边时不支持

postgres=# select count(1) from tb3 left join tb1 on(tb1.k=tb3.k);
ERROR:  cannot run outer join query if join is not on the partition column
描述:  Outer joins requiring repartitioning are not supported.
时间:0.183 ms 

right join正好相反

postgres=# select count(1) from tb3 right join tb1 on(tb1.k=tb3.k);
  count  
---------
 1000000
(1 行记录)

时间:2155.268 ms

postgres=# select count(1) from tb1 right join tb3 on(tb1.k=tb3.k);
ERROR:  cannot run outer join query if join is not on the partition column
描述:  Outer joins requiring repartitioning are not supported.
时间:0.348 ms 

full join不支持

postgres=# select count(1) from tb1 full join tb3 on(tb1.k=tb3.k);
ERROR:  cannot run outer join query if join is not on the partition column
描述:  Outer joins requiring repartitioning are not supported.
时间:0.180 ms
postgres=# select count(1) from tb3 full join tb1 on(tb1.k=tb3.k);
ERROR:  cannot run outer join query if join is not on the partition column
描述:  Outer joins requiring repartitioning are not supported.
时间:0.163 ms
相关文章
|
11月前
|
人工智能 关系型数据库 MySQL
17MyCat - 分片join(join 的概述)
17MyCat - 分片join(join 的概述)
60 0
|
28天前
|
存储 Cloud Native 关系型数据库
PolarDB 并行查询问题之EXISTS子查询在并行查询中如何解决
PolarDB 并行查询问题之EXISTS子查询在并行查询中如何解决
27 1
|
28天前
|
关系型数据库 MySQL 分布式数据库
PolarDB 并行查询问题之处理类似JOIN和GROUP BY的复杂查询如何解决
PolarDB 并行查询问题之处理类似JOIN和GROUP BY的复杂查询如何解决
11 1
openGauss向量化Merge Join--semi join
openGauss向量化Merge Join--semi join
80 0
|
11月前
|
SQL
20MyCat - 分片join(Share join)
20MyCat - 分片join(Share join)
41 0
openGauss向量化Merge Join--inner join
openGauss向量化Merge Join--inner join
78 0
openGauss向量化Merge Join--inner join
|
算法 关系型数据库 PostgreSQL
PostgreSQL/GreenPlum Merge Inner Join解密
PostgreSQL/GreenPlum Merge Inner Join解密
73 0
PostgreSQL/GreenPlum Merge Inner Join解密
|
OLAP Serverless
openGauss向量化引擎--hash join
openGauss向量化引擎--hash join
115 0
|
存储 SQL JSON
Citus 分布式 PostgreSQL 集群 - SQL Reference(查询分布式表 SQL)
Citus 分布式 PostgreSQL 集群 - SQL Reference(查询分布式表 SQL)
207 0