开发者社区> 问答> 正文

scala如何参数化case类,并将case类变量传递给[T <:Product:TypeTag]

// class definition of RsGoods schema
case class RsGoods(add_time: Int)

// my operation
originRDD.toDF[Schemas.RsGoods]()

// and the function definition
def toDF[T <: Product: TypeTag](): DataFrame = mongoSpark.toDF[T]()
现在我定义了太多的模式(RsGoods1,RsGoods2,RsGoods3),将来还会添加更多模式。

所以问题是如何将case类作为变量传递给结构代码

附加sbt依赖

"org.apache.spark" % "spark-core_2.11" % "2.3.0",
"org.apache.spark" %% "spark-sql" % "2.3.0",
"org.mongodb.spark" %% "mongo-spark-connector" % "2.3.1",
附上密钥代码段

var originRDD = MongoSpark.load(sc, readConfig)
val df = table match {

case "rs_goods_multi" => originRDD.toDF[Schemas.RsGoodsMulti]()
case "rs_goods" => originRDD.toDF[Schemas.RsGoods]()
case "ma_item_price" => originRDD.toDF[Schemas.MaItemPrice]()
case "ma_siteuid" => originRDD.toDF[Schemas.MaSiteuid]()
case "pi_attribute" => originRDD.toDF[Schemas.PiAttribute]()
case "pi_attribute_name" => originRDD.toDF[Schemas.PiAttributeName]()
case "pi_attribute_value" => originRDD.toDF[Schemas.PiAttributeValue]()
case "pi_attribute_value_name" => originRDD.toDF[Schemas.PiAttributeValueName]()

展开
收起
社区小助手 2018-12-21 11:50:20 2561 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    根据我对你的要求的理解,我认为以下应该是一个不错的起点。

    def readDataset[A: Encoder](
    spark: SparkSession,
    mongoUrl: String,
    collectionName: String,
    clazz: Class[A]
    ): Dataset[A] = {
    val config = ReadConfig(

    Map("uri" -> s"$mongoUrl.$collectionName")

    )

    val df = MongoSpark.load(spark, config)

    val fieldNames = clazz.getDeclaredFields.map(f => f.getName).dropRight(1).toList

    val dfWithMatchingFieldNames = df.toDf(fieldNames: _*)

    dfWithMatchingFieldNames.as[A]
    }
    你可以像这样使用它,

    case class RsGoods(add_time: Int)

    val spark: SparkSession = ...

    import spark.implicts._

    val rdGoodsDS = readDataset[RsGoods](
    spark,
    "mongodb://example.com/database",
    "rs_goods",
    classOf[RsGoods]
    )
    另外,以下两行,

    val fieldNames = clazz.getDeclaredFields.map(f => f.getName).dropRight(1).toList

    val dfWithMatchingFieldNames = df.toDf(fieldNames: _*)
    只需要因为Spark通常用列名来读取DataFrames value1, value2, ...。所以我们想要更改列名以匹配我们的列名case class。

    我不确定这些“defalut”列名是什么,因为MongoSpark参与其中。

    您应该首先检查创建的df中的列名,如下所示,

    val config = ReadConfig(
    Map("uri" -> s"$mongoUrl.$collectionName")
    )

    val df = MongoSpark.load(spark, config)
    如果,MongoSpark解决了这些“默认”列名称的问题并从您的集合中选择了库存名称,则不需要这两行,您的方法将变为这样,

    def readDataset[A: Encoder](
    spark: SparkSession,
    mongoUrl: String,
    collectionName: String,
    ): Dataset[A] = {
    val config = ReadConfig(

    Map("uri" -> s"$mongoUrl.$collectionName")

    )

    val df = MongoSpark.load(spark, config)

    df.as[A]
    }
    和,

    val rsGoodsDS = readDataset[RsGoods](
    spark,
    "mongodb://example.com/database",
    "rs_goods"
    )

    2019-07-17 23:23:20
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Just Enough Scala for Spark 立即下载
JDK8新特性与生产-for“华东地区scala爱好者聚会” 立即下载
低代码开发师(初级)实战教程 立即下载