Hadoop学习:深入解析MapReduce的大数据魔力(二)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: Hadoop学习:深入解析MapReduce的大数据魔力(二)

         

3.3 Shuffle 机制

3.3.1 Shuffle 机制

Map 方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

3.3.2 Partition 分区

1、问题引出

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机

归属地不同省份输出到不同文件中(分区)

2、默认Partitioner分区

public class HashPartitioner<K, V> extends Partitioner<K, V> {
 }
 public int getPartition(K key, V value, int numReduceTasks) {
 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
 }

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

3、自定义Partitioner步骤

(1)自定义类继承Partitioner,重写getPartition()方法

public class CustomPartitioner extends Partitioner<Text, FlowBean> {
 @Override
 public int getPartition(Text key, FlowBean value, int numPartitions) {
 // 控制分区代码逻辑
… …
 }
 }
 return partition;

(2)在Job驱动中,设置自定义Partitioner

 job.setPartitionerClass(CustomPartitioner.class);

(3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

 job.setNumReduceTasks(5);

4、分区总结

(1)如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

(2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;

(3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个

ReduceTask,最终也就只会产生一个结果文件part-r-00000;

(4)分区号必须从零开始,逐一累加。

5、案例分析

例如:假设自定义分区数为5,则

(1)job.setNumReduceTasks(1); 会正常运行,只不过会产生一个输出文件

(2)job.setNumReduceTasks(2);会报错

(3)job.setNumReduceTasks(6); 大于5,程序会正常运行,会产生空文件

3.3.3 Partition 分区案例实操

1)需求

将统计结果按照手机归属地不同省份输出到不同文件中(分区)

(1)输入数据

2)期望输出数据

手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到

一个文件中。

2)需求分析

1、需求:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

3)在案例2.3的基础上,增加一个分区类

package com.atguigu.mapreduce.partitioner; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Partitioner; 
public class ProvincePartitioner extends Partitioner<Text, FlowBean> { 
    @Override 
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) 
{ 
        //获取手机号前三位prePhone 
        String phone = text.toString(); 
        String prePhone = phone.substring(0, 3); 
        //定义一个分区号变量partition,根据prePhone设置分区号 
        int partition; 
        if("136".equals(prePhone)){ 
            partition = 0; 
        }else if("137".equals(prePhone)){ 
            partition = 1; 
        }else if("138".equals(prePhone)){ 
            partition = 2; 
        }else if("139".equals(prePhone)){ 
            partition = 3; 
        }else { 
            partition = 4; 
        } 
        //最后返回分区号partition 
        return partition; 
    } 
} 

4)在驱动函数中增加自定义数据分区设置和ReduceTask设置

package com.atguigu.mapreduce.partitioner; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import java.io.IOException; 
public class FlowDriver { 
    public static void main(String[] args) throws IOException, 
ClassNotFoundException, InterruptedException { 
        //1 获取job对象 
        Configuration conf = new Configuration(); 
        Job job = Job.getInstance(conf); 
        //2 关联本Driver类 
        job.setJarByClass(FlowDriver.class); 
        //3 关联Mapper和Reducer 
        job.setMapperClass(FlowMapper.class); 
        job.setReducerClass(FlowReducer.class); 
        //4 设置Map端输出数据的KV类型 
        job.setMapOutputKeyClass(Text.class); 
        job.setMapOutputValueClass(FlowBean.class); 
        //5 设置程序最终输出的KV类型 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(FlowBean.class); 
        //8 指定自定义分区器 
        job.setPartitionerClass(ProvincePartitioner.class); 
        //9 同时指定相应数量的ReduceTask 
        job.setNumReduceTasks(5); 
        //6 设置输入输出路径 
        FileInputFormat.setInputPaths(job, new Path("D:\\inputflow")); 
        FileOutputFormat.setOutputPath(job, new Path("D\\partitionout")); 
        //7 提交Job 
        boolean b = job.waitForCompletion(true); 
        System.exit(b ? 0 : 1); 
    } 
} 

3.3.4 WritableComparable 排序

排序概述

排序是MapReduce框架中最重要的操作之一。

MapTask和ReduceTask均会对数据按照key进行排序。该操作属于

Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是

否需要。


默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。


对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使

用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数

据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大

小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到

一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者

数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完

毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序分类

(1)部分排序

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

(2)全排序

最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在

处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

(3)辅助排序:(GroupingComparator分组)

(4)二次排序

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部

