PostgreSQL 普通表在线转换为分区表 - online exchange to partition table

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: PostgreSQL 普通表在线转换为分区表 - online exchange to partition table

背景

非分区表,如何在线(不影响业务)转换为分区表?

方法1,pg_pathman分区插件

《PostgreSQL 9.5+ 高效分区表实现 - pg_pathman》

使用非堵塞式的迁移接口

partition_table_concurrently(  
  relation   REGCLASS,              -- 主表OID  
  batch_size INTEGER DEFAULT 1000,  -- 一个事务批量迁移多少记录  
  sleep_time FLOAT8 DEFAULT 1.0)    -- 获得行锁失败时,休眠多久再次获取,重试60次退出任务。  
  
  
postgres=# select partition_table_concurrently('part_test'::regclass,  
                             10000,  
                             1.0);  
NOTICE:  worker started, you can stop it with the following command: select stop_concurrent_part_task('part_test');  
 partition_table_concurrently   
------------------------------  
   
(1 row)  

迁移结束后,主表数据已经没有了,全部在分区中

postgres=# select count(*) from only part_test;  
 count   
-------  
     0  
(1 row)  

数据迁移完成后,建议禁用主表,这样执行计划就不会出现主表了

postgres=# select set_enable_parent('part_test'::regclass, false);  
 set_enable_parent   
-------------------  
   
(1 row)  

方法2,原生分区

使用继承表,触发器,异步迁移,交换表名一系列步骤,在线将非分区表,转换为分区表(交换表名是需要短暂的堵塞)。

关键技术:

1、继承表(子分区)

对select, update, delete, truncate, drop透明。

2、触发器

插入,采用before触发器,数据路由到继承分区

更新,采用before触发器,删除老表记录,同时将更新后的数据插入新表

3、后台迁移数据,cte only skip locked , delete only, insert into new table

4、迁移结束(p表没有数据后),短暂上锁,剥离INHERTI关系,切换到原生分区,切换表名。

例子

将一个表在线转换为LIST分区表(伪HASH分区)。

范围分区类似。

如果要转换为原生HASH分区表,需要提取pg内置HASH分区算法。

1、创建测试表(需要被分区的表)

create table old (id int primary key, info text, crt_time timestamp);  

2、写入1000万测试记录

insert into old select generate_series(1,10000000) , md5(random()::text) , now();  

3、创建子分区(本例使用LIST分区)

do language plpgsql $$    
declare    
  parts int := 4;    
begin    
  for i in 0..parts-1 loop    
    execute format('create table old_mid%s (like old including all) inherits (old)', i);    
    execute format('alter table old_mid%s add constraint ck check(abs(mod(id,%s))=%s)', i, parts, i);    
  end loop;    
end;    
$$;    

4、插入,采用before触发器,路由到新表

create or replace function ins_tbl() returns trigger as $$    
declare    
begin    
  case abs(mod(NEW.id,4))    
    when 0 then    
      insert into old_mid0 values (NEW.*);    
    when 1 then    
      insert into old_mid1 values (NEW.*);    
    when 2 then    
      insert into old_mid2 values (NEW.*);    
    when 3 then    
      insert into old_mid3 values (NEW.*);    
    else    
      return NEW;  -- 如果是NULL则写本地父表,主键不会为NULL     
  end case;    
  return null;    
end;    
$$ language plpgsql strict;    
  
  
create trigger tg1 before insert on old for each row execute procedure ins_tbl();    

5、更新,采用before触发器,删除老表,同时将更新后的数据插入新表

create or replace function upd_tbl () returns trigger as $$  
declare  
begin  
  case abs(mod(NEW.id,4))    
    when 0 then    
      insert into old_mid0 values (NEW.*);    
    when 1 then    
      insert into old_mid1 values (NEW.*);    
    when 2 then    
      insert into old_mid2 values (NEW.*);    
    when 3 then    
      insert into old_mid3 values (NEW.*);    
    else    
      return NEW;  -- 如果是NULL则写本地父表,主键不会为NULL     
  end case;    
  
  delete from only old where id=NEW.id;  
  return null;    
end;    
$$ language plpgsql strict;    
  
create trigger tg2 before update on old for each row execute procedure upd_tbl();    

6、old table 如下

postgres=# \dt+ old  
                    List of relations  
 Schema | Name | Type  |  Owner   |  Size  | Description   
--------+------+-------+----------+--------+-------------  
 public | old  | table | postgres | 730 MB |   
