PostgreSQL, SQL Server 逻辑增量 (通过逻辑标记update,delete) 同步到 Greenplum, PostgreSQL

本文涉及的产品
云原生数据库 PolarDB 分布式版,标准版 2核8GB
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介:

标签

PostgreSQL , Greenplum , trigger , rule , 逻辑更新 , 逻辑删除 , 增量复制


背景

异构数据的增量同步是比较繁琐的事情,需要考虑很多事情,比如:

1、同步延迟

2、DDL的同步

3、同步时对上游性能的影响

4、上下游数据一致性

5、上游事务原子性在目标端是否能保证原子性

6、上下游数据类型兼容性

7、上下游字符集一致性

8、同步时对下游性能的影响

9、可以同步哪些操作(INSERT, UPDATE, DELETE, TRUNCATE, DDL)

10、同步操作的幂等性

11、同步的效率

12、下游的回放速度

13、是否支持批量操作

通常有一些比较专业的同步软件,比如cdc, goldengate, kettle等。

又比如阿里云开源的rds_dbsync,又比如阿里云的服务datax,又比如PostgreSQL内置的逻辑订阅功能,又比如PostgreSQL内置的FDW功能。

等等:

《debezium - 数据实时捕获和传输管道(CDC)》

《ETL for Oracle to Greenplum (bulk) - Pentaho Data Integrator (PDI, kettle)》

《ETL for Oracle to PostgreSQL 3 - DATAX》

《ETL for Oracle to PostgreSQL 2 - Pentaho Data Integrator (PDI, kettle)》

《ETL for Oracle to PostgreSQL 1 - Oracle Data Integrator (ODI)》

《MySQL准实时同步到PostgreSQL, Greenplum的方案之一 - rds_dbsync》

《MySQL,Oracle,SQL Server等准实时同步到PostgreSQL的方案之一 - FDW外部访问接口》

《[未完待续] MySQL Oracle PostgreSQL PPAS Greenplum 的异构迁移和同步实现和场景介绍》

《MySQL 增量同步到 PostgreSQL》

《使用Londiste3 增量同步 线下PostgreSQL 到 阿里云RDS PG》

《使用alidecode将RDS PG同步到线下, 或者将MySQL同步到PG》

《PostgreSQL 分区表的逻辑复制(逻辑订阅)》

《PostgreSQL 逻辑订阅 - DDL 订阅 实现方法》

《Greenplum, PostgreSQL 数据实时订阅的几种方式》

《使用PostgreSQL逻辑订阅实现multi-master》

《PostgreSQL 逻辑订阅 - 给业务架构带来了什么希望?》

《PostgreSQL 10.0 preview 逻辑订阅 - 原理与最佳实践》

《GoldenGate - Oracle 实时复制到 PostgreSQL或EnterpriseDB》

越来越多的数据库内置了逻辑订阅的能力(通过解析WAL日志,产生流式的变更行为,在目标端回放)。

本文介绍一下另类的方法,或者说更为传统的方法,所以它适用于几乎所有的数据库产品同步。

要求

1、源端需要对update, delete使用逻辑更新或删除标记和时间戳,可以使用触发器和RULE实现

2、目标端需要具备MERGE INSERT的能力

3、如果目标端没有MERGE能力,则可以通过临时表,使用两步操作来实现MERGE

4、源端和目标端的表,都必须具有PK

一、源PostgreSQL, 目标Greenplum, PostgreSQL, 增量复制delete,insert,update

源端

1、创建源表

create table t_src (  
  id int primary key,   
  info text not null,   
  is_del boolean default null,   -- 删除标记,NULL表示未删除,非空表示已删除  
  mod_time timestamp not null default clock_timestamp()  -- 插入、删除、修改的时间戳  
);  

2、创建索引,加速同步

create index idx_t_src_1 on t_src(mod_time);  

3、创建触发器函数,更新、删除数据时,更新时间戳,同时修改删除标记位

当被删除的记录重新被插入时,把删除标记改成未删除。

create or replace function tg1_t_src() returns trigger as $$  
declare  
begin  
  NEW.mod_time := clock_timestamp();  
  select case when OLD.is_del is null and NEW.is_del = true then true else null end into NEW.is_del;     -- 如果以前这个ID被删除过,则插入,并将is_del重新置为未删除   
  return NEW;  
end;  
$$ language plpgsql strict;   

4、创建触发器,更新时触发

create trigger tg1 before update on t_src for each row execute procedure tg1_t_src();  

