MaxCompute(原ODPS)任务优化之列裁剪

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介:   最近因为几个ODPS任务节点扣分严重,计算健康度一度堕落至85分的红线以下,上了一次黑榜,立马开始了艰苦的优化之旅。刚刚前几天搞定了两个OpenMR的列裁剪优化,略作记录。

免费开通大数据服务:https://www.aliyun.com/product/odps

转自kaiding


最近因为几个ODPS任务节点扣分严重,计算健康度一度堕落至85分的红线以下,上了一次黑榜,立马开始了艰苦的优化之旅。刚刚前几天搞定了两个OpenMR列裁剪优化,略作记录。

什么是列裁剪以及为什么要做列裁剪

列裁剪,即针对OpenMR任务Map阶段的输入,如果只使用了其中的某几列,则裁剪掉不需要使用的列,只指定需要使用的列。这样做的好处也就很明显了,减少网络I/O,提升Map计算效率等等。其实从使用上来看或许叫做输入列指定更顾名思义一些。

如何看任务是否需要列裁剪

这里贴一个本次重点做了列裁剪的节点,在列裁剪优化之前的logview(由于系统只保留最近几天的日志,所以这里就不贴链接了),这里就贴部分内容吧,具体查看路径为:
1. 在待优化节点右键查看运行日志
2. 找到logview轻点打开
3. ODPS Tasks->Detail
4. Main Content->JSONSummary

其中某一路输入的相关统计数据如下:

"M1_U13" :
                {
                    "writer_dumps" :
                    {
                        "R2_1" :
                        [
                            0,
                            0,
                            0
                        ]
                    },
                    "planned_memory" : 4096,
                    "input_attrs" :
                    {
                        "split" : 256
                    },
                    "user_counters" :
                    {
                        "INPUTS-TOTAL" :
                        {
                            "name" : "INPUTS-TOTAL",
                            "counters" :
                            [
                                {
                                    "name" : "one_of_your_input_table[用户XX信息表][ds=20161009]",
                                    "value" : 1776171427
                                }
                            ]
                        },
                        "INPUTS-SUCCESS" :
                        {
                            "name" : "INPUTS-SUCCESS",
                            "counters" :
                            [
                                {
                                    "name" : "one_of_your_input_table[用户XX信息表]_SUCCESS",
                                    "value" : 1776171425
                                }
                            ]
                        },
                        "ODPS_SDK_FRAMEWORK_COUNTER_GROUP" :
                        {
                            "name" : "ODPS_SDK_FRAMEWORK_COUNTER_GROUP",
                            "counters" :
                            [
                                {
                                    "name" : "input_col_total_num",
                                    "value" : 14896
                                },
                                {
                                    "name" : "input_col_used_num",
                                    "value" : 3724
                                }
                            ]
                        }
                    },
                    "total_instance_run_time" : 77213,
                    "output_record_counts" :
                    {
                        "R2_1" :
                        {
                            "R2_1" : 1776171425
                        }
                    },
                    "custom_info" :
                    {
                        "StringMemoryPoolSize" : "41963520"
                    },
                    "input_record_counts" :
                    {
                        "input" : 1776171427
                    },
                    "writer_bytes" :
                    {
                        "R2_1" :
                        {
                            "R2_1" : 54021298354
                        }
                    },
                    "reader_dumps" :
                    {

                    },
                    "input_record_count_stats" :
                    {
                        "input" :
                        [
                            830754,
                            1017349,
                            953937
                        ]
                    },
                    "reader_bytes" :
                    {
                        "input" : 484089191296
                    },
                    "instance_run_times" :
                    [
                        23,
                        74,
                        41
                    ],
                    "output_record_count_stats" :
                    {
                        "R2_1" :
                        [
                            830754,
                            1017349,
                            953937
                        ]
                    },
                    "task_run_time" : 173,
                    "planned_cpu" : 100,
                    "instance_count" : 1862,
                    "output_bytes_stats" :
                    {
                        "R2_1" :
                        [
                            25216988,
                            31049145,
                            29013458
                        ]
                    }
                }

OK,没错,input_col_total_numinput_col_used_num两个字段的数据就是我们此次优化需要关注的要点。可以看到这张输入表在Map阶段总共输入了14896列(这里的意思不是说输入表有1万多列,而是map个数乘以表的列数得到的Map阶段总的输入列数),但是实际使用了3724列(同理),利用率非常低,扣分就是从这里开始的。。。

其实回到FuxiJobs点击M1_U13,然后在列出的任意一个FuxInstance中点击Debug可以看到这个Map任务中的统计数据:

{
    "endTime" : 1476055269,
    "finishedPercentage" : 100,
    "gblCounter" : "{"Counters":"{\"CountersValue\":[]}","CustomInfo":{"StringMemoryPoolSize":"41963520","UserCounters":"{\n    \"INPUTS-SUCCESS\": {\n        \"counters\": [{\n                \"name\": \"one_of_your_input_table[用户XX信息表]_SUCCESS\",\n                \"value\": 963577}],\n        \"name\": \"INPUTS-SUCCESS\"},\n    \"INPUTS-TOTAL\": {\n        \"counters\": [{\n                \"name\": \"one_of_your_input_table[用户XX信息表][ds=20161009]\",\n                \"value\": 963577}],\n        \"name\": \"INPUTS-TOTAL\"},\n    \"ODPS_SDK_FRAMEWORK_COUNTER_GROUP\": {\n        \"counters\": [{\n                \"name\": \"input_col_total_num\",\n                \"value\": 8},\n            {\n                \"name\": \"input_col_used_num\",\n                \"value\": 2}],\n        \"name\": \"ODPS_SDK_FRAMEWORK_COUNTER_GROUP\"}}"},"InputCounters":{"input":{"bytes":248006624,"records":963577}},"OutputCounters":{"R2_1":{"bytes":29108313,"dumps":0,"records":963577}},"PanguReadBytes":[0,48051280,48,199955317],"PanguWriteBytes":[0,0,0,0]}",
    "history" :
    [

    ],
    "id" : "Odps/your_project_20161009231931375gj8pn6sb1_LOT_0_0_0_job0/M1_U13#1045_0",
    "input_bytes" : 248006624,
    "input_records" : 963577,
    "logId" : "PU1UQXVNVEF6TGpFd01TNHhPRGdzVDJSd2N5OTBZbDl0WkdOZk1qQXhOakV3TURreU16RTVNekV6TnpWbmFqaHdialp6WWpGZlRFOVVYekJmTUY4d1gycHZZakF2VFRGZlZURXpRSEpoWVdJeE1qTTRPUzVsZERFak1qUWFiYw==",
    "output_bytes" : 29108313,
    "output_records" : 963577,
    "startTime" : 1476055236,
    "status" : "Terminated",
    "st" : "2016-10-10 07:20:36",
    "et" : "2016-10-10 07:21:09",
    "latencynum" : 33,
    "latency" : 33,
    "barleft" : "8%",
    "barlen" : "19%"
}

从这个Map统计数据可以看到,输入表共输入8列,但实际使用了2列,即使用率只有1/4而已,浪费严重。

如何进行列裁剪优化

  • 如果你使用的是Java SDK,那么可以在指定输入表的同时指定输入列,如下:

    InputUtils.addTable(TableInfo.builder().tableName("wc_in").cols(new String[]{"c1","c2"}).build(), job);
    
  • 如果你的MR任务输入表确定而不变,则可以参数化进行设置,如下:

    set odps.mapred.input.columns.test.wc_in=c1,c2; //设置test这个project的wc_in表的输入column为c1,c2这俩列
    

第二种方式我没有用过,目前只是实践了第一种方式,而这种方式的灵活性在于,如果你是通过配置文件来表达输入表和输入列时,可以实现只改配置而不改代码

由于目前提供的sdk不支持使用列索引来做列裁剪(个人觉得应该封装一下),如果因为历史原因,只在输入表的自定义配置文件中指定了输入表的列索引也没关系(如果配置文件比较大,输入表有几十个,我想你也不希望去重新找每张表列索引对应的列名),可以通过获取TableTableSchema,然后通过下标来获取到列名,简单看下代码:

Odps odps = SessionState.get().getOdps();
Tables tables = odps.tables();
Table table = tables.get(projectName, tableName);
TableSchema tableSchema = table.getSchema();

List<String> inputColumnNameList = new LinkedList<String>();
for(int index : oneInputTable.getInputTableIndexes) { // 这里是你从配置文件中获得的某张输入表的数据
    Column column = tableSchema.getColumn(index);
    if(column != null && !StringUtil.isBlank(column.getName())) {
        inputColumnNameList.add(column.getName());
    }
}

// 接下来拼成一个字符串数组就OK了

已经做了列裁剪优化,为啥利用率反而降低了

这里有一个注意点就是,如果优化前在Map任务中是通过输入表的列索引去获取记录值的话,需要同步做相应修改。这是因为指定了输入列之后,实际在Map阶段的输入表列数比原输入表要少,你可以理解为在Map阶段是一张全新的表,此时列索引应该使用这张经过裁剪后的新表的列索引,因此使用原来的列索引可能会取不到值或者取到错误列的值。建议在使用时需要使用列名来访问或者通过新的列索引来获取记录值。

其实我上面贴的logview中的内容就是因为我在Map阶段使用了原输入表的列索引来访问,导致了利用率反而降低了。比如表wc_in一共有0-9共10列,你只需要读索引为7、8、9这三列,但是你在进行了列裁剪优化之后,实际在Map阶段的输入表只有0、1、2这三个索引对应的值,此时如果你还是使用原来的列索引去读的话就读不到数据,导致该表的输入列利用率从原来的30%变成了0%。

因此在完成了列裁剪优化之后务必检查Map阶段读取Record的方式:
1. 如果是使用列索引的务必改成列名,
2. 或者如果不方便获取列名的,在setup阶段读取配置文件建立列索引的映射来解决该问题。


欢迎加入“数加·MaxCompute购买咨询”钉钉群(群号: 11782920)进行咨询,群二维码如下:

96e17df884ab556dc002c912fa736ef6558cbb51 
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
45 2
|
2月前
|
SQL 存储 分布式计算
ODPS技术架构深度剖析与实战指南——从零开始掌握阿里巴巴大数据处理平台的核心要义与应用技巧
【10月更文挑战第9天】ODPS是阿里巴巴推出的大数据处理平台,支持海量数据的存储与计算,适用于数据仓库、数据挖掘等场景。其核心组件涵盖数据存储、计算引擎、任务调度、资源管理和用户界面,确保数据处理的稳定、安全与高效。通过创建项目、上传数据、编写SQL或MapReduce程序,用户可轻松完成复杂的数据处理任务。示例展示了如何使用ODPS SQL查询每个用户的最早登录时间。
154 1
|
1月前
|
存储 算法 固态存储
大数据分区优化存储成本
大数据分区优化存储成本
31 4
|
1月前
|
存储 分布式计算 监控
大数据增加分区减少单个任务的负担
大数据增加分区减少单个任务的负担
33 1
|
1月前
|
存储 大数据 Serverless
大数据增加分区优化资源使用
大数据增加分区优化资源使用
28 1
|
1月前
|
存储 NoSQL 大数据
大数据 数据存储优化
【10月更文挑战第25天】
80 2
|
2月前
|
SQL 分布式计算 NoSQL
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
36 1
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
|
2月前
|
存储 大数据 分布式数据库
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
48 1
|
2月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
43 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
3月前
|
SQL 机器学习/深度学习 分布式计算
dataworks节点任务
在DataWorks中,你可以通过拖拽节点以及连线来构建复杂的工作流,这样可以方便地管理多个任务之间的依赖关系。此外,DataWorks还提供了调度功能,使得这些任务可以在设定的时间自动执行。这对于构建自动化、定时的数据处理管道非常有用。
71 5

相关产品

  • 云原生大数据计算服务 MaxCompute