From 3b7610e661b7fc44792894bcb83822b93d876cf1 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 1 Dec 2025 10:37:27 +0900 Subject: [PATCH v15 1/3] Introduce internal messages to track dependencies This patch introduces a new set of internal worker message types (see WorkerInternalMsgType). Messages of these types are generated by the leader apply worker and sent to parallel apply workers as needed. For now, two types of messages exist: WORKER_INTERNAL_MSG_DEPENDENCY and WORKER_INTERNAL_MSG_RELATION. WORKER_INTERNAL_MSG_DEPENDENCY ensures that dependent transactions are committed in the correct order. It carries a list of transaction IDs that the parallel worker must wait for. This type of message is generated when the leader detects a dependency between the current transaction and other transactions, or just before the COMMIT message. The latter case preserves the commit ordering between the publisher and the subscriber. WORKER_INTERNAL_MSG_RELATION is used to synchronize the relation information between the leader and parallel workers. It carries a list of relations that the leader already knows, and parallel workers update their relmap in response to the message. This type of message is generated when the leader allocates a new parallel worker to the transaction, or when the publisher sends additional RELATION messages. 
Author: Zhijie Hou Author: Hayato Kuroda --- .../replication/logical/applyparallelworker.c | 133 +++++++++++++++--- src/backend/replication/logical/proto.c | 2 + src/backend/replication/logical/worker.c | 4 + src/include/replication/logicalproto.h | 8 ++ src/include/replication/worker_internal.h | 31 ++++ src/tools/pgindent/typedefs.list | 1 + 6 files changed, 162 insertions(+), 17 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index e10f653fde7..7ae67a061ad 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -733,6 +733,75 @@ ProcessParallelApplyInterrupts(void) } } +/* + * Handle internal dependency information. + * + * Wait for all transactions listed in the message to commit. + */ +static void +apply_handle_internal_dependency(StringInfo s) +{ + int nxids = pq_getmsgint(s, 4); + + for (int i = 0; i < nxids; i++) + { + TransactionId xid = pq_getmsgint(s, 4); + + pa_wait_for_depended_transaction(xid); + } +} + +/* + * Handle internal relation information. + * + * Update all relation details in the relation map cache. + */ +static void +apply_handle_internal_relation(StringInfo s) +{ + int nrels; + + nrels = pq_getmsgint(s, 4); + + for (int i = 0; i < nrels; i++) + { + LogicalRepRelation *rel = logicalrep_read_rel(s); + + logicalrep_relmap_update(rel); + + /* Also reset all entries in the partition map that refer to remoterel. */ + logicalrep_partmap_reset_relmap(rel); + + elog(DEBUG1, "parallel apply worker init relmap for %s", + rel->relname); + } +} + +/* + * Handle an internal message generated by the leader apply worker. 
+ */ +void +apply_handle_internal_message(StringInfo s) +{ + WorkerInternalMsgType action = pq_getmsgbyte(s); + + Assert(am_parallel_apply_worker()); + + switch (action) + { + case WORKER_INTERNAL_MSG_DEPENDENCY: + apply_handle_internal_dependency(s); + break; + case WORKER_INTERNAL_MSG_RELATION: + apply_handle_internal_relation(s); + break; + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid worker internal message type \"??? (%d)\"", action))); + } +} + /* Parallel apply worker main loop. */ static void LogicalParallelApplyLoop(shm_mq_handle *mqh) @@ -779,26 +848,37 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) initReadOnlyStringInfo(&s, data, len); - /* - * The first byte of messages sent from leader apply worker to - * parallel apply workers can only be PqReplMsg_WALData. - */ c = pq_getmsgbyte(&s); - if (c != PqReplMsg_WALData) - elog(ERROR, "unexpected message \"%c\"", c); - /* - * Ignore statistics fields that have been updated by the leader - * apply worker. - * - * XXX We can avoid sending the statistics fields from the leader - * apply worker but for that, it needs to rebuild the entire - * message by removing these fields which could be more work than - * simply ignoring these fields in the parallel apply worker. - */ - s.cursor += SIZE_STATS_MESSAGE; + if (c == PqReplMsg_WALData) + { + /* + * Ignore statistics fields that have been updated by the + * leader apply worker. + * + * XXX We can avoid sending the statistics fields from the + * leader apply worker but for that, it needs to rebuild the + * entire message by removing these fields which could be more + * work than simply ignoring these fields in the parallel apply + * worker. + */ + s.cursor += SIZE_STATS_MESSAGE; - apply_dispatch(&s); + apply_dispatch(&s); + } + else if (c == PARALLEL_APPLY_INTERNAL_MESSAGE) + { + /* Handle the internal message. 
*/ + apply_dispatch(&s); + } + else + { + /* + * The first byte of messages sent from leader apply worker to + * parallel apply workers can only be 'w' or 'i'. + */ + elog(ERROR, "unexpected message \"%c\"", c); + } } else if (shmq_res == SHM_MQ_WOULD_BLOCK) { @@ -1656,3 +1736,22 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) pa_free_worker(winfo); } + +/* + * Wait for the given transaction to finish. + * + * Both leader and parallel apply workers can call this function to wait for a + * transaction to finish. + */ +void +pa_wait_for_depended_transaction(TransactionId xid) +{ + elog(DEBUG1, "wait for depended xid %u", xid); + + for (;;) + { + /* XXX wait until given transaction is finished */ + } + + elog(DEBUG1, "finish waiting for depended xid %u", xid); +} diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 86ad97cd937..a0cac05cdcd 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -1253,6 +1253,8 @@ logicalrep_message_type(LogicalRepMsgType action) return "STREAM ABORT"; case LOGICAL_REP_MSG_STREAM_PREPARE: return "STREAM PREPARE"; + case LOGICAL_REP_MSG_INTERNAL_MESSAGE: + return "INTERNAL MESSAGE"; } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 9f2a16b17fa..2c0091e4fb1 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3875,6 +3875,10 @@ apply_dispatch(StringInfo s) apply_handle_stream_prepare(s); break; + case LOGICAL_REP_MSG_INTERNAL_MESSAGE: + apply_handle_internal_message(s); + break; + default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 058a955e20c..611d521f1d3 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -53,6 +53,13 @@ * in logical replication protocol, 
which uses a single byte to identify a * message type. Hence the values should be single-byte wide and preferably * human-readable characters. + * + * LOGICAL_REP_MSG_INTERNAL_MESSAGE ('i') is reserved for internal messages (see + * WorkerInternalMsgType for sub-message types) sent from the leader apply + * worker to parallel apply workers. Centralizing this definition here allows + * all message types to be handled together and avoids the maintenance burden of + * ensuring sub-message types do not conflict with regular LogicalRepMsgType + * values. */ typedef enum LogicalRepMsgType { @@ -75,6 +82,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_STREAM_COMMIT = 'c', LOGICAL_REP_MSG_STREAM_ABORT = 'A', LOGICAL_REP_MSG_STREAM_PREPARE = 'p', + LOGICAL_REP_MSG_INTERNAL_MESSAGE = 'i', } LogicalRepMsgType; /* diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 745b7d9e969..96d90c2418c 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -15,6 +15,7 @@ #include "access/xlogdefs.h" #include "catalog/pg_subscription.h" #include "datatype/timestamp.h" +#include "libpq/protocol.h" #include "miscadmin.h" #include "replication/logicalrelation.h" #include "replication/walreceiver.h" @@ -233,6 +234,18 @@ typedef struct ParallelApplyWorkerInfo ParallelApplyWorkerShared *shared; } ParallelApplyWorkerInfo; +/* + * Worker internal message types + * + * Messages of these types are generated by the leader apply worker and sent to + * parallel apply workers. + */ +typedef enum WorkerInternalMsgType +{ + WORKER_INTERNAL_MSG_DEPENDENCY = 'd', + WORKER_INTERNAL_MSG_RELATION = 'i', +} WorkerInternalMsgType; + /* Main memory context for apply worker. Permanent during worker lifetime. 
*/ extern PGDLLIMPORT MemoryContext ApplyContext; @@ -332,6 +345,8 @@ extern void pa_allocate_worker(TransactionId xid); extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid); extern void pa_detach_all_error_mq(void); +extern void apply_handle_internal_message(StringInfo s); + extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data); extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, @@ -359,6 +374,8 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern void pa_wait_for_depended_transaction(TransactionId xid); + #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isTableSyncWorker(worker) ((worker)->in_use && \ @@ -366,6 +383,20 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, #define isSequenceSyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_SEQUENCESYNC) +/* + * The message code for internal messages sent by the leader apply worker to the + * parallel apply worker. + */ +#define PARALLEL_APPLY_INTERNAL_MESSAGE 'i' + +/* + * Ensure PARALLEL_APPLY_INTERNAL_MESSAGE does not conflict with + * PqReplMsg_WALData ('w'), as parallel apply workers may receive both types of + * messages. + */ +StaticAssertDecl(PARALLEL_APPLY_INTERNAL_MESSAGE != PqReplMsg_WALData, + "PARALLEL_APPLY_INTERNAL_MESSAGE conflicts with PqReplMsg_WALData"); + static inline bool am_tablesync_worker(void) { diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 49dfb662abc..2bfc75c1061 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3460,6 +3460,7 @@ WorkTableScan WorkTableScanState WorkerInfo WorkerInfoData +WorkerInternalMsgType WorkerJobDumpPtrType WorkerJobRestorePtrType WorkerNodeInstrumentation -- 2.53.0.windows.2