《Hadoop与大数据挖掘》一2.6.3 Hadoop TF-IDF编程实现

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介:

本节书摘来华章计算机《Hadoop与大数据挖掘》一书中的第2章 ,第2.6.3节,张良均 樊 哲 位文超 刘名军 许国杰 周 龙 焦正升 著 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

2.6.3 Hadoop TF-IDF编程实现

这里给出的TF-IDF算法的测试数据使用的是Avro格式的。这里只对Avro进行简单介绍,如读者需要深入了解,可以上网查找相关资料。

  1. Avro简介
    Avro是一个数据序列化的系统,它可以将数据结构或对象转化成便于存储或传输的格式。Avro设计之初就用来支持数据密集型应用,适合于远程或本地大规模数据的存储和交换。

Avro依赖于模式(Schema)。通过模式定义各种数据结构,只有确定了模式才能对数据进行解释,所以在数据的序列化和反序列化之前,必须先确定模式的结构。

image


Schema通过JSON对象表示。Schema定义了简单数据类型和复杂数据类型,其中复杂数据类型包含不同属性。通过各种数据类型用户可以自定义丰富的数据结构。
Avro定义了几种简单数据类型,表2-10是对其简单说明。

image
image

Avro定义了6种复杂数据类型,分别是record、enum、array、map、union和fixed,每一种复杂数据类型都具有独特的属性。表2-11就record这一种复杂数据类型进行了简要说明(后面也只会用到这种数据类型)。

image


(1)动手实践:Java基于Avro的序列化和反序列化
简单来说,Avro就是提供一个数据文件的说明文档,然后可以直接根据该说明文档进行序列化和反序列化的一个框架而已。
举个例子,比如现在有一个数据描述文件,如代码清单2-46所示。

代码清单2-46 Avro描述文件
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

有定义一个Java类和该描述文件匹配,如代码清单2-47所示。

代码清单2-47 Avro描述文件对应Java实体类
User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// favorite color不设置

// 直接使用构造函数
User user2 = new User("Ben", 7, "red");

// 使用builder进行构造
User user3 = User.newBuilder()
             .setName("Charlie")
             .setFavoriteColor("blue")
             .setFavoriteNumber(null)
             .build();

代码清单2-46中的name:User或者name:name、name:favorite_number等,不需要与代码清单2-47中的名字User类或者方法setName、setFavoriteColor名字一模一样,只需一一对应即可。
那么怎么进行序列化呢?参考代码清单2-48,即可把用户user1、user2、user3序列化到本地磁盘的users.avro文件。

代码清单2-48 序列化User
// 序列化user1、user2 and user3 到硬盘
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("users.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();

如何进行反序列化呢?参考代码清单2-49,即可把序列化后的users.avro文件内容读取出来了,并且代码清单2-49中的代码还把文件内容也打印出来了。

代码清单2-49 反序列化User
//从磁盘进行反序列化
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(file, user-DatumReader);
User user = null;
while (dataFileReader.hasNext()) {
user = dataFileReader.next(user);
System.out.println(user);
}

参考上面的示例,进行下面的实验。
实验步骤如下:
1)新建Java工程,引入avro-1.7.4.jar、avro-tools-1.7.4.jar(非必需)、jackson-core-asl-1.9.13.jar、jackson-mapper-asl-1.9.13.jar、junit-4.11.jar、hamcrest-core-1.3.jar。
2)参考代码清单2-46、代码清单2-47、代码清单2-48、代码清单2-49,缩写对应程序实现,运行程序查看结果。
(2)动手实践:Hadoop基于Avro的反序列化
这里增加一点Hadoop Job Counter的知识,Hadoop Job Counter可以在Hadoop Map-Reduce程序运行的过程中定义全局计数器,对一些必要的参数进行统计,通过doc api查看该用法,如图2-54所示。


image


在Java代码中遍历所有Hadoop MapReduce Counter,可参考代码清单2-50。

代码清单2-50 Java代码获取Hadoop MapReduce Counter
Counters counter = job.getCounters();
    Iterator<CounterGroup> icg= counter.iterator();
    while(icg.hasNext()){
        System.out.println(icg.next());
        CounterGroup counterGroup = icg.next();
        System.out.println(counterGroup.getName());
        Iterator<org.apache.hadoop.mapreduce.Counter>counters = counterGroup.iterator();
        while(counters.hasNext()){
        Counter c =  counters.next();
        System.out.println(c.getName()+","+c.getValue());
        }
    }

