PostgreSQL 如何让心跳永远不死,支持半同步自动同步、异步升降级 - udf 心跳

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: PostgreSQL 如何让心跳永远不死,支持半同步自动同步、异步升降级 - udf 心跳

背景

在心跳时,通过自定义UDF,实现心跳永远不被堵塞,并且支持更加当前的配置自动的进行同步、异步模式的升降级。实现半同步的功能。

UDF输入

1、优先模式(同步、异步)

2、同步等待超时时间

当优先为同步模式时,假设当前为同步配置,如果备库异常导致事务提交等待超过指定时间,则自动降级为异步。

当优先为异步模式时,假设当前为同步配置,自动降级为异步。

当优先为同步模式时,假设当前为异步配置,如果备库恢复到streaming模式,自动升级为同步。

使用技术点:

1、alter system

2、reload conf

3、cancle backend

4、dblink 异步调用

心跳UDF逻辑

判断当前实例状态  
  
  只读  
  
    退出  
  
  读写  
  
    判断当前事务模式   
  
      异步  
  
        发心跳  
  
        优先模式是什么  
  
          异步  
  
            退出  
  
          同步  
  
            判断是否需要升级  
  
              升级  
  
              退出  
  
  
      同步  
  
        消耗异步消息  
  
        发远程心跳  
  
        查询是否超时  
  
          降级  
  
        否则  
  
          消耗异步消息  
  
        优先模式是什么  
  
        异步  
  
          降级  
  
          退出  
  
        同步  
  
          退出  

设计

1、当前postgresql.conf配置

synchronous_commit='remote_write';  
synchronous_standby_names='*';  

表示同步模式。

2、心跳表设计

create table t_keepalive(id int primary key, ts timestamp, pos pg_lsn);  

3、心跳写入方法

insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos;  

4、创建一个建立连接函数,不报错

create or replace function conn(        
  name,   -- dblink名字        
  text    -- 连接串,URL        
) returns void as $$          
declare          
begin          
  perform dblink_connect($1, $2);         
  return;          
exception when others then          
  return;          
end;          
$$ language plpgsql strict;      

5、更加以上逻辑创建心跳UDF。

create or replace function keepalive (  
  prio_commit_mode text,    
  tmout interval  
) returns t_keepalive as $$  
declare  
  res1 int;  
  res2 timestamp;  
  res3 pg_lsn;  
  commit_mode text;  
  conn text := format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database());  
  conn_altersys text := format('hostaddr=%s port=%s user=%s dbname=%s', '127.0.0.1', current_setting('port'), current_user, current_database());  
  app_prefix_stat text := 'keepalive_dblink';  
begin  
  if prio_commit_mode not in ('sync','async') then  
    raise notice 'prio_commit_mode must be [sync|async]';  
    return null;  
  end if;  
  
  show synchronous_commit into commit_mode;  
  
  create extension IF NOT EXISTS dblink;  
  
  -- 判断当前实例状态  
  if pg_is_in_recovery()   
  
  -- 只读  
  then  
    raise notice 'Current instance in recovery mode.';  
    return null;  
      
  -- 读写  
  else  
  
    -- 判断当前事务模式   
    if commit_mode in ('local','off')  
  
    -- 异步  
    then  
  
      -- 发心跳  
      insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos into res1,res2,res3;  
  
      -- 优先模式是什么  
      if prio_commit_mode='async'   
  
      -- 异步  
      then  
  
        -- 退出  
        return row(res1,res2,res3)::t_keepalive;  
  
      -- 同步  
      else  
  
        -- 判断是否需要升级  
        perform 1 from pg_stat_replication where state='streaming' limit 1;  
        if found  
  
        -- 升级  
        then  
          perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=remote_write', true);   
          perform pg_reload_conf();   
  
          -- 退出  
          return row(res1,res2,res3)::t_keepalive;  
        end if;  
  
        return row(res1,res2,res3)::t_keepalive;  
      end if;  
  
  
    -- 同步  
    else  
  
      -- 消耗异步消息  
      perform conn(app_prefix_stat,  conn||app_prefix_stat);     
      perform t from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);  
  
      -- 发远程心跳  
      perform dblink_send_query(app_prefix_stat, $_$ insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos $_$);    
  
      -- 查询是否超时  
      <<ablock>>  
      loop  
        perform pg_sleep(0.2);  
  
        perform 1 from pg_stat_activity where application_name=app_prefix_stat and state='idle' limit 1;  
        -- 未超时  
        if found then  
          select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);  
          raise notice 'no timeout';  
          exit ablock;  
        end if;  
            
        perform 1 from pg_stat_activity where wait_event='SyncRep' and application_name=app_prefix_stat and clock_timestamp()-query_start > tmout limit 1;  
        -- 降级  
        if found then  
          perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);   
          perform pg_reload_conf();  
          perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';  
          select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);  
          raise notice 'timeout';  
          exit ablock;  
        end if;  
            
        perform pg_sleep(0.2);  
      end loop;  
  
      -- 优先模式是什么  
      if prio_commit_mode='async'   
  
      -- 异步  
      then  
        show synchronous_commit into commit_mode;  
        -- 降级  
        if commit_mode in ('on','remote_write','remote_apply')   
        then  
          perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);   
          perform pg_reload_conf();  
          perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';  
        end if;  
              
        -- 退出  
        return row(res1,res2,res3)::t_keepalive;  
  
      -- 同步  
      else  
  
        -- 退出  
        return row(res1,res2,res3)::t_keepalive;  
      end if;  
          
    end if;  
  
  end if;  
