PostgreSQL pipelinedb 流计算插件 - IoT应用 - 实时轨迹聚合

本文涉及的产品
云数据库 RDS SQL Server,基础系列 2核4GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
简介: 标签PostgreSQL , IoT , 轨迹聚合 , pipelinedb , 流计算 , 实时聚合背景IoT场景,车联网场景,共享单车场景,人的行为位点等,终端实时上报的是孤立的位点,我们需要将其补齐成轨迹。

标签

PostgreSQL , IoT , 轨迹聚合 , pipelinedb , 流计算 , 实时聚合


背景

IoT场景,车联网场景,共享单车场景,人的行为位点等,终端实时上报的是孤立的位点,我们需要将其补齐成轨迹。

例如共享单车,下单,开锁,生成订单,骑行,关闭订单,关锁。这个过程有一个唯一的订单号,每次上报的位点会包含时间,订单号,位置。

根据订单号,将点聚合为轨迹。

使用pipelinedb插件,可以实时的实现聚合。

例子

以ECS (centos 7.x x64), postgresql 10 为例

1、编译zeromq

wget https://github.com/zeromq/libzmq/releases/download/v4.2.5/zeromq-4.2.5.tar.gz  
tar -zxvf zeromq-4.2.5.tar.gz  
cd zeromq-4.2.5  
./configure  
make  
make install  

2、编译pipelinedb

wget https://github.com/pipelinedb/pipelinedb/archive/1.0.0rev4.tar.gz  
tar -zxvf 1.0.0rev4.tar.gz   
cd pipelinedb-1.0.0rev4/  
  
vi Makefile  
  
SHLIB_LINK += /usr/local/lib/libzmq.so -lstdc++  
  
. /var/lib/pgsql/env.sh 1925  
  
USE_PGXS=1 make  
USE_PGXS=1 make install  

3、配置postgresql.conf

max_worker_processes = 512  
shared_preload_libraries = 'pipelinedb'  
pipelinedb.stream_insert_level=async
pipelinedb.num_combiners=8
pipelinedb.num_workers=16
pipelinedb.num_queues=16
pipelinedb.fillfactor=75
pipelinedb.continuous_queries_enabled=true  

重启

pg_ctl restart -m fast  

4、安装插件

postgres=# create extension pipelinedb;  

5、创建stream,实时写入轨迹点

CREATE FOREIGN TABLE s1 ( order_id int8, ts timestamp, pos geometry )  
SERVER pipelinedb;  

6、创建Continue view,实时聚合

CREATE VIEW cv1 WITH (action=materialize ) AS   
select order_id, min(ts) min_ts, array_agg(ts||','||st_astext(pos)) as seg  
from s1  
group by order_id;  

激活视图(默认已激活)

select pipelinedb.activate('public.cv1');  

7、压测

vi test.sql  
\set order_id random(1,100000)  
\set x random(70,90)  
\set y random(120,125)  
insert into s1 (order_id, ts, pos) values (:order_id, clock_timestamp(), st_makepoint(:x+10*random(), :y+10*random()));  
  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 256 -j 256 -T 120  

8、压测结果

transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 256  
number of threads: 256  
duration: 120 s  
number of transactions actually processed: 17614607  
latency average = 1.740 ms  
latency stddev = 1.730 ms  
tps = 146550.933776 (including connections establishing)  
tps = 146906.482277 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.002  \set order_id random(1,10000000)  
         0.001  \set x random(70,90)  
         0.000  \set y random(120,125)  
         1.742  insert into s1 (order_id, ts, pos) values (:order_id, clock_timestamp(), st_makepoint(:x+10*random(), :y+10*random()));  
postgres=# \x  
Expanded display is on.  
-[ RECORD 17 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
order_id | 8672585  
min_ts   | 2018-11-01 18:44:08.140027  
seg      | {"2018-11-01 18:44:08.140027,POINT(78.3615547642112 121.881739947945)","2018-11-01 18:44:11.739248,POINT(80.9645632216707 121.450987955555)"}  
-[ RECORD 18 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
order_id | 4011211  
min_ts   | 2018-11-01 18:44:08.166407  
seg      | {"2018-11-01 18:44:08.166407,POINT(87.126777020283 132.819293198176)","2018-11-01 18:44:11.524995,POINT(80.482944605872 126.906906872056)"}  
-[ RECORD 19 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
order_id | 2468486  
min_ts   | 2018-11-01 18:44:08.135136  
seg      | {"2018-11-01 18:44:08.135136,POINT(84.7732630362734 132.659516767599)","2018-11-01 18:44:20.603312,POINT(87.6352122295648 132.18647258915)","2018-11-01 18:44:19.447776,POINT(94.9817024609074 131.295661441982)"}  

9、历史轨迹的保留

设置cv生命周期,自动清理老化数据

postgres=# select pipelinedb.set_ttl('cv1', interval '1 hour' , 'min_ts');  
-[ RECORD 1 ]-----  
set_ttl | (3600,2)  

创建目标持久化表

create table cv1_persist (like cv1);  

创建时间字段索引(CV1)

postgres=# create index idx_1 on cv1 (min_ts);  
CREATE INDEX  

ETL形式,将数据从cv抽取到目标持久化表

postgres=# insert into cv1_persist select * from cv1 where min_ts <= '2018-01-01';  
INSERT 0 0  

参考

https://github.com/pipelinedb/pipelinedb

http://zeromq.org/intro:get-the-software

https://www.pipelinedb.com/

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍如何基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
1月前
|
传感器 运维 数据可视化
AR眼镜巡检系统在工业互联网的应用:AR+IoT
AR与IoT融合构建虚实闭环,IoT采集实时数据,AR直观呈现并交互,形成感知-分析-决策-行动高效闭环,提升运维效率。
|
存储 边缘计算 安全
边缘计算的概念和在IoT中的应用
随着物联网(IoT)设备数量的激增,传统的云计算模式面临着数据传输延迟和带宽压力等问题。边缘计算作为一种新的计算模式,通过将计算资源和服务部署到靠近数据源的位置,解决了这些问题。
240 2
|
8月前
|
SQL 人工智能 关系型数据库
【PG锦囊】阿里云 RDS PostgreSQL 版插件—AI 插件(rds_ai)
本文介绍了AI 插件(rds_ai)的核心优势、适用场景等,帮助您更好地了解 rds_ai 插件。想了解更多 RDS 插件信息和讨论交流,欢迎加入 RDS PG 插件用户专项服务群(103525002795)
|
传感器 存储 机器学习/深度学习
物联网(IoT)简介:定义、技术与应用
【5月更文挑战第30天】物联网(IoT)是将物品通过嵌入式系统、传感器及通信技术连接至互联网,实现物物、物人交互和数据共享的技术。其关键包括传感器、通信、嵌入式系统、云计算和人工智能技术。物联网应用于智能家居、智慧城市、工业自动化、农业和健康医疗等领域,通过Arduino等平台可实现简单数据传输。随着技术发展,物联网将深远影响人们生活和工作方式。
4145 3
|
10月前
|
安全 物联网 物联网安全
揭秘区块链技术在物联网(IoT)安全中的革新应用
揭秘区块链技术在物联网(IoT)安全中的革新应用
|
8月前
|
SQL NoSQL 关系型数据库
RDS PostgreSQL版发布 rds_duckdb 插件!
RDS PostgreSQL版内置DuckDB,结合列存储与向量化执行的优势,在RDS PG内实现复杂SQL查询加速和ETL功能,复杂查询效率提升30X,想了解更多rds_duckdb信息和讨论交流,欢迎加入RDS PG插件用户专项服务群(103525002795)
|
物联网 Linux C#
一键掌控未来!用 Uno Platform 打造跨平台 IoT 应用,轻松连接你的智能设备,让生活更智能!
微软的开源跨平台框架 Uno Platform 支持使用 C# 和 XAML 一次性编写代码并部署至多个平台,如 Windows、macOS、Linux、WebAssembly 及 iOS/Android,这使其成为 IoT 设备开发的理想选择。本文通过创建控制网络 LED 灯的应用,详细介绍了 Uno Platform 的环境搭建及 MQTT 客户端配置过程,实现了 LED 状态订阅与控制指令发送功能。该案例展示了 Uno Platform 在 IoT 领域的潜力及其跨平台优势,未来可扩展至更多设备类型,构建智能家居系统。
600 58
|
10月前
|
存储 安全 物联网
C# 在物联网 (IoT) 应用中的应用
本文介绍了C#在物联网(IoT)应用中的应用,涵盖基础概念、优势、常见问题及其解决方法。重点讨论了网络通信、数据处理和安全问题,并提供了相应的代码示例,旨在帮助开发者更好地利用C#进行IoT开发。
447 3
|
10月前
|
SQL 监控 物联网
ClickHouse在物联网(IoT)中的应用:实时监控与分析
【10月更文挑战第27天】随着物联网(IoT)技术的快速发展,越来越多的设备被连接到互联网上,产生了海量的数据。这些数据不仅包含了设备的状态信息,还包括用户的使用习惯、环境参数等。如何高效地处理和分析这些数据,成为了一个重要的挑战。作为一位数据工程师,我在一个物联网项目中深入使用了ClickHouse,以下是我的经验和思考。
566 0
|
存储 物联网 关系型数据库
PolarDB在物联网(IoT)数据存储中的应用探索
【9月更文挑战第6天】随着物联网技术的发展,海量设备数据对实时存储和处理提出了更高要求。传统数据库在扩展性、性能及实时性方面面临挑战。阿里云推出的PolarDB具备高性能、高可靠及高扩展性特点,能有效应对这些挑战。它采用分布式存储架构,支持多副本写入优化、并行查询等技术,确保数据实时写入与查询;多副本存储架构和数据持久化存储机制保证了数据安全;支持动态调整数据库规模,适应设备和数据增长。通过API或SDK接入IoT设备,实现数据实时写入、分布式存储与高效查询,展现出在IoT数据存储领域的巨大潜力。
230 1

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版
  • 推荐镜像

    更多