Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication - Mailing list pgsql-hackers
From | Peter Smith |
---|---|
Subject | Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication |
Date | |
Msg-id | CAHut+Pu1sCC9UqN+BBi4sn-ev=SY4feECwicfodMxT=YZoe7jw@mail.gmail.com Whole thread Raw |
In response to | Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication (Melih Mutlu <m.melihmutlu@gmail.com>) |
Responses |
Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
|
List | pgsql-hackers |
Some review comments for v21-0001 ====== src/backend/replication/logical/worker.c 1. InitializeLogRepWorker if (am_tablesync_worker()) ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", + (errmsg("logical replication worker for subscription \"%s\", table \"%s\" has started", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); I think this should not be changed. IIUC that decision for using the generic worker name for translations was only when the errmsg was in shared code where the worker type was not clear from existing conditions. See also previous review comments [1]. ~~~ 2. StartLogRepWorker /* Common function to start the leader apply or tablesync worker. */ void StartLogRepWorker(int worker_slot) { /* Attach to slot */ logicalrep_worker_attach(worker_slot); /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); pqsignal(SIGTERM, die); BackgroundWorkerUnblockSignals(); /* * We don't currently need any ResourceOwner in a walreceiver process, but * if we did, we could call CreateAuxProcessResourceOwner here. */ /* Initialise stats to a sanish value */ MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = MyLogicalRepWorker->reply_time = GetCurrentTimestamp(); /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); InitializeLogRepWorker(); /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); /* * Setup callback for syscache so that we know when something changes in * the subscription relation state. */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, invalidate_syncing_table_states, (Datum) 0); } ~ 2a. The function name seems a bit misleading because it is not really "starting" anything here - it is just more "initialization" code, right? Nor is it common to all kinds of LogRepWorker. Maybe the function could be named something else like 'InitApplyOrSyncWorker()'. -- see also #2c ~ 2b. Should this have Assert to ensure this is only called from leader apply or tablesync? -- see also #2c ~ 2c. IMO maybe the best/tidiest way to do this is not to introduce a new function at all. Instead, just put all this "common init" code into the existing "common init" function ('InitializeLogRepWorker') and execute it only if (am_tablesync_worker() || am_leader_apply_worker()) { }. ====== src/include/replication/worker_internal.h 3. extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern void set_stream_options(WalRcvStreamOptions *options, + char *slotname, + XLogRecPtr *origin_startpos); + +extern void start_apply(XLogRecPtr origin_startpos); +extern void DisableSubscriptionAndExit(void); +extern void StartLogRepWorker(int worker_slot); This placement (esp. with the missing whitespace) seems to be grouping the set_stream_options with the other 'pa' externs, which are all under the comment "/* Parallel apply worker setup and interactions */". Putting all these up near the other "extern void InitializeLogRepWorker(void)" might be less ambiguous. ------ [1] worker name in errmsg - https://www.postgresql.org/message-id/CAA4eK1%2B%2BwkxxMjsPh-z2aKa9ZjNhKsjv0Tnw%2BTVX-hCBkDHusw%40mail.gmail.com Kind Regards, Peter Smith. Fujitsu Australia
pgsql-hackers by date: