PgSQL · 应用案例 · 手机行业分析、决策系统设计-实时圈选、透视、估算

本文涉及的产品
云数据库 RDS SQL Server,基础系列 2核4GB
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
云原生数据库 PolarDB 分布式版,标准版 2核8GB
简介: 背景经营分析、决策支持是现代企业的一个让数据发挥有效价值的分析型系统。在各个行业中随处可见,例如共享充电宝中,协助销售了解实时的设备租赁情况,销售业绩。在电商中,协助小二和商户发掘目标用户群体。金融行业中,协助输出国民的存款、消费、贷款的画像。

背景

经营分析、决策支持是现代企业的一个让数据发挥有效价值的分析型系统。

在各个行业中随处可见,例如共享充电宝中,协助销售了解实时的设备租赁情况,销售业绩。在电商中,协助小二和商户发掘目标用户群体。金融行业中,协助输出国民的存款、消费、贷款的画像。

PostgreSQL, Greenplum都是非常适合于经营分析、决策支持的数据库。因为它们具备了一些特性,适合实时的分析透视。(流式计算、合并写入、阅后即焚、GIN倒排索引、varbit类型、列存储、BITMAP合并扫描、HLL估值类型、采样算法等等)。

我也写过很多实际的应用案例,可以参考本文末尾。

经营分析系统的需求大同小异,在手机行业中,以imei或imsi为KEY,每个手机根据它的用户的行为,生成一些属性,针对每个属性,划分出不同的标签,形成了手机用户的画像。再针对画像进行人群的圈选、透视,协助分析。

例如,基于PostgreSQL数组以及GIN索引的设计:

pic

经营分析设计示例

1、目标设计

2、表结构设计

3、属性表

4、标签表

5、标签表索引设计

6、打标签(含新增、更新、删除标签)测试

7、圈选测试

8、透视测试

9、决策设计示例

流式+函数式计算

结构设计

1、手机用户属性表

create table tbl1 (  
  imei text primary key,     -- 手机唯一标识  
  v1 int,        -- 年龄  
  v2 float8,     -- 收入  
  v3 geometry,   -- 住址经纬  
  v4 geometry,   -- 公司经纬  
  v5 char(1),    -- 性别  
  v6 timestamp,  -- 最后活跃时间  
  v7 int2,       -- 每日在线时长  
  v8 int2,       -- 星座  
  v9 text,       -- 其他标签。。。。。  
  ......  
);  

2、标签元数据表

create table tbl2 (  
  tagid int primary key,    -- 标签名  
  desc text,    -- 描述,例如性别,年龄分段,收入分段,区域等等,作为一个标签标识。  
);  

3、标签表

create table tbl3 (  
  imei text primary key,   -- 手机唯一标识  
  tagids int[],            -- 标签数组  
  ins_tags int[],          -- 合并操作需要的中间字段  
  del_tags int[]           -- 合并操作需要的中间字段  
);  
  
create index idx_tbl3_tagids on tbl3 using gin (tagids gin__int_ops);  
或
create index idx_tbl3_tagids on tbl3 using gist (tagids gist__intbig_ops);  
或
create index idx_tbl3_tagids on tbl3 using gist (tagids gist__int_ops);  

4、标签表与属性表实际上可以合一,在透视时,可以避免一次JOIN(降低透视的耗时),但是会引入更新IO放大的问题,因为属性表可能是宽表。

根据实际的性能情况来选择是否合一。

需求与SQL设计

1、圈人

select imei from tbl3 where tagids @> array[标签1, 标签2];  -- 查找包含标签1,标签2的人群。  
  
select imei from tbl3 where tagids && array[标签1, 标签2];  -- 查找包含标签1,标签2中任意一个或多个的人群。  
  
select imei from tbl3 where tagids && array[标签1, 标签2] and tagid @> array[标签3, 标签4];  -- 查找包含标签3,标签4。同时包含标签1,标签2中任意一个或多个的人群。  

