diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index bb69683e2a..af3e114fc9 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -359,17 +359,20 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) * checkpoints. */ static XLogRecPtr -pg_physical_replication_slot_advance(XLogRecPtr moveto) +pg_physical_replication_slot_advance(XLogRecPtr moveto, bool *advance_done) { XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn; XLogRecPtr retlsn = startlsn; + *advance_done = false; + if (startlsn < moveto) { SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->data.restart_lsn = moveto; SpinLockRelease(&MyReplicationSlot->mutex); retlsn = moveto; + *advance_done = true; } return retlsn; @@ -387,13 +390,15 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) * mode, no changes are generated anyway. */ static XLogRecPtr -pg_logical_replication_slot_advance(XLogRecPtr moveto) +pg_logical_replication_slot_advance(XLogRecPtr moveto, bool *advance_done) { LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; XLogRecPtr startlsn; XLogRecPtr retlsn; + *advance_done = false; + PG_TRY(); { /* @@ -475,13 +480,16 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) * effort to save it for them. * * Dirty the slot so it's written out at the next checkpoint. - * We'll still lose its position on crash, as documented, but it's - * better than always losing the position even on clean restart. + * We'll still lose its position on crash until slot advancing + * is done, as documented in the logical decoding section of the + * docs, but it's better than always losing the position even on + * clean restart. */ ReplicationSlotMarkDirty(); } retlsn = MyReplicationSlot->data.confirmed_flush; + *advance_done = true; /* free context, call shutdown callback */ FreeDecodingContext(ctx); @@ -515,6 +523,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) bool nulls[2]; HeapTuple tuple; Datum result; + bool advance_done; Assert(!MyReplicationSlot); @@ -566,15 +575,15 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) /* Do the actual slot update, depending on the slot type */ if (OidIsValid(MyReplicationSlot->data.database)) - endlsn = pg_logical_replication_slot_advance(moveto); + endlsn = pg_logical_replication_slot_advance(moveto, &advance_done); else - endlsn = pg_physical_replication_slot_advance(moveto); + endlsn = pg_physical_replication_slot_advance(moveto, &advance_done); values[0] = NameGetDatum(&MyReplicationSlot->data.name); nulls[0] = false; - /* Update the on disk state when lsn was updated. */ - if (XLogRecPtrIsInvalid(endlsn)) + /* Update the on disk state when the slot position has been moved. */ + if (advance_done) { ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredXmin(false); diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl index 3c743d7d7c..12b36bc824 100644 --- a/src/test/recovery/t/001_stream_rep.pl +++ b/src/test/recovery/t/001_stream_rep.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 32; +use Test::More tests => 34; # Initialize master node my $node_master = get_new_node('master'); @@ -344,3 +344,25 @@ is($catalog_xmin, '', is($xmin, '', 'xmin of cascaded slot null with hs feedback reset'); is($catalog_xmin, '', 'catalog xmin of cascaded slot still null with hs_feedback reset'); + +# Test physical slot advancing and its durability. Create a new slot on +# the primary, not used by any of the standbys. This reserves WAL at creation. +my $phys_slot = 'phys_slot'; +$node_master->safe_psql('postgres', + "SELECT pg_create_physical_replication_slot('$phys_slot', true);"); +$node_master->psql('postgres', " + CREATE TABLE tab_phys_slot (a int); + INSERT INTO tab_phys_slot VALUES (generate_series(1,10));"); +my $psql_rc = $node_master->psql('postgres', + "SELECT pg_replication_slot_advance('$phys_slot', 'FF/FFFFFFFF');"); +is($psql_rc, '0', 'slot advancing works with physical slot'); +my $phys_restart_lsn_pre = $node_master->safe_psql('postgres', + "SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$phys_slot';"); +chomp($phys_restart_lsn_pre); +# Slot advance should persist across restarts. +$node_master->restart; +my $phys_restart_lsn_post = $node_master->safe_psql('postgres', + "SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$phys_slot';"); +chomp($phys_restart_lsn_post); +ok(($phys_restart_lsn_pre cmp $phys_restart_lsn_post) == 0, + "physical slot advance persists across restarts"); diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index c23cc4dda7..1dcc98b418 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -7,7 +7,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 10; +use Test::More tests => 12; use Config; # Initialize master node @@ -135,5 +135,26 @@ is($node_master->psql('postgres', 'DROP DATABASE otherdb'), is($node_master->slot('otherdb_slot')->{'slot_name'}, undef, 'logical slot was actually dropped with DB'); +# Test logical slot advancing and its durability. +my $logical_slot = 'logical_slot'; +$node_master->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);"); +$node_master->psql('postgres', " + CREATE TABLE tab_logical_slot (a int); + INSERT INTO tab_logical_slot VALUES (generate_series(1,10));"); +my $psql_rc = $node_master->psql('postgres', + "SELECT pg_replication_slot_advance('$logical_slot', 'FF/FFFFFFFF');"); +is($psql_rc, '0', 'slot advancing works with logical slot'); +my $logical_restart_lsn_pre = $node_master->safe_psql('postgres', + "SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$logical_slot';"); +chomp($logical_restart_lsn_pre); +# Slot advance should persists across restarts. +$node_master->restart; +my $logical_restart_lsn_post = $node_master->safe_psql('postgres', + "SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$logical_slot';"); +chomp($logical_restart_lsn_post); +ok(($logical_restart_lsn_pre cmp $logical_restart_lsn_post) == 0, + "logical slot advance persists across restarts"); + # done with the node $node_master->stop;