Re: LISTEN, select/poll question/clarification

From: Israel Brewster <israel(at)ravnalaska(dot)net>
To: Daniele Varrazzo <daniele(dot)varrazzo(at)gmail(dot)com>
Cc: "psycopg(at)postgresql(dot)org" <psycopg(at)postgresql(dot)org>
Subject: Re: LISTEN, select/poll question/clarification
Date: 2017-12-05 19:39:08
Message-ID: 3FBF1AFC-920F-40F8-AF65-975EE21642C9@ravnalaska.net
Lists: psycopg

On Dec 5, 2017, at 10:25 AM, Daniele Varrazzo <daniele(dot)varrazzo(at)gmail(dot)com> wrote:
>
> On Tue, Dec 5, 2017 at 6:31 PM, Israel Brewster <israel(at)ravnalaska(dot)net> wrote:
>> I am using the PostgreSQL PubSub feature with psycopg2 and gevent in the
>> following manner (dbconn is, of course, a psycopg2 connection object on
>> which LISTEN has been called):
>>
>> while True:
>>     if gevent.select.select([dbconn], [], []) != ([], [], []):
>>         dbconn.poll()
>>         gevent.spawn(process_result, dbconn.notifies)
>>
>> Which works fine. Now my understanding of how this all works is that when
>> dbconn.poll() is called, it should pull in *all* pending NOTIFYs and append
>> them to the dbconn.notifies object. So let's say I have NOTIFYs coming in at
>> a rate of 1 per second. I would think that if I added a "gevent.sleep(5)" to
>> the end of the above while loop, then each time through I should have 5
>> notifies in the dbconn.notifies list, since it has been 5 seconds since I
>> last checked. However, that doesn't appear to be the case - rather, no matter
>> how long a sleep I put in, I still only get *one* item in the notifies list,
>> making me think that I am missing data.
>
> Uhm, from the top of my very rusty familiarity with gevent, I think
> that gevent.select() is woken up as soon as the notify is received,
> and that exactly because you have gevent.sleep(5) that is saying "I
> don't have anything else to do for 5 seconds, so take a look if there
> is any other fd that needs attention".

Hmmm, I'm not so sure about that. I would think that select would only wake up if the thread was actually waiting on the select; that is, it would block at select (while allowing other greenlets to run) until the NOTIFY was received, then move on. Once it has moved on, for example when it gets to the sleep(), I wouldn't think it would even know about the select any more. More to the point, I would think the sleep would pause the *entire* greenlet, in this case the while True loop (or, rather, the function containing said loop, but since that is the entire function, same difference), for the specified time before waking up. I could certainly be wrong though, I'm no gevent expert :-)

>
> while True:
>     if gevent.select.select([dbconn], [], []) != ([], [], []):
>         dbconn.poll()
>         gevent.spawn(process_result, dbconn.notifies)
>>
>> Can someone explain why this is? Why am I not getting 5 at a time under that
>> scenario?
>>
>> In case someone was wondering, the reasoning behind adding the sleep() lies
>> in that process_result function. Due to the looping it contains, it can
>> process, say, 5 data points in one call much more efficiently than it can
>> process those same 5 data points in 5 calls of 1 data point each. So rather
>> than run it every time a data point comes in, I would like to let the data
>> "pile up" as it were for a short period of time before processing. I was
>> thinking I could easily accomplish this by simply waiting some appropriate
>> period of time between poll()ing the database for new NOTIFYs, but
>> apparently that's not working. The select() is there because the data isn't
>> coming in at a regular speed like in my example, but rather it could come in
>> much faster, or not at all for a while, depending on the time of day.
>
> At the most simple (and still very IIRC), you can just ignore select
> and poll every 5 seconds, and you should receive the batches.
>
> while True:
>     gevent.sleep(5)
>     dbconn.poll()
>     if dbconn.notifies:
>         gevent.spawn(process_result, dbconn.notifies)

Conceptually sound; however, the actual numbers I'm dealing with are somewhat faster (the sleep is more like 0.5 seconds, I just used these to make an easy example), so I like the idea of it not doing anything when no data is received (at night, for example, there will be little or no data for extended periods).
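
For reference, a minimal sketch of one pass of the sleep-then-poll loop quoted above. It assumes the batch should be copied out of `notifies` and the list emptied before handing it off, since psycopg2 appends to `connection.notifies` and leaves removal to the caller. `FakeConn`, `drain_notifies`, and `poll_once` are hypothetical names so the sketch runs without a database or gevent; in the real loop `time.sleep` would presumably be `gevent.sleep` and the handler call would be wrapped in `gevent.spawn`.

```python
import time


class FakeConn:
    """Hypothetical stand-in for a psycopg2 connection that has issued LISTEN."""

    def __init__(self, incoming):
        self._incoming = list(incoming)  # simulated notifications not yet polled
        self.notifies = []               # same attribute name psycopg2 uses

    def poll(self):
        # Simulates poll(): move everything pending into .notifies.
        self.notifies.extend(self._incoming)
        self._incoming.clear()


def drain_notifies(conn):
    """Return pending notifications as a new list and empty conn.notifies."""
    batch = list(conn.notifies)
    del conn.notifies[:]
    return batch


def poll_once(conn, handler, interval=0.5, sleep=time.sleep):
    """One pass of the loop: wait, poll, hand off a snapshot if non-empty."""
    sleep(interval)
    conn.poll()
    batch = drain_notifies(conn)
    if batch:
        handler(batch)
    return len(batch)
```

Handing the handler a snapshot rather than the live `notifies` list means a later poll() cannot change a batch that is still being processed, and nothing is processed twice.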

>
> A bit more complex, you can do something at application level, such as
> pushing your notifies in a queue as fast as they are received by
> select(), and on the other side of the queue having a consumer stuck
> on get(): as soon as it receives an object, the consumer can sleep for
> 5 seconds, then get_nowait() until the queue is empty to gather
> everything received while it was napping, and call process_result()
> with the whole set.

That seems to fit the bill nicely. Kinda seems like the same idea as what I was going for, but with a local queue rather than the "remote" select, so more predictable/controllable.
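
A rough sketch of the consumer side of that queue idea, as I understand it: block on get() for the first item, nap for the batching window, then get_nowait() until the queue is empty. It uses the standard-library `queue.Queue` so it runs on its own; `collect_batch` and its `window` and `sleep` parameters are made-up names for illustration. Under gevent I would expect `gevent.queue.Queue` and `gevent.sleep` to slot in the same way, but that is an assumption, not something tested here.

```python
import queue
import time


def collect_batch(q, window=0.5, sleep=time.sleep):
    """Block for the first item, wait `window` seconds, then drain the queue."""
    batch = [q.get()]   # blocks until at least one item has arrived
    sleep(window)       # let further items pile up while we nap
    while True:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            break       # queue is empty: the batch is complete
    return batch
```

The producer side would put() each notification on the queue as soon as select() and poll() deliver it, and the consumer would loop on `process_result(collect_batch(q))`. Nothing runs while the queue is empty, which covers the quiet night-time periods.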

Thanks!

>
>
> -- Daniele
