From fd2cb48726dd4e1932f8809dfb36e0fe9f922226 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Wed, 3 Apr 2024 05:03:22 +0000 Subject: [PATCH v31 1/2] Allow synced slots to have their own inactive_since. --- doc/src/sgml/system-views.sgml | 7 +++ src/backend/replication/logical/slotsync.c | 44 +++++++++++++++++ src/backend/replication/slot.c | 23 +++------ src/test/perl/PostgreSQL/Test/Cluster.pm | 34 +++++++++++++ src/test/recovery/t/019_replslot_limit.pl | 26 +--------- .../t/040_standby_failover_slots_sync.pl | 49 +++++++++++++++++++ 6 files changed, 144 insertions(+), 39 deletions(-) diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 3c8dca8ca3..b64274a1fb 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2530,6 +2530,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx The time since the slot has become inactive. NULL if the slot is currently being used. + Note that for slots on the standby that are being synced from a + primary server (whose synced field is + true), the + inactive_since value will get updated + after every synchronization (see + ) + from the corresponding remote slot on the primary. diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 30480960c5..4050bd40f8 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -140,6 +140,7 @@ typedef struct RemoteSlot } RemoteSlot; static void slotsync_failure_callback(int code, Datum arg); +static void update_synced_slots_inactive_since(void); /* * If necessary, update the local synced slot's metadata based on the data @@ -1296,6 +1297,46 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len) Assert(false); } +/* + * Update the inactive_since property for synced slots. + */ +static void +update_synced_slots_inactive_since(void) +{ + TimestampTz now = 0; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + /* Check if it is a synchronized slot */ + if (s->in_use && s->data.synced) + { + Assert(SlotIsLogical(s)); + + /* + * We get the current time beforehand and only once to avoid + * system calls overhead while holding the lock. + */ + if (now == 0) + now = GetCurrentTimestamp(); + + /* + * Set the time since the slot has become inactive after shutting + * down slot sync machinery. This helps correctly interpret the + * time if the standby gets promoted without a restart. + */ + SpinLockAcquire(&s->mutex); + s->inactive_since = now; + SpinLockRelease(&s->mutex); + } + } + + LWLockRelease(ReplicationSlotControlLock); +} + /* * Shut down the slot sync worker. */ @@ -1309,6 +1350,7 @@ ShutDownSlotSync(void) if (SlotSyncCtx->pid == InvalidPid) { SpinLockRelease(&SlotSyncCtx->mutex); + update_synced_slots_inactive_since(); return; } SpinLockRelease(&SlotSyncCtx->mutex); @@ -1341,6 +1383,8 @@ ShutDownSlotSync(void) } SpinLockRelease(&SlotSyncCtx->mutex); + + update_synced_slots_inactive_since(); } /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index d778c0b921..5549ca9640 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -42,6 +42,7 @@ #include "access/transam.h" #include "access/xlog_internal.h" #include "access/xlogrecovery.h" +#include "access/xlogutils.h" #include "common/file_utils.h" #include "common/string.h" #include "miscadmin.h" @@ -690,13 +691,10 @@ ReplicationSlotRelease(void) } /* - * Set the last inactive time after marking the slot inactive. We don't - * set it for the slots currently being synced from the primary to the - * standby because such slots are typically inactive as decoding is not - * allowed on those. + * Set the time since the slot has become inactive. We get the current + * time beforehand to avoid system call overhead while holding the lock. */ - if (!(RecoveryInProgress() && slot->data.synced)) - now = GetCurrentTimestamp(); + now = GetCurrentTimestamp(); if (slot->data.persistency == RS_PERSISTENT) { @@ -2369,16 +2367,11 @@ RestoreSlotFromDisk(const char *name) slot->active_pid = 0; /* - * We set the last inactive time after loading the slot from the disk - * into memory. Whoever acquires the slot i.e. makes the slot active - * will reset it. We don't set it for the slots currently being synced - * from the primary to the standby because such slots are typically - * inactive as decoding is not allowed on those. + * Set the time since the slot has become inactive after loading the + * slot from the disk into memory. Whoever acquires the slot i.e. + * makes the slot active will reset it. */ - if (!(RecoveryInProgress() && slot->data.synced)) - slot->inactive_since = GetCurrentTimestamp(); - else - slot->inactive_since = 0; + slot->inactive_since = GetCurrentTimestamp(); restored = true; break; diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm index b08296605c..ddfc3236f3 100644 --- a/src/test/perl/PostgreSQL/Test/Cluster.pm +++ b/src/test/perl/PostgreSQL/Test/Cluster.pm @@ -3276,6 +3276,40 @@ sub create_logical_slot_on_standby =pod +=item $node->get_slot_inactive_since_value(self, slot_name, reference_time) + +Get inactive_since column value for a given replication slot validating it +against optional reference time. + +=cut + +sub get_slot_inactive_since_value +{ + my ($self, $slot_name, $reference_time) = @_; + my $name = $self->name; + + my $inactive_since = $self->safe_psql('postgres', + qq(SELECT inactive_since FROM pg_replication_slots + WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;) + ); + + # Check that the captured time is sane + if (defined $reference_time) + { + is($self->safe_psql('postgres', + qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND + '$inactive_since'::timestamptz >= '$reference_time'::timestamptz;] + ), + 't', + "last inactive time for slot $slot_name is valid on node $name") + or die "could not validate captured inactive_since for slot $slot_name"; + } + + return $inactive_since; +} + +=pod + =item $node->advance_wal(num) Advance WAL of node by given number of segments. diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl index 3b9a306a8b..c8e5e5054d 100644 --- a/src/test/recovery/t/019_replslot_limit.pl +++ b/src/test/recovery/t/019_replslot_limit.pl @@ -443,7 +443,7 @@ $primary4->safe_psql( # Get inactive_since value after the slot's creation. Note that the slot is # still inactive till it's used by the standby below. my $inactive_since = - capture_and_validate_slot_inactive_since($primary4, $sb4_slot, $slot_creation_time); + $primary4->get_slot_inactive_since_value($sb4_slot, $slot_creation_time); $standby4->start; @@ -502,7 +502,7 @@ $publisher4->safe_psql('postgres', # Get inactive_since value after the slot's creation. Note that the slot is # still inactive till it's used by the subscriber below. $inactive_since = - capture_and_validate_slot_inactive_since($publisher4, $lsub4_slot, $slot_creation_time); + $publisher4->get_slot_inactive_since_value($lsub4_slot, $slot_creation_time); $subscriber4->start; $subscriber4->safe_psql('postgres', @@ -540,26 +540,4 @@ is( $publisher4->safe_psql( $publisher4->stop; $subscriber4->stop; -# Capture and validate inactive_since of a given slot. -sub capture_and_validate_slot_inactive_since -{ - my ($node, $slot_name, $slot_creation_time) = @_; - - my $inactive_since = $node->safe_psql('postgres', - qq(SELECT inactive_since FROM pg_replication_slots - WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;) - ); - - # Check that the captured time is sane - is( $node->safe_psql( - 'postgres', - qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND - '$inactive_since'::timestamptz >= '$slot_creation_time'::timestamptz;] - ), - 't', - "last inactive time for an active slot $slot_name is sane"); - - return $inactive_since; -} - done_testing(); diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index f47bfd78eb..50be94e629 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -35,6 +35,13 @@ my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1'); $subscriber1->init; $subscriber1->start; +# Capture the time before the logical failover slot is created on the +# primary. We later call this publisher as primary anyway. +my $slot_creation_time_on_primary = $publisher->safe_psql( + 'postgres', qq[ + SELECT current_timestamp; +]); + # Create a slot on the publisher with failover disabled $publisher->safe_psql('postgres', "SELECT 'init' FROM pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, false);" @@ -174,6 +181,10 @@ $primary->poll_query_until( "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'", 1); +# Capture the inactive_since of the slot from the primary +my $inactive_since_on_primary = + $primary->get_slot_inactive_since_value('lsub1_slot', $slot_creation_time_on_primary); + # Wait for the standby to catch up so that the standby is not lagging behind # the subscriber. $primary->wait_for_replay_catchup($standby1); @@ -181,6 +192,11 @@ $primary->wait_for_replay_catchup($standby1); # Synchronize the primary server slots to the standby. $standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();"); +my $slot_sync_time = $standby1->safe_psql( + 'postgres', qq[ + SELECT current_timestamp; +]); + # Confirm that the logical failover slots are created on the standby and are # flagged as 'synced' is( $standby1->safe_psql( @@ -190,6 +206,19 @@ is( $standby1->safe_psql( "t", 'logical slots have synced as true on standby'); +# Capture the inactive_since of the synced slot on the standby +my $inactive_since_on_standby = + $standby1->get_slot_inactive_since_value('lsub1_slot', $slot_creation_time_on_primary); + +# Synced slot on the standby must get its own inactive_since. +is( $standby1->safe_psql( + 'postgres', + "SELECT '$inactive_since_on_primary'::timestamptz < '$inactive_since_on_standby'::timestamptz AND + '$inactive_since_on_standby'::timestamptz < '$slot_sync_time'::timestamptz;" + ), + "t", + 'synchronized slot has got its own inactive_since'); + ################################################## # Test that the synchronized slot will be dropped if the corresponding remote # slot on the primary server has been dropped. @@ -750,8 +779,28 @@ $primary->reload; $standby1->start; $primary->wait_for_replay_catchup($standby1); +# Capture the time before the standby is promoted +my $promotion_time_on_primary = $standby1->safe_psql( + 'postgres', qq[ + SELECT current_timestamp; +]); + $standby1->promote; +# Capture the inactive_since of the synced slot after the promotion. +# Expectation here is that the slot gets its own inactive_since as part of the +# promotion. We do this check before the slot is enabled on the new primary +# below, otherwise the slot gets active setting inactive_since to NULL. +my $inactive_since_on_new_primary = + $standby1->get_slot_inactive_since_value('lsub1_slot', $promotion_time_on_primary); + +is( $standby1->safe_psql( + 'postgres', + "SELECT '$inactive_since_on_new_primary'::timestamptz > '$inactive_since_on_primary'::timestamptz" + ), + "t", + 'synchronized slot has got its own inactive_since on the new primary after promotion'); + # Update subscription with the new primary's connection info my $standby1_conninfo = $standby1->connstr . ' dbname=postgres'; $subscriber1->safe_psql('postgres', -- 2.34.1