Citus7.4-Citus 9.3新特性

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
简介: Citus7.4-Citus 9.3新特性

Citus7.4-Citus 9.3新特性

最近开始着手Citus7.4到Citus 9.3的升级,所以比较全面地浏览了这期间的Citus变更。
从Citus7.4到Citus 9.3很多方面的改进,本文只列出一些比较重要的部分。

以下用到了一些示例,示例的验证环境如下

软件

  • PostreSQL 12
  • Citus 9.3

集群成员

  • CN

    • 127.0.0.1:9000
  • Worker

    • 127.0.0.1:9001
    • 127.0.0.1:9002

SQL支持增强类

1.支持非分区列的count distinct

这个Citus 7.4应该已经支持了,不知道是Citus的Changelog更新延误,还是Citus 7.5支持得更完善了。

表定义

create table tb1(id int,c1 int);
select create_distributed_table('tb1','id');

非分区列的count distinct的执行计划

postgres=# explain select count(distinct c1) from tb1;
                                        QUERY PLAN                                        
------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.01 rows=1 width=8)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=4)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=127.0.0.1 port=9001 dbname=postgres
               ->  HashAggregate  (cost=38.25..40.25 rows=200 width=4)
                     Group Key: c1
                     ->  Seq Scan on tb1_102339 tb1  (cost=0.00..32.60 rows=2260 width=4)
(9 rows)

2.支持UPSERT

支持UPSERT,即支持INSERT INTO SELECT..ON CONFLICT/RETURNING

表定义

create table tb1(id int, c1 int);
select create_distributed_table('tb1','id');
create table tb2(id int primary key, c1 int);
select create_distributed_table('tb2','id');

UPSERT SQL执行

postgres=# INSERT INTO tb2
  SELECT * from tb1
  ON CONFLICT(id) DO UPDATE SET c1 = EXCLUDED.c1;
INSERT 0 1

3.支持GENERATED ALWAYS AS STORED

使用示例如下:

create table tbgenstore(id int, c1 int GENERATE ALWAYS AS (id+1)STORED);
select create_distributed_table('tbgenstore','id');

4.支持用户定义的分布式函数

支持用户自定义分布式函数。Citus会把分布式函数(包括聚合函数)以及依赖的对象定义下发到所有Worker上。
后续在执行SQL的时候也可以合理的把分布式函数的执行下推到Worker。

分布式函数还可以和某个分布表绑定"亲和"关系,这一个特性的使用场景如下:

在多租户类型的业务中,把单个租户的一个事务中的多个SQL打包成一个“分布式函数”下发到Worker上。
CN只需要下推一次分布式函数的调用,分布式函数内部的多个SQL的执行全部在Worker节点内部完成。
避免CN和Worker之间来回交互,可以大大提升OLTP的性能(利用这个特性去跑TPCC,简直太溜了!)。

下面看下手册里的例子。

https://docs.citusdata.com/en/v9.3/develop/api_udf.html?highlight=distributed%20function#create-distributed-function

-- an example function which updates a hypothetical
-- event_responses table which itself is distributed by event_id
CREATE OR REPLACE FUNCTION
  register_for_event(p_event_id int, p_user_id int)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
  INSERT INTO event_responses VALUES ($1, $2, 'yes')
  ON CONFLICT (event_id, user_id)
  DO UPDATE SET response = EXCLUDED.response;
END;
$fn$;

-- distribute the function to workers, using the p_event_id argument
-- to determine which shard each invocation affects, and explicitly
-- colocating with event_responses which the function updates
SELECT create_distributed_function(
  'register_for_event(int, int)', 'p_event_id',
  colocate_with := 'event_responses'
);

5.完全支持聚合函数

Citus中对聚合函数有3种不同的执行方式

  1. 按照分片字段分组的聚合,直接下推到Worker执行聚合
  2. 对部分Citus能够识别的聚合函数,Citus执行两阶段聚合,现在Worker执行部分聚合,再把结果汇总到CN上进行最终聚合。
  3. 对其他的聚合函数,Citus把数据拉到CN上,在CN上执行聚合。

详细参考,https://docs.citusdata.com/en/v9.3/develop/reference_sql.html?highlight=Aggregation#aggregate-functions

显然第3种方式性能会比较差,对不按分片字段分组的聚合,怎么让它按第2种方式执行呢?

Citus中预定义了一部分聚合函数可以按第2中方式执行。

citus-9.3.0/src/include/distributed/multi_logical_optimizer.h:

static const char *const AggregateNames[] = {
    "invalid", "avg", "min", "max",
    "sum", "count", "array_agg",
    "jsonb_agg", "jsonb_object_agg",
    "json_agg", "json_object_agg",
    "bit_and", "bit_or", "bool_and", "bool_or", "every",
    "hll_add_agg", "hll_union_agg",
    "topn_add_agg", "topn_union_agg",
    "any_value"
};

对不在上面白名单的聚合函数,比如用户自定义的聚合函数,可以通过create_distributed_function()添加。
示例如下:

