diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c50f9c4bf6..877330d409 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -338,18 +338,13 @@ AtAbort_Twophase(void) * resources held by the transaction yet. In those cases, the in-memory * state can be wrong, but it's too late to back out. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); if (!MyLockedGxact->valid) - { RemoveGXact(MyLockedGxact); - } else - { - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); - MyLockedGxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); - LWLockRelease(TwoPhaseStateLock); - } MyLockedGxact = NULL; } @@ -530,15 +525,15 @@ GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts, /* * MarkAsPrepared * Mark the GXACT as fully valid, and enter it into the global ProcArray. + * An exclusive lock on TwoPhaseStateLock needs to be taken by caller. */ static void MarkAsPrepared(GlobalTransaction gxact) { - /* Lock here may be overkill, but I'm not convinced of that ... */ - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); + Assert(!gxact->valid); gxact->valid = true; - LWLockRelease(TwoPhaseStateLock); /* * Put it into the global ProcArray so TransactionIdIsInProgress considers @@ -624,6 +619,8 @@ LockGXact(const char *gid, Oid user) /* * RemoveGXact * Remove the prepared transaction from the shared memory array. + * Caller needs to hold an exclusive lock on TwoPhaseStateLock as + * data of TwoPhaseState gets updated. * * NB: caller should have already removed it from ProcArray */ @@ -632,7 +629,7 @@ RemoveGXact(GlobalTransaction gxact) { int i; - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { @@ -646,14 +643,10 @@ RemoveGXact(GlobalTransaction gxact) gxact->next = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = gxact; - LWLockRelease(TwoPhaseStateLock); - return; } } - LWLockRelease(TwoPhaseStateLock); - elog(ERROR, "failed to find %p in GlobalTransaction array", gxact); } @@ -1126,8 +1119,12 @@ EndPrepare(GlobalTransaction gxact) * TransactionIdIsInProgress, and onlookers would be entitled to assume * the xact crashed. Instead we have a window where the same XID appears * twice in ProcArray, which is OK. + * + * Lock here may be overkill, but I'm not convinced of that ... */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); MarkAsPrepared(gxact); + LWLockRelease(TwoPhaseStateLock); /* * Now we can mark ourselves as out of the commit critical section: a @@ -1506,7 +1503,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit) if (gxact->ondisk) RemoveTwoPhaseFile(xid, true); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); RemoveGXact(gxact); + LWLockRelease(TwoPhaseStateLock); MyLockedGxact = NULL; pfree(buf); @@ -1733,6 +1732,7 @@ restoreTwoPhaseData(void) DIR *cldir; struct dirent *clde; + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); cldir = AllocateDir(TWOPHASE_DIR); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) { @@ -1753,6 +1753,7 @@ restoreTwoPhaseData(void) } } FreeDir(cldir); + LWLockRelease(TwoPhaseStateLock); } /* @@ -1781,6 +1782,9 @@ restoreTwoPhaseData(void) * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all * top-level xids is stored in *xids_p. The number of entries in the array * is returned in *nxids_p. + * + * An exclusive lock on TwoPhaseStateLock is taken for the duration of the + * scan of TwoPhaseState data as its may be updated. */ TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) @@ -1792,7 +1796,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) int allocsize = 0; int i; - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; @@ -1861,13 +1865,16 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) * The lack of calls to SubTransSetParent() calls here is by design; * those calls are made by RecoverPreparedTransactions() at the end of recovery * for those xacts that need this. + * + * An exclusive lock on TwoPhaseStateLock is taken for the duration of the + * scan of TwoPhaseState data as it may get updated. */ void StandbyRecoverPreparedTransactions(void) { int i; - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; @@ -1931,9 +1938,11 @@ RecoverPreparedTransactions(void) * SubTransSetParent has been set before, if the prepared transaction * generated xid assignment records. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); buf = ProcessTwoPhaseBuffer(xid, gxact->prepare_start_lsn, gxact->ondisk, true, false); + LWLockRelease(TwoPhaseStateLock); if (buf == NULL) continue; @@ -1963,11 +1972,11 @@ RecoverPreparedTransactions(void) /* recovered, so reset the flag for entries generated by redo */ gxact->inredo = false; - LWLockRelease(TwoPhaseStateLock); - GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); MarkAsPrepared(gxact); + LWLockRelease(TwoPhaseStateLock); + /* * Recover other state (notably locks) using resource managers */ @@ -2001,6 +2010,9 @@ RecoverPreparedTransactions(void) * * If setNextXid is true, set ShmemVariableCache->nextXid to the newest * value scanned. + * + * Caller needs to hold an exclusive lock on TwoPhaseStateLock as data + * of TwoPhaseState gets updated. */ static char * ProcessTwoPhaseBuffer(TransactionId xid, @@ -2014,6 +2026,8 @@ ProcessTwoPhaseBuffer(TransactionId xid, TwoPhaseFileHeader *hdr; int i; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); + if (!fromdisk) Assert(prepare_start_lsn != InvalidXLogRecPtr); @@ -2299,6 +2313,10 @@ RecordTransactionAbortPrepared(TransactionId xid, * a gxact entry in shared memory TwoPhaseState structure. If caller * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase * data, the entry is marked as located on disk. + * + * Caller needs to hold an exclusive lock on TwoPhaseStateLock as data of + * TwoPhaseState gets updated. Note that the lock is not directly taken in + * this routine as it can be called in restoreTwoPhaseData(). */ void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) @@ -2309,6 +2327,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) GlobalTransaction gxact; Assert(RecoveryInProgress()); + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); gid = (const char *) bufptr; @@ -2324,7 +2343,6 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) * that it got added in the redo phase */ - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); /* Get a free gxact from the freelist */ if (TwoPhaseState->freeGXacts == NULL) ereport(ERROR, @@ -2350,17 +2368,18 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; - LWLockRelease(TwoPhaseStateLock); - elog(DEBUG2, "Adding 2PC data to shared memory %u", gxact->xid); } /* * PrepareRedoRemove * - * Remove the corresponding gxact entry from TwoPhaseState. Also - * remove the 2PC file if a prepared transaction was saved via - * an earlier checkpoint. + * Remove the corresponding gxact entry from TwoPhaseState. Also remove + * the 2PC file if a prepared transaction was saved via an earlier checkpoint. + * + * Caller needs to hold an exclusive lock on TwoPhaseStateLock as data of + * TwoPhaseState gets updated. Note that the lock is not directly taken in + * this routine as it can be called in ProcessTwoPhaseBuffer(). */ void PrepareRedoRemove(TransactionId xid, bool giveWarning) @@ -2370,8 +2389,8 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) bool found = false; Assert(RecoveryInProgress()); + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { gxact = TwoPhaseState->prepXacts[i]; @@ -2383,7 +2402,6 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) break; } } - LWLockRelease(TwoPhaseStateLock); /* * Just leave if there is nothing, this is expected during WAL replay. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 7e8c598f2a..6b60ea9577 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5619,7 +5619,9 @@ xact_redo(XLogReaderState *record) record->EndRecPtr, XLogRecGetOrigin(record)); /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); PrepareRedoRemove(parsed.twophase_xid, false); + LWLockRelease(TwoPhaseStateLock); } } else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) @@ -5641,7 +5643,9 @@ xact_redo(XLogReaderState *record) xact_redo_abort(&parsed, parsed.twophase_xid); /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); PrepareRedoRemove(parsed.twophase_xid, false); + LWLockRelease(TwoPhaseStateLock); } } else if (info == XLOG_XACT_PREPARE) @@ -5650,9 +5654,11 @@ xact_redo(XLogReaderState *record) * Store xid and start/end pointers of the WAL record in TwoPhaseState * gxact entry. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); PrepareRedoAdd(XLogRecGetData(record), record->ReadRecPtr, record->EndRecPtr); + LWLockRelease(TwoPhaseStateLock); } else if (info == XLOG_XACT_ASSIGNMENT) {