我有以下方法来计算一个值的概率DataSet:
/**
private[this] def probs(x: DataSet[Double]): Seq[Double] = {
val counts = x.groupBy(_.doubleValue)
.reduceGroup(_.size.toDouble)
.name("X Probs")
.collect
val total = counts.sum
counts.map(_ / total)
}
问题是,当我提交我的flink作业时,使用这种方法,它会导致flink因任务而终止作业TimeOut。我正在为a上的每个属性执行此方法DataSet,只有40.000个实例和9个属性。
有没有办法让这个代码更有效率?
经过几次尝试,我使用它mapPartition,这个方法是一个类的一部分InformationTheory,它做一些计算来计算熵,互信息等。所以,例如,SymmetricalUncertainty计算如下:
/**
def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
val su = xy.mapPartitionWith {
case in ⇒
val x = in map (_._2)
val y = in map (_._1)
val mu = mutualInformation(x, y)
val Hx = entropy(x)
val Hy = entropy(y)
Some(2 * mu / (Hx + Hy))
}
su.collect.head.head
}
有了这个,我可以有效地计算entropy,互信息等。问题是,它只适用于1的并行度,问题在于mapPartition。
有没有办法可以做一些类似于我在这里做的事情SymmetricalUncertainty,但是有什么级别的并行性?
它使用n级并行:
def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
val su = xy.reduceGroup { in ⇒
val invec = in.toVector
val x = invec map (_._2)
val y = invec map (_._1)
val mu = mutualInformation(x, y)
val Hx = entropy(x)
val Hy = entropy(y)
2 * mu / (Hx + Hy)
}
su.collect.head
}
您可以在InformationTheory.scala及其测试InformationTheorySpec.scala中查看整个代码
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。