diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out index 72fbb270334..13d2be24e18 100644 --- a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -165,10 +165,10 @@ SELECT pg_stat_force_next_flush(); (1 row) -SELECT slot_name, spill_txns, spill_count, mem_exceeded_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase'; +SELECT slot_name, spill_txns, spill_count, mem_exceeded_count > 0 as mem_exceeded_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase'; slot_name | spill_txns | spill_count | mem_exceeded_count ---------------------------------+------------+-------------+-------------------- - regression_slot_stats4_twophase | 0 | 0 | 1 + regression_slot_stats4_twophase | 0 | 0 | t (1 row) DROP TABLE stats_test; diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index 9964a8efb87..b9aae0c7b63 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -65,7 +65,7 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophas -- Verify that the decoding doesn't spill already-aborted transaction's changes. SELECT pg_stat_force_next_flush(); -SELECT slot_name, spill_txns, spill_count, mem_exceeded_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase'; +SELECT slot_name, spill_txns, spill_count, mem_exceeded_count > 0 as mem_exceeded_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase'; DROP TABLE stats_test; SELECT pg_drop_replication_slot('regression_slot_stats1'), diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 6e72864804e..094b928cf35 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -3899,18 +3899,26 @@ static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { ReorderBufferTXN *txn; - bool memory_limit_reached = (rb->size >= logical_decoding_work_mem * (Size) 1024); + bool update_stats = true; - if (memory_limit_reached) + if (rb->size >= logical_decoding_work_mem * (Size) 1024) + { + /* + * Update the statistics as the memory usage has reached the limit. We + * report the statistics update later in this function since we can + * update the slot statistics altogether while streaming or + * serializing transactions in most cases. + */ rb->memExceededCount += 1; - - /* - * Bail out if debug_logical_replication_streaming is buffered and we - * haven't exceeded the memory limit. - */ - if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED && - !memory_limit_reached) + } + else if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED) + { + /* + * Bail out if debug_logical_replication_streaming is buffered and we + * haven't exceeded the memory limit. + */ return; + } /* * If debug_logical_replication_streaming is immediate, loop until there's @@ -3970,8 +3978,14 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) */ Assert(txn->size == 0); Assert(txn->nentries_mem == 0); + + /* We've reported memExceededCount update */ + update_stats = false; } + if (update_stats) + UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); + /* We must be under the memory limit now. */ Assert(rb->size < logical_decoding_work_mem * (Size) 1024); }