5、创建规则,当删除记录时,使用UPDATE代替DELETE

create rule r1 as on delete to t_src do instead update t_src set is_del=true where t_src.id=OLD.id and t_src.is_del is null;     -- 未标记为删除的记录is_del=null,标记为删除.    

6、查看插入、更新、删除是否符合预期

postgres=# insert into t_src values (1, md5(random()::text)) on conflict (id) do update set info=excluded.info;  
INSERT 0 1  
postgres=# select * from t_src where id=1;  
 id |               info               | is_del |          mod_time            
----+----------------------------------+--------+----------------------------  
  1 | 56c21963342997fd8bf80a5b542abde9 |        | 2018-05-12 08:54:19.393532  
(1 row)  
  
postgres=# insert into t_src values (1, md5(random()::text)) on conflict (id) do update set info=excluded.info;  
INSERT 0 1  
postgres=# select * from t_src where id=1;  
 id |               info               | is_del |          mod_time            
----+----------------------------------+--------+----------------------------  
  1 | 5bca407559081d6cfc1154fd0f17b6a9 |        | 2018-05-12 08:54:23.465005  
(1 row)  
  
  
postgres=# delete from t_src where id=1;  
DELETE 0  
postgres=# select * from t_src;  
 id |               info               | is_del |          mod_time            
----+----------------------------------+--------+----------------------------  
  1 | 5bca407559081d6cfc1154fd0f17b6a9 | t      | 2018-05-12 08:54:43.158809  
(1 row)  

7、创建压测脚本

vi test.sql  
  
\set id1 random(1,10000000)  
\set id2 random(1,20000000)  
insert into t_src values (:id1, md5(random()::text)) on conflict (id) do update set info=excluded.info;  
delete from t_src where id=:id2;  

8、压测,高压插入、更新、删除动作,每秒处理13.8万行。

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 120  
  
  
transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 64  
number of threads: 64  
duration: 120 s  
number of transactions actually processed: 16634225  
latency average = 0.462 ms  
latency stddev = 0.506 ms  
tps = 138437.925513 (including connections establishing)  
tps = 138445.093708 (excluding connections establishing)  
statement latencies in milliseconds:  
         0.002  \set id1 random(1,10000000)  
         0.000  \set id2 random(1,20000000)  
         0.298  insert into t_src values (:id1, md5(random()::text)) on conflict (id) do update set info=excluded.info;  
         0.163  delete from t_src where id=:id2;  

目标端

1、创建目标表

create table t_dst (  
  id int primary key,   
  info text not null,   
  is_del boolean default null,   
  mod_time timestamp not null default clock_timestamp()  
);  

2、创建索引

create index idx_t_dst_1 on t_dst(mod_time);  

数据同步

1、源端数据增量同步到目标端

(在同一DB中模拟,后面有例子讲源和目标在不同集群的DEMO)

do language plpgsql $$  
declare  
  pos timestamp;           -- 位点  
  pre interval := '10 s';  -- 缓冲10秒,防止空洞,根据业务层设置  
begin  
  -- 已同步位点  
  select max(mod_time) into pos from t_dst;  
    
  -- NULL表示目标端没有数据  
  if pos is null then  
    -- 缓冲上限   
    pos := now()::timestamp;  
    insert into t_dst select * from t_src where mod_time < (pos - pre) on conflict (id) do update set   -- on conflict 合并insert,update  
      info=excluded.info, is_del=excluded.is_del, mod_time=excluded.mod_time  
      where t_dst.info is distinct from excluded.info or  
      t_dst.is_del is distinct from excluded.is_del or  
      t_dst.mod_time is distinct from excluded.mod_time;  
    return;  
  end if;  
  
  -- 同步超过位点的数据  
  insert into t_dst select * from t_src where mod_time > pos and mod_time < (now()::timestamp - pre) on conflict (id) do update set   
    info=excluded.info, is_del=excluded.is_del, mod_time=excluded.mod_time  
    where t_dst.info is distinct from excluded.info or  
    t_dst.is_del is distinct from excluded.is_del or  
    t_dst.mod_time is distinct from excluded.mod_time;  
  return;  
end;  
$$;  

2、一边压测,一边调用以上过程同步,最后达到一致性状态,检查一致性的SQL如下:

select sum(hashtext((t.*)::text)),count(*),sum(case is_del when true then 0 else 1 end) from t_src t;  
  
      sum       |  count  |   sum     
----------------+---------+---------  
 -2967631712018 | 8299587 | 6199359  
