阿里云RDS PG实践 - 流式标签 - 万亿级,实时任意标签圈人

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云数据库 RDS SQL Server,独享型 2核4GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

标签

PostgreSQL , 阅后即焚 , 流计算 , 标签


背景

varbitx是阿里云RDS PG提供的一个BIT操作插件,使用这个插件已经成功的帮助用户提供了万亿级的毫秒级实时圈人功能。

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

《基于 阿里云 RDS PostgreSQL 打造实时用户画像推荐系统(varbitx)》

结合阅后即焚的流式批量处理,schemaless UDF,可以实现高效的增、删标签,以及毫秒级别的按标签圈人。

《PostgreSQL 在铁老大订单系统中的schemaless设计和性能压测》

目标是百亿级用户体量,千万级标签。

架构图

pic

阿里云varbitx 添加标签的函数接口

1. set_bit_array

set_bit_array (  
  varbit,  
  int,   -- 目标BIT (0|1)  
  int,   -- 填充BIT (0|1)  
  int[]  -- 目标位置  
) returns varbit  
   
  将指定位置的BIT设置为0|1,(起始位=0),超出原始长度的部分填充0|1  
  例如 set_bit_array('111100001111', 0, 1, array[1,15]) 返回 1011000011111110  

正在添加这个函数,用于一次性处理添加和删除的BIT的设置

2. set_bit_array

set_bit_array (  
  varbit,  
  int,     -- 1目标BIT (0|1)  
  int[]    -- 1目标位置  
  int,     -- 2目标BIT (0|1)  
  int[],   -- 2目标位置  
  int      -- 填充BIT (0|1)  
) returns varbit  
  
  将指定位置的BIT设置为0|1,(起始位=0),超出原始长度的部分填充0|1  
  例如 set_bit_array('111100001111', 0, array[1,15], 1, array[0,4], 0) 返回 1011100011111110  

阿里云varbitx 从bitmap得到字典ID

1. bit_posite

bit_posite (    
  varbit,     
  int,      -- (0|1)    
  boolean     
) returns int[]      
    
  返回 0|1 的位置,(起始位=0), true时正向返回,false时反向返回        
  例如 bit_posite ('11110010011', 1, true) 返回 [0,1,2,3,6,9,10]      
       bit_posite ('11110010011', 1, false) 返回 [10,9,6,3,2,1,0]     

从字典ID得到USERID

select uid from dict where id = any ( bit_posite(x,x,x) );      

demo

1 字典表

将USERID转换为ARRAY下标,即字典表

首先需要用到无缝自增ID

《PostgreSQL 无缝自增ID的实现 - by advisory lock》

1、字典表如下

create table dict(id int8 primary key, uid int8 not null);    
  
create unique index idx_dict_uid on dict (uid);    

2 生成已有用户

create table t_uid (  
  uid int8 primary key  -- 已有用户的USER ID  
);  

插入一批 USERID (一亿)

insert into t_uid select id from generate_series(1,100000000) t(id) order by random() ;  

3 已有用户,一次性生成mapping下标

create sequence seq minvalue 0 start 0;    
  
  
insert into dict select nextval('seq'), uid from t_uid;    
  
  
select min(id),max(id),count(*) from dict;    
 min |   max    |   count     
-----+----------+-----------  
   0 | 99999999 | 100000000  
(1 row)  

4 无缝自增下标 函数

新增的用户,通过这个函数写入,确保无缝自增下标:

create or replace function f_uniq(i_uid int8) returns int as $$  
declare  
  newid int;    
  i int := 0;    
  res int;    
begin  
  loop   
    if i>0 then   
      perform pg_sleep(0.2*random());  
    else  
      i := i+1;  
    end if;  
  
    -- 获取已有的最大ID+1 (即将插入的ID)  
    select max(id)+1 into newid from dict;    
    if newid is not null then  
      -- 获取AD LOCK  
      if pg_try_advisory_xact_lock(newid) then  
        -- 插入  
        insert into dict (id,uid) values (newid,i_uid) ;    
        -- 返回此次获取到的UID  
        return newid;    
      else  
        -- 没有获取到AD LOCK则继续循环  
        continue;    
      end if;    
    else  
      -- 表示这是第一条记录,获取AD=0 的LOCK  
      if pg_try_advisory_xact_lock(0) then  
        insert into dict (id, uid) values (0, i_uid) ;    
        return 0;    
      else  
        continue;     
      end if;    
    end if;    
  end loop;    
    
  -- 如果因为瞬态导致PK冲突了,继续调用  
  exception when others then  
    select f_uniq(i_uid) into res;    
    return res;    
