Multiplexing SUGUSR1 - Mailing list pgsql-hackers

From Heikki Linnakangas
Subject Multiplexing SUGUSR1
Msg-id 493CD508.9080509@enterprisedb.com
Responses Re: Multiplexing SUGUSR1
Re: Multiplexing SUGUSR1
Re: Multiplexing SUGUSR1
Re: Multiplexing SUGUSR1
List pgsql-hackers
I've been looking at the signal handling part of the synchronous
replication patch. It looks OK, but one thing worries me.

To set or clear the flag in PGPROC when sending or handling a signal,
we have to acquire ProcArrayLock. Is that safe to do in a signal
handler? And is the performance impact acceptable?


Another observation is that the patch introduces a new function called
SendProcSignal. Nothing wrong with that, except that there's an existing
function called ProcSendSignal just above it, so there's some potential
for confusion. The old ProcSendSignal function uses the per-backend
semaphore to wake up a backend. It's only used by the buffer cleanup
lock code in bufmgr.c. I'm tempted to remove it altogether and use the
new signal multiplexing for that too, but OTOH, if it works, maybe I
shouldn't touch it.

Attached is a patch with some minor changes I've made. They're mostly
cosmetic, but I did modify the sinval code so that ProcState holds a
PGPROC pointer instead of the backend PID, so that we don't need to
search the ProcArray to find the PGPROC struct of the backend we're
signaling.

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 287,292 **** MarkAsPreparing(TransactionId xid, const char *gid,
--- 287,293 ----
      gxact->proc.databaseId = databaseid;
      gxact->proc.roleId = owner;
      gxact->proc.inCommit = false;
+     gxact->proc.signalFlags = 0;
      gxact->proc.vacuumFlags = 0;
      gxact->proc.lwWaiting = false;
      gxact->proc.lwExclusive = false;
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 915,923 **** EnableNotifyInterrupt(void)
   *        a frontend command.  Signal handler execution of inbound notifies
   *        is disabled until the next EnableNotifyInterrupt call.
   *
!  *        The SIGUSR1 signal handler also needs to call this, so as to
!  *        prevent conflicts if one signal interrupts the other.  So we
!  *        must return the previous state of the flag.
   */
  bool
  DisableNotifyInterrupt(void)
--- 915,924 ----
   *        a frontend command.  Signal handler execution of inbound notifies
   *        is disabled until the next EnableNotifyInterrupt call.
   *
!  *        This also needs to be called when SIGUSR1 with
!  *        PROCSIG_CATCHUP_INTERRUPT is received, so as to prevent conflicts
!  *        if one signal interrupts the other.  So we must return the previous
!  *        state of the flag.
   */
  bool
  DisableNotifyInterrupt(void)
***************
*** 954,960 **** ProcessIncomingNotify(void)
                  nulls[Natts_pg_listener];
      bool        catchup_enabled;

!     /* Must prevent SIGUSR1 interrupt while I am running */
      catchup_enabled = DisableCatchupInterrupt();

      if (Trace_notify)
--- 955,961 ----
                  nulls[Natts_pg_listener];
      bool        catchup_enabled;

!     /* Must prevent catchup interrupt while I am running */
      catchup_enabled = DisableCatchupInterrupt();

      if (Trace_notify)
*** a/src/backend/postmaster/autovacuum.c
--- b/src/backend/postmaster/autovacuum.c
***************
*** 1477,1483 **** AutoVacWorkerMain(int argc, char *argv[])
      pqsignal(SIGALRM, handle_sig_alarm);

      pqsignal(SIGPIPE, SIG_IGN);
!     pqsignal(SIGUSR1, CatchupInterruptHandler);
      /* We don't listen for async notifies */
      pqsignal(SIGUSR2, SIG_IGN);
      pqsignal(SIGFPE, FloatExceptionHandler);
--- 1477,1483 ----
      pqsignal(SIGALRM, handle_sig_alarm);

      pqsignal(SIGPIPE, SIG_IGN);
!     pqsignal(SIGUSR1, proc_sigusr1_handler);
      /* We don't listen for async notifies */
      pqsignal(SIGUSR2, SIG_IGN);
      pqsignal(SIGFPE, FloatExceptionHandler);
*** a/src/backend/storage/ipc/sinval.c
--- b/src/backend/storage/ipc/sinval.c
***************
*** 27,33 ****
   * need a way to give an idle backend a swift kick in the rear and make
   * it catch up before the sinval queue overflows and forces it to go
   * through a cache reset exercise.    This is done by sending SIGUSR1
!  * to any backend that gets too far behind.
   *
   * State for catchup events consists of two flags: one saying whether
   * the signal handler is currently allowed to call ProcessCatchupEvent
--- 27,34 ----
   * need a way to give an idle backend a swift kick in the rear and make
   * it catch up before the sinval queue overflows and forces it to go
   * through a cache reset exercise.    This is done by sending SIGUSR1
!  * with PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far
!  * behind.
   *
   * State for catchup events consists of two flags: one saying whether
   * the signal handler is currently allowed to call ProcessCatchupEvent
***************
*** 144,152 **** ReceiveSharedInvalidMessages(


  /*
!  * CatchupInterruptHandler
   *
!  * This is the signal handler for SIGUSR1.
   *
   * If we are idle (catchupInterruptEnabled is set), we can safely
   * invoke ProcessCatchupEvent directly.  Otherwise, just set a flag
--- 145,154 ----


  /*
!  * HandleCatchupInterrupt
   *
!  * This is called when SIGUSR1 with PROCSIG_CATCHUP_INTERRUPT is
!  * received.
   *
   * If we are idle (catchupInterruptEnabled is set), we can safely
   * invoke ProcessCatchupEvent directly.  Otherwise, just set a flag
***************
*** 156,168 **** ReceiveSharedInvalidMessages(
   * since there's no longer any reason to do anything.)
   */
  void
! CatchupInterruptHandler(SIGNAL_ARGS)
  {
-     int            save_errno = errno;
-
      /*
!      * Note: this is a SIGNAL HANDLER.    You must be very wary what you do
!      * here.
       */

      /* Don't joggle the elbow of proc_exit */
--- 158,168 ----
   * since there's no longer any reason to do anything.)
   */
  void
! HandleCatchupInterrupt(void)
  {
      /*
!      * Note: this is called by a SIGNAL HANDLER.
!      * You must be very wary what you do here.
       */

      /* Don't joggle the elbow of proc_exit */
***************
*** 216,223 **** CatchupInterruptHandler(SIGNAL_ARGS)
           */
          catchupInterruptOccurred = 1;
      }
-
-     errno = save_errno;
  }

  /*
--- 216,221 ----
***************
*** 289,295 **** DisableCatchupInterrupt(void)
  /*
   * ProcessCatchupEvent
   *
!  * Respond to a catchup event (SIGUSR1) from another backend.
   *
   * This is called either directly from the SIGUSR1 signal handler,
   * or the next time control reaches the outer idle loop (assuming
--- 287,294 ----
  /*
   * ProcessCatchupEvent
   *
!  * Respond to a catchup event (SIGUSR1 with PROCSIG_CATCHUP_INTERRUPT)
!  * from another backend.
   *
   * This is called either directly from the SIGUSR1 signal handler,
   * or the next time control reaches the outer idle loop (assuming
*** a/src/backend/storage/ipc/sinvaladt.c
--- b/src/backend/storage/ipc/sinvaladt.c
***************
*** 21,26 ****
--- 21,27 ----
  #include "storage/backendid.h"
  #include "storage/ipc.h"
  #include "storage/proc.h"
+ #include "storage/procarray.h"
  #include "storage/shmem.h"
  #include "storage/sinvaladt.h"
  #include "storage/spin.h"
***************
*** 136,144 ****
  /* Per-backend state in shared invalidation structure */
  typedef struct ProcState
  {
!     /* procPid is zero in an inactive ProcState array entry. */
!     pid_t        procPid;        /* PID of backend, for signaling */
!     /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
      int            nextMsgNum;        /* next message number to read */
      bool        resetState;        /* backend needs to reset its state */
      bool        signaled;        /* backend has been sent catchup signal */
--- 137,145 ----
  /* Per-backend state in shared invalidation structure */
  typedef struct ProcState
  {
!     /* proc is NULL in an inactive ProcState array entry. */
!     PGPROC       *proc;            /* PGPROC entry of backend, for signaling */
!     /* nextMsgNum is meaningless if proc == NULL or resetState is true. */
      int            nextMsgNum;        /* next message number to read */
      bool        resetState;        /* backend needs to reset its state */
      bool        signaled;        /* backend has been sent catchup signal */
***************
*** 235,241 **** CreateSharedInvalidationState(void)
      /* Mark all backends inactive, and initialize nextLXID */
      for (i = 0; i < shmInvalBuffer->maxBackends; i++)
      {
!         shmInvalBuffer->procState[i].procPid = 0;            /* inactive */
          shmInvalBuffer->procState[i].nextMsgNum = 0;        /* meaningless */
          shmInvalBuffer->procState[i].resetState = false;
          shmInvalBuffer->procState[i].signaled = false;
--- 236,242 ----
      /* Mark all backends inactive, and initialize nextLXID */
      for (i = 0; i < shmInvalBuffer->maxBackends; i++)
      {
!         shmInvalBuffer->procState[i].proc = NULL;            /* inactive */
          shmInvalBuffer->procState[i].nextMsgNum = 0;        /* meaningless */
          shmInvalBuffer->procState[i].resetState = false;
          shmInvalBuffer->procState[i].signaled = false;
***************
*** 266,272 **** SharedInvalBackendInit(void)
      /* Look for a free entry in the procState array */
      for (index = 0; index < segP->lastBackend; index++)
      {
!         if (segP->procState[index].procPid == 0)        /* inactive slot? */
          {
              stateP = &segP->procState[index];
              break;
--- 267,273 ----
      /* Look for a free entry in the procState array */
      for (index = 0; index < segP->lastBackend; index++)
      {
!         if (segP->procState[index].proc == NULL)        /* inactive slot? */
          {
              stateP = &segP->procState[index];
              break;
***************
*** 278,284 **** SharedInvalBackendInit(void)
          if (segP->lastBackend < segP->maxBackends)
          {
              stateP = &segP->procState[segP->lastBackend];
!             Assert(stateP->procPid == 0);
              segP->lastBackend++;
          }
          else
--- 279,285 ----
          if (segP->lastBackend < segP->maxBackends)
          {
              stateP = &segP->procState[segP->lastBackend];
!             Assert(stateP->proc == NULL);
              segP->lastBackend++;
          }
          else
***************
*** 303,309 **** SharedInvalBackendInit(void)
      nextLocalTransactionId = stateP->nextLXID;

      /* mark myself active, with all extant messages already read */
!     stateP->procPid = MyProcPid;
      stateP->nextMsgNum = segP->maxMsgNum;
      stateP->resetState = false;
      stateP->signaled = false;
--- 304,310 ----
      nextLocalTransactionId = stateP->nextLXID;

      /* mark myself active, with all extant messages already read */
!     stateP->proc = MyProc;
      stateP->nextMsgNum = segP->maxMsgNum;
      stateP->resetState = false;
      stateP->signaled = false;
***************
*** 341,347 **** CleanupInvalidationState(int status, Datum arg)
      stateP->nextLXID = nextLocalTransactionId;

      /* Mark myself inactive */
!     stateP->procPid = 0;
      stateP->nextMsgNum = 0;
      stateP->resetState = false;
      stateP->signaled = false;
--- 342,348 ----
      stateP->nextLXID = nextLocalTransactionId;

      /* Mark myself inactive */
!     stateP->proc = NULL;
      stateP->nextMsgNum = 0;
      stateP->resetState = false;
      stateP->signaled = false;
***************
*** 349,355 **** CleanupInvalidationState(int status, Datum arg)
      /* Recompute index of last active backend */
      for (i = segP->lastBackend; i > 0; i--)
      {
!         if (segP->procState[i - 1].procPid != 0)
              break;
      }
      segP->lastBackend = i;
--- 350,356 ----
      /* Recompute index of last active backend */
      for (i = segP->lastBackend; i > 0; i--)
      {
!         if (segP->procState[i - 1].proc != NULL)
              break;
      }
      segP->lastBackend = i;
***************
*** 374,380 **** BackendIdIsActive(int backendID)
      {
          ProcState  *stateP = &segP->procState[backendID - 1];

!         result = (stateP->procPid != 0);
      }
      else
          result = false;
--- 375,381 ----
      {
          ProcState  *stateP = &segP->procState[backendID - 1];

!         result = (stateP->proc != NULL);
      }
      else
          result = false;
***************
*** 590,596 **** SICleanupQueue(bool callerHasWriteLock, int minFree)
          int        n = stateP->nextMsgNum;

          /* Ignore if inactive or already in reset state */
!         if (stateP->procPid == 0 || stateP->resetState)
              continue;

          /*
--- 591,597 ----
          int        n = stateP->nextMsgNum;

          /* Ignore if inactive or already in reset state */
!         if (stateP->proc == NULL || stateP->resetState)
              continue;

          /*
***************
*** 644,661 **** SICleanupQueue(bool callerHasWriteLock, int minFree)
          segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

      /*
!      * Lastly, signal anyone who needs a catchup interrupt.  Since kill()
!      * might not be fast, we don't want to hold locks while executing it.
       */
      if (needSig)
      {
!         pid_t    his_pid = needSig->procPid;

          needSig->signaled = true;
          LWLockRelease(SInvalReadLock);
          LWLockRelease(SInvalWriteLock);
!         elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
!         kill(his_pid, SIGUSR1);
          if (callerHasWriteLock)
              LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
      }
--- 645,664 ----
          segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

      /*
!      * Lastly, signal anyone who needs a catchup interrupt.  Since
!      * SendProcSignal() might not be fast, we don't want to hold locks while
!      * executing it.
       */
      if (needSig)
      {
!         PGPROC *his_proc = needSig->proc;

          needSig->signaled = true;
          LWLockRelease(SInvalReadLock);
          LWLockRelease(SInvalWriteLock);
!         elog(DEBUG4, "sending sinval catchup signal to PID %d",
!              (int) his_proc->pid);
!         SendProcSignal(his_proc, PROCSIG_CATCHUP_INTERRUPT);
          if (callerHasWriteLock)
              LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
      }
*** a/src/backend/storage/lmgr/proc.c
--- b/src/backend/storage/lmgr/proc.c
***************
*** 289,294 **** InitProcess(void)
--- 289,295 ----
      MyProc->databaseId = InvalidOid;
      MyProc->roleId = InvalidOid;
      MyProc->inCommit = false;
+     MyProc->signalFlags = 0;
      MyProc->vacuumFlags = 0;
      if (IsAutoVacuumWorkerProcess())
          MyProc->vacuumFlags |= PROC_IS_AUTOVACUUM;
***************
*** 428,433 **** InitAuxiliaryProcess(void)
--- 429,435 ----
      MyProc->databaseId = InvalidOid;
      MyProc->roleId = InvalidOid;
      MyProc->inCommit = false;
+     MyProc->signalFlags = 0;
      /* we don't set the "is autovacuum" flag in the launcher */
      MyProc->vacuumFlags = 0;
      MyProc->lwWaiting = false;
***************
*** 1277,1282 **** ProcSendSignal(int pid)
--- 1279,1330 ----
          PGSemaphoreUnlock(&proc->sem);
  }

+ /*
+  * SendProcSignal - send SIGUSR1 with the given reason to the process
+  * (a backend, autovacuum worker or auxiliary process) identified by
+  * proc.
+  */
+ void
+ SendProcSignal(PGPROC *proc, uint8 reason)
+ {
+     int pid;
+
+     if (proc == NULL)
+         return;
+
+     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+     pid = proc->pid;
+     if (pid != 0)
+         proc->signalFlags |= reason;
+     LWLockRelease(ProcArrayLock);
+     /* Send SIGUSR1, but never kill(0): that would hit our process group */
+     if (pid != 0)
+         kill(pid, SIGUSR1);
+ }
+
+ /*
+  * CheckProcSignal - check to see if the particular reason has been
+  * signaled, and clear the signal flag.  Should be called after
+  * receiving SIGUSR1.
+  */
+ bool
+ CheckProcSignal(uint8 reason)
+ {
+     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+     /* Careful here --- don't clear flag if we haven't seen it set */
+     if (MyProc->signalFlags & reason)
+     {
+         MyProc->signalFlags &= ~reason;
+         LWLockRelease(ProcArrayLock);
+         return true;
+     }
+
+     LWLockRelease(ProcArrayLock);
+
+     return false;
+ }
+

  /*****************************************************************************
   * SIGALRM interrupt support
*** a/src/backend/tcop/postgres.c
--- b/src/backend/tcop/postgres.c
***************
*** 2437,2442 **** drop_unnamed_stmt(void)
--- 2437,2464 ----
   */

  /*
+  * proc_sigusr1_handler - handle SIGUSR1 signal.
+  *
+  * SIGUSR1 is multiplexed to handle multiple different events. The signalFlags
+  * bitmask in PGPROC indicates which events have been signaled.
+  */
+ void
+ proc_sigusr1_handler(SIGNAL_ARGS)
+ {
+     int            save_errno = errno;
+
+     if (CheckProcSignal(PROCSIG_CATCHUP_INTERRUPT))
+     {
+         /*
+          * Catchup interrupt has been sent.
+          */
+         HandleCatchupInterrupt();
+     }
+
+     errno = save_errno;
+ }
+
+ /*
   * quickdie() occurs when signalled SIGQUIT by the postmaster.
   *
   * Some backend has bought the farm,
***************
*** 3180,3186 **** PostgresMain(int argc, char *argv[], const char *username)
       * of output during who-knows-what operation...
       */
      pqsignal(SIGPIPE, SIG_IGN);
!     pqsignal(SIGUSR1, CatchupInterruptHandler);
      pqsignal(SIGUSR2, NotifyInterruptHandler);
      pqsignal(SIGFPE, FloatExceptionHandler);

--- 3202,3208 ----
       * of output during who-knows-what operation...
       */
      pqsignal(SIGPIPE, SIG_IGN);
!     pqsignal(SIGUSR1, proc_sigusr1_handler);
      pqsignal(SIGUSR2, NotifyInterruptHandler);
      pqsignal(SIGFPE, FloatExceptionHandler);

*** a/src/include/storage/proc.h
--- b/src/include/storage/proc.h
***************
*** 38,43 **** struct XidCache
--- 38,46 ----
      TransactionId xids[PGPROC_MAX_CACHED_SUBXIDS];
  };

+ /* Signals for PGPROC->signalFlags */
+ #define        PROCSIG_CATCHUP_INTERRUPT    0x01    /* catchup interrupt */
+
  /* Flags for PGPROC->vacuumFlags */
  #define        PROC_IS_AUTOVACUUM    0x01    /* is it an autovac worker? */
  #define        PROC_IN_VACUUM        0x02    /* currently running lazy vacuum */
***************
*** 91,96 **** struct PGPROC
--- 94,100 ----

      bool        inCommit;        /* true if within commit critical section */

+     uint8        signalFlags;    /* bitmask of signals raised, see above */
      uint8        vacuumFlags;    /* vacuum-related flags, see above */

      /* Info about LWLock the process is currently waiting for, if any. */
***************
*** 171,176 **** extern void LockWaitCancel(void);
--- 175,183 ----
  extern void ProcWaitForSignal(void);
  extern void ProcSendSignal(int pid);

+ extern void SendProcSignal(PGPROC *proc, uint8 reason);
+ extern bool CheckProcSignal(uint8 reason);
+
  extern bool enable_sig_alarm(int delayms, bool is_statement_timeout);
  extern bool disable_sig_alarm(bool is_statement_timeout);
  extern void handle_sig_alarm(SIGNAL_ARGS);
*** a/src/include/storage/sinval.h
--- b/src/include/storage/sinval.h
***************
*** 90,96 **** extern void ReceiveSharedInvalidMessages(
                               void (*resetFunction) (void));

  /* signal handler for catchup events (SIGUSR1) */
! extern void CatchupInterruptHandler(SIGNAL_ARGS);

  /*
   * enable/disable processing of catchup events directly from signal handler.
--- 90,96 ----
                               void (*resetFunction) (void));

  /* signal handler for catchup events (SIGUSR1) */
! extern void HandleCatchupInterrupt(void);

  /*
   * enable/disable processing of catchup events directly from signal handler.
*** a/src/include/tcop/tcopprot.h
--- b/src/include/tcop/tcopprot.h
***************
*** 56,61 **** extern List *pg_plan_queries(List *querytrees, int cursorOptions,
--- 56,62 ----

  extern bool assign_max_stack_depth(int newval, bool doit, GucSource source);

+ extern void proc_sigusr1_handler(SIGNAL_ARGS);
  extern void die(SIGNAL_ARGS);
  extern void quickdie(SIGNAL_ARGS);
  extern void authdie(SIGNAL_ARGS);
