MaxCompute(原ODPS)任务优化之列裁剪

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介:   最近因为几个ODPS任务节点扣分严重,计算健康度一度堕落至85分的红线以下,上了一次黑榜,立马开始了艰苦的优化之旅。刚刚前几天搞定了两个OpenMR的列裁剪优化,略作记录。

免费开通大数据服务:https://www.aliyun.com/product/odps

转自kaiding


最近因为几个ODPS任务节点扣分严重,计算健康度一度堕落至85分的红线以下,上了一次黑榜,立马开始了艰苦的优化之旅。刚刚前几天搞定了两个OpenMR列裁剪优化,略作记录。

什么是列裁剪以及为什么要做列裁剪

列裁剪,即针对OpenMR任务Map阶段的输入,如果只使用了其中的某几列,则裁剪掉不需要使用的列,只指定需要使用的列。这样做的好处也就很明显了,减少网络I/O,提升Map计算效率等等。其实从使用上来看或许叫做输入列指定更顾名思义一些。

如何看任务是否需要列裁剪

这里贴一个本次重点做了列裁剪的节点,在列裁剪优化之前的logview(由于系统只保留最近几天的日志,所以这里就不贴链接了),这里就贴部分内容吧,具体查看路径为:
1. 在待优化节点右键查看运行日志
2. 找到logview轻点打开
3. ODPS Tasks->Detail
4. Main Content->JSONSummary

其中某一路输入的相关统计数据如下:

"M1_U13" :
                {
                    "writer_dumps" :
                    {
                        "R2_1" :
                        [
                            0,
                            0,
                            0
                        ]
                    },
                    "planned_memory" : 4096,
                    "input_attrs" :
                    {
                        "split" : 256
                    },
                    "user_counters" :
                    {
                        "INPUTS-TOTAL" :
                        {
                            "name" : "INPUTS-TOTAL",
                            "counters" :
                            [
                                {
                                    "name" : "one_of_your_input_table[用户XX信息表][ds=20161009]",
                                    "value" : 1776171427
                                }
                            ]
                        },
                        "INPUTS-SUCCESS" :
                        {
                            "name" : "INPUTS-SUCCESS",
                            "counters" :
                            [
                                {
                                    "name" : "one_of_your_input_table[用户XX信息表]_SUCCESS",
                                    "value" : 1776171425
                                }
                            ]
                        },
                        "ODPS_SDK_FRAMEWORK_COUNTER_GROUP" :
                        {
                            "name" : "ODPS_SDK_FRAMEWORK_COUNTER_GROUP",
                            "counters" :
                            [
                                {
                                    "name" : "input_col_total_num",
                                    "value" : 14896
                                },
                                {
                                    "name" : "input_col_used_num",
                                    "value" : 3724
                                }
                            ]
                        }
                    },
                    "total_instance_run_time" : 77213,
                    "output_record_counts" :
                    {
                        "R2_1" :
                        {
                            "R2_1" : 1776171425
                        }
                    },
                    "custom_info" :
                    {
                        "StringMemoryPoolSize" : "41963520"
                    },
                    "input_record_counts" :
                    {
                        "input" : 1776171427
                    },
                    "writer_bytes" :
                    {
                        "R2_1" :
                        {
                            "R2_1" : 54021298354
                        }
                    },
                    "reader_dumps" :
                    {

                    },
                    "input_record_count_stats" :
                    {
                        "input" :
                        [
                            830754,
                            1017349,
                            953937
                        ]
                    },
                    "reader_bytes" :
                    {
                        "input" : 484089191296
                    },
                    "instance_run_times" :
                    [
                        23,
                        74,
                        41
                    ],
                    "output_record_count_stats" :
                    {
                        "R2_1" :
                        [
                            830754,
                            1017349,
                            953937
                        ]
                    },
                    "task_run_time" : 173,
                    "planned_cpu" : 100,
                    "instance_count" : 1862,
                    "output_bytes_stats" :
                    {
                        "R2_1" :
                        [
                            25216988,
                            31049145,
                            29013458
                        ]
                    }
                }

OK,没错,input_col_total_numinput_col_used_num两个字段的数据就是我们此次优化需要关注的要点。可以看到这张输入表在Map阶段总共输入了14896列(这里的意思不是说输入表有1万多列,而是map个数乘以表的列数得到的Map阶段总的输入列数),但是实际使用了3724列(同理),利用率非常低,扣分就是从这里开始的。。。

其实回到FuxiJobs点击M1_U13,然后在列出的任意一个FuxInstance中点击Debug可以看到这个Map任务中的统计数据:

{
    "endTime" : 1476055269,
    "finishedPercentage" : 100,
    "gblCounter" : "{"Counters":"{\"CountersValue\":[]}","CustomInfo":{"StringMemoryPoolSize":"41963520","UserCounters":"{\n    \"INPUTS-SUCCESS\": {\n        \"counters\": [{\n                \"name\": \"one_of_your_input_table[用户XX信息表]_SUCCESS\",\n                \"value\": 963577}],\n        \"name\": \"INPUTS-SUCCESS\"},\n    \"INPUTS-TOTAL\": {\n        \"counters\": [{\n                \"name\": \"one_of_your_input_table[用户XX信息表][ds=20161009]\",\n                \"value\": 963577}],\n        \"name\": \"INPUTS-TOTAL\"},\n    \"ODPS_SDK_FRAMEWORK_COUNTER_GROUP\": {\n        \"counters\": [{\n                \"name\": \"input_col_total_num\",\n                \"value\": 8},\n            {\n                \"name\": \"input_col_used_num\",\n                \"value\": 2}],\n        \"name\": \"ODPS_SDK_FRAMEWORK_COUNTER_GROUP\"}}"},"InputCounters":{"input":{"bytes":248006624,"records":963577}},"OutputCounters":{"R2_1":{"bytes":29108313,"dumps":0,"records":963577}},"PanguReadBytes":[0,48051280,48,199955317],"PanguWriteBytes":[0,0,0,0]}",
    "history" :
    [

    ],
    "id" : "Odps/your_project_20161009231931375gj8pn6sb1_LOT_0_0_0_job0/M1_U13#1045_0",
    "input_bytes" : 248006624,
    "input_records" : 963577,
    "logId" : "PU1UQXVNVEF6TGpFd01TNHhPRGdzVDJSd2N5OTBZbDl0WkdOZk1qQXhOakV3TURreU16RTVNekV6TnpWbmFqaHdialp6WWpGZlRFOVVYekJmTUY4d1gycHZZakF2VFRGZlZURXpRSEpoWVdJeE1qTTRPUzVsZERFak1qUWFiYw==",
    "output_bytes" : 29108313,
    "output_records" : 963577,
    "startTime" : 1476055236,
    "status" : "Terminated",
    "st" : "2016-10-10 07:20:36",
    "et" : "2016-10-10 07:21:09",
    "latencynum" : 33,
    "latency" : 33,
    "barleft" : "8%",
    "barlen" : "19%"
}

从这个Map统计数据可以看到,输入表共输入8列,但实际使用了2列,即使用率只有1/4而已,浪费严重。

如何进行列裁剪优化

  • 如果你使用的是Java SDK,那么可以在指定输入表的同时指定输入列,如下:

    InputUtils.addTable(TableInfo.builder().tableName("wc_in").cols(new String[]{"c1","c2"}).build(), job);
    
  • 如果你的MR任务输入表确定而不变,则可以参数化进行设置,如下:

    set odps.mapred.input.columns.test.wc_in=c1,c2; //设置test这个project的wc_in表的输入column为c1,c2这俩列
    

第二种方式我没有用过,目前只是实践了第一种方式,而这种方式的灵活性在于,如果你是通过配置文件来表达输入表和输入列时,可以实现只改配置而不改代码

