万字长文解析删除Topic流程领导再也不用担心我排查生产环境问题了(附教学视频,建议收藏!!!)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 万字长文解析删除Topic流程领导再也不用担心我排查生产环境问题了(附教学视频,建议收藏!!!)

文章目录

删除Topic命令

相关配置

源码解析

1. 客户端发起删除Topic的请求

2. Controller处理deleteTopics的请求

3. Controller监听zk变更 执行删除Topic流程

3.1 resumeDeletions 执行删除方法

3.2 TopicDeletionManager.onPartitionDeletion

4. Brokers 接受StopReplica请求

4.1 日志清理定时线程

5.StopReplica 请求成功 执行回调接口

6. Controller启动时候 尝试继续处理待删除的Topic

6.1 获取需要被删除的Topic和暂时不能删除的Topic

6.2 topicDeletionManager.init初始化删除管理器

6.3 topicDeletionManager.tryTopicDeletion尝试恢复删除

源码总结

Q&A

什么时候在/admin/delete_topics写入节点的

什么时候真正执行删除Topic磁盘日志

为什么正在重新分配的Topic不能被删除

如果在`/admin/delete_topics/`中手动写入一个节点会不会正常删除

如果直接删除ZK上的`/brokers/topics/{topicName}`节点会怎样

Controller通知Brokers 执行StopReplica是通知所有的Broker还是只通知跟被删除Topic有关联的Broker?

删除过程有Broker不在线 或者执行失败怎么办

ReplicaStateMachine 副本状态机

在重新分配的过程中,如果执行删除操作会怎么样

删除Topic命令

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test


支持正则表达式匹配Topic来进行删除,只需要将topic 用双引号包裹起来

例如: 删除以create_topic_byhand_zk为开头的topic;


bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic “create_topic_byhand_zk.*”

.表示任意匹配除换行符 \n 之外的任何单字符。要匹配 . ,请使用 . 。

·*·:匹配前面的子表达式零次或多次。要匹配 * 字符,请使用 *。

.* : 任意字符


删除任意Topic (慎用)


bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic “.*?”


更多的用法请参考正则表达式


相关配置

配置 描述 默认

file.delete.delay.ms topic删除被标记为–delete文件之后延迟多长时间删除正在的Log文件 60000

delete.topic.enable 是否能够删除topic true

源码解析

如果觉得阅读源码解析太枯燥,请直接看 源码总结及其后面部分


1. 客户端发起删除Topic的请求

在【kafka源码】TopicCommand之创建Topic源码解析 里面已经分析过了整个请求流程; 所以这里就不再详细的分析请求的过程了,直接看重点;


image.png

image.png

向Controller发起 deleteTopics请求

2. Controller处理deleteTopics的请求

KafkaApis.handle

AdminManager.deleteTopics

  /**
    * Delete topics and wait until the topics have been completely deleted.
    * The callback function will be triggered either when timeout, error or the topics are deleted.
    */
  def deleteTopics(timeout: Int,
                   topics: Set[String],
                   responseCallback: Map[String, Errors] => Unit): Unit = {
    // 1. map over topics calling the asynchronous delete
    val metadata = topics.map { topic =>
        try {
          // zk中写入数据 标记要被删除的topic /admin/delete_topics/Topic名称
          adminZkClient.deleteTopic(topic)
          DeleteTopicMetadata(topic, Errors.NONE)
        } catch {
          case _: TopicAlreadyMarkedForDeletionException =>
            // swallow the exception, and still track deletion allowing multiple calls to wait for deletion
            DeleteTopicMetadata(topic, Errors.NONE)
          case e: Throwable =>
            error(s"Error processing delete topic request for topic $topic", e)
            DeleteTopicMetadata(topic, Errors.forException(e))
        }
    }
    // 2. 如果客户端传过来的timeout<=0或者 写入zk数据过程异常了 则执行下面的,直接返回异常
    if (timeout <= 0 || !metadata.exists(_.error == Errors.NONE)) {
      val results = metadata.map { deleteTopicMetadata =>
        // ignore topics that already have errors
        if (deleteTopicMetadata.error == Errors.NONE) {
          (deleteTopicMetadata.topic, Errors.REQUEST_TIMED_OUT)
        } else {
          (deleteTopicMetadata.topic, deleteTopicMetadata.error)
        }
      }.toMap
      responseCallback(results)
    } else {
      // 3. else pass the topics and errors to the delayed operation and set the keys
      val delayedDelete = new DelayedDeleteTopics(timeout, metadata.toSeq, this, responseCallback)
      val delayedDeleteKeys = topics.map(new TopicKey(_)).toSeq
      // try to complete the request immediately, otherwise put it into the purgatory
      topicPurgatory.tryCompleteElseWatch(delayedDelete, delayedDeleteKeys)
    }
  }

