开发者社区> 问答> 正文

优化Flink转换

flink小助手 2018-12-10 13:58:39 332

我有以下方法来计算一个值的概率DataSet:

/**

  • Compute the probabilities of each value on the given [[DataSet]]
    *
  • @param x single colum [[DataSet]]
  • @return Sequence of probabilites for each value
    */

private[this] def probs(x: DataSet[Double]): Seq[Double] = {

    val counts = x.groupBy(_.doubleValue)
      .reduceGroup(_.size.toDouble)
      .name("X Probs")
      .collect

    val total = counts.sum

    counts.map(_ / total)

}
问题是,当我提交我的flink作业时,使用这种方法,它会导致flink因任务而终止作业TimeOut。我正在为a上的每个属性执行此方法DataSet,只有40.000个实例和9个属性。

有没有办法让这个代码更有效率?

经过几次尝试,我使用它mapPartition,这个方法是一个类的一部分InformationTheory,它做一些计算来计算熵,互信息等。所以,例如,SymmetricalUncertainty计算如下:

/**

  • Computes 'symmetrical uncertainty' (SU) - a symmetric mutual information measure.
    *
  • It is defined as SU(X, y) = 2 * (IG(X|Y) / (H(X) + H(Y)))
    *
  • @param xy [[DataSet]] with two features
  • @return SU value
    */

def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {

val su = xy.mapPartitionWith {
  case in ⇒
    val x = in map (_._2)
    val y = in map (_._1)

    val mu = mutualInformation(x, y)
    val Hx = entropy(x)
    val Hy = entropy(y)

    Some(2 * mu / (Hx + Hy))
}

su.collect.head.head

}
有了这个,我可以有效地计算entropy,互信息等。问题是,它只适用于1的并行度,问题在于mapPartition。

有没有办法可以做一些类似于我在这里做的事情SymmetricalUncertainty,但是有什么级别的并行性?

流计算
分享到
取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:19:15

    它使用n级并行:

    def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {

    val su = xy.reduceGroup { in ⇒
        val invec = in.toVector
        val x = invec map (_._2)
        val y = invec map (_._1)
    
        val mu = mutualInformation(x, y)
        val Hx = entropy(x)
        val Hy = entropy(y)
    
        2 * mu / (Hx + Hy)
    }
    
    su.collect.head

    }
    您可以在InformationTheory.scala及其测试InformationTheorySpec.scala中查看整个代码

    0 0
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题