(1 row)  
  
  
继承关系如下  
  
  
postgres=# \d+ old  
                                               Table "public.old"  
  Column  |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description   
----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------  
 id       | integer                     |           | not null |         | plain    |              |   
 info     | text                        |           |          |         | extended |              |   
 crt_time | timestamp without time zone |           |          |         | plain    |              |   
Indexes:  
    "old_pkey" PRIMARY KEY, btree (id)  
Triggers:  
    tg1 BEFORE INSERT ON old FOR EACH ROW EXECUTE PROCEDURE ins_tbl()  
    tg2 BEFORE UPDATE ON old FOR EACH ROW EXECUTE PROCEDURE upd_tbl()  
Child tables: old_mid0,  
              old_mid1,  
              old_mid2,  
              old_mid3  

7、验证insert, update, delete, select完全符合要求。对业务SQL请求透明。

postgres=# insert into old values (0,'test',now());  
INSERT 0 0  
  
postgres=# select tableoid::regclass,* from old where id=1;  
 tableoid | id |               info               |         crt_time            
----------+----+----------------------------------+---------------------------  
 old      |  1 | 22be06200f2a967104872f6f173fd038 | 31-JAN-19 12:52:25.887242  
(1 row)  
  
postgres=# select tableoid::regclass,* from old where id=0;  
 tableoid | id | info |         crt_time            
----------+----+------+---------------------------  
 old_mid0 |  0 | test | 31-JAN-19 13:02:35.859899  
(1 row)  
postgres=# update old set info='abc' where id in (0,2) returning tableoid::regclass,*;  
 tableoid | id | info |         crt_time            
----------+----+------+---------------------------  
 old_mid0 |  0 | abc  | 31-JAN-19 13:02:35.859899  
(1 row)  
  
UPDATE 1  
  
postgres=# select tableoid::regclass,* from old where id in (0,2);  
 tableoid | id | info |         crt_time            
----------+----+------+---------------------------  
 old_mid0 |  0 | abc  | 31-JAN-19 13:12:03.343559  
 old_mid2 |  2 | abc  | 31-JAN-19 13:11:04.763652  
(2 rows)  
postgres=# delete from old where id=3;  
DELETE 1  
postgres=# select tableoid::regclass,* from old where id=3;  
 tableoid | id | info | crt_time   
----------+----+------+----------  
(0 rows)  

8、开启压测,后台对原表数据进行迁移

create or replace function test_ins(int) returns void as $$  
declare  
begin  
  insert into old values ($1,'test',now());  
  exception when others then  
  return;  
end;  
$$ language plpgsql strict;  
vi test.sql  
  
\set id1 random(10000001,200000000)  
\set id2 random(1,5000000)  
\set id3 random(5000001,10000000)  
delete from old where id=:id2;  
update old set info=md5(random()::text),crt_time=now() where id=:id3;  
select test_ins(:id1);  

开启压测

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 4 -j 4 -T 1200  
  
...  
  
progress: 323.0 s, 12333.1 tps, lat 0.324 ms stddev 0.036  
progress: 324.0 s, 11612.9 tps, lat 0.344 ms stddev 0.203  
progress: 325.0 s, 12546.0 tps, lat 0.319 ms stddev 0.061  
progress: 326.0 s, 12728.7 tps, lat 0.314 ms stddev 0.038  
progress: 327.0 s, 12536.9 tps, lat 0.319 ms stddev 0.040  
progress: 328.0 s, 12534.1 tps, lat 0.319 ms stddev 0.042  
progress: 329.0 s, 12228.1 tps, lat 0.327 ms stddev 0.047  
...  

9、在线迁移数据

批量迁移,每一批迁移N条。调用以下SQL

with a as (  
delete from only old where ctid = any (array (select ctid from only old limit 1000 for update skip locked) ) returning *  
)  
insert into old select * from a;  
  
INSERT 0 0  
  
postgres=# select count(*) from only old;  
  count    
---------  
 9998998  
(1 row)  
  
postgres=# select count(*) from old;  
  count     
----------  
 10000000  
(1 row)  
postgres=# with a as (                     
delete from only old where ctid = any (array (select ctid from only old limit 1000 for update skip locked) ) returning *  
)  
insert into old select * from a;  
INSERT 0 0  
postgres=# select count(*) from old;  
  count     
----------  
 10000000  
(1 row)  
  
postgres=# select count(*) from only old;  
  count    
---------  
 9997998  
(1 row)  
  
