diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index d48a13f..898979a 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2143,7 +2143,7 @@ include_dir 'conf.d' Specifies whether transaction commit will wait for WAL records to be written to disk before the command returns a success indication to the client. Valid values are on, - remote_write, local, and off. + remote_write, remote_apply, local, and off. The default, and safe, setting is on. When off, there can be a delay between when success is reported to the client and when the transaction is @@ -2177,6 +2177,10 @@ include_dir 'conf.d' ensure data preservation even if the standby instance of PostgreSQL were to crash, but not if the standby suffers an operating-system-level crash. + When set to remote_apply, commits will wait until a reply + from the current synchronous standby indicates it has received the + commit record of the transaction and applied it, so that it has become + visible to queries. When synchronous diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index 19d613e..03c6c30 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1081,6 +1081,9 @@ primary_slot_name = 'node_a_slot' WAL record is then sent to the standby. The standby sends reply messages each time a new batch of WAL data is written to disk, unless wal_receiver_status_interval is set to zero on the standby. + In the case that synchronous_commit is set to + remote_apply, the standby sends reply messages when the commit + record is replayed, making the transaction visible. 
If the standby is the first matching standby, as specified in synchronous_standby_names on the primary, the reply messages from that standby will be used to wake users waiting for @@ -1107,6 +1110,14 @@ primary_slot_name = 'node_a_slot' + Setting synchronous_commit to remote_apply will + cause each commit to wait until the current synchronous standby reports + that it has replayed the transaction, making it visible to user queries. + In simple cases, this allows for load balancing with causal consistency + on a single hot standby. + + + Users will stop waiting if a fast shutdown is requested. However, as when using asynchronous replication, the server will not fully shutdown until all outstanding WAL records are transferred to the currently @@ -1160,9 +1171,10 @@ primary_slot_name = 'node_a_slot' Planning for High Availability - Commits made when synchronous_commit is set to on - or remote_write will wait until the synchronous standby responds. The response - may never occur if the last, or only, standby should crash. + Commits made when synchronous_commit is set to on, + remote_write or remote_apply will wait until the + synchronous standby responds. The response may never occur if the last, or + only, standby should crash. diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index e7234c8..893c2fa 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1107,7 +1107,7 @@ EndPrepare(GlobalTransaction gxact) * Note that at this stage we have marked the prepare, but still show as * running in the procarray (twice!) and continue to hold locks. 
*/ - SyncRepWaitForLSN(gxact->prepare_end_lsn); + SyncRepWaitForLSN(gxact->prepare_end_lsn, false); records.tail = records.head = NULL; records.num_chunks = 0; @@ -2103,7 +2103,7 @@ RecordTransactionCommitPrepared(TransactionId xid, * Note that at this stage we have marked clog, but still show as running * in the procarray and continue to hold locks. */ - SyncRepWaitForLSN(recptr); + SyncRepWaitForLSN(recptr, true); } /* @@ -2156,5 +2156,5 @@ RecordTransactionAbortPrepared(TransactionId xid, * Note that at this stage we have marked clog, but still show as running * in the procarray and continue to hold locks. */ - SyncRepWaitForLSN(recptr); + SyncRepWaitForLSN(recptr, true); } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 89a14b4..130b56b 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1324,7 +1324,7 @@ RecordTransactionCommit(void) * in the procarray and continue to hold locks. */ if (wrote_xlog && markXidCommitted) - SyncRepWaitForLSN(XactLastRecEnd); + SyncRepWaitForLSN(XactLastRecEnd, true); /* remember end of last commit record */ XactLastCommitEnd = XactLastRecEnd; @@ -5123,6 +5123,13 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; /* + * Check if the caller would like to ask standbys for immediate feedback + * once this commit is applied. + */ + if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY) + xl_xinfo.xinfo |= XACT_COMPLETION_SYNC_APPLY_FEEDBACK; + + /* * Relcache invalidations requires information about the current database * and so does logical decoding. */ @@ -5300,6 +5307,13 @@ XactLogAbortRecord(TimestampTz abort_time, if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + /* + * Check if the caller would like to ask standbys for immediate feedback + * once this abort is applied. 
+ */ + if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY) + xl_xinfo.xinfo |= XACT_COMPLETION_SYNC_APPLY_FEEDBACK; + return XLogInsert(RM_XACT_ID, info); } @@ -5458,6 +5472,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, if (XactCompletionForceSyncCommit(parsed->xinfo)) XLogFlush(lsn); + /* + * If asked by the primary (because someone is waiting for a synchronous + * commit = remote_apply), we will need to ask walreceiver to send a + * reply immediately. + */ + if (XactCompletionSyncApplyFeedback(parsed->xinfo)) + XLogRequestWalReceiverReply(); } /* @@ -5544,6 +5565,14 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid) smgrdounlink(srel, true); smgrclose(srel); } + + /* + * If asked by the primary (because someone is waiting for a synchronous + * commit = remote_apply), we will need to ask walreceiver to send a + * reply immediately. + */ + if (XactCompletionSyncApplyFeedback(parsed->xinfo)) + XLogRequestWalReceiverReply(); } void diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b119a47..3e454f5 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -345,6 +345,9 @@ static XLogRecPtr RedoRecPtr; */ static bool doPageWrites; +/* Has the recovery code requested a walreceiver wakeup? */ +static bool doRequestWalReceiverReply; + /* * RedoStartLSN points to the checkpoint's REDO location which is specified * in a backup label file, backup history file or control file. In standby @@ -6879,6 +6882,19 @@ StartupXLOG(void) XLogCtl->lastReplayedTLI = ThisTimeLineID; SpinLockRelease(&XLogCtl->info_lck); + /* + * If rm_redo reported that it applied a commit record that + * the master is waiting for by calling + * XLogRequestWalReceiverReply, then we wake up the receiver + * so that it notices the updated lastReplayedEndRecPtr and + * sends a reply to the master. 
+ */ + if (doRequestWalReceiverReply) + { + doRequestWalReceiverReply = false; + WalRcvWakeup(); + } + /* Remember this record as the last-applied one */ LastRec = ReadRecPtr; @@ -11594,3 +11610,12 @@ SetWalWriterSleeping(bool sleeping) XLogCtl->WalWriterSleeping = sleeping; SpinLockRelease(&XLogCtl->info_lck); } + +/* + * Schedule a walreceiver wakeup in the main recovery loop. + */ +void +XLogRequestWalReceiverReply(void) +{ + doRequestWalReceiverReply = true; +} diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 92faf4e..1ee1bc5 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -91,13 +91,25 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); * to the wait queue. During SyncRepWakeQueue() a WALSender changes * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed. * This backend then resets its state to SYNC_REP_NOT_WAITING. + * + * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN + * represents a commit/abort record. If it's not, then we wait only for the + * WAL to be flushed if synchronous_commit is set to the higher level of + * remote_apply, because standbys only send apply feedback for commit/abort + * records. */ void -SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) +SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) { char *new_status = NULL; const char *old_status; - int mode = SyncRepWaitMode; + int mode; + + /* Cap the level for non-commit records to remote flush only. */ + if (commit) + mode = SyncRepWaitMode; + else + mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH); /* * Fast exit if user has not requested sync replication, or there are no @@ -122,7 +134,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * to be a low cost check.
*/ if (!WalSndCtl->sync_standbys_defined || - XactCommitLSN <= WalSndCtl->lsn[mode]) + lsn <= WalSndCtl->lsn[mode]) { LWLockRelease(SyncRepLock); return; @@ -132,7 +144,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * Set our waitLSN so WALSender will know when to wake us, and add * ourselves to the queue. */ - MyProc->waitLSN = XactCommitLSN; + MyProc->waitLSN = lsn; MyProc->syncRepState = SYNC_REP_WAITING; SyncRepQueueInsert(mode); Assert(SyncRepQueueIsOrderedByLSN(mode)); @@ -147,7 +159,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) new_status = (char *) palloc(len + 32 + 1); memcpy(new_status, old_status, len); sprintf(new_status + len, " waiting for %X/%X", - (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN); + (uint32) (lsn >> 32), (uint32) lsn); set_ps_display(new_status, false); new_status[len] = '\0'; /* truncate off " waiting ..." */ } @@ -416,6 +428,7 @@ SyncRepReleaseWaiters(void) WalSnd *syncWalSnd; int numwrite = 0; int numflush = 0; + int numapply = 0; /* * If this WALSender is serving a standby that is not on the list of @@ -462,12 +475,18 @@ SyncRepReleaseWaiters(void) walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } + if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply) + { + walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply; + numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); + } LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X", numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush, + numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply); /* * If we are managing the highest priority standby, though we weren't @@
-728,6 +747,9 @@ assign_synchronous_commit(int newval, void *extra) case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; break; + case SYNCHRONOUS_COMMIT_REMOTE_APPLY: + SyncRepWaitMode = SYNC_REP_WAIT_APPLY; + break; default: SyncRepWaitMode = SYNC_REP_NO_WAIT; break; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 7b36e02..59e65c2 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -101,6 +101,7 @@ static uint32 recvOff = 0; */ static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGTERM = false; +static volatile sig_atomic_t got_SIGUSR2 = false; /* * LogstreamResult indicates the byte positions that we have already @@ -150,9 +151,29 @@ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); static void WalRcvSigUsr1Handler(SIGNAL_ARGS); +static void WalRcvSigUsr2Handler(SIGNAL_ARGS); static void WalRcvShutdownHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS); +static void +WalRcvBlockSigUsr2(void) +{ + sigset_t mask; + + sigemptyset(&mask); + sigaddset(&mask, SIGUSR2); + sigprocmask(SIG_BLOCK, &mask, NULL); +} + +static void +WalRcvUnblockSigUsr2(void) +{ + sigset_t mask; + + sigemptyset(&mask); + sigaddset(&mask, SIGUSR2); + sigprocmask(SIG_UNBLOCK, &mask, NULL); +} static void ProcessWalRcvInterrupts(void) @@ -200,6 +221,7 @@ WalReceiverMain(void) WalRcvData *walrcv = WalRcv; TimestampTz last_recv_timestamp; bool ping_sent; + bool forceReply = false; /* * WalRcv should be set up already (if we are a backend, we inherit this @@ -268,7 +290,7 @@ WalReceiverMain(void) pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGUSR1, WalRcvSigUsr1Handler); - pqsignal(SIGUSR2, SIG_IGN); + pqsignal(SIGUSR2, WalRcvSigUsr2Handler); /* Reset some signals that are accepted by postmaster but not here */ pqsignal(SIGCHLD,
SIG_DFL); @@ -299,6 +321,10 @@ WalReceiverMain(void) /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); + /* Block SIGUSR2 (we unblock it only during network waits). */ + WalRcvBlockSigUsr2(); + got_SIGUSR2 = false; + /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); walrcv_connect(conninfo); @@ -408,7 +434,9 @@ WalReceiverMain(void) } /* Wait a while for data to arrive */ + WalRcvUnblockSigUsr2(); len = walrcv_receive(NAPTIME_PER_CYCLE, &buf); + WalRcvBlockSigUsr2(); if (len != 0) { /* @@ -439,11 +467,21 @@ WalReceiverMain(void) endofwal = true; break; } + WalRcvUnblockSigUsr2(); len = walrcv_receive(0, &buf); + WalRcvBlockSigUsr2(); + } + + if (got_SIGUSR2) + { + /* The recovery process asked us to force a reply. */ + got_SIGUSR2 = false; + forceReply = true; } /* Let the master know that we received some data. */ - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(forceReply, false); + forceReply = false; /* * If we've written some records, flush them to disk and @@ -498,7 +536,14 @@ WalReceiverMain(void) } } - XLogWalRcvSendReply(requestReply, requestReply); + if (got_SIGUSR2) + { + /* The recovery process asked us to force a reply. */ + got_SIGUSR2 = false; + forceReply = true; + } + XLogWalRcvSendReply(requestReply || forceReply, requestReply); + forceReply = false; XLogWalRcvSendHSFeedback(false); } } @@ -740,6 +785,13 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS) errno = save_errno; } +/* SIGUSR2: used to receive wakeups from recovery */ +static void +WalRcvSigUsr2Handler(SIGNAL_ARGS) +{ + got_SIGUSR2 = true; +} + /* SIGTERM: set flag for main loop, or shutdown immediately if safe */ static void WalRcvShutdownHandler(SIGNAL_ARGS) @@ -1222,6 +1274,22 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) } /* + * Wake up the walreceiver if it happens to be blocked in walrcv_receive, + * and tell it that a commit record has been applied. 
+ * + * This is called by the startup process whenever interesting xlog records + * are applied, so that walreceiver can check if it needs to send an apply + * notification back to the master which may be waiting in a COMMIT with + * synchronous_commit = remote_apply. + */ +void +WalRcvWakeup(void) +{ + if (WalRcv->pid != 0) + kill(WalRcv->pid, SIGUSR2); +} + +/* * Return a string constant representing the state. This is used * in system functions and views, and should *not* be translated. */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 65a6cd4..06cb166 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -345,12 +345,13 @@ static const struct config_enum_entry constraint_exclusion_options[] = { }; /* - * Although only "on", "off", "remote_write", and "local" are documented, we - * accept all the likely variants of "on" and "off". + * Although only "on", "off", "remote_apply", "remote_write", and "local" are + * documented, we accept all the likely variants of "on" and "off". 
*/ static const struct config_enum_entry synchronous_commit_options[] = { {"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false}, {"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false}, + {"remote_apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false}, {"on", SYNCHRONOUS_COMMIT_ON, false}, {"off", SYNCHRONOUS_COMMIT_OFF, false}, {"true", SYNCHRONOUS_COMMIT_ON, true}, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 5536012..ec4427f 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -177,7 +177,7 @@ # (change requires restart) #fsync = on # turns forced synchronization on or off #synchronous_commit = on # synchronization level; - # off, local, remote_write, or on + # off, local, remote_write, remote_apply, or on #wal_sync_method = fsync # the default is the first option # supported by the operating system: # open_datasync diff --git a/src/include/access/xact.h b/src/include/access/xact.h index ebeb582..ed8d22c 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -60,7 +60,9 @@ typedef enum SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */ SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote * write */ - SYNCHRONOUS_COMMIT_REMOTE_FLUSH /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_APPLY /* wait for local flush and remote + * apply */ } SyncCommitLevel; /* Define the default setting for synchonous_commit */ @@ -144,10 +146,13 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, * EOXact... routines which run at the end of the original transaction * completion. 
*/ +#define XACT_COMPLETION_SYNC_APPLY_FEEDBACK (1U << 29) #define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30) #define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31) /* Access macros for above flags */ +#define XactCompletionSyncApplyFeedback(xinfo) \ + (!!(xinfo & XACT_COMPLETION_SYNC_APPLY_FEEDBACK)) #define XactCompletionRelcacheInitFileInval(xinfo) \ (!!(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)) #define XactCompletionForceSyncCommit(xinfo) \ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 74a1394..a7dcdae 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -267,6 +267,8 @@ extern bool CheckPromoteSignal(void); extern void WakeupRecovery(void); extern void SetWalWriterSleeping(bool sleeping); +extern void XLogRequestWalReceiverReply(void); + extern void assign_max_wal_size(int newval, void *extra); extern void assign_checkpoint_completion_target(double newval, void *extra); diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 96e059b..c005a42 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -23,8 +23,9 @@ #define SYNC_REP_NO_WAIT -1 #define SYNC_REP_WAIT_WRITE 0 #define SYNC_REP_WAIT_FLUSH 1 +#define SYNC_REP_WAIT_APPLY 2 -#define NUM_SYNC_REP_WAIT_MODE 2 +#define NUM_SYNC_REP_WAIT_MODE 3 /* syncRepState */ #define SYNC_REP_NOT_WAITING 0 @@ -35,7 +36,7 @@ extern char *SyncRepStandbyNames; /* called by user backend */ -extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); +extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit); /* called at backend exit */ extern void SyncRepCleanupAtProcExit(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 6eacb09..3294df9 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -162,5 +162,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, extern XLogRecPtr 
GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); +extern void WalRcvWakeup(void); #endif /* _WALRECEIVER_H */