03-PDI(Kettle)导入与导出CDC
数据的全量、增量、差异备份
在很远很远的地方,有一个帐房先生。
他每天要记很多很多的账单。
老先生一生谨慎,为了保证账本的安全,
便找来三个徒弟帮忙来对账本做备份,
这样即使账本丢失了,
也可以用备份的账本继续使用。
三个徒弟各有所长,分别采用了不同的做法:
大徒弟▼
性格宅心仁厚,成熟稳重。
他采用的方式是每天都把师父的账单重新抄录一份。这样做的好处就是每天都是一份完整的账本,每一个备份的账本都可以直接使用,坏处则是每天要花费很多时间去进行记录,并且需要很多纸、墨水以及存账本的柜子。
二徒弟▼
性格聪明伶俐,人小鬼大。
她觉得大师姐的方法太累太耗时,不如每天只抄录账本上新增的信息,这样她每天总是第一个抄完,不仅节省时间,而且为店里节省了大量的纸张、墨水等成本。不过她最讨厌师父来检查账本了,因此她需要将每次纪录的数据拼凑在一起才能组成一个完成的账本。
三徒弟▼
性格心思缜密,粗中有细
她借鉴了两位师姐的办法,进行折中创新,具体是隔一段时间就把账本的所有信息重新抄一份,而在这段时间内则只纪录每天相比于“总账”变化的信息,比如周一抄一份总账,周二买菜一项花了100元,那么周三就把“买菜花了100元”纪录下来;周三买肉花了50元,周四就把周三的“买肉花了50元”和周二的“买菜一项花了100元”都记录下来。如果师父要查账的话,她把新纪录的数据和“总账”拼在一起,就是完整的账本了。这种方式在记账速度、纸墨使用量、账单查看等方面都是一种较为折中的方式。哦,对了,一个比较明显的不足就是解释起来需要打更多的字!
三个徒弟的三种方法各有千秋,后来为了便于记忆,账房先生给这三种方法起了名字,分别叫:全量备份、增量备份和差异备份。
基于时间戳的源数据CDC
实验原理
从时间戳识别出变化的数据并只导入这部分数据。根据cdc_time_log表中的上次执行时间,以及输入的当前执行时间,增量导出student_cdc表中的数据。输出的数据存储在XX/student_cdc.xls文件中。其中,cdc_time_log表的主要作用是记录上次执行的时间,拉取当前执行时间与上次执行时间之间的数据即为增量数据, 拉取成功后,需要将cdc_time_log表中的上次执行时间更新为当前执行时间。这样就可以继续进行CDC操作。
实验步骤
- 创建student_cdc表。
在mysql命令行执行student_cdc.sql脚本
student_cdc.sql内容
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for student_cdc -- ---------------------------- DROP TABLE IF EXISTS `student_cdc`; CREATE TABLE `student_cdc` ( `学号` int(11) NOT NULL, `姓名` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `性别` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `班级` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `年龄` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `成绩` int(15) NULL DEFAULT NULL, `身高` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `手机` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `插入时间` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `更新时间` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, PRIMARY KEY (`学号`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of student_cdc -- ---------------------------- INSERT INTO `student_cdc` VALUES (1, '张一', '男', '1701', '16', 78, '170', '18946554571', '2018-08-06', '2018-08-06'); INSERT INTO `student_cdc` VALUES (2, '李二', '男', '1701', '17', 80, '175', '18946554572', '2018-08-06', '2018-08-06'); INSERT INTO `student_cdc` VALUES (3, '谢逊', '男', '1702', '18', 95, '169', '18946554573', '2018-08-06', '2018-08-06'); INSERT INTO `student_cdc` VALUES (4, '赵玲', '女', '1702', '19', 86, '180', '18956257895', '2018-08-06', '2018-08-06'); INSERT INTO `student_cdc` VALUES (5, '张明', '男', '1704', '20', 85, '185', '18946554575', '2018-08-07', '2018-08-07'); INSERT INTO `student_cdc` VALUES (6, '张三', '女', '1704', '18', 92, '169', '18946554576', '2018-08-06', '2018-08-07'); SET FOREIGN_KEY_CHECKS = 1;
在mysql命令行执行cdc_time_log.sql脚本。
cdc_time_log.sql内容
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for cdc_time_log -- ---------------------------- DROP TABLE IF EXISTS `cdc_time_log`; CREATE TABLE `cdc_time_log` ( `id` int(11) NOT NULL, `上次执行时间` varchar(45) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of cdc_time_log -- ---------------------------- INSERT INTO `cdc_time_log` VALUES (1, '2018-08-04'); SET FOREIGN_KEY_CHECKS = 1;
2.新建并设计转换。
(1)设计转换如下
(2) 命名参数设置:鼠标在转换的空白地方右键,选择“转换设置”。
在命名参数标签卡“命名参数”中配置命名参数的名字(cur_time)和默认值(“2018-08-04”)。
(3)“table imput”的配置:
命名为“CDC日志表输入”
建立数据库连接,在数据库连接的option中设置
characterEncoding为utf-8
SQL查询语句为:
SELECT 上次执行时间 as last1,'${cur_time}' as cur1,上次执行时间 as last2,'${cur_time}' as cur2 FROM cdc_time_log
(4)“table input2”设置
步骤命名为“学生表输入”
Sql语句为:
SELECT 学号, 姓名, 性别, 班级, 年龄, 成绩, 身高, 手机, 插入时间, 更新时间, '${cur_time}' as 导入时间 FROM st
(5)“Microsoft Excel 输出”步骤设置,文件输出路径设置为"e:/output/student_cdc" 。
(6)“插入/更新”步骤设置
由于insert/update的上一部为excel,这里选择get update fields后,会出现多余字段,可通过edit mapping配置对应的映射关系。其中table field 为当前表中的字段,stream field为上一个步骤流在的字段。
3、运行
(1)在弹出的对话框中,设置命名参数cur_time的值为“2018-08-06”,点击“启动”按钮,将在路径/home/ubuntu中输出名为student_cdc.xls的文件,cdc_time_log表中的内容将自动改为输入的参数值。
excel输出内容为:
(2)第二次运行,将参数修改为“2018-08-07”。
excel输出内容为:
基于触发器的CDC
实验原理
类似时间戳和主键序列的CDC操作,区别在于这里采用触发器生成增量条件。
实验步骤
- 创建所需表。
sql语句内容如下:
注意:这段代码是总的SQL语句,不需要执行,下面会对这段语句分步骤解释,读者执行分步骤中的语句即可
create table studentsync like studentinfo; -- desc studentsync; ALTER TABLE studentsync MODIFY createtimestamp datetime; ALTER TABLE studentsync MODIFY modifytimestamp datetime; ALTER TABLE studentsync ADD synchronizationtime datetime; -- 创建操作日志表 DROP TABLE IF EXISTS cdc_opt_log; create table cdc_opt_log ( SID int PRIMARY KEY, optype char(1), opflag char(6) ); -- 创建 INSERT 触发器 DROP TRIGGER IF EXISTS tri_insert_student; DELIMITER // CREATE TRIGGER tri_insert_student AFTER INSERT ON studentinfo FOR EACH ROW begin IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN UPDATE cdc_opt_log SET optype='I',opflag='未处理' WHERE SID=new.ID; ELSE INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'I', '未处理'); END IF; end // DELIMITER ; -- 创建 UPDATE 触发器 DROP TRIGGER IF EXISTS tri_update_student; DELIMITER // CREATE TRIGGER tri_update_student AFTER UPDATE ON studentinfo FOR EACH ROW begin IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN UPDATE cdc_opt_log SET optype='U',opflag='未处理' WHERE SID=new.ID; ELSE INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'U', '未处理'); END IF; end // DELIMITER ; -- 创建 DELETE 触发器 DROP TRIGGER IF EXISTS tri_delete_student; DELIMITER // CREATE TRIGGER tri_delete_student AFTER DELETE ON studentinfo FOR EACH ROW begin IF (SELECT 1 FROM cdc_opt_log WHERE SID=old.ID) THEN UPDATE cdc_opt_log SET optype='D',opflag='未处理' WHERE SID=old.ID; ELSE INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (old.ID, 'D', '未处理'); END IF; end // DELIMITER ; -- 触发插入 -- INSERT INTO studentinfo (ID,Name,Gender,Class,Age,Score,Height,PhoneNumber) VALUES (10,'Shan Zhang','M','1704',18,91,171,'19946554576'); -- 触发删除 -- DELETE FROM studentinfo WHERE Name='Shan Zhang'; -- 触发更新 -- UPDATE studentinfo SET Score=92 WHERE ID=1; -- 清空操作日志表 -- TRUNCATE cdc_opt_log;