From f98361d0e4dd465f72858484b97c8494053b9acc Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 5 Oct 2023 02:04:40 -0400 Subject: [PATCH v22 2/2] Add logical slot sync capability to physical standby This patch implements synchronization of logical replication slots from the primary server to the physical standby so that logical subscribers are not blocked after failover. All the logical replication slots on the primary (assuming configurations are appropriate) are automatically created on the physical standbys and are synced periodically. Slot-sync worker(s) on the standby server ping the primary at regular intervals to get the necessary logical slot information and create/update the slots locally. A new GUC 'max_slotsync_workers' defines the maximum number of slot-sync workers on the standby. This parameter can only be set at server start. Now the replication launcher on the physical standby queries primary to get the list of dbids that belong to the slots mentioned in GUC 'synchronize_slot_names'. Once it gets the dbids, if dbids < max_slotsync_workers, it starts only that many workers and if dbids > max_slotsync_workers, it starts max_slotsync_workers and divides the work equally among them. Each worker is then responsible to keep on syncing the logical slots belonging to the DBs assigned to it. Each slot-sync worker will have its own dbids list. Since the upper limit of this dbid-count is not known, it needs to be handled using dsa. We initially allocate memory to hold 100 dbids for each worker. If this limit is exhausted, we reallocate this memory with size incremented again by 100. The nap time of worker is tuned according to the activity on the primary. Each worker starts with nap time of 10ms and if no activity is observed on the primary for some time, then nap time is increased to 10sec. And if activity is observed again, nap time is reduced back to 10ms. Each worker uses one slot (first one assigned to it) for monitoring purpose. 
If there is no change in lsn of that slot for some threshold time, nap time is increased to 10sec and as soon as a change is observed, nap time is reduced back to 10ms. The logical slots created by slot-sync workers on physical standbys are not allowed to be consumed. Any attempt to perform logical decoding on such slots will result in an error. If a logical slot is invalidated on the primary, slot on the standby is also invalidated. If a logical slot on the primary is valid but is invalidated on the standby due to conflict (say required rows removed on the primary), then that slot is dropped and recreated on the standby in next sync-cycle. It is okay to recreate such slots as long as these are not consumable on the standby (which is the case currently). --- doc/src/sgml/config.sgml | 39 +- doc/src/sgml/system-views.sgml | 10 + src/backend/catalog/system_views.sql | 3 +- src/backend/postmaster/bgworker.c | 5 +- .../libpqwalreceiver/libpqwalreceiver.c | 119 ++ src/backend/replication/logical/Makefile | 1 + .../replication/logical/applyparallelworker.c | 3 +- src/backend/replication/logical/launcher.c | 987 +++++++++++++-- src/backend/replication/logical/logical.c | 12 + src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 1077 +++++++++++++++++ src/backend/replication/logical/tablesync.c | 5 +- src/backend/replication/repl_gram.y | 32 +- src/backend/replication/repl_scanner.l | 2 + src/backend/replication/slot.c | 18 +- src/backend/replication/slotfuncs.c | 35 +- src/backend/replication/walsender.c | 127 +- src/backend/storage/lmgr/lwlock.c | 2 + src/backend/storage/lmgr/lwlocknames.txt | 1 + .../utils/activity/wait_event_names.txt | 2 + src/backend/utils/misc/guc_tables.c | 16 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/catalog/pg_proc.dat | 10 +- src/include/commands/subscriptioncmds.h | 4 + src/include/nodes/replnodes.h | 9 + src/include/postmaster/bgworker_internals.h | 1 + 
src/include/replication/logicallauncher.h | 6 +- src/include/replication/logicalworker.h | 1 + src/include/replication/slot.h | 10 +- src/include/replication/walreceiver.h | 31 + src/include/replication/worker_internal.h | 60 +- src/include/storage/lwlock.h | 1 + src/test/regress/expected/rules.out | 5 +- src/tools/pgindent/typedefs.list | 5 + 34 files changed, 2499 insertions(+), 143 deletions(-) create mode 100644 src/backend/replication/logical/slotsync.c mode change 100644 => 100755 src/backend/replication/walsender.c diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index b2ba74f6ce..6e5ee538cf 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4576,10 +4576,13 @@ ANY num_sync ( num_sync ( + max_slotsync_workers (integer) + + max_slotsync_workers configuration parameter + + + + + Specifies maximum number of slot synchronization workers. + + + Slot synchronization workers are taken from the pool defined by + max_worker_processes. + + + The default value is 2. This parameter can only be set at server + start. + + + The slot-sync workers are needed for synchronization of logical replication + slots from the primary server to the physical standby so that logical + subscribers are not blocked after failover. Slot-sync workers need + synchronize_slot_names to be configured correctly in + order to synchronize the logical replication slots. + + + + diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 2b35c2f91b..0c9c84406a 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2532,6 +2532,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx invalidated). Always NULL for physical slots. + + + + synced_slot bool + + + True if this logical slot is created on physical standby as part of + slot-synchronization from primary server. Always false for physical slots. 
+ + diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fcb14976c0..7986e058f7 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1002,7 +1002,8 @@ CREATE VIEW pg_replication_slots AS L.wal_status, L.safe_wal_size, L.two_phase, - L.conflicting + L.conflicting, + L.synced_slot FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 505e38376c..2a137ad4aa 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -124,11 +124,14 @@ static const struct "ParallelWorkerMain", ParallelWorkerMain }, { - "ApplyLauncherMain", ApplyLauncherMain + "LauncherMain", LauncherMain }, { "ApplyWorkerMain", ApplyWorkerMain }, + { + "ReplSlotSyncWorkerMain", ReplSlotSyncWorkerMain + }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 60d5c1fc40..bb943b4331 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -34,6 +34,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -58,6 +59,9 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static List *libpqrcv_get_dbinfo_for_logical_slots(WalReceiverConn *conn, + const char *slot_names); +static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -96,6 +100,8 @@ static 
WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_receive = libpqrcv_receive, .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, + .walrcv_get_dbinfo_for_logical_slots = libpqrcv_get_dbinfo_for_logical_slots, + .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -409,6 +415,119 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get DB info for logical slots + * + * It gets the DBIDs for slot_names from primary. The list returned + * by LIST_DBID_FOR_LOGICAL_SLOTS has no duplicates. + */ +static List * +libpqrcv_get_dbinfo_for_logical_slots(WalReceiverConn *conn, + const char *slot_names) +{ + PGresult *res; + List *slotlist = NIL; + int ntuples; + StringInfoData s; + WalRcvRepSlotDbData *slot_data; + + initStringInfo(&s); + appendStringInfoString(&s, "LIST_DBID_FOR_LOGICAL_SLOTS"); + + if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0) + { + char *rawnames; + List *namelist; + ListCell *lc; + + appendStringInfoChar(&s, ' '); + rawnames = pstrdup(slot_names); + SplitIdentifierString(rawnames, ',', &namelist); + foreach(lc, namelist) + { + if (lc != list_head(namelist)) + appendStringInfoChar(&s, ','); + appendStringInfo(&s, "%s", + quote_identifier(lfirst(lc))); + } + } + + res = libpqrcv_PQexec(conn->streamConn, s.data); + pfree(s.data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive list of slots from the primary server: %s", + pchomp(PQerrorMessage(conn->streamConn))))); + } + if (PQnfields(res) != 1) + { + int nfields = PQnfields(res); + + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from primary server"), + errdetail("Could not get list of slots: got %d fields, " + "expected 1", nfields))); + } + + ntuples = PQntuples(res); + for (int i = 0; i < 
ntuples; i++) + { + slot_data = palloc0(sizeof(WalRcvRepSlotDbData)); + if (!PQgetisnull(res, i, 0)) + slot_data->database = atooid(PQgetvalue(res, i, 0)); + + slotlist = lappend(slotlist, slot_data); + } + + PQclear(res); + + return slotlist; +} + +/* + * Get database name from primary conninfo. + * + * If dbname is not found in connInfo, return NULL. + * The caller should take care of handling the NULL value. + */ +static char * +libpqrcv_get_dbname_from_conninfo(const char *connInfo) +{ + PQconninfoOption *opts; + PQconninfoOption *opt; + char *dbname = NULL; + char *err = NULL; + + opts = PQconninfoParse(connInfo, &err); + if (opts == NULL) + { + /* The error string is malloc'd, so we must free it explicitly */ + char *errcopy = err ? pstrdup(err) : "out of memory"; + + PQfreemem(err); + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid connection string syntax: %s", errcopy))); + } + + for (opt = opts; opt->keyword != NULL; ++opt) + { + /* Ignore connection options that are not present. */ + if (opt->val == NULL) + continue; + + if (strcmp(opt->keyword, "dbname") == 0 && opt->val[0] != '\0') + { + dbname = pstrdup(opt->val); + } + } + + return dbname; +} + /* * Start streaming WAL data from given streaming options.
* diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..ba03eeff1c 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 82f48a488e..ea461e7ef1 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -925,7 +925,8 @@ ParallelApplyWorkerMain(Datum main_arg) before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); SpinLockAcquire(&MyParallelShared->mutex); - MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation; + MyParallelShared->logicalrep_worker_generation = + MyLogicalRepWorker->hdr.generation; MyParallelShared->logicalrep_worker_slot_no = worker_slot; SpinLockRelease(&MyParallelShared->mutex); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 501910b445..f8ba72795b 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -22,6 +22,7 @@ #include "access/htup_details.h" #include "access/tableam.h" #include "access/xact.h" +#include "catalog/pg_authid.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" @@ -57,6 +58,29 @@ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; int max_parallel_apply_workers_per_subscription = 2; +int max_slotsync_workers = 2; + +/* + * The local variables to store the current values of slot-sync related GUCs + * before each ConfigReload. 
+ */ +static char *PrimaryConnInfoPreReload = NULL; +static char *PrimarySlotNamePreReload = NULL; +static char *SyncSlotNamesPreReload = NULL; + +/* + * Initial allocation size for dbids array for each SlotSyncWorker in dynamic + * shared memory. + */ +#define DB_PER_WORKER_ALLOC_INIT 100 + +/* + * Once initially allocated size is exhausted for dbids array, it is extended by + * DB_PER_WORKER_ALLOC_EXTRA size. + */ +#define DB_PER_WORKER_ALLOC_EXTRA 100 + +SlotSyncWorker *MySlotSyncWorker = NULL; LogicalRepWorker *MyLogicalRepWorker = NULL; @@ -70,6 +94,7 @@ typedef struct LogicalRepCtxStruct dshash_table_handle last_start_dsh; /* Background workers. */ + SlotSyncWorker *ss_workers; /* slot-sync workers */ LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; } LogicalRepCtxStruct; @@ -102,6 +127,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); +static void slotsync_worker_cleanup(SlotSyncWorker *worker); static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); @@ -178,6 +204,8 @@ get_subscription_list(void) } /* + * This is common code for logical workers and slotsync workers. + * * Wait for a background worker to start up and attach to the shmem context. * * This is only needed for cleaning up the shared memory in case the worker @@ -186,12 +214,14 @@ get_subscription_list(void) * Returns whether the attach was successful. */ static bool -WaitForReplicationWorkerAttach(LogicalRepWorker *worker, +WaitForReplicationWorkerAttach(LogicalWorkerHeader *worker, uint16 generation, - BackgroundWorkerHandle *handle) + BackgroundWorkerHandle *handle, + LWLock *lock) { BgwHandleStatus status; int rc; + bool is_slotsync_worker = (lock == SlotSyncWorkerLock) ? 
true : false; for (;;) { @@ -199,27 +229,32 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, CHECK_FOR_INTERRUPTS(); - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + LWLockAcquire(lock, LW_SHARED); /* Worker either died or has started. Return false if died. */ if (!worker->in_use || worker->proc) { - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); return worker->in_use; } - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); /* Check if worker has died before attaching, and clean up after it. */ status = GetBackgroundWorkerPid(handle, &pid); if (status == BGWH_STOPPED) { - LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Ensure that this was indeed the worker we waited for. */ if (generation == worker->generation) - logicalrep_worker_cleanup(worker); - LWLockRelease(LogicalRepWorkerLock); + { + if (is_slotsync_worker) + slotsync_worker_cleanup((SlotSyncWorker *) worker); + else + logicalrep_worker_cleanup((LogicalRepWorker *) worker); + } + LWLockRelease(lock); return false; } @@ -262,8 +297,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) if (isParallelApplyWorker(w)) continue; - if (w->in_use && w->subid == subid && w->relid == relid && - (!only_running || w->proc)) + if (w->hdr.in_use && w->subid == subid && w->relid == relid && + (!only_running || w->hdr.proc)) { res = w; break; @@ -290,7 +325,8 @@ logicalrep_workers_find(Oid subid, bool only_running) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->in_use && w->subid == subid && (!only_running || w->proc)) + if (w->hdr.in_use && w->subid == subid && + (!only_running || w->hdr.proc)) res = lappend(res, w); } @@ -351,7 +387,7 @@ retry: { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (!w->in_use) + if (!w->hdr.in_use) { worker = w; slot = i; @@ -380,8 +416,8 @@ retry: * If the worker was marked in use but didn't manage to attach in * time, clean it up. 
*/ - if (w->in_use && !w->proc && - TimestampDifferenceExceeds(w->launch_time, now, + if (w->hdr.in_use && !w->hdr.proc && + TimestampDifferenceExceeds(w->hdr.launch_time, now, wal_receiver_timeout)) { elog(WARNING, @@ -437,10 +473,10 @@ retry: /* Prepare the worker slot. */ worker->type = wtype; - worker->launch_time = now; - worker->in_use = true; - worker->generation++; - worker->proc = NULL; + worker->hdr.launch_time = now; + worker->hdr.in_use = true; + worker->hdr.generation++; + worker->hdr.proc = NULL; worker->dbid = dbid; worker->userid = userid; worker->subid = subid; @@ -457,7 +493,7 @@ retry: TIMESTAMP_NOBEGIN(worker->reply_time); /* Before releasing lock, remember generation for future identification. */ - generation = worker->generation; + generation = worker->hdr.generation; LWLockRelease(LogicalRepWorkerLock); @@ -510,7 +546,7 @@ retry: { /* Failed to start worker, so clean up the worker slot. */ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); - Assert(generation == worker->generation); + Assert(generation == worker->hdr.generation); logicalrep_worker_cleanup(worker); LWLockRelease(LogicalRepWorkerLock); @@ -522,19 +558,23 @@ retry: } /* Now wait until it attaches. */ - return WaitForReplicationWorkerAttach(worker, generation, bgw_handle); + return WaitForReplicationWorkerAttach((LogicalWorkerHeader *) worker, + generation, + bgw_handle, + LogicalRepWorkerLock); } /* * Internal function to stop the worker and wait until it detaches from the - * slot. + * slot. It is used for both logical rep workers and slot-sync workers. 
*/ static void -logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) +logicalrep_worker_stop_internal(LogicalWorkerHeader *worker, int signo, + LWLock *lock) { uint16 generation; - Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED)); + Assert(LWLockHeldByMeInMode(lock, LW_SHARED)); /* * Remember which generation was our worker so we can check if what we see @@ -550,7 +590,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) { int rc; - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); /* Wait a bit --- we don't expect to have to wait long. */ rc = WaitLatch(MyLatch, @@ -564,7 +604,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) } /* Recheck worker status. */ - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + LWLockAcquire(lock, LW_SHARED); /* * Check whether the worker slot is no longer used, which would mean @@ -591,7 +631,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) if (!worker->proc || worker->generation != generation) break; - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); /* Wait a bit --- we don't expect to have to wait long. */ rc = WaitLatch(MyLatch, @@ -604,7 +644,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) CHECK_FOR_INTERRUPTS(); } - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + LWLockAcquire(lock, LW_SHARED); } } @@ -623,7 +663,9 @@ logicalrep_worker_stop(Oid subid, Oid relid) if (worker) { Assert(!isParallelApplyWorker(worker)); - logicalrep_worker_stop_internal(worker, SIGTERM); + logicalrep_worker_stop_internal((LogicalWorkerHeader *) worker, + SIGTERM, + LogicalRepWorkerLock); } LWLockRelease(LogicalRepWorkerLock); @@ -669,8 +711,10 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo) /* * Only stop the worker if the generation matches and the worker is alive. 
*/ - if (worker->generation == generation && worker->proc) - logicalrep_worker_stop_internal(worker, SIGINT); + if (worker->hdr.generation == generation && worker->hdr.proc) + logicalrep_worker_stop_internal((LogicalWorkerHeader *) worker, + SIGINT, + LogicalRepWorkerLock); LWLockRelease(LogicalRepWorkerLock); } @@ -696,14 +740,14 @@ logicalrep_worker_wakeup(Oid subid, Oid relid) /* * Wake up (using latch) the specified logical replication worker. * - * Caller must hold lock, else worker->proc could change under us. + * Caller must hold lock, else worker->hdr.proc could change under us. */ void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker) { Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - SetLatch(&worker->proc->procLatch); + SetLatch(&worker->hdr.proc->procLatch); } /* @@ -718,7 +762,7 @@ logicalrep_worker_attach(int slot) Assert(slot >= 0 && slot < max_logical_replication_workers); MyLogicalRepWorker = &LogicalRepCtx->workers[slot]; - if (!MyLogicalRepWorker->in_use) + if (!MyLogicalRepWorker->hdr.in_use) { LWLockRelease(LogicalRepWorkerLock); ereport(ERROR, @@ -727,7 +771,7 @@ logicalrep_worker_attach(int slot) slot))); } - if (MyLogicalRepWorker->proc) + if (MyLogicalRepWorker->hdr.proc) { LWLockRelease(LogicalRepWorkerLock); ereport(ERROR, @@ -736,7 +780,7 @@ logicalrep_worker_attach(int slot) "another worker, cannot attach", slot))); } - MyLogicalRepWorker->proc = MyProc; + MyLogicalRepWorker->hdr.proc = MyProc; before_shmem_exit(logicalrep_worker_onexit, (Datum) 0); LWLockRelease(LogicalRepWorkerLock); @@ -771,7 +815,9 @@ logicalrep_worker_detach(void) LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); if (isParallelApplyWorker(w)) - logicalrep_worker_stop_internal(w, SIGTERM); + logicalrep_worker_stop_internal((LogicalWorkerHeader *) w, + SIGTERM, + LogicalRepWorkerLock); } LWLockRelease(LogicalRepWorkerLock); @@ -794,10 +840,10 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker) Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, 
LW_EXCLUSIVE)); worker->type = WORKERTYPE_UNKNOWN; - worker->in_use = false; - worker->proc = NULL; - worker->dbid = InvalidOid; + worker->hdr.in_use = false; + worker->hdr.proc = NULL; worker->userid = InvalidOid; + worker->dbid = InvalidOid; worker->subid = InvalidOid; worker->relid = InvalidOid; worker->leader_pid = InvalidPid; @@ -931,9 +977,18 @@ ApplyLauncherRegister(void) memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + + /* + * The launcher now takes care of launching both logical apply workers and + * logical slot-sync workers. Thus to cater to the requirements of both, + * start it as soon as a consistent state is reached. This will help + * slot-sync workers to start timely on a physical standby while on a + * non-standby server, it holds same meaning as that of + * BgWorkerStart_RecoveryFinished. + */ + bgw.bgw_start_time = BgWorkerStart_ConsistentState; snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LauncherMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication launcher"); snprintf(bgw.bgw_type, BGW_MAXLEN, @@ -953,6 +1008,7 @@ void ApplyLauncherShmemInit(void) { bool found; + Size ssw_size; LogicalRepCtx = (LogicalRepCtxStruct *) ShmemInitStruct("Logical Replication Launcher Data", @@ -977,6 +1033,14 @@ ApplyLauncherShmemInit(void) SpinLockInit(&worker->relmutex); } } + + /* Allocate shared-memory for slot-sync workers pool now */ + ssw_size = mul_size(max_slotsync_workers, sizeof(SlotSyncWorker)); + LogicalRepCtx->ss_workers = (SlotSyncWorker *) + ShmemInitStruct("Replication slot-sync workers", ssw_size, &found); + + if (!found) + memset(LogicalRepCtx->ss_workers, 0, ssw_size); } /* @@ -1115,11 +1179,738 @@ ApplyLauncherWakeup(void) } /* - * Main loop for the apply launcher process. 
+ * Clean up slot-sync worker info. + */ +static void +slotsync_worker_cleanup(SlotSyncWorker *worker) +{ + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_EXCLUSIVE)); + + worker->hdr.in_use = false; + worker->hdr.proc = NULL; + worker->slot = -1; + + if (DsaPointerIsValid(worker->dbids_dp)) + { + dsa_free(worker->dbids_dsa, worker->dbids_dp); + worker->dbids_dp = InvalidDsaPointer; + } + + if (worker->dbids_dsa) + { + dsa_detach(worker->dbids_dsa); + worker->dbids_dsa = NULL; + } + + worker->dbcount = 0; + + worker->monitoring_info.confirmed_lsn = 0; + worker->monitoring_info.last_update_time = 0; +} + +/* + * Attach Slot-sync worker to worker-slot assigned by launcher. + */ +void +slotsync_worker_attach(int slot) +{ + /* Block concurrent access. */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + + Assert(slot >= 0 && slot < max_slotsync_workers); + MySlotSyncWorker = &LogicalRepCtx->ss_workers[slot]; + MySlotSyncWorker->slot = slot; + + if (!MySlotSyncWorker->hdr.in_use) + { + LWLockRelease(SlotSyncWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot-sync worker slot %d is " + "empty, cannot attach", slot))); + } + + if (MySlotSyncWorker->hdr.proc) + { + LWLockRelease(SlotSyncWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot-sync worker slot %d is " + "already used by another worker, cannot attach", slot))); + } + + MySlotSyncWorker->hdr.proc = MyProc; + + LWLockRelease(SlotSyncWorkerLock); +} + +/* + * Detach the worker from DSM and update 'proc' and 'in_use'. + * Logical replication launcher will come to know using these + * that the worker has shutdown. 
+ */ +void +slotsync_worker_detach(int code, Datum arg) +{ + dsa_detach((dsa_area *) DatumGetPointer(arg)); + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + MySlotSyncWorker->hdr.in_use = false; + MySlotSyncWorker->hdr.proc = NULL; + LWLockRelease(SlotSyncWorkerLock); +} + +/* + * Slot-Sync worker find. + * + * Walks the slot-sync workers pool and searches for one that matches given + * dbid. Since one worker can manage multiple dbs, so it walks the db array in + * each worker to find the match. + */ +static SlotSyncWorker * +slotsync_worker_find(Oid dbid) +{ + SlotSyncWorker *res = NULL; + Oid *dbids; + + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + /* Search for attached worker for a given dbid */ + for (int i = 0; i < max_slotsync_workers; i++) + { + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i]; + + if (!w->hdr.in_use) + continue; + + dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp); + for (int cnt = 0; cnt < w->dbcount; cnt++) + { + Oid wdbid = dbids[cnt]; + + if (wdbid == dbid) + { + res = w; + break; + } + } + + /* If worker is found, break the outer loop */ + if (res) + break; + } + + return res; +} + +/* + * Setup DSA for slot-sync worker. + * + * DSA is needed for dbids array. Since max number of dbs a worker can manage + * is not known, so initially fixed size to hold DB_PER_WORKER_ALLOC_INIT + * dbs is allocated. If this size is exhausted, it can be extended using + * dsa free and allocate routines. + */ +static dsa_handle +slotsync_dsa_setup(SlotSyncWorker *worker, int alloc_db_count) +{ + dsa_area *dbids_dsa; + dsa_pointer dbids_dp; + dsa_handle dbids_dsa_handle; + MemoryContext oldcontext; + + /* Be sure any memory allocated by DSA routines is persistent. 
*/ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + dbids_dsa = dsa_create(LWTRANCHE_SLOTSYNC_DSA); + dsa_pin(dbids_dsa); + dsa_pin_mapping(dbids_dsa); + + dbids_dp = dsa_allocate0(dbids_dsa, alloc_db_count * sizeof(Oid)); + + /* Set-up worker */ + worker->dbcount = 0; + worker->dbids_dsa = dbids_dsa; + worker->dbids_dp = dbids_dp; + + /* Get the handle. This is the one which can be passed to worker processes */ + dbids_dsa_handle = dsa_get_handle(dbids_dsa); + + ereport(DEBUG1, + (errmsg("allocated dsa for slot-sync worker for dbcount: %d", + alloc_db_count))); + + MemoryContextSwitchTo(oldcontext); + + return dbids_dsa_handle; +} + +/* + * Slot-sync worker launch or reuse + * + * Start new slot-sync background worker from the pool of available workers + * going by max_slotsync_workers count. If the worker pool is exhausted, + * reuse the existing worker with minimum number of dbs. The idea is to + * always distribute the dbs equally among launched workers. + * If initially allocated dbids array is exhausted for the selected worker, + * reallocate the dbids array with increased size and copy the existing + * dbids to it and assign the new one as well. + * + * Returns true on success, false on failure. + */ +static bool +slotsync_worker_launch_or_reuse(Oid dbid) +{ + BackgroundWorker bgw; + BackgroundWorkerHandle *bgw_handle; + uint16 generation; + SlotSyncWorker *worker = NULL; + uint32 mindbcnt = 0; + uint32 alloc_count = 0; + uint32 copied_dbcnt = 0; + Oid *copied_dbids = NULL; + int worker_slot = -1; + dsa_handle handle; + Oid *dbids; + bool attach; + + Assert(OidIsValid(dbid)); + + /* + * We need to do the modification of the shared memory under lock so that + * we have consistent view. + */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + + /* Find unused worker slot. 
*/ + for (int i = 0; i < max_slotsync_workers; i++) + { + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i]; + + if (!w->hdr.in_use) + { + worker = w; + worker_slot = i; + break; + } + } + + /* + * If all the workers are currently in use. Find the one with minimum + * number of dbs and use that. + */ + if (!worker) + { + for (int i = 0; i < max_slotsync_workers; i++) + { + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i]; + + if (i == 0) + { + mindbcnt = w->dbcount; + worker = w; + worker_slot = i; + } + else if (w->dbcount < mindbcnt) + { + mindbcnt = w->dbcount; + worker = w; + worker_slot = i; + } + } + } + + /* + * If worker is being reused, and there is vacancy in dbids array, just + * update dbids array and dbcount and we are done. But if dbids array is + * exhausted, reallocate dbids using dsa and copy the old dbids and assign + * the new one as well. + */ + if (worker->hdr.in_use) + { + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + if (worker->dbcount < DB_PER_WORKER_ALLOC_INIT) + { + dbids[worker->dbcount++] = dbid; + } + else + { + MemoryContext oldcontext; + + /* Be sure any memory allocated by DSA routines is persistent. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + /* Remember the old dbids before we reallocate dsa. 
*/ + copied_dbcnt = worker->dbcount; + copied_dbids = (Oid *) palloc0(worker->dbcount * sizeof(Oid)); + memcpy(copied_dbids, dbids, worker->dbcount * sizeof(Oid)); + + alloc_count = copied_dbcnt + DB_PER_WORKER_ALLOC_EXTRA; + + /* Free the existing dbids and allocate new with increased size */ + if (DsaPointerIsValid(worker->dbids_dp)) + dsa_free(worker->dbids_dsa, worker->dbids_dp); + + worker->dbids_dp = dsa_allocate0(worker->dbids_dsa, + alloc_count * sizeof(Oid)); + + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + /* Copy the existing dbids */ + worker->dbcount = copied_dbcnt; + memcpy(dbids, copied_dbids, copied_dbcnt * sizeof(Oid)); + + /* Assign new dbid */ + dbids[worker->dbcount++] = dbid; + + MemoryContextSwitchTo(oldcontext); + } + + LWLockRelease(SlotSyncWorkerLock); + + ereport(LOG, + (errmsg("Added database %d to replication slot-sync " + "worker %d; dbcount now: %d", + dbid, worker_slot, worker->dbcount))); + return true; + } + + /* Prepare the new worker. */ + worker->hdr.launch_time = GetCurrentTimestamp(); + worker->hdr.in_use = true; + + /* + * 'proc' and 'slot' will be assigned in ReplSlotSyncWorkerMain when we + * attach this worker to a particular worker-pool slot + */ + worker->hdr.proc = NULL; + worker->slot = -1; + + /* TODO: do we really need 'generation', analyse more here */ + worker->hdr.generation++; + + /* Initial DSA setup for dbids array to hold DB_PER_WORKER_ALLOC_INIT dbs */ + handle = slotsync_dsa_setup(worker, DB_PER_WORKER_ALLOC_INIT); + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + dbids[worker->dbcount++] = dbid; + + /* Before releasing lock, remember generation for future identification. */ + generation = worker->hdr.generation; + + LWLockRelease(SlotSyncWorkerLock); + + /* Register the new dynamic worker. 
*/ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_ConsistentState; + snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); + + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncWorkerMain"); + + Assert(worker_slot >= 0); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "replication slot-sync worker %d", worker_slot); + + snprintf(bgw.bgw_type, BGW_MAXLEN, "slot-sync worker"); + + bgw.bgw_restart_time = BGW_NEVER_RESTART; + bgw.bgw_notify_pid = MyProcPid; + bgw.bgw_main_arg = Int32GetDatum(worker_slot); + + memcpy(bgw.bgw_extra, &handle, sizeof(dsa_handle)); + + if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) + { + /* Failed to start worker, so clean up the worker slot. */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + Assert(generation == worker->hdr.generation); + slotsync_worker_cleanup(worker); + LWLockRelease(SlotSyncWorkerLock); + + ereport(WARNING, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of background worker slots"), + errhint("You might need to increase %s.", "max_worker_processes"))); + return false; + } + + /* Now wait until it attaches. */ + attach = WaitForReplicationWorkerAttach((LogicalWorkerHeader *) worker, + generation, + bgw_handle, + SlotSyncWorkerLock); + if (!attach) + ereport(WARNING, + (errmsg("Replication slot-sync worker failed to attach to " + "worker-pool slot %d", worker_slot))); + + /* Attach is done, now safe to log that the worker is managing dbid */ + if (attach) + ereport(LOG, + (errmsg("Added database %d to replication slot-sync " + "worker %d; dbcount now: %d", + dbid, worker_slot, worker->dbcount))); + return attach; +} + +/* + * Internal function to stop the slot-sync worker and wait until it detaches + * from the slot-sync worker-pool slot. 
+ */ +static void +slotsync_worker_stop_internal(SlotSyncWorker *worker) +{ + int slot = worker->slot; + + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + ereport(LOG, + (errmsg("Stopping replication slot-sync worker %d", + slot))); + logicalrep_worker_stop_internal((LogicalWorkerHeader *) worker, + SIGINT, + SlotSyncWorkerLock); + LWLockRelease(SlotSyncWorkerLock); + + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + slotsync_worker_cleanup(worker); + LWLockRelease(SlotSyncWorkerLock); +} + +/* + * Stop all the slot-sync workers in use. + */ +static void +slotsync_workers_stop() +{ + for (int widx = 0; widx < max_slotsync_workers; widx++) + { + SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx]; + + if (worker && worker->hdr.in_use) + slotsync_worker_stop_internal(worker); + } +} + + +/* + * Slot-sync workers remove obsolete DBs from db-list + * + * If the DBIds fetched from the primary are lesser than the ones being managed + * by slot-sync workers, remove extra dbs from worker's db-list. This may happen + * if some slots are removed on primary but 'synchronize_slot_names' has not + * been changed yet. 
 */
