优化Flink转换-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

优化Flink转换

我有以下方法来计算一个值的概率DataSet:

/**

  • Compute the probabilities of each value on the given [[DataSet]]
    *
  • @param x single colum [[DataSet]]
  • @return Sequence of probabilites for each value
    */

private[this] def probs(x: DataSet[Double]): Seq[Double] = {

    val counts = x.groupBy(_.doubleValue)
      .reduceGroup(_.size.toDouble)
      .name("X Probs")
      .collect

    val total = counts.sum

    counts.map(_ / total)

}
问题是,当我提交我的flink作业时,使用这种方法,它会导致flink因任务而终止作业TimeOut。我正在为a上的每个属性执行此方法DataSet,只有40.000个实例和9个属性。

有没有办法让这个代码更有效率?

经过几次尝试,我使用它mapPartition,这个方法是一个类的一部分InformationTheory,它做一些计算来计算熵,互信息等。所以,例如,SymmetricalUncertainty计算如下:

/**

  • Computes 'symmetrical uncertainty' (SU) - a symmetric mutual information measure.
    *
  • It is defined as SU(X, y) = 2 * (IG(X|Y) / (H(X) + H(Y)))
    *
  • @param xy [[DataSet]] with two features
  • @return SU value
    */

def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {

val su = xy.mapPartitionWith {
  case in ⇒
    val x = in map (_._2)
    val y = in map (_._1)

    val mu = mutualInformation(x, y)
    val Hx = entropy(x)
    val Hy = entropy(y)

    Some(2 * mu / (Hx + Hy))
}

su.collect.head.head

}
有了这个,我可以有效地计算entropy,互信息等。问题是,它只适用于1的并行度,问题在于mapPartition。

有没有办法可以做一些类似于我在这里做的事情SymmetricalUncertainty,但是有什么级别的并行性?

展开
收起
flink小助手 2018-12-10 13:58:39 1336 0
1 条回答
写回答
取消 提交回答
  • flink小助手
    flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    它使用n级并行:

    def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {

    val su = xy.reduceGroup { in ⇒
        val invec = in.toVector
        val x = invec map (_._2)
        val y = invec map (_._1)
    
        val mu = mutualInformation(x, y)
        val Hx = entropy(x)
        val Hy = entropy(y)
    
        2 * mu / (Hx + Hy)
    }
    
    su.collect.head

    }
    您可以在InformationTheory.scala及其测试InformationTheorySpec.scala中查看整个代码

    2019-07-17 23:19:15
    赞同 展开评论 打赏
问答分类:
问答标签:
问答地址:
问答排行榜
最热
最新
相关电子书
更多
Apache Flink 流式应用中状态的数据结构定义升级
立即下载
阿里云流计算 Flink SQL 核心功能解密
立即下载
阿里云流计算Flink SQL核心功能解密
立即下载