From 533c45c4cbd4545350d464bbf7a2df91ee668a75 Mon Sep 17 00:00:00 2001 From: vignesh Date: Tue, 27 Apr 2021 10:56:02 +0530 Subject: [PATCH v4] Update replication statistics after every stream/spill. Currently, replication slot statistics are updated at prepare, commit, and rollback. Now, if the transaction is interrupted the stats might not get updated. Fixed this by updating replication statistics after every stream/spill. --- src/backend/replication/logical/decode.c | 14 ++++++++------ src/backend/replication/logical/reorderbuffer.c | 6 ++++++ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 7924581cdc..888e064ec0 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -746,9 +746,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, } /* - * Update the decoding stats at transaction prepare/commit/abort. It is - * not clear that sending more or less frequently than this would be - * better. + * Update the decoding stats at transaction prepare/commit/abort. + * Additionally we send the stats when we spill or stream the changes to + * avoid losing them in case the decoding is interrupted. It is not clear + * that sending more or less frequently than this would be better. */ UpdateDecodingStats(ctx); } @@ -828,9 +829,10 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid); /* - * Update the decoding stats at transaction prepare/commit/abort. It is - * not clear that sending more or less frequently than this would be - * better. + * Update the decoding stats at transaction prepare/commit/abort. + * Additionally we send the stats when we spill or stream the changes to + * avoid losing them in case the decoding is interrupted. It is not clear + * that sending more or less frequently than this would be better. */ UpdateDecodingStats(ctx); } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c27f710053..ceb83bcbf9 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -3551,6 +3551,9 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* don't consider already serialized transactions */ rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; + + /* update the decoding stats */ + UpdateDecodingStats(rb->private_data); } Assert(spilled == txn->nentries_mem); @@ -3920,6 +3923,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Don't consider already streamed transaction. */ rb->streamTxns += (txn_is_streamed) ? 0 : 1; + /* update the decoding stats */ + UpdateDecodingStats(rb->private_data); + Assert(dlist_is_empty(&txn->changes)); Assert(txn->nentries == 0); Assert(txn->nentries_mem == 0); -- 2.25.1