From f314184765c5180b26cea8ffba2038941e4b199c Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Thu, 15 Jun 2023 12:19:49 +1000 Subject: [PATCH v1] Consistent naming of LR workers --- .../replication/logical/applyparallelworker.c | 34 +++++++++++++++------- src/backend/replication/logical/launcher.c | 6 ++-- src/backend/replication/logical/tablesync.c | 8 +++-- src/backend/replication/logical/worker.c | 27 +++++++++++------ src/include/replication/worker_internal.h | 4 +++ 5 files changed, 54 insertions(+), 25 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 82c1ddc..979b1d4 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -716,7 +716,9 @@ ProcessParallelApplyInterrupts(void) if (ShutdownRequestPending) { ereport(LOG, - (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished", + /* translator: first %s is the name of logical replication worker */ + (errmsg("%s for subscription \"%s\" has finished", + LR_WORKER_NAME_APPLY_PARALLEL, MySubscription->name))); proc_exit(0); @@ -821,8 +823,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) Assert(shmq_res == SHM_MQ_DETACHED); ereport(ERROR, + /* translator: first %s is the name of logical replication worker */ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("lost connection to the logical replication apply worker"))); + errmsg("lost connection to the %s", LR_WORKER_NAME_APPLY))); } MemoryContextReset(ApplyMessageContext); @@ -1024,9 +1027,9 @@ HandleParallelApplyMessage(StringInfo msg) */ if (edata.context) edata.context = psprintf("%s\n%s", edata.context, - _("logical replication parallel apply worker")); + LR_WORKER_NAME_APPLY_PARALLEL); else - edata.context = pstrdup(_("logical replication parallel apply worker")); + edata.context = pstrdup(LR_WORKER_NAME_APPLY_PARALLEL); /* * Context beyond that should use the error context callbacks @@ -1040,7 +1043,8 @@ HandleParallelApplyMessage(StringInfo msg) */ ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication parallel apply worker exited due to error"), + /* translator: first %s is the name of logical replication worker */ + errmsg("%s exited due to error", LR_WORKER_NAME_APPLY_PARALLEL), errcontext("%s", edata.context))); } @@ -1054,7 +1058,9 @@ HandleParallelApplyMessage(StringInfo msg) break; default: - elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)", + /* translator: first %s is the name of logical replication worker */ + elog(ERROR, "unrecognized message type received %s: %c (message length %d bytes)", + LR_WORKER_NAME_APPLY_PARALLEL, msgtype, msg->len); } } @@ -1126,8 +1132,9 @@ HandleParallelApplyMessages(void) } else ereport(ERROR, + /* translator: first %s is the name of logical replication worker */ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("lost connection to the logical replication parallel apply worker"))); + errmsg("lost connection to the %s", LR_WORKER_NAME_APPLY_PARALLEL))); } MemoryContextSwitchTo(oldcontext); @@ -1215,7 +1222,9 @@ pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked) { ereport(LOG, - (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file", + /* translator: first %s is the name of logical replication worker */ + (errmsg("%s will serialize the remaining changes of remote transaction %u to a file", + LR_WORKER_NAME_APPLY, winfo->shared->xid))); /* @@ -1299,8 +1308,9 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo) */ if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED) ereport(ERROR, + /* translator: first %s is the name of logical replication worker */ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("lost connection to the logical replication parallel apply worker"))); + errmsg("lost connection to the %s", LR_WORKER_NAME_APPLY_PARALLEL))); } /* @@ -1373,7 +1383,8 @@ pa_start_subtrans(TransactionId current_xid, TransactionId top_xid) pa_savepoint_name(MySubscription->oid, current_xid, spname, sizeof(spname)); - elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname); + /* translator: second %s is the name of logical replication worker */ + elog(DEBUG1, "defining savepoint %s in %s", LR_WORKER_NAME_APPLY_PARALLEL, spname); /* We must be in transaction block to define the SAVEPOINT. */ if (!IsTransactionBlock()) @@ -1468,7 +1479,8 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname)); - elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname); + /* translator: second %s is the name of logical replication worker */ + elog(DEBUG1, "rolling back to savepoint %s in %s", LR_WORKER_NAME_APPLY_PARALLEL, spname); /* * Search the subxactlist, determine the offset tracked for the diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 87b5593..ee47793 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -465,13 +465,13 @@ retry: if (OidIsValid(relid)) snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication worker for subscription %u sync %u", subid, relid); + "%s for subscription %u sync %u", LR_WORKER_NAME_TABLESYNC, subid, relid); else if (is_parallel_apply_worker) snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication parallel apply worker for subscription %u", subid); + "%s for subscription %u", LR_WORKER_NAME_APPLY_PARALLEL, subid); else snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication apply worker for subscription %u", subid); + "%s for subscription %u", LR_WORKER_NAME_APPLY, subid); if (is_parallel_apply_worker) snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker"); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index abae8d4..a410520 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -150,7 +150,9 @@ finish_sync_worker(void) StartTransactionCommand(); ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + /* translator: first %s is the name of logical replication worker */ + (errmsg("%s for subscription \"%s\", table \"%s\" has finished", + LR_WORKER_NAME_TABLESYNC, MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); CommitTransactionCommand(); @@ -619,7 +621,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (AllTablesyncsReady()) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", + /* translator: first %s is the name of logical replication worker */ + (errmsg("%s for subscription \"%s\" will restart so that two_phase can be enabled", + LR_WORKER_NAME_APPLY, MySubscription->name))); should_exit = true; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 0ee764d..b8c9eba 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -442,11 +442,11 @@ static const char * get_worker_name(void) { if (am_tablesync_worker()) - return _("logical replication table synchronization worker"); + return LR_WORKER_NAME_TABLESYNC; else if (am_parallel_apply_worker()) - return _("logical replication parallel apply worker"); + return LR_WORKER_NAME_APPLY_PARALLEL; else - return _("logical replication apply worker"); + return LR_WORKER_NAME_APPLY; } /* @@ -509,7 +509,9 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) rel->state != SUBREL_STATE_UNKNOWN) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication parallel apply worker for subscription \"%s\" will stop", + /* translator: first %s is the name of logical replication worker */ + errmsg("%s for subscription \"%s\" will stop", + LR_WORKER_NAME_APPLY_PARALLEL, MySubscription->name), errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized."))); @@ -1071,7 +1073,8 @@ apply_handle_begin_prepare(StringInfo s) if (am_tablesync_worker()) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); + errmsg_internal("%s received a BEGIN PREPARE message", + LR_WORKER_NAME_TABLESYNC))); /* There must not be an active streaming transaction. */ Assert(!TransactionIdIsValid(stream_xid)); @@ -1310,7 +1313,8 @@ apply_handle_stream_prepare(StringInfo s) if (am_tablesync_worker()) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("tablesync worker received a STREAM PREPARE message"))); + errmsg_internal("%s received a STREAM PREPARE message", + LR_WORKER_NAME_TABLESYNC))); logicalrep_read_stream_prepare(s, &prepare_data); set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn); @@ -3950,7 +3954,9 @@ maybe_reread_subscription(void) { if (am_parallel_apply_worker()) ereport(LOG, - (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change", + /* translator: first %s is the name of logical replication worker */ + (errmsg("%s for subscription \"%s\" will stop because of a parameter change", + LR_WORKER_NAME_APPLY_PARALLEL, MySubscription->name))); else ereport(LOG, @@ -4512,7 +4518,9 @@ InitializeApplyWorker(void) if (am_tablesync_worker()) ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", + /* translator: first %s is the name of logical replication worker */ + (errmsg("%s for subscription \"%s\", table \"%s\" has started", + LR_WORKER_NAME_TABLESYNC, MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); else @@ -4707,7 +4715,8 @@ ApplyWorkerMain(Datum main_arg) } ereport(DEBUG1, - (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s", + (errmsg_internal("%s for subscription \"%s\" two_phase is %s", + LR_WORKER_NAME_APPLY, MySubscription->name, MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 343e781..aeb26dd 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -26,6 +26,10 @@ #include "storage/shm_toc.h" #include "storage/spin.h" +/* Names for the different kinds of logical replication workers. */ +#define LR_WORKER_NAME_TABLESYNC _("logical replication table synchronization worker") +#define LR_WORKER_NAME_APPLY _("logical replication apply worker") +#define LR_WORKER_NAME_APPLY_PARALLEL _("logical replication parallel apply worker") typedef struct LogicalRepWorker { -- 1.8.3.1