diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 91d27d0..6e9c95c 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -116,6 +116,17 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars data += sizeof(xl_xact_origin); } + + if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY) + { + xl_xact_snapshot_safety *xl_snapshot_safety = + (xl_xact_snapshot_safety *) data; + + parsed->snapshot_token = xl_snapshot_safety->token; + parsed->snapshot_safety = xl_snapshot_safety->safety; + + data += sizeof(xl_xact_snapshot_safety); + } } void diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index e11b229..7cb5562 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5114,6 +5114,7 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_invals xl_invals; xl_xact_twophase xl_twophase; xl_xact_origin xl_origin; + xl_xact_snapshot_safety xl_snapshot_safety; uint8 info; @@ -5187,6 +5188,13 @@ XactLogCommitRecord(TimestampTz commit_time, xl_origin.origin_timestamp = replorigin_session_origin_timestamp; } + if (IsolationIsSerializable()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_SNAPSHOT_SAFETY; + GetHypotheticalSnapshotSafety(&xl_snapshot_safety.token, + &xl_snapshot_safety.safety); + } + if (xl_xinfo.xinfo != 0) info |= XLOG_XACT_HAS_INFO; @@ -5231,6 +5239,10 @@ XactLogCommitRecord(TimestampTz commit_time, if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY) + XLogRegisterData((char *) (&xl_snapshot_safety), + sizeof(xl_xact_snapshot_safety)); + /* we allow filtering by xacts */ XLogIncludeOrigin(); @@ -5400,12 +5412,20 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, TransactionIdAsyncCommitTree( xid, parsed->nsubxacts, parsed->subxacts, lsn); + /* Coordinate atomic update of 
snapshot safety and ProcArray. */ + if (parsed->snapshot_token != 0) + BeginHypotheticalSnapshotReplay(parsed->snapshot_token, + parsed->snapshot_safety); + /* * We must mark clog before we update the ProcArray. */ ExpireTreeKnownAssignedTransactionIds( xid, parsed->nsubxacts, parsed->subxacts, max_xid); + if (parsed->snapshot_token != 0) + CompleteHypotheticalSnapshotReplay(); + /* * Send any cache invalidations attached to the commit. We must * maintain the same order of invalidation then release locks as @@ -5575,6 +5595,34 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid) } } +XLogRecPtr +XactLogSnapshotSafetyRecord(uint64 token, SnapshotSafety safety) +{ + xl_xact_snapshot_safety snapshot_safety; + + snapshot_safety.token = token; + snapshot_safety.safety = safety; + + XLogBeginInsert(); + XLogRegisterData((char *) &snapshot_safety, sizeof(snapshot_safety)); + return XLogInsert(RM_XACT_ID, XLOG_XACT_SNAPSHOT_SAFETY); +} + +static void +xact_redo_snapshot_safety(xl_xact_snapshot_safety *snapshot_safety) +{ + /* + * Any earlier COMMIT record must have carried a snapshot safety message + * with the same token as this record, and had safety == + * SNAPSHOT_SAFETY_UNKNOWN. This new independent snapshot safety message + * reports that the safety is now known. We will wake any backend that is + * waiting to learn if the snapshot is safe. 
+ */ + if (standbyState >= STANDBY_INITIALIZED) + NotifyHypotheticalSnapshotSafety(snapshot_safety->token, + snapshot_safety->safety); +} + void xact_redo(XLogReaderState *record) { @@ -5639,6 +5687,13 @@ xact_redo(XLogReaderState *record) ProcArrayApplyXidAssignment(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub); } + else if (info == XLOG_XACT_SNAPSHOT_SAFETY) + { + xl_xact_snapshot_safety *xlrec = + (xl_xact_snapshot_safety *) XLogRecGetData(record); + + xact_redo_snapshot_safety(xlrec); + } else elog(PANIC, "xact_redo: unknown op code %u", info); } diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c index defafa5..9852e5b 100644 --- a/src/backend/commands/variable.c +++ b/src/backend/commands/variable.c @@ -567,14 +567,6 @@ check_XactIsoLevel(char **newval, void **extra, GucSource source) GUC_check_errmsg("SET TRANSACTION ISOLATION LEVEL must not be called in a subtransaction"); return false; } - /* Can't go to serializable mode while recovery is still active */ - if (newXactIsoLevel == XACT_SERIALIZABLE && RecoveryInProgress()) - { - GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED); - GUC_check_errmsg("cannot use serializable mode in a hot standby"); - GUC_check_errhint("You can use REPEATABLE READ instead."); - return false; - } } *extra = malloc(sizeof(int)); diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index 24ed21b..ff67019 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -277,6 +277,7 @@ #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0) #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0) #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0) +#define SxactIsHypothetical(sxact) (((sxact)->flags & SXACT_FLAG_HYPOTHETICAL) != 0) /* * Compute the hash code associated with a PREDICATELOCKTARGETTAG. 
@@ -427,7 +428,8 @@ static uint32 predicatelock_hash(const void *key, Size keysize); static void SummarizeOldestCommittedSxact(void); static Snapshot GetSafeSnapshot(Snapshot snapshot); static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, - TransactionId sourcexid); + TransactionId sourcexid, + SnapshotSafety *snapshot_safety); static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag); static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag, PREDICATELOCKTARGETTAG *parent); @@ -1207,6 +1209,9 @@ InitPredicateLocks(void) PredXact->OldCommittedSxact->xmin = InvalidTransactionId; PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED; PredXact->OldCommittedSxact->pid = 0; + SHMQueueInit(&PredXact->snapshotSafetyWaitList); + PredXact->LastReplayedHypotheticalSnapshotToken = 0; + PredXact->LastReplayedHypotheticalSnapshotSafety = 0; } /* This never changes, so let's keep a local copy. */ OldCommittedSxact = PredXact->OldCommittedSxact; @@ -1491,6 +1496,7 @@ static Snapshot GetSafeSnapshot(Snapshot origSnapshot) { Snapshot snapshot; + SnapshotSafety snapshot_safety; Assert(XactReadOnly && XactDeferrable); @@ -1503,10 +1509,39 @@ GetSafeSnapshot(Snapshot origSnapshot) * one passed to it, but we avoid assuming that here. */ snapshot = GetSerializableTransactionSnapshotInt(origSnapshot, - InvalidTransactionId); + InvalidTransactionId, + &snapshot_safety); - if (MySerializableXact == InvalidSerializableXact) - return snapshot; /* no concurrent r/w xacts; it's safe */ + if (RecoveryInProgress()) + { + /* + * Check if the most recently replayed COMMIT record was either + * known to be safe because it had no concurrent r/w xacts on the + * primary, or has subsequently been declared safe by a snapshot + * safety record. + */ + if (snapshot_safety == SNAPSHOT_SAFE) + return snapshot; + else if (snapshot_safety == SNAPSHOT_UNSAFE) + { + /* + * TODO: Can this happen? 
A SNAPSHOT_UNSAFE state can only be + * generated by ReleasePredicateLocks *after* a commit record + * which would establish a new hypothetical snapshot. So we + * can never take a snapshot here that is already known to be + * unsafe; that is, there can never be a commit record with + * unknown followed by a snapshot safety record that immediately + * marks it unsafe, because there must be a new commit in + * between. Right? + */ + continue; + } + } + else + { + if (MySerializableXact == InvalidSerializableXact) + return snapshot; /* no concurrent r/w xacts; it's safe */ + } LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); @@ -1514,20 +1549,48 @@ GetSafeSnapshot(Snapshot origSnapshot) * Wait for concurrent transactions to finish. Stop early if one of * them marked us as conflicted. */ - MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING; - while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) || - SxactIsROUnsafe(MySerializableXact))) + if (RecoveryInProgress()) { - LWLockRelease(SerializableXactHashLock); - ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT); - LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + /* + * Running on a standby. Wait for the primary to tell us the + * result of testing a hypothetical transaction whose + * serializability matches the snapshot we have. + */ + Assert(snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN); + Assert(!SHMQueueIsDetached(&MyProc->safetyLinks)); + while (MyProc->snapshotSafety == SNAPSHOT_SAFETY_UNKNOWN) + { + LWLockRelease(SerializableXactHashLock); + ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT); + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + } + if (MyProc->snapshotSafety == SNAPSHOT_SAFE) + { + LWLockRelease(SerializableXactHashLock); + break; /* success */ + } } - MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING; - - if (!SxactIsROUnsafe(MySerializableXact)) + else { - LWLockRelease(SerializableXactHashLock); - break; /* success */ + /* + * Running on primary. 
Wait for a signal from one of the backends + * that we possibly conflict with. + */ + MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING; + while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) || + SxactIsROUnsafe(MySerializableXact))) + { + LWLockRelease(SerializableXactHashLock); + ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT); + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + } + MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING; + + if (!SxactIsROUnsafe(MySerializableXact)) + { + LWLockRelease(SerializableXactHashLock); + break; /* success */ + } } LWLockRelease(SerializableXactHashLock); @@ -1542,13 +1605,64 @@ GetSafeSnapshot(Snapshot origSnapshot) /* * Now we have a safe snapshot, so we don't need to do any further checks. */ - Assert(SxactIsROSafe(MySerializableXact)); + Assert(RecoveryInProgress() || SxactIsROSafe(MySerializableXact)); ReleasePredicateLocks(false); return snapshot; } /* + * When the primary server has determined the safety of a hypothetical + * snapshot which was previously reported as SNAPSHOT_SAFETY_UNKNOWN in a + * COMMIT record, it emits a WAL record that causes the recovery process on + * standbys to call this function. Here, we will wake up any backend that is + * currently waiting in GetSafeSnapshot to learn about the safety of a + * snapshot taken after that point in the transaction stream. + */ +void +NotifyHypotheticalSnapshotSafety(uint64 token, SnapshotSafety safety) +{ + PGPROC *proc; + PGPROC *next; + + Assert(AmStartupProcess()); + + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + + /* + * Walk the list of processes that are waiting in GetSafeSnapshot on a + * standby, and find any that are waiting to learn the safety of a + * snapshot taken at a point in time when this token appeared on the most + * recently replayed SSI transaction. If we find any of those, tell them + * the final status and wake them up. 
+ */ + proc = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList, + &PredXact->snapshotSafetyWaitList, + offsetof(PGPROC, safetyLinks)); + while (proc != NULL) + { + next = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList, + &proc->safetyLinks, + offsetof(PGPROC, safetyLinks)); + if (proc->waitSnapshotToken == token) + { + SHMQueueDelete(&proc->safetyLinks); + proc->snapshotSafety = safety; + ProcSendSignal(proc->pid); + } + proc = next; + } + + /* + * If this happens to be the most recently replayed snapshot token then + * remember this safety value. + */ + if (PredXact->LastReplayedHypotheticalSnapshotToken == token) + PredXact->LastReplayedHypotheticalSnapshotSafety = safety; + LWLockRelease(SerializableXactHashLock); +} + +/* * Acquire a snapshot that can be used for the current transaction. * * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact. @@ -1570,7 +1684,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot) * check_XactIsoLevel() if default_transaction_isolation is set to * serializable, so phrase the hint accordingly. */ - if (RecoveryInProgress()) + if (RecoveryInProgress() && (!XactReadOnly || !XactDeferrable)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot use serializable mode in a hot standby"), @@ -1586,7 +1700,8 @@ GetSerializableTransactionSnapshot(Snapshot snapshot) return GetSafeSnapshot(snapshot); return GetSerializableTransactionSnapshotInt(snapshot, - InvalidTransactionId); + InvalidTransactionId, + NULL); } /* @@ -1616,7 +1731,7 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE"))); - (void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid); + (void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid, NULL); } /* @@ -1627,10 +1742,15 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, * loaded up. 
HOWEVER: to avoid race conditions, we must check that the * source xact is still running after we acquire SerializableXactHashLock. * We do that by calling ProcArrayInstallImportedXmin. + * + * If snapshot_safety is non-NULL, the safety of this snapshot for use on a + * standby is written to it: SNAPSHOT_SAFE means it may be used at once; + * SNAPSHOT_SAFETY_UNKNOWN means the caller must wait for the final verdict. */ static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, - TransactionId sourcexid) + TransactionId sourcexid, + SnapshotSafety *snapshot_safety) { PGPROC *proc; VirtualTransactionId vxid; @@ -1641,8 +1761,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, /* We only do this for serializable transactions. Once. */ Assert(MySerializableXact == InvalidSerializableXact); - Assert(!RecoveryInProgress()); - /* * Since all parts of a serializable transaction must use the same * snapshot, it is too late to establish one after a parallel operation @@ -1683,6 +1801,30 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, } } while (!sxact); + /* + * Note the snapshot safety information for standbys. This can be used to + * know if the returned snapshot is already known to be safe/unsafe, or if + * we must wait for notification of the final safety determination. + */ + if (snapshot_safety != NULL) + { + *snapshot_safety = PredXact->LastReplayedHypotheticalSnapshotSafety; + if (*snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN) + { + /* + * We must put this process into the waitlist while we hold + * the lock or there would be a race condition where we might miss + * a notification. The caller must wait for + * MyProc->snapshotSafety to be set to a final value. 
+ */ + MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN; + MyProc->waitSnapshotToken = PredXact->LastReplayedHypotheticalSnapshotToken; + if (SHMQueueIsDetached(&MyProc->safetyLinks)) + SHMQueueInsertBefore(&PredXact->snapshotSafetyWaitList, + &MyProc->safetyLinks); + } + } + /* Get the snapshot, or check that it's safe to use */ if (!TransactionIdIsValid(sourcexid)) snapshot = GetSnapshotData(snapshot); @@ -3439,12 +3581,35 @@ ReleasePredicateLocks(bool isCommit) /* * Wake up the process for a waiting DEFERRABLE transaction if we - * now know it's either safe or conflicted. + * now know it's either safe or conflicted. This releases + * SERIALIZABLE READ ONLY DEFERRABLE transactions on the primary. */ if (SxactIsDeferrableWaiting(roXact) && (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact))) ProcSendSignal(roXact->pid); + /* + * If a hypothetical transaction is now known to be safe or + * unsafe, we can report that in the WAL for the benefit of + * standbys and recycle it. This releases SERIALIZABLE READ ONLY + * DEFERRABLE transactions that are waiting for the status of this + * particular hypothetical transaction on any standby that + * replays it. + */ + if (SxactIsHypothetical(roXact) && + (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact))) + { + SnapshotSafety safety; + + if (SxactIsROSafe(roXact)) + safety = SNAPSHOT_SAFE; + else + safety = SNAPSHOT_UNSAFE; + XactLogSnapshotSafetyRecord(roXact->SeqNo.lastCommitBeforeSnapshot, + safety); + ReleasePredXact(roXact); + } + possibleUnsafeConflict = nextConflict; } } @@ -4704,6 +4869,78 @@ PreCommit_CheckForSerializationFailure(void) MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo); MySerializableXact->flags |= SXACT_FLAG_PREPARED; + /* + * For the benefit of hot standby servers that want to take a safe + * SERIALIZABLE READ ONLY DEFERRABLE snapshot, we will check whether a + * hypothetical read-only serializable transaction that starts after this + * transaction commits would be safe. 
+ */ + if (PredXact->WritableSxactCount == (XactReadOnly ? 0 : 1)) + { + /* + * There are no concurrent writable SERIALIZABLE transactions. A + * read-only snapshot taken immediately after this one commits is + * safe. + */ + MySerializableXact->hypotheticalSnapshotSafety = SNAPSHOT_SAFE; + } + else + { + SERIALIZABLEXACT *sxact; + SERIALIZABLEXACT *othersxact; + + /* + * We can't yet determine whether a read-only transaction beginning + * now would be safe. Create a hypothetical SERIALIZABLEXACT and let + * ReleasePredicateLocks report on its safety once that can be + * determined. + */ + sxact = CreatePredXact(); + if (sxact == NULL) + { + /* Out of space. Don't allow SERIALIZABLE on standbys. */ + MySerializableXact->hypotheticalSnapshotSafety = SNAPSHOT_UNSAFE; + } + else + { + SetInvalidVirtualTransactionId(sxact->vxid); + sxact->SeqNo.lastCommitBeforeSnapshot = + MySerializableXact->prepareSeqNo; + sxact->prepareSeqNo = InvalidSerCommitSeqNo; + sxact->commitSeqNo = InvalidSerCommitSeqNo; + SHMQueueInit(&(sxact->outConflicts)); + SHMQueueInit(&(sxact->inConflicts)); + SHMQueueInit(&(sxact->possibleUnsafeConflicts)); + sxact->topXid = InvalidTransactionId; + sxact->finishedBefore = InvalidTransactionId; + sxact->xmin = MySerializableXact->xmin; /* TODO ?! */ + sxact->pid = InvalidPid; + SHMQueueInit(&(sxact->predicateLocks)); + SHMQueueElemInit(&(sxact->finishedLink)); + sxact->flags = SXACT_FLAG_READ_ONLY | SXACT_FLAG_HYPOTHETICAL; + + /* Register concurrent r/w transactions as possible conflicts. */ + for (othersxact = FirstPredXact(); + othersxact != NULL; + othersxact = NextPredXact(othersxact)) + { + if (!SxactIsCommitted(othersxact) + && !SxactIsDoomed(othersxact) + && !SxactIsReadOnly(othersxact)) + { + SetPossibleUnsafeConflict(sxact, othersxact); + } + } + + /* + * The status will be reported in a later WAL record once it has + * been determined. 
+ */ + MySerializableXact->hypotheticalSnapshotSafety = + SNAPSHOT_SAFETY_UNKNOWN; + } + } + LWLockRelease(SerializableXactHashLock); } @@ -4966,3 +5203,41 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, CreatePredicateLock(&lockRecord->target, targettaghash, sxact); } } + +/* + * Accessor for the hypothetical snapshot safety information needed for commit + * records generated on primary servers. This is used by XactLogCommitRecord + * to receive the safety level computed by + * PreCommit_CheckForSerializationFailure in a committing SSI transaction. + */ +void +GetHypotheticalSnapshotSafety(uint64 *token, SnapshotSafety *safety) +{ + *token = MySerializableXact->prepareSeqNo; + *safety = MySerializableXact->hypotheticalSnapshotSafety; +} + +/* + * If a commit record contains SSI snapshot safety information then we need to + * update that atomically with ProcArray from the point of view of anyone + * taking a serializable snapshot. We achieve that by holding + * SerializableXactHashLock, mirroring the way + * GetSerializableTransactionSnapshotInt does that when acquiring a snapshot. + * The recovery process should wrap its call to + * ExpireTreeKnownAssignedTransactionIds in calls to + * BeginHypotheticalSnapshotReplay and CompleteHypotheticalSnapshotReplay when + * it has safety information. 
+ */ +void +BeginHypotheticalSnapshotReplay(uint64 token, SnapshotSafety safety) +{ + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + PredXact->LastReplayedHypotheticalSnapshotToken = token; + PredXact->LastReplayedHypotheticalSnapshotSafety = safety; +} + +void +CompleteHypotheticalSnapshotReplay(void) +{ + LWLockRelease(SerializableXactHashLock); +} diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index b201631..bf2e2b7 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -394,6 +394,11 @@ InitProcess(void) MyProc->syncRepState = SYNC_REP_NOT_WAITING; SHMQueueElemInit(&(MyProc->syncRepLinks)); + /* Initialize fields for SERIALIZABLE on standbys */ + MyProc->waitSnapshotToken = 0; + MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN; + SHMQueueElemInit(&(MyProc->safetyLinks)); + /* Initialize fields for group XID clearing. */ MyProc->procArrayGroupMember = false; MyProc->procArrayGroupMemberXid = InvalidTransactionId; diff --git a/src/include/access/xact.h b/src/include/access/xact.h index a123d2a..3282dfa 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -74,6 +74,13 @@ extern int synchronous_commit; /* Kluge for 2PC support */ extern bool MyXactAccessedTempRel; +typedef enum SnapshotSafety +{ + SNAPSHOT_SAFE, + SNAPSHOT_UNSAFE, + SNAPSHOT_SAFETY_UNKNOWN +} SnapshotSafety; + /* * start- and end-of-transaction callbacks for dynamically loaded modules */ @@ -118,7 +125,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 #define XLOG_XACT_ASSIGNMENT 0x50 -/* free opcode 0x60 */ +#define XLOG_XACT_SNAPSHOT_SAFETY 0x60 /* free opcode 0x70 */ /* mask for filtering opcodes out of xl_info */ @@ -137,6 +144,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XACT_XINFO_HAS_INVALS (1U << 3) #define XACT_XINFO_HAS_TWOPHASE (1U << 4) 
#define XACT_XINFO_HAS_ORIGIN (1U << 5) +#define XACT_XINFO_HAS_SNAPSHOT_SAFETY (1U << 6) /* * Also stored in xinfo, these indicating a variety of additional actions that @@ -232,6 +240,12 @@ typedef struct xl_xact_origin TimestampTz origin_timestamp; } xl_xact_origin; +typedef struct xl_xact_snapshot_safety +{ + uint64 token; + SnapshotSafety safety; +} xl_xact_snapshot_safety; + typedef struct xl_xact_commit { TimestampTz xact_time; /* time of commit */ @@ -243,6 +257,7 @@ typedef struct xl_xact_commit /* xl_xact_invals follows if XINFO_HAS_INVALS */ /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */ /* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */ + /* xl_xact_snapshot_safety follows if XINFO_HAS_SNAPSHOT_SAFETY */ } xl_xact_commit; #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz)) @@ -286,6 +301,9 @@ typedef struct xl_xact_parsed_commit XLogRecPtr origin_lsn; TimestampTz origin_timestamp; + + uint64 snapshot_token; + SnapshotSafety snapshot_safety; } xl_xact_parsed_commit; typedef struct xl_xact_parsed_abort @@ -370,6 +388,9 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, TransactionId twophase_xid); + +extern XLogRecPtr XactLogSnapshotSafetyRecord(uint64, SnapshotSafety safety); + extern void xact_redo(XLogReaderState *record); /* xactdesc.c */ diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h index a66b5b7..c59e1c6 100644 --- a/src/include/storage/predicate.h +++ b/src/include/storage/predicate.h @@ -14,6 +14,7 @@ #ifndef PREDICATE_H #define PREDICATE_H +#include "access/xact.h" /* for SnapshotSafety; where else to put that? 
*/ #include "utils/relcache.h" #include "utils/snapshot.h" @@ -27,7 +28,6 @@ extern int max_predicate_locks_per_xact; /* Number of SLRU buffers to use for predicate locking */ #define NUM_OLDSERXID_BUFFERS 16 - /* * function prototypes */ @@ -70,4 +70,10 @@ extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit); extern void predicatelock_twophase_recover(TransactionId xid, uint16 info, void *recdata, uint32 len); +/* hypothetical snapshot safety support, allowing SERIALIZABLE on standbys */ +extern void GetHypotheticalSnapshotSafety(uint64 *token, SnapshotSafety *safety); +extern void NotifyHypotheticalSnapshotSafety(uint64 token, SnapshotSafety safety); +extern void BeginHypotheticalSnapshotReplay(uint64 token, SnapshotSafety safety); +extern void CompleteHypotheticalSnapshotReplay(void); + #endif /* PREDICATE_H */ diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h index 3175d28..ce97a1b 100644 --- a/src/include/storage/predicate_internals.h +++ b/src/include/storage/predicate_internals.h @@ -97,6 +97,12 @@ typedef struct SERIALIZABLEXACT */ SHM_QUEUE possibleUnsafeConflicts; + /* + * for committing transactions: would a hypothetical read-only snapshot + * taken immediately after this transaction commits be safe? 
+ */ + SnapshotSafety hypotheticalSnapshotSafety; + TransactionId topXid; /* top level xid for the transaction, if one * exists; else invalid */ TransactionId finishedBefore; /* invalid means still running; else @@ -123,6 +129,7 @@ typedef struct SERIALIZABLEXACT #define SXACT_FLAG_RO_UNSAFE 0x00000100 #define SXACT_FLAG_SUMMARY_CONFLICT_IN 0x00000200 #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400 +#define SXACT_FLAG_HYPOTHETICAL 0x00000800 /* * The following types are used to provide an ad hoc list for holding @@ -173,6 +180,11 @@ typedef struct PredXactListData * seq no */ SERIALIZABLEXACT *OldCommittedSxact; /* shared copy of dummy sxact */ + /* Tracking of snapshot safety on standby servers. */ + SHM_QUEUE snapshotSafetyWaitList; + uint64 LastReplayedHypotheticalSnapshotToken; + SnapshotSafety LastReplayedHypotheticalSnapshotSafety; + PredXactListElement element; } PredXactListData; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 7dc8dac..4df6eb3 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -133,6 +133,11 @@ struct PGPROC int syncRepState; /* wait state for sync rep */ SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */ + /* Info to allow standbys to wait for a safe SERIALIZABLE snapshot */ + uint64 waitSnapshotToken; + int snapshotSafety; /* space for result */ + SHM_QUEUE safetyLinks; /* list link for GetSafeSnapshot */ + /* * All PROCLOCK objects for locks held or awaited by this backend are * linked into one of these lists, according to the partition number of