hadoop之数据压缩(12)

简介: hadoop之数据压缩(12)

hadoop数据压缩


你好! 这是你第一次使用 Markdown编辑器 所展示的欢迎页。如果你想学习如何使用Markdown编辑器, 可以仔细阅读这篇文章,了解一下Markdown的基本语法知识。


概述


压缩技术能够有效减少底层存储系统(HDFS)读写字节数。压缩提高了网络带宽和磁盘空间的效率。在Hadoop下,尤其是数据规模很大和工作负载密集的情况下,使用数据压缩显得非常重要。在这种情况下,I/O操作和网络数据传输要花大量的时间。还有,Shuffle与Merge过程同样也面临着巨大的I/O压力。


鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助。不过,尽管压缩与解压操作的CPU开销不高,其性能的提升和资源的节省并非没有代价。


如果磁盘I/O和网络带宽影响了MapReduce作业性能,在任意MapReduce阶段启用压缩都可以改善端到端处理时间并减少I/O和网络流量。


MR支持的压缩编码


image.png

为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示

image.png

http://google.github.io/snappy/

On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.


压缩方式选择


1.Gzip压缩


优点:压缩率比较高,而且压缩/解压速度也比较快;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;大部分linux系统都自带gzip命令,使用方便。

缺点:不支持split。


应用场景:当每个文件压缩之后在140M以内的(1个块大小内),都可以考虑用gzip压缩格式。例如说一天或者一个小时的日志压缩成一个gzip文件,运行mapreduce程序的时候通过多个gzip文件达到并发。hive程序,streaming程序,和java写的mapreduce程序完全和文本处理一样,压缩之后原来的程序不需要做任何修改。


2.Bzip2压缩


优点:支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native(java和c互操作的API接口);在linux系统下自带bzip2命令,使用方便。


缺点:压缩/解压速度慢;不支持native。


应用场景:适合对速度要求不高,但需要较高的压缩率的时候,可以作为mapreduce作业的输出格式;或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持split,而且兼容之前的应用程序(即应用程序不需要修改)的情况。


3.Lzo压缩


优点:压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;可以在linux系统下安装lzop命令,使用方便。


缺点:压缩率比gzip要低一些;hadoop本身不支持,需要安装;在应用中对lzo格式的文件需要做一些特殊处理(为了支持split需要建索引,还需要指定inputformat为lzo格式)。


应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点越越明显。


4.Snappy压缩


优点:高速压缩速度和合理的压缩率。


缺点:不支持split;压缩率比gzip要低;hadoop本身不支持,需要安装;


应用场景:当Mapreduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;或者作为一个Mapreduce作业的输出和另外一个Mapreduce作业的输入。


5.压缩位置选择


要在Hadoop中启用压缩,可以配置如下参数:

image.png


6.压缩实战


public class CompressDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
//        compress("F:\\input\\phone.txt","org.apache.hadoop.io.compress.GzipCodec");
//        decompression("F:\\input\\phone.txt.gz","txt");
    }
    /**
     * 压缩
     * @param filename 文件路径+文件名
     * @param method 解码器
     *
     * */
    public static void compress(String filename,String method) throws IOException, ClassNotFoundException {
        //创建输入流
        FileInputStream fis = new FileInputStream(new File(filename));
        //通过反射找到解码器
        Class codeClass = Class.forName(method);
        //通过反射工具列找到解码器的对象,需要配置Hadoop的conf
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codeClass,new Configuration());
        //创建输出流
        FileOutputStream fos= new FileOutputStream(new File(filename+codec.getDefaultExtension()));
        //获取解码器的输出对象
        CompressionOutputStream cos = codec.createOutputStream(fos);
        //对接流
        IOUtils.copyBytes(fis,cos,1024*1024*5,true);
    }
    //解压缩
    public static void decompression(String filename,String decoded) throws IOException {
        //获取factory实例
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        CompressionCodec codec = factory.getCodec(new Path(filename));
        //配置解压缩的输入
        CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
        //输出流
        FileOutputStream fos = new FileOutputStream(new File(filename+"."+decoded));
        //流拷贝
        IOUtils.copyBytes(cis,fos,1024*1024*5,true);
    }
}

46a9d80a6e05e4e3b19d57a0ee70bcdf.png

1:原文件

2:压缩后的文件

3:解压后的文件


wordcount程序压缩


mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
//坑,自动导包Text很容易导成java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Arrays;
/**
 * <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * <LongWritable, Text, Text, IntWritable>
 *                       apache 1
 *快捷键:查看父类的方法 ctrl+o
 *       补全 Ctrl+Alt+v
 * */
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable>{
    @Override
    protected void map(LongWritable key, Text value,
                       Context context) throws IOException, InterruptedException {
        //context.write(key,new IntWritable(1));
        System.out.println(key.toString());
        //拿到数据,进行数据转换Text=》String
        String line = value.toString();
        //按照空格切分
        String[] split = line.split(",");
        //输出数据 KEYOUT, VALUEOUT
        for (String s:split) {
            //数据转换String=》Text   int=》IntWritable
            context.write(new Text(s),new IntWritable(1));
        }
//        Arrays.asList(split).forEach((s) -> {
//            try {
//                context.write(new Text(s),new IntWritable(1));
//            } catch (IOException e) {
//                e.printStackTrace();
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        });
    }
}


