Re: Testing LISTEN/NOTIFY more effectively - Mailing list pgsql-hackers
From | Tom Lane |
---|---|
Subject | Re: Testing LISTEN/NOTIFY more effectively |
Date | |
Msg-id | 29162.1564266052@sss.pgh.pa.us Whole thread Raw |
In response to | Re: Testing LISTEN/NOTIFY more effectively (Tom Lane <tgl@sss.pgh.pa.us>) |
Responses |
Re: Testing LISTEN/NOTIFY more effectively
|
List | pgsql-hackers |
I wrote: > Andres Freund <andres@anarazel.de> writes: >> Perhaps we could just have isolationtester check to which >> isolationtester session the backend pid belongs? And then print the >> session name instead of the pid? That should be fairly easy, and would >> probably give us all we need? > Oh, that's a good idea -- it's already tracking all the backend PIDs, > so probably not much extra work to do it like that. I found out that to avoid confusion, one really wants the message to identify both the sending and receiving sessions. Here's a patch that does it that way and extends the async-notify.spec test to perform basic end-to-end checks on LISTEN/NOTIFY. I intentionally made the test show the lack of NOTIFY de-deduplication that currently happens with subtransactions. If we change this as I proposed in <17822.1564186806@sss.pgh.pa.us>, this test output will change. regards, tom lane diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out index 92d281a..3dc1227 100644 --- a/src/test/isolation/expected/async-notify.out +++ b/src/test/isolation/expected/async-notify.out @@ -1,17 +1,82 @@ Parsed test spec with 2 sessions -starting permutation: listen begin check notify check -step listen: LISTEN a; -step begin: BEGIN; -step check: SELECT pg_notification_queue_usage() > 0 AS nonzero; +starting permutation: listenc notify1 notify2 notify3 +step listenc: LISTEN c1; LISTEN c2; +step notify1: NOTIFY c1; +notifier: NOTIFY "c1" with payload "" from notifier +step notify2: NOTIFY c2, 'payload'; +notifier: NOTIFY "c2" with payload "payload" from notifier +step notify3: NOTIFY c3, 'payload3'; + +starting permutation: listenc notifyd notifys1 +step listenc: LISTEN c1; LISTEN c2; +step notifyd: NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload'; +notifier: NOTIFY "c2" with payload "payload" from notifier +notifier: NOTIFY "c1" with payload "" from notifier +step notifys1: + BEGIN; + NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2'; + NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2'; + SAVEPOINT s1; + NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2'; + NOTIFY c1, 'payload1s'; NOTIFY "c2", 'payload2s'; + NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2'; + NOTIFY c1, 'payload1s'; NOTIFY "c2", 'payload2s'; + RELEASE SAVEPOINT s1; + SAVEPOINT s2; + NOTIFY c1, 'rpayload1'; NOTIFY "c2", 'rpayload2'; + NOTIFY c1, 'rpayload1s'; NOTIFY "c2", 'rpayload2s'; + NOTIFY c1, 'rpayload1'; NOTIFY "c2", 'rpayload2'; + NOTIFY c1, 'rpayload1s'; NOTIFY "c2", 'rpayload2s'; + ROLLBACK TO SAVEPOINT s2; + COMMIT; + +notifier: NOTIFY "c1" with payload "payload1" from notifier +notifier: NOTIFY "c2" with payload "payload2" from notifier +notifier: NOTIFY "c1" with payload "payload1" from notifier +notifier: NOTIFY "c2" with payload "payload2" from notifier +notifier: NOTIFY "c1" with payload "payload1s" from notifier +notifier: NOTIFY "c2" with payload "payload2s" from notifier + +starting permutation: llisten notify1 notify2 notify3 lcheck +step llisten: LISTEN c1; LISTEN c2; +step notify1: NOTIFY c1; +step notify2: NOTIFY c2, 'payload'; +step notify3: NOTIFY c3, 'payload3'; +step lcheck: SELECT 1 AS x; +x + +1 +listener: NOTIFY "c1" with payload "" from notifier +listener: NOTIFY "c2" with payload "payload" from notifier + +starting permutation: listenc llisten notify1 notify2 notify3 lcheck +step listenc: LISTEN c1; LISTEN c2; +step llisten: LISTEN c1; LISTEN c2; +step notify1: NOTIFY c1; +notifier: NOTIFY "c1" with payload "" from notifier +step notify2: NOTIFY c2, 'payload'; +notifier: NOTIFY "c2" with payload "payload" from notifier +step notify3: NOTIFY c3, 'payload3'; +step lcheck: SELECT 1 AS x; +x + +1 +listener: NOTIFY "c1" with payload "" from notifier +listener: NOTIFY "c2" with payload "payload" from notifier + +starting permutation: llisten lbegin usage bignotify usage +step llisten: LISTEN c1; LISTEN c2; +step lbegin: BEGIN; +step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero; nonzero f -step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; +step bignotify: SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; count 1000 -step check: SELECT pg_notification_queue_usage() > 0 AS nonzero; +step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero; nonzero t diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c index 6ab19b1..98e5bf2 100644 --- a/src/test/isolation/isolationtester.c +++ b/src/test/isolation/isolationtester.c @@ -23,10 +23,12 @@ /* * conns[0] is the global setup, teardown, and watchdog connection. Additional - * connections represent spec-defined sessions. + * connections represent spec-defined sessions. We also track the backend + * PID, in numeric and string formats, for each connection. */ static PGconn **conns = NULL; -static const char **backend_pids = NULL; +static int *backend_pids = NULL; +static const char **backend_pid_strs = NULL; static int nconns = 0; /* In dry run only output permutations to be run by the tester. */ @@ -41,7 +43,7 @@ static void run_permutation(TestSpec *testspec, int nsteps, Step **steps); #define STEP_NONBLOCK 0x1 /* return 0 as soon as cmd waits for a lock */ #define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */ -static bool try_complete_step(Step *step, int flags); +static bool try_complete_step(TestSpec *testspec, Step *step, int flags); static int step_qsort_cmp(const void *a, const void *b); static int step_bsearch_cmp(const void *a, const void *b); @@ -159,9 +161,11 @@ main(int argc, char **argv) * extra for lock wait detection and global work. */ nconns = 1 + testspec->nsessions; - conns = calloc(nconns, sizeof(PGconn *)); + conns = (PGconn **) pg_malloc0(nconns * sizeof(PGconn *)); + backend_pids = pg_malloc0(nconns * sizeof(*backend_pids)); + backend_pid_strs = pg_malloc0(nconns * sizeof(*backend_pid_strs)); atexit(disconnect_atexit); - backend_pids = calloc(nconns, sizeof(*backend_pids)); + for (i = 0; i < nconns; i++) { conns[i] = PQconnectdb(conninfo); @@ -187,26 +191,9 @@ main(int argc, char **argv) blackholeNoticeProcessor, NULL); - /* Get the backend pid for lock wait checking. */ - res = PQexec(conns[i], "SELECT pg_catalog.pg_backend_pid()"); - if (PQresultStatus(res) == PGRES_TUPLES_OK) - { - if (PQntuples(res) == 1 && PQnfields(res) == 1) - backend_pids[i] = pg_strdup(PQgetvalue(res, 0, 0)); - else - { - fprintf(stderr, "backend pid query returned %d rows and %d columns, expected 1 row and 1 column", - PQntuples(res), PQnfields(res)); - exit(1); - } - } - else - { - fprintf(stderr, "backend pid query failed: %s", - PQerrorMessage(conns[i])); - exit(1); - } - PQclear(res); + /* Save each connection's backend PID for subsequent use. */ + backend_pids[i] = PQbackendPID(conns[i]); + backend_pid_strs[i] = psprintf("%d", backend_pids[i]); } /* Set the session index fields in steps. */ @@ -231,9 +218,9 @@ main(int argc, char **argv) appendPQExpBufferStr(&wait_query, "SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{"); /* The spec syntax requires at least one session; assume that here. */ - appendPQExpBufferStr(&wait_query, backend_pids[1]); + appendPQExpBufferStr(&wait_query, backend_pid_strs[1]); for (i = 2; i < nconns; i++) - appendPQExpBuffer(&wait_query, ",%s", backend_pids[i]); + appendPQExpBuffer(&wait_query, ",%s", backend_pid_strs[i]); appendPQExpBufferStr(&wait_query, "}')"); res = PQprepare(conns[0], PREP_WAITING, wait_query.data, 0, NULL); @@ -549,7 +536,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) oldstep = waiting[w]; /* Wait for previous step on this connection. */ - try_complete_step(oldstep, STEP_RETRY); + try_complete_step(testspec, oldstep, STEP_RETRY); /* Remove that step from the waiting[] array. */ if (w + 1 < nwaiting) @@ -571,7 +558,8 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) nerrorstep = 0; while (w < nwaiting) { - if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY)) + if (try_complete_step(testspec, waiting[w], + STEP_NONBLOCK | STEP_RETRY)) { /* Still blocked on a lock, leave it alone. */ w++; @@ -600,14 +588,15 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) } /* Try to complete this step without blocking. */ - mustwait = try_complete_step(step, STEP_NONBLOCK); + mustwait = try_complete_step(testspec, step, STEP_NONBLOCK); /* Check for completion of any steps that were previously waiting. */ w = 0; nerrorstep = 0; while (w < nwaiting) { - if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY)) + if (try_complete_step(testspec, waiting[w], + STEP_NONBLOCK | STEP_RETRY)) w++; else { @@ -630,7 +619,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) /* Wait for any remaining queries. */ for (w = 0; w < nwaiting; ++w) { - try_complete_step(waiting[w], STEP_RETRY); + try_complete_step(testspec, waiting[w], STEP_RETRY); report_error_message(waiting[w]); } @@ -693,7 +682,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) * a lock, returns true. Otherwise, returns false. */ static bool -try_complete_step(Step *step, int flags) +try_complete_step(TestSpec *testspec, Step *step, int flags) { PGconn *conn = conns[1 + step->session]; fd_set read_set; @@ -702,6 +691,7 @@ try_complete_step(Step *step, int flags) int sock = PQsocket(conn); int ret; PGresult *res; + PGnotify *notify; bool canceled = false; if (sock < 0) @@ -738,7 +728,7 @@ try_complete_step(Step *step, int flags) bool waiting; res = PQexecPrepared(conns[0], PREP_WAITING, 1, - &backend_pids[step->session + 1], + &backend_pid_strs[step->session + 1], NULL, NULL, 0); if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1) @@ -858,6 +848,35 @@ try_complete_step(Step *step, int flags) PQclear(res); } + /* Report any available NOTIFY messages, too */ + PQconsumeInput(conn); + while ((notify = PQnotifies(conn)) != NULL) + { + /* Try to identify which session it came from */ + const char *sendername = NULL; + char pidstring[32]; + + for (int i = 0; i < testspec->nsessions; i++) + { + if (notify->be_pid == backend_pids[i + 1]) + { + sendername = testspec->sessions[i]->name; + break; + } + } + if (sendername == NULL) + { + /* Doesn't seem to be any test session, so show the hard way */ + snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid); + sendername = pidstring; + } + printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n", + testspec->sessions[step->session]->name, + notify->relname, notify->extra, sendername); + PQfreemem(notify); + PQconsumeInput(conn); + } + return false; } diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec index 8adad42..498e357 100644 --- a/src/test/isolation/specs/async-notify.spec +++ b/src/test/isolation/specs/async-notify.spec @@ -1,14 +1,68 @@ -# Verify that pg_notification_queue_usage correctly reports a non-zero result, -# after submitting notifications while another connection is listening for -# those notifications and waiting inside an active transaction. +# Tests for LISTEN/NOTIFY -session "listener" -step "listen" { LISTEN a; } -step "begin" { BEGIN; } -teardown { ROLLBACK; UNLISTEN *; } +# Most of these tests use only the "notifier" session and hence exercise only +# self-notifies, which are convenient because they minimize timing concerns. +# Note we assume that each step is delivered to the backend as a single Query +# message so it will run as one transaction. session "notifier" -step "check" { SELECT pg_notification_queue_usage() > 0 AS nonzero; } -step "notify" { SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; } +step "listenc" { LISTEN c1; LISTEN c2; } +step "notify1" { NOTIFY c1; } +step "notify2" { NOTIFY c2, 'payload'; } +step "notify3" { NOTIFY c3, 'payload3'; } # not listening to c3 +step "notifyd" { NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload'; } +step "notifys1" { + BEGIN; + NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2'; + NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2'; + SAVEPOINT s1; + NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2'; + NOTIFY c1, 'payload1s'; NOTIFY "c2", 'payload2s'; + NOTIFY c1, 'payload1'; NOTIFY "c2", 'payload2'; + NOTIFY c1, 'payload1s'; NOTIFY "c2", 'payload2s'; + RELEASE SAVEPOINT s1; + SAVEPOINT s2; + NOTIFY c1, 'rpayload1'; NOTIFY "c2", 'rpayload2'; + NOTIFY c1, 'rpayload1s'; NOTIFY "c2", 'rpayload2s'; + NOTIFY c1, 'rpayload1'; NOTIFY "c2", 'rpayload2'; + NOTIFY c1, 'rpayload1s'; NOTIFY "c2", 'rpayload2s'; + ROLLBACK TO SAVEPOINT s2; + COMMIT; +} +step "usage" { SELECT pg_notification_queue_usage() > 0 AS nonzero; } +step "bignotify" { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; } +teardown { UNLISTEN *; } + +# The listener session is used for cross-backend notify checks. + +session "listener" +step "llisten" { LISTEN c1; LISTEN c2; } +step "lcheck" { SELECT 1 AS x; } +step "lbegin" { BEGIN; } +teardown { UNLISTEN *; } + + +# Trivial cases. +permutation "listenc" "notify1" "notify2" "notify3" + +# Check simple and less-simple deduplication. +permutation "listenc" "notifyd" "notifys1" + +# Cross-backend notification delivery. We use a "select 1" to force the +# listener session to check for notifies. In principle we could just wait +# for delivery, but that would require extra support in isolationtester +# and might have portability-of-timing issues. +permutation "llisten" "notify1" "notify2" "notify3" "lcheck" + +# Again, with local delivery too. +permutation "listenc" "llisten" "notify1" "notify2" "notify3" "lcheck" + +# Verify that pg_notification_queue_usage correctly reports a non-zero result, +# after submitting notifications while another connection is listening for +# those notifications and waiting inside an active transaction. We have to +# fill a page of the notify SLRU to make this happen, which is a good deal +# of traffic. To not bloat the expected output, we intentionally don't +# commit the listener's transaction, so that it never reports these events. +# Hence, this should be the last test in this script. -permutation "listen" "begin" "check" "notify" "check" +permutation "llisten" "lbegin" "usage" "bignotify" "usage"
pgsql-hackers by date: