PostgreSQL "物联网"应用 - 1 实时流式数据处理案例(万亿每天)

简介: 物联网的特点是万物联网,会产生大量的数据。 例如 : 一盒药,从生产,到运输,到药店,到售卖。每流经一个节点,都会记录它的信息。 又如 : 健康手环,儿童防丢手表,一些动物迁徙研究的传感器(如中华鲟),水纹监测,电网监测,煤气管道监测,气象监测等等这些信息。 股价的实时预测。 车流实时

物联网的一个特点是万物联网,会产生大量的数据。
例如 :
一盒药,从生产,到运输,到药店,到售卖。每流经一个节点,都会记录它的信息。
又如 :
健康手环,儿童防丢手表,一些动物迁徙研究的传感器(如中华鲟),水纹监测,电网监测,煤气管道监测,气象监测等等这些信息。
股价的实时预测。
车流实时数据统计,车辆轨迹实时合并。
商场人流实时统计。
数据监控实时处理,例如数据库的监控,服务器的监控,操作系统的监控等。
等等。。。。。。
传感器种类繁多,采集的数据量已经达到了海量。
这些数据比电商双十一的量有过之而不及,怎样才能处理好这些数据呢?如何做到实时的流式数据处理?

PostgreSQL提供了一个很好的基于流的数据处理产品,实时计算能力达到了单机10W记录/s(普通X86服务器)。

下面是应用CASE。

下载并安装pipelineDB,它是基于PostgreSQL改进的流式数据处理数据库。

# wget https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.8.5-centos6-x86_64.rpm  
#rpm -ivh pipelinedb-0.8.5-centos6-x86_64.rpm  --prefix=/home/digoal/pipelinedb  

配置环境变量脚本

$vi env_pipe.sh   
  
export PS1="$USER@`/bin/hostname -s`-> "  
export PGPORT=1922  
export PGDATA=/disk1/digoal/pipe/pg_root  
export LANG=en_US.utf8  
export PGHOME=/home/digoal/pipelinedb  
export LD_LIBRARY_PATH=/home/digoal/scws/lib:$PGHOME/lib:/lib64:/usr/lib64:/usr/local/lib64:/lib:/usr/lib:/usr/local/lib:$LD_LIBRARY_PATH  
export DATE=`date +"%Y%m%d%H%M"`  
export PATH=/home/digoal/scws/bin:$PGHOME/bin:$PATH:.  
export MANPATH=$PGHOME/share/man:$MANPATH  
export PGHOST=$PGDATA  
export PGUSER=postgres  
export PGDATABASE=pipeline  
alias rm='rm -i'  
alias ll='ls -lh'  
unalias vi  
  
$ . ./env_pipe.sh  

初始化数据库

$ pipeline-init -D $PGDATA -U postgres -E UTF8 --locale=C -W  

配置参数

$ cd $PGDATA  
$ vi pipelinedb.conf  
listen_addresses = '0.0.0.0'            # what IP address(es) to listen on;  
port = 1922                            # (change requires restart)  
max_connections = 200                   # (change requires restart)  
unix_socket_directories = '.'   # comma-separated list of directories  
shared_buffers = 8GB                    # min 128kB  
maintenance_work_mem = 640MB            # min 1MB  
dynamic_shared_memory_type = posix      # the default is the first option  
synchronous_commit = off                # synchronization level;  
wal_buffers = 16MB                      # min 32kB, -1 sets based on shared_buffers  
wal_writer_delay = 10ms         # 1-10000 milliseconds  
checkpoint_segments = 400               # in logfile segments, min 1, 16MB each  
log_destination = 'csvlog'              # Valid values are combinations of  
logging_collector = on          # Enable capturing of stderr and csvlog  
log_timezone = 'PRC'  
datestyle = 'iso, mdy'  
timezone = 'PRC'  
lc_messages = 'C'                       # locale for system error message  
lc_monetary = 'C'                       # locale for monetary formatting  
lc_numeric = 'C'                        # locale for number formatting  
lc_time = 'C'                           # locale for time formatting  
default_text_search_config = 'pg_catalog.english'  
continuous_query_combiner_work_mem = 1GB  
continuous_query_batch_size = 100000  
continuous_query_num_combiners = 8  
continuous_query_num_workers = 4  
continuous_queries_enabled = on  

启动数据库

$ pipeline-ctl start  

创建流(从表里消费数据)
应用场景例子,

.1. 假设传感器会上传3个数据,分别是传感器ID,时间,以及采样值。
gid, crt_time, val
应用需要实时统计每分钟,每小时,每天,每个传感器上传的值的最大,最小,平均值,以及 count。
创建三个流视图,每个代表一个统计维度。
如下:

pipeline=# CREATE CONTINUOUS VIEW sv01  AS SELECT gid::int,date_trunc('min',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('min',crt_time);   
  
pipeline=# CREATE CONTINUOUS VIEW sv02  AS SELECT gid::int,date_trunc('hour',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('hour',crt_time);   
  
pipeline=# CREATE CONTINUOUS VIEW sv03  AS SELECT gid::int,date_trunc('day',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('day',crt_time);   

激活流

pipeline=# activate;  
ACTIVATE  

插入数据测试

pipeline=# insert into stream01(gid,val,crt_time) values (1,1,now());  
INSERT 0 1  
pipeline=# select * from sv01;  
 gid |     date_trunc      | max | min |          avg           | count   
-----+---------------------+-----+-----+------------------------+-------  
   1 | 2015-12-15 13:44:00 |   1 |   1 | 1.00000000000000000000 |     1  
(1 row)  
  
pipeline=# select * from sv02;  
 gid |     date_trunc      | max | min |          avg           | count   
-----+---------------------+-----+-----+------------------------+-------  
   1 | 2015-12-15 13:00:00 |   1 |   1 | 1.00000000000000000000 |     1  
(1 row)  
  
pipeline=# select * from sv03;  
 gid |     date_trunc      | max | min |          avg           | count   
-----+---------------------+-----+-----+------------------------+-------  
   1 | 2015-12-15 00:00:00 |   1 |   1 | 1.00000000000000000000 |     1  
(1 row)  

压力测试:
假设有10万个传感器,传感器上传的取值范围1到100。

$ vi test.sql  
\setrandom gid 1 100000  
\setrandom val 1 100  
insert into stream01(gid,val,crt_time) values (:gid,:val,now());  
  
./pgsql9.5/bin/pgbench -M prepared -n -r -f ./test.sql -P 5 -c 24 -j 24 -T 100  
progress: 5.0 s, 95949.9 tps, lat 0.247 ms stddev 0.575  
progress: 10.0 s, 98719.9 tps, lat 0.240 ms stddev 0.549  
progress: 15.0 s, 100207.8 tps, lat 0.237 ms stddev 0.573  
progress: 20.0 s, 101596.4 tps, lat 0.234 ms stddev 0.517  
progress: 25.0 s, 102830.4 tps, lat 0.231 ms stddev 0.492  
progress: 30.0 s, 103055.0 tps, lat 0.230 ms stddev 0.488  
progress: 35.0 s, 102786.0 tps, lat 0.231 ms stddev 0.482  
progress: 40.0 s, 99066.3 tps, lat 0.240 ms stddev 0.578  
progress: 45.0 s, 102912.5 tps, lat 0.231 ms stddev 0.494  
progress: 50.0 s, 100398.2 tps, lat 0.236 ms stddev 0.530  
progress: 55.0 s, 105719.8 tps, lat 0.224 ms stddev 0.425  
progress: 60.0 s, 99041.0 tps, lat 0.240 ms stddev 0.617  
progress: 65.0 s, 97087.0 tps, lat 0.245 ms stddev 0.619  
progress: 70.0 s, 95312.6 tps, lat 0.249 ms stddev 0.653  
progress: 75.0 s, 98768.3 tps, lat 0.240 ms stddev 0.593  
progress: 80.0 s, 106203.8 tps, lat 0.223 ms stddev 0.435  
progress: 85.0 s, 103423.1 tps, lat 0.229 ms stddev 0.480  
progress: 90.0 s, 106143.5 tps, lat 0.223 ms stddev 0.429  
progress: 95.0 s, 103514.5 tps, lat 0.229 ms stddev 0.478  
progress: 100.0 s, 100222.8 tps, lat 0.237 ms stddev 0.547  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 24  
number of threads: 24  
duration: 100 s  
number of transactions actually processed: 10114821  
latency average: 0.235 ms  
latency stddev: 0.530 ms  
tps = 101089.580065 (including connections establishing)  
tps = 101101.483296 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.003051        \setrandom gid 1 100000  
        0.000866        \setrandom val 1 100  
        0.230430        insert into stream01(gid,val,crt_time) values (:gid,:val,now());  

