《Hadoop与大数据挖掘》——2.5 K-Means算法原理及Hadoop MapReduce实现

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介:

本节书摘来自华章计算机《Hadoop与大数据挖掘》一书中的第2章,第2.5节,作者 张良均 樊哲 位文超 刘名军 许国杰 周龙 焦正升,更多章节内容可以访问云栖社区“华章计算机”公众号查看。

2.5 K-Means算法原理及Hadoop MapReduce实现

2.5.1 K-Means算法原理

K-Means算法是硬聚类算法,是典型的基于原型的目标函数聚类方法的代表。它是将数据点到原型的某种距离作为优化的目标函数,利用函数求极值的方法得到迭代运算的调整规则(如图2-45所示)。K-Means算法以欧氏距离作为相似度测度,求对应某一初始聚类中心向量V最优分类,使得评价指标最小。算法采用误差平方和准则函数作为聚类准则函数。

image

具体的算法步骤如下:

image
image

2.5.2 动手实践:K-Means算法实现

编写单机版的K-Means算法有利于理解Hadoop实现的K-Means算法,所以这里给出单机版(Java)的编写步骤,供读者参考。

实验步骤如下:

1)打开Eclipse,新建Java工程kmeans1.0;
2)参考前面的流程完善K-means代码;
3)使用测试数据hadoop/data/kmeans.data进行测试,查看结果;
4)思考把该算法转换为Hadoop MapReduce实现的思路。

2.5.3 Hadoop K-Means算法实现思路

针对K-Means算法,本节给出两种实现思路。思路1相对比较直观,但是效率较低;思路2在实现上需要自定义键值类型,但是效率较高。下面是对两种思路的介绍。

思路1

如图2-47所示,算法描述如下:

1)根据原始文件生成随机聚类中心向量(需指定聚类中心向量个数k),指定循环次数;
2)在map阶段,setup函数读取并初始化聚类中心向量;在map函数中读取每个记录,计算当前记录到各个聚类中心向量的距离,根据到聚类中心向量最小的聚类中心id判断该记录属于哪个类别,输出所属聚类中心id和当前记录;
3)在reduce阶段,reduce函数接收相同聚类中心id的数据;把这些数据的每列进行求和,并记录每列的个数;计算新的聚类中心向量(每列的和除以每列的个数),然后输出聚类中心id和新的聚类中心向量;
4)判断前后两次聚类中心向量之间的误差是否小于某阈值;如果小于,则跳转到步骤5),否则跳转到步骤2);
5)针对最后一次生成的聚类中心向量对原始数据进行分类,得到每个记录的类别。

其MR数据流如图2-48所示。

image

image

思路2

如如图2-49所示,算法描述如下:

1)根据原始文件生成随机聚类中心向量(需指定聚类中心向量个数k),指定循环次数。
2)在map阶段,setup函数读取并初始化聚类中心向量,同时初始化聚类中心向量和;在map函数中读取每个记录,计算当前记录到各个聚类中心向量的距离,根据到聚类中心向量最小的聚类中心id判断该记录属于哪个类别,然后把所属的类别加入到聚类中心向量和中(需要记录个数及和,即需要自定义类型);在cleanup函数中输出所属聚类中心id和其对应的聚类中心向量和。

image

3)在reduce阶段,reduce函数接收相同聚类中心id的数据;把这些数据的每列进行求和,并记录每列的个数;计算新的聚类中心向量(每列的和除以每列的个数),然后输出聚类中心id和新的聚类中心向量。
4)判断前后两次聚类中心向量之间的误差是否小于某阈值;如果小于,则跳转到步骤5),否则跳转到步骤2)。
5)针对最后一次生成的聚类中心向量对原始数据进行分类,得到每个记录的类别。

其MR数据流如图2-50所示。

2.5.4 Hadoop K-Means编程实现

在下面的实现过程中,会进行简单实现思路介绍,针对一些实现会有动手实践给读者练习。一般情况下我们建议读者自己全部实现,对于实现起来有难度的读者,我们提供了参考程序,但是需要注意,参考程序不是完整的,里面设置了TODO提示,这些地方是需要读者去完善的。

image

image

不管是思路1还是思路2,Hadoop实现K-Means算法都包含4个步骤:①初始化聚类中心向量;②进行聚类并更新聚类中心向量;③判断是否达到循环条件,如果是则循环;④判断是否需要对原始数据进行分类,如果是则进行分类操作。下面就针对这4个步骤分别进行分析。

(1)初始化聚类中心向量:蓄水池抽样

初始化聚类中心其实和单机算法类似,可以有多种方法,比如随机取出k个聚类中心向量、直接取出前k个聚类中心向量等。在Hadoop的编程框架MapReduce限制下,如果是随机取k个聚类中心向量,那么实现起来就是这样的:遍历一次所有数据,统计数据个数n,再次遍历,按照k/n概率抽取k个数据。这样不是不可以,但是效率太低,并且如果真要实现起来,还是要考虑多个问题的,比如如果有多个Mapper怎么处理?

这里提出一种效率高,并且还能达到随机取数的算法—蓄水池抽样。

什么是蓄水池抽样呢?简单描述:先选中第1~k个元素,作为被选中的元素。然后依次对第k+1至第n个元素做如下操作:每个元素都有k/x的概率被选中,然后等概率地(1/k)替换掉被选中的元素(其中x是元素的序号)。其算法伪代码描述如代码清单2-33所示。

image

蓄水池抽样同样可以使用Driver、Mapper、Reducer来进行分析。Driver部分可以参考MapReduce程序的固定模式,但是需要注意,需要传入聚类中心向量的个数,即k值。其代码参考代码清单2-34。

