SPARK中InMemoryFileIndex文件缓存导致的REFRESH TABLE tableName问题

简介: SPARK中InMemoryFileIndex文件缓存导致的REFRESH TABLE tableName问题

背景


在spark中,有时候会报出running ‘REFRESH TABLE tableName’ command in SQL or by recreating the Dataset/DataFrame involved.的错误,这种错误的原因有一种隐形的原因,那就是InMemoryFileIndex会缓存需要scan的文件在内存中,


分析


在scan file的过程中,最主要涉及的是CatalogFileIndex类,该类中的方法filterPartitions会创建InMemoryFileIndex

def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
    if (table.partitionColumnNames.nonEmpty) {
      val startTime = System.nanoTime()
      val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
        table.identifier, filters)
      val partitions = selectedPartitions.map { p =>
        val path = new Path(p.location)
        val fs = path.getFileSystem(hadoopConf)
        PartitionPath(
          p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
          path.makeQualified(fs.getUri, fs.getWorkingDirectory))
      }
      val partitionSpec = PartitionSpec(partitionSchema, partitions)
      val timeNs = System.nanoTime() - startTime
      new InMemoryFileIndex(sparkSession,
        rootPathsSpecified = partitionSpec.partitions.map(_.path),
        parameters = Map.empty,
        userSpecifiedSchema = Some(partitionSpec.partitionColumns),
        fileStatusCache = fileStatusCache,
        userSpecifiedPartitionSpec = Some(partitionSpec),
        metadataOpsTimeNs = Some(timeNs))
    } else {
      new InMemoryFileIndex(sparkSession, rootPaths, parameters = table.storage.properties,
        userSpecifiedSchema = None, fileStatusCache = fileStatusCache)
    }
  }

而在该InMemoryFileIndex中有成员变量fileStatusCache,该成员变量的赋值通过*FileStatusCache.getOrCreate(sparkSession)*而来:

def getOrCreate(session: SparkSession): FileStatusCache = synchronized {
    if (session.sqlContext.conf.manageFilesourcePartitions &&
      session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
      if (sharedCache == null) {
        sharedCache = new SharedInMemoryCache(
          session.sqlContext.conf.filesourcePartitionFileCacheSize,
          session.sqlContext.conf.metadataCacheTTL
        )
      }
      sharedCache.createForNewClient()
    } else {
      NoopCache
    }
  }

SharedInMemoryCache的会调用guavaCacheBuilder方法:

var builder = CacheBuilder.newBuilder()
      .weigher(weigher)
      .removalListener(removalListener)
      .maximumWeight(maxSizeInBytes / weightScale)
    if (cacheTTL > 0) {
      builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
    }
    builder.build[(ClientId, Path), Array[FileStatus]]()

该方法会对最终要scan的文件进行缓存处理,该缓存文件的调用路径如下:

  FileSourceScanExec.doExecute => inputRDD => createBucketedReadRDD或者createNonBucketedReadRDD => dynamicallySelectedPartitions => selectedPartitions
   ||
   \/
   CatalogFileIndex.listFiles => filterPartitions
   ||
   \/
   InMemoryFileIndex.listFiles => partitionSpec => inferPartitioning => leafDirToChildrenFiles

其中cachedLeafDirToChildrenFiles的值会在InMemoryFileIndex对象初始化的时候进行赋值,对应的方法为refresh0

private def refresh0(): Unit = {
   val files = listLeafFiles(rootPaths)
   cachedLeafFiles =
     new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
   cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
   cachedPartitionSpec = null
 }

listLeafFiles方法就会对调用fileStatusCache.getLeafFile从而获取到缓存的文件路径


结论


文中提到的 REFRESH TABLE tableName’ command in SQL错误,在两个不同的jvm进程中对于同一个表的读写中回经常出现,这里有三个参数可以去设置:

spark.sql.hive.filesourcePartitionFileCacheSize
spark.sql.hive.manageFilesourcePartitions
spark.sql.metadataCacheTTLSeconds

其他


1.那为什么在同一个jvm中对一个表的读写不会报这种错误呢?那是因为,在一个jvm中,比如说是写了之后再读取,会进行refresh操作,从而调用fileStatusCache.invalidateAll()方法,最终使文件缓存失效.