citus-9.3.0/src/test/regress/expected/aggregate_support.out:

create function sum2_sfunc(state int, x int)
returns int immutable language plpgsql as $$
begin return state + x;
end;
$$;
create function sum2_finalfunc(state int)
returns int immutable language plpgsql as $$
begin return state * 2;
end;
$$;
create aggregate sum2 (int) (
    sfunc = sum2_sfunc,
    stype = int,
    finalfunc = sum2_finalfunc,
    combinefunc = sum2_sfunc,
    initcond = '0'
);

select create_distributed_function('sum2(int)');

执行这个自定义的聚合函数的执行计划如下

postgres=# explain select sum2(c1) from tb1;
                                        QUERY PLAN                                        
------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.01 rows=1 width=4)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=32)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=127.0.0.1 port=9001 dbname=postgres
               ->  Aggregate  (cost=38.25..38.26 rows=1 width=32)
                     ->  Seq Scan on tb1_102339 tb1  (cost=0.00..32.60 rows=2260 width=4)
(8 rows)

但是当前这种方式不支持stype = internal的自定义聚合函数。
Citus社区已经在对应这个问题,详细参考https://github.com/citusdata/citus/issues/3916

6.完全支持窗口函数

对不按分片字段分组的聚合函数,Citus支持把数据拉到CN上再执行,和聚合函数类型。
需要注意这种执行方式对性能的影响,特别是包含多个不同分组字段的窗口函数时,
Worker拉到CN上结果集是这些字段组合的笛卡尔积。

7.支持在事务块中传播LOCAL参数

当在CN的事务块中设置LOCAL参数时,可以把这个参数传播到Worker节点。

前提条件是citus.propagate_set_commands参数必须为local

set citus.propagate_set_commands TO local;

事务块中设置LOCAL参数

postgres=# begin;
BEGIN
postgres=*# set local enable_hashagg to off;
SET
postgres=*# SELECT current_setting('enable_hashagg') FROM tb1 WHERE id = 3;
 current_setting 
-----------------
 off
(1 row)

8. 支持本地表和参考表Join

如果一个数据库需要用到本地表,而本地表和以参考表的形式部署的维表又有Join的需求,改如何处理?

原来我们只能在CN上再创建一套本地的维表,然后由应用或者通过触发器维护两套维表之间的数据同步。

现在可以用更简单的方式实现。
具体就是把CN节点也可以作为一个Worker加到Citus集群里,groupid一定要设置为0。

SELECT master_add_node('127.0.0.1', 9001, groupid => 0);

这样CN上也就和其他Worker一样拥有了参考表的一个副本,本地表和参考表Join的时候就直接在本地执行了。

DDL支持增强

9.支持把SCHEMA的赋权广播到Worker上

GRANT USAGE ON SCHEMA dist_schema TO role1;

10.支持修改表SCHEMA广播到Worker上

ALTER TABLE ... SET SCHEMA

11.支持创建索引时指定INCLUDE选项

create index tb1_idx_id on tb1(id) include (c1);

12. 支持使用CONCURRENTLY选项创建索引

create index CONCURRENTLY tb1_idx_id2 on tb1(id);

13. 支持传播REINDEX到worker节点上

之前版本reindex不能传播到Worker节点,还需要到每个worker分别执行reindex。
新版的Citus支持了。

reindex index tb1_idx_id;

Citus MX功能增强

14.支持在MX 节点上对参考表执行DML

表定义

create table tbref(id int, c1 int);
select create_refence_table('tbref');

在MX worker(即扩展worker)上修改参考表

postgres=# insert into tbref values(1,1),(2,2);
INSERT 0 2
postgres=# update tbref set c1=10;
UPDATE 2
postgres=# delete from tbref where id=1;
DELETE 1
postgres=# select * from tbref;
 id | c1 
----+----
  2 | 10
(1 row)

15.支持在MX节点上执行TRUNCATE

之前MX节点上是不支持对分布表和参考表执行truncate操作的。现在也支持了

postgres=# truncate tb1;
TRUNCATE TABLE
postgres=# truncate tbref;
TRUNCATE TABLE

16.支持在Citus MX架构下使用serial和smallserial

之前在Citus MX(即多CN部署)环境下,自增序列只能使用bigserial类型,现在也可以支持serial和smallserial了。

表定义

create table tbserial(id int,c1 int);
select create_distributed_table('tbserial','id');

Citus中,自增字段通过CN和MX节点上逻辑表上的序列对象实现。

postgres=# \d tbserial
                            Table "public.tbserial"
 Column |  Type   | Collation | Nullable |               Default                
--------+---------+-----------+----------+--------------------------------------
 id     | integer |           | not null | nextval('tbserial_id_seq'::regclass)
 c1     | integer |           |          | 

为了防止多个MX节点产生的序列冲突。在Citus MX环境下,序列值的开头部分是产生序列的节点的groupid,后面才是顺序累加的值。
这等于按groupid把序列值分成了不同的范围,互不重叠。

即:

