PostgreSQL 的pg_buffercache 代码研究

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
简介:

pg_buffercache 代码位于 contrib 目录,总体上代码量200多行。

刚接触,感觉直接访问PostgreSQL 中的内存结构很神奇,特意学习了一下。

复制代码
/*-------------------------------------------------------------------------                        
 *                        
 * pg_buffercache_pages.c                        
 *      display some contents of the buffer cache                    
 *                        
 *      contrib/pg_buffercache/pg_buffercache_pages.c                    
 *-------------------------------------------------------------------------                        
 */                        
#include "postgres.h"                        
                        
#include "catalog/pg_type.h"                        
#include "funcapi.h"                        
#include "storage/buf_internals.h"                        
#include "storage/bufmgr.h"                        
                        
                        
#define NUM_BUFFERCACHE_PAGES_ELEM    8                    
                        
PG_MODULE_MAGIC;                        
                        
Datum    pg_buffercache_pages(PG_FUNCTION_ARGS);                    
                        
                        
/*                        
 * Record structure holding the to be exposed cache data.                        
 */                        
typedef struct                        
{                        
    uint32        bufferid;            
    Oid        relfilenode;            
    Oid        reltablespace;            
    Oid        reldatabase;            
    ForkNumber        forknum;            
    BlockNumber         blocknum;            
    bool        isvalid;            
    bool        isdirty;            
    uint16        usagecount;            
} BufferCachePagesRec;                        
                        
                        
/*                        
 * Function context for data persisting over repeated calls.                        
 */                        
typedef struct                        
{                        
    TupleDesc    tupdesc;                
    BufferCachePagesRec *record;                    
} BufferCachePagesContext;                        
                        
                        
/*                        
 * Function returning data from the shared buffer cache - buffer number,                        
 * relation node/tablespace/database/blocknum and dirty indicator.                        
 */                        
PG_FUNCTION_INFO_V1(pg_buffercache_pages);                        
                        
