背景
在心跳时,通过自定义UDF,实现心跳永远不被堵塞,并且支持更加当前的配置自动的进行同步、异步模式的升降级。实现半同步的功能。
UDF输入
1、优先模式(同步、异步)
2、同步等待超时时间
当优先为同步模式时,假设当前为同步配置,如果备库异常导致事务提交等待超过指定时间,则自动降级为异步。
当优先为异步模式时,假设当前为同步配置,自动降级为异步。
当优先为同步模式时,假设当前为异步配置,如果备库恢复到streaming模式,自动升级为同步。
使用技术点:
1、alter system
2、reload conf
3、cancle backend
4、dblink 异步调用
心跳UDF逻辑
判断当前实例状态
只读
退出
读写
判断当前事务模式
异步
发心跳
优先模式是什么
异步
退出
同步
判断是否需要升级
升级
退出
同步
消耗异步消息
发远程心跳
查询是否超时
降级
否则
消耗异步消息
优先模式是什么
异步
降级
退出
同步
退出
设计
1、当前postgresql.conf配置
synchronous_commit='remote_write';
synchronous_standby_names='*';
表示同步模式。
2、心跳表设计
create table t_keepalive(id int primary key, ts timestamp, pos pg_lsn);
3、心跳写入方法
insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos;
4、创建一个建立连接函数,不报错
create or replace function conn(
name, -- dblink名字
text -- 连接串,URL
) returns void as $$
declare
begin
perform dblink_connect($1, $2);
return;
exception when others then
return;
end;
$$ language plpgsql strict;
5、更加以上逻辑创建心跳UDF。
create or replace function keepalive (
prio_commit_mode text,
tmout interval
) returns t_keepalive as $$
declare
res1 int;
res2 timestamp;
res3 pg_lsn;
commit_mode text;
conn text := format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database());
conn_altersys text := format('hostaddr=%s port=%s user=%s dbname=%s', '127.0.0.1', current_setting('port'), current_user, current_database());
app_prefix_stat text := 'keepalive_dblink';
begin
if prio_commit_mode not in ('sync','async') then
raise notice 'prio_commit_mode must be [sync|async]';
return null;
end if;
show synchronous_commit into commit_mode;
create extension IF NOT EXISTS dblink;
-- 判断当前实例状态
if pg_is_in_recovery()
-- 只读
then
raise notice 'Current instance in recovery mode.';
return null;
-- 读写
else
-- 判断当前事务模式
if commit_mode in ('local','off')
-- 异步
then
-- 发心跳
insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos into res1,res2,res3;
-- 优先模式是什么
if prio_commit_mode='async'
-- 异步
then
-- 退出
return row(res1,res2,res3)::t_keepalive;
-- 同步
else
-- 判断是否需要升级
perform 1 from pg_stat_replication where state='streaming' limit 1;
if found
-- 升级
then
perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=remote_write', true);
perform pg_reload_conf();
-- 退出
return row(res1,res2,res3)::t_keepalive;
end if;
return row(res1,res2,res3)::t_keepalive;
end if;
-- 同步
else
-- 消耗异步消息
perform conn(app_prefix_stat, conn||app_prefix_stat);
perform t from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);
-- 发远程心跳
perform dblink_send_query(app_prefix_stat, $_$ insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos $_$);
-- 查询是否超时
<<ablock>>
loop
perform pg_sleep(0.2);
perform 1 from pg_stat_activity where application_name=app_prefix_stat and state='idle' limit 1;
-- 未超时
if found then
select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);
raise notice 'no timeout';
exit ablock;
end if;
perform 1 from pg_stat_activity where wait_event='SyncRep' and application_name=app_prefix_stat and clock_timestamp()-query_start > tmout limit 1;
-- 降级
if found then
perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);
perform pg_reload_conf();
perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';
select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);
raise notice 'timeout';
exit ablock;
end if;
perform pg_sleep(0.2);
end loop;
-- 优先模式是什么
if prio_commit_mode='async'
-- 异步
then
show synchronous_commit into commit_mode;
-- 降级
if commit_mode in ('on','remote_write','remote_apply')
then
perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);
perform pg_reload_conf();
perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';
end if;
-- 退出
return row(res1,res2,res3)::t_keepalive;
-- 同步
else
-- 退出
return row(res1,res2,res3)::t_keepalive;
end if;
end if;
end if;
end;
$$ language plpgsql strict;
测试
1、当前为同步模式
postgres=# show synchronous_commit ;
synchronous_commit
--------------------
remote_write
(1 row)
2、人为关闭从库,心跳自动将数据库改成异步模式,并通知所有等待中会话。
postgres=# select * from keepalive ('sync','5 second');
NOTICE: extension "dblink" already exists, skipping
NOTICE: timeout
id | ts | pos
----+----------------------------+-------------
1 | 2019-01-30 00:48:39.800829 | 23/9501D5F8
(1 row)
postgres=# show synchronous_commit ;
synchronous_commit
--------------------
local
(1 row)
3、恢复从库,心跳自动将数据库升级为优先sync模式。
postgres=# select * from keepalive ('sync','5 second');
NOTICE: extension "dblink" already exists, skipping
id | ts | pos
----+----------------------------+-------------
1 | 2019-01-30 00:48:47.329119 | 23/9501D6E8
(1 row)
postgres=# select * from keepalive ('sync','5 second');
NOTICE: extension "dblink" already exists, skipping
NOTICE: no timeout
id | ts | pos
----+----------------------------+-------------
1 | 2019-01-30 00:49:11.991855 | 23/9501E0C8
(1 row)
postgres=# show synchronous_commit ;
synchronous_commit
--------------------
remote_write
(1 row)
小结
在心跳时,通过自定义UDF,实现心跳永远不被堵塞,并且支持更加当前的配置自动的进行同步、异步模式的升降级。实现半同步的功能。
UDF输入
1、优先模式(同步、异步)
2、同步等待超时时间
当优先为同步模式时,假设当前为同步配置,如果备库异常导致事务提交等待超过指定时间,则自动降级为异步。
当优先为异步模式时,假设当前为同步配置,自动降级为异步。
当优先为同步模式时,假设当前为异步配置,如果备库恢复到streaming模式,自动升级为同步。
使用技术点:
1、alter system
2、reload conf
3、cancle backend
4、dblink 异步调用
使用心跳实现半同步,大大简化了整个同步、异步模式切换的流程。当然如果内核层面可以实现,配置几个参数,会更加完美。
参考
dblin 异步
《PostgreSQL 数据库心跳(SLA(RPO)指标的时间、WAL SIZE维度计算)》
《PostgreSQL 双节点流复制如何同时保证可用性、可靠性(rpo,rto) - (半同步,自动降级方法实践)》
PostgreSQL 许愿链接
您的愿望将传达给PG kernel hacker、数据库厂商等, 帮助提高数据库产品质量和功能, 说不定下一个PG版本就有您提出的功能点. 针对非常好的提议,奖励限量版PG文化衫、纪念品、贴纸、PG热门书籍等,奖品丰富,快来许愿。开不开森.