PostgreSQL和大多数传统RDBMS一样,都设计了大量的锁来保证并发操作的数据一致性。
同时PG在设计锁等待时,以队列方式存储等待锁。
参考
ProcSleep()@src/backend/storage/lmgr/proc.c
http:
因此,会出现一种问题。
例如一个长事务A持有一个表的某条记录的更新锁。
接下来的一个事务B要TRUNCATE这个表,会把这个锁放到等待队列去。
再接下来的事务C......如果请求的锁和TRUNCATE表的锁发生冲突,也会被放到等待队列去。
这其实是有利有弊的,利是什么呢?
B在A释放后,可以立即获得锁进行TRUNCATE。
弊是什么?
排在B后面的事务会被堵塞,虽然它们可能和A没有锁冲突,也需要等待。
弊端往往在某些凑巧的情况下起到放大效果。
例如一个表的操作非常频繁,如果刚好有一个长事务在里面,然后又刚好有事务需要获得一个排他锁,那么接下来的频繁DML请求都会被堵塞。
一般可以用锁超时来降低弊端带来的这种影响,配置参数lock_timeout,如果锁等待超过这个时间,会强行中断,从等待队列去除。
另外,如果我们没有设置lock_timeout或者不方便设置lock_timeout的话,一旦发现数据库出现了大量的等待,应该如何找到罪魁祸首呢?即处于等待状态的查询,它们到底在等待谁?
找到之后可以人为地杀掉这些罪魁祸首。
使用以下SQL
-- For every lock request that is still waiting, list the granted locks on the
-- same lockable object together with the sessions on both sides.
with t_wait as  -- ungranted lock requests joined to the requesting session
(select a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,a.pid,a.virtualtransaction,a.virtualxid,a.transactionid,b.query,b.xact_start,b.query_start,b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and not a.granted),
t_run as  -- granted locks joined to the holding session
(select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,a.pid,a.virtualtransaction,a.virtualxid,a.transactionid,b.query,b.xact_start,b.query_start,b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and a.granted)
select r.locktype,r.mode,r.usename r_user,r.datname r_db,r.relation::regclass,r.pid r_pid,r.xact_start r_xact_start,r.query_start r_query_start,r.query r_query,
w.usename w_user,w.datname w_db,w.pid w_pid,w.xact_start w_xact_start,w.query_start w_query_start,w.query w_query
from t_wait w,t_run r where
-- "is not distinct from" lets NULL match NULL: lock-tag columns that do not
-- apply to a given locktype are NULL on both sides.
r.locktype is not distinct from w.locktype and
r.database is not distinct from w.database and
r.relation is not distinct from w.relation and
r.page is not distinct from w.page and
r.tuple is not distinct from w.tuple and
r.classid is not distinct from w.classid and
r.objid is not distinct from w.objid and
r.objsubid is not distinct from w.objsubid
order by r.xact_start;  -- oldest holding transaction first
例如:
-[ RECORD 1 ]-+---------------------------------------------------------------------
locktype | relation
mode | ShareUpdateExclusiveLock
r_user | postgres
r_db | postgres
relation | tbl
r_pid | 24579
r_xact_start | 2015-05-10 09:43:53.956252+08
r_query_start | 2015-05-10 09:43:53.956252+08
r_query | autovacuum: VACUUM ANALYZE public.tbl (to prevent wraparound)
w_user | postgres
w_db | postgres
w_pid | 24737
w_xact_start | 2015-05-10 09:47:15.294562+08
w_query_start | 2015-05-10 09:47:15.294562+08
w_query | insert into tbl(crt_time) select now() from generate_series(1,1000);
.....
(1001 rows)
干掉它:
postgres=# select pg_terminate_backend(24579);
-[ RECORD 1 ]--------+--
pg_terminate_backend | t
再次查询,等待消失。
postgres=# with t_wait as  -- ungranted lock requests joined to the requesting session
(select a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,a.pid,a.virtualtransaction,a.virtualxid,a.transactionid,b.query,b.xact_start,b.query_start,b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and not a.granted),
t_run as  -- granted locks joined to the holding session
(select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,a.pid,a.virtualtransaction,a.virtualxid,a.transactionid,b.query,b.xact_start,b.query_start,b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and a.granted)
select r.locktype,r.mode,r.usename r_user,r.datname r_db,r.relation::regclass,r.pid r_pid,r.xact_start r_xact_start,r.query_start r_query_start,r.query r_query,
w.usename w_user,w.datname w_db,w.pid w_pid,w.xact_start w_xact_start,w.query_start w_query_start,w.query w_query
from t_wait w,t_run r where
-- NULL-safe comparison of the lock-tag columns (unused ones are NULL on both sides)
r.locktype is not distinct from w.locktype and
r.database is not distinct from w.database and
r.relation is not distinct from w.relation and
r.page is not distinct from w.page and
r.tuple is not distinct from w.tuple and
r.classid is not distinct from w.classid and
r.objid is not distinct from w.objid and
r.objsubid is not distinct from w.objsubid
order by r.xact_start;  -- oldest holding transaction first
(No rows)
实际上,这个查询还不够完美:锁等待其实是由一个TRUNCATE TABLE操作造成的。因为AUTO VACUUM FREEZE和TRUNCATE操作冲突,所以这两个会话的锁都会对后面的DML造成影响。我们真正要找的应该是级别最高的锁(TRUNCATE),干掉它才是最重要的,因为普通的vacuum并不会和DML冲突。
所以我们需要改进一下这个查询语句。
从pg_locks视图的函数的源码,可以知道mode是怎么来的。
src/backend/utils/adt/lockfuncs.c
/*
 * Excerpt of pg_lock_status(); the "......" lines stand for code elided by
 * the article. The one statement shown fills values[12] with the textual
 * name of a lock mode.
 */
