Thread: NOTIFY in asynchronous mode

NOTIFY in asynchronous mode

From
Tobias Oberstein
Date:
Hello,

is it possible to use notification (sent via NOTIFY from Postgres) in asynchronous mode?

Background:

I want to use

https://github.com/wulczer/txpostgres

which wraps psycopg2 in asynchronous mode for use in Twisted without the need
of running a background thread pool.

And I want to use NOTIFY.

Thanks!

Re: NOTIFY in asynchronous mode

From
Daniele Varrazzo
Date:
On Fri, Nov 4, 2011 at 7:12 PM, Tobias Oberstein
<tobias.oberstein@tavendo.de> wrote:

> is it possible to use notification (sent via NOTIFY from Postgres) in asynchronous mode?

Yes: you should register the connection's file descriptor in the
twisted reactor so that you can be notified by the kernel when the
server sends you a notification.

-- Daniele

Re: NOTIFY in asynchronous mode

From
Jan Urbański
Date:
On 05/11/11 01:15, Daniele Varrazzo wrote:
> On Sat, Nov 5, 2011 at 12:03 AM, Jan Urbański <wulczer@wulczer.org> wrote:
>> On 04/11/11 21:23, Daniele Varrazzo wrote:
>>> On Fri, Nov 4, 2011 at 7:12 PM, Tobias Oberstein
>>> <tobias.oberstein@tavendo.de> wrote:
>>>
>>>> is it possible to use notification (sent via NOTIFY from Postgres) in asynchronous mode?
>>>
>>> Yes: you should register the connection's file descriptor in the
>>> twisted reactor so that you can be notified by the kernel when the
>>> server sends you a notification.
>>
>> Actually... it won't work out of the box. I got a patch from someone
>> that added NOTIFY support for txpostgres, but it broke some unit tests
>> and there were a few other minor issues with it and then I kind of
>> forgot about it.
>
> Is it possible to use a naked psycopg connection instead of tx? I
> mean, not for the regular query-return cycle, but just for sitting
> idle in the reactor and get a callback called upon notify.

Not in an easy way, I'm afraid. Stuff put in the reactor as readers need
to implement the IReadDescriptor interface[0] which means you'd have to
wrap the connection object with something that proxies fileno() to the
connection and looks for notifies when doRead() is called on it.

Since you have to connect first, and issue the LISTEN statement, you
have to go through the entire polling cycle, at which point you
basically would have implemented half of txpostgres :)

J

[0]
http://twistedmatrix.com/documents/current/api/twisted.internet.interfaces.IReadDescriptor.html

Re: NOTIFY in asynchronous mode

From
Daniele Varrazzo
Date:
On Sat, Nov 5, 2011 at 12:03 AM, Jan Urbański <wulczer@wulczer.org> wrote:
> On 04/11/11 21:23, Daniele Varrazzo wrote:
>> On Fri, Nov 4, 2011 at 7:12 PM, Tobias Oberstein
>> <tobias.oberstein@tavendo.de> wrote:
>>
>>> is it possible to use notification (sent via NOTIFY from Postgres) in asynchronous mode?
>>
>> Yes: you should register the connection's file descriptor in the
>> twisted reactor so that you can be notified by the kernel when the
>> server sends you a notification.
>
> Actually... it won't work out of the box. I got a patch from someone
> that added NOTIFY support for txpostgres, but it broke some unit tests
> and there were a few other minor issues with it and then I kind of
> forgot about it.

Is it possible to use a naked psycopg connection instead of tx? I
mean, not for the regular query-return cycle, but just for sitting
idle in the reactor and get a callback called upon notify.

-- Daniele

Re: NOTIFY in asynchronous mode

From
Jan Urbański
Date:
On 05/11/11 11:54, Tobias Oberstein wrote:
> -- Original message -----
>>> What would be provided to the callback?
>>>
>>> Channel (from NOTIFIY), Payload (from NOTIFIY), Connection (txpostgres)
>> ?
>>
>> Probably simply the psycopg2 Notify object.
>
> Ok, fine also.
>
> And not the Connection object on which it was received? Ok, maybe it's
>  not needed: When I want to do some db stuff within the callback, should
> not make a difference on what connection I do that.

You would register the callback on the Connection object, so you could
tell it which connection it will get registered on beforehand.

> Few other design Qs:
>
> Is it possible to UNLISTEN?

Sure, because you'd call both LISTEN and UNLISTEN as SQL statements with
runQuery.

> Or unregister a callback on a channel? Or set it to None?
>
> Are callbacks for channels registered "globally" or per connection?

The callbacks would be registered per connection, since it's the
connection who receives notifications.

> Can there be only one callback at most registered per channel?