static void
slotsync_remove_obsolete_dbs(List *remote_dbs)
{
	ListCell   *lc;
	Oid		   *dbids;

	LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);

	/* Traverse slot-sync-workers to validate the DBs */
	for (int widx = 0; widx < max_slotsync_workers; widx++)
	{
		SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];

		if (!worker->hdr.in_use)
			continue;

		/* The worker's dbid array lives in DSA; resolve it to a pointer. */
		dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp);

		/*
		 * No increment in the loop header: on deletion the next element
		 * shifts into the current index, so we only advance on a match.
		 */
		for (int dbidx = 0; dbidx < worker->dbcount;)
		{
			Oid			wdbid = dbids[dbidx];
			bool		found = false;

			/* Check if current DB is still present in remote-db-list */
			foreach(lc, remote_dbs)
			{
				WalRcvRepSlotDbData *slot_db_data = lfirst(lc);

				if (slot_db_data->database == wdbid)
				{
					found = true;
					break;
				}
			}

			/* If not found, then delete this db from worker's db-list */
			if (!found)
			{
				for (int i = dbidx; i < worker->dbcount; i++)
				{
					/* Shift the DBs and get rid of wdbid */
					if (i < (worker->dbcount - 1))
						dbids[i] = dbids[i + 1];
				}

				worker->dbcount--;

				ereport(LOG,
						(errmsg("Removed database %d from replication slot-sync "
								"worker %d; dbcount now: %d",
								wdbid, worker->slot, worker->dbcount)));
			}

			/* Else move to next db-position */
			else
			{
				dbidx++;
			}
		}
	}

	LWLockRelease(SlotSyncWorkerLock);

	/*
	 * If dbcount for any worker has become 0, shut it down.
	 *
	 * NOTE(review): worker->dbcount is read here without SlotSyncWorkerLock;
	 * presumably only the launcher mutates it after the block above — confirm
	 * no concurrent writer exists.
	 */
	for (int widx = 0; widx < max_slotsync_workers; widx++)
	{
		SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];

		if (worker->hdr.in_use && !worker->dbcount)
			slotsync_worker_stop_internal(worker);
	}
}

