notify duplicate elimination - Mailing list pgsql-hackers
From | Hardy Falk |
---|---|
Subject | notify duplicate elimination |
Date | |
Msg-id | 52F76DB8.2010804@blue-cable.de Whole thread Raw |
List | pgsql-hackers |
I have prepared a patch to backends/commands/async,c to speed up duplicate elimination. rdtsc timing results are sent back via ereport. *** a/src/backend/commands/async.c --- b/src/backend/commands/async.c *************** *** 326,337 **** typedef struct Notification --- 326,353 ---- { char *channel; /* channel name */ char *payload; /* payload string (can be empty) */ + uint32 hash ; /* speed up search for duplicates */ + struct Notification *left ; + struct Notification *right ; + } Notification; static List *pendingNotifies = NIL; /* list of Notifications */ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ + static Notification *treerootNotifies = NULL ; /* speed up search for duplicates */ + #warning rdtsc just included for lack of a suitable reference + static uint64 rdtsc_total ; + static int rdtsc_count ; + static inline uint64 rdtsc(void) + { + uint64 x; + __asm__ volatile (".byte 0x0f, 0x31" : "=A" (x)); + return x; + } + + /* * State for inbound notifications consists of two flags: one saying whether * the signal handler is currently allowed to call ProcessIncomingNotify *************** *** 382,389 **** static void ProcessIncomingNotify(void); static void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid); - static bool AsyncExistsPendingNotify(const char *channel, const char *payload); static void ClearPendingActionsAndNotifies(void); /* * We will work on the page range of 0..QUEUE_MAX_PAGE. --- 398,408 ---- static void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid); static void ClearPendingActionsAndNotifies(void); + static Notification **AsyncSearchPendingNotifies(const char *channel, + const char *payload, + uint32 *hash); + /* Does pendingNotifies include the given channel/payload? */ /* * We will work on the page range of 0..QUEUE_MAX_PAGE. *************** *** 533,538 **** Async_Notify(const char *channel, const char *payload) --- 552,560 ---- { Notification *n; MemoryContext oldcontext; + Notification **nn ; + uint64 t1,t2 ; + uint32 hash ; if (Trace_notify) elog(DEBUG1, "Async_Notify(%s)", channel); *************** *** 557,564 **** Async_Notify(const char *channel, const char *payload) } /* no point in making duplicate entries in the list ... */ ! if (AsyncExistsPendingNotify(channel, payload)) ! return; /* * The notification list needs to live until end of transaction, so store --- 579,592 ---- } /* no point in making duplicate entries in the list ... */ ! t1 = rdtsc() ; ! nn = AsyncSearchPendingNotifies(channel,payload,&hash) ; ! t2 = rdtsc(); ! rdtsc_total += t2-t1 ; ! rdtsc_count += 1 ; ! if ( !nn ) /* this was a duplicate entry */ ! return ; ! /* * The notification list needs to live until end of transaction, so store *************** *** 566,584 **** Async_Notify(const char *channel, const char *payload) */ oldcontext = MemoryContextSwitchTo(CurTransactionContext); ! n = (Notification *) palloc(sizeof(Notification)); n->channel = pstrdup(channel); if (payload) n->payload = pstrdup(payload); else n->payload = ""; ! /* ! * We want to preserve the order so we need to append every notification. ! * See comments at AsyncExistsPendingNotify(). */ pendingNotifies = lappend(pendingNotifies, n); - MemoryContextSwitchTo(oldcontext); } --- 594,625 ---- */ oldcontext = MemoryContextSwitchTo(CurTransactionContext); ! n = (Notification *) palloc(sizeof(*n)); n->channel = pstrdup(channel); if (payload) n->payload = pstrdup(payload); else n->payload = ""; + + /* append to search tree */ + n->left = NULL ; + n->right = NULL ; + n->hash = hash ; + *nn = n ; ! /* We want to preserve the order so we need to append every notification. ! * As we are not checking our parents' lists, we can still get duplicates ! * in combination with subtransactions, like in: ! * ! * begin; ! * notify foo '1'; ! * savepoint foo; ! * notify foo '1'; ! * commit; ! *---------- */ + pendingNotifies = lappend(pendingNotifies, n); MemoryContextSwitchTo(oldcontext); } *************** *** 2149,2156 **** NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload); } ! /* Does pendingNotifies include the given channel/payload? */ ! static bool AsyncExistsPendingNotify(const char *channel, const char *payload) { ListCell *p; --- 2190,2314 ---- elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload); } ! ! static inline uint32 murmurhash2( const char* key, uint32 seed ) ! #warning yet another copy of murmurhash2 ! #warning should we use murmurhash3 or one of the hash functions from backend/utils/hash/hashfn.c? ! { ! const uint32 m = 0x5bd1e995; ! const int r = 24; ! int len = strlen(key) ; ! uint32 h = seed ^ len; ! uint32 k ; ! const unsigned char * data = (const unsigned char *)key; ! while(len >= 4) ! { ! memcpy(&k,data,4) ; ! ! k *= m; ! k ^= k >> r; ! k *= m; ! ! h *= m; ! h ^= k; ! ! data += 4; ! len -= 4; ! } ! ! /* Handle the last few bytes of the input array */ ! ! switch(len) ! { ! case 3: h ^= data[2] << 16; ! case 2: h ^= data[1] << 8; ! case 1: h ^= data[0]; ! h *= m; ! }; ! ! /* Do a few final mixes of the hash to ensure the last few */ ! /* bytes are well-incorporated. */ ! ! h ^= h >> 13; ! h *= m; ! h ^= h >> 15; ! ! return h; ! } ! ! static Notification ** ! AsyncSearchPendingNotifies ( const char *channel, const char *payload , uint32 *phash ) ! /* return NULL if (channel,payload) is a duplicate entry ! * otherwise, return an insert position and set phash. ! * a NULL payload is treated as an empty string. ! * the tree search is never worse than a linear list and the ! * mixing properties of the hash function will keep the average depth low ! */ ! { ! Notification *t ; ! Notification **p ; ! uint32 hash ; ! int depth = 0 ; ! ! if ( !payload ) ! payload = "" ; ! ! /*---------- ! * We need to append new elements to the end of the list in order to keep ! * the order. ! * As we are not checking our parents' lists, we can still get duplicates ! * in combination with subtransactions, like in: ! * ! * begin; ! * notify foo '1'; ! * savepoint foo; ! * notify foo '1'; ! * commit; ! *---------- ! */ ! /* avoid warning about possibly uninitialized value */ ! *phash = 0 ; ! if ( pendingNotifies ) ! { ! Notification *n = (Notification *) llast(pendingNotifies); ! if (strcmp(n->channel, channel) == 0 && ! strcmp(n->payload, payload) == 0) ! return NULL ; ! } ! ! hash = murmurhash2(channel,691) ; ! hash = murmurhash2(payload,hash) ; /* sadly, not incremental */ ! *phash = hash ; ! ! t = treerootNotifies ; ! p = &treerootNotifies ; ! while ( t ) ! { ! ++depth ; ! if ( hash < t->hash ) ! { ! p = &t->left ; ! t = t->left ; ! } ! else if ( hash > t->hash ) ! { ! p = &t->right ; ! t = t->right ; ! } ! else ! { ! if ( 0 == strcmp(t->channel,channel) && 0 == strcmp(t->payload,payload) ) ! return NULL ; ! p = &t->left ; ! t = t->left ; ! } ! } ! return p ; ! } ! ! ! #if 0 ! bool AsyncExistsPendingNotify(const char *channel, const char *payload) { ListCell *p; *************** *** 2196,2201 **** AsyncExistsPendingNotify(const char *channel, const char *payload) --- 2354,2360 ---- return false; } + #endif /* Clear the pendingActions and pendingNotifies lists. */ static void *************** *** 2210,2213 **** ClearPendingActionsAndNotifies(void) --- 2369,2380 ---- */ pendingActions = NIL; pendingNotifies = NIL; + + if ( rdtsc_count ) + ereport(NOTICE,(errmsg("AsyncSearchPendingNotifies: rtdsc timing: %.2f (%d)", (double)rdtsc_total/rdtsc_count,rdtsc_count))) ; + + treerootNotifies = NULL ; + rdtsc_total = 0 ; + rdtsc_count = 0 ; } +
pgsql-hackers by date: