From c058efd2e549cbb8377f144bb48ff1dbe9ae3bfb Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Tue, 21 Apr 2026 16:26:33 +0800 Subject: [PATCH v15 08/10] Support dependency tracking via local unique indexes Currently, logical replication's parallel apply mechanism tracks dependencies primarily based on the REPLICA IDENTITY defined on the publisher table. However, local subscriber tables might have additional unique indexes that could effectively serve as dependency keys, even if they don't correspond to the publisher's REPLICA IDENTITY. Failing to track these additional unique keys can lead to incorrect data and/or deadlocks during parallel apply. This patch extends the parallel apply's dependency tracking to consider local unique indexes on the subscriber table. This is achieved by extending the existing Replica Identity hash table to also store dependency information based on these local unique indexes. The LogicalRepRelMapEntry structure is extended to store details about these local unique indexes. This information is collected and cached when dependency checking is first performed for a remote transaction on a given relation. The collection must run inside a transaction because it reads the system catalogs. 
--- src/backend/replication/logical/relation.c | 161 +++++++++++- src/backend/replication/logical/worker.c | 246 ++++++++++++++---- src/backend/storage/lmgr/deadlock.c | 1 - src/include/replication/logicalrelation.h | 14 + src/test/subscription/t/050_parallel_apply.pl | 57 ++++ src/tools/pgindent/typedefs.list | 2 + 6 files changed, 434 insertions(+), 47 deletions(-) diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index eeb85f8cc5d..6665f5c0cc9 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -128,6 +128,21 @@ logicalrep_relmap_init(void) (Datum) 0); } +/* + * Release local index list + */ +static void +free_local_unique_indexes(LogicalRepRelMapEntry *entry) +{ + Assert(am_leader_apply_worker()); + + foreach_ptr(LogicalRepSubscriberIdx, idxinfo, entry->local_unique_indexes) + bms_free(idxinfo->indexkeys); + + list_free_deep(entry->local_unique_indexes); + entry->local_unique_indexes = NIL; +} + /* * Free the entry of a relation map cache. */ @@ -155,6 +170,9 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) if (entry->attrmap) free_attrmap(entry->attrmap); + + if (entry->local_unique_indexes != NIL) + free_local_unique_indexes(entry); } /* @@ -361,6 +379,126 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) } } +/* + * Collect all local unique indexes that can be used for dependency tracking. 
+ */ +static void +collect_local_indexes(LogicalRepRelMapEntry *entry) +{ + List *idxlist; + + if (entry->local_unique_indexes != NIL) + free_local_unique_indexes(entry); + + entry->local_unique_indexes_collected = true; + + idxlist = RelationGetIndexList(entry->localrel); + + /* Quick exit if there are no indexes */ + if (idxlist == NIL) + return; + + /* Iterate over the indexes to list all usable ones */ + foreach_oid(idxoid, idxlist) + { + Relation idxrel; + int indnkeys; + AttrMap *attrmap; + Bitmapset *indexkeys = NULL; + bool suitable = true; + + idxrel = index_open(idxoid, AccessShareLock); + + /* + * Check whether the index can be used for the dependency tracking. + * + * For simplicity, use the same conditions as for REPLICA IDENTITY + * FULL, plus the index must be unique. + */ + if (!(idxrel->rd_index->indisunique && + IsIndexUsableForReplicaIdentityFull(idxrel, entry->attrmap))) + { + index_close(idxrel, AccessShareLock); + continue; + } + + indnkeys = idxrel->rd_index->indnkeyatts; + attrmap = entry->attrmap; + + Assert(indnkeys); + + /* Add the remote column of each key attribute to the bitmap */ + for (int i = 0; i < indnkeys; i++) + { + AttrNumber localcol = idxrel->rd_index->indkey.values[i]; + AttrNumber remotecol; + + /* + * XXX: Mark the relation as parallel-restricted if it has + * expression indexes because we cannot compute the hash value for + * the dependency tracking. For safety, transactions that modify + * such tables wait to be applied until the last dispatched + * transaction is committed. + */ + if (!AttributeNumberIsValid(localcol)) + { + entry->parallel_safe = LOGICALREP_PARALLEL_RESTRICTED; + suitable = false; + break; + } + + remotecol = attrmap->attnums[AttrNumberGetAttrOffset(localcol)]; + + /* + * Skip if the column does not exist on the publisher node. In this + * case the replicated tuples always have NULL or the default value. 
+ */ + if (remotecol < 0) + { + suitable = false; + break; + } + + /* All checks passed, remember the attribute */ + indexkeys = bms_add_member(indexkeys, remotecol); + } + + index_close(idxrel, AccessShareLock); + + /* + * Skip using the index if it is not suitable. This can happen if 1) + * one of the columns does not exist on the publisher side, or 2) + * there is an expression column. + */ + if (!suitable) + { + if (indexkeys) + bms_free(indexkeys); + + continue; + } + + /* This index is usable; store it in LogicalRepRelMapContext */ + if (indexkeys) + { + MemoryContext oldctx; + LogicalRepSubscriberIdx *idxinfo; + + oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); + idxinfo = palloc(sizeof(LogicalRepSubscriberIdx)); + idxinfo->indexoid = idxoid; + idxinfo->indexkeys = bms_copy(indexkeys); + entry->local_unique_indexes = + lappend(entry->local_unique_indexes, idxinfo); + + pfree(indexkeys); + MemoryContextSwitchTo(oldctx); + } + } + + list_free(idxlist); +} + /* * Check all local triggers for the relation to see the parallelizability. * @@ -370,7 +508,16 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) static void check_defined_triggers(LogicalRepRelMapEntry *entry) { - TriggerDesc *trigdesc = entry->localrel->trigdesc; + TriggerDesc *trigdesc; + + /* + * Skip if the parallelizability has already been decided. This happens if + * collect_local_indexes() found an expression index on the relation. + */ + if (entry->parallel_safe != LOGICALREP_PARALLEL_UNKNOWN) + return; + + trigdesc = entry->localrel->trigdesc; /* Quick exit if triffer is not defined */ if (trigdesc == NULL) @@ -411,7 +558,7 @@ check_defined_triggers(LogicalRepRelMapEntry *entry) * If the key is given, the corresponding entry is first searched in the hash * table and processed as in the above case. At the end, logical replication is * closed. 
- */ + */ void logicalrep_rel_load(LogicalRepRelMapEntry *entry, LogicalRepRelId remoteid, LOCKMODE lockmode) @@ -565,7 +712,11 @@ logicalrep_rel_load(LogicalRepRelMapEntry *entry, LogicalRepRelId remoteid, * tracking. */ if (am_leader_apply_worker()) + { + entry->parallel_safe = LOGICALREP_PARALLEL_UNKNOWN; + collect_local_indexes(entry); check_defined_triggers(entry); + } entry->localrelvalid = true; } @@ -867,6 +1018,12 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, entry->localindexoid = FindLogicalRepLocalIndex(partrel, remoterel, entry->attrmap); + /* + * TODO: Local unique indexes are not collected for partitions yet. Just + * mark them as collected for now. + */ + entry->local_unique_indexes_collected = true; + entry->localrelvalid = true; return entry; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 8d706e23838..ca6a83d2866 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -578,10 +578,20 @@ typedef struct ApplySubXactData static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; +/* + * Type of key used for dependency tracking. 
+ */ +typedef enum LogicalRepKeyKind +{ + LOGICALREP_KEY_REPLICA_IDENTITY, + LOGICALREP_KEY_LOCAL_UNIQUE +} LogicalRepKeyKind; + /* Hash table key for replica_identity_table */ typedef struct ReplicaIdentityKey { Oid relid; + LogicalRepKeyKind kind; LogicalRepTupleData *data; } ReplicaIdentityKey; @@ -757,7 +767,8 @@ static bool hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b) { if (a->relid != b->relid || - a->data->ncols != b->data->ncols) + a->data->ncols != b->data->ncols || + a->kind != b->kind) return false; for (int i = 0; i < a->data->ncols; i++) @@ -765,6 +776,9 @@ hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b) if (a->data->colstatus[i] != b->data->colstatus[i]) return false; + if (a->data->colstatus[i] == LOGICALREP_COLUMN_NULL) + continue; + if (a->data->colvalues[i].len != b->data->colvalues[i].len) return false; @@ -897,6 +911,80 @@ check_and_append_xid_dependency(List *depends_on_xids, return lappend_xid(depends_on_xids, *depends_on_xid); } +/* + * Common function for checking dependency by using the key. Used by both + * check_dependency_on_replica_identity and check_dependency_on_local_key. + */ +static void +check_dependency_by_key(ReplicaIdentityKey *key, TransactionId new_depended_xid, + List **depends_on_xids) +{ + ReplicaIdentityEntry *rientry; + bool found = false; + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(ApplyContext); + + if (TransactionIdIsValid(new_depended_xid)) + { + rientry = replica_identity_insert(replica_identity_table, key, + &found); + + /* + * Release the key built to search the entry, if the entry already + * exists. Otherwise, initialize the remote_xid. + */ + if (found) + { + elog(DEBUG1, + key->kind == LOGICALREP_KEY_REPLICA_IDENTITY ? 
+ "found conflicting replica identity change from %u" : + "found conflicting local unique change from %u", + rientry->remote_xid); + + free_replica_identity_key(key); + } + else + rientry->remote_xid = InvalidTransactionId; + } + else + { + rientry = replica_identity_lookup(replica_identity_table, key); + free_replica_identity_key(key); + } + + MemoryContextSwitchTo(oldctx); + + /* Return if no entry found */ + if (!rientry) + return; + + Assert(!found || TransactionIdIsValid(rientry->remote_xid)); + + *depends_on_xids = check_and_append_xid_dependency(*depends_on_xids, + &rientry->remote_xid, + new_depended_xid); + + /* + * Update the new depended xid into the entry if valid, the new xid could + * be invalid if the transaction will be applied by the leader itself + * which means all the changes will be committed before processing next + * transaction, so no need to be depended on. + */ + if (TransactionIdIsValid(new_depended_xid)) + rientry->remote_xid = new_depended_xid; + + /* + * Remove the entry if the transaction has been committed and no new + * dependency needs to be added. + */ + else if (!TransactionIdIsValid(rientry->remote_xid)) + { + free_replica_identity_key(rientry->keydata); + replica_identity_delete_item(replica_identity_table, rientry); + } +} + /* * Check for dependencies on preceding transactions that modify the same key. * Returns the dependent transactions in 'depends_on_xids'. 
@@ -914,10 +1002,8 @@ check_dependency_on_replica_identity(Oid relid, LogicalRepRelMapEntry *relentry; LogicalRepTupleData *ridata; ReplicaIdentityKey *rikey; - ReplicaIdentityEntry *rientry; MemoryContext oldctx; int n_ri; - bool found = false; Assert(depends_on_xids); @@ -983,62 +1069,122 @@ check_dependency_on_replica_identity(Oid relid, rikey = palloc0_object(ReplicaIdentityKey); rikey->relid = relid; + rikey->kind = LOGICALREP_KEY_REPLICA_IDENTITY; rikey->data = ridata; - if (TransactionIdIsValid(new_depended_xid)) + MemoryContextSwitchTo(oldctx); + + check_dependency_by_key(rikey, new_depended_xid, depends_on_xids); +} + +/* + * Mostly the same as check_dependency_on_replica_identity() but for local + * unique indexes. + */ +static void +check_dependency_on_local_key(Oid relid, + LogicalRepTupleData *original_data, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + LogicalRepRelMapEntry *relentry; + LogicalRepTupleData *ridata; + ReplicaIdentityKey *rikey; + MemoryContext oldctx; + + Assert(depends_on_xids); + + /* Search for existing entry */ + relentry = logicalrep_get_relentry(relid); + + Assert(relentry); + + /* + * Gather information about local indexes if not done yet. We need to be + * in a transaction because system catalogs are read. + */ + if (!relentry->local_unique_indexes_collected) { - rientry = replica_identity_insert(replica_identity_table, rikey, - &found); + bool needs_start = !IsTransactionOrTransactionBlock(); + + if (needs_start) + StartTransactionCommand(); + + logicalrep_rel_load(NULL, relid, AccessShareLock); /* - * Release the key built to search the entry, if the entry already - * exists. Otherwise, initialize the remote_xid. + * Close the transaction if we started it here. We must not abort because + * it would release all session-level locks, such as the stream lock, + * and break the deadlock detection mechanism between LA and PA. 
The + * outcome is the same regardless of the end status, since the + * transaction did not modify any tuples. */ - if (found) - { - elog(DEBUG1, "found conflicting replica identity change from %u", - rientry->remote_xid); + if (needs_start) + CommitTransactionCommand(); - free_replica_identity_key(rikey); - } - else - rientry->remote_xid = InvalidTransactionId; + Assert(relentry->local_unique_indexes_collected); } - else + + foreach_ptr(LogicalRepSubscriberIdx, idxinfo, relentry->local_unique_indexes) { - rientry = replica_identity_lookup(replica_identity_table, rikey); - free_replica_identity_key(rikey); - } + int columns = bms_num_members(idxinfo->indexkeys); + bool suitable = true; - MemoryContextSwitchTo(oldctx); + Assert(columns); - /* Return if no entry found */ - if (!rientry) - return; + for (int i = 0; i < original_data->ncols; i++) + { + if (!bms_is_member(i, idxinfo->indexkeys)) + continue; - Assert(!found || TransactionIdIsValid(rientry->remote_xid)); + /* + * Skip if the column is not changed. + * + * XXX: NULL is allowed. + */ + if (original_data->colstatus[i] == LOGICALREP_COLUMN_UNCHANGED) + { + suitable = false; + break; + } + } - *depends_on_xids = check_and_append_xid_dependency(*depends_on_xids, - &rientry->remote_xid, - new_depended_xid); + if (!suitable) + continue; - /* - * Update the new depended xid into the entry if valid, the new xid could - * be invalid if the transaction will be applied by the leader itself - * which means all the changes will be committed before processing next - * transaction, so no need to be depended on. - */ - if (TransactionIdIsValid(new_depended_xid)) - rientry->remote_xid = new_depended_xid; + oldctx = MemoryContextSwitchTo(ApplyContext); - /* - * Remove the entry if the transaction has been committed and no new - * dependency needs to be added. 
- */ - else if (!TransactionIdIsValid(rientry->remote_xid)) - { - free_replica_identity_key(rientry->keydata); - replica_identity_delete_item(replica_identity_table, rientry); + /* Allocate space for replica identity values */ + ridata = palloc0_object(LogicalRepTupleData); + ridata->colvalues = palloc0_array(StringInfoData, columns); + ridata->colstatus = palloc0_array(char, columns); + ridata->ncols = columns; + + for (int i_original = 0, i_key = 0; i_original < original_data->ncols; i_original++) + { + if (!bms_is_member(i_original, idxinfo->indexkeys)) + continue; + + if (original_data->colstatus[i_original] != LOGICALREP_COLUMN_NULL) + { + StringInfo original_colvalue = &original_data->colvalues[i_original]; + + initStringInfoExt(&ridata->colvalues[i_key], original_colvalue->len + 1); + appendStringInfoString(&ridata->colvalues[i_key], original_colvalue->data); + } + + ridata->colstatus[i_key] = original_data->colstatus[i_original]; + i_key++; + } + + rikey = palloc0_object(ReplicaIdentityKey); + rikey->relid = relid; + rikey->kind = LOGICALREP_KEY_LOCAL_UNIQUE; + rikey->data = ridata; + + MemoryContextSwitchTo(oldctx); + + check_dependency_by_key(rikey, new_depended_xid, depends_on_xids); } } @@ -1234,6 +1380,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_dependency_on_replica_identity(relid, &newtup, new_depended_xid, &depends_on_xids); + check_dependency_on_local_key(relid, &newtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1247,6 +1396,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_dependency_on_replica_identity(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_dependency_on_local_key(relid, &oldtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); } @@ -1254,6 +1406,9 @@ handle_dependency_on_change(LogicalRepMsgType 
action, StringInfo s, check_dependency_on_replica_identity(relid, &newtup, new_depended_xid, &depends_on_xids); + check_dependency_on_local_key(relid, &newtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1263,6 +1418,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_dependency_on_replica_identity(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_dependency_on_local_key(relid, &oldtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c index b8962d875b6..31bbfcf1971 100644 --- a/src/backend/storage/lmgr/deadlock.c +++ b/src/backend/storage/lmgr/deadlock.c @@ -33,7 +33,6 @@ #include "storage/procnumber.h" #include "utils/memutils.h" - /* * One edge in the waits-for graph. * diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 3e973b8ee0e..67ba704fe9a 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -16,6 +16,12 @@ #include "catalog/index.h" #include "replication/logicalproto.h" +typedef struct LogicalRepSubscriberIdx +{ + Oid indexoid; /* OID of the local key */ + Bitmapset *indexkeys; /* Bitmap of key columns *on remote* */ +} LogicalRepSubscriberIdx; + typedef struct LogicalRepRelMapEntry { LogicalRepRelation remoterel; /* key is remoterel.remoteid */ @@ -46,6 +52,10 @@ typedef struct LogicalRepRelMapEntry */ TransactionId last_depended_xid; + /* Local unique indexes. Used for dependency tracking */ + List *local_unique_indexes; + bool local_unique_indexes_collected; + /* * Whether the relation can be applied in parallel or not. It is * distinglish whether defined triggers are the immutable or not. 
@@ -57,6 +67,10 @@ typedef struct LogicalRepRelMapEntry * Note that we do not check the user-defined constraints here. PostgreSQL * has already assumed that CHECK constraints' conditions are immutable and * here follows the rule. + * + * XXX: Additionally, the relation is marked as restricted if it has + * expression indexes, because we cannot compute the hash value for the + * dependency tracking. */ char parallel_safe; } LogicalRepRelMapEntry; diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl index 92b4e5d6c70..3819d1a493c 100644 --- a/src/test/subscription/t/050_parallel_apply.pl +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -234,4 +234,61 @@ $node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset $h->query_safe("COMMIT;"); +# Ensure subscriber-local indexes are also used for the dependency tracking + +# Truncate the data for upcoming tests +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE regress_tab;"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Define a unique index on subscriber (XXX: UNIQUE is missing below; confirm) +$node_subscriber->safe_psql('postgres', + "CREATE INDEX ON regress_tab (value);"); + +# Attach an injection_point. Parallel workers would wait before the commit +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +# Insert a tuple on publisher. Parallel worker would wait at the injection +# point +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (1, 'would conflict');"); + +# Wait until the parallel worker enters the injection point. +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +$offset = -s $node_subscriber->logfile; + +# Insert a tuple on publisher again. This transaction would wait because all +# parallel workers wait till the previously launched worker commits. 
+$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (2, 'would not conflict');"); + +# Verify the parallel worker waits for the transaction +$str = $node_subscriber->wait_for_log(qr/wait for depended xid ([1-9][0-9]+)/, $offset); +$xid = $str =~ /wait for depended xid ([1-9][0-9]+)/; + +# Insert a conflicting tuple on publisher. Leader worker would detect the conflict +# and wait for the transaction to commit. +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (3, 'would conflict');"); + +# Verify the parallel worker waits for the same transaction +$node_subscriber->wait_for_log(qr/wait for depended xid $xid/, $offset); + +# Wakeup the parallel worker +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-commit'); + SELECT injection_points_wakeup('parallel-worker-before-commit'); +]); + +# Verify the streamed transaction can be applied +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); + +# Cleanup +$node_subscriber->safe_psql('postgres', "DROP INDEX regress_tab_value_idx;"); +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE regress_tab;"); +$node_publisher->wait_for_catchup('regress_sub'); + done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ab3c120180b..8aec95dcdba 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1690,6 +1690,7 @@ LogicalRepBeginData LogicalRepCommitData LogicalRepCommitPreparedTxnData LogicalRepCtxStruct +LogicalRepKeyKind LogicalRepMsgType LogicalRepPartMapEntry LogicalRepPreparedTxnData @@ -1699,6 +1700,7 @@ LogicalRepRelation LogicalRepRollbackPreparedTxnData LogicalRepSequenceInfo LogicalRepStreamAbortData +LogicalRepSubscriberIdx LogicalRepTupleData LogicalRepTyp LogicalRepWorker -- 2.53.0.windows.2