postgres=# with a as (                
delete from only old where ctid = any (array (select ctid from only old limit 100000 for update skip locked) ) returning *  
)  
insert into old select * from a;  
INSERT 0 0  
postgres=# select count(*) from only old;  
  count    
---------  
 9897998  
(1 row)  
  
postgres=# select count(*) from old;  
  count     
----------  
 10000000  
(1 row)  

一次迁移1万条,分批操作。

with a as (                
delete from only old where ctid = any (array (select ctid from only old limit 10000 for update skip locked) ) returning *  
)  
insert into old select * from a;  

持续调用以上接口,直到当old表已经没有数据,完全迁移到了分区。

select count(*) from only old;  
  
  
 count   
-------  
     0  
(1 row)  

10、切换到分区表

创建分区表如下,分区方法与继承约束一致。

create table new (id int, info text, crt_time timestamp) partition by list (abs(mod(id,4)));    

切换表名,防止雪崩,使用锁超时,由于只涉及表名变更,所以速度非常快。

begin;  
set lock_timeout ='3s';   
alter table old_mid0 no inherit old;   
alter table old_mid1 no inherit old;   
alter table old_mid2 no inherit old;   
alter table old_mid3 no inherit old;   
alter table old rename to old_tmp;  
alter table new rename to old;  
alter table old ATTACH PARTITION old_mid0 for values in (0);    
alter table old ATTACH PARTITION old_mid1 for values in (1);    
alter table old ATTACH PARTITION old_mid2 for values in (2);    
alter table old ATTACH PARTITION old_mid3 for values in (3);    
end;  

切换后的原生分区表如下

postgres=# \d+ old  
                                               Table "public.old"  
  Column  |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description   
----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------  
 id       | integer                     |           |          |         | plain    |              |   
 info     | text                        |           |          |         | extended |              |   
 crt_time | timestamp without time zone |           |          |         | plain    |              |   
Partition key: LIST (abs(mod(id, 4)))  
Partitions: old_mid0 FOR VALUES IN (0),  
            old_mid1 FOR VALUES IN (1),  
            old_mid2 FOR VALUES IN (2),  
            old_mid3 FOR VALUES IN (3)  

查询测试

postgres=# explain select * from old where id=1;  
                                     QUERY PLAN                                        
-------------------------------------------------------------------------------------  
 Append  (cost=0.29..10.04 rows=4 width=44)  
   ->  Index Scan using old_mid0_pkey on old_mid0  (cost=0.29..2.51 rows=1 width=44)  
         Index Cond: (id = 1)  
   ->  Index Scan using old_mid1_pkey on old_mid1  (cost=0.29..2.51 rows=1 width=45)  
         Index Cond: (id = 1)  
   ->  Index Scan using old_mid2_pkey on old_mid2  (cost=0.29..2.51 rows=1 width=44)  
         Index Cond: (id = 1)  
   ->  Index Scan using old_mid3_pkey on old_mid3  (cost=0.29..2.51 rows=1 width=45)  
         Index Cond: (id = 1)  
(9 rows)  
  
  
  
postgres=# explain select * from old where id=? and abs(mod(id, 4)) = abs(mod(?, 4));  
                                     QUERY PLAN                                        
-------------------------------------------------------------------------------------  
 Append  (cost=0.29..2.52 rows=1 width=45)  
   ->  Index Scan using old_mid1_pkey on old_mid1  (cost=0.29..2.51 rows=1 width=45)  
         Index Cond: (id = 1)  
         Filter: (mod(id, 4) = 1)  
(4 rows)  

数据

postgres=# select count(*) from old;  
  count     
----------  
 10455894  
(1 row)  

方法3,logical replication

使用逻辑复制的方法,同步到分区表。

简单步骤如下:

snapshot 快照(lsn位点)  
  
全量  
  
增量(逻辑复制,从LSN位置开始解析WAL LOG)  
  
切换表名

其他

hash函数

postgres=# \df *.*hash*  
                                            List of functions  
   Schema   |           Name           | Result data type |          Argument data types          | Type   
