人、机客户服务质量 - 实时透视分析

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

标签

PostgreSQL , 多输入流 , 人工客服 , 机器客服 , 服务质量 , 多维透视 , 实时透视 , 窗口查询 , 流式计算 , 流式SQL , 阅后即焚 , 上下文相关 , 流式窗口 , 到达时间错乱 , 窗口保持 , 可重算 , 幂等


背景

通常一个服务型的产品,面向很多用户时,都会提供多种服务渠道:

电话、WEB、人工客服。机器人客服。

如何从各个维度(问题分类、地区分类。。。。)了解问题的 : 解决率、解决时长柱状图、一次解决率(例如电话机器人、转人工、转机器人等,如果多次流转说明一次解决率太低)。

pic

这个透视分析,可以作为检验客户服务质量,提升用户体验的一个重要参考。

以电商为例,一次问题,可能涉及:

1、多股数据流表:

电话、WEB、人工客服。机器人客服。

2、数据流的内容涉及:

卖家ID

买家ID

商品ID

小二ID

时间。

3、多个元数据表可能包括:

买家、买家、商品、问题、小二 属性、标签。

4、透视维度包括:

时间。

地区。

商品类别。

问题类别。

等。

5、透视指标可能包括:

解决率、解决时长柱状图、一次解决率(例如电话机器人、转人工、转机器人等,如果多次流转说明一次解决率太低)。

pic

如何做到实时透视?可以使用PostgreSQL的流式处理(实时、或异步阅后即焚)功能,以及insert on conflict幂等操作的功能。

多股数据流的到达时间问题

通常在一个大型的企业中,业务会拆得很细,甚至对于客服这样的系统,人工处理和电话处理都属于两个业务线。

而数据流转往往通过MQ平台,也就是说不同业务线的数据流可能存在到达的时间差,或者消费的时间差。

为了透视客户服务质量相关数据,需要用到多个数据流,势必面临到达时间差的问题。

例如,对于某个CASE:

人工服务是9点发生的。

机器服务是9点01分发生的。

但是他们属于多个数据流,最后的到达时间反过来了。那么在统计一解率时,可能出现误差。

为了解决这个误差,需要做窗口保持的重复计算操作,做到实时统计结果可调整,可增量重算。(这个可以使用insert on conflict do update xx=exclude.xx where xx<>excluded.xx来实现)

demo

以一解率维度为例,其他维度可以参考模仿。

数据流如下:

会话流(多表) -> 流合并表 -> 转换表 -> (转换表+元数据表) 透视结果表  

1、会话数据流

流1,机器人

create table tbl_robot (  
  caseid int8,             -- 会话ID  
  crt_time timestamp,      -- 消息时间  
  message text,            -- 交互信息  
  custom_id int8,          -- 消费者ID  
  pro_id int,              -- 问题ID  
  others text              -- 其他字段  
);  

流2,人工

create table tbl_human (  
  caseid int8,             -- 会话ID  
  crt_time timestamp,      -- 消息时间  
  message text,            -- 交互信息  
  custom_id int8,          -- 消费者ID  
  xiao2_id  int8,          -- 小二ID  
  pro_id int,              -- 问题ID  
  others text              -- 其他字段  
);  

2、合并数据流表(这里可以使用分区表,便于维护)

create table tbl_session (  
  caseid int8,             -- 会话ID  
  crt_time timestamp,      -- 消息时间  
  message text,            -- 交互信息  
  custom_id int8,          -- 消费者ID  
  xiao2_id  int8,          -- 小二ID  
  pro_id int,              -- 问题ID  
  stream_id int,           -- 流ID, 1 表示robot, 2 表示human   
  others1 text,            -- 流1,其他字段  
  others2 text             -- 流2,其他字段  
);  
  
create index idx_tbl_session_1 on tbl_session (crt_time);  
create index idx_tbl_session_2 on tbl_session (caseid, crt_time);  

3、创建源头规则,自动将会话数据流,合并到合并数据流表单表。

通过规则,自动将数据合并到合并表。

create or replace rule r1 as on insert to tbl_robot do instead   
  insert into tbl_session   
    (caseid, crt_time, message, custom_id, pro_id, others1, stream_id)  
    values (NEW.caseid, NEW.crt_time, NEW.message, NEW.custom_id, NEW.pro_id, NEW.others, 1);  
  
