PostgreSQL 重复 数据清洗 优化教程

本文涉及的产品
云原生多模数据库 Lindorm,多引擎 多规格 0-4节点
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云数据库 MongoDB,通用型 2核4GB
简介:

标签

PostgreSQL , 重复数据清洗 , with recursive , 递归 , 流式计算 , pipelinedb , 窗口查询 , file_fdw , insert on conflict , LLVM , 并行创建索引


背景

重复数据清洗是一个比较常见的业务需求,比如有些数据库不支持唯一约束,或者程序设计之初可能没有考虑到需要在某些列上面加唯一约束,导致应用在上线一段时间后,产生了一些重复的数据。

那么重复数据的清洗需求就来了。

有哪些清洗手段,如何做到高效的清洗呢?

一个小小的应用场景,带出了10项数据库技术点,听我道来。

重复数据清洗手段

比如一个表,有几个字段本来应该是唯一的,产生了重复值,现在给你一个规则,保留重复值中的一条,其他删掉。

例子

postgres=# create table tbl_dup(   
  id serial8,   
  sid int,   
  crt_time timestamp,   
  mdf_time timestamp,   
  c1 text default md5(random()::text),   
  c2 text default md5(random()::text),   
  c3 text default md5(random()::text),   
  c4 text default md5(random()::text),   
  c5 text default md5(random()::text),   
  c6 text default md5(random()::text),   
  c7 text default md5(random()::text),   
  c8 text default md5(random()::text)   
);   

删除重复的 (sid + crt_time) 组合,并保留重复值中,mdf_time最大的一条。

生成测试数据100万条,1/10 的重复概率,同时为了避免重复数据在一个数据块中,每跳跃500条生成一条重复值。

就生成测试数据 ,是不是觉得已经很炫酷了呢?一条SQL就造了一批这样的数据。

insert into tbl_dup (sid, crt_time, mdf_time)   
select   
  case when mod(id,11)=0 then id+500 else id end,   
  case when mod(id,11)=0 then now()+(''||id+500||' s')::interval else now()+(''||id||' s')::interval end,   
  clock_timestamp()   
from generate_series(1,1000000) t(id);  

验证, 重复记录的ctid不在同一个数据块中。

验证方法是不是很酷呢?用了窗口查询。

postgres=# select * from (select ctid,sid,crt_time,mdf_time, count(*) over(partition by sid,crt_time) as cnt from tbl_dup) t where t.cnt>=2;  
    ctid    |  sid   |          crt_time          |          mdf_time          | cnt   
------------+--------+----------------------------+----------------------------+-----  
 (0,11)     |    511 | 2016-12-29 17:42:13.935348 | 2016-12-29 17:33:43.092625 |   2  
 (20,11)    |    511 | 2016-12-29 17:42:13.935348 | 2016-12-29 17:33:43.102726 |   2  
 (20,22)    |    522 | 2016-12-29 17:42:24.935348 | 2016-12-29 17:33:43.102927 |   2  
 (0,22)     |    522 | 2016-12-29 17:42:24.935348 | 2016-12-29 17:33:43.09283  |   2  
 (21,8)     |    533 | 2016-12-29 17:42:35.935348 | 2016-12-29 17:33:43.103155 |   2  
 (1,8)      |    533 | 2016-12-29 17:42:35.935348 | 2016-12-29 17:33:43.093191 |   2  
 (21,19)    |    544 | 2016-12-29 17:42:46.935348 | 2016-12-29 17:33:43.103375 |   2  
 (1,19)     |    544 | 2016-12-29 17:42:46.935348 | 2016-12-29 17:33:43.093413 |   2  
....  

包含重复的值大概这么多

postgres=# select count(*) from (select * from (select ctid,sid,crt_time,mdf_time, count(*) over(partition by sid,crt_time) as cnt from tbl_dup) t where t.cnt=2) t;  
 count    
--------  
 181726  
(1 row)  
Time: 1690.709 ms  

你如果觉得这个还挺快的,偷偷告诉你测试环境CPU型号。

