使用PostgreSQL_Notify实现多实例缓存同步

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: Parallel与Hierarchy是架构设计的两大法宝,**缓存**是Hierarchy在IO领域的体现。单线程场景下缓存机制的实现可以简单到不可思议,但很难想象成熟的应用会只有一个实例。在使用缓存的同时引入并发,就不得不考虑一个问题:如何保证每个实例的缓存与底层数据副本的数据一致性。 分布式系统受到CAP定理的约束,分区一致性P是一般来说是不允许牺牲的,不可能让两个实例对同样的请求却给出

Parallel与Hierarchy是架构设计的两大法宝,缓存是Hierarchy在IO领域的体现。单线程场景下缓存机制的实现可以简单到不可思议,但很难想象成熟的应用会只有一个实例。在使用缓存的同时引入并发,就不得不考虑一个问题:如何保证每个实例的缓存与底层数据副本的数据一致性。

分布式系统受到CAP定理的约束,分区一致性P是一般来说是不允许牺牲的,不可能让两个实例对同样的请求却给出不同的结果。用缓存是为了更好的性能,所以如果还要追求可用性A,就一定会牺牲C。我们能做的,就是通过巧妙设计让AP系统的一致性损失最小化。

传统方法

最简单粗暴的办法就是定时重新拉取,例如每个整点,所有应用一起去数据库拉取一次最新版本的数据。很多应用都是这么做的。当然问题也很多:拉的间隔长了,变更不能及时应用,用户体验差;拉的频繁了,IO压力大。而且实例数目和数据大小一旦膨胀起来,对于宝贵的IO资源是很大的浪费。

异步通知是一种更好的办法,尤其是在读请求远多于写请求的情况下。接受到写请求的实例,通过发送广播的方式通知其他实例。RedisPubSub就可以很好地实现这个功能。如果原本下层存储就是Redis自然是再方便不过,但如果下层存储是关系型数据库的话,为这样一个功能引入一个新的组件似乎有些得不偿失。况且考虑到后台管理程序或者其他应用如果在修改了数据库后也要去redis发布通知,实在太麻烦了。一种可行的办法是通过数据库中间件来监听RDS变动并广播通知,淘宝不少东西就是这么做的。但如果DB本身就能搞定的事情,为什么要加一个中间件呢?通过PostgreSQL的Notfiy-Listen机制,可以方便地实现这种功能。

目标

无论从任何渠道产生的数据库记录变更(增删改)都能被所有相关应用实时感知,用于维护自身缓存与数据库内容的一致性。

原理

PostgreSQL行级触发器 + Notify机制 + 自定义协议 + Smart Client

  • 行级触发器:通过为我们感兴趣的表建立一个行级别的写触发器,对数据表中的每一行记录的Update,Delete,Insert都会出发自定义函数的执行。
  • Notify:通过PostgreSQL内建的异步通知机制向指定的Channel发送通知
  • 自定义协议:协商消息格式,传递操作的类型与变更记录的标识
  • Smart Client:客户端监听消息变更,根据消息对缓存执行相应的操作。

实际上这样一套东西就是一个超简易的WAL(Write After Log)实现,从而使应用内部的缓存状态能与数据库保持实时一致(compare to poll)。

实现

DDL

这里以一个最简单的表作为示例,一张以主键标识的users表。

-- 用户表
CREATE TABLE users (
  id   TEXT,
  name TEXT,
  PRIMARY KEY (id)
);

触发器

-- 通知触发器
CREATE OR REPLACE FUNCTION notify_change() RETURNS TRIGGER AS 
$$

BEGIN
  IF    (TG_OP = 'INSERT') THEN 
    PERFORM pg_notify(TG_RELNAME || '_chan', 'I' || NEW.id); RETURN NEW;
  ELSIF (TG_OP = 'UPDATE') THEN 
    PERFORM pg_notify(TG_RELNAME || '_chan', 'U' || NEW.id); RETURN NEW;
  ELSIF (TG_OP = 'DELETE') THEN 
    PERFORM pg_notify(TG_RELNAME || '_chan', 'D' || OLD.id); RETURN OLD;
  END IF;
