【云计算与大数据计算】Hadoop MapReduce实战之统计每个单词出现次数、单词平均长度、Grep(附源码 )

简介: 【云计算与大数据计算】Hadoop MapReduce实战之统计每个单词出现次数、单词平均长度、Grep(附源码 )

需要全部代码请点赞关注收藏后评论区留言私信~~~

下面通过WordCount,WordMean等几个例子讲解MapReduce的实际应用,编程环境都是以Hadoop MapReduce为基础

一、WordCount

WordCount用于计算文件中每个单词出现的次数,非常适合采用MapReduce进行处理,处理单词计数问题的思路很简单,在 Map阶段处理每个文本split中的数据,产生<word,1> 这样的键-值对,在Reduce阶段对相同的关键字求和,最后生成所有的单词计数 。

运行示意图如下

运行结果如下

二、WordMean

对上面例子的代码稍作修改,改成计算所有文件中单词的平均长度,单词长度的定义是单词的字符个数,现在HDFS集群中有大量的文件,需要统计所有文件中所出现单词的平均长度。

三、Grep

还是进行大规模文本中单词的相关操作,现在希望提供类似Linux系统中的Grep命令的功能,找出匹配目标串的所有文件,并统计出每个文件中出现目标字符串的个数。

在 Map阶段根据提供的文件split信息、给定的每个字符串输出 <filename,1> 这样 的键-值对信息

在 Reduce阶段根据filename对 Map阶段产生的结果进行合并

运行效果如下

四、代码

部分代码如下 全部代码请点赞关注收藏后评论区留言私信~

package alibook.odps;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
public class wordcount {
  public static class TokenizerMapper extends MapperBase {
    private Record word;
    private Record one;
    @Override
    public void setup(TaskContext context) throws IOException {
      word = context.createMapOutputKeyRecord();
      one = context.createMapOutputValueRecord();
      one.set(new Object[] { 1L });
      System.out.println("TaskID:" + context.getTaskID().toString());
    }
    @Override
    public void map(long recordNum, Record record, TaskContext context)
        throws IOException {
      for (int i = 0; i < record.getColumnCount(); i++) {
        word.set(new Object[] { record.get(i).toString() });
        context.write(word, one);
      }
    }
  }
  /**
   * A combiner class that combines map output by sum them.
   **/
  public static class SumCombiner extends ReducerBase {
    private Record count;
    @Override
    public void setup(TaskContext context) throws IOException {
      count = context.createMapOutputValueRecord();
    }
    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
      long c = 0;
      while (values.hasNext()) {
        Record val = values.next();
        c += (Long) val.get(0);
      }
      count.set(0, c);
      context.write(key, count);
    }
  }
  /**
   * A reducer class that just emits the sum of the input values.
   **/
  public static class SumReducer extends ReducerBase {
    private Record result = null;
    @Override
    public void setup(TaskContext context) throws IOException {
      result = context.createOutputRecord();
    }
    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
      long count = 0;
      while (values.hasNext()) {
        Record val = values.next();
        count += (Long) val.get(0);
      }
      result.set(0, key.get(0));
      result.set(1, count);
      context.write(result);
    }
  }
  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: WordCount <in_table> <out_table>");
      System.exit(2);
    }
    JobConf job = new JobConf();
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(SumCombiner.class);
    job.setReducerClass(SumReducer.class);
    job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
    job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
    InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
    OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
    JobClient.runJob(job);
  }
}

pom.xml文件代码如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>alibook</groupId>
  <artifactId>odps</artifactId>
  <version>0.0.1</version>
  <packaging>jar</packaging>
  <name>odps</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.aliyun.odps</groupId>
      <artifactId>odps-sdk-core</artifactId>
      <version>0.23.3-public</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.odps</groupId>
      <artifactId>odps-sdk-commons</artifactId>
      <version>0.23.3-public</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.odps</groupId>
      <artifactId>odps-sdk-mapred</artifactId>
      <version>0.23.3-public</version>
    </dependency>
  </dependencies>
</project>

创作不易 觉得有帮助请点赞关注收藏~~~

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
相关文章
|
10月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
546 79
|
存储 分布式计算 大数据
Flume+Hadoop:打造你的大数据处理流水线
本文介绍了如何使用Apache Flume采集日志数据并上传至Hadoop分布式文件系统(HDFS)。Flume是一个高可用、可靠的分布式系统,适用于大规模日志数据的采集和传输。文章详细描述了Flume的安装、配置及启动过程,并通过具体示例展示了如何将本地日志数据实时传输到HDFS中。同时,还提供了验证步骤,确保数据成功上传。最后,补充说明了使用文件模式作为channel以避免数据丢失的方法。
742 4
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
603 2
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
511 1
|
8月前
|
人工智能 运维 安全
中企出海大会|打造全球化云计算一张网,云网络助力中企出海和AI创新
阿里云网络作为全球化战略的重要组成部分,致力于打造具备AI技术服务能力和全球竞争力的云计算网络。通过高质量互联网服务、全球化网络覆盖等措施,支持企业高效出海。过去一年,阿里云持续加大基础设施投入,优化海外EIP、GA产品,强化金融科技与AI场景支持。例如,携程、美的等企业借助阿里云实现业务全球化;同时,阿里云网络在弹性、安全及性能方面不断升级,推动中企迎接AI浪潮并服务全球用户。
1244 8
|
存储 安全 网络安全
云计算与网络安全的深度探讨###
【10月更文挑战第21天】 云计算作为信息技术领域的重要组成部分,正在迅速改变我们的工作方式和生活模式。然而,随着云服务的普及,网络安全问题也日益凸显。本文将详细探讨云计算的基本概念、服务模型及其对网络安全的影响,并深入分析数据保护、身份与访问管理、应用程序安全等关键技术领域的最新进展。通过实际案例和技术手段,展示如何在云计算环境下实现全面的安全防护。最后,对未来网络安全的发展进行展望,提供一些启示和建议。 ###
281 5
|
存储 安全 网络安全
云计算与网络安全:技术融合的双刃剑
在数字化浪潮中,云计算如同一股不可阻挡的力量,推动着企业和个人用户步入一个高效、便捷的新时代。然而,随之而来的网络安全问题也如影随形,成为制约云计算发展的阿喀琉斯之踵。本文将探讨云计算服务中的网络安全挑战,揭示信息保护的重要性,并提供实用的安全策略,旨在为读者呈现一场技术与安全的较量,同时指出如何在享受云服务带来的便利的同时,确保数据的安全和隐私。
261 6
|
存储 安全 网络安全
云计算与网络安全:技术融合与安全挑战
随着云计算技术的飞速发展,其在各行各业的应用日益广泛。然而,随之而来的网络安全问题也日益凸显,成为制约云计算发展的重要因素。本文将从云服务、网络安全、信息安全等方面探讨云计算与网络安全的关系,分析云计算环境下的网络安全挑战,并提出相应的解决方案。
|
存储 人工智能 安全
云计算与网络安全:技术融合与挑战
在数字化时代的浪潮中,云计算和网络安全已成为推动社会进步的两大关键技术。本文将探讨云计算服务的发展,网络安全的重要性,以及信息安全技术的演进。我们将通过实例分析,揭示云服务如何增强数据保护,网络安全措施如何应对新兴威胁,以及信息安全技术的创新如何为企业带来竞争优势。文章旨在为读者提供对云计算和网络安全领域的深入理解,并展示它们如何共同塑造我们的未来。
|
存储 安全 网络安全
云计算与网络安全:探索云服务的安全挑战与策略
在数字化的浪潮下,云计算成为企业转型的重要推手。然而,随着云服务的普及,网络安全问题也日益凸显。本文将深入探讨云计算环境下的安全挑战,并提出相应的防护策略,旨在为企业构建安全的云环境提供指导。