全局序列值 = groupid,节点内的顺序递增值

对不同serial的数据类型,groupid占的位数是不一样的。具体如下

  • bigserial:16bit
  • serial:4bit
  • smallserial:4bit

根据上groupid占的长度,我们需要注意

  1. 单个节点(CN或扩展Worker)上,能产生的序列值的数量变少了,要防止溢出。
  2. 如果使用了serial或smallserial,最多部署7个扩展Worker节点。

序列对象的定义

上面提到的全局序列的实现具体体现为:在不同节点上,序列对象定义的范围不一样。如下

CN节点上的序列对象定义(CN节点的groupid固定为0)

postgres=# \d tbserial_id_seq
                  Sequence "public.tbserial_id_seq"
  Type   | Start | Minimum |  Maximum   | Increment | Cycles? | Cache 
---------+-------+---------+------------+-----------+---------+-------
 integer |     1 |       1 | 2147483647 |         1 | no      |     1
Owned by: public.tbserial.id

MX Worker节点上的序列对象定义(groupid=1)

postgres=# \d tbserial_id_seq
                    Sequence "public.tbserial_id_seq"
  Type  |   Start   |  Minimum  |  Maximum  | Increment | Cycles? | Cache 
--------+-----------+-----------+-----------+-----------+---------+-------
 bigint | 268435457 | 268435457 | 536870913 |         1 | no      |     1

如何知道每个Worker节点的groupid?

每个Worker节点的groupid可以从pg_dist_node获取。

postgres=# select * from pg_dist_node;
 nodeid | groupid | nodename  | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards 
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------
      2 |       2 | 127.0.0.1 |     9002 | default  | t           | t        | primary  | default     | t              | t
      1 |       1 | 127.0.0.1 |     9001 | default  | t           | t        | primary  | default     | t              | t
(2 rows)

也可以在每个节点本地查询pg_dist_local_group获得本节点的groupid。

postgres=# select * from pg_dist_local_group;
 groupid 
---------
       1
(1 row)

CN节点和普通的Worker节点(非MX Worker)的pg_dist_local_group中查询到的groupid都为0.

17.在Citus MX通过本地执行提升性能

之前测试Citus MX架构的时候发现,当Citus MX节点上放分片时,性能比不放分片差一倍。
新版的Citus在这方面做了优化,当在Citus MX节点上访问本节点上的分片时,不再走新建一个到本地的数据库连接再读写分片的常规执行方式。
而是直接用当前连接访问分片。根据下面的测试数据,性能可以提升一倍。

https://github.com/citusdata/citus/pull/2938

 - Test 1: HammerDB test with 250 users, 1,000,000 transactions per. 8 Node Citus MX 
          - (a) With local execution: `System achieved 116473 PostgreSQL TPM at 160355 NOPM`   
          - (b) without local execution: ` System achieved 61392 PostgreSQL TPM at 100503 NOPM`

  - Test 2: HammerDB test with 250 users, 10,000,000 transactions per. 8 Node Citus MX 
           - (a) With local execution: `System achieved 91921 PostgreSQL TPM at 174557 NOPM`   
           - (b) without local execution: ` System achieved 84186 PostgreSQL TPM at 98408 NOPM`

- Test 3: Pgbench, 1 worker node, -c64 -c256 -T 120
            - (a) Local execution enabled (tps): `select-only`: 56202   `simple-update`:  11771 `tpcb-like`: 7796
            - (a) Local execution disabled (tps): `select-only`:  24524 `simple-update`: 5077  `tpcb-like`:   3510 (some connection errors for tpcb-like)

在我司的多CN部署方式下,扩展Worker上是不放分片的。所以这个优化和我们无关。

性能增强

18.替换real-time为新的执行器Adaptive Executor

Adaptive Executor是一个新的执行器,它和real-time的差异主要体现在可以通过参数对CN到worker的连接数进行控制。具体如下:

  1. citus.max_shared_pool_size

    可以通过`citus.max_shared_pool_size`控制CN(或MX Worker)在单个Worker上可同时建立的最大连接数,默认值等于CN的`max_connections`。
    达到连接数使用上限后,新的SQL请求可能等待,有些操作不受限制,比如COPY和重分区的Join。
    Citus MX架构下,单个Worker上同时接受到连接数最大可能是 `max_shared_pool_size * (1 + MX Worker节点数)`
  2. citus.max_adaptive_executor_pool_size

    可以通过`citus.max_adaptive_executor_pool_size`控制CN(或MX Worker)上的单个会话在单个Worker上可同时建立的最大连接数,默认值等于16。
  3. citus.max_cached_conns_per_worker

    可以通过`citus.max_cached_conns_per_worker`控制CN(或MX Worker)上的单个会话在事务结束后对每个Worker缓存的连接数,默认值等于1。
  4. citus.executor_slow_start_interval

    对于执行时间很短的多shard的SQL,并发开多个连接,不仅频繁创建销毁连接的消耗很高,也极大的消耗了worker上有限的连接资源。
    adaptive执行器,在执行多shard的SQL时,不是一次就创建出所有需要的连接数,而是先创建一部分,隔一段时间再创建一部分。
    中途如果有shard的任务提前完成了,它的连接可以被复用,就可以减少对新建连接的需求。
    因此执行多shard的SQL最少只需要一个连接,最多不超过`max_adaptive_executor_pool_size`,当然也不会超过目标worker上的shard数。
    
    这个算法叫"慢启动",慢启动的间隔由参数`citus.executor_slow_start_interval`控制,默认值为10ms。
    初始创建的连接数是:max(1,`citus.max_cached_conns_per_worker`),之后每批新建的连接数都在前一批的基础上加1。
    即默认情况下,每批新建的连接数依次为1,2,3,4,5,6...
    
    "慢启动"主要优化了短查询,对长查询(手册上给的标准是大于500ms),会增加一定的响应时间。
    
    