combine

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountCombine extends Reducer<Text,IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        //初始化一个计数器
        int count=0;
        //开始计数
        for (IntWritable value:values) {
            count=count+value.get();
        }
        //输出  int=》IntWritable
        context.write(key,new IntWritable(count));
    }
}


partition

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WordCountPartition extends Partitioner <Text, IntWritable>{
    @Override
    public int getPartition(Text text, IntWritable intWritable, int i) {
        //拿到数据
        String t = text.toString();
        //得到每个单词的长度
        int length = t.length();
        if(length%2==0){
            return 0;
        }else {
            return 1;
        }
    }
}


reduce

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *<Text,IntWritable,Text,IntWritable>
 *
 * */
public class WordCountReduce extends Reducer <Text,IntWritable,Text,IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        //初始化一个计数器
        int count=0;
        //开始计数
        for (IntWritable value:values) {
            count=count+value.get();
//            System.out.println("aaaaa");
        }
//        System.out.println("===============分隔符====================");
        //输出  int=》IntWritable
        context.write(key,new IntWritable(count));
    }
}


测试类

public class WordCountMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        long starttime = System.currentTimeMillis();
        args=new String[]{"F:\\input\\wordcount.txt","F:\\output\\wordcountcompress"};
        //获取配置文件
        Configuration conf = new Configuration();
//        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,",");
//        conf.set(NLineInputFormat.LINES_PER_MAP,"2");
        //开启map端的输出压缩
        conf.setBoolean("mapreduce.map.output.compress", true);
        //设置压缩方式
        //conf.setClass("mapreduce.map.output.compress.codec", DefaultCodec.c lass, CompressionCodec.class);
        conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        //创建job任务
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountMain.class);
        //指定Map类和map的输出类型 Text, IntWritable
        job.setMapperClass(WordCountMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //指定Reducer类和reduce的输出数据类型 Text,IntWritable
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置分区
//        job.setPartitionerClass(WordCountPartition.class);
//        job.setNumReduceTasks(2);
        //设置预合并combine
//        job.setCombinerClass(WordCountCombine.class);
        //如果不设置InputFormat,它默认用的是TextInputFormat.class
//        job.setInputFormatClass(CombineTextInputFormat.class);
//        CombineTextInputFormat.setMaxInputSplitSize(job, 3*1024*1024);// 4m
//        CombineTextInputFormat.setMinInputSplitSize(job, 2*1024*1024);// 2m
//        job.setInputFormatClass(KeyValueTextInputFormat.class);
//        job.setInputFormatClass(NLineInputFormat.class);
        //指定数据输入的路径和输出的路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //设置reduce压缩
        FileOutputFormat.setCompressOutput(job,true);
        //设置压缩格式
        FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);
        //提交任务
        job.waitForCompletion(true);
        long endtime = System.currentTimeMillis();
        System.out.println((endtime-starttime)/1000);
    }
}


运行结果:

66ba272a0bfc97be54a5fa679e3d5482.png


相关文章
|
8月前
|
分布式计算 Hadoop 大数据
Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四)
Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四)
|
9月前
|
分布式计算 算法 Hadoop
Hadoop学习---8、Hadoop数据压缩
Hadoop学习---8、Hadoop数据压缩
Hadoop学习---8、Hadoop数据压缩
|
2天前
|
存储 分布式计算 Hadoop
大数据处理架构Hadoop
【4月更文挑战第10天】Hadoop是开源的分布式计算框架,核心包括MapReduce和HDFS,用于海量数据的存储和计算。具备高可靠性、高扩展性、高效率和低成本优势,但存在低延迟访问、小文件存储和多用户写入等问题。运行模式有单机、伪分布式和分布式。NameNode管理文件系统,DataNode存储数据并处理请求。Hadoop为大数据处理提供高效可靠的解决方案。
94 2
|
2天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
2天前
|
分布式计算 数据可视化 Hadoop
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
37 0
|
2天前
|
分布式计算 资源调度 Hadoop
java与大数据:Hadoop与MapReduce
java与大数据:Hadoop与MapReduce
27 0
|
2天前
|
存储 分布式计算 Hadoop
【专栏】Hadoop,开源大数据处理框架:驭服数据洪流的利器
【4月更文挑战第28天】Hadoop,开源大数据处理框架,由Hadoop Common、HDFS、YARN和MapReduce组成,提供大规模数据存储和并行处理。其优势在于可扩展性、容错性、高性能、灵活性及社区支持。然而,数据安全、处理速度、系统复杂性和技能短缺是挑战。通过加强安全措施、结合Spark、自动化工具和培训,Hadoop在应对大数据问题中保持关键地位。
|
2天前
|
分布式计算 Hadoop 大数据
[大数据] mac 史上最简单 hadoop 安装过程
[大数据] mac 史上最简单 hadoop 安装过程

相关实验场景

更多