From 4eb7cfcca3e93ba8caba37eb1c3415ef9a764797 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 27 Feb 2025 13:32:35 +0800 Subject: [PATCH v1 3/3] Sync the two_phase_at field of a replication slot to the standby This patch enables slot synchronization to copy this value of two_phase_at field to the corresponding synced slot on the standby server. --- src/backend/replication/logical/slotsync.c | 15 +++++++++++---- .../recovery/t/040_standby_failover_slots_sync.pl | 8 +++++++- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 2c0a7439be4..a2bf89d1ba6 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -139,6 +139,7 @@ typedef struct RemoteSlot bool failover; XLogRecPtr restart_lsn; XLogRecPtr confirmed_lsn; + XLogRecPtr two_phase_at; TransactionId catalog_xmin; /* RS_INVAL_NONE if valid, or the reason of invalidation */ @@ -249,6 +250,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, SpinLockAcquire(&slot->mutex); slot->data.restart_lsn = remote_slot->restart_lsn; slot->data.confirmed_flush = remote_slot->confirmed_lsn; + slot->data.two_phase_at = remote_slot->two_phase_at; slot->data.catalog_xmin = remote_slot->catalog_xmin; SpinLockRelease(&slot->mutex); @@ -276,7 +278,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, if (remote_dbid != slot->data.database || remote_slot->two_phase != slot->data.two_phase || remote_slot->failover != slot->data.failover || - strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0) + strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 || + remote_slot->two_phase_at != slot->data.two_phase_at) { NameData plugin_name; @@ -287,6 +290,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, slot->data.plugin = plugin_name; slot->data.database = remote_dbid; slot->data.two_phase = remote_slot->two_phase; + slot->data.two_phase_at = remote_slot->two_phase_at; slot->data.failover = remote_slot->failover; SpinLockRelease(&slot->mutex); @@ -788,9 +792,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) static bool synchronize_slots(WalReceiverConn *wrconn) { -#define SLOTSYNC_COLUMN_COUNT 9 +#define SLOTSYNC_COLUMN_COUNT 10 Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, - LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID}; + LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID}; WalRcvExecResult *res; TupleTableSlot *tupslot; @@ -798,7 +802,7 @@ synchronize_slots(WalReceiverConn *wrconn) bool some_slot_updated = false; bool started_tx = false; const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," - " restart_lsn, catalog_xmin, two_phase, failover," + " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover," " database, invalidation_reason" " FROM pg_catalog.pg_replication_slots" " WHERE failover and NOT temporary"; @@ -853,6 +857,9 @@ synchronize_slots(WalReceiverConn *wrconn) &isnull)); Assert(!isnull); + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d); + remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col, &isnull)); Assert(!isnull); diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index 692b746ccd9..160e349b967 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -872,10 +872,16 @@ $subscriber1->safe_psql( ALTER SUBSCRIPTION regress_mysub1 ENABLE; ]); +$primary->wait_for_catchup('regress_mysub1'); + +my $two_phase_at = $primary->safe_psql('postgres', + "SELECT two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot';" +); + # Confirm that two_phase setting of lsub1_slot slot is synced to the standby ok( $standby1->poll_query_until( 'postgres', - "SELECT two_phase from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;" + "SELECT two_phase AND '$two_phase_at' = two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;" ), 'two_phase setting of slot lsub1_slot synced to standby'); -- 2.30.0.windows.2