字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。


自定义排序WritableComparable原理分析

bean 对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

@Override 
public int compareTo(FlowBean bean) { 
int result; 
// 按照总流量大小,倒序排列 
if (this.sumFlow > bean.getSumFlow()) { 
result = -1; 
}else if (this.sumFlow < bean.getSumFlow()) { 
result = 1; 
}else { 
result = 0; 
} 
return result; 
} 

3.3.5 Combiner 合并

(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。

(2)Combiner组件的父类就是Reducer。

(3)Combiner和Reducer的区别在于运行的位置

Combiner是在每一个MapTask所在的节点运行;

Reducer是接收全局所有Mapper的输出结果;

应该跟Reducer的输入kv类型要对应起来。

Mapper

(4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。

(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。

(6)自定义Combiner实现步骤

(a)自定义一个Combiner继承Reducer,重写Reduce方法

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, 
IntWritable> { 
private IntWritable outV = new IntWritable(); 
@Override 
protected void reduce(Text key, Iterable<IntWritable> values, Context 
context) throws IOException, InterruptedException { 
int sum = 0; 
for (IntWritable value : values) { 
sum += value.get(); 
} 
outV.set(sum); 
context.write(key,outV); 
} 
} 

(b)在Job驱动类中设置:

job.setCombinerClass(WordCountCombiner.class);

3.4 OutputFormat 数据输出

3.4.1 OutputFormat 接口实现类

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat

接口。下面我们介绍几种常见的OutputFormat实现类。

1.OutputFormat实现类

2.默认输出格式TextOutputFormat

3.自定义OutputFormat

3.1 应用场景:

例如:输出数据到MySQL/HBase/Elasticsearch等存储框架中。

3.2 自定义OutputFormat步骤

➢ 自定义一个类继承FileOutputFormat。

➢ 改写RecordWriter,具体改写输出数据的方法write()。

3.4.2 自定义OutputFormat案例实操

1)需求

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

(2)期望输出数据

2)需求分析

3)案例实操

(1)编写LogMapper类

package com.atguigu.mapreduce.outputformat; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
import java.io.IOException; 
public 
class 
LogMapper 
extends 
Mapper<LongWritable, 
Text,Text, 
NullWritable> { 
@Override 
protected void map(LongWritable key, Text value, Context context) 
throws IOException, InterruptedException { 
//不做任何处理,直接写出一行log数据 
context.write(value,NullWritable.get()); 
} 
} 

(2)编写LogReducer类

package com.atguigu.mapreduce.outputformat; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 
import java.io.IOException; 
public class LogReducer extends Reducer<Text, NullWritable,Text, 
NullWritable> { 
    @Override 
    protected void reduce(Text key, Iterable<NullWritable> values, Context 
context) throws IOException, InterruptedException { 
        // 防止有相同的数据,迭代写出 
        for (NullWritable value : values) { 
            context.write(key,NullWritable.get()); 
        } 
    } 
} 

(3)自定义一个LogOutputFormat类

package com.atguigu.mapreduce.outputformat; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import java.io.IOException; 
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> 
{ 
    @Override 
    public RecordWriter<Text, NullWritable> 
getRecordWriter(TaskAttemptContext job) throws IOException, 
InterruptedException { 
        //创建一个自定义的RecordWriter返回 
        LogRecordWriter logRecordWriter = new LogRecordWriter(job); 
        return logRecordWriter; 
    } 
} 

(4)编写LogRecordWriter类

package com.atguigu.mapreduce.outputformat; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import java.io.IOException; 
public class LogRecordWriter extends RecordWriter<Text, NullWritable> { 
    private FSDataOutputStream atguiguOut;
      private FSDataOutputStream otherOut; 
    public LogRecordWriter(TaskAttemptContext job) { 
        try { 
            //获取文件系统对象 
            FileSystem fs = FileSystem.get(job.getConfiguration()); 
            //用文件系统对象创建两个输出流对应不同的目录 
            atguiguOut = fs.create(new Path("d:/hadoop/atguigu.log")); 
            otherOut = fs.create(new Path("d:/hadoop/other.log")); 
        } catch (IOException e) { 
            e.printStackTrace(); 
        } 
    } 
    @Override 
    public void write(Text key, NullWritable value) throws IOException, 
InterruptedException { 
        String log = key.toString(); 
        //根据一行的log数据是否包含atguigu,判断两条输出流输出的内容 
        if (log.contains("atguigu")) { 
            atguiguOut.writeBytes(log + "\n"); 
        } else { 
            otherOut.writeBytes(log + "\n"); 
        } 
    } 
    @Override 
    public void close(TaskAttemptContext context) throws IOException, 
InterruptedException { 
        //关流 
        IOUtils.closeStream(atguiguOut); 
        IOUtils.closeStream(otherOut); 
    } 
} 