END; 
$$
 LANGUAGE plpgsql SECURITY DEFINER;

这里创建了一个触发器函数,通过内置变量TG_OP获取操作的名称,TG_RELNAME获取表名。每当触发器执行时,它会向名为<table_name>_chan的通道发送指定格式的消息:[I|U|D]<id>

题外话:通过行级触发器,还可以实现一些很实用的功能,例如In-DB Audit,自动更新字段值,统计信息,自定义备份策略与回滚逻辑等。

-- 为用户表创建行级触发器,监听INSERT UPDATE DELETE 操作。
CREATE TRIGGER t_user_notify AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE PROCEDURE notify_change();

创建触发器也很简单,表级触发器对每次表变更执行一次,而行级触发器对每条记录都会执行一次。这样,数据库的里的工作就算全部完成了。

消息格式

通知需要传达出两个信息:变更的操作类型,变更的实体标记。

  • 变更的操作类型就是增删改:INSERT,DELETE,UPDATE。通过一个打头的字符'[I|U|D]'就可以标识。
  • 变更的对象可以通过实体主键来标识。如果不是字符串类型,还需要确定一种无歧义的序列化方式。

这里为了省事直接使用字符串类型作为ID,那么插入一条id=1的记录,对应的消息就是I1,更新一条id=5的记录消息就是U5,删除id=3的记录消息就是D3

完全可以通过更复杂的消息协议实现更强大的功能。

SmartClient

数据库的机制需要客户端的配合才能生效,客户端需要监听数据库的变更通知,才能将变更实时应用到自己的缓存副本中。对于插入和更新,客户端需要根据ID重新拉取相应实体,对于删除,客户端需要删除自己缓存副本的相应实体。以Go语言为例,编写了一个简单的客户端模块。

本例中使用一个以User.ID作为键,User对象作为值的并发安全字典Users sync.Map作为缓存。

作为演示,启动了另一个goroutine对数据库写入了一些变更。

package main

import "sync"
import "strings"
import "github.com/go-pg/pg"
import . "github.com/Vonng/gopher/db/pg"
import log "github.com/Sirupsen/logrus"

type User struct {
    ID   string `sql:",pk"`
    Name string
}

// Users 内部数据缓存
var Users sync.Map 

// 辅助函数:加载全部用户,初始化时使用
func LoadAllUser() {
    var users []User
    Pg.Query(&users, `SELECT ID,name FROM users;`)
    for _, user := range users {
        Users.Store(user.ID, user)
    }
}

// 辅助函数:根据ID重载单个用户,当插入和更新时执行
func LoadUser(id string) {
    user := User{ID: id}
    Pg.Select(&user)
    Users.Store(user.ID, user)
}

// 打印缓存内部的Key列表
func PrintUsers() string {
    var buf []string
    Users.Range(func(key, value interface{}) bool {
        buf = append(buf, key.(string));
        return true
    })
    return strings.Join(buf, ",")
}

// ListenUserChange 会监听PostgreSQL users数据表中的变动通知,并维护缓存状态
func ListenUserChange() {
    go func(c <-chan *pg.Notification) {
        for notify := range c {
            action, id := notify.Payload[0], notify.Payload[1:]
            switch action {
            case 'I': fallthrough
            case 'U': LoadUser(id);
            case 'D': Users.Delete(id)
            }
            log.Infof("[NOTIFY] Action:%c ID:%s Users: %s", action, id, PrintUsers())
        }
    }(Pg.Listen("users_chan").Channel())
}

