PostgreSQL 无会话、有会话模式 - 客服平均响应速度(RT)实时计算实践(窗口查询\流计算)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: 标签PostgreSQL , 无会话 , 客服响应速度 , 触发器 , rule , 窗口查询背景通常客服系统可能存在一对多,多对多的情况。例如,我们在使用淘宝时,与店家交流时,你根本不知道后面的小二是一个人还是多个人共用一个账号,还有可能是多个人使用了多个账号但是对消费者只看到一个。

标签

PostgreSQL , 无会话 , 客服响应速度 , 触发器 , rule , 窗口查询


背景

通常客服系统可能存在一对多,多对多的情况。

例如,

我们在使用淘宝时,与店家交流时,你根本不知道后面的小二是一个人还是多个人共用一个账号,还有可能是多个人使用了多个账号但是对消费者只看到一个。

例如:

小二(n)账号 -> 统一对外账号 -> 消费者

还有的情况是一个小二为多个消费者服务:

小二账号 -> 统一对外账号 -> 消费者(n)

小二重要的KPI之一是响应速度,因为这直接反应到消费者的感受上。如果消费者一个问题,很久没人回复,可能就直接关闭页面,更换其他商家了。

那么如何统计响应速度呢?

通常来说,需要从消费者维度看待响应速度,因为一个问题可能被多个小二回复,也可能被1个小二回复,这种情况下,应该统计第一反馈时间作为响应时间。

另一方面,如果系统没有会话机制的话,统计起来会比较麻烦。(并且,一个真实的会话里面的若干次交互,可能统计时会被抽象成若干的“虚拟会话”)

我们来看个例子。

1 无会话模式的响应速度统计

假设数据以TS字段顺序到达为前提(通常这种场景,按TS到达的可能性较大,或者你可以使用clock_timestamp()来作为这个时间,可能性就更大了。),后面会讲如果不这样有什么问题,以及解决方案。

无会话模式,适合于客户发起消息后,后台任意分配一个客服给他(或者分配一个客服池子给他),第一时间响应他的可以是任意客服。

1、客服、客户交谈表(只展示重要字段)

create table tbl (    
  a int not null,   -- 客服ID    
  b int not null,   -- 客户ID    
  ts timestamp not null,   -- 消息时间    
  direct boolean not null  -- 消息方向 true: a->b, false: b->a    
);    

2、客服的平均响应时间

一个客户的最早发言时间,下一时刻任意客服最早回复这位客户的回复时间。(中间部分略过)

例如

1, 2, 0001, false   -- 客户2给客服1发信息时间,作为一次虚拟会话的开始时间    
100, 2, 0003, false   -- 客户2给客服100发信息时间,如果比下一条先到达,这次虚拟会话 ,按这种方法将计算不到。    
22, 2, 0002, true   -- 客服22给客户2发信息时间,作为一次虚拟会话的最早响应时间    
1, 2, 0005, true   -- 客服1给客户2发信息时间    

3、实时计算解决这个问题

结果表结构

create table tbl_result (    
  b int not null,  -- 客户ID    
  b_ts timestamp,  -- 客户发起一次虚拟会话的最早时间    
  a int default -1,  -- 最先响应这次虚拟会话的客服ID, -1表示还没人响应    
  a_ts timestamp  -- 最先响应这次虚拟会话的时间    
);    
    
-- 添加约束,当客户的虚拟会话没有完结时,不计新虚拟会话。      
-- 保证同一时刻,同一客户,只有一个未完结的虚拟会话。    
alter table tbl_result add constraint uk exclude (b with =) where (a=-1);    

4、实时处理逻辑

when insert into tbl

if    
b -> a 逻辑(客户发给客服)    
    
select 1 from tbl_result where b=? and a = -1;    
if not found then     
    insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing;    
    -- update set b_ts=excluded.b_ts     
    -- where tbl_result.b_ts > excluded.b_ts;  -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。    
-- else    
  -- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录    
end if;    
    
if    
a -> b 逻辑(客服发给客户)    
    
select 1 from tbl_result where b=? and a = -1;    
if found then    
update tbl_result set a=? , a_ts=? where b=? and a = -1 and NEW.ts >= b_ts;    
-- else    
  -- 说明已有人回复,不需要更新    
end if;    

5、tbl的insert trigger函数

create or replace function tb() returns trigger as $$    
declare    
begin    
  if not NEW.direct then  -- b -> a 逻辑(客户发给客服)    
    perform 1 from tbl_result where b=NEW.b and a = -1;    
    if not found then     
      insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing;    
      -- update set b_ts=excluded.b_ts     
      -- where tbl_result.b_ts > excluded.b_ts;  -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。    
    -- else    
      -- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录    
    end if;    
  else  -- a -> b 逻辑(客服发给客户)    
    perform 1 from tbl_result where b=NEW.b and a = -1;    
    if found then    
      update tbl_result set a=NEW.a , a_ts=NEW.ts where b=NEW.b and a = -1 and NEW.ts >= b_ts;    
    -- else    
      -- 说明已有人回复,不需要更新    
    end if;    
  end if;    
  return NULL;    
end;    
$$ language plpgsql strict;    

创建触发器

create trigger tg0 after insert on tbl for each row execute procedure tb();    

6、写入压测

假设有100个客服    
100万个客户    
使用clock_timestamp生成TS,确保数据按一定时序顺序写入。    
    
vi test.sql    
    
\set a random(1,100)    
\set b random(1,1000000)    
\set bo random(0,1)    
insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120    
postgres=# select count(*) from tbl;    
  count       
----------    
 19805266    
(1 row)    
    
postgres=# select count(*) from tbl_result;    
  count      
---------    
 5202622    
(1 row)    

7、算法校验,正确

postgres=# select * from tbl where b=1 order by ts limit 10;    
 a  | b |             ts             | direct     
----+---+----------------------------+--------    
 25 | 1 | 2018-08-15 09:43:22.862526 | f    
 17 | 1 | 2018-08-15 09:43:25.180255 | f    
 63 | 1 | 2018-08-15 09:43:29.901536 | t    
  3 | 1 | 2018-08-15 09:43:31.906753 | t    
 38 | 1 | 2018-08-15 09:43:52.035444 | f    
 24 | 1 | 2018-08-15 09:43:52.679127 | f    
 69 | 1 | 2018-08-15 09:43:54.855426 | t    
 44 | 1 | 2018-08-15 09:44:05.735922 | t    
 75 | 1 | 2018-08-15 09:44:10.555001 | t    
 17 | 1 | 2018-08-15 09:44:10.565798 | f    
(10 rows)    
    
postgres=# select * from tbl_result where b=1 order by b_ts limit 10;    
 b |            b_ts            | a  |            a_ts                
---+----------------------------+----+----------------------------    
 1 | 2018-08-15 09:43:22.862526 | 63 | 2018-08-15 09:43:29.901536    
 1 | 2018-08-15 09:43:52.035444 | 69 | 2018-08-15 09:43:54.855426    
 1 | 2018-08-15 09:44:10.565798 | 86 | 2018-08-15 09:44:33.090099    
 1 | 2018-08-15 09:44:33.815634 | 63 | 2018-08-15 09:44:45.737907    
 1 | 2018-08-15 09:44:52.277396 | 45 | 2018-08-15 09:44:59.006899    
 1 | 2018-08-15 09:45:19.288931 | -1 |     
(6 rows)    

性能,写入吞吐达到16.5万行/s。

transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 32    
number of threads: 32    
duration: 120 s    
number of transactions actually processed: 19805266    
latency average = 0.194 ms    
latency stddev = 0.221 ms    
tps = 165043.068862 (including connections establishing)    
tps = 165056.827167 (excluding connections establishing)    
statement latencies in milliseconds:    
         0.001  \set a random(1,100)    
         0.000  \set b random(1,1000000)    
         0.000  \set bo random(0,1)    
         0.191  insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    

2 有会话模式的响应速度统计

假设数据以TS字段顺序到达为前提(通常这种场景,按TS到达的可能性较大,或者你可以使用clock_timestamp()来作为这个时间,可能性就更大了。),后面会讲如果不这样有什么问题,以及解决方案。

相比前面的不同之处,a,b一一对应,即有会话模式。

客户1发给客服2    
    
那么就只看客服2第一次响应客户1的时间。    

有会话模式,适合于客户发起消息后,后台分配一个客服给他,第一时间响应他的必须是这个分配的客服。

稍微修改前面的代码即可。

1、客服、客户交谈表(只展示重要字段)

create table tbl (    
  a int not null,   -- 客服ID    
  b int not null,   -- 客户ID    
  ts timestamp not null,   -- 消息时间    
  direct boolean not null  -- 消息方向 true: a->b, false: b->a    
);    

2、客服的平均响应时间

一个客户的最早发言时间,下一时刻对应客服最早回复这位客户的回复时间。(中间部分略过)

例如

1, 2, 0001, false   -- 客户2给客服1发信息时间,作为一次虚拟会话的开始时间    
1, 2, 0003, false   -- 客户2给客服1发信息时间。    
1, 2, 0002, true   -- 客服1给客户2发信息时间,作为一次虚拟会话的最早响应时间    
1, 2, 0005, true   -- 客服1给客户2发信息时间    

3、实时计算解决这个问题

结果表结构

create table tbl_result (    
  b int not null,  -- 客户ID    
  b_ts timestamp,  -- 客户发起一次虚拟会话的最早时间    
  a int,           -- 客户给谁发起了这次会话    
  rsp_a int default -1,  -- 响应这次虚拟会话的客服ID, -1表示没人响应    
  a_ts timestamp  -- 最先响应这次虚拟会话的时间    
);    
    
-- 添加约束,当客户的虚拟会话没有完结时,不计新虚拟会话。      
-- 保证同一时刻,同一客户,与同一客服,只有一个未完结的虚拟会话。    
alter table tbl_result add constraint uk exclude (b with =, a with =) where (rsp_a=-1);    

4、实时处理逻辑

when insert into tbl

if    
b -> a 逻辑(客户发给客服)    
    
select 1 from tbl_result where b=? and a=? and rsp_a = -1;    
if not found then     
    insert into tbl_result (b,b_ts,a) values (NEW.b,NEW.ts,NEW.a) on conflict ON CONSTRAINT uk do nothing;    
    -- update set b_ts=excluded.b_ts     
    -- where tbl_result.b_ts > excluded.b_ts;  -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。    
-- else    
  -- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录    
end if;    
    
if    
a -> b 逻辑(客服发给客户)    
    
select 1 from tbl_result where b=? and a=? and rsp_a = -1;    
if found then    
update tbl_result set rsp_a=? , a_ts=? where b=? and a=? and rsp_a = -1 and NEW.ts >= b_ts;    
-- else    
  -- 说明已有人回复,不需要更新    
end if;    

5、tbl的insert trigger函数

