PostgreSQL 时序数据案例 - 时间流逝, 自动压缩, 同比\环比

本文涉及的产品
云数据库 RDS SQL Server,基础系列 2核4GB
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
云原生数据库 PolarDB 分布式版,标准版 2核8GB
简介:

标签

PostgreSQL , 时序数据 , rrd , rrdtool , round robin database , 自动压缩 , CTE , dml returning , 环比 , 同比 , KNN


背景

时序数据库一个重要的特性是时间流逝压缩,例如1天前压缩为5分钟一个点,7天前压缩为30分钟一个点。

PostgreSQL 压缩算法可定制。例如简单的平均值、最大值、最小值压缩,或者基于旋转门压缩算法的压缩。

《[未完待续] SQL流式案例 - 旋转门压缩(前后计算相关滑窗处理例子)》

《旋转门数据压缩算法在PostgreSQL中的实现 - 流式压缩在物联网、监控、传感器等场景的应用》

本文介绍一种简单压缩的场景,类似RRD数据库,按时间维度,压缩为 平均值、最大值、最小值、总和、记录数 等维度。

本文还介绍了窗口查询、同比、环比UDF(含KNN计算)、按时间分组均匀写入、等高级SQL用法。

设计

pic

明细表

create table tbl (  
  id serial8 primary key,  -- 主键  
  sid int,                 -- 传感器ID  
  hid int,                 -- 指标ID  
  val float8,              -- 采集值  
  ts timestamp             -- 采集时间  
);   
  
create index idx_tbl on tbl(ts);  

压缩表

1、5分钟级压缩表

create table tbl_5min (  
  id serial8 primary key,  -- 主键  
  sid int,                 -- 传感器ID  
  hid int,                 -- 指标ID  
  val float8,              -- 继承,平均值,方便做环比分析  
  ts timestamp,            -- 继承,开始时间,方便做环比分析  
  val_min float8,              -- 最小值  
  val_max float8,              -- 最大值  
  val_sum float8,              -- 和  
  val_count float8,            -- 采集次数  
  ts_start timestamp,      -- 区间开始时间  
  ts_end timestamp         -- 区间结束时间  
);   
  
alter table tbl_5min inherit tbl;  

2、30分钟级压缩表

create table tbl_30min (  
  id serial8 primary key,  -- 主键  
  sid int,                 -- 传感器ID  
  hid int,                 -- 指标ID  
  val float8,              -- 继承,平均值,方便做环比分析  
  ts timestamp,            -- 继承,开始时间,方便做环比分析  
  val_min float8,              -- 最小值  
  val_max float8,              -- 最大值  
  val_sum float8,              -- 和  
  val_count float8,            -- 采集次数  
  ts_start timestamp,      -- 区间开始时间  
  ts_end timestamp         -- 区间结束时间  
);   
  
alter table tbl_30min inherit tbl;  

3、5分钟级压缩语句

with tmp1 as (  
  delete from only tbl where ts <= now()-interval '1 day' returning *  
)  
insert into tbl_5min  
  (sid, hid, val, ts, val_min, val_max, val_sum, val_count, ts_start, ts_end)   
select sid, hid, avg(val) as val, min(ts) as ts, min(val) as val_min, max(val) as val_max, sum(val) as val_sum, count(*) as val_count, min(ts) as ts_start, max(ts) as ts_end from   
tmp1  
group by sid, hid, substring(to_char(ts, 'yyyymmddhh24mi'), 1, 10) || lpad(((substring(to_char(ts, 'yyyymmddhh24mi'), 11, 2)::int / 5) * 5)::text, 2, '0');  

4、30分钟级压缩语句

with tmp1 as (  
  delete from only tbl_5min where ts_start <= now()-interval '1 day' returning *  
)  
insert into tbl_30min  
  (sid, hid, val_min, val_max, val_sum, val_count, ts_start, ts_end)  
select sid, hid, min(val_min) as val_min, max(val_max) as val_max, sum(val_sum) as val_sum, sum(val_count) as val_count, min(ts_start) as ts_start, max(ts_end) as ts_end from   
tmp1     
group by sid, hid, substring(to_char(ts_start, 'yyyymmddhh24mi'), 1, 10) || lpad(((substring(to_char(ts_start, 'yyyymmddhh24mi'), 11, 2)::int / 30) * 30)::text, 2, '0');  

