From 9036702eb645acaf3ec660d511c62b09b816f73e Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Mon, 3 Apr 2017 17:31:19 +0800 Subject: [PATCH 2/3] Support conflict with standby on logical walsender Detect and resolve conflicts between walsenders or SQL-level logical decoding sessions and catalog_xmin advances. Refuse to start decoding from a logical slot whose catalog_xmin is below the cluster-wide known-safe threshold so new sessions cannot start. Slots are not persistently marked as invalid and will continue to hold down xlog and (on master) catalog retention. There is no way to restore them to working order, so the application or administrator must drop them to release resources. --- src/backend/access/heap/heapam.c | 2 +- src/backend/access/transam/xact.c | 6 +- src/backend/access/transam/xlog.c | 3 +- src/backend/replication/logical/logical.c | 135 ++++++++++++++++++++++++++++++ src/backend/replication/slot.c | 4 +- src/backend/replication/walsender.c | 14 +--- src/backend/storage/ipc/procarray.c | 51 +++++++++++ src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 7 +- src/backend/tcop/postgres.c | 43 +++++++--- src/include/storage/procarray.h | 2 + src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 3 + 13 files changed, 241 insertions(+), 33 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 0c3e2b0..93bf143 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -7273,7 +7273,7 @@ heap_tuple_needs_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid, * ratchet forwards latestRemovedXid to the greatest one found. * This is used as the basis for generating Hot Standby conflicts, so * if a tuple was never visible then removing it should not conflict - * with queries. + * with queries or logical decoding output plugin callbacks. */ void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple, diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 63453d7..48ca884 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5662,14 +5662,10 @@ xact_redo(XLogReaderState *record) * notice when we signal them with a recovery conflict. There's no * effect on the catalogs themselves yet, so it's safe for backends * with older catalog_xmins to still exist. - * - * We don't have to take ProcArrayLock since only the startup process - * is allowed to change oldestCatalogXmin when we're in recovery. - * - * Existing sessions are not notified and must check the safe xmin. */ SetOldestCatalogXmin(xlrec->new_catalog_xmin); + ResolveRecoveryConflictWithLogicalDecoding(xlrec->new_catalog_xmin); } else elog(PANIC, "xact_redo: unknown op code %u", info); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8d713e9..a98601a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -8546,7 +8546,8 @@ CreateCheckPoint(int flags) InitXLogInsert(); /* Checkpoints are a handy time to update the effective catalog_xmin */ - UpdateOldestCatalogXmin(); + if (XLogInsertAllowed()) + UpdateOldestCatalogXmin(); /* * Acquire CheckpointLock to ensure only one checkpoint happens at a time. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5529ac8..4a15d55 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -29,6 +29,7 @@ #include "postgres.h" #include "miscadmin.h" +#include "pgstat.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -38,11 +39,14 @@ #include "replication/reorderbuffer.h" #include "replication/origin.h" #include "replication/snapbuild.h" +#include "replication/walreceiver.h" +#include "storage/ipc.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/memutils.h" +#include "utils/ps_status.h" /* data for errcontext callback */ typedef struct LogicalErrorCallbackState @@ -68,6 +72,8 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); +static void EnsureActiveLogicalSlotValid(void); + /* * Make sure the current settings & environment are capable of doing logical * decoding. @@ -279,6 +285,16 @@ CreateInitDecodingContext(char *plugin, LWLockRelease(ProcArrayLock); /* + * If this is the first slot created on the master we won't have a + * persistent record of the oldest safe xid for historic snapshots yet. + * Force one to be recorded so that when we go to replay from this slot we + * know it's safe. + */ + if (!RecoveryInProgress() && + !TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin)) + UpdateOldestCatalogXmin(); + + /* * tell the snapshot builder to only assemble snapshot once reaching the * running_xact's record with the respective xmin. */ @@ -376,6 +392,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, start_lsn = slot->data.confirmed_flush; } + EnsureActiveLogicalSlotValid(); + ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, read_page, prepare_write, do_write); @@ -963,3 +981,120 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); } } + +/* + * Test to see if the active logical slot is usable. + */ +static void +EnsureActiveLogicalSlotValid(void) +{ + TransactionId shmem_catalog_xmin; + + Assert(MyReplicationSlot != NULL); + + /* + * A logical slot can become unusable if we're doing logical decoding on a + * standby or using a slot created before we were promoted from standby + * to master. If the master advanced its global catalog_xmin past the + * threshold we need it could've removed catalog tuple versions that + * we'll require to start decoding at our restart_lsn. + */ + + LWLockAcquire(ProcArrayLock, LW_SHARED); + shmem_catalog_xmin = ShmemVariableCache->oldestCatalogXmin; + LWLockRelease(ProcArrayLock); + + if (!TransactionIdIsValid(shmem_catalog_xmin) || + TransactionIdFollows(shmem_catalog_xmin, MyReplicationSlot->data.catalog_xmin)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot '%s' requires catalogs removed by master", + NameStr(MyReplicationSlot->data.name)), + errdetail("need catalog_xmin %u, have oldestCatalogXmin %u", + MyReplicationSlot->data.catalog_xmin, shmem_catalog_xmin))); +} + +/* + * Scan to see if any clients are using replication slots that are below a + * newly-applied new catalog_xmin theshold and signal them to terminate with a + * recovery conflict. + */ +void +ResolveRecoveryConflictWithLogicalDecoding(TransactionId new_catalog_xmin) +{ + int i; + + if (!InHotStandby) + /* nobody can be actively using logical slots */ + return; + + /* Already applied new limit, can't have replayed later one yet */ + Assert(ShmemVariableCache->oldestCatalogXmin == new_catalog_xmin); + + /* + * Find the first conflicting active slot and signal its owning backend + * to exit. We'll be called repeatedly by the recovery code until there + * are no more conflicts. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *slot; + pid_t active_pid; + + slot = &ReplicationSlotCtl->replication_slots[i]; + + /* + * Physical slots can have a catalog_xmin, but conflicts are the + * problem of the leaf replica with the logical slot. + */ + if (!(slot->in_use && SlotIsLogical(slot))) + continue; + + /* + * We only care about the effective_catalog_xmin of active logical + * slots. Anything else gets checked when a new decoding session tries + * to start. + */ + while (slot->in_use && slot->active_pid != 0 && + TransactionIdIsValid(slot->effective_catalog_xmin) && + (!TransactionIdIsValid(new_catalog_xmin) || + TransactionIdPrecedes(slot->effective_catalog_xmin, new_catalog_xmin))) + { + /* + * We'll be sleeping, so release the control lock. New conflicting + * backends cannot appear and if old ones go away that's what we + * want, so release and re-acquire is OK here. + */ + active_pid = slot->active_pid; + LWLockRelease(ReplicationSlotControlLock); + + if (WaitExceedsMaxStandbyDelay()) + { + ereport(INFO, + (errmsg("terminating logical decoding session due to recovery conflict"), + errdetail("Pid %u requires catalog_xmin %u for replication slot '%s' but the master has removed catalogs up to xid %u.", + active_pid, slot->effective_catalog_xmin, + NameStr(slot->data.name), new_catalog_xmin))); + + /* + * Signal the proc. If the slot is already released or even if + * pid is re-used we don't care, backends are required to + * tolerate spurious recovery signals. + */ + CancelLogicalDecodingSessionWithRecoveryConflict(active_pid); + + /* Don't flood the system with signals */ + pg_usleep(10000); + } + + /* + * We need to re-acquire the lock before re-checking the slot or + * continuing the scan. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + } + + } + LWLockRelease(ReplicationSlotControlLock); +} diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 6c5ec7a..57a3994 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -48,6 +48,7 @@ #include "storage/fd.h" #include "storage/proc.h" #include "storage/procarray.h" +#include "storage/standby.h" #include "utils/builtins.h" /* @@ -931,7 +932,8 @@ ReplicationSlotReserveWal(void) /* * For logical slots log a standby snapshot and start logical decoding * at exactly that position. That allows the slot to start up more - * quickly. + * quickly. We can't do that on a standby; there we must wait for the + * bgwriter to get around to logging its periodic standby snapshot. * * That's not needed (or indeed helpful) for physical slots as they'll * start replay at the last logged checkpoint anyway. Instead return diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e64054b..5d60e7a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -212,7 +212,6 @@ static struct /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); -static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ @@ -2863,17 +2862,6 @@ WalSndSigHupHandler(SIGNAL_ARGS) errno = save_errno; } -/* SIGUSR1: set flag to send WAL records */ -static void -WalSndXLogSendHandler(SIGNAL_ARGS) -{ - int save_errno = errno; - - latch_sigusr1_handler(); - - errno = save_errno; -} - /* SIGUSR2: set flag to do a last cycle and shut down afterwards */ static void WalSndLastCycleHandler(SIGNAL_ARGS) @@ -2907,7 +2895,7 @@ WalSndSignals(void) pqsignal(SIGQUIT, quickdie); /* hard crash time */ InitializeTimeouts(); /* establishes SIGALRM handler */ pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */ + pqsignal(SIGUSR1, procsignal_sigusr1_handler); pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and * shutdown */ diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 9e98af8..05e3058 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2762,6 +2762,57 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode) } /* + * Notify a logical decoding session that it conflicts with newly set + * catalog_xmin from the master. We're about to start replaying WAL + * that will make its historic snapshot potentially unsafe by removing + * system tuples it might need. + */ +void +CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid) +{ + ProcArrayStruct *arrayP = procArray; + int index; + BackendId backend_id = InvalidBackendId; + + /* + * We have to scan ProcArray to find the process and set a pending recovery + * conflict even though we know the pid. At least we can get the BackendId + * and avoid a ProcSignal scan by SendProcSignal. + * + * The pid might've gone away, in which case we got the desired + * outcome anyway. + */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (index = 0; index < arrayP->numProcs; index++) + { + int pgprocno = arrayP->pgprocnos[index]; + volatile PGPROC *proc = &allProcs[pgprocno]; + + if (proc->pid == session_pid) + { + VirtualTransactionId procvxid; + + GET_VXID_FROM_PGPROC(procvxid, *proc); + + proc->recoveryConflictPending = true; + backend_id = procvxid.backendId; + break; + } + } + + LWLockRelease(ProcArrayLock); + + /* + * Kill the pid if it's still here. If not, that's what we + * wanted so ignore any errors. + */ + if (backend_id != InvalidBackendId) + (void) SendProcSignal(session_pid, + PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, backend_id); +} + +/* * MinimumActiveBackends --- count backends (other than myself) that are * in active transactions. Return true if the count exceeds the * minimum threshold passed. This is used as a heuristic to decide if diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 4a21d55..16c2e1f 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -273,6 +273,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN)) + RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_TABLESPACE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_TABLESPACE); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 8e57f93..f6106ca 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -29,6 +29,7 @@ #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/standby.h" +#include "replication/slot.h" #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/timestamp.h" @@ -152,11 +153,13 @@ GetStandbyLimitTime(void) static int standbyWait_us = STANDBY_INITIAL_WAIT_US; /* - * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs. + * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs and + * ResolveRecoveryConflictWithLogicalDecoding. + * * We wait here for a while then return. If we decide we can't wait any * more then we return true, if we can wait some more return false. */ -static bool +bool WaitExceedsMaxStandbyDelay(void) { TimestampTz ltime; diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index a2282058..530dcbe 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2276,6 +2276,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; + case PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN: + errdetail("Logical replication slot requires catalog rows that will be removed."); + break; case PROCSIG_RECOVERY_CONFLICT_DATABASE: errdetail("User was connected to a database that must be dropped."); break; @@ -2698,8 +2701,12 @@ SigHupHandler(SIGNAL_ARGS) /* * RecoveryConflictInterrupt: out-of-line portion of recovery conflict * handling following receipt of SIGUSR1. Designed to be similar to die() - * and StatementCancelHandler(). Called only by a normal user backend - * that begins a transaction during recovery. + * and StatementCancelHandler(). + * + * Called by normal user backends running during recovery. Also used by the + * walsender to handle recovery conflicts with logical decoding, and by + * background workers that call CHECK_FOR_INTERRUPTS() and respect recovery + * conflicts. */ void RecoveryConflictInterrupt(ProcSignalReason reason) @@ -2781,6 +2788,7 @@ RecoveryConflictInterrupt(ProcSignalReason reason) /* Intentional drop through to session cancel */ + case PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN: case PROCSIG_RECOVERY_CONFLICT_DATABASE: RecoveryConflictPending = true; ProcDiePending = true; @@ -2795,12 +2803,18 @@ RecoveryConflictInterrupt(ProcSignalReason reason) Assert(RecoveryConflictPending && (QueryCancelPending || ProcDiePending)); /* - * All conflicts apart from database cause dynamic errors where the - * command or transaction can be retried at a later point with some - * potential for success. No need to reset this, since non-retryable - * conflict errors are currently FATAL. + * All conflicts apart from database and catalog_xmin cause dynamic + * errors where the command or transaction can be retried at a later + * point with some potential for success. No need to reset this, since + * non-retryable conflict errors are currently FATAL. + * + * catalog_xmin is non-retryable because once we advance the + * catalog_xmin threshold we might replay wal that removes + * needed catalog tuples. The slot can't (re)start decoding + * because its catalog_xmin cannot be satisifed. */ - if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE) + if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE || + reason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN) RecoveryConflictRetryable = false; } @@ -2855,11 +2869,20 @@ ProcessInterrupts(void) } else if (RecoveryConflictPending) { - /* Currently there is only one non-retryable recovery conflict */ - Assert(RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_DATABASE); + int code; + + Assert(RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_DATABASE || + RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN); + + if (RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN) + /* XXX more appropriate error code? */ + code = ERRCODE_PROGRAM_LIMIT_EXCEEDED; + else + code = ERRCODE_DATABASE_DROPPED; + pgstat_report_recovery_conflict(RecoveryConflictReason); ereport(FATAL, - (errcode(ERRCODE_DATABASE_DROPPED), + (errcode(code), errmsg("terminating connection due to conflict with recovery"), errdetail_recovery_conflict())); } diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 69a82d7..231297d 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -112,6 +112,8 @@ extern int CountUserBackends(Oid roleid); extern bool CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared); +extern void CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid); + extern void XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId *xids, TransactionId latestXid); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index d068dde..3a3ba72 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -40,6 +40,7 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, + PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, NUM_PROCSIGNALS /* Must be last! */ } ProcSignalReason; diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 3ecc446..b17ba6f 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -34,10 +34,13 @@ extern void ResolveRecoveryConflictWithDatabase(Oid dbid); extern void ResolveRecoveryConflictWithLock(LOCKTAG locktag); extern void ResolveRecoveryConflictWithBufferPin(void); +extern void ResolveRecoveryConflictWithLogicalDecoding( + TransactionId new_catalog_xmin); extern void CheckRecoveryConflictDeadlock(void); extern void StandbyDeadLockHandler(void); extern void StandbyTimeoutHandler(void); extern void StandbyLockTimeoutHandler(void); +extern bool WaitExceedsMaxStandbyDelay(void); /* * Standby Rmgr (RM_STANDBY_ID) -- 2.5.5