From d45c30dc76044c967b83c2b1eba23da0db35f001 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Tue, 23 Dec 2025 17:58:15 +0900 Subject: [PATCH v11 7/9] Wait applying transaction if one of user-defined triggers is immutable Since many parallel workers apply transactions, triggers for relations can also be fired in parallel, which may obtain unexpected results. To make it safe, parallel apply workers wait for the previously dispatched transaction before applying changes to the relation that has mutable triggers. --- src/backend/replication/logical/relation.c | 123 ++++++++++++++++++--- src/backend/replication/logical/worker.c | 68 ++++++++++++ src/include/replication/logicalrelation.h | 20 ++++ 3 files changed, 197 insertions(+), 14 deletions(-) diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index e1ce183dfd3..eeb85f8cc5d 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -21,7 +21,9 @@ #include "access/genam.h" #include "access/table.h" #include "catalog/namespace.h" +#include "catalog/pg_proc.h" #include "catalog/pg_subscription_rel.h" +#include "commands/trigger.h" #include "executor/executor.h" #include "nodes/makefuncs.h" #include "replication/logicalrelation.h" @@ -160,6 +162,10 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) * * Called when new relation mapping is sent by the publisher to update * our expected view of incoming data from said publisher. + * + * Note that we do not check the user-defined constraints here. PostgreSQL has + * already assumed that CHECK constraints' conditions are immutable and here + * follows the rule. */ void logicalrep_relmap_update(LogicalRepRelation *remoterel) @@ -209,6 +215,8 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) (remoterel->relkind == 0) ? RELKIND_RELATION : remoterel->relkind; entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + + entry->parallel_safe = LOGICALREP_PARALLEL_UNKNOWN; MemoryContextSwitchTo(oldctx); } @@ -354,27 +362,79 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) } /* - * Open the local relation associated with the remote one. + * Check all local triggers for the relation to see the parallelizability. * - * Rebuilds the Relcache mapping if it was invalidated by local DDL. + * We regard relations as applicable in parallel if all triggers are immutable. + * Result is directly set to LogicalRepRelMapEntry::parallel_safe. */ -LogicalRepRelMapEntry * -logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) +static void +check_defined_triggers(LogicalRepRelMapEntry *entry) +{ + TriggerDesc *trigdesc = entry->localrel->trigdesc; + + /* Quick exit if triffer is not defined */ + if (trigdesc == NULL) + { + entry->parallel_safe = LOGICALREP_PARALLEL_SAFE; + return; + } + + /* Seek triggers one by one to see the volatility */ + for (int i = 0; i < trigdesc->numtriggers; i++) + { + Trigger *trigger = &trigdesc->triggers[i]; + + Assert(OidIsValid(trigger->tgfoid)); + + /* Skip if the trigger is not enabled for logical replication */ + if (trigger->tgenabled == TRIGGER_DISABLED || + trigger->tgenabled == TRIGGER_FIRES_ON_ORIGIN) + continue; + + /* Check the volatility of the trigger. Exit if it is not immutable */ + if (func_volatile(trigger->tgfoid) != PROVOLATILE_IMMUTABLE) + { + entry->parallel_safe = LOGICALREP_PARALLEL_RESTRICTED; + return; + } + } + + /* All triggers are immutable, set as parallel safe */ + entry->parallel_safe = LOGICALREP_PARALLEL_SAFE; +} + +/* + * Actual workhorse for logicalrep_rel_open(). + * + * Caller must specify *either* entry or key. If the entry is specified, its + * attributes are filled and returned. The logical relation is kept opening. + * If the key is given, the corresponding entry is first searched in the hash + * table and processed as in the above case. At the end, logical replication is + * closed. + */ +void +logicalrep_rel_load(LogicalRepRelMapEntry *entry, LogicalRepRelId remoteid, + LOCKMODE lockmode) { - LogicalRepRelMapEntry *entry; - bool found; LogicalRepRelation *remoterel; - if (LogicalRepRelMap == NULL) - logicalrep_relmap_init(); + Assert((entry && !remoteid) || (!entry && remoteid)); - /* Search for existing entry. */ - entry = hash_search(LogicalRepRelMap, &remoteid, - HASH_FIND, &found); + if (!entry) + { + bool found; - if (!found) - elog(ERROR, "no relation map entry for remote relation ID %u", - remoteid); + if (LogicalRepRelMap == NULL) + logicalrep_relmap_init(); + + /* Search for existing entry. */ + entry = hash_search(LogicalRepRelMap, &remoteid, + HASH_FIND, &found); + + if (!found) + elog(ERROR, "no relation map entry for remote relation ID %u", + remoteid); + } remoterel = &entry->remoterel; @@ -500,6 +560,13 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) entry->localindexoid = FindLogicalRepLocalIndex(entry->localrel, remoterel, entry->attrmap); + /* + * Leader must also collect all local unique indexes for dependency + * tracking. + */ + if (am_leader_apply_worker()) + check_defined_triggers(entry); + entry->localrelvalid = true; } @@ -508,6 +575,34 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) entry->localreloid, &entry->statelsn); + if (remoteid) + logicalrep_rel_close(entry, lockmode); +} + +/* + * Open the local relation associated with the remote one. + * + * Rebuilds the Relcache mapping if it was invalidated by local DDL. + */ +LogicalRepRelMapEntry * +logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) +{ + LogicalRepRelMapEntry *entry; + bool found; + + if (LogicalRepRelMap == NULL) + logicalrep_relmap_init(); + + /* Search for existing entry. */ + entry = hash_search(LogicalRepRelMap, &remoteid, + HASH_FIND, &found); + + if (!found) + elog(ERROR, "no relation map entry for remote relation ID %u", + remoteid); + + logicalrep_rel_load(entry, 0, lockmode); + return entry; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b0fc2e3f0bc..3645ffa4431 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1077,6 +1077,59 @@ check_dependency_on_rel(LogicalRepRelId relid, TransactionId new_depended_xid, relentry->last_depended_xid = new_depended_xid; } +/* + * Check the parallelizability of applying changes for the relation. + * Append the lastly dispatched transaction in in 'depends_on_xids' if the + * relation is parallel unsafe. + */ +static void +check_dependency_for_parallel_safety(LogicalRepRelId relid, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + LogicalRepRelMapEntry *relentry; + + /* Quick exit if no transactions have been dispatched */ + if (!TransactionIdIsValid(last_remote_xid)) + return; + + relentry = logicalrep_get_relentry(relid); + + /* + * Gather information for local triggres if not yet. We require to be in a + * transaction state because system catalogs are read. + */ + if (relentry->parallel_safe == LOGICALREP_PARALLEL_UNKNOWN) + { + bool needs_start = !IsTransactionOrTransactionBlock(); + + if (needs_start) + StartTransactionCommand(); + + logicalrep_rel_load(NULL, relid, AccessShareLock); + + /* + * Close the transaction if we start here. We must not abort because + * it would release all session-level locks, such as the stream lock, + * and break the deadlock detection mechanism between LA and PA. The + * outcome is the same regardless of the end status, since the + * transaction did not modify any tuples. + */ + if (needs_start) + CommitTransactionCommand(); + + Assert(relentry->parallel_safe != LOGICALREP_PARALLEL_UNKNOWN); + } + + /* Do nothing for parallel safe relations */ + if (relentry->parallel_safe == LOGICALREP_PARALLEL_SAFE) + return; + + *depends_on_xids = check_and_append_xid_dependency(*depends_on_xids, + &last_remote_xid, + new_depended_xid); +} + /* * Check dependencies related to the current change by determining if the * modification impacts the same row or table as another ongoing transaction. If @@ -1135,6 +1188,8 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_dependency_on_replica_identity(relid, &newtup, new_depended_xid, &depends_on_xids); + check_dependency_for_parallel_safety(relid, new_depended_xid, + &depends_on_xids); break; case LOGICAL_REP_MSG_UPDATE: @@ -1142,13 +1197,19 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, &newtup); if (has_oldtup) + { check_dependency_on_replica_identity(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_dependency_for_parallel_safety(relid, new_depended_xid, + &depends_on_xids); + } check_dependency_on_replica_identity(relid, &newtup, new_depended_xid, &depends_on_xids); + check_dependency_for_parallel_safety(relid, new_depended_xid, + &depends_on_xids); break; case LOGICAL_REP_MSG_DELETE: @@ -1156,6 +1217,8 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_dependency_on_replica_identity(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_dependency_for_parallel_safety(relid, new_depended_xid, + &depends_on_xids); break; case LOGICAL_REP_MSG_TRUNCATE: @@ -1168,8 +1231,13 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, * modified the same table. */ foreach_int(truncated_relid, remote_relids) + { check_dependency_on_rel(truncated_relid, new_depended_xid, &depends_on_xids); + check_dependency_for_parallel_safety(truncated_relid, + new_depended_xid, + &depends_on_xids); + } break; diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 27e365f84c2..916d21c2dbd 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -39,6 +39,20 @@ typedef struct LogicalRepRelMapEntry XLogRecPtr statelsn; TransactionId last_depended_xid; + + /* + * Whether the relation can be applied in parallel or not. It is + * distinglish whether defined triggers are the immutable or not. + * + * Theoretically, we can determine the parallelizability for each type of + * replication messages, INSERT/UPDATE/DELETE/TRUNCATE. But it is not done + * yet to reduce the number of attributes. + * + * Note that we do not check the user-defined constraints here. PostgreSQL + * has already assumed that CHECK constraints' conditions are immutable and + * here follows the rule. + */ + char parallel_safe; } LogicalRepRelMapEntry; extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); @@ -46,6 +60,8 @@ extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel); extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode); +extern void logicalrep_rel_load(LogicalRepRelMapEntry *entry, + LogicalRepRelId remoteid, LOCKMODE lockmode); extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map); extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, @@ -56,4 +72,8 @@ extern int logicalrep_get_num_rels(void); extern void logicalrep_write_all_rels(StringInfo out); extern LogicalRepRelMapEntry *logicalrep_get_relentry(LogicalRepRelId remoteid); +#define LOGICALREP_PARALLEL_SAFE 's' +#define LOGICALREP_PARALLEL_RESTRICTED 'r' +#define LOGICALREP_PARALLEL_UNKNOWN 'u' + #endif /* LOGICALRELATION_H */ -- 2.47.3