利用EDB的database link实现从PostgreSQL到PostgreSQL的数据同步。
源数据是持续插入,无更新操作的记录.将create_time作为同步标记.并且需要定期清除同步来的历史数据.
Source Database : PostgreSQL
Destination Database : PostgreSQL
Sync Database : EnterpriseDB
SD_table :
Table "public.s_table"
Column | Type | Modifiers
--------------+-----------------------------+--------------------------------------------------------------------
id | bigint | not null default nextval('s_table_id_seq'::regclass)
create_time | timestamp without time zone | default now()
s_id | integer |
url | character varying(1000) |
ip | character varying(32) |
module | character varying(64) |
resource_id | character varying(100) |
source | character varying(64) |
refer_url | character varying(1000) |
app_id | integer |
last_foot_id | bigint |
Indexes:
"pk_s_table" PRIMARY KEY, btree (id)
"create_time_index" btree (create_time)
DD_table :
Table "sync.d_table"
Column | Type | Modifiers
------------+-----------------------------+---------------
id | bigint | not null
appid | integer |
module | character varying(64) |
createtime | timestamp without time zone | default now()
Indexes:
"pk_d_table" PRIMARY KEY, btree (id)
"idx_d_table_1" btree (createtime)
SD_database_link :
List of database links
Link | Access | Owner | Type | User | Connection String
-----------------------+--------+--------------+-------+----------+----------------------------------------------
sync_s | PUBLIC | enterprisedb | LIBPQ | s_user | host=xxx.xxx.xxx.xxx port=xxxx dbname=s_database_name
sync_d | PUBLIC | enterprisedb | LIBPQ | d_user | host=xxx.xxx.xxx.xxx port=xxxx dbname=d_database_name
SD_sync_function :
create or replace function f_sync_d_table () returns void as $BODY$
declare
s_max_time timestamp without time zone;
d_max_time timestamp without time zone;
del_time timestamp without time zone;
return_rows int;
begin
del_time := now() - interval '2 hour';
select max(createtime) into d_max_time from d_table@sync_d;
raise notice 'd_max_time is %.',d_max_time;
select max(create_time) into s_max_time from s_table@sync_s where create_time>d_max_time;
raise notice 'The record time before % and after % will be synced.',s_max_time,d_max_time;
insert into d_table@sync_d (id,appid,module,createtime) select id,app_id,module,create_time from s_table@sync_s where create_time>d_max_time and create_time<s_max_time;
GET DIAGNOSTICS return_rows = ROW_COUNT;
raise notice 'sync row count is %.',return_rows;
raise notice 'The record time before % and before % will be deleted.',d_max_time,del_time;
delete from d_table@sync_d where createtime < d_max_time and createtime < del_time;
GET DIAGNOSTICS return_rows = ROW_COUNT;
raise notice 'del row count is %.',return_rows;
end;
$BODY$ language plpgsql;
TEST:
select * from f_sync_d_table();
NOTICE: d_max_time is 04-NOV-10 11:06:06.186422.
NOTICE: The record time before 04-NOV-10 11:08:06.193542 and after 04-NOV-10 11:06:06.186422 will be synced.
NOTICE: sync row count is 164.
NOTICE: The record time before 04-NOV-10 11:06:06.186422 and before 04-NOV-10 09:09:01.409094 will be deleted.
NOTICE: del row count is 0.
f_sync_tbl_deposit_visit_log
------------------------------
(1 row)
搞个执行计划,定时执行就可以了(前提是至少执行周期比删除范围窄).