PostgreSQL in Practice - Real-Time Ad Slot Recommendation, Part 1 (arbitrary field and dimension combination search, TOP-K output)


Tags

PostgreSQL , gin , inverted index , rum , gin_fuzzy_search_limit , random sampling , partitioned index , segmented index , score segmentation


Background

Store ad recommendation slots: automatically computed, efficiently searched, efficiently updated.

Each item's score is computed from in-store and site-wide user behavior, inventory, and other signals; the top-ranked items are recommended.

There may be many dimensions. For example, "the probability that a male user in Beijing buys socks in autumn is 0.431" already involves 4 dimensions (city, gender, season, category). Real scenarios may involve dozens, hundreds, or even thousands of dimensions.

The system must support arbitrary dimensions and sorting, return the TOP 100, and do so with millisecond latency at 1,000,000 QPS.

Design 1

1. Define the dimensions

create table tbl_weidu (  
  wid int primary key,  
  info json  
);   
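
For instance, the 4-dimension combination from the background could be registered like this (the JSON keys and values are illustrative, not part of the original design):

insert into tbl_weidu values
  (1, '{"city": "Beijing", "gender": "male", "season": "autumn", "category": "socks"}');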

2. Define the recommendation table; it stores only the TOP 100 items and their scores per (dimension, store)

create table tbl_score (  
  wid int not null,   -- dimension ID  
  uid int8 not null,  -- store ID (ToB)  
  top10 text[] not null, -- TOP-N 'item_score' entries  
  primary key(wid,uid)  
);  

3. Define a function that merges two text arrays. When new item scores arrive, they are merged into a new value (for a duplicate item the new score overwrites the old one; the result is then sorted and only the TOP N entries are kept).

create or replace function merge_top10(  
text[],   -- old value  
text[],   -- new value  
ln int    -- sort by score, keep TOP N  
) returns text[] as $$  
  select array_agg(v2||'_'||v3 order by v3 desc) from   
  (  
    select v2,v3 from   
    (  
      select v2,v3,row_number() over(partition by v2 order by v1 desc) as rn from   -- for a duplicate item, prefer the new value  
      (  
        select 1 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($1) t(info) -- old values  
        union all  
        select 2 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($2) t(info) -- new values  
      ) t  
    ) t where rn=1 order by v3 desc limit ln   -- after deduplication, keep the TOP N by score  
  ) t;  
$$ language sql strict immutable;  
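
A quick sanity check of the merge semantics: item 1's new score 0.8 should overwrite its old 0.5, and only the top 2 should survive.

select merge_top10(array['1_0.5','2_0.9'], array['1_0.8','3_0.2'], 2);
-- expected result: {2_0.9,1_0.8}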

4. Define a log table that records score changes of items within a dimension; this log is consumed later and merged into the final tbl_score table.

create unlogged table tbl_score_log (  
  wid int not null,   -- dimension ID  
  uid int8 not null,  -- store ID (ToB)  
  item int8 not null, -- item ID  
  score float4 not null,  -- score  
  crt_time timestamp not null   
);  
  
create index idx_tbl_score_log_1 on tbl_score_log (wid,uid,crt_time);  

5. Define the consumer function

create or replace function consume_log(  
  i_loop int,    -- number of iterations, i.e. how many (wid,uid) groups to process  
  i_limit int,   -- how many rows to process per (wid,uid) group in one pass  
  i_topn int     -- per (wid,uid) dimension, keep the TOP N items (highest scores)  
) returns void as $$  
declare  
  v_wid int;  
  v_uid int8;  
  v_top1 text[];  
  i int := 0;  
begin  
  LOOP  
  exit when i >= i_loop;   --  loops  
  
  select wid,uid into v_wid,v_uid from tbl_score_log for update skip locked limit 1;  
  if not found then   -- log table drained, nothing left to consume (guards the not-null insert below)  
    return;  
  end if;  
  
  with  
  a as (  
    delete from tbl_score_log where ctid= any (array(  
      select ctid from tbl_score_log where wid=v_wid and uid=v_uid order by crt_time limit i_limit  -- limit batch  
    )) returning item,score  
  )  
  select   
    array_agg((item||'_'||score)::text order by score desc) into v_top1   
    from  
    (select item,score from a order by score desc limit i_topn) t;        -- limit topn  
  
  insert into tbl_score   
  values (v_wid, v_uid, v_top1)  
  on conflict (wid,uid)  
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)  
  where  
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn);  
  
  i := i+1;  
  END LOOP;  
end;  
$$ language plpgsql strict;  
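
Because the dequeue step uses FOR UPDATE SKIP LOCKED, several consumer sessions can run this function concurrently without grabbing the same rows. A possible scheduling sketch, one call per worker session:

-- run in each of N parallel worker sessions; SKIP LOCKED keeps them apart
select consume_log(10, 10000, 100);  -- 10 groups per call, up to 10k rows per group, keep TOP 100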

6. Benchmark 1: generate score-change logs

(1,000 dimensions, 10,000 stores, 100 million items)

vi test.sql  
\set wid random(1,1000)  
\set uid random(1,10000)  
\set item random(1,100000000)  
insert into tbl_score_log values (:wid,:uid,:item,random()*100,now());  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120  
  
tps = 257737.493753 (including connections establishing)  
tps = 257752.428348 (excluding connections establishing)  

Writes exceed 250,000 rows/s.

7. Consume the log table and merge the results into the score table

postgres=# select consume_log(10, 10000, 100);  
 consume_log   
-------------  
   
(1 row)  
  
postgres=# \timing  
Timing is on.  
  
postgres=# select * from tbl_score limit 10;  
 wid | uid  |                top10                  
-----+------+-------------------------------------  
 115 |   69 | {989915_22.2217}  
 441 | 3914 | {7521898_39.2669}  
 423 | 7048 | {75494665_92.5439}  
 789 | 1335 | {57756208_23.4602}  
 776 | 8065 | {41134454_46.8727}  
 785 | 6248 | {76364646_93.4671,94065193_69.2552}  
 567 | 7539 | {97116865_6.93694}  
 207 | 6926 | {45163995_14.1626}  
 788 | 9025 | {73053901_80.3204}  
 334 | 2805 | {80532634_78.1224}  
(10 rows)  
  
Time: 0.300 ms  

8. Track the resources consumed by each consumption pass

load 'auto_explain';  
set auto_explain.log_analyze =on;  
set auto_explain.log_buffers =on;  
set auto_explain.log_min_duration =0;  
set auto_explain.log_nested_statements =on;  
set auto_explain.log_time=on;  
set auto_explain.log_verbose =on;  
set client_min_messages ='log';  
postgres=# select consume_log(1, 10000, 100);  
LOG:  duration: 0.819 ms  plan:  
Query Text: select wid,uid                  from tbl_score_log for update skip locked limit 1  
Limit  (cost=10000000000.00..10000000000.03 rows=1 width=18) (actual time=0.816..0.816 rows=1 loops=1)  
  Output: wid, uid, ctid  
  Buffers: shared hit=177  
  ->  LockRows  (cost=10000000000.00..10000876856.44 rows=30947272 width=18) (actual time=0.815..0.815 rows=1 loops=1)  
        Output: wid, uid, ctid  
        Buffers: shared hit=177  
        ->  Seq Scan on public.tbl_score_log  (cost=10000000000.00..10000567383.72 rows=30947272 width=18) (actual time=0.808..0.808 rows=1 loops=1)  
              Output: wid, uid, ctid  
              Buffers: shared hit=176  
LOG:  duration: 0.104 ms  plan:  
Query Text: with  
  a as (  
    delete from tbl_score_log where ctid= any (array(  
      select ctid from tbl_score_log where wid=v_wid and uid=v_uid order by crt_time limit i_limit  -- limit batch  
    )) returning item,score  
  )  
  select   
    array_agg((item||'_'||score)::text order by score desc)                  from  
    (select item,score from a order by score desc limit i_topn) t  
Aggregate  (cost=13.56..13.57 rows=1 width=32) (actual time=0.100..0.100 rows=1 loops=1)  
  Output: array_agg((((a.item)::text || '_'::text) || (a.score)::text) ORDER BY a.score DESC)  
  Buffers: shared hit=20  
  CTE a  
    ->  Delete on public.tbl_score_log tbl_score_log_1  (cost=2.06..13.16 rows=10 width=6) (actual time=0.059..0.063 rows=4 loops=1)  
          Output: tbl_score_log_1.item, tbl_score_log_1.score  
          Buffers: shared hit=20  
          InitPlan 1 (returns $0)  
            ->  Limit  (cost=0.56..2.05 rows=1 width=14) (actual time=0.017..0.043 rows=4 loops=1)  
                  Output: tbl_score_log.ctid, tbl_score_log.crt_time  
                  Buffers: shared hit=8  
                  ->  Index Scan using idx_tbl_score_log_1 on public.tbl_score_log  (cost=0.56..5.02 rows=3 width=14) (actual time=0.017..0.041 rows=4 loops=1)  
                        Output: tbl_score_log.ctid, tbl_score_log.crt_time  
                        Index Cond: ((tbl_score_log.wid = $5) AND (tbl_score_log.uid = $6))  
                        Buffers: shared hit=8  
          ->  Tid Scan on public.tbl_score_log tbl_score_log_1  (cost=0.01..11.11 rows=10 width=6) (actual time=0.053..0.055 rows=4 loops=1)  
                Output: tbl_score_log_1.ctid  
                TID Cond: (tbl_score_log_1.ctid = ANY ($0))  
                Buffers: shared hit=12  
  ->  Limit  (cost=0.37..0.37 rows=1 width=12) (actual time=0.077..0.079 rows=4 loops=1)  
        Output: a.item, a.score  
        Buffers: shared hit=20  
        ->  Sort  (cost=0.37..0.39 rows=10 width=12) (actual time=0.076..0.077 rows=4 loops=1)  
              Output: a.item, a.score  
              Sort Key: a.score DESC  
              Sort Method: quicksort  Memory: 25kB  
              Buffers: shared hit=20  
              ->  CTE Scan on a  (cost=0.00..0.20 rows=10 width=12) (actual time=0.060..0.066 rows=4 loops=1)  
                    Output: a.item, a.score  
                    Buffers: shared hit=20  
LOG:  duration: 0.046 ms  plan:  
Query Text: insert into tbl_score   
  values (v_wid, v_uid, v_top1)  
  on conflict (wid,uid)  
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)  
  where  
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)  
Insert on public.tbl_score  (cost=0.00..0.01 rows=1 width=44) (actual time=0.045..0.045 rows=0 loops=1)  
  Conflict Resolution: UPDATE  
  Conflict Arbiter Indexes: tbl_score_pkey  
  Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, $3))  
  Tuples Inserted: 1  
  Conflicting Tuples: 0  
  Buffers: shared hit=7  
  ->  Result  (cost=0.00..0.01 rows=1 width=44) (actual time=0.000..0.001 rows=1 loops=1)  
        Output: $5, $6, $7  
LOG:  duration: 1.951 ms  plan:  
Query Text: select consume_log(1, 10000, 100);  
Result  (cost=0.00..0.26 rows=1 width=4) (actual time=1.944..1.944 rows=1 loops=1)  
  Output: consume_log(1, 10000, 100)  
  Buffers: shared hit=212  
 consume_log   
-------------  
   
(1 row)  
  
Time: 2.390 ms  

Consuming 10,000 log records takes about 1.5 seconds.

9. Benchmark 2: query the ad-slot recommendation for a given dimension and store

vi test1.sql  
  
\set wid random(1,1000)  
\set uid random(1,10000)  
select * from tbl_score where wid=:wid and uid=:uid;  
  
pgbench -M prepared -n -r -P 1 -f ./test1.sql -c 32 -j 32 -T 120  
  
tps = 470514.018510 (including connections establishing)  
tps = 470542.672975 (excluding connections establishing)  

Query throughput reaches about 470,000 QPS.

Design 2

One aspect of Design 1 can be optimized: when rows for different dimensions are written into tbl_score_log interleaved, consuming them introduces IO amplification (a batch for one (wid,uid) is scattered across the whole heap):

《PostgreSQL 时序最佳实践 - 证券交易系统数据库设计 - 阿里云RDS PostgreSQL最佳实践》

We can apply the same approach as in that article: store the data partitioned by dimension, and consume it partition by partition.

1. Create the dimension description table

create table tbl_weidu (  
  wid int primary key,  
  info json   
);   

2. Create the TOP-K score table

create table tbl_score (  
  wid int not null,   -- dimension ID  
  uid int8 not null,  -- store ID (ToB)  
  top10 text[] not null, -- TOP-N 'item_score' entries  
  primary key(wid,uid)  
);  

3. Create a task table that keeps, per (dimension, store), a counter of how many times its log has been consumed

create table tbl_score_task (  
  wid int not null,   -- dimension ID  
  uid int8 not null,  -- store ID (ToB)  
  cnt int8 default 0, -- number of times consumed  
  primary key(wid,uid)  
);  
  
create index idx_tbl_score_task_cnt on tbl_score_task (cnt);  

4. The function that merges two text arrays (same as in Design 1)

create or replace function merge_top10(  
text[],   -- old value  
text[],   -- new value  
ln int    -- sort by score, keep TOP N  
) returns text[] as $$  
  select array_agg(v2||'_'||v3 order by v3 desc) from   
  (  
    select v2,v3 from   
    (  
      select v2,v3,row_number() over(partition by v2 order by v1 desc) as rn from   
      (  
        select 1 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($1) t(info)   
        union all  
        select 2 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($2) t(info)   
      ) t  
    ) t where rn=1 order by v3 desc limit ln   
  ) t;  
$$ language sql strict immutable;  

5. The log table

create unlogged table tbl_score_log (  -- streaming data, not WAL-logged; all rows are lost if the database crashes  
  item int8 not null,     -- item ID  
  score float4 not null,  -- score  
  crt_time timestamp not null   
);  
  
create index idx_tbl_score_log_1 on tbl_score_log (crt_time);   

6. Create the function that writes the log, solving Design 1's IO amplification problem: each (wid,uid) pair gets its own child table, so one batch is physically clustered.

create or replace function ins_score_log(  
  i_wid int,   
  i_uid int8,   
  i_item int8,   
  i_score float4   
) returns void as $$  
declare  
begin  
  execute format('insert into tbl_score_log_%s_%s values (%s,%s,now())', i_wid, i_uid, i_item, i_score);  
  insert into tbl_score_task (wid, uid) values (i_wid, i_uid) on conflict (wid,uid) do nothing;  
  exception when others then  
    -- the per-(wid,uid) child table does not exist yet: create it, then retry the insert.  
    -- (two sessions may race on the CREATE; the loser's error is not caught here,  
    --  so the caller should retry on failure)  
    execute format('create unlogged table tbl_score_log_%s_%s (like tbl_score_log including all) inherits (tbl_score_log)', i_wid, i_uid);  
    execute format('insert into tbl_score_log_%s_%s values (%s,%s,now())', i_wid, i_uid, i_item, i_score);  
    insert into tbl_score_task (wid, uid) values (i_wid, i_uid) on conflict (wid,uid) do nothing;  
end;  
$$ language plpgsql strict;  

Note, however:

《PostgreSQL 单库对象过多,触发Linux系统限制 (ext4_dx_add_entry: Directory index full!) (could not create file "xx/xx/xxxxxx": No space left on device)》

With up to 1,000 x 10,000 child tables you may hit that limit; in that case, shard the data by UID or WID into multiple databases so that no single directory accumulates too many files.
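
A minimal routing sketch for such sharding (the 64-database count, the db_N naming, and hashing by uid are illustrative assumptions):

-- hypothetical: map store uid=10 to one of 64 databases, db_0 .. db_63
select 'db_' || mod(hashint8(10::int8) & 2147483647, 64) as target_db;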

7. Consume the log

create or replace function consume_log(  
  i_loop int,    -- number of iterations, i.e. how many (wid,uid) groups to process  
  i_limit int,   -- how many rows to process per (wid,uid) group in one pass  
  i_topn int     -- per (wid,uid) dimension, keep the TOP N items (highest scores)  
) returns void as $$  
declare  
  v_wid int;  
  v_uid int8;  
  v_top1 text[];  
  i int := 0;  
begin  
  LOOP  
  exit when i >= i_loop;   --  loops  
  
  with a as   
  (select wid,uid from tbl_score_task order by cnt for update skip locked limit 1)   
  update tbl_score_task t set cnt=cnt+1 from a where t.wid = a.wid and t.uid = a.uid returning t.wid,t.uid into v_wid, v_uid;  
  
  execute format ($_$  
  with  
  a as (  
    delete from tbl_score_log_%s_%s where ctid= any (array(  
      select ctid from tbl_score_log_%s_%s order by crt_time limit %s      -- limit batch  
    )) returning item,score  
  )  
  select   
    array_agg((item||'_'||score)::text order by score desc)   
    from  
    (select item,score from a order by score desc limit %s) t    -- limit topn  
  $_$, v_wid, v_uid, v_wid, v_uid, i_limit, i_topn   
  ) into v_top1;    
  
  -- raise notice '%', v_top1;  
    
  if v_top1 is null then  
    i := i+1;   -- count empty passes too; otherwise an empty partition would spin forever  
    continue;  
  end if;  
  
  insert into tbl_score   
  values (v_wid, v_uid, v_top1)   
  on conflict (wid,uid)   
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)  
  where   
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn);   
   
  i := i+1;  
  END LOOP;  
end;  
$$ language plpgsql strict;   

8. Write benchmark

vi test.sql  
\set wid random(1,1000)  
\set uid random(1,10000)  
\set item random(1,100000000)  
select ins_score_log (:wid,:uid::int8,:item::int8,(random()*100)::float4);  
  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120  
  
tps = 146606.220095 (including connections establishing)  
tps = 146614.705007 (excluding connections establishing)  

After all the partitions have been created, writes reach only about 150,000 rows/s because of the dynamic SQL.

9. Consume the log and merge into the score table

postgres=# select consume_log(10, 10000, 100);  
 consume_log   
-------------  
   
(1 row)  
  
postgres=# \timing  
Timing is on.  
postgres=# select * from tbl_score limit 10;  
 wid | uid  |                top10                  
-----+------+-------------------------------------  
 115 |   69 | {989915_22.2217}  
 441 | 3914 | {7521898_39.2669}  
 423 | 7048 | {75494665_92.5439}  
 789 | 1335 | {57756208_23.4602}  
 776 | 8065 | {41134454_46.8727}  
 785 | 6248 | {76364646_93.4671,94065193_69.2552}  
 567 | 7539 | {97116865_6.93694}  
 207 | 6926 | {45163995_14.1626}  
 788 | 9025 | {73053901_80.3204}  
 334 | 2805 | {80532634_78.1224}  
(10 rows)  
  
Time: 0.300 ms  
postgres=# select consume_log(10, 10000, 100);  
 consume_log   
-------------  
   
(1 row)  
  
Time: 3677.130 ms (00:03.677)  
postgres=# select consume_log(1, 10000, 100);  
LOG:  duration: 0.105 ms  plan:  
Query Text: with a as   
  (select wid,uid from tbl_score_task order by cnt for update skip locked limit 1)   
  update tbl_score_task t set cnt=cnt+1 from a where t.wid = a.wid and t.uid = a.uid returning t.wid,t.uid  
Update on public.tbl_score_task t  (cost=0.60..2.85 rows=1 width=62) (actual time=0.099..0.100 rows=1 loops=1)  
  Output: t.wid, t.uid  
  Buffers: shared hit=13  
  CTE a  
    ->  Limit  (cost=0.28..0.32 rows=1 width=26) (actual time=0.036..0.036 rows=1 loops=1)  
          Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid  
          Buffers: shared hit=4  
          ->  LockRows  (cost=0.28..271.41 rows=7057 width=26) (actual time=0.035..0.035 rows=1 loops=1)  
                Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid  
                Buffers: shared hit=4  
                ->  Index Scan using idx_tbl_score_task_cnt on public.tbl_score_task  (cost=0.28..200.84 rows=7057 width=26) (actual time=0.018..0.018 rows=1 loops=1)  
                      Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid  
                      Buffers: shared hit=3  
  ->  Nested Loop  (cost=0.28..2.53 rows=1 width=62) (actual time=0.059..0.060 rows=1 loops=1)  
        Output: t.wid, t.uid, (t.cnt + 1), t.ctid, a.*  
        Inner Unique: true  
        Buffers: shared hit=7  
        ->  CTE Scan on a  (cost=0.00..0.02 rows=1 width=48) (actual time=0.046..0.047 rows=1 loops=1)  
              Output: a.*, a.wid, a.uid  
              Buffers: shared hit=4  
        ->  Index Scan using tbl_score_task_pkey on public.tbl_score_task t  (cost=0.28..2.50 rows=1 width=26) (actual time=0.009..0.009 rows=1 loops=1)  
              Output: t.wid, t.uid, t.cnt, t.ctid  
              Index Cond: ((t.wid = a.wid) AND (t.uid = a.uid))  
              Buffers: shared hit=3  
LOG:  duration: 24.624 ms  plan:  
Query Text:   
  with  
  a as (  
    delete from tbl_score_log_3_5 where ctid= any (array(  
      select ctid from tbl_score_log_3_5 order by crt_time limit 10000      -- limit batch  
    )) returning item,score  
  )  
  select   
    array_agg((item||'_'||score)::text order by score desc)   
    from  
    (select item,score from a order by score desc limit 100) t    -- limit topn  
    
Aggregate  (cost=279.53..279.54 rows=1 width=32) (actual time=24.619..24.619 rows=1 loops=1)  
  Output: array_agg((((a.item)::text || '_'::text) || (a.score)::text) ORDER BY a.score DESC)  
  Buffers: shared hit=39297  
  CTE a  
    ->  Delete on public.tbl_score_log_3_5 tbl_score_log_3_5_1  (cost=267.76..278.86 rows=10 width=6) (actual time=10.193..19.993 rows=10000 loops=1)  
          Output: tbl_score_log_3_5_1.item, tbl_score_log_3_5_1.score  
          Buffers: shared hit=39297  
          InitPlan 1 (returns $0)  
            ->  Limit  (cost=0.42..267.75 rows=10000 width=14) (actual time=0.017..7.185 rows=10000 loops=1)  
                  Output: tbl_score_log_3_5.ctid, tbl_score_log_3_5.crt_time  
                  Buffers: shared hit=9297  
                  ->  Index Scan using tbl_score_log_3_5_crt_time_idx on public.tbl_score_log_3_5  (cost=0.42..3907.05 rows=146135 width=14) (actual time=0.016..5.319 rows=10000 loops=1)  
                        Output: tbl_score_log_3_5.ctid, tbl_score_log_3_5.crt_time  
                        Buffers: shared hit=9297  
          ->  Tid Scan on public.tbl_score_log_3_5 tbl_score_log_3_5_1  (cost=0.01..11.11 rows=10 width=6) (actual time=10.188..13.238 rows=10000 loops=1)  
                Output: tbl_score_log_3_5_1.ctid  
                TID Cond: (tbl_score_log_3_5_1.ctid = ANY ($0))  
                Buffers: shared hit=19297  
  ->  Limit  (cost=0.37..0.39 rows=10 width=12) (actual time=24.433..24.461 rows=100 loops=1)  
        Output: a.item, a.score  
        Buffers: shared hit=39297  
        ->  Sort  (cost=0.37..0.39 rows=10 width=12) (actual time=24.432..24.443 rows=100 loops=1)  
              Output: a.item, a.score  
              Sort Key: a.score DESC  
              Sort Method: top-N heapsort  Memory: 32kB  
              Buffers: shared hit=39297  
              ->  CTE Scan on a  (cost=0.00..0.20 rows=10 width=12) (actual time=10.195..22.790 rows=10000 loops=1)  
                    Output: a.item, a.score  
                    Buffers: shared hit=39297  
LOG:  duration: 0.084 ms  plan:  
Query Text: insert into tbl_score   
  values (v_wid, v_uid, v_top1)   
  on conflict (wid,uid)   
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)  
  where   
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)  
Insert on public.tbl_score  (cost=0.00..0.01 rows=1 width=44) (actual time=0.083..0.083 rows=0 loops=1)  
  Conflict Resolution: UPDATE  
  Conflict Arbiter Indexes: tbl_score_pkey  
  Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, $3))  
  Tuples Inserted: 1  
  Conflicting Tuples: 0  
  Buffers: shared hit=4  
  ->  Result  (cost=0.00..0.01 rows=1 width=44) (actual time=0.001..0.001 rows=1 loops=1)  
        Output: $5, $6, $7  
LOG:  duration: 26.335 ms  plan:  
Query Text: select consume_log(1, 10000, 100);  
Result  (cost=0.00..0.26 rows=1 width=4) (actual time=26.329..26.329 rows=1 loops=1)  
  Output: consume_log(1, 10000, 100)  
  Buffers: shared hit=39388  
 consume_log   
-------------  
   
(1 row)  
  
Time: 26.937 ms  

Design 3

Similar to Design 1, except that a scatter-write staging table is placed in front. The staging table is periodically sorted and appended to tbl_score_log, which is then consumed exactly as in Design 1; the sort fixes the IO amplification.

Use A/B table switching:

create unlogged table tbl_score_log_a (  
  wid int not null,   -- dimension ID  
  uid int8 not null,  -- store ID (ToB)  
  item int8 not null, -- item ID  
  score float4 not null,  -- score  
  crt_time timestamp not null   
);  
  
create unlogged table tbl_score_log_b (  
  wid int not null,   -- dimension ID  
  uid int8 not null,  -- store ID (ToB)  
  item int8 not null, -- item ID  
  score float4 not null,  -- score  
  crt_time timestamp not null   
);  

For example, once some 20 million rows have accumulated, sort them into tbl_score_log (a sketch of how writers switch between A and B follows below):

begin;  
lock table tbl_score_log_a in ACCESS EXCLUSIVE mode;   
insert into tbl_score_log select * from tbl_score_log_a order by wid,uid,crt_time;  
truncate tbl_score_log_a;  
end;  
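
While A is being drained, writers should be pointed at B, and vice versa. A minimal sketch of the switch, assuming a one-row control table that writers consult (tbl_score_log_switch and this flip protocol are illustrative, not part of the original design):

-- hypothetical control table consulted by writers before each insert
create table tbl_score_log_switch (cur text not null check (cur in ('a','b')));
insert into tbl_score_log_switch values ('a');

-- the drain job flips the switch first, so new writes land in the other table,
update tbl_score_log_switch set cur = 'b';
-- then sorts and truncates the now-idle table as shown above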

Design 4

Similar to Design 1, except that each pass computes many dimensions at once instead of a single one.

For computing the TOP-K of many dimensions in one pass, see this approach:

《PostgreSQL 递归妙用案例 - 分组数据去重与打散》

Design 1 computes one dimension per pass, which causes IO amplification; computing many dimensions per pass eliminates it. (With this method, however, each pass should cover a much larger batch, say 10 million rows at a time; otherwise tbl_score gets updated too frequently, with a single dimension consumed and rewritten many times.)

The parts that differ from Design 1 are as follows:

create unlogged table tbl_score_log (    
  wid int not null,   -- dimension ID    
  uid int8 not null,  -- store ID (ToB)    
  item int8 not null, -- item ID    
  score float4 not null,  -- score    
  crt_time timestamp not null     
);     
    
create index idx_tbl_score_log_1 on tbl_score_log (crt_time);    
  
create or replace function consume_log(    
  i_limit int,   -- how many rows to process per pass    
  i_topn int     -- per (wid,uid) dimension, keep the TOP N items (highest scores)    
) returns void as $$    
declare    
begin    
      
  with    
  a as (    
    delete from tbl_score_log where ctid= any (array(    
      select ctid from tbl_score_log order by crt_time limit i_limit     -- limit batch    
    )) returning wid,uid,item,score    
  )    
  insert into tbl_score     
  select wid,uid,topn  
  from  
  (  
  select     
    wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn     
    from    
    (  
    select wid,uid,item,score from  
      (select wid,uid,item,score,row_number() over (partition by wid,uid order by score desc) as rn from a) t   
      where rn <= i_topn  -- limit topn    
    ) t  
    group by wid,uid   
  ) t  
  on conflict (wid,uid)    
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)     -- limit topn 
  where    
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)  -- limit topn 
  ;  
end;    
$$ language plpgsql strict;    
  
select consume_log(10000000,100);  

Alternatively, the consumption can be done directly with a SQL statement like the following (the plan below was captured with a 1,000,000-row batch):

  with    
  a as (    
    delete from tbl_score_log where ctid= any (array(    
      select ctid from tbl_score_log order by crt_time limit 10000000     -- limit batch    
    )) returning wid,uid,item,score    
  )    
  insert into tbl_score     
  select wid,uid,topn  
  from  
  (  
  select     
    wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn     
    from    
    (  
    select wid,uid,item,score from  
      (select wid,uid,item,score,row_number() over (partition by wid,uid order by score desc) as rn from a) t   
      where rn <= 100  -- limit topn    
    ) t  
    group by wid,uid   
  ) t  
  on conflict (wid,uid)    
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, 100)     -- limit topn 
  where    
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, 100)  -- limit topn 
  ;  
                                                                                     QUERY PLAN                                                                                      
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Insert on public.tbl_score  (cost=36744.69..36745.17 rows=3 width=44) (actual time=69966.565..69966.565 rows=0 loops=1)
   Conflict Resolution: UPDATE
   Conflict Arbiter Indexes: tbl_score_pkey
   Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, 100))
   Tuples Inserted: 317084
   Conflicting Tuples: 634683
   Buffers: shared hit=13811948 read=7001 dirtied=7001
   CTE a
     ->  Delete on public.tbl_score_log tbl_score_log_1  (cost=36733.22..36744.32 rows=10 width=6) (actual time=968.724..1891.686 rows=1000000 loops=1)
           Output: tbl_score_log_1.wid, tbl_score_log_1.uid, tbl_score_log_1.item, tbl_score_log_1.score
           Buffers: shared hit=4007463
           InitPlan 1 (returns $0)
             ->  Limit  (cost=0.43..36733.21 rows=1000000 width=14) (actual time=0.011..660.528 rows=1000000 loops=1)
                   Output: tbl_score_log.ctid, tbl_score_log.crt_time
                   Buffers: shared hit=999099
                   ->  Index Scan using idx_tbl_score_log_1 on public.tbl_score_log  (cost=0.43..427926.61 rows=11649711 width=14) (actual time=0.010..494.951 rows=1000000 loops=1)
                         Output: tbl_score_log.ctid, tbl_score_log.crt_time
                         Buffers: shared hit=999099
           ->  Tid Scan on public.tbl_score_log tbl_score_log_1  (cost=0.01..11.11 rows=10 width=6) (actual time=968.673..1265.722 rows=1000000 loops=1)
                 Output: tbl_score_log_1.ctid
                 TID Cond: (tbl_score_log_1.ctid = ANY ($0))
                 Buffers: shared hit=1999099
   ->  GroupAggregate  (cost=0.37..0.82 rows=3 width=44) (actual time=2907.640..8707.867 rows=951767 loops=1)
         Output: t.wid, t.uid, array_agg((((t.item)::text || '_'::text) || (t.score)::text) ORDER BY t.score DESC)
         Group Key: t.wid, t.uid
         Buffers: shared hit=4007463
         ->  Subquery Scan on t  (cost=0.37..0.72 rows=3 width=24) (actual time=2907.590..4711.497 rows=1000000 loops=1)
               Output: t.wid, t.uid, t.item, t.score, t.rn
               Filter: (t.rn <= 100)
               Buffers: shared hit=4007463
               ->  WindowAgg  (cost=0.37..0.59 rows=10 width=32) (actual time=2907.588..4395.127 rows=1000000 loops=1)
                     Output: a.wid, a.uid, a.item, a.score, row_number() OVER (?)
                     Buffers: shared hit=4007463
                     ->  Sort  (cost=0.37..0.39 rows=10 width=24) (actual time=2907.575..3283.649 rows=1000000 loops=1)
                           Output: a.wid, a.uid, a.score, a.item
                           Sort Key: a.wid, a.uid, a.score DESC
                           Sort Method: quicksort  Memory: 102702kB
                           Buffers: shared hit=4007463
                           ->  CTE Scan on a  (cost=0.00..0.20 rows=10 width=24) (actual time=968.728..2201.439 rows=1000000 loops=1)
                                 Output: a.wid, a.uid, a.score, a.item
                                 Buffers: shared hit=4007463
 Planning time: 0.623 ms
 Execution time: 69990.738 ms
(43 rows)

Design 5

Similar to Design 4, except that tbl_score_log is not consumed with DELETE; instead it is a partitioned table (or an A/B pair) and an entire table is consumed per pass, so no DELETE is needed: compute, then TRUNCATE.

begin;
  insert into tbl_score     
  select wid,uid,topn  
  from  
  (  
  select     
    wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn     
    from    
    (  
    select wid,uid,item,score from  
      (select wid,uid,item,score,row_number() over (partition by wid,uid order by score desc) as rn from tbl_score_log_a) t   -- A/B table switching
      where rn <= 100                      -- limit topn    
    ) t  
    group by wid,uid   
  ) t  
  on conflict (wid,uid)    
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, 100)       -- limit topn 
  where    
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, 100)     -- limit topn 
  ;  

  truncate tbl_score_log_a;
end;  

Summary

1. Pre-ranking keeps query latency guaranteed; a single RDS PG instance can serve about 470,000 QPS.

2. The initial data can be imported from OSS (compute it in HDB PG or ODPS, generate the initial data set, and write it to OSS). Parallel import speeds this up; see:

《阿里云RDS PostgreSQL OSS 外部表 - (dblink异步调用封装)并行写提速案例》

3. Incremental data is written to RDS PG as logs; a scheduled job inside RDS PG consumes the logs and merges them into the final tbl_score table.

Incremental changes (insert, delete, update):

Delete: log the item with SCORE=0, so it sinks to the bottom of the ranking and is eventually pushed out of the TOP-K.

Update: already handled by the merge UDF (the new score overwrites the old one).
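
For example, under Design 1, "deleting" item 12345 from dimension 1 / store 10 is just another log row (the IDs are illustrative):

insert into tbl_score_log values (1, 10, 12345, 0, now());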

Further thoughts

1. Consider introducing probabilistic counting?

《PostgreSQL count-min sketch top-n 概率计算插件 cms_topn (结合窗口实现同比、环比、滑窗分析等) - 流计算核心功能之一》

2. Compute the TOP-K of many dimensions in a single pass; see:

《PostgreSQL 递归妙用案例 - 分组数据去重与打散》

Computing one dimension per pass (as in Design 1) causes IO amplification; the multi-dimension pass avoids it, as explored in Design 4. (Again, use large batches, e.g. 10 million rows per pass, to avoid over-frequent updates of tbl_score.)

References

《PostgreSQL 单库对象过多,触发Linux系统限制 (ext4_dx_add_entry: Directory index full!) (could not create file "xx/xx/xxxxxx": No space left on device)》

《PostgreSQL 时序最佳实践 - 证券交易系统数据库设计 - 阿里云RDS PostgreSQL最佳实践》

《阿里云RDS PostgreSQL OSS 外部表 - (dblink异步调用封装)并行写提速案例》

《PostgreSQL count-min sketch top-n 概率计算插件 cms_topn (结合窗口实现同比、环比、滑窗分析等) - 流计算核心功能之一》
