From 288477bbd1421d173133a34f226beedfb6c305a9 Mon Sep 17 00:00:00 2001 From: Jakub Wartak Date: Wed, 1 Feb 2023 09:36:09 +0000 Subject: [PATCH v3] WIP: Syncrep and improving latency due to WAL throttling --- src/backend/access/heap/vacuumlazy.c | 7 +++- src/backend/access/transam/xlog.c | 55 +++++++++++++++++++++++++ src/backend/catalog/system_views.sql | 1 + src/backend/commands/explain.c | 5 +++ src/backend/executor/instrument.c | 2 + src/backend/replication/syncrep.c | 17 ++++++-- src/backend/tcop/postgres.c | 3 ++ src/backend/utils/activity/pgstat_wal.c | 2 + src/backend/utils/activity/wait_event.c | 3 ++ src/backend/utils/adt/pgstatfuncs.c | 10 +++-- src/backend/utils/init/globals.c | 1 + src/backend/utils/misc/guc_tables.c | 12 ++++++ src/include/access/xlog.h | 3 ++ src/include/catalog/pg_proc.dat | 6 +-- src/include/executor/instrument.h | 1 + src/include/miscadmin.h | 2 + src/include/pgstat.h | 3 +- src/include/utils/wait_event.h | 1 + src/test/regress/expected/rules.out | 3 +- 19 files changed, 124 insertions(+), 13 deletions(-) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 8f14cf85f3..df28a2397d 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -759,10 +759,15 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, (long long) PageMissOp, (long long) PageDirtyOp); appendStringInfo(&buf, - _("WAL usage: %lld records, %lld full page images, %llu bytes\n"), + _("WAL usage: %lld records, %lld full page images, %llu bytes"), (long long) walusage.wal_records, (long long) walusage.wal_fpi, (unsigned long long) walusage.wal_bytes); + if(walusage.wal_throttled > 0) + appendStringInfo(&buf, _("%lld times throttled"), (long long) walusage.wal_throttled); + else + appendStringInfo(&buf, _("\n")); + appendStringInfo(&buf, _("system usage: %s"), pg_rusage_show(&ru0)); ereport(verbose ? INFO : LOG, diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index fb4c860bde..8259c70f98 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1,4 +1,5 @@ /*------------------------------------------------------------------------- + * * xlog.c * PostgreSQL write-ahead log manager @@ -73,6 +74,7 @@ #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" +#include "portability/instr_time.h" #include "port/atomics.h" #include "port/pg_iovec.h" #include "postmaster/bgwriter.h" @@ -82,6 +84,7 @@ #include "replication/origin.h" #include "replication/slot.h" #include "replication/snapbuild.h" +#include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/bufmgr.h" @@ -108,6 +111,7 @@ #include "utils/timestamp.h" #include "utils/varlena.h" + extern uint32 bootstrap_data_checksum_version; /* timeline ID to be used when bootstrapping */ @@ -138,6 +142,7 @@ int wal_retrieve_retry_interval = 5000; int max_slot_wal_keep_size_mb = -1; int wal_decode_buffer_size = 512 * 1024; bool track_wal_io_timing = false; +int synchronous_commit_wal_throttle_threshold = 0; /* kb */ #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -252,10 +257,14 @@ static int LocalXLogInsertAllowed = -1; * parallel backends may have written WAL records at later LSNs than the value * stored here. The parallel leader advances its own copy, when necessary, * in WaitForParallelWorkersToFinish. + * + * XactLastThrottledRecEnd points to the last XLOG record that should be throttled + * as the additional WAL records could be generated before processing interrupts. */ XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr; +static XLogRecPtr XactLastThrottledRecEnd = InvalidXLogRecPtr; /* * RedoRecPtr is this backend's local copy of the REDO record pointer @@ -638,6 +647,8 @@ static bool holdingAllLocks = false; static MemoryContext walDebugCxt = NULL; #endif +uint32 backendWalInserted = 0; + static void CleanupAfterArchiveRecovery(TimeLineID EndOfLogTLI, XLogRecPtr EndOfLog, TimeLineID newTLI); @@ -1022,6 +1033,24 @@ XLogInsertRecord(XLogRecData *rdata, pgWalUsage.wal_bytes += rechdr->xl_tot_len; pgWalUsage.wal_records++; pgWalUsage.wal_fpi += num_fpi; + + /* WAL throttling: we can slow down (some) backends generating a lot of WAL + * in syncrep scenario by waiting for standby confirmation. This allows + * prioritzation of other backends over this backend in bandwidth constrained + * WAN scenarios. Such throttled down backends are going to be visible with + * "SyncRepThrottled" wait event. + */ + backendWalInserted += rechdr->xl_tot_len; + if ((synchronous_commit == SYNCHRONOUS_COMMIT_REMOTE_APPLY || + synchronous_commit == SYNCHRONOUS_COMMIT_REMOTE_WRITE) && + synchronous_commit_wal_throttle_threshold > 0 && + backendWalInserted > synchronous_commit_wal_throttle_threshold * 1024L) + { + XactLastThrottledRecEnd = XactLastRecEnd; + InterruptPending = true; + XLogDelayPending = true; + pgWalUsage.wal_throttled++; + } } return EndPos; @@ -2531,6 +2560,9 @@ XLogFlush(XLogRecPtr record) return; } + /* reset WAL throttling bytes counter */ + backendWalInserted = 0; + /* Quick exit if already known flushed */ if (record <= LogwrtResult.Flush) return; @@ -8952,3 +8984,26 @@ SetWalWriterSleeping(bool sleeping) XLogCtl->WalWriterSleeping = sleeping; SpinLockRelease(&XLogCtl->info_lck); } + + +/* + * Called from ProcessMessageInterrupts() to avoid waiting while + * being in critical section. Performing those directly from XLogInsertRecord() + * would cause locks to be held for longer duration. + */ +void +HandleXLogDelayPending() +{ + /* flush only up to the last fully filled page to avoid repeating flushing + * of the same page multiple times */ + XLogRecPtr LastFullyWrittenXLogPage = XactLastThrottledRecEnd - + (XactLastThrottledRecEnd % XLOG_BLCKSZ); + + Assert(synchronous_commit_wal_throttle_threshold > 0); + Assert(backendWalInserted > synchronous_commit_wal_throttle_threshold * 1024L); + Assert(XactLastThrottledRecEnd != InvalidXLogRecPtr); + + XLogFlush(LastFullyWrittenXLogPage); + SyncRepWaitForLSN(LastFullyWrittenXLogPage, false); + XLogDelayPending = false; +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 8608e3fa5b..32b368ebed 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1127,6 +1127,7 @@ CREATE VIEW pg_stat_wal AS w.wal_sync, w.wal_write_time, w.wal_sync_time, + w.wal_throttled, w.stats_reset FROM pg_stat_get_wal() w; diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 35c23bd27d..d9489f3e2d 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -3708,6 +3708,9 @@ show_wal_usage(ExplainState *es, const WalUsage *usage) if (usage->wal_bytes > 0) appendStringInfo(es->str, " bytes=" UINT64_FORMAT, usage->wal_bytes); + if (usage->wal_throttled > 0) + appendStringInfo(es->str, " throttled=%lld", + (long long) usage->wal_throttled); appendStringInfoChar(es->str, '\n'); } } @@ -3719,6 +3722,8 @@ show_wal_usage(ExplainState *es, const WalUsage *usage) usage->wal_fpi, es); ExplainPropertyUInteger("WAL Bytes", NULL, usage->wal_bytes, es); + ExplainPropertyUInteger("WAL Throttled", NULL, + usage->wal_throttled, es); } } diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index ee78a5749d..d87bad5675 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -274,6 +274,7 @@ WalUsageAdd(WalUsage *dst, WalUsage *add) dst->wal_bytes += add->wal_bytes; dst->wal_records += add->wal_records; dst->wal_fpi += add->wal_fpi; + dst->wal_throttled += add->wal_throttled; } void @@ -282,4 +283,5 @@ WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub) dst->wal_bytes += add->wal_bytes - sub->wal_bytes; dst->wal_records += add->wal_records - sub->wal_records; dst->wal_fpi += add->wal_fpi - sub->wal_fpi; + dst->wal_throttled += add->wal_throttled - sub->wal_throttled; } diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 80d681b71c..5c56dc3eff 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -145,6 +145,7 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); * to be flushed if synchronous_commit is set to the higher level of * remote_apply, because only commit records provide apply feedback. */ + void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) { @@ -155,9 +156,10 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) /* * This should be called while holding interrupts during a transaction * commit to prevent the follow-up shared memory queue cleanups to be - * influenced by external interruptions. + * influenced by external interruptions. The only exception is WAL throttling + * where this could be called without holding interrupts. */ - Assert(InterruptHoldoffCount > 0); + Assert(XLogDelayPending == true || InterruptHoldoffCount > 0); /* * Fast exit if user has not requested sync replication, or there are no @@ -236,6 +238,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) for (;;) { int rc; + uint32 wait_event; /* Must reset the latch before testing state. */ ResetLatch(MyLatch); @@ -289,12 +292,18 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) break; } + /* XLogDelayPending flag that is used here is being reset afterwards in + * in HandleXLogDelayPending() + */ + if(XLogDelayPending == true) + wait_event = WAIT_EVENT_SYNC_REP_THROTTLED; + else + wait_event = WAIT_EVENT_SYNC_REP; /* * Wait on latch. Any condition that should wake us up will set the * latch, so no need for timeout. */ - rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, - WAIT_EVENT_SYNC_REP); + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, wait_event); /* * If the postmaster dies, we'll probably never get an acknowledgment, diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 470b734e9e..9eaf2df1ce 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -3393,6 +3393,9 @@ ProcessInterrupts(void) if (ParallelApplyMessagePending) HandleParallelApplyMessages(); + + if (XLogDelayPending) + HandleXLogDelayPending(); } /* diff --git a/src/backend/utils/activity/pgstat_wal.c b/src/backend/utils/activity/pgstat_wal.c index e7a82b5fed..1303155ff8 100644 --- a/src/backend/utils/activity/pgstat_wal.c +++ b/src/backend/utils/activity/pgstat_wal.c @@ -90,6 +90,7 @@ pgstat_flush_wal(bool nowait) PendingWalStats.wal_records = diff.wal_records; PendingWalStats.wal_fpi = diff.wal_fpi; PendingWalStats.wal_bytes = diff.wal_bytes; + PendingWalStats.wal_throttled = diff.wal_throttled; if (!nowait) LWLockAcquire(&stats_shmem->lock, LW_EXCLUSIVE); @@ -105,6 +106,7 @@ pgstat_flush_wal(bool nowait) WALSTAT_ACC(wal_sync); WALSTAT_ACC(wal_write_time); WALSTAT_ACC(wal_sync_time); + WALSTAT_ACC(wal_throttled); #undef WALSTAT_ACC LWLockRelease(&stats_shmem->lock); diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index 6e4599278c..4271514520 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -457,6 +457,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_SYNC_REP: event_name = "SyncRep"; break; + case WAIT_EVENT_SYNC_REP_THROTTLED: + event_name = "SyncRepThrottled"; + break; case WAIT_EVENT_WAL_RECEIVER_EXIT: event_name = "WalReceiverExit"; break; diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 6737493402..2d1152188d 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -1251,7 +1251,7 @@ pg_stat_get_buf_alloc(PG_FUNCTION_ARGS) Datum pg_stat_get_wal(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_COLS 9 +#define PG_STAT_GET_WAL_COLS 10 TupleDesc tupdesc; Datum values[PG_STAT_GET_WAL_COLS] = {0}; bool nulls[PG_STAT_GET_WAL_COLS] = {0}; @@ -1276,7 +1276,9 @@ pg_stat_get_wal(PG_FUNCTION_ARGS) FLOAT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 8, "wal_sync_time", FLOAT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "wal_throttled", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -1303,7 +1305,9 @@ pg_stat_get_wal(PG_FUNCTION_ARGS) values[6] = Float8GetDatum(((double) wal_stats->wal_write_time) / 1000.0); values[7] = Float8GetDatum(((double) wal_stats->wal_sync_time) / 1000.0); - values[8] = TimestampTzGetDatum(wal_stats->stat_reset_timestamp); + values[8] = Int64GetDatum(wal_stats->wal_throttled); + + values[9] = TimestampTzGetDatum(wal_stats->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 1b1d814254..0941269cf4 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -40,6 +40,7 @@ volatile sig_atomic_t IdleStatsUpdateTimeoutPending = false; volatile uint32 InterruptHoldoffCount = 0; volatile uint32 QueryCancelHoldoffCount = 0; volatile uint32 CritSectionCount = 0; +bool XLogDelayPending = false; int MyProcPid; pg_time_t MyStartTime; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index c5a95f5dcc..9bc15be914 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -2831,6 +2831,18 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"synchronous_commit_wal_throttle_threshold", PGC_USERSET, REPLICATION_SENDING, + gettext_noop("Sets the maximum amount of WAL in kilobytes a backend generates " + "which it waits for synchronous commit confiration even without commit"), + NULL, + GUC_UNIT_KB + }, + &synchronous_commit_wal_throttle_threshold, + 0, 0, MAX_KILOBYTES, + NULL, NULL, NULL + }, + { {"extra_float_digits", PGC_USERSET, CLIENT_CONN_LOCALE, gettext_noop("Sets the number of digits displayed for floating-point values."), diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index cfe5409738..c9ca29d087 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -51,6 +51,8 @@ extern PGDLLIMPORT char *wal_consistency_checking_string; extern PGDLLIMPORT bool log_checkpoints; extern PGDLLIMPORT bool track_wal_io_timing; extern PGDLLIMPORT int wal_decode_buffer_size; +extern PGDLLIMPORT int synchronous_commit_wal_throttle_threshold; +extern PGDLLIMPORT uint32 backendWalInserted; extern PGDLLIMPORT int CheckPointSegments; @@ -246,6 +248,7 @@ extern TimeLineID GetWALInsertionTimeLine(void); extern XLogRecPtr GetLastImportantRecPtr(void); extern void SetWalWriterSleeping(bool sleeping); +extern void HandleXLogDelayPending(void); /* * Routines used by xlogrecovery.c to call back into xlog.c during recovery. diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index c0f2a8a77c..44f601c3ec 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5720,9 +5720,9 @@ { oid => '1136', descr => 'statistics: information about WAL activity', proname => 'pg_stat_get_wal', proisstrict => 'f', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{int8,int8,numeric,int8,int8,int8,float8,float8,timestamptz}', - proargmodes => '{o,o,o,o,o,o,o,o,o}', - proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,wal_write,wal_sync,wal_write_time,wal_sync_time,stats_reset}', + proallargtypes => '{int8,int8,numeric,int8,int8,int8,float8,float8,int8,timestamptz}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o}', + proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,wal_write,wal_sync,wal_write_time,wal_sync_time,wal_throttled,stats_reset}', prosrc => 'pg_stat_get_wal' }, { oid => '6248', descr => 'statistics: information about WAL prefetching', proname => 'pg_stat_get_recovery_prefetch', prorows => '1', proretset => 't', diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index 87e5e2183b..b3e4e86256 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -51,6 +51,7 @@ typedef struct WalUsage int64 wal_records; /* # of WAL records produced */ int64 wal_fpi; /* # of WAL full page images produced */ uint64 wal_bytes; /* size of WAL records produced */ + int64 wal_throttled; /* # of times WAL throttling was engaged*/ } WalUsage; /* Flag bits included in InstrAlloc's instrument_options bitmask */ diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 96b3a1e1a0..aaac055e1d 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -95,6 +95,8 @@ extern PGDLLIMPORT volatile sig_atomic_t IdleSessionTimeoutPending; extern PGDLLIMPORT volatile sig_atomic_t ProcSignalBarrierPending; extern PGDLLIMPORT volatile sig_atomic_t LogMemoryContextPending; extern PGDLLIMPORT volatile sig_atomic_t IdleStatsUpdateTimeoutPending; +/* this doesn't need to be volatile sig_atomic_t as it set in XLogInsertRecord() */ +extern PGDLLIMPORT bool XLogDelayPending; extern PGDLLIMPORT volatile sig_atomic_t CheckClientConnectionPending; extern PGDLLIMPORT volatile sig_atomic_t ClientConnectionLost; diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 5e3326a3b9..c7ad997fc7 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -242,7 +242,7 @@ typedef struct PgStat_TableXactStatus * ------------------------------------------------------------ */ -#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA9 +#define PGSTAT_FILE_FORMAT_ID 0x01A5BCAA typedef struct PgStat_ArchiverStats { @@ -392,6 +392,7 @@ typedef struct PgStat_WalStats PgStat_Counter wal_sync; PgStat_Counter wal_write_time; PgStat_Counter wal_sync_time; + PgStat_Counter wal_throttled; /* how many times backend was throttled */ TimestampTz stat_reset_timestamp; } PgStat_WalStats; diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 6cacd6edaf..2515b148d6 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -128,6 +128,7 @@ typedef enum WAIT_EVENT_RESTORE_COMMAND, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP, + WAIT_EVENT_SYNC_REP_THROTTLED, WAIT_EVENT_WAL_RECEIVER_EXIT, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WAIT_EVENT_XACT_GROUP_UPDATE diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index e7a2f5856a..d11ae5123b 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2203,8 +2203,9 @@ pg_stat_wal| SELECT wal_records, wal_sync, wal_write_time, wal_sync_time, + wal_throttled, stats_reset - FROM pg_stat_get_wal() w(wal_records, wal_fpi, wal_bytes, wal_buffers_full, wal_write, wal_sync, wal_write_time, wal_sync_time, stats_reset); + FROM pg_stat_get_wal() w(wal_records, wal_fpi, wal_bytes, wal_buffers_full, wal_write, wal_sync, wal_write_time, wal_sync_time, wal_throttled, stats_reset); pg_stat_wal_receiver| SELECT pid, status, receive_start_lsn, -- 2.30.2