k-medoids聚类算法,即k-中心聚类算法,它是基于k-means聚类算法的改进。我们知道,k-means算法执行过程,首先需要随机选择初始质心,只有第一次随机选择的初始质心才是实际待聚类点集中的点,而后续将非质心点指派到对应的质心点后,重新计算得到的质心并非是待聚类点集中的点,而且如果某些非质心点是离群点的话,导致重新计算得到的质心可能偏离整个簇,为了解决这个问题,提出了改进的k-medoids聚类算法。
k-medoids聚类算法也是通过划分的方式来计算得到聚类结果,它使用绝对差值和(Sum of Absolute Differences,SAD)的度量来衡量聚类结果的优劣,在n维欧几里德空间中,计算SAD的公式如下所示:
围绕中心点划分(Partitioning Around Medoids,PAM)的方法是比较常用的,使用PAM方法进行处理,可以指定一个最大迭代次数的参数,在迭代过程中基于贪心策略来选择使得聚类的质量最高的划分。使用PAM的方法处理,每次交换一个中心点和非中心点,然后执行将非中心点指派到最近的中心点,计算得到的SAD值越小,则聚类质量越好,如此不断地迭代,直到找到一个最好的划分。
维基百科上给出的基于PAM方法计算聚类的过程,描述如下:
- 从待聚类的数据点集中随机选择k个点,作为初始中心点;
- 将待聚类的数据点集中的点,指派到最近的中心点;
- 进入迭代,直到聚类的质量满足指定的阈值(可以通过计算SAD),使总代价减少:
- 对每一个中心点o,对每一个非中心点p,执行如下计算步骤:
- 交换点o和p,重新计算交换后的该划分所生成的代价值;
- 如果本次交换造成代价增加,则取消交换。
上面算法描述,应该是按顺序的取遍中心点集合中的点,也从非中心点集合中取遍所有非中心点,分别计算生成的新划分的代价。由于待聚类的点集可大可小,我们可以考虑,每次取点的时候,采用随机取点的策略,随机性越强越好,只要满足最终迭代终止的条件即可。通常,如果能够迭代所有情况,那么最终得到的划分一定是最优的划分,即聚类结果最好,这通常适用于聚类比较小的点的集合。但是如果待聚类的点的集合比较大,则需要通过限制迭代次数来终止迭代计算,从而得到一个能够满足实际精度需要的聚类结果。
我们在下面实现k-medoids聚类算法,分别随机选择中心点和非中心点,对他们进行交换,通过设置允许最大迭代次数(maxIterations)这个参数值,来使聚类计算最后停止。
聚类算法实现
首先,为了便于理解后面的代码实现,我们描述一下代码实现聚类过程的基本步骤,如下所示:
- 输入待聚类点集,以及参数k、maxIterations、parallism;
- 同k-means算法一样,随机选择初始中心点集合;
- 启动parallism个线程,用来将非中心点指派给最近的中心点;
- 开始执行迭代,使得聚类结果对应的划分的SAD值最小:
- 将非中心点,基于Round-Robin策略,分配给多个线程,并行指派:将非中心点指派给距离其最近的中心点;
- 将多个线程指派的局部结果进行合并,得到一个全局的指派结果;
- 根据指派结果计算SAD值:如果是第一次进行指派,直接计算其SAD值,保存在previousSAD变量中,该变量保存的是最小的SAD值,第一次初始化第一次指派结果计算得到的SAD值;如果不是第一次进行指派,也计算SAD值,将SAD值保存在变量currentSAD中,继续执行步骤8;
- 随机选择一个非中心点;
- 创建一个ClusterHolder对象,该对象保存了该轮迭代指派结果,根据随机选择的非中心点修改ClusterHolder对象中的结果,将随机选择非中心点和对应的中心点进行交换,为下一轮指派过程准备数据;
- 最后,判断是否达到指定的最大迭代次数,如果达到则终止计算,处理最终聚类结果,否则执行下一轮迭代计算,转步骤5。
我们实现的k-medoids聚类算法,需要指定2个聚类相关参数,另外一个参数是程序计算并行度,可以通过构造方法看到,代码如下所示:
1 |
public KMedoidsClustering( int k, int maxIterations, int parallism) { |
2 |
super (k, maxIterations, parallism); |
3 |
distanceCache = new DistanceCache(Integer.MAX_VALUE); |
4 |
executorService = Executors.newCachedThreadPool( new NamedThreadFactory( "SEEKER" )); |
5 |
latch = new CountDownLatch(parallism); |
上面代码中的参数含义如下:
- k:聚类最终想要得到的簇的个数
- maxIterations:因为k-medoids聚类算法的最终目标是最小化SAD的值,所以聚类算法执行迭代的次数越大,最终的结果可能越接近最优,如果是对一个不大的点集进行聚类,可以设置该参数的值大一些
- parallism:每一次迭代过程中,我们都需要将非中心点(Non-medoid Point)指派到最近的中心点,所以将原待聚类点集划分成多组,有多个处理线程并行处理可能速度会更快,该参数就是并行度
聚类实现的核心代码如下所示:
002 |
public void clustering() { |
004 |
FileUtils.read2DPointsFromFiles(allPoints, "[\t,;\\s]+" , inputFiles); |
005 |
LOG.info( "Total points: count=" + allPoints.size()); |
007 |
ClusterHolder currentHolder = new ClusterHolder(); |
008 |
ClusterHolder previousHolder = null ; |
010 |
currentHolder.medoids = initialCentroidsSelectionPolicy.select(k, allPoints); |
011 |
LOG.info( "Initial selected medoids: " + currentHolder.medoids); |
014 |
for ( int i = 0 ; i < parallism; i++) { |
015 |
final NearestMedoidSeeker seeker = new NearestMedoidSeeker(seekerQueueSize); |
016 |
executorService.execute(seeker); |
024 |
boolean firstTimeToAssign = true ; |
025 |
int numIterations = 0 ; |
026 |
double previousSAD = 0.0 ; |
027 |
double currentSAD = 0.0 ; |
029 |
while (!finallyCompleted) { |
031 |
LOG.debug( "Current medoid set: " + currentHolder.medoids); |
032 |
if (firstTimeToAssign) { |
034 |
assignNearestMedoids(currentHolder, true ); |
035 |
firstTimeToAssign = false ; |
038 |
assignNearestMedoids(currentHolder, false ); |
042 |
mergeMedoidAssignedResult(currentHolder); |
043 |
LOG.debug( "Merged result: " + currentHolder.medoidWithNearestPointSet); |
046 |
if (previousSAD == 0.0 ) { |
048 |
previousSAD = currentSAD; |
049 |
currentSAD = computeSAD(currentHolder); |
051 |
RandomPoint randomPoint = selectNonCenterPointRandomly(currentHolder); |
052 |
LOG.debug( "Randomly selected: " + randomPoint); |
055 |
currentSAD = computeSAD(currentHolder); |
057 |
if (currentSAD - previousSAD < 0.0 ) { |
058 |
previousHolder = currentHolder; |
059 |
previousSAD = currentSAD; |
063 |
currentHolder = constructNewHolder(currentHolder, randomPoint); |
065 |
LOG.info( "Iteration #" + (++numIterations) + ": previousSAD=" + previousSAD + ", currentSAD=" + currentSAD); |
067 |
if (numIterations > maxIterations) { |
068 |
finallyCompleted = true ; |
070 |
} catch (Exception e) { |
071 |
Throwables.propagate(e); |
074 |
if (!finallyCompleted) { |
075 |
latch = new CountDownLatch(parallism); |
076 |
completeToAssignTask = false ; |
079 |
synchronized (signalLock) { |
080 |
signalLock.notifyAll(); |
082 |
} catch (InterruptedException e) {} |
086 |
LOG.info( "Shutdown executor service: " + executorService); |
087 |
executorService.shutdown(); |
091 |
centerPointSet.addAll(previousHolder.medoids); |
092 |
Iterator<Entry<CenterPoint, List<Point2D>>> iter = previousHolder.medoidWithNearestPointSet.entrySet().iterator(); |
093 |
while (iter.hasNext()) { |
094 |
Entry<CenterPoint, List<Point2D>> entry = iter.next(); |
095 |
int clusterId = entry.getKey().getId(); |
096 |
Set<ClusterPoint<Point2D>> set = Sets.newHashSet(); |
097 |
for (Point2D p : entry.getValue()) { |
098 |
set.add( new ClusterPoint2D(p, clusterId)); |
100 |
clusteredPoints.put(clusterId, set); |
通过上面代码及其注释,我们可以了解到聚类实现的基本处理流程。首先,看一下工具类ClusterHolder和RandomPoint:
01 |
private class ClusterHolder { |
03 |
/** snapshot of clustering result: medoids of clustering result, as well as non-medoid points */ |
04 |
private TreeMap<CenterPoint, List<Point2D>> medoidWithNearestPointSet; |
05 |
/** center point set represented by Point2D */ |
06 |
private Set<Point2D> centerPoints; |
07 |
/** center point set represented by CenterPoint */ |
08 |
private TreeSet<CenterPoint> medoids; |
10 |
public ClusterHolder() { |
15 |
private class RandomPoint { |
16 |
/** medoid which the random point belongs to */ |
17 |
private final CenterPoint medoid; |
18 |
/** a non-medoid point selected randomly */ |
19 |
private final Point2D point; |
21 |
public RandomPoint(CenterPoint medoid, Point2D point) { |
28 |
public String toString() { |
29 |
return "RandomPoint[medoid=" + medoid + ", point=" + point + "]" ; |
上面2个类,能够在迭代处理过程中,方便地保存当前迭代处理的数据状态。下面我们看一下,上面代码调用的比较重要的方法的实现逻辑。
将非中心点指派到最近的中心点的计算,是调用assignNearestMedoids方法,该方法的代码实现,如下所示:
01 |
private void assignNearestMedoids( final ClusterHolder holder, boolean firstTimeToAssign) { |
02 |
LOG.debug( "firstTimeToAssign=" + firstTimeToAssign); |
05 |
if (firstTimeToAssign) { |
06 |
holder.centerPoints = Sets.newHashSet(); |
07 |
for (CenterPoint medoid : holder.medoids) { |
08 |
holder.centerPoints.add(medoid.toPoint()); |
10 |
LOG.debug( "holder.centerPoints: " + holder.centerPoints); |
12 |
for (Point2D p : allPoints) { |
13 |
LOG.debug( "Assign point: " + p); |
14 |
if (!holder.centerPoints.contains(p)) { |
15 |
selectSeeker().q.put( new Task(holder.medoids, p)); |
19 |
for (List<Point2D> points : holder.medoidWithNearestPointSet.values()) { |
20 |
for (Point2D p : points) { |
21 |
selectSeeker().q.put( new Task(holder.medoids, p)); |
25 |
} catch (Exception e) { |
26 |
Throwables.propagate(e); |
29 |
completeToAssignTask = true ; |
31 |
} catch (InterruptedException e) { } |
上面代码调用selectSeeker()方法,获取到一个NearestMedoidSeeker线程,将待指派的点加入到其队列中,然后由该线程去异步循环处理。selectSeeker()方法实现代码,如下所示:
1 |
private NearestMedoidSeeker selectSeeker() { |
2 |
int index = taskIndex++ % parallism; |
3 |
return seekers.get(index); |
下面,我们看一下NearestMedoidSeeker线程的实现,它也比较简单,实现了从队列q中将非中心点取出,计算到该点最近的中心点,然后指派给该中心点,线程实现代码如下所示:
01 |
private class NearestMedoidSeeker implements Runnable { |
03 |
private final Log LOG = LogFactory.getLog(NearestMedoidSeeker. class ); |
04 |
private final BlockingQueue<Task> q; |
05 |
private Map<CenterPoint, List<Point2D>> clusteringNearestPoints = Maps.newHashMap(); |
06 |
private int processedTasks = 0 ; |
08 |
public NearestMedoidSeeker( int qsize) { |
09 |
q = new LinkedBlockingQueue<Task>(qsize); |
14 |
while (!finallyCompleted) { |
18 |
} catch (Exception e) { |
24 |
private void assign() throws InterruptedException { |
26 |
LOG.debug( "Q size: " + q.size()); |
27 |
while (!(q.isEmpty() && completeToAssignTask)) { |
29 |
final Task task = q.poll(); |
31 |
final Point2D p1 = task.point; |
32 |
double minDistance = Double.MAX_VALUE; |
33 |
CenterPoint nearestMedoid = null ; |
34 |
for (CenterPoint medoid : task.medoids) { |
35 |
final Point2D p2 = medoid.toPoint(); |
36 |
Double distance = distanceCache.computeDistance(p1, p2); |
37 |
if (distance < minDistance) { |
38 |
minDistance = distance; |
39 |
nearestMedoid = medoid; |
42 |
LOG.debug( "Nearest medoid seeked: point=" + p1 + ", medoid=" + nearestMedoid); |
44 |
List<Point2D> points = clusteringNearestPoints.get(nearestMedoid); |
46 |
points = Lists.newArrayList(); |
47 |
clusteringNearestPoints.put(nearestMedoid, points); |
54 |
} catch (Exception e) { |
58 |
LOG.debug( "Point processed: processedTasks=" + processedTasks); |
60 |
synchronized (signalLock) { |
64 |
clusteringNearestPoints = Maps.newHashMap(); |
每一轮指派,多个线程都计算得到一个非中心点指派到最近中心点的子集,最后还要将这些子集合并为一个全局的指派结果,即得到距离每个中心点最近的非中心点的集合,合并的实现在mergeMedoidAssignedResult()方法中,代码如下所示:
01 |
private void mergeMedoidAssignedResult(ClusterHolder currentHolder) { |
02 |
currentHolder.medoidWithNearestPointSet = Maps.newTreeMap(); |
03 |
for (NearestMedoidSeeker seeker : seekers) { |
04 |
LOG.debug( "seeker.clusteringNearestPoints: " + seeker.clusteringNearestPoints); |
05 |
Iterator<Entry<CenterPoint, List<Point2D>>> iter = seeker.clusteringNearestPoints.entrySet().iterator(); |
06 |
while (iter.hasNext()) { |
07 |
Entry<CenterPoint, List<Point2D>> entry = iter.next(); |
08 |
List<Point2D> set = currentHolder.medoidWithNearestPointSet.get(entry.getKey()); |
10 |
set = Lists.newArrayList(); |
11 |
currentHolder.medoidWithNearestPointSet.put(entry.getKey(), set); |
13 |
set.addAll(entry.getValue()); |
合并后的指派结果,都存放在ClusterHolder对象中,为下一轮迭代准备了数据。
随机选择一个中心点和非中心点,实现代码如下所示:
1 |
private RandomPoint selectNonCenterPointRandomly(ClusterHolder holder) { |
2 |
List<CenterPoint> medoids = new ArrayList<CenterPoint>(holder.medoidWithNearestPointSet.keySet()); |
3 |
CenterPoint selectedMedoid = medoids.get(random.nextInt(medoids.size())); |
5 |
List<Point2D> belongingPoints = holder.medoidWithNearestPointSet.get(selectedMedoid); |
6 |
Point2D point = belongingPoints.get(random.nextInt(belongingPoints.size())); |
7 |
return new RandomPoint(selectedMedoid, point); |
因为每一次迭代,我们都得到一个非中心点指派到最近的中心点的聚类结果集合,所以在设计随机选择中心点和非中心点进行交换时,我们首先从中心点集合中选择一个中心点,然后再从该中心点对应的非中心点的簇的集合中选择一个非中心点,当然也可以考虑用其他的方法,比如,在一次迭代过程中,待交换的中心点和非中心点不在同一个簇中。
- 创建ClusterHolder对象,交换非中心点和中心点
我们处理的策略是,事后处理,也就是每次先实现非中心点和中心点的交换,再进行指派,计算SAD值,如果此轮迭代得到的SAD值比上一轮的大,则直接丢弃结果,将上一轮的指派结果作为最终候选结果,直到最后,保留着具有最小SAD值的指派结果。
交换中心点和非中心点,我们创建了一个ClusterHolder对象,然后在该对象所持有的集合上进行修改贾环。交换非中心点和中心点的实现代码,如下所示:
01 |
private ClusterHolder constructNewHolder( final ClusterHolder holder, RandomPoint randomPoint) { |
02 |
ClusterHolder newHolder = new ClusterHolder(); |
06 |
newHolder.centerPoints = Sets.newHashSet(); |
07 |
for (CenterPoint c : holder.medoidWithNearestPointSet.keySet()) { |
08 |
newHolder.centerPoints.add(c.toPoint()); |
11 |
Point2D newPoint = randomPoint.point; |
12 |
CenterPoint oldMedoid = randomPoint.medoid; |
16 |
CenterPoint newMedoid = new CenterPoint(oldMedoid.getId(), newPoint); |
19 |
newHolder.centerPoints.remove(oldMedoid.toPoint()); |
20 |
newHolder.centerPoints.add(newPoint); |
22 |
newHolder.medoids = Sets.newTreeSet(); |
23 |
newHolder.medoids.addAll(holder.medoidWithNearestPointSet.keySet()); |
24 |
newHolder.medoids.remove(oldMedoid); |
25 |
newHolder.medoids.add(newMedoid); |
28 |
newHolder.medoidWithNearestPointSet = Maps.newTreeMap(); |
29 |
newHolder.medoidWithNearestPointSet.putAll(holder.medoidWithNearestPointSet); |
30 |
List<Point2D> oldPoints = newHolder.medoidWithNearestPointSet.get(oldMedoid); |
31 |
oldPoints.remove(newPoint); |
32 |
oldPoints.add(oldMedoid.toPoint()); |
33 |
newHolder.medoidWithNearestPointSet.put(newMedoid, oldPoints); |
为了保留上一次迭代指派的结果,这里不要修改holder对应的结果的集合(holder是本次迭代得到的聚类结果),而是拷贝出一份,在拷贝的结果上交换中心点和非中心点。
聚类效果对比
我们分别使用k-medoids算法与k-means算法对同一个点集进行聚类,分别对结果进行比较。其中,k-means算法由于随机选择初始质心,每次执行聚类结果不同;而k-medoids算法的聚类结果质量依赖于迭代次数,所以我们选择不同的迭代次数:取值maxIterations分别为300、1000、3000时,对比效果,如下图所示:
上图中,第一排3个图是k-means聚类得到的3个结果,第二排是k-medoids聚类得到的结果。通过上图可以看出,使用k-medoids聚类算法,当maxIterations越大的时候,可能更加靠近最优解,聚类结果的质量越高,此时对应的质量的度量SAD的值就越小。