From 05f22dba1351b32422832b648af3af1c37d9edcf Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Tue, 19 Mar 2024 18:38:18 +0000 Subject: [PATCH v12 6/7] Add inactive_timeout based replication slot invalidation Up until now, postgres has the ability to invalidate inactive replication slots based on the amount of WAL (set via max_slot_wal_keep_size GUC) that will be needed for the slots in case they become active. However, choosing a default value for max_slot_wal_keep_size is tricky. Because the amount of WAL a customer generates, and their allocated storage will vary greatly in production, making it difficult to pin down a one-size-fits-all value. It is often easy for developers to set a timeout of say 1 or 2 or 3 days at slot level, after which the inactive slots get dropped. To achieve the above, postgres uses replication slot metric last_inactive_at (the time at which the slot became inactive), and a new slot level parameter inactive_timeout. The checkpointer then looks at all replication slots invalidating the inactive slots based on the timeout set. --- doc/src/sgml/func.sgml | 12 +- doc/src/sgml/ref/create_subscription.sgml | 4 +- doc/src/sgml/system-views.sgml | 10 +- src/backend/access/transam/xlog.c | 8 + src/backend/replication/slot.c | 22 ++- src/include/replication/slot.h | 2 + src/test/recovery/meson.build | 1 + src/test/recovery/t/050_invalidate_slots.pl | 168 ++++++++++++++++++++ 8 files changed, 217 insertions(+), 10 deletions(-) create mode 100644 src/test/recovery/t/050_invalidate_slots.pl diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 0ece7c8d3d..da316f345d 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -28168,8 +28168,8 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset released upon any error. The optional fourth parameter, inactive_timeout, when set to a non-zero value, specifies the amount of time in seconds the slot is - allowed to be inactive. This function corresponds to the replication - protocol command + allowed to be inactive before getting invalidated. + This function corresponds to the replication protocol command CREATE_REPLICATION_SLOT ... PHYSICAL. @@ -28214,12 +28214,12 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset failover, when set to true, specifies that this slot is enabled to be synced to the standbys so that logical replication can be resumed after - failover. The optional sixth parameter, + failover. The optional sixth parameter, inactive_timeout, when set to a non-zero value, specifies the amount of time in seconds the slot is - allowed to be inactive. A call to this function has the same effect as - the replication protocol command - CREATE_REPLICATION_SLOT ... LOGICAL. + allowed to be inactive before getting invalidated. + A call to this function has the same effect as the replication protocol + command CREATE_REPLICATION_SLOT ... LOGICAL. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 7be4610921..472592c750 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -421,8 +421,8 @@ CREATE SUBSCRIPTION subscription_name When set to a non-zero value, specifies the amount of time in seconds the associated replication slots (i.e. the main slot and the table - sync slots) in the upstream database are allowed to be inactive. - The default is 0. + sync slots) in the upstream database are allowed to be inactive before + getting invalidated. The default is 0. diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index f413c819de..9821c6f77a 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2588,6 +2588,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx perform logical decoding. + + + inactive_timeout means that the slot has been + inactive for the duration specified by slot's + inactive_timeout parameter. + + @@ -2767,7 +2774,8 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx inactive_timeout integer - The amount of time in seconds the slot is allowed to be inactive. + The amount of time in seconds the slot is allowed to be inactive before + getting invalidated. diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 20a5f86209..ea4ece22de 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7168,6 +7168,10 @@ CreateCheckPoint(int flags) RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr, checkPoint.ThisTimeLineID); + /* Invalidate inactive replication slots based on timeout */ + InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0, + InvalidOid, InvalidTransactionId); + /* * Make more log segments if needed. (Do this after recycling old log * segments, since that may supply some of the needed files.) @@ -7635,6 +7639,10 @@ CreateRestartPoint(int flags) RemoveOldXlogFiles(_logSegNo, RedoRecPtr, endptr, replayTLI); + /* Invalidate inactive replication slots based on timeout */ + InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0, + InvalidOid, InvalidTransactionId); + /* * Make more log segments if needed. (Do this after recycling old log * segments, since that may supply some of the needed files.) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 35a186f4bc..26121e939d 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -107,10 +107,11 @@ const char *const SlotInvalidationCauses[] = { [RS_INVAL_WAL_REMOVED] = "wal_removed", [RS_INVAL_HORIZON] = "rows_removed", [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient", + [RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout", }; /* Maximum number of invalidation causes */ -#define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL +#define RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1), "array length mismatch"); @@ -1550,6 +1551,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, case RS_INVAL_WAL_LEVEL: appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server.")); break; + case RS_INVAL_INACTIVE_TIMEOUT: + appendStringInfoString(&err_detail, _("The slot has been inactive for more than the time specified by slot's inactive_timeout parameter.")); + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1666,6 +1670,21 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, if (SlotIsLogical(s)) conflict = cause; break; + case RS_INVAL_INACTIVE_TIMEOUT: + if (s->data.last_inactive_at > 0 && + s->data.inactive_timeout > 0) + { + TimestampTz now; + + Assert(s->data.persistency == RS_PERSISTENT); + Assert(s->active_pid == 0); + + now = GetCurrentTimestamp(); + if (TimestampDifferenceExceeds(s->data.last_inactive_at, now, + s->data.inactive_timeout * 1000)) + conflict = cause; + } + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1819,6 +1838,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given * db; dboid may be InvalidOid for shared relations * - RS_INVAL_WAL_LEVEL: is logical + * - RS_INVAL_INACTIVE_TIMEOUT: inactive slot timeout occurs * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 8966188acb..2efb9490c1 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -53,6 +53,8 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_HORIZON, /* wal_level insufficient for slot */ RS_INVAL_WAL_LEVEL, + /* inactive slot timeout has occurred */ + RS_INVAL_INACTIVE_TIMEOUT, } ReplicationSlotInvalidationCause; extern PGDLLIMPORT const char *const SlotInvalidationCauses[]; diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index b1eb77b1ec..708a2a3798 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -51,6 +51,7 @@ tests += { 't/040_standby_failover_slots_sync.pl', 't/041_checkpoint_at_promote.pl', 't/042_low_level_backup.pl', + 't/050_invalidate_slots.pl', ], }, } diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl new file mode 100644 index 0000000000..d046e1d5d7 --- /dev/null +++ b/src/test/recovery/t/050_invalidate_slots.pl @@ -0,0 +1,168 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +# Test for replication slots invalidation +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Utils; +use PostgreSQL::Test::Cluster; +use Test::More; +use Time::HiRes qw(usleep); + +# ============================================================================= +# Testcase start: Invalidate streaming standby's slot due to inactive_timeout +# + +# Initialize primary node +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); + +# Avoid checkpoint during the test, otherwise, the test can get unpredictable +$primary->append_conf( + 'postgresql.conf', q{ +checkpoint_timeout = 1h +autovacuum = off +}); +$primary->start; + +# Take backup +my $backup_name = 'my_backup'; +$primary->backup($backup_name); + +# Create a standby linking to the primary using the replication slot +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup($primary, $backup_name, has_streaming => 1); +$standby1->append_conf( + 'postgresql.conf', q{ +primary_slot_name = 'sb1_slot' +}); + +# Set timeout so that the slot when inactive will get invalidated after the +# timeout. +my $inactive_timeout = 5; +$primary->safe_psql( + 'postgres', qq[ + SELECT pg_create_physical_replication_slot(slot_name := 'sb1_slot', inactive_timeout := $inactive_timeout); +]); + +$standby1->start; + +# Wait until standby has replayed enough data +$primary->wait_for_catchup($standby1); + +# The inactive replication slot info should be null when the slot is active +my $result = $primary->safe_psql( + 'postgres', qq[ + SELECT last_inactive_at IS NULL, inactive_timeout = $inactive_timeout + FROM pg_replication_slots WHERE slot_name = 'sb1_slot'; +]); +is($result, "t|t", + 'check the inactive replication slot info for an active slot'); + +my $logstart = -s $primary->logfile; + +# Stop standby to make the replication slot on primary inactive +$standby1->stop; + +# Wait for the inactive replication slot info to be updated +$primary->poll_query_until( + 'postgres', qq[ + SELECT COUNT(slot_name) = 1 FROM pg_replication_slots + WHERE last_inactive_at IS NOT NULL + AND slot_name = 'sb1_slot' + AND inactive_timeout = $inactive_timeout; +]) + or die + "Timed out while waiting for inactive replication slot info to be updated"; + +my $invalidated = 0; +for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $primary->safe_psql('postgres', "CHECKPOINT"); + if ($primary->log_contains( + 'invalidating obsolete replication slot "sb1_slot"', $logstart)) + { + $invalidated = 1; + last; + } + usleep(100_000); +} +ok($invalidated, 'check that slot sb1_slot invalidation has been logged'); + +# Wait for the inactive replication slots to be invalidated. +$primary->poll_query_until( + 'postgres', qq[ + SELECT COUNT(slot_name) = 1 FROM pg_replication_slots + WHERE slot_name = 'sb1_slot' AND + invalidation_reason = 'inactive_timeout'; +]) + or die + "Timed out while waiting for inactive replication slot sb1_slot to be invalidated"; + +# Testcase end: Invalidate streaming standby's slot due to inactive_timeout +# ============================================================================= + +# ============================================================================= +# Testcase start: Invalidate logical subscriber's slot due to inactive_timeout +my $publisher = $primary; + +# Create subscriber node +my $subscriber = PostgreSQL::Test::Cluster->new('sub'); +$subscriber->init; +$subscriber->start; + +# Create tables +$publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)"); +$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)"); + +# Insert some data +$subscriber->safe_psql('postgres', + "INSERT INTO test_tbl VALUES (generate_series(1, 5));"); + +# Setup logical replication +my $publisher_connstr = $publisher->connstr . ' dbname=postgres'; +$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES"); +$subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (slot_name = 'lsub1_slot', inactive_timeout = $inactive_timeout)" +); + +$subscriber->wait_for_subscription_sync($publisher, 'sub'); + +$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl"); + +is($result, qq(5), "check initial copy was done"); + +$logstart = -s $publisher->logfile; + +# Stop subscriber to make the replication slot on publisher inactive +$subscriber->stop; + +# Wait for the inactive replication slot info to be updated +$publisher->poll_query_until( + 'postgres', qq[ + SELECT COUNT(slot_name) = 1 FROM pg_replication_slots + WHERE last_inactive_at IS NOT NULL + AND slot_name = 'lsub1_slot' + AND inactive_timeout = $inactive_timeout; +]) + or die + "Timed out while waiting for inactive replication slot info to be updated"; + +$invalidated = 0; +for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $publisher->safe_psql('postgres', "CHECKPOINT"); + if ($publisher->log_contains( + 'invalidating obsolete replication slot "lsub1_slot"', $logstart)) + { + $invalidated = 1; + last; + } + usleep(100_000); +} +ok($invalidated, 'check that slot lsub1_slot invalidation has been logged'); + +# Testcase end: Invalidate logical subscriber's slot due to inactive_timeout +# ============================================================================= + +done_testing(); -- 2.34.1