开发者社区> 问答> 正文

了解Flink - 任务不可序列化

flink小助手 2018-12-10 11:30:13 630

我正在研究一个Flink项目,并遇到了一个问题,我在Stackoverflow的帮助下设法解决了这个问题。但是,我不清楚为什么提出的解决方案实际可行,我发现有关该主题的信息很少。请考虑以下代码:

object DeCP {
def main(args: Array[String]): Unit = {

val params: ParameterTool = ParameterTool.fromArgs(args)

// Get the execution environment and read the data
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val queryPoints: DataSet[Point] = readQueryPoints(env, params)
val points: DataSet[Point] = readFeatureVector(env, params)

// Process the query points
queryPoints
  .map(new KNNRich)
  .withBroadcastSet(points, "pointsIn")
  .print

}

final class KNNRich extends RichMapFunction[Point, (Point, Vector[Point])]{

private var pointsIn: Traversable[Point] = _

override def open(parameters: Configuration): Unit =
  pointsIn = getRuntimeContext.getBroadcastVariable[Point]("pointsIn").asScala

def map(queryPoint: Point): (Point, Vector[Point]) = {
  val dataSetIn = ExecutionEnvironment.getExecutionEnvironment
                                      .fromCollection(pointsIn.toVector)
  val cluster = new Cluster(dataSetIn, queryPoint)
  val knn = cluster.kNearestNeighbor(queryPoint, 3) // This call causes problems
  (queryPoint, knn.collect.toVector)
}

}
}
Cluster类和伴随对象定义为:

class Cluster(var points: DataSet[Point],

          var clusterLeader: Point) extends Serializable {

private var queryPoint: Point = _

def distance(p: Point): Point = {

p.eucDist(queryPoint)

}

def kNearestNeighbor(queryPoint: Point, k: Int): DataSet[Point] = {

this.queryPoint = queryPoint

this.points.map{p => distance(p)} // Task not serializable
this.points.map{p => p.eucDist(queryPoint)} // Works
this.points.map{p => Cluster.staticDistance(queryPoint, p)} // Works

}
}

object Cluster {
def staticDistance(queryPoint: Point, p: Point): Point = {

p.eucDist(queryPoint)

}
}
对distance方法的调用导致任务不可序列化异常,但用定义替换方法调用可以解决问题。类似地,定义与伴随对象的成员完全相同的方法允许代码正常运行。

为什么第一个call不起作用,但另外两个call有效?如果你在类上有一个更复杂的执行流程会发生什么,这不容易被替换为伴随对象上的方法?

流计算
分享到
取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:19:08

    通过执行DataSet转换,您只需创建管道的逻辑计划。管道通过调用提交给集群execute/print/collect。

    当管道提交到集群时,诸如您的每个功能都RichMapFunction被序列化,发送到集群,为每个并行实例复制,并独立执行。当您获得“Task not serializable”异常时,这意味着您RichMapFunction正在传递此类之外的变量/对象。您应该确保函数是一个独立的块。

    通过调用points.map{}你隐式创建一个MapFunction。但这MapFunction有一个参考实例,Cluster因而不是独立的。Flink尝试序列化Cluster但失败了。如果distance是静态的(在伴随对象中定义),那么Cluster也不需要序列化。

    顺便说一下,您的示例的另一个问题是您没有按预期使用DataSet API。通常,您不应在正在运行的管道中创建管道。

    0 0
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题