create or replace function tb() returns trigger as $$    
declare    
begin    
  if not NEW.direct then  -- b -> a 逻辑(客户发给客服)    
    perform 1 from tbl_result where b=NEW.b and a=NEW.a and rsp_a = -1;    
    if not found then     
      insert into tbl_result (b,b_ts,a) values (NEW.b,NEW.ts,NEW.a) on conflict ON CONSTRAINT uk do nothing;    
      -- update set b_ts=excluded.b_ts     
      -- where tbl_result.b_ts > excluded.b_ts;  -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。    
    -- else    
      -- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录    
    end if;    
  else  -- a -> b 逻辑(客服发给客户)    
    perform 1 from tbl_result where b=NEW.b and a=NEW.a and rsp_a = -1;    
    if found then    
      update tbl_result set rsp_a=NEW.a , a_ts=NEW.ts where b=NEW.b and a=NEW.a and rsp_a = -1 and NEW.ts >= b_ts;    
    -- else    
      -- 说明已有人回复,不需要更新    
    end if;    
  end if;    
  return NULL;    
end;    
$$ language plpgsql strict;    

创建触发器

create trigger tg0 after insert on tbl for each row execute procedure tb();    

6、写入压测

假设有10个客服    
1万个客户    
使用clock_timestamp生成TS,确保数据按一定时序顺序写入。    
    
vi test.sql    
    
\set a random(1,10)    
\set b random(1,10000)    
\set bo random(0,1)    
insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120    
postgres=# select count(*) from tbl;    
  count       
----------    
 19771381    
(1 row)    
    
postgres=# select count(*) from tbl_result;    
  count      
---------    
 4967253    
(1 row)    

7、算法校验,正确

postgres=# select * from tbl where b=1 and a=9 order by ts limit 30;    
 a | b |             ts             | direct     
---+---+----------------------------+--------    
 9 | 1 | 2018-08-15 10:08:20.82439  | f    
 9 | 1 | 2018-08-15 10:08:21.341471 | f    
 9 | 1 | 2018-08-15 10:08:23.084166 | f    
 9 | 1 | 2018-08-15 10:08:23.160162 | f    
 9 | 1 | 2018-08-15 10:08:23.596106 | f    
 9 | 1 | 2018-08-15 10:08:23.735911 | f    
 9 | 1 | 2018-08-15 10:08:23.869232 | f    
 9 | 1 | 2018-08-15 10:08:25.379688 | t    
 9 | 1 | 2018-08-15 10:08:26.471402 | t    
 9 | 1 | 2018-08-15 10:08:26.622047 | t    
 9 | 1 | 2018-08-15 10:08:26.640313 | t    
 9 | 1 | 2018-08-15 10:08:27.28104  | f    
 9 | 1 | 2018-08-15 10:08:27.285187 | f    
 9 | 1 | 2018-08-15 10:08:27.992076 | t    
 9 | 1 | 2018-08-15 10:08:28.233072 | t    
 9 | 1 | 2018-08-15 10:08:28.590125 | t    
 9 | 1 | 2018-08-15 10:08:29.6004   | t    
 9 | 1 | 2018-08-15 10:08:30.058747 | f    
 9 | 1 | 2018-08-15 10:08:30.114936 | t    
 9 | 1 | 2018-08-15 10:08:30.237846 | f    
 9 | 1 | 2018-08-15 10:08:30.468956 | t    
 9 | 1 | 2018-08-15 10:08:31.904644 | t    
 9 | 1 | 2018-08-15 10:08:32.092077 | t    
 9 | 1 | 2018-08-15 10:08:32.407465 | t    
 9 | 1 | 2018-08-15 10:08:32.530952 | f    
 9 | 1 | 2018-08-15 10:08:32.991299 | f    
 9 | 1 | 2018-08-15 10:08:33.567598 | f    
 9 | 1 | 2018-08-15 10:08:33.726376 | f    
 9 | 1 | 2018-08-15 10:08:33.734359 | f    
 9 | 1 | 2018-08-15 10:08:34.288767 | f    
(30 rows)    
    
postgres=# select * from tbl_result where b=1 and a=9 order by b_ts limit 10;    
 b |            b_ts            | a | rsp_a |            a_ts                
---+----------------------------+---+-------+----------------------------    
 1 | 2018-08-15 10:08:20.82439  | 9 |     9 | 2018-08-15 10:08:25.379688    
 1 | 2018-08-15 10:08:27.28104  | 9 |     9 | 2018-08-15 10:08:27.992076    
 1 | 2018-08-15 10:08:30.058747 | 9 |     9 | 2018-08-15 10:08:30.114936    
 1 | 2018-08-15 10:08:30.237846 | 9 |     9 | 2018-08-15 10:08:30.468956    
 1 | 2018-08-15 10:08:32.530952 | 9 |     9 | 2018-08-15 10:08:34.749098    
 1 | 2018-08-15 10:08:35.615081 | 9 |     9 | 2018-08-15 10:08:35.681585    
 1 | 2018-08-15 10:08:35.689469 | 9 |     9 | 2018-08-15 10:08:37.099554    
 1 | 2018-08-15 10:08:40.70679  | 9 |     9 | 2018-08-15 10:08:40.80081    
 1 | 2018-08-15 10:08:40.892459 | 9 |     9 | 2018-08-15 10:08:44.732971    
 1 | 2018-08-15 10:08:45.685787 | 9 |     9 | 2018-08-15 10:08:46.301875    
(10 rows)    

性能,写入吞吐达到16.5万行/s。

transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 32    
number of threads: 32    
duration: 120 s    
number of transactions actually processed: 19771381    
latency average = 0.194 ms    
latency stddev = 0.222 ms    
tps = 164760.717898 (including connections establishing)    
tps = 164774.989399 (excluding connections establishing)    
statement latencies in milliseconds:    
         0.001  \set a random(1,10)    
         0.000  \set b random(1,10000)    
         0.000  \set bo random(0,1)    
         0.192  insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    

看似问题解决了吗?

3 统计算法问题与解决办法

前面都是假设数据按TS到达的情况(使用clock_timestamp生成ts还是比较靠谱的),如果数据完全不按TS到达,会出现什么问题么?

1、如果不按顺序到达,会话的发起时间、第一响应时间可能无法得到正确结果

因为一旦触发生成tbl_result后,后面进来的数据无法修正前面的错误。

2、允许一定时间的延迟,同时容忍一定的错误率的情况下。比如每小时消费前一小时的数据,中间预留1小时的缓冲时间,降低错误率:

2.1、按时间区间,延迟消费适当解决以上问题。

单线程消费,统计。

with tmp as (    
delete from tbl where ctid = any(array(    
select ctid from tbl where     
  ts < now()-interval '1 hour'     
  order by ts limit 10000    
))    
returning *    
) select * from tmp     
order by ts;   

然后,按顺序消费。

2.2、按时间区间,延迟并行消费,解决大数据量的问题。例如按客户ID,HASH,并行消费。

多线程(每个HASH一个线程),消费,统计。

create index idx_tbl_mod_32 on tbl (abs(mod(hashint4(b), 32)), ts);    
    
with tmp as (    
delete from tbl where ctid = any(array(    
select ctid from tbl where     
  ts < now()-interval '1 hour'     
  and    
  abs(mod(hashint4(b), 32))=0  -- hash 并行    
  order by ts limit 10000    
))    
returning *    
) select * from tmp     
order by ts;    

然后,按顺序消费。

例子1

以第一种场景(无会话状态)为例。延迟批量消费的方法生成最终数据。

1、会话表

create table tbl (    
  a int not null,   -- 客服ID    
  b int not null,   -- 客户ID    
  ts timestamp not null,   -- 消息时间    
  direct boolean not null  -- 消息方向 true: a->b, false: b->a    
);    
    
create index idx_tbl_ts on tbl(ts);    

2、统计结果表

create table tbl_result (    
  b int not null,  -- 客户ID    
  b_ts timestamp,  -- 客户发起一次虚拟会话的最早时间    
  a int default -1,  -- 最先响应这次虚拟会话的客服ID, -1表示还没人响应    
  a_ts timestamp  -- 最先响应这次虚拟会话的时间    
);    
    
-- 添加约束,当客户的虚拟会话没有完结时,不计新虚拟会话。      
-- 保证同一时刻,同一客户,只有一个未完结的虚拟会话。    
alter table tbl_result add constraint uk exclude (b with =) where (a=-1);    

3、中间会话表(可以不落地,只顺序计算)。

create table tbl_mid (    
  a int not null,   -- 客服ID    
  b int not null,   -- 客户ID    
  ts timestamp not null,   -- 消息时间    
  direct boolean not null  -- 消息方向 true: a->b, false: b->a    
);    

4、中间会话表触发器

(before 触发器 return null(不落地,只顺序计算))

(after 触发器 return null(落地))

create or replace function tb() returns trigger as $$    
declare    
begin    
  if not NEW.direct then  -- b -> a 逻辑(客户发给客服)    
    perform 1 from tbl_result where b=NEW.b and a = -1;    
    if not found then     
      insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing;    
      -- update set b_ts=excluded.b_ts     
      -- where tbl_result.b_ts > excluded.b_ts;  -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。    
    -- else    
      -- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录    
    end if;    
  else  -- a -> b 逻辑(客服发给客户)    
    perform 1 from tbl_result where b=NEW.b and a = -1;    
    if found then    
      update tbl_result set a=NEW.a , a_ts=NEW.ts where b=NEW.b and a = -1 and NEW.ts >= b_ts;    
    -- else    
      -- 说明已有人回复,不需要更新    
    end if;    
  end if;    
  return NULL;    
end;    
$$ language plpgsql strict;    
create trigger tg0 after insert on tbl_mid for each row execute procedure tb();    

5、写入大批量数据,由于触发器转移到了中间表,所以写入吞吐达到了接近29万行/s。

假设有100个客服    
100万个客户    
使用clock_timestamp生成TS,确保数据按一定时序顺序写入。    
    
vi test.sql    
    
\set a random(1,100)    
\set b random(1,1000000)    
\set bo random(0,1)    
insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120    
    
transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 32    
number of threads: 32    
duration: 120 s    
number of transactions actually processed: 34403943    
latency average = 0.112 ms    
latency stddev = 0.229 ms    
tps = 286698.048259 (including connections establishing)    
tps = 286718.916176 (excluding connections establishing)    
statement latencies in milliseconds:    
         0.001  \set a random(1,100)    
         0.000  \set b random(1,1000000)    
         0.000  \set bo random(0,1)    
         0.109  insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    
postgres=# select count(*) from tbl;    
  count       
----------    
 19805266    
(1 row)    
    
postgres=# select count(*) from tbl_result;    
  count      
---------    
 5202622    
(1 row)    

6、单线程消费,一次消费100万行,速度约每秒6万。

with tmp as (    
delete from tbl where ctid = any(array(    
select ctid from tbl where     
  ts < now()-interval '1 min'  -- 测试时改成了消费1分钟前的数据    
  order by ts limit 1000000    
))    
returning *    
)     
insert into tbl_mid     
select * from tmp     
order by ts;    
    
Time: 16532.939 ms (00:16.533)    

7、算法校验,正确

postgres=# select * from tbl_mid where b=2 order by ts limit 10;    
 a  | b |             ts             | direct     
----+---+----------------------------+--------    
 10 | 2 | 2018-08-15 10:24:58.538558 | t    
 25 | 2 | 2018-08-15 10:25:00.585426 | f    
 62 | 2 | 2018-08-15 10:25:04.2633   | f    
 45 | 2 | 2018-08-15 10:25:04.406764 | t    
(4 rows)    
    
postgres=# select * from tbl_result where b=2 order by b_ts limit 10;    
 b |            b_ts            | a  |            a_ts                
---+----------------------------+----+----------------------------    
 2 | 2018-08-15 10:25:00.585426 | 45 | 2018-08-15 10:25:04.406764    
(1 row)    

消费性能,单线程吞吐达到6万行/s。

with tmp as (    
delete from tbl where ctid = any(array(    
select ctid from tbl where     
  ts < now()-interval '1 min'  -- 测试时改成了消费1分钟前的数据    
  order by ts limit 1000000    
))    
returning *    
)     
insert into tbl_mid     
select * from tmp     
order by ts;    
    
Time: 16532.939 ms (00:16.533)    

消费节奏:

1、消费    
2、VACUUM tbl;    
3、消费    
loop;    

例子2

以第一种场景(无会话状态)为例。延迟批量统计的方法生成最终数据。(不消费(delete)已有数据)

1、会话表

create table tbl (    
  a int not null,   -- 客服ID    
  b int not null,   -- 客户ID    
  ts timestamp not null,   -- 消息时间    
  direct boolean not null  -- 消息方向 true: a->b, false: b->a    
);    
    
create index idx_tbl_ts on tbl(ts);    
-- 也可以使用brin索引  
-- create index idx_tbl_ts on tbl using brin(ts);    

2、统计结果表

create table tbl_result (    
  b int not null,  -- 客户ID    
  b_ts timestamp,  -- 客户发起一次虚拟会话的最早时间    
  a int default -1,  -- 最先响应这次虚拟会话的客服ID, -1表示还没人响应    
  a_ts timestamp  -- 最先响应这次虚拟会话的时间    
);    
    
-- 添加约束,当客户的虚拟会话没有完结时,不计新虚拟会话。      
-- 保证同一时刻,同一客户,只有一个未完结的虚拟会话。    
alter table tbl_result add constraint uk exclude (b with =) where (a=-1);    

3、中间会话表(可以不落地,只顺序计算)。

create table tbl_mid (    
  a int not null,   -- 客服ID    
  b int not null,   -- 客户ID    
  ts timestamp not null,   -- 消息时间    
  direct boolean not null  -- 消息方向 true: a->b, false: b->a    
);    

4、中间会话表触发器

(before 触发器 return null(不落地,只顺序计算))

create or replace function tb() returns trigger as $$    
declare    
begin    
  if not NEW.direct then  -- b -> a 逻辑(客户发给客服)    
    perform 1 from tbl_result where b=NEW.b and a = -1;    
    if not found then     
      insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing;    
      -- update set b_ts=excluded.b_ts     
      -- where tbl_result.b_ts > excluded.b_ts;  -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。    
    -- else    
      -- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录    
    end if;    
  else  -- a -> b 逻辑(客服发给客户)    
    perform 1 from tbl_result where b=NEW.b and a = -1;    
    if found then    
      update tbl_result set a=NEW.a , a_ts=NEW.ts where b=NEW.b and a = -1 and NEW.ts >= b_ts;    
    -- else    
      -- 说明已有人回复,不需要更新    
    end if;    
  end if;    
  return NULL;    
end;    
$$ language plpgsql strict;    
create trigger tg0 before insert on tbl_mid for each row execute procedure tb();    

5、写入大批量数据,由于触发器转移到了中间表,所以写入吞吐达到了接近29万行/s。

假设有100个客服    
100万个客户    
使用clock_timestamp生成TS,确保数据按一定时序顺序写入。    
    
vi test.sql    
    
\set a random(1,100)    
\set b random(1,1000000)    
\set bo random(0,1)    
insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120    
    
transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 32    
number of threads: 32    
duration: 120 s    
number of transactions actually processed: 34403943    
latency average = 0.112 ms    
latency stddev = 0.229 ms    
tps = 286698.048259 (including connections establishing)    
tps = 286718.916176 (excluding connections establishing)    
statement latencies in milliseconds:    
         0.001  \set a random(1,100)    
         0.000  \set b random(1,1000000)    
         0.000  \set bo random(0,1)    
         0.109  insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    
postgres=# select count(*) from tbl;    
  count       
----------    
 19805266    
(1 row)    
    
postgres=# select count(*) from tbl_result;    
  count      
---------    
 5202622    
(1 row)    

6、单线程读取,统计,例如每次读取一个小时的数据(定义清楚边界,连续消费,同时避免并发、或重复消费,或者在写统计结果时做到幂等,不用担心重复消费)。

创建一张消费记录表,统计已消费的时间间隔。

create table tbl_record (ts1 timestamp, ts2 timestamp);  

下次消费时,参考上次已消费的时间。

