基于dataX实现多种数据源数据汇聚(一)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 在数据中台项目实践过程中,经常需要获取多个部门、多个系统的数据,此时面临多种多样的数据库,如何快速稳定的获取数据,并持续归集到数据中台的数据仓库中,是每个数据中台项目必须解决的问题。本文介绍了我在项目实践过程中,基于dataX实现数据汇聚的一些使用心得,在此和大家分享,希望有所帮助。

1、DataX安装使用文档

1.1 前置条件

Linux JDK(1.8以上,推荐1.8 Python(推荐Python2.6.X)

1.2 下载

https://github.com/alibaba/DataX/blob/master/userGuid.md

选择下载编译好的DataX

1.3 直接解压安装

tar -zxvf 压缩包

1.4 运行

进入bin目录然后运行python datax.py -r {YOUR_READER} -w {YOUR_WRITER} 开始生成模板文件。

1.4.1举例:

python datax.py -r  mysqlREADER  -w   mysql_WRITER

复制生成好的json文件然后根据参数配置,一定要严格按照json文件格式走。

1.4.2这里的python datax.py -r {YOUR_READER} -w {YOUR_WRITER} 根据写的数据源到数据源不同会生成不同的模板

如MySQLMySQL

 {
       "job": {
               "content": [
                       {
                               "reader": {
                                       "name": "mysqlreader",
                                       "parameter": {
                                               "column": [
"id"                        ],
                          //列名需要自己写                       "connection": [                                                       {
                                                               "jdbcUrl": [
"jdbc:mysql:// ip地址:3306/amc"                                ],
                                                             //url数据库链接地址                               "table": ["test1"                                ]//表名需要一定要加双引号                                                       }                       
                        ],
                                               "password": "Ly2n7",
//必写                       "username": "amcc",//必写                       "where": ""                                       }               
                },
                               "writer": {
                                       "name": "mysqlwriter",
                                       "parameter": {
                                               "column": [
"id"                        ],
                                               "connection": [
                                                       {
                                                               "jdbcUrl": "jdbc:mysql://ip地址:3306/amc",
                                                               "table": [
"test2"                                ]                           
                            }                       
                        ],
                                               "password": "L1jhy2f5Iwn7",
                                               "preSql": [
                        ],
                                               "session": [
                        ],
                                               "username": "amcc",
                                               "writeMode": "insert"//必写                                       }               
                }           
            }       
        ],
               "setting": {
                       "speed": {
                               "channel": "3"           
            }       
        }   
    }
}
  1. json文件配置好后放在DataXjob目录下
  2. bin目录下运行命令 python py ../job/你的json文件


2、DataX安装依赖clickhouse

官方不支持clickhouse,需要重新编译

2.1安装maven

1.maven安装参考https://www.cnblogs.com/laoayi/p/12867990.html,如果已安装跳过该步骤

1.下载 官网地址: http://maven.apache.org/download.cgi

curl -O https://mirror.bit.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz

2.解压

tar -zxvf apache-maven-3.6.3-bin.tar.gz

3.修改环境变量

vim /etc/profile

exportMAVEN_HOME=/opt/maven/apache-maven-3.6.3
exportPATH=$MAVEN_HOME/bin:$PATH

修改maven镜像地址

vim /opt/maven/apache-maven-3.6.3/conf/settings.xml

添加如下代码:

<mirror><id>alimaven</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/repositories/central/</url><mirrorOf>central</mirrorOf></mirror>

source /etc/profile //使用环境变量生效

4.查看是否成功安装

mvn -version

[root@ambari-03 maven]#  mvn -version

Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)

Maven home: /opt/maven/apache-maven-3.6.3

Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/local/java/jdk/jdk1.8.0_181/jre

Default locale: en_US, platform encoding: UTF-8

OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"

2.2 下载datax源码

安装git

yum install curl-devel expat-devel gettext-devel  openssl-devel zlib-devel

git --version //查看是否安装成功

git clone git@github.com:alibaba/DataX.git  //下载datax源码  权限失败需要登录用户

使用另一种curl方式下载源码

curl -O https://gitee.com/jarynpl/DataX/repository/archive/master.zip

unzip master.zip //解压

2.3 通过maven打包

$ cd  {DataX_source_code_home}

$  mvn -U clean package assembly:assembly -Dmaven.test.skip=true

2.4 编译失败问题处理

网址https://github.com/alibaba/datax/issues/676

[ERROR] Failed to execute goal on project clickhousewriter: Could not resolve dependencies for project com.alibaba.datax:clickhousewriter:jar:0.0.1-SNAPSHOT: Could not find artifact com.alibaba.datax:simulator:jar:0.0.1-SNAPSHOT in alimaven (http://maven.aliyun.com/nexus/content/repositories/central/) -> [Help 1]

Clickhousewriter 存在问题,可以在该模块clickhousewriter目录下pom.xml 文件中,注释掉该依赖(该依赖用作测试,代码里面也没有单元测试)

<dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>simulator</artifactId>
            <version>${datax-project-version}</version>
            <scope>test</scope>
        </dependency>

另外,需要把 ClickhouseWriter.java 中 15行,引入的 ClickHouseType去掉,未识别该类

importru.yandex.clickhouse.ClickHouseTuple;

然后尝试编译成功。

打包成功,日志显示如下:

image.png

打包好的文件及tar包在target目录下,复制.gz并解压就可以用


clickhouse的writer配置文件如下:

"writer":{
"name":"clickhousewriter",
"parameter":{
"username":"admin",
"password":"admin",
"column":[
"cldjcs",
"sjyid",
"clbs",
"cph",
"clls",
"gps_rqsj",
"gps_jd",
"gps_wd",
"gps_fx",
"gps_sd",
"gps_wxdw",
"lxid",
"lxbb",
"sxx",
"zdxlh",
"jczzt",
"ktcbj",
"yyzt",
"sjjjbjzt"],
"connection":[
{
"jdbcUrl":"jdbc:clickhouse://10.10.101.2:8123",
"table":[
"tcps_gps_and_zdsj"]
}
],
"batchSize":65536,
"batchByteSize":134217728,
"dryRun":false,
"writeMode":"insert"}
}

3、添加hive的jdbc读

3.1 下载datax3.0到本地,并解压

#我是在Linux上操作的,命令如下:wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
tar -zxvf datax.tar.gz

3.2 下载hive驱动

下载好的hive jdbc驱动,上传到datax/plugin/reader/rdbmsreader/libs目录,直接到第三步。

也可以上Cloudera manager官方文档找到hive jdbc驱动安装页面             https://docs.cloudera.com/documentation/enterprise/latest/topics/hive_jdbc_odbc_driver_install.html

image.png

点击进入下载页面,选择适合自己的版本,鼠标右键复制链接,到Linux进行下载并解压

image.png

#进入上面解压好的datax的rdbmsreader插件的libs目录下cd datax/plugin/reader/rdbmsreader/libs
#在Linux上用刚才复制的链接下载驱动wget https://downloads.cloudera.com/connectors/ClouderaHiveJDBC-2.6.10.1012.zip
#解压到当前目录 unzip ClouderaHiveJDBC-2.6.10.1012.zip#然后从解压后的目录中找到hive驱动jar包HiveJDBC41.jar,复制到libs目录cp HiveJDBC41.jar ./

3.3 修改plugin.json文件

cd  datax/plugin/reader/rdbmsreader
vim plugin.json

在文件中加入hive驱动org.apache.hive.jdbc.HiveDriver和com.cloudera.impala.jdbc41.Driver

{
    "name": "rdbmsreader",
    "class": "com.alibaba.datax.plugin.reader.rdbmsreader.RdbmsReader",
    "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
    "developer": "alibaba",
    "drivers":["com.cloudera.impala.jdbc41.Driver","dm.jdbc.driver.DmDriver", "com.sybase.jdbc3.jdbc.SybDriver", "com.edb.Driver", "ru.yandex.clickhouse.ClickHouseDriver", "org.apache.hive.jdbc.HiveDriver"]
}

3.4 编写job文件

我们创建一个json文件,读取数据源为hive,抽取之后将结果打印出来即可。

cd  datax/job
vim hive_rdbms.json
{
"job": {
"setting": {
"speed": {
"channel": 1            }
        },
"content": [
            {
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "default",
"password": "default",
"column": [
"*"                        ],
"connection": [
                            {
"table": [
"bank_data"                                ],
"jdbcUrl": [
"jdbc:hive2://ip:10000/default"                                ]
                            }
                        ]
                    }
                },
"writer": {
"name": "streamwriter",
"parameter": {
"fieldDelimiter": "\t",
"print": "true"                    }
                }
            }
        ]
    }
}

