Apache Spark Delta Lake 事务日志实现源码分析

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云原生多模数据库 Lindorm,多引擎 多规格 0-4节点
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。

Apache Spark Delta Lake 事务日志实现源码分析

我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。在看本文时,强烈建议先看一下《深入理解 Apache Spark Delta Lake 的事务日志》文章。

Delta Lake 更新数据事务实现

Delta Lake 里面所有对表数据的更新(插入数据、更新数据、删除数据)都需要进行下面这些步骤,其主要目的是把删除哪些文件、新增哪些文件等记录写入到事务日志里面,也就是 _delta_log 目录下的 json 文件,通过这个实现 Delta Lake 的 ACID 以及时间旅行。下面我们进入事务日志提交的切入口 org.apache.spark.sql.delta.OptimisticTransaction#commit,持久化事务操作日志都是需要调用这个函数进行的。commit 函数实现如下:

def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation(
      deltaLog,
      "delta.commit") {
    val version = try {
      // 事务日志提交之前需要先做一些工作,比如如果更新操作是第一次进行的,那么需要初始化 Protocol,
      // 还需要将用户对 Delta Lake 表的设置持久化到事务日志里面
      var finalActions = prepareCommit(actions, op)

      // 如果这次更新操作需要删除之前的文件,那么 isBlindAppend 为 false,否则为 true
      val isBlindAppend = {
        val onlyAddFiles =
          finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
        onlyAddFiles && !dependsOnFiles
      }

      // 如果 commitInfo.enabled 参数设置为 true,那么还需要把 commitInfo 记录到事务日志里面
      if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) {
        commitInfo = CommitInfo(
          clock.getTimeMillis(),
          op.name,
          op.jsonEncodedValues,
          Map.empty,
          Some(readVersion).filter(_ >= 0),
          None,
          Some(isBlindAppend))
        finalActions = commitInfo +: finalActions
      }

      // 真正写事务日志,如果发生版本冲突会重试直到事务日志写成功
      val commitVersion = doCommit(snapshot.version + 1, finalActions, 0)
      logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
      // 对事务日志进行 checkpoint 操作
      postCommit(commitVersion, finalActions)
      commitVersion
    } catch {
      case e: DeltaConcurrentModificationException =>
        recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)
        throw e
      case NonFatal(e) =>
        recordDeltaEvent(
          deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
        throw e
    }
    version
}

我们先从这个函数的两个参数开始介绍。

  • _actions: Seq[Action]_:Delta Lake 表更新操作产生的新文件(AddFile)和需要删除文件的列表(RemoveFile)。如果是 Structured Streaming 作业,还会记录 SetTransaction 记录,里面会存储作业的 query id(sql.streaming.queryId)、batchId 以及当前时间。这个就是我们需要持久化到事务日志里面的数据。
  • _op: DeltaOperations.Operation_:Delta 操作类型,比如 WRITE、STREAMING UPDATE、DELETE、MERGE 以及 UPDATE 等。

在 commit 函数里面主要分为三步:prepareCommit、doCommit 以及 postCommit。prepareCommit 的实现如下:

protected def prepareCommit(
      actions: Seq[Action],
      op: DeltaOperations.Operation): Seq[Action] = {
 
    assert(!committed, "Transaction already committed.")
 
    //  如果我们更新了表的 Metadata 信息,那么需要将其写入到事务日志里面
    var finalActions = newMetadata.toSeq ++ actions
    val metadataChanges = finalActions.collect { case m: Metadata => m }
    assert(
      metadataChanges.length <= 1,
      "Cannot change the metadata more than once in a transaction.")
    metadataChanges.foreach(m => verifyNewMetadata(m))
 
    //   首次提交事务日志,那么会确保 _delta_log 目录要存在,
    // 然后检查 finalActions 里面是否有 Protocol,没有的话需要初始化协议版本
    if (snapshot.version == -1) {
      deltaLog.ensureLogDirectoryExist()
      if (!finalActions.exists(_.isInstanceOf[Protocol])) {
        finalActions = Protocol() +: finalActions
      }
    }
 
    finalActions = finalActions.map {
      //  第一次提交,并且是 Metadata那么会将 Delta Lake 的配置信息加入到 Metadata 里面
      case m: Metadata if snapshot.version == -1 =>
        val updatedConf = DeltaConfigs.mergeGlobalConfigs(
          spark.sessionState.conf, m.configuration, Protocol())
        m.copy(configuration = updatedConf)
      case other => other
    }
 
    deltaLog.protocolWrite(
      snapshot.protocol,
      logUpgradeMessage = !actions.headOption.exists(_.isInstanceOf[Protocol]))
 
    //  如果 actions 里面有删除的文件,那么需要检查 Delta Lake 是否支持删除
    val removes = actions.collect { case r: RemoveFile => r }
    if (removes.exists(_.dataChange)) deltaLog.assertRemovable()
 
    finalActions
}