create or replace rule r1 as on insert to tbl_human do instead   
  insert into tbl_session   
    (caseid, crt_time, message, custom_id, pro_id, others2, xiao2_id, stream_id)  
    values (NEW.caseid, NEW.crt_time, NEW.message, NEW.custom_id, NEW.pro_id, NEW.others, NEW.xiao2_id, 2);  

4、元数据表

略。

5、会话状态转换表(批量可重算转换表)

打个会话会涉及多条记录,这个DEMO的目的是找到人工回复后,用户从机器人找答案,从而识别人工回复的效率。

也可以反之,求机器人回复的效率。

create table tbl_session_etl (  
  caseid int8 primary key,   -- 会话ID  
  s_crt_time timestamp,      -- 会话最开始的时间  
  e_crt_time timestamp,      -- 会话最后一条记录的时间  
  robot_to_human boolean,    -- 是否包含 从机器人切到人工  
  human_to_robot boolean     -- 是否包含 从人工切到机器人  
);  

会话转换SQL,调度,可以重复执行。

窗口大小可调整,容忍不同数据流的到达时间差异。

select caseid, max(s_crt_time) s_crt_time, max(e_crt_time) e_crt_time,   
       bool_or(lag=1 and stream_id=2) as robot_to_human,   
       bool_or(lag=2 and stream_id=1) as human_to_robot  
from  
(  
select caseid, min(crt_time) over w1 as s_crt_time, max(crt_time) over w1 as e_crt_time,   
       (case when (row_number() over w1) = 1 then stream_id else lag(stream_id) over w1 end) as lag,  
       stream_id   
from tbl_session   
where crt_time > now() - interval '10 min'           -- 10分钟内的会话数据, 可以自由调整这个窗口  
window w1 as (partition by caseid order by crt_time)   
) t  
group by caseid;  

合并写入,使用如下SQL,可以重复执行。

窗口大小可调整,容忍不同数据流的到达时间差异。

insert into tbl_session_etl (caseid, s_crt_time, e_crt_time, robot_to_human, human_to_robot)  
select caseid, max(s_crt_time) s_crt_time, max(e_crt_time) e_crt_time,   
       bool_or(lag=1 and stream_id=2) as robot_to_human,     
       bool_or(lag=2 and stream_id=1) as human_to_robot      
from  
(  
select caseid, min(crt_time) over w1 as s_crt_time, max(crt_time) over w1 as e_crt_time,   
       (case when (row_number() over w1) = 1 then stream_id else lag(stream_id) over w1 end) as lag,  
       stream_id   
from tbl_session    
where crt_time > now() - interval '10 min'             -- 10分钟内的会话数据, 可以自由调整这个窗口  
window w1 as (partition by caseid order by crt_time)   -- 开窗查询  
) t  
group by caseid  
on conflict (caseid)  
do update set   
  s_crt_time = excluded.s_crt_time,   
  e_crt_time = excluded.e_crt_time,   
  robot_to_human = excluded.robot_to_human,   
  human_to_robot = excluded.human_to_robot  
where       -- 当数据转换后的值,发送变化时,合并写入。  
  tbl_session_etl.s_crt_time<>excluded.s_crt_time  
or   
  tbl_session_etl.e_crt_time<>excluded.e_crt_time  
or  
  tbl_session_etl.robot_to_human<>excluded.robot_to_human  
or  
  tbl_session_etl.human_to_robot<>excluded.human_to_robot  
;  

创建函数便于调用

create or replace function f_tbl_session_etl(interval) returns void as $$  
insert into tbl_session_etl (caseid, s_crt_time, e_crt_time, robot_to_human, human_to_robot)  
select caseid, max(s_crt_time) s_crt_time, max(e_crt_time) e_crt_time,   
       bool_or(lag=1 and stream_id=2) as robot_to_human,     
       bool_or(lag=2 and stream_id=1) as human_to_robot      
from  
(  
select caseid, min(crt_time) over w1 as s_crt_time, max(crt_time) over w1 as e_crt_time,   
       (case when (row_number() over w1) = 1 then stream_id else lag(stream_id) over w1 end) as lag,  
       stream_id   
from tbl_session    
where crt_time > now() - $1             -- n分钟内的会话数据, 可以自由调整这个窗口  
window w1 as (partition by caseid order by crt_time)   -- 开窗查询  
) t  
group by caseid  
on conflict (caseid)  
do update set   
  s_crt_time = excluded.s_crt_time,   
  e_crt_time = excluded.e_crt_time,   
  robot_to_human = excluded.robot_to_human,   
  human_to_robot = excluded.human_to_robot  