下面看几个例子

citus.max_shared_pool_size的使用示例

postgres=# alter system set citus.max_shared_pool_size to 4;
ALTER SYSTEM
postgres=# select pg_reload_conf();
 pg_reload_conf 
----------------
 t
(1 row)
postgres=# begin;
BEGIN
postgres=*# update tb1 set c1=11;
UPDATE 1
postgres=*# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        4
 127.0.0.1 | 9001 | postgres      |                        4
(2 rows)

citus.executor_slow_start_interval的使用示例

tb1总共有32个分片,每个worker上有16个分片。
初始每个worker上保持2个连接

postgres=# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        2
 127.0.0.1 | 9001 | postgres      |                        2
(2 rows)

citus.executor_slow_start_interval = '10ms'时,执行一个空表的update,只额外创建了2个新连接。

postgres=# set citus.executor_slow_start_interval='10ms';
SET
postgres=# begin;
BEGIN
postgres=*# update tb1 set c1=100;
UPDATE 0
postgres=*# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        4
 127.0.0.1 | 9001 | postgres      |                        4
(2 rows)

citus.executor_slow_start_interval = '500ms'时,没有创建新的连接,都复用了一个缓存的连接

postgres=# set citus.executor_slow_start_interval='500ms';
SET
postgres=# begin;
BEGIN
postgres=*# update tb1 set c1=100;
UPDATE 0
postgres=*# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        2
 127.0.0.1 | 9001 | postgres      |                        2
(2 rows)

citus.executor_slow_start_interval = '0ms'时,创建了比较多的新连接。

postgres=# set citus.executor_slow_start_interval = '0ms';
SET
postgres=# begin;
BEGIN
postgres=*# update tb1 set c1=100;
UPDATE 0
postgres=*# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        5
 127.0.0.1 | 9001 | postgres      |                       14
(2 rows)

参考

adaptive执行器连接创建"慢启动"的代码参考:

citus-9.3.0/src/backend/distributed/executor/adaptive_executor.c:

static void
ManageWorkerPool(WorkerPool *workerPool)
{
...
        /* cannot open more than targetPoolSize connections */
        int maxNewConnectionCount = targetPoolSize - initiatedConnectionCount;//targetPoolSize的值为max(1,`citus.max_cached_conns_per_worker`)

        /* total number of connections that are (almost) available for tasks */
        int usableConnectionCount = UsableConnectionCount(workerPool);

        /*
         * Number of additional connections we would need to run all ready tasks in
         * parallel.
         */
        int newConnectionsForReadyTasks = readyTaskCount - usableConnectionCount;

        /*
         * Open enough connections to handle all tasks that are ready, but no more
         * than the target pool size.
         */
        newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount);

        if (newConnectionCount > 0 && ExecutorSlowStartInterval != SLOW_START_DISABLED)
        {
            if (MillisecondsPassedSince(workerPool->lastConnectionOpenTime) >=
                ExecutorSlowStartInterval)
            {
                newConnectionCount = Min(newConnectionCount,
                                         workerPool->maxNewConnectionsPerCycle);

                /* increase the open rate every cycle (like TCP slow start) */
                workerPool->maxNewConnectionsPerCycle += 1;
            }
            else
            {
                /* wait a bit until opening more connections */
                return;
            }
        }

19.通过adaptive执行器执行重分布的Join

citus.enable_repartition_joins=on时,Citus支持通过数据重分布的方式执行非亲和Inner Join,
之前版本Citus会自动切换到task-tracker执行器执行重分布的Join,但是使用task-tracker执行器需要CN节点给Worker下发任务再不断检查任务完成状态,其额外消耗很大,响应时间非常长。

新版Citus改进后,可以通过adaptive执行器执行重分布的Join。

根据官网博客,1000w以下数据的重分布Join,性能提升了10倍。
详细参考:https://www.citusdata.com/blog/2020/03/02/citus-9-2-speeds-up-large-scale-htap/

我们自己的简单测试中,2张空表的重分布Join,之前需要16秒,现在只需要2秒。

20.支持重分布的方式执行INSERT...SELECT

表定义

