Mahout聚类算法学习之Canopy算法的分析与实现

简介: 3.1 Canopy算法3.1.1 Canopy算法简介     Canopy算法的主要思想是把聚类分为两个阶段:阶段一,通过使用一个简单、快捷的距离计算方法把数据分为可重叠的子集,称为“canopy”;阶段二,通过使用一个精准、严密的距离计算方法来计算出现在阶段一中同一个canopy的所有数据向量的距离。

3.1 Canopy算法
3.1.1 Canopy算法简介
    Canopy算法的主要思想是把聚类分为两个阶段:阶段一,通过使用一个简单、快捷的距离计算方法把数据分为可重叠的子集,称为“canopy”;阶段二,通过使用一个精准、严密的距离计算方法来计算出现在阶段一中同一个canopy的所有数据向量的距离。这种方式和之前的聚类方式不同的地方在于使用了两种距离计算方式,同时因为只计算了重叠部分的数据向量,所以达到了减少计算量的目的。
   具体来说,阶段一,使用一个简单距离计算方法来产生具有一定数量的可重叠的子集。canopy就是一个样本数据集的子集,子集中的样本数据是通过一个粗糙的距离计算方法来计算样本数据向量和canopy的中心向量的距离,设定一个距离阈值,当计算的距离小于这个阈值的时候,就把样本数据向量归为此canopy。这里要说明的是,每个样本数据向量有可能存在于多个canopy里面,但是每个样本数据向量至少要包含于一个canopy中。canopy的创建基于不存在于同一个canopy中的样本数据向量彼此很不相似,不能被分为同一个类的这样的观点考虑的。由于距离计算方式是粗糙的,因此不能够保证性能(计算精确度)。但是通过允许存在可叠加的canopy和设定一个较大的距离阈值,在某些情况下可以保证该算法的性能。
图3-1是一个canopy的例子,其中包含5个数据中心向量。

    图3-1中数据向量用同样灰度值表示的属于同一个聚类。聚类中心向量A被随机选出,然后以A数据向量创建一个canopy,这个canopy包括所有在其外圈(实线圈)的数据向量,而内圈(虚线)中的数据向量则不再作为中心向量的候选名单。
那么针对一个具体的canopy应该如何创建呢?下面介绍创建一个普通的canopy算法的步骤。
1)原始数据集合List按照一定的规则进行排序(这个规则是任意的,但是一旦确定就不再更改),初始距离阈值为T1、T2,且T1 > T2(T1、T2的设定可以根据用户的需要,或者使用交叉验证获得)。
2)在List中随机挑选一个数据向量A,使用一个粗糙距离计算方式计算A与List中其他样本数据向量之间的距离d。
3)根据第2步中的距离d,把d小于T1的样本数据向量划到一个canopy中,同时把d小于T2的样本数据向量从候选中心向量名单(这里可以理解为就是List)中移除。
4)重复第2、3步,直到候选中心向量名单为空,即List为空,算法结束。
图3-2为创建canopy算法的流程图。

   阶段二,可以在阶段一的基础上应用传统聚类算法,比如贪婪凝聚聚类算法、K均值聚类算法,当然,这些算法使用的距离计算方式是精准的距离计算方式。但是因为只计算了同一个canopy中的数据向量之间的距离,而没有计算不在同一个canopy的数据向量之间的距离,所以假设它们之间的距离为无穷大。例如,若所有的数据都简单归入同一个canopy,那么阶段二的聚类就会退化成传统的具有高计算量的聚类算法了。但是,如果canopy不是那么大,且它们之间的重叠不是很多,那么代价很大的距离计算就会减少,同时用于分类的大量计算也可以省去。进一步来说,如果把Canopy算法加入到传统的聚类算法中,那么算法既可以保证性能,即精确度,又可以增加计算效率,即减少计算时间。
Canopy算法的优势在于可以通过第一阶段的粗糙距离计算方法把数据划入不同的可重叠的子集中,然后只计算在同一个重叠子集中的样本数据向量来减少对于需要距离计算的样本数量。
3.1.2 Mahout中Canopy算法实现原理
在Mahout中,Canopy算法用于文本的分类。实现Canopy算法包含三个MR,即三个Job,可以描述为下面4个步骤。
1)Job1:将输入数据处理为Canopy算法可以使用的输入格式。
2)Job2:每个mapper针对自己的输入执行Canopy聚类,输出每个canopy的中心向量。
3)Job2:每个reducer接收mapper的中心向量,并加以整合以计算最后的canopy的中心向量。
4)Job3:根据Job2的中心向量来对原始数据进行分类。
其中,Job1和Job3属于基础操作,这里不再进行详细分析,而主要对Job2的数据流程加以简要分析,即只对Canopy算法的原理进行分析。
首先来看图3-3,可以根据这个图来理解Job2的map/reduce过程。

