From 5659f9007ffdfc4daca5f9659be6c546ef4d7c5c Mon Sep 17 00:00:00 2001 From: Shlok Kyal Date: Fri, 23 Aug 2024 14:02:20 +0530 Subject: [PATCH v18_REL_14] Distribute invalidations if change in catalog tables Distribute invalidations to in-progress transactions if the current committed transaction changes any catalog table. --- .../replication/logical/reorderbuffer.c | 5 +- src/backend/replication/logical/snapbuild.c | 54 +++-- src/include/replication/reorderbuffer.h | 4 + src/test/subscription/t/100_bugs.pl | 201 +++++++++++++++++- 4 files changed, 246 insertions(+), 18 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 64d9baa7982..2456f5bd98c 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -215,9 +215,6 @@ static const Size max_changes_in_memory = 4096; /* XXX for restore only */ */ static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb); static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); -static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, - TransactionId xid, bool create, bool *is_new, - XLogRecPtr lsn, bool create_as_top); static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn); @@ -611,7 +608,7 @@ ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids) * (with the given LSN, and as top transaction if that's specified); * when this happens, is_new is set to true. 
*/ -static ReorderBufferTXN * +ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top) { diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 9d8700147ca..d72b67777b0 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -290,7 +290,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap); static void SnapBuildSnapIncRefcount(Snapshot snap); -static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn); +static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid); /* xlog reading helper functions for SnapBuildProcessRunningXacts */ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); @@ -843,18 +843,21 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, } /* - * Add a new Snapshot to all transactions we're decoding that currently are - * in-progress so they can see new catalog contents made by the transaction - * that just committed. This is necessary because those in-progress - * transactions will use the new catalog's contents from here on (at the very - * least everything they do needs to be compatible with newer catalog - * contents). + * Add a new Snapshot and invalidation messages to all transactions we're + * decoding that currently are in-progress so they can see new catalog contents + * made by the transaction that just committed. This is necessary because those + * in-progress transactions will use the new catalog's contents from here on + * (at the very least everything they do needs to be compatible with newer + * catalog contents). 
*/ static void -SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) +SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid) { dlist_iter txn_i; ReorderBufferTXN *txn; + ReorderBufferTXN *curr_txn; + + curr_txn = ReorderBufferTXNByXid(builder->reorder, xid, false, NULL, InvalidXLogRecPtr, false); /* * Iterate through all toplevel transactions. This can include @@ -872,6 +875,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) * transaction which in turn implies we don't yet need a snapshot at * all. We'll add a snapshot when the first change gets queued. * + * Similarly, we don't need to add invalidations to a transaction whose + * base snapshot is not yet set. Once a base snapshot is built, it will + * include the xids of committed transactions that have modified the + * catalog, thus reflecting the new catalog contents. The existing + * catalog cache will have already been invalidated after processing + * the invalidations in the transaction that modified catalogs, + * ensuring that a fresh cache is constructed during decoding. + * * NB: This works correctly even for subtransactions because * ReorderBufferAssignChild() takes care to transfer the base snapshot * to the top-level transaction, and while iterating the changequeue @@ -881,13 +892,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) continue; /* - * We don't need to add snapshot to prepared transactions as they - * should not see the new catalog contents. + * We don't need to add snapshot or invalidations to prepared + * transactions as they should not see the new catalog contents. 
*/ if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn)) continue; - elog(DEBUG2, "adding a new snapshot to %u at %X/%X", + elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X", txn->xid, LSN_FORMAT_ARGS(lsn)); /* @@ -897,6 +908,20 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) SnapBuildSnapIncRefcount(builder->snapshot); ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn, builder->snapshot); + + /* + * Add invalidation messages to the reorder buffer of in-progress + * transactions except the current committed transaction, for which we + * will execute invalidations at the end. + * + * This is required; otherwise, we will end up using the stale catcache + * contents built by the current transaction even after its decoding, + * though they should have been invalidated by the concurrent + * catalog-changing transaction. + */ + if (txn->xid != xid && curr_txn->ninvalidations > 0) + ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn, + curr_txn->ninvalidations, curr_txn->invalidations); } } @@ -1175,8 +1200,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, /* refcount of the snapshot builder for the new snapshot */ SnapBuildSnapIncRefcount(builder->snapshot); - /* add a new catalog snapshot to all currently running transactions */ - SnapBuildDistributeNewCatalogSnapshot(builder, lsn); + /* + * add a new catalog snapshot and invalidation messages to all + * currently running transactions + */ + SnapBuildDistributeSnapshotAndInval(builder, lsn, xid); } } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index ba257d81b51..5c8833fe099 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -676,6 +676,10 @@ TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); +ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, 
TransactionId xid, + bool create, bool *is_new, + XLogRecPtr lsn, bool create_as_top); + void StartupReorderBuffer(void); #endif diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index cce91891ab9..3d99d937c9c 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -6,7 +6,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 9; +use Test::More tests => 12; # Bug #15114 @@ -299,6 +299,205 @@ is( $node_subscriber->safe_psql( $node_publisher->stop('fast'); $node_subscriber->stop('fast'); +# The bug was that the incremental data synchronization was being skipped when +# a new table is added to the publication in presence of a concurrent active +# transaction performing the DML on the same table. +$node_publisher = get_new_node('node_publisher_invalidation'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +$node_subscriber = get_new_node('node_subscriber_invalidation'); +$node_subscriber->init(); +$node_subscriber->start; + +# Initial setup. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE TABLE tab_conc(a int); + CREATE PUBLICATION regress_pub1; +)); + +$publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE TABLE tab_conc(a int); + CREATE SUBSCRIPTION regress_sub1 CONNECTION '$publisher_connstr' PUBLICATION regress_pub1; +)); + +# Bump the query timeout to avoid false negatives on slow test systems. +my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default; + +# Initiate 2 background sessions. 
+my $background_psql1 = $node_publisher->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $psql_timeout_secs); +$background_psql1->set_query_timer_restart(); + +my $background_psql2 = $node_publisher->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $psql_timeout_secs); +$background_psql2->set_query_timer_restart(); + +# Maintain an active transaction with the table that will be added to the +# publication. +$background_psql1->query_safe( + qq( + BEGIN; + INSERT INTO tab_conc VALUES (1); +)); + +# Add the table to the publication using background_psql, as the alter +# publication operation will distribute the invalidations to inprogress txns. +$background_psql2->query_safe( + "ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc" +); + +# Complete the transaction on the table. +$background_psql1->query_safe("COMMIT"); + +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO tab_conc VALUES (2); +)); + +# Refresh the publication. +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_sub1 REFRESH PUBLICATION"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub1'); + +my $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc"); +is( $result, qq(1 +2), + 'Ensure that the data from the tab_conc table is synchronized to the subscriber after the subscription is refreshed' +); + +# Perform an insert. +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO tab_conc VALUES (3); +)); +$node_publisher->wait_for_catchup('regress_sub1'); + +# Verify that the insert is replicated to the subscriber. 
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc"); +is( $result, qq(1 +2 +3), + 'Verify that the incremental data for table tab_conc added after table synchronization is replicated to the subscriber' +); + +# The bug was that the incremental data synchronization was happening even when +# tables are dropped from the publication in presence of a concurrent active +# transaction performing the DML on the same table. + +# Maintain an active transaction with the table that will be dropped from the +# publication. +$background_psql1->query_safe( + qq( + BEGIN; + INSERT INTO tab_conc VALUES (4); +)); + +# Drop the table from the publication using background_psql, as the alter +# publication operation will distribute the invalidations to inprogress txns. +$background_psql2->query_safe( + "ALTER PUBLICATION regress_pub1 DROP TABLE tab_conc" +); + +# Complete the transaction on the table. +$background_psql1->query_safe("COMMIT"); + +# Perform an insert. +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO tab_conc VALUES (5); +)); + +$node_publisher->wait_for_catchup('regress_sub1'); + +# Verify that the insert is not replicated to the subscriber. +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc"); +is( $result, qq(1 +2 +3 +4), + 'Verify that data for table tab_conc are not replicated to subscriber'); + +# The bug was that the incremental data synchronization was happening even after +# publication is dropped in a concurrent active transaction. + +# Add tables to the publication. +$background_psql2->query_safe( + "ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc" +); + +# Maintain an active transaction with the table. +$background_psql1->query_safe( + qq( + BEGIN; + INSERT INTO tab_conc VALUES (6); +)); + +# Drop publication. +$background_psql2->query_safe("DROP PUBLICATION regress_pub1"); + +# Perform an insert. 
+$background_psql1->query_safe("INSERT INTO tab_conc VALUES (7)"); + +# Complete the transaction on the table. +$background_psql1->query_safe("COMMIT"); + +# ERROR should appear on subscriber. +my $offset = -s $node_subscriber->logfile; +$node_subscriber->wait_for_log( + qr/ERROR: publication "regress_pub1" does not exist/, $offset); + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1"); + +# The bug was that the incremental data synchronization was happening even after +# publication is renamed in a concurrent active transaction. + +# Create publication. +$background_psql2->query_safe( + "CREATE PUBLICATION regress_pub1 FOR TABLE tab_conc" +); + +# Create subscription. +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_sub1 CONNECTION '$publisher_connstr' PUBLICATION regress_pub1" +); + +# Maintain an active transaction with the table. +$background_psql1->query_safe( + qq( + BEGIN; + INSERT INTO tab_conc VALUES (8); +)); + +# Rename publication. +$background_psql2->query_safe( + "ALTER PUBLICATION regress_pub1 RENAME TO regress_pub1_rename"); + +# Perform an insert. +$background_psql1->query_safe("INSERT INTO tab_conc VALUES (9)"); + +# Complete the transaction on the tables. +$background_psql1->query_safe("COMMIT"); + +# ERROR should appear on subscriber. +$offset = -s $node_subscriber->logfile; +$node_subscriber->wait_for_log( + qr/ERROR: publication "regress_pub1" does not exist/, $offset); + +$background_psql1->quit; +$background_psql2->quit; + +$node_publisher->stop('fast'); +$node_subscriber->stop('fast'); + # The bug was that when the REPLICA IDENTITY FULL is used with dropped or # generated columns, we fail to apply updates and deletes my $node_publisher_d_cols = get_new_node('node_publisher_d_cols'); -- 2.43.5