3.5 启动抽取任务进行验证

目录
相关文章
|
1月前
|
机器学习/深度学习 SQL 大数据
什么是数据集成?和数据融合有什么区别?
在大数据领域,“数据集成”与“数据融合”常被混淆。数据集成关注数据的物理集中,解决“数据从哪来”的问题;数据融合则侧重逻辑协同,解决“数据怎么用”的问题。两者相辅相成,集成是基础,融合是价值提升的关键。理解其差异,有助于企业释放数据潜力,避免“数据堆积”或“盲目融合”的误区,实现数据从成本到生产力的转变。
什么是数据集成?和数据融合有什么区别?
|
6月前
|
JSON 分布式计算 DataX
【YashanDB知识库】使用DataX工具迁移yashan数据到maxcompute
本文介绍使用崖山适配的DataX工具进行数据库迁移的方法,包括单表迁移和批量表迁移。单表迁移需配置json文件并执行同步命令;批量迁移则通过脚本自动化生成json配置文件并完成数据迁移,最后提供数据比对功能验证迁移结果。具体步骤涵盖连接信息配置、表清单获取、json文件生成、数据迁移执行及日志记录,确保数据一致性。相关工具和脚本简化了复杂迁移过程,提升效率。
|
9月前
|
人工智能 安全 DataX
【瓴羊数据荟】 Data x AI :大模型时代的数据治理创新实践 | 瓴羊数据Meet Up城市行第三期
第三期瓴羊数据Meetup 将于2025年1月3日在线上与大家见面,共同探讨AI时代的数据治理实践。
729 10
【瓴羊数据荟】 Data x  AI :大模型时代的数据治理创新实践 | 瓴羊数据Meet Up城市行第三期
|
SQL DataWorks 关系型数据库
DataWorks产品使用合集之数据集成时源头提供数据库自定义函数调用返回数据,数据源端是否可以写自定义SQL实现
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
DataWorks 安全 API
DataWorks产品使用合集之是否可以不使用DataWorks进行EMR的调度和DataX数据导入
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
数据采集 分布式计算 大数据
MaxCompute产品使用合集之数据集成中进行数据抽取时,是否可以定义使用和源数据库一样的字符集进行抽取
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
145 1
|
数据采集 SQL DataWorks
【颠覆想象的数据巨匠】DataWorks——远超Excel的全能数据集成与管理平台:一场电商数据蜕变之旅的大揭秘!
【8月更文挑战第7天】随着大数据技术的发展,企业对数据处理的需求日益增长。DataWorks作为阿里云提供的数据集成与管理平台,为企业提供从数据采集、清洗、加工到应用的一站式解决方案。不同于桌面级工具如Excel,DataWorks具备强大的数据处理能力和丰富的功能集,支持大规模数据处理任务。本文通过电商平台案例,展示了如何使用DataWorks构建数据处理流程,包括多源数据接入、SQL任务实现数据采集、数据清洗加工以提高质量,以及利用分析工具挖掘数据价值的过程。这不仅凸显了DataWorks在大数据处理中的核心功能与优势,还展示了其相较于传统工具的高扩展性和灵活性。
339 0
|
6月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
278 6
|
6月前
|
DataWorks 关系型数据库 Serverless
DataWorks数据集成同步至Hologres能力介绍
本文由DataWorks PD王喆分享,介绍DataWorks数据集成同步至Hologres的能力。DataWorks提供低成本、高效率的全场景数据同步方案,支持离线与实时同步。通过Serverless资源组,实现灵活付费与动态扩缩容,提升隔离性和安全性。文章还详细演示了MySQL和ClickHouse整库同步至Hologres的过程。
|
数据采集 DataWorks 监控
DataWorks产品使用合集之数据集成并发数不支持批量修改,该怎么办
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
158 0

热门文章

最新文章