From 0181ade19f928a2812b263412e366e394d09c8db Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Thu, 16 Apr 2026 15:24:08 +0800 Subject: [PATCH v14 3/9] Introduce a local hash table to store replica identities This local hash table on the leader is used for detecting dependencies between transactions. The hash contains the Replica Identity (RI) as a key and the remote XID that modified the corresponding tuple. The hash entries are inserted when the leader finds an RI from a replication message. Entries are deleted when transactions committed by parallel workers are gathered, or the number of entries exceeds the limit. When the leader sends replication changes to parallel workers, it checks whether other transactions have already used the RI associated with the change. If something is found, the leader treats it as a dependent transaction and notifies parallel workers to wait until it finishes via WORKER_INTERNAL_MSG_DEPENDENCY. Author: Zhijie Hou Author: Hayato Kuroda --- .../replication/logical/applyparallelworker.c | 108 ++- src/backend/replication/logical/relation.c | 24 + src/backend/replication/logical/worker.c | 665 +++++++++++++++++- src/include/replication/logicalrelation.h | 7 + src/include/replication/worker_internal.h | 7 +- src/tools/pgindent/typedefs.list | 3 + 6 files changed, 811 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 2b34e31f86c..3f91d79dbec 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -218,7 +218,24 @@ typedef struct ParallelApplyWorkerEntry { TransactionId xid; /* Hash key -- must be first */ + + /* + * The parallel apply worker assigned for applying the transaction. It is + * NULL if the assigned worker has been reused for another transaction. + */ ParallelApplyWorkerInfo *winfo; + + /* + * The local end LSN of this transaction applied by the parallel apply + * worker. + * + * The leader will initialize this value by reading the last_commit_end from + * the parallel apply worker shared memory before reusing the worker for + * another transaction (see pa_get_last_commit_end). An invalid value + * indicates the worker has not finished applying the transaction or there + * is no change applied by the worker for this transaction. + */ + XLogRecPtr local_end; } ParallelApplyWorkerEntry; /* An entry in the parallelized_txns shared hash table */ @@ -507,7 +524,7 @@ pa_launch_parallel_worker(void) * streaming changes. */ void -pa_allocate_worker(TransactionId xid) +pa_allocate_worker(TransactionId xid, bool stream_txn) { bool found; ParallelApplyWorkerInfo *winfo = NULL; @@ -548,7 +565,9 @@ pa_allocate_worker(TransactionId xid) winfo->in_use = true; winfo->serialize_changes = false; + winfo->stream_txn = stream_txn; entry->winfo = winfo; + entry->local_end = InvalidXLogRecPtr; } /* @@ -745,6 +764,73 @@ pa_process_spooled_messages_if_required(void) return true; } +/* + * Get the local end LSN for a transaction applied by a parallel apply worker. + * + * Set delete_entry to true if you intend to remove the transaction from the + * ParallelApplyTxnHash after collecting its LSN. + * + * If the parallel apply worker did not write any changes during the transaction + * application due to situations like update/delete_missing or a before trigger, + * the *skipped_write will be set to true. + */ +XLogRecPtr +pa_get_last_commit_end(TransactionId xid, bool delete_entry, bool *skipped_write) +{ + bool found; + ParallelApplyWorkerEntry *entry; + ParallelApplyWorkerInfo *winfo; + + Assert(TransactionIdIsValid(xid)); + + if (skipped_write) + *skipped_write = false; + + /* Find an entry for the requested transaction. */ + entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found); + + if (!found) + return InvalidXLogRecPtr; + + /* + * If worker info is NULL, it indicates that the worker has been reused + * for handling other transactions. Consequently, the local end LSN has + * already been collected and saved in entry->local_end. + */ + winfo = entry->winfo; + if (winfo == NULL) + { + if (delete_entry && + !hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL)) + elog(ERROR, "hash table corrupted"); + + if (skipped_write) + *skipped_write = XLogRecPtrIsInvalid(entry->local_end); + + return entry->local_end; + } + + /* Return InvalidXLogRecPtr if the transaction is still in progress */ + if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED) + return InvalidXLogRecPtr; + + /* Collect the local end LSN from the worker's shared memory area */ + entry->local_end = winfo->shared->last_commit_end; + entry->winfo = NULL; + + if (skipped_write) + *skipped_write = XLogRecPtrIsInvalid(entry->local_end); + + elog(DEBUG1, "store local commit %X/%X end to txn entry: %u", + LSN_FORMAT_ARGS(entry->local_end), xid); + + if (delete_entry && + !hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL)) + elog(ERROR, "hash table corrupted"); + + return entry->local_end; +} + /* * Interrupt handler for main loop of parallel apply worker. */ @@ -1766,6 +1852,26 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) pa_free_worker(winfo); } +bool +pa_transaction_committed(TransactionId xid) +{ + bool found; + ParallelApplyWorkerEntry *entry; + + Assert(TransactionIdIsValid(xid)); + + /* Find an entry for the requested transaction */ + entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found); + + if (!found) + return true; + + if (!entry->winfo) + return true; + + return pa_get_xact_state(entry->winfo->shared) == PARALLEL_TRANS_FINISHED; +} + /* * Attach to the shared hash table for parallelized transactions. */ diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 0b1d80b5b0f..ab6313eb1bc 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -960,3 +960,27 @@ FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel, return InvalidOid; } + +/* + * Get the LogicalRepRelMapEntry corresponding to the given relid without + * opening the local relation. + */ +LogicalRepRelMapEntry * +logicalrep_get_relentry(LogicalRepRelId remoteid) +{ + LogicalRepRelMapEntry *entry; + bool found; + + if (LogicalRepRelMap == NULL) + logicalrep_relmap_init(); + + /* Search for existing entry. */ + entry = hash_search(LogicalRepRelMap, (void *) &remoteid, + HASH_FIND, &found); + + if (!found) + elog(DEBUG1, "no relation map entry for remote relation ID %u", + remoteid); + + return entry; +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ba09822820d..38e9b8ddf72 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -303,11 +303,35 @@ #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */ +/* + * Struct for tracking the progress of flushing the transaction received from + * the publisher. + * + * A list of these structs (lsn_mapping) is maintained in the apply worker, each + * representing a transaction that is being flushed. The entries are removed + * from the list when the transaction is fully flushed (see get_flush_position). + */ typedef struct FlushPosition { dlist_node node; + + /* + * The end of commit record applied by the worker, and the corresponding end + * of commit record on the publisher. + * + * For transactions assigned to a parallel apply worker, the local_end is + * not immediately available. The leader will update the local_end when it + * confirms that the parallel apply worker has finished applying the + * transaction (see the usage of pa_get_last_commit_end). + */ XLogRecPtr local_end; XLogRecPtr remote_end; + + /* + * The remote transaction ID. This should be set to a valid ID only when the + * transaction is assigned to a parallel apply worker. + */ + TransactionId pa_remote_xid; } FlushPosition; static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping); @@ -476,6 +500,8 @@ ErrorContextCallback *apply_error_context_stack = NULL; MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; +static MemoryContext ParallelApplyContext = NULL; + /* per stream context for streaming transactions */ static MemoryContext LogicalStreamingContext = NULL; @@ -549,6 +575,61 @@ typedef struct ApplySubXactData static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; +/* Hash table key for replica_identity_table */ +typedef struct ReplicaIdentityKey +{ + Oid relid; + LogicalRepTupleData *data; +} ReplicaIdentityKey; + +/* Hash table entry in replica_identity_table */ +typedef struct ReplicaIdentityEntry +{ + ReplicaIdentityKey *keydata; + TransactionId remote_xid; + + /* needed for simplehash */ + uint32 hash; + char status; +} ReplicaIdentityEntry; + +#include "common/hashfn.h" + +static uint32 hash_replica_identity(ReplicaIdentityKey *key); +static bool hash_replica_identity_compare(ReplicaIdentityKey *a, + ReplicaIdentityKey *b); + +/* Define parameters for replica identity hash table code generation. */ +#define SH_PREFIX replica_identity +#define SH_ELEMENT_TYPE ReplicaIdentityEntry +#define SH_KEY_TYPE ReplicaIdentityKey * +#define SH_KEY keydata +#define SH_HASH_KEY(tb, key) hash_replica_identity(key) +#define SH_EQUAL(tb, a, b) hash_replica_identity_compare(a, b) +#define SH_STORE_HASH +#define SH_GET_HASH(tb, a) (a)->hash +#define SH_SCOPE static inline +#define SH_DECLARE +#define SH_DEFINE +#include "lib/simplehash.h" + +#define REPLICA_IDENTITY_INITIAL_SIZE 128 +#define REPLICA_IDENTITY_CLEANUP_THRESHOLD 1024 + +/* + * Hash table storing replica identity values for changes being applied in + * parallel, along with the last transaction that modified each row. + * + * Entries are removed in the following cases: + * + * 1) When handling a subsequent change that modifies the same row, and the + * stored transaction has already committed. + * 2) When collecting flush progress of remote transactions, and the stored + * transaction is found to have committed. + * 3) When the number of entries exceeds REPLICA_IDENTITY_CLEANUP_THRESHOLD. + */ +static replica_identity_hash *replica_identity_table = NULL; + static inline void subxact_filename(char *path, Oid subid, TransactionId xid); static inline void changes_filename(char *path, Oid subid, TransactionId xid); @@ -636,6 +717,550 @@ static void set_wal_receiver_timeout(void); static void on_exit_clear_xact_state(int code, Datum arg); +static bool send_internal_dependencies(ParallelApplyWorkerInfo *winfo, + List *depends_on_xids); + +/* + * Compute the hash value for entries in the replica_identity_table. + */ +static uint32 +hash_replica_identity(ReplicaIdentityKey *key) +{ + int i; + uint32 hashkey = 0; + + hashkey = hash_combine(hashkey, hash_uint32(key->relid)); + + for (i = 0; i < key->data->ncols; i++) + { + uint32 hkey; + + if (key->data->colstatus[i] == LOGICALREP_COLUMN_NULL) + continue; + + hkey = hash_any((const unsigned char *) key->data->colvalues[i].data, + key->data->colvalues[i].len); + hashkey = hash_combine(hashkey, hkey); + } + + return hashkey; +} + +/* + * Compare two entries in the replica_identity_table. + */ +static bool +hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b) +{ + if (a->relid != b->relid || + a->data->ncols != b->data->ncols) + return false; + + for (int i = 0; i < a->data->ncols; i++) + { + if (a->data->colstatus[i] != b->data->colstatus[i]) + return false; + + if (a->data->colvalues[i].len != b->data->colvalues[i].len) + return false; + + if (strcmp(a->data->colvalues[i].data, b->data->colvalues[i].data)) + return false; + + elog(DEBUG1, "conflicting key %s", a->data->colvalues[i].data); + } + + return true; +} + +/* + * Free resources associated with a replica identity key. + */ +static void +free_replica_identity_key(ReplicaIdentityKey *key) +{ + Assert(key); + + pfree(key->data->colvalues); + pfree(key->data->colstatus); + pfree(key->data); + pfree(key); +} + +/* + * Clean up hash table entries associated with the given transaction IDs. + */ +static void +cleanup_replica_identity_table(List *committed_xid) +{ + replica_identity_iterator i; + ReplicaIdentityEntry *rientry; + + if (!committed_xid) + return; + + replica_identity_start_iterate(replica_identity_table, &i); + while ((rientry = replica_identity_iterate(replica_identity_table, &i)) != NULL) + { + if (!list_member_xid(committed_xid, rientry->remote_xid)) + continue; + + /* Clean up the hash entry for committed transaction */ + free_replica_identity_key(rientry->keydata); + replica_identity_delete_item(replica_identity_table, rientry); + } +} + +/* + * Check committed transactions and clean up corresponding entries in the hash + * table. + * + * Also update the local_end for transactions assigned to parallel apply workers + * if not already set. + */ +static void +cleanup_committed_replica_identity_entries(void) +{ + dlist_mutable_iter iter; + List *committed_xids = NIL; + + dlist_foreach_modify(iter, &lsn_mapping) + { + FlushPosition *pos = + dlist_container(FlushPosition, node, iter.cur); + bool skipped_write; + + if (!TransactionIdIsValid(pos->pa_remote_xid) || + XLogRecPtrIsValid(pos->local_end)) + continue; + + pos->local_end = pa_get_last_commit_end(pos->pa_remote_xid, true, + &skipped_write); + + elog(DEBUG1, + "got commit end from parallel apply worker, " + "txn: %u, remote_end %X/%X, local_end %X/%X", + pos->pa_remote_xid, LSN_FORMAT_ARGS(pos->remote_end), + LSN_FORMAT_ARGS(pos->local_end)); + + if (!skipped_write && !XLogRecPtrIsValid(pos->local_end)) + continue; + + committed_xids = lappend_xid(committed_xids, pos->pa_remote_xid); + } + + /* cleanup the entries for committed transactions */ + cleanup_replica_identity_table(committed_xids); +} + +/* + * Append a transaction dependency, excluding duplicates and committed + * transactions. + */ +static List * +check_and_append_xid_dependency(List *depends_on_xids, + TransactionId *depends_on_xid, + TransactionId current_xid) +{ + Assert(depends_on_xid); + + if (!TransactionIdIsValid(*depends_on_xid)) + return depends_on_xids; + + if (TransactionIdEquals(*depends_on_xid, current_xid)) + return depends_on_xids; + + if (list_member_xid(depends_on_xids, *depends_on_xid)) + return depends_on_xids; + + /* + * Return and reset the xid if the transaction has been committed. + */ + if (pa_transaction_committed(*depends_on_xid)) + { + *depends_on_xid = InvalidTransactionId; + return depends_on_xids; + } + + return lappend_xid(depends_on_xids, *depends_on_xid); +} + +/* + * Check for dependencies on preceding transactions that modify the same key. + * Returns the dependent transactions in 'depends_on_xids'. + * + * Additionally, if new_depended_xid is valid, record it as a dependency for the + * replica identity key modification, allowing subsequent transactions that + * modify the same key to be dependent on it. + */ +static void +check_dependency_on_replica_identity(Oid relid, + LogicalRepTupleData *original_data, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + LogicalRepRelMapEntry *relentry; + LogicalRepTupleData *ridata; + ReplicaIdentityKey *rikey; + ReplicaIdentityEntry *rientry; + MemoryContext oldctx; + int n_ri; + bool found = false; + + Assert(depends_on_xids); + + /* Search for existing entry */ + relentry = logicalrep_get_relentry(relid); + + Assert(relentry); + + /* + * First search whether any previous transaction has affected the whole + * table e.g., truncate or schema change from publisher. + */ + *depends_on_xids = check_and_append_xid_dependency(*depends_on_xids, + &relentry->last_depended_xid, + new_depended_xid); + + n_ri = bms_num_members(relentry->remoterel.attkeys); + + /* + * Return if there are no replica identity columns, indicating that the + * remote relation has neither a replica identity key nor is marked as + * replica identity full. + */ + if (!n_ri) + return; + + /* Check if the RI key value of the tuple is invalid */ + for (int i = 0; i < original_data->ncols; i++) + { + if (!bms_is_member(i, relentry->remoterel.attkeys)) + continue; + + /* + * Return if RI key is NULL or is explicitly marked unchanged. The key + * value could be NULL in the new tuple of a update opertaion which + * means the RI key is not updated. + */ + if (original_data->colstatus[i] == LOGICALREP_COLUMN_NULL || + original_data->colstatus[i] == LOGICALREP_COLUMN_UNCHANGED) + return; + } + + oldctx = MemoryContextSwitchTo(ApplyContext); + + /* Allocate space for replica identity values */ + ridata = palloc0_object(LogicalRepTupleData); + ridata->colvalues = palloc0_array(StringInfoData, n_ri); + ridata->colstatus = palloc0_array(char, n_ri); + ridata->ncols = n_ri; + + for (int i_original = 0, i_ri = 0; i_original < original_data->ncols; i_original++) + { + StringInfo original_colvalue = &original_data->colvalues[i_original]; + + if (!bms_is_member(i_original, relentry->remoterel.attkeys)) + continue; + + initStringInfoExt(&ridata->colvalues[i_ri], original_colvalue->len + 1); + appendStringInfoString(&ridata->colvalues[i_ri], original_colvalue->data); + ridata->colstatus[i_ri] = original_data->colstatus[i_original]; + i_ri++; + } + + rikey = palloc0_object(ReplicaIdentityKey); + rikey->relid = relid; + rikey->data = ridata; + + if (TransactionIdIsValid(new_depended_xid)) + { + rientry = replica_identity_insert(replica_identity_table, rikey, + &found); + + /* + * Release the key built to search the entry, if the entry already + * exists. Otherwise, initialize the remote_xid. + */ + if (found) + { + elog(DEBUG1, "found conflicting replica identity change from %u", + rientry->remote_xid); + + free_replica_identity_key(rikey); + } + else + rientry->remote_xid = InvalidTransactionId; + } + else + { + rientry = replica_identity_lookup(replica_identity_table, rikey); + free_replica_identity_key(rikey); + } + + MemoryContextSwitchTo(oldctx); + + /* Return if no entry found */ + if (!rientry) + return; + + Assert(!found || TransactionIdIsValid(rientry->remote_xid)); + + *depends_on_xids = check_and_append_xid_dependency(*depends_on_xids, + &rientry->remote_xid, + new_depended_xid); + + /* + * Update the new depended xid into the entry if valid, the new xid could + * be invalid if the transaction will be applied by the leader itself + * which means all the changes will be committed before processing next + * transaction, so no need to be depended on. + */ + if (TransactionIdIsValid(new_depended_xid)) + rientry->remote_xid = new_depended_xid; + + /* + * Remove the entry if the transaction has been committed and no new + * dependency needs to be added. + */ + else if (!TransactionIdIsValid(rientry->remote_xid)) + { + free_replica_identity_key(rientry->keydata); + replica_identity_delete_item(replica_identity_table, rientry); + } +} + +/* + * Check for preceding transactions that involve insert, delete, or update + * operations on the specified table, and return them in 'depends_on_xids'. + */ +static void +find_all_dependencies_on_rel(LogicalRepRelId relid, TransactionId new_depended_xid, + List **depends_on_xids) +{ + replica_identity_iterator i; + ReplicaIdentityEntry *rientry; + + Assert(depends_on_xids); + + replica_identity_start_iterate(replica_identity_table, &i); + while ((rientry = replica_identity_iterate(replica_identity_table, &i)) != NULL) + { + Assert(TransactionIdIsValid(rientry->remote_xid)); + + if (rientry->keydata->relid != relid) + continue; + + /* Clean up the hash entry for committed transaction while on it */ + if (pa_transaction_committed(rientry->remote_xid)) + { + free_replica_identity_key(rientry->keydata); + replica_identity_delete_item(replica_identity_table, rientry); + + continue; + } + + *depends_on_xids = check_and_append_xid_dependency(*depends_on_xids, + &rientry->remote_xid, + new_depended_xid); + } +} + +/* + * Check for any preceding transactions that affect the given table and returns + * them in 'depends_on_xids'. + * + * Additionally, if new_depended_xid is valid, record it as a table-level + * dependency, allowing subsequent transactions that modify the same table to be + * dependent on it. + */ +static void +check_dependency_on_rel(LogicalRepRelId relid, TransactionId new_depended_xid, + List **depends_on_xids) +{ + LogicalRepRelMapEntry *relentry; + + Assert(depends_on_xids); + + find_all_dependencies_on_rel(relid, new_depended_xid, depends_on_xids); + + /* Search for existing entry */ + relentry = logicalrep_get_relentry(relid); + + /* + * The relentry has not been initialized yet, indicating no change has + * been applide yet. + */ + if (!relentry) + return; + + *depends_on_xids = check_and_append_xid_dependency(*depends_on_xids, + &relentry->last_depended_xid, + new_depended_xid); + + if (TransactionIdIsValid(new_depended_xid)) + relentry->last_depended_xid = new_depended_xid; +} + +/* + * Check dependencies related to the current change by determining if the + * modification impacts the same row or table as another ongoing transaction. If + * needed, instruct parallel apply workers to wait for these preceding + * transactions to complete. + * + * Simultaneously, if the current change will be dispatched to a parallel apply + * worker (indicated by a valid new_depended_xid and a non-NULL winfo), track + * the dependency for the current change to ensure that subsequent transactions + * address this dependency. + */ +static void +handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, + TransactionId new_depended_xid, + ParallelApplyWorkerInfo *winfo) +{ + LogicalRepRelId relid; + LogicalRepTupleData oldtup; + LogicalRepTupleData newtup; + LogicalRepRelation *rel; + List *depends_on_xids = NIL; + List *remote_relids; + bool has_oldtup = false; + bool cascade = false; + bool restart_seqs = false; + + /* + * Parse the consume data using a local copy instead of directly consuming + * the given remote change as the caller may also read the data from the + * remote message. + */ + StringInfoData change = *s; + + /* Compute dependency only for non-streaming transaction */ + if (in_streamed_transaction || (winfo && winfo->stream_txn)) + return; + + /* Only the leader checks dependencies and schedules the parallel apply */ + if (!am_leader_apply_worker()) + return; + + if (!ParallelApplyContext) + { + Assert(!replica_identity_table); + + /* + * Create a permanent memory context for dependency information that + * persists across transactions. Using a dedicated context makes memory + * consumption more visible and easier to track, especially when + * handling a large number of change entries for transactions being + * applied in parallel. + */ + ParallelApplyContext = AllocSetContextCreate(ApplyContext, + "ParallelApplyContext", + ALLOCSET_DEFAULT_SIZES); + + replica_identity_table = replica_identity_create(ParallelApplyContext, + REPLICA_IDENTITY_INITIAL_SIZE, + NULL); + } + + if (replica_identity_table->members >= REPLICA_IDENTITY_CLEANUP_THRESHOLD) + cleanup_committed_replica_identity_entries(); + + switch (action) + { + case LOGICAL_REP_MSG_INSERT: + relid = logicalrep_read_insert(&change, &newtup); + check_dependency_on_replica_identity(relid, &newtup, + new_depended_xid, + &depends_on_xids); + break; + + case LOGICAL_REP_MSG_UPDATE: + relid = logicalrep_read_update(&change, &has_oldtup, &oldtup, + &newtup); + + if (has_oldtup) + check_dependency_on_replica_identity(relid, &oldtup, + new_depended_xid, + &depends_on_xids); + + check_dependency_on_replica_identity(relid, &newtup, + new_depended_xid, + &depends_on_xids); + break; + + case LOGICAL_REP_MSG_DELETE: + relid = logicalrep_read_delete(&change, &oldtup); + check_dependency_on_replica_identity(relid, &oldtup, + new_depended_xid, + &depends_on_xids); + break; + + case LOGICAL_REP_MSG_TRUNCATE: + remote_relids = logicalrep_read_truncate(&change, &cascade, + &restart_seqs); + + /* + * Truncate affects all rows in a table, so the current + * transaction should wait for all preceding transactions that + * modified the same table. + */ + foreach_int(truncated_relid, remote_relids) + check_dependency_on_rel(truncated_relid, new_depended_xid, + &depends_on_xids); + + break; + + case LOGICAL_REP_MSG_RELATION: + rel = logicalrep_read_rel(&change); + + /* + * The replica identity key could be changed, making existing + * entries in the replica identity invalid. In this case, parallel + * apply is not allowed on this specific table until all running + * transactions that modified it have finished. + */ + check_dependency_on_rel(rel->remoteid, new_depended_xid, + &depends_on_xids); + break; + + case LOGICAL_REP_MSG_TYPE: + case LOGICAL_REP_MSG_MESSAGE: + + /* + * Type updates accompany relation updates, so dependencies have + * already been checked during relation updates. Logical messages + * do not conflict with any changes, so they can be ignored. + */ + break; + + default: + Assert(false); + break; + } + + /* Return early if the current change has no dependencies. */ + if (!depends_on_xids) + return; + + /* + * If the leader applies the transaction itself, start waiting for + * transactions that the current change depends on to finish. Otherwise, + * instruct the parallel apply worker to wait for them. + */ + if (winfo == NULL) + { + Assert(!TransactionIdIsValid(new_depended_xid)); + + foreach_xid(xid, depends_on_xids) + pa_wait_for_depended_transaction(xid); + } + else + { + (void) send_internal_dependencies(winfo, depends_on_xids); + } +} + /* * Form the origin name for the subscription. * @@ -792,7 +1417,10 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) /* not in streaming mode */ if (apply_action == TRANS_LEADER_APPLY) + { + handle_dependency_on_change(action, s, InvalidTransactionId, winfo); return false; + } Assert(TransactionIdIsValid(stream_xid)); @@ -1234,6 +1862,41 @@ apply_handle_begin(StringInfo s) pgstat_report_activity(STATE_RUNNING, NULL); } +/* + * Send an INTERNAL_DEPENDENCY message to a parallel apply worker. + * + * Returns false if we switched to the serialize mode to send the message, + * true otherwise. + */ +static bool +send_internal_dependencies(ParallelApplyWorkerInfo *winfo, List *depends_on_xids) +{ + StringInfoData dependencies; + + initStringInfo(&dependencies); + + pq_sendbyte(&dependencies, PARALLEL_APPLY_INTERNAL_MESSAGE); + pq_sendbyte(&dependencies, WORKER_INTERNAL_MSG_DEPENDENCY); + pq_sendint32(&dependencies, list_length(depends_on_xids)); + + foreach_xid(xid, depends_on_xids) + pq_sendint32(&dependencies, xid); + + if (!winfo->serialize_changes) + { + if (pa_send_data(winfo, dependencies.len, dependencies.data)) + return true; + + pa_switch_to_partial_serialize(winfo, true); + } + + /* Skip writing the first internal message flag */ + dependencies.cursor++; + stream_write_change(PARALLEL_APPLY_INTERNAL_MESSAGE, &dependencies); + + return false; +} + /* * Handle COMMIT message. * @@ -1761,7 +2424,7 @@ apply_handle_stream_start(StringInfo s) /* Try to allocate a worker for the streaming transaction. */ if (first_segment) - pa_allocate_worker(stream_xid); + pa_allocate_worker(stream_xid, true); apply_action = get_transaction_apply_action(stream_xid, &winfo); diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index efe0f9d6031..77b34998e63 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -37,6 +37,12 @@ typedef struct LogicalRepRelMapEntry /* Sync state. */ char state; XLogRecPtr statelsn; + + /* + * The last remote transaction that modified the relation's schema or + * truncated the relation. + */ + TransactionId last_depended_xid; } LogicalRepRelMapEntry; extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); @@ -50,5 +56,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode); extern bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap); extern Oid GetRelationIdentityOrPK(Relation rel); +extern LogicalRepRelMapEntry *logicalrep_get_relentry(LogicalRepRelId remoteid); #endif /* LOGICALRELATION_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 1e41430104e..9756b8461c5 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -239,6 +239,8 @@ typedef struct ParallelApplyWorkerInfo */ bool in_use; + bool stream_txn; + ParallelApplyWorkerShared *shared; } ParallelApplyWorkerInfo; @@ -337,8 +339,10 @@ extern void apply_error_callback(void *arg); extern void set_apply_error_context_origin(char *originname); /* Parallel apply worker setup and interactions */ -extern void pa_allocate_worker(TransactionId xid); +extern void pa_allocate_worker(TransactionId xid, bool stream_txn); extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid); +extern XLogRecPtr pa_get_last_commit_end(TransactionId xid, bool delete_entry, + bool *skipped_write); extern void pa_detach_all_error_mq(void); extern void apply_handle_internal_message(StringInfo s); @@ -369,6 +373,7 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern bool pa_transaction_committed(TransactionId xid); extern void pa_wait_for_depended_transaction(TransactionId xid); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 5047debff7a..56c392c3744 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2652,6 +2652,8 @@ ReplOriginXactState ReplaceVarsFromTargetList_context ReplaceVarsNoMatchOption ReplaceWrapOption +ReplicaIdentityEntry +ReplicaIdentityKey ReplicaIdentityStmt ReplicationKind ReplicationSlot @@ -2663,6 +2665,7 @@ ReplicationSlotPersistentData ReplicationState ReplicationStateCtl ReplicationStateOnDisk +replica_identity_hash ResTarget ReservoirState ReservoirStateData -- 2.43.7