From 370d4d740eb011f69cbd9a656dee53c7cdad3211 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Mon, 29 Oct 2018 22:16:02 -0700 Subject: [PATCH 3/3] WIP: global barriers. Author: Reviewed-By: Discussion: https://postgr.es/m/ Backpatch: --- src/backend/postmaster/autovacuum.c | 3 +- src/backend/postmaster/bgwriter.c | 4 + src/backend/postmaster/checkpointer.c | 4 + src/backend/postmaster/startup.c | 3 + src/backend/postmaster/walwriter.c | 2 + src/backend/replication/walreceiver.c | 5 +- src/backend/storage/buffer/bufmgr.c | 4 + src/backend/storage/ipc/procsignal.c | 138 ++++++++++++++++++++++++++ src/backend/storage/lmgr/proc.c | 20 ++++ src/backend/tcop/postgres.c | 7 ++ src/include/storage/proc.h | 9 ++ src/include/storage/procsignal.h | 23 ++++- 12 files changed, 217 insertions(+), 5 deletions(-) diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 978089575b8..e821adcb4f3 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -651,8 +651,9 @@ AutoVacLauncherMain(int argc, char *argv[]) ResetLatch(MyLatch); - /* Process sinval catchup interrupts that happened while sleeping */ + /* Process pending interrupts. */ ProcessCatchupInterrupt(); + ProcessGlobalBarrierIntterupt(); /* * Emergency bailout if postmaster has died. This is to avoid the diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index 87157b543fa..cd82cce4938 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -262,6 +262,10 @@ BackgroundWriterMain(void) proc_exit(0); /* done */ } + /* Process all pending interrupts. */ + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); + /* * Do one cycle of dirty-buffer writing. */ diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index 31c9644759a..25e741d9869 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -349,6 +349,10 @@ CheckpointerMain(void) /* Clear any already-pending wakeups */ ResetLatch(MyLatch); + /* Process all pending interrupts. */ + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); + /* * Process any requests or signals received recently. */ diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c index b8d5f5a2073..2e4b0d55056 100644 --- a/src/backend/postmaster/startup.c +++ b/src/backend/postmaster/startup.c @@ -151,6 +151,9 @@ HandleStartupProcInterrupts(void) */ if (IsUnderPostmaster && !PostmasterIsAlive()) exit(1); + + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); } diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c index 62c239e4a2c..e36198ce0d4 100644 --- a/src/backend/postmaster/walwriter.c +++ b/src/backend/postmaster/walwriter.c @@ -260,6 +260,8 @@ WalWriterMain(void) /* Normal exit from the walwriter is here */ proc_exit(0); /* done */ } + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); /* * Do what we're here for; then, if XLogBackgroundFlush() found useful diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 5ec471259f9..4f6838b2a5f 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -155,9 +155,8 @@ static void ProcessWalRcvInterrupts(void) { /* - * Although walreceiver interrupt handling doesn't use the same scheme as - * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive - * any incoming signals on Win32. + * The CHECK_FOR_INTERRUPTS() call ensures global barriers are handled, + * and incoming signals on Win32 are received. */ CHECK_FOR_INTERRUPTS(); diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 01eabe57063..02e3b9ac396 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1877,6 +1877,10 @@ BufferSync(int flags) cur_tsid = CkptBufferIds[i].tsId; + /* XXX: need a more principled approach here */ + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); + /* * Grow array of per-tablespace status structs, every time a new * tablespace is found. diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index b0dd7d1b377..c12a6e85cb0 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -18,6 +18,7 @@ #include #include "access/parallel.h" +#include "access/twophase.h" #include "commands/async.h" #include "miscadmin.h" #include "replication/walsender.h" @@ -62,9 +63,11 @@ typedef struct static ProcSignalSlot *ProcSignalSlots = NULL; static volatile ProcSignalSlot *MyProcSignalSlot = NULL; +volatile sig_atomic_t GlobalBarrierInterruptPending = false; static bool CheckProcSignal(ProcSignalReason reason); static void CleanupProcSignalState(int status, Datum arg); +static void HandleGlobalBarrierSignal(void); /* * ProcSignalShmemSize @@ -262,6 +265,8 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) { int save_errno = errno; + pg_read_barrier(); + if (CheckProcSignal(PROCSIG_CATCHUP_INTERRUPT)) HandleCatchupInterrupt(); @@ -292,9 +297,142 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + if (CheckProcSignal(PROCSIG_GLOBAL_BARRIER)) + HandleGlobalBarrierSignal(); + SetLatch(MyLatch); latch_sigusr1_handler(); errno = save_errno; } + +/* + * + */ +uint64 +EmitGlobalBarrier(GlobalBarrierKind kind) +{ + uint64 generation; + + /* + * Broadcast flag, without incrementing generation. This ensures that all + * backends could know about this. + * + * It's OK if the to-be-signalled backend enters after our check here. A + * new backend should have current settings. + */ + for (int i = 0; i < (MaxBackends + max_prepared_xacts); i++) + { + PGPROC *proc = &ProcGlobal->allProcs[i]; + + if (proc->pid == 0) + continue; + + pg_atomic_fetch_or_u32(&proc->barrierFlags, (uint32) kind); + + elog(LOG, "setting flags for %u", proc->pid); + } + + /* + * Broadcast flag generation. If any backend joins after this, it's either + * going to be signalled below, or has read a new enough generation that + * WaitForGlobalBarrier() will not wait for it. + */ + generation = pg_atomic_add_fetch_u64(&ProcGlobal->globalBarrierGen, 1); + + /* Wake up each backend (including ours) */ + for (int i = 0; i < NumProcSignalSlots; i++) + { + ProcSignalSlot *slot = &ProcSignalSlots[i]; + + if (slot->pss_pid == 0) + continue; + + /* Atomically set the proper flag */ + slot->pss_signalFlags[PROCSIG_GLOBAL_BARRIER] = true; + + pg_write_barrier(); + + /* Send signal */ + kill(slot->pss_pid, SIGUSR1); + } + + return generation; +} + +/* + * Wait for all barriers to be absorbed. This guarantees that all changes + * requested by a specific EmitGlobalBarrier() have taken effect. + */ +void +WaitForGlobalBarrier(uint64 generation) +{ + for (int i = 0; i < (MaxBackends + max_prepared_xacts); i++) + { + PGPROC *proc = &ProcGlobal->allProcs[i]; + uint64 oldval; + + pg_memory_barrier(); + oldval = pg_atomic_read_u64(&proc->barrierGen); + + /* + * Unused proc slots get their barrierGen set to UINT64_MAX, so we + * need not care about that. + */ + while (oldval < generation) + { + CHECK_FOR_INTERRUPTS(); + pg_usleep(10000); + + pg_memory_barrier(); + oldval = pg_atomic_read_u64(&proc->barrierGen); + } + } +} + +/* + * Absorb the global barrier procsignal. + */ +static void +HandleGlobalBarrierSignal(void) +{ + InterruptPending = true; + GlobalBarrierInterruptPending = true; + SetLatch(MyLatch); +} + +/* + * Perform global barrier related interrupt checking. If CHECK_FOR_INTERRUPTS + * is used, it'll be called by that, if a backend type doesn't do so, it has + * to be called explicitly. + */ +void +ProcessGlobalBarrierIntterupt(void) +{ + if (GlobalBarrierInterruptPending) + { + uint64 generation; + uint32 flags; + + GlobalBarrierInterruptPending = false; + + generation = pg_atomic_read_u64(&ProcGlobal->globalBarrierGen); + pg_memory_barrier(); + flags = pg_atomic_exchange_u32(&MyProc->barrierFlags, 0); + pg_memory_barrier(); + + if (flags & GLOBBAR_CHECKSUM) + { + /* + * By virtue of getting here (i.e. interrupts being processed), we + * know that this backend won't have any in-progress writes (which + * might have missed the checksum change). + */ + } + + pg_atomic_write_u64(&MyProc->barrierGen, generation); + + elog(LOG, "processed interrupts for %u", MyProcPid); + } +} diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 6f9aaa52faf..354574ef4a5 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -187,6 +187,7 @@ InitProcGlobal(void) ProcGlobal->checkpointerLatch = NULL; pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PGPROCNO); pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PGPROCNO); + pg_atomic_init_u64(&ProcGlobal->globalBarrierGen, 1); /* * Create and initialize all the PGPROC structures we'll need. There are @@ -267,6 +268,9 @@ InitProcGlobal(void) /* Initialize lockGroupMembers list. */ dlist_init(&procs[i].lockGroupMembers); + + pg_atomic_init_u32(&procs[i].barrierFlags, 0); + pg_atomic_init_u64(&procs[i].barrierGen, PG_UINT64_MAX); } /* @@ -418,6 +422,12 @@ InitProcess(void) MyProc->clogGroupMemberLsn = InvalidXLogRecPtr; pg_atomic_init_u32(&MyProc->clogGroupNext, INVALID_PGPROCNO); + /* pairs with globalBarrierGen increase */ + pg_memory_barrier(); + pg_atomic_write_u32(&MyProc->barrierFlags, 0); + pg_atomic_write_u64(&MyProc->barrierGen, + pg_atomic_read_u64(&ProcGlobal->globalBarrierGen)); + /* * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch * on it. That allows us to repoint the process latch, which so far @@ -561,6 +571,13 @@ InitAuxiliaryProcess(void) MyProc->lwWaitMode = 0; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; + + /* pairs with globalBarrierGen increase */ + pg_memory_barrier(); + pg_atomic_write_u32(&MyProc->barrierFlags, 0); + pg_atomic_write_u64(&MyProc->barrierGen, + pg_atomic_read_u64(&ProcGlobal->globalBarrierGen)); + #ifdef USE_ASSERT_CHECKING { int i; @@ -859,6 +876,9 @@ ProcKill(int code, Datum arg) LWLockRelease(leader_lwlock); } + pg_atomic_write_u32(&MyProc->barrierFlags, 0); + pg_atomic_write_u64(&MyProc->barrierGen, PG_UINT64_MAX); + /* * Reset MyLatch to the process local one. This is so that signal * handlers et al can continue using the latch after the shared latch diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 6e13d14fcd0..806c66bc320 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -589,6 +589,10 @@ ProcessClientWriteInterrupt(bool blocked) CHECK_FOR_INTERRUPTS(); } + /* safe to handle during client communication */ + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); + errno = save_errno; } @@ -3123,6 +3127,9 @@ ProcessInterrupts(void) if (ParallelMessagePending) HandleParallelMessages(); + + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); } diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index cb613c8076e..1280c4f41f1 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -203,6 +203,13 @@ struct PGPROC PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */ dlist_head lockGroupMembers; /* list of members, if I'm a leader */ dlist_node lockGroupLink; /* my member link, if I'm a member */ + + /* + * Support for "super barriers". These can be used to e.g. make sure that + * all backends have acknowledged a configuration change. + */ + pg_atomic_uint64 barrierGen; + pg_atomic_uint32 barrierFlags; }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */ @@ -270,6 +277,8 @@ typedef struct PROC_HDR int startupProcPid; /* Buffer id of the buffer that Startup process waits for pin on, or -1 */ int startupBufferPinWaitBufId; + + pg_atomic_uint64 globalBarrierGen; } PROC_HDR; extern PGDLLIMPORT PROC_HDR *ProcGlobal; diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 6db0d69b71f..388eba83807 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -14,8 +14,9 @@ #ifndef PROCSIGNAL_H #define PROCSIGNAL_H -#include "storage/backendid.h" +#include +#include "storage/backendid.h" /* * Reasons for signalling a Postgres child process (a backend or an auxiliary @@ -42,6 +43,8 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, + PROCSIG_GLOBAL_BARRIER, + NUM_PROCSIGNALS /* Must be last! */ } ProcSignalReason; @@ -57,4 +60,22 @@ extern int SendProcSignal(pid_t pid, ProcSignalReason reason, extern void procsignal_sigusr1_handler(SIGNAL_ARGS); +/* + * These collapse. The flag values better be distinct bits. + */ +typedef enum GlobalBarrierKind +{ + /* + * Guarantee that all processes have the correct view of whether checksums + * enabled/disabled, and no writes are in-progress with previous value(s). + */ + GLOBBAR_CHECKSUM = 1 << 0 +} GlobalBarrierKind; + +extern uint64 EmitGlobalBarrier(GlobalBarrierKind kind); +extern void WaitForGlobalBarrier(uint64 generation); +extern void ProcessGlobalBarrierIntterupt(void); + +extern PGDLLIMPORT volatile sig_atomic_t GlobalBarrierInterruptPending; + #endif /* PROCSIGNAL_H */ -- 2.18.0.rc2.dirty