Re: [HACKERS] Anyone working on asynchronous NOTIFY reception? - Mailing list pgsql-hackers

From Massimo Dal Zotto
Subject Re: [HACKERS] Anyone working on asynchronous NOTIFY reception?
Date
Msg-id 199804222036.WAA00620@nikita.wizard.it
In response to Re: [HACKERS] Anyone working on asynchronous NOTIFY reception?  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-hackers
>
> > The biggest problem is that if you have many clients listening on the same
> > thing they are signaled at the same time and all of them try to access the
> > pg_listener table for write. The result is that you have a lot of waits on
> > the table and sometimes also deadlocks if you don't do things carefully.
>
> Right, I recall seeing some things about that in the mailing list
> archives (from you, no doubt?).  I had the impression that async.c
> had been changed to handle this better as of the current release.
> Is there still a problem?
>
> (Fortunately, I don't expect a *lot* of clients waiting on the same
> table, but deadlock would still be very bad news...)
>
> > From the Tcl side, a better solution would be to define a tcl event handler,
> > like the standard Tcl filehandler, which would be invoked automatically by
> > the Tk event loop or by tkwait if using pure Tcl.
>
> I agree.
>
> I don't have an immediate need for Tcl-based clients, so I was just
> going to revise libpg and libpg++.  Do you want to redo libpgtcl?
> I'd probably get to that eventually, but splitting the work sounds
> better :-).

Not now, I am too busy.

> I'll post something later today about what the extensions to the
> libpg API should look like.
>
> > I have also some new patches which try to reduce the notify overhead by
> > avoiding unnecessary unlocks of the table. If you are interested I can
> > post them.
>
> Please do.
>
>             regards, tom lane

This is the patch against 6.2.1p7. I haven't had the time to port it to 6.3.1.
The idea is to notify the backends while we still hold the write lock on the
table, before doing the first CommitTransactionCommand. Otherwise, if we must
also notify our own frontend, we almost certainly get the lock again only after
all the other backends have processed the notify, and this may take a lot of
time.
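
As background for the flow above: the header comment in the patch says a listening backend is woken with SIGUSR2, and Async_NotifyHandler either notifies its frontend at once or sets notifyFrontEndPending when a transaction is in progress. A minimal POSIX sketch of the deferred path only, not the backend code itself; notify_pending, install_notify_handler, notify_backend and consume_pending_notify are hypothetical names used for illustration:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <sys/types.h>
#include <unistd.h>

/* Plays the role of notifyFrontEndPending (name is hypothetical). */
static volatile sig_atomic_t notify_pending = 0;

/* The handler only records that a notify arrived; real work is deferred. */
static void
on_sigusr2(int signo)
{
    (void) signo;
    notify_pending = 1;
}

/* Install the SIGUSR2 handler; returns 0 on success, -1 on failure. */
static int
install_notify_handler(void)
{
    struct sigaction sa;

    sa.sa_handler = on_sigusr2;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    return sigaction(SIGUSR2, &sa, NULL);
}

/* Wake one listening process, as the kill(pid, SIGUSR2) call in the patch does. */
static int
notify_backend(pid_t pid)
{
    assert(pid > 0);            /* pid <= 0 would signal whole process groups */
    return kill(pid, SIGUSR2);
}

/* Returns 1 and clears the flag if a notify was pending, else returns 0. */
static int
consume_pending_notify(void)
{
    if (!notify_pending)
        return 0;
    notify_pending = 0;
    return 1;
}
```

A process would call install_notify_handler() once, and check consume_pending_notify() at a safe point such as the end of a transaction. In this sketch, several signals that arrive before the check collapse into a single pending flag.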

Note, however, that there is a small problem with releasing the lock before the
end of the transaction: you may get duplicate records in pg_listener if several
backends notify the same relation at the same time. I don't know why this
happens and haven't had time to investigate, so I wrote a quick hack in
Async_NotifyFrontEnd_Aux() to avoid the problem (search for "notifyHack").
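
The duplicate check behind notifyHack can be sketched outside the backend roughly as follows. This is only an illustration under assumptions: SeenNames, seen_before and MAX_SEEN are hypothetical names that do not exist in async.c, and the sketch stores the caller's pointers where the patch copies each name with pstrdup().

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_SEEN 32             /* same cap as the hack[32] array in the patch */

/* Hypothetical helper type: relation names already seen during one scan. */
typedef struct
{
    const char *names[MAX_SEEN];
    int         count;
} SeenNames;

/*
 * Returns true if relname was already recorded, i.e. the current row is a
 * duplicate that the caller may delete.  Otherwise records the name, if
 * there is still room, and returns false.  Names beyond the cap are never
 * recorded, so their duplicates go undetected.  The caller must keep the
 * strings alive for as long as the SeenNames is in use.
 */
static bool
seen_before(SeenNames *seen, const char *relname)
{
    int         i;

    assert(seen != NULL && relname != NULL);

    for (i = 0; i < seen->count; i++)
    {
        if (strcmp(seen->names[i], relname) == 0)
            return true;
    }
    if (seen->count < MAX_SEEN)
        seen->names[seen->count++] = relname;
    return false;
}
```

For three identical rows such as the "mytable" rows shown next, the first call returns false and the following two return true, so only one row would survive the scan.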

This is what I found in my pg_listener:
mytable        |        627|           0
mytable        |        627|           0
mytable        |        627|           0

And this is the patch for 6.2.1p7:

*** async.c.orig    Tue Jan 27 17:06:42 1998
--- async.c    Thu Mar 19 01:09:49 1998
***************
*** 22,30 ****
   *           notification (we are notifying something that we are listening),
   *           signal the corresponding frontend over the comm channel using the
   *           out-of-band channel.
!  *      2.b  For all other listening processes, we send kill(2) to wake up
!  *           the listening backend.
!  * 3. Upon receiving a kill(2) signal from another backend process notifying
   *      that one of the relation that we are listening is being notified,
   *      we can be in either of two following states:
   *      3.a  We are sleeping, wake up and signal our frontend.
--- 22,30 ----
   *           notification (we are notifying something that we are listening),
   *           signal the corresponding frontend over the comm channel using the
   *           out-of-band channel.
!  *      2.b  For all other listening processes, we send a SIGUSR2 signal
!  *           to wake up the listening backend.
!  * 3. Upon receiving a SIGUSR2 signal from another backend process notifying
   *      that one of the relation that we are listening is being notified,
   *      we can be in either of two following states:
   *      3.a  We are sleeping, wake up and signal our frontend.
***************
*** 85,99 ****
  #include <port-protos.h>        /* for strdup() */

  #include <storage/lmgr.h>

  static int    notifyFrontEndPending = 0;
  static int    notifyIssued = 0;
  static Dllist *pendingNotifies = NULL;

-
  static int    AsyncExistsPendingNotify(char *);
  static void ClearPendingNotify(void);
  static void Async_NotifyFrontEnd(void);
         void Async_Unlisten(char *relname, int pid);
  static void Async_UnlistenOnExit(int code, char *relname);

--- 85,105 ----
  #include <port-protos.h>        /* for strdup() */

  #include <storage/lmgr.h>
+ #include <utils/trace.h>
+
+ #define notifyUnlock pg_options[OPT_NOTIFYUNLOCK]
+ #define notifyHack   pg_options[OPT_NOTIFYHACK]
+
+ GlobalMemory notifyContext = NULL;

  static int    notifyFrontEndPending = 0;
  static int    notifyIssued = 0;
  static Dllist *pendingNotifies = NULL;

  static int    AsyncExistsPendingNotify(char *);
  static void ClearPendingNotify(void);
  static void Async_NotifyFrontEnd(void);
+ static void Async_NotifyFrontEnd_Aux(void);
         void Async_Unlisten(char *relname, int pid);
  static void Async_UnlistenOnExit(int code, char *relname);

***************
*** 121,145 ****
  {
      extern TransactionState CurrentTransactionState;

      if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
          (CurrentTransactionState->blockState == TRANS_DEFAULT))
      {
!
! #ifdef ASYNC_DEBUG
!         elog(DEBUG, "Waking up sleeping backend process");
! #endif
          Async_NotifyFrontEnd();
-
      }
      else
      {
! #ifdef ASYNC_DEBUG
!         elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
!              CurrentTransactionState->state,
!              CurrentTransactionState->blockState);
! #endif
          notifyFrontEndPending = 1;
      }
  }

  /*
--- 127,152 ----
  {
      extern TransactionState CurrentTransactionState;

+     TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler");
+
      if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
          (CurrentTransactionState->blockState == TRANS_DEFAULT))
      {
!         TPRINTF(TRACE_NOTIFY, "Waking up sleeping backend process");
          Async_NotifyFrontEnd();
      }
      else
      {
!         TPRINTF(TRACE_NOTIFY,
!                 "Process is in the middle of another transaction, "
!                 "state = %d, block state = %d",
!                 CurrentTransactionState->state,
!                 CurrentTransactionState->blockState);
          notifyFrontEndPending = 1;
+         TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending");
      }
+
+     TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler done");
  }

  /*
***************
*** 184,192 ****

      char       *notifyName;

! #ifdef ASYNC_DEBUG
!     elog(DEBUG, "Async_Notify: %s", relname);
! #endif

      if (!pendingNotifies)
          pendingNotifies = DLNewList();
--- 191,197 ----

      char       *notifyName;

!     TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);

      if (!pendingNotifies)
          pendingNotifies = DLNewList();
***************
*** 224,234 ****
              heap_replace(lRel, &lTuple->t_ctid, rTuple);
          }
          ReleaseBuffer(b);
      }
      heap_endscan(sRel);
!     RelationUnsetLockForWrite(lRel);
      heap_close(lRel);
!     notifyIssued = 1;
  }

  /*
--- 229,249 ----
              heap_replace(lRel, &lTuple->t_ctid, rTuple);
          }
          ReleaseBuffer(b);
+         notifyIssued = 1;
      }
      heap_endscan(sRel);
!
!     /*
!      * Note: if we unset the lock here we could get multiple tuples
!      * with the same oid if other backends notify the same relation.
!      */
!     if (notifyUnlock) {
!         RelationUnsetLockForWrite(lRel);
!     }
!
      heap_close(lRel);
!
!     TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname);
  }

  /*
***************
*** 278,286 ****
          {                        /* 'notify <relname>' issued by us */
              notifyIssued = 0;
              StartTransactionCommand();
! #ifdef ASYNC_DEBUG
!             elog(DEBUG, "Async_NotifyAtCommit.");
! #endif
              ScanKeyEntryInitialize(&key, 0,
                                     Anum_pg_listener_notify,
                                     Integer32EqualRegProcedure,
--- 293,299 ----
          {                        /* 'notify <relname>' issued by us */
              notifyIssued = 0;
              StartTransactionCommand();
!             TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit");
              ScanKeyEntryInitialize(&key, 0,
                                     Anum_pg_listener_notify,
                                     Integer32EqualRegProcedure,
***************
*** 303,318 ****

                      if (ourpid == DatumGetInt32(d))
                      {
- #ifdef ASYNC_DEBUG
-                         elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
- #endif
                          notifyFrontEndPending = 1;
                      }
                      else
                      {
! #ifdef ASYNC_DEBUG
!                         elog(DEBUG, "Notifying others");
! #endif
  #ifdef HAVE_KILL
                          if (kill(DatumGetInt32(d), SIGUSR2) < 0)
                          {
--- 316,330 ----

                      if (ourpid == DatumGetInt32(d))
                      {
                          notifyFrontEndPending = 1;
+                         TPRINTF(TRACE_NOTIFY,
+                                 "Async_NotifyAtCommit notifying self");
                      }
                      else
                      {
!                         TPRINTF(TRACE_NOTIFY,
!                                 "Async_NotifyAtCommit notifying %d",
!                                 DatumGetInt32(d));
  #ifdef HAVE_KILL
                          if (kill(DatumGetInt32(d), SIGUSR2) < 0)
                          {
***************
*** 327,344 ****
                  ReleaseBuffer(b);
              }
              heap_endscan(sRel);
-             RelationUnsetLockForWrite(lRel);
              heap_close(lRel);
-
-             CommitTransactionCommand();
              ClearPendingNotify();
-         }

!         if (notifyFrontEndPending)
!         {                        /* we need to notify the frontend of all
!                                  * pending notifies. */
!             notifyFrontEndPending = 1;
!             Async_NotifyFrontEnd();
          }
      }
  }
--- 339,361 ----
                  ReleaseBuffer(b);
              }
              heap_endscan(sRel);
              heap_close(lRel);
              ClearPendingNotify();

!             if (notifyFrontEndPending)
!             {
!                 /* Notify the frontend inside the current transaction! */
!                 Async_NotifyFrontEnd_Aux();
!             }
!
!             TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit done");
!             CommitTransactionCommand();
!         } else {
!             /* Notify the frontend of pending notifies from other backends. */
!             if (notifyFrontEndPending)
!             {
!                 Async_NotifyFrontEnd();
!             }
          }
      }
  }
***************
*** 422,430 ****
      char       *relnamei;
      TupleDesc    tupDesc;

! #ifdef ASYNC_DEBUG
!     elog(DEBUG, "Async_Listen: %s", relname);
! #endif
      for (i = 0; i < Natts_pg_listener; i++)
      {
          nulls[i] = ' ';
--- 439,445 ----
      char       *relnamei;
      TupleDesc    tupDesc;

!     TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
      for (i = 0; i < Natts_pg_listener; i++)
      {
          nulls[i] = ' ';
***************
*** 457,462 ****
--- 472,480 ----
              }
          }
          ReleaseBuffer(b);
+         if (alreadyListener) {
+             break;
+         }
      }
      heap_endscan(s);

***************
*** 464,485 ****
      {
          elog(NOTICE, "Async_Listen: We are already listening on %s",
               relname);
          return;
      }

      tupDesc = lDesc->rd_att;
!     tup = heap_formtuple(tupDesc,
!                          values,
!                          nulls);
      heap_insert(lDesc, tup);
-
      pfree(tup);

-     /*
-      * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
-      * listener on %s (possibly dead)",relname); }
-      */
-
      RelationUnsetLockForWrite(lDesc);
      heap_close(lDesc);

--- 482,497 ----
      {
          elog(NOTICE, "Async_Listen: We are already listening on %s",
               relname);
+         RelationUnsetLockForWrite(lDesc);
+         heap_close(lDesc);
          return;
      }

      tupDesc = lDesc->rd_att;
!     tup = heap_formtuple(tupDesc, values, nulls);
      heap_insert(lDesc, tup);
      pfree(tup);

      RelationUnsetLockForWrite(lDesc);
      heap_close(lDesc);

***************
*** 519,534 ****
      lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
                                   Int32GetDatum(pid),
                                   0, 0);
-     lDesc = heap_openr(ListenerRelationName);
-     RelationSetLockForWrite(lDesc);
-
      if (lTuple != NULL)
      {
          heap_delete(lDesc, &lTuple->t_ctid);
-     }

!     RelationUnsetLockForWrite(lDesc);
!     heap_close(lDesc);
  }

  static void
--- 531,545 ----
      lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
                                   Int32GetDatum(pid),
                                   0, 0);
      if (lTuple != NULL)
      {
+         lDesc = heap_openr(ListenerRelationName);
+         RelationSetLockForWrite(lDesc);
          heap_delete(lDesc, &lTuple->t_ctid);

!         RelationUnsetLockForWrite(lDesc);
!         heap_close(lDesc);
!     }
  }

  static void
***************
*** 560,570 ****
   *
   * --------------------------------------------------------------
   */
- GlobalMemory notifyContext = NULL;
-
  static void
  Async_NotifyFrontEnd()
  {
      extern CommandDest whereToSendOutput;
      HeapTuple    lTuple,
                  rTuple;
--- 571,595 ----
   *
   * --------------------------------------------------------------
   */
  static void
  Async_NotifyFrontEnd()
  {
+     StartTransactionCommand();
+     Async_NotifyFrontEnd_Aux();
+     CommitTransactionCommand();
+ }
+
+ /*
+  * --------------------------------------------------------------
+  * Async_NotifyFrontEnd_Aux --
+  *
+  *        Like Async_NotifyFrontEnd but MUST be called inside a transaction.
+  *
+  * --------------------------------------------------------------
+  */
+ static void
+ Async_NotifyFrontEnd_Aux()
+ {
      extern CommandDest whereToSendOutput;
      HeapTuple    lTuple,
                  rTuple;
***************
*** 580,592 ****
      int            ourpid;
      bool        isnull;

!     notifyFrontEndPending = 0;

! #ifdef ASYNC_DEBUG
!     elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
! #endif

!     StartTransactionCommand();
      ourpid = getpid();
      ScanKeyEntryInitialize(&key[0], 0,
                             Anum_pg_listener_notify,
--- 605,616 ----
      int            ourpid;
      bool        isnull;

!     char        *hack[32];
!     int            i, hack_count = 0;

!     notifyFrontEndPending = 0;

!     TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd");
      ourpid = getpid();
      ScanKeyEntryInitialize(&key[0], 0,
                             Anum_pg_listener_notify,
***************
*** 611,620 ****
--- 635,664 ----
      {
          d = heap_getattr(lTuple, b, Anum_pg_listener_relname,
                           tdesc, &isnull);
+
+         /* Hack to delete duplicate tuples (possible if notifyUnlock is set) */
+         if (notifyHack) {
+             for (i=0; i<hack_count; i++) {
+                 if (strcmp(DatumGetName(d)->data, hack[i]) == 0) {
+                     TPRINTF(TRACE_NOTIFY,
+                             "Async_NotifyFrontEnd duplicate %s",
+                             DatumGetName(d)->data);
+                     heap_delete(lRel, &lTuple->t_ctid);
+                     goto release_buffer;
+                 }
+             }
+             if (hack_count < 32) {
+                 hack[hack_count++] = pstrdup(DatumGetName(d)->data);
+             }
+         }
+
          rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
          heap_replace(lRel, &lTuple->t_ctid, rTuple);

          /* notifying the front end */
+         TPRINTF(TRACE_NOTIFY,
+                 "Async_NotifyFrontEnd notifying %s",
+                 DatumGetName(d)->data);

          if (whereToSendOutput == Remote)
          {
***************
*** 625,635 ****
          }
          else
          {
!             elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
          }
          ReleaseBuffer(b);
      }
!     CommitTransactionCommand();
  }

  static int
--- 669,686 ----
          }
          else
          {
!             elog(NOTICE,
!                  "Async_NotifyFrontEnd: no asynchronous notification "
!                  "to frontend on interactive sessions");
          }
+
+     release_buffer:
          ReleaseBuffer(b);
      }
!     heap_endscan(sRel);
!     RelationUnsetLockForWrite(lRel);
!     heap_close(lRel);
!     TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd done");
  }

  static int


Massimo Dal Zotto

+----------------------------------------------------------------------+
|  Massimo Dal Zotto                e-mail:  dz@cs.unitn.it            |
|  Via Marconi, 141                 phone:  ++39-461-534251            |
|  38057 Pergine Valsugana (TN)     www:  http://www.cs.unitn.it/~dz/  |
|  Italy                            pgp:  finger dz@tango.cs.unitn.it  |
+----------------------------------------------------------------------+
