PostgreSQL 海量时序数据(任意滑动窗口实时统计分析) - 传感器、人群、物体等对象跟踪

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云原生数据库 PolarDB 分布式版,标准版 2核8GB
简介:

标签

PostgreSQL , 物联网 , feed , 网游 , 热力 , 商场驻留 , 人群分析 , 实时热力图 , 实时在线图 , 实时分段最大最小区间图 , 任意滑动窗口实时最高、最低在线数


背景

在现实生活中,经常有聚集分析的需求。

例如:

某个商场,每个时间点,商场的每个商铺位置的人群驻留数量。(有技术手段可以感知人的驻留位置,当走进某个区域时,将写入一条记录,表示你进入了这个区域,离开时记录一条离开的记录,如果长时间不动,则定时写心跳记录)。

某个网游,每个时间点,在线人数。(上线时写一条上线记录,下线时写一条下线记录。)

某个共享单车公司,每个时间点,在线和不在线的车辆数量。(借车时写一条上线记录,还车时写一条下线记录。同时每隔一段时间询问车辆状态。)

某个物联网企业,每个分钟单位内,最小、最大在线传感器的数量。(传感器上线时写一条上线记录,下线时写一条下线记录,同时每隔一段时间询问传感器状态。)

这种属于非常典型的FEED应用,要求输出每个时间点这个世界(系统)的在线数。(如果按时间段输出,则输出每个时间段内的最大,最小在线数,实际上就是取range的边界)。

设计

场景:

某个物联网企业,有一些传感器,传感器上线时写一条上线记录,下线时写一条下线记录,同时每隔小时询问传感器状态,也就是说1小时内没有记录的传感器视为不在线。

企业需要统计每个分钟单位内,最小、最大在线传感器的数量。

1、表结构

create table sensor_stat(  
  sid int,             -- 传感器ID  
  state boolean,       -- 传感器状态,true在线,false离线  
  crt_time timestamp   -- 状态上传时间  
);  

2、索引

create index idx_sensor_stat_1 on sensor_stat(sid, crt_time desc);  

写入1.101亿测试数据(我们假设这是1小时的数据写入量,全天写入26.424亿记录),1001个传感器ID。

insert into sensor_stat select random()*1000, (random()*1)::int::boolean, clock_timestamp() from generate_series(1,110100000);  

3、数据TTL,确保表比较瘦,只包含心跳时间范围内的数据。

由于每小时接收心跳,所以1小时内,必有数据,没有数据的传感器不计状态。因此我们保留1小时内的状态即可。

一种保留方法是pipelinedb,用法如下。

《数据保留时间窗口的使用》

另一种保留方法,使用两张表,轮询使用即可。

create table sensor_stat1 (  
  sid int,             -- 传感器ID  
  state boolean,       -- 传感器状态,true在线,false离线  
  crt_time timestamp   -- 状态上传时间  
);  
  
create table sensor_stat2 (  
  sid int,             -- 传感器ID  
  state boolean,       -- 传感器状态,true在线,false离线  
  crt_time timestamp   -- 状态上传时间  
);  

类似的用法如下

《PostgreSQL 数据rotate用法介绍 - 按时间覆盖历史数据》

4、使用递归查询,高效查询传感器的最终状态

with recursive t as   
(  
  (  
    select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1  
  )  
  union all  
  (  
    select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null  
  )  
)  
select (t.sensor_stat).* from t where t.* is not null;  

执行计划如下