------------+--------------------------+------------------+---------------------------------------+------  
 pg_catalog | hash_aclitem             | integer          | aclitem                               | func  
 pg_catalog | hash_aclitem_extended    | bigint           | aclitem, bigint                       | func  
 pg_catalog | hash_array               | integer          | anyarray                              | func  
 pg_catalog | hash_array_extended      | bigint           | anyarray, bigint                      | func  
 pg_catalog | hash_numeric             | integer          | numeric                               | func  
 pg_catalog | hash_numeric_extended    | bigint           | numeric, bigint                       | func  
 pg_catalog | hash_range               | integer          | anyrange                              | func  
 pg_catalog | hash_range_extended      | bigint           | anyrange, bigint                      | func  
 pg_catalog | hashbpchar               | integer          | character                             | func  
 pg_catalog | hashbpcharextended       | bigint           | character, bigint                     | func  
 pg_catalog | hashchar                 | integer          | "char"                                | func  
 pg_catalog | hashcharextended         | bigint           | "char", bigint                        | func  
 pg_catalog | hashenum                 | integer          | anyenum                               | func  
 pg_catalog | hashenumextended         | bigint           | anyenum, bigint                       | func  
 pg_catalog | hashfloat4               | integer          | real                                  | func  
 pg_catalog | hashfloat4extended       | bigint           | real, bigint                          | func  
 pg_catalog | hashfloat8               | integer          | double precision                      | func  
 pg_catalog | hashfloat8extended       | bigint           | double precision, bigint              | func  
 pg_catalog | hashhandler              | index_am_handler | internal                              | func  
 pg_catalog | hashinet                 | integer          | inet                                  | func  
 pg_catalog | hashinetextended         | bigint           | inet, bigint                          | func  
 pg_catalog | hashint2                 | integer          | smallint                              | func  
 pg_catalog | hashint2extended         | bigint           | smallint, bigint                      | func  
 pg_catalog | hashint4                 | integer          | integer                               | func  
 pg_catalog | hashint4extended         | bigint           | integer, bigint                       | func  
 pg_catalog | hashint8                 | integer          | bigint                                | func  
 pg_catalog | hashint8extended         | bigint           | bigint, bigint                        | func  
 pg_catalog | hashmacaddr              | integer          | macaddr                               | func  
 pg_catalog | hashmacaddr8             | integer          | macaddr8                              | func  
 pg_catalog | hashmacaddr8extended     | bigint           | macaddr8, bigint                      | func  
 pg_catalog | hashmacaddrextended      | bigint           | macaddr, bigint                       | func  
 pg_catalog | hashname                 | integer          | name                                  | func  
 pg_catalog | hashnameextended         | bigint           | name, bigint                          | func  
 pg_catalog | hashoid                  | integer          | oid                                   | func  
 pg_catalog | hashoidextended          | bigint           | oid, bigint                           | func  
 pg_catalog | hashoidvector            | integer          | oidvector                             | func  
 pg_catalog | hashoidvectorextended    | bigint           | oidvector, bigint                     | func  
 pg_catalog | hashtext                 | integer          | text                                  | func  
 pg_catalog | hashtextextended         | bigint           | text, bigint                          | func  
 pg_catalog | hashvarlena              | integer          | internal                              | func  
 pg_catalog | hashvarlenaextended      | bigint           | internal, bigint                      | func  
 pg_catalog | interval_hash            | integer          | interval                              | func  
 pg_catalog | interval_hash_extended   | bigint           | interval, bigint                      | func  
 pg_catalog | jsonb_hash               | integer          | jsonb                                 | func  
 pg_catalog | jsonb_hash_extended      | bigint           | jsonb, bigint                         | func  
 pg_catalog | pg_lsn_hash              | integer          | pg_lsn                                | func  
 pg_catalog | pg_lsn_hash_extended     | bigint           | pg_lsn, bigint                        | func  
 pg_catalog | satisfies_hash_partition | boolean          | oid, integer, integer, VARIADIC "any" | func  
 pg_catalog | time_hash                | integer          | time without time zone                | func  
 pg_catalog | time_hash_extended       | bigint           | time without time zone, bigint        | func  
 pg_catalog | timestamp_hash           | integer          | timestamp without time zone           | func  
 pg_catalog | timestamp_hash_extended  | bigint           | timestamp without time zone, bigint   | func  
 pg_catalog | timetz_hash              | integer          | time with time zone                   | func  
 pg_catalog | timetz_hash_extended     | bigint           | time with time zone, bigint           | func  
 pg_catalog | uuid_hash                | integer          | uuid                                  | func  
 pg_catalog | uuid_hash_extended       | bigint           | uuid, bigint                          | func  
(56 rows)  

小结

在线将表转换为分区表,可以使用的方法:

1、转换为pg_pathman分区,直接调用pg_pathman的UDF即可。

2、转换为原生分区,使用继承,异步迁移的方法。割接是短暂锁表。

不支持 insert ino on conflict 语法。