with tmp as (  
  insert into tbl_record (ts1, ts2) values ('2018-01-01 12:00:00', '2018-01-01 13:00:00')  -- 记录当前消费窗口  
)  
insert into tbl_mid     
select * from tbl  
where ts >= '2018-01-01 12:00:00' and ts < '2018-01-01 13:00:00'  -- 上一个小时为窗口 (当前时间 大于等于 '2018-01-01 14:00:00')   
order by ts;  -- 无会话模式    
    
Time: 16532.939 ms (00:16.533)    

7、算法校验,正确

postgres=# select * from tbl_mid where b=2 order by ts limit 10;    
 a  | b |             ts             | direct     
----+---+----------------------------+--------    
 10 | 2 | 2018-08-15 10:24:58.538558 | t    
 25 | 2 | 2018-08-15 10:25:00.585426 | f    
 62 | 2 | 2018-08-15 10:25:04.2633   | f    
 45 | 2 | 2018-08-15 10:25:04.406764 | t    
(4 rows)    
    
postgres=# select * from tbl_result where b=2 order by b_ts limit 10;    
 b |            b_ts            | a  |            a_ts                
---+----------------------------+----+----------------------------    
 2 | 2018-08-15 10:25:00.585426 | 45 | 2018-08-15 10:25:04.406764    
(1 row)    

消费性能,单线程吞吐达到6万行/s。

with tmp as (    
delete from tbl where ctid = any(array(    
select ctid from tbl where     
  ts < now()-interval '1 min'  -- 测试时改成了消费1分钟前的数据    
  order by ts limit 1000000    
))    
returning *    
)     
insert into tbl_mid     
select * from tmp     
order by b, ts;  -- 无会话模式    
    
Time: 16532.939 ms (00:16.533)    

消费节奏:

1、消费    
2、VACUUM tbl;    
3、消费    
loop;    

例子3,使用窗口查询解决同一问题

1、新增索引,用于窗口查询加速

create index idx_tbl_1 on tbl (b,ts);  

2、无会话模式,使用窗口查询,得到每个虚拟会话的开始时间、第一响应时间

select  
  a,  -- 虚拟会话的第一条消息,客户发给了哪位客服ID
  b,  -- 客户ID
  ts,  -- 虚拟会话开始时间  
  lead_a,  -- 最先响应的是谁(哪位客服)
  lead_session_end_ts,  -- 虚拟会话第一次响应时间  
  lead_session_end_ts - ts as dur,  -- 响应间隔  
  direct,lag_direct,lag_ts  
from  
(  
select *,   
  lead(session_end_ts) over w2 as lead_session_end_ts,  -- 当前窗口,当前行的下一条ts值 , 即会话第一次响应时间   
  lead(a) over w2 as lead_a -- 当前窗口,当前行的下一条的b(客服ID) , 即响应的是哪位客服
from  
(  
select * from   
(  
select a,b,ts,direct,lag_direct,lag_ts,  
case when ((direct = false and lag_direct is null)  -- 判断虚拟会话开始时间的逻辑  
or  
(direct = false and lag_direct = true))  
then ts  
end as session_begin_ts,  -- 虚拟会话开始时间  
case when (direct = true and lag_direct = false)  -- 判断虚拟会话第一次响应时间的逻辑  
then ts  
end as session_end_ts  -- 虚拟会话第一次响应时间  
from  
(  
select   
  a,  -- 客服ID  
  b,  -- 客户ID  
  ts, -- 消息时间  
  direct,  -- 消息方向 true: a->b, false: b->a   
  lag(direct) over w1 as lag_direct,  -- 当前窗口,当前行的上一条direct值  
  lag(ts) over w1 as lag_ts           -- 当前窗口,当前行的上一条ts值  
from tbl   
  window w1 as (partition by b order by ts)   
  -- where ts between xx and xx  , 一次只查部分数据时可用  
) t  
) t  
where session_begin_ts is not null  -- 虚拟会话开始时间字段不为空,表示这条记录是会话开始的记录  
or  
session_end_ts is not null   -- 虚拟会话结束时间字段不为空,表示这条记录是会话第一次响应的记录  
) t   
window w2 as (partition by b order by ts)    
) t  
where   
direct = false -- 客户在虚拟会话中发起第一条消息的记录   
and  
lead_session_end_ts - ts is not null  
limit 100;  

3、结果、算法正确性验证

  a  | b  |             ts             | lead_a |    lead_session_end_ts     |       dur       | direct | lag_direct |           lag_ts           
