Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication - Mailing list pgsql-hackers

From:           Peter Smith
Subject:        Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Msg-id:         CAHut+PvkwVfmuMsHmZgA_ZAgMP977_gbRG5TNUg1aUGBMvwapg@mail.gmail.com
In response to: RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication ("Hayato Kuroda (Fujitsu)" <kuroda.hayato@fujitsu.com>)
Responses:      RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication
List:           pgsql-hackers
Here are some review comments for patch v16-00003

======

1. Commit Message.

The patch description is missing.

======

2. General.

+LogicalRepSyncTableStart(XLogRecPtr *origin_startpos, int worker_slot)

and

+start_table_sync(XLogRecPtr *origin_startpos,
+                 char **myslotname,
+                 int worker_slot)

and

@@ -4548,12 +4552,13 @@ run_tablesync_worker(WalRcvStreamOptions *options,
                      char *slotname,
                      char *originname,
                      int originname_size,
-                     XLogRecPtr *origin_startpos)
+                     XLogRecPtr *origin_startpos,
+                     int worker_slot)

It seems the worker_slot is being passed all over the place as an
additional function argument so that it can be used to construct an
application_name. Is it possible/better to introduce a new
'MyLogicalRepWorker' field for the 'worker_slot' so it does not have
to be passed like this?
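For example, just an untested sketch of that idea (the field name
'worker_slot' and where it would be assigned are only placeholders):

src/include/replication/worker_internal.h:

typedef struct LogicalRepWorker
{
    ...

    /* Index of this worker's slot in the launcher's worker pool. */
    int         worker_slot;
} LogicalRepWorker;

Then tablesync.c could build the application_name without threading
an extra argument through every function:

    snprintf(application_name, NAMEDATALEN, "pg_%u_sync_%i",
             MySubscription->oid, MyLogicalRepWorker->worker_slot);

(The launcher would need to assign the field when it registers the
worker.)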
======
src/backend/replication/logical/tablesync.c

3.
+   /*
+    * Disconnect from publisher. Otherwise reused sync workers causes
+    * exceeding max_wal_senders.
+    */
+   if (LogRepWorkerWalRcvConn != NULL)
+   {
+       walrcv_disconnect(LogRepWorkerWalRcvConn);
+       LogRepWorkerWalRcvConn = NULL;
+   }
+

Why is this comment mentioning anything about "reused workers" at
all? The worker process exits in this function, right?

~~~

4. LogicalRepSyncTableStart

    /*
-    * Here we use the slot name instead of the subscription name as the
-    * application_name, so that it is different from the leader apply worker,
-    * so that synchronous replication can distinguish them.
+    * Connect to publisher if not yet. The application_name must be also
+    * different from the leader apply worker because synchronous replication
+    * must distinguish them.
     */

I felt all the details in the 2nd part of this comment belong inside
the condition, not outside.

SUGGESTION
/* Connect to the publisher if haven't done so already. */

~~~

5.
+   if (LogRepWorkerWalRcvConn == NULL)
+   {
+       char        application_name[NAMEDATALEN];
+
+       /*
+        * FIXME: set appropriate application_name. Previously, the slot name
+        * was used becasue the lifetime of the tablesync worker was same as
+        * that, but now the tablesync worker handles many slots during the
+        * synchronization so that it is not suitable. So what should be?
+        * Note that if the tablesync worker starts to reuse the replication
+        * slot during synchronization, we should use the slot name as
+        * application_name again.
+        */
+       snprintf(application_name, NAMEDATALEN, "pg_%u_sync_%i",
+                MySubscription->oid, worker_slot);
+       LogRepWorkerWalRcvConn =
+           walrcv_connect(MySubscription->conninfo, true,
+                          must_use_password,
+                          application_name, &err);
+   }

5a. /becasue/because/

~

5b. I am not sure what name this should ideally use, but anyway, for
uniqueness doesn't it still need to include the GetSystemIdentifier(),
the same as function ReplicationSlotNameForTablesync() was doing?

Maybe the same function ReplicationSlotNameForTablesync() can be
used, but just pass the worker_slot instead of the relid?

======
src/backend/replication/logical/worker.c

6. LogicalRepApplyLoop

    /*
     * Init the ApplyMessageContext which we clean up after each replication
-    * protocol message.
+    * protocol message, if needed.
     */
-   ApplyMessageContext = AllocSetContextCreate(ApplyContext,
-                                               "ApplyMessageContext",
-                                               ALLOCSET_DEFAULT_SIZES);
+   if (!ApplyMessageContext)
+       ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+                                                   "ApplyMessageContext",
+                                                   ALLOCSET_DEFAULT_SIZES);

Maybe slightly reword the comment.

BEFORE:
Init the ApplyMessageContext which we clean up after each replication
protocol message, if needed.

AFTER:
Init the ApplyMessageContext if needed. This context is cleaned up
after each replication protocol message.

======
src/backend/replication/walsender.c

7.
+           /*
+            * Initialize the flag again because this streaming may be
+            * second time.
+            */
+           streamingDoneSending = streamingDoneReceiving = false;

Isn't a 2nd time only possible because the "reuse tablesync worker"
logic might re-issue a START_REPLICATION again to the same WALSender?
So, should this flag reset ONLY be done for logical replication (the
'else' part), and otherwise be asserted false?

e.g. Would it be better to be like this?

if (cmd->kind == REPLICATION_KIND_PHYSICAL)
{
    Assert(!streamingDoneSending && !streamingDoneReceiving);
    StartReplication(cmd);
}
else
{
    /*
     * Reset flags because reusing tablesync workers can mean this is
     * the second time here.
     */
    streamingDoneSending = streamingDoneReceiving = false;
    StartLogicalReplication(cmd);
}

------
Kind Regards,
Peter Smith.
Fujitsu Australia