Python Spark KMeans demo

Introduction:

The official demo:

from numpy import array
from math import sqrt

from pyspark import SparkContext

from pyspark.mllib.clustering import KMeans, KMeansModel

sc = SparkContext(appName="clusteringExample")
# Load and parse the data
data = sc.textFile("/root/spark-2.1.1-bin-hadoop2.6/data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

# Save and load model
#clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
#sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
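Once trained, the model itself can be inspected before computing WSSSE; a minimal sketch (the sample point below is an illustrative assumption, not part of the official demo):

# k cluster centers found by training (k = 2 above)
print(clusters.k)
print(clusters.clusterCenters)

# Predict the cluster index of a new point
# (kmeans_data.txt contains 3-dimensional points)
print(clusters.predict(array([0.1, 0.1, 0.1])))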

An example with normalization:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.sql.functions.{col, udf}

case class DataRow(label: Double, x1: Double, x2: Double)
val data = sqlContext.createDataFrame(sc.parallelize(Seq(
    DataRow(3, 1, 2),
    DataRow(5, 3, 4),
    DataRow(7, 5, 6),
    DataRow(6, 0, 0)
)))

// Train on the feature columns only (x1 and x2)
val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1), s.getDouble(2))).cache()
val clusters = KMeans.train(parsedData, 3, 20)
// Wrap the model's predict in a UDF so it can be applied directly to columns
val t = udf { (x1: Double, x2: Double) => clusters.predict(Vectors.dense(x1, x2)) }
val result = data.select(col("label"), t(col("x1"), col("x2")))

The important parts are the last two lines:

1. Creates a UDF (user-defined function) which can be directly applied to DataFrame columns (in this case, the two columns x1 and x2).

2. Selects the label column along with the UDF applied to the x1 and x2 columns. Since the UDF predicts the closest cluster, the result will be a DataFrame consisting of (label, closestCluster).
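Despite the heading, the snippet above never actually normalizes x1 and x2 before training. A minimal sketch of what that step could look like in PySpark, applying StandardScaler from pyspark.mllib.feature to the parsedData RDD of the first demo (the withMean/withStd settings are illustrative assumptions):

from pyspark.mllib.feature import StandardScaler

# Fit the scaler on the feature vectors, then transform them before training:
# withMean=True centers each feature, withStd=True scales to unit variance
scaler = StandardScaler(withMean=True, withStd=True).fit(parsedData)
scaledData = scaler.transform(parsedData)

clusters = KMeans.train(scaledData, 2, maxIterations=10, initializationMode="random")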

Reference: https://stackoverflow.com/questions/31447141/spark-mllib-kmeans-from-dataframe-and-back-again — the answer there first shows the whole round trip in one snippet:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering._

// Keep the raw (x1, x2) pairs around so predictions can be joined back
val rows = data.rdd.map(r => (r.getDouble(1), r.getDouble(2))).cache()
val vectors = rows.map(r => Vectors.dense(r._1, r._2))
val kMeansModel = KMeans.train(vectors, 3, 20)
val predictions = rows.map { r => (r._1, kMeansModel.predict(Vectors.dense(r._1, r._2))) }
// toDF needs import sqlContext.implicits._ outside of spark-shell
val df = predictions.toDF("id", "cluster")
df.show

Create column from RDD

It's very easy to obtain pairs of ids and clusters in the form of an RDD:

// Assumes the first column is an integer id; with the Double label of the
// DataRow example above, use s.getDouble(0) instead
val idPointRDD = data.rdd.map(s => (s.getInt(0), Vectors.dense(s.getDouble(1), s.getDouble(2)))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 3, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)

Then you can create a DataFrame from that:

val idCluster = idClusterRDD.toDF("id", "cluster")

This works because map doesn't change the order of the data in an RDD, which is why you can simply zip the ids with the prediction results.
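The same zip trick carries over to PySpark, where KMeansModel.predict also accepts a whole RDD; a minimal sketch, assuming a DataFrame df with columns (id, x, y) (hypothetical names, not from the original):

from pyspark.mllib.clustering import KMeans

# (id, [x, y]) pairs; keys and values stay aligned through map and zip
idPointRDD = df.rdd.map(lambda r: (r[0], [r[1], r[2]])).cache()
model = KMeans.train(idPointRDD.values(), 3, maxIterations=20)

# predict maps over the RDD without reordering it, so zip is safe
idClusterRDD = idPointRDD.keys().zip(model.predict(idPointRDD.values()))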

Use UDF (User Defined Function)

The second method involves using the clusters.predict method as a UDF:

// Broadcast the model so each executor gets a single copy
val bcClusters = sc.broadcast(clusters)
def predict(x: Double, y: Double): Int = {
    bcClusters.value.predict(Vectors.dense(x, y))
}
sqlContext.udf.register("predict", predict _)

Now we can use it to add predictions to data:

val idCluster = data.selectExpr("id", "predict(x, y) as cluster")

Keep in mind that the Spark API doesn't allow UDF deregistration, which means the closure data will be kept in memory.
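The registered-UDF route looks much the same in PySpark; a minimal sketch, assuming a KMeansModel named clusters trained on 2-dimensional points like the Scala examples above, and the same hypothetical DataFrame df with columns (id, x, y):

from pyspark.sql.types import IntegerType

# Broadcast the model so executors don't re-serialize it per task
bcClusters = sc.broadcast(clusters)

def predict(x, y):
    # pyspark's KMeansModel.predict accepts any array-like point
    return bcClusters.value.predict([x, y])

sqlContext.registerFunction("predict", predict, IntegerType())
idCluster = df.selectExpr("id", "predict(x, y) as cluster")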

This article is reproduced from the cnblogs blog of 张昺华-sky; original link: http://www.cnblogs.com/bonelee/p/7229115.html. Please contact the original author before republishing.
