citus实战系列之三平滑扩容

简介: 对一个分布式数据库来说,动态扩缩容是不可回避的需求。但是citus的动态扩缩容功能只在企业版中才有。好消息是,citus的分片信息是存储在元数据表里的,通过修改元数据表,我们完全可以在citus社区版上实现动态的平滑扩缩容。

citus实战系列之三平滑扩容

前言

对一个分布式数据库来说,动态扩缩容是不可回避的需求。但是citus的动态扩缩容功能只在企业版中才有。好消息是,citus的分片信息是存储在元数据表里的,通过修改元数据表,我们完全可以在citus社区版上实现动态的平滑扩缩容。

环境

软件

  • CentOS 7.4
  • PostgreSQL 10
  • citus 7.4

集群架构(扩容前)

  • cituscn

    • cituswk1
    • cituswk2

集群架构(扩容后)

  • cituscn

    • cituswk1
    • cituswk2
    • cituswk3

实验环境可参考《citus实战系列之二实验环境搭建》搭建。

原理概述

citus提供了现成的管理函数可以添加新的worker节点,但现有的分片表和参考表却不会自动分布到新加的worker上。
我们需要手动移动这些分片,并且要保证分片移动过程中不中断业务。主要过程可以分为以下几个步骤

  1. 表复制

    • 在移动目标分片的源端和目的端建立复制
  2. 元数据切换

    • 加锁,阻塞相关的分片表的数据变更
    • 修改pg_dist_shard_placement元数据表,变更分片位置信息。
  3. 清理

    • DROP切换前的旧的分片

表复制采用PostgreSQL的逻辑复制实现,因此所有worker节点必须预先打开逻辑复制开关。

wal_level = logical

注1:citus在添加新worker节点时已经在新worker上拷贝了参考表,不需要再人工处理。

注2:扩容时,如果把worker数翻倍,也可以用物理复制实现。使用物理复制时,如果有参考表不能调用master_add_node添加节点,必须手动修改元数据表。逻辑复制不支持复制DDL,物理复制没有这个限制,但物理复制没有逻辑复制灵活,只支持worker粒度的扩容,而且不能实现缩容。

分片表扩容操作步骤

创建测试分片表

创建以下测试分片表

create table tb1(id int primary key, c1 int);
set citus.shard_count=8;
select create_distributed_table('tb1','id');
insert into tb1 select id,random()*1000 from generate_series(1,100)id;

检查分片位置

postgres=# select * from pg_dist_placement where shardid in (select shardid from pg_dist_shard where logicalrelid='tb1'::regclass);
 placementid | shardid | shardstate | shardlength | groupid 
-------------+---------+------------+-------------+---------
          33 |  102040 |          1 |           0 |       1
          34 |  102041 |          1 |           0 |       2
          35 |  102042 |          1 |           0 |       1
          36 |  102043 |          1 |           0 |       2
          37 |  102044 |          1 |           0 |       1
          38 |  102045 |          1 |           0 |       2
          39 |  102046 |          1 |           0 |       1
          40 |  102047 |          1 |           0 |       2
(8 rows)

上面的groupid代表了对应哪个worker

postgres=# select * from pg_dist_node;
 nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster 
--------+---------+----------+----------+----------+-------------+----------+----------+-------------
      1 |       1 | cituswk1 |     5432 | default  | f           | t        | primary  | default
      2 |       2 | cituswk2 |     5432 | default  | f           | t        | primary  | default
(2 rows)

添加新的worker

在CN节点上执行以下SQL,将新的worker节点cituswk3加入到集群中

SELECT * from master_add_node('cituswk3', 5432);

检查pg_dist_node元数据表。新的worker节点的groupid为4

postgres=# select * from pg_dist_node;
 nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster 
--------+---------+----------+----------+----------+-------------+----------+----------+-------------
      1 |       1 | cituswk1 |     5432 | default  | f           | t        | primary  | default
      2 |       2 | cituswk2 |     5432 | default  | f           | t        | primary  | default
      4 |       4 | cituswk3 |     5432 | default  | f           | t        | primary  | default
(3 rows)

复制分片

目前cituswk1和cituswk2上各有4个分片,cituswk3上没有分片,为了保持数据分布均匀可以移动部分分片到cituswk3上。

下面移动cituswk1上的分片102046到cituswk3上。

在cituswk1上创建PUBLICATION

CREATE PUBLICATION pub_shard FOR TABLE tb1_102046;

在cituswk3上创建分片表和SUBSCRIPTION

create table tb1_102046(id int primary key, c1 int);
CREATE SUBSCRIPTION sub_shard
    CONNECTION 'host=cituswk1'
    PUBLICATION pub_shard;

切换元数据

锁表,阻止应用修改表

lock table tb1 IN EXCLUSIVE MODE;

等待数据完全同步后,修改元数据

update pg_dist_placement set groupid=4 where shardid=102046 and groupid=1;

清理

在cituswk1上删除分片表和PUBLICATION

DROP PUBLICATION pub_shard;
drop table tb1_102046;

在cituswk3上删除SUBSCRIPTION

DROP SUBSCRIPTION sub_shard;

分片表缩容操作步骤

参考分片表扩容的步骤,将要删除的worker(cituswk3)上的分片(102046)移到其它worker(cituswk1)上,然后删除worker(cituswk3)。

select master_remove_node('cituswk3',5432);

亲和性表的处理

citus的分片表之间存在亲和性关系,具有亲和性(即colocationid相同)的所有分片表的同一范围的分片其所在位置必须相同。
移动某个分片时,必须将这些亲和分片捆绑移动。可以通过以下SQL查出某个分片的所有亲和分片。

postgres=# select * from pg_dist_shard where logicalrelid in(select logicalrelid from pg_dist_partition where colocationid=(select colocationid from pg_dist_partition where partmethod='h' and logicalrelid='tb1'::regclass)) and (shardminvalue,shardmaxvalue)=(select shardminvalue,shardmaxvalue from pg_dist_shard where shardid=102046);
 logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue 
--------------+---------+--------------+---------------+---------------
 tb1          |  102046 | t            | 1073741824    | 1610612735
 tb2          |  102055 | t            | 1073741824    | 1610612735
(2 rows)

对应的分片表元数据如下:

postgres=# select logicalrelid,partmethod,colocationid from pg_dist_partition;
 logicalrelid | partmethod | colocationid 
--------------+------------+--------------
 tb1          | h          |            2
 tb2          | h          |            2
 tb3          | h          |            4
(3 rows)

自动化

在实际生产环境中,citus集群中可能会存储了非常多的表,每个表又拆成了非常多的分片。如果按照上面的步骤手工对citus扩缩容,将是一件非常痛苦的事情,也很容易出错。所以需要将这些步骤打包成自动化程序。

citus企业版在扩缩容时利用了一个叫master_move_shard_placement()的函数迁移分片,我们可以实现一个接口类似的函数citus_move_shard_placement()