Datum                        
pg_buffercache_pages(PG_FUNCTION_ARGS)                        
{                        
    FuncCallContext *funcctx;                    
    Datum        result;            
    MemoryContext oldcontext;                    
    BufferCachePagesContext *fctx;                /* User function context. */    
    TupleDesc        tupledesc;            
    HeapTuple        tuple;            
                        
    if (SRF_IS_FIRSTCALL())                    
    {                    
        int    i;            
        volatile BufferDesc *bufHdr;                
                        
        funcctx = SRF_FIRSTCALL_INIT();                
                        
        /* Switch context when allocating stuff to be used in later calls */                
        oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);                
                        
        /* Create a user function context for cross-call persistence */                
        fctx = (BufferCachePagesContext *) palloc(sizeof(BufferCachePagesContext));                
                        
        /* Construct a tuple descriptor for the result rows. */                
        tupledesc = CreateTemplateTupleDesc(NUM_BUFFERCACHE_PAGES_ELEM, false);                
        TupleDescInitEntry(tupledesc, (AttrNumber) 1, "bufferid",                
                           INT4OID, -1, 0);
        TupleDescInitEntry(tupledesc, (AttrNumber) 2, "relfilenode",                
                           OIDOID, -1, 0);
        TupleDescInitEntry(tupledesc, (AttrNumber) 3, "reltablespace",                
                           OIDOID, -1, 0);
        TupleDescInitEntry(tupledesc, (AttrNumber) 4, "reldatabase",                
                           OIDOID, -1, 0);
        TupleDescInitEntry(tupledesc, (AttrNumber) 5, "relforknumber",                
                           INT2OID, -1, 0);
        TupleDescInitEntry(tupledesc, (AttrNumber) 6, "relblocknumber",                
                           INT8OID, -1, 0);
        TupleDescInitEntry(tupledesc, (AttrNumber) 7, "isdirty",                
                           BOOLOID, -1, 0);
        TupleDescInitEntry(tupledesc, (AttrNumber) 8, "usage_count",                
                           INT2OID, -1, 0);
                        
        fctx->tupdesc = BlessTupleDesc(tupledesc);                
                        
        /* Allocate NBuffers worth of BufferCachePagesRec records. */                
        fctx->record = (BufferCachePagesRec *) palloc(sizeof(BufferCachePagesRec) * NBuffers);                
                        
        /* Set max calls and remember the user function context. */                
        funcctx->max_calls = NBuffers;                
        funcctx->user_fctx = fctx;                
                        
        /* Return to original context when allocating transient memory */                
        MemoryContextSwitchTo(oldcontext);                
                        
        /*                
         * To get a consistent picture of the buffer state, we must lock all                
         * partitions of the buffer map.  Needless to say, this is horrible                
         * for concurrency.  Must grab locks in increasing order to avoid                
         * possible deadlocks.                
         */                
        for (i = 0; i < NUM_BUFFER_PARTITIONS; i++)                
            LWLockAcquire(FirstBufMappingLock + i, LW_SHARED);            
                        
        /*                
         * Scan though all the buffers, saving the relevant fields in the                
         * fctx->record structure.                
         */                
        for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++)                
        {                
            /* Lock each buffer header before inspecting. */            
            LockBufHdr(bufHdr);            
                        
            fctx->record[i].bufferid = BufferDescriptorGetBuffer(bufHdr);            
            fctx->record[i].relfilenode = bufHdr->tag.rnode.relNode;            
            fctx->record[i].reltablespace = bufHdr->tag.rnode.spcNode;            
            fctx->record[i].reldatabase = bufHdr->tag.rnode.dbNode;            
            fctx->record[i].forknum = bufHdr->tag.forkNum;            
            fctx->record[i].blocknum = bufHdr->tag.blockNum;            
            fctx->record[i].usagecount = bufHdr->usage_count;            
                        
            if (bufHdr->flags & BM_DIRTY)            
                fctx->record[i].isdirty = true;        
            else            
                fctx->record[i].isdirty = false;        
                        
            /* Note if the buffer is valid, and has storage created */            
            if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_TAG_VALID))            
                fctx->record[i].isvalid = true;        
            else            
                fctx->record[i].isvalid = false;        
                        
            UnlockBufHdr(bufHdr);            
        }                
                        
        /*                
         * And release locks.  We do this in reverse order for two reasons:                
         * (1) Anyone else who needs more than one of the locks will be trying                
         * to lock them in increasing order; we don't want to release the                
         * other process until it can get all the locks it needs. (2) This                
         * avoids O(N^2) behavior inside LWLockRelease.                
         */                
        for (i = NUM_BUFFER_PARTITIONS; --i >= 0;)                
            LWLockRelease(FirstBufMappingLock + i);            
    }                    
                        
    funcctx = SRF_PERCALL_SETUP();                    
                        
    /* Get the saved state */                    
    fctx = funcctx->user_fctx;                    
                        
    if (funcctx->call_cntr < funcctx->max_calls)                    
    {                    
        uint32        i = funcctx->call_cntr;        
        Datum        values[NUM_BUFFERCACHE_PAGES_ELEM];        
        bool        nulls[NUM_BUFFERCACHE_PAGES_ELEM];        
                        
        values[0] = Int32GetDatum(fctx->record[i].bufferid);                
        nulls[0] = false;                
                        
        /*                
         * Set all fields except the bufferid to null if the buffer is unused                
         * or not valid.                
         */                
        if (fctx->record[i].blocknum == InvalidBlockNumber ||                
            fctx->record[i].isvalid == false)            
        {                
            nulls[1] = true;            
            nulls[2] = true;            
            nulls[3] = true;            
            nulls[4] = true;            
            nulls[5] = true;            
            nulls[6] = true;            
            nulls[7] = true;            
        }                
        else                
        {                
            values[1] = ObjectIdGetDatum(fctx->record[i].relfilenode);            
            nulls[1] = false;            
            values[2] = ObjectIdGetDatum(fctx->record[i].reltablespace);            
            nulls[2] = false;            
            values[3] = ObjectIdGetDatum(fctx->record[i].reldatabase);            
            nulls[3] = false;            
            values[4] = ObjectIdGetDatum(fctx->record[i].forknum);            
            nulls[4] = false;            
            values[5] = Int64GetDatum((int64) fctx->record[i].blocknum);            
            nulls[5] = false;            
            values[6] = BoolGetDatum(fctx->record[i].isdirty);            
            nulls[6] = false;            
            values[7] = Int16GetDatum(fctx->record[i].usagecount);            
            nulls[7] = false;            
        }                
                        
        /* Build and return the tuple. */                
        tuple = heap_form_tuple(fctx->tupdesc, values, nulls);                
        result = HeapTupleGetDatum(tuple);                
                        
        SRF_RETURN_NEXT(funcctx, result);                
    }                                
        SRF_RETURN_DONE(funcctx);                
           
}                 
复制代码

 我的看法是这样的:

 官方给的例子是这样说的: 

