I have prepared a patch to backends/commands/async,c to speed up
duplicate elimination. rdtsc timing results are sent back via ereport.
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 326,337 **** typedef struct Notification
--- 326,353 ----
{
char *channel; /* channel name */
char *payload; /* payload string (can be empty) */
+ uint32 hash ; /* speed up search for duplicates */
+ struct Notification *left ;
+ struct Notification *right ;
+
} Notification;
static List *pendingNotifies = NIL; /* list of Notifications */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
+ static Notification *treerootNotifies = NULL ; /* speed up search for duplicates */
+ #warning rdtsc just included for lack of a suitable reference
+ static uint64 rdtsc_total ;
+ static int rdtsc_count ;
+ static inline uint64 rdtsc(void)
+ {
+ uint64 x;
+ __asm__ volatile (".byte 0x0f, 0x31" : "=A" (x));
+ return x;
+ }
+
+
/*
* State for inbound notifications consists of two flags: one saying whether
* the signal handler is currently allowed to call ProcessIncomingNotify
***************
*** 382,389 **** static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(const char *channel,
const char *payload,
int32 srcPid);
- static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
static void ClearPendingActionsAndNotifies(void);
/*
* We will work on the page range of 0..QUEUE_MAX_PAGE.
--- 398,408 ----
static void NotifyMyFrontEnd(const char *channel,
const char *payload,
int32 srcPid);
static void ClearPendingActionsAndNotifies(void);
+ static Notification **AsyncSearchPendingNotifies(const char *channel,
+ const char *payload,
+ uint32 *hash);
+ /* Does pendingNotifies include the given channel/payload? */
/*
* We will work on the page range of 0..QUEUE_MAX_PAGE.
***************
*** 533,538 **** Async_Notify(const char *channel, const char *payload)
--- 552,560 ----
{
Notification *n;
MemoryContext oldcontext;
+ Notification **nn ;
+ uint64 t1,t2 ;
+ uint32 hash ;
if (Trace_notify)
elog(DEBUG1, "Async_Notify(%s)", channel);
***************
*** 557,564 **** Async_Notify(const char *channel, const char *payload)
}
/* no point in making duplicate entries in the list ... */
! if (AsyncExistsPendingNotify(channel, payload))
! return;
/*
* The notification list needs to live until end of transaction, so store
--- 579,592 ----
}
/* no point in making duplicate entries in the list ... */
! t1 = rdtsc() ;
! nn = AsyncSearchPendingNotifies(channel,payload,&hash) ;
! t2 = rdtsc();
! rdtsc_total += t2-t1 ;
! rdtsc_count += 1 ;
! if ( !nn ) /* this was a duplicate entry */
! return ;
!
/*
* The notification list needs to live until end of transaction, so store
***************
*** 566,584 **** Async_Notify(const char *channel, const char *payload)
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
! n = (Notification *) palloc(sizeof(Notification));
n->channel = pstrdup(channel);
if (payload)
n->payload = pstrdup(payload);
else
n->payload = "";
! /*
! * We want to preserve the order so we need to append every notification.
! * See comments at AsyncExistsPendingNotify().
*/
pendingNotifies = lappend(pendingNotifies, n);
-
MemoryContextSwitchTo(oldcontext);
}
--- 594,625 ----
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
! n = (Notification *) palloc(sizeof(*n));
n->channel = pstrdup(channel);
if (payload)
n->payload = pstrdup(payload);
else
n->payload = "";
+
+ /* append to search tree */
+ n->left = NULL ;
+ n->right = NULL ;
+ n->hash = hash ;
+ *nn = n ;
! /* We want to preserve the order so we need to append every notification.
! * As we are not checking our parents' lists, we can still get duplicates
! * in combination with subtransactions, like in:
! *
! * begin;
! * notify foo '1';
! * savepoint foo;
! * notify foo '1';
! * commit;
! *----------
*/
+
pendingNotifies = lappend(pendingNotifies, n);
MemoryContextSwitchTo(oldcontext);
}
***************
*** 2149,2156 **** NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
}
! /* Does pendingNotifies include the given channel/payload? */
! static bool
AsyncExistsPendingNotify(const char *channel, const char *payload)
{
ListCell *p;
--- 2190,2314 ----
elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
}
!
! static inline uint32 murmurhash2( const char* key, uint32 seed )
! #warning yet another copy of murmurhash2
! #warning should we use murmurhash3 or one of the hash functions from backend/utils/hash/hashfn.c?
! {
! const uint32 m = 0x5bd1e995;
! const int r = 24;
! int len = strlen(key) ;
! uint32 h = seed ^ len;
! uint32 k ;
! const unsigned char * data = (const unsigned char *)key;
! while(len >= 4)
! {
! memcpy(&k,data,4) ;
!
! k *= m;
! k ^= k >> r;
! k *= m;
!
! h *= m;
! h ^= k;
!
! data += 4;
! len -= 4;
! }
!
! /* Handle the last few bytes of the input array */
!
! switch(len)
! {
! case 3: h ^= data[2] << 16;
! case 2: h ^= data[1] << 8;
! case 1: h ^= data[0];
! h *= m;
! };
!
! /* Do a few final mixes of the hash to ensure the last few */
! /* bytes are well-incorporated. */
!
! h ^= h >> 13;
! h *= m;
! h ^= h >> 15;
!
! return h;
! }
!
! static Notification **
! AsyncSearchPendingNotifies ( const char *channel, const char *payload , uint32 *phash )
! /* return NULL if (channel,payload) is a duplicate entry
! * otherwise, return an insert position and set phash.
! * a NULL payload is treated as an empty string.
! * the tree search is never worse than a linear list and the
! * mixing properties of the hash function will keep the average depth low
! */
! {
! Notification *t ;
! Notification **p ;
! uint32 hash ;
! int depth = 0 ;
!
! if ( !payload )
! payload = "" ;
!
! /*----------
! * We need to append new elements to the end of the list in order to keep
! * the order.
! * As we are not checking our parents' lists, we can still get duplicates
! * in combination with subtransactions, like in:
! *
! * begin;
! * notify foo '1';
! * savepoint foo;
! * notify foo '1';
! * commit;
! *----------
! */
! /* avoid warning about possibly uninitialized value */
! *phash = 0 ;
! if ( pendingNotifies )
! {
! Notification *n = (Notification *) llast(pendingNotifies);
! if (strcmp(n->channel, channel) == 0 &&
! strcmp(n->payload, payload) == 0)
! return NULL ;
! }
!
! hash = murmurhash2(channel,691) ;
! hash = murmurhash2(payload,hash) ; /* sadly, not incremental */
! *phash = hash ;
!
! t = treerootNotifies ;
! p = &treerootNotifies ;
! while ( t )
! {
! ++depth ;
! if ( hash < t->hash )
! {
! p = &t->left ;
! t = t->left ;
! }
! else if ( hash > t->hash )
! {
! p = &t->right ;
! t = t->right ;
! }
! else
! {
! if ( 0 == strcmp(t->channel,channel) && 0 == strcmp(t->payload,payload) )
! return NULL ;
! p = &t->left ;
! t = t->left ;
! }
! }
! return p ;
! }
!
!
! #if 0
! bool
AsyncExistsPendingNotify(const char *channel, const char *payload)
{
ListCell *p;
***************
*** 2196,2201 **** AsyncExistsPendingNotify(const char *channel, const char *payload)
--- 2354,2360 ----
return false;
}
+ #endif
/* Clear the pendingActions and pendingNotifies lists. */
static void
***************
*** 2210,2213 **** ClearPendingActionsAndNotifies(void)
--- 2369,2380 ----
*/
pendingActions = NIL;
pendingNotifies = NIL;
+
+ if ( rdtsc_count )
+ ereport(NOTICE,(errmsg("AsyncSearchPendingNotifies: rtdsc timing: %.2f (%d)",
(double)rdtsc_total/rdtsc_count,rdtsc_count))) ;
+
+ treerootNotifies = NULL ;
+ rdtsc_total = 0 ;
+ rdtsc_count = 0 ;
}
+