where       -- 当数据转换后的值,发送变化时,合并写入。  
  tbl_session_etl.s_crt_time<>excluded.s_crt_time  
or   
  tbl_session_etl.e_crt_time<>excluded.e_crt_time  
or  
  tbl_session_etl.robot_to_human<>excluded.robot_to_human  
or  
  tbl_session_etl.human_to_robot<>excluded.human_to_robot  
;  
$$ language sql strict;  

调度方法如下,完成自动修正:

每10秒,统计10分钟内的数据。(不同流的到达时间差异,容忍度为10分钟。)

每小时,统计全天内的数据。(不同流的到达时间差异,容忍度为全天。)

6、会话统计表,统计一解率 (可选,如果不统计的话,就直接查询)

天维度表

create table tbl_session_stat_day (  
  stat_dim text primary key,  
  robot_to_human_cnt int8,  
  human_to_robot_cnt int8  
);  

分钟维度表

create table tbl_session_stat_min (  
  stat_dim text primary key,  
  robot_to_human_cnt int8,  
  human_to_robot_cnt int8  
);  

统计调度SQL,可以重复执行。

select to_char(s_crt_time, 'yyyymmdd') as stat_dim,   
       sum(case when robot_to_human then 1 else 0 end) robot_to_human_cnt,   
       sum(case when human_to_robot then 1 else 0 end) human_to_robot_cnt  
from tbl_session_etl  
group by 1;  

分钟维度

select to_char(s_crt_time, 'yyyymmddhh24mi') as stat_dim,   
       sum(case when robot_to_human then 1 else 0 end) robot_to_human_cnt,   
       sum(case when human_to_robot then 1 else 0 end) human_to_robot_cnt  
from tbl_session_etl  
group by 1;  

写入并合并,可以重复执行。

insert into tbl_session_stat_day   
select to_char(s_crt_time, 'yyyymmdd') as stat_dim,   
       sum(case when robot_to_human then 1 else 0 end) robot_to_human_cnt,   
       sum(case when human_to_robot then 1 else 0 end) human_to_robot_cnt  
from tbl_session_etl  
group by 1  
on conflict (stat_dim) do update  
set  
  robot_to_human_cnt = excluded.robot_to_human_cnt,  
  human_to_robot_cnt = excluded.human_to_robot_cnt  
where  
  tbl_session_stat_day.robot_to_human_cnt <> excluded.robot_to_human_cnt  
or  
  tbl_session_stat_day.human_to_robot_cnt <> excluded.human_to_robot_cnt  
;  
  
  
  
  
insert into tbl_session_stat_min    
select to_char(s_crt_time, 'yyyymmddhh24mi') as stat_dim,   
       sum(case when robot_to_human then 1 else 0 end) robot_to_human_cnt,   
       sum(case when human_to_robot then 1 else 0 end) human_to_robot_cnt  
from tbl_session_etl  
group by 1  
on conflict (stat_dim) do update  
set  
  robot_to_human_cnt = excluded.robot_to_human_cnt,  
  human_to_robot_cnt = excluded.human_to_robot_cnt  
where  
  tbl_session_stat_min.robot_to_human_cnt <> excluded.robot_to_human_cnt  
or  
  tbl_session_stat_min.human_to_robot_cnt <> excluded.human_to_robot_cnt  
;  

创建函数便于调用

create or replace function f_tbl_session_stat_day() returns void as $$  
insert into tbl_session_stat_day   
select to_char(s_crt_time, 'yyyymmdd') as stat_dim,   
       sum(case when robot_to_human then 1 else 0 end) robot_to_human_cnt,   
       sum(case when human_to_robot then 1 else 0 end) human_to_robot_cnt  
from tbl_session_etl  
group by 1  
on conflict (stat_dim) do update  
set  
  robot_to_human_cnt = excluded.robot_to_human_cnt,  
  human_to_robot_cnt = excluded.human_to_robot_cnt  