图3-3中的输入数据可以产生两个mapper和一个reducer。每个mapper处理其相应的数据,在这里处理的意思是使用Canopy算法来对所有的数据进行遍历,得到canopy。具体如下:首先随机取出一个样本向量作为一个canopy的中心向量,然后遍历样本数据向量集,若样本数据向量和随机样本向量的距离小于T1,则把该样本数据向量归入此canopy中,若距离小于T2,则把该样本数据从原始样本数据向量集中去除,直到整个样本数据向量集为空为止,输出所有的canopy的中心向量。reducer调用Reduce过程处理Map过程的输出,即整合所有Map过程产生的canopy的中心向量,生成新的canopy的中心向量,即最终的结果。

3.1.3 Mahout的Canopy算法实战
1.输入数据
下载数据,这里使用的数据同样是第2章中提到的控制图数据,包含600个样本数据,每个样本数据有60个属性列,这些数据可以分为六类。我们首先上传该文本数据到HDFS,使用如下命令:

$HADOOP_HOME/bin/hadoop fs –copyFromLocal /home/mahout/mahout_data/synthetic_control.data input/synthetic_control.data 

这里只针对Job2的任务进行实战:Job2的输入要求使用的数据是序列化的,同时要求输入数据要按照一定的格式,因此,编写代码清单3-1对原始数据进行处理。

代码清单 3-1 原始数据处理代码

package mahout.fansy.utils.transform;  
import java.io.IOException;  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;  
import org.apache.hadoop.util.ToolRunner;  
import org.apache.mahout.common.AbstractJob;  
import org.apache.mahout.math.RandomAccessSparseVector;  
import org.apache.mahout.math.Vector;  
import org.apache.mahout.math.VectorWritable;  
/**  
??* transform text data to vectorWritable data  
??* @author fansy  
??*  
??*/  
public class Text2VectorWritable extends AbstractJob{  
    public static void main(String[] args) throws Exception{  
         ToolRunner.run(new Configuration(), new Text2VectorWritable(),args);  
    }  
    @Override  
    public int run(String[] arg0) throws Exception {  
         addInputOption();  
         addOutputOption();  
         if (parseArguments(arg0) == null) {  
              return -1;  
         }  
         Path input=getInputPath();  
         Path output=getOutputPath();  
         Configuration conf=getConf();  
         // set job information  
       ?Job job=new Job(conf,"text2vectorWritableCopy with input:"+input.getName());  
         job.setOutputFormatClass(SequenceFileOutputFormat.class);  
         job.setMapperClass(Text2VectorWritableMapper.class);  
         job.setMapOutputKeyClass(LongWritable.class);  
         job.setMapOutputValueClass(VectorWritable.class);  
         job.setReducerClass(Text2VectorWritableReducer.class);  
         job.setOutputKeyClass(LongWritable.class);  
         job.setOutputValueClass(VectorWritable.class);  
         job.setJarByClass(Text2VectorWritable.class);  
         FileInputFormat.addInputPath(job, input);  
         SequenceFileOutputFormat.setOutputPath(job, output);  
         if (!job.waitForCompletion(true)) { // wait for the job is done  
             throw new InterruptedException("Canopy Job failed processing " + input);  
             }  
         return 0;  
   }  
   /**  
    ??* Mapper :main procedure  
    ??* @author fansy  
    ??*  
    ??*/  
   public static class Text2VectorWritableMapper extends Mapper<LongWritable,Text,LongWritable,VectorWritable>{  
        public void map(LongWritable key,Text value,Context context)throws  
IOException,InterruptedException{  
            ??String[] str=value.toString().split("\\s{1,}");  
            // split data use one or more blanker  
            ???Vector vector=new RandomAccessSparseVector(str.length);  
            ???for(int i=0;i<str.length;i++){  
            ???     vector.set(i, Double.parseDouble(str[i]));  
            ???}  
            ???VectorWritable va=new VectorWritable(vector);  
            ???context.write(key, va);  
        }  
   }  
   /**  
    ??* Reducer: do nothing but output  
    ??* @author fansy  
    ??*  
    ??*/  
   public static class Text2VectorWritableReducer extends Reducer<LongWritable,  
VectorWritable,LongWritable,VectorWritable>{  
        public void reduce(LongWritable key,Iterable<VectorWritable> values,Con-text context)throws IOException,InterruptedException{  
            ???for(VectorWritable v:values){  
            ?     context.write(key, v);  
            ???}  
        }  
   }  
}

把上面的代码编译打包成ClusteringUtils.jar并放入/home/mahout/mahout_jar目录下,然后在Hadoop根目录下运行下面的命令:
 

