"在开始使用Spark之后,我试图熟悉Flink的语义。我想DataSet[IndexNode]在HDFS中写一个持久存储,以便以后可以通过另一个进程读取它。Spark有一个ObjectFile提供这种功能的简单API,但我在Flink中找不到类似的选项。
case class IndexNode(vec: Vector[IndexNode],
id: Int) extends Serializable {
// Getters and setters etc. here
}
内置接收器倾向于基于该toString方法序列化实例,由于该类的嵌套结构,这在这里不适合。我想解决方案是使用a FileOutputFormat并将实例转换为字节流。但是,我不确定如何序列化矢量,它具有任意长度并且可以有很多级别。
"
"您可以使用SerializedOutputFormat和实现此目的SerializedInputFormat。
请尝试以下步骤:
请IndexNode延长IOReadableWritable从flink接口。制作不可分割的字段@transient。实施write(DataOutputView out)和read(DataInputView in)方法。write方法将写出所有数据IndexNode,read方法将读取它们并构建所有内部数据字段。例如,我不是从类中的arr字段序列化所有数据,而是Result将所有值写出,然后将它们读回并以read方法重建数组。
class Result(var name: String, var count: Int) extends IOReadableWritable {
@transient
var arr = Array(count, count)
def this() {
this("""", 1)
}
override def write(out: DataOutputView): Unit = {
out.writeInt(count)
out.writeUTF(name)
}
override def read(in: DataInputView): Unit = {
count = in.readInt()
name = in.readUTF()
arr = Array(count, count)
}
override def toString: String = s""$name, $count, ${getArr}""
}
写出数据
myDataSet.write(new SerializedOutputFormat[Result], ""/tmp/test"")
并用它读回来
env.readFile(new SerializedInputFormat[Result], ""/tmp/test"")"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。