Channel handling would be entirely up to the client. Here's a quick
example of how it could work (based on the patch I previously mentioned
from Jan Pobrislo, with a slightly changed callback API):


from twisted.python import log
from twisted.python.util import println

class NotifyObserver(object):

    def __init__(self, conn):
        self.conn = conn

    def __call__(self, notify):
        if notify.channel == 'channel1':
            println('got notify on channel 1, unlistening and removing')
            self.conn.removeNotifyObserver(self)
            d = conn.runQuery('UNLISTEN *')
            d.addCallback(lambda _: println('unlistened'))
            d.addErrback(log.err)
        else:
            println('got some other notify')


# conn is a txpostgres Connection
observer = NotifyObserver(conn)
# any number of different callables can be registered, here we just
register one
conn.addNotifyObserver(observer)

d = conn.runQuery('LISTEN channel1')
d.addCallback(lambda _: conn.runQuery('LISTEN channel2'))
d.addCallback(lanbda _: println('listening')

# then, after the program prints "listening" execute from psql in
another session
=$ NOTIFY channel2;
=$ NOTIFY channel2;
=$ NOTIFY channel1;
=$ NOTIFY channel2;

# which would yield the following from the running program
"got some other notify"
"got some other notify"
"got notify on channel 1, unlistening and removing"

# note that after a notify on channel1 is received, the observer is gone
and no more notifies are processed

I think that's a reasonably simple API and one that allows you to do
anything you want without introducing some extra interfaces that notify
observers have to implement, just requiring that they are callable.

Cheers,
Jan

Re: NOTIFY in asynchronous mode

From
Jan Urbański
Date:
On 04/11/11 21:23, Daniele Varrazzo wrote:
> On Fri, Nov 4, 2011 at 7:12 PM, Tobias Oberstein
> <tobias.oberstein@tavendo.de> wrote:
>
>> is it possible to use notification (sent via NOTIFY from Postgres) in asynchronous mode?
>
> Yes: you should register the connection's file descriptor in the
> twisted reactor so that you can be notified by the kernel when the
> server sends you a notification.

Actually... it won't work out of the box. I got a patch from someone
that added NOTIFY support for txpostgres, but it broke some unit tests
and there were a few other minor issues with it and then I kind of
forgot about it.

Recently I've been picking up txpostgres' development and I might add
NOTIFY support soon, maybe even the next week.

The problem with it now is that when there's no query going on, the
connection's descriptor is not present in the reactor, it only gets
added for the duration of the query. Once I figure out how to change
that without breaking some subtle corner cases, you'll be able to
register a NOTIFY callback that will get called when there's an async
notification received by the connection.

Cheers,
Jan

Re: NOTIFY in asynchronous mode

From
Tobias Oberstein
Date:
> I think that's a reasonably simple API and one that allows you to do anything
> you want without introducing some extra interfaces that notify observers
> have to implement, just requiring that they are callable.

Yeah, thats fine. So

+ all observers registered on a connection would get notified when a notification
on that connection arrives.

+ a dispatching scheme based on channel can be built on top of above

+ the LISTEN/UNLISTEN is orthogonal to observer registration

+ any callable which takes a first positional argument of type "notify" can be used

+ the whole interface for above is Connection.addNotifyObserver/removeNotifyObserver

, right?


Re: NOTIFY in asynchronous mode

From
Tobias Oberstein
Date:
> >> is it possible to use notification (sent via NOTIFY from Postgres) in
> asynchronous mode?
> >
> > Yes: you should register the connection's file descriptor in the
> > twisted reactor so that you can be notified by the kernel when the
> > server sends you a notification.
> 
> Actually... it won't work out of the box. I got a patch from someone that
> added NOTIFY support for txpostgres, but it broke some unit tests and there
> were a few other minor issues with it and then I kind of forgot about it.
> 
> Recently I've been picking up txpostgres' development and I might add
> NOTIFY support soon, maybe even the next week.

That would be marvelous!

> 
> The problem with it now is that when there's no query going on, the
> connection's descriptor is not present in the reactor, it only gets added for
> the duration of the query. Once I figure out how to change that without
> breaking some subtle corner cases, you'll be able to register a NOTIFY

Sounds intricate .. if there is something how I could help, let me know ..

> callback that will get called when there's an async notification received by the
> connection.

What would be provided to the callback?

Channel (from NOTIFIY), Payload (from NOTIFIY), Connection (txpostgres) ?

> 
> Cheers,
> Jan

Re: NOTIFY in asynchronous mode

From
Jan Urbański
Date:
----- Original message -----
> What would be provided to the callback?
>
> Channel (from NOTIFIY), Payload (from NOTIFIY), Connection (txpostgres) ?

Probably simply the psycopg2 Notify object.

J

Re: NOTIFY in asynchronous mode

From
Tobias Oberstein
Date:
-- Original message -----
> > What would be provided to the callback?
> >
> > Channel (from NOTIFIY), Payload (from NOTIFIY), Connection (txpostgres)
> ?
> 
> Probably simply the psycopg2 Notify object.

Ok, fine also.

And not the Connection object on which it was received? Ok, maybe it's
 not needed: When I want to do some db stuff within the callback, should
not make a difference on what connection I do that.

Few other design Qs:

Is it possible to UNLISTEN?

Or unregister a callback on a channel? Or set it to None?

Are callbacks for channels registered "globally" or per connection?

Can there be only one callback at most registered per channel?

Re: NOTIFY in asynchronous mode

From
Daniele Varrazzo
Date:
On Nov 5, 2011 12:21 AM, "Jan Urbański" <wulczer@wulczer.org> wrote:
>
> On 05/11/11 01:15, Daniele Varrazzo wrote:

> > Is it possible to use a naked psycopg connection instead of tx? I
> > mean, not for the regular query-return cycle, but just for sitting
> > idle in the reactor and get a callback called upon notify.
>
> Not in an easy way, I'm afraid. Stuff put in the reactor as readers need
> to implement the IReadDescriptor interface[0] which means you'd have to
> wrap the connection object with something that proxies fileno() to the
> connection and looks for notifies when doRead() is called on it.

Doh, sorry if I made it too easy. It would be great if notifies could
be used easily from Twisted: they seem made for each other like bread
and nutella. Please keep us informed if you manage to add the support
to the library.

Comparatively, receiving notifies in greenlet environments is
straightforward:
<http://initd.org/psycopg/articles/2010/12/01/postgresql-notifications-psycopg2-eventlet/>.

-- Daniele

Re: NOTIFY in asynchronous mode

From
Jan Urbański
Date:
On 06/11/11 11:42, Daniele Varrazzo wrote:
> On Nov 5, 2011 12:21 AM, "Jan Urbański" <wulczer@wulczer.org> wrote:
>>
>> On 05/11/11 01:15, Daniele Varrazzo wrote:
>
>>> Is it possible to use a naked psycopg connection instead of tx? I
>>> mean, not for the regular query-return cycle, but just for sitting
>>> idle in the reactor and get a callback called upon notify.
>>
>> Not in an easy way, I'm afraid. Stuff put in the reactor as readers need
>> to implement the IReadDescriptor interface[0] which means you'd have to
>> wrap the connection object with something that proxies fileno() to the
>> connection and looks for notifies when doRead() is called on it.
>
> Doh, sorry if I made it too easy. It would be great if notifies could
> be used easily from Twisted: they seem made for each other like bread
> and nutella. Please keep us informed if you manage to add the support
> to the library.

No problem ;) I think it was just what I needed to add support for
NOTIFY to txpostgres, which is now present in HEAD.

