Re: [HACKERS] make async slave to wait for lsn to be replayed - Mailing list pgsql-hackers
From | Kyotaro Horiguchi |
---|---|
Subject | Re: [HACKERS] make async slave to wait for lsn to be replayed |
Date | |
Msg-id | 20210322.140510.419001276411171108.horikyota.ntt@gmail.com Whole thread Raw |
In response to | Re: [HACKERS] make async slave to wait for lsn to be replayed (Ibrar Ahmed <ibrar.ahmad@gmail.com>) |
List | pgsql-hackers |
At Thu, 18 Mar 2021 18:57:15 +0500, Ibrar Ahmed <ibrar.ahmad@gmail.com> wrote in > On Thu, Jan 21, 2021 at 1:30 PM Kyotaro Horiguchi <horikyota.ntt@gmail.com> > wrote: > > > Hello. > > > > At Wed, 18 Nov 2020 15:05:00 +0300, a.pervushina@postgrespro.ru wrote in > > > I've changed the BEGIN WAIT FOR LSN statement to core functions > > > pg_waitlsn, pg_waitlsn_infinite and pg_waitlsn_no_wait. > > > Currently the functions work inside repeatable read transactions, but > > > waitlsn creates a snapshot if called first in a transaction block, > > > which can possibly lead the transaction to working incorrectly, so the > > > function gives a warning. > > > > According to the discussion here, implementing as functions is not > > optimal. As a PoC, I made it as a procedure. However I'm not sure it > > is the correct implementation as a native procedure but it seems working as > > expected. > > > > > Usage examples > > > ========== > > > select pg_waitlsn('LSN', timeout); > > > select pg_waitlsn_infinite('LSN'); > > > select pg_waitlsn_no_wait('LSN'); > > > > The first and second usage is covered by a single procedure. The last > > function is equivalent to pg_last_wal_replay_lsn(). As the result, the > > following procedure is provided in the attached. > > > > pg_waitlsn(wait_lsn pg_lsn, timeout integer DEFAULT -1) > > > > Any opinions mainly compared to implementation as a command? > > > > regards. > > > > -- > > Kyotaro Horiguchi > > NTT Open Source Software Center > > > > The patch (pg_waitlsn_v10_2_kh.patch) does not compile successfully and has > compilation errors. Can you please take a look? > > https://cirrus-ci.com/task/6241565996744704 > > xlog.c:45:10: fatal error: commands/wait.h: No such file or directory > #include "commands/wait.h" > ^~~~~~~~~~~~~~~~~ > compilation terminated. > make[4]: *** [<builtin>: xlog.o] Error 1 > make[4]: *** Waiting for unfinished jobs.... 
> make[3]: *** [../../../src/backend/common.mk:39: transam-recursive] Error 2 > make[2]: *** [common.mk:39: access-recursive] Error 2 > make[1]: *** [Makefile:42: all-backend-recurse] Error 2 > make: *** [GNUmakefile:11: all-src-recurse] Error 2 > > I am changing the status to "Waiting on Author" Anna is the author. The "patch" was just to show how we can implement the feature as a procedure. (Sorry for the bad mistake I made.) The patch still applies to the master. So I am resending just the rebased version as v10_2, and have attached the "PoC" as *.txt which applies on top of the patch. regards. -- Kyotaro Horiguchi NTT Open Source Software Center diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6f8810e149..3c580083dd 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -42,6 +42,7 @@ #include "catalog/pg_database.h" #include "commands/progress.h" #include "commands/tablespace.h" +#include "commands/wait.h" #include "common/controldata_utils.h" #include "executor/instrument.h" #include "miscadmin.h" @@ -7535,6 +7536,15 @@ StartupXLOG(void) break; } + /* + * If we replayed an LSN that someone was waiting for, + * set latches in shared memory array to notify the waiter. 
+ */ + if (XLogCtl->lastReplayedEndRecPtr >= GetMinWaitedLSN()) + { + WaitSetLatch(XLogCtl->lastReplayedEndRecPtr); + } + /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogreader, LOG, false); } while (record != NULL); diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index e8504f0ae4..2c0bd41336 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -60,6 +60,7 @@ OBJS = \ user.o \ vacuum.o \ variable.o \ - view.o + view.o \ + wait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c new file mode 100644 index 0000000000..1f2483672b --- /dev/null +++ b/src/backend/commands/wait.c @@ -0,0 +1,297 @@ +/*------------------------------------------------------------------------- + * + * wait.c + * Implements waitlsn, which allows waiting for events such as + * LSN having been replayed on replica. + * + * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group + * Portions Copyright (c) 2020, Regents of PostgresPro + * + * IDENTIFICATION + * src/backend/commands/wait.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <math.h> + +#include "access/xact.h" +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "commands/wait.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "storage/backendid.h" +#include "storage/pmsignal.h" +#include "storage/proc.h" +#include "storage/shmem.h" +#include "storage/sinvaladt.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/pg_lsn.h" +#include "utils/timestamp.h" + +/* Add to shared memory array */ +static void AddWaitedLSN(XLogRecPtr lsn_to_wait); + +/* Shared memory structure */ +typedef struct +{ + int backend_maxid; + pg_atomic_uint64 min_lsn; /* XLogRecPtr of minimal waited for LSN */ + slock_t mutex; + /* LSNs that different backends are waiting */ + 
XLogRecPtr lsn[FLEXIBLE_ARRAY_MEMBER]; +} WaitState; + +static WaitState *state; + +/* + * Add the wait event of the current backend to shared memory array + */ +static void +AddWaitedLSN(XLogRecPtr lsn_to_wait) +{ + SpinLockAcquire(&state->mutex); + if (state->backend_maxid < MyBackendId) + state->backend_maxid = MyBackendId; + + state->lsn[MyBackendId] = lsn_to_wait; + + if (lsn_to_wait < state->min_lsn.value) + state->min_lsn.value = lsn_to_wait; + SpinLockRelease(&state->mutex); +} + +/* + * Delete wait event of the current backend from the shared memory array. + */ +void +DeleteWaitedLSN(void) +{ + int i; + XLogRecPtr lsn_to_delete; + + SpinLockAcquire(&state->mutex); + + lsn_to_delete = state->lsn[MyBackendId]; + state->lsn[MyBackendId] = InvalidXLogRecPtr; + + /* If we are deleting the minimal LSN, then choose the next min_lsn */ + if (lsn_to_delete != InvalidXLogRecPtr && + lsn_to_delete == state->min_lsn.value) + { + state->min_lsn.value = PG_UINT64_MAX; + for (i = 2; i <= state->backend_maxid; i++) + if (state->lsn[i] != InvalidXLogRecPtr && + state->lsn[i] < state->min_lsn.value) + state->min_lsn.value = state->lsn[i]; + } + + /* If deleting from the end of the array, shorten the array's used part */ + if (state->backend_maxid == MyBackendId) + for (i = (MyBackendId); i >= 2; i--) + if (state->lsn[i] != InvalidXLogRecPtr) + { + state->backend_maxid = i; + break; + } + + SpinLockRelease(&state->mutex); +} + +/* + * Report amount of shared memory space needed for WaitState + */ +Size +WaitShmemSize(void) +{ + Size size; + + size = offsetof(WaitState, lsn); + size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr))); + return size; +} + +/* + * Initialize an array of events to wait for in shared memory + */ +void +WaitShmemInit(void) +{ + bool found; + uint32 i; + + state = (WaitState *) ShmemInitStruct("pg_wait_lsn", + WaitShmemSize(), + &found); + if (!found) + { + SpinLockInit(&state->mutex); + + for (i = 0; i < (MaxBackends + 1); i++) + 
state->lsn[i] = InvalidXLogRecPtr; + + state->backend_maxid = 0; + state->min_lsn.value = PG_UINT64_MAX; + } +} + +/* + * Set latches in shared memory to signal that new LSN has been replayed + */ +void +WaitSetLatch(XLogRecPtr cur_lsn) +{ + uint32 i; + int backend_maxid; + PGPROC *backend; + + SpinLockAcquire(&state->mutex); + backend_maxid = state->backend_maxid; + + for (i = 2; i <= backend_maxid; i++) + { + backend = BackendIdGetProc(i); + + if (backend && state->lsn[i] != 0 && + state->lsn[i] <= cur_lsn) + { + SetLatch(&backend->procLatch); + } + } + SpinLockRelease(&state->mutex); +} + +/* + * Get minimal LSN that someone waits for + */ +XLogRecPtr +GetMinWaitedLSN(void) +{ + return state->min_lsn.value; +} + +/* + * On WAIT use a latch to wait till LSN is replayed, + * postmaster dies or timeout happens. Timeout is specified in milliseconds. + * Returns true if LSN was reached and false otherwise. + */ +bool +WaitUtility(XLogRecPtr target_lsn, const int timeout_ms) +{ + XLogRecPtr cur_lsn = GetXLogReplayRecPtr(NULL); + int latch_events; + float8 endtime; + bool res = false; + bool wait_forever = (timeout_ms <= 0); + + if (!RecoveryInProgress()) { + ereport(ERROR, + errmsg("Cannot use waitlsn on primary")); + return false; + } + + /* + * In transactions, that have isolation level repeatable read or higher + * waitlsn creates a snapshot if called first in a block, which can + * lead the transaction to working incorrectly + */ + + if (IsTransactionBlock() && XactIsoLevel != XACT_READ_COMMITTED) { + ereport(WARNING, + errmsg("Waitlsn may work incorrectly in this isolation level"), + errhint("Call waitlsn before starting the transaction")); + } + + endtime = GetNowFloat() + timeout_ms / 1000.0; + + latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH; + + /* Check if we already reached the needed LSN */ + if (cur_lsn >= target_lsn) + return true; + + AddWaitedLSN(target_lsn); + + for (;;) + { + int rc; + float8 time_left = 0; + long time_left_ms = 0; + 
+ time_left = endtime - GetNowFloat(); + + /* Use 100 ms as the default timeout to check for interrupts */ + if (wait_forever || time_left < 0 || time_left > 0.1) + time_left_ms = 100; + else + time_left_ms = (long) ceil(time_left * 1000.0); + + /* If interrupt, LockErrorCleanup() will do DeleteWaitedLSN() for us */ + CHECK_FOR_INTERRUPTS(); + + /* If postmaster dies, finish immediately */ + if (!PostmasterIsAlive()) + break; + + rc = WaitLatch(MyLatch, latch_events, time_left_ms, + WAIT_EVENT_CLIENT_READ); + + ResetLatch(MyLatch); + + if (rc & WL_LATCH_SET) + cur_lsn = GetXLogReplayRecPtr(NULL); + + if (rc & WL_TIMEOUT) + { + cur_lsn = GetXLogReplayRecPtr(NULL); + /* If the time specified by user has passed, stop waiting */ + time_left = endtime - GetNowFloat(); + if (!wait_forever && time_left <= 0.0) + break; + } + + /* If LSN has been replayed */ + if (target_lsn <= cur_lsn) + break; + } + + DeleteWaitedLSN(); + + if (cur_lsn < target_lsn) + ereport(WARNING, + errmsg("LSN was not reached"), + errhint("Try to increase wait time.")); + else + res = true; + + return res; +} + +Datum +pg_waitlsn(PG_FUNCTION_ARGS) +{ + XLogRecPtr trg_lsn = PG_GETARG_LSN(0); + uint64_t delay = PG_GETARG_INT32(1); + + PG_RETURN_BOOL(WaitUtility(trg_lsn, delay)); +} + +Datum +pg_waitlsn_infinite(PG_FUNCTION_ARGS) +{ + XLogRecPtr trg_lsn = PG_GETARG_LSN(0); + + PG_RETURN_BOOL(WaitUtility(trg_lsn, 0)); +} + +Datum +pg_waitlsn_no_wait(PG_FUNCTION_ARGS) +{ + XLogRecPtr trg_lsn = PG_GETARG_LSN(0); + + PG_RETURN_BOOL(WaitUtility(trg_lsn, 1)); +} \ No newline at end of file diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 3e4ec53a97..fb8f8588a7 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -23,6 +23,7 @@ #include "access/syncscan.h" #include "access/twophase.h" #include "commands/async.h" +#include "commands/wait.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -150,6 +151,7 @@ 
CreateSharedMemoryAndSemaphores(void) size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, WaitShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -270,6 +272,11 @@ CreateSharedMemoryAndSemaphores(void) SyncScanShmemInit(); AsyncShmemInit(); + /* + * Init array of events for the wait clause in shared memory + */ + WaitShmemInit(); + #ifdef EXEC_BACKEND /* diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 897045ee27..540991146a 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -38,6 +38,7 @@ #include "access/transam.h" #include "access/twophase.h" #include "access/xact.h" +#include "commands/wait.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -716,6 +717,9 @@ LockErrorCleanup(void) AbortStrongLockAcquire(); + /* If waitlsn was interrupted, then stop waiting for that LSN */ + DeleteWaitedLSN(); + /* Nothing to do if we weren't waiting for a lock */ if (lockAwaited == NULL) { diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c index 634f574d7e..50c836fdb7 100644 --- a/src/backend/utils/adt/misc.c +++ b/src/backend/utils/adt/misc.c @@ -375,8 +375,6 @@ pg_sleep(PG_FUNCTION_ARGS) * less than the specified time when WaitLatch is terminated early by a * non-query-canceling signal such as SIGHUP. 
*/ -#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0) - endtime = GetNowFloat() + secs; for (;;) diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index e259531f60..c11387961e 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11411,4 +11411,19 @@ proname => 'is_normalized', prorettype => 'bool', proargtypes => 'text text', prosrc => 'unicode_is_normalized' }, +{ oid => '16387', descr => 'wait for LSN until timeout', + proname => 'pg_waitlsn', prorettype => 'bool', proargtypes => 'pg_lsn int8', + proargnames => '{trg_lsn,delay}', + prosrc => 'pg_waitlsn' }, + +{ oid => '16388', descr => 'wait for LSN for an infinite time', + proname => 'pg_waitlsn_infinite', prorettype => 'bool', proargtypes => 'pg_lsn', + proargnames => '{trg_lsn}', + prosrc => 'pg_waitlsn_infinite' }, + +{ oid => '16389', descr => 'wait for LSN with no timeout', + proname => 'pg_waitlsn_no_wait', prorettype => 'bool', proargtypes => 'pg_lsn', + proargnames => '{trg_lsn}', + prosrc => 'pg_waitlsn_no_wait' }, + ] diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h new file mode 100644 index 0000000000..fd21e43416 --- /dev/null +++ b/src/include/commands/wait.h @@ -0,0 +1,26 @@ +/*------------------------------------------------------------------------- + * + * wait.h + * prototypes for commands/wait.c + * + * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group + * Portions Copyright (c) 2020, Regents of PostgresPro + * + * src/include/commands/wait.h + * + *------------------------------------------------------------------------- + */ +#ifndef WAIT_H +#define WAIT_H +#include "postgres.h" +#include "tcop/dest.h" +#include "nodes/parsenodes.h" + +extern bool WaitUtility(XLogRecPtr lsn, const int timeout_ms); +extern Size WaitShmemSize(void); +extern void WaitShmemInit(void); +extern void WaitSetLatch(XLogRecPtr cur_lsn); +extern XLogRecPtr GetMinWaitedLSN(void); +extern void 
DeleteWaitedLSN(void); + +#endif /* WAIT_H */ diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index 63bf71ac61..6c4ecd704d 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -113,4 +113,6 @@ extern int date2isoyearday(int year, int mon, int mday); extern bool TimestampTimestampTzRequiresRewrite(void); +#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0) + #endif /* TIMESTAMP_H */ diff --git a/src/test/recovery/t/021_waitlsn.pl b/src/test/recovery/t/021_waitlsn.pl new file mode 100644 index 0000000000..81dd70ef96 --- /dev/null +++ b/src/test/recovery/t/021_waitlsn.pl @@ -0,0 +1,91 @@ +# Checks waitlsn +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 11; + +# Initialize primary node +my $node_primary = get_new_node('primary'); +$node_primary->init(allows_streaming => 1); +$node_primary->start; + +# And some content and take a backup +$node_primary->safe_psql('postgres', + "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a"); +my $backup_name = 'my_backup'; +$node_primary->backup($backup_name); + +# Using the backup, create a streaming standby with a 1 second delay +my $node_standby = get_new_node('standby'); +my $delay = 1; +$node_standby->init_from_backup($node_primary, $backup_name, + has_streaming => 1); +$node_standby->append_conf('postgresql.conf', qq[ + recovery_min_apply_delay = '${delay}s' +]); +$node_standby->start; + +# Check that timeouts make us wait for the specified time (1s here) +my $current_time = $node_standby->safe_psql('postgres', "SELECT now()"); +my $two_seconds = 2000; # in milliseconds +my $start_time = time(); +$node_standby->safe_psql('postgres', + "SELECT pg_waitlsn('0/FFFFFFFF', $two_seconds)"); +my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds +ok($time_waited >= $two_seconds, "waitlsn waits for enough time"); + +# Check that timeouts let us stop waiting right away, before reaching target LSN 
+$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(11, 20))"); +my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()"); +my ($ret, $out, $err) = $node_standby->psql('postgres', + "SELECT pg_waitlsn('$lsn1', 1)"); + +ok($ret == 0, "zero return value when failed to waitlsn on standby"); +ok($err =~ /WARNING: LSN was not reached/, + "correct error message when failed to waitlsn on standby"); +ok($out eq "f", "if given too little wait time, WAIT doesn't reach target LSN"); + + +# Check that waitlsn works fine and reaches target LSN if given no timeout + +# Add data on primary, memorize primary's last LSN +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(21, 30))"); +my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()"); + +# Wait for it to appear on replica, memorize replica's last LSN +$node_standby->safe_psql('postgres', + "SELECT pg_waitlsn_infinite('$lsn2')"); +my $reached_lsn = $node_standby->safe_psql('postgres', + "SELECT pg_last_wal_replay_lsn()"); + +# Make sure that primary's and replica's LSNs are the same after WAIT +my $compare_lsns = $node_standby->safe_psql('postgres', + "SELECT pg_lsn_cmp('$reached_lsn'::pg_lsn, '$lsn2'::pg_lsn)"); +ok($compare_lsns eq 0, + "standby reached the same LSN as primary before starting transaction"); + + +# Make sure that it's not allowed to use waitlsn on primary +($ret, $out, $err) = $node_primary->psql('postgres', + "SELECT pg_waitlsn_infinite('0/FFFFFFFF')"); + +ok($ret != 0, "non-zero return value when trying to waitlsn on primary"); +ok($err =~ /ERROR: Cannot use waitlsn on primary/, + "correct error message when trying to waitlsn on primary"); +ok($out eq '', "empty output when trying to waitlsn on primary"); + +# Make sure that waitlsn gives a warning inside a read commited transaction + +($ret, $out, $err) = $node_standby->psql('postgres', + "BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT 
pg_waitlsn_no_wait('0/FFFFFFFF')"); +ok($ret == 0, "zero return value when trying to waitlsn in transaction"); +ok($err =~ /WARNING: Waitlsn may work incorrectly in this isolation level/, + "correct warning message when trying to waitlsn in transaction"); +ok($out eq "f", "non empty output when trying to waitlsn in transaction"); + +$node_standby->stop; +$node_primary->stop; \ No newline at end of file diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 1d1d5d2f0e..6075ee5e77 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2730,6 +2730,7 @@ WaitEventIPC WaitEventSet WaitEventTimeout WaitPMResult +WaitState WalCloseMethod WalLevel WalRcvData diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 0dca65dc7b..635508639a 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1474,6 +1474,10 @@ LANGUAGE internal STRICT IMMUTABLE PARALLEL SAFE AS 'unicode_is_normalized'; +CREATE OR REPLACE PROCEDURE + pg_waitlsn(wait_lsn pg_lsn, timeout integer DEFAULT -1) + LANGUAGE internal AS 'pg_waitlsn'; + -- -- The default permissions for functions mean that anyone can execute them. -- A number of functions shouldn't be executable by just anyone, but rather diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index c11387961e..7f25938cbc 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11426,4 +11426,8 @@ proargnames => '{trg_lsn}', prosrc => 'pg_waitlsn_no_wait' }, +{ oid => '9313', descr => 'wait for LSN to be replayed', + proname => 'pg_waitlsn', prokind => 'p',prorettype => 'void', proargtypes => 'pg_lsn int4', + proargnames => '{wait_lsn,timeout}', + prosrc => 'pg_waitlsn' } ]
pgsql-hackers by date: