MapReduce系统学习(2)

本文涉及的产品
系统运维管理,不限时长
简介: shuffer是一个网络拷贝的过程,是指通过网络把数据从map端拷贝到reduce端的过程.

Shuffle过程详解


shuffer是一个网络拷贝的过程,是指通过网络把数据从map端拷贝到reduce端的过程.

20210823231302370.png

map阶段


最左边有一个inputsplit,最终会产生一个map任务,map任务在执行的时候会k1,v1转化为k2,v2,这些数据会先临时存储到一个内存缓冲区中,这个内存缓冲区的大小默认是100M

(io.sort.mb属性),当达到内存缓冲区大小的80%(io.sort.spill.percent)也就是80M的时候,会把内存中的数据溢写到本地磁盘中(mapred.local.dir),一直到map把所有的数据都计算完,最后会把内存缓冲区中的数据一次性全部刷新到本地磁盘文件中,在这个图里面表示产生了3个临时文件,每个临时文件中有3个分区,这是由于map阶段中对数据做了分区,所以数据在存储的时候,在每个临时文件中也分为了3块

reduce阶段


最后需要对这些临时文件进行合并,合并为一个大文件,因为一个map任务最终只会产生一个文件,这个合并之后的文件也是有3个分区的

这3个分区的数据会被shuffle线程分别拷贝到三个不同的reduce节点,图里面只显示了一个reduce节 点,下面还有两个没有显示。不同map任务中的相同分区的数据会在同一个reduce节点进行合并,合并  以后会执行reduce的功能,最终产生结果数据。

在这里shuffle其实是横跨map端和reduce端的,它主要是负责把map端产生的数据通过网络拷贝到

reduce阶段进行统一聚合计算。

Hadoop中序列化机制


20210823232347195.png

开发MapReduce程序的时候使用到了LongWritable和Text这些数据类型,这些数据类   的是Java中的Long和String,那MapReduce为什么不直接使用Java中的这些数据类型呢?


因为java的反序列化和序列化效能不如hadoop,所以hadoop重写了序列化相关代码.


mapreduce的过程中当程序在向磁盘中写数据以及从磁盘中读取数据的时候会对数据进行序列化和反序列化,磁盘io这些步骤我们省略不了,但是我们可以从序列化和反序列化这一块来着手做一些优化

什么是序列化和反序列化?


20210823232409705.png

把内存中的数据写入到文件中的时候,会对数据序列化,然后再写入,这个序列化其实就是把内存中的对象信息转成二进制的形式,方便存储到文件中,默认java中的序列化会把对象及其父类、超类的整个继承体系信息都保存下来,这样存储的信息太大了,就会导致写入文件的信息过大,这写入是会额外消耗性能的。    

反序列化也是一样,reduce端想把文件中的对象信息加载到内存中,如果文件很大,在加载的时候也额外消耗很多性能,所以如果我们把对象存储的信息尽量精简,那么就可以提高数据写入和读取消耗的性能。

基于此, hadoop官方实现了自己的序列化和反序列化机制, 没有使用java中的序列化机制, 所

hadoop中的数据类型没有沿用java中的数据类型,而是自己单独设计了一些writable的实现了,例如 longwritable、text等

java序列化


public class JavaSerialize {
    public static void main(String[] args) throws Exception{
        //创建Student对象,并设置id和name属性
        StudentJava studentJava = new StudentJava();
        studentJava.setId(1L);
        studentJava.setName("Hadoop");
        //将Student对象的当前状态写入本地文件中
        FileOutputStream fos = new FileOutputStream("D:\\student_java.txt");
        ObjectOutputStream oos = new ObjectOutputStream(fos);
        oos.writeObject(studentJava);
        oos.close();
        fos.close();
    }
}
class StudentJava implements Serializable{
    private static final long serialVersionUID = 1L;
    private Long id;
    private String name;
    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}

hadoop序列化


public class HadoopSerialize {
    public static void main(String[] args) throws Exception{
        //创建Student对象,并设置id和name属性
        StudentWritable studentWritable = new StudentWritable();
        studentWritable.setId(1L);
        studentWritable.setName("Hadoop");
        //将Student对象的当前状态写入本地文件中
        FileOutputStream fos = new FileOutputStream("D:\\student_hadoop.txt");
        ObjectOutputStream oos = new ObjectOutputStream(fos);
        studentWritable.write(oos);
        oos.close();
        fos.close();
    }
}
class StudentWritable implements Writable{
    private Long id;
    private String name;
    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.id);
        out.writeUTF(this.name);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readLong();
        this.name = in.readUTF();
    }
}

这段代码执行会报错:

20210824214607157.png

因为在pom中hadoop-client的scope 标签设定为provide代表只有编译的时候使用.

20210824214727886.png

注释掉这个标签执行成功.

两种反序列化的比较


20210824215058489.png

InputFormat分析


20210824215355948.png

fileFormat源代码