拿SQL:insert overwrite table a举例,该语句最终会生成InsertIntoHadoopFsRelationCommand物理计划,该run方法最终会调用fileIndex的refresh方法,从而调用*fileStatusCache.invalidateAll()*方法:


2.spark.sql.metadataCacheTTLSeconds还有其他的作用


在开启spark.sql.hive.convertMetastoreParquet或者spark.sql.hive.convertMetastoreOrc的情况下,在转换对应的逻辑计划当中,如果缓存中存在对应的表,则会复用缓存中的,具体的方法在HiveMetastoreCatalog.convertToLogicalRelation 中 ,最主要的点是会公用同一个CatalogFileIndex对象,从而实现了文件的复用,从未导致问题

相关文章
|
2月前
|
缓存 NoSQL Linux
【Azure Redis 缓存】Windows和Linux系统本地安装Redis, 加载dump.rdb中数据以及通过AOF日志文件追加数据
【Azure Redis 缓存】Windows和Linux系统本地安装Redis, 加载dump.rdb中数据以及通过AOF日志文件追加数据
【Azure Redis 缓存】Windows和Linux系统本地安装Redis, 加载dump.rdb中数据以及通过AOF日志文件追加数据
|
2月前
|
存储 缓存 NoSQL
【Azure Redis 缓存 Azure Cache For Redis】如何设置让Azure Redis中的RDB文件暂留更久(如7天)
【Azure Redis 缓存 Azure Cache For Redis】如何设置让Azure Redis中的RDB文件暂留更久(如7天)
|
2月前
|
缓存 NoSQL Redis
【Azure Redis 缓存】Azure Cache for Redis 服务的导出RDB文件无法在自建的Redis服务中导入
【Azure Redis 缓存】Azure Cache for Redis 服务的导出RDB文件无法在自建的Redis服务中导入
|
2月前
|
缓存 NoSQL 算法
【Azure Redis 缓存】Redis导出数据文件变小 / 在新的Redis复原后数据大小压缩近一倍问题分析
【Azure Redis 缓存】Redis导出数据文件变小 / 在新的Redis复原后数据大小压缩近一倍问题分析
|
2月前
|
SQL 缓存 监控
实时计算 Flink版产品使用问题之怎么手动清理缓存或废弃文件
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
存储 缓存 API
file_cache: 使用文件缓存函数结果
file_cache: 使用文件缓存函数结果
52 15
|
4月前
|
分布式计算 监控 大数据
spark实战:实现分区内求最大值,分区间求和以及获取日志文件固定日期的请求路径
spark实战:实现分区内求最大值,分区间求和以及获取日志文件固定日期的请求路径
|
4月前
|
缓存 运维 Devops
阿里云云效操作报错合集之在构建过程中,Docker尝试从缓存中获取某个文件(或计算缓存键)时遇到了问题,该如何处理
本合集将整理呈现用户在使用过程中遇到的报错及其对应的解决办法,包括但不限于账户权限设置错误、项目配置不正确、代码提交冲突、构建任务执行失败、测试环境异常、需求流转阻塞等问题。阿里云云效是一站式企业级研发协同和DevOps平台,为企业提供从需求规划、开发、测试、发布到运维、运营的全流程端到端服务和工具支撑,致力于提升企业的研发效能和创新能力。
|
5月前
|
canal 缓存 NoSQL
【后端面经】【缓存】33|缓存模式:缓存模式能不能解决缓存一致性问题?-03 Refresh Ahead + SingleFlight + 删除缓存 + 延迟双删
【5月更文挑战第11天】Refresh Ahead模式通过CDC异步刷新缓存,但面临缓存一致性问题,可借鉴Write Back策略解决。SingleFlight限制并发加载,减少数据库压力,适合热点数据。删除缓存模式在更新数据库后删除缓存,一致性问题源于读写线程冲突。延迟双删模式两次删除,理论上减少不一致,但可能降低缓存命中率。选用模式需权衡优劣,延迟双删在低并发下较优。装饰器模式可用于实现多种缓存模式,无侵入地增强现有缓存系统。
115 2
|
5月前
|
缓存 数据处理 Python
python读取文件到缓存
python读取文件到缓存
63 1