Dataworks同步数据到X-pack Spark

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介:

简介

本文主要介绍如何通过“Dataworks->数据集成->离线同步”把数据同步到X-pack Spark的hdfs上。同步数据到X-pack的hdfs后,就可以使用X-pack Spark对数据进行分析。
本例通过把Dataworks的一张表同步到X-pack Spark的hadfs为例,介绍如何同步数据。

前置条件

  1. X-pack Spark集群已经开通hdfs端口。需要联系X-pack Spark维护人员:“云X-Pack Spark答疑” 开通。

操作步骤

在Dataworks中创建“独享数据集成资源组”

X-pack Spark的hdfs是在VPC内,Dataworks要求一定要使用“独享数据集成资源组”才可以同步数据。
假设创建的“独享数据集成资源组”的名称为:test_cluster,如下图:
image
注意:可用区应要选择和X-pack Spark一样的可用区
独享数据集成资源组的创建详细指导请参考Dataworks官方文档:“独享资源组

对“独享数据集成资源组”进行“专有网络绑定”

创建完“独享数据集成资源组”之后需要对其操作“专有网络绑定”,如下图:
image
注意:“专有网络” 一定要选择和X-pack Spark相同的专有网络。 “交换机”和“安全组”建议选择和X-pack Spark相同的。(本例选择相同的)

在“X-pack Spark”中配置Dataworks的白名单。

需要在X-pack Spark中配置Dataworks的白名单,Dataworks才能访问到X-pack Spark。
打开上一步骤中绑定的“交换机”, 查看交换机的“IPv4网段”, 把“IPv4网段”对应的IP断添加到X-pack Spark的白名单中。
image

配置Dataworks的安全组出入端口。

“独享数据集成资源组”绑定“安全组”后,需要配置安全组的出入端口,保证“独享数据集成资源组”可以访问到X-pack Spark的hdfs。 需要打开8020和50070端口。

在Dataworks中创建表。

在Dataworks创建表test01,数据如下:

CREATE TABLE IF NOT EXISTS test01
(
    id   STRING,
    name STRING
) 
insert into test01 values('a', 'b')

在Dataworks中创建“离线同步”。

创建离线同步把数据表test01同步到X-pack Spark的hdfs中。Dataworks不支持到hdfs的向导配置,需要切换到“脚本模式”配置任务。脚本内容如下:

{
    "type": "job",
    "steps": [
        {
            "stepType": "odps",
            "parameter": {
                "partition": [],
                "datasource": "odps_first",
                "column": [
                    "*"
                ],
                "guid": null,
                "emptyAsNull": false,
                "table": "test01"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "hdfs",
            "parameter": {
                "path": "/tmp",
                "fileName": "test01.txt",
                "compress": "GZIP",
                "defaultFS": "hdfs://${spark集群id}",
                "hadoopConfig": {
                    "dfs.ha.automatic-failover.enabled.${spark集群id}": true,
                    "dfs.namenode.http-address.${spark集群id}.nn1": "${spark集群id}-master1-001.spark.rds.aliyuncs.com:50070",
                    "dfs.namenode.http-address.${spark集群id}.nn2": "${spark集群id}-master2-001.spark.rds.aliyuncs.com:50070",
                    "dfs.client.failover.proxy.provider.${spark集群id}": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
                    "dfs.nameservices": "${spark集群id}",
                    "dfs.ha.namenodes.${spark集群id}": "nn1,nn2",
                    "dfs.namenode.rpc-address.${spark集群id}.nn1": "ap-wz9t69njoc3xzt65y-master1-001.spark.rds.aliyuncs.com:8020",
                    "dfs.namenode.rpc-address.${spark集群id}.nn2": "ap-wz9t69njoc3xzt65y-master2-001.spark.rds.aliyuncs.com:8020"
                },
                "column": [
                    {
                        "name": "col1",
                        "type": "string"
                    },
                    {
                        "name": "col2",
                        "type": "string"
                    }
                ],
                "writeMode": "append",
                "encoding": "UTF-8",
                "fieldDelimiter": ",",
                "fileType": "text"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "concurrent": 2,
            "throttle": false
        }
    }
}

脚本说明:
"datasource": "odps_first": Dataworks默认创建的数据源。
"table": "test01": Dataworks的数据表:test01
"path": "/tmp": 写入数据到hdfs的路径。
"fileName": "test01.txt":写入数据到hdfs的文件名称。
"compress": "GZIP": 写入到hdfs的文件压缩格式。
**"defaultFS": "hdfs://${spark集群id}"**: X-pack Spark hdfs集群的defaultFS,需要把${spark集群id}替换成自己的X-pack Spark集群ID。
"hadoopConfig": {xxx}: X-pack Spark hdfs集群的HA 的配置信息,需要把内容中的${spark集群id}替换成自己的X-pack Spark集群ID。
脚本配置完后,需要“配置任务资源组”, 点击“配置任务资源组” 选择第一步创建的“独享数据集成资源组”:test_cluster。 如下图:
image

在Dataworks中运行查看效果。

配置完毕后点击“运行”,等待运行成功。然后查询X-pack Spark的hdfs中是否有文件写入,当出现如下文件时,说明写入成功:
image

小结

数据同步到X-pack Spark hdfs 后可以同步X-pack Spark 控制分析hdfs的数据了。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标  通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群  企业数据仓库开发人员  大数据平台开发人员  数据分析师  大数据运维人员  对于大数据平台、数据中台产品感兴趣的开发者
相关文章
|
18天前
|
数据采集 DataWorks 数据挖掘
提升数据分析效率:DataWorks在企业级数据治理中的应用
【8月更文第25天】本文将探讨阿里巴巴云的DataWorks平台如何通过建立统一的数据标准、规范以及实现数据质量监控和元数据管理来提高企业的数据分析效率。我们将通过具体的案例研究和技术实践来展示DataWorks如何简化数据处理流程,减少成本,并加速业务决策。
143 54
|
18天前
|
SQL 分布式计算 DataWorks
利用DataWorks构建高效数据管道
【8月更文第25天】本文将详细介绍如何使用阿里云 DataWorks 的数据集成服务来高效地收集、清洗、转换和加载数据。我们将通过实际的代码示例和最佳实践来展示如何快速构建 ETL 流程,并确保数据管道的稳定性和可靠性。
135 56
|
24天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之ODPS数据怎么Merge到MySQL数据库
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
24天前
|
DataWorks 关系型数据库 MySQL
DataWorks产品使用合集之mysql节点如何插入数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
24天前
|
数据采集 JSON DataWorks
DataWorks产品使用合集之支持哪些数据引擎
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
24天前
|
分布式计算 DataWorks NoSQL
DataWorks产品使用合集之怎么在同步脚本里进行列转行
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
24天前
|
数据采集 DataWorks 安全
DataWorks产品使用合集之如何实现数据过滤
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
24天前
|
存储 运维 DataWorks
DataWorks产品使用合集之怎么实现时间字段进行分区同步
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
24天前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之如何对数据进行保护
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
24天前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理在DI节点同步到OceanBase数据库时,出现SQLException: Not supported feature or function
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。