概述
这是mapreduce的一种优化策略:通过压缩编码对mapper或者reducer的输出进行压缩,以减少磁盘IO,提高MR程序运行速度(但相应增加了cpu运算负担)。
1.Mapreduce支持将map输出的结果或者reduce输出的结果进行压缩,以减少网络IO或最终输出数据的体积
2.压缩特性运用得当能提高性能,但运用不当也可能降低性能
3.基本原则:
- 运算密集型的job,少用压缩
- IO密集型的job,多用压缩
MR支持的压缩编码
Reducer输出压缩
在配置参数或在代码中都可以设置reduce的输出压缩
1、在配置参数中设置
mapreduce.output.fileoutputformat.compress=false mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec mapreduce.output.fileoutputformat.compress.type=RECORD
2、在代码中设置
Job job = Job.getInstance(conf); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));
Mapper输出压缩
在配置参数或在代码中都可以设置reduce
的输出压缩
1、在配置参数中设置
mapreduce.map.output.compress=false mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
2、在代码中设置:
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true); conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
压缩文件的读取
Hadoop自带的InputFormat类内置支持压缩文件的读取,比如TextInputformat类,在其initialize方法中:
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); // open the file and seek to the start of the split final FileSystem fs = file.getFileSystem(job); fileIn = fs.open(file); //根据文件后缀名创建相应压缩编码的codec CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); if (null!=codec) { isCompressedInput = true; decompressor = CodecPool.getDecompressor(codec); //判断是否属于可切片压缩编码类型 if (codec instanceof SplittableCompressionCodec) { final SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream( fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); //如果是可切片压缩编码,则创建一个CompressedSplitLineReader读取压缩数据 in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; } else { //如果是不可切片压缩编码,则创建一个SplitLineReader读取压缩数据,并将文件输入流转换成解压数据流传递给普通SplitLineReader读取 in = new SplitLineReader(codec.createInputStream(fileIn, decompressor), job, this.recordDelimiterBytes); filePosition = fileIn; } } else { fileIn.seek(start); //如果不是压缩文件,则创建普通SplitLineReader读取数据 in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes); filePosition = fileIn; }