diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 56a1cb4..2f13b63 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5099,6 +5099,13 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; /* + * If the commit runs at remote-apply level or above, flag the record so + * that standbys report back as soon as they have applied it. + */ + if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY) + xl_xinfo.xinfo |= XACT_COMPLETION_SYNC_APPLY_FEEDBACK; + + /* * Relcache invalidations requires information about the current database * and so does logical decoding. */ @@ -5435,6 +5442,12 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, if (XactCompletionForceSyncCommit(parsed->xinfo)) XLogFlush(lsn); + /* + * The primary marked this commit as having a waiter for apply feedback; + * note that so the redo loop can wake the walreceiver to send a reply. + */ + if (XactCompletionSyncApplyFeedback(parsed->xinfo)) + XLogAppliedSynchronousCommit(true); } /* diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index a092aad..373204a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6871,6 +6871,15 @@ StartupXLOG(void) XLogCtl->lastReplayedTLI = ThisTimeLineID; SpinLockRelease(&XLogCtl->info_lck); + /* + * If rm_redo reported that it applied a commit record that + * the master is waiting for, then we wake up the receiver so + * that it notices the updated lastReplayedEndRecPtr and sends + * a reply to the master. + */ + if (XLogAppliedSynchronousCommit(false)) + WalRcvWakeup(); + /* Remember this record as the last-applied one */ LastRec = ReadRecPtr; @@ -11637,3 +11646,17 @@ SetWalWriterSleeping(bool sleeping) XLogCtl->WalWriterSleeping = sleeping; SpinLockRelease(&XLogCtl->info_lck); } + +/* + * Set the static "commit needs apply feedback" flag to 'value', and + * return the value it held before this call. 
+ */ +bool +XLogAppliedSynchronousCommit(bool value) +{ + static bool last_value = false; + bool result = last_value; + + last_value = value; + return result; +} diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 325239d..4524dcb 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -416,6 +416,7 @@ SyncRepReleaseWaiters(void) WalSnd *syncWalSnd; int numwrite = 0; int numflush = 0; + int numapply = 0; /* * If this WALSender is serving a standby that is not on the list of @@ -462,12 +463,18 @@ SyncRepReleaseWaiters(void) walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } + if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply) + { + walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply; + numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); + } LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X", numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush, + numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply); /* * If we are managing the highest priority standby, though we weren't @@ -728,6 +735,9 @@ assign_synchronous_commit(int newval, void *extra) case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; break; + case SYNCHRONOUS_COMMIT_REMOTE_APPLY: + SyncRepWaitMode = SYNC_REP_WAIT_APPLY; + break; default: SyncRepWaitMode = SYNC_REP_NO_WAIT; break; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 41e57f2..7d3acac 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -96,6 +96,7 
@@ static uint32 recvOff = 0; */ static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGTERM = false; +static volatile sig_atomic_t got_SIGUSR1 = false; /* * LogstreamResult indicates the byte positions that we have already @@ -138,7 +139,7 @@ static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); -static void XLogWalRcvSendReply(bool force, bool requestReply); +static void XLogWalRcvSendReply(bool force, bool requestReply, XLogRecPtr apply_lsn); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); @@ -182,6 +183,24 @@ DisableWalRcvImmediateExit(void) ProcessWalRcvInterrupts(); } +/* + * Send the primary a reply carrying the current replay LSN, but only if it + * differs from *last_sent_applied_lsn, which is then updated. Called by the + * walreceiver after the startup process has signaled it. + */ +static void +SendApplyNotificationIfMoved(XLogRecPtr *last_sent_applied_lsn) +{ + XLogRecPtr applied_lsn = GetXLogReplayRecPtr(NULL); + + if (applied_lsn != *last_sent_applied_lsn) + { + /* Force the send, but don't ask the primary to reply to it. */ + XLogWalRcvSendReply(true, false, applied_lsn); + *last_sent_applied_lsn = applied_lsn; + } +} + /* Main entry point for walreceiver process */ void WalReceiverMain(void) @@ -350,6 +369,7 @@ WalReceiverMain(void) slotname[0] != '\0' ? slotname : NULL)) { bool endofwal = false; + XLogRecPtr last_sent_applied_lsn = InvalidXLogRecPtr; if (first_stream) ereport(LOG, @@ -439,8 +459,15 @@ WalReceiverMain(void) len = walrcv_receive(0, &buf); } + /* Check if the startup process has signaled us. 
*/ + if (got_SIGUSR1) + { + got_SIGUSR1 = false; + SendApplyNotificationIfMoved(&last_sent_applied_lsn); + } + /* Let the master know that we received some data. */ - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(false, false, InvalidXLogRecPtr); /* * If we've written some records, flush them to disk and @@ -495,7 +522,14 @@ WalReceiverMain(void) } } - XLogWalRcvSendReply(requestReply, requestReply); + /* Check if the startup process has signaled us. */ + if (got_SIGUSR1) + { + got_SIGUSR1 = false; + SendApplyNotificationIfMoved(&last_sent_applied_lsn); + } + + XLogWalRcvSendReply(requestReply, requestReply, InvalidXLogRecPtr); XLogWalRcvSendHSFeedback(false); } } @@ -734,6 +768,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS) { int save_errno = errno; + got_SIGUSR1 = true; latch_sigusr1_handler(); errno = save_errno; @@ -846,7 +881,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) /* If the primary requested a reply, send one immediately */ if (replyRequested) - XLogWalRcvSendReply(true, false); + XLogWalRcvSendReply(true, false, InvalidXLogRecPtr); break; } default: @@ -1010,7 +1045,7 @@ XLogWalRcvFlush(bool dying) /* Also let the master know that we made some progress */ if (!dying) { - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(false, false, InvalidXLogRecPtr); XLogWalRcvSendHSFeedback(false); } } @@ -1028,9 +1063,12 @@ XLogWalRcvFlush(bool dying) * If 'requestReply' is true, requests the server to reply immediately upon * receiving this message. This is used for heartbearts, when approaching * wal_receiver_timeout. + * + * If 'apply_lsn' is InvalidXLogRecPtr, the apply LSN is looked up in shmem if + * it is needed. Otherwise, the value provided is used. 
*/ static void -XLogWalRcvSendReply(bool force, bool requestReply) +XLogWalRcvSendReply(bool force, bool requestReply, XLogRecPtr apply_lsn) { static XLogRecPtr writePtr = 0; static XLogRecPtr flushPtr = 0; @@ -1068,7 +1106,8 @@ XLogWalRcvSendReply(bool force, bool requestReply) /* Construct a new message */ writePtr = LogstreamResult.Write; flushPtr = LogstreamResult.Flush; - applyPtr = GetXLogReplayRecPtr(NULL); + applyPtr = apply_lsn != InvalidXLogRecPtr ? + apply_lsn : GetXLogReplayRecPtr(NULL); resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'r'); @@ -1221,3 +1260,22 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) pfree(receipttime); } } + +/* + * Wake up the walreceiver, if WalRcv->pid is set, by sending it SIGUSR1 in + * case it is blocked in walrcv_receive. + * + * This is called by the startup process after replaying a commit record + * flagged for apply feedback, so that walreceiver can check if it needs to + * send an apply notification back to the master, which may be waiting in a + * COMMIT with synchronous_commit = apply. 
+ */ +void +WalRcvWakeup(void) +{ + /* Read the pid once so it cannot change to 0 between test and kill(). */ + pid_t walrcv_pid = WalRcv->pid; + + if (walrcv_pid != 0) + kill(walrcv_pid, SIGUSR1); +} diff --git a/src/backend/utils/adt/tsvector_op.c b/src/backend/utils/adt/tsvector_op.c index 05c23da..e822ba8 100644 --- a/src/backend/utils/adt/tsvector_op.c +++ b/src/backend/utils/adt/tsvector_op.c @@ -21,6 +21,7 @@ #include "funcapi.h" #include "mb/pg_wchar.h" #include "miscadmin.h" +#include "parser/parse_coerce.h" #include "tsearch/ts_utils.h" #include "utils/builtins.h" #include "utils/lsyscache.h" diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 8ebf424..39f14fc 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -351,6 +351,7 @@ static const struct config_enum_entry constraint_exclusion_options[] = { static const struct config_enum_entry synchronous_commit_options[] = { {"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false}, {"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false}, + {"apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false}, {"on", SYNCHRONOUS_COMMIT_ON, false}, {"off", SYNCHRONOUS_COMMIT_OFF, false}, {"true", SYNCHRONOUS_COMMIT_ON, true}, diff --git a/src/include/access/xact.h b/src/include/access/xact.h index cb1c2db..bfb8fa2 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -60,7 +60,9 @@ typedef enum SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */ SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote * write */ - SYNCHRONOUS_COMMIT_REMOTE_FLUSH /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_APPLY /* wait for local flush and remote + * apply */ } SyncCommitLevel; /* Define the default setting for synchonous_commit */ @@ -144,10 +146,13 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, * EOXact... routines which run at the end of the original transaction * completion. 
*/ +#define XACT_COMPLETION_SYNC_APPLY_FEEDBACK (1U << 29) /* committer wants standby apply feedback */ #define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30) #define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31) /* Access macros for above flags */ +#define XactCompletionSyncApplyFeedback(xinfo) \ + (!!(xinfo & XACT_COMPLETION_SYNC_APPLY_FEEDBACK)) #define XactCompletionRelcacheInitFileInval(xinfo) \ (!!(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)) #define XactCompletionForceSyncCommit(xinfo) \ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 790ca66..93efa29 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -266,6 +266,7 @@ extern void RemovePromoteSignalFiles(void); extern bool CheckPromoteSignal(void); extern void WakeupRecovery(void); extern void SetWalWriterSleeping(bool sleeping); +extern bool XLogAppliedSynchronousCommit(bool value); extern void assign_max_wal_size(int newval, void *extra); extern void assign_checkpoint_completion_target(double newval, void *extra); diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 71e2857..8e0fe00 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -23,8 +23,9 @@ #define SYNC_REP_NO_WAIT -1 #define SYNC_REP_WAIT_WRITE 0 #define SYNC_REP_WAIT_FLUSH 1 +#define SYNC_REP_WAIT_APPLY 2 /* wait mode for synchronous_commit = apply */ -#define NUM_SYNC_REP_WAIT_MODE 2 +#define NUM_SYNC_REP_WAIT_MODE 3 /* syncRepState */ #define SYNC_REP_NOT_WAITING 0 diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 61255a9..3256ed3 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -160,5 +160,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); +extern void WalRcvWakeup(void); #endif /* _WALRECEIVER_H */