datax常见问题

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: datax常见问题

1、如何调整jvm的参数?
调整datax.py文件中DEFAULT_JVM的值即可

2、插件对应的参数具体含义以及报错之后该如何解决,以MysqlReader为例

  "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "username": "root",
                    "password": "root",
                    "column": [
                        "id",
                        "name"
                    ],
                    "splitPk": "db_id",
                    "connection": [
                        {
                            "table": [
                                "table"
                            ],
                            "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]
                        }
                    ]
                }
            }
            

参数说明:splitPk代表切分主键,对于单表进行切分的时候使用 ,如对于表a,存在自增主键id,可以通过id < 1000,id>=1000 & id<2000 ,id>=2000切分成3个sql进行读取,另外,根据connection中的table和jdbcUrl配置项是list可知,这里可以配置多个,多个之间会去笛卡尔积,也就是可以配置任意多的数据库和表,这些库的密码必须保持一致,下面说明一下上述参数在哪里使用

根据前面代码分析可以知道,整个datax的task会被切分多少个是由reader端的split函数决定的,查看mysqlReader的的split函数

    public List<Configuration> split(int adviceNumber) {
        return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
    }
    继续深入进去可以看到实际执行为
    public static List<Configuration> doSplit(
        Configuration originalSliceConfig, int adviceNumber) {
    boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue();
    int eachTableShouldSplittedNumber = -1;
    if (isTableMode) {
        // adviceNumber这里是channel数量大小, 即datax并发task数量
        // eachTableShouldSplittedNumber是单表应该切分的份数, 向上取整可能和adviceNumber没有比例关系了已经
        eachTableShouldSplittedNumber = calculateEachTableShouldSplittedNumber(
                adviceNumber, originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK));
    }

    String column = originalSliceConfig.getString(Key.COLUMN);
    String where = originalSliceConfig.getString(Key.WHERE, null);

    List<Object> conns = originalSliceConfig.getList(Constant.CONN_MARK, Object.class);

    List<Configuration> splittedConfigs = new ArrayList<Configuration>();

    for (int i = 0, len = conns.size(); i < len; i++) {
        Configuration sliceConfig = originalSliceConfig.clone();

        Configuration connConf = Configuration.from(conns.get(i).toString());
        String jdbcUrl = connConf.getString(Key.JDBC_URL);
        sliceConfig.set(Key.JDBC_URL, jdbcUrl);

        // 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作
        sliceConfig.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl));

        sliceConfig.remove(Constant.CONN_MARK);

        Configuration tempSlice;

        // 说明是配置的 table 方式
        if (isTableMode) {
            // 已在之前进行了扩展和`处理,可以直接使用
            List<String> tables = connConf.getList(Key.TABLE, String.class);

            Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误.");

            String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null);

            //最终切分份数不一定等于 eachTableShouldSplittedNumber
            boolean needSplitTable = eachTableShouldSplittedNumber > 1
                    && StringUtils.isNotBlank(splitPk);
            if (needSplitTable) {
                if (tables.size() == 1) {
                    //原来:如果是单表的,主键切分num=num*2+1
                    // splitPk is null这类的情况的数据量本身就比真实数据量少很多, 和channel大小比率关系时,不建议考虑
                    //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不应该加1导致长尾
                    
                    //考虑其他比率数字?(splitPk is null, 忽略此长尾)
                    eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;
                }
                // 尝试对每个表,切分为eachTableShouldSplittedNumber 份
                for (String table : tables) {
                    tempSlice = sliceConfig.clone();
                    tempSlice.set(Key.TABLE, table);

                    List<Configuration> splittedSlices = SingleTableSplitUtil
                            .splitSingleTable(tempSlice, eachTableShouldSplittedNumber);

                    splittedConfigs.addAll(splittedSlices);
                }
            } else {
                for (String table : tables) {
                    tempSlice = sliceConfig.clone();
                    tempSlice.set(Key.TABLE, table);
                    String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column);
                    tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where));
                    splittedConfigs.add(tempSlice);
                }
            }
        } else {
            // 说明是配置的 querySql 方式
            List<String> sqls = connConf.getList(Key.QUERY_SQL, String.class);

            // TODO 是否check 配置为多条语句??
            for (String querySql : sqls) {
                tempSlice = sliceConfig.clone();
                tempSlice.set(Key.QUERY_SQL, querySql);
                splittedConfigs.add(tempSlice);
            }
        }

    }

    return splittedConfigs;
}

在这里可以看到整个reader端是如何切分task,针对个connection下的每个表都切分成一个task(task的配置通过Configuration保存),即求笛卡尔积,同时,这里也考虑了splitPk的情况,通过源码可以直接看到每个参数的作用,在遇到插件的相关问题时,可以直接找到对应的插件函数进行排查

目录
相关文章
|
7月前
|
SQL DataWorks 关系型数据库
DataWorks报错问题之dataX数据导入报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
DataWorks报错问题之dataX数据导入报错如何解决
|
7月前
|
SQL DataWorks NoSQL
DataWorks报错问题之datax mongodb全量迁移报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
7月前
|
消息中间件 分布式计算 DataWorks
DataWorks常见问题之dataworks中lasticseatch8.9和logstash版本兼容问题如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
7月前
|
分布式计算 DataWorks 关系型数据库
DataWorks常见问题之dataworks100g大小的csv文件上传到odps失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
7月前
|
SQL 分布式计算 Oracle
数据同步工具DataX的安装
数据同步工具DataX的安装
1391 0
|
5月前
|
弹性计算 分布式计算 DataWorks
MaxCompute操作报错合集之运行pyodps报错超时,该如何排查
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
DataWorks NoSQL fastjson
DataWorks操作报错合集之DataX进行MongoDB全量迁移的过程中,DataX的MongoDB Reader插件在初始化阶段找不到Fastjson 2.x版本的类库,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之如何解决datax同步任务时报错ODPS-0410042:Invalid signature value
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
DataWorks Java 调度
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
81 0
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
|
7月前
|
存储 监控 关系型数据库
DataX 概述、部署、数据同步运用示例
DataX是阿里巴巴开源的离线数据同步工具,支持多种数据源之间的高效传输。其特点是多数据源支持、可扩展性、灵活配置、高效传输、任务调度监控和活跃的开源社区支持。DataX通过Reader和Writer插件实现数据源的读取和写入,采用Framework+plugin架构。部署简单,解压即可用。示例展示了如何配置DataX同步MySQL到HDFS,并提供了速度和内存优化建议。此外,还解决了NULL值同步问题及配置文件变量传参的方法。
2896 5