From 1e1516f832be2709dbc7d43e2dd1fcb89a34745c Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Mon, 20 Apr 2026 16:46:25 +0800 Subject: [PATCH v17 02/10] Introduce a shared hash table to store parallelized transactions This commit introduces a shared hash table for tracking parallelized transactions. The hash table uses transaction IDs as keys; no values are stored. The hash entries are inserted with a remote XID when the leader bypasses remote transactions to parallel apply workers. Entries are deleted when parallel workers are committed to corresponding transactions. Any worker can use this hash table to check whether a transaction is still in progress: - If an entry exists, the transaction is still in progress. - If no entry exists, the transaction has committed. Later patches will use this hash table to support dependency waiting and commit order preservation. When transaction A depends on transaction B (or must commit after B), the worker will wait for transaction B's entry to be removed before committing transaction A. Author: Zhijie Hou Author: Hayato Kuroda --- .../replication/logical/applyparallelworker.c | 126 +++++++++++++++++- .../utils/activity/wait_event_names.txt | 1 + src/include/replication/worker_internal.h | 9 ++ src/include/storage/lwlocklist.h | 1 + src/tools/pgindent/typedefs.list | 1 + 5 files changed, 137 insertions(+), 1 deletion(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 95a30689f01..3af491d6da7 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -221,12 +221,35 @@ typedef struct ParallelApplyWorkerEntry ParallelApplyWorkerInfo *winfo; } ParallelApplyWorkerEntry; +/* An entry in the parallelized_txns shared hash table */ +typedef struct ParallelizedTxnEntry +{ + TransactionId xid; /* Hash key, remote transaction ID */ +} ParallelizedTxnEntry; + /* * A hash table used to cache the state of streaming transactions being applied * by the parallel apply workers. */ static HTAB *ParallelApplyTxnHash = NULL; +/* + * A hash table used to track the parallelized remote transactions that could be + * depended on by other transactions. + */ +static dsa_area *parallel_apply_dsa_area = NULL; +static dshash_table *parallelized_txns = NULL; + +/* parameters for the parallelized_txns shared hash table */ +static const dshash_parameters dsh_params = { + sizeof(TransactionId), + sizeof(ParallelizedTxnEntry), + dshash_memcmp, + dshash_memhash, + dshash_memcpy, + LWTRANCHE_PARALLEL_APPLY_DSA +}; + /* * A list (pool) of active parallel apply workers. The information for * the new worker is added to the list after successfully launching it. The @@ -260,6 +283,8 @@ static List *subxactlist = NIL; static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo); static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared); static PartialFileSetState pa_get_fileset_state(void); +static void pa_attach_parallelized_txn_hash(dsa_handle *pa_dsa_handle, + dshash_table_handle *pa_dshash_handle); /* * Returns true if it is OK to start a parallel apply worker, false otherwise. @@ -337,6 +362,15 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo) shm_mq *mq; Size queue_size = DSM_QUEUE_SIZE; Size error_queue_size = DSM_ERROR_QUEUE_SIZE; + dsa_handle parallel_apply_dsa_handle; + dshash_table_handle parallelized_txns_handle; + + pa_attach_parallelized_txn_hash(¶llel_apply_dsa_handle, + ¶llelized_txns_handle); + + if (parallel_apply_dsa_handle == DSA_HANDLE_INVALID || + parallelized_txns_handle == DSHASH_HANDLE_INVALID) + return false; /* * Estimate how much shared memory we need. @@ -372,6 +406,8 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo) pg_atomic_init_u32(&(shared->pending_stream_count), 0); shared->last_commit_end = InvalidXLogRecPtr; shared->fileset_state = FS_EMPTY; + shared->parallel_apply_dsa_handle = parallel_apply_dsa_handle; + shared->parallelized_txns_handle = parallelized_txns_handle; shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared); @@ -965,6 +1001,8 @@ ParallelApplyWorkerMain(Datum main_arg) shm_mq *mq; shm_mq_handle *mqh; shm_mq_handle *error_mqh; + dsa_handle pa_dsa_handle; + dshash_table_handle pa_dshash_handle; ReplOriginId originid; int worker_slot = DatumGetInt32(main_arg); char originname[NAMEDATALEN]; @@ -1051,6 +1089,8 @@ ParallelApplyWorkerMain(Datum main_arg) InitializingApplyWorker = false; + pa_attach_parallelized_txn_hash(&pa_dsa_handle, &pa_dshash_handle); + /* Setup replication origin tracking. */ StartTransactionCommand(); ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, @@ -1746,6 +1786,51 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) pa_free_worker(winfo); } +/* + * Attach to the shared hash table for parallelized transactions. + */ +static void +pa_attach_parallelized_txn_hash(dsa_handle *pa_dsa_handle, + dshash_table_handle *pa_dshash_handle) +{ + MemoryContext oldctx; + + if (parallelized_txns) + { + Assert(parallel_apply_dsa_area); + *pa_dsa_handle = dsa_get_handle(parallel_apply_dsa_area); + *pa_dshash_handle = dshash_get_hash_table_handle(parallelized_txns); + return; + } + + /* Be sure any local memory allocated by DSA routines is persistent. */ + oldctx = MemoryContextSwitchTo(ApplyContext); + + if (am_leader_apply_worker()) + { + /* Initialize dynamic shared hash table for parallelized transactions */ + parallel_apply_dsa_area = dsa_create(LWTRANCHE_PARALLEL_APPLY_DSA); + dsa_pin(parallel_apply_dsa_area); + dsa_pin_mapping(parallel_apply_dsa_area); + parallelized_txns = dshash_create(parallel_apply_dsa_area, &dsh_params, NULL); + + /* Store handles in shared memory for other backends to use. */ + *pa_dsa_handle = dsa_get_handle(parallel_apply_dsa_area); + *pa_dshash_handle = dshash_get_hash_table_handle(parallelized_txns); + } + else if (am_parallel_apply_worker()) + { + /* Attach to existing dynamic shared hash table. */ + parallel_apply_dsa_area = dsa_attach(MyParallelShared->parallel_apply_dsa_handle); + dsa_pin_mapping(parallel_apply_dsa_area); + parallelized_txns = dshash_attach(parallel_apply_dsa_area, &dsh_params, + MyParallelShared->parallelized_txns_handle, + NULL); + } + + MemoryContextSwitchTo(oldctx); +} + /* * Wait for the given transaction to finish. * @@ -1755,11 +1840,50 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) void pa_wait_for_depended_transaction(TransactionId xid) { + ParallelizedTxnEntry *txn_entry; + + /* + * Quick exit if parallelized_txns has not been initialized yet. This can + * happen when 1) this function is called by the leader worker and 2) no + * parallel apply workers have never been launched yet since the leader + * worker started. + */ + if (!parallelized_txns) + return; + elog(DEBUG1, "wait for depended xid %u", xid); for (;;) { - /* XXX wait until given transaction is finished */ + txn_entry = dshash_find(parallelized_txns, &xid, false); + + /* The entry is removed only if the transaction is committed */ + if (txn_entry == NULL) + break; + + dshash_release_lock(parallelized_txns, txn_entry); + + /* + * Wait for the parallel apply worker processing the given remote + * transaction to finish applying and release its lock. + */ + pa_lock_transaction(xid, AccessShareLock); + pa_unlock_transaction(xid, AccessShareLock); + + CHECK_FOR_INTERRUPTS(); + + /* + * Acquiring the lock successfully does not guarantee we can proceed. + * The worker may have errored out and released the lock while leaving + * its shared hash entry intact, or it may not have acquired the lock + * yet because it hasn't processed the BEGIN message. In either case, we + * must continue waiting in the loop until the parallel apply worker + * finishes applying the transaction, or until the leader notifies us of + * a failure and restarts all workers. + * + * The above race window is small and infrequent, so no WaitLatch is + * added. + */ } elog(DEBUG1, "finish waiting for depended xid %u", xid); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 560659f9568..1339c33262f 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -417,6 +417,7 @@ XactSLRU "Waiting to access the transaction status SLRU cache." ParallelVacuumDSA "Waiting for parallel vacuum dynamic shared memory allocation." AioUringCompletion "Waiting for another process to complete IO via io_uring." ShmemIndex "Waiting to find or allocate space in shared memory." +ParallelApplyDSA "Waiting for parallel apply dynamic shared memory allocation." # No "ABI_compatibility" region here as WaitEventLWLock has its own C code. diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 226357c8f3f..474ec6d0dfe 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -15,6 +15,7 @@ #include "access/xlogdefs.h" #include "catalog/pg_subscription.h" #include "datatype/timestamp.h" +#include "lib/dshash.h" #include "libpq/protocol.h" #include "miscadmin.h" #include "replication/logicalrelation.h" @@ -197,6 +198,14 @@ typedef struct ParallelApplyWorkerShared */ PartialFileSetState fileset_state; FileSet fileset; + + /* + * DSA handle for parallel apply workers, along with the handle for the + * shared hash table allocated under it. The hash table stores parallelized + * transaction information. + */ + dsa_handle parallel_apply_dsa_handle; + dshash_table_handle parallelized_txns_handle; } ParallelApplyWorkerShared; /* diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index d7eb648bd27..c5f2921b703 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -140,3 +140,4 @@ PG_LWLOCKTRANCHE(XACT_SLRU, XactSLRU) PG_LWLOCKTRANCHE(PARALLEL_VACUUM_DSA, ParallelVacuumDSA) PG_LWLOCKTRANCHE(AIO_URING_COMPLETION, AioUringCompletion) PG_LWLOCKTRANCHE(SHMEM_INDEX, ShmemIndex) +PG_LWLOCKTRANCHE(PARALLEL_APPLY_DSA, ParallelApplyDSA) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 746f0dbb0a5..72f9d22215d 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2151,6 +2151,7 @@ ParallelHashJoinBatch ParallelHashJoinBatchAccessor ParallelHashJoinState ParallelIndexScanDesc +ParallelizedTxnEntry ParallelSlot ParallelSlotArray ParallelSlotResultHandler -- 2.47.3