pg_orphaned扩展是用于维护PostgreSQL孤儿文件的扩展,通过分析学习了查找孤儿文件的方法,同时还将学习在PostgreSQL后端(backend)如何查找指定表/视图、如何创建cache、如何使用hash表、如何使用List、如何使用正则表达式、C语言扩展如何返回结果集。
有关孤儿文件的内容可参考 pg_orphaned 项目主页:https://github.com/bdrouvot/pg_orphaned
pg_orphaned扩展实现了pg_list_orphaned、pg_list_orphaned_moved、pg_move_orphaned、pg_remove_moved_orphaned、pg_move_back_orphaned 五个函数:
CREATE FUNCTION pg_list_orphaned(
    older_than interval DEFAULT NULL,
    OUT dbname text,
    OUT path text,
    OUT name text,
    OUT size bigint,
    OUT mod_time timestamptz,
    OUT relfilenode bigint,
    OUT reloid bigint,
    OUT older bool)
RETURNS SETOF RECORD
AS 'MODULE_PATHNAME', 'pg_list_orphaned'
LANGUAGE C VOLATILE;

CREATE FUNCTION pg_list_orphaned_moved(
    OUT dbname text,
    OUT path text,
    OUT name text,
    OUT size bigint,
    OUT mod_time timestamptz,
    OUT relfilenode bigint,
    OUT reloid bigint)
RETURNS SETOF RECORD
AS 'MODULE_PATHNAME', 'pg_list_orphaned_moved'
LANGUAGE C VOLATILE;

CREATE FUNCTION pg_move_orphaned(older_than interval DEFAULT NULL)
RETURNS int
LANGUAGE C
AS 'MODULE_PATHNAME', 'pg_move_orphaned';

CREATE FUNCTION pg_remove_moved_orphaned()
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME', 'pg_remove_moved_orphaned';

CREATE FUNCTION pg_move_back_orphaned()
RETURNS int
LANGUAGE C
AS 'MODULE_PATHNAME', 'pg_move_back_orphaned';

-- Superuser-only: these functions inspect and move files on disk.
REVOKE EXECUTE ON FUNCTION pg_list_orphaned(older_than interval) FROM public;
REVOKE EXECUTE ON FUNCTION pg_list_orphaned_moved() FROM public;
REVOKE EXECUTE ON FUNCTION pg_move_orphaned(older_than interval) FROM public;
REVOKE EXECUTE ON FUNCTION pg_remove_moved_orphaned() FROM public;
REVOKE EXECUTE ON FUNCTION pg_move_back_orphaned() FROM public;
pg_list_orphaned
pg_list_orphaned函数是整个扩展的核心,返回孤儿文件列表。查找孤儿文件的原理是通过遍历pg缺省表空间目录以及PG_DATA/pg_tblspc目录下的文件(仅查找全数字文件、临时表文件),并逐一在pg_class中查找该条目(通过reltablespace和relfilenode),如果pg_class中不存在该条目,则认定该文件是孤儿文件,查找过程通过 search_orphaned 函数实现。核心函数是RelidByRelfilenodeDirty
search_orphaned
/* Ignore files whose names are not purely numeric (must start with a digit
 * and contain no '_'). */
