From 7eb3d10ef5084c2244662fd9c9bad8f2c9878102 Mon Sep 17 00:00:00 2001
From: Shveta Malik
Date: Tue, 19 Sep 2023 09:41:01 +0530
Subject: [PATCH v18 2/2] Add logical slot sync capability to physical standby

This patch implements synchronization of logical replication slots
from the primary server to the physical standby so that logical
subscribers are not blocked after failover. All the logical
replication slots on the primary (assuming configurations are
appropriate) are automatically created on the physical standbys and
are synced periodically. Slot-sync worker(s) on the standby server
ping the primary at regular intervals to get the necessary logical
slot information and create/update the slots locally.

A new GUC 'max_slotsync_workers' defines the maximum number of
slot-sync workers on the standby. This parameter can only be set at
server start.

The replication launcher on the physical standby now queries the
primary to get the list of dbids that belong to the slots mentioned
in GUC 'synchronize_slot_names'. Once it gets the dbids, if dbids <
max_slotsync_workers, it starts only that many workers, and if dbids >
max_slotsync_workers, it starts max_slotsync_workers and divides the
work equally among them. Each worker is then responsible for syncing
the logical slots belonging to the DBs assigned to it.

For example, if the slots mentioned in 'synchronize_slot_names' on the
primary belong to 4 DBs and 'max_slotsync_workers' is 4, then a new
worker is launched for each db. If a new logical slot with a different
DB is found by the replication launcher, it assigns this new db to the
worker currently handling the minimum number of dbs (or to the first
worker in case of an equal count).

Each slot-sync worker has its own dbids list. Since the upper limit of
this dbid-count is not known, it needs to be handled using DSA. We
initially allocate memory to hold 100 dbids for each worker. If this
limit is exhausted, we reallocate this memory with the size
incremented by another 100.

The naptime of a worker is tuned according to the activity on the
primary. Each worker starts with a naptime of 10ms, and if no activity
is observed on the primary for some time, the naptime is increased to
10sec; if activity is observed again, the naptime is reduced back to
10ms. Each worker uses one slot (the first one assigned to it) for
monitoring purposes. If there is no change in the lsn of that slot for
some threshold time, the naptime is increased to 10sec, and as soon as
a change is observed, the naptime is reduced back to 10ms.

The logical slots created by slot-sync workers on physical standbys
are not allowed to be consumed. Any attempt to do
pg_logical_slot_get_changes on such slots will result in an error.

If a logical slot is invalidated on the primary, the slot on the
standby is also invalidated. If a logical slot on the primary is valid
but is invalidated on the standby due to a conflict (say required rows
removed on the primary), then that slot is dropped and recreated on
the standby in the next sync-cycle. It is okay to recreate such slots
as long as these are not consumable on the standby (which is the case
currently).

If there is any change in synchronize_slot_names, the slots that are
no longer part of it, or the ones that no longer exist on the primary,
will be dropped by slot-sync workers on the physical standbys.
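To make the distribution policy above concrete, here is a minimal C
sketch of the worker-selection step (illustrative only, not the
patch's actual code; it borrows the patch's SlotSyncWorker pool and
max_slotsync_workers, and omits locking, launching, and DSA handling):

    /*
     * Sketch: choose the worker slot for a newly discovered db. Prefer
     * a free slot so a new worker can be launched; otherwise reuse the
     * in-use worker currently managing the fewest databases (ties go
     * to the first such worker, as described above).
     */
    static SlotSyncWorker *
    choose_slotsync_worker(SlotSyncWorker *pool)
    {
        SlotSyncWorker *candidate = NULL;

        for (int i = 0; i < max_slotsync_workers; i++)
        {
            SlotSyncWorker *w = &pool[i];

            if (!w->hdr.in_use)
                return w;          /* free slot: launch a new worker here */

            if (candidate == NULL || w->dbcount < candidate->dbcount)
                candidate = w;     /* remember the least-loaded worker */
        }

        return candidate;          /* all in use: reuse this one */
    }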
--- doc/src/sgml/config.sgml | 28 + src/backend/postmaster/bgworker.c | 3 + .../libpqwalreceiver/libpqwalreceiver.c | 76 ++ src/backend/replication/logical/Makefile | 1 + .../replication/logical/applyparallelworker.c | 7 +- src/backend/replication/logical/launcher.c | 902 ++++++++++++-- .../replication/logical/logicalfuncs.c | 13 + src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 1063 +++++++++++++++++ src/backend/replication/logical/tablesync.c | 9 +- src/backend/replication/logical/worker.c | 4 +- src/backend/replication/repl_gram.y | 32 +- src/backend/replication/repl_scanner.l | 2 + src/backend/replication/slot.c | 3 +- src/backend/replication/slotfuncs.c | 29 + src/backend/replication/walsender.c | 120 ++ src/backend/storage/lmgr/lwlock.c | 2 + src/backend/storage/lmgr/lwlocknames.txt | 1 + .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 16 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/catalog/pg_proc.dat | 4 + src/include/commands/subscriptioncmds.h | 4 + src/include/nodes/replnodes.h | 9 + src/include/postmaster/bgworker_internals.h | 1 + src/include/replication/logicallauncher.h | 4 + src/include/replication/logicalworker.h | 1 + src/include/replication/slot.h | 7 +- src/include/replication/walreceiver.h | 23 + src/include/replication/worker_internal.h | 63 +- src/include/storage/lwlock.h | 1 + src/tools/pgindent/typedefs.list | 5 + 32 files changed, 2307 insertions(+), 130 deletions(-) create mode 100644 src/backend/replication/logical/slotsync.c mode change 100644 => 100755 src/backend/replication/walsender.c diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 3d81bd308c..8abc685015 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5015,6 +5015,34 @@ ANY num_sync ( + max_slotsync_workers (integer) + + max_slotsync_workers configuration parameter + + + + + Specifies maximum number of slot synchronization workers. + + + Slot synchronization workers are taken from the pool defined by + max_worker_processes. + + + The default value is 2. This parameter can only be set at server + start. + + + The slot-sync workers are needed for synchronization of logical replication + slots from the primary server to the physical standby so that logical + subscribers are not blocked after failover. Slot-sync workers need + synchronize_slot_names to be configured correctly in + order to synchronize the logical replication slots. 
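+ + For example (illustrative values only; the slot names and connection string must match your own setup), a physical standby could set: + max_slotsync_workers = 4 + synchronize_slot_names = 'sub1_slot, sub2_slot' + primary_conninfo = 'host=primary port=5432 user=replicator' +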
+ + + + diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 505e38376c..1df23a99c5 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -129,6 +129,9 @@ static const struct { "ApplyWorkerMain", ApplyWorkerMain }, + { + "ReplSlotSyncWorkerMain", ReplSlotSyncWorkerMain + }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 60d5c1fc40..f4b2a80da6 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -34,6 +34,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -58,6 +59,8 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static List *libpqrcv_get_dbinfo_for_logical_slots(WalReceiverConn *conn, + const char *slot_names); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -96,6 +99,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_receive = libpqrcv_receive, .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, + .walrcv_get_dbinfo_for_logical_slots = libpqrcv_get_dbinfo_for_logical_slots, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -409,6 +413,78 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get DB info for logical slots + * + * It gets the DBIDs for slot_names from primary. The list returned + * by LIST_DBID_FOR_LOGICAL_SLOTS has no duplicates. 
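+ * + * For example (illustrative, not part of this patch), over a replication + * connection the command could be issued as + * LIST_DBID_FOR_LOGICAL_SLOTS slot_a, slot_b + * and the result set contains one row per distinct database OID among + * the listed slots.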
+ */ +static List * +libpqrcv_get_dbinfo_for_logical_slots(WalReceiverConn *conn, + const char *slot_names) +{ + PGresult *res; + List *slotlist = NIL; + int ntuples; + StringInfoData s; + WalRcvRepSlotDbData *slot_data; + + initStringInfo(&s); + appendStringInfoString(&s, "LIST_DBID_FOR_LOGICAL_SLOTS"); + + if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0) + { + char *rawnames; + List *namelist; + ListCell *lc; + + appendStringInfoChar(&s, ' '); + rawnames = pstrdup(slot_names); + SplitIdentifierString(rawnames, ',', &namelist); + foreach(lc, namelist) + { + if (lc != list_head(namelist)) + appendStringInfoChar(&s, ','); + appendStringInfo(&s, "%s", + quote_identifier(lfirst(lc))); + } + } + + res = libpqrcv_PQexec(conn->streamConn, s.data); + pfree(s.data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive list of slots from the primary server: %s", + pchomp(PQerrorMessage(conn->streamConn))))); + } + if (PQnfields(res) != 1) + { + int nfields = PQnfields(res); + + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from primary server"), + errdetail("Could not get list of slots: got %d fields, " + "expected 1", nfields))); + } + + ntuples = PQntuples(res); + for (int i = 0; i < ntuples; i++) + { + slot_data = palloc0(sizeof(WalRcvRepSlotDbData)); + if (!PQgetisnull(res, i, 0)) + slot_data->database = atooid(PQgetvalue(res, i, 0)); + + slotlist = lappend(slotlist, slot_data); + } + + PQclear(res); + + return slotlist; +} + /* * Start streaming WAL data from given streaming options. * diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..ba03eeff1c 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 82f48a488e..c140a3b9e1 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -436,10 +436,10 @@ pa_launch_parallel_worker(void) } launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY, - MyLogicalRepWorker->dbid, + MyLogicalRepWorker->hdr.dbid, MySubscription->oid, MySubscription->name, - MyLogicalRepWorker->userid, + MyLogicalRepWorker->hdr.userid, InvalidOid, dsm_segment_handle(winfo->dsm_seg)); @@ -925,7 +925,8 @@ ParallelApplyWorkerMain(Datum main_arg) before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); SpinLockAcquire(&MyParallelShared->mutex); - MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation; + MyParallelShared->logicalrep_worker_generation = + MyLogicalRepWorker->hdr.generation; MyParallelShared->logicalrep_worker_slot_no = worker_slot; SpinLockRelease(&MyParallelShared->mutex); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 7882fc91ce..0a3e4f2742 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -22,6 +22,7 @@ #include "access/htup_details.h" #include "access/tableam.h" #include "access/xact.h" +#include "catalog/pg_authid.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" @@ -57,6 +58,21 @@ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription 
= 2; int max_parallel_apply_workers_per_subscription = 2; +int max_slotsync_workers = 2; + +/* + * Initial allocation size for dbids array for each SlotSyncWorker in dynamic + * shared memory. + */ +#define DB_PER_WORKER_ALLOC_INIT 100 + +/* + * Once initially allocated size is exhausted for dbids array, it is extended by + * DB_PER_WORKER_ALLOC_EXTRA size. + */ +#define DB_PER_WORKER_ALLOC_EXTRA 100 + +SlotSyncWorker *MySlotSyncWorker = NULL; LogicalRepWorker *MyLogicalRepWorker = NULL; @@ -70,6 +86,7 @@ typedef struct LogicalRepCtxStruct dshash_table_handle last_start_dsh; /* Background workers. */ + SlotSyncWorker *ss_workers; /* slot-sync workers */ LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; } LogicalRepCtxStruct; @@ -102,6 +119,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); +static void slotsync_worker_cleanup(SlotSyncWorker *worker); static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); @@ -178,6 +196,8 @@ get_subscription_list(void) } /* + * This is common code for logical workers and slotsync workers. + * * Wait for a background worker to start up and attach to the shmem context. * * This is only needed for cleaning up the shared memory in case the worker @@ -186,12 +206,14 @@ get_subscription_list(void) * Returns whether the attach was successful. */ static bool -WaitForReplicationWorkerAttach(LogicalRepWorker *worker, +WaitForReplicationWorkerAttach(WorkerHeader *worker, uint16 generation, - BackgroundWorkerHandle *handle) + BackgroundWorkerHandle *handle, + LWLock *lock) { BgwHandleStatus status; int rc; + bool is_slotsync_worker = (lock == SlotSyncWorkerLock) ? true : false; for (;;) { @@ -199,27 +221,32 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, CHECK_FOR_INTERRUPTS(); - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + LWLockAcquire(lock, LW_SHARED); /* Worker either died or has started. Return false if died. */ if (!worker->in_use || worker->proc) { - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); return worker->in_use; } - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); /* Check if worker has died before attaching, and clean up after it. */ status = GetBackgroundWorkerPid(handle, &pid); if (status == BGWH_STOPPED) { - LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Ensure that this was indeed the worker we waited for. 
*/ if (generation == worker->generation) - logicalrep_worker_cleanup(worker); - LWLockRelease(LogicalRepWorkerLock); + { + if (is_slotsync_worker) + slotsync_worker_cleanup((SlotSyncWorker *) worker); + else + logicalrep_worker_cleanup((LogicalRepWorker *) worker); + } + LWLockRelease(lock); return false; } @@ -262,8 +289,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) if (isParallelApplyWorker(w)) continue; - if (w->in_use && w->subid == subid && w->relid == relid && - (!only_running || w->proc)) + if (w->hdr.in_use && w->subid == subid && w->relid == relid && + (!only_running || w->hdr.proc)) { res = w; break; @@ -290,7 +317,8 @@ logicalrep_workers_find(Oid subid, bool only_running) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->in_use && w->subid == subid && (!only_running || w->proc)) + if (w->hdr.in_use && w->subid == subid && + (!only_running || w->hdr.proc)) res = lappend(res, w); } @@ -351,7 +379,7 @@ retry: { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (!w->in_use) + if (!w->hdr.in_use) { worker = w; slot = i; @@ -380,8 +408,8 @@ retry: * If the worker was marked in use but didn't manage to attach in * time, clean it up. */ - if (w->in_use && !w->proc && - TimestampDifferenceExceeds(w->launch_time, now, + if (w->hdr.in_use && !w->hdr.proc && + TimestampDifferenceExceeds(w->hdr.launch_time, now, wal_receiver_timeout)) { elog(WARNING, @@ -437,12 +465,12 @@ retry: /* Prepare the worker slot. */ worker->type = wtype; - worker->launch_time = now; - worker->in_use = true; - worker->generation++; - worker->proc = NULL; - worker->dbid = dbid; - worker->userid = userid; + worker->hdr.launch_time = now; + worker->hdr.in_use = true; + worker->hdr.generation++; + worker->hdr.proc = NULL; + worker->hdr.dbid = dbid; + worker->hdr.userid = userid; worker->subid = subid; worker->relid = relid; worker->relstate = SUBREL_STATE_UNKNOWN; @@ -457,7 +485,7 @@ retry: TIMESTAMP_NOBEGIN(worker->reply_time); /* Before releasing lock, remember generation for future identification. */ - generation = worker->generation; + generation = worker->hdr.generation; LWLockRelease(LogicalRepWorkerLock); @@ -510,7 +538,7 @@ retry: { /* Failed to start worker, so clean up the worker slot. */ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); - Assert(generation == worker->generation); + Assert(generation == worker->hdr.generation); logicalrep_worker_cleanup(worker); LWLockRelease(LogicalRepWorkerLock); @@ -522,7 +550,9 @@ retry: } /* Now wait until it attaches. */ - return WaitForReplicationWorkerAttach(worker, generation, bgw_handle); + return WaitForReplicationWorkerAttach((WorkerHeader *) worker, generation, + bgw_handle, + LogicalRepWorkerLock); } /* @@ -540,13 +570,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) * Remember which generation was our worker so we can check if what we see * is still the same one. */ - generation = worker->generation; + generation = worker->hdr.generation; /* * If we found a worker but it does not have proc set then it is still * starting up; wait for it to finish starting and then kill it. */ - while (worker->in_use && !worker->proc) + while (worker->hdr.in_use && !worker->hdr.proc) { int rc; @@ -571,16 +601,16 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) * that the worker has exited, or whether the worker generation is * different, meaning that a different worker has taken the slot. 
*/ - if (!worker->in_use || worker->generation != generation) + if (!worker->hdr.in_use || worker->hdr.generation != generation) return; /* Worker has assigned proc, so it has started. */ - if (worker->proc) + if (worker->hdr.proc) break; } /* Now terminate the worker ... */ - kill(worker->proc->pid, signo); + kill(worker->hdr.proc->pid, signo); /* ... and wait for it to die. */ for (;;) @@ -588,7 +618,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) int rc; /* is it gone? */ - if (!worker->proc || worker->generation != generation) + if (!worker->hdr.proc || worker->hdr.generation != generation) break; LWLockRelease(LogicalRepWorkerLock); @@ -669,7 +699,7 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo) /* * Only stop the worker if the generation matches and the worker is alive. */ - if (worker->generation == generation && worker->proc) + if (worker->hdr.generation == generation && worker->hdr.proc) logicalrep_worker_stop_internal(worker, SIGINT); LWLockRelease(LogicalRepWorkerLock); @@ -696,14 +726,14 @@ logicalrep_worker_wakeup(Oid subid, Oid relid) /* * Wake up (using latch) the specified logical replication worker. * - * Caller must hold lock, else worker->proc could change under us. + * Caller must hold lock, else worker->hdr.proc could change under us. */ void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker) { Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - SetLatch(&worker->proc->procLatch); + SetLatch(&worker->hdr.proc->procLatch); } /* @@ -718,7 +748,7 @@ logicalrep_worker_attach(int slot) Assert(slot >= 0 && slot < max_logical_replication_workers); MyLogicalRepWorker = &LogicalRepCtx->workers[slot]; - if (!MyLogicalRepWorker->in_use) + if (!MyLogicalRepWorker->hdr.in_use) { LWLockRelease(LogicalRepWorkerLock); ereport(ERROR, @@ -727,7 +757,7 @@ logicalrep_worker_attach(int slot) slot))); } - if (MyLogicalRepWorker->proc) + if (MyLogicalRepWorker->hdr.proc) { LWLockRelease(LogicalRepWorkerLock); ereport(ERROR, @@ -736,7 +766,7 @@ logicalrep_worker_attach(int slot) "another worker, cannot attach", slot))); } - MyLogicalRepWorker->proc = MyProc; + MyLogicalRepWorker->hdr.proc = MyProc; before_shmem_exit(logicalrep_worker_onexit, (Datum) 0); LWLockRelease(LogicalRepWorkerLock); @@ -794,10 +824,10 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker) Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE)); worker->type = WORKERTYPE_UNKNOWN; - worker->in_use = false; - worker->proc = NULL; - worker->dbid = InvalidOid; - worker->userid = InvalidOid; + worker->hdr.in_use = false; + worker->hdr.proc = NULL; + worker->hdr.dbid = InvalidOid; + worker->hdr.userid = InvalidOid; worker->subid = InvalidOid; worker->relid = InvalidOid; worker->leader_pid = InvalidPid; @@ -931,7 +961,16 @@ ApplyLauncherRegister(void) memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + + /* + * The launcher now takes care of launching both logical apply workers and + * logical slot-sync workers. Thus to cater to the requirements of both, + * start it as soon as a consistent state is reached. This will help + * slot-sync workers to start timely on a physical standby while on a + * non-standby server, it holds same meaning as that of + * BgWorkerStart_RecoveryFinished. 
+ */ + bgw.bgw_start_time = BgWorkerStart_ConsistentState; snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, @@ -953,6 +992,7 @@ void ApplyLauncherShmemInit(void) { bool found; + Size ssw_size; LogicalRepCtx = (LogicalRepCtxStruct *) ShmemInitStruct("Logical Replication Launcher Data", @@ -977,6 +1017,14 @@ ApplyLauncherShmemInit(void) SpinLockInit(&worker->relmutex); } } + + /* Allocate shared-memory for slot-sync workers pool now */ + ssw_size = mul_size(max_slotsync_workers, sizeof(SlotSyncWorker)); + LogicalRepCtx->ss_workers = (SlotSyncWorker *) + ShmemInitStruct("Replication slot-sync workers", ssw_size, &found); + + if (!found) + memset(LogicalRepCtx->ss_workers, 0, ssw_size); } /* @@ -1114,12 +1162,703 @@ ApplyLauncherWakeup(void) kill(LogicalRepCtx->launcher_pid, SIGUSR1); } +/* + * Clean up slot-sync worker info. + */ +static void +slotsync_worker_cleanup(SlotSyncWorker *worker) +{ + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_EXCLUSIVE)); + + worker->hdr.in_use = false; + worker->hdr.proc = NULL; + worker->hdr.dbid = InvalidOid; + worker->hdr.userid = InvalidOid; + worker->slot = -1; + + if (DsaPointerIsValid(worker->dbids_dp)) + { + dsa_free(worker->dbids_dsa, worker->dbids_dp); + worker->dbids_dp = InvalidDsaPointer; + } + + if (worker->dbids_dsa) + { + dsa_detach(worker->dbids_dsa); + worker->dbids_dsa = NULL; + } + + worker->dbcount = 0; + + MemSet(NameStr(worker->monitoring_info.slot_name), 0, NAMEDATALEN); + worker->monitoring_info.confirmed_lsn = 0; + worker->monitoring_info.last_update_time = 0; +} + +/* + * Attach Slot-sync worker to worker-slot assigned by launcher. + */ +void +slotsync_worker_attach(int slot) +{ + /* Block concurrent access. */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + + Assert(slot >= 0 && slot < max_slotsync_workers); + MySlotSyncWorker = &LogicalRepCtx->ss_workers[slot]; + MySlotSyncWorker->slot = slot; + + if (!MySlotSyncWorker->hdr.in_use) + { + LWLockRelease(SlotSyncWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot-sync worker slot %d is " + "empty, cannot attach", slot))); + } + + if (MySlotSyncWorker->hdr.proc) + { + LWLockRelease(SlotSyncWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot-sync worker slot %d is " + "already used by another worker, cannot attach", slot))); + } + + MySlotSyncWorker->hdr.proc = MyProc; + + LWLockRelease(SlotSyncWorkerLock); +} + +/* + * Slot Sync worker find. + * + * Walks the slot-sync workers pool and searches for one that matches given + * dbid. Since one worker can manage multiple dbs, so it walks the db array in + * each worker to find the match. + */ +static SlotSyncWorker * +slotsync_worker_find(Oid dbid) +{ + int i; + SlotSyncWorker *res = NULL; + Oid *dbids; + + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + /* Search for attached worker for a given dbid */ + for (i = 0; i < max_slotsync_workers; i++) + { + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i]; + int cnt; + + if (!w->hdr.in_use) + continue; + + dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp); + for (cnt = 0; cnt < w->dbcount; cnt++) + { + Oid wdbid = dbids[cnt]; + + if (wdbid == dbid) + { + res = w; + break; + } + } + + /* if worker is found, break the outer loop */ + if (res) + break; + } + + return res; +} + +/* + * Setup DSA for slot-sync worker. 
+ * + * DSA is needed for the dbids array. Since the max number of dbs a worker can + * manage is not known, initially a fixed size to hold DB_PER_WORKER_ALLOC_INIT + * dbs is allocated. If this size is exhausted, it can be extended using + * dsa free and allocate routines. + */ +static dsa_handle +slotsync_dsa_setup(SlotSyncWorker *worker, int alloc_db_count) +{ + dsa_area *dbids_dsa; + dsa_pointer dbids_dp; + dsa_handle dbids_dsa_handle; + MemoryContext oldcontext; + + /* Be sure any memory allocated by DSA routines is persistent. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + dbids_dsa = dsa_create(LWTRANCHE_SLOTSYNC_DSA); + dsa_pin(dbids_dsa); + dsa_pin_mapping(dbids_dsa); + + dbids_dp = dsa_allocate0(dbids_dsa, alloc_db_count * sizeof(Oid)); + + /* set-up worker */ + worker->dbcount = 0; + worker->dbids_dsa = dbids_dsa; + worker->dbids_dp = dbids_dp; + + /* Get the handle. This is the one which can be passed to worker processes */ + dbids_dsa_handle = dsa_get_handle(dbids_dsa); + + ereport(DEBUG1, + (errmsg("allocated dsa for slot-sync worker for dbcount: %d", + alloc_db_count))); + + MemoryContextSwitchTo(oldcontext); + + return dbids_dsa_handle; +} + +/* + * Stop the slot-sync worker and wait until it detaches from the slot. + * + * TODO: try to merge it with logicalrep_worker_stop_internal() + */ +static void +slotsync_worker_stop(SlotSyncWorker *worker) +{ + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + /* send SIGINT so that it exits cleanly ... */ + kill(worker->hdr.proc->pid, SIGINT); + + /* ... and wait for it to exit. */ + for (;;) + { + int rc; + + /* is it gone? */ + if (!worker->hdr.proc) + break; + + LWLockRelease(SlotSyncWorkerLock); + + /* Wait a bit --- we don't expect to have to wait long. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + } +} + +/* + * Slot-sync worker launch or reuse + * + * Start a new slot-sync background worker from the pool of available workers, + * going by the max_slotsync_workers count. If the worker pool is exhausted, + * reuse the existing worker with the minimum number of dbs. The idea is to + * always distribute the dbs equally among the launched workers. + * If the initially allocated dbids array is exhausted for the selected worker, + * reallocate the dbids array with increased size, copy the existing + * dbids to it, and assign the new one as well. + * + * Returns true on success, false on failure. + */ +static bool +slotsync_worker_launch_or_reuse(Oid dbid, Oid userid) +{ + BackgroundWorker bgw; + BackgroundWorkerHandle *bgw_handle; + uint16 generation; + SlotSyncWorker *worker = NULL; + uint32 mindbcnt = 0; + uint32 alloc_count = 0; + uint32 copied_dbcnt = 0; + Oid *copied_dbids = NULL; + int worker_slot = -1; + dsa_handle handle; + Oid *dbids; + int i; + + Assert(OidIsValid(dbid)); + + /* + * We need to do the modification of the shared memory under lock so that + * we have a consistent view. + */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + + /* Find unused worker slot. */ + for (i = 0; i < max_slotsync_workers; i++) + { + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i]; + + if (!w->hdr.in_use) + { + worker = w; + worker_slot = i; + break; + } + } + + /* + * If all the workers are currently in use, find the one with the minimum + * number of dbs and use that.
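+ * For example (illustrative), if three in-use workers currently manage + * 3, 1, and 2 dbs respectively, the new db is assigned to the second + * worker, raising its dbcount to 2.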
+ */ + if (!worker) + { + for (i = 0; i < max_slotsync_workers; i++) + { + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i]; + + if (i == 0) + { + mindbcnt = w->dbcount; + worker = w; + worker_slot = i; + } + else if (w->dbcount < mindbcnt) + { + mindbcnt = w->dbcount; + worker = w; + worker_slot = i; + } + } + } + + /* + * If worker is being reused, and there is vacancy in dbids array, just + * update dbids array and dbcount and we are done. But if dbids array is + * exhausted, reallocate dbids using dsa and copy the old dbids and assign + * the new one as well. + */ + if (worker->hdr.in_use) + { + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + if (worker->dbcount < DB_PER_WORKER_ALLOC_INIT) + { + dbids[worker->dbcount++] = dbid; + + } + else + { + MemoryContext oldcontext; + + /* Be sure any memory allocated by DSA routines is persistent. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + /* Remember the old dbids before we reallocate dsa. */ + copied_dbcnt = worker->dbcount; + copied_dbids = (Oid *) palloc0(worker->dbcount * sizeof(Oid)); + memcpy(copied_dbids, dbids, worker->dbcount * sizeof(Oid)); + + alloc_count = copied_dbcnt + DB_PER_WORKER_ALLOC_EXTRA; + + /* Free the existing dbids and allocate new with increased size */ + if (DsaPointerIsValid(worker->dbids_dp)) + dsa_free(worker->dbids_dsa, worker->dbids_dp); + + worker->dbids_dp = dsa_allocate0(worker->dbids_dsa, + alloc_count * sizeof(Oid)); + + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + /* Copy the existing dbids */ + worker->dbcount = copied_dbcnt; + memcpy(dbids, copied_dbids, copied_dbcnt * sizeof(Oid)); + + /* Assign new dbid */ + dbids[worker->dbcount++] = dbid; + + MemoryContextSwitchTo(oldcontext); + } + + LWLockRelease(SlotSyncWorkerLock); + + ereport(LOG, + (errmsg("Adding database %d to replication slot" + " synchronization worker %d; dbcount now: %d", + dbid, worker_slot, worker->dbcount))); + + return true; + + } + + /* Prepare the new worker. */ + worker->hdr.launch_time = GetCurrentTimestamp(); + worker->hdr.in_use = true; + + /* + * 'proc' and 'slot' will be assigned in ReplSlotSyncWorkerMain when we + * attach this worker to a particular worker-pool slot + */ + worker->hdr.proc = NULL; + worker->slot = -1; + worker->hdr.dbid = dbid; + + /* TODO: do we really need 'generation', analyse more here */ + worker->hdr.generation++; + + /* Initial DSA setup for dbids array to hold DB_PER_WORKER_ALLOC_INIT dbs */ + handle = slotsync_dsa_setup(worker, DB_PER_WORKER_ALLOC_INIT); + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + dbids[worker->dbcount++] = dbid; + worker->hdr.userid = userid; + + /* Before releasing lock, remember generation for future identification. */ + generation = worker->hdr.generation; + + LWLockRelease(SlotSyncWorkerLock); + + /* Register the new dynamic worker. 
*/ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_ConsistentState; + snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); + + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncWorkerMain"); + + Assert(worker_slot >= 0); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "replication slot-sync worker %d", worker_slot); + + snprintf(bgw.bgw_type, BGW_MAXLEN, "slot-sync worker"); + + bgw.bgw_restart_time = BGW_NEVER_RESTART; + bgw.bgw_notify_pid = MyProcPid; + bgw.bgw_main_arg = Int32GetDatum(worker_slot); + + memcpy(bgw.bgw_extra, &handle, sizeof(dsa_handle)); + + if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) + { + /* Failed to start worker, so clean up the worker slot. */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + Assert(generation == worker->hdr.generation); + slotsync_worker_cleanup(worker); + LWLockRelease(SlotSyncWorkerLock); + + ereport(WARNING, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of background worker slots"), + errhint("You might need to increase %s.", "max_worker_processes"))); + return false; + } + + /* Now wait until it attaches. */ + return WaitForReplicationWorkerAttach((WorkerHeader *) worker, generation, + bgw_handle, + SlotSyncWorkerLock); +} + +/* + * Slot-sync workers remove obsolete DBs from db-list + * + * If the DBIds fetched from the primary are lesser than the ones being managed + * by slot-sync workers, remove extra dbs from worker's db-list. This may happen + * if some slots are removed on primary or 'synchronize_slot_names' have been + * changed by user. + */ +static void +slotsync_remove_obsolete_dbs(List *remote_dbs) +{ + ListCell *lc; + Oid *dbids; + int widx; + int dbidx; + int i; + + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + + /* Traverse slot-sync-workers to validate the DBs */ + for (widx = 0; widx < max_slotsync_workers; widx++) + { + SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx]; + + if (!worker->hdr.in_use) + continue; + + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp); + + for (dbidx = 0; dbidx < worker->dbcount;) + { + Oid wdbid = dbids[dbidx]; + bool found = false; + + /* Check if current DB is still present in remote-db-list */ + foreach(lc, remote_dbs) + { + WalRcvRepSlotDbData *slot_db_data = lfirst(lc); + + if (slot_db_data->database == wdbid) + { + found = true; + break; + } + } + + /* If not found, then delete this db from worker's db-list */ + if (!found) + { + for (i = dbidx; i < worker->dbcount; i++) + { + /* Shift the DBs and get rid of wdbid */ + if (i < (worker->dbcount - 1)) + dbids[i] = dbids[i + 1]; + } + + worker->dbcount--; + ereport(LOG, + (errmsg("Removed database %d from replication slot" + " synchronization worker %d", + wdbid, worker->slot))); + } + /* Else move to next db-position */ + else + { + dbidx++; + } + } + } + + LWLockRelease(SlotSyncWorkerLock); + + /* If dbcount for any worker has become 0, shut it down */ + for (widx = 0; widx < max_slotsync_workers; widx++) + { + SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx]; + + if (worker->hdr.in_use && !worker->dbcount) + { + int slot = worker->slot; + + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + slotsync_worker_stop(worker); + LWLockRelease(SlotSyncWorkerLock); + + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + slotsync_worker_cleanup(worker); + LWLockRelease(SlotSyncWorkerLock); + ereport(LOG, + (errmsg("Stopped replication slot-sync worker %d", + slot))); + } + } +} + 
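The dsa_handle is passed to the new worker through bgw_extra above; the
worker side (ReplSlotSyncWorkerMain, in slotsync.c) is then expected to
map the dbids array back into its own address space. A minimal sketch
of that step, under the assumption that the handle is recovered from
bgw_extra exactly as it was stored:

    /*
     * Sketch (assumption, not the patch's exact code): recover the DSA
     * handle the launcher stored in bgw_extra and map the dbids array
     * into this worker's address space.
     */
    dsa_handle  handle;
    dsa_area   *dbids_dsa;
    Oid        *dbids;

    memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsa_handle));
    dbids_dsa = dsa_attach(handle);

    /* Keep the mapping valid for the worker's whole lifetime. */
    dsa_pin_mapping(dbids_dsa);

    dbids = (Oid *) dsa_get_address(dbids_dsa, MySlotSyncWorker->dbids_dp);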
+/* + * Connect to the primary server for slot-sync purposes and return the + * connection info. Disconnect the previous connection if provided in + * wrconn_prev. + */ +static WalReceiverConn * +primary_connect(WalReceiverConn *wrconn_prev) +{ + WalReceiverConn *wrconn = NULL; + char *err; + + if (wrconn_prev) + walrcv_disconnect(wrconn_prev); + + if (!RecoveryInProgress()) + return NULL; + + if (max_slotsync_workers == 0) + return NULL; + + if (strcmp(synchronize_slot_names, "") == 0) + return NULL; + + wrconn = walrcv_connect(PrimaryConnInfo, false, false, + "Logical Replication Launcher", &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the primary server: %s", err))); + + return wrconn; +} + +/* + * Start slot-sync background workers. + * + * It connects to the primary and gets the list of DBIDs for the slots + * configured in synchronize_slot_names. It then launches the slot-sync + * workers as per max_slotsync_workers and assigns the DBs equally among + * the workers launched. + */ +static void +ApplyLauncherStartSlotSync(long *wait_time, WalReceiverConn *wrconn) +{ + List *slots_dbs; + ListCell *lc; + MemoryContext tmpctx; + MemoryContext oldctx; + + if (max_slotsync_workers == 0) + return; + + if (strcmp(synchronize_slot_names, "") == 0) + return; + + Assert(wrconn); + + /* Use temporary context for the slot list and worker info. */ + tmpctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher Slot Sync ctx", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(tmpctx); + + /* + * TODO if we get any error in libpqrcv_get_dbinfo_for_logical_slots(), do + * not exit launcher, just skip slot-synchronization here. + */ + slots_dbs = walrcv_get_dbinfo_for_logical_slots(wrconn, + synchronize_slot_names); + + slotsync_remove_obsolete_dbs(slots_dbs); + + foreach(lc, slots_dbs) + { + WalRcvRepSlotDbData *slot_db_data = lfirst(lc); + SlotSyncWorker *w; + TimestampTz last_launch_tried; + TimestampTz now; + long elapsed; + + if (!OidIsValid(slot_db_data->database)) + continue; + + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + w = slotsync_worker_find(slot_db_data->database); + LWLockRelease(SlotSyncWorkerLock); + + if (w != NULL) + continue; /* worker is running already */ + + /* + * If the worker is eligible to start now, launch it. Otherwise, + * adjust wait_time so that we'll wake up as soon as it can be + * started. + * + * Each slot-sync worker can only be restarted once per + * wal_retrieve_retry_interval, so that errors do not cause us to + * repeatedly restart the worker as fast as possible. + */ + last_launch_tried = slot_db_data->last_launch_time; + now = GetCurrentTimestamp(); + if (last_launch_tried == 0 || + (elapsed = TimestampDifferenceMilliseconds(last_launch_tried, now)) >= + wal_retrieve_retry_interval) + { + slot_db_data->last_launch_time = now; + + /* TODO: is userid correct? */ + slotsync_worker_launch_or_reuse(slot_db_data->database, + BOOTSTRAP_SUPERUSERID); + } + else + { + *wait_time = Min(*wait_time, + wal_retrieve_retry_interval - elapsed); + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(tmpctx); +} + +/* + * Start logical replication apply workers for enabled subscriptions. + */ +static void +ApplyLauncherStartSubs(long *wait_time) +{ + List *sublist; + ListCell *lc; + MemoryContext subctx; + MemoryContext oldctx; + + /* Use temporary context to avoid leaking memory across cycles.
*/ + subctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher sublist", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); + + /* Start any missing workers for enabled subscriptions. */ + sublist = get_subscription_list(); + foreach(lc, sublist) + { + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; + TimestampTz last_start; + TimestampTz now; + long elapsed; + + if (!sub->enabled) + continue; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); + + if (w != NULL) + continue; /* worker is running already */ + + /* + * If the worker is eligible to start now, launch it. Otherwise, + * adjust wait_time so that we'll wake up as soon as it can be + * started. + * + * Each subscription's apply worker can only be restarted once per + * wal_retrieve_retry_interval, so that errors do not cause us to + * repeatedly restart the worker as fast as possible. In cases where + * a restart is expected (e.g., subscription parameter changes), + * another process should remove the last-start entry for the + * subscription so that the worker can be restarted without waiting + * for wal_retrieve_retry_interval to elapse. + */ + last_start = ApplyLauncherGetWorkerStartTime(sub->oid); + now = GetCurrentTimestamp(); + if (last_start == 0 || + (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) + { + ApplyLauncherSetWorkerStartTime(sub->oid, now); + logicalrep_worker_launch(WORKERTYPE_APPLY, + sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid, + DSM_HANDLE_INVALID); + } + else + { + *wait_time = Min(*wait_time, + wal_retrieve_retry_interval - elapsed); + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); +} + /* * Main loop for the apply launcher process. */ void ApplyLauncherMain(Datum main_arg) { + WalReceiverConn *wrconn = NULL; + ereport(DEBUG1, (errmsg_internal("logical replication launcher started"))); @@ -1139,79 +1878,22 @@ ApplyLauncherMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(NULL, NULL, 0); + load_file("libpqwalreceiver", false); + + wrconn = primary_connect(NULL); + /* Enter main loop */ for (;;) { int rc; - List *sublist; - ListCell *lc; - MemoryContext subctx; - MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; CHECK_FOR_INTERRUPTS(); - /* Use temporary context to avoid leaking memory across cycles. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_SIZES); - oldctx = MemoryContextSwitchTo(subctx); - - /* Start any missing workers for enabled subscriptions. */ - sublist = get_subscription_list(); - foreach(lc, sublist) - { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; - TimestampTz last_start; - TimestampTz now; - long elapsed; - - if (!sub->enabled) - continue; - - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); - - if (w != NULL) - continue; /* worker is running already */ - - /* - * If the worker is eligible to start now, launch it. Otherwise, - * adjust wait_time so that we'll wake up as soon as it can be - * started. 
- * - * Each subscription's apply worker can only be restarted once per - * wal_retrieve_retry_interval, so that errors do not cause us to - * repeatedly restart the worker as fast as possible. In cases - * where a restart is expected (e.g., subscription parameter - * changes), another process should remove the last-start entry - * for the subscription so that the worker can be restarted - * without waiting for wal_retrieve_retry_interval to elapse. - */ - last_start = ApplyLauncherGetWorkerStartTime(sub->oid); - now = GetCurrentTimestamp(); - if (last_start == 0 || - (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) - { - ApplyLauncherSetWorkerStartTime(sub->oid, now); - logicalrep_worker_launch(WORKERTYPE_APPLY, - sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid, - DSM_HANDLE_INVALID); - } - else - { - wait_time = Min(wait_time, - wal_retrieve_retry_interval - elapsed); - } - } - - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); + if (!RecoveryInProgress()) + ApplyLauncherStartSubs(&wait_time); + else + ApplyLauncherStartSlotSync(&wait_time, wrconn); /* Wait for more work. */ rc = WaitLatch(MyLatch, @@ -1229,6 +1911,9 @@ ApplyLauncherMain(Datum main_arg) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + + /* Reconnect in case primary_conninfo has changed */ + wrconn = primary_connect(wrconn); } } @@ -1260,7 +1945,8 @@ GetLeaderApplyWorkerPid(pid_t pid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid) + if (isParallelApplyWorker(w) && w->hdr.proc && + pid == w->hdr.proc->pid) { leader_pid = w->leader_pid; break; @@ -1298,13 +1984,13 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) memcpy(&worker, &LogicalRepCtx->workers[i], sizeof(LogicalRepWorker)); - if (!worker.proc || !IsBackendPid(worker.proc->pid)) + if (!worker.hdr.proc || !IsBackendPid(worker.hdr.proc->pid)) continue; if (OidIsValid(subid) && worker.subid != subid) continue; - worker_pid = worker.proc->pid; + worker_pid = worker.hdr.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); if (isTablesyncWorker(&worker)) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 197169d6b0..de318fb29c 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -202,6 +202,19 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ReplicationSlotAcquire(NameStr(*name), true); + /* + * Do not allow consumption of a "synchronized" slot until the standby + * gets promoted. 
+ */ + if (RecoveryInProgress() && MyReplicationSlot->data.synced) + { + ReplicationSlotRelease(); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("operation not permitted on replication slots on " + "standby which are synchronized from primary"))); + } + PG_TRY(); { /* restart at slot's confirmed_flush */ diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index d48cd4c590..9e52ec421f 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'slotsync.c', 'snapbuild.c', 'tablesync.c', 'worker.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..7f257093ae --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,1063 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * PostgreSQL worker for synchronizing slots to a standby from the + * primary + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + * This file contains the code for slot-sync workers on physical standby + * to fetch logical replication slot information from the primary server + * (PrimaryConnInfo), create the slots on the standby, and synchronize + * them periodically. Slot-sync workers only synchronize slots configured + * in 'synchronize_slot_names'. + * + * It also takes care of dropping the slots which were created by it and are + * currently not needed to be synchronized. + * + * It takes a nap of WORKER_DEFAULT_NAPTIME before every next synchronization. + * If there is no activity observed on the primary for some time, the + * naptime is increased to WORKER_INACTIVITY_NAPTIME, but if any activity + * is observed, the naptime reverts to the default value. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "commands/dbcommands.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "replication/logical.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/pg_lsn.h" +#include "utils/varlena.h" + +/* + * Structure to hold information fetched from the primary server about a logical + * replication slot. + */ +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + bool conflicting; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; + + /* RS_INVAL_NONE if valid, or the reason of invalidation */ + ReplicationSlotInvalidationCause invalidated; +} RemoteSlot; + +List *sync_slot_names_list = NIL; + +/* Worker's naptime in case of regular activity on primary */ +#define WORKER_DEFAULT_NAPTIME 10L /* 10 ms */ + +/* Worker's naptime in case of no-activity on primary */ +#define WORKER_INACTIVITY_NAPTIME 10000L /* 10 sec */ + +/* + * Inactivity Threshold in ms before increasing naptime of worker. 
+ * + * If the lsn of the slot being monitored does not change for this threshold + * time, then increase the naptime of the current worker from + * WORKER_DEFAULT_NAPTIME to WORKER_INACTIVITY_NAPTIME. + */ +#define WORKER_INACTIVITY_THRESHOLD 1000L /* 1 sec */ + +/* + * Wait for remote slot to pass locally reserved position. + */ +static void +wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name, + XLogRecPtr min_lsn) +{ + WalRcvExecResult *res; + TupleTableSlot *slot; + Oid slotRow[1] = {LSNOID}; + StringInfoData cmd; + bool isnull; + XLogRecPtr restart_lsn; + + for (;;) + { + int rc; + + CHECK_FOR_INTERRUPTS(); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT restart_lsn" + " FROM pg_catalog.pg_replication_slots" + " WHERE slot_name = %s", + quote_literal_cstr(slot_name)); + res = walrcv_exec(wrconn, cmd.data, 1, slotRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch slot info for slot \"%s\" from" + " primary: %s", slot_name, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + ereport(ERROR, + (errmsg("slot \"%s\" disappeared from the primary", + slot_name))); + + restart_lsn = DatumGetLSN(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + walrcv_clear_result(res); + + if (restart_lsn >= min_lsn) + break; + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + wal_retrieve_retry_interval, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + ResetLatch(MyLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } +} + +/* + * Update local slot metadata as per remote_slot's positions + */ +static void +local_slot_update(RemoteSlot *remote_slot) +{ + LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); + LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, + remote_slot->catalog_xmin); + LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, + remote_slot->restart_lsn); + MyReplicationSlot->data.invalidated = remote_slot->invalidated; + ReplicationSlotMarkDirty(); +} + +/* + * Get list of local logical slot names which are synchronized from + * primary and belong to one of the DBs passed in. + */ +static List * +get_local_synced_slot_names(Oid *dbids) +{ + List *localSyncedSlots = NIL; + + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + /* Check if it is logical synchronized slot */ + if (s->in_use && SlotIsLogical(s) && s->data.synced) + { + for (int j = 0; j < MySlotSyncWorker->dbcount; j++) + { + /* + * Add it to output list if this belongs to one of the + * worker's dbs. + */ + if (s->data.database == dbids[j]) + { + localSyncedSlots = lappend(localSyncedSlots, s); + break; + } + } + } + } + + LWLockRelease(ReplicationSlotControlLock); + + return localSyncedSlots; +} + +/* + * Helper function to check if local_slot is present in remote_slots list. + * + * It also checks if the logical slot is locally invalidated, i.e. invalidated + * on the standby but valid on the primary. If so, it sets locally_invalidated + * to true. + */
+static bool +slot_exists_locally(List *remote_slots, ReplicationSlot *local_slot, + bool *locally_invalidated) +{ + ListCell *cell; + + foreach(cell, remote_slots) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell); + + if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0) + { + /* + * If the remote slot is marked as non-conflicting (i.e. not + * invalidated) but the local slot is marked as invalidated, + * then set the flag. + */ + if (!remote_slot->conflicting && + SlotIsLogical(local_slot) && + local_slot->data.invalidated != RS_INVAL_NONE) + *locally_invalidated = true; + + return true; + } + } + + return false; +} + +/* + * Use slot_name in query. + * + * Check the dbid of the slot, and if the dbid is one of the dbids managed by + * the current worker, then use this slot-name in the query to get the data + * from the primary. If the slot is not created yet on the standby (first time + * it is being queried), then too, use this slot in the query. + */ +static bool +use_slot_in_query(char *slot_name, Oid *dbids) +{ + bool slot_found = false; + bool relevant_db = false; + ReplicationSlot *slot; + + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + /* Search for the local slot with the same name as slot_name */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && SlotIsLogical(s) && + (strcmp(NameStr(s->data.name), slot_name) == 0)) + { + slot_found = true; + slot = s; + break; + } + } + + /* Check if slot belongs to one of the input dbids */ + if (slot_found) + { + for (int j = 0; j < MySlotSyncWorker->dbcount; j++) + { + if (slot->data.database == dbids[j]) + { + relevant_db = true; + break; + } + } + } + + LWLockRelease(ReplicationSlotControlLock); + + /* + * Return TRUE if either slot is not yet created on standby or if it + * belongs to one of the dbs passed in dbids. + */ + if (!slot_found || relevant_db) + return true; + + return false; +} + + +/* + * Compute naptime for MySlotSyncWorker. + * + * The slot-sync worker takes a nap before it again checks for slots on the + * primary. The time for each nap is computed here. + * + * The first slot managed by each worker is chosen for monitoring purposes. + * If the lsn of that slot changes during each sync-check time, then the + * naptime is kept at the regular value of WORKER_DEFAULT_NAPTIME. + * When no lsn change is observed for a contiguous WORKER_INACTIVITY_THRESHOLD + * time, then the naptime is increased to WORKER_INACTIVITY_NAPTIME. + * The naptime is brought back to WORKER_DEFAULT_NAPTIME as soon as an lsn + * change is observed. + * + * The caller is supposed to ignore a return value of 0, which is returned + * for slots other than the slot being monitored. + */ +static long +compute_naptime(RemoteSlot *remote_slot) +{ + if (NameStr(MySlotSyncWorker->monitoring_info.slot_name)[0] == '\0') + { + /* + * First time, just update the name and lsn and return regular + * naptime. Start comparison from next time onward.
*/ + strcpy(NameStr(MySlotSyncWorker->monitoring_info.slot_name), + remote_slot->name); + MySlotSyncWorker->monitoring_info.confirmed_lsn = + remote_slot->confirmed_lsn; + + MySlotSyncWorker->monitoring_info.last_update_time = + GetCurrentTimestamp(); + + return WORKER_DEFAULT_NAPTIME; + } + + /* If this is the slot being monitored by this worker, compute naptime */ + if (strcmp(remote_slot->name, + NameStr(MySlotSyncWorker->monitoring_info.slot_name)) == 0) + { + TimestampTz now = GetCurrentTimestamp(); + + /* + * If the newly received lsn (remote one) is different from what we + * have in our local slot, then update last_update_time. + */ + if (MySlotSyncWorker->monitoring_info.confirmed_lsn != + remote_slot->confirmed_lsn) + MySlotSyncWorker->monitoring_info.last_update_time = now; + + MySlotSyncWorker->monitoring_info.confirmed_lsn = + remote_slot->confirmed_lsn; + + /* If the inactivity time reaches the threshold, increase naptime */ + if (TimestampDifferenceExceeds(MySlotSyncWorker->monitoring_info.last_update_time, + now, WORKER_INACTIVITY_THRESHOLD)) + return WORKER_INACTIVITY_NAPTIME; + else + return WORKER_DEFAULT_NAPTIME; + } + + /* if it is not the slot being monitored, return 0 */ + return 0; +} + +/* + * Get remote slot's invalidation cause. + * + * This gets the invalidation cause of the remote slot from the primary. + */ +static ReplicationSlotInvalidationCause +get_remote_invalidation_cause(WalReceiverConn *wrconn, char *slot_name) +{ + WalRcvExecResult *res; + Oid slotRow[1] = {INT2OID}; + StringInfoData cmd; + bool isnull; + TupleTableSlot *slot; + ReplicationSlotInvalidationCause cause; + MemoryContext oldctx = CurrentMemoryContext; + + /* syscache access needs a transaction env. */ + StartTransactionCommand(); + + /* make things live outside TX context */ + MemoryContextSwitchTo(oldctx); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT pg_get_slot_invalidation_cause(%s)", + quote_literal_cstr(slot_name)); + res = walrcv_exec(wrconn, cmd.data, 1, slotRow); + pfree(cmd.data); + + CommitTransactionCommand(); + + /* Switch to oldctx we saved */ + MemoryContextSwitchTo(oldctx); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch invalidation cause for slot \"%s\" from" + " primary: %s", slot_name, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + ereport(ERROR, + (errmsg("slot \"%s\" disappeared from the primary", + slot_name))); + + cause = DatumGetInt16(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + walrcv_clear_result(res); + + return cause; +} + +/* + * Drop obsolete slots + * + * Drop the slots which no longer need to be synced, i.e. those that either + * do not exist on the primary or are no longer part of + * synchronize_slot_names. + * + * Also drop the slots which are valid on the primary and got invalidated + * on the standby due to conflict (say required rows removed on the primary). + * The assumption is that these will get recreated in the next sync-cycle and + * it is okay to drop and recreate such slots as long as these are not + * consumable on the standby (which is the case currently). + */ +static void +drop_obsolete_slots(Oid *dbids, List *remote_slot_list) +{ + List *local_slot_list = NIL; + ListCell *lc_slot; + + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED)); + + /* + * Get the list of local slots for dbids managed by this worker, so that + * those not on remote could be dropped.
+     */
+    local_slot_list = get_local_synced_slot_names(dbids);
+
+    foreach(lc_slot, local_slot_list)
+    {
+        ReplicationSlot *local_slot = (ReplicationSlot *) lfirst(lc_slot);
+        bool        local_exists = false;
+        bool        locally_invalidated = false;
+
+        local_exists = slot_exists_locally(remote_slot_list, local_slot,
+                                           &locally_invalidated);
+
+        /*
+         * Drop the local slot either if it is not in the remote slots list
+         * or if it is invalidated while the remote slot is still valid.
+         */
+        if (!local_exists || locally_invalidated)
+        {
+            ReplicationSlotDrop(NameStr(local_slot->data.name), true);
+
+            /* If this slot is being monitored, clean up the monitoring info */
+            if (strcmp(NameStr(local_slot->data.name),
+                       NameStr(MySlotSyncWorker->monitoring_info.slot_name)) == 0)
+            {
+                MemSet(NameStr(MySlotSyncWorker->monitoring_info.slot_name), 0, NAMEDATALEN);
+                MySlotSyncWorker->monitoring_info.confirmed_lsn = 0;
+                MySlotSyncWorker->monitoring_info.last_update_time = 0;
+            }
+
+            elog(LOG, "dropped replication slot \"%s\"",
+                 NameStr(local_slot->data.name));
+        }
+    }
+}
+
+/*
+ * Construct the slot query.
+ *
+ * It constructs the query using the dbids array and sync_slot_names_list
+ * in order to get slot information from the primary.
+ */
+static bool
+construct_slot_query(StringInfo s, Oid *dbids)
+{
+    Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
+
+    appendStringInfo(s,
+                     "SELECT slot_name, plugin, confirmed_flush_lsn,"
+                     " restart_lsn, catalog_xmin, two_phase, conflicting, "
+                     " database FROM pg_catalog.pg_replication_slots"
+                     " WHERE database IN ");
+
+    appendStringInfoChar(s, '(');
+    for (int i = 0; i < MySlotSyncWorker->dbcount; i++)
+    {
+        char       *dbname;
+
+        if (i != 0)
+            appendStringInfoChar(s, ',');
+
+        dbname = get_database_name(dbids[i]);
+        appendStringInfo(s, "%s",
+                         quote_literal_cstr(dbname));
+        pfree(dbname);
+    }
+    appendStringInfoChar(s, ')');
+
+    if (strcmp(synchronize_slot_names, "") != 0 &&
+        strcmp(synchronize_slot_names, "*") != 0)
+    {
+        ListCell   *lc;
+        bool        first_slot = true;
+
+        foreach(lc, sync_slot_names_list)
+        {
+            char       *slot_name = lfirst(lc);
+
+            if (!use_slot_in_query(slot_name, dbids))
+                continue;
+
+            if (first_slot)
+                appendStringInfoString(s, " AND slot_name IN (");
+            else
+                appendStringInfoChar(s, ',');
+
+            appendStringInfo(s, "%s",
+                             quote_literal_cstr(slot_name));
+            first_slot = false;
+        }
+
+        if (!first_slot)
+            appendStringInfoChar(s, ')');
+    }
+
+    return true;
+}
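+
+/*
+ * For illustration, assuming a worker that manages two databases "db1" and
+ * "db2" and synchronize_slot_names = 'slot_a,slot_b' (hypothetical names),
+ * the constructed query would look roughly like:
+ *
+ *     SELECT slot_name, plugin, confirmed_flush_lsn, restart_lsn,
+ *            catalog_xmin, two_phase, conflicting, database
+ *     FROM pg_catalog.pg_replication_slots
+ *     WHERE database IN ('db1','db2') AND slot_name IN ('slot_a','slot_b')
+ */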
+
+/*
+ * Synchronize a single slot to the given position.
+ *
+ * This creates a new slot if there is no existing one and updates the
+ * metadata of an existing slot as per the data received from the primary.
+ */
+static void
+synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
+{
+    bool        found = false;
+
+    /*
+     * Make sure that the concerned WAL is received before syncing the slot
+     * to the target lsn received from the primary.
+     */
+    if (remote_slot->confirmed_lsn > WalRcv->latestWalEnd)
+    {
+        ereport(WARNING,
+                errmsg("skipping sync of slot \"%s\" as the received slot-sync "
+                       "lsn %X/%X is ahead of the standby position %X/%X",
+                       remote_slot->name,
+                       LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+                       LSN_FORMAT_ARGS(WalRcv->latestWalEnd)));
+        return;
+    }
+
+    /* Search for the named slot to see whether it already exists locally */
+    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+    for (int i = 0; i < max_replication_slots; i++)
+    {
+        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+        if (!s->in_use)
+            continue;
+
+        if (strcmp(NameStr(s->data.name), remote_slot->name) == 0)
+        {
+            found = true;
+            break;
+        }
+    }
+    LWLockRelease(ReplicationSlotControlLock);
+
+    StartTransactionCommand();
+
+    /* Already existing slot, acquire */
+    if (found)
+    {
+        ReplicationSlotAcquire(remote_slot->name, true);
+
+        if (remote_slot->confirmed_lsn < MyReplicationSlot->data.confirmed_flush)
+        {
+            ereport(WARNING,
+                    errmsg("not synchronizing slot \"%s\"; synchronization would move"
+                           " it backward", remote_slot->name));
+
+            ReplicationSlotRelease();
+            CommitTransactionCommand();
+            return;
+        }
+
+        /* Update the lsns of the slot to the remote slot's current position */
+        local_slot_update(remote_slot);
+        ReplicationSlotSave();
+    }
+    /* Otherwise create the slot first. */
+    else
+    {
+        TransactionId xmin_horizon = InvalidTransactionId;
+        ReplicationSlot *slot;
+
+        ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL,
+                              remote_slot->two_phase);
+        slot = MyReplicationSlot;
+
+        SpinLockAcquire(&slot->mutex);
+        slot->data.database = get_database_oid(remote_slot->database, false);
+        slot->data.synced = true;
+        namestrcpy(&slot->data.plugin, remote_slot->plugin);
+        SpinLockRelease(&slot->mutex);
+
+        ReplicationSlotReserveWal();
+
+        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+        xmin_horizon = GetOldestSafeDecodingTransactionId(true);
+        slot->effective_catalog_xmin = xmin_horizon;
+        slot->data.catalog_xmin = xmin_horizon;
+        ReplicationSlotsComputeRequiredXmin(true);
+        LWLockRelease(ProcArrayLock);
+
+        if (remote_slot->confirmed_lsn < MyReplicationSlot->data.restart_lsn)
+        {
+            ereport(LOG,
+                    errmsg("waiting for remote slot \"%s\" LSN (%X/%X) to pass "
+                           "local slot LSN (%X/%X)", remote_slot->name,
+                           LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+                           LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn)));
+
+            wait_for_primary_slot_catchup(wrconn, remote_slot->name,
+                                          MyReplicationSlot->data.restart_lsn);
+        }
+
+        /* Update the lsns of the slot to the remote slot's current position */
+        local_slot_update(remote_slot);
+        ReplicationSlotPersist();
+    }
+
+    ReplicationSlotRelease();
+    CommitTransactionCommand();
+}
+
+/*
+ * Synchronize slots.
+ *
+ * It looks into the dbids array maintained in the dsa and gets the logical
+ * slot info from the primary for the slots configured in
+ * synchronize_slot_names and belonging to the concerned dbids. It then
+ * updates the slots locally as per the data received from the primary,
+ * creating them on the standby if they do not exist yet.
+ *
+ * It returns the naptime for the next sync-cycle.
+ */
+static long
+synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn)
+{
+#define SLOTSYNC_COLUMN_COUNT 8
+    Oid         slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
+    LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID};
+
+    WalRcvExecResult *res;
+    TupleTableSlot *slot;
+    StringInfoData s;
+    List       *remote_slot_list = NIL;
+    MemoryContext oldctx = CurrentMemoryContext;
+    long        naptime = WORKER_DEFAULT_NAPTIME;
+    long        value;
+    Oid        *dbids;
+    ListCell   *cell;
+
+    if (strcmp(synchronize_slot_names, "") == 0 || !WalRcv)
+        return naptime;
+
+    /* primary_slot_name is not set yet */
+    if (WalRcv->slotname[0] == '\0')
+    {
+        ereport(WARNING,
+                errmsg("skipping sync as primary_slot_name is not set"));
+        return naptime;
+    }
+
+    /* WALs not received yet */
+    if (XLogRecPtrIsInvalid(WalRcv->latestWalEnd))
+        return naptime;
+
+    /*
+     * No more writes to dbcount and dbids by the launcher after this until
+     * we release this lock.
+     */
+    LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+
+    /*
+     * There is a small window between the last CHECK_FOR_INTERRUPTS and the
+     * lock acquisition above, so there is a chance that
+     * synchronize_slot_names has changed, making the dbs assigned to this
+     * worker invalid. In that case, the launcher will set dbcount to 0 and
+     * will send SIGINT to this worker. So check dbcount before proceeding.
+     */
+    if (!MySlotSyncWorker->dbcount)
+    {
+        /* Return and handle the interrupts in the main loop */
+        LWLockRelease(SlotSyncWorkerLock);
+        return naptime;
+    }
+
+    /* Get dbids from the dsa */
+    dbids = (Oid *) dsa_get_address(dsa, MySlotSyncWorker->dbids_dp);
+
+    /* Syscache access needs a transaction env. */
+    StartTransactionCommand();
+
+    /* Make things live outside TX context */
+    MemoryContextSwitchTo(oldctx);
+
+    /* Construct the query to get slot info from the primary */
+    initStringInfo(&s);
+    if (!construct_slot_query(&s, dbids))
+    {
+        pfree(s.data);
+        CommitTransactionCommand();
+        LWLockRelease(SlotSyncWorkerLock);
+        return naptime;
+    }
+
+    elog(DEBUG2, "slot-sync worker %d's query: %s", MySlotSyncWorker->slot,
+         s.data);
+
+    /* Execute the query */
+    res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow);
+    pfree(s.data);
+
+    if (res->status != WALRCV_OK_TUPLES)
+        ereport(ERROR,
+                (errmsg("could not fetch slot info from the primary: %s",
+                        res->err)));
+
+    CommitTransactionCommand();
+
+    /* Switch to the oldctx we saved */
+    MemoryContextSwitchTo(oldctx);
+
+    /* Construct the remote_slot tuple and synchronize each slot locally */
+    slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+    while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+    {
+        bool        isnull;
+        RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
+
+        remote_slot->name = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+        Assert(!isnull);
+
+        remote_slot->plugin = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+        Assert(!isnull);
+
+        /*
+         * It is possible to get null values for lsns and xmin if the slot is
+         * invalidated on the primary, so handle accordingly.
+         */
+        remote_slot->confirmed_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull));
+        if (isnull)
+            remote_slot->confirmed_lsn = InvalidXLogRecPtr;
+
+        remote_slot->restart_lsn = DatumGetLSN(slot_getattr(slot, 4, &isnull));
+        if (isnull)
+            remote_slot->restart_lsn = InvalidXLogRecPtr;
+
+        remote_slot->catalog_xmin = DatumGetTransactionId(slot_getattr(slot,
+                                                                       5, &isnull));
+        if (isnull)
+            remote_slot->catalog_xmin = InvalidTransactionId;
+
+        remote_slot->two_phase = DatumGetBool(slot_getattr(slot, 6, &isnull));
+        Assert(!isnull);
+
+        remote_slot->conflicting = DatumGetBool(slot_getattr(slot, 7, &isnull));
+        Assert(!isnull);
+
+        remote_slot->database = TextDatumGetCString(slot_getattr(slot,
+                                                                 8, &isnull));
+        Assert(!isnull);
+
+        if (remote_slot->conflicting)
+            remote_slot->invalidated = get_remote_invalidation_cause(wrconn,
+                                                                     remote_slot->name);
+        else
+            remote_slot->invalidated = RS_INVAL_NONE;
+
+        /* Build the list of remote slots to be used by drop_obsolete_slots */
+        remote_slot_list = lappend(remote_slot_list, remote_slot);
+
+        synchronize_one_slot(wrconn, remote_slot);
+
+        /*
+         * Update the naptime in case of a non-zero return value. Zero is
+         * returned if remote_slot is not the one being monitored.
+         */
+        value = compute_naptime(remote_slot);
+        if (value)
+            naptime = value;
+
+        ExecClearTuple(slot);
+    }
+
+    /*
+     * Drop local slots that no longer need to be synced, i.e. those that
+     * either do not exist on the primary or are no longer part of
+     * synchronize_slot_names.
+     */
+    drop_obsolete_slots(dbids, remote_slot_list);
+
+    LWLockRelease(SlotSyncWorkerLock);
+
+    /* We are done, free the remote_slot_list elements */
+    foreach(cell, remote_slot_list)
+    {
+        RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell);
+
+        pfree(remote_slot);
+    }
+
+    walrcv_clear_result(res);
+
+    return naptime;
+}
+
+/*
+ * Initialize the list from the raw synchronize_slot_names and cache it, in
+ * order to avoid parsing it repeatedly. Done at slot-sync worker startup and
+ * after each SIGHUP.
+ */
+static void
+SlotSyncInitSlotNamesList(void)
+{
+    char       *rawname;
+
+    if (strcmp(synchronize_slot_names, "") != 0 &&
+        strcmp(synchronize_slot_names, "*") != 0)
+    {
+        rawname = pstrdup(synchronize_slot_names);
+        SplitIdentifierString(rawname, ',', &sync_slot_names_list);
+    }
+}
+
+/*
+ * Connect to the remote (primary) server.
+ */
+static WalReceiverConn *
+remote_connect(void)
+{
+    char       *database;
+    WalReceiverConn *wrconn = NULL;
+    StringInfoData s;
+    char       *err;
+    MemoryContext oldctx = CurrentMemoryContext;
+
+    /* Syscache access needs a transaction env. */
+    StartTransactionCommand();
+
+    /* Make things live outside TX context */
+    MemoryContextSwitchTo(oldctx);
+
+    database = get_database_name(MyDatabaseId);
+
+    /*
+     * Make the connection to the primary.
+     *
+     * TODO: Shall we connect to 'postgres' instead, or take this input from
+     * the user?
+     */
+    initStringInfo(&s);
+    appendStringInfo(&s, "%s dbname=%s", PrimaryConnInfo, database);
+    pfree(database);
+
+    CommitTransactionCommand();
+
+    /* Switch to the oldctx we saved */
+    MemoryContextSwitchTo(oldctx);
+
+    wrconn = walrcv_connect(s.data, true, false, "slot-sync", &err);
+    if (wrconn == NULL)
+        ereport(ERROR,
+                (errmsg("could not connect to the primary server: %s", err)));
+    return wrconn;
+}
+
+/*
+ * Reconnect to the remote (primary) server if PrimaryConnInfo has changed.
+ */
+static WalReceiverConn *
+reconnect_if_needed(WalReceiverConn *wrconn_prev, char *conninfo_prev)
+{
+    WalReceiverConn *wrconn = NULL;
+
+    /* If there is no change in PrimaryConnInfo, return the previous connection */
+    if (strcmp(conninfo_prev, PrimaryConnInfo) == 0)
+        return wrconn_prev;
+
+    walrcv_disconnect(wrconn_prev);
+    wrconn = remote_connect();
+    return wrconn;
+}
+
+/*
+ * Interrupt handler for the main loop of the slot-sync worker.
+ */
+static bool
+ProcessSlotSyncInterrupts(WalReceiverConn *wrconn, char **conninfo_prev)
+{
+    bool        reload_done = false;
+
+    CHECK_FOR_INTERRUPTS();
+
+    if (ShutdownRequestPending)
+    {
+        elog(LOG, "replication slot-sync worker %d is shutting"
+             " down on receiving SIGINT", MySlotSyncWorker->slot);
+
+        /*
+         * TODO: we need to take care of dropping the slots belonging to the
+         * dbids of this worker before exiting, for the case when all the
+         * dbids of this worker are obsoleted/dropped on the primary and that
+         * is the reason for this worker's exit.
+         */
+        walrcv_disconnect(wrconn);
+        proc_exit(0);
+    }
+
+    if (ConfigReloadPending)
+    {
+        ConfigReloadPending = false;
+
+        /* Save the PrimaryConnInfo before reloading */
+        *conninfo_prev = pstrdup(PrimaryConnInfo);
+
+        ProcessConfigFile(PGC_SIGHUP);
+        SlotSyncInitSlotNamesList();
+        reload_done = true;
+    }
+
+    return reload_done;
+}
+
+/*
+ * Detach the worker from DSM and update 'proc' and 'in_use'.
+ * The logical replication launcher uses these fields to detect
+ * that the worker has shut down.
+ */
+static void
+slotsync_worker_detach(int code, Datum arg)
+{
+    dsa_detach((dsa_area *) DatumGetPointer(arg));
+    LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
+    MySlotSyncWorker->hdr.in_use = false;
+    MySlotSyncWorker->hdr.proc = NULL;
+    LWLockRelease(SlotSyncWorkerLock);
+}
+
+/*
+ * The main loop of our worker process.
+ */
+void
+ReplSlotSyncWorkerMain(Datum main_arg)
+{
+    int         worker_slot = DatumGetInt32(main_arg);
+    dsa_handle  handle;
+    dsa_area   *dsa;
+    WalReceiverConn *wrconn = NULL;
+
+    /* Setup signal handling */
+    pqsignal(SIGHUP, SignalHandlerForConfigReload);
+    pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+    pqsignal(SIGTERM, die);
+    BackgroundWorkerUnblockSignals();
+
+    /*
+     * Attach to the dynamic shared memory segment for the slot-sync worker
+     * and find its table of contents.
+     */
+    memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsa_handle));
+    dsa = dsa_attach(handle);
+    if (!dsa)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("could not map dynamic shared memory "
+                        "segment for slot-sync worker")));
+
+    /* Primary initialization is complete. Now attach to our slot. */
+    slotsync_worker_attach(worker_slot);
+
+    before_shmem_exit(slotsync_worker_detach, PointerGetDatum(dsa));
+
+    /* Load the libpq-specific functions */
+    load_file("libpqwalreceiver", false);
+
+    /*
+     * Connect to our database. We need a database connection for walrcv_exec
+     * to work. Please see comments atop libpqrcv_exec.
+     *
+     * TODO: shall we connect to postgres instead? What if this dbid is
+     * dropped on the primary?
+     */
+    BackgroundWorkerInitializeConnectionByOid(MySlotSyncWorker->hdr.dbid,
+                                              MySlotSyncWorker->hdr.userid,
+                                              0);
+
+    StartTransactionCommand();
+    elog(LOG, "replication slot-sync worker %d "
+         "started managing database \"%s\" (dbid: %d)",
+         worker_slot, get_database_name(MySlotSyncWorker->hdr.dbid),
+         MySlotSyncWorker->hdr.dbid);
+    CommitTransactionCommand();
+
+    SlotSyncInitSlotNamesList();
+
+    /* Connect to the primary node */
+    wrconn = remote_connect();
+
+    /* Main wait loop.
*/ + for (;;) + { + int rc; + long naptime; + bool config_reloaded = false; + char *conninfo_prev; + + config_reloaded = ProcessSlotSyncInterrupts(wrconn, &conninfo_prev); + + /* Reconnect if primary_conninfo got changed */ + if (config_reloaded) + wrconn = reconnect_if_needed(wrconn, conninfo_prev); + + if (!RecoveryInProgress()) + proc_exit(0); + + naptime = synchronize_slots(dsa, wrconn); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + naptime, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + ResetLatch(MyLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + { + walrcv_disconnect(wrconn); + proc_exit(1); + } + } + + /* + * The slot-sync worker must not get here because it will only stop when + * it receives a SIGINT from the logical replication launcher, or when + * there is an error. None of these cases will allow the code to reach + * here. + */ + Assert(false); +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e2cee92cf2..3a3e60ba44 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/subscriptioncmds.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "parser/parse_relation.h" @@ -246,7 +247,7 @@ wait_for_worker_state_change(char expected_state) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid, false); - if (worker && worker->proc) + if (worker && worker->hdr.proc) logicalrep_worker_wakeup_ptr(worker); LWLockRelease(LogicalRepWorkerLock); if (!worker) @@ -535,7 +536,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (rstate->state == SUBREL_STATE_SYNCWAIT) { /* Signal the sync worker, as it may be waiting for us. */ - if (syncworker->proc) + if (syncworker->hdr.proc) logicalrep_worker_wakeup_ptr(syncworker); /* Now safe to release the LWLock */ @@ -588,10 +589,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) wal_retrieve_retry_interval)) { logicalrep_worker_launch(WORKERTYPE_TABLESYNC, - MyLogicalRepWorker->dbid, + MyLogicalRepWorker->hdr.dbid, MySubscription->oid, MySubscription->name, - MyLogicalRepWorker->userid, + MyLogicalRepWorker->hdr.userid, rstate->relid, DSM_HANDLE_INVALID); hentry->last_start_time = now; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 597947410f..21d9c7c691 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4574,8 +4574,8 @@ InitializeLogRepWorker(void) PGC_SUSET, PGC_S_OVERRIDE); /* Connect to our database. 
*/
-	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
-											  MyLogicalRepWorker->userid,
+	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->hdr.dbid,
+											  MyLogicalRepWorker->hdr.userid,
 											  0);
 
 	/*
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0c874e33cf..2b00bf845c 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,11 +76,12 @@ Node *replication_parse_result;
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_LIST_DBID_FOR_LOGICAL_SLOTS
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				read_replication_slot timeline_history show
+				read_replication_slot timeline_history show list_dbid_for_logical_slots
 %type <list>	generic_option_list
 %type <defelt>	generic_option
 %type <uintval>	opt_timeline
@@ -91,6 +92,7 @@ Node *replication_parse_result;
 %type <boolval>	opt_temporary
 %type <list>	create_slot_options create_slot_legacy_opt_list
 %type <defelt>	create_slot_legacy_opt
+%type <list>	slot_name_list slot_name_list_opt
 
 %%
 
@@ -114,6 +116,7 @@ command:
 			| read_replication_slot
 			| timeline_history
 			| show
+			| list_dbid_for_logical_slots
 			;
 
 /*
@@ -126,6 +129,34 @@ identify_system:
 			}
 			;
 
+slot_name_list:
+			IDENT
+			{
+				$$ = list_make1($1);
+			}
+			| slot_name_list ',' IDENT
+			{
+				$$ = lappend($1, $3);
+			}
+			;
+
+slot_name_list_opt:
+			slot_name_list			{ $$ = $1; }
+			| /* EMPTY */			{ $$ = NIL; }
+			;
+
+/*
+ * LIST_DBID_FOR_LOGICAL_SLOTS
+ */
+list_dbid_for_logical_slots:
+			K_LIST_DBID_FOR_LOGICAL_SLOTS slot_name_list_opt
+			{
+				ListDBForLogicalSlotsCmd *cmd = makeNode(ListDBForLogicalSlotsCmd);
+				cmd->slot_names = $2;
+				$$ = (Node *) cmd;
+			}
+			;
+
 /*
  * READ_REPLICATION_SLOT %s
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 1cc7fb858c..d4ecce6a47 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -128,6 +128,7 @@ DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
 TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
 PHYSICAL			{ return K_PHYSICAL; }
 RESERVE_WAL			{ return K_RESERVE_WAL; }
+LIST_DBID_FOR_LOGICAL_SLOTS	{ return K_LIST_DBID_FOR_LOGICAL_SLOTS; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 TEMPORARY			{ return K_TEMPORARY; }
@@ -304,6 +305,7 @@ replication_scanner_is_replication_command(void)
 		case K_READ_REPLICATION_SLOT:
 		case K_TIMELINE_HISTORY:
 		case K_SHOW:
+		case K_LIST_DBID_FOR_LOGICAL_SLOTS:
 			/* Yes; push back the first token so we can parse later. */
 			repl_pushed_back_token = first_token;
 			return true;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 7370452d3b..364049b93f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -92,7 +92,7 @@ typedef struct ReplicationSlotOnDisk
 	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
 #define SLOT_MAGIC		0x1051CA1	/* format identifier */
-#define SLOT_VERSION	3		/* version for new files */
+#define SLOT_VERSION	4		/* version for new files */
 
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -317,6 +317,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->data.persistency = persistency;
 	slot->data.two_phase = two_phase;
 	slot->data.two_phase_at = InvalidXLogRecPtr;
+	slot->data.synced = false;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6035cf4816..a1bc090f58 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -225,6 +225,35 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/*
+ * SQL function for getting the invalidation cause of a slot.
+ *
+ * Returns the ReplicationSlotInvalidationCause enum value for a valid
+ * slot_name; returns NULL if a slot with the given name is not found.
+ *
+ * It returns RS_INVAL_NONE if the given slot is not invalidated.
+ */
+Datum
+pg_get_slot_invalidation_cause(PG_FUNCTION_ARGS)
+{
+	Name		name = PG_GETARG_NAME(0);
+	ReplicationSlotInvalidationCause cause = RS_INVAL_NONE;
+	bool		found = false;
+	int			slotno;
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (slotno = 0; slotno < max_replication_slots; slotno++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[slotno];
+
+		if (s->in_use &&
+			strcmp(NameStr(s->data.name), NameStr(*name)) == 0)
+		{
+			found = true;
+			cause = s->data.invalidated;
+			break;
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	if (!found)
+		PG_RETURN_NULL();
+
+	PG_RETURN_INT16(cause);
+}
+
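+/*
+ * Usage sketch (hypothetical slot name):
+ *
+ *     SELECT pg_get_slot_invalidation_cause('myslot');
+ *
+ * This is expected to return 0 (RS_INVAL_NONE) for a valid slot that is not
+ * invalidated, a non-zero cause for an invalidated slot, and NULL if no slot
+ * with that name exists.
+ */
+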
 /*
  * pg_get_replication_slots - SQL SRF showing all replication slots
  * that currently exist on the database cluster.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
old mode 100644
new mode 100755
index 0148b59ed7..b39a2f1078
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -473,6 +473,119 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+static int
+pg_qsort_namecmp(const void *a, const void *b)
+{
+	return strncmp(NameStr(*(Name) a), NameStr(*(Name) b), NAMEDATALEN);
+}
+
+/*
+ * Handle the LIST_DBID_FOR_LOGICAL_SLOTS command.
+ *
+ * Given the slot names, this function returns the list of database ids
+ * corresponding to those slots. The returned list has no duplicates.
+ */
+static void
+ListSlotDatabaseOIDs(ListDBForLogicalSlotsCmd *cmd)
+{
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	NameData   *slot_names = NULL;
+	int			numslot_names;
+	List	   *database_oids_list = NIL;
+	int			slotno;
+
+	numslot_names = list_length(cmd->slot_names);
+	if (numslot_names)
+	{
+		ListCell   *lc;
+		int			i = 0;
+
+		slot_names = palloc(numslot_names * sizeof(NameData));
+		foreach(lc, cmd->slot_names)
+		{
+			char	   *slot_name = lfirst(lc);
+
+			ReplicationSlotValidateName(slot_name, ERROR);
+			namestrcpy(&slot_names[i++], slot_name);
+		}
+
+		qsort(slot_names, numslot_names, sizeof(NameData), pg_qsort_namecmp);
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+
+	/* Need a tuple descriptor representing a single column */
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "database_oid",
+							  INT8OID, -1, 0);
+
+	/* Prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (slotno = 0; slotno < max_replication_slots; slotno++)
+	{
+		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+		Oid			datoid;		/* database OID of the current slot */
+		Datum		values[1];
+		bool		nulls[1];
+
+		if (!slot->in_use)
+			continue;
+
+		SpinLockAcquire(&slot->mutex);
+
+		datoid = slot->data.database;
+
+		SpinLockRelease(&slot->mutex);
+
+		/*
+		 * If slot names were provided and the current slot name is not in
+		 * the list, skip it.
+		 */
+		if (numslot_names &&
+			!bsearch((void *) &slot->data.name, (void *) slot_names,
+					 numslot_names, sizeof(NameData), pg_qsort_namecmp))
+			continue;
+
+		/*
+		 * If slot names were provided and the current slot is a physical
+		 * slot, error out. We are supposed to get only logical slots for
+		 * the sync-slot purpose.
+		 */
+		if (numslot_names && SlotIsPhysical(slot))
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("physical replication slot \"%s\" found in "
+							"synchronize_slot_names", NameStr(slot->data.name))));
+
+		/*
+		 * Check if the database OID is already in the list, and if so, skip
+		 * this slot.
+		 */
+		if (OidIsValid(datoid) && list_member_oid(database_oids_list, datoid))
+			continue;
+
+		/* Add the database OID to the list */
+		database_oids_list = lappend_oid(database_oids_list, datoid);
+
+		values[0] = Int64GetDatum(datoid);
+		nulls[0] = (datoid == InvalidOid);
+
+		/* Send it to dest */
+		do_tup_output(tstate, values, nulls);
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/* Clean up the list */
+	list_free(database_oids_list);
+
+	end_tup_output(tstate);
+}
+
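+/*
+ * Usage sketch: on a replication connection, the standby's launcher issues
+ * something like (hypothetical slot names):
+ *
+ *     LIST_DBID_FOR_LOGICAL_SLOTS slot_a, slot_b
+ *
+ * which returns one database_oid row per distinct database among the named
+ * logical slots.
+ */
+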
 /* Handle READ_REPLICATION_SLOT command */
 static void
 ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
@@ -1990,6 +2103,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ListDBForLogicalSlotsCmd:
+			cmdtag = "LIST_DBID_FOR_LOGICAL_SLOTS";
+			set_ps_display(cmdtag);
+			ListSlotDatabaseOIDs((ListDBForLogicalSlotsCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_StartReplicationCmd:
 			{
 				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 315a78cda9..fd9e73a49b 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -190,6 +190,8 @@ static const char *const BuiltinTrancheNames[] = {
 	"LogicalRepLauncherDSA",
 	/* LWTRANCHE_LAUNCHER_HASH: */
 	"LogicalRepLauncherHash",
+	/* LWTRANCHE_SLOTSYNC_DSA: */
+	"SlotSyncWorkerDSA",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index f72f2906ce..e62a3f1bc0 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -54,3 +54,4 @@ XactTruncationLock					44
 WrapLimitsVacuumLock				46
 NotifyQueueTailLock					47
 WaitEventExtensionLock				48
+SlotSyncWorkerLock					49
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index daf2d57d3d..adcf879040 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -53,6 +53,7 @@ LOGICAL_APPLY_MAIN	"Waiting in main loop of logical replication apply process."
 LOGICAL_LAUNCHER_MAIN	"Waiting in main loop of logical replication launcher process."
 LOGICAL_PARALLEL_APPLY_MAIN	"Waiting in main loop of logical replication parallel apply process."
 RECOVERY_WAL_STREAM	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
+REPL_SLOTSYNC_MAIN	"Waiting in main loop of slot-sync worker for synchronizing slots from the primary to a standby."
 SYSLOGGER_MAIN	"Waiting in main loop of syslogger process."
 WAL_RECEIVER_MAIN	"Waiting in main loop of WAL receiver process."
 WAL_SENDER_MAIN	"Waiting in main loop of WAL sender process."
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 864f80f328..4fae88af75 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -64,8 +64,11 @@ #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" +#include "replication/reorderbuffer.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/walreceiver.h" +#include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -3497,6 +3500,19 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_slotsync_workers", + PGC_POSTMASTER, + REPLICATION_STANDBY, + gettext_noop("Maximum number of slot synchronization workers " + "on a standby."), + NULL, + }, + &max_slotsync_workers, + 2, 0, MAX_SLOTSYNC_WORKER_LIMIT, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index f1c87bd2d6..4f0bc2df28 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -357,6 +357,8 @@ #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery #synchronize_slot_names = '' # replication slot names to synchronize from # primary to streaming replication standby server +#max_slotsync_workers = 2 # maximum number of slot synchronization workers + # on a standby # - Subscribers - diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9805bc6118..947785af43 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11074,6 +11074,10 @@ proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'name', prosrc => 'pg_drop_replication_slot' }, +{ oid => '6312', descr => 'what caused the replication slot to become invalid', + proname => 'pg_get_slot_invalidation_cause', provolatile => 's', proisstrict => 't', + prorettype => 'int2', proargtypes => 'name', + prosrc => 'pg_get_slot_invalidation_cause' }, { oid => '3781', descr => 'information about replication slots currently in use', proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 214dc6c29e..75b4b2040d 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -17,6 +17,7 @@ #include "catalog/objectaddress.h" #include "parser/parse_node.h" +#include "replication/walreceiver.h" extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel); @@ -28,4 +29,7 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); extern char defGetStreamingMode(DefElem *def); +extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, + char *slotname, bool missing_ok); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 4321ba8f86..bc9c1baea1 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd NodeTag type; } IdentifySystemCmd; +/* ------------------------------- + * LIST_DBID_FOR_LOGICAL_SLOTS command + * ------------------------------- + */ +typedef struct 
ListDBForLogicalSlotsCmd +{ + NodeTag type; + List *slot_names; +} ListDBForLogicalSlotsCmd; /* ---------------------- * BASE_BACKUP command diff --git a/src/include/postmaster/bgworker_internals.h b/src/include/postmaster/bgworker_internals.h index 4ad63fd9bd..cccc5da8b2 100644 --- a/src/include/postmaster/bgworker_internals.h +++ b/src/include/postmaster/bgworker_internals.h @@ -22,6 +22,7 @@ * Maximum possible value of parallel workers. */ #define MAX_PARALLEL_WORKER_LIMIT 1024 +#define MAX_SLOTSYNC_WORKER_LIMIT 50 /* * List of background workers, private to postmaster. diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index a07c9cb311..891ce08cf1 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -15,6 +15,8 @@ extern PGDLLIMPORT int max_logical_replication_workers; extern PGDLLIMPORT int max_sync_workers_per_subscription; extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription; +extern PGDLLIMPORT int max_slotsync_workers; + extern void ApplyLauncherRegister(void); extern void ApplyLauncherMain(Datum main_arg); @@ -31,4 +33,6 @@ extern bool IsLogicalLauncher(void); extern pid_t GetLeaderApplyWorkerPid(pid_t pid); +extern PGDLLIMPORT char *PrimaryConnInfo; + #endif /* LOGICALLAUNCHER_H */ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index bbd71d0b42..baad5a8f3a 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -19,6 +19,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); extern void TablesyncWorkerMain(Datum main_arg); +extern void ReplSlotSyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 38c0072043..33a92e4879 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -15,7 +15,6 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" -#include "replication/walreceiver.h" /* * Behaviour of replication slots, upon release or crash. @@ -111,6 +110,11 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + + /* + * Is this a slot created by a sync-slot worker? 
+ */ + bool synced; } ReplicationSlotPersistentData; /* @@ -251,7 +255,6 @@ extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_l extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot); -extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(bool is_shutdown); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 281626fa6f..6d41d59ea5 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -20,6 +20,7 @@ #include "pgtime.h" #include "port/atomics.h" #include "replication/logicalproto.h" +#include "replication/slot.h" #include "replication/walsender.h" #include "storage/condition_variable.h" #include "storage/latch.h" @@ -191,6 +192,16 @@ typedef struct } proto; } WalRcvStreamOptions; +/* + * Slot's DBid related data + */ +typedef struct WalRcvRepSlotDbData +{ + Oid database; /* Slot's DBid received from remote */ + TimestampTz last_launch_time; /* The last time we tried to launch sync + * worker for above Dbid */ +} WalRcvRepSlotDbData; + struct WalReceiverConn; typedef struct WalReceiverConn WalReceiverConn; @@ -280,6 +291,15 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, TimeLineID *primary_tli); +/* + * walrcv_get_dbinfo_for_logical_slots_fn + * + * Run LIST_DBID_FOR_LOGICAL_SLOTS on primary server to get the + * list of unique DBIDs for logical slots mentioned in 'slots' + */ +typedef List *(*walrcv_get_dbinfo_for_logical_slots_fn) (WalReceiverConn *conn, + const char *slots); + /* * walrcv_server_version_fn * @@ -393,6 +413,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; + walrcv_get_dbinfo_for_logical_slots_fn walrcv_get_dbinfo_for_logical_slots; walrcv_server_version_fn walrcv_server_version; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; walrcv_startstreaming_fn walrcv_startstreaming; @@ -417,6 +438,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_get_dbinfo_for_logical_slots(conn, slots) \ + WalReceiverFunctions->walrcv_get_dbinfo_for_logical_slots(conn, slots) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 8f4bed0958..11638d40ec 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -36,11 +36,9 @@ typedef enum LogicalRepWorkerType WORKERTYPE_PARALLEL_APPLY } LogicalRepWorkerType; -typedef struct LogicalRepWorker +/* Common data for Slotsync and LogicalRep workers */ +typedef struct WorkerHeader { - /* What type of worker is this? */ - LogicalRepWorkerType type; - /* Time at which this worker was launched. 
*/
 	TimestampTz launch_time;
 
@@ -56,8 +54,19 @@ typedef struct LogicalRepWorker
 	/* Database id to connect to. */
 	Oid			dbid;
 
-	/* User to use for connection (will be same as owner of subscription). */
+	/*
+	 * User to use for connection (will be same as owner of subscription in
+	 * case of LogicalRep worker).
+	 */
 	Oid			userid;
+} WorkerHeader;
+
+typedef struct LogicalRepWorker
+{
+	WorkerHeader hdr;
+
+	/* What type of worker is this? */
+	LogicalRepWorkerType type;
 
 	/* Subscription id for the worker. */
 	Oid			subid;
@@ -96,6 +105,40 @@ typedef struct LogicalRepWorker
 	TimestampTz reply_time;
 } LogicalRepWorker;
 
+/*
+ * Shared memory structure for the slot-sync worker. It is allocated by the
+ * logical replication launcher and then read by each slot-sync worker.
+ *
+ * It is protected by an LWLock (SlotSyncWorkerLock). Each slot-sync worker
+ * reading it needs to hold the lock in shared mode, while the logical
+ * replication launcher, which updates it, needs to hold it in exclusive
+ * mode.
+ */
+typedef struct SlotSyncWorker
+{
+	WorkerHeader hdr;
+
+	/* The slot in the worker pool to which the slot-sync worker is attached */
+	int			slot;
+
+	/* Count of dbids the slot-sync worker manages */
+	uint32		dbcount;
+
+	/* DSA for dbids */
+	dsa_area   *dbids_dsa;
+
+	/* dsa_pointer for the dbids the slot-sync worker manages */
+	dsa_pointer dbids_dp;
+
+	/* Info about the slot being monitored for the worker's naptime purpose */
+	struct SlotSyncWorkerWatchSlot
+	{
+		NameData	slot_name;
+		XLogRecPtr	confirmed_lsn;
+		TimestampTz last_update_time;
+	}			monitoring_info;
+
+} SlotSyncWorker;
+
 /*
  * State of the transaction in parallel apply worker.
  *
@@ -234,12 +277,14 @@ extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
 /* Worker and subscription objects. */
 extern PGDLLIMPORT Subscription *MySubscription;
 extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
+extern PGDLLIMPORT SlotSyncWorker *MySlotSyncWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
 extern void logicalrep_worker_attach(int slot);
+extern void slotsync_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
@@ -327,9 +372,9 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->in_use && \
+#define isParallelApplyWorker(worker) ((worker)->hdr.in_use && \
 									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
-#define isTablesyncWorker(worker) ((worker)->in_use && \
+#define isTablesyncWorker(worker) ((worker)->hdr.in_use && \
 								   (worker)->type == WORKERTYPE_TABLESYNC)
 
 static inline bool
@@ -341,14 +386,14 @@ am_tablesync_worker(void)
 static inline bool
 am_leader_apply_worker(void)
 {
-	Assert(MyLogicalRepWorker->in_use);
+	Assert(MyLogicalRepWorker->hdr.in_use);
 	return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
 }
 
 static inline bool
 am_parallel_apply_worker(void)
 {
-	Assert(MyLogicalRepWorker->in_use);
+	Assert(MyLogicalRepWorker->hdr.in_use);
 	return isParallelApplyWorker(MyLogicalRepWorker);
 }
 
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index d77410bdea..334315acba 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -207,6 +207,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PGSTATS_DATA,
 	LWTRANCHE_LAUNCHER_DSA,
 	LWTRANCHE_LAUNCHER_HASH,
+	LWTRANCHE_SLOTSYNC_DSA,
 	LWTRANCHE_FIRST_USER_DEFINED
 } BuiltinTrancheIds;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b5bbdd1608..7426c30f8c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1428,6 +1428,7 @@ LimitState LimitStateCond List ListCell +ListDBForLogicalSlotsCmd ListDictionary ListParsedLex ListenAction @@ -2305,6 +2306,7 @@ RelocationBufferInfo RelptrFreePageBtree RelptrFreePageManager RelptrFreePageSpanLeader +RemoteSlot RenameStmt ReopenPtrType ReorderBuffer @@ -2560,6 +2562,7 @@ SlabBlock SlabContext SlabSlot SlotNumber +SlotSyncWorker SlruCtl SlruCtlData SlruErrorCause @@ -3006,6 +3009,7 @@ WalLevel WalRcvData WalRcvExecResult WalRcvExecStatus +WalRcvRepSlotDbData WalRcvState WalRcvStreamOptions WalRcvWakeupReason @@ -3044,6 +3048,7 @@ WordEntryPosVector WordEntryPosVector1 WorkTableScan WorkTableScanState +WorkerHeader WorkerInfo WorkerInfoData WorkerInstrumentation -- 2.34.1
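
Usage sketch (standby side). The values below are hypothetical; only the
GUC names are the ones this patch adds or relies on:

    # postgresql.conf on the physical standby
    primary_conninfo = 'host=primary.example.com user=repl'
    primary_slot_name = 'sb1_slot'            # sync is skipped if unset
    synchronize_slot_names = 'slot_a,slot_b'  # or '*' for all logical slots
    max_slotsync_workers = 2                  # can only be set at server start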