explain (analyze,verbose,timing,costs,buffers) with recursive t as   
(  
  (  
    select sensor_stat as sensor_stat from sensor_stat where state is true order by sid, crt_time desc limit 1  
  )  
  union all  
  (  
    select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid and t1.state is true order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null  
  )  
)  
select (t.sensor_stat).* from t where t.* is not null;  
                                                                                      QUERY PLAN                                                                                        
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 CTE Scan on t  (cost=70.86..72.88 rows=100 width=13) (actual time=0.037..10.975 rows=1001 loops=1)  
   Output: (t.sensor_stat).sid, (t.sensor_stat).state, (t.sensor_stat).crt_time  
   Filter: (t.* IS NOT NULL)  
   Rows Removed by Filter: 1  
   Buffers: shared hit=5926  
   CTE t  
     ->  Recursive Union  (cost=0.57..70.86 rows=101 width=37) (actual time=0.030..10.293 rows=1002 loops=1)  
           Buffers: shared hit=5926  
           ->  Subquery Scan on "*SELECT* 1"  (cost=0.57..0.63 rows=1 width=37) (actual time=0.029..0.029 rows=1 loops=1)  
                 Output: "*SELECT* 1".sensor_stat  
                 Buffers: shared hit=5  
                 ->  Limit  (cost=0.57..0.62 rows=1 width=49) (actual time=0.028..0.028 rows=1 loops=1)  
                       Output: sensor_stat.*, sensor_stat.sid, sensor_stat.crt_time  
                       Buffers: shared hit=5  
                       ->  Index Scan using idx_sensor_stat_1 on public.sensor_stat  (cost=0.57..3180100.70 rows=55369290 width=49) (actual time=0.027..0.027 rows=1 loops=1)  
                             Output: sensor_stat.*, sensor_stat.sid, sensor_stat.crt_time  
                             Filter: (sensor_stat.state IS TRUE)  
                             Buffers: shared hit=5  
           ->  WorkTable Scan on t t_1  (cost=0.00..6.82 rows=10 width=32) (actual time=0.010..0.010 rows=1 loops=1002)  
                 Output: (SubPlan 1)  
                 Filter: ((t_1.sensor_stat).sid IS NOT NULL)  
                 Rows Removed by Filter: 0  
                 Buffers: shared hit=5921  
                 SubPlan 1  
                   ->  Limit  (cost=0.57..0.66 rows=1 width=49) (actual time=0.009..0.009 rows=1 loops=1001)  
                         Output: t1.*, t1.sid, t1.crt_time  
                         Buffers: shared hit=5921  
                         ->  Index Scan using idx_sensor_stat_1 on public.sensor_stat t1  (cost=0.57..1746916.71 rows=18456430 width=49) (actual time=0.009..0.009 rows=1 loops=1001)  
                               Output: t1.*, t1.sid, t1.crt_time  
                               Index Cond: (t1.sid > (t_1.sensor_stat).sid)  
                               Filter: (t1.state IS TRUE)  
                               Rows Removed by Filter: 1  
                               Buffers: shared hit=5921  
 Planning time: 0.180 ms  
 Execution time: 11.083 ms  
(35 rows)  

样例

 sid  | state |          crt_time            
------+-------+----------------------------  
    0 | t     | 2017-07-05 10:29:09.470687  
    1 | f     | 2017-07-05 10:29:09.465721  
    2 | t     | 2017-07-05 10:29:09.474216  
    3 | f     | 2017-07-05 10:29:09.473176  
    4 | t     | 2017-07-05 10:29:09.473179  
    5 | t     | 2017-07-05 10:29:09.473842  
......  
  996 | t     | 2017-07-05 10:29:09.469787  
  997 | f     | 2017-07-05 10:29:09.470983  
  998 | t     | 2017-07-05 10:29:09.47268  
  999 | t     | 2017-07-05 10:29:09.469192  
 1000 | t     | 2017-07-05 10:29:09.472195  
(1001 rows)  
  
Time: 11.067 ms  

效率很高,1.101亿数据,11毫秒获取最终在线状态。

在线的设备为state=t的。

with recursive t as   
(  
  (  
    select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1  
  )  
  union all  
  (  
    select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null  
  )  
)  
select count(*) from t where t.* is not null and (t.sensor_stat).state is true;  
 count   
-------  
   491  
(1 row)  
  
Time: 10.182 ms  

5、统计任意时间点的传感器在线数量,如果每个设备上线的时间精确到秒(crt_time精确到秒),那么不管有多少条记录,一天最多需要统计86400个时间点的传感器在线数量。

例如统计 2017-07-05 10:29:09 时间点的传感器在线数量,加一个时间限制即可。

