03-PDI(Kettle)导入与导出CDC(下)

简介: 文章目录03-PDI(Kettle)导入与导出CDC数据的全量、增量、差异备份基于时间戳的源数据CDC实验原理实验步骤基于触发器的CDC实验原理
  1. 创建studentinfo表
-- -----------------------------------------------
-- 创建kettledb数据库
-- -----------------------------------------------
CREATE DATABASE IF NOT EXISTS kettledb;
USE kettledb;
-- -----------------------------------------------
-- 创建studentinfo表
-- 添加两个时间戳字段:
--   ID : 学生记录主键,设置未自增序列 (AUTO_INCREMENT)
--        新增学生记录会ID会自动增加1
--   createtimestamp : 记录创建时间
--                     当记录被创建是自动设置当时时间
--   moditytimestamp : 记录修改时间
--                     当记录被更新是自动设置当时时间
-- -----------------------------------------------
DROP TABLE IF EXISTS studentinfo;
CREATE TABLE studentinfo  (
  ID int UNSIGNED PRIMARY KEY AUTO_INCREMENT,
  Name varchar(20),
  Gender char(2),
  Class char(10),
  Age tinyint UNSIGNED DEFAULT NULL,
  Score tinyint UNSIGNED DEFAULT NULL,
  Height tinyint UNSIGNED DEFAULT NULL,
  PhoneNumber char(11),
  createtimestamp datetime DEFAULT CURRENT_TIMESTAMP,
  modifytimestamp datetime ON UPDATE CURRENT_TIMESTAMP
);
-- -----------------------------------------------
-- 插入几条测试数据
-- createtimestamp和modifytimestamp会自动添加
-- -----------------------------------------------
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (1, 'Yi Zhang', 'M', '1701', 16, 78, 170, '19946554571');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (2, 'Li, Er', 'M', '1701', 17, 80, 175, '19946554572');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (3, 'Xun Xie', 'M', '1702', 18, 95, 168, '19946554573');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (4, 'Ling Zhao', 'F', '1702', 19, 86, 180, '19946554574');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (5, 'Ming Zhang', 'M', '1704', 20, 85, 185, '19946554575');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (6, 'San Zhang', 'F', '1704', 18, 95, 169, '19946554576');
  1. 复制kettledb数据库中的studentinfo数据到新的表studentinfobak1
use kettledb;
show create table studentinfo;

返回studentinfo建表语句为:

CREATE TABLE `studentinfo` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `Name` varchar(20) DEFAULT NULL,
  `Gender` char(2) DEFAULT NULL,
  `Class` char(10) DEFAULT NULL,
  `Age` tinyint(3) unsigned DEFAULT NULL,
  `Score` tinyint(3) unsigned DEFAULT NULL,
  `Height` tinyint(3) unsigned DEFAULT NULL,
  `PhoneNumber` char(11) DEFAULT NULL,
  `createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
  `modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8

根据studentinfo建表语句创建studentinfobak1表,注意这里修改了表名为studentinfobak1。

  1. 快照备份
    将studentinfo中的数据备份到studentinfobak1 中
INSERT INTO studentinfobak1 SELECT * FROM studentinfo
  1. 修改studentinfo表中数据
    执行的sql:
-- 触发插入
INSERT INTO studentinfo (ID,Name,Gender,Class,Age,Score,Height,PhoneNumber) VALUES (10,'Shan Zhang','M','1704',18,91,171,'19946554576');
-- 触发删除
DELETE FROM studentinfo WHERE Name='San Zhang';
-- 触发更新
UPDATE studentinfo SET Score=92 WHERE ID=1;
-- 清空操作日志表
-- TRUNCATE cdc_opt_log;
  1. 复制studentinfo表中数据到新表studentinfobak2
    创建studentinfobak2表
CREATE TABLE `studentinfobak2` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `Name` varchar(20) DEFAULT NULL,
  `Gender` char(2) DEFAULT NULL,
  `Class` char(10) DEFAULT NULL,
  `Age` tinyint(3) unsigned DEFAULT NULL,
  `Score` tinyint(3) unsigned DEFAULT NULL,
  `Height` tinyint(3) unsigned DEFAULT NULL,
  `PhoneNumber` char(11) DEFAULT NULL,
  `createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
  `modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8
  1. 快照备份
    将studentinfo中的数据备份到新的备份表studentinfobak2 中:
INSERT INTO studentinfobak2 SELECT * FROM studentinfo
  1. kettle步骤

