03-PDI(Kettle)导入与导出CDC(中)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 文章目录03-PDI(Kettle)导入与导出CDC数据的全量、增量、差异备份基于时间戳的源数据CDC实验原理实验步骤基于触发器的CDC实验原理
  1. 创建INSERT触发器

创建INSERT触发器tri_insert_student


DROP TRIGGER IF EXISTS tri_insert_student;
DELIMITER //
CREATE TRIGGER tri_insert_student AFTER INSERT ON studentinfo FOR EACH ROW 
begin
  IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN 
    UPDATE cdc_opt_log SET optype='I',opflag='未处理' WHERE SID=new.ID;
  ELSE
    INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'I', '未处理');
  END IF;
end
//
DELIMITER ;

studentinfo表中插入一条记录后,触发器会执行,向cdc_opt_log中更新或插入一条记录。

本段语句解释如下:

DROP TRIGGER IF EXISTS tri_insert_student;

表示删除触发器

DELIMITER // 表示修改定界符为 // ,避免MySQL遇到 分号;立刻执行

CREATE TRIGGER tri_insert_student AFTER INSERT ON studentinfo FOR EACH ROW 表示创建的触发器为tri_insert_student ,

后面的AFTER表示插入后执行,可选BEFORE,

后面的INSERT表示插入时触发器执行,

ON studentinfo 表示触发器定义在某表中,

FOR EACH ROW表示每行都会触发。

begin
  IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN 
    UPDATE cdc_opt_log SET optype='I',opflag='未处理' WHERE SID=new.ID;
  ELSE
    INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'I', '未处理');
  END IF;
end

表示触发器的语句,其中new 为支持的关键字,表示修改后的行,old表示修改前的行。

DELIMITER ;表示修改定界符回默认的;。

  1. 创建UPDATE触发器
    创建INSERT触发器tri_update_student
DROP TRIGGER IF EXISTS tri_update_student;
DELIMITER //
CREATE TRIGGER tri_update_student AFTER UPDATE ON studentinfo FOR EACH ROW 
begin
  IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN 
    UPDATE cdc_opt_log SET optype='U',opflag='未处理' WHERE SID=new.ID;
  ELSE
    INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'U', '未处理');
  END IF;
end
//
DELIMITER ;

这段语句与插入触发器基本类似,不再重复描述

  1. 创建DELETE触发器

创建INSERT触发器tri_delete_student

DROP TRIGGER IF EXISTS tri_delete_student;
DELIMITER //
CREATE TRIGGER tri_delete_student AFTER DELETE ON studentinfo FOR EACH ROW 
begin
  IF (SELECT 1 FROM cdc_opt_log WHERE SID=old.ID) THEN 
    UPDATE cdc_opt_log SET optype='D',opflag='未处理' WHERE SID=old.ID;
  ELSE
    INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (old.ID, 'D', '未处理');
  END IF;
end
//
DELIMITER ;
  1. 转换设计


  1. 具体步骤

(1)Table Input :GetCDCOptlog

   SELECT
    optype,
    '已完成' as res,
    SID
   FROM cdc_opt_log
   WHERE opflag='未处理'

(2)Switch Case :IsDeleteOperation



(3)Table Input:GetStudentCDC

sql语句:

   SELECT
     ID
   , Name
   , Gender
   , Class
   , Age
   , Score
   , Height
   , PhoneNumber
   , createtimestamp
   , modifytimestamp
   , ? as optype
   , ? as res
   , CURRENT_TIMESTAMP as loadtimestamp
   FROM studentinfo
   WHERE ID=?


(4)Insert Update:InsertOrUpdateStudentCDCtoSyncTable

这一步骤会将新插入的数据更新到studentsync。



(5)Insert Update:UpdateCDCOptLog

这一步骤会将cdc_opt_log表在的optflag字段修改为res值(已完成)


(6)Delete : DeleteStudentFromSyncTable

这一步骤会删除studentsync中的指定ID的记录


(7)Insert / Update : UpdateCDCOptLog2



至此:转换编写完成,如有需要,请留言,我会把ktr文件发到邮箱中

基于快照的CDC案例

实验原理

如果没有时间戳,不允许使用触发器,就要使用快照表。可以通过比较源表和快照表来获得数据变化。

基于快照的CDC可以检测到插入、更新和删除的数据,这是相对于基于时间戳的CDC方案的有点。其缺点是需要大量存储空间来保存快照。

实验步骤

  1. 复制表中全部数据的几种方法

复制studentinfo表中数据到新表studentinfobak1