zk中写入数据topic/admin/delete_topics/Topic名称; 标记要被删除的Topic

如果客户端传过来的timeout<=0或者 写入zk数据过程异常了 则直接返回异常

3. Controller监听zk变更 执行删除Topic流程

KafkaController.processTopicDeletion

  private def processTopicDeletion(): Unit = {
    if (!isActive) return
    var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
    val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
    if (nonExistentTopics.nonEmpty) {
      warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
      zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
    }
    topicsToBeDeleted --= nonExistentTopics
    if (config.deleteTopicEnable) {
      if (topicsToBeDeleted.nonEmpty) {
        info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
        // 标记暂时不可删除的Topic
        topicsToBeDeleted.foreach { topic =>
          val partitionReassignmentInProgress =
            controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic)
          if (partitionReassignmentInProgress)
            topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
              reason = "topic reassignment in progress")
        }
        // add topic to deletion list
        topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
      }
    } else {
      // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
      info(s"Removing $topicsToBeDeleted since delete topic is disabled")
      zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
    }
  }

如果/admin/delete_topics/下面的节点有不存在的Topic,则清理掉

如果配置了delete.topic.enable=false不可删除Topic的话,则将/admin/delete_topics/下面的节点全部删除,然后流程结束

delete.topic.enable=true; 将主题标记为不符合删除条件,放到topicsIneligibleForDeletion中; 不符合删除条件的是:Topic分区正在进行分区重分配

将Topic添加到删除Topic列表topicsToBeDeleted中;

然后调用TopicDeletionManager.resumeDeletions()方法执行删除操作

3.1 resumeDeletions 执行删除方法

TopicDeletionManager.resumeDeletions()

  private def resumeDeletions(): Unit = {
    val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted
    val topicsEligibleForRetry = mutable.Set.empty[String]
    val topicsEligibleForDeletion = mutable.Set.empty[String]
    if (topicsQueuedForDeletion.nonEmpty)
    topicsQueuedForDeletion.foreach { topic =>
      // if all replicas are marked as deleted successfully, then topic deletion is done
      //如果所有副本都被标记为删除成功了,然后执行删除Topic成功操作; 
      if (controllerContext.areAllReplicasInState(topic, ReplicaDeletionSuccessful)) {
        // clear up all state for this topic from controller cache and zookeeper
        //执行删除Topic成功之后的操作; 
        completeDeleteTopic(topic)
        info(s"Deletion of topic $topic successfully completed")
      } else if (!controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
        // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
        // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
        // or there is at least one failed replica (which means topic deletion should be retried).
        if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
          topicsEligibleForRetry += topic
        }
      }
      // Add topic to the eligible set if it is eligible for deletion.
      if (isTopicEligibleForDeletion(topic)) {
        info(s"Deletion of topic $topic (re)started")
        topicsEligibleForDeletion += topic
      }
    }
    // topic deletion retry will be kicked off
    if (topicsEligibleForRetry.nonEmpty) {
      retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
    }
    // topic deletion will be kicked off
    if (topicsEligibleForDeletion.nonEmpty) {
      //删除Topic,发送UpdataMetaData请求
      onTopicDeletion(topicsEligibleForDeletion)
    }
  }
}

重点看看onTopicDeletion方法,标记所有待删除分区;向Brokers发送updateMetadataRequest请求,告知Brokers这个主题正在被删除,并将Leader设置为LeaderAndIsrLeaderDuringDelete;


将待删除的Topic的所有分区,执行分区状态机的转换 ;当前状态–>OfflinePartition->NonExistentPartition ; 这两个状态转换只是在当前Controller内存中更新了一下状态; 关于状态机请看 【kafka源码】Controller中的状态机TODO…;

client.sendMetadataUpdate(topics.flatMap(controllerContext.partitionsForTopic)) 向待删除Topic分区发送UpdateMetadata请求; 这个时候更新了什么数据呢?


image.png

image.png

看上面图片源码, 发送UpdateMetadata请求的时候把分区的Leader= -2; 表示这个分区正在被删除;那么所有正在被删除的分区就被找到了;拿到这些待删除分区之后干嘛呢?

更新一下限流相关信息

调用groupCoordinator.handleDeletedPartitions(deletedPartitions): 清除给定的deletedPartitions的组偏移量以及执行偏移量删除的函数;就是现在该分区不能提供服务啦,不能被消费啦

详细请看 Kafka的元数据更新UpdateMetadata


调用TopicDeletionManager.onPartitionDeletion接口如下;

3.2 TopicDeletionManager.onPartitionDeletion

将所有Dead replicas 副本直接移动到ReplicaDeletionIneligible状态,如果某些副本已死,也将相应的主题标记为不适合删除,因为它无论如何都不会成功完成

副本状态转换成OfflineReplica; 这个时候会对该Topic的所有副本所在Broker发起StopReplicaRequest 请求;(参数deletePartitions = false,表示还不执行删除操作); 以便他们停止向Leader发送fetch请求; 关于状态机请看 【kafka源码】Controller中的状态机TODO…;

副本状态转换成 ReplicaDeletionStarted状态,这个时候会对该Topic的所有副本所在Broker发起StopReplicaRequest 请求;(参数deletePartitions = true,表示执行删除操作)。这将发送带有 deletePartition=true 的 StopReplicaRequest 。并将删除相应分区的所有副本中的所有持久数据

4. Brokers 接受StopReplica请求

最终调用的是接口

ReplicaManager.stopReplica ==> LogManager.asyncDelete


将给定主题分区“logdir”的目录重命名为“logdir.uuid.delete”,并将其添加到删除队列中

例如 :

image.png

def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log = {
    val removedLog: Log = logCreationOrDeletionLock synchronized {
      //将待删除的partition在 Logs中删除掉
      if (isFuture)
        futureLogs.remove(topicPartition)
      else
        currentLogs.remove(topicPartition)
    }
    if (removedLog != null) {
      //我们需要等到要删除的日志上没有更多的清理任务,然后才能真正删除它。
      if (cleaner != null && !isFuture) {
        cleaner.abortCleaning(topicPartition)
        cleaner.updateCheckpoints(removedLog.dir.getParentFile)
      }
      //重命名topic副本文件夹 命名规则 topic-uuid-delete
      removedLog.renameDir(Log.logDeleteDirName(topicPartition))
      checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.dir.getParentFile, ArrayBuffer.empty)
      checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
      //将Log添加到待删除Log队列中,等待删除
      addLogToBeDeleted(removedLog)
    } else if (offlineLogDirs.nonEmpty) {
      throw new KafkaStorageException(s"Failed to delete log for ${if (isFuture) "future" else ""} $topicPartition because it may be in one of the offline directories ${offlineLogDirs.mkString(",")}")
    }
    removedLog
  }

4.1 日志清理定时线程

上面我们知道最终是将待删除的Log添加到了logsToBeDeleted这个队列中; 这个队列就是待删除Log队列,有一个线程 kafka-delete-logs专门来处理的;我们来看看这个线程怎么工作的


LogManager.startup 启动的时候 ,启动了一个定时线程

   scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                         deleteLogs _,
                         delay = InitialTaskDelayMs,
                         unit = TimeUnit.MILLISECONDS)