Turns out it required learning more that I cared to know about epoll and
fixing some unrelated bugs, but it's done.

> Comparatively, receiving notifies in greenlet environments is
> straightforward:
> <http://initd.org/psycopg/articles/2010/12/01/postgresql-notifications-psycopg2-eventlet/>.

Cool, I tried implementing that with Twisted and ended up with the
following:

https://gist.github.com/1343455

Requires txpostgres HEAD and txWebSocket, preferably from my fork
(https://github.com/wulczer/txWebSocket) to have it support newer
versions of the WebSocket protocol, as found in recent Chrome and Firefox.

All in all, thank you everyone for your input and sorry for conducting a
discussion that would belong more on the Twisted mailing list than here :)

Cheers,
Jan

Re: NOTIFY in asynchronous mode

From
Tobias Oberstein
Date:
> > Doh, sorry if I made it too easy. It would be great if notifies could
> > be used easily from Twisted: they seem made for each other like bread
> > and nutella. Please keep us informed if you manage to add the support
> > to the library.
> 
> No problem ;) I think it was just what I needed to add support for NOTIFY to
> txpostgres, which is now present in HEAD.
>

very nice. thanks!

btw: just tested .. it works with the revived kqueue reactor on FreeBSD

https://github.com/oberstet/txkqreactor

from txkqreactor import kqreactor
kqreactor.install()

from twisted.internet import reactor

from txpostgres.txpostgres import ConnectionPool, Connection

def observer(notify):
   print "NOTIFY", notify

conn = Connection()
conn.addNotifyObserver(observer)
d0 = conn.connect(DSN)
d1 = d0.addCallback(lambda res: conn.runOperation("LISTEN test"))

print "reactor class", reactor.__class__
reactor.run()