PostgreSQL 普通表在线转换为分区表 - online exchange to partition table

本文涉及的产品
云数据库 RDS SQL Server,基础系列 2核4GB
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: PostgreSQL 普通表在线转换为分区表 - online exchange to partition table

背景

非分区表,如何在线(不影响业务)转换为分区表?

方法1,pg_pathman分区插件

《PostgreSQL 9.5+ 高效分区表实现 - pg_pathman》

使用非堵塞式的迁移接口

partition_table_concurrently(  
  relation   REGCLASS,              -- 主表OID  
  batch_size INTEGER DEFAULT 1000,  -- 一个事务批量迁移多少记录  
  sleep_time FLOAT8 DEFAULT 1.0)    -- 获得行锁失败时,休眠多久再次获取,重试60次退出任务。  
  
  
postgres=# select partition_table_concurrently('part_test'::regclass,  
                             10000,  
                             1.0);  
NOTICE:  worker started, you can stop it with the following command: select stop_concurrent_part_task('part_test');  
 partition_table_concurrently   
------------------------------  
   
(1 row)  

迁移结束后,主表数据已经没有了,全部在分区中

postgres=# select count(*) from only part_test;  
 count   
-------  
     0  
(1 row)  

数据迁移完成后,建议禁用主表,这样执行计划就不会出现主表了

postgres=# select set_enable_parent('part_test'::regclass, false);  
 set_enable_parent   
-------------------  
   
(1 row)  

方法2,原生分区

使用继承表,触发器,异步迁移,交换表名一系列步骤,在线将非分区表,转换为分区表(交换表名是需要短暂的堵塞)。

关键技术:

1、继承表(子分区)

对select, update, delete, truncate, drop透明。

2、触发器

插入,采用before触发器,数据路由到继承分区

更新,采用before触发器,删除老表记录,同时将更新后的数据插入新表

3、后台迁移数据,cte only skip locked , delete only, insert into new table

4、迁移结束(p表没有数据后),短暂上锁,剥离INHERTI关系,切换到原生分区,切换表名。

例子

将一个表在线转换为LIST分区表(伪HASH分区)。

范围分区类似。

如果要转换为原生HASH分区表,需要提取pg内置HASH分区算法。

1、创建测试表(需要被分区的表)

create table old (id int primary key, info text, crt_time timestamp);  

2、写入1000万测试记录

insert into old select generate_series(1,10000000) , md5(random()::text) , now();  

3、创建子分区(本例使用LIST分区)

do language plpgsql $$    
declare    
  parts int := 4;    
begin    
  for i in 0..parts-1 loop    
    execute format('create table old_mid%s (like old including all) inherits (old)', i);    
    execute format('alter table old_mid%s add constraint ck check(abs(mod(id,%s))=%s)', i, parts, i);    
  end loop;    
end;    
$$;    

4、插入,采用before触发器,路由到新表

create or replace function ins_tbl() returns trigger as $$    
declare    
begin    
  case abs(mod(NEW.id,4))    
    when 0 then    
      insert into old_mid0 values (NEW.*);    
    when 1 then    
      insert into old_mid1 values (NEW.*);    
    when 2 then    
      insert into old_mid2 values (NEW.*);    
    when 3 then    
      insert into old_mid3 values (NEW.*);    
    else    
      return NEW;  -- 如果是NULL则写本地父表,主键不会为NULL     
  end case;    
  return null;    
end;    
$$ language plpgsql strict;    
  
  
create trigger tg1 before insert on old for each row execute procedure ins_tbl();    

5、更新,采用before触发器,删除老表,同时将更新后的数据插入新表

create or replace function upd_tbl () returns trigger as $$  
declare  
begin  
  case abs(mod(NEW.id,4))    
    when 0 then    
      insert into old_mid0 values (NEW.*);    
    when 1 then    
      insert into old_mid1 values (NEW.*);    
    when 2 then    
      insert into old_mid2 values (NEW.*);    
    when 3 then    
      insert into old_mid3 values (NEW.*);    
    else    
      return NEW;  -- 如果是NULL则写本地父表,主键不会为NULL     
  end case;    
  
  delete from only old where id=NEW.id;  
  return null;    
end;    
$$ language plpgsql strict;    
  
