我正在构建一个使用Kafka主题的Spark SQL应用程序,转换一些数据,然后使用特定的JSON对象写回单独的Kafka主题。
现在我能够查询/转换我想要的内容并编写它:
Dataset reader = myData.getRecordCount();
reader.select(to_json(struct("record_count")).alias("value"))
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "new_separate_topic")
.save();
这会产生如下记录:
{
"record_count": 989
}
我需要的是,这一点JSON是一个更大的JSON对象的有效负载(子)属性,我们将其用作我们的微服务的标准消费者对象。
我想写的主题实际上是这样的:
{
"id": "ABC123",
"timestamp": "2018-11-16 20:40:26.108",
"user": "DEF456",
"type": "new_entity",
"data": {
"record_count": 989
}
}
此外,“id”,“user”和“type”字段将从外部填充 - 它们将来自触发整个过程的原始Kafka消息。基本上,我需要为我想写入Kafka的元数据/对象注入一些值,并将“data”字段设置为上面的Spark SQL查询的结果。
如果要添加新字段,则不能只选择一个字段。
例如,write.format("kafka")和之间.select(),你需要做类似的事情withColumn()
Dataset reader = myData.getRecordCount();
// Keep your DataSet as Columns
reader = reader.select("record_count"))
// Add more data
reader = reader.withColumn(...)
// Then convert structs to JSON and write the output
reader.select(to_json(...))
.write()
.format("kafka")
“id”,“user”和“type”字段将从外部填充 - 它们将来自触发整个过程的原始Kafka消息
然后你需要包含select("id", "user", "type")在你的代码中
另一种选择是使用Kafka Streams而不是强制使用DataSets,您可以使用实际的Java类/ JSONObjects
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。