$HADOOP_HOME/bin/hadoop jar /home/mahout/mathout_jar/ClusteringUtils.jar  
mahou·t.fansy.utils.transform.Text2VectorWritable –i input/synthetic_control.data –o  
input/transform 

命令运行成功后可以在文件监控系统查看转换后的输入数据,如图3-5所示。


由图3-5方框中的内容可以看出,数据已经被转换为VectorWritable的序列文件了。经过上面的步骤,输入数据的准备工作就完成了。

提示 在Hadoop中运行编译打包好的jar程序,可能会报下面的错误:
 

Exception in thread "main" java.lang.NoClassDefFoundError:  
org/apache/mahout/common/AbstractJob 

这时需要把Mahout根目录下的相应的jar包复制到Hadoop根目录下的lib文件夹下,同时重启Hadoop即可。

2.运行

进入Mahout的根目录下,运行下面的命令:
 

     
     
  1. $MAHOUT_HOME/bin/mahout canopy --input input/transform/part-r-00000 --output output/canopy --distanceMeasure org.apache.mahout.common.distance.EuclideanDistanceMeasure --t1 80 --t2 55 --t3 80 --t4 55 --clustering

其中输入文件使用的是转换后的序列文件;距离计算方式使用的是欧式距离;T1和T3设置为80,T2和T4设置为55;--clustering选项表示最后对原始数据进行分类。

可以看到其输出类名为ClusterWritable,编写下面的代码清单 3-2。

代码清单3-2 转换canopy聚类中心向量代码
 

     
     
  1. package mahout.fansy.utils;  
  2. import java.io.IOException;  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.io.Text;  
  5. import org.apache.hadoop.mapreduce.Job;  
  6. import org.apache.hadoop.mapreduce.Mapper;  
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  8. import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;  
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  10. import org.apache.hadoop.util.ToolRunner;  
  11. import org.apache.mahout.clustering.iterator.ClusterWritable;  
  12. import org.apache.mahout.common.AbstractJob;  
  13. import org.slf4j.Logger;  
  14. import org.slf4j.LoggerFactory;  
  15. /**  
  16. ??* read cluster centers  
  17. ??* @author fansy  
  18. ??*/  
  19. public class ReadClusterWritable extends AbstractJob {  
  20.      public static void main(String[] args) throws Exception{  
  21.           ToolRunner.run(new Configuration(), new ReadClusterWritable(),args);  
  22.      }  
  23.      @Override  
  24.      public int run(String[] args) throws Exception {  
  25.           addInputOption();  
  26.           addOutputOption();  
  27.           if (parseArguments(args) == null) {  
  28.                return -1;  
  29.             ?}  
  30.           Job job=new Job(getConf(),getInputPath().toString());  
  31.           job.setInputFormatClass(SequenceFileInputFormat.class);  
  32.           job.setMapperClass(RM.class);  
  33.           job.setMapOutputKeyClass(Text.class);  
  34.           job.setMapOutputValueClass(Text.class);  
  35.           job.setNumReduceTasks(0);  
  36.           job.setJarByClass(ReadClusterWritable.class);  
  37.  
  38.           FileInputFormat.addInputPath(job, getInputPath());  
  39.           FileOutputFormat.setOutputPath(job, getOutputPath());  
  40.           ???if (!job.waitForCompletion(true)) {  
  41.              throw new InterruptedException("Canopy Job failed processing " + getInputPath());  
  42.        }  
  43.      ?return 0;  
  44.    }  
  45.    public static class RM extends Mapper<Text,ClusterWritable ,Text,Text>{  
  46.         private Logger log=LoggerFactory.getLogger(RM.class);  
  47.       ???public void map(Text key,ClusterWritable value,Context context) throws  
  48. IOException,InterruptedException{  
  49.              String str=value.getValue().getCenter().asFormatString();  
  50.         //   System.out.println("center****************:"+str);  
  51.           ?log.info("center*****************************:"+str); // set log information  
  52.              context.write(key, new Text(str));  
  53.         }  
  54.    }  
  55. }

把上面的代码编译打包放入/home/mahout/mahout_jar目录下,运行下面的命令:

     
     
  1. $HADOOP_HOME/bin/hadoop jar /home/mahout/mahout_jar/ClusteringUtils.jar mahout.fansy.utils.ReadClusterWritable -i output/canopy/clusters-0-final/part-r-00000 -o output/canopy-output