create trigger tg2 before update on old for each row execute procedure upd_tbl();    

6、old table 如下

postgres=# \dt+ old  
                    List of relations  
 Schema | Name | Type  |  Owner   |  Size  | Description   
--------+------+-------+----------+--------+-------------  
 public | old  | table | postgres | 730 MB |   
(1 row)  
  
  
继承关系如下  
  
  
postgres=# \d+ old  
                                               Table "public.old"  
  Column  |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description   
----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------  
 id       | integer                     |           | not null |         | plain    |              |   
 info     | text                        |           |          |         | extended |              |   
 crt_time | timestamp without time zone |           |          |         | plain    |              |   
Indexes:  
    "old_pkey" PRIMARY KEY, btree (id)  
Triggers:  
    tg1 BEFORE INSERT ON old FOR EACH ROW EXECUTE PROCEDURE ins_tbl()  
    tg2 BEFORE UPDATE ON old FOR EACH ROW EXECUTE PROCEDURE upd_tbl()  
Child tables: old_mid0,  
              old_mid1,  
              old_mid2,  
              old_mid3  

7、验证insert, update, delete, select完全符合要求。对业务SQL请求透明。

postgres=# insert into old values (0,'test',now());  
INSERT 0 0  
  
postgres=# select tableoid::regclass,* from old where id=1;  
 tableoid | id |               info               |         crt_time            
----------+----+----------------------------------+---------------------------  
 old      |  1 | 22be06200f2a967104872f6f173fd038 | 31-JAN-19 12:52:25.887242  
(1 row)  
  
postgres=# select tableoid::regclass,* from old where id=0;  
 tableoid | id | info |         crt_time            
----------+----+------+---------------------------  
 old_mid0 |  0 | test | 31-JAN-19 13:02:35.859899  
(1 row)  
postgres=# update old set info='abc' where id in (0,2) returning tableoid::regclass,*;  
 tableoid | id | info |         crt_time            
----------+----+------+---------------------------  
 old_mid0 |  0 | abc  | 31-JAN-19 13:02:35.859899  
(1 row)  
  
UPDATE 1  
  
postgres=# select tableoid::regclass,* from old where id in (0,2);  
 tableoid | id | info |         crt_time            
----------+----+------+---------------------------  
 old_mid0 |  0 | abc  | 31-JAN-19 13:12:03.343559  
 old_mid2 |  2 | abc  | 31-JAN-19 13:11:04.763652  
(2 rows)  
postgres=# delete from old where id=3;  
DELETE 1  
postgres=# select tableoid::regclass,* from old where id=3;  
 tableoid | id | info | crt_time   
----------+----+------+----------  
(0 rows)  

8、开启压测,后台对原表数据进行迁移

create or replace function test_ins(int) returns void as $$  
declare  
begin  
  insert into old values ($1,'test',now());  
  exception when others then  
  return;  
end;  
$$ language plpgsql strict;  
vi test.sql  
  
\set id1 random(10000001,200000000)  
\set id2 random(1,5000000)  
\set id3 random(5000001,10000000)  
delete from old where id=:id2;  
update old set info=md5(random()::text),crt_time=now() where id=:id3;  
select test_ins(:id1);  

开启压测

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 4 -j 4 -T 1200  
  
...  
  
progress: 323.0 s, 12333.1 tps, lat 0.324 ms stddev 0.036  
progress: 324.0 s, 11612.9 tps, lat 0.344 ms stddev 0.203  
progress: 325.0 s, 12546.0 tps, lat 0.319 ms stddev 0.061  
progress: 326.0 s, 12728.7 tps, lat 0.314 ms stddev 0.038  
progress: 327.0 s, 12536.9 tps, lat 0.319 ms stddev 0.040  
progress: 328.0 s, 12534.1 tps, lat 0.319 ms stddev 0.042  
progress: 329.0 s, 12228.1 tps, lat 0.327 ms stddev 0.047  
...  

9、在线迁移数据

批量迁移,每一批迁移N条。调用以下SQL

with a as (  
delete from only old where ctid = any (array (select ctid from only old limit 1000 for update skip locked) ) returning *  
)  
insert into old select * from a;  
  
INSERT 0 0  
  
postgres=# select count(*) from only old;  
  count    
---------  
 9998998  