end;  
$$ language plpgsql strict;  

例如,新增一个USERID。

select f_uniq(?);  

5 从ID或者USERID或从USERID获得ID

create or replace function get_id_from_uid (int8) returns int8 as $$  
  select id from dict where uid=$1;  
$$ language sql strict;  
  
create or replace function get_uid_from_id (int8) returns int8 as $$  
  select uid from dict where id=$1;  
$$ language sql strict;  

6 标签描述表

create table t_tags(tagid int primary key, desc text);    

7 流式标签表

1、UID范围映射表,落在什么区间的ID,对应什么后缀的表名

建议在写入时,自动写到不同的t_tag_log表,这样的话,不需要在消费时过滤。

create table t_mapping (  
  dict_id int8range,      -- 字典ID区间  
  suffix text unique             -- 表名后缀  
);  
  
alter table t_mapping add constraint ck_exclude_dict_id exclude using gist(dict_id with &&);  -- 防止交叉区间  
insert into t_mapping values (int8range(0,100000000), '0');  
insert into t_mapping values (int8range(100000000,200000000), '1');  
insert into t_mapping values (int8range(200000000,300000000), '2');  
insert into t_mapping values (int8range(300000000,400000000), '3');  

2、流式标签主表

create table t_tag_log (  
  dict_id int8,        -- 用户下标ID    
  action int2,         -- 删除0、新增1    
  tagid int,           -- 标签ID   
  crt_time timestamp   -- 时间    
);  
  
create index idx_t_tag_log on t_tag_log (crt_time);  

3、创建流式标签子表

按tagid分区,按t_mapping后缀对应关系分区。

do language plpgsql $$  
declare  
  x text;  
begin  
  for i in 0..63 loop  
    for x in select suffix from t_mapping loop  
      execute format('create table t_tag_log_%s_%s (like t_tag_log including all) inherits (t_tag_log)', i, x);  
    end loop;  
  end loop;  
end;  
$$;  
  
postgres=# \dt t_tag_log_*  
             List of relations  
 Schema |      Name      | Type  |  Owner     
--------+----------------+-------+----------  
 public | t_tag_log_0_0  | table | postgres  
 public | t_tag_log_0_1  | table | postgres  
 public | t_tag_log_0_2  | table | postgres  
 public | t_tag_log_0_3  | table | postgres  
 public | t_tag_log_10_0 | table | postgres  
 public | t_tag_log_10_1 | table | postgres  
 public | t_tag_log_10_2 | table | postgres  
 public | t_tag_log_10_3 | table | postgres  
 public | t_tag_log_11_0 | table | postgres  
 public | t_tag_log_11_1 | table | postgres  
.....................

4、使用schema lessUDF写入标签信息

create or replace function ins_tag(v_uid int8, v_action int2, v_tagid int, v_crt_time timestamp) returns void as $$  
declare  
  i int := mod(v_tagid, 64);  
  x text;   
begin  
  select suffix into x from t_mapping where dict_id @> $1;  
  execute format ('insert into t_tag_log_%s_%s (dict_id, action, tagid, crt_time) values (%s, %s, %s, %L)', i, x, get_id_from_uid(v_uid), v_action, v_tagid, v_crt_time);  
end;  
$$ language plpgsql strict;  

8 高速写入贴标签、删除标签

vi test.sql  
  
\set uid random(1,100000000)  
\set tagid random(1,100000)  
\set action random(0,1)  
select ins_tag (:uid::int8, :action::int2, :tagid, now()::timestamp);  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120  

单表单条写入性能:

单表或多表 批量写入 性能可以超过100万行/s。

transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 32  
number of threads: 32  
duration: 120 s  
number of transactions actually processed: 19013298  
latency average = 0.202 ms  
latency stddev = 0.267 ms  
tps = 158442.952245 (including connections establishing)  
tps = 158449.386772 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set uid random(1,100000000)  
         0.000  \set tagid random(1,100000)  
         0.000  \set action random(0,1)  
         0.200  insert into t_tag_log (dict_id, action, tagid, crt_time) values (get_id_from_uid(:uid), :action, :tagid, now());  

9 标签反转表

