Re: psycopg2 (async) socket timeout - Mailing list psycopg
From: Danny Milosavljevic
Subject: Re: psycopg2 (async) socket timeout
Msg-id: AANLkTi=J=c9ZD6L5LYEyif=DGtDq__Lw0g2n2+u3a9nn@mail.gmail.com
In response to: Re: psycopg2 (async) socket timeout (Jan Urbański <wulczer@wulczer.org>)
Responses: Re: psycopg2 (async) socket timeout
List: psycopg
Hi,

2011/2/9 Jan Urbański <wulczer@wulczer.org>:
> ----- Original message -----
> I'll try to reproduce this problem, AIUI you should have the Deferred
> errback if the connection is lost, but perhaps it takes some time for
> Twisted to detect it (actually it takes time for the kernel to detect
> it). You might try playing with your TCP keepalive settings.

I'm trying. No luck so far...

http://twistedmatrix.com/trac/wiki/FrequentlyAskedQuestions says "If you
rely on TCP timeouts, expect as much as two hours (the precise amount is
platform specific) to pass between when the disruption occurs and when
connectionLost is called". Oops.

Hmm, even when I connect, then just take the network interface down, and
only after that call runQuery, it also never calls anything back (well, I
didn't wait more than half an hour per try so far).

But good point - although does keepalive even work for async sockets,
where you are not actively reading, that is, where nobody knows you want
to receive any data? If that worked, it would be the nicest fix (see the
first sketch at the very end of this post). For the not-so-nice fix, read
on :-)

I've now started to do it the way Daniele and you suggested ("just close
it from the client"), so I modified the Connection to start a timer which
will fire if I don't defuse it early enough (and modified ConnectionPool
to check connections periodically and reconnect). After I receive a
response, I defuse the timer. If not, the timer callback will be run. It
will call the errback - which will call connection.close().

As far as noticing the "disconnect" (well, potential disconnect) goes,
this works perfectly. However, doing a connection.close() then doesn't
seem to help much, still investigating why... I'm getting the following:

  File "/usr/lib/python2.6/site-packages/twisted/internet/selectreactor.py", line 104, in doSelect
    [], timeout)
exceptions.ValueError: file descriptor cannot be a negative integer (-1)

So it seems the FD of the closed connection to postgres is still in the
Twisted reactor? Seems I am missing some calls to self.reactor.removeReader
or -Writer, maybe. Do those belong in Connection.close()? (See the second
sketch at the very end of this post.)

If I try to reconnect periodically, can I use the same txpostgres
Connection instance and just call connect() again?

> Another option is implementing a timeout with a callLater. The problem
> there is that it requires additional code

Yeah, I did that now, see the end of this post for the code...