(1 row)  
  
  
select sum(hashtext((t.*)::text)),count(*),sum(case is_del when true then 0 else 1 end) from t_dst t;  
  
      sum       |  count  |   sum     
----------------+---------+---------  
 -2967631712018 | 8299587 | 6199359  
(1 row)  

二、源PostgreSQL, 目标Greenplum, 增量复制delete,insert,update

第一种方法,DELETE使用逻辑标记,所以实际上数据并没有删除。

还有一种方法,可以把DELETE的记录,MOVE到另一张表。

方法与一差不多,只是把rule改一下,改成INSERT到其他表。

三、源PostgreSQL, 目标Greenplum, 增量复制insert,update

上游没有del,或者说不需要捕获DEL操作 (DEL操作,人为在上下游同时执行SQL来删除)

源端

1、创建表

create table tbl_src (  
  id int primary key,   
  info text not null,   
  mod_time timestamp not null default clock_timestamp()  -- update时间戳  
);  

2、创建索引

create index idx_tbl_src_1 on tbl_src(mod_time);  

3、创建触发器函数,更新时,自动更新时间戳字段

create or replace function tg1_tbl_src() returns trigger as $$  
declare  
begin  
  NEW.mod_time := clock_timestamp();  
  return NEW;  
end;  
$$ language plpgsql strict;   

4、创建触发器

create trigger tg1 before update on tbl_src for each row execute procedure tg1_tbl_src();  

5、压测

vi test.sql  
\set id1 random(1,10000000)  
insert into tbl_src values (:id1, md5(random()::text)) on conflict (id) do update set info=excluded.info;  
  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 120  

目标端

1、创建目标表

create table tbl_dst (  
  id int primary key,   
  info text not null,   
  mod_time timestamp not null default clock_timestamp()  
);  

2、创建索引

create index idx_tbl_dst_1 on tbl_dst(mod_time);  

数据同步

在同一个实例中模拟。

1、调用过程,同步数据。

do language plpgsql $$  
declare  
  pos timestamp;  -- 位点  
  pre interval := '10 s';  -- 缓冲10秒,防止空洞,根据业务层设置  
begin  
  select max(mod_time) into pos from tbl_dst;  
  if pos is null then  
    pos := now()::timestamp;  
    insert into tbl_dst select * from tbl_src where mod_time < (pos - pre) on conflict (id) do update set   
      info=excluded.info, mod_time=excluded.mod_time  
      where tbl_dst.info is distinct from excluded.info or  
      tbl_dst.mod_time is distinct from excluded.mod_time;  
    return;  
  end if;  
  
  insert into tbl_dst select * from tbl_src where mod_time > pos and mod_time < (now()::timestamp - pre) on conflict (id) do update set   
    info=excluded.info, mod_time=excluded.mod_time  
    where tbl_dst.info is distinct from excluded.info or  
    tbl_dst.mod_time is distinct from excluded.mod_time;  
  return;  
end;  
$$;  

2、一边压测,一边调用以上过程同步数据,最后检查一致性

select sum(hashtext((t.*)::text)),count(*) from tbl_src t;  
  
      sum      |  count    
---------------+---------  
 2739329060132 | 6725930  
(1 row)  
  
select sum(hashtext((t.*)::text)),count(*) from tbl_dst t;  
  
      sum      |  count    
---------------+---------  
 2739329060132 | 6725930  
(1 row)  

四、源PostgreSQL, 目标Greenplum, 增量复制insert,update

同样,不包括DELETE的复制。只复制insert和update。

由于greenplum 没有 insert into on conflict 的功能,所以需要采用临时表,分步实现MERGE。

同步方法

1. 目标端,HDB PG, 获取max(mod_time)

2. 源端,PG, 拉取增量

3. 目标端,增量数据,导入HDB PG 临时表

4. 目标端,HDB PG ,DELETE from 目标表 using 临时表

5. 目标端,HDB PG ,insert into 目标表 select * from 临时表

6. 目标端,清空临时表

目标端

1、创建临时表

create table tbl_dst_tmp (  
  id int primary key,   
  info text not null,   
  mod_time timestamp not null default clock_timestamp()  
);  

2、创建数据同步的脚本

vi imp.sh  
  
#!/bin/bash  
  
# 1. HDB PG, 获取max(mod_time)   
MOD_TIME=`PGPASSWORD="pwd1234" psql -h 127.0.0.1 -p 4000 -U postgres postgres -q -t -A -c "select coalesce(max(mod_time),'4714-11-24 00:00:00 BC'::timestamp) from tbl_dst"`  
  