end;  
$$ language plpgsql strict;  

测试

1、当前为同步模式

postgres=# show synchronous_commit ;  
 synchronous_commit   
--------------------  
 remote_write  
(1 row)  

2、人为关闭从库,心跳自动将数据库改成异步模式,并通知所有等待中会话。

postgres=# select * from keepalive ('sync','5 second');  
NOTICE:  extension "dblink" already exists, skipping  
NOTICE:  timeout  
 id |             ts             |     pos       
----+----------------------------+-------------  
  1 | 2019-01-30 00:48:39.800829 | 23/9501D5F8  
(1 row)  
  
postgres=# show synchronous_commit ;  
 synchronous_commit   
--------------------  
 local  
(1 row)  

3、恢复从库,心跳自动将数据库升级为优先sync模式。

postgres=# select * from keepalive ('sync','5 second');  
NOTICE:  extension "dblink" already exists, skipping  
 id |             ts             |     pos       
----+----------------------------+-------------  
  1 | 2019-01-30 00:48:47.329119 | 23/9501D6E8  
(1 row)  
  
postgres=# select * from keepalive ('sync','5 second');  
NOTICE:  extension "dblink" already exists, skipping  
NOTICE:  no timeout  
 id |             ts             |     pos       
----+----------------------------+-------------  
  1 | 2019-01-30 00:49:11.991855 | 23/9501E0C8  
(1 row)  
  
postgres=# show synchronous_commit ;  
 synchronous_commit   
--------------------  
 remote_write  
(1 row)  

小结

在心跳时,通过自定义UDF,实现心跳永远不被堵塞,并且支持更加当前的配置自动的进行同步、异步模式的升降级。实现半同步的功能。

UDF输入

1、优先模式(同步、异步)

2、同步等待超时时间

当优先为同步模式时,假设当前为同步配置,如果备库异常导致事务提交等待超过指定时间,则自动降级为异步。

当优先为异步模式时,假设当前为同步配置,自动降级为异步。

当优先为同步模式时,假设当前为异步配置,如果备库恢复到streaming模式,自动升级为同步。

使用技术点:

1、alter system

2、reload conf

3、cancle backend

4、dblink 异步调用

使用心跳实现半同步,大大简化了整个同步、异步模式切换的流程。当然如果内核层面可以实现,配置几个参数,会更加完美。

参考

dblin 异步

《PostgreSQL 数据库心跳(SLA(RPO)指标的时间、WAL SIZE维度计算)》

《PostgreSQL 双节点流复制如何同时保证可用性、可靠性(rpo,rto) - (半同步,自动降级方法实践)》

PostgreSQL 许愿链接