Intel(R) Xeon(R) CPU E5-2630 0 @ 2.30GHz

接下来开始去重了

方法1, 插入法

将去重后的结果插入一张新的表中,耗时5.8秒

create table tbl_uniq(like tbl_dup including all);  

insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)  
select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from   
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t  
where t.rn=1;  

INSERT 0 909137  
Time: 5854.349 ms  

分析优化空间,显示排序可以优化

postgres=# explain (analyze,verbose,timing,costs,buffers)  insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)  
select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from   
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t  
where t.rn=1;  
                                                                                                QUERY PLAN                                                                                                  
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 Insert on public.tbl_uniq  (cost=423098.84..458098.84 rows=5000 width=292) (actual time=5994.723..5994.723 rows=0 loops=1)  
   Buffers: shared hit=1021856 read=36376 dirtied=36375, temp read=37391 written=37391  
   ->  Subquery Scan on t  (cost=423098.84..458098.84 rows=5000 width=292) (actual time=1715.278..3620.269 rows=909137 loops=1)  
         Output: t.id, t.sid, t.crt_time, t.mdf_time, t.c1, t.c2, t.c3, t.c4, t.c5, t.c6, t.c7, t.c8  
         Filter: (t.rn = 1)  
         Rows Removed by Filter: 90863  
         Buffers: shared hit=40000, temp read=37391 written=37391  
         ->  WindowAgg  (cost=423098.84..445598.84 rows=1000000 width=300) (actual time=1715.276..3345.392 rows=1000000 loops=1)  
               Output: row_number() OVER (?), tbl_dup.id, tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8  
               Buffers: shared hit=40000, temp read=37391 written=37391  
               ->  Sort  (cost=423098.84..425598.84 rows=1000000 width=292) (actual time=1715.263..2174.426 rows=1000000 loops=1)  
                     Output: tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.id, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8  
                     Sort Key: tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time DESC  
                     Sort Method: external sort  Disk: 299128kB  
                     Buffers: shared hit=40000, temp read=37391 written=37391  
                     ->  Seq Scan on public.tbl_dup  (cost=0.00..50000.00 rows=1000000 width=292) (actual time=0.012..398.007 rows=1000000 loops=1)  
                           Output: tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.id, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8  
                           Buffers: shared hit=40000  
 Planning time: 0.174 ms  
 Execution time: 6120.921 ms  
(20 rows)  

优化1

索引,消除排序,优化后只需要3.9秒

对于在线业务,PostgreSQL可以使用并行CONCURRENTLY创建索引,不会堵塞DML。

postgres=# create index CONCURRENTLY idx_tbl_dup on tbl_dup(sid,crt_time,mdf_time desc);  
CREATE INDEX  
Time: 765.426 ms  

postgres=# truncate tbl_uniq;  
TRUNCATE TABLE  
Time: 208.808 ms  
postgres=# insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)                                                  
select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from   
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t  
where t.rn=1;  
INSERT 0 909137  
Time: 3978.425 ms  

postgres=# explain (analyze,verbose,timing,costs,buffers)  insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)  
select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from   
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t  
where t.rn=1;  
                                                                                                QUERY PLAN                                                                                                  
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 Insert on public.tbl_uniq  (cost=0.42..159846.13 rows=5000 width=292) (actual time=4791.360..4791.360 rows=0 loops=1)  
   Buffers: shared hit=1199971 read=41303 dirtied=36374  
   ->  Subquery Scan on t  (cost=0.42..159846.13 rows=5000 width=292) (actual time=0.061..2177.768 rows=909137 loops=1)  
         Output: t.id, t.sid, t.crt_time, t.mdf_time, t.c1, t.c2, t.c3, t.c4, t.c5, t.c6, t.c7, t.c8  
         Filter: (t.rn = 1)  
         Rows Removed by Filter: 90863  
         Buffers: shared hit=218112 read=4929  
         ->  WindowAgg  (cost=0.42..147346.13 rows=1000000 width=300) (actual time=0.060..1901.174 rows=1000000 loops=1)  
               Output: row_number() OVER (?), tbl_dup.id, tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8  
               Buffers: shared hit=218112 read=4929  
               ->  Index Scan using idx_tbl_dup on public.tbl_dup  (cost=0.42..127346.13 rows=1000000 width=292) (actual time=0.051..601.249 rows=1000000 loops=1)  
                     Output: tbl_dup.id, tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8  
                     Buffers: shared hit=218112 read=4929  
 Planning time: 0.304 ms  
 Execution time: 4834.392 ms  
(15 rows)  
Time: 4835.484 ms  

优化2

递归查询、递归收敛

有几个CASE用这种方法提升了几百倍性能

《时序数据合并场景加速分析和实现 - 复合索引,窗口分组查询加速,变态递归加速》

《distinct xx和count(distinct xx)的变态递归优化方法 - 索引收敛(skip scan)扫描》

《用PostgreSQL找回618秒逝去的青春 - 递归收敛优化》

当重复值很多时,可以使用此法,效果非常好

with recursive skip as (    
  (    
    select tbl_dup as tbl_dup from tbl_dup where (sid,crt_time,mdf_time) in (select sid,crt_time,mdf_time from tbl_dup order by sid,crt_time,mdf_time desc limit 1)   
  )    
  union all    
  (    
    select (   
      select tbl_dup from tbl_dup where (sid,crt_time,mdf_time) in (select sid,crt_time,mdf_time from tbl_dup t where t.sid>(s.tbl_dup).sid or (t.sid=(s.tbl_dup).sid and t.crt_time>(s.tbl_dup).crt_time) and t.sid is not null order by t.sid,t.crt_time,t.mdf_time desc limit 1)   
    ) from skip s where (s.tbl_dup).sid is not null   
  )    -- 这里的where (s.tbl_dup).sid is not null 一定要加, 否则就死循环了.   
)     
select (t.tbl_dup).sid, (t.tbl_dup).crt_time from skip t where t.* is not null;   

有UK时这样用

with recursive skip as (    
  (    
    select tbl_dup as tbl_dup from tbl_dup where (id) in (select id from tbl_dup order by sid,crt_time,mdf_time desc limit 1)   
  )    
  union all    
  (    
    select (   
      select tbl_dup from tbl_dup where id in (select id from tbl_dup t where t.sid>(s.tbl_dup).sid or (t.sid=(s.tbl_dup).sid and t.crt_time>(s.tbl_dup).crt_time) and t.id is not null order by t.sid,t.crt_time,t.mdf_time desc limit 1)   
    ) from skip s where (s.tbl_dup).id is not null   
  )    -- 这里的where (s.tbl_dup).id is not null 一定要加, 否则就死循环了.   
)     
select (t.tbl_dup).sid, (t.tbl_dup).crt_time from skip t where t.* is not null;   

方法3, 删除法

导入需要处理的时,新增一个row_number字段,并建立where row_number<>1的partial index.

删除时删除此部分记录即可,2秒搞定需求。

postgres=# delete from tbl_dup where (sid,crt_time,mdf_time) in (select sid,crt_time,mdf_time from (select sid,crt_time,mdf_time,row_number() over(partition by sid,crt_time order by mdf_time desc) as rn from tbl_dup) t where t.rn<>1);  

DELETE 90863  
Time: 2079.588 ms  


postgres=# explain delete from tbl_dup where (sid,crt_time,mdf_time) in (select sid,crt_time,mdf_time from (select sid,crt_time,mdf_time,row_number() over(partition by sid,crt_time order by mdf_time desc) as rn from tbl_dup) t where t.rn<>1);  
                                                             QUERY PLAN                                                               
