notify duplicate elimination - Mailing list pgsql-hackers

From Hardy Falk
Subject notify duplicate elimination
Date
Msg-id 52F76DB8.2010804@blue-cable.de
Whole thread Raw
List pgsql-hackers
I have prepared a patch to src/backend/commands/async.c to speed up
duplicate elimination of pending notifications. Timing results measured
with rdtsc are sent back via ereport(NOTICE).



*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 326,337 **** typedef struct Notification
--- 326,353 ----
  {
      char       *channel;        /* channel name */
      char       *payload;        /* payload string (can be empty) */
+     uint32      hash ;            /* speed up search for duplicates */
+     struct Notification *left  ;
+     struct Notification *right ;
+
  } Notification;

  static List *pendingNotifies = NIL;        /* list of Notifications */

  static List *upperPendingNotifies = NIL;        /* list of upper-xact lists */

+ /*
+  * Root of a binary search tree, ordered by Notification.hash, linking the
+  * Notifications queued by Async_Notify(); set back to NULL in
+  * ClearPendingActionsAndNotifies().
+  * NOTE(review): the nodes are palloc'd in CurTransactionContext, but the root
+  * is only reset at top-level clear.  If a subtransaction abort releases its
+  * CurTransactionContext, the tree could keep pointers to freed nodes --
+  * confirm AtSubAbort_Notify() prunes or rebuilds the tree.
+  */
+ static Notification *treerootNotifies = NULL ; /* speed up search for duplicates */
+ #warning rdtsc just included for lack of a suitable reference
+ /*
+  * Timing instrumentation: total rdtsc ticks spent in
+  * AsyncSearchPendingNotifies() and the number of calls measured.  Reported
+  * and zeroed in ClearPendingActionsAndNotifies().
+  */
+ static uint64   rdtsc_total ;
+ static int      rdtsc_count ;
+ /*
+  * Read the x86 time-stamp counter (bytes 0x0f 0x31 encode RDTSC).
+  * NOTE(review): the "=A" constraint names the EDX:EAX pair only on 32-bit
+  * x86; on x86-64 it selects RAX *or* RDX, so the high half of the counter is
+  * lost there.  The inline asm is also x86-only and will not build elsewhere.
+  */
+ static inline uint64 rdtsc(void)
+ {
+     uint64  x;
+     __asm__ volatile (".byte 0x0f, 0x31" : "=A" (x));
+     return x;
+ }
+
+
  /*
   * State for inbound notifications consists of two flags: one saying whether
   * the signal handler is currently allowed to call ProcessIncomingNotify
***************
*** 382,389 **** static void ProcessIncomingNotify(void);
  static void NotifyMyFrontEnd(const char *channel,
                   const char *payload,
                   int32 srcPid);
- static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
  static void ClearPendingActionsAndNotifies(void);

  /*
   * We will work on the page range of 0..QUEUE_MAX_PAGE.
--- 398,408 ----
  static void NotifyMyFrontEnd(const char *channel,
                   const char *payload,
                   int32 srcPid);
  static void ClearPendingActionsAndNotifies(void);
+ static Notification **AsyncSearchPendingNotifies(const char *channel,
+                 const char *payload,
+                 uint32 *hash);
+ /* (prototype above) returns NULL if channel/payload is already pending, else a tree insert position */

  /*
   * We will work on the page range of 0..QUEUE_MAX_PAGE.
***************
*** 533,538 **** Async_Notify(const char *channel, const char *payload)
--- 552,560 ----
  {
      Notification *n;
      MemoryContext oldcontext;
+     Notification **nn ;
+     uint64 t1,t2 ;
+     uint32 hash ;

      if (Trace_notify)
          elog(DEBUG1, "Async_Notify(%s)", channel);
***************
*** 557,564 **** Async_Notify(const char *channel, const char *payload)
      }

      /* no point in making duplicate entries in the list ... */
!     if (AsyncExistsPendingNotify(channel, payload))
!         return;

      /*
       * The notification list needs to live until end of transaction, so store
--- 579,592 ----
      }

      /* no point in making duplicate entries in the list ... */