每秒约处理10万记录,统计维度见上面的流SQL。

多轮测试后

pipeline=# select sum(count) from sv03;  
   sum      
----------  
 53022588  
(1 row)  
  
pipeline=# select * from sv01 limit 10;  
  gid  |     date_trunc      | max | min |          avg           | count   
-------+---------------------+-----+-----+------------------------+-------  
     1 | 2015-12-15 13:44:00 |   1 |   1 | 1.00000000000000000000 |     1  
 53693 | 2015-12-15 13:47:00 |  68 |   1 |    28.0000000000000000 |     6  
   588 | 2015-12-15 13:47:00 |  88 |  11 |    47.6250000000000000 |     8  
 60154 | 2015-12-15 13:47:00 |  95 |   1 |    40.9090909090909091 |    11  
 38900 | 2015-12-15 13:47:00 |  90 |  17 |    57.2000000000000000 |     5  
 12784 | 2015-12-15 13:47:00 |  93 |  13 |    64.1250000000000000 |     8  
 79782 | 2015-12-15 13:47:00 |  60 |  16 |    43.1666666666666667 |     6  
  5122 | 2015-12-15 13:47:00 | 100 |   3 |    46.8333333333333333 |    12  
 97444 | 2015-12-15 13:47:00 |  98 |   9 |    59.5833333333333333 |    12  
 34209 | 2015-12-15 13:47:00 |  86 |  13 |    52.2857142857142857 |     7  
(10 rows)  
  
pipeline=# select * from sv02 limit 10;  
  gid  |     date_trunc      | max | min |         avg         | count   
-------+---------------------+-----+-----+---------------------+-------  
 91065 | 2015-12-15 14:00:00 | 100 |   0 | 51.4299065420560748 |   321  
 24081 | 2015-12-15 14:00:00 | 100 |   0 | 52.1649831649831650 |   297  
 29013 | 2015-12-15 14:00:00 | 100 |   0 | 50.9967213114754098 |   305  
 13134 | 2015-12-15 14:00:00 | 100 |   0 | 49.6968750000000000 |   320  
 84691 | 2015-12-15 14:00:00 | 100 |   0 | 49.5547445255474453 |   274  
 91059 | 2015-12-15 14:00:00 | 100 |   1 | 47.7536764705882353 |   272  
 50115 | 2015-12-15 14:00:00 | 100 |   1 | 49.4219269102990033 |   301  
 92610 | 2015-12-15 14:00:00 | 100 |   0 | 50.1197183098591549 |   284  
 36616 | 2015-12-15 14:00:00 | 100 |   1 | 48.8750000000000000 |   312  
 46390 | 2015-12-15 14:00:00 |  99 |   0 | 48.3246268656716418 |   268  
(10 rows)  
  
pipeline=# select * from sv03 limit 10;  
  gid  |     date_trunc      | max | min |         avg         | count   
