Re: Testing LISTEN/NOTIFY more effectively - Mailing list pgsql-hackers

From Tom Lane
Subject Re: Testing LISTEN/NOTIFY more effectively
Date
Msg-id 29162.1564266052@sss.pgh.pa.us
Whole thread Raw
In response to Re: Testing LISTEN/NOTIFY more effectively  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses Re: Testing LISTEN/NOTIFY more effectively  (Andres Freund <andres@anarazel.de>)
List pgsql-hackers
I wrote:
> Andres Freund <andres@anarazel.de> writes:
>> Perhaps we could just have isolationtester check to which
>> isolationtester session the backend pid belongs? And then print the
>> session name instead of the pid? That should be fairly easy, and would
>> probably give us all we need?

> Oh, that's a good idea -- it's already tracking all the backend PIDs,
> so probably not much extra work to do it like that.

I found out that to avoid confusion, one really wants the message to
identify both the sending and receiving sessions.  Here's a patch
that does it that way and extends the async-notify.spec test to
perform basic end-to-end checks on LISTEN/NOTIFY.

I intentionally made the test show the lack of NOTIFY de-duplication
that currently happens with subtransactions.  If we change this as I
proposed in <17822.1564186806@sss.pgh.pa.us>, this test output will
change.

            regards, tom lane

diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 92d281a..3dc1227 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,17 +1,82 @@
 Parsed test spec with 2 sessions

-starting permutation: listen begin check notify check
-step listen: LISTEN a;
-step begin: BEGIN;
-step check: SELECT pg_notification_queue_usage() > 0 AS nonzero;
+starting permutation: listenc notify1 notify2 notify3
+step listenc: LISTEN c1; LISTEN c2;
+step notify1: NOTIFY c1;
+notifier: NOTIFY "c1" with payload "" from notifier
+step notify2: NOTIFY c2, 'payload';
+notifier: NOTIFY "c2" with payload "payload" from notifier
+step notify3: NOTIFY c3, 'payload3';
+
+starting permutation: listenc notifyd notifys1
+step listenc: LISTEN c1; LISTEN c2;
+step notifyd: NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload';
+notifier: NOTIFY "c2" with payload "payload" from notifier
+notifier: NOTIFY "c1" with payload "" from notifier
+step notifys1:
+    BEGIN;
+    NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2';
+    NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2';
+    SAVEPOINT s1;
+    NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2';
+    NOTIFY c1, 'payload1s'; NOTIFY "c2", 'payload2s';
+    NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2';
+    NOTIFY c1, 'payload1s'; NOTIFY "c2", 'payload2s';
+    RELEASE SAVEPOINT s1;
+    SAVEPOINT s2;
+    NOTIFY c1, 'rpayload1'; NOTIFY "c2", 'rpayload2';
+    NOTIFY c1, 'rpayload1s'; NOTIFY "c2", 'rpayload2s';
+    NOTIFY c1, 'rpayload1'; NOTIFY "c2", 'rpayload2';
+    NOTIFY c1, 'rpayload1s'; NOTIFY "c2", 'rpayload2s';
+    ROLLBACK TO SAVEPOINT s2;
+    COMMIT;
+
+notifier: NOTIFY "c1" with payload "payload1" from notifier
+notifier: NOTIFY "c2" with payload "payload2" from notifier
+notifier: NOTIFY "c1" with payload "payload1" from notifier
+notifier: NOTIFY "c2" with payload "payload2" from notifier
+notifier: NOTIFY "c1" with payload "payload1s" from notifier
+notifier: NOTIFY "c2" with payload "payload2s" from notifier
+
+starting permutation: llisten notify1 notify2 notify3 lcheck
+step llisten: LISTEN c1; LISTEN c2;
+step notify1: NOTIFY c1;
+step notify2: NOTIFY c2, 'payload';
+step notify3: NOTIFY c3, 'payload3';
+step lcheck: SELECT 1 AS x;
+x
+
+1
+listener: NOTIFY "c1" with payload "" from notifier
+listener: NOTIFY "c2" with payload "payload" from notifier
+
+starting permutation: listenc llisten notify1 notify2 notify3 lcheck
+step listenc: LISTEN c1; LISTEN c2;
+step llisten: LISTEN c1; LISTEN c2;
+step notify1: NOTIFY c1;
+notifier: NOTIFY "c1" with payload "" from notifier
+step notify2: NOTIFY c2, 'payload';
+notifier: NOTIFY "c2" with payload "payload" from notifier
+step notify3: NOTIFY c3, 'payload3';
+step lcheck: SELECT 1 AS x;
+x
+
+1
+listener: NOTIFY "c1" with payload "" from notifier
+listener: NOTIFY "c2" with payload "payload" from notifier
+
+starting permutation: llisten lbegin usage bignotify usage
+step llisten: LISTEN c1; LISTEN c2;
+step lbegin: BEGIN;
+step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero;
 nonzero

 f
-step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s;
+step bignotify: SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s;
 count

 1000
-step check: SELECT pg_notification_queue_usage() > 0 AS nonzero;
+step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero;
 nonzero

 t
diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c
index 6ab19b1..98e5bf2 100644
--- a/src/test/isolation/isolationtester.c
+++ b/src/test/isolation/isolationtester.c
@@ -23,10 +23,12 @@

 /*
  * conns[0] is the global setup, teardown, and watchdog connection.  Additional
- * connections represent spec-defined sessions.
+ * connections represent spec-defined sessions.  We also track the backend
+ * PID, in numeric and string formats, for each connection.
  */
 static PGconn **conns = NULL;
-static const char **backend_pids = NULL;
+static int *backend_pids = NULL;
+static const char **backend_pid_strs = NULL;
 static int    nconns = 0;

 /* In dry run only output permutations to be run by the tester. */
@@ -41,7 +43,7 @@ static void run_permutation(TestSpec *testspec, int nsteps, Step **steps);

 #define STEP_NONBLOCK    0x1        /* return 0 as soon as cmd waits for a lock */
 #define STEP_RETRY        0x2        /* this is a retry of a previously-waiting cmd */
-static bool try_complete_step(Step *step, int flags);
+static bool try_complete_step(TestSpec *testspec, Step *step, int flags);

 static int    step_qsort_cmp(const void *a, const void *b);
 static int    step_bsearch_cmp(const void *a, const void *b);
@@ -159,9 +161,11 @@ main(int argc, char **argv)
      * extra for lock wait detection and global work.
      */
     nconns = 1 + testspec->nsessions;
-    conns = calloc(nconns, sizeof(PGconn *));
+    conns = (PGconn **) pg_malloc0(nconns * sizeof(PGconn *));
+    backend_pids = pg_malloc0(nconns * sizeof(*backend_pids));
+    backend_pid_strs = pg_malloc0(nconns * sizeof(*backend_pid_strs));
     atexit(disconnect_atexit);
-    backend_pids = calloc(nconns, sizeof(*backend_pids));
+
     for (i = 0; i < nconns; i++)
     {
         conns[i] = PQconnectdb(conninfo);
@@ -187,26 +191,9 @@ main(int argc, char **argv)
                                  blackholeNoticeProcessor,
                                  NULL);

-        /* Get the backend pid for lock wait checking. */
-        res = PQexec(conns[i], "SELECT pg_catalog.pg_backend_pid()");
-        if (PQresultStatus(res) == PGRES_TUPLES_OK)
-        {
-            if (PQntuples(res) == 1 && PQnfields(res) == 1)
-                backend_pids[i] = pg_strdup(PQgetvalue(res, 0, 0));
-            else
-            {
-                fprintf(stderr, "backend pid query returned %d rows and %d columns, expected 1 row and 1 column",
-                        PQntuples(res), PQnfields(res));
-                exit(1);
-            }
-        }
-        else
-        {
-            fprintf(stderr, "backend pid query failed: %s",
-                    PQerrorMessage(conns[i]));
-            exit(1);
-        }
-        PQclear(res);
+        /* Save each connection's backend PID for subsequent use. */
+        backend_pids[i] = PQbackendPID(conns[i]);
+        backend_pid_strs[i] = psprintf("%d", backend_pids[i]);
     }

     /* Set the session index fields in steps. */
