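Background
Spark sometimes fails with an error whose message ends in "running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved." One less obvious cause of this error is that InMemoryFileIndex caches, in memory, the files that need to be scanned.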
Analysis
The class most involved in scanning files is CatalogFileIndex. Its filterPartitions method creates an InMemoryFileIndex:
    def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
      if (table.partitionColumnNames.nonEmpty) {
        val startTime = System.nanoTime()
        val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
          table.identifier, filters)
        val partitions = selectedPartitions.map { p =>
          val path = new Path(p.location)
          val fs = path.getFileSystem(hadoopConf)
          PartitionPath(
            p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
            path.makeQualified(fs.getUri, fs.getWorkingDirectory))
        }
        val partitionSpec = PartitionSpec(partitionSchema, partitions)
        val timeNs = System.nanoTime() - startTime
        new InMemoryFileIndex(sparkSession,
          rootPathsSpecified = partitionSpec.partitions.map(_.path),
          parameters = Map.empty,
          userSpecifiedSchema = Some(partitionSpec.partitionColumns),
          fileStatusCache = fileStatusCache,
          userSpecifiedPartitionSpec = Some(partitionSpec),
          metadataOpsTimeNs = Some(timeNs))
      } else {
        new InMemoryFileIndex(sparkSession, rootPaths, parameters = table.storage.properties,
          userSpecifiedSchema = None, fileStatusCache = fileStatusCache)
      }
    }
InMemoryFileIndex has a member variable fileStatusCache, whose value comes from FileStatusCache.getOrCreate(sparkSession):
    def getOrCreate(session: SparkSession): FileStatusCache = synchronized {
      if (session.sqlContext.conf.manageFilesourcePartitions &&
          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
        if (sharedCache == null) {
          sharedCache = new SharedInMemoryCache(
            session.sqlContext.conf.filesourcePartitionFileCacheSize,
            session.sqlContext.conf.metadataCacheTTL
          )
        }
        sharedCache.createForNewClient()
      } else {
        NoopCache
      }
    }
SharedInMemoryCache in turn builds its cache with Guava's CacheBuilder:
    var builder = CacheBuilder.newBuilder()
      .weigher(weigher)
      .removalListener(removalListener)
      .maximumWeight(maxSizeInBytes / weightScale)
    if (cacheTTL > 0) {
      builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
    }
    builder.build[(ClientId, Path), Array[FileStatus]]()
This cache holds the listing of the files that will eventually be scanned. The call path that reaches the cached files is:
    FileSourceScanExec.doExecute => inputRDD => createBucketedReadRDD or createNonBucketedReadRDD => dynamicallySelectedPartitions => selectedPartitions
        ||
        \/
    CatalogFileIndex.listFiles => filterPartitions
        ||
        \/
    InMemoryFileIndex.listFiles => partitionSpec => inferPartitioning => leafDirToChildrenFiles
The value of cachedLeafDirToChildrenFiles is assigned when the InMemoryFileIndex object is initialized, in the refresh0 method:
    private def refresh0(): Unit = {
      val files = listLeafFiles(rootPaths)
      cachedLeafFiles =
        new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
      cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
      cachedPartitionSpec = null
    }
The listLeafFiles method calls fileStatusCache.getLeafFiles, so it returns the cached file paths rather than listing the file system again.
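To make the effect of this caching concrete, here is a minimal toy model in plain Scala. It is only a sketch: ToyFileSystem, ToyListingCache and ToyFileIndex are hypothetical stand-ins invented for illustration, not Spark classes, and they only mimic the behavior described above (a new index consults a shared listing cache when it is constructed).

```scala
import scala.collection.mutable

// Hypothetical stand-in for a file system: directory -> file names.
final class ToyFileSystem {
  private val dirs = mutable.Map.empty[String, Vector[String]]
  def write(dir: String, files: Seq[String]): Unit = dirs(dir) = files.toVector
  def list(dir: String): Vector[String] = dirs.getOrElse(dir, Vector.empty)
}

// Hypothetical stand-in for the shared file status cache.
final class ToyListingCache {
  private val entries = mutable.Map.empty[String, Vector[String]]
  def getLeafFiles(dir: String): Option[Vector[String]] = entries.get(dir)
  def putLeafFiles(dir: String, files: Vector[String]): Unit = entries(dir) = files
  def invalidateAll(): Unit = entries.clear()
}

// Hypothetical stand-in for the file index: it resolves its listing once,
// at construction time, preferring the cache over the file system.
final class ToyFileIndex(fs: ToyFileSystem, cache: ToyListingCache, root: String) {
  private val cachedLeafFiles: Vector[String] =
    cache.getLeafFiles(root).getOrElse {
      val listed = fs.list(root)
      cache.putLeafFiles(root, listed)
      listed
    }
  def listFiles: Vector[String] = cachedLeafFiles
}

object StaleListingDemo {
  // Returns (listing seen through the stale cache, listing after invalidation).
  def run(): (Vector[String], Vector[String]) = {
    val fs = new ToyFileSystem
    val cache = new ToyListingCache
    val dir = "/warehouse/t/p=1"

    fs.write(dir, Seq("part-0000"))
    new ToyFileIndex(fs, cache, dir) // first query populates the cache

    // Another process overwrites the partition; this cache is not told.
    fs.write(dir, Seq("part-0001"))
    val stale = new ToyFileIndex(fs, cache, dir).listFiles

    cache.invalidateAll() // what a refresh does
    val fresh = new ToyFileIndex(fs, cache, dir).listFiles
    (stale, fresh)
  }
}
```

In this model the second index still reports part-0000, a file that no longer exists, which is the situation in which a real scan would fail and suggest REFRESH TABLE.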
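Conclusion
The "'REFRESH TABLE tableName' command in SQL" error mentioned above shows up frequently when two different JVM processes read and write the same table. Three parameters can be tuned to deal with it:
spark.sql.hive.filesourcePartitionFileCacheSize
spark.sql.hive.manageFilesourcePartitions
spark.sql.metadataCacheTTLSeconds
As the getOrCreate and CacheBuilder code above shows, setting the cache size to 0 or disabling manageFilesourcePartitions yields a NoopCache, and a positive TTL makes cache entries expire that many seconds after they are written.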
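As a rough sketch of how these settings might be applied, the snippet below configures them when the session is built, since the shared cache is created once per JVM in getOrCreate. The application name, the table name db.tbl and the chosen values are placeholders for illustration, not recommendations, and the snippet assumes a Spark build with Hive support on the classpath.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("file-listing-cache-demo") // placeholder name
  .enableHiveSupport()
  // Option A: keep the cache, but let entries expire 60 seconds after being written.
  .config("spark.sql.metadataCacheTTLSeconds", "60")
  // Option B: turn the file listing cache off entirely (getOrCreate then returns NoopCache).
  // .config("spark.sql.hive.filesourcePartitionFileCacheSize", "0")
  .getOrCreate()

// Independently of the settings above, the reader can invalidate explicitly,
// as the error message itself suggests. "db.tbl" is a placeholder table name.
spark.sql("REFRESH TABLE db.tbl")
// Equivalent call through the catalog API:
spark.catalog.refreshTable("db.tbl")
```

Option A trades some staleness for fewer listing calls, while Option B always lists the file system again; which one fits depends on how often other processes rewrite the table.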
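Other notes
1. Why does reading and writing a table within the same JVM not raise this error? Because within one JVM, a read that follows a write is preceded by a refresh, which calls fileStatusCache.invalidateAll() and so invalidates the cached file listing.
Take the SQL statement insert overwrite table a as an example. It ends up as an InsertIntoHadoopFsRelationCommand physical plan, whose run method eventually calls refresh on the fileIndex, which in turn calls fileStatusCache.invalidateAll().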
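2. spark.sql.metadataCacheTTLSeconds has another effect as well.
When spark.sql.hive.convertMetastoreParquet or spark.sql.hive.convertMetastoreOrc is enabled, the conversion of the corresponding logical plan reuses the cached relation if the table is already in the cache. The logic lives in HiveMetastoreCatalog.convertToLogicalRelation. The key point is that the same CatalogFileIndex object is shared, so the file listing is reused, which can lead to the same problem.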