/*
 * Connect to the primary server for slot-sync purposes and return the
 * connection
+ */ +static WalReceiverConn * +slotsync_remote_connect() +{ + WalReceiverConn *wrconn = NULL; + char *err; + char *dbname; + + if (max_slotsync_workers == 0) + return NULL; + + if (strcmp(synchronize_slot_names, "") == 0) + return NULL; + + /* The primary_slot_name is not set */ + if (!WalRcv || WalRcv->slotname[0] == '\0') + { + ereport(WARNING, + errmsg("Skipping slots synchronization as primary_slot_name " + "is not set.")); + return NULL; + } + + /* The hot_standby_feedback must be ON for slot-sync to work */ + if (!hot_standby_feedback) + { + ereport(WARNING, + errmsg("Skipping slots synchronization as hot_standby_feedback " + "is off.")); + return NULL; + } + + /* The dbname must be specified in primary_conninfo for slot-sync to work */ + dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (dbname == NULL) + { + ereport(WARNING, + errmsg("Skipping slots synchronization as dbname is not " + "specified in primary_conninfo.")); + return NULL; + } + + wrconn = walrcv_connect(PrimaryConnInfo, false, false, + "Logical Replication Launcher", &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the primary server: %s", err))); + + return wrconn; +} + +/* + * Save current slot-sync configurations. + * + * This function is invoked prior to each config-reload on receiving SIGHUP. + */ +static void +SaveCurrentSlotSyncConfigs() +{ + PrimaryConnInfoPreReload = pstrdup(PrimaryConnInfo); + PrimarySlotNamePreReload = pstrdup(WalRcv->slotname); + SyncSlotNamesPreReload = pstrdup(synchronize_slot_names); +} + +/* + * Returns true if any of the slot-sync configurations changed. 
+ */ +static bool +SlotSyncConfigsChanged() +{ + if (strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0) + return true; + + if (strcmp(PrimarySlotNamePreReload, WalRcv->slotname) != 0) + return true; + + if (strcmp(SyncSlotNamesPreReload, synchronize_slot_names) != 0) + return true; + + /* + * If we have reached this stage, it means original value of + * hot_standby_feedback was 'true', so consider it changed if 'false' now. + */ + if (!hot_standby_feedback) + return true; + + return false; +} + +/* + * Launch slot-sync background workers. + * + * Connect to the primary, to get the list of DBIDs for slots configured + * in synchronize_slot_names. Then launch slot-sync workers (limited by + * max_slotsync_workers) where the DBs are distributed equally among + * those workers. + */ +static void +LaunchSlotSyncWorkers(long *wait_time, WalReceiverConn *wrconn) +{ + List *slots_dbs; + ListCell *lc; + MemoryContext tmpctx; + MemoryContext oldctx; + + Assert(wrconn); + + /* Use temporary context for the slot list and worker info. */ + tmpctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher slot-sync ctx", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(tmpctx); + + slots_dbs = walrcv_get_dbinfo_for_logical_slots(wrconn, + synchronize_slot_names); + + slotsync_remove_obsolete_dbs(slots_dbs); + + foreach(lc, slots_dbs) + { + WalRcvRepSlotDbData *slot_db_data = lfirst(lc); + SlotSyncWorker *w; + + Assert(OidIsValid(slot_db_data->database)); + + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + w = slotsync_worker_find(slot_db_data->database); + LWLockRelease(SlotSyncWorkerLock); + + if (w != NULL) + continue; /* worker is running already */ + + /* + * If we failed to launch this slotsync worker, return and try + * launching rest of the workers in next sync cycle. But change + * launcher's wait time to minimum of wal_retrieve_retry_interval and + * default wait time to try next sync-cycle sooner. 
+ */ + if (!slotsync_worker_launch_or_reuse(slot_db_data->database)) + { + *wait_time = Min(*wait_time, wal_retrieve_retry_interval); + break; + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(tmpctx); +} + +/* + * Launch logical replication apply workers for enabled subscriptions. + */ +static void +LaunchSubscriptionApplyWorker(long *wait_time) +{ + List *sublist; + ListCell *lc; + MemoryContext subctx; + MemoryContext oldctx; + + /* Use temporary context to avoid leaking memory across cycles. */ + subctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher sublist", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); + + /* Start any missing workers for enabled subscriptions. */ + sublist = get_subscription_list(); + foreach(lc, sublist) + { + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; + TimestampTz last_start; + TimestampTz now; + long elapsed; + + if (!sub->enabled) + continue; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); + + if (w != NULL) + continue; /* worker is running already */ + + /* + * If the worker is eligible to start now, launch it. Otherwise, + * adjust wait_time so that we'll wake up as soon as it can be + * started. + * + * Each subscription's apply worker can only be restarted once per + * wal_retrieve_retry_interval, so that errors do not cause us to + * repeatedly restart the worker as fast as possible. In cases where + * a restart is expected (e.g., subscription parameter changes), + * another process should remove the last-start entry for the + * subscription so that the worker can be restarted without waiting + * for wal_retrieve_retry_interval to elapse. 
+ */ + last_start = ApplyLauncherGetWorkerStartTime(sub->oid); + now = GetCurrentTimestamp(); + if (last_start == 0 || + (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) + { + ApplyLauncherSetWorkerStartTime(sub->oid, now); + logicalrep_worker_launch(WORKERTYPE_APPLY, + sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid, + DSM_HANDLE_INVALID); + } + else + { + *wait_time = Min(*wait_time, + wal_retrieve_retry_interval - elapsed); + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); +} + +/* + * Main loop for the launcher process. */ void -ApplyLauncherMain(Datum main_arg) +LauncherMain(Datum main_arg) { + WalReceiverConn *wrconn = NULL; + ereport(DEBUG1, (errmsg_internal("logical replication launcher started"))); @@ -1139,79 +1930,35 @@ ApplyLauncherMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(NULL, NULL, 0); + load_file("libpqwalreceiver", false); + + /* + * If it is Hot Standby, validate the configurations and make remote + * connection for slot-sync purpose + */ + if (RecoveryInProgress()) + wrconn = slotsync_remote_connect(NULL); + /* Enter main loop */ for (;;) { int rc; - List *sublist; - ListCell *lc; - MemoryContext subctx; - MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; CHECK_FOR_INTERRUPTS(); - /* Use temporary context to avoid leaking memory across cycles. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_SIZES); - oldctx = MemoryContextSwitchTo(subctx); - - /* Start any missing workers for enabled subscriptions. */ - sublist = get_subscription_list(); - foreach(lc, sublist) + /* + * If it is Hot standby, then try to launch slot-sync workers else + * launch apply workers. 
+ */ + if (RecoveryInProgress()) { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; - TimestampTz last_start; - TimestampTz now; - long elapsed; - - if (!sub->enabled) - continue; - - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); - - if (w != NULL) - continue; /* worker is running already */ - - /* - * If the worker is eligible to start now, launch it. Otherwise, - * adjust wait_time so that we'll wake up as soon as it can be - * started. - * - * Each subscription's apply worker can only be restarted once per - * wal_retrieve_retry_interval, so that errors do not cause us to - * repeatedly restart the worker as fast as possible. In cases - * where a restart is expected (e.g., subscription parameter - * changes), another process should remove the last-start entry - * for the subscription so that the worker can be restarted - * without waiting for wal_retrieve_retry_interval to elapse. - */ - last_start = ApplyLauncherGetWorkerStartTime(sub->oid); - now = GetCurrentTimestamp(); - if (last_start == 0 || - (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) - { - ApplyLauncherSetWorkerStartTime(sub->oid, now); - logicalrep_worker_launch(WORKERTYPE_APPLY, - sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid, - DSM_HANDLE_INVALID); - } - else - { - wait_time = Min(wait_time, - wal_retrieve_retry_interval - elapsed); - } + /* Launch only if we have succesfully made the connection */ + if (wrconn) + LaunchSlotSyncWorkers(&wait_time, wrconn); } - - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); + else + LaunchSubscriptionApplyWorker(&wait_time); /* Wait for more work. 
*/ rc = WaitLatch(MyLatch, @@ -1227,8 +1974,27 @@ ApplyLauncherMain(Datum main_arg) if (ConfigReloadPending) { + SaveCurrentSlotSyncConfigs(); + ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + + /* + * Stop the slot-sync workers if any of the related GUCs changed. + * These will be relaunched using the new values during next + * sync-cycle. Also revalidate the new configurations and + * reconnect. + */ + if (SlotSyncConfigsChanged()) + { + slotsync_workers_stop(); + + if (wrconn) + walrcv_disconnect(wrconn); + + if (RecoveryInProgress()) + wrconn = slotsync_remote_connect(); + } } } @@ -1260,7 +2026,8 @@ GetLeaderApplyWorkerPid(pid_t pid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid) + if (isParallelApplyWorker(w) && w->hdr.proc && + pid == w->hdr.proc->pid) { leader_pid = w->leader_pid; break; @@ -1298,13 +2065,13 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) memcpy(&worker, &LogicalRepCtx->workers[i], sizeof(LogicalRepWorker)); - if (!worker.proc || !IsBackendPid(worker.proc->pid)) + if (!worker.hdr.proc || !IsBackendPid(worker.hdr.proc->pid)) continue; if (OidIsValid(subid) && worker.subid != subid) continue; - worker_pid = worker.proc->pid; + worker_pid = worker.hdr.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); if (isTablesyncWorker(&worker)) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 41243d0187..2779e1fffb 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -522,6 +522,18 @@ CreateDecodingContext(XLogRecPtr start_lsn, errmsg("replication slot \"%s\" was not created in this database", NameStr(slot->data.name)))); + /* + * Do not allow consumption of a "synchronized" slot until the standby + * gets promoted. 
+ */ + if (RecoveryInProgress() && slot->data.synced) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use replication slot \"%s\" for logical decoding", + NameStr(slot->data.name)), + errdetail("This slot is being synced from primary."), + errhint("Specify another replication slot."))); + /* * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid * "cannot get changes" wording in this errmsg because that'd be diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index d48cd4c590..9e52ec421f 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'slotsync.c', 'snapbuild.c', 'tablesync.c', 'worker.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..227e5c2552 --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,1077 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * PostgreSQL worker for synchronizing slots to a standby from the + * primary + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + * This file contains the code for slot-sync workers on physical standby + * to fetch logical replication slot information from the primary server + * (PrimaryConnInfo), create the slots on the standby, and synchronize + * them periodically. Slot-sync workers only synchronize slots configured + * in 'synchronize_slot_names'. + * + * It also takes care of dropping the slots which were created by it and are + * currently not needed to be synchronized. + * + * It takes a nap of WORKER_DEFAULT_NAPTIME_MS before every next + * synchronization. 
If there is no activity observed on the primary for some + * time, the nap time is increased to WORKER_INACTIVITY_NAPTIME_MS, but if any + * activity is observed, the nap time reverts to the default value. + *--------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "commands/dbcommands.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "replication/logical.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/pg_lsn.h" +#include "utils/varlena.h" + +/* + * Structure to hold information fetched from the primary server about a logical + * replication slot. + */ +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + bool conflicting; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; + + /* RS_INVAL_NONE if valid, or the reason of invalidation */ + ReplicationSlotInvalidationCause invalidated; +} RemoteSlot; + +List *sync_slot_names_list = NIL; + +/* Worker's nap time in case of regular activity on primary */ +#define WORKER_DEFAULT_NAPTIME_MS 10L /* 10 ms */ + +/* Worker's nap time in case of no-activity on primary */ +#define WORKER_INACTIVITY_NAPTIME_MS 10000L /* 10 sec */ + +/* + * Inactivity Threshold in ms before increasing nap time of worker. + * + * If the lsn of slot being monitored did not change for this threshold time, + * then increase nap time of current worker from WORKER_DEFAULT_NAPTIME_MS to + * WORKER_INACTIVITY_NAPTIME_MS. + */ +#define WORKER_INACTIVITY_THRESHOLD_MS 1000L /* 1 sec */ + +/* + * Wait for remote slot to pass locally reserved position. 
+ */ +static bool +wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot) + +{ +#define WAIT_OUTPUT_COLUMN_COUNT 3 + StringInfoData cmd; + + ereport(LOG, + errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin" + " (%u) to pass local slot LSN (%X/%X) and and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT restart_lsn, confirmed_flush_lsn, catalog_xmin" + " FROM pg_catalog.pg_replication_slots" + " WHERE slot_name = %s", + quote_literal_cstr(remote_slot->name)); + + for (;;) + { + XLogRecPtr new_restart_lsn; + XLogRecPtr new_confirmed_lsn; + TransactionId new_catalog_xmin; + WalRcvExecResult *res; + TupleTableSlot *slot; + int rc; + bool isnull; + Oid slotRow[WAIT_OUTPUT_COLUMN_COUNT] = {LSNOID, LSNOID, XIDOID}; + + CHECK_FOR_INTERRUPTS(); + + /* Check if this standby is promoted while we are waiting */ + if (!RecoveryInProgress()) + { + /* + * The remote slot didn't pass the locally reserved position at + * the time of local promotion, so it's not safe to use. 
+ */ + ereport( + WARNING, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg( + "slot-sync wait for slot %s interrupted by promotion, " + "slot creation aborted", remote_slot->name))); + pfree(cmd.data); + return false; + } + + res = walrcv_exec(wrconn, cmd.data, WAIT_OUTPUT_COLUMN_COUNT, slotRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch slot info for slot \"%s\" from" + " primary: %s", remote_slot->name, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + ereport(WARNING, + (errmsg("slot \"%s\" disappeared from the primary, aborting" + " slot creation", remote_slot->name))); + pfree(cmd.data); + walrcv_clear_result(res); + return false; + } + + /* + * It is possible to get null values for lsns and xmin if slot is + * invalidated on primary, so handle accordingly. + */ + new_restart_lsn = DatumGetLSN(slot_getattr(slot, 1, &isnull)); + if (isnull) + { + ereport(WARNING, + (errmsg("slot \"%s\" invalidated on primary, aborting" + " slot creation", remote_slot->name))); + pfree(cmd.data); + ExecClearTuple(slot); + walrcv_clear_result(res); + return false; + } + + /* + * Once we got valid restart_lsn, then confirmed_lsn and catalog_xmin + * are expected to be valid/non-null, so assert if found null. 
+ */ + new_confirmed_lsn = DatumGetLSN(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + new_catalog_xmin = DatumGetTransactionId(slot_getattr(slot, + 3, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + walrcv_clear_result(res); + + if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn && + TransactionIdFollowsOrEquals(new_catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + /* Update new values in remote_slot */ + remote_slot->restart_lsn = new_restart_lsn; + remote_slot->confirmed_lsn = new_confirmed_lsn; + remote_slot->catalog_xmin = new_catalog_xmin; + + ereport(LOG, + errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)" + "and catalog xmin (%u) has now passed local slot LSN" + " (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(new_restart_lsn), + new_catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + pfree(cmd.data); + return true; + } + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + WORKER_DEFAULT_NAPTIME_MS, + WAIT_EVENT_REPL_SLOTSYNC_PRIMARY_CATCHP); + + ResetLatch(MyLatch); + + /* Emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } +} + +/* + * Update local slot metadata as per remote_slot's positions + */ +static void +local_slot_update(RemoteSlot *remote_slot) +{ + LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); + LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, + remote_slot->catalog_xmin); + LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, + remote_slot->restart_lsn); + MyReplicationSlot->data.invalidated = remote_slot->invalidated; + ReplicationSlotMarkDirty(); +} + +/* + * Get list of local logical slot names which are synchronized from + * primary and belongs to one of the DBs passed in. 
 */
