From 81c38df18c7038dcfcabcc396e484d16b4680d51 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Mon, 5 Jun 2023 15:04:41 +0300 Subject: [PATCH 1/4] Refactor to split Apply and Tablesync Workers Both apply and tablesync workers were using ApplyWorkerMain() as entry point. As the name implies, ApplyWorkerMain() should be considered as the main function for apply workers. Tablesync worker's path was hidden and does not have enough in common to share the same main function with apply worker. Also; most of the code shared by both worker types are already combined in LogicalRepApplyLoop(). There is no need to combine the rest in ApplyWorkerMain() anymore. This commit introduces TablesyncWorkerMain() as a new entry point for tablesync workers and separates both type of workers from each other. This aims to increase code readability and help to maintain logical replication workers separately. Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com --- src/backend/postmaster/bgworker.c | 3 + .../replication/logical/applyparallelworker.c | 2 +- src/backend/replication/logical/launcher.c | 25 +- src/backend/replication/logical/tablesync.c | 2 +- src/backend/replication/logical/worker.c | 381 +++++++++++------- src/include/replication/logicalworker.h | 1 + src/include/replication/worker_internal.h | 4 +- 7 files changed, 258 insertions(+), 160 deletions(-) diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 0dd22b2351..5609919edf 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -131,6 +131,9 @@ static const struct }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain + }, + { + "TablesyncWorkerMain", TablesyncWorkerMain } }; diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 82c1ddcdcb..f16e2377bf 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -942,7 +942,7 @@ ParallelApplyWorkerMain(Datum main_arg) MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = MyLogicalRepWorker->reply_time = 0; - InitializeApplyWorker(); + InitializeLogRepWorker(); InitializingApplyWorker = false; diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 87b5593d2d..c2bba3ba69 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -459,24 +459,27 @@ retry: snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); if (is_parallel_apply_worker) + { snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain"); - else - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); - - if (OidIsValid(relid)) snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication worker for subscription %u sync %u", subid, relid); - else if (is_parallel_apply_worker) + "logical replication parallel apply worker for subscription %u", subid); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication parallel apply worker for subscription %u", subid); + } + else if (OidIsValid(relid)) + { + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication tablesync worker for subscription %u sync %u", subid, relid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker"); + } else + { + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication apply worker for subscription %u", subid); - - if (is_parallel_apply_worker) - snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker"); - else - snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker"); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker"); + } bgw.bgw_restart_time = BGW_NEVER_RESTART; bgw.bgw_notify_pid = MyProcPid; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6d461654ab..8125bbd170 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -131,7 +131,7 @@ static StringInfo copybuf = NULL; /* * Exit routine for synchronization worker. */ -static void +void pg_attribute_noreturn() finish_sync_worker(void) { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 0ee764d68f..b979a755ae 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -392,6 +392,7 @@ static void stream_open_file(Oid subid, TransactionId xid, static void stream_write_change(char action, StringInfo s); static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s); static void stream_close_file(void); +static void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); @@ -4330,6 +4331,69 @@ stream_open_and_write_change(TransactionId xid, char action, StringInfo s) stream_stop_internal(xid); } + /* set_stream_options + * Set logical replication streaming options. + * + * This function sets streaming options including replication slot name + * and origin start position. Workers need these options for logical replication. + */ +static void +set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos) +{ + int server_version; + + options->logical = true; + options->startpoint = *origin_startpos; + options->slotname = slotname; + + server_version = walrcv_server_version(LogRepWorkerWalRcvConn); + options->proto.logical.proto_version = + server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM : + server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : + server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : + LOGICALREP_PROTO_VERSION_NUM; + + options->proto.logical.publication_names = MySubscription->publications; + options->proto.logical.binary = MySubscription->binary; + options->proto.logical.twophase = false; + options->proto.logical.origin = pstrdup(MySubscription->origin); + + /* + * Assign the appropriate option value for streaming option according to + * the 'streaming' mode and the publisher's ability to support that mode. + */ + if (server_version >= 160000 && + MySubscription->stream == LOGICALREP_STREAM_PARALLEL) + { + options->proto.logical.streaming_str = "parallel"; + MyLogicalRepWorker->parallel_apply = true; + } + else if (server_version >= 140000 && + MySubscription->stream != LOGICALREP_STREAM_OFF) + { + options->proto.logical.streaming_str = "on"; + MyLogicalRepWorker->parallel_apply = false; + } + else + { + options->proto.logical.streaming_str = NULL; + MyLogicalRepWorker->parallel_apply = false; + } + + /* + * Even when the two_phase mode is requested by the user, it remains as + * the tri-state PENDING until all tablesyncs have reached READY state. + * Only then, can it become ENABLED. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + options->proto.logical.twophase = true; +} + /* * Cleanup the memory for subxacts and reset the related variables. */ @@ -4442,13 +4506,134 @@ start_apply(XLogRecPtr origin_startpos) } /* - * Common initialization for leader apply worker and parallel apply worker. + * Runs the tablesync worker. + * It starts syncing tables. After a successful sync, + * sets streaming options and starts streaming to catchup. + */ +static void +run_tablesync_worker(WalRcvStreamOptions *options, + char *slotname, + char *originname, + int originname_size, + XLogRecPtr *origin_startpos) +{ + /* Start table synchronization. */ + start_table_sync(origin_startpos, &slotname); + + ReplicationOriginNameForLogicalRep(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + originname_size); + + set_apply_error_context_origin(originname); + + set_stream_options(options, slotname, origin_startpos); + + walrcv_startstreaming(LogRepWorkerWalRcvConn, options); + + /* Start applying changes to catchup. */ + start_apply(*origin_startpos); +} + +/* + * Runs the leader apply worker. + * It sets up replication origin, streaming options + * and then starts streaming. + */ +static void +run_apply_worker(WalRcvStreamOptions *options, + char *slotname, + char *originname, + int originname_size, + XLogRecPtr *origin_startpos) +{ + RepOriginId originid; + TimeLineID startpointTLI; + char *err; + bool must_use_password; + + slotname = MySubscription->slotname; + + /* + * This shouldn't happen if the subscription is enabled, but guard + * against DDL bugs or manual catalog changes. (libpqwalreceiver will + * crash if slot is NULL.) + */ + if (!slotname) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("subscription has no replication slot set"))); + + ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, + originname, originname_size); + + /* Setup replication origin tracking. */ + StartTransactionCommand(); + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + originid = replorigin_create(originname); + replorigin_session_setup(originid, 0); + replorigin_session_origin = originid; + *origin_startpos = replorigin_session_get_progress(false); + + /* Is the use of a password mandatory? */ + must_use_password = MySubscription->passwordrequired && + !superuser_arg(MySubscription->owner); + + /* Note that the superuser_arg call can access the DB */ + CommitTransactionCommand(); + + LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, + must_use_password, + MySubscription->name, &err); + + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + /* + * We don't really use the output identify_system for anything but it + * does some initializations on the upstream so let's still call it. + */ + (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + + set_apply_error_context_origin(originname); + + set_stream_options(options, slotname, origin_startpos); + + walrcv_startstreaming(LogRepWorkerWalRcvConn, options); + + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + StartTransactionCommand(); + UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); + MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + CommitTransactionCommand(); + } + + ereport(DEBUG1, + (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s", + MySubscription->name, + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : + "?"))); + + /* Run the main loop. */ + start_apply(*origin_startpos); +} + +/* + * Common initialization for logical replication workers; leader apply worker, + * parallel apply worker and tablesync worker. * * Initialize the database connection, in-memory subscription and necessary * config options. */ void -InitializeApplyWorker(void) +InitializeLogRepWorker(void) { MemoryContext oldctx; @@ -4512,7 +4697,8 @@ InitializeApplyWorker(void) if (am_tablesync_worker()) ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", + (errmsg("%s for subscription \"%s\", table \"%s\" has started", + get_worker_name(), MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); else @@ -4533,7 +4719,6 @@ ApplyWorkerMain(Datum main_arg) XLogRecPtr origin_startpos = InvalidXLogRecPtr; char *myslotname = NULL; WalRcvStreamOptions options; - int server_version; InitializingApplyWorker = true; @@ -4557,7 +4742,7 @@ ApplyWorkerMain(Datum main_arg) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - InitializeApplyWorker(); + InitializeLogRepWorker(); InitializingApplyWorker = false; @@ -4565,165 +4750,69 @@ ApplyWorkerMain(Datum main_arg) elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); - if (am_tablesync_worker()) - { - start_table_sync(&origin_startpos, &myslotname); - - ReplicationOriginNameForLogicalRep(MySubscription->oid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); - set_apply_error_context_origin(originname); - } - else - { - /* This is the leader apply worker */ - RepOriginId originid; - TimeLineID startpointTLI; - char *err; - bool must_use_password; - - myslotname = MySubscription->slotname; - - /* - * This shouldn't happen if the subscription is enabled, but guard - * against DDL bugs or manual catalog changes. (libpqwalreceiver will - * crash if slot is NULL.) - */ - if (!myslotname) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("subscription has no replication slot set"))); - - /* Setup replication origin tracking. */ - StartTransactionCommand(); - ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, - originname, sizeof(originname)); - originid = replorigin_by_name(originname, true); - if (!OidIsValid(originid)) - originid = replorigin_create(originname); - replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; - origin_startpos = replorigin_session_get_progress(false); - - /* Is the use of a password mandatory? */ - must_use_password = MySubscription->passwordrequired && - !superuser_arg(MySubscription->owner); - - /* Note that the superuser_arg call can access the DB */ - CommitTransactionCommand(); - - LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, - must_use_password, - MySubscription->name, &err); - if (LogRepWorkerWalRcvConn == NULL) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not connect to the publisher: %s", err))); - - /* - * We don't really use the output identify_system for anything but it - * does some initializations on the upstream so let's still call it. - */ - (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); - - set_apply_error_context_origin(originname); - } - /* * Setup callback for syscache so that we know when something changes in - * the subscription relation state. + * the subscription relation state. Do this outside the loop to avoid + * exceeding MAX_SYSCACHE_CALLBACKS */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, invalidate_syncing_table_states, (Datum) 0); - /* Build logical replication streaming options. */ - options.logical = true; - options.startpoint = origin_startpos; - options.slotname = myslotname; + /* This is leader apply worker */ + run_apply_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos); - server_version = walrcv_server_version(LogRepWorkerWalRcvConn); - options.proto.logical.proto_version = - server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM : - server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : - server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : - LOGICALREP_PROTO_VERSION_NUM; + proc_exit(0); +} - options.proto.logical.publication_names = MySubscription->publications; - options.proto.logical.binary = MySubscription->binary; +/* Logical Replication Tablesync worker entry point */ +void +TablesyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + char originname[NAMEDATALEN]; + XLogRecPtr origin_startpos = InvalidXLogRecPtr; + char *myslotname = NULL; + WalRcvStreamOptions options; + + /* Attach to slot */ + logicalrep_worker_attach(worker_slot); + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); /* - * Assign the appropriate option value for streaming option according to - * the 'streaming' mode and the publisher's ability to support that mode. + * We don't currently need any ResourceOwner in a walreceiver process, but + * if we did, we could call CreateAuxProcessResourceOwner here. */ - if (server_version >= 160000 && - MySubscription->stream == LOGICALREP_STREAM_PARALLEL) - { - options.proto.logical.streaming_str = "parallel"; - MyLogicalRepWorker->parallel_apply = true; - } - else if (server_version >= 140000 && - MySubscription->stream != LOGICALREP_STREAM_OFF) - { - options.proto.logical.streaming_str = "on"; - MyLogicalRepWorker->parallel_apply = false; - } - else - { - options.proto.logical.streaming_str = NULL; - MyLogicalRepWorker->parallel_apply = false; - } - options.proto.logical.twophase = false; - options.proto.logical.origin = pstrdup(MySubscription->origin); + /* Initialise stats to a sanish value */ + MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = + MyLogicalRepWorker->reply_time = GetCurrentTimestamp(); - if (!am_tablesync_worker()) - { - /* - * Even when the two_phase mode is requested by the user, it remains - * as the tri-state PENDING until all tablesyncs have reached READY - * state. Only then, can it become ENABLED. - * - * Note: If the subscription has no tables then leave the state as - * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to - * work. - */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && - AllTablesyncsReady()) - { - /* Start streaming with two_phase enabled */ - options.proto.logical.twophase = true; - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); - StartTransactionCommand(); - UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); - MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; - CommitTransactionCommand(); - } - else - { - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); - } + InitializeLogRepWorker(); - ereport(DEBUG1, - (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s", - MySubscription->name, - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : - "?"))); - } - else - { - /* Start normal logical streaming replication. */ - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); - } + /* Connect to the origin and start the replication. */ + elog(DEBUG1, "connecting to publisher using connection string \"%s\"", + MySubscription->conninfo); - /* Run the main loop. */ - start_apply(origin_startpos); + /* + * Setup callback for syscache so that we know when something changes in + * the subscription relation state. Do this outside the loop to avoid + * exceeding MAX_SYSCACHE_CALLBACKS + */ + CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, + invalidate_syncing_table_states, + (Datum) 0); - proc_exit(0); + run_tablesync_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos); + + finish_sync_worker(); } /* diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 39588da79f..bbd71d0b42 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -18,6 +18,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); +extern void TablesyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 343e781896..7aba034774 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -265,7 +265,7 @@ extern void maybe_reread_subscription(void); extern void stream_cleanup_files(Oid subid, TransactionId xid); -extern void InitializeApplyWorker(void); +extern void InitializeLogRepWorker(void); extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn); @@ -307,6 +307,8 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid) +extern void finish_sync_worker(void); + static inline bool am_tablesync_worker(void) { -- 2.25.1