对应到前面的拆分模式,有两种模式,一种模式,使用多表存储。另一种模式使用单表存储。

1、多表存储。

标签反转主表

create table tag_userbitmap (  
  tagid int primary key,             -- 标签ID  
  usrebitmap varbit                  -- 用户bitmap  
);  

标签反转子表

do language plpgsql $$  
declare  
  x text;  
begin  
  for x in select suffix from t_mapping loop  
    execute format('create table tag_userbitmap_%s (like tag_userbitmap including all) inherits (tag_userbitmap)', x);  
  end loop;  
end;  
$$;  

tag不需要分区,因为增量更新时,没有行锁冲突,而且表大小也足够。

postgres=# \dt tag_userbitmap*  
              List of relations  
 Schema |       Name       | Type  |  Owner     
--------+------------------+-------+----------  
 public | tag_userbitmap   | table | postgres  
 public | tag_userbitmap_0 | table | postgres  
 public | tag_userbitmap_1 | table | postgres  
 public | tag_userbitmap_2 | table | postgres  
 public | tag_userbitmap_3 | table | postgres  
(5 rows)  

2、单表存储,则加一列区分OFFSET。

create table tag_userbitmap_single (
  tagid int ,               -- 标签ID
  bitmap_offset int ,       -- bit号段offset 值
  usrebitmap varbit,        -- 当前号段的bitmap
  primary key (tagid, bitmap_offset)
);

10 阅后即焚-流式消费标签,合并到标签反转表

不考虑分区表时,这样来进行消费。

with tmp as (delete from t_tag_log     -- 查询哪个表  
  where ctid = any ( array (  
    select ctid from t_tag_log order by crt_time limit 100000       -- 批量处理10万条  
  )) returning *  
)   
select tagid, action, array_agg(dict_id) as dict_id from    
(  
  select row_number() over w1 as rn, *   
  from tmp   
  window w1 as (partition by dict_id, tagid order by crt_time desc)  -- 每个ID,每个标签,取最后一条  
) t where rn=1  
group by tagid, action                                               -- 聚合为下标数组  
;  

11 schemaless UDF, 阅后即焚-流式消费标签,合并到标签反转表

对应tag_userbimap表的设计(单表、或多表),schemaless UDF也有不同。

1、多表:在UDF中自动拼接表名,将BITMAP合并到对应子表。

create or replace function merge_tags(v_limit int, v_suffix1 int, v_suffix2 int) returns void as $$  
declare  
  v_tagid int;  
  v_action int2;  
  v_dict_id int[];  
  v_offset int;  
  v_fixed_dict_id int[];  
begin  
  select substring(dict_id::text,'(\d+),') into v_offset from t_mapping where suffix=v_suffix2::text;   
  
  for v_tagid, v_action, v_dict_id in   
    execute format   
('  
with tmp as (delete from t_tag_log_%s_%s     -- 查询哪个表  
  where ctid = any ( array (  
    select ctid from t_tag_log_%s_%s order by crt_time limit %s       -- 批量处理N条  
  )) returning *  
)   
select tagid, action, array_agg(dict_id) as dict_id from    
(  
  select row_number() over w1 as rn, *   
  from tmp   
  window w1 as (partition by dict_id, tagid order by crt_time desc)   -- 每个ID,每个标签,取最后一条  
) t where rn=1  
group by tagid, action                                               
',   
v_suffix1, v_suffix2, v_suffix1, v_suffix2, v_limit)    
  
loop  
  select array(select unnest(v_dict_id)-v_offset) into v_fixed_dict_id;    
  -- raise notice '% ,% ,% ,%', v_tagid, v_action, v_dict_id, v_fixed_dict_id;  
  execute format('insert into tag_userbitmap_%s (tagid, usrebitmap) values (%s, set_bit_array(''0'', %s, %s, %L))   
                  on conflict (tagid)   
                  do update set usrebitmap=set_bit_array(tag_userbitmap_%s.usrebitmap, %s, %s, %L)',   
                  v_suffix2, v_tagid, v_action, 0, v_fixed_dict_id, v_suffix2, v_action, 0, v_fixed_dict_id);  
end loop;  
end;  
$$ language plpgsql strict;  

2、单表:

create or replace function merge_tags(v_limit int, v_suffix1 int, v_suffix2 int) returns void as $$  
declare  
  v_tagid int;  
  v_action int2;  
  v_dict_id int[];  
  v_offset int;  
  v_fixed_dict_id int[];  
