How do I dynamically create a Cassandra table from a Spark DataFrame? - Q&A - Alibaba Cloud Developer Community - Alibaba Cloud


How do I dynamically create a Cassandra table in Spark based on a DataFrame?

小六码奴 2019-05-30 14:53:10 868


Tags: Distributed computing, NoSQL, Spark
Answers (1)
  • 小六码奴
    2019-07-17 23:36:24

    Patch CassandraSourceRelation.scala in spark-cassandra-connector as follows:
    // Add to the imports of CassandraSourceRelation.scala (TableRef is in the same
    // package, org.apache.spark.sql.cassandra).
    import java.io.IOException
    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.sql.types._
    import com.datastax.spark.connector.cql.{CassandraConnector, Schema}
    import com.datastax.spark.connector.util.Quote._

    // Replaces the table lookup: use the existing table if there is one, otherwise create it
    // and read its definition back, so that both branches yield a TableDef.
    Schema.fromCassandra(connector, Some(tableRef.keyspace), Some(tableRef.table)).tables.headOption match {
      case Some(t) => t
      case None =>
        createKeyspaceAndTable(connector, tableRef, userSpecifiedSchema)
        Schema.fromCassandra(connector, Some(tableRef.keyspace), Some(tableRef.table)).tables.headOption
          .getOrElse(throw new IOException(s"Couldn't create table ${tableRef.keyspace}.${tableRef.table}"))
    }

    /**
     * Create the keyspace and the table if they do not exist.
     * Key columns come from the field comments (StructField.getComment) of the user-specified
     * schema: a comment containing "_pk" marks a partition key column, one containing "_ck"
     * a clustering column. If no column is marked, the first column becomes the primary key.
     */
    private def createKeyspaceAndTable(connector: CassandraConnector, tableRef: TableRef,
                                       userSpecifiedSchema: Option[StructType]): Unit = {
      val structType = userSpecifiedSchema.filter(_.nonEmpty).getOrElse(
        throw new IOException("To create a table, fields definition need to be provided"))
      val keyspace = quote(tableRef.keyspace)
      val table = quote(tableRef.table)

      val partitionKeyColumns = ArrayBuffer[String]() // partition key columns, in schema order
      val clusteringColumns = ArrayBuffer[String]()   // clustering columns, in schema order
      for (field <- structType.fields) {
        val comment = field.getComment().getOrElse("")
        if (comment.contains("_pk")) partitionKeyColumns += quote(field.name)
        else if (comment.contains("_ck")) clusteringColumns += quote(field.name)
      }
      if (partitionKeyColumns.isEmpty && clusteringColumns.nonEmpty)
        throw new IOException("Please specify partition key")
      // Default: the first column alone is the primary key.
      if (partitionKeyColumns.isEmpty) partitionKeyColumns += quote(structType.fields.head.name)

      // Quote names the same way in the column list and in PRIMARY KEY; unquoted CQL
      // identifiers are lower-cased, so mixed-case names would otherwise not match.
      val columnDefs = structType.fields.map(f => s"${quote(f.name)} ${toCqlType(f.dataType)}")
      val primaryKey = (s"(${partitionKeyColumns.mkString(",")})" +: clusteringColumns).mkString(",")
      val createTable =
        s"CREATE TABLE IF NOT EXISTS $keyspace.$table (${columnDefs.mkString(", ")}, PRIMARY KEY ($primaryKey))"

      connector.withSessionDo { session =>
        // SimpleStrategy with replication factor 3, as in the original; adjust for your cluster.
        session.execute(s"CREATE KEYSPACE IF NOT EXISTS $keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3 }")
        session.execute(createTable)
      }
    }

    // Map Spark SQL types to CQL types. Only flat, simple types are handled.
    private def toCqlType(dataType: DataType): String = dataType match {
      case StringType     => "text"
      case IntegerType    => "int"
      case LongType       => "bigint"
      case ShortType      => "smallint"
      case ByteType       => "tinyint"
      case DoubleType     => "double"
      case FloatType      => "float"
      case BooleanType    => "boolean"
      case TimestampType  => "timestamp"
      case DateType       => "date"
      case BinaryType     => "blob"
      case _: DecimalType => "decimal"
      case other => throw new IOException(s"Unsupported column type: ${other.simpleString}")
    }
    In short: look the table up first, and create it only if it does not exist.
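    To try the primary-key rules of the answer without a Cassandra cluster or Spark, here is a minimal, dependency-free Scala sketch of just the DDL generation. It is an illustration under assumptions, not the connector's code: `ColumnSpec` and `CqlDdlSketch` are hypothetical names, each column's CQL type is passed in as an already-mapped string, and it follows the answer's convention that a comment containing `_pk` marks a partition key column, `_ck` a clustering column, and the first column is the default key. Errors are reported with `require` (IllegalArgumentException) rather than IOException.

```scala
// Hypothetical, dependency-free sketch of the DDL generation; not the connector's API.
// Each column carries its name, an already-mapped CQL type and an optional comment.
final case class ColumnSpec(name: String, cqlType: String, comment: Option[String] = None)

object CqlDdlSketch {
  // Double-quote an identifier (escaping embedded quotes) so that its case is preserved.
  private def quote(name: String): String = "\"" + name.replace("\"", "\"\"") + "\""

  private def hasTag(c: ColumnSpec, tag: String): Boolean = c.comment.exists(_.contains(tag))

  def createTableCql(keyspace: String, table: String, columns: Seq[ColumnSpec]): String = {
    require(columns.nonEmpty, "To create a table, fields definition need to be provided")
    // "_pk" wins over "_ck", mirroring the if / else-if order in the answer.
    val (pk, rest) = columns.partition(hasTag(_, "_pk"))
    val ck = rest.filter(hasTag(_, "_ck"))
    require(pk.nonEmpty || ck.isEmpty, "Please specify partition key")
    // Default: the first column alone is the partition key.
    val partitionKeys = if (pk.isEmpty) Seq(columns.head) else pk
    val primaryKey =
      (s"(${partitionKeys.map(c => quote(c.name)).mkString(",")})" +: ck.map(c => quote(c.name))).mkString(",")
    val columnDefs = columns.map(c => s"${quote(c.name)} ${c.cqlType}").mkString(", ")
    s"CREATE TABLE IF NOT EXISTS ${quote(keyspace)}.${quote(table)} ($columnDefs, PRIMARY KEY ($primaryKey))"
  }
}
```

    For example, columns `userId` (comment `_pk`), `ts` (comment `_ck`) and `value` (no comment) give `PRIMARY KEY (("userId"),"ts")`. Quoting every identifier identically in the column list and in the key keeps mixed-case names consistent, because CQL folds unquoted identifiers to lower case.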


