PostgreSQL 如何实现upsert与新旧数据自动分离

本文涉及的产品
云原生数据库 PolarDB 分布式版,标准版 2核8GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
简介: 很多业务也行有这样的需求,新的数据会不断的插入,并且可能会有更新。对于更新的数据,需要记录更新前的记录到历史表。 这个需求有点类似于审计需求,即需要对记录变更前后做审计。我以前有写过使用hstore和触发器来满足审计需求的文档,有兴趣的同学可以参考http://blog.163.com/digoa

很多业务也行有这样的需求,新的数据会不断的插入,并且可能会有更新。
对于更新的数据,需要记录更新前的记录到历史表。
1
这个需求有点类似于审计需求,即需要对记录变更前后做审计。
我以前有写过使用hstore和触发器来满足审计需求的文档,有兴趣的同学可以参考
http://blog.163.com/digoal@126/blog/static/163877040201252575529358/
本文的目的并不是审计,而且也可能不期望使用触发器。
还有什么方法呢?
PostgreSQL 这么高大上,当然有,而且还能在一句SQL里面完成,看法宝。


创建一张当前状态表,一张历史记录表。

postgres=# create table tbl(id int primary key, price int);
CREATE TABLE
postgres=# create table tbl_history (id int not null, price int);
CREATE TABLE



插入一条不存在的记录,不会触发插入历史表的行为。
注意替代变量

id=$1 = 2
price=$2 = 7

postgres=# with old as (select * from tbl where id= $1), 
postgres-# new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *) 
postgres-# insert into tbl_history select old.* from old,new where old.id=new.id;
INSERT 0 0

postgres=# select tableoid,ctid,* from tbl union all select tableoid,ctid,* from tbl_history ;
 tableoid | ctid  | id | price 
----------+-------+----+-------
    18243 | (0,1) |  2 |     7
(1 row)



插入一条不存在的记录,不会触发插入历史表的行为。

id=$1 = 1
price=$2 = 1

postgres=# with old as (select * from tbl where id= $1), 
new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *) 
insert into tbl_history select old.* from old,new where old.id=new.id;
INSERT 0 0
postgres=# select tableoid,ctid,* from tbl union all select tableoid,ctid,* from tbl_history ;
 tableoid | ctid  | id | price 
----------+-------+----+-------
    18243 | (0,1) |  2 |     7
    18243 | (0,2) |  1 |     1
(2 rows)



插入一条已存在的记录,并且有数据的变更,触发数据插入历史表的行为。

id=$1 = 1
price=$2 = 2

postgres=# with old as (select * from tbl where id= $1), 
new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *) 
insert into tbl_history select old.* from old,new where old.id=new.id;
INSERT 0 1
postgres=# select tableoid,ctid,* from tbl union all select tableoid,ctid,* from tbl_history ;
 tableoid | ctid  | id | price 
----------+-------+----+-------
    18243 | (0,1) |  2 |     7
    18243 | (0,3) |  1 |     2
    18251 | (0,1) |  1 |     1
(3 rows)



插入一条已存在的记录,并且已存在的记录值和老值一样,不会触发将数据插入历史表的行为。

id=$1 = 1
price=$2 = 2

postgres=# with old as (select * from tbl where id= $1), 
new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *) 
insert into tbl_history select old.* from old,new where old.id=new.id;
INSERT 0 0
postgres=# select tableoid,ctid,* from tbl union all select tableoid,ctid,* from tbl_history ;
 tableoid | ctid  | id | price 
----------+-------+----+-------
    18243 | (0,1) |  2 |     7
    18243 | (0,3) |  1 |     2
    18251 | (0,1) |  1 |     1
(3 rows)



执行计划

postgres=# explain with old as (select * from tbl where id= $1), 
new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *) 
insert into tbl_history select old.* from old,new where old.id=new.id;
                                 QUERY PLAN                                 
----------------------------------------------------------------------------
 Insert on tbl_history  (cost=2.17..2.23 rows=1 width=8)
   CTE old
     ->  Index Scan using tbl_pkey on tbl  (cost=0.14..2.16 rows=1 width=8)
           Index Cond: (id = 1)
   CTE new
     ->  Insert on tbl tbl_1  (cost=0.00..0.01 rows=1 width=8)
           Conflict Resolution: UPDATE
           Conflict Arbiter Indexes: tbl_pkey
           Conflict Filter: (tbl_1.price <> excluded.price)
           ->  Result  (cost=0.00..0.01 rows=1 width=8)
   ->  Nested Loop  (cost=0.00..0.05 rows=1 width=8)
         Join Filter: (old.id = new.id)
         ->  CTE Scan on old  (cost=0.00..0.02 rows=1 width=8)
         ->  CTE Scan on new  (cost=0.00..0.02 rows=1 width=4)
(14 rows)



在不支持insert on conflict语法的PostgreSQL中(小于9.5的版本),SQL可以调整为:

id=$1 = 1
price=$2 = 2

with new as (update tbl set price=$2 where id=$1 and price<>$2) 
  insert into tbl select $1, $2 where not exists (select 1 from tbl where id=$1);

更多upset参考
https://yq.aliyun.com/articles/36103



小于9.5的版本,实现本文的场景,需要这样写。

id=$1 = 1
price=$2 = 2

with 
old as (select * from tbl where id=$1),
new_upd as (update tbl set price=$2 where id=$1 and price<>$2 returning *),
new_ins as (insert into tbl select $1, $2 where not exists (select 1 from tbl where id=$1) returning *)
insert into tbl_history 
select old.* from old left outer join new_upd on (old.id=new_upd.id) where new_upd.* is not null;
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
5月前
|
SQL Oracle 关系型数据库
实时计算 Flink版操作报错之往GREENPLUM 6 写数据,用postgresql-42.2.9.jar 报 ON CONFLICT (uuid) DO UPDATE SET 语法有问题。怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
关系型数据库 PostgreSQL
PostgreSQL排序字段不唯一导致分页查询结果出现重复数据
PostgreSQL排序字段不唯一导致分页查询结果出现重复数据
113 0
|
关系型数据库 MySQL Linux
TiDB实时同步数据到PostgreSQL(三) ---- 使用pgloader迁移数据
使用PostgreSQL数据迁移神器pgloader从TiDB迁移数据到PostgreSQL,同时说明如何在最新的Rocky Linux 9(CentOS 9 stream也适用)上通过源码编译安装pgloader。
|
4月前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
999 0
|
4月前
|
DataWorks 安全 关系型数据库
DataWorks产品使用合集之使用Flink CDC读取PostgreSQL数据时如何指定编码格式
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
运维 Cloud Native 关系型数据库
云原生数据仓库产品使用合集之原生数据仓库AnalyticDB PostgreSQL版如果是列存表的话, adb支持通过根据某个字段做upsert吗
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
|
2月前
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
228 0
|
2月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
开发框架 关系型数据库 数据库
在 PostgreSQL 中,解决图片二进制数据,由于bytea_output参数问题导致显示不正常的问题。
在 PostgreSQL 中,解决图片二进制数据,由于bytea_output参数问题导致显示不正常的问题。
|
4月前
|
关系型数据库 5G PostgreSQL
postgreSQL 导出数据、导入
postgreSQL 导出数据、导入
49 1

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版