public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
//获取InputSplit的size的最小值minSize和最大值maxSize
  /*
getFormatMinSplitSize()=1
getMinSplitSize(job)=0
所以最终minSize=1
  */
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  /*
getMaxSplitSize(job)=Long.MAX_VALUE
所以maxSize等于Long的最大值
  */
  long maxSize = getMaxSplitSize(job);
// 创建List,准备保存生成的InputSplit
List<InputSplit> splits = new ArrayList<InputSplit>();
//获取输入文件列表
List<FileStatus> files = listStatus(job);
  /*
!getInputDirRecursive(job) = !false = true  
job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS,
所以ignoreDirs=false
  */  
  boolean ignoreDirs = !getInputDirRecursive(job)
  &&  
//迭代输入文件列表
for (FileStatus file: files) {
//是否忽略子目录,默认不忽略
if (ignoreDirs && file.isDirectory()) {
continue;
  }
//获取 文件/目录 路径
Path path = file.getPath();
//获取 文件/目录 长度
long length = file.getLen();
if (length != 0) {
//保存文件的Block块所在的位置
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
  }
//判断文件是否支持切割,默认为true
if (isSplitable(job, path)) {
//获取文件的Block大小,默认128M
long blockSize = file.getBlockSize();
//计算split的大小
splitSize = Math.max(1, Math.min(Long.MAX_VALUE, 128))=128M=134217728
所以我们说默认情况下split逻辑切片的大小和Block size相等
*/
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
//还需要处理的文件剩余字节大小,其实就是这个文件的原始大小long bytesRemaining = length;
//
/*
SPLIT_SLOP = 1.1
文件剩余字节大小/1134217728【128M】 > 1.1
意思就是当文件剩余大小bytesRemaining与splitSize的比值还大于1.1的时候,就继否则,剩下的直接作为一个InputSplit
敲黑板,划重点:只要bytesRemaining/splitSize<=1.1就会停止划分,将剩下的作
*/
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
//组装InputSplit
/*
生成InputSplit path:路径
length-bytesRemaining:起始位置splitSize:大小
blkLocations[blkIndex].getHosts()和blkLocations[blkIndex].getCached makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts())
*/
splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;  
}
//最后会把bytesRemaining/splitSize<=1.1的那一部分内容作为一个InputSplit
if (bytesRemaining != 0) {  
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),  
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
//如果文件不支持切割,执行这里if (LOG.isDebugEnabled()) {
// Log only if the file is big enough to be splitted
if (length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization "
+ "is possible: " + file.getPath());
}
}
//把不支持切割的文件整个作为一个InputSplit
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts( blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); sw.stop();

下面我们来看几个面试题?


1.一个1G的文件,会产生多少个map任务?

Block块默认是128M,所以1G的文件会产生8个Block块

默认情况下InputSplit的大小和Block块的大小一致,每一个InputSplit会产生一个map任务     所以:1024/128=8个map任务

2.1000个文件,每个文件100KB,会产生多少个map任务?

一个文件,不管再小,都会占用一个block,所以这1000个小文件会产生1000个Block,  那最终会产生1000个InputSplit,也就对应着会产生1000个map任务

3.一个140M的文件,会产生多少个map任务? 根据前面的分析

140M的文件会产生2个Block,那对应的就会产生2个InputSplit了?  注意:这个有点特殊,140M/128M=1.09375<1.1

所以,这个文件只会产生一个InputSplit,也最终也就只会产生1个map 任务。这个文件其实再稍微大1M就可以产生2个map 任务了。

createRecordReader分析


createRecordReader的具体实现是在TextInputFormat类中


试想一个问题: 如果逻辑切分InputSplit时,一行数据横跨两个InputSplit时该如何处理?


hadoop中是这样处理的:


如果这个InputSplit不是第一个InputSplit,我们将会丢掉读取出来的第一行因为我们总是通过next()方法多读取一行(会多读取下一个InputSplit的第一行)

//如果start不等于0,表示不是第一个InputSplit,所以就把start的值重置为第二行的起始
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}

20210825225911848.png

源码解读

2021083021080861.png

  @Override
  public RecordReader<LongWritable, Text> 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    // 获取换行符,默认没有配置。
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
//    创建一个行阅读器
    return new LineRecordReader(recordDelimiterBytes);
  }

20210830210715428.png

