From 3cfce047ff0bbcdfddc7122a4b637f9d61f334a4 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 26 Jan 2024 11:40:03 +0900 Subject: [PATCH v1 4/4] Batch memory counter updates in ReorderBuffer. Commit XXX improved the algorith of selecting the largest transaction among top-level and sub transactions by using a max-heap. It in terns required for memory counter updates to also update the max-heap, which is O(log n), where n is the number of transactions. In order to reduce the number of times for updating the binaryheap, this commits batches memory counter updates where available. For instance, when cleaning up a transaction, we sum up the total amount of changes we freed, and then update the memory counter once. XXX: if the performance test on cases where memory counter updates happen quite often showed regressions, we need this patch. Author: Reviewed-by: Discussion: https://postgr.es/m/ Backpatch-through: --- .../replication/logical/reorderbuffer.c | 116 ++++++++++++------ 1 file changed, 80 insertions(+), 36 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1228e3e0d0..ca60e7b984 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -252,7 +252,7 @@ static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *tx int fd, ReorderBufferChange *change); static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno); -static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, +static Size ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, @@ -295,6 +295,9 @@ static Size ReorderBufferChangeSize(ReorderBufferChange *change); static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz); +static void ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, + ReorderBufferTXN *txn, + bool addition, Size sz); static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg); /* @@ -1509,6 +1512,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { bool found; dlist_mutable_iter iter; + Size freed_bytes = 0; /* cleanup subtransactions & their changes */ dlist_foreach_modify(iter, &txn->subtxns) @@ -1538,9 +1542,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Check we're not mixing changes from different transactions. */ Assert(change->txn == txn); - ReorderBufferReturnChange(rb, change, true); + freed_bytes += ReorderBufferChangeSize(change); + ReorderBufferReturnChange(rb, change, false); } + /* Update memory statistics for this txn entry */ + if (freed_bytes > 0) + ReorderBufferTXNMemoryUpdate(rb, txn, false, freed_bytes); + + /* * Cleanup the tuplecids we stored for decoding catalog snapshot access. * They are always stored in the toplevel transaction. @@ -1555,7 +1565,8 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) Assert(change->txn == txn); Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); - ReorderBufferReturnChange(rb, change, true); + /* Tuple CID changes are ignored for updating memory counter. */ + ReorderBufferReturnChange(rb, change, false); } /* @@ -1616,6 +1627,7 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) { dlist_mutable_iter iter; + Size freed_bytes = 0; /* cleanup subtransactions & their changes */ dlist_foreach_modify(iter, &txn->subtxns) @@ -1648,9 +1660,14 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep /* remove the change from it's containing list */ dlist_delete(&change->node); - ReorderBufferReturnChange(rb, change, true); + freed_bytes += ReorderBufferChangeSize(change); + ReorderBufferReturnChange(rb, change, false); } + /* Update memory statistics for this txn entry */ + if (freed_bytes > 0) + ReorderBufferTXNMemoryUpdate(rb, txn, false, freed_bytes); + /* * Mark the transaction as streamed. * @@ -1689,7 +1706,8 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep /* Remove the change from its containing list. */ dlist_delete(&change->node); - ReorderBufferReturnChange(rb, change, true); + /* Tuple CID changes are ignored for updating memory counter. */ + ReorderBufferReturnChange(rb, change, false); } } @@ -3170,26 +3188,14 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, } /* - * Update memory counters to account for the new or removed change. - * - * We update two counters - in the reorder buffer, and in the transaction - * containing the change. The reorder buffer counter allows us to quickly - * decide if we reached the memory limit, the transaction counter allows - * us to quickly pick the largest transaction for eviction. - * - * When streaming is enabled, we need to update the toplevel transaction - * counters instead - we don't really care about subtransactions as we - * can't stream them individually anyway, and we only pick toplevel - * transactions for eviction. So only toplevel transactions matter. + * A wrapper function for ReorderBufferTXNMemoryUpdate() to update memory + * counters to account for the new or removed change. */ static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz) { - ReorderBufferTXN *txn; - ReorderBufferTXN *toptxn; - Assert(change->txn); /* @@ -3200,7 +3206,27 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID) return; - txn = change->txn; + ReorderBufferTXNMemoryUpdate(rb, change->txn, addition, sz); +} + +/* + * Update memory counters of the given transaction. + * + * We update two counters - in the reorder buffer, and in the transaction + * containing the change. The reorder buffer counter allows us to quickly + * decide if we reached the memory limit, the transaction counter allows + * us to quickly pick the largest transaction for eviction. + * + * When streaming is enabled, we need to update the toplevel transaction + * counters instead - we don't really care about subtransactions as we + * can't stream them individually anyway, and we only pick toplevel + * transactions for eviction. So only toplevel transactions matter. + */ +static void +ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, ReorderBufferTXN *txn, + bool addition, Size sz) +{ + ReorderBufferTXN *toptxn; /* * Update the total size in top level as well. This is later used to @@ -3656,6 +3682,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) int fd = -1; XLogSegNo curOpenSegNo = 0; Size spilled = 0; + Size freed_bytes = 0; Size size = txn->size; elog(DEBUG2, "spill %u changes in XID %u to disk", @@ -3710,11 +3737,15 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferSerializeChange(rb, txn, fd, change); dlist_delete(&change->node); - ReorderBufferReturnChange(rb, change, true); + freed_bytes += ReorderBufferChangeSize(change); + ReorderBufferReturnChange(rb, change, false); spilled++; } + if (freed_bytes > 0) + ReorderBufferTXNMemoryUpdate(rb, txn, false, freed_bytes); + /* update the statistics iff we have spilled anything */ if (spilled) { @@ -4197,6 +4228,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno) { Size restored = 0; + Size restored_bytes = 0; + Size freed_bytes = 0; XLogSegNo last_segno; dlist_mutable_iter cleanup_iter; File *fd = &file->vfd; @@ -4210,12 +4243,16 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *cleanup = dlist_container(ReorderBufferChange, node, cleanup_iter.cur); + freed_bytes += ReorderBufferChangeSize(cleanup); dlist_delete(&cleanup->node); - ReorderBufferReturnChange(rb, cleanup, true); + ReorderBufferReturnChange(rb, cleanup, false); } txn->nentries_mem = 0; Assert(dlist_is_empty(&txn->changes)); + if (freed_bytes > 0) + ReorderBufferTXNMemoryUpdate(rb, txn, false, freed_bytes); + XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size); while (restored < max_changes_in_memory && *segno <= last_segno) @@ -4320,22 +4357,32 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, * ok, read a full change from disk, now restore it into proper * in-memory format */ - ReorderBufferRestoreChange(rb, txn, rb->outbuf); + restored_bytes += ReorderBufferRestoreChange(rb, txn, rb->outbuf); restored++; } + /* + * Update memory accounting for the restored change. We need to do this + * although we don't check the memory limit when restoring the changes in + * this branch (we only do that when initially queueing the changes after + * decoding), because we will release the changes later, and that will + * update the accounting too (subtracting the size from the counters). And + * we don't want to underflow there. + */ + ReorderBufferTXNMemoryUpdate(rb, txn, true, restored_bytes); + return restored; } /* * Convert change from its on-disk format to in-memory format and queue it onto - * the TXN's ->changes list. + * the TXN's ->changes list. Return the size of the restored change. * * Note: although "data" is declared char*, at entry it points to a * maxalign'd buffer, making it safe in most of this function to assume * that the pointed-to data is suitably aligned for direct access. */ -static void +static Size ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data) { @@ -4488,16 +4535,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, dlist_push_tail(&txn->changes, &change->node); txn->nentries_mem++; - /* - * Update memory accounting for the restored change. We need to do this - * although we don't check the memory limit when restoring the changes in - * this branch (we only do that when initially queueing the changes after - * decoding), because we will release the changes later, and that will - * update the accounting too (subtracting the size from the counters). And - * we don't want to underflow there. - */ - ReorderBufferChangeMemoryUpdate(rb, change, true, - ReorderBufferChangeSize(change)); + return ReorderBufferChangeSize(change); } /* @@ -4931,6 +4969,7 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn) while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL) { dlist_mutable_iter it; + Size freed_bytes = 0; if (ent->reconstructed != NULL) pfree(ent->reconstructed); @@ -4940,9 +4979,14 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferChange *change = dlist_container(ReorderBufferChange, node, it.cur); + freed_bytes += ReorderBufferChangeSize(change); dlist_delete(&change->node); - ReorderBufferReturnChange(rb, change, true); + ReorderBufferReturnChange(rb, change, false); } + + /* Update memory statistics for this txn entry */ + if (freed_bytes > 0) + ReorderBufferTXNMemoryUpdate(rb, txn, false, freed_bytes); } hash_destroy(txn->toast_hash); -- 2.39.3