Apache Spark Delta Lake 事务日志实现源码分析

简介: Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。

Apache Spark Delta Lake 事务日志实现源码分析

我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。在看本文时,强烈建议先看一下《深入理解 Apache Spark Delta Lake 的事务日志》文章。

Delta Lake 更新数据事务实现

Delta Lake 里面所有对表数据的更新(插入数据、更新数据、删除数据)都需要进行下面这些步骤,其主要目的是把删除哪些文件、新增哪些文件等记录写入到事务日志里面,也就是 _delta_log 目录下的 json 文件,通过这个实现 Delta Lake 的 ACID 以及时间旅行。下面我们进入事务日志提交的切入口 org.apache.spark.sql.delta.OptimisticTransaction#commit,持久化事务操作日志都是需要调用这个函数进行的。commit 函数实现如下:

def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation(
      deltaLog,
      "delta.commit") {
    val version = try {
      // 事务日志提交之前需要先做一些工作,比如如果更新操作是第一次进行的,那么需要初始化 Protocol,
      // 还需要将用户对 Delta Lake 表的设置持久化到事务日志里面
      var finalActions = prepareCommit(actions, op)

      // 如果这次更新操作需要删除之前的文件,那么 isBlindAppend 为 false,否则为 true
      val isBlindAppend = {
        val onlyAddFiles =
          finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
        onlyAddFiles && !dependsOnFiles
      }

      // 如果 commitInfo.enabled 参数设置为 true,那么还需要把 commitInfo 记录到事务日志里面
      if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) {
        commitInfo = CommitInfo(
          clock.getTimeMillis(),
          op.name,
          op.jsonEncodedValues,
          Map.empty,
          Some(readVersion).filter(_ >= 0),
          None,
          Some(isBlindAppend))
        finalActions = commitInfo +: finalActions
      }

      // 真正写事务日志,如果发生版本冲突会重试直到事务日志写成功
      val commitVersion = doCommit(snapshot.version + 1, finalActions, 0)
      logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
      // 对事务日志进行 checkpoint 操作
      postCommit(commitVersion, finalActions)
      commitVersion
    } catch {
      case e: DeltaConcurrentModificationException =>
        recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)
        throw e
      case NonFatal(e) =>
        recordDeltaEvent(
          deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
        throw e
    }
    version
}

我们先从这个函数的两个参数开始介绍。

  • _actions: Seq[Action]_:Delta Lake 表更新操作产生的新文件(AddFile)和需要删除文件的列表(RemoveFile)。如果是 Structured Streaming 作业,还会记录 SetTransaction 记录,里面会存储作业的 query id(sql.streaming.queryId)、batchId 以及当前时间。这个就是我们需要持久化到事务日志里面的数据。
  • _op: DeltaOperations.Operation_:Delta 操作类型,比如 WRITE、STREAMING UPDATE、DELETE、MERGE 以及 UPDATE 等。

在 commit 函数里面主要分为三步:prepareCommit、doCommit 以及 postCommit。prepareCommit 的实现如下:

protected def prepareCommit(
      actions: Seq[Action],
      op: DeltaOperations.Operation): Seq[Action] = {
 
    assert(!committed, "Transaction already committed.")
 
    //  如果我们更新了表的 Metadata 信息,那么需要将其写入到事务日志里面
    var finalActions = newMetadata.toSeq ++ actions
    val metadataChanges = finalActions.collect { case m: Metadata => m }
    assert(
      metadataChanges.length <= 1,
      "Cannot change the metadata more than once in a transaction.")
    metadataChanges.foreach(m => verifyNewMetadata(m))
 
    //   首次提交事务日志,那么会确保 _delta_log 目录要存在,
    // 然后检查 finalActions 里面是否有 Protocol,没有的话需要初始化协议版本
    if (snapshot.version == -1) {
      deltaLog.ensureLogDirectoryExist()
      if (!finalActions.exists(_.isInstanceOf[Protocol])) {
        finalActions = Protocol() +: finalActions
      }
    }
 
    finalActions = finalActions.map {
      //  第一次提交,并且是 Metadata那么会将 Delta Lake 的配置信息加入到 Metadata 里面
      case m: Metadata if snapshot.version == -1 =>
        val updatedConf = DeltaConfigs.mergeGlobalConfigs(
          spark.sessionState.conf, m.configuration, Protocol())
        m.copy(configuration = updatedConf)
      case other => other
    }
 
    deltaLog.protocolWrite(
      snapshot.protocol,
      logUpgradeMessage = !actions.headOption.exists(_.isInstanceOf[Protocol]))
 
    //  如果 actions 里面有删除的文件,那么需要检查 Delta Lake 是否支持删除
    val removes = actions.collect { case r: RemoveFile => r }
    if (removes.exists(_.dataChange)) deltaLog.assertRemovable()
 
    finalActions
}

prepareCommit 里面做的事情比较简单,主要对事务日志进行补全等操作。具体为

  • 、由于 Delta Lake 表允许对已经存在的表模式进行修改,比如添加了新列,或者覆盖原有表的模式等。那么这时候我们需要将新的 Metadata 写入到事务日志里面。Metadata 里面存储了表的 schema、分区列、表的配置、表的创建时间。注意,除了表的 schema 和分区字段可以在后面修改,其他的信息都不可以修改的。
  • 、如果是首次提交事务日志,那么先检查表的 _delta_log 目录是否存在,不存在则创建。然后检查是否设置了协议的版本,如果没有设置,则使用默认的协议版本,默认的协议版本中 readerVersion = 1,writerVersion = 2;
  • 、如果是第一次提交,并且是 Metadata ,那么会将 Delta Lake 的配置信息加入到 Metadata 里面。Delta Lake 表的配置信息主要是在 org.apache.spark.sql.delta.sources.DeltaSQLConf 类里面定义的,比如我们可以在创建 Delta Lake 表的时候指定多久做一次 Checkpoint。
  • 、由于我们可以通过 spark.databricks.delta.properties.defaults.appendOnly 参数将表设置为仅允许追加,所以如果当 actions 里面存在 RemoveFile,那么我们需要判断表是否允许删除。

我们回到 commit 函数里面,在执行完 prepareCommit 之后得到了 finalActions 列表,这些信息就是我们需要写入到事务日志里面的数据。紧接着会判断这次事务变更是否需要删除之前的文件,如果是,那么 isBlindAppend 为 false,否则为 true。

当 commitInfo.enabled 参数设置为 true(默认),那么还需要将 commitInfo 写入到事务日志文件里面。CommitInfo 里面包含了操作时间、操作的类型(WRITEUPDATE)、操作类型(Overwrite)等重要信息。最后到了 doCommit 函数的调用,大家注意看第一个参数传递的是 snapshot.version + 1snapshot.version 是事务日志中最新的版本,比如 _delta_lake 目录下的文件如下:

-rw-r--r--  1 yangping.wyp  wheel   811B  8 28 19:12 00000000000000000000.json
-rw-r--r--  1 yangping.wyp  wheel   514B  8 28 19:14 00000000000000000001.json
-rw-r--r--  1 yangping.wyp  wheel   711B  8 29 10:54 00000000000000000002.json
-rw-r--r--  1 yangping.wyp  wheel   865B  8 29 10:56 00000000000000000003.json

那么 snapshot.version 的值就是3,所以这次更新操作的版本应该是4。我们来看下 doCommit 函数的实现:

private def doCommit(
      attemptVersion: Long,
      actions: Seq[Action],
      attemptNumber: Int): Long = deltaLog.lockInterruptibly {
    try {
      logDebug(s"Attempting to commit version $attemptVersion with ${actions.size} actions")
       
      //  真正写事务日志的操作
      deltaLog.store.write(
        deltaFile(deltaLog.logPath, attemptVersion),
        actions.map(_.json).toIterator)
      val commitTime = System.nanoTime()
      //  由于发生了数据更新,所以更新内存中事务日志的最新快照,并做相关判断
      val postCommitSnapshot = deltaLog.update()
      if (postCommitSnapshot.version < attemptVersion) {
        throw new IllegalStateException(
          s"The committed version is $attemptVersion " +
            s"but the current version is ${postCommitSnapshot.version}.")
      }
 
      //  发送一些统计信息
      var numAbsolutePaths = 0
      var pathHolder: Path = null
      val distinctPartitions = new mutable.HashSet[Map[String, String]]
      val adds = actions.collect {
        case a: AddFile =>
          pathHolder = new Path(new URI(a.path))
          if (pathHolder.isAbsolute) numAbsolutePaths += 1
          distinctPartitions += a.partitionValues
          a
      }
      val stats = CommitStats(
        startVersion = snapshot.version,
        commitVersion = attemptVersion,
        readVersion = postCommitSnapshot.version,
        txnDurationMs = NANOSECONDS.toMillis(commitTime - txnStartNano),
        commitDurationMs = NANOSECONDS.toMillis(commitTime - commitStartNano),
        numAdd = adds.size,
        numRemove = actions.collect { case r: RemoveFile => r }.size,
        bytesNew = adds.filter(_.dataChange).map(_.size).sum,
        numFilesTotal = postCommitSnapshot.numOfFiles,
        sizeInBytesTotal = postCommitSnapshot.sizeInBytes,
        protocol = postCommitSnapshot.protocol,
        info = Option(commitInfo).map(_.copy(readVersion = None, isolationLevel = None)).orNull,
        newMetadata = newMetadata,
        numAbsolutePaths,
        numDistinctPartitionsInAdd = distinctPartitions.size,
        isolationLevel = null)
      recordDeltaEvent(deltaLog, "delta.commit.stats", data = stats)
 
      attemptVersion
    } catch {
      case e: java.nio.file.FileAlreadyExistsException =>
        checkAndRetry(attemptVersion, actions, attemptNumber)
    }
}
  • 、这里就是真正写事务日志的操作,其中 store 是通过 spark.delta.logStore.class 参数指定的,目前支持 HDFS、S3、Azure 以及 Local 等存储介质。默认是 HDFS。具体的写事务操作参见下面的介绍。
  • 、持久化事务日志之后,更新内存中的事务日志最新的快照,然后做相关的合法性校验;
  • 、发送一些统计信息。这里应该是 databricks 里面含有的功能,开源版本这里面其实并没有做什么操作。