-------+---------------------+-----+-----+---------------------+-------  
 68560 | 2015-12-15 00:00:00 | 100 |   0 | 51.2702702702702703 |   555  
 42241 | 2015-12-15 00:00:00 | 100 |   0 | 49.5266903914590747 |   562  
 64946 | 2015-12-15 00:00:00 | 100 |   0 | 48.2409177820267686 |   523  
  2451 | 2015-12-15 00:00:00 | 100 |   0 | 49.8153564899451554 |   547  
 11956 | 2015-12-15 00:00:00 | 100 |   0 | 51.2382739212007505 |   533  
 21578 | 2015-12-15 00:00:00 | 100 |   0 | 49.2959558823529412 |   544  
 36451 | 2015-12-15 00:00:00 | 100 |   0 | 51.1292035398230088 |   565  
 62380 | 2015-12-15 00:00:00 | 100 |   0 | 48.9099437148217636 |   533  
 51946 | 2015-12-15 00:00:00 | 100 |   0 | 51.0318091451292247 |   503  
 35084 | 2015-12-15 00:00:00 | 100 |   0 | 49.3613766730401530 |   523  
(10 rows)  

.2. 假设车辆运行过程中,每隔一段时间会上传位置信息,
gid, crt_time, poi
应用需要按天,绘制车辆的路径信息(把多个point聚合成路径类型,或者数组类型,或者字符串,。。。)。

假设有1000万量车,每辆车每次上传一个坐标和时间信息,(或者是一批信息)。
应用需求,
.2.1. 按天绘制车辆的路径信息
.2.2. 按小时统计每个区域有多少量车经过

创建流 (这里假设点信息已经经过了二进制编码,用一个INT8来表示,方便压力测试)

CREATE CONTINUOUS VIEW sv04  AS SELECT gid::int,date_trunc('day',crt_time::timestamp),array_agg(poi::int8||' -> '||crt_time) FROM stream02 group by gid,date_trunc('day',crt_time);  

压力测试

$ vi test.sql  
\setrandom gid 1 10000000  
\setrandom poi 1 1000000000  
insert into stream02(gid,poi,crt_time) values (:gid,:poi,now());  
  
./pgsql9.5/bin/pgbench -M prepared -n -r -f ./test.sql -P 5 -c 24 -j 24 -T 100  
progress: 5.0 s, 106005.0 tps, lat 0.223 ms stddev 0.370  
progress: 10.0 s, 109884.8 tps, lat 0.216 ms stddev 0.347  
progress: 15.0 s, 111122.1 tps, lat 0.213 ms stddev 0.368  
progress: 20.0 s, 111987.0 tps, lat 0.212 ms stddev 0.353  
progress: 25.0 s, 111835.4 tps, lat 0.212 ms stddev 0.363  
progress: 30.0 s, 111759.7 tps, lat 0.212 ms stddev 0.366  
progress: 35.0 s, 112110.4 tps, lat 0.211 ms stddev 0.358  
progress: 40.0 s, 112185.4 tps, lat 0.211 ms stddev 0.352  
progress: 45.0 s, 113080.0 tps, lat 0.210 ms stddev 0.345  
progress: 50.0 s, 113205.4 tps, lat 0.209 ms stddev 0.353  
progress: 55.0 s, 113415.1 tps, lat 0.209 ms stddev 0.352  
progress: 60.0 s, 113519.8 tps, lat 0.209 ms stddev 0.342  
progress: 65.0 s, 112683.6 tps, lat 0.210 ms stddev 0.358  
progress: 70.0 s, 112748.3 tps, lat 0.210 ms stddev 0.360  
progress: 75.0 s, 112158.9 tps, lat 0.211 ms stddev 0.373  
progress: 80.0 s, 112580.8 tps, lat 0.210 ms stddev 0.355  
progress: 85.0 s, 111895.5 tps, lat 0.212 ms stddev 0.370  
progress: 90.0 s, 112229.2 tps, lat 0.211 ms stddev 0.442  
progress: 95.0 s, 104915.8 tps, lat 0.226 ms stddev 2.852  
progress: 100.0 s, 103079.9 tps, lat 0.230 ms stddev 2.054  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 24  
number of threads: 24  
duration: 100 s  
number of transactions actually processed: 11112035  
latency average: 0.213 ms  
latency stddev: 0.836 ms  
tps = 111106.652772 (including connections establishing)  
tps = 111118.651135 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.002939        \setrandom gid 1 10000000  
        0.000887        \setrandom poi 1 1000000000  
        0.209177        insert into stream02(gid,poi,crt_time) values (:gid,:poi,now());  
  
pipeline=# select * from sv04 limit 3;  
  448955 | 2015-12-15 00:00:00 | {"306029686 -> 2015-12-15 14:53:01.273121","885962518 -> 2015-12-15 14:53:03.352406"}  
 7271368 | 2015-12-15 00:00:00 | {"615447469 -> 2015-12-15 14:53:01.2616","944473391 -> 2015-12-15 14:53:04.543387"}  
 8613957 | 2015-12-15 00:00:00 | {"473349491 -> 2015-12-15 14:53:01.288332","125413709 -> 2015-12-15 14:53:08.742894"}  

.3. 按交警探头为单位,统计每个探头采集的车辆信息。
例如
.3.1 以车辆为单位,统计车辆的探头位置信息,串成轨迹数据。
.3.2 以探头为单位,统计每个路口的车流信息。(假设一个探头对应一个路口)

第一个需求和前面的绘制车辆轨迹例子一样,统计路口流量信息则是以探头ID为单位进行统计。
用法都差不多,不再举例

.4. 实时股价预测。
可以结合madlib或者plr进行多元回归,选择最好的R2,根据对应的截距和斜率推测下一组股价。
需要用到UDF,具体的用法参考我以前写的文章。
这里不再举例。

.5. 商场WIFI传感器的信息实时统计。
根据WIFI提供的位置信息,实时统计每个店铺的人流量。店铺的人均驻留时间,总计驻留时间。

.6. 假设你的数据处理场景,PG现有的函数无法处理怎么办?没问题,PG提供了自定义UDF,数据类型,操作符,索引方法等一系列API。你可以根据业务的需求,在此基础上实现。

用法还有很多,无法穷举。

下面再结合一个当下非常流行的消息队列,pipelineDB可以实时的从消息队列取数据并进行实时计算。
例子:
在本地起一个nginx服务端,并且使用siege模拟HTTP请求,nginx将记录这些行为,存储为JSON格式到文件中。
在本地起kafka服务端,使用kafkacat将nginx的访问日志不断的push到kafka。
在pipelinedb中订阅kafka的消息,并实时处理为想要的统计信息,(WEB页面的访问人数,延迟,等信息)

安装kafka

http://kafka.apache.org/07/quickstart.html  
  
# wget http://www.us.apache.org/dist/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz  
# tar -zxvf kafka_2.10-0.8.2.2.tgz  
  
# git clone https://github.com/edenhill/librdkafka.git  
# cd librdkafka  
./configure  
make  
make install  
  
# git clone https://github.com/lloyd/yajl.git  
# cd yajl  
./configure  
make  
make install  
  
# vi /etc/ld.so.conf  
/usr/local/lib  
# ldconfig  
  
# git clone https://github.com/edenhill/kafkacat.git  
# cd kafkacat  
./configure  
make  
make install  

安装siege和nginx

# yum install -y siege nginx  

创建一个nginx配置文件,记录访问日志到/tmp/access.log,格式为json

cd /tmp  
  
cat <<EOF > nginx.conf  
worker_processes 4;  
pid $PWD/nginx.pid;  
events {}  
http {  
  
    log_format json   
    '{'  
        '"ts": "\$time_iso8601", '  
        '"user_agent": "\$http_user_agent", '  
        '"url": "\$request_uri", '  
        '"latency": "\$request_time",  '  
        '"user": "\$arg_user"'  
    '}';  
  
    access_log $PWD/access.log json;  
    error_log $PWD/error.log;  
  
    server {  
        location ~ ^/ {  
            return 200;  
        }  
    }  
}  
EOF  