prepareCommit 里面做的事情比较简单,主要对事务日志进行补全等操作。具体为

  • 、由于 Delta Lake 表允许对已经存在的表模式进行修改,比如添加了新列,或者覆盖原有表的模式等。那么这时候我们需要将新的 Metadata 写入到事务日志里面。Metadata 里面存储了表的 schema、分区列、表的配置、表的创建时间。注意,除了表的 schema 和分区字段可以在后面修改,其他的信息都不可以修改的。
  • 、如果是首次提交事务日志,那么先检查表的 _delta_log 目录是否存在,不存在则创建。然后检查是否设置了协议的版本,如果没有设置,则使用默认的协议版本,默认的协议版本中 readerVersion = 1,writerVersion = 2;
  • 、如果是第一次提交,并且是 Metadata ,那么会将 Delta Lake 的配置信息加入到 Metadata 里面。Delta Lake 表的配置信息主要是在 org.apache.spark.sql.delta.sources.DeltaSQLConf 类里面定义的,比如我们可以在创建 Delta Lake 表的时候指定多久做一次 Checkpoint。
  • 、由于我们可以通过 spark.databricks.delta.properties.defaults.appendOnly 参数将表设置为仅允许追加,所以如果当 actions 里面存在 RemoveFile,那么我们需要判断表是否允许删除。

我们回到 commit 函数里面,在执行完 prepareCommit 之后得到了 finalActions 列表,这些信息就是我们需要写入到事务日志里面的数据。紧接着会判断这次事务变更是否需要删除之前的文件,如果是,那么 isBlindAppend 为 false,否则为 true。

当 commitInfo.enabled 参数设置为 true(默认),那么还需要将 commitInfo 写入到事务日志文件里面。CommitInfo 里面包含了操作时间、操作的类型(WRITEUPDATE)、操作类型(Overwrite)等重要信息。最后到了 doCommit 函数的调用,大家注意看第一个参数传递的是 snapshot.version + 1snapshot.version 是事务日志中最新的版本,比如 _delta_lake 目录下的文件如下:

-rw-r--r--  1 yangping.wyp  wheel   811B  8 28 19:12 00000000000000000000.json
-rw-r--r--  1 yangping.wyp  wheel   514B  8 28 19:14 00000000000000000001.json
-rw-r--r--  1 yangping.wyp  wheel   711B  8 29 10:54 00000000000000000002.json
-rw-r--r--  1 yangping.wyp  wheel   865B  8 29 10:56 00000000000000000003.json

那么 snapshot.version 的值就是3,所以这次更新操作的版本应该是4。我们来看下 doCommit 函数的实现:

private def doCommit(
      attemptVersion: Long,
      actions: Seq[Action],
      attemptNumber: Int): Long = deltaLog.lockInterruptibly {
    try {
      logDebug(s"Attempting to commit version $attemptVersion with ${actions.size} actions")
       
      //  真正写事务日志的操作
      deltaLog.store.write(
        deltaFile(deltaLog.logPath, attemptVersion),
        actions.map(_.json).toIterator)
      val commitTime = System.nanoTime()
      //  由于发生了数据更新,所以更新内存中事务日志的最新快照,并做相关判断
      val postCommitSnapshot = deltaLog.update()
      if (postCommitSnapshot.version < attemptVersion) {
        throw new IllegalStateException(
          s"The committed version is $attemptVersion " +
            s"but the current version is ${postCommitSnapshot.version}.")
      }
 
      //  发送一些统计信息
      var numAbsolutePaths = 0
      var pathHolder: Path = null
      val distinctPartitions = new mutable.HashSet[Map[String, String]]
      val adds = actions.collect {
        case a: AddFile =>
          pathHolder = new Path(new URI(a.path))
          if (pathHolder.isAbsolute) numAbsolutePaths += 1
          distinctPartitions += a.partitionValues
          a
      }
      val stats = CommitStats(
        startVersion = snapshot.version,
        commitVersion = attemptVersion,
        readVersion = postCommitSnapshot.version,
        txnDurationMs = NANOSECONDS.toMillis(commitTime - txnStartNano),
        commitDurationMs = NANOSECONDS.toMillis(commitTime - commitStartNano),
        numAdd = adds.size,
        numRemove = actions.collect { case r: RemoveFile => r }.size,
        bytesNew = adds.filter(_.dataChange).map(_.size).sum,
        numFilesTotal = postCommitSnapshot.numOfFiles,
        sizeInBytesTotal = postCommitSnapshot.sizeInBytes,
        protocol = postCommitSnapshot.protocol,
        info = Option(commitInfo).map(_.copy(readVersion = None, isolationLevel = None)).orNull,
        newMetadata = newMetadata,
        numAbsolutePaths,
        numDistinctPartitionsInAdd = distinctPartitions.size,
        isolationLevel = null)
      recordDeltaEvent(deltaLog, "delta.commit.stats", data = stats)
 
      attemptVersion
    } catch {
      case e: java.nio.file.FileAlreadyExistsException =>
        checkAndRetry(attemptVersion, actions, attemptNumber)
    }
}
  • 、这里就是真正写事务日志的操作,其中 store 是通过 spark.delta.logStore.class 参数指定的,目前支持 HDFS、S3、Azure 以及 Local 等存储介质。默认是 HDFS。具体的写事务操作参见下面的介绍。
  • 、持久化事务日志之后,更新内存中的事务日志最新的快照,然后做相关的合法性校验;
  • 、发送一些统计信息。这里应该是 databricks 里面含有的功能,开源版本这里面其实并没有做什么操作。

