Re: [HACKERS] make async slave to wait for lsn to be replayed - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: [HACKERS] make async slave to wait for lsn to be replayed
Date
Msg-id 20210121.173009.235021120161403875.horikyota.ntt@gmail.com
Whole thread Raw
In response to Re: [HACKERS] make async slave to wait for lsn to be replayed  (a.pervushina@postgrespro.ru)
Responses Re: [HACKERS] make async slave to wait for lsn to be replayed  (Ibrar Ahmed <ibrar.ahmad@gmail.com>)
List pgsql-hackers
Hello.

At Wed, 18 Nov 2020 15:05:00 +0300, a.pervushina@postgrespro.ru wrote in 
> I've changed the BEGIN WAIT FOR LSN statement to core functions
> pg_waitlsn, pg_waitlsn_infinite and pg_waitlsn_no_wait.
> Currently the functions work inside repeatable read transactions, but
> waitlsn creates a snapshot if called first in a transaction block,
> which can possibly lead the transaction to working incorrectly, so the
> function gives a warning.

According to the discussion here, implementing this as functions is not
optimal. As a PoC, I implemented it as a procedure. I'm not sure this
is the correct way to implement a native procedure, but it seems to
work as expected.

> Usage examples
> ==========
> select pg_waitlsn(‘LSN’, timeout);
> select pg_waitlsn_infinite(‘LSN’);
> select pg_waitlsn_no_wait(‘LSN’);

The first and second usages are covered by a single procedure. The last
function is equivalent to pg_last_wal_replay_lsn(). As a result, the
attached patch provides the following procedure:

pg_waitlsn(wait_lsn pg_lsn, timeout integer DEFAULT -1)

Any opinions, particularly on this approach compared to implementing it as a command?

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 470e113b33..4283b98eb4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
 #include "catalog/pg_database.h"
 #include "commands/progress.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/controldata_utils.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
@@ -7463,6 +7464,15 @@ StartupXLOG(void)
                     break;
                 }
 
+                /*
+                 * If we replayed an LSN that someone was waiting for,
+                 * set latches in shared memory array to notify the waiter.
+                 */
+                if (XLogCtl->lastReplayedEndRecPtr >= GetMinWaitedLSN())
+                {
+                    WaitSetLatch(XLogCtl->lastReplayedEndRecPtr);
+                }
+
                 /* Else, try to fetch the next WAL record */
                 record = ReadRecord(xlogreader, LOG, false);
             } while (record != NULL);
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fa58afd9d7..c19d49e7a4 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1460,6 +1460,10 @@ LANGUAGE internal
 STRICT IMMUTABLE PARALLEL SAFE
 AS 'unicode_is_normalized';
 
+CREATE OR REPLACE PROCEDURE
+  pg_waitlsn(wait_lsn pg_lsn, timeout integer DEFAULT -1)
+  LANGUAGE internal AS 'pg_waitlsn';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index e8504f0ae4..2c0bd41336 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -60,6 +60,7 @@ OBJS = \
     user.o \
     vacuum.o \
     variable.o \
-    view.o
+    view.o \
+    wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index f9bbe97b50..959e96b7e0 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -23,6 +23,7 @@
 #include "access/syncscan.h"
 #include "access/twophase.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -149,6 +150,7 @@ CreateSharedMemoryAndSemaphores(void)
         size = add_size(size, BTreeShmemSize());
         size = add_size(size, SyncScanShmemSize());
         size = add_size(size, AsyncShmemSize());
+        size = add_size(size, WaitShmemSize());
 #ifdef EXEC_BACKEND
         size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -268,6 +270,11 @@ CreateSharedMemoryAndSemaphores(void)
     SyncScanShmemInit();
     AsyncShmemInit();
 
+    /*
+     * Init array of events for the wait clause in shared memory
+     */
+    WaitShmemInit();
+
 #ifdef EXEC_BACKEND
 
     /*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index c87ffc6549..2b4d73ba2f 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -38,6 +38,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -713,6 +714,9 @@ LockErrorCleanup(void)
 
     AbortStrongLockAcquire();
 
+    /* If waitlsn was interrupted, then stop waiting for that LSN */
+    DeleteWaitedLSN();
+
     /* Nothing to do if we weren't waiting for a lock */
     if (lockAwaited == NULL)
     {
diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c
index 4096faff9a..90876da120 100644
--- a/src/backend/utils/adt/misc.c
+++ b/src/backend/utils/adt/misc.c
@@ -373,8 +373,6 @@ pg_sleep(PG_FUNCTION_ARGS)
      * less than the specified time when WaitLatch is terminated early by a
      * non-query-canceling signal such as SIGHUP.
      */
-#define GetNowFloat()    ((float8) GetCurrentTimestamp() / 1000000.0)
-
     endtime = GetNowFloat() + secs;
 
     for (;;)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b5f52d4e4a..918eaedfd5 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11375,4 +11375,8 @@
   proname => 'is_normalized', prorettype => 'bool', proargtypes => 'text text',
   prosrc => 'unicode_is_normalized' },
 
+{ oid => '9313', descr => 'wait for LSN to be replayed',
+  proname => 'pg_waitlsn',  prokind => 'p',prorettype => 'void', proargtypes => 'pg_lsn int4',
+  proargnames => '{wait_lsn,timeout}',
+  prosrc => 'pg_waitlsn' }
 ]
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 63bf71ac61..6c4ecd704d 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -113,4 +113,6 @@ extern int    date2isoyearday(int year, int mon, int mday);
 
 extern bool TimestampTimestampTzRequiresRewrite(void);
 
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+
 #endif                            /* TIMESTAMP_H */

pgsql-hackers by date:

Previous
From: Noah Misch
Date:
Subject: Re: Wrong usage of RelationNeedsWAL
Next
From: Amit Kapila
Date:
Subject: Re: Parallel INSERT (INTO ... SELECT ...)