【大数据计算】(三) MapReduce的安装和基础编程

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 目录1.词频统计任务要求1.1 MapReduce程序编写方法1.1.1 编写Map处理逻辑1.1.2 编写Reduce处理逻辑1.1.3 编写main方法2 完整的词频统计程序3. 编译打包程序3.1 使用命令行编译打包词频统计程序3.2 使用IDEA编译打包词频统计程序4. 运行程序5. 编程题5.1 根据附件的数据文件flow_data.dat , 编程完成下面需求:5.2 附加题(选做)6. 福利送书最后

目录

1.词频统计任务要求

1.1 MapReduce程序编写方法

1.1.1 编写Map处理逻辑

1.1.2 编写Reduce处理逻辑

1.1.3 编写main方法

2 完整的词频统计程序

3. 编译打包程序

3.1 使用命令行编译打包词频统计程序

3.2 使用IDEA编译打包词频统计程序

4. 运行程序

5. 编程题

5.1 根据附件的数据文件flow_data.dat , 编程完成下面需求:

5.2 附加题(选做)

6. 福利送书

最后

1.词频统计任务要求

本地编辑txt文件

image.png

填入文字

image.png

编写文档入其中

image.png


1.1 MapReduce程序编写方法

导入包


mapreduce下所有jar

image.png


common下所有jar

image.png


common/lib下所有jar

image.png


yarn下所有包


image.png


hdfs下所有包

image.png

在hdfs上创建input,output文件夹。

image.png

查看是否创建

image.png

上传刚刚新建立的wordfile1.txt和wordfile2.txt到hdfs文件中!

image.png


查看是否上传成功!

image.png


1.1.1 编写Map处理逻辑

在Map阶段,文件wordfile1.txt和文件wordfile2.txt中的文本数据被读入,以<key , value>的形式提交给Map函数进行处理,其中key是当前读取到的行的地址偏移量,value是当前读取到的行的内容。<key , value>提交给Map函数以后,就可以运行自定义的Map处理逻辑,对value进行处理,然后以特定的键值对的形式进行输出,这个输出将作为中间结果,继续提供给Reduce阶段作为输入数据。


public static class TokenizerMapper extends Mapper<Object, Text, Text,
            IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public TokenizerMapper() {
        }
        public void map(Object key, Text value, Mapper<Object, Text, Text,
                IntWritable>.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
        }
    }

1.1.2 编写Reduce处理逻辑

Map阶段得到的中间结果,经过Shuffle阶段(分区、排序、合并)以后,分发给对应的Reduce任务去处理。对于该阶段而言,输入是<key , value-list>形式,例如,<’Hadoop’, <1,1>>。Reduce函数就是对输入中的value-list进行求和,得到词频统计结果。

    public static class IntSumReducer extends Reducer<Text, IntWritable,
            Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public IntSumReducer() {
        }
        public void reduce(Text key, Iterable<IntWritable> values,
                           Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws
                IOException, InterruptedException {
            int sum = 0;
            IntWritable val;
            for (Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
                val = (IntWritable) i$.next();
            }
            this.result.set(sum);
            context.write(key, this.result);
        }
    }

 

1.1.3 编写main方法

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();// 加载hadoop配置
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"/input/wordfile1.txt","/input/wordfile2.txt","/output/output"};
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }


image.png


2 完整的词频统计程序

3. 编译打包程序

3.1 使用命令行编译打包词频统计程序

image.png

image.png

image.png


3.2 使用IDEA编译打包词频统计程序

image.png


image.png


4. 运行程序

如果要再次运行WordCount.jar,需要先删除HDFS中的output目录,否则会报错。

image.png



5. 编程题

5.1 根据附件的数据文件flow_data.dat , 编程完成下面需求:

统计每个手机号的上行流量总和,下行流量总和,上行总流量之和,下行总流量之和

( Hint:以手机号码作为key值,上行流量,下行流量,上行总流量,下行总流量四个字段作为value值,然后以这个key,和value作为map阶段的输出,reduce阶段的输入。)


定义一个结构

public static class FlowBean implements Writable {
        private long upflow;
        private long downflow;
        private long sumflow;
        public FlowBean() {
        }
        public long getUpflow() {
            return upflow;
        }
        public void setUpflow(long upflow) {
            this.upflow = upflow;
        }
        public long getDownflow() {
            return downflow;
        }
        public void setDownflow(long downflow) {
            this.downflow = downflow;
        }
        public long getSumflow() {
            return sumflow;
        }
        public void setSumflow(long sumflow) {
            this.sumflow = sumflow;
        }
        public FlowBean(long upflow, long downflow) {
            this.upflow = upflow;
            this.downflow = downflow;
            this.sumflow = upflow + downflow;
        }
        public void write(DataOutput output) throws IOException {
            output.writeLong(this.upflow);
            output.writeLong(this.downflow);
            output.writeLong(this.sumflow);
        }
        public void readFields(DataInput Input) throws IOException {
            this.upflow = Input.readLong();
            this.downflow = Input.readLong();
            this.sumflow = Input.readLong();
        }
        @Override
        public String toString() {
            return this.upflow + "\t" + this.downflow + "\t" + this.sumflow;
        }
    }


