PostgreSQL 如何让心跳永远不死,支持半同步自动同步、异步升降级 - udf 心跳

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: 标签 PostgreSQL , 同步 , 半同步 , 流复制 , 心跳 , 自动降级 , 自动升级 , dblink , 异步调用 背景 在心跳时,通过自定义UDF,实现心跳永远不被堵塞,并且支持更加当前的配置自动的进行同步、异步模式的升降级。实现半同步的功能。 UDF输入 1、优先模式(同步、异步) 2、同步等待超时时间 当优先为同步模式时,假设当前为同步配置,如果备库异常导致

标签

PostgreSQL , 同步 , 半同步 , 流复制 , 心跳 , 自动降级 , 自动升级 , dblink , 异步调用


背景

在心跳时,通过自定义UDF,实现心跳永远不被堵塞,并且支持更加当前的配置自动的进行同步、异步模式的升降级。实现半同步的功能。

UDF输入

1、优先模式(同步、异步)

2、同步等待超时时间

当优先为同步模式时,假设当前为同步配置,如果备库异常导致事务提交等待超过指定时间,则自动降级为异步。

当优先为异步模式时,假设当前为同步配置,自动降级为异步。

当优先为同步模式时,假设当前为异步配置,如果备库恢复到streaming模式,自动升级为同步。

使用技术点:

1、alter system

2、reload conf

3、cancle backend

4、dblink 异步调用

心跳UDF逻辑

判断当前实例状态  
  
  只读  
  
    退出  
  
  读写  
  
    判断当前事务模式   
  
      异步  
  
        发心跳  
  
        优先模式是什么  
  
          异步  
  
            退出  
  
          同步  
  
            判断是否需要升级  
  
              升级  
  
              退出  
  
  
      同步  
  
        消耗异步消息  
  
        发远程心跳  
  
        查询是否超时  
  
          降级  
  
        否则  
  
          消耗异步消息  
  
        优先模式是什么  
  
        异步  
  
          降级  
  
          退出  
  
        同步  
  
          退出  

设计

1、当前postgresql.conf配置

synchronous_commit='remote_write';  
synchronous_standby_names='*';  

表示同步模式。

2、心跳表设计

create table t_keepalive(id int primary key, ts timestamp, pos pg_lsn);  

3、心跳写入方法

insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos;  

4、创建一个建立连接函数,不报错

create or replace function conn(        
  name,   -- dblink名字        
  text    -- 连接串,URL        
) returns void as $$          
declare          
begin          
  perform dblink_connect($1, $2);         
  return;          
exception when others then          
  return;          
end;          
$$ language plpgsql strict;      

5、更加以上逻辑创建心跳UDF。

create or replace function keepalive (  
  prio_commit_mode text,    
  tmout interval  
) returns t_keepalive as $$  
declare  
  res1 int;  
  res2 timestamp;  
  res3 pg_lsn;  
  commit_mode text;  
  conn text := format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database());  
  conn_altersys text := format('hostaddr=%s port=%s user=%s dbname=%s', '127.0.0.1', current_setting('port'), current_user, current_database());  
  app_prefix_stat text := 'keepalive_dblink';  
begin  
  if prio_commit_mode not in ('sync','async') then  
    raise notice 'prio_commit_mode must be [sync|async]';  
    return null;  
  end if;  
  
  show synchronous_commit into commit_mode;  
  
  create extension IF NOT EXISTS dblink;  
  
  -- 判断当前实例状态  
  if pg_is_in_recovery()   
  
  -- 只读  
  then  
    raise notice 'Current instance in recovery mode.';  
    return null;  
      
  -- 读写  
  else  
  
    -- 判断当前事务模式   
    if commit_mode in ('local','off')  
  
    -- 异步  
    then  
  
      -- 发心跳  
      insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos into res1,res2,res3;  
  
      -- 优先模式是什么  
      if prio_commit_mode='async'   
  
      -- 异步  
      then  
  
        -- 退出  
        return row(res1,res2,res3)::t_keepalive;  
  
      -- 同步  
      else  
  
        -- 判断是否需要升级  
        perform 1 from pg_stat_replication where state='streaming' limit 1;  
        if found  
  
        -- 升级  
        then  
          perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=remote_write', true);   
          perform pg_reload_conf();   
  
          -- 退出  
          return row(res1,res2,res3)::t_keepalive;  
        end if;  
  
        return row(res1,res2,res3)::t_keepalive;  
      end if;  
  
  
    -- 同步  
    else  
  
      -- 消耗异步消息  
      perform conn(app_prefix_stat,  conn||app_prefix_stat);     
      perform t from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);  
  
      -- 发远程心跳  
      perform dblink_send_query(app_prefix_stat, $_$ insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos $_$);    
  
      -- 查询是否超时  
      <<ablock>>  
      loop  
        perform pg_sleep(0.2);  
  
        perform 1 from pg_stat_activity where application_name=app_prefix_stat and state='idle' limit 1;  
        -- 未超时  
        if found then  
          select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);  
          raise notice 'no timeout';  
          exit ablock;  
        end if;  
            
        perform 1 from pg_stat_activity where wait_event='SyncRep' and application_name=app_prefix_stat and clock_timestamp()-query_start > tmout limit 1;  
        -- 降级  
        if found then  
          perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);   
          perform pg_reload_conf();  
          perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';  
          select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);  
          raise notice 'timeout';  
          exit ablock;  
        end if;  
            
        perform pg_sleep(0.2);  
      end loop;  
  
      -- 优先模式是什么  
      if prio_commit_mode='async'   
  
      -- 异步  
      then  
        show synchronous_commit into commit_mode;  
        -- 降级  
        if commit_mode in ('on','remote_write','remote_apply')   
        then  
          perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);   
          perform pg_reload_conf();  
          perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';  
        end if;  
              
        -- 退出  
        return row(res1,res2,res3)::t_keepalive;  
  
      -- 同步  
      else  
  
        -- 退出  
        return row(res1,res2,res3)::t_keepalive;  
      end if;  
          
    end if;  
  
  end if;  
end;  
$$ language plpgsql strict;  

测试

1、当前为同步模式

postgres=# show synchronous_commit ;  
 synchronous_commit   
--------------------  
 remote_write  
(1 row)  

2、人为关闭从库,心跳自动将数据库改成异步模式,并通知所有等待中会话。

postgres=# select * from keepalive ('sync','5 second');  
NOTICE:  extension "dblink" already exists, skipping  
NOTICE:  timeout  
 id |             ts             |     pos       
----+----------------------------+-------------  
  1 | 2019-01-30 00:48:39.800829 | 23/9501D5F8  
(1 row)  
  
postgres=# show synchronous_commit ;  
 synchronous_commit   
--------------------  
 local  
(1 row)  

3、恢复从库,心跳自动将数据库升级为优先sync模式。

postgres=# select * from keepalive ('sync','5 second');  
NOTICE:  extension "dblink" already exists, skipping  
 id |             ts             |     pos       
----+----------------------------+-------------  
  1 | 2019-01-30 00:48:47.329119 | 23/9501D6E8  
(1 row)  
  
postgres=# select * from keepalive ('sync','5 second');  
NOTICE:  extension "dblink" already exists, skipping  
NOTICE:  no timeout  
 id |             ts             |     pos       
----+----------------------------+-------------  
  1 | 2019-01-30 00:49:11.991855 | 23/9501E0C8  
(1 row)  
  
postgres=# show synchronous_commit ;  
 synchronous_commit   
--------------------  
 remote_write  
(1 row)  

小结

在心跳时,通过自定义UDF,实现心跳永远不被堵塞,并且支持更加当前的配置自动的进行同步、异步模式的升降级。实现半同步的功能。

UDF输入

1、优先模式(同步、异步)

2、同步等待超时时间

当优先为同步模式时,假设当前为同步配置,如果备库异常导致事务提交等待超过指定时间,则自动降级为异步。

