基于dataX实现多种数据源数据汇聚(一)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 在数据中台项目实践过程中,经常需要获取多个部门、多个系统的数据,此时面临多种多样的数据库,如何快速稳定的获取数据,并持续归集到数据中台的数据仓库中,是每个数据中台项目必须解决的问题。本文介绍了我在项目实践过程中,基于dataX实现数据汇聚的一些使用心得,在此和大家分享,希望有所帮助。

1、DataX安装使用文档

1.1 前置条件

Linux JDK(1.8以上,推荐1.8 Python(推荐Python2.6.X)

1.2 下载

https://github.com/alibaba/DataX/blob/master/userGuid.md

选择下载编译好的DataX

1.3 直接解压安装

tar -zxvf 压缩包

1.4 运行

进入bin目录然后运行python datax.py -r {YOUR_READER} -w {YOUR_WRITER} 开始生成模板文件。

1.4.1举例:

python datax.py -r  mysqlREADER  -w   mysql_WRITER

复制生成好的json文件然后根据参数配置,一定要严格按照json文件格式走。

1.4.2这里的python datax.py -r {YOUR_READER} -w {YOUR_WRITER} 根据写的数据源到数据源不同会生成不同的模板

如MySQLMySQL

 {
       "job": {
               "content": [
                       {
                               "reader": {
                                       "name": "mysqlreader",
                                       "parameter": {
                                               "column": [
"id"                        ],
                          //列名需要自己写                       "connection": [                                                       {
                                                               "jdbcUrl": [
"jdbc:mysql:// ip地址:3306/amc"                                ],
                                                             //url数据库链接地址                               "table": ["test1"                                ]//表名需要一定要加双引号                                                       }                       
                        ],
                                               "password": "Ly2n7",
//必写                       "username": "amcc",//必写                       "where": ""                                       }               
                },
                               "writer": {
                                       "name": "mysqlwriter",
                                       "parameter": {
                                               "column": [
"id"                        ],
                                               "connection": [
                                                       {
                                                               "jdbcUrl": "jdbc:mysql://ip地址:3306/amc",
                                                               "table": [
"test2"                                ]                           
                            }                       
                        ],
                                               "password": "L1jhy2f5Iwn7",
                                               "preSql": [
                        ],
                                               "session": [
                        ],
                                               "username": "amcc",
                                               "writeMode": "insert"//必写                                       }               
                }           
            }       
        ],
               "setting": {
                       "speed": {
                               "channel": "3"           
            }       
        }   
    }
}
  1. json文件配置好后放在DataXjob目录下
  2. bin目录下运行命令 python py ../job/你的json文件


2、DataX安装依赖clickhouse

官方不支持clickhouse,需要重新编译

2.1安装maven

1.maven安装参考https://www.cnblogs.com/laoayi/p/12867990.html,如果已安装跳过该步骤

1.下载 官网地址: http://maven.apache.org/download.cgi

curl -O https://mirror.bit.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz

2.解压

tar -zxvf apache-maven-3.6.3-bin.tar.gz

3.修改环境变量

vim /etc/profile

exportMAVEN_HOME=/opt/maven/apache-maven-3.6.3
exportPATH=$MAVEN_HOME/bin:$PATH

修改maven镜像地址

vim /opt/maven/apache-maven-3.6.3/conf/settings.xml

添加如下代码:

<mirror><id>alimaven</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/repositories/central/</url><mirrorOf>central</mirrorOf></mirror>

source /etc/profile //使用环境变量生效

4.查看是否成功安装

mvn -version

[root@ambari-03 maven]#  mvn -version

Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)

Maven home: /opt/maven/apache-maven-3.6.3

Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/local/java/jdk/jdk1.8.0_181/jre

Default locale: en_US, platform encoding: UTF-8

OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"

2.2 下载datax源码

安装git

yum install curl-devel expat-devel gettext-devel  openssl-devel zlib-devel

git --version //查看是否安装成功

git clone git@github.com:alibaba/DataX.git  //下载datax源码  权限失败需要登录用户

使用另一种curl方式下载源码

curl -O https://gitee.com/jarynpl/DataX/repository/archive/master.zip

unzip master.zip //解压

2.3 通过maven打包

$ cd  {DataX_source_code_home}

$  mvn -U clean package assembly:assembly -Dmaven.test.skip=true

2.4 编译失败问题处理

网址https://github.com/alibaba/datax/issues/676

[ERROR] Failed to execute goal on project clickhousewriter: Could not resolve dependencies for project com.alibaba.datax:clickhousewriter:jar:0.0.1-SNAPSHOT: Could not find artifact com.alibaba.datax:simulator:jar:0.0.1-SNAPSHOT in alimaven (http://maven.aliyun.com/nexus/content/repositories/central/) -> [Help 1]

Clickhousewriter 存在问题,可以在该模块clickhousewriter目录下pom.xml 文件中,注释掉该依赖(该依赖用作测试,代码里面也没有单元测试)

<dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>simulator</artifactId>
            <version>${datax-project-version}</version>
            <scope>test</scope>
        </dependency>

另外,需要把 ClickhouseWriter.java 中 15行,引入的 ClickHouseType去掉,未识别该类

importru.yandex.clickhouse.ClickHouseTuple;

然后尝试编译成功。

打包成功,日志显示如下:

image.png

打包好的文件及tar包在target目录下,复制.gz并解压就可以用


clickhouse的writer配置文件如下:

"writer":{
"name":"clickhousewriter",
"parameter":{
"username":"admin",
"password":"admin",
"column":[
"cldjcs",
"sjyid",
"clbs",
"cph",
"clls",
"gps_rqsj",
"gps_jd",
"gps_wd",
"gps_fx",
"gps_sd",
"gps_wxdw",
"lxid",
"lxbb",
"sxx",
"zdxlh",
"jczzt",
"ktcbj",
"yyzt",
"sjjjbjzt"],
"connection":[
{
"jdbcUrl":"jdbc:clickhouse://10.10.101.2:8123",
"table":[
"tcps_gps_and_zdsj"]
}
],
"batchSize":65536,
"batchByteSize":134217728,
"dryRun":false,
"writeMode":"insert"}
}

3、添加hive的jdbc读

3.1 下载datax3.0到本地,并解压

#我是在Linux上操作的,命令如下:wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
tar -zxvf datax.tar.gz

3.2 下载hive驱动

下载好的hive jdbc驱动,上传到datax/plugin/reader/rdbmsreader/libs目录,直接到第三步。

也可以上Cloudera manager官方文档找到hive jdbc驱动安装页面             https://docs.cloudera.com/documentation/enterprise/latest/topics/hive_jdbc_odbc_driver_install.html

image.png

点击进入下载页面,选择适合自己的版本,鼠标右键复制链接,到Linux进行下载并解压

image.png

#进入上面解压好的datax的rdbmsreader插件的libs目录下cd datax/plugin/reader/rdbmsreader/libs
#在Linux上用刚才复制的链接下载驱动wget https://downloads.cloudera.com/connectors/ClouderaHiveJDBC-2.6.10.1012.zip
#解压到当前目录 unzip ClouderaHiveJDBC-2.6.10.1012.zip#然后从解压后的目录中找到hive驱动jar包HiveJDBC41.jar,复制到libs目录cp HiveJDBC41.jar ./

3.3 修改plugin.json文件

cd  datax/plugin/reader/rdbmsreader
vim plugin.json

在文件中加入hive驱动org.apache.hive.jdbc.HiveDriver和com.cloudera.impala.jdbc41.Driver

{
    "name": "rdbmsreader",
    "class": "com.alibaba.datax.plugin.reader.rdbmsreader.RdbmsReader",
    "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
    "developer": "alibaba",
    "drivers":["com.cloudera.impala.jdbc41.Driver","dm.jdbc.driver.DmDriver", "com.sybase.jdbc3.jdbc.SybDriver", "com.edb.Driver", "ru.yandex.clickhouse.ClickHouseDriver", "org.apache.hive.jdbc.HiveDriver"]
}

3.4 编写job文件

我们创建一个json文件,读取数据源为hive,抽取之后将结果打印出来即可。

cd  datax/job
vim hive_rdbms.json
{
"job": {
"setting": {
"speed": {
"channel": 1            }
        },
"content": [
            {
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "default",
"password": "default",
"column": [
"*"                        ],
"connection": [
                            {
"table": [
"bank_data"                                ],
"jdbcUrl": [
"jdbc:hive2://ip:10000/default"                                ]
                            }
                        ]
                    }
                },
"writer": {
"name": "streamwriter",
"parameter": {
"fieldDelimiter": "\t",
"print": "true"                    }
                }
            }
        ]
    }
}