(1 row)  
  
postgres=# select count(*) from old;  
  count     
----------  
 10000000  
(1 row)  
postgres=# with a as (                     
delete from only old where ctid = any (array (select ctid from only old limit 1000 for update skip locked) ) returning *  
)  
insert into old select * from a;  
INSERT 0 0  
postgres=# select count(*) from old;  
  count     
----------  
 10000000  
(1 row)  
  
postgres=# select count(*) from only old;  
  count    
---------  
 9997998  
(1 row)  
  
postgres=# with a as (                
delete from only old where ctid = any (array (select ctid from only old limit 100000 for update skip locked) ) returning *  
)  
insert into old select * from a;  
INSERT 0 0  
postgres=# select count(*) from only old;  
  count    
---------  
 9897998  
(1 row)  
  
postgres=# select count(*) from old;  
  count     
----------  
 10000000  
(1 row)  

一次迁移1万条,分批操作。

with a as (                
delete from only old where ctid = any (array (select ctid from only old limit 10000 for update skip locked) ) returning *  
)  
insert into old select * from a;  

持续调用以上接口,直到当old表已经没有数据,完全迁移到了分区。

select count(*) from only old;  
  
  
 count   
-------  
     0  
(1 row)  

10、切换到分区表

创建分区表如下,分区方法与继承约束一致。

create table new (id int, info text, crt_time timestamp) partition by list (abs(mod(id,4)));    

切换表名,防止雪崩,使用锁超时,由于只涉及表名变更,所以速度非常快。

begin;  
set lock_timeout ='3s';   
alter table old_mid0 no inherit old;   
alter table old_mid1 no inherit old;   
alter table old_mid2 no inherit old;   
alter table old_mid3 no inherit old;   
alter table old rename to old_tmp;  
alter table new rename to old;  
alter table old ATTACH PARTITION old_mid0 for values in (0);    
alter table old ATTACH PARTITION old_mid1 for values in (1);    
alter table old ATTACH PARTITION old_mid2 for values in (2);    
alter table old ATTACH PARTITION old_mid3 for values in (3);    
end;  

切换后的原生分区表如下

postgres=# \d+ old  
                                               Table "public.old"  
  Column  |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description   
----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------  
 id       | integer                     |           |          |         | plain    |              |   
 info     | text                        |           |          |         | extended |              |   
 crt_time | timestamp without time zone |           |          |         | plain    |              |   
Partition key: LIST (abs(mod(id, 4)))  
Partitions: old_mid0 FOR VALUES IN (0),  
            old_mid1 FOR VALUES IN (1),  
            old_mid2 FOR VALUES IN (2),  
            old_mid3 FOR VALUES IN (3)  

查询测试

postgres=# explain select * from old where id=1;  
                                     QUERY PLAN                                        
-------------------------------------------------------------------------------------  
 Append  (cost=0.29..10.04 rows=4 width=44)  
   ->  Index Scan using old_mid0_pkey on old_mid0  (cost=0.29..2.51 rows=1 width=44)  
         Index Cond: (id = 1)  
   ->  Index Scan using old_mid1_pkey on old_mid1  (cost=0.29..2.51 rows=1 width=45)  
         Index Cond: (id = 1)  
   ->  Index Scan using old_mid2_pkey on old_mid2  (cost=0.29..2.51 rows=1 width=44)  
         Index Cond: (id = 1)  
   ->  Index Scan using old_mid3_pkey on old_mid3  (cost=0.29..2.51 rows=1 width=45)  
         Index Cond: (id = 1)  
(9 rows)  
  
  
  
postgres=# explain select * from old where id=? and abs(mod(id, 4)) = abs(mod(?, 4));  
                                     QUERY PLAN                                        
-------------------------------------------------------------------------------------  
 Append  (cost=0.29..2.52 rows=1 width=45)  
   ->  Index Scan using old_mid1_pkey on old_mid1  (cost=0.29..2.51 rows=1 width=45)  
         Index Cond: (id = 1)  
         Filter: (mod(id, 4) = 1)  
(4 rows)  

数据

postgres=# select count(*) from old;  
  count     
----------  
 10455894  
(1 row)  

方法3,logical replication

使用逻辑复制的方法,同步到分区表。

简单步骤如下:

