《Mahout算法解析与案例实战》一一

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介:

本节书摘来自华章计算机《Mahout算法解析与案例实战》一书中的第3章,第3.1节,作者:樊 哲,更多章节内容可以访问云栖社区“华章计算机”公众号查看。

3.1 Canopy算法

3.1.1 Canopy算法简介
在生活中,我们可以使用聚类解决很多问题,就像本章开始提到的几个例子一样。传统的聚类算法对于一般的应用问题(基本都是小数据量)都是可以解决的,但是当数据变得很大的时候,就有点“力不从心”了。这里的数据变得很大指的是:①数据的条目很多,整个数据集包含的样本数据向量很多;②针对①中的每个样本数据向量其维度很大,即包含多个属性;③要聚类的中心向量很多。当我们所要应用聚类算法的数据是上面所述情况时,传统的聚类方法应用起来就会相当棘手,这时就要采取另外的途径,即改进的聚类算法。本节介绍的Canopy算法就是聚类算法发展到一定阶段,Andrew McCallum、Kamal Nigam、Lyle H.Ungar根据前人经验加上自己的想法提出来的一种改进算法。
Canopy算法的主要思想是把聚类分为两个阶段:阶段一,通过使用一个简单、快捷的距离计算方法把数据分为可重叠的子集,称为“canopy”;阶段二,通过使用一个精准、严密的距离计算方法来计算出现在阶段一中同一个canopy的所有数据向量的距离。这种方式和之前的聚类方式不同的地方在于使用了两种距离计算方式,同时因为只计算了重叠部分的数据向量,所以达到了减少计算量的目的。
具体来说,阶段一,使用一个简单距离计算方法来产生具有一定数量的可重叠的子集。canopy就是一个样本数据集的子集,子集中的样本数据是通过一个粗糙的距离计算方法来计算样本数据向量和canopy的中心向量的距离,设定一个距离阈值,当计算的距离小于这个阈值的时候,就把样本数据向量归为此canopy。这里要说明的是,每个样本数据向量有可能存在于多个canopy里面,但是每个样本数据向量至少要包含于一个canopy中。canopy的创建基于不存在于同一个canopy中的样本数据向量彼此很不相似,不能被分为同一个类的这样的观点考虑的。由于距离计算方式是粗糙的,因此不能够保证性能(计算精确度)。但是通过允许存在可叠加的canopy和设定一个较大的距离阈值,在某些情况下可以保证该算法的性能。
图3?1是一个canopy的例子,其中包含5个数据中心向量。
image

图3?1 canopy聚类图
图3-1中数据向量用同样灰度值表示的属于同一个聚类。聚类中心向量A被随机选出,然后以A数据向量创建一个canopy,这个canopy包括所有在其外圈(实线圈)的数据向量,而内圈(虚线)中的数据向量则不再作为中心向量的候选名单。
那么针对一个具体的canopy应该如何创建呢?下面介绍创建一个普通的canopy算法的步骤。
1)原始数据集合List按照一定的规则进行排序(这个规则是任意的,但是一旦确定就不再更改),初始距离阈值为T1、T2,且T1 > T2(T1、T2的设定可以根据用户的需要,或者使用交叉验证获得)。
2)在List中随机挑选一个数据向量A,使用一个粗糙距离计算方式计算A与List中其他样本数据向量之间的距离d。
3)根据第2步中的距离d,把d小于T1的样本数据向量划到一个canopy中,同时把d小于T2的样本数据向量从候选中心向量名单(这里可以理解为就是List)中移除。
4)重复第2、3步,直到候选中心向量名单为空,即List为空,算法结束。image

