From ae3312b8669246dc425179e083b61602d197dfc6 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Fri, 10 Mar 2023 09:03:15 +1100 Subject: [PATCH v1] Add macros for ReorderBufferTXN toptxn. Make toptxn related code more readable by introducing some simple macros. --- contrib/test_decoding/test_decoding.c | 4 +-- src/backend/replication/logical/reorderbuffer.c | 38 +++++++++++-------------- src/backend/replication/pgoutput/pgoutput.c | 6 ++-- src/include/replication/reorderbuffer.h | 3 ++ 4 files changed, 24 insertions(+), 27 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index b7e6048..d8710ee 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -815,11 +815,11 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, * maintain the output_plugin_private only under the toptxn so if this is * not the toptxn then fetch the toptxn. */ - ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn; + ReorderBufferTXN *toptxn = get_toptxn(txn); TestDecodingTxnData *txndata = toptxn->output_plugin_private; bool xact_wrote_changes = txndata->xact_wrote_changes; - if (txn->toptxn == NULL) + if (isa_toptxn(txn)) { Assert(txn->output_plugin_private != NULL); pfree(txndata); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 2d17c55..3e2edc9 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -717,10 +717,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, return; /* Get the top transaction. */ - if (txn->toptxn != NULL) - toptxn = txn->toptxn; - else - toptxn = txn; + toptxn = get_toptxn(txn); /* * Indicate a partial change for toast inserts. The change will be @@ -812,10 +809,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferTXN *toptxn; /* get the top transaction */ - if (txn->toptxn != NULL) - toptxn = txn->toptxn; - else - toptxn = txn; + toptxn = get_toptxn(txn); toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE; } @@ -1667,7 +1661,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep * about the toplevel xact (we send the XID in all messages), but we never * stream XIDs of empty subxacts. */ - if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0))) + if ((!txn_prepared) && (isa_toptxn(txn) || (txn->nentries_mem != 0))) txn->txn_flags |= RBTXN_IS_STREAMED; if (txn_prepared) @@ -3207,10 +3201,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, * Update the total size in top level as well. This is later used to * compute the decoding stats. */ - if (txn->toptxn != NULL) - toptxn = txn->toptxn; - else - toptxn = txn; + toptxn = get_toptxn(txn); if (addition) { @@ -3295,8 +3286,8 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, * so that we can execute them all together. See comments atop this * function. */ - if (txn->toptxn) - txn = txn->toptxn; + if (isa_subtxn(txn)) + txn = get_toptxn(txn); Assert(nmsgs > 0); @@ -3354,7 +3345,6 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) { ReorderBufferTXN *txn; - ReorderBufferTXN *toptxn; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); @@ -3370,11 +3360,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, * conveniently check just top-level transaction and decide whether to * build the hash table or not. */ - toptxn = txn->toptxn; - if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn)) + if (isa_subtxn(txn)) { - toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; - dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node); + ReorderBufferTXN *toptxn = get_toptxn(txn); + + if (!rbtxn_has_catalog_changes(toptxn)) + { + toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node); + } } } @@ -3619,7 +3613,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL) { /* we know there has to be one, because the size is not zero */ - Assert(txn && !txn->toptxn); + Assert(txn && isa_toptxn(txn)); Assert(txn->total_size > 0); Assert(rb->size >= txn->total_size); @@ -4007,7 +4001,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) bool txn_is_streamed; /* We can never reach here for a subtransaction. */ - Assert(txn->toptxn == NULL); + Assert(isa_toptxn(txn)); /* * We can't make any assumptions about base snapshot here, similar to what diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 00a2d73..abfeea6 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -694,8 +694,8 @@ maybe_send_schema(LogicalDecodingContext *ctx, if (in_streaming) xid = change->txn->xid; - if (change->txn->toptxn) - topxid = change->txn->toptxn->xid; + if (isa_subtxn(change->txn)) + topxid = get_toptxn(change->txn)->xid; else topxid = xid; @@ -1879,7 +1879,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, Assert(!in_streaming); /* determine the toplevel transaction */ - toptxn = (txn->toptxn) ? txn->toptxn : txn; + toptxn = get_toptxn(txn); Assert(rbtxn_is_streamed(toptxn)); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 215d149..f7234ef 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -296,6 +296,9 @@ typedef struct ReorderBufferTXN XLogRecPtr end_lsn; /* Toplevel transaction for this subxact (NULL for top-level). */ +#define isa_toptxn(rbtxn) (rbtxn->toptxn == NULL) +#define isa_subtxn(rbtxn) (rbtxn->toptxn != NULL) +#define get_toptxn(rbtxn) (isa_subtxn(rbtxn) ? rbtxn->toptxn : rbtxn) struct ReorderBufferTXN *toptxn; /* -- 1.8.3.1