static List *
get_local_synced_slot_names(Oid *dbids)
{
	List	   *localSyncedSlots = NIL;

	Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (int i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		/* Check if it is logical synchronized slot */
		if (s->in_use && SlotIsLogical(s) && s->data.synced)
		{
			for (int j = 0; j < MySlotSyncWorker->dbcount; j++)
			{
				/*
				 * Add it to output list if this belongs to one of the
				 * worker's dbs.
				 */
				if (s->data.database == dbids[j])
				{
					/*
					 * NOTE(review): the list stores pointers into the shared
					 * ReplicationSlotCtl array; they are used by callers
					 * after ReplicationSlotControlLock is released — confirm
					 * the entries cannot be dropped/reused concurrently.
					 */
					localSyncedSlots = lappend(localSyncedSlots, s);
					break;
				}
			}
		}
	}

	LWLockRelease(ReplicationSlotControlLock);

	return localSyncedSlots;
}

/*
 * Helper function to check if local_slot is present in remote_slots list.
 *
 * It also checks if logical slot is locally invalidated i.e. invalidated on
 * standby but valid on primary. If found so, it sets locally_invalidated to
 * true.
 */
static bool
slot_exists_locally(List *remote_slots, ReplicationSlot *local_slot,
					bool *locally_invalidated)
{
	ListCell   *cell;

	foreach(cell, remote_slots)
	{
		RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell);

		if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
		{
			/*
			 * if remote slot is marked as non-conflicting (i.e. not
			 * invalidated) but local slot is marked as invalidated, then set
			 * the bool.
			 */
			if (!remote_slot->conflicting &&
				local_slot->data.invalidated != RS_INVAL_NONE)
				*locally_invalidated = true;

			return true;
		}
	}

	return false;
}

/*
 * Use slot_name in query.
 *
 * Check the dbid of the slot and if the dbid is one of the dbids managed by
 * current worker, then use this slot-name in query to get the data from
 * primary. If the slot is not created yet on standby (first time it is being
 * queried), then too, use this slot in query.
+ */ +static bool +use_slot_in_query(char *slot_name, Oid *dbids) +{ + bool slot_found = false; + bool relevant_db = false; + ReplicationSlot *slot; + + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + /* Search for the local slot with the same name as slot_name */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && SlotIsLogical(s) && + (strcmp(NameStr(s->data.name), slot_name) == 0)) + { + slot_found = true; + slot = s; + break; + } + } + + /* Check if slot belongs to one of the input dbids */ + if (slot_found) + { + for (int j = 0; j < MySlotSyncWorker->dbcount; j++) + { + if (slot->data.database == dbids[j]) + { + relevant_db = true; + break; + } + } + } + + LWLockRelease(ReplicationSlotControlLock); + + /* + * Return TRUE if either slot is not yet created on standby or if it + * belongs to one of the dbs passed in dbids. + */ + if (!slot_found || relevant_db) + return true; + + return false; +} + + +/* + * Compute nap time for MySlotSyncWorker. + * + * The slot-sync worker takes a nap before it again checks for slots on primary. + * The time for each nap is computed here. + * + * The first slot managed by each worker is chosen for monitoring purpose. + * If the lsn of that slot changes during each sync-check time, then the + * nap time is kept at the regular value of WORKER_DEFAULT_NAPTIME_MS. + * When no lsn change is observed within the threshold period + * WORKER_INACTIVITY_THRESHOLD_MS, then the nap time is increased + * to WORKER_INACTIVITY_NAPTIME_MS. + * This nap time is brought back to WORKER_DEFAULT_NAPTIME_MS as soon as + * another lsn change is observed. 
+ */ +static void +compute_naptime(RemoteSlot *remote_slot, long *naptime) +{ + TimestampTz now = GetCurrentTimestamp(); + + Assert(*naptime == WORKER_DEFAULT_NAPTIME_MS || *naptime == + WORKER_INACTIVITY_NAPTIME_MS); + + if (MySlotSyncWorker->monitoring_info.confirmed_lsn != + remote_slot->confirmed_lsn) + { + MySlotSyncWorker->monitoring_info.last_update_time = now; + MySlotSyncWorker->monitoring_info.confirmed_lsn = remote_slot->confirmed_lsn; + + /* Something changed; reset naptime to default. */ + *naptime = WORKER_DEFAULT_NAPTIME_MS; + } + else + { + if (*naptime == WORKER_DEFAULT_NAPTIME_MS) + { + /* + * If the inactivity time reaches the threshold, increase nap + * time. + */ + if (TimestampDifferenceExceeds(MySlotSyncWorker->monitoring_info.last_update_time, + now, WORKER_INACTIVITY_THRESHOLD_MS)) + *naptime = WORKER_INACTIVITY_NAPTIME_MS; + } + } +} + +/* + * This gets invalidation cause of remote slot. + */ +static ReplicationSlotInvalidationCause +get_remote_invalidation_cause(WalReceiverConn *wrconn, char *slot_name) +{ + WalRcvExecResult *res; + Oid slotRow[1] = {INT2OID}; + StringInfoData cmd; + bool isnull; + TupleTableSlot *slot; + ReplicationSlotInvalidationCause cause; + MemoryContext oldctx = CurrentMemoryContext; + + /* Syscache access needs a transaction env. 
*/ + StartTransactionCommand(); + + /* Make things live outside TX context */ + MemoryContextSwitchTo(oldctx); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT pg_get_slot_invalidation_cause(%s)", + quote_literal_cstr(slot_name)); + res = walrcv_exec(wrconn, cmd.data, 1, slotRow); + pfree(cmd.data); + + CommitTransactionCommand(); + + /* Switch to oldctx we saved */ + MemoryContextSwitchTo(oldctx); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch invalidation cause for slot \"%s\" from" + " primary: %s", slot_name, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + ereport(ERROR, + (errmsg("slot \"%s\" disappeared from the primary", + slot_name))); + + cause = DatumGetInt16(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + walrcv_clear_result(res); + + return cause; +} + +/* + * Drop obsolete slots + * + * Drop the slots that no longer need to be synced i.e. these either + * do not exist on primary or are no longer part of synchronize_slot_names. + * + * Also drop the slots that are valid on primary that got invalidated + * on standby due to conflict (say required rows removed on primary). + * The assumption is, that these will get recreated in next sync-cycle and + * it is okay to drop and recreate such slots as long as these are not + * consumable on standby (which is the case currently). + */ +static void +drop_obsolete_slots(Oid *dbids, List *remote_slot_list) +{ + List *local_slot_list = NIL; + ListCell *lc_slot; + + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + /* + * Get the list of local slots for dbids managed by this worker, so that + * those not on remote could be dropped. 
+ */ + local_slot_list = get_local_synced_slot_names(dbids); + + foreach(lc_slot, local_slot_list) + { + ReplicationSlot *local_slot = (ReplicationSlot *) lfirst(lc_slot); + bool local_exists = false; + bool locally_invalidated = false; + + local_exists = slot_exists_locally(remote_slot_list, local_slot, + &locally_invalidated); + + /* + * Drop the local slot either if it is not in the remote slots list or + * is invalidated while remote slot is still valid. + */ + if (!local_exists || locally_invalidated) + { + ReplicationSlotDrop(NameStr(local_slot->data.name), true, false); + + elog(LOG, "Dropped replication slot \"%s\" ", + NameStr(local_slot->data.name)); + } + } +} + +/* + * Construct Slot Query + * + * It constructs the query using dbids array and sync_slot_names_list + * in order to get slots information from the primary. + */ +static void +construct_slot_query(StringInfo s, Oid *dbids) +{ + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + appendStringInfo(s, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase, conflicting, " + " database FROM pg_catalog.pg_replication_slots" + " WHERE database IN "); + + appendStringInfoChar(s, '('); + for (int i = 0; i < MySlotSyncWorker->dbcount; i++) + { + char *dbname; + + if (i != 0) + appendStringInfoChar(s, ','); + + dbname = get_database_name(dbids[i]); + appendStringInfo(s, "%s", + quote_literal_cstr(dbname)); + pfree(dbname); + } + appendStringInfoChar(s, ')'); + + if (strcmp(synchronize_slot_names, "") != 0 && + strcmp(synchronize_slot_names, "*") != 0) + { + ListCell *lc; + bool first_slot = true; + + foreach(lc, sync_slot_names_list) + { + char *slot_name = lfirst(lc); + + if (!use_slot_in_query(slot_name, dbids)) + continue; + + if (first_slot) + appendStringInfoString(s, " AND slot_name IN ("); + else + appendStringInfoChar(s, ','); + + appendStringInfo(s, "%s", + quote_literal_cstr(slot_name)); + first_slot = false; + } + + if (!first_slot) + 
appendStringInfoChar(s, ')'); + } + +} + +/* + * Synchronize single slot to given position. + * + * This creates a new slot if there is no existing one and updates the + * metadata of the slot as per the data received from the primary. + */ +static void +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot) +{ + bool found = false; + + /* Good to check again if standby is promoted */ + if (!RecoveryInProgress()) + { + ereport( + WARNING, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg( + "slot-sync for slot %s interrupted by promotion, " + "sync not possible", remote_slot->name))); + return; + } + + /* + * Make sure that concerned WAL is received before syncing slot to target + * lsn received from the primary. + */ + if (remote_slot->confirmed_lsn > WalRcv->latestWalEnd) + { + ereport(WARNING, + errmsg("skipping sync of slot \"%s\" as the received slot-sync " + "lsn %X/%X is ahead of the standby position %X/%X", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + LSN_FORMAT_ARGS(WalRcv->latestWalEnd))); + return; + } + + /* Search for the named slot */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots && !found; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use) + found = (strcmp(NameStr(s->data.name), remote_slot->name) == 0); + } + LWLockRelease(ReplicationSlotControlLock); + + StartTransactionCommand(); + + /* Already existing slot, acquire */ + if (found) + { + ReplicationSlotAcquire(remote_slot->name, true); + + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn) + { + ereport(WARNING, + errmsg("not synchronizing slot %s; synchronization would move" + " it backward", remote_slot->name)); + + ReplicationSlotRelease(); + CommitTransactionCommand(); + return; + } + + /* Update lsns of slot to remote slot's current position */ + local_slot_update(remote_slot); + ReplicationSlotSave(); + } + /* Otherwise 
create the slot first. */ + else + { + TransactionId xmin_horizon = InvalidTransactionId; + ReplicationSlot *slot; + + ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL, + remote_slot->two_phase); + slot = MyReplicationSlot; + + SpinLockAcquire(&slot->mutex); + slot->data.database = get_database_oid(remote_slot->database, false); + slot->data.synced = true; + namestrcpy(&slot->data.plugin, remote_slot->plugin); + SpinLockRelease(&slot->mutex); + + ReplicationSlotReserveWal(); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + /* + * If the local restart_lsn and/or local catalog_xmin is ahead of + * those on the remote then we cannot create the local slot in sync + * with primary because that would mean moving local slot backwards + * and we might not have WALs retained for old lsns. In this case we + * will wait for primary's restart_lsn and catalog_xmin to catch up + * with the local one before attempting the sync. + */ + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn || + TransactionIdPrecedes(remote_slot->catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + if (!wait_for_primary_slot_catchup(wrconn, remote_slot)) + { + /* + * The remote slot didn't catch up to locally reserved + * position + */ + ReplicationSlotRelease(); + CommitTransactionCommand(); + return; + } + + } + + /* Update lsns of slot to remote slot's current position */ + local_slot_update(remote_slot); + ReplicationSlotPersist(); + } + + ReplicationSlotRelease(); + CommitTransactionCommand(); +} + +/* + * Synchronize slots. + * + * It looks into dbids array maintained in dsa and gets the logical slots info + * from the primary for the slots configured in synchronize_slot_names and + * belonging to concerned dbids. 
It then updates the slots locally as per the
+ * data received from the primary. It creates the slots if not present on the
+ * standby.
+ *
+ * It returns nap time for the next sync-cycle.
+ */
+static long
+synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn)
+{
+#define SLOTSYNC_COLUMN_COUNT 8
+	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
+	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID};
+
+	WalRcvExecResult *res;
+	TupleTableSlot *slot;
+	StringInfoData s;
+	List	   *remote_slot_list = NIL;
+	MemoryContext oldctx = CurrentMemoryContext;
+	long		naptime = WORKER_DEFAULT_NAPTIME_MS;
+	Oid		   *dbids;
+	ListCell   *cell;
+	int			count = 0;
+
+	if (strcmp(synchronize_slot_names, "") == 0 || !WalRcv)
+		return naptime;
+
+	/* The primary_slot_name is not set yet */
+	if (WalRcv->slotname[0] == '\0')
+		return naptime;
+
+	/* WALs not received yet */
+	if (XLogRecPtrIsInvalid(WalRcv->latestWalEnd))
+		return naptime;
+
+	/*
+	 * No more writes to dbcount and dbids by launcher after this until we
+	 * release this lock.
+	 */
+	LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+
+	/*
+	 * There is a small window between CHECK_FOR_INTERRUPTS done last and
+	 * above lock acquiring, so there is a chance that synchronize_slot_names
+	 * has changed making dbs assigned to this worker as invalid. In that
+	 * case, launcher will make dbcount=0 and will send SIGINT to this worker.
+	 * So check dbcount before proceeding.
+	 */
+	if (!MySlotSyncWorker->dbcount)
+	{
+		LWLockRelease(SlotSyncWorkerLock);
+		return naptime;	/* pending interrupts are handled in the main loop */
+	}
+
+	/* Get dbids from dsa */
+	dbids = (Oid *) dsa_get_address(dsa, MySlotSyncWorker->dbids_dp);
+
+	/* The syscache access needs a transaction env.
*/ + StartTransactionCommand(); + + /* Make things live outside TX context */ + MemoryContextSwitchTo(oldctx); + + /* Construct query to get slots info from the primary */ + initStringInfo(&s); + construct_slot_query(&s, dbids); + + elog(DEBUG2, "slot-sync worker%d's query:%s \n", MySlotSyncWorker->slot, + s.data); + + /* Execute the query */ + res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow); + pfree(s.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch slot info from the primary: %s", + res->err))); + + CommitTransactionCommand(); + + /* Switch to oldctx we saved */ + MemoryContextSwitchTo(oldctx); + + /* Construct the remote_slot tuple and synchronize each slot locally */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot)); + + remote_slot->name = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + remote_slot->plugin = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + /* + * It is possible to get null values for lsns and xmin if slot is + * invalidated on primary, so handle accordingly. 
+ */ + remote_slot->confirmed_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull)); + if (isnull) + remote_slot->confirmed_lsn = InvalidXLogRecPtr; + + remote_slot->restart_lsn = DatumGetLSN(slot_getattr(slot, 4, &isnull)); + if (isnull) + remote_slot->restart_lsn = InvalidXLogRecPtr; + + remote_slot->catalog_xmin = DatumGetTransactionId(slot_getattr(slot, + 5, &isnull)); + if (isnull) + remote_slot->catalog_xmin = InvalidTransactionId; + + remote_slot->two_phase = DatumGetBool(slot_getattr(slot, 6, &isnull)); + Assert(!isnull); + + remote_slot->conflicting = DatumGetBool(slot_getattr(slot, 7, &isnull)); + Assert(!isnull); + + remote_slot->database = TextDatumGetCString(slot_getattr(slot, + 8, &isnull)); + Assert(!isnull); + + if (remote_slot->conflicting) + remote_slot->invalidated = get_remote_invalidation_cause(wrconn, + remote_slot->name); + else + { + remote_slot->invalidated = RS_INVAL_NONE; + count++; + } + + /* Create list of remote slots to be used by drop_obsolete_slots */ + remote_slot_list = lappend(remote_slot_list, remote_slot); + + synchronize_one_slot(wrconn, remote_slot); + + /* + * Update naptime as required depending on slot activity. Check only + * for the first slot, if one slot has activity then all slots will. + */ + if (count == 1) + compute_naptime(remote_slot, &naptime); + + ExecClearTuple(slot); + } + + /* Drop local slots that no longer need to be synced. */ + drop_obsolete_slots(dbids, remote_slot_list); + + LWLockRelease(SlotSyncWorkerLock); + + /* We are done, free remote_slot_list elements */ + foreach(cell, remote_slot_list) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell); + + pfree(remote_slot); + } + + walrcv_clear_result(res); + + return naptime; +} + +/* + * Initialize the list from raw synchronize_slot_names and cache it, in order + * to avoid parsing it repeatedly. Done at slot-sync worker startup and after + * each SIGHUP. 
+ */ +static void +init_slot_names_list() +{ + char *rawname; + + list_free(sync_slot_names_list); + sync_slot_names_list = NIL; + + if (strcmp(synchronize_slot_names, "") != 0 && + strcmp(synchronize_slot_names, "*") != 0) + { + rawname = pstrdup(synchronize_slot_names); + SplitIdentifierString(rawname, ',', &sync_slot_names_list); + } +} + +/* + * Connect to remote (primary) server. + * + * This uses GUC PrimaryConnInfo in order to connect to the primary. For slot-sync + * to work, PrimaryConnInfo is required to specify dbname as well. + */ +static WalReceiverConn * +remote_connect() +{ + WalReceiverConn *wrconn = NULL; + char *err; + + wrconn = walrcv_connect(PrimaryConnInfo, true, false, "slot-sync", &err); + if (wrconn == NULL) + ereport(ERROR, + (errmsg("could not connect to the primary server: %s", err))); + return wrconn; +} + +/* + * Reconnect to remote (primary) server if PrimaryConnInfo has changed. + */ +static WalReceiverConn * +reconnect_if_needed(WalReceiverConn *wrconn_prev, char *conninfo_prev) +{ + WalReceiverConn *wrconn = NULL; + + /* If no change in PrimaryConnInfo, return the previous connection itself */ + if (strcmp(conninfo_prev, PrimaryConnInfo) == 0) + return wrconn_prev; + + walrcv_disconnect(wrconn_prev); + wrconn = remote_connect(); + return wrconn; +} + +/* + * Interrupt handler for main loop of slot-sync worker. + */ +static bool +ProcessSlotSyncInterrupts(WalReceiverConn *wrconn, char **conninfo_prev) +{ + bool reload_done = false; + + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + elog(LOG, "Replication slot-sync worker %d is shutting" + " down on receiving SIGINT", MySlotSyncWorker->slot); + + /* + * TODO: we need to take care of dropping the slots belonging to dbids + * of this worker before exiting, for the case when all the dbids of + * this worker are obsoleted/dropped on primary and that is the reason + * for this worker's exit. 
+ */ + walrcv_disconnect(wrconn); + proc_exit(0); + } + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + + /* Save the PrimaryConnInfo before reloading */ + *conninfo_prev = pstrdup(PrimaryConnInfo); + + ProcessConfigFile(PGC_SIGHUP); + init_slot_names_list(); + reload_done = true; + } + + return reload_done; +} + +/* + * The main loop of our worker process. + */ +void +ReplSlotSyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + dsa_handle handle; + dsa_area *dsa; + WalReceiverConn *wrconn = NULL; + char *dbname; + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* + * Attach to the dynamic shared memory segment for the slot-sync worker + * and find its table of contents. + */ + memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsa_handle)); + dsa = dsa_attach(handle); + if (!dsa) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not map dynamic shared memory " + "segment for slot-sync worker"))); + + /* Primary initialization is complete. Now attach to our slot. */ + slotsync_worker_attach(worker_slot); + + elog(LOG, "Replication slot-sync worker %d started", worker_slot); + + before_shmem_exit(slotsync_worker_detach, PointerGetDatum(dsa)); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + /* + * Get the user provided dbname from the connection string, if dbname not + * provided, skip sync. + */ + dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (dbname == NULL) + proc_exit(0); + + /* + * Connect to the database specified by user in PrimaryConnInfo. We need a + * database connection for walrcv_exec to work. Please see comments atop + * libpqrcv_exec. 
+ */ + BackgroundWorkerInitializeConnection(dbname, + NULL, + 0); + init_slot_names_list(); + + /* Connect to primary node */ + wrconn = remote_connect(); + + /* Main wait loop. */ + for (;;) + { + int rc; + long naptime; + bool config_reloaded = false; + char *conninfo_prev; + + config_reloaded = ProcessSlotSyncInterrupts(wrconn, &conninfo_prev); + + /* Reconnect if GUC primary_conninfo got changed */ + if (config_reloaded) + wrconn = reconnect_if_needed(wrconn, conninfo_prev); + + if (!RecoveryInProgress()) + proc_exit(0); + + naptime = synchronize_slots(dsa, wrconn); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + naptime, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + ResetLatch(MyLatch); + + /* Emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + { + walrcv_disconnect(wrconn); + proc_exit(1); + } + } + + /* + * The slot-sync worker can not get here because it will only stop when it + * receives a SIGINT from the logical replication launcher, or when there + * is an error. 
+ */ + Assert(false); +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e2cee92cf2..7ca0b5464b 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/subscriptioncmds.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "parser/parse_relation.h" @@ -246,7 +247,7 @@ wait_for_worker_state_change(char expected_state) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid, false); - if (worker && worker->proc) + if (worker && worker->hdr.proc) logicalrep_worker_wakeup_ptr(worker); LWLockRelease(LogicalRepWorkerLock); if (!worker) @@ -535,7 +536,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (rstate->state == SUBREL_STATE_SYNCWAIT) { /* Signal the sync worker, as it may be waiting for us. 
*/ - if (syncworker->proc) + if (syncworker->hdr.proc) logicalrep_worker_wakeup_ptr(syncworker); /* Now safe to release the LWLock */ diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 0c874e33cf..2b00bf845c 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -76,11 +76,12 @@ Node *replication_parse_result; %token K_EXPORT_SNAPSHOT %token K_NOEXPORT_SNAPSHOT %token K_USE_SNAPSHOT +%token K_LIST_DBID_FOR_LOGICAL_SLOTS %type command %type base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system - read_replication_slot timeline_history show + read_replication_slot timeline_history show list_dbid_for_logical_slots %type generic_option_list %type generic_option %type opt_timeline @@ -91,6 +92,7 @@ Node *replication_parse_result; %type opt_temporary %type create_slot_options create_slot_legacy_opt_list %type create_slot_legacy_opt +%type slot_name_list slot_name_list_opt %% @@ -114,6 +116,7 @@ command: | read_replication_slot | timeline_history | show + | list_dbid_for_logical_slots ; /* @@ -126,6 +129,33 @@ identify_system: } ; +slot_name_list: + IDENT + { + $$ = list_make1($1); + } + | slot_name_list ',' IDENT + { + $$ = lappend($1, $3); + } + +slot_name_list_opt: + slot_name_list { $$ = $1; } + | /* EMPTY */ { $$ = NIL; } + ; + +/* + * LIST_DBID_FOR_LOGICAL_SLOTS + */ +list_dbid_for_logical_slots: + K_LIST_DBID_FOR_LOGICAL_SLOTS slot_name_list_opt + { + ListDBForLogicalSlotsCmd *cmd = makeNode(ListDBForLogicalSlotsCmd); + cmd->slot_names = $2; + $$ = (Node *) cmd; + } + ; + /* * READ_REPLICATION_SLOT %s */ diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 1cc7fb858c..d4ecce6a47 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -128,6 +128,7 @@ DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } TIMELINE_HISTORY { return 
K_TIMELINE_HISTORY; } PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } +LIST_DBID_FOR_LOGICAL_SLOTS { return K_LIST_DBID_FOR_LOGICAL_SLOTS; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } TEMPORARY { return K_TEMPORARY; } @@ -304,6 +305,7 @@ replication_scanner_is_replication_command(void) case K_READ_REPLICATION_SLOT: case K_TIMELINE_HISTORY: case K_SHOW: + case K_LIST_DBID_FOR_LOGICAL_SLOTS: /* Yes; push back the first token so we can parse later. */ repl_pushed_back_token = first_token; return true; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 1bb2eead7b..e0103c062e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -92,7 +92,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 3 /* version for new files */ +#define SLOT_VERSION 4 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -317,6 +317,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.persistency = persistency; slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; + slot->data.synced = false; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -654,12 +655,25 @@ restart: * Permanently drop replication slot identified by the passed in name. */ void -ReplicationSlotDrop(const char *name, bool nowait) +ReplicationSlotDrop(const char *name, bool nowait, bool user_cmd) { Assert(MyReplicationSlot == NULL); ReplicationSlotAcquire(name, nowait); + /* + * Do not allow users to drop the slots which are currently being synced + * from the primary to standby. 
+	 */
+	if (user_cmd && RecoveryInProgress() && MyReplicationSlot->data.synced)
+	{
+		ReplicationSlotRelease();
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot drop replication slot"),
+				 errdetail("This slot is being synced from primary.")));
+	}
+
 	ReplicationSlotDropAcquired();
 }
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d22c187834..ea15920305 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -221,11 +221,50 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 
 	CheckSlotRequirements();
 
