From 6b1fe98c047ceab654933ef0d6f2eeded5c74276 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 1 Dec 2025 10:37:27 +0900 Subject: [PATCH v17 01/10] Introduce internal messages to track dependencies This patch introduces a new set of internal worker message types (see PAWorkerMsgType). These types of messages are generated by a leader worker and sent to parallel apply workers based on the needs. For now, two types of messages exist: PA_MSG_XACT_DEPENDENCY and PA_MSG_RELMAP. PA_MSG_XACT_DEPENDENCY ensures that dependent transactions are committed in the correct order. It has a list of transaction IDs that parallel workers must wait for. This type of message is generated when the leader detects a dependency between the current and other transactions, or just before the COMMIT message. The latter one is used to preserve the commit ordering between the publisher and the subscriber. PA_MSG_RELMAP is used to synchronize the remote relation information between the leader and parallel workers. It has a list of relations that the leader already knows, and parallel workers also update the relmap in response to the message. This type of message is generated when the leader allocates a new parallel worker to the transaction, or when the publisher sends additional RELATION messages. This synchronization is necessary for parallel apply workers to map local replication target relations to their remote counterparts during change application. Since the walsender does not send remote relation information with every transaction, the parallel apply worker may not have up-to-date relation info unless synchronized by the leader. Author: Zhijie Hou Author: Hayato Kuroda --- .../replication/logical/applyparallelworker.c | 142 +++++++++++++++--- src/backend/replication/logical/proto.c | 2 + src/backend/replication/logical/worker.c | 4 + src/include/replication/logicalproto.h | 8 + src/include/replication/worker_internal.h | 17 +++ src/tools/pgindent/typedefs.list | 1 + 6 files changed, 157 insertions(+), 17 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index e10f653fde7..95a30689f01 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -733,6 +733,75 @@ ProcessParallelApplyInterrupts(void) } } +/* + * Handle internal dependency information. + * + * Wait for all transactions listed in the message to commit. + */ +static void +apply_handle_internal_dependency(StringInfo s) +{ + int nxids = pq_getmsgint(s, 4); + + for (int i = 0; i < nxids; i++) + { + TransactionId xid = pq_getmsgint(s, 4); + + pa_wait_for_depended_transaction(xid); + } +} + +/* + * Handle internal relation information. + * + * Update all relation details in the relation map cache. + */ +static void +apply_handle_internal_relation(StringInfo s) +{ + int nrels; + + nrels = pq_getmsgint(s, 4); + + for (int i = 0; i < nrels; i++) + { + LogicalRepRelation *rel = logicalrep_read_rel(s); + + logicalrep_relmap_update(rel); + + /* Also reset all entries in the partition map that refer to remoterel. */ + logicalrep_partmap_reset_relmap(rel); + + elog(DEBUG1, "parallel apply worker init relmap for %s", + rel->relname); + } +} + +/* + * Handle an internal message generated by the leader apply worker. + */ +void +apply_handle_internal_message(StringInfo s) +{ + PAWorkerMsgType action = pq_getmsgbyte(s); + + Assert(am_parallel_apply_worker()); + + switch (action) + { + case PA_MSG_XACT_DEPENDENCY: + apply_handle_internal_dependency(s); + break; + case PA_MSG_RELMAP: + apply_handle_internal_relation(s); + break; + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid worker internal message type \"??? (%d)\"", action))); + } +} + /* Parallel apply worker main loop. */ static void LogicalParallelApplyLoop(shm_mq_handle *mqh) @@ -779,26 +848,46 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) initReadOnlyStringInfo(&s, data, len); - /* - * The first byte of messages sent from leader apply worker to - * parallel apply workers can only be PqReplMsg_WALData. - */ c = pq_getmsgbyte(&s); - if (c != PqReplMsg_WALData) - elog(ERROR, "unexpected message \"%c\"", c); - /* - * Ignore statistics fields that have been updated by the leader - * apply worker. - * - * XXX We can avoid sending the statistics fields from the leader - * apply worker but for that, it needs to rebuild the entire - * message by removing these fields which could be more work than - * simply ignoring these fields in the parallel apply worker. - */ - s.cursor += SIZE_STATS_MESSAGE; + if (c == PqReplMsg_WALData) + { + /* + * Ignore statistics fields that have been updated by the + * leader apply worker. + * + * XXX We can avoid sending the statistics fields from the + * leader apply worker but for that, it needs to rebuild the + * entire message by removing these fields which could be more + * work than simply ignoring these fields in the parallel apply + * worker. + */ + s.cursor += SIZE_STATS_MESSAGE; - apply_dispatch(&s); + apply_dispatch(&s); + } + else if (c == LOGICAL_REP_MSG_INTERNAL_MESSAGE) + { + /* + * Rewind the cursor so that apply_dispatch can re-read the + * first byte (LOGICAL_REP_MSG_INTERNAL_MESSAGE) to correctly + * handle the internal message. Alternatively, we could call + * apply_handle_internal_message directly, but using + * apply_dispatch uniformly across all message types is cleaner + * and more consistent. + */ + s.cursor--; + + apply_dispatch(&s); + } + else + { + /* + * The first byte of messages sent from leader apply worker to + * parallel apply workers can only be 'w' or 'i'. + */ + elog(ERROR, "unexpected message \"%c\"", c); + } } else if (shmq_res == SHM_MQ_WOULD_BLOCK) { @@ -1656,3 +1745,22 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) pa_free_worker(winfo); } + +/* + * Wait for the given transaction to finish. + * + * Both leader and parallel apply workers can call this function to wait for a + * transaction to finish. + */ +void +pa_wait_for_depended_transaction(TransactionId xid) +{ + elog(DEBUG1, "wait for depended xid %u", xid); + + for (;;) + { + /* XXX wait until given transaction is finished */ + } + + elog(DEBUG1, "finish waiting for depended xid %u", xid); +} diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 86ad97cd937..a0cac05cdcd 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -1253,6 +1253,8 @@ logicalrep_message_type(LogicalRepMsgType action) return "STREAM ABORT"; case LOGICAL_REP_MSG_STREAM_PREPARE: return "STREAM PREPARE"; + case LOGICAL_REP_MSG_INTERNAL_MESSAGE: + return "INTERNAL MESSAGE"; } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 9f2a16b17fa..2c0091e4fb1 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3875,6 +3875,10 @@ apply_dispatch(StringInfo s) apply_handle_stream_prepare(s); break; + case LOGICAL_REP_MSG_INTERNAL_MESSAGE: + apply_handle_internal_message(s); + break; + default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 058a955e20c..cb0a8f440c0 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -53,6 +53,13 @@ * in logical replication protocol, which uses a single byte to identify a * message type. Hence the values should be single-byte wide and preferably * human-readable characters. + * + * LOGICAL_REP_MSG_INTERNAL_MESSAGE ('i') is reserved for internal messages (see + * PAWorkerMsgType for sub-message types) sent from the leader apply + * worker to parallel apply workers. Centralizing this definition here allows + * all message types to be handled together and avoids the maintenance burden of + * ensuring sub-message types do not conflict with regular LogicalRepMsgType + * values. */ typedef enum LogicalRepMsgType { @@ -75,6 +82,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_STREAM_COMMIT = 'c', LOGICAL_REP_MSG_STREAM_ABORT = 'A', LOGICAL_REP_MSG_STREAM_PREPARE = 'p', + LOGICAL_REP_MSG_INTERNAL_MESSAGE = 'i', } LogicalRepMsgType; /* diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 745b7d9e969..226357c8f3f 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -15,6 +15,7 @@ #include "access/xlogdefs.h" #include "catalog/pg_subscription.h" #include "datatype/timestamp.h" +#include "libpq/protocol.h" #include "miscadmin.h" #include "replication/logicalrelation.h" #include "replication/walreceiver.h" @@ -233,6 +234,18 @@ typedef struct ParallelApplyWorkerInfo ParallelApplyWorkerShared *shared; } ParallelApplyWorkerInfo; +/* + * Parallel Apply worker internal message types + * + * This type of messages would be generated by leader apply worker and sent to + * the parallel apply worker. + */ +typedef enum PAWorkerMsgType +{ + PA_MSG_XACT_DEPENDENCY = 'd', + PA_MSG_RELMAP = 'r', +} PAWorkerMsgType; + /* Main memory context for apply worker. Permanent during worker lifetime. */ extern PGDLLIMPORT MemoryContext ApplyContext; @@ -332,6 +345,8 @@ extern void pa_allocate_worker(TransactionId xid); extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid); extern void pa_detach_all_error_mq(void); +extern void apply_handle_internal_message(StringInfo s); + extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data); extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, @@ -359,6 +374,8 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern void pa_wait_for_depended_transaction(TransactionId xid); + #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isTableSyncWorker(worker) ((worker)->in_use && \ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9f1dd55213d..746f0dbb0a5 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1914,6 +1914,7 @@ OverridingKind PACE_HEADER PACL PATH +PAWorkerMsgType PCtxtHandle PERL_CONTEXT PERL_SI -- 2.47.3