From 101667ff9e8cf2401b910672fc44f446b55e684d Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Sun, 4 Feb 2024 11:39:20 +0800 Subject: [PATCH v78 1/4] Add a slot synchronization function. This commit introduces a new SQL function pg_sync_replication_slots() which is used to synchronize the logical replication slots from the primary server to the physical standby so that logical replication can be resumed after failover. A new 'synced' flag is introduced for replication slots, indicating whether the slot has been synchronized from the primary server. On a standby, synced slots cannot be dropped or consumed, and any attempt to perform logical decoding on them will result in an error. The logical replication slots on the primary can be synchronized to the hot standby by enabling the failover option during slot creation and calling pg_sync_replication_slots() function on the standby. For the synchronization to work, it is mandatory to have a physical replication slot between the primary and the standby, hot_standby_feedback must be enabled on the standby and a valid dbname must be specified in primary_conninfo. If a logical slot is invalidated on the primary, then that slot on the standby is also invalidated. If a logical slot on the primary is valid but is invalidated on the standby, then that slot is dropped and can be recreated on the standby in next pg_sync_replication_slots() call provided the slot still exists on the primary server. It is okay to recreate such slots as long as these are not consumable on the standby (which is the case currently). This situation may occur due to the following reasons: - The max_slot_wal_keep_size on the standby is insufficient to retain WAL records from the restart_lsn of the slot. - primary_slot_name is temporarily reset to null and the physical slot is removed. - The primary changes wal_level to a level lower than logical. 
The slots synchronization status on the standby can be monitored using 'synced' column of pg_replication_slots view. --- doc/src/sgml/config.sgml | 9 +- doc/src/sgml/func.sgml | 16 + doc/src/sgml/logicaldecoding.sgml | 38 + doc/src/sgml/protocol.sgml | 6 +- doc/src/sgml/system-views.sgml | 22 +- src/backend/access/transam/xlog.c | 5 +- src/backend/catalog/system_views.sql | 3 +- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/logical.c | 12 + src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 938 ++++++++++++++++++ src/backend/replication/slot.c | 60 +- src/backend/replication/slotfuncs.c | 26 +- src/backend/replication/walreceiverfuncs.c | 16 + src/backend/replication/walsender.c | 19 +- src/include/catalog/pg_proc.dat | 10 +- src/include/replication/logicalworker.h | 1 + src/include/replication/slot.h | 17 +- src/include/replication/walreceiver.h | 1 + src/include/replication/walsender.h | 3 + .../t/040_standby_failover_slots_sync.pl | 63 ++ src/test/regress/expected/rules.out | 5 +- 22 files changed, 1237 insertions(+), 35 deletions(-) create mode 100644 src/backend/replication/logical/slotsync.c diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 61038472c5..a1e2e6e69f 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4612,8 +4612,13 @@ ANY num_sync ( ), + it is also necessary to specify a valid dbname + in the primary_conninfo string. This will only be + used for slot synchronization. It is ignored for streaming. This parameter can only be set in the postgresql.conf diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 6788ba8ef4..7b8896b86f 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -28439,6 +28439,22 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset record is flushed along with its transaction. 
+ + + + + pg_sync_replication_slots + + pg_sync_replication_slots () + void + + + Synchronize the logical failover slots from the primary server to the standby server. + This function can only be executed on the standby server. See + for details. + + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index cd152d4ced..5e8e4f036b 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -358,6 +358,44 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU So if a slot is no longer required it should be dropped. + + + + + Replication Slot Synchronization + + A logical replication slot on the primary can be synchronized to the hot + standby by enabling the failover option during slot + creation and calling pg_sync_replication_slots + on the standby. For the synchronization + to work, it is mandatory to have a physical replication slot between the + primary and the standby, and + hot_standby_feedback + must be enabled on the standby. It is also necessary to specify a valid + dbname in the + primary_conninfo. + + + + The ability to resume logical replication after failover depends upon the + pg_replication_slots.synced + value for the synchronized slots on the standby at the time of failover. + Only persistent slots that have attained synced state as true on the standby + before failover can be used for logical replication after failover. + Temporary slots will be dropped, therefore logical replication for those + slots cannot be resumed. For example, if the synchronized slot could not + become persistent on the standby due to a disabled subscription, then the + subscription cannot be resumed after failover even when it is enabled. + + + + To resume logical replication after failover from the synced logical + slots, the subscription's 'conninfo' must be altered to point to the + new primary server. This is done using + ALTER SUBSCRIPTION ... CONNECTION. 
+ It is recommended that subscriptions are first disabled before promoting + the standby and are enabled back after altering the connection string. + diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index bb4fef1f51..f8ef2ad2ab 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2065,7 +2065,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" FAILOVER [ boolean ] - If true, the slot is enabled to be synced to the standbys. + If true, the slot is enabled to be synced to the standbys + so that logical replication can be resumed after failover. The default is false. @@ -2165,7 +2166,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" FAILOVER [ boolean ] - If true, the slot is enabled to be synced to the standbys. + If true, the slot is enabled to be synced to the standbys + so that logical replication can be resumed after failover. diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index dd468b31ea..4ea2177b34 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2561,10 +2561,28 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx failover bool - True if this is a logical slot enabled to be synced to the standbys. - Always false for physical slots. + True if this is a logical slot enabled to be synced to the standbys + so that logical replication can be resumed from the new primary + after failover. Always false for physical slots. + + + + synced bool + + + True if this is a logical slot that was synced from a primary server. + + + On a hot standby, the slots with the synced column marked as true can + neither be used for logical decoding nor dropped by the user. The value + of this column has no meaning on the primary server; the column value on + the primary is default false for all slots but may (if leftover from a + promoted standby) also be true. 
+ + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 478377c4a2..2d66d0d84b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -3596,6 +3596,9 @@ XLogGetLastRemovedSegno(void) /* * Return the oldest WAL segment on the given TLI that still exists in * XLOGDIR, or 0 if none. + * + * If the given TLI is 0, return the oldest WAL segment among all the currently + * existing WAL segments. */ XLogSegNo XLogGetOldestSegno(TimeLineID tli) @@ -3619,7 +3622,7 @@ XLogGetOldestSegno(TimeLineID tli) wal_segment_size); /* Ignore anything that's not from the TLI of interest. */ - if (tli != file_tli) + if (tli != 0 && tli != file_tli) continue; /* If it's the oldest so far, update oldest_segno. */ diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 6791bff9dd..04227a72d1 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1024,7 +1024,8 @@ CREATE VIEW pg_replication_slots AS L.safe_wal_size, L.two_phase, L.conflict_reason, - L.failover + L.failover, + L.synced FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..ba03eeff1c 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index ca09c683f1..5aefb10ecb 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -524,6 +524,18 @@ CreateDecodingContext(XLogRecPtr start_lsn, errmsg("replication slot \"%s\" was not created in this database", NameStr(slot->data.name)))); + /* + * Do not allow consumption of a 
"synchronized" slot until the standby + * gets promoted. + */ + if (RecoveryInProgress() && slot->data.synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use replication slot \"%s\" for logical" + " decoding", NameStr(slot->data.name)), + errdetail("This slot is being synced from the primary server."), + errhint("Specify another replication slot.")); + /* * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid * "cannot get changes" wording in this errmsg because that'd be diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 1050eb2c09..3dec36a6de 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'slotsync.c', 'snapbuild.c', 'tablesync.c', 'worker.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..4716696817 --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,938 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * Functionality for synchronizing slots to a standby server from the + * primary server. + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + * This file contains the code for slot synchronization on a physical standby + * to fetch logical failover slots information from the primary server, create + * the slots on the standby and synchronize them periodically. 
+ * + * While creating the slot on physical standby, if the local restart_lsn and/or + * local catalog_xmin is ahead of those on the remote then we cannot create the + * local slot in sync with the primary server because that would mean moving + * the local slot backwards and the standby might not have WALs retained for + * old LSN. In this case, the slot will be marked as RS_TEMPORARY. Once the + * primary server catches up, the slot will be marked as RS_PERSISTENT (which + * means sync-ready) and we can perform the sync periodically. + * + * The slots that were synchronized will be dropped if they are currently not + * needed to be synchronized. + *--------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include + +#include "access/genam.h" +#include "access/table.h" +#include "access/xlog_internal.h" +#include "access/xlogrecovery.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" +#include "libpq/pqsignal.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/fork_process.h" +#include "postmaster/interrupt.h" +#include "postmaster/postmaster.h" +#include "replication/logical.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/guc_hooks.h" +#include "utils/pg_lsn.h" +#include "utils/ps_status.h" +#include "utils/timeout.h" +#include "utils/varlena.h" + +/* Flag to tell if we are syncing replication slots */ +static bool syncing_slots = false; + +/* + * Structure to hold information fetched from the primary server about a logical + * replication slot. 
+ */ +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + bool failover; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; + + /* RS_INVAL_NONE if valid, or the reason of invalidation */ + ReplicationSlotInvalidationCause invalidated; +} RemoteSlot; + +/* + * If necessary, update local slot metadata based on the data from the remote + * slot. + * + * If no update was needed (the data of the remote slot is the same as the + * local slot) return false, otherwise true. + */ +static bool +local_slot_update(RemoteSlot * remote_slot, Oid remote_dbid) +{ + ReplicationSlot *slot = MyReplicationSlot; + NameData plugin_name; + + Assert(slot->data.invalidated == RS_INVAL_NONE); + + if (strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0 && + remote_dbid == slot->data.database && + remote_slot->restart_lsn == slot->data.restart_lsn && + remote_slot->catalog_xmin == slot->data.catalog_xmin && + remote_slot->two_phase == slot->data.two_phase && + remote_slot->failover == slot->data.failover && + remote_slot->confirmed_lsn == slot->data.confirmed_flush) + return false; + + /* Avoid expensive operations while holding a spinlock. 
*/ + namestrcpy(&plugin_name, remote_slot->plugin); + + SpinLockAcquire(&slot->mutex); + slot->data.plugin = plugin_name; + slot->data.database = remote_dbid; + slot->data.two_phase = remote_slot->two_phase; + slot->data.failover = remote_slot->failover; + slot->data.restart_lsn = remote_slot->restart_lsn; + slot->data.confirmed_flush = remote_slot->confirmed_lsn; + slot->data.catalog_xmin = remote_slot->catalog_xmin; + slot->effective_catalog_xmin = remote_slot->catalog_xmin; + SpinLockRelease(&slot->mutex); + + if (remote_slot->catalog_xmin != slot->data.catalog_xmin) + ReplicationSlotsComputeRequiredXmin(false); + + if (remote_slot->restart_lsn != slot->data.restart_lsn) + ReplicationSlotsComputeRequiredLSN(); + + return true; +} + +/* + * Get list of local logical slots which are synchronized from + * the primary server. + */ +static List * +get_local_synced_slots(void) +{ + List *local_slots = NIL; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + /* Check if it is a synchronized slot */ + if (s->in_use && s->data.synced) + { + Assert(SlotIsLogical(s)); + local_slots = lappend(local_slots, s); + } + } + + LWLockRelease(ReplicationSlotControlLock); + + return local_slots; +} + +/* + * Helper function to check if local_slot is present in remote_slots list. + * + * It also checks if the slot on the standby server was invalidated while the + * corresponding remote slot in the list remained valid. If found so, it sets + * the locally_invalidated flag to true. + */ +static bool +check_sync_slot_on_remote(ReplicationSlot *local_slot, List *remote_slots, + bool *locally_invalidated) +{ + foreach_ptr(RemoteSlot, remote_slot, remote_slots) + { + if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0) + { + /* + * If remote slot is not invalidated but local slot is marked as + * invalidated, then set the bool. 
+ */ + SpinLockAcquire(&local_slot->mutex); + *locally_invalidated = + (remote_slot->invalidated == RS_INVAL_NONE) && + (local_slot->data.invalidated != RS_INVAL_NONE); + SpinLockRelease(&local_slot->mutex); + + return true; + } + } + + return false; +} + +/* + * Drop obsolete slots + * + * Drop the slots that no longer need to be synced i.e. these either do not + * exist on the primary or are no longer enabled for failover. + * + * Additionally, it drops slots that are valid on the primary but got + * invalidated on the standby. This situation may occur due to the following + * reasons: + * - The max_slot_wal_keep_size on the standby is insufficient to retain WAL + * records from the restart_lsn of the slot. + * - primary_slot_name is temporarily reset to null and the physical slot is + * removed. + * - The primary changes wal_level to a level lower than logical. + * + * The assumption is that these dropped slots will get recreated in next + * sync-cycle and it is okay to drop and recreate such slots as long as these + * are not consumable on the standby (which is the case currently). + */ +static void +drop_obsolete_slots(List *remote_slot_list) +{ + List *local_slots = get_local_synced_slots(); + + foreach_ptr(ReplicationSlot, local_slot, local_slots) + { + bool remote_exists = false; + bool locally_invalidated = false; + + remote_exists = check_sync_slot_on_remote(local_slot, remote_slot_list, + &locally_invalidated); + + /* + * Drop the local slot either if it is not in the remote slots list or + * is invalidated while remote slot is still valid. + */ + if (!remote_exists || locally_invalidated) + { + bool synced_slot; + + /* + * Use shared lock to prevent a conflict with + * ReplicationSlotsDropDBSlots(), trying to drop the same slot + * during a drop-database operation. 
+ */ + LockSharedObject(DatabaseRelationId, local_slot->data.database, + 0, AccessShareLock); + + /* + * There is a possibility of parallel database drop and + * re-creation of new slot by user in the small window between + * getting the slot to drop and locking the db. This new + * user-created slot may end up using the same shared memory as + * that of 'local_slot'. Thus check if local_slot is still the + * synced one before performing actual drop. + */ + SpinLockAcquire(&local_slot->mutex); + synced_slot = local_slot->in_use && local_slot->data.synced; + SpinLockRelease(&local_slot->mutex); + if (synced_slot) + { + ReplicationSlotAcquire(NameStr(local_slot->data.name), true); + ReplicationSlotDropAcquired(); + } + + UnlockSharedObject(DatabaseRelationId, local_slot->data.database, + 0, AccessShareLock); + + ereport(LOG, + errmsg("dropped replication slot \"%s\" of dbid %d", + NameStr(local_slot->data.name), + local_slot->data.database)); + } + } +} + +/* + * Reserve WAL for the currently active slot using the specified WAL location + * (restart_lsn). + * + * If the given WAL location has been removed, reserve WAL using the oldest + * existing WAL segment. + */ +static void +reserve_wal_for_slot(XLogRecPtr restart_lsn) +{ + XLogSegNo oldest_segno; + XLogSegNo segno; + ReplicationSlot *slot = MyReplicationSlot; + + Assert(slot != NULL); + Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn)); + + while (true) + { + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + + /* Prevent WAL removal as fast as possible */ + ReplicationSlotsComputeRequiredLSN(); + + XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); + + /* + * Find the oldest existing WAL segment file. + * + * Normally, we can determine it by using the last removed segment + * number. 
However, if no WAL segment files have been removed by a + * checkpoint since startup, we need to search for the oldest segment + * file currently existing in XLOGDIR. + */ + oldest_segno = XLogGetLastRemovedSegno() + 1; + + if (oldest_segno == 1) + oldest_segno = XLogGetOldestSegno(0); + + /* + * If all required WAL is still there, great, otherwise retry. The + * slot should prevent further removal of WAL, unless there's a + * concurrent ReplicationSlotsComputeRequiredLSN() after we've written + * the new restart_lsn above, so normally we should never need to loop + * more than twice. + */ + if (segno >= oldest_segno) + break; + + /* Retry using the location of the oldest wal segment */ + XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn); + } +} + +/* + * Update the LSNs and persist the slot for further syncs if the remote + * restart_lsn and catalog_xmin have caught up with the local ones, otherwise + * do nothing. + * + * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise + * false. + */ +static bool +update_and_persist_slot(RemoteSlot * remote_slot, Oid remote_dbid) +{ + ReplicationSlot *slot = MyReplicationSlot; + + /* + * Check if the primary server has caught up. Refer to the comment atop + * the file for details on this check. + * + * We also need to check if remote_slot's confirmed_lsn becomes valid. It + * is possible to get null values for confirmed_lsn and catalog_xmin if on + * the primary server the slot is just created with a valid restart_lsn + * and the slot was fetched before the primary server could set + * valid confirmed_lsn and catalog_xmin. + */ + if (remote_slot->restart_lsn < slot->data.restart_lsn || + XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) || + TransactionIdPrecedes(remote_slot->catalog_xmin, + slot->data.catalog_xmin)) + { + /* + * The remote slot didn't catch up to locally reserved position. 
+ * + * We do not drop the slot because the restart_lsn can be ahead of the + * current location when recreating the slot in the next cycle. It may + * take more time to create such a slot. Therefore, we keep this slot + * and attempt the wait and synchronization in the next cycle. + */ + return false; + } + + /* First time slot update, the function must return true */ + if (!local_slot_update(remote_slot, remote_dbid)) + elog(ERROR, "failed to update slot"); + + ReplicationSlotPersist(); + + ereport(LOG, + errmsg("newly created slot \"%s\" is sync-ready now", + remote_slot->name)); + + return true; +} + +/* + * Synchronize single slot to given position. + * + * This creates a new slot if there is no existing one and updates the + * metadata of the slot as per the data received from the primary server. + * + * The slot is created as a temporary slot and stays in the same state until the + * remote_slot catches up with locally reserved position and local slot is + * updated. The slot is then persisted and is considered as sync-ready for + * periodic syncs. + * + * Returns TRUE if the local slot is updated. + */ +static bool +synchronize_one_slot(RemoteSlot * remote_slot, Oid remote_dbid) +{ + ReplicationSlot *slot; + bool slot_updated = false; + XLogRecPtr latestFlushPtr; + + /* + * Make sure that concerned WAL is received and flushed before syncing + * slot to target lsn received from the primary server. + */ + latestFlushPtr = GetStandbyFlushRecPtr(NULL); + if (remote_slot->confirmed_lsn > latestFlushPtr) + { + /* + * Can get here only if GUC 'standby_slot_names' on the primary server + * was not configured correctly. 
+ */ + ereport(ERROR, + errmsg("skipping slot synchronization as the received slot sync" + " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X", + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + remote_slot->name, + LSN_FORMAT_ARGS(latestFlushPtr))); + + return false; + } + + /* Search for the named slot */ + if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) + { + bool synced; + + SpinLockAcquire(&slot->mutex); + synced = slot->data.synced; + SpinLockRelease(&slot->mutex); + + /* User-created slot with the same name exists, raise ERROR. */ + if (!synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("exiting from slot synchronization because same" + " name slot \"%s\" already exists on the standby", + remote_slot->name)); + + /* + * The slot has been synchronized before. + * + * It is important to acquire the slot here before checking + * invalidation. If we don't acquire the slot first, there could be a + * race condition that the local slot could be invalidated just after + * checking the 'invalidated' flag here and we could end up + * overwriting 'invalidated' flag to remote_slot's value. See + * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly + * if the slot is not acquired by other processes. + */ + ReplicationSlotAcquire(remote_slot->name, true); + + Assert(slot == MyReplicationSlot); + + /* + * Copy the invalidation cause from remote only if local slot is not + * invalidated locally, we don't want to overwrite existing one. 
+ */ + if (slot->data.invalidated == RS_INVAL_NONE) + { + SpinLockAcquire(&slot->mutex); + slot->data.invalidated = remote_slot->invalidated; + SpinLockRelease(&slot->mutex); + + /* Make sure the invalidated state persists across server restart */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + slot_updated = true; + } + + /* Skip the sync of an invalidated slot */ + if (slot->data.invalidated != RS_INVAL_NONE) + { + ReplicationSlotRelease(); + return slot_updated; + } + + /* Slot not ready yet, let's attempt to make it sync-ready now. */ + if (slot->data.persistency == RS_TEMPORARY) + { + slot_updated = update_and_persist_slot(remote_slot, remote_dbid); + } + + /* Slot ready for sync, so sync it. */ + else + { + /* + * Sanity check: As long as the invalidations are handled + * appropriately as above, this should never happen. + */ + if (remote_slot->restart_lsn < slot->data.restart_lsn) + elog(ERROR, + "cannot synchronize local slot \"%s\" LSN(%X/%X)" + " to remote slot's LSN(%X/%X) as synchronization" + " would move it backwards", remote_slot->name, + LSN_FORMAT_ARGS(slot->data.restart_lsn), + LSN_FORMAT_ARGS(remote_slot->restart_lsn)); + + /* Make sure the slot changes persist across server restart */ + if (local_slot_update(remote_slot, remote_dbid)) + { + slot_updated = true; + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + } + } + } + /* Otherwise create the slot first. */ + else + { + NameData plugin_name; + TransactionId xmin_horizon = InvalidTransactionId; + + /* Skip creating the local slot if remote_slot is invalidated already */ + if (remote_slot->invalidated != RS_INVAL_NONE) + return false; + + ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, + remote_slot->two_phase, + remote_slot->failover, + true); + + /* For shorter lines. */ + slot = MyReplicationSlot; + + /* Avoid expensive operations while holding a spinlock. 
*/ + namestrcpy(&plugin_name, remote_slot->plugin); + + SpinLockAcquire(&slot->mutex); + slot->data.database = remote_dbid; + slot->data.plugin = plugin_name; + SpinLockRelease(&slot->mutex); + + reserve_wal_for_slot(remote_slot->restart_lsn); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + SpinLockAcquire(&slot->mutex); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + (void) update_and_persist_slot(remote_slot, remote_dbid); + slot_updated = true; + } + + ReplicationSlotRelease(); + + return slot_updated; +} + +/* + * Maps the pg_replication_slots.conflict_reason text value to + * ReplicationSlotInvalidationCause enum value + */ +static ReplicationSlotInvalidationCause +get_slot_invalidation_cause(char *conflict_reason) +{ + Assert(conflict_reason); + + if (strcmp(conflict_reason, SLOT_INVAL_WAL_REMOVED_TEXT) == 0) + return RS_INVAL_WAL_REMOVED; + else if (strcmp(conflict_reason, SLOT_INVAL_HORIZON_TEXT) == 0) + return RS_INVAL_HORIZON; + else if (strcmp(conflict_reason, SLOT_INVAL_WAL_LEVEL_TEXT) == 0) + return RS_INVAL_WAL_LEVEL; + else + Assert(0); + + /* Keep compiler quiet */ + return RS_INVAL_NONE; +} + +/* + * Synchronize slots. + * + * Gets the failover logical slots info from the primary server and updates + * the slots locally. Creates the slots if not present on the standby. + * + * Returns TRUE if any of the slots gets updated in this sync-cycle. 
+ */ +static bool +synchronize_slots(WalReceiverConn *wrconn) +{ +#define SLOTSYNC_COLUMN_COUNT 9 + Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, + LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID}; + + WalRcvExecResult *res; + TupleTableSlot *tupslot; + StringInfoData s; + List *remote_slot_list = NIL; + bool some_slot_updated = false; + XLogRecPtr latestWalEnd; + bool started_tx = false; + + /* + * The primary_slot_name is not set yet or WALs not received yet. + * Synchronization is not possible if the walreceiver is not started. + */ + latestWalEnd = GetWalRcvLatestWalEnd(); + SpinLockAcquire(&WalRcv->mutex); + if ((WalRcv->slotname[0] == '\0') || + XLogRecPtrIsInvalid(latestWalEnd)) + { + SpinLockRelease(&WalRcv->mutex); + return false; + } + SpinLockRelease(&WalRcv->mutex); + + /* The syscache access in walrcv_exec() needs a transaction env. */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + initStringInfo(&s); + + /* Construct query to fetch slots with failover enabled. 
*/ + appendStringInfo(&s, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase, failover," + " database, conflict_reason" + " FROM pg_catalog.pg_replication_slots" + " WHERE failover and NOT temporary"); + + /* Execute the query */ + res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow); + pfree(s.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not fetch failover logical slots info from the primary server: %s", + res->err)); + + /* Construct the remote_slot tuple and synchronize each slot locally */ + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) + { + bool isnull; + RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot)); + Datum d; + int col = 0; + + remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col, + &isnull)); + Assert(!isnull); + + remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col, + &isnull)); + Assert(!isnull); + + /* + * It is possible to get null values for LSN and Xmin if slot is + * invalidated on the primary server, so handle accordingly. + */ + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr : + DatumGetLSN(d); + + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d); + + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->catalog_xmin = isnull ? InvalidTransactionId : + DatumGetTransactionId(d); + + remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col, + &isnull)); + Assert(!isnull); + + remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col, + &isnull)); + Assert(!isnull); + + remote_slot->database = TextDatumGetCString(slot_getattr(tupslot, + ++col, &isnull)); + Assert(!isnull); + + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->invalidated = isnull ? 
RS_INVAL_NONE : + get_slot_invalidation_cause(TextDatumGetCString(d)); + + /* Sanity check */ + Assert(col == SLOTSYNC_COLUMN_COUNT); + + /* Create list of remote slots */ + remote_slot_list = lappend(remote_slot_list, remote_slot); + + ExecClearTuple(tupslot); + } + + /* Drop local slots that no longer need to be synced. */ + drop_obsolete_slots(remote_slot_list); + + /* Now sync the slots locally */ + foreach_ptr(RemoteSlot, remote_slot, remote_slot_list) + { + Oid remote_dbid = get_database_oid(remote_slot->database, false); + + /* + * Use shared lock to prevent a conflict with + * ReplicationSlotsDropDBSlots(), trying to drop the same slot during + * a drop-database operation. + */ + LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); + + some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid); + + UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); + } + + /* We are done, free remote_slot_list elements */ + list_free_deep(remote_slot_list); + + walrcv_clear_result(res); + + if (started_tx) + CommitTransactionCommand(); + + return some_slot_updated; +} + +/* + * Using the specified primary server connection, validates primary_slot_name. + */ +static void +validate_primary_slot(WalReceiverConn *wrconn, int slot_invalid_elevel) +{ +#define PRIMARY_INFO_OUTPUT_COL_COUNT 1 + WalRcvExecResult *res; + Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID}; + StringInfoData cmd; + bool isnull; + TupleTableSlot *tupslot; + bool valid; + bool started_tx = false; + + /* The syscache access in walrcv_exec() needs a transaction env. 
*/ + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT count(*) = 1" + " FROM pg_catalog.pg_replication_slots" + " WHERE slot_type='physical' AND slot_name=%s", + quote_literal_cstr(PrimarySlotName)); + + res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not fetch primary_slot_name \"%s\" info from the primary server: %s", + PrimarySlotName, res->err), + errhint("Check if \"primary_slot_name\" is configured correctly.")); + + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) + elog(ERROR, + "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\""); + + valid = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); + Assert(!isnull); + + if (!valid) + ereport(slot_invalid_elevel, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + /* translator: second %s is a GUC variable name */ + errdetail("The primary server slot \"%s\" specified by \"%s\" is not valid.", + PrimarySlotName, "primary_slot_name")); + + ExecClearTuple(tupslot); + walrcv_clear_result(res); + + if (started_tx) + CommitTransactionCommand(); +} + +/* + * Returns true if all necessary GUCs for slot synchronization are set + * appropriately, otherwise returns false. + */ +static bool +validate_slotsync_params(int elevel) +{ + char *dbname; + + /* + * A physical replication slot(primary_slot_name) is required on the + * primary to ensure that the rows needed by the standby are not removed + * after restarting, so that the synchronized slot on the standby will not + * be invalidated. 
+ */ + if (PrimarySlotName == NULL || *PrimarySlotName == '\0') + { + ereport(elevel, + /* translator: %s is a GUC variable name */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + errhint("\"%s\" must be defined.", "primary_slot_name")); + return false; + } + + /* + * hot_standby_feedback must be enabled to cooperate with the physical + * replication slot, which allows informing the primary about the xmin and + * catalog_xmin values on the standby. + */ + if (!hot_standby_feedback) + { + ereport(elevel, + /* translator: %s is a GUC variable name */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + errhint("\"%s\" must be enabled.", "hot_standby_feedback")); + return false; + } + + /* + * Logical decoding requires wal_level >= logical and we currently only + * synchronize logical slots. + */ + if (wal_level < WAL_LEVEL_LOGICAL) + { + ereport(elevel, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + errhint("\"wal_level\" must be >= logical.")); + return false; + } + + /* + * The primary_conninfo is required to make connection to primary for + * getting slots information. + */ + if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0') + { + ereport(elevel, + /* translator: %s is a GUC variable name */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + errhint("\"%s\" must be defined.", "primary_conninfo")); + return false; + } + + /* + * The slot synchronization needs a database connection for walrcv_exec to + * work. 
+ */ + dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (dbname == NULL) + { + ereport(elevel, + + /* + * translator: 'dbname' is a specific option; %s is a GUC variable + * name + */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + errhint("'dbname' must be specified in \"%s\".", "primary_conninfo")); + return false; + } + + return true; +} + +/* + * Is current process syncing replication slots? + */ +bool +IsSyncingReplicationSlots(void) +{ + return syncing_slots; +} + +/* + * Synchronize replication slots with failover enabled to a standby server from + * the primary server. + */ +Datum +pg_sync_replication_slots(PG_FUNCTION_ARGS) +{ + WalReceiverConn *wrconn = NULL; + char *err; + StringInfoData app_name; + + if (!RecoveryInProgress()) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slots can only be synchronized to a standby server")); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + validate_slotsync_params(ERROR); + + initStringInfo(&app_name); + if (cluster_name[0]) + appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync"); + else + appendStringInfo(&app_name, "%s", "slotsync"); + + /* + * Establish the connection to the primary server for slots + * synchronization. 
+ */ + validate_primary_slot(wrconn, ERROR); + + (void) synchronize_slots(wrconn); + } + PG_FINALLY(); + { + syncing_slots = false; + walrcv_disconnect(wrconn); + } + PG_END_TRY(); + + PG_RETURN_VOID(); +} diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 110cb59783..144a178b63 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,7 +46,9 @@ #include "common/string.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/logicalworker.h" #include "replication/slot.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/proc.h" @@ -103,7 +105,6 @@ int max_replication_slots = 10; /* the maximum number of replication * slots */ static void ReplicationSlotShmemExit(int code, Datum arg); -static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ @@ -250,11 +251,12 @@ ReplicationSlotValidateName(const char *name, int elevel) * user will only get commit prepared. * failover: If enabled, allows the slot to be synced to standbys so * that logical replication can be resumed after failover. + * synced: True if the slot is synchronized from the primary server. */ void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover) + bool two_phase, bool failover, bool synced) { ReplicationSlot *slot = NULL; int i; @@ -263,6 +265,20 @@ ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotValidateName(name, ERROR); + /* + * Do not allow users to create the slots with failover enabled on the + * standby as we do not support sync to the cascading standby. + * + * Slots with failover enabled can still be created when doing slot + * synchronization, as it needs to maintain this value in sync with the + * remote slots. 
+ */ + if (failover && RecoveryInProgress() && !IsSyncingReplicationSlots()) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable failover for a replication slot" + " created on the standby")); + /* * If some other backend ran this code concurrently with us, we'd likely * both allocate the same slot, and that would be bad. We'd also be at @@ -315,6 +331,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; + slot->data.synced = synced; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -677,6 +694,16 @@ ReplicationSlotDrop(const char *name, bool nowait) ReplicationSlotAcquire(name, nowait); + /* + * Do not allow users to drop the slots which are currently being synced + * from the primary to the standby. + */ + if (RecoveryInProgress() && MyReplicationSlot->data.synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot drop replication slot \"%s\"", name), + errdetail("This slot is being synced from the primary server.")); + ReplicationSlotDropAcquired(); } @@ -696,6 +723,29 @@ ReplicationSlotAlter(const char *name, bool failover) errmsg("cannot use %s with a physical replication slot", "ALTER_REPLICATION_SLOT")); + if (RecoveryInProgress()) + { + /* + * Do not allow users to alter the slots which are currently being + * synced from the primary to the standby. + */ + if (MyReplicationSlot->data.synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot alter replication slot \"%s\"", name), + errdetail("This slot is being synced from the primary server.")); + + /* + * Do not allow users to alter slots to enable failover on the standby + * as we do not support sync to the cascading standby. 
+ */ + if (failover) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable failover for a replication slot" + " on the standby")); + } + SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->data.failover = failover; SpinLockRelease(&MyReplicationSlot->mutex); @@ -708,7 +758,7 @@ ReplicationSlotAlter(const char *name, bool failover) /* * Permanently drop the currently acquired replication slot. */ -static void +void ReplicationSlotDropAcquired(void) { ReplicationSlot *slot = MyReplicationSlot; @@ -864,8 +914,8 @@ ReplicationSlotMarkDirty(void) } /* - * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot, - * guaranteeing it will be there after an eventual crash. + * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a + * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash. */ void ReplicationSlotPersist(void) diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index eb685089b3..08efc2a1be 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -43,7 +43,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, temporary ? RS_TEMPORARY : RS_PERSISTENT, false, - false); + false, false); if (immediately_reserve) { @@ -136,7 +136,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? 
RS_TEMPORARY : RS_EPHEMERAL, two_phase, - failover); + failover, false); /* * Create logical decoding context to find start point or, if we don't @@ -237,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 16 +#define PG_GET_REPLICATION_SLOTS_COLS 17 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -418,21 +418,23 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) break; case RS_INVAL_WAL_REMOVED: - values[i++] = CStringGetTextDatum("wal_removed"); + values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_REMOVED_TEXT); break; case RS_INVAL_HORIZON: - values[i++] = CStringGetTextDatum("rows_removed"); + values[i++] = CStringGetTextDatum(SLOT_INVAL_HORIZON_TEXT); break; case RS_INVAL_WAL_LEVEL: - values[i++] = CStringGetTextDatum("wal_level_insufficient"); + values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_LEVEL_TEXT); break; } } values[i++] = BoolGetDatum(slot_contents.data.failover); + values[i++] = BoolGetDatum(slot_contents.data.synced); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -700,7 +702,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) XLogRecPtr src_restart_lsn; bool src_islogical; bool temporary; - bool failover; char *plugin; Datum values[2]; bool nulls[2]; @@ -756,7 +757,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) src_islogical = SlotIsLogical(&first_slot_contents); src_restart_lsn = first_slot_contents.data.restart_lsn; temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); - failover = first_slot_contents.data.failover; plugin = logical_slot ? 
NameStr(first_slot_contents.data.plugin) : NULL; /* Check type of replication slot */ @@ -791,12 +791,20 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) * We must not try to read WAL, since we haven't reserved it yet -- * hence pass find_startpoint false. confirmed_flush will be set * below, by copying from the source slot. + * + * To avoid potential issues with the slot synchronization when the + * restart_lsn of a replication slot goes backwards, we set the + * failover option to false here. This situation occurs when a slot on + * the primary server is dropped and immediately replaced with a new + * slot of the same name, created by copying from another existing + * slot. However, the slot synchronization will only observe the + * restart_lsn of the same slot going backwards. */ create_logical_replication_slot(NameStr(*dst_name), plugin, temporary, false, - failover, + false, src_restart_lsn, false); } diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 73a7d8f96c..d420a833cd 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -345,6 +345,22 @@ GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) return recptr; } +/* + * Returns the latest reported end of WAL on the sender + */ +XLogRecPtr +GetWalRcvLatestWalEnd(void) +{ + WalRcvData *walrcv = WalRcv; + XLogRecPtr recptr; + + SpinLockAcquire(&walrcv->mutex); + recptr = walrcv->latestWalEnd; + SpinLockRelease(&walrcv->mutex); + + return recptr; +} + /* * Returns the last+1 byte position that walreceiver has written. * This returns a recently written value without taking a lock. 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 77c8baa32a..460b3c0be4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -72,6 +72,7 @@ #include "postmaster/interrupt.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/snapbuild.h" #include "replication/syncrep.h" @@ -243,7 +244,6 @@ static void WalSndShutdown(void) pg_attribute_noreturn(); static void XLogSendPhysical(void); static void XLogSendLogical(void); static void WalSndDone(WalSndSendDataCallback send_data); -static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli); static void IdentifySystem(void); static void UploadManifest(void); static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, @@ -1224,7 +1224,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false); + false, false, false); if (reserve_wal) { @@ -1255,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase, failover); + two_phase, failover, false); /* * Do options check early so that we can bail before calling the @@ -3375,14 +3375,17 @@ WalSndDone(WalSndSendDataCallback send_data) } /* - * Returns the latest point in WAL that has been safely flushed to disk, and - * can be sent to the standby. This should only be called when in recovery, - * ie. we're streaming to a cascaded standby. + * Returns the latest point in WAL that has been safely flushed to disk. + * This should only be called when in recovery. + * + * This is called either by cascading walsender to find WAL position to be sent + * to a cascaded standby or by slot synchronization function to validate remote + * slot's lsn before syncing it locally. 
* * As a side-effect, *tli is updated to the TLI of the last * replayed WAL record. */ -static XLogRecPtr +XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli) { XLogRecPtr replayPtr; @@ -3391,6 +3394,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli) TimeLineID receiveTLI; XLogRecPtr result; + Assert(am_cascading_walsender || IsSyncingReplicationSlots()); + /* * We can safely send what's already been replayed. Also, if walreceiver * is streaming WAL from the same timeline, we can send anything that it diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 29af4ce65d..9c120fc2b7 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11127,9 +11127,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', @@ -11212,6 +11212,10 @@ proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u', prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool', prosrc => 'pg_logical_emit_message_bytea' }, +{ oid => '9929', descr 
=> 'sync replication slots from the primary to the standby', + proname => 'pg_sync_replication_slots', provolatile => 'v', proparallel => 'u', + prorettype => 'void', proargtypes => '', + prosrc => 'pg_sync_replication_slots' }, # event triggers { oid => '3566', descr => 'list objects dropped by the current command', diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index a18d79d1b2..7c00f73328 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -22,6 +22,7 @@ extern void TablesyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); +extern bool IsSyncingReplicationSlots(void); extern void HandleParallelApplyMessageInterrupt(void); extern void HandleParallelApplyMessages(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index da4c776492..1f4446aa3a 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -52,6 +52,14 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_WAL_LEVEL, } ReplicationSlotInvalidationCause; +/* + * The possible values for 'conflict_reason' returned in + * pg_get_replication_slots. + */ +#define SLOT_INVAL_WAL_REMOVED_TEXT "wal_removed" +#define SLOT_INVAL_HORIZON_TEXT "rows_removed" +#define SLOT_INVAL_WAL_LEVEL_TEXT "wal_level_insufficient" + /* * On-Disk data of a replication slot, preserved across restarts. */ @@ -112,6 +120,11 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + /* + * Was this slot synchronized from the primary server? + */ + char synced; + /* * Is this a failover slot (sync candidate for standbys)? Only relevant * for logical slots on the primary server. 
@@ -224,9 +237,11 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover); + bool two_phase, bool failover, + bool synced); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); +extern void ReplicationSlotDropAcquired(void); extern void ReplicationSlotAlter(const char *name, bool failover); extern void ReplicationSlotAcquire(const char *name, bool nowait); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index b906bb5ce8..9147a8b962 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -498,6 +498,7 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, bool create_temp_slot); extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern XLogRecPtr GetWalRcvWriteRecPtr(void); +extern XLogRecPtr GetWalRcvLatestWalEnd(void); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 1b58d50b3b..276d8913aa 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -12,6 +12,8 @@ #ifndef _WALSENDER_H #define _WALSENDER_H +#include "access/xlogdefs.h" + /* * What to do with a snapshot in create replication slot command. 
*/ @@ -45,6 +47,7 @@ extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); +extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli); /* * Remember that we want to wakeup walsenders later diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index bc58ff4cab..7036a33ff3 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -97,4 +97,67 @@ my ($result, $stdout, $stderr) = $subscriber1->psql('postgres', ok( $stderr =~ /ERROR: cannot set failover for enabled subscription/, "altering failover is not allowed for enabled subscription"); +################################################## +# Test logical failover slots on the standby +# Configure standby1 to replicate and synchronize logical slots configured +# for failover on the primary +# +# failover slot lsub1_slot ->| ----> subscriber1 (connected via logical replication) +# primary ---> | +# physical slot sb1_slot --->| ----> standby1 (connected via streaming replication) +# | lsub1_slot(synced_slot) +################################################## + +my $primary = $publisher; +my $backup_name = 'backup'; +$primary->backup($backup_name); + +# Create a standby +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); + +my $connstr_1 = $primary->connstr; +$standby1->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'sb1_slot' +primary_conninfo = '$connstr_1 dbname=postgres' +)); + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb1_slot');}); + +my $standby1_conninfo = $standby1->connstr . 
' dbname=postgres'; +my $offset = -s $standby1->logfile; + +# Start the standby so that slot syncing can begin +$standby1->start; + +# Generate a log to trigger the walsender to send messages to the walreceiver +# which will update WalRcv->latestWalEnd to a valid number. +$primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + +# Confirm that WalRcv->latestWalEnd is valid +ok( $standby1->poll_query_until( + 'postgres', + "SELECT latest_end_lsn IS NOT NULL from pg_stat_wal_receiver;"), + 'the walreceiver has received a message from the primary'); + +$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();"); + +# Wait for the standby to finish sync +$standby1->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? newly created slot \"lsub1_slot\" is sync-ready now/, + $offset); + +# Confirm that the logical failover slot is created on the standby and is +# flagged as 'synced' +is($standby1->safe_psql('postgres', + q{SELECT synced FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}), + "t", + 'logical slot has synced as true on standby'); + done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index abc944e8b8..b7488d760e 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1474,8 +1474,9 @@ pg_replication_slots| SELECT l.slot_name, l.safe_wal_size, l.two_phase, l.conflict_reason, - l.failover - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover) + l.failover, + l.synced + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, synced) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, 
pg_authid.rolsuper, -- 2.34.1