03-PDI(Kettle)导入与导出CDC(下)

简介: 文章目录03-PDI(Kettle)导入与导出CDC数据的全量、增量、差异备份基于时间戳的源数据CDC实验原理实验步骤基于触发器的CDC实验原理
  1. 创建studentinfo表
-- -----------------------------------------------
-- 创建kettledb数据库
-- -----------------------------------------------
CREATE DATABASE IF NOT EXISTS kettledb;
USE kettledb;
-- -----------------------------------------------
-- 创建studentinfo表
-- 添加两个时间戳字段:
--   ID : 学生记录主键,设置未自增序列 (AUTO_INCREMENT)
--        新增学生记录会ID会自动增加1
--   createtimestamp : 记录创建时间
--                     当记录被创建是自动设置当时时间
--   moditytimestamp : 记录修改时间
--                     当记录被更新是自动设置当时时间
-- -----------------------------------------------
DROP TABLE IF EXISTS studentinfo;
CREATE TABLE studentinfo  (
  ID int UNSIGNED PRIMARY KEY AUTO_INCREMENT,
  Name varchar(20),
  Gender char(2),
  Class char(10),
  Age tinyint UNSIGNED DEFAULT NULL,
  Score tinyint UNSIGNED DEFAULT NULL,
  Height tinyint UNSIGNED DEFAULT NULL,
  PhoneNumber char(11),
  createtimestamp datetime DEFAULT CURRENT_TIMESTAMP,
  modifytimestamp datetime ON UPDATE CURRENT_TIMESTAMP
);
-- -----------------------------------------------
-- 插入几条测试数据
-- createtimestamp和modifytimestamp会自动添加
-- -----------------------------------------------
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (1, 'Yi Zhang', 'M', '1701', 16, 78, 170, '19946554571');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (2, 'Li, Er', 'M', '1701', 17, 80, 175, '19946554572');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (3, 'Xun Xie', 'M', '1702', 18, 95, 168, '19946554573');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (4, 'Ling Zhao', 'F', '1702', 19, 86, 180, '19946554574');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (5, 'Ming Zhang', 'M', '1704', 20, 85, 185, '19946554575');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (6, 'San Zhang', 'F', '1704', 18, 95, 169, '19946554576');
  1. 复制kettledb数据库中的studentinfo数据到新的表studentinfobak1
use kettledb;
show create table studentinfo;

返回studentinfo建表语句为:

CREATE TABLE `studentinfo` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `Name` varchar(20) DEFAULT NULL,
  `Gender` char(2) DEFAULT NULL,
  `Class` char(10) DEFAULT NULL,
  `Age` tinyint(3) unsigned DEFAULT NULL,
  `Score` tinyint(3) unsigned DEFAULT NULL,
  `Height` tinyint(3) unsigned DEFAULT NULL,
  `PhoneNumber` char(11) DEFAULT NULL,
  `createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
  `modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8

根据studentinfo建表语句创建studentinfobak1表,注意这里修改了表名为studentinfobak1。

  1. 快照备份
    将studentinfo中的数据备份到studentinfobak1 中
INSERT INTO studentinfobak1 SELECT * FROM studentinfo
  1. 修改studentinfo表中数据
    执行的sql:
-- 触发插入
INSERT INTO studentinfo (ID,Name,Gender,Class,Age,Score,Height,PhoneNumber) VALUES (10,'Shan Zhang','M','1704',18,91,171,'19946554576');
-- 触发删除
DELETE FROM studentinfo WHERE Name='San Zhang';
-- 触发更新
UPDATE studentinfo SET Score=92 WHERE ID=1;
-- 清空操作日志表
-- TRUNCATE cdc_opt_log;
  1. 复制studentinfo表中数据到新表studentinfobak2
    创建studentinfobak2表
CREATE TABLE `studentinfobak2` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `Name` varchar(20) DEFAULT NULL,
  `Gender` char(2) DEFAULT NULL,
  `Class` char(10) DEFAULT NULL,
  `Age` tinyint(3) unsigned DEFAULT NULL,
  `Score` tinyint(3) unsigned DEFAULT NULL,
  `Height` tinyint(3) unsigned DEFAULT NULL,
  `PhoneNumber` char(11) DEFAULT NULL,
  `createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
  `modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8
  1. 快照备份
    将studentinfo中的数据备份到新的备份表studentinfobak2 中:
INSERT INTO studentinfobak2 SELECT * FROM studentinfo
  1. kettle步骤

(1)设计转换

table input 为第一次的备份studentinfobak1中的数据

table input 2 为第二次的备份studentinfobak2 中的数据

通过比较两份数据中的差(Merge rows diff),即可获得增量数据再通过synchronize after merge中进行合并操作。synchronize after merge空间常与Merge rows diff联合使用,用于合并后同步信息 =根据某个字段值的条件插入,删除,更新数据库表


(2)table input1




(3)table input2



(4)merge rows (diff)

这里会将Table input作为参照表,Table input 2 作为比较表,根据ID进行匹配,ID相同的进行比较,如果ID仅存在与Table input 中,而在Table input 2 不存在,表示该数据被删除了。如果ID不存在与Table input 中,而在Table input 2 存在,表示该数据为新增。如果ID同时存在与Table input和Table input 2中

,比较其ID ,Name,Gender…等字段,如果这些字段在两个表输入不相同,表示该数据进行过修改。


有提示建议先排序,可以在merge rows前执行下sort row操作,读者可以再

添加排序步骤




5)synchronize after merge设置

General选项通用设置:

Target Schema为比较两份快照后端增量输出表,保存这两份快照中的增量数据。Table field表示目标标准的ID字段,Stream field1 表示上一步merge rows diff输出的字段。

最下面的update fields表示要更新的字段。

第一次执行,可以选择SQL—>Execute生成目标表。



Advanced选项设置高级




至此:转换编写完成,如有需要,请留言,我会把ktr文件发到您的邮箱

相关文章
|
数据采集 数据库
Kettle之错误处理(Error Handling)里面的坑
Kettle之错误处理(Error Handling)里面的坑
|
JavaScript 前端开发
kettle从sftp下载多个文件并进行转换后输出
kettle从sftp下载多个文件并进行转换后输出
|
SQL 存储 安全
03-PDI(Kettle)导入与导出CDC(上)
文章目录 03-PDI(Kettle)导入与导出CDC 数据的全量、增量、差异备份 基于时间戳的源数据CDC 实验原理 实验步骤 基于触发器的CDC 实验原理
03-PDI(Kettle)导入与导出CDC(上)
|
SQL 关系型数据库 MySQL
03-PDI(Kettle)导入与导出CDC(中)
文章目录 03-PDI(Kettle)导入与导出CDC 数据的全量、增量、差异备份 基于时间戳的源数据CDC 实验原理 实验步骤 基于触发器的CDC 实验原理
03-PDI(Kettle)导入与导出CDC(中)
|
SQL XML 存储
02-PDI(Kettle)导入与导出(下)
文章目录 02-PDI(Kettle)导入与导出 多个excel表格数据的合并 实验步骤: 拓展 Excel介绍 基于文本的数据导入与导出
02-PDI(Kettle)导入与导出(下)
|
XML 存储 移动开发
02-PDI(Kettle)导入与导出(上)
文章目录 02-PDI(Kettle)导入与导出 多个excel表格数据的合并 实验步骤: 拓展 Excel介绍 基于文本的数据导入与导出
02-PDI(Kettle)导入与导出(上)
|
存储 XML 缓存
01-PDI(Kettle)简介与安装
文章目录 01-PDI(Kettle)简介与安装 PDI(Kettle)简介 Kettle安装 Kettle核心知识点
01-PDI(Kettle)简介与安装
|
canal 关系型数据库 MySQL
【笔记】用户指南—数据导入和导出—使用DTS导入和导出数据
PolarDB-X提供丰富的数据导入和导出方式,以保持与其他数据系统的互通。本文主要介绍通过DTS导入导出数据的方式。
157 0
|
Java 数据库
用户指南—数据导入和导出—使用Batch Tool工具导入导出数据
本文介绍了通过Batch Tool工具导入导出数据的方法。
122 0
|
关系型数据库 MySQL Java
用户指南—数据导入和导出—使用程序进行数据导入
本文将介绍如何通过编写代码的方式,将导入数据到PolarDB-X中。
107 0