开发者社区> 问答> 正文

Apache Spark:Kafka以自定义格式编写

我正在构建一个使用Kafka主题的Spark SQL应用程序,转换一些数据,然后使用特定的JSON对象写回单独的Kafka主题。

现在我能够查询/转换我想要的内容并编写它:

Dataset reader = myData.getRecordCount();
reader.select(to_json(struct("record_count")).alias("value"))

.write()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "new_separate_topic")
.save();

这会产生如下记录:

{
"record_count": 989
}
我需要的是,这一点JSON是一个更大的JSON对象的有效负载(子)属性,我们将其用作我们的微服务的标准消费者对象。

我想写的主题实际上是这样的:

{
"id": "ABC123",
"timestamp": "2018-11-16 20:40:26.108",
"user": "DEF456",
"type": "new_entity",
"data": {

  "record_count": 989
}

}
此外,“id”,“user”和“type”字段将从外部填充 - 它们将来自触发整个过程的原始Kafka消息。基本上,我需要为我想写入Kafka的元数据/对象注入一些值,并将“data”字段设置为上面的Spark SQL查询的结果。

展开
收起
社区小助手 2018-12-12 14:25:31 2809 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    如果要添加新字段,则不能只选择一个字段。

    例如,write.format("kafka")和之间.select(),你需要做类似的事情withColumn()

    Dataset reader = myData.getRecordCount();
    // Keep your DataSet as Columns
    reader = reader.select("record_count"))

    // Add more data
    reader = reader.withColumn(...)

    // Then convert structs to JSON and write the output
    reader.select(to_json(...))

    .write()
    .format("kafka")

    “id”,“user”和“type”字段将从外部填充 - 它们将来自触发整个过程的原始Kafka消息

    然后你需要包含select("id", "user", "type")在你的代码中

    另一种选择是使用Kafka Streams而不是强制使用DataSets,您可以使用实际的Java类/ JSONObjects

    2019-07-17 23:20:12
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载

相关镜像