开发者社区> xiaohei.info> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

MapReduce程序开发

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/51325813 通过AP...
+关注继续查看
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/51325813

通过API操作之前要先了解几个基本知识

基本数据类型

Hadoop的基本数据类型和Java的基本数据类型是不一样的,但是都存在对应的关系

如下图
数据类型关系

如果需要定义自己的数据类型,则必须实现Writable
hadoop的数据类型可以通过get方法获得对应的java数据类型
而java的数据类型可以通过hadoop数据类名的构造函数,或者set方法转换

关于Hadoop的Writable接口,详情请看Hadoop I/O中的序列化部分

MapReduce执行的基本步骤

Hadoop提交作业的的步骤分为八个,可以理解为天龙八步

Map端工作

  • 1.1 读取要操作的文件–这步会将文件的内容格式化成键值对的形式,键为每一行的起始位置偏移,值为每一行的内容。
  • 1.2 调用map进行处理–在这步使用自定义的Mapper类来实现自己的逻辑,输入的数据为1.1格式化的键值对,输入的数据也是键值对的形式。
  • 1.3 对map的处理结果进行分区–map处理完毕之后可以根据自己的业务需求来对键值对进行分区处理,比如,将类型不同的结果保存在不同的文件中等。这里设置几个分区,后面就会有对应的几个Reducer来处理相应分区中的内容。
  • 1.4 分区之后,对每个分区的数据进行排序,分组–排序按照从小到大进行排列,排序完毕之后,会将键值对中,key相同的选项 的value进行合并。如,所有的键值对中,可能存在
    hello 1
    hello 1
    key都是hello,进行合并之后变成
    hello 2
    可以根据自己的业务需求对排序和合并的处理进行干涉和实现。
  • 1.5 归约(combiner)–简单的说就是在map端进行一次reduce处理,但是和真正的reduce处理不同之处在于:combiner只能处理本地数据,不能跨网络处理。通过map端的combiner处理可以减少输出的数据,因为数据都是通过网络传输的,其目的是为了减轻网络传输的压力和后边reduce的工作量。并不能取代reduce。

Reduce端工作

  • 2.1 通过网络将数据copy到各个reduce。
  • 2.2 调用reduce进行处理–reduce接收的数据是整个map端处理完毕之后的键值对,输出的也是键值对的集合,是最终的结果。
  • 2.3 将结果输出到hdfs文件系统的路径中。

程序开发

开发流程

一般情况下我们不可能直接在生产环境中直接拿海量数据进行程序开发,不说程序有没有bug,能不能跑起来都是一个问题
所以通常步骤是:

1.本地程序开发测试:从海量数据中抽取小部分数据到本地,使用IDE等工具进行开发,并在测试数据集上进行程序运行、逻辑等测试
2.集群环境运行:本地测试通过后,就可以将代码运行在海量数据中,这时候90%的小bug已经得到修复了,剩余要解决的就是生产环境中的问题了
3.调优:程序能够在集群中运行起来并不代表成功,通过一些集群、程序调优方式可以让你的代码跑的更好、更快

导包

新建一个java项目,并导入hadoop包,在项目选项上右键,如图选择
示例

找到hadoop的安装目录,选择所有的包
示例

在找到hadoop安装目录下的lib,导入其中的所有包
示例

代码

新建JMapper类为自定义的Mapper类

import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Mapper;  

//自定义的Mapper类必须继承Mapper类,并重写map方法实现自己的逻辑  
public class JMapper extends Mapper<LongWritable, Text, Text, LongWritable> {  
    //处理输入文件的每一行都会调用一次map方法,文件有多少行就会调用多少次  
    protected void map(  
            LongWritable key,  
            Text value,  
            org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, LongWritable>.Context context)  
            throws java.io.IOException, InterruptedException {  
        //key为每一行的起始偏移量  
        //value为每一行的内容  

        //每一行的内容分割,如hello   world,分割成一个String数组有两个数据,分别是hello,world  
        String[] ss = value.toString().toString().split("\t");  
        //循环数组,将其中的每个数据当做输出的键,值为1,表示这个键出现一次  
        for (String s : ss) {  
            //context.write方法可以将map得到的键值对输出  
            context.write(new Text(s), new LongWritable(1));  
        }  
    };  
}  

新建JReducer类为自定义的Reducer

import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Reducer;  

//自定义的Reducer类必须继承Reducer,并重写reduce方法实现自己的逻辑,泛型参数分别为输入的键类型,值类型;输出的键类型,值类型;之后的reduce类似  
public class JReducer extends Reducer<Text, LongWritable, Text, LongWritable> {  
    //处理每一个键值对都会调用一次reduce方法,有多少个键值对就调用多少次  
    protected void reduce(  
            Text key,  
            java.lang.Iterable<LongWritable> value,  
            org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)  
            throws java.io.IOException, InterruptedException {  
        //key为每一个单独的单词,如:hello,world,you,me等  
        //value为这个单词在文本中出现的次数集合,如{1,1,1},表示总共出现了三次  
        long sum = 0;  
        //循环value,将其中的值相加,得到总次数  
        for (LongWritable v : value) {  
            sum += v.get();  
        }  
        //context.write输入新的键值对(结果)  
        context.write(key, new LongWritable(sum));  
    };  
}  

新建执行提交作业的类,取名JSubmit

import java.io.IOException;  
import java.net.URI;  
import java.net.URISyntaxException;  

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  