------------------------------------------------------------------------------------------------------------------------------------  
 Delete on tbl_dup  (cost=187947.63..283491.75 rows=995000 width=50)  
   ->  Hash Semi Join  (cost=187947.63..283491.75 rows=995000 width=50)  
         Hash Cond: ((tbl_dup.sid = t.sid) AND (tbl_dup.crt_time = t.crt_time) AND (tbl_dup.mdf_time = t.mdf_time))  
         ->  Seq Scan on tbl_dup  (cost=0.00..50000.00 rows=1000000 width=26)  
         ->  Hash  (cost=159846.13..159846.13 rows=995000 width=64)  
               ->  Subquery Scan on t  (cost=0.42..159846.13 rows=995000 width=64)  
                     Filter: (t.rn <> 1)  
                     ->  WindowAgg  (cost=0.42..147346.13 rows=1000000 width=28)  
                           ->  Index Only Scan using idx_tbl_dup on tbl_dup tbl_dup_1  (cost=0.42..127346.13 rows=1000000 width=20)  
(9 rows)  

验证

postgres=# select count(*) , count(distinct (sid,crt_time)) from tbl_dup;  
 count  | count    
--------+--------  
 909137 | 909137  
(1 row)  

一气呵成的方法

假如重复数据来自文本,从文本去重后,导入数据库,再导出文本。

怎么听起来像把数据库当成了文本处理工具在用呢?

没关系,反正目的就是要快速。

怎么一气呵成呢?

首先是文件外部表,其次是COPY管道,一气呵成。

https://www.postgresql.org/docs/9.6/static/file-fdw.html

postgres=# create extension file_fdw;  
CREATE EXTENSION  


postgres=# copy tbl_dup to '/home/digoal/tbl_dup.csv' ;  
COPY 1000000  

postgres=# create server file foreign data wrapper file_fdw;  
CREATE SERVER  

CREATE FOREIGN TABLE ft_tbl_dup (   
  id serial8,   
  sid int,   
  crt_time timestamp,   
  mdf_time timestamp,   
  c1 text default md5(random()::text),   
  c2 text default md5(random()::text),   
  c3 text default md5(random()::text),   
  c4 text default md5(random()::text),   
  c5 text default md5(random()::text),   
  c6 text default md5(random()::text),   
  c7 text default md5(random()::text),   
  c8 text default md5(random()::text)   
) server file options (filename '/home/digoal/tbl_dup.csv' );  

postgres=# copy (select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from   
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from ft_tbl_dup) t  
where t.rn=1) to '/home/digoal/tbl_uniq.csv';  

COPY 909137  
Time: 10973.289 ms  

并行处理优化手段

拆分成多个文件,并行处理,耗时降低到800毫秒左右。注意这没有结束,最后还需要merge sort对全局去重。

split -l 50000 tbl_dup.csv load_test_  

for i in `ls load_test_??`   
do  
psql <<EOF &  
drop foreign table "ft_$i";  
CREATE FOREIGN TABLE "ft_$i" (   
  id serial8,   
  sid int,   
  crt_time timestamp,   
  mdf_time timestamp,   
  c1 text default md5(random()::text),   
  c2 text default md5(random()::text),   
  c3 text default md5(random()::text),   
  c4 text default md5(random()::text),   
  c5 text default md5(random()::text),   
  c6 text default md5(random()::text),   
  c7 text default md5(random()::text),   
  c8 text default md5(random()::text)   
) server file options (filename '/home/digoal/$i' );  

\timing  

copy (select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from   
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from "ft_$i") t  
where t.rn=1) to '/home/digoal/uniq_csv.$i';  

EOF  
done  

速度提升到了1秒以内完成,还可以继续提高并行度,总耗时降低到200毫秒左右。

