【分布式计算框架】 MapReduce编程初级实践

简介: 【分布式计算框架】 MapReduce编程初级实践

MapReduce编程初级实践

一、实验目的

  • 编程WordCount
  • 编程实现文件合并和去重操作
  • 编程实现对输入文件的排序

二、实验环境

  • centos 6.5
  • VMware Workstation

三、实验内容

mapreduce高可用环境配置

伪分布式(单节点)修改配置:

(1) mapred-site.xml

 <property>
     <name>mapreduce.framework.name</name>
     <value>yarn</value>
 </property>

(2) yarn-site.xml

 <property>
     <name>yarn.nodemanager.aux-services</name>
     <value>mapreduce_shuffle</value>
 </property>

复制目录D:\software\eclipce\workspace\Test20191909\src

cd /opt/20191909/hadoop-2.6.5/etc/hadoop 进入配置文件的目录

ll 查看文件

将yarn-site.xml和mapred-site.xml文件复制到D:\software\eclipce\workspace\Test20191909\src

新建一个类

启动resourcemanager服务

编程WordCount

(1)创建一个新文件

for i in `seq 100000`;do echo "hello jxxy$i" >> test.txt;done

(2)编程MyWC主类,MyMapper类,MyReducer类,制作jar包

  • MyWC主类
package com.sxt.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyWC {
  public static void main(String[] args){
     Configuration conf=new Configuration();        

         try{

         //创建一个新作业
         Job job=Job.getInstance(conf);
         job.setJarByClass(MyWC.class); //jar包      
         job.setJobName("myjob");
       
         //job.setInputPath(new Path());
         //job.setOutputPath(new Path());
         Path inPath=new Path("/user/root/test.txt");
         FileInputFormat.addInputPath(job,inPath);
         //org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        
         Path outPath=new Path("/output/wordcount");
         //如果输出路径存在,则先删除
         if(outPath.getFileSystem(conf).exists(outPath))
            outPath.getFileSystem(conf).delete(outPath,true);
         FileOutputFormat.setOutputPath(job,outPath); 
         //org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

         //创建MyMapper,MyReducer两个类 
         job.setMapperClass(MyMapper.class);
         job.setMapOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);
         job.setReducerClass(MyReducer.class);
       
          //提交作业
         job.waitForCompletion(true);

          }
          catch(Exception e){
          }

  }
}

  • MyMapper类
package com.sxt.mr.wc;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<Object,Text,Text,IntWritable> {
    private final static IntWritable one=new IntWritable(1);
    private Text word=new Text();

    public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
       StringTokenizer str=new StringTokenizer(value.toString());
       while(str.hasMoreTokens()){
          word.set(str.nextToken());
          context.write(word,one);
       }
    }
    }


  • MyReducer类
package com.sxt.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{

//迭代计算
private IntWritable result=new IntWritable();

public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
   int sum=0;
   for(IntWritable val:values){
      sum+=val.get();
   }
   result.set(sum);
   context.write(key,result);
}
}

  • 打成jar包

将jar包上传

在集群上执行命令

hadoop jar wc.jar com.sxt.mr.wc.MyWC

(3)运行程序,统计test.txt文件hello和jxxy出现的次数

查看浏览器8088端口

编程实现文件合并和去重操作

对于两个输入文件,即文件A和文件B,编写程序对两个文件进行合并,并剔除其中重复的内容,

得到一个新的输出文件C。

样例如下:

文件A

20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x

文件B

20150101 y
20150102 y
20150103 x
20150104 z
20150105 y

合并如下

20150101 x  
20150101 y  
20150102 y  
20150103 x  
20150104 y  
20150104 z  
20150105 y  
20150105 z  
20150106 x  

代码如下

FileMergeDriver类

package com.sxt.file.test1;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileMergeDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "FileMerge");
        job.setJarByClass(FileMergeDriver.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new FileMergeDriver(), args);
        System.exit(exitCode);
    }
}


MyReducer类

package com.sxt.file.test1;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<Text, Text, Text, Text> {
    private final static Text nonDuplicate = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        nonDuplicate.set(""); // 设置空值
        context.write(key, nonDuplicate); // 将key写入输出,实现去重
    }
}

MyMapper类

package com.sxt.file.test1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final static Text nonDuplicate = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        nonDuplicate.set(""); // 设置空值
        context.write(value, nonDuplicate); // 将每行文本作为key输出
    }
}

将jar包上传

准备测试数据

A.txt

B.txt

执行命令

hdfs dfs -put ./A.txt /user/root
hdfs dfs -put ./B.txt /user/root
hadoop jar test1.jar com.sxt.file.test1.FileMergeDriver /user/root/A.txt /user/root/B.txt /user/root/C.txt