create table tb1(id int, c1 int);
select create_distributed_table('tb1','id');
set citus.shard_count to 16;
create table tb2(id int primary key, c1 int);
select create_distributed_table('tb2','id');

tb1和tb2的分片数不一样,即它们不是亲和的。
此前,Citus必须把数据全拉到CN节点上中转。
新版Citus可以通过重分布的方式执行这个SQL,各个Worker之间直接互相传送数据,CN节点只执行工具函数驱动任务执行,性能可大幅提升。

postgres=# explain INSERT INTO tb2
  SELECT * from tb1
  ON CONFLICT(id) DO UPDATE SET c1 = EXCLUDED.c1;
                                     QUERY PLAN                                     
------------------------------------------------------------------------------------
 Custom Scan (Citus INSERT ... SELECT)  (cost=0.00..0.00 rows=0 width=0)
   INSERT/SELECT method: repartition
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=127.0.0.1 port=9001 dbname=postgres
               ->  Seq Scan on tb1_102339 tb1  (cost=0.00..32.60 rows=2260 width=8)
(8 rows)

根据官网博客,这项优化使性能提升了5倍。
详细参考:https://www.citusdata.com/blog/2020/03/02/citus-9-2-speeds-up-large-scale-htap/

需要注意的是,如果插入时,需要在目标表上自动生成自增字段,Citus会退回到原来的执行方式,数据都会经过CN中转一下。

21.支持以轮询的方式访问参考表的多个副本

之前Citus查询参考表时,始终只访问参考表的第一个副本,新版Citus可以通过参数设置,在参考表多个副本轮询访问,均衡负载。

postgres=# set citus.task_assignment_policy TO "round-robin";
SET
postgres=# explain select * from tbref;
                                    QUERY PLAN                                    
----------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=127.0.0.1 port=9001 dbname=postgres
         ->  Seq Scan on tbref_102371 tbref  (cost=0.00..32.60 rows=2260 width=8)
(6 rows)

postgres=# explain select * from tbref;
                                    QUERY PLAN                                    
----------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=127.0.0.1 port=9002 dbname=postgres
         ->  Seq Scan on tbref_102371 tbref  (cost=0.00..32.60 rows=2260 width=8)
(6 rows)

citus.task_assignment_policy的默认值是greedy。greedy比较适合多副本的分布表。
对于涉及多个shard的SQL,每个shard都有多个可选的副本,在greedy策略下,
Citus会尽量确保每个worker分配到任务数相同。

具体实现时Citus一次轮询所有Worker,直到把所有shard任务都分配完。
因此对参考表这种只有一个shard的场景,greedy会导致其始终把任务分配给第一个worker。
详细可以参考GreedyAssignTaskList()函数的代码。

22.表数据导出优化

Citus导出数据时,中间结果会写到在CN上,而且CN从Worker拉数据是并行拉的,不过Worker还是CN负载都会很高。
新版Citus优化了COPY导出处理,依次从每个Worker上抽出数返回给客户端,中途数据不落盘。

但是这一优化只适用于下面这种固定形式的全表COPY到STDOUT的场景

COPY table tb1 to STDOUT

这可以大大优化pg_dump,延迟更低,内存使用更少。

集群管理增强

23.支持控制worker不分配shard

可以通过设置节点的shouldhaveshards属性控制某个节点不放分片。

SELECT master_set_node_property('127.0.0.1', 9002, 'shouldhaveshards', false);

shouldhaveshards属性会对后续创建新的分布表和参考表生效。
也会对后续执行的企业版Citus的rebalance功能生效,社区版不支持rebalance,但如果自研Citus部署和维护工具也可以利用这个参数。

  • 扩展Worker的实现逻辑改为使用这个参数,简化处理逻辑,不用先建好分布表后再挪分片。
  • 扩缩容脚本也可以使用这个参数决定Worker上是否放置分片,不需要区分是不是全部是扩展Worker的部署架构

24.支持使用master_update_node实施failover

采用主备流复制实现Worker高可用时,一般CN通过VIP访问Worker,worker主备切换时只需要漂移vip到新的主节点即可。
新版Citus提供了一个新的可选方案,通过master_update_node()函数修改某个worker的IP和Port。
这提供了一种新的不依赖VIP的Worker HA实现方案。

postgres=# \df master_update_node
List of functions
-[ RECORD 1 ]-------+-----------------------------------------------------------------------------------------------------------------------------
Schema              | pg_catalog
Name                | master_update_node
Result data type    | void
Argument data types | node_id integer, new_node_name text, new_node_port integer, force boolean DEFAULT false, lock_cooldown integer DEFAULT 10000
Type                | func

25.支持变更亲和定义

新版Citus可以在分布表创建后,修改亲和关系。

表定义

create table tba(id int,c1 int);
select create_distributed_table('tba','id');
create table tbb(id int,c1 int);
select create_distributed_table('tbb','id');
create table tbc(id text,c1 int);
select create_distributed_table('tbc','id');

tba和tbb这两个表是亲和的

postgres=# select * from pg_dist_partition where logicalrelid in ('tba'::regclass,'tbb'::regclass);
 logicalrelid | partmethod |                                                        partkey                                                         | colocationid | repmodel 
--------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
 tba          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
 tbb          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
(2 rows)

将tbb设置为新的亲和ID,打破它们的亲和关系

postgres=# SELECT update_distributed_table_colocation('tbb', colocate_with => 'none');
 update_distributed_table_colocation 
-------------------------------------
 
(1 row)

postgres=# select * from pg_dist_partition where logicalrelid in ('tba'::regclass,'tbb'::regclass);
 logicalrelid | partmethod |                                                        partkey                                                         | colocationid | repmodel 
--------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
 tba          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
 tbb          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |           14 | s
(2 rows)

重新设置它们亲和

postgres=# SELECT update_distributed_table_colocation('tbb', colocate_with => 'tba');
 update_distributed_table_colocation 
-------------------------------------
 
(1 row)

postgres=# select * from pg_dist_partition where logicalrelid in ('tba'::regclass,'tbb'::regclass);
 logicalrelid | partmethod |                                                        partkey                                                         | colocationid | repmodel 
--------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
 tba          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
 tbb          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
(2 rows)

也可以用批量将一组表设置为和某一个表亲和

postgres=# SELECT mark_tables_colocated('tba', ARRAY['tbb', 'tbc']);
ERROR:  cannot colocate tables tba and tbc
DETAIL:  Distribution column types don't match for tba and tbc.

tbc的分片字段类型不一致,不能亲和,去掉tbc再次执行成功。

postgres=# SELECT mark_tables_colocated('tba', ARRAY['tbb']);
 mark_tables_colocated 
-----------------------
 
(1 row)

26.支持truncate分布表的本地数据

把一个原来就有数据的本地表创建成分布表,会把原来的数据拷贝到各个shard上,但原始本地表上的数据不会删除,只是对用户不可见。

原来没有直接的办法删掉这些不需要的本地数据(可以通过临时篡改元数据的方式删),现在可以用一个函数实现。

SELECT truncate_local_data_after_distributing_table('tb1');

27. 延迟复制参考表副本

当新的Worker节点添加到Citus集群的时候,会同步参考表的副本到上面。
如果集群中存在比较大参考表,会导致添加Worker节点的时间不可控。
这可能使得用户不敢在业务高峰期扩容节点。

现在Citus可以支持把参考表的同步延迟到下次创建分片的的时候。
方法就是设置下面这个参数为off,它的默认值为on。

citus.replicate_reference_tables_on_activate = off

这样我们可以在白天扩容,夜里在后台同步数据。

28.创建集群范围一致的恢复点

之前我们备份Citus集群的时候,都是各个节点各自备份恢复,真发生故障,没办法恢复到一个集群范围的一致点。

现在可以使用下面的函数,创建一个全局的恢复点实行全局一致性备份。
使用方法类似于PG的pg_create_restore_point(),详细可参考手册。

select citus_create_restore_point('foo');

29.支持设置Citus集群节点间互联的连接选项

可以通过citus.node_conninfo参数设置Citus内节点间互连的一些非敏感的连接选项。
支持连接选项下面的libpq的一个子集。

  • application_name
  • connect_timeout
  • gsslib
  • keepalives
  • keepalives_count
  • keepalives_idle
  • keepalives_interval
  • krbsrvname
  • sslcompression
  • sslcrl
  • sslmode (defaults to “require” as of Citus 8.1)
  • sslrootcert

Citus 8.1以后,在支持SSL的PostgreSQL上,citus.node_conninfo的默认值为'sslmode=require'。
即默认开启了SSL。这是Citus出于安全的考虑,但是启用SSL后部署和维护会比较麻烦。
因此我们的部署环境下,需要将其修改为sslmode=prefer

postgres=# show citus.node_conninfo;
 citus.node_conninfo 
---------------------
 sslmode=prefer
(1 row)

30.默认关闭Citus统计收集

之前Citus的守护进程默认会收集Citus集群的一些元数据信息上报到CitusData公司的服务上(明显有安全问题)。
新版本把这个功能默认关闭了。当然更彻底的做法是在编译Citus的时候就把这个功能屏蔽掉。

postgres=# show citus.enable_statistics_collection;
 citus.enable_statistics_collection 
------------------------------------
 off
(1 row)

31. 增加查看集群范围活动的函数和视图

新版Citus提供了几个函数和视图,可以在CN上非常方便的查看整体Citus的当前活动状况

  • citus_remote_connection_stats()

查看所有worker上的来自CN节点和MX Worker节点的远程连接数。

postgres=# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        3
 127.0.0.1 | 9001 | postgres      |                        3
(2 rows)
  • citus_dist_stat_activity

查看从本CN节点或MX worker上发起的活动。这个视图在pg_stat_activity上附加了一些Citus相关的信息。