COPY 45500  
Time: 764.978 ms  
COPY 45500  
Time: 683.255 ms  
COPY 45500  
Time: 775.625 ms  
COPY 45500  
Time: 733.227 ms  
COPY 45500  
Time: 750.978 ms  
COPY 45500  
Time: 766.984 ms  
COPY 45500  
Time: 796.796 ms  
COPY 45500  
Time: 797.016 ms  
COPY 45500  
Time: 881.682 ms  
COPY 45500  
Time: 794.691 ms  
COPY 45500  
Time: 812.932 ms  
COPY 45500  
Time: 921.792 ms  
COPY 45500  
Time: 890.095 ms  
COPY 45500  
Time: 845.815 ms  
COPY 45500  
Time: 867.456 ms  
COPY 45500  
Time: 874.979 ms  
COPY 45500  
Time: 882.578 ms  
COPY 45500  
Time: 880.131 ms  
COPY 45500  
Time: 901.515 ms  
COPY 45500  
Time: 904.857 ms  

注意这没有结束,最后还需要merge sort对全局去重。 略

一气呵成方法2

并行导入单表处理后倒出,中间结果不需要保存,所以使用UNLOGGED TABLE

CREATE unlogged TABLE tmp (   
  id serial8,   
  sid int,   
  crt_time timestamp,   
  mdf_time timestamp,   
  c1 text default md5(random()::text),   
  c2 text default md5(random()::text),   
  c3 text default md5(random()::text),   
  c4 text default md5(random()::text),   
  c5 text default md5(random()::text),   
  c6 text default md5(random()::text),   
  c7 text default md5(random()::text),   
  c8 text default md5(random()::text)   
) with (autovacuum_enabled=off, toast.autovacuum_enabled=off);  

create index idx_tmp_1 on tmp (sid,crt_time,mdf_time desc);  
split -l 20000 tbl_dup.csv load_test_  
date +%F%T.%N  

for i in `ls load_test_??`   
do  
psql <<EOF &  
truncate tmp;  
copy tmp from '/home/digoal/$i';  

EOF  
done  

for ((i=1;i>0;i=1))  
do  
sleep 0.0001  
cnt=`ps -ewf|grep -v grep|grep -c psql`  
if [ $cnt -eq 0 ]; then  
break  
fi  
done  

psql <<EOF  
copy (select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from   
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tmp) t  
where t.rn=1) to '/dev/shm/tbl_uniq.csv';  
EOF  

date +%F%T.%N  
2016-12-3000:59:42.309126109  
2016-12-3000:59:47.589134168  

5.28秒。

重复数据清洗优化手段 - 技术点分享

前面用到了很多种方法来进行优化,下面总结一下

1. 窗口查询

主要用于筛选出重复值,并加上标记。

需要去重的字段作为窗口,规则字段作为排序字段,建立好复合索引,即可开始了。

2. 外部表

如果你的数据来自文本,那么可以采用一气呵成的方法来完成去重,即把数据库当成文本处理平台,通过PostgreSQL的file_fdw外部表直接访问文件,在SQL中进行去重。

3. 并行计算

如果你的数据来自文本,可以将文本切割成多个小文件,使用外部表,并行的去重,但是注意,去完重后,需要用merge sort再次去重。

另一方面,PostgreSQL 9.6已经支持单个QUERY使用多个CPU核来处理,可以线性的提升性能。(去重需要考虑合并的问题)。

4. 递归查询、递归收敛

使用递归查询,可以对重复度很高的场景进行优化,曾经在几个CASE中使用,优化效果非常明显,从几十倍到几百倍不等。

《时序数据合并场景加速分析和实现 - 复合索引,窗口分组查询加速,变态递归加速》

《distinct xx和count(distinct xx)的变态递归优化方法 - 索引收敛(skip scan)扫描》

《用PostgreSQL找回618秒逝去的青春 - 递归收敛优化》

5. insert on conflict

PostgreSQL 9.5新增的特性,可以在数据导入时完成去重的操作。 直接导出结果。

CREATE unlogged TABLE tmp_uniq (   
  id serial8,   
  sid int,   
  crt_time timestamp,   
  mdf_time timestamp,   
  c1 text default md5(random()::text),   
  c2 text default md5(random()::text),   
  c3 text default md5(random()::text),   
  c4 text default md5(random()::text),   
  c5 text default md5(random()::text),   
  c6 text default md5(random()::text),   
  c7 text default md5(random()::text),   
  c8 text default md5(random()::text),  
  unique (sid,crt_time)  
) with (autovacuum_enabled=off, toast.autovacuum_enabled=off);  

