From ee9e1dc7472c8ff30c31357290114e80c6aa4e8d Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Fri, 23 Apr 2021 14:23:28 +0530 Subject: [PATCH v2] Update decoding stats during replication slot release. Currently, replication slot statistics are updated at prepare, commit, and rollback. If decoding of a transaction is interrupted, the statistics accumulated so far might not get updated. Fix this by also updating the statistics during replication slot release. To make them accessible in ReplicationSlotRelease, move the statistics-related variables from the ReorderBuffer structure to the ReplicationSlot structure. Reported-by: Andres Freund Author: Vignesh C Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de --- src/backend/replication/logical/decode.c | 12 ++-- src/backend/replication/logical/logical.c | 55 +++++++++---------- .../replication/logical/reorderbuffer.c | 39 +++++++------ src/backend/replication/slot.c | 26 +++++++++ src/include/replication/logical.h | 2 +- src/include/replication/reorderbuffer.h | 23 -------- src/include/replication/slot.h | 24 ++++++++ 7 files changed, 104 insertions(+), 77 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 7924581cdcd..312fab07985 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -748,9 +748,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, /* * Update the decoding stats at transaction prepare/commit/abort. It is * not clear that sending more or less frequently than this would be - * better. + * better. We also send the stats in ReplicationSlotRelease to avoid losing + * them in case the decoding is interrupted. */ - UpdateDecodingStats(ctx); + UpdateDecodingStats(); } /* @@ -830,9 +831,10 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, /* * Update the decoding stats at transaction prepare/commit/abort. 
It is * not clear that sending more or less frequently than this would be - * better. + * better. We also send the stats in ReplicationSlotRelease to avoid losing + * them in case the decoding is interrupted. */ - UpdateDecodingStats(ctx); + UpdateDecodingStats(); } @@ -889,7 +891,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, } /* update the decoding stats */ - UpdateDecodingStats(ctx); + UpdateDecodingStats(); } /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 35b0c676412..156a9780f20 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1770,44 +1770,39 @@ ResetLogicalStreamingState(void) * Report stats for a slot. */ void -UpdateDecodingStats(LogicalDecodingContext *ctx) +UpdateDecodingStats(void) { - ReorderBuffer *rb = ctx->reorder; PgStat_ReplSlotStats repSlotStat; + ReplicationSlot *slot = MyReplicationSlot; + + Assert(MyReplicationSlot != NULL); /* Nothing to do if we don't have any replication stats to be sent. 
*/ - if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0) + if (slot->spillBytes <= 0 && slot->streamBytes <= 0 && slot->totalBytes <= 0) return; elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld", - rb, - (long long) rb->spillTxns, - (long long) rb->spillCount, - (long long) rb->spillBytes, - (long long) rb->streamTxns, - (long long) rb->streamCount, - (long long) rb->streamBytes, - (long long) rb->totalTxns, - (long long) rb->totalBytes); - - namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name)); - repSlotStat.spill_txns = rb->spillTxns; - repSlotStat.spill_count = rb->spillCount; - repSlotStat.spill_bytes = rb->spillBytes; - repSlotStat.stream_txns = rb->streamTxns; - repSlotStat.stream_count = rb->streamCount; - repSlotStat.stream_bytes = rb->streamBytes; - repSlotStat.total_txns = rb->totalTxns; - repSlotStat.total_bytes = rb->totalBytes; + slot, + (long long) slot->spillTxns, + (long long) slot->spillCount, + (long long) slot->spillBytes, + (long long) slot->streamTxns, + (long long) slot->streamCount, + (long long) slot->streamBytes, + (long long) slot->totalTxns, + (long long) slot->totalBytes); + + namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name)); + repSlotStat.spill_txns = slot->spillTxns; + repSlotStat.spill_count = slot->spillCount; + repSlotStat.spill_bytes = slot->spillBytes; + repSlotStat.stream_txns = slot->streamTxns; + repSlotStat.stream_count = slot->streamCount; + repSlotStat.stream_bytes = slot->streamBytes; + repSlotStat.total_txns = slot->totalTxns; + repSlotStat.total_bytes = slot->totalBytes; pgstat_report_replslot(&repSlotStat); - rb->spillTxns = 0; - rb->spillCount = 0; - rb->spillBytes = 0; - rb->streamTxns = 0; - rb->streamCount = 0; - rb->streamBytes = 0; - rb->totalTxns = 0; - rb->totalBytes = 0; + InitializeSlotStats(slot); } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 
5cb484f0323..700ccf542c9 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -344,15 +344,6 @@ ReorderBufferAllocate(void) buffer->outbufsize = 0; buffer->size = 0; - buffer->spillTxns = 0; - buffer->spillCount = 0; - buffer->spillBytes = 0; - buffer->streamTxns = 0; - buffer->streamCount = 0; - buffer->streamBytes = 0; - buffer->totalTxns = 0; - buffer->totalBytes = 0; - buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; dlist_init(&buffer->toplevel_by_lsn); @@ -1316,6 +1307,9 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) ReorderBufferChange *change; ReorderBufferIterTXNEntry *entry; int32 off; + ReplicationSlot *slot = MyReplicationSlot; + + Assert(MyReplicationSlot != NULL); /* nothing there anymore */ if (state->heap->bh_size == 0) @@ -1369,7 +1363,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) * Update the total bytes processed before releasing the current set * of changes and restoring the new set of changes. */ - rb->totalBytes += rb->size; + slot->totalBytes += rb->size; if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, &state->entries[off].segno)) { @@ -2018,6 +2012,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *volatile specinsert = NULL; volatile bool stream_started = false; ReorderBufferTXN *volatile curtxn = NULL; + ReplicationSlot *slot = MyReplicationSlot; + + Assert(MyReplicationSlot != NULL); /* build data to be able to lookup the CommandIds of catalog tuples */ ReorderBufferBuildTupleCidHash(rb, txn); @@ -2380,9 +2377,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * which we have already accounted in ReorderBufferIterTXNNext. 
*/ if (!rbtxn_is_streamed(txn)) - rb->totalTxns++; + slot->totalTxns++; - rb->totalBytes += rb->size; + slot->totalBytes += rb->size; /* * Done with current changes, send the last message for this set of @@ -3482,6 +3479,9 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) XLogSegNo curOpenSegNo = 0; Size spilled = 0; Size size = txn->size; + ReplicationSlot *slot = MyReplicationSlot; + + Assert(MyReplicationSlot != NULL); elog(DEBUG2, "spill %u changes in XID %u to disk", (uint32) txn->nentries_mem, txn->xid); @@ -3543,11 +3543,11 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* update the statistics iff we have spilled anything */ if (spilled) { - rb->spillCount += 1; - rb->spillBytes += size; + slot->spillCount += 1; + slot->spillBytes += size; /* don't consider already serialized transactions */ - rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; + slot->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; } Assert(spilled == txn->nentries_mem); @@ -3818,6 +3818,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) CommandId command_id; Size stream_bytes; bool txn_is_streamed; + ReplicationSlot *slot = MyReplicationSlot; + + Assert(MyReplicationSlot != NULL); /* We can never reach here for a subtransaction. */ Assert(txn->toptxn == NULL); @@ -3911,11 +3914,11 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, command_id, true); - rb->streamCount += 1; - rb->streamBytes += stream_bytes; + slot->streamCount += 1; + slot->streamBytes += stream_bytes; /* Don't consider already streamed transaction. */ - rb->streamTxns += (txn_is_streamed) ? 0 : 1; + slot->streamTxns += (txn_is_streamed) ? 
0 : 1; Assert(dlist_is_empty(&txn->changes)); Assert(txn->nentries == 0); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index f61b163f78d..6fde2bc8676 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -44,6 +44,7 @@ #include "common/string.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/logical.h" #include "replication/slot.h" #include "storage/fd.h" #include "storage/proc.h" @@ -295,6 +296,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->candidate_xmin_lsn = InvalidXLogRecPtr; slot->candidate_restart_valid = InvalidXLogRecPtr; slot->candidate_restart_lsn = InvalidXLogRecPtr; + InitializeSlotStats(slot); /* * Create the slot on disk. We haven't actually marked the slot allocated @@ -500,6 +502,15 @@ ReplicationSlotRelease(void) Assert(slot != NULL && slot->active_pid != 0); + if (SlotIsLogical(slot)) + { + /* + * Update the decoding stats. This avoids losing them if the decoding + * is interrupted. 
+ */ + UpdateDecodingStats(); + } + if (slot->data.persistency == RS_EPHEMERAL) { /* @@ -1782,6 +1793,7 @@ RestoreSlotFromDisk(const char *name) slot->candidate_xmin_lsn = InvalidXLogRecPtr; slot->candidate_restart_lsn = InvalidXLogRecPtr; slot->candidate_restart_valid = InvalidXLogRecPtr; + InitializeSlotStats(slot); slot->in_use = true; slot->active_pid = 0; @@ -1795,3 +1807,17 @@ RestoreSlotFromDisk(const char *name) (errmsg("too many replication slots active before shutdown"), errhint("Increase max_replication_slots and try again."))); } + +/* Reset all of the slot's spill, stream and total decoding counters to zero. */ +void +InitializeSlotStats(ReplicationSlot *slot) +{ + slot->spillTxns = 0; + slot->spillCount = 0; + slot->spillBytes = 0; + slot->streamTxns = 0; + slot->streamCount = 0; + slot->streamBytes = 0; + slot->totalTxns = 0; + slot->totalBytes = 0; +} diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 7dfcb7be187..44baec63114 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -134,6 +134,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); extern void ResetLogicalStreamingState(void); -extern void UpdateDecodingStats(LogicalDecodingContext *ctx); +extern void UpdateDecodingStats(void); #endif diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index bfab8303ee7..c8295fc9edf 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -602,29 +602,6 @@ struct ReorderBuffer /* memory accounting */ Size size; - - /* - * Statistics about transactions spilled to disk. - * - * A single transaction may be spilled repeatedly, which is why we keep - * two different counters. For spilling, the transaction counter includes - * both toplevel transactions and subtransactions. 
- */ - int64 spillTxns; /* number of transactions spilled to disk */ - int64 spillCount; /* spill-to-disk invocation counter */ - int64 spillBytes; /* amount of data spilled to disk */ - - /* Statistics about transactions streamed to the decoding output plugin */ - int64 streamTxns; /* number of transactions streamed */ - int64 streamCount; /* streaming invocation counter */ - int64 streamBytes; /* amount of data streamed */ - - /* - * Statistics about all the transactions sent to the decoding output - * plugin - */ - int64 totalTxns; /* total number of transactions sent */ - int64 totalBytes; /* total amount of data sent */ }; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 1ad5e6c50df..267eed60aef 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -172,6 +172,29 @@ typedef struct ReplicationSlot XLogRecPtr candidate_xmin_lsn; XLogRecPtr candidate_restart_valid; XLogRecPtr candidate_restart_lsn; + + /* + * Statistics about transactions spilled to disk. + * + * A single transaction may be spilled repeatedly, which is why we keep + * two different counters. For spilling, the transaction counter includes + * both toplevel transactions and subtransactions. 
+ */ + int64 spillTxns; /* number of transactions spilled to disk */ + int64 spillCount; /* spill-to-disk invocation counter */ + int64 spillBytes; /* amount of data spilled to disk */ + + /* Statistics about transactions streamed to the decoding output plugin */ + int64 streamTxns; /* number of transactions streamed */ + int64 streamCount; /* streaming invocation counter */ + int64 streamBytes; /* amount of data streamed */ + + /* + * Statistics about all the transactions sent to the decoding output + * plugin + */ + int64 totalTxns; /* total number of transactions sent */ + int64 totalBytes; /* total amount of data sent */ } ReplicationSlot; #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid) @@ -231,5 +254,6 @@ extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); +extern void InitializeSlotStats(ReplicationSlot *slot); #endif /* SLOT_H */ -- 2.28.0.windows.1