03-PDI(Kettle)导入与导出CDC(上)

简介: 文章目录03-PDI(Kettle)导入与导出CDC数据的全量、增量、差异备份基于时间戳的源数据CDC实验原理实验步骤基于触发器的CDC实验原理

03-PDI(Kettle)导入与导出CDC

数据的全量、增量、差异备份

在很远很远的地方,有一个帐房先生。

他每天要记很多很多的账单。

老先生一生谨慎,为了保证账本的安全,

便找来三个徒弟帮忙来对账本做备份,

这样即使账本丢失了,

也可以用备份的账本继续使用。

三个徒弟各有所长,分别采用了不同的做法:

大徒弟▼



性格宅心仁厚,成熟稳重。

他采用的方式是每天都把师父的账单重新抄录一份。这样做的好处就是每天都是一份完整的账本,每一个备份的账本都可以直接使用,坏处则是每天要花费很多时间去进行记录,并且需要很多纸、墨水以及存账本的柜子。

二徒弟▼



性格聪明伶俐,人小鬼大。

她觉得大师姐的方法太累太耗时,不如每天只抄录账本上新增的信息,这样她每天总是第一个抄完,不仅节省时间,而且为店里节省了大量的纸张、墨水等成本。不过她最讨厌师父来检查账本了,因此她需要将每次纪录的数据拼凑在一起才能组成一个完成的账本。


三徒弟▼



性格心思缜密,粗中有细

她借鉴了两位师姐的办法,进行折中创新,具体是隔一段时间就把账本的所有信息重新抄一份,而在这段时间内则只纪录每天相比于“总账”变化的信息,比如周一抄一份总账,周二买菜一项花了100元,那么周三就把“买菜花了100元”纪录下来;周三买肉花了50元,周四就把周三的“买肉花了50元”和周二的“买菜一项花了100元”都记录下来。如果师父要查账的话,她把新纪录的数据和“总账”拼在一起,就是完整的账本了。这种方式在记账速度、纸墨使用量、账单查看等方面都是一种较为折中的方式。哦,对了,一个比较明显的不足就是解释起来需要打更多的字!

三个徒弟的三种方法各有千秋,后来为了便于记忆,账房先生给这三种方法起了名字,分别叫:全量备份、增量备份和差异备份。


基于时间戳的源数据CDC

实验原理

从时间戳识别出变化的数据并只导入这部分数据。根据cdc_time_log表中的上次执行时间,以及输入的当前执行时间,增量导出student_cdc表中的数据。输出的数据存储在XX/student_cdc.xls文件中。其中,cdc_time_log表的主要作用是记录上次执行的时间,拉取当前执行时间与上次执行时间之间的数据即为增量数据, 拉取成功后,需要将cdc_time_log表中的上次执行时间更新为当前执行时间。这样就可以继续进行CDC操作。

实验步骤

  1. 创建student_cdc表。

在mysql命令行执行student_cdc.sql脚本

student_cdc.sql内容

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for student_cdc
-- ----------------------------
DROP TABLE IF EXISTS `student_cdc`;
CREATE TABLE `student_cdc`  (
  `学号` int(11) NOT NULL,
  `姓名` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `性别` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `班级` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `年龄` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `成绩` int(15) NULL DEFAULT NULL,
  `身高` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `手机` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `插入时间` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `更新时间` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`学号`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of student_cdc
-- ----------------------------
INSERT INTO `student_cdc` VALUES (1, '张一', '男', '1701', '16', 78, '170', '18946554571', '2018-08-06', '2018-08-06');
INSERT INTO `student_cdc` VALUES (2, '李二', '男', '1701', '17', 80, '175', '18946554572', '2018-08-06', '2018-08-06');
INSERT INTO `student_cdc` VALUES (3, '谢逊', '男', '1702', '18', 95, '169', '18946554573', '2018-08-06', '2018-08-06');
INSERT INTO `student_cdc` VALUES (4, '赵玲', '女', '1702', '19', 86, '180', '18956257895', '2018-08-06', '2018-08-06');
INSERT INTO `student_cdc` VALUES (5, '张明', '男', '1704', '20', 85, '185', '18946554575', '2018-08-07', '2018-08-07');
INSERT INTO `student_cdc` VALUES (6, '张三', '女', '1704', '18', 92, '169', '18946554576', '2018-08-06', '2018-08-07');
SET FOREIGN_KEY_CHECKS = 1;

在mysql命令行执行cdc_time_log.sql脚本。

cdc_time_log.sql内容

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for cdc_time_log
-- ----------------------------
DROP TABLE IF EXISTS `cdc_time_log`;
CREATE TABLE `cdc_time_log`  (
  `id` int(11) NOT NULL,
  `上次执行时间` varchar(45) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of cdc_time_log
-- ----------------------------
INSERT INTO `cdc_time_log` VALUES (1, '2018-08-04');
SET FOREIGN_KEY_CHECKS = 1;

2.新建并设计转换。

(1)设计转换如下


(2) 命名参数设置:鼠标在转换的空白地方右键,选择“转换设置”。

在命名参数标签卡“命名参数”中配置命名参数的名字(cur_time)和默认值(“2018-08-04”)。


(3)“table imput”的配置:

命名为“CDC日志表输入”

建立数据库连接,在数据库连接的option中设置

characterEncoding为utf-8

SQL查询语句为:

SELECT 上次执行时间 as last1,'${cur_time}' as cur1,上次执行时间 as last2,'${cur_time}' as cur2 FROM cdc_time_log



(4)“table input2”设置

步骤命名为“学生表输入”

Sql语句为:

SELECT 学号, 姓名, 性别, 班级, 年龄, 成绩, 身高, 手机, 插入时间, 更新时间, '${cur_time}' as 导入时间 FROM st


(5)“Microsoft Excel 输出”步骤设置,文件输出路径设置为"e:/output/student_cdc" 。

(6)“插入/更新”步骤设置



由于insert/update的上一部为excel,这里选择get update fields后,会出现多余字段,可通过edit mapping配置对应的映射关系。其中table field 为当前表中的字段,stream field为上一个步骤流在的字段。

3、运行

(1)在弹出的对话框中,设置命名参数cur_time的值为“2018-08-06”,点击“启动”按钮,将在路径/home/ubuntu中输出名为student_cdc.xls的文件,cdc_time_log表中的内容将自动改为输入的参数值。


excel输出内容为:




(2)第二次运行,将参数修改为“2018-08-07”。



excel输出内容为:



基于触发器的CDC

实验原理

类似时间戳和主键序列的CDC操作,区别在于这里采用触发器生成增量条件。

实验步骤

  1. 创建所需表。

sql语句内容如下:

注意:这段代码是总的SQL语句,不需要执行,下面会对这段语句分步骤解释,读者执行分步骤中的语句即可

create table studentsync like studentinfo;
-- desc studentsync;
ALTER TABLE studentsync MODIFY createtimestamp datetime;
ALTER TABLE studentsync MODIFY modifytimestamp datetime;
ALTER TABLE studentsync ADD synchronizationtime datetime;
-- 创建操作日志表
DROP TABLE IF EXISTS cdc_opt_log;
create table cdc_opt_log 
(
  SID int PRIMARY KEY,
  optype char(1),
  opflag char(6)
);
-- 创建 INSERT 触发器
DROP TRIGGER IF EXISTS tri_insert_student;
DELIMITER //
CREATE TRIGGER tri_insert_student AFTER INSERT ON studentinfo FOR EACH ROW 
begin
  IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN 
    UPDATE cdc_opt_log SET optype='I',opflag='未处理' WHERE SID=new.ID;
  ELSE
    INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'I', '未处理');
  END IF;
end
//
DELIMITER ;
-- 创建 UPDATE 触发器
DROP TRIGGER IF EXISTS tri_update_student;
DELIMITER //
CREATE TRIGGER tri_update_student AFTER UPDATE ON studentinfo FOR EACH ROW 
begin
  IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN 
    UPDATE cdc_opt_log SET optype='U',opflag='未处理' WHERE SID=new.ID;
  ELSE
    INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'U', '未处理');
  END IF;