@@ -231,9 +218,9 @@ main(int argc, char **argv)
     appendPQExpBufferStr(&wait_query,
                          "SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
     /* The spec syntax requires at least one session; assume that here. */
-    appendPQExpBufferStr(&wait_query, backend_pids[1]);
+    appendPQExpBufferStr(&wait_query, backend_pid_strs[1]);
     for (i = 2; i < nconns; i++)
-        appendPQExpBuffer(&wait_query, ",%s", backend_pids[i]);
+        appendPQExpBuffer(&wait_query, ",%s", backend_pid_strs[i]);
     appendPQExpBufferStr(&wait_query, "}')");

     res = PQprepare(conns[0], PREP_WAITING, wait_query.data, 0, NULL);
@@ -549,7 +536,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
                 oldstep = waiting[w];

                 /* Wait for previous step on this connection. */
-                try_complete_step(oldstep, STEP_RETRY);
+                try_complete_step(testspec, oldstep, STEP_RETRY);

                 /* Remove that step from the waiting[] array. */
                 if (w + 1 < nwaiting)
@@ -571,7 +558,8 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
             nerrorstep = 0;
             while (w < nwaiting)
             {
-                if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY))
+                if (try_complete_step(testspec, waiting[w],
+                                      STEP_NONBLOCK | STEP_RETRY))
                 {
                     /* Still blocked on a lock, leave it alone. */
                     w++;
@@ -600,14 +588,15 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
         }

         /* Try to complete this step without blocking.  */
-        mustwait = try_complete_step(step, STEP_NONBLOCK);
+        mustwait = try_complete_step(testspec, step, STEP_NONBLOCK);

         /* Check for completion of any steps that were previously waiting. */
         w = 0;
         nerrorstep = 0;
         while (w < nwaiting)
         {
-            if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY))
+            if (try_complete_step(testspec, waiting[w],
+                                  STEP_NONBLOCK | STEP_RETRY))
                 w++;
             else
             {
@@ -630,7 +619,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
     /* Wait for any remaining queries. */
     for (w = 0; w < nwaiting; ++w)
     {
-        try_complete_step(waiting[w], STEP_RETRY);
+        try_complete_step(testspec, waiting[w], STEP_RETRY);
         report_error_message(waiting[w]);
     }

@@ -693,7 +682,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
  * a lock, returns true.  Otherwise, returns false.
  */
 static bool
-try_complete_step(Step *step, int flags)
+try_complete_step(TestSpec *testspec, Step *step, int flags)
 {
     PGconn       *conn = conns[1 + step->session];
     fd_set        read_set;
@@ -702,6 +691,7 @@ try_complete_step(Step *step, int flags)
     int            sock = PQsocket(conn);
     int            ret;
     PGresult   *res;
+    PGnotify   *notify;
     bool        canceled = false;

     if (sock < 0)
@@ -738,7 +728,7 @@ try_complete_step(Step *step, int flags)
                 bool        waiting;

                 res = PQexecPrepared(conns[0], PREP_WAITING, 1,
-                                     &backend_pids[step->session + 1],
+                                     &backend_pid_strs[step->session + 1],
                                      NULL, NULL, 0);
                 if (PQresultStatus(res) != PGRES_TUPLES_OK ||
                     PQntuples(res) != 1)
@@ -858,6 +848,35 @@ try_complete_step(Step *step, int flags)
         PQclear(res);
     }

+    /* Report any available NOTIFY messages, too */
+    PQconsumeInput(conn);
+    while ((notify = PQnotifies(conn)) != NULL)
+    {
+        /* Try to identify which session it came from */
+        const char *sendername = NULL;
+        char        pidstring[32];
+
+        for (int i = 0; i < testspec->nsessions; i++)
+        {
+            if (notify->be_pid == backend_pids[i + 1])
+            {
+                sendername = testspec->sessions[i]->name;
+                break;
+            }
+        }
+        if (sendername == NULL)
+        {
+            /* Doesn't seem to be any test session, so show the hard way */
+            snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid);
+            sendername = pidstring;
+        }
+        printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n",
+               testspec->sessions[step->session]->name,
+               notify->relname, notify->extra, sendername);
+        PQfreemem(notify);
+        PQconsumeInput(conn);
+    }
+
     return false;
 }

diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 8adad42..498e357 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -1,14 +1,68 @@
-# Verify that pg_notification_queue_usage correctly reports a non-zero result,
-# after submitting notifications while another connection is listening for
-# those notifications and waiting inside an active transaction.
+# Tests for LISTEN/NOTIFY

-session "listener"
-step "listen"    { LISTEN a; }
-step "begin"    { BEGIN; }
-teardown        { ROLLBACK; UNLISTEN *; }
+# Most of these tests use only the "notifier" session and hence exercise only
+# self-notifies, which are convenient because they minimize timing concerns.
+# Note we assume that each step is delivered to the backend as a single Query
+# message so it will run as one transaction.

 session "notifier"
-step "check"    { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
-step "notify"    { SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; }
+step "listenc"    { LISTEN c1; LISTEN c2; }
+step "notify1"    { NOTIFY c1; }
+step "notify2"    { NOTIFY c2, 'payload'; }
+step "notify3"    { NOTIFY c3, 'payload3'; }  # not listening to c3
+step "notifyd"    { NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload'; }
+step "notifys1"    {
+    BEGIN;
+    NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2';
+    NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2';
+    SAVEPOINT s1;
+    NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2';
+    NOTIFY c1, 'payload1s'; NOTIFY "c2", 'payload2s';
+    NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2';
+    NOTIFY c1, 'payload1s'; NOTIFY "c2", 'payload2s';
+    RELEASE SAVEPOINT s1;
+    SAVEPOINT s2;
+    NOTIFY c1, 'rpayload1'; NOTIFY "c2", 'rpayload2';
+    NOTIFY c1, 'rpayload1s'; NOTIFY "c2", 'rpayload2s';
+    NOTIFY c1, 'rpayload1'; NOTIFY "c2", 'rpayload2';
+    NOTIFY c1, 'rpayload1s'; NOTIFY "c2", 'rpayload2s';
+    ROLLBACK TO SAVEPOINT s2;
+    COMMIT;
+}
+step "usage"    { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
+step "bignotify"    { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
+teardown        { UNLISTEN *; }
+
+# The listener session is used for cross-backend notify checks.
+
+session "listener"
+step "llisten"    { LISTEN c1; LISTEN c2; }
+step "lcheck"    { SELECT 1 AS x; }
+step "lbegin"    { BEGIN; }
+teardown        { UNLISTEN *; }
+
+
+# Trivial cases.
+permutation "listenc" "notify1" "notify2" "notify3"
+
+# Check simple and less-simple deduplication.
+permutation "listenc" "notifyd" "notifys1"
+
+# Cross-backend notification delivery.  We use a "select 1" to force the
+# listener session to check for notifies.  In principle we could just wait
+# for delivery, but that would require extra support in isolationtester
+# and might have portability-of-timing issues.
+permutation "llisten" "notify1" "notify2" "notify3" "lcheck"
+
+# Again, with local delivery too.
+permutation "listenc" "llisten" "notify1" "notify2" "notify3" "lcheck"
+
+# Verify that pg_notification_queue_usage correctly reports a non-zero result,
+# after submitting notifications while another connection is listening for
+# those notifications and waiting inside an active transaction.  We have to
+# fill a page of the notify SLRU to make this happen, which is a good deal
+# of traffic.  To not bloat the expected output, we intentionally don't
+# commit the listener's transaction, so that it never reports these events.
+# Hence, this should be the last test in this script.

-permutation "listen" "begin" "check" "notify" "check"
+permutation "llisten" "lbegin" "usage" "bignotify" "usage"

pgsql-hackers by date:

Previous
From: Tomas Vondra
Date:
Subject: Re: idea: log_statement_sample_rate - bottom limit for sampling
Next
From: Andres Freund
Date:
Subject: Re: tap tests driving the database via psql