PostgreSQL pipelinedb 流计算插件 - IoT应用 - 实时轨迹聚合

本文涉及的产品
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
云数据库 RDS SQL Server,基础系列 2核4GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 标签PostgreSQL , IoT , 轨迹聚合 , pipelinedb , 流计算 , 实时聚合背景IoT场景,车联网场景,共享单车场景,人的行为位点等,终端实时上报的是孤立的位点,我们需要将其补齐成轨迹。

标签

PostgreSQL , IoT , 轨迹聚合 , pipelinedb , 流计算 , 实时聚合


背景

IoT场景,车联网场景,共享单车场景,人的行为位点等,终端实时上报的是孤立的位点,我们需要将其补齐成轨迹。

例如共享单车,下单,开锁,生成订单,骑行,关闭订单,关锁。这个过程有一个唯一的订单号,每次上报的位点会包含时间,订单号,位置。

根据订单号,将点聚合为轨迹。

使用pipelinedb插件,可以实时的实现聚合。

例子

以ECS (centos 7.x x64), postgresql 10 为例

1、编译zeromq

wget https://github.com/zeromq/libzmq/releases/download/v4.2.5/zeromq-4.2.5.tar.gz  
tar -zxvf zeromq-4.2.5.tar.gz  
cd zeromq-4.2.5  
./configure  
make  
make install  

2、编译pipelinedb

wget https://github.com/pipelinedb/pipelinedb/archive/1.0.0rev4.tar.gz  
tar -zxvf 1.0.0rev4.tar.gz   
cd pipelinedb-1.0.0rev4/  
  
vi Makefile  
  
SHLIB_LINK += /usr/local/lib/libzmq.so -lstdc++  
  
. /var/lib/pgsql/env.sh 1925  
  
USE_PGXS=1 make  
USE_PGXS=1 make install  

3、配置postgresql.conf

max_worker_processes = 512  
shared_preload_libraries = 'pipelinedb'  
pipelinedb.stream_insert_level=async
pipelinedb.num_combiners=8
pipelinedb.num_workers=16
pipelinedb.num_queues=16
pipelinedb.fillfactor=75
pipelinedb.continuous_queries_enabled=true  

重启

pg_ctl restart -m fast  

4、安装插件

postgres=# create extension pipelinedb;  

5、创建stream,实时写入轨迹点

CREATE FOREIGN TABLE s1 ( order_id int8, ts timestamp, pos geometry )  
SERVER pipelinedb;  

6、创建Continue view,实时聚合

CREATE VIEW cv1 WITH (action=materialize ) AS   
select order_id, min(ts) min_ts, array_agg(ts||','||st_astext(pos)) as seg  
from s1  
group by order_id;  

激活视图(默认已激活)

select pipelinedb.activate('public.cv1');  

7、压测

vi test.sql  
\set order_id random(1,100000)  
\set x random(70,90)  
\set y random(120,125)  
insert into s1 (order_id, ts, pos) values (:order_id, clock_timestamp(), st_makepoint(:x+10*random(), :y+10*random()));  
  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 256 -j 256 -T 120  

8、压测结果

transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 256  
number of threads: 256  
duration: 120 s  
number of transactions actually processed: 17614607  
latency average = 1.740 ms  
latency stddev = 1.730 ms  
tps = 146550.933776 (including connections establishing)  
tps = 146906.482277 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.002  \set order_id random(1,10000000)  
         0.001  \set x random(70,90)  
         0.000  \set y random(120,125)  
         1.742  insert into s1 (order_id, ts, pos) values (:order_id, clock_timestamp(), st_makepoint(:x+10*random(), :y+10*random()));  
