In IoT scenarios, huge fleets of sensors produce enormous volumes of messages that enter the database at extremely high concurrency.
If that data goes straight into a data warehouse designed for OLAP workloads, real-time ingestion becomes the bottleneck, and an OLAP system can hardly sustain such highly concurrent requests.
Facing this kind of workload, how do we satisfy all of the following at once?
.1. real-time ingestion,
.2. real-time analytics,
.3. and historical archiving, to serve analysis requirements that may change at any time.
Real-time ingestion is the easy part. I covered it recently in "How PostgreSQL Gracefully Handles 100+ TB of New Data per Day":
https://yq.aliyun.com/articles/8528
Real-time analytics is also well covered. See "PostgreSQL IoT Applications - 1: Real-Time Stream Processing (Trillions of Rows per Day)":
https://yq.aliyun.com/articles/166
Historical archiving, to serve analysis requirements that change at any time, sounds trivial: once the first two points are handled, just load the data into the OLAP system.
But do not underestimate this simple step; doing it with real-time behavior and consistency is the crux.
The usual approach suffers from a GAP problem (a consistency problem).
The GAP problem is solvable, e.g. via snapshot tracking or single-threaded loading, but those fixes are crude.
I have written a series of posts on solving the GAP problem before:
.1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/
.2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/
.3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/
.4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/
.5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/
.6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/
.7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/
.8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/
The cause of the GAP problem, in short: the reading transaction's snapshot hides rows that are not yet committed but carry earlier sequence numbers or timestamps. The next read then finds them as a GAP. The more real-time the reads, the higher the probability of GAPs; and with GAPs, the OLTP and OLAP systems are inconsistent.
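A minimal sketch of how a GAP arises (the id values are illustrative):

-- Session A:
begin;
insert into tbl (crt_time) values (clock_timestamp());  -- assigned id = 1, not yet committed

-- Session B:
insert into tbl (crt_time) values (clock_timestamp());  -- assigned id = 2, commits at once

-- Loader: sees only id = 2 (id = 1 is still invisible), records "loaded up to id = 2"

-- Session A:
commit;  -- id = 1 now becomes visible, below the loader's high-water mark: a GAP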
The traditional workarounds:
.1. Delayed sync: e.g. only sync data older than one hour, shrinking the window in which GAPs occur.
.2. Serial inserts: strictly serialized inserts produce no GAPs, at the cost of write concurrency.
.3. Add an XID column recording each row's inserting transaction id. At read time, take a transaction snapshot and remember the still-running XIDs; on the next read, use those XIDs together with the column to pick up the previously invisible GAP rows (sketched below).
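A rough sketch of approach .3 using PostgreSQL's txid functions (the xid column and the :highwater placeholder are illustrative):

alter table tbl add column xid int8 default txid_current();  -- stamp rows with the writer's txid

-- reader, pass N: remember the snapshot in effect alongside the data
select txid_current_snapshot();      -- e.g. 100:110:100,105  (xmin:xmax:in-progress txids)
select * from tbl where id <= :highwater;

-- reader, pass N+1: txids 100 and 105 were in progress last time, so their rows
-- were invisible; fetch exactly those previously missed GAP rows
select * from tbl where xid = any('{100,105}'::int8[]);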
Needless to say, each of these approaches has drawbacks.
We need to solve the real-time problem, and do it with style.
PostgreSQL's "read-and-burn" (DELETE ... RETURNING) solves all of the above, delivering concurrency, consistency, and real-time behavior in one stroke.
Concurrency: concurrent inserts and concurrent reads;
Consistency: if N rows go in, exactly N rows come out;
Real-time: data can be taken out continuously, stream-style, with no polling interval; a two-session sketch of the locking that makes this safe follows below.
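The concurrency property rests on FOR UPDATE SKIP LOCKED (PostgreSQL 9.5+). A minimal two-session sketch:

-- session 1
begin;
select * from tbl order by crt_time limit 100 for update skip locked;  -- locks 100 rows

-- session 2, at the same time: skips session 1's locked rows and locks the next 100,
-- so concurrent consumers neither block each other nor take the same row
begin;
select * from tbl order by crt_time limit 100 for update skip locked;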
The read-and-burn syntax is simple. An example:
postgres=# create table tbl(id serial, crt_time timestamp, info jsonb default '
{
"k1": "v1",
"k2": "v2",
"k3": "v3",
"k4": {
"subk1": "subv1",
"subk2": "subv2",
"subk3": {
"ssubk1": "ssubv1"
}
}
}
');
postgres=# insert into tbl (crt_time) select clock_timestamp() from generate_series(1,1000);
INSERT 0 1000
postgres=# select * from tbl limit 1;
id | crt_time | info
----+----------------------------+-----------------------------------------------------------------------------------------------------------------
1 | 2016-04-13 15:02:06.603235 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
(1 row)
postgres=# select * from tbl limit 5;
id | crt_time | info
----+----------------------------+-----------------------------------------------------------------------------------------------------------------
1 | 2016-04-13 15:02:06.603235 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
2 | 2016-04-13 15:02:06.60337 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
3 | 2016-04-13 15:02:06.603375 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
4 | 2016-04-13 15:02:06.603378 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
5 | 2016-04-13 15:02:06.603379 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
(5 rows)
Read-and-burn:
postgres=# delete from tbl where id<=5 returning *;
id | crt_time | info
----+----------------------------+-----------------------------------------------------------------------------------------------------------------
1 | 2016-04-13 15:02:06.603235 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
2 | 2016-04-13 15:02:06.60337 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
3 | 2016-04-13 15:02:06.603375 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
4 | 2016-04-13 15:02:06.603378 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
5 | 2016-04-13 15:02:06.603379 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
(5 rows)
DELETE 5
postgres=# select count(*) from tbl where id<=5;
count
-------
0
(1 row)
Now for a concurrent test,
verifying consistency, real-time behavior, and concurrency:
postgres=# create table tbl(id serial, crt_time timestamp, info jsonb default '
{
"k1": "v1",
"k2": "v2",
"k3": "v3",
"k4": {
"subk1": "subv1",
"subk2": "subv2",
"subk3": {
"ssubk1": "ssubv1"
}
}
}
');
create index idx_tbl_1 on tbl(crt_time);
create table tbl1(like tbl including all);
create or replace function r_d(lmt int) returns setof tbl as
$$
declare
  -- Lock up to lmt of the oldest rows; SKIP LOCKED (PostgreSQL 9.5+) makes
  -- concurrent consumers skip each other's rows instead of blocking.
  curs1 cursor for select * from tbl order by crt_time limit lmt for update skip locked;
begin
  for res in curs1 loop
    -- "burn" the row just fetched, then hand it back to the caller
    delete from tbl where current of curs1;
    return next res;
  end loop;
  return;
end;
$$
language plpgsql;
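Calling the function directly both returns and deletes up to lmt of the oldest unlocked rows:

select * from r_d(10);  -- fetches up to 10 rows and removes them from tbl in the same call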
Run concurrent inserts for 2 hours:
vi ins.sql
insert into tbl (crt_time) select clock_timestamp() from generate_series(1,5000);
pgbench -M prepared -n -r -P 5 -f ./ins.sql -c 64 -j 64 -T 7200 &
Run concurrent read-and-burn for 2 hours:
vi r_d.sql
insert into tbl1 select * from r_d(100000);
pgbench -M prepared -n -r -P 5 -f ./r_d.sql -c 64 -j 64 -T 7200 &
Verify that the number of rows inserted matches the number of rows read-and-burned.
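One way to check, assuming both pgbench runs have finished: drain what is left in tbl, then compare totals.

insert into tbl1 select * from r_d(100000);  -- repeat until it moves 0 rows
select count(*) from tbl;   -- expect 0 once drained
select count(*) from tbl1;  -- expect 5000 * (total ins.sql transactions reported by pgbench)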
Performance (concurrent insert and read-and-burn across 64 tables):
Insert: 2.30 million rows/s
Read-and-burn: 3.84 million rows/s
Other scenarios where this technique is useful:
.1. Delayed confirmation, common in SMS double-opt-in flows: subscribing to a carrier service usually triggers a confirmation SMS.
The server inserts a record, waits for the user's reply, then updates that record's status, e.g. (column and placeholder names illustrative):
insert into tbl (user_id, status) values (:uid, 'wait') returning id;
commit;
-- ... wait for the user's response ...
update tbl set status = 'confirmed' where id = :id;
commit;
.2. Related RETURNING usage (Oracle supports RETURNING as well):
insert into tbl values (...) returning *;
delete from tbl where ... returning *;
update tbl set xxx=xxx where ... returning *;
select ... for update skip locked;  -- SKIP LOCKED is also supported by Oracle (11g and later)
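A writable CTE (PostgreSQL 9.1+) combined with SKIP LOCKED (9.5+) can also move a read-and-burn batch in a single statement, without the plpgsql function. A sketch roughly equivalent to one r_d.sql transaction:

with burned as (
  delete from tbl
  where id in (select id from tbl order by crt_time limit 100000 for update skip locked)
  returning *
)
insert into tbl1 select * from burned;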
Further reading:
"How PostgreSQL Gracefully Handles 100+ TB of New Data per Day"
https://yq.aliyun.com/articles/8528
"PostgreSQL IoT Applications - 1: Real-Time Stream Processing (Trillions of Rows per Day)"
https://yq.aliyun.com/articles/166
Many other PostgreSQL features are also a great fit for IoT:
JSON support, GIS support, window functions, recursive (tree) queries, lightweight analytics, range types, range indexes, and more.