-----+----+----------------------------+--------+----------------------------+-----------------+--------+------------+----------------------------
  26 |  1 | 2018-08-15 10:25:13.056316 |     75 | 2018-08-15 10:25:16.546126 | 00:00:03.48981  | f      |            | 
  43 |  1 | 2018-08-15 10:25:21.483542 |     99 | 2018-08-15 10:25:25.552488 | 00:00:04.068946 | f      | t          | 2018-08-15 10:25:16.546126
  28 |  1 | 2018-08-15 10:25:28.287823 |     70 | 2018-08-15 10:25:37.375585 | 00:00:09.087762 | f      | t          | 2018-08-15 10:25:26.518359
  12 |  1 | 2018-08-15 10:25:47.203597 |     20 | 2018-08-15 10:26:03.423969 | 00:00:16.220372 | f      | t          | 2018-08-15 10:25:47.036459
  91 |  1 | 2018-08-15 10:26:05.332921 |     57 | 2018-08-15 10:26:08.070122 | 00:00:02.737201 | f      | t          | 2018-08-15 10:26:03.423969
  24 |  1 | 2018-08-15 10:26:16.798485 |     85 | 2018-08-15 10:26:22.222025 | 00:00:05.42354  | f      | t          | 2018-08-15 10:26:15.319287
  90 |  1 | 2018-08-15 10:26:22.58553  |     28 | 2018-08-15 10:26:25.987987 | 00:00:03.402457 | f      | t          | 2018-08-15 10:26:22.222025
  30 |  1 | 2018-08-15 10:26:31.458875 |     42 | 2018-08-15 10:26:36.259917 | 00:00:04.801042 | f      | t          | 2018-08-15 10:26:25.987987
  11 |  1 | 2018-08-15 10:26:37.828413 |     70 | 2018-08-15 10:26:49.212275 | 00:00:11.383862 | f      | t          | 2018-08-15 10:26:36.259917
  21 |  2 | 2018-08-15 10:25:15.532378 |     66 | 2018-08-15 10:25:19.742437 | 00:00:04.210059 | f      |            | 
  50 |  2 | 2018-08-15 10:25:30.988507 |     20 | 2018-08-15 10:25:36.645969 | 00:00:05.657462 | f      | t          | 2018-08-15 10:25:30.750224
  98 |  2 | 2018-08-15 10:25:47.075616 |     72 | 2018-08-15 10:25:52.34913  | 00:00:05.273514 | f      | t          | 2018-08-15 10:25:40.858465
  72 |  2 | 2018-08-15 10:25:56.595608 |     99 | 2018-08-15 10:26:11.46232  | 00:00:14.866712 | f      | t          | 2018-08-15 10:25:55.324131
  98 |  2 | 2018-08-15 10:26:12.303834 |     97 | 2018-08-15 10:26:15.341379 | 00:00:03.037545 | f      | t          | 2018-08-15 10:26:11.46232
  63 |  2 | 2018-08-15 10:26:19.116171 |     22 | 2018-08-15 10:26:23.743978 | 00:00:04.627807 | f      | t          | 2018-08-15 10:26:15.341379
  66 |  2 | 2018-08-15 10:26:30.024534 |     49 | 2018-08-15 10:26:41.196351 | 00:00:11.171817 | f      | t          | 2018-08-15 10:26:23.743978
  83 |  2 | 2018-08-15 10:26:41.962942 |     51 | 2018-08-15 10:26:43.172856 | 00:00:01.209914 | f      | t          | 2018-08-15 10:26:41.196351
  64 |  2 | 2018-08-15 10:26:43.575144 |     88 | 2018-08-15 10:26:44.17728  | 00:00:00.602136 | f      | t          | 2018-08-15 10:26:43.172856

4、对比使用中间表得到的结果

insert into tbl_mid select * from tbl order by ts ;  
select * from tbl_result where b=1 or b=2 order by b_ts;  
  

 b |            b_ts            | a  |            a_ts            
---+----------------------------+----+----------------------------
 1 | 2018-08-15 10:25:13.056316 | 75 | 2018-08-15 10:25:16.546126
 1 | 2018-08-15 10:25:21.483542 | 99 | 2018-08-15 10:25:25.552488
 1 | 2018-08-15 10:25:28.287823 | 70 | 2018-08-15 10:25:37.375585
 1 | 2018-08-15 10:25:47.203597 | 20 | 2018-08-15 10:26:03.423969
 1 | 2018-08-15 10:26:05.332921 | 57 | 2018-08-15 10:26:08.070122
 1 | 2018-08-15 10:26:16.798485 | 85 | 2018-08-15 10:26:22.222025
 1 | 2018-08-15 10:26:22.58553  | 28 | 2018-08-15 10:26:25.987987
 1 | 2018-08-15 10:26:31.458875 | 42 | 2018-08-15 10:26:36.259917
 1 | 2018-08-15 10:26:37.828413 | 70 | 2018-08-15 10:26:49.212275
 1 | 2018-08-15 10:26:50.622352 | -1 | 
 2 | 2018-08-15 10:25:15.532378 | 66 | 2018-08-15 10:25:19.742437
 2 | 2018-08-15 10:25:30.988507 | 20 | 2018-08-15 10:25:36.645969
 2 | 2018-08-15 10:25:47.075616 | 72 | 2018-08-15 10:25:52.34913
 2 | 2018-08-15 10:25:56.595608 | 99 | 2018-08-15 10:26:11.46232
 2 | 2018-08-15 10:26:12.303834 | 97 | 2018-08-15 10:26:15.341379
 2 | 2018-08-15 10:26:19.116171 | 22 | 2018-08-15 10:26:23.743978
 2 | 2018-08-15 10:26:30.024534 | 49 | 2018-08-15 10:26:41.196351
 2 | 2018-08-15 10:26:41.962942 | 51 | 2018-08-15 10:26:43.172856
 2 | 2018-08-15 10:26:43.575144 | 88 | 2018-08-15 10:26:44.17728
 2 | 2018-08-15 10:26:45.595639 | -1 | 
(20 rows)

5、会话模式,SQL改动两处即可。

create index idx_tbl_2 on tbl (b,a,ts);  -- 窗口加速
select  
  a,  -- 虚拟会话的第一条消息,客户发给了哪位客服ID
  b,  -- 客户ID
  ts,  -- 虚拟会话开始时间  
  lead_a,  -- 最先响应的是谁(哪位客服)
  lead_session_end_ts,  -- 虚拟会话第一次响应时间  
  lead_session_end_ts - ts as dur,  -- 响应间隔  
  direct,lag_direct,lag_ts  