DEMO

1、写入明细测试数据,1亿条,分布到10天。

insert into tbl (sid, hid, val, ts) select random()*1000, random()*5, random()*100,   -- 1000个传感器,每个传感器5个指标。  
  now()-interval '10 day' + (id * ((10*24*60*60/100000000.0)||' sec')::interval)   -- 倒推10天为起点 + (id * 每条记录的耗时)  
from generate_series(1,100000000) t(id);   

2、5分钟压缩调度,1天前的数据,每隔1小时调度一次以下SQL。

with tmp1 as (  
  delete from only tbl where ts <= now()-interval '1 day' returning *  
)  
insert into tbl_5min  
  (sid, hid, val, ts, val_min, val_max, val_sum, val_count, ts_start, ts_end)   
select sid, hid, avg(val) as val, min(ts) as ts, min(val) as val_min, max(val) as val_max, sum(val) as val_sum, count(*) as val_count, min(ts) as ts_start, max(ts) as ts_end from   
tmp1  
group by sid, hid, substring(to_char(ts, 'yyyymmddhh24mi'), 1, 10) || lpad(((substring(to_char(ts, 'yyyymmddhh24mi'), 11, 2)::int / 5) * 5)::text, 2, '0');  

3、30分钟压缩调度,7天前的数据,每隔1天调度一次以下SQL。

with tmp1 as (  
  delete from only tbl_5min where ts_start <= now()-interval '1 day' returning *  
)  
insert into tbl_30min  
  (sid, hid, val_min, val_max, val_sum, val_count, ts_start, ts_end)  
select sid, hid, min(val_min) as val_min, max(val_max) as val_max, sum(val_sum) as val_sum, sum(val_count) as val_count, min(ts_start) as ts_start, max(ts_end) as ts_end from   
tmp1     
group by sid, hid, substring(to_char(ts_start, 'yyyymmddhh24mi'), 1, 10) || lpad(((substring(to_char(ts_start, 'yyyymmddhh24mi'), 11, 2)::int / 30) * 30)::text, 2, '0');  

小结

1、根据interval取时间分组,用整型除法+乘法。

例子:

5分钟:

substring(to_char(ts, 'yyyymmddhh24mi'), 1, 10) || lpad(((substring(to_char(ts, 'yyyymmddhh24mi'), 11, 2)::int / 5) * 5)::text, 2, '0');  

30分钟:

substring(to_char(ts_start, 'yyyymmddhh24mi'), 1, 10) || lpad(((substring(to_char(ts_start, 'yyyymmddhh24mi'), 11, 2)::int / 30) * 30)::text, 2, '0')  

2、生成均匀分布的时序数据,使用PG的interval和generate_series,可以将写入时间均匀分配到对应区间。

insert into tbl (sid, hid, val, ts) select random()*1000, random()*5, random()*100,   -- 1000个传感器,每个传感器5个指标。  
  now()-interval '10 day' + (id * ((10*24*60*60/100000000.0)||' sec')::interval)   -- 倒推10天为起点 + (id * 每条记录的耗时)  
from generate_series(1,100000000) t(id);   

3、时序数据库一个重要的特性是时间流逝压缩,例如1天前压缩为5分钟一个点,7天前压缩为30分钟一个点。

PostgreSQL 压缩算法可定制。例如简单的平均值、最大值、最小值压缩,或者基于旋转门压缩算法的压缩。

本文介绍了一种简单压缩的场景,类似RRD数据库,按时间维度,压缩为 平均值、最大值、最小值、总和、记录数 等维度。

加上调度即可:

《PostgreSQL 定时任务方法2》

《PostgreSQL Oracle 兼容性之 - DBMS_JOBS - Daily Maintenance - Timing Tasks(pgagent)》

4、压缩后包含区间、最大值、最小值、平均值、点数等值,可以用于绘制图形。

5、结合PG的窗口函数,很容易绘制同比、环比的图形,SQL例句:

索引,加速

create index idx_tbl_2 on tbl using btree (sid, hid, ts);  
create index idx_tbl_5min_2 on tbl_5min using btree (sid, hid, ts);  
create index idx_tbl_30min_2 on tbl_30min using btree (sid, hid, ts);  

复合类型,返回环比值

create type tp as (id int8, sid int, hid int, val float8, ts timestamp);  

