背景:
最近碰到一个问题,PG数据库CPU比较高,但是当时业务是低峰期。
监控
实例监控
实例16核,8核用在walsender上面,8个slot进行cdc,所以CPU使用率800%,总CPU 50%:
(8 rows)
等待事件:ReorderBufferWrite 等在写数据上面
postgres=# select * from pg_stat_activity where pid = 14064 limit 1;
application_name | PostgreSQL JDBC Driver
wait_event_type | IO
wait_event | ReorderBufferWrite
state | active
backend_type | walsender
并且walsender IOPS高到30w。
磁盘使用率增加,而后又恢复,整体增加了20G。
主机监控
CPU都花在内核态。
云盘吞吐量,近100MB/s
排查:
排查CPU高的原因,最好能抓到现场的堆栈,因为该问题是第二次发生,所以直接现场抓到了当时walsender进程的堆栈:抓了几个进程,并且抓了几次,当时堆栈都是一样。
#pstack 14064#0 0x00007f7b3e97f6e0 in __write_nocancel () from /lib64/libpthread.so.0#1 0x00000000007b4a28 in ReorderBufferSerializeChange (txn=0x2989da8, txn=0x2989da8, change=0x9601aa8, fd=16, rb=0x2157dc8) at reorderbuffer.c:2653#2 ReorderBufferSerializeTXN (rb=rb@entry=0x2157dc8, txn=txn@entry=0x2989da8) at reorderbuffer.c:2468#3 0x00000000007b5a1e in ReorderBufferCheckMemoryLimit (rb=0x2157dc8) at reorderbuffer.c:2392#4 ReorderBufferQueueChange (rb=0x2157dc8, xid=<optimized out>, lsn=7790948496384, change=change@entry=0x96186f8) at reorderbuffer.c:649#5 0x00000000007a9c19 in DecodeTruncate (buf=<optimized out>, buf=<optimized out>, ctx=<optimized out>) at decode.c:879#6 DecodeHeapOp (buf=0x7ffdd4bd8ba0, ctx=0x206d5f8) at decode.c:459#7 LogicalDecodingProcessRecord (ctx=0x206d5f8, record=<optimized out>) at decode.c:129#8 0x00000000007d109f in XLogSendLogical () at walsender.c:2961#9 0x00000000007d4475 in WalSndLoop (send_data=0x7d1050 <XLogSendLogical>) at walsender.c:2341#10 StartLogicalReplication (cmd=<optimized out>) at walsender.c:1224#11 exec_replication_command (cmd_string=cmd_string@entry=0x203fae8 "START_REPLICATION SLOT kk_cdc_kkcx_slot LOGICAL 6DE/BEAA8058 (\"proto_version\" '1', \"publication_names\" 'cdc_kkcx')") at walsender.c:1656#12 0x000000000082957d in PostgresMain (argc=<optimized out>, argv=argv@entry=0x2086f38, dbname=0x2086da8 "melotall", username=<optimized out>) at postgres.c:4447#13 0x0000000000489dbd in BackendRun (port=<optimized out>, port=<optimized out>) at postmaster.c:4564#14 BackendStartup (port=0x2078d70) at postmaster.c:4248#15 ServerLoop () at postmaster.c:1741#16 0x00000000007945c9 in PostmasterMain (argc=argc@entry=3, argv=argv@entry=0x203a650) at postmaster.c:1414#17 0x000000000048bd1d in main (argc=3, argv=0x203a650) at main.c:210
我们可以确定,CPU当时就花在了执行ReorderBufferSerializeChange 这个函数上面,通过源码看下当时主要通过write系统调用,写数据到磁盘上,所以主机监控上看到的CPU使用率主要在system_CPU上。
首先ReorderBufferCheckMemoryLimit,会检查recorderbuffer的内存是不是超过了logical_decoding_work_mem 这个参数的内存限制,如果超过了就会把记录写到磁盘上。
ReorderBufferSerializeChange该函数比较简单,并且只有ReorderBufferSerializeTXN这个函数才会调用:
staticvoidReorderBufferSerializeChange(ReorderBuffer*rb, ReorderBufferTXN*txn, intfd, ReorderBufferChange*change) { ReorderBufferDiskChange*ondisk; Sizesz=sizeof(ReorderBufferDiskChange); ReorderBufferSerializeReserve(rb, sz); ondisk= (ReorderBufferDiskChange*) rb->outbuf; memcpy(&ondisk->change, change, sizeof(ReorderBufferChange)); switch (change->action) { /* fall through these, they're all similar enough */caseREORDER_BUFFER_CHANGE_INSERT: caseREORDER_BUFFER_CHANGE_UPDATE: caseREORDER_BUFFER_CHANGE_DELETE: caseREORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: { char*data; ReorderBufferTupleBuf*oldtup, *newtup; Sizeoldlen=0; Sizenewlen=0; oldtup=change->data.tp.oldtuple; newtup=change->data.tp.newtuple; if (oldtup) { sz+=sizeof(HeapTupleData); oldlen=oldtup->tuple.t_len; sz+=oldlen; } if (newtup) { sz+=sizeof(HeapTupleData); newlen=newtup->tuple.t_len; sz+=newlen; } /* make sure we have enough space */ReorderBufferSerializeReserve(rb, sz); data= ((char*) rb->outbuf) +sizeof(ReorderBufferDiskChange); /* might have been reallocated above */ondisk= (ReorderBufferDiskChange*) rb->outbuf; if (oldlen) { memcpy(data, &oldtup->tuple, sizeof(HeapTupleData)); data+=sizeof(HeapTupleData); memcpy(data, oldtup->tuple.t_data, oldlen); data+=oldlen; } if (newlen) { memcpy(data, &newtup->tuple, sizeof(HeapTupleData)); data+=sizeof(HeapTupleData); memcpy(data, newtup->tuple.t_data, newlen); data+=newlen; } break; } caseREORDER_BUFFER_CHANGE_MESSAGE: { char*data; Sizeprefix_size=strlen(change->data.msg.prefix) +1; sz+=prefix_size+change->data.msg.message_size+sizeof(Size) +sizeof(Size); ReorderBufferSerializeReserve(rb, sz); data= ((char*) rb->outbuf) +sizeof(ReorderBufferDiskChange); /* might have been reallocated above */ondisk= (ReorderBufferDiskChange*) rb->outbuf; /* write the prefix including the size */memcpy(data, &prefix_size, sizeof(Size)); data+=sizeof(Size); memcpy(data, change->data.msg.prefix, prefix_size); data+=prefix_size; /* write the message including the size */memcpy(data, &change->data.msg.message_size, sizeof(Size)); data+=sizeof(Size); memcpy(data, change->data.msg.message, change->data.msg.message_size); data+=change->data.msg.message_size; break; } caseREORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshotsnap; char*data; snap=change->data.snapshot; sz+=sizeof(SnapshotData) +sizeof(TransactionId) *snap->xcnt+sizeof(TransactionId) *snap->subxcnt; /* make sure we have enough space */ReorderBufferSerializeReserve(rb, sz); data= ((char*) rb->outbuf) +sizeof(ReorderBufferDiskChange); /* might have been reallocated above */ondisk= (ReorderBufferDiskChange*) rb->outbuf; memcpy(data, snap, sizeof(SnapshotData)); data+=sizeof(SnapshotData); if (snap->xcnt) { memcpy(data, snap->xip, sizeof(TransactionId) *snap->xcnt); data+=sizeof(TransactionId) *snap->xcnt; } if (snap->subxcnt) { memcpy(data, snap->subxip, sizeof(TransactionId) *snap->subxcnt); data+=sizeof(TransactionId) *snap->subxcnt; } break; } caseREORDER_BUFFER_CHANGE_TRUNCATE: { Sizesize; char*data; /* account for the OIDs of truncated relations */size=sizeof(Oid) *change->data.truncate.nrelids; sz+=size; /* make sure we have enough space */ReorderBufferSerializeReserve(rb, sz); data= ((char*) rb->outbuf) +sizeof(ReorderBufferDiskChange); /* might have been reallocated above */ondisk= (ReorderBufferDiskChange*) rb->outbuf; memcpy(data, change->data.truncate.relids, size); data+=size; break; } caseREORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: caseREORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: caseREORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: caseREORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */break; } ondisk->size=sz; errno=0; pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE); if (write(fd, rb->outbuf, ondisk->size) !=ondisk->size) { intsave_errno=errno; CloseTransientFile(fd); /* if write didn't set errno, assume problem is no disk space */errno=save_errno?save_errno : ENOSPC; ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to data file for XID %u: %m", txn->xid))); } pgstat_report_wait_end(); /** Keep the transaction's final_lsn up to date with each change we send to* disk, so that ReorderBufferRestoreCleanup works correctly. (We used to* only do this on commit and abort records, but that doesn't work if a* system crash leaves a transaction without its abort record).** Make sure not to move it backwards.*/if (txn->final_lsn<change->lsn) txn->final_lsn=change->lsn; Assert(ondisk->change.action==change->action);
通过跟客户沟通,知道客户当时执行的语句(create table xxx as select xxx from xxx),相当于创建了表又插入数据,并且算是一个事务,导致了超过了logical_decoding_work_mem的限制,从而发生写入磁盘的情况:
postgres=# show logical_decoding_work_mem;
logical_decoding_work_mem
---------------------------
64MB
优化建议
1) 建议分批写入数据。
2)如果CPU、IO达到瓶颈,可以升级相应规格。