开发者社区> 问答> 正文

在Apache Flink中将自定义类写入HDFS

"在开始使用Spark之后,我试图熟悉Flink的语义。我想DataSet[IndexNode]在HDFS中写一个持久存储,以便以后可以通过另一个进程读取它。Spark有一个ObjectFile提供这种功能的简单API,但我在Flink中找不到类似的选项。

case class IndexNode(vec: Vector[IndexNode],

                 id: Int) extends Serializable {

// Getters and setters etc. here
}
内置接收器倾向于基于该toString方法序列化实例,由于该类的嵌套结构,这在这里不适合。我想解决方案是使用a FileOutputFormat并将实例转换为字节流。但是,我不确定如何序列化矢量,它具有任意长度并且可以有很多级别。
"

展开
收起
flink小助手 2018-11-28 15:56:09 5255 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "您可以使用SerializedOutputFormat和实现此目的SerializedInputFormat。

    请尝试以下步骤:

    请IndexNode延长IOReadableWritable从flink接口。制作不可分割的字段@transient。实施write(DataOutputView out)和read(DataInputView in)方法。write方法将写出所有数据IndexNode,read方法将读取它们并构建所有内部数据字段。例如,我不是从类中的arr字段序列化所有数据,而是Result将所有值写出,然后将它们读回并以read方法重建数组。

    class Result(var name: String, var count: Int) extends IOReadableWritable {

    @transient
    var arr = Array(count, count)

    def this() {

    this("""", 1)

    }

    override def write(out: DataOutputView): Unit = {

    out.writeInt(count)
    out.writeUTF(name)

    }

    override def read(in: DataInputView): Unit = {

    count = in.readInt()
    
    name = in.readUTF()
    
    arr = Array(count, count)

    }

    override def toString: String = s""$name, $count, ${getArr}""

    }
    写出数据

    myDataSet.write(new SerializedOutputFormat[Result], ""/tmp/test"")
    并用它读回来

    env.readFile(new SerializedInputFormat[Result], ""/tmp/test"")"

    2019-07-17 23:16:48
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink技术进阶 立即下载
Apache Spark: Cloud and On-Prem 立即下载
Hybrid Cloud and Apache Spark 立即下载

相关镜像