实验步骤如下:
1)拷贝avro-mapred-1.7.4-hadoop2.jar到Hadoop集群lib目录,上传hadoop/data/mann.avro数据到HDFS。
2)设置读取Avro文件的FileInputFormat为AvroKeyInputFormat。
3)参考示例程序2.5_004_avro_mr,读懂程序代码,运行程序,查看结果。

  1. Job1:统计单个文件某个单词个数
    针对2.6.2节分析的Hadoop MapReduce实现TF-IDF的流程中的Job1,分析如下。

驱动程序Driver:只需要设置Mapper以及Reducer,需要注意这里的输入需要使用AvroKeyInputFormat,这里考虑到编程方便以及效率,输出使用SequenceFileOutput-Format,如代码清单2-51所示。

代码清单2-51 TF-IDF Job1 Driver类示例
// Job1 计算每个文件中单词个数
    Job job1 = Job.getInstance(getConf(), "Word Count per document");
    job1.setJarByClass(getClass());
    Configuration conf1 = job1.getConfiguration();
    FileInputFormat.setInputPaths(job1, in);
    out.getFileSystem(conf1).delete(out, true);
    FileOutputFormat.setOutputPath(job1, out);

    job1.setMapperClass(WordCountPerDocumentMapper.class);
    job1.setReducerClass(IntSumReducer.class);

    job1.setInputFormatClass(AvroKeyInputFormat.class);
    job1.setOutputFormatClass(SequenceFileOutputFormat.class);

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    int ret = job1.waitForCompletion(true) ? 0 : -1;

Mapper要做的工作只是读取Avro数据,然后针对数据分隔各个单词(注意这里有些单词是不需要进行统计的,可以直接忽略)。Mapper的功能描述如下:
1)读取Avro格式数据,获取文件名和文件内容(类似Java单机程序),如代码清单2-52所示。

代码清单2-52 读取Avro数据示例
@Override
protected void map(AvroKey<GenericRecord> key, NullWritable value,
        Mapper<AvroKey<GenericRecord>, NullWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
    String name = key.datum().get(Utils.FIELD_FILENAME).toString();
    ByteBuffer contentsByte = (ByteBuffer) key.datum().get(Utils.FIELD_CONTENTS);
    String contents = new String(contentsByte.array());
…
}

2)分隔文件的内容,这里需要注意不用统计的单词,具体单词如代码清单2-53所示。

代码清单2-53 需要忽略的单词
private static Set<String> STOPWORDS;
static {
    STOPWORDS = new HashSet<String>() {
        {
        add("I");
        add("a");
        add("about");
        add("an");
        add("are");
        add("as");
        add("at");
        add("be");
        add("by");
        add("com");
        add("de");
        add("en");
        add("for");
        add("from");
        add("how");
        add("in");
        add("is");
        add("it");
        add("la");
        add("of");
        add("on");
        add("or");
        add("that");
        add("the");
        add("this");
        add("to");
        add("was");
        add("what");
        add("when");
        add("where");
        add("who");
        add("will");
        add("with");
        add("and");
        add("the");
        add("www");
        }
    };

分隔采用Match类正则进行分隔,如代码清单2-54所示。

代码清单2-54 Match类分隔文本内容到单词
//定义Pattern
private static final Pattern WORD_PATTERN = Pattern.compile("\\w+");
// map函数
while (matcher.find()) {
        StringBuilder valueBuilder = new StringBuilder();
        String matchedKey = matcher.group().toLowerCase();
        if (!Character.isLetter(matchedKey.charAt(0)) ||
 Character.isDigit(matchedKey.charAt(0))
                || STOPWORDS.contains(matchedKey) ||
 matchedKey.contains(UNDERSCORE)) {
            continue;
        }
        …
    }

3)只须输出单词、文件名和计数1即可,如代码清单2-55所示。

代码清单2-55 TF-IDF Job1 Mapper类输出示例
valueBuilder.append(matchedKey);
        valueBuilder.append(SEPARATOR);
        valueBuilder.append(name);
        fileWord.set(valueBuilder.toString());
            // <key,value> -> <word|file , 1>
        context.write(fileWord, one);

Reducer类直接采用Hadoop内部类IntSumReducer即可,即把相同的key的所有value值全部加起来,其输入输出描述如表2-12所示。

