From d2dec08b523e5f1e4dd1d3407bfca143056cdaa4 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Thu, 11 Dec 2025 22:21:47 +0900 Subject: [PATCH v5 8/8] Support dependency tracking via local unique indexes Currently, logical replication's parallel apply mechanism tracks dependencies primarily based on the REPLICA IDENTITY defined on the publisher table. However, local subscriber tables might have additional unique indexes that could effectively serve as dependency keys, even if they don't correspond to the publisher's REPLICA IDENTITY. Failing to track these additional unique keys can lead to incorrect data and/or deadlocks during parallel application. This patch extends the parallel apply's dependency tracking to consider local unique indexes on the subscriber table. This is achieved by extending the existing Replica Identity hash table to also store dependency information based on these local unique indexes. The LogicalRepRelMapEntry structure is extended to store details about these local unique indexes. This information is collected and cached when dependency checking is first performed for a remote transaction on a given relation. This collection process must be performed inside a transaction to access system catalog information. 
--- src/backend/replication/logical/relation.c | 132 +++++++++ src/backend/replication/logical/worker.c | 275 ++++++++++++++---- src/backend/storage/lmgr/deadlock.c | 1 - src/include/replication/logicalrelation.h | 10 + src/test/subscription/t/050_parallel_apply.pl | 43 +++ src/tools/pgindent/typedefs.list | 2 + 6 files changed, 406 insertions(+), 57 deletions(-) diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 9991bfe76cc..5601696d338 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -125,6 +125,21 @@ logicalrep_relmap_init(void) (Datum) 0); } +/* + * Release local index list + */ +static void +free_local_unique_indexes(LogicalRepRelMapEntry *entry) +{ + Assert(am_leader_apply_worker()); + + foreach_ptr(LogicalRepSubscriberIdx, idxinfo, entry->local_unique_indexes) + bms_free(idxinfo->indexkeys); + + list_free(entry->local_unique_indexes); + entry->local_unique_indexes = NIL; +} + /* * Free the entry of a relation map cache. */ @@ -152,6 +167,9 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) if (entry->attrmap) free_attrmap(entry->attrmap); + + if (entry->local_unique_indexes != NIL) + free_local_unique_indexes(entry); } /* @@ -352,6 +370,107 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) } } +/* + * Collect all local unique indexes that can be used for dependency tracking. 
+ */ +static void +collect_local_indexes(LogicalRepRelMapEntry *entry) +{ + List *idxlist; + + if (entry->local_unique_indexes != NIL) + free_local_unique_indexes(entry); + + entry->local_unique_indexes_collected = true; + + idxlist = RelationGetIndexList(entry->localrel); + + /* Quick exit if there are no indexes */ + if (idxlist == NIL) + return; + + /* Iterate indexes to list all usable indexes */ + foreach_oid(idxoid, idxlist) + { + Relation idxrel; + int indnkeys; + AttrMap *attrmap; + Bitmapset *indexkeys = NULL; + bool suitable = true; + + idxrel = index_open(idxoid, AccessShareLock); + + /* + * Check whether the index can be used for the dependency tracking. + * + * For simplification, the same condition as REPLICA IDENTITY FULL, + * plus it must be a unique index. + */ + if (!(idxrel->rd_index->indisunique && + IsIndexUsableForReplicaIdentityFull(idxrel, entry->attrmap))) + { + index_close(idxrel, AccessShareLock); + continue; + } + + indnkeys = idxrel->rd_index->indnkeyatts; + attrmap = entry->attrmap; + + Assert(indnkeys); + + /* Seek each attributes and add to a Bitmap */ + for (int i = 0; i < indnkeys; i++) + { + AttrNumber localcol = idxrel->rd_index->indkey.values[i]; + AttrNumber remotecol; + + /* Skip computed column */ + if (!AttributeNumberIsValid(localcol)) + continue; + + remotecol = attrmap->attnums[AttrNumberGetAttrOffset(localcol)]; + + /* + * Skip if the column does not exist on publisher node. In this + * case the replicated tuples always have NULL or default value. + */ + if (remotecol < 0) + { + suitable = false; + break; + } + + /* Checks are passed, remember the attribute */ + indexkeys = bms_add_member(indexkeys, remotecol); + } + + index_close(idxrel, AccessShareLock); + + /* + * One of a column does not exist on publisher side, skip using index. 
+ */ + if (!suitable) + continue; + + /* This index is usable, store on memory */ + if (indexkeys) + { + MemoryContext oldctx; + LogicalRepSubscriberIdx *idxinfo; + + oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); + idxinfo = palloc(sizeof(LogicalRepSubscriberIdx)); + idxinfo->indexoid = idxoid; + idxinfo->indexkeys = bms_copy(indexkeys); + entry->local_unique_indexes = + lappend(entry->local_unique_indexes, idxinfo); + MemoryContextSwitchTo(oldctx); + } + } + + list_free(idxlist); +} + /* * Open the local relation associated with the remote one. * @@ -499,6 +618,13 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) entry->localindexoid = FindLogicalRepLocalIndex(entry->localrel, remoterel, entry->attrmap); + /* + * Leader must also collect all local unique indexes for dependency + * tracking. + */ + if (am_leader_apply_worker()) + collect_local_indexes(entry); + entry->localrelvalid = true; } @@ -771,6 +897,12 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, entry->localindexoid = FindLogicalRepLocalIndex(partrel, remoterel, entry->attrmap); + /* + * TODO: Parallel apply does not support the parallel apply for now. + * Just mark local indexes are collected. + */ + entry->local_unique_indexes_collected = true; + entry->localrelvalid = true; return entry; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 9776edd2310..87ec0fdbd0c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -548,9 +548,19 @@ typedef struct ApplySubXactData static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; +/* + * Type of key used for dependency tracking. 
+ */ +typedef enum LogicalRepKeyKind +{ + LOGICALREP_KEY_REPLICA_IDENTITY, + LOGICALREP_KEY_LOCAL_UNIQUE +} LogicalRepKeyKind; + typedef struct ReplicaIdentityKey { Oid relid; + LogicalRepKeyKind kind; LogicalRepTupleData *data; } ReplicaIdentityKey; @@ -710,7 +720,8 @@ static bool hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b) { if (a->relid != b->relid || - a->data->ncols != b->data->ncols) + a->data->ncols != b->data->ncols || + a->kind != b->kind) return false; for (int i = 0; i < a->data->ncols; i++) @@ -718,6 +729,9 @@ hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b) if (a->data->colstatus[i] != b->data->colstatus[i]) return false; + if (a->data->colstatus[i] == LOGICALREP_COLUMN_NULL) + continue; + if (a->data->colvalues[i].len != b->data->colvalues[i].len) return false; @@ -839,6 +853,93 @@ check_and_append_xid_dependency(List *depends_on_xids, return lappend_xid(depends_on_xids, *depends_on_xid); } +/* + * Common function for registering dependency on a key. Used by both + * check_dependency_on_replica_identity and check_dependency_on_local_key. + */ +static void +register_dependency_with_key(ReplicaIdentityKey *key, TransactionId new_depended_xid, + List **depends_on_xids) +{ + ReplicaIdentityEntry *rientry; + bool found = false; + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(ApplyContext); + + if (TransactionIdIsValid(new_depended_xid)) + { + rientry = replica_identity_insert(replica_identity_table, key, + &found); + + /* + * Release the key built to search the entry, if the entry already + * exists. Otherwise, initialize the remote_xid. + */ + if (found) + { + elog(DEBUG1, + key->kind == LOGICALREP_KEY_REPLICA_IDENTITY ? 
+ "found conflicting replica identity change from %u" : + "found conflicting local unique change from %u", + rientry->remote_xid); + + free_replica_identity_key(key); + } + else + rientry->remote_xid = InvalidTransactionId; + } + else + { + rientry = replica_identity_lookup(replica_identity_table, key); + free_replica_identity_key(key); + } + + MemoryContextSwitchTo(oldctx); + + /* Return if no entry found */ + if (!rientry) + return; + + Assert(!found || TransactionIdIsValid(rientry->remote_xid)); + + *depends_on_xids = check_and_append_xid_dependency(*depends_on_xids, + &rientry->remote_xid, + new_depended_xid); + + /* + * Remove the entry if it is registered for the streamed transactions. We + * do not have to register an entry for them; The leader worker always + * waits until the parallel worker finishes handling streamed transactions, + * thus no need to consider the possibility that upcoming parallel workers + * would go ahead. + */ + if (TransactionIdIsValid(stream_xid) && !found) + { + free_replica_identity_key(rientry->keydata); + replica_identity_delete_item(replica_identity_table, rientry); + } + + /* + * Update the new depended xid into the entry if valid, the new xid could + * be invalid if the transaction will be applied by the leader itself + * which means all the changes will be committed before processing next + * transaction, so no need to be depended on. + */ + else if (TransactionIdIsValid(new_depended_xid)) + rientry->remote_xid = new_depended_xid; + + /* + * Remove the entry if the transaction has been committed and no new + * dependency needs to be added. + */ + else if (!TransactionIdIsValid(rientry->remote_xid)) + { + free_replica_identity_key(rientry->keydata); + replica_identity_delete_item(replica_identity_table, rientry); + } +} + /* * Check for dependencies on preceding transactions that modify the same key. 
* Returns the dependent transactions in 'depends_on_xids' and records the @@ -853,10 +954,8 @@ check_dependency_on_replica_identity(Oid relid, LogicalRepRelMapEntry *relentry; LogicalRepTupleData *ridata; ReplicaIdentityKey *rikey; - ReplicaIdentityEntry *rientry; MemoryContext oldctx; int n_ri; - bool found = false; Assert(depends_on_xids); @@ -922,75 +1021,125 @@ check_dependency_on_replica_identity(Oid relid, rikey = palloc0_object(ReplicaIdentityKey); rikey->relid = relid; + rikey->kind = LOGICALREP_KEY_REPLICA_IDENTITY; rikey->data = ridata; - if (TransactionIdIsValid(new_depended_xid)) + MemoryContextSwitchTo(oldctx); + + register_dependency_with_key(rikey, new_depended_xid, + depends_on_xids); +} + +/* + * Mostly same as check_dependency_on_replica_identity() but for local unique + * indexes. + */ +static void +check_dependency_on_local_key(Oid relid, + LogicalRepTupleData *original_data, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + LogicalRepRelMapEntry *relentry; + LogicalRepTupleData *ridata; + ReplicaIdentityKey *rikey; + MemoryContext oldctx; + + Assert(depends_on_xids); + + /* Search for existing entry */ + relentry = logicalrep_get_relentry(relid); + + Assert(relentry); + + /* + * Gather information for local indexes if not yet. We require to be in a + * transaction state because system catalogs are read. + */ + if (!relentry->local_unique_indexes_collected) { - rientry = replica_identity_insert(replica_identity_table, rikey, - &found); + bool needs_start = !IsTransactionOrTransactionBlock(); + + if (needs_start) + StartTransactionCommand(); + + logicalrep_rel_close(logicalrep_rel_open(relid, AccessShareLock), + AccessShareLock); /* - * Release the key built to search the entry, if the entry already - * exists. Otherwise, initialize the remote_xid. + * Close the transaction if we start here. 
We must not abort because it + * would release all session-level locks, such as the stream lock, and + * break the deadlock detection mechanism between LA and PA. The + * outcome is the same regardless of the end status, since the + * transaction did not modify any tuples. */ - if (found) - { - elog(DEBUG1, "found conflicting replica identity change from %u", - rientry->remote_xid); + if (needs_start) + CommitTransactionCommand(); - free_replica_identity_key(rikey); - } - else - rientry->remote_xid = InvalidTransactionId; + Assert(relentry->local_unique_indexes_collected); } - else + + foreach_ptr(LogicalRepSubscriberIdx, idxinfo, relentry->local_unique_indexes) { - rientry = replica_identity_lookup(replica_identity_table, rikey); - free_replica_identity_key(rikey); - } + int columns = bms_num_members(idxinfo->indexkeys); + bool suitable = true; - MemoryContextSwitchTo(oldctx); + Assert(columns); - /* Return if no entry found */ - if (!rientry) - return; + for (int i = 0; i < original_data->ncols; i++) + { + if (!bms_is_member(i, idxinfo->indexkeys)) + continue; - Assert(!found || TransactionIdIsValid(rientry->remote_xid)); + /* + * Skip if the column is not changed. + * + * XXX: NULL is allowed. + */ + if (original_data->colstatus[i] == LOGICALREP_COLUMN_UNCHANGED) + { + suitable = false; + break; + } + } - *depends_on_xids = check_and_append_xid_dependency(*depends_on_xids, - &rientry->remote_xid, - new_depended_xid); + if (!suitable) + continue; - /* - * Remove the entry if it is registered for the streamed transactions. We - * do not have to register an entry for them; The leader worker always - * waits until the parallel worker finishes handling streamed transactions, - * thus no need to consider the possiblity that upcoming parallel workers - * would go ahead. 
- */ - if (TransactionIdIsValid(stream_xid) && !found) - { - free_replica_identity_key(rientry->keydata); - replica_identity_delete_item(replica_identity_table, rientry); - } + oldctx = MemoryContextSwitchTo(ApplyContext); - /* - * Update the new depended xid into the entry if valid, the new xid could - * be invalid if the transaction will be applied by the leader itself - * which means all the changes will be committed before processing next - * transaction, so no need to be depended on. - */ - else if (TransactionIdIsValid(new_depended_xid)) - rientry->remote_xid = new_depended_xid; + /* Allocate space for replica identity values */ + ridata = palloc0_object(LogicalRepTupleData); + ridata->colvalues = palloc0_array(StringInfoData, columns); + ridata->colstatus = palloc0_array(char, columns); + ridata->ncols = columns; - /* - * Remove the entry if the transaction has been committed and no new - * dependency needs to be added. - */ - else if (!TransactionIdIsValid(rientry->remote_xid)) - { - free_replica_identity_key(rientry->keydata); - replica_identity_delete_item(replica_identity_table, rientry); + for (int i_original = 0, i_key = 0; i_original < original_data->ncols; i_original++) + { + if (!bms_is_member(i_original, idxinfo->indexkeys)) + continue; + + if (original_data->colstatus[i_original] != LOGICALREP_COLUMN_NULL) + { + StringInfo original_colvalue = &original_data->colvalues[i_original]; + + initStringInfoExt(&ridata->colvalues[i_key], original_colvalue->len + 1); + appendStringInfoString(&ridata->colvalues[i_key], original_colvalue->data); + } + + ridata->colstatus[i_key] = original_data->colstatus[i_original]; + i_key++; + } + + rikey = palloc0_object(ReplicaIdentityKey); + rikey->relid = relid; + rikey->kind = LOGICALREP_KEY_LOCAL_UNIQUE; + rikey->data = ridata; + + MemoryContextSwitchTo(oldctx); + + register_dependency_with_key(rikey, new_depended_xid, + depends_on_xids); } } @@ -1120,6 +1269,9 @@ handle_dependency_on_change(LogicalRepMsgType action, 
StringInfo s, check_dependency_on_replica_identity(relid, &newtup, new_depended_xid, &depends_on_xids); + check_dependency_on_local_key(relid, &newtup, + new_depended_xid, + &depends_on_xids); break; case LOGICAL_REP_MSG_UPDATE: @@ -1127,13 +1279,21 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, &newtup); if (has_oldtup) + { check_dependency_on_replica_identity(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_dependency_on_local_key(relid, &oldtup, + new_depended_xid, + &depends_on_xids); + } check_dependency_on_replica_identity(relid, &newtup, new_depended_xid, &depends_on_xids); + check_dependency_on_local_key(relid, &newtup, + new_depended_xid, + &depends_on_xids); break; case LOGICAL_REP_MSG_DELETE: @@ -1141,6 +1301,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_dependency_on_replica_identity(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_dependency_on_local_key(relid, &oldtup, + new_depended_xid, + &depends_on_xids); break; case LOGICAL_REP_MSG_TRUNCATE: diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c index c4bfaaa67ac..ca7dee52b32 100644 --- a/src/backend/storage/lmgr/deadlock.c +++ b/src/backend/storage/lmgr/deadlock.c @@ -33,7 +33,6 @@ #include "storage/procnumber.h" #include "utils/memutils.h" - /* * One edge in the waits-for graph. 
* diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 34a7069e9e5..32152ef3833 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -16,6 +16,12 @@ #include "catalog/index.h" #include "replication/logicalproto.h" +typedef struct LogicalRepSubscriberIdx +{ + Oid indexoid; /* OID of the local key */ + Bitmapset *indexkeys; /* Bitmap of key columns *on remote* */ +} LogicalRepSubscriberIdx; + typedef struct LogicalRepRelMapEntry { LogicalRepRelation remoterel; /* key is remoterel.remoteid */ @@ -39,6 +45,10 @@ typedef struct LogicalRepRelMapEntry XLogRecPtr statelsn; TransactionId last_depended_xid; + + /* Local unique indexes. Used for dependency tracking */ + List *local_unique_indexes; + bool local_unique_indexes_collected; } LogicalRepRelMapEntry; extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl index 20e8a7b91a7..e489a4bdc1e 100644 --- a/src/test/subscription/t/050_parallel_apply.pl +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -231,4 +231,47 @@ $node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset $h->query_safe("COMMIT;"); +# Ensure subscriber-local indexes are also used for the dependency tracking + +# Truncate the data for upcoming tests +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE regress_tab;"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Define an unique index on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE INDEX ON regress_tab (value);"); + +# Attach an injection_point. Parallel workers would wait before the commit +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +# Insert a tuple on publisher. 
Parallel worker would wait at the injection +# point +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (1, 'would conflict');"); + +# Wait until the parallel worker enters the injection point. +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +$offset = -s $node_subscriber->logfile; + +# Insert tuples on publisher again. This transaction is would wait because all +# parallel workers wait till the previously launched worker commits. +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (2, 'would not conflict');"); + +# Verify the parallel worker waits for the transaction +$str = $node_subscriber->wait_for_log(qr/wait for depended xid ([1-9][0-9]+)/, $offset); +$xid = $str =~ /wait for depended xid ([1-9][0-9]+)/; + +# Insert a conflicting tuple on publisher. Leader worker would detect the conflict +# and wait for the transaction to commit. +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (3, 'would conflict');"); + +# Verify the parallel worker waits for the same transaction +$node_subscriber->wait_for_log(qr/wait for depended xid $xid/, $offset); + done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index d9ec32de3ea..ecd4845f389 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1636,6 +1636,7 @@ LogicalRepBeginData LogicalRepCommitData LogicalRepCommitPreparedTxnData LogicalRepCtxStruct +LogicalRepKeyKind LogicalRepMsgType LogicalRepPartMapEntry LogicalRepPreparedTxnData @@ -1645,6 +1646,7 @@ LogicalRepRelation LogicalRepRollbackPreparedTxnData LogicalRepSequenceInfo LogicalRepStreamAbortData +LogicalRepSubscriberIdx LogicalRepTupleData LogicalRepTyp LogicalRepWorker -- 2.47.3