创建行阅读器之前会执行这个初始化方法。

  //初始化方法
  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    //获取传过来的InputSplit,将InputSplit转换成子类FileSplit
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    //MAX_LINE_LENGTH对应的参数默认没有设置,所以会取Integer.MAX_VALUE
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    //获取InputSplit的起始位置
    start = split.getStart();
    //获取InputSplit的结束位置
    end = start + split.getLength();
    //获取InputSplit的路径
    final Path file = split.getPath();
    // open the file and seek to the start of the split
    // 打开文件,并且跳到InputSplit的起始位置
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);
    // 获取文件的压缩信息
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null!=codec) {
      //如果文件是压缩文件,则执行if中的语句
      isCompressedInput = true;
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new CompressedSplitLineReader(cIn, job,
            this.recordDelimiterBytes);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        if (start != 0) {
          // So we have a split that is only part of a file stored using
          // a Compression codec that cannot be split.
          throw new IOException("Cannot seek in " +
              codec.getClass().getSimpleName() + " compressed stream");
        }
        in = new SplitLineReader(codec.createInputStream(fileIn,
            decompressor), job, this.recordDelimiterBytes);
        filePosition = fileIn;
      }
    } else {
      //如果文件是未压缩文件(普通文件),则执行else中的语句
      //跳转到文件中的指定位置
      fileIn.seek(start);
      //针对未压缩文件,创建一个阅读器读取一行一行的数据
      in = new UncompressedSplitLineReader(
          fileIn, job, this.recordDelimiterBytes, split.getLength());
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    /*
       注意:如果这个InputSplit不是第一个InputSplit,我们将会丢掉读取出来的第一行因为我们总是通过next()方法多读取一行(会多读取下一个InputSplit的第一行)
      这就解释了这个问题:如果一行数据被拆分到了两个InputSplit中,会不会有问题?
    */
    if (start != 0) {
      //如果start不等于0,表示不是第一个InputSplit,所以就把start的值重置为第二行的起始
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }
/*
    这个方法是核心的方法,会被框架调用,每调用一次,就会读取一行数据,最终获取到我们之前
    */
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    // k1 就是每一行的起始位置
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        newSize = skipUtfByteOrderMark();
      } else {
        //读取一行数据,赋值给value,也就是v1
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;
      }
      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }
      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

OutPutFormat分析


20210830211318638.png

源代码位置

20210830212103340.png

  public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
                                                  JobConf job,
                                                  String name,
                                                  Progressable progress)
    throws IOException {
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", 
                                       "\t");
    if (!isCompressed) {
      Path file = FileOutputFormat.getTaskOutputPath(job, name);
      FileSystem fs = file.getFileSystem(job);
      FSDataOutputStream fileOut = fs.create(file, progress);
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
      Class<? extends CompressionCodec> codecClass =
        getOutputCompressorClass(job, GzipCodec.class);
      // create the named codec
      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
      // build the filename including the extension
      Path file = 
        FileOutputFormat.getTaskOutputPath(job, 
                                           name + codec.getDefaultExtension());
      FileSystem fs = file.getFileSystem(job);
      FSDataOutputStream fileOut = fs.create(file, progress);
      return new LineRecordWriter<K, V>(new DataOutputStream
                                        (codec.createOutputStream(fileOut)),
                                        keyValueSeparator);
    }
  }

分析完源码能回答的问题


1.切分inputSplit 和block有什么联系?

2. 1个mapreduce 任务会生成几个map任务?

3. 针对一行文件被切分到多个inputSplit中的情况,mapreduce如何处理?


目录
相关文章
|
分布式计算 Hadoop 大数据
Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四)
Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四)
192 0
|
分布式计算 Hadoop 大数据
Hadoop学习:深入解析MapReduce的大数据魔力(三)
Hadoop学习:深入解析MapReduce的大数据魔力(三)
116 0
|
7月前
|
存储 分布式计算 Hadoop
Hadoop生态系统详解:HDFS与MapReduce编程
Apache Hadoop是大数据处理的关键,其核心包括HDFS(分布式文件系统)和MapReduce(并行计算框架)。HDFS为大数据存储提供高容错性和高吞吐量,采用主从结构,通过数据复制保证可靠性。MapReduce将任务分解为Map和Reduce阶段,适合大规模数据集的处理。通过代码示例展示了如何使用MapReduce实现Word Count功能。HDFS和MapReduce的结合,加上YARN的资源管理,构成处理和分析大数据的强大力量。了解和掌握这些基础对于有效管理大数据至关重要。【6月更文挑战第12天】
308 0
|
存储 分布式计算 Hadoop
Hadoop学习:深入解析MapReduce的大数据魔力(二)
Hadoop学习:深入解析MapReduce的大数据魔力(二)
169 0
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
8月前
|
存储 分布式计算 大数据
【云计算与大数据技术】大数据系统总体架构概述(Hadoop+MapReduce )
【云计算与大数据技术】大数据系统总体架构概述(Hadoop+MapReduce )
464 0
|
存储 分布式计算 Hadoop
Hadoop基础学习---6、MapReduce框架原理(一)
Hadoop基础学习---6、MapReduce框架原理(一)
|
存储 分布式计算 Hadoop
Hadoop学习:深入解析MapReduce的大数据魔力(一)
Hadoop学习:深入解析MapReduce的大数据魔力(一)
198 0
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
|
分布式计算 Hadoop 数据处理
Hadoop基础学习---6、MapReduce框架原理(二)
Hadoop基础学习---6、MapReduce框架原理(二)

相关实验场景

更多