2、针对圈出人群的精准透视

select v8,count(*) from tbl1 where  
  imei = any (array(  
                     select imei from tbl3 where tagids @> array[标签1, 标签2]  
             )     )  
group by v8;  

3、新增或追加标签

使用intarray插件,简化数组交、并、差操作。

create extension intarray;  
insert into tbl3 (imei, tagids) values (?, ?[]) on conflict (imei) do update set tagids=tbl3.tagids|excluded.tagids;  

4、删标签

update tbl3 set tagids = tagids - ?[] where imei=?;  

5、更新标签

update tbl3 set tagids = ?[] where imei=?;  

6、批量并行新增、追加、删除、更新标签优化

如果要一次性操作很多条记录(例如1000万条记录),并且有并行的贴标签操作(同一条用户被多个SQL更新)。需要注意两个问题:

6.1 大事务导致膨胀的问题,建议分段操作。

6.2 行锁冲突问题,建议新增(插入),然后合并到标签表。

优化方法,

实现标签最终一致性。

将直接增、删、改标签表,改成写行为日志tag_log,采用任务调度,批量合并到标签表:

create table tag_log (  
  imei text,    -- 手机唯一标识  
  action text,  -- insert, delete  表示增加、删除标签  (更新需求应该没有,如有,直接到标签表操作)  
  tagids int[], -- 标签IDs  
  crt_time timestamp default clock_timestamp()   -- 时间  
);  
  
create index idx_tag_log_1 on tag_log (crt_time);  
  
-- 16个分区表
do language plpgsql $$
declare
begin
  for i in 0..15 loop
    execute format('create table tag_log%s (like tag_log including all) inherits(tag_log)', i);
  end loop;
end;
$$;

串行任务,阅后即焚(假设-99999999是一个永远不存在的TAGID)

-- CTE语法,支持阅后即焚的批量合并方法  
with tmp as (delete from tag_log where ctid = any ( array (  
  select ctid from tag_log order by crt_time limit 10000  -- 按时序,批量取1万条  
  )) returning * )  
, tmp1 as (select imei,  
             uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,  
             uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags  
           from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)  