您的愿望将传达给PG kernel hacker、数据库厂商等, 帮助提高数据库产品质量和功能, 说不定下一个PG版本就有您提出的功能点. 针对非常好的提议,奖励限量版PG文化衫、纪念品、贴纸、PG热门书籍等,奖品丰富,快来许愿。开不开森.

9.9元购买3个月阿里云RDS PostgreSQL实例

PostgreSQL 解决方案集合

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
7月前
|
SQL 关系型数据库 分布式数据库
PolarDB在尝试同步DDL任务时出现了问题
PolarDB在尝试同步DDL任务时出现了问题
72 1
|
1月前
|
关系型数据库 MySQL OLAP
PolarDB +AnalyticDB Zero-ETL :免费同步数据到ADB,享受数据流通新体验
Zero-ETL是阿里云瑶池数据库提供的服务,旨在简化传统ETL流程的复杂性和成本,提高数据实时性。降低数据同步成本,允许用户快速在AnalyticDB中对PolarDB数据进行分析,降低了30%的数据接入成本,提升了60%的建仓效率。 Zero-ETL特性包括免费的PolarDB MySQL联邦分析和PolarDB-X元数据自动同步,提供一体化的事务处理和数据分析,并能整合多个数据源。用户只需简单配置即可实现数据同步和实时分析。
|
1月前
|
关系型数据库 分布式数据库 数据库
PolarDB常见问题之PolarDB影响下游的binlogl同步如何解决
PolarDB是阿里云推出的下一代关系型数据库,具有高性能、高可用性和弹性伸缩能力,适用于大规模数据处理场景。本汇总囊括了PolarDB使用中用户可能遭遇的一系列常见问题及解答,旨在为数据库管理员和开发者提供全面的问题指导,确保数据库平稳运行和优化使用体验。
|
11月前
|
SQL 分布式计算 DataWorks
PolarDB同步到maxcompute
PolarDB同步到maxcompute
91 0
|
SQL 存储 关系型数据库
PostgreSQL 流复制搭建主从环境,同步和异步的解释,压力测试,主从角色切换|学习笔记
快速学习PostgreSQL 流复制搭建主从环境,同步和异步的解释,压力测试,主从角色切换
733 0
PostgreSQL 流复制搭建主从环境,同步和异步的解释,压力测试,主从角色切换|学习笔记
|
SQL 移动开发 关系型数据库
PostgreSQL 执行计划,成本公式解说,代价因子校准,自动跟踪SQL执行计划(三)|学习笔记
快速学习PostgreSQL 执行计划,成本公式解说,代价因子校准,自动跟踪SQL执行计划(三)
548 0
PostgreSQL 执行计划,成本公式解说,代价因子校准,自动跟踪SQL执行计划(三)|学习笔记
|
存储 SQL Oracle
10 PostgreSQL 表级复制-物化视图篇, 支持异地,异构如 Oracle 到 pg 的物化视图|学习笔记
快速学习10 PostgreSQL 表级复制-物化视图篇,支持异地,异构如 Oracle 到 pg 的物化视图
423 0
10 PostgreSQL 表级复制-物化视图篇, 支持异地,异构如 Oracle 到 pg 的物化视图|学习笔记
|
SQL 关系型数据库 MySQL
Flink同步RDS数据到Elasticsearch实践
Flink同步RDS数据到Elasticsearch实践
|
存储 SQL Oracle
AnalyticDB PostgreSQL 7.0 支持存储过程(CREATE PROCEDURE)特性
AnalyticDB PostgreSQL 7.0 新增了存储过程功能的支持,让用户在使用ADB PG时能够更方便高效地开发业务,并能够更好地兼容Oracle等传统数仓的业务。
423 1
AnalyticDB PostgreSQL 7.0 支持存储过程(CREATE PROCEDURE)特性
|
SQL 并行计算 关系型数据库
Citus 分布式 PostgreSQL 集群 - SQL Reference(SQL支持和变通方案)
Citus 分布式 PostgreSQL 集群 - SQL Reference(SQL支持和变通方案)
146 0

相关产品

  • 云原生数据库 PolarDB