From 08761e6371fdb3e1485b9127948d7ae444749421 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 28 Mar 2024 20:19:22 +0800 Subject: [PATCH v2] advance the restart_lsn of synced slots using logical decoding --- src/backend/replication/logical/logical.c | 5 +- src/backend/replication/logical/slotsync.c | 65 ++++++++++++------- src/backend/replication/slotfuncs.c | 16 ++++- src/include/replication/slot.h | 3 + .../t/040_standby_failover_slots_sync.pl | 8 +-- 5 files changed, 64 insertions(+), 33 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 51ffb623c0..2a691e95e5 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -36,6 +36,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" +#include "replication/slotsync.h" #include "replication/snapbuild.h" #include "storage/proc.h" #include "storage/procarray.h" @@ -516,7 +517,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot use physical replication slot for logical decoding"))); - if (slot->data.database != MyDatabaseId) + if (slot->data.database != MyDatabaseId && !fast_forward) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("replication slot \"%s\" was not created in this database", @@ -526,7 +527,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, * Do not allow consumption of a "synchronized" slot until the standby * gets promoted. */ - if (RecoveryInProgress() && slot->data.synced) + if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots()) ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot use replication slot \"%s\" for logical decoding", diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 30480960c5..7c6ec06e13 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -25,6 +25,12 @@ * which slot sync worker can perform the sync periodically or user can call * pg_sync_replication_slots() periodically to perform the syncs. * + * If synchronized slots fail to build a consistent snapshot from the + * restart_lsn, they would become unreliable after promotion due to potential + * data loss from changes before reaching a consistent point. So, we mark such + * slots as RS_TEMPORARY. Once they successfully reach the consistent point, + * they will be marked to RS_PERSISTENT. + * * The slot sync worker waits for some time before the next synchronization, * with the duration varying based on whether any slots were updated during * the last cycle. Refer to the comments above wait_for_slot_activity() for @@ -149,26 +155,36 @@ static void slotsync_failure_callback(int code, Datum arg); * local slot) return false, otherwise true. */ static bool -update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) +update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, + bool *found_consistent_point) { ReplicationSlot *slot = MyReplicationSlot; - bool xmin_changed; - bool restart_lsn_changed; NameData plugin_name; + bool updated_lsn = false; Assert(slot->data.invalidated == RS_INVAL_NONE); - xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin); - restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn); + if (remote_slot->confirmed_lsn != slot->data.confirmed_flush) + { + /* + * By advancing the restart_lsn, confirmed_lsn, and xmin using + * fast-forward logical decoding, we can verify whether a consistent + * snapshot can be built. This process also involves saving necessary + * snapshots to disk during decoding, ensuring that logical decoding + * efficiently reaches a consistent point at the restart_lsn without + * the potential loss of data during snapshot creation. + */ + pg_logical_replication_slot_advance(remote_slot->confirmed_lsn, + found_consistent_point); + ReplicationSlotsComputeRequiredLSN(); + updated_lsn = true; + } - if (!xmin_changed && - !restart_lsn_changed && - remote_dbid == slot->data.database && + if (remote_dbid == slot->data.database && remote_slot->two_phase == slot->data.two_phase && remote_slot->failover == slot->data.failover && - remote_slot->confirmed_lsn == slot->data.confirmed_flush && strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0) - return false; + return updated_lsn; /* Avoid expensive operations while holding a spinlock. */ namestrcpy(&plugin_name, remote_slot->plugin); @@ -178,18 +194,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) slot->data.database = remote_dbid; slot->data.two_phase = remote_slot->two_phase; slot->data.failover = remote_slot->failover; - slot->data.restart_lsn = remote_slot->restart_lsn; - slot->data.confirmed_flush = remote_slot->confirmed_lsn; - slot->data.catalog_xmin = remote_slot->catalog_xmin; - slot->effective_catalog_xmin = remote_slot->catalog_xmin; SpinLockRelease(&slot->mutex); - if (xmin_changed) - ReplicationSlotsComputeRequiredXmin(false); - - if (restart_lsn_changed) - ReplicationSlotsComputeRequiredLSN(); - return true; } @@ -413,6 +419,7 @@ static bool update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) { ReplicationSlot *slot = MyReplicationSlot; + bool found_consistent_point = false; /* * Check if the primary server has caught up. Refer to the comment atop @@ -443,9 +450,19 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) return false; } - /* First time slot update, the function must return true */ - if (!update_local_synced_slot(remote_slot, remote_dbid)) - elog(ERROR, "failed to update slot"); + (void) update_local_synced_slot(remote_slot, remote_dbid, + &found_consistent_point); + + /* + * Don't persist the slot if it cannot reach the consistent point from the + * restart_lsn. + */ + if (!found_consistent_point) + { + elog(DEBUG1, "The synced slot could not find consistent point from %X/%X", + LSN_FORMAT_ARGS(slot->data.restart_lsn)); + return false; + } ReplicationSlotPersist(); @@ -578,7 +595,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) LSN_FORMAT_ARGS(remote_slot->restart_lsn)); /* Make sure the slot changes persist across server restart */ - if (update_local_synced_slot(remote_slot, remote_dbid)) + if (update_local_synced_slot(remote_slot, remote_dbid, NULL)) { ReplicationSlotMarkDirty(); ReplicationSlotSave(); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index da57177c25..de4c471429 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -501,9 +501,13 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) * because we need to digest WAL to advance restart_lsn allowing to recycle * WAL and removal of old catalog tuples. As decoding is done in fast_forward * mode, no changes are generated anyway. + * + * *found_consistent_point will be set to true if the logical decoding reaches + * the consistent point; Otherwise, it will be set to false. */ -static XLogRecPtr -pg_logical_replication_slot_advance(XLogRecPtr moveto) +XLogRecPtr +pg_logical_replication_slot_advance(XLogRecPtr moveto, + bool *found_consistent_point) { LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; @@ -511,6 +515,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) Assert(moveto != InvalidXLogRecPtr); + if (found_consistent_point) + *found_consistent_point = false; + PG_TRY(); { /* @@ -567,6 +574,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) CHECK_FOR_INTERRUPTS(); } + if (DecodingContextReady(ctx) && found_consistent_point) + *found_consistent_point = true; + /* * Logical decoding could have clobbered CurrentResourceOwner during * transaction management, so restore the executor's value. (This is @@ -680,7 +690,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) /* Do the actual slot update, depending on the slot type */ if (OidIsValid(MyReplicationSlot->data.database)) - endlsn = pg_logical_replication_slot_advance(moveto); + endlsn = pg_logical_replication_slot_advance(moveto, NULL); else endlsn = pg_physical_replication_slot_advance(moveto); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 7b937d1a0c..534f1301b2 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -282,4 +282,7 @@ extern bool SlotExistsInStandbySlotNames(const char *slot_name); extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel); extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); +extern XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto, + bool *found_consistent_point); + #endif /* SLOT_H */ diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index f47bfd78eb..0e70bcc075 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -280,11 +280,11 @@ is( $standby1->safe_psql( 'logical slot is re-synced'); # Reset the log_min_messages to the default value. -$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'"); -$primary->reload; +#$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'"); +#$primary->reload; -$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'"); -$standby1->reload; +#$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'"); +#$standby1->reload; ################################################## # Test that a synchronized slot can not be decoded, altered or dropped by the -- 2.30.0.windows.2