From 2d9d0d564e022670106f8f00abd77c4d86474b20 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Tue, 4 Jul 2023 22:13:52 +0300 Subject: [PATCH v17 3/5] Reuse connection when tablesync workers change the target Previously tablesync workers establish new connections when it changes the syncing table, but this might have additional overhead. This patch allows to reuse connections instead. As for the publisher node, this patch allows to reuse logical walsender processes after the streaming is done once. --- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/tablesync.c | 56 ++++++++++++++------- src/backend/replication/logical/worker.c | 21 ++++---- src/backend/replication/walsender.c | 8 +++ src/include/replication/worker_internal.h | 3 ++ 5 files changed, 61 insertions(+), 28 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 72e5ef8a78..945619b603 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -441,6 +441,7 @@ retry: worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; worker->is_sync_completed = false; + worker->worker_slot = slot; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 605c5bd4ec..d455d97f2f 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -144,16 +144,6 @@ clean_sync_worker(void) pgstat_report_stat(true); } - /* - * Disconnect from publisher. Otherwise reused sync workers causes - * exceeding max_wal_senders - */ - if (LogRepWorkerWalRcvConn != NULL) - { - walrcv_disconnect(LogRepWorkerWalRcvConn); - LogRepWorkerWalRcvConn = NULL; - } - /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); } @@ -1259,6 +1249,24 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid, relid, GetSystemIdentifier()); } +/* + * Determine the application_name for tablesync workers. + * + * FIXME: set appropriate application_name. Previously, the slot name was used + * because the lifetime of the tablesync worker was same as that, but now the + * tablesync worker handles many slots during the synchronization so that it is + * not suitable. So what should be? Note that if the tablesync worker starts to + * reuse the replication slot during synchronization, we should use the slot + * name as application_name again. + */ +static void +ApplicationNameForTablesync(Oid suboid, int worker_slot, + char *application_name, Size szapp) +{ + snprintf(application_name, szapp, "pg_%u_sync_%i_" UINT64_FORMAT, suboid, + worker_slot, GetSystemIdentifier()); +} + /* * Start syncing the table in the sync worker. * @@ -1320,15 +1328,25 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) slotname, NAMEDATALEN); - /* - * Here we use the slot name instead of the subscription name as the - * application_name, so that it is different from the leader apply worker, - * so that synchronous replication can distinguish them. - */ - LogRepWorkerWalRcvConn = - walrcv_connect(MySubscription->conninfo, true, - must_use_password, - slotname, &err); + /* Connect to the publisher if haven't done so already. */ + if (LogRepWorkerWalRcvConn == NULL) + { + char application_name[NAMEDATALEN]; + + /* + * The application_name must be also different from the leader apply + * worker because synchronous replication must distinguish them. + */ + ApplicationNameForTablesync(MySubscription->oid, + MyLogicalRepWorker->worker_slot, + application_name, + NAMEDATALEN); + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, + must_use_password, + application_name, &err); + } + if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index eae561db05..4eb67ebd26 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3499,20 +3499,22 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ErrorContextCallback errcallback; /* - * Init the ApplyMessageContext which we clean up after each replication - * protocol message. + * Init the ApplyMessageContext if needed. This context is cleaned up + * after each replication protocol message. */ - ApplyMessageContext = AllocSetContextCreate(ApplyContext, - "ApplyMessageContext", - ALLOCSET_DEFAULT_SIZES); + if (!ApplyMessageContext) + ApplyMessageContext = AllocSetContextCreate(ApplyContext, + "ApplyMessageContext", + ALLOCSET_DEFAULT_SIZES); /* * This memory context is used for per-stream data when the streaming mode * is enabled. This context is reset on each stream stop. */ - LogicalStreamingContext = AllocSetContextCreate(ApplyContext, - "LogicalStreamingContext", - ALLOCSET_DEFAULT_SIZES); + if (!LogicalStreamingContext) + LogicalStreamingContext = AllocSetContextCreate(ApplyContext, + "LogicalStreamingContext", + ALLOCSET_DEFAULT_SIZES); /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -4468,7 +4470,8 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid) * are not repeatable. */ static void -start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) +start_table_sync(XLogRecPtr *origin_startpos, + char **myslotname) { char *syncslotname = NULL; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d27ef2985d..a6de2de209 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1827,10 +1827,18 @@ exec_replication_command(const char *cmd_string) set_ps_display(cmdtag); PreventInTransactionBlock(true, cmdtag); + if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else + { + /* + * Reset flags because reusing tablesync workers can mean + * this is the second time here. + */ + streamingDoneSending = streamingDoneReceiving = false; StartLogicalReplication(cmd); + } /* dupe, but necessary per libpqrcv_endstreaming */ EndReplicationCommand(cmdtag); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 1e9f8e6e72..32783a8cdd 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -62,6 +62,9 @@ typedef struct LogicalRepWorker */ bool is_sync_completed; + /* Indicates the slot number which corresponds to this LogicalRepWorker. */ + int worker_slot; + /* * Used to create the changes and subxact files for the streaming * transactions. Upon the arrival of the first streaming transaction or -- 2.27.0