当优先为异步模式时,假设当前为同步配置,自动降级为异步。

当优先为同步模式时,假设当前为异步配置,如果备库恢复到streaming模式,自动升级为同步。

使用技术点:

1、alter system

2、reload conf

3、cancle backend

4、dblink 异步调用

使用心跳实现半同步,大大简化了整个同步、异步模式切换的流程。当然如果内核层面可以实现,配置几个参数,会更加完美。

参考

dblin 异步

《PostgreSQL 数据库心跳(SLA(RPO)指标的时间、WAL SIZE维度计算)》

《PostgreSQL 双节点流复制如何同时保证可用性、可靠性(rpo,rto) - (半同步,自动降级方法实践)》

 

免费领取阿里云RDS PostgreSQL实例、ECS虚拟机

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
3月前
|
监控 物联网 关系型数据库
使用PostgreSQL触发器解决物联网设备状态同步问题
在物联网监控系统中,确保设备状态(如在线与离线)的实时性和准确性至关重要。当设备状态因外部因素改变时,需迅速反映到系统内部。因设备状态数据分布在不同表中,直接通过应用同步可能引入复杂性和错误。采用PostgreSQL触发器自动同步状态变化是一种高效方法。首先定义触发函数,在设备状态改变时更新管理模块表;然后创建触发器,在状态字段更新后执行此函数。此外,还需进行充分测试、监控性能并实施优化,以及在触发函数中加入错误处理和日志记录功能。这种方法不仅提高自动化程度,增强数据一致性与实时性,还需注意其对性能的影响并采取优化措施。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何使用PostgreSQL2.4.1从指定时间戳同步数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 数据库
实时计算 Flink版产品使用合集之同步PostgreSQL数据时,WAL 日志无限增长,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在使用 DataWorks 数据集成同步 PostgreSQL 数据库中的 Geometry 类型数据如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
86 0
|
6月前
|
分布式计算 关系型数据库 大数据
MaxCompute产品使用合集之怎么才可以将 PostgreSQL 中的 geometry 空间类型字段同步到 MaxCompute 或另一个 PostgreSQL 数据库
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
SQL 关系型数据库 MySQL
Flink同步RDS数据到Elasticsearch实践
Flink同步RDS数据到Elasticsearch实践
|
SQL 移动开发 关系型数据库
PostgreSQL 执行计划,成本公式解说,代价因子校准,自动跟踪SQL执行计划(三)|学习笔记
快速学习PostgreSQL 执行计划,成本公式解说,代价因子校准,自动跟踪SQL执行计划(三)
PostgreSQL 执行计划,成本公式解说,代价因子校准,自动跟踪SQL执行计划(三)|学习笔记
|
存储 SQL Oracle
AnalyticDB PostgreSQL 7.0 支持存储过程(CREATE PROCEDURE)特性
AnalyticDB PostgreSQL 7.0 新增了存储过程功能的支持,让用户在使用ADB PG时能够更方便高效地开发业务,并能够更好地兼容Oracle等传统数仓的业务。
494 1
AnalyticDB PostgreSQL 7.0 支持存储过程(CREATE PROCEDURE)特性
|
SQL 存储 关系型数据库
PostgreSQL 流复制搭建主从环境,同步和异步的解释,压力测试,主从角色切换|学习笔记
快速学习PostgreSQL 流复制搭建主从环境,同步和异步的解释,压力测试,主从角色切换
PostgreSQL 流复制搭建主从环境,同步和异步的解释,压力测试,主从角色切换|学习笔记
|
存储 SQL Oracle
10 PostgreSQL 表级复制-物化视图篇, 支持异地,异构如 Oracle 到 pg 的物化视图|学习笔记
快速学习10 PostgreSQL 表级复制-物化视图篇,支持异地,异构如 Oracle 到 pg 的物化视图
10 PostgreSQL 表级复制-物化视图篇, 支持异地,异构如 Oracle 到 pg 的物化视图|学习笔记