end
//
DELIMITER ;
-- 创建 DELETE 触发器 
DROP TRIGGER IF EXISTS tri_delete_student;
DELIMITER //
CREATE TRIGGER tri_delete_student AFTER DELETE ON studentinfo FOR EACH ROW 
begin
  IF (SELECT 1 FROM cdc_opt_log WHERE SID=old.ID) THEN 
    UPDATE cdc_opt_log SET optype='D',opflag='未处理' WHERE SID=old.ID;
  ELSE
    INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (old.ID, 'D', '未处理');
  END IF;
end
//
DELIMITER ;
-- 触发插入
-- INSERT INTO studentinfo (ID,Name,Gender,Class,Age,Score,Height,PhoneNumber) VALUES (10,'Shan Zhang','M','1704',18,91,171,'19946554576');
-- 触发删除
-- DELETE FROM studentinfo WHERE Name='Shan Zhang';
-- 触发更新
-- UPDATE studentinfo SET Score=92 WHERE ID=1;
-- 清空操作日志表
-- TRUNCATE cdc_opt_log;


相关文章
|
SQL 数据库
03-PDI(Kettle)导入与导出CDC(下)
文章目录 03-PDI(Kettle)导入与导出CDC 数据的全量、增量、差异备份 基于时间戳的源数据CDC 实验原理 实验步骤 基于触发器的CDC 实验原理
03-PDI(Kettle)导入与导出CDC(下)
|
SQL 关系型数据库 MySQL
03-PDI(Kettle)导入与导出CDC(中)
文章目录 03-PDI(Kettle)导入与导出CDC 数据的全量、增量、差异备份 基于时间戳的源数据CDC 实验原理 实验步骤 基于触发器的CDC 实验原理
03-PDI(Kettle)导入与导出CDC(中)
|
SQL XML 存储
02-PDI(Kettle)导入与导出(下)
文章目录 02-PDI(Kettle)导入与导出 多个excel表格数据的合并 实验步骤: 拓展 Excel介绍 基于文本的数据导入与导出
02-PDI(Kettle)导入与导出(下)
|
XML 存储 移动开发
02-PDI(Kettle)导入与导出(上)
文章目录 02-PDI(Kettle)导入与导出 多个excel表格数据的合并 实验步骤: 拓展 Excel介绍 基于文本的数据导入与导出
02-PDI(Kettle)导入与导出(上)
|
存储 XML 缓存
01-PDI(Kettle)简介与安装
文章目录 01-PDI(Kettle)简介与安装 PDI(Kettle)简介 Kettle安装 Kettle核心知识点
01-PDI(Kettle)简介与安装
|
资源调度 Ubuntu Unix
05-PDI(Kettle)脚本执行
文章目录 05-PDI(Kettle)脚本执行 pan和kitchen实验背景 pan命令演示
05-PDI(Kettle)脚本执行
|
监控 调度 数据安全/隐私保护
04-PDI(Kettle)job案例
文章目录 04-PDI(Kettle)job案例 job简介 job创建案例 1.创建空作业
04-PDI(Kettle)job案例
|
canal 关系型数据库 MySQL
【笔记】用户指南—数据导入和导出—使用DTS导入和导出数据
PolarDB-X提供丰富的数据导入和导出方式,以保持与其他数据系统的互通。本文主要介绍通过DTS导入导出数据的方式。
157 0
|
Java 数据库
用户指南—数据导入和导出—使用Batch Tool工具导入导出数据
本文介绍了通过Batch Tool工具导入导出数据的方法。
123 0
|
关系型数据库 MySQL Java
用户指南—数据导入和导出—使用程序进行数据导入
本文将介绍如何通过编写代码的方式,将导入数据到PolarDB-X中。
107 0