From cd310711c1565b686416e8bf5a3a5105002e35c3 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi Date: Fri, 12 Jan 2024 11:36:41 +0900 Subject: [PATCH v9] Lift initial snapshot limit for logical replication The replication command CREATE_REPLICATION_SLOT often fails with "snapshot too large" error when numerous subtransactions are active. This issue stems from SnapBuildInitialSnapshot attempting to generate a snapshot that includes all active XIDs, including subxids, stored within the xip array. This patch mitigates the constraint by utilizing the same storing method as takenDuringRecovery. --- src/backend/replication/logical/snapbuild.c | 22 +++++++++++++-------- src/backend/storage/ipc/procarray.c | 1 + src/backend/utils/time/snapmgr.c | 15 +++++++++----- src/include/utils/snapshot.h | 3 ++- 4 files changed, 27 insertions(+), 14 deletions(-) diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index a0b7947d2f..528bdbb662 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -388,6 +388,7 @@ SnapBuildFreeSnapshot(Snapshot snap) Assert(snap->curcid == FirstCommandId); Assert(!snap->suboverflowed); Assert(!snap->takenDuringRecovery); + Assert(!snap->mixed); Assert(snap->regd_count == 0); /* slightly more likely, so it's checked even without c-asserts */ @@ -464,6 +465,7 @@ SnapBuildSnapDecRefcount(Snapshot snap) Assert(snap->curcid == FirstCommandId); Assert(!snap->suboverflowed); Assert(!snap->takenDuringRecovery); + Assert(!snap->mixed); Assert(snap->regd_count == 0); @@ -550,6 +552,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder) snapshot->suboverflowed = false; snapshot->takenDuringRecovery = false; + snapshot->mixed = false; snapshot->copied = false; snapshot->curcid = FirstCommandId; snapshot->active_count = 0; @@ -572,8 +575,8 @@ SnapBuildInitialSnapshot(SnapBuild *builder) Snapshot snap; TransactionId xid; TransactionId safeXid; - TransactionId *newxip; - int newxcnt = 0; + TransactionId *newsubxip; + int newsubxcnt = 0; Assert(XactIsoLevel == XACT_REPEATABLE_READ); Assert(builder->building_full_snapshot); @@ -616,8 +619,8 @@ SnapBuildInitialSnapshot(SnapBuild *builder) MyProc->xmin = snap->xmin; /* allocate in transaction context */ - newxip = (TransactionId *) - palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount()); + newsubxip = (TransactionId *) + palloc(sizeof(TransactionId) * GetMaxSnapshotSubxidCount()); /* * snapbuild.c builds transactions in an "inverted" manner, which means it @@ -638,12 +641,12 @@ SnapBuildInitialSnapshot(SnapBuild *builder) if (test == NULL) { - if (newxcnt >= GetMaxSnapshotXidCount()) + if (newsubxcnt >= GetMaxSnapshotSubxidCount()) ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("initial slot snapshot too large"))); - newxip[newxcnt++] = xid; + newsubxip[newsubxcnt++] = xid; } TransactionIdAdvance(xid); @@ -651,8 +654,11 @@ SnapBuildInitialSnapshot(SnapBuild *builder) /* adjust remaining snapshot fields as needed */ snap->snapshot_type = SNAPSHOT_MVCC; - snap->xcnt = newxcnt; - snap->xip = newxip; + snap->xcnt = 0; + snap->xip = NULL; + snap->subxcnt = newsubxcnt; + snap->subxip = newsubxip; + snap->mixed = true; return snap; } diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index e2f71da4a0..744ba27441 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2377,6 +2377,7 @@ GetSnapshotData(Snapshot snapshot) * those newly added transaction ids would be filtered away, so we * need not be concerned about them. */ + snapshot->mixed = true; subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin, xmax); diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 675e81d82d..6ba9fda4ef 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -205,6 +205,7 @@ typedef struct SerializedSnapshotData int32 subxcnt; bool suboverflowed; bool takenDuringRecovery; + bool mixed; CommandId curcid; TimestampTz whenTaken; XLogRecPtr lsn; @@ -519,6 +520,7 @@ SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid, sourcesnap->subxcnt * sizeof(TransactionId)); CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed; CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery; + CurrentSnapshot->mixed = sourcesnap->mixed; /* NB: curcid should NOT be copied, it's a local matter */ CurrentSnapshot->snapXactCompletionCount = 0; @@ -616,8 +618,7 @@ CopySnapshot(Snapshot snapshot) * snapshot taken during recovery; all the top-level XIDs are in subxip as * well in that case, so we mustn't lose them. */ - if (snapshot->subxcnt > 0 && - (!snapshot->suboverflowed || snapshot->takenDuringRecovery)) + if (snapshot->subxcnt > 0 && (!snapshot->suboverflowed || snapshot->mixed)) { newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff); memcpy(newsnap->subxip, snapshot->subxip, @@ -1226,6 +1227,7 @@ ExportSnapshot(Snapshot snapshot) appendStringInfo(&buf, "sxp:%u\n", children[i]); } appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery); + appendStringInfo(&buf, "mixed:%u\n", snapshot->mixed); /* * Now write the text representation into a file. We first write to a @@ -1502,6 +1504,7 @@ ImportSnapshot(const char *idstr) } snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path); + snapshot.mixed = parseIntFromText("mixed:", &filebuf, path); /* * Do some additional sanity checking, just to protect ourselves. We @@ -1706,7 +1709,7 @@ EstimateSnapshotSpace(Snapshot snapshot) size = add_size(sizeof(SerializedSnapshotData), mul_size(snapshot->xcnt, sizeof(TransactionId))); if (snapshot->subxcnt > 0 && - (!snapshot->suboverflowed || snapshot->takenDuringRecovery)) + (!snapshot->suboverflowed || snapshot->mixed)) size = add_size(size, mul_size(snapshot->subxcnt, sizeof(TransactionId))); @@ -1732,6 +1735,7 @@ SerializeSnapshot(Snapshot snapshot, char *start_address) serialized_snapshot.subxcnt = snapshot->subxcnt; serialized_snapshot.suboverflowed = snapshot->suboverflowed; serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery; + serialized_snapshot.mixed = snapshot->mixed; serialized_snapshot.curcid = snapshot->curcid; serialized_snapshot.whenTaken = snapshot->whenTaken; serialized_snapshot.lsn = snapshot->lsn; @@ -1741,7 +1745,7 @@ SerializeSnapshot(Snapshot snapshot, char *start_address) * taken during recovery - in that case, top-level XIDs are in subxip as * well, and we mustn't lose them. */ - if (serialized_snapshot.suboverflowed && !snapshot->takenDuringRecovery) + if (serialized_snapshot.suboverflowed && !snapshot->mixed) serialized_snapshot.subxcnt = 0; /* Copy struct to possibly-unaligned buffer */ @@ -1806,6 +1810,7 @@ RestoreSnapshot(char *start_address) snapshot->subxcnt = serialized_snapshot.subxcnt; snapshot->suboverflowed = serialized_snapshot.suboverflowed; snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery; + snapshot->mixed = serialized_snapshot.mixed; snapshot->curcid = serialized_snapshot.curcid; snapshot->whenTaken = serialized_snapshot.whenTaken; snapshot->lsn = serialized_snapshot.lsn; @@ -1880,7 +1885,7 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) * Snapshot information is stored slightly differently in snapshots taken * during recovery. */ - if (!snapshot->takenDuringRecovery) + if (!snapshot->mixed) { /* * If the snapshot contains full subxact data, the fastest way to diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index 8d1e31e888..85b7648932 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -181,7 +181,8 @@ typedef struct SnapshotData int32 subxcnt; /* # of xact ids in subxip[] */ bool suboverflowed; /* has the subxip array overflowed? */ - bool takenDuringRecovery; /* recovery-shaped snapshot? */ + bool takenDuringRecovery; /* taken on a standby? */ + bool mixed; /* both top and sub xids are mixed in subxip */ bool copied; /* false if it's a static snapshot */ CommandId curcid; /* in my xact, CID < curcid are visible */ -- 2.39.3