From 4ecda5e6debda82946669f500ad14c018b0eae57 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Sat, 2 Dec 2023 10:28:26 +0100 Subject: [PATCH v20240111 4/7] global hash table of sequence relfilenodes --- .../replication/logical/reorderbuffer.c | 177 +++++++----------- src/include/replication/reorderbuffer.h | 6 + 2 files changed, 74 insertions(+), 109 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 6ac91068799..acaa6dce8e4 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -415,6 +415,14 @@ ReorderBufferAllocate(void) dlist_init(&buffer->txns_by_base_snapshot_lsn); dclist_init(&buffer->catchange_txns); + /* Create hash of sequences, mapping relfilelocator to transaction. */ + hash_ctl.keysize = sizeof(RelFileLocator); + hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt); + hash_ctl.hcxt = buffer->context; + + buffer->sequences_hash = hash_create("ReorderBufferSequenceHash", 32, &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + /* * Ensure there's no stale data from prior uses of this slot, in case some * prior exit avoided calling ReorderBufferFree. Failure to do this can @@ -1015,45 +1023,27 @@ ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, RelFileLocator rlocator, TransactionId *xid) { + ReorderBufferSequenceEnt *entry; bool found = false; - dlist_iter iter; AssertCheckSequences(rb); + entry = hash_search(rb->sequences_hash, + (void *) &rlocator, + HASH_FIND, + &found); + /* - * Walk all top-level transactions (some of which may be subxacts, except - * that we haven't processed the assignments yet), and check if any of - * them created the relfilenode. + * If we found an entry with matching relfilenode, we're done - we + * have to treat the sequence change as transactional, and replay it + * in the same (sub)transaction just like any other change. + * + * Optionally set XID of the (sub)xact that created the relfilenode. */ - dlist_foreach(iter, &rb->toplevel_by_lsn) + if (found) { - ReorderBufferSequenceEnt *entry; - ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, node, - iter.cur); - - /* transaction has no relfilenodes at all */ - if (!txn->sequences_hash) - continue; - - entry = hash_search(txn->sequences_hash, - (void *) &rlocator, - HASH_FIND, - &found); - - /* - * If we found an entry with matching relfilenode, we're done - we - * have to treat the sequence change as transactional, and replay it - * in the same (sub)transaction just like any other change. - * - * Optionally set XID of the (sub)xact that created the relfilenode. - */ - if (found) - { - if (xid) - *xid = entry->txn->xid; - - break; - } + if (xid) + *xid = entry->txn->xid; } return found; @@ -1068,27 +1058,22 @@ ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, * the committed/aborted one. */ static void -ReorderBufferSequenceCleanup(ReorderBufferTXN *txn) +ReorderBufferSequenceCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) { HASH_SEQ_STATUS scan_status; ReorderBufferSequenceEnt *ent; - /* Bail out if not a subxact, or if there are no entries. */ - if (!rbtxn_is_known_subxact(txn)) - return; - + /* Bail out if there are no sequence entries for this xact. */ if (!txn->sequences_hash) return; /* - * Scan the top-level transaction hash and remove the entries from it. If - * we have entries for subxact, the top-level hash must have been - * initialized. + * Scan the global transaction hash and remove the entries from it. */ hash_seq_init(&scan_status, txn->sequences_hash); while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL) { - if (hash_search(txn->toptxn->sequences_hash, + if (hash_search(rb->sequences_hash, (void *) &ent->rlocator, HASH_REMOVE, NULL) == NULL) elog(ERROR, "hash table corrupted"); @@ -1286,28 +1271,23 @@ ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); /* - * First add it to the top-level transaction, but make sure it links to + * First add it to the global hash table, and make sure it links to * the correct subtransaction (so that we add later changes to it). */ - if (txn->toptxn) - { - /* make sure the hash table is initialized */ - ReorderBufferTXNSequencesInit(rb, txn->toptxn); + entry = hash_search(rb->sequences_hash, + (void *) &rlocator, + HASH_ENTER, + &found); - /* search the lookup table */ - entry = hash_search(txn->toptxn->sequences_hash, - (void *) &rlocator, - HASH_ENTER, - &found); + /* + * We've just decoded creation of the relfilenode, so if we found it + * in the hash table, something is wrong. + */ + Assert(!found); - /* - * We've just decoded creation of the relfilenode, so if we found it - * in the hash table, something is wrong. - */ - Assert(!found); + entry->txn = txn; - entry->txn = txn; - } + /* now add it to the hash table in the transaction itself */ /* make sure the hash table is initialized */ ReorderBufferTXNSequencesInit(rb, txn); @@ -1417,7 +1397,6 @@ AssertCheckSequences(ReorderBuffer *rb) { #ifdef USE_ASSERT_CHECKING LogicalDecodingContext *ctx = rb->private_data; - dlist_iter iter; /* * Skip the verification if we don't reach the LSN at which we start @@ -1433,57 +1412,53 @@ AssertCheckSequences(ReorderBuffer *rb) return; /* - * Make sure the relfilenodes from subxacts are properly recorded in the - * top-level transaction hash table. + * Make sure the relfilenodes from all xacts are properly recorded in the + * global hash table of sequences. */ - dlist_foreach(iter, &rb->toplevel_by_lsn) { + HASH_SEQ_STATUS hash_seq; + ReorderBufferTXNByIdEnt *ent; int nentries = 0, nsubentries = 0; - dlist_iter subiter; - ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, node, - iter.cur); /* - * We don't skip top-level transactions without relfilenodes, because - * there might be a subtransaction with some, and we want to detect + * We do this check even if the global hash table is empty, because + * there might be a transaction with some, and we want to detect * such cases too. */ - if (txn->sequences_hash) - nentries = hash_get_num_entries(txn->sequences_hash); + nentries = hash_get_num_entries(rb->sequences_hash); - /* walk all subxacts, and check the hash table in each one */ - dlist_foreach(subiter, &txn->subtxns) + /* walk all xacts, and check the hash table in each one */ + hash_seq_init(&hash_seq, rb->by_txn); + while ((ent = hash_seq_search(&hash_seq)) != NULL) { - HASH_SEQ_STATUS scan_status; + ReorderBufferTXN *txn = ent->txn; ReorderBufferSequenceEnt *entry; - - ReorderBufferTXN *subtxn = dlist_container(ReorderBufferTXN, node, - subiter.cur); + HASH_SEQ_STATUS scan_status; /* * If this subxact has no relfilenodes, skip it. We'll do the - * check in the opposite direction (that all top-level - * relfilenodes are in the correct subxact) later. + * check in the opposite direction (that all relfilenodes are + * also in hash for the correct xact) later. */ - if (!subtxn->sequences_hash) + if (!txn->sequences_hash) continue; - /* add number of relfilenodes in this subxact */ - nsubentries += hash_get_num_entries(subtxn->sequences_hash); + /* account for relfilenodes in this xact */ + nsubentries += hash_get_num_entries(txn->sequences_hash); /* * Check that all subxact relfilenodes are in the top-level txn * too, and are pointing to this subtransaction. */ - hash_seq_init(&scan_status, subtxn->sequences_hash); + hash_seq_init(&scan_status, txn->sequences_hash); while ((entry = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL) { bool found = false; ReorderBufferSequenceEnt *subentry; - /* search for the same relfilenode in the top-level txn */ - subentry = hash_search(txn->sequences_hash, + /* search for the same relfilenode in the global hash table */ + subentry = hash_search(rb->sequences_hash, (void *) &entry->rlocator, HASH_FIND, &found); @@ -1498,7 +1473,7 @@ AssertCheckSequences(ReorderBuffer *rb) * The entry has to point to the subxact - there's no subxact * "below" this one to which the relfilenode could belong. */ - Assert(subentry->txn == subtxn); + Assert(subentry->txn == txn); } } @@ -1508,32 +1483,25 @@ AssertCheckSequences(ReorderBuffer *rb) * directly, so this needs to be inequality. */ Assert(nentries >= nsubentries); + } + { /* * Now do the check in the opposite direction - check that every entry - * in the top-level txn (except those pointing to the top-level txn - * itself) point to one of the subxacts, and there's an entry in the - * subxact hash. + * in the global hash table point to one of the running xacts, and + * there's an entry in the transaction hash table. */ - if (txn->sequences_hash) + if (rb->sequences_hash) { HASH_SEQ_STATUS scan_status; ReorderBufferSequenceEnt *entry; - hash_seq_init(&scan_status, txn->sequences_hash); + hash_seq_init(&scan_status, rb->sequences_hash); while ((entry = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL) { bool found = false; ReorderBufferSequenceEnt *subentry; - /* Skip entries for the top-level txn itself. */ - if (entry->txn == txn) - continue; - - /* Is it a subxact of this txn? */ - Assert(rbtxn_is_known_subxact(entry->txn)); - Assert(entry->txn->toptxn == txn); - /* * Search for the same relfilenode in the subxact (it should * be initialized, as we expect it to contain the @@ -2181,6 +2149,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) if (rbtxn_is_serialized(txn)) ReorderBufferRestoreCleanup(rb, txn); + /* remove the sequence relfilenode from the global hash table */ + ReorderBufferSequenceCleanup(rb, txn); + /* deallocate */ ReorderBufferReturnTXN(rb, txn); } @@ -3519,9 +3490,6 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); - /* cleanup: remove sequence relfilenodes from the top-level txn */ - ReorderBufferSequenceCleanup(txn); - ReorderBufferCleanupTXN(rb, txn); } @@ -3571,9 +3539,6 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, /* cosmetic... */ txn->final_lsn = lsn; - /* remove sequence relfilenodes from the top-level txn */ - ReorderBufferSequenceCleanup(txn); - /* remove potential on-disk data, and deallocate */ ReorderBufferCleanupTXN(rb, txn); @@ -3613,9 +3578,6 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid) if (rbtxn_is_streamed(txn)) rb->stream_abort(rb, txn, InvalidXLogRecPtr); - /* remove sequence relfilenodes from the top-level txn */ - ReorderBufferSequenceCleanup(txn); - /* remove potential on-disk data, and deallocate this tx */ ReorderBufferCleanupTXN(rb, txn); @@ -3668,9 +3630,6 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) else Assert(txn->ninvalidations == 0); - /* remove sequence relfilenodes from a top-level txn */ - ReorderBufferSequenceCleanup(txn); - /* remove potential on-disk data, and deallocate */ ReorderBufferCleanupTXN(rb, txn); } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 0134133517a..ad206b3f7d3 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -704,6 +704,12 @@ struct ReorderBuffer */ int64 totalTxns; /* total number of transactions sent */ int64 totalBytes; /* total amount of data decoded */ + + /* + * Sequence relfilenodes created in any transaction (also includes + * altered sequences, which assigns new relfilenode). + */ + HTAB *sequences_hash; }; -- 2.43.0