开发者社区> 问答> 正文

根据Scala flink中的另一个DataSet过滤DataSet

我试图复制这个python代码:

cond_entropy_x = np.array([entropy(x[y == v]) for v in uy])
其中x和y是向量,并且uy是y例如的唯一值0,1。

在flink中,我有:

val uy = y.distinct.collect
val condHx = for (i ← uy)

yield entropy(x.filterWithBcVariable(y)((_, yy) ⇒ yy == i))

然而,它似乎filterWithBcVariable并没有采取任何价值y,它只需要第一个。

我也尝试过:

for (i ← values) yield y.join(x).where(a ⇒ a).equalTo(_ ⇒ i)
但是我的内存耗尽了。

我怎样才能x根据值进行过滤y?

x.zip(y)会有类似的事情,但它不受支持。

有任何想法吗?

展开
收起
社区小助手 2018-12-11 15:51:19 2267 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    我提出了一个解决方案,可能不是最好的,但至少它工作。

    现在,我没有传递x和y分离DataSets,而是传递一个DataSet[LabeledVector]只有一列:

    val xy = input.map(lv ⇒ LabeledVector(lv.label, DenseVector(lv.vector(0))))
    然后我传递xy给我的函数:

    def conditionalEntropy(xy: DataSet[LabeledVector]): Double = {

    // Get the label
    val y = xy map (_.label)
    // Get probs for the label
    val p = probs(y).toArray.asBreeze
    // Get unique values in label
    val values = y.distinct.collect
    // Compute Conditional Entropy
    val condH = for (i ← values)
      yield entropy(xy.filter(_.label == i))
    p.dot(seq2Breeze(condH))

    }

    2019-07-17 23:19:48
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载