Since I'm trying to make the pg client more resilient against our flaky
network, what I want to do is just close the socket to the server on
timeout. What I don't want to do is send a cancellation request over the
broken socket, telling the server that the connection is broken and would
it please cancel the query :-) (I hope the PostgreSQL server will notice
soon enough when the client doesn't answer.)

> and txpostgres does not support query cancellation (yet, it's on the
> roadmap).

Yeah, but when you say "cancellation", do you mean "sending a -
cancellation - request to the server via the non-working connection"? :)

Well, I'll be reading a bit more of the Twisted reactor code now, I guess :-)

Cheers,
Danny

The file "postgresto.py", which is all the extra code needed for
client-side impatience, follows. For "Connection" it's mostly your code
with a few minimal changes: some connectTimeouter() calls sprinkled in.
For "ConnectionPool" there is now also a periodically-checking health
checker and reconnecter - please note that this is work in progress and
in no way stable yet:

#!/usr/bin/env python2
# Postgres with timeouts.
import sys

from txpostgres import txpostgres
from twisted.internet import interfaces, reactor, defer
from twisted.python import log


class Connection(txpostgres.Connection):

    def connectTimeouter(self, d, timeout):
        """ connect a timeouter to a deferred """
        delayedCall = reactor.callLater(timeout, self.handleTimeout, d)
        d.addBoth(self.cancelTimeout, delayedCall)
        return d

    def _runQuery(self, *args, **kwargs):
        c = self.cursor()
        # pop our private kwarg so the cursor never sees it
        timeout = kwargs.pop("timeout", None) or 10
        d = c.execute(*args, **kwargs)
        return self.connectTimeouter(d, timeout).addCallback(lambda c: c.fetchall())

    def _runOperation(self, *args, **kwargs):
        c = self.cursor()
        timeout = kwargs.pop("timeout", None) or 10
        d = c.execute(*args, **kwargs)
        return self.connectTimeouter(d, timeout).addCallback(lambda _: None)

    def _runInteraction(self, interaction, *args, **kwargs):
        c = self.cursor()
        timeout = kwargs.pop("timeout", None) or 10
        d = c.execute("begin")
        # we assume that the interaction does something on the database here,
        # so if the interaction times out, take it as a database timeout!
        self.connectTimeouter(d, timeout).addCallback(
            lambda arg: self.connectTimeouter(
                defer.maybeDeferred(interaction, arg, *args, **kwargs), timeout))
        # FIXME also timeout the interaction itself.

        def commitAndPassthrough(ret, cursor):
            e = cursor.execute("commit")
            return e.addCallback(lambda _: ret)

        def rollbackAndPassthrough(f, cursor):
            # maybeDeferred in case cursor.execute raises a synchronous exception
            e = defer.maybeDeferred(cursor.execute, "rollback")

            def just_panic(rf):
                log.err(rf)
                return defer.fail(txpostgres.RollbackFailed(self, f))

            # if rollback failed, panic
            e.addErrback(just_panic)
            # reraise the original failure afterwards
            return e.addCallback(lambda _: f)

        d.addCallback(commitAndPassthrough, c)
        d.addErrback(rollbackAndPassthrough, c)
        return d

    def handleTimeout(self, d):
        """ handles the timeout since we DID time out """
        log.err("timed out")
        # close the connection (maybe it was a connection problem...)
        self.close()
        # fail the waiting deferred so the caller notices the timeout
        if not d.called:
            d.errback(defer.TimeoutError("query timed out"))

    def cancelTimeout(self, arg, delayedCall):
        """ cancels the timeout since we DID NOT time out """
        #print >>sys.stderr, "not timed out, OK"
        if delayedCall.active():
            delayedCall.cancel()
        return arg

    def isConnected(self):
        return self.pollable() is not None


class ConnectionPool(txpostgres.ConnectionPool):

    connectionFactory = Connection

    def __init__(self, *args, **kwargs):
        txpostgres.ConnectionPool.__init__(self, *args, **kwargs)
        self.reconnectionInterval = 10  # sec
        reactor.callLater(self.reconnectionInterval, self.reconnectIfNeeded)
        self.connectionAttempts = set()

    def reconnectIfNeeded(self):
        for connection in self.connections:
            if not connection.isConnected() and connection not in self.connectionAttempts:
                # TODO don't try that too often...
log.msg("database connection was lost, trying again") self.connectionAttempts.add(connection) d = connection.connect(*self.connargs, **self.connkw) delayedCall = reactor.callLater(self.reconnectionInterval, self.handleTimeout, d, connection) d.addBoth(self.cancelTimeout, delayedCall, connection) reactor.callLater(self.reconnectionInterval, self.reconnectIfNeeded) def handleTimeout(self, d, connection): """ handles the timeout since we DID time out """ log.err("reconnect timed out") try: self.connectionAttempts.remove(connection) except KeyError: pass d.errback() def cancelTimeout(self, arg, delayedCall, connection): """ cancels the timeout since we DID NOT time out """ print >>sys.stderr, "reconnect DID NOT time out, OK" # does not neccessarily mean that it worked. if delayedCall.active(): delayedCall.cancel() try: self.connectionAttempts.remove(connection) except KeyError: pass return(arg)