Re: BUG #16481: Stored Procedure Triggered by Logical Replication is Unable to use Notification Events - Mailing list pgsql-bugs

From Kyotaro Horiguchi
Subject Re: BUG #16481: Stored Procedure Triggered by Logical Replication is Unable to use Notification Events
Msg-id 20200609.150921.2296867674612794898.horikyota.ntt@gmail.com
In response to Re: BUG #16481: Stored Procedure Triggered by Logical Replication is Unable to use Notification Events  (Euler Taveira <euler.taveira@2ndquadrant.com>)
List pgsql-bugs
Hello, Euler.

At Mon, 8 Jun 2020 07:51:18 -0300, Euler Taveira <euler.taveira@2ndquadrant.com> wrote in 
> On Mon, 8 Jun 2020 at 05:27, Kyotaro Horiguchi <horikyota.ntt@gmail.com>
> wrote:
> 
> >
> > That can be fixed by calling ProcessCompletedNotifies() in
> > apply_handle_commit. The function has code to write out
> > notifications to connected clients, but it does nothing in logical
> > replication workers.
> >
> >
> This bug was already reported some time ago (#15293) but it slipped through
> the cracks. I don't think you should simply call ProcessCompletedNotifies [1].

Yeah, thanks for pointing that out. I had vaguely been thinking of
something similar to what was discussed there. Simply calling
ProcessCompletedNotifies in apply_handle_commit is indeed wrong.

We can move just SignalBackends() into AtCommit_Notify, since
asyncQueueAdvanceTail() no longer depends on the result of
SignalBackends. However, we still need to call asyncQueueAdvanceTail
in AtCommit_Notify and AtAbort_Notify, since otherwise the queue
cannot be shortened while logical replication is running. This may
slightly defer tail advancement, but I don't think that would be a
significant problem.

> [1] https://www.postgresql.org/message-id/13844.1532468610%40sss.pgh.pa.us

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From c3aa3e584cf57632284dc9b282dd635c418f3084 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Tue, 9 Jun 2020 14:01:34 +0900
Subject: [PATCH v2] Fix notification signaling

Notifications are signaled only from the top-level command loop, so
the logical replication apply loop, which commits without returning
there, never signals them.  To fix, send the signals in
AtCommit_Notify instead of the top-level command loop.

Discussion: https://www.postgresql.org/message-id/13844.1532468610%40sss.pgh.pa.us
Discussion: https://www.postgresql.org/message-id/20200608.172730.68580977059033.horikyota.ntt%40gmail.com
---
 src/backend/commands/async.c | 103 +++++++++++++++++++++--------------
 1 file changed, 63 insertions(+), 40 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 71b7577afc..590ad7fcc8 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -449,7 +449,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
 static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
-static void SignalBackends(void);
+static bool SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                          QueuePosition stop,
@@ -976,7 +976,8 @@ PreCommit_Notify(void)
  *
  *        This is called at transaction commit, after committing to clog.
  *
- *        Update listenChannels and clear transaction-local state.
+ *        Update listenChannels and clear transaction-local state. Also signal
+ *        other backends so that they process our notifications.
  */
 void
 AtCommit_Notify(void)
@@ -1021,6 +1022,29 @@ AtCommit_Notify(void)
 
     /* And clean up */
     ClearPendingActionsAndNotifies();
+
+    /* signal our notifications to other backends */
+    if (backendHasSentNotifications)
+    {
+        /*
+         * No use reading the queue at idle time later if this backend is not a
+         * listener.
+         */
+        if (!SignalBackends())
+            backendHasSentNotifications = false;
+
+        /*
+         * If it's time to try to advance the global tail pointer, do that. We
+         * need to do this here for the case where many transactions are
+         * committed without returning to the top-level loop, as in the
+         * logical replication apply loop.
+         */
+        if (backendTryAdvanceTail)
+        {
+            backendTryAdvanceTail = false;
+            asyncQueueAdvanceTail();
+        }
+    }
 }
 
 /*
@@ -1196,10 +1220,8 @@ Exec_UnlistenAllCommit(void)
  *
  * This is called from postgres.c just before going idle at the completion
  * of a transaction.  If we issued any notifications in the just-completed
- * transaction, send signals to other backends to process them, and also
- * process the queue ourselves to send messages to our own frontend.
- * Also, if we filled enough queue pages with new notifies, try to advance
- * the queue tail pointer.
+ * transaction, process the queue ourselves to send messages to our own
+ * frontend.
  *
  * The reason that this is not done in AtCommit_Notify is that there is
  * a nonzero chance of errors here (for example, encoding conversion errors
@@ -1208,17 +1230,11 @@ Exec_UnlistenAllCommit(void)
  * to ensure that a transaction's self-notifies are delivered to the frontend
  * before it gets the terminating ReadyForQuery message.
  *
- * Note that we send signals and process the queue even if the transaction
- * eventually aborted.  This is because we need to clean out whatever got
- * added to the queue.
- *
  * NOTE: we are outside of any transaction here.
  */
 void
 ProcessCompletedNotifies(void)
 {
-    MemoryContext caller_context;
-
     /* Nothing to do if we didn't send any notifications */
     if (!backendHasSentNotifications)
         return;
@@ -1230,43 +1246,32 @@ ProcessCompletedNotifies(void)
      */
     backendHasSentNotifications = false;
 
-    /*
-     * We must preserve the caller's memory context (probably MessageContext)
-     * across the transaction we do here.
-     */
-    caller_context = CurrentMemoryContext;
-
     if (Trace_notify)
         elog(DEBUG1, "ProcessCompletedNotifies");
 
-    /*
-     * We must run asyncQueueReadAllNotifications inside a transaction, else
-     * bad things happen if it gets an error.
-     */
-    StartTransactionCommand();
-
-    /* Send signals to other backends */
-    SignalBackends();
-
     if (listenChannels != NIL)
     {
+        MemoryContext caller_context;
+
+        /*
+         * We must preserve the caller's memory context (probably
+         * MessageContext) across the transaction we do here.
+         */
+        caller_context = CurrentMemoryContext;
+
+        /*
+         * We must run asyncQueueReadAllNotifications inside a transaction,
+         * else bad things happen if it gets an error.
+         */
+        StartTransactionCommand();
+
         /* Read the queue ourselves, and send relevant stuff to the frontend */
         asyncQueueReadAllNotifications();
-    }
+        CommitTransactionCommand();
 
-    /*
-     * If it's time to try to advance the global tail pointer, do that.
-     */
-    if (backendTryAdvanceTail)
-    {
-        backendTryAdvanceTail = false;
-        asyncQueueAdvanceTail();
+        MemoryContextSwitchTo(caller_context);
     }
 
-    CommitTransactionCommand();
-
-    MemoryContextSwitchTo(caller_context);
-
     /* We don't need pq_flush() here since postgres.c will do one shortly */
 }
 
@@ -1655,13 +1660,16 @@ asyncQueueFillWarning(void)
  * advance their queue position pointers, allowing the global tail to advance.
  *
  * Since we know the BackendId and the Pid the signaling is quite cheap.
+ *
+ * Returns true if this backend is listening to notifications.
  */
-static void
+static bool
 SignalBackends(void)
 {
     int32       *pids;
     BackendId  *ids;
     int            count;
+    bool        am_listener = false;
 
     /*
      * Identify backends that we need to signal.  We don't want to send
@@ -1684,7 +1692,10 @@ SignalBackends(void)
 
         Assert(pid != InvalidPid);
         if (pid == MyProcPid)
+        {
+            am_listener = true;
             continue;            /* never signal self */
+        }
         pos = QUEUE_BACKEND_POS(i);
         if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
         {
@@ -1729,6 +1740,8 @@ SignalBackends(void)
 
     pfree(pids);
     pfree(ids);
+
+    return am_listener;
 }
 
 /*
@@ -1752,6 +1765,16 @@ AtAbort_Notify(void)
 
     /* And clean up */
     ClearPendingActionsAndNotifies();
+
+    /*
+     * We can reach here with some notifications already queued by this
+     * transaction. Advance the tail pointer in that case.
+     */
+    if (backendTryAdvanceTail)
+    {
+        backendTryAdvanceTail = false;
+        asyncQueueAdvanceTail();
+    }
 }
 
 /*
-- 
2.18.2
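The division of labor after this patch can be sketched with a second toy C model. Again all names are hypothetical, not the PostgreSQL API, and the queue itself is abstracted into counters. AtCommit_Notify now signals other backends on every commit that sent notifications, and clears backendHasSentNotifications when SignalBackends reports that this backend is not itself a listener, so ProcessCompletedNotifies only has work to do for a listening backend that returns to the top-level loop.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Toy model of the post-patch split (hypothetical names; not the PostgreSQL
 * API). The commit hook signals other backends; the idle-time hook only
 * reads the queue for this backend's own LISTENs.
 */
typedef struct
{
    bool is_listener;              /* has this backend executed LISTEN? */
    bool has_sent_notifications;   /* plays the role of backendHasSentNotifications */
    int  signals_sent;             /* times other backends were signaled */
    int  self_reads;               /* times we read the queue ourselves */
} ToyBackend;

/* PreCommit_Notify stand-in: this transaction queued a notification. */
void toy_pre_commit_notify(ToyBackend *b)
{
    b->has_sent_notifications = true;
}

/* SignalBackends stand-in: returns true if we are a listener ourselves. */
bool toy_signal_backends(ToyBackend *b)
{
    b->signals_sent++;
    return b->is_listener;
}

/* AtCommit_Notify stand-in: signal now, keep the flag only for listeners. */
void toy_at_commit_notify(ToyBackend *b)
{
    if (b->has_sent_notifications && !toy_signal_backends(b))
        b->has_sent_notifications = false;   /* nothing to read at idle time */
}

/* ProcessCompletedNotifies stand-in: reached only from the top-level loop. */
void toy_process_completed_notifies(ToyBackend *b)
{
    if (!b->has_sent_notifications)
        return;
    b->has_sent_notifications = false;
    if (b->is_listener)
        b->self_reads++;                     /* deliver our own notifications */
}
```

A logical replication worker corresponds to a ToyBackend that only ever calls toy_pre_commit_notify and toy_at_commit_notify; in this model it still signals on every commit, which is the behavior the bug report was missing.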

