diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 9f55adc..eb7c339 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -45,8 +45,8 @@ * fsynced * * If COMMIT happens after checkpoint then backend reads state data from * files - * * In case of crash replay will move data from xlog to files, if that - * hasn't happened before. XXX TODO - move to shmem in replay also + * + * The same procedure happens during WAL replay. * *------------------------------------------------------------------------- */ @@ -578,6 +578,45 @@ LockGXact(const char *gid, Oid user) } /* + * LockGXactByXid + * + * Find prepared transaction by xid and lock corresponding GXACT. + * This is used during recovery as an alternative to LockGXact(), and + * should only be used in recovery. No entries found means that a checkpoint + * has moved the searched prepared transaction data to a twophase file. + * + * Returns the transaction data if found, or NULL if nothing has been locked. + */ +static GlobalTransaction +LockGXactByXid(TransactionId xid) +{ + int i; + GlobalTransaction gxact = NULL; + + Assert(RecoveryInProgress()); + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + PGXACT *pgxact; + + gxact = TwoPhaseState->prepXacts[i]; + pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + + if (TransactionIdEquals(xid, pgxact->xid)) + { + /* ok to lock it */ + gxact->locking_backend = MyBackendId; + MyLockedGxact = gxact; + break; + } + } + LWLockRelease(TwoPhaseStateLock); + + return gxact; +} + +/* * RemoveGXact * Remove the prepared transaction from the shared memory array. * @@ -1241,9 +1280,9 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) * Reads 2PC data from xlog. During checkpoint this data will be moved to * twophase files and ReadTwoPhaseFile should be used instead. * - * Note clearly that this function accesses WAL during normal operation, similarly - * to the way WALSender or Logical Decoding would do. It does not run during - * crash recovery or standby processing. + * Note that this function accesses WAL not only during recovery but also + * during normal operation, similarly to the way WALSender or Logical + * Decoding would do. */ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) @@ -1252,8 +1291,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogReaderState *xlogreader; char *errormsg; - Assert(!RecoveryInProgress()); - xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL); if (!xlogreader) ereport(ERROR, @@ -1296,13 +1333,35 @@ StandbyTransactionIdIsPrepared(TransactionId xid) char *buf; TwoPhaseFileHeader *hdr; bool result; + int i; Assert(TransactionIdIsValid(xid)); if (max_prepared_xacts <= 0) return false; /* nothing to do */ - /* Read and validate file */ + /* + * First check if this prepared transaction has its information in + * shared memory, and use it. + */ + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + + if (TransactionIdEquals(pgxact->xid, xid)) + { + LWLockRelease(TwoPhaseStateLock); + return true; + } + } + LWLockRelease(TwoPhaseStateLock); + + /* + * Nothing in shared memory? Then just read its corresponding twophase + * file and validate it. + */ buf = ReadTwoPhaseFile(xid, false); if (buf == NULL) return false; @@ -1316,12 +1375,17 @@ StandbyTransactionIdIsPrepared(TransactionId xid) } /* - * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED + * FinishGXact + * + * Do the actual finish of COMMIT/ABORT PREPARED. Calls is responsible + * for locking the transaction this routine is working on. + * + * This function can be called during replay to clean memory state for + * previously prepared xact. In that case actions are the same as in + * normal operations but without any writes to WAL or files. */ -void -FinishPreparedTransaction(const char *gid, bool isCommit) +static void FinishGXact(GlobalTransaction gxact, bool isCommit) { - GlobalTransaction gxact; PGPROC *proc; PGXACT *pgxact; TransactionId xid; @@ -1332,16 +1396,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit) TransactionId *children; RelFileNode *commitrels; RelFileNode *abortrels; - RelFileNode *delrels; - int ndelrels; SharedInvalidationMessage *invalmsgs; int i; - /* - * Validate the GID, and lock the GXACT to ensure that two backends do not - * try to commit the same GID at once. - */ - gxact = LockGXact(gid, GetUserId()); proc = &ProcGlobal->allProcs[gxact->pgprocno]; pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; xid = pgxact->xid; @@ -1383,17 +1440,23 @@ FinishPreparedTransaction(const char *gid, bool isCommit) * TransactionIdIsInProgress will stop saying the prepared xact is in * progress), then run the post-commit or post-abort callbacks. The * callbacks will release the locks the transaction held. + * + * In recovery nothing needs to happen here as this generates WAL + * records. */ - if (isCommit) - RecordTransactionCommitPrepared(xid, + if (!RecoveryInProgress()) + { + if (isCommit) + RecordTransactionCommitPrepared(xid, hdr->nsubxacts, children, hdr->ncommitrels, commitrels, hdr->ninvalmsgs, invalmsgs, hdr->initfileinval); - else - RecordTransactionAbortPrepared(xid, + else + RecordTransactionAbortPrepared(xid, hdr->nsubxacts, children, hdr->nabortrels, abortrels); + } ProcArrayRemove(proc, latestXid); @@ -1408,41 +1471,50 @@ FinishPreparedTransaction(const char *gid, bool isCommit) gxact->valid = false; /* - * We have to remove any files that were supposed to be dropped. For - * consistency with the regular xact.c code paths, must do this before - * releasing locks, so do it before running the callbacks. - * - * NB: this code knows that we couldn't be dropping any temp rels ... + * Perform actions needed only during normal operation, but *not* recovery. */ - if (isCommit) - { - delrels = commitrels; - ndelrels = hdr->ncommitrels; - } - else + if (!RecoveryInProgress()) { - delrels = abortrels; - ndelrels = hdr->nabortrels; - } - for (i = 0; i < ndelrels; i++) - { - SMgrRelation srel = smgropen(delrels[i], InvalidBackendId); + RelFileNode *delrels; + int ndelrels; - smgrdounlink(srel, false); - smgrclose(srel); - } + /* + * We have to remove any files that were supposed to be dropped. For + * consistency with the regular xact.c code paths, must do this before + * releasing locks, so do it before running the callbacks. + * + * NB: this code knows that we couldn't be dropping any temp rels ... + */ + if (isCommit) + { + delrels = commitrels; + ndelrels = hdr->ncommitrels; + } + else + { + delrels = abortrels; + ndelrels = hdr->nabortrels; + } + for (i = 0; i < ndelrels; i++) + { + SMgrRelation srel = smgropen(delrels[i], InvalidBackendId); - /* - * Handle cache invalidation messages. - * - * Relcache init file invalidation requires processing both before and - * after we send the SI messages. See AtEOXact_Inval() - */ - if (hdr->initfileinval) - RelationCacheInitFilePreInvalidate(); - SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs); - if (hdr->initfileinval) - RelationCacheInitFilePostInvalidate(); + smgrdounlink(srel, false); + smgrclose(srel); + } + + /* + * Handle cache invalidation messages. + * + * Relcache init file invalidation requires processing both before and + * after we send the SI messages. See AtEOXact_Inval() + */ + if (hdr->initfileinval) + RelationCacheInitFilePreInvalidate(); + SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs); + if (hdr->initfileinval) + RelationCacheInitFilePostInvalidate(); + } /* And now do the callbacks */ if (isCommit) @@ -1468,6 +1540,50 @@ FinishPreparedTransaction(const char *gid, bool isCommit) } /* + * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED + */ +void +FinishPreparedTransaction(const char *gid, bool isCommit) +{ + GlobalTransaction gxact; + + /* + * Validate the GID, and lock the GXACT to ensure that two backends do not + * try to commit the same GID at once. + */ + gxact = LockGXact(gid, GetUserId()); + FinishGXact(gxact, isCommit); +} + +/* + * XlogRedoFinishPrepared() + * + * This function is called during recovery for WAL records working on COMMIT + * PREPARED or ABORT PREPARED. That function cleans up memory state that was + * created while replaying its corresponding PREPARE record if its information + * was not on disk in a twophase file. + */ +void +XlogRedoFinishPrepared(TransactionId xid, bool isCommit) +{ + GlobalTransaction gxact; + + Assert(RecoveryInProgress()); + + gxact = LockGXactByXid(xid); + + /* + * If requested xid was not found that means that the PREPARE record was + * moved to a twophase file because of a checkpoint or a restart point. + * There is nothing else to do in this case. + */ + if (!gxact) + return; + + FinishGXact(gxact, isCommit); +} + +/* * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record. */ static void @@ -1690,7 +1806,47 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) TransactionId *xids = NULL; int nxids = 0; int allocsize = 0; + int i; + + /* + * We need to check the PGXACT array for prepared transactions that doesn't + * have any state file in case of a slave restart with the master being off. + */ + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + + if (!gxact->valid) + continue; + + if (TransactionIdPrecedes(pgxact->xid, result)) + result = pgxact->xid; + + if (xids_p) + { + if (nxids == allocsize) + { + if (nxids == 0) + { + allocsize = 10; + xids = palloc(allocsize * sizeof(TransactionId)); + } + else + { + allocsize = allocsize * 2; + xids = repalloc(xids, allocsize * sizeof(TransactionId)); + } + } + xids[nxids++] = pgxact->xid; + } + } + LWLockRelease(TwoPhaseStateLock); + /* + * And now scan files in pg_twophase directory + */ cldir = AllocateDir(TWOPHASE_DIR); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) { @@ -1701,7 +1857,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) char *buf; TwoPhaseFileHeader *hdr; TransactionId *subxids; - int i; xid = (TransactionId) strtoul(clde->d_name, NULL, 16); @@ -1809,102 +1964,105 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) } /* - * StandbyRecoverPreparedTransactions + * RecoverPreparedFromBuffer * - * Scan the pg_twophase directory and setup all the required information to - * allow standby queries to treat prepared transactions as still active. - * This is never called at the end of recovery - we use - * RecoverPreparedTransactions() at that point. + * Parse data in given buffer (that can be a pointer to WAL record holding + * this information or data read from a twophase file) and build the + * shared-memory state for that prepared transaction. * - * Currently we simply call SubTransSetParent() for any subxids of prepared - * transactions. If overwriteOK is true, it's OK if some XIDs have already - * been marked in pg_subtrans. + * Caller is responsible for calling MarkAsPrepared() on the returned gxact. */ -void -StandbyRecoverPreparedTransactions(bool overwriteOK) +static GlobalTransaction +RecoverPreparedFromBuffer(char *buf, bool forceOverwriteOK) { - DIR *cldir; - struct dirent *clde; + char *bufptr; + const char *gid; + TransactionId *subxids; + bool overwriteOK = false; + int i; + GlobalTransaction gxact; + TwoPhaseFileHeader *hdr; - cldir = AllocateDir(TWOPHASE_DIR); - while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) - { - if (strlen(clde->d_name) == 8 && - strspn(clde->d_name, "0123456789ABCDEF") == 8) - { - TransactionId xid; - char *buf; - TwoPhaseFileHeader *hdr; - TransactionId *subxids; - int i; + /* Deconstruct header */ + hdr = (TwoPhaseFileHeader *) buf; + bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); + gid = (const char *) bufptr; + bufptr += MAXALIGN(hdr->gidlen); + subxids = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); - xid = (TransactionId) strtoul(clde->d_name, NULL, 16); + /* + * It's possible that SubTransSetParent has been set before, if + * the prepared transaction generated xid assignment records. Test + * here must match one used in AssignTransactionId(). + */ + if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS || + XLogLogicalInfoActive())) + overwriteOK = true; - /* Already processed? */ - if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) - { - ereport(WARNING, - (errmsg("removing stale two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } + /* + * Caller can also force overwriteOK. + */ + if (forceOverwriteOK) + overwriteOK = true; - /* Read and validate file */ - buf = ReadTwoPhaseFile(xid, true); - if (buf == NULL) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } + /* + * Reconstruct subtrans state for the transaction --- needed + * because pg_subtrans is not preserved over a restart. Note that + * we are linking all the subtransactions directly to the + * top-level XID; there may originally have been a more complex + * hierarchy, but there's no need to restore that exactly. + */ + for (i = 0; i < hdr->nsubxacts; i++) + SubTransSetParent(subxids[i], hdr->xid, overwriteOK); - /* Deconstruct header */ - hdr = (TwoPhaseFileHeader *) buf; - if (!TransactionIdEquals(hdr->xid, xid)) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - pfree(buf); - continue; - } + /* + * Recreate its GXACT and dummy PGPROC + */ + gxact = MarkAsPreparing(hdr->xid, gid, + hdr->prepared_at, + hdr->owner, hdr->database); + GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); - /* - * Examine subtransaction XIDs ... they should all follow main - * XID. - */ - subxids = (TransactionId *) - (buf + MAXALIGN(sizeof(TwoPhaseFileHeader))); - for (i = 0; i < hdr->nsubxacts; i++) - { - TransactionId subxid = subxids[i]; + /* + * Recover other state (notably locks) using resource managers + */ + ProcessRecords(bufptr, hdr->xid, twophase_recover_callbacks); - Assert(TransactionIdFollows(subxid, xid)); - SubTransSetParent(xid, subxid, overwriteOK); - } - } - } - FreeDir(cldir); + /* + * Release locks held by the standby process after we process each + * prepared transaction. As a result, we don't need too many + * additional locks at any one time. + */ + if (InHotStandby) + StandbyReleaseLockTree(hdr->xid, hdr->nsubxacts, subxids); + + /* + * We're done with recovering this transaction. Clear + * MyLockedGxact, like we do in PrepareTransaction() during normal + * operation. + */ + PostPrepare_Twophase(); + + return gxact; } /* - * RecoverPreparedTransactions + * RecoverPreparedFromFiles * * Scan the pg_twophase directory and reload shared-memory state for each * prepared transaction (reacquire locks, etc). This is run during database * startup. */ void -RecoverPreparedTransactions(void) +RecoverPreparedFromFiles(bool forceOverwriteOK) { char dir[MAXPGPATH]; DIR *cldir; struct dirent *clde; - bool overwriteOK = false; snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR); @@ -1916,15 +2074,30 @@ RecoverPreparedTransactions(void) { TransactionId xid; char *buf; - char *bufptr; - TwoPhaseFileHeader *hdr; - TransactionId *subxids; GlobalTransaction gxact; - const char *gid; int i; + bool recovered = false; + PGXACT *pgxact; xid = (TransactionId) strtoul(clde->d_name, NULL, 16); + /* Already recovered from WAL? */ + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + gxact = TwoPhaseState->prepXacts[i]; + pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + + if (TransactionIdEquals(xid, pgxact->xid)) + { + recovered = true; + break; + } + } + LWLockRelease(TwoPhaseStateLock); + if (recovered) + continue; + /* Already processed? */ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) { @@ -1949,73 +2122,39 @@ RecoverPreparedTransactions(void) ereport(LOG, (errmsg("recovering prepared transaction %u", xid))); - /* Deconstruct header */ - hdr = (TwoPhaseFileHeader *) buf; - Assert(TransactionIdEquals(hdr->xid, xid)); - bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); - gid = (const char *) bufptr; - bufptr += MAXALIGN(hdr->gidlen); - subxids = (TransactionId *) bufptr; - bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); - bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); - bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); - bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); - - /* - * It's possible that SubTransSetParent has been set before, if - * the prepared transaction generated xid assignment records. Test - * here must match one used in AssignTransactionId(). - */ - if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS || - XLogLogicalInfoActive())) - overwriteOK = true; - - /* - * Reconstruct subtrans state for the transaction --- needed - * because pg_subtrans is not preserved over a restart. Note that - * we are linking all the subtransactions directly to the - * top-level XID; there may originally have been a more complex - * hierarchy, but there's no need to restore that exactly. - */ - for (i = 0; i < hdr->nsubxacts; i++) - SubTransSetParent(subxids[i], xid, overwriteOK); - - /* - * Recreate its GXACT and dummy PGPROC - */ - gxact = MarkAsPreparing(xid, gid, - hdr->prepared_at, - hdr->owner, hdr->database); + gxact = RecoverPreparedFromBuffer(buf, forceOverwriteOK); gxact->ondisk = true; - GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); MarkAsPrepared(gxact); - /* - * Recover other state (notably locks) using resource managers - */ - ProcessRecords(bufptr, xid, twophase_recover_callbacks); - - /* - * Release locks held by the standby process after we process each - * prepared transaction. As a result, we don't need too many - * additional locks at any one time. - */ - if (InHotStandby) - StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids); - - /* - * We're done with recovering this transaction. Clear - * MyLockedGxact, like we do in PrepareTransaction() during normal - * operation. - */ - PostPrepare_Twophase(); - pfree(buf); } } FreeDir(cldir); } + +/* + * RecoverPreparedFromXLOG + * + * To avoid the creation of twophase state files during replay we register + * WAL records for prepared transactions in shared memory in the same way + * during normal operations. If replay faces a WAL record for a COMMIT + * PREPARED transaction before a checkpoint or restartpoint happens then + * no files are used, limiting the I/O impact of such operations during + * recovery. + */ +void +RecoverPreparedFromXLOG(XLogReaderState *record) +{ + GlobalTransaction gxact; + + gxact = RecoverPreparedFromBuffer((char *) XLogRecGetData(record), false); + gxact->prepare_start_lsn = record->ReadRecPtr; + gxact->prepare_end_lsn = record->EndRecPtr; + MarkAsPrepared(gxact); +} + + /* * RecordTransactionCommitPrepared * diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index e11b229..6a40425 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5602,7 +5602,7 @@ xact_redo(XLogReaderState *record) Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_commit(&parsed, parsed.twophase_xid, record->EndRecPtr, XLogRecGetOrigin(record)); - RemoveTwoPhaseFile(parsed.twophase_xid, false); + XlogRedoFinishPrepared(parsed.twophase_xid, true); } } else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) @@ -5622,14 +5622,12 @@ xact_redo(XLogReaderState *record) { Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_abort(&parsed, parsed.twophase_xid); - RemoveTwoPhaseFile(parsed.twophase_xid, false); + XlogRedoFinishPrepared(parsed.twophase_xid, false); } } else if (info == XLOG_XACT_PREPARE) { - /* the record contents are exactly the 2PC file */ - RecreateTwoPhaseFile(XLogRecGetXid(record), - XLogRecGetData(record), XLogRecGetDataLen(record)); + RecoverPreparedFromXLOG(record); } else if (info == XLOG_XACT_ASSIGNMENT) { diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 2189c22..613097f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6704,7 +6704,7 @@ StartupXLOG(void) ProcArrayApplyRecoveryInfo(&running); - StandbyRecoverPreparedTransactions(false); + RecoverPreparedFromFiles(false); } } @@ -7463,7 +7463,7 @@ StartupXLOG(void) TrimMultiXact(); /* Reload shared-memory state for prepared transactions */ - RecoverPreparedTransactions(); + RecoverPreparedFromFiles(false); /* * Shutdown the recovery environment. This must occur after @@ -9377,7 +9377,7 @@ xlog_redo(XLogReaderState *record) ProcArrayApplyRecoveryInfo(&running); - StandbyRecoverPreparedTransactions(true); + RecoverPreparedFromFiles(true); } /* ControlFile->checkPointCopy always tracks the latest ckpt XID */ diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 547f1a8..5ea2530 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -608,7 +608,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid) /* Already processed? */ if (!TransactionIdIsValid(xid) || TransactionIdDidCommit(xid) || - TransactionIdDidAbort(xid)) + TransactionIdDidAbort(xid) || + StandbyTransactionIdIsPrepared(xid)) return; elog(trace_recovery(DEBUG4), diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index b7ce0c6..416ef5e 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -17,6 +17,7 @@ #include "access/xlogdefs.h" #include "datatype/timestamp.h" #include "storage/lock.h" +#include "access/xlogreader.h" /* * GlobalTransactionData is defined in twophase.c; other places have no @@ -46,8 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid); extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p); -extern void StandbyRecoverPreparedTransactions(bool overwriteOK); -extern void RecoverPreparedTransactions(void); +extern void RecoverPreparedFromFiles(bool overwriteOK); +extern void RecoverPreparedFromXLOG(XLogReaderState *record); extern void RecreateTwoPhaseFile(TransactionId xid, void *content, int len); extern void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning); @@ -56,4 +57,5 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon); extern void FinishPreparedTransaction(const char *gid, bool isCommit); +extern void XlogRedoFinishPrepared(TransactionId xid, bool isCommit); #endif /* TWOPHASE_H */ diff --git a/src/test/recovery/t/008_twophase.pl b/src/test/recovery/t/008_twophase.pl new file mode 100644 index 0000000..3c203cd --- /dev/null +++ b/src/test/recovery/t/008_twophase.pl @@ -0,0 +1,249 @@ +# Tests dedicated to two-phase commit in recovery +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 12; + +# Setup master node +my $node_master = get_new_node("master"); +$node_master->init(allows_streaming => 1); +$node_master->append_conf('postgresql.conf', qq( + max_prepared_transactions = 10 +)); +$node_master->start; +$node_master->backup('master_backup'); +$node_master->psql('postgres', "create table t(id int)"); + +# Setup master node +my $node_slave = get_new_node('slave'); +$node_slave->init_from_backup($node_master, 'master_backup', has_streaming => 1); +$node_slave->start; + +# Switch to synchronous replication +$node_master->append_conf('postgresql.conf', qq( + synchronous_standby_names = '*' +)); +$node_master->psql('postgres', "select pg_reload_conf()"); + +my $psql_out = ''; +my $psql_rc = ''; + +############################################################################### +# Check that we can commit and abort tx after soft restart. +# Here checkpoint happens before shutdown and no WAL replay will occur at next +# startup. In this case postgres re-create shared-memory state from twophase +# files. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + prepare transaction 'x'; + begin; + insert into t values (142); + prepare transaction 'y';"); +$node_master->stop; +$node_master->start; + +$psql_rc = $node_master->psql('postgres', "commit prepared 'x'"); +is($psql_rc, '0', 'Commit prepared transaction after restart.'); + +$psql_rc = $node_master->psql('postgres', "rollback prepared 'y'"); +is($psql_rc, '0', 'Rollback prepared transaction after restart.'); + +############################################################################### +# Check that we can commit and abort after hard restart. +# At next startup, WAL replay will re-create shared memory state for preaped +# using dedicated WAL records. +############################################################################### + +$node_master->psql('postgres', " + checkpoint; + begin; + insert into t values (42); + prepare transaction 'x'; + begin; + insert into t values (142); + prepare transaction 'y';"); +$node_master->teardown_node; +$node_master->start; + +$psql_rc = $node_master->psql('postgres', "commit prepared 'x'"); +is($psql_rc, '0', 'Commit prepared tx after teardown.'); + +$psql_rc = $node_master->psql('postgres', "rollback prepared 'y'"); +is($psql_rc, '0', 'Rollback prepared transaction after teardown.'); + +############################################################################### +# Check that WAL replay can handle several transactions with same name GID. +############################################################################### + +$node_master->psql('postgres', " + checkpoint; + begin; + insert into t values (42); + prepare transaction 'x'; + commit prepared 'x'; + begin; + insert into t values (42); + prepare transaction 'x';"); +$node_master->teardown_node; +$node_master->start; + +$psql_rc = $node_master->psql('postgres', "commit prepared 'x'"); +is($psql_rc, '0', 'Replay several transactions with same GID.'); + +############################################################################### +# Check that WAL replay cleans up its shared memory state and releases locks +# while replaying transaction commits. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + prepare transaction 'x'; + commit prepared 'x';"); +$node_master->teardown_node; +$node_master->start; +$psql_rc = $node_master->psql('postgres', "begin; + insert into t values (42); + -- This prepare can fail due to conflicting GID or locks conflicts if + -- replay did not fully cleanup its state on previous commit. + prepare transaction 'x';"); +is($psql_rc, '0', "Cleanup of shared memory state for 2PC commit"); + +$node_master->psql('postgres', "commit prepared 'x'"); + +############################################################################### +# Check that WAL replay will cleanup its shared memory state on running slave. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + prepare transaction 'x'; + commit prepared 'x';"); +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts;", + stdout => \$psql_out); +is($psql_out, '0', + "Cleanup of shared memory state on running standby without checkpoint."); + +############################################################################### +# Same as in previous case, but let's force checkpoint on slave between +# prepare and commit to use on-disk twophase files. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + prepare transaction 'x';"); +$node_slave->psql('postgres', "checkpoint;"); +$node_master->psql('postgres', "commit prepared 'x';"); +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts;", + stdout => \$psql_out); +is($psql_out, '0', + "Cleanup of shared memory state on running standby after checkpoint."); + +############################################################################### +# Check that prepared transactions can be committed on promoted slave. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + prepare transaction 'x';"); +$node_master->teardown_node; +$node_slave->promote; +$node_slave->poll_query_until('postgres', "SELECT pg_is_in_recovery() <> true"); + +$psql_rc = $node_slave->psql('postgres', "commit prepared 'x';"); +is($psql_rc, '0', "Restore of prepared transaction on promoted slave."); + +# change roles +($node_master, $node_slave) = ($node_slave, $node_master); +$node_slave->enable_streaming($node_master); +$node_slave->append_conf('recovery.conf', qq( +recovery_target_timeline='latest' +)); +$node_slave->start; + +############################################################################### +# Check that prepared transactions are replayed after soft restart of standby +# while master is down. Since standby knows that master is down it uses +# different code path on start to be sure that the status of transactions is +# consistent. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (42); + prepare transaction 'x';"); +$node_master->stop; +$node_slave->restart; +$node_slave->promote; +$node_slave->poll_query_until('postgres', "SELECT pg_is_in_recovery() <> true"); + +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts", + stdout => \$psql_out); +is($psql_out, '1', + "Restore prepared transactions from files with master down."); + +# restore state +($node_master, $node_slave) = ($node_slave, $node_master); +$node_slave->enable_streaming($node_master); +$node_slave->append_conf('recovery.conf', qq( +recovery_target_timeline='latest' +)); +$node_slave->start; +$node_master->psql('postgres', "commit prepared 'x'"); + +############################################################################### +# Check that prepared transactions are correctly replayed after slave hard +# restart while master is down. +############################################################################### + +$node_master->psql('postgres', " + begin; + insert into t values (242); + prepare transaction 'x'; + "); +$node_master->stop; +$node_slave->teardown_node; +$node_slave->start; +$node_slave->promote; +$node_slave->poll_query_until('postgres', + "SELECT pg_is_in_recovery() <> true"); + +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts", + stdout => \$psql_out); +is($psql_out, '1', + "Restore prepared transactions from records with master down."); + +# restore state +($node_master, $node_slave) = ($node_slave, $node_master); +$node_slave->enable_streaming($node_master); +$node_slave->append_conf('recovery.conf', qq( +recovery_target_timeline='latest' +)); +$node_slave->start; +$node_master->psql('postgres', "commit prepared 'x'"); + + +############################################################################### +# Check for a lock confcict between prepared tx with DDL inside and replay of +# XLOG_STANDBY_LOCK wal record. +############################################################################### + +$node_master->psql('postgres', " + begin; + create table t2(id int); + prepare transaction 'x'; + -- checkpoint will issue XLOG_STANDBY_LOCK that can conflict with lock + -- held by 'create table' statement + checkpoint; + commit prepared 'x';"); + +$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts", + stdout => \$psql_out); +is($psql_out, '0', "Replay prepared transaction with DDL.");