begin  
  select substring(dict_id::text,'(\d+),') into v_offset from t_mapping where suffix=v_suffix2::text;   
  
  for v_tagid, v_action, v_dict_id in   
    execute format   
('  
with tmp as (delete from t_tag_log_%s_%s     -- 查询哪个表  
  where ctid = any ( array (  
    select ctid from t_tag_log_%s_%s order by crt_time limit %s       -- 批量处理N条  
  )) returning *  
)   
select tagid, action, array_agg(dict_id) as dict_id from    
(  
  select row_number() over w1 as rn, *   
  from tmp   
  window w1 as (partition by dict_id, tagid order by crt_time desc)   -- 每个ID,每个标签,取最后一条  
) t where rn=1  
group by tagid, action                                               
',   
v_suffix1, v_suffix2, v_suffix1, v_suffix2, v_limit)    
  
loop  
  select array(select unnest(v_dict_id)-v_offset) into v_fixed_dict_id;    
  -- raise notice '% ,% ,% ,%', v_tagid, v_action, v_dict_id, v_fixed_dict_id;  
  execute format('insert into tag_userbitmap (tagid, bitmap_offset, usrebitmap) values (%s, %s, set_bit_array(''0'', %s, %s, %L))   
                  on conflict (tagid, bitmap_offset)   
                  do update set usrebitmap=set_bit_array(tag_userbitmap.usrebitmap, %s, %s, %L)',   
                  v_suffix2, v_suffix1, v_action, 0, v_fixed_dict_id, v_action, 0, v_fixed_dict_id);  
end loop;  
end;  
$$ language plpgsql strict;  

3、单表和多表的函数接口是一样的,都是调用merge_tags函数,增量更新TAG,不同组合,允许并行调用。

以下QUERY可以并行调用:

select merge_tags(100000, 0, 0);  
select merge_tags(100000, 0, 1);  
select merge_tags(100000, 0, 2);  
select merge_tags(100000, 0, 3);  
.....  
select merge_tags(100000, 63, 0);  
select merge_tags(100000, 63, 1);  
select merge_tags(100000, 63, 2);  
select merge_tags(100000, 63, 3);  

12 查询包含哪些TAG的USEID

这个SQL可以略微修改,包括coalesce,补齐varbit长度到固定长度等。

full outer JOIN 得到最终的varbit。

根据bitmap求dict_id, 再根据dict_id求user_id。

多表:

select bit_posite(t1.usrebitmap||t2.usrebitmap||t3.usrebitmap||t4.usrebitmap, '1', true) 
from tag_userbitmap_0 t1 , ....
where tx.tagid in (....);

或单表:

create aggregate bit_agg (varbit) (sfunc = bitcat, stype=varbit) ;
select bit_posite(bit_and(tag), '1', true) from (
  select bit_agg(usrebitmap) from tag_userbitmap where tagid in (...) group by tagid
) t

详见用法

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

其他

1、维护字典表(删除僵尸用户)

2、同时收缩VARBITX

查询某个用户有哪些标签

这种点查,建议使用以下结构,数组作为标签。与本文相反。

create table t_user_tags(   
  uid int8 primary key,   -- 用户ID  
  tagid int[]             -- 标签ID  
);  

相似案例

《PostgreSQL手机行业经营分析、决策系统设计 - 实时圈选、透视、估算》

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

《基于 阿里云 RDS PostgreSQL 打造实时用户画像推荐系统(varbitx)》

《HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)》

《HTAP数据库 PostgreSQL 场景与性能测试之 31 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(读写大吞吐并测)》

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

《在PostgreSQL中实现update | delete limit - CTID扫描实践 (高效阅后即焚)》

