MaxCompute实践

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 一、写在前面 本人之前一直从事程序开发的(PHP、JAVA、Python)工作,在之前的工作经历中有过一段时间配合Hadoop工程师的事务,但接触的并不深,只能说略知点皮毛,有点管中窥豹的感觉。 今年进了新公司,因为公司正在组建新的数据部门,非常有幸本人得以调入该部门,恰逢MaxCompute横空出世,刚好因为我那时工作比较空闲,得以安排调研它的功能及测试是否符合我们的要求。

一、写在前面

本人之前一直从事程序开发的(PHP、JAVA、Python)工作,在之前的工作经历中有过一段时间配合Hadoop工程师的事务,但接触的并不深,只能说略知点皮毛,有点管中窥豹的感觉。
今年进了新公司,因为公司正在组建新的数据部门,非常有幸本人得以调入该部门,恰逢MaxCompute横空出世,刚好因为我那时工作比较空闲,得以安排调研它的功能及测试是否符合我们的要求。
由于个人对大数据这块的经验不足,涉及的内容也不是太高端的,所得知识基本是通过阅读文档获得,所以本文仅以初学者的角度去阐述,主要围绕数据迁移这块,其他部分还未来得急涉及,如有不正确的地方,还请指出,谢谢。

二、平台体系的选型

因初期数据量相对较小,使用Kettle进行抽取数据等工作,ETL的工作大部分在MySQL数据仓库中完成。多种数据源使用Presto(集群)作为查询中间键进行相应的数据分析。但随着业务的疯狂增长,数据表单表达到数亿后,磁盘容量达数几百GB时,数据要求的复杂度逐步提升,使用MySQL作为基础数据仓库的基石已经不足以应付,常出现查询响应时间等待过长,甚至内存崩溃导致执行失败的情况,极大的影响了工作效率。所以使用选择一款大数据平台势在必行。初期考虑使用Hadoop来做进行数据分析平台,恰逢MaxCompute横空出世给我们一个新的选择,在经过仔细比对后,考虑到公司新数据部门刚成立不久,Hadoop大数据相关人才储备较为紧缺,如果等待Hadoop体系完成搭建并使用,可能需要更多的时间,而这势必会拖慢之后的工作进度。而如果使用MaxCompute相对于自建Haoop数据平台要简单,快速的多,但任何产品在正式使用前必然需要进行详细的调研,所以我就开始调研的过程。

三、数据迁移

3.1 数据源类型介绍

目前我们主要有两种数据源的数据需要放至MaxCompute中进行处理,一种为MySQL,一种为MongoDB。在此主要描述MySQL相关的,至于MongoDB嘛,看到最后你就会知道了^_^。

3.2 迁移工具的选择

虽然MaxCompute有多种迁移方案可供选择,但如果使用大数据开发套件上的脚本模式能够有效完成的话,个人还是比较倾向使用大数据开发套件去完成,除非在此之上无法完成。因为相对于自己使用SDK直接同步或其他工具去实现,大数据开发平台要简单快速的多,任务调度系统完善,任务运行情况相对清晰便于查错,之后更多任务可以灵活组合搭配。

3.3 同步策略介绍

