Thread: Processing a work queue
Anyone have any ideas on how to handle a work queue? I've been thinking about optimizing this process for quite a while.

Basically, my queue table consists of a few-hundred-thousand records describing "things to do". To pare things to the minimum, a queue record can be considered to have a status (available, assigned, completed), a priority, and a description-of-work.

A process will grab an item from the queue, mark it as in-progress, process it, and, depending on success, update the item as completed or as available with an updated priority. There may be upwards of a thousand "worker" processes, and the work of each process may be completed in anywhere from a few seconds to nearly an hour. I expect the system as a whole to be handling a few-dozen queue items per second.

My original plan to fetch work was:

begin;

select item-id, item-info
from the-queue
where available
order by priority
limit 1
for update;

update the-queue
set status = 'assigned'
where item-id = previously-selected-item-id;

commit;

This does not produce desirable results. In the case where requests for work overlap, the first query will complete. The second query will block until the first completes, then apparently re-evaluate the condition and toss the record, thus returning zero rows.

Plan 1a: Check for tuples returned and re-run the query if zero. This will go into an infinite loop whenever there is nothing in the queue, and cause undesirable thrashing if there is too much contention.

Plan 2: Lock the table, run the query/update, unlock the table. Functions fine, but work halts whenever any operation interferes with obtaining the table-level lock.

Plan 3: Same as the original plan but with a higher limit, say 100, then just choose and update the first tuple. The second query will block till the first completes, and then return 99 records. If the limit is set to the number of workers, every request should return some work to be done, if any is available. It's a kludge, but does anyone see any significant drawbacks?
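For concreteness, Plan 3 amounts to something like this (a sketch, using the placeholder names from the original plan):

begin;

select item-id, item-info
from the-queue
where available
order by priority
limit 100        -- instead of 1
for update;

-- take the first returned row; a blocked competitor
-- will see the remaining 99 after we commit
update the-queue
set status = 'assigned'
where item-id = first-returned-item-id;

commit;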
Plan 4: Add an intermediary "dispatcher" with which the workers will communicate via SOAP/XML-RPC/? But if the dispatcher is allowed to run multiple processes, we are back to needing to resolve the database query issues.

Plan 5: I could, potentially, reverse everything and have the workers announce availability and wait for the dispatcher to send work. Fixes the database issue but creates some others.

So from the standpoint of the database query part, anyone have any ideas/suggestions on how to handle a work queue?

Cheers,
Steve
On 4/27/07, Steve Crawford <scrawford@pinpointresearch.com> wrote:
> Anyone have any ideas on how to handle a work queue? I've been thinking
> about optimizing this process for quite a while.

I have been using PostgreSQL for the exact same thing, except I have not yet reached the stage where I need to process queue items in parallel. :)

Anyway, this question has been covered several times, and I believe this post by Tom Lane delineates the canonical recipe:

http://archives.postgresql.org/pgsql-general/2003-05/msg00351.php

Alexander.
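That recipe is, roughly, Steve's Plan 1a with a sleep when the queue comes up empty (a paraphrased sketch, not a quote from the linked post - see it for the authoritative version):

begin;

select item-id, item-info
from the-queue
where status = 'available'
order by priority
limit 1
for update;

-- zero rows back means either the queue is empty or a concurrent
-- worker won the race for that row: commit, sleep briefly, retry.
-- the retry is cheap because the loser's next attempt simply sees
-- the next-best available item.

update the-queue
set status = 'assigned'
where item-id = selected-item-id;

commit;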
On 4/27/07, Steve Crawford <scrawford@pinpointresearch.com> wrote:
> Anyone have any ideas on how to handle a work queue? I've been thinking
> about optimizing this process for quite a while.
> [...]
> My original plan to fetch work was:
> [...]

how about this:

create table job(job_id int, [...])
create sequence worker;

your worker threads can do something like:

select * from job
join (select nextval('worker') as requested_job)
  on job_id = requested_job
 and ((select (w.last_value, w.is_called) < (j.last_value, j.is_called)
         from worker w, job_id_seq j))

and then sleep appropriately if there is nothing to do. Of course, if the job fails you have to put it back on the queue. No locking required!

This relies on false being < true...safer to break out to a case stmt but im just trying to be clever :-)

This has a couple of advantages but is also pretty fragile. I'm not necessarily suggesting it but it was a fun way to think about the problem.

merlin
On 4/27/07, Merlin Moncure <mmoncure@gmail.com> wrote:
> how about this:
> create table job(job_id int, [...])
> create sequence worker;

couple typos: here is an example that works:

create table job(job_id serial);
create sequence worker;

-- get next available job
create function next_job() returns job as
$$
  select job from job
  join (select nextval('worker') as requested_job) q
    on job_id = requested_job
   and ((select (w.last_value, w.is_called) < (j.last_value, j.is_called)
           from worker w, job_job_id_seq j));
$$ language sql;

select next_job();

again, remembering that sequences are not rolled back on transaction failure, you have to think really carefully about failure conditions before going with something like this.

merlin
Steve Crawford wrote:
> Anyone have any ideas on how to handle a work queue? I've been thinking
> about optimizing this process for quite a while.

I use a variant of The Tom Lane Solution previously pointed to; your Plan 1 is very similar.

> This does not produce desirable results. In the case where requests for
> work overlap, the first query will complete. The second query will block
> until the first completes and then apparently re-evaluate the condition
> and toss the record thus returning zero-rows.

I have no experience with this, but I think you can do SELECT FOR UPDATE NOWAIT to avoid the blocking.

> Plan 1a:
>
> Check for tuples returned and re-run query if zero. This will go into an
> infinite loop whenever there is nothing in the queue and cause
> undesirable thrashing if there is too much contention.

So either sleep a bit, as in Tom's solution, or use NOTIFY/LISTEN, which is what I do. I have a trigger like this on my queue:

create or replace function notify_new_work() returns trigger as '
  BEGIN
    NOTIFY WORK;
    RETURN NULL;
  END;
' language 'plpgsql';

create trigger notify_new_work
  after insert on work_queue
  for each statement execute procedure notify_new_work();

My workers do LISTEN WORK after connecting, and then do a (UNIX) select on the connection socket when they get zero results from the (SQL) select. This puts them to sleep until the next NOTIFY fires. How to get the socket and do the (UNIX) select will depend on your client library and language.

- John Burger
  MITRE
I wrote:
> I use a variant of The Tom Lane Solution previously pointed to,
> your Plan 1 is very similar.

Hmm, per that pointed-to post:

http://archives.postgresql.org/pgsql-general/2003-05/msg00351.php

I decided to run a periodic vacuum on my work queue. Lo and behold, I get this:

ERROR: tuple concurrently updated

In addition, all of my workers locked up, apparently indefinitely. I presume this was because I was foolishly doing VACUUM FULL, which locks the table. But what exactly was going on? Why did my workers hang? Thanks in advance, I am new to all this concurrency stuff ...

- John Burger
  MITRE
"John D. Burger" <john@mitre.org> writes:
> I decided to run a periodic vacuum on my work queue. Lo and behold,
> I get this:
> ERROR: tuple concurrently updated

Which PG version is this, and do you have autovacuum enabled? Awhile back it was possible to get this error if autovac and a manual vacuum hit the same table concurrently (the error actually stems from trying to update pg_statistic concurrently during the ANALYZE phase).

> In addition, all of my workers locked up, apparently indefinitely.

[ squint... ] That shouldn't happen. Look into pg_locks to see if you can determine who's waiting for what.

regards, tom lane
>> I decided to run a periodic vacuum on my work queue. Lo and behold,
>> I get this:
>
>> ERROR: tuple concurrently updated
>
> Which PG version is this, and do you have autovacuum enabled?

7.4.8, so no autovac, right?

>> In addition, all of my workers locked up, apparently indefinitely.
>
> [ squint... ] That shouldn't happen. Look into pg_locks to see if
> you can determine who's waiting for what.

I don't want to recreate the problem right now, but I will investigate later. For what it's worth, while the workers were locked up, I couldn't query the table in psql either.

- John D. Burger
  MITRE
john@mitre.org ("John D. Burger") writes:
> I wrote:
>
>> I use a variant of The Tom Lane Solution previously pointed to,
>> your Plan 1 is very similar.
>
> Hmm, per that pointed-to post:
>
> http://archives.postgresql.org/pgsql-general/2003-05/msg00351.php
>
> I decided to run a periodic vacuum on my work queue. Lo and behold,
> I get this:
>
> ERROR: tuple concurrently updated
>
> In addition, all of my workers locked up, apparently indefinitely. I
> presume this was because I was foolishly doing VACUUM FULL, which
> locks the table. But what exactly was going on? Why did my workers
> hang? Thanks in advance, I am new to all this concurrency stuff ...

The error would be the result of two concurrent attempts to ANALYZE the table; ANALYZE updates pg_catalog.pg_statistic, and so two concurrent ANALYZEs on the same table will try to update some of the same tuples. That's quite separate from anything surrounding VACUUM FULL...

The phenomena that you can expect are thus:

- VACUUM FULL needs exclusive access to the table it is working on, so it will have to wait for completion of any accesses by other connections that are in progress.

- As soon as you request VACUUM FULL, any other connections that request access to the table will be blocked until the VACUUM FULL completes.

--
select 'cbbrowne' || '@' || 'linuxdatabases.info';
http://cbbrowne.com/info/lsf.html
Never criticize anybody until you have walked a mile in their shoes, because by that time you will be a mile away and have their shoes.
-- email sig, Brian Servis
Steve Crawford wrote:
> begin;
>
> select item-id, item-info
> from the-queue
> where available
> order by priority
> limit 1
> for update;
>
> update the-queue
> set status = 'assigned'
> where item-id = previously-selected-item-id;
>
> commit;

I do something similar in one of my apps:

BEGIN;

update the-queue
set status = 'assigned'
where available
order by priority
limit 1
returning item-id, item-info;

COMMIT;

This should be safer and faster.

Regards,
LL
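One caveat worth noting: stock PostgreSQL's UPDATE has no ORDER BY or LIMIT clause (and RETURNING requires 8.2), so on an unpatched server the single-statement fetch needs a subquery. A sketch, with the status re-check kept in the outer WHERE so a blocked competitor updates zero rows instead of double-assigning the same item:

BEGIN;

update the_queue
set status = 'assigned'
where item_id = (select item_id
                 from the_queue
                 where status = 'available'
                 order by priority
                 limit 1)
  and status = 'available'   -- re-evaluated after any blocking; the
                             -- loser matches zero rows and must retry
returning item_id, item_info;

COMMIT;

This inherits Plan 1's behavior on collision: the losing worker gets zero rows back and simply retries.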
On 2007-04-26, Steve Crawford <scrawford@pinpointresearch.com> wrote: > Anyone have any ideas on how to handle a work queue? Advisory locks (userlocks in pre-8.2). -- Andrew, Supernews http://www.supernews.com - individual and corporate NNTP services
Andrew - Supernews wrote: >> Anyone have any ideas on how to handle a work queue? > > Advisory locks (userlocks in pre-8.2). Can someone explain why these are a better fit than whatever locks SELECT FOR UPDATE acquires? Thanks. - John D. Burger MITRE
On 4/30/07, John D. Burger <john@mitre.org> wrote: > Andrew - Supernews wrote: > > >> Anyone have any ideas on how to handle a work queue? > > > > Advisory locks (userlocks in pre-8.2). > > Can someone explain why these are a better fit than whatever locks > SELECT FOR UPDATE acquires? They are fast, and can lock for sub-transaction durations, which is sometimes useful when dealing with these types of problems. merlin
On 4/30/07, John D. Burger <john@mitre.org> wrote:
> Can someone explain why [advisory locks] are a better fit than whatever locks
> SELECT FOR UPDATE acquires?

ok, here's an example. I was thinking that my sequence idea might not be safe because of race conditions revolving around querying the sequence table. Here is how I might use advisory locks to eliminate the race condition:

create table job (job_id serial primary key);
create sequence worker;

-- get next job
select
  pg_advisory_lock(1),
  (case
     when (select last_value from worker) < (select last_value from job_job_id_seq)
       then (select job from job where job_id = (select nextval('worker')))
     else null::job
   end) as job,
  pg_advisory_unlock(1);

couple notes here:
* this may not actually be safe, just fooling around
* does not account for is_called
* assumes left-to-right evaluation of expressions (dangerous?)

Here we are using an advisory lock to guard the check-sequence/advance-sequence step. The idea is to prevent the race of somebody incrementing worker after we looked at it last. Advisory locks can hold locks for sub-transaction duration or even (as in this example) sub-query duration. This query can be dropped into a much larger transaction without ruining concurrency...a standard lock can't be released like that.

merlin
Merlin Moncure wrote:
> ok, here's an example. I was thinking that my sequence idea might not
> be safe because of race conditions revolving around querying the
> sequence table. Here is how I might use advisory locks to eliminate the

I've seen your name pop up regularly on this list (or are you from freebsd-stable?), so you kind of got me scratching my head whether you really don't understand sequences. Kind of hard to imagine... Maybe I don't understand what you're asking.

Sequences are safe in concurrent use.

* Nextval() always returns a new number, so no two concurrent sessions can get the same one.
* Currval() is only valid within one session after calling nextval(), so its number cannot have been modified by another session.

Why do you expect to need locking?

--
Alban Hertroys
alban@magproductions.nl

magproductions b.v.
T: ++31(0)534346874
F: ++31(0)534346876
M:
I: www.magproductions.nl
A: Postbus 416
   7500 AK Enschede

// Integrate Your World //
On 5/1/07, Alban Hertroys <alban@magproductions.nl> wrote:
> I've seen your name pop up regularly on this list (or are you from
> freebsd-stable?), so you kind of got me scratching my head whether you
> really don't understand sequences. Kind of hard to imagine... Maybe I
> don't understand what you're asking.

been posting here for years :-)

> Sequences are safe in concurrent use.
> * Nextval() always returns a new number, so no two concurrent sessions
> can get the same one.
> * Currval() is only valid within one session after calling nextval(), so
> its number cannot have been modified by another session.
>
> Why do you expect to need locking?

take another look at my example. there are two things happening that have to be logically combined into one operation: one is checking the last_value column of two sequences, and the other is the nextval(). the advisory lock protects against this:

session a: worker last_value < job last_value..true!
session b: worker last_value < job last_value..true!
session a: increments worker
session b: increments worker

this will cause a job to get skipped. My first go at this example didn't have the locks in there, and I was thinking I introduced a race (i'm almost sure of it). the advisory lock serializes those two operations, but only those two operations.

sequences are naturally safe from the point of view of generating unique numbers if used with nextval(), setval(), etc. but not necessarily

select last_value from s;

from the point of view of comparing that value with something else and incrementing it if and only if that condition is true in a consistent context.

merlin
Merlin Moncure wrote:
> take another look at my example. there are two things happening that
> have to be logically combined into one operation. one is checking the
> last_value column of two sequences and the other is the nextval().
> the advisory lock protects against this:
>
> session a: worker last_value < job last_value..true!
> session b: worker last_value < job last_value..true!
> session a: increments worker
> session b: increments worker

Hmm, now I think I see what your problem is. You're accessing the sequence from different sessions, but you need the last value that was used in another session, right? That's why you query last_value. You're basically fighting against the assumption that different sessions must have different values, which sequences are based on.

> this will cause a job to get skipped. My first go at this example
> didn't have the locks in there and I was thinking I introduced a race
> (i'm almost sure of it),

Yes, probably. Session a will need to prevent session b from running jobs until it's done. Or maybe not entirely... I suppose it knows beforehand when it will be done, based on the data it sees at the start of the session - it may be able to tell other sessions where to start. Let it reserve the current queue for its own use, so to speak.

That basically moves the locking problem around, but it seems better than to lock until it's done - other sessions can do work in the mean time. Whether that improves your performance pretty much depends on the queue sizes, the frequency of processing and the load processing causes... Heh, I just realized we have a work queue table here as well; I'd like to know the result.

> generating unique number if used with nextval(), setval(), etc. but
> not necessarily
> select last_value from s;
>
> from point of view of comparing that value with something else and
> incrementing it if and only if that condition is true in a consistent
> context.

Indeed, that's one of the weaknesses of last_val. With your example I now understand why it even exists; I always thought it an odd and dangerous feature.

--
Alban Hertroys
alban@magproductions.nl

magproductions b.v.
T: ++31(0)534346874
F: ++31(0)534346876
M:
I: www.magproductions.nl
A: Postbus 416
   7500 AK Enschede

// Integrate Your World //
On 5/1/07, Alban Hertroys <alban@magproductions.nl> wrote:
> Whether that improves your performance pretty much depends on the queue
> sizes, the frequency of processing and the load processing causes...
> Heh, I just realize we have a work queue table here as well, I'd like to
> know the result.

the optimization here is that we are not updating job.status to flag which jobs are done or not. this is an extremely high concurrency solution, but the advantage is moot if the jobs are not extremely short. you can insert into a completed-jobs table, but you don't have to check it to get the next undone job. it might be impractical to do it this way for real jobs... thinking about it some more, the only way to skip a job would be if the race condition happened at or near the end of the queue.

> > generating unique number if used with nextval(), setval(), etc. but
> > not necessarily
> > select last_value from s;
> >
> > from point of view of comparing that value with something else and
> > incrementing it if and only if that condition is true in a consistent
> > context.
>
> Indeed, that's one of the weaknesses of last_val. With your example I
> now understand why it even exists, I always thought it an odd and
> dangerous feature.

small clarification here...i'm not actually looking at the lastval() function (get the last value returned by the sequence in this session) but the 'last_value' column of the sequences (get the last value returned to any session for this sequence). a cooperative lock is the only way to prevent other sessions from jerking the sequence around via the 'non-mvcc' sequence functions.

merlin
On 2007-04-30, "John D. Burger" <john@mitre.org> wrote:
> Andrew - Supernews wrote:
>
>>> Anyone have any ideas on how to handle a work queue?
>>
>> Advisory locks (userlocks in pre-8.2).
>
> Can someone explain why these are a better fit than whatever locks
> SELECT FOR UPDATE acquires?

They can be acquired without blocking, and they are non-transactional (and can therefore be held for long periods of time while you work on the item - this allows you to either detect cases where a queue runner died before completing an item, or (where appropriate) automatically release such items back into the queue).

The nonblocking bit avoids the need for a backoff-and-retry in the case when two queue runners both try to fetch from the queue at the same time - using advisory locks they both get a (different) item, rather than one getting a serialization failure.

--
Andrew, Supernews
http://www.supernews.com - individual and corporate NNTP services
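A minimal sketch of that approach, using the column names from the original post and the 8.2 advisory-lock functions (illustrative only, not taken from the thread):

-- claim the best available item without blocking: rows whose
-- advisory lock another worker already holds are skipped, so
-- concurrent workers each get a different item
select item_id, item_info
from the_queue
where status = 'available'
  and pg_try_advisory_lock(item_id)
order by priority
limit 1;

-- ... process the item, then set status = 'completed' or 'available' ...

-- release the claim; the lock also disappears if the session dies,
-- which is how abandoned in-progress items can be detected
select pg_advisory_unlock(item_id);

One caveat with this exact form: the planner may evaluate pg_try_advisory_lock() on rows that the LIMIT later discards, leaving stray locks held, so production versions usually wrap the locking in a function or an outer query to control evaluation order.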