// MakeSomeChange 会向数据库写入一些变更
func MakeSomeChange() {
    Pg.Exec(`TRUNCATE TABLE users;`)
    Pg.Insert(&User{"001", "张三"})
    Pg.Insert(&User{"002", "李四"})
    Pg.Insert(&User{"003", "王五"})  // 插入
    Pg.Update(&User{"003", "王麻子"}) // 改名
    Pg.Delete(&User{ID: "002"})    // 删除
}

func main() {
    LoadAllUser()
    ListenUserChange()
    go MakeSomeChange()
    <-make(chan struct{})
}

运行结果如下:

[NOTIFY] Action:I ID:001 Users: 001          
[NOTIFY] Action:I ID:002 Users: 001,002      
[NOTIFY] Action:I ID:003 Users: 002,003,001  
[NOTIFY] Action:U ID:003 Users: 001,002,003  
[NOTIFY] Action:D ID:002 Users: 001,003      

可以看出,缓存确是与数据库保持了同样的状态。

应用场景

读远大于写的场景。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
6月前
|
缓存
详解CentOS8更换yum源后出现同步仓库缓存失败的问题
详解CentOS8更换yum源后出现同步仓库缓存失败的问题
361 0
|
1月前
|
缓存 JavaScript 搜索推荐
vue中的一个内置组件Keep-Alive的作用及使用方法介绍——缓存不活动的组件实例
vue中的一个内置组件Keep-Alive的作用及使用方法介绍——缓存不活动的组件实例
97 1
|
2月前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
3月前
|
监控 物联网 关系型数据库
使用PostgreSQL触发器解决物联网设备状态同步问题
在物联网监控系统中,确保设备状态(如在线与离线)的实时性和准确性至关重要。当设备状态因外部因素改变时,需迅速反映到系统内部。因设备状态数据分布在不同表中,直接通过应用同步可能引入复杂性和错误。采用PostgreSQL触发器自动同步状态变化是一种高效方法。首先定义触发函数,在设备状态改变时更新管理模块表;然后创建触发器,在状态字段更新后执行此函数。此外,还需进行充分测试、监控性能并实施优化,以及在触发函数中加入错误处理和日志记录功能。这种方法不仅提高自动化程度,增强数据一致性与实时性,还需注意其对性能的影响并采取优化措施。
|
3月前
|
缓存 Java
Java本地高性能缓存实践问题之创建一个AsyncCache实例的问题如何解决
Java本地高性能缓存实践问题之创建一个AsyncCache实例的问题如何解决
|
3月前
|
存储 缓存 监控
Java本地高性能缓存实践问题之在EncacheTest示例中正确移除一个缓存实例的问题如何解决
Java本地高性能缓存实践问题之在EncacheTest示例中正确移除一个缓存实例的问题如何解决
|
6月前
|
存储 缓存 NoSQL
NoSQL缓存数据库的使用场景实例和命令速查表
【5月更文挑战第8天】Redis 是一个内存数据结构服务,用 C 编写,支持五种数据结构,不仅限于键值对。它用于缓存、消息队列、订阅/发布系统等,提供持久化、主从复制和集群支持。了解其核心数据结构和应用场景是有效利用 Redis 的关键。
119 3
NoSQL缓存数据库的使用场景实例和命令速查表
|
5月前
|
缓存 分布式计算 关系型数据库
数据管理DMS操作报错合集之当进行RDS实例的可用区迁移时,提示“缓存清理”是什么意思
数据管理DMS(Data Management Service)是阿里云提供的数据库管理和运维服务,它支持多种数据库类型,包括RDS、PolarDB、MongoDB等。在使用DMS进行数据库操作时,可能会遇到各种报错情况。以下是一些常见的DMS操作报错及其可能的原因与解决措施的合集。
100 3
|
4月前
|
缓存 JavaScript API
vue3 生命周期(生命周期钩子 vs 生命周期选项 vs 缓存实例的生命周期)
vue3 生命周期(生命周期钩子 vs 生命周期选项 vs 缓存实例的生命周期)
100 0
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何使用PostgreSQL2.4.1从指定时间戳同步数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。