Thread: Multithreaded queue in PgSQL

Multithreaded queue in PgSQL

From: Nikola Milutinovic
Hi all.

This may be trivial, but I cannot find good references for it. The problem is this:

Suppose we have one table in PgSQL which is a job queue, each row represents one job with several status flags, IDs,... Several processes will attempt to access the queue and "take" their batch of jobs, the batch will have some parameterizable size. So, the simple idea is "select N lowest IDs that do not have a flag <in process> set and set the flag", "then proceed with whatever it is that should be done".

Trouble is, with MVCC I don't see a way to prevent overlapping and race conditions. Oh, sure, if I issue select for update, it will lock rows, but, if I understand correctly, the change may not be instantaneous and atomic, so I might get the transaction to roll back, and then there is error handling that will lead to the ugliest serialization I can think of. Let me clarify this, so somebody can tell me if I got it wrong.

Imagine a Queue table with 20 rows, ID: 1,...,20, status='new'. Imagine 2 processes/threads (P1, P2) attempting to get 10 jobs each. How to do that?

P1: UPDATE job_queue SET process_id = $1, status = 'in process' WHERE id IN (
        SELECT id FROM job_queue WHERE status = 'new' AND id IN (
            SELECT id FROM job_queue WHERE status = 'new' ORDER BY id LIMIT 10 FOR UPDATE
        )
    );
P2: the same
P1: SELECT * FROM job_queue WHERE process_id=$1 ....
P2: SELECT * FROM job_queue WHERE process_id=$1 ....

The reason for the 2 selects is that if 2 or more processes contend for the same set of jobs, the first one will set the status. The second will, after P1 has released the rows, get those rows, which are already taken. Of course, this will most likely return 0 rows for P2, since all 10 will be taken. If I leave out the LIMIT 10 in the inner select, I am effectively locking the entire table. Is that the way to go?

BEGIN;
LOCK TABLE job_queue IN EXCLUSIVE MODE;
UPDATE ...;
COMMIT;  -- there is no UNLOCK TABLE; locks are released at commit

Nix.

Re: Multithreaded queue in PgSQL

From: Dimitri Fontaine
On Tuesday 10 June 2008, Nikola Milutinovic wrote:
> Suppose we have one table in PgSQL which is a job queue, each row
> represents one job with several status flags, IDs,... Several processes
> will attempt to access the queue and "take" their batch of jobs, the batch
> will have some parameterizable size. So, the simple idea is "select N
> lowest IDs that do not have a flag <in process> set and set the flag",
> "then proceed with whatever it is that should be done".

You may find that the PGQ component of skytools is what you want:
  http://pgfoundry.org/projects/skytools
  http://skytools.projects.postgresql.org/doc/
  http://skytools.projects.postgresql.org/doc/pgq-sql.html

And there's even a nice and recent presentation paper about it:
  http://www.pgcon.org/2008/schedule/events/79.en.html

And a less recent but still useful blog entry on the subject:
  http://kaiv.wordpress.com/2007/10/19/skytools-database-scripting-framework-pgq/

Regards,
--
dim


Re: Multithreaded queue in PgSQL

From: Nikola Milutinovic
> You may find that the PGQ component of skytools is what you want:
>   http://pgfoundry.org/projects/skytools
>   http://skytools.projects.postgresql.org/doc/
>   http://skytools.projects.postgresql.org/doc/pgq-sql.html
>

Thanks, we will look into it. Still, I am surprised to learn that SQL as such cannot handle it. I do realize that the question is not trivial. Would setting transaction isolation level to SERIALIZABLE help in any way? Would locking of the entire table help in any way?

Nix.

Re: Multithreaded queue in PgSQL

From: laser
Nikola Milutinovic wrote:
> Hi all.
>
> This may be trivial, but I cannot find good references for it. The
> problem is this:
>
> Suppose we have one table in PgSQL which is a job queue, each row
> represents one job with several status flags, IDs,... Several
> processes will attempt to access the queue and "take" their batch of
> jobs, the batch will have some parameterizable size. So, the simple
> idea is "select N lowest IDs that do not have a flag <in process> set
> and set the flag", "then proceed with whatever it is that should be done".
>
> Trouble is, with MVCC I don't see a way to prevent overlapping and
> race conditions. Oh, sure, if I issue select for update, it will lock
> rows, but, if I understand correctly, the change may not be
> instantaneous and atomic, so I might get the transaction to roll back,
> and then there is error handling that will lead to the ugliest
> serialization I can think of. Let me clarify this, so somebody can
> tell me if I got it wrong.
>
> Imagine a Queue table with 20 rows, ID: 1,...,20, status='new'. Imagine
> 2 processes/threads (P1, P2) attempting to get 10 jobs each. How to do
> that?
>
> P1: UPDATE job_queue SET process_id = $1, status = 'in process' WHERE id IN (
>         SELECT id FROM job_queue WHERE status = 'new' AND id IN (
>             SELECT id FROM job_queue WHERE status = 'new'
>             ORDER BY id LIMIT 10 FOR UPDATE
>         )
>     );
> P2: the same
> P1: SELECT * FROM job_queue WHERE process_id=$1 ....
> P2: SELECT * FROM job_queue WHERE process_id=$1 ....
>
> The reason for the 2 selects is that if 2 or more processes contend
> for the same set of jobs, the first one will set the status. The
> second will, after P1 has released the rows, get those rows, which are
> already taken. Of course, this will most likely return 0 rows for P2,
> since all 10 will be taken. If I leave out the LIMIT 10 in the inner
> select, I am effectively locking the entire table. Is that the way to go?
>
> BEGIN;
> LOCK TABLE job_queue IN EXCLUSIVE MODE;
> UPDATE ...;
> COMMIT;  -- there is no UNLOCK TABLE; locks are released at commit
>
>
We recently used an almost pure SQL method to solve this: simply apply a
hash algorithm to "partition" the table, so that different "process_id"s
cannot conflict. The method is to build a simple xor() function that
calculates the XOR value of the job_queue table's ID, like:

create or replace function xor255(int) returns int as $$
  -- XOR the four bytes of a 32-bit integer together, giving a value in 0..255
  select (($1>>24)&255) # (($1>>16)&255) # (($1>>8)&255) # ($1&255);
$$ language sql immutable;

then create a functional index like:

create index idx_job_queue_xor on job_queue using btree (xor255(id));
-- assumes your id column is the primary key, of type integer/serial

then we use a query like:

select url from crawljob
 where xor255(id) >= N and xor255(id) < M
   and status = 'blahblah'
 order by blahblah;

to fetch jobs. Here the numbers N and M are computed from your process_id,
so we can be sure there is no conflict between different process_ids.

This method simply XORs the bytes of an integer together to obtain a
value between 0 and 255, so we can partition the whole id set into 256
parts that do not conflict with each other. That avoids the race problem
for up to 256 concurrent consumer processes, and it can be adapted to
support more processes, depending on your needs.
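For example, here is a minimal sketch of that math, assuming 4 consumer
processes with process_id 0..3, each owning a 64-wide slice of the 0..255
hash space (the 'new' status value is a placeholder, like 'blahblah' above):

-- worker slice: N = process_id * (256 / workers), M = N + (256 / workers);
-- with 4 workers the slices are [0,64), [64,128), [128,192), [192,256)
select url from crawljob
 where xor255(id) >= $1 * 64        -- N, with $1 = process_id in 0..3
   and xor255(id) <  ($1 + 1) * 64  -- M
   and status = 'new'
 order by id;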

Hope that helps.

-laser



Re: Multithreaded queue in PgSQL

From: Scott Marlowe
On Tue, Jun 10, 2008 at 4:40 AM, Nikola Milutinovic <alokin1@yahoo.com> wrote:
> Hi all.
>
> This may be trivial, but I cannot find good references for it. The problem
> is this:
>
> Suppose we have one table in PgSQL which is a job queue, each row represents
> one job with several status flags, IDs,... Several processes will attempt to
> access the queue and "take" their batch of jobs, the batch will have some
> parameterizable size. So, the simple idea is "select N lowest IDs that do
> not have a flag <in process> set and set the flag", "then proceed with
> whatever it is that should be done".

Do the jobs HAVE to be assigned sequentially in groups?  I.e. is it
only allowable to assign jobs 1-10 to one thread, then 11 through 20
to the next?  Or could you assign 1,2,3,6,7,8,9,10 to one, and
4,5,11,12... to another?

If the latter is acceptable, assign them with a separate sequence, along
the lines of the sketch below.
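A minimal sketch of that idea, under the assumption that job ids are dense
and every job must eventually be claimed (the sequence name job_fetch_seq
is made up for illustration):

-- a dedicated sequence hands out the next job id to claim;
-- nextval() is atomic across sessions, so no two workers get the same id
create sequence job_fetch_seq;

update job_queue
   set process_id = $1, status = 'in process'
 where id = nextval('job_fetch_seq')
returning *;

If an id falls in a gap (no matching row), the update simply returns
nothing and the worker calls it again.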

Re: Multithreaded queue in PgSQL

From: valgog
On Jun 10, 1:58 pm, alok...@yahoo.com (Nikola Milutinovic) wrote:
> > You may find that the PGQ component of skytools is what you want:
> >  http://pgfoundry.org/projects/skytools
> >  http://skytools.projects.postgresql.org/doc/
> >  http://skytools.projects.postgresql.org/doc/pgq-sql.html
>
> Thanks, we will look into it. Still, I am surprised to learn that SQL as such cannot handle it. I do realize that the
> question is not trivial. Would setting transaction isolation level to SERIALIZABLE help in any way? Would locking of the
> entire table help in any way?
>
> Nix.

The easiest solution that I have found so far is to use an advisory
lock, so that every thread waits until the other has released the
lock.
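
For example (a minimal sketch; the lock key 42 is an arbitrary
application-chosen number):

select pg_advisory_lock(42);   -- blocks until any other session holding key 42 releases it
-- ... claim a batch of jobs here ...
select pg_advisory_unlock(42);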

-- Valentine

Re: Multithreaded queue in PgSQL

From: Stevo Slavić
Hello all,

I initially brought the question to Nix, so I'll try to clarify the situation.

The whole point is to have multiple services accessing the same table and dividing the work, so locking where a service waits for a lock to be released is out of the question.

I want to deploy the same Java Spring web application to multiple servers, all present in the same environment and accessing the same database. Thus, the same business services will be running on each server. One of them is a Quartz scheduling service which sends newsletters to newsletter recipients. Hibernate is used for persistence, with Postgres as the RDBMS. It is not important for this discussion how newsletters are created, but they end up in the database in the following tables:

newsletter (newsletter_id, newsletter_content, newsletter_status, newsletter_status_date)
newsletter_recipient (newsletter_id, newsletter_recipient_email, newsletter_recipient_status, newsletter_recipient_status_date)


newsletter and newsletter_recipient stand in a one-to-many relationship, with newsletter_recipient.newsletter_id being a FK to newsletter.newsletter_id. The PK of newsletter is newsletter_id; the PK of newsletter_recipient is (newsletter_id, newsletter_recipient_email).

The idea is that each service checks for newsletter recipients which either have status NOT_SENT, or have status PROCESSING with newsletter_recipient_status_date more than 1h old (indicating that sending was attempted, but the service didn't complete sending, or at least didn't complete updating the status to SENT, so we need to retry). Each service should take, e.g., up to 10 such rows and lock them; other services should see these rows as locked and not wait for them to become unlocked, but should try getting the next 10 rows, until either such a batch has been acquired or there are no more such rows.

I'm trying to make the implementation more generic, not using Postgres-specific SQL, and through Hibernate and Spring configuration make services acquire a lock on a batch of rows. When trying to acquire the lock, an exception should be thrown if the rows are already locked by a different service, and through that exception I intend to signal to the other services that they should try to acquire a lock on the next batch of rows. We'll see how that goes.
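
For what it's worth, the row-level SQL such a configuration would have to
emit is a lock that fails fast instead of waiting. A minimal sketch of what
that looks like on Postgres (FOR UPDATE NOWAIT raises an error immediately
if another service already holds a lock on any of the selected rows):

select * from newsletter_recipient
 where newsletter_recipient_status = 'NOT_SENT'
 order by newsletter_recipient_status_date
 limit 10
 for update nowait;  -- errors at once instead of blocking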

Regards,
Stevo.


Re: Multithreaded queue in PgSQL

From: Csaba Nagy
We also have such a queue here, and our solution is an algorithm like
this:
 1. get the next processor_count * 2 queue ids which are not marked as taken;
 2. choose randomly one of these ids;
 3. lock for update with nowait;
 4. if locking succeeds:
     4.1. check the item again, as it could have been processed in the meantime - if not available, go to 5.;
     4.2. update the DB row to mark the id as taken, and process the item;
 5. if there are more ids to try: loop to 2.;
 6. sleep a small random interval, and loop to 1.

This algorithm should have a small enough collision rate on a busy queue,
due to the randomly chosen ids and the random sleep (it will have a high
collision rate on an almost empty queue, but then you don't really care),
while still allowing all processors to access all entries.
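
A minimal sketch of steps 3. and 4., assuming the randomly chosen id is
bound as $1 (handling of the error raised by NOWAIT is left to the caller):

begin;
-- step 3: raises an error immediately if another processor has the row locked
select * from job_queue where id = $1 for update nowait;
-- step 4.1: re-check here that the item is still not marked as taken
-- step 4.2: mark it taken, then process it
update job_queue set status = 'taken' where id = $1;
commit;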

Cheers,
Csaba.



Re: Multithreaded queue in PgSQL

From: Klint Gore
Stevo Slavić wrote:
> I'm trying to make the implementation more generic, not using
> Postgres-specific SQL, and through Hibernate and Spring configuration
> make services acquire a lock on a batch of rows. When trying to acquire
> the lock, an exception should be thrown if the rows are already locked
> by a different service, and through that exception I intend to signal
> to the other services that they should try to acquire a lock on the
> next batch of rows. We'll see how that goes.
It's Postgres-specific, but a serializable transaction and
update/returning fit how you want it to act.

begin transaction isolation level serializable;
update newsletter_recipient
   set newsletter_recipient_status = 'PROCESSING',
       newsletter_recipient_status_date = now()
 where (newsletter_id, newsletter_recipient_email) in
       (select newsletter_id, newsletter_recipient_email
          from newsletter_recipient
         where newsletter_recipient_status = 'NOT_SENT'
            or (newsletter_recipient_status = 'PROCESSING'
                and newsletter_recipient_status_date < now() - interval '1 hour')
         limit 10)
returning *;
commit;

The update marks the rows as processing. The returning gives the
selected ones back to the application without having to issue a select
and an update.  The serializable transaction throws an error in other
threads that try to claim the same rows.  You could add an offset to the
limit to try to select different rows.

klint.

--
Klint Gore
Database Manager
Sheep CRC
A.G.B.U.
University of New England
Armidale NSW 2350

Ph: 02 6773 3789
Fax: 02 6773 3266
EMail: kgore4@une.edu.au


Re: Multithreaded queue in PgSQL

From: valgog
> The whole point is to have multiple services accessing the same table
> and dividing the work, so locking where a service waits for a lock to
> be released is out of the question.
>

We are doing the same (newsletters), and there is no problem locking
the whole table for a short time with an advisory lock while the Java
id-fetching worker holds it (this does not lock the table for reading
or writing; it only locks its Java worker brothers that are using the
same advisory lock). It fetches, let's say, 50 ids of records marked
as CREATED and changes their status to PROCESSING. Then several
workers get the ids, fetch the needed data from the table
independently, and process and update them in parallel. We have 3
Java machines getting ids for 10 parallel workers each, and
everything works just fine.

Getting the IDs is usually much, much faster than the real processing.
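
A minimal sketch of that id-fetching step (the lock key 1234 and the
job_queue table name are arbitrary; the CREATED/PROCESSING statuses follow
the description above):

begin;
select pg_advisory_lock(1234);   -- serializes only the fetching workers
update job_queue
   set status = 'PROCESSING'
 where id in (select id from job_queue
               where status = 'CREATED' order by id limit 50)
returning id;                    -- hand these ids to the parallel workers
select pg_advisory_unlock(1234);
commit;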

Re: Multithreaded queue in PgSQL

From: Jeff Peck
> We are doing the same (newsletters), and there is no problem locking
> the whole table for a short time with an advisory lock while the Java
> id-fetching worker holds it (this does not lock the table for reading
> or writing; it only locks its Java worker brothers that are using the
> same advisory lock). It fetches, let's say, 50 ids of records marked
> as CREATED and changes their status to PROCESSING. Then several
> workers get the ids, fetch the needed data from the table
> independently, and process and update them in parallel. We have 3
> Java machines getting ids for 10 parallel workers each, and
> everything works just fine.
>
> Getting the IDs is usually much, much faster than the real processing.


Weird this thread came back up today. I actually implemented a small
example this morning in Python using Klint Gore's suggestion and was
very happy with the results. I have attached 4 files for anyone
interested in looking at a really simple example. You'll need
Python+psycopg2 driver for this, but anyone should be able to follow
this easily:
    db.py      - Edit this file with your db settings.
    client.py - This would be your worker process
    insert.py - This is used to enter some jobs into the jobs table.
    reset.py  - This is used to reset each job back to 'REQUESTED' status.

This example assumes that you have a jobs table like below:
create table jobs(id serial primary key, status text not null)
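
For reference, here is a minimal sketch of the claim query each client
could run, following Klint Gore's update/returning approach (the
'PROCESSING' status and the batch size of 1 are assumptions; the attached
client.py may differ):

begin transaction isolation level serializable;
update jobs
   set status = 'PROCESSING'
 where id in (select id from jobs where status = 'REQUESTED' limit 1)
returning id;
commit;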

First edit db.py with your own db settings.
Fire up as many copies of client.py as you'd like.
Now enter some jobs into the jobs table. Running insert.py will enter 100
jobs for you.
Now watch as your clients process the jobs.

Once all of the jobs have finished processing, you can run reset.py to
mark all of the jobs back to 'REQUESTED' status so that the clients
start processing all over again.

I hope it is OK to attach examples! Just seems like this question
comes up often.


Jeff Peck
