From 0075c8b4e79ecd9a54caa81d90de07266687db9c Mon Sep 17 00:00:00 2001 From: Jakub Wartak Date: Thu, 26 Jan 2023 11:50:10 +0000 Subject: [PATCH v2] WIP: Syncrep and improving latency due to WAL throttling --- src/backend/access/transam/xlog.c | 59 +++++++++++++++++++++++++ src/backend/replication/syncrep.c | 21 +++++++-- src/backend/tcop/postgres.c | 3 ++ src/backend/utils/activity/wait_event.c | 3 ++ src/backend/utils/init/globals.c | 1 + src/backend/utils/misc/guc_tables.c | 11 +++++ src/include/access/xlog.h | 3 ++ src/include/miscadmin.h | 1 + src/include/replication/syncrep.h | 1 + src/include/utils/wait_event.h | 1 + 10 files changed, 100 insertions(+), 4 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index fb4c860bde..6d2d928a31 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1,4 +1,5 @@ /*------------------------------------------------------------------------- + * * xlog.c * PostgreSQL write-ahead log manager @@ -73,6 +74,7 @@ #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" +#include "portability/instr_time.h" #include "port/atomics.h" #include "port/pg_iovec.h" #include "postmaster/bgwriter.h" @@ -82,6 +84,7 @@ #include "replication/origin.h" #include "replication/slot.h" #include "replication/snapbuild.h" +#include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/bufmgr.h" @@ -108,6 +111,7 @@ #include "utils/timestamp.h" #include "utils/varlena.h" + extern uint32 bootstrap_data_checksum_version; /* timeline ID to be used when bootstrapping */ @@ -138,6 +142,7 @@ int wal_retrieve_retry_interval = 5000; int max_slot_wal_keep_size_mb = -1; int wal_decode_buffer_size = 512 * 1024; bool track_wal_io_timing = false; +int synchronous_commit_flush_wal_after = 0; /* kb */ #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -252,10 +257,14 @@ static int LocalXLogInsertAllowed = -1; * parallel backends may have written WAL records at later LSNs than the value * stored here. The parallel leader advances its own copy, when necessary, * in WaitForParallelWorkersToFinish. + * + * XactLastThrottledRecEnd points to the last XLOG record that should be throttled + * as the additional WAL records could be generated before processing interrupts. */ XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr; +static XLogRecPtr XactLastThrottledRecEnd = InvalidXLogRecPtr; /* * RedoRecPtr is this backend's local copy of the REDO record pointer @@ -638,6 +647,8 @@ static bool holdingAllLocks = false; static MemoryContext walDebugCxt = NULL; #endif +uint32 backendWalInserted = 0; + static void CleanupAfterArchiveRecovery(TimeLineID EndOfLogTLI, XLogRecPtr EndOfLog, TimeLineID newTLI); @@ -1022,6 +1033,17 @@ XLogInsertRecord(XLogRecData *rdata, pgWalUsage.wal_bytes += rechdr->xl_tot_len; pgWalUsage.wal_records++; pgWalUsage.wal_fpi += num_fpi; + + /* WAL throttling */ + backendWalInserted += rechdr->xl_tot_len; + if ((synchronous_commit == SYNCHRONOUS_COMMIT_REMOTE_APPLY || synchronous_commit == SYNCHRONOUS_COMMIT_REMOTE_WRITE) && + synchronous_commit_flush_wal_after > 0 && + backendWalInserted > synchronous_commit_flush_wal_after * 1024L) + { + XactLastThrottledRecEnd = XactLastRecEnd; + InterruptPending = true; + XLogDelayPending = true; + } } return EndPos; @@ -2531,6 +2553,9 @@ XLogFlush(XLogRecPtr record) return; } + /* reset WAL throttling bytes counter */ + backendWalInserted = 0; + /* Quick exit if already known flushed */ if (record <= LogwrtResult.Flush) return; @@ -8952,3 +8977,37 @@ SetWalWriterSleeping(bool sleeping) XLogCtl->WalWriterSleeping = sleeping; SpinLockRelease(&XLogCtl->info_lck); } + + +/* + * Called from ProcessMessageInterrupts() to avoid waiting while being in critical section + */ +void HandleXLogDelayPending() +{ + /* flush only up to the last fully filled page */ + XLogRecPtr LastFullyWrittenXLogPage = XactLastThrottledRecEnd - (XactLastThrottledRecEnd % XLOG_BLCKSZ); + instr_time wal_throttle_time_start, wal_throttle_time_end; + double timediff; + XLogDelayPending = false; + + //HOLD_INTERRUPTS(); + + /* XXX Debug for now */ + elog(WARNING, "throttling WAL down on this session (backendWalInserted=%d, LSN=%X/%X flushingTo=%X/%X)", + backendWalInserted, + LSN_FORMAT_ARGS(XactLastThrottledRecEnd), + LSN_FORMAT_ARGS(LastFullyWrittenXLogPage)); + + INSTR_TIME_SET_CURRENT(wal_throttle_time_start); + + XLogFlush(LastFullyWrittenXLogPage); + SyncRepWaitForLSNThrottled(LastFullyWrittenXLogPage, false); + + INSTR_TIME_SET_CURRENT(wal_throttle_time_end); + INSTR_TIME_SUBTRACT(wal_throttle_time_end, wal_throttle_time_start); + + timediff = INSTR_TIME_GET_DOUBLE(wal_throttle_time_end) * 1000.0; + elog(WARNING, "throttling WAL down on this session - end (%f ms)", timediff); + + //RESUME_INTERRUPTS(); +} diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 80d681b71c..09e405333e 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -145,8 +145,9 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); * to be flushed if synchronous_commit is set to the higher level of * remote_apply, because only commit records provide apply feedback. */ -void -SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) + +static void +SyncRepWaitForLSNInternal(XLogRecPtr lsn, bool commit, uint32 wait_event) { char *new_status = NULL; const char *old_status; @@ -293,8 +294,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) * Wait on latch. Any condition that should wake us up will set the * latch, so no need for timeout. */ - rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, - WAIT_EVENT_SYNC_REP); + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, wait_event); /* * If the postmaster dies, we'll probably never get an acknowledgment, @@ -330,6 +330,19 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) } } +void +SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) +{ + SyncRepWaitForLSNInternal(lsn, commit, WAIT_EVENT_SYNC_REP); +} + +void +SyncRepWaitForLSNThrottled(XLogRecPtr lsn, bool commit) +{ + SyncRepWaitForLSNInternal(lsn, commit, WAIT_EVENT_SYNC_REP_THROTTLED); +} + + /* * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant. * diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 470b734e9e..9eaf2df1ce 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -3393,6 +3393,9 @@ ProcessInterrupts(void) if (ParallelApplyMessagePending) HandleParallelApplyMessages(); + + if (XLogDelayPending) + HandleXLogDelayPending(); } /* diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index 6e4599278c..4271514520 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -457,6 +457,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_SYNC_REP: event_name = "SyncRep"; break; + case WAIT_EVENT_SYNC_REP_THROTTLED: + event_name = "SyncRepThrottled"; + break; case WAIT_EVENT_WAL_RECEIVER_EXIT: event_name = "WalReceiverExit"; break; diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 1b1d814254..0941269cf4 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -40,6 +40,7 @@ volatile sig_atomic_t IdleStatsUpdateTimeoutPending = false; volatile uint32 InterruptHoldoffCount = 0; volatile uint32 QueryCancelHoldoffCount = 0; volatile uint32 CritSectionCount = 0; +bool XLogDelayPending = false; int MyProcPid; pg_time_t MyStartTime; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 4ac808ed22..88015b05bb 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -2831,6 +2831,17 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"synchronous_commit_flush_wal_after", PGC_USERSET, REPLICATION_SENDING, + gettext_noop("Sets the maximum logged WAL in kbytes, after which wait for sync commit confiration even without commit "), + NULL, + GUC_UNIT_KB + }, + &synchronous_commit_flush_wal_after, + 0, 0, 1024L*1024L, + NULL, NULL, NULL + }, + { {"extra_float_digits", PGC_USERSET, CLIENT_CONN_LOCALE, gettext_noop("Sets the number of digits displayed for floating-point values."), diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index cfe5409738..19a6478e57 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -51,6 +51,8 @@ extern PGDLLIMPORT char *wal_consistency_checking_string; extern PGDLLIMPORT bool log_checkpoints; extern PGDLLIMPORT bool track_wal_io_timing; extern PGDLLIMPORT int wal_decode_buffer_size; +extern PGDLLIMPORT int synchronous_commit_flush_wal_after; +extern PGDLLIMPORT uint32 backendWalInserted; extern PGDLLIMPORT int CheckPointSegments; @@ -246,6 +248,7 @@ extern TimeLineID GetWALInsertionTimeLine(void); extern XLogRecPtr GetLastImportantRecPtr(void); extern void SetWalWriterSleeping(bool sleeping); +extern void HandleXLogDelayPending(void); /* * Routines used by xlogrecovery.c to call back into xlog.c during recovery. diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 96b3a1e1a0..395b294298 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -95,6 +95,7 @@ extern PGDLLIMPORT volatile sig_atomic_t IdleSessionTimeoutPending; extern PGDLLIMPORT volatile sig_atomic_t ProcSignalBarrierPending; extern PGDLLIMPORT volatile sig_atomic_t LogMemoryContextPending; extern PGDLLIMPORT volatile sig_atomic_t IdleStatsUpdateTimeoutPending; +extern PGDLLIMPORT bool XLogDelayPending; extern PGDLLIMPORT volatile sig_atomic_t CheckClientConnectionPending; extern PGDLLIMPORT volatile sig_atomic_t ClientConnectionLost; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 0aec4cb1d5..bdbe903eba 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -82,6 +82,7 @@ extern PGDLLIMPORT char *SyncRepStandbyNames; /* called by user backend */ extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit); +extern void SyncRepWaitForLSNThrottled(XLogRecPtr lsn, bool commit); /* called at backend exit */ extern void SyncRepCleanupAtProcExit(void); diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 6cacd6edaf..2515b148d6 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -128,6 +128,7 @@ typedef enum WAIT_EVENT_RESTORE_COMMAND, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP, + WAIT_EVENT_SYNC_REP_THROTTLED, WAIT_EVENT_WAL_RECEIVER_EXIT, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WAIT_EVENT_XACT_GROUP_UPDATE -- 2.30.2