Re: psycopg2 (async) socket timeout

From: Danny Milosavljevic <danny(dot)milo+ml(at)gmail(dot)com>
To: Jan Urbański <wulczer(at)wulczer(dot)org>
Cc: psycopg(at)postgresql(dot)org
Subject: Re: psycopg2 (async) socket timeout
Date: 2011-02-14 18:59:41
Message-ID: AANLkTi=J=c9ZD6L5LYEyif=DGtDq__Lw0g2n2+u3a9nn@mail.gmail.com
Lists: psycopg
Hi,

2011/2/9 Jan Urbański <wulczer(at)wulczer(dot)org>:
> ----- Original message -----
> I'll try to reproduce this problem, AIUI you should have the Deferred errback if the connection is lost, but perhaps it takes some time for Twisted to detect it (actually it takes time for the kernel to detect it). You might try playing with your TCP keepalive settings.

I'm trying. No luck so far...

http://twistedmatrix.com/trac/wiki/FrequentlyAskedQuestions says "If
you rely on TCP timeouts, expect as much as two hours (the precise
amount is platform specific) to pass between when the disruption
occurs and when connectionLost is called". Oops.
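For reference, this is roughly what "playing with the keepalive settings" looks like on a plain socket. It is only a sketch: enable_keepalive and its default values are made up for illustration, the TCP_KEEP* options are platform specific, and it does not show how to get at the socket that libpq owns.

```python
import socket

def enable_keepalive(sock, idle=60, interval=10, count=5):
    """Hypothetical helper: turn on TCP keepalive and tune the probes."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # The TCP_KEEP* constants are platform specific (Linux has them),
    # so each one is applied only if this Python build exposes it.
    for name, value in (("TCP_KEEPIDLE", idle),
                        ("TCP_KEEPINTVL", interval),
                        ("TCP_KEEPCNT", count)):
        option = getattr(socket, name, None)
        if option is not None:
            sock.setsockopt(socket.IPPROTO_TCP, option, value)
    return sock

# No connection is needed just to set the options.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
enable_keepalive(s, idle=30, interval=5, count=3)
keepalive_on = s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
s.close()
```

With settings like these the kernel should give up on a dead peer after roughly idle + interval * count seconds instead of the platform default.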

Hmm, even when I connect, then bring down the network interface and
only after that call runQuery, nothing is ever called back
(well, I haven't waited more than half an hour per try so far).

Good point about keepalive, but does it even work for async sockets,
where you are not actively reading, that is, where nobody knows you
want to receive any data? If that worked, that would be the nicest
fix. For the not-so-nice fix, read on :-)

I've now started to do it the way Daniele and you suggested ("just
close it from the client"), so I modified the Connection to start a
timer which will fire if I don't defuse it early enough (and modified
ConnectionPool to check connections periodically and reconnect).

After I receive a response, I defuse the timer. If not, the timer
callback will be run. It will call the errback - which will call
connection.close().

As far as noticing the "disconnect" (well, potential disconnect) goes,
this works perfectly.
However, doing a connection.close() then doesn't seem to help much,
still investigating why... getting the following:

	  File "/usr/lib/python2.6/site-packages/twisted/internet/selectreactor.py", line 104, in doSelect
	    [], timeout)
	exceptions.ValueError: file descriptor cannot be a negative integer (-1)

So it seems the FD of the closed connection to postgres is still
registered with the Twisted reactor?
Maybe I am missing some calls to self.reactor.removeReader or
removeWriter. Do those belong in Connection.close()?
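The same ValueError can be reproduced outside Twisted, which fits the theory that something whose fileno() is already -1 is still being handed to select(). This is just a standalone sketch with a made-up function name; it does not prove that this is what happens inside txpostgres.

```python
import select
import socket

def select_error_for_closed_socket():
    """Return the message select() raises for an already-closed socket."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.close()  # from here on sock.fileno() reports -1
    try:
        select.select([sock], [], [], 0)
    except ValueError as exc:
        return str(exc)
    return None

message = select_error_for_closed_socket()
```

If that is indeed the cause, the descriptor would have to be removed from the reactor (removeReader/removeWriter) before the underlying connection is closed, not after.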

If I try to reconnect periodically, can I use the same txpostgres
Connection instance and just call connect() again?

> Another option is implementing a timeout with a callLater. The problem there is that it requires additional code

Yeah, did that now, see the end of this post for the code...

Since I'm trying to make the pg client more resilient against our
flaky network, what I want to do is just close the socket to the
server on timeout. What I don't want to do is send a cancellation
request over the broken socket, telling the server that the
connection is broken and asking it to please cancel the query :-)
(I hope the PostgreSQL server will notice soon enough when the client
doesn't answer)

> and txpostgres does not support query cancellation (yet, it's on the roadmap).

Yeah, but when you say "cancellation" do you mean "sending a -
cancellation - request to the server via the non-working connection"?
:)

Well, I'll be reading a bit more of the twisted reactor code now, I guess :-)

Cheers,
    Danny

The file "postgresto.py" which is all that extra code needed for
client-side impatience follows. For "Connection" it's mostly your code
with a few minimal changes: some connectTimeouter() calls sprinkled
in. For "ConnectionPool" it's a periodically-checking health checker
and reconnecter too now - please note that this is work in progress
and in no way stable yet:

#!/usr/bin/env python2

# Postgres with timeouts.

import sys
from txpostgres import txpostgres
from twisted.internet import interfaces, reactor, defer
from twisted.python import log

class Connection(txpostgres.Connection):
    def connectTimeouter(self, d, timeout):
        """ connect a timeouter to a deferred """
        delayedCall = reactor.callLater(timeout, self.handleTimeout, d)
        d.addBoth(self.cancelTimeout, delayedCall)
        return(d)
    def _runQuery(self, *args, **kwargs):
        c = self.cursor()
        # pop "timeout" so it is not passed on to cursor.execute()
        timeout = kwargs.pop("timeout", None) or 10
        d = c.execute(*args, **kwargs)
        d = self.connectTimeouter(d, timeout)
        return d.addCallback(lambda c: c.fetchall())
    def _runOperation(self, *args, **kwargs):
        c = self.cursor()
        # pop "timeout" so it is not passed on to cursor.execute()
        timeout = kwargs.pop("timeout", None) or 10
        d = c.execute(*args, **kwargs)
        return self.connectTimeouter(d, timeout).addCallback(lambda _: None)
    def _runInteraction(self, interaction, *args, **kwargs):
        c = self.cursor()
        # pop "timeout" so it is not passed on to the interaction
        timeout = kwargs.pop("timeout", None) or 10
        d = c.execute("begin")
        # we assume that the interaction does something on the database
        # here, so if the interaction times out, take it as a database
        # timeout!
        self.connectTimeouter(d, timeout)
        def timedInteraction(cursor):
            # maybeDeferred takes the callable and its arguments separately
            i = defer.maybeDeferred(interaction, cursor, *args, **kwargs)
            return self.connectTimeouter(i, timeout)
        d.addCallback(timedInteraction)
        def commitAndPassthrough(ret, cursor):
            e = cursor.execute("commit")
            return e.addCallback(lambda _: ret)
        def rollbackAndPassthrough(f, cursor):
            # maybeDeferred in case cursor.execute raises a
            # synchronous exception
            e = defer.maybeDeferred(cursor.execute, "rollback")
            def just_panic(rf):
                log.err(rf)
                # RollbackFailed is not imported by name in this file,
                # so refer to it through the txpostgres module
                return defer.fail(txpostgres.RollbackFailed(self, f))
            # if rollback failed, panic
            e.addErrback(just_panic)
            # reraise the original failure afterwards
            return e.addCallback(lambda _: f)
        #self.connectTimeouter(d, timeout)
        d.addCallback(commitAndPassthrough, c)
        d.addErrback(rollbackAndPassthrough, c)
        return d
    def handleTimeout(self, d):
        """ handles the timeout since we DID time out """
        log.err("timed out")
        # close the connection (maybe it was a connection problem...)
        self.close()
    def cancelTimeout(self, arg, delayedCall):
        """ cancels the timeout since we DID NOT time out """
        #print >>sys.stderr, "not timed out, OK"
        if delayedCall.active():
            delayedCall.cancel()
        return(arg)
    def isConnected(self):
        return self.pollable() is not None

class ConnectionPool(txpostgres.ConnectionPool):
    connectionFactory = Connection
    """def connect(self, *args, **kwargs):
        result = txpostgres.ConnectionPool.connect(self, *args, **kwargs)
        return(result)
    """
    def __init__(self, *args, **kwargs):
        txpostgres.ConnectionPool.__init__(self, *args, **kwargs)
        self.reconnectionInterval = 10 # sec
        reactor.callLater(self.reconnectionInterval, self.reconnectIfNeeded)
        self.connectionAttempts = set()
    def reconnectIfNeeded(self):
        for connection in self.connections:
            # TODO don't try that too often...
            if (not connection.isConnected()
                    and connection not in self.connectionAttempts):
                log.msg("database connection was lost, trying again")
                self.connectionAttempts.add(connection)
                d = connection.connect(*self.connargs, **self.connkw)
                delayedCall = reactor.callLater(
                    self.reconnectionInterval, self.handleTimeout, d,
                    connection)
                d.addBoth(self.cancelTimeout, delayedCall, connection)
        reactor.callLater(self.reconnectionInterval, self.reconnectIfNeeded)
    def handleTimeout(self, d, connection):
        """ handles the timeout since we DID time out """
        log.err("reconnect timed out")
        try:
            self.connectionAttempts.remove(connection)
        except KeyError:
            pass
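        # errback() without an argument only works inside an except block
        d.errback(defer.TimeoutError("reconnect timed out"))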
    def cancelTimeout(self, arg, delayedCall, connection):
        """ cancels the timeout since we DID NOT time out """
        print >>sys.stderr, "reconnect DID NOT time out, OK"
        # does not necessarily mean that it worked.
        if delayedCall.active():
            delayedCall.cancel()
        try:
            self.connectionAttempts.remove(connection)
        except KeyError:
            pass
        return(arg)
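
One possible way to deal with the "TODO don't try that too often" in reconnectIfNeeded would be a capped exponential backoff instead of the fixed reconnectionInterval. This is only a sketch of the delay schedule: backoff_delays and its default values are hypothetical and nothing here is wired into the pool yet.

```python
import itertools

def backoff_delays(base=10.0, cap=300.0, factor=2.0):
    """Hypothetical helper: yield base, base*factor, ... never above cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)

# First five delays for a 10 second base capped at one minute.
first_five = list(itertools.islice(backoff_delays(10, 60, 2), 5))
```

Each connection would keep its own generator, take the next delay after a failed attempt, and start a fresh generator once a reconnect succeeds.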