from  
(  
select *,   
  lead(session_end_ts) over w2 as lead_session_end_ts,  -- 当前窗口,当前行的下一条ts值 , 即会话第一次响应时间   
  lead(a) over w2 as lead_a -- 当前窗口,当前行的下一条的b(客服ID) , 即响应的是哪位客服
from  
(  
select * from   
(  
select a,b,ts,direct,lag_direct,lag_ts,  
case when ((direct = false and lag_direct is null)  -- 判断虚拟会话开始时间的逻辑  
or  
(direct = false and lag_direct = true))  
then ts  
end as session_begin_ts,  -- 虚拟会话开始时间  
case when (direct = true and lag_direct = false)  -- 判断虚拟会话第一次响应时间的逻辑  
then ts  
end as session_end_ts  -- 虚拟会话第一次响应时间  
from  
(  
select   
  a,  -- 客服ID  
  b,  -- 客户ID  
  ts, -- 消息时间  
  direct,  -- 消息方向 true: a->b, false: b->a   
  lag(direct) over w1 as lag_direct,  -- 当前窗口,当前行的上一条direct值  
  lag(ts) over w1 as lag_ts           -- 当前窗口,当前行的上一条ts值  
from tbl   
  window w1 as (partition by b,a order by ts)   -- 有会话模式,改这个partition
  -- where ts between xx and xx  , 一次只查部分数据时可用  
) t  
) t  
where session_begin_ts is not null  -- 虚拟会话开始时间字段不为空,表示这条记录是会话开始的记录  
or  
session_end_ts is not null   -- 虚拟会话结束时间字段不为空,表示这条记录是会话第一次响应的记录  
) t   
window w2 as (partition by b,a order by ts)    -- 有会话模式,改这个partition
) t  
where   
direct = false -- 客户在虚拟会话中发起第一条消息的记录   
and  
lead_session_end_ts - ts is not null  
limit 100;  

性能,3000万记录,1毫秒响应。

小结

本文涉及的场景为无会话、或者会话无明显标识的情况下,使用PostgreSQL高效率的统计客服的响应速度的问题。

使用到的方法与性能指标

1、实时计算,触发器(当到达时间有序, 或者说大部分有序时。使用clock_timestamp可以让数据基本有序)

写入吞吐16.5万行每秒。

2、阅后即焚(延迟消费,解决数据写入无需的问题)。

写入吞吐29万行每秒。

单线程消费6万行每秒。

3、阅后即焚,使用HASH,并行消费,提升消费吞吐。

4、使用窗口查询,同样能够很好的解决此场景的需求,而且性能杠杠的。

参考

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

目录
相关文章
|
7月前
|
SQL 存储 关系型数据库
PostgreSQL窗口函数避坑指南:如何让复杂分析查询提速300%?
本文基于真实企业级案例,深入剖析PostgreSQL窗口函数的执行原理与性能陷阱,提供8大优化策略。通过定制索引、分区裁剪、内存调优及并行处理等手段,将分钟级查询压缩至秒级响应。结合CTE分阶段计算与物化视图技术,解决海量数据分析中的瓶颈问题。某金融客户实践表明,风险分析查询从47秒降至0.8秒,效率提升5800%。文章附带代码均在PostgreSQL 15中验证,助您高效优化SQL性能。
387 0
|
8月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1487 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
6月前
|
移动开发 缓存 前端开发
可二次开发的在线客服系统-前后端混合渲染模式
服务端渲染(SSR)结合API交互,提升首屏加载速度与SEO友好性,适用于混合渲染模式的Web应用。
122 0
|
10月前
|
关系型数据库 MySQL 数据库
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
TIS 是一款基于Web-UI的开源大数据集成工具,通过与人大金仓Kingbase的深度整合,提供高效、灵活的实时数据集成方案。它支持增量数据监听和实时写入,兼容MySQL、PostgreSQL和Oracle模式,无需编写复杂脚本,操作简单直观,特别适合非专业开发人员使用。TIS率先实现了Kingbase CDC连接器的整合,成为业界首个开箱即用的Kingbase CDC数据同步解决方案,助力企业数字化转型。
2278 5
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
374 0
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
310 0
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
3931 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
11月前
|
SQL 关系型数据库 OLAP
云原生数据仓库AnalyticDB PostgreSQL同一个SQL可以实现向量索引、全文索引GIN、普通索引BTREE混合查询,简化业务实现逻辑、提升查询性能
本文档介绍了如何在AnalyticDB for PostgreSQL中创建表、向量索引及混合检索的实现步骤。主要内容包括:创建`articles`表并设置向量存储格式,创建ANN向量索引,为表增加`username`和`time`列,建立BTREE索引和GIN全文检索索引,并展示了查询结果。参考文档提供了详细的SQL语句和配置说明。
395 2
|
SQL 关系型数据库 数据库
PostgreSQL性能飙升的秘密:这几个调优技巧让你的数据库查询速度翻倍!
【10月更文挑战第25天】本文介绍了几种有效提升 PostgreSQL 数据库查询效率的方法,包括索引优化、查询优化、配置优化和硬件优化。通过合理设计索引、编写高效 SQL 查询、调整配置参数和选择合适硬件,可以显著提高数据库性能。
2426 2
|
调度 流计算
Flink 新一代流计算和容错问题之Flink 中的数据可以分为什么类型
Flink 新一代流计算和容错问题之Flink 中的数据可以分为什么类型
105 3

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版
  • 推荐镜像

    更多