insert into tbl3 (imei, tagids, ins_tags, del_tags)  
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1  
 on conflict (imei) do update set tagids=((tbl3.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags;  

并行任务,阅后即焚

例如开启16个并行  
  
abs(mod(hashtext(imei), 16))=?  
-- CTE语法,支持阅后即焚的批量合并方法  
with tmp as (delete from tag_log where ctid = any ( array (  
  select ctid from tag_log where abs(mod(hashtext(imei), 16))=0 order by crt_time limit 10000  -- 按时序,批量取1万条,按HASH并行  
  )) returning * )  
, tmp1 as (select imei,  
             uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,  
             uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags  
           from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)  
insert into tbl3 (imei, tagids, ins_tags, del_tags)  
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1  
 on conflict (imei) do update set tagids=((tbl3.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags;  

写成函数,方便调用

create or replace function consume_tag_log(mo int, mov int, lim int) returns void as $$  
declare  
begin  
  execute format($_$with tmp as (delete from tag_log where ctid = any ( array (  
  select ctid from tag_log where abs(mod(hashtext(imei), %s))=%s order by crt_time limit %s  
  )) returning * )  
, tmp1 as (select imei,  
             uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,  
             uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags  
           from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)  
insert into tbl3 (imei, tagids, ins_tags, del_tags)  
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1  
 on conflict (imei) do update set tagids=((tbl3.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags$_$,  
 mo, mov, lim);  
end;  
$$ language plpgsql strict;  
  
  
select consume_tag_log(16,0,10000);   -- 并行处理  
select consume_tag_log(16,1,10000);  
.....  
select consume_tag_log(16,15,10000);  
create or replace function consume_tag_log(lim int) returns void as $$  
declare  
begin  
  execute format($_$with tmp as (delete from tag_log where ctid = any ( array (  
  select ctid from tag_log order by crt_time limit %s  
  )) returning * )  
, tmp1 as (select imei,  
             uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,  
             uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags  
           from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)  
insert into tbl3 (imei, tagids, ins_tags, del_tags)  
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1  
 on conflict (imei) do update set tagids=((tbl3.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags$_$,  
 lim);  
end;  
$$ language plpgsql strict;  
  
  
select consume_tag_log(10000);  -- 每次处理1万条  

创建调度任务,执行消费函数调度即可。

阅后即焚的处理速度,每秒 百万行。

1、标签取值范围5万,正态分布

pic

2、多表批量写入函数

create or replace function ins(
  imei text,
  tagids int[]
) returns void as $$
declare
  suffix int := abs(mod(hashtext(imei),16)); 
begin
  execute format($_$insert into tag_log%s values ('%s', 'insert', '%s'::int[])$_$, suffix, imei, tagids);
end;
$$ language plpgsql strict;

3、多表批量消费

标签表分表

do language plpgsql $$
declare
begin
  for i in 0..15 loop
    execute format('create table tbl3_%s (like tbl3 including all) inherits(tbl3)', i);
  end loop;
end;
$$;

多表批量消费

CREATE OR REPLACE FUNCTION public.consume_tag_log(suffix int, lim integer)
 RETURNS void
 LANGUAGE plpgsql
 STRICT
AS $function$
declare
begin
  execute format($_$with tmp as (delete from tag_log%s where ctid = any ( array (  
  select ctid from tag_log%s order by crt_time limit %s  -- 按时序,批量取1万条,按HASH并行
  )) returning * )   
, tmp1 as (select imei, 
             uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,   
     uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags
   from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)  
insert into tbl3_%s (imei, tagids, ins_tags, del_tags) 
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1 
 on conflict (imei) do update set tagids=((tbl3_%s.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags$_$,
 suffix, suffix, lim, suffix, suffix);
end;
$function$;

4、数据写入压测脚本

vi test.sql  
  
\set tag1 random_gaussian(1, 50000, 20)  
\set tag2 random_gaussian(1, 50000, 20)  
\set tag3 random_gaussian(1, 50000, 20)  
\set tag4 random_gaussian(1, 50000, 20)  
\set tag5 random_gaussian(1, 50000, 20)  
\set tag6 random_gaussian(1, 50000, 20)  
\set tag7 random_gaussian(1, 50000, 20)  
\set tag8 random_gaussian(1, 50000, 20)  
\set imei random(1,1000000000)  
select ins(:imei, (array[:tag1,:tag2,:tag3,:tag4,:tag5,:tag6,:tag7,:tag8])::int[]);
  
nohup pgbench -M prepared -n -r -P 1 -f ./test.sql -c 28 -j 28 -T 3000 >./tag.log 2>&1 &  

5、数据消费,并行调度

用秒杀技术实现并行调度,避免单个HASH被重复调用。

6、压测结果

写入速度

单条单步写入,约 14.3万 行/s  
  
改成多表批量写入,可以提高到100万+ 行/s  

消费速度

单表并行批量消费,约 25.5万 行/s  
  
改成多表并行批量消费,可以提高到 100万+ 行/s  

查询速度,毫秒级

postgres=# explain (analyze,verbose,timing,costs,buffers) select count(imei) from tbl3 where tagids @> (array[25281,25288])::int[];
                                                           QUERY PLAN                                                           
--------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=224.50..224.51 rows=1 width=8) (actual time=2.745..2.746 rows=1 loops=1)
   Output: count(imei)
   Buffers: shared hit=193
   ->  Bitmap Heap Scan on public.tbl3  (cost=218.44..224.49 rows=5 width=33) (actual time=2.716..2.738 rows=9 loops=1)
         Output: imei, tagids, ins_tags, del_tags
         Recheck Cond: (tbl3.tagids @> '{25281,25288}'::integer[])
         Heap Blocks: exact=9
         Buffers: shared hit=193
         ->  Bitmap Index Scan on idx_tbl3_tagids  (cost=0.00..218.44 rows=5 width=0) (actual time=2.707..2.707 rows=9 loops=1)
               Index Cond: (tbl3.tagids @> '{25281,25288}'::integer[])
               Buffers: shared hit=184
 Planning time: 0.165 ms
 Execution time: 2.797 ms
(13 rows)

除了以上基于数组、GIN索引的设计,PostgreSQL还有一些技术,可以用在经营分析系统。

技术1 实时透视 - 技术之 - 流式统计

相关实践学习
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
SQL 分布式计算 数据可视化
滴滴出行大数据数仓实战
滴滴出行大数据数仓实战
441 0
滴滴出行大数据数仓实战
|
4月前
|
资源调度 供应链 算法
一文讲清智能排产系统搭建全攻略
本文将聚焦智能排产系统的落地路径:先阐释其“整合订单、工艺、资源等要素,实现自动化排产方案生成”的核心功能,再从基础数据搭建、智能排产工作台配置、生产报工闭环、经营看板分析四大模块,详解系统搭建的关键逻辑与实施步骤,为制造企业提供从排产混乱到精益协同的系统化解决方案。
|
分布式计算 Java Hadoop
MapReduce编程(一) Intellij Idea配置MapReduce编程环境
介绍如何在Intellij Idea中通过创建maven工程配置MapReduce的编程环境。 一、软件环境 我使用的软件版本如下: Intellij Idea 2017.1 Maven 3.3.9 Hadoop伪分布式环境( 安装教程可参考这里) 二、创建maven工程 打开Idea,file->new->Project,左侧面板选择maven工程。
2792 0
|
算法 数据处理 开发工具
Android平台RTSP|RTMP播放器如何回调YUV或RGB数据
在开发Android平台上的RTSP或RTMP播放器时,开发者不仅追求低延迟播放,还希望获取解码后的视频数据(如YUV或RGB格式),以便进行视觉算法分析。使用大牛直播SDK中的SmartPlayer,可在确保播放流畅的同时,通过设置外部渲染器(`SmartPlayerSetExternalRender`)来高效地回调原始视频数据。例如,对于RGBA数据,需实现`NTExternalRender`接口,并重写相关方法以处理数据和尺寸变化。同样地,对于I420(YUV)数据,也需要相应地实现接口以满足需求。这种方式使得开发者能在不影响常规播放功能的情况下,进行定制化的视频处理任务。
184 1
|
存储 安全 算法
Google 如何写设计文档
Google 如何写设计文档
607 0
|
关系型数据库 数据库 PostgreSQL
postgresql | 数据库| 生成2000W条的简单测试表
postgresql | 数据库| 生成2000W条的简单测试表
396 0
|
Web App开发 存储 数据可视化
VisualVM【实践 01】工具VisualVM下载使用及插件Visual GC示例说明HashMap初始化容量initialCapacity的影响(源码及visualvm_215.zip分享)
VisualVM【实践 01】工具VisualVM下载使用及插件Visual GC示例说明HashMap初始化容量initialCapacity的影响(源码及visualvm_215.zip分享)
300 0
|
消息中间件 NoSQL Java
RabbitMQ死信队列实战——解决订单超时未支付
RabbitMQ死信队列实战——解决订单超时未支付
1038 0
RabbitMQ死信队列实战——解决订单超时未支付
|
存储 Java 程序员
如何写好技术文档——来自Google十多年的文档经验
如何写好技术文档——来自Google十多年的文档经验
698 2
如何写好技术文档——来自Google十多年的文档经验
|
新零售 存储 供应链
严选库存中心设计实践
严选库存中心设计实践
578 0
下一篇
开通oss服务