
关注HBase和Spark
Apache Spark Delta Lake 删除使用及实现原理代码解析 Delta Lake 的 Delete 功能是由 0.3.0 版本引入的。在介绍 Apache Spark Delta Lake 实现逻辑之前,我们先来看看如何使用 delete 这个功能。 Delta Lake 删除使用 Delta Lake 的官方文档为我们提供如何使用 Delete 的几个例子,参见这里,如下: import io.delta.tables._ val iteblogDeltaTable = DeltaTable.forPath(spark, path) // 删除 id 小于 4 的数据 iteblogDeltaTable.delete("id <= '4'") import org.apache.spark.sql.functions._ import spark.implicits._ iteblogDeltaTable.delete($"id" <= "4") // 删除所有的数据 iteblogDeltaTable.delete() 执行上面的 Delete 命令,如果确实删除了相应的数据,Delta Lake 会生成一个事务日志,内容类似下面的: {"commitInfo":{"timestamp":1566978478414,"operation":"DELETE","operationParameters":{"predicate":"[\"(`id` <= CAST('4' AS BIGINT))\"]"},"readVersion":10,"isBlindAppend":false}} {"remove":{"path":"dt=20190801/part-00000-ca73a0f4-fbeb-4ea8-9b9f-fa466a85724e.c000.snappy.parquet","deletionTimestamp":1566978478405,"dataChange":true}} {"remove":{"path":"dt=20190803/part-00000-8e11f4cc-a7ac-47a1-8ce6-b9d87eaf6c51.c000.snappy.parquet","deletionTimestamp":1566978478405,"dataChange":true}} {"add":{"path":"dt=20190801/part-00001-6ff11be3-22db-4ed2-bde3-a97d610fe11d.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566978478000,"dataChange":true}} 事务日志里面详细介绍了 Delete 执行的时间、删除的条件、需要删除的文件以及添加的文件等。注意:执行 Delete 的时候,真实的数据其实并没有删除,只是在事务日志里面记录了,真正删除数据需要通过执行 vacuum 命令。 Delta Lake 删除是如何实现的 前面小结我们简单体验了一下 Delete 的使用,本小结将深入代码详细介绍 Delta Lake 的 Delete 是如何实现的。delete 的 API 是通过在 io.delta.tables.DeltaTable 类添加相应方法实现的,其中涉及删除的方法主要包括下面三个: def delete(condition: String): Unit = { delete(functions.expr(condition)) } def delete(condition: Column): Unit = { executeDelete(Some(condition.expr)) } def delete(): Unit = { executeDelete(None) } 这个就是我们在上面例子看到的 delete 支持的三种用法。这三个函数最终都是调用 io.delta.tables.execution.DeltaTableOperations#executeDelete 函数的,executeDelete 的实现如下: protected def executeDelete(condition: Option[Expression]): Unit = { val delete = Delete(self.toDF.queryExecution.analyzed, condition) // current DELETE does not support subquery, // and the reason why perform checking here is that // we want to have more meaningful exception messages, // instead of having some random msg generated by executePlan(). subqueryNotSupportedCheck(condition, "DELETE") val qe = sparkSession.sessionState.executePlan(delete) val resolvedDelete = qe.analyzed.asInstanceOf[Delete] val deleteCommand = DeleteCommand(resolvedDelete) deleteCommand.run(sparkSession) } self.toDF.queryExecution.analyzed 这个就是我们输入 Delta Lake 表的 Analyzed Logical Plan,condition 就是我们执行删除操作的条件表达式(也就是上面例子的 id < = '4')。这个方法的核心就是初始化 DeleteCommand,然后调用 DeleteCommand 的 run 方法执行删除操作。DeleteCommand 类扩展自 Spark 的 RunnableCommand 特质,并实现其中的 run 方法,我们在 Spark 里面看到的 CREATE TABLE、ALTER TABLE、SHOE CREATE TABLE 等命令都是继承这个类的,所以 Delta Lake 的 delete、update 以及 Merge 也都是继承这个类。DeleteCommand 的 run 方法实现如下: final override def run(sparkSession: SparkSession): Seq[Row] = { recordDeltaOperation(tahoeFileIndex.deltaLog, "delta.dml.delete") { // 获取事务日志持有对象 val deltaLog = tahoeFileIndex.deltaLog // 检查 Delta Lake 表是否支持删除操作 deltaLog.assertRemovable() // 开启新事务,执行删除操作。 deltaLog.withNewTransaction { txn => performDelete(sparkSession, deltaLog, txn) } // Re-cache all cached plans(including this relation itself, if it's cached) that refer to // this data source relation. sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, target) } Seq.empty[Row] } Delta Lake 表允许用户设置成 appendOnly(通过 spark.databricks.delta.properties.defaults.appendOnly 参数设置),也就是只允许追加操作,所以如果我们执行删除之前需要做一些校验。校验通过之后开始执行删除操作,由于删除操作是需要保证原子性的,所以这个操作需要在事务里面进行,withNewTransaction 的实现如下: def withNewTransaction[T](thunk: OptimisticTransaction => T): T = { try { // 更新当前表事务日志的快照 update() // 初始化乐观事务锁对象 val txn = new OptimisticTransaction(this) // 开启事务 OptimisticTransaction.setActive(txn) // 执行写数据操作 thunk(txn) } finally { // 关闭事务 OptimisticTransaction.clearActive() } } 在开启事务之前,需要更新当前表事务日志的快照,因为在执行删除操作表之前,这张表可能已经被修改了,执行 update 操作之后,就可以拿到当前表的最新版本,紧接着开启乐观事务锁。thunk(txn) 这个就是执行我们上面的 performDelete(sparkSession, deltaLog, txn) 方法。Delta Lake 删除的整个核心就在 performDelete 方法里面了。 如果某个文件里面有数据需要删除,那么这个文件会被标记为删除,然后这个文件里面不需要删除的数据需要重新写到一个新文件里面。那么在 performDelete 方法里面我们就需要知道哪些数据需要删除,这些数据对应的文件在哪里以及是否需要些事务日志。Delta Lake 将删除实现分为三大情况: 1、如果执行 delete 的时候并没有传递相关的删除条件,也就是上面例子的 iteblogDeltaTable.delete(),这时候其实就是删除当前 Delta Lake 表的所有数据。那这种情况最好处理了,只需要直接删除 Delta Lake 表对应的所有文件即可; 2、如果执行 delete 的时候传递了相关删除条件,而这个删除条件只是分区字段,比如 dt 是 Delta Lake 表的分区字段,然后我们执行了 iteblogDeltaTable.delete("dt = '20190828'") 这样相关的删除操作,那么我们可以直接从缓存在内存中的快照(snapshot, 也就是通过上面的 update() 函数初始化的)拿到需要删除哪些文件,直接删除即可,而且不需要执行数据重写操作。 3、最后一种情况就是用户删除的时候含有一些非分区字段的过滤条件,这时候我们就需要扫描底层数据,获取需要删除的数据在哪个文件里面,这又分两种情况: 3.1、Delta Lake 表并不存在我们需要删除的数据,这时候不需要做任何操作,直接返回,就连事务日志都不用记录; 3.2、这种情况是最复杂的,我们需要计算需要删除的数据在哪个文件里面,然后把对应的文件里面不需要删除的数据重写到新的文件里面(如果没有,就不生成新文件),最后记录事务日志。 为了加深印象,我画了一张图希望大家能够理解上面的过程。上图中每个绿色的框代表一个分区目录下的文件,红色代表标记为删除的文件,也就是事务日志中使用 remove 标记的文件,紫色代表移除需要删除的数据之后新生成的文件,也就是事务日志里面使用 add 标记的文件。 下面我们来详细分析删除的操作。 private def performDelete( sparkSession: SparkSession, deltaLog: DeltaLog, txn: OptimisticTransaction) = { import sparkSession.implicits._ var numTouchedFiles: Long = 0 var numRewrittenFiles: Long = 0 var scanTimeMs: Long = 0 var rewriteTimeMs: Long = 0 val startTime = System.nanoTime() val numFilesTotal = deltaLog.snapshot.numOfFiles val deleteActions: Seq[Action] = condition match { // 对应上面情况1,delete 操作没有传递任何条件 case None => // 直接将内存中快照里面所有的 AddFile 文件拿出来 val allFiles = txn.filterFiles(Nil) numTouchedFiles = allFiles.size scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 val operationTimestamp = System.currentTimeMillis() // 将 AddFile 标记成 RemoveFile allFiles.map(_.removeWithTimestamp(operationTimestamp)) // 传递了删除过滤条件,对应上面情况2、3 case Some(cond) => // metadataPredicates 为分区删除条件 // otherPredicates 为其他删除条件 val (metadataPredicates, otherPredicates) = DeltaTableUtils.splitMetadataAndDataPredicates( cond, txn.metadata.partitionColumns, sparkSession) // 如果只有分区删除条件,也就是 dt= "20190828" 这样的过滤条件,对应上面情况2 if (otherPredicates.isEmpty) { val operationTimestamp = System.currentTimeMillis() // 从快照中拿出符合这个分区条件的 AddFile 文件 val candidateFiles = txn.filterFiles(metadataPredicates) scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 numTouchedFiles = candidateFiles.size // 将 AddFile 标记成 RemoveFile candidateFiles.map(_.removeWithTimestamp(operationTimestamp)) } else { // 对应上面情况3,含有其他字段的删除条件,这时候我们需要扫描底层数据获取这些删除数据所在的文件 // 找到删除的数据潜在的 AddFile 文件列表 val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates) numTouchedFiles = candidateFiles.size val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles) val fileIndex = new TahoeBatchFileIndex( sparkSession, "delete", candidateFiles, deltaLog, tahoeFileIndex.path, txn.snapshot) // Keep everything from the resolved target except a new TahoeFileIndex // that only involves the affected files instead of all files. val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex) // 这个就是潜在需要删除的文件对应的 Dataset val data = Dataset.ofRows(sparkSession, newTarget) val filesToRewrite = withStatusCode("DELTA", s"Finding files to rewrite for DELETE operation") { // 没有需要潜在删除的 AddFile 文件 if (numTouchedFiles == 0) { Array.empty[String] } else { // 找到删除数据所在的文件 data.filter(new Column(cond)).select(new Column(InputFileName())).distinct() .as[String].collect() } } scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 // 对应上面情况3.1,如果没有找到需要删除的数据所在文件,那么删除的文件就是 Nil,不需要做事务日志 if (filesToRewrite.isEmpty) { Nil } else { // 对应上面情况3.2,找到我们需要删除的文件列表, // 那我们需要将需要删除文件里面不用删除的数据重新写到新文件 // Do the second pass and just read the affected files val baseRelation = buildBaseRelation( sparkSession, txn, "delete", tahoeFileIndex.path, filesToRewrite, nameToAddFileMap) // Keep everything from the resolved target except a new TahoeFileIndex // that only involves the affected files instead of all files. val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location) val targetDF = Dataset.ofRows(sparkSession, newTarget) // 将删除过滤条件取反,也就是 id > 10 变成 id <= 10 val filterCond = Not(EqualNullSafe(cond, Literal(true, BooleanType))) // 拿到潜在删除文件中不需要删除的数据 val updatedDF = targetDF.filter(new Column(filterCond)) // rewrittenFiles 就是新增的文件 val rewrittenFiles = withStatusCode( "DELTA", s"Rewriting ${filesToRewrite.size} files for DELETE operation") { // 开始将潜在需要删除文件里面不需要删除的数据写入到新文件 txn.writeFiles(updatedDF) } numRewrittenFiles = rewrittenFiles.size rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs val operationTimestamp = System.currentTimeMillis() // 需要删除的文件和新增的文件集合 removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++ rewrittenFiles } } } // 如果匹配到需要删除的文件,那么需要记录事务日志 if (deleteActions.nonEmpty) { // 写事务日志,也就是写到 _delta_log 目录下,这个我们在前面分析了。 txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq)) } recordDeltaEvent( deltaLog, "delta.dml.delete.stats", data = DeleteMetric( condition = condition.map(_.sql).getOrElse("true"), numFilesTotal, numTouchedFiles, numRewrittenFiles, scanTimeMs, rewriteTimeMs) ) 上面注释已经很清楚说明了 Delta Lake 的删除过程了。从上面的执行过程也可以看出,Delta Lake 删除操作的代价还是挺高的,所以官方也建议删除数据的时候提供分区过滤条件,这样可以避免扫描全表的数据。 写在最后 为了营造一个开放的Cassandra技术交流环境,社区建立了微信公众号和钉钉群。为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另云Cassandra免费火爆公测中,欢迎试用:https://www.aliyun.com/product/cds
Apache Spark Delta Lake 写数据使用及实现原理代码解析 Delta Lake 写数据是其最基本的功能,而且其使用和现有的 Spark 写 Parquet 文件基本一致,在介绍 Delta Lake 实现原理之前先来看看如何使用它,具体使用如下: df.write.format("delta").save("/data/yangping.wyp/delta/test/") //数据按照 dt 分区 df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/") // 覆盖之前的数据 df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/") 大家可以看出,使用写 Delta 数据是非常简单的,这也是 Delte Lake 介绍的 100% 兼容 Spark。 Delta Lake 写数据原理 前面简单了解了如何使用 Delta Lake 来写数据,本小结我们将深入介绍 Delta Lake 是如何保证写数据的基本原理以及如何保证事务性。 得益于 Apache Spark 强大的数据源 API,我们可以很方便的给 Spark 添加任何数据源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 实现的一种新的数据源,我们调用 df.write.format("delta") 其实底层调用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 类。为了简单起见,本文介绍的是 Delta Lake 批量写的实现,实时流写 Delta Lake 本文不涉及,后面有机会再介绍。 Delta Lake 批量写扩展了 org.apache.spark.sql.sources.CreatableRelationProvider 特质,并实现了其中的方法。我们调用上面的写数据方法首先会调用 DeltaDataSource 类的 createRelation 方法,它的具体实现如下: override def createRelation( sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = { // 写数据的路径 val path = parameters.getOrElse("path", { throw DeltaErrors.pathNotSpecifiedException }) // 分区字段 val partitionColumns = parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY) .map(DeltaDataSource.decodePartitioningColumns) .getOrElse(Nil) // 事务日志对象 val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path) // 真正的写操作过程 WriteIntoDelta( deltaLog = deltaLog, mode = mode, new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf), partitionColumns = partitionColumns, configuration = Map.empty, data = data).run(sqlContext.sparkSession) deltaLog.createRelation() } 其中 mode 就是保持数据的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 这个传递的参数,比如分区字段、数据保存路径以及 Delta 支持的一些参数(replaceWhere、mergeSchema、overwriteSchema 等,具体参见 org.apache.spark.sql.delta.DeltaOptions);data 就是我们需要保存的数据。 createRelation 方法紧接着就是获取数据保存的路径,分区字段等信息。然后初始化 deltaLog,deltaLog 的初始化会做很多事情,比如会读取磁盘所有的事务日志(_delta_log 目录下),并构建最新事务日志的最新快照,里面可以拿到最新数据的版本。由于 deltaLog 的初始化成本比较高,所以 deltaLog 初始化完之后会缓存到 deltaLogCache 中,这是一个使用 Guava 的 CacheBuilder 类实现的一个缓存,缓存的数据保持一小时,缓存大小可以通过 delta.log.cacheSize 参数进行设置。只要写数据的路径是一样的,就只需要初始化一次 deltaLog,后面直接从缓存中拿即可。除非之前缓存的 deltaLog 被清理了,或者无效才会再次初始化。DeltaLog 类是 Delta Lake 中最重要的类之一,涉及的内容非常多,所以我们会单独使用一篇文章进行介绍。 紧接着初始化 WriteIntoDelta,WriteIntoDelta 扩展自 RunnableCommand,Delta Lake 中的更新、删除、合并都是扩展这个类的。初始化完 WriteIntoDelta 之后,就会调用 run 方法执行真正的写数据操作。WriteIntoDelta 的 run 方法实现如下: override def run(sparkSession: SparkSession): Seq[Row] = { deltaLog.withNewTransaction { txn => val actions = write(txn, sparkSession) val operation = DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere) txn.commit(actions, operation) } Seq.empty } Delta Lake 所有的更新操作都是在事务中进行的,deltaLog.withNewTransaction 就是一个事务,withNewTransaction 的实现如下: def withNewTransaction[T](thunk: OptimisticTransaction => T): T = { try { // 更新当前表事务日志的快照 update() // 初始化乐观事务锁对象 val txn = new OptimisticTransaction(this) // 开启事务 OptimisticTransaction.setActive(txn) // 执行写数据操作 thunk(txn) } finally { // 关闭事务 OptimisticTransaction.clearActive() } } 在开启事务之前,需要更新当前表事务的快照,因为在执行写数据之前,这张表可能已经被修改了,执行 update 操作之后,就可以拿到当前表的最新版本,紧接着开启乐观事务锁。thunk(txn) 就是需要执行的事务操作,对应 deltaLog.withNewTransaction 里面的所有代码。 我们回到上面的 run 方法。val actions = write(txn, sparkSession) 就是执行写数据的操作,它的实现如下: def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = { import sparkSession.implicits._ // 如果不是第一次往表里面写数据,需要判断写数据的模式是否符合条件 if (txn.readVersion > -1) { // This table already exists, check if the insert is valid. if (mode == SaveMode.ErrorIfExists) { throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath) } else if (mode == SaveMode.Ignore) { return Nil } else if (mode == SaveMode.Overwrite) { deltaLog.assertRemovable() } } // 更新表的模式,比如是否覆盖现有的模式,是否和现有的模式进行 merge updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation) // 是否定义分区过滤条件 val replaceWhere = options.replaceWhere val partitionFilters = if (replaceWhere.isDefined) { val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get) if (mode == SaveMode.Overwrite) { verifyPartitionPredicates( sparkSession, txn.metadata.partitionColumns, predicates) } Some(predicates) } else { None } // 第一次写数据初始化事务日志的目录 if (txn.readVersion < 0) { // Initialize the log path deltaLog.fs.mkdirs(deltaLog.logPath) } // 写数据到文件系统中 val newFiles = txn.writeFiles(data, Some(options)) val deletedFiles = (mode, partitionFilters) match { // 全量覆盖,直接拿出缓存在内存中最新事务日志快照里面的所有 AddFile 文件 case (SaveMode.Overwrite, None) => txn.filterFiles().map(_.remove) // 从事务日志快照中获取对应分区里面的所有 AddFile 文件 case (SaveMode.Overwrite, Some(predicates)) => // Check to make sure the files we wrote out were actually valid. val matchingFiles = DeltaLog.filterFileList( txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect() val invalidFiles = newFiles.toSet -- matchingFiles if (invalidFiles.nonEmpty) { val badPartitions = invalidFiles .map(_.partitionValues) .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") } .mkString(", ") throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions) } txn.filterFiles(predicates).map(_.remove) case _ => Nil } newFiles ++ deletedFiles } } 如果 txn.readVersion == -1,说明是第一次写数据到 Delta Lake 表,所以当这个值大于 -1 的时候,需要判断一下写数据的操作是否合法。由于 Delta Lake 底层使用的是 Parquet 格式,所以 Delta Lake 表也支持模式的增加合并等,这就是 updateMetadata 函数对应的操作。因为 Delta Lake 表支持分区,所以我们可能在写数据的时候指定某个分区进行覆盖。真正写数据的操作是 txn.writeFiles 函数执行的,具体实现如下: def writeFiles( data: Dataset[_], writeOptions: Option[DeltaOptions], isOptimize: Boolean): Seq[AddFile] = { hasWritten = true val spark = data.sparkSession val partitionSchema = metadata.partitionSchema val outputPath = deltaLog.dataPath val (queryExecution, output) = normalizeData(data, metadata.partitionColumns) val partitioningColumns = getPartitioningColumns(partitionSchema, output, output.length < data.schema.size) // 获取 DelayedCommitProtocol,里面可以设置写文件的名字, // commitTask 和 commitJob 等做一些事情 val committer = getCommitter(outputPath) val invariants = Invariants.getFromSchema(metadata.schema, spark) SQLExecution.withNewExecutionId(spark, queryExecution) { val outputSpec = FileFormatWriter.OutputSpec( outputPath.toString, Map.empty, output) val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants) FileFormatWriter.write( sparkSession = spark, plan = physicalPlan, fileFormat = snapshot.fileFormat, committer = committer, outputSpec = outputSpec, hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration), partitionColumns = partitioningColumns, bucketSpec = None, statsTrackers = Nil, options = Map.empty) } // 返回新增的文件 committer.addedStatuses } Delta Lake 写操作最终调用 Spark 的 FileFormatWriter.write 方法进行的,通过这个方法的复用将我们真正的数据写入到 Delta Lake 表里面去了。在 Delta Lake 中,如果是新增文件则会在事务日志中使用 AddFile 类记录相关的信息,AddFile 持久化到事务日志里面的内容如下: {"add":{"path":"dt=20190801/part-00001-bdff67f3-c70f-4817-898d-15a73c93271a.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566990855000,"dataChange":true}} 可以看出 AddFile 里面记录了新增文件的保存路径,分区信息,新增的文件大小,修改时间等信息。如果是删除文件,也会在事务日志里面记录这个删除操作,对应的就是使用 RemoveFile 类存储,RemoveFile 持久化到事务日志里面的内容如下: {"remove":{"path":"dt=20190801/part-00001-7f3fe89d-e55b-4848-93ea-4133b5d406d6.c000.snappy.parquet","deletionTimestamp":1566990856332,"dataChange":true}} RemoveFile 里面保存了删除文件的路径,删除时间等信息。如果新增一个文件,再删除一个文件,那么最新的事务日志快照里面只会保存删除这个文件的记录。从这里面也可以看出, Delta Lake 删除、新增 ACID 是针对文件级别的。 上面的写操作肯定会产生新的文件,所以写操作之后就需要拿到新增的文件(val newFiles = txn.writeFiles(data, Some(options)) )newFiles(AddFile) 和需要删除的文件(RemoveFile)。针对那些文件需要删除需要做一些判断,主要分两种情况(具体参见 write 方法里面的): 如果是全表覆盖,则直接从缓存在内存中最新的事务日志快照中拿出所有 AddFile 文件,然后将其标记为 RemoveFile; 如果是分区内的覆盖,则从缓存在内存中最新的事务日志快照中拿出对应分区下的 AddFile 文件,然后将其标记为 RemoveFile。 最后 write 方法返回新增的文件和需要删除的文件(newFiles ++ deletedFiles),这些文件最终需要记录到事务日志里面去。关于事务日志是如何写进去的请参见这篇文章的详细分析。 写在最后 为了营造一个开放的Cassandra技术交流环境,社区建立了微信公众号和钉钉群。为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另云Cassandra免费火爆公测中,欢迎试用:https://www.aliyun.com/product/cds
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。在看本文时,强烈建议先看一下《深入理解 Apache Spark Delta Lake 的事务日志》文章。 Delta Lake 更新数据事务实现 Delta Lake 里面所有对表数据的更新(插入数据、更新数据、删除数据)都需要进行下面这些步骤,其主要目的是把删除哪些文件、新增哪些文件等记录写入到事务日志里面,也就是 _delta_log 目录下的 json 文件,通过这个实现 Delta Lake 的 ACID 以及时间旅行。下面我们进入事务日志提交的切入口 org.apache.spark.sql.delta.OptimisticTransaction#commit,持久化事务操作日志都是需要调用这个函数进行的。commit 函数实现如下: def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation( deltaLog, "delta.commit") { val version = try { // 事务日志提交之前需要先做一些工作,比如如果更新操作是第一次进行的,那么需要初始化 Protocol, // 还需要将用户对 Delta Lake 表的设置持久化到事务日志里面 var finalActions = prepareCommit(actions, op) // 如果这次更新操作需要删除之前的文件,那么 isBlindAppend 为 false,否则为 true val isBlindAppend = { val onlyAddFiles = finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile]) onlyAddFiles && !dependsOnFiles } // 如果 commitInfo.enabled 参数设置为 true,那么还需要把 commitInfo 记录到事务日志里面 if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) { commitInfo = CommitInfo( clock.getTimeMillis(), op.name, op.jsonEncodedValues, Map.empty, Some(readVersion).filter(_ >= 0), None, Some(isBlindAppend)) finalActions = commitInfo +: finalActions } // 真正写事务日志,如果发生版本冲突会重试直到事务日志写成功 val commitVersion = doCommit(snapshot.version + 1, finalActions, 0) logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}") // 对事务日志进行 checkpoint 操作 postCommit(commitVersion, finalActions) commitVersion } catch { case e: DeltaConcurrentModificationException => recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType) throw e case NonFatal(e) => recordDeltaEvent( deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e))) throw e } version } 我们先从这个函数的两个参数开始介绍。 _actions: Seq[Action]_:Delta Lake 表更新操作产生的新文件(AddFile)和需要删除文件的列表(RemoveFile)。如果是 Structured Streaming 作业,还会记录 SetTransaction 记录,里面会存储作业的 query id(sql.streaming.queryId)、batchId 以及当前时间。这个就是我们需要持久化到事务日志里面的数据。 _op: DeltaOperations.Operation_:Delta 操作类型,比如 WRITE、STREAMING UPDATE、DELETE、MERGE 以及 UPDATE 等。 在 commit 函数里面主要分为三步:prepareCommit、doCommit 以及 postCommit。prepareCommit 的实现如下: protected def prepareCommit( actions: Seq[Action], op: DeltaOperations.Operation): Seq[Action] = { assert(!committed, "Transaction already committed.") // 如果我们更新了表的 Metadata 信息,那么需要将其写入到事务日志里面 var finalActions = newMetadata.toSeq ++ actions val metadataChanges = finalActions.collect { case m: Metadata => m } assert( metadataChanges.length <= 1, "Cannot change the metadata more than once in a transaction.") metadataChanges.foreach(m => verifyNewMetadata(m)) // 首次提交事务日志,那么会确保 _delta_log 目录要存在, // 然后检查 finalActions 里面是否有 Protocol,没有的话需要初始化协议版本 if (snapshot.version == -1) { deltaLog.ensureLogDirectoryExist() if (!finalActions.exists(_.isInstanceOf[Protocol])) { finalActions = Protocol() +: finalActions } } finalActions = finalActions.map { // 第一次提交,并且是 Metadata那么会将 Delta Lake 的配置信息加入到 Metadata 里面 case m: Metadata if snapshot.version == -1 => val updatedConf = DeltaConfigs.mergeGlobalConfigs( spark.sessionState.conf, m.configuration, Protocol()) m.copy(configuration = updatedConf) case other => other } deltaLog.protocolWrite( snapshot.protocol, logUpgradeMessage = !actions.headOption.exists(_.isInstanceOf[Protocol])) // 如果 actions 里面有删除的文件,那么需要检查 Delta Lake 是否支持删除 val removes = actions.collect { case r: RemoveFile => r } if (removes.exists(_.dataChange)) deltaLog.assertRemovable() finalActions } prepareCommit 里面做的事情比较简单,主要对事务日志进行补全等操作。具体为 、由于 Delta Lake 表允许对已经存在的表模式进行修改,比如添加了新列,或者覆盖原有表的模式等。那么这时候我们需要将新的 Metadata 写入到事务日志里面。Metadata 里面存储了表的 schema、分区列、表的配置、表的创建时间。注意,除了表的 schema 和分区字段可以在后面修改,其他的信息都不可以修改的。 、如果是首次提交事务日志,那么先检查表的 _delta_log 目录是否存在,不存在则创建。然后检查是否设置了协议的版本,如果没有设置,则使用默认的协议版本,默认的协议版本中 readerVersion = 1,writerVersion = 2; 、如果是第一次提交,并且是 Metadata ,那么会将 Delta Lake 的配置信息加入到 Metadata 里面。Delta Lake 表的配置信息主要是在 org.apache.spark.sql.delta.sources.DeltaSQLConf 类里面定义的,比如我们可以在创建 Delta Lake 表的时候指定多久做一次 Checkpoint。 、由于我们可以通过 spark.databricks.delta.properties.defaults.appendOnly 参数将表设置为仅允许追加,所以如果当 actions 里面存在 RemoveFile,那么我们需要判断表是否允许删除。 我们回到 commit 函数里面,在执行完 prepareCommit 之后得到了 finalActions 列表,这些信息就是我们需要写入到事务日志里面的数据。紧接着会判断这次事务变更是否需要删除之前的文件,如果是,那么 isBlindAppend 为 false,否则为 true。 当 commitInfo.enabled 参数设置为 true(默认),那么还需要将 commitInfo 写入到事务日志文件里面。CommitInfo 里面包含了操作时间、操作的类型(WRITEUPDATE)、操作类型(Overwrite)等重要信息。最后到了 doCommit 函数的调用,大家注意看第一个参数传递的是 snapshot.version + 1,snapshot.version 是事务日志中最新的版本,比如 _delta_lake 目录下的文件如下: -rw-r--r-- 1 yangping.wyp wheel 811B 8 28 19:12 00000000000000000000.json -rw-r--r-- 1 yangping.wyp wheel 514B 8 28 19:14 00000000000000000001.json -rw-r--r-- 1 yangping.wyp wheel 711B 8 29 10:54 00000000000000000002.json -rw-r--r-- 1 yangping.wyp wheel 865B 8 29 10:56 00000000000000000003.json 那么 snapshot.version 的值就是3,所以这次更新操作的版本应该是4。我们来看下 doCommit 函数的实现: private def doCommit( attemptVersion: Long, actions: Seq[Action], attemptNumber: Int): Long = deltaLog.lockInterruptibly { try { logDebug(s"Attempting to commit version $attemptVersion with ${actions.size} actions") // 真正写事务日志的操作 deltaLog.store.write( deltaFile(deltaLog.logPath, attemptVersion), actions.map(_.json).toIterator) val commitTime = System.nanoTime() // 由于发生了数据更新,所以更新内存中事务日志的最新快照,并做相关判断 val postCommitSnapshot = deltaLog.update() if (postCommitSnapshot.version < attemptVersion) { throw new IllegalStateException( s"The committed version is $attemptVersion " + s"but the current version is ${postCommitSnapshot.version}.") } // 发送一些统计信息 var numAbsolutePaths = 0 var pathHolder: Path = null val distinctPartitions = new mutable.HashSet[Map[String, String]] val adds = actions.collect { case a: AddFile => pathHolder = new Path(new URI(a.path)) if (pathHolder.isAbsolute) numAbsolutePaths += 1 distinctPartitions += a.partitionValues a } val stats = CommitStats( startVersion = snapshot.version, commitVersion = attemptVersion, readVersion = postCommitSnapshot.version, txnDurationMs = NANOSECONDS.toMillis(commitTime - txnStartNano), commitDurationMs = NANOSECONDS.toMillis(commitTime - commitStartNano), numAdd = adds.size, numRemove = actions.collect { case r: RemoveFile => r }.size, bytesNew = adds.filter(_.dataChange).map(_.size).sum, numFilesTotal = postCommitSnapshot.numOfFiles, sizeInBytesTotal = postCommitSnapshot.sizeInBytes, protocol = postCommitSnapshot.protocol, info = Option(commitInfo).map(_.copy(readVersion = None, isolationLevel = None)).orNull, newMetadata = newMetadata, numAbsolutePaths, numDistinctPartitionsInAdd = distinctPartitions.size, isolationLevel = null) recordDeltaEvent(deltaLog, "delta.commit.stats", data = stats) attemptVersion } catch { case e: java.nio.file.FileAlreadyExistsException => checkAndRetry(attemptVersion, actions, attemptNumber) } } 、这里就是真正写事务日志的操作,其中 store 是通过 spark.delta.logStore.class 参数指定的,目前支持 HDFS、S3、Azure 以及 Local 等存储介质。默认是 HDFS。具体的写事务操作参见下面的介绍。 、持久化事务日志之后,更新内存中的事务日志最新的快照,然后做相关的合法性校验; 、发送一些统计信息。这里应该是 databricks 里面含有的功能,开源版本这里面其实并没有做什么操作。 下面我们开看看真正写事务日志的实现,为了简单起见,我们直接查看 HDFSLogStore 类中对应的方法,主要涉及 writeInternal,其实现如下: private def writeInternal(path: Path, actions: Iterator[String], overwrite: Boolean): Unit = { // 获取 HDFS 的 FileContext 用于后面写事务日志 val fc = getFileContext(path) // 如果需要写的事务日志已经存在那么就需要抛出异常,后面再重试 if (!overwrite && fc.util.exists(path)) { // This is needed for the tests to throw error with local file system throw new FileAlreadyExistsException(path.toString) } // 事务日志先写到临时文件 val tempPath = createTempPath(path) var streamClosed = false // This flag is to avoid double close var renameDone = false // This flag is to save the delete operation in most of cases. val stream = fc.create( tempPath, EnumSet.of(CREATE), CreateOpts.checksumParam(ChecksumOpt.createDisabled())) try { // 将本次修改产生的 actions 写入到临时事务日志里 actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write) stream.close() streamClosed = true try { val renameOpt = if (overwrite) Options.Rename.OVERWRITE else Options.Rename.NONE // 将临时的事务日志移到正式的事务日志里面,如果移动失败则抛出异常,后面再重试 fc.rename(tempPath, path, renameOpt) renameDone = true } catch { case e: org.apache.hadoop.fs.FileAlreadyExistsException => throw new FileAlreadyExistsException(path.toString) } } finally { if (!streamClosed) { stream.close() } // 删除临时事务日志 if (!renameDone) { fc.delete(tempPath, false) } } } writeInternal 的实现逻辑很简单,其实就是我们正常的写文件操作,具体如下: 、获取 HDFS 的 FileContext 用于后面写事务日志 、如果需要写的事务日志已经存在那么就需要抛出异常,后面再重试;比如上面我们写事务日志之前磁盘中最新的事务日志文件是 00000000000000000003.json,我们这次写的事务日志文件应该是 00000000000000000004.json,但是由于 Delta Lake 允许多个用户写数据,所以在我们获取最新的事务日志版本到写事务日志期间已经有用户写了一个新的事务日志 00000000000000000004.json,那么我们这次写肯定要失败了。这时候会抛出 FileAlreadyExistsException 异常,以便后面重试。 、写事务日志的时候是先写到表 _delta_lake 目录下的临时文件里面,比如我们这次写的事务日志文件为 00000000000000000004.json,那么会往类似于 .00000000000000000004.json.0887f7da-5920-4214-bd2e-7c14b4244af1.tmp 文件里面写数据的。 、将本次更新操作的事务记录写到临时文件里; 、写完事务日志之后我们需要将临时事务日志移到最后正式的日志文件里面,比如将 .00000000000000000004.json.0887f7da-5920-4214-bd2e-7c14b4244af1.tmp 移到 00000000000000000004.json。大家注意,在写事务日志文件的过程中同样存在多个用户修改表,所以 00000000000000000004.json 文件很可能已经被别的修改占用了,这时候也需要抛出 FileAlreadyExistsException 异常,以便后面重试。 整个事务日志写操作就完成了,我们再回到 doCommit 函数,注意由于 writeInternal 可能会抛出 FileAlreadyExistsException 异常,也就是 deltaLog.store.write(xxx) 调用可能会抛出异常,我们注意看到 doCommit 函数 catch 了这个异常,并在异常捕获里面调用 checkAndRetry(attemptVersion, actions, attemptNumber),这就是事务日志重写过程, checkAndRetry 函数的实现如下: protected def checkAndRetry( checkVersion: Long, actions: Seq[Action], attemptNumber: Int): Long = recordDeltaOperation( deltaLog, "delta.commit.retry", tags = Map(TAG_LOG_STORE_CLASS -> deltaLog.store.getClass.getName)) { // 读取磁盘中持久化的事务日志,并更新内存中事务日志快照 deltaLog.update() // 重试的版本是刚刚更新内存中事务日志快照的版本+1 val nextAttempt = deltaLog.snapshot.version + 1 // 做相关的合法性判断 (checkVersion until nextAttempt).foreach { version => val winningCommitActions = deltaLog.store.read(deltaFile(deltaLog.logPath, version)).map(Action.fromJson) val metadataUpdates = winningCommitActions.collect { case a: Metadata => a } val txns = winningCommitActions.collect { case a: SetTransaction => a } val protocol = winningCommitActions.collect { case a: Protocol => a } val commitInfo = winningCommitActions.collectFirst { case a: CommitInfo => a }.map( ci => ci.copy(version = Some(version))) val fileActions = winningCommitActions.collect { case f: FileAction => f } // If the log protocol version was upgraded, make sure we are still okay. // Fail the transaction if we're trying to upgrade protocol ourselves. if (protocol.nonEmpty) { protocol.foreach { p => deltaLog.protocolRead(p) deltaLog.protocolWrite(p) } actions.foreach { case Protocol(_, _) => throw new ProtocolChangedException(commitInfo) case _ => } } // Fail if the metadata is different than what the txn read. if (metadataUpdates.nonEmpty) { throw new MetadataChangedException(commitInfo) } // Fail if the data is different than what the txn read. if (dependsOnFiles && fileActions.nonEmpty) { throw new ConcurrentWriteException(commitInfo) } // Fail if idempotent transactions have conflicted. val txnOverlap = txns.map(_.appId).toSet intersect readTxn.toSet if (txnOverlap.nonEmpty) { throw new ConcurrentTransactionException(commitInfo) } } logInfo(s"No logical conflicts with deltas [$checkVersion, $nextAttempt), retrying.") // 开始重试事务日志的写操作 doCommit(nextAttempt, actions, attemptNumber + 1) } checkAndRetry 函数只有在事务日志写冲突的时候才会出现,主要目的是重写当前的事务日志。 、因为上次更新事务日志发生冲突,所以我们需要再一次读取磁盘中持久化的事务日志,并更新内存中事务日志快照; 、重试的版本是刚刚更新内存中事务日志快照的版本+1; 、做相关的合法性判断; 、开始重试事务日志的写操作。 当事务日志成功持久化到磁盘之后,这时候会执行 commit 操作的最后一步,执行 postCommit 函数,其实现如下: protected def postCommit(commitVersion: Long, commitActions: Seq[Action]): Unit = { committed = true if (commitVersion != 0 && commitVersion % deltaLog.checkpointInterval == 0) { try { deltaLog.checkpoint() } catch { case e: IllegalStateException => logWarning("Failed to checkpoint table state.", e) } } } postCommit 函数实现很简单,就是判断需不需要对事务日志做一次 checkpoint 操作,其中 deltaLog.checkpointInterval 就是通过 spark.databricks.delta.properties.defaults.checkpointInterval 参数设置的,默认每写10次事务日志做一次 checkpoint。 checkpoint 的其实就是将内存中事务日志的最新快照持久化到磁盘里面,如下所示: -rw-r--r-- 1 yangping.wyp wheel 811B 8 28 19:12 00000000000000000000.json -rw-r--r-- 1 yangping.wyp wheel 514B 8 28 19:14 00000000000000000001.json -rw-r--r-- 1 yangping.wyp wheel 711B 8 29 10:54 00000000000000000002.json -rw-r--r-- 1 yangping.wyp wheel 865B 8 29 10:56 00000000000000000003.json -rw-r--r-- 1 yangping.wyp wheel 668B 8 29 14:36 00000000000000000004.json -rw-r--r-- 1 yangping.wyp wheel 13K 8 29 14:36 00000000000000000005.checkpoint.parquet -rw-r--r-- 1 yangping.wyp wheel 514B 8 29 14:36 00000000000000000005.json -rw-r--r-- 1 yangping.wyp wheel 514B 8 29 14:36 00000000000000000006.json -rw-r--r-- 1 yangping.wyp wheel 24B 8 29 14:36 _last_checkpoint 00000000000000000005.checkpoint.parquet 文件就是对事务日志进行 checkpoint 的文件,里面汇总了 00000000000000000000.json - 00000000000000000005.json 之间的所有事务操作记录。所以下一次需要构建事务日志的快照时,只需要从 00000000000000000005.checkpoint.parquet 文件、00000000000000000006.json 文件构造,而无需再读取 00000000000000000000.json - 00000000000000000005.json 之间的事务操作。同时我们还看到做完 checkpoint 之后还会生成一个 _last_checkpoint 文件,这个其实就是对 CheckpointMetaData 类的持久化操作。里面记录了最后一次 checkpoint 的版本,checkpoint 文件里面的 Action 条数,如下: ⇒ cat _last_checkpoint {"version":5,"size":10} 注意,其实 CheckpointMetaData 类里面还有个 parts 字段,这个代表 checkpoint 文件有几个分片。因为随着时间的推移,checkpoint 文件也会变得很大,如果只写到一个 checkpoint 文件里面效率不够好,这时候会对 checkpoint 文件进行拆分,拆分成几个文件是记录到 parts 里面,但是目前开源版本的 Delta Lake 尚无这个功能,也不知道数砖后面会不会开源。 写在最后 为了营造一个开放的Cassandra技术交流环境,社区建立了微信公众号和钉钉群。为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另云Cassandra免费火爆公测中,欢迎试用:https://www.aliyun.com/product/cds
深入理解 Apache Spark Delta Lake 的事务日志 事务日志是理解 Delta Lake 的关键,因为它是贯穿许多最重要功能的通用模块,包括 ACID 事务、可扩展的元数据处理、时间旅行(time travel)等。本文我们将探讨事务日志(Transaction Log)是什么,它在文件级别是如何工作的,以及它如何为多个并发读取和写入问题提供优雅的解决方案。 事务日志(Transaction Log)是什么 Delta Lake 事务日志(也称为 DeltaLog)是 Delta Lake 表上执行每次事务的有序记录。具体形式如下: yangping.wyp@yangping.wyp:/tmp/delta-table/_delta_log| ⇒ ll total 280 -rw-r--r-- 1 yangping.wyp wheel 1.5K 8 21 14:19 00000000000000000000.json -rw-r--r-- 1 yangping.wyp wheel 1.1K 8 21 14:31 00000000000000000001.json -rw-r--r-- 1 yangping.wyp wheel 791B 8 21 14:31 00000000000000000002.json -rw-r--r-- 1 yangping.wyp wheel 3.9K 8 21 14:31 00000000000000000003.json -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:38 00000000000000000004.json -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:39 00000000000000000005.json -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:40 00000000000000000006.json -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:40 00000000000000000007.json -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:40 00000000000000000008.json -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:40 00000000000000000009.json -rw-r--r-- 1 yangping.wyp wheel 15K 8 21 19:40 00000000000000000010.checkpoint.parquet -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:40 00000000000000000010.json -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:40 00000000000000000011.json -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:40 00000000000000000012.json -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:40 00000000000000000013.json -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:40 00000000000000000014.json -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:40 00000000000000000015.json -rw-r--r-- 1 yangping.wyp wheel 1.2K 8 21 19:40 00000000000000000016.json 事务日志主要用途是什么? 单一事实来源 Delta Lake 构建于 Apache Spark™ 之上,允许多个写和读操作同时对给定表进行操作。为了始终向用户显示正确的数据视图,事务日志可作为单一事实来源(single source of truth) - 中央存储库,用于跟踪用户对表所做的所有更改。 当用户第一次读取 Delta Lake 表或在打开的表上运行一个新查询,该表自上次读取以来已被修改, Spark 会检查事务日志来查看已向表写入的新事务,然后使用这些新更改更新最终用户的表。这可确保用户表的版本始终与最新查询中的主记录同步,并且用户无法对表进行不同的,冲突的更改。 Delta Lake 上的原子性实现 原子性是 ACID 事务的四个属性之一,它可以保证在 Delta Lake 上执行的操作(如 INSERT 或 UPDATE )要么全部成功要么全部不成功。 如果没有此属性,硬件故障或软件错误很容易导致数据仅部分写入表中,从而导致数据混乱或损坏。 事务日志是 Delta Lake 能够提供原子性保证的机制。无论如何,如果它没有记录在事务日志中,它就不会发生。通过只记录完全执行的事务,并使用该记录作为唯一的真相来源,事务日志允许用户对其数据进行推理;并且即使数据在 PB 级别上,我们也可以对这些数据的准确性高枕无忧。 事务日志是如何工作的 将事务分解为原子提交 每当用户执行修改表的操作(例如插入、更新或删除)时,Delta Lake 将该操作分解为一系列由以下一个或多个操作组成的离散步骤: Add file:添加一个数据文件; Remove file:删除一个数据文件; Update metadata:更新表的元数据(例如更改表的名称,模式或分区); Set transaction:Structured Streaming 作业已经提交的具有给定 ID 的微批次记录; Change protocol:通过将事务日志切换到最新的软件协议来启用新特性; Commit info:包含有关提交的信息以及该操作是在何时何地进行的。 然后这些操作将按照有序的原子单位记录在事务日志中,称为提交。例如,假设用户创建一个事务以向表中添加新列,并向其中添加更多数据。Delta Lake 会将该事务分解为多个部分,一旦事务完成,就将它们添加到事务日志中,如下所示: Update metadata:更改模式以包含新列; Add file:每个添加的新文件。 文件级别的事务日志 当用户创建 Delta Lake 表时,将在 _delta_log 子目录中自动创建该表的事务日志。 当他或她对该表进行更改时,这些更改将作为有序的原子提交记录在事务日志中。 每个提交都以 JSON 文件的形式写出,从 000000.json 开始。对表的其他更改按升序数字顺序生成后续 JSON 文件,所以下一次提交被写入到 000001.json 文件,下下次修改写入到 000002.json 文件,依此类推。因此,如果我们通过从数据文件 1.parquet 和 2.parquet 向表中添加记录。该事务将自动添加到事务日志中,并以 000000.json 的形式保存到磁盘。然后,我们改变主意并决定删除这些文件并添加一个新文件(3.parquet)。 这些操作将记录为事务日志中的下一个提交,也就是 000001.json,如下所示。 尽管 1.parquet 和 2.parquet 不再是我们 Delta Lake 表的一部分,但它们的添加和删除仍记录在事务日志中,因为这些操作是在我们的表上执行的 - 尽管它们最终相互抵消了。 Delta Lake 仍然保留这样的原子提交,以确保在需要审计表或使用“时间旅行”来查看表在给定时间点的样子时,我们能够准确地做到这一点。 此外,即使我们从表中删除了基础数据文件,Spark 也不会立刻从磁盘中删除文件。用户可以使用 VACUUM 命令删除不再需要的文件。 使用检查点文件(Checkpoint Files)快速重新计算状态 一旦我们提交了10次事务日志,Delta Lake 就会在相同的 _delta_log 子目录中以 Parquet 格式保存一个检查点文件(如上面的 00000000000000000010.checkpoint.parquet 文件)。每 10 次提交 Delta Lake 会自动生成检查点文件,这个是通过参数 checkpointInterval 参数设置。这些检查点文件在某个时间点保存表的整个状态 - 以原生的 Parquet 格式保存,Spark 可以快速轻松地读取。换句话说,它们为 Spark reader 提供了一种“快捷方式”来完全复制表的状态,从而允许 Spark 避免重新处理可能存在的数千个低效的小 JSON 文件。 为了提高速度,Spark可以运行一个 listFrom 操作来查看事务日志中的所有文件,快速跳转到最新的检查点文件,并且只处理自保存了最新的检查点文件以来提交的JSON。 为了演示这是如何工作的,假设我们已经创建了提交,并且事务日志已经记录到 000007.json。Spark 加快了提交的速度,并在内存中自动缓存了表的最新版本。与此同时,其他一些写入者(可能是您过于热心的队友)已经向表中写入了新数据,并事务日志已经记录到 0000012.json 了。 为了合并这些新事务并更新表的状态,Spark 将运行 listFrom 方法来查看版本7之后对表的新更改。 Spark可以直接跳到最近的检查点文件(上图中的 0000010.checkpoint.parquet 文件),而不需要处理所有中间 JSON 文件,因为这个检查点文件包含 commit #10 中表的整个状态。现在,Spark 只需执行 0000011.json 和 0000012.json 的增量处理即可获得表的当前状态。然后 Spark 将表的版本12的状态缓存到内存中。通过遵循此工作流程,Delta Lake 能够使用 Spark 以高效的方式始终更新表的状态。 处理多个并发的读取和写入 现在我们已经在高层次上了解了事务日志如何工作的,让我们来谈谈并发性。到目前为止,我们的示例主要涵盖了用户线性提交事务或至少没有冲突的情况。 但是当 Delta Lake 处理多个并发读写时会发生什么?答案很简单,由于 Delta Lake 由 Apache Spark 提供支持,因此不仅可以让多个用户同时修改表 - 这是预期的。 为了处理这些情况,Delta Lake 采用了乐观的并发控制。 什么是乐观并发控制? 乐观并发控制是一种处理并发事务的方法,它假定不同用户对表所做的事务(更改)可以在不相互冲突的情况下完成。它的速度快得令人难以置信,因为当处理 PB 级的数据时,用户很可能同时处理数据的不同部分,从而允许他们同时完成不冲突的事务。 例如,假设你和我正在一起玩拼图游戏。只要我们都在做拼图的不同部分——比如你在角落里,我在边缘上——我们没有理由不能同时做更大拼图的那一部分,并且以两倍的速度完成拼图。只有当我们同时需要相同的部件时,才会产生冲突。这就是乐观并发控制。 相反,一些数据库系统使用悲观锁定的概念,这是假设最坏的情况——即使我们有10,000块拼图,在某个时候我们肯定需要相同的拼图——这导致了太多的冲突。为了解决这个问题,它的理由是,应该只允许一个人同时做拼图,并把其他人都锁在房间外面。这不是一个快速(或友好)解决难题的方法! 当然,即使使用乐观并发控制,有时用户也会尝试同时修改数据的相同部分。幸运的是,Delta Lake 有相应的协议处理它。 乐观地解决冲突 为了提供ACID事务,Delta Lake 有一个协议,用于确定提交应该如何排序(在数据库中称为 serializability),并确定在同时执行两个或多个提交时应该做什么。Delta Lake通过实现互斥(mutual exclusion)规则来处理这些情况,然后尝试乐观地解决任何冲突。该协议允许Delta Lake遵循ACID隔离原则,该原则确保多个并发写操作之后的表的结果状态与那些连续发生的写操作相同,并且是彼此隔离的。 一般来说,这个过程是这样进行的 记录起始表的版本; 记录读和写操作; 尝试提交; 如果有人已经提交了,检查一下你读到的内容是否有变化; 重复上面的步骤。 为了了解这一切是如何实时进行的,让我们看一下下面的图表,看看 Delta Lake 在冲突突然出现时是如何管理冲突的。假设两个用户从同一个表中读取数据,然后每个用户都尝试向表中添加一些数据。 Delta Lake 记录在进行任何更改之前读取的表的起始表版本(版本0); 用户1和2都试图同时向表添加一些数据。在这里,我们遇到了一个冲突,因为接下来只有一个提交可以被记录为 000001.json; Delta Lake使用“互斥”概念处理这种冲突,这意味着只有一个用户能够成功提交 000001.json。用户1的提交被接受,而用户2的提交被拒绝; Delta Lake 更倾向于乐观地处理这种冲突,而不是为用户2抛出错误。 它检查是否对表进行了任何新的提交,并悄悄地更新表以反映这些更改,然后在新更新的表上重试用户2的提交(不进行任何数据处理),最后成功提交 000002.json。 在绝大多数情况下,这种和解是悄无声息地、天衣无缝地、成功地进行的。但是,如果 Delta Lake 无法乐观地解决不可调和的问题(例如,如果用户1删除了用户2也删除的文件),那么惟一的选择就是抛出一个错误。最后要注意的是,由于在 Delta Lake 表上进行的所有事务都直接存储到磁盘中,因此这个过程满足 ACID 持久性的特性,这意味着即使在系统发生故障时,它也会保持。 其他用户案例 时间旅行(Time Travel) 每个表都是事务日志中记录的所有提交的总和的结果—不多也不少。事务日志提供了一步一步的指导,详细描述了如何从表的原始状态转换到当前状态。 因此,我们可以通过从原始表开始重新创建表在任何时间点的状态,并且只处理在该点之前提交的数据。这种强大的功能被称为“时间旅行”,或数据版本控制,在任何情况下都是救星。 数据血统(Data Lineage)和调试 作为对 Delta Lake 表所做的每个更改的最终记录,事务日志为用户提供了可验证的数据血统,这对于治理、审计和合规性目的非常有用。它还可以用于跟踪一个意外更改或管道中的一个 bug 的起源,以追溯到导致该更改的确切操作。用户可以运行 DESCRIBE HISTORY 来查看所做更改的元数据。 总结 在本博客中,我们深入研究 Delta Lake 事务日志的工作原理。我们讨论了: 事务日志是什么,它是如何构造的,以及提交如何作为文件存储在磁盘上; 事务日志如何作为一个单一的事实来源,允许 Delta Lake 实现原子性原则; Delta Lake 如何计算每个表的状态——包括它如何使用事务日志来跟踪最近的检查点,以及它如何解决“小文件”问题; 通过使用 Apache Spark 的强大功能来大规模处理元数据; 使用乐观并发控制允许多个并发读和写,即使在表发生更改时也是如此; Delta Lake 如何使用互斥来确保正确地线性(serialized)提交,以及在发生冲突时如何默默地重试提交。 本文翻译自:https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html 写在最后 为了营造一个开放的Cassandra技术交流环境,社区建立了微信公众号和钉钉群。为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另云Cassandra免费火爆公测中,欢迎试用:https://www.aliyun.com/product/cds
Apache Kafka 是一个可扩展,高性能,低延迟的平台,允许我们像消息系统一样读取和写入数据。我们可以很容易地在 Java 中使用 Kafka。 Spark Streaming 是 Apache Spark 的一部分,是一个可扩展、高吞吐、容错的实时流处理引擎。虽然是使用 Scala 开发的,但是支持 Java API。 Apache Cassandra 是分布式的 NoSQL 数据库。在这篇文章中,我们将介绍如何通过这三个组件构建一个高扩展、容错的实时数据处理平台。 准备 在进行下面文章介绍之前,我们需要先创建好 Kafka 的主题以及 Cassandra 的相关表,具体如下: 在 Kafka 中创建名为 messages 的主题 $KAFKA_HOME$\bin\windows\kafka-topics.bat --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic messages 在 Cassandra 中创建 KeySpace 和 table CREATE KEYSPACE vocabulary WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; USE vocabulary; CREATE TABLE words (word text PRIMARY KEY, count int); 上面我们创建了名为 vocabulary 的 KeySpace,以及名为 words 的表。 添加依赖 我们使用 Maven 进行依赖管理,这个项目使用到的依赖如下: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.3.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>com.datastax.spark</groupId> <artifactId>spark-cassandra-connector_2.11</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>com.datastax.spark</groupId> <artifactId>spark-cassandra-connector-java_2.11</artifactId> <version>1.5.2</version> </dependency> 数据管道开发 我们将使用 Spark 在 Java 中创建一个简单的应用程序,它将与我们之前创建的Kafka主题集成。应用程序将读取已发布的消息并计算每条消息中的单词频率。 然后将结果更新到 Cassandra 表中。整个数据架构如下: 现在我们来详细介绍代码是如何实现的。 获取 JavaStreamingContext Spark Streaming 中的切入点是 JavaStreamingContext,所以我们首先需要获取这个对象,如下: SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("WordCountingApp"); sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = new JavaStreamingContext( sparkConf, Durations.seconds(1)); 从 Kafka 中读取数据 有了 JavaStreamingContext 之后,我们就可以从 Kafka 对应主题中读取实时流数据,如下: Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers", "localhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); Collection<String> topics = Arrays.asList("messages"); JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams)); 我们在程序中提供了 key 和 value 的 deserializer。这个是 Kafka 内置提供的。我们也可以根据自己的需求自定义 deserializer。 处理 DStream 我们在前面只是定义了从 Kafka 中哪张表中获取数据,这里我们将介绍如何处理这些获取的数据: JavaPairDStream<String, String> results = messages .mapToPair( record -> new Tuple2<>(record.key(), record.value()) ); JavaDStream<String> lines = results .map( tuple2 -> tuple2._2() ); JavaDStream<String> words = lines .flatMap( x -> Arrays.asList(x.split("\\s+")).iterator() ); JavaPairDStream<String, Integer> wordCounts = words .mapToPair( s -> new Tuple2<>(s, 1) ).reduceByKey( (i1, i2) -> i1 + i2 ); 将数据发送到 Cassandra 中 最后我们需要将结果发送到 Cassandra 中,代码也很简单。 wordCounts.foreachRDD( javaRdd -> { Map<String, Integer> wordCountMap = javaRdd.collectAsMap(); for (String key : wordCountMap.keySet()) { List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key))); JavaRDD<Word> rdd = streamingContext.sparkContext().parallelize(wordList); javaFunctions(rdd).writerBuilder( "vocabulary", "words", mapToRow(Word.class)).saveToCassandra(); } } ); 启动应用程序 最后,我们需要将这个 Spark Streaming 程序启动起来,如下: streamingContext.start(); streamingContext.awaitTermination(); 使用 Checkpoints 在实时流处理应用中,将每个批次的状态保存下来通常很有用。比如在前面的例子中,我们只能计算单词的当前频率,如果我们想计算单词的累计频率怎么办呢?这时候我们就可以使用 Checkpoints。新的数据架构如下 为了启用 Checkpoints,我们需要进行一些改变,如下: streamingContext.checkpoint("./.checkpoint"); 这里我们将 checkpoint 的数据写入到名为 .checkpoint 的本地目录中。但是在现实项目中,最好使用 HDFS 目录。 现在我们可以通过下面的代码计算单词的累计频率: JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts .mapWithState( StateSpec.function( (word, one, state) -> { int sum = one.orElse(0) + (state.exists() ? state.get() : 0); Tuple2<String, Integer> output = new Tuple2<>(word, sum); state.update(sum); return output; } ) ); 部署应用程序 最后,我们可以使用 spark-submit 来部署我们的应用程序,具体如下: $SPARK_HOME$\bin\spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local[2] \target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar 最后,我们可以在 Cassandra 中查看到对应的表中有数据生成了。完整的代码可以参见 https://github.com/eugenp/tutorials/tree/master/apache-spark ##微信公众号和钉钉群交流为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
Zomato 是一家食品订购、外卖及餐馆发现平台,被称为印度版的“大众点评”。目前,该公司的业务覆盖全球24个国家(主要是印度,东南亚和中东市场)。本文将介绍该公司的 Food Feed 业务是如何从 Redis 迁移到 Cassandra 的。 Food Feed 是 Zomato 社交场景中不可或缺的一部分,因为它可以让我们的用户参与其中并与朋友的餐厅评论和图片保持同步,甚至可以通过这个获取餐厅提供的优惠和折扣。开始我们选择 Redis 作为消息 Feed 流的存储引擎,因为在当时的用户场景这是最好的选择。但是随着业务的发展,我们需要更高的可用性和负载支持,而 Redis 不能很好的满足这个需求。虽然我们可以通过丢失一些数据来避免系统的中断,但这不是我们想做的事情。为了确保我们的系统具有高可用性,我们不得不放弃 Redis,而选择 Cassandra 作为其替代品。 Cassandra 非常适合这个用例,因为它是分布式的,就有高可用性等。并且 Cassandra 也可以用于存储时间序列数据 - 这实际上就是我们的Feed 流。在做出这一重大改变之前,我们确实有一些 Cassandra 的使用经验,但对于像 Feed 这样重要的东西来说肯定是不够的。我们必须弄清楚如何顺利的从 Redis 过渡到 Cassandra,并像在 Redis 上那样有效地运行 Feed,并且没有停机时间。 我们开始花时间在 Cassandra 上,在前两周深入探索其配置并调整它以满足我们的要求。接下来,在最终确定 Feed 的架构之前,我们明确了一下两个情况: Feed 流信息一般只用于读取而基本上不会修改。使用 Redis 的时候,我们可以同时读取上百个 keys 而不必担心读取延迟,但是对于Cassandra 而言,连接延迟可能是读取请求过程中一个相当重要的部分。 schema 需要足够灵活,以便将来允许 Feed 中新类型的数据。鉴于我们不断迭代并致力于丰富产品体验,因此在 Feed 中添加元素和功能几乎是不可避免的。 我们花了几天时间用于收集了我们项目的数据模式以及各种用户案例,然后开始使用2个数据中心,每个数据中心有3个节点。 我们从 Redis 中迁移大概 6000万条记录到 Cassandra 中用于测试其性能。由于是测试阶段,我们只将一部分流量切入到 Cassandra ,并准备了两个版本的代码,分别写入到 Cassandra 和 Redis 。架构图如下 我们监控系统的延迟和其他问题,令人惊讶的是,我们遇到了写入吞吐量的瓶颈问题。 我们知道 Cassandra 的写入能力非常强,但是我们无法实现我们在各种博客文章和文章中阅读的写入吞吐量。 我们知道出了什么问题,但我们不知道是什么。我们从三个节点中获得的最佳结果是每秒1500次写入,这完全不能满足线上的需求,我们不得不在几个小时内回滚并重新评估。 经过几天的排查,我们意识到问题不在于 Cassandra,而在于 Elastic Block Store(EBS)。EBS是安装在每个EC2实例上的网络驱动器,具有10 Gigabits 的共享带宽和网络流量。当在单个EC2实例上的所有用户之间共享时,该带宽成为我们的瓶颈。为了满足这一需求,我们将数据从基于网络的EBS存储移动到同一EC2实例中的磁盘存储。然后我们在每个服务器上逐个部署由 Cassandra 提供支持的新 Food Feed,以便我们控制吞吐量。很高兴的是,这次成功了。 然后我们开始从我们的生产 Redis 服务器迁移数据(我们花了14个小时来迁移所有内容),在迁移过程中我们没有任何故障或额外负载。这就是 Redis 和 Cassandra 的强大功能。今天,我们的 Food Feed 完全运行在 Cassandra 上,我们在没有停机的情况下完成了这项工作。新的架构如下: 总而言之,通过上面这个项目,我们学到了以下几点: 在写入期间避免数据的读取。“读取”吞吐量大致保持不变,而“写入”规模与节点数量成比例; 避免数据的删除。删除意味着压缩(compaction),当它运行时,节点的资源会被占用; 延迟是一个问题。与Redis相比,Cassandra的连接延迟很高,大约是 Redis 的10x-15x。 微信公众号和钉钉群交流 为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o本文翻译自:https://www.zomato.com/blog/how-we-moved-our-food-feed-from-redis-to-cassandra
谁说 Facebook 弃用 Cassandra?相反 Facebook 拥有全世界最大的单个 Cassandra 集群部署,而且他们对 Cassandra 做了很多性能优化,包括 Cassandra on RocksDB 以提升 Cassandra 的响应时间。 在 Instagram (Instagram是Facebook公司旗下一款免费提供在线图片及视频分享的社交应用软件,于2010年10月发布。)上,我们拥有世界上最大的 Apache Cassandra 数据库部署。我们在 2012 年开始使用 Cassandra 取代 Redis ,在生产环境中支撑欺诈检测,Feed 和 Direct inbox 等产品。起初我们在 AWS 环境中运行了 Cassandra 集群,但是当 Instagram 架构发生变化时,我们将 Cassandra 集群迁移到Facebook 的基础架构中。我们对 Cassandra 的可靠性和可用性有了非常好的体验,但是在读取数据延迟方面我们觉得他需要改进。 去年 Instagram 的 Cassandra 团队开始着手这个项目,以显着减少 Cassandra 的读取延迟,我们称之为 Rocksandra。 在这篇文章中,我将描述该项目的动机,我们克服的挑战以及内部和公共云环境中的性能指标。 动机 在 Instagram 上,我们大量使用 Apache Cassandra 作为通用键值存储服务。Instagram 的大多数 Cassandra 请求都是在线的,因此为了向数亿 Instagram 用户提供可靠且响应迅速的用户体验,我们有很高的 SLA 要求。 Instagram 可靠性 SLA 保持为5-9s,这意味着在任何给定时间,请求失败率应小于0.001%。为了提高性能,我们主动监控不同 Cassandra 集群的吞吐量和延迟,尤其是 P99 读取延迟。 下面的图显示了一个生产 Cassandra 集群的客户端延迟。蓝线是平均读取延迟(5ms),橙色线是 P99 读取延迟(在25ms到60ms的范围内)。 经过调查,我们发现 JVM 垃圾收集器(GC)对延迟峰值做出了很大贡献。我们定义了一个称为 GC 停滞百分比的度量,用于衡量Cassandra 服务器执行 GC(Young Gen GC)并且无法响应客户端请求的时间百分比。结果显示我们生产 Cassandra 服务器上的 GC 停滞百分比。在最低请求流量时间窗口期间为1.25%,在高峰时段可能高达2.5%。 这表明 Cassandra 服务器实例在垃圾收集上花费 2.5% 的运行时间,而这段时间是不能服务客户端请求的。GC 开销显然对我们的 P99 延迟有很大影响,因此如果我们可以降低 GC 停顿百分比,我们将能够显着降低P99延迟。 解决办法 Apache Cassandra 是一个分布式数据库,并用 Java 编写了基于 LSM 树的存储引擎。 我们发现存储引擎中的组件,如memtable,压缩,读/写路径等,在 Java 堆中创建了很多对象,并为 JVM 产生了大量开销。为了减少来自存储引擎的GC影响,我们考虑了不同的方法,并最终决定开发C++存储引擎来替换现有的引擎。 我们不想从头开始构建新的存储引擎,因此我们决定在 RocksDB 之上构建新的存储引擎。 RocksDB 是一个开源的,高性能嵌入式键值数据库。它是用 C++ 编写的,并为 C++,C 和 Java 提供官方 API。RocksDB 针对性能进行了优化,尤其是在 SSD 等快速存储方面。它在业界广泛用作 MySQL,mongoDB 和其他流行数据库的存储引擎。 挑战 在 RocksDB 上实现新的存储引擎时,我们克服了三个主要挑战: 第一个挑战是 Cassandra 还没有可插拔的存储引擎架构,这意味着现有的存储引擎与数据库中的其他组件耦合在一起。为了在大规模重构和快速迭代之间找到平衡点,我们定义了一个新的存储引擎 API,包括最常见的读/写和流接口。这样我们就可以在 API 后面实现新的存储引擎,并将其注入 Cassandra 内部的相关代码路径。 其次,Cassandra 支持丰富的数据类型和表模式,而 RocksDB 提供纯粹的键值接口。我们仔细定义了编码/解码算法,使得我们使用 RocksDB 数据结构来构建 Cassandra 数据模型,并支持与原始 Cassandra 相同的查询语义。 第三个挑战是关于 streaming。 Streaming 在 Cassandra 这样的分布式数据库是很重要组件。每当我们从 Cassandra 集群加入或删除节点时,Cassandra 都需要在不同节点之间传输数据以平衡集群中的负载。现有的 streaming 实现基于当前存储引擎的。因此,我们必须将它们彼此分离,创建一个抽象层,并使用 RocksDB API 重新实现 streaming 传输。对于高 streaming 吞吐量,我们现在首先将数据流式传输到临时 sst 文件,然后使用 RocksDB 提取文件 API 立即将它们批量加载到 RocksDB 实例中。 性能指标 经过大约一年的开发和测试,我们已经完成了第一个版本的实现,并成功将其推广到 Instagram 中的几个生产 Cassandra 集群。在我们的一个生产集群中,P99读取延迟从60ms降至20ms。 我们还观察到该集群上的GC停滞从2.5%下降到0.3%,这减少了10倍! 我们还想验证 Rocksandra 在公共云环境中是否表现良好。我们在 AWS 环境中使用三个 i3.8 xlarge EC2 实例部署了一个 Cassandra 集群,每个实例具有 32 个核,244GB内存和 带有4个nvme闪存盘的raid0。 我们使用 NDBench 作为基准测试,并使用这个框架中的默认表模式: TABLE emp ( emp_uname text PRIMARY KEY, emp_dept text, emp_first text, emp_last text ) 我们将 250M 6KB 行数据预先加载到数据库中(每个服务器在磁盘上存储大约500GB的数据)。我们在 NDBench 中配置了128个读客户端和128个写客户端。 我们测试了不同的工作负载并测量了 avg/P99/P999 读/写延迟。如我们所见,Rocksandra 提供了更低且一致的读/写延迟。 我们还测试了一个只读工作负载并观察到,在类似的P99读取延迟(2ms)下,Rocksandra 读取吞吐量提高了10倍(Rocksandra为300K/s,C * 3.0为 30K/s)。 未来工作 我们开源了 Rocksandra 代码库和基准框架,您可以从Github下载(https://github.com/Instagram/cassandra/tree/rocks_3.0),在您自己的环境中试用! 下一步,我们正在积极开发更多 C 功能,如二级索引,修复等。我们还在开发一个 C 可插拔存储引擎架构,以便将我们的工作贡献给Apache Cassandra 社区。 本文翻译自:https://instagram-engineering.com/open-sourcing-a-10x-reduction-in-apache-cassandra-tail-latency-d64f86b43589 微信公众号和钉钉群交流 为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
阿里云X-Pack Spark分析引擎,满足批量及流式入库、ETL、归档、复杂分析、机器学习等需求,同时内置数据工作台支持作业管理、调度、交互式查询等。随着现代云应用对正常运行时间及性能水平的要求逐步提高,已经有越来越多用户开始将注意力集中在Apache Cassandra数据库身上。 那么,为什么要选择Apache Cassandra?这套分布式OLTP数据库能够带来高可用性与线性可扩展能力。在说起Cassandra的用途时,我们可以将其理解为一套理想的客户系统实现方案——其能够保证各类应用始终可用,包括产品目录、物联网、医疗系统以及移动应用。这类项目一旦遭遇停机,企业可能面临严重的营收损失甚至失去忠诚的用户。Netflix公司早在2008年就开始使用这套开源数据库,而其做出的大力推动也真正让Cassandra引起了公众的重视。 Cassandra于2010年成为Apache软件基金会的顶级项目,而且至今仍拥有极高人气。Cassandra专业知识能够帮助我们在人才市场上获得赏识。我们不禁要问——为什么这一开源项目能够大受欢迎? Cassandra能够利用Amazon发布的Dynamo论文中所提到的独特设计成果,从而保证硬件与网络出现大规模故障时继续保证可用性。利用点对点模型,其消除了单点故障可能性,从而帮助我们在机架故障甚至是网络整体下线的情况下得以幸存。我们能够在无需影响用户体验的情况下,顺利处理整体数据中心故障。只有能够应对故障的分布式系统才是一套拥有出色设计水平的分布式系统,而在Cassandra的帮助下,我们能够承受各类意外状况,并将应对机制纳入数据库架构及功能当中。 但大家可能要问,“不过,我之前只使用过关系型数据库,过渡过程是否非常艰难?”这个问题无法一概而论。Cassandra使用的数据模型对于关系数据库管理员来说并不陌生,我们同样使用表进行数据建模,并通过CQL——Cassandra查询语言——查询数据库。不过与SQL不同,Cassandra支持更为复杂的数据结构,例如嵌套与用户定义类型。例如,相较于为某张图片创建独立的存储表,我们可以直接将该数据存储在集合中以实现更为快速的查询速度。这种作法在CQL当中非常自然,而对应的图片表中则包含其名称、URL以及喜爱该图片的用户信息。 在高性能系统当中,毫秒级别的差异可能决定用户的实际体验与去留。然而,资源成本高昂的JOIN操作限制了我们的向外扩展通彻。通过对数据进行非规范化处理,我们能够尽可能降低请求数量,从而大幅降低磁盘空间成本并实现可预测的高性能应用。 当然,我们能够存储的绝不仅仅是图片数据。Cassandra针对高写入吞吐量进行了优化,这意味着其能够完美地处理大数据应用。时间序列与物联网用例的快速增长要求我们不断寻求新的方法以收集数据并改进数据的应用技术。 这就带来了新的问题:我们已经能够以现代化且具备成本效益的方式存储数据,但如何进一步提升处理能力?换言之,在数据收集完成后,我们该如何加以运用?我们如何有效分析数百TB数据?我们又该如何以秒为单位实时利用信息进行决策?Apache Spark正是问题的答案。 Spark可谓大数据处理的下一场革命。Hadoop与Mapreduce属于第一代革命性项目,它们让我们得以立足于大数据层面实现数据收集。而Spark则能够大幅提高性能并降低代码计算的复杂性,从而实现前所未有的数据分析能力。在Spark的帮助下,我们可以完成大量批处理计算,针对数据流处理结果进行反应并通过机器学习机制做出明智决策,最终利用遍历与递归理解复杂的信息。其目标不光是为客户提供更为快速可靠的应用连接能力(这部分效果由Cassandra负责实现),同时也需要利用信息做出业务决策地更好地满足客户需求。 大家可以点击此处查阅Spark-Cassandra Connector方案说明(开源),我们也强烈建议各位在DataStax Academy上查找免费的自学教程。 希望大家能够享受这段技术学习之旅!如果大家希望了解更多,也可参阅我们的OSCON教程,其中包含大量与Cassandra与Spark相关的内容。 原文标题:An introduction to data processing with Cassandra and Spark 微信公众号和钉钉群交流 为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
Discord 是一款国外的类似 YY 的语音聊天软件。Discord 语音聊天软件及我们的 UGC 内容的增长速度比想象中要快得多。随着越来越多用户的加入,带来了更多聊天消息。2016 年 7 月,每天大约有 4 千万条消息;2016 年 12 月,每天超过亿条。当写这篇文章时(2017 年 1 月),每天已经超过 1.2 亿条了。 我们早期决定永久保存所有用户的聊天历史记录,这样用户可以随时在任何设备查找他们的数据。这是一个持续增长的高并发访问的海量数据,而且需要保持高可用。如何才能搞定这一切?我们的经验是选择 Cassandra 作为数据库! 我们在做什么 Discord 语音聊天软件的最初版本在 2015 年只用了两个月就开发出来。在那个阶段,MongoDB 是支持快速迭代最好的数据库之一。所有 Discord 数据都保存在同一个 MongoDB 集群中,但在设计上我们也支持将所有数据很容易地迁移到一种新的数据库(我们不打算使用 MongoDB 数据库的分片,因为它使用起来复杂以及稳定性不好)。 实际上这是我们企业文化的一部分:快速搭建来验证产品的特性,但也预留方法来支持将它升级到一个更强大的版本。 消息保存在 MongoDB 中,使用 channel_id 和 created_at 的单一复合索引。到 2015 年 11 月,存储的消息达到了 1 亿条,这时,原来预期的问题开始出现:内存中再也放不下所有索引及数据,延迟开始变得不可控,是时候迁移到一个更适合这个项目的数据库了。 选择正确的数据库 在选择一个新的数据库之前,我们必须了解当前的读/写模式,以及我们目前的解决方案为什么会出现问题。 很显然,我们的读取是非常随机的,我们的读/写比为 50 / 50。 语音聊天服务器:它只处理很少的消息,每隔几天才发几条信息。在一年内,这种服务器不太可能达到 1000 条消息。它面临的问题是,即使请求量很小,它也很难高效,单返回 50 条消息给一个用户,就会导致磁盘中的许多次随机查找,并导致磁盘缓存淘汰。 私信聊天服务器:发送相当数量的消息,一年下来很容易达到 10 万到 100 万条消息。他们请求的数据通常只是最近的。它们的问题是,数据由于访问得不多且分散,因此不太可能被缓存在磁盘中。 大型公共聊天服务器:发送大量的消息。他们每天有成千上万的成员发送数以千计的消息,每年可以轻松地发送数以百万计的消息。他们几乎总是在频繁请求最近一小时的消息,因此数据可以很容易地被磁盘缓存命中。 我们预计在未来的一年,将会给用户提供更多随机读取数据的功能:查看 30 天内别人提及到你的消息,然后点击到某条历史记录消息,查阅标记(pinned)的消息以及全文搜索等功能。这一切导致更多的随机读取!! 接下来我们来定义一下需求: 线性可扩展性 - 我们不想等几个月又要重新考虑新的扩展方案,或者是重新拆分数据。 自动故障转移 (failover) - 我们不希望晚上的休息被打扰,当系统出现问题我们希望它尽可能的能自动修复。 低维护成本 - 一配置完它就能开始工作,随着数据的增长时,我们要需要简单增加机器就能解决。 已经被验证过的技术 - 我们喜欢尝试新的技术,但不要太新。 可预测的性能 - 当 API 的响应时间 95% 超过 80ms 时也无需警示。我们也不想重复在 Redis 或 Memcached 增加缓存机制。 非二进制存储 - 由于数据量大,我们不太希望写数据之前做一些读出二进制并反序列化的工作。 开源 - 我们希望能掌控自己的命运,不想依靠第三方公司。 Cassandra 是唯一能满足我们上述所有需求的数据库。我们可以添加节点来扩展它,添加过程不会对应用程序产生任何影响,也可以容忍节点的故障。一些大公司如 Netflix 和苹果,已经部署有数千个 Cassandra 节点。数据连续存储在磁盘上,这样减少了数据访问寻址成本,且数据可以很方便地分布在集群上。它依赖 DataStax,但依旧是开源和社区驱动的。 做出选择后,我们需要证明它实际上是可行的。 数据模型 向一个新手描述 Cassandra 数据库最好的办法,是将它描述为 KKV 存储,两个 K 构成了主键。第一个 K 是分区键(partition key),用于确定数据存储在哪个节点上,以及在磁盘上的位置。一个分区包含很多行数据,行的位置由第二个 K 确定,这是聚类键(clustering key),聚类键充当分区内的主键,以及决定了数据行如何排序。可以将分区视为有序字典。这些属性相结合,可以支持非常强大的数据建模。 前面提到过,消息在 MongoDB 中的索引用的是 channel_id 和 created_at,由于经常查询一个 channel 中的消息,因此 channel_id 被设计成为分区键,但 created_at 不作为一个大的聚类键,原因是系统内多个消息可能具有相同的创建时间。 幸运的是,Discord 系统的 ID 使用了类似 Twitter Snowflake [1] 的发号器(按时间粗略有序),因此我们可以使用这个 ID。主键就变成( channel_id, message_id), message_id 是 Snowflake 发号器产生。当加载一个 channel 时,我们可以准确地告诉 Cassandra 扫描数据的范围。 下面是我们的消息表的简化模式。 CREATE TABLE messages ( channel_id bigint, message_id bigint, author_id bigint, content text, PRIMARY KEY (channel_id, message_id) ) WITH CLUSTERING ORDER BY (message_id DESC); Cassandra 的 schema 与关系数据库模式有很大区别,调整 schema 非常方便,不会带来任何临时性的性能影响。因此我们获得了最好的二进制存储和关系型存储。 当我们开始向 Cassandra 数据库导入现有的消息时,马上看见出现在日志上的警告,提示分区的大小超过 100MB。发生了什么?!Cassandra 可是宣称单个分区可以支持 2GB!显然,支持那么大并不意味着它应该设成那么大。 大的分区在进行压缩、集群扩容等操作时会对 Cassandra 带来较大的 GC 压力。大分区也意味着它的数据不能分布在集群中。很明显,我们必须限制分区的大小,因为一个单一的 channel 可以存在多年,且大小不断增长。 我们决定按时间来归并我们的消息并放在一个 bucket 中。通过分析最大的 channel,我们来确定 10 天的消息放在一个 bucket 中是否会超过 100mb。Bucket 必须从 message_id 或时间戳来归并。 DISCORD_EPOCH = 1420070400000 BUCKET_SIZE = 1000 * 60 * 60 * 24 * 10 def make_bucket(snowflake): if snowflake is None: timestamp = int(time.time() * 1000) - DISCORD_EPOCH else: # When a Snowflake is created it contains the number of # seconds since the DISCORD_EPOCH. timestamp = snowflake_id >> 22 return int(timestamp / BUCKET_SIZE) def make_buckets(start_id, end_id=None): return range(make_bucket(start_id), make_bucket(end_id) + 1) Cassandra 数据库的分区键可以复合,所以我们新的主键成为 (( channel_id, bucket), message_id)。 CREATE TABLE messages ( channel_id bigint, bucket int, message_id bigint, author_id bigint, content text, PRIMARY KEY ((channel_id, bucket), message_id) ) WITH CLUSTERING ORDER BY (message_id DESC); 为了方便查询最近的消息,我们生成了一个从当前时间到 channel_id(也是 Snowflake 发号器生成,要比第一个消息旧)的 bucket。然后我们依次查询分区直到收集到足够的消息。这种方法的缺点是,不活跃的 channel 需要遍历多个 bucket 从而收集到足够返回的消息。在实践中,这已被证明还行得通,因为对于活跃的 channel,查询第一个 bucket 就可以返回足够多的数据。 将消息导入到 Cassandra 数据库十分顺利,我们准备尝试迁移到生产环境。 冒烟启动 在生产环境引入新系统总是可怕的,因此最好在不影响用户的前提下先进行测试。我们将代码设置成双读/写到 MongoDB 和 Cassandra。 一启动系统我们就收到 bug 追踪器发来的错误信息,错误提示 author_id 为 null。怎么会是 null ?这是一个必需的字段!在解释这个问题之前,先介绍一下问题的背景。 最终一致性 Cassandra 是一个 AP 数据库,这意味着它牺牲了强一致性(C)来换取可用性(A),这也正是我们所需要的。在 Cassandra 中读写是一个反模式(读比写的代价更昂贵)。你也可以写入任何节点,在 column 的范围,它将使用“last write wins”的策略自动解决写入冲突,这个策略对我们有何影响?请看下面动画。 在例子中,一个用户编辑消息时,另一个用户删除相同的消息,当 Cassandra 执行 upsert 之后,我们只留下了主键和另外一个正在更新文本的列。 有两个可能的解决方案来处理这个问题: 编辑消息时,将整个消息写回。这有可能找回被删除的消息,但是也增加了更多数据列冲突的可能。 能够判断消息已经损坏时,将其从数据库中删除。 我们选择第二个选项,我们按要求选择一列(在这种情况下, author_id),如果消息是空的就删除。 在解决这个问题时,我们也注意到我们的写入效率很低。由于 Cassandra 被设计为最终一致性,因此执行删除操作时不会立即删除数据,它必须复制删除到其他节点,即使其他节点暂时不可用,它也照做。 Cassandra 为了方便处理,将删除处理成一种叫“墓碑”的写入形式。在处理过程中,它只是简单跳过它遇到的墓碑。墓碑通过一个可配置的时间而存在(默认 10 天),在逾期后,会在压缩过程中被永久删除。 删除列以及将 null 写入列是完全相同的事情。他们都产生墓碑。因为所有在 Cassandra 数据库中的写入都是更新插入(upsert),这意味着哪怕第一次插入 null 都会生成一个墓碑。 实际上,我们整个消息数据包含 16 个列,但平均消息长度可能只有了 4 个值。这导致新插入一行数据没缘由地将 12 个新的墓碑写入至 Cassandra 中。 解决这个问题的方法很简单:只给 Cassandra 数据库写入非空值。 性能 Cassandra 以写入速度比读取速度要快著称,我们观察的结果也确实如此。写入速度通常低于 1 毫秒而读取低于 5 毫秒。我们观察了数据访问的情况,性能在测试的一周内保持了良好的稳定性。没什么意外,我们得到了我们所期望的数据库。 说到快速、一致的读取性能,这里有一个例子,跳转到某个上百万条消息的 channel 的一年前的某条消息,请看动画 巨大的意外 一切都很顺利,因此我们将它切换成我们的主数据库,然后在一周内淘汰掉 MongoDB。Cassandra 工作一切正常,直到 6 个月后有一天,Cassandra 突然变得反应迟钝。我们注意到 Cassandra 开始出现 10 秒钟的 GC 全停顿(Stop-the-world) ,但是我们不知道原因。 我们开始定位分析,发现加载某个 channel 需要 20 秒。一个叫 “Puzzles & Dragons Subreddit” 的公共 channel 是罪魁祸首。因为它是一个开放的 channel,因此我们也跑进去探个究竟。 令我们惊讶的是,channel 里只有 1 条消息。我们也了解到他们用我们的 API 删除了数百万条消息,只在 channel 中留下了 1 条消息。 上文提到 Cassandra 是如何用墓碑(在最终一致性中提及过)来处理删除动作的。当一个用户载入这个 channel,虽然只有 1 条的消息,Cassandra 不得不扫描百万条墓碑(产生垃圾的速度比虚拟机收集的速度更快)。 我们通过如下措施解决: 因为我们每晚都会运行 Cassandra 数据库修复(一个反熵进程),我们将墓碑的生命周期从 10 天降低至 2 天。 我们修改了查询代码,用来跟踪空的 buckets,并避免他们在未来的 channel 中加载。这意味着,如果一个用户再次触发这个查询,最坏的情况,Cassandra 数据库只在最近的 bucket 中进行扫描。 未来 我们目前在运行着一个复制因子是 3 的 12 节点集群,并根据业务需要持续增加新的节点,我相信这种模式可以支撑很长一段时间。但随着 Discord 软件的发展,相信有一天我们可能需要每天存储数十亿条消息。 Netflix 和苹果都维护了运行着数千个节点的集群,所以我们知道目前这个阶段不太需要顾虑太多。当然我们也希望有一些点子可以未雨绸缪。 近期工作 将我们的消息集群从 Cassandra 2 升级到 Cassandra 3。Cassandra 3 有一个新的存储格式,可以将存储大小减少 50% 以上。新版 Cassandra 单节点可以处理更多数据。目前,我们在每个节点存储了将近 1TB 的压缩数据。我们相信我们可以安全地扩展到 2TB,以减少集群中节点的数量。 长期工作 尝试下 Scylla [4],它是一款用 C++ 编写与 Cassandra 兼容的数据库。在正常操作期间,我们 Cassandra 节点实际上是没有占用太多的 CPU,然而在非高峰时间,当我们运行修复(一个反熵进程)变得相当占用 CPU,同时,继上次修复后,修复持续时间和写入的数据量也增大了许多。 Scylla 宣称有着极短的修复时间。 将没使用的 Channel 备份成谷歌云存储上的文件,并且在有需要时可以加载回来。我们其实也不太想做这件事,所以这个计划未必会执行。 结论 切换之后刚刚过去一年,尽管经历过“巨大的意外”,一切还是一帆风顺。从每天 1 亿条消息到目前超过 1.2 亿条,一直保持着良好的性能和稳定性。由于这个项目的成功,因此我们将生产环境的其他数据也迁移到 Cassandra,并且也取得了成功。 英文原文:https://blog.discordapp.com/how-discord-stores-billions-of-messages-7fa6ec7ee4c7 微信公众号和钉钉群交流 为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
我们在《Apache Cassandra 简介》文章中介绍了 Cassandra 的数据模型类似于 Google 的 Bigtable,对应的开源实现为 Apache HBase,而且我们在 《HBase基本知识介绍及典型案例分析》 文章中简单介绍了 Apache HBase 的数据模型。按照这个思路,Apache Cassandra 的数据模型应该和 Apache HBase 的数据模型很类似,那么这两者的数据存储模型是不是一样的呢?本文将为大家解答这些问题。我们从 KeySpace -> Table -> Partition -> Row -> Cell 顺序介绍。本文基于 Apache Cassandra 3.11.4 源码进行介绍的,不同版本可能有些不一样。 Table & KeySpace Cassandra 中的 KeySpace 概念和 RDBMS 里面的 DataBase 概念很类似,一个 KeySpace 包含多张表,一般将有关联的数据表放到同一个 KeySpace 下面。KeySpace 创建的时候可以指定副本策略,副本因子以及是否启用 CommitLog 机制(类似 HBase 中的 WAL)。 Cassandra 中表的概念和 RDBMS 很类似。不同的是在 Cassandra 中属于同一张表的数据在物理上是分布在不同节点上存储的,同一张表由多个 Partition 组成。 Partitions Cassandra 一般是由多台节点组成的,每台节点负责一定范围的,如果使用 Murmur3hash 的时候,每个节点负责的 Token 类似于下面那样: 所以 Token 范围为 -9223372036854775808 ~ -4611686018427387904 的数据存储在 A 节点;同理,Token 范围为 -4611686018427387903 ~ -1 之间的数据存储在 B节点,其他类似;每个 Token 范围由多个 Partition 构成,每个 Partition 由一行或多行数据组成,Partition 类似下面的:其中,username 为 Partition key;type 为 Clustering key。那么在这种情况下,username = iteblog 的两条数据构成一个 Partition;另外两条构成分别构成两个 Partitions。在底层存储每个 Partition 格式如下:从上图可以看出,一个 Partition 是由 PartitionHeader、零个或多个 Row (格式在后面介绍)以及 EndPartition 三部分组成的。EndPartition 这个用于标记 Partition 结束,只占用一个字节,并使用 0x00000001 标记。PartitionHeader 的格式如下: Partition Key 就是我们建表的时候指定的,由于 Partition Key 长度使用两字节表示,所以 Cassandra 中 Partition Key 长度必须小于等于 65535 字节。 Local Delete Time 是删除发生时的服务器时间(以秒为单位),与 gc_grace_seconds 进行比较以确定何时可以清除它。当与 TTL 一起使用时,localDeletionTime 是数据到期的时间。共占四个字节; Marked For Delete At 记录删除的时间戳,时间戳小于此值的数据被视为已删除,共占用八字节。 Static Row:如果我们建表的时候有 Static 字段,那么标记为 Static 的列会在这里存储。从这里也可以看出,partition key 相同的数据 Static 列只会保存一份数据。 在底层存储中,多个 Partition 组成一个 SSTable(Sorted-String Table)文件。那么同一个 SSTable 文件中的数据数据是如何组织的呢?答案是按照 Partition Key 计算得到的 Token 升序排序的。 Row 上面看出,Partition 里面包含了零个或多个 Row,这些 Row 对应的 Partition Key 是一样的。非 Static 的 Row 在磁盘存储的格式如下:上面除了 flags 、Row Body Size 、 Previous Row Body Size 以及 Primary Key Liveness Timestamp 这四个字段一定会存在,其他字段需要满足条件才会存储。下面对上面字段进行介绍: flags:Row 的标记信息,主要用于标记当前 Row 是否存在时间戳、TTL、被删除、是否包含所有的列等信息。flag 字段占用一个字节, hasExtendedFlags:当前 Row 是否含有 Static 列,存在才会有数据; Clustering info:每个 Row 包含零个或多个 Clustering 相关的信息。Clustering 信息就是我们创建表的时候指定的 Clustering key 信息。每个 Clustering Info 在持久化的时候会先存储头部信息,标记当前 Clustering key 是否为空、是否为 null 以及是否有值等信息;然后根据数据类型将值存下来,如果当前 Clustering key 的值占用字节非固定,还需要存储当前 Clustering key 值的字节数。 Row Body Size:当前 Row Body 的大小,Row Body 包含 primary key 的 liveness 信息、Row 是否删除等信息以及 Cell 的信息。 Previous Row Body Size:前一个 Row Body 的大小,这个主要用于加速反向查询的,不过当前并没有使用; Primary Key Liveness Timestamp:primary key 的 Liveness 用于确定行是否还活着或已经死了(没有 live cells 并且 liveness 为空)。这个字段主要用于存储当前 Row 的 Liveness 时间戳。注意,持久化到磁盘的时间戳是相对于当前 Memtable 最小时间戳的值。 Primary Key Liveness TTL:这个字段主要用于存储当前 Row 的 Liveness TTL 信息。也是相对于当前 Memtable 最小 TTL 的值 Primary Key Liveness LocalExpirationTime:当前 Liveness 的 ExpirationTime,也是相对时间; Row Marked For Delete At:当前 Row 的删除时间,也是相对时间,精确到毫秒; Row Local Deletion Time:当前被标记为 tombstone 时服务器的时间,也是相对时间,精确到秒; Columns Bitmap:从 Cassandra 3.x 开始,列的信息已经不保存到数据文件里面了,列的信息是保存在对应 SSTable 的 md-X-big-Statistics.db 文件中。这个字段是用于标记当前行哪些列存在,哪些列不存在。如果列存在则标记为0;如果列不存在则标记为1;如果列全部存在,直接标记为0。当表的字段数小于64个的时候,直接使用一个 long 类型的数据来存储这个 bitmap。如果大于等于64个,处理方案稍微复杂一些:先保存一个标记位,标记当前表拥有的字段个数大于等于64; 如果存在的列没有占总列数的一半,则按照全部列的顺序保存存在的列在排序后列的索引位置;如果存在的列占总列数超过一半,则按照全部列的顺序保存不存在的列在排序后列的索引位置。可见,Cassandra 通过将列的信息(包括列的名称、类型、表名、keySpace等信息)保存到对应 SSTable 的 md-X-big-Statistics.db 文件中,相应的行只保存列是否存在的标记信息,这个可以节省存储空间的占用。注意,HBase 存储数据的时候每个 Cell 都需要保存列名称和列族名称的。 非 Static Row 的底层存储格式已经在前面描述过,对于 Static Row 除了没有上图的 Clustering info 信息,其余都一样,所以这里就不介绍了。 上图中最后有 N 个 Cell,那多个 Cell 之间的顺序是如何保证的呢?答案是按照列的名称字典顺序升序排序的。比如我们表的定义如下: CREATE TABLE iteblog ( user_id text, type text, action text, username text, age text, email text, PRIMARY KEY(user_id) ); 那么 Cell 的顺序排列如下: action -> age -> email -> type -> username 这个排序是通过 BTree 实现的,Row 的实现类为 BTreeRow。 Cell Cell 就是每列数据的底层实现,Cell 里面包含了列的定义信息,比如是否被删除、是否过期、是否设置了时间戳等。在 Cassandra 里面,Column 有 Simple 和 Complex(CASSANDRA-8099引入的) 之分。non-frozen collection 或 UDT(用户自定义类型)的列是 ComplexColumn(Complex Cell)。 Simple Cell(Simple Column)的底层格式 我们正常使用的列就是属于这种类型的,它的底层存储格式如下: flags:这个 Cell 的 flag 标记,主要用于标记当前 Cell 是否有值、是否被删除、是否过期、是否使用 Row 时间戳、是否使用 Row TTL 等信息。flag 字段占用一个字节,每位的含义代表如下: timestamp:当前 Cell 的时间戳,Cassandra 中我们可以对每列设置时间戳; deletion time:当前 Cell 的删除时间; ttl:当前 Cell 的 TTL,Cassandra 中我们可以对每列设置 TTL,代表这个 Cell 保留多长时间; value:当前 Cell 的值; Complex Cell(Complex Column)的底层格式 如果列属于 non-frozen collection 或 UDT(用户自定义类型),那么这个属于 Complex Cell,它的底层存储格式如下:可以看出,Complex Cell 和 Simple Cell 大部分很类似,下面只介绍不一样的地方: Complex Cell Marked For Delete At & Complex Cell Local Deletion Time:这两个属性和前面的类似,只不过针对 Complex Cell 而言的。 Complex Cell Counts:Complex Cell 的个数; path:当前 Cell 的路径。 在 Cassandra 中, Complex Cell 的实现类是 ComplexColumnData。 微信公众号和钉钉群交流 为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
在 Cassandra 中,当达到一定条件触发 flush 的时候,表对应的 Memtable 中的数据会被写入到这张表对应的数据目录(通过 data_file_directories 参数配置)中,并生成一个新的 SSTable(Sorted Strings Table,这个概念是从 Google 的 BigTable 借用的)。每个 SSTable 是由一系列的不可修改的文件组成,这些文件在 Cassandra 中被称为 Component。本文是基于 Cassandra 3.11.4 版本介绍的,这个版本生成的 SSTable 由以下 Component 组成: -rw-r--r-- 1 root root 43 May 3 11:55 md-1-big-CompressionInfo.db -rw-r--r-- 1 root root 155 May 3 11:55 md-1-big-Data.db -rw-r--r-- 1 root root 10 May 3 11:55 md-1-big-Digest.crc32 -rw-r--r-- 1 root root 16 May 3 11:55 md-1-big-Filter.db -rw-r--r-- 1 root root 66 May 3 11:55 md-1-big-Index.db -rw-r--r-- 1 root root 4866 May 3 11:55 md-1-big-Statistics.db -rw-r--r-- 1 root root 98 May 3 11:55 md-1-big-Summary.db -rw-r--r-- 1 root root 92 May 3 11:55 md-1-big-TOC.txt 从上面文件名可以看出,所有的 Component 都是由 md-1-big 开头的,每个文件名都是由 version - generation - SSTable Format type - Component name 组成,由 - 分割,每个字符的含义如下: version:这个代表 SSTable 存储格式的版本号,目前最新的 Cassandra 3.11.4 版本为 md,其他有效的版本为 jb、ka、la、lb、ma、mb 以及 mc 等,比如 Cassandra 3.0.8 的版本就是 mc。具体参见 org.apache.cassandra.io.sstable.format.big.BigFormat.BigVersion; generation:代表索引号,每次生成新的 SSTable 时,这个索引号都会递增; SSTable Format type:SSTable格式类型,主要有两种 BIG 和 LEGACY,但是他们的名字都是 big; Component name:代表当前文件存储的信息类型,比如 Data 代表存储的是用户实际写入的数据;Index 代表存储的是索引数据等。当前版本的 Cassandra 组件有 Data.db、Digest.crc32、CompressionInfo.db、Summary.db、Filter.db、Index.db、TOC.txt 以及 Statistics.db。 上面这些组件共同构建 SSTable,每种文件存储不同类型的数据,本文就是来介绍这些文件的作用以及文件之间的关联。 md-1-big-Data.db、md-1-big-Index.db 以及 md-1-big-Summary.db 文件介绍 md-1-big-Data.db 从名字就可以看出,md-1-big-Data.db 是存储用户插入到 Cassandra 对应表中数据的,这个文件的数据存储格式我们在 《Apache Cassandra 数据存储模型》 这篇文章中已经详细介绍了。 为了有个更直观的了解,假设我们有一张名为 iteblog_test 的表,建表语句和导入数据的语句如下: create table iteblog_test(username text, type text, email text, age text, cril text, briday text ,PRIMARY KEY(username, type)); insert into iteblog_test(username, type, email) values ('iteblog', '456', 'wyphao.2007@163.com'); insert into iteblog_test(username, type, email, age) values ('iteblog', '123', 'hadoop@spark.org', '99'); insert into iteblog_test(username, type, briday) values ('cassandra', '246', '2019-04-29'); 我们对这张表执行 flush 操作,这时候底层生成的 md-1-big-Data.db 内容如下: 00000000 87 00 00 00 f2 00 00 07 69 74 65 62 6c 6f 67 7f |........iteblog.| 00000010 ff ff ff 80 00 01 00 fb 3d 04 00 03 31 32 33 1a |........=...123.| 00000020 15 90 86 02 08 02 39 39 08 10 68 61 64 6f 6f 70 |......99..hadoop| 00000030 40 73 70 61 72 6b 2e 6f 72 67 04 00 03 34 35 36 |@spark.org...456| 00000040 18 21 00 03 08 13 77 79 70 68 61 6f 2e 32 30 30 |.!....wyphao.200| 00000050 37 40 31 36 33 2e 63 6f 6d 01 00 09 63 61 73 73 |7@163.com...cass| 00000060 61 6e 64 72 61 58 00 f0 08 32 34 36 12 17 e0 24 |andraX...246...$| 00000070 4d 75 05 08 0a 32 30 31 39 2d 30 34 2d 32 39 01 |Mu...2019-04-29.| 00000080 ac b6 4c 9c |..L.| 00000084 从最后边那列可以看出,md-1-big-Data.db 文件存储了用户插入到 iteblog_test 表的数据,并且同一个 Partition Key 以及静态列(如果有)只会在同一个 SSTable 中存储一份;Partition Key 相同的行对应的 Clustering key 是有序排序的。比如上面 Partition Key 为 iteblog 对应了两行数据,他们在 md-1-big-Data.db 文件是按照字符串的字典顺序升序排序的。关于 md-1-big-Data.db 文件的底层存储格式可以参见 《Apache Cassandra 数据存储模型》 的说明,这里就不再详细介绍了。 md-1-big-Index.db SSTable 对应的 md-1-big-Data.db 可能会很大,为了加快查询速度,Cassandra 对 md-1-big-Data.db 文件中的每个 Partition Key 建立了相关索引数据,这个就是 md-1-big-Index.db 文件的作用了。md-1-big-Index.db 文件存储的内容其实很简单,存储的是 Partition Key 及其在 md-1-big-Data.db 文件中的起始偏移量。 md-1-big-Summary.db 如果 md-1-big-Index.db 文件也很大,也可能会影响查询速度。所以 Cassandra 引入了 md-1-big-Summary.db 文件,对索引文件 md-1-big-Index.db 进行抽样。具体过程为:每隔 128 (这个数由 DEFAULT_MIN_INDEX_INTERVAL 决定,好像不可以配置)个 Partition Key 就将对应的 Partition Key 及其在索引文件中的起始偏移量写入到 Summary.db 文件中去。同时,Summary.db 文件最后面还会存储 md-1-big-Index.db 文件中的起止 Partition Key。 这三个文件的关系 前面已经简单介绍了这三个文件的作用,为了直观的表示这是三个文件的关系,我这里画了一张图,帮助大家理解。注意:上面的 iteblogxxx 代表的是 Partition Key,并且假设这些 Partition Key 对应的 hash token 是升序的。 md-1-big-Statistics.db 包含有关 SSTable 的统计元数据的文件。主要包含当前 SSTable 的: 最大最小 token 的值及对应的 Partition Key; Partitioner,默认为 org.apache.cassandra.dht.Murmur3Partitioner,和 Bloom Filter FP chance,主要用于校验 SSTable,在读取 Data.db 之前会先被读取; SSTable 元数据,比如 SSTable 最大最小的时间戳、最大最小 local deletion time、最大最小的 TTL、SSTable 文件的压缩率、行数、列个数以及最大和最小的 ClustringValues,如果为空这保存为 0 等等 Partition Key 的类型; Clustering Key 的类型; 静态列的名字及类型; 正常类的名字及类型; Clustering Key 的类型; md-1-big-CompressionInfo.db 存储 SSTable 的压缩相关的信息,包括使用的压缩算法的名字(LZ4Compressor,SnappyCompressor 和 DeflateCompressor,SSTable 的压缩算法可以在创建表的时候通过 WITH compression = {'sstable_compression': 'SnappyCompressor' 设置,默认为 LZ4Compressor),压缩算法的配置选项,chunkLength(默认为 65536),未压缩数据的长度,chunk 的个数等相关的信息 md-1-big-Filter.db、md-1-big-Digest.crc32 md-1-big-Filter.db 存储 Partition Key 布隆过滤器相关的信息,这个文件会被加载到内存,在查询的时候可以确定某行数据是否在这个 SSTable 中,减少访问磁盘的次数。 md-1-big-Digest.crc32 这个文件存储的仅仅是数据文件的 CRC 校验码信息。 md-1-big-TOC.txt 这个文件主要存储 SSTable 的 Component 名字,所以我们打开这个文件可以看到如下的内容: Statistics.db Digest.crc32 Data.db Filter.db CompressionInfo.db Summary.db TOC.txt Index.db这个就是 SSTable 对应的八个 Component 。 微信公众号和钉钉群交流 为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
到目前为止,我们在使用 CQL 建表的时候使用到了一些数据类型,比如 text、timeuuid等。本文将介绍 Apache Cassandra 内置及自定义数据类型。和其他语言一样,CQL 也支持一系列灵活的数据类型,包括基本的数据类型,集合类型以及用户自定义数据类(User-Defined Types,UDTs)。下面将介绍 CQL 支持的数据类型。 数字数据类型(Numeric Data Types) CQL 支持的数字数据类型包括整型和浮点型,这些数据类型和 Java 的标准数据类型类似。包括以下几种: int:32位有符号整型,和 Java 中的 int 类似; bigint:64位长整型,和 Java 中的 long 类似; smallint:16位有符号整型,和 Java 中的 short 类似,Apache Cassandra 2.2 开始引入; tinyint:8位有符号整型,和 Java 中的 tinyint 类似,Apache Cassandra 2.2 开始引入; varint:可变精度有符号整数,和 Java 中的 java.math.BigInteger 类似; float:32位 IEEE-754 浮点型,和 Java 中的 float 类似; double:64位 IEEE-754 浮点型,和 Java 中的 double 类似; decimal:可变精度的 decimal,和 Java 中的 java.math.BigDecimal 类似。 文本数据类型(Textual Data Types) CQL 中提供了两种数据类型用于存放文本类型,其中一种我们在前面的几篇文章里面已经使用过了,也就是 text。 text, varchar:UTF-8编码的字符串,这个在 CQL 中使用的比较普遍; ascii:ASCII字符串。 时间和标识符数据类型(Time and Identity Data Types) timestamp:时间可以使用64位有符号的整数表示,但是一般为了可读性,我们会选择支持 ISO 8601 标准的时间戳表示。建议在使用时间戳的时候都指定时区,而不是依赖系统的时区。 date, time:在 Apache Cassandra 2.1 版本之前只支持 timestamp 类型,里面包含了日期和时间;从 Cassandra 2.2 版本开始引入了 date 和 time 时间类型,分别表示日期和时间。和 timestamp 一样,这个也是支持 ISO 8601 标准的。 uuid:通用唯一识别码(universally unique identifier,UUID)是128位数据类型,其实现包含了很多种类型,其中最有名的为 Type 1 和 Type 4。CQL 中的 uuid 实现是 Type 4 UUID,其实现完全是基于随机数的。UUID 的数据类似于 ab7c46ac-c194-4c71-bb03-0f64986f3daa,uuid 类型通常用作代理键,可以单独使用,也可以与其他值组合使用。由于 UUID 的长度有限,因此并不能绝对保证它们是唯一的。我们可以在 CQL 中使用 uuid() 获取 Type 4 UUID。 timeuuid:这个是 Type 1 UUID,它的实现基于计算机的 MAC 地址,系统时间和用于防止重复的序列号。CQL 中提供了 now(), dateOf() 以及 unixTimestampOf() 等函数来操作 timeuuid 数据类型。由于这些简便的函数,timeuuid 的使用频率比 uuid 数据类型多。 集合数据类型 set 这种数据类型可以存储集合数据类型,set 里面的元素存储是无序的,但是 cql 返回的数据是有序的。set 里面可以存储前面介绍的数据类型,也可以是用户自定义数据类型,甚至是其他集合类型。 为了介绍这个类型的使用,我们使用 《Apache Cassandra 快速入门指南(Quick Start)》 文章中的 iteblog_user 表进行说明。假设我们需要在这张表里面添加 email 信息,如下: cqlsh:iteblog_keyspace> ALTER TABLE iteblog_user ADD emails set<text>; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name = 'Wu'; first_name | emails | last_name ------------+--------+----------- Wu | null | Shi (1 rows) cqlsh:iteblog_keyspace> UPDATE iteblog_user SET emails = {'iteblog@iteblog.com'} WHERE first_name = 'Wu'; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name = 'Wu'; first_name | emails | last_name ------------+-------------------------+----------- Wu | {'iteblog@iteblog.com'} | Shi (1 rows) 上面我们给 first_name 为 Wu 的数据添加了 email 信息。如果我们还需要往里面加一些 email 信息,可以使用下面语法进行: cqlsh:iteblog_keyspace> UPDATE iteblog_user SET emails = emails + {'cassandra@iteblog.com' } WHERE first_name = 'Wu'; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name = 'Wu'; first_name | emails | last_name ------------+--------------------------------------------------+----------- Wu | {'cassandra@iteblog.com', 'iteblog@iteblog.com'} | Shi (1 rows) 可见 first_name 为 Wu 的记录已经添加了两条 email 信息了。当然,如果我们需要删除 email,可以使用下面语法进行: cqlsh:iteblog_keyspace> UPDATE iteblog_user SET emails = emails - {'cassandra@iteblog.com'} WHERE first_name = 'Wu'; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name = 'Wu'; first_name | emails | last_name ------------+-------------------------+----------- Wu | {'iteblog@iteblog.com'} | Shi (1 rows) cqlsh:iteblog_keyspace> UPDATE iteblog_user SET emails ={} WHERE first_name = 'Wu'; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name = 'Wu'; first_name | emails | last_name ------------+--------+----------- Wu | null | Shi (1 rows) 上面我们使用 SET emails = emails - {'cassandra@iteblog.com'} 从用户 email 列表里面删除 email,使用 SET emails ={} 清空用户的 email。 list list 包含了有序的列表数据,默认情况下,数据是按照插入顺序保存的。我们还是使用 iteblog_user 进行说明,比如我们想往这张表里面添加电话等信息,操作如下: cqlsh:iteblog_keyspace> ALTER TABLE iteblog_user ADD phone list<text>; cqlsh:iteblog_keyspace> UPDATE iteblog_user SET phone = ['13112345678' ] WHERE first_name = 'Wu'; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name = 'Wu'; first_name | emails | last_name | phone ------------+--------+-----------+----------------- Wu | null | Shi | ['13112345678'] (1 rows) 上面我们给 first_name 为 Wu 的记录添加了电话信息,如果需要再添加电话信息,其操作和 set 添加信息类似,如下: cqlsh:iteblog_keyspace> UPDATE iteblog_user SET phone = phone + ['15511112222' ] WHERE first_name = 'Wu'; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name = 'Wu'; first_name | emails | last_name | phone ------------+--------+-----------+-------------------------------- Wu | null | Shi | ['13112345678', '15511112222'] (1 rows) 可见,新加入的电话号码被放在 list 的后面了。当然,如果我们使用下面的语句,可以往电话号码的前面添加信息: cqlsh:iteblog_keyspace> UPDATE iteblog_user SET phone = ['13344448888' ] + phone WHERE first_name = 'Wu'; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name = 'Wu'; first_name | emails | last_name | phone ------------+--------+-----------+----------------------------------------------- Wu | null | Shi | ['13344448888', '13112345678', '15511112222'] (1 rows) 我们可以使用下标从 list 数据类型里面修改数据: cqlsh:iteblog_keyspace> UPDATE iteblog_user SET phone[1] = '18888888888' WHERE first_name = 'Wu'; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name = 'Wu'; first_name | emails | last_name | phone ------------+--------+-----------+----------------------------------------------- Wu | null | Shi | ['13344448888', '18888888888', '15511112222'] (1 rows) 下标为 1 的元素被修改了。也可以使用下标删除数据: cqlsh:iteblog_keyspace> DELETE phone[2] from iteblog_user WHERE first_name = 'Wu'; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name = 'Wu'; first_name | emails | last_name | phone ------------+--------+-----------+-------------------------------- Wu | null | Shi | ['13344448888', '18888888888'] (1 rows) 当然,删除元素也可以使用 SET phone_numbers = phone_numbers - [ '13344448888' ],这里就不演示了。 map map 数据类型包含了 key/value 键值对。key 和 value 可以是任何类型,除了 counter 类型。使用如下: cqlsh:iteblog_keyspace> ALTER TABLE iteblog_user ADD login_sessions map<timeuuid, int>; cqlsh:iteblog_keyspace> UPDATE iteblog_user SET login_sessions = {now(): 13, now(): 18} WHERE first_name = 'Wu'; cqlsh:iteblog_keyspace> SELECT first_name, login_sessions FROM iteblog_user WHERE first_name = 'Wu'; first_name | login_sessions ------------+-------------------------------------------------------------------------------------- Wu | {1cc61ff0-5f8b-11e9-ac3a-5336cd8118f6: 13, 1cc61ff1-5f8b-11e9-ac3a-5336cd8118f6: 18} (1 rows) 其他简单数据类型 boolean:值只能为 true/false,在 cql 中输入的这两个值无论大小如何写法,其输出都是 True/False; blob:二进制大对象(binary large object)是任意字节数组的术语简称。这个类型在存储媒体或者其他二进制数据类型时很有用,Cassandra 并不会检查其中存储的二进制数据是否有效。Cassandra 中二进制大对象是以十六进制存储的,如果我们想将任意的文本数据类型使用 blob 存储,可以使用 textAsBlob() 函数实现。 inet:这个数据类型可以表示 IPv4 或 IPv6 网络地址。cqlsh 接受用于定义 IPv4 地址的任何合法格式,包括包含十进制,八进制或十六进制值的点或非点式表示。CQL 统一输出为 1.1.1.1 这种 ip 地址形式。 counter:计数器数据类型是64位有符号整数,其值不能直接设置,而只能递增或递减。计数器类型的使用有一些特殊限制,它不能用作主键的一部分;如果使用计数器,则除primary key 列之外的所有列都必须是计数器。 用户自定义数据类型(User-Defined Types) Cassandra 中如果内置的数据类型无法满足我们的需求,我们可以使用自定义数据类型的功能。比如我们想用一个字段存储用户的地址信息,然后我们需要获取地址的邮编、街道等信息,如果使用 text 来存储是不能满足我们的需求的。这时候就可以自己定义数据类型,如下: cqlsh:iteblog_keyspace> CREATE TYPE address ( ... street text, ... city text, ... state text, ... zip_code int); 上面我们定义了 address 数据类型。需要注意的是,Cassandra 中数据类型的定义是 keyspace 范围的,也就是说, address 数据类型只能在 iteblog_keyspace 里面使用。如果我们使用 DESCRIBE KEYSPACE iteblog_keyspace,可以看到 address 数据类型属于 iteblog_keyspace 的一部分。现在我们定义好了 address 数据类型,我们可以使用它了,如下: cqlsh:iteblog_keyspace> ALTER TABLE iteblog_user ADD addresses map<text, frozen<address>>; cqlsh:iteblog_keyspace> UPDATE iteblog_user SET addresses = addresses + {'home': { street: 'shangdi 9', city: 'Beijing', state: 'Beijing', zip_code: 100080} } WHERE first_name = 'Wu'; cqlsh:iteblog_keyspace> SELECT first_name, addresses FROM iteblog_user WHERE first_name = 'Wu'; first_name | addresses ------------+-------------------------------------------------------------------------------------- Wu | {'home': {street: 'shangdi 9', city: 'Beijing', state: 'Beijing', zip_code: 100080}} (1 rows) 可见 我们已经成功的使用了自定义数据类型了。 微信公众号和钉钉群交流 为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
假设我们有这样的场景:我们想在 Cassandra 中使用一张表记录用户基本信息(比如 email、密码等)以及用户状态更新。我们知道,用户的基本信息一般很少会变动,但是状态会经常变化,如果每次状态更新都把用户基本信息都加进去,势必会让费大量的存储空间。为了解决这种问题,Cassandra 引入了 static column。同一个 partition key 中被声明为 static 的列只有一个值的,也就是只存储一份。 定义 static column 在表中将某个列定义为 STATIC 很简单,只需要在列的最后面加上 STATIC 关键字,具体如下: CREATE TABLE "iteblog_users_with_status_updates" ( "username" text, "id" timeuuid, "email" text STATIC, "encrypted_password" blob STATIC, "body" text, PRIMARY KEY ("username", "id") ); iteblog_users_with_status_updates 表中我们将 email 和 encrypted_password 两个字段设置为 STATIC 了,这意味着同一个 username 只会有一个 email 和 encrypted_password 。 注意,不是任何表都支持给列加上 STATIC 关键字的,静态列有以下限制。 1、如果表没有定义 Clustering columns(又称 Clustering key),这种情况是不能添加静态列的。如下: cqlsh:iteblog_keyspace> CREATE TABLE "iteblog_users_with_status_updates_invalid" ( ... "username" text, ... "id" timeuuid, ... "email" text STATIC, ... "encrypted_password" blob STATIC, ... "body" text, ... PRIMARY KEY ("username") ... ); InvalidRequest: Error from server: code=2200 [Invalid query] message="Static columns are only useful (and thus allowed) if the table has at least one clustering column" iteblog_users_with_status_updates_invalid 表只有 PRIMARY KEY,没有定义 clustering column,不支持创建 Static columns。这是因为静态列在同一个 partition key 存在多行的情况下才能达到最优情况,而且行数越多效果也好。但是如果没有定义 clustering column,相同 PRIMARY KEY 的数据在同一个分区里面只存在一行数据,本质上就是静态的,所以没必要支持静态列。 2、如果建表的时候指定了 COMPACT STORAGE,这时候也不允许存在静态列: cqlsh:iteblog_keyspace> CREATE TABLE "iteblog_users_with_status_updates_invalid" ( ... "username" text, ... "id" timeuuid, ... "email" text STATIC, ... "encrypted_password" blob STATIC, ... "body" text, ... PRIMARY KEY ("username", "id") ... )WITH COMPACT STORAGE; InvalidRequest: Error from server: code=2200 [Invalid query] message="Static columns are not supported in COMPACT STORAGE tables" 3、如果列是 partition key/Clustering columns 的一部分,那么这个列不能说明为静态列: cqlsh:iteblog_keyspace> CREATE TABLE "iteblog_users_with_status_updates_invalid" ( ... "username" text, ... "id" timeuuid STATIC, ... "email" text STATIC, ... "encrypted_password" blob STATIC, ... "body" text, ... PRIMARY KEY ("username", "id") ... ); InvalidRequest: Error from server: code=2200 [Invalid query] message="Static column id cannot be part of the PRIMARY KEY" cqlsh:iteblog_keyspace> CREATE TABLE "iteblog_users_with_status_updates_invalid" ( ... "username" text, ... "id" timeuuid, ... "email" text STATIC, ... "encrypted_password" blob STATIC, ... "body" text, ... PRIMARY KEY (("username", "id"), email) ... ); InvalidRequest: Error from server: code=2200 [Invalid query] message="Static column email cannot be part of the PRIMARY KEY" 给静态列的表插入数据 含有静态列的表插入数据和正常表类似,比如我们现在往 iteblog_users_with_status_updates 导入数据: cqlsh:iteblog_keyspace> INSERT INTO "iteblog_users_with_status_updates" ... ("username", "id", "email", "encrypted_password", "body") ... VALUES ( ... 'iteblog', ... NOW(), ... 'iteblog_hadoop@iteblog.com', ... 0x877E8C36EFA827DBD4CAFBC92DD90D76, ... 'Learning Cassandra!' ... ); cqlsh:iteblog_keyspace> select username, email, encrypted_password, body from iteblog_users_with_status_updates; username | email | encrypted_password | body ----------+----------------------------+------------------------------------+--------------------- iteblog | iteblog_hadoop@iteblog.com | 0x877e8c36efa827dbd4cafbc92dd90d76 | Learning Cassandra! (1 rows) 我们成功的插入一条数据了。但是上面的插入语句做了两件事: 所有 username 为 iteblog 数据中的 email 和 encrypted_password 都被设置为 iteblog_hadoop@iteblog.com 和 0x877e8c36efa827dbd4cafbc92dd90d76 了。 在 iteblog 所在的分区中新增了 body 内容为 Learning Cassandra! 的记录。现在我们再往表中插入一条数据,如下: cqlsh:iteblog_keyspace> INSERT INTO "iteblog_users_with_status_updates" ... ("username", "id", "body") ... VALUES ('iteblog', NOW(), 'I love Cassandra!'); cqlsh:iteblog_keyspace> select username, email, encrypted_password, body from iteblog_users_with_status_updates; username | email | encrypted_password | body ----------+----------------------------+------------------------------------+--------------------- iteblog | iteblog_hadoop@iteblog.com | 0x877e8c36efa827dbd4cafbc92dd90d76 | Learning Cassandra! iteblog | iteblog_hadoop@iteblog.com | 0x877e8c36efa827dbd4cafbc92dd90d76 | I love Cassandra! (2 rows) cqlsh:iteblog_keyspace> 可以看到,这次插入数据的时候,我们并没有指定 email 和 encrypted_password,但是从查询结果可以看出,新增加的行 email 和 encrypted_password 的值和之前是一样的! 现在由于某些原因,用户修改了自己的 email,我们来看看会发生什么事: cqlsh:iteblog_keyspace> UPDATE iteblog_users_with_status_updates SET email = 'iteblog@iteblog.com' ... WHERE username = 'iteblog'; cqlsh:iteblog_keyspace> select username, email, encrypted_password, body from iteblog_users_with_status_updates; username | email | encrypted_password | body ----------+---------------------+------------------------------------+--------------------- iteblog | iteblog@iteblog.com | 0x877e8c36efa827dbd4cafbc92dd90d76 | Learning Cassandra! iteblog | iteblog@iteblog.com | 0x877e8c36efa827dbd4cafbc92dd90d76 | I love Cassandra! (2 rows) 从上面查询这输出的结果可以看出, username 为 iteblog 的 email 全部修改成一样的了!这就是静态列的强大之处。 现在表中存在了用户的邮箱和密码等信息,如果我们前端做了个页面支持用户修改自己的邮箱和密码,这时候我们的后台系统需要获取到现有的邮箱和密码,具体如下: cqlsh:iteblog_keyspace> SELECT "username", "email", "encrypted_password" ... FROM "iteblog_users_with_status_updates" ... WHERE "username" = 'iteblog'; username | email | encrypted_password ----------+---------------------+------------------------------------ iteblog | iteblog@iteblog.com | 0x877e8c36efa827dbd4cafbc92dd90d76 iteblog | iteblog@iteblog.com | 0x877e8c36efa827dbd4cafbc92dd90d76 (2 rows) 可以看出,表中有多少行 username 为 iteblog 的数据将会输出多少行邮箱和密码,这肯定不是我们想要的。这时候我们可以在查询的时候加上 DISTINCT 关键字,如下: cqlsh:iteblog_keyspace> SELECT DISTINCT "username", "email", "encrypted_password" ... FROM "iteblog_users_with_status_updates" ... WHERE "username" = 'iteblog'; username | email | encrypted_password ----------+---------------------+------------------------------------ iteblog | iteblog@iteblog.com | 0x877e8c36efa827dbd4cafbc92dd90d76 (1 rows) 这样不管表中有多少行 username 为 iteblog 的数据,最终都会显示一行数据。注意,虽然我们加了 DISTINCT 关键字,但是 Cassandra 并不是将 username 为 iteblog 的数据全部拿出来,然后再去重的,因为静态列本来在底层就存储了一份,所以没必要去重。 静态列的意义 到这里,我们已经了解了 Cassandra 中静态列的创建、使用等。那静态列有什么意义呢?因为 Cassandra 中是不支持 join 的,静态列相当于把两张表进行了 join 操作。 那什么时候建议使用静态列呢?如果两张表关联度很大,而且我们经常需要同时查询这两张表,那这时候就可以考虑使用静态列了。 微信公众号和钉钉群交流 为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
在前面文章里面我们使用了下面语句创建了一张名为 iteblog_user 的表: cqlsh> use iteblog_keyspace; cqlsh:iteblog_keyspace> CREATE TABLE iteblog_user (first_name text , last_name text, PRIMARY KEY (first_name)) ; 建表语句里面有个 PRIMARY KEY 关键字,我们在初次使用 Apache Cassandra 的时候可能见过诸如 Composite Key、Partition key 以及 Clustering key,这么多种 key 它和上面的 PRIMARY KEY 有什么关系呢?看看本文你就明白了。 Single column Primary Key 在 Cassandra 里面,Primary Key 可以由一列或多列组成,用于从表中检索数据,如果 Primary Key 由一列组成,那么称为 Single column Primary Key,如下语句 cqlsh> use iteblog_keyspace; cqlsh:iteblog_keyspace> CREATE TABLE iteblog_user (first_name text , last_name text, PRIMARY KEY (first_name)) ; 这种情况 first_name 就是 Single column Primary Key,我们在检索数据的时候需要指定 Primary Key: cqlsh:iteblog_keyspace> select * from iteblog_user; first_name | last_name ------------+----------- Wu | Shi Zhang | San Li | Si (3 rows) cqlsh:iteblog_keyspace> select * from iteblog_user where first_name = 'Wu'; first_name | last_name ------------+----------- Wu | Shi (1 rows) cqlsh:iteblog_keyspace> select * from iteblog_user where last_name = 'Si'; InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING" 可以看出,如果查询只指定 last_name,是无法运行上面的查询的。 Composite Primary Key 如果 Primary Key 由多列组成,那么这种情况称为 Compound Primary Key 或 Composite Primary Key,如下: cqlsh:iteblog_keyspace> CREATE TABLE iteblog_user_composite (first_name text , last_name text, PRIMARY KEY (first_name, last_name)) ; 其中 first_name 称为 Partition key,last_name 称为 Clustering key(也可以称为 Clustering column)。在这种情况下,下面查询的前三条都是合法的,最后一条是非法的。 cqlsh:iteblog_keyspace> select * from iteblog_user_composite; cqlsh:iteblog_keyspace> select * from iteblog_user_composite where first_name = 'iteblog'; cqlsh:iteblog_keyspace> select * from iteblog_user_composite where first_name = 'iteblog' and last_name = 'hadoop'; //非法查询 cqlsh:iteblog_keyspace> select * from iteblog_user_composite where last_name = 'hadoop'; Partition key 和 Clustering key 也可以由多个字段组成,如果 Partition key 由多个字段组成,称之为 Composite partition key: create table iteblog_multiple ( k_part_one text, k_part_two int, k_clust_one text, k_clust_two int data text, PRIMARY KEY((k_part_one, k_part_two), k_clust_one, k_clust_two) ); 上面 k_part_one 和 k_part_two 共同组成 Partition key,k_clust_one 和 k_clust_two 共同组成 Clustering key。这种情况下有效的查询包括 select * from iteblog_multiple; select * from iteblog_multiple where k_part_one = 'iteblog' and k_part_two = 'hadoop'; select * from iteblog_multiple where k_part_one = 'iteblog' and k_part_two = 'hadoop' and k_clust_one = 'hbase'; select * from iteblog_multiple where k_part_one = 'iteblog' and k_part_two = 'hadoop' and k_clust_one = 'hbase' and k_clust_two = 'spark'; Partition key & Clustering key & Primary Key 作用 在 Cassandra 里面这么多种 key 都有什么作用呢?总结起来主要如下: Partition Key:将数据分散到集群的 node 上 Primary Key:在 Single column Primary Key 情况下作用和 Partition Key 一样;在 Composite Primary Key 情况下,组合 Partition key 字段决定数据的分发的节点; Clustering Key:决定同一个分区内相同 Partition Key 数据的排序,默认为升序,我们可以在建表语句里面手动设置排序的方式(DESC 或 ASC) 示例 这里主要介绍下 Composite Primary Key 中 Clustering Key 是如何决定同一个分区内相同 Partition Key 数据的排序。比如我们的建表语句如下: cqlsh:iteblog_keyspace> CREATE TABLE iteblog_user_composite ( ... name text, ... id timeuuid, ... content text, ... PRIMARY KEY (name, id)); cqlsh:iteblog_keyspace> 我们往表里面插入以下的数据 INSERT INTO iteblog_user_composite(name, id, content) VALUES ('iteblog', NOW(), 'I am www.iteblog.com'); INSERT INTO iteblog_user_composite(name, id, content) VALUES ('iteblog_hadoop', NOW(), 'I am iteblog_hadoop'); INSERT INTO iteblog_user_composite(name, id, content) VALUES ('iteblog', NOW(), 'Hello www.iteblog.com'); INSERT INTO iteblog_user_composite(name, id, content) VALUES ('iteblog_hadoop', NOW(), 'Hello iteblog_hadoop'); INSERT INTO iteblog_user_composite(name, id, content) VALUES ('iteblog', NOW(), 'Welcome to www.iteblog.com'); INSERT INTO iteblog_user_composite(name, id, content) VALUES ('iteblog_hadoop', NOW(), 'Welcome to iteblog_hadoop'); 我们使用下面的查询语句将所有数据查询出来: cqlsh:iteblog_keyspace> select name, content, id, UNIXTIMESTAMPOF(id) as time from iteblog_user_composite; name | content | id | time ----------------+----------------------------+--------------------------------------+--------------- iteblog_hadoop | I am iteblog_hadoop | 402eace0-5ad7-11e9-a4d4-f5d538b6a60b | 1554821615278 iteblog_hadoop | Hello iteblog_hadoop | 40303380-5ad7-11e9-a4d4-f5d538b6a60b | 1554821615288 iteblog_hadoop | Welcome to iteblog_hadoop | 40a00c50-5ad7-11e9-a4d4-f5d538b6a60b | 1554821616021 iteblog | I am www.iteblog.com | 402de990-5ad7-11e9-a4d4-f5d538b6a60b | 1554821615273 iteblog | Hello www.iteblog.com | 402f4920-5ad7-11e9-a4d4-f5d538b6a60b | 1554821615282 iteblog | Welcome to www.iteblog.com | 4030cfc0-5ad7-11e9-a4d4-f5d538b6a60b | 1554821615292 (6 rows) 从上面的查询可以看出,Partition key 为 iteblog_hadoop 和 iteblog 的数据都是按照 id 进行升序排序的,数据直接是在底层存储就排好序的,查询的时候并不需要做额外的配置,这种排列方式有利于那种只查询最近更新的几条数据的需求。 微信公众号和钉钉群交流 为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
我们在这篇文章简单介绍了 Apache Cassandra 是什么,以及有什么值得关注的特性。本文将简单介绍 Apache Cassandra 的安装以及简单使用,可以帮助大家快速了解 Apache Cassandra。 我们到 Apache Cassandra 的官方网站下载最新版本的 Cassandra,在本文写作时最新版本的 Cassandra 为 3.11.4。Apache Cassandra 可以在 Linux、Unix、Mac OS 以及 Windows 上进行安装,为了简便起见,本文以 CentOS 为例进行介绍。 下载、安装并启动 Cassandra 因为本文只是简单介绍 Apache Cassandra 的使用,所以本文仅安装单机版的 Cassandra,在生产环境下应该部署成分布式模式。可以使用下面的命令下载和解压相关的压缩文件: $ wget http://mirror.bit.edu.cn/apache/cassandra/3.11.4/apache-cassandra-3.11.4-bin.tar.gz $ tar -zxf apache-cassandra-3.11.4-bin.tar.gz $ cd apache-cassandra-3.11.4 在 apache-cassandra-3.11.4 目录下有很多文件: [iteblog@www.iteblog.com apache-cassandra-3.11.4]# ll total 528 drwxr-xr-x 2 iteblog iteblog 4096 Apr 2 21:12 bin -rw-r--r-- 1 iteblog iteblog 4832 Feb 3 06:09 CASSANDRA-14092.txt -rw-r--r-- 1 iteblog iteblog 366951 Feb 3 06:09 CHANGES.txt drwxr-xr-x 3 iteblog iteblog 4096 Apr 2 21:12 conf drwxr-xr-x 4 iteblog iteblog 4096 Apr 2 21:12 doc drwxr-xr-x 2 iteblog iteblog 4096 Apr 2 21:12 interface drwxr-xr-x 3 iteblog iteblog 4096 Apr 2 21:12 javadoc drwxr-xr-x 4 iteblog iteblog 4096 Apr 2 21:12 lib -rw-r--r-- 1 iteblog iteblog 11609 Feb 3 06:09 LICENSE.txt -rw-r--r-- 1 iteblog iteblog 112586 Feb 3 06:09 NEWS.txt -rw-r--r-- 1 iteblog iteblog 2811 Feb 3 06:09 NOTICE.txt drwxr-xr-x 3 iteblog iteblog 4096 Apr 2 21:12 pylib drwxr-xr-x 4 iteblog iteblog 4096 Apr 2 21:12 tools 各个文件或目录介绍如下: bin:这个目录下包含了启动 Cassandra 以及客户端相关操作的可执行文件,包括 query language shell(cqlsh)以及命令行界面(CLI)等客户端。同时还包含运行 nodetool 的相关脚本,操作 SSTables 的工具等等。 conf:这个目录下面包含了 Cassandra 的配置文件。必须包含的配置文件包括:assandra.yaml 以及 logback.xml,这两个文件分别是运行 Cassandra 必须包含的配置文件以及日志相关配置文件。同时还包含 Cassandra 网络拓扑配置文件等。 doc:这个目录包含 CQL 相关的 html 文档。 interface:这个文件夹下面只包含一个名为 cassandra.thrift 的文件。这个文件定义了基于 Thrift 语法的 RPC API,这个 Thrift 主要用于在 Java, C++, PHP, Ruby, Python, Perl, 以及 C# 等语言中创建相关客户端,但是在 CQL 出现之后,Thrift API 在 Cassandra 3.2 版本开始标记为 deprecated,并且会在 Cassandra 4.0 版本删除。 javadoc:这个文件夹包含使用 JavaDoc 工具生成的 html 文档。 lib:这个目录包含 Cassandra 运行时需要的所有外部库。 pylib:这个目录包含 cqlsh 运行时需要使用的 Python 库。 tools:这个目录包含用于维护 Cassandra 节点的相关工具。 NEWS.txt:这个文件包含当前及之前版本的 release notes 相关信息。 CHANGES.txt:这个文件主要包含一些 bug fixes 信息。 启动 Cassandra 上面已经简单介绍了 Cassandra 发行包里面的一些文件和目录用途。因为我们主要简单介绍 Cassandra 的使用,所以我们使用默认的配置。下面我们来启动 Cassandra 服务,具体如下: [iteblog@www.iteblog.com apache-cassandra-3.11.4]# bin/cassandra 运行上面命令会在命令行里面输出一堆的日志,但是我们如何判断 cassandra 服务已经启动了呢?答案是使用 nodetool 工具,如下: [iteblog@www.iteblog.com apache-cassandra-3.11.4]# bin/nodetool status Datacenter: datacenter1 ======================= Status=Up/Down |/ State=Normal/Leaving/Joining/Moving -- Address Load Tokens Owns (effective) Host ID Rack UN 127.0.0.1 160.88 KiB 256 100.0% 49f4470f-396b-4d50-bcd6-3da2d9370167 rack1 如果我们看到上面的信息,那么我们的测试服务已经正常启动了。而且会在 apache-cassandra-3.11.4 目录下生成 data 和 logs 两个目录。 使用 CQL Shell 上面我们已经启动了 Cassandra 服务,我们可以使用 CQL Shell 来进行一些操作。从名字就可以看出,CQL(Cassandra Query Language) 其实和我们熟悉的 SQL 很类似,我们可以通过它使用类似 SQL 的语言来和 Cassandra 进行交互。需要注意的是,CQL 和 SQL 是不兼容的,CQL 缺少 SQL 的一些关键功能,比如 JOIN 等,这个在 Cassandra 下不能实现;同时,CQL 也不是 SQL 的子集。为了使用 CQL ,可以使用下面命令: [iteblog@www.iteblog.com apache-cassandra-3.11.4]# bin/cqlsh Connected to Test Cluster at 127.0.0.1:9042. [cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4] Use HELP for help. cqlsh> 在启动 cqlsh 的时候我们并没有指定需要连接的节点以及端口,这种情况下 cqlsh 会自动探测本机及相关端口,因为我们在前面已经启动了 Cassandra 服务,所以 cqlsh 可以正确连接到这个集群。从上面的命令可以看出 cqlsh 连接到名为 Test Cluster 的集群,这是由 conf/cassandra.yaml 文件里面的 cluster_name 参数决定的,默认值为 Test Cluster。 当然,我们也可以在启动 cqlsh 的时候指定节点和相应的端口,如下: [iteblog@www.iteblog.com apache-cassandra-3.11.4]# bin/cqlsh localhost 9042 上面的命令执行效果和不指定一样。我们也可以将节点和端口相关的信息保存到环境变量 $CQLSH_HOST 和 $CQLSH_PORT 里面,这个在我们需要经常连接到特定节点的情况下非常有用。更多关于 cqlsh 命令支持的参数可以使用 bin/cqlsh -help。 基本的 cqlsh 命令 cqlsh 支持很多操作 Cassandra 的基本命令,我们可以在 cqlsh 里面使用 HELP 或 ? 命令查看所有支持的命令: cqlsh> HELP Documented shell commands: =========================== CAPTURE CLS COPY DESCRIBE EXPAND LOGIN SERIAL SOURCE UNICODE CLEAR CONSISTENCY DESC EXIT HELP PAGING SHOW TRACING CQL help topics: ================ AGGREGATES CREATE_KEYSPACE DROP_TRIGGER TEXT ALTER_KEYSPACE CREATE_MATERIALIZED_VIEW DROP_TYPE TIME ALTER_MATERIALIZED_VIEW CREATE_ROLE DROP_USER TIMESTAMP ALTER_TABLE CREATE_TABLE FUNCTIONS TRUNCATE ALTER_TYPE CREATE_TRIGGER GRANT TYPES ALTER_USER CREATE_TYPE INSERT UPDATE APPLY CREATE_USER INSERT_JSON USE ASCII DATE INT UUID BATCH DELETE JSON BEGIN DROP_AGGREGATE KEYWORDS BLOB DROP_COLUMNFAMILY LIST_PERMISSIONS BOOLEAN DROP_FUNCTION LIST_ROLES COUNTER DROP_INDEX LIST_USERS CREATE_AGGREGATE DROP_KEYSPACE PERMISSIONS CREATE_COLUMNFAMILY DROP_MATERIALIZED_VIEW REVOKE CREATE_FUNCTION DROP_ROLE SELECT CREATE_INDEX DROP_TABLE SELECT_JSON 如果需要查看特定命令的帮助,可以使用 HELP 。需要注意的是,很多 cqlsh 命令并不接收相关的参数,当我们使用这些命令时,其输出为当前的设置,比如 CONSISTENCY, EXPAND 和 PAGING 命令,如下: cqlsh> CONSISTENCY Current consistency level is ONE. cqlsh> EXPAND Expanded output is currently disabled. Use EXPAND ON to enable. cqlsh> PAGING Query paging is currently enabled. Use PAGING OFF to disable Page size: 100 在 cqlsh 里面查看环境变量 我们可以使用 DESCRIBE 命令,来查看一些集群的一些环境变量的值。下面命令查看当前集群的情况 cqlsh> DESCRIBE CLUSTER; Cluster: Test Cluster Partitioner: Murmur3Partitioner DESCRIBE CLUSTER 显示了集群的名字以及采用的 Partitioner ,Cassandra 1.2 版本开始默认为 Murmur3Partitioner,其他可选的 Partitioner 有 RandomPartitioner(Cassandra 1.2 版本之前默认的 Partitioner)、OrderPreservingPartitioner 以及 ByteOrderedPartitioner 等。 如果我们需要查看集群里面可用的 keyspaces,可以使用下面命令: cqlsh> DESCRIBE KEYSPACES; system_traces system_schema system_auth system system_distributed 上面命令将系统自带的 keyspaces 都显示出来了,如果我们自己创建了 keyspaces,也会在这里面显示。 可以使用下面命令查看 cqlsh、Cassandra 以及 protocol 的版本: cqlsh> SHOW VERSION; [cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4] 通过 cqlsh 创建 keyspace Cassandra 里面的 keyspace 和关系型数据库里面的 database 概念类似的,一个 keyspace 可以包含一个或多个 tables 或 column families。当我们启动 cqlsh 时没有指定 keyspace,那么命令提示符为 cqlsh>,我们可以使用 CREATE KEYSPACE 命令来创建 keyspace,具体如下: cqlsh> CREATE KEYSPACE iteblog_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; cqlsh> 上面命令创建了名为 iteblog_keyspace 的 keyspace;并且采用 SimpleStrategy 进行副本复制,因为我们这个测试集群只有单个节点,所以这里设置的副本因子(replication factor)为 1。如果是生产环境,千万别把副本因子设置为 1,比较常见的副本因子为 3。其他可选的副本复制策略出了 SimpleStrategy 还有 NetworkTopologyStrategy 和 OldNetworkTopologyStrategy,具体什么含义这里还不深入介绍,后面会起单独一篇文章进行详细介绍。 创建完 keyspace 之后,我们可以使用 DESCRIBE KEYSPACE 命令来查看这个 keyspace: cqlsh> DESCRIBE KEYSPACE iteblog_keyspace; CREATE KEYSPACE iteblog_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true; 现在我们可以使用 USE 命令来切换到这个 keyspace : cqlsh> USE iteblog_keyspace; cqlsh:iteblog_keyspace> 从上面的输出可以看出,keyspace 已经切换到 iteblog_keyspace 了。 通过 cqlsh 创建表 接下来,我们通过 cqlsh 来创建一张表: cqlsh> use iteblog_keyspace; cqlsh:iteblog_keyspace> CREATE TABLE iteblog_user (first_name text , last_name text, PRIMARY KEY (first_name)) ; 通过上面的命令,我们在 iteblog_keyspace 下面创建了一张名为 iteblog_user 的表。其中包含了 first_name 和 last_name 两个字段,类型都是 text,并且 first_name 是这张表的 PRIMARY KEY。当然,我们也可以通过下面命令在 iteblog_keyspace 里面建表: cqlsh> CREATE TABLE iteblog_keyspace.iteblog_user(first_name text , last_name text, PRIMARY KEY (first_name)) ; 效果和上面一样。我们可以使用 DESCRIBE TABLE 命令查看建表语句: cqlsh:iteblog_keyspace> DESCRIBE TABLE iteblog_user; CREATE TABLE iteblog_keyspace.iteblog_user ( first_name text PRIMARY KEY, last_name text ) WITH bloom_filter_fp_chance = 0.01 AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'} AND comment = '' AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'} AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'} AND crc_check_chance = 1.0 AND dclocal_read_repair_chance = 0.1 AND default_time_to_live = 0 AND gc_grace_seconds = 864000 AND max_index_interval = 2048 AND memtable_flush_period_in_ms = 0 AND min_index_interval = 128 AND read_repair_chance = 0.0 AND speculative_retry = '99PERCENTILE'; cqlsh:iteblog_keyspace> DESCRIBE TABLE 命令将建表语句以格式化的形式显示出来。除了我们制定的设置,还包含了许多默认的设置,这里我们先不纠结这些设置的含义。 通过 cqlsh 往表里面读写数据 到现在,我们已经创建好 keyspace 和 table 了,我们可以往表里面插入一些数据,看下一切是不是正常。 cqlsh:iteblog_keyspace> INSERT INTO iteblog_user (first_name, last_name) VALUES ('iteblog', 'Hadoop'); cqlsh:iteblog_keyspace> INSERT INTO iteblog_user (first_name, last_name) VALUES ('Zhang', 'San'); cqlsh:iteblog_keyspace> INSERT INTO iteblog_user (first_name) VALUES ('Wu'); 上面语句我们往 iteblog_user 表里面插入三条数据,其中最后一条数据只指定了 key,last_name 没有值。现在我们可以使用 SELECT COUNT 语句查看上面的数据是否插入成功 cqlsh:iteblog_keyspace> SELECT COUNT(*) FROM iteblog_user; count ------- 3 (1 rows) Warnings : Aggregation query used without partition key 可以看出 iteblog_user 表里面已经有3条数据了。我们可以使用下面命令将这条数据查询出来: cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user; first_name | last_name ------------+----------- iteblog | Hadoop Wu | null Zhang | San (3 rows) cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name='iteblog'; first_name | last_name ------------+----------- iteblog | Hadoop (1 rows) cqlsh:iteblog_keyspace> 可以看出,first_name 为 Wu 对应的 last_name 没数据直接显示 null 了,在 Cassandra 里面的这个代表对应的列没有数据,在底层存储是不占用空间的,而在常见的关系型数据库里面是占一定空间的。 我们可以使用 DELETE 命令删除一些列,比如我们删除 last_name 列, cqlsh:iteblog_keyspace> DELETE last_name FROM iteblog_user WHERE first_name='iteblog'; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name='iteblog'; first_name | last_name ------------+----------- iteblog | null (1 rows) cqlsh:iteblog_keyspace> 可以看出 last_name 列已经成功被删除了。 我们也可以删除一整行数据,如下: cqlsh:iteblog_keyspace> DELETE FROM iteblog_user WHERE first_name='iteblog'; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user WHERE first_name='iteblog'; first_name | last_name ------------+----------- (0 rows) cqlsh:iteblog_keyspace> 可以看到 key 为 iteblog 的数据已经成功被删除了。 insert/update 相当于 upsert 如果我们插入数据对应的 key 在 Cassandra 已经存在了,这时候 Cassandra 并不会在原来数据位置上修改数据,而是会新写入一份数据,旧的数据会被 Cassandra 删除。 cqlsh:iteblog_keyspace> INSERT INTO iteblog_user (first_name, last_name) VALUES ('Wu', 'Shi'); cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user; first_name | last_name ------------+----------- Wu | Shi Zhang | San (2 rows) 可以看见,key 为 Wu 的数据对应的 last_name 已经有值了。 如果我们使用 UPDATE 命令往表里面更新不存在的数据会发生什么呢?答案是会插入新的数据。 cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user; first_name | last_name ------------+----------- Wu | Shi Zhang | San (2 rows) cqlsh:iteblog_keyspace> UPDATE iteblog_user SET last_name = 'Si' WHERE first_name = 'Li'; cqlsh:iteblog_keyspace> SELECT * FROM iteblog_user; first_name | last_name ------------+----------- Wu | Shi Zhang | San Li | Si (3 rows) cqlsh:iteblog_keyspace> 可见,key 为 Li 的数据被插入到表中了,更新之前不存在。 清空或删除表 如果我们确实想清空一张表,我们也可以使用 TRUNCATE 命令;使用 DROP TABLE 命令可以删除一张表。 cqlsh:iteblog_keyspace> TRUNCATE iteblog_user; cqlsh:iteblog_keyspace> DROP TABLE iteblog_user; 到目前为止,我们已经学会了 cqlsh 的一些简单的命令。 微信公众号和钉钉群交流 为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
为了营造一个开放的 Cassandra 技术交流环境,和国内对 Cassandra 感兴趣的开发者进行交流,我们准备在近期写一个 Apache Cassandra 从入门到精通的系列文章,具体文章目录如下(如果大家有补充的可以在下面留言),可以点进去的说明是写完的。 1、基础篇 1.1 Cassandra简单介绍 1.2 为什么选择Cassandra进行大数据管理? 1.3 Cassandra quick start 1.4 Apache Cassandra Composite KeyPartition keyClustering key 介绍 1.5 Apache Cassandra static column 介绍与实战 1.6 多语言客户端 1.7 Cassandra 数据类型 1.8 Cassandra 配置介绍 2、进阶篇 2.1 架构、基本原理、核心技术等 2.1.1 一致性哈希 2.1.2 Gossip 2.1.3 全球容灾 2.1.4 xxxx 2.2 读写io流程 2.3 数据存储模型 2.4 数据 Compaction 2.5 数据 Flush 流程 2.6 二级索引 & sasi index & 物化视图 2.7 Cassandra 启动流程 2.8 基本运维指令和原理介绍 2.8.1 备份恢复 2.8.1.1 incremental backups 介绍 3、案例实战 3.1 Cassandra 运维 3.2 Cassandra 监控 3.3 性能调优 3.4 Dynamo 迁移 Cassandra 实战 3.5 Cassandra 业界案例 3.5.1 Apache Cassandra 在 Facebook 的应用 3.5.2 Discord 公司如何使用 Cassandra 存储上亿条线上数据 注意,本篇文章可以转载,但必须保留原文地址,以及上面各小结原文地址。 微信公众号和钉钉群交流 为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
Apache Cassandra 是一个开源的、分布式、无中心、弹性可扩展、高可用、容错、一致性可调、面向行的数据库,它基于 Amazon Dynamo 的分布式设计和 Google Bigtable 的数据模型,由 Facebook 创建,在一些最流行的网站中得到应用。 为什么会诞生 Apache Cassandra 2007 年 Facebook 为了解决消息收件箱搜索问题( Inbox Search problem)而开始设计 Cassandra 项目。 当时 Facebook 遇到了传统的方法难以解决的超大数据量存储可扩展性问题。具体来说,项目团队需要处理大量的消息副本、消息的反向索引等不同形式的数据,需要处理很多随机读和并发随机写操作。 该团队由 Jeff Hammerbacher 领导,核心工程师包括 Avinash Lakshman,Karthik Ranganathan 和搜索团队的 Prashant Malik 。2008年7月 Cassandra 的代码被作为开源项目发布到 Google Code。虽然代码在2008年作为 Google Code 的一个项目,但是这段时间基本上只有 Facebook 工程师来更新代码,基本上没有形成社区。所以在2009年3月,Cassandra 被转移到 Apache 孵化器项目,并在2010年2月17日,它被投票成为一个顶级项目。 在 Apache Cassandra Wiki 上,您可以找到 committers 列表,其中许多人自 2010/2011 年以来就一直参与该项目。 committers 主要来自 Twitter,LinkedIn,Apple,其中还包括了许多独立开发者。 Cassandra 的设计很大程度受 Amazon Dynamo 的影响,具体可参见《Dynamo: Amazon's Highly Available Key-value Store》。2010年由 Facebook 的 Lakshman 和 Malik 在 ACM 首次发表了 Cassandra 的论文 《Cassandra: a decentralized structured storage system》,向全世界介绍了 Cassandra。 随着商界对 Cassandra 的兴趣增加,对 Cassandra 的生产支持变得越来越明显。2010年4月 Cassandra 的 Apache 项目主席 Jonathan Ellis 和其同事 Matt Pfeil 成立了一家名为 DataStax 公司(最初名为 Riptano)。DataStax 雇佣了多名 Cassandra Committer,为 Cassandra 项目提供了相关支持,并引领其发展。 Cassandra 的名字由来在希腊神话里,Cassandra 是特洛伊国王 Priam 和 Hecuba 王后的女儿。Cassandra 非常美丽,以至于阿波罗给了她预见未来的能力。但当她拒绝阿波罗的爱慕的时候,遭到他的诅咒。从此,她依然可以精确地预知未来,但是不会有任何人相信她。Cassandra 预知了她的特洛伊城终将覆灭,但却无力阻止这一悲剧。Cassandra 分布式数据库就据此命名。 Apache Cassandra 特性 分布式和去中心化(Distributed and Decentralized) Cassandra 是分布式的,这意味着它可以运行在多台机器上,并呈现给用户一个一致的整体。事实上,在一个节点上运行 Cassandra 是没啥用的,虽然我们可以这么做,并且这可以帮助我们了解它的工作机制,但是你很快就会意识到,需要多个节点才能真正了解 Cassandra 的强大之处。它的很多设计和实现让系统不仅可以在多个节点上运行,更为多机架部署进行了优化,甚至一个 Cassandra 集群可以运行在分散于世界各地的数据中心上。你可以放心地将数据写到集群的任意一台机器上,Cassandra 都会收到数据。 对于很多存储系统(比如 MySQL, Bigtable),一旦你开始扩展它,就需要把某些节点设为主节点,其他则作为从节点。但 Cassandra 是无中心的,也就是说每个节点都是一样的。与主从结构相反,Cassandra 的协议是 P2P 的,并使用 gossip 来维护存活或死亡节点的列表。关于 gossip 可以参见《分布式原理:一文了解 Gossip 协议》。 去中心化这一事实意味着 Cassandra 不会存在单点失效。Cassandra 集群中的所有节点的功能都完全一样, 所以不存在一个特殊的主机作为主节点来承担协调任务。有时这被叫做服务器对称(server symmetry)。 综上所述,Cassandra 是分布式、无中心的,它不会有单点失效,所以支持高可用性。 弹性可扩展(Elastic Scalability) 可扩展性是指系统架构可以让系统提供更多的服务而不降低使用性能的特性。仅仅通过给现有的机器增加硬件的容量、内存进行垂直扩展,是最简单的达到可扩展性的手段。而水平扩展则需要增加更多机器,每台机器提供全部或部分数据,这样所有主机都不必负担全部业务请求。但软件自己需要有内部机制来保证集群中节点间的数据同步。 弹性可扩展是指水平扩展的特性,意即你的集群可以不间断的情况下,方便扩展或缩减服务的规模。这样,你就不需要重新启动进程,不必修改应用的查询,也无需自己手工重新均衡数据分布。在 Cassandra 里,你只要加入新的计算机,Cassandra 就会自动地发现它并让它开始工作。 高可用和容错(High Availability and Fault Tolerance) 从一般架构的角度来看,系统的可用性是由满足请求的能力来量度的。但计算机可能会有各种各样的故障,从硬件器件故障到网络中断都有可能。如何计算机都可能发生这些情况,所以它们一般都有硬件冗余,并在发生故障事件的情况下会自动响应并进行热切换。对一个需要高可用的系统,它必须由多台联网的计算机构成,并且运行于其上的软件也必须能够在集群条件下工作,有设备能够识别节点故障,并将发生故障的中端的功能在剩余系统上进行恢复。 Cassandra 就是高可用的。你可以在不中断系统的情况下替换故障节点,还可以把数据分布到多个数据中心里,从而提供更好的本地访问性能,并且在某一数据中心发生火灾、洪水等不可抗灾难的时候防止系统彻底瘫痪。 可调节的一致性(Tuneable Consistency) 2000年,加州大学伯克利分校的 Eric Brewer 在 ACM 分布式计算原理会议提出了著名的 CAP 定律。CAP 定律表明,对于任意给定的系统,只能在一致性(Consistency)、可用性(Availability)以及分区容错性(Partition Tolerance)之间选择两个。关于 CAP 定律的详细介绍可参见《分布式系统一致性问题、CAP定律以及 BASE 理论》以及《一篇文章搞清楚什么是分布式系统 CAP 定理》。所以 Cassandra 在设计的时候也不得不考虑这些问题,因为分区容错性这个是每个分布式系统必须考虑的,所以只能在一致性和可用性之间做选择,而 Cassandra 的应用场景更多的是为了满足可用性,所以我们只能牺牲一致性了。但是根据 BASE 理论,我们其实可以通过牺牲强一致性获得可用性。 Cassandra 提供了可调节的一致性,允许我们选定需要的一致性水平与可用性水平,在二者间找到平衡点。因为客户端可以控制在更新到达多少个副本之前,必须阻塞系统。这是通过设置副本因子(replication factor)来调节与之相对的一致性级别。 通过副本因子(replication factor),你可以决定准备牺牲多少性能来换取一致性。 副本因子是你要求更新在集群中传播到的节点数(注意,更新包括所有增加、删除和更新操作)。 客户端每次操作还必须设置一个一致性级别(consistency level)参数,这个参数决定了多少个副本写入成功才可以认定写操作是成功的,或者读取过程中读到多少个副本正确就可以认定是读成功的。这里 Cassandra 把决定一致性程度的权利留给了客户自己。 所以,如果需要的话,你可以设定一致性级别和副本因子相等,从而达到一个较高的一致性水平,不过这样就必须付出同步阻塞操作的代价,只有所有节点都被更新完成才能成功返回一次更新。而实际上,Cassandra 一般都不会这么来用,原因显而易见(这样就丧失了可用性目标,影响性能,而且这不是你选择 Cassandra 的初衷)。而如果一个客户端设置一致性级别低于副本因子的话,即使有节点宕机了,仍然可以写成功。 总体来说,Cassandra 更倾向于 CP,虽然它也可以通过调节一致性水平达到 AP;但是不推荐你这么设置。 面向行(Row-Oriented) Cassandra 经常被看做是一种面向列(Column-Oriented)的数据库,这也并不算错。它的数据结构不是关系型的,而是一个多维稀疏哈希表。稀疏(Sparse)意味着任何一行都可能会有一列或者几列,但每行都不一定(像关系模型那样)和其他行有一样的列。每行都有一个唯一的键值,用于进行数据访问。所以,更确切地说,应该把 Cassandra 看做是一个有索引的、面向行的存储系统。 Cassandra 的数据存储结构基本可以看做是一个多维哈希表。这意味着你不必事先精确地决定你的具体数据结构或是你的记录应该包含哪些具体字段。这特别适合处于草创阶段,还在不断增加或修改服务特性的应用。而且也特别适合应用在敏捷开发项目中,不必进行长达数月的预先分析。对于使用 Cassandra 的应用,如果业务发生变化了,只需要在运行中增加或删除某些字段就行了,不会造成服务中断。 当然, 这不是说你不需要考虑数据。相反,Cassandra 需要你换个角度看数据。在 RDBMS 里, 你得首先设计一个完整的数据模型, 然后考虑查询方式, 而在 Cassandra 里,你可以首先思考如何查询数据,然后提供这些数据就可以了。 灵活的模式(Flexible Schema) Cassandra 的早期版本支持无模式(schema-free)数据模型,可以动态定义新的列。 无模式数据库(如 Bigtable 和 MongoDB)在访问大量数据时具有高度可扩展性和高性能的优势。 无模式数据库的主要缺点是难以确定数据的含义和格式,这限制了执行复杂查询的能力。 为了解决这些问题,Cassandra 引入了 Cassandra Query Language(CQL),它提供了一种通过类似于结构化查询语言(SQL)的语法来定义模式。 最初,CQL 是作为 Cassandra 的另一个接口,并且基于 Apache Thrift 项目提供无模式的接口。 在这个过渡阶段,术语“模式可选”(Schema-optional)用于描述数据模型,我们可以使用 CQL 的模式来定义。并且可以通过 Thrift API 实现动态扩展以此添加新的列。 在此期间,基础数据存储模型是基于 Bigtable 的。 从 3.0 版本开始,不推荐使用基于 Thrift API 的动态列创建的 API,并且 Cassandra 底层存储已经重新实现了,以更紧密地与 CQL 保持一致。 Cassandra 并没有完全限制动态扩展架构的能力,但它的工作方式却截然不同。 CQL 集合(比如 list、set、尤其是 map)提供了在无结构化的格式里面添加内容的能力,从而能扩展现有的模式。CQL 还提供了改变列的类型的能力,以支持 JSON 格式的文本的存储。 因此,描述 Cassandra 当前状态的最佳方式可能是它支持灵活的模式。 高性能(High Performance) Cassandra 在设计之初就特别考虑了要充分利用多处理器和多核计算机的性能,并考虑在分布于多个数据中心的大量这类服务器上运行。它可以一致而且无缝地扩展到数百台机器,存储数 TB 的数据。Cassandra 已经显示出了高负载下的良好表现,在一个非常普通的工作站上,Cassandra 也可以提供非常高的写吞吐量。而如果你增加更多的服务器,你还可以继续保持 Cassandra 所有的特性而无需牺牲性能。 Cassandra 的应用场景 我们已经介绍了 Cassandra 的主要特点,对 Cassandra 的长处有了一定的理解。尽管 Cassandra 设计精巧,功能出色,但也不能胜任所有的工作。所以我们来介绍一下 Cassandra 最适合的场景。 大规模部署 你可能不会开着一辆轻型的小卡车去取干洗的衣服,小卡车显然不适合这种工作。Cassandra 的很多精巧设计都专注于高可用、可调一致性、P2P 协议、无缝扩展等,这些都是 Cassandra 的卖点。这些特性在单节点工作时都是没有意义的,更无法实现它的全部能力。 但是,单节点关系数据库在很多情况下可能正是我们需要的。所以你需要做一些评估。考虑你的期望的流量、吞吐需求以及 SAL 等。关于评估没有什么硬性的指标和要求。但如果你认为有几种关系型数据库可以很好地应付你的流量,提供不错的性能,那可能选关系型数据库更好。简单地说,这是因为 RDBMS 更易于在单机上运行,对你来说也更熟悉。 但是,如果你认为需要至少几个节点才能支撑你的业务,那 Cassandra 就是个不错的选择。如果你的应用可能需要数十个节点,那 Cassandra 可能就是个很棒的选择了。 写密集、统计和分析型工作 考虑一下你的应用的读写比例,Cassandra 是为优异的写吞吐量而特别优化的。 许多早期使用 Cassandra 的产品都用于存储用户状态更新、社交网络、建议/评价以及应用统计等。这些都是 Cassandra 很好的应用场景,因为这些应用大都是写多于读的,并且更新可能随时发生并伴有突发的峰值。事实上,支撑应用负载需要很高的多客户线程并发写性能,这正是 Cassandra 的主要特性。 根据项目的 wiki,Cassandra 已经被用于开发了多种不同的应用,包括窗口化的时间序列数据库,用于文档搜索的反向索引,以及分布式任务优先级队列。 地区分布 Cassandra 直接支持多地分布的数据存储,Cassandra 可以很容易配置成将数据分布到多个数据中心的存储方式。如果你有一个全球部署的应用,那么让数据贴近用户会获得不错的性能收益,Cassandra 正适合这种应用场合。 变化的应用 如果你正在“初创阶段”,业务会不断改进,Cassandra 这种灵活的模式的数据模型可能更适合你。这让你的数据库能更快地跟上业务改进的步伐。 谁在使用 Cassandra Cassandra 在全世界有多达 1500 家公司使用: 苹果的 Cassandra 集群达到 75,000 节点,存储了 10PB 的数据; Netflix 的 Cassandra 集群达到 2,500 个节点,存储了多达 420TB 的数据; 宜搜的 Cassandra 集群达到 270 个节点,存储多达 300TB 的数据; eBay 的 Cassandra 集群达到 100 个节点,存储多达 250TB 的数据; 360 的 Cassandra 集群达到 1500 个节点; 饿了么的 Cassandra 集群达到 100 个节点。 微信公众号和钉钉群交流 为了营造一个开放的 Cassandra 技术交流,我们建立了微信公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期在国内开展线下技术沙龙,专家技术直播,欢迎大家加入。 微信公众号: 钉钉群 钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o
近几年来,人工智能逐渐火热起来,特别是和大数据一起结合使用。人工智能的主要场景又包括图像能力、语音能力、自然语言处理能力和用户画像能力等等。这些场景我们都需要处理海量的数据,处理完的数据一般都需要存储起来,这些数据的特点主要有如下几点: 大:数据量越大,对我们后面建模越会有好处; 稀疏:每行数据可能拥有不同的属性,比如用户画像数据,每个人拥有属性相差很大,可能用户A拥有这个属性,但是用户B没有这个属性;那么我们希望存储的系统能够处理这种情况,没有的属性在底层不占用空间,这样可以节约大量的空间使用; 列动态变化:每行数据拥有的列数是不一样的。 为了更好的介绍 HBase 在人工智能场景下的使用,下面以某人工智能行业的客户案例进行分析如何利用 HBase 设计出一个快速查找人脸特征的系统。 目前该公司的业务场景里面有很多人脸相关的特征数据,总共3400多万张,每张人脸数据大概 3.2k。这些人脸数据又被分成很多组,每个人脸特征属于某个组。目前总共有近62W个人脸组,每个组的人脸张数范围为 1 ~ 1W不等,每个组里面会包含同一个人不同形式的人脸数据。组和人脸的分布如下: 43%左右的组含有1张人脸数据; 47%左右的组含有 2 ~ 9张人脸数据; 其余的组人脸数范围为 10 ~ 10000。 现在的业务需求主要有以下两类: 根据人脸组 id 查找该组下面的所有人脸; 根据人脸组 id +人脸 id 查找某个人脸的具体数据。 MySQL + OSS 方案 之前业务数据量比较小的情况使用的存储主要为 MySQL 以及 OSS(对象存储)。相关表主要有人脸组表group和人脸表face。表的格式如下: group表: group_id size 1 2 face表: face_id group_id feature "c5085f1ef4b3496d8b4da050cab0efd2" 1 "cwI4S/HO/nm6H……" 其中 feature 大小为3.2k,是二进制数据 base64 后存入的,这个就是真实的人脸特征数据。 现在人脸组 id 和人脸 id 对应关系存储在 MySQL 中,对应上面的 group 表;人脸 id 和人脸相关的特征数据存储在 OSS 里面,对应上面的 face 表。 因为每个人脸组包含的人类特征数相差很大(1 ~ 1W),所以基于上面的表设计,我们需要将人脸组以及每张人脸特征id存储在每一行,那么属于同一个人脸组的数据在MySQL 里面上实际上存储了很多行。比如某个人脸组id对应的人脸特征数为1W,那么需要在 MySQL 里面存储 1W 行。 我们如果需要根据人脸组 id 查找该组下面的所有人脸,那么需要从 MySQL 中读取很多行的数据,从中获取到人脸组和人脸对应的关系,然后到 OSS 里面根据人脸id获取所有人脸相关的特征数据,如下图的左部分所示。 我们从上图的查询路径可以看出,这样的查询导致链路非常长。从上面的设计可看出,如果查询的组包含的人脸张数比较多的情况下,那么我们需要从 MySQL 里面扫描很多行,然后再从 OSS 里面拿到这些人脸的特征数据,整个查询时间在10s左右,远远不能满足现有业务快速发展的需求。 HBase 方案 上面的设计方案有两个问题: 原本属于同一条数据的内容由于数据本身大小的原因无法存储到一行里面,导致后续查下需要访问两个存储系统; 由于MySQL不支持动态列的特性,所以属于同一个人脸组的数据被拆成多行存储。 针对上面两个问题,我们进行了分析,得出这个是 HBase 的典型场景,原因如下: HBase 拥有动态列的特性,支持万亿行,百万列; HBase 支持多版本,所有的修改都会记录在 HBase 中; HBase 2.0 引入了 MOB(Medium-Sized Object) 特性,支持小文件存储。HBase 的 MOB 特性针对文件大小在 1k~10MB 范围的,比如图片,短视频,文档等,具有低延迟,读写强一致,检索能力强,水平易扩展等关键能力。 我们可以使用这三个功能重新设计上面 MySQL + OSS 方案。结合上面应用场景的两大查询需求,我们可以将人脸组 id 作为 HBase 的 Rowkey,系统的设计如上图的右部分显示,在创建表的时候打开 MOB 功能,如下: create 'face', {NAME => 'c', IS_MOB => true, MOB_THRESHOLD => 2048} 上面我们创建了名为 face 的表,IS_MOB 属性说明列簇 c 将启用 MOB 特性,MOB_THRESHOLD 是 MOB 文件大小的阈值,单位是字节,这里的设置说明文件大于 2k 的列都当做小文件存储。大家可能注意到上面原始方案中采用了 OSS 对象存储,那我们为什么不直接使用 OSS 存储人脸特征数据呢,如果有这个疑问,可以看看下面表的性能测试: 对比属性 对象存储 云 HBase 建模能力 KV KV、表格、稀疏表、SQL、全文索引、时空、时序、图查询 查询能力 前缀查找 前缀查找、过滤器、索引 性能 优 优,特别对小对象有更低的延迟;在复杂查询场景下,比对象存储有10倍以上的性能提升 成本 按流量,请求次数计费,适合访问频率低的场景 托管式,在高并发,高吞吐场景有更低的成本 扩展性 优 优 适用对象范围 通用 <10MB 根据上面的对比,使用 HBase MOB特性来存储小于10MB的对象相比直接使用对象存储有一些优势。我们现在来看看具体的表设计,如下图: 上面 HBase 表的列簇名为c,我们使用人脸id作为列名。我们只使用了 HBase 的一张表就替换了之前方面的三张表!虽然我们启用了 MOB,但是具体插入的方法和正常使用一样,代码片段如下: String CF_DEFAULT = "c"; Put put = new Put(groupId.getBytes()); put.addColumn(CF_DEFAULT.getBytes(),faceId1.getBytes(), feature1.getBytes()); put.addColumn(CF_DEFAULT.getBytes(),faceId2.getBytes(), feature2.getBytes()); …… put.addColumn(CF_DEFAULT.getBytes(),faceIdn.getBytes(), featuren.getBytes()); table.put(put); 用户如果需要根据人脸组id获取所有人脸的数据,可以使用下面方法: Get get = new Get(groupId.getBytes()); Result re=table.get(get); 这样我们可以拿到某个人脸组id对应的所有人脸数据。如果需要根据人脸组id+人脸id查找某个人脸的具体数据,看可以使用下面方法: Get get = new Get(groupId.getBytes()); get.addColumn(CF_DEFAULT.getBytes(), faceId1.getBytes()) Result re=table.get(get); 经过上面的改造,在2台 HBase worker 节点内存为32GB,核数为8,每个节点挂载四块大小为 250GB 的 SSD 磁盘,并写入 100W 行,每行有1W列,读取一行的时间在100ms-500ms左右。在每行有1000个face的情况下,读取一行的时间基本在20-50ms左右,相比之前的10s提升200~500倍。 下面是各个方案的对比性能对比情况。 对比属性 对象存储 MySQL+对象存储 HBase MOB 读写强一致 Y N Y 查询能力 弱 强 强 查询响应时间 高 高 低 运维成本 低 高 低 水平扩展 Y Y Y 使用 Spark 加速数据分析 我们已经将人脸特征数据存储在阿里云 HBase 之中,这个只是数据应用的第一步,如何将隐藏在这些数据背后的价值发挥出来?这就得借助于数据分析,在这个场景就需要采用机器学习的方法进行聚类之类的操作。我们可以借助 Spark 对存储于 HBase 之中的数据进行分析,而且 Spark 本身支持机器学习的。但是如果直接采用开源的 Spark 读取 HBase 中的数据,会对 HBase 本身的读写有影响的。 针对这些问题,阿里云 HBase 团队对 Spark 进行了相关优化,比如直接读取 HFile、算子下沉等;并且提供全托管的 Spark 产品,通过SQL服务ThriftServer、作业服务LivyServer简化Spark的使用等。目前这套 Spark 的技术栈如下图所示。 通过 Spark 服务,我们可以和 HBase 进行很好的整合,将实时流和人脸特征挖掘整合起来,整个架构图如下: 我们可以收集各种人脸数据源的实时数据,经过 Spark Streaming 进行简单的 ETL 操作;其次,我们通过 Spark MLib 类库对刚刚试试收集到的数据进行人脸特征挖掘,最后挖掘出来的结果存储到 HBase 之中。最后,用户可以通过访问 HBase 里面已经挖掘好的人脸特征数据进行其他的应用。 【HBase生态+Spark社区大群】 (1)技术交流钉钉大群【强烈推荐!】 群内每周进行群直播技术分享及问答 加入方式1:1)点击link申请加入 https://dwz.cn/Fvqv066s 加入方式2:钉钉扫码加入:(2)微信群由于微信群已经超过100人,需要先加我们的同学,再拉入群先加 微信号:iteblog 注明云HBase交流
2019年09月
2019年06月
可以实现一些简单的OLTP查询,复杂一点的建议用 Spark。
-------------------------
-------------------------