删除日志的线程

  /**
   *  Delete logs marked for deletion. Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs`
   *  has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be
   *  considered for deletion in the next iteration of `deleteLogs`. The next iteration will be executed
   *  after the remaining time for the first log that is not deleted. If there are no more `logsToBeDeleted`,
   *  `deleteLogs` will be executed after `currentDefaultConfig.fileDeleteDelayMs`.
   * 删除标记为删除的日志文件;
   * file.delete.delay.ms 文件延迟删除时间 默认60000毫秒
   * 
   */
  private def deleteLogs(): Unit = {
    var nextDelayMs = 0L
    try {
      def nextDeleteDelayMs: Long = {
        if (!logsToBeDeleted.isEmpty) {
          val (_, scheduleTimeMs) = logsToBeDeleted.peek()
          scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
        } else
          currentDefaultConfig.fileDeleteDelayMs
      }
      while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
        val (removedLog, _) = logsToBeDeleted.take()
        if (removedLog != null) {
          try {
            //立即彻底删除此日志目录和文件系统中的所有内容
            removedLog.delete()
            info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
          } catch {
            case e: KafkaStorageException =>
              error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
          }
        }
      }
    } catch {
      case e: Throwable =>
        error(s"Exception in kafka-delete-logs thread.", e)
    } finally {
      try {
        scheduler.schedule("kafka-delete-logs",
          deleteLogs _,
          delay = nextDelayMs,
          unit = TimeUnit.MILLISECONDS)
      } catch {
        case e: Throwable =>
          if (scheduler.isStarted) {
            // No errors should occur unless scheduler has been shutdown
            error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
          }
      }
    }
  }

file.delete.delay.ms 决定延迟多久删除


5.StopReplica 请求成功 执行回调接口

Topic删除完成, 清理相关信息

触发这个接口的地方是: 每个Broker执行删除StopReplica成功之后,都会执行一个回调函数;TopicDeletionStopReplicaResponseReceived ; 当然调用方是Controller,回调到的也就是Controller;


传入回调函数的地方


image.png

image.png

执行回调函数 KafkaController.processTopicDeletionStopReplicaResponseReceived


如果回调有异常,删除失败则将副本状态转换成==》ReplicaDeletionIneligible,并且重新执行resumeDeletions方法;


如果回调正常,则变更状态 ReplicaDeletionStarted==》ReplicaDeletionSuccessful;并且重新执行resumeDeletions方法;


resumeDeletions方法会判断所有副本是否均被删除,如果全部删除了就会执行下面的completeDeleteTopic代码;否则会继续删除未被成功删除的副本

  private def completeDeleteTopic(topic: String): Unit = {
    // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
    // firing before the new topic listener when a deleted topic gets auto created
    client.mutePartitionModifications(topic)
    val replicasForDeletedTopic = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
    // controller will remove this replica from the state machine as well as its partition assignment cache
    replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq, NonExistentReplica)
    controllerContext.topicsToBeDeleted -= topic
    controllerContext.topicsWithDeletionStarted -= topic
    client.deleteTopic(topic, controllerContext.epochZkVersion)
    controllerContext.removeTopic(topic)
  }

清理内存中相关信息

取消注册被删除Topic的相关节点监听器;节点是/brokers/topics/Topic名称

删除zk中的数据包括;/brokers/topics/Topic名称、/config/topics/Topic名称 、/admin/delete_topics/Topic名称

6. Controller启动时候 尝试继续处理待删除的Topic

我们之前分析Controller上线的时候有看到

KafkaController.onControllerFailover

