03-PDI(Kettle)导入与导出CDC(下)

简介: 文章目录03-PDI(Kettle)导入与导出CDC数据的全量、增量、差异备份基于时间戳的源数据CDC实验原理实验步骤基于触发器的CDC实验原理
  1. 创建studentinfo表
-- -----------------------------------------------
-- 创建kettledb数据库
-- -----------------------------------------------
CREATE DATABASE IF NOT EXISTS kettledb;
USE kettledb;
-- -----------------------------------------------
-- 创建studentinfo表
-- 添加两个时间戳字段:
--   ID : 学生记录主键,设置未自增序列 (AUTO_INCREMENT)
--        新增学生记录会ID会自动增加1
--   createtimestamp : 记录创建时间
--                     当记录被创建是自动设置当时时间
--   moditytimestamp : 记录修改时间
--                     当记录被更新是自动设置当时时间
-- -----------------------------------------------
DROP TABLE IF EXISTS studentinfo;
CREATE TABLE studentinfo  (
  ID int UNSIGNED PRIMARY KEY AUTO_INCREMENT,
  Name varchar(20),
  Gender char(2),
  Class char(10),
  Age tinyint UNSIGNED DEFAULT NULL,
  Score tinyint UNSIGNED DEFAULT NULL,
  Height tinyint UNSIGNED DEFAULT NULL,
  PhoneNumber char(11),
  createtimestamp datetime DEFAULT CURRENT_TIMESTAMP,
  modifytimestamp datetime ON UPDATE CURRENT_TIMESTAMP
);
-- -----------------------------------------------
-- 插入几条测试数据
-- createtimestamp和modifytimestamp会自动添加
-- -----------------------------------------------
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (1, 'Yi Zhang', 'M', '1701', 16, 78, 170, '19946554571');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (2, 'Li, Er', 'M', '1701', 17, 80, 175, '19946554572');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (3, 'Xun Xie', 'M', '1702', 18, 95, 168, '19946554573');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (4, 'Ling Zhao', 'F', '1702', 19, 86, 180, '19946554574');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (5, 'Ming Zhang', 'M', '1704', 20, 85, 185, '19946554575');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (6, 'San Zhang', 'F', '1704', 18, 95, 169, '19946554576');
  1. 复制kettledb数据库中的studentinfo数据到新的表studentinfobak1
use kettledb;
show create table studentinfo;

返回studentinfo建表语句为:

CREATE TABLE `studentinfo` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `Name` varchar(20) DEFAULT NULL,
  `Gender` char(2) DEFAULT NULL,
  `Class` char(10) DEFAULT NULL,
  `Age` tinyint(3) unsigned DEFAULT NULL,
  `Score` tinyint(3) unsigned DEFAULT NULL,
  `Height` tinyint(3) unsigned DEFAULT NULL,
  `PhoneNumber` char(11) DEFAULT NULL,
  `createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
  `modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8

根据studentinfo建表语句创建studentinfobak1表,注意这里修改了表名为studentinfobak1。

  1. 快照备份
    将studentinfo中的数据备份到studentinfobak1 中
INSERT INTO studentinfobak1 SELECT * FROM studentinfo
  1. 修改studentinfo表中数据
    执行的sql:
-- 触发插入
INSERT INTO studentinfo (ID,Name,Gender,Class,Age,Score,Height,PhoneNumber) VALUES (10,'Shan Zhang','M','1704',18,91,171,'19946554576');
-- 触发删除
DELETE FROM studentinfo WHERE Name='San Zhang';
-- 触发更新
UPDATE studentinfo SET Score=92 WHERE ID=1;
-- 清空操作日志表
-- TRUNCATE cdc_opt_log;
  1. 复制studentinfo表中数据到新表studentinfobak2
    创建studentinfobak2表
CREATE TABLE `studentinfobak2` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `Name` varchar(20) DEFAULT NULL,
  `Gender` char(2) DEFAULT NULL,
  `Class` char(10) DEFAULT NULL,
  `Age` tinyint(3) unsigned DEFAULT NULL,
  `Score` tinyint(3) unsigned DEFAULT NULL,
  `Height` tinyint(3) unsigned DEFAULT NULL,
  `PhoneNumber` char(11) DEFAULT NULL,
  `createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
  `modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8
  1. 快照备份
    将studentinfo中的数据备份到新的备份表studentinfobak2 中:
