diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 0a8edb9..75ddd04 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -45,8 +45,26 @@ * fsynced * * If COMMIT happens after checkpoint then backend reads state data from * files - * * In case of crash replay will move data from xlog to files, if that - * hasn't happened before. XXX TODO - move to shmem in replay also + * + * During replay and replication, TwoPhaseState also holds information + * about active prepared transactions that haven't been moved to disk yet. + * + * Replay of twophase records happens by the following rules: + * + * * On PREPARE redo we add the transaction to TwoPhaseState->prepXacts. + * We set gxact->inredo to true for such entries. + * + * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries + * that have gxact->inredo set and are behind the redo_horizon. We + * save them to disk and also set gxact->ondisk to true. + * + * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts. + * If gxact->ondisk is true, we delete the corresponding entry from + * the disk as well. + * + * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions() + * and PrescanPreparedTransactions() have been modified to go through + * gxact->inredo entries that have not made to disk yet. * *------------------------------------------------------------------------- */ @@ -147,11 +165,13 @@ typedef struct GlobalTransactionData */ XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */ XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */ + TransactionId xid; /* The GXACT id */ Oid owner; /* ID of user that executed the xact */ BackendId locking_backend; /* backend currently working on the xact */ bool valid; /* TRUE if PGPROC entry is in proc array */ bool ondisk; /* TRUE if prepare state file is on disk */ + bool inredo; /* TRUE if entry was added via xlog_redo */ char gid[GIDSIZE]; /* The GID assigned to the prepared xact */ } GlobalTransactionData; @@ -198,6 +218,10 @@ static void ProcessRecords(char *bufptr, TransactionId xid, static void RemoveGXact(GlobalTransaction gxact); static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len); +static char *ProcessTwoPhaseBufferAndReturn(TransactionId xid, + XLogRecPtr prepare_start_lsn, + bool fromdisk, bool overwriteOK, bool setParent, + TransactionId *result, TransactionId *maxsubxid); /* * Initialization of shared memory @@ -349,12 +373,14 @@ PostPrepare_Twophase(void) */ GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, - TimestampTz prepared_at, Oid owner, Oid databaseid) + TimestampTz prepared_at, Oid owner, Oid databaseid, + XLogRecPtr prepare_start_lsn, XLogRecPtr prepare_end_lsn) { GlobalTransaction gxact; PGPROC *proc; PGXACT *pgxact; int i; + bool found = false; if (strlen(gid) >= GIDSIZE) ereport(ERROR, @@ -384,22 +410,32 @@ MarkAsPreparing(TransactionId xid, const char *gid, gxact = TwoPhaseState->prepXacts[i]; if (strcmp(gxact->gid, gid) == 0) { - ereport(ERROR, + /* It's ok to find an entry in the redo/recovery case */ + if (!gxact->inredo) + ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("transaction identifier \"%s\" is already in use", gid))); + else + { + found = true; + break; + } } } /* Get a free gxact from the freelist */ - if (TwoPhaseState->freeGXacts == NULL) + if (!found && TwoPhaseState->freeGXacts == NULL) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("maximum number of prepared transactions reached"), errhint("Increase max_prepared_transactions (currently %d).", max_prepared_xacts))); - gxact = TwoPhaseState->freeGXacts; - TwoPhaseState->freeGXacts = gxact->next; + if (!found) + { + gxact = TwoPhaseState->freeGXacts; + TwoPhaseState->freeGXacts = gxact->next; + } proc = &ProcGlobal->allProcs[gxact->pgprocno]; pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; @@ -431,18 +467,24 @@ MarkAsPreparing(TransactionId xid, const char *gid, pgxact->nxids = 0; gxact->prepared_at = prepared_at; - /* initialize LSN to InvalidXLogRecPtr */ - gxact->prepare_start_lsn = InvalidXLogRecPtr; - gxact->prepare_end_lsn = InvalidXLogRecPtr; + /* initialize LSN to passed in values */ + gxact->prepare_start_lsn = prepare_start_lsn; + gxact->prepare_end_lsn = prepare_end_lsn; + gxact->xid = xid; gxact->owner = owner; gxact->locking_backend = MyBackendId; gxact->valid = false; gxact->ondisk = false; + gxact->inredo = false; strcpy(gxact->gid, gid); /* And insert it into the active array */ - Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); - TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + if (!found) + { + Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); + TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + } + /* * Remember that we have this GlobalTransaction entry locked for us. If we @@ -456,6 +498,58 @@ MarkAsPreparing(TransactionId xid, const char *gid, } /* + * MarkAsPreparingInRedo + * Reserve the GID for the given transaction in the redo code path. + * + * Internally, this creates a gxact struct and puts it into the active array. + * + * In redo, this struct is mainly used to track PREPARE/COMMIT entries + * in shared memory. Hence, we only fill up the bare minimum contents here. + * The gxact also gets marked with gxact->inredo set to true to indicate + * that it got added in the redo phase + */ +GlobalTransaction +MarkAsPreparingInRedo(TransactionId xid, const char *gid, + TimestampTz prepared_at, Oid owner, Oid databaseid, + XLogRecPtr prepare_start_lsn, XLogRecPtr prepare_end_lsn) +{ + GlobalTransaction gxact; + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + + /* Get a free gxact from the freelist */ + if (TwoPhaseState->freeGXacts == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("maximum number of prepared transactions reached"), + errhint("Increase max_prepared_transactions (currently %d).", + max_prepared_xacts))); + gxact = TwoPhaseState->freeGXacts; + TwoPhaseState->freeGXacts = gxact->next; + + + gxact->prepared_at = prepared_at; + /* initialize LSN to passed in values */ + gxact->prepare_start_lsn = prepare_start_lsn; + gxact->prepare_end_lsn = prepare_end_lsn; + gxact->xid = xid; + gxact->owner = owner; + gxact->locking_backend = InvalidBackendId; + gxact->valid = false; + gxact->ondisk = false; + gxact->inredo = true; /* yes, added in redo */ + strcpy(gxact->gid, gid); + + /* And insert it into the active array */ + Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); + TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + + LWLockRelease(TwoPhaseStateLock); + + return gxact; +} + +/* * GXactLoadSubxactData * * If the transaction being persisted had any subtransactions, this must @@ -1241,9 +1335,9 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) * Reads 2PC data from xlog. During checkpoint this data will be moved to * twophase files and ReadTwoPhaseFile should be used instead. * - * Note clearly that this function accesses WAL during normal operation, similarly - * to the way WALSender or Logical Decoding would do. It does not run during - * crash recovery or standby processing. + * Note clearly that this function can access WAL during normal operation, similarly + * to the way WALSender or Logical Decoding would do. + * */ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) @@ -1252,8 +1346,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogReaderState *xlogreader; char *errormsg; - Assert(!RecoveryInProgress()); - xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL); if (!xlogreader) ereport(ERROR, @@ -1623,9 +1715,8 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; - PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; - if (gxact->valid && + if ((gxact->valid || gxact->inredo) && !gxact->ondisk && gxact->prepare_end_lsn <= redo_horizon) { @@ -1633,7 +1724,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) int len; XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len); - RecreateTwoPhaseFile(pgxact->xid, buf, len); + RecreateTwoPhaseFile(gxact->xid, buf, len); gxact->ondisk = true; pfree(buf); serialized_xacts++; @@ -1661,6 +1752,9 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) * reading WAL. ShmemVariableCache->nextXid has been set to one more than * the highest XID for which evidence exists in WAL. * + * Additionally, scan TwoPhaseState shmem entries that are marked as added + * in redo and are not already on disk. + * * We throw away any prepared xacts with main XID beyond nextXid --- if any * are present, it suggests that the DBA has done a PITR recovery to an * earlier point in time without cleaning out pg_twophase. We dare not @@ -1685,11 +1779,13 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) { TransactionId origNextXid = ShmemVariableCache->nextXid; TransactionId result = origNextXid; + TransactionId maxsubxid = origNextXid; DIR *cldir; struct dirent *clde; TransactionId *xids = NULL; int nxids = 0; int allocsize = 0; + int i; cldir = AllocateDir(TWOPHASE_DIR); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) @@ -1699,83 +1795,16 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) { TransactionId xid; char *buf; - TwoPhaseFileHeader *hdr; - TransactionId *subxids; - int i; xid = (TransactionId) strtoul(clde->d_name, NULL, 16); - /* Reject XID if too new */ - if (TransactionIdFollowsOrEquals(xid, origNextXid)) - { - ereport(WARNING, - (errmsg("removing future two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } - /* - * Note: we can't check if already processed because clog - * subsystem isn't up yet. - */ + buf = ProcessTwoPhaseBufferAndReturn(xid, InvalidXLogRecPtr, + true, false, false, + &result, &maxsubxid); - /* Read and validate file */ - buf = ReadTwoPhaseFile(xid, true); if (buf == NULL) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); continue; - } - - /* Deconstruct header */ - hdr = (TwoPhaseFileHeader *) buf; - if (!TransactionIdEquals(hdr->xid, xid)) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - pfree(buf); - continue; - } - - /* - * OK, we think this file is valid. Incorporate xid into the - * running-minimum result. - */ - if (TransactionIdPrecedes(xid, result)) - result = xid; - - /* - * Examine subtransaction XIDs ... they should all follow main - * XID, and they may force us to advance nextXid. - * - * We don't expect anyone else to modify nextXid, hence we don't - * need to hold a lock while examining it. We still acquire the - * lock to modify it, though. - */ - subxids = (TransactionId *) (buf + - MAXALIGN(sizeof(TwoPhaseFileHeader)) + - MAXALIGN(hdr->gidlen)); - for (i = 0; i < hdr->nsubxacts; i++) - { - TransactionId subxid = subxids[i]; - - Assert(TransactionIdFollows(subxid, xid)); - if (TransactionIdFollowsOrEquals(subxid, - ShmemVariableCache->nextXid)) - { - LWLockAcquire(XidGenLock, LW_EXCLUSIVE); - ShmemVariableCache->nextXid = subxid; - TransactionIdAdvance(ShmemVariableCache->nextXid); - LWLockRelease(XidGenLock); - } - } - if (xids_p) { @@ -1800,12 +1829,63 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) } FreeDir(cldir); + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + TransactionId xid; + char *buf; + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* only look at entries added by redo and not already on disk */ + if (!gxact->inredo || gxact->ondisk) + continue; + + xid = gxact->xid; + + buf = ProcessTwoPhaseBufferAndReturn(xid, gxact->prepare_start_lsn, + false, false, false, + &result, &maxsubxid); + + if (buf == NULL) + continue; + + if (xids_p) + { + if (nxids == allocsize) + { + if (nxids == 0) + { + allocsize = 10; + xids = palloc(allocsize * sizeof(TransactionId)); + } + else + { + allocsize = allocsize * 2; + xids = repalloc(xids, allocsize * sizeof(TransactionId)); + } + } + xids[nxids++] = xid; + } + + pfree(buf); + } + LWLockRelease(TwoPhaseStateLock); + if (xids_p) { *xids_p = xids; *nxids_p = nxids; } + /* update nextXid if needed */ + if (TransactionIdFollowsOrEquals(maxsubxid, ShmemVariableCache->nextXid)) + { + LWLockAcquire(XidGenLock, LW_EXCLUSIVE); + ShmemVariableCache->nextXid = maxsubxid; + TransactionIdAdvance(ShmemVariableCache->nextXid); + LWLockRelease(XidGenLock); + } + return result; } @@ -1814,6 +1894,10 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) * * Scan the pg_twophase directory and setup all the required information to * allow standby queries to treat prepared transactions as still active. + * + * Additionally, scan TwoPhaseState shmem entries that are marked as added + * in redo and are not already on disk. + * * This is never called at the end of recovery - we use * RecoverPreparedTransactions() at that point. * @@ -1826,6 +1910,7 @@ StandbyRecoverPreparedTransactions(bool overwriteOK) { DIR *cldir; struct dirent *clde; + int i; cldir = AllocateDir(TWOPHASE_DIR); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) @@ -1835,72 +1920,50 @@ StandbyRecoverPreparedTransactions(bool overwriteOK) { TransactionId xid; char *buf; - TwoPhaseFileHeader *hdr; - TransactionId *subxids; - int i; xid = (TransactionId) strtoul(clde->d_name, NULL, 16); - /* Already processed? */ - if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) - { - ereport(WARNING, - (errmsg("removing stale two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } - - /* Read and validate file */ - buf = ReadTwoPhaseFile(xid, true); - if (buf == NULL) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } - - /* Deconstruct header */ - hdr = (TwoPhaseFileHeader *) buf; - if (!TransactionIdEquals(hdr->xid, xid)) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); + buf = ProcessTwoPhaseBufferAndReturn(xid, InvalidXLogRecPtr, + true, overwriteOK, true, + NULL, NULL); + if (buf != NULL) pfree(buf); - continue; - } + } + } + FreeDir(cldir); - /* - * Examine subtransaction XIDs ... they should all follow main - * XID. - */ - subxids = (TransactionId *) (buf + - MAXALIGN(sizeof(TwoPhaseFileHeader)) + - MAXALIGN(hdr->gidlen)); - for (i = 0; i < hdr->nsubxacts; i++) - { - TransactionId subxid = subxids[i]; + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + TransactionId xid; + char *buf; + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; - Assert(TransactionIdFollows(subxid, xid)); - SubTransSetParent(xid, subxid, overwriteOK); - } + /* only look at entries added by redo and not already on disk */ + if (!gxact->inredo || gxact->ondisk) + continue; + + xid = gxact->xid; + buf = ProcessTwoPhaseBufferAndReturn(xid, gxact->prepare_start_lsn, + false, overwriteOK, true, + NULL, NULL); + if (buf != NULL) pfree(buf); - } } - FreeDir(cldir); + LWLockRelease(TwoPhaseStateLock); } /* * RecoverPreparedTransactions * * Scan the pg_twophase directory and reload shared-memory state for each - * prepared transaction (reacquire locks, etc). This is run during database - * startup. + * prepared transaction (reacquire locks, etc). + * + * Additionally, scan TwoPhaseState shmem entries that are marked as added + * in redo and are not already on disk. + * + * This is run during database startup. */ void RecoverPreparedTransactions(void) @@ -1909,6 +1972,7 @@ RecoverPreparedTransactions(void) DIR *cldir; struct dirent *clde; bool overwriteOK = false; + int i; snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR); @@ -1925,35 +1989,17 @@ RecoverPreparedTransactions(void) TransactionId *subxids; GlobalTransaction gxact; const char *gid; - int i; xid = (TransactionId) strtoul(clde->d_name, NULL, 16); - /* Already processed? */ - if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) - { - ereport(WARNING, - (errmsg("removing stale two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); - continue; - } - - /* Read and validate file */ - buf = ReadTwoPhaseFile(xid, true); + buf = ProcessTwoPhaseBufferAndReturn(xid, InvalidXLogRecPtr, + true, false, false, + NULL, NULL); if (buf == NULL) - { - ereport(WARNING, - (errmsg("removing corrupt two-phase state file \"%s\"", - clde->d_name))); - RemoveTwoPhaseFile(xid, true); continue; - } ereport(LOG, - (errmsg("recovering prepared transaction %u", xid))); - - /* Deconstruct header */ + (errmsg("recovering prepared transaction %u from state file", xid))); hdr = (TwoPhaseFileHeader *) buf; Assert(TransactionIdEquals(hdr->xid, xid)); bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); @@ -1989,7 +2035,8 @@ RecoverPreparedTransactions(void) */ gxact = MarkAsPreparing(xid, gid, hdr->prepared_at, - hdr->owner, hdr->database); + hdr->owner, hdr->database, + InvalidXLogRecPtr, InvalidXLogRecPtr); gxact->ondisk = true; GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); MarkAsPrepared(gxact); @@ -2018,6 +2065,249 @@ RecoverPreparedTransactions(void) } } FreeDir(cldir); + + /* + * Don't need a lock in the recovery phase. + */ + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + TransactionId xid; + char *buf; + char *bufptr; + TwoPhaseFileHeader *hdr; + TransactionId *subxids; + const char *gid; + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + GlobalTransaction gxactnew; + + /* only look at entries added by redo and not already on disk */ + if (!gxact->inredo || gxact->ondisk) + continue; + + xid = gxact->xid; + + buf = ProcessTwoPhaseBufferAndReturn(xid, gxact->prepare_start_lsn, + false, false, false, + NULL, NULL); + if (buf == NULL) + continue; + + ereport(LOG, + (errmsg("recovering prepared transaction %u from shared memory", xid))); + + hdr = (TwoPhaseFileHeader *) buf; + Assert(TransactionIdEquals(hdr->xid, xid)); + bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); + gid = (const char *) bufptr; + bufptr += MAXALIGN(hdr->gidlen); + subxids = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); + + /* + * It's possible that SubTransSetParent has been set before, if + * the prepared transaction generated xid assignment records. Test + * here must match one used in AssignTransactionId(). + */ + if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS || + XLogLogicalInfoActive())) + overwriteOK = true; + + /* + * Reconstruct subtrans state for the transaction --- needed + * because pg_subtrans is not preserved over a restart. Note that + * we are linking all the subtransactions directly to the + * top-level XID; there may originally have been a more complex + * hierarchy, but there's no need to restore that exactly. + */ + for (i = 0; i < hdr->nsubxacts; i++) + SubTransSetParent(subxids[i], xid, overwriteOK); + + /* + * Recreate its GXACT and dummy PGPROC + */ + gxactnew = MarkAsPreparing(xid, gid, + hdr->prepared_at, + hdr->owner, hdr->database, + gxact->prepare_start_lsn, + gxact->prepare_end_lsn); + + Assert(gxactnew == gxact); + GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); + MarkAsPrepared(gxact); + + /* + * Recover other state (notably locks) using resource managers + */ + ProcessRecords(bufptr, xid, twophase_recover_callbacks); + + /* + * Release locks held by the standby process after we process each + * prepared transaction. As a result, we don't need too many + * additional locks at any one time. + */ + if (InHotStandby) + StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids); + + /* + * We're done with recovering this transaction. Clear + * MyLockedGxact, like we do in PrepareTransaction() during normal + * operation. + */ + PostPrepare_Twophase(); + + pfree(buf); + } +} + +/* + * Given a transaction id, read it either from disk or read it directly + * via shmem xlog record pointer using the provided "prepare_start_lsn" + * + * If setParent is true, then use the overwriteOK parameter to set up + * subtransaction parent linkages + * + * If result and maxsubxid are not NULL, fill them up with smallest + * running transaction id (lesser than ShmemVariableCache->nextXid) + * and largest subtransaction id for this transaction respectively + */ +static char * +ProcessTwoPhaseBufferAndReturn(TransactionId xid, + XLogRecPtr prepare_start_lsn, + bool fromdisk, bool overwriteOK, + bool setParent, TransactionId *result, + TransactionId *maxsubxid) +{ + TransactionId origNextXid = ShmemVariableCache->nextXid; + TransactionId res; + TransactionId maxsub; + TransactionId *subxids; + char *buf; + TwoPhaseFileHeader *hdr; + int i; + + if (!fromdisk) + Assert(prepare_start_lsn != InvalidXLogRecPtr); + + if (result) + res = *result; + if (maxsubxid) + maxsub = *maxsubxid; + + /* Already processed? */ + if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) + { + if (fromdisk) + { + ereport(WARNING, + (errmsg("removing stale two-phase state file for \"%u\"", + xid))); + RemoveTwoPhaseFile(xid, true); + } + else + { + ereport(WARNING, + (errmsg("removing stale two-phase state from" + " shared memory for \"%u\"", xid))); + PrepareRedoRemove(xid); + } + return NULL; + } + + /* Reject XID if too new */ + if (TransactionIdFollowsOrEquals(xid, origNextXid)) + { + if (fromdisk) + { + ereport(WARNING, + (errmsg("removing future two-phase state file for \"%u\"", + xid))); + RemoveTwoPhaseFile(xid, true); + } + else + { + ereport(WARNING, + (errmsg("removing future two-phase state from memory for \"%u\"", + xid))); + PrepareRedoRemove(xid); + } + return NULL; + } + + if (fromdisk) + { + /* Read and validate file */ + buf = ReadTwoPhaseFile(xid, true); + if (buf == NULL) + { + ereport(WARNING, + (errmsg("removing corrupt two-phase state file for \"%u\"", + xid))); + RemoveTwoPhaseFile(xid, true); + return NULL; + } + } + else + { + /* Read xlog data */ + XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL); + } + + /* Deconstruct header */ + hdr = (TwoPhaseFileHeader *) buf; + if (!TransactionIdEquals(hdr->xid, xid)) + { + if (fromdisk) + { + ereport(WARNING, + (errmsg("removing corrupt two-phase state file for \"%u\"", + xid))); + RemoveTwoPhaseFile(xid, true); + } + else + { + ereport(WARNING, + (errmsg("removing corrupt two-phase state from memory for \"%u\"", + xid))); + PrepareRedoRemove(xid); + } + pfree(buf); + return NULL; + } + + /* + * OK, we think this buffer is valid. Incorporate xid into the + * running-minimum result. + */ + if (TransactionIdPrecedes(xid, res)) + res = xid; + + /* + * Examine subtransaction XIDs ... they should all follow main + * XID, and they may force us to advance nextXid. + */ + subxids = (TransactionId *) (buf + + MAXALIGN(sizeof(TwoPhaseFileHeader)) + + MAXALIGN(hdr->gidlen)); + for (i = 0; i < hdr->nsubxacts; i++) + { + TransactionId subxid = subxids[i]; + + Assert(TransactionIdFollows(subxid, xid)); + if (TransactionIdFollowsOrEquals(subxid, maxsub)) + maxsub = subxid; + if (setParent) + SubTransSetParent(xid, subxid, overwriteOK); + } + + if (result) + *result = res; + if (maxsubxid) + *maxsubxid = maxsub; + + return buf; } /* @@ -2162,3 +2452,83 @@ RecordTransactionAbortPrepared(TransactionId xid, */ SyncRepWaitForLSN(recptr, false); } + +/* + * PrepareRedoAdd + * + * Store pointers to the start/end of the WAL record along with the xid in + * a gxact entry in shared memory TwoPhaseState structure + */ +void +PrepareRedoAdd(XLogReaderState *record) +{ + char *buf = XLogRecGetData(record); + TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf; + char *bufptr; + const char *gid; + GlobalTransaction gxact; + + Assert(RecoveryInProgress()); + + bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); + gid = (const char *) bufptr; + + /* + * Add a GXACT entry + */ + gxact = MarkAsPreparingInRedo(hdr->xid, gid, + hdr->prepared_at, + hdr->owner, hdr->database, + record->ReadRecPtr, + record->EndRecPtr); + + elog(DEBUG2, "Adding 2PC data to shared memory %u", gxact->xid); +} + +/* + * PrepareRedoRemove + * + * Remove the corresponding gxact entry from TwoPhaseState. Also + * remove the 2PC file. + */ +void +PrepareRedoRemove(TransactionId xid) +{ + GlobalTransaction gxact; + int i; + bool found = false; + + Assert(RecoveryInProgress()); + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + gxact = TwoPhaseState->prepXacts[i]; + + if (gxact->xid == xid) + { + Assert(gxact->inredo); + found = true; + break; + } + } + LWLockRelease(TwoPhaseStateLock); + if (found) + { + /* + * And now we can clean up any files we may have left. + */ + if (gxact->ondisk) + RemoveTwoPhaseFile(xid, true); + RemoveGXact(gxact); + elog(DEBUG2, "Removing 2PC data from shared memory %u", xid); + } + else + { + /* + * Entry could be on disk. Call with giveWarning=false + * since it can be expected during replay. + */ + RemoveTwoPhaseFile(xid, false); + } +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 82f9a3c..2357048 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2294,7 +2294,8 @@ PrepareTransaction(void) * GID is invalid or already in use. */ gxact = MarkAsPreparing(xid, prepareGID, prepared_at, - GetUserId(), MyDatabaseId); + GetUserId(), MyDatabaseId, + InvalidXLogRecPtr, InvalidXLogRecPtr); prepareGID = NULL; /* @@ -5606,7 +5607,9 @@ xact_redo(XLogReaderState *record) Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_commit(&parsed, parsed.twophase_xid, record->EndRecPtr, XLogRecGetOrigin(record)); - RemoveTwoPhaseFile(parsed.twophase_xid, false); + + /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + PrepareRedoRemove(parsed.twophase_xid); } } else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) @@ -5626,14 +5629,18 @@ xact_redo(XLogReaderState *record) { Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_abort(&parsed, parsed.twophase_xid); - RemoveTwoPhaseFile(parsed.twophase_xid, false); + + /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + PrepareRedoRemove(parsed.twophase_xid); } } else if (info == XLOG_XACT_PREPARE) { - /* the record contents are exactly the 2PC file */ - RecreateTwoPhaseFile(XLogRecGetXid(record), - XLogRecGetData(record), XLogRecGetDataLen(record)); + /* + * Store xid and start/end pointers of the WAL record in + * TwoPhaseState gxact entry. + */ + PrepareRedoAdd(record); } else if (info == XLOG_XACT_ASSIGNMENT) { diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index b2b7848..063b946 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -15,6 +15,7 @@ #define TWOPHASE_H #include "access/xlogdefs.h" +#include "access/xlogreader.h" #include "datatype/timestamp.h" #include "storage/lock.h" @@ -38,7 +39,12 @@ extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid); extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, - Oid owner, Oid databaseid); + Oid owner, Oid databaseid, + XLogRecPtr prepare_start_lsn, XLogRecPtr prepare_end_lsn); +extern GlobalTransaction MarkAsPreparingInRedo(TransactionId xid, const char *gid, + TimestampTz prepared_at, + Oid owner, Oid databaseid, + XLogRecPtr prepare_start_lsn, XLogRecPtr prepare_end_lsn); extern void StartPrepare(GlobalTransaction gxact); extern void EndPrepare(GlobalTransaction gxact); @@ -56,4 +62,6 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon); extern void FinishPreparedTransaction(const char *gid, bool isCommit); +extern void PrepareRedoAdd(XLogReaderState *record); +extern void PrepareRedoRemove(TransactionId xid); #endif /* TWOPHASE_H */