优化Flink转换-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

优化Flink转换

2018-12-10 13:58:39 1124 1

我有以下方法来计算一个值的概率DataSet:

/**

  • Compute the probabilities of each value on the given [[DataSet]]
    *
  • @param x single colum [[DataSet]]
  • @return Sequence of probabilites for each value
    */

private[this] def probs(x: DataSet[Double]): Seq[Double] = {

    val counts = x.groupBy(_.doubleValue)
      .reduceGroup(_.size.toDouble)
      .name("X Probs")
      .collect

    val total = counts.sum

    counts.map(_ / total)

}
问题是,当我提交我的flink作业时,使用这种方法,它会导致flink因任务而终止作业TimeOut。我正在为a上的每个属性执行此方法DataSet,只有40.000个实例和9个属性。

有没有办法让这个代码更有效率?

经过几次尝试,我使用它mapPartition,这个方法是一个类的一部分InformationTheory,它做一些计算来计算熵,互信息等。所以,例如,SymmetricalUncertainty计算如下:

/**

  • Computes 'symmetrical uncertainty' (SU) - a symmetric mutual information measure.
    *
  • It is defined as SU(X, y) = 2 * (IG(X|Y) / (H(X) + H(Y)))
    *
  • @param xy [[DataSet]] with two features
  • @return SU value
    */

def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {

val su = xy.mapPartitionWith {
  case in ⇒
    val x = in map (_._2)
    val y = in map (_._1)

    val mu = mutualInformation(x, y)
    val Hx = entropy(x)
    val Hy = entropy(y)

    Some(2 * mu / (Hx + Hy))
}

su.collect.head.head

}
有了这个,我可以有效地计算entropy,互信息等。问题是,它只适用于1的并行度,问题在于mapPartition。

有没有办法可以做一些类似于我在这里做的事情SymmetricalUncertainty,但是有什么级别的并行性?

取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:19:15

    它使用n级并行:

    def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {

    val su = xy.reduceGroup { in ⇒
        val invec = in.toVector
        val x = invec map (_._2)
        val y = invec map (_._1)
    
        val mu = mutualInformation(x, y)
        val Hx = entropy(x)
        val Hy = entropy(y)
    
        2 * mu / (Hx + Hy)
    }
    
    su.collect.head

    }
    您可以在InformationTheory.scala及其测试InformationTheorySpec.scala中查看整个代码

    0 0
相关问答

5

回答

Spark 【问答合集】

社区小助手 2019-05-29 14:13:40 129510浏览量 回答数 5

38

回答

阿里官方Java代码规范标准《阿里巴巴Java开发手册》下载

管理贝贝 2017-02-10 15:14:36 79461浏览量 回答数 38

2

回答

Blink计算引擎 【精品问答集锦】

管理贝贝 2016-07-21 17:03:19 29346浏览量 回答数 2

3

回答

Kafka、ActiveMQ、RabbitMQ、RocketMQ的区别?【Java问答学堂】19期

剑曼红尘 2020-05-15 11:24:19 37169浏览量 回答数 3

1

回答

在flink集群模式下,能不能指定某个节点的solt来执行一个task?

孙goku 2019-07-01 15:19:09 116624浏览量 回答数 1

8

回答

flink sql 支持checkpoints吗?

游客izljdlkgbdwfc 2019-07-10 17:46:37 125682浏览量 回答数 8

2

回答

云效平台——基于jmeter的轻量级性能测试平台

云效平台 2016-06-08 11:27:28 29803浏览量 回答数 2

10

回答

【6.27更新】你不能错过的:阿里中间件性能挑战赛重要学习资料

凝岚 2016-04-16 15:26:05 33282浏览量 回答数 10

2

回答

Apache Flink常见问题汇总【精品问答】

黄一刀 2020-05-19 17:51:47 51062浏览量 回答数 2

56

回答

Flink Forward Asia 2021 有奖问答

阿里云实时计算Flink 2021-12-29 17:30:44 468264浏览量 回答数 56
+关注
flink小助手
flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。
0
文章
377
问答
问答排行榜
最热
最新
相关电子书
更多
OceanBase 入门到实战教程
立即下载
阿里云图数据库GDB,加速开启“图智”未来.ppt
立即下载
实时数仓Hologres技术实战一本通2.0版(下)
立即下载