!     t1 = rdtsc() ;
!     nn = AsyncSearchPendingNotifies(channel,payload,&hash) ;
!     t2 = rdtsc();
!     rdtsc_total += t2-t1 ;
!     rdtsc_count += 1 ;
!     if ( !nn ) /* this was a duplicate entry */
!         return ;
!

      /*
       * The notification list needs to live until end of transaction, so store
***************
*** 566,584 **** Async_Notify(const char *channel, const char *payload)
       */
      oldcontext = MemoryContextSwitchTo(CurTransactionContext);

!     n = (Notification *) palloc(sizeof(Notification));
      n->channel = pstrdup(channel);
      if (payload)
          n->payload = pstrdup(payload);
      else
          n->payload = "";

!     /*
!      * We want to preserve the order so we need to append every notification.
!      * See comments at AsyncExistsPendingNotify().
       */
      pendingNotifies = lappend(pendingNotifies, n);
-
      MemoryContextSwitchTo(oldcontext);
  }

--- 594,625 ----
       */
      oldcontext = MemoryContextSwitchTo(CurTransactionContext);

!     n = (Notification *) palloc(sizeof(*n));
      n->channel = pstrdup(channel);
      if (payload)
          n->payload = pstrdup(payload);
      else
          n->payload = "";
+
+     /* append to search tree */
+     n->left  = NULL ;
+     n->right = NULL ;
+     n->hash  = hash ;
+     *nn = n ;

!     /* We want to preserve the order so we need to append every notification.
!      * As we are not checking our parents' lists, we can still get duplicates
!      * in combination with subtransactions, like in:
!      *
!      * begin;
!      * notify foo '1';
!      * savepoint foo;
!      * notify foo '1';
!      * commit;
!      *----------
       */
+
      pendingNotifies = lappend(pendingNotifies, n);
      MemoryContextSwitchTo(oldcontext);
  }

***************
*** 2149,2156 **** NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
          elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
  }

! /* Does pendingNotifies include the given channel/payload? */
! static bool
  AsyncExistsPendingNotify(const char *channel, const char *payload)
  {
      ListCell   *p;
--- 2190,2314 ----
          elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
  }

!
! /*
!  * 32-bit MurmurHash2 of the NUL-terminated string "key", starting from "seed".
!  * The string length is folded into the initial state, then the input is
!  * consumed in 4-byte chunks loaded with memcpy (so the result depends on the
!  * machine's native byte order), followed by the 0-3 trailing bytes and a
!  * final avalanche mix.
!  * NOTE(review): strlen() is truncated to int here; harmless for
!  * channel/payload sizes, but size_t would be cleaner.
!  */
! static inline uint32 murmurhash2( const char* key, uint32 seed )
! #warning yet another copy of murmurhash2
! #warning should we use murmurhash3 or one of the hash functions from backend/utils/hash/hashfn.c?
! {
!     const uint32  m = 0x5bd1e995;
!     const int r = 24;
!     int len = strlen(key) ;
!     uint32 h = seed ^ len;
!     uint32 k ;
!     const unsigned char * data = (const unsigned char *)key;
!     /* Main loop: mix one native-endian 32-bit word per iteration. */
!     while(len >= 4)
!     {
!         memcpy(&k,data,4) ;
!
!         k *= m;
!         k ^= k >> r;
!         k *= m;
!
!         h *= m;
!         h ^= k;
!
!         data += 4;
!         len -= 4;
!     }
!
!     /* Handle the last few bytes of the input array */
!
!     switch(len)
!     {
!     case 3: h ^= data[2] << 16;   /* fall through */
!     case 2: h ^= data[1] << 8;    /* fall through */
!     case 1: h ^= data[0];
!             h *= m;
!     };
!
!     /* Do a few final mixes of the hash to ensure the last few */
!     /* bytes are well-incorporated. */
!
!     h ^= h >> 13;
!     h *= m;
!     h ^= h >> 15;
!
!     return h;
! }
!
! /*
!  * Lookup-or-locate for the duplicate-elimination tree rooted at
!  * treerootNotifies.  The caller (Async_Notify) stores the new Notification
!  * into the returned slot; that must happen before any other insertion, since
!  * the slot points into the current tree shape.
!  */
! static Notification **
! AsyncSearchPendingNotifies ( const char *channel, const char *payload , uint32 *phash )
! /* return NULL if (channel,payload) is a duplicate entry
!  * otherwise, return an insert position and set phash.
!  * a NULL payload is treated as an empty string.
!  * the tree search is never worse than a linear list and the
!  * mixing properties of the hash function will keep the average depth low
!  */
! {
!     Notification   *t ;
!     Notification  **p ;
!     uint32 hash ;
!     /* NOTE(review): depth is incremented below but never read -- leftover
!      * instrumentation? */
!     int depth = 0 ;
!
!     if ( !payload )
!         payload = "" ;
!
!     /*----------
!      * We need to append new elements to the end of the list in order to keep
!      * the order.
!      * As we are not checking our parents' lists, we can still get duplicates
!      * in combination with subtransactions, like in:
!      *
!      * begin;
!      * notify foo '1';
!      * savepoint foo;
!      * notify foo '1';
!      * commit;
!      *----------
!      */
!     /* NOTE(review): the comment above came from the list-based version.  The
!      * tree is global and is only reset in ClearPendingActionsAndNotifies(),
!      * so it may also contain entries queued at outer transaction levels --
!      * verify whether this caveat still holds. */
!     /* avoid warning about possibly uninitialized value */
!     *phash = 0 ;
!     /* Fast path: repeat of the most recently queued notification. */
!     if ( pendingNotifies )
!     {
!         Notification *n = (Notification *) llast(pendingNotifies);
!         if (strcmp(n->channel, channel) == 0 &&
!             strcmp(n->payload, payload) == 0)
!         return NULL ;
!     }
!
!     /* Combined key: payload hashed with the channel hash as its seed. */
!     hash = murmurhash2(channel,691) ;
!     hash = murmurhash2(payload,hash) ; /* sadly, not incremental */
!     *phash = hash ;
!
!     /* Descend the tree, tracking in p the link that would hold a new node. */
!     t =  treerootNotifies ;
!     p = &treerootNotifies ;
!     while ( t )
!     {
!         ++depth ;
!         if ( hash < t->hash )
!         {
!             p = &t->left ;
!             t = t->left ;
!         }
!         else if ( hash > t->hash )
!         {
!             p = &t->right ;
!             t = t->right ;
!         }
!         else
!         {
!             /* Equal hash: either a true duplicate or a collision.  Collisions
!              * are sent left; insertion follows this same path, so a later
!              * search for the same key retraces it and finds the node. */
!             if ( 0 == strcmp(t->channel,channel) && 0 == strcmp(t->payload,payload) )
!                 return NULL ;
!             p = &t->left ;
!             t = t->left ;
!         }
!     }
!     return p ;
! }
!
!
! #if 0
! bool
  AsyncExistsPendingNotify(const char *channel, const char *payload)
  {
      ListCell   *p;
***************
*** 2196,2201 **** AsyncExistsPendingNotify(const char *channel, const char *payload)
--- 2354,2360 ----

      return false;
  }
+ #endif

  /* Clear the pendingActions and pendingNotifies lists. */
  static void
***************
*** 2210,2213 **** ClearPendingActionsAndNotifies(void)
--- 2369,2380 ----
       */
      pendingActions = NIL;
      pendingNotifies = NIL;
+
+     if ( rdtsc_count )
+         ereport(NOTICE,(errmsg("AsyncSearchPendingNotifies: rtdsc timing: %.2f (%d)",
(double)rdtsc_total/rdtsc_count,rdtsc_count))) ;  
+
+     treerootNotifies = NULL ;
+     rdtsc_total = 0 ;
+     rdtsc_count = 0 ;
  }
+

pgsql-hackers by date:

Previous
From: Magnus Hagander
Date:
Subject: Re: [PATCH] pg_basebackup: progress report max once per second
Next
From: Magnus Hagander
Date:
Subject: Terminating pg_basebackup background streamer