Introduction: The following are notes I took while studying, together with my own understanding. This article is meant to record what I have learned recently; if there are any mistakes, corrections are welcome.
Keywords: Python, machine learning, K-Means clustering
""" * Created with PyCharm * 作者: 阿光 * 日期: 2021/7/18 * 时间: 14:19 * 描述: 利用numpy自己实现K-Means算法 """ import random import warnings import numpy as np import pandas as pd from Draw import draw warnings.filterwarnings('ignore') # 加载数据 data = pd.read_csv("../../data/cluster_500-10_7.csv", encoding="gbk") X = data.iloc[:, 1:-2].values y = data.iloc[:, -1].values.flatten() cluster = 7 # 定义簇的个数 iter_ = 500 # 每轮的k-means的迭代次数 epochs = 20 # 模型的迭代轮数 label_ = np.zeros(len(X)) # 定义样本分类集合,代表每个样本所属的簇类别,[0,1,4,2] best_label = [] # 保存最好的聚类效果 best_score = 99999 # 保存模型最优效果下的DB指数 def kmeans(x): """ * 描述: K-Means算法的入口 * 参数: x即样本数据 * 返回值: """ # 随机选择k个样本作为初始向量 # test = random.sample(range(0, x.shape[0]), cluster) # print(test) x_center = x[random.sample(range(0, x.shape[0]), cluster), :] # x_center = init_cluster(X, cluster) # 新的簇的向量中心 new_center = x_center.copy() k = 0 # 记录迭代轮数 # 进行迭代更新,只要上一次和本次的簇中心不一致就继续迭代 while True: if ((x_center == new_center).sum() == x_center.shape[0] * x_center.shape[1] and k != 0) or k > iter_: break x_center = new_center k += 1 # 计算样本到簇中心的距离 matrix = distance(x_center, x) # 更新每个样本的所属簇分类 update_cluster(matrix) # 根据新的样本划分情况,更新每个簇的中心 new_center = update_center(x) # print("聚类结束,共迭代{}轮".format(k)) def run(X): """ * 描述: k-means算法的主调函数 * 参数: X为样本数据 * 返回值: """ # 迭代多个epoch,根据DBI指数选取最好的结果 for epoch in range(epochs): kmeans(X) # 调用kmeans # 计算DBI指数 DBI = optimizer_score(cluster, X) global best_score, best_label # 如果当前轮数的效果好于上一次的,记录最优的DBI指数,以及最好的聚类结果 if DBI < best_score: best_score = DBI best_label = label_ def init_cluster(X, cluster): x_center = np.zeros((cluster, X.shape[1])) i = 0 res = set() while True: if i == 0: index = random.sample(range(0, X.shape[0]), 1) res.add(index[0]) x_center[i, :] = X[index] i += 1 else: distance_ = np.zeros((X.shape[0], i)) for j in range(i): row = x_center[j, :] distance_[:, j] = np.sqrt(((X - row) ** 2).sum(axis=1)) distance_ = (distance_ - distance_.mean(axis=0)) / distance_.std(axis=0) rank = np.argsort(distance_, axis=0) rank = (rank - rank.mean(axis=0)) / rank.std(axis=0) loc = np.sum(rank, axis=1) # distance_ = np.sum(distance_, axis=1) # location = np.where(distance_ == distance_.max()) index = np.argmax(loc) print(res) if index not in res: x_center[i, :] = X[index] res.add(index) i += 1 else: while index < X.shape[0] - 2: index = index + 1 if index not in res: x_center[i, :] = X[index] res.add(index) i += 1 break if i == cluster: break return x_center def optimizer_score(cluster, X): """ * 描述: 计算DBI优化指标 * 参数: cluster:聚类簇数 X:样本数据 * 返回值: 模型的DBI指数 """ # 用于计算簇内平均值 def avg_C(label): distance_ = np.zeros((X[label_ == label, :].shape[0], X[label_ == label, :].shape[0])) for i, row in enumerate(X[label_ == label, :]): distance_[i, :] = np.sqrt(np.sum((X[label_ == label, :] - row) ** 2, axis=1)) distance_ = distance_.sum() / (X.shape[0] * (X.shape[0] - 1)) return distance_ # 用于计算簇间中心的距离 def d_cen(cluster_i, cluster_j): X_i = X[label_ == cluster_i, :] X_j = X[label_ == cluster_j, :] X_i_mean = X_i.mean(axis=0) x_j_mean = X_j.mean(axis=0) return np.sqrt(((X_i_mean - x_j_mean) ** 2).sum()) DBI = 0 # 迭代每个簇,计算每个簇的DB指数,然后求平均值 for i in range(cluster): dis_list = [] for j in range(cluster): if i != j: dis_list.append((avg_C(i) + avg_C(j)) / d_cen(i, j)) DBI += max(dis_list) return DBI / cluster def distance(center, x): """ * 描述: 计算每个样本到簇中心的距离 * 参数: center:簇中心,形状为(10,5)10个簇,5个特征属性 x:数据样本,形状(100,5),100个样本,5个特征 * 返回值: 返回每个样本到各个簇中心的距离,形状为(100,10),每一行代表每个样本到各个簇的距离 """ matrix = np.random.rand(x.shape[0], center.shape[0]) # 迭代每一行,进行计算到各个簇中心的距离 for i, row in enumerate(x): matrix[i, :] 
= np.sum((row - center) ** 2, axis=1) # 平方差,然后按照每一行进行求和 return matrix def update_cluster(matrix): """ * 描述: 更新每个样本所属的簇分类 * 参数: matrix就是样本到簇中心的距离矩阵 * 返回值: """ global label_ # 返回每行的最小值的下标,即返回距离最近的簇中心所属的类别 label_ = np.argmin(matrix, axis=1) def update_center(x): """ * 描述: 更新每个簇的中心 * 参数: x,样本数据 * 返回值: """ new_center = np.zeros((cluster, x.shape[1])) # 依次迭代每个类别下的数据,计算该类数据的均值 for category in range(cluster): new_center[category, :] = x[label_ == category, :].mean(axis=0) return new_center if __name__ == "__main__": run(X) # 调用算法 draw(X, y, best_label, cluster) # 将聚类结果可视化
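For reference, optimizer_score computes the Davies-Bouldin index

DBI = \frac{1}{k}\sum_{i=1}^{k}\max_{j \neq i}\frac{\operatorname{avg}(C_i)+\operatorname{avg}(C_j)}{d_{\mathrm{cen}}(\mu_i,\mu_j)},
\qquad
\operatorname{avg}(C) = \frac{2}{|C|\,(|C|-1)}\sum_{1 \le a < b \le |C|}\operatorname{dist}(x_a, x_b)

where k is the number of clusters, \mu_i is the mean vector of cluster C_i, avg(C) is the average pairwise distance inside a cluster (avg_C above) and d_cen is the distance between two cluster centres (d_cen above). A smaller DBI means more compact and better separated clusters, which is why run() keeps the labelling with the lowest score.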
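The per-sample loop in distance() can also be written as a single broadcast expression. Below is a minimal sketch, not part of the original script; the name distance_vectorized is my own and it only assumes numpy:

import numpy as np

def distance_vectorized(center, x):
    # x: (n_samples, n_features), center: (n_clusters, n_features)
    # Broadcasting gives an array of shape (n_samples, n_clusters, n_features)
    diff = x[:, np.newaxis, :] - center[np.newaxis, :, :]
    # Sum the squared differences over the feature axis -> (n_samples, n_clusters)
    return np.sum(diff ** 2, axis=2)

It returns the same squared-distance matrix as distance(), so it could be swapped in without affecting the argmin-based assignment.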
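As a sanity check, the same data can be clustered and scored with scikit-learn, assuming it is installed; this snippet is only an illustration and not part of the original script. Note that scikit-learn's davies_bouldin_score measures within-cluster scatter as the average distance of points to their own centroid rather than the average pairwise distance, so its absolute values will differ slightly from optimizer_score, but in both cases lower is better:

import pandas as pd
from sklearn.cluster import KMeans
from sklearn.metrics import davies_bouldin_score

data = pd.read_csv("../../data/cluster_500-10_7.csv", encoding="gbk")
X = data.iloc[:, 1:-2].values

km = KMeans(n_clusters=7, n_init=20, max_iter=500, random_state=0)
labels = km.fit_predict(X)
print(davies_bouldin_score(X, labels))  # lower is better, comparable in spirit to best_score above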