Datum
pg_lock_status(PG_FUNCTION_ARGS)
{
......
Datum values[NUM_LOCK_STATUS_COLUMNS];
......
/*
 * values[12]: lock mode name, looked up via GetLockmodeName() from the lock
 * tag's lock method id and the mode number, then converted to a text datum.
 * NOTE(review): presumably this is the "mode" column of pg_locks — confirm.
 */
values[12] = CStringGetTextDatum(GetLockmodeName(instance->locktag.locktag_lockmethodid, mode));
......
}
src/backend/storage/lmgr/lock.c
LockMethod
GetLocksMethodTable(const LOCK *lock)
{
LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
return LockMethods[lockmethodid];
}
/*
 * Printable lock mode names. The array index is the position of the name in
 * this table (0 = "INVALID" ... 8 = "AccessExclusiveLock").
 */
static const char *const lock_mode_names[] =
{
	[0] = "INVALID",
	[1] = "AccessShareLock",
	[2] = "RowShareLock",
	[3] = "RowExclusiveLock",
	[4] = "ShareUpdateExclusiveLock",
	[5] = "ShareLock",
	[6] = "ShareRowExclusiveLock",
	[7] = "ExclusiveLock",
	[8] = "AccessExclusiveLock",
};
static const LOCKMASK LockConflicts[] = {
0,
(1 << AccessExclusiveLock),
(1 << ExclusiveLock) | (1 << AccessExclusiveLock),
(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
(1 << ExclusiveLock) | (1 << AccessExclusiveLock),
(1 << ShareUpdateExclusiveLock) |
(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
(1 << ExclusiveLock) | (1 << AccessExclusiveLock),
(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
(1 << ShareRowExclusiveLock) |
(1 << ExclusiveLock) | (1 << AccessExclusiveLock),
(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
(1 << ExclusiveLock) | (1 << AccessExclusiveLock),
(1 << RowShareLock) |
(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
(1 << ExclusiveLock) | (1 << AccessExclusiveLock),
(1 << AccessShareLock) | (1 << RowShareLock) |
(1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
(1 << ShareLock) | (1 << ShareRowExclusiveLock) |
(1 << ExclusiveLock) | (1 << AccessExclusiveLock)
};
/*
 * Lock method descriptor named "default". The positional initializer supplies,
 * in order: AccessExclusiveLock, the LockConflicts table, the lock_mode_names
 * table, and a pointer to a trace flag.
 * NOTE(review): the first field is presumably the number of (or highest) lock
 * mode — confirm against the LockMethodData declaration.
 */
static const LockMethodData default_lockmethod = {
AccessExclusiveLock,
LockConflicts,
lock_mode_names,
/* trace flag: Trace_locks when built with LOCK_DEBUG, otherwise Dummy_trace */
#ifdef LOCK_DEBUG
&Trace_locks
#else
&Dummy_trace
#endif
};
/*
 * Lock method descriptor named "user". It uses the same AccessExclusiveLock
 * value, conflict table and mode-name table as default_lockmethod; only the
 * LOCK_DEBUG trace flag differs (Trace_userlocks).
 */
static const LockMethodData user_lockmethod = {
AccessExclusiveLock,
LockConflicts,
lock_mode_names,
/* trace flag: Trace_userlocks when built with LOCK_DEBUG, otherwise Dummy_trace */
#ifdef LOCK_DEBUG
&Trace_userlocks
#else
&Dummy_trace
#endif
};
src/include/storage/lock.h
/*
 * Lock mode numbers. The trailing comments list typical statements that take
 * each mode.
 */
#define NoLock 0
#define AccessShareLock 1 /* SELECT */
#define RowShareLock 2 /* SELECT FOR UPDATE/FOR SHARE */
#define RowExclusiveLock 3 /* INSERT, UPDATE, DELETE */
#define ShareUpdateExclusiveLock 4 /* VACUUM (non-FULL), ANALYZE, CREATE
* INDEX CONCURRENTLY */
#define ShareLock 5 /* CREATE INDEX (WITHOUT CONCURRENTLY) */
#define ShareRowExclusiveLock 6 /* like EXCLUSIVE MODE, but allows ROW
* SHARE */
#define ExclusiveLock 7 /* blocks ROW SHARE/SELECT...FOR
* UPDATE */
#define AccessExclusiveLock 8 /* ALTER TABLE, DROP TABLE, VACUUM
* FULL, and unqualified LOCK TABLE */
改进后的查询如下:
用一个函数来将锁转换为数字,
postgres=# create or replace function f_lock_level(i_mode text) returns int as
$$
-- Map a lock mode name (as shown in pg_locks.mode) to its numeric level,
-- 1 (AccessShareLock) .. 8 (AccessExclusiveLock).
-- 'INVALID' and any unrecognised name map to 0; NULL input returns NULL
-- because the function is STRICT.
-- The result depends only on the argument, so the function is IMMUTABLE.
begin
  return case i_mode
    when 'INVALID' then 0
    when 'AccessShareLock' then 1
    when 'RowShareLock' then 2
    when 'RowExclusiveLock' then 3
    when 'ShareUpdateExclusiveLock' then 4
    when 'ShareLock' then 5
    when 'ShareRowExclusiveLock' then 6
    when 'ExclusiveLock' then 7
    when 'AccessExclusiveLock' then 8
    else 0
  end;
end;
$$
language plpgsql immutable strict;
修改查询语句,按锁级别排序:
-- Pair every waiting lock request with the granted locks of OTHER sessions on
-- the same lockable object, strongest lock modes first.
with t_wait as  -- ungranted lock requests joined to the requesting session
(select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,
a.pid,a.virtualtransaction,a.virtualxid,a.transactionid,b.query,b.xact_start,b.query_start,
b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and not a.granted),
t_run as  -- granted locks joined to the holding session
(select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,
a.pid,a.virtualtransaction,a.virtualxid,a.transactionid,b.query,b.xact_start,b.query_start,
b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and a.granted)
select r.locktype,r.mode r_mode,r.usename r_user,r.datname r_db,r.relation::regclass,r.pid r_pid,
r.page r_page,r.tuple r_tuple,r.xact_start r_xact_start,r.query_start r_query_start,
now()-r.query_start r_locktime,r.query r_query,w.mode w_mode,w.pid w_pid,w.page w_page,
w.tuple w_tuple,w.xact_start w_xact_start,w.query_start w_query_start,
now()-w.query_start w_locktime,w.query w_query
from t_wait w,t_run r where
-- NULL-safe comparison of every lock-tag column; columns that do not apply to
-- a given locktype are NULL on both sides and therefore match.
r.locktype is not distinct from w.locktype and
r.database is not distinct from w.database and
r.relation is not distinct from w.relation and
r.page is not distinct from w.page and
r.tuple is not distinct from w.tuple and
r.classid is not distinct from w.classid and
r.objid is not distinct from w.objid and
r.objsubid is not distinct from w.objsubid and
r.transactionid is not distinct from w.transactionid and
-- virtualxid identifies the target of locktype 'virtualxid'; without it a
-- virtualxid waiter would be paired with every granted virtualxid lock
r.virtualxid is not distinct from w.virtualxid and
r.pid <> w.pid  -- a session does not block itself
order by f_lock_level(w.mode)+f_lock_level(r.mode) desc,r.xact_start;
现在排在最前面的就是锁级别最高的等待,优先干掉这个。
-[ RECORD 1 ]-+---------------------------------------------------------------------
locktype | relation -- 冲突类型
r_mode | ShareUpdateExclusiveLock -- 持锁模式
r_user | postgres -- 持锁用户
r_db | postgres -- 持锁数据库
relation | tbl -- 持锁对象
r_pid | 25656 -- 持锁进程
r_xact_start | 2015-05-10 14:11:16.08318+08 -- 持锁事务开始时间
r_query_start | 2015-05-10 14:11:16.08318+08 -- 持锁SQL开始时间
r_locktime | 00:01:49.460779 -- 持锁时长
r_query | vacuum freeze tbl; -- 持锁SQL,注意不一定是这个SQL带来的锁,也有可能是这个事务在之前执行的SQL加的锁
w_mode | AccessExclusiveLock -- 等待锁模式
w_pid | 26731 -- 等待锁进程
w_xact_start | 2015-05-10 14:11:17.987362+08 -- 等待锁事务开始时间
w_query_start | 2015-05-10 14:11:17.987362+08 -- 等待锁SQL开始时间
w_locktime | 00:01:47.556597 -- 等待锁时长
w_query | truncate tbl; -- 等待锁SQL
-[ RECORD 2 ]-+---------------------------------------------------------------------
locktype | relation
r_mode | ShareUpdateExclusiveLock
r_user | postgres
r_db | postgres
relation | tbl
r_pid | 25656
r_xact_start | 2015-05-10 14:11:16.08318+08
r_query_start | 2015-05-10 14:11:16.08318+08
r_locktime | 00:01:49.460779
r_query | vacuum freeze tbl;
w_mode | RowExclusiveLock
w_pid | 25582
w_xact_start | 2015-05-10 14:11:22.845+08
w_query_start | 2015-05-10 14:11:22.845+08
w_locktime | 00:01:42.698959
w_query | insert into tbl(crt_time) select now() from generate_series(1,1000); -- 这个SQL其实等待的是truncate tbl的锁;
......
锁冲突判断函数:
LockCheckConflicts()@src/backend/storage/lmgr/lock.c
我们可以创建一个函数:当同一个对象上某类锁的等待时间和等待进程数超出阈值时,杀掉相关的进程。
例如扩展数据块的锁超出一定数量,我们想办法杀掉,避免大量等待。
CREATE OR REPLACE FUNCTION public.f_kill_extend(i_interval interval, i_waiting bigint)
 RETURNS void
 LANGUAGE plpgsql
 STRICT
AS $function$
-- Relieve relation-extension lock pile-ups.
--
--   i_interval  only waiters whose transaction has been open longer than this
--               are counted
--   i_waiting   a relation is acted on when MORE than this many such waiters
--               are queued on its 'extend' lock
--
-- Step 1 prints the current lock-wait picture as NOTICEs (diagnostics only).
-- Step 2 terminates the backends that have a lock entry on each relation that
-- crossed the threshold. Does nothing on a standby.
--
-- NOTE(review): step 2 targets every backend with ANY pg_locks entry on the
-- relation (granted or not, any locktype), not only extend-lock participants;
-- this matches the previous behaviour — confirm that it is intended.
declare
  v_database oid;
  v_relation oid;
  v_record record;
begin
  if (pg_is_in_recovery()) then
    return;
  end if;

  -- Step 1: report who waits for whom, strongest lock modes first.
  for v_record in with t_wait as
  (select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,
  a.pid,a.virtualtransaction,a.virtualxid,a.transactionid,b.query,b.xact_start,b.query_start,
  b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and not a.granted),
  t_run as
  (select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,
  a.pid,a.virtualtransaction,a.virtualxid,a.transactionid,b.query,b.xact_start,b.query_start,
  b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and a.granted)
  select r.locktype,r.mode r_mode,r.usename r_user,r.datname r_db,r.relation::regclass,r.pid r_pid,
  r.page r_page,r.tuple r_tuple,r.xact_start r_xact_start,r.query_start r_query_start,
  now()-r.query_start r_locktime,r.query r_query,w.mode w_mode,w.pid w_pid,w.page w_page,
  w.tuple w_tuple,w.xact_start w_xact_start,w.query_start w_query_start,
  now()-w.query_start w_locktime,w.query w_query
  from t_wait w,t_run r where
  r.locktype is not distinct from w.locktype and
  r.database is not distinct from w.database and
  r.relation is not distinct from w.relation and
  r.page is not distinct from w.page and
  r.tuple is not distinct from w.tuple and
  r.classid is not distinct from w.classid and
  r.objid is not distinct from w.objid and
  r.objsubid is not distinct from w.objsubid and
  r.transactionid is not distinct from w.transactionid and
  -- keep virtualxid waiters from matching every granted virtualxid lock
  r.virtualxid is not distinct from w.virtualxid and
  r.pid <> w.pid
  order by f_lock_level(w.mode)+f_lock_level(r.mode) desc,r.xact_start
  LOOP
    raise notice '%', v_record;
  END LOOP;

  -- Step 2: find relations with too many long-running extend-lock waiters.
  for v_database,v_relation in select database,relation from pg_locks where
    locktype='extend' and mode='ExclusiveLock' and not granted and
    pid in (select pid from pg_stat_activity where now()-xact_start > i_interval)
    group by 1,2 having count(*) > i_waiting
  loop
    -- One signal per backend (a backend can have several lock rows on the
    -- relation), and never terminate the session running this function.
    perform pg_terminate_backend(t.pid)
      from (select distinct pid from pg_locks
            where database=v_database and relation=v_relation
              and pid <> pg_backend_pid()) t;
  end loop;
  return;
end;
$function$;
例如:
psql -c "select f_kill_extend(interval '1 sec', 10);"
[参考]
1. http:
2. src/backend/storage/lmgr/proc.c
/*
 * ProcSleep -- queue the current backend (MyProc) on a lock's wait queue and
 * sleep until its wait status changes.
 *
 * locallock		supplies the lock, the requested mode, the proclock and
 *					the hash code used to find the partition lock
 * lockMethodTable	supplies the conflict table (conflictTab) for the lock
 *
 * Returns STATUS_OK if the lock could be granted immediately or was granted
 * while waiting, STATUS_ERROR if an early deadlock was detected, and otherwise
 * the final value of MyProc->waitStatus.
 *
 * The partition lock is released before sleeping and re-acquired before the
 * final return.
 * NOTE(review): callers presumably hold the partition lock on entry — confirm.
 */
int
ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
{
	LOCKMODE	lockmode = locallock->tag.mode;
	LOCK	   *lock = locallock->lock;
	PROCLOCK   *proclock = locallock->proclock;
	uint32		hashcode = locallock->hashcode;
	LWLockId	partitionLock = LockHashPartitionLock(hashcode);
	PROC_QUEUE *waitQueue = &(lock->waitProcs);
	LOCKMASK	myHeldLocks = MyProc->heldLocks;
	bool		early_deadlock = false;
	bool		allow_autovacuum_cancel = true;
	int			myWaitStatus;
	PGPROC	   *proc;
	int			i;

	/*
	 * Choose the queue position. If we already hold locks on this object,
	 * scan the waiters: we go in front of the first waiter whose request
	 * conflicts with a lock we hold. Otherwise we go before the queue header
	 * itself.
	 */
	if (myHeldLocks != 0)
	{
		LOCKMASK	aheadRequests = 0;	/* modes requested by waiters scanned so far */

		proc = (PGPROC *) waitQueue->links.next;
		for (i = 0; i < waitQueue->size; i++)
		{
			/* Does this waiter's request conflict with a lock we hold? */
			if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks)
			{
				/* ...and our request conflicts with what it holds: deadlock */
				if (lockMethodTable->conflictTab[lockmode] & proc->heldLocks)
				{
					RememberSimpleDeadLock(MyProc, lockmode, lock, proc);
					early_deadlock = true;
					break;
				}

				/*
				 * No conflict with any request ahead of us and none with the
				 * locks already granted: take the lock without sleeping.
				 */
				if ((lockMethodTable->conflictTab[lockmode] & aheadRequests) == 0 &&
					LockCheckConflicts(lockMethodTable,
									   lockmode,
									   lock,
									   proclock,
									   MyProc) == STATUS_OK)
				{
					GrantLock(lock, proclock, lockmode);
					GrantAwaitedLock();
					return STATUS_OK;
				}

				/* insert ourselves in front of this waiter */
				break;
			}
			aheadRequests |= LOCKBIT_ON(proc->waitLockMode);
			proc = (PGPROC *) proc->links.next;
		}
	}
	else
	{
		proc = (PGPROC *) &(waitQueue->links);
	}

	/* Link MyProc into the queue before "proc" and record what we wait for. */
	SHMQueueInsertBefore(&(proc->links), &(MyProc->links));
	waitQueue->size++;
	lock->waitMask |= LOCKBIT_ON(lockmode);
	MyProc->waitLock = lock;
	MyProc->waitProcLock = proclock;
	MyProc->waitLockMode = lockmode;
	MyProc->waitStatus = STATUS_WAITING;

	/* An early deadlock was found above: undo the enqueue and fail. */
	if (early_deadlock)
	{
		RemoveFromWaitQueue(MyProc, hashcode);
		return STATUS_ERROR;
	}

	lockAwaited = locallock;
	LWLockRelease(partitionLock);

	if (RecoveryInProgress() && !InRecovery)
		CheckRecoveryConflictDeadlock();

	deadlock_state = DS_NOT_YET_CHECKED;

	/*
	 * Arm the deadlock timeout, plus the lock timeout when LockTimeout is
	 * set (> 0).
	 */
	if (LockTimeout > 0)
	{
		EnableTimeoutParams timeouts[2];

		timeouts[0].id = DEADLOCK_TIMEOUT;
		timeouts[0].type = TMPARAM_AFTER;
		timeouts[0].delay_ms = DeadlockTimeout;
		timeouts[1].id = LOCK_TIMEOUT;
		timeouts[1].type = TMPARAM_AFTER;
		timeouts[1].delay_ms = LockTimeout;
		enable_timeouts(timeouts, 2);
	}
	else
		enable_timeout_after(DEADLOCK_TIMEOUT, DeadlockTimeout);

	/* Sleep on our semaphore until the wait status is no longer WAITING. */
	do
	{
		PGSemaphoreLock(&MyProc->sem, true);
		myWaitStatus = MyProc->waitStatus;

		/*
		 * If we are blocked by an autovacuum worker, send it a cancel
		 * (SIGINT) — at most once per ProcSleep call, and not when the
		 * worker has PROC_VACUUM_FOR_WRAPAROUND set.
		 */
		if (deadlock_state == DS_BLOCKED_BY_AUTOVACUUM && allow_autovacuum_cancel)
		{
			PGPROC	   *autovac = GetBlockingAutoVacuumPgproc();

			/*
			 * Only index allPgXact[] once autovac is known to be non-NULL;
			 * the condition below short-circuits on autovac != NULL before
			 * autovac_pgxact is used.
			 */
			PGXACT	   *autovac_pgxact = (autovac != NULL) ?
				&ProcGlobal->allPgXact[autovac->pgprocno] : NULL;

			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
			if ((autovac != NULL) &&
				(autovac_pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) &&
				!(autovac_pgxact->vacuumFlags & PROC_VACUUM_FOR_WRAPAROUND))
			{
				int			pid = autovac->pid;
				StringInfoData locktagbuf;
				StringInfoData logbuf;

				initStringInfo(&locktagbuf);
				initStringInfo(&logbuf);
				DescribeLockTag(&locktagbuf, &lock->tag);
				appendStringInfo(&logbuf,
								 _("Process %d waits for %s on %s."),
								 MyProcPid,
								 GetLockmodeName(lock->tag.locktag_lockmethodid,
												 lockmode),
								 locktagbuf.data);

				/* the pid was copied above, so the lock can be dropped before logging */
				LWLockRelease(ProcArrayLock);

				ereport(LOG,
						(errmsg("sending cancel to blocking autovacuum PID %d",
								pid),
						 errdetail_log("%s", logbuf.data)));
				pfree(logbuf.data);
				pfree(locktagbuf.data);

				if (kill(pid, SIGINT) < 0)
				{
					ereport(WARNING,
							(errmsg("could not send signal to process %d: %m",
									pid)));
				}
			}
			else
				LWLockRelease(ProcArrayLock);

			/* never try to cancel the autovacuum worker a second time */
			allow_autovacuum_cancel = false;
		}

		/*
		 * With log_lock_waits enabled, report the outcome once the deadlock
		 * state is no longer DS_NOT_YET_CHECKED.
		 */
		if (log_lock_waits && deadlock_state != DS_NOT_YET_CHECKED)
		{
			StringInfoData buf;
			const char *modename;
			long		secs;
			int			usecs;
			long		msecs;

			initStringInfo(&buf);
			DescribeLockTag(&buf, &locallock->tag.lock);
			modename = GetLockmodeName(locallock->tag.lock.locktag_lockmethodid,
									   lockmode);

			/* elapsed time since the deadlock timeout was started */
			TimestampDifference(get_timeout_start_time(DEADLOCK_TIMEOUT),
								GetCurrentTimestamp(),
								&secs, &usecs);
			msecs = secs * 1000 + usecs / 1000;
			usecs = usecs % 1000;

			if (deadlock_state == DS_SOFT_DEADLOCK)
				ereport(LOG,
						(errmsg("process %d avoided deadlock for %s on %s by rearranging queue order after %ld.%03d ms",
								MyProcPid, modename, buf.data, msecs, usecs)));
			else if (deadlock_state == DS_HARD_DEADLOCK)
			{
				ereport(LOG,
						(errmsg("process %d detected deadlock while waiting for %s on %s after %ld.%03d ms",
								MyProcPid, modename, buf.data, msecs, usecs)));
			}

			if (myWaitStatus == STATUS_WAITING)
				ereport(LOG,
						(errmsg("process %d still waiting for %s on %s after %ld.%03d ms",
								MyProcPid, modename, buf.data, msecs, usecs)));
			else if (myWaitStatus == STATUS_OK)
				ereport(LOG,
						(errmsg("process %d acquired %s on %s after %ld.%03d ms",
								MyProcPid, modename, buf.data, msecs, usecs)));
			else
			{
				Assert(myWaitStatus == STATUS_ERROR);
				/* a hard deadlock was already reported above */
				if (deadlock_state != DS_HARD_DEADLOCK)
					ereport(LOG,
							(errmsg("process %d failed to acquire %s on %s after %ld.%03d ms",
									MyProcPid, modename, buf.data, msecs, usecs)));
			}

			/* reset so the same state is not reported again on the next wakeup */
			deadlock_state = DS_NO_DEADLOCK;
			pfree(buf.data);
		}
	} while (myWaitStatus == STATUS_WAITING);

	/*
	 * Disarm the timeouts; the LOCK_TIMEOUT indicator is kept
	 * (keep_indicator = true).
	 */
	if (LockTimeout > 0)
	{
		DisableTimeoutParams timeouts[2];

		timeouts[0].id = DEADLOCK_TIMEOUT;
		timeouts[0].keep_indicator = false;
		timeouts[1].id = LOCK_TIMEOUT;
		timeouts[1].keep_indicator = true;
		disable_timeouts(timeouts, 2);
	}
	else
		disable_timeout(DEADLOCK_TIMEOUT, false);

	/* Re-take the partition lock that was released before sleeping. */
	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
	lockAwaited = NULL;

	if (MyProc->waitStatus == STATUS_OK)
		GrantAwaitedLock();

	return MyProc->waitStatus;
}