1.源码下载
git clone git@github.com:alibaba/DataX.git
需要用到 DataX 的两个核心包 core 与 common,使用 maven 将对应的 jar 安装到本地仓库:
mvn install:install-file -DgroupId=com.datax -DartifactId=datax-core -Dversion=1.0.0 -Dpackaging=jar -Dfile=datax-core-0.0.1-SNAPSHOT.jar
mvn install:install-file -DgroupId=com.datax -DartifactId=datax-common -Dversion=1.0.0 -Dpackaging=jar -Dfile=datax-common-0.0.1-SNAPSHOT.jar
2.mysql创建源表以及目标表
-- Source table (schema: testdata): the table the DataX job reads from.
-- `id` is generated by AUTO_INCREMENT and is the primary key.
CREATE TABLE `source_table` (
    `id`      INT          NOT NULL AUTO_INCREMENT,
    `name`    VARCHAR(100) DEFAULT NULL,
    `address` VARCHAR(100) DEFAULT NULL,
    PRIMARY KEY (`id`)
)
    ENGINE = InnoDB
    DEFAULT CHARSET = utf8mb4
    COLLATE = utf8mb4_0900_ai_ci;
-- Target table (schema: testdata): the table the DataX job writes into.
-- Same three columns as source_table; note that no primary key or index is declared here.
CREATE TABLE `target_table` (
    `id`      INT          NOT NULL DEFAULT '0',
    `name`    VARCHAR(100) DEFAULT NULL,
    `address` VARCHAR(100) DEFAULT NULL
)
    ENGINE = InnoDB
    DEFAULT CHARSET = utf8mb4
    COLLATE = utf8mb4_0900_ai_ci;
3.使用存储过程生成测试数据
-- 存储过程:生成测试数据(生成 100w 条时执行 CALL geneData(1000000);)
-- geneData: fills source_table with generated test rows.
--
-- Parameter:
--   loop_times  number of rows to insert; a value below 1 inserts nothing.
--
-- Row i (1 .. loop_times) gets name = 'elite<i>' and address = 'xxx<i>';
-- `id` is not supplied, so the table's AUTO_INCREMENT assigns it.
--
-- The values are built inline rather than through session user variables,
-- so calling the procedure no longer overwrites @name / @address in the
-- caller's session.
--
-- DELIMITER is a mysql client command: it lets the ';' characters inside the
-- body reach the server as part of one CREATE PROCEDURE statement. Drop the
-- two DELIMITER lines if your tool sends the whole text as a single statement.
--
-- NOTE(review): each INSERT is committed on its own when autocommit is on;
-- for large loop_times, wrapping the CALL in START TRANSACTION ... COMMIT on
-- the caller side is presumably much faster - confirm on your server.
DELIMITER $$

CREATE PROCEDURE geneData(IN loop_times INT)
BEGIN
    DECLARE i INT DEFAULT 1;

    WHILE i <= loop_times DO
        INSERT INTO source_table (name, address)
        VALUES (CONCAT('elite', i), CONCAT('xxx', i));

        SET i = i + 1;
    END WHILE;
END$$

DELIMITER ;
3.创建springboot项目
1.引入datax的核心包
<!-- DataX core and common artifacts, resolved from the local Maven repository
     under the coordinates com.datax:datax-core / datax-common, version 1.0.0 -->
<dependency>
    <groupId>com.datax</groupId>
    <artifactId>datax-core</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>com.datax</groupId>
    <artifactId>datax-common</artifactId>
    <version>1.0.0</version>
</dependency>
2.job的配置
{
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "username",
                        "password": "password",
                        "splitPk": "id",
                        "column": ["id", "name", "address"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://ip:3306/testdata?useUnicode=true&characterEncoding=UTF-8&useSSL=false"],
                                "table": ["source_table"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "username": "username",
                        "password": "password",
                        "column": ["id", "name", "address"],
                        "connection": [
                            {
                                "table": ["target_table"],
                                "jdbcUrl": "jdbc:mysql://ip:3306/testdata?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
3.测试代码
- 需要指定datax的home目录
- 指定运行的参数
/**
 * Runs the DataX job described by {@code configuration/job.json} (looked up
 * under the classpath root) by calling {@code Engine.entry} in standalone mode.
 */
public class TestMain {

    /** DataX installation directory used when {@code -Ddatax.home} is not supplied. */
    private static final String DEFAULT_DATAX_HOME =
            "D:\\devproject\\devcode\\code\\datax\\target\\datax\\datax";

    /**
     * Returns the file-system path of the classpath root as reported by the
     * thread context class loader.
     *
     * <p>The path is taken from the URL's URI form so that percent-escapes
     * (for example {@code %20} for a space) are decoded; the raw URL path is
     * used only when no decoded path is available. On Windows the leading
     * {@code /} (as in {@code /D:/...}) and a trailing {@code /} are removed;
     * on other systems the path is returned unchanged.
     *
     * @return the classpath root path
     * @throws IllegalStateException if the class loader cannot resolve the classpath root
     */
    public static String getCurrentClasspath() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        java.net.URL root = classLoader.getResource("");
        if (root == null) {
            throw new IllegalStateException(
                    "Cannot resolve the classpath root from the context class loader");
        }

        String currentClasspath = null;
        try {
            // URI.getPath() decodes percent-escapes; URL.getPath() does not.
            currentClasspath = root.toURI().getPath();
        } catch (java.net.URISyntaxException e) {
            // Not a well-formed URI: fall back to the raw URL path below.
        }
        if (currentClasspath == null) {
            currentClasspath = root.getPath();
        }

        // Current operating system
        String osName = System.getProperty("os.name");
        if (osName.startsWith("Win")) {
            // Drop the "/" in front of the drive letter ...
            if (currentClasspath.startsWith("/")) {
                currentClasspath = currentClasspath.substring(1);
            }
            // ... and the trailing "/" of the directory.
            if (currentClasspath.endsWith("/")) {
                currentClasspath = currentClasspath.substring(0, currentClasspath.length() - 1);
            }
        }
        return currentClasspath;
    }

    /**
     * Prints the job file path, makes sure {@code datax.home} is set, and
     * starts the job in standalone mode with job id {@code -1}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String fileName = getCurrentClasspath() + "/configuration/job.json";
        System.out.println(fileName);

        // Honor -Ddatax.home=... from the command line; otherwise use the default.
        if (System.getProperty("datax.home") == null) {
            System.setProperty("datax.home", DEFAULT_DATAX_HOME);
        }

        String[] dataxArgs = {"-job", fileName, "-mode", "standalone", "-jobid", "-1"};
        try {
            Engine.entry(dataxArgs);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}
4.测试
4.1 不加 splitPk 的情况下
16:41:03.086 [job-0] INFO com.alibaba.datax.core.job.JobContainer -
任务启动时刻 : 2023-03-26 16:40:42
任务结束时刻 : 2023-03-26 16:41:03
任务总计耗时 : 20s
任务平均流量 : 1.22MB/s
记录写入速度 : 50000rec/s
读出记录总数 : 1000000
读写失败总数 : 0
4.2 加 splitPk 的情况下
任务启动时刻 : 2023-03-26 16:48:26
任务结束时刻 : 2023-03-26 16:48:37
任务总计耗时 : 10s
任务平均流量 : 2.45MB/s
记录写入速度 : 100000rec/s
读出记录总数 : 1000000
读写失败总数 : 0