diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 00543ede45..f32a2da565 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -51,6 +51,12 @@ typedef struct LogicalErrorCallbackState XLogRecPtr report_location; } LogicalErrorCallbackState; +/* + * Pointing to the currently-used logical decoding context andu sed to sent + * slot statistics on releasing slots. + */ +static LogicalDecodingContext *MyLogicalDecodingContext = NULL; + /* wrappers around output plugin callbacks */ static void output_plugin_error_callback(void *arg); static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -290,6 +296,13 @@ StartupDecodingContext(List *output_plugin_options, MemoryContextSwitchTo(old_context); + /* + * Keep holding the decoding context until freeing the decoding context + * or releasing the logical slot. + */ + Assert(MyLogicalDecodingContext == NULL); + MyLogicalDecodingContext = ctx; + return ctx; } @@ -626,10 +639,12 @@ FreeDecodingContext(LogicalDecodingContext *ctx) if (ctx->callbacks.shutdown_cb != NULL) shutdown_cb_wrapper(ctx); + UpdateDecodingStats(ctx); ReorderBufferFree(ctx->reorder); FreeSnapshotBuilder(ctx->snapshot_builder); XLogReaderFree(ctx->reader); MemoryContextDelete(ctx->context); + MyLogicalDecodingContext = NULL; } /* @@ -1811,3 +1826,17 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->totalTxns = 0; rb->totalBytes = 0; } + +/* + * The function called at releasing a logical replication slot, sending the + * remaining slot statistics. + */ +void +DecodingContextAtSlotRelease(void) +{ + if (MyLogicalDecodingContext) + { + UpdateDecodingStats(MyLogicalDecodingContext); + MyLogicalDecodingContext = NULL; + } +} diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index cf261e200e..4e9b45c84b 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -45,6 +45,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "replication/slot.h" +#include "replication/logical.h" #include "storage/fd.h" #include "storage/proc.h" #include "storage/procarray.h" @@ -496,6 +497,9 @@ ReplicationSlotRelease(void) Assert(slot != NULL && slot->active_pid != 0); + if (SlotIsLogical(slot)) + DecodingContextAtSlotRelease(); + if (slot->data.persistency == RS_EPHEMERAL) { /* diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 7dfcb7be18..c156045020 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -135,5 +135,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); extern void ResetLogicalStreamingState(void); extern void UpdateDecodingStats(LogicalDecodingContext *ctx); +extern void DecodingContextAtSlotRelease(void); #endif