# 2. PG, 拉取增量  
# 3. 增量数据,导入HDB PG 临时表  
# 使用 linux 管道同步上下游  
PGPASSWORD="pwd" psql -h 10.31.124.69 -p 4000 -U postgres postgres -c "copy (select * from tbl_src where mod_time > '$MOD_TIME'::timestamp and mod_time < (now()-'10 sec'::interval) ) to stdout" | PGPASSWORD="pwd1234" psql -h 127.0.0.1 -p 4000 -U postgres postgres -c "copy tbl_dst_tmp from stdin"  
  
# 4. HDB PG ,DELETE from 目标表 using 临时表   
# 5. HDB PG ,insert into 目标表 select * from 临时表   
# 6. 清空临时表   
# 放在一个事务中,并且对临时表加锁保护。  
PGPASSWORD="pwd1234" psql -h 127.0.0.1 -p 4000 -U postgres postgres <<EOF  
begin;  
lock table tbl_dst_tmp in ACCESS EXCLUSIVE mode;  
delete from tbl_dst using tbl_dst_tmp where tbl_dst.id=tbl_dst_tmp.id;  
insert into tbl_dst select * from tbl_dst_tmp;  
truncate tbl_dst_tmp;  
end;  
EOF  
  
  
chmod 700 imp.sh  

3、一边压测,一边同步

COPY 6725930  
BEGIN  
LOCK TABLE  
DELETE 0  
INSERT 0 6725930  
TRUNCATE TABLE  
COMMIT  
  
  
COPY 0  
BEGIN  
LOCK TABLE  
DELETE 0  
INSERT 0 0  
TRUNCATE TABLE  
COMMIT  
  
  
COPY 78423  
BEGIN  
LOCK TABLE  
DELETE 52796  
INSERT 0 78423  
TRUNCATE TABLE  
COMMIT  
  
  
COPY 486817  
BEGIN  
LOCK TABLE  
DELETE 327603  
INSERT 0 486817  
TRUNCATE TABLE  
COMMIT  
  
  
COPY 2019059  
BEGIN  
LOCK TABLE  
DELETE 1381561  
INSERT 0 2019059  
TRUNCATE TABLE  
COMMIT  
COPY 864687  
Timing is on.  
BEGIN  
Time: 0.149 ms  
LOCK TABLE  
Time: 0.629 ms  
DELETE 652117  
Time: 11029.147 ms (00:11.029)  
INSERT 0 864687  
Time: 10180.576 ms (00:10.181)  
TRUNCATE TABLE  
Time: 4.729 ms  
COMMIT  
Time: 53.555 ms  
  
  
  
COPY 2235200  
Timing is on.  
BEGIN  
Time: 0.178 ms  
LOCK TABLE  
Time: 0.702 ms  
DELETE 1719572  
Time: 18621.210 ms (00:18.621)  
INSERT 0 2235200  
Time: 27716.155 ms (00:27.716)  
TRUNCATE TABLE  
Time: 63.408 ms  
COMMIT  
Time: 81.915 ms  



COPY 5448790
Timing is on.
BEGIN
Time: 0.141 ms
LOCK TABLE
Time: 0.552 ms
DELETE 4486067
Time: 15297.884 ms (00:15.298)
INSERT 0 5448790
Time: 35654.723 ms (00:35.655)
TRUNCATE TABLE
Time: 1.860 ms
COMMIT
Time: 125.503 ms

4、检查数据一致性

postgres=# select sum(hashtext((t.*)::text)),count(*) from tbl_dst t;  
      sum       |  count    
----------------+---------  
 -1351270286348 | 7548269  
(1 row)  
  
postgres=# select sum(hashtext((t.*)::text)),count(*) from tbl_src t;  
      sum       |  count    
----------------+---------  
 -1351270286348 | 7548269  
(1 row)  

五、源SQL Server, 目标Greenplum, 增量复制insert,update

SQL Server有一个时间戳类型timestamp,会自动记录数据插入,更新的时间,所以不需要用触发器来实现标记。

SQL Server的timestamp,每个表只能建一个,并且在表上绝对唯一。8个字节,是一个相对时间,可以与bigint互相转换。

目标端可以使用bigint来存储SQL Server的timestamp类型。

其他的方法与章节四类似,注意标记位的读取、比较时需要转换一下(timestamp, bigint)。 BIGINT 最小值为 (-9223372036854775808)::int8 。