image

Mapper就是蓄水池抽样算法的具体实现了,这里需要注意,map函数针对每条记录进行筛选,并不输出,所以这里在cleanup进行输出。这样就需要在setup里面初始化一个变量来存储当前已经被选为聚类中心向量的值。其各个函数描述如下。

image
image

在设计Reducer的时候需要考虑的一个问题是,如果有多个Mapper怎么办?多个Mapper就会发送k×N个聚类中心向量到Reducer中(其中N为Mapper的个数),所以在Reducer端需要对k×N个记录再次筛选,选出其中的k个聚类中心向量。这里当然也有多种方法,其实这里的选择和最开始我们在Mapper中针对所有数据随机选取k条记录的选择一样,这里所有数据只是“变”小了而已。因为是在Reducer中处理(一个Reducer可以理解为单机),所以其实也可以理解为单机的随机选择k条记录的算法。这里随机选择k条记录的算法也可以,不过我们这里还是选择使用蓄水池抽样。

这里只能使用一个Reducer,为什么?请读者思考。

动手实践:蓄水池抽样Hadoop实现

首先理解上面蓄水池抽样算法的Hadoop实现的描述及分析,接着新建工程,并参考上节完善工程代码功能。

实验步骤:

1)打开Eclipse,新建工程2.5_002_sample;
2)添加相关环境(如JDK路径、Hadoop路径等);
3)参考上节蓄水池抽样Hadoop实现原理实现编写源代码;
4)把工程编译,并导出jar包,然后上传jar包到master节点上,使用yarn jar的方式运行,查看输出及相关日志。

思考:

1)还有其他方式实现蓄水池抽样吗?
2)如何查看蓄水池抽样抽取出来的结果?
(2)更新聚类中心向量
更新聚类中心向量其实就是整个K-Means算法的核心所在,K-Means算法的每次循环其实就是一个不断更新聚类中心向量的过程。那么具体怎么更新呢?我们在单机算法中已经知道怎么更新了,怎么把其转换为Hadoop的MapReduce代码呢?其实,可以把每个Mapper理解为一个单机算法,因为其处理的数据其实是所有数据的一部分(一个文件块)。下面来看具体涉及的Driver、Mapper和Reducer。

针对Driver类,除了一些固定写法外,还需传入聚类初始中心向量路径、聚类中心个数、列分隔符(考虑是否需要?),其示例代码如代码清单2-36所示。

image

Reducer设置多个会有什么问题?可以设置多个吗?设置多个有什么好处?

Mapper的工作主要包括两个:其一,读取首次HDFS上的聚类中心;其二,根据聚类中心对每个键值对记录进行距离计算,输出距离最小的聚类中心ID以及该条键值对记录。下面针对具体实现做分析。

1)setup():读取传入的初始聚类中心向量路径,根据路径读取对应的数据,利用分隔符来对初始聚类中心向量进行初始化(初始化为数组和列表)。
2)map():在map阶段根据初始化的聚类中心向量对当前记录进行分类,输出其对应的聚类中心id、当前记录,如代码清单2-37所示。

image
image

image要做的工作就是针对每个组的所有数据计算其平均值(该平均值就是新的聚类中心向量)。其函数描述如下。

1)reduce():每个reduce函数针对同一个聚类中心id的数据进行处理;具体处理过程为,把每条记录对应列的值加起来,同时记录当前的记录数;接着,使用每列和除以记录数,即可得到每列平均值,也就是当前聚类中心id新的聚类中心,如代码清单2-38所示。

image

3)cleanup():输出每个类别新的聚类中心。

动手实践:Hadoop实现更新聚类中心向量

实验步骤如下:

1)打开Eclipse,打开上一小节完成的工程;
2)根据上节Hadoop实现更新聚类中心实现思路,编写对应源代码;
3)把工程编译并导出Jar包,然后上传Jar包到master节点上,使用yarn jar的方式运行,查看输出及相关日志。

思考:如何测试代码?

image
image
image

同时,需要覆写readFields、write函数,在这里针对数组类型还需要做些额外的处理。其处理过程为存储数组的长度,在实例化类的时候传入数组的长度,否则会报NullPointer的异常,如代码清单2-40所示。

image

写入或者读取时,注意顺序,顺序重要吗?如果乱序会有什么影响?请读者思考。

下面针对Mapper进行分析。

image
image

SumNumWritable构造函数如代码清单2-42所示。

image
image

Reducer只需要整合各个Mapper的输出记录,针对每个记录分别求“列和”、个数和,然后再求平均即可得到新的聚类中心向量和。各个函数描述如下。

image
image

(2)动手实践:Hadoop实现K-Means算法思路2

请读者参考思路1的动手实践,编写K-Means算法思路2的Hadoop实现。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
15天前
|
缓存 算法 大数据
大数据查询优化算法
【10月更文挑战第26天】
36 1
|
20天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
71 2
|
22天前
|
机器学习/深度学习 数据采集 算法
大数据中缺失值处理使用算法处理
【10月更文挑战第21天】
35 3
|
21天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
59 1
|
21天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
59 1
|
1月前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
54 3
|
1月前
|
消息中间件 分布式计算 druid
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
40 2
|
1月前
|
SQL 分布式计算 大数据
大数据平台的毕业设计01:Hadoop与离线分析
大数据平台的毕业设计01:Hadoop与离线分析
104 0
|
1月前
|
SQL 消息中间件 分布式计算
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
57 0
下一篇
无影云桌面