查看结果与样例一致实验成功

编程实现对输入文件的排序

现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的

文件中,输出的整数格式为每行两个整数,第一个整数位第二个整数的排序位次,第二个整数位原待排列的整数。

样例如下:

文件1

33
37
12
40

文件2

4
16
39
5

文件3

1
45
25

输出文件

1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45

代码如下

SortIntegers类

package com.sxt.file.test2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortIntegers {
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "sort integers job");
            job.setJarByClass(SortIntegers.class);
            job.setMapperClass(SortIntegersMapper.class);
            job.setReducerClass(SortIntegersReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0])); // 输入文件路径
            FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出文件路径
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


SortIntegersReducer类

package com.sxt.file.test2;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SortIntegersReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    private IntWritable rank = new IntWritable(1);
    private IntWritable number = new IntWritable();

    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        for (IntWritable val : values) {
            number.set(key.get());
            context.write(rank, number);
            rank.set(rank.get() + 1);
        }
    }
}

SortIntegersMapper类

package com.sxt.file.test2;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortIntegersMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    private final static IntWritable lineNumber = new IntWritable(1);
    private IntWritable number = new IntWritable();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        int num = Integer.parseInt(line.trim());
        number.set(num);
        context.write(number, lineNumber);
        lineNumber.set(lineNumber.get() + 1);
    }
}

打成jar包

上传jar包

创建测试文件

1.txt

2.txt

3.txt

将文件上传到HDFS

hadoop fs -mkdir /user/root/sort_test
hdfs dfs -put ./1.txt /user/root/sort_test
hdfs dfs -put ./2.txt /user/root/sort_test
hdfs dfs -put ./3.txt /user/root/sort_test
hadoop jar test2.jar com.sxt.file.test2.SortIntegers /user/root/sort_test /user/root/sort_test/result

实验结果与样例一致,实验成功

四、出现的问题及解决方案

  1. DFS Location拒绝连接

    解决方案:

杀死所有java进程后开启集群

start-dfs.sh

start-yarn.sh

无效

新建一个Location,把端口号改为9000(这里是伪分布式的端口,之前没有修改,也是错误的原因)

解决


执行上传作业,版本不一致

解决方案

在 Eclipse 中选择要修改的项目,然后右键单击该项目,在弹出菜单中选择 “Properties”(属性)。

  1. 在弹出的对话框中,找到并展开 “Java Build Path”(Java 构建路径)选项。
  2. 在 “Java Build Path” 下,点击 “Libraries”(库),然后点击 “JRE System Library”。
  3. 点击 “Edit”(编辑)按钮,然后选择合适的 JRE(Java 运行时环境)版本。可以选择系统中已安装的 JRE 版本,也可以选择其他已配置的 JRE。
  4. 点击 “Finish”(完成)保存更改。
  5. 接下来,还需要修改项目的编译级别:
  • 在项目属性对话框中,找到 “Java Compiler”(Java 编译器)选项。
  • 确保选中 “Enable project specific settings”(启用项目特定设置)复选框。
  • 在 “Compiler compliance level”(编译器兼容性级别)下拉菜单中选择你想要的 Java 版本。
  • 点击 “Apply and Close”(应用并关闭)保存更改。

五、实验结果

运行程序,统计test.txt文件hello和jxxy出现的次数

运行程序,实现文件合并和去重操作

运行程序,实现对输入文件的排序

六、实验思考题

  1. 完全分布式配置哪些文件?
  • mapred-site.xml
  • yarn-site.xml
  1. 试述MapReduce的工作流程。

MapReduce是一种用于处理大规模数据集的并行计算编程模型。其工作流程包括以下几个步骤:

  1. 划分阶段(Input Split)
  • 输入数据集被划分成若干个输入切片(input splits),每个输入切片会被一个Map任务处理。
  1. 映射阶段(Map Stage)
  • 每个Map任务读取一个输入切片,并对其进行处理,生成中间键值对(key-value pairs)。
  • 中间键值对由用户自定义的Map函数生成,这些键值对会被分区函数划分到不同的Reducer任务。
  1. 分区和排序阶段(Shuffle and Sort Stage)
  • 中间键值对根据键进行排序,并按照分区函数的规则划分到不同的Reducer任务。
  • 分区函数的作用是确保相同键的键值对会被发送到同一个Reducer任务。
  1. 归约阶段(Combine Stage,可选)
  • 可选的归约(Combiner)函数可以在Map阶段后执行,用于局部聚合中间键值对,以减少数据传输量。
  1. 合并阶段(Reduce Stage)
  • 每个Reduce任务接收来自Map任务输出的中间键值对,并按照键进行分组。
  • Reduce任务对每个键的值列表执行用户定义的Reduce函数,生成最终的输出结果。
  1. 输出阶段(Output Stage)
  • 最终的输出结果会被写入HDFS或其他存储系统,用于进一步的分析或处理。
  • 输入数据集被划分成若干个输入切片(input splits),每个输入切片会被一个Map任务处理。