《PostgreSQL 异步消息实践 - Feed系统实时监测与响应(如 电商主动服务) - 分钟级到毫秒级的实现》

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
9天前
|
存储 容灾 安全
在阿里云RDS(Relational Database Service)迁移前准备目标区域选择
在阿里云RDS(Relational Database Service)迁移前准备目标区域选择
16 3
|
12天前
|
监控 NoSQL 关系型数据库
在进行RDS(例如阿里云的RDS)数据迁移后,评估数据一致性
在进行RDS(例如阿里云的RDS)数据迁移后,评估数据一致性
24 3
|
13天前
|
SQL 监控 关系型数据库
规划阿里云RDS跨区迁移业务需求迁移方案设计
规划阿里云RDS跨区迁移业务需求迁移方案设计
21 5
|
13天前
|
存储 弹性计算 关系型数据库
规划阿里云RDS跨区迁移业务需求数据量与迁移时间预估
规划阿里云RDS跨区迁移业务需求数据量与迁移时间预估
12 4
|
13天前
|
存储 运维 关系型数据库
规划阿里云RDS跨区迁移业务需求业务影响分析
规划阿里云RDS跨区迁移业务需求业务影响分析
12 4
|
1天前
|
弹性计算 大数据 测试技术
阿里云服务器价格,2024年最新阿里云服务器活动报价及租用收费标准参考
2024年阿里云服务器租用价格表更新,阿里云服务器价格更新了,不同时期阿里云服务器的租用价格不同,目前阿里云在官网活动中新增加了一款经济型e实例规格的云服务器。
|
1天前
|
弹性计算 大数据 测试技术
阿里云服务器租用价格 2024阿里云服务器新版报价及租用收费标准
阿里云服务器租用价格是多少?阿里云服务器价格由云服务器配置、实例规格、带宽等组成。阿里云服务器分为轻量应用服务器和云服务器ECS,轻量适合个人开发者使用,搭建轻量级的网站、测试环境使用;专业级如大数据、科学计算、高并发网站等需要使用云服务器ECS。2024年阿里云服务器租用价格表出炉!云服务器ECS经济型e实例2核2G、3M固定带宽99元一年、ECS u1实例2核4G、5M固定带宽、80G ESSD Entry盘优惠价格199元一年,轻量应用服务器2核2G3M带宽轻量服务器一年61元、2核4G4M带宽轻量服务器一年165元12个月、2核4G服务器30元3个月,幻兽帕鲁4核16G和8核32G服务
|
1天前
|
弹性计算 Ubuntu Linux
幻兽帕鲁/Palworld2024年阿里云自建服务器教程(新手攻略)
最近,幻兽帕鲁这款游戏在全球范围内掀起了一股热潮,其火爆程度可谓前所未有。短短几天内,同时在线人数已经突破了惊人的200万,这无疑给游戏厂商带来了巨大的压力。由于玩家数量激增,游戏服务器承受了前所未有的负载,导致游戏出现了严重的延时、掉线等问题,给玩家们的游戏体验带来了不小的困扰。本文旨在提供针对新手的《幻兽帕鲁/Palworld》自建主机教程,通过简单易懂的语言和详细的步骤,帮助新手用户快速掌握搭建私人游戏服务器的方法和技巧。
|
1天前
|
弹性计算 大数据 测试技术
2024阿里云服务器租用价格表(CPU/内存/带宽/磁盘收费标准)
阿里云服务器分为轻量应用服务器和云服务器ECS,轻量适合个人开发者使用,搭建轻量级的网站、测试环境使用;专业级如大数据、科学计算、高并发网站等需要使用云服务器ECS。2024年阿里云服务器租用价格表出炉!云服务器ECS经济型e实例2核2G、3M固定带宽99元一年、ECS u1实例2核4G、5M固定带宽、80G ESSD Entry盘优惠价格199元一年,轻量应用服务器2核2G3M带宽轻量服务器一年61元、2核4G4M带宽轻量服务器一年165元12个月、2核4G服务器30元3个月,幻兽帕鲁4核16G和8核32G服务器配置,云服务器ECS可以选择经济型e实例、通用算力u1实例、ECS计算型c7、通
|
1天前
|
弹性计算 大数据 测试技术
阿里云服务器租用多少钱?2024年阿里云服务器租用价格表出炉!
阿里云服务器租用多少钱?2024年阿里云服务器租用价格表出炉!云服务器ECS经济型e实例2核2G、3M固定带宽99元一年、ECS u1实例2核4G、5M固定带宽、80G ESSD Entry盘优惠价格199元一年,轻量应用服务器2核2G3M带宽轻量服务器一年61元、2核4G4M带宽轻量服务器一年165元12个月、2核4G服务器30元3个月,幻兽帕鲁4核16G和8核32G服务器配置,云服务器ECS可以选择经济型e实例、通用算力u1实例、ECS计算型c7、通用型g7、c8i、g8i等企业级实例规格。今天分享阿里云服务器租用费用最新报价:
25 0

相关产品

  • 云数据库 RDS MySQL 版