可选用的方法,

   1、复制表结构及数据到新表
    CREATE TABLE 新表SELECT * FROM 旧表
    这种方法会将oldtable中所有的内容都拷贝过来,当然我们可以用delete from newtable;来删除。
    不过这种方法的一个最不好的地方就是新表中没有了旧表的primary key、Extra(auto_increment)等属性。需要自己用 alter 添加,而且容易搞错。
    2、只复制表结构到新表
    CREATE TABLE 新表 SELECT * FROM 旧表 WHERE 1=2
    或CREATE TABLE 新表 LIKE 旧表
    3、复制旧表的数据到新表(假设两个表结构一样)
    INSERT INTO 新表SELECT * FROM 旧表
    4、复制旧表的数据到新表(假设两个表结构不一样)
    INSERT INTO 新表(字段1,字段2,.......) SELECT 字段1,字段2,...... FROM 旧表
    5、可以将表1结构复制到表2
    SELECT * INTO 表2 FROM 表1 WHERE 1=2
    6、可以将表1内容全部复制到表2
    SELECT * INTO 表2 FROM 表1
    7、 show create table 旧表;
    这样会将旧表的创建命令列出。我们只需要将该命令拷贝出来,更改table的名字,就可以建立一个完全一样的表
    8、mysqldump
    用mysqldump将表dump出来,改名字后再导回去或者直接在命令行中运行
    9、复制旧数据库到新数据库(复制全部表结构并且复制全部表数据)
    #mysql -u root -ppassword
    >CREATE DATABASE new_db;
    #mysqldump old_db -u root -ppassword--skip-extended-insert --add-drop-table | mysql new_db -u root -ppassword
    10、表不在同一数据库中(如,db1 table1, db2 table2)
    sql: insert into db1.table1 select * from db2.table2 (完全复制)
    insert into db1.table1 select distinct * from db2.table2(不复制重复纪录)
    insert into tdb1.able1 select top 5 * from db2.table2 (前五条纪录)
CREATE TABLE `studentinfobak1` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `Name` varchar(20) DEFAULT NULL,
  `Gender` char(2) DEFAULT NULL,
  `Class` char(10) DEFAULT NULL,
  `Age` tinyint(3) unsigned DEFAULT NULL,
  `Score` tinyint(3) unsigned DEFAULT NULL,
  `Height` tinyint(3) unsigned DEFAULT NULL,
  `PhoneNumber` char(11) DEFAULT NULL,
  `createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
  `modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
7月前
|
API Apache 数据库
Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
Flink CDC 于 2023 年 12 月 7 日重磅推出了其全新的 3.0 版本 ~
105192 8
 Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
|
5月前
|
存储 数据库 时序数据库
influxdb得导出与导入
influxdb得导出与导入
155 1
|
SQL 人工智能 数据库
DataGrip导出,导入sql文件
DataGrip导出,导入sql文件
|
SQL 存储 安全
03-PDI(Kettle)导入与导出CDC(上)
文章目录 03-PDI(Kettle)导入与导出CDC 数据的全量、增量、差异备份 基于时间戳的源数据CDC 实验原理 实验步骤 基于触发器的CDC 实验原理
03-PDI(Kettle)导入与导出CDC(上)
|
SQL 数据库
03-PDI(Kettle)导入与导出CDC(下)
文章目录 03-PDI(Kettle)导入与导出CDC 数据的全量、增量、差异备份 基于时间戳的源数据CDC 实验原理 实验步骤 基于触发器的CDC 实验原理
03-PDI(Kettle)导入与导出CDC(下)
|
XML 存储 移动开发
02-PDI(Kettle)导入与导出(上)
文章目录 02-PDI(Kettle)导入与导出 多个excel表格数据的合并 实验步骤: 拓展 Excel介绍 基于文本的数据导入与导出
02-PDI(Kettle)导入与导出(上)
|
SQL XML 存储
02-PDI(Kettle)导入与导出(下)
文章目录 02-PDI(Kettle)导入与导出 多个excel表格数据的合并 实验步骤: 拓展 Excel介绍 基于文本的数据导入与导出
02-PDI(Kettle)导入与导出(下)
|
存储 XML 缓存
01-PDI(Kettle)简介与安装
文章目录 01-PDI(Kettle)简介与安装 PDI(Kettle)简介 Kettle安装 Kettle核心知识点
01-PDI(Kettle)简介与安装
|
监控 调度 数据安全/隐私保护
04-PDI(Kettle)job案例
文章目录 04-PDI(Kettle)job案例 job简介 job创建案例 1.创建空作业
04-PDI(Kettle)job案例
|
资源调度 Ubuntu Unix
05-PDI(Kettle)脚本执行
文章目录 05-PDI(Kettle)脚本执行 pan和kitchen实验背景 pan命令演示
05-PDI(Kettle)脚本执行