图3?2为创建canopy算法的流程图。
阶段二,可以在阶段一的基础上应用传统聚类算法,比如贪婪凝聚聚类算法、K均值聚类算法,当然,这些算法使用的距离计算方式是精准的距离计算方式。但是因为只计算了同一个canopy中的数据向量之间的距离,而没有计算不在同一个canopy的数据向量之间的距离,所以假设它们之间的距离为无穷大。例如,若所有的数据都简单归入同一个canopy,那么阶段二的聚类就会退化成传统的具有高计算量的聚类算法了。但是,如果canopy不是那么大,且它们之间的重叠不是很多,那么代价很大的距离计算就会减少,同时用于分类的大量计算也可以省去。进一步来说,如果把Canopy算法加入到传统的聚类算法中,那么算法既可以保证性能,即精确度,又可以增加计算效率,即减少计算时间。
Canopy算法的优势在于可以通过第一阶段的粗糙距离计算方法把数据划入不同的可重叠的子集中,然后只计算在同一个重叠子集中的样本数据向量来减少对于需要距离计算的样本数量。
3.1.2 Mahout中Canopy算法实现原理
在Mahout中,Canopy算法用于文本的分类。实现Canopy算法包含三个MR,即三个Job,可以描述为下面4个步骤。
1)Job1:将输入数据处理为Canopy算法可以使用的输入格式。
2)Job2:每个mapper针对自己的输入执行Canopy聚类,输出每个canopy的中心向量。
3)Job2:每个reducer接收mapper的中心向量,并加以整合以计算最后的canopy的中心向量。
4)Job3:根据Job2的中心向量来对原始数据进行分类。
其中,Job1和Job3属于基础操作,这里不再进行详细分析,而主要对Job2的数据流程加以简要分析,即只对Canopy算法的原理进行分析。
首先来看图3?3,可以根据这个图来理解Job2的map/reduce过程。
image

图3?3 canopy的map/reduce过程图
图3-3中的输入数据可以产生两个mapper和一个reducer。每个mapper处理其相应的数据,在这里处理的意思是使用Canopy算法来对所有的数据进行遍历,得到canopy。具体如下:首先随机取出一个样本向量作为一个canopy的中心向量,然后遍历样本数据向量集,若样本数据向量和随机样本向量的距离小于T1,则把该样本数据向量归入此canopy中,若距离小于T2,则把该样本数据从原始样本数据向量集中去除,直到整个样本数据向量集为空为止,输出所有的canopy的中心向量。reducer调用Reduce过程处理Map过程的输出,即整合所有Map过程产生的canopy的中心向量,生成新的canopy的中心向量,即最终的结果。
3.1.3 Mahout的Canopy算法实战
1.输入数据
http://archive.ics.uci.edu/m1/databases/synthetic_control/synthetic_control.data.html下载数据,这里使用的数据同样是第2章中提到的控制图数据,包含600个样本数据,每个样本数据有60个属性列,这些数据可以分为六类。我们首先上传该文本数据到HDFS,使用如下命令:
$HADOOP_HOME/bin/hadoop fs –copyFromLocal /home/mahout/mahout_data/synthetic_control.data input/synthetic_control.data
上传后在文件系统监控界面查看此文件,如图3?4所示。
image

图3?4 synthetic_control.data文件
这里只针对Job2的任务进行实战:Job2的输入要求使用的数据是序列化的,同时要求输入数据要按照一定的格式,因此,编写代码清单3?1对原始数据进行处理。
代码清单 3?1 原始数据处理代码

package mahout.fansy.utils.transform;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
/**
 ??* transform text data to vectorWritable data
 ??* @author fansy
 ??*
 ??*/
public class Text2VectorWritable extends AbstractJob{
     public static void main(String[] args) throws Exception{
          ToolRunner.run(new Configuration(), new Text2VectorWritable(),args);
     }
     @Override
     public int run(String[] arg0) throws Exception {
          addInputOption();
          addOutputOption();
          if (parseArguments(arg0) == null) {
               return -1;
          }
          Path input=getInputPath();
          Path output=getOutputPath();
          Configuration conf=getConf();
          // set job information
        ?Job job=new Job(conf,"text2vectorWritableCopy with input:"+input.getName());
          job.setOutputFormatClass(SequenceFileOutputFormat.class);
          job.setMapperClass(Text2VectorWritableMapper.class);
          job.setMapOutputKeyClass(LongWritable.class);
          job.setMapOutputValueClass(VectorWritable.class);
          job.setReducerClass(Text2VectorWritableReducer.class);
          job.setOutputKeyClass(LongWritable.class);
          job.setOutputValueClass(VectorWritable.class);
          job.setJarByClass(Text2VectorWritable.class);
          FileInputFormat.addInputPath(job, input);
          SequenceFileOutputFormat.setOutputPath(job, output);
          if (!job.waitForCompletion(true)) { // wait for the job is done
              throw new InterruptedException("Canopy Job failed processing " + input);
              }
          return 0;
    }
    /**
     ??* Mapper :main procedure 
     ??* @author fansy
     ??*
     ??*/
    public static class Text2VectorWritableMapper extends Mapper<LongWritable,Text,LongWritable,VectorWritable>{
         public void map(LongWritable key,Text value,Context context)throws
IOException,InterruptedException{
             ??String[] str=value.toString().split("\\s{1,}");
             // split data use one or more blanker
             ???Vector vector=new RandomAccessSparseVector(str.length);
             ???for(int i=0;i<str.length;i++){
             ???     vector.set(i, Double.parseDouble(str[i]));
             ???}
             ???VectorWritable va=new VectorWritable(vector);
             ???context.write(key, va);
         }
    }
    /**
     ??* Reducer: do nothing but output
     ??* @author fansy
     ??*
     ??*/
    public static class Text2VectorWritableReducer extends Reducer<LongWritable,
VectorWritable,LongWritable,VectorWritable>{
         public void reduce(LongWritable key,Iterable<VectorWritable> values,Con-text context)throws IOException,InterruptedException{
             ???for(VectorWritable v:values){
             ?     context.write(key, v);
             ???}
         }
    }
}