postgres=# select * from citus_dist_stat_activity;
-[ RECORD 1 ]----------+------------------------------
query_hostname         | coordinator_host
query_hostport         | 9000
master_query_host_name | coordinator_host
master_query_host_port | 9000
transaction_number     | 57
transaction_stamp      | 2020-06-19 15:05:22.142242+08
datid                  | 13593
datname                | postgres
pid                    | 2574
usesysid               | 10
usename                | postgres
application_name       | psql
client_addr            | 
client_hostname        | 
client_port            | -1
backend_start          | 2020-06-19 10:57:58.472994+08
xact_start             | 2020-06-19 15:05:17.45487+08
query_start            | 2020-06-19 15:05:22.140954+08
state_change           | 2020-06-19 15:05:22.140957+08
wait_event_type        | Client
wait_event             | ClientRead
state                  | active
backend_xid            | 
backend_xmin           | 5114
query                  | select * from tb1;
backend_type           | client backend

注意上面的transaction_number,它代表一个事务号。
涉及更新的SQL,事务块中查询和push-pull方式执行的查询都会分配一个非0的事务号。
通过这个事务号,我们可以很容易地识别出所有worker上来自同一SQL(或事务)的活动。

详细参考下面的注释。(这段注释应该写错了,下面2类SQL的区别不是是否能被'show',而是transaction_number是否非0)
citus-9.3.0/src/backend/distributed/transaction/citus_dist_stat_activity.c

 *  An important note on this views is that they only show the activity
 *  that are inside distributed transactions. Distributed transactions
 *  cover the following:
 *     - All multi-shard modifications (DDLs, COPY, UPDATE, DELETE, INSERT .. SELECT)
 *     - All multi-shard queries with CTEs (modifying CTEs, read-only CTEs)
 *     - All recursively planned subqueries
 *     - All queries within transaction blocks (BEGIN; query; COMMMIT;)
 *
 *  In other words, the following types of queries won't be observed in these
 *  views:
 *      - Single-shard queries that are not inside transaction blocks
 *      - Multi-shard select queries that are not inside transaction blocks
 *      - Task-tracker queries
  • citus_worker_stat_activity

查看所有worker上的活动。排除非citus会话,即不经过CN或MX worker直连worker的会话。

我们可以指定transaction_number查看特定SQL在worker上的活动。

postgres=# select * from citus_worker_stat_activity where transaction_number = 57;
-[ RECORD 1 ]----------+---------------------------------------------
query_hostname         | 127.0.0.1
query_hostport         | 9001
master_query_host_name | coordinator_host
master_query_host_port | 9000
transaction_number     | 57
transaction_stamp      | 2020-06-19 15:05:22.142242+08
datid                  | 13593
datname                | postgres
pid                    | 4108
usesysid               | 10
usename                | postgres
application_name       | citus
client_addr            | 127.0.0.1
client_hostname        | 
client_port            | 33676
backend_start          | 2020-06-19 15:05:22.162829+08
xact_start             | 2020-06-19 15:05:22.168811+08
query_start            | 2020-06-19 15:05:22.171398+08
state_change           | 2020-06-19 15:05:22.172237+08
wait_event_type        | Client
wait_event             | ClientRead
state                  | idle in transaction
backend_xid            | 
backend_xmin           | 
query                  | SELECT id, c1 FROM tb1_102369 tb1 WHERE true
backend_type           | client backend
...
  • citus_lock_waits

查看Citus集群内的被阻塞的查询。下面引用Ciuts手册上的例子

表定义

CREATE TABLE numbers AS
  SELECT i, 0 AS j FROM generate_series(1,10) AS i;
SELECT create_distributed_table('numbers', 'i');

使用2个会话终端,顺序执行下面的SQL。

-- session 1                           -- session 2
-------------------------------------  -------------------------------------
BEGIN;
UPDATE numbers SET j = 2 WHERE i = 1;
                                       BEGIN;
                                       UPDATE numbers SET j = 3 WHERE i = 1;
                                       -- (this blocks)

通过citus_lock_waits可以看到,这2个查询是阻塞状态。

SELECT * FROM citus_lock_waits;
-[ RECORD 1 ]-------------------------+----------------------------------------
waiting_pid                           | 88624
blocking_pid                          | 88615
blocked_statement                     | UPDATE numbers SET j = 3 WHERE i = 1;
current_statement_in_blocking_process | UPDATE numbers SET j = 2 WHERE i = 1;
waiting_node_id                       | 0
blocking_node_id                      | 0
waiting_node_name                     | coordinator_host
blocking_node_name                    | coordinator_host
waiting_node_port                     | 5432
blocking_node_port                    | 5432

这个视图只能在CN节点查看,MX worker节点查不到数据。但是并不要求阻塞所涉及的SQL必须从CN节点发起。

详细参考:https://docs.citusdata.com/en/v9.3/develop/api_metadata.html?highlight=citus_worker_stat_activity#distributed-query-activity

32. 增加查看表元数据的函数和视图

  • master_get_table_metadata()

查看分布表的元数据

