PostgreSQL "物联网"应用 - 1 实时流式数据处理案例(万亿每天)


One defining characteristic of the Internet of Things is that everything is connected, which produces enormous volumes of data.
For example:
A box of medicine travels from production, to transport, to the pharmacy, to the sale; at every node along the way, its information is recorded.
More examples:
Fitness wristbands, anti-loss watches for children, sensors used in animal-migration research (such as the Chinese sturgeon), hydrological monitoring, power-grid monitoring, gas-pipeline monitoring, weather monitoring, and so on.
Real-time stock price prediction.
Real-time traffic statistics and real-time merging of vehicle trajectories.
Real-time foot-traffic statistics for shopping malls.
Real-time processing of monitoring data, such as database, server, and operating-system metrics.
And so on.
Sensors come in many varieties, and the volume of data they collect is already massive, rivaling or even exceeding an e-commerce Double 11 event. How can all of this be handled, and how can it be processed as a real-time stream?

The PostgreSQL ecosystem provides a very good stream-based data-processing product, whose real-time computing capability reaches 100,000 records/s on a single machine (an ordinary x86 server).

The application cases follow.

Download and install PipelineDB, a streaming-data-processing database derived from PostgreSQL.

# wget https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.8.5-centos6-x86_64.rpm  
# rpm -ivh pipelinedb-0.8.5-centos6-x86_64.rpm --prefix=/home/digoal/pipelinedb  

Create an environment-variable script

$ vi env_pipe.sh   
  
export PS1="$USER@`/bin/hostname -s`-> "  
export PGPORT=1922  
export PGDATA=/disk1/digoal/pipe/pg_root  
export LANG=en_US.utf8  
export PGHOME=/home/digoal/pipelinedb  
export LD_LIBRARY_PATH=/home/digoal/scws/lib:$PGHOME/lib:/lib64:/usr/lib64:/usr/local/lib64:/lib:/usr/lib:/usr/local/lib:$LD_LIBRARY_PATH  
export DATE=`date +"%Y%m%d%H%M"`  
export PATH=/home/digoal/scws/bin:$PGHOME/bin:$PATH:.  
export MANPATH=$PGHOME/share/man:$MANPATH  
export PGHOST=$PGDATA  
export PGUSER=postgres  
export PGDATABASE=pipeline  
alias rm='rm -i'  
alias ll='ls -lh'  
unalias vi  
  
$ . ./env_pipe.sh  

Initialize the database

$ pipeline-init -D $PGDATA -U postgres -E UTF8 --locale=C -W  

Configure parameters

$ cd $PGDATA  
$ vi pipelinedb.conf  
listen_addresses = '0.0.0.0'            # what IP address(es) to listen on;  
port = 1922                            # (change requires restart)  
max_connections = 200                   # (change requires restart)  
unix_socket_directories = '.'   # comma-separated list of directories  
shared_buffers = 8GB                    # min 128kB  
maintenance_work_mem = 640MB            # min 1MB  
dynamic_shared_memory_type = posix      # the default is the first option  
synchronous_commit = off                # synchronization level;  
wal_buffers = 16MB                      # min 32kB, -1 sets based on shared_buffers  
wal_writer_delay = 10ms         # 1-10000 milliseconds  
checkpoint_segments = 400               # in logfile segments, min 1, 16MB each  
log_destination = 'csvlog'              # Valid values are combinations of  
logging_collector = on          # Enable capturing of stderr and csvlog  
log_timezone = 'PRC'  
datestyle = 'iso, mdy'  
timezone = 'PRC'  
lc_messages = 'C'                       # locale for system error message  
lc_monetary = 'C'                       # locale for monetary formatting  
lc_numeric = 'C'                        # locale for number formatting  
lc_time = 'C'                           # locale for time formatting  
default_text_search_config = 'pg_catalog.english'  
continuous_query_combiner_work_mem = 1GB        # memory available to each combiner for aggregating partial results  
continuous_query_batch_size = 100000            # max events to accumulate before executing a continuous query plan  
continuous_query_num_combiners = 8              # number of combiner background processes  
continuous_query_num_workers = 4                # number of worker background processes  
continuous_queries_enabled = on                 # activate continuous queries at server start  

Start the database

$ pipeline-ctl start  

Create a stream and continuous views (data is consumed from the stream as if it were a table).
Example application scenarios:

.1. Suppose each sensor uploads three fields: the sensor ID, the time, and the sampled value:
gid, crt_time, val
The application needs, in real time, the per-minute, per-hour, and per-day max, min, average, and count of the values uploaded by each sensor.
Create three continuous views, one per statistics dimension.
As follows:

pipeline=# CREATE CONTINUOUS VIEW sv01  AS SELECT gid::int,date_trunc('min',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('min',crt_time);   
  
pipeline=# CREATE CONTINUOUS VIEW sv02  AS SELECT gid::int,date_trunc('hour',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('hour',crt_time);   
  
pipeline=# CREATE CONTINUOUS VIEW sv03  AS SELECT gid::int,date_trunc('day',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('day',crt_time);   

Activate the streams

pipeline=# activate;  
ACTIVATE  

Insert test data

pipeline=# insert into stream01(gid,val,crt_time) values (1,1,now());  
INSERT 0 1  
pipeline=# select * from sv01;  
 gid |     date_trunc      | max | min |          avg           | count   
-----+---------------------+-----+-----+------------------------+-------  
   1 | 2015-12-15 13:44:00 |   1 |   1 | 1.00000000000000000000 |     1  
(1 row)  
  
pipeline=# select * from sv02;  
 gid |     date_trunc      | max | min |          avg           | count   
-----+---------------------+-----+-----+------------------------+-------  
   1 | 2015-12-15 13:00:00 |   1 |   1 | 1.00000000000000000000 |     1  
(1 row)  
  
pipeline=# select * from sv03;  
 gid |     date_trunc      | max | min |          avg           | count   
-----+---------------------+-----+-----+------------------------+-------  
   1 | 2015-12-15 00:00:00 |   1 |   1 | 1.00000000000000000000 |     1  
(1 row)  

Stress test:
Assume 100,000 sensors, each uploading values in the range 1 to 100.

$ vi test.sql  
\setrandom gid 1 100000  
\setrandom val 1 100  
insert into stream01(gid,val,crt_time) values (:gid,:val,now());  
  
./pgsql9.5/bin/pgbench -M prepared -n -r -f ./test.sql -P 5 -c 24 -j 24 -T 100  
progress: 5.0 s, 95949.9 tps, lat 0.247 ms stddev 0.575  
progress: 10.0 s, 98719.9 tps, lat 0.240 ms stddev 0.549  
progress: 15.0 s, 100207.8 tps, lat 0.237 ms stddev 0.573  
progress: 20.0 s, 101596.4 tps, lat 0.234 ms stddev 0.517  
progress: 25.0 s, 102830.4 tps, lat 0.231 ms stddev 0.492  
progress: 30.0 s, 103055.0 tps, lat 0.230 ms stddev 0.488  
progress: 35.0 s, 102786.0 tps, lat 0.231 ms stddev 0.482  
progress: 40.0 s, 99066.3 tps, lat 0.240 ms stddev 0.578  
progress: 45.0 s, 102912.5 tps, lat 0.231 ms stddev 0.494  
progress: 50.0 s, 100398.2 tps, lat 0.236 ms stddev 0.530  
progress: 55.0 s, 105719.8 tps, lat 0.224 ms stddev 0.425  
progress: 60.0 s, 99041.0 tps, lat 0.240 ms stddev 0.617  
progress: 65.0 s, 97087.0 tps, lat 0.245 ms stddev 0.619  
progress: 70.0 s, 95312.6 tps, lat 0.249 ms stddev 0.653  
progress: 75.0 s, 98768.3 tps, lat 0.240 ms stddev 0.593  
progress: 80.0 s, 106203.8 tps, lat 0.223 ms stddev 0.435  
progress: 85.0 s, 103423.1 tps, lat 0.229 ms stddev 0.480  
progress: 90.0 s, 106143.5 tps, lat 0.223 ms stddev 0.429  
progress: 95.0 s, 103514.5 tps, lat 0.229 ms stddev 0.478  
progress: 100.0 s, 100222.8 tps, lat 0.237 ms stddev 0.547  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 24  
number of threads: 24  
duration: 100 s  
number of transactions actually processed: 10114821  
latency average: 0.235 ms  
latency stddev: 0.530 ms  
tps = 101089.580065 (including connections establishing)  
tps = 101101.483296 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.003051        \setrandom gid 1 100000  
        0.000866        \setrandom val 1 100  
        0.230430        insert into stream01(gid,val,crt_time) values (:gid,:val,now());  

About 100,000 records are processed per second, across the statistics dimensions defined by the stream SQL above.

After several rounds of testing:

pipeline=# select sum(count) from sv03;  
   sum      
----------  
 53022588  
(1 row)  
  
pipeline=# select * from sv01 limit 10;  
  gid  |     date_trunc      | max | min |          avg           | count   
-------+---------------------+-----+-----+------------------------+-------  
     1 | 2015-12-15 13:44:00 |   1 |   1 | 1.00000000000000000000 |     1  
 53693 | 2015-12-15 13:47:00 |  68 |   1 |    28.0000000000000000 |     6  
   588 | 2015-12-15 13:47:00 |  88 |  11 |    47.6250000000000000 |     8  
 60154 | 2015-12-15 13:47:00 |  95 |   1 |    40.9090909090909091 |    11  
 38900 | 2015-12-15 13:47:00 |  90 |  17 |    57.2000000000000000 |     5  
 12784 | 2015-12-15 13:47:00 |  93 |  13 |    64.1250000000000000 |     8  
 79782 | 2015-12-15 13:47:00 |  60 |  16 |    43.1666666666666667 |     6  
  5122 | 2015-12-15 13:47:00 | 100 |   3 |    46.8333333333333333 |    12  
 97444 | 2015-12-15 13:47:00 |  98 |   9 |    59.5833333333333333 |    12  
 34209 | 2015-12-15 13:47:00 |  86 |  13 |    52.2857142857142857 |     7  
(10 rows)  
  
pipeline=# select * from sv02 limit 10;  
  gid  |     date_trunc      | max | min |         avg         | count   
-------+---------------------+-----+-----+---------------------+-------  
 91065 | 2015-12-15 14:00:00 | 100 |   0 | 51.4299065420560748 |   321  
 24081 | 2015-12-15 14:00:00 | 100 |   0 | 52.1649831649831650 |   297  
 29013 | 2015-12-15 14:00:00 | 100 |   0 | 50.9967213114754098 |   305  
 13134 | 2015-12-15 14:00:00 | 100 |   0 | 49.6968750000000000 |   320  
 84691 | 2015-12-15 14:00:00 | 100 |   0 | 49.5547445255474453 |   274  
 91059 | 2015-12-15 14:00:00 | 100 |   1 | 47.7536764705882353 |   272  
 50115 | 2015-12-15 14:00:00 | 100 |   1 | 49.4219269102990033 |   301  
 92610 | 2015-12-15 14:00:00 | 100 |   0 | 50.1197183098591549 |   284  
 36616 | 2015-12-15 14:00:00 | 100 |   1 | 48.8750000000000000 |   312  
 46390 | 2015-12-15 14:00:00 |  99 |   0 | 48.3246268656716418 |   268  
(10 rows)  
  
pipeline=# select * from sv03 limit 10;  
  gid  |     date_trunc      | max | min |         avg         | count   
-------+---------------------+-----+-----+---------------------+-------  
 68560 | 2015-12-15 00:00:00 | 100 |   0 | 51.2702702702702703 |   555  
 42241 | 2015-12-15 00:00:00 | 100 |   0 | 49.5266903914590747 |   562  
 64946 | 2015-12-15 00:00:00 | 100 |   0 | 48.2409177820267686 |   523  
  2451 | 2015-12-15 00:00:00 | 100 |   0 | 49.8153564899451554 |   547  
 11956 | 2015-12-15 00:00:00 | 100 |   0 | 51.2382739212007505 |   533  
 21578 | 2015-12-15 00:00:00 | 100 |   0 | 49.2959558823529412 |   544  
 36451 | 2015-12-15 00:00:00 | 100 |   0 | 51.1292035398230088 |   565  
 62380 | 2015-12-15 00:00:00 | 100 |   0 | 48.9099437148217636 |   533  
 51946 | 2015-12-15 00:00:00 | 100 |   0 | 51.0318091451292247 |   503  
 35084 | 2015-12-15 00:00:00 | 100 |   0 | 49.3613766730401530 |   523  
(10 rows)  

.2. Suppose each vehicle uploads position information periodically while on the road:
gid, crt_time, poi
The application needs to draw each vehicle's daily path (aggregating the points into a path type, an array, a string, ...).

Assume 10 million vehicles, each uploading one coordinate plus a timestamp per report (or a batch of reports).
Application requirements:
.2.1. Draw each vehicle's path per day.
.2.2. Count how many vehicles pass through each region per hour (a sketch for this follows the stream definition below).

Create the stream (here we assume the point has already been binary-encoded into a single INT8, which makes stress testing easier):

CREATE CONTINUOUS VIEW sv04  AS SELECT gid::int,date_trunc('day',crt_time::timestamp),array_agg(poi::int8||' -> '||crt_time) FROM stream02 group by gid,date_trunc('day',crt_time);  
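
Requirement 2.2 is not demonstrated above. Below is a minimal sketch, under the assumption (purely hypothetical) that a coarse region ID can be derived from the encoded point by integer division, and that count(DISTINCT ...) is supported in the continuous view:

CREATE CONTINUOUS VIEW sv05 AS SELECT
    (poi::int8 / 1000000) AS region,              -- hypothetical: derive a coarse region ID from the encoded point
    date_trunc('hour', crt_time::timestamp),
    count(DISTINCT gid::int) AS vehicles          -- distinct vehicles passing through the region each hour
FROM stream02 GROUP BY region, date_trunc('hour', crt_time);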

Stress test

$ vi test.sql  
\setrandom gid 1 10000000  
\setrandom poi 1 1000000000  
insert into stream02(gid,poi,crt_time) values (:gid,:poi,now());  
  
./pgsql9.5/bin/pgbench -M prepared -n -r -f ./test.sql -P 5 -c 24 -j 24 -T 100  
progress: 5.0 s, 106005.0 tps, lat 0.223 ms stddev 0.370  
progress: 10.0 s, 109884.8 tps, lat 0.216 ms stddev 0.347  
progress: 15.0 s, 111122.1 tps, lat 0.213 ms stddev 0.368  
progress: 20.0 s, 111987.0 tps, lat 0.212 ms stddev 0.353  
progress: 25.0 s, 111835.4 tps, lat 0.212 ms stddev 0.363  
progress: 30.0 s, 111759.7 tps, lat 0.212 ms stddev 0.366  
progress: 35.0 s, 112110.4 tps, lat 0.211 ms stddev 0.358  
progress: 40.0 s, 112185.4 tps, lat 0.211 ms stddev 0.352  
progress: 45.0 s, 113080.0 tps, lat 0.210 ms stddev 0.345  
progress: 50.0 s, 113205.4 tps, lat 0.209 ms stddev 0.353  
progress: 55.0 s, 113415.1 tps, lat 0.209 ms stddev 0.352  
progress: 60.0 s, 113519.8 tps, lat 0.209 ms stddev 0.342  
progress: 65.0 s, 112683.6 tps, lat 0.210 ms stddev 0.358  
progress: 70.0 s, 112748.3 tps, lat 0.210 ms stddev 0.360  
progress: 75.0 s, 112158.9 tps, lat 0.211 ms stddev 0.373  
progress: 80.0 s, 112580.8 tps, lat 0.210 ms stddev 0.355  
progress: 85.0 s, 111895.5 tps, lat 0.212 ms stddev 0.370  
progress: 90.0 s, 112229.2 tps, lat 0.211 ms stddev 0.442  
progress: 95.0 s, 104915.8 tps, lat 0.226 ms stddev 2.852  
progress: 100.0 s, 103079.9 tps, lat 0.230 ms stddev 2.054  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 24  
number of threads: 24  
duration: 100 s  
number of transactions actually processed: 11112035  
latency average: 0.213 ms  
latency stddev: 0.836 ms  
tps = 111106.652772 (including connections establishing)  
tps = 111118.651135 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.002939        \setrandom gid 1 10000000  
        0.000887        \setrandom poi 1 1000000000  
        0.209177        insert into stream02(gid,poi,crt_time) values (:gid,:poi,now());  
  
pipeline=# select * from sv04 limit 3;  
  448955 | 2015-12-15 00:00:00 | {"306029686 -> 2015-12-15 14:53:01.273121","885962518 -> 2015-12-15 14:53:03.352406"}  
 7271368 | 2015-12-15 00:00:00 | {"615447469 -> 2015-12-15 14:53:01.2616","944473391 -> 2015-12-15 14:53:04.543387"}  
 8613957 | 2015-12-15 00:00:00 | {"473349491 -> 2015-12-15 14:53:01.288332","125413709 -> 2015-12-15 14:53:08.742894"}  

.3. Per traffic-camera statistics of the vehicle data each camera collects.
For example:
.3.1. Per vehicle, collect the camera positions it passes and string them into trajectory data.
.3.2. Per camera, count the traffic flow at each intersection (assume one camera per intersection).

The first requirement is the same as the vehicle-trajectory example above; intersection traffic statistics simply aggregate by camera ID instead.
The usage is much the same; a minimal sketch of 3.2 follows.
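
A minimal sketch for 3.2, assuming a hypothetical stream stream03 that carries the camera ID (cid), the vehicle ID (gid), and the capture time:

CREATE CONTINUOUS VIEW sv06 AS SELECT
    cid::int,                                     -- camera ID (one camera per intersection)
    date_trunc('hour', crt_time::timestamp),
    count(*) AS passes,                           -- total passes recorded by this camera per hour
    count(DISTINCT gid::int) AS vehicles          -- distinct vehicles seen per hour
FROM stream03 GROUP BY cid, date_trunc('hour', crt_time);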

.4. Real-time stock price prediction.
Combine with MADlib or PL/R for multiple regression, pick the model with the best R2, and use its intercept and slope to predict the next set of prices.
This requires UDFs; see my earlier articles for the specifics.
A full example is beyond this article, but a simplified sketch follows.
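
A minimal sketch using PostgreSQL's built-in linear-regression aggregates instead of MADlib/PL/R (the stream stock_stream and its columns are assumptions, as is support for these aggregates inside continuous views):

CREATE CONTINUOUS VIEW sv_stock AS SELECT
    code::text,                                                                          -- stock code
    regr_slope(price::float8, extract(epoch from crt_time::timestamp)) AS slope,         -- fitted price change per second
    regr_intercept(price::float8, extract(epoch from crt_time::timestamp)) AS intercept, -- fitted intercept
    regr_r2(price::float8, extract(epoch from crt_time::timestamp)) AS r2                -- goodness of fit
FROM stock_stream GROUP BY code;

-- predicted price at time t: intercept + slope * extract(epoch from t)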

.5. Real-time statistics from shopping-mall WiFi sensors.
Using the position information WiFi provides, compute in real time each shop's foot traffic, the average dwell time per visitor, and the total dwell time.
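
A minimal sketch, assuming a hypothetical stream wifi_stream in which each WiFi probe has already been mapped to a shop, and approximating dwell time as probe count times the probe interval:

CREATE CONTINUOUS VIEW sv_shop AS SELECT
    shop_id::int,
    date_trunc('hour', crt_time::timestamp),
    count(DISTINCT user_id::text) AS visitors,    -- foot traffic: distinct devices seen per hour
    count(*) AS probes                            -- raw probe count; dwell time ~= probes * probe interval
FROM wifi_stream GROUP BY shop_id, date_trunc('hour', crt_time);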

.6. What if your scenario needs processing that PostgreSQL's existing functions cannot do? No problem: PG exposes APIs for user-defined functions, data types, operators, index access methods, and more, so you can build whatever the business requires on top of them.
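
For instance, a minimal UDF sketch (the bit layout of the encoded point is a pure assumption) that decodes the INT8-encoded point from case 2 back into grid coordinates, usable inside a continuous view:

CREATE FUNCTION decode_x(poi int8) RETURNS int AS $$
  SELECT (poi >> 32)::int;            -- hypothetical encoding: high 32 bits = x
$$ LANGUAGE sql IMMUTABLE;

CREATE FUNCTION decode_y(poi int8) RETURNS int AS $$
  SELECT (poi & 4294967295)::int;     -- hypothetical encoding: low 32 bits = y
$$ LANGUAGE sql IMMUTABLE;

-- e.g. GROUP BY decode_x(poi::int8) / 1000 to aggregate by grid column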

There are many more usage patterns than can be enumerated here.

Next, let's pair this with a message queue that is extremely popular at the moment: PipelineDB can pull data from the queue and run real-time computation on it.
Example:
Start an nginx server locally and use siege to simulate HTTP requests; nginx records these requests as JSON to a log file.
Start a kafka server locally and use kafkacat to continuously push nginx's access log into kafka.
In PipelineDB, subscribe to the kafka messages and process them in real time into the desired statistics (page visitors, latency, and so on).

Install kafka

http://kafka.apache.org/07/quickstart.html  
  
# wget http://www.us.apache.org/dist/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz  
# tar -zxvf kafka_2.10-0.8.2.2.tgz  
  
# git clone https://github.com/edenhill/librdkafka.git  
# cd librdkafka  
./configure  
make  
make install  
  
# git clone https://github.com/lloyd/yajl.git  
# cd yajl  
./configure  
make  
make install  
  
# vi /etc/ld.so.conf  
/usr/local/lib  
# ldconfig  
  
# git clone https://github.com/edenhill/kafkacat.git  
# cd kafkacat  
./configure  
make  
make install  

Install siege and nginx

# yum install -y siege nginx  

Create an nginx configuration file that logs access in JSON format to /tmp/access.log

cd /tmp  
  
cat <<EOF > nginx.conf  
worker_processes 4;  
pid $PWD/nginx.pid;  
events {}  
http {  
  
    log_format json   
    '{'  
        '"ts": "\$time_iso8601", '  
        '"user_agent": "\$http_user_agent", '  
        '"url": "\$request_uri", '  
        '"latency": "\$request_time",  '  
        '"user": "\$arg_user"'  
    '}';  
  
    access_log $PWD/access.log json;  
    error_log $PWD/error.log;  
  
    server {  
        location ~ ^/ {  
            return 200;  
        }  
    }  
}  
EOF  

Start nginx

nginx -c $PWD/nginx.conf -p $PWD/  

Configure the hostname

# hostname  
digoal.org  
# vi /etc/hosts  
127.0.0.1 digoal.org  

Start kafka

cd /opt/soft_bak/kafka_2.10-0.8.2.2  
bin/zookeeper-server-start.sh config/zookeeper.properties &  
bin/kafka-server-start.sh config/server.properties &  

Generate a file of random URLs

for x in {0..1000000}; do echo "http://localhost/page$((RANDOM % 100))/path$((RANDOM % 10))?user=$((RANDOM % 100000))" >> urls.txt; done  

Use siege to request these URLs; nginx writes the access log to /tmp/access.log

siege -c32 -b -d0 -f urls.txt >/dev/null 2>&1  
  
Sample entries from /tmp/access.log (JSON format):  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page68/path7?user=18583", "latency": "0.002",  "user": "18583"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page78/path0?user=24827", "latency": "0.003",  "user": "24827"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page19/path6?user=3988", "latency": "0.003",  "user": "3988"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page55/path2?user=18433", "latency": "0.003",  "user": "18433"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page62/path3?user=10801", "latency": "0.001",  "user": "10801"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page9/path2?user=4915", "latency": "0.001",  "user": "4915"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page10/path2?user=5367", "latency": "0.001",  "user": "5367"}  

Pipe the access log through kafkacat, pushing it into the kafka topic logs_topic.

( tail -f /tmp/access.log | kafkacat -b localhost:9092 -t logs_topic ) &  

The raw way to consume the topic:

# cd /opt/soft_bak/kafka_2.10-0.8.2.2  
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs_topic --from-beginning  
# Ctrl+C  

Next we use PipelineDB to consume these messages in real time and turn them into the desired statistics.

CREATE EXTENSION pipeline_kafka;  
SELECT kafka_add_broker('localhost:9092');  -- add a kafka broker (one node of the kafka cluster)  
CREATE STREAM logs_stream (payload json);  -- create a stream mapped to the kafka messages  
CREATE CONTINUOUS VIEW message_count AS SELECT COUNT(*) FROM logs_stream;   -- continuous view that consumes and processes the kafka messages in real time  
SELECT kafka_consume_begin('logs_topic', 'logs_stream');  -- start consuming the topic logs_topic  
 kafka_consume_begin   
------------------  
 success  
(1 row)  

Query the continuous view to get the current nginx access statistics.

SELECT * FROM message_count;  
 count   
--------  
  24  
(1 row)  
  
SELECT * FROM message_count;  
 count  
--------  
  36  
(1 row)  

Next, a deeper real-time analysis: per-URL visit counts, unique users, and the latency under which 99% of accesses fall.

/*   
 * This function will strip away any query parameters from each url,  
 * as we're not interested in them.  
 */  
CREATE FUNCTION url(raw text, regex text DEFAULT '\?.*', replace text DEFAULT '')  
    RETURNS text  
AS 'textregexreplace_noopt'    -- textregexreplace_noopt@src/backend/utils/adt/regexp.c  
LANGUAGE internal;  
  
CREATE CONTINUOUS VIEW url_stats AS  
    SELECT  
        url,  -- the URL  
        percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99,  -- latency that 99% of accesses fall under  
        count(DISTINCT user) AS uniques,  -- unique users  
        count(*) AS total_visits  -- total visits  
    FROM  
        (SELECT  
            url(payload->>'url'),  -- the URL, stripped of query parameters  
            payload->>'user' AS user,  -- user ID  
            (payload->>'latency')::float * 1000 AS latency_ms,  -- access latency in ms  
            arrival_timestamp  
        FROM logs_stream) AS unpacked  
    WHERE arrival_timestamp > clock_timestamp() - interval '1 day'  
    GROUP BY url;  
  
CREATE CONTINUOUS VIEW user_stats AS  
    SELECT  
        day(arrival_timestamp),  
        payload->>'user' AS user,  
        sum(CASE WHEN payload->>'url' LIKE '%landing_page%' THEN 1 ELSE 0 END) AS landings,  
        sum(CASE WHEN payload->>'url' LIKE '%conversion%' THEN 1 ELSE 0 END) AS conversions,  
        count(DISTINCT url(payload->>'url')) AS unique_urls,  
        count(*) AS total_visits  
    FROM logs_stream GROUP BY payload->>'user', day;  
  
-- What are the top-10 most visited urls?  
SELECT url, total_visits FROM url_stats ORDER BY total_visits DESC limit 10;  
      url      | total_visits   
---------------+--------------  
 /page62/path4 |        10182  
 /page51/path4 |        10181  
 /page24/path5 |        10180  
 /page93/path3 |        10180  
 /page81/path0 |        10180  
 /page2/path5  |        10180  
 /page75/path2 |        10179  
 /page28/path3 |        10179  
 /page40/path2 |        10178  
 /page74/path0 |        10176  
(10 rows)  
  
  
-- What is the 99th percentile latency across all urls?  
SELECT combine(p99) FROM url_stats;  
     combine        
------------------  
 6.95410494731137  
(1 row)  
  
-- What is the average conversion rate each day for the last month?  
SELECT day, avg(conversions / landings) FROM user_stats GROUP BY day;  
          day           |            avg               
------------------------+----------------------------  
 2015-09-15 00:00:00-07 | 1.7455000000000000000000000  
(1 row)  
  
-- How many unique urls were visited each day for the last week?  
SELECT day, combine(unique_urls) FROM user_stats WHERE day > now() - interval '1 week' GROUP BY day;  
          day           | combine   
------------------------+---------  
 2015-09-15 00:00:00-07 |  100000  
(1 row)  
  
-- Is there a relationship between the number of unique urls visited and the highest conversion rates?  
SELECT unique_urls, sum(conversions) / sum(landings) AS conversion_rate FROM user_stats  
    GROUP BY unique_urls ORDER BY conversion_rate DESC LIMIT 10;  
 unique_urls |  conversion_rate    
-------------+-------------------  
          41 |  2.67121005785842  
          36 |  2.02713894173361  
          34 |  2.02034637010851  
          31 |  2.01958418072859  
          27 |  2.00045348712296  
          24 |  1.99714899522942  
          19 |  1.99438839453606  
          16 |  1.98083502184886  
          15 |  1.87983011139079  
          14 |  1.84906254929873  
(10 rows)  

With PipelineDB + kafka, the range of application scenarios becomes richer still.

How do you build a larger real-time message-processing cluster?
Plan the data sharding rules carefully (avoid cross-node statistics); where cross-node access is genuinely needed, it can be implemented with dimension tables replicated on every node.
For example, what if a trillion messages must be processed per day?
Based on the stress tests above, one machine averages 100,000 records/s, or about 8.6 billion per day, so a trillion per day needs roughly 116 PostgreSQL servers. Quite manageable, isn't it?
A diagram: every layer can scale out,
from LVS, to HAProxy, to kafka, to PostgreSQL, to offline analytics with HAWQ.
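
A minimal sketch of one possible sharding rule (an assumption, not part of the original setup): hash the sensor/vehicle ID so that every record for a given gid lands on the same node, keeping per-gid aggregation strictly local:

-- hypothetical routing function, evaluated in the ingest layer before the insert
CREATE FUNCTION route_node(gid int, nodes int DEFAULT 116) RETURNS int AS $$
  SELECT (hashint4(gid) & 2147483647) % nodes;  -- non-negative hash; all data for one gid maps to one node
$$ LANGUAGE sql IMMUTABLE;

-- e.g. the ingest layer inserts each record into the stream on node route_node(gid)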
