Re: Multithreaded queue in PgSQL

From: laser <laserlist(at)pgsqldb(dot)com>
To: PostgreSQL general <pgsql-general(at)postgresql(dot)org>
Subject: Re: Multithreaded queue in PgSQL
Date: 2008-06-10 14:19:12
Message-ID: 484E8D60.7010401@pgsqldb.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-general

Nikola Milutinovic wrote:
> Hi all.
>
> This may be trivial, but I cannot find good references for it. The
> problem is this:
>
> Suppose we have one table in PgSQL which is a job queue, each row
> represents one job with several status flags, IDs,... Several
> processes will attempt to access the queue and "take" their batch of
> jobs, the batch will have some parameterizable size. So, the simple
> idea is "select N lowest IDs that do not have a flag <in process> set
> and set the flag", "then proceed with whatever it is that should be done".
>
> Trouble is, with MVCC I don't see a way to prevent overlapping and
> race conditions. Oh, sure, if I issue select for update, it will lock
> rows, but, if I understand correctly, the change may not be
> instantaneous and atomic, so I might get transaction to roll back and
> then there is error handling that will lead to the uglies
> serialization I can think of. Let me clarify this, so somebody can
> tell me if I got it wrong.
>
> Imagine Queue table with 20 rows, ID: 1,...,20, status="new". Imagine
> 2 processes/threads (P1, P2) attempting to get 10 jobs each.How to do
> that?
>
> P1: UPDATE job_queue SET process_id=$1, status="in process" WHERE id IN (
> SELECT id FROM job_queue WHERE status="new" and id IN (
> SELECT id FROM job_queue WHERE status="new" ORDER BY id LIMIT
> 10 FOR UPDATE)
> )
> )
> P2: the same
> P1: SELECT * FROM job_queue WHERE process_id=$1 ....
> P2: SELECT * FROM job_queue WHERE process_id=$1 ....
>
> The reason for the 2 selects is that if 2 or more processes content
> for the same set of jobs, the first one will set the status. The
> second will, after P1 has released the rows get those rows, that are
> already taken. Of course, this will most likely return 0 rows for P2,
> since all 10 will be taken. If I leave out the LIMIT 10 in the inner
> select, I am effectively locking the entire table. Is that the way to go?
>
> LOCK TABLE job_queue EXCLUSIVE;
> UPDATE ...
> UNLOCK TABLE job_queue;
>
>
we recently use an almost pure SQL method to solve it somehow, but
simply apply a hash
algorithm to make "partition" inside a table to avoid conflit for
different "process_id",
the method is we build a simple xor() function to calculate XOR value of
job_queue table's ID,
like:

create or replace function xor255(int) returns int as $$
select (($1>>24)&255)#(($1>>16)&255)#(($1>>8)&255)#($1&255) ;
$$language sql immutable;

then create a functional index like:

create index idx_job_queue_xor on job_queue using btree (xor255(id));
--assume your id column is primary key of type integer/serial

then we use query like:

select url from crawljob where xor255(id) >=N and xor255(id)<M and
status = 'blahblah' order by blahblah;

to fetch jobs, here the number N and M is some math with your proces_id,
then we could sure no conflict
for different process_id.

This method simply use Xoring a integer to obtain a value between 0 and
255, thus we
could partition the whole id set into 255 part without conflict with
each other thus avoid
the race etc. problem with concurrently 256 consumer process at most,
and we surly could
change it into adapt for more process, depend on you need.

wish that helps.

-laser

In response to

Browse pgsql-general by date

  From Date Subject
Next Message Richard Broersma 2008-06-10 14:24:13 Strange ODBC error & statement in Server log
Previous Message Adrian Klaver 2008-06-10 14:16:43 Re: Re: Accessing other databases with DBLink when leaving user/password empty