使用PostgreSQL_Notify实现多实例缓存同步

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,企业版 4核16GB
推荐场景:
HTAP混合负载
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: Parallel与Hierarchy是架构设计的两大法宝,**缓存**是Hierarchy在IO领域的体现。单线程场景下缓存机制的实现可以简单到不可思议,但很难想象成熟的应用会只有一个实例。在使用缓存的同时引入并发,就不得不考虑一个问题:如何保证每个实例的缓存与底层数据副本的数据一致性。 分布式系统受到CAP定理的约束,分区一致性P是一般来说是不允许牺牲的,不可能让两个实例对同样的请求却给出

Parallel与Hierarchy是架构设计的两大法宝,缓存是Hierarchy在IO领域的体现。单线程场景下缓存机制的实现可以简单到不可思议,但很难想象成熟的应用会只有一个实例。在使用缓存的同时引入并发,就不得不考虑一个问题:如何保证每个实例的缓存与底层数据副本的数据一致性。

分布式系统受到CAP定理的约束,分区一致性P是一般来说是不允许牺牲的,不可能让两个实例对同样的请求却给出不同的结果。用缓存是为了更好的性能,所以如果还要追求可用性A,就一定会牺牲C。我们能做的,就是通过巧妙设计让AP系统的一致性损失最小化。

传统方法

最简单粗暴的办法就是定时重新拉取,例如每个整点,所有应用一起去数据库拉取一次最新版本的数据。很多应用都是这么做的。当然问题也很多:拉的间隔长了,变更不能及时应用,用户体验差;拉的频繁了,IO压力大。而且实例数目和数据大小一旦膨胀起来,对于宝贵的IO资源是很大的浪费。

异步通知是一种更好的办法,尤其是在读请求远多于写请求的情况下。接受到写请求的实例,通过发送广播的方式通知其他实例。RedisPubSub就可以很好地实现这个功能。如果原本下层存储就是Redis自然是再方便不过,但如果下层存储是关系型数据库的话,为这样一个功能引入一个新的组件似乎有些得不偿失。况且考虑到后台管理程序或者其他应用如果在修改了数据库后也要去redis发布通知,实在太麻烦了。一种可行的办法是通过数据库中间件来监听RDS变动并广播通知,淘宝不少东西就是这么做的。但如果DB本身就能搞定的事情,为什么要加一个中间件呢?通过PostgreSQL的Notfiy-Listen机制,可以方便地实现这种功能。

目标

无论从任何渠道产生的数据库记录变更(增删改)都能被所有相关应用实时感知,用于维护自身缓存与数据库内容的一致性。

原理

PostgreSQL行级触发器 + Notify机制 + 自定义协议 + Smart Client

  • 行级触发器:通过为我们感兴趣的表建立一个行级别的写触发器,对数据表中的每一行记录的Update,Delete,Insert都会出发自定义函数的执行。
  • Notify:通过PostgreSQL内建的异步通知机制向指定的Channel发送通知
  • 自定义协议:协商消息格式,传递操作的类型与变更记录的标识
  • Smart Client:客户端监听消息变更,根据消息对缓存执行相应的操作。

实际上这样一套东西就是一个超简易的WAL(Write After Log)实现,从而使应用内部的缓存状态能与数据库保持实时一致(compare to poll)。

实现

DDL

这里以一个最简单的表作为示例,一张以主键标识的users表。

-- 用户表
CREATE TABLE users (
  id   TEXT,
  name TEXT,
  PRIMARY KEY (id)
);

触发器

-- 通知触发器
CREATE OR REPLACE FUNCTION notify_change() RETURNS TRIGGER AS 
$$

BEGIN
  IF    (TG_OP = 'INSERT') THEN 
    PERFORM pg_notify(TG_RELNAME || '_chan', 'I' || NEW.id); RETURN NEW;
  ELSIF (TG_OP = 'UPDATE') THEN 
    PERFORM pg_notify(TG_RELNAME || '_chan', 'U' || NEW.id); RETURN NEW;
  ELSIF (TG_OP = 'DELETE') THEN 
    PERFORM pg_notify(TG_RELNAME || '_chan', 'D' || OLD.id); RETURN OLD;
  END IF;
END; 
$$
 LANGUAGE plpgsql SECURITY DEFINER;

这里创建了一个触发器函数,通过内置变量TG_OP获取操作的名称,TG_RELNAME获取表名。每当触发器执行时,它会向名为<table_name>_chan的通道发送指定格式的消息:[I|U|D]<id>

题外话:通过行级触发器,还可以实现一些很实用的功能,例如In-DB Audit,自动更新字段值,统计信息,自定义备份策略与回滚逻辑等。

-- 为用户表创建行级触发器,监听INSERT UPDATE DELETE 操作。
CREATE TRIGGER t_user_notify AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE PROCEDURE notify_change();

创建触发器也很简单,表级触发器对每次表变更执行一次,而行级触发器对每条记录都会执行一次。这样,数据库的里的工作就算全部完成了。

消息格式

通知需要传达出两个信息:变更的操作类型,变更的实体标记。

  • 变更的操作类型就是增删改:INSERT,DELETE,UPDATE。通过一个打头的字符'[I|U|D]'就可以标识。
  • 变更的对象可以通过实体主键来标识。如果不是字符串类型,还需要确定一种无歧义的序列化方式。

这里为了省事直接使用字符串类型作为ID,那么插入一条id=1的记录,对应的消息就是I1,更新一条id=5的记录消息就是U5,删除id=3的记录消息就是D3