数据主要分为会变化的数据(有修改+有增加)与不会变化的数据(只增加),官方文档中是建议每天全量的同步策略,即每天的数据作为一个副本存至同步日期为分区,这样做确实有很多的好处。
但实际情况可能由于数据量过大,每天同步可能会花更多的时间(测试过1亿数据大概在3~4小时左右,脚本参数配置speed": { "concurrent": "1", "mbps": "1" }),因为我们业务中不存在DELTE操作,所以我们这里处理不管数据是否变化都使用每日增量的方式处理,最终按数据的创建日期存放在对应的分区(前期也不知道怎么设计,如何设计最佳,所以就先设计为1级分区),虽然这样需要多做一步合并操作,多耗费一些资源费用,但确实是实现我们的要求。

3.4 步骤介绍

在“大数据开发套件”中同步数据主要使用时间段的方式进行同步,主要分为两个步骤,第一步为手动全量同步,第二步为自动每日增量。如果想让数据的每条数据按数据的创建时间存放到对应的日期的话,则需要在此基础上进行细化。针对不同的数据(会变化与不会变化)的处理方式也各有所不同,基本上是按照文档中所描述的进行了,只是稍微有些不同。

3.5 举例说明

3.5.1 准备工作

  • 假设当前日期为: 2017-07-13

  • Mysql表信息
    数据表名: test
    创建时间字段: created_at
    更改时间字段: updated_at
    时间字段值: unix时间戳
    表结构定义如下:

    CREATE TABLE `test` 
    (
    `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户ID',
    `name` varchar(10) NOT NULL COMMENT '姓名',
    `age` tinyint(4) NOT NULL COMMENT '年龄',
    `sex` tinyint(4) NOT NULL DEFAULT '0' COMMENT '性名(1=男, 2=女, 0=未选择)',
    `created_at` int(11) NOT NULL COMMENT '创建时间',
    `updated_at` int(11) NOT NULL COMMENT '修改时间',
    PRIMARY KEY ( `uid`),
    KEY `created_at` ( `created_at`),
    KEY `updated_at` ( `updated_at`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户信息表'
    
  • MaxCompute数据表
    创建一张与这对应的MaxCompute数据表
    数据表名: odps_test_history
    分 区 名: pt
    分 区 值: 数据的创建日期
    表结构定义如下:

    CREATE TABLE odps_test_history
    (
      uid BIGINT COMMENT '用户ID',
      name STRING COMMENT '姓名',
      age BIGINT COMMENT '年龄',
      sex BIGINT COMMENT '性名(1=男, 2=女, 0=未选择)',
      created_at BIGINT COMMENT '创建时间',
      updated_at BIGINT COMMENT '修改时'
    )
    PARTITIONED BY (
      pt STRING
    )
    LIFECYCLE 100000;
    
  • 命名约定
    临时表表名后缀:_history
    增量表表名后缀:_ inc

3.5.1 同步不会变化的数据

1. 手动全量同步

通过脚本模式将小于当前日期(2017-07-13)的所有的数据一次性导入到一张临时表中。
在数据开发中点击测试运行,看到任务成功运行后,在任务设置调度参数中设置暂停(测试运行不会受到调度干扰,直接运行)。

脚本配置如下:

{
  "configuration": {
    "reader": {
      "plugin": "mysql",
      "parameter": {
        "datasource": "mysql_db001",
        "column": [
          "uid",
          "name",
          "age",
          "sex",
          "created_at",
          "updated_at"
        ],
        "where": "created_at < UNIX_TIMESTAMP('20170713')",
        "splitPk": "uid",
        "table": "test"
      }
    },
    "writer": {
      "plugin": "odps",
      "parameter": {
        "partition": "pt=history",
        "truncate": true,
        "datasource": "odps_first",
        "column": [
          "uid",
          "name",
          "age",
          "sex",
          "created_at",
          "updated_at"
        ],
        "table": "odps_test_history"
      }
    },
    "setting": {
      "errorLimit": {
        "record": "0"
      },
      "speed": {
        "concurrent": "1",
        "mbps": "1"
      }
    }
  },
  "type": "job",
  "version": "1.0"
}

2. 创建最终目标表

执行ODPS_SQL复制odps_test_history表并命名为odps_test作为最终目标表。

-- 创建目标表
CREATE TABLE odps_test LIKE odps_test_history;

3. 根据数据的时间进行动态分区

执行ODPS_SQL将数据导入到最终目标表中。

--- 将历史表中的数据根据数据的创建的时间插入目标表对应的分区中
INSERT OVERWRITE TABLE odps_test PARTITION (pt)
SELECT uid, name, age, sex, created_at, updated_at, TO_CHAR(FROM_UNIXTIME(created_at), 'yyyymmdd') AS pt 
FROM odps_test_history;

4. 简单验证数据

执行ODPS SQL,如两个数一样则说明导入成功,如不一样,请重试。

-- 验证数据总数
SELECT COUNT(*) FROM  odps_test_history;
SELECT COUNT(*) FROM  odps_test;

5. 删除历史临时表

如确认数据无误,则可以删除临时表。当然你也可以不删除,以便出了错只需将此时间节点内的数据导入即可,避免从头导入的过程。

--- 删除历史表
ALTER TABLE odps_test_history DROP IF EXISTS PARTITION ( pt='history' );

6. 更改同步脚本

更新同步脚本,并将任务进行提交,之前的第一步操作的暂停记得勾掉。
更改后的同步脚本如下,请注意蓝色部分:
image001

7. 完成

如果执行到这一步基本上就完成了,隔天就会在你在设定时间内运行,你也可以去运维中心中查看你的任务情况。
下面附上”不会变化的数据“数据同步的全部流程图:
image002

3.5.2 同步会变化的数据

注意事项
A. 为方便演示,继续使用test表来作为演示表。
B. 同步会变化的数据与不会变化的数据,前面5个步骤一样是一样,在此不进行复述。

1. 手动全量同步

参考同步不会变化的数据

2.创建最终目标表

参考同步不会变化的数据

3. 根据数据的时间进行动态分区

参考同步不会变化的数据

4. 简单验证数据

参考同步不会变化的数据

5. 删除临时表

参考同步不会变化的数据

6. 创建增量表

执行ODPS SQL,创建一张增量表

--- 创建增量表
CREATE TABLE odps_test_inc LIKE odps_test;

7. 更改同步脚本

更改后的同步脚本如下,请注意蓝色部分:
image004

因为太长不好截图,所以脚本模式这里的where的参数值使用:

FROM_UNIXTIME ( created_at,'%Y%m%d')=${
   bdp.system.bizdate} OR FROM_UNIXTIME(updated_at, '%Y%m%d') = ${
   bdp.system.bizdate}

如果想效率更高一点可使用下方语句,当字段created_at有索引时可以命中:

(created_at >= UNIX_TIMESTAMP('${bdp.system.bizdate}000000') AND created_at <= UNIX_TIMESTAMP('${bdp.system.bizdate}235959')) OR (updated_at >= UNIX_TIMESTAMP('${bdp.system.bizdate}000000') AND updated_at <= UNIX_TIMESTAMP('${bdp.system.bizdate}235959'))

8. 创建合并数据(ODPS_SQL)任务

在数据开发->任务开发中创建一个节点任务,任务名称使用_merge结尾

并输入以下内容:

--- 合并数据并根据数据的创建时间更新到指定分区中
INSERT OVERWRITE TABLE odps_test PARTITION (pt)
SELECT  
    CASE WHEN b.id IS NOT NULL THEN b.uid ELSE a.uid END AS uid,
    CASE WHEN b.id IS NOT NULL THEN b.name ELSE a.name END AS name,
    CASE WHEN b.id IS NOT NULL THEN b.age ELSE a.age END AS age,
    CASE WHEN b.id IS NOT NULL THEN b.sex ELSE a.sex END AS sex,
    CASE WHEN b.id IS NOT NULL THEN b.created_at ELSE a.created_at END AS created_at,
    CASE WHEN b.id IS NOT NULL THEN b.updated_at ELSE a.updated_at END AS updated_at,
    CASE WHEN b.id IS NOT NULL THEN TO_CHAR(FROM_UNIXTIME(b.created_at), 'yyyymmdd') ELSE TO_CHAR(FROM_UNIXTIME(a.created_at), 'yyyymmdd') END AS pt
FROM
odps_test a 
FULL OUTER JOIN odps_test_inc b
ON a.id  = b.id ;

--- 删除无用数据(如想观察每日数据变化不进行删除,上面一行的SQL需要加上条件: pt='${bdp.system.bizdate}')
ALTER TABLE odps_test_inc DROP IF EXISTS PARTITION ( pt='${bdp.system.bizdate}');

9. 设置调度依赖

将合并数据的任务的上游任务设置为每日增量同步数据那个任务,并提交即可。

注: odps_test是就是同步数据的脚本模式的任务名称。

image00003

10. 完成

如果执行到这一步基本上就完成了,隔天就会在你在设定时间内运行,你也可以去运维中心中查看你的任务情况。
下面附上”会变化的数据“数据同步的全部流程图:
image007

四、数据迁移

4.1 脚本模式下mongodb同步MaxCompute,时间字段值存的unix时间戳如何筛选?

这个问题我之前也有提交过工单,工单内容如下:

我们在测试使用数据同步更新,使用query参数过滤数据,按文档中所描述,语法参照MongoDB查询语法

因为我们mongodb表中表示数据"最后更新时间"的字段为updated_at,其存放的内容为unix时间戳。
mongodb中将日期格式化成unix时间戳的方法是:ISODate("2012-10-15T21:26:17+0800").valueOf() / 1000

根据该写法,那在脚本模式中的写法将是:
"query": "{'updated_at':{'$gte':ISODate('${last_day}T00:00:00.424+0800').valueOf() / 1000}}",

但是我们运行后,程序报错无法运行了,报错信息为说是格式的问题,解析不了。

报错信息如下:
2017-06-23 20:46:32.647 [job-37098472] ERROR JobContainer - Exception when job run
org.bson.json.JsonParseException: Invalid JSON input. Position: 62. Character: '.'.

当mongodb中字段为unix时间戳时,如果使用query参数进行过滤?

我们花了时间查看文档依旧没有得到答案,所以前来请教一下。

最终MaxCompute技术人员给的解决方案如下:
  1. dataworks 里面配置shell + datax, 在shell 里面讲dataworks的调度时间参数yyyy-mm-dd格式 转换为 unix时间戳格式, 时间戳传递给datax,再由datax发给mongodb server 做数据过滤查询。

  2. 在mongodb server 表里面添加时间列,时间列的值具体是你原来整数unix时间戳列生成的, 这个只需要操作修改一次。

4.2 脚本模式下mongodb同步MaxCompute,同步速度如何优化?

因为各种原因我们一开始是拿MongoDB进行的测试,为此专门找DBA挑一张较小的数据表( 约3GB左右)进行。按照文档中描述,使用脚本模式根据文档进行了配置,确实很挺简单的,所以进行任务测试,任务如期正常运行,但经过观察发现数据同步的速度逐步下降,最让人无法忍受的只有10KB/S,该表同步了20多个小时都没同步完,最后任务自行中止了。经过询问相关技术人员,可能由于Mongodb同步数据的底层实现还存在一些问题导致同步速度过慢导致的。为此只能选择替代方案,主要集中在“基于Kettle的MaxCompute插件”及“基于Tunnel SDK开发”,经过比对后考虑到开发速度及同步速度,最终选择暂时采用Kettle来进行mongodb的导入工作。

如下为导入速度对比结果:
image009

4.3 基于Kettle的MaxCompute插件同步mongodb遇到的问题

以下为我的同事在使用中的经验总结,在此我一并提一下:
MaxCompute的kettle插件的问题(个人只用过Aliyun MaxCompute Output,没有用过Aliyun MaxCompute Input,因此只对Output部分做评价):

  1. 没有提供更新的功能,只能插入,做增量更新的时候会很棘手。
  2. 没有提供输出流,无法记录日志。
  3. 不支持从上一步获取变量的功能,对于想要使用变量替换的操作很麻烦。
  4. EndPoint、AccessId等配置不支持从配置文件读取,十分不灵活。
  5. 文档太少,没有提供对MongoDB等数据库的帮助文档。

4.4 Mysql全库同步工具时间字段是unix时间戳没办法用。

因为我们有大量的Mysql数据表需要导入,发现有个整库迁移的工具可以使用,刚发现时就怦然心动,然而经过实测后大失所望,因为该工具日期字段的值必须是yyyymmdd格式,像我们使用UNIX时间戳压根没法用。
另外该功能不支持表字段指定,如果某些数据表字段过于敏感想不进行同步,也不能设置。
image010

4.4 大数据开发套件中“数据表管理”如果表数量过多就不便于查找,且数据表分类只有一级。

如果左侧能提供一个树形菜单将多级分类列出,点击就能查看对应分类下的表就方便多了。

五、感谢

接触一件新事务时遇到了困难,并且自己无法解决时,难免会感到彷徨无助,有种挫败感。
万幸的是我在钉钉上找到了组织,非常感谢阿里云MaxCompute项目组的@数加·祎休(yī xiū)@一斅@彭敏@李珍珍等工作人员的鼎力支持,在我们提交问题后,不厌其烦及时并且认真的回答解决我们的问题,祝你们工作顺利,事事顺心。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
4月前
|
SQL 分布式计算 运维
如何对付一个耗时6h+的ODPS任务:慢节点优化实践
本文描述了大数据处理任务(特别是涉及大量JOIN操作的任务)中遇到的性能瓶颈问题及其优化过程。
|
3月前
|
机器学习/深度学习 算法 搜索推荐
从理论到实践,Python算法复杂度分析一站式教程,助你轻松驾驭大数据挑战!
【10月更文挑战第4天】在大数据时代,算法效率至关重要。本文从理论入手,介绍时间复杂度和空间复杂度两个核心概念,并通过冒泡排序和快速排序的Python实现详细分析其复杂度。冒泡排序的时间复杂度为O(n^2),空间复杂度为O(1);快速排序平均时间复杂度为O(n log n),空间复杂度为O(log n)。文章还介绍了算法选择、分而治之及空间换时间等优化策略,帮助你在大数据挑战中游刃有余。
95 3
|
3天前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
14 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
2月前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
7月前
|
数据采集 数据可视化 大数据
Python在大数据处理中的应用实践
Python在大数据处理中扮演重要角色,借助`requests`和`BeautifulSoup`抓取数据,`pandas`进行清洗预处理,面对大规模数据时,`Dask`提供分布式处理能力,而`matplotlib`和`seaborn`则助力数据可视化。通过这些工具,数据工程师和科学家能高效地管理、分析和展示海量数据。
317 4
|
2月前
|
边缘计算 人工智能 搜索推荐
大数据与零售业:精准营销的实践
【10月更文挑战第31天】在信息化社会,大数据技术正成为推动零售业革新的重要驱动力。本文探讨了大数据在零售业中的应用,包括客户细分、个性化推荐、动态定价、营销自动化、预测性分析、忠诚度管理和社交网络洞察等方面,通过实际案例展示了大数据如何帮助商家洞悉消费者行为,优化决策,实现精准营销。同时,文章也讨论了大数据面临的挑战和未来展望。
|
6月前
|
数据采集 运维 Cloud Native
Flink+Paimon在阿里云大数据云原生运维数仓的实践
构建实时云原生运维数仓以提升大数据集群的运维能力,采用 Flink+Paimon 方案,解决资源审计、拓扑及趋势分析需求。
18536 54
Flink+Paimon在阿里云大数据云原生运维数仓的实践
|
3月前
|
SQL 消息中间件 分布式计算
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(一)
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(一)
114 0
|
3月前
|
SQL 大数据
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(二)
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(二)
79 0
|
3月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
77 0