表2-12 TF-IDF Job1 Reducer输入输出描述
// Reducer
// in: <key,value> -> <word|file, [1,1,1,1,…]>
// out: <key,value> -> <word|file, 1+1+…+1>
  1. Job2:统计某个文件所有单词个数
    Job2的Driver驱动程序是统计某个文件的所有单词个数,输入是Job1的输出,所以输入格式为SequenceFileInputFormat,输出格式也设成SequenceFileOutputFormat,方便Job3的读取,其设置参考代码清单2-56。
代码清单2-56 Job2 Driver驱动类示例代码
Job job2 = Job.getInstance(getConf(), "DocumentWordCount");
    job2.setJarByClass(getClass());
    Configuration conf2 = job2.getConfiguration();
    FileInputFormat.setInputPaths(job2, in);

    out.getFileSystem(conf2).delete(out, true);
    FileOutputFormat.setOutputPath(job2, out);

    job2.setMapperClass(DocumentWordCountMapper.class);
    job2.setReducerClass(DocumentWordCountReducer.class);

    job2.setInputFormatClass(SequenceFileInputFormat.class);
    job2.setOutputFormatClass(SequenceFileOutputFormat.class);

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(Text.class);

    ret = job2.waitForCompletion(true) ? 0 : -1;

Mapper类只需把Job1的输出的键值对进行重构即可,这里即可以利用MapReduce按照key进行分组的特性,输出<文件名,文件中的单词|文件中单词的个数>这样的键值对,如代码清单2-57所示。

代码清单2-57 Job2 Mapper map函数示例代码
public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
    int wordAndDocCounter = value.get();
    // wordAndDoc = word|filename
    String[] wordAndDoc = StringUtils.split(key.toString(), SEPARATOR);
    outKey.set(wordAndDoc[1]);
    outValue.set(wordAndDoc[0] + SEPARATOR + wordAndDocCounter);
    
    // <key,value> -> <filename, word| wordCount>
    context.write(outKey, outValue);
  }

在Reucer中利用分组的特性(每个键值对按照键进行分组,所以会得到每个文件的所有单词作为一个列表),统计每个文件的所有单词个数,如代码清单2-58所示。

代码清单2-58 Job2 Reducer reduce函数示例代码
//  <filename, [word| wordCount, word|wordCount, ...]>  
  protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    int sumOfWordsInDocument = 0;
    Map<String, Integer> tempCounter = new HashMap<String, Integer>();
    for (Text val : values) {
    // wordCounter = word| wordCount
      String[] wordCounter = StringUtils.split(val.toString(), SEPARATOR);
      tempCounter.put(wordCounter[0], Integer.valueOf(wordCounter[1]));
      sumOfWordsInDocument += Integer.parseInt(wordCounter[1]);
    }
    for (String wordKey : tempCounter.keySet()) {
      outKey.set(wordKey + SEPARATOR + key.toString());
      outValue.set(tempCounter.get(wordKey) + SEPARATOR + sumOfWordsInDocument);
      // <key,value> -> <word|filename , wordCount|sumOfWordsInDoc>
      context.write(outKey, outValue);
    }
  }
  1. Job3:计算单个文件某个单词的TF-IDF
    Job3综合前面两个的输出结构,得到最终每个文件每个单词的TF-IDF值。Driver需要配置输入输出以及格式,这里注意需要把Job1统计的总文件个数传入Job3中,这里为了便于观察,输出格式使用默认值TextFileOutputFormat,其示例代码如代码清单2-59所示。
代码清单2-59 Job3 Driver驱动类示例代码
    Job job3 = Job.getInstance(getConf(), "DocumentCountAndTfIdf");
    job3.setJarByClass(getClass());
    Configuration conf3 = job3.getConfiguration();
    FileInputFormat.setInputPaths(job3, in);
    out.getFileSystem(conf3).delete(out, true);
    FileOutputFormat.setOutputPath(job3, out);

    conf3.setInt("totalDocs", (int) totalDocs);

    job3.setMapperClass(TermDocumentCountMapper.class);
    job3.setReducerClass(TfIdfReducer.class);
    job3.setInputFormatClass(SequenceFileInputFormat.class);
    job3.setOutputFormatClass(SequenceFileOutputFormat.class);
    job3.setOutputKeyClass(Text.class);
    job3.setOutputValueClass(Text.class);

    ret = job3.waitForCompletion(true) ? 0 : -1;

Mapper类根据Job2的输入进行重构,再次使用word作为key,使用filename、word-Count、sumOfWordsInDoc作为value,如代码清单2-60所示。

代码清单2-60 Job3 Mapper类map函数示例代码
// <key,value> -> <word|filename , wordCount|sumOfWordsInDoc>
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
  // worddAndDoc = word|filename
    String[] wordAndDoc = StringUtils.split(key.toString(), SEPARATOR);
    outKey.set(wordAndDoc[0]);
    outValue.set(wordAndDoc[1] + DOC_SEPARATOR + value.toString());
    // <key,value> -> <word,filename=wordCount|sumOfWordsInDoc>
    context.write(outKey, outValue);
  }

Reducer根据Mapper的输出,同时利用相同的key聚合的特性,即可统计出每个单词在多少个文件中存在;在所有需要的参数计算完成后,即可利用TF-IDF的公式进行最后的计算,如代码清单2-61所示。

代码清单2-61 Job3 Reducer类reduce函数示例代码
//  <key,value> -> <word, [filename=wordCount|sumOfWordsInDoc,
//                 filename=wordCount|sumOfWordsInDoc,...]>
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    int totalDocs = context.getConfiguration().getInt("totalDocs", 0);

    int totalDocsForWord = 0;
    Map<String, String> tempFrequencies = new HashMap<String, String>();
    for (Text value : values) {
      // documentAndFrequencies = filename, wordCount|sumOfWordsInDoc
      String[] documentAndFrequencies = StringUtils.split(value.toString(), DOC_SEPARATOR);
      totalDocsForWord++;// the number of files which contains word 
      // tempFrequencies = (filename,wordCount|sumOfWordsInDoc)
      tempFrequencies.put(documentAndFrequencies[0], documentAndFrequencies[1]);
    }
    for (String document : tempFrequencies.keySet()) {
      // wordFrequencyAndTotalWords = wordCount,sumOfWordsInDoc
String[] wordFrequencyAndTotalWords = StringUtils.split(tempFrequencies.get(document), SEPARATOR);

      // TF = wordCount / sumOfWordsInDoc
double tf = Double.valueOf(wordFrequencyAndTotalWords[0]) / Double.valueOf(wordFrequencyAndTotalWords[1]);

      // IDF 
      double idf = (double) totalDocs / totalDocsForWord;

      double tfIdf = tf * Math.log10(idf);

      outKey.set(key + SEPARATOR + document);
      outValue.set(DF.format(tfIdf));
      // <key,value> -> <word|filename , tfIdf>
      context.write(outKey, outValue);
    }
  }

(1)动手实践:Hadoop实现TF-IDF算法
理解上面Hadoop MapReduce框架实现TF-IDF算法的原理,结合部分示例代码,完成该动手实践。
实验步骤如下:
1)参考“动手实践:Hadoop基于Avro的反序列化”内容,建立程序开发环境(主要是Avro相关开发包);
2)参考工程2.5_005_tf-idf示例代码,结合前面的分析,理解代码功能;
3)修复工程功能(TODO提示),运行程序;
4)查看输出,对结果进行解释。
(2)思考
请读者思考,针对Hadoop MapReduce实现TF-IDF算法是否还有优化的空间?如果有优化的空间,怎么做呢?可以考虑下面几点:
1)是否可以缩减Job的个数?(提示:输出多目录、自定义键值对)
2)如果使用自定义键值对技术,应该如何修改程序?

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
2月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
192 6
|
2月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
45 4
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
83 2
|
6天前
|
存储 分布式计算 大数据
Flume+Hadoop:打造你的大数据处理流水线
本文介绍了如何使用Apache Flume采集日志数据并上传至Hadoop分布式文件系统(HDFS)。Flume是一个高可用、可靠的分布式系统,适用于大规模日志数据的采集和传输。文章详细描述了Flume的安装、配置及启动过程,并通过具体示例展示了如何将本地日志数据实时传输到HDFS中。同时,还提供了验证步骤,确保数据成功上传。最后,补充说明了使用文件模式作为channel以避免数据丢失的方法。
33 4
|
7天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
32 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
115 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
84 1
|
2月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
81 1
|
2月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
49 4
|
2月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
45 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化