if (strstr(de->d_name, "_") == NULL && isdigit((unsigned char) *(de->d_name)))
{
    orph = palloc(sizeof(*orph));
    relfilename = strdup(de->d_name);
    relfilenode = (Oid) strtoul(relfilename, &relfilename, 10);

    /*
     * If RelidByRelfilenodeDirty() does not return a valid oid,
     * we consider this file to be an orphaned file.
     */
    oidrel = RelidByRelfilenodeDirty(reltablespace, relfilenode);

    /*
     * If this is the first segment file of a relation (no "." in the name),
     * its size is 0, and it was created after the last checkpoint, then
     * filter it out and do not report it as orphaned.
     * due to https://github.com/postgres/postgres/blob/REL_12_8/src/backend/storage/smgr/md.c#L225
     */
    segment_time = time_t_to_timestamptz(attrib.st_mtime);
    if (!OidIsValid(oidrel) &&
        !(attrib.st_size == 0 && strstr(de->d_name, ".") == NULL &&
          segment_time > last_checkpoint_time))
    {
        orph->dbname = strdup(dbname);
        orph->path = strdup(dir);
        orph->name = strdup(de->d_name);
        orph->size = (int64) attrib.st_size;
        orph->mod_time = segment_time;
        orph->relfilenode = relfilenode;
        orph->reloid = oidrel;
        *flist = lappend(*flist, orph);

        /* search for _init and _fsm */
        if (strstr(de->d_name, ".") == NULL)
            pgorph_add_suffix(flist, orph);
    }
}
last_checkpoint_time在pg_build_orphaned_list函数中获取:
/* get a copy of the control file */ControlFile=get_controlfile(".", &crc_ok); ControlFile=get_controlfile(".", NULL, &crc_ok); if (!crc_ok) ereport(ERROR,(errmsg("pg_control CRC value is incorrect"))); /* get last checkpoint time */time_tmp= (time_t) ControlFile->checkPointCopy.time; last_checkpoint_time=time_t_to_timestamptz(time_tmp);
RelidByRelfilenodeDirty
/*
 * Map a relation's (tablespace, filenode) pair to the relation's oid and
 * cache the result.
 *
 * This is the same as the existing RelidByRelfilenode() in relfilenodemap.c,
 * but it is done using a DirtySnapshot, as we also want to see relations
 * that are currently being created.
 *
 * Returns InvalidOid if no relation matching the criteria could be found.
 */
