From 3f30765b61c0d6ad24c112660b800e884ad83230 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Sat, 29 Jul 2023 14:39:47 +0200 Subject: [PATCH 6/6] per-transaction hash of sequences --- src/backend/replication/logical/decode.c | 2 +- .../replication/logical/reorderbuffer.c | 332 ++++++++++++++---- src/include/replication/reorderbuffer.h | 14 +- 3 files changed, 272 insertions(+), 76 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index f2fb4a2f24..6232120493 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -1480,5 +1480,5 @@ smgr_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) return; - ReorderBufferAddRelFileLocator(ctx->reorder, xid, xlrec->rlocator); + ReorderBufferAddRelFileLocator(ctx->reorder, xid, buf->endptr, xlrec->rlocator); } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 69d52b6f39..34e8590528 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -154,7 +154,7 @@ typedef struct ReorderBufferTXNByIdEnt typedef struct ReorderBufferSequenceEnt { RelFileLocator rlocator; - TransactionId xid; + ReorderBufferTXN *txn; } ReorderBufferSequenceEnt; /* data structures for (relfilelocator, ctid) => (cmin, cmax) mapping */ @@ -390,14 +390,6 @@ ReorderBufferAllocate(void) buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - /* hash table of sequences, mapping relfilelocator to XID of transaction */ - hash_ctl.keysize = sizeof(RelFileLocator); - hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt); - hash_ctl.hcxt = buffer->context; - - buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - buffer->by_txn_last_xid = InvalidTransactionId; buffer->by_txn_last_txn = NULL; @@ -505,12 +497,48 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + if (txn->sequences_hash != NULL) + { + hash_destroy(txn->sequences_hash); + txn->sequences_hash = NULL; + } + /* Reset the toast hash */ ReorderBufferToastReset(rb, txn); pfree(txn); } +/* + * Initialize hash table of relfilenodes created by the transaction. + * + * Each entry maps the relfilenode to the (sub)transaction that created the + * relfilenode - which is also the transaction the sequence change needs to + * be part of (in transactional case). + * + * We don't do this in ReorderBufferGetTXN because that'd allocate the hash + * for all transactions, and we expect new relfilenodes to be fairly rare. + * So only do that when adding the first entry. + */ +static void +ReorderBufferTXNSequencesInit(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + HASHCTL hash_ctl; + + /* bail out if already initialized */ + if (txn->sequences_hash) + return; + + /* hash table of sequences, mapping relfilelocator to transaction */ + hash_ctl.keysize = sizeof(RelFileLocator); + hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt); + hash_ctl.hcxt = rb->context; + + /* we expect relfilenodes to be created only rarely, so 32 seems enough */ + txn->sequences_hash = hash_create("ReorderBufferTXNSequenceHash", 32, &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); +} + /* * Get a fresh ReorderBufferChange. */ @@ -954,49 +982,189 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, /* * Treat the sequence change as transactional? * - * The hash table tracks all sequences which are assigned a new relfilenode in - * the transaction being decoded, so we simply do a lookup (the sequence is - * identified by relfilende). If we find a match, the change should be handled - * as transactional. + * To decide if a sequence change should be handled as transactional or applied + * immediately, we track (sequence) relfilenodes created by each transaction. + * We don't know if the current sub-transaction was already assigned to the + * top-level transaction, so we need to check all transactions. + * + * To limit the number of transactions we need to check, we copy the entries + * from subxacts to the top-level transaction, so we have to scan just the top + * level ones. We expect relfilenode creation to be a rare thing, so the copies + * should not waste too much memory. But sequence changes are very common, so + * making the check more efficient seems like a very good trade off. + * + * XXX Actually, can it happen that the either the relfilenode creation and/or + * the sequence change is still unassigned? + * + * XXX Maybe we should return the XID (in transactional mode), so that we don't + * need to look it up again while queueing it. + * + * XXX Maybe this should try searching the current xact, or the immediate parent. + * If it's a transactional change, it's likely where we find a match. */ bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, RelFileLocator rlocator) { - bool found = false; + bool found = false; + dlist_iter iter; + + /* + * Walk all top-level transactions (some of which may be subxacts, except + * that we haven't processed the assignments yet), and check if any of + * them created the relfilenode. + */ + dlist_foreach(iter, &rb->toplevel_by_lsn) + { + ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, node, + iter.cur); + + /* transaction has no relfilenodes at all */ + if (!txn->sequences_hash) + continue; - hash_search(rb->sequences, - (void *) &rlocator, - HASH_FIND, - &found); + (void) hash_search(txn->sequences_hash, + (void *) &rlocator, + HASH_FIND, + &found); + + /* + * If we found an entry with matchine relfilenode, we're done - we have + * to treat the sequence change as transactional, and replay it in the + * same (sub)transaction just like any other change. + */ + if (found) + break; + } return found; } /* - * Cleanup sequences created in in-progress transactions. + * ReorderBufferSequenceGetXid + * XID of the (sub)transaction that created the relfilenode. * - * There's no way to search by XID, so we simply do a seqscan of all - * the entries in the hash table. Hopefully there are only a couple - * entries in most cases - people generally don't create many new - * sequences over and over. + * Returns XID for transactional changes, and invalid XID otherwise. + * + * XXX Same thing as ReorderBufferSequenceIsTransactional, except that instead + * of returning true/false, returns XID of the xact that created relfilenode. + */ +static TransactionId +ReorderBufferSequenceGetXid(ReorderBuffer *rb, + RelFileLocator rlocator) +{ + bool found = true; + dlist_iter iter; + + /* + * Walk all top-level transactions (some of which may be subxacts, except + * that we haven't processed the assignments yet), and check if any of + * them created the relfilenode. + */ + dlist_foreach(iter, &rb->toplevel_by_lsn) + { + ReorderBufferSequenceEnt *entry; + ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, node, + iter.cur); + + /* transaction has no relfilenodes at all */ + if (!txn->sequences_hash) + continue; + + entry = hash_search(txn->sequences_hash, + (void *) &rlocator, + HASH_FIND, + &found); + + /* + * If we found an entry with matchine relfilenode, we're done - we have + * to treat the sequence change as transactional, and replay it in the + * same (sub)transaction just like any other change. + */ + if (found) + return entry->txn->xid; + } + + /* no match (has to be non-transactional change) */ + return InvalidTransactionId; +} + +/* + * ReorderBufferTransferSequencesToParent + * Copy the relfilenode entries to the parent after assignment. */ static void -ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid) +ReorderBufferTransferSequencesToParent(ReorderBuffer *rb, + ReorderBufferTXN *txn, + ReorderBufferTXN *subtxn) { HASH_SEQ_STATUS scan_status; ReorderBufferSequenceEnt *ent; - hash_seq_init(&scan_status, rb->sequences); + /* we should only get here for subtransactions */ + Assert(rbtxn_is_known_subxact(subtxn)); + + /* if the subtransaction has no relfilenodes, there's nothing to copy */ + if (!subtxn->sequences_hash) + return; + + /* make sure the hash table for parent xact is initialized */ + ReorderBufferTXNSequencesInit(rb, txn); + + /* scan the subxact hash and copy entries to the parent */ + hash_seq_init(&scan_status, subtxn->sequences_hash); while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL) { - /* skip sequences not from this transaction */ - if (ent->xid != xid) - continue; + ReorderBufferSequenceEnt *entry; + bool found = false; + + /* add the same relfilenode to the parent */ + entry = hash_search(txn->toptxn->sequences_hash, + (void *) &ent->rlocator, + HASH_ENTER, + &found); + + /* the parent should not have the relfilenode yet */ + Assert(!found); + Assert(ent->txn == subtxn); + + entry->txn = ent->txn; + } +} - (void) hash_search(rb->sequences, - (void *) &(ent->rlocator), - HASH_REMOVE, NULL); +/* + * Cleanup sequences after a subtransaction got aborted. + * + * The hash table will get destroyed in ReorderBufferReturnTXN, so we don't + * need to worry about that. But the entries were copied to the parent xact, + * and that's still being decoded - we make sure to remove the entries from + * the aborted one. + * + * XXX We only call this after an abort. + */ +static void +ReorderBufferSequenceCleanup(ReorderBufferTXN *txn) +{ + HASH_SEQ_STATUS scan_status; + ReorderBufferSequenceEnt *ent; + + /* Bail out if not a subxact, or if there are no entries. */ + if (!rbtxn_is_known_subxact(txn)) + return; + + if (!txn->sequences_hash) + return; + + /* + * Scan the top-level transaction hash and remove the entries from it. If we + * have entries for subxact, the top-level hash must have been initialized. + */ + hash_seq_init(&scan_status, txn->sequences_hash); + while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL) + { + (void) hash_search(txn->toptxn->sequences_hash, + (void *) &ent->rlocator, + HASH_REMOVE, NULL); } } @@ -1030,8 +1198,6 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, { MemoryContext oldcontext; ReorderBufferChange *change; - ReorderBufferSequenceEnt *ent; - bool found; /* allocate and queue the transactional sequence change */ oldcontext = MemoryContextSwitchTo(rb->context); @@ -1045,19 +1211,14 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, change->data.sequence.tuple = tuplebuf; - /* - * Lookup XID of the transaction where to queue the change (the one - * that did the ALTER SEQUENCE etc.) - */ - ent = hash_search(rb->sequences, - (void *) &rlocator, - HASH_FIND, - &found); + /* lookup the XID for transaction that created the relfilenode */ + xid = ReorderBufferSequenceGetXid(rb, rlocator); - Assert(found); + /* the XID should be valid for a transactional change */ + Assert(TransactionIdIsValid(xid)); - /* add it to the same subxact that created the sequence */ - ReorderBufferQueueChange(rb, ent->xid, lsn, change, false); + /* add it to the same (sub)xact that created that relfilenode */ + ReorderBufferQueueChange(rb, xid, lsn, change, false); MemoryContextSwitchTo(oldcontext); } @@ -1075,16 +1236,8 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, /* non-transactional changes require a valid snapshot */ Assert(snapshot_now); -#ifdef USE_ASSERT_CHECKING - /* Make sure the sequence is not in the hash table. */ - { - bool found; - hash_search(rb->sequences, - (void *) &rlocator, - HASH_FIND, &found); - Assert(!found); - } -#endif + /* Make sure the sequence is not in any of the hash tables */ + Assert(!TransactionIdIsValid(ReorderBufferSequenceGetXid(rb, rlocator))); if (xid != InvalidTransactionId) txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); @@ -1169,25 +1322,65 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, /* * ReorderBufferAddRelFileLocator - * Add newly created relfilenode to the global hash table. + * Add newly created relfilenode to the hash table for transaction. + * + * If the transaction is already known to be a subtransaction, we add the same + * entry into the parent transaction (but it still points at the subxact, so + * that we know where to queue changes or what to discard in case of an abort). */ void ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid, - RelFileLocator rlocator) + XLogRecPtr lsn, RelFileLocator rlocator) { - /* lookup sequence by relfilelocator */ - ReorderBufferSequenceEnt *ent; - bool found; + bool found; + ReorderBufferSequenceEnt *entry; + ReorderBufferTXN *txn; - /* sequence changes require a transaction */ + /* + * We only care about sequence relfilenodes for now, and those always have + * a XID. So if there's no XID, don't bother adding them to the hash. + */ if (xid == InvalidTransactionId) return; + /* + * This might be the first change decoded for this transaction, so make + * sure we create it if needed. + */ + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* + * First add it to the top-level transaction, but make sure it links to + * the correct subtransaction (so that we add later increments to it). + */ + if (txn->toptxn) + { + /* make sure the hash table is initialized */ + ReorderBufferTXNSequencesInit(rb, txn->toptxn); + + /* search the lookup table */ + entry = hash_search(txn->toptxn->sequences_hash, + (void *) &rlocator, + HASH_ENTER, + &found); + + /* + * We've just decoded creation of the relfilenode, so if we found it in + * the hash table, something is wrong. + */ + Assert(!found); + + entry->txn = txn; + } + + /* make sure the hash table is initialized */ + ReorderBufferTXNSequencesInit(rb, txn); + /* search the lookup table */ - ent = hash_search(rb->sequences, - (void *) &rlocator, - HASH_ENTER, - &found); + entry = hash_search(txn->sequences_hash, + (void *) &rlocator, + HASH_ENTER, + &found); /* * We've just decoded creation of the relfilenode, so if we found it in @@ -1195,7 +1388,7 @@ ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid, */ Assert(!found); - ent->xid = xid; + entry->txn = txn; } /* @@ -1407,6 +1600,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, /* Possibly transfer the subtxn's snapshot to its top-level txn. */ ReorderBufferTransferSnapToParent(txn, subtxn); + /* Possibly transfer the subtxn's sequences to its top-level txn. */ + ReorderBufferTransferSequencesToParent(rb, txn, subtxn); + /* Verify LSN-ordering invariant */ AssertTXNLsnOrder(rb); } @@ -1888,9 +2084,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found); Assert(found); - /* Remove sequences created in this transaction (if any). */ - ReorderBufferSequenceCleanup(rb, txn->xid); - /* remove entries spilled to disk */ if (rbtxn_is_serialized(txn)) ReorderBufferRestoreCleanup(rb, txn); @@ -3281,6 +3474,9 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, /* cosmetic... */ txn->final_lsn = lsn; + /* remove sequence relfilenodes from a top-level hash table (if subxact) */ + ReorderBufferSequenceCleanup(txn); + /* remove potential on-disk data, and deallocate */ ReorderBufferCleanupTXN(rb, txn); } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1f3a39c311..e5d8e1c0d1 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -400,6 +400,12 @@ typedef struct ReorderBufferTXN */ HTAB *toast_hash; + /* + * Sequence relfilenodes created in this transaction (also includes + * altered sequences, which assigns new relfilenode). + */ + HTAB *sequences_hash; + /* * non-hierarchical list of subtransactions that are *not* aborted. Only * used in toplevel transactions. @@ -581,12 +587,6 @@ struct ReorderBuffer */ HTAB *by_txn; - /* - * relfilenode => XID lookup table for sequences created in a transaction - * (also includes altered sequences, which assigns new relfilenode) - */ - HTAB *sequences; - /* * Transactions that could be a toplevel xact, ordered by LSN of the first * record bearing that xid. @@ -787,7 +787,7 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr); extern void StartupReorderBuffer(void); extern void ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid, - RelFileLocator rlocator); + XLogRecPtr lsn, RelFileLocator rlocator); extern bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, RelFileLocator locator); -- 2.41.0