INSERT INTO studentinfobak2 SELECT * FROM studentinfo
  1. kettle步骤

(1)设计转换

table input 为第一次的备份studentinfobak1中的数据

table input 2 为第二次的备份studentinfobak2 中的数据

通过比较两份数据中的差(Merge rows diff),即可获得增量数据再通过synchronize after merge中进行合并操作。synchronize after merge空间常与Merge rows diff联合使用,用于合并后同步信息 =根据某个字段值的条件插入,删除,更新数据库表


(2)table input1




(3)table input2



(4)merge rows (diff)

这里会将Table input作为参照表,Table input 2 作为比较表,根据ID进行匹配,ID相同的进行比较,如果ID仅存在与Table input 中,而在Table input 2 不存在,表示该数据被删除了。如果ID不存在与Table input 中,而在Table input 2 存在,表示该数据为新增。如果ID同时存在与Table input和Table input 2中

,比较其ID ,Name,Gender…等字段,如果这些字段在两个表输入不相同,表示该数据进行过修改。


有提示建议先排序,可以在merge rows前执行下sort row操作,读者可以再

添加排序步骤




5)synchronize after merge设置

General选项通用设置:

Target Schema为比较两份快照后端增量输出表,保存这两份快照中的增量数据。Table field表示目标标准的ID字段,Stream field1 表示上一步merge rows diff输出的字段。

最下面的update fields表示要更新的字段。

第一次执行,可以选择SQL—>Execute生成目标表。



Advanced选项设置高级




至此:转换编写完成,如有需要,请留言,我会把ktr文件发到您的邮箱

相关文章
|
SQL 关系型数据库 MySQL
03-PDI(Kettle)导入与导出CDC(中)
文章目录 03-PDI(Kettle)导入与导出CDC 数据的全量、增量、差异备份 基于时间戳的源数据CDC 实验原理 实验步骤 基于触发器的CDC 实验原理
03-PDI(Kettle)导入与导出CDC(中)
|
Java
ETL工具 Kettle 中 kettle循环传递变量_(最简单的方法)
本文详细介绍了如何在Kettle工具中使用循环传递变量,通过示例展示了如何将movies表数据按月插入到ods_movies表,涉及新建转换、获取变量、作业配置和执行,呈现了一个嵌套作业结构.
2731 3
|
存储 关系型数据库 数据库连接
|
SQL 存储 安全
03-PDI(Kettle)导入与导出CDC(上)
文章目录 03-PDI(Kettle)导入与导出CDC 数据的全量、增量、差异备份 基于时间戳的源数据CDC 实验原理 实验步骤 基于触发器的CDC 实验原理
03-PDI(Kettle)导入与导出CDC(上)
|
IDE 开发工具
IDEA中文版插件
本文主要介绍IDEA中如何使用中文插件
1187 1
IDEA中文版插件
|
关系型数据库 Java 数据库
使用Kettle动态生成页码并实现分页数据同步
使用Kettle动态生成页码并实现分页数据同步
1294 0
使用Kettle动态生成页码并实现分页数据同步
|
9月前
|
机器学习/深度学习 人工智能 数据可视化
一文讲透:信息化、数字化、智能化、智慧化、数智化,到底啥区别?
本文深入解析了企业转型中的五大关键概念:信息化、数字化、智能化、智慧化与数智化。通过清晰的阶段划分和实际案例,帮助企业认清自身所处阶段,明确下一步发展方向,避免盲目跟风,真正实现业务流程优化与数据驱动决策,迈向高效、智能、协同的未来企业形态。
4.Electron之自定义菜单(绑定快捷键、点击事件)
4.Electron之自定义菜单(绑定快捷键、点击事件)
991 1
|
Java Maven Spring
IDEA中从https://start.spring.io网站创建springboot项目
IDEA中从https://start.spring.io网站创建springboot项目
IDEA中从https://start.spring.io网站创建springboot项目
|
jenkins Java 持续交付
Jenkins打包,发布,部署
Jenkins打包,发布,部署
1079 0

热门文章

最新文章

下一篇
开通oss服务