Oid
RelidByRelfilenodeDirty(Oid reltablespace, Oid relfilenode)
{...}
RelidByRelfilenodeDirty是核心函数,入口参数reltablespace、relfilenode,分别是表空间oid跟文件oid,首先在cache中查找,如果cache中没有,则去pg_class中找,没找到,则说明是孤儿文件,加入cache中,关键代码如下:
else { /** Not a shared table, could either be a plain relation or a* non-shared, nailed one, like e.g. pg_class.*//* check for plain relations by looking in pg_class *//** RelationRelationId在pg_class_d.h中定义* 值为1259,pg_class的OID* 所以这里打开pg_class*/relation=table_open(RelationRelationId, AccessShareLock); relation=heap_open(RelationRelationId, AccessShareLock); /* copy scankey to local copy, it will be modified during the scan *//* relfilenode_skey_dirty的解释见下文 */memcpy(skey, relfilenode_skey_dirty, sizeof(skey)); /* 设置扫描参数 */skey[0].sk_argument=ObjectIdGetDatum(reltablespace); skey[1].sk_argument=ObjectIdGetDatum(relfilenode); scandesc=systable_beginscan(relation, ClassTblspcRelfilenodeIndexId, true, &DirtySnapshot, 2, skey); found=false; while (HeapTupleIsValid(ntp=systable_getnext(scandesc))) { Form_pg_classclassform= (Form_pg_class) GETSTRUCT(ntp); found=true; Assert(classform->reltablespace==reltablespace); Assert(classform->relfilenode==relfilenode); relid=classform->oid; found=true; relid=HeapTupleGetOid(ntp); } systable_endscan(scandesc); table_close(relation, AccessShareLock); heap_close(relation, AccessShareLock); /* check for tables that are mapped but not shared */if (!found) relid=RelationMapFilenumberToOid(relfilenode, false); relid=RelationMapFilenodeToOid(relfilenode, false); } /** Only enter entry into cache now, our opening of pg_class could have* caused cache invalidations to be executed which would have deleted a* new entry if we had entered it above.*/entry=hash_search(RelfilenodeMapHashDirty, (void*) &key, HASH_ENTER, &found); if (found) elog(ERROR, "corrupted hashtable"); entry->relid=relid;
InitializeRelfilenodeMapDirty
relfilenode_skey_dirty是个全局变量,并在InitializeRelfilenodeMapDirty函数中初始化:
/** Initialize cache, either on first use or after a reset.* Same as InitializeRelfilenodeMap in relfilenodemap.c*/staticvoidInitializeRelfilenodeMapDirty(void) { HASHCTLctl; inti; /* 确保我们已经初始化了CacheMemoryContext. */if (CacheMemoryContext==NULL) CreateCacheMemoryContext(); /* 构造skey */MemSet(&relfilenode_skey_dirty, 0, sizeof(relfilenode_skey_dirty)); /* 我们搜索pg_class时使用了两个键值,tablespace和filenode,所以这里是2 */for (i=0; i<2; i++) { /* 填充FmgrInfo结构 */fmgr_info_cxt(F_OIDEQ, &relfilenode_skey_dirty[i].sk_func, CacheMemoryContext); /* 使用Btree索引相等策略 */relfilenode_skey_dirty[i].sk_strategy=BTEqualStrategyNumber; relfilenode_skey_dirty[i].sk_subtype=InvalidOid; relfilenode_skey_dirty[i].sk_collation=InvalidOid; } /* 设置查找键值 */relfilenode_skey_dirty[0].sk_attno=Anum_pg_class_reltablespace; relfilenode_skey_dirty[1].sk_attno=Anum_pg_class_relfilenode; /* 初始化hash表 */MemSet(&ctl, 0, sizeof(ctl)); ctl.keysize=sizeof(RelfilenodeMapKeyDirty); ctl.entrysize=sizeof(RelfilenodeMapEntryDirty); /* hash表位于CacheMemoryContext */ctl.hcxt=CacheMemoryContext; /** Only create the RelfilenodeMapHashDirty now, so we don't end up partially* initialized when fmgr_info_cxt() above ERRORs out with an out of memory* error.* Note that the hash table is not created in shared memory but in* private memory.*/RelfilenodeMapHashDirty=hash_create("RelfilenodeMap cache", 64, &ctl, HASH_ELEM|HASH_BLOBS|HASH_CONTEXT); /* Watch for invalidation events. */CacheRegisterRelcacheCallback(RelfilenodeMapInvalidateCallbackDirty, (Datum) 0); }
这里hash表用到了RelfilenodeMapKeyDirty、RelfilenodeMapEntryDirty两个结构,分别定义如下:
typedefstruct{ Oidreltablespace; Oidrelfilenode; } RelfilenodeMapKeyDirty; typedefstruct{ RelfilenodeMapKeyDirtykey; /* lookup key - must be first */Oidrelid; /* pg_class.oid */} RelfilenodeMapEntryDirty;
后端hash表的使用
pg_orphaned扩展使用pg后端的hash表实现cache,这里简单总结下如何使用后端提供的hash函数。相关代码在源码的src/backend/utils/hash/dynahash.c中:
HTAB*hash_create(constchar*tabname, longnelem, constHASHCTL*info, intflags); void*hash_search(HTAB*hashp, constvoid*keyPtr, HASHACTIONaction, bool*foundPtr)
hash_create的flag参数常用的值有:
HASH_ELEM - 必须包含此值
HASH_STRINGS、HASH_BLOBS、HASH_FUNCTION - 三者必须包含其一
HASH_CONTEXT - 如果包含此值,表示将hash表分配到info->hcxt指定的内存上下文中,缺省分配hash表到TopMemoryContext
hash_search的HASHACTION常用的值有:
* HASH_FIND: 在表中查找key
* HASH_ENTER: 在表中查找key,如果key不存在则创建
* HASH_ENTER_NULL: 同上,如果内存不足返回NULL
* HASH_REMOVE: 从表中删除指定key
用法见下面的代码片断:
/* Hash table for information about each relfilenode <-> oid pair */staticHTAB*RelfilenodeMapHashDirty=NULL; HASHCTLctl; /* 初始化hash表 */MemSet(&ctl, 0, sizeof(ctl)); ctl.keysize=sizeof(RelfilenodeMapKeyDirty); ctl.entrysize=sizeof(RelfilenodeMapEntryDirty); /* hash表位于CacheMemoryContext */ctl.hcxt=CacheMemoryContext; RelfilenodeMapHashDirty=hash_create("RelfilenodeMap cache", 64, &ctl, HASH_ELEM|HASH_BLOBS|HASH_CONTEXT); /* hash_search */RelfilenodeMapEntryDirty*entry; entry=hash_search(RelfilenodeMapHashDirty, (void*) &key, HASH_FIND, &found);