snapshot 快照(lsn位点)  
  
全量  
  
增量(逻辑复制,从LSN位置开始解析WAL LOG)  
  
切换表名

其他

hash函数

postgres=# \df *.*hash*  
                                            List of functions  
   Schema   |           Name           | Result data type |          Argument data types          | Type   
------------+--------------------------+------------------+---------------------------------------+------  
 pg_catalog | hash_aclitem             | integer          | aclitem                               | func  
 pg_catalog | hash_aclitem_extended    | bigint           | aclitem, bigint                       | func  
 pg_catalog | hash_array               | integer          | anyarray                              | func  
 pg_catalog | hash_array_extended      | bigint           | anyarray, bigint                      | func  
 pg_catalog | hash_numeric             | integer          | numeric                               | func  
 pg_catalog | hash_numeric_extended    | bigint           | numeric, bigint                       | func  
 pg_catalog | hash_range               | integer          | anyrange                              | func  
 pg_catalog | hash_range_extended      | bigint           | anyrange, bigint                      | func  
 pg_catalog | hashbpchar               | integer          | character                             | func  
 pg_catalog | hashbpcharextended       | bigint           | character, bigint                     | func  
 pg_catalog | hashchar                 | integer          | "char"                                | func  
 pg_catalog | hashcharextended         | bigint           | "char", bigint                        | func  
 pg_catalog | hashenum                 | integer          | anyenum                               | func  
 pg_catalog | hashenumextended         | bigint           | anyenum, bigint                       | func  
 pg_catalog | hashfloat4               | integer          | real                                  | func  
 pg_catalog | hashfloat4extended       | bigint           | real, bigint                          | func  
 pg_catalog | hashfloat8               | integer          | double precision                      | func  
 pg_catalog | hashfloat8extended       | bigint           | double precision, bigint              | func  
 pg_catalog | hashhandler              | index_am_handler | internal                              | func  
 pg_catalog | hashinet                 | integer          | inet                                  | func  
 pg_catalog | hashinetextended         | bigint           | inet, bigint                          | func  
 pg_catalog | hashint2                 | integer          | smallint                              | func  
 pg_catalog | hashint2extended         | bigint           | smallint, bigint                      | func  
 pg_catalog | hashint4                 | integer          | integer                               | func  
 pg_catalog | hashint4extended         | bigint           | integer, bigint                       | func  
 pg_catalog | hashint8                 | integer          | bigint                                | func  
 pg_catalog | hashint8extended         | bigint           | bigint, bigint                        | func  
 pg_catalog | hashmacaddr              | integer          | macaddr                               | func  
 pg_catalog | hashmacaddr8             | integer          | macaddr8                              | func  
 pg_catalog | hashmacaddr8extended     | bigint           | macaddr8, bigint                      | func  
 pg_catalog | hashmacaddrextended      | bigint           | macaddr, bigint                       | func  
 pg_catalog | hashname                 | integer          | name                                  | func  
 pg_catalog | hashnameextended         | bigint           | name, bigint                          | func  
 pg_catalog | hashoid                  | integer          | oid                                   | func  
 pg_catalog | hashoidextended          | bigint           | oid, bigint                           | func  
 pg_catalog | hashoidvector            | integer          | oidvector                             | func  
 pg_catalog | hashoidvectorextended    | bigint           | oidvector, bigint                     | func  
 pg_catalog | hashtext                 | integer          | text                                  | func  
 pg_catalog | hashtextextended         | bigint           | text, bigint                          | func  
 pg_catalog | hashvarlena              | integer          | internal                              | func  
 pg_catalog | hashvarlenaextended      | bigint           | internal, bigint                      | func  
 pg_catalog | interval_hash            | integer          | interval                              | func  
 pg_catalog | interval_hash_extended   | bigint           | interval, bigint                      | func  
 pg_catalog | jsonb_hash               | integer          | jsonb                                 | func  
 pg_catalog | jsonb_hash_extended      | bigint           | jsonb, bigint                         | func  
 pg_catalog | pg_lsn_hash              | integer          | pg_lsn                                | func  
 pg_catalog | pg_lsn_hash_extended     | bigint           | pg_lsn, bigint                        | func  
 pg_catalog | satisfies_hash_partition | boolean          | oid, integer, integer, VARIADIC "any" | func  
 pg_catalog | time_hash                | integer          | time without time zone                | func  
 pg_catalog | time_hash_extended       | bigint           | time without time zone, bigint        | func  
 pg_catalog | timestamp_hash           | integer          | timestamp without time zone           | func  
 pg_catalog | timestamp_hash_extended  | bigint           | timestamp without time zone, bigint   | func  
 pg_catalog | timetz_hash              | integer          | time with time zone                   | func  
 pg_catalog | timetz_hash_extended     | bigint           | time with time zone, bigint           | func  
 pg_catalog | uuid_hash                | integer          | uuid                                  | func  
 pg_catalog | uuid_hash_extended       | bigint           | uuid, bigint                          | func  