where  
  tbl_session_stat_day.robot_to_human_cnt <> excluded.robot_to_human_cnt  
or  
  tbl_session_stat_day.human_to_robot_cnt <> excluded.human_to_robot_cnt  
;  
$$ language sql strict;  
  
  
  
create or replace function f_tbl_session_stat_min() returns void as $$  
insert into tbl_session_stat_min    
select to_char(s_crt_time, 'yyyymmddhh24mi') as stat_dim,   
       sum(case when robot_to_human then 1 else 0 end) robot_to_human_cnt,   
       sum(case when human_to_robot then 1 else 0 end) human_to_robot_cnt  
from tbl_session_etl  
group by 1  
on conflict (stat_dim) do update  
set  
  robot_to_human_cnt = excluded.robot_to_human_cnt,  
  human_to_robot_cnt = excluded.human_to_robot_cnt  
where  
  tbl_session_stat_min.robot_to_human_cnt <> excluded.robot_to_human_cnt  
or  
  tbl_session_stat_min.human_to_robot_cnt <> excluded.human_to_robot_cnt  
;  
$$ language sql strict;  

性能压测

1、高并发写入会话信息

vi test.sql  
  
\set caseid1 random(1,1000000)  
\set caseid2 random(1,1000000)  
\set custom_id1 random(1,100000)  
\set pro_id1 random(1,1000)  
\set custom_id2 random(1,100000)  
\set pro_id2 random(1,1000)  
\set xiao2_id random(1,100)  
insert into tbl_robot values (:caseid1, now(), 'test', :custom_id1, :pro_id1, 'test');  
insert into tbl_human values (:caseid2, now(), 'test', :custom_id2, :xiao2_id, :pro_id2, 'test');  
\sleep 500 us  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120  

单条写入,约17.6万行/s.

如果批量写入,可以做到100万+ 行/s

transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 32  
number of threads: 32  
duration: 120 s  
number of transactions actually processed: 10655120  
latency average = 0.360 ms  
latency stddev = 0.466 ms  
tps = 88792.101825 (including connections establishing)  
tps = 88804.892722 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set caseid1 random(1,1000000)  
         0.001  \set caseid2 random(1,1000000)  
         0.000  \set custom_id1 random(1,100000)  
         0.000  \set pro_id1 random(1,1000)  
         0.000  \set custom_id2 random(1,100000)  
         0.000  \set pro_id2 random(1,1000)  
         0.000  \set xiao2_id random(1,100)  
         0.178  insert into tbl_robot values (:caseid1, now(), 'test', :custom_id1, :pro_id1, 'test');  
         0.178  insert into tbl_human values (:caseid2, now(), 'test', :custom_id2, :xiao2_id, :pro_id2, 'test');  

2、实时转换调度

同时开启写入,(写入速度14.2万行/s。)

psql  
  
select f_tbl_session_etl(interval '5 sec');  
  
\watch 1  
  
Sat 09 Dec 2017 07:05:42 PM CST (every 1s)  
  
 f_tbl_session_etl   
-------------------  
   
(1 row)  
  
Time: 4515.817 ms (00:04.516)  

处理最近71万行, 耗时4.5秒。处理速度约15.7万行/s。

3、实时统计调度

postgres=# select f_tbl_session_stat_day();  
 f_tbl_session_stat_day   
------------------------  
   
(1 row)  
  
Time: 926.839 ms  
postgres=# select f_tbl_session_stat_min();  
 f_tbl_session_stat_min   
------------------------  
   
(1 row)  
  
Time: 1162.713 ms (00:01.163)  

4、数据量

1.79亿。

postgres=# select count(*) from tbl_session;  
   count     
-----------  
 179639156  
(1 row)  
  
Time: 1635.908 ms (00:01.636)  
  
postgres=# select count(*) from tbl_session_etl;  
  count    
---------  
 1000000  
(1 row)  
  
Time: 47.540 ms  

5、性能指标:

并发度 写入吞吐 写入延迟
32 17.6万行/s 0.178毫秒

1.79亿数据打散到全天写入的话,响应速度会更快。

并发度 转换吞吐 转换延迟
1 15.7万行/s 1秒
并发度 统计吞吐 统计延迟
1 1000000行 1秒

统计信息查询性能,毫秒级延迟

postgres=# select * from tbl_session_stat_day ;  
 stat_dim | robot_to_human_cnt | human_to_robot_cnt   
----------+--------------------+--------------------  
 20171209 |              80160 |              80453  
(1 row)  
  
Time: 6.476 ms  
  
postgres=# select * from tbl_session_stat_min;  
   stat_dim   | robot_to_human_cnt | human_to_robot_cnt   
--------------+--------------------+--------------------  
 201712091758 |              56558 |              56531  
 201712091800 |                  4 |                  4  
 201712091759 |                509 |                501  
 201712091757 |             236638 |             236657  
 201712091802 |               7273 |               7177  
 201712091817 |               8336 |               8358  
 201712091812 |                  0 |                  0  
 201712091814 |                 12 |                  8  
 201712091815 |                127 |                144  
 201712091813 |                  1 |                  1  
 201712091816 |               1688 |               1761  
 201712091905 |              56645 |              57046  
 201712091904 |                411 |                391  
 201712091906 |              23104 |              23015  
 201712091902 |                  0 |                  1  
(15 rows)  
  
Time: 6.695 ms  

参考

《PostgreSQL 流式统计 - insert on conflict 实现 流式 UV(distinct), min, max, avg, sum, count ...》

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

《经营、销售分析系统DB设计之PostgreSQL, Greenplum - 共享充电宝 案例实践》

相关文章
|
16天前
|
机器学习/深度学习 SQL 数据可视化
数据分享|数据探索电商平台用户行为流失可视化分析
数据分享|数据探索电商平台用户行为流失可视化分析
|
5月前
|
数据采集 监控 数据挖掘
如何更有价值采集电商数据,高效分析数据?
大数据,就是在一定时间范围内用常规工具软件对历史数据捕捉、处理,加以分析,进而改善决策和管理。在大数据时代,企业必须用大数据分析方法来做电商。
|
4天前
|
数据可视化 数据挖掘
R语言多维度视角下白领人群健康体质检测数据关系可视化分析2
R语言多维度视角下白领人群健康体质检测数据关系可视化分析
|
4天前
|
数据可视化 数据挖掘
R语言多维度视角下白领人群健康体质检测数据关系可视化分析1
R语言多维度视角下白领人群健康体质检测数据关系可视化分析
|
5月前
|
监控 安全 算法
uwb人员定位系统:人员轨迹实时定位
vuwb人员定位系统:人员轨迹实时定位
99 0
uwb人员定位系统:人员轨迹实时定位
|
5月前
|
监控 安全 5G
UWB人员精准定位系统源码,实现实时定位、人机料配对、物料标签配置、智慧调度、轨迹追踪
人员定位管理系统通过在厂区、车间部署UWB定位基站,实时采集人员、机具、物料上定位标签回传的位置信息数据,采用多维定位模式,精确定位人、机具、物料的实时位置,实现实时定位、人机料配对、物料标签配置、智慧调度、轨迹追踪、工时统计、区域物料统计、电子围栏等应用功能。
|
5月前
|
存储 数据采集 监控
智慧工地整体方案,实现现场各类工况数据采集、存储、分析与应用
“智慧工地整体方案”以智慧工地物联网云平台为核心,基于智慧工地物联网云平台与现场多个子系统的互联,实现现场各类工况数据采集、存储、分析与应用。通过接入智慧工地物联网云平台的多个子系统板块,根据现场管理实际需求灵活组合,实现一体化、模块化、智能化、网络化的施工现场过程全面感知、协同工作、智能分析、风险预控、知识共享、互联互通等业务,全面满足建筑施工企业精细化管理的业务需求,智能化地辅助建筑施工企业进行科学决策,促进施工企业监管水平的全面提高。
|
8月前
|
存储 安全
在线监测污水排放的方案
在线监测污水排放的方案
58 0
|
SQL 安全 数据挖掘
【业务数据分析】—— 用户留存分析(以挖掘Aha时刻为例)
【业务数据分析】—— 用户留存分析(以挖掘Aha时刻为例)
513 0
|
监控 搜索推荐 数据挖掘
如何使用海关数据准确开发到客户
海关数据其实一直是外贸应用中最广泛的开发渠道,而且特别利好初学者,是企业开发新客户、监控同行、维护老客户以及决策参考的商战利器。当然,想通过海关数据精确找到客户,需要熟悉以下操作技巧。
158 0