postgres=# \x  
Expanded display is on.  
-[ RECORD 17 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
order_id | 8672585  
min_ts   | 2018-11-01 18:44:08.140027  
seg      | {"2018-11-01 18:44:08.140027,POINT(78.3615547642112 121.881739947945)","2018-11-01 18:44:11.739248,POINT(80.9645632216707 121.450987955555)"}  
-[ RECORD 18 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
order_id | 4011211  
min_ts   | 2018-11-01 18:44:08.166407  
seg      | {"2018-11-01 18:44:08.166407,POINT(87.126777020283 132.819293198176)","2018-11-01 18:44:11.524995,POINT(80.482944605872 126.906906872056)"}  
-[ RECORD 19 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
order_id | 2468486  
min_ts   | 2018-11-01 18:44:08.135136  
seg      | {"2018-11-01 18:44:08.135136,POINT(84.7732630362734 132.659516767599)","2018-11-01 18:44:20.603312,POINT(87.6352122295648 132.18647258915)","2018-11-01 18:44:19.447776,POINT(94.9817024609074 131.295661441982)"}  

9、历史轨迹的保留

设置cv生命周期,自动清理老化数据

postgres=# select pipelinedb.set_ttl('cv1', interval '1 hour' , 'min_ts');  
-[ RECORD 1 ]-----  
set_ttl | (3600,2)  

创建目标持久化表

create table cv1_persist (like cv1);  

创建时间字段索引(CV1)

postgres=# create index idx_1 on cv1 (min_ts);  
CREATE INDEX  

ETL形式,将数据从cv抽取到目标持久化表

postgres=# insert into cv1_persist select * from cv1 where min_ts <= '2018-01-01';  
INSERT 0 0  

参考

https://github.com/pipelinedb/pipelinedb

http://zeromq.org/intro:get-the-software

https://www.pipelinedb.com/

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
存储 关系型数据库 物联网
沉浸式学习PostgreSQL|PolarDB 14: 共享单车、徒步、旅游、网约车轨迹查询
本文的目的是帮助你了解如何设计轨迹表, 如何高性能的写入、查询、分析轨迹数据.
699 0
|
6月前
|
人工智能 自然语言处理 关系型数据库
|
5月前
|
XML 关系型数据库 数据库
使用mybatis-generator插件生成postgresql数据库model、mapper、xml
使用mybatis-generator插件生成postgresql数据库model、mapper、xml
498 0
|
6月前
|
SQL JSON 关系型数据库
[UE虚幻引擎插件DTPostgreSQL] PostgreSQL Connector 使用蓝图连接操作 PostgreSQL 数据库说明
本插件主要是支持在UE蓝图中连接和操作PostgreSQL 数据库。
61 2
|
关系型数据库 定位技术 分布式数据库
沉浸式学习PostgreSQL|PolarDB 18: 通过GIS轨迹相似伴随|时态分析|轨迹驻点识别等技术对拐卖、诱骗场景进行侦查
本文主要教大家怎么用好数据库, 而不是怎么运维管理数据库、怎么开发数据库内核.
1304 1
|
6月前
|
存储 JSON 关系型数据库
PostgreSQL Json应用场景介绍和Shared Detoast优化
PostgreSQL Json应用场景介绍和Shared Detoast优化
|
SQL 关系型数据库 Go
《增强你的PostgreSQL:最佳扩展和插件推荐》
《增强你的PostgreSQL:最佳扩展和插件推荐》
946 0
|
6月前
|
关系型数据库 数据库 PostgreSQL
Docker【应用 03】给Docker部署的PostgreSQL数据库安装PostGIS插件(安装流程及问题说明)
Docker【应用 03】给Docker部署的PostgreSQL数据库安装PostGIS插件(安装流程及问题说明)
379 0
|
6月前
|
关系型数据库 数据库 PostgreSQL
PostgreSQL【应用 01】使用Vector插件实现向量相似度查询(Docker部署的PostgreSQL安装pgvector插件说明)和Milvus向量库对比
PostgreSQL【应用 01】使用Vector插件实现向量相似度查询(Docker部署的PostgreSQL安装pgvector插件说明)和Milvus向量库对比
575 1
|
6月前
|
SQL 关系型数据库 C语言
PostgreSQL【应用 03】Docker部署的PostgreSQL扩展SQL之C语言函数(编写、编译、载入)计算向量余弦距离实例分享
PostgreSQL【应用 03】Docker部署的PostgreSQL扩展SQL之C语言函数(编写、编译、载入)计算向量余弦距离实例分享
95 0

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版