with recursive t as   
(  
  (  
    select sensor_stat as sensor_stat from sensor_stat where crt_time <= '2017-07-05 10:29:09' order by sid, crt_time desc limit 1  
  )  
  union all  
  (  
    select (select t1 from sensor_stat AS t1 where t1.crt_time <= '2017-07-05 10:29:09' and t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null  
  )  
)  
select count(*) from t where t.* is not null and (t.sensor_stat).state is true;  
  
 count   
-------  
   501  
(1 row)  
  
Time: 20.743 ms  

新增这个时间限制,会带来一定的性能影响,特别是如果这个时间是过去很久以前的时间,过滤会越多,性能下降越严重。

因此,建议实时,每秒发起一次查询请求,就不要加这个时间限制了。

6、一次性生成过去每一秒的在线数。

使用窗口查询的帧查询技术。(帧表示按时间排序,截止到当前记录的区间。)

7、统计每分钟内,最高在线数、最低在线数。

每秒查询一次,将数据写入结果表。

create table result (crt_time timestamp(0) default now(), state boolean, cnt int);  
create index idx_result_1 on result using brin (crt_time);  
  
insert into result (state,cnt)  
with recursive t as   
(  
  (  
    select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1  
  )  
  union all  
  (  
    select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null  
  )  
)  
select (t.sensor_stat).state, count(*) from t where t.* is not null group by 1;  
  
INSERT 0 2  
Time: 12.061 ms  
  
postgres=# select * from result ;  
      crt_time       | state | cnt   
---------------------+-------+-----  
 2017-07-05 11:11:03 | f     | 510  
 2017-07-05 11:11:03 | t     | 491  
(2 rows)  
  
Time: 0.274 ms  

由于每次查询仅需12毫秒,每秒调用一次没有问题。

统计某一分钟内,最高在线数、最低在线数。

select '2017-07-05 11:11:00', min(cnt), max(cnt) from result where crt_time between '2017-07-05 11:11:00' and '2017-07-05 11:12:00';  
  
or  
  
select to_char(crt_time, 'yyyy-mm-dd hh24:mi:00'), min(cnt), max(cnt) from result where crt_time between ? and ? group by 1;  

传感器ID很多很多时,如何优化

当传感器ID达到10万级别时,查询性能会下降到250毫秒。

如果传感器ID特别多,例如有百万以上,那么会下降到2.5秒。就不适合每秒查询一次了。

因此传感器数量特别多时,如何优化?

有一个比较好的方法是数据按传感器ID进行哈希分布,例如每张分区表负责1万个传感器ID。在查询在线数时,并发的查询所有的分区表,从而降低RT。

小结

使用本文提到的方法(递归查询),我们可以实现非常细粒度的,大量被跟踪物的状态实时统计。

用于绘制被跟踪物的实时状态图,例如:

1、实时热力图

2、实时传感器(或用户)在线、离线数,任意滑动窗口的最大最小在线、离线值。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
8月前
|
SQL Oracle 关系型数据库
实时计算 Flink版操作报错之往GREENPLUM 6 写数据,用postgresql-42.2.9.jar 报 ON CONFLICT (uuid) DO UPDATE SET 语法有问题。怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
8月前
|
关系型数据库 PostgreSQL
PostgreSQL排序字段不唯一导致分页查询结果出现重复数据
PostgreSQL排序字段不唯一导致分页查询结果出现重复数据
167 0
|
关系型数据库 MySQL Linux
TiDB实时同步数据到PostgreSQL(三) ---- 使用pgloader迁移数据
使用PostgreSQL数据迁移神器pgloader从TiDB迁移数据到PostgreSQL,同时说明如何在最新的Rocky Linux 9(CentOS 9 stream也适用)上通过源码编译安装pgloader。
|
1月前
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL的数据文件
PostgreSQL的物理存储结构主要包括数据文件、日志文件等。数据文件按oid命名,超过1G时自动拆分。通过查询数据库和表的oid,可定位到具体的数据文件。例如,查询数据库oid后,再查询特定表的oid及relfilenode,即可找到该表对应的数据文件位置。
|
7月前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
1029 0
|
7月前
|
DataWorks 安全 关系型数据库
DataWorks产品使用合集之使用Flink CDC读取PostgreSQL数据时如何指定编码格式
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
109 0
|
5月前
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
592 0
|
5月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 关系型数据库 数据库
手把手教你管理PostgreSQL数据库及其对象
手把手教你管理PostgreSQL数据库及其对象
120 0
|
5月前
|
开发框架 关系型数据库 数据库
在 PostgreSQL 中,解决图片二进制数据,由于bytea_output参数问题导致显示不正常的问题。
在 PostgreSQL 中,解决图片二进制数据,由于bytea_output参数问题导致显示不正常的问题。

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版