复制代码
Datum
my_set_returning_function(PG_FUNCTION_ARGS)
{
  FuncCallContext *funcctx;
  Datum result;
  MemoryContext oldcontext;
  further declarations as needed if (SRF_IS_FIRSTCALL()) {
    funcctx = SRF_FIRSTCALL_INIT();
    oldcontext =
        MemoryContextSwitchTo(funcctx->
                              multi_call_memory_ctx);
    /* One-time setup code appears here: */
    <<user code>>
    <<if returning composite>>
    <<build TupleDesc, and perhaps AttInMetadata>>
    <<endif returning composite>>
    <<user code>>
    MemoryContextSwitchTo(oldcontext);
  }

  /* Each-time setup code appears here: */
  <<user code>>
  funcctx = SRF_PERCALL_SETUP();
  <<user code>>

  /* this is just one way we might test whether we are
     done: */
  if (funcctx->call_cntr < funcctx->max_calls) {
    /* Here we want to return another item: */
    <<user code>>
    <<obtain result Datum>>
    SRF_RETURN_NEXT(funcctx, result);
  } else {
    /* Here we are done returning items and just need to
       clean up: */
    <<user code>>
    SRF_RETURN_DONE(funcctx);
  }
}
复制代码

 对于其中 的 call_cntr 和 max_calls ,不是太理解。原来想,是否一次就该结束了。但是验证的结果是:其实还是会被调用多次。

[作者:技术者高健@博客园  mail: luckyjackgao@gmail.com]

修改代码后部为这个样子:

复制代码
……                            
if (funcctx->call_cntr < funcctx->max_calls)                            
{                            
    fprintf(stderr,"!!!!!call_cntr is smaller than max_calls");                        
    ……                        
    SRF_RETURN_NEXT(funcctx, result);                        
}                            
else{                            
    fprintf(stderr,"call_cntr is not smaller than max_calls");                        
    SRF_RETURN_DONE(funcctx);                        
}                            
                            
复制代码

运行结果:出现多次  !!!!!call_cntr is smaller than max_calls , 最后出现一次 call_cntr smaller than max_calls

哪怕是 我用  select  bufferid from pg_buffercache limit 1; 也是一样的效果。

再次测试,可以 发现, max_calls 居然为4096,也可以说, 我们在 psql 中对 pg_buffercache 的 一次普通查询,在后台进行了4096次运转。好古怪!

如果psql 中运行  select count(*) from pg_buffercache; 得出结果正好是 4096。

然后我们再看 NBuffers 的值,在程序中 下列循环之前,打印 NBuffers的值,发现只执行一次:值也是 4096

