This article has three sections: the fsync frequency of clog, atomic clog operations, and clog consistency with asynchronous commit.
PostgreSQL pg_clog fsync frequency analysis
When exactly does pg_clog need to call fsync?
First, an introduction to pg_clog quoted from the PostgreSQL wiki:
Some details here are in src/backend/access/transam/README:
1. “pg_clog records the commit status for each transaction that has been assigned an XID.”
2. “Transactions and subtransactions are assigned permanent XIDs only when/if they first do something that requires one — typically, insert/update/delete a tuple, though there are a few other places that need an XID assigned.”
pg_clog is updated only at sub or main transaction end. When the transactionid is assigned the page of the clog that contains that transactionid is checked to see if it already exists and if not, it is initialised.
pg_clog is allocated in pages of 8 kB apiece (this matches BLCKSZ, so it is not necessarily 8 kB; see the analysis below).
Each transaction needs 2 bits, so on an 8 kB page there is space for 4 transactions/byte * 8k bytes = 32k transactions.
On allocation, pages are zeroed, which is the bit pattern for “transaction in progress”.
So when a transaction starts, it only needs to ensure that the pg_clog page that contains its status is allocated, but it need not write anything to it.
In 8.3 and later, this happens not when the transaction starts, but when the Xid is assigned (i.e. when the transaction first calls a read-write command).
In previous versions it happens when the first snapshot is taken, normally on the first command of any type with very few exceptions.
This means that one transaction in every 32K writing transactions does have to do extra work when it assigns itself an XID, namely create and zero out the next page of pg_clog.
And that doesn’t just slow down the transaction in question, but the next few guys that would like an XID but arrive on the scene while the zeroing-out is still in progress.
This probably contributes to reported behavior that the transaction execution time is subject to unpredictable spikes.
Every 32K transactions, a CLOG page must be extended; each extension zero-fills the new page and may call PG_FSYNC. Compared with fsyncing the XLOG this should be lightweight, but it can still cause unpredictable response-time spikes: if extending the CLOG page blocks, every session waiting on that clog page is affected.
The pg_fsync happens when the CLOG buffer has no empty slot: a dirty page is picked from the CLOG buffer slots and flushed out, and only then does pg_fsync get called.
CLOG pages don’t make their way out to disk until the internal CLOG buffers are filled, at which point the least recently used buffer there is evicted to permanent storage.
Let's walk through the code to see how pg_clog calls pg_fsync to flush dirty pages.
Every time a new transaction ID is allocated, ExtendCLOG is called; if the CLOG page computed from the transaction ID does not exist yet, it must be extended. But not every extension calls pg_fsync: checkpoints flush the clog buffers to disk, so a backend only has to pick a page itself and pg_fsync the corresponding pg_clog file when, at the moment a new CLOG page is requested, none of the clog buffer slots has had its dirty page flushed out.
src/backend/access/transam/varsup.c
TransactionId
GetNewTransactionId(bool isSubXact)
{
......
ExtendCLOG(xid);
ExtendSUBTRANS(xid);
......
ExtendCLOG(xid) extends the clog page. It uses TransactionIdToPgIndex to compute the XID modulo CLOG_XACTS_PER_PAGE; if the remainder is non-zero, no extension is needed.
src/backend/access/transam/clog.c
#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE)
void
ExtendCLOG(TransactionId newestXact)
{
int pageno;
if (TransactionIdToPgIndex(newestXact) != 0 &&
!TransactionIdEquals(newestXact, FirstNormalTransactionId))
return;
pageno = TransactionIdToPage(newestXact);
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
ZeroCLOGPage(pageno, true);
LWLockRelease(CLogControlLock);
}
ZeroCLOGPage(pageno, true) calls SimpleLruZeroPage to extend and initialize the CLOG page, and writes an XLOG record.
static int
ZeroCLOGPage(int pageno, bool writeXlog)
{
int slotno;
slotno = SimpleLruZeroPage(ClogCtl, pageno);
if (writeXlog)
WriteZeroPageXlogRec(pageno);
return slotno;
}
SimpleLruZeroPage(ClogCtl, pageno) calls SlruSelectLRUPage(ctl, pageno) to pick a slot from the clog shared buffer.
src/backend/access/transam/slru.c
int
SimpleLruZeroPage(SlruCtl ctl, int pageno)
{
SlruShared shared = ctl->shared;
int slotno;
slotno = SlruSelectLRUPage(ctl, pageno);
Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
(shared->page_status[slotno] == SLRU_PAGE_VALID &&
!shared->page_dirty[slotno]) ||
shared->page_number[slotno] == pageno);
shared->page_number[slotno] = pageno;
shared->page_status[slotno] = SLRU_PAGE_VALID;
shared->page_dirty[slotno] = true;
SlruRecentlyUsed(shared, slotno);
MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
SimpleLruZeroLSNs(ctl, slotno);
shared->latest_page_number = pageno;
return slotno;
}
SlruSelectLRUPage(SlruCtl ctl, int pageno) picks an empty slot from the clog buffer; if there is none, it calls SlruInternalWritePage(ctl, bestvalidslot, NULL) to write out a shared buffer page.
static int
SlruSelectLRUPage(SlruCtl ctl, int pageno)
{
......
/* First, see whether the target page is already in a buffer slot; if so, return it -- no pg_fsync needed */
for (slotno = 0; slotno < shared->num_slots; slotno++)
{
if (shared->page_number[slotno] == pageno &&
shared->page_status[slotno] != SLRU_PAGE_EMPTY)
return slotno;
}
......
cur_count = (shared->cur_lru_count)++;
for (slotno = 0; slotno < shared->num_slots; slotno++)
{
int this_delta;
int this_page_number;
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
return slotno;
this_delta = cur_count - shared->page_lru_count[slotno];
if (this_delta < 0)
{
shared->page_lru_count[slotno] = cur_count;
this_delta = 0;
}
this_page_number = shared->page_number[slotno];
if (this_page_number == shared->latest_page_number)
continue;
if (shared->page_status[slotno] == SLRU_PAGE_VALID)
{
if (this_delta > best_valid_delta ||
(this_delta == best_valid_delta &&
ctl->PagePrecedes(this_page_number,
best_valid_page_number)))
{
bestvalidslot = slotno;
best_valid_delta = this_delta;
best_valid_page_number = this_page_number;
}
}
else
{
if (this_delta > best_invalid_delta ||
(this_delta == best_invalid_delta &&
ctl->PagePrecedes(this_page_number,
best_invalid_page_number)))
{
bestinvalidslot = slotno;
best_invalid_delta = this_delta;
best_invalid_page_number = this_page_number;
}
}
}
if (best_valid_delta < 0)
{
SimpleLruWaitIO(ctl, bestinvalidslot);
continue;
}
if (!shared->page_dirty[bestvalidslot])
return bestvalidslot;
......
Only when none of the steps above finds an empty or clean slot must the backend actively flush a dirty page (pg_fsync is called inside SlruInternalWritePage).
SlruInternalWritePage(ctl, bestvalidslot, NULL);
......
SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata) calls SlruPhysicalWritePage to perform the write.
static void
SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
{
......
ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);
......
The SLRU page states:
typedef enum
{
SLRU_PAGE_EMPTY,
SLRU_PAGE_READ_IN_PROGRESS,
SLRU_PAGE_VALID,
SLRU_PAGE_WRITE_IN_PROGRESS
} SlruPageStatus;
SlruPhysicalWritePage(ctl, pageno, slotno, fdata): here the SlruCtlData structure used for pg_clog has do_fsync = true.
static bool
SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
SlruFlush fdata)
{
......
int fd = -1;
......
if (fd < 0)
{
SlruFileName(ctl, path, segno);
fd = OpenTransientFile(path, O_RDWR | O_CREAT | PG_BINARY,
S_IRUSR | S_IWUSR);
......
if (!fdata)
{
if (ctl->do_fsync && pg_fsync(fd))
{
slru_errcause = SLRU_FSYNC_FAILED;
slru_errno = errno;
CloseTransientFile(fd);
return false;
}
if (CloseTransientFile(fd))
{
slru_errcause = SLRU_CLOSE_FAILED;
slru_errno = errno;
return false;
}
}
The code behind ctl->do_fsync && pg_fsync(fd):
src/include/access/slru.h
typedef struct SlruCtlData
{
SlruShared shared;
bool do_fsync;
bool (*PagePrecedes) (int, int);
char Dir[64];
} SlruCtlData;
typedef SlruCtlData *SlruCtl;
src/backend/access/transam/slru.c
......
void
SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
LWLock *ctllock, const char *subdir)
......
ctl->do_fsync = true;
......
Below is the clog LRU initialization; note that it does not modify do_fsync, so it stays true.
src/backend/access/transam/clog.c
Size
CLOGShmemBuffers(void)
{
return Min(32, Max(4, NBuffers / 512));
}
void
CLOGShmemInit(void)
{
ClogCtl->PagePrecedes = CLOGPagePrecedes;
SimpleLruInit(ClogCtl, "CLOG Ctl", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
CLogControlLock, "pg_clog");
}
Below is the subtrans LRU initialization; it sets do_fsync = false, so extending a pg_subtrans page does not require pg_fsync.
src/backend/access/transam/subtrans.c
void
SUBTRANSShmemInit(void)
{
SubTransCtl->PagePrecedes = SubTransPagePrecedes;
SimpleLruInit(SubTransCtl, "SUBTRANS Ctl", NUM_SUBTRANS_BUFFERS, 0,
SubtransControlLock, "pg_subtrans");
SubTransCtl->do_fsync = false;
}
multixact.c does not modify do_fsync either, so multixact also requires the fsync.
MultiXactShmemInit(void)@src/backend/access/transam/multixact.c
The pg_fsync code:
src/backend/storage/file/fd.c
int
pg_fsync(int fd)
{
#if defined(HAVE_FSYNC_WRITETHROUGH) && !defined(FSYNC_WRITETHROUGH_IS_FSYNC)
if (sync_method == SYNC_METHOD_FSYNC_WRITETHROUGH)
return pg_fsync_writethrough(fd);
else
#endif
return pg_fsync_no_writethrough(fd);
}
int
pg_fsync_no_writethrough(int fd)
{
if (enableFsync)
return fsync(fd);
else
return 0;
}
int
pg_fsync_writethrough(int fd)
{
if (enableFsync)
{
#ifdef WIN32
return _commit(fd);
#elif defined(F_FULLFSYNC)
return (fcntl(fd, F_FULLFSYNC, 0) == -1) ? -1 : 0;
#else
errno = ENOSYS;
return -1;
#endif
}
else
return 0;
}
From the code above: when extending a clog page, if the CLOG buffer has no empty slot, the backend process must actively flush a CLOG page itself, and that is where the call to pg_fsync comes from.
A clog page is as large as the database block size (BLCKSZ): 8 kB by default (unless changed when compiling), and at most 32 kB. Each transaction needs 2 bits in pg_clog to record its status (in progress, committed, aborted, or sub-committed). So an 8 kB clog page can hold the status of 4 transactions/byte * 8 kB = 32K transactions; in other words, every 32K transactions the clog must be extended by one page.
Below are some of the commonly used clog macros.
src/backend/access/transam/clog.c
#define CLOG_BITS_PER_XACT 2
#define CLOG_XACTS_PER_BYTE 4
#define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
#define CLOG_XACT_BITMASK ((1 << CLOG_BITS_PER_XACT) - 1)
#define TransactionIdToPage(xid) ((xid) / (TransactionId) CLOG_XACTS_PER_PAGE)
#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE)
#define TransactionIdToByte(xid) (TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE)
#define TransactionIdToBIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_BYTE)
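To make these macros concrete, here is a small standalone sketch (not PostgreSQL code; it assumes the default BLCKSZ of 8192 and copies the macros above) that computes where a given XID lives in the clog:
#include <stdio.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define BLCKSZ 8192		/* assumed default; configurable at compile time */

#define CLOG_BITS_PER_XACT 2
#define CLOG_XACTS_PER_BYTE 4
#define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
#define TransactionIdToPage(xid) ((xid) / (TransactionId) CLOG_XACTS_PER_PAGE)
#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE)
#define TransactionIdToByte(xid) (TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE)
#define TransactionIdToBIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_BYTE)

int
main(void)
{
	TransactionId xid = 607466859;	/* an XID that appears later in this article */

	/* every xid maps to (page, byte, bit): 2 status bits per transaction */
	printf("xid %u -> clog page %u, byte %u, bit offset %u\n",
		   xid,
		   TransactionIdToPage(xid),
		   TransactionIdToByte(xid),
		   TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT);
	/* prints: xid 607466859 -> clog page 18538, byte 3418, bit offset 6 */
	return 0;
}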
Check the database block size:
postgres@digoal-> pg_controldata |grep block
Database block size: 8192
WAL block size: 8192
We can use stap (SystemTap) to trace whether pg_fsync gets called. If you want to watch a backend process actively flushing clog dirty pages, widen the checkpoint interval and keep the clog shared buffer pages small (the default, shown below, caps them at 32).
You will then see the backend process flushing clog dirty pages itself.
Size
CLOGShmemBuffers(void)
{
return Min(32, Max(4, NBuffers / 512));
}
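As a quick illustration of what this formula yields (a sketch, not PostgreSQL code; NBuffers is shared_buffers expressed in 8 kB pages):
#include <stdio.h>

#define Min(x, y) ((x) < (y) ? (x) : (y))
#define Max(x, y) ((x) > (y) ? (x) : (y))

int
main(void)
{
	/* e.g. shared_buffers = 8MB -> 1024 pages, 128MB -> 16384, 1GB -> 131072 */
	int nbuffers[] = {1024, 16384, 131072};

	for (int i = 0; i < 3; i++)
		printf("NBuffers=%6d -> %2d clog buffer pages\n",
			   nbuffers[i], Min(32, Max(4, nbuffers[i] / 512)));
	/* prints 4, 32, 32: the clog buffer is capped at 32 pages (256 kB) */
	return 0;
}
So with any reasonably sized shared_buffers the clog buffer is capped at 32 pages; once all slots hold dirty pages between checkpoints, backends have to flush pages themselves.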
Trace points:
src/backend/access/transam/slru.c
SlruPhysicalWritePage
......
SlruFileName(ctl, path, segno);
fd = OpenTransientFile(path, O_RDWR | O_CREAT | PG_BINARY,
S_IRUSR | S_IWUSR);
......
src/backend/storage/file/fd.c
OpenTransientFile
pg_fsync(fd)
The stap script:
[root@digoal ~]
global f_start[999999]
probe process("/opt/pgsql/bin/postgres").function("SlruPhysicalWritePage@/opt/soft_bak/postgresql-9.4.4/src/backend/access/transam/slru.c").call {
f_start[execname(), pid(), tid(), cpu()] = gettimeofday_ms()
printf("%s <- time:%d, pp:%s, par:%s\n", thread_indent(-1), gettimeofday_ms(), pp(), $$parms$$)
}
probe process("/opt/pgsql/bin/postgres").function("SlruPhysicalWritePage@/opt/soft_bak/postgresql-9.4.4/src/backend/access/transam/slru.c").return {
t=gettimeofday_ms()
a=execname()
b=cpu()
c=pid()
d=pp()
e=tid()
if (f_start[a,c,e,b]) {
printf("%s <- time:%d, pp:%s, par:%s\n", thread_indent(-1), t - f_start[a,c,e,b], d, $return$$)
}
}
probe process("/opt/pgsql/bin/postgres").function("OpenTransientFile@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c").call {
f_start[execname(), pid(), tid(), cpu()] = gettimeofday_ms()
printf("%s <- time:%d, pp:%s, par:%s\n", thread_indent(-1), gettimeofday_ms(), pp(), $$parms$$)
}
probe process("/opt/pgsql/bin/postgres").function("OpenTransientFile@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c").return {
t=gettimeofday_ms()
a=execname()
b=cpu()
c=pid()
d=pp()
e=tid()
if (f_start[a,c,e,b]) {
printf("%s <- time:%d, pp:%s, par:%s\n", thread_indent(-1), t - f_start[a,c,e,b], d, $return$$)
}
}
probe process("/opt/pgsql/bin/postgres").function("pg_fsync@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c").call {
f_start[execname(), pid(), tid(), cpu()] = gettimeofday_ms()
printf("%s <- time:%d, pp:%s, par:%s\n", thread_indent(-1), gettimeofday_ms(), pp(), $$parms$$)
}
probe process("/opt/pgsql/bin/postgres").function("pg_fsync@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c").return {
t=gettimeofday_ms()
a=execname()
b=cpu()
c=pid()
d=pp()
e=tid()
if (f_start[a,c,e,b]) {
printf("%s <- time:%d, pp:%s, par:%s\n", thread_indent(-1), t - f_start[a,c,e,b], d, $return$$)
}
}
Start a pgbench run that calls the txid_current() function to allocate new transaction IDs.
postgres@digoal-> cat 7.sql
select txid_current();
Run the test; it generates roughly 32K requests per second:
postgres@digoal-> pgbench -M prepared -n -r -P 1 -f ./7.sql -c 1 -j 1 -T 100000
progress: 240.0 s, 31164.4 tps, lat 0.031 ms stddev 0.183
progress: 241.0 s, 33243.3 tps, lat 0.029 ms stddev 0.127
progress: 242.0 s, 32567.3 tps, lat 0.030 ms stddev 0.179
progress: 243.0 s, 33656.6 tps, lat 0.029 ms stddev 0.038
progress: 244.0 s, 33948.1 tps, lat 0.029 ms stddev 0.021
progress: 245.0 s, 32996.8 tps, lat 0.030 ms stddev 0.046
progress: 246.0 s, 34156.7 tps, lat 0.029 ms stddev 0.015
progress: 247.0 s, 33259.5 tps, lat 0.029 ms stddev 0.074
progress: 248.0 s, 32979.6 tps, lat 0.030 ms stddev 0.043
progress: 249.0 s, 32892.6 tps, lat 0.030 ms stddev 0.039
progress: 250.0 s, 33090.7 tps, lat 0.029 ms stddev 0.020
progress: 251.0 s, 33238.3 tps, lat 0.029 ms stddev 0.017
progress: 252.0 s, 32341.3 tps, lat 0.030 ms stddev 0.045
progress: 253.0 s, 31999.0 tps, lat 0.030 ms stddev 0.167
progress: 254.0 s, 33332.6 tps, lat 0.029 ms stddev 0.056
progress: 255.0 s, 30394.6 tps, lat 0.032 ms stddev 0.027
progress: 256.0 s, 31862.7 tps, lat 0.031 ms stddev 0.023
progress: 257.0 s, 31574.0 tps, lat 0.031 ms stddev 0.112
Trace the backend process:
postgres@digoal-> ps -ewf|grep postgres
postgres 2921 1883 29 09:37 pts/1 00:00:05 pgbench -M prepared -n -r -P 1 -f ./7.sql -c 1 -j 1 -T 100000
postgres 2924 1841 66 09:37 ? 00:00:13 postgres: postgres postgres [local] SELECT
Extract the pg_clog-related entries from the trace log:
[root@digoal ~]
0 postgres(2924): -> time:1441503927731, pp:process("/opt/pgsql9.4.4/bin/postgres").function("SlruPhysicalWritePage@/opt/soft_bak/postgresql-9.4.4/src/backend/access/transam/slru.c:699").call, par:ctl={.shared=0x7f74a9fe39c0, .do_fsync='\001', .PagePrecedes=0x4b1960, .Dir="pg_clog"} pageno=12350 slotno=10 fdata=ERROR
31 postgres(2924): -> time:1441503927731, pp:process("/opt/pgsql9.4.4/bin/postgres").function("OpenTransientFile@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c:1710").call, par:fileName="pg_clog/0181" fileFlags=66 fileMode=384
53 postgres(2924): <- time:0, pp:process("/opt/pgsql9.4.4/bin/postgres").function("OpenTransientFile@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c:1710").return, par:14
102 postgres(2924): -> time:1441503927731, pp:process("/opt/pgsql9.4.4/bin/postgres").function("pg_fsync@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c:315").call, par:fd=14
1096 postgres(2924): <- time:1, pp:process("/opt/pgsql9.4.4/bin/postgres").function("pg_fsync@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c:315").return, par:0
1113 postgres(2924): <- time:1, pp:process("/opt/pgsql9.4.4/bin/postgres").function("SlruPhysicalWritePage@/opt/soft_bak/postgresql-9.4.4/src/backend/access/transam/slru.c:699").return, par:'\001'
1105302 postgres(2924): -> time:1441503928836, pp:process("/opt/pgsql9.4.4/bin/postgres").function("SlruPhysicalWritePage@/opt/soft_bak/postgresql-9.4.4/src/backend/access/transam/slru.c:699").call, par:ctl={.shared=0x7f74a9fe39c0, .do_fsync='\001', .PagePrecedes=0x4b1960, .Dir="pg_clog"} pageno=12351 slotno=11 fdata=ERROR
1105329 postgres(2924): -> time:1441503928836, pp:process("/opt/pgsql9.4.4/bin/postgres").function("OpenTransientFile@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c:1710").call, par:fileName="pg_clog/0181" fileFlags=66 fileMode=384
1105348 postgres(2924): <- time:0, pp:process("/opt/pgsql9.4.4/bin/postgres").function("OpenTransientFile@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c:1710").return, par:14
1105405 postgres(2924): -> time:1441503928836, pp:process("/opt/pgsql9.4.4/bin/postgres").function("pg_fsync@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c:315").call, par:fd=14
1106440 postgres(2924): <- time:1, pp:process("/opt/pgsql9.4.4/bin/postgres").function("pg_fsync@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c:315").return, par:0
1106452 postgres(2924): <- time:1, pp:process("/opt/pgsql9.4.4/bin/postgres").function("SlruPhysicalWritePage@/opt/soft_bak/postgresql-9.4.4/src/backend/access/transam/slru.c:699").return, par:'\001'
2087891 postgres(2924): -> time:1441503929819, pp:process("/opt/pgsql9.4.4/bin/postgres").function("SlruPhysicalWritePage@/opt/soft_bak/postgresql-9.4.4/src/backend/access/transam/slru.c:699").call, par:ctl={.shared=0x7f74a9fe39c0, .do_fsync='\001', .PagePrecedes=0x4b1960, .Dir="pg_clog"} pageno=12352 slotno=12 fdata=ERROR
2087917 postgres(2924): -> time:1441503929819, pp:process("/opt/pgsql9.4.4/bin/postgres").function("OpenTransientFile@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c:1710").call, par:fileName="pg_clog/0182" fileFlags=66 fileMode=384
2087958 postgres(2924): <- time:0, pp:process("/opt/pgsql9.4.4/bin/postgres").function("OpenTransientFile@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c:1710").return, par:14
2088013 postgres(2924): -> time:1441503929819, pp:process("/opt/pgsql9.4.4/bin/postgres").function("pg_fsync@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c:315").call, par:fd=14
2089250 postgres(2924): <- time:1, pp:process("/opt/pgsql9.4.4/bin/postgres").function("pg_fsync@/opt/soft_bak/postgresql-9.4.4/src/backend/storage/file/fd.c:315").return, par:0
2089265 postgres(2924): <- time:1, pp:process("/opt/pgsql9.4.4/bin/postgres").function("SlruPhysicalWritePage@/opt/soft_bak/postgresql-9.4.4/src/backend/access/transam/slru.c:699").return, par:'\001'
From the timestamps, a pg_fsync happens roughly once per second:
postgres=# select 1441503928836-1441503927731;
 ?column?
----------
     1105
(1 row)
postgres=# select 1441503929819-1441503928836;
 ?column?
----------
      983
(1 row)
The pgbench output above showed about 32,000 transactions per second, which matches the number of transactions per clog page (the block size here is 8 kB).
Each transaction needs 2 bits, so each byte stores 4 transactions: 8192 * 4 = 32768. At ~32K tps, one clog page fills up about every second, hence roughly one fsync per second.
If instead you want to observe the backend process not flushing clog buffer dirty pages, shrink the checkpoint interval (or run checkpoint manually) and also enlarge the clog buffer pages, for example:
Size
CLOGShmemBuffers(void)
{
return Min(1024, Max(4, NBuffers / 2));
}
With the same stap script, you will no longer see the backend process actively flushing clog dirty pages.
Based on this analysis, if you find backend processes frequently flushing clog pages, a few optimizations are possible:
- Every extension of a pg_clog file changes the file size, so if a backend then calls pg_fdatasync, the filesystem metadata journal must be written as well (taking EXT4 as an example, assuming the data mount option is not writeback). That journal write is serialized across the whole filesystem and easily causes stalls. So when the backend picks a clog page to evict, not choosing the most recent page numbers helps somewhat (better yet, avoid pages in the most recent clog file);
- Another approach is to first call sync_file_range with SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER, which does not write metadata, and only call pg_fsync after the file has been written, reducing the time spent waiting for the data fsync (see the sketch after this list);
- Preallocate pg_clog files. A single pg_clog segment file is currently 32 clog pages, i.e. 32 * BLCKSZ (256 kB by default, per SLRU_PAGES_PER_SEGMENT); preallocating the file instead of extending it each time avoids changing its size;
- Defer the backend process's fsync requests to the checkpoint for handling.
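For the sync_file_range idea above, a minimal Linux-only sketch (the file name and offset are hypothetical; error handling is omitted):
#define _GNU_SOURCE
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>

int
main(void)
{
	/* hypothetical clog segment; PostgreSQL derives the name via SlruFileName() */
	int fd = open("pg_clog/0181", O_RDWR);
	off_t offset = 0;		/* start of the dirty clog page */
	off_t nbytes = 8192;	/* one BLCKSZ-sized page */

	if (fd < 0)
		return 1;

	/* ... write() the dirty page at offset ... */

	/*
	 * Push the data blocks to disk without forcing a filesystem metadata
	 * journal commit; the later fsync() then has less data left to wait on.
	 */
	sync_file_range(fd, offset, nbytes,
					SYNC_FILE_RANGE_WAIT_BEFORE |
					SYNC_FILE_RANGE_WRITE |
					SYNC_FILE_RANGE_WAIT_AFTER);

	/* the durability point (pg_fsync in PostgreSQL) still needs an fsync */
	fsync(fd);
	close(fd);
	return 0;
}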
[References]
https://wiki.postgresql.org/wiki/Hint_Bits
http://blog.163.com/digoal@126/blog/static/1638770402015840480734/
src/backend/access/transam/varsup.c
src/backend/access/transam/clog.c
src/backend/access/transam/slru.c
src/include/access/slru.h
src/backend/access/transam/subtrans.c
src/backend/storage/file/fd.c
pg_clog atomic operations and pg_subtrans (subtransactions)
Without subtransactions, atomic pg_clog updates are easy to guarantee. But once subtransactions exist and are assigned XIDs, and some subtransaction XIDs fall on a different CLOG page than the parent transaction's XID, keeping the transaction consistent requires an atomic CLOG write.
PostgreSQL implements the atomic CLOG write in two phases:
- First, set the subtransactions on CLOG pages other than the main transaction's page to sub-committed;
- Then, in a single atomic update of the main transaction's CLOG page, set the main transaction to committed and the same-page subtransactions to committed;
- Finally, set the subtransactions on the other CLOG pages to committed.
src/backend/access/transam/clog.c
The actual entry points are in transam.c; subtrans.c provides the low-level interfaces. A simplified sketch of the two-phase ordering follows.
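A minimal sketch of that ordering, with a hypothetical set_status() standing in for the real page update (in PostgreSQL this logic lives in TransactionIdSetTreeStatus / TransactionIdSetPageStatus in clog.c, which modify the status bits under the CLOG page lock):
#include <stdio.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef int XidStatus;

#define TRANSACTION_STATUS_COMMITTED 0x01
#define TRANSACTION_STATUS_SUB_COMMITTED 0x03
#define CLOG_XACTS_PER_PAGE 32768
#define TransactionIdToPage(xid) ((xid) / (TransactionId) CLOG_XACTS_PER_PAGE)

/* hypothetical stand-in for the page update done by TransactionIdSetPageStatus() */
static void
set_status(TransactionId xid, XidStatus status)
{
	printf("clog page %u: xid %u -> status %d\n",
		   TransactionIdToPage(xid), xid, status);
}

static void
commit_tree(TransactionId xid, int nsubxids, TransactionId *subxids)
{
	TransactionId mainpage = TransactionIdToPage(xid);
	int i;

	/* Phase 1: subxids on *other* pages are marked sub-committed first */
	for (i = 0; i < nsubxids; i++)
		if (TransactionIdToPage(subxids[i]) != mainpage)
			set_status(subxids[i], TRANSACTION_STATUS_SUB_COMMITTED);

	/*
	 * Phase 2: the main xid and its same-page subxids are marked committed
	 * in one locked page update -- this is the atomic commit point.
	 */
	set_status(xid, TRANSACTION_STATUS_COMMITTED);
	for (i = 0; i < nsubxids; i++)
		if (TransactionIdToPage(subxids[i]) == mainpage)
			set_status(subxids[i], TRANSACTION_STATUS_COMMITTED);

	/* Phase 3: the other-page subxids are flipped to committed */
	for (i = 0; i < nsubxids; i++)
		if (TransactionIdToPage(subxids[i]) != mainpage)
			set_status(subxids[i], TRANSACTION_STATUS_COMMITTED);
}

int
main(void)
{
	/* parent near a page boundary, so two subxids land on the next page */
	TransactionId subs[] = {32766, 32767, 32768, 32769};

	commit_tree(32765, 4, subs);
	return 0;
}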
So what is subtrans?
Subtransactions are created when we use savepoints. Like the parent transaction, a subtransaction may consume an XID. Once a subtransaction is assigned an XID, the atomic CLOG update above is involved, because the CLOG status of the parent and of all its subtransactions must be kept consistent.
When no XID is consumed, subtransactions are distinguished by SubTransactionId instead.
src/backend/access/transam/subtrans.c
(stap trace, truncated) pp:process("/opt/pgsql9.4.4/bin/postgres").function("SubTransSetParent@/opt/soft_bak/postgresql-9.4.4/src/backend/access/transam/subtrans.c:75").return, par:pageno=? entryno=? slotno=607466858 ptr=0
Open a new session and you will find that the subtransactions consumed XIDs too, because newly allocated XIDs now start from 607466859:
postgres@digoal-> psql
psql (9.4.4)
Type "help" for help.
postgres=# select txid_current();
txid_current
--------------
607466859
(1 row)
[References]
src/backend/access/transam/clog.c
src/backend/access/transam/subtrans.c
src/backend/access/transam/transam.c
src/backend/access/transam/README
src/include/c.h
CLOG consistency and asynchronous commit
Asynchronous commit means a transaction returns without waiting for its WAL buffer to be fsynced to disk, and updating the CLOG does not wait for the XLOG to reach disk either.
But pg_clog and pg_xlog are stored separately. Now consider: a committed transaction's pg_clog has reached disk while its XLOG has not, and right then the database crashes. During recovery, the transaction's XLOG is missing, so the data cannot be restored to its final state, yet pg_clog claims the transaction committed. That is a problem.
So for asynchronous transactions, before the CLOG is written out, the XLOG of the corresponding transactions must already have been flushed to disk.
How does PostgreSQL record the relationship between a transaction and the LSN of the XLOG it produced?
It is not a one-to-one mapping: a group of transactions maps to one recorded LSN.
src/backend/access/transam/clog.c
LSN groups: each group of 32 transactions records its maximum LSN.
That is, per 32 transactions, only the largest LSN is kept, saving space.
#define CLOG_XACTS_PER_LSN_GROUP 32 /* keep this a power of 2 */
How many LSN groups each CLOG page is divided into:
#define CLOG_LSNS_PER_PAGE (CLOG_XACTS_PER_PAGE / CLOG_XACTS_PER_LSN_GROUP)
#define GetLSNIndex(slotno, xid) ((slotno) * CLOG_LSNS_PER_PAGE + \
((xid) % (TransactionId) CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_LSN_GROUP)
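With the default 8 kB BLCKSZ, the numbers work out as follows (a sketch using the constants above; not PostgreSQL code):
#include <stdio.h>

#define BLCKSZ 8192
#define CLOG_XACTS_PER_BYTE 4
#define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
#define CLOG_XACTS_PER_LSN_GROUP 32
#define CLOG_LSNS_PER_PAGE (CLOG_XACTS_PER_PAGE / CLOG_XACTS_PER_LSN_GROUP)

int
main(void)
{
	printf("xacts per clog page: %d\n", CLOG_XACTS_PER_PAGE);	/* 32768 */
	printf("LSN groups per page: %d\n", CLOG_LSNS_PER_PAGE);	/* 1024 */
	/* one 8-byte XLogRecPtr per group: 8 kB of group LSNs per clog page */
	return 0;
}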
The LSNs are stored in this data structure:
src/include/access/slru.h
typedef struct SlruSharedData
{
......
XLogRecPtr *group_lsn;
int lsn_groups_per_page;
......
src/backend/access/transam/clog.c
/*
 * lsn must be the WAL location of the commit record when recording an async
 * commit. For a synchronous commit it can be InvalidXLogRecPtr, since the
 * caller guarantees the commit record is already flushed in that case. It
 * should be InvalidXLogRecPtr for abort cases, too.
 */
void
TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
{
......
When a transaction's status is updated, the LSN of its LSN group is also raised to the new maximum LSN (this operates on the CLOG buffer):
static void
TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno)
{
......
if (!XLogRecPtrIsInvalid(lsn))
{
int lsnindex = GetLSNIndex(slotno, xid);
if (ClogCtl->shared->group_lsn[lsnindex] < lsn)
ClogCtl->shared->group_lsn[lsnindex] = lsn;
}
......
Marking transactions committed: for asynchronous transactions there is an extra LSN argument, used to update the group's maximum LSN.
void
TransactionIdCommitTree(TransactionId xid, int nxids, TransactionId *xids)
{
TransactionIdSetTreeStatus(xid, nxids, xids,
TRANSACTION_STATUS_COMMITTED,
InvalidXLogRecPtr);
}
void
TransactionIdAsyncCommitTree(TransactionId xid, int nxids, TransactionId *xids,
XLogRecPtr lsn)
{
TransactionIdSetTreeStatus(xid, nxids, xids,
TRANSACTION_STATUS_COMMITTED, lsn);
}
void
TransactionIdAbortTree(TransactionId xid, int nxids, TransactionId *xids)
{
TransactionIdSetTreeStatus(xid, nxids, xids,
TRANSACTION_STATUS_ABORTED, InvalidXLogRecPtr);
}
Getting the LSN corresponding to an XID: note that if the XID is a frozen XID, an invalid XLogRecPtr is returned.
src/backend/access/transam/transam.c
XLogRecPtr
TransactionIdGetCommitLSN(TransactionId xid)
{
XLogRecPtr result;
if (TransactionIdEquals(xid, cachedFetchXid))
return cachedCommitLSN;
if (!TransactionIdIsNormal(xid))
return InvalidXLogRecPtr;
(void) TransactionIdGetStatus(xid, &result);
return result;
}
XidStatus
TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
{
int pageno = TransactionIdToPage(xid);
int byteno = TransactionIdToByte(xid);
int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
int slotno;
int lsnindex;
char *byteptr;
XidStatus status;
slotno = SimpleLruReadPage_ReadOnly(ClogCtl, pageno, xid);
byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
lsnindex = GetLSNIndex(slotno, xid);
*lsn = ClogCtl->shared->group_lsn[lsnindex];
LWLockRelease(CLogControlLock);
return status;
}
Everything above operates on the CLOG buffer. Writing the buffer to disk is where the real consistency issue arises: before a CLOG page is written, the XLOG produced by its transactions must already be flushed to disk. This is where the max LSN recorded for each LSN group comes in.
The code:
src/backend/access/transam/slru.c
static bool
SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
{
SlruShared shared = ctl->shared;
int segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
int offset = rpageno * BLCKSZ;
char path[MAXPGPATH];
int fd = -1;
if (shared->group_lsn != NULL)
{
XLogRecPtr max_lsn;
int lsnindex,
lsnoff;
lsnindex = slotno * shared->lsn_groups_per_page;
max_lsn = shared->group_lsn[lsnindex++];
for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)
{
XLogRecPtr this_lsn = shared->group_lsn[lsnindex++];
if (max_lsn < this_lsn)
max_lsn = this_lsn;
}
if (!XLogRecPtrIsInvalid(max_lsn))
{
START_CRIT_SECTION();
XLogFlush(max_lsn);
END_CRIT_SECTION();
}
}
......
Summary
For asynchronous transactions, how is the write-WAL-before-data rule guaranteed?
pg_clog groups transactions 32 at a time and stores each group's maximum LSN in the SlruSharedData structure.
Before a clog buffer page is written to disk, the XLOG must be flushed to disk up to the LSNs of the transactions on that clog page.
[References]
src/backend/access/transam/clog.c
src/include/access/slru.h
src/backend/access/transam/transam.c
src/backend/access/transam/slru.c