Thread: Multithreaded queue in PgSQL
Hi all.
This may be trivial, but I cannot find good references for it. The problem is this:
Suppose we have one table in PgSQL which is a job queue, each row represents one job with several status flags, IDs,... Several processes will attempt to access the queue and "take" their batch of jobs, the batch will have some parameterizable size. So, the simple idea is "select N lowest IDs that do not have a flag <in process> set and set the flag", "then proceed with whatever it is that should be done".
Trouble is, with MVCC I don't see a way to prevent overlap and race conditions. Sure, if I issue SELECT FOR UPDATE it will lock rows, but if I understand correctly the change may not be instantaneous and atomic, so a transaction might roll back, and the resulting error handling would lead to the ugliest serialization I can think of. Let me clarify, so somebody can tell me if I got it wrong.
Imagine a queue table with 20 rows, id = 1,...,20, status = 'new', and two processes/threads (P1, P2) each attempting to get 10 jobs. How to do that?
P1: UPDATE job_queue SET process_id = $1, status = 'in process' WHERE id IN (
        SELECT id FROM job_queue WHERE status = 'new' AND id IN (
            SELECT id FROM job_queue WHERE status = 'new' ORDER BY id LIMIT 10 FOR UPDATE
        )
    );
P2: the same
P1: SELECT * FROM job_queue WHERE process_id=$1 ....
P2: SELECT * FROM job_queue WHERE process_id=$1 ....
The reason for the two SELECTs is that if two or more processes contend for the same set of jobs, the first one will set the status. The second will, after P1 has released the rows, get rows that are already taken. Of course, this will most likely return 0 rows for P2, since all 10 will be taken. If I leave out the LIMIT 10 in the inner SELECT, I am effectively locking the entire table. Is that the way to go?
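To make the race concrete, here is a small Python sketch (an in-memory analogue, not Postgres) that replays the interleaving I'm worried about with no locking at all: both workers take their snapshot of 'new' rows before either one writes, so they claim the same ids.

```python
# In-memory stand-in for the job_queue table; a deterministic replay of
# the naive "select then update" flow, with no locking at all.
table = {i: {"status": "new", "owner": None} for i in range(1, 21)}

def select_new(limit=10):
    # the inner SELECT: lowest ids still marked 'new'
    return sorted(i for i, row in table.items() if row["status"] == "new")[:limit]

def mark_in_process(ids, owner):
    # the UPDATE: set the flag and record who took the job
    for i in ids:
        table[i]["status"] = "in process"
        table[i]["owner"] = owner

p1_batch = select_new()          # P1 reads its snapshot...
p2_batch = select_new()          # ...and P2 reads the SAME snapshot
mark_in_process(p1_batch, "P1")
mark_in_process(p2_batch, "P2")  # silently overwrites P1's claim

print(p1_batch == p2_batch)      # True: both "took" ids 1..10
print(table[1]["owner"])         # P2 won; P1 also thinks it owns job 1
```

This is exactly the interleaving the FOR UPDATE in the statement above is meant to block: the second snapshot must wait until the first transaction commits.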
BEGIN;
LOCK TABLE job_queue IN EXCLUSIVE MODE;
UPDATE ...;
COMMIT;  -- there is no UNLOCK TABLE; the lock is released at commit
Nix.
On Tuesday 10 June 2008, Nikola Milutinovic wrote:
> Suppose we have one table in PgSQL which is a job queue, each row
> represents one job with several status flags, IDs,... Several processes
> will attempt to access the queue and "take" their batch of jobs, the batch
> will have some parameterizable size. So, the simple idea is "select N
> lowest IDs that do not have a flag <in process> set and set the flag",
> "then proceed with whatever it is that should be done".

You may find that the PGQ component of skytools is what you want:

http://pgfoundry.org/projects/skytools
http://skytools.projects.postgresql.org/doc/
http://skytools.projects.postgresql.org/doc/pgq-sql.html

And there's even a nice and recent presentation paper about it:
http://www.pgcon.org/2008/schedule/events/79.en.html

And a less recent but still useful blog entry on the subject:
http://kaiv.wordpress.com/2007/10/19/skytools-database-scripting-framework-pgq/

Regards,
--
dim
> You may find that the PGQ component of skytools is what you want:
> http://pgfoundry.org/projects/skytools
> http://skytools.projects.postgresql.org/doc/
> http://skytools.projects.postgresql.org/doc/pgq-sql.html
>
Thanks, we will look into it. Still, I am surprised to learn that SQL as such cannot handle it. I do realize that the question is not trivial. Would setting transaction isolation level to SERIALIZABLE help in any way? Would locking of the entire table help in any way?
Nix.
Nikola Milutinovic wrote:
> Suppose we have one table in PgSQL which is a job queue, each row
> represents one job with several status flags, IDs,... [...] the simple
> idea is "select N lowest IDs that do not have a flag <in process> set
> and set the flag", "then proceed with whatever it is that should be done".

We recently used an almost pure SQL method to solve this: apply a hash algorithm to "partition" the table, so that different process_ids cannot conflict. First we build a simple xor function over the job_queue table's id:

create or replace function xor255(int) returns int as $$
    select (($1>>24)&255)#(($1>>16)&255)#(($1>>8)&255)#($1&255);
$$ language sql immutable;

Then we create a functional index on it:

-- assumes your id column is a primary key of type integer/serial
create index idx_job_queue_xor on job_queue using btree (xor255(id));

Then we fetch jobs with a query like:

select url from crawljob
where xor255(id) >= N and xor255(id) < M and status = 'blahblah'
order by blahblah;

where the numbers N and M are derived from your process_id, so we can be sure there is no conflict between different process_ids. XOR-ing the bytes of an integer yields a value between 0 and 255, so the whole id set is partitioned into 256 non-overlapping parts, avoiding the race with up to 256 concurrent consumer processes; the scheme can be adapted for more, depending on your needs.

Wish that helps.

-laser
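The byte-folding hash is easy to check outside the database. This is a Python transcription of laser's xor255(); the 4-worker split and the 10,000-id range are just assumptions for the demo:

```python
def xor255(n: int) -> int:
    # XOR the four bytes of a 32-bit integer together -> a value in 0..255,
    # the same arithmetic as the SQL xor255() function.
    return ((n >> 24) & 255) ^ ((n >> 16) & 255) ^ ((n >> 8) & 255) ^ (n & 255)

def buckets_for(worker: int, workers: int = 4) -> range:
    # Give each worker a disjoint slice [N, M) of the 256 hash buckets;
    # N and M here play the role of the N and M in laser's query.
    per_worker = 256 // workers
    return range(worker * per_worker, (worker + 1) * per_worker)

ids = range(1, 10001)
counts = [sum(1 for i in ids if xor255(i) in buckets_for(w)) for w in range(4)]
print(sum(counts))   # 10000: every id lands in exactly one worker's slice
```

Because the slices partition 0..255, no id can ever be claimed by two workers, which is the whole point of the scheme.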
On Tue, Jun 10, 2008 at 4:40 AM, Nikola Milutinovic <alokin1@yahoo.com> wrote:
> Suppose we have one table in PgSQL which is a job queue, each row represents
> one job with several status flags, IDs,... Several processes will attempt to
> access the queue and "take" their batch of jobs, the batch will have some
> parameterizable size. So, the simple idea is "select N lowest IDs that do
> not have a flag <in process> set and set the flag", "then proceed with
> whatever it is that should be done".

Do the jobs HAVE to be assigned sequentially in groups? I.e. is it only allowable to assign jobs 1-10 to one thread, then 11 through 18 to the next? Or could you assign 1,2,3,6,7,8,9,10 to one, and 4,5,11,12... to another? If so, assign them with a separate sequence.
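The sequence idea can be sketched in Python: a shared counter handed out under a mutex behaves like nextval() on a Postgres sequence, so two workers can interleave freely but never receive the same job id. The thread setup and batch size are invented for the demo:

```python
import itertools
import threading

counter = itertools.count(1)        # stands in for a Postgres sequence
counter_lock = threading.Lock()     # nextval() is atomic; a lock emulates that
claims = {"P1": [], "P2": []}

def claim_jobs(worker: str, n: int = 10) -> None:
    for _ in range(n):
        with counter_lock:          # each claim is a single atomic nextval()
            claims[worker].append(next(counter))

threads = [threading.Thread(target=claim_jobs, args=(w,)) for w in claims]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The two claim sets may interleave (1,2,3,6,... vs 4,5,...) but never overlap.
print(set(claims["P1"]) & set(claims["P2"]))   # set()
```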
On Jun 10, 1:58 pm, alok...@yahoo.com (Nikola Milutinovic) wrote:
> > You may find that the PGQ component of skytools is what you want:
> > http://pgfoundry.org/projects/skytools
> > http://skytools.projects.postgresql.org/doc/
> > http://skytools.projects.postgresql.org/doc/pgq-sql.html
>
> Thanks, we will look into it. Still, I am surprised to learn that SQL as
> such cannot handle it. I do realize that the question is not trivial.
> Would setting transaction isolation level to SERIALIZABLE help in any way?
> Would locking of the entire table help in any way?
>
> Nix.

The easiest solution that I have found so far is to use an advisory lock, so that every thread waits until the other has released the lock.

--
Valentine
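The advisory-lock pattern, sketched with an in-memory analogue: a single process-wide mutex plays the role of pg_advisory_lock(key), serializing only the short id-fetching step while the slow processing runs in parallel. The batch size and job count are made up for the demo:

```python
import threading

queue_lock = threading.Lock()        # analogue of pg_advisory_lock(some_key)
pending = list(range(1, 101))        # 100 queued job ids
taken = {}                           # job id -> worker that claimed it

def fetch_batch(worker: str, size: int = 10) -> list:
    # Every worker waits here, but only for the duration of the id grab,
    # not for the (much slower) job processing itself.
    with queue_lock:
        batch = pending[:size]
        del pending[:size]
    for job in batch:                # "processing" happens outside the lock
        taken[job] = worker
    return batch

def worker(name: str) -> None:
    while fetch_batch(name):         # stop once the queue is drained
        pass

threads = [threading.Thread(target=worker, args=(f"w{i}",)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(taken), len(pending))   # 100 0: every job claimed exactly once
```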
Hello all,
I initially brought the question to Nix, so I'll try to clarify the situation.
The whole point is to have multiple services accessing the same table and dividing the work, so locking with waiting for a lock to be released is out of the question.
I want to deploy the same Java Spring web application to multiple servers, all in the same environment and accessing the same database, so the same business services will run on each server. One of them is a Quartz scheduling service which sends newsletters to newsletter recipients. Hibernate is used for persistence, with Postgres as the RDBMS. How newsletters are created is not important for this discussion, but they end up in the database in the following tables:
newsletter (newsletter_id, newsletter_content, newsletter_status, newsletter_status_date)
newsletter_recipient (newsletter_id, newsletter_recipient_email, newsletter_recipient_status, newsletter_recipient_status_date)
newsletter and newsletter_recipient stand in a one-to-many relationship, with newsletter_recipient.newsletter_id being a FK to newsletter.newsletter_id. The PK of each relation is underlined.
The idea is that each service checks for newsletter recipients which either have status NOT_SENT, or have status PROCESSING with a newsletter_recipient_status_date more than 1h old (indicating that a send was attempted but the service didn't complete it, or at least didn't complete updating the status to SENT, so we need to retry). Each service should take, e.g., up to 10 such rows and lock them. Other services should see those rows as locked and, rather than waiting for them to become unlocked, should try to get the next 10 rows, until either such a batch has been acquired or there are no more such rows.
I'm trying to keep the implementation generic and avoid Postgres-specific SQL. Through Hibernate and Spring configuration, services should acquire a lock on a batch of rows; when the rows are already locked by a different service, an exception should be thrown, and through that exception I intend to signal to the service that it should try to acquire a lock on the next batch of rows. We'll see how that goes.
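The behaviour described above, try a batch and on a lock conflict skip to the next batch instead of waiting, can be sketched with per-batch try-locks in Python. A failed non-blocking acquire stands in for the "rows already locked" exception; the batch size and layout are assumptions for the demo:

```python
import threading

BATCH = 10
batches = [list(range(i, i + BATCH)) for i in range(1, 41, BATCH)]  # 4 batches
batch_locks = [threading.Lock() for _ in batches]

def acquire_batch():
    # Try each batch in turn; a lock already held by another service plays
    # the role of the exception Hibernate would raise on a locked row set.
    for idx, lock in enumerate(batch_locks):
        if lock.acquire(blocking=False):   # never waits, as required
            return idx                     # caller now owns batches[idx]
    return None                            # all batches busy right now

first = acquire_batch()    # one "service" takes batch 0
second = acquire_batch()   # a second service skips it and takes batch 1
print(first, second)       # 0 1

batch_locks[first].release()   # the first service finishes its batch
print(acquire_batch())         # 0: the freed batch is available again
```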
Regards,
Stevo.
We also have such a queue here, and our solution is an algorithm like this:

1. Get the next processor_count * 2 queue ids which are not marked as taken.
2. Choose randomly one of these ids.
3. Lock it FOR UPDATE with NOWAIT.
4. If locking succeeds:
   4.1. Check the item again, as it could have been processed in the meantime; if it is no longer available, go to 5.
   4.2. Update the DB row to mark the id as taken, and process the item.
5. If there are more ids to try, loop to 2.
6. Sleep a small random interval, and loop to 1.

This algorithm should have a small enough collision rate on a busy queue, due to the randomly chosen ids and the random sleep (it will have a high collision rate on an almost empty queue, but then you don't really care), while still allowing all processors to access all entries.

Cheers,
Csaba.
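A sketch of the loop above in Python, with per-row try-locks standing in for SELECT ... FOR UPDATE NOWAIT and an in-memory dict for the queue. The row count and worker count are invented for the demo, and the random sleep of step 6 is omitted to keep it fast:

```python
import random
import threading

rows = {i: "new" for i in range(1, 51)}                  # the queue table
row_locks = {i: threading.Lock() for i in rows}          # FOR UPDATE NOWAIT
processed = []
processed_lock = threading.Lock()
WORKERS = 4

def worker() -> None:
    while True:
        # step 1: grab candidate ids not yet marked taken
        candidates = [i for i, s in list(rows.items()) if s == "new"]
        if not candidates:
            return
        random.shuffle(candidates)                       # step 2: pick randomly
        for i in candidates[: WORKERS * 2]:
            if row_locks[i].acquire(blocking=False):     # step 3: NOWAIT
                try:
                    if rows[i] == "new":                 # step 4.1: re-check
                        rows[i] = "taken"                # step 4.2: mark...
                        with processed_lock:
                            processed.append(i)          # ...and "process" it
                finally:
                    row_locks[i].release()
        # step 6's random sleep is omitted; the loop just retries

threads = [threading.Thread(target=worker) for _ in range(WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(processed) == 50, len(set(processed)) == 50)   # True True
```

The re-check in step 4.1 is what makes the scheme safe: losing a lock race is harmless, because the loser simply sees the row is no longer 'new' and moves on.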
Stevo Slavić wrote:
> I'm trying to make implementation more generic, not to use Postgres
> specific SQL, and through Hibernate and Spring configuration make
> services acquire lock on batch of rows, when trying to acquire lock on
> batch of rows an exception should be thrown if rows are already locked
> by a different service, and through that exception I intend to signal
> to other services that they should try to handle and acquire lock on
> next batch of rows. Will see how that goes.

It's Postgres-specific, but a serializable transaction and UPDATE ... RETURNING fits how you want it to act:

begin transaction isolation level serializable;
update newsletter_recipients
set ....
where (...) in (select ... from newsletter_recipients
                where not_sent or crashed limit 10)
returning *;
commit;

The UPDATE marks the rows as processing. The RETURNING gives the selected rows back to the application without having to issue a SELECT and an UPDATE. The serializable transaction throws an error in other threads that try to claim the same rows. You could add an OFFSET to the LIMIT to try to select different rows.

klint.
--
Klint Gore
Database Manager Sheep CRC A.G.B.U.
University of New England
Armidale NSW 2350
Ph: 02 6773 3789  Fax: 02 6773 3266
EMail: kgore4@une.edu.au
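The single-statement claim can be tried outside Postgres too. This sketch uses Python's bundled sqlite3, whose writers are serialized, so the UPDATE itself is the atomic claim step; it fetches the claimed rows with a follow-up SELECT in the same transaction instead of RETURNING, which older SQLite versions lack. The table and column names are invented for the demo:

```python
import sqlite3

def claim_batch(conn: sqlite3.Connection, worker: str, limit: int = 10):
    # One transaction: atomically mark up to `limit` unclaimed jobs as ours,
    # then read back which ids we got (UPDATE ... RETURNING in two steps).
    with conn:
        conn.execute(
            "UPDATE jobs SET status = 'PROCESSING', owner = ? "
            "WHERE id IN (SELECT id FROM jobs WHERE status = 'REQUESTED' "
            "ORDER BY id LIMIT ?)",
            (worker, limit),
        )
        rows = conn.execute(
            "SELECT id FROM jobs WHERE owner = ? ORDER BY id", (worker,)
        ).fetchall()
    return [r[0] for r in rows]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, status TEXT, owner TEXT)")
for _ in range(25):
    conn.execute("INSERT INTO jobs (status) VALUES ('REQUESTED')")
conn.commit()

b1 = claim_batch(conn, "w1")
b2 = claim_batch(conn, "w2")
b3 = claim_batch(conn, "w3")
print(len(b1), len(b2), len(b3))   # 10 10 5
print(set(b1) & set(b2))           # set(): the batches never overlap
```

Unlike Postgres under SERIALIZABLE, SQLite resolves the conflict by blocking the second writer rather than raising a serialization error, so this is only an analogue of the claiming step, not of the error-signalling Klint describes.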
> Whole point is to have multiple services accessing same table and
> dividing the work, so locking with waiting for lock to be released is
> out of question.

We are doing the same (newsletters), and there is no problem locking the whole table for a short time with an advisory lock while the Java id-fetching worker holds it. The advisory lock does not block the table for reading or writing; it only blocks the other Java workers that use the same advisory lock. The fetching worker grabs, say, 50 ids of records marked CREATED and changes their status to PROCESSING. Then several workers take the ids, fetch the needed data from the table independently, and process and update the rows in parallel. We have 3 Java machines getting ids for 10 parallel workers each, and everything works just fine. Getting the IDs is usually much, much faster than the real processing.
> We are doing the same (newsletter) and there is no problem to lock the
> whole table for a short time with an advisory lock [...]
> Getting the IDs is much much faster usually then real processing.

Weird this thread came back up today. I actually implemented a small example this morning in Python, using Klint Gore's suggestion, and was very happy with the results. I have attached 4 files for anyone interested in a really simple example. You'll need Python plus the psycopg2 driver, but anyone should be able to follow this easily:

db.py - Edit this file with your db settings.
client.py - This would be your worker process.
insert.py - This is used to enter some jobs into the jobs table.
reset.py - This is used to reset each job back to 'REQUESTED' status.

This example assumes that you have a jobs table like below:

create table jobs(id serial primary key, status text not null);

First edit db.py with your own db settings. Fire up as many copies of client.py as you'd like, then enter some jobs into the jobs table; running insert.py will enter 100 jobs for you. Now watch as your clients process the jobs. Once all of the jobs have finished processing, you can run reset.py to mark all of the jobs back to 'REQUESTED' status so that the clients start processing all over again.

I hope it is OK to attach examples! It just seems like this question comes up often.

Jeff Peck