开发者学堂课程【SaaS 模式云数据仓库实战:Hadoop 数据如何同步至 MaxCompute 】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/332/detail/3717
Hadoop 数据如何同步至 MaxCompute
内容简介:
一、MMA 功能介绍、技术架构和原理
二、MMA 数据迁移操作演示
一、MMA 功能介绍、技术架构和原理
1、覆盖的主要场景
工作负载 |
Hadoop开源生态 |
MaxCompute产品组件和生态工具 |
批处理 |
Hive SQL |
MaxCompute SQL |
MapReduce |
MaxCompute MR |
|
Apache Spark |
MaxCompute Spark |
|
存储 |
HDFS/Hive数据存储 |
MaxCompute Table,OOS |
数据集成 |
Sqoop 、Kettle |
Datawroks数据集成 |
编排&调度 |
Oozie、Airflow等作业调度工具 |
Datawroks Studio |
2、主要功能
迁移评估分析
(1)对 Hadoop 平台进行诊断分析,评估数据迁移规模、作业迁移改造的数量、预估迁移后的成本,从而对迁移工作进行整体评估和决策
数据迁移自动化
(2)对 Hive Meta及数据进行检測描,自动在 MaxCompute 创建对应的 Meta,同时根据不同的网络环境,将 Hive 的数据自动转换并高吞吐地加载到 MaxCompute 上,支持从 TB 级到 PB 级数据的迁移上云
作业兼容性分析
对 Hive 作业进行兼容性分析,识别出需要修改的任务并提供针对性的兼容性修改建议。对于用自定义辑的分析任务,如 UDF、MR/Spark 作业等,我们将给出一般性的改造建议供用戶参考(语法分析)
工作流迁移
对主流数据集成工具 Sqoop 进行作业的迁移转换,并自动创建 Dataworks 数据集成作业:支持主流 Pipeline 工具,如 Ooie、Azkabar、Airflov 等自动迁移转化,井自动创建为 Dataworks 工作流及调度作业
左边是客户的集群,右边是阿里云的大数据服务,主要是 Dataworks 和MaxCompute,左边的那个工具会跑在客户集群上面,需要在客户服务器上,这台服务器需要能访问你的 hive,在这样一条机器上你需要部署你的MMA后端工具,然后这个工具会帮你自动的去获取 hive meta 里的数据,还支持把 meta 信息自动转换成MaxComputer 的地点 L,然后用地点L批量的在 MaxComputer 创建表以上为前三步。
然后批量创建完表后,我们会批量的拉起数据和作业,这个作业其实就是像你的这个hive 上去提交 hivecircle 作业,然后调用 UDF,UDF 上会集成我们的 FDK,然后把数据批量写到 MaxComputer 上,完成数据的迁移。作业和工作流,是基于 MMA 工具自动发现的 hivemeta 数据,基于这个数据会做一整个工作流的一些作业的检查,包括批量把一些自己的工作流的组件的配置转化成 Datawork 的配置,这样的话他会直接给你生成 Datawork 的工作流,这就完成了整个,数据作业到工作流的迁移。
客户端加服务端的共同努力,才完成了数据加客户流的批量迁移
3、MMA Agent 的工作流程
1、Metadata 抓取
2、MaxCompute DDL 与 Hive UDTF 生成
3、MaxCompute 表创建
4、Hive 数据迁移
客户端实现的自动 Meta 获取已经批量批量创建表且同步数据的工作原理图
有四个主要组件,第一个是 Meta Carrier 工具,这个工具实际上是要连接到你的HiveMeta 里的,它自动会把 Meta 信息给拉出来,然后在本地生成一个 HiveMeta 的结构。
第二个叫 MetaProcessors 是基于第一个工具产生的结果 HiveMeta 的数据将它批量转成 Meta Processor 的 DDL,即建表语句,包括了数据类型的转换等
第三步我们整个工具内置了 ODPS Console,基于 Console 把第二步产生的 ODPS DDL 批量的通过 Console 在 MaxCompute 上创建表
最后一步基于 Data Carrier 批量的创建 Hive circle 的作业,相当于多个表运行的数据的同步。
如何用MMA
环境准备:
1、jdk1.6+
2、Python3+
3、Hive Client
4、能访问Hive Server
5、网络连接MaxCompute
下载和编译工具包
有两种方法,一种直接提供编译好的工具包下载,另一种根据官网上提供的github 地址去登陆源码。
1、下载源:
切换到 odps-datacarrier--develop 分支,
https://github.com/aliyun/aliyun-maxcompute-data collectors?spm=a2c4g.11186623.2.8.422c4c07MdilpQ
2、解压下载的
Jaliyun-maxcompute-data--collectorsodps-datacarrier-develop.zip 文件
3、在控制台运行 odps-data--carrier 目录下的 build.py 文件,编译生成 MMA 工具
二、MMA数据迁移操作演示
MMA Agent 操作说明 Hive Metadata 自动采集
1、使用meta-carrier采集Hive Metadata
解压工具包:odps-data-carrier.zip,工具目录结构如下:
odps-data-carrier/
bin
meta-carrier //用于获取Hive setadata的工具
meta-processor // 用于生成ODPS DDL及Hive UDTF SQL odps_ddl_runner.py //用于运行ODPS DDL的工具
hive_udtf_sql_runner.py //用于运行Hive UDTF SQL的工具
network-seasuresent-tool // 用于检查用户网络环境、选择合适的endpoint的工具
sql-checker // 用于检查用户Hive SQL是否能直接在ODPS上运行的工具,语法检查
odps_config.ini // 用户ODPS配置
extra_settings.ini // 数据迁移时需要增加的额外Hadoop/Hive配置
获取Hive metadata
Usage:
Meta-carrier-u <uri> -o <output dir> {-h} {-d <database>} {-t <table>}
-h,--help // 打印help选项
-o,--output-dir <output-dir> //必填,指定一个输出的目录
-u,--uri <uri // 必填,指定hive metastore service的thrift地址
-d,--database <database> // 可选,指定一个database
-t,--table <table> // 可选,指定一个table
输入地址:
root@emr-header-2 odps-data-carrier]# bin/meta-carrier-u thrift://127.0.0.1:9083 -d dma_demo -o meta
它将把 hive meta 运行结果存放到指定文档中去。运行结果得到一个 meta 目录
Meta/
dma_demo.json//用来描述数据库的
partition_meta
Inventory.json
test_partition.json
table_meta//将所有表集的信息写在下面
catalog_sales.json
inventory.json
store_sales.json
test_partition.json
web_sales.json
3 directories,9 files整个库的
{root@emr-header-1odps-data-carrier}#cat meta/dma_demo.json
{
“databaseName”:”dma_demo”.//json结构有三个name名字
“odpsprojectName”:”dma_demo”,//
“dropTableIfExists”:false//如果不一致可以在这修改,默认没有打开。
}{root@emr-header-1odps-data-carrier}#cat meta/dma_demo/dma-d
以上为库的字段信息,还有表的也是如此。
将 hive meta 库的信息拉出来后,下一步将 hive meta 转化为 Max compute 地点。用下一个命令
{root@emr-header-1odps-data-carrier}#bin/meta-processor-h
[root@emr-header-1odps-data-carrier]#bin/meta-processor -meta-processor -i meta -o output
会生成一个 output 目录。
Example:
导出全部 metadata:
Sh meta-carrier -u thrift://127.0.0.1:9083 -o meta
会生成一个 meta 目录,
导出某个 database 的 metadata:
Sh meta-carrier -u thrift://127.0.0.1:9083 -o meta -d test_db
导出某张表的 metadata:
Sh meta-carrier -u thrift://127.0.0.1:9083 -o meta -d test_db -t test_tbl
结果 hive metadata 的目录结构
output:
输出一个目录,结构如下:
{output directory}
global.json
{database name}
{database name}.}json
table_meta
{table name}.json
partition_seta
{table name}.}json
说明:
(1)、global.json 是一个全局的配置文件,包含了整个迁移过程中的一些配置
(2)、每一个 database 会有一个独立的目录
(3)、每一个表有一个以表命名的 json 文件
(4)、如果是分区表,还会有一个以表名为命名的 partition 的 json 文件
接下来应该批量创建表,批量创建表用 python 命令
[root@emr-header-1odps-data-carrier]#python36 bin/odps_ddl_runner.py -h
Usage:odps_ddl_runner.py [-h] –input INPUT [--odpscmd ODPSCMD]
Run ODPS DDL automatically.
Optional arguments:
-h,--help show this help messaged and exit
--input INPUT path to directory generated by meta
processor
--odpscmd ODPSCMD path to odpscmd executable//不是必须指定的,不需要单独配置参数
[root@emr-header-1odps-data-carrier]#
[root@emr-header-1odps-data-carrier]#
[root@emr-header-1odps-data-carrier]# python36 bin/odps_ddl_runner.py –input output//批量创建表的输入
用odps@ dma_demo>show tables: //去查看是否创建好表
然后去hive中查看分区是否建好:
odps@ dma_demo>show partitions inventory:
查看表结构:
odps@ dma_demo>desc catalog_sales:
表创建好后就可清除数据:
[root@emr-header-1odps-data-carrier]# python36 bin/hive_udtf_sql_runner.py
Run hive UDTF SQL automatically.
optional arguments:
-h,--help show this help message and exit
—input_all INPUT_ALL
path to directory generated by meta processor//参数如下
-input_single_file INPUT_SINGLE_FILE
path to a single sql file
-settings SETTINGS path to extra settings to set before running a hive sql
-parallelism PARALLELISM
max paralelism of running hive sql
[root@emr-header-1odps-data- carrier]# Python36 bin/hive_udtfsql_runnerpy-input_all output
数据迁移完后,如果数据与 hive 中相同,则说明数据完整的迁移过去了。
做单分区来测试单分区的迁移
2、使用 network-measurement-tool
测试 Hadoop 集群到 MaxCompute 各 Region 的网络连通质量
测试网络上下行传输速率
使用方法
Usage:
Network-measure-tool –mode FIND|TEST
Options:
--mode<mode> FIND (find avaitable endpoints)
Or TEST (test performance of a
Single endpoint)
--endpoint <endpoint> ODPS endpoint,required inTEST
mode
-h.—help Print help information
-p,--access-key <access-key> ODPS access key, required in
TEST mode
--project <project> ODPS project name, required in
TEST mode
-t,--num-thread <num-thread> Number of thread
--tunnel-endpoint <tunnel-endpoint> ODPS tunnel endpoint,optional
-u,--access-id <access-id> ODPS access id,required in TEST
Mode
Example:
Example;
查找可用endpoint:
Odps-data-carrier/bin/network-measurement-tool—mode find
Output:
各个 endpoint 的连接情况(只输出可以连接的):
ENDPOINT:
EXTERNAL-BEIJING:
http://service.cn.maxcompute.aliyum.com/api
AVAILABILITY: ture
ELAPSED TINE(ms):5
Example:
测试某个 endpoint 的读写性能:
Odps-data-carrier/bin/network-measurement-tool –mode test \
--endpoint <endpoint to test>\
-u <access id> -p <access key> \
--project=<project name> --num-thread <number of thread>
3、使用meta-processor生成ODPS DDL和Hive UDTF SQL
修改 globle.json,自定义表、字段的生成规则:
globle.json:
{
“datasourceType” :”Hive”,//目前仅支持Hive
“odpsVersion” “:1.0”,//默认ODPS版本为1.0
“hiveCompatible” : false // 在ODPS.2.0下,是否打开hive compatible开关
}
[database.name].json
{
“databaseName” : XXX,//Hive数据库名
“odpsProjectName” : XXX //对应的ODPS项目名,默认与Hive数据库名相同
}
[table name].json:
{
“tableName” : XXX,//Hive表名
“odpsTableName” : “XXX “//对应的ODPS表名,默认与Hive表名相同
“lifeCycle” : 10, // ODPS表的life cycle,默认为空,即不启用life cycle
“commene” : “XXX”, //ODPS表的comment,默认为空
“ifNotExists” : ture, // 创建ODPS表时是否加if not exists,默认不加
“cotumns” : {
{
“names” :column_l”,//Hive列名
“odpsColumName”, : “odps_column_1”,//对应的odps列名,默认与Hive列名相同 “type” : “bigint”,//ODPS列的类型,用户暂时不可自行修改
“comment” : “XXX” //ODPS列的comment
},
…
],
“partitioncolumns” : [
{
“name” : “column_1”,//Hive分区判名
“odpsColumnName” : “odps_column_1”,//对应的ODPS分区判名,默认与Hive分区判名相同
“type” : “bigint”,//ODPS分区判类型
“comment” : “xxx” //ODPS分区列的comment
},
…
]
}
生成 ODPS DDL 和 Hive UDTF SQL 了,用法及结果如下:
Usage:
./meta-processor -I <metadata directory> -o <output> directory>
Options:
-h,--help print help information
-I,--input-dir <input-dir> Directary generated by meta carrier
-o,--output-dir <output-dir> output directory generated by meta processor
Example:
./meta_processor -I meta -o output
output:
A directory, its structure is as fellows:
[output directory]
Report.htm]
[database name]
odps_ddl
tbles
{table name}.sql
partitions
{table name}.sql
hive_udtf_sql
single_partition
{table name}.sql
multi_partitions
{table name}.sql
4、使用 sql-checker 检查 Hive SQL 是否可以直接在 MaxCompute 执行
输出SQL是否存在语法错误及修改建议:
Usage:
Sql-checker -I <metadata dir> -p <default odps project> --sql to check>{--settings
Options:
-h,--help print help information
-i ,--input-dir <input-dir> Directory generated by seta carrier,which
Contains all the metadata
-p,--project <project> Ddfault odps project
--settings <settings> odps settings
--sql <sql> sql statements to check
Example:
Sql-checker -I meta -p odps_test -sql “select *from tbl_i”
--settings odps.sql,type, system,odps=ture
Output:
Sql 是否有语法或语义错误,错误的详细信息及修改建议,如:
Issues:
0.
Compatibility: ERROR
Description: {line 1,col 15} table odps_DATA_CARRIER.string_types cannot be resolved
Suggestion: null
5、使用odps_ddl_runner.py批量创建表和分区
ODPS DDL 创建好以后,运行 odps_ddl_runner.py,将会遍历 meta-processor 生成的目录,调用 odpscmd 自动创建表与分区:
Usage:
Python3 odps_ddl_runner.py [-h] –input INPUT [--odpscmd ODPSCMD]
Options:
--input <input-dri> Directory generated by meta processor
--odpscmd <path to opdscmd executable> optional, user can use
Their installed odpscmd to execute odps ddl
Example:
Python3 odps_ddl_runner.py –input input –odpscmd /opt/console/bin/odpscmd
Output:
执行过程中的日志,如:
INF:exectuing ‘console/bin/odpscmd -f output/test_types/odps_ddl/tables/dummy.sql
DEBUG: stdout: b’’
DEBUG: stderr:b’ok\nID =20190505071530558gyang67a\nok’
DEBUG: returncode: 0
6、使用 hive_udtf_sql_runner.py 迁移数据
表和分区创建完成后,运行 hive_udtf_sql_runner.py, 将数据从 hive 上传至MaxCompute.
Hive_udtf_sql_runner.py 有两种模式,第一种将会遍历 meta-processor 生成的目录,调用 hive client 运行 hive udtf sql,从而将数据从 hive 上传至 ODPS。第二种模式需要用户手动指定一个 hive sql 文件。
Usage:
Python3 hive_udtf_sql_runner.py {-h} {--input_all INPUT_ALL}
{--input_single_file INPUT_SINGLE_FILE}
{--settings SETTINGS}
options:
--input_all <input-dir> 自动运行<input-dir>下的所有 hive.sql 文件,将会自动迁移所有的数据
--input_single_file 运行指定的 hive sql文件,只会迁移指定的表成分区
--settings 额外的Hadoop/Hive配置,默认用odps-data-carrier/extra_settings
Example:
执行 meta-processor 生成的所有 hive sql:
python3 hive_udtf_sql_runner.py –-input_all processed
迁移单个表成分区:
python3 hive_udtf_sql_runner.py –input_single_file/path/to/hive_sql.sql
两个小工具:
[root@emr-header-1 odps-data-carier]# bin/network-
measurement-tool --mode FIND//它能连接max所有的地址,
会把最快的到最慢的做一个排序。
[root@emr-header-1 odps-data-carier]# bin/sql-checker -I meta –project dma_demo –sql “select distinct(cs_item_sk)
From catalog_sales;”
可用于兼容性的检查*
进阶功能1:仅生成指定database或table的metadata
在前面的 Demo 中,我们抓去了 hive 中所有 database 和表的 metadata,但在很多环境下,我们倾向于一次处理一个 database 或一张表,因此 meta-carrier 工具提供了抓取指定 database 或 table 的 metadata 的能力:
执行:
shodps-data-carrier/bin/meta-carier-u thrift://127.0.0.1:9083-d test-t test-o meta
可以看到,这里我们生成的 metadata 仅包含了 test.test 这张表
进阶功能2:仅仅灵活的hive到maxcompute映射
在前面的 Demo 中,我们将 hive 的 test.test 表映射到 mc 中ODPS_DATA_CARRIER_TEST.test这张表,然而,我们提供了更强大的能力,比如说修改 hive 表到 mc 的表明与列名映射,设置 mc 中表的 life cycle,增加 comment,等等(可直接修改 json 文件名)
用户可以编辑 meta-carrier 生成的 metadata 来做到上述的事情,可编辑的部分如下:
globle.json:
{
“datasourceType” :”Hive”,//目前仅支持Hive
“odpsVersion” “:1.0”,//默认ODPS版本为1.0
“hiveCompatible” : false // 在ODPS.2.0下,是否打开hive compatible开关
}
[database.name].json
{
“databaseName” : XXX,//Hive数据库名
“odpsProjectName” : XXX //对应的ODPS项目名,默认与Hive数据库名相同
}
[table name].json:
{
“tableName” : XXX,//Hive表名
“odpsTableName” : “XXX “//对应的ODPS表名,默认与Hive表名相同
“lifeCycle” : 10, // ODPS表的life cycle,默认为空,即不启用life cycle
“commene” : “XXX”, //ODPS表的comment,默认为空
“ifNotExists” : ture, // 创建ODPS表时是否加if not exists,默认不加
“cotumns” : {
{
“names” :column_l”,//Hive列名
“odpsColumName”, : “odps_column_1”,//对应的odps列名,默认与Hive列名相同 “type” : “bigint”,//ODPS列的类型,用户暂时不可自行修改
“comment” : “XXX” //ODPS列的comment
},
…
],
partitioncolumns” : [
{
name” : “column_1”,//Hive分区判名
odpsColumnName” : “odps_column_1”,//对应的ODPS分区判名,默认与Hive分区判名相同
“type” : “bigint”,//ODPS分区判类型
comment” : “xxx” //ODPS分区列的comment
},
…
]
}
使用Dataworks自动迁移数据和工作流
工作流的迁移:
先给到目录模板,然后客户根据模板做目录,如果有相应的开源的磕放入目录中去。
1、安装 MMA Agent 客户端工具:采集 Metadata&生成 ODPS DDL
参照【MMAAgent 操作说明】的第1、2步骤。
先给到目录模板,然后客户根据模板做目录,如果有相应的开源的磕放入目录中去。
2、上传 Dataworks 项目描述文件
根据模板(参见右图)生成 DataWorks 项目描述文档,打包为:dataworks_project.tgz 上传到 Dataworks。
【注意】:一期仅支持:1)打包文件手动上传:2)支持OOIZE调度引擎的配置模板和 Dataworks 工作流配置模板。
上传完成后,Dataworks 服务会根据 ODPSDDL 批量生成 MaxComputegtable
MaxCompute 的表创建完成后,Dataworks 服务会自动拉起 Datax 的数据同步任务,完成批量数据迁移。
3、项目描述文件(/project.xml)说明
tenantId:用户在 dataworks 上的租户ID:
name:用户事先在 dataworks 上创建好的项目空间名称:
owner:用户的阿里云账号 ID。
4、工作流描述文件(/workflow.xml)说明
节点类型说明
type |
DataWorks对应概念 |
DI |
数据集成类型的节点 |
VIRTUAL |
虚拟节点 |
Odps_SQL |
Max Compute SQL典型节点 |
其他类型作业迁移方案
UDF、MR 迁移
支持相同逻辑的 UDF、MR 输入,输出参数的映射转换,但 UDF 和 MR 内部逻辑需要客户自己维护。(需要开启 hive 兼容的 flag)
注意】:不支持在 UDF、MR 中直接访问文件系统、网络访问、外部数据源连接。
外表迁移
原则上全部迁到 MaxCompute 内部表。
如果必须通过外表访问外部文件,建议先将文件迁移到 OSS 或者 OTS,在MaxCompute 中创建外部表,实现对文件的访问。
【注意]:MaxCompute 外部表支持的格式包括:ORC、.PARQUET、SEQUENCEFILE. 、RCFILE、.AVRO和TEXTFILE。
Spark 作业迁移
1.【作业无需访问 MaxCompute 表和 OSS】用户 jar 包可直接运行,参照《MaxComputeSpark 开发指南》第二节准备开发环境和修改配置。注意,对于spark或 hadoop 的依赖必须设成 provide。
2.【作业需要访问 MaxCompute 表】参考《MaxCompute Spark 开发指南》第三节编译 datasource 并安装到本地 maven 仓库,在 pom 中添加依赖后重新打包即可。
3.【作业需要访问 OSS】参考《MaxComputeSpark 开发指南》第四节在pom中添加依赖后重新打包即可。
查看迁移评估报告
使用 MMAAgent 获得评估报告(数据类型的不兼容,语法类型的不兼容)
报告中将搬站风险分为两档,高风险(HIGH RISK)与中等风险(MODERATE RISK)
高风险意味着必须人工介入,例如出现了表名冲突,ODPS完全不支持的类型等问题。
中等风险意味着迁移过程中可以自动处理,但是需要告知用户的潜在风险,例如Hive数据类型到 ODPS 数据类型会带来的精度损失等问题。