/* 
* Scan though all the buffers, saving the relevant fields in the
* fctx->record structure.
*/
for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++)
{

   ...

}

那么可以说:

  /* Each-time setup code appears here: */
  <<user code>>
  funcctx = SRF_PERCALL_SETUP();
  <<user code>>

之前的代码,只执行一次。后面的代码,虽然我们没有写循环,但是会循环(因为 SRF_RETURN_NEXT)直到我们的代码执行了 SRF_RETURN_DONE为止。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
11月前
|
SQL 网络协议 关系型数据库
开源分布式数据库PolarDB-X源码解读——PolarDB-X源码解读(一):CN代码结构
开源分布式数据库PolarDB-X源码解读——PolarDB-X源码解读(一):CN代码结构
5008 0
|
11月前
|
SQL 存储 监控
开源分布式数据库PolarDB-X源码解读——PolarDB-X源码解读(三):CDC代码结构
开源分布式数据库PolarDB-X源码解读——PolarDB-X源码解读(三):CDC代码结构
5396 0
|
资源调度 关系型数据库 Shell
手把手教你如何参与开源项目的协作、贡献代码: 以PolarDB开源项目为例
开源协作是一种社会进化的体现吗? 昨天体验了一下ChatGPT, 对这几个回答深有感触, 开源协作一定是未来会长期存在的, 更大规模化的人类协作模式. 所以我想写一点东西, 来帮助更多人参与开源协作.
1223 3
手把手教你如何参与开源项目的协作、贡献代码: 以PolarDB开源项目为例
|
关系型数据库 分布式数据库 数据库
PolarDB-X开源代码更名公告
PolarDB-X开源以来一直和开源社区保持着良性的互动,根据社区开发者和用户的真实反馈,同时为了更好的服务广大用户和开发者,PolarDB-X决定优化开源代码仓库的地址与名称,新地址和新名称会更加便于理解与记忆。虽然名称做了改变,但是PolarDB-X开源的初心不会改变,会持续向社区输出卓越的分布式数据库技术,和广大用户与开发者共同创造国产数据库的美好未来。
|
弹性计算 关系型数据库 PostgreSQL
PostgreSQL PostGIS 性能提升 - by new GEOS代码
标签 PostgreSQL , PostGIS , geos 背景 http://lin-ear-th-inking.blogspot.com/2019/02/betterfaster-stpointonsurface-for.html 使用GEOS新的代码,提升PostGIS重计算的函数性能。 The improved ST_PointOnSurface runs 13 times
720 0
|
关系型数据库 PolarDB 分布式数据库
“关键代码”——POLARDB大赛冠军吐血整理,你盘吗【下篇】
“上篇讲到:yche针对落幕不久的第一届POLARDB数据库性能大赛整理了一套详细的比赛攻略,(赛题:实现一个简化、高效的kv存储引擎,支持Write、Read以及Range接口),那么今天小天就要贡献出最核心的部分——关键代码~”
665 0
|
关系型数据库 物联网 PostgreSQL
PostgreSQL技术周刊第16期:PostgreSQL 优化器代码概览
PostgreSQL(简称PG)的开发者们:云栖社区已有5000位PG开发者,发布了3000+PG文章(文章列表),沉淀了700+的PG精品问答(问答列表)。 PostgreSQL技术周刊会为大家介绍最新的PG技术与动态、预告活动、最热问答、直播教程等,欢迎大家订阅PostgreSQL技术周刊。
3503 0
|
SQL 算法 关系型数据库
PostgreSQL 优化器代码概览
## 简介 PostgreSQL 的开发源自上世纪80年代,它最初是 Michael Stonebraker 等人在美国国防部支持下创建的POSTGRE项目。上世纪末,Andrew Yu 等人在它上面搭建了第一个SQL Parser,这个版本称为Postgre95,也是加州大学伯克利分校版本的PostgreSQL的基石[1]。
1570 0
|
关系型数据库 PostgreSQL