From 67a44702ff146756b33e8d15e91a02f5d9e86792 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Fri, 24 Feb 2017 21:39:03 +0100 Subject: [PATCH 1/4] Reserve global xmin for create slot snasphot export Otherwise the VACUUM or pruning might remove tuples still needed by the exported snapshot. --- src/backend/replication/logical/logical.c | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5529ac8..9062244 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -267,12 +267,18 @@ CreateInitDecodingContext(char *plugin, * the slot machinery about the new limit. Once that's done the * ProcArrayLock can be released as the slot machinery now is * protecting against vacuum. + * + * Note that we only store the global xmin temporarily so that the initial + * snapshot can be exported. After initial snapshot is done global xmin + * should be reset and not tracked anymore. * ---- */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId(); slot->data.catalog_xmin = slot->effective_catalog_xmin; + slot->effective_xmin = slot->effective_catalog_xmin; + slot->data.xmin = slot->effective_catalog_xmin; ReplicationSlotsComputeRequiredXmin(true); @@ -282,7 +288,7 @@ CreateInitDecodingContext(char *plugin, * tell the snapshot builder to only assemble snapshot once reaching the * running_xact's record with the respective xmin. */ - xmin_horizon = slot->data.catalog_xmin; + xmin_horizon = slot->effective_xmin; ReplicationSlotMarkDirty(); ReplicationSlotSave(); @@ -456,12 +462,29 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) void FreeDecodingContext(LogicalDecodingContext *ctx) { + ReplicationSlot *slot = MyReplicationSlot; + if (ctx->callbacks.shutdown_cb != NULL) shutdown_cb_wrapper(ctx); - ReorderBufferFree(ctx->reorder); - FreeSnapshotBuilder(ctx->snapshot_builder); - XLogReaderFree(ctx->reader); + /* + * Cleanup global xmin for the slot that we may have set in + * CreateInitDecodingContext(). We do not take ProcArrayLock or similar + * since we only reset xmin here and there's not much harm done by a + * concurrent computation missing that. + */ + SpinLockAcquire(&slot->mutex); + slot->effective_xmin = InvalidTransactionId; + slot->data.xmin = InvalidTransactionId; + SpinLockRelease(&slot->mutex); + ReplicationSlotsComputeRequiredXmin(false); + + if (ctx->reorder) + ReorderBufferFree(ctx->reorder); + if (ctx->snapshot_builder) + FreeSnapshotBuilder(ctx->snapshot_builder); + if (ctx->reader) + XLogReaderFree(ctx->reader); MemoryContextDelete(ctx->context); } -- 2.7.4