MaxCompute实践

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 MongoDB,通用型 2核4GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 一、写在前面 本人之前一直从事程序开发的(PHP、JAVA、Python)工作,在之前的工作经历中有过一段时间配合Hadoop工程师的事务,但接触的并不深,只能说略知点皮毛,有点管中窥豹的感觉。 今年进了新公司,因为公司正在组建新的数据部门,非常有幸本人得以调入该部门,恰逢MaxCompute横空出世,刚好因为我那时工作比较空闲,得以安排调研它的功能及测试是否符合我们的要求。

一、写在前面

本人之前一直从事程序开发的(PHP、JAVA、Python)工作,在之前的工作经历中有过一段时间配合Hadoop工程师的事务,但接触的并不深,只能说略知点皮毛,有点管中窥豹的感觉。
今年进了新公司,因为公司正在组建新的数据部门,非常有幸本人得以调入该部门,恰逢MaxCompute横空出世,刚好因为我那时工作比较空闲,得以安排调研它的功能及测试是否符合我们的要求。
由于个人对大数据这块的经验不足,涉及的内容也不是太高端的,所得知识基本是通过阅读文档获得,所以本文仅以初学者的角度去阐述,主要围绕数据迁移这块,其他部分还未来得急涉及,如有不正确的地方,还请指出,谢谢。

二、平台体系的选型

因初期数据量相对较小,使用Kettle进行抽取数据等工作,ETL的工作大部分在MySQL数据仓库中完成。多种数据源使用Presto(集群)作为查询中间键进行相应的数据分析。但随着业务的疯狂增长,数据表单表达到数亿后,磁盘容量达数几百GB时,数据要求的复杂度逐步提升,使用MySQL作为基础数据仓库的基石已经不足以应付,常出现查询响应时间等待过长,甚至内存崩溃导致执行失败的情况,极大的影响了工作效率。所以使用选择一款大数据平台势在必行。初期考虑使用Hadoop来做进行数据分析平台,恰逢MaxCompute横空出世给我们一个新的选择,在经过仔细比对后,考虑到公司新数据部门刚成立不久,Hadoop大数据相关人才储备较为紧缺,如果等待Hadoop体系完成搭建并使用,可能需要更多的时间,而这势必会拖慢之后的工作进度。而如果使用MaxCompute相对于自建Haoop数据平台要简单,快速的多,但任何产品在正式使用前必然需要进行详细的调研,所以我就开始调研的过程。

三、数据迁移

3.1 数据源类型介绍

目前我们主要有两种数据源的数据需要放至MaxCompute中进行处理,一种为MySQL,一种为MongoDB。在此主要描述MySQL相关的,至于MongoDB嘛,看到最后你就会知道了^_^。

3.2 迁移工具的选择

虽然MaxCompute有多种迁移方案可供选择,但如果使用大数据开发套件上的脚本模式能够有效完成的话,个人还是比较倾向使用大数据开发套件去完成,除非在此之上无法完成。因为相对于自己使用SDK直接同步或其他工具去实现,大数据开发平台要简单快速的多,任务调度系统完善,任务运行情况相对清晰便于查错,之后更多任务可以灵活组合搭配。

3.3 同步策略介绍

数据主要分为会变化的数据(有修改+有增加)与不会变化的数据(只增加),官方文档中是建议每天全量的同步策略,即每天的数据作为一个副本存至同步日期为分区,这样做确实有很多的好处。
但实际情况可能由于数据量过大,每天同步可能会花更多的时间(测试过1亿数据大概在3~4小时左右,脚本参数配置speed": { "concurrent": "1", "mbps": "1" }),因为我们业务中不存在DELTE操作,所以我们这里处理不管数据是否变化都使用每日增量的方式处理,最终按数据的创建日期存放在对应的分区(前期也不知道怎么设计,如何设计最佳,所以就先设计为1级分区),虽然这样需要多做一步合并操作,多耗费一些资源费用,但确实是实现我们的要求。

3.4 步骤介绍

在“大数据开发套件”中同步数据主要使用时间段的方式进行同步,主要分为两个步骤,第一步为手动全量同步,第二步为自动每日增量。如果想让数据的每条数据按数据的创建时间存放到对应的日期的话,则需要在此基础上进行细化。针对不同的数据(会变化与不会变化)的处理方式也各有所不同,基本上是按照文档中所描述的进行了,只是稍微有些不同。

3.5 举例说明

3.5.1 准备工作

  • 假设当前日期为: 2017-07-13
  • Mysql表信息
    数据表名: test

创建时间字段: created_at
更改时间字段: updated_at
时间字段值: unix时间戳
表结构定义如下:

CREATE TABLE `test` 
(
 `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户ID',
 `name` varchar(10) NOT NULL COMMENT '姓名',
 `age` tinyint(4) NOT NULL COMMENT '年龄',
 `sex` tinyint(4) NOT NULL DEFAULT '0' COMMENT '性名(1=男, 2=女, 0=未选择)',
 `created_at` int(11) NOT NULL COMMENT '创建时间',
 `updated_at` int(11) NOT NULL COMMENT '修改时间',
 PRIMARY KEY ( `uid`),
 KEY `created_at` ( `created_at`),
 KEY `updated_at` ( `updated_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户信息表'
  • MaxCompute数据表
    创建一张与这对应的MaxCompute数据表

数据表名: odps_test_history
分 区 名: pt
分 区 值: 数据的创建日期
表结构定义如下:

CREATE TABLE odps_test_history
(
    uid BIGINT COMMENT '用户ID',
    name STRING COMMENT '姓名',
    age BIGINT COMMENT '年龄',
    sex BIGINT COMMENT '性名(1=男, 2=女, 0=未选择)',
    created_at BIGINT COMMENT '创建时间',
    updated_at BIGINT COMMENT '修改时'
)
PARTITIONED BY (
    pt STRING
)
LIFECYCLE 100000;
  • 命名约定
    临时表表名后缀:_history

增量表表名后缀:_ inc

3.5.1 同步不会变化的数据

1. 手动全量同步

通过脚本模式将小于当前日期(2017-07-13)的所有的数据一次性导入到一张临时表中。
在数据开发中点击测试运行,看到任务成功运行后,在任务设置调度参数中设置暂停(测试运行不会受到调度干扰,直接运行)。

脚本配置如下:

{
  "configuration": {
    "reader": {
      "plugin": "mysql",
      "parameter": {
        "datasource": "mysql_db001",
        "column": [
          "uid",
          "name",
          "age",
          "sex",
          "created_at",
          "updated_at"
        ],
        "where": "created_at < UNIX_TIMESTAMP('20170713')",
        "splitPk": "uid",
        "table": "test"
      }
    },
    "writer": {
      "plugin": "odps",
      "parameter": {
        "partition": "pt=history",
        "truncate": true,
        "datasource": "odps_first",
        "column": [
          "uid",
          "name",
          "age",
          "sex",
          "created_at",
          "updated_at"
        ],
        "table": "odps_test_history"
      }
    },
    "setting": {
      "errorLimit": {
        "record": "0"
      },
      "speed": {
        "concurrent": "1",
        "mbps": "1"
      }
    }
  },
  "type": "job",
  "version": "1.0"
}

2. 创建最终目标表

执行ODPS_SQL复制odps_test_history表并命名为odps_test作为最终目标表。

-- 创建目标表
CREATE TABLE odps_test LIKE odps_test_history;

3. 根据数据的时间进行动态分区

执行ODPS_SQL将数据导入到最终目标表中。

--- 将历史表中的数据根据数据的创建的时间插入目标表对应的分区中
INSERT OVERWRITE TABLE odps_test PARTITION (pt)
SELECT uid, name, age, sex, created_at, updated_at, TO_CHAR(FROM_UNIXTIME(created_at), 'yyyymmdd') AS pt 
FROM odps_test_history;

4. 简单验证数据

执行ODPS SQL,如两个数一样则说明导入成功,如不一样,请重试。

-- 验证数据总数
SELECT COUNT(*) FROM  odps_test_history;
SELECT COUNT(*) FROM  odps_test;

5. 删除历史临时表

如确认数据无误,则可以删除临时表。当然你也可以不删除,以便出了错只需将此时间节点内的数据导入即可,避免从头导入的过程。

--- 删除历史表
ALTER TABLE odps_test_history DROP IF EXISTS PARTITION ( pt='history' ); 

6. 更改同步脚本

更新同步脚本,并将任务进行提交,之前的第一步操作的暂停记得勾掉。
更改后的同步脚本如下,请注意蓝色部分:
image001

7. 完成

如果执行到这一步基本上就完成了,隔天就会在你在设定时间内运行,你也可以去运维中心中查看你的任务情况。
下面附上”不会变化的数据“数据同步的全部流程图:
image002

3.5.2 同步会变化的数据

注意事项
A. 为方便演示,继续使用test表来作为演示表。
B. 同步会变化的数据与不会变化的数据,前面5个步骤一样是一样,在此不进行复述。

1. 手动全量同步

参考同步不会变化的数据

2.创建最终目标表

参考同步不会变化的数据

3. 根据数据的时间进行动态分区

参考同步不会变化的数据

4. 简单验证数据

参考同步不会变化的数据

5. 删除临时表

参考同步不会变化的数据

6. 创建增量表

执行ODPS SQL,创建一张增量表

--- 创建增量表
CREATE TABLE odps_test_inc LIKE odps_test;

7. 更改同步脚本

更改后的同步脚本如下,请注意蓝色部分:
image004

因为太长不好截图,所以脚本模式这里的where的参数值使用:

FROM_UNIXTIME ( created_at,'%Y%m%d')=${bdp.system.bizdate} OR FROM_UNIXTIME(updated_at, '%Y%m%d') = ${bdp.system.bizdate}

如果想效率更高一点可使用下方语句,当字段created_at有索引时可以命中:

(created_at >= UNIX_TIMESTAMP('${bdp.system.bizdate}000000') AND created_at <= UNIX_TIMESTAMP('${bdp.system.bizdate}235959')) OR (updated_at >= UNIX_TIMESTAMP('${bdp.system.bizdate}000000') AND updated_at <= UNIX_TIMESTAMP('${bdp.system.bizdate}235959'))

8. 创建合并数据(ODPS_SQL)任务

在数据开发->任务开发中创建一个节点任务,任务名称使用_merge结尾

并输入以下内容:

--- 合并数据并根据数据的创建时间更新到指定分区中
INSERT OVERWRITE TABLE odps_test PARTITION (pt)
SELECT  
    CASE WHEN b.id IS NOT NULL THEN b.uid ELSE a.uid END AS uid,
    CASE WHEN b.id IS NOT NULL THEN b.name ELSE a.name END AS name,
    CASE WHEN b.id IS NOT NULL THEN b.age ELSE a.age END AS age,
    CASE WHEN b.id IS NOT NULL THEN b.sex ELSE a.sex END AS sex,
    CASE WHEN b.id IS NOT NULL THEN b.created_at ELSE a.created_at END AS created_at,
    CASE WHEN b.id IS NOT NULL THEN b.updated_at ELSE a.updated_at END AS updated_at,
    CASE WHEN b.id IS NOT NULL THEN TO_CHAR(FROM_UNIXTIME(b.created_at), 'yyyymmdd') ELSE TO_CHAR(FROM_UNIXTIME(a.created_at), 'yyyymmdd') END AS pt
FROM
odps_test a 
FULL OUTER JOIN odps_test_inc b
ON a.id  = b.id ;

--- 删除无用数据(如想观察每日数据变化不进行删除,上面一行的SQL需要加上条件: pt='${bdp.system.bizdate}')
ALTER TABLE odps_test_inc DROP IF EXISTS PARTITION ( pt='${bdp.system.bizdate}'); 

9. 设置调度依赖

将合并数据的任务的上游任务设置为每日增量同步数据那个任务,并提交即可。

注: odps_test是就是同步数据的脚本模式的任务名称。

image00003

10. 完成

如果执行到这一步基本上就完成了,隔天就会在你在设定时间内运行,你也可以去运维中心中查看你的任务情况。
下面附上”会变化的数据“数据同步的全部流程图:
image007

四、数据迁移

4.1 脚本模式下mongodb同步MaxCompute,时间字段值存的unix时间戳如何筛选?

这个问题我之前也有提交过工单,工单内容如下:

我们在测试使用数据同步更新,使用query参数过滤数据,按文档中所描述,语法参照MongoDB查询语法
https://help.aliyun.com/knowledge_detail/50354.html?spm=a2c1i.8282367.0.0.OPStre

因为我们mongodb表中表示数据"最后更新时间"的字段为updated_at,其存放的内容为unix时间戳。
mongodb中将日期格式化成unix时间戳的方法是:ISODate("2012-10-15T21:26:17+0800").valueOf() / 1000

根据该写法,那在脚本模式中的写法将是:
"query": "{'updated_at':{'$gte':ISODate('${last_day}T00:00:00.424+0800').valueOf() / 1000}}",

但是我们运行后,程序报错无法运行了,报错信息为说是格式的问题,解析不了。

报错信息如下:
2017-06-23 20:46:32.647 [job-37098472] ERROR JobContainer - Exception when job run

org.bson.json.JsonParseException: Invalid JSON input. Position: 62. Character: '.'.

当mongodb中字段为unix时间戳时,如果使用query参数进行过滤?

我们花了时间查看文档依旧没有得到答案,所以前来请教一下。

最终MaxCompute技术人员给的解决方案如下:
  1. dataworks 里面配置shell + datax, 在shell 里面讲dataworks的调度时间参数yyyy-mm-dd格式 转换为 unix时间戳格式, 时间戳传递给datax,再由datax发给mongodb server 做数据过滤查询。
  2. 在mongodb server 表里面添加时间列,时间列的值具体是你原来整数unix时间戳列生成的, 这个只需要操作修改一次。

4.2 脚本模式下mongodb同步MaxCompute,同步速度如何优化?

因为各种原因我们一开始是拿MongoDB进行的测试,为此专门找DBA挑一张较小的数据表( 约3GB左右)进行。按照文档中描述,使用脚本模式根据文档进行了配置,确实很挺简单的,所以进行任务测试,任务如期正常运行,但经过观察发现数据同步的速度逐步下降,最让人无法忍受的只有10KB/S,该表同步了20多个小时都没同步完,最后任务自行中止了。经过询问相关技术人员,可能由于Mongodb同步数据的底层实现还存在一些问题导致同步速度过慢导致的。为此只能选择替代方案,主要集中在“基于Kettle的MaxCompute插件”及“基于Tunnel SDK开发”,经过比对后考虑到开发速度及同步速度,最终选择暂时采用Kettle来进行mongodb的导入工作。

如下为导入速度对比结果:
image009

4.3 基于Kettle的MaxCompute插件同步mongodb遇到的问题

以下为我的同事在使用中的经验总结,在此我一并提一下:
MaxCompute的kettle插件的问题(个人只用过Aliyun MaxCompute Output,没有用过Aliyun MaxCompute Input,因此只对Output部分做评价):

  1. 没有提供更新的功能,只能插入,做增量更新的时候会很棘手。
  2. 没有提供输出流,无法记录日志。
  3. 不支持从上一步获取变量的功能,对于想要使用变量替换的操作很麻烦。
  4. EndPoint、AccessId等配置不支持从配置文件读取,十分不灵活。
  5. 文档太少,没有提供对MongoDB等数据库的帮助文档。

4.4 Mysql全库同步工具时间字段是unix时间戳没办法用。

因为我们有大量的Mysql数据表需要导入,发现有个整库迁移的工具可以使用,刚发现时就怦然心动,然而经过实测后大失所望,因为该工具日期字段的值必须是yyyymmdd格式,像我们使用UNIX时间戳压根没法用。
另外该功能不支持表字段指定,如果某些数据表字段过于敏感想不进行同步,也不能设置。
image010

4.4 大数据开发套件中“数据表管理”如果表数量过多就不便于查找,且数据表分类只有一级。

如果左侧能提供一个树形菜单将多级分类列出,点击就能查看对应分类下的表就方便多了。

五、感谢

接触一件新事务时遇到了困难,并且自己无法解决时,难免会感到彷徨无助,有种挫败感。
万幸的是我在钉钉上找到了组织,非常感谢阿里云MaxCompute项目组的@数加·祎休(yī xiū)@一斅@彭敏@李珍珍等工作人员的鼎力支持,在我们提交问题后,不厌其烦及时并且认真的回答解决我们的问题,祝你们工作顺利,事事顺心。

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
存储 大数据 API
大数据隐私保护策略:加密、脱敏与访问控制实践
【4月更文挑战第9天】本文探讨了大数据隐私保护的三大策略:数据加密、数据脱敏和访问控制。数据加密通过加密技术保护静态和传输中的数据,密钥管理确保密钥安全;数据脱敏通过替换、遮蔽和泛化方法降低敏感信息的敏感度;访问控制则通过用户身份验证和权限设置限制数据访问。示例代码展示了数据库、文件系统和API访问控制的实施方式,强调了在实际应用中需结合业务场景和平台特性定制部署。
62 0
|
3月前
|
数据采集 监控 算法
利用大数据和API优化电商决策:商品性能分析实践
在数据驱动的电子商务时代,大数据分析已成为企业提升运营效率、增强市场竞争力的关键工具。通过精确收集和分析商品性能数据,企业能够洞察市场趋势,实现库存优化,提升顾客满意度,并显著增加销售额。本文将探讨如何通过API收集商品数据,并将这些数据转化为对电商平台有价值的洞察。
|
3月前
|
存储 数据可视化 数据管理
基于阿里云服务的数据平台架构实践
本文主要介绍基于阿里云大数据组件服务,对企业进行大数据平台建设的架构实践。
749 2
|
5月前
|
存储 SQL 分布式计算
开源大数据比对平台设计与实践—dataCompare
开源大数据比对平台设计与实践—dataCompare
81 0
|
5月前
|
SQL 存储 大数据
某互联网大厂亿级大数据服务平台的建设和实践
某互联网大厂亿级大数据服务平台的建设和实践
75 0
|
7月前
|
机器学习/深度学习 分布式计算 数据挖掘
MaxCompute 应用场景实践
MaxCompute 应用场景实践
101 0
|
8月前
|
SQL 存储 分布式计算
MaxCompute元数据使用实践--项目信息统计
MaxCompute的租户级别Information Schema从租户角度提供项目元数据及使用历史数据等信息,您可以一次性拉取您同一个元数据中心下所有Project的某类元数据,从而进行各类元数据的统计分析。
526 0
|
2天前
|
数据采集 供应链 安全
利用大数据优化业务流程:策略与实践
【5月更文挑战第11天】本文探讨了利用大数据优化业务流程的策略与实践,包括明确业务目标、构建大数据平台、数据采集整合、分析挖掘及流程优化。通过实例展示了电商和制造企业如何利用大数据改进库存管理和生产流程,提高效率与客户满意度。随着大数据技术进步,其在业务流程优化中的应用将更加广泛和深入,企业需积极采纳以适应市场和客户需求。
|
1月前
|
缓存 运维 NoSQL
面试分享:Redis在大数据环境下的缓存策略与实践
【4月更文挑战第10天】探索Redis在大数据缓存的关键作用,本文分享面试经验及必备知识点。聚焦Redis数据结构(String、List、Set、Hash、Sorted Set)及其适用场景,缓存策略(LRU、LFU、TTL)与过期机制,集群和数据分片,以及性能优化和运维技巧。通过代码示例深入理解,助你面试成功,构建高效缓存服务。
49 4
|
6月前
|
分布式计算 Kubernetes Spark
米哈游大数据云原生实践
近年来,容器、微服务、Kubernetes 等各项云原生技术的日渐成熟,越来越多的公司开始选择拥抱云原生,并开始将 AI、大数据等类型的企业应用部署运行在云原生之上。以 Spark 为例,在云上运行 Spark 可以充分享有公共云的弹性资源、运维管控和存储服务等,并且业界也涌现了不少 Spark 。

热门文章

最新文章