下面我们开看看真正写事务日志的实现,为了简单起见,我们直接查看 HDFSLogStore 类中对应的方法,主要涉及 writeInternal,其实现如下:

private def writeInternal(path: Path, actions: Iterator[String], overwrite: Boolean): Unit = {
    //  获取 HDFS 的 FileContext 用于后面写事务日志
    val fc = getFileContext(path)
 
    //  如果需要写的事务日志已经存在那么就需要抛出异常,后面再重试
    if (!overwrite && fc.util.exists(path)) {
      // This is needed for the tests to throw error with local file system
      throw new FileAlreadyExistsException(path.toString)
    }
 
    //  事务日志先写到临时文件
    val tempPath = createTempPath(path)
    var streamClosed = false // This flag is to avoid double close
    var renameDone = false // This flag is to save the delete operation in most of cases.
    val stream = fc.create(
      tempPath, EnumSet.of(CREATE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
 
    try {
      //  将本次修改产生的 actions 写入到临时事务日志里
      actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)
      stream.close()
      streamClosed = true
      try {
        val renameOpt = if (overwrite) Options.Rename.OVERWRITE else Options.Rename.NONE
        //  将临时的事务日志移到正式的事务日志里面,如果移动失败则抛出异常,后面再重试
        fc.rename(tempPath, path, renameOpt)
        renameDone = true
      } catch {
        case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
          throw new FileAlreadyExistsException(path.toString)
      }
    } finally {
      if (!streamClosed) {
        stream.close()
      }
 
      // 删除临时事务日志
      if (!renameDone) {
        fc.delete(tempPath, false)
      }
    }
}

writeInternal 的实现逻辑很简单,其实就是我们正常的写文件操作,具体如下:

  • 、获取 HDFS 的 FileContext 用于后面写事务日志
  • 、如果需要写的事务日志已经存在那么就需要抛出异常,后面再重试;比如上面我们写事务日志之前磁盘中最新的事务日志文件是 00000000000000000003.json,我们这次写的事务日志文件应该是 00000000000000000004.json,但是由于 Delta Lake 允许多个用户写数据,所以在我们获取最新的事务日志版本到写事务日志期间已经有用户写了一个新的事务日志 00000000000000000004.json,那么我们这次写肯定要失败了。这时候会抛出 FileAlreadyExistsException 异常,以便后面重试。
  • 、写事务日志的时候是先写到表 _delta_lake 目录下的临时文件里面,比如我们这次写的事务日志文件为 00000000000000000004.json,那么会往类似于 .00000000000000000004.json.0887f7da-5920-4214-bd2e-7c14b4244af1.tmp 文件里面写数据的。
  • 、将本次更新操作的事务记录写到临时文件里;
  • 、写完事务日志之后我们需要将临时事务日志移到最后正式的日志文件里面,比如将 .00000000000000000004.json.0887f7da-5920-4214-bd2e-7c14b4244af1.tmp 移到 00000000000000000004.json。大家注意,在写事务日志文件的过程中同样存在多个用户修改表,所以 00000000000000000004.json 文件很可能已经被别的修改占用了,这时候也需要抛出 FileAlreadyExistsException 异常,以便后面重试。

整个事务日志写操作就完成了,我们再回到 doCommit 函数,注意由于 writeInternal 可能会抛出 FileAlreadyExistsException 异常,也就是 deltaLog.store.write(xxx) 调用可能会抛出异常,我们注意看到 doCommit 函数 catch 了这个异常,并在异常捕获里面调用 checkAndRetry(attemptVersion, actions, attemptNumber),这就是事务日志重写过程, checkAndRetry 函数的实现如下:

protected def checkAndRetry(
      checkVersion: Long,
      actions: Seq[Action],
      attemptNumber: Int): Long = recordDeltaOperation(
        deltaLog,
        "delta.commit.retry",
        tags = Map(TAG_LOG_STORE_CLASS -> deltaLog.store.getClass.getName)) {
    //  读取磁盘中持久化的事务日志,并更新内存中事务日志快照
    deltaLog.update()
    //  重试的版本是刚刚更新内存中事务日志快照的版本+1
    val nextAttempt = deltaLog.snapshot.version + 1
 
    //  做相关的合法性判断
    (checkVersion until nextAttempt).foreach { version =>
      val winningCommitActions =
        deltaLog.store.read(deltaFile(deltaLog.logPath, version)).map(Action.fromJson)
      val metadataUpdates = winningCommitActions.collect { case a: Metadata => a }
      val txns = winningCommitActions.collect { case a: SetTransaction => a }
      val protocol = winningCommitActions.collect { case a: Protocol => a }
      val commitInfo = winningCommitActions.collectFirst { case a: CommitInfo => a }.map(
        ci => ci.copy(version = Some(version)))
      val fileActions = winningCommitActions.collect { case f: FileAction => f }
      // If the log protocol version was upgraded, make sure we are still okay.
      // Fail the transaction if we're trying to upgrade protocol ourselves.
      if (protocol.nonEmpty) {
        protocol.foreach { p =>
          deltaLog.protocolRead(p)
          deltaLog.protocolWrite(p)
        }
        actions.foreach {
          case Protocol(_, _) => throw new ProtocolChangedException(commitInfo)
          case _ =>
        }
      }
      // Fail if the metadata is different than what the txn read.
      if (metadataUpdates.nonEmpty) {
        throw new MetadataChangedException(commitInfo)
      }
      // Fail if the data is different than what the txn read.
      if (dependsOnFiles && fileActions.nonEmpty) {
        throw new ConcurrentWriteException(commitInfo)
      }
      // Fail if idempotent transactions have conflicted.
      val txnOverlap = txns.map(_.appId).toSet intersect readTxn.toSet
      if (txnOverlap.nonEmpty) {
        throw new ConcurrentTransactionException(commitInfo)
      }
    }
    logInfo(s"No logical conflicts with deltas [$checkVersion, $nextAttempt), retrying.")
    //  开始重试事务日志的写操作
    doCommit(nextAttempt, actions, attemptNumber + 1)
}

checkAndRetry 函数只有在事务日志写冲突的时候才会出现,主要目的是重写当前的事务日志。

  • 、因为上次更新事务日志发生冲突,所以我们需要再一次读取磁盘中持久化的事务日志,并更新内存中事务日志快照;
  • 、重试的版本是刚刚更新内存中事务日志快照的版本+1;
  • 、做相关的合法性判断;
  • 、开始重试事务日志的写操作。

当事务日志成功持久化到磁盘之后,这时候会执行 commit 操作的最后一步,执行 postCommit 函数,其实现如下:

protected def postCommit(commitVersion: Long, commitActions: Seq[Action]): Unit = {
    committed = true
    if (commitVersion != 0 && commitVersion % deltaLog.checkpointInterval == 0) {
      try {
        deltaLog.checkpoint()
      } catch {
        case e: IllegalStateException =>
          logWarning("Failed to checkpoint table state.", e)
      }
    }
}

postCommit 函数实现很简单,就是判断需不需要对事务日志做一次 checkpoint 操作,其中 deltaLog.checkpointInterval 就是通过 spark.databricks.delta.properties.defaults.checkpointInterval 参数设置的,默认每写10次事务日志做一次 checkpoint。

checkpoint 的其实就是将内存中事务日志的最新快照持久化到磁盘里面,如下所示:

-rw-r--r--  1 yangping.wyp  wheel   811B  8 28 19:12 00000000000000000000.json
-rw-r--r--  1 yangping.wyp  wheel   514B  8 28 19:14 00000000000000000001.json
-rw-r--r--  1 yangping.wyp  wheel   711B  8 29 10:54 00000000000000000002.json
-rw-r--r--  1 yangping.wyp  wheel   865B  8 29 10:56 00000000000000000003.json
-rw-r--r--  1 yangping.wyp  wheel   668B  8 29 14:36 00000000000000000004.json
-rw-r--r--  1 yangping.wyp  wheel    13K  8 29 14:36 00000000000000000005.checkpoint.parquet
-rw-r--r--  1 yangping.wyp  wheel   514B  8 29 14:36 00000000000000000005.json
-rw-r--r--  1 yangping.wyp  wheel   514B  8 29 14:36 00000000000000000006.json
-rw-r--r--  1 yangping.wyp  wheel    24B  8 29 14:36 _last_checkpoint

00000000000000000005.checkpoint.parquet 文件就是对事务日志进行 checkpoint 的文件,里面汇总了 00000000000000000000.json - 00000000000000000005.json 之间的所有事务操作记录。所以下一次需要构建事务日志的快照时,只需要从 00000000000000000005.checkpoint.parquet 文件、00000000000000000006.json 文件构造,而无需再读取 00000000000000000000.json - 00000000000000000005.json 之间的事务操作。
同时我们还看到做完 checkpoint 之后还会生成一个 _last_checkpoint 文件,这个其实就是对 CheckpointMetaData 类的持久化操作。里面记录了最后一次 checkpoint 的版本,checkpoint 文件里面的 Action 条数,如下:

⇒  cat _last_checkpoint
{"version":5,"size":10}

注意,其实 CheckpointMetaData 类里面还有个 parts 字段,这个代表 checkpoint 文件有几个分片。因为随着时间的推移,checkpoint 文件也会变得很大,如果只写到一个 checkpoint 文件里面效率不够好,这时候会对 checkpoint 文件进行拆分,拆分成几个文件是记录到 parts 里面,但是目前开源版本的 Delta Lake 尚无这个功能,也不知道数砖后面会不会开源

写在最后

为了营造一个开放的Cassandra技术交流环境,社区建立了微信公众号和钉钉群。为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另云Cassandra免费火爆公测中,欢迎试用:https://www.aliyun.com/product/cds

xxx

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
4天前
|
存储 Ubuntu Apache
如何在 Ubuntu VPS 上配置 Apache 的日志记录和日志轮转
如何在 Ubuntu VPS 上配置 Apache 的日志记录和日志轮转
16 6
|
3天前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
19 0
|
1月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
76 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
30天前
|
分布式计算 Apache Spark
|
2月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
76 6
|
2月前
|
分布式计算 Spark 大数据
深入探究Apache Spark在大数据处理中的实践应用
【6月更文挑战第2天】Apache Spark是流行的开源大数据处理框架,以其内存计算速度和低延迟脱颖而出。本文涵盖Spark概述、核心组件(包括Spark Core、SQL、Streaming和MLlib)及其在数据预处理、批处理分析、交互式查询、实时处理和机器学习中的应用。通过理解Spark内部机制和实践应用,可提升大数据处理效率,发挥其在各行业的潜力。
|
3月前
|
存储 监控 Apache
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
网易的灵犀办公和云信利用 Apache Doris 改进了大规模日志和时序数据处理,取代了 Elasticsearch 和 InfluxDB。Doris 实现了更低的服务器资源消耗和更高的查询性能,相比 Elasticsearch,查询速度提升至少 11 倍,存储资源节省达 70%。Doris 的列式存储、高压缩比和倒排索引等功能,优化了日志和时序数据的存储与分析,降低了存储成本并提高了查询效率。在灵犀办公和云信的实际应用中,Doris 显示出显著的性能优势,成功应对了数据增长带来的挑战。
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
|
2月前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
|
3月前
|
Apache
web服务器(Apache)访问日志(access_log)详细解释
web服务器(Apache)访问日志(access_log)详细解释
|
6天前
|
SQL 存储 JSON
更快更强,SLS 推出高性能 SPL 日志查询模式
从海量的日志数据中,按照各种灵活的条件进行即时查询搜索,是可观测场景下的基本需求。本文介绍了 SLS 新推出的高性能 SPL 日志查询模式,支持 Unix 风格级联管道式语法,以及各种丰富的 SQL 处理函数。同时通过计算下推、向量化计算等优化,使得 SPL 查询可以在数秒内处理亿级数据,并支持 SPL 过滤结果分布图、随机翻页等特性。
228 66

推荐镜像

更多