由于目前提供的sdk不支持使用列索引来做列裁剪(个人觉得应该封装一下),如果因为历史原因,只在输入表的自定义配置文件中指定了输入表的列索引也没关系(如果配置文件比较大,输入表有几十个,我想你也不希望去重新找每张表列索引对应的列名),可以通过获取TableTableSchema,然后通过下标来获取到列名,简单看下代码:

Odps odps = SessionState.get().getOdps();
Tables tables = odps.tables();
Table table = tables.get(projectName, tableName);
TableSchema tableSchema = table.getSchema();

List<String> inputColumnNameList = new LinkedList<String>();
for(int index : oneInputTable.getInputTableIndexes) { // 这里是你从配置文件中获得的某张输入表的数据
    Column column = tableSchema.getColumn(index);
    if(column != null && !StringUtil.isBlank(column.getName())) {
        inputColumnNameList.add(column.getName());
    }
}

// 接下来拼成一个字符串数组就OK了

已经做了列裁剪优化,为啥利用率反而降低了

这里有一个注意点就是,如果优化前在Map任务中是通过输入表的列索引去获取记录值的话,需要同步做相应修改。这是因为指定了输入列之后,实际在Map阶段的输入表列数比原输入表要少,你可以理解为在Map阶段是一张全新的表,此时列索引应该使用这张经过裁剪后的新表的列索引,因此使用原来的列索引可能会取不到值或者取到错误列的值。建议在使用时需要使用列名来访问或者通过新的列索引来获取记录值。

其实我上面贴的logview中的内容就是因为我在Map阶段使用了原输入表的列索引来访问,导致了利用率反而降低了。比如表wc_in一共有0-9共10列,你只需要读索引为7、8、9这三列,但是你在进行了列裁剪优化之后,实际在Map阶段的输入表只有0、1、2这三个索引对应的值,此时如果你还是使用原来的列索引去读的话就读不到数据,导致该表的输入列利用率从原来的30%变成了0%。

因此在完成了列裁剪优化之后务必检查Map阶段读取Record的方式:
1. 如果是使用列索引的务必改成列名,
2. 或者如果不方便获取列名的,在setup阶段读取配置文件建立列索引的映射来解决该问题。


欢迎加入“数加·MaxCompute购买咨询”钉钉群(群号: 11782920)进行咨询,群二维码如下:

96e17df884ab556dc002c912fa736ef6558cbb51 
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
SQL 分布式计算 运维
如何对付一个耗时6h+的ODPS任务:慢节点优化实践
本文描述了大数据处理任务(特别是涉及大量JOIN操作的任务)中遇到的性能瓶颈问题及其优化过程。
|
12天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
27 2
|
22天前
|
存储 NoSQL 大数据
大数据 数据存储优化
【10月更文挑战第25天】
60 2
|
1月前
|
SQL 分布式计算 NoSQL
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
31 1
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
|
1月前
|
存储 大数据 分布式数据库
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
37 1
|
1月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
39 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
1月前
|
SQL 存储 监控
大数据-161 Apache Kylin 构建Cube 按照日期、区域、产品、渠道 与 Cube 优化
大数据-161 Apache Kylin 构建Cube 按照日期、区域、产品、渠道 与 Cube 优化
52 0
|
1月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
100 0
|
1月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
41 0
|
3月前
|
存储 分布式计算 数据处理
MaxCompute 的成本效益分析与优化策略
【8月更文第31天】随着云计算技术的发展,越来越多的企业选择将数据处理和分析任务迁移到云端。阿里云的 MaxCompute 是一款专为海量数据设计的大规模数据仓库平台,它不仅提供了强大的数据处理能力,还简化了数据管理的工作流程。然而,在享受这些便利的同时,企业也需要考虑如何有效地控制成本,确保资源得到最优利用。本文将探讨如何评估 MaxCompute 的使用成本,并提出一些优化策略以降低费用,提高资源利用率。
217 0

相关产品

  • 云原生大数据计算服务 MaxCompute
  • 下一篇
    无影云桌面