From 3e4e9c8965d6109f71318a337c1f1e10f2ab67b6 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Sun, 23 Jun 2024 16:18:21 +0000 Subject: [PATCH v41 2/2] Add XID age based replication slot invalidation Till now, postgres has the ability to invalidate inactive replication slots based on the amount of WAL (set via max_slot_wal_keep_size GUC) that will be needed for the slots in case they become active. However, choosing a default value for max_slot_wal_keep_size is tricky. Because the amount of WAL a customer generates, and their allocated storage will vary greatly in production, making it difficult to pin down a one-size-fits-all value. It is often easy for developers to set an XID age (age of slot's xmin or catalog_xmin) of say 1 or 1.5 billion, after which the slots get invalidated. To achieve the above, postgres introduces a GUC allowing users set slot XID age. The replication slots whose xmin or catalog_xmin has reached the age specified by this setting get invalidated. The invalidation check happens at various locations to help being as latest as possible, these locations include the following: - Whenever the slot is acquired and the slot acquisition errors out if invalidated. - During checkpoint - During vacuum (both command-based and autovacuum) Author: Bharath Rupireddy Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com Discussion: https://www.postgresql.org/message-id/20240327150557.GA3994937%40nathanxps13 Discussion: https://www.postgresql.org/message-id/CA%2BTgmoaRECcnyqxAxUhP5dk2S4HX%3DpGh-p-PkA3uc%2BjG_9hiMw%40mail.gmail.com --- doc/src/sgml/config.sgml | 26 ++ doc/src/sgml/system-views.sgml | 8 + src/backend/commands/vacuum.c | 80 +++++ src/backend/replication/slot.c | 151 +++++++- src/backend/utils/misc/guc_tables.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/replication/slot.h | 3 + src/test/recovery/t/050_invalidate_slots.pl | 321 +++++++++++++++++- 8 files changed, 583 insertions(+), 17 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 5e7a81a1fd..20d800ce0c 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4587,6 +4587,32 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + replication_slot_xid_age (integer) + + replication_slot_xid_age configuration parameter + + + + + Invalidate replication slots whose xmin (the oldest + transaction that this slot needs the database to retain) or + catalog_xmin (the oldest transaction affecting the + system catalogs that this slot needs the database to retain) has reached + the age specified by this setting. A value of zero (which is default) + disables this feature. Users can set this value anywhere from zero to + two billion. This parameter can only be set in the + postgresql.conf file or on the server command + line. + + + + This invalidation check happens either when the slot is acquired + for use or during vacuum or during checkpoint. + + + + track_commit_timestamp (boolean) diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 4867af1b61..0490f9f156 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2587,6 +2587,14 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx parameter. + + + xid_aged means that the slot's + xmin or catalog_xmin + has reached the age specified by + parameter. + + diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 48f8eab202..9eeb42ac27 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -47,6 +47,7 @@ #include "postmaster/autovacuum.h" #include "postmaster/bgworker_internals.h" #include "postmaster/interrupt.h" +#include "replication/slot.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/pmsignal.h" @@ -116,6 +117,7 @@ static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, static double compute_parallel_delay(void); static VacOptValue get_vacoptval_from_boolean(DefElem *def); static bool vac_tid_reaped(ItemPointer itemptr, void *state); +static void try_replication_slot_invalidation(void); /* * GUC check function to ensure GUC value specified is within the allowable @@ -452,6 +454,75 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel) MemoryContextDelete(vac_context); } +/* + * Try invalidating replication slots based on current replication slot xmin + * limits once every vacuum cycle. + */ +static void +try_replication_slot_invalidation(void) +{ + TransactionId min_slot_xmin; + TransactionId min_slot_catalog_xmin; + bool can_invalidate = false; + TransactionId cutoff; + TransactionId curr; + + curr = ReadNextTransactionId(); + + /* + * The cutoff can tell how far we can go back from the current transaction + * id till the age. And then, we check whether or not the xmin or + * catalog_xmin falls within the cutoff; if yes, return true, otherwise + * false. + */ + cutoff = curr - replication_slot_xid_age; + + if (!TransactionIdIsNormal(cutoff)) + cutoff = FirstNormalTransactionId; + + ProcArrayGetReplicationSlotXmin(&min_slot_xmin, &min_slot_catalog_xmin); + + /* + * Current replication slot xmin limits can never be larger than the + * current transaction id even in the case of transaction ID wraparound. + */ + Assert(min_slot_xmin <= curr); + Assert(min_slot_catalog_xmin <= curr); + + if (TransactionIdIsNormal(min_slot_xmin) && + TransactionIdPrecedesOrEquals(min_slot_xmin, cutoff)) + can_invalidate = true; + else if (TransactionIdIsNormal(min_slot_catalog_xmin) && + TransactionIdPrecedesOrEquals(min_slot_catalog_xmin, cutoff)) + can_invalidate = true; + + if (can_invalidate) + { + bool invalidated = false; + + /* + * Note that InvalidateObsoleteReplicationSlots is also called as part + * of CHECKPOINT, and emitting ERRORs from within is avoided already. + * Therefore, there is no concern here that any ERROR from + * invalidating replication slots blocks VACUUM. + */ + invalidated = InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, + 0, + InvalidOid, + InvalidTransactionId); + + if (invalidated) + { + /* + * If any slots have been invalidated, recalculate the resource + * limits. + */ + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + } + } +} + /* * Internal entry point for autovacuum and the VACUUM / ANALYZE commands. * @@ -483,6 +554,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, const char *stmttype; volatile bool in_outer_xact, use_own_xacts; + static bool first_time = true; Assert(params != NULL); @@ -594,6 +666,14 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, CommitTransactionCommand(); } + if (params->options & VACOPT_VACUUM && + first_time && + replication_slot_xid_age > 0) + { + try_replication_slot_invalidation(); + first_time = false; + } + /* Turn vacuum cost accounting on or off, and set/clear in_vacuum */ PG_TRY(); { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index e80872f27b..79ac412d8e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -108,10 +108,11 @@ const char *const SlotInvalidationCauses[] = { [RS_INVAL_HORIZON] = "rows_removed", [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient", [RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout", + [RS_INVAL_XID_AGE] = "xid_aged", }; /* Maximum number of invalidation causes */ -#define RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT +#define RS_INVAL_MAX_CAUSES RS_INVAL_XID_AGE StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1), "array length mismatch"); @@ -142,6 +143,7 @@ ReplicationSlot *MyReplicationSlot = NULL; int max_replication_slots = 10; /* the maximum number of replication * slots */ int replication_slot_inactive_timeout = 0; +int replication_slot_xid_age = 0; /* * This GUC lists streaming replication standby server slot names that @@ -160,6 +162,9 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr; static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropPtr(ReplicationSlot *slot); +static bool ReplicationSlotIsXIDAged(ReplicationSlot *slot, + TransactionId *xmin, + TransactionId *catalog_xmin); static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReplicationSlot *s, @@ -636,8 +641,8 @@ retry: * it gets invalidated now or has been invalidated previously, because * there's no use in acquiring the invalidated slot. * - * XXX: Currently we check for inactive_timeout invalidation here. We - * might need to check for other invalidations too. + * XXX: Currently we check for inactive_timeout and xid_aged invalidations + * here. We might need to check for other invalidations too. */ if (check_for_invalidation) { @@ -648,6 +653,22 @@ retry: InvalidTransactionId, &invalidated); + if (!invalidated && released_lock) + { + /* The slot is still ours */ + Assert(s->active_pid == MyProcPid); + + /* Reacquire the ControlLock */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + released_lock = false; + } + + if (!invalidated) + released_lock = InvalidatePossiblyObsoleteSlot(RS_INVAL_XID_AGE, + s, 0, InvalidOid, + InvalidTransactionId, + &invalidated); + /* * If the slot has been invalidated, recalculate the resource limits. */ @@ -657,7 +678,8 @@ retry: ReplicationSlotsComputeRequiredLSN(); } - if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT) + if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT || + s->data.invalidated == RS_INVAL_XID_AGE) { /* * Release the lock if it's not yet to keep the cleanup path on @@ -665,7 +687,10 @@ retry: */ if (!released_lock) LWLockRelease(ReplicationSlotControlLock); + } + if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT) + { Assert(s->inactive_since > 0); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -675,6 +700,20 @@ retry: timestamptz_to_str(s->inactive_since), replication_slot_inactive_timeout))); } + + if (s->data.invalidated == RS_INVAL_XID_AGE) + { + Assert(TransactionIdIsValid(s->data.xmin) || + TransactionIdIsValid(s->data.catalog_xmin)); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("can no longer get changes from replication slot \"%s\"", + NameStr(s->data.name)), + errdetail("The slot's xmin %u or catalog_xmin %u has reached the age %d specified by \"replication_slot_xid_age\".", + s->data.xmin, + s->data.catalog_xmin, + replication_slot_xid_age))); + } } if (!released_lock) @@ -1546,7 +1585,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, - TimestampTz inactive_since) + TimestampTz inactive_since, + TransactionId xmin, + TransactionId catalog_xmin) { StringInfoData err_detail; bool hint = false; @@ -1583,6 +1624,20 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, timestamptz_to_str(inactive_since), replication_slot_inactive_timeout); break; + case RS_INVAL_XID_AGE: + Assert(TransactionIdIsValid(xmin) || + TransactionIdIsValid(catalog_xmin)); + + if (TransactionIdIsValid(xmin)) + appendStringInfo(&err_detail, _("The slot's xmin %u has reached the age %d specified by \"replication_slot_xid_age\"."), + xmin, + replication_slot_xid_age); + else if (TransactionIdIsValid(catalog_xmin)) + appendStringInfo(&err_detail, _("The slot's catalog_xmin %u has reached the age %d specified by \"replication_slot_xid_age\"."), + catalog_xmin, + replication_slot_xid_age); + + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1627,6 +1682,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr; ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE; TimestampTz inactive_since = 0; + TransactionId aged_xmin = InvalidTransactionId; + TransactionId aged_catalog_xmin = InvalidTransactionId; for (;;) { @@ -1743,6 +1800,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, Assert(s->active_pid == 0); } break; + case RS_INVAL_XID_AGE: + if (ReplicationSlotIsXIDAged(s, &aged_xmin, &aged_catalog_xmin)) + { + Assert(TransactionIdIsValid(aged_xmin) || + TransactionIdIsValid(aged_catalog_xmin)); + + invalidation_cause = cause; + break; + } + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1831,7 +1898,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReportSlotInvalidation(invalidation_cause, true, active_pid, slotname, restart_lsn, oldestLSN, snapshotConflictHorizon, - inactive_since); + inactive_since, aged_xmin, + aged_catalog_xmin); if (MyBackendType == B_STARTUP) (void) SendProcSignal(active_pid, @@ -1878,7 +1946,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReportSlotInvalidation(invalidation_cause, false, active_pid, slotname, restart_lsn, oldestLSN, snapshotConflictHorizon, - inactive_since); + inactive_since, aged_xmin, + aged_catalog_xmin); /* done with this slot for now */ break; @@ -1902,6 +1971,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * db; dboid may be InvalidOid for shared relations * - RS_INVAL_WAL_LEVEL: is logical * - RS_INVAL_INACTIVE_TIMEOUT: inactive timeout occurs + * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ @@ -2033,14 +2103,20 @@ CheckPointReplicationSlots(bool is_shutdown) * * - Avoid saving slot info to disk two times for each invalidated slot. * - * XXX: Should we move inactive_timeout inavalidation check closer to - * wal_removed in CreateCheckPoint and CreateRestartPoint? + * XXX: Should we move inactive_timeout and xid_aged inavalidation checks + * closer to wal_removed in CreateCheckPoint and CreateRestartPoint? */ invalidated = InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0, InvalidOid, InvalidTransactionId); + if (!invalidated) + invalidated = InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, + 0, + InvalidOid, + InvalidTransactionId); + if (invalidated) { /* @@ -2052,6 +2128,63 @@ CheckPointReplicationSlots(bool is_shutdown) } } +/* + * Returns true if the given replication slot's xmin or catalog_xmin age is + * more than replication_slot_xid_age. + * + * Note that the caller must hold the replication slot's spinlock to avoid + * race conditions while this function reads xmin and catalog_xmin. + */ +static bool +ReplicationSlotIsXIDAged(ReplicationSlot *slot, TransactionId *xmin, + TransactionId *catalog_xmin) +{ + TransactionId cutoff; + TransactionId curr; + + if (replication_slot_xid_age == 0) + return false; + + curr = ReadNextTransactionId(); + + /* + * Replication slot's xmin and catalog_xmin can never be larger than the + * current transaction id even in the case of transaction ID wraparound. + */ + Assert(slot->data.xmin <= curr); + Assert(slot->data.catalog_xmin <= curr); + + /* + * The cutoff can tell how far we can go back from the current transaction + * id till the age. And then, we check whether or not the xmin or + * catalog_xmin falls within the cutoff; if yes, return true, otherwise + * false. + */ + cutoff = curr - replication_slot_xid_age; + + if (!TransactionIdIsNormal(cutoff)) + cutoff = FirstNormalTransactionId; + + *xmin = InvalidTransactionId; + *catalog_xmin = InvalidTransactionId; + + if (TransactionIdIsNormal(slot->data.xmin) && + TransactionIdPrecedesOrEquals(slot->data.xmin, cutoff)) + { + *xmin = slot->data.xmin; + return true; + } + + if (TransactionIdIsNormal(slot->data.catalog_xmin) && + TransactionIdPrecedesOrEquals(slot->data.catalog_xmin, cutoff)) + { + *catalog_xmin = slot->data.catalog_xmin; + return true; + } + + return false; +} + /* * Load all replication slots from disk into memory at server startup. This * needs to be run before we start crash recovery. diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 4990e73c97..ca210c6bf9 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -2973,6 +2973,16 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"replication_slot_xid_age", PGC_SIGHUP, REPLICATION_SENDING, + gettext_noop("Age of the transaction ID at which a replication slot gets invalidated."), + gettext_noop("The transaction is the oldest transaction (including the one affecting the system catalogs) that a replication slot needs the database to retain.") + }, + &replication_slot_xid_age, + 0, 0, 2000000000, + NULL, NULL, NULL + }, + { {"commit_delay", PGC_SUSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 535fb07385..f04771d65c 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -336,6 +336,7 @@ #track_commit_timestamp = off # collect timestamp of transaction commit # (change requires restart) #replication_slot_inactive_timeout = 0 # in seconds; 0 disables +#replication_slot_xid_age = 0 # - Primary Server - diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 56d20e1a78..e757b836c5 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -55,6 +55,8 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_WAL_LEVEL, /* inactive slot timeout has occurred */ RS_INVAL_INACTIVE_TIMEOUT, + /* slot's xmin or catalog_xmin has reached the age */ + RS_INVAL_XID_AGE, } ReplicationSlotInvalidationCause; extern PGDLLIMPORT const char *const SlotInvalidationCauses[]; @@ -233,6 +235,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; extern PGDLLIMPORT int max_replication_slots; extern PGDLLIMPORT char *standby_slot_names; extern PGDLLIMPORT int replication_slot_inactive_timeout; +extern PGDLLIMPORT int replication_slot_xid_age; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl index 4663019c16..18300cfeca 100644 --- a/src/test/recovery/t/050_invalidate_slots.pl +++ b/src/test/recovery/t/050_invalidate_slots.pl @@ -89,7 +89,7 @@ $primary->reload; # that nobody has acquired that slot yet, so due to # replication_slot_inactive_timeout setting above it must get invalidated. wait_for_slot_invalidation($primary, 'lsub1_sync_slot', $logstart, - $inactive_timeout); + $inactive_timeout, 'inactive_timeout'); # Set timeout on the standby also to check the synced slots don't get # invalidated due to timeout on the standby. @@ -129,7 +129,7 @@ $standby1->stop; # Wait for the standby's replication slot to become inactive wait_for_slot_invalidation($primary, 'sb1_slot', $logstart, - $inactive_timeout); + $inactive_timeout, 'inactive_timeout'); # Testcase end: Invalidate streaming standby's slot as well as logical failover # slot on primary due to replication_slot_inactive_timeout. Also, check the @@ -197,15 +197,280 @@ $subscriber->stop; # Wait for the replication slot to become inactive and then invalidated due to # timeout. wait_for_slot_invalidation($publisher, 'lsub1_slot', $logstart, - $inactive_timeout); + $inactive_timeout, 'inactive_timeout'); # Testcase end: Invalidate logical subscriber's slot due to # replication_slot_inactive_timeout. # ============================================================================= +# ============================================================================= +# Testcase start: Invalidate streaming standby's slot due to replication_slot_xid_age +# GUC. + +# Prepare for the next test +$primary->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET replication_slot_inactive_timeout TO '0'; +]); +$primary->reload; + +# Create a standby linking to the primary using the replication slot +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup($primary, $backup_name, has_streaming => 1); + +# Enable hs_feedback. The slot should gain an xmin. We set the status interval +# so we'll see the results promptly. +$standby2->append_conf( + 'postgresql.conf', q{ +primary_slot_name = 'sb2_slot' +hot_standby_feedback = on +wal_receiver_status_interval = 1 +}); + +$primary->safe_psql( + 'postgres', qq[ + SELECT pg_create_physical_replication_slot(slot_name := 'sb2_slot', immediately_reserve := true); +]); + +$standby2->start; + +# Create some content on primary to move xmin +$primary->safe_psql('postgres', + "CREATE TABLE tab_int AS SELECT generate_series(1,10) AS a"); + +# Wait until standby has replayed enough data +$primary->wait_for_catchup($standby2); + +$primary->poll_query_until( + 'postgres', qq[ + SELECT xmin IS NOT NULL AND catalog_xmin IS NULL + FROM pg_catalog.pg_replication_slots + WHERE slot_name = 'sb2_slot'; +]) or die "Timed out waiting for slot sb2_slot xmin to advance"; + +$primary->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET replication_slot_xid_age = 500; +]); +$primary->reload; + +# Stop standby to make the replication slot's xmin on primary to age +$standby2->stop; + +$logstart = -s $primary->logfile; + +# Do some work to advance xids on primary +advance_xids($primary, 'tab_int'); + +# Wait for the replication slot to become inactive and then invalidated due to +# XID age. +wait_for_slot_invalidation($primary, 'sb2_slot', $logstart, 0, 'xid_aged'); + +# Testcase end: Invalidate streaming standby's slot due to replication_slot_xid_age +# GUC. +# ============================================================================= + +# ============================================================================= +# Testcase start: Invalidate logical subscriber's slot due to +# replication_slot_xid_age GUC. + +$publisher = $primary; +$publisher->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET replication_slot_xid_age = 500; +]); +$publisher->reload; + +$subscriber->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +wal_receiver_status_interval = 1 +)); +$subscriber->start; + +# Create tables +$publisher->safe_psql('postgres', "CREATE TABLE test_tbl2 (id int)"); +$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl2 (id int)"); + +# Insert some data +$publisher->safe_psql('postgres', + "INSERT INTO test_tbl2 VALUES (generate_series(1, 5));"); + +# Setup logical replication +$publisher_connstr = $publisher->connstr . ' dbname=postgres'; +$publisher->safe_psql('postgres', + "CREATE PUBLICATION pub2 FOR TABLE test_tbl2"); + +$subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2 WITH (slot_name = 'lsub2_slot')" +); + +$subscriber->wait_for_subscription_sync($publisher, 'sub2'); + +$result = + $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl2"); + +is($result, qq(5), "check initial copy was done"); + +$publisher->poll_query_until( + 'postgres', qq[ + SELECT xmin IS NULL AND catalog_xmin IS NOT NULL + FROM pg_catalog.pg_replication_slots + WHERE slot_name = 'lsub2_slot'; +]) or die "Timed out waiting for slot lsub2_slot catalog_xmin to advance"; + +$logstart = -s $publisher->logfile; + +# Stop subscriber to make the replication slot on publisher inactive +$subscriber->stop; + +# Do some work to advance xids on publisher +advance_xids($publisher, 'test_tbl2'); + +# Wait for the replication slot to become inactive and then invalidated due to +# XID age. +wait_for_slot_invalidation($publisher, 'lsub2_slot', $logstart, 0, + 'xid_aged'); + +# Testcase end: Invalidate logical subscriber's slot due to +# replication_slot_xid_age GUC. +# ============================================================================= + +# ============================================================================= +# Testcase start: Invalidate logical slot on standby that's being synced from +# the primary due to replication_slot_xid_age GUC. + +$publisher = $primary; + +# Prepare for the next test +$publisher->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET replication_slot_xid_age = 0; +]); +$publisher->reload; + +# Create a standby linking to the primary using the replication slot +my $standby3 = PostgreSQL::Test::Cluster->new('standby3'); +$standby3->init_from_backup($primary, $backup_name, has_streaming => 1); + +$standby3->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'sb3_slot' +primary_conninfo = '$connstr_1 dbname=postgres' +)); + +$primary->safe_psql( + 'postgres', qq[ + SELECT pg_create_physical_replication_slot(slot_name := 'sb3_slot', immediately_reserve := true); +]); + +$standby3->start; + +my $standby3_logstart = -s $standby3->logfile; + +# Wait until standby has replayed enough data +$primary->wait_for_catchup($standby3); + +$subscriber->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +wal_receiver_status_interval = 1 +)); +$subscriber->start; + +# Create tables +$publisher->safe_psql('postgres', "CREATE TABLE test_tbl3 (id int)"); +$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl3 (id int)"); + +# Insert some data +$publisher->safe_psql('postgres', + "INSERT INTO test_tbl3 VALUES (generate_series(1, 5));"); + +# Setup logical replication +$publisher->safe_psql('postgres', + "CREATE PUBLICATION pub3 FOR TABLE test_tbl3"); + +$subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3 WITH (slot_name = 'lsub3_sync_slot', failover = true)" +); + +$subscriber->wait_for_subscription_sync($publisher, 'sub3'); + +$result = + $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl3"); + +is($result, qq(5), "check initial copy was done"); + +$publisher->poll_query_until( + 'postgres', qq[ + SELECT xmin IS NULL AND catalog_xmin IS NOT NULL + FROM pg_catalog.pg_replication_slots + WHERE slot_name = 'lsub3_sync_slot'; +]) + or die "Timed out waiting for slot lsub3_sync_slot catalog_xmin to advance"; + +# Synchronize the primary server slots to the standby +$standby3->safe_psql('postgres', "SELECT pg_sync_replication_slots();"); + +# Confirm that the logical failover slot is created on the standby and is +# flagged as 'synced' and has got catalog_xmin from the primary. +is( $standby3->safe_psql( + 'postgres', + q{SELECT count(*) = 1 FROM pg_replication_slots + WHERE slot_name = 'lsub3_sync_slot' AND synced AND NOT temporary AND + xmin IS NULL AND catalog_xmin IS NOT NULL;} + ), + "t", + 'logical slot has synced as true on standby'); + +my $primary_catalog_xmin = $primary->safe_psql('postgres', + "SELECT catalog_xmin FROM pg_replication_slots WHERE slot_name = 'lsub3_sync_slot' AND catalog_xmin IS NOT NULL;" +); + +my $stabdby3_catalog_xmin = $standby3->safe_psql('postgres', + "SELECT catalog_xmin FROM pg_replication_slots WHERE slot_name = 'lsub3_sync_slot' AND catalog_xmin IS NOT NULL;" +); + +is($primary_catalog_xmin, $stabdby3_catalog_xmin, + "check catalog_xmin are same for primary slot and synced slot"); + +# Enable XID age based invalidation on the standby. Note that we disabled the +# same on the primary to check if the invalidation occurs for synced slot on +# the standby. +$standby3->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET replication_slot_xid_age = 500; +]); +$standby3->reload; + +$logstart = -s $standby3->logfile; + +# Do some work to advance xids on primary +advance_xids($primary, 'test_tbl3'); + +# Wait for standby to catch up with the above work +$primary->wait_for_catchup($standby3); + +# Wait for the replication slot to become inactive and then invalidated due to +# XID age. +wait_for_slot_invalidation($standby3, 'lsub3_sync_slot', $logstart, 0, + 'xid_aged'); + +# Note that the replication slot on the primary is still active +$result = $primary->safe_psql('postgres', + "SELECT COUNT(slot_name) = 1 FROM pg_replication_slots WHERE slot_name = 'lsub3_sync_slot' AND invalidation_reason IS NULL;" +); + +is($result, 't', "check lsub3_sync_slot is still active on primary"); + +# Testcase end: Invalidate logical slot on standby that's being synced from +# the primary due to replication_slot_xid_age GUC. +# ============================================================================= + sub wait_for_slot_invalidation { - my ($node, $slot_name, $offset, $inactive_timeout) = @_; + my ($node, $slot_name, $offset, $inactive_timeout, $reason) = @_; my $name = $node->name; # Wait for the replication slot to become inactive @@ -231,14 +496,15 @@ sub wait_for_slot_invalidation # for the slot to get invalidated. sleep($inactive_timeout); - check_for_slot_invalidation_in_server_log($node, $slot_name, $offset); + check_for_slot_invalidation_in_server_log($node, $slot_name, $offset, + $reason); # Wait for the inactive replication slot to be invalidated $node->poll_query_until( 'postgres', qq[ SELECT COUNT(slot_name) = 1 FROM pg_replication_slots WHERE slot_name = '$slot_name' AND - invalidation_reason = 'inactive_timeout'; + invalidation_reason = '$reason'; ]) or die "Timed out while waiting for inactive slot $slot_name to be invalidated on node $name"; @@ -262,15 +528,33 @@ sub wait_for_slot_invalidation # Check for invalidation of slot in server log sub check_for_slot_invalidation_in_server_log { - my ($node, $slot_name, $offset) = @_; + my ($node, $slot_name, $offset, $reason) = @_; my $name = $node->name; my $invalidated = 0; + my $isrecovery = + $node->safe_psql('postgres', "SELECT pg_is_in_recovery()"); + + chomp($isrecovery); for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++) { - $node->safe_psql('postgres', "CHECKPOINT"); + if ($reason eq 'xid_aged' && $isrecovery eq 'f') + { + $node->safe_psql('postgres', "VACUUM"); + } + else + { + $node->safe_psql('postgres', "CHECKPOINT"); + } + if ($node->log_contains( "invalidating obsolete replication slot \"$slot_name\"", + $offset) + || $node->log_contains( + "The slot's xmin .* has reached the age .* specified by \"replication_slot_xid_age\".", + $offset) + || $node->log_contains( + "The slot's catalog_xmin .* has reached the age .* specified by \"replication_slot_xid_age\".", $offset)) { $invalidated = 1; @@ -283,4 +567,25 @@ sub check_for_slot_invalidation_in_server_log ); } +# Do some work for advancing xids on a given node +sub advance_xids +{ + my ($node, $table_name) = @_; + + $node->safe_psql( + 'postgres', qq[ + do \$\$ + begin + for i in 10000..11000 loop + -- use an exception block so that each iteration eats an XID + begin + insert into $table_name values (i); + exception + when division_by_zero then null; + end; + end loop; + end\$\$; + ]); +} + done_testing(); -- 2.34.1