From 0df4f4ae04f8d37c623d3a533699966c3cc0479a Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Wed, 22 Mar 2017 13:36:49 +0800 Subject: [PATCH v2] Log catalog_xmin advances before removing catalog tuples Before advancing the effective catalog_xmin we use to remove old catalog tuple versions, make sure it is written to WAL. This allows standbys to know the oldest xid they can safely create a historic snapshot for. They can then refuse to start decoding from a slot or raise a recovery conflict. The catalog_xmin advance is logged in the next xl_running_xacts record, so vacuum of catalogs may be held back up to 10 seconds when a replication slot with catalog_xmin is holding down the global catalog_xmin. --- src/backend/access/heap/rewriteheap.c | 3 +- src/backend/access/rmgrdesc/standbydesc.c | 5 ++- src/backend/access/transam/varsup.c | 1 - src/backend/access/transam/xlog.c | 26 ++++++++++- src/backend/replication/logical/logical.c | 54 +++++++++++++++++++++++ src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 13 ++++++ src/backend/storage/ipc/procarray.c | 68 +++++++++++++++++++++++------ src/backend/storage/ipc/standby.c | 25 +++++++++++ src/bin/pg_controldata/pg_controldata.c | 2 + src/include/access/transam.h | 11 +++++ src/include/catalog/pg_control.h | 1 + src/include/storage/procarray.h | 3 +- src/include/storage/standby.h | 6 +++ src/include/storage/standbydefs.h | 1 + src/test/recovery/t/006_logical_decoding.pl | 15 ++++++- 16 files changed, 214 insertions(+), 22 deletions(-) diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index d7f65a5..d1400ec 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state) if (!state->rs_logical_rewrite) return; - ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin); + /* Use oldestCatalogXmin here */ + ProcArrayGetReplicationSlotXmin(NULL, 
&logical_xmin, NULL); /* * If there are no logical slots in progress we don't need to do anything, diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c index 278546a..4aaae59 100644 --- a/src/backend/access/rmgrdesc/standbydesc.c +++ b/src/backend/access/rmgrdesc/standbydesc.c @@ -21,10 +21,11 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec) { int i; - appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u", + appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u oldestCatalogXmin %u", xlrec->nextXid, xlrec->latestCompletedXid, - xlrec->oldestRunningXid); + xlrec->oldestRunningXid, + xlrec->oldestCatalogXmin); if (xlrec->xcnt > 0) { appendStringInfo(buf, "; %d xacts:", xlrec->xcnt); diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 5efbfbd..4babdf9 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -414,7 +414,6 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid) } } - /* * ForceTransactionIdLimitUpdate -- does the XID wrap-limit data need updating? 
* diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 5d58f09..19e0116 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5021,6 +5021,7 @@ BootStrapXLOG(void) MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); AdvanceOldestClogXid(checkPoint.oldestXid); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); + ShmemVariableCache->oldestCatalogXmin = checkPoint.oldestCatalogXmin; SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true); SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId); @@ -6611,6 +6612,9 @@ StartupXLOG(void) (errmsg_internal("oldest unfrozen transaction ID: %u, in database %u", checkPoint.oldestXid, checkPoint.oldestXidDB))); ereport(DEBUG1, + (errmsg_internal("oldest catalog-only transaction ID: %u", + checkPoint.oldestCatalogXmin))); + ereport(DEBUG1, (errmsg_internal("oldest MultiXactId: %u, in database %u", checkPoint.oldestMulti, checkPoint.oldestMultiDB))); ereport(DEBUG1, @@ -6628,6 +6632,7 @@ StartupXLOG(void) MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); AdvanceOldestClogXid(checkPoint.oldestXid); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); + ShmemVariableCache->oldestCatalogXmin = checkPoint.oldestCatalogXmin; SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true); SetCommitTsLimit(checkPoint.oldestCommitTsXid, checkPoint.newestCommitTsXid); @@ -8704,6 +8709,10 @@ CreateCheckPoint(int flags) checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB; LWLockRelease(XidGenLock); + LWLockAcquire(ProcArrayLock, LW_SHARED); + checkPoint.oldestCatalogXmin = ShmemVariableCache->oldestCatalogXmin; + LWLockRelease(ProcArrayLock); + LWLockAcquire(CommitTsLock, LW_SHARED); checkPoint.oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid; checkPoint.newestCommitTsXid = ShmemVariableCache->newestCommitTsXid; @@ -9633,6 +9642,12 @@ 
xlog_redo(XLogReaderState *record) SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); /* + * There can be no concurrent writers to oldestCatalogXmin during + * recovery, so no need to take ProcArrayLock. + */ + ShmemVariableCache->oldestCatalogXmin = checkPoint.oldestCatalogXmin; + + /* * If we see a shutdown checkpoint while waiting for an end-of-backup * record, the backup was canceled and the end-of-backup record will * never arrive. @@ -9729,8 +9744,15 @@ xlog_redo(XLogReaderState *record) checkPoint.oldestMultiDB); if (TransactionIdPrecedes(ShmemVariableCache->oldestXid, checkPoint.oldestXid)) - SetTransactionIdLimit(checkPoint.oldestXid, - checkPoint.oldestXidDB); + SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); + + + /* + * There can be no concurrent writers to oldestCatalogXmin during + * recovery, so no need to take ProcArrayLock. + */ + ShmemVariableCache->oldestCatalogXmin = checkPoint.oldestCatalogXmin; + /* ControlFile->checkPointCopy always tracks the latest ckpt XID */ ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch; ControlFile->checkPointCopy.nextXid = checkPoint.nextXid; diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5529ac8..76155bf 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -68,6 +68,8 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); +static void EnsureActiveLogicalSlotValid(void); + /* * Make sure the current settings & environment are capable of doing logical * decoding. @@ -218,6 +220,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlot *slot; LogicalDecodingContext *ctx; MemoryContext old_context; + bool force_standby_snapshot; /* shorter lines... 
*/ slot = MyReplicationSlot; @@ -276,8 +279,21 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotsComputeRequiredXmin(true); + /* + * If this is the first slot created on the master we won't have a + * persistent record of the oldest safe xid for historic snapshots yet. + * Force one to be recorded so that when we go to replay from this slot we + * know it's safe. + */ + force_standby_snapshot = + !TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin); + LWLockRelease(ProcArrayLock); + /* Update ShmemVariableCache->oldestCatalogXmin */ + if (force_standby_snapshot) + LogStandbySnapshot(); + /* * tell the snapshot builder to only assemble snapshot once reaching the * running_xact's record with the respective xmin. @@ -376,6 +392,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, start_lsn = slot->data.confirmed_flush; } + EnsureActiveLogicalSlotValid(); + ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, read_page, prepare_write, do_write); @@ -963,3 +981,39 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); } } + +/* + * Test to see if the active logical slot is usable. + */ +static void +EnsureActiveLogicalSlotValid(void) +{ + TransactionId shmem_catalog_xmin; + + Assert(MyReplicationSlot != NULL); + + /* + * A logical slot can become unusable if we're doing logical decoding on a + * standby or using a slot created before we were promoted from standby + * to master. If the master advanced its global catalog_xmin past the + * threshold we need, it could've removed catalog tuple versions that + * we'll require to start decoding at our restart_lsn. + * + * We need a barrier so that if we decode in recovery on a standby we + * don't allow new decoding sessions to start after redo has advanced + * the threshold. 
+ */ + if (RecoveryInProgress()) + pg_memory_barrier(); + + shmem_catalog_xmin = ShmemVariableCache->oldestCatalogXmin; + + if (!TransactionIdIsValid(shmem_catalog_xmin) || + TransactionIdFollows(shmem_catalog_xmin, MyReplicationSlot->data.catalog_xmin)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot '%s' requires catalogs removed by master", + NameStr(MyReplicationSlot->data.name)), + errdetail("need catalog_xmin %u, have oldestCatalogXmin %u", + MyReplicationSlot->data.catalog_xmin, shmem_catalog_xmin))); +} diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 771ac30..c2ad791 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1233,7 +1233,7 @@ XLogWalRcvSendHSFeedback(bool immed) xmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN); - ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin); + ProcArrayGetReplicationSlotXmin(&slot_xmin, NULL, &catalog_xmin); if (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedes(slot_xmin, xmin)) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index cfc3fba..cdc5f95 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1658,6 +1658,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) * be energy wasted - the worst lost information can do here is give us * wrong information in a statistics view - we'll just potentially be more * conservative in removing files. + * + * We don't have to do any effective_xmin / effective_catalog_xmin testing + * here either, like for LogicalConfirmReceivedLocation. 
If we received + * the xmin and catalog_xmin from downstream replication slots we know they + * were already confirmed there. */ } @@ -1778,6 +1783,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbac slot->data.xmin = feedbackXmin; slot->effective_xmin = feedbackXmin; } + /* + * If the physical slot is relaying catalog_xmin for logical replication + * slots on the replica it's safe to act on catalog_xmin advances + * immediately too. The replica will only send a new catalog_xmin via + * feedback when it advances its effective_catalog_xmin, so it's done the + * delay-until-confirmed dance for us and knows it won't need the data + * we're protecting from vacuum again. + */ if (!TransactionIdIsNormal(slot->data.catalog_xmin) || !TransactionIdIsNormal(feedbackCatalogXmin) || TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin)) diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 7c2e1e1..a5b26dd 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -87,7 +87,11 @@ typedef struct ProcArrayStruct /* oldest xmin of any replication slot */ TransactionId replication_slot_xmin; - /* oldest catalog xmin of any replication slot */ + /* + * Oldest catalog xmin of any replication slot + * + * See also ShmemVariableCache->oldestCatalogXmin + */ TransactionId replication_slot_catalog_xmin; /* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */ @@ -679,6 +683,18 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid); /* + * Update our knowledge of the oldest xid we can safely create historic + * snapshots for. + * + * There can be no concurrent writers to oldestCatalogXmin during + * recovery, so no need to take ProcArrayLock. + * + * If we allow logical decoding on standbys in future we must raise + * recovery conflicts with catalog_xmin advances here. 
+ */ + ShmemVariableCache->oldestCatalogXmin = running->pendingOldestCatalogXmin; + + /* * Remove stale locks, if any. * * Locks are always assigned to the toplevel xid so we don't need to care @@ -1306,6 +1322,9 @@ TransactionIdIsActive(TransactionId xid) * The return value is also adjusted with vacuum_defer_cleanup_age, so * increasing that setting on the fly is another easy way to make * GetOldestXmin() move backwards, with no consequences for data integrity. + * + * When changing GetOldestXmin, check to see whether RecentGlobalXmin + * computation in GetSnapshotData also needs changing. */ TransactionId GetOldestXmin(Relation rel, int flags) @@ -1493,7 +1512,8 @@ GetMaxSnapshotSubxidCount(void) * older than this are known not running any more. * RecentGlobalXmin: the global xmin (oldest TransactionXmin across all * running transactions, except those running LAZY VACUUM). This is - * the same computation done by GetOldestXmin(true, true). + * the same computation done by + * GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT|PROCARRAY_FLAGS_VACUUM) * RecentGlobalDataXmin: the global xmin for non-catalog tables * >= RecentGlobalXmin * @@ -1700,7 +1720,7 @@ GetSnapshotData(Snapshot snapshot) /* fetch into volatile var while ProcArrayLock is held */ replication_slot_xmin = procArray->replication_slot_xmin; - replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + replication_slot_catalog_xmin = ShmemVariableCache->oldestCatalogXmin; if (!TransactionIdIsValid(MyPgXact->xmin)) MyPgXact->xmin = TransactionXmin = xmin; @@ -1711,6 +1731,9 @@ GetSnapshotData(Snapshot snapshot) * Update globalxmin to include actual process xids. This is a slightly * different way of computing it than GetOldestXmin uses, but should give * the same result. + * + * If you change computation of RecentGlobalXmin here you may need to + * change GetOldestXmin(...) as well. 
*/ if (TransactionIdPrecedes(xmin, globalxmin)) globalxmin = xmin; @@ -2041,12 +2064,16 @@ GetRunningTransactionData(void) } /* - * It's important *not* to include the limits set by slots here because + * It's important *not* to include the xmin set by slots here because * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those * were to be included here the initial value could never increase because * of a circular dependency where slots only increase their limits when * running xacts increases oldestRunningXid and running xacts only * increases if slots do. + * + * We can include the catalog_xmin limit here; there's no similar + * circularity, and we need it to log xl_running_xacts records for + * standbys. */ CurrentRunningXacts->xcnt = count - subcount; @@ -2055,6 +2082,8 @@ GetRunningTransactionData(void) CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid; CurrentRunningXacts->oldestRunningXid = oldestRunningXid; CurrentRunningXacts->latestCompletedXid = latestCompletedXid; + CurrentRunningXacts->pendingOldestCatalogXmin = + procArray->replication_slot_catalog_xmin; Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid)); Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid)); @@ -2168,14 +2197,14 @@ GetOldestSafeDecodingTransactionId(void) oldestSafeXid = ShmemVariableCache->nextXid; /* - * If there's already a slot pegging the xmin horizon, we can start with - * that value, it's guaranteed to be safe since it's computed by this - * routine initially and has been enforced since. + * If there's already an effectiveCatalogXmin held down by an existing + * replication slot it's definitely safe to start there, and it can't + * advance while we hold ProcArrayLock. 
*/ - if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) && - TransactionIdPrecedes(procArray->replication_slot_catalog_xmin, + if (TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) && + TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin, oldestSafeXid)) - oldestSafeXid = procArray->replication_slot_catalog_xmin; + oldestSafeXid = ShmemVariableCache->oldestCatalogXmin; /* * If we're not in recovery, we walk over the procarray and collect the @@ -2965,18 +2994,31 @@ ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, * * Return the current slot xmin limits. That's useful to be able to remove * data that's older than those limits. + * + * For logical replication slots' catalog_xmin, we return both the effective + * catalog_xmin being used for tuple removal (retained catalog_xmin) and the + * catalog_xmin actually needed by replication slots (needed_catalog_xmin). + * + * retained_catalog_xmin should be older than needed_catalog_xmin but is not + * guaranteed to be if there are replication slots on a replica currently + * attempting to start up and reserve catalogs, outdated replicas sending + * feedback, etc. 
*/ void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, - TransactionId *catalog_xmin) + TransactionId *retained_catalog_xmin, + TransactionId *needed_catalog_xmin) { LWLockAcquire(ProcArrayLock, LW_SHARED); if (xmin != NULL) *xmin = procArray->replication_slot_xmin; - if (catalog_xmin != NULL) - *catalog_xmin = procArray->replication_slot_catalog_xmin; + if (retained_catalog_xmin != NULL) + *retained_catalog_xmin = ShmemVariableCache->oldestCatalogXmin; + + if (needed_catalog_xmin != NULL) + *needed_catalog_xmin = procArray->replication_slot_catalog_xmin; LWLockRelease(ProcArrayLock); } diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 8e57f93..819abf7 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -45,6 +45,7 @@ static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlis static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason); static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts); static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks); +static void UpdateOldestCatalogXmin(TransactionId pendingOldestCatalogXmin); /* @@ -822,6 +823,7 @@ standby_redo(XLogReaderState *record) running.latestCompletedXid = xlrec->latestCompletedXid; running.oldestRunningXid = xlrec->oldestRunningXid; running.xids = xlrec->xids; + running.pendingOldestCatalogXmin = xlrec->oldestCatalogXmin; ProcArrayApplyRecoveryInfo(&running); } @@ -953,12 +955,24 @@ LogStandbySnapshot(void) /* GetRunningTransactionData() acquired XidGenLock, we must release it */ LWLockRelease(XidGenLock); + /* + * Now that we've recorded our intention to allow cleanup of catalog tuples + * no longer needed by our replication slots we can make the new threshold + * effective for vacuum etc. + */ + UpdateOldestCatalogXmin(running->pendingOldestCatalogXmin); + return recptr; } /* * Record an enhanced snapshot of running transactions into WAL. 
* + * We also record the value of procArray->replication_slot_catalog_xmin + * obtained from GetRunningTransactionData here, so standbys know we're about + * to advance ShmemVariableCache->oldestCatalogXmin to its value and start + * removing dead catalog tuples below that threshold. + * * The definitions of RunningTransactionsData and xl_xact_running_xacts are * similar. We keep them separate because xl_xact_running_xacts is a * contiguous chunk of memory and never exists fully until it is assembled in @@ -977,6 +991,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts) xlrec.nextXid = CurrRunningXacts->nextXid; xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid; xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid; + xlrec.oldestCatalogXmin = CurrRunningXacts->pendingOldestCatalogXmin; /* Header */ XLogBeginInsert(); @@ -1021,6 +1036,16 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts) return recptr; } +static void +UpdateOldestCatalogXmin(TransactionId pendingOldestCatalogXmin) +{ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + if (TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin, pendingOldestCatalogXmin) + || (TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) != TransactionIdIsValid(pendingOldestCatalogXmin))) + ShmemVariableCache->oldestCatalogXmin = pendingOldestCatalogXmin; + LWLockRelease(ProcArrayLock); +} + /* * Wholesale logging of AccessExclusiveLocks. Other lock types need not be * logged, as described in backend/storage/lmgr/README. 
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index 2ea8931..5c7eb77 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -248,6 +248,8 @@ main(int argc, char *argv[]) ControlFile->checkPointCopy.oldestCommitTsXid); printf(_("Latest checkpoint's newestCommitTsXid:%u\n"), ControlFile->checkPointCopy.newestCommitTsXid); + printf(_("Latest checkpoint's oldestCatalogXmin:%u\n"), + ControlFile->checkPointCopy.oldestCatalogXmin); printf(_("Time of latest checkpoint: %s\n"), ckpttime_str); printf(_("Fake LSN counter for unlogged rels: %X/%X\n"), diff --git a/src/include/access/transam.h b/src/include/access/transam.h index d25a2dd..a4ecfb7 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -136,6 +136,17 @@ typedef struct VariableCacheData * aborted */ /* + * This field is protected by ProcArrayLock except + * during recovery, when it's set unlocked. + * + * oldestCatalogXmin is the oldest xid it is + * guaranteed to be safe to create a historic + * snapshot for. 
See also + * procArray->replication_slot_catalog_xmin + */ + TransactionId oldestCatalogXmin; + + /* * These fields are protected by CLogTruncationLock */ TransactionId oldestClogXid; /* oldest it's safe to look up in clog */ diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 3a25cc8..1fe89ae 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -45,6 +45,7 @@ typedef struct CheckPoint MultiXactOffset nextMultiOffset; /* next free MultiXact offset */ TransactionId oldestXid; /* cluster-wide minimum datfrozenxid */ Oid oldestXidDB; /* database with minimum datfrozenxid */ + TransactionId oldestCatalogXmin; /* catalog retained after this xid */ MultiXactId oldestMulti; /* cluster-wide minimum datminmxid */ Oid oldestMultiDB; /* database with minimum datminmxid */ pg_time_t time; /* time stamp of checkpoint */ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 9b42e49..05ace64 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -120,6 +120,7 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked); extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, - TransactionId *catalog_xmin); + TransactionId *retained_catalog_xmin, + TransactionId *needed_catalog_xmin); #endif /* PROCARRAY_H */ diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 3ecc446..7756a27 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -65,6 +65,10 @@ extern void StandbyReleaseOldLocks(int nxids, TransactionId *xids); * is written to WAL as a separate record immediately after each * checkpoint. That means that wherever we start a standby from we will * almost immediately see the data we need to begin executing queries. 
+ * + * Information about the oldest catalog_xmin needed by any replication slot is + * also included here, so we can use it to update the catalog tuple removal + * limit and convey the new limit to standbys. */ typedef struct RunningTransactionsData @@ -75,6 +79,8 @@ typedef struct RunningTransactionsData TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */ TransactionId oldestRunningXid; /* *not* oldestXmin */ TransactionId latestCompletedXid; /* so we can set xmax */ + /* so we can update ShmemVariableCache->oldestCatalogXmin: */ + TransactionId pendingOldestCatalogXmin; TransactionId *xids; /* array of (sub)xids still running */ } RunningTransactionsData; diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h index f8444c7..6153675 100644 --- a/src/include/storage/standbydefs.h +++ b/src/include/storage/standbydefs.h @@ -52,6 +52,7 @@ typedef struct xl_running_xacts TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */ TransactionId oldestRunningXid; /* *not* oldestXmin */ TransactionId latestCompletedXid; /* so we can set xmax */ + TransactionId oldestCatalogXmin; /* oldest safe historic snapshot */ TransactionId xids[FLEXIBLE_ARRAY_MEMBER]; } xl_running_xacts; diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index bf9b50a..2cfa9ac 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -7,7 +7,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 16; +use Test::More tests => 25; # Initialize master node my $node_master = get_new_node('master'); @@ -17,6 +17,10 @@ $node_master->append_conf( wal_level = logical )); $node_master->start; + +command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m, + "pg_controldata's oldestCatalogXmin is zero after start"); + my $backup_name = 'master_backup'; $node_master->safe_psql('postgres', 
qq[CREATE TABLE decoding_test(x integer, y text);]); @@ -96,9 +100,18 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0', 'restored slot catalog_xmin is nonzero'); is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3, 'reading from slot with wal_level < logical fails'); +command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m, + "pg_controldata's oldestCatalogXmin is nonzero"); is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0, 'can drop logical slot while wal_level = replica'); is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped'); +$node_master->safe_psql('postgres', 'VACUUM;'); +# First checkpoint forces xl_running_xacts with the new oldestCatalogXmin +$node_master->safe_psql('postgres', 'CHECKPOINT;'); +# Then we need a second checkpoint to write the control file with the new value +$node_master->safe_psql('postgres', 'CHECKPOINT;'); +command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m, + "pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint"); # done with the node $node_master->stop; -- 2.5.5