整个MapReduce过程涉及到数据的划分、映射、分区、排序、归约和合并等操作,通过并行化处理大规模数据集,实现高效的数据处理和分析。 MapReduce的工作流程简化了分布式计算任务的编写和执行,使得处理海量数据变得更加容易和高效。

相关文章
|
2月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
196 0
分布式爬虫框架Scrapy-Redis实战指南
|
1月前
|
存储 负载均衡 测试技术
ACK Gateway with Inference Extension:优化多机分布式大模型推理服务实践
本文介绍了如何利用阿里云容器服务ACK推出的ACK Gateway with Inference Extension组件,在Kubernetes环境中为多机分布式部署的LLM推理服务提供智能路由和负载均衡能力。文章以部署和优化QwQ-32B模型为例,详细展示了从环境准备到性能测试的完整实践过程。
|
2月前
|
并行计算 PyTorch 算法框架/工具
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
本文探讨了如何通过技术手段混合使用AMD与NVIDIA GPU集群以支持PyTorch分布式训练。面对CUDA与ROCm框架互操作性不足的问题,文章提出利用UCC和UCX等统一通信框架实现高效数据传输,并在异构Kubernetes集群中部署任务。通过解决轻度与强度异构环境下的挑战,如计算能力不平衡、内存容量差异及通信性能优化,文章展示了如何无需重构代码即可充分利用异构硬件资源。尽管存在RDMA验证不足、通信性能次优等局限性,但该方案为最大化GPU资源利用率、降低供应商锁定提供了可行路径。源代码已公开,供读者参考实践。
140 3
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
|
2月前
|
人工智能 运维 监控
领先AI企业经验谈:探究AI分布式推理网络架构实践
当前,AI行业正处于快速发展的关键时期。继DeepSeek大放异彩之后,又一款备受瞩目的AI智能体产品Manus横空出世。Manus具备独立思考、规划和执行复杂任务的能力,其多智能体架构能够自主调用工具。在GAIA基准测试中,Manus的性能超越了OpenAI同层次的大模型,展现出卓越的技术实力。
|
2月前
|
机器学习/深度学习 分布式计算 API
Python 高级编程与实战:深入理解并发编程与分布式系统
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧、数据科学、机器学习、Web 开发、API 设计、网络编程和异步IO。本文将深入探讨 Python 在并发编程和分布式系统中的应用,并通过实战项目帮助你掌握这些技术。
|
2月前
|
消息中间件 分布式计算 并行计算
Python 高级编程与实战:构建分布式系统
本文深入探讨了 Python 中的分布式系统,介绍了 ZeroMQ、Celery 和 Dask 等工具的使用方法,并通过实战项目帮助读者掌握这些技术。ZeroMQ 是高性能异步消息库,支持多种通信模式;Celery 是分布式任务队列,支持异步任务执行;Dask 是并行计算库,适用于大规模数据处理。文章结合具体代码示例,帮助读者理解如何使用这些工具构建分布式系统。
|
5月前
|
存储 监控 数据可视化
常见的分布式定时任务调度框架
分布式定时任务调度框架用于在分布式系统中管理和调度定时任务,确保任务按预定时间和频率执行。其核心概念包括Job(任务)、Trigger(触发器)、Executor(执行器)和Scheduler(调度器)。这类框架应具备任务管理、任务监控、良好的可扩展性和高可用性等功能。常用的Java生态中的分布式任务调度框架有Quartz Scheduler、ElasticJob和XXL-JOB。
1842 66
|
4月前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
194 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
4月前
|
存储 运维 安全
盘古分布式存储系统的稳定性实践
本文介绍了阿里云飞天盘古分布式存储系统的稳定性实践。盘古作为阿里云的核心组件,支撑了阿里巴巴集团的众多业务,确保数据高可靠性、系统高可用性和安全生产运维是其关键目标。文章详细探讨了数据不丢不错、系统高可用性的实现方法,以及通过故障演练、自动化发布和健康检查等手段保障生产安全。总结指出,稳定性是一项系统工程,需要持续迭代演进,盘古经过十年以上的线上锤炼,积累了丰富的实践经验。
220 7
|
4月前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
274 8