*** a/doc/src/sgml/func.sgml
--- b/doc/src/sgml/func.sgml
***************
*** 14800,14805 **** SELECT * FROM pg_ls_dir('.') WITH ORDINALITY AS t(ls,n);
--- 14800,14811 ----
+ pg_notify_queue_saturation()
+ double
+ proportion of the asynchronous notification queue currently occupied
+
+
+
pg_my_temp_schema()
oid
OID of session's temporary schema, or 0 if none
***************
*** 14939,14948 **** SET search_path TO schema> , schema>, ..
pg_listening_channels
pg_listening_channels returns a set of names of
! channels that the current session is listening to. See for more information.
--- 14945,14962 ----
pg_listening_channels
+
+ pg_notify_queue_saturation
+
+
pg_listening_channels returns a set of names of
! asynchronous notification channels that the current session is listening
! to. pg_notify_queue_saturation returns the proportion
! of the total available space for notifications currently occupied by
! notifications that are waiting to be processed. See
! and
! for more information.
*** a/doc/src/sgml/ref/notify.sgml
--- b/doc/src/sgml/ref/notify.sgml
***************
*** 166,171 **** NOTIFY channel [ ,
+ The function pg_notify_queue_saturation returns the
+ proportion of the queue that is currently occupied by pending notifications.
+
+
A transaction that has executed NOTIFY cannot be
prepared for two-phase commit.
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 371,376 **** static bool asyncQueueIsFull(void);
--- 371,377 ----
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
+ static double asyncQueueSaturation(void);
static void asyncQueueFillWarning(void);
static bool SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
***************
*** 1362,1387 **** asyncQueueAddEntries(ListCell *nextNotify)
}
/*
! * Check whether the queue is at least half full, and emit a warning if so.
! *
! * This is unlikely given the size of the queue, but possible.
! * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
*
! * Caller must hold exclusive AsyncQueueLock.
*/
! static void
! asyncQueueFillWarning(void)
{
! int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
! int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
! int occupied;
! double fillDegree;
! TimestampTz t;
occupied = headPage - tailPage;
if (occupied == 0)
! return; /* fast exit for common case */
if (occupied < 0)
{
--- 1363,1399 ----
}
/*
! * SQL function to return the proportion of the notification queue currently
! * occupied.
! */
! Datum
! pg_notify_queue_saturation(PG_FUNCTION_ARGS)
! {
! double saturation;
!
! LWLockAcquire(AsyncQueueLock, LW_SHARED);
! saturation = asyncQueueSaturation();
! LWLockRelease(AsyncQueueLock);
!
! PG_RETURN_FLOAT8(saturation);
! }
!
! /*
! * Return the proportion of the queue that is currently occupied.
*
! * The caller must hold (at least) shared AysncQueueLock.
*/
! static double
! asyncQueueSaturation(void)
{
! int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
! int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
! int occupied;
occupied = headPage - tailPage;
if (occupied == 0)
! return (double) 0; /* fast exit for common case */
if (occupied < 0)
{
***************
*** 1389,1396 **** asyncQueueFillWarning(void)
occupied += QUEUE_MAX_PAGE + 1;
}
! fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
if (fillDegree < 0.5)
return;
--- 1401,1424 ----
occupied += QUEUE_MAX_PAGE + 1;
}
! return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
! }
!
! /*
! * Check whether the queue is at least half full, and emit a warning if so.
! *
! * This is unlikely given the size of the queue, but possible.
! * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
! *
! * Caller must hold exclusive AsyncQueueLock.
! */
! static void
! asyncQueueFillWarning(void)
! {
! double fillDegree;
! TimestampTz t;
+ fillDegree = asyncQueueSaturation();
if (fillDegree < 0.5)
return;
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
***************
*** 4036,4045 **** DATA(insert OID = 2856 ( pg_timezone_names PGNSP PGUID 12 1 1000 0 0 f f f f t
DESCR("get the available time zone names");
DATA(insert OID = 2730 ( pg_get_triggerdef PGNSP PGUID 12 1 0 0 0 f f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ ));
DESCR("trigger description with pretty-print option");
! DATA(insert OID = 3035 ( pg_listening_channels PGNSP PGUID 12 1 10 0 0 f f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ ));
DESCR("get the channels that the current backend listens to");
! DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
DESCR("send a notification event");
/* non-persistent series generator */
DATA(insert OID = 1066 ( generate_series PGNSP PGUID 12 1 1000 0 0 f f f f t t i 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ _null_ generate_series_step_int4 _null_ _null_ _null_ ));
--- 4036,4049 ----
DESCR("get the available time zone names");
DATA(insert OID = 2730 ( pg_get_triggerdef PGNSP PGUID 12 1 0 0 0 f f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ ));
DESCR("trigger description with pretty-print option");
!
! /* asynchronous notifications */
! DATA(insert OID = 3035 ( pg_listening_channels PGNSP PGUID 12 1 10 0 0 f f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ ));
DESCR("get the channels that the current backend listens to");
! DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
DESCR("send a notification event");
+ DATA(insert OID = 3293 ( pg_notify_queue_saturation PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 701 "" _null_ _null_ _null_ _null_ _null_ pg_notify_queue_saturation _null_ _null_ _null_ ));
+ DESCR("get the saturation of the asynchronous notification queue");
/* non-persistent series generator */
DATA(insert OID = 1066 ( generate_series PGNSP PGUID 12 1 1000 0 0 f f f f t t i 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ _null_ generate_series_step_int4 _null_ _null_ _null_ ));
*** a/src/include/commands/async.h
--- b/src/include/commands/async.h
***************
*** 37,42 **** extern void Async_UnlistenAll(void);
--- 37,43 ----
/* notify-related SQL functions */
extern Datum pg_listening_channels(PG_FUNCTION_ARGS);
extern Datum pg_notify(PG_FUNCTION_ARGS);
+ extern Datum pg_notify_queue_saturation(PG_FUNCTION_ARGS);
/* perform (or cancel) outbound notify processing at transaction commit */
extern void PreCommit_Notify(void);
*** /dev/null
--- b/src/test/isolation/expected/async-notify.out
***************
*** 0 ****
--- 1,17 ----
+ Parsed test spec with 2 sessions
+
+ starting permutation: listen begin check notify check
+ step listen: LISTEN a;
+ step begin: BEGIN;
+ step check: SELECT pg_notify_queue_saturation() > 0 AS nonzero;
+ nonzero
+
+ f
+ step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s;
+ count
+
+ 1000
+ step check: SELECT pg_notify_queue_saturation() > 0 AS nonzero;
+ nonzero
+
+ t
*** /dev/null
--- b/src/test/isolation/specs/async-notify.spec
***************
*** 0 ****
--- 1,14 ----
+ # Verify that pg_notify_queue_saturation correctly reports a non-zero result,
+ # after submitting notifications while another connection is listening for
+ # those notifications and waiting inside an active transaction.
+
+ session "listener"
+ step "listen" { LISTEN a; }
+ step "begin" { BEGIN; }
+ teardown { ROLLBACK; }
+
+ session "notifier"
+ step "check" { SELECT pg_notify_queue_saturation() > 0 AS nonzero; }
+ step "notify" { SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; }
+
+ permutation "listen" "begin" "check" "notify" "check"
*** a/src/test/regress/expected/async.out
--- b/src/test/regress/expected/async.out
***************
*** 32,34 **** NOTIFY notify_async2;
--- 32,42 ----
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
+ -- Should return zero while there are no pending notifications.
+ -- src/test/isolation/specs/async-notify.spec actually tests for saturation.
+ SELECT pg_notify_queue_saturation();
+ pg_notify_queue_saturation
+ ----------------------------
+ 0
+ (1 row)
+
*** a/src/test/regress/sql/async.sql
--- b/src/test/regress/sql/async.sql
***************
*** 17,19 **** NOTIFY notify_async2;
--- 17,23 ----
LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
+
+ -- Should return zero while there are no pending notifications.
+ -- src/test/isolation/specs/async-notify.spec actually tests for saturation.
+ SELECT pg_notify_queue_saturation();