(56 rows)  

小结

在线将表转换为分区表,可以使用的方法:

1、转换为pg_pathman分区,直接调用pg_pathman的UDF即可。

2、转换为原生分区,使用继承,异步迁移的方法。割接是短暂锁表。

不支持 insert ino on conflict 语法。

insert into old values (1,'test',now()) on conflict(id) do update set info=excluded.info, crt_time=excluded.crt_time;  

3、逻辑复制的方法,将数据增量迁移到分区表(目标可以是原生分区方法或者是pg_pathman分区方法的新表)。

参考

《PostgreSQL 9.x, 10, 11 hash分区表 用法举例》

《PostgreSQL 触发器 用法详解 1》

《PostgreSQL 触发器 用法详解 2》

《PostgreSQL 9.5+ 高效分区表实现 - pg_pathman》

PostgreSQL 许愿链接

您的愿望将传达给PG kernel hacker、数据库厂商等, 帮助提高数据库产品质量和功能, 说不定下一个PG版本就有您提出的功能点. 针对非常好的提议,奖励限量版PG文化衫、纪念品、贴纸、PG热门书籍等,奖品丰富,快来许愿。开不开森.

9.9元购买3个月阿里云RDS PostgreSQL实例

PostgreSQL 解决方案集合

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
4月前
|
SQL 关系型数据库 数据库
PostgreSQL数据库报错 ERROR: multiple default values specified for column "" of table "" 如何解决?
PostgreSQL数据库报错 ERROR: multiple default values specified for column "" of table "" 如何解决?
425 59
|
7月前
|
分布式计算 DataWorks 关系型数据库
实时数仓 Hologres产品使用合集之如何将MySQL数据初始化到分区表中
实时数仓Hologres的基本概念和特点:1.一站式实时数仓引擎:Hologres集成了数据仓库、在线分析处理(OLAP)和在线服务(Serving)能力于一体,适合实时数据分析和决策支持场景。2.兼容PostgreSQL协议:Hologres支持标准SQL(兼容PostgreSQL协议和语法),使得迁移和集成变得简单。3.海量数据处理能力:能够处理PB级数据的多维分析和即席查询,支持高并发低延迟查询。4.实时性:支持数据的实时写入、实时更新和实时分析,满足对数据新鲜度要求高的业务场景。5.与大数据生态集成:与MaxCompute、Flink、DataWorks等阿里云产品深度融合,提供离在线
|
7月前
|
存储 关系型数据库 MySQL
MySQL 分区表
MySQL 分区表
80 4
|
6月前
|
存储 关系型数据库 数据库
MySQL设计规约问题之是否可以使用分区表
MySQL设计规约问题之是否可以使用分区表
|
7月前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之当需要将数据从ODPS同步到RDS,且ODPS表是二级分区表时,如何同步所有二级分区的数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
85 7
|
7月前
|
存储 监控 关系型数据库
MySQL普通表转换为分区表实战指南
MySQL普通表转换为分区表实战指南
|
7月前
|
存储 关系型数据库 MySQL
MySQL分区表:万字详解与实践指南
MySQL分区表:万字详解与实践指南
|
SQL 监控 关系型数据库
PostgreSQL普通表转换成分区表
如何使用pg_rewrite扩展将普遍表转换成分区表
|
7月前
|
存储 关系型数据库 MySQL
【MySQL技术内幕】4.8-分区表
【MySQL技术内幕】4.8-分区表
154 0
|
8月前
|
关系型数据库 数据库 PostgreSQL

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版