3.5 启动抽取任务进行验证

目录
相关文章
|
2月前
|
DataWorks API 数据库
DataWorks操作报错合集之在使用 OceanBase (OB) 作为数据源进行数据集成时遇到报错,该如何排查
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
2月前
|
SQL DataWorks 关系型数据库
DataWorks产品使用合集之数据集成时源头提供数据库自定义函数调用返回数据,数据源端是否可以写自定义SQL实现
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
数据采集 DataWorks 安全
DataWorks产品使用合集之选择独享调度,数据集成里可以使用,但是数据地图里面测试无法通过,是什么原因导致的
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
38 0
DataWorks产品使用合集之选择独享调度,数据集成里可以使用,但是数据地图里面测试无法通过,是什么原因导致的
|
2月前
|
DataWorks 安全 API
DataWorks产品使用合集之是否可以不使用DataWorks进行EMR的调度和DataX数据导入
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
数据采集 分布式计算 大数据
MaxCompute产品使用合集之数据集成中进行数据抽取时,是否可以定义使用和源数据库一样的字符集进行抽取
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2月前
|
分布式计算 DataWorks 数据挖掘
DataWorks操作报错合集之上传数据时报错com.alibaba.datax.common.exception.DataXException: Code:[UnstructuredStorageReader-11],该如何排查
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
1月前
|
数据采集 SQL DataWorks
【颠覆想象的数据巨匠】DataWorks——远超Excel的全能数据集成与管理平台:一场电商数据蜕变之旅的大揭秘!
【8月更文挑战第7天】随着大数据技术的发展,企业对数据处理的需求日益增长。DataWorks作为阿里云提供的数据集成与管理平台,为企业提供从数据采集、清洗、加工到应用的一站式解决方案。不同于桌面级工具如Excel,DataWorks具备强大的数据处理能力和丰富的功能集,支持大规模数据处理任务。本文通过电商平台案例,展示了如何使用DataWorks构建数据处理流程,包括多源数据接入、SQL任务实现数据采集、数据清洗加工以提高质量,以及利用分析工具挖掘数据价值的过程。这不仅凸显了DataWorks在大数据处理中的核心功能与优势,还展示了其相较于传统工具的高扩展性和灵活性。
70 0
|
3月前
|
数据采集 供应链 搜索推荐
数据集成:融合不同来源的数据
【6月更文挑战第4天】数据集成在企业中发挥关键作用,连接数据孤岛,促进信息流动,提升决策能力。通过抽取、清洗、转换和加载(ETL)不同来源、格式的数据,整合到统一框架,进行深度分析。以零售商为例,集成销售、客户和供应链数据可优化库存管理。数据清洗确保质量,转换满足分析需求,最终加载到数据仓库。Python和pandas库是实现这一过程的工具之一。随着技术进步,数据集成将推动企业向智能化和个性化发展。
86 2
|
3月前
|
DataWorks 监控 数据可视化
DataWorks产品使用合集之独享资源包括独享调度资源、独享数据集成资源、独享数据服务资源等的区别是什么
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之数据源同步时,使用脚本模式采集mysql数据到odps中,使用querySql方式采集数据,在脚本中删除了Reader中的column,但是datax还是报错OriginalConfPretreatmentUtil - 您的配置有误。如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。

热门文章

最新文章