```scala
import org.apache.flink.api.scala._

/**
 * Computes the probability of each distinct value in the given [[DataSet]].
 *
 * @param x single column [[DataSet]]
 * @return Sequence of probabilities, one for each distinct value
 */
private[this] def probs(x: DataSet[Double]): Seq[Double] = {
  // Count the occurrences of each distinct value
  val counts = x.groupBy(_.doubleValue)
    .reduceGroup(_.size.toDouble)
    .name("X Probs")
    .collect()

  val total = counts.sum

  // Normalize the counts into relative frequencies
  counts.map(_ / total)
}
```
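To sanity-check `probs` without a Flink cluster, the same relative-frequency computation can be done on a local Scala collection. This is a minimal sketch: `localProbs` is a hypothetical helper, not part of the original code, and `groupBy(identity)` plays the role of `x.groupBy(_.doubleValue)`. Results are sorted by value only to make them deterministic; the order of Flink's `collect()` output is not guaranteed.

```scala
object LocalProbs {
  /** Relative frequency of each distinct value, ordered by the value itself.
    * Hypothetical local counterpart of the Flink `probs` helper. */
  def localProbs(xs: Seq[Double]): Seq[Double] = {
    val total = xs.size.toDouble
    xs.groupBy(identity)   // value -> all of its occurrences
      .toSeq
      .sortBy(_._1)        // deterministic order, for testing only
      .map { case (_, occurrences) => occurrences.size / total }
  }
}
```

For example, `LocalProbs.localProbs(Seq(1.0, 1.0, 2.0, 3.0))` yields `Seq(0.5, 0.25, 0.25)`.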

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._ // provides mapPartitionWith

/**
 * Computes 'symmetrical uncertainty' (SU), a symmetric mutual information measure.
 *
 * It is defined as SU(X, Y) = 2 * (IG(X|Y) / (H(X) + H(Y)))
 *
 * @param xy [[DataSet]] with two features
 * @return SU value
 */
def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
  val su = xy.mapPartitionWith {
    case in =>
      val x = in map (_._2)
      val y = in map (_._1)

      val mu = mutualInformation(x, y)
      val Hx = entropy(x)
      val Hy = entropy(y)

      Some(2 * mu / (Hx + Hy))
  }
}
```
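The Flink code delegates to `entropy` and `mutualInformation`, whose definitions are not shown here. As a minimal local sketch of what such helpers could look like (the object name and signatures below are assumptions, not the author's code), it uses the identity I(X;Y) = H(X) + H(Y) - H(X,Y), where the information gain IG(X|Y) in the docstring equals the mutual information I(X;Y). The logarithm base cancels out in the SU ratio; base 2 is used here. Returning 0 when both variables are constant (0/0) is a convention chosen for this sketch.

```scala
object SymmetricalUncertaintySketch {
  private def log2(v: Double): Double = math.log(v) / math.log(2.0)

  /** Shannon entropy (in bits) of the empirical distribution of `xs`. */
  def entropy[A](xs: Seq[A]): Double = {
    val n = xs.size.toDouble
    xs.groupBy(identity).values.map { occurrences =>
      val p = occurrences.size / n
      -p * log2(p)
    }.sum
  }

  /** I(X;Y) = H(X) + H(Y) - H(X,Y), where H(X,Y) is the entropy of the (x, y) pairs. */
  def mutualInformation(x: Seq[Double], y: Seq[Double]): Double = {
    require(x.size == y.size, "x and y must have the same length")
    entropy(x) + entropy(y) - entropy(x.zip(y))
  }

  /** SU(X, Y) = 2 * I(X;Y) / (H(X) + H(Y)); 1 for identical variables, 0 for independent ones. */
  def symmetricalUncertainty(x: Seq[Double], y: Seq[Double]): Double = {
    val hx = entropy(x)
    val hy = entropy(y)
    if (hx + hy == 0.0) 0.0 // both constant: avoid 0/0 (convention assumed here)
    else 2 * mutualInformation(x, y) / (hx + hy)
  }
}
```

Two identical columns give SU = 1, while `Seq(0.0, 0.0, 1.0, 1.0)` and `Seq(0.0, 1.0, 0.0, 1.0)`, which are independent, give SU = 0.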

1 Answer

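Your version uses n-way parallelism: `mapPartitionWith` runs the function once per partition, so each partition computes its own value. Use `reduceGroup` on the whole DataSet instead, so that all elements are processed in a single group: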

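```scala
import org.apache.flink.api.scala._

def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
  // reduceGroup on an ungrouped DataSet passes every element to a single call
  val su = xy.reduceGroup { in =>
    val invec = in.toVector // materialize the iterator so it can be traversed twice
    val x = invec map (_._2)
    val y = invec map (_._1)

    val mu = mutualInformation(x, y)
    val Hx = entropy(x)
    val Hy = entropy(y)

    2 * mu / (Hx + Hy)
  }

  // su is a DataSet[Double] holding one element; collect it to return a plain Double
  su.collect().head
}
```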
You can see the whole code in InformationTheory.scala and its test, InformationTheorySpec.scala.
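The difference between the two versions can be illustrated without Flink. `mapPartitionWith` calls its function once per parallel partition, so each call sees only a slice of the data, whereas `reduceGroup` on an ungrouped DataSet sees every element in one call. The sketch below mimics partitioning with contiguous slices; the object and method names are hypothetical, entropy stands in for SU to keep it short, and real Flink partitioning depends on the source and the configured parallelism.

```scala
object PartitionEffectSketch {
  /** Shannon entropy (in bits) of the empirical distribution of `xs`. */
  def entropy(xs: Seq[Double]): Double = {
    val n = xs.size.toDouble
    xs.groupBy(identity).values.map { occurrences =>
      val p = occurrences.size / n
      -p * math.log(p) / math.log(2.0)
    }.sum
  }

  /** Splits `xs` into at most `partitions` contiguous slices and evaluates `entropy`
    * on each, mimicking a function that is called once per parallel partition. */
  def perPartition(xs: Seq[Double], partitions: Int): List[Double] = {
    require(partitions > 0, "partitions must be positive")
    val sliceSize = math.max(1, math.ceil(xs.size.toDouble / partitions).toInt)
    xs.grouped(sliceSize).map(entropy).toList
  }

  /** Evaluates `entropy` once over all elements, mimicking reduceGroup on the whole DataSet. */
  def wholeDataSet(xs: Seq[Double]): Double = entropy(xs)
}
```

For `Seq(1.0, 1.0, 2.0, 2.0)` split into two partitions, each slice is constant, so both per-partition values are 0, while the whole dataset has an entropy of 1 bit. The same holds for SU: per-partition results are not the global value. The trade-off of the `reduceGroup` version is that a non-combinable group reduce over a full DataSet runs without parallelism.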

2019-07-17 23:19:15