主函数

import java.io.IOException;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
public class PhoneCount {
    public PhoneCount() {
    }
    public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();// 加载hadoop配置
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"/phone/input/flow_data.dat","/phone/output4"};
        Job job = Job.getInstance(conf);
        job.setJarByClass(PhoneCount.class);
        job.setMapperClass(MapWritable.class);
        job.setReducerClass(ReduceWritable.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        job.waitForCompletion(true);
    }


map操作

public static class MapWritable extends Mapper<LongWritable, Text,Text,FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println("line");
            System.out.println(line);
            String[] fields = line.split("\t");
            if (Objects.equals(fields[fields.length - 3], "上行总和")){
            } else {
                long upflow = Long.parseLong(fields[fields.length - 3]);
                long downflow = Long.parseLong(fields[fields.length - 2]);
                context.write(new Text(fields[1]), new FlowBean(upflow, downflow));
            }
        }
    }


reduce 操作

public static class ReduceWritable extends Reducer<Text,FlowBean,Text,FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            //定义两个计数器,计算每个用户的上传流量、下载流量
            long sumupflow = 0;
            long sumdownflow = 0;
            //累加的号的流量和
            for (FlowBean f: values) {
                sumupflow+=f.getUpflow();
                sumdownflow+=f.getDownflow();
            }
            //输出
            context.write(key,new FlowBean(sumupflow,sumdownflow));
        }
    }

 


5.2 附加题(选做)

MapReduce 实现最短路径算法,最优路径算法是路径图中满足通路上所有顶点(除起点、终点外)各异,所有边也各异的通路。

应用在公路运输中,可以提供起点和终点之间的最短路径,节省运输成本。可以大大提高交通运输效率。image.png

给定下述路径图


请编程计算出A到C之间的最短路径大小


6. 福利送书

image.png


【内容简介】


《亿级流量Java高并发与网络编程实战》 系统全面的介绍了开发人员必学的知识,如JVM、网络编程、NIO等知识,让开发人员系统地掌握JAVA高并发与网络编程知识。


《亿级流量Java高并发与网络编程实战》分为10章,内容如下。


第1章,主要讲高并发相关JVM原理解析

第2章,主要讲 Java 网络编程

第3章,主要讲 Java NIO

第4章,主要讲并发框架Disruptor

第5章,主要讲微服务构建框架Spring Boot

第6章,主要讲微服务治理框架Spring Cloud/Dubbo

第7章,主要讲 Java高并发网络编程框架Netty - 实战应用

第8章,主要讲 Java高并发网络编程框架Netty - 深度解读

第9章,主要讲海量数据的高并发处理

第10章,主要讲基于高并发与网络编程的大型互联网项目实战。

本书主要面向面向零基础及入门级读者,Java从业人员。


【评论区】和 【点赞区】 会抽一位粉丝送出这本书籍嗷~


当然如果没有中奖的话,可以到当当,京东北京大学出版社的自营店进行购买。


也可以关注我!每周都会送一本出去哒~


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
2月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
45 4
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
83 2
|
7天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
33 2
|
2月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
49 4
|
2月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
45 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
2月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
53 0
|
2月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
61 0
|
2月前
|
分布式计算 算法 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
64 0
|
4月前
|
分布式计算 大数据 Hadoop
揭秘MapReduce背后的魔法:从基础类型到高级格式,带你深入理解这一大数据处理利器的奥秘与实战技巧,让你从此不再是编程门外汉!
【8月更文挑战第17天】MapReduce作为分布式计算模型,是大数据处理的基石。它通过Map和Reduce函数处理大规模数据集,简化编程模型,使开发者聚焦业务逻辑。MapReduce分单阶段和多阶段,支持多种输入输出格式如`TextInputFormat`和`SequenceFileInputFormat`。例如,简单的单词计数程序利用`TextInputFormat`读取文本行并计数;而`SequenceFileInputFormat`适用于高效处理二进制序列文件。合理选择类型和格式可有效解决大数据问题。
78 1
|
4月前
|
分布式计算 大数据 Hadoop
MapReduce:大数据处理的基石
【8月更文挑战第31天】
170 0