启动nginx

nginx -c $PWD/nginx.conf -p $PWD/  

配置主机名

# hostname  
digoal.org  
# vi /etc/hosts  
127.0.0.1 digoal.org  

启动kafka

cd /opt/soft_bak/kafka_2.10-0.8.2.2  
bin/zookeeper-server-start.sh config/zookeeper.properties &  
bin/kafka-server-start.sh config/server.properties &  

产生一个随机URL文件

for x in {0..1000000}; do echo "http://localhost/page$((RANDOM % 100))/path$((RANDOM % 10))?user=$((RANDOM % 100000))" >> urls.txt; done  

使用siege模拟访问这些URL,nginx会产生访问日志到/tmp/access.log

siege -c32 -b -d0 -f urls.txt >/dev/null 2>&1  
  
/tmp/access.log举例,格式为JSON  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page68/path7?user=18583", "latency": "0.002",  "user": "18583"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page78/path0?user=24827", "latency": "0.003",  "user": "24827"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page19/path6?user=3988", "latency": "0.003",  "user": "3988"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page55/path2?user=18433", "latency": "0.003",  "user": "18433"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page62/path3?user=10801", "latency": "0.001",  "user": "10801"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page9/path2?user=4915", "latency": "0.001",  "user": "4915"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page10/path2?user=5367", "latency": "0.001",  "user": "5367"}  

将访问日志输出到kafkacat,推送到kafka消息系统,对应的topic为logs_topic。

( tail -f /tmp/access.log | kafkacat -b localhost:9092 -t logs_topic ) &  

原始的消费方式如下:

# cd /opt/soft_bak/kafka_2.10-0.8.2.2  
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs_topic --from-beginning  
# Ctrl+C  

接下来我们使用pipelinedb来实时消费这些消息,并转化为需要的统计结果。

CREATE EXTENSION pipeline_kafka;  
SELECT kafka_add_broker('localhost:9092');  -- 添加一个kafka broker(kafka集群的一个节点)  
CREATE STREAM logs_stream (payload json);  -- 创建一个流映射到,kafka消息系统。  
CREATE CONTINUOUS VIEW message_count AS SELECT COUNT(*) FROM logs_stream;   -- 创建一个流视图,实时消费,处理kafka消息。  
SELECT kafka_consume_begin('logs_topic', 'logs_stream');  -- 开始消费指定的topic,logs_topic,  
 kafka_consume_begin   
------------------  
 success  
(1 row)  

查询流视图,可以获得当前NGINX的访问统计。

SELECT * FROM message_count;  
 count   
--------  
  24  
(1 row)  
  
SELECT * FROM message_count;  
 count  
--------  
  36  
 success  
(1 row)  

接下来做一个更深入的实时分析,分析每个URL的访问次数,用户数,99%用户的访问延迟低于多少。

/*   
 * This function will strip away any query parameters from each url,  
 * as we're not interested in them.  
 */  
CREATE FUNCTION url(raw text, regex text DEFAULT '\?.*', replace text DEFAULT '')  
    RETURNS text  
AS 'textregexreplace_noopt'    -- textregexreplace_noopt@src/backend/utils/adt/regexp.c  
LANGUAGE internal;  
  
CREATE CONTINUOUS VIEW url_stats AS  
    SELECT  
        url, -- url地址  
    percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99,  -- 99%的URL访问延迟小于多少  
        count(DISTINCT user) AS uniques,  -- 唯一用户数  
    count(*) total_visits  -- 总共访问次数  
  FROM  
    (SELECT   
        url(payload->>'url'),  -- 地址  
        payload->>'user' AS user,  -- 用户ID  
        (payload->>'latency')::float * 1000 AS latency_ms,  -- 访问延迟  
        arrival_timestamp  
    FROM logs_stream) AS unpacked  
