开发者社区> 问答> 正文

确定哪个对象在Apache-Flink中不可序列化

我正在写一个Flink转换器,我有一个Histogram具有以下属性的自定义对象:

case class Histogram(
nRows: Int,
nCols: Int,
min: Int,
step: Double,
private val countMatrix: Array[ArrayBuffer[Double]],
private val cutMatrixL1: Array[ArrayBuffer[Double]],
val distribMatrixL1: Array[ArrayBuffer[Map[Int, Double]]],
private val distribMatrixL2: Array[ArrayBuffer[Map[Int, Double]]],
private val cutMatrixL2: ArrayBuffer[ArrayBuffer[Double]])
extends Serializable {

???

}
这是我的FitOperation:

implicit val fitOp = new FitOperation[PIDiscretizerTransformer, LabeledVector] {

override def fit(
                  instance: PIDiscretizerTransformer,
                  fitParameters: ParameterMap,
                  input: DataSet[LabeledVector]): Unit = {

  // get params...

  val metric = input.map { x ⇒
    // (instance, histrogram totalCount)
    (x, Histogram(nAttrs, l1InitialBins, min, instance.step), 1)
  }.reduce { (m1, m2) ⇒
    // Update Layer 1
    val updatedL1 = updateL1(m1._1, m1._2, instance.step, initialElems, alpha, m1._3)

    //         Update Layer 2 if neccesary
    val updatedL2 = if (m1._3 % l2updateExamples == 0) {
      updateL2(m1._1, updatedL1)
    } else updatedL1

    (m2._1, updatedL2, m1._3 + 1)
  }.map(_._2)

  //      instance.metricsOption = Some(metric)
}

}
这很好用,但如果我取消注释最后一行:instance.metricsOption = Some(metric)我得到一个java.io.NotSerializableException: org.apache.flink.api.scala.DataSet

我怎么能找到班上哪个Histogram类导致的问题?据我所知,ArrayBuffer可序列化,Map也是如此。虽然我发现了这个问题:

地图无法在scala中序列化?

这说明.mapValues不可序列化,但我没有.在任何地方使用mapValues。

展开
收起
社区小助手 2018-12-11 16:12:27 3951 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    问题是你指的是你的MapFunction里的instance.step。instance是PIDiscretizerTransformer无法序列化的类型。因此,您需要计算步骤之外的步骤MapFunction并将值传递给函数。然后你的程序应该是可序列化的。

    2019-07-17 23:19:50
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像