-	ReplicationSlotDrop(NameStr(*name), true);
+	ReplicationSlotDrop(NameStr(*name), true, true);
 
 	PG_RETURN_VOID();
 }
 
+/*
+ * SQL function for getting invalidation cause of a slot.
+ *
+ * Returns ReplicationSlotInvalidationCause enum value for valid slot_name;
+ * returns NULL if slot with given name is not found.
+ *
+ * It returns RS_INVAL_NONE if the given slot is not invalidated.
+ */
+Datum
+pg_get_slot_invalidation_cause(PG_FUNCTION_ARGS)
+{
+	Name		name = PG_GETARG_NAME(0);
+	int			slotno;
+	int			cause = -1;		/* -1 means "no such slot" */
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (slotno = 0; slotno < max_replication_slots; slotno++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[slotno];
+
+		/*
+		 * Skip slots not in use, whose name field may be stale.  Remember
+		 * the cause instead of returning from inside the loop: returning
+		 * here would leak ReplicationSlotControlLock.
+		 */
+		if (s->in_use &&
+			strcmp(NameStr(s->data.name), NameStr(*name)) == 0)
+		{
+			cause = s->data.invalidated;
+			break;
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	if (cause < 0)
+		PG_RETURN_NULL();
+
+	PG_RETURN_INT16((int16) cause);
+}
+
 /*
  * pg_get_replication_slots - SQL SRF showing all replication slots
  * that currently exist on the database cluster.
@@ -233,7 +262,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 15 +#define PG_GET_REPLICATION_SLOTS_COLS 16 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -413,6 +442,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(false); } + values[i++] = BoolGetDatum(slot_contents.data.synced); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c old mode 100644 new mode 100755 index d22c98edeb..695cadc731 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -473,6 +473,124 @@ IdentifySystem(void) end_tup_output(tstate); } +static int +pg_qsort_namecmp(const void *a, const void *b) +{ + return strncmp(NameStr(*(Name) a), NameStr(*(Name) b), NAMEDATALEN); +} + +/* + * Handle the LIST_DBID_FOR_LOGICAL_SLOTS command. + * + * Given the slot-names, this function returns the list of database-ids + * corresponding to those slots. The returned list has no duplicates. 
+ */ +static void +ListSlotDatabaseOIDs(ListDBForLogicalSlotsCmd *cmd) +{ + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + NameData *slot_names = NULL; + int numslot_names; + List *database_oids_list = NIL; + int slotno; + + numslot_names = list_length(cmd->slot_names); + if (numslot_names) + { + ListCell *lc; + int i = 0; + + slot_names = palloc(numslot_names * sizeof(NameData)); + foreach(lc, cmd->slot_names) + { + char *slot_name = lfirst(lc); + + ReplicationSlotValidateName(slot_name, ERROR); + namestrcpy(&slot_names[i++], slot_name); + } + + qsort(slot_names, numslot_names, sizeof(NameData), pg_qsort_namecmp); + } + + dest = CreateDestReceiver(DestRemoteSimple); + + /* Need a tuple descriptor representing a single column */ + tupdesc = CreateTemplateTupleDesc(1); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "database_oid", + INT8OID, -1, 0); + + /* Prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (slotno = 0; slotno < max_replication_slots; slotno++) + { + ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; + Oid dboid; + Datum values[1]; + bool nulls[1]; + + if (!slot->in_use) + continue; + + SpinLockAcquire(&slot->mutex); + + dboid = slot->data.database; + + SpinLockRelease(&slot->mutex); + + if (numslot_names) + { + /* + * If slot names were provided and the current slot name is not in + * the list, skip it. + */ + if (!bsearch((void *) &slot->data.name, (void *) slot_names, + numslot_names, sizeof(NameData), pg_qsort_namecmp)) + continue; + + /* + * If the current slot is a physical slot, error out. We are + * supposed to get only logical slots for sync-slot purpose. 
+ */ + if (SlotIsPhysical(slot)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("physical replication slot %s found in " + "synchronize_slot_names", + NameStr(slot->data.name)))); + } + + /* If synchronize_slot_names is '*', then skip physical slots */ + if (SlotIsPhysical(slot)) + continue; + + /* + * Check if the database OID is already in the list, and if so, skip + * this slot. + */ + if (list_member_oid(database_oids_list, dboid)) + continue; + + /* Add the database OID to the list */ + database_oids_list = lappend_oid(database_oids_list, dboid); + + values[0] = Int64GetDatum(dboid); + nulls[0] = (dboid == InvalidOid); + + /* Send it to dest */ + do_tup_output(tstate, values, nulls); + } + LWLockRelease(ReplicationSlotControlLock); + + /* Clean up the list */ + list_free(database_oids_list); + + end_tup_output(tstate); +} + /* Handle READ_REPLICATION_SLOT command */ static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd) @@ -1244,7 +1362,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - ReplicationSlotDrop(cmd->slotname, !cmd->wait); + ReplicationSlotDrop(cmd->slotname, !cmd->wait, false); } /* @@ -2027,6 +2145,13 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; + case T_ListDBForLogicalSlotsCmd: + cmdtag = "LIST_DBID_FOR_LOGICAL_SLOTS"; + set_ps_display(cmdtag); + ListSlotDatabaseOIDs((ListDBForLogicalSlotsCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 315a78cda9..fd9e73a49b 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -190,6 +190,8 @@ static const char *const BuiltinTrancheNames[] = { "LogicalRepLauncherDSA", /* LWTRANCHE_LAUNCHER_HASH: */ "LogicalRepLauncherHash", + /* 
LWTRANCHE_SLOTSYNC_DSA: */ + "SlotSyncWorkerDSA", }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index f72f2906ce..e62a3f1bc0 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -54,3 +54,4 @@ XactTruncationLock 44 WrapLimitsVacuumLock 46 NotifyQueueTailLock 47 WaitEventExtensionLock 48 +SlotSyncWorkerLock 49 diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index daf2d57d3d..037c57f116 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -53,6 +53,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." +REPL_SLOTSYNC_MAIN "Waiting in main loop of worker for synchronizing slots to a standby from primary." +REPL_SLOTSYNC_PRIMARY_CATCHP "Waiting for primary to catch-up in worker for synchronizing slots to a standby from primary." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 1b3914af7d..668714aab8 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -65,8 +65,11 @@ #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" +#include "replication/reorderbuffer.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/walreceiver.h" +#include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -3508,6 +3511,19 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_slotsync_workers", + PGC_POSTMASTER, + REPLICATION_STANDBY, + gettext_noop("Maximum number of slot synchronization workers " + "on a standby."), + NULL, + }, + &max_slotsync_workers, + 2, 0, MAX_SLOTSYNC_WORKER_LIMIT, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 4b0a556b0a..23563fe3e1 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -357,6 +357,8 @@ #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery #synchronize_slot_names = '' # replication slot names to synchronize from # primary to streaming replication standby server +#max_slotsync_workers = 2 # maximum number of slot synchronization workers + # on a standby # - Subscribers - diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index f0b7b9cbd8..ad8bf1cfbf 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11074,14 +11074,18 @@ proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'name', prosrc => 'pg_drop_replication_slot' }, +{ 
oid => '8484', descr => 'what caused the replication slot to become invalid', + proname => 'pg_get_slot_invalidation_cause', provolatile => 's', proisstrict => 't', + prorettype => 'int2', proargtypes => 'name', + prosrc => 'pg_get_slot_invalidation_cause' }, { oid => '3781', descr => 'information about replication slots currently in use', proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,synced_slot}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 214dc6c29e..75b4b2040d 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -17,6 +17,7 @@ #include "catalog/objectaddress.h" #include "parser/parse_node.h" +#include "replication/walreceiver.h" extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel); @@ -28,4 +29,7 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); extern char defGetStreamingMode(DefElem *def); +extern void 
ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, + char *slotname, bool missing_ok); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 4321ba8f86..bc9c1baea1 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd NodeTag type; } IdentifySystemCmd; +/* ------------------------------- + * LIST_DBID_FOR_LOGICAL_SLOTS command + * ------------------------------- + */ +typedef struct ListDBForLogicalSlotsCmd +{ + NodeTag type; + List *slot_names; +} ListDBForLogicalSlotsCmd; /* ---------------------- * BASE_BACKUP command diff --git a/src/include/postmaster/bgworker_internals.h b/src/include/postmaster/bgworker_internals.h index 4ad63fd9bd..cccc5da8b2 100644 --- a/src/include/postmaster/bgworker_internals.h +++ b/src/include/postmaster/bgworker_internals.h @@ -22,6 +22,7 @@ * Maximum possible value of parallel workers. */ #define MAX_PARALLEL_WORKER_LIMIT 1024 +#define MAX_SLOTSYNC_WORKER_LIMIT 50 /* * List of background workers, private to postmaster. 
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index a07c9cb311..bd12350616 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -15,9 +15,11 @@ extern PGDLLIMPORT int max_logical_replication_workers; extern PGDLLIMPORT int max_sync_workers_per_subscription; extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription; +extern PGDLLIMPORT int max_slotsync_workers; + extern void ApplyLauncherRegister(void); -extern void ApplyLauncherMain(Datum main_arg); +extern void LauncherMain(Datum main_arg); extern Size ApplyLauncherShmemSize(void); extern void ApplyLauncherShmemInit(void); @@ -31,4 +33,6 @@ extern bool IsLogicalLauncher(void); extern pid_t GetLeaderApplyWorkerPid(pid_t pid); +extern PGDLLIMPORT char *PrimaryConnInfo; + #endif /* LOGICALLAUNCHER_H */ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index bbd71d0b42..baad5a8f3a 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -19,6 +19,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); extern void TablesyncWorkerMain(Datum main_arg); +extern void ReplSlotSyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 38c0072043..e734478e94 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -15,7 +15,6 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" -#include "replication/walreceiver.h" /* * Behaviour of replication slots, upon release or crash. 
@@ -111,6 +110,11 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + + /* + * Is this a slot created by a sync-slot worker? + */ + bool synced; } ReplicationSlotPersistentData; /* @@ -226,7 +230,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase); extern void ReplicationSlotPersist(void); -extern void ReplicationSlotDrop(const char *name, bool nowait); +extern void ReplicationSlotDrop(const char *name, bool nowait, bool user_cmd); extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); @@ -251,7 +255,6 @@ extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_l extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot); -extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(bool is_shutdown); @@ -259,7 +262,6 @@ extern void CheckPointReplicationSlots(bool is_shutdown); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); -extern void WaitForStandbyLSN(XLogRecPtr wait_for_lsn); extern void SlotSyncInitConfig(void); #endif /* SLOT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 281626fa6f..c771da7ded 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -20,6 +20,7 @@ #include "pgtime.h" #include "port/atomics.h" #include "replication/logicalproto.h" +#include "replication/slot.h" #include "replication/walsender.h" #include "storage/condition_variable.h" #include "storage/latch.h" @@ -191,6 +192,14 @@ typedef struct } proto; } WalRcvStreamOptions; +/* + * Slot's DBid related data + 
*/ +typedef struct WalRcvRepSlotDbData +{ + Oid database; /* Slot's DBid received from remote */ +} WalRcvRepSlotDbData; + struct WalReceiverConn; typedef struct WalReceiverConn WalReceiverConn; @@ -280,6 +289,22 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, TimeLineID *primary_tli); +/* + * walrcv_get_dbinfo_for_logical_slots_fn + * + * Run LIST_DBID_FOR_LOGICAL_SLOTS on primary server to get the + * list of unique DBIDs for logical slots mentioned in 'slots' + */ +typedef List *(*walrcv_get_dbinfo_for_logical_slots_fn) (WalReceiverConn *conn, + const char *slots); + +/* + * walrcv_get_dbname_from_conninfo_fn + * + * Returns the dbname from the primary_conninfo + */ +typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo); + /* * walrcv_server_version_fn * @@ -393,6 +418,8 @@ typedef struct WalReceiverFunctionsType walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; + walrcv_get_dbinfo_for_logical_slots_fn walrcv_get_dbinfo_for_logical_slots; + walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo; walrcv_server_version_fn walrcv_server_version; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; walrcv_startstreaming_fn walrcv_startstreaming; @@ -417,6 +444,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_get_dbinfo_for_logical_slots(conn, slots) \ + WalReceiverFunctions->walrcv_get_dbinfo_for_logical_slots(conn, slots) +#define walrcv_get_dbname_from_conninfo(conninfo) \ + WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo) #define walrcv_server_version(conn) \
WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 8f4bed0958..b1a1d99355 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -36,11 +36,9 @@ typedef enum LogicalRepWorkerType WORKERTYPE_PARALLEL_APPLY } LogicalRepWorkerType; -typedef struct LogicalRepWorker +/* Common data for Slotsync and LogicalRep workers */ +typedef struct LogicalWorkerHeader { - /* What type of worker is this? */ - LogicalRepWorkerType type; - /* Time at which this worker was launched. */ TimestampTz launch_time; @@ -53,6 +51,15 @@ typedef struct LogicalRepWorker /* Pointer to proc array. NULL if not running. */ PGPROC *proc; +} LogicalWorkerHeader; + +typedef struct LogicalRepWorker +{ + LogicalWorkerHeader hdr; + + /* What type of worker is this? */ + LogicalRepWorkerType type; + /* Database id to connect to. */ Oid dbid; @@ -96,6 +103,40 @@ typedef struct LogicalRepWorker TimestampTz reply_time; } LogicalRepWorker; +/* + * Shared memory structure for Slot-Sync worker. It is allocated by logical + * replication launcher and then read by each slot-sync worker. + * + * It is protected by LWLock (SlotSyncWorkerLock). Each slot-sync worker + * reading the structure needs to hold the lock in shared mode, whereas + * the logical replication launcher which updates it needs to hold the lock + * in exclusive mode. 
+ */ +typedef struct SlotSyncWorker +{ + LogicalWorkerHeader hdr; + + /* The slot in worker pool to which slot-sync worker is attached */ + int slot; + + /* Count of dbids slot-sync worker manages */ + uint32 dbcount; + + /* DSA for dbids */ + dsa_area *dbids_dsa; + + /* dsa_pointer for dbids slot-sync worker manages */ + dsa_pointer dbids_dp; + + /* Info about slot being monitored for worker's naptime purpose */ + struct SlotSyncWorkerWatchSlot + { + XLogRecPtr confirmed_lsn; + TimestampTz last_update_time; + } monitoring_info; + +} SlotSyncWorker; + /* * State of the transaction in parallel apply worker. * @@ -234,12 +275,15 @@ extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn; /* Worker and subscription objects. */ extern PGDLLIMPORT Subscription *MySubscription; extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; +extern PGDLLIMPORT SlotSyncWorker *MySlotSyncWorker; extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; extern void logicalrep_worker_attach(int slot); +extern void slotsync_worker_attach(int slot); +extern void slotsync_worker_detach(int code, Datum arg); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running); @@ -327,9 +371,9 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); -#define isParallelApplyWorker(worker) ((worker)->in_use && \ +#define isParallelApplyWorker(worker) ((worker)->hdr.in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTablesyncWorker(worker) ((worker)->hdr.in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) static inline bool @@ -341,14 +385,14 @@ am_tablesync_worker(void) static inline bool am_leader_apply_worker(void) { - Assert(MyLogicalRepWorker->in_use); + Assert(MyLogicalRepWorker->hdr.in_use); return 
(MyLogicalRepWorker->type == WORKERTYPE_APPLY); } static inline bool am_parallel_apply_worker(void) { - Assert(MyLogicalRepWorker->in_use); + Assert(MyLogicalRepWorker->hdr.in_use); return isParallelApplyWorker(MyLogicalRepWorker); } diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index d77410bdea..334315acba 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -207,6 +207,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PGSTATS_DATA, LWTRANCHE_LAUNCHER_DSA, LWTRANCHE_LAUNCHER_HASH, + LWTRANCHE_SLOTSYNC_DSA, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 2c60400ade..f409d15714 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1473,8 +1473,9 @@ pg_replication_slots| SELECT l.slot_name, l.wal_status, l.safe_wal_size, l.two_phase, - l.conflicting - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting) + l.conflicting, + l.synced_slot + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, synced_slot) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8de90c4958..cd55249aa4 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1428,6 +1428,7 @@ LimitState LimitStateCond List ListCell +ListDBForLogicalSlotsCmd ListDictionary ListParsedLex ListenAction @@ -1505,6 +1506,7 @@ LogicalRepWorkerType LogicalRewriteMappingData LogicalTape LogicalTapeSet +LogicalWorkerHeader LsnReadQueue LsnReadQueueNextFun 
LsnReadQueueNextStatus @@ -2306,6 +2308,7 @@ RelocationBufferInfo RelptrFreePageBtree RelptrFreePageManager RelptrFreePageSpanLeader +RemoteSlot RenameStmt ReopenPtrType ReorderBuffer @@ -2561,6 +2564,7 @@ SlabBlock SlabContext SlabSlot SlotNumber +SlotSyncWorker SlruCtl SlruCtlData SlruErrorCause @@ -3007,6 +3011,7 @@ WalLevel WalRcvData WalRcvExecResult WalRcvExecStatus +WalRcvRepSlotDbData WalRcvState WalRcvStreamOptions WalRcvWakeupReason -- 2.34.1