DEMO1,位点从HDB PG读取。

1、HDB PG, 创建临时表

create table tbl_dst (  
  id int primary key,   
  info text not null,   
  mod_time int8 not null   -- 使用int8代替时间戳,对应ms sql的timestamp
); 

-- 临时表

create table tbl_dst_tmp (  
  id int primary key,   
  info text not null,   
  mod_time int8 not null   -- 使用int8代替时间戳,对应ms sql的timestamp
);  

2、数据同步的流程(MS SQL -> HDB PG,无法封装到LINUX SHELL中,可能需要ETL程序介入。流程大致如下)

# 1. HDB PG, 获取max(mod_time)   
MOD_TIME=`PGPASSWORD="pwd1234" psql -h 127.0.0.1 -p 4000 -U postgres postgres -q -t -A -c "select coalesce(max(mod_time),(-9223372036854775808)::int8) from tbl_dst"`  
  
# 2. MS SQL, 拉取增量  
select id, info, convert(bigint, mod_time) from tbl_src where convert(bigint,mod_time) > $MOD_TIME;

# 3. 增量数据,导入HDB PG 临时表  

insert into tbl_dst_tmp ...从步骤2来的数据...;

## 可能的话,还是用COPY更快,"copy tbl_dst_tmp from stdin"  
  
# 4. HDB PG ,DELETE from 目标表 using 临时表   
# 5. HDB PG ,insert into 目标表 select * from 临时表   
# 6. 清空临时表   

# 放在一个事务中,并且对临时表加锁保护。  
PGPASSWORD="pwd1234" psql -h 127.0.0.1 -p 4000 -U postgres postgres <<EOF  
begin;  
lock table tbl_dst_tmp in ACCESS EXCLUSIVE mode;  
delete from tbl_dst using tbl_dst_tmp where tbl_dst.id=tbl_dst_tmp.id;  
insert into tbl_dst select * from tbl_dst_tmp;  
truncate tbl_dst_tmp;  
end;  
EOF  

DEMO2, 位点记录在MS SQL里面,而不是从HDB PG读取。

MS SQL, 创建任务表

create table tbl_job (
  tbl_name name primary key,
  job_time datetime,
  offset_val_low bigint,
  offset_val_up bigint
)

从任务表选择上一次的最低位点

select offset_val_up into v1 from tbl_job where tbl_name='tbl_src'; 

如果上一次没有同步,则说明是第一次同步

if v1 is null then v1 := -1; end if; 

选择本次同步的最大位点

select max(convert(bigint, mod_time)) into v2 from tbl_src; 
  
更新任务表

insert or update into tbl_job values ('tbl_src', now(), v1, v2 ); 

导出本次的增量数据
  
select * from tbl_src where convert(bigint, mod_time) > v1 and convert(bigint, mod_time) <= v2; 

或者你还可以试试其他ETL软件:

http://www.symmetricds.org/about/overview

https://github.com/pivotalguru/outsourcer 专业的sql server,oracle同步到greenplum的软件

商业软件:

https://dbconvert.com/mssql/postgresql/

https://www.convert-in.com/mss2pgs.htm#

xDB

https://www.enterprisedb.com/docs/en/6.2/repguide/EDB_Postgres_Replication_Server_Users_Guide.1.10.html#pID0E0HSK0HA

FDW外部访问接口方法

https://github.com/tds-fdw/tds_fdw

《MySQL,Oracle,SQL Server等准实时同步到PostgreSQL的方案之一 - FDW外部访问接口》

小结

使用本文提供的方法,可以实现异构数据的批量同步,可以将脚本整合到一些ETL工具中,例如KETTLE,例如阿里云的DATAX (dataworks)。

性能如下:

1、源端insert\update\delete性能,单表 约 13.8万行/s。

2、同步性能,单表 约 5万行/s。

参考

http://www.cnblogs.com/gaizai/p/3483393.html

http://www.cnblogs.com/iampkm/p/4082916.html

http://www.cnblogs.com/windows/articles/2149701.html

https://blog.csdn.net/huigezi123/article/details/5849024

https://github.com/tds-fdw/tds_fdw

《debezium - 数据实时捕获和传输管道(CDC)》

《ETL for Oracle to Greenplum (bulk) - Pentaho Data Integrator (PDI, kettle)》

《ETL for Oracle to PostgreSQL 3 - DATAX》

《ETL for Oracle to PostgreSQL 2 - Pentaho Data Integrator (PDI, kettle)》