相关文章
|
4天前
|
存储 算法 安全
基于哈希表的文件共享平台 C++ 算法实现与分析
在数字化时代,文件共享平台不可或缺。本文探讨哈希表在文件共享中的应用,包括原理、优势及C++实现。哈希表通过键值对快速访问文件元数据(如文件名、大小、位置等),查找时间复杂度为O(1),显著提升查找速度和用户体验。代码示例展示了文件上传和搜索功能,实际应用中需解决哈希冲突、动态扩容和线程安全等问题,以优化性能。
|
14天前
|
缓存 算法 搜索推荐
Java中的算法优化与复杂度分析
在Java开发中,理解和优化算法的时间复杂度和空间复杂度是提升程序性能的关键。通过合理选择数据结构、避免重复计算、应用分治法等策略,可以显著提高算法效率。在实际开发中,应该根据具体需求和场景,选择合适的优化方法,从而编写出高效、可靠的代码。
25 6
|
2月前
|
并行计算 算法 测试技术
C语言因高效灵活被广泛应用于软件开发。本文探讨了优化C语言程序性能的策略,涵盖算法优化、代码结构优化、内存管理优化、编译器优化、数据结构优化、并行计算优化及性能测试与分析七个方面
C语言因高效灵活被广泛应用于软件开发。本文探讨了优化C语言程序性能的策略,涵盖算法优化、代码结构优化、内存管理优化、编译器优化、数据结构优化、并行计算优化及性能测试与分析七个方面,旨在通过综合策略提升程序性能,满足实际需求。
67 1
|
2月前
|
存储 算法 安全
2024重生之回溯数据结构与算法系列学习之串(12)【无论是王道考研人还是IKUN都能包会的;不然别给我家鸽鸽丟脸好嘛?】
数据结构与算法系列学习之串的定义和基本操作、串的储存结构、基本操作的实现、朴素模式匹配算法、KMP算法等代码举例及图解说明;【含常见的报错问题及其对应的解决方法】你个小黑子;这都学不会;能不能不要给我家鸽鸽丢脸啊~除了会黑我家鸽鸽还会干嘛?!!!
2024重生之回溯数据结构与算法系列学习之串(12)【无论是王道考研人还是IKUN都能包会的;不然别给我家鸽鸽丟脸好嘛?】
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
【EMNLP2024】基于多轮课程学习的大语言模型蒸馏算法 TAPIR
阿里云人工智能平台 PAI 与复旦大学王鹏教授团队合作,在自然语言处理顶级会议 EMNLP 2024 上发表论文《Distilling Instruction-following Abilities of Large Language Models with Task-aware Curriculum Planning》。
|
2月前
|
算法 安全 搜索推荐
2024重生之回溯数据结构与算法系列学习(8)【无论是王道考研人还是IKUN都能包会的;不然别给我家鸽鸽丢脸好嘛?】
数据结构王道第2.3章之IKUN和I原达人之数据结构与算法系列学习x单双链表精题详解、数据结构、C++、排序算法、java、动态规划你个小黑子;这都学不会;能不能不要给我家鸽鸽丢脸啊~除了会黑我家鸽鸽还会干嘛?!!!
|
2月前
|
存储 算法 安全
2024重生之回溯数据结构与算法系列学习之顺序表【无论是王道考研人还真爱粉都能包会的;不然别给我家鸽鸽丢脸好嘛?】
顺序表的定义和基本操作之插入;删除;按值查找;按位查找等具体详解步骤以及举例说明
|
2月前
|
算法 安全 搜索推荐
2024重生之回溯数据结构与算法系列学习之单双链表精题详解(9)【无论是王道考研人还是IKUN都能包会的;不然别给我家鸽鸽丢脸好嘛?】
数据结构王道第2.3章之IKUN和I原达人之数据结构与算法系列学习x单双链表精题详解、数据结构、C++、排序算法、java、动态规划你个小黑子;这都学不会;能不能不要给我家鸽鸽丢脸啊~除了会黑我家鸽鸽还会干嘛?!!!
|
2月前
|
存储 Web App开发 算法
2024重生之回溯数据结构与算法系列学习之单双链表【无论是王道考研人还是IKUN都能包会的;不然别给我家鸽鸽丢脸好嘛?】
数据结构之单双链表按位、值查找;[前后]插入;删除指定节点;求表长、静态链表等代码及具体思路详解步骤;举例说明、注意点及常见报错问题所对应的解决方法
|
2月前
|
算法 安全 NoSQL
2024重生之回溯数据结构与算法系列学习之栈和队列精题汇总(10)【无论是王道考研人还是IKUN都能包会的;不然别给我家鸽鸽丢脸好嘛?】
数据结构王道第3章之IKUN和I原达人之数据结构与算法系列学习栈与队列精题详解、数据结构、C++、排序算法、java、动态规划你个小黑子;这都学不会;能不能不要给我家鸽鸽丢脸啊~除了会黑我家鸽鸽还会干嘛?!!!