From b0f953e1447bcf020bf2f25c065c0b151af2ee5d Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Fri, 21 Sep 2018 01:49:22 +1200 Subject: [PATCH] SERIALIZABLE READ ONLY DEFERRABLE for hot standbys. Work in progress! Author: Thomas Munro, based on an idea from Kevin Grittner Discussion: https://postgr.es/m/CAEepm%3D2b9TV%2BvJ4UeSBixDrW7VUiTjxPwWq8K3QwFSWx0pTXHQ%40mail.gmail.com --- doc/src/sgml/high-availability.sgml | 9 - doc/src/sgml/mvcc.sgml | 21 -- doc/src/sgml/ref/set_transaction.sgml | 36 +++ src/backend/access/rmgrdesc/xactdesc.c | 56 ++++ src/backend/access/transam/xact.c | 73 ++++- src/backend/access/transam/xlog.c | 7 + src/backend/commands/variable.c | 8 - src/backend/storage/lmgr/predicate.c | 336 ++++++++++++++++++++-- src/backend/storage/lmgr/proc.c | 5 + src/include/access/xact.h | 17 +- src/include/access/xlogdefs.h | 12 + src/include/storage/predicate.h | 8 +- src/include/storage/predicate_internals.h | 12 + src/include/storage/proc.h | 5 + 14 files changed, 534 insertions(+), 71 deletions(-) diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index 8cb77f85ec0..1a26c26a71f 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -2449,15 +2449,6 @@ LOG: database system is ready to accept read only connections your setting of max_prepared_transactions is 0. - - - The Serializable transaction isolation level is not yet available in hot - standby. (See and - for details.) - An attempt to set a transaction to the serializable isolation level in - hot standby mode will generate an error. - - diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml index 73934e5cf37..49aa7d51c4a 100644 --- a/doc/src/sgml/mvcc.sgml +++ b/doc/src/sgml/mvcc.sgml @@ -1599,15 +1599,6 @@ SELECT pg_advisory_lock(q.id) FROM See for performance suggestions. - - - - This level of integrity protection using Serializable transactions - does not yet extend to hot standby mode (). 
- Because of that, those using hot standby may want to use Repeatable - Read and explicit locking on the master. - - @@ -1700,18 +1691,6 @@ SELECT pg_advisory_lock(q.id) FROM could cause visible inconsistency between the contents of the target table and other tables in the database. - - - Support for the Serializable transaction isolation level has not yet - been added to Hot Standby replication targets (described in - ). The strictest isolation level currently - supported in hot standby mode is Repeatable Read. While performing all - permanent database writes within Serializable transactions on the - master will ensure that all standbys will eventually reach a consistent - state, a Repeatable Read transaction run on the standby can sometimes - see a transient state that is inconsistent with any serial execution - of the transactions on the master. - diff --git a/doc/src/sgml/ref/set_transaction.sgml b/doc/src/sgml/ref/set_transaction.sgml index 43b1c6c892b..0c7d8b22710 100644 --- a/doc/src/sgml/ref/set_transaction.sgml +++ b/doc/src/sgml/ref/set_transaction.sgml @@ -151,6 +151,42 @@ SET SESSION CHARACTERISTICS AS TRANSACTION transa is well suited for long-running reports or backups. + + Serializable transactions cannot use the NOT DEFERRABLE + option when run on Hot Standby servers. READ ONLY + and DEFERRABLE are the default settings for Hot Standby + servers, but may also be specified explicitly. The first statement run in + a Serializable transaction on a Hot Standby server may need to wait until a + point in the transaction stream where it is not possible to see a transient + state that is inconsistent with any serial execution of the transactions on + the master server. This may introduce a pause, but avoids the need to + coordinate predicate locking across multiple servers. + + + + Serializable transactions cannot use the NOT DEFERRABLE + option when run on Hot Standby servers. 
READ ONLY + and DEFERRABLE are the default settings for Hot Standby + servers, but may also be specified explicitly. The first statement run in + a Serializable transaction on a Hot Standby server may need to wait until a + point in the transaction stream where it is not possible to see a transient + state that is inconsistent with any serial execution of the transactions on + the master server. This may introduce a pause, but avoids the need to + coordinate predicate locking across multiple servers. + + + + Serializable transactions cannot use the NOT DEFERRABLE + option when run on Hot Standby servers. READ ONLY + and DEFERRABLE are the default settings for Hot Standby + servers, but may also be specified explicitly. The first statement run in + a Serializable transaction on a Hot Standby server may need to wait until a + point in the transaction stream where it is not possible to see a transient + state that is inconsistent with any serial execution of the transactions on + the master server. This may introduce a pause, but avoids the need to + coordinate predicate locking across multiple servers. 
+ + The SET TRANSACTION SNAPSHOT command allows a new transaction to run with the same snapshot as an existing diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 6d5ebd475b4..e4ab28b9eab 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -123,6 +123,17 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars data += sizeof(xl_xact_origin); } + + if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY) + { + xl_xact_snapshot_safety *xl_snapshot_safety = + (xl_xact_snapshot_safety *) data; + + parsed->snapshot_token = xl_snapshot_safety->token; + parsed->snapshot_safety = xl_snapshot_safety->safety; + + data += sizeof(xl_xact_snapshot_safety); + } } void @@ -209,6 +220,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) } } +static const char * +xact_snapshot_safety_to_string(SnapshotSafety snapshot_safety) +{ + const char *string = ""; + + switch(snapshot_safety) + { + case SNAPSHOT_SAFE: + string = "SNAPSHOT_SAFE"; + break; + case SNAPSHOT_UNSAFE: + string = "SNAPSHOT_UNSAFE"; + break; + case SNAPSHOT_SAFETY_UNKNOWN: + string = "SNAPSHOT_SAFETY_UNKNOWN"; + break; + } + + return string; +} + static void xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id) { @@ -258,6 +290,13 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId (uint32) parsed.origin_lsn, timestamptz_to_str(parsed.origin_timestamp)); } + + if (parsed.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY) + { + appendStringInfo(buf, "; snapshot safety: %s, token: %lx", + xact_snapshot_safety_to_string(parsed.snapshot_safety), + parsed.snapshot_token); + } } static void @@ -304,6 +343,14 @@ xact_desc_assignment(StringInfo buf, xl_xact_assignment *xlrec) appendStringInfo(buf, " %u", xlrec->xsub[i]); } +static void +xact_desc_snapshot_safety(StringInfo buf, xl_xact_snapshot_safety *xlrec) +{ + 
appendStringInfo(buf, "snapshot safety: %s, token: %lx", + xact_snapshot_safety_to_string(xlrec->safety), + xlrec->token); +} + void xact_desc(StringInfo buf, XLogReaderState *record) { @@ -335,6 +382,12 @@ xact_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "xtop %u: ", xlrec->xtop); xact_desc_assignment(buf, xlrec); } + else if (info == XLOG_XACT_SNAPSHOT_SAFETY) + { + xl_xact_snapshot_safety *xlrec = (xl_xact_snapshot_safety *) rec; + + xact_desc_snapshot_safety(buf, xlrec); + } } const char * @@ -362,6 +415,9 @@ xact_identify(uint8 info) case XLOG_XACT_ASSIGNMENT: id = "ASSIGNMENT"; break; + case XLOG_XACT_SNAPSHOT_SAFETY: + id = "SNAPSHOT_SAFETY"; + break; } return id; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 875be180fe4..98136e05c6f 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1812,13 +1812,14 @@ StartTransaction(void) { s->startedInRecovery = true; XactReadOnly = true; + XactDeferrable = true; } else { s->startedInRecovery = false; XactReadOnly = DefaultXactReadOnly; + XactDeferrable = DefaultXactDeferrable; } - XactDeferrable = DefaultXactDeferrable; XactIsoLevel = DefaultXactIsoLevel; forceSyncCommit = false; MyXactFlags = 0; @@ -5228,6 +5229,7 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_invals xl_invals; xl_xact_twophase xl_twophase; xl_xact_origin xl_origin; + xl_xact_snapshot_safety xl_snapshot_safety; uint8 info; Assert(CritSectionCount > 0); @@ -5306,6 +5308,13 @@ XactLogCommitRecord(TimestampTz commit_time, xl_origin.origin_timestamp = replorigin_session_origin_timestamp; } + if (IsolationIsSerializable()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_SNAPSHOT_SAFETY; + GetSnapshotSafetyAfterThisCommit(&xl_snapshot_safety.token, + &xl_snapshot_safety.safety); + } + if (xl_xinfo.xinfo != 0) info |= XLOG_XACT_HAS_INFO; @@ -5354,6 +5363,10 @@ XactLogCommitRecord(TimestampTz commit_time, if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) 
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY) + XLogRegisterData((char *) (&xl_snapshot_safety), + sizeof(xl_xact_snapshot_safety)); + /* we allow filtering by xacts */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); @@ -5565,12 +5578,23 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, TransactionIdAsyncCommitTree( xid, parsed->nsubxacts, parsed->subxacts, lsn); + /* Coordinate atomic update of snapshot safety and ProcArray. */ + if (parsed->snapshot_token != 0) + { + BeginSnapshotSafetyReplay(); + SetNewestSnapshotSafety(parsed->snapshot_token, + parsed->snapshot_safety); + } + /* * We must mark clog before we update the ProcArray. */ ExpireTreeKnownAssignedTransactionIds( xid, parsed->nsubxacts, parsed->subxacts, max_xid); + if (parsed->snapshot_token != 0) + CompleteSnapshotSafetyReplay(); + /* * Send any cache invalidations attached to the commit. We must * maintain the same order of invalidation then release locks as @@ -5724,6 +5748,46 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid) DropRelationFiles(parsed->xnodes, parsed->nrels, true); } +XLogRecPtr +XactLogSnapshotSafetyRecord(SnapshotToken token, SnapshotSafety safety) +{ + XLogRecPtr result; + xl_xact_snapshot_safety snapshot_safety; + + snapshot_safety.token = token; + snapshot_safety.safety = safety; + + START_CRIT_SECTION(); + XLogBeginInsert(); + XLogRegisterData((char *) &snapshot_safety, sizeof(snapshot_safety)); + result = XLogInsert(RM_XACT_ID, XLOG_XACT_SNAPSHOT_SAFETY); + END_CRIT_SECTION(); + + /* + * TODO: How can we avoid having to flush again (after already flushing + * for the commit)? If we don't have this flush here, then the standby + * has to wait a while to find out whether its snapshot is safe.
+ */ + XLogFlush(result); + + return result; +} + +static void +xact_redo_snapshot_safety(xl_xact_snapshot_safety *snapshot_safety) +{ + /* + * Any earlier COMMIT record must have carried a snapshot safety message + * with the same token as this record, and had safety == + * SNAPSHOT_SAFETY_UNKNOWN. This new independent snapshot safety message + * reports that the safety is now known. We will wake any backend that is + * waiting to learn if the snapshot is safe. + */ + if (standbyState >= STANDBY_INITIALIZED) + NotifyHypotheticalSnapshotSafety(snapshot_safety->token, + snapshot_safety->safety); +} + void xact_redo(XLogReaderState *record) { @@ -5797,6 +5861,13 @@ xact_redo(XLogReaderState *record) ProcArrayApplyXidAssignment(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub); } + else if (info == XLOG_XACT_SNAPSHOT_SAFETY) + { + xl_xact_snapshot_safety *xlrec = + (xl_xact_snapshot_safety *) XLogRecGetData(record); + + xact_redo_snapshot_safety(xlrec); + } else elog(PANIC, "xact_redo: unknown op code %u", info); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 3025d0badb8..fec37f67e1b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5177,6 +5177,8 @@ BootStrapXLOG(void) checkPoint.newestCommitTsXid = InvalidTransactionId; checkPoint.time = (pg_time_t) time(NULL); checkPoint.oldestActiveXid = InvalidTransactionId; +// checkPoint.newestSnapshotToken = 0; +// checkPoint.newestSnapshotSafety = SNAPSHOT_SAFE; ShmemVariableCache->nextXid = checkPoint.nextXid; ShmemVariableCache->nextOid = checkPoint.nextOid; @@ -5186,6 +5188,8 @@ BootStrapXLOG(void) SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true); SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId); +// SetNewestSnapshotSafety(checkPoint.newestSnapshotToken, +// checkPoint.newestSnapshotSafety); /* Set up the XLOG page header */ page->xlp_magic = 
XLOG_PAGE_MAGIC; @@ -6813,6 +6817,9 @@ StartupXLOG(void) checkPoint.newestCommitTsXid); XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch; XLogCtl->ckptXid = checkPoint.nextXid; + /* TODO: figure out checkpoint protocol */ +// SetNewestSnapshotSafety(checkPoint.newestSnapshotToken, +// checkPoint.newestSnapshotSafety); /* * Initialize replication slots, before there's a chance to remove diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c index 9a754dae3fd..9c6083d0edc 100644 --- a/src/backend/commands/variable.c +++ b/src/backend/commands/variable.c @@ -564,14 +564,6 @@ check_XactIsoLevel(char **newval, void **extra, GucSource source) GUC_check_errmsg("SET TRANSACTION ISOLATION LEVEL must not be called in a subtransaction"); return false; } - /* Can't go to serializable mode while recovery is still active */ - if (newXactIsoLevel == XACT_SERIALIZABLE && RecoveryInProgress()) - { - GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED); - GUC_check_errmsg("cannot use serializable mode in a hot standby"); - GUC_check_errhint("You can use REPEATABLE READ instead."); - return false; - } } *extra = malloc(sizeof(int)); diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index e8390311d03..8053d23131a 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -279,6 +279,7 @@ #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0) #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0) #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0) +#define SxactIsHypothetical(sxact) (((sxact)->flags & SXACT_FLAG_HYPOTHETICAL) != 0) /* * Compute the hash code associated with a PREDICATELOCKTARGETTAG. 
@@ -433,7 +434,8 @@ static void SummarizeOldestCommittedSxact(void); static Snapshot GetSafeSnapshot(Snapshot snapshot); static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, VirtualTransactionId *sourcevxid, - int sourcepid); + int sourcepid, + SnapshotSafety *snapshot_safety); static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag); static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag, PREDICATELOCKTARGETTAG *parent); @@ -1186,6 +1188,9 @@ InitPredicateLocks(void) PredXact->OldCommittedSxact->xmin = InvalidTransactionId; PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED; PredXact->OldCommittedSxact->pid = 0; + SHMQueueInit(&PredXact->snapshotSafetyWaitList); + PredXact->NewestSnapshotToken = 0; + PredXact->NewestSnapshotSafety = SNAPSHOT_SAFE; } /* This never changes, so let's keep a local copy. */ OldCommittedSxact = PredXact->OldCommittedSxact; @@ -1468,6 +1473,7 @@ static Snapshot GetSafeSnapshot(Snapshot origSnapshot) { Snapshot snapshot; + SnapshotSafety snapshot_safety; Assert(XactReadOnly && XactDeferrable); @@ -1480,10 +1486,36 @@ GetSafeSnapshot(Snapshot origSnapshot) * one passed to it, but we avoid assuming that here. */ snapshot = GetSerializableTransactionSnapshotInt(origSnapshot, - NULL, InvalidPid); - - if (MySerializableXact == InvalidSerializableXact) - return snapshot; /* no concurrent r/w xacts; it's safe */ + NULL, InvalidPid, + &snapshot_safety); + if (RecoveryInProgress()) + { + /* + * Check if the most recently replayed COMMIT record was either + * known to be safe because it had no concurrent r/w xacts on the + * primary, or has subsequently been declared safe by a snapshot + * safety record. 
+ */ + if (snapshot_safety == SNAPSHOT_SAFE) + { + elog(WARNING, "got a safe snapshot!"); + return snapshot; + } + else if (snapshot_safety == SNAPSHOT_UNSAFE) + { + /* + * TODO: This can only happen if the master ran out of memory + * while trying to create a hypothetical transaction, right? + * Should we wait or error out? + */ + continue; + } + } + else + { + if (MySerializableXact == InvalidSerializableXact) + return snapshot; /* no concurrent r/w xacts; it's safe */ + } LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); @@ -1491,20 +1523,48 @@ GetSafeSnapshot(Snapshot origSnapshot) * Wait for concurrent transactions to finish. Stop early if one of * them marked us as conflicted. */ - MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING; - while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) || - SxactIsROUnsafe(MySerializableXact))) + if (RecoveryInProgress()) { - LWLockRelease(SerializableXactHashLock); - ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT); - LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + /* + * Running on a standby. Wait for the primary to tell us the + * result of testing a hypothetical transaction whose + * serializability matches the snapshot we have. + */ + Assert(snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN); + Assert(!SHMQueueIsDetached(&MyProc->safetyLinks)); + while (MyProc->snapshotSafety == SNAPSHOT_SAFETY_UNKNOWN) + { + LWLockRelease(SerializableXactHashLock); + ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT); + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + } + if (MyProc->snapshotSafety == SNAPSHOT_SAFE) + { + LWLockRelease(SerializableXactHashLock); + break; /* success */ + } } - MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING; - - if (!SxactIsROUnsafe(MySerializableXact)) + else { - LWLockRelease(SerializableXactHashLock); - break; /* success */ + /* + * Running on primary. Wait for a signal from one of the backends + * that we possibly conflict with.
+ */ + MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING; + while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) || + SxactIsROUnsafe(MySerializableXact))) + { + LWLockRelease(SerializableXactHashLock); + ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT); + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + } + MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING; + + if (!SxactIsROUnsafe(MySerializableXact)) + { + LWLockRelease(SerializableXactHashLock); + break; /* success */ + } } LWLockRelease(SerializableXactHashLock); @@ -1519,12 +1579,64 @@ GetSafeSnapshot(Snapshot origSnapshot) /* * Now we have a safe snapshot, so we don't need to do any further checks. */ - Assert(SxactIsROSafe(MySerializableXact)); + Assert(RecoveryInProgress() || SxactIsROSafe(MySerializableXact)); ReleasePredicateLocks(false); return snapshot; } + /* + * When the primary server has determined the safety of a hypothetical + * snapshot which was previously reported as SNAPSHOT_SAFETY_UNKNOWN in a + * COMMIT record, it emits a WAL record that causes the recovery process on + * standbys to call this function. Here, we will wake up any backend that is + * currently waiting in GetSafeSnapshot to learn about the safety of a + * snapshot taken after that point in the transaction stream. + */ +void +NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety) +{ + PGPROC *proc; + PGPROC *next; + + Assert(AmStartupProcess()); + elog(LOG, "NotifyHypotheticalSnapshotSafety token = %ld, safety = %d", token, safety); + + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + + /* + * Walk the list of processes that are waiting in GetSafeSnapshot on a + * standby, and find any that are waiting to learn the safety of a + * snapshot taken at a point in time when this token appeared on the most + * recently replayed SSI transaction. If we find any of those, tell them + * the final status and wake them up.
+ */ + proc = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList, + &PredXact->snapshotSafetyWaitList, + offsetof(PGPROC, safetyLinks)); + while (proc != NULL) + { + next = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList, + &proc->safetyLinks, + offsetof(PGPROC, safetyLinks)); + if (proc->waitSnapshotToken == token) + { + SHMQueueDelete(&proc->safetyLinks); + proc->snapshotSafety = safety; + ProcSendSignal(proc->pid); + } + proc = next; + } + + /* + * If this happens to be the most recently replayed snapshot token then + * remember this safety value. + */ + if (PredXact->NewestSnapshotToken == token) + PredXact->NewestSnapshotSafety = safety; + LWLockRelease(SerializableXactHashLock); +} + /* * GetSafeSnapshotBlockingPids * If the specified process is currently blocked in GetSafeSnapshot, @@ -1593,16 +1705,17 @@ GetSerializableTransactionSnapshot(Snapshot snapshot) /* * Can't use serializable mode while recovery is still active, as it is, - * for example, on a hot standby. We could get here despite the check in - * check_XactIsoLevel() if default_transaction_isolation is set to - * serializable, so phrase the hint accordingly. + * for example, on a hot standby, unless DEFERRABLE mode is active. In + * that case, DEFERRABLE is the default, so this error should only + * be reachable if the user has explicitly asked for NOT DEFERRABLE via + * SET transaction_deferrable or SET/BEGIN TRANSACTION ISOLATION LEVEL.
*/ - if (RecoveryInProgress()) + if (RecoveryInProgress() && (!XactReadOnly || !XactDeferrable)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot use serializable mode in a hot standby"), - errdetail("\"default_transaction_isolation\" is set to \"serializable\"."), - errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default."))); + errmsg("cannot use serializable not deferrable mode in a hot standby"), + errdetail("Serializable transactions must be DEFERRABLE when run on hot standby servers."), + errhint("You can use \"SET transaction_deferrable = true\", use DEFERRABLE when specifying the transaction isolation level, or avoid explicitly specifying NOT DEFERRABLE."))); /* * A special optimization is available for SERIALIZABLE READ ONLY @@ -1613,7 +1726,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot) return GetSafeSnapshot(snapshot); return GetSerializableTransactionSnapshotInt(snapshot, - NULL, InvalidPid); + NULL, InvalidPid, NULL); } /* @@ -1645,7 +1758,7 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE"))); (void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid, - sourcepid); + sourcepid, NULL); } /* @@ -1656,11 +1769,17 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, * loaded up. HOWEVER: to avoid race conditions, we must check that the * source xact is still running after we acquire SerializableXactHashLock. * We do that by calling ProcArrayInstallImportedXmin. + * + * If snapshot_safety is non-NULL, then the safety of this snapshot if used + * on a standby server is written to it. If it is SNAPSHOT_SAFE, then the + * snapshot may be safely used. If it is SNAPSHOT_SAFETY_UNKNOWN, then the + * caller must wait for the safety to be announced in the WAL.
+ */ static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, VirtualTransactionId *sourcevxid, - int sourcepid) + int sourcepid, + SnapshotSafety *snapshot_safety) { PGPROC *proc; VirtualTransactionId vxid; @@ -1671,8 +1790,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, /* We only do this for serializable transactions. Once. */ Assert(MySerializableXact == InvalidSerializableXact); - Assert(!RecoveryInProgress()); - /* * Since all parts of a serializable transaction must use the same * snapshot, it is too late to establish one after a parallel operation @@ -1713,6 +1830,30 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, } } while (!sxact); + /* + * Note the snapshot safety information for standbys. This can be used to + * know if the returned snapshot is already known to be safe/unsafe, or if + * we must wait for notification of the final safety determination. + */ + if (snapshot_safety != NULL) + { + *snapshot_safety = PredXact->NewestSnapshotSafety; + if (*snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN) + { + /* + * We must put this process into the waitlist while we hold + * the lock or there would be a race condition where we might miss + * a notification. The caller must wait for + * MyProc->snapshotSafety to be set to a final value. + */ + MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN; + MyProc->waitSnapshotToken = PredXact->NewestSnapshotToken; + if (SHMQueueIsDetached(&MyProc->safetyLinks)) + SHMQueueInsertBefore(&PredXact->snapshotSafetyWaitList, + &MyProc->safetyLinks); + } + } + /* Get the snapshot, or check that it's safe to use */ if (!sourcevxid) snapshot = GetSnapshotData(snapshot); @@ -3476,12 +3617,35 @@ ReleasePredicateLocks(bool isCommit) /* * Wake up the process for a waiting DEFERRABLE transaction if we - * now know it's either safe or conflicted. + * now know it's either safe or conflicted. This releases + * SERIALIZABLE READ ONLY DEFERRABLE transactions on the primary.
*/ if (SxactIsDeferrableWaiting(roXact) && (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact))) ProcSendSignal(roXact->pid); + /* + * If a hypothetical transaction is now known to be safe or + * unsafe, we can report that in the WAL for the benefit of + * standbys and recycle it. This releases SERIALIZABLE READ ONLY + * DEFERRABLE transactions that are waiting for the status of this + * particular hypothetical transaction on any standby that + * replays it. + */ + if (SxactIsHypothetical(roXact) && + (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact))) + { + SnapshotSafety safety; + + if (SxactIsROSafe(roXact)) + safety = SNAPSHOT_SAFE; + else + safety = SNAPSHOT_UNSAFE; + XactLogSnapshotSafetyRecord(roXact->SeqNo.lastCommitBeforeSnapshot, + safety); + ReleasePredXact(roXact); + } + possibleUnsafeConflict = nextConflict; } } @@ -4741,6 +4905,80 @@ PreCommit_CheckForSerializationFailure(void) MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo); MySerializableXact->flags |= SXACT_FLAG_PREPARED; + /* + * For the benefit of hot standby servers that want to take a safe + * SERIALIZABLE READ ONLY DEFERRABLE snapshot, we will check whether a + * hypothetical read-only serializable transaction that starts after this + * transaction commits would be safe. + */ + if (PredXact->WritableSxactCount == (XactReadOnly ? 0 : 1)) + { + /* + * There are no concurrent writable SERIALIZABLE transactions. A + * read-only snapshot taken immediately after this one commits is + * safe. + */ + MySerializableXact->snapshotSafetyAfterThisCommit = SNAPSHOT_SAFE; + } + else + { + SERIALIZABLEXACT *sxact; + SERIALIZABLEXACT *othersxact; + + /* + * We can't yet determine whether a read-only transaction beginning + * now would be safe. Create a hypothetical SERIALIZABLEXACT and let + * ReleasePredicateLocks report on its safety once that can be + * determined. + */ + sxact = CreatePredXact(); + if (sxact == NULL) + { + /* Out of space. Don't allow SERIALIZABLE on standbys.
*/ + MySerializableXact->snapshotSafetyAfterThisCommit = + SNAPSHOT_UNSAFE; + } + else + { + SetInvalidVirtualTransactionId(sxact->vxid); + sxact->SeqNo.lastCommitBeforeSnapshot = + MySerializableXact->prepareSeqNo; + sxact->prepareSeqNo = InvalidSerCommitSeqNo; + sxact->commitSeqNo = InvalidSerCommitSeqNo; + SHMQueueInit(&(sxact->outConflicts)); + SHMQueueInit(&(sxact->inConflicts)); + SHMQueueInit(&(sxact->possibleUnsafeConflicts)); + sxact->topXid = InvalidTransactionId; + sxact->finishedBefore = InvalidTransactionId; + sxact->xmin = MySerializableXact->xmin; + sxact->pid = InvalidPid; + SHMQueueInit(&(sxact->predicateLocks)); + SHMQueueElemInit(&(sxact->finishedLink)); + sxact->flags = SXACT_FLAG_READ_ONLY | SXACT_FLAG_HYPOTHETICAL; + + /* Register concurrent r/w transactions as possible conflicts. */ + for (othersxact = FirstPredXact(); + othersxact != NULL; + othersxact = NextPredXact(othersxact)) + { + if (othersxact != MySerializableXact + && !SxactIsCommitted(othersxact) + && !SxactIsDoomed(othersxact) + && !SxactIsReadOnly(othersxact)) + { + SetPossibleUnsafeConflict(sxact, othersxact); + } + } + + /* + * The status will be reported in a later WAL record once it has + * been determined. + */ + MySerializableXact->snapshotSafetyAfterThisCommit = + SNAPSHOT_SAFETY_UNKNOWN; + } + } + LWLockRelease(SerializableXactHashLock); } @@ -5003,3 +5241,41 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, CreatePredicateLock(&lockRecord->target, targettaghash, sxact); } } + +/* + * Accessor for the hypothetical snapshot safety information needed for commit + * records generated on primary servers. This is used by XactLogCommitRecord + * to receive the safety level computed by + * PreCommit_CheckForSerializationFailure in a committing SSI transaction.
+ */ +void +GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety) +{ + *token = MySerializableXact->prepareSeqNo; + *safety = MySerializableXact->snapshotSafetyAfterThisCommit; +} + +void +BeginSnapshotSafetyReplay(void) +{ + LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); +} + +/* + * Used in recovery when replaying commit records. On a hot standby, these + * values must be set atomically with ProcArray updates, mirroring the code + * in GetSerializableTransactionSnapshotInt. This is done by wrapping both + * in BeginSnapshotSafetyReplay/CompleteSnapshotSafetyReplay. + */ +void +SetNewestSnapshotSafety(SnapshotToken token, SnapshotSafety safety) +{ + PredXact->NewestSnapshotToken = token; + PredXact->NewestSnapshotSafety = safety; +} + +void +CompleteSnapshotSafetyReplay(void) +{ + LWLockRelease(SerializableXactHashLock); +} diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 6f9aaa52faf..33e7ace17bb 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -398,6 +398,11 @@ InitProcess(void) MyProc->syncRepState = SYNC_REP_NOT_WAITING; SHMQueueElemInit(&(MyProc->syncRepLinks)); + /* Initialize fields for SERIALIZABLE on standbys */ + MyProc->waitSnapshotToken = 0; + MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN; + SHMQueueElemInit(&(MyProc->safetyLinks)); + /* Initialize fields for group XID clearing. 
*/ MyProc->procArrayGroupMember = false; MyProc->procArrayGroupMemberXid = InvalidTransactionId; diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 083e879d5c3..da632445c68 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -143,7 +143,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 #define XLOG_XACT_ASSIGNMENT 0x50 -/* free opcode 0x60 */ +#define XLOG_XACT_SNAPSHOT_SAFETY 0x60 /* free opcode 0x70 */ /* mask for filtering opcodes out of xl_info */ @@ -164,6 +164,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XACT_XINFO_HAS_ORIGIN (1U << 5) #define XACT_XINFO_HAS_AE_LOCKS (1U << 6) #define XACT_XINFO_HAS_GID (1U << 7) +#define XACT_XINFO_HAS_SNAPSHOT_SAFETY (1U << 8) /* * Also stored in xinfo, these indicating a variety of additional actions that @@ -259,6 +260,12 @@ typedef struct xl_xact_origin TimestampTz origin_timestamp; } xl_xact_origin; +typedef struct xl_xact_snapshot_safety +{ + SnapshotToken token; + SnapshotSafety safety; +} xl_xact_snapshot_safety; + typedef struct xl_xact_commit { TimestampTz xact_time; /* time of commit */ @@ -271,6 +278,7 @@ typedef struct xl_xact_commit /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */ /* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */ /* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! 
*/ + /* xl_xact_snapshot_safety follows if XINFO_HAS_SNAPSHOT_SAFETY */ } xl_xact_commit; #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz)) @@ -318,6 +326,9 @@ typedef struct xl_xact_parsed_commit XLogRecPtr origin_lsn; TimestampTz origin_timestamp; + + SnapshotToken snapshot_token; + SnapshotSafety snapshot_safety; } xl_xact_parsed_commit; typedef xl_xact_parsed_commit xl_xact_parsed_prepare; @@ -416,6 +427,10 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nrels, RelFileNode *rels, int xactflags, TransactionId twophase_xid, const char *twophase_gid); + +extern XLogRecPtr XactLogSnapshotSafetyRecord(SnapshotToken token, + SnapshotSafety safety); + extern void xact_redo(XLogReaderState *record); /* xactdesc.c */ diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index 0a48d1cfb40..c747a126865 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -50,6 +50,18 @@ typedef uint32 TimeLineID; */ typedef uint16 RepOriginId; +/* + * Snapshot safety information used to control SERIALIZABLE on standby + * servers appears in checkpoints, so we define the types used here. 
+ */ +typedef uint64 SnapshotToken; +typedef enum SnapshotSafety +{ + SNAPSHOT_SAFE, + SNAPSHOT_UNSAFE, + SNAPSHOT_SAFETY_UNKNOWN +} SnapshotSafety; + /* * Because O_DIRECT bypasses the kernel buffers, and because we never * read those buffers except during crash recovery or if wal_level != minimal, diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h index 6a3464daa1e..a84f80d88fd 100644 --- a/src/include/storage/predicate.h +++ b/src/include/storage/predicate.h @@ -30,7 +30,6 @@ extern int max_predicate_locks_per_page; /* Number of SLRU buffers to use for predicate locking */ #define NUM_OLDSERXID_BUFFERS 16 - /* * function prototypes */ @@ -74,4 +73,11 @@ extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit); extern void predicatelock_twophase_recover(TransactionId xid, uint16 info, void *recdata, uint32 len); +/* hypothetical snapshot safety support, allowing SERIALIZABLE on standbys */ +extern void GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety); +extern void NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety); +extern void BeginSnapshotSafetyReplay(void); +extern void SetNewestSnapshotSafety(SnapshotToken token, SnapshotSafety safety); +extern void CompleteSnapshotSafetyReplay(void); + #endif /* PREDICATE_H */ diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h index 0f736d37dff..db69fba2925 100644 --- a/src/include/storage/predicate_internals.h +++ b/src/include/storage/predicate_internals.h @@ -97,6 +97,12 @@ typedef struct SERIALIZABLEXACT */ SHM_QUEUE possibleUnsafeConflicts; + /* + * for committing transactions: would a hypothetical read-only snapshot + * taken immediately after this transaction commits be safe? 
+ */ + SnapshotSafety snapshotSafetyAfterThisCommit; + TransactionId topXid; /* top level xid for the transaction, if one * exists; else invalid */ TransactionId finishedBefore; /* invalid means still running; else the @@ -123,6 +129,7 @@ typedef struct SERIALIZABLEXACT #define SXACT_FLAG_RO_UNSAFE 0x00000100 #define SXACT_FLAG_SUMMARY_CONFLICT_IN 0x00000200 #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400 +#define SXACT_FLAG_HYPOTHETICAL 0x00000800 /* * The following types are used to provide an ad hoc list for holding @@ -172,6 +179,11 @@ typedef struct PredXactListData * seq no */ SERIALIZABLEXACT *OldCommittedSxact; /* shared copy of dummy sxact */ + /* Tracking of snapshot safety on standby servers. */ + SHM_QUEUE snapshotSafetyWaitList; + SnapshotToken NewestSnapshotToken; + SnapshotSafety NewestSnapshotSafety; + PredXactListElement element; } PredXactListData; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index cb613c8076e..1497cdf66c4 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -152,6 +152,11 @@ struct PGPROC int syncRepState; /* wait state for sync rep */ SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */ + /* Info to allow standbys to wait for a safe SERIALIZABLE snapshot */ + SnapshotToken waitSnapshotToken; + SnapshotSafety snapshotSafety; /* space for result */ + SHM_QUEUE safetyLinks; /* list link for GetSafeSnapshot */ + /* * All PROCLOCK objects for locks held or awaited by this backend are * linked into one of these lists, according to the partition number of -- 2.17.0