修改 spark-cassandra-connector 中的 CassandraSourceRelation.scala,使其在目标表不存在时自动创建键空间和表:
Schema.fromCassandra(connector, Some(tableRef.keyspace), Some(tableRef.table)).tables.headOption match {
  // The table already exists: use its definition as-is.
  case Some(existing) => existing
  // The table is missing: create it (and its keyspace) from the user-specified schema, then read
  // the new definition back so both branches yield a table definition instead of whatever value
  // the DDL helper happens to return.
  case None =>
    createKeyspaceAndTable(connector, tableRef, userSpecifiedSchema)
    // NOTE(review): relies on the driver's schema metadata already containing the new table once
    // the DDL statements have executed — confirm for the driver version in use.
    Schema.fromCassandra(connector, Some(tableRef.keyspace), Some(tableRef.table)).tables.headOption
      .getOrElse(throw new IOException(
        s"Table ${tableRef.keyspace}.${tableRef.table} does not exist and could not be created"))
}
/**
 * Creates the keyspace and the table referenced by `tableRef` if they do not exist yet.
 *
 * The table layout is derived from `userSpecifiedSchema`:
 *  - every field becomes a double-quoted column whose type is the field's Spark SQL type name,
 *    with STRING, BINARY and DECIMAL(p, s) translated to the CQL types TEXT, BLOB and DECIMAL;
 *  - fields whose comment contains `_pk` form the partition key, and fields whose comment
 *    contains `_ck` (but not `_pk`) become clustering columns, both in schema order;
 *  - when no field is tagged at all, the first field is used as the partition key.
 * A newly created keyspace uses `SimpleStrategy` with `replicationFactor` replicas.
 *
 * NOTE(review): complex Spark types (ARRAY, MAP, STRUCT, ...) are emitted under their Spark SQL
 * names, which are not valid CQL types, so such schemas will be rejected by Cassandra.
 *
 * @param connector           connector whose session runs the DDL statements
 * @param tableRef            keyspace and table to create
 * @param userSpecifiedSchema schema of the table to create; required, must contain a field
 * @param replicationFactor   replication factor used if the keyspace has to be created
 * @throws IOException if no schema (or an empty one) is given, or if clustering columns are
 *                     tagged while no partition key is
 */
private def createKeyspaceAndTable(
    connector: CassandraConnector,
    tableRef: TableRef,
    userSpecifiedSchema: Option[StructType],
    replicationFactor: Int = 3): Unit = {
  // Fail loudly instead of printing a stack trace and handing a non-table value to the caller.
  val schema = userSpecifiedSchema.getOrElse(
    throw new IOException("To create a table, fields definition need to be provided"))
  val fields = schema.fields.toSeq
  if (fields.isEmpty) {
    throw new IOException("To create a table, at least one field must be defined")
  }

  val keyspace = quote(tableRef.keyspace)
  val table = quote(tableRef.table)

  // Build each column definition from its own field instead of string-editing `StructType.sql`,
  // whose global replaces of ":", ">" and "STRING" also rewrote column names.
  val columns = fields
    .map(field => s"${quote(field.name)} ${sparkSqlTypeToCql(field.dataType.sql)}")
    .mkString(", ")

  // Key roles are declared through field comments; a `_pk` tag wins over a `_ck` tag.
  val taggedFields = fields.map(field => (field.name, field.getComment().getOrElse("")))
  val partitionKeys = taggedFields.collect { case (name, tag) if tag.contains("_pk") => name }
  val clusteringKeys = taggedFields.collect {
    case (name, tag) if !tag.contains("_pk") && tag.contains("_ck") => name
  }
  val partitionKey =
    if (partitionKeys.nonEmpty) partitionKeys
    else if (clusteringKeys.isEmpty) Seq(fields.head.name) // nothing tagged: first column is the key
    else throw new IOException("Please specify partition key")

  // Key columns are quoted exactly like the column definitions; unquoted names are lower-cased
  // by Cassandra and would not match mixed-case columns.
  val keyClause =
    (s"(${partitionKey.map(quote(_)).mkString(",")})" +: clusteringKeys.map(quote(_))).mkString(",")

  val createKeyspace =
    s"CREATE KEYSPACE IF NOT EXISTS $keyspace WITH replication = " +
      s"{'class': 'SimpleStrategy', 'replication_factor': $replicationFactor }"
  val createTable = s"CREATE TABLE IF NOT EXISTS $keyspace.$table ($columns,PRIMARY KEY ($keyClause))"

  connector.withSessionDo { session =>
    session.execute(createKeyspace)
    session.execute(createTable)
  }
}

/**
 * Translates a Spark SQL type name (as produced by `DataType.sql`) into the CQL type name used in
 * a column definition; names without a special case are passed through unchanged.
 */
private def sparkSqlTypeToCql(sparkSqlType: String): String = sparkSqlType match {
  case "STRING"                          => "TEXT"
  case "BINARY"                          => "BLOB"
  case decimal if decimal.startsWith("DECIMAL(") => "DECIMAL" // CQL decimal takes no precision
  case other                             => other
}
思路:先查询表是否存在,如果不存在则根据用户指定的 schema 创建键空间和表。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
阿里云NoSQL数据库提供了一种灵活的数据存储方式,可以支持各种数据模型,包括文档型、图型、列型和键值型。此外,它还提供了一种分布式的数据处理方式,可以支持高可用性和容灾备份。包含Redis社区版和Tair、多模数据库 Lindorm、MongoDB 版。