《ETL for Oracle to PostgreSQL 1 - Oracle Data Integrator (ODI)》

《MySQL准实时同步到PostgreSQL, Greenplum的方案之一 - rds_dbsync》

《MySQL,Oracle,SQL Server等准实时同步到PostgreSQL的方案之一 - FDW外部访问接口》

《[未完待续] MySQL Oracle PostgreSQL PPAS Greenplum 的异构迁移和同步实现和场景介绍》

《MySQL 增量同步到 PostgreSQL》

《使用Londiste3 增量同步 线下PostgreSQL 到 阿里云RDS PG》

《使用alidecode将RDS PG同步到线下, 或者将MySQL同步到PG》

《PostgreSQL 分区表的逻辑复制(逻辑订阅)》

《PostgreSQL 逻辑订阅 - DDL 订阅 实现方法》

《Greenplum, PostgreSQL 数据实时订阅的几种方式》

《使用PostgreSQL逻辑订阅实现multi-master》

《PostgreSQL 逻辑订阅 - 给业务架构带来了什么希望?》

《PostgreSQL 10.0 preview 逻辑订阅 - 原理与最佳实践》

《GoldenGate - Oracle 实时复制到 PostgreSQL或EnterpriseDB》

目录
相关文章
|
2月前
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL的逻辑存储结构
PostgreSQL的逻辑存储结构包括数据库集群、数据库、表空间、段、区、块等。每个对象都有唯一的对象标识符OID,并存储于相应的系统目录表中。集群由单个服务器实例管理,包含多个数据库、用户及对象。表空间是数据库的逻辑存储单元,用于组织逻辑相关的数据结构。段是分配给表、索引等逻辑结构的空间集合,区是段的基本组成单位,而块则是最小的逻辑存储单位。
【赵渝强老师】PostgreSQL的逻辑存储结构
|
3月前
|
SQL
简单练习Microsoft SQL Server MERGE同步两个表
【10月更文挑战第13天】本文介绍了在Microsoft SQL Server中使用`MERGE`语句同步两个表的步骤。首先创建源表`SourceTable`和目标表`TargetTable`并分别插入数据,然后通过`MERGE`语句根据ID匹配行,实现更新、插入和删除操作,最后验证同步结果。此方法可根据需求调整以适应不同场景。
178 1
|
5月前
|
监控 物联网 关系型数据库
使用PostgreSQL触发器解决物联网设备状态同步问题
在物联网监控系统中,确保设备状态(如在线与离线)的实时性和准确性至关重要。当设备状态因外部因素改变时,需迅速反映到系统内部。因设备状态数据分布在不同表中,直接通过应用同步可能引入复杂性和错误。采用PostgreSQL触发器自动同步状态变化是一种高效方法。首先定义触发函数,在设备状态改变时更新管理模块表;然后创建触发器,在状态字段更新后执行此函数。此外,还需进行充分测试、监控性能并实施优化,以及在触发函数中加入错误处理和日志记录功能。这种方法不仅提高自动化程度,增强数据一致性与实时性,还需注意其对性能的影响并采取优化措施。
|
5月前
|
SQL 关系型数据库 数据库
[postgresql]逻辑备份与还原
[postgresql]逻辑备份与还原
|
8月前
|
SQL Java 网络安全
实时计算 Flink版操作报错合集之SQLserver表没有主键,同步的时候报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
223 1
|
8月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何使用PostgreSQL2.4.1从指定时间戳同步数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
SQL 关系型数据库 数据库
实时计算 Flink版产品使用合集之同步PostgreSQL数据时,WAL 日志无限增长,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在使用 DataWorks 数据集成同步 PostgreSQL 数据库中的 Geometry 类型数据如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
107 0
|
8月前
|
分布式计算 关系型数据库 大数据
MaxCompute产品使用合集之怎么才可以将 PostgreSQL 中的 geometry 空间类型字段同步到 MaxCompute 或另一个 PostgreSQL 数据库
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4月前
|
SQL 数据库
数据库数据恢复—SQL Server数据库报错“错误823”的数据恢复案例
SQL Server附加数据库出现错误823,附加数据库失败。数据库没有备份,无法通过备份恢复数据库。 SQL Server数据库出现823错误的可能原因有:数据库物理页面损坏、数据库物理页面校验值损坏导致无法识别该页面、断电或者文件系统问题导致页面丢失。
122 12
数据库数据恢复—SQL Server数据库报错“错误823”的数据恢复案例

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版
  • AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等