(1)设计转换

table input 为第一次的备份studentinfobak1中的数据

table input 2 为第二次的备份studentinfobak2 中的数据

通过比较两份数据中的差(Merge rows diff),即可获得增量数据再通过synchronize after merge中进行合并操作。synchronize after merge空间常与Merge rows diff联合使用,用于合并后同步信息 =根据某个字段值的条件插入,删除,更新数据库表


(2)table input1




(3)table input2



(4)merge rows (diff)

这里会将Table input作为参照表,Table input 2 作为比较表,根据ID进行匹配,ID相同的进行比较,如果ID仅存在与Table input 中,而在Table input 2 不存在,表示该数据被删除了。如果ID不存在与Table input 中,而在Table input 2 存在,表示该数据为新增。如果ID同时存在与Table input和Table input 2中

,比较其ID ,Name,Gender…等字段,如果这些字段在两个表输入不相同,表示该数据进行过修改。


有提示建议先排序,可以在merge rows前执行下sort row操作,读者可以再

添加排序步骤




5)synchronize after merge设置

General选项通用设置:

Target Schema为比较两份快照后端增量输出表,保存这两份快照中的增量数据。Table field表示目标标准的ID字段,Stream field1 表示上一步merge rows diff输出的字段。

最下面的update fields表示要更新的字段。

第一次执行,可以选择SQL—>Execute生成目标表。



Advanced选项设置高级




至此:转换编写完成,如有需要,请留言,我会把ktr文件发到您的邮箱

相关文章
|
Linux 测试技术 数据库
7步教你搞定Linux禅道系统安装
7步教你搞定Linux禅道系统安装
475 0
|
4月前
|
存储 Android开发 数据安全/隐私保护
Thanox安卓系统增加工具下载,管理、阻止、限制后台每个APP运行情况
Thanox是一款Android系统管理工具,专注于权限、后台启动及运行管理。支持应用冻结、系统优化、UI自定义和模块管理,基于Xposed框架开发,安全可靠且开源免费,兼容Android 6.0及以上版本。
416 4
|
4月前
|
机器学习/深度学习 人工智能 数据可视化
一文讲透:信息化、数字化、智能化、智慧化、数智化,到底啥区别?
本文深入解析了企业转型中的五大关键概念:信息化、数字化、智能化、智慧化与数智化。通过清晰的阶段划分和实际案例,帮助企业认清自身所处阶段,明确下一步发展方向,避免盲目跟风,真正实现业务流程优化与数据驱动决策,迈向高效、智能、协同的未来企业形态。
|
机器学习/深度学习 人工智能 算法
图灵奖获得者杰夫·辛顿(Geoffrey Hinton)
杰夫·辛顿(Geoffrey Hinton),加拿大-英国籍教育科研工作者,1947年生于英国温布尔登。他因在神经网络和深度学习领域的杰出贡献,于2018年获得图灵奖。辛顿是反向传播算法和对比散度算法的发明人之一,被誉为“AI教父”。他的研究推动了现代神经网络的发展,并在多个国际顶级期刊上发表了多篇重要论文。
867 0
|
SQL 存储 监控
SQLServer事务复制延迟优化之并行(多线程)复制
【9月更文挑战第12天】在SQL Server中,事务复制延迟会影响数据同步性。并行复制可通过多线程处理优化这一问题,提高复制效率。主要优化方法包括:配置分发代理参数、优化网络带宽、调整系统资源、优化数据库设计及定期监控维护。合理实施这些措施可提升数据同步的及时性和可靠性。
449 0
|
小程序
【微信小程序-原生开发】wxml 支持 includes (wxml中执行函数的方法)
【微信小程序-原生开发】wxml 支持 includes (wxml中执行函数的方法)
724 0
|
存储 安全 Java
【Java】已解决Java中的java.lang.VerifyError异常
【Java】已解决Java中的java.lang.VerifyError异常
1134 1
|
编解码 JavaScript 前端开发
JS逆向浏览器脱环境专题:事件学习和编写、DOM和BOM结构、指纹验证排查、代理自吐环境通杀环境检测、脱环境框架、脱环境插件解决
JS逆向浏览器脱环境专题:事件学习和编写、DOM和BOM结构、指纹验证排查、代理自吐环境通杀环境检测、脱环境框架、脱环境插件解决
791 1
|
jenkins Java 持续交付
Jenkins打包,发布,部署
Jenkins打包,发布,部署
842 0