public class JSubmit {  
    public static void main(String[] args) throws IOException,  
            URISyntaxException, InterruptedException, ClassNotFoundException {  
        //Path类为hadoop API定义,创建两个Path对象,一个输入文件的路径,一个输入结果的路径  
        Path outPath = new Path("hdfs://localhost:9000/out");  
        //输入文件的路径为本地linux系统的文件路径  
        Path inPath = new Path("/home/hadoop/word");  
        //创建默认的Configuration对象  
        Configuration conf = new Configuration();  
        //根据地址和conf得到hadoop的文件系统独享  
        //如果输入路径已经存在则删除  
        FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);  
        if (fs.exists(outPath)) {  
            fs.delete(outPath, true);  
        }  
        //根据conf创建一个新的Job对象,代表要提交的作业,作业名为JSubmit.class.getSimpleName()  
        Job job = new Job(conf, JSubmit.class.getSimpleName());  
        //1.1  
        //FileInputFormat类设置要读取的文件路径  
        FileInputFormat.setInputPaths(job, inPath);  
        //setInputFormatClass设置读取文件时使用的格式化类  
        job.setInputFormatClass(TextInputFormat.class);  

        //1.2调用自定义的Mapper类的map方法进行操作  
        //设置处理的Mapper类  
        job.setMapperClass(JMapper.class);  
        //设置Mapper类处理完毕之后输出的键值对 的 数据类型  
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(LongWritable.class);  

        //1.3分区,下面的两行代码写和没写都一样,默认的设置  
        job.setPartitionerClass(HashPartitioner.class);  
        job.setNumReduceTasks(1);  
        //1.4排序,分组  

        //1.5归约,这三步都有默认的设置,如果没有特殊的需求可以不管
        //2.1将数据传输到对应的Reducer  

        //2.2使用自定义的Reducer类操作  
        //设置Reducer类  
        job.setReducerClass(JReducer.class);  
        //设置Reducer处理完之后 输出的键值对 的数据类型  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(LongWritable.class);  

        //2.3将结果输出  
        //FileOutputFormat设置输出的路径  
        FileOutputFormat.setOutputPath(job, outPath);  
        //setOutputFormatClass设置输出时的格式化类  
        job.setOutputFormatClass(TextOutputFormat.class);  

        //将当前的job对象提交  
        job.waitForCompletion(true);  
    }  

运行

运行java程序,可以再控制台看到提交作业的提示
示例

在hdfs中查看输出的文件
示例

程序调试

在集群上调试程序是十分困难的,就算采用最常见的println打印信息你也不知道该信息会在集群上哪个节点打印出来

在MapReduce中,可以使用系统错误信息+计数器的组合来进行调试,在map或者reduce函数中,我们可以这样做:

System.out.println("一些错误信息");
context.setStatus("关于错误的一些提示")
context.getCounter(计数器组名一般为枚举类型).increment(1);

在CLI中可以通过以下命令查看计数器:

mapred job -counter jobId counterGroup counterName

counterGroup:计数器组名,一般为枚举类型的全类名
counterName:计数器名,一般为枚举类型的值

性能调优

参考:MapReduce性能调优记录

JobControl

开发MapReduce程序的时候,我们需要考虑如何把需求转换为MapReduce模型来解决问题
对于一些复杂的场景,我们通常是使用多个Job来完成任务,而不是一个非常复杂的单一Job

所以一个完成的任务就可能会有多个Job,一个Job有可能有多个Mapper的情况,参考:
多个Mapper和Reducer处理多个输入

开发技巧

参考:MapReduce开发技巧

更多应用场景

下面是自己在学习过程中收集整理的一些MapReduce场景Demo,可以提供参考和帮助:
GitHub - chubbyjiang/MapReduce: MapReduce Demo

作者:@小黑

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
【大数据优化】(二)MapReduce 优化方法
【大数据优化】(二)MapReduce 优化方法
22 0
【月刊】E-MapReduce 2021-10 产品月刊
10月 E-MapReduce 产品月刊为您带来 1.相关活动:云栖大会、开源主题直播回顾 2.版本发布 3.产品文档更新 4.十月精选文章推荐 。欢迎持续关注更多精彩内容!
165 0
【月刊】E-MapReduce 2021-08 产品月刊
8月 E-MapReduce 产品月刊为您带来 1.相关活动:大数据开发平台用户调研问卷 ;2.产品功能更新:EMR ClickHouse 独立集群公测发布 ;3.最佳实践 E-MapReduce 本地盘实例大规模数据集测试 。欢迎持续关注更多精彩内容!
283 0
Mapeduce编程八大步骤
Mapreduce编程八大步 1.1:指定读取的文件位于哪里 FileInputFormat.setInputPaths() 指定如何对输入文件进行格式化,把输入文本每一行解析为键值对 job.
741 0
MapReduce编程实例之自定义排序
任务描述: 给出一组数据,自定义排序的样式,第一列降序,相同时第二列升序 example Data: 2013 1 2013 5 2014 5 2014 8 2015 9 2015 4 Code: package mrTest; import java.
914 0
MapReduce框架排序和分组
前言:         Mapreduce框架就是map->reduce,其中Map中的是偏移量和行值,在其之前会使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。
643 0
程序4
【程序4】 题目:将一个正整数分解质因数。例如:输入90,打印出90=2*3*3*5。 程序分析:对n进行分解质因数,应先找到一个最小的质数k,然后按下述步骤完成: (1)如果这个质数恰等于n,则说明分解质因数的过程已经结束,打印出即可。
567 0
程序3
【程序3】题目:打印出所有的"水仙花数",所谓"水仙花数"是指一个三位数,其各位数字立方和等于该数本身。例如:153是一个"水仙花数",因为153=1的三次方+5的三次方+3的三次方。1.程序分析:利用for循环控制100-999个数,每个数分解出个位,十位,百位。
465 0
+关注
163
文章
1
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载