// class definition of RsGoods schema
case class RsGoods(add_time: Int)
// my operation
originRDD.toDF[Schemas.RsGoods]()
// and the function definition
def toDF[T <: Product: TypeTag](): DataFrame = mongoSpark.toDF[T]()
现在我定义了太多的模式(RsGoods1,RsGoods2,RsGoods3),将来还会添加更多模式。
所以问题是如何将case类作为变量传递给结构代码
附加sbt依赖
"org.apache.spark" % "spark-core_2.11" % "2.3.0",
"org.apache.spark" %% "spark-sql" % "2.3.0",
"org.mongodb.spark" %% "mongo-spark-connector" % "2.3.1",
附上密钥代码段
var originRDD = MongoSpark.load(sc, readConfig)
val df = table match {
case "rs_goods_multi" => originRDD.toDF[Schemas.RsGoodsMulti]()
case "rs_goods" => originRDD.toDF[Schemas.RsGoods]()
case "ma_item_price" => originRDD.toDF[Schemas.MaItemPrice]()
case "ma_siteuid" => originRDD.toDF[Schemas.MaSiteuid]()
case "pi_attribute" => originRDD.toDF[Schemas.PiAttribute]()
case "pi_attribute_name" => originRDD.toDF[Schemas.PiAttributeName]()
case "pi_attribute_value" => originRDD.toDF[Schemas.PiAttributeValue]()
case "pi_attribute_value_name" => originRDD.toDF[Schemas.PiAttributeValueName]()
根据我对你的要求的理解,我认为以下应该是一个不错的起点。
def readDataset[A: Encoder](
spark: SparkSession,
mongoUrl: String,
collectionName: String,
clazz: Class[A]
): Dataset[A] = {
val config = ReadConfig(
Map("uri" -> s"$mongoUrl.$collectionName")
)
val df = MongoSpark.load(spark, config)
val fieldNames = clazz.getDeclaredFields.map(f => f.getName).dropRight(1).toList
val dfWithMatchingFieldNames = df.toDf(fieldNames: _*)
dfWithMatchingFieldNames.as[A]
}
你可以像这样使用它,
case class RsGoods(add_time: Int)
val spark: SparkSession = ...
import spark.implicts._
val rdGoodsDS = readDataset[RsGoods](
spark,
"mongodb://example.com/database",
"rs_goods",
classOf[RsGoods]
)
另外,以下两行,
val fieldNames = clazz.getDeclaredFields.map(f => f.getName).dropRight(1).toList
val dfWithMatchingFieldNames = df.toDf(fieldNames: _*)
只需要因为Spark通常用列名来读取DataFrames value1, value2, ...。所以我们想要更改列名以匹配我们的列名case class。
我不确定这些“defalut”列名是什么,因为MongoSpark参与其中。
您应该首先检查创建的df中的列名,如下所示,
val config = ReadConfig(
Map("uri" -> s"$mongoUrl.$collectionName")
)
val df = MongoSpark.load(spark, config)
如果,MongoSpark解决了这些“默认”列名称的问题并从您的集合中选择了库存名称,则不需要这两行,您的方法将变为这样,
def readDataset[A: Encoder](
spark: SparkSession,
mongoUrl: String,
collectionName: String,
): Dataset[A] = {
val config = ReadConfig(
Map("uri" -> s"$mongoUrl.$collectionName")
)
val df = MongoSpark.load(spark, config)
df.as[A]
}
和,
val rsGoodsDS = readDataset[RsGoods](
spark,
"mongodb://example.com/database",
"rs_goods"
)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。