(5)编写LogDriver类

package com.atguigu.mapreduce.outputformat; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import java.io.IOException; 
public class LogDriver { 
    public static void main(String[] args) throws IOException, 
ClassNotFoundException, InterruptedException { 
        Configuration conf = new Configuration(); 
        Job job = Job.getInstance(conf); 
        job.setJarByClass(LogDriver.class); 
        job.setMapperClass(LogMapper.class); 
        job.setReducerClass(LogReducer.class); 
        job.setMapOutputKeyClass(Text.class); 
        job.setMapOutputValueClass(NullWritable.class); 
        ——————————————————————————— 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(NullWritable.class); 
//设置自定义的outputformat 
job.setOutputFormatClass(LogOutputFormat.class); 
FileInputFormat.setInputPaths(job, new Path("D:\\input")); 
//虽然我们自定义了 outputformat,但是因为我们的 outputformat 继承自
fileoutputformat 
//而fileoutputformat 要输出一个_SUCCESS文件,所以在这还得指定一个输出目录 
FileOutputFormat.setOutputPath(job, new Path("D:\\logoutput")); 
boolean b = job.waitForCompletion(true); 
System.exit(b ? 0 : 1); 
} 
} 
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
29天前
|
存储 分布式计算 Hadoop
|
15天前
|
分布式计算 资源调度 Hadoop
Hadoop入门基础(五):Hadoop 常用 Shell 命令一网打尽,提升你的大数据技能!
Hadoop入门基础(五):Hadoop 常用 Shell 命令一网打尽,提升你的大数据技能!
|
16天前
|
存储 SQL 分布式计算
Hadoop生态系统概述:构建大数据处理与分析的基石
【8月更文挑战第25天】Hadoop生态系统为大数据处理和分析提供了强大的基础设施和工具集。通过不断扩展和优化其组件和功能,Hadoop将继续在大数据时代发挥重要作用。
|
17天前
|
资源调度 分布式计算 Hadoop
揭秘Hadoop Yarn背后的秘密!它是如何化身‘资源大师’,让大数据处理秒变高效大戏的?
【8月更文挑战第24天】在大数据领域,Hadoop Yarn(另一种资源协调者)作为Hadoop生态的核心组件,扮演着关键角色。Yarn通过其ResourceManager、NodeManager、ApplicationMaster及Container等组件,实现了集群资源的有效管理和作业调度。当MapReduce任务提交时,Yarn不仅高效分配所需资源,还能确保任务按序执行。无论是处理Map阶段还是Reduce阶段的数据,Yarn都能优化资源配置,保障任务流畅运行。此外,Yarn还在Spark等框架中展现出灵活性,支持不同模式下的作业执行。未来,Yarn将持续助力大数据技术的发展与创新。
27 2
|
13天前
|
SQL 分布式计算 数据可视化
基于Hadoop的大数据可视化方法
【8月更文第28天】在大数据时代,有效地处理和分析海量数据对于企业来说至关重要。Hadoop作为一个强大的分布式数据处理框架,能够处理PB级别的数据量。然而,仅仅完成数据处理还不够,还需要将这些数据转化为易于理解的信息,这就是数据可视化的重要性所在。本文将详细介绍如何使用Hadoop处理后的数据进行有效的可视化分析,并会涉及一些流行的可视化工具如Tableau、Qlik等。
36 0
|
13天前
|
缓存 分布式计算 算法
优化Hadoop MapReduce性能的最佳实践
【8月更文第28天】Hadoop MapReduce是一个用于处理大规模数据集的软件框架,适用于分布式计算环境。虽然MapReduce框架本身具有很好的可扩展性和容错性,但在某些情况下,任务执行可能会因为各种原因导致性能瓶颈。本文将探讨如何通过调整配置参数和优化算法逻辑来提高MapReduce任务的效率。
47 0
|
21天前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
32 0
|
4月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
65 1
|
3月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
42 1
|
3月前
|
数据采集 SQL 分布式计算

推荐镜像

更多
下一篇
DDNS