开发者社区> 问答> 正文

Spark:优化DataFrame,将其写入SQL 服务器

Spark:优化DataFrame,将其写入SQL 服务器

展开
收起
贺贺_ 2019-12-17 13:33:58 601 0
1 条回答
写回答
取消 提交回答
  • 我们使用Azure-sqldb-spark库,而不是 Spark 的默认内置导出功能。此库为您提供了一个bulkCopyToSqlDB方法,它是一个真正的批处理插入,并且速度要快得多。与内置功能相比,使用起来不太实用,但根据我的经验,它仍然是值得的。 我们或多或少地使用它像这样:

    import com.microsoft.azure.sqldb.spark.config.Config
    import com.microsoft.azure.sqldb.spark.connect._
    import com.microsoft.azure.sqldb.spark.query._
    
    val options = Map(
      "url"          -> "***",
      "databaseName" -> "***",
      "user"         -> "***",
      "password"     -> "***",
      "driver"       -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    )
    
    // first make sure the table exists, with the correct column types
    // and is properly cleaned up if necessary
    val query = dropAndCreateQuery(df, "myTable")
    val createConfig = Config(options ++ Map("QueryCustom" -> query))
    spark.sqlContext.sqlDBQuery(createConfig)
    
    val bulkConfig = Config(options ++ Map(
      "dbTable"           -> "myTable",
      "bulkCopyBatchSize" -> "20000",
      "bulkCopyTableLock" -> "true",
      "bulkCopyTimeout"   -> "600"
    ))
    
    df.bulkCopyToSqlDB(bulkConfig)
    
    

    如您所见,我们自行生成查询。您可以让库创建表,但它只是执行TABLEdataFrame.limit(0).write.sqlDB(config),可能需要缓存 DataFrame, 并且它不允许您选择SaveMode 可能还有可能很有趣:当将此库添加到 sbt 生成时,我们必须使用ExclusionRule ,否则assembly任务将失败。

    libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(
      ExclusionRule(organization = "org.apache.spark")
    )
    
    2019-12-17 13:41:34
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
如何运维千台以上游戏云服务器 立即下载
网站/服务器取证 实践与挑战 立即下载
ECS快储存加密技术 立即下载