From b156f7b88993697e0f1ce3c1a5109a3b9a8e4c29 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Fri, 23 Jan 2026 15:08:15 +0900 Subject: [PATCH v14 9/9] Support dependency tracking via foreign keys Logical replication's parallel apply mechanism did not account for foreign-key dependencies. This can cause a failure because the reference column may be committed before the referenced column. This patch extends the parallel apply's dependency tracking to account for foreign keys in the subscriber table. One assumption here is that referenced columns can be registered to the identity hash table. The leader apply worker checks to determine whether values in referencing columns have already been registered or not, and regards that there is a dependency if found. --- src/backend/replication/logical/relation.c | 185 ++++++++++++++++++ src/backend/replication/logical/worker.c | 142 ++++++++++++++ src/include/replication/logicalrelation.h | 11 ++ src/test/subscription/t/050_parallel_apply.pl | 50 ++++- 4 files changed, 385 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 6665f5c0cc9..83602dcd4cf 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -21,6 +21,7 @@ #include "access/genam.h" #include "access/table.h" #include "catalog/namespace.h" +#include "catalog/pg_constraint.h" #include "catalog/pg_proc.h" #include "catalog/pg_subscription_rel.h" #include "commands/trigger.h" @@ -143,6 +144,21 @@ free_local_unique_indexes(LogicalRepRelMapEntry *entry) entry->local_unique_indexes = NIL; } +/* + * Release foreign key list + */ +static void +free_local_fkeys(LogicalRepRelMapEntry *entry) +{ + Assert(am_leader_apply_worker()); + + foreach_ptr(LogicalRepSubscriberFK, fkinfo, entry->local_fkeys) + bms_free(fkinfo->conkeys); + + list_free_deep(entry->local_fkeys); + entry->local_fkeys = NIL; +} + /* * Free the entry of a relation map cache. */ @@ -173,6 +189,9 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) if (entry->local_unique_indexes != NIL) free_local_unique_indexes(entry); + + if (entry->local_fkeys != NIL) + free_local_fkeys(entry); } /* @@ -499,6 +518,171 @@ collect_local_indexes(LogicalRepRelMapEntry *entry) list_free(idxlist); } +/* + * Search a relmap entry by local relation OID. + */ +static LogicalRepRelMapEntry * +logicalrep_get_relentry_by_local_oid(Oid localreloid) +{ + HASH_SEQ_STATUS status; + LogicalRepRelMapEntry *entry = NULL; + + if (LogicalRepRelMap == NULL) + return NULL; + + /* + * Each entry must be checked individually. Because the key of + * LogicalRepRelMap is the "remote" relid but we only have the local one. + * + * Note: This iteration ignores relations that have not yet been + * registered in LogicalRepRelMap. It is OK because such relations have + * not been modified yet since the subscriber started receiving changes. + */ + hash_seq_init(&status, LogicalRepRelMap); + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + { + if (entry->localreloid == localreloid) + { + hash_seq_term(&status); + return entry; + } + } + + return NULL; +} + +/* + * Return true if the FK constraint is always deferred. + */ +static bool +foreign_key_is_always_deferred(Oid conoid) +{ + HeapTuple tup; + Form_pg_constraint con; + bool deferred; + + tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for foreign key %u", conoid); + + con = (Form_pg_constraint) GETSTRUCT(tup); + deferred = con->condeferrable && con->condeferred; + ReleaseSysCache(tup); + + return deferred; +} + +/* + * Collect all local foreign keys that can be used for dependency tracking. + */ +static void +collect_local_fkeys(LogicalRepRelMapEntry *entry) +{ + List *fkeys; + + if (entry->local_fkeys != NIL) + free_local_fkeys(entry); + + entry->local_fkeys_collected = true; + + /* + * Get the list of foreign keys for the relation. + * + * XXX: Apart from RelationGetIndexList(), the returned list is a part of + * relcache and must be copied before doing anything. See comments atop + * RelationGetFKeyList(). + */ + fkeys = copyObject(RelationGetFKeyList(entry->localrel)); + + /* Quick exit if there are no foreign keys */ + if (fkeys == NIL) + return; + + foreach_ptr(ForeignKeyCacheInfo, fk, fkeys) + { + LogicalRepRelMapEntry *refentry; + Bitmapset *fkkeys = NULL; + bool suitable = true; + + /* + * Skip NOT ENFORCED constraints because they won't be checked at any + * times. + */ + if (!fk->conenforced) + continue; + + /* + * Skip if the foreign key constraint is always deferred. The commit + * ordering is always preserved thus the constraint can be checked in + * correct order. + */ + if (foreign_key_is_always_deferred(fk->conoid)) + continue; + + /* Find the referenced relation by the local OID */ + refentry = logicalrep_get_relentry_by_local_oid(fk->confrelid); + + /* + * Skip if the referenced relation is not the target of this + * subscription. + */ + if (!refentry) + continue; + + /* Seek each attributes and add to a Bitmap */ + for (int i = 0; i < fk->nkeys; i++) + { + AttrNumber localcol = fk->conkey[i]; + int remotecol = + entry->attrmap->attnums[AttrNumberGetAttrOffset(localcol)]; + + /* Skip if the column does not exist on publisher node */ + if (remotecol < 0) + { + suitable = false; + break; + } + + /* + * XXX: What if the FK column is specified with different order? + * E.g., there is a table (a, b) and other table refers like + * REFERENCES (b, a). Will it work correctly? + */ + fkkeys = bms_add_member(fkkeys, remotecol); + } + + /* + * One of the columns does not exist on the publisher side, skip such + * a constraint. + */ + if (!suitable) + { + if (fkkeys) + bms_free(fkkeys); + + continue; + } + + if (fkkeys) + { + MemoryContext oldctx; + LogicalRepSubscriberFK *fkinfo; + + oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); + fkinfo = palloc(sizeof(LogicalRepSubscriberFK)); + fkinfo->conoid = fk->conoid; + fkinfo->ref_remoteid = refentry->remoterel.remoteid; + fkinfo->conkeys = bms_copy(fkkeys); + + bms_free(fkkeys); + entry->local_fkeys = lappend(entry->local_fkeys, fkinfo); + MemoryContextSwitchTo(oldctx); + } + } + + list_free_deep(fkeys); +} + /* * Check all local triggers for the relation to see the parallelizability. * @@ -715,6 +899,7 @@ logicalrep_rel_load(LogicalRepRelMapEntry *entry, LogicalRepRelId remoteid, { entry->parallel_safe = LOGICALREP_PARALLEL_UNKNOWN; collect_local_indexes(entry); + collect_local_fkeys(entry); check_defined_triggers(entry); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c4478d17044..b6a2e0d4af2 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1201,6 +1201,136 @@ check_dependency_on_local_key(Oid relid, } } +/* + * Lookup-only dependency check by key. Does not register/update entries. + */ +static void +check_dependency_lookup_by_key(ReplicaIdentityKey *key, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + ReplicaIdentityEntry *rientry; + MemoryContext oldctx; + + Assert(replica_identity_table); + + oldctx = MemoryContextSwitchTo(ApplyContext); + + rientry = replica_identity_lookup(replica_identity_table, key); + free_replica_identity_key(key); + + MemoryContextSwitchTo(oldctx); + + if (!rientry) + return; + + /* + * Clean up committed entries while on it. + */ + if (pa_transaction_committed(rientry->remote_xid)) + { + free_replica_identity_key(rientry->keydata); + replica_identity_delete_item(replica_identity_table, rientry); + return; + } + + *depends_on_xids = check_and_append_xid_dependency(*depends_on_xids, + &rientry->remote_xid, + new_depended_xid); +} + +/* + * Check for dependencies on preceding transactions that modify the referenced + * unique key of foreign keys. + */ +static void +check_dependency_on_foreign_key(Oid relid, + LogicalRepTupleData *original_data, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + LogicalRepRelMapEntry *relentry; + LogicalRepTupleData *ridata; + ReplicaIdentityKey *rikey; + MemoryContext oldctx; + + Assert(depends_on_xids); + + relentry = logicalrep_get_relentry(relid); + + Assert(relentry); + + /* + * We have already checked the local unique indexes and also gathered the + * FK info at that time. + * + * XXX: Should we check and load again? + */ + Assert(relentry->local_fkeys_collected); + + foreach_ptr(LogicalRepSubscriberFK, fkinfo, relentry->local_fkeys) + { + int columns = bms_num_members(fkinfo->conkeys); + bool suitable = true; + + Assert(columns); + + for (int i = 0; i < original_data->ncols; i++) + { + if (!bms_is_member(i, fkinfo->conkeys)) + continue; + + /* Skip if the column is NULL or not changed */ + if (original_data->colstatus[i] == LOGICALREP_COLUMN_NULL || + original_data->colstatus[i] == LOGICALREP_COLUMN_UNCHANGED) + { + suitable = false; + break; + } + } + + if (!suitable) + continue; + + oldctx = MemoryContextSwitchTo(ApplyContext); + + /* Allocate space for replica identity values */ + ridata = palloc0_object(LogicalRepTupleData); + ridata->colvalues = palloc0_array(StringInfoData, columns); + ridata->colstatus = palloc0_array(char, columns); + ridata->ncols = columns; + + for (int i_original = 0, i_key = 0; i_original < original_data->ncols; i_original++) + { + StringInfo original_colvalue; + + if (!bms_is_member(i_original, fkinfo->conkeys)) + continue; + + original_colvalue = &original_data->colvalues[i_original]; + initStringInfoExt(&ridata->colvalues[i_key], original_colvalue->len + 1); + appendStringInfoString(&ridata->colvalues[i_key], original_colvalue->data); + + ridata->colstatus[i_key] = original_data->colstatus[i_original]; + i_key++; + } + + rikey = palloc0_object(ReplicaIdentityKey); + rikey->relid = fkinfo->ref_remoteid; + + /* + * XXX: use LOGICALREP_KEY_LOCAL_UNIQUE to compare with local unique + * indexes. + */ + rikey->kind = LOGICALREP_KEY_LOCAL_UNIQUE; + rikey->data = ridata; + + MemoryContextSwitchTo(oldctx); + + check_dependency_lookup_by_key(rikey, new_depended_xid, depends_on_xids); + } +} + /* * Check for preceding transactions that involve insert, delete, or update * operations on the specified table, and return them in 'depends_on_xids'. @@ -1403,6 +1533,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_dependency_on_local_key(relid, &newtup, new_depended_xid, &depends_on_xids); + check_dependency_on_foreign_key(relid, &newtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1419,6 +1552,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_dependency_on_local_key(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_dependency_on_foreign_key(relid, &oldtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); } @@ -1429,6 +1565,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_dependency_on_local_key(relid, &newtup, new_depended_xid, &depends_on_xids); + check_dependency_on_foreign_key(relid, &newtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1441,6 +1580,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_dependency_on_local_key(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_dependency_on_foreign_key(relid, &oldtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index f13f51b54a1..856fc9105b8 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -22,6 +22,13 @@ typedef struct LogicalRepSubscriberIdx Bitmapset *indexkeys; /* Bitmap of key columns *on remote* */ } LogicalRepSubscriberIdx; +typedef struct LogicalRepSubscriberFK +{ + Oid conoid; /* OID of the FK constraint */ + LogicalRepRelId ref_remoteid; /* referenced remote relid */ + Bitmapset *conkeys; /* Bitmap of key columns *on remote* */ +} LogicalRepSubscriberFK; + typedef struct LogicalRepRelMapEntry { LogicalRepRelation remoterel; /* key is remoterel.remoteid */ @@ -54,6 +61,10 @@ typedef struct LogicalRepRelMapEntry List *local_unique_indexes; bool local_unique_indexes_collected; + /* Local foreign keys. Used for dependency tracking */ + List *local_fkeys; + bool local_fkeys_collected; + /* * Whether the relation can be applied in parallel or not. It is * distinglish whether defined triggers are the immutable or not. diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl index 708d263b5b7..5e5f7209526 100644 --- a/src/test/subscription/t/050_parallel_apply.pl +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -26,6 +26,8 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE regress_tab (id int PRIMARY KEY, value text);"); $node_publisher->safe_psql('postgres', "INSERT INTO regress_tab VALUES (generate_series(1, 10), 'test');"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE regress_tab_fk (id int PRIMARY KEY, fk int);"); # Create a publication $node_publisher->safe_psql('postgres', @@ -56,6 +58,8 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_subscriber->safe_psql('postgres', "CREATE TABLE regress_tab (id int PRIMARY KEY, value text);"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE regress_tab_fk (id int PRIMARY KEY, fk int REFERENCES regress_tab (id));"); $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub;"); @@ -262,8 +266,8 @@ $h->query_safe("COMMIT;"); # Ensure subscriber-local indexes are also used for the dependency tracking -# Truncate the data for upcoming tests -$node_publisher->safe_psql('postgres', "TRUNCATE TABLE regress_tab;"); +# delete the data for upcoming tests +$node_publisher->safe_psql('postgres', "DELETE FROM regress_tab;"); $node_publisher->wait_for_catchup('regress_sub'); # Define an unique index on subscriber @@ -314,7 +318,47 @@ $node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset # Cleanup $node_subscriber->safe_psql('postgres', "DROP INDEX regress_tab_value_idx;"); -$node_publisher->safe_psql('postgres', "TRUNCATE TABLE regress_tab;"); +$node_publisher->safe_psql('postgres', "DELETE FROM regress_tab;"); $node_publisher->wait_for_catchup('regress_sub'); +# Ensure subscriber-local indexes are also used for the dependency tracking + +# Insert an initial tuple +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (1, 'initial tuple');"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Attach an injection_point. Parallel workers would wait before the commit +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +# Insert a tuple on publisher. Parallel worker would wait at the injection +# point +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (2, 'tmp');"); + +# Wait until the parallel worker enters the injection point. +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +$offset = -s $node_subscriber->logfile; + +# Insert tuples on publisher again. This transaction waits for referring tuple +# is committed. +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab_fk VALUES (1, 1);"); + +# Verify the parallel worker waits for the transaction +$str = $node_subscriber->wait_for_log(qr/wait for depended xid ([1-9][0-9]+)/, $offset); +$xid = $str =~ /wait for depended xid ([1-9][0-9]+)/; + +# Wakeup the parallel worker +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-commit'); + SELECT injection_points_wakeup('parallel-worker-before-commit'); +]); + +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); + done_testing(); -- 2.43.7