From 10943afd0d57b4dbd97f1ebe896fa7459541c024 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Tue, 3 Mar 2020 09:40:10 +0530 Subject: [PATCH v2] Fastpath for sending changes to output plugin in logical decoding In logical decoding before sending the changes to the output plugin, if there is only a one transaction then no need to build the binary heap becasue for one transaction they are already in the LSN order. --- src/backend/replication/logical/reorderbuffer.c | 61 +++++++++++++++++-------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 481277a..c6f1f89 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1037,11 +1037,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, nr_txns++; } - /* - * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no - * need to allocate/build a heap then. - */ - /* allocate iteration state */ state = (ReorderBufferIterTXNState *) MemoryContextAllocZero(rb->context, @@ -1057,10 +1052,11 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].segno = 0; } - /* allocate heap */ - state->heap = binaryheap_allocate(state->nr_txns, - ReorderBufferIterCompare, - state); + /* allocate heap, if we have more than one transaction. */ + if (nr_txns > 1) + state->heap = binaryheap_allocate(state->nr_txns, + ReorderBufferIterCompare, + state); /* Now that the state fields are initialized, it is safe to return it. */ *iter_state = state; @@ -1092,7 +1088,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].change = cur_change; state->entries[off].txn = txn; - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + /* add to heap, only if we have more than one transaction. */ + if (nr_txns > 1) + binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); } /* add subtransactions if they contain changes */ @@ -1121,12 +1119,15 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].change = cur_change; state->entries[off].txn = cur_txn; - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + /* add to heap, only if we have more than one transaction. */ + if (nr_txns > 1) + binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); } } /* assemble a valid binary heap */ - binaryheap_build(state->heap); + if (nr_txns > 1) + binaryheap_build(state->heap); } /* @@ -1142,11 +1143,24 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) ReorderBufferIterTXNEntry *entry; int32 off; - /* nothing there anymore */ - if (state->heap->bh_size == 0) - return NULL; + /* + * If there is only one transaction then it will be at the offset 0. + * Otherwise get the offset from the binary heap. + */ + if (state->nr_txns == 1) + { + off = 0; + if (state->entries[off].change == NULL) + return NULL; + } + else + { + /* nothing there anymore */ + if (state->heap->bh_size == 0) + return NULL; - off = DatumGetInt32(binaryheap_first(state->heap)); + off = DatumGetInt32(binaryheap_first(state->heap)); + } entry = &state->entries[off]; /* free memory we might have "leaked" in the previous *Next call */ @@ -1176,7 +1190,8 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) state->entries[off].lsn = next_change->lsn; state->entries[off].change = next_change; - binaryheap_replace_first(state->heap, Int32GetDatum(off)); + if (state->nr_txns > 1) + binaryheap_replace_first(state->heap, Int32GetDatum(off)); return change; } @@ -1206,14 +1221,19 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) /* txn stays the same */ state->entries[off].lsn = next_change->lsn; state->entries[off].change = next_change; - binaryheap_replace_first(state->heap, Int32GetDatum(off)); + + if (state->nr_txns > 1) + binaryheap_replace_first(state->heap, Int32GetDatum(off)); return change; } } /* ok, no changes there anymore, remove */ - binaryheap_remove_first(state->heap); + if (state->nr_txns > 1) + binaryheap_remove_first(state->heap); + else + entry->change = NULL; return change; } @@ -1244,7 +1264,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, Assert(dlist_is_empty(&state->old_change)); } - binaryheap_free(state->heap); + if (state->nr_txns > 1) + binaryheap_free(state->heap); pfree(state); } -- 1.8.3.1