diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index 5bedaf2..60cd641 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -2364,15 +2364,6 @@ LOG: database system is ready to accept read only connections your setting of max_prepared_transactions is 0. - - - The Serializable transaction isolation level is not yet available in hot - standby. (See and - for details.) - An attempt to set a transaction to the serializable isolation level in - hot standby mode will generate an error. - - diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml index 306def4..f489714 100644 --- a/doc/src/sgml/mvcc.sgml +++ b/doc/src/sgml/mvcc.sgml @@ -1596,15 +1596,6 @@ SELECT pg_advisory_lock(q.id) FROM See for performance suggestions. - - - - This level of integrity protection using Serializable transactions - does not yet extend to hot standby mode (). - Because of that, those using hot standby may want to use Repeatable - Read and explicit locking on the master. - - @@ -1697,18 +1688,6 @@ SELECT pg_advisory_lock(q.id) FROM could cause visible inconsistency between the contents of the target table and other tables in the database. - - - Support for the Serializable transaction isolation level has not yet - been added to Hot Standby replication targets (described in - ). The strictest isolation level currently - supported in hot standby mode is Repeatable Read. While performing all - permanent database writes within Serializable transactions on the - master will ensure that all standbys will eventually reach a consistent - state, a Repeatable Read transaction run on the standby can sometimes - see a transient state that is inconsistent with any serial execution - of the transactions on the master. - diff --git a/doc/src/sgml/ref/set_transaction.sgml b/doc/src/sgml/ref/set_transaction.sgml index ca55a5b..0269e21 100644 --- a/doc/src/sgml/ref/set_transaction.sgml +++ b/doc/src/sgml/ref/set_transaction.sgml @@ -152,6 +152,18 @@ SET SESSION CHARACTERISTICS AS TRANSACTION transa + Serializable transactions cannot use the NOT DEFERRABLE + option when run on Hot Standby servers. READ ONLY + and DEFERRABLE are the default settings for Hot Standby + servers, but may also be specified explicitly. The first statement run in + a Serializable transaction on a Hot Standby server may need to wait until a + point in the transaction stream where it is not possible to see a transient + state that is inconsistent with any serial execution of the transactions on + the master server. This may introduce a pause, but avoids the need to + coordinate predicate locking across multiple servers. + + + The SET TRANSACTION SNAPSHOT command allows a new transaction to run with the same snapshot as an existing transaction. The pre-existing transaction must have exported its snapshot diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 91d27d0..e0bc9ad 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -116,6 +116,17 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars data += sizeof(xl_xact_origin); } + + if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY) + { + xl_xact_snapshot_safety *xl_snapshot_safety = + (xl_xact_snapshot_safety *) data; + + parsed->snapshot_token = xl_snapshot_safety->token; + parsed->snapshot_safety = xl_snapshot_safety->safety; + + data += sizeof(xl_xact_snapshot_safety); + } } void @@ -171,6 +182,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) } } +static const char * +xact_snapshot_safety_to_string(SnapshotSafety snapshot_safety) +{ + const char *string = ""; + + switch(snapshot_safety) + { + case SNAPSHOT_SAFE: + string = "SNAPSHOT_SAFE"; + break; + case SNAPSHOT_UNSAFE: + string = "SNAPSHOT_UNSAFE"; + break; + case SNAPSHOT_SAFETY_UNKNOWN: + string = "SNAPSHOT_SAFETY_UNKNOWN"; + break; + } + + return string; +} + static void xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id) { @@ -220,6 +252,13 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId (uint32) parsed.origin_lsn, timestamptz_to_str(parsed.origin_timestamp)); } + + if (parsed.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY) + { + appendStringInfo(buf, "; snapshot safety: %s, token: %lx", + xact_snapshot_safety_to_string(parsed.snapshot_safety), + parsed.snapshot_token); + } } static void @@ -266,6 +305,14 @@ xact_desc_assignment(StringInfo buf, xl_xact_assignment *xlrec) appendStringInfo(buf, " %u", xlrec->xsub[i]); } +static void +xact_desc_snapshot_safety(StringInfo buf, xl_xact_snapshot_safety *xlrec) +{ + appendStringInfo(buf, "snapshot safety: %s, token: %lx", + xact_snapshot_safety_to_string(xlrec->safety), + xlrec->token); +} + void xact_desc(StringInfo buf, XLogReaderState *record) { @@ -297,6 +344,12 @@ xact_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "xtop %u: ", xlrec->xtop); xact_desc_assignment(buf, xlrec); } + else if (info == XLOG_XACT_SNAPSHOT_SAFETY) + { + xl_xact_snapshot_safety *xlrec = (xl_xact_snapshot_safety *) rec; + + xact_desc_snapshot_safety(buf, xlrec); + } } const char * @@ -324,6 +377,9 @@ xact_identify(uint8 info) case XLOG_XACT_ASSIGNMENT: id = "ASSIGNMENT"; break; + case XLOG_XACT_SNAPSHOT_SAFETY: + id = "SNAPSHOT_SAFETY"; + break; } return id; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index e11b229..0913a28 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1834,13 +1834,14 @@ StartTransaction(void) { s->startedInRecovery = true; XactReadOnly = true; + XactDeferrable = true; } else { s->startedInRecovery = false; XactReadOnly = DefaultXactReadOnly; + XactDeferrable = DefaultXactDeferrable; } - XactDeferrable = DefaultXactDeferrable; XactIsoLevel = DefaultXactIsoLevel; forceSyncCommit = false; MyXactAccessedTempRel = false; @@ -5114,6 +5115,7 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_invals xl_invals; xl_xact_twophase xl_twophase; xl_xact_origin xl_origin; + xl_xact_snapshot_safety xl_snapshot_safety; uint8 info; @@ -5187,6 +5189,13 @@ XactLogCommitRecord(TimestampTz commit_time, xl_origin.origin_timestamp = replorigin_session_origin_timestamp; } + if (IsolationIsSerializable()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_SNAPSHOT_SAFETY; + GetSnapshotSafetyAfterThisCommit(&xl_snapshot_safety.token, + &xl_snapshot_safety.safety); + } + if (xl_xinfo.xinfo != 0) info |= XLOG_XACT_HAS_INFO; @@ -5231,6 +5240,10 @@ XactLogCommitRecord(TimestampTz commit_time, if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY) + XLogRegisterData((char *) (&xl_snapshot_safety), + sizeof(xl_xact_snapshot_safety)); + /* we allow filtering by xacts */ XLogIncludeOrigin(); @@ -5400,12 +5413,23 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, TransactionIdAsyncCommitTree( xid, parsed->nsubxacts, parsed->subxacts, lsn); + /* Coordinate atomic update of snapshot safety and ProcArray. */ + if (parsed->snapshot_token != 0) + { + BeginSnapshotSafetyReplay(); + SetNewestSnapshotSafety(parsed->snapshot_token, + parsed->snapshot_safety); + } + /* * We must mark clog before we update the ProcArray. */ ExpireTreeKnownAssignedTransactionIds( xid, parsed->nsubxacts, parsed->subxacts, max_xid); + if (parsed->snapshot_token != 0) + CompleteSnapshotSafetyReplay(); + /* * Send any cache invalidations attached to the commit. We must * maintain the same order of invalidation then release locks as @@ -5575,6 +5599,39 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid) } } +XLogRecPtr +XactLogSnapshotSafetyRecord(SnapshotToken token, SnapshotSafety safety) +{ + XLogRecPtr result; + xl_xact_snapshot_safety snapshot_safety; + + snapshot_safety.token = token; + snapshot_safety.safety = safety; + + START_CRIT_SECTION(); + XLogBeginInsert(); + XLogRegisterData((char *) &snapshot_safety, sizeof(snapshot_safety)); + result = XLogInsert(RM_XACT_ID, XLOG_XACT_SNAPSHOT_SAFETY); + END_CRIT_SECTION(); + + return result; +} + +static void +xact_redo_snapshot_safety(xl_xact_snapshot_safety *snapshot_safety) +{ + /* + * Any earlier COMMIT record must have carried a snapshot safety message + * the same token as this record, and had safety == + * SNAPSHOT_SAFETY_UNKNOWN. This new independent snapshot safety message + * reports that the safety is now known. We will wake any backend that is + * waiting to learn if the snapshot is safe. + */ + if (standbyState >= STANDBY_INITIALIZED) + NotifyHypotheticalSnapshotSafety(snapshot_safety->token, + snapshot_safety->safety); +} + void xact_redo(XLogReaderState *record) { @@ -5639,6 +5696,13 @@ xact_redo(XLogReaderState *record) ProcArrayApplyXidAssignment(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub); } + else if (info == XLOG_XACT_SNAPSHOT_SAFETY) + { + xl_xact_snapshot_safety *xlrec = + (xl_xact_snapshot_safety *) XLogRecGetData(record); + + xact_redo_snapshot_safety(xlrec); + } else elog(PANIC, "xact_redo: unknown op code %u", info); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6cec027..8f3ba59 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -4821,6 +4821,8 @@ BootStrapXLOG(void) checkPoint.newestCommitTsXid = InvalidTransactionId; checkPoint.time = (pg_time_t) time(NULL); checkPoint.oldestActiveXid = InvalidTransactionId; + checkPoint.newestSnapshotToken = 0; + checkPoint.newestSnapshotSafety = SNAPSHOT_SAFE; ShmemVariableCache->nextXid = checkPoint.nextXid; ShmemVariableCache->nextOid = checkPoint.nextOid; @@ -6426,6 +6428,8 @@ StartupXLOG(void) checkPoint.newestCommitTsXid); XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch; XLogCtl->ckptXid = checkPoint.nextXid; + SetNewestSnapshotSafety(checkPoint.newestSnapshotToken, + checkPoint.newestSnapshotSafety); /* * Initialize replication slots, before there's a chance to remove diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c index defafa5..9852e5b 100644 --- a/src/backend/commands/variable.c +++ b/src/backend/commands/variable.c @@ -567,14 +567,6 @@ check_XactIsoLevel(char **newval, void **extra, GucSource source) GUC_check_errmsg("SET TRANSACTION ISOLATION LEVEL must not be called in a subtransaction"); return false; } - /* Can't go to serializable mode while recovery is still active */ - if (newXactIsoLevel == XACT_SERIALIZABLE && RecoveryInProgress()) - { - GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED); - GUC_check_errmsg("cannot use serializable mode in a hot standby"); - GUC_check_errhint("You can use REPEATABLE READ instead."); - return false; - } } *extra = malloc(sizeof(int)); diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index 24ed21b..03219b4 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -277,6 +277,7 @@ #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0) #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0) #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0) +#define SxactIsHypothetical(sxact) (((sxact)->flags & SXACT_FLAG_HYPOTHETICAL) != 0) /* * Compute the hash code associated with a PREDICATELOCKTARGETTAG. @@ -427,7 +428,8 @@ static uint32 predicatelock_hash(const void *key, Size keysize); static void SummarizeOldestCommittedSxact(void); static Snapshot GetSafeSnapshot(Snapshot snapshot); static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, - TransactionId sourcexid); + TransactionId sourcexid, + SnapshotSafety *snapshot_safety); static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag); static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag, PREDICATELOCKTARGETTAG *parent); @@ -1207,6 +1209,9 @@ InitPredicateLocks(void) PredXact->OldCommittedSxact->xmin = InvalidTransactionId; PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED; PredXact->OldCommittedSxact->pid = 0; + SHMQueueInit(&PredXact->snapshotSafetyWaitList); + PredXact->NewestSnapshotToken = 0; + PredXact->NewestSnapshotSafety = SNAPSHOT_SAFE; } /* This never changes, so let's keep a local copy. */ OldCommittedSxact = PredXact->OldCommittedSxact; @@ -1491,6 +1496,7 @@ static Snapshot GetSafeSnapshot(Snapshot origSnapshot) { Snapshot snapshot; + SnapshotSafety snapshot_safety; Assert(XactReadOnly && XactDeferrable); @@ -1503,10 +1509,43 @@ GetSafeSnapshot(Snapshot origSnapshot) * one passed to it, but we avoid assuming that here. */ snapshot = GetSerializableTransactionSnapshotInt(origSnapshot, - InvalidTransactionId); + InvalidTransactionId, + &snapshot_safety); - if (MySerializableXact == InvalidSerializableXact) - return snapshot; /* no concurrent r/w xacts; it's safe */ + if (RecoveryInProgress()) + { + /* + * Check if the most recently replayed COMMIT record was either + * known to be safe because it had no concurrent r/w xacts on the + * primary, or has subsequently been declared safe by a snapshot + * safety record. + */ + if (snapshot_safety == SNAPSHOT_SAFE) + return snapshot; + else if (snapshot_safety == SNAPSHOT_UNSAFE) + { + /* + * TODO: This can only happen if the master ran out of memory + * while trying to create a hypothetical transaction, right? + * Should we wait or error out? + * + * Otherwise, a SNAPSHOT_UNSAFE state can only be + * generated by ReleasePredicateLocks *after* a commit record + * which would establish a new hypothetical snapshot. So we + * can never take a snapshot here that is already known to be + * unsafe; that is, there can never be a commit record with + * unknown followed by snapshot safety record that immediately + * marks it unsafe, because there must be a new commit in + * between. + */ + continue; + } + } + else + { + if (MySerializableXact == InvalidSerializableXact) + return snapshot; /* no concurrent r/w xacts; it's safe */ + } LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); @@ -1514,20 +1553,48 @@ GetSafeSnapshot(Snapshot origSnapshot) * Wait for concurrent transactions to finish. Stop early if one of * them marked us as conflicted. */ - MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING; - while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) || - SxactIsROUnsafe(MySerializableXact))) + if (RecoveryInProgress()) { - LWLockRelease(SerializableXactHashLock); - ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT); - LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + /* + * Running on a standby. Wait for a the primary to tell us the + * result of testing a hypothetical transaction whose + * serializability matches the snapshot we have. + */ + Assert(snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN); + Assert(!SHMQueueIsDetached(&MyProc->safetyLinks)); + while (MyProc->snapshotSafety == SNAPSHOT_SAFETY_UNKNOWN) + { + LWLockRelease(SerializableXactHashLock); + ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT); + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + } + if (MyProc->snapshotSafety == SNAPSHOT_SAFE) + { + LWLockRelease(SerializableXactHashLock); + break; /* success */ + } } - MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING; - - if (!SxactIsROUnsafe(MySerializableXact)) + else { - LWLockRelease(SerializableXactHashLock); - break; /* success */ + /* + * Running on primary. Wait for a signal from one of the backends + * that we possibly conflict with. + */ + MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING; + while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) || + SxactIsROUnsafe(MySerializableXact))) + { + LWLockRelease(SerializableXactHashLock); + ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT); + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + } + MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING; + + if (!SxactIsROUnsafe(MySerializableXact)) + { + LWLockRelease(SerializableXactHashLock); + break; /* success */ + } } LWLockRelease(SerializableXactHashLock); @@ -1542,13 +1609,64 @@ GetSafeSnapshot(Snapshot origSnapshot) /* * Now we have a safe snapshot, so we don't need to do any further checks. */ - Assert(SxactIsROSafe(MySerializableXact)); + Assert(RecoveryInProgress() || SxactIsROSafe(MySerializableXact)); ReleasePredicateLocks(false); return snapshot; } /* + * When the primary server has determined the safety of a hypothetical + * snapshot which was previously reported as SNAPSHOT_SAFETY_UNKNOWN in a + * COMMIT record, it emits a WAL record that causes the recovery process on + * standbys to call this function. Here, we will wake up any backend that is + * currently waiting in GetSafeSnapshot to learn about the safety of a + * snapshot taken after that point in the transaction stream. + */ +void +NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety) +{ + PGPROC *proc; + PGPROC *next; + + Assert(AmStartupProcess()); + + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + + /* + * Walk the list of processes that are waiting in GetSafeSnapshot on a + * standby, and find any that are waiting to learn the safety of a + * snapshot taken at a point in time when this token appeared onthe most + * recently replayed SSI transaction. If we find any of those, tell them + * the final status for and wake them up. + */ + proc = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList, + &PredXact->snapshotSafetyWaitList, + offsetof(PGPROC, safetyLinks)); + while (proc != NULL) + { + next = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList, + &proc->safetyLinks, + offsetof(PGPROC, safetyLinks)); + if (proc->waitSnapshotToken == token) + { + SHMQueueDelete(&proc->safetyLinks); + proc->snapshotSafety = safety; + ProcSendSignal(proc->pid); + } + proc = next; + } + + /* + * If this happens to be the most recently replayed snapshot token then + * remember this safety value. + */ + if (PredXact->NewestSnapshotToken == token) + PredXact->NewestSnapshotSafety = safety; + LWLockRelease(SerializableXactHashLock); +} + +/* * Acquire a snapshot that can be used for the current transaction. * * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact. @@ -1566,16 +1684,17 @@ GetSerializableTransactionSnapshot(Snapshot snapshot) /* * Can't use serializable mode while recovery is still active, as it is, - * for example, on a hot standby. We could get here despite the check in - * check_XactIsoLevel() if default_transaction_isolation is set to - * serializable, so phrase the hint accordingly. + * for example, on a hot standby, unless DEFERRABLE mode is active. In + * that case, DEFERRABLE is the default, so this error should should only + * be reachable if the user has explicitly asked for NOT DEFERRABLE via + * SET transaction_deferrable or SET/BEGIN TRANSACTION ISOLATION LEVEL. */ - if (RecoveryInProgress()) + if (RecoveryInProgress() && (!XactReadOnly || !XactDeferrable)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot use serializable mode in a hot standby"), - errdetail("\"default_transaction_isolation\" is set to \"serializable\"."), - errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default."))); + errmsg("cannot use serializable not deferrable mode in a hot standby"), + errdetail("Serializable transactions must be DEFERRABLE when run on hot standby servers."), + errhint("You can use \"SET transaction_deferrable = true\", use DEFERRABLE when specifying the transaction isolation level, or avoid explicitly specifying NOT DEFERRABLE."))); /* * A special optimization is available for SERIALIZABLE READ ONLY @@ -1586,7 +1705,8 @@ GetSerializableTransactionSnapshot(Snapshot snapshot) return GetSafeSnapshot(snapshot); return GetSerializableTransactionSnapshotInt(snapshot, - InvalidTransactionId); + InvalidTransactionId, + NULL); } /* @@ -1616,7 +1736,7 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE"))); - (void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid); + (void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid, NULL); } /* @@ -1627,10 +1747,15 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, * loaded up. HOWEVER: to avoid race conditions, we must check that the * source xact is still running after we acquire SerializableXactHashLock. * We do that by calling ProcArrayInstallImportedXmin. + * + * If snapshot_safety is a non-NULL pointer, then the safety of this snapshot + * if used on a standby server is written to it. If it is SNAPSHOT_SAFE, then + * the snapshot may be safely used. If it is SNAPSHOT_SAFETY_UNKNOWN, then */ static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, - TransactionId sourcexid) + TransactionId sourcexid, + SnapshotSafety *snapshot_safety) { PGPROC *proc; VirtualTransactionId vxid; @@ -1641,8 +1766,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, /* We only do this for serializable transactions. Once. */ Assert(MySerializableXact == InvalidSerializableXact); - Assert(!RecoveryInProgress()); - /* * Since all parts of a serializable transaction must use the same * snapshot, it is too late to establish one after a parallel operation @@ -1683,6 +1806,30 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, } } while (!sxact); + /* + * Note the snapshot safety information for standbys. This can be used to + * know if the returned snapshot is already known to be safe/unsafe, or if + * we must wait for notification of the final safety determination. + */ + if (snapshot_safety != NULL) + { + *snapshot_safety = PredXact->NewestSnapshotSafety; + if (*snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN) + { + /* + * We must put the this process into the waitlist while we hold + * the lock or there would be a race condition where we might miss + * a notification. The caller must wait for + * MyProc->snapshotSafety to be set to a final value. + */ + MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN; + MyProc->waitSnapshotToken = PredXact->NewestSnapshotToken; + if (SHMQueueIsDetached(&MyProc->safetyLinks)) + SHMQueueInsertBefore(&PredXact->snapshotSafetyWaitList, + &MyProc->safetyLinks); + } + } + /* Get the snapshot, or check that it's safe to use */ if (!TransactionIdIsValid(sourcexid)) snapshot = GetSnapshotData(snapshot); @@ -3439,12 +3586,35 @@ ReleasePredicateLocks(bool isCommit) /* * Wake up the process for a waiting DEFERRABLE transaction if we - * now know it's either safe or conflicted. + * now know it's either safe or conflicted. This releases + * SERIALIZABLE READ ONLY DEFERRABLE transactions on the primary. */ if (SxactIsDeferrableWaiting(roXact) && (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact))) ProcSendSignal(roXact->pid); + /* + * If a hypothetical transaction is now known to be safe or + * unsafe, we can report that in the WAL for the benefit of + * standbys and recycle it. This releases SERIALIZABLE READ ONLY + * DEFERRABLE transactions that are waiting for the status of this + * particular hypothetical tranasactions on any standby that + * replays it. + */ + if (SxactIsHypothetical(roXact) && + (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact))) + { + SnapshotSafety safety; + + if (SxactIsROSafe(roXact)) + safety = SNAPSHOT_SAFE; + else + safety = SNAPSHOT_UNSAFE; + XactLogSnapshotSafetyRecord(roXact->SeqNo.lastCommitBeforeSnapshot, + safety); + ReleasePredXact(roXact); + } + possibleUnsafeConflict = nextConflict; } } @@ -4704,6 +4874,80 @@ PreCommit_CheckForSerializationFailure(void) MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo); MySerializableXact->flags |= SXACT_FLAG_PREPARED; + /* + * For the benefit of hot standby servers that want to take a safe + * SERIALIZABLE READ ONLY DEFERRABLE snapshot, we will check whether a + * hypothetical read-only serializable transaction that starts after this + * transaction commits would be safe. + */ + if (PredXact->WritableSxactCount == (XactReadOnly ? 0 : 1)) + { + /* + * There are no concurrent writable SERIALIZABLE transactions. A + * read-only snapshot taken immediately after this one commits is + * safe. + */ + MySerializableXact->snapshotSafetyAfterThisCommit = SNAPSHOT_SAFE; + } + else + { + SERIALIZABLEXACT *sxact; + SERIALIZABLEXACT *othersxact; + + /* + * We can't yet determine whether a read-only transaction beginning + * now would be safe. Create a hypothetical SERIALIZABLEXACT and let + * ReleasePredicateLocks report on its safety once that can be + * determined. + */ + sxact = CreatePredXact(); + if (sxact == NULL) + { + /* Out of space. Don't allow SERIALIZABLE on standbys. */ + MySerializableXact->snapshotSafetyAfterThisCommit = + SNAPSHOT_UNSAFE; + } + else + { + SetInvalidVirtualTransactionId(sxact->vxid); + sxact->SeqNo.lastCommitBeforeSnapshot = + MySerializableXact->prepareSeqNo; + sxact->prepareSeqNo = InvalidSerCommitSeqNo; + sxact->commitSeqNo = InvalidSerCommitSeqNo; + SHMQueueInit(&(sxact->outConflicts)); + SHMQueueInit(&(sxact->inConflicts)); + SHMQueueInit(&(sxact->possibleUnsafeConflicts)); + sxact->topXid = InvalidTransactionId; + sxact->finishedBefore = InvalidTransactionId; + sxact->xmin = MySerializableXact->xmin; + sxact->pid = InvalidPid; + SHMQueueInit(&(sxact->predicateLocks)); + SHMQueueElemInit(&(sxact->finishedLink)); + sxact->flags = SXACT_FLAG_READ_ONLY | SXACT_FLAG_HYPOTHETICAL; + + /* Register concurrent r/w transactions as possible conflicts. */ + for (othersxact = FirstPredXact(); + othersxact != NULL; + othersxact = NextPredXact(othersxact)) + { + if (othersxact != MySerializableXact + && !SxactIsCommitted(othersxact) + && !SxactIsDoomed(othersxact) + && !SxactIsReadOnly(othersxact)) + { + SetPossibleUnsafeConflict(sxact, othersxact); + } + } + + /* + * The status will be reported in a later WAL record once it has + * been determined. + */ + MySerializableXact->snapshotSafetyAfterThisCommit = + SNAPSHOT_SAFETY_UNKNOWN; + } + } + LWLockRelease(SerializableXactHashLock); } @@ -4966,3 +5210,41 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, CreatePredicateLock(&lockRecord->target, targettaghash, sxact); } } + +/* + * Accessor for the hypothetical snapshot safety information needed for commit + * records generated on primary servers. This is used by XlactLogCommitRecord + * to receive the safety level computed by + * PreCommit_CheckForSerializationFailure in a committing SSI transaction. + */ +void +GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety) +{ + *token = MySerializableXact->prepareSeqNo; + *safety = MySerializableXact->snapshotSafetyAfterThisCommit; +} + +void +BeginSnapshotSafetyReplay(void) +{ + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); +} + +/* + * Used in recovery when replaying commit records. On a hot standby, these + * values must be set atomically with ProcArray updates, mirroring the code + * in GetSerializableTransactionSnapshotInt. This is done by wrapping both + * in BeginSnapshotSafetyReplay/CompleteSnapshotSafetyReplay. + */ +void +SetNewestSnapshotSafety(SnapshotToken token, SnapshotSafety safety) +{ + PredXact->NewestSnapshotToken = token; + PredXact->NewestSnapshotSafety = safety; +} + +void +CompleteSnapshotSafetyReplay(void) +{ + LWLockRelease(SerializableXactHashLock); +} diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index b201631..bf2e2b7 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -394,6 +394,11 @@ InitProcess(void) MyProc->syncRepState = SYNC_REP_NOT_WAITING; SHMQueueElemInit(&(MyProc->syncRepLinks)); + /* Initialize fields for SERIALIZABLE on standbys */ + MyProc->waitSnapshotToken = 0; + MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN; + SHMQueueElemInit(&(MyProc->safetyLinks)); + /* Initialize fields for group XID clearing. */ MyProc->procArrayGroupMember = false; MyProc->procArrayGroupMemberXid = InvalidTransactionId; diff --git a/src/include/access/xact.h b/src/include/access/xact.h index a123d2a..7ea72d5 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -118,7 +118,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 #define XLOG_XACT_ASSIGNMENT 0x50 -/* free opcode 0x60 */ +#define XLOG_XACT_SNAPSHOT_SAFETY 0x60 /* free opcode 0x70 */ /* mask for filtering opcodes out of xl_info */ @@ -137,6 +137,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XACT_XINFO_HAS_INVALS (1U << 3) #define XACT_XINFO_HAS_TWOPHASE (1U << 4) #define XACT_XINFO_HAS_ORIGIN (1U << 5) +#define XACT_XINFO_HAS_SNAPSHOT_SAFETY (1U << 6) /* * Also stored in xinfo, these indicating a variety of additional actions that @@ -232,6 +233,12 @@ typedef struct xl_xact_origin TimestampTz origin_timestamp; } xl_xact_origin; +typedef struct xl_xact_snapshot_safety +{ + SnapshotToken token; + SnapshotSafety safety; +} xl_xact_snapshot_safety; + typedef struct xl_xact_commit { TimestampTz xact_time; /* time of commit */ @@ -243,6 +250,7 @@ typedef struct xl_xact_commit /* xl_xact_invals follows if XINFO_HAS_INVALS */ /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */ /* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */ + /* xl_xact_snapshot_safety follows if XINFO_HAS_SNAPSHOT_SAFETY */ } xl_xact_commit; #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz)) @@ -286,6 +294,9 @@ typedef struct xl_xact_parsed_commit XLogRecPtr origin_lsn; TimestampTz origin_timestamp; + + SnapshotToken snapshot_token; + SnapshotSafety snapshot_safety; } xl_xact_parsed_commit; typedef struct xl_xact_parsed_abort @@ -370,6 +381,10 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, TransactionId twophase_xid); + +extern XLogRecPtr XactLogSnapshotSafetyRecord(SnapshotToken token, + SnapshotSafety safety); + extern void xact_redo(XLogReaderState *record); /* xactdesc.c */ diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index c2c6632..13789fa 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -51,6 +51,18 @@ typedef uint32 TimeLineID; typedef uint16 RepOriginId; /* + * Snapshot safety information using to control SERIALIAZABLE on standby + * servers appears in checkpoints, so we define the types used here. + */ +typedef uint64 SnapshotToken; +typedef enum SnapshotSafety +{ + SNAPSHOT_SAFE, + SNAPSHOT_UNSAFE, + SNAPSHOT_SAFETY_UNKNOWN +} SnapshotSafety; + +/* * Because O_DIRECT bypasses the kernel buffers, and because we never * read those buffers except during crash recovery or if wal_level != minimal, * it is a win to use it in all cases where we sync on each write(). We could diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h index a66b5b7..0fd3aa7 100644 --- a/src/include/storage/predicate.h +++ b/src/include/storage/predicate.h @@ -27,7 +27,6 @@ extern int max_predicate_locks_per_xact; /* Number of SLRU buffers to use for predicate locking */ #define NUM_OLDSERXID_BUFFERS 16 - /* * function prototypes */ @@ -70,4 +69,11 @@ extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit); extern void predicatelock_twophase_recover(TransactionId xid, uint16 info, void *recdata, uint32 len); +/* hypothetical snapshot safety support, allowing SERIALIZABLE on standbys */ +extern void GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety); +extern void NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety); +extern void BeginSnapshotSafetyReplay(void); +extern void SetNewestSnapshotSafety(SnapshotToken token, SnapshotSafety safety); +extern void CompleteSnapshotSafetyReplay(void); + #endif /* PREDICATE_H */ diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h index 3175d28..baa2b0f 100644 --- a/src/include/storage/predicate_internals.h +++ b/src/include/storage/predicate_internals.h @@ -97,6 +97,12 @@ typedef struct SERIALIZABLEXACT */ SHM_QUEUE possibleUnsafeConflicts; + /* + * for committing transactions: would a hypothetical read-only snapshot + * taken immediately after this transaction commits be safe? + */ + SnapshotSafety snapshotSafetyAfterThisCommit; + TransactionId topXid; /* top level xid for the transaction, if one * exists; else invalid */ TransactionId finishedBefore; /* invalid means still running; else @@ -123,6 +129,7 @@ typedef struct SERIALIZABLEXACT #define SXACT_FLAG_RO_UNSAFE 0x00000100 #define SXACT_FLAG_SUMMARY_CONFLICT_IN 0x00000200 #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400 +#define SXACT_FLAG_HYPOTHETICAL 0x00000800 /* * The following types are used to provide an ad hoc list for holding @@ -173,6 +180,11 @@ typedef struct PredXactListData * seq no */ SERIALIZABLEXACT *OldCommittedSxact; /* shared copy of dummy sxact */ + /* Tracking of snapshot safety on standby servers. */ + SHM_QUEUE snapshotSafetyWaitList; + SnapshotToken NewestSnapshotToken; + SnapshotSafety NewestSnapshotSafety; + PredXactListElement element; } PredXactListData; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 7dc8dac..468018e 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -133,6 +133,11 @@ struct PGPROC int syncRepState; /* wait state for sync rep */ SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */ + /* Info to allow standbys to wait for a safe SERIALIZABLE snapshot */ + SnapshotToken waitSnapshotToken; + SnapshotSafety snapshotSafety; /* space for result */ + SHM_QUEUE safetyLinks; /* list link for GetSafeSnapshot */ + /* * All PROCLOCK objects for locks held or awaited by this backend are * linked into one of these lists, according to the partition number of