insert into old values (1,'test',now()) on conflict(id) do update set info=excluded.info, crt_time=excluded.crt_time;  

3、逻辑复制的方法,将数据增量迁移到分区表(目标可以是原生分区方法或者是pg_pathman分区方法的新表)。

参考

《PostgreSQL 9.x, 10, 11 hash分区表 用法举例》

《PostgreSQL 触发器 用法详解 1》

《PostgreSQL 触发器 用法详解 2》

《PostgreSQL 9.5+ 高效分区表实现 - pg_pathman》

PostgreSQL 许愿链接

您的愿望将传达给PG kernel hacker、数据库厂商等, 帮助提高数据库产品质量和功能, 说不定下一个PG版本就有您提出的功能点. 针对非常好的提议,奖励限量版PG文化衫、纪念品、贴纸、PG热门书籍等,奖品丰富,快来许愿。开不开森.

9.9元购买3个月阿里云RDS PostgreSQL实例

PostgreSQL 解决方案集合

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
16天前
|
SQL 关系型数据库 MySQL
关系型数据库使用 TRUNCATE TABLE 语句
`TRUNCATE TABLE` SQL 语句快速删除表所有记录,不记录删除操作,通常比 `DELETE` 快。不触发 DELETE 触发器,可能重置自增字段,并产生较少日志。语法:`TRUNCATE TABLE 表名`。注意:不可回滚,不激活触发器,慎用,确保数据不可恢复。考虑使用 `DELETE` 当需保留触发器功能或删除特定条件的行。
16 1
|
2月前
|
SQL 关系型数据库 数据库
一文熟悉PolarDB-PG 分区表核心特性
在 PolarDB-PG 数据库中,分区表 (Partitioned Table) 使您能够将非常大的表分解为更小且更易于管理的部分,这个部分称为分区 (Partition) 。 每个分区都是一个独立的对象,具有自己的名称和可选的存储特性。本文首先简单的介绍了分区表策略以及它的优势特点,然后介绍了PolarDB-PG 分区表支持的查询优化特性,最后介绍了分区表上的本地索引和全局索引,从而帮助用户对PolarDB-PG 分区表有一个全面的了解。
|
7月前
|
SQL 监控 关系型数据库
PostgreSQL普通表转换成分区表
如何使用pg_rewrite扩展将普遍表转换成分区表
386 0
|
3月前
|
监控 负载均衡 关系型数据库
MySQL技能完整学习列表13、MySQL高级特性——1、分区表(Partitioning)——2、复制(Replication)——3、集群(Clustering)
MySQL技能完整学习列表13、MySQL高级特性——1、分区表(Partitioning)——2、复制(Replication)——3、集群(Clustering)
54 0
|
10月前
|
传感器 关系型数据库 MySQL
php语句:MySQL指定分区表跨分区根据时间条件快速查询记录的封装函数
php语句:MySQL指定分区表跨分区根据时间条件快速查询记录的封装函数
98 0
|
7月前
|
存储 关系型数据库 MySQL
MySQL分区表详解
在我们日常处理海量数据的过程中,如何有效管理和优化数据库一直是一个既重要又具有挑战性的问题
59 0
|
9月前
|
存储 关系型数据库 MySQL
【MySQL】MySQL分区表详解
【MySQL】MySQL分区表详解
182 0
|
10月前
|
存储 SQL 运维
PolarDB MySQL大表实践-分区表篇
背景:分区表到底是什么?分区作为传统企业级数据库的特性,早已经在很多大数据和数仓场景中得到广泛应用。基于维基百科的解释,分区是将逻辑数据库或其组成元素如表、表空间等划分为不同的独立部分。数据库分区通常是出于可管理性、性能或可用性的原因,或者是为了负载平衡。它在分布式数据库管理系统中很流行,其中每个分区可能分布在多个节点上,节点上的用户在分区上执行本地事务。这提高了具有涉及某些数据视图的常规事务的站
659 0
PolarDB MySQL大表实践-分区表篇
|
11月前
|
存储 关系型数据库 MySQL
高性能 MySQL(十二):分区表
分区表是一个独立的逻辑表,其底层由多个物理子表组成。对分区表的请求,在 MySQL 底层都会被转换为对范围内的物理子表的请求,并将结果合并到一起返回。
98 0
|
存储 SQL 缓存
【MySQL高级】查询缓存、合并表、分区表
【MySQL高级】查询缓存、合并表、分区表
181 0
【MySQL高级】查询缓存、合并表、分区表

相关产品

  • 云原生数据库 PolarDB