From fdfe91482d7dd28920db67067d77388ef3871165 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Wed, 16 Mar 2016 15:12:34 +0800 Subject: [PATCH 2/2] Dirty replication slots when confirm_lsn is changed --- src/backend/replication/logical/logical.c | 62 +++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2e6d3f9..40db6ff 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -437,6 +437,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) } ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr; + ReplicationSlotMarkDirty(); } /* @@ -847,10 +848,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) { bool updated_xmin = false; bool updated_restart = false; + bool updated_confirm = false; SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.confirmed_flush = lsn; + if (MyReplicationSlot->data.confirmed_flush != lsn) + { + MyReplicationSlot->data.confirmed_flush = lsn; + updated_confirm = true; + } /* if were past the location required for bumping xmin, do so */ if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr && @@ -888,34 +894,50 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); - /* first write new xmin to disk, so we know whats up after a crash */ - if (updated_xmin || updated_restart) + if (updated_xmin || updated_restart || updated_confirm) { ReplicationSlotMarkDirty(); - ReplicationSlotSave(); - elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart); - } - /* - * Now the new xmin is safely on disk, we can let the global value - * advance. We do not take ProcArrayLock or similar since we only - * advance xmin here and there's not much harm done by a concurrent - * computation missing that. - */ - if (updated_xmin) - { - SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin; - SpinLockRelease(&MyReplicationSlot->mutex); + /* + * first write new xmin to disk, so we know whats up + * after a crash. + */ + if (updated_xmin || updated_restart) + { + ReplicationSlotSave(); + elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart); + } - ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + /* + * Now the new xmin is safely on disk, we can let the global value + * advance. We do not take ProcArrayLock or similar since we only + * advance xmin here and there's not much harm done by a concurrent + * computation missing that. + */ + if (updated_xmin) + { + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + } } } else { + bool dirtied = false; + SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.confirmed_flush = lsn; + if (MyReplicationSlot->data.confirmed_flush != lsn) + { + MyReplicationSlot->data.confirmed_flush = lsn; + dirtied = true; + } SpinLockRelease(&MyReplicationSlot->mutex); + + if (dirtied) + ReplicationSlotMarkDirty(); } } -- 2.1.0