https://github.com/ChenHuajun/chenhuajun.github.io/blob/master/_posts/2018-05-23/citus_move_shard_placement.sql

    CREATE TYPE citus.old_shard_placement_drop_method AS ENUM (
       'none', -- do not drop or rename old shards, only record it into citus.citus_move_shard_placement_remained_old_shard
       'rename', -- move old shards to schema "citus_move_shard_placement_recyclebin"
       'drop' -- drop old shards in source node
    );
    
    CREATE TABLE citus.citus_move_shard_placement_remained_old_shard(
        id serial primary key,
        optime timestamptz NOT NULL default now(),
        nodename text NOT NULL,
        nodeport text NOT NULL,
        tablename text NOT NULL,
        drop_method citus.old_shard_placement_drop_method NOT NULL
    );
    
    -- move this shard and it's all colocated shards from source node to target node.
    -- drop_method define how to process old shards in the source node, default is 'none' which does not block SELECT.
    -- old shards should be drop in the future will be recorded into table citus.citus_move_shard_placement_remained_old_shard
    CREATE OR REPLACE FUNCTION pg_catalog.citus_move_shard_placement(shard_id bigint,
                                                  source_node_name text,
                                                  source_node_port integer,
                                                  target_node_name text,
                                                  target_node_port integer,
                                                  drop_method citus.old_shard_placement_drop_method DEFAULT 'none')
    RETURNS void
    AS $citus_move_shard_placement$
    ...
    这部分太长了,略过
    ...
    $citus_move_shard_placement$ LANGUAGE plpgsql SET search_path = 'pg_catalog','public';
    
    -- drop old shards in source node 
    CREATE OR REPLACE FUNCTION pg_catalog.citus_move_shard_placement_cleanup()
    RETURNS void
    AS $$
        BEGIN
            delete from citus.citus_move_shard_placement_remained_old_shard where id in
                (select id 
                 from (select id,dblink_exec('host='||nodename || ' port='||nodeport,'DROP TABLE IF EXISTS ' || tablename) drop_result 
                       from citus.citus_move_shard_placement_remained_old_shard)a 
                 where drop_result='DROP TABLE');
    
            PERFORM run_command_on_workers('DROP SCHEMA IF EXISTS citus_move_shard_placement_recyclebin  CASCADE');
        END;
    $$ LANGUAGE plpgsql SET search_path = 'pg_catalog','public';

注:上面的工具函数未经过严格的测试,并且不支持后面的多CN架构。

下面是一个使用的例子

把102928分片从cituswk1迁移到cituswk2,drop_method使用rename旧的分片不删除而是移到名为citus_move_shard_placement_recyclebin的schema下。

postgres=# select citus_move_shard_placement(102928,'cituswk1',5432,'cituswk2',5432,'rename');
NOTICE:  BEGIN move shards(102928,102944) from cituswk1:5432 to cituswk2:5432
NOTICE:  [1/2] LOCK TABLE scale_test.tb_dist2 IN SHARE UPDATE EXCLUSIVE MODE ...
NOTICE:  [2/2] LOCK TABLE scale_test.tb_dist IN SHARE UPDATE EXCLUSIVE MODE ...
NOTICE:  CREATE PUBLICATION in source node cituswk1:5432
NOTICE:  create shard table in the target node cituswk2:5432
NOTICE:  CREATE SUBSCRIPTION on target node cituswk2:5432
NOTICE:  wait for init data sync...
NOTICE:  init data sync in 00:00:01.010502
NOTICE:  [1/2] LOCK TABLE scale_test.tb_dist2 IN EXCLUSIVE MODE ...
NOTICE:  [2/2] LOCK TABLE scale_test.tb_dist IN EXCLUSIVE MODE ...
NOTICE:  wait for data sync...
NOTICE:  data sync in 00:00:00.273212
NOTICE:  UPDATE pg_dist_placement
NOTICE:  DROP SUBSCRIPTION and PUBLICATION
NOTICE:  END
 citus_move_shard_placement 
----------------------------
 
(1 row)

从上面的输出可以看出,有一个步骤是锁表,这段时间内所有SQL都会被阻塞。对分析型业务来说,几十秒甚至更长SQL执行时间是很常见的,这意味着有可能出现先拿到一个表的锁,再拿下一个锁时,等了几十秒。更糟糕的情况下还可能发生死锁。
回避这种风险的办法是将drop_method设置为none,这也是默认值。drop_methodnone时将会改为获取一个EXCLUSIVE锁,EXCLUSIVE锁和SELECT不会冲突。这大大降低了分片迁移对业务的影响,死锁发生的概率也同样大大降低(仅有可能发生在应用程序在一个事务里先后更新了2张分片表时)。

