什么是数据倾斜
数据倾斜是指在分布式系统中,由于数据的不均匀分布或者任务的不平衡执行,导致某些节点或者任务处理的数据量远远大于其他节点或者任务,从而导致整个系统的性能下降或者崩溃的问题。
比如,我们有1000w条数据(0~10开头)需要进行WordCount,也就是统计每个数字出现的次数,但是由于数据分布很不均匀(5这个数字就占了910w左右的样子),这个时候我们如果来写一个MApReduce程序来进行次数统计。
//数据格式: 数字 [空格] 日志 10 INFO main org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
首先,Map端它肯定是中规中矩低按照每128MB切一片,然后一片开启一个 MapTask, 也没什么可以优化的。但是Reduce端就不一样了,我们知道,如果我们在提交作业的时候不去指定ReduceTask的个数的话,它默认只开启一个ReduceTask,也就是说,我们成百上千万个键值对都交给一个ReduceTask去处理,效率可想而知。
public int getPartition(K key, V value,int numReduceTasks) { //不指定reduceTask的个数的话则 numReduceTasks 的值默认为1,即开启一台ReduceTask return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }
我们可以看到,当ReduceTask的个数为1时,任何键的哈希值对1 取余后都是0,这也就是为什么我们的输出文件只有一个且文件名为"part-000000"的原因。这个时候我们可以在提交作业的时候开启多个ReduceTask:
这里开启10个,因为开启十个就会产生十个分区,每个分区根据键的哈希值来分区,由于我们的数据中大约有910w数据的键就都是 5 ,所以可想而知,肯定5对应的那个ReduceTask压力很大,因为别的均摊下来也就10w条数据而已,但是以5为键的数据就有910w条,当别的ReduceTask都完成任务了,它还得老半天,显然是不行的。
job.setNumReduceTasks(10);
那么应该怎么办呢?这时候只需要我们对数据进行打散:
数据打散
怎么打散:
我们只需要在数字5后面拼接一个随机的数字标识即可,比如 "5_1"、"5_2"、..."5-10"等。这样,它的哈希值也会跟着被均匀分布,使得不同的键在不同的ReduceTask被合并。
public static class Mapper extends Mapper<LongWritable, Text,Text,LongWritable>{ private Text OUT_KEY = new Text(); private LongWritable OUT_VALUE = new LongWritable(1); Random random = new Random(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //数字切分,一次读取一行 String line = value.toString(); String[] words = line.split(" "); String key = words[0]; if("5".equals(key)){ //把倾斜的key打散,分成10份 key = "5"+"_"+random.nextInt(10); } OUT_KEY.set(key); //写出 context.write(OUT_KEY,OUT_VALUE); } }
这样,我们就可以轻松将原本光处理以“5”为键的 900 多 w 条数据就需要两三分钟,将压力分担到多个 ReduceTask,处理时间压缩到几十秒。