并行装载(目前不能在同一条QUERY中多次UPDATE一条记录)

ERROR:  21000: ON CONFLICT DO UPDATE command cannot affect row a second time  
HINT:  Ensure that no rows proposed for insertion within the same command have duplicate constrained values.  
LOCATION:  ExecOnConflictUpdate, nodeModifyTable.c:1133  
split -l 20000 tbl_dup.csv load_test_  

for i in `ls load_test_??`   
do  
psql <<EOF &  
drop foreign table "ft_$i";  

CREATE FOREIGN TABLE "ft_$i" (   
  id serial8,   
  sid int,   
  crt_time timestamp,   
  mdf_time timestamp,   
  c1 text default md5(random()::text),   
  c2 text default md5(random()::text),   
  c3 text default md5(random()::text),   
  c4 text default md5(random()::text),   
  c5 text default md5(random()::text),   
  c6 text default md5(random()::text),   
  c7 text default md5(random()::text),   
  c8 text default md5(random()::text)   
) server file options (filename '/home/digoal/$i' );  

\timing  

insert into tmp_uniq select * from "ft_$i" on conflict do update set   
id=excluded.id, sid=excluded.sid, crt_time=excluded.crt_time, mdf_time=excluded.mdf_time,  
c1=excluded.c1,c2=excluded.c2,c3=excluded.c3,c4=excluded.c4,c5=excluded.c5,c6=excluded.c6,c7=excluded.c7,c8=excluded.c8  
where mdf_time<excluded.mdf_time  
;  

EOF  
done  

6. LLVM

处理多行时,减少上下文切换。

性能可以提升一倍左右。

《分析加速引擎黑科技 - LLVM、列存、多核并行、算子复用 大联姻 - 一起来开启PostgreSQL的百宝箱》

7. 流式计算

在数据导入过程中,流式去重,是不是很炫酷呢。

create stream ss_uniq (  
  id int8,   
  sid int,   
  crt_time timestamp,   
  mdf_time timestamp,   
  c1 text default md5(random()::text),   
  c2 text default md5(random()::text),   
  c3 text default md5(random()::text),   
  c4 text default md5(random()::text),   
  c5 text default md5(random()::text),   
  c6 text default md5(random()::text),   
  c7 text default md5(random()::text),   
  c8 text default md5(random()::text)  
);  
CREATE CONTINUOUS VIEW cv_uniq as  
select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from ss_uniq;  

《流计算风云再起 - PostgreSQL携PipelineDB力挺IoT》

8. 并行创建索引

在创建索引时,为了防止堵塞DML操作,可以使用concurrently的方式创建,不会影响DML操作。

建立索引时,加大maintenance_work_mem可以提高创建索引的速度。

9. 并行读取文件片段导入

为了加快导入速度,可以切片,并行导入。

将来可以在file_fdw这种外部访问接口中做到分片并行导入。

10. bulk load, nologgin

如果数据库只做计算,也就是说在数据库中处理的中间结果无需保留时,可以适应bulk的方式导入,或者使用unlogged table。

可以提高导入的速度,同时导入时也可以关闭autovacuum.

小结

1. 如果数据已经在数据库中,在原表基础上,删除重复数据,耗时约2秒。

2. 如果数据要从文本导入,并将去重后的数据导出,整个流程约耗时5.28秒。

参考

《分析加速引擎黑科技 - LLVM、列存、多核并行、算子复用 大联姻 - 一起来开启PostgreSQL的百宝箱》

《流计算风云再起 - PostgreSQL携PipelineDB力挺IoT》

《时序数据合并场景加速分析和实现 - 复合索引,窗口分组查询加速,变态递归加速》

《distinct xx和count(distinct xx)的变态递归优化方法 - 索引收敛(skip scan)扫描》

《用PostgreSQL找回618秒逝去的青春 - 递归收敛优化》

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
1月前
|
关系型数据库 Java 数据库连接
PostgreSQL从小白到高手教程 - 第47讲:JMETER工具使用
PostgreSQL从小白到高手教程 - 第47讲:JMETER工具使用
142 3
|
26天前
|
关系型数据库 分布式数据库 数据库
【PolarDB开源】PolarDB资源隔离技术:在多租户环境中的应用与优化
【5月更文挑战第29天】PolarDB,阿里云的云原生数据库,在多租户环境中通过逻辑(Schema/Partition隔离)和物理(分布式存储计算节点)隔离保障数据安全和资源独占。它支持动态资源分配,适应不同租户需求,处理大规模并发,提供租户管理及数据访问控制功能。通过优化资源分配算法、提升事务处理能力和强化监控告警,PolarDB确保性能和稳定性,满足多租户的高效数据库服务需求。
68 1
|
1月前
|
存储 关系型数据库 分布式数据库
数据库索引回表困难?揭秘PolarDB存储引擎优化技术
PolarDB分布式版存储引擎采用CSM方案均衡资源开销与可用性。
数据库索引回表困难?揭秘PolarDB存储引擎优化技术
|
27天前
|
SQL 缓存 监控
关系型数据库优化查询语句
【5月更文挑战第18天】
30 2
|
27天前
|
存储 监控 关系型数据库
关系型数据库数据库设计优化
【5月更文挑战第18天】关系型数据库数据库设计优化
34 1
|
27天前
|
关系型数据库 MySQL 数据库
关系型数据库索引设计优化
【5月更文挑战第18天】
29 1
|
28天前
|
SQL 关系型数据库 分布式数据库
【PolarDB开源】PolarDB Proxy配置与优化:提升数据库访问效率
【5月更文挑战第27天】PolarDB Proxy是阿里云PolarDB的高性能数据库代理,负责SQL请求转发和负载均衡。其关键配置包括:连接池管理(如最大连接数、空闲超时时间),负载均衡策略(轮询、权重轮询、一致性哈希),以及SQL过滤规则。优化方面,关注监控与调优、缓存策略、网络优化。通过这些措施,可提升数据库访问效率和系统稳定性。
130 1
|
1月前
|
SQL 监控 关系型数据库
【PolarDB开源】PolarDB SQL优化实践:提升查询效率与资源利用
【5月更文挑战第24天】PolarDB是高性能的云原生数据库,强调SQL查询优化以提升性能。本文分享了其SQL优化策略,包括查询分析、索引优化、查询重写、批量操作和并行查询,以及性能监控与调优方法。通过这些措施,可以减少响应时间、提高并发处理能力和降低成本。文中还提供了相关示例代码,展示如何分析查询和创建索引,帮助用户实现更高效的数据库管理。
202 1
|
18天前
|
SQL 存储 关系型数据库
PolarDB产品使用合集之有的sql里面有自定义存储函数 如果想走列存有什么优化建议吗
PolarDB是阿里云推出的一种云原生数据库服务,专为云设计,提供兼容MySQL、PostgreSQL的高性能、低成本、弹性可扩展的数据库解决方案,可以有效地管理和优化PolarDB实例,确保数据库服务的稳定、高效运行。以下是使用PolarDB产品的一些建议和最佳实践合集。
289 0
|
29天前
|
负载均衡 关系型数据库 分布式数据库
【PolarDB开源】PolarDB读写分离实践:优化读取性能与负载均衡策略
【5月更文挑战第26天】PolarDB是云原生关系型数据库,通过读写分离优化性能和扩展性。它设置主节点处理写操作,从节点处理读操作,异步复制保证数据一致性。优化读取性能的策略包括增加从节点数量、使用只读实例和智能分配读请求。负载均衡策略涉及基于权重、连接数和地理位置的分配。实践示例中,电商网站通过主从架构、只读实例和负载均衡策略提升商品查询效率。PolarDB的读写分离与负载均衡为企业应对大数据和高并发提供了有效解决方案。
137 0