5、DataX(DataX简介、DataX架构原理、DataX部署、使用、同步MySQL数据到HDFS、同步HDFS数据到MySQL)(二)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 5、DataX(DataX简介、DataX架构原理、DataX部署、使用、同步MySQL数据到HDFS、同步HDFS数据到MySQL)(二)

2、配置文件说明

(1)Reader参数说明

b0c1640b029c44eeb645b594f4085fb2.png

3、提交任务

(1)清空历史数据

hadoop fs -rm -r -f /base_province/*

(2)进入DataX根目录

(3)执行如下命令

python bin/datax.py job/base_province.json

4、查看结果

(1)DataX打印日志


022dfb98451d4f65ba0b0af927141d6d.png

(2)查看HDFS文件

hadoop fs -cat /base_province/* | zcat

4.2.3 DataX传参

通常情况下,离线数据同步任务需要每日定时重复执行,故HDFS上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此DataX配置文件中HDFS Writer的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。

DataX传参的用法如下,在JSON配置文件中使用${param}引用参数,在提交任务时使用-p"-Dparam=value"传入参数值,具体示例如下。

1、编写配置文件

(1)修改配置文件base_province.json

(2)配置文件内容如下

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall"
                                ],
                                "querySql": [
                                    "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
                                ]
                            }
                        ],
                        "password": "000000",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province/${dt}",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

2、提交任务

(1)创建目标路径

hadoop fs -mkdir /base_province/2020-06-14

(2)进入DataX根目录

(3)执行如下命令

python bin/datax.py -p"-Ddt=2020-06-14" job/base_province.json

3、查看结果

hadoop fs -ls /base_province

4.3 同步HDFS数据到MySQL案例

案例要求:同步HDFS上的/base_province目录下的数据到MySQL gmall 数据库下的test_province表。

需求分析:要实现该功能,需选用HDFSReader和MySQLWriter。

1、编写配置文件

(1)创建配置文件test_province.json

(2)配置文件内容如下

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "defaultFS": "hdfs://hadoop102:8020",
                        "path": "/base_province",
                        "column": [
                            "*"
                        ],
                        "fileType": "text",
                        "compress": "gzip",
                        "encoding": "UTF-8",
                        "nullFormat": "\\N",
                        "fieldDelimiter": "\t",
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "username": "root",
                        "password": "000000",
                        "connection": [
                            {
                                "table": [
                                    "test_province"
                                ],
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8"
                            }
                        ],
                        "column": [
                            "id",
                            "name",
                            "region_id",
                            "area_code",
                            "iso_code",
                            "iso_3166_2"
                        ],
                        "writeMode": "replace"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

2、配置文件说明

(1)Reader参数说明

57f54d6eab984006bcb32c7ab8f3feee.png

(2)Writer参数说明



cb53991a0622466c8b051bd1391f550b.png

3、提交任务

(1)在MySQL中创建gmall.test_province表

DROP TABLE IF EXISTS `test_province`;
CREATE TABLE `test_province`  (
  `id` bigint(20) NOT NULL,
  `name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

(2)进入DataX根目录

(3)执行如下命令

python bin/datax.py job/test_province.json

4、查看结果

(1)DataX打印日志

(2)查看MySQL目标表数据

4852d168ef7d46e58357cd3fbdf6bd45.png

5、DataX优化

5.1 速度控制

DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。

bf447172a08546ea98c0b2a7bc32c27c.png

注意事项:

1.若配置了总record限速,则必须配置单个channel的record限速

2.若配置了总byte限速,则必须配置单个channe的byte限速

3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:

计算公式为:

min(总byte限速/单个channel的byte限速,总record限速/单个channel的record限速)

5.2 内存调整

当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。

建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。

调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:

python datax/bin/datax.py --jvm=“-Xms8G -Xmx8G” /path/to/your/job.json


相关文章
|
20天前
|
存储 分布式计算 安全
bigdata-07-Hdfs原理到实战
bigdata-07-Hdfs原理到实战
62 0
|
20天前
|
Java 数据处理 调度
Dataphin常见问题之离线管道同步数据datax就报连接超时如何解决
Dataphin是阿里云提供的一站式数据处理服务,旨在帮助企业构建一体化的智能数据处理平台。Dataphin整合了数据建模、数据处理、数据开发、数据服务等多个功能,支持企业更高效地进行数据治理和分析。
|
20天前
|
存储 SQL 关系型数据库
ClickHouse(02)ClickHouse架构设计介绍概述与ClickHouse数据分片设计
ClickHouse的核心架构包括执行过程和数据存储两部分。执行过程涉及Parser与Interpreter解析SQL,通过Column、DataType、Block、Functions和Storage模块处理数据。Column是内存中列的表示,Field处理单个值,DataType负责序列化和反序列化,Block是内存中表的子集,Block Streams处理数据流。Storage代表表,使用不同的引擎如StorageMergeTree。数据存储基于分片和副本,1个分片由多个副本组成,每个节点只能拥有1个分片。
149 0
ClickHouse(02)ClickHouse架构设计介绍概述与ClickHouse数据分片设计
|
20天前
|
DataWorks 安全 关系型数据库
DataWorks常见问题之使用公共数据集成资源组进行同步失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
20天前
|
存储 分布式计算 Hadoop
Hadoop【基础知识 01】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 01】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)
112 3
|
20天前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
158 2
|
8天前
|
存储 关系型数据库 MySQL
【MySQL】存储引擎简介、存储引擎特点、存储引擎区别
【MySQL】存储引擎简介、存储引擎特点、存储引擎区别
22 2
|
9天前
|
SQL 存储 关系型数据库
Hive 和 HDFS、MySQL 之间的关系
Hive是Hadoop上的数据仓库工具,用HiveQL进行大数据查询;HDFS是分布式文件系统,用于存储大规模数据,常与Hive结合,提供数据存储和高可靠性。MySQL是RDBMS,适用于结构化数据管理,在大数据环境里可存储Hive的元数据,提升查询效率和元数据管理。三者协同处理数据管理和分析任务。
|
20天前
|
分布式计算 DataWorks 数据库
DataWorks操作报错合集之DataWorks使用数据集成整库全增量同步oceanbase数据到odps的时候,遇到报错,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
29 0
|
20天前
|
分布式计算 DataWorks 调度
DataWorks产品使用合集之在DataWorks中,查看ODPS表的OSS对象如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
33 1