简 介:下面是我在学习时候的记录并加上自己的理解。本文意在记录自己近期学习过程中的所学所得,如有错误,欢迎大家指正。
关键词:Python、机器学习、DBSCAN
自己实现DBSCAN算法,需要对两个参数ξ和Minpt的选取选取进行说明,语言不限。要能支持多维数组,距离用欧式距离。
""" * Created with PyCharm * 作者: 阿光 * 日期: 2021/7/18 * 时间: 21:53 * 描述: 使用numpy实现DBSCAN """ import warnings import numpy as np import pandas as pd from Draw import draw warnings.filterwarnings('ignore') epsilon = 4 # 定义邻域半径 M = 5 # 定义阈值 k = 0 # 类别标签 # 加载数据 data = pd.read_csv("../../data/cluster_500-10_7.csv", encoding="gbk") X = data.iloc[:, 1:-2].values y = data.iloc[:, -1].values.flatten() label_ = np.zeros(len(X)) def dbscan(X): """ * 描述: DBSCAN算法的主入口 * 参数: X:样本数据 * 返回值: """ # 获取核心对象 coreObjectList = get_coreobject(X) # 开始迭代每个核心对象,进行划圈 iter_coreobject(coreObjectList, X) def iter_coreobject(coreObjectList, X): """ * 描述: 迭代每个核心对象,然后将其可达的对象划为一簇 * 参数: coreObjectList:核心列表对象 X:样本数据 * 返回值: """ def get_epsilonobject(row): """ * 描述: 获取某个样本邻域内的样本 * 参数: row:核心对象 * 返回值: 邻域内的样本的索引 """ return np.where(np.sqrt((((X - row) ** 2).sum(axis=1))) <= epsilon)[0] def distance_epsilon(coreobject, X): """ * 描述: 判断某一核心对象邻域内是否有核心对象,如果有重复该步骤,如果不是将其划为一簇 * 参数: coreobject:核心对象 X:样本数据 * 返回值: """ # 保存某一簇内的样本 queue = [coreobject] # 判断队列中是否还存在数据 while len(queue) != 0: object = X[queue.pop(0), :] # 判断其是否是核心对象,如果是将它的邻域内的样本也加入队列 if judge_coreobject(X, object): nonlocal noAccess # 获取上一层作用域的变量 # 获取未访问和该核心对象邻域内样本的交集 delta = set(noAccess).intersection(set(get_epsilonobject(object))) # 将邻域内的样本添加到队列 queue.extend(delta) # 将已经访问的数据从noAccess中去掉 noAccess = list(set(noAccess).difference(set(delta))) # 记录未被访问过的数据 noAccess = list(range(X.shape[0])) # 判断是否还有核心对象可以进行迭代 while len(coreObjectList) != 0: # 为了获取本次迭代的数据 AccessOld = noAccess.copy() # 取出一个核心对象 coreobject = coreObjectList.pop(0) # 扩充领域,添加邻域内的样本 distance_epsilon(coreobject, X) # 取得本轮划分簇的数据,就是本次迭代到的数据 cluster_k = list(set(AccessOld).difference(set(noAccess))) global k # 将本次迭代到的数据划分成一簇 label_[[cluster_k]] = k # 将所有已经访问过的数据从noAccess中移除 coreObjectList = list(set(coreObjectList).difference(set(cluster_k))) # 将簇+1 k += 1 def judge_coreobject(X, row): """ * 描述: 判断某一样本是否为核心对象 * 参数: X:样本数据 row:待判断的样本数据 * 返回值: 返回true和false """ # 判断在邻域距离内的样本数是否达到阈值 if (np.sqrt(((X - row) ** 2).sum(axis=1)) <= epsilon).sum() - 1 >= M: return True return False def get_coreobject(x): """ * 描述: 迭代数据集,获取核心对象列表 * 参数: x:样本数据 * 返回值: 核心对象列表 """ # 初始化列表 coreObjectList = [] # 判断是否是核心对象 for i, row in enumerate(x): if judge_coreobject(X, row): coreObjectList.append(i) return coreObjectList if __name__ == "__main__": dbscan(X) draw(X, y, label_, 7)