JSON数据从OSS迁移到MaxCompute最佳实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文为您介绍如何利用DataWorks数据集成将JSON数据从OSS迁移到MaxCompute,并使用MaxCompute内置字符串函数GET_JSON_OBJECT提取JSON信息。

本文为您介绍如何利用DataWorks数据集成将JSON数据从OSS迁移到MaxCompute,并使用MaxCompute内置字符串函数GET_JSON_OBJECT提取JSON信息。

数据上传OSS

将您的JSON文件重命名后缀为TXT文件,并上传到OSS。本文中使用的JSON文件示例如下。

{
    "store": {
        "book": [
             {
                "category": "reference",
                "author": "Nigel Rees",
                "title": "Sayings of the Century",
                "price": 8.95
             },
             {
                "category": "fiction",
                "author": "Evelyn Waugh",
                "title": "Sword of Honour",
                "price": 12.99
             },
             {
                 "category": "fiction",
                 "author": "J. R. R. Tolkien",
                 "title": "The Lord of the Rings",
                 "isbn": "0-395-19395-8",
                 "price": 22.99
             }
          ],
          "bicycle": {
              "color": "red",
              "price": 19.95
          }
    },
    "expensive": 10
}
将applog.txt文件上传到OSS,本文中OSS Bucket位于华东2区。 

使用DataWorks导入数据到MaxCompute

  1. 新增OSS数据源
    进入DataWorks 数据集成控制台,新增 OSS类型数据源。 

    具体参数如下所示,测试数据源连通性通过即可点击完成。Endpoint地址请参见 OSS各区域的外网、内网地址,本例中为 http://oss-cn-shanghai.aliyuncs.com或  http://oss-cn-shanghai-internal.aliyuncs.com(由于本文中OSS和DataWorks项目处于同一个region中,本文选用后者,通过内网连接)。 

  2. 新建数据同步任务
    在DataWorks上新建 数据同步类型节点。 

    新建的同时,在DataWorks新建一个 建表任务,用于存放JSON数据,本例中新建表名为mqdata。 

    表参数可以通过图形化界面完成。本例中mqdata表仅有一列,类型为string,列名为MQ data。 

    完成上述新建后,您可以在图形化界面配置数据同步任务参数,如下图所示。选择目标数据源名称为odps_first,选择目标表为刚建立的mqdata。数据来源类型为OSS,Object前缀可填写文件路径及名称。列分隔符使用TXT文件中不存在的字符即可,本文中使用  ^(对于OSS中的TXT格式数据源,Dataworks支持多字符分隔符,所以您可以使用例如  %&%#^$$^%这样很难出现的字符作为列分隔符,保证分割为一列)。 

    映射方式选择默认的同行映射即可。 

    点击左上方的切换脚本按钮,切换为脚本模式。修改fileFormat参数为:  "fileFormat":"binary"。该步骤可以保证OSS中的JSON文件同步到MaxCompute之后存在同一行数据中,即为一个字段。其他参数保持不变,脚本模式代码示例如下。
    
    {
        "type": "job",
        "steps": [
            {
                "stepType": "oss",
                "parameter": {
                    "fieldDelimiterOrigin": "^",
                    "nullFormat": "",
                    "compress": "",
                    "datasource": "OSS_userlog",
                    "column": [
                        {
                            "name": 0,
                            "type": "string",
                            "index": 0
                        }
                    ],
                    "skipHeader": "false",
                    "encoding": "UTF-8",
                    "fieldDelimiter": "^",
                    "fileFormat": "binary",
                    "object": [
                        "applog.txt"
                    ]
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "isCompress": false,
                    "truncate": true,
                    "datasource": "odps_first",
                    "column": [
                        "mqdata"
                    ],
                    "emptyAsNull": false,
                    "table": "mqdata"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "concurrent": 2,
                "throttle": false,
                "dmu": 1
            }
        }
    }
    完成上述配置后,点击运行接即可。运行成功日志示例如下所示。 

获取JSON字段信息

在您的 业务流程中新建一个ODPS SQL节点。 
 
您可以首先输入  SELECT*from mqdata;语句,查看当前mqdata表中数据。当然这一步及后续步骤,您也可以直接在 MaxCompute客户端中输入命令运行。 

确认导入表中的数据结果无误后,您可以使用MaxCompute内建字符串函数 GET_JSON_OBJECT获取您想要的JSON数据。本例中使用  SELECT GET_JSON_OBJECT(mqdata.MQdata,'$.expensive') FROM mqdata;获取JSON文件中的  expensive值。如下图所示,可以看到已成功获取数据。 
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标  通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群  企业数据仓库开发人员  大数据平台开发人员  数据分析师  大数据运维人员  对于大数据平台、数据中台产品感兴趣的开发者
目录
相关文章
|
2月前
|
分布式计算 DataWorks 调度
oss数据同步maxcompute报错
在使用阿里云DataWorks同步OSS数据至MaxCompute时,遇到“Input is not in the .gz format”的报错。问题源于目标目录中存在一个空文件,导致同步时识别错误。
|
3天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
10天前
|
算法 大数据 数据库
云计算与大数据平台的数据库迁移与同步
本文详细介绍了云计算与大数据平台的数据库迁移与同步的核心概念、算法原理、具体操作步骤、数学模型公式、代码实例及未来发展趋势与挑战。涵盖全量与增量迁移、一致性与异步复制等内容,旨在帮助读者全面了解并应对相关技术挑战。
19 3
|
7天前
|
数据采集 分布式计算 OLAP
最佳实践:AnalyticDB在企业级大数据分析中的应用案例
【10月更文挑战第22天】在数字化转型的大潮中,企业对数据的依赖程度越来越高。如何高效地处理和分析海量数据,从中提取有价值的洞察,成为企业竞争力的关键。作为阿里云推出的一款实时OLAP数据库服务,AnalyticDB(ADB)凭借其强大的数据处理能力和亚秒级的查询响应时间,已经在多个行业和业务场景中得到了广泛应用。本文将从个人的角度出发,分享多个成功案例,展示AnalyticDB如何助力企业在广告投放效果分析、用户行为追踪、财务报表生成等领域实现高效的数据处理与洞察发现。
26 0
|
2月前
|
负载均衡 Java 对象存储
负载均衡策略:Spring Cloud与Netflix OSS的最佳实践
负载均衡策略:Spring Cloud与Netflix OSS的最佳实践
45 2
|
3月前
|
机器学习/深度学习 存储 人工智能
自动化是解决大数据迁移障碍的答案
自动化是解决大数据迁移障碍的答案
|
3月前
|
安全 关系型数据库 MySQL
揭秘MySQL海量数据迁移终极秘籍:从逻辑备份到物理复制,解锁大数据迁移的高效与安全之道
【8月更文挑战第2天】MySQL数据量很大的数据库迁移最优方案
576 17
|
4月前
|
存储 JSON 自然语言处理
OSS数据源一站式RAG最佳实践
本文介绍了如何使用OpenSearch LLM智能问答版通过OSS数据源一站式构建RAG系统。
7149 11
|
4月前
|
存储 分布式计算 监控
日志数据投递到MaxCompute最佳实践
日志服务采集到日志后,有时需要将日志投递至MaxCompute的表中进行存储与分析。本文主要向用户介绍将数据投递到MaxCompute完整流程,方便用户快速实现数据投递至MaxCompute。
186 2
|
4月前
|
分布式计算 DataWorks 数据处理
MaxCompute操作报错合集之UDF访问OSS,配置白名单后出现报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute