开发者社区> 问答> 正文

在Apache Flink中将自定义类写入HDFS

flink小助手 2018-11-28 15:56:09 1087

"在开始使用Spark之后,我试图熟悉Flink的语义。我想DataSet[IndexNode]在HDFS中写一个持久存储,以便以后可以通过另一个进程读取它。Spark有一个ObjectFile提供这种功能的简单API,但我在Flink中找不到类似的选项。

case class IndexNode(vec: Vector[IndexNode],

                 id: Int) extends Serializable {

// Getters and setters etc. here
}
内置接收器倾向于基于该toString方法序列化实例,由于该类的嵌套结构,这在这里不适合。我想解决方案是使用a FileOutputFormat并将实例转换为字节流。但是,我不确定如何序列化矢量,它具有任意长度并且可以有很多级别。
"

存储 分布式计算 API Apache 流计算 Spark
分享到
取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:16:48

    "您可以使用SerializedOutputFormat和实现此目的SerializedInputFormat。

    请尝试以下步骤:

    请IndexNode延长IOReadableWritable从flink接口。制作不可分割的字段@transient。实施write(DataOutputView out)和read(DataInputView in)方法。write方法将写出所有数据IndexNode,read方法将读取它们并构建所有内部数据字段。例如,我不是从类中的arr字段序列化所有数据,而是Result将所有值写出,然后将它们读回并以read方法重建数组。

    class Result(var name: String, var count: Int) extends IOReadableWritable {

    @transient
    var arr = Array(count, count)

    def this() {

    this("""", 1)

    }

    override def write(out: DataOutputView): Unit = {

    out.writeInt(count)
    out.writeUTF(name)

    }

    override def read(in: DataInputView): Unit = {

    count = in.readInt()
    
    name = in.readUTF()
    
    arr = Array(count, count)

    }

    override def toString: String = s""$name, $count, ${getArr}""

    }
    写出数据

    myDataSet.write(new SerializedOutputFormat[Result], ""/tmp/test"")
    并用它读回来

    env.readFile(new SerializedInputFormat[Result], ""/tmp/test"")"

    0 0
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题
推荐课程