diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index 8140418..adb82ba 100644
--- a/src/backend/access/heap/hio.c
+++ b/src/backend/access/heap/hio.c
@@ -23,6 +23,7 @@
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
+#include "storage/proc.h"
 
 
 /*
@@ -168,6 +169,160 @@ GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
 	}
 }
 
+
+/*
+ * GroupExtendRelation -- extend a relation on behalf of a group of backends.
+ *
+ * Backends that could not get the relation extension lock immediately add
+ * themselves to a list; the first one to join becomes the leader, takes the
+ * lock once, extends the relation for every member, and hands each waiter
+ * the block number it should use.
+ */
+static BlockNumber
+GroupExtendRelation(PGPROC *proc, Relation relation, BulkInsertState bistate)
+{
+	volatile PROC_HDR *procglobal = ProcGlobal;
+	uint32		nextidx;
+	uint32		wakeidx;
+	int			extraWaits = 0;
+	BlockNumber targetBlock;
+	int			count = 0;
+
+	/* Add ourselves to the list of processes needing a group extend. */
+	proc->groupExtendMember = true;
+
+	while (true)
+	{
+		nextidx = pg_atomic_read_u32(&procglobal->extendGroupFirst);
+		pg_atomic_write_u32(&proc->extendGroupNext, nextidx);
+
+		if (pg_atomic_compare_exchange_u32(&procglobal->extendGroupFirst,
+										   &nextidx,
+										   (uint32) proc->pgprocno))
+			break;
+	}
+
+	/*
+	 * If the list was not empty, some other backend is already the leader
+	 * and will assign our target block.  It is impossible to have followers
+	 * without a leader because the first process that adds itself to the
+	 * list always sees nextidx as INVALID_PGPROCNO.
+	 */
+	if (nextidx != INVALID_PGPROCNO)
+	{
+		/* Sleep until the leader assigns our block and clears our flag. */
+		for (;;)
+		{
+			/* acts as a read barrier */
+			PGSemaphoreLock(&proc->sem);
+			if (!proc->groupExtendMember)
+				break;
+			extraWaits++;
+		}
+
+		Assert(pg_atomic_read_u32(&proc->extendGroupNext) == INVALID_PGPROCNO);
+
+		/* Fix semaphore count for any absorbed wakeups */
+		while (extraWaits-- > 0)
+			PGSemaphoreUnlock(&proc->sem);
+
+		targetBlock = proc->blockNum;
+		proc->blockNum = InvalidBlockNumber;
+
+		return targetBlock;
+	}
+
+	/* We are the leader.  Acquire the lock on behalf of everyone. */
+	LockRelationForExtension(relation, ExclusiveLock);
+
+	/*
+	 * Now that we've got the lock, clear the list of processes waiting for
+	 * a group extend.
+	 */
+	while (true)
+	{
+		nextidx = pg_atomic_read_u32(&procglobal->extendGroupFirst);
+		if (pg_atomic_compare_exchange_u32(&procglobal->extendGroupFirst,
+										   &nextidx,
+										   INVALID_PGPROCNO))
+			break;
+	}
+
+	/* Remember head of list so we can perform wakeups after dropping lock. */
+	wakeidx = nextidx;
+
+	/* Walk the list and assign each member its own newly extended block. */
+	while (nextidx != INVALID_PGPROCNO)
+	{
+		PGPROC	   *proc = &ProcGlobal->allProcs[nextidx];
+		Buffer		buffer;
+
+		buffer = ReadBufferBI(relation, P_NEW, bistate);
+		proc->blockNum = BufferGetBlockNumber(buffer);
+
+		ReleaseBuffer(buffer);
+
+		/* Move to next proc in list. */
+		nextidx = pg_atomic_read_u32(&proc->extendGroupNext);
+
+		count++;
+	}
+
+	/*
+	 * Additionally extend the relation by roughly ten blocks per group
+	 * member and record them in the FSM, so that later inserters can find
+	 * free pages without waiting on the extension lock.
+	 */
+	count = count * 10;
+
+	do
+	{
+		Buffer		buffer;
+		Page		page;
+		Size		freespace;
+		BlockNumber blockNum;
+
+		buffer = ReadBufferBI(relation, P_NEW, bistate);
+
+		/*
+		 * Now acquire lock on the new page.
+		 */
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+		page = BufferGetPage(buffer);
+		PageInit(page, BufferGetPageSize(buffer), 0);
+
+		freespace = PageGetHeapFreeSpace(page);
+		MarkBufferDirty(buffer);
+		blockNum = BufferGetBlockNumber(buffer);
+		UnlockReleaseBuffer(buffer);
+
+		RecordPageWithFreeSpace(relation, blockNum, freespace);
+	} while (count--);
+
+	/* We're done with the lock now. */
+	UnlockRelationForExtension(relation, ExclusiveLock);
+
+	/*
+	 * Now that we've released the lock, go back and wake everybody up.  We
+	 * don't do this under the lock so as to keep lock hold times to a
+	 * minimum.  The system calls we need to perform to wake other processes
+	 * up are probably much slower than the simple memory writes we did while
+	 * holding the lock.
+	 */
+	while (wakeidx != INVALID_PGPROCNO)
+	{
+		PGPROC	   *proc = &ProcGlobal->allProcs[wakeidx];
+
+		wakeidx = pg_atomic_read_u32(&proc->extendGroupNext);
+		pg_atomic_write_u32(&proc->extendGroupNext, INVALID_PGPROCNO);
+
+		/* ensure all previous writes are visible before follower continues. */
+		pg_write_barrier();
+
+		proc->groupExtendMember = false;
+
+		if (proc != MyProc)
+			PGSemaphoreUnlock(&proc->sem);
+		else
+		{
+			targetBlock = proc->blockNum;
+			proc->blockNum = InvalidBlockNumber;
+		}
+	}
+
+	return targetBlock;
+}
+
 /*
  * RelationGetBufferForTuple
  *
@@ -238,6 +393,7 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	BlockNumber targetBlock,
 				otherBlock;
 	bool		needLock;
+	int			extraBlocks = 0;
 
 	len = MAXALIGN(len);		/* be conservative */
 
@@ -308,6 +464,7 @@ RelationGetBufferForTuple(Relation relation, Size len,
 		}
 	}
 
+loop:
 	while (targetBlock != InvalidBlockNumber)
 	{
 		/*
@@ -441,27 +598,95 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	needLock = !RELATION_IS_LOCAL(relation);
 
 	if (needLock)
-		LockRelationForExtension(relation, ExclusiveLock);
+	{
+		if (TryLockRelationForExtension(relation, ExclusiveLock))
+		{
+			buffer = ReadBufferBI(relation, P_NEW, bistate);
+
+			/*
+			 * We can be certain that locking the otherBuffer first is OK,
+			 * since it must have a lower page number.
+			 */
+			if ((otherBuffer != InvalidBuffer) && !extraBlocks)
+				LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+
+			/*
+			 * Now acquire lock on the new page.
+			 */
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+			UnlockRelationForExtension(relation, ExclusiveLock);
+		}
+		else
+		{
+			/* Lock is busy: extend as part of a group, then retry above. */
+			targetBlock = GroupExtendRelation(MyProc, relation, bistate);
+			goto loop;
+		}
+	}
+	else
+	{
+		buffer = ReadBufferBI(relation, P_NEW, bistate);
+
+		/*
+		 * We can be certain that locking the otherBuffer first is OK,
+		 * since it must have a lower page number.
+		 */
+		if ((otherBuffer != InvalidBuffer) && !extraBlocks)
+			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+
+		/*
+		 * Now acquire lock on the new page.
+		 */
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+	}
+
+#if 0
+	if (LOCKACQUIRE_NOT_AVAIL)
+
+	if (use_fsm)
+		extraBlocks = RelationGetExtendBlocks(relation) - 1;
+	else
+		extraBlocks = 0;
 
 	/*
 	 * XXX This does an lseek - rather expensive - but at the moment it is the
 	 * only way to accurately determine how many blocks are in a relation.  Is
 	 * it worth keeping an accurate file length in shared memory someplace,
 	 * rather than relying on the kernel to do it for us?
	 */
-	buffer = ReadBufferBI(relation, P_NEW, bistate);
-	/*
-	 * We can be certain that locking the otherBuffer first is OK, since it
-	 * must have a lower page number.
-	 */
-	if (otherBuffer != InvalidBuffer)
-		LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+	do
+	{
+		buffer = ReadBufferBI(relation, P_NEW, bistate);
+
+		/*
+		 * We can be certain that locking the otherBuffer first is OK,
+		 * since it must have a lower page number.
+		 */
+		if ((otherBuffer != InvalidBuffer) && !extraBlocks)
+			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+
+		/*
+		 * Now acquire lock on the new page.
+		 */
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+		if (extraBlocks)
+		{
+			Page		page;
+			Size		freespace;
+			BlockNumber blockNum;
+
+			page = BufferGetPage(buffer);
+			PageInit(page, BufferGetPageSize(buffer), 0);
+
+			freespace = PageGetHeapFreeSpace(page);
+			MarkBufferDirty(buffer);
+			blockNum = BufferGetBlockNumber(buffer);
+			UnlockReleaseBuffer(buffer);
+			RecordPageWithFreeSpace(relation, blockNum, freespace);
+		}
+
+	} while (extraBlocks--);
 
-	/*
-	 * Now acquire lock on the new page.
-	 */
-	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 	/*
 	 * Release the file-extension lock; it's now OK for someone else to extend
@@ -471,7 +696,7 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	 */
 	if (needLock)
 		UnlockRelationForExtension(relation, ExclusiveLock);
-
+#endif
 	/*
 	 * We need to initialize the empty new page.  Double-check that it really
 	 * is empty (this should never happen, but if it does we don't want to
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 9d16afb..08e5e07 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -340,6 +340,21 @@ LockRelationForExtension(Relation relation, LOCKMODE lockmode)
 	(void) LockAcquire(&tag, lockmode, false, false);
 }
 
+/*
+ *		TryLockRelationForExtension
+ *
+ * As above, but do not wait: LockAcquire is called with dontWait = true,
+ * so the caller gets LOCKACQUIRE_NOT_AVAIL if the lock is busy.
+ */
+LockAcquireResult
+TryLockRelationForExtension(Relation relation, LOCKMODE lockmode)
+{
+	LOCKTAG		tag;
+
+	SET_LOCKTAG_RELATION_EXTEND(tag,
+								relation->rd_lockInfo.lockRelId.dbId,
+								relation->rd_lockInfo.lockRelId.relId);
+
+	return LockAcquire(&tag, lockmode, false, true);
+}
+
 /*
  * UnlockRelationForExtension
  */
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 6453b88..a823eda 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -182,6 +182,7 @@ InitProcGlobal(void)
 	ProcGlobal->walwriterLatch = NULL;
 	ProcGlobal->checkpointerLatch = NULL;
 	pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PGPROCNO);
+	pg_atomic_init_u32(&ProcGlobal->extendGroupFirst, INVALID_PGPROCNO);
 
 	/*
 	 * Create and initialize all the PGPROC structures we'll need.  There are
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index dbcdd3f..5a0e60b 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -152,6 +152,11 @@ struct PGPROC
 	 */
 	TransactionId procArrayGroupMemberXid;
 
+	/* Support for group relation extension. */
+	bool		groupExtendMember;	/* true if waiting for a group extend */
+	pg_atomic_uint32 extendGroupNext;	/* next member of extend group list */
+	uint32		blockNum;		/* block assigned by the group extend leader */
+
 	/* Per-backend LWLock.  Protects fields below (but not group fields). */
 	LWLock		backendLock;
 
@@ -223,6 +228,8 @@ typedef struct PROC_HDR
 	PGPROC	   *bgworkerFreeProcs;
 	/* First pgproc waiting for group XID clear */
 	pg_atomic_uint32 procArrayGroupFirst;
+	/* First pgproc waiting for a group relation extend */
+	pg_atomic_uint32 extendGroupFirst;
 	/* WALWriter process's latch */
 	Latch	   *walwriterLatch;
 	/* Checkpointer process's latch */
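
The patch reuses the leader/follower list protocol that ProcArrayGroupClearXid introduced for group XID clearing.  As a rough illustration of that protocol in isolation, the standalone sketch below (not part of the patch; the names Waiter, worker and groupFirst are made up, and C11 atomics plus a condition variable stand in for pg_atomic_u32 and PGSemaphore) shows how backends push themselves onto a CAS-maintained list, how the first one to find the list empty becomes the leader, and how the leader detaches the whole list, does the shared work once, and hands each waiter its result.

/*
 * Illustrative sketch of the group-leader pattern used by
 * GroupExtendRelation.  Not PostgreSQL code.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define NPROCS			4
#define INVALID_SLOT	(-1)

typedef struct Waiter
{
	int			next;			/* index of next waiter, or INVALID_SLOT */
	int			result;			/* "block number" assigned by the leader */
	bool		done;			/* set by the leader when result is valid */
} Waiter;

static Waiter waiters[NPROCS];
static _Atomic int groupFirst = INVALID_SLOT;	/* head of the waiter list */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int	next_block = 0;		/* resource handed out by the leader */

static void *
worker(void *arg)
{
	int			me = (int) (long) arg;
	int			head;

	/* Push ourselves onto the list; loop until the CAS succeeds. */
	do
	{
		head = atomic_load(&groupFirst);
		waiters[me].next = head;
	} while (!atomic_compare_exchange_weak(&groupFirst, &head, me));

	if (head != INVALID_SLOT)
	{
		/* Follower: sleep until the leader publishes our result. */
		pthread_mutex_lock(&lock);
		while (!waiters[me].done)
			pthread_cond_wait(&cond, &lock);
		pthread_mutex_unlock(&lock);
	}
	else
	{
		/* Leader: detach the whole list, then serve every member (including ourselves). */
		do
		{
			head = atomic_load(&groupFirst);
		} while (!atomic_compare_exchange_weak(&groupFirst, &head,
											   INVALID_SLOT));

		pthread_mutex_lock(&lock);
		for (int i = head; i != INVALID_SLOT; i = waiters[i].next)
		{
			waiters[i].result = next_block++;	/* one "extension" each */
			waiters[i].done = true;
		}
		pthread_cond_broadcast(&cond);
		pthread_mutex_unlock(&lock);
	}

	printf("worker %d got block %d\n", me, waiters[me].result);
	return NULL;
}

int
main(void)
{
	pthread_t	threads[NPROCS];

	for (long i = 0; i < NPROCS; i++)
		pthread_create(&threads[i], NULL, worker, (void *) i);
	for (int i = 0; i < NPROCS; i++)
		pthread_join(threads[i], NULL);
	return 0;
}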