确认扩容成功后,删除残留的旧分片(drop_methoddrop时不需要清理)。

postgres=# select citus_move_shard_placement_cleanup();
 citus_move_shard_placement_cleanup 
------------------------------------
 
(1 row)
相关文章
|
3月前
|
运维 监控 安全
【TiDB原理与实战详解】2、部署与节点的扩/缩容~学不会? 不存在的!
TiUP 是 TiDB 4.0 引入的集群运维工具,TiUP cluster 用于部署、管理 TiDB 集群,支持 TiDB、TiFlash、TiDB Binlog 等组件。本文介绍使用 TiUP 部署生产环境的具体步骤,包括节点规划、工具安装、配置文件修改及集群部署等。同时,提供了常用命令和安全优化方法,并详细说明了如何进行集群的扩缩容操作,以及时区设置等维护工作。
|
6月前
|
存储 运维 监控
大长案例 - 经典长连接可水平扩容高可用架构
大长案例 - 经典长连接可水平扩容高可用架构
88 0
|
12月前
|
NoSQL Redis
轻松掌握组件启动之Redis集群扩展秘籍:轻松扩容与缩容,释放高性能潜能
在这篇文章中,我们将揭示Redis集群的扩容和缩容操作,让您的Redis集群发挥最佳性能和可伸缩性。通过增加主节点和从节点,并将它们无缝添加到集群中,您将能够轻松扩展您的Redis集群以满足不断增长的需求。同时,我们还将探讨如何进行缩容操作,即删除节点,以优化集群资源的利用。无论您是初学者还是经验丰富的Redis用户,本文将为您提供一系列有用的技巧和最佳实践,帮助您更好地管理和优化Redis集群,实现更高的性能和可扩展性。
244 0
|
运维 负载均衡 OceanBase
第四章:OceanBase集群技术架构(动态扩容和缩容)
第四章:OceanBase集群技术架构(动态扩容和缩容)
575 0
|
关系型数据库 MySQL 测试技术
动态扩容缩容的分库分表我想说 | 青训营笔记
动态扩容缩容的分库分表我想说 | 青训营笔记
88 0
|
消息中间件 弹性计算 固态存储
256变4096:分库分表扩容如何实现平滑数据迁移?
本文作者就一个高德打车弹外订单系统进行了一次扩分库分表和数据库迁移。
256变4096:分库分表扩容如何实现平滑数据迁移?
EMQ
|
消息中间件 运维 负载均衡
EMQX Enterprise 4.4.12&4.4.13 发布:集群负载重平衡、TDengine 3.0 适配以及子表批量插入
EMQX Enterprise 4.4.12与4.4.13版本发布:集群负载重平衡与节点疏散功能为运维人员提供更灵活的集群管理方式,适配TDengine 3.0版本并新增分表批量插入功能,以提供更高的数据集成吞吐。
EMQ
268 0
EMQX Enterprise 4.4.12&4.4.13 发布:集群负载重平衡、TDengine 3.0 适配以及子表批量插入
|
存储 关系型数据库 分布式数据库
Paper Reading 预告 | 揭秘 PolarDB 计算存储分离架构性能优化之路
12月29日 19:00 锁定「阿里云数据库视频号」揭秘PolarDB计算存储分离架构性能优化之路
Paper Reading 预告 | 揭秘 PolarDB 计算存储分离架构性能优化之路
|
关系型数据库 MySQL 中间件
|
SQL 并行计算 Oracle
【笔记】最佳实践—混合负载HTAP的实践和优化
背景信息 本文主要提供数据库上云后OLTP+OLAP一体化架构的最佳实践,既HTAP。主要面对的业务应用范围: 混合型HTAP数据库需求:如ORACLE数据库改造上云,云上数据库方案选型; OLTP系统查询慢,存在分析型场景和瓶颈的客户; 读写分离需求。
263 0
【笔记】最佳实践—混合负载HTAP的实践和优化