以下省略部分代码

 private def onControllerFailover(): Unit = {
    // 获取哪些Topic需要被删除,哪些暂时还不能删除
     val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
    info("Initializing topic deletion manager")
    //Topic删除管理器初始化
    topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
    //Topic删除管理器 尝试开始删除Topi
    topicDeletionManager.tryTopicDeletion()

6.1 获取需要被删除的Topic和暂时不能删除的Topic

fetchTopicDeletionsInProgress


topicsToBeDeleted所有需要被删除的Topic从zk中/admin/delete_topics 获取

topicsIneligibleForDeletion有一部分Topic还暂时不能被删除:

①. Topic任意分区正在进行副本重分配

②. Topic任意分区副本存在不在线的情况(只有topic有一个副本所在的Broker异常就不能能删除)

将得到的数据存在在controllerContext内存中

6.2 topicDeletionManager.init初始化删除管理器

如果服务器配置delete.topic.enable=false不允许删除topic的话,则删除/admin/delete_topics 中的节点; 这个节点下面的数据是标记topic需要被删除的意思;

6.3 topicDeletionManager.tryTopicDeletion尝试恢复删除

这里又回到了上面分析过的resumeDeletions啦;恢复删除操作

  def tryTopicDeletion(): Unit = {
    if (isDeleteTopicEnabled) {
      resumeDeletions()
    }
  }

源码总结

整个Topic删除, 请看下图

image.png

几个注意点:


Controller 也是Broker

Controller发起删除请求的时候,只是跟相关联的Broker发起删除请求;

Broker不在线或者删除失败,Controller会持续进行删除操作; 或者Broker上线之后继续进行删除操作

Q&A

列举在此主题下比较常见的问题; 如果读者有其他问题可以在评论区评论, 博主会不定期更新


什么时候在/admin/delete_topics写入节点的

客户端发起删除操作deleteTopics的时候,Controller响应deleteTopics请求, 这个时候Controller就将待删除Topic写入了zk的/admin/delete_topics/Topic名称节点中了;


什么时候真正执行删除Topic磁盘日志

Controller监听到zk节点/admin/delete_topics之后,向所有存活的Broker发送删除Topic的请求; Broker收到请求之后将待删除副本标记为–delete后缀; 然后会有专门日志清理现场来进行真正的删除操作; 延迟多久删除是靠file.delete.delay.ms来决定的;默认是60000毫秒 = 一分钟


为什么正在重新分配的Topic不能被删除

正在重新分配的Topic,你都不知道它具体会落在哪个地方,所以肯定也就不知道啥时候删除啊;

等分配完毕之后,就会继续删除流程


如果在/admin/delete_topics/中手动写入一个节点会不会正常删除

如果写入的节点,并不是一个真实存在的Topic;则将会直接被删除

当然要注意如果配置了delete.topic.enable=false不可删除Topic的话,则将/admin/delete_topics/下面的节点全部删除,然后流程结束

如果写入的节点是一个真实存在的Topic; 则将会执行删除Topic的流程; 本质上跟用Kafka客户端执行删除Topic操作没有什么不同


如果直接删除ZK上的/brokers/topics/{topicName}节点会怎样

TODO…


Controller通知Brokers 执行StopReplica是通知所有的Broker还是只通知跟被删除Topic有关联的Broker?

只是通知跟被删除Topic有关联的Broker;

请看下图源码,可以看到所有需要被StopReplica的副本都是被过滤了一遍,获取它们所在的BrokerId; 最后调用的时候也是sendRequest(brokerId, stopReplicaRequest) ;根据获取到的BrokerId发起的请求

image.png

删除过程有Broker不在线 或者执行失败怎么办

Controller会继续删除操作;或者等Broker上线然后继续删除操作; 反正就是一定会保证所有的分区都被删除(被标记了–delete)之后才会把zk上的数据清理掉;


ReplicaStateMachine 副本状态机

请看 【kafka源码】Controller中的状态机TODO


在重新分配的过程中,如果执行删除操作会怎么样

删除操作会等待,等待重新分配完成之后,继续进行删除操作

image.png


相关文章
|
4天前
|
域名解析 弹性计算 安全
阿里云服务器租用、注册域名、备案及域名解析完整流程参考(图文教程)
对于很多初次建站的用户来说,选购云服务器和注册应及备案和域名解析步骤必须了解的,目前轻量云服务器2核2G68元一年,2核4G4M服务器298元一年,域名注册方面,阿里云推出域名1元购买活动,新用户注册com和cn域名2年首年仅需0元,xyz和top等域名首年仅需1元。对于建站的用户来说,购买完云服务器并注册好域名之后,下一步还需要操作备案和域名绑定。本文为大家展示阿里云服务器的购买流程,域名注册、绑定以及备案的完整流程,全文以图文教程形式为大家展示具体细节及注意事项,以供新手用户参考。
|
21天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
50 12
|
2月前
|
JavaScript 前端开发 开发者
Vue执行流程及渲染解析
【10月更文挑战第2天】
112 58
|
1月前
|
数据采集 JSON 数据格式
深入解析:使用Python爬取Bilibili视频
本文介绍了如何使用Python编写脚本自动化下载Bilibili视频。通过requests等库获取视频和音频URL,使用ffmpeg合并音视频文件,最终实现高效下载。注意遵守网站爬虫政策和法律法规。
214 4
|
1月前
|
Kubernetes 监控 API
深入解析Kubernetes及其在生产环境中的最佳实践
深入解析Kubernetes及其在生产环境中的最佳实践
46 1
|
2月前
|
数据安全/隐私保护 流计算 开发者
python知识点100篇系列(18)-解析m3u8文件的下载视频
【10月更文挑战第6天】m3u8是苹果公司推出的一种视频播放标准,采用UTF-8编码,主要用于记录视频的网络地址。HLS(Http Live Streaming)是苹果公司提出的一种基于HTTP的流媒体传输协议,通过m3u8索引文件按序访问ts文件,实现音视频播放。本文介绍了如何通过浏览器找到m3u8文件,解析m3u8文件获取ts文件地址,下载ts文件并解密(如有必要),最后使用ffmpeg合并ts文件为mp4文件。
|
2月前
|
机器学习/深度学习 编解码 算法
深入解析MaxFrame:关键技术组件及其对视频体验的影响
【10月更文挑战第12天】随着流媒体服务和高清视频内容的普及,用户对于视频质量的要求越来越高。为了满足这些需求,许多技术被开发出来以提升视频播放的质量。其中,MaxFrame是一种旨在通过一系列先进的图像处理算法来优化视频帧的技术。本文将深入探讨构成MaxFrame的核心组件,包括运动估计、超分辨率重建以及时间插值算法,并讨论这些技术如何协同工作以改善视频播放效果。
47 1
|
2月前
|
JavaScript 前端开发 UED
Vue执行流程及渲染解析
【10月更文挑战第5天】
|
2月前
|
存储 搜索推荐 数据库
运用LangChain赋能企业规章制度制定:深入解析Retrieval-Augmented Generation(RAG)技术如何革新内部管理文件起草流程,实现高效合规与个性化定制的完美结合——实战指南与代码示例全面呈现
【10月更文挑战第3天】构建公司规章制度时,需融合业务实际与管理理论,制定合规且促发展的规则体系。尤其在数字化转型背景下,利用LangChain框架中的RAG技术,可提升规章制定效率与质量。通过Chroma向量数据库存储规章制度文本,并使用OpenAI Embeddings处理文本向量化,将现有文档转换后插入数据库。基于此,构建RAG生成器,根据输入问题检索信息并生成规章制度草案,加快更新速度并确保内容准确,灵活应对法律与业务变化,提高管理效率。此方法结合了先进的人工智能技术,展现了未来规章制度制定的新方向。
45 3
|
2月前
|
存储 缓存 边缘计算
揭秘直播带货背后的黑科技:播放流程全解析!
大家好,我是小米,今天聊聊社区直播带货的技术细节。我们将探讨直播播放流程中的关键技术,包括 HTTP DASH 协议、POP(Point of Presence)缓存和一致性哈希算法等。通过这些技术,直播流能根据网络状况动态调整清晰度,保证流畅体验。POP 和 DC 的多层次缓存设计减少了延迟,提升了观看效果。无论是技术人员还是直播运营者,都能从中受益。希望通过本文,你能更好地理解直播背后的技术原理。
55 3

推荐镜像

更多
下一篇
DataWorks