完全可以通过更复杂的消息协议实现更强大的功能。

SmartClient

数据库的机制需要客户端的配合才能生效,客户端需要监听数据库的变更通知,才能将变更实时应用到自己的缓存副本中。对于插入和更新,客户端需要根据ID重新拉取相应实体,对于删除,客户端需要删除自己缓存副本的相应实体。以Go语言为例,编写了一个简单的客户端模块。

本例中使用一个以User.ID作为键,User对象作为值的并发安全字典Users sync.Map作为缓存。

作为演示,启动了另一个goroutine对数据库写入了一些变更。

package main

import "sync"
import "strings"
import "github.com/go-pg/pg"
import . "github.com/Vonng/gopher/db/pg"
import log "github.com/Sirupsen/logrus"

type User struct {
    ID   string `sql:",pk"`
    Name string
}

// Users 内部数据缓存
var Users sync.Map 

// 辅助函数:加载全部用户,初始化时使用
func LoadAllUser() {
    var users []User
    Pg.Query(&users, `SELECT ID,name FROM users;`)
    for _, user := range users {
        Users.Store(user.ID, user)
    }
}

// 辅助函数:根据ID重载单个用户,当插入和更新时执行
func LoadUser(id string) {
    user := User{ID: id}
    Pg.Select(&user)
    Users.Store(user.ID, user)
}

// 打印缓存内部的Key列表
func PrintUsers() string {
    var buf []string
    Users.Range(func(key, value interface{}) bool {
        buf = append(buf, key.(string));
        return true
    })
    return strings.Join(buf, ",")
}

// ListenUserChange 会监听PostgreSQL users数据表中的变动通知,并维护缓存状态
func ListenUserChange() {
    go func(c <-chan *pg.Notification) {
        for notify := range c {
            action, id := notify.Payload[0], notify.Payload[1:]
            switch action {
            case 'I': fallthrough
            case 'U': LoadUser(id);
            case 'D': Users.Delete(id)
            }
            log.Infof("[NOTIFY] Action:%c ID:%s Users: %s", action, id, PrintUsers())
        }
    }(Pg.Listen("users_chan").Channel())
}

// MakeSomeChange 会向数据库写入一些变更
func MakeSomeChange() {
    Pg.Exec(`TRUNCATE TABLE users;`)
    Pg.Insert(&User{"001", "张三"})
    Pg.Insert(&User{"002", "李四"})
    Pg.Insert(&User{"003", "王五"})  // 插入
    Pg.Update(&User{"003", "王麻子"}) // 改名
    Pg.Delete(&User{ID: "002"})    // 删除
}

func main() {
    LoadAllUser()
    ListenUserChange()
    go MakeSomeChange()
    <-make(chan struct{})
}

运行结果如下:

[NOTIFY] Action:I ID:001 Users: 001          
[NOTIFY] Action:I ID:002 Users: 001,002      
[NOTIFY] Action:I ID:003 Users: 002,003,001  
[NOTIFY] Action:U ID:003 Users: 001,002,003  
[NOTIFY] Action:D ID:002 Users: 001,003      

可以看出,缓存确是与数据库保持了同样的状态。

应用场景

读远大于写的场景。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
19天前
|
关系型数据库 MySQL 分布式数据库
PolarDB产品使用问题之如何将实例关联到本地的数据库
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
19天前
|
消息中间件 关系型数据库 分布式数据库
PolarDB产品使用问题之rockermq后增加一个broker节点,topic该如何同步
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
19天前
|
消息中间件 存储 关系型数据库
PolarDB产品使用问题之查询归档后的表,是否会占用当前实例负载资源
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
19天前
|
运维 关系型数据库 MySQL
PolarDB产品使用问题之怎么把将客户端所在的网络和实例配置到同一环境去
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
10天前
|
缓存 JavaScript API
vue3 生命周期(生命周期钩子 vs 生命周期选项 vs 缓存实例的生命周期)
vue3 生命周期(生命周期钩子 vs 生命周期选项 vs 缓存实例的生命周期)
12 0
|
1月前
|
缓存 分布式计算 关系型数据库
数据管理DMS操作报错合集之当进行RDS实例的可用区迁移时,提示“缓存清理”是什么意思
数据管理DMS(Data Management Service)是阿里云提供的数据库管理和运维服务,它支持多种数据库类型,包括RDS、PolarDB、MongoDB等。在使用DMS进行数据库操作时,可能会遇到各种报错情况。以下是一些常见的DMS操作报错及其可能的原因与解决措施的合集。
|
2月前
|
存储 缓存 NoSQL
NoSQL缓存数据库的使用场景实例和命令速查表
【5月更文挑战第8天】Redis 是一个内存数据结构服务,用 C 编写,支持五种数据结构,不仅限于键值对。它用于缓存、消息队列、订阅/发布系统等,提供持久化、主从复制和集群支持。了解其核心数据结构和应用场景是有效利用 Redis 的关键。
84 3
NoSQL缓存数据库的使用场景实例和命令速查表
|
1月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之离线同步任务中,把表数据同步到POLARDB,显示所有数据都是脏数据,报错信息:ERROR JobContainer - 运行scheduler 模式[local]出错.是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
17天前
|
关系型数据库 MySQL 分布式数据库
PolarDB操作报错合集之当使用DTS(数据传输服务)同步的表在目标库中进行LEFT JOIN查询时遇到异常,是什么导致的
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。
|
17天前
|
关系型数据库 MySQL 分布式数据库
PolarDB操作报错合集之源实例无主键表校验出现报错,该怎么办
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。