获取环比值函数,返回指定SID,HID在某个时间点附近的一条记录,含KNN算法

create or replace function get_val(v_sid int, v_hid int, v_ts timestamp) returns tp as $$  
select t.tp from 
(
select 
(select (id, sid, hid, val, ts)::tp tp from only tbl where sid=1 and hid=1 and ts>= now() limit 1) 
union all 
select 
(select (id, sid, hid, val, ts)::tp tp from only tbl where sid=1 and hid=1 and ts< now() limit 1)
) t
order by (t.tp).ts limit 1;
$$ language sql strict;  

同比、周环比、月环比(这些值也可以自动生成,避免每次查询时计算):

select   
sid,   
hid,   
val,   
lag(val) over w1,                           -- 同比  
get_val(sid, hid, ts-interval '1 week'),    -- 周环比  
get_val(sid, hid, ts-interval '1 month')    -- 月环比  
  from tbl         -- where ...  ,时间区间打点。   
window w1 as (partition by sid, hid order by ts)   
;   

6、结合PG的线性回归,可以绘制预测指标。以下为详细介绍的例子:

《PostgreSQL 多元线性回归 - 2 股票预测》

《在PostgreSQL中用线性回归分析linear regression做预测 - 例子2, 预测未来数日某股收盘价》

《PostgreSQL 线性回归 - 股价预测 1》

《在PostgreSQL中用线性回归分析(linear regression) - 实现数据预测》

7、将压缩表继承到明细表,方便开发的使用,不需要再写UNION的SQL,直接查明细表,即可得到所有数据(包括压缩数据)。

相关案例

《超时流式处理 - 没有消息流入的数据异常监控》

《阿里云RDS PostgreSQL varbitx实践 - 流式标签 (阅后即焚流式批量计算) - 万亿级,任意标签圈人,毫秒响应》

《PostgreSQL 流式统计 - insert on conflict 实现 流式 UV(distinct), min, max, avg, sum, count ...》

《HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)》

《HTAP数据库 PostgreSQL 场景与性能测试之 31 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(读写大吞吐并测)》

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

《基于PostgreSQL的流式PipelineDB, 1000万/s实时统计不是梦》

参考

1、https://en.wikipedia.org/wiki/RRDtool

2、https://oss.oetiker.ch/rrdtool/

RRD is round robin database.

RRDtool is the OpenSource industry standard, high performance data logging and graphing system for time series data.

RRDtool can be easily integrated in shell scripts, perl, python, ruby, lua or tcl applications.

3、https://www.postgresql.org/docs/10/static/sql-expressions.html#SYNTAX-WINDOW-FUNCTIONS

https://www.postgresql.org/docs/10/static/functions-window.html

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
5月前
|
SQL Oracle 关系型数据库
实时计算 Flink版操作报错之往GREENPLUM 6 写数据,用postgresql-42.2.9.jar 报 ON CONFLICT (uuid) DO UPDATE SET 语法有问题。怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
关系型数据库 PostgreSQL
PostgreSQL排序字段不唯一导致分页查询结果出现重复数据
PostgreSQL排序字段不唯一导致分页查询结果出现重复数据
113 0
|
关系型数据库 MySQL Linux
TiDB实时同步数据到PostgreSQL(三) ---- 使用pgloader迁移数据
使用PostgreSQL数据迁移神器pgloader从TiDB迁移数据到PostgreSQL,同时说明如何在最新的Rocky Linux 9(CentOS 9 stream也适用)上通过源码编译安装pgloader。
|
4月前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
997 0
|
4月前
|
DataWorks 安全 关系型数据库
DataWorks产品使用合集之使用Flink CDC读取PostgreSQL数据时如何指定编码格式
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
208 0
|
2月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
开发框架 关系型数据库 数据库
在 PostgreSQL 中,解决图片二进制数据,由于bytea_output参数问题导致显示不正常的问题。
在 PostgreSQL 中,解决图片二进制数据,由于bytea_output参数问题导致显示不正常的问题。
|
4月前
|
关系型数据库 5G PostgreSQL
postgreSQL 导出数据、导入
postgreSQL 导出数据、导入
49 1
|
4月前
|
SQL 关系型数据库 PostgreSQL
【sql】PostgreSQL物化视图表使用案例
【sql】PostgreSQL物化视图表使用案例
44 0

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版