From 889d0dc3a3ff203fd382e5020029a78b9334c586 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 26 Jan 2024 11:31:41 +0900 Subject: [PATCH v5 4/5] Use max-heap to evict largest transactions in ReorderBuffer. Previously, when selecting the transaction to evict, we checked all transactions to find the largest one, which could lead to significant replication lag, especially in cases where there are many subtransactions. This commit improves the eviction algorithm in ReorderBuffer by using a max-heap with transaction size as the key to find the largest transaction. The max-heap is managed using two states. Overall algorithm: REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP is the starting state, where we do not update the max-heap when updating the memory counter. We build the max-heap just before selecting large transactions. Therefore, in this state, we can update the memory counter at no additional cost but need O(n) time to get the largest transaction, where n is the number of transactions including top-level transactions and subtransactions. Once we build the max-heap, we switch to REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP state, where we also update the max-heap when updating the memory counter. The intention is to efficiently retrieve the largest transaction in O(1) time, at the cost of making each memory counter update O(log n). We remain in this state as long as the number of transactions is larger than the threshold, REORDER_BUFFER_MEM_TRACK_THRESHOLD. Otherwise, we switch back to REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and reset the max-heap. The performance benchmark results showed a significant speedup (more than 30x on my machine) in decoding a transaction with 100k subtransactions, whereas there is no visible overhead in other cases. 
XXX: update typedef.list Author: Reviewed-by: Discussion: https://postgr.es/m/ --- .../replication/logical/reorderbuffer.c | 197 +++++++++++++++--- src/include/replication/reorderbuffer.h | 21 ++ 2 files changed, 189 insertions(+), 29 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 91b9618d7ec..f22cf2fb9b8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -67,6 +67,26 @@ * allocator, evicting the oldest changes would make it more likely the * memory gets actually freed. * + * We use a max-heap with transaction size as the key to efficiently find + * the largest transaction. The max-heap state is managed in two states: + * REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP. + * + * REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP is the starting state, where we do + * not update the max-heap when updating the memory counter. We build the + * max-heap just before selecting large transactions. Therefore, in this + * state, we can update the memory counter with no additional costs but + * need O(n) time to get the largest transaction, where n is the number of + * transactions including top-level transactions and subtransactions. + * + * Once we build the max-heap, we switch to + * REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP state, where we also update + * the max-heap when updating the memory counter. The intention is to + * efficiently retrieve the largest transaction in O(1) time instead of + * incurring the cost of memory counter updates (O(log n)). We remain in + * this state as long as the number of transactions is larger than the + * threshold, REORDER_BUFFER_MEM_TRACK_THRESHOLD. Otherwise, we switch back + * to REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and reset the max-heap. + * * We still rely on max_changes_in_memory when loading serialized changes * back into memory. 
At that point we can't use the memory limit directly * as we load the subxacts independently. One option to deal with this @@ -109,6 +129,11 @@ #include "utils/rel.h" #include "utils/relfilenumbermap.h" +/* + * The threshold of the number of transactions in the max-heap (rb->txn_heap) + * to switch the state. + */ +#define REORDER_BUFFER_MEM_TRACK_THRESHOLD 1024 /* entry for a hash table we use to map from xid to our transaction state */ typedef struct ReorderBufferTXNByIdEnt @@ -296,6 +321,9 @@ static Size ReorderBufferChangeSize(ReorderBufferChange *change); static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz); +static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg); +static void ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, ReorderBufferTXN *txn, + bool addition, Size sz); /* * Allocate a new ReorderBuffer and clean out any old serialized state from @@ -357,6 +385,15 @@ ReorderBufferAllocate(void) buffer->outbufsize = 0; buffer->size = 0; + /* + * Don't start with a lower number than REORDER_BUFFER_MEM_TRACK_THRESHOLD, since + * we add at least REORDER_BUFFER_MEM_TRACK_THRESHOLD entries at once. + */ + buffer->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP; + buffer->txn_heap = binaryheap_allocate(REORDER_BUFFER_MEM_TRACK_THRESHOLD * 2, + ReorderBufferTXNSizeCompare, + true, NULL); + buffer->spillTxns = 0; buffer->spillCount = 0; buffer->spillBytes = 0; @@ -1500,6 +1537,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { bool found; dlist_mutable_iter iter; + Size mem_freed = 0; /* cleanup subtransactions & their changes */ dlist_foreach_modify(iter, &txn->subtxns) @@ -1529,9 +1567,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Check we're not mixing changes from different transactions. 
*/ Assert(change->txn == txn); - ReorderBufferReturnChange(rb, change, true); + mem_freed += ReorderBufferChangeSize(change); + ReorderBufferReturnChange(rb, change, false); } + /* Update the memory counter */ + Assert(mem_freed == txn->size); + if (mem_freed > 0) + ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed); + /* * Cleanup the tuplecids we stored for decoding catalog snapshot access. * They are always stored in the toplevel transaction. @@ -1590,6 +1634,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* deallocate */ ReorderBufferReturnTXN(rb, txn); + + /* + * Check if the number of transactions get lower than the threshold. If + * so, switch to NO_MAXHEAP state and reset the max-heap. + * + * XXX: If a new transaction is added and the memory usage reached the + * limit soon, we will end up building the max-heap again. It might be + * more efficient if we accept a certain amount of transactions to switch + * back to the NO_MAXHEAP state, say 95% of the threshold. + */ + if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP && + (binaryheap_size(rb->txn_heap) < REORDER_BUFFER_MEM_TRACK_THRESHOLD)) + { + rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP; + binaryheap_reset(rb->txn_heap); + } } /* @@ -3162,16 +3222,6 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, /* * Update memory counters to account for the new or removed change. - * - * We update two counters - in the reorder buffer, and in the transaction - * containing the change. The reorder buffer counter allows us to quickly - * decide if we reached the memory limit, the transaction counter allows - * us to quickly pick the largest transaction for eviction. - * - * When streaming is enabled, we need to update the toplevel transaction - * counters instead - we don't really care about subtransactions as we - * can't stream them individually anyway, and we only pick toplevel - * transactions for eviction. 
So only toplevel transactions matter. */ static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, @@ -3179,7 +3229,6 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, bool addition, Size sz) { ReorderBufferTXN *txn; - ReorderBufferTXN *toptxn; Assert(change->txn); @@ -3193,6 +3242,28 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, txn = change->txn; + ReorderBufferTXNMemoryUpdate(rb, txn, addition, sz); +} + +/* + * Update memory counter of the given transaction. + * + * We update two counters - in the reorder buffer, and in the transaction + * containing the change. The reorder buffer counter allows us to quickly + * decide if we reached the memory limit, the transaction counter allows + * us to quickly pick the largest transaction for eviction. + * + * When streaming is enabled, we need to update the toplevel transaction + * counters instead - we don't really care about subtransactions as we + * can't stream them individually anyway, and we only pick toplevel + * transactions for eviction. So only toplevel transactions matter. + */ +static void +ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, ReorderBufferTXN *txn, + bool addition, Size sz) +{ + ReorderBufferTXN *toptxn; + /* * Update the total size in top level as well. This is later used to * compute the decoding stats. @@ -3206,6 +3277,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, /* Update the total size in the top transaction. */ toptxn->total_size += sz; + + /* Update the max-heap as well if necessary */ + if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP) + { + if ((txn->size - sz) == 0) + binaryheap_add(rb->txn_heap, PointerGetDatum(txn)); + else + binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn)); + } } else { @@ -3215,6 +3295,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, /* Update the total size in the top transaction. 
*/ toptxn->total_size -= sz; + + /* Update the max-heap as well if necessary */ + if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP) + { + if (txn->size == 0) + binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn)); + else + binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn)); + } } Assert(txn->size <= rb->size); @@ -3472,31 +3561,45 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz) /* * Find the largest transaction (toplevel or subxact) to evict (spill to disk). - * - * XXX With many subtransactions this might be quite slow, because we'll have - * to walk through all of them. There are some options how we could improve - * that: (a) maintain some secondary structure with transactions sorted by - * amount of changes, (b) not looking for the entirely largest transaction, - * but e.g. for transaction using at least some fraction of the memory limit, - * and (c) evicting multiple transactions at once, e.g. to free a given portion - * of the memory limit (e.g. 50%). */ static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb) { - HASH_SEQ_STATUS hash_seq; - ReorderBufferTXNByIdEnt *ent; ReorderBufferTXN *largest = NULL; - hash_seq_init(&hash_seq, rb->by_txn); - while ((ent = hash_seq_search(&hash_seq)) != NULL) + /* + * Build the max-heap to pick the largest transaction if not yet. We will + * run a heap assembly step at the end, which is more efficient. 
+ */ + if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP) { - ReorderBufferTXN *txn = ent->txn; + HASH_SEQ_STATUS hash_seq; + ReorderBufferTXNByIdEnt *ent; - /* if the current transaction is larger, remember it */ - if ((!largest) || (txn->size > largest->size)) - largest = txn; + hash_seq_init(&hash_seq, rb->by_txn); + while ((ent = hash_seq_search(&hash_seq)) != NULL) + { + ReorderBufferTXN *txn = ent->txn; + + if (txn->size == 0) + continue; + + binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn)); + } + + binaryheap_build(rb->txn_heap); + + /* + * The max-heap is ready now. We remain in this state at least until + * we free up enough transactions to bring the total memory usage + * below the limit. + */ + rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP; } + else + Assert(binaryheap_size(rb->txn_heap) > 0); + + largest = (ReorderBufferTXN *) DatumGetPointer(binaryheap_first(rb->txn_heap)); Assert(largest); Assert(largest->size > 0); @@ -3638,6 +3741,18 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) Assert(txn->nentries_mem == 0); } + /* + * Check the number of transactions in max-heap after evicting large + * transactions. If the number of transactions is small, we switch back + * to the NO_MAXHEAP state, and reset the current the max-heap. + */ + if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP && + (binaryheap_size(rb->txn_heap) < REORDER_BUFFER_MEM_TRACK_THRESHOLD)) + { + rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP; + binaryheap_reset(rb->txn_heap); + } + /* We must be under the memory limit now. 
*/ Assert(rb->size < logical_decoding_work_mem * 1024L); } @@ -3654,6 +3769,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) XLogSegNo curOpenSegNo = 0; Size spilled = 0; Size size = txn->size; + Size mem_freed = 0; elog(DEBUG2, "spill %u changes in XID %u to disk", (uint32) txn->nentries_mem, txn->xid); @@ -3707,11 +3823,17 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferSerializeChange(rb, txn, fd, change); dlist_delete(&change->node); - ReorderBufferReturnChange(rb, change, true); + mem_freed += ReorderBufferChangeSize(change); + ReorderBufferReturnChange(rb, change, false); spilled++; } + /* Update the memory counter */ + Assert(mem_freed == txn->size); + if (mem_freed > 0) + ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed); + /* update the statistics iff we have spilled anything */ if (spilled) { @@ -5273,3 +5395,20 @@ restart: *cmax = ent->cmax; return true; } + +/* + * Compare between sizes of two transactions. This is for a binary heap + * comparison function. 
+ */ +static int +ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg) +{ + ReorderBufferTXN *ta = (ReorderBufferTXN *) DatumGetPointer(a); + ReorderBufferTXN *tb = (ReorderBufferTXN *) DatumGetPointer(b); + + if (ta->size < tb->size) + return -1; + if (ta->size > tb->size) + return 1; + return 0; +} diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 0b2c95f7aa0..f0d352cfcc6 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -10,6 +10,7 @@ #define REORDERBUFFER_H #include "access/htup_details.h" +#include "lib/binaryheap.h" #include "lib/ilist.h" #include "storage/sinval.h" #include "utils/hsearch.h" @@ -531,6 +532,22 @@ typedef void (*ReorderBufferUpdateProgressTxnCB) ( ReorderBufferTXN *txn, XLogRecPtr lsn); +/* State of how to track the memory usage of each transaction being decoded */ +typedef enum ReorderBufferMemTrackState +{ + /* + * We don't update max-heap while updating the memory counter. The + * max-heap is built before use. + */ + REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP, + + /* + * We also update the max-heap when updating the memory counter so + * the heap property is always preserved. + */ + REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP, +} ReorderBufferMemTrackState; + struct ReorderBuffer { /* @@ -631,6 +648,10 @@ struct ReorderBuffer /* memory accounting */ Size size; + /* Max-heap for sizes of all top-level and sub transactions */ + ReorderBufferMemTrackState memtrack_state; + binaryheap *txn_heap; + /* * Statistics about transactions spilled to disk. * -- 2.43.0