From ac82a55b9e963a89ad8aaca97ad3fdd4db28f874 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 1 Nov 2023 18:13:45 +0800 Subject: [PATCH v30 2/3] Add logical slot sync capability to the physical standby This patch implements synchronization of logical replication slots from the primary server to the physical standby so that logical replication can be resumed after failover. All the failover logical replication slots on the primary (assuming configurations are appropriate) are automatically created on the physical standbys and are synced periodically. Slot-sync worker(s) on the standby server ping the primary at regular intervals to get the necessary failover logical slots information and create/update the slots locally. GUC 'enable_syncslot' enables a physical standby to synchronize failover logical replication slots from the primary server. GUC 'max_slotsync_workers' defines the maximum number of slot-sync workers on the standby. This parameter can only be set at server start. The replication launcher on the physical standby queries the primary to get the list of dbids for failover logical slots. Once it gets the dbids, if dbids < max_slotsync_workers, it starts only that many workers and if dbids > max_slotsync_workers, it starts max_slotsync_workers and divides the work equally among them. Each worker is then responsible for syncing the logical slots belonging to the DBs assigned to it. Each slot-sync worker will have its own dbids list. Since the upper limit of this dbid-count is not known, it needs to be handled using dsa. We initially allocate memory to hold 100 dbids for each worker. If this limit is exhausted, we reallocate this memory with size incremented again by 100. The nap time of each worker is tuned according to the activity on the primary. Each worker starts with a nap time of 10ms and if no activity is observed on the primary for some time, then the nap time is increased to 10sec. And if activity is observed again, the nap time is reduced back to 10ms. 
Each worker uses one slot (first one assigned to it) for monitoring purpose. If there is no change in lsn of that slot for some threshold time, nap time is increased to 10sec and as soon as a change is observed, nap time is reduced back to 10ms. The logical slots created by slot-sync workers on physical standbys are not allowed to be dropped or consumed. Any attempt to perform logical decoding on such slots will result in an error. If a logical slot is invalidated on the primary, slot on the standby is also invalidated. If a logical slot on the primary is valid but is invalidated on the standby due to conflict (say required rows removed on the primary), then that slot is dropped and recreated on the standby in next sync-cycle. It is okay to recreate such slots as long as these are not consumable on the standby (which is the case currently). --- doc/src/sgml/config.sgml | 56 +- doc/src/sgml/system-views.sgml | 11 + src/backend/catalog/system_views.sql | 3 +- src/backend/postmaster/bgworker.c | 5 +- .../libpqwalreceiver/libpqwalreceiver.c | 89 ++ src/backend/replication/logical/Makefile | 1 + .../replication/logical/applyparallelworker.c | 3 +- src/backend/replication/logical/launcher.c | 971 +++++++++++++-- src/backend/replication/logical/logical.c | 12 + src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 1042 +++++++++++++++++ src/backend/replication/logical/tablesync.c | 5 +- src/backend/replication/repl_gram.y | 13 + src/backend/replication/repl_scanner.l | 2 + src/backend/replication/slot.c | 16 +- src/backend/replication/slotfuncs.c | 35 +- src/backend/replication/walsender.c | 76 +- src/backend/storage/lmgr/lwlock.c | 2 + src/backend/storage/lmgr/lwlocknames.txt | 1 + .../utils/activity/wait_event_names.txt | 2 + src/backend/utils/misc/guc_tables.c | 25 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/catalog/pg_proc.dat | 10 +- src/include/commands/subscriptioncmds.h | 4 + 
src/include/nodes/replnodes.h | 9 + src/include/replication/logicallauncher.h | 7 +- src/include/replication/logicalworker.h | 1 + src/include/replication/slot.h | 12 +- src/include/replication/walreceiver.h | 30 + src/include/replication/worker_internal.h | 61 +- src/include/storage/lwlock.h | 1 + src/test/recovery/t/050_verify_slot_order.pl | 121 ++ src/test/regress/expected/rules.out | 5 +- src/test/regress/expected/sysviews.out | 3 +- src/tools/pgindent/typedefs.list | 5 + 35 files changed, 2497 insertions(+), 145 deletions(-) create mode 100644 src/backend/replication/logical/slotsync.c diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 7136a925a9..689820b16d 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4556,10 +4556,14 @@ ANY num_sync ( num_sync ( + enable_syncslot (boolean) + + enable_syncslot configuration parameter + + + + + It enables a physical standby to synchronize logical failover slots + from the primary server so that logical subscribers are not blocked after failover. + + + It is enabled by default. This parameter can only be set in the + postgresql.conf file or on the server command line. + + + + + + max_slotsync_workers (integer) + + max_slotsync_workers configuration parameter + + + + + Specifies maximum number of slot synchronization workers. + + + Slot synchronization workers are taken from the pool defined by + max_worker_processes. + + + The default value is 2. This parameter can only be set at server + start. + + + The slot-sync workers are needed for synchronization of logical replication + slots from the primary server to the physical standby so that logical + subscribers are not blocked after failover. + + + + diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 7ea08942c4..fb2a7ed15a 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2543,6 +2543,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx after failover. 
Always false for physical slots. + + + + synced_slot bool + + + True if this logical slot is created on the physical standby server as + part of slot-synchronization from the primary server. + Always false for physical slots. + + diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 9c595ca3c9..fe9907f34e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1003,7 +1003,8 @@ CREATE VIEW pg_replication_slots AS L.safe_wal_size, L.two_phase, L.conflicting, - L.failover + L.failover, + L.synced_slot FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 48a9924527..0e039c786f 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -125,11 +125,14 @@ static const struct "ParallelWorkerMain", ParallelWorkerMain }, { - "ApplyLauncherMain", ApplyLauncherMain + "LauncherMain", LauncherMain }, { "ApplyWorkerMain", ApplyWorkerMain }, + { + "ReplSlotSyncWorkerMain", ReplSlotSyncWorkerMain + }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 336c2bec99..d217d38641 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -34,6 +34,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -58,6 +59,8 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static List *libpqrcv_get_dbinfo_for_failover_slots(WalReceiverConn *conn); +static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo); static 
int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -100,6 +103,8 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, .walrcv_alter_slot = libpqrcv_alter_slot, + .walrcv_get_dbinfo_for_failover_slots = libpqrcv_get_dbinfo_for_failover_slots, + .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -413,6 +418,90 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get DB info for logical failover slots + * + * It gets the DBIDs for failover logical slots from the primary server. + * The list returned by LIST_DBID_FOR_FAILOVER_SLOTS has no duplicates. + */ +static List * +libpqrcv_get_dbinfo_for_failover_slots(WalReceiverConn *conn) +{ + PGresult *res; + List *slotlist = NIL; + int ntuples; + WalRcvFailoverSlotsData *slot_data; + + res = libpqrcv_PQexec(conn->streamConn, "LIST_DBID_FOR_FAILOVER_SLOTS"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive logical failover slots dbinfo from the primary server: %s", + pchomp(PQerrorMessage(conn->streamConn))))); + } + if (PQnfields(res) != 1) + { + int nfields = PQnfields(res); + + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from the primary server"), + errdetail("Could not get logical failover slots dbinfo: got %d fields, " + "expected 1", nfields))); + } + + ntuples = PQntuples(res); + for (int i = 0; i < ntuples; i++) + { + slot_data = palloc0(sizeof(WalRcvFailoverSlotsData)); + if (!PQgetisnull(res, i, 0)) + slot_data->dboid = atooid(PQgetvalue(res, i, 0)); + + slotlist = lappend(slotlist, slot_data); + } + + PQclear(res); + + return slotlist; +} + +/* + * Get database 
name from the primary server's conninfo. + * + * If dbname is not found in connInfo, return NULL value. + */ +static char * +libpqrcv_get_dbname_from_conninfo(const char *connInfo) +{ + PQconninfoOption *opts; + PQconninfoOption *opt; + char *dbname = NULL; + char *err = NULL; + + opts = PQconninfoParse(connInfo, &err); + if (opts == NULL) + { + /* The error string is malloc'd, so we must free it explicitly */ + char *errcopy = err ? pstrdup(err) : "out of memory"; + + PQfreemem(err); + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid connection string syntax: %s", errcopy))); + } + + for (opt = opts; opt->keyword != NULL; ++opt) + { + /* If multiple dbnames are specified, then the last one will be returned */ + if (strcmp(opt->keyword, "dbname") == 0 && opt->val && opt->val[0] != '\0') + dbname = pstrdup(opt->val); + } + + return dbname; +} + /* * Start streaming WAL data from given streaming options. * diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..ba03eeff1c 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 9b37736f8e..192c9e1860 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -922,7 +922,8 @@ ParallelApplyWorkerMain(Datum main_arg) before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); SpinLockAcquire(&MyParallelShared->mutex); - MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation; + MyParallelShared->logicalrep_worker_generation = + MyLogicalRepWorker->hdr.generation; MyParallelShared->logicalrep_worker_slot_no = worker_slot; SpinLockRelease(&MyParallelShared->mutex); diff 
--git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 501910b445..e7cfe645af 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -22,6 +22,7 @@ #include "access/htup_details.h" #include "access/tableam.h" #include "access/xact.h" +#include "catalog/pg_authid.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" @@ -57,6 +58,31 @@ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; int max_parallel_apply_workers_per_subscription = 2; +int max_slotsync_workers = 2; +bool enable_syncslot = true; + +/* + * Local variables to store the current values of slot-sync related GUCs + * before each ConfigReload. + */ +static char *PrimaryConnInfoPreReload = NULL; +static char *PrimarySlotNamePreReload = NULL; +static bool EnableSyncSlotPreReload; +static bool HotStandbyFeedbackPreReload; + +/* + * Initial allocation size for dbids array for each SlotSyncWorker in dynamic + * shared memory. + */ +#define DB_PER_WORKER_ALLOC_INIT 100 + +/* + * Once initially allocated size is exhausted for dbids array, it is extended by + * DB_PER_WORKER_ALLOC_EXTRA size. + */ +#define DB_PER_WORKER_ALLOC_EXTRA 100 + +SlotSyncWorker *MySlotSyncWorker = NULL; LogicalRepWorker *MyLogicalRepWorker = NULL; @@ -70,6 +96,7 @@ typedef struct LogicalRepCtxStruct dshash_table_handle last_start_dsh; /* Background workers. 
*/ + SlotSyncWorker *ss_workers; /* slot-sync workers */ LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; } LogicalRepCtxStruct; @@ -102,6 +129,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); +static void slotsync_worker_cleanup(SlotSyncWorker *worker); static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); @@ -178,6 +206,8 @@ get_subscription_list(void) } /* + * This is common code for logical workers and slotsync workers. + * * Wait for a background worker to start up and attach to the shmem context. * * This is only needed for cleaning up the shared memory in case the worker @@ -186,12 +216,14 @@ get_subscription_list(void) * Returns whether the attach was successful. */ static bool -WaitForReplicationWorkerAttach(LogicalRepWorker *worker, +WaitForReplicationWorkerAttach(LogicalWorkerHeader *worker, uint16 generation, - BackgroundWorkerHandle *handle) + BackgroundWorkerHandle *handle, + LWLock *lock) { BgwHandleStatus status; int rc; + bool is_slotsync_worker = (lock == SlotSyncWorkerLock) ? true : false; for (;;) { @@ -199,27 +231,32 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, CHECK_FOR_INTERRUPTS(); - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + LWLockAcquire(lock, LW_SHARED); /* Worker either died or has started. Return false if died. */ if (!worker->in_use || worker->proc) { - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); return worker->in_use; } - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); /* Check if worker has died before attaching, and clean up after it. 
*/ status = GetBackgroundWorkerPid(handle, &pid); if (status == BGWH_STOPPED) { - LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Ensure that this was indeed the worker we waited for. */ if (generation == worker->generation) - logicalrep_worker_cleanup(worker); - LWLockRelease(LogicalRepWorkerLock); + { + if (is_slotsync_worker) + slotsync_worker_cleanup((SlotSyncWorker *) worker); + else + logicalrep_worker_cleanup((LogicalRepWorker *) worker); + } + LWLockRelease(lock); return false; } @@ -262,8 +299,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) if (isParallelApplyWorker(w)) continue; - if (w->in_use && w->subid == subid && w->relid == relid && - (!only_running || w->proc)) + if (w->hdr.in_use && w->subid == subid && w->relid == relid && + (!only_running || w->hdr.proc)) { res = w; break; @@ -290,7 +327,8 @@ logicalrep_workers_find(Oid subid, bool only_running) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->in_use && w->subid == subid && (!only_running || w->proc)) + if (w->hdr.in_use && w->subid == subid && + (!only_running || w->hdr.proc)) res = lappend(res, w); } @@ -351,7 +389,7 @@ retry: { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (!w->in_use) + if (!w->hdr.in_use) { worker = w; slot = i; @@ -380,8 +418,8 @@ retry: * If the worker was marked in use but didn't manage to attach in * time, clean it up. */ - if (w->in_use && !w->proc && - TimestampDifferenceExceeds(w->launch_time, now, + if (w->hdr.in_use && !w->hdr.proc && + TimestampDifferenceExceeds(w->hdr.launch_time, now, wal_receiver_timeout)) { elog(WARNING, @@ -437,10 +475,10 @@ retry: /* Prepare the worker slot. 
*/ worker->type = wtype; - worker->launch_time = now; - worker->in_use = true; - worker->generation++; - worker->proc = NULL; + worker->hdr.launch_time = now; + worker->hdr.in_use = true; + worker->hdr.generation++; + worker->hdr.proc = NULL; worker->dbid = dbid; worker->userid = userid; worker->subid = subid; @@ -457,7 +495,7 @@ retry: TIMESTAMP_NOBEGIN(worker->reply_time); /* Before releasing lock, remember generation for future identification. */ - generation = worker->generation; + generation = worker->hdr.generation; LWLockRelease(LogicalRepWorkerLock); @@ -510,7 +548,7 @@ retry: { /* Failed to start worker, so clean up the worker slot. */ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); - Assert(generation == worker->generation); + Assert(generation == worker->hdr.generation); logicalrep_worker_cleanup(worker); LWLockRelease(LogicalRepWorkerLock); @@ -522,19 +560,23 @@ retry: } /* Now wait until it attaches. */ - return WaitForReplicationWorkerAttach(worker, generation, bgw_handle); + return WaitForReplicationWorkerAttach((LogicalWorkerHeader *) worker, + generation, + bgw_handle, + LogicalRepWorkerLock); } /* * Internal function to stop the worker and wait until it detaches from the - * slot. + * slot. It is used for both logical rep workers and slot-sync workers. */ static void -logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) +logicalrep_worker_stop_internal(LogicalWorkerHeader *worker, int signo, + LWLock *lock) { uint16 generation; - Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED)); + Assert(LWLockHeldByMeInMode(lock, LW_SHARED)); /* * Remember which generation was our worker so we can check if what we see @@ -550,7 +592,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) { int rc; - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); /* Wait a bit --- we don't expect to have to wait long. 
*/ rc = WaitLatch(MyLatch, @@ -564,7 +606,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) } /* Recheck worker status. */ - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + LWLockAcquire(lock, LW_SHARED); /* * Check whether the worker slot is no longer used, which would mean @@ -591,7 +633,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) if (!worker->proc || worker->generation != generation) break; - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); /* Wait a bit --- we don't expect to have to wait long. */ rc = WaitLatch(MyLatch, @@ -604,7 +646,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) CHECK_FOR_INTERRUPTS(); } - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + LWLockAcquire(lock, LW_SHARED); } } @@ -623,7 +665,9 @@ logicalrep_worker_stop(Oid subid, Oid relid) if (worker) { Assert(!isParallelApplyWorker(worker)); - logicalrep_worker_stop_internal(worker, SIGTERM); + logicalrep_worker_stop_internal((LogicalWorkerHeader *) worker, + SIGTERM, + LogicalRepWorkerLock); } LWLockRelease(LogicalRepWorkerLock); @@ -669,8 +713,10 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo) /* * Only stop the worker if the generation matches and the worker is alive. */ - if (worker->generation == generation && worker->proc) - logicalrep_worker_stop_internal(worker, SIGINT); + if (worker->hdr.generation == generation && worker->hdr.proc) + logicalrep_worker_stop_internal((LogicalWorkerHeader *) worker, + SIGINT, + LogicalRepWorkerLock); LWLockRelease(LogicalRepWorkerLock); } @@ -696,14 +742,14 @@ logicalrep_worker_wakeup(Oid subid, Oid relid) /* * Wake up (using latch) the specified logical replication worker. * - * Caller must hold lock, else worker->proc could change under us. + * Caller must hold lock, else worker->hdr.proc could change under us. 
*/ void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker) { Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - SetLatch(&worker->proc->procLatch); + SetLatch(&worker->hdr.proc->procLatch); } /* @@ -718,7 +764,7 @@ logicalrep_worker_attach(int slot) Assert(slot >= 0 && slot < max_logical_replication_workers); MyLogicalRepWorker = &LogicalRepCtx->workers[slot]; - if (!MyLogicalRepWorker->in_use) + if (!MyLogicalRepWorker->hdr.in_use) { LWLockRelease(LogicalRepWorkerLock); ereport(ERROR, @@ -727,7 +773,7 @@ logicalrep_worker_attach(int slot) slot))); } - if (MyLogicalRepWorker->proc) + if (MyLogicalRepWorker->hdr.proc) { LWLockRelease(LogicalRepWorkerLock); ereport(ERROR, @@ -736,7 +782,7 @@ logicalrep_worker_attach(int slot) "another worker, cannot attach", slot))); } - MyLogicalRepWorker->proc = MyProc; + MyLogicalRepWorker->hdr.proc = MyProc; before_shmem_exit(logicalrep_worker_onexit, (Datum) 0); LWLockRelease(LogicalRepWorkerLock); @@ -771,7 +817,9 @@ logicalrep_worker_detach(void) LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); if (isParallelApplyWorker(w)) - logicalrep_worker_stop_internal(w, SIGTERM); + logicalrep_worker_stop_internal((LogicalWorkerHeader *) w, + SIGTERM, + LogicalRepWorkerLock); } LWLockRelease(LogicalRepWorkerLock); @@ -794,10 +842,10 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker) Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE)); worker->type = WORKERTYPE_UNKNOWN; - worker->in_use = false; - worker->proc = NULL; - worker->dbid = InvalidOid; + worker->hdr.in_use = false; + worker->hdr.proc = NULL; worker->userid = InvalidOid; + worker->dbid = InvalidOid; worker->subid = InvalidOid; worker->relid = InvalidOid; worker->leader_pid = InvalidPid; @@ -931,9 +979,18 @@ ApplyLauncherRegister(void) memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + + /* + * The launcher now takes care of launching both 
logical apply workers and + * logical slot-sync workers. Thus to cater to the requirements of both, + * start it as soon as a consistent state is reached. This will help + * slot-sync workers to start timely on a physical standby while on a + * non-standby server, it holds same meaning as that of + * BgWorkerStart_RecoveryFinished. + */ + bgw.bgw_start_time = BgWorkerStart_ConsistentState; snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LauncherMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication launcher"); snprintf(bgw.bgw_type, BGW_MAXLEN, @@ -953,6 +1010,7 @@ void ApplyLauncherShmemInit(void) { bool found; + Size ssw_size; LogicalRepCtx = (LogicalRepCtxStruct *) ShmemInitStruct("Logical Replication Launcher Data", @@ -977,6 +1035,14 @@ ApplyLauncherShmemInit(void) SpinLockInit(&worker->relmutex); } } + + /* Allocate shared-memory for slot-sync workers pool now */ + ssw_size = mul_size(max_slotsync_workers, sizeof(SlotSyncWorker)); + LogicalRepCtx->ss_workers = (SlotSyncWorker *) + ShmemInitStruct("Replication slot-sync workers", ssw_size, &found); + + if (!found) + memset(LogicalRepCtx->ss_workers, 0, ssw_size); } /* @@ -1115,13 +1181,729 @@ ApplyLauncherWakeup(void) } /* - * Main loop for the apply launcher process. + * Clean up slot-sync worker info. 
+ */ +static void +slotsync_worker_cleanup(SlotSyncWorker *worker) +{ + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_EXCLUSIVE)); + + worker->hdr.in_use = false; + worker->hdr.proc = NULL; + worker->slot = -1; + + if (DsaPointerIsValid(worker->dbids_dp)) + { + dsa_free(worker->dbids_dsa, worker->dbids_dp); + worker->dbids_dp = InvalidDsaPointer; + } + + if (worker->dbids_dsa) + { + dsa_detach(worker->dbids_dsa); + worker->dbids_dsa = NULL; + } + + worker->dbcount = 0; + + worker->monitoring_info.confirmed_lsn = 0; + worker->monitoring_info.last_update_time = 0; +} + +/* + * Attach Slot-sync worker to worker-slot assigned by launcher. */ void -ApplyLauncherMain(Datum main_arg) +slotsync_worker_attach(int slot) { - ereport(DEBUG1, - (errmsg_internal("logical replication launcher started"))); + /* Block concurrent access. */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + + Assert(slot >= 0 && slot < max_slotsync_workers); + MySlotSyncWorker = &LogicalRepCtx->ss_workers[slot]; + MySlotSyncWorker->slot = slot; + + if (!MySlotSyncWorker->hdr.in_use) + { + LWLockRelease(SlotSyncWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot-sync worker slot %d is " + "empty, cannot attach", slot))); + } + + if (MySlotSyncWorker->hdr.proc) + { + LWLockRelease(SlotSyncWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot-sync worker slot %d is " + "already used by another worker, cannot attach", slot))); + } + + MySlotSyncWorker->hdr.proc = MyProc; + + LWLockRelease(SlotSyncWorkerLock); +} + +/* + * Detach the worker from DSM and update 'proc' and 'in_use'. + * Logical replication launcher will come to know using these + * that the worker has shutdown. 
+ */ +void +slotsync_worker_detach(int code, Datum arg) +{ + dsa_detach((dsa_area *) DatumGetPointer(arg)); + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + MySlotSyncWorker->hdr.in_use = false; + MySlotSyncWorker->hdr.proc = NULL; + LWLockRelease(SlotSyncWorkerLock); +} + +/* + * Slot-Sync worker find. + * + * Searches the slot-sync worker pool for the worker who manages the + * specified dbid. Because a worker can manage multiple dbs, also walk + * the db array of each worker to find the match. + * + * Returns NULL if no matching worker is found. + */ +static SlotSyncWorker * +slotsync_worker_find(Oid dbid) +{ + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + /* Search for an attached worker for a given dbid */ + for (int widx = 0; widx < max_slotsync_workers; widx++) + { + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[widx]; + Oid *dbids; + + if (!w->hdr.in_use) + continue; + + dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp); + for (int dbidx = 0; dbidx < w->dbcount; dbidx++) + { + if (dbids[dbidx] == dbid) + return w; + } + + } + + return NULL; +} + +/* + * Setup the slot-sync worker. + * + * DSA is used for the dbids array. Because the maximum number of dbs a + * worker can manage is not known, initially enough memory for + * DB_PER_WORKER_ALLOC_INIT dbs is allocated. If this size is exhausted, + * it can be extended using dsa free and allocate routines. + */ +static dsa_handle +slotsync_worker_setup(SlotSyncWorker *worker) +{ + dsa_area *dbids_dsa; + dsa_pointer dbids_dp; + dsa_handle dbids_dsa_handle; + MemoryContext oldcontext; + + /* Prepare the new worker. 
*/ + worker->hdr.launch_time = GetCurrentTimestamp(); + worker->hdr.in_use = true; + + /* + * 'proc' and 'slot' will be assigned in ReplSlotSyncWorkerMain when we + * attach this worker to a particular worker-pool slot + */ + worker->hdr.proc = NULL; + worker->slot = -1; + + /* TODO: do we really need 'generation', analyse more here */ + worker->hdr.generation++; + + /* Ensure the memory allocated by DSA routines is persistent. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + dbids_dsa = dsa_create(LWTRANCHE_SLOTSYNC_DSA); + dsa_pin(dbids_dsa); + dsa_pin_mapping(dbids_dsa); + + dbids_dp = dsa_allocate0(dbids_dsa, DB_PER_WORKER_ALLOC_INIT * sizeof(Oid)); + + /* Set-up worker */ + worker->dbcount = 0; + worker->dbids_dsa = dbids_dsa; + worker->dbids_dp = dbids_dp; + + /* Get the handle. This is the one which can be passed to worker processes */ + dbids_dsa_handle = dsa_get_handle(dbids_dsa); + + elog(DEBUG1, "allocated dsa for slot-sync worker for dbcount: %d", + DB_PER_WORKER_ALLOC_INIT); + + MemoryContextSwitchTo(oldcontext); + + return dbids_dsa_handle; +} + +/* + * Slot-sync worker launch or reuse + * + * Start new slot-sync background worker from the pool of available workers + * limited by max_slotsync_workers count. If the worker pool is exhausted, + * reuse the existing worker with minimum number of dbs. The idea is to + * always distribute the dbs equally among launched workers. + * If initially allocated dbids array is exhausted for the selected worker, + * reallocate the dbids array with increased size and copy the existing + * dbids to it and assign the new one as well. + * + * Returns true on success, false on failure. 
+ */ +static bool +slotsync_worker_launch_or_reuse(Oid dbid) +{ + BackgroundWorker bgw; + BackgroundWorkerHandle *bgw_handle; + uint16 generation; + SlotSyncWorker *worker = NULL; + int worker_slot = -1; + dsa_handle handle; + Oid *dbids; + bool attach; + uint32 mindbcnt = PG_UINT32_MAX; + + Assert(OidIsValid(dbid)); + + /* The shared memory must only be modified under lock. */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + + /* + * Find unused worker slot. If all the workers are currently in use, find + * the one with minimum number of dbs and use that. + */ + for (int widx = 0; widx < max_slotsync_workers; widx++) + { + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[widx]; + + if (!w->hdr.in_use) + { + worker = w; + worker_slot = widx; + break; + } + + if (w->dbcount < mindbcnt) + { + mindbcnt = w->dbcount; + worker = w; + worker_slot = widx; + } + } + + /* + * If worker is being reused, and there is vacancy in dbids array, just + * update dbids array and dbcount and we are done. But if dbids array is + * exhausted, reallocate dbids using dsa and copy the old dbids and assign + * the new one as well. + */ + if (worker->hdr.in_use) + { + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + if (worker->dbcount < DB_PER_WORKER_ALLOC_INIT) + { + dbids[worker->dbcount++] = dbid; + } + else + { + MemoryContext oldcontext; + uint32 alloc_count; + uint32 old_dbcnt; + Oid *old_dbids; + + /* Be sure any memory allocated by DSA routines is persistent. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + /* Remember the old dbids before we reallocate dsa. 
*/ + old_dbcnt = worker->dbcount; + old_dbids = (Oid *) palloc0(worker->dbcount * sizeof(Oid)); + memcpy(old_dbids, dbids, worker->dbcount * sizeof(Oid)); + + alloc_count = old_dbcnt + DB_PER_WORKER_ALLOC_EXTRA; + + /* Free the existing dbids and allocate new with increased size */ + if (DsaPointerIsValid(worker->dbids_dp)) + dsa_free(worker->dbids_dsa, worker->dbids_dp); + + worker->dbids_dp = dsa_allocate0(worker->dbids_dsa, + alloc_count * sizeof(Oid)); + + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + /* Copy the existing dbids */ + worker->dbcount = old_dbcnt; + memcpy(dbids, old_dbids, old_dbcnt * sizeof(Oid)); + pfree(old_dbids); + + /* Assign new dbid */ + dbids[worker->dbcount++] = dbid; + + MemoryContextSwitchTo(oldcontext); + } + + LWLockRelease(SlotSyncWorkerLock); + + ereport(LOG, + (errmsg("added database %d to replication slot-sync " + "worker %d; dbcount now: %d", + dbid, worker_slot, worker->dbcount))); + return true; + } + + /* + * Initialise the worker and setup DSA for dbids array to hold + * DB_PER_WORKER_ALLOC_INIT dbs + */ + handle = slotsync_worker_setup(worker); + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + dbids[worker->dbcount++] = dbid; + + /* Before releasing lock, remember generation for future identification. */ + generation = worker->hdr.generation; + + LWLockRelease(SlotSyncWorkerLock); + + /* Register the new dynamic worker. 
*/ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_ConsistentState; + snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); + + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncWorkerMain"); + + Assert(worker_slot >= 0); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "replication slot-sync worker %d", worker_slot); + + snprintf(bgw.bgw_type, BGW_MAXLEN, "slot-sync worker"); + + bgw.bgw_restart_time = BGW_NEVER_RESTART; + bgw.bgw_notify_pid = MyProcPid; + bgw.bgw_main_arg = Int32GetDatum(worker_slot); + + memcpy(bgw.bgw_extra, &handle, sizeof(dsa_handle)); + + if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) + { + /* Failed to start worker, so clean up the worker slot. */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + Assert(generation == worker->hdr.generation); + slotsync_worker_cleanup(worker); + LWLockRelease(SlotSyncWorkerLock); + + ereport(WARNING, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of background worker slots"), + errhint("You might need to increase %s.", "max_worker_processes"))); + return false; + } + + /* Now wait until it attaches. */ + attach = WaitForReplicationWorkerAttach((LogicalWorkerHeader *) worker, + generation, + bgw_handle, + SlotSyncWorkerLock); + + /* + * If attach is done, log that the worker is managing dbid, else raise a + * warning + */ + if (attach) + ereport(LOG, + (errmsg("added database %d to replication slot-sync " + "worker %d; dbcount now: %d", + dbid, worker_slot, worker->dbcount))); + else + ereport(WARNING, + (errmsg("replication slot-sync worker failed to attach to " + "worker-pool slot %d", worker_slot))); + + return attach; +} + +/* + * Internal function to stop the slot-sync worker and cleanup afterwards. 
+ */
+static void
+slotsync_worker_stop_internal(SlotSyncWorker *worker)
+{
+	int			slot = worker->slot;
+
+	LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+	ereport(LOG,
+			(errmsg("stopping replication slot-sync worker %d",
+					slot)));
+	logicalrep_worker_stop_internal((LogicalWorkerHeader *) worker,
+									SIGINT,
+									SlotSyncWorkerLock);
+	LWLockRelease(SlotSyncWorkerLock);
+
+	/* Cleanup of the shared worker entry requires the exclusive lock. */
+	LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
+	slotsync_worker_cleanup(worker);
+	LWLockRelease(SlotSyncWorkerLock);
+}
+
+/*
+ * Stop all the slot-sync workers in use.
+ */
+static void
+slotsync_workers_stop(void)
+{
+	for (int widx = 0; widx < max_slotsync_workers; widx++)
+	{
+		SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];
+
+		if (worker && worker->hdr.in_use)
+			slotsync_worker_stop_internal(worker);
+	}
+}
+
+/*
+ * Slot-sync workers remove obsolete DBs from db-list
+ *
+ * If the DBIds fetched from the primary server are fewer than the ones being
+ * managed by slot-sync workers, remove extra dbs from worker's db-list. This
+ * may happen if some logical failover slots are removed on the primary server
+ * or are disabled for failover.
+ */
+static void
+slotsync_remove_obsolete_dbs(List *remote_dbs)
+{
+	LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
+
+	/* Traverse slot-sync-workers to validate the DBs */
+	for (int widx = 0; widx < max_slotsync_workers; widx++)
+	{
+		SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];
+		Oid		   *dbids;
+
+		if (!worker->hdr.in_use)
+			continue;
+
+		dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp);
+
+		/* dbidx is only advanced when the current entry is retained */
+		for (int dbidx = 0; dbidx < worker->dbcount;)
+		{
+			Oid			wdbid = dbids[dbidx];
+			bool		found = false;
+			ListCell   *lc;
+
+			/* Check if current DB is still present in remote-db-list */
+			foreach(lc, remote_dbs)
+			{
+				WalRcvFailoverSlotsData *failover_slot_data = lfirst(lc);
+
+				if (failover_slot_data->dboid == wdbid)
+				{
+					found = true;
+					dbidx++;
+					break;
+				}
+			}
+
+			/* If not found, then delete this db from worker's db-list */
+			if (!found)
+			{
+				if (dbidx < (worker->dbcount - 1))
+				{
+					/* Shift the DBs and get rid of wdbid */
+					memmove(&dbids[dbidx], &dbids[dbidx + 1],
+							(worker->dbcount - dbidx - 1) * sizeof(Oid));
+				}
+
+				worker->dbcount--;
+
+				/* Oid and dbcount are unsigned; print with %u, not %d. */
+				ereport(LOG,
+						(errmsg("removed database %u from replication slot-sync "
+								"worker %d; dbcount now: %u",
+								wdbid, worker->slot, worker->dbcount)));
+			}
+		}
+	}
+
+	LWLockRelease(SlotSyncWorkerLock);
+
+	/*
+	 * If dbcount for any worker has become 0, shut it down.
+	 *
+	 * XXX: if needed in future, workers can be restarted in such a case to
+	 * distribute the load.
+	 */
+	for (int widx = 0; widx < max_slotsync_workers; widx++)
+	{
+		SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];
+
+		if (worker->hdr.in_use && !worker->dbcount)
+			slotsync_worker_stop_internal(worker);
+	}
+
+	/*
+	 * TODO: Take care of removal of old 'synced' slots for the dbs which
+	 * are no longer eligible for slot-sync.
+	 */
+}
+
+/*
+ * Connect to the primary server for slotsync purpose and return the connection
+ * info.
+ */
+static WalReceiverConn *
+slotsync_remote_connect(long *wait_time)
+{
+	WalReceiverConn *wrconn;
+	char	   *err;
+	char	   *dbname;
+
+	if (!enable_syncslot)
+		return NULL;
+
+	if (max_slotsync_workers == 0)
+		return NULL;
+
+	/*
+	 * Since the above two GUCs are set, check that other GUC settings
+	 * (primary_slot_name, hot_standby_feedback, primary_conninfo)
+	 * are compatible with slot synchronization. If not, issue warnings.
+	 *
+	 * Per the error message style guide, primary messages carry no trailing
+	 * period.
+	 */
+
+	/* The primary_slot_name is not set */
+	if (!WalRcv || WalRcv->slotname[0] == '\0')
+	{
+		ereport(WARNING,
+				errmsg("skipping slots synchronization as primary_slot_name "
+					   "is not set"));
+
+		/*
+		 * It's possible that the Walreceiver has not been started yet, adjust
+		 * the wait_time to retry sooner in the next synchronization cycle.
+		 */
+		*wait_time = wal_retrieve_retry_interval;
+		return NULL;
+	}
+
+	/* The hot_standby_feedback must be ON for slot-sync to work */
+	if (!hot_standby_feedback)
+	{
+		ereport(WARNING,
+				errmsg("skipping slots synchronization as hot_standby_feedback "
+					   "is off"));
+		return NULL;
+	}
+
+	/* The dbname must be specified in primary_conninfo for slot-sync to work */
+	dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
+	if (dbname == NULL)
+	{
+		ereport(WARNING,
+				errmsg("skipping slots synchronization as dbname is not "
+					   "specified in primary_conninfo"));
+		return NULL;
+	}
+
+	wrconn = walrcv_connect(PrimaryConnInfo, false, false,
+							"Logical Replication Launcher", &err);
+	if (!wrconn)
+		ereport(ERROR,
+				(errmsg("could not connect to the primary server: %s", err)));
+
+	return wrconn;
+}
+
+/*
+ * Save current slot-sync configurations.
+ *
+ * This function is invoked prior to each config-reload on receiving SIGHUP.
+ */
+static void
+save_current_slotsync_configs(void)
+{
+	/* Free the previous allocations */
+	if (PrimaryConnInfoPreReload)
+		pfree(PrimaryConnInfoPreReload);
+
+	if (PrimarySlotNamePreReload)
+		pfree(PrimarySlotNamePreReload);
+
+	PrimaryConnInfoPreReload = pstrdup(PrimaryConnInfo);
+
+	/*
+	 * The walreceiver shared state may not be attached yet (see the !WalRcv
+	 * check in slotsync_remote_connect); treat a missing slotname as empty.
+	 */
+	PrimarySlotNamePreReload = pstrdup(WalRcv ? WalRcv->slotname : "");
+	EnableSyncSlotPreReload = enable_syncslot;
+	HotStandbyFeedbackPreReload = hot_standby_feedback;
+}
+
+/*
+ * Returns true if any of the slot-sync configurations changed.
+ */
+static bool
+slotsync_configs_changed(void)
+{
+	return
+		(EnableSyncSlotPreReload != enable_syncslot) ||
+		(HotStandbyFeedbackPreReload != hot_standby_feedback) ||
+		(strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0) ||
+		(strcmp(PrimarySlotNamePreReload,
+				WalRcv ? WalRcv->slotname : "") != 0);
+}
+
+/*
+ * Launch slot-sync background workers.
+ *
+ * Connect to the primary, to get the list of DBIDs for failover logical slots.
+ * Then launch slot-sync workers (limited by max_slotsync_workers) where the DBs
+ * are distributed equally among those workers.
+ */
+static void
+LaunchSlotSyncWorkers(long *wait_time, WalReceiverConn *wrconn)
+{
+	List	   *slots_dbs;
+	ListCell   *lc;
+	MemoryContext tmpctx;
+	MemoryContext oldctx;
+
+	Assert(wrconn);
+
+	/* Use temporary context for the slot list and worker info.
*/ + tmpctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher slot-sync ctx", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(tmpctx); + + slots_dbs = walrcv_get_dbinfo_for_failover_slots(wrconn); + + slotsync_remove_obsolete_dbs(slots_dbs); + + foreach(lc, slots_dbs) + { + WalRcvFailoverSlotsData *slot_data = lfirst(lc); + SlotSyncWorker *w; + + Assert(OidIsValid(slot_data->dboid)); + + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + w = slotsync_worker_find(slot_data->dboid); + LWLockRelease(SlotSyncWorkerLock); + + if (w != NULL) + continue; /* worker is running already */ + + /* + * If launch failed, adjust the wait_time to retry in the + * next sync-cycle sooner. + */ + if (!slotsync_worker_launch_or_reuse(slot_data->dboid)) + { + *wait_time = Min(*wait_time, wal_retrieve_retry_interval); + break; + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(tmpctx); +} + +/* + * Launch logical replication apply workers for enabled subscriptions. + */ +static void +LaunchSubscriptionApplyWorker(long *wait_time) +{ + List *sublist; + ListCell *lc; + MemoryContext subctx; + MemoryContext oldctx; + + /* Use temporary context to avoid leaking memory across cycles. */ + subctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher sublist", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); + + /* Start any missing workers for enabled subscriptions. 
*/ + sublist = get_subscription_list(); + foreach(lc, sublist) + { + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; + TimestampTz last_start; + TimestampTz now; + long elapsed; + + if (!sub->enabled) + continue; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); + + if (w != NULL) + continue; /* worker is running already */ + + /* + * If the worker is eligible to start now, launch it. Otherwise, + * adjust wait_time so that we'll wake up as soon as it can be + * started. + * + * Each subscription's apply worker can only be restarted once per + * wal_retrieve_retry_interval, so that errors do not cause us to + * repeatedly restart the worker as fast as possible. In cases where + * a restart is expected (e.g., subscription parameter changes), + * another process should remove the last-start entry for the + * subscription so that the worker can be restarted without waiting + * for wal_retrieve_retry_interval to elapse. + */ + last_start = ApplyLauncherGetWorkerStartTime(sub->oid); + now = GetCurrentTimestamp(); + if (last_start == 0 || + (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) + { + ApplyLauncherSetWorkerStartTime(sub->oid, now); + logicalrep_worker_launch(WORKERTYPE_APPLY, + sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid, + DSM_HANDLE_INVALID); + } + else + { + *wait_time = Min(*wait_time, + wal_retrieve_retry_interval - elapsed); + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); +} + +/* + * Main loop for the launcher process. 
+ */ +void +LauncherMain(Datum main_arg) +{ + WalReceiverConn *wrconn = NULL; + + elog(DEBUG1, "logical replication launcher started"); before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0); @@ -1139,79 +1921,23 @@ ApplyLauncherMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(NULL, NULL, 0); + load_file("libpqwalreceiver", false); + /* Enter main loop */ for (;;) { int rc; - List *sublist; - ListCell *lc; - MemoryContext subctx; - MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; CHECK_FOR_INTERRUPTS(); - /* Use temporary context to avoid leaking memory across cycles. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_SIZES); - oldctx = MemoryContextSwitchTo(subctx); - - /* Start any missing workers for enabled subscriptions. */ - sublist = get_subscription_list(); - foreach(lc, sublist) - { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; - TimestampTz last_start; - TimestampTz now; - long elapsed; - - if (!sub->enabled) - continue; - - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); - - if (w != NULL) - continue; /* worker is running already */ - - /* - * If the worker is eligible to start now, launch it. Otherwise, - * adjust wait_time so that we'll wake up as soon as it can be - * started. - * - * Each subscription's apply worker can only be restarted once per - * wal_retrieve_retry_interval, so that errors do not cause us to - * repeatedly restart the worker as fast as possible. In cases - * where a restart is expected (e.g., subscription parameter - * changes), another process should remove the last-start entry - * for the subscription so that the worker can be restarted - * without waiting for wal_retrieve_retry_interval to elapse. 
- */ - last_start = ApplyLauncherGetWorkerStartTime(sub->oid); - now = GetCurrentTimestamp(); - if (last_start == 0 || - (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) - { - ApplyLauncherSetWorkerStartTime(sub->oid, now); - logicalrep_worker_launch(WORKERTYPE_APPLY, - sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid, - DSM_HANDLE_INVALID); - } - else - { - wait_time = Min(wait_time, - wal_retrieve_retry_interval - elapsed); - } - } + if (!RecoveryInProgress()) + LaunchSubscriptionApplyWorker(&wait_time); + else if (wrconn == NULL) + wrconn = slotsync_remote_connect(&wait_time); - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); + if (wrconn) + LaunchSlotSyncWorkers(&wait_time, wrconn); /* Wait for more work. */ rc = WaitLatch(MyLatch, @@ -1227,8 +1953,26 @@ ApplyLauncherMain(Datum main_arg) if (ConfigReloadPending) { + save_current_slotsync_configs(); + ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + + /* + * If any of the related GUCs changed, stop the slot-sync workers, + * revalidate the new configurations and reconnect. The workers + * will be relaunched in next sync-cycle using the new GUCs. 
+ */ + if (slotsync_configs_changed()) + { + slotsync_workers_stop(); + + if (wrconn) + { + walrcv_disconnect(wrconn); + wrconn = NULL; + } + } } } @@ -1260,7 +2004,8 @@ GetLeaderApplyWorkerPid(pid_t pid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid) + if (isParallelApplyWorker(w) && w->hdr.proc && + pid == w->hdr.proc->pid) { leader_pid = w->leader_pid; break; @@ -1298,13 +2043,13 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) memcpy(&worker, &LogicalRepCtx->workers[i], sizeof(LogicalRepWorker)); - if (!worker.proc || !IsBackendPid(worker.proc->pid)) + if (!worker.hdr.proc || !IsBackendPid(worker.hdr.proc->pid)) continue; if (OidIsValid(subid) && worker.subid != subid) continue; - worker_pid = worker.proc->pid; + worker_pid = worker.hdr.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); if (isTablesyncWorker(&worker)) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 8288da5277..90b33b8973 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -524,6 +524,18 @@ CreateDecodingContext(XLogRecPtr start_lsn, errmsg("replication slot \"%s\" was not created in this database", NameStr(slot->data.name)))); + /* + * Do not allow consumption of a "synchronized" slot until the standby + * gets promoted. + */ + if (RecoveryInProgress() && slot->data.synced) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use replication slot \"%s\" for logical decoding", + NameStr(slot->data.name)), + errdetail("This slot is being synced from the primary server."), + errhint("Specify another replication slot."))); + /* * Check if slot has been invalidated due to max_slot_wal_keep_size. 
Avoid * "cannot get changes" wording in this errmsg because that'd be diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index d48cd4c590..9e52ec421f 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'slotsync.c', 'snapbuild.c', 'tablesync.c', 'worker.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..ced162012d --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,1042 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * PostgreSQL worker for synchronizing slots to a standby server from the + * primary server. + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + * This file contains the code for slot-sync workers on a physical standby + * to fetch logical failover slots information from the primary server, + * create the slots on the standby and synchronize them periodically. + * + * It also takes care of dropping the slots which were created by it and are + * currently not needed to be synchronized. + * + * It takes a nap of WORKER_DEFAULT_NAPTIME_MS before every next + * synchronization. If there is no activity observed on the primary server for + * some time, the nap time is increased to WORKER_INACTIVITY_NAPTIME_MS, but if + * any activity is observed, the nap time reverts to the default value. 
+ *--------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "commands/dbcommands.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "replication/logical.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/pg_lsn.h" +#include "utils/varlena.h" + +/* + * Structure to hold information fetched from the primary server about a logical + * replication slot. + */ +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + bool conflicting; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; + + /* RS_INVAL_NONE if valid, or the reason of invalidation */ + ReplicationSlotInvalidationCause invalidated; +} RemoteSlot; + +/* Worker's nap time in case of regular activity on the primary server */ +#define WORKER_DEFAULT_NAPTIME_MS 10L /* 10 ms */ + +/* Worker's nap time in case of no-activity on the primary server */ +#define WORKER_INACTIVITY_NAPTIME_MS 10000L /* 10 sec */ + +/* + * Inactivity Threshold in ms before increasing nap time of worker. + * + * If the lsn of slot being monitored did not change for this threshold time, + * then increase nap time of current worker from WORKER_DEFAULT_NAPTIME_MS to + * WORKER_INACTIVITY_NAPTIME_MS. + */ +#define WORKER_INACTIVITY_THRESHOLD_MS 10000L /* 10 sec */ + +/* The variable to store primary_conninfo GUC before each ConfigReload */ +static char *PrimaryConnInfoPreReload = NULL; + +/* + * The variable to indicate the number of attempts for + * wait_for_primary_slot_catchup() after which it aborts the wait and + * the slot-sync worker then moves to the next slot creation. 
+ * + * 0 indicates wait until the primary server catches up + */ +static int PrimaryCatchupWaitAttempt = 0; + +/* + * Wait for remote slot to pass locally reserved position. + */ +static bool +wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot, + List **pending_slot_list) + +{ +#define WAIT_OUTPUT_COLUMN_COUNT 4 + StringInfoData cmd; + int wait_count = 0; + + ereport(LOG, + errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin" + " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT conflicting, restart_lsn, confirmed_flush_lsn," + " catalog_xmin FROM pg_catalog.pg_replication_slots" + " WHERE slot_name = %s", + quote_literal_cstr(remote_slot->name)); + + for (;;) + { + XLogRecPtr new_invalidated; + XLogRecPtr new_restart_lsn; + XLogRecPtr new_confirmed_lsn; + TransactionId new_catalog_xmin; + WalRcvExecResult *res; + TupleTableSlot *slot; + int rc; + bool isnull; + Oid slotRow[WAIT_OUTPUT_COLUMN_COUNT] = {BOOLOID, LSNOID, LSNOID, + XIDOID}; + + CHECK_FOR_INTERRUPTS(); + + /* Check if this standby is promoted while we are waiting */ + if (!RecoveryInProgress()) + { + /* + * The remote slot didn't pass the locally reserved position at + * the time of local promotion, so it's not safe to use. 
+ */ + ereport( + WARNING, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg( + "slot-sync wait for slot %s interrupted by promotion, " + "slot creation aborted", remote_slot->name))); + pfree(cmd.data); + return false; + } + + res = walrcv_exec(wrconn, cmd.data, WAIT_OUTPUT_COLUMN_COUNT, slotRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch slot info for slot \"%s\" from" + " the primary server: %s", remote_slot->name, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + ereport(WARNING, + (errmsg("slot \"%s\" disappeared from the primary server," + " slot creation aborted", remote_slot->name))); + pfree(cmd.data); + walrcv_clear_result(res); + return false; + } + + + /* + * It is possible to get null values for lsns and xmin if slot is + * invalidated on the primary server, so handle accordingly. + */ + new_invalidated = DatumGetBool(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + new_restart_lsn = DatumGetLSN(slot_getattr(slot, 2, &isnull)); + if (new_invalidated || isnull) + { + ereport(WARNING, + (errmsg("slot \"%s\" invalidated on the primary server," + " slot creation aborted", remote_slot->name))); + pfree(cmd.data); + ExecClearTuple(slot); + walrcv_clear_result(res); + return false; + } + + /* + * Once we got valid restart_lsn, then confirmed_lsn and catalog_xmin + * are expected to be valid/non-null, so assert if found null. 
+ */ + new_confirmed_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + new_catalog_xmin = DatumGetTransactionId(slot_getattr(slot, + 4, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + walrcv_clear_result(res); + + if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn && + TransactionIdFollowsOrEquals(new_catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + /* Update new values in remote_slot */ + remote_slot->restart_lsn = new_restart_lsn; + remote_slot->confirmed_lsn = new_confirmed_lsn; + remote_slot->catalog_xmin = new_catalog_xmin; + + ereport(LOG, + errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)" + " and catalog xmin (%u) has now passed local slot LSN" + " (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(new_restart_lsn), + new_catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + pfree(cmd.data); + + return true; + } + + if (PrimaryCatchupWaitAttempt && + ++wait_count >= PrimaryCatchupWaitAttempt) + { + if (pending_slot_list) + *pending_slot_list = lappend(*pending_slot_list, remote_slot); + + ereport(LOG, + errmsg("aborting the wait for remote slot \"%s\" and moving" + " to the next slot, will attempt creating it again.", + remote_slot->name)); + pfree(cmd.data); + return false; + } + + /* + * XXX: Is waiting for 2 seconds before retrying enough or more or + * less? 
+ */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 2000L, + WAIT_EVENT_REPL_SLOTSYNC_PRIMARY_CATCHUP); + + ResetLatch(MyLatch); + + /* Emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } +} + +/* + * Update local slot metadata as per remote_slot's positions + */ +static void +local_slot_update(RemoteSlot *remote_slot) +{ + LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); + LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, + remote_slot->catalog_xmin); + LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, + remote_slot->restart_lsn); + MyReplicationSlot->data.invalidated = remote_slot->invalidated; + ReplicationSlotMarkDirty(); +} + +/* + * Get list of local logical slot names which are synchronized from + * the primary server and belongs to one of the DBs passed in. + */ +static List * +get_local_synced_slot_names(Oid *dbids) +{ + List *localSyncedSlots = NIL; + + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + /* Check if it is logical synchronized slot */ + if (s->in_use && SlotIsLogical(s) && s->data.synced) + { + for (int j = 0; j < MySlotSyncWorker->dbcount; j++) + { + /* + * Add it to output list if this belongs to one of the + * worker's dbs. + */ + if (s->data.database == dbids[j]) + { + localSyncedSlots = lappend(localSyncedSlots, s); + break; + } + } + } + } + + LWLockRelease(ReplicationSlotControlLock); + + return localSyncedSlots; +} + +/* + * Helper function to check if local_slot is present in remote_slots list. + * + * It also checks if logical slot is locally invalidated i.e. invalidated on + * the standby but valid on the primary server. If found so, it sets + * locally_invalidated to true. 
+ */ +static bool +slot_exists_in_list(ReplicationSlot *local_slot, List *remote_slots, + bool *locally_invalidated) +{ + ListCell *cell; + + foreach(cell, remote_slots) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell); + + if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0) + { + /* + * if remote slot is marked as non-conflicting (i.e. not + * invalidated) but local slot is marked as invalidated, then set + * the bool. + */ + *locally_invalidated = + !remote_slot->conflicting && + (local_slot->data.invalidated != RS_INVAL_NONE); + + return true; + } + } + + return false; +} + +/* + * Compute nap time for MySlotSyncWorker. + * + * The slot-sync worker takes a nap before it again checks for slots on the + * primary server. The time for each nap is computed here. + * + * The first slot managed by each worker is chosen for monitoring purpose. + * If the lsn of that slot changes during each sync-check time, then the + * nap time is kept at the regular value of WORKER_DEFAULT_NAPTIME_MS. + * When no lsn change is observed within the threshold period + * WORKER_INACTIVITY_THRESHOLD_MS, then the nap time is increased + * to WORKER_INACTIVITY_NAPTIME_MS. + * This nap time is brought back to WORKER_DEFAULT_NAPTIME_MS as soon as + * another lsn change is observed. + */ +static void +compute_naptime(RemoteSlot *remote_slot, long *naptime) +{ + TimestampTz now = GetCurrentTimestamp(); + + Assert(*naptime == WORKER_DEFAULT_NAPTIME_MS || *naptime == + WORKER_INACTIVITY_NAPTIME_MS); + + if (MySlotSyncWorker->monitoring_info.confirmed_lsn != + remote_slot->confirmed_lsn) + { + MySlotSyncWorker->monitoring_info.last_update_time = now; + MySlotSyncWorker->monitoring_info.confirmed_lsn = remote_slot->confirmed_lsn; + + /* Something changed; reset naptime to default. */ + *naptime = WORKER_DEFAULT_NAPTIME_MS; + } + else + { + if (*naptime == WORKER_DEFAULT_NAPTIME_MS) + { + /* + * If the inactivity time reaches the threshold, increase nap + * time. 
+ */ + if (TimestampDifferenceExceeds(MySlotSyncWorker->monitoring_info.last_update_time, + now, WORKER_INACTIVITY_THRESHOLD_MS)) + *naptime = WORKER_INACTIVITY_NAPTIME_MS; + } + } +} + +/* + * This gets invalidation cause of the remote slot. + */ +static ReplicationSlotInvalidationCause +get_remote_invalidation_cause(WalReceiverConn *wrconn, char *slot_name) +{ + WalRcvExecResult *res; + Oid slotRow[1] = {INT2OID}; + StringInfoData cmd; + bool isnull; + TupleTableSlot *slot; + ReplicationSlotInvalidationCause cause; + MemoryContext oldctx = CurrentMemoryContext; + + /* Syscache access needs a transaction env. */ + StartTransactionCommand(); + + /* Make things live outside TX context */ + MemoryContextSwitchTo(oldctx); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT pg_get_slot_invalidation_cause(%s)", + quote_literal_cstr(slot_name)); + res = walrcv_exec(wrconn, cmd.data, 1, slotRow); + pfree(cmd.data); + + CommitTransactionCommand(); + + /* Switch to oldctx we saved */ + MemoryContextSwitchTo(oldctx); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch invalidation cause for slot \"%s\" from" + " the primary server: %s", slot_name, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + ereport(ERROR, + (errmsg("slot \"%s\" disappeared from the primary server", + slot_name))); + + cause = DatumGetInt16(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + walrcv_clear_result(res); + + return cause; +} + +/* + * Drop obsolete slots + * + * Drop the slots that no longer need to be synced i.e. these either do not + * exist on the primary or are no longer enabled for failover. + * + * Also drop the slots that are valid on the primary that got invalidated + * on the standby due to conflict (say required rows removed on the primary). 
+ * The assumption is, that these will get recreated in next sync-cycle and + * it is okay to drop and recreate such slots as long as these are not + * consumable on the standby (which is the case currently). + */ +static void +drop_obsolete_slots(Oid *dbids, List *remote_slot_list) +{ + List *local_slot_list = NIL; + ListCell *lc_slot; + + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + /* + * Get the list of local slots for dbids managed by this worker, so that + * those not on remote could be dropped. + */ + local_slot_list = get_local_synced_slot_names(dbids); + + foreach(lc_slot, local_slot_list) + { + ReplicationSlot *local_slot = (ReplicationSlot *) lfirst(lc_slot); + bool local_exists = false; + bool locally_invalidated = false; + + local_exists = slot_exists_in_list(local_slot, remote_slot_list, + &locally_invalidated); + + /* + * Drop the local slot either if it is not in the remote slots list or + * is invalidated while remote slot is still valid. + */ + if (!local_exists || locally_invalidated) + { + ReplicationSlotDrop(NameStr(local_slot->data.name), true, false); + + ereport(LOG, + (errmsg("dropped replication slot \"%s\" ", + NameStr(local_slot->data.name)))); + } + } +} + +/* + * Construct Slot Query + * + * It constructs the query using dbids array in order to get failover + * logical slots information from the primary server. 
+ */ +static void +construct_slot_query(StringInfo s, Oid *dbids) +{ + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + appendStringInfo(s, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase, conflicting, " + " database FROM pg_catalog.pg_replication_slots" + " WHERE failover and database IN "); + + appendStringInfoChar(s, '('); + for (int i = 0; i < MySlotSyncWorker->dbcount; i++) + { + char *dbname; + + if (i != 0) + appendStringInfoChar(s, ','); + + dbname = get_database_name(dbids[i]); + appendStringInfo(s, "%s", + quote_literal_cstr(dbname)); + pfree(dbname); + } + appendStringInfoChar(s, ')'); +} + +/* + * Synchronize single slot to given position. + * + * This creates a new slot if there is no existing one and updates the + * metadata of the slot as per the data received from the primary server. + */ +static void +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot, + List **pending_slot_list) +{ + bool found = false; + MemoryContext oldctx = CurrentMemoryContext; + + /* Good to check again if the standby is promoted */ + if (!RecoveryInProgress()) + { + ereport( + WARNING, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("slot-sync for slot \"%s\" interrupted by promotion, " + "sync not possible", remote_slot->name))); + return; + } + + /* + * Make sure that concerned WAL is received before syncing slot to target + * lsn received from the primary server. + * + * This check should never pass as on the primary server, we have waited + * for the standby's confirmation before updating the logical slot. But to + * take care of any bug in that flow, we should retain this check. 
+ */ + if (remote_slot->confirmed_lsn > WalRcv->latestWalEnd) + { + elog(LOG, "skipping sync of slot \"%s\" as the received slot-sync " + "LSN %X/%X is ahead of the standby position %X/%X", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + LSN_FORMAT_ARGS(WalRcv->latestWalEnd)); + + return; + } + + /* Search for the named slot */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots && !found; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use) + found = (strcmp(NameStr(s->data.name), remote_slot->name) == 0); + } + LWLockRelease(ReplicationSlotControlLock); + + StartTransactionCommand(); + + /* Make things live outside TX context */ + MemoryContextSwitchTo(oldctx); + + /* Already existing slot, acquire */ + if (found) + { + ReplicationSlotAcquire(remote_slot->name, true); + + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn) + { + ereport(WARNING, + errmsg("not synchronizing slot %s; synchronization would move" + " it backwards", remote_slot->name)); + + ReplicationSlotRelease(); + CommitTransactionCommand(); + return; + } + + /* Update lsns of slot to remote slot's current position */ + local_slot_update(remote_slot); + ReplicationSlotSave(); + } + /* Otherwise create the slot first. 
*/ + else + { + TransactionId xmin_horizon = InvalidTransactionId; + ReplicationSlot *slot; + + ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL, + remote_slot->two_phase, false); + slot = MyReplicationSlot; + + SpinLockAcquire(&slot->mutex); + slot->data.database = get_database_oid(remote_slot->database, false); + slot->data.failover = true; + slot->data.synced = true; + namestrcpy(&slot->data.plugin, remote_slot->plugin); + SpinLockRelease(&slot->mutex); + + ReplicationSlotReserveWal(); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + /* + * If the local restart_lsn and/or local catalog_xmin is ahead of + * those on the remote then we cannot create the local slot in sync + * with the primary server because that would mean moving the local slot + * backwards and we might not have WALs retained for old lsns. In this + * case we will wait for the primary server's restart_lsn and + * catalog_xmin to catch up with the local one before attempting the + * sync. 
+ */ + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn || + TransactionIdPrecedes(remote_slot->catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + if (!wait_for_primary_slot_catchup(wrconn, remote_slot, + pending_slot_list)) + { + /* + * The remote slot didn't catch up to locally reserved + * position + */ + ReplicationSlotRelease(); + CommitTransactionCommand(); + return; + } + + } + + /* Update lsns of slot to remote slot's current position */ + local_slot_update(remote_slot); + ReplicationSlotPersist(); + + ereport(LOG, errmsg("created slot \"%s\" locally", remote_slot->name)); + } + + ReplicationSlotRelease(); + CommitTransactionCommand(); + + /* Switch to oldctx we saved */ + MemoryContextSwitchTo(oldctx); + + return; +} + +/* + * Synchronize slots. + * + * Gets the failover logical slots info from the primary server for the dbids + * managed by this worker and then updates the slots locally as per the info + * received. Creates the slots if not present on the standby. + * + * Returns nap time for the next sync-cycle. + */ +static long +synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) +{ +#define SLOTSYNC_COLUMN_COUNT 8 + Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, + LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID}; + + WalRcvExecResult *res; + TupleTableSlot *slot; + StringInfoData s; + List *remote_slot_list = NIL; + List *pending_slot_list = NIL; + MemoryContext oldctx = CurrentMemoryContext; + long naptime = WORKER_DEFAULT_NAPTIME_MS; + Oid *dbids; + int count = 0; + ListCell *cell; + + /* The primary_slot_name is not set yet or WALs not received yet */ + if (!WalRcv || + (WalRcv->slotname[0] == '\0') || + XLogRecPtrIsInvalid(WalRcv->latestWalEnd)) + return naptime; + + /* + * No more writes to dbcount and dbids by launcher after this until we + * release this lock. + */ + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + + /* + * Check dbcount before starting to sync. 
There is a possibility that + * dbids managed by this worker are no longer valid due to change in + * logical failover slots on the primary server. In that case, launcher + * will make dbcount=0 and will send SIGINT to shutdown this worker. + * Thus check dbcount before we proceed further. + */ + if (!MySlotSyncWorker->dbcount) + { + /* Return and handle the interrupts in main loop */ + return false; + } + + /* Get dbids from dsa */ + dbids = (Oid *) dsa_get_address(dsa, MySlotSyncWorker->dbids_dp); + + /* The syscache access needs a transaction env. */ + StartTransactionCommand(); + + /* Make things live outside TX context */ + MemoryContextSwitchTo(oldctx); + + /* Construct query to get slots info from the primary server */ + initStringInfo(&s); + construct_slot_query(&s, dbids); + + elog(DEBUG2, "slot-sync worker %d's query:%s \n", MySlotSyncWorker->slot, + s.data); + + /* Execute the query */ + res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow); + pfree(s.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch failover logical slots info from the primary server: %s", + res->err))); + + CommitTransactionCommand(); + + /* Switch to oldctx we saved */ + MemoryContextSwitchTo(oldctx); + + /* Construct the remote_slot tuple and synchronize each slot locally */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot)); + + remote_slot->name = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + remote_slot->plugin = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + /* + * It is possible to get null values for lsns and xmin if slot is + * invalidated on the primary server, so handle accordingly. 
+ */ + remote_slot->confirmed_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull)); + if (isnull) + remote_slot->confirmed_lsn = InvalidXLogRecPtr; + + remote_slot->restart_lsn = DatumGetLSN(slot_getattr(slot, 4, &isnull)); + if (isnull) + remote_slot->restart_lsn = InvalidXLogRecPtr; + + remote_slot->catalog_xmin = DatumGetTransactionId(slot_getattr(slot, + 5, &isnull)); + if (isnull) + remote_slot->catalog_xmin = InvalidTransactionId; + + remote_slot->two_phase = DatumGetBool(slot_getattr(slot, 6, &isnull)); + Assert(!isnull); + + remote_slot->conflicting = DatumGetBool(slot_getattr(slot, 7, &isnull)); + Assert(!isnull); + + remote_slot->database = TextDatumGetCString(slot_getattr(slot, + 8, &isnull)); + Assert(!isnull); + + if (remote_slot->conflicting) + remote_slot->invalidated = get_remote_invalidation_cause(wrconn, + remote_slot->name); + else + { + remote_slot->invalidated = RS_INVAL_NONE; + count++; + } + + /* Create list of remote slots */ + remote_slot_list = lappend(remote_slot_list, remote_slot); + + /* + * Update naptime as required depending on slot activity. Check only + * for the first slot, if one slot has activity then all slots will. + */ + if (count == 1) + compute_naptime(remote_slot, &naptime); + + ExecClearTuple(slot); + } + + /* Now sync the slots locally */ + foreach(cell, remote_slot_list) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell); + + /* + * Ping and wait for the primary server for these many times during a + * slot creation, if it still does not catch up, abort the wait and move + * to the next slot. + */ + PrimaryCatchupWaitAttempt = 5; + + /* + * The slots for which the primary server failed to catchup after trying + * for 'PrimaryCatchupWaitAttempt' attempts, will be added to the + * pending_slot_list by wait_for_primary_slot_catchup(). + */ + synchronize_one_slot(wrconn, remote_slot, &pending_slot_list); + } + + /* + * Now sync the pending slots which were failed to be created in first + * attempt. 
+ */ + foreach(cell, pending_slot_list) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell); + + /* Wait until the primary server catches up */ + PrimaryCatchupWaitAttempt = 0; + + synchronize_one_slot(wrconn, remote_slot, NULL); + } + + /* Drop local slots that no longer need to be synced. */ + drop_obsolete_slots(dbids, remote_slot_list); + + LWLockRelease(SlotSyncWorkerLock); + + /* We are done, free remote_slot_list elements */ + list_free_deep(remote_slot_list); + + walrcv_clear_result(res); + + return naptime; +} + +/* + * Connect to the remote (primary) server. + * + * This uses GUC primary_conninfo in order to connect to the primary. + * For slot-sync to work, primary_conninfo is required to specify dbname + * as well. + */ +static WalReceiverConn * +remote_connect() +{ + WalReceiverConn *wrconn = NULL; + char *err; + + wrconn = walrcv_connect(PrimaryConnInfo, true, false, "slot-sync", &err); + if (wrconn == NULL) + ereport(ERROR, + (errmsg("could not connect to the primary server: %s", err))); + return wrconn; +} + +/* + * Reconnect to the remote (primary) server if PrimaryConnInfo has changed. + */ +static WalReceiverConn * +reconnect_if_needed(WalReceiverConn *wrconn_prev) +{ + WalReceiverConn *wrconn = NULL; + + /* If no change in PrimaryConnInfo, return the previous connection itself */ + if (strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) == 0) + return wrconn_prev; + + walrcv_disconnect(wrconn_prev); + wrconn = remote_connect(); + return wrconn; +} + +/* + * Interrupt handler for main loop of slot-sync worker. 
+ */ +static bool +ProcessSlotSyncInterrupts(WalReceiverConn *wrconn) +{ + bool reload_done = false; + + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + ereport(LOG, + errmsg("replication slot-sync worker %d is shutting" + " down on receiving SIGINT", MySlotSyncWorker->slot)); + + walrcv_disconnect(wrconn); + proc_exit(0); + } + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + + /* Free the previous allocation. */ + if (PrimaryConnInfoPreReload) + pfree(PrimaryConnInfoPreReload); + + /* Save the GUC primary_conninfo before reloading. */ + PrimaryConnInfoPreReload = pstrdup(PrimaryConnInfo); + + ProcessConfigFile(PGC_SIGHUP); + reload_done = true; + } + + return reload_done; +} + +/* + * The main loop of our worker process. + */ +void +ReplSlotSyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + dsa_handle handle; + dsa_area *dsa; + WalReceiverConn *wrconn = NULL; + char *dbname; + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* + * Attach to the dynamic shared memory segment for the slot-sync worker + * and find its table of contents. + */ + memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsa_handle)); + dsa = dsa_attach(handle); + if (!dsa) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not map dynamic shared memory " + "segment for slot-sync worker"))); + + /* Primary initialization is complete. Now attach to our slot. */ + slotsync_worker_attach(worker_slot); + + ereport(LOG, + errmsg("replication slot-sync worker %d started", worker_slot)); + + before_shmem_exit(slotsync_worker_detach, PointerGetDatum(dsa)); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + /* + * Get the user provided dbname from the connection string, if dbname not + * provided, skip sync. 
+ */ + dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (dbname == NULL) + proc_exit(0); + + /* + * Connect to the database specified by user in PrimaryConnInfo. We need a + * database connection for walrcv_exec to work. Please see comments atop + * libpqrcv_exec. + */ + BackgroundWorkerInitializeConnection(dbname, + NULL, + 0); + + /* Connect to the primary server */ + wrconn = remote_connect(); + + /* Main wait loop. */ + for (;;) + { + int rc; + long naptime; + bool config_reloaded = false; + + config_reloaded = ProcessSlotSyncInterrupts(wrconn); + + /* Reconnect if GUC primary_conninfo got changed */ + if (config_reloaded) + wrconn = reconnect_if_needed(wrconn); + + if (!RecoveryInProgress()) + proc_exit(0); + + naptime = synchronize_slots(dsa, wrconn); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + naptime, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + ResetLatch(MyLatch); + + /* Emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + { + walrcv_disconnect(wrconn); + proc_exit(1); + } + } + + /* + * The slot-sync worker can not get here because it will only stop when it + * receives a SIGINT from the logical replication launcher, or when there + * is an error. 
+ */ + Assert(false); +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 7036096653..3caebd51f4 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/subscriptioncmds.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "parser/parse_relation.h" @@ -246,7 +247,7 @@ wait_for_worker_state_change(char expected_state) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid, false); - if (worker && worker->proc) + if (worker && worker->hdr.proc) logicalrep_worker_wakeup_ptr(worker); LWLockRelease(LogicalRepWorkerLock); if (!worker) @@ -535,7 +536,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (rstate->state == SUBREL_STATE_SYNCWAIT) { /* Signal the sync worker, as it may be waiting for us. 
*/ - if (syncworker->proc) + if (syncworker->hdr.proc) logicalrep_worker_wakeup_ptr(syncworker); /* Now safe to release the LWLock */ diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index b706046811..c910c9be96 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -77,12 +77,14 @@ Node *replication_parse_result; %token K_EXPORT_SNAPSHOT %token K_NOEXPORT_SNAPSHOT %token K_USE_SNAPSHOT +%token K_LIST_DBID_FOR_FAILOVER_SLOTS %type command %type base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot alter_replication_slot identify_system read_replication_slot timeline_history show + list_dbid_for_failover_slots %type generic_option_list %type generic_option %type opt_timeline @@ -117,6 +119,7 @@ command: | read_replication_slot | timeline_history | show + | list_dbid_for_failover_slots ; /* @@ -129,6 +132,16 @@ identify_system: } ; +/* + * LIST_DBID_FOR_FAILOVER_SLOTS + */ +list_dbid_for_failover_slots: + K_LIST_DBID_FOR_FAILOVER_SLOTS + { + $$ = (Node *) makeNode(ListDBForFailoverSlotsCmd); + } + ; + /* * READ_REPLICATION_SLOT %s */ diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 0b5ae23195..57ddc08dfc 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -129,6 +129,7 @@ ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } +LIST_DBID_FOR_FAILOVER_SLOTS { return K_LIST_DBID_FOR_FAILOVER_SLOTS; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } TEMPORARY { return K_TEMPORARY; } @@ -306,6 +307,7 @@ replication_scanner_is_replication_command(void) case K_READ_REPLICATION_SLOT: case K_TIMELINE_HISTORY: case K_SHOW: + case K_LIST_DBID_FOR_FAILOVER_SLOTS: /* Yes; push back the first token so we can parse later. 
*/ repl_pushed_back_token = first_token; return true; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index e9babb69d4..089b6cb37d 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -317,6 +317,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; + slot->data.synced = false; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -646,12 +647,25 @@ restart: * Permanently drop replication slot identified by the passed in name. */ void -ReplicationSlotDrop(const char *name, bool nowait) +ReplicationSlotDrop(const char *name, bool nowait, bool user_cmd) { Assert(MyReplicationSlot == NULL); ReplicationSlotAcquire(name, nowait); + /* + * Do not allow users to drop the slots which are currently being synced + * from the primary to the standby. + */ + if (user_cmd && RecoveryInProgress() && MyReplicationSlot->data.synced) + { + ReplicationSlotRelease(); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot drop replication slot \"%s\"", name), + errdetail("This slot is being synced from the primary."))); + } + ReplicationSlotDropAcquired(); } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 3565ca196f..6121442b15 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -226,11 +226,40 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); - ReplicationSlotDrop(NameStr(*name), true); + ReplicationSlotDrop(NameStr(*name), true, true); PG_RETURN_VOID(); } +/* + * SQL function for getting invalidation cause of a slot. + * + * Returns ReplicationSlotInvalidationCause enum value for valid slot_name; + * returns NULL if slot with given name is not found. + * + * It return RS_INVAL_NONE if the given slot is not invalidated. 
+ */
+Datum
+pg_get_slot_invalidation_cause(PG_FUNCTION_ARGS)
+{
+	Name		name = PG_GETARG_NAME(0);
+	int16		cause = RS_INVAL_NONE;
+	bool		found = false;
+	int			slotno;
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (slotno = 0; slotno < max_replication_slots; slotno++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[slotno];
+
+		/*
+		 * Skip entries not in use: their name may be stale data from a
+		 * previously dropped slot.
+		 */
+		if (s->in_use &&
+			strcmp(NameStr(s->data.name), NameStr(*name)) == 0)
+		{
+			/*
+			 * Remember the cause and fall out of the loop; we must not
+			 * return while still holding the LWLock.
+			 */
+			cause = s->data.invalidated;
+			found = true;
+			break;
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	if (found)
+		PG_RETURN_INT16(cause);
+
+	PG_RETURN_NULL();
+}
+
 /*
  * pg_get_replication_slots - SQL SRF showing all replication slots
  * that currently exist on the database cluster.
@@ -238,7 +267,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 16
+#define PG_GET_REPLICATION_SLOTS_COLS 17
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -420,6 +449,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.failover);
 
+		values[i++] = BoolGetDatum(slot_contents.data.synced);
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index eeb5ea6cfa..ef543b0f3f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -480,6 +480,73 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+/*
+ * Handle the LIST_DBID_FOR_FAILOVER_SLOTS command.
+ *
+ * Return the list of database-ids for failover logical slots.
+ * The returned list has no duplicates.
+ */ +static void +ListFailoverSlotsDbids(void) +{ + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + List *database_oids_list = NIL; + + dest = CreateDestReceiver(DestRemoteSimple); + + /* Need a tuple descriptor representing a single column */ + tupdesc = CreateTemplateTupleDesc(1); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "database_oid", + INT8OID, -1, 0); + + /* Prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int slotno = 0; slotno < max_replication_slots; slotno++) + { + ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; + Oid dboid; + bool failover_slot = false; + Datum values[1]; + bool nulls[1]; + + if (!slot->in_use) + continue; + + SpinLockAcquire(&slot->mutex); + + dboid = slot->data.database; + failover_slot = slot->data.failover; + + SpinLockRelease(&slot->mutex); + + if (!failover_slot || SlotIsPhysical(slot)) + continue; + + /* Skip this slot if the database OID is already in the list. 
*/ + if (list_member_oid(database_oids_list, dboid)) + continue; + + /* Add the database OID to the list */ + database_oids_list = lappend_oid(database_oids_list, dboid); + + values[0] = Int64GetDatum(dboid); + nulls[0] = (dboid == InvalidOid); + + /* Send it to dest */ + do_tup_output(tstate, values, nulls); + } + LWLockRelease(ReplicationSlotControlLock); + + /* Clean up the list */ + list_free(database_oids_list); + + end_tup_output(tstate); +} + /* Handle READ_REPLICATION_SLOT command */ static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd) @@ -1263,7 +1330,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - ReplicationSlotDrop(cmd->slotname, !cmd->wait); + ReplicationSlotDrop(cmd->slotname, !cmd->wait, false); } /* @@ -2171,6 +2238,13 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; + case T_ListDBForFailoverSlotsCmd: + cmdtag = "LIST_DBID_FOR_FAILOVER_SLOTS"; + set_ps_display(cmdtag); + ListFailoverSlotsDbids(); + EndReplicationCommand(cmdtag); + break; + case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 315a78cda9..fd9e73a49b 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -190,6 +190,8 @@ static const char *const BuiltinTrancheNames[] = { "LogicalRepLauncherDSA", /* LWTRANCHE_LAUNCHER_HASH: */ "LogicalRepLauncherHash", + /* LWTRANCHE_SLOTSYNC_DSA: */ + "SlotSyncWorkerDSA", }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index f72f2906ce..e62a3f1bc0 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -54,3 +54,4 @@ XactTruncationLock 44 WrapLimitsVacuumLock 46 NotifyQueueTailLock 47 WaitEventExtensionLock 48 
+SlotSyncWorkerLock 49 diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index f6e2ec82c1..4affc1c861 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -53,6 +53,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." +REPL_SLOTSYNC_MAIN "Waiting in main loop of slot-sync worker." +REPL_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch-up, in slot-sync worker." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index d728e33d5e..5c2a2f182e 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -65,8 +65,11 @@ #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" +#include "replication/reorderbuffer.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/walreceiver.h" +#include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -2021,6 +2024,15 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"enable_syncslot", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."), + }, + &enable_syncslot, + true, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL @@ -3518,6 +3530,19 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_slotsync_workers", + PGC_POSTMASTER, + REPLICATION_STANDBY, + gettext_noop("Maximum number of slot synchronization workers " + "on a standby."), + NULL, + }, + &max_slotsync_workers, + 2, 0, 50, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index dd2769cdd3..d83647889d 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -355,6 +355,8 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#enable_syncslot = on # enables slot synchronization on the physical standby from the primary +#max_slotsync_workers = 2 # maximum number of 
slot synchronization workers # - Subscribers - diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index ca626dff13..40b0781f5c 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11078,14 +11078,18 @@ proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'name', prosrc => 'pg_drop_replication_slot' }, +{ oid => '8484', descr => 'what caused the replication slot to become invalid', + proname => 'pg_get_slot_invalidation_cause', provolatile => 's', proisstrict => 't', + prorettype => 'int2', proargtypes => 'name', + prosrc => 'pg_get_slot_invalidation_cause' }, { oid => '3781', descr => 'information about replication slots currently in use', proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,synced_slot}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 214dc6c29e..75b4b2040d 100644 --- a/src/include/commands/subscriptioncmds.h 
+++ b/src/include/commands/subscriptioncmds.h @@ -17,6 +17,7 @@ #include "catalog/objectaddress.h" #include "parser/parse_node.h" +#include "replication/walreceiver.h" extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel); @@ -28,4 +29,7 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); extern char defGetStreamingMode(DefElem *def); +extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, + char *slotname, bool missing_ok); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index bef8a7162e..8a5b374cea 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd NodeTag type; } IdentifySystemCmd; +/* ------------------------------- + * LIST_DBID_FOR_FAILOVER_SLOTS command + * ------------------------------- + */ +typedef struct ListDBForFailoverSlotsCmd +{ + NodeTag type; + List *slot_names; +} ListDBForFailoverSlotsCmd; /* ---------------------- * BASE_BACKUP command diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index a07c9cb311..07cfae7b37 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -15,9 +15,12 @@ extern PGDLLIMPORT int max_logical_replication_workers; extern PGDLLIMPORT int max_sync_workers_per_subscription; extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription; +extern PGDLLIMPORT int max_slotsync_workers; +extern PGDLLIMPORT bool enable_syncslot; + extern void ApplyLauncherRegister(void); -extern void ApplyLauncherMain(Datum main_arg); +extern void LauncherMain(Datum main_arg); extern Size ApplyLauncherShmemSize(void); extern void ApplyLauncherShmemInit(void); @@ -31,4 +34,6 @@ extern bool IsLogicalLauncher(void); extern pid_t GetLeaderApplyWorkerPid(pid_t pid); +extern PGDLLIMPORT char *PrimaryConnInfo; + #endif /* 
LOGICALLAUNCHER_H */ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index bbd71d0b42..baad5a8f3a 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -19,6 +19,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); extern void TablesyncWorkerMain(Datum main_arg); +extern void ReplSlotSyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 4bdb6edd83..3b1685d566 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -15,7 +15,6 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" -#include "replication/walreceiver.h" /* * Behaviour of replication slots, upon release or crash. @@ -112,6 +111,13 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + /* + * Is this a slot created by a sync-slot worker? + * + * Relevant for logical slots on the physical standby. + */ + bool synced; + /* * Is this a failover slot (sync candidate for physical standbys)? * Relevant for logical slots on the primary server. 
@@ -230,7 +236,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover); extern void ReplicationSlotPersist(void); -extern void ReplicationSlotDrop(const char *name, bool nowait); +extern void ReplicationSlotDrop(const char *name, bool nowait, bool user_cmd); extern void ReplicationSlotAlter(const char *name, bool failover); extern void ReplicationSlotAcquire(const char *name, bool nowait); @@ -256,7 +262,6 @@ extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_l extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot); -extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(bool is_shutdown); @@ -264,7 +269,6 @@ extern void CheckPointReplicationSlots(bool is_shutdown); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); -extern void WaitForStandbyLSN(XLogRecPtr wait_for_lsn); extern void SlotSyncInitConfig(void); extern void SlotSyncFreeConfig(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 115344f1c4..261c560190 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -20,6 +20,7 @@ #include "pgtime.h" #include "port/atomics.h" #include "replication/logicalproto.h" +#include "replication/slot.h" #include "replication/walsender.h" #include "storage/condition_variable.h" #include "storage/latch.h" @@ -191,6 +192,14 @@ typedef struct } proto; } WalRcvStreamOptions; +/* + * Failover logical slots data received from remote. 
+ */ +typedef struct WalRcvFailoverSlotsData +{ + Oid dboid; +} WalRcvFailoverSlotsData; + struct WalReceiverConn; typedef struct WalReceiverConn WalReceiverConn; @@ -280,6 +289,21 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, TimeLineID *primary_tli); +/* + * walrcv_get_dbinfo_for_failover_slots_fn + * + * Run LIST_DBID_FOR_FAILOVER_SLOTS on primary server to get the + * list of unique DBIDs for failover logical slots + */ +typedef List *(*walrcv_get_dbinfo_for_failover_slots_fn) (WalReceiverConn *conn); + +/* + * walrcv_get_dbname_from_conninfo_fn + * + * Returns the dbname from the primary_conninfo + */ +typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo); + /* * walrcv_server_version_fn * @@ -404,6 +428,8 @@ typedef struct WalReceiverFunctionsType walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; + walrcv_get_dbinfo_for_failover_slots_fn walrcv_get_dbinfo_for_failover_slots; + walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo; walrcv_server_version_fn walrcv_server_version; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; walrcv_startstreaming_fn walrcv_startstreaming; @@ -429,6 +455,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_get_dbinfo_for_failover_slots(conn) \ + WalReceiverFunctions->walrcv_get_dbinfo_for_failover_slots(conn) +#define walrcv_get_dbname_from_conninfo(conninfo) \ + WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename,
content, size) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index a9bba11187..eb6f3fd75c 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -36,11 +36,9 @@ typedef enum LogicalRepWorkerType WORKERTYPE_PARALLEL_APPLY, } LogicalRepWorkerType; -typedef struct LogicalRepWorker +/* Common data for Slotsync and LogicalRep workers */ +typedef struct LogicalWorkerHeader { - /* What type of worker is this? */ - LogicalRepWorkerType type; - /* Time at which this worker was launched. */ TimestampTz launch_time; @@ -53,6 +51,16 @@ typedef struct LogicalRepWorker /* Pointer to proc array. NULL if not running. */ PGPROC *proc; +} LogicalWorkerHeader; + +/* Shared memory structure for logical replication workers. */ +typedef struct LogicalRepWorker +{ + LogicalWorkerHeader hdr; + + /* What type of worker is this? */ + LogicalRepWorkerType type; + /* Database id to connect to. */ Oid dbid; @@ -96,6 +104,40 @@ typedef struct LogicalRepWorker TimestampTz reply_time; } LogicalRepWorker; +/* + * Shared memory structure for Slot-Sync worker. It is allocated by logical + * replication launcher and then read by each slot-sync worker. + * + * It is protected by LWLock (SlotSyncWorkerLock). Each slot-sync worker + * reading the structure needs to hold the lock in shared mode, whereas + * the logical replication launcher which updates it needs to hold the lock + * in exclusive mode. 
+ */ +typedef struct SlotSyncWorker +{ + LogicalWorkerHeader hdr; + + /* The slot in worker pool to which slot-sync worker is attached */ + int slot; + + /* Count of dbids slot-sync worker manages */ + uint32 dbcount; + + /* DSA for dbids */ + dsa_area *dbids_dsa; + + /* dsa_pointer for dbids slot-sync worker manages */ + dsa_pointer dbids_dp; + + /* Info about slot being monitored for worker's naptime purpose */ + struct SlotSyncWorkerWatchSlot + { + XLogRecPtr confirmed_lsn; + TimestampTz last_update_time; + } monitoring_info; + +} SlotSyncWorker; + /* * State of the transaction in parallel apply worker. * @@ -234,12 +276,15 @@ extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn; /* Worker and subscription objects. */ extern PGDLLIMPORT Subscription *MySubscription; extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; +extern PGDLLIMPORT SlotSyncWorker *MySlotSyncWorker; extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; extern void logicalrep_worker_attach(int slot); +extern void slotsync_worker_attach(int slot); +extern void slotsync_worker_detach(int code, Datum arg); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running); @@ -329,9 +374,9 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); -#define isParallelApplyWorker(worker) ((worker)->in_use && \ +#define isParallelApplyWorker(worker) ((worker)->hdr.in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTablesyncWorker(worker) ((worker)->hdr.in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) static inline bool @@ -343,14 +388,14 @@ am_tablesync_worker(void) static inline bool am_leader_apply_worker(void) { - Assert(MyLogicalRepWorker->in_use); + Assert(MyLogicalRepWorker->hdr.in_use); return 
(MyLogicalRepWorker->type == WORKERTYPE_APPLY); } static inline bool am_parallel_apply_worker(void) { - Assert(MyLogicalRepWorker->in_use); + Assert(MyLogicalRepWorker->hdr.in_use); return isParallelApplyWorker(MyLogicalRepWorker); } diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index b038e599c0..0621ee70fc 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -207,6 +207,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PGSTATS_DATA, LWTRANCHE_LAUNCHER_DSA, LWTRANCHE_LAUNCHER_HASH, + LWTRANCHE_SLOTSYNC_DSA, LWTRANCHE_FIRST_USER_DEFINED, } BuiltinTrancheIds; diff --git a/src/test/recovery/t/050_verify_slot_order.pl b/src/test/recovery/t/050_verify_slot_order.pl index 42e51634c5..151b126985 100644 --- a/src/test/recovery/t/050_verify_slot_order.pl +++ b/src/test/recovery/t/050_verify_slot_order.pl @@ -142,4 +142,125 @@ $result = $subscriber1->safe_psql('postgres', "SELECT count(*) = $primary_row_count FROM tab_int;"); is($result, 't', "subscriber1 gets data from primary after standby1 acknowledges changes"); +# Test logical failover slots on the standby +# Configure standby3 to replicate and synchronize logical slots configured +# for failover on the primary +# +# failover slot lsub1_slot->| ----> subscriber1 (connected via logical replication) +# primary ---> | +# physical slot sb3_slot--->| ----> standby3 (connected via streaming replication) +# | lsub1_slot(synced_slot) + +# Cleanup old standby_slot_names +$primary->stop; +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = '' +)); +$primary->start; + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb3_slot');}); + +$backup_name = 'backup2'; +$primary->backup($backup_name); + +# Create standby3 +my $standby3 = PostgreSQL::Test::Cluster->new('standby3'); +$standby3->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); + +my $connstr_1 = $primary->connstr; +$standby3->stop; 
+$standby3->append_conf( + 'postgresql.conf', q{ +enable_syncslot = true +hot_standby_feedback = on +primary_slot_name = 'sb3_slot' +}); +$standby3->append_conf( + 'postgresql.conf', qq( +primary_conninfo = '$connstr_1 dbname=postgres' +)); +$standby3->start; + +# Add this standby into the primary's configuration +$primary->stop; +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb3_slot' +)); +$primary->start; + +# Restart the standby +$standby3->restart; + +# Advance lsn on the primary +$primary->safe_psql('postgres', + "SELECT pg_log_standby_snapshot();"); +$primary->safe_psql('postgres', + "SELECT pg_log_standby_snapshot();"); +$primary->safe_psql('postgres', + "SELECT pg_log_standby_snapshot();"); + +# Wait for the standby to sync +my $offset = -s $standby3->logfile; +$standby3->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? wait over for remote slot \"lsub1_slot\"/, + $offset); + +# Confirm that logical failover slot is created on the standby +is( $standby3->safe_psql('postgres', + q{SELECT slot_name FROM pg_replication_slots;} + ), + 'lsub1_slot', + 'failover slot was created'); + +# Verify slot parameters on the standby +is( $standby3->safe_psql('postgres', + q{SELECT failover, synced_slot FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';} + ), + "t|t", + 'logical slot marked as synced_slot and failover on standby'); + +# Verify slot parameters on the primary +is( $primary->safe_psql('postgres', + q{SELECT failover, synced_slot FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';} + ), + "t|f", + 'logical slot marked as failover but not synced_slot on primary'); + +# Test to confirm that restart_lsn of the logical slot on the primary is synced to the standby + +# Truncate table on primary +$primary->safe_psql('postgres', + "TRUNCATE TABLE tab_int;"); + +# Insert data on the primary +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +# let the slots get synced on the standby 
+sleep 2; + +# Get the restart_lsn for the logical slot lsub1_slot on the primary +my $primary_lsn = $primary->safe_psql('postgres', + "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Confirm that restart_lsn of lsub1_slot slot is synced to the standby +$result = $standby3->safe_psql('postgres', + qq[SELECT '$primary_lsn' <= restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';]); +is($result, 't', 'restart_lsn of slot lsub1_slot synced to standby'); + +# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary +$primary_lsn = $primary->safe_psql('postgres', + "SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Confirm that confirmed_flush_lsn of lsub1_slot slot is synced to the standby +$result = $standby3->safe_psql('postgres', + qq[SELECT '$primary_lsn' <= confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';]); +is($result, 't', 'confirmed_flush_lsn of slot lsub1_slot synced to the standby'); + done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index c9647e86b2..64afe1ddbd 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1474,8 +1474,9 @@ pg_replication_slots| SELECT l.slot_name, l.safe_wal_size, l.two_phase, l.conflicting, - l.failover - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover) + l.failover, + l.synced_slot + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, synced_slot) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, diff --git 
a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 271313ebf8..70bd65e2cf 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -132,8 +132,9 @@ select name, setting from pg_settings where name like 'enable%'; enable_self_join_removal | on enable_seqscan | on enable_sort | on + enable_syncslot | on enable_tidscan | on -(22 rows) +(23 rows) -- There are always wait event descriptions for various types. select type, count(*) > 0 as ok FROM pg_wait_events diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 0f184fb103..12d36122ad 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1430,6 +1430,7 @@ LimitState LimitStateCond List ListCell +ListDBForFailoverSlotsCmd ListDictionary ListParsedLex ListenAction @@ -1509,6 +1510,7 @@ LogicalSlotInfo LogicalSlotInfoArr LogicalTape LogicalTapeSet +LogicalWorkerHeader LsnReadQueue LsnReadQueueNextFun LsnReadQueueNextStatus @@ -2310,6 +2312,7 @@ RelocationBufferInfo RelptrFreePageBtree RelptrFreePageManager RelptrFreePageSpanLeader +RemoteSlot RenameStmt ReopenPtrType ReorderBuffer @@ -2566,6 +2569,7 @@ SlabBlock SlabContext SlabSlot SlotNumber +SlotSyncWorker SlruCtl SlruCtlData SlruErrorCause @@ -3013,6 +3017,7 @@ WalLevel WalRcvData WalRcvExecResult WalRcvExecStatus +WalRcvFailoverSlotsData WalRcvState WalRcvStreamOptions WalRcvWakeupReason -- 2.30.0.windows.2