MaxCompute MapReduce的7个性能优化策略-阿里云开发者社区

开发者社区> 隐林> 正文

MaxCompute MapReduce的7个性能优化策略

简介: 本文收录了一些MaxCompute MapReduce优化策略。
+关注继续查看

1. 输入表的列裁剪

对于列数特别多的输入表,Map阶段处理只需要其中的某几列,可以通过在添加输入表时明确指定输入的列,减少输入量;
例如只需要c1,c2俩列,可以这样设置:

InputUtils.addTable(TableInfo.builder().tableName("wc_in").cols(new String[]{"c1","c2"}).build(), job);

设置之后,你在map里的读取到的Record也就只有c1,c2俩列,如果之前是使用列名获取Record数据的,不会有影响,而用下标获取的需要注意这个变化。

2. 减少中间环节

如果有多个MR作业,之间有关联关系,前一个作业的输出是后一个作业的输入,可以考虑采用Pipeline的模式,将多个串行的MR作业合并为一个,这样可以用更少的作业数量完成同样的任务,一方面减少中间落表造成的的多余磁盘IO,提升性能;另一方面减少作业数量使调度更加简单,增强流程的可维护性。具体使用方法参见Pipeline示例

3. 避免资源重复读取

资源的读取尽量放置到setup阶段读取,避免资源的多次读取的性能损失,另外系统也有64次读取的限制,资源的读取参见使用资源示例

4. 减少对象构造开销

对于Map/Reduce阶段每次都会用到的一些java对象,避免在map/reduce函数里构造,可以放到setup阶段,避免多次构造产生的开销;

{
    ...
    Record word;
    Record one;

    public void setup(TaskContext context) throws IOException {


      // 创建一次就可以,避免在map中每次重复创建
      word = context.createMapOutputKeyRecord();

      one = context.createMapOutputValueRecord();

      one.set(new Object[]{1L});

    }
    ...
}

5. 合理选择partition column或自定义partitioner

合理选择partition columns,可以使用JobConf#setPartitionColumns这个方法进行设置(默认是key schema定义的column),设置后数据将按照指定的列计算hash值分发到reduce中去, 避免数据倾斜导致作业长尾现象,如有必要也可以选择自定义partitioner,自定义partitioner的使用方法如下:

import com.aliyun.odps.mapred.Partitioner;

public static class MyPartitioner extends Partitioner {

@Override
public int getPartition(Record key, Record value, int numPartitions) {
  // numPartitions即对应reducer的个数
  // 通过该函数决定map输出的key value去往哪个reducer
  String k = key.get(0).toString();
  return k.length() % numPartitions;
}
}

在jobconf里进行设置:

jobconf.setPartitionerClass(MyPartitioner.class)

另外需要在jobconf里明确指定reducer的个数:

jobconf.setNumReduceTasks(num)

6. 合理使用combiner

如果map的输出结果中有很多重复的key,可以合并后输出,combine后可以减少网络带宽传输和一定shuffle的开销,如果map输出本来就没有多少重复的,就不要用combiner,用了反而可能会有一些额外的开销。combiner实现的是和reducer相同的接口,例如一个WordCount程序的combiner可以定义如下:

/**
   * A combiner class that combines map output by sum them.
   */
  public static class SumCombiner extends ReducerBase {

    private Record count;

    @Override
    public void setup(TaskContext context) throws IOException {
      count = context.createMapOutputValueRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
      long c = 0;
      while (values.hasNext()) {
        Record val = values.next();
        c += (Long) val.get(0);
      }
      count.set(0, c);
      context.write(key, count);
    }
  }

7. 设置合理的split size

map默认的split size是256MB,split size的大小决定了map的个数多少,如果用户的代码逻辑比较耗时,map需要较长时间结束,可以通过JobConf#setSplitSize方法适当调小split size的大小。然而split size也不宜设置太小,否则会占用过多的计算资源。


欢迎加入“数加·MaxCompute购买咨询”钉钉群(群号: 11782920)进行咨询,群二维码如下:

96e17df884ab556dc002c912fa736ef6558cbb51 

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
java性能优化策略
· 当进行过多的字符串操作以避免不必要地创建那些最终必须经历垃圾收集的对象时,可使用 StringBuffer 功能而不是字符串并置。
602 0
Android 号码查询性能优化
我的需求是做一个快速拨号界面!list列表显示所有联系人Calllog资料!原来的做法在前面的日志中有提到!大概是先查 Cursor phoneCursor = this.managedQuery( ContactsContract.
505 0
进程上下文切换 – 残酷的性能杀手(上)(转载cppthinker.com)
对于服务器的优化,很多人都有自己的经验和见解,但就我观察,有两点常常会被人忽视 – 上下文切换 和 Cache Line同步 问题,人们往往都会习惯性地把视线集中在尽力减少内存拷贝,减少IO次数这样的问题上,不可否认它们一样重要,但一个高性能服务器需要更细致地去考察这些问题,这个问题我将分成两篇文章...
804 0
spring源码学习【准备】之jdk动态代理和cglib动态代理的区别和性能
一:区别:---->JDK的动态代理依靠接口实现,如果有些类并没有实现接口,则不能使用JDK代理,这就要使用cglib动态代理了。--->JDK的动态代理机制只能代理实现了接口的类,而不能实现接口的类就不能实现JDK的动态代理,cglib是针对类来实现代理的,他的原理是对指定的目标类生成一个子类,并覆盖其中方法实现增强,但因为采用的是继承,所以不能对final修饰的类进行代理。
917 0
+关注
隐林
阿里云大数据产品专家,擅长MaxCompute、机器学习、分布式、可视化、人工智能等大数据领域;
288
文章
38
问答
来源圈子
更多
MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载