Thread: async notification patch for dblink

async notification patch for dblink

From
"Marcus Kempe"
Date:
This patch adds the ability to retrieve async notifications using dblink, via the addition of the function dblink_get_notify.

It is written against CVS HEAD and includes documentation and regression tests. It compiles, passes its tests, and works well.

I would be interested in some feedback on the regression test.
My initial thought was to test the function as thoroughly as possible, so the test issues LISTEN and NOTIFY commands itself in order to exercise all aspects of the code. Even though this works well for me, I get the feeling that this is not the correct way to do it. I can find no other testing of the LISTEN/NOTIFY functionality in the regression tests, and I imagine this is for good reason.
If someone would care to explain this, and maybe give a hint about how much testing is appropriate for this fairly trivial patch, it would be appreciated.

Best regards,

Marcus Kempe
Attachment

Re: async notification patch for dblink

From
Bruce Momjian
Date:
What is the status on this?

--
 Bruce Momjian  <bruce@momjian.us>        http://momjian.us
 EnterpriseDB                             http://enterprisedb.com

 + If your life is a hard drive, Christ can be your backup. +


Re: async notification patch for dblink

From
"Marcus Kempe"
Date:
Hi,

The status of the patch is that it is working as expected.

So is the regression test; my only concern there is whether it tests the functionality thoroughly enough. In its current state, though, I suppose it is in line with the rest of the regression tests for dblink functionality.

Also, please advise if I'm expected to take any additional actions as part of submitting this patch.

Best regards,

Marcus Kempe

On Wed, Jan 14, 2009 at 23:12, Bruce Momjian <bruce@momjian.us> wrote:

What is the status on this?

Re: async notification patch for dblink

From
Joe Conway
Date:
In reference to:
http://archives.postgresql.org/message-id/6534f7ae0811181547v1dc1f096g6222e8273b461606@mail.gmail.com

Had to fix a lot of bit rot, but otherwise looks good. My updated patch
attached. Will commit in a day or so if no objections.

BTW, some commitfest procedural comments/questions:

1) I couldn't figure out how to attach my patch to the commitfest page
short of cut-n-paste into the small comment text box -- is that my only
choice?

2) It would be nice if the mail archive web page of the original message
had a "Reply To" button. Otherwise I guess I can do what I've done above.

3) I couldn't see any way to assign myself as the committer.

Joe

Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.82
diff -c -r1.82 dblink.c
*** contrib/dblink/dblink.c    11 Jun 2009 14:48:50 -0000    1.82
--- contrib/dblink/dblink.c    4 Aug 2009 01:05:49 -0000
***************
*** 1635,1640 ****
--- 1635,1681 ----
      PG_RETURN_DATUM(current_query(fcinfo));
  }

+ /*
+  * Retrieve async notifications for a connection.
+  *
+  * Returns an array of notifications, or NULL if none received.
+  * Can optionally take a named connection as parameter, but uses the unnamed connection per default.
+  *
+  */
+ PG_FUNCTION_INFO_V1(dblink_get_notify);
+ Datum
+ dblink_get_notify(PG_FUNCTION_ARGS)
+ {
+     PGconn        *conn = NULL;
+     remoteConn     *rconn = NULL;
+     ArrayBuildState    *astate = NULL;
+     PGnotify    *notify;
+
+     DBLINK_INIT;
+     if (PG_NARGS() == 1)
+         DBLINK_GET_NAMED_CONN;
+     else
+         conn = pconn->conn;
+
+     PQconsumeInput(conn);
+
+     while ((notify = PQnotifies(conn)) != NULL)
+     {
+         /* stash away current value */
+         astate = accumArrayResult(astate,
+                             PointerGetDatum(cstring_to_text(notify->relname)),
+                             false, TEXTOID, CurrentMemoryContext);
+         PQfreemem(notify);
+         PQconsumeInput(conn);
+     }
+
+     if (astate)
+         PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
+                                 CurrentMemoryContext));
+     else
+         PG_RETURN_NULL();
+ }
+
  /*************************************************************
   * internal functions
   */
Index: contrib/dblink/dblink.h
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.h,v
retrieving revision 1.22
diff -c -r1.22 dblink.h
*** contrib/dblink/dblink.h    9 Jun 2009 17:41:02 -0000    1.22
--- contrib/dblink/dblink.h    4 Aug 2009 00:56:56 -0000
***************
*** 57,61 ****
--- 57,62 ----
  extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
  extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
  extern Datum dblink_current_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_get_notify(PG_FUNCTION_ARGS);

  #endif   /* DBLINK_H */
Index: contrib/dblink/dblink.sql.in
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.sql.in,v
retrieving revision 1.18
diff -c -r1.18 dblink.sql.in
*** contrib/dblink/dblink.sql.in    9 Jun 2009 17:41:02 -0000    1.18
--- contrib/dblink/dblink.sql.in    4 Aug 2009 00:58:52 -0000
***************
*** 202,204 ****
--- 202,214 ----
  RETURNS text
  AS 'MODULE_PATHNAME', 'dblink_error_message'
  LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_notify()
+ RETURNS text[]
+ AS 'MODULE_PATHNAME', 'dblink_get_notify'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_notify(conname text)
+ RETURNS text[]
+ AS 'MODULE_PATHNAME', 'dblink_get_notify'
+ LANGUAGE C STRICT;
Index: contrib/dblink/uninstall_dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/uninstall_dblink.sql,v
retrieving revision 1.7
diff -c -r1.7 uninstall_dblink.sql
*** contrib/dblink/uninstall_dblink.sql    5 Apr 2008 02:26:14 -0000    1.7
--- contrib/dblink/uninstall_dblink.sql    4 Aug 2009 00:56:56 -0000
***************
*** 76,78 ****
--- 76,82 ----
  DROP FUNCTION dblink_is_busy(text);

  DROP FUNCTION dblink_send_query(text, text);
+
+ DROP FUNCTION dblink_get_notify();
+
+ DROP FUNCTION dblink_get_notify(text);
Index: contrib/dblink/expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.25
diff -c -r1.25 dblink.out
*** contrib/dblink/expected/dblink.out    6 Jun 2009 21:27:56 -0000    1.25
--- contrib/dblink/expected/dblink.out    4 Aug 2009 01:20:09 -0000
***************
*** 827,829 ****
--- 827,864 ----
  DROP USER MAPPING FOR public SERVER fdtest;
  DROP SERVER fdtest;
  DROP FOREIGN DATA WRAPPER postgresql;
+ -- test asynchronous notifications
+ SELECT dblink_connect('dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ --should return listen
+ SELECT dblink_exec('LISTEN regression');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ NOTIFY regression;
+ --should return {regression}
+ SELECT dblink_get_notify();
+  dblink_get_notify
+ -------------------
+  {regression}
+ (1 row)
+
+ --should return null
+ SELECT dblink_get_notify();
+  dblink_get_notify
+ -------------------
+
+ (1 row)
+
+ SELECT dblink_disconnect();
+  dblink_disconnect
+ -------------------
+  OK
+ (1 row)
+
Index: contrib/dblink/sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.21
diff -c -r1.21 dblink.sql
*** contrib/dblink/sql/dblink.sql    6 Jun 2009 21:27:56 -0000    1.21
--- contrib/dblink/sql/dblink.sql    4 Aug 2009 01:19:48 -0000
***************
*** 389,391 ****
--- 389,407 ----
  DROP USER MAPPING FOR public SERVER fdtest;
  DROP SERVER fdtest;
  DROP FOREIGN DATA WRAPPER postgresql;
+
+ -- test asynchronous notifications
+ SELECT dblink_connect('dbname=contrib_regression');
+
+ --should return listen
+ SELECT dblink_exec('LISTEN regression');
+
+ NOTIFY regression;
+
+ --should return {regression}
+ SELECT dblink_get_notify();
+
+ --should return null
+ SELECT dblink_get_notify();
+
+ SELECT dblink_disconnect();
Index: doc/src/sgml/dblink.sgml
===================================================================
RCS file: /opt/src/cvs/pgsql/doc/src/sgml/dblink.sgml,v
retrieving revision 1.8
diff -c -r1.8 dblink.sgml
*** doc/src/sgml/dblink.sgml    18 Jun 2009 14:34:36 -0000    1.8
--- doc/src/sgml/dblink.sgml    4 Aug 2009 00:52:09 -0000
***************
*** 1260,1265 ****
--- 1260,1339 ----
    </refsect1>
   </refentry>

+  <refentry id="CONTRIB-DBLINK-GET-NOTIFY">
+   <refnamediv>
+    <refname>dblink_get_notify</refname>
+    <refpurpose>retrieve async notifications on a connection</refpurpose>
+   </refnamediv>
+
+   <refsynopsisdiv>
+    <synopsis>
+     dblink_get_notify() returns text[]
+     dblink_get_notify(text conname) returns text[]
+    </synopsis>
+   </refsynopsisdiv>
+
+   <refsect1>
+    <title>Description</title>
+
+    <para>
+     <function>dblink_get_notify</> retrieves notifications on either
+     the unnamed connection, or on a named connection if specified.
+     To receive notifications via dblink, <function>LISTEN</> must
+     first be issued, using <function>dblink_exec</>.
+     For details see <xref linkend="sql-listen"> and <xref linkend="sql-notify">.
+    </para>
+
+   </refsect1>
+
+   <refsect1>
+    <title>Arguments</title>
+
+    <variablelist>
+     <varlistentry>
+      <term><parameter>conname</parameter></term>
+      <listitem>
+       <para>
+        The name of a named connection to get notifications on.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </refsect1>
+
+   <refsect1>
+    <title>Return Value</title>
+     <para>Returns a text array of notification names, or NULL if none.</para>
+   </refsect1>
+
+   <refsect1>
+    <title>Example</title>
+
+    <programlisting>
+ test=# SELECT dblink_exec('LISTEN virtual');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ test=# SELECT dblink_get_notify();
+  dblink_get_notify
+ -------------------
+
+ (1 row)
+
+ test=# NOTIFY virtual;
+ NOTIFY
+
+ test=# SELECT dblink_get_notify();
+  dblink_get_notify
+ -------------------
+  {virtual}
+ (1 row)
+    </programlisting>
+   </refsect1>
+  </refentry>
+
   <refentry id="CONTRIB-DBLINK-GET-RESULT">
    <refmeta>
     <refentrytitle>dblink_get_result</refentrytitle>

Re: async notification patch for dblink

From
Tom Lane
Date:
Joe Conway <mail@joeconway.com> writes:
> BTW, some commitfest procedural comments/questions:

> 1) I couldn't figure out how to attach my patch to the commitfest page
> short of cut-n-paste into the small comment text box -- is that my only
> choice?

No, what you should do is first send the patch to -hackers, then put the
message-ID of that email into the place provided.  The comment box isn't
really meant for much more than a one-line summary of the message being
linked to.

> 3) I couldn't see any way to assign myself as the committer.

Yeah, the webapp doesn't explicitly record the committer for a patch.
What I've been doing is adding a comment saying that I'm taking a patch
to commit.  A separate field would probably be better though.
        regards, tom lane


Re: async notification patch for dblink

From
Alvaro Herrera
Date:
Joe Conway wrote:

> 2) It would be nice if the mail archive web page of the original message
> had a "Reply To" button. Otherwise I guess I can do what I've done above.

I totally agree, but this is not workable given our current software.
I've been giving some time to reworking the email archives using
something completely different to allow this kind of trick, but this is
to be considered longish-term.

-- 
Alvaro Herrera                                http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.


Re: async notification patch for dblink

From
Joe Conway
Date:
Alvaro Herrera wrote:
> Joe Conway wrote:
>
>> 2) It would be nice if the mail archive web page of the original message
>> had a "Reply To" button. Otherwise I guess I can do what I've done above.
>
> I totally agree, but this is not workable given our current software.
> I've been giving some time to reworking the email archives using
> something completely different to allow this kind of trick, but this is
> to be considered longish-term.

OK, good to know it isn't just me ;-)

Thanks,

Joe


Re: async notification patch for dblink

From
Tom Lane
Date:
Joe Conway <mail@joeconway.com> writes:
> In reference to:
> http://archives.postgresql.org/message-id/6534f7ae0811181547v1dc1f096g6222e8273b461606@mail.gmail.com

> Had to fix a lot of bit rot, but otherwise looks good. My updated patch
> attached. Will commit in a day or so if no objections.

After a quick look-over, here are some comments on the patch itself:

1. Sooner or later, hopefully sooner, we will have "payload" strings in
notifications, not just the "relname" string.  libpq and the protocol
already have support for that (see the "extra" field in PGnotify).
It would be a shame to have this function's API become obsolete
immediately.  Can we fix the API to return the "extra" string too?

2. By the same line of reasoning, the be_pid field might be useful.
Back when I was doing a lot of LISTEN/NOTIFY coding, you really needed
that field in order to distinguish your own notifies coming back from
other sessions' notifies.  I don't think we've done anything to
eliminate the need for that.

3. I see the function returns NULL when there's nothing to report,
but maybe an empty array would be more sensible.
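
Point 2 can be illustrated without a server. The following is only a minimal sketch, not dblink code: `NotifyEvent` is a hypothetical stand-in for libpq's `PGnotify` that mirrors just the relname, be_pid and extra fields named above, and `is_own_notify` and `count_foreign_notifies` are names made up for this illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical stand-in for libpq's PGnotify. Only the three fields
 * discussed in this thread are mirrored; this is not the real struct.
 */
typedef struct NotifyEvent
{
    const char *relname;    /* notification name */
    int         be_pid;     /* PID of the backend that sent the NOTIFY */
    const char *extra;      /* payload string (empty when there is none) */
} NotifyEvent;

/* Return 1 if the event was sent by the session whose backend PID is own_pid. */
int
is_own_notify(const NotifyEvent *ev, int own_pid)
{
    return ev->be_pid == own_pid;
}

/*
 * Walk the events one at a time and count those named `name` that came
 * from some other session.
 */
size_t
count_foreign_notifies(const NotifyEvent *events, size_t n,
                       const char *name, int own_pid)
{
    size_t      count = 0;
    size_t      i;

    assert(events != NULL || n == 0);

    for (i = 0; i < n; i++)
    {
        if (strcmp(events[i].relname, name) != 0)
            continue;           /* some other notification name */
        if (!is_own_notify(&events[i], own_pid))
            count++;
    }
    return count;
}
```

Without be_pid in the result, a caller would have no way to make the comparison in `is_own_notify`, which is the gap the comment points at.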

[ thinks for awhile... ]  Actually, it seems to me that the present
patch's definition of the function would be very hard to work with.
You would normally want to work with the events one at a time.
There isn't much you could do with the array result except unnest() it,
and I'm a bit worried that careless usage could result in multiple
evaluation of the function and hence loss of events.  I wonder whether
it would be better to have the function return setof record.  Which, not
incidentally, would greatly simplify adding in those extra result
fields.
        regards, tom lane
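
The lost-events concern follows from the drain-on-read behaviour that the patch's own regression test shows, where a second call to dblink_get_notify() returns nothing. Below is a minimal sketch of that behaviour, using a hypothetical in-memory queue rather than a real connection. `PendingQueue`, `queue_push` and `queue_drain` are illustrative names, not dblink or libpq API.

```c
#include <assert.h>
#include <stddef.h>

#define QUEUE_CAP 8

/* Hypothetical stand-in for the notifications pending on one connection. */
typedef struct PendingQueue
{
    const char *names[QUEUE_CAP];
    size_t      count;
} PendingQueue;

/* Append one notification name; returns 0 if the queue is full, else 1. */
int
queue_push(PendingQueue *q, const char *name)
{
    if (q->count >= QUEUE_CAP)
        return 0;
    q->names[q->count++] = name;
    return 1;
}

/*
 * Copy every pending name into out[] and empty the queue, the way each
 * evaluation of the function consumes what is pending. Returns the number
 * of names copied. The caller must supply room for all pending names.
 */
size_t
queue_drain(PendingQueue *q, const char **out, size_t out_cap)
{
    size_t      n;

    assert(out_cap >= q->count);

    for (n = 0; n < q->count; n++)
        out[n] = q->names[n];
    q->count = 0;               /* events are gone once they are read */
    return n;
}
```

If a query ended up evaluating the function twice, only the first evaluation would see the pending events; the second would see an empty queue, so anything computed from it would silently miss them.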


Re: async notification patch for dblink

From
Joe Conway
Date:
Tom Lane wrote:
> [ thinks for awhile... ]  Actually, it seems to me that the present
> patch's definition of the function would be very hard to work with.
> You would normally want to work with the events one at a time.
> There isn't much you could do with the array result except unnest() it,
> and I'm a bit worried that careless usage could result in multiple
> evaluation of the function and hence loss of events.  I wonder whether
> it would be better to have the function return setof record.  Which, not
> incidentally, would greatly simplify adding in those extra result
> fields.

Sure that makes sense. I'll take a stab at it.

Joe


Re: async notification patch for dblink

From
Andrew Dunstan
Date:

Tom Lane wrote:
>> 3) I couldn't see any way to assign myself as the committer.
>
> Yeah, the webapp doesn't explicitly record the committer for a patch.
> What I've been doing is adding a comment saying that I'm taking a patch
> to commit.  A separate field would probably be better though.

+1. I raised this before, iirc.

cheers

andrew


Re: async notification patch for dblink

From
Robert Haas
Date:
On Mon, Aug 3, 2009 at 10:44 PM, Andrew Dunstan<andrew@dunslane.net> wrote:
> Tom Lane wrote:
>>>
>>> 3) I couldn't see any way to assign myself as the committer.
>>>
>>
>> Yeah, the webapp doesn't explicitly record the committer for a patch.
>> What I've been doing is adding a comment saying that I'm taking a patch
>> to commit.  A separate field would probably be better though.
>>
>
> +1. I raised this before, iirc.

Yes, I just haven't had enough round tuits yet.

...Robert


Re: async notification patch for dblink

From
Joe Conway
Date:
Tom Lane wrote:
> [ thinks for awhile... ]  Actually, it seems to me that the present
> patch's definition of the function would be very hard to work with.
> You would normally want to work with the events one at a time.
> There isn't much you could do with the array result except unnest() it,
> and I'm a bit worried that careless usage could result in multiple
> evaluation of the function and hence loss of events.  I wonder whether
> it would be better to have the function return setof record.  Which, not
> incidentally, would greatly simplify adding in those extra result
> fields.

OK, how's this look?

Joe

Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.82
diff -c -r1.82 dblink.c
*** contrib/dblink/dblink.c    11 Jun 2009 14:48:50 -0000    1.82
--- contrib/dblink/dblink.c    4 Aug 2009 03:48:33 -0000
***************
*** 1635,1640 ****
--- 1635,1723 ----
      PG_RETURN_DATUM(current_query(fcinfo));
  }

+ /*
+  * Retrieve async notifications for a connection.
+  *
+  * Returns a setof record of notifications, or an empty set if none received.
+  * Can optionally take a named connection as parameter, but uses the unnamed connection per default.
+  *
+  */
+ #define DBLINK_NOTIFY_COLS        3
+
+ PG_FUNCTION_INFO_V1(dblink_get_notify);
+ Datum
+ dblink_get_notify(PG_FUNCTION_ARGS)
+ {
+     PGconn               *conn = NULL;
+     remoteConn           *rconn = NULL;
+     PGnotify           *notify;
+     ReturnSetInfo       *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+     TupleDesc            tupdesc;
+     Tuplestorestate       *tupstore;
+     MemoryContext        per_query_ctx;
+     MemoryContext        oldcontext;
+
+     DBLINK_INIT;
+     if (PG_NARGS() == 1)
+         DBLINK_GET_NAMED_CONN;
+     else
+         conn = pconn->conn;
+
+     /* create the tuplestore */
+     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+     oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+     tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false);
+     TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
+                        TEXTOID, -1, 0);
+     TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
+                        INT4OID, -1, 0);
+     TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
+                        TEXTOID, -1, 0);
+
+     tupstore = tuplestore_begin_heap(true, false, work_mem);
+     rsinfo->returnMode = SFRM_Materialize;
+     rsinfo->setResult = tupstore;
+     rsinfo->setDesc = tupdesc;
+
+     MemoryContextSwitchTo(oldcontext);
+
+     PQconsumeInput(conn);
+     while ((notify = PQnotifies(conn)) != NULL)
+     {
+         Datum        values[DBLINK_NOTIFY_COLS];
+         bool        nulls[DBLINK_NOTIFY_COLS];
+
+         memset(values, 0, sizeof(values));
+         memset(nulls, 0, sizeof(nulls));
+
+         if (notify->relname != NULL)
+             values[0] = CStringGetTextDatum(notify->relname);
+         else
+             nulls[0] = true;
+
+         values[1] = Int32GetDatum(notify->be_pid);
+
+         if (notify->extra != NULL)
+             values[2] = CStringGetTextDatum(notify->extra);
+         else
+             nulls[2] = true;
+
+         /* switch to appropriate context while storing the tuple */
+         MemoryContextSwitchTo(per_query_ctx);
+         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+         MemoryContextSwitchTo(oldcontext);
+
+         PQfreemem(notify);
+         PQconsumeInput(conn);
+     }
+
+     /* clean up and return the tuplestore */
+     tuplestore_donestoring(tupstore);
+
+     return (Datum) 0;
+ }
+
  /*************************************************************
   * internal functions
   */
Index: contrib/dblink/dblink.h
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.h,v
retrieving revision 1.22
diff -c -r1.22 dblink.h
*** contrib/dblink/dblink.h    9 Jun 2009 17:41:02 -0000    1.22
--- contrib/dblink/dblink.h    4 Aug 2009 02:35:59 -0000
***************
*** 57,61 ****
--- 57,62 ----
  extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
  extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
  extern Datum dblink_current_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_get_notify(PG_FUNCTION_ARGS);

  #endif   /* DBLINK_H */
Index: contrib/dblink/dblink.sql.in
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.sql.in,v
retrieving revision 1.18
diff -c -r1.18 dblink.sql.in
*** contrib/dblink/dblink.sql.in    9 Jun 2009 17:41:02 -0000    1.18
--- contrib/dblink/dblink.sql.in    4 Aug 2009 03:51:03 -0000
***************
*** 202,204 ****
--- 202,221 ----
  RETURNS text
  AS 'MODULE_PATHNAME', 'dblink_error_message'
  LANGUAGE C STRICT;
+
+ CREATE TYPE dblink_get_notify_type AS
+ (
+     notify_name TEXT,
+     be_pid INT4,
+     extra TEXT
+ );
+
+ CREATE OR REPLACE FUNCTION dblink_get_notify()
+ RETURNS setof dblink_get_notify_type
+ AS 'MODULE_PATHNAME', 'dblink_get_notify'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_notify(conname text)
+ RETURNS setof dblink_get_notify_type
+ AS 'MODULE_PATHNAME', 'dblink_get_notify'
+ LANGUAGE C STRICT;
Index: contrib/dblink/uninstall_dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/uninstall_dblink.sql,v
retrieving revision 1.7
diff -c -r1.7 uninstall_dblink.sql
*** contrib/dblink/uninstall_dblink.sql    5 Apr 2008 02:26:14 -0000    1.7
--- contrib/dblink/uninstall_dblink.sql    4 Aug 2009 02:35:59 -0000
***************
*** 76,78 ****
--- 76,82 ----
  DROP FUNCTION dblink_is_busy(text);

  DROP FUNCTION dblink_send_query(text, text);
+
+ DROP FUNCTION dblink_get_notify();
+
+ DROP FUNCTION dblink_get_notify(text);
Index: contrib/dblink/expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.25
diff -c -r1.25 dblink.out
*** contrib/dblink/expected/dblink.out    6 Jun 2009 21:27:56 -0000    1.25
--- contrib/dblink/expected/dblink.out    4 Aug 2009 04:07:05 -0000
***************
*** 827,829 ****
--- 827,870 ----
  DROP USER MAPPING FOR public SERVER fdtest;
  DROP SERVER fdtest;
  DROP FOREIGN DATA WRAPPER postgresql;
+ -- test asynchronous notifications
+ SELECT dblink_connect('dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ --should return listen
+ SELECT dblink_exec('LISTEN regression');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ --should return listen
+ SELECT dblink_exec('LISTEN foobar');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ NOTIFY regression;
+ NOTIFY foobar;
+ SELECT notify_name, be_pid/be_pid as be_pid, extra from dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+  regression  |      1 |
+  foobar      |      1 |
+ (2 rows)
+
+ SELECT * from dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+ (0 rows)
+
+ SELECT dblink_disconnect();
+  dblink_disconnect
+ -------------------
+  OK
+ (1 row)
+
Index: contrib/dblink/sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.21
diff -c -r1.21 dblink.sql
*** contrib/dblink/sql/dblink.sql    6 Jun 2009 21:27:56 -0000    1.21
--- contrib/dblink/sql/dblink.sql    4 Aug 2009 04:06:37 -0000
***************
*** 389,391 ****
--- 389,408 ----
  DROP USER MAPPING FOR public SERVER fdtest;
  DROP SERVER fdtest;
  DROP FOREIGN DATA WRAPPER postgresql;
+
+ -- test asynchronous notifications
+ SELECT dblink_connect('dbname=contrib_regression');
+
+ --should return listen
+ SELECT dblink_exec('LISTEN regression');
+ --should return listen
+ SELECT dblink_exec('LISTEN foobar');
+
+ NOTIFY regression;
+ NOTIFY foobar;
+
+ SELECT notify_name, be_pid/be_pid as be_pid, extra from dblink_get_notify();
+
+ SELECT * from dblink_get_notify();
+
+ SELECT dblink_disconnect();
Index: doc/src/sgml/dblink.sgml
===================================================================
RCS file: /opt/src/cvs/pgsql/doc/src/sgml/dblink.sgml,v
retrieving revision 1.8
diff -c -r1.8 dblink.sgml
*** doc/src/sgml/dblink.sgml    18 Jun 2009 14:34:36 -0000    1.8
--- doc/src/sgml/dblink.sgml    4 Aug 2009 04:13:45 -0000
***************
*** 1260,1265 ****
--- 1260,1338 ----
    </refsect1>
   </refentry>

+  <refentry id="CONTRIB-DBLINK-GET-NOTIFY">
+   <refnamediv>
+    <refname>dblink_get_notify</refname>
+    <refpurpose>retrieve async notifications on a connection</refpurpose>
+   </refnamediv>
+
+   <refsynopsisdiv>
+    <synopsis>
+     dblink_get_notify() returns setof (notify_name text, be_pid int, extra text)
+     dblink_get_notify(text conname) returns setof (notify_name text, be_pid int, extra text)
+    </synopsis>
+   </refsynopsisdiv>
+
+   <refsect1>
+    <title>Description</title>
+
+    <para>
+     <function>dblink_get_notify</> retrieves notifications on either
+     the unnamed connection, or on a named connection if specified.
+     To receive notifications via dblink, <function>LISTEN</> must
+     first be issued, using <function>dblink_exec</>.
+     For details see <xref linkend="sql-listen"> and <xref linkend="sql-notify">.
+    </para>
+
+   </refsect1>
+
+   <refsect1>
+    <title>Arguments</title>
+
+    <variablelist>
+     <varlistentry>
+      <term><parameter>conname</parameter></term>
+      <listitem>
+       <para>
+        The name of a named connection to get notifications on.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </refsect1>
+
+   <refsect1>
+    <title>Return Value</title>
+     <para>Returns setof (notify_name text, be_pid int, extra text), or an empty set if none.</para>
+   </refsect1>
+
+   <refsect1>
+    <title>Example</title>
+
+    <programlisting>
+ test=# SELECT dblink_exec('LISTEN virtual');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ test=# SELECT * FROM dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+ (0 rows)
+
+ test=# NOTIFY virtual;
+ NOTIFY
+
+ test=# SELECT * FROM dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+  virtual     |   1229 |
+ (1 row)
+    </programlisting>
+   </refsect1>
+  </refentry>
+
   <refentry id="CONTRIB-DBLINK-GET-RESULT">
    <refmeta>
     <refentrytitle>dblink_get_result</refentrytitle>

Re: async notification patch for dblink

From
Alvaro Herrera
Date:
Joe Conway escribió:

> OK, how's this look?

Hmm, is it possible to use OUT parameters in the function instead of
declaring a new type for the result?


-- 
Alvaro Herrera                                http://www.CommandPrompt.com/
PostgreSQL Replication, Consulting, Custom Development, 24x7 support


Re: async notification patch for dblink

From
Joe Conway
Date:
Alvaro Herrera wrote:
> Joe Conway escribió:
>
>> OK, how's this look?
>
> Hmm, is it possible to use OUT parameters in the function instead of
> declaring a new type for the result?

Sure, I guess I ought to use the latest-and-greatest. Any other comments
before I commit?

Thanks,

Joe
Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.82
diff -c -r1.82 dblink.c
*** contrib/dblink/dblink.c    11 Jun 2009 14:48:50 -0000    1.82
--- contrib/dblink/dblink.c    4 Aug 2009 13:41:26 -0000
***************
*** 1635,1640 ****
--- 1635,1723 ----
      PG_RETURN_DATUM(current_query(fcinfo));
  }

+ /*
+  * Retrieve async notifications for a connection.
+  *
+  * Returns a setof record of notifications, or an empty set if none received.
+  * Can optionally take a named connection as a parameter, but uses the unnamed connection by default.
+  *
+  */
+ #define DBLINK_NOTIFY_COLS        3
+
+ PG_FUNCTION_INFO_V1(dblink_get_notify);
+ Datum
+ dblink_get_notify(PG_FUNCTION_ARGS)
+ {
+     PGconn               *conn = NULL;
+     remoteConn           *rconn = NULL;
+     PGnotify           *notify;
+     ReturnSetInfo       *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+     TupleDesc            tupdesc;
+     Tuplestorestate       *tupstore;
+     MemoryContext        per_query_ctx;
+     MemoryContext        oldcontext;
+
+     DBLINK_INIT;
+     if (PG_NARGS() == 1)
+         DBLINK_GET_NAMED_CONN;
+     else
+         conn = pconn->conn;
+
+     /* create the tuplestore */
+     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+     oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+     tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false);
+     TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
+                        TEXTOID, -1, 0);
+     TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
+                        INT4OID, -1, 0);
+     TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
+                        TEXTOID, -1, 0);
+
+     tupstore = tuplestore_begin_heap(true, false, work_mem);
+     rsinfo->returnMode = SFRM_Materialize;
+     rsinfo->setResult = tupstore;
+     rsinfo->setDesc = tupdesc;
+
+     MemoryContextSwitchTo(oldcontext);
+
+     PQconsumeInput(conn);
+     while ((notify = PQnotifies(conn)) != NULL)
+     {
+         Datum        values[DBLINK_NOTIFY_COLS];
+         bool        nulls[DBLINK_NOTIFY_COLS];
+
+         memset(values, 0, sizeof(values));
+         memset(nulls, 0, sizeof(nulls));
+
+         if (notify->relname != NULL)
+             values[0] = CStringGetTextDatum(notify->relname);
+         else
+             nulls[0] = true;
+
+         values[1] = Int32GetDatum(notify->be_pid);
+
+         if (notify->extra != NULL)
+             values[2] = CStringGetTextDatum(notify->extra);
+         else
+             nulls[2] = true;
+
+         /* switch to appropriate context while storing the tuple */
+         MemoryContextSwitchTo(per_query_ctx);
+         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+         MemoryContextSwitchTo(oldcontext);
+
+         PQfreemem(notify);
+         PQconsumeInput(conn);
+     }
+
+     /* clean up and return the tuplestore */
+     tuplestore_donestoring(tupstore);
+
+     return (Datum) 0;
+ }
+
  /*************************************************************
   * internal functions
   */
Index: contrib/dblink/dblink.h
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.h,v
retrieving revision 1.22
diff -c -r1.22 dblink.h
*** contrib/dblink/dblink.h    9 Jun 2009 17:41:02 -0000    1.22
--- contrib/dblink/dblink.h    4 Aug 2009 13:41:26 -0000
***************
*** 57,61 ****
--- 57,62 ----
  extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
  extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
  extern Datum dblink_current_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_get_notify(PG_FUNCTION_ARGS);

  #endif   /* DBLINK_H */
Index: contrib/dblink/dblink.sql.in
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.sql.in,v
retrieving revision 1.18
diff -c -r1.18 dblink.sql.in
*** contrib/dblink/dblink.sql.in    9 Jun 2009 17:41:02 -0000    1.18
--- contrib/dblink/dblink.sql.in    4 Aug 2009 13:44:34 -0000
***************
*** 202,204 ****
--- 202,223 ----
  RETURNS text
  AS 'MODULE_PATHNAME', 'dblink_error_message'
  LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_notify(
+     OUT notify_name TEXT,
+     OUT be_pid INT4,
+     OUT extra TEXT
+ )
+ RETURNS setof record
+ AS 'MODULE_PATHNAME', 'dblink_get_notify'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_notify(
+     conname TEXT,
+     OUT notify_name TEXT,
+     OUT be_pid INT4,
+     OUT extra TEXT
+ )
+ RETURNS setof record
+ AS 'MODULE_PATHNAME', 'dblink_get_notify'
+ LANGUAGE C STRICT;
Index: contrib/dblink/uninstall_dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/uninstall_dblink.sql,v
retrieving revision 1.7
diff -c -r1.7 uninstall_dblink.sql
*** contrib/dblink/uninstall_dblink.sql    5 Apr 2008 02:26:14 -0000    1.7
--- contrib/dblink/uninstall_dblink.sql    4 Aug 2009 13:41:26 -0000
***************
*** 76,78 ****
--- 76,82 ----
  DROP FUNCTION dblink_is_busy(text);

  DROP FUNCTION dblink_send_query(text, text);
+
+ DROP FUNCTION dblink_get_notify();
+
+ DROP FUNCTION dblink_get_notify(text);
Index: contrib/dblink/expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.25
diff -c -r1.25 dblink.out
*** contrib/dblink/expected/dblink.out    6 Jun 2009 21:27:56 -0000    1.25
--- contrib/dblink/expected/dblink.out    4 Aug 2009 13:41:26 -0000
***************
*** 827,829 ****
--- 827,870 ----
  DROP USER MAPPING FOR public SERVER fdtest;
  DROP SERVER fdtest;
  DROP FOREIGN DATA WRAPPER postgresql;
+ -- test asynchronous notifications
+ SELECT dblink_connect('dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ --should return listen
+ SELECT dblink_exec('LISTEN regression');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ --should return listen
+ SELECT dblink_exec('LISTEN foobar');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ NOTIFY regression;
+ NOTIFY foobar;
+ SELECT notify_name, be_pid/be_pid as be_pid, extra from dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+  regression  |      1 |
+  foobar      |      1 |
+ (2 rows)
+
+ SELECT * from dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+ (0 rows)
+
+ SELECT dblink_disconnect();
+  dblink_disconnect
+ -------------------
+  OK
+ (1 row)
+
Index: contrib/dblink/sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.21
diff -c -r1.21 dblink.sql
*** contrib/dblink/sql/dblink.sql    6 Jun 2009 21:27:56 -0000    1.21
--- contrib/dblink/sql/dblink.sql    4 Aug 2009 13:41:26 -0000
***************
*** 389,391 ****
--- 389,408 ----
  DROP USER MAPPING FOR public SERVER fdtest;
  DROP SERVER fdtest;
  DROP FOREIGN DATA WRAPPER postgresql;
+
+ -- test asynchronous notifications
+ SELECT dblink_connect('dbname=contrib_regression');
+
+ --should return listen
+ SELECT dblink_exec('LISTEN regression');
+ --should return listen
+ SELECT dblink_exec('LISTEN foobar');
+
+ NOTIFY regression;
+ NOTIFY foobar;
+
+ SELECT notify_name, be_pid/be_pid as be_pid, extra from dblink_get_notify();
+
+ SELECT * from dblink_get_notify();
+
+ SELECT dblink_disconnect();
Index: doc/src/sgml/dblink.sgml
===================================================================
RCS file: /opt/src/cvs/pgsql/doc/src/sgml/dblink.sgml,v
retrieving revision 1.8
diff -c -r1.8 dblink.sgml
*** doc/src/sgml/dblink.sgml    18 Jun 2009 14:34:36 -0000    1.8
--- doc/src/sgml/dblink.sgml    4 Aug 2009 13:41:26 -0000
***************
*** 1260,1265 ****
--- 1260,1338 ----
    </refsect1>
   </refentry>

+  <refentry id="CONTRIB-DBLINK-GET-NOTIFY">
+   <refnamediv>
+    <refname>dblink_get_notify</refname>
+    <refpurpose>retrieve async notifications on a connection</refpurpose>
+   </refnamediv>
+
+   <refsynopsisdiv>
+    <synopsis>
+     dblink_get_notify() returns setof (notify_name text, be_pid int, extra text)
+     dblink_get_notify(text conname) returns setof (notify_name text, be_pid int, extra text)
+    </synopsis>
+   </refsynopsisdiv>
+
+   <refsect1>
+    <title>Description</title>
+
+    <para>
+     <function>dblink_get_notify</> retrieves notifications on either
+     the unnamed connection, or on a named connection if specified.
+     To receive notifications via dblink, <function>LISTEN</> must
+     first be issued, using <function>dblink_exec</>.
+     For details see <xref linkend="sql-listen"> and <xref linkend="sql-notify">.
+    </para>
+
+   </refsect1>
+
+   <refsect1>
+    <title>Arguments</title>
+
+    <variablelist>
+     <varlistentry>
+      <term><parameter>conname</parameter></term>
+      <listitem>
+       <para>
+        The name of a named connection to get notifications on.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </refsect1>
+
+   <refsect1>
+    <title>Return Value</title>
+     <para>Returns setof (notify_name text, be_pid int, extra text), or an empty set if none.</para>
+   </refsect1>
+
+   <refsect1>
+    <title>Example</title>
+
+    <programlisting>
+ test=# SELECT dblink_exec('LISTEN virtual');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ test=# SELECT * FROM dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+ (0 rows)
+
+ test=# NOTIFY virtual;
+ NOTIFY
+
+ test=# SELECT * FROM dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+  virtual     |   1229 |
+ (1 row)
+    </programlisting>
+   </refsect1>
+  </refentry>
+
   <refentry id="CONTRIB-DBLINK-GET-RESULT">
    <refmeta>
     <refentrytitle>dblink_get_result</refentrytitle>

Re: async notification patch for dblink

From
Tom Lane
Date:
Joe Conway <mail@joeconway.com> writes:
> Sure, I guess I ought to use the latest-and-greatest. Any other comments
> before I commit?

That "be_pid/be_pid" hack in the regression test is pretty ugly, and
doesn't test anything very useful anyway seeing that it's integer
division.  Could you do something like
    be_pid = pg_backend_pid() AS is_self_notify
instead, to verify that it's a self-notify?  (This is not quite right
because you'd need to execute pg_backend_pid() at the remote end, but
I'm not awake enough to remember the dblink syntax for that.  Maybe
it's too complex to be worth it, but I think demonstrating how to
check for self-notify would be a useful bit of doco.)

Otherwise it looks ok to me.
        regards, tom lane


Re: async notification patch for dblink

From
Joe Conway
Date:
Tom Lane wrote:
> Joe Conway <mail@joeconway.com> writes:
>> Sure, I guess I ought to use the latest-and-greatest. Any other comments
>> before I commit?
>
> That "be_pid/be_pid" hack in the regression test is pretty ugly, and
> doesn't test anything very useful anyway seeing that it's integer
> division.  Could you do something like
>     be_pid = pg_backend_pid() AS is_self_notify
> instead, to verify that it's a self-notify?  (This is not quite right
> because you'd need to execute pg_backend_pid() at the remote end, but
> I'm not awake enough to remember the dblink syntax for that.  Maybe
> it's too complex to be worth it, but I think demonstrating how to
> check for self-notify would be a useful bit of doco.)
>
> Otherwise it looks ok to me.

OK, still not pretty, but at least more useful. Last call for
comments...I hope ;-)

Joe
Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.82
diff -c -r1.82 dblink.c
*** contrib/dblink/dblink.c    11 Jun 2009 14:48:50 -0000    1.82
--- contrib/dblink/dblink.c    4 Aug 2009 13:41:26 -0000
***************
*** 1635,1640 ****
--- 1635,1723 ----
      PG_RETURN_DATUM(current_query(fcinfo));
  }

+ /*
+  * Retrieve async notifications for a connection.
+  *
+  * Returns a setof record of notifications, or an empty set if none received.
+  * Can optionally take a named connection as a parameter, but uses the unnamed connection by default.
+  *
+  */
+ #define DBLINK_NOTIFY_COLS        3
+
+ PG_FUNCTION_INFO_V1(dblink_get_notify);
+ Datum
+ dblink_get_notify(PG_FUNCTION_ARGS)
+ {
+     PGconn               *conn = NULL;
+     remoteConn           *rconn = NULL;
+     PGnotify           *notify;
+     ReturnSetInfo       *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+     TupleDesc            tupdesc;
+     Tuplestorestate       *tupstore;
+     MemoryContext        per_query_ctx;
+     MemoryContext        oldcontext;
+
+     DBLINK_INIT;
+     if (PG_NARGS() == 1)
+         DBLINK_GET_NAMED_CONN;
+     else
+         conn = pconn->conn;
+
+     /* create the tuplestore */
+     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+     oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+     tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false);
+     TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
+                        TEXTOID, -1, 0);
+     TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
+                        INT4OID, -1, 0);
+     TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
+                        TEXTOID, -1, 0);
+
+     tupstore = tuplestore_begin_heap(true, false, work_mem);
+     rsinfo->returnMode = SFRM_Materialize;
+     rsinfo->setResult = tupstore;
+     rsinfo->setDesc = tupdesc;
+
+     MemoryContextSwitchTo(oldcontext);
+
+     PQconsumeInput(conn);
+     while ((notify = PQnotifies(conn)) != NULL)
+     {
+         Datum        values[DBLINK_NOTIFY_COLS];
+         bool        nulls[DBLINK_NOTIFY_COLS];
+
+         memset(values, 0, sizeof(values));
+         memset(nulls, 0, sizeof(nulls));
+
+         if (notify->relname != NULL)
+             values[0] = CStringGetTextDatum(notify->relname);
+         else
+             nulls[0] = true;
+
+         values[1] = Int32GetDatum(notify->be_pid);
+
+         if (notify->extra != NULL)
+             values[2] = CStringGetTextDatum(notify->extra);
+         else
+             nulls[2] = true;
+
+         /* switch to appropriate context while storing the tuple */
+         MemoryContextSwitchTo(per_query_ctx);
+         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+         MemoryContextSwitchTo(oldcontext);
+
+         PQfreemem(notify);
+         PQconsumeInput(conn);
+     }
+
+     /* clean up and return the tuplestore */
+     tuplestore_donestoring(tupstore);
+
+     return (Datum) 0;
+ }
+
  /*************************************************************
   * internal functions
   */
Index: contrib/dblink/dblink.h
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.h,v
retrieving revision 1.22
diff -c -r1.22 dblink.h
*** contrib/dblink/dblink.h    9 Jun 2009 17:41:02 -0000    1.22
--- contrib/dblink/dblink.h    4 Aug 2009 13:41:26 -0000
***************
*** 57,61 ****
--- 57,62 ----
  extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
  extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
  extern Datum dblink_current_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_get_notify(PG_FUNCTION_ARGS);

  #endif   /* DBLINK_H */
Index: contrib/dblink/dblink.sql.in
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.sql.in,v
retrieving revision 1.18
diff -c -r1.18 dblink.sql.in
*** contrib/dblink/dblink.sql.in    9 Jun 2009 17:41:02 -0000    1.18
--- contrib/dblink/dblink.sql.in    4 Aug 2009 13:44:34 -0000
***************
*** 202,204 ****
--- 202,223 ----
  RETURNS text
  AS 'MODULE_PATHNAME', 'dblink_error_message'
  LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_notify(
+     OUT notify_name TEXT,
+     OUT be_pid INT4,
+     OUT extra TEXT
+ )
+ RETURNS setof record
+ AS 'MODULE_PATHNAME', 'dblink_get_notify'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_notify(
+     conname TEXT,
+     OUT notify_name TEXT,
+     OUT be_pid INT4,
+     OUT extra TEXT
+ )
+ RETURNS setof record
+ AS 'MODULE_PATHNAME', 'dblink_get_notify'
+ LANGUAGE C STRICT;
Index: contrib/dblink/uninstall_dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/uninstall_dblink.sql,v
retrieving revision 1.7
diff -c -r1.7 uninstall_dblink.sql
*** contrib/dblink/uninstall_dblink.sql    5 Apr 2008 02:26:14 -0000    1.7
--- contrib/dblink/uninstall_dblink.sql    4 Aug 2009 13:41:26 -0000
***************
*** 76,78 ****
--- 76,82 ----
  DROP FUNCTION dblink_is_busy(text);

  DROP FUNCTION dblink_send_query(text, text);
+
+ DROP FUNCTION dblink_get_notify();
+
+ DROP FUNCTION dblink_get_notify(text);
Index: contrib/dblink/expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.25
diff -c -r1.25 dblink.out
*** contrib/dblink/expected/dblink.out    6 Jun 2009 21:27:56 -0000    1.25
--- contrib/dblink/expected/dblink.out    5 Aug 2009 02:37:51 -0000
***************
*** 827,829 ****
--- 827,880 ----
  DROP USER MAPPING FOR public SERVER fdtest;
  DROP SERVER fdtest;
  DROP FOREIGN DATA WRAPPER postgresql;
+ -- test asynchronous notifications
+ SELECT dblink_connect('dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ --should return listen
+ SELECT dblink_exec('LISTEN regression');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ --should return listen
+ SELECT dblink_exec('LISTEN foobar');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ SELECT dblink_exec('NOTIFY regression');
+  dblink_exec
+ -------------
+  NOTIFY
+ (1 row)
+
+ SELECT dblink_exec('NOTIFY foobar');
+  dblink_exec
+ -------------
+  NOTIFY
+ (1 row)
+
+ SELECT notify_name, be_pid = (select t.be_pid from dblink('select pg_backend_pid()') as t(be_pid int)) AS is_self_notify, extra from dblink_get_notify();
+  notify_name | is_self_notify | extra
+ -------------+----------------+-------
+  regression  | t              |
+  foobar      | t              |
+ (2 rows)
+
+ SELECT * from dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+ (0 rows)
+
+ SELECT dblink_disconnect();
+  dblink_disconnect
+ -------------------
+  OK
+ (1 row)
+
Index: contrib/dblink/sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.21
diff -c -r1.21 dblink.sql
*** contrib/dblink/sql/dblink.sql    6 Jun 2009 21:27:56 -0000    1.21
--- contrib/dblink/sql/dblink.sql    5 Aug 2009 02:36:00 -0000
***************
*** 389,391 ****
--- 389,408 ----
  DROP USER MAPPING FOR public SERVER fdtest;
  DROP SERVER fdtest;
  DROP FOREIGN DATA WRAPPER postgresql;
+
+ -- test asynchronous notifications
+ SELECT dblink_connect('dbname=contrib_regression');
+
+ --should return listen
+ SELECT dblink_exec('LISTEN regression');
+ --should return listen
+ SELECT dblink_exec('LISTEN foobar');
+
+ SELECT dblink_exec('NOTIFY regression');
+ SELECT dblink_exec('NOTIFY foobar');
+
+ SELECT notify_name, be_pid = (select t.be_pid from dblink('select pg_backend_pid()') as t(be_pid int)) AS is_self_notify, extra from dblink_get_notify();
+
+ SELECT * from dblink_get_notify();
+
+ SELECT dblink_disconnect();
Index: doc/src/sgml/dblink.sgml
===================================================================
RCS file: /opt/src/cvs/pgsql/doc/src/sgml/dblink.sgml,v
retrieving revision 1.8
diff -c -r1.8 dblink.sgml
*** doc/src/sgml/dblink.sgml    18 Jun 2009 14:34:36 -0000    1.8
--- doc/src/sgml/dblink.sgml    4 Aug 2009 13:41:26 -0000
***************
*** 1260,1265 ****
--- 1260,1338 ----
    </refsect1>
   </refentry>

+  <refentry id="CONTRIB-DBLINK-GET-NOTIFY">
+   <refnamediv>
+    <refname>dblink_get_notify</refname>
+    <refpurpose>retrieve async notifications on a connection</refpurpose>
+   </refnamediv>
+
+   <refsynopsisdiv>
+    <synopsis>
+     dblink_get_notify() returns setof (notify_name text, be_pid int, extra text)
+     dblink_get_notify(text conname) returns setof (notify_name text, be_pid int, extra text)
+    </synopsis>
+   </refsynopsisdiv>
+
+   <refsect1>
+    <title>Description</title>
+
+    <para>
+     <function>dblink_get_notify</> retrieves notifications on either
+     the unnamed connection, or on a named connection if specified.
+     To receive notifications via dblink, <function>LISTEN</> must
+     first be issued, using <function>dblink_exec</>.
+     For details see <xref linkend="sql-listen"> and <xref linkend="sql-notify">.
+    </para>
+
+   </refsect1>
+
+   <refsect1>
+    <title>Arguments</title>
+
+    <variablelist>
+     <varlistentry>
+      <term><parameter>conname</parameter></term>
+      <listitem>
+       <para>
+        The name of a named connection to get notifications on.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </refsect1>
+
+   <refsect1>
+    <title>Return Value</title>
+     <para>Returns setof (notify_name text, be_pid int, extra text), or an empty set if none.</para>
+   </refsect1>
+
+   <refsect1>
+    <title>Example</title>
+
+    <programlisting>
+ test=# SELECT dblink_exec('LISTEN virtual');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ test=# SELECT * FROM dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+ (0 rows)
+
+ test=# NOTIFY virtual;
+ NOTIFY
+
+ test=# SELECT * FROM dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+  virtual     |   1229 |
+ (1 row)
+    </programlisting>
+   </refsect1>
+  </refentry>
+
   <refentry id="CONTRIB-DBLINK-GET-RESULT">
    <refmeta>
     <refentrytitle>dblink_get_result</refentrytitle>

Re: async notification patch for dblink

From
Joe Conway
Date:
Tom Lane wrote:
> Could you do something like
>     be_pid = pg_backend_pid() AS is_self_notify
> instead, to verify that it's a self-notify?  (This is not quite right
> because you'd need to execute pg_backend_pid() at the remote end, but
> I'm not awake enough to remember the dblink syntax for that.  Maybe
> it's too complex to be worth it, but I think demonstrating how to
> check for self-notify would be a useful bit of doco.)
>
> Otherwise it looks ok to me.

Committed. Final version attached.

Joe
Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.82
diff -c -r1.82 dblink.c
*** contrib/dblink/dblink.c    11 Jun 2009 14:48:50 -0000    1.82
--- contrib/dblink/dblink.c    4 Aug 2009 13:41:26 -0000
***************
*** 1635,1640 ****
--- 1635,1723 ----
      PG_RETURN_DATUM(current_query(fcinfo));
  }

+ /*
+  * Retrieve async notifications for a connection.
+  *
+  * Returns a setof record of notifications, or an empty set if none received.
+  * Can optionally take a named connection as a parameter, but uses the unnamed connection by default.
+  *
+  */
+ #define DBLINK_NOTIFY_COLS        3
+
+ PG_FUNCTION_INFO_V1(dblink_get_notify);
+ Datum
+ dblink_get_notify(PG_FUNCTION_ARGS)
+ {
+     PGconn               *conn = NULL;
+     remoteConn           *rconn = NULL;
+     PGnotify           *notify;
+     ReturnSetInfo       *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+     TupleDesc            tupdesc;
+     Tuplestorestate       *tupstore;
+     MemoryContext        per_query_ctx;
+     MemoryContext        oldcontext;
+
+     DBLINK_INIT;
+     if (PG_NARGS() == 1)
+         DBLINK_GET_NAMED_CONN;
+     else
+         conn = pconn->conn;
+
+     /* create the tuplestore */
+     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+     oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+     tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false);
+     TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
+                        TEXTOID, -1, 0);
+     TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
+                        INT4OID, -1, 0);
+     TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
+                        TEXTOID, -1, 0);
+
+     tupstore = tuplestore_begin_heap(true, false, work_mem);
+     rsinfo->returnMode = SFRM_Materialize;
+     rsinfo->setResult = tupstore;
+     rsinfo->setDesc = tupdesc;
+
+     MemoryContextSwitchTo(oldcontext);
+
+     PQconsumeInput(conn);
+     while ((notify = PQnotifies(conn)) != NULL)
+     {
+         Datum        values[DBLINK_NOTIFY_COLS];
+         bool        nulls[DBLINK_NOTIFY_COLS];
+
+         memset(values, 0, sizeof(values));
+         memset(nulls, 0, sizeof(nulls));
+
+         if (notify->relname != NULL)
+             values[0] = CStringGetTextDatum(notify->relname);
+         else
+             nulls[0] = true;
+
+         values[1] = Int32GetDatum(notify->be_pid);
+
+         if (notify->extra != NULL)
+             values[2] = CStringGetTextDatum(notify->extra);
+         else
+             nulls[2] = true;
+
+         /* switch to appropriate context while storing the tuple */
+         MemoryContextSwitchTo(per_query_ctx);
+         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+         MemoryContextSwitchTo(oldcontext);
+
+         PQfreemem(notify);
+         PQconsumeInput(conn);
+     }
+
+     /* clean up and return the tuplestore */
+     tuplestore_donestoring(tupstore);
+
+     return (Datum) 0;
+ }
+
  /*************************************************************
   * internal functions
   */
Index: contrib/dblink/dblink.h
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.h,v
retrieving revision 1.22
diff -c -r1.22 dblink.h
*** contrib/dblink/dblink.h    9 Jun 2009 17:41:02 -0000    1.22
--- contrib/dblink/dblink.h    4 Aug 2009 13:41:26 -0000
***************
*** 57,61 ****
--- 57,62 ----
  extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
  extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
  extern Datum dblink_current_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_get_notify(PG_FUNCTION_ARGS);

  #endif   /* DBLINK_H */
Index: contrib/dblink/dblink.sql.in
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.sql.in,v
retrieving revision 1.18
diff -c -r1.18 dblink.sql.in
*** contrib/dblink/dblink.sql.in    9 Jun 2009 17:41:02 -0000    1.18
--- contrib/dblink/dblink.sql.in    4 Aug 2009 13:44:34 -0000
***************
*** 202,204 ****
--- 202,223 ----
  RETURNS text
  AS 'MODULE_PATHNAME', 'dblink_error_message'
  LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_notify(
+     OUT notify_name TEXT,
+     OUT be_pid INT4,
+     OUT extra TEXT
+ )
+ RETURNS setof record
+ AS 'MODULE_PATHNAME', 'dblink_get_notify'
+ LANGUAGE C STRICT;
+
+ CREATE OR REPLACE FUNCTION dblink_get_notify(
+     conname TEXT,
+     OUT notify_name TEXT,
+     OUT be_pid INT4,
+     OUT extra TEXT
+ )
+ RETURNS setof record
+ AS 'MODULE_PATHNAME', 'dblink_get_notify'
+ LANGUAGE C STRICT;
Index: contrib/dblink/uninstall_dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/uninstall_dblink.sql,v
retrieving revision 1.7
diff -c -r1.7 uninstall_dblink.sql
*** contrib/dblink/uninstall_dblink.sql    5 Apr 2008 02:26:14 -0000    1.7
--- contrib/dblink/uninstall_dblink.sql    4 Aug 2009 13:41:26 -0000
***************
*** 76,78 ****
--- 76,82 ----
  DROP FUNCTION dblink_is_busy(text);

  DROP FUNCTION dblink_send_query(text, text);
+
+ DROP FUNCTION dblink_get_notify();
+
+ DROP FUNCTION dblink_get_notify(text);
Index: contrib/dblink/expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.25
diff -c -r1.25 dblink.out
*** contrib/dblink/expected/dblink.out    6 Jun 2009 21:27:56 -0000    1.25
--- contrib/dblink/expected/dblink.out    5 Aug 2009 02:37:51 -0000
***************
*** 827,829 ****
--- 827,880 ----
  DROP USER MAPPING FOR public SERVER fdtest;
  DROP SERVER fdtest;
  DROP FOREIGN DATA WRAPPER postgresql;
+ -- test asynchronous notifications
+ SELECT dblink_connect('dbname=contrib_regression');
+  dblink_connect
+ ----------------
+  OK
+ (1 row)
+
+ --should return listen
+ SELECT dblink_exec('LISTEN regression');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ --should return listen
+ SELECT dblink_exec('LISTEN foobar');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ SELECT dblink_exec('NOTIFY regression');
+  dblink_exec
+ -------------
+  NOTIFY
+ (1 row)
+
+ SELECT dblink_exec('NOTIFY foobar');
+  dblink_exec
+ -------------
+  NOTIFY
+ (1 row)
+
+ SELECT notify_name, be_pid = (select t.be_pid from dblink('select pg_backend_pid()') as t(be_pid int)) AS is_self_notify,extra from dblink_get_notify();
+  notify_name | is_self_notify | extra
+ -------------+----------------+-------
+  regression  | t              |
+  foobar      | t              |
+ (2 rows)
+
+ SELECT * from dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+ (0 rows)
+
+ SELECT dblink_disconnect();
+  dblink_disconnect
+ -------------------
+  OK
+ (1 row)
+
Index: contrib/dblink/sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.21
diff -c -r1.21 dblink.sql
*** contrib/dblink/sql/dblink.sql    6 Jun 2009 21:27:56 -0000    1.21
--- contrib/dblink/sql/dblink.sql    5 Aug 2009 02:36:00 -0000
***************
*** 389,391 ****
--- 389,408 ----
  DROP USER MAPPING FOR public SERVER fdtest;
  DROP SERVER fdtest;
  DROP FOREIGN DATA WRAPPER postgresql;
+
+ -- test asynchronous notifications
+ SELECT dblink_connect('dbname=contrib_regression');
+
+ --should return listen
+ SELECT dblink_exec('LISTEN regression');
+ --should return listen
+ SELECT dblink_exec('LISTEN foobar');
+
+ SELECT dblink_exec('NOTIFY regression');
+ SELECT dblink_exec('NOTIFY foobar');
+
+ SELECT notify_name, be_pid = (select t.be_pid from dblink('select pg_backend_pid()') as t(be_pid int)) AS is_self_notify,extra from dblink_get_notify();
+
+ SELECT * from dblink_get_notify();
+
+ SELECT dblink_disconnect();
Index: doc/src/sgml/dblink.sgml
===================================================================
RCS file: /opt/src/cvs/pgsql/doc/src/sgml/dblink.sgml,v
retrieving revision 1.8
diff -c -r1.8 dblink.sgml
*** doc/src/sgml/dblink.sgml    18 Jun 2009 14:34:36 -0000    1.8
--- doc/src/sgml/dblink.sgml    4 Aug 2009 13:41:26 -0000
***************
*** 1260,1265 ****
--- 1260,1338 ----
    </refsect1>
   </refentry>

+  <refentry id="CONTRIB-DBLINK-GET-NOTIFY">
+   <refnamediv>
+    <refname>dblink_get_notify</refname>
+    <refpurpose>retrieve async notifications on a connection</refpurpose>
+   </refnamediv>
+
+   <refsynopsisdiv>
+    <synopsis>
+     dblink_get_notify() returns setof (notify_name text, be_pid int, extra text)
+     dblink_get_notify(text conname) returns setof (notify_name text, be_pid int, extra text)
+    </synopsis>
+   </refsynopsisdiv>
+
+   <refsect1>
+    <title>Description</title>
+
+    <para>
+     <function>dblink_get_notify</> retrieves notifications on either
+     the unnamed connection, or on a named connection if specified.
+     To receive notifications via dblink, <function>LISTEN</> must
+     first be issued, using <function>dblink_exec</>.
+     For details see <xref linkend="sql-listen"> and <xref linkend="sql-notify">.
+    </para>
+
+   </refsect1>
+
+   <refsect1>
+    <title>Arguments</title>
+
+    <variablelist>
+     <varlistentry>
+      <term><parameter>conname</parameter></term>
+      <listitem>
+       <para>
+        The name of a named connection to get notifications on.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </refsect1>
+
+   <refsect1>
+    <title>Return Value</title>
+     <para>Returns setof (notify_name text, be_pid int, extra text), or an empty set if no notifications are pending.</para>
+   </refsect1>
+
+   <refsect1>
+    <title>Example</title>
+
+    <programlisting>
+ test=# SELECT dblink_exec('LISTEN virtual');
+  dblink_exec
+ -------------
+  LISTEN
+ (1 row)
+
+ test=# SELECT * FROM dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+ (0 rows)
+
+ test=# NOTIFY virtual;
+ NOTIFY
+
+ test=# SELECT * FROM dblink_get_notify();
+  notify_name | be_pid | extra
+ -------------+--------+-------
+  virtual     |   1229 |
+ (1 row)
+    </programlisting>
+   </refsect1>
+  </refentry>
+
   <refentry id="CONTRIB-DBLINK-GET-RESULT">
    <refmeta>
     <refentrytitle>dblink_get_result</refentrytitle>