一、数据集成任务切分键探索
对于数据集成任务,这些任务的时间消耗一般都主要花费在数据同步上,当查询表数据量较大时,其SQL本身在数据库中查询就是很慢的,那么对于这种情况有说明好的优化方法呢?
数据集成任务上提供了一个切分键的设置,那么该切分键是否可以对源库SQL查询有一定的提升,进而提高数据同步任务的整体效率呢?
切分键:可以将源数据表中某一列作为切分键
建议使用主键或有索引的列作为切分键
1、如何探究任务究竟怎么入库查询拉取数据呢?
这里主要讲案例中使用到的MySQL数据库时可以通过什么方案探究任务如何入库查询,我们可以打开MySQL的general_log,general log记录连接到数据库的所有操作。值得注意的一点是,开启开操作对数据库性能有极大影响,所以一般情况下我们仅仅会在分析问题的时候才会在自己测试环境开启该日志。
general_log参数默认关闭,若我们需要开启,可在数据库中动态设置。general_log_file指定路径即为日志路径,可在操作系统中tail -100f ${general_log_file}实时查看日志记录情况。
mysql> show variables like '%general%';
+------------------+--------------------------------------------+
| Variable_name | Value |
+------------------+--------------------------------------------+
| general_log | OFF |
| general_log_file | /var/lib/mysql/izt4niarmwaooao2egw7wiz.log |
+------------------+--------------------------------------------+
2 rows in set (0.01 sec)
mysql> set global general_log=on;
Query OK, 0 rows affected (0.11 sec)
mysql> show variables like '%general%';
+------------------+--------------------------------------------+
| Variable_name | Value |
+------------------+--------------------------------------------+
| general_log | ON |
| general_log_file | /var/lib/mysql/izt4niarmwaooao2egw7wiz.log |
+------------------+--------------------------------------------+
2 rows in set (0.01 sec)
2、不设置切分键时,SQL如何执行?
直接使用元SQL入库查询
select xxx,xxx from `shardkey`
3、设置切分键时,SQL如何执行?
1、查询切分键的最大最小值范围
SELECT MIN(id),MAX(id) FROM `shardkey`
2、根据切分键范围进行切分,范围割接左闭右开,max值时左闭右闭,除范围查询外增加切分键is null的查询,避免遗漏数据
1)根据min/max进行范围切分
select xxx,xxx from `shardkey` where (1 <= id AND id < 19923)
select xxx,xxx from `shardkey` where (19923 <= id AND id < 39845)
...
...
select xxx,xxx from `shardkey` where (179299 <= id AND id <= 199221)
2)查询切分键 is null的情况
select xxx,xxx from `shardkey` where id IS NULL
3、按照最大并发数进行并发查询(实际并发数<=任务最大期望并发数)
在任务执行时可通过在数据库执行show processlist进行监控查看
当任务最大期望并发数为2时:
当任务最大期望并发数为4时:
4、切分键使用注意
1、推荐使用主键作为切分键,因为为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点
2、目前splitPk仅支持整型数据切分,不支持字符串、浮点、日期等其他类型。
二、切分键使用性能测试
数据源信息设置
数据源使用rds for mysql,配置信息如下:
数据源接入,使用阿里云实例模式接入
测试表为shardkey,并向表内插入11407872行记录。
CREATE TABLE `shardkey` (
`id` int(40) NOT NULL AUTO_INCREMENT,
`ref_data_id` int(40) NOT NULL,
`ref_meta_id` int(40) NOT NULL,
`ref_attribute_id` int(40) NOT NULL,
`value` text NOT NULL,
`creator` varchar(40) DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`tenant` varchar(40) DEFAULT NULL,
`model` varchar(40) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11602312 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC
mysql>select count(*) from shardkey;
+--------------------+
| count(*) |
+--------------------+
| 11407872 |
+--------------------+
返回行数:[1],耗时:15684 ms.
方案一:不设置切分键,默认最大并发数为2
数据集成任务配置如下:
Reader: mysql
column=[["id","ref_data_id","ref_meta_id","ref_attribute_id","value","creator","create_time","update_time","tenant","model"]]
connection=[[{"datasource":"rds_for_mysql","table":["`shardkey`"]}]]
Writer: odps
partition=[ds=20200203 ]
truncate=[true ]
datasource=[odps_first ]
column=[["id","ref_data_id","ref_meta_id","ref_attribute_id","value","creator","create_time","update_time","tenant","model"]]
emptyAsNull=[false ]
table=[shardkey_odps ]
Setting:
errorLimit=[{"record":""} ]
speed=[{"concurrent":2,"throttle":false}]
将数据集成任务保存并提交到运维中心,对周期任务进行测试,日志分析如下:
=== total summarize info ===
1. all phase average time info and max time task info:
PHASE | AVERAGE USED TIME | ALL TASK NUM | MAX USED TIME | MAX TASK ID | MAX TASK INFO
TASK_TOTAL | 87.066s | 1 | 87.066s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_INIT | 0.006s | 1 | 0.006s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_PREPARE | 0.000s | 1 | 0.000s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_DATA | 84.567s | 1 | 84.567s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_POST | 0.000s | 1 | 0.000s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_DESTROY | 0.000s | 1 | 0.000s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_INIT | 0.001s | 1 | 0.001s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_PREPARE | 0.233s | 1 | 0.233s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_DATA | 86.304s | 1 | 86.304s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_POST | 0.285s | 1 | 0.285s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_DESTROY | 0.000s | 1 | 0.000s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
SQL_QUERY | 0.055s | 1 | 0.055s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
RESULT_NEXT_ALL | 5.124s | 1 | 5.124s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
ODPS_BLOCK_CLOSE | 42.770s | 1 | 42.770s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WAIT_READ_TIME | 2.124s | 1 | 2.124s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WAIT_WRITE_TIME | 48.318s | 1 | 48.318s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
2. record average count and max count task info :
PHASE | AVERAGE RECORDS | AVERAGE BYTES | MAX RECORDS | MAX RECORD`S BYTES | MAX TASK ID | MAX TASK INFO
READ_TASK_DATA | 11407872 | 786.62M | 11407872 | 786.62M | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
2020-02-04 16:18:43.464 [job-128859081] INFO MetricReportUtil - reportJobMetric is turn off
2020-02-04 16:18:43.464 [job-128859081] INFO LocalJobContainerCommunicator - Total 11407872 records, 786618535 bytes | Speed 8.34MB/s, 126754 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 48.318s | All Task WaitReaderTime 2.124s | Percentage 100.00%
2020-02-04 16:18:43.465 [job-128859081] INFO LogReportUtil - report datax log is turn off
2020-02-04 16:18:43.465 [job-128859081] INFO JobContainer -
任务启动时刻 : 2020-02-04 16:17:09
任务结束时刻 : 2020-02-04 16:18:43
任务总计耗时 : 93s
任务平均流量 : 8.34MB/s
记录写入速度 : 126754rec/s
读出记录总数 : 11407872
读写失败总数 : 0
2020-02-04 16:18:43 INFO =================================================================
2020-02-04 16:18:43 INFO Exit code of the Shell command 0
2020-02-04 16:18:43 INFO --- Invocation of Shell command completed ---
2020-02-04 16:18:43 INFO Shell run successfully!
2020-02-04 16:18:43 INFO Current task status: FINISH
2020-02-04 16:18:43 INFO Cost time is: 95.217s
方案二:设置主键为切分键,默认最大并发数为2
数据集成任务配置如下:
Reader: mysql
column=[["id","ref_data_id","ref_meta_id","ref_attribute_id","value","creator","create_time","update_time","tenant","model"]]
connection=[[{"datasource":"rds_for_mysql","table":["`shardkey`"]}]]
splitPk=[id ]
Writer: odps
partition=[ds=20200203 ]
truncate=[true ]
datasource=[odps_first ]
column=[["id","ref_data_id","ref_meta_id","ref_attribute_id","value","creator","create_time","update_time","tenant","model"]]
emptyAsNull=[false ]
table=[shardkey_odps ]
Setting:
errorLimit=[{"record":""} ]
speed=[{"concurrent":2,"throttle":false}]
日志分析如下:
=== total summarize info ===
1. all phase average time info and max time task info:
PHASE | AVERAGE USED TIME | ALL TASK NUM | MAX USED TIME | MAX TASK ID | MAX TASK INFO
TASK_TOTAL | 9.649s | 11 | 12.512s | 0-0-9 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_INIT | 0.002s | 11 | 0.009s | 0-0-8 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_PREPARE | 0.000s | 11 | 0.002s | 0-0-8 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_DATA | 7.704s | 11 | 10.269s | 0-0-9 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_POST | 0.000s | 11 | 0.000s | 0-0-4 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_DESTROY | 0.000s | 11 | 0.000s | 0-0-5 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_INIT | 0.001s | 11 | 0.002s | 0-0-6 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_PREPARE | 0.154s | 11 | 0.248s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_DATA | 8.894s | 11 | 11.674s | 0-0-9 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_POST | 0.346s | 11 | 0.612s | 0-0-1 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_DESTROY | 0.000s | 11 | 0.000s | 0-0-5 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
SQL_QUERY | 0.062s | 11 | 0.089s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
RESULT_NEXT_ALL | 0.626s | 11 | 0.823s | 0-0-4 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
ODPS_BLOCK_CLOSE | 4.172s | 11 | 6.661s | 0-0-9 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WAIT_READ_TIME | 0.508s | 11 | 0.891s | 0-0-4 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WAIT_WRITE_TIME | 3.549s | 11 | 5.992s | 0-0-9 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
2. record average count and max count task info :
PHASE | AVERAGE RECORDS | AVERAGE BYTES | MAX RECORDS | MAX RECORD`S BYTES | MAX TASK ID | MAX TASK INFO
READ_TASK_DATA | 1037079 | 71.51M | 1153917 | 80.41M | 0-0-9 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
2020-02-04 16:20:54.876 [job-128859276] INFO MetricReportUtil - reportJobMetric is turn off
2020-02-04 16:20:54.876 [job-128859276] INFO LocalJobContainerCommunicator - Total 11407872 records, 786618535 bytes | Speed 12.50MB/s, 190131 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 39.038s | All Task WaitReaderTime 5.589s | Percentage 100.00%
2020-02-04 16:20:54.877 [job-128859276] INFO LogReportUtil - report datax log is turn off
2020-02-04 16:20:54.877 [job-128859276] INFO JobContainer -
任务启动时刻 : 2020-02-04 16:19:51
任务结束时刻 : 2020-02-04 16:20:54
任务总计耗时 : 63s
任务平均流量 : 12.50MB/s
记录写入速度 : 190131rec/s
读出记录总数 : 11407872
读写失败总数 : 0
2020-02-04 16:20:54 INFO =================================================================
2020-02-04 16:20:54 INFO Exit code of the Shell command 0
2020-02-04 16:20:54 INFO --- Invocation of Shell command completed ---
2020-02-04 16:20:54 INFO Shell run successfully!
2020-02-04 16:20:54 INFO Current task status: FINISH
2020-02-04 16:20:54 INFO Cost time is: 64.674s
/home/admin/alisatasknode/taskinfo//20200204/diide/16/19/49/fr8l5g0pvu449c494iy9g8vn/T3_0413124736.log-END-EOF
2020-02-04 16:20:59 : Detail log url: https://di-cn-hangzhou.data.aliyun.com/web/di/instanceLog?id=128859276&resourceGroup=group_253861156446274&requestId=9157dea3-e3cf-42e9-9ee4-c8bc3858ee51&projectId=6043
Exit with SUCCESS.
2020-02-04 16:20:59 [INFO] Sandbox context cleanup temp file success.
2020-02-04 16:20:59 [INFO] Data synchronization ended with return code: [0].
2020-02-04 16:21:02 INFO =================================================================
2020-02-04 16:21:02 INFO Exit code of the Shell command 0
2020-02-04 16:21:02 INFO --- Invocation of Shell command completed ---
2020-02-04 16:21:02 INFO Shell run successfully!
2020-02-04 16:21:02 INFO Current task status: FINISH
2020-02-04 16:21:02 INFO Cost time is: 75.403s
方案三:设置主键为切分键,设置最大并发数为4
数据集成任务配置如下:
Reader: mysql
column=[["id","ref_data_id","ref_meta_id","ref_attribute_id","value","creator","create_time","update_time","tenant","model"]]
connection=[[{"datasource":"rds_for_mysql","table":["`shardkey`"]}]]
splitPk=[id ]
Writer: odps
partition=[ds=20200203 ]
truncate=[true ]
datasource=[odps_first ]
column=[["id","ref_data_id","ref_meta_id","ref_attribute_id","value","creator","create_time","update_time","tenant","model"]]
emptyAsNull=[false ]
table=[shardkey_odps ]
Setting:
errorLimit=[{"record":""} ]
speed=[{"concurrent":4,"throttle":false}]
日志分析如下:
=== total summarize info ===
1. all phase average time info and max time task info:
PHASE | AVERAGE USED TIME | ALL TASK NUM | MAX USED TIME | MAX TASK ID | MAX TASK INFO
TASK_TOTAL | 5.676s | 21 | 6.515s | 0-0-2 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_INIT | 0.002s | 21 | 0.009s | 0-0-2 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_PREPARE | 0.000s | 21 | 0.001s | 0-0-3 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_DATA | 2.949s | 21 | 3.566s | 0-0-9 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_POST | 0.000s | 21 | 0.000s | 0-0-3 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
READ_TASK_DESTROY | 0.000s | 21 | 0.001s | 0-0-1 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_INIT | 0.000s | 21 | 0.001s | 0-0-0 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_PREPARE | 0.126s | 21 | 0.297s | 0-0-1 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_DATA | 4.928s | 21 | 5.679s | 0-0-9 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_POST | 0.369s | 21 | 0.626s | 0-0-17 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WRITE_TASK_DESTROY | 0.000s | 21 | 0.000s | 0-0-3 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
SQL_QUERY | 0.071s | 21 | 0.152s | 0-0-3 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
RESULT_NEXT_ALL | 0.665s | 21 | 1.155s | 0-0-9 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
ODPS_BLOCK_CLOSE | 2.090s | 21 | 2.715s | 0-0-12 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WAIT_READ_TIME | 0.613s | 21 | 1.105s | 0-0-5 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
WAIT_WRITE_TIME | 0.375s | 21 | 0.872s | 0-0-14 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
2. record average count and max count task info :
PHASE | AVERAGE RECORDS | AVERAGE BYTES | MAX RECORDS | MAX RECORD`S BYTES | MAX TASK ID | MAX TASK INFO
READ_TASK_DATA | 543232 | 37.46M | 576959 | 40.21M | 0-0-19 | `shardkey`,jdbcUrl:[jdbc:mysql://100.100.64.101:39001/sansi_test]
2020-02-04 16:23:11.979 [job-128859728] INFO MetricReportUtil - reportJobMetric is turn off
2020-02-04 16:23:11.979 [job-128859728] INFO LocalJobContainerCommunicator - Total 11407872 records, 786618535 bytes | Speed 25.01MB/s, 380262 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 7.881s | All Task WaitReaderTime 12.881s | Percentage 100.00%
2020-02-04 16:23:11.979 [job-128859728] INFO LogReportUtil - report datax log is turn off
2020-02-04 16:23:11.979 [job-128859728] INFO JobContainer -
任务启动时刻 : 2020-02-04 16:22:38
任务结束时刻 : 2020-02-04 16:23:11
任务总计耗时 : 32s
任务平均流量 : 25.01MB/s
记录写入速度 : 380262rec/s
读出记录总数 : 11407872
读写失败总数 : 0
2020-02-04 16:23:12 INFO =================================================================
2020-02-04 16:23:12 INFO Exit code of the Shell command 0
2020-02-04 16:23:12 INFO --- Invocation of Shell command completed ---
2020-02-04 16:23:12 INFO Shell run successfully!
2020-02-04 16:23:12 INFO Current task status: FINISH
2020-02-04 16:23:12 INFO Cost time is: 34.068s
/home/admin/alisatasknode/taskinfo//20200204/diide/16/22/34/3bjpuktukrcheai76obp1a5q/T3_0413126136.log-END-EOF
2020-02-04 16:23:14 : Detail log url: https://di-cn-hangzhou.data.aliyun.com/web/di/instanceLog?id=128859728&resourceGroup=group_253861156446274&requestId=0b04ad7b-c3ae-4b3c-a27b-6eaadcd69789&projectId=6043
Exit with SUCCESS.
2020-02-04 16:23:14 [INFO] Sandbox context cleanup temp file success.
2020-02-04 16:23:14 [INFO] Data synchronization ended with return code: [0].
2020-02-04 16:23:14 INFO =================================================================
2020-02-04 16:23:14 INFO Exit code of the Shell command 0
2020-02-04 16:23:14 INFO --- Invocation of Shell command completed ---
2020-02-04 16:23:14 INFO Shell run successfully!
2020-02-04 16:23:14 INFO Current task status: FINISH
2020-02-04 16:23:14 INFO Cost time is: 43.964s
三、三种方案对比
方案 | 数据量 | 数据同步耗时 | 任务整体耗时 | 任务平均流量 | WaitWriterTime | WaitReaderTime |
---|---|---|---|---|---|---|
方案一 | 11407872 | 93s | 95.217s | 8.34MB/s | 48.318s | 2.124s |
方案二 | 11407872 | 63s | 64.674s | 12.50MB/s | 39.038s | 5.589s |
方案三 | 11407872 | 32s | 34.068s | 25.01MB/s | 7.881s | 12.881s |
方案一 | 2851968 | 43s | 44.94s | 4.66MB/s | 16.035s | 0.526s |
方案二 | 2851968 | 23s | 25.177s | 9.32MB/s | 2.714s | 1.705s |
方案三 | 2851968 | 23s | 25.435s | 9.32MB/s | 8.551s | 18.145s |
从方案一与方案二的对比来看,说明通过设置切分键是可以提高数据同步的执行效率,查询表数据量越大,提升效果越明显。使用切分键来进行查询,对数据同步效率提升的同时,对于源库负载也有一定优化效果。
从方案二于方案上的对比来看,说明提高任务期望最大并发数一定程度上也是有益于数据同步的执行效率,查询表数据量越大,提升效果越明显。但是设置任务期望最大并发数时需要考虑表数据量大小以及源库负载可承受最大并发数。