From ea5acdd80b3a93dd8e9ae69628d237e71e9ad575 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Sat, 16 Mar 2024 03:46:28 +0000 Subject: [PATCH v11 2/4] Add XID age based replication slot invalidation Up until now, postgres has been able to invalidate inactive replication slots based on the amount of WAL (set via the max_slot_wal_keep_size GUC) that will be needed for the slots in case they become active. However, choosing a default value for max_slot_wal_keep_size is tricky, because the amount of WAL a customer generates and their allocated storage vary greatly in production, making it difficult to pin down a one-size-fits-all value. It is often easier for developers to set an XID age (age of the slot's xmin or catalog_xmin) of, say, 1 or 1.5 billion, after which the slots get invalidated. To achieve the above, postgres uses the replication slot's xmin (the oldest transaction that this slot needs the database to retain) or catalog_xmin (the oldest transaction affecting the system catalogs that this slot needs the database to retain), and a new GUC max_slot_xid_age. The checkpointer then examines all replication slots and invalidates those whose xmin or catalog_xmin has reached the configured age. 
--- doc/src/sgml/config.sgml | 21 ++++ doc/src/sgml/system-views.sgml | 8 ++ src/backend/access/transam/xlog.c | 10 ++ src/backend/replication/slot.c | 49 +++++++- src/backend/utils/misc/guc_tables.c | 10 ++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/replication/slot.h | 3 + src/test/recovery/meson.build | 1 + src/test/recovery/t/050_invalidate_slots.pl | 108 ++++++++++++++++++ 9 files changed, 210 insertions(+), 1 deletion(-) create mode 100644 src/test/recovery/t/050_invalidate_slots.pl diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 65a6e6c408..6dd54ffcb7 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4544,6 +4544,27 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + max_slot_xid_age (integer) + + max_slot_xid_age configuration parameter + + + + + Invalidate replication slots whose xmin (the oldest + transaction that this slot needs the database to retain) or + catalog_xmin (the oldest transaction affecting the + system catalogs that this slot needs the database to retain) has reached + the age specified by this setting. A value of zero (which is the default) + disables this feature. Users can set this value anywhere from zero to + two billion. This parameter can only be set in the + postgresql.conf file or on the server command + line. + + + + track_commit_timestamp (boolean) diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index e685921847..56252b12ee 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2588,6 +2588,14 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx perform logical decoding. + + + xid_aged means that the slot's + xmin or catalog_xmin + has reached the age specified by the + max_slot_xid_age parameter. 
+ + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 20a5f86209..36ae2ac6a4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7147,6 +7147,11 @@ CreateCheckPoint(int flags) if (PriorRedoPtr != InvalidXLogRecPtr) UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr); + /* Invalidate replication slots based on xmin or catalog_xmin age */ + if (max_slot_xid_age > 0) + InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0, + InvalidOid, InvalidTransactionId); + /* * Delete old log files, those no longer needed for last checkpoint to * prevent the disk holding the xlog from growing full. @@ -7597,6 +7602,11 @@ CreateRestartPoint(int flags) */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); + /* Invalidate replication slots based on xmin or catalog_xmin age */ + if (max_slot_xid_age > 0) + InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0, + InvalidOid, InvalidTransactionId); + /* * Retreat _logSegNo using the current end of xlog replayed or received, * whichever is later. 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 4f1a17f6ce..dc37586dcc 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -107,10 +107,11 @@ const char *const SlotInvalidationCauses[] = { [RS_INVAL_WAL_REMOVED] = "wal_removed", [RS_INVAL_HORIZON] = "rows_removed", [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient", + [RS_INVAL_XID_AGE] = "xid_aged", }; /* Maximum number of invalidation causes */ -#define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL +#define RS_INVAL_MAX_CAUSES RS_INVAL_XID_AGE StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1), "array length mismatch"); @@ -140,6 +141,7 @@ ReplicationSlot *MyReplicationSlot = NULL; /* GUC variables */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +int max_slot_xid_age = 0; /* * This GUC lists streaming replication standby server slot names that @@ -158,6 +160,7 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr; static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropPtr(ReplicationSlot *slot); +static bool IsSlotXIDAged(TransactionId xmin); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); @@ -1483,6 +1486,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, case RS_INVAL_WAL_LEVEL: appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server.")); break; + case RS_INVAL_XID_AGE: + appendStringInfoString(&err_detail, _("The replication slot's xmin or catalog_xmin reached the age specified by max_slot_xid_age.")); + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1499,6 +1505,31 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, pfree(err_detail.data); } +/* + * Returns true if the slot's passed-in xmin/catalog_xmin age has reached + * max_slot_xid_age. 
+ */ +static bool +IsSlotXIDAged(TransactionId xmin) +{ + TransactionId xid_cur; + TransactionId xid_limit; + + if (!TransactionIdIsNormal(xmin)) + return false; + + xid_cur = ReadNextTransactionId(); + xid_limit = xmin + max_slot_xid_age; + + if (xid_limit < FirstNormalTransactionId) + xid_limit += FirstNormalTransactionId; + + if (TransactionIdFollowsOrEquals(xid_cur, xid_limit)) + return true; + + return false; +} + /* * Helper for InvalidateObsoleteReplicationSlots * @@ -1599,6 +1630,21 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, if (SlotIsLogical(s)) conflict = cause; break; + case RS_INVAL_XID_AGE: + { + if (IsSlotXIDAged(s->data.xmin)) + { + conflict = cause; + break; + } + + if (IsSlotXIDAged(s->data.catalog_xmin)) + { + conflict = cause; + break; + } + } + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1752,6 +1798,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given * db; dboid may be InvalidOid for shared relations * - RS_INVAL_WAL_LEVEL: is logical + * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age * * NB - this runs as part of checkpoint, so avoid raising errors if possible. 
*/ diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 57d9de4dd9..6b5375909d 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -2954,6 +2954,16 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_slot_xid_age", PGC_SIGHUP, REPLICATION_SENDING, + gettext_noop("Age of the transaction ID at which a replication slot gets invalidated."), + gettext_noop("The transaction is the oldest transaction (including the one affecting the system catalogs) that a replication slot needs the database to retain.") + }, + &max_slot_xid_age, + 0, 0, 2000000000, + NULL, NULL, NULL + }, + { {"commit_delay", PGC_SUSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 2244ee52f7..b4c928b826 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -334,6 +334,7 @@ #wal_sender_timeout = 60s # in milliseconds; 0 disables #track_commit_timestamp = off # collect timestamp of transaction commit # (change requires restart) +#max_slot_xid_age = 0 # - Primary Server - diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 7f25a083ee..614ba0e30b 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -53,6 +53,8 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_HORIZON, /* wal_level insufficient for slot */ RS_INVAL_WAL_LEVEL, + /* slot's xmin or catalog_xmin has reached the age */ + RS_INVAL_XID_AGE, } ReplicationSlotInvalidationCause; extern PGDLLIMPORT const char *const SlotInvalidationCauses[]; @@ -227,6 +229,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; extern PGDLLIMPORT char *standby_slot_names; +extern PGDLLIMPORT int max_slot_xid_age; /* shmem 
initialization functions */ extern Size ReplicationSlotsShmemSize(void); diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index b1eb77b1ec..708a2a3798 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -51,6 +51,7 @@ tests += { 't/040_standby_failover_slots_sync.pl', 't/041_checkpoint_at_promote.pl', 't/042_low_level_backup.pl', + 't/050_invalidate_slots.pl', ], }, } diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl new file mode 100644 index 0000000000..2f482b56e8 --- /dev/null +++ b/src/test/recovery/t/050_invalidate_slots.pl @@ -0,0 +1,108 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +# Test for replication slots invalidation +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Utils; +use PostgreSQL::Test::Cluster; +use Test::More; +use Time::HiRes qw(usleep); + +# Initialize primary node, setting wal-segsize to 1MB +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 1, extra => ['--wal-segsize=1']); +$primary->append_conf( + 'postgresql.conf', q{ +checkpoint_timeout = 1h +}); +$primary->start; +$primary->safe_psql( + 'postgres', qq[ + SELECT pg_create_physical_replication_slot('sb1_slot'); +]); + +# Take backup +my $backup_name = 'my_backup'; +$primary->backup($backup_name); + +# Create a standby linking to the primary using the replication slot +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup($primary, $backup_name, has_streaming => 1); + +# Enable hs_feedback. The slot should gain an xmin. We set the status interval +# so we'll see the results promptly. 
+$standby1->append_conf( + 'postgresql.conf', q{ +primary_slot_name = 'sb1_slot' +hot_standby_feedback = on +wal_receiver_status_interval = 1 +}); +$standby1->start; + +# Create some content on primary to move xmin +$primary->safe_psql('postgres', + "CREATE TABLE tab_int AS SELECT generate_series(1,10) AS a"); + +# Wait until standby has replayed enough data +$primary->wait_for_catchup($standby1); + +$primary->poll_query_until( + 'postgres', qq[ + SELECT xmin IS NOT NULL + FROM pg_catalog.pg_replication_slots + WHERE slot_name = 'sb1_slot'; +]) or die "Timed out waiting for slot xmin to advance"; + +$primary->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET max_slot_xid_age = 500; +]); +$primary->reload; + +# Stop the standby so that the replication slot's xmin on the primary ages +$standby1->stop; + +my $logstart = -s $primary->logfile; + +# Consume more XIDs than max_slot_xid_age so that the slot's xmin becomes aged +$primary->safe_psql( + 'postgres', q{ +do $$ +begin + for i in 10000..11000 loop + -- use an exception block so that each iteration eats an XID + begin + insert into tab_int values (i); + exception + when division_by_zero then null; + end; + end loop; +end$$; +}); + +my $invalidated = 0; +for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $primary->safe_psql('postgres', "CHECKPOINT"); + if ($primary->log_contains( + 'invalidating obsolete replication slot "sb1_slot"', $logstart)) + { + $invalidated = 1; + last; + } + usleep(100_000); +} +ok($invalidated, 'check that slot sb1_slot invalidation has been logged'); + +# Wait for the slot's invalidation_reason to be reported as xid_aged. +$primary->poll_query_until( + 'postgres', qq[ + SELECT COUNT(slot_name) = 1 FROM pg_replication_slots + WHERE slot_name = 'sb1_slot' AND + invalidation_reason = 'xid_aged'; +]) + or die + "Timed out while waiting for replication slot sb1_slot to be invalidated"; + +done_testing(); -- 2.34.1