From 1fd06eb5ede8b8d0c9f619c944f55f6a0028371d Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Wed, 15 Apr 2026 14:28:49 +0800 Subject: [PATCH v13 4/9] Parallel apply non-streaming transactions -- Basic design -- The leader worker assigns each non-streaming transaction to a parallel apply worker. Before dispatching changes to a parallel worker, the leader verifies if the current modification affects the same row (identified by replica identity key) as another ongoing transaction. If so, the leader sends a list of dependent transaction IDs to the parallel worker, indicating that the parallel apply worker must wait for these transactions to commit before proceeding. The leader preserves publisher commit order for all transactions by instructing the parallel worker to wait for the last transaction to commit before committing the current transaction. (Note that the leader itself does not wait for the parallelized transaction to be committed unless partial serialization mode is active, where the parallel apply worker can no longer be reused.) See the "commit order" and "worker interaction" sections below for details. Tracking dependencies is necessary even when commit order is preserved. Consider two transactions: TX-1 (INSERT row 1) and TX-2 (DELETE row 1). If both are allowed to apply in parallel, TX-2's DELETE could be applied before TX-1's INSERT, resulting in a delete_missing conflict. Each parallel apply worker records the local end LSN of the transaction it applies in shared memory. Subsequently, the leader gathers these local end LSNs and logs them in the local 'lsn_mapping' for verifying whether they have been flushed to disk (following the logic in get_flush_position()). If no parallel apply worker is available, the leader will apply the transaction independently. 
For further details, please refer to the following: -- dependency tracking -- The leader maintains a local hash table, using the remote change's replica identity column values and relid as keys, with remote transaction IDs as values. Before sending changes to the parallel apply worker, the leader computes a hash using RI key values and the relid of the current change to search the hash table. If an existing entry is found, the leader first updates the hash entry with the receiving remote xid then tells the parallel worker to wait for it. If the remote relation lacks a replica identity (RI), it indicates that only INSERT can be replicated for this table. In such cases, the leader skips dependency checks, allowing the parallel apply worker to proceed with applying changes without delay. This is because the only potential conflict that could happen is related to the local unique key or foreign key, which is yet to be implemented. In cases of TRUNCATE or remote schema changes affecting the entire table, the leader retrieves all remote xids touching the same table (via sequential scans of the hash table) and tells the parallel worker to wait for those transactions to commit. Hash entries are cleaned up once the transaction corresponding to the remote xid in the entry has been committed. Clean-up typically occurs when collecting the flush position of each transaction, but is forced if the hash table exceeds a set threshold. -- dependency waiting -- If a transaction is relied upon by others, the leader adds its xid to a shared hash table. The shared hash table entry is cleared by the parallel apply worker upon completing the transaction. Workers needing to wait for a transaction check the shared hash table entry; if present, they lock the transaction ID (using pa_lock_transaction). If absent, it indicates the transaction has been committed, negating the need to wait. 
-- commit order -- We preserve publisher commit order for all transactions for two reasons: 1) User-visible consistency Out-of-order commits can expose states on the subscriber that were never visible on the publisher. For example, suppose a user updates table A and then updates table B on the publisher. If the subscriber commits those transactions out of order, a query that sees the latest row in B might still see stale data in A. Although eventual consistency would still be reached, that behavior may be unacceptable for some users. In the future, we could provide a GUC to allow out-of-order commits for users who prefer higher parallelism. 2) Replication progress tracking We currently track replication progress using transaction commit LSNs. With out-of-order commits, this becomes ambiguous after failures. For example, if TX-2 is applied before TX-1 and replication stops due to an error, we cannot reliably determine whether TX-1 was applied before restart. As a result, transactions that were already committed on the subscriber may be replayed. -- worker interaction -- After sending the COMMIT message for a transaction, the leader apply worker does not wait for the parallel apply worker to finish applying that transaction. Instead, it sends a WORKER_INTERNAL_MSG_DEPENDENCY message to the parallel apply worker, instructing it to wait for the last transaction to commit. This allows the leader to remain busy receiving and dispatching changes to more parallel apply workers, enabling greater parallelism in transaction application. 
--- .../replication/logical/applyparallelworker.c | 359 ++++++++++++++++-- src/backend/replication/logical/proto.c | 38 ++ src/backend/replication/logical/relation.c | 31 ++ src/backend/replication/logical/worker.c | 318 ++++++++++++++-- src/include/replication/logicalproto.h | 2 + src/include/replication/logicalrelation.h | 2 + src/include/replication/worker_internal.h | 7 +- src/test/subscription/meson.build | 1 + src/test/subscription/t/001_rep_changes.pl | 2 + src/test/subscription/t/010_truncate.pl | 2 +- src/test/subscription/t/015_stream.pl | 8 +- src/test/subscription/t/026_stats.pl | 1 + src/test/subscription/t/027_nosuperuser.pl | 1 + src/test/subscription/t/050_parallel_apply.pl | 150 ++++++++ src/tools/pgindent/typedefs.list | 2 + 15 files changed, 856 insertions(+), 68 deletions(-) create mode 100644 src/test/subscription/t/050_parallel_apply.pl diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 3f91d79dbec..00756206f72 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -14,6 +14,9 @@ * ParallelApplyWorkerInfo which is required so the leader worker and parallel * apply workers can communicate with each other. * + * Streaming transactions + * ====================== + * * The parallel apply workers are assigned (if available) as soon as xact's * first stream is received for subscriptions that have set their 'streaming' * option as parallel. The leader apply worker will send changes to this new @@ -36,12 +39,9 @@ * its information is added to the ParallelApplyWorkerPool. Once the worker * finishes applying the transaction, it is marked as available for re-use. * Now, before starting a new worker to apply the streaming transaction, we - * check the list for any available worker. 
Note that we retain a maximum of - half the max_parallel_apply_workers_per_subscription workers in the pool and - after that, we simply exit the worker after applying the transaction. - * - * XXX This worker pool threshold is arbitrary and we can provide a GUC - * variable for this in the future if required. + * check the list for any available worker. We do not stop workers in the pool + * unless part of the changes of the transaction are serialized due to a send + * timeout. * * The leader apply worker will create a separate dynamic shared memory segment * when each parallel apply worker starts. The reason for this design is that @@ -152,6 +152,61 @@ * session-level locks because both locks could be acquired outside the * transaction, and the stream lock in the leader needs to persist across * transaction boundaries i.e. until the end of the streaming transaction. + * + * Non-streaming transactions + * ====================== + * The handling is similar to streaming transactions, but includes a few + * differences: + * + * Transaction dependency + * ---------------------- + * Before dispatching changes to a parallel worker, the leader verifies if the + * current modification affects the same row (identified by replica identity + * key) as another ongoing transaction (see handle_dependency_on_change for + * details). If so, the leader sends a list of dependent transaction IDs to the + * parallel worker, indicating that the parallel apply worker must wait for + * these transactions to commit before proceeding. + * + * Tracking dependencies is necessary even when commit order is preserved. + * Consider two transactions: TX-1 (INSERT row 1) and TX-2 (DELETE row 1). If + * both are allowed to apply in parallel, TX-2's DELETE could be applied before + * TX-1's INSERT, resulting in a delete_missing conflict. 
+ * + * Commit order + * ------------ + * We preserve publisher commit order for all transactions for two reasons: + * + * 1) User-visible consistency + * + * Out-of-order commits can expose states on the subscriber that were never + * visible on the publisher. + * + * For example, suppose a user updates table A and then updates table B on the + * publisher. If the subscriber commits those transactions out of order, a + * query that sees the latest row in B might still see stale data in A. + * Although eventual consistency would still be reached, that behavior may be + * unacceptable for some users. In the future, we could provide a subscription + * option to allow out-of-order commits for users who prefer higher parallelism. + * + * 2) Replication progress tracking + * + * We currently track replication progress using the last transaction's commit + * LSNs. With out-of-order commits, this becomes ambiguous after failures. + * + * For example, if TX-2 is applied before TX-1 and replication stops due to an + * error, we cannot reliably determine whether TX-1 was applied before restart. + * As a result, transactions that were already committed on the subscriber may + * be replayed. + * + * Worker interaction + * ------------ + * After sending the COMMIT message for a transaction, the leader apply worker + * does not wait for the parallel apply worker to finish applying that + * transaction. Instead, it sends a WORKER_INTERNAL_MSG_DEPENDENCY message to + * the parallel apply worker, instructing it to wait for the last transaction to + * commit. This allows the leader to remain busy receiving and dispatching + * changes to more parallel apply workers, enabling greater parallelism in + * transaction application. 
*------------------------------------------------------------------------- */ @@ -302,6 +357,7 @@ static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared); static PartialFileSetState pa_get_fileset_state(void); static void pa_attach_parallelized_txn_hash(dsa_handle *pa_dsa_handle, dshash_table_handle *pa_dshash_handle); +static void write_internal_relation(StringInfo s, LogicalRepRelation *rel); /* * Returns true if it is OK to start a parallel apply worker, false otherwise. @@ -419,6 +475,7 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo) shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared)); SpinLockInit(&shared->mutex); + shared->xid = InvalidTransactionId; shared->xact_state = PARALLEL_TRANS_UNKNOWN; pg_atomic_init_u32(&(shared->pending_stream_count), 0); shared->last_commit_end = InvalidXLogRecPtr; @@ -462,6 +519,8 @@ pa_launch_parallel_worker(void) MemoryContext oldcontext; bool launched; ParallelApplyWorkerInfo *winfo; + dsa_handle pa_dsa_handle; + dshash_table_handle pa_dshash_handle; ListCell *lc; /* Try to get an available parallel apply worker from the worker pool. */ @@ -469,10 +528,33 @@ pa_launch_parallel_worker(void) { winfo = (ParallelApplyWorkerInfo *) lfirst(lc); - if (!winfo->in_use) + if (!winfo->stream_txn && + pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED) + { + /* + * Save the local commit LSN of the last transaction applied by + * this worker before reusing it for another transaction. This WAL + * position is crucial for determining the flush position in + * responses to the publisher (see get_flush_position()). + */ + (void) pa_get_last_commit_end(winfo->shared->xid, false, NULL); + return winfo; + } + + if (winfo->stream_txn && !winfo->in_use) return winfo; } + pa_attach_parallelized_txn_hash(&pa_dsa_handle, &pa_dshash_handle); + + /* + * Return if the number of parallel apply workers has reached the maximum + * limit. 
+ */ + if (list_length(ParallelApplyWorkerPool) == + max_parallel_apply_workers_per_subscription) + return NULL; + /* * Start a new parallel apply worker. * @@ -500,18 +582,32 @@ pa_launch_parallel_worker(void) dsm_segment_handle(winfo->dsm_seg), false); - if (launched) - { - ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo); - } - else + if (!launched) { + MemoryContextSwitchTo(oldcontext); pa_free_worker_info(winfo); - winfo = NULL; + return NULL; } + ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo); + MemoryContextSwitchTo(oldcontext); + /* + * Send all existing remote relation information to the parallel apply + * worker. This allows the parallel worker to initialize the + * LogicalRepRelMapEntry locally before applying remote changes. + */ + if (logicalrep_get_num_rels()) + { + StringInfoData out; + + initStringInfo(&out); + + write_internal_relation(&out, NULL); + pa_send_data(winfo, out.len, out.data); + } + return winfo; } @@ -616,25 +712,27 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) { Assert(!am_parallel_apply_worker()); Assert(winfo->in_use); - Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED); + Assert(!winfo->stream_txn || + pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED); if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL)) elog(ERROR, "hash table corrupted"); /* - * Stop the worker if there are enough workers in the pool. + * XXX we stop the worker if the leader apply worker serialize part of the + * transaction data due to a send timeout. This is because the message could + * be partially written to the queue and there is no way to clean the queue + * other than resending the message until it succeeds. Instead of trying to + * send the data which anyway would have been serialized and then letting + * the parallel apply worker deal with the spurious message, we stop the + * worker. 
* - * XXX Additionally, we also stop the worker if the leader apply worker - * serialize part of the transaction data due to a send timeout. This is - * because the message could be partially written to the queue and there - * is no way to clean the queue other than resending the message until it - * succeeds. Instead of trying to send the data which anyway would have - * been serialized and then letting the parallel apply worker deal with - * the spurious message, we stop the worker. + * For other cases, we do not stop workers once started. All transactions + * (whether streamed or not) are assigned to parallel apply workers, so + * restarting workers frequently would only increase CPU overhead and slow + * down the leader's ability to dispatch changes to workers. */ - if (winfo->serialize_changes || - list_length(ParallelApplyWorkerPool) > - (max_parallel_apply_workers_per_subscription / 2)) + if (winfo->serialize_changes) { logicalrep_pa_worker_stop(winfo); pa_free_worker_info(winfo); @@ -831,6 +929,38 @@ pa_get_last_commit_end(TransactionId xid, bool delete_entry, bool *skipped_write return entry->local_end; } +/* + * Wait for the remote transaction associated with the specified remote xid to + * complete. + */ +static void +pa_wait_for_transaction(TransactionId wait_for_xid) +{ + if (!am_leader_apply_worker()) + return; + + if (!TransactionIdIsValid(wait_for_xid)) + return; + + elog(DEBUG1, "plan to wait for remote_xid %u to finish", + wait_for_xid); + + for (;;) + { + if (pa_transaction_committed(wait_for_xid)) + break; + + pa_lock_transaction(wait_for_xid, AccessShareLock); + pa_unlock_transaction(wait_for_xid, AccessShareLock); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + } + + elog(DEBUG1, "finished wait for remote_xid %u to finish", + wait_for_xid); +} + /* * Interrupt handler for main loop of parallel apply worker. 
*/ @@ -1015,6 +1145,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) if (rc & WL_LATCH_SET) ResetLatch(MyLatch); + + if (!IsTransactionState()) + pgstat_report_stat(true); } } else @@ -1052,6 +1185,9 @@ pa_shutdown(int code, Datum arg) INVALID_PROC_NUMBER); dsm_detach((dsm_segment *) DatumGetPointer(arg)); + + if (parallel_apply_dsa_area) + dsa_detach(parallel_apply_dsa_area); } /* @@ -1363,7 +1499,6 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) shm_mq_result result; TimestampTz startTime = 0; - Assert(!IsTransactionState()); Assert(!winfo->serialize_changes); /* @@ -1415,6 +1550,72 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) } } +/* + * Distribute remote relation information to all active parallel apply workers + * that require it. + */ +void +pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel) +{ + List *workers_stopped = NIL; + StringInfoData out; + + /* Only leader apply workers can distribute schema changes */ + if (!am_leader_apply_worker()) + return; + + /* Quick exit if there are no parallel apply workers */ + if (!ParallelApplyWorkerPool) + return; + + initStringInfo(&out); + + write_internal_relation(&out, rel); + + foreach_ptr(ParallelApplyWorkerInfo, winfo, ParallelApplyWorkerPool) + { + /* + * Skip the worker responsible for the current transaction, as the + * relation information has already been sent to it. + */ + if (winfo == stream_apply_worker) + continue; + + /* + * Skip the worker that is in serialize mode, as they will soon stop + * once they finish applying the transaction. + */ + if (winfo->serialize_changes) + continue; + + elog(DEBUG1, "distributing schema changes to pa workers"); + + if (pa_send_data(winfo, out.len, out.data)) + continue; + + elog(DEBUG1, "failed to distribute, will stop that worker instead"); + + /* + * Distribution to this worker failed due to a sending timeout. Wait + * for the worker to complete its transaction and then stop it. 
This + * is consistent with the handling of workers in serialize mode (see + * pa_free_worker() for details). + */ + pa_wait_for_transaction(winfo->shared->xid); + + pa_get_last_commit_end(winfo->shared->xid, false, NULL); + + logicalrep_pa_worker_stop(winfo); + + workers_stopped = lappend(workers_stopped, winfo); + } + + pfree(out.data); + + foreach_ptr(ParallelApplyWorkerInfo, winfo, workers_stopped) + pa_free_worker_info(winfo); +} + /* * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means * that the current data and any subsequent data for this transaction will be @@ -1575,6 +1776,13 @@ pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp) void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid) { + /* + * Subtransactions can be sent only for streamed transactions. Skip this + * for non-streamed cases. + */ + if (!TransactionIdIsValid(top_xid)) + return; + if (current_xid != top_xid && !list_member_xid(subxactlist, current_xid)) { @@ -1831,25 +2039,41 @@ pa_decr_and_wait_stream_block(void) void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) { + XLogRecPtr local_lsn = InvalidXLogRecPtr; + TransactionId pa_remote_xid = winfo->shared->xid; + Assert(am_leader_apply_worker()); /* - * Unlock the shared object lock so that parallel apply worker can - * continue to receive and apply changes. + * Unlock the shared object lock taken for streaming transactions so that + * parallel apply worker can continue to receive and apply changes. */ - pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); + if (winfo->stream_txn) + pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); /* - * Wait for that worker to finish. This is necessary to maintain commit - * order which avoids failures due to transaction dependencies and - * deadlocks. + * Wait for that worker for streaming transaction to finish. 
This is + * necessary to maintain commit order which avoids failures due to + * transaction dependencies and deadlocks. + * + * For non-streaming transaction but in partial seralize mode, wait for + * stop as well as the worker is anyway cannot be reused anymore (see + * pa_free_worker() for details). */ - pa_wait_for_xact_finish(winfo); + if (winfo->serialize_changes || winfo->stream_txn) + { + pa_wait_for_xact_finish(winfo); + + local_lsn = winfo->shared->last_commit_end; + pa_remote_xid = InvalidTransactionId; + + pa_free_worker(winfo); + } if (XLogRecPtrIsValid(remote_lsn)) - store_flush_position(remote_lsn, winfo->shared->last_commit_end); + store_flush_position(remote_lsn, local_lsn, pa_remote_xid); - pa_free_worker(winfo); + pa_set_stream_apply_worker(NULL); } bool @@ -1917,6 +2141,22 @@ pa_attach_parallelized_txn_hash(dsa_handle *pa_dsa_handle, MemoryContextSwitchTo(oldctx); } +/* + * Mark the transaction state as finished and remove the shared hash entry. + */ +void +pa_commit_transaction(void) +{ + TransactionId xid = MyParallelShared->xid; + + SpinLockAcquire(&MyParallelShared->mutex); + MyParallelShared->xact_state = PARALLEL_TRANS_FINISHED; + SpinLockRelease(&MyParallelShared->mutex); + + dshash_delete_key(parallelized_txns, &xid); + elog(DEBUG1, "depended xid %u committed", xid); +} + /* * Wait for the given transaction to finish. */ @@ -1925,6 +2165,13 @@ pa_wait_for_depended_transaction(TransactionId xid) { elog(DEBUG1, "wait for depended xid %u", xid); + /* + * Quick exit if parallelized_txns has not been initialized yet. This can + * happen when this function is called by the leader worker. + */ + if (!parallelized_txns) + return; + for (;;) { ParallelizedTxnEntry *txn_entry; @@ -1945,3 +2192,45 @@ pa_wait_for_depended_transaction(TransactionId xid) elog(DEBUG1, "finish waiting for depended xid %u", xid); } + +/* + * Write internal relation description to the output stream. 
+ */ +static void +write_internal_relation(StringInfo s, LogicalRepRelation *rel) +{ + pq_sendbyte(s, PARALLEL_APPLY_INTERNAL_MESSAGE); + pq_sendbyte(s, WORKER_INTERNAL_MSG_RELATION); + + if (rel) + { + pq_sendint(s, 1, 4); + logicalrep_write_internal_rel(s, rel); + } + else + { + pq_sendint(s, logicalrep_get_num_rels(), 4); + logicalrep_write_all_rels(s); + } +} + +/* + * Register a transaction to the shared hash table. + * + * This function is intended to be called during the commit phase of + * non-streamed transactions. Other parallel workers would wait, + * removing the added entry. + */ +void +pa_add_parallelized_transaction(TransactionId xid) +{ + bool found; + ParallelizedTxnEntry *txn_entry; + + Assert(parallelized_txns); + Assert(TransactionIdIsValid(xid)); + + txn_entry = dshash_find_or_insert(parallelized_txns, &xid, &found); + + dshash_release_lock(parallelized_txns, txn_entry); +} diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 86ad97cd937..62a15fa74bd 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -691,6 +691,44 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, logicalrep_write_attrs(out, rel, columns, include_gencols_type); } +/* + * Write internal relation description to the output stream. + */ +void +logicalrep_write_internal_rel(StringInfo out, LogicalRepRelation *rel) +{ + pq_sendint32(out, rel->remoteid); + + /* Write relation name */ + pq_sendstring(out, rel->nspname); + pq_sendstring(out, rel->relname); + + /* Write the replica identity. 
*/ + pq_sendbyte(out, rel->replident); + + /* Write attribute description */ + pq_sendint16(out, rel->natts); + + for (int i = 0; i < rel->natts; i++) + { + uint8 flags = 0; + + if (bms_is_member(i, rel->attkeys)) + flags |= LOGICALREP_IS_REPLICA_IDENTITY; + + pq_sendbyte(out, flags); + + /* attribute name */ + pq_sendstring(out, rel->attnames[i]); + + /* attribute type id */ + pq_sendint32(out, rel->atttyps[i]); + + /* ignore attribute mode for now */ + pq_sendint32(out, 0); + } +} + /* * Read the relation info from stream and return as LogicalRepRelation. */ diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index ab6313eb1bc..e1ce183dfd3 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -961,6 +961,37 @@ FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel, return InvalidOid; } +/* + * Get the number of entries in the LogicalRepRelMap. + */ +int +logicalrep_get_num_rels(void) +{ + if (LogicalRepRelMap == NULL) + return 0; + + return hash_get_num_entries(LogicalRepRelMap); +} + +/* + * Write all the remote relation information from the LogicalRepRelMapEntry to + * the output stream. + */ +void +logicalrep_write_all_rels(StringInfo out) +{ + LogicalRepRelMapEntry *entry; + HASH_SEQ_STATUS status; + + if (LogicalRepRelMap == NULL) + return; + + hash_seq_init(&status, LogicalRepRelMap); + + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + logicalrep_write_internal_rel(out, &entry->remoterel); +} + /* * Get the LogicalRepRelMapEntry corresponding to the given relid without * opening the local relation. 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 9cb6eaa8a3a..5a9fb97bad0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -290,6 +290,7 @@ #include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/guc.h" +#include "utils/injection_point.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -514,6 +515,8 @@ static List *on_commit_wakeup_workers_subids = NIL; bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +static TransactionId remote_xid = InvalidTransactionId; +static TransactionId last_remote_xid = InvalidTransactionId; /* fields valid only when processing streamed transaction */ static bool in_streamed_transaction = false; @@ -719,6 +722,7 @@ static void on_exit_clear_xact_state(int code, Datum arg); static bool send_internal_dependencies(ParallelApplyWorkerInfo *winfo, List *depends_on_xids); +static bool build_dependency_with_last_committed_txn(ParallelApplyWorkerInfo *winfo); /* * Compute the hash value for entries in the replica_identity_table. @@ -1135,7 +1139,7 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, */ StringInfoData change = *s; - /* Compute dependency only for non-streaming transaction */ + /* XXX Compute dependency only for non-streaming transaction */ if (in_streamed_transaction || (winfo && winfo->stream_txn)) return; @@ -1413,7 +1417,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) TransApplyAction apply_action; StringInfoData original_msg; - apply_action = get_transaction_apply_action(stream_xid, &winfo); + Assert(!in_streamed_transaction || TransactionIdIsValid(stream_xid)); + + apply_action = get_transaction_apply_action(in_streamed_transaction + ? 
stream_xid : remote_xid, + &winfo); /* not in streaming mode */ if (apply_action == TRANS_LEADER_APPLY) @@ -1422,8 +1430,6 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) return false; } - Assert(TransactionIdIsValid(stream_xid)); - /* * The parallel apply worker needs the xid in this message to decide * whether to define a savepoint, so save the original message that has @@ -1434,15 +1440,28 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) /* * We should have received XID of the subxact as the first part of the - * message, so extract it. + * message in streaming transactions, so extract it. */ - current_xid = pq_getmsgint(s, 4); + if (in_streamed_transaction) + current_xid = pq_getmsgint(s, 4); + else + current_xid = remote_xid; if (!TransactionIdIsValid(current_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); + handle_dependency_on_change(action, s, current_xid, winfo); + + /* + * Re-fetch the latest apply action as it might have been changed during + * dependency check. + */ + apply_action = get_transaction_apply_action(in_streamed_transaction + ? stream_xid : remote_xid, + &winfo); + switch (apply_action) { case TRANS_LEADER_SERIALIZE: @@ -1487,6 +1506,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) /* Define a savepoint for a subxact if needed. */ pa_start_subtrans(current_xid, stream_xid); + return false; default: @@ -1846,17 +1866,71 @@ static void apply_handle_begin(StringInfo s) { LogicalRepBeginData begin_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; + + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; /* There must not be an active streaming transaction. 
*/ Assert(!TransactionIdIsValid(stream_xid)); logicalrep_read_begin(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); + + remote_xid = begin_data.xid; + + set_apply_error_context_xact(remote_xid, begin_data.final_lsn); remote_final_lsn = begin_data.final_lsn; maybe_start_skipping_changes(begin_data.final_lsn); + pa_allocate_worker(remote_xid, false); + + apply_action = get_transaction_apply_action(remote_xid, &winfo); + + elog(DEBUG1, "new remote_xid %u", remote_xid); + switch (apply_action) + { + case TRANS_LEADER_APPLY: + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + if (pa_send_data(winfo, s->len, s->data)) + { + pa_set_stream_apply_worker(winfo); + break; + } + + /* + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. + */ + pa_switch_to_partial_serialize(winfo, true); + + pg_fallthrough; + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + stream_write_change(LOGICAL_REP_MSG_BEGIN, &original_msg); + + /* Cache the parallel apply worker for this transaction. */ + pa_set_stream_apply_worker(winfo); + break; + + case TRANS_PARALLEL_APPLY: + /* Hold the lock until the end of the transaction. */ + pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock); + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED); + break; + + default: + elog(ERROR, "unexpected apply action: %d", (int) apply_action); + break; + } + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -1897,6 +1971,26 @@ send_internal_dependencies(ParallelApplyWorkerInfo *winfo, List *depends_on_xids return false; } +/* + * Make a dependency between this and the lastly committed transaction. + * + * Sends an INTERNAL_DEPENDENCY message to the parallel apply worker, + * instructing it to wait for the last committed transaction to finish before + * committing its own, thereby preserving commit order. 
+ * + * Return false if we switched to the serialize mode to send the + * message, true otherwise. + */ +static bool +build_dependency_with_last_committed_txn(ParallelApplyWorkerInfo *winfo) +{ + /* Skip if transactions have not been applied yet */ + if (!TransactionIdIsValid(last_remote_xid)) + return true; + + return send_internal_dependencies(winfo, list_make1_xid(last_remote_xid)); +} + /* * Handle COMMIT message. * @@ -1906,6 +2000,11 @@ static void apply_handle_commit(StringInfo s) { LogicalRepCommitData commit_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; + + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; logicalrep_read_commit(s, &commit_data); @@ -1916,7 +2015,106 @@ apply_handle_commit(StringInfo s) LSN_FORMAT_ARGS(commit_data.commit_lsn), LSN_FORMAT_ARGS(remote_final_lsn)))); - apply_handle_commit_internal(&commit_data); + apply_action = get_transaction_apply_action(remote_xid, &winfo); + + switch (apply_action) + { + case TRANS_LEADER_APPLY: + + /* + * Apart from parallelized transactions, we do not have to + * register this transaction to parallelized_txns. The commit + * ordering is always preserved. + */ + + /* Wait until the last transaction finishes */ + if (TransactionIdIsValid(last_remote_xid)) + pa_wait_for_depended_transaction(last_remote_xid); + + apply_handle_commit_internal(&commit_data); + + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + /* + * Mark this transaction as parallelized. This ensures that + * upcoming transactions wait until this transaction is committed. + */ + pa_add_parallelized_transaction(remote_xid); + + /* + * Build a dependency between this transaction and the lastly + * committed transaction to preserve the commit order. Then try to + * send a COMMIT message if succeeded. + */ + if (build_dependency_with_last_committed_txn(winfo) && + pa_send_data(winfo, s->len, s->data)) + { + /* Finish processing the transaction. 
*/ + pa_xact_finish(winfo, commit_data.end_lsn); + break; + } + + /* + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. + */ + pa_switch_to_partial_serialize(winfo, true); + + pg_fallthrough; + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + /* + * Build a dependency with the last committed transaction if not + * already done. + */ + if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL) + build_dependency_with_last_committed_txn(winfo); + + stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_COMMIT, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + + /* Finish processing the transaction. */ + pa_xact_finish(winfo, commit_data.end_lsn); + break; + + case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before committing. + */ + if (stream_fd) + stream_close_file(); + + INJECTION_POINT("parallel-worker-before-commit", NULL); + + apply_handle_commit_internal(&commit_data); + + MyParallelShared->last_commit_end = XactLastCommitEnd; + + pa_commit_transaction(); + + pa_unlock_transaction(remote_xid, AccessExclusiveLock); + break; + + default: + elog(ERROR, "unexpected apply action: %d", (int) apply_action); + break; + } + + /* Cache the remote_xid */ + last_remote_xid = remote_xid; + + remote_xid = InvalidTransactionId; + in_remote_transaction = false; + + elog(DEBUG1, "reset remote_xid %u", remote_xid); /* * Process any tables that are being synchronized in parallel, as well as @@ -2039,7 +2237,8 @@ apply_handle_prepare(StringInfo s) * XactLastCommitEnd, and adding it for this purpose doesn't seems worth * it. 
*/ - store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr); + store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); in_remote_transaction = false; @@ -2099,7 +2298,8 @@ apply_handle_commit_prepared(StringInfo s) CommitTransactionCommand(); pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); + store_flush_position(prepare_data.end_lsn, XactLastCommitEnd, + InvalidTransactionId); in_remote_transaction = false; /* @@ -2168,7 +2368,8 @@ apply_handle_rollback_prepared(StringInfo s) * transaction because we always flush the WAL record for it. See * apply_handle_prepare. */ - store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr); + store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); in_remote_transaction = false; /* @@ -2230,7 +2431,8 @@ apply_handle_stream_prepare(StringInfo s) * It is okay not to set the local_end LSN for the prepare because * we always flush the prepare record. See apply_handle_prepare. */ - store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr); + store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); in_remote_transaction = false; @@ -2482,6 +2684,11 @@ apply_handle_stream_start(StringInfo s) case TRANS_LEADER_PARTIAL_SERIALIZE: Assert(winfo); + /* + * TODO, the pa worker could start to wait too soon when + * processing some old stream start + */ + /* * Open the spool file unless it was already opened when switching * to serialize mode. 
The transaction started in @@ -2955,8 +3162,7 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, int fileno; pgoff_t offset; - if (!am_parallel_apply_worker()) - maybe_start_skipping_changes(lsn); + maybe_start_skipping_changes(lsn); /* Make sure we have an open transaction */ begin_replication_step(); @@ -3229,7 +3435,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data) pgstat_report_stat(false); - store_flush_position(commit_data->end_lsn, XactLastCommitEnd); + store_flush_position(commit_data->end_lsn, XactLastCommitEnd, + InvalidTransactionId); } else { @@ -3262,6 +3469,8 @@ apply_handle_relation(StringInfo s) /* Also reset all entries in the partition map that refer to remoterel. */ logicalrep_partmap_reset_relmap(rel); + + pa_distribute_schema_changes_to_workers(rel); } /* @@ -4036,6 +4245,8 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, /* * This handles insert, update, delete on a partitioned table. + * + * TODO, support parallel apply. */ static void apply_handle_tuple_routing(ApplyExecutionData *edata, @@ -4578,6 +4789,10 @@ apply_dispatch(StringInfo s) * check which entries on it are already locally flushed. Those we can report * as having been flushed. * + * For non-streaming transactions managed by a parallel apply worker, we will + * get the local commit end from the shared parallel apply worker info once the + * transaction has been committed by the worker. + * * The have_pending_txes is true if there are outstanding transactions that * need to be flushed. 
*/ @@ -4587,6 +4802,7 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, { dlist_mutable_iter iter; XLogRecPtr local_flush = GetFlushRecPtr(NULL); + List *committed_pa_xid = NIL; *write = InvalidXLogRecPtr; *flush = InvalidXLogRecPtr; @@ -4596,6 +4812,40 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, FlushPosition *pos = dlist_container(FlushPosition, node, iter.cur); + /* + * If the transaction was assigned to a parallel apply worker, attempt + * to retrieve its commit end LSN from that worker. + */ + if (TransactionIdIsValid(pos->pa_remote_xid) && + !XLogRecPtrIsValid(pos->local_end)) + { + bool skipped_write; + + pos->local_end = pa_get_last_commit_end(pos->pa_remote_xid, true, + &skipped_write); + + elog(DEBUG1, + "got commit end from parallel apply worker, " + "txn: %u, remote_end %X/%X, local_end %X/%X", + pos->pa_remote_xid, LSN_FORMAT_ARGS(pos->remote_end), + LSN_FORMAT_ARGS(pos->local_end)); + + /* + * Break the loop if the worker has not finished applying the + * transaction. There's no need to check subsequent transactions, + * as they must commit after the current transaction being + * examined and thus won't have their commit end available yet. + */ + if (!skipped_write && XLogRecPtrIsInvalid(pos->local_end)) + break; + + committed_pa_xid = lappend_xid(committed_pa_xid, pos->pa_remote_xid); + } + + /* + * Worker has finished applying or the transaction was applied in the + * leader apply worker. + */ *write = pos->remote_end; if (pos->local_end <= local_flush) @@ -4604,29 +4854,22 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, dlist_delete(iter.cur); pfree(pos); } - else - { - /* - * Don't want to uselessly iterate over the rest of the list which - * could potentially be long. Instead get the last element and - * grab the write position from there. 
- */ - pos = dlist_tail_element(FlushPosition, node, - &lsn_mapping); - *write = pos->remote_end; - *have_pending_txes = true; - return; - } } *have_pending_txes = !dlist_is_empty(&lsn_mapping); + + cleanup_replica_identity_table(committed_pa_xid); } /* * Store current remote/local lsn pair in the tracking list. + * + * pa_remote_xid should be a valid transaction ID only when the transaction was + * assigned to a parallel apply worker; otherwise, pass InvalidTransactionId. */ void -store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn) +store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn, + TransactionId pa_remote_xid) { FlushPosition *flushpos; @@ -4644,6 +4887,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn) flushpos = palloc_object(FlushPosition); flushpos->local_end = local_lsn; flushpos->remote_end = remote_lsn; + flushpos->pa_remote_xid = pa_remote_xid; dlist_push_tail(&lsn_mapping, &flushpos->node); MemoryContextSwitchTo(ApplyMessageContext); @@ -6767,6 +7011,22 @@ static void maybe_start_skipping_changes(XLogRecPtr finish_lsn) { Assert(!is_skipping_changes()); + + /* + * For streaming transactions applied in a parallel apply worker, the remote + * end LSN is unknown, so skipping is not possible. For non-streaming + * transactions, the leader determines whether to skip the transaction. If + * skipping is needed, the leader simply does not send the transaction to a + * parallel apply worker. + */ + if (am_parallel_apply_worker()) + return; + + /* + * These assertions apply only to leader apply and table sync workers. + * Parallel workers may apply spooled BEGIN or STREAM_START messages, which + * can set these flags to true. 
+ */ Assert(!in_remote_transaction); Assert(!in_streamed_transaction); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 9c5530804c8..6fe927d6da9 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -263,6 +263,8 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type); +extern void logicalrep_write_internal_rel(StringInfo out, + LogicalRepRelation *rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 77b34998e63..408f0b529ba 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -56,6 +56,8 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode); extern bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap); extern Oid GetRelationIdentityOrPK(Relation rel); +extern int logicalrep_get_num_rels(void); +extern void logicalrep_write_all_rels(StringInfo out); extern LogicalRepRelMapEntry *logicalrep_get_relentry(LogicalRepRelId remoteid); #endif /* LOGICALRELATION_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 9756b8461c5..92ae2760427 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -332,7 +332,8 @@ extern void SetupApplyOrSyncWorker(int worker_slot); extern void DisableSubscriptionAndExit(void); -extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn); +extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn, + TransactionId pa_remote_xid); /* Function 
for apply error callback */ extern void apply_error_callback(void *arg); @@ -349,6 +350,7 @@ extern void apply_handle_internal_message(StringInfo s); extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data); +extern void pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel); extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked); @@ -374,8 +376,9 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); extern bool pa_transaction_committed(TransactionId xid); - +extern void pa_commit_transaction(void); extern void pa_wait_for_depended_transaction(TransactionId xid); +extern void pa_add_parallelized_transaction(TransactionId xid); #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index e71e95c6297..48ae698e786 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -48,6 +48,7 @@ tests += { 't/036_sequences.pl', 't/037_except.pl', 't/038_walsnd_shutdown_timeout.pl', + 't/050_parallel_apply.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 7d41715ed81..c863b430bec 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -16,6 +16,8 @@ $node_publisher->start; # Create subscriber node my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', + "max_logical_replication_workers = 10"); $node_subscriber->start; # Create some preexisting content on publisher diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl index 945505d0239..e15a6bb2a03 100644 --- 
a/src/test/subscription/t/010_truncate.pl +++ b/src/test/subscription/t/010_truncate.pl @@ -17,7 +17,7 @@ $node_publisher->start; my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; $node_subscriber->append_conf('postgresql.conf', - qq(max_logical_replication_workers = 6)); + qq(max_logical_replication_workers = 7)); $node_subscriber->start; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index ac96bc3f009..705bc83f5ea 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -232,6 +232,12 @@ $node_subscriber->wait_for_log( $node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)"); +# FIXME: Currently, non-streaming transactions are applied in parallel by +# default. So, the first transaction is handled by a parallel apply worker. To +# trigger the deadlock, initiate one more transaction to be applied by the +# leader. +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)"); + $h->query_safe('COMMIT'); $h->quit; @@ -247,7 +253,7 @@ $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); -is($result, qq(5001), 'data replicated to subscriber after dropping index'); +is($result, qq(5002), 'data replicated to subscriber after dropping index'); # Clean up test data from the environment. $node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2"); diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl index 5d457060a02..911005bde20 100644 --- a/src/test/subscription/t/026_stats.pl +++ b/src/test/subscription/t/026_stats.pl @@ -16,6 +16,7 @@ $node_publisher->start; # Create subscriber node. 
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10"); $node_subscriber->start; diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl index 322f5b4cc6a..fdfc44ac729 100644 --- a/src/test/subscription/t/027_nosuperuser.pl +++ b/src/test/subscription/t/027_nosuperuser.pl @@ -86,6 +86,7 @@ $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_publisher->init(allows_streaming => 'logical'); $node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10"); $node_publisher->start; $node_subscriber->start; $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl new file mode 100644 index 00000000000..37a255a8040 --- /dev/null +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -0,0 +1,150 @@ + +# Copyright (c) 2025, PostgreSQL Global Development Group + +# This tests that dependency tracking between transactions can work well + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Insert initial data +$node_publisher->safe_psql('postgres', + "CREATE TABLE regress_tab (id int PRIMARY KEY, value text);"); +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(1, 10), 'test');"); + +# Create a publication +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION 
regress_pub FOR ALL TABLES;"); + +# Initialize subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->append_conf('postgresql.conf', + "max_logical_replication_workers = 10"); +$node_subscriber->start; + +# Check if the extension injection_points is available, as it may be +# possible that this script is run with installcheck, where the module +# would not be installed by default. +if (!$node_subscriber->check_extension('injection_points')) +{ + plan skip_all => 'Extension injection_points not installed'; +} + +$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;'); + +# Create a subscription +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE regress_tab (id int PRIMARY KEY, value text);"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub;"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub'); + +# Insert tuples on publisher +# +# XXX This may not enough to launch a parallel apply worker, because +# table_states_not_ready is not discarded yet. 
+$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(11, 20), 'test');"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Insert tuples again +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(21, 30), 'test');"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Verify the parallel apply worker is launched +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_activity WHERE backend_type = 'logical replication parallel worker'"); +is($result, '1', "parallel apply worker is launched by a non-streamed transaction"); + +# Attach an injection_point. Parallel workers would wait before the commit +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +# Insert tuples on publisher +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(31, 40), 'test');"); + +# Wait until the parallel worker enters the injection point. +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +my $offset = -s $node_subscriber->logfile; + +# Insert tuples on publisher again. This transaction is independent from the +# previous one, but the parallel worker would wait till it finishes +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(41, 50), 'test');"); + +# Verify the parallel worker waits for the transaction +my $str = $node_subscriber->wait_for_log(qr/wait for depended xid ([1-9][0-9]+)/, $offset); +my ($xid) = $str =~ /wait for depended xid ([1-9][0-9]+)/; + +# Update tuples which have not been applied yet on subscriber because the +# parallel worker stops at the injection point. Newly assigned worker also +# waits for the same transactions as above. 
+$node_publisher->safe_psql('postgres', + "UPDATE regress_tab SET value = 'updated' WHERE id BETWEEN 31 AND 35;"); + +# Verify the parallel worker waits for the same transaction +$node_subscriber->wait_for_log(qr/wait for depended xid $xid/, $offset); + +# Wake up the parallel worker. We detach first so as not to stop other parallel workers +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-commit'); + SELECT injection_points_wakeup('parallel-worker-before-commit'); +]); + +# Verify the parallel worker wakes up +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab"); +is ($result, 50, 'inserts are replicated to subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM regress_tab WHERE value = 'updated'"); +is ($result, 5, 'updates are also replicated to subscriber'); + +# Force the leader apply worker to serialize changes to disk +$node_subscriber->append_conf('postgresql.conf', + "debug_logical_replication_streaming = immediate"); +$node_subscriber->reload; + +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(51, 60), 'test');"); + +$node_publisher->wait_for_catchup('regress_sub'); + +# Verify that changes have been serialized +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? 
logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +# Verify that the parallel apply worker can restore serialized changes correctly +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab"); +is ($result, 60, 'inserts are replicated to subscriber'); + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 56c392c3744..9545571fe68 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2149,6 +2149,7 @@ ParallelHashGrowth ParallelHashJoinBatch ParallelHashJoinBatchAccessor ParallelHashJoinState +ParallelizedTxnEntry ParallelIndexScanDesc ParallelizedTxnEntry ParallelSlot @@ -4232,6 +4233,7 @@ rendezvousHashEntry rep replace_rte_variables_callback replace_rte_variables_context +replica_identity_hash report_error_fn ret_type rewind_source -- 2.43.7