下面我们开看看真正写事务日志的实现,为了简单起见,我们直接查看 HDFSLogStore 类中对应的方法,主要涉及 writeInternal,其实现如下:

private def writeInternal(path: Path, actions: Iterator[String], overwrite: Boolean): Unit = {
    //  获取 HDFS 的 FileContext 用于后面写事务日志
    val fc = getFileContext(path)
 
    //  如果需要写的事务日志已经存在那么就需要抛出异常,后面再重试
    if (!overwrite && fc.util.exists(path)) {
      // This is needed for the tests to throw error with local file system
      throw new FileAlreadyExistsException(path.toString)
    }
 
    //  事务日志先写到临时文件
    val tempPath = createTempPath(path)
    var streamClosed = false // This flag is to avoid double close
    var renameDone = false // This flag is to save the delete operation in most of cases.
    val stream = fc.create(
      tempPath, EnumSet.of(CREATE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
 
    try {
      //  将本次修改产生的 actions 写入到临时事务日志里
      actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)
      stream.close()
      streamClosed = true
      try {
        val renameOpt = if (overwrite) Options.Rename.OVERWRITE else Options.Rename.NONE
        //  将临时的事务日志移到正式的事务日志里面,如果移动失败则抛出异常,后面再重试
        fc.rename(tempPath, path, renameOpt)
        renameDone = true
      } catch {
        case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
          throw new FileAlreadyExistsException(path.toString)
      }
    } finally {
      if (!streamClosed) {
        stream.close()
      }
 
      // 删除临时事务日志
      if (!renameDone) {
        fc.delete(tempPath, false)
      }
    }
}

writeInternal 的实现逻辑很简单,其实就是我们正常的写文件操作,具体如下:

  • 、获取 HDFS 的 FileContext 用于后面写事务日志
  • 、如果需要写的事务日志已经存在那么就需要抛出异常,后面再重试;比如上面我们写事务日志之前磁盘中最新的事务日志文件是 00000000000000000003.json,我们这次写的事务日志文件应该是 00000000000000000004.json,但是由于 Delta Lake 允许多个用户写数据,所以在我们获取最新的事务日志版本到写事务日志期间已经有用户写了一个新的事务日志 00000000000000000004.json,那么我们这次写肯定要失败了。这时候会抛出 FileAlreadyExistsException 异常,以便后面重试。
  • 、写事务日志的时候是先写到表 _delta_lake 目录下的临时文件里面,比如我们这次写的事务日志文件为 00000000000000000004.json,那么会往类似于 .00000000000000000004.json.0887f7da-5920-4214-bd2e-7c14b4244af1.tmp 文件里面写数据的。
  • 、将本次更新操作的事务记录写到临时文件里;
  • 、写完事务日志之后我们需要将临时事务日志移到最后正式的日志文件里面,比如将 .00000000000000000004.json.0887f7da-5920-4214-bd2e-7c14b4244af1.tmp 移到 00000000000000000004.json。大家注意,在写事务日志文件的过程中同样存在多个用户修改表,所以 00000000000000000004.json 文件很可能已经被别的修改占用了,这时候也需要抛出 FileAlreadyExistsException 异常,以便后面重试。

整个事务日志写操作就完成了,我们再回到 doCommit 函数,注意由于 writeInternal 可能会抛出 FileAlreadyExistsException 异常,也就是 deltaLog.store.write(xxx) 调用可能会抛出异常,我们注意看到 doCommit 函数 catch 了这个异常,并在异常捕获里面调用 checkAndRetry(attemptVersion, actions, attemptNumber),这就是事务日志重写过程, checkAndRetry 函数的实现如下:

protected def checkAndRetry(
      checkVersion: Long,
      actions: Seq[Action],
      attemptNumber: Int): Long = recordDeltaOperation(
        deltaLog,
        "delta.commit.retry",
        tags = Map(TAG_LOG_STORE_CLASS -> deltaLog.store.getClass.getName)) {
    //  读取磁盘中持久化的事务日志,并更新内存中事务日志快照
    deltaLog.update()
    //  重试的版本是刚刚更新内存中事务日志快照的版本+1
    val nextAttempt = deltaLog.snapshot.version + 1
 
    //  做相关的合法性判断
    (checkVersion until nextAttempt).foreach { version =>
      val winningCommitActions =
        deltaLog.store.read(deltaFile(deltaLog.logPath, version)).map(Action.fromJson)
      val metadataUpdates = winningCommitActions.collect { case a: Metadata => a }
      val txns = winningCommitActions.collect { case a: SetTransaction => a }
      val protocol = winningCommitActions.collect { case a: Protocol => a }
      val commitInfo = winningCommitActions.collectFirst { case a: CommitInfo => a }.map(
        ci => ci.copy(version = Some(version)))
      val fileActions = winningCommitActions.collect { case f: FileAction => f }
      // If the log protocol version was upgraded, make sure we are still okay.
      // Fail the transaction if we're trying to upgrade protocol ourselves.
      if (protocol.nonEmpty) {
        protocol.foreach { p =>
          deltaLog.protocolRead(p)
          deltaLog.protocolWrite(p)
        }
        actions.foreach {
          case Protocol(_, _) => throw new ProtocolChangedException(commitInfo)
          case _ =>
        }
      }
      // Fail if the metadata is different than what the txn read.
      if (metadataUpdates.nonEmpty) {
        throw new MetadataChangedException(commitInfo)
      }
      // Fail if the data is different than what the txn read.
      if (dependsOnFiles && fileActions.nonEmpty) {
        throw new ConcurrentWriteException(commitInfo)
      }
      // Fail if idempotent transactions have conflicted.
      val txnOverlap = txns.map(_.appId).toSet intersect readTxn.toSet
      if (txnOverlap.nonEmpty) {
        throw new ConcurrentTransactionException(commitInfo)
      }
    }
    logInfo(s"No logical conflicts with deltas [$checkVersion, $nextAttempt), retrying.")
    //  开始重试事务日志的写操作
    doCommit(nextAttempt, actions, attemptNumber + 1)
}

checkAndRetry 函数只有在事务日志写冲突的时候才会出现,主要目的是重写当前的事务日志。

  • 、因为上次更新事务日志发生冲突,所以我们需要再一次读取磁盘中持久化的事务日志,并更新内存中事务日志快照;
  • 、重试的版本是刚刚更新内存中事务日志快照的版本+1;
  • 、做相关的合法性判断;
  • 、开始重试事务日志的写操作。

当事务日志成功持久化到磁盘之后,这时候会执行 commit 操作的最后一步,执行 postCommit 函数,其实现如下:

protected def postCommit(commitVersion: Long, commitActions: Seq[Action]): Unit = {
    committed = true
    if (commitVersion != 0 && commitVersion % deltaLog.checkpointInterval == 0) {
      try {
        deltaLog.checkpoint()
      } catch {
        case e: IllegalStateException =>
          logWarning("Failed to checkpoint table state.", e)
      }
    }
}

postCommit 函数实现很简单,就是判断需不需要对事务日志做一次 checkpoint 操作,其中 deltaLog.checkpointInterval 就是通过 spark.databricks.delta.properties.defaults.checkpointInterval 参数设置的,默认每写10次事务日志做一次 checkpoint。

checkpoint 的其实就是将内存中事务日志的最新快照持久化到磁盘里面,如下所示:

-rw-r--r--  1 yangping.wyp  wheel   811B  8 28 19:12 00000000000000000000.json
-rw-r--r--  1 yangping.wyp  wheel   514B  8 28 19:14 00000000000000000001.json
-rw-r--r--  1 yangping.wyp  wheel   711B  8 29 10:54 00000000000000000002.json
-rw-r--r--  1 yangping.wyp  wheel   865B  8 29 10:56 00000000000000000003.json
-rw-r--r--  1 yangping.wyp  wheel   668B  8 29 14:36 00000000000000000004.json
-rw-r--r--  1 yangping.wyp  wheel    13K  8 29 14:36 00000000000000000005.checkpoint.parquet
-rw-r--r--  1 yangping.wyp  wheel   514B  8 29 14:36 00000000000000000005.json
-rw-r--r--  1 yangping.wyp  wheel   514B  8 29 14:36 00000000000000000006.json
-rw-r--r--  1 yangping.wyp  wheel    24B  8 29 14:36 _last_checkpoint

00000000000000000005.checkpoint.parquet 文件就是对事务日志进行 checkpoint 的文件,里面汇总了 00000000000000000000.json - 00000000000000000005.json 之间的所有事务操作记录。所以下一次需要构建事务日志的快照时,只需要从 00000000000000000005.checkpoint.parquet 文件、00000000000000000006.json 文件构造,而无需再读取 00000000000000000000.json - 00000000000000000005.json 之间的事务操作。
同时我们还看到做完 checkpoint 之后还会生成一个 _last_checkpoint 文件,这个其实就是对 CheckpointMetaData 类的持久化操作。里面记录了最后一次 checkpoint 的版本,checkpoint 文件里面的 Action 条数,如下:

⇒  cat _last_checkpoint
{"version":5,"size":10}

注意,其实 CheckpointMetaData 类里面还有个 parts 字段,这个代表 checkpoint 文件有几个分片。因为随着时间的推移,checkpoint 文件也会变得很大,如果只写到一个 checkpoint 文件里面效率不够好,这时候会对 checkpoint 文件进行拆分,拆分成几个文件是记录到 parts 里面,但是目前开源版本的 Delta Lake 尚无这个功能,也不知道数砖后面会不会开源

写在最后

为了营造一个开放的Cassandra技术交流环境,社区建立了微信公众号和钉钉群。为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另云Cassandra免费火爆公测中,欢迎试用:https://www.aliyun.com/product/cds

xxx

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
目录
相关文章
|
存储 监控 安全
实时记录和查看Apache 日志
Apache 是一个开源、跨平台的 Web 服务器,保护其平台需监控活动和事件。Apache 日志分为访问日志和错误日志,分别记录用户请求和服务器错误信息。EventLog Analyzer 是一款强大的日志查看工具,提供集中收集、分析、实时警报和安全监控功能,帮助管理员识别趋势、检测威胁并确保合规性。通过直观的仪表板和自动化响应,它简化了大规模日志管理,增强了 Apache 服务器的安全性和性能。
429 5
|
11月前
|
人工智能 运维 监控
Aipy实战:分析apache2日志中的网站攻击痕迹
Apache2日志系统灵活且信息全面,但安全分析、实时分析和合规性审计存在较高技术门槛。为降低难度,可借助AI工具如aipy高效分析日志,快速发现攻击痕迹并提供反制措施。通过结合AI与学习技术知识,新手运维人员能更轻松掌握复杂日志分析任务,提升工作效率与技能水平。
|
监控 安全 BI
优化 Apache 日志记录的 5 个最佳实践
Apache 日志记录对于维护系统运行状况和网络安全至关重要,其核心包括访问日志与错误日志的管理。通过制定合理的日志策略,如选择合适的日志格式、利用条件日志减少冗余、优化日志级别、使用取证模块提升安全性及实施日志轮换,可有效提高日志可用性并降低系统负担。此外,借助 Eventlog Analyzer 等专业工具,能够实现日志的高效收集、可视化分析与威胁检测,从而精准定位安全隐患、评估服务器性能,并满足合规需求,为强化网络安全提供有力支持。
317 0
优化 Apache 日志记录的 5 个最佳实践
|
存储 运维 监控
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
1095 3
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
|
SQL 关系型数据库 MySQL
MySQL事务日志-Undo Log工作原理分析
事务的持久性是交由Redo Log来保证,原子性则是交由Undo Log来保证。如果事务中的SQL执行到一半出现错误,需要把前面已经执行过的SQL撤销以达到原子性的目的,这个过程也叫做"回滚",所以Undo Log也叫回滚日志。
867 7
MySQL事务日志-Undo Log工作原理分析
|
7月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
1233 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
574 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
9月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
1027 9
Apache Flink:从实时数据分析到实时AI
|
9月前
|
SQL 人工智能 API
Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。
849 0
|
8月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
2768 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架

推荐镜像

更多