1. 输入表的列裁剪
对于列数特别多的输入表,Map阶段处理只需要其中的某几列,可以通过在添加输入表时明确指定输入的列,减少输入量;
例如只需要c1,c2俩列,可以这样设置:
InputUtils.addTable(TableInfo.builder().tableName("wc_in").cols(new String[]{"c1","c2"}).build(), job);
设置之后,你在map里的读取到的Record也就只有c1,c2俩列,如果之前是使用列名获取Record数据的,不会有影响,而用下标获取的需要注意这个变化。
2. 减少中间环节
如果有多个MR作业,之间有关联关系,前一个作业的输出是后一个作业的输入,可以考虑采用Pipeline的模式,将多个串行的MR作业合并为一个,这样可以用更少的作业数量完成同样的任务,一方面减少中间落表造成的的多余磁盘IO,提升性能;另一方面减少作业数量使调度更加简单,增强流程的可维护性。具体使用方法参见Pipeline示例。
3. 避免资源重复读取
资源的读取尽量放置到setup阶段读取,避免资源的多次读取的性能损失,另外系统也有64次读取的限制,资源的读取参见使用资源示例。
4. 减少对象构造开销
对于Map/Reduce阶段每次都会用到的一些java对象,避免在map/reduce函数里构造,可以放到setup阶段,避免多次构造产生的开销;
{
...
Record word;
Record one;
public void setup(TaskContext context) throws IOException {
// 创建一次就可以,避免在map中每次重复创建
word = context.createMapOutputKeyRecord();
one = context.createMapOutputValueRecord();
one.set(new Object[]{1L});
}
...
}
5. 合理选择partition column或自定义partitioner
合理选择partition columns,可以使用JobConf#setPartitionColumns这个方法进行设置(默认是key schema定义的column),设置后数据将按照指定的列计算hash值分发到reduce中去, 避免数据倾斜导致作业长尾现象,如有必要也可以选择自定义partitioner,自定义partitioner的使用方法如下:
import com.aliyun.odps.mapred.Partitioner;
public static class MyPartitioner extends Partitioner {
@Override
public int getPartition(Record key, Record value, int numPartitions) {
// numPartitions即对应reducer的个数
// 通过该函数决定map输出的key value去往哪个reducer
String k = key.get(0).toString();
return k.length() % numPartitions;
}
}
在jobconf里进行设置:
jobconf.setPartitionerClass(MyPartitioner.class)
另外需要在jobconf里明确指定reducer的个数:
jobconf.setNumReduceTasks(num)
6. 合理使用combiner
如果map的输出结果中有很多重复的key,可以合并后输出,combine后可以减少网络带宽传输和一定shuffle的开销,如果map输出本来就没有多少重复的,就不要用combiner,用了反而可能会有一些额外的开销。combiner实现的是和reducer相同的接口,例如一个WordCount程序的combiner可以定义如下:
/**
* A combiner class that combines map output by sum them.
*/
public static class SumCombiner extends ReducerBase {
private Record count;
@Override
public void setup(TaskContext context) throws IOException {
count = context.createMapOutputValueRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context)
throws IOException {
long c = 0;
while (values.hasNext()) {
Record val = values.next();
c += (Long) val.get(0);
}
count.set(0, c);
context.write(key, count);
}
}
7. 设置合理的split size
map默认的split size是256MB,split size的大小决定了map的个数多少,如果用户的代码逻辑比较耗时,map需要较长时间结束,可以通过JobConf#setSplitSize方法适当调小split size的大小。然而split size也不宜设置太小,否则会占用过多的计算资源。
欢迎加入“数加·MaxCompute购买咨询”钉钉群(群号: 11782920)进行咨询,群二维码如下: