From dd4e48c74e1898886ba85a93160c75cf4b953d47 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 1 Dec 2025 10:37:27 +0900 Subject: [PATCH v14 1/9] Introduce internal messages to track dependencies This patch introduces a new set of message WorkerInternalMsgType. These types of messages are generated by a leader worker and sent to parallel apply workers based on the needs. For now, two types of messages exist: WORKER_INTERNAL_MSG_DEPENDENCY and WORKER_INTERNAL_MSG_RELATION. WORKER_INTERNAL_MSG_DEPENDENCY ensures that dependent transactions are committed in the correct order. It has a list of transaction IDs that parallel workers must wait for. The message type would be generated when the leader detects a dependency between the current and other transactions, or just before the COMMIT message. The latter one is used to preserve the commit ordering between the publisher and the subscriber. WORKER_INTERNAL_MSG_RELATION is used to synchronize the relation information between the leader and parallel workers. It has a list of relations that the leader already knows, and parallel workers also update the relmap in response to the message. This type of message is generated when the leader allocates a new parallel worker to the transaction, or when the publisher sends additional RELATION messages. Author: Zhijie Hou Author: Hayato Kuroda --- .../replication/logical/applyparallelworker.c | 128 +++++++++++++++--- src/backend/replication/logical/worker.c | 24 +++- src/include/replication/logicalproto.h | 12 ++ src/include/replication/worker_internal.h | 10 ++ src/tools/pgindent/typedefs.list | 1 + 5 files changed, 157 insertions(+), 18 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 798e8d85b3e..2432bb44655 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -733,6 +733,73 @@ ProcessParallelApplyInterrupts(void) } } +/* + * Handle internal dependency information. + * + * Wait for all transactions listed in the message to commit. + */ +static void +apply_handle_internal_dependency(StringInfo s) +{ + int nxids = pq_getmsgint(s, 4); + + for (int i = 0; i < nxids; i++) + { + TransactionId xid = pq_getmsgint(s, 4); + + pa_wait_for_depended_transaction(xid); + } +} + +/* + * Handle internal relation information. + * + * Update all relation details in the relation map cache. + */ +static void +apply_handle_internal_relation(StringInfo s) +{ + int num_rels; + + num_rels = pq_getmsgint(s, 4); + + for (int i = 0; i < num_rels; i++) + { + LogicalRepRelation *rel = logicalrep_read_rel(s); + + logicalrep_relmap_update(rel); + + /* Also reset all entries in the partition map that refer to remoterel. */ + logicalrep_partmap_reset_relmap(rel); + + elog(DEBUG1, "parallel apply worker worker init relmap for %s", + rel->relname); + } +} + +/* + * Handle an internal message generated by the leader apply worker. + */ +void +apply_handle_internal_message(StringInfo s) +{ + WorkerInternalMsgType action = pq_getmsgbyte(s); + + switch (action) + { + case WORKER_INTERNAL_MSG_DEPENDENCY: + apply_handle_internal_dependency(s); + break; + case WORKER_INTERNAL_MSG_RELATION: + apply_handle_internal_relation(s); + break; + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid worker internal message type \"??? (%d)\"", action))); + } +} + /* Parallel apply worker main loop. */ static void LogicalParallelApplyLoop(shm_mq_handle *mqh) @@ -779,26 +846,37 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) initReadOnlyStringInfo(&s, data, len); - /* - * The first byte of messages sent from leader apply worker to - * parallel apply workers can only be PqReplMsg_WALData. - */ c = pq_getmsgbyte(&s); - if (c != PqReplMsg_WALData) - elog(ERROR, "unexpected message \"%c\"", c); - /* - * Ignore statistics fields that have been updated by the leader - * apply worker. - * - * XXX We can avoid sending the statistics fields from the leader - * apply worker but for that, it needs to rebuild the entire - * message by removing these fields which could be more work than - * simply ignoring these fields in the parallel apply worker. - */ - s.cursor += SIZE_STATS_MESSAGE; + if (c == PqReplMsg_WALData) + { + /* + * Ignore statistics fields that have been updated by the + * leader apply worker. + * + * XXX We can avoid sending the statistics fields from the + * leader apply worker but for that, it needs to rebuild the + * entire message by removing these fields which could be more + * work than simply ignoring these fields in the parallel apply + * worker. + */ + s.cursor += SIZE_STATS_MESSAGE; - apply_dispatch(&s); + apply_dispatch(&s); + } + else if (c == PARALLEL_APPLY_INTERNAL_MESSAGE) + { + /* Handle the internal message. */ + apply_handle_internal_message(&s); + } + else + { + /* + * The first byte of messages sent from leader apply worker to + * parallel apply workers can only be 'w' or 'i'. + */ + elog(ERROR, "unexpected message \"%c\"", c); + } } else if (shmq_res == SHM_MQ_WOULD_BLOCK) { @@ -1647,3 +1725,19 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) pa_free_worker(winfo); } + +/* + * Wait for the given transaction to finish. + */ +void +pa_wait_for_depended_transaction(TransactionId xid) +{ + elog(DEBUG1, "wait for depended xid %u", xid); + + for (;;) + { + /* XXX wait until given transaction is finished */ + } + + elog(DEBUG1, "finish waiting for depended xid %u", xid); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b38170f0fbe..ba09822820d 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2260,6 +2260,25 @@ ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, path); } +/* + * Check if the given message is an internal message for the parallel apply + * worker. + */ +static bool +is_worker_internal_message(StringInfo s) +{ + int msgtype; + + msgtype = pq_getmsgbyte(s); + + if (msgtype == PARALLEL_APPLY_INTERNAL_MESSAGE) + return true; + + /* Reset the cursor so other consumers can identify the type */ + s->cursor = 0; + return false; +} + /* * Common spoolfile processing. */ @@ -2357,7 +2376,10 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, /* Ensure we are reading the data into our memory context. */ oldcxt = MemoryContextSwitchTo(ApplyMessageContext); - apply_dispatch(&s2); + if (is_worker_internal_message(&s2)) + apply_handle_internal_message(&s2); + else + apply_dispatch(&s2); MemoryContextReset(ApplyMessageContext); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 058a955e20c..9c5530804c8 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -77,6 +77,18 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_STREAM_PREPARE = 'p', } LogicalRepMsgType; +/* + * Worker internal message types + * + * This type of messages would be generated by leader apply worker and sent to + * the parallel apply worker. + */ +typedef enum WorkerInternalMsgType +{ + WORKER_INTERNAL_MSG_DEPENDENCY = 'd', + WORKER_INTERNAL_MSG_RELATION = 'i', +} WorkerInternalMsgType; + /* * This struct stores a tuple received via logical replication. * Keep in mind that the columns correspond to the *remote* table. diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 745b7d9e969..21562195e19 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -332,6 +332,8 @@ extern void pa_allocate_worker(TransactionId xid); extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid); extern void pa_detach_all_error_mq(void); +extern void apply_handle_internal_message(StringInfo s); + extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data); extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, @@ -359,6 +361,8 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern void pa_wait_for_depended_transaction(TransactionId xid); + #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isTableSyncWorker(worker) ((worker)->in_use && \ @@ -366,6 +370,12 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, #define isSequenceSyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_SEQUENCESYNC) +/* + * The message code for internal messages sent by the leader apply worker to the + * parallel apply worker. + */ +#define PARALLEL_APPLY_INTERNAL_MESSAGE 'i' + static inline bool am_tablesync_worker(void) { diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ea95e7984bc..86930f93cad 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3460,6 +3460,7 @@ WorkTableScan WorkTableScanState WorkerInfo WorkerInfoData +WorkerInternalMsgType WorkerJobDumpPtrType WorkerJobRestorePtrType WorkerNodeInstrumentation -- 2.43.7