WHERE arrival_timestamp > clock_timestamp() - interval '1 day'  
 GROUP BY url;  
  
CREATE CONTINUOUS VIEW user_stats AS  
    SELECT  
        day(arrival_timestamp),  
        payload->>'user' AS user,  
        sum(CASE WHEN payload->>'url' LIKE '%landing_page%' THEN 1 ELSE 0 END) AS landings,  
        sum(CASE WHEN payload->>'url' LIKE '%conversion%' THEN 1 ELSE 0 END) AS conversions,  
        count(DISTINCT url(payload->>'url')) AS unique_urls,  
        count(*) AS total_visits  
    FROM logs_stream GROUP BY payload->>'user', day;  
  
-- What are the top-10 most visited urls?  
SELECT url, total_visits FROM url_stats ORDER BY total_visits DESC limit 10;  
      url      | total_visits   
---------------+--------------  
 /page62/path4 |        10182  
 /page51/path4 |        10181  
 /page24/path5 |        10180  
 /page93/path3 |        10180  
 /page81/path0 |        10180  
 /page2/path5  |        10180  
 /page75/path2 |        10179  
 /page28/path3 |        10179  
 /page40/path2 |        10178  
 /page74/path0 |        10176  
(10 rows)  
  
  
-- What is the 99th percentile latency across all urls?  
SELECT combine(p99) FROM url_stats;  
     combine        
------------------  
 6.95410494731137  
(1 row)  
  
-- What is the average conversion rate each day for the last month?  
SELECT day, avg(conversions / landings) FROM user_stats GROUP BY day;  
          day           |            avg               
------------------------+----------------------------  
 2015-09-15 00:00:00-07 | 1.7455000000000000000000000  
(1 row)  
  
-- How many unique urls were visited each day for the last week?  
SELECT day, combine(unique_urls) FROM user_stats WHERE day > now() - interval '1 week' GROUP BY day;  
          day           | combine   
------------------------+---------  
 2015-09-15 00:00:00-07 |  100000  
(1 row)  
  
-- Is there a relationship between the number of unique urls visited and the highest conversion rates?  
SELECT unique_urls, sum(conversions) / sum(landings) AS conversion_rate FROM user_stats  
    GROUP BY unique_urls ORDER BY conversion_rate DESC LIMIT 10;  
 unique_urls |  conversion_rate    
-------------+-------------------  
          41 |  2.67121005785842  
          36 |  2.02713894173361  
          34 |  2.02034637010851  
          31 |  2.01958418072859  
          27 |  2.00045348712296  
          24 |  1.99714899522942  
          19 |  1.99438839453606  
          16 |  1.98083502184886  
          15 |  1.87983011139079  
          14 |  1.84906254929873  
(1 row)  

使用PipelineDB + kafka,应用场景又更丰富了。

如何构建更大的实时消息处理集群?
规划好数据的分片规则(避免跨节点的统计),如果有跨节点访问需求,可以在每个节点使用维度表,来实现。
例如每天要处理 万亿 条消息,怎么办?
根据以上压力测试,平均每台机器每秒处理10万记录(每天处理86亿),计算需要用到116台PostgreSQL。是不是很省心呢?
一个图例:
每一层都可以扩展
从lvs到 haproxy到 kafka到 PostgreSQL到 离线分析HAWQ。

1

目录
相关文章
|
21天前
|
存储 边缘计算 物联网
探索边缘计算:重塑物联网时代的数据处理格局
探索边缘计算:重塑物联网时代的数据处理格局
|
4月前
|
监控 物联网 关系型数据库
使用PostgreSQL触发器解决物联网设备状态同步问题
在物联网监控系统中,确保设备状态(如在线与离线)的实时性和准确性至关重要。当设备状态因外部因素改变时,需迅速反映到系统内部。因设备状态数据分布在不同表中,直接通过应用同步可能引入复杂性和错误。采用PostgreSQL触发器自动同步状态变化是一种高效方法。首先定义触发函数,在设备状态改变时更新管理模块表;然后创建触发器,在状态字段更新后执行此函数。此外,还需进行充分测试、监控性能并实施优化,以及在触发函数中加入错误处理和日志记录功能。这种方法不仅提高自动化程度,增强数据一致性与实时性,还需注意其对性能的影响并采取优化措施。
|
5月前
|
存储 数据采集 边缘计算
物联网设备的边缘计算与数据处理:技术革新与应用展望
【7月更文挑战第8天】物联网设备的边缘计算与数据处理技术是推动物联网技术发展的重要力量。通过将数据处理和决策推向设备边缘,边缘计算实现了低延迟、数据隐私和安全、带宽优化以及可靠性等优势,为物联网应用的实时性、智能化和高效性提供了有力保障。未来,随着技术的不断进步和创新,边缘计算将在物联网领域发挥更加重要的作用,推动物联网技术的进一步发展。
|
5月前
|
数据采集 存储 运维
物联网设备的数据处理与分析技术探讨
【7月更文挑战第2天】探索物联网(IoT)数据处理技术,涵盖数据采集(传感器、无线通信)、存储(分布式系统、NoSQL)、处理(清洗、压缩、转换)和分析(描述性、聚类、分类、异常检测)。未来趋势涉及AI集成、边缘计算、多模态处理和系统自主化。随着技术演进,期待更智能、高效的解决方案。
|
7月前
|
传感器 搜索推荐 安全
【Uniapp 专栏】从案例看 Uniapp 在物联网应用中的运用
【5月更文挑战第12天】Uniapp在物联网中展现出强大生命力,应用于智能家居系统,允许用户通过移动应用控制灯光、窗帘、家电等。通过网络通信与服务器连接,实现设备状态实时同步和用户指令准确传递。提供个性化场景设置,保证流畅体验并注重安全,支持数据加密和用户认证。结合传感器技术,实现环境监测。随着物联网发展,Uniapp有望在更多领域发挥关键作用,塑造更智能的未来。
353 3
|
6月前
|
SQL 关系型数据库 PostgreSQL
【sql】PostgreSQL物化视图表使用案例
【sql】PostgreSQL物化视图表使用案例
55 0
|
6月前
|
边缘计算 人工智能 物联网
边缘计算:为物联网带来更快速的数据处理
【6月更文挑战第4天】物联网快速发展,边缘计算应运而生,解决云计算的延迟与带宽挑战。边缘计算将处理能力移至数据源附近,提升响应速度,保障实时性应用(如自动驾驶)的安全性。同时,它减轻网络压力,增强数据隐私保护。示例代码展示如何在边缘设备上用Python进行实时人脸识别,体现边缘计算的实时性和隐私性。边缘计算结合5G、AI,将推动物联网进入新阶段,打造更智能、高效、安全的物联世界。
81 0
|
7月前
|
存储 JSON 关系型数据库
PostgreSQL Json应用场景介绍和Shared Detoast优化
PostgreSQL Json应用场景介绍和Shared Detoast优化
|
7月前
|
关系型数据库 数据库 PostgreSQL
Docker【应用 03】给Docker部署的PostgreSQL数据库安装PostGIS插件(安装流程及问题说明)
Docker【应用 03】给Docker部署的PostgreSQL数据库安装PostGIS插件(安装流程及问题说明)
418 0
|
7月前
|
关系型数据库 数据库 PostgreSQL
PostgreSQL【应用 01】使用Vector插件实现向量相似度查询(Docker部署的PostgreSQL安装pgvector插件说明)和Milvus向量库对比
PostgreSQL【应用 01】使用Vector插件实现向量相似度查询(Docker部署的PostgreSQL安装pgvector插件说明)和Milvus向量库对比
624 1

热门文章

最新文章

相关产品

  • 物联网平台