postgres=# select * from master_get_table_metadata('tb1');
-[ RECORD 1 ]---------+-----------
logical_relid         | 17148
part_storage_type     | t
part_method           | h
part_key              | id
part_replica_count    | 1
part_max_size         | 1073741824
part_placement_policy | 2
  • get_shard_id_for_distribution_column()

查看某个分布列值对应的shardid

postgres=# SELECT get_shard_id_for_distribution_column('tb1', 4);
 get_shard_id_for_distribution_column 
--------------------------------------
                               102347
(1 row)

其他

33. 允许在CN备库执行简单的DML

通过设置citus.writable_standby_coordinator参数为on,可以在CN的备库上执行部分简单的DML。
看下下面的例子

表定义

create table tbl(id int,c1 int);
select create_distributed_table('tbserial','id');

在CN备节点上可以执行带分片字段的DML

postgres=# insert into tb1 values(3,3);
ERROR:  writing to worker nodes is not currently allowed
DETAIL:  the database is in recovery mode 
postgres=# set citus.writable_standby_coordinator TO ON;
SET
postgres=# insert into tb1 values(3,3);
INSERT 0 1
postgres=# update tb1 set c1=20 where id=3;
UPDATE 1
postgres=# delete from tb1 where id=3;
DELETE 1

不支持不带分片字段的UPDATE和DELETE

postgres=# update tb1 set c1=20;
ERROR:  cannot assign TransactionIds during recovery
postgres=# delete from tb1 where c1=20;
ERROR:  cannot assign TransactionIds during recovery

也不支持跨节点的事务

postgres=# begin;
BEGIN
postgres=*# insert into tb1 values(3,3);
INSERT 0 1
postgres=*# insert into tb1 values(4,4);
INSERT 0 1
postgres=*# commit;
ERROR:  cannot assign TransactionIds during recovery

对于2pc的分布式事务,Citus需要将事务信息记录到事务表pg_dist_transaction中。
所以,Citus也无法在CN备节点上支持2pc的分布式事务。

但是如果切换成1pc提交模式,还是可以支持跨节点事务的。

postgres=# set citus.multi_shard_commit_protocol TO '1pc';
SET
postgres=# begin;
BEGIN
postgres=*# insert into tb1 values(4,4);
INSERT 0 1
postgres=*# insert into tb1 values(5,5);
INSERT 0 1
postgres=*# commit;

并且在1pc提交模式下,跨多个分片的SQL也是支持的。

postgres=# set citus.multi_shard_commit_protocol TO '1pc';
SET
postgres=# update tb1 set c1=10;
UPDATE 3
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
存储 SQL 安全
分布式 PostgreSQL,Citus(11.x) 效用函数
分布式 PostgreSQL,Citus(11.x) 效用函数
569 0
|
存储 SQL 负载均衡
Citus 11 for Postgres 完全开源,可从任何节点查询(Citus 官方博客)
Citus 11.0 来了! Citus 是一个 PostgreSQL 扩展,它为 PostgreSQL 添加了分布式数据库的超能力。 使用 Citus,您可以创建跨 PostgreSQL 节点集群透明分布或复制的表。 Citus 11.0 是一个新的主版本,这意味着它带有一些非常令人兴奋的新功能,可以实现更高级别的可扩展性。
453 1
Citus 11 for Postgres 完全开源,可从任何节点查询(Citus 官方博客)
|
SQL 存储 关系型数据库
Citus 分布式 PostgreSQL 集群 - SQL Reference(查询处理)
Citus 分布式 PostgreSQL 集群 - SQL Reference(查询处理)
151 0
|
SQL 并行计算 关系型数据库
Citus 分布式 PostgreSQL 集群 - SQL Reference(SQL支持和变通方案)
Citus 分布式 PostgreSQL 集群 - SQL Reference(SQL支持和变通方案)
146 0
|
SQL 缓存 关系型数据库
分布式 PostgreSQL,Citus 11.x SQL 参考(中文手册)
分布式 PostgreSQL,Citus 11.x SQL 参考(中文手册)
519 0
|
SQL 关系型数据库 物联网
Hyperscale (Citus) ,分布式 PostgreSQL 实战指南
Hyperscale (Citus) ,分布式 PostgreSQL 实战指南
260 0
|
SQL 缓存 关系型数据库
Citus 分布式 PostgreSQL 集群 - SQL Reference(手动查询传播)
Citus 分布式 PostgreSQL 集群 - SQL Reference(手动查询传播)
157 0
|
SQL 存储 关系型数据库
Citus 简介,将 Postgres 转换为分布式数据库
Citus 简介,将 Postgres 转换为分布式数据库
228 0
|
存储 SQL 缓存
分布式 PostgreSQL - Citus 架构及概念
分布式 PostgreSQL - Citus 架构及概念
782 0
分布式 PostgreSQL - Citus 架构及概念
|
存储 SQL JSON
Citus 分布式 PostgreSQL 集群 - SQL Reference(查询分布式表 SQL)
Citus 分布式 PostgreSQL 集群 - SQL Reference(查询分布式表 SQL)
177 0