把上面的代码编译打包成ClusteringUtils.jar并放入/home/mahout/mahout_jar目录下,然后在Hadoop根目录下运行下面的命令:

$HADOOP_HOME/bin/hadoop jar /home/mahout/mathout_jar/ClusteringUtils.jar
mahou·t.fansy.utils.transform.Text2VectorWritable –i input/synthetic_control.data –o
input/transform

命令运行成功后可以在文件监控系统查看转换后的输入数据,如图3?5所示。
image

图3?5 转换后的输入数据
由图3-5方框中的内容可以看出,数据已经被转换为VectorWritable的序列文件了。经过上面的步骤,输入数据的准备工作就完成了。

示 在Hadoop中运行编译打包好的jar程序,可能会报下面的错误:
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/mahout/common/AbstractJob
这时需要把Mahout根目录下的相应的jar包复制到Hadoop根目录下的lib文件夹下,同时重启Hadoop即可。

2.参数意义
Canopy算法在Mahout中的使用方式如下:

usage: canopy [Generic Options] [Job-Specific Options]

其中[]包含的两个参数是可选的,下面具体介绍一下。
Generic Options参数选项如下:
-archives :在集群中被解压的压缩文件选项,以逗号隔开每个压缩文件。
-conf :应用所需要的配置文件选项。
-D :为特定的变量赋值的选项。
-files :在集群中使用到的文件选项,以逗号隔开每个文件。
-fs :选择namenode的选项。
-jt :选择job tracker 的选项。
-libjars :需要被包含到classpath中的jar文件选项,逗号隔开每个jar文件。
-tokenCacheFile :设置符号的文件选项。
Job-Specific Options参数选项如下:
--input (-i ) input:任务的输入文件选项,必选。
--output (-o) output:任务的输出文件的选项,必选。
--distanceMeasure (-dm) distanceMeasure:距离计算的类名称,默认为Square-Euclidean,即欧氏距离平方,可选。
--t1 (-t1) t1:T1阈值,可选。
--t2 (-t2) t2:T2阈值,可选。
--t3 (-t3) t3:Reducer中用到的T1阈值,可选。
--t4 (-t4) t4:Reducer中用到的T2阈值,可选。
--clusterFilter (-cf,-clusterFilter) clusterFilter:限制Mapper中比较小的canopies产生,可选。
--overwrite (-ow):如果出现,则对输出路径进行重写,可选。
--clustering (-cl):如果出现,则对数据进行分类,可选。
--method (-xm) method:选择使用的计算方式,单机或集群,默认为集群,可选。
--outlierThreshold (-outlierThreshold) outlierThreshold:异常值阈值,可选。
--help (-h):打印此参数帮助信息,可选。
--tempDir tempDir:临时文件所存放的地方,可选。
--startPhase startPhase:开始要运行算法的阶段,可选。
--endPhase endPhase:最后要运行算法的阶段,可选。
其中T1和T2的阈值可以根据数据的特点进行设置或者使用交叉验证进行设置;T3和T4的值可以不用设置,直接使用T1和T2的值即可;最后两个选项的Phase的含义可以参考笔者博客:http://blog.csdn.net/fansy1990/article/details/8242000
3.运行
进入Mahout的根目录下,运行下面的命令:

$MAHOUT_HOME/bin/mahout canopy --input input/transform/part-r-00000 --output output/canopy --distanceMeasure org.apache.mahout.common.distance.EuclideanDistanceMeasure --t1 80 --t2 55 --t3 80 --t4 55 --clustering

其中输入文件使用的是转换后的序列文件;距离计算方式使用的是欧式距离;T1和T3设置为80,T2和T4设置为55;--clustering选项表示最后对原始数据进行分类。
4.结果分析
运行上面的命令后,可以在终端中查看到图3?6所示的输出信息(只列出了部分信息)。
image

图3?6 Canopy算法终端输出信息
在图3-6的方框中可以看到输入有600条记录,map的输出有25条记录,即map产生了25个canopy,然后reduce接收了全部25个canopy,输出6个canopy,符合原始数据的6个类。
在文件监控系统可以看到运行本次算法产生的文件,如图3?7所示。
image

图3?7 Canopy算法产生文件
由图3-7可以看出,clusteredPoints中是已经分好的类,clusters-0-final中存储的是最后的reduce输出,即6个中心向量。由于上面的文件同样是序列文件,因此要把上面的文件转换为文本,方便用户查看。
首先查看中心向量的序列文件,可以看到其类名称(要根据此类名称进行转换),如图3?8所示。
image

图3?8 中心向量序列文件
可以看到其输出类名为ClusterWritable,编写下面的代码清单 3?2。
代码清单3?2 转换canopy聚类中心向量代码

package mahout.fansy.utils;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.AbstractJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 ??* read cluster centers
 ??* @author fansy
 ??*/
public class ReadClusterWritable extends AbstractJob {
      public static void main(String[] args) throws Exception{
           ToolRunner.run(new Configuration(), new ReadClusterWritable(),args);
      }
      @Override
      public int run(String[] args) throws Exception {
           addInputOption();
           addOutputOption();
           if (parseArguments(args) == null) {
                return -1;
             ?}
           Job job=new Job(getConf(),getInputPath().toString());
           job.setInputFormatClass(SequenceFileInputFormat.class);
           job.setMapperClass(RM.class);
           job.setMapOutputKeyClass(Text.class);
           job.setMapOutputValueClass(Text.class);
           job.setNumReduceTasks(0);
           job.setJarByClass(ReadClusterWritable.class);

           FileInputFormat.addInputPath(job, getInputPath());
           FileOutputFormat.setOutputPath(job, getOutputPath());
           ???if (!job.waitForCompletion(true)) {
              throw new InterruptedException("Canopy Job failed processing " + getInputPath());
        }
      ?return 0;
    }
    public static class RM extends Mapper<Text,ClusterWritable ,Text,Text>{
         private Logger log=LoggerFactory.getLogger(RM.class);
       ???public void map(Text key,ClusterWritable value,Context context) throws
IOException,InterruptedException{
              String str=value.getValue().getCenter().asFormatString();
         //   System.out.println("center****************:"+str);
           ?log.info("center*****************************:"+str); // set log information
              context.write(key, new Text(str));
         }
    }
}

把上面的代码编译打包放入/home/mahout/mahout_jar目录下,运行下面的命令:

$HADOOP_HOME/bin/hadoop jar /home/mahout/mahout_jar/ClusteringUtils.jar mahout.fansy.utils.ReadClusterWritable -i output/canopy/clusters-0-final/part-r-00000 -o output/canopy-output

在文件监控系统可以查看到转换后的聚类中心点,如图3?9所示。
image

图3?9 canopy聚类中心转换后的结果
3.1.4 Canopy算法小结
本节通过分析Canopy算法的应用背景、算法原理以及在Mahout中的实现、实战,可以让读者对Canopy算法有一个更加深入的了解,同时对在Mahout中如何使用该算法来解决实际问题提供了参考。本节所给出的两个代码清单:代码清单3-1和代码清单 3?2,具有一定的通用性,涉及文件在序列文件和文本文件之间的转换,在本章的其他小节还会用到。

相关文章
|
新零售 分布式计算 算法
mahout运行测试与数据挖掘算法之聚类分析(一)kmeans算法解析
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/44984327 在...
1148 0