Thread: Processing a work queue

Processing a work queue

From: Steve Crawford
Anyone have any ideas on how to handle a work queue? I've been thinking
about optimizing this process for quite a while.

Basically, my queue table consists of a few-hundred-thousand records
describing "things to do". To pare things to the minimum, a queue record
can be considered to have a status (available, assigned, completed), a
priority, and a description-of-work.
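Concretely, the table might look something like this (a minimal sketch;
names are illustrative, not our real schema):

create table the_queue (
    item_id    serial primary key,
    status     text not null default 'available'
               check (status in ('available', 'assigned', 'completed')),
    priority   integer not null,
    item_info  text
);

-- partial index so workers can find available work by priority quickly
create index the_queue_available_idx
    on the_queue (priority)
    where status = 'available';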

A process will grab an item from the queue, mark it as in-progress,
process it, and, depending on success, update the item as completed or
as available with an updated priority. There may be upwards of a
thousand "worker" processes and the work of each process may be
completed in anywhere from a few seconds to nearly an hour. I expect the
system as a whole to be handling a few-dozen queue items per second.

My original plan to fetch work was:

begin;

select item-id, item-info
   from the-queue
   where available
   order by priority
   limit 1
   for update;

update the-queue
  set status = 'assigned'
  where item-id = previously-selected-item-id;

commit;

This does not produce desirable results. When requests for work overlap,
the first query completes, but the second blocks until the first commits;
under READ COMMITTED it then re-evaluates its WHERE condition against the
updated row, finds the item no longer available, and discards it, thus
returning zero rows.

Plan 1a:

Check for tuples returned and re-run the query if zero. This will
busy-loop whenever there is nothing in the queue and cause undesirable
thrashing when there is too much contention.

Plan 2:

Lock the table, run the query/update, unlock the table. This functions
fine, but all work halts whenever any other operation interferes with
obtaining the table-level lock.

Plan 3:

Same as plan 1 but use a higher limit, say 100, then just choose and
update the first tuple. The second query will block until the first
completes, and then return 99 records. If the limit is set to the number
of workers, every request should return some work, if any is available.
It's a kludge, but does anyone see any significant drawbacks?

Plan 4:

Add an intermediary "dispatcher" with which the workers will communicate
via SOAP/XML-RPC/? But if the dispatcher is allowed to run multiple
processes we are back to needing to resolve the same database query
issues.

Plan 5:

I could, potentially, reverse everything and have the workers announce
availability and wait for the dispatcher to send work. Fixes the
database issue but creates some others.

So from the standpoint of the database query part, anyone have any
ideas/suggestions on how to handle a work queue?

Cheers,
Steve

Re: Processing a work queue

From: "Alexander Staubo"
On 4/27/07, Steve Crawford <scrawford@pinpointresearch.com> wrote:
> Anyone have any ideas on how to handle a work queue? I've been thinking
> about optimizing this process for quite a while.

I have been using PostgreSQL for the exact same thing, except I have
not yet reached the stage where I need to process queue items in
parallel. :) Anyway, this question has been covered several times, and
I believe this post by Tom Lane delineates the canonical recipe:

  http://archives.postgresql.org/pgsql-general/2003-05/msg00351.php
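From memory, the shape of that recipe is essentially Steve's Plan 1 plus
a sleep-and-retry when zero rows come back - a sketch under that
assumption (not a quote from the post; table and column names as in the
sketch earlier in the thread):

create or replace function claim_next_item() returns integer as $$
declare
    claimed_id integer;
begin
    -- under READ COMMITTED, a concurrent claim can make this return no
    -- row even though other work exists; the caller simply calls again
    select item_id into claimed_id
      from the_queue
     where status = 'available'
     order by priority
     limit 1
       for update;

    if claimed_id is null then
        return null;  -- queue looks empty; caller sleeps and retries
    end if;

    update the_queue set status = 'assigned' where item_id = claimed_id;
    return claimed_id;
end;
$$ language plpgsql;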

Alexander.

Re: Processing a work queue

From: "Merlin Moncure"
On 4/27/07, Steve Crawford <scrawford@pinpointresearch.com> wrote:
> Anyone have any ideas on how to handle a work queue? I've been thinking
> about optimizing this process for quite a while.
>
> Basically, my queue table consists of a few-hundred-thousand records
> describing "things to do". To pare things to the minimum, a queue record
> can be considered to have a status (available, assigned, completed), a
> priority, and a description-of-work.
>
> A process will grab an item from the queue, mark it as in-progress,
> process it, and, depending on success, update the item as completed or
> as available with an updated priority. There may be upwards of a
> thousand "worker" processes and the work of each process may be
> completed in anywhere from a few seconds to nearly an hour. I expect the
> system as a whole to be handling a few-dozen queue items per second.
>
> My original plan to fetch work was:
>
> begin;
>
> select item-id, item-info
>    from the-queue
>    where available
>    order by priority
>    limit 1
>    for update;
>
> update the-queue
>   set status = 'assigned'
>   where item-id = previously-selected-item-id;
>
> commit;


how about this:
create table job(job_id int, [...])
create sequence worker;

your worker threads can do something like:
select * from job join
(
  select nextval('worker') as requested_job
) on job_id = requested_job
and
(
  (select (w.last_value, w.is_called) < (j.last_value, j.is_called)
from worker w, job_id_seq j)
)

and then sleep appropriately if there is nothing to do.  Of course, if
the job fails you have to put it back on the queue.  No locking
required!  This relies on false being < true... it would be safer to
break it out into a case stmt, but I'm just trying to be clever :-)

This has a couple of advantages but is also pretty fragile.  I'm not
necessarily suggesting it, but it was a fun way to think about the
problem.

merlin

Re: Processing a work queue

From: "Merlin Moncure"
On 4/27/07, Merlin Moncure <mmoncure@gmail.com> wrote:
> how about this:
> create table job(job_id int, [...])
> create sequence worker;
>

a couple of typos: here is an example that works:
create table job(job_id serial);
create sequence worker;

-- get next available job
create function next_job() returns job as
$$
select job from job join
(
 select nextval('worker') as requested_job
) q on job_id = requested_job
and
(
 (select (w.last_value, w.is_called) < (j.last_value, j.is_called)
from worker w, job_job_id_seq j)
);
$$ language sql;

select next_job();

again, remembering that sequences are not rolled back on transaction
failure, you have to think really carefully about failure conditions
before going with something like this.
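to make that concrete: since the consumed sequence value cannot be given
back, the failure path has to re-queue the work under a fresh id - a
sketch (illustrative):

-- worker failed on job N: the 'worker' sequence has already moved past N
-- and won't go back, so give the work a new id at the end of the queue
-- (a real job table would also copy the payload columns here)
insert into job default values;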

merlin

Re: Processing a work queue

From: "John D. Burger"
Steve Crawford wrote:

> Anyone have any ideas on how to handle a work queue? I've been thinking
> about optimizing this process for quite a while.

I use a variant of The Tom Lane Solution previously pointed to; your
Plan 1 is very similar.

> This does not produce desirable results. When requests for work overlap,
> the first query completes, but the second blocks until the first commits;
> under READ COMMITTED it then re-evaluates its WHERE condition against the
> updated row, finds the item no longer available, and discards it, thus
> returning zero rows.

I have no experience with this, but I think you can do SELECT FOR
UPDATE NOWAIT to avoid the blocking.
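A sketch of what that might look like against the table sketched earlier
(illustrative; note that with ORDER BY ... LIMIT 1, a lock on the single
top-priority row makes the whole statement fail rather than skip to the
next row):

begin;
select item_id, item_info
  from the_queue
 where status = 'available'
 order by priority
 limit 1
   for update nowait;
-- on success: mark the row assigned and commit
-- on error 55P03 (lock_not_available): rollback and retry
commit;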

> Plan 1a:
>
> Check for tuples returned and re-run the query if zero. This will
> busy-loop whenever there is nothing in the queue and cause undesirable
> thrashing when there is too much contention.

So either sleep a bit, as in Tom's solution, or use NOTIFY/LISTEN,
which is what I do.  I have a trigger like this on my queue:

create or replace function notify_new_work() returns trigger as
'
BEGIN
NOTIFY WORK;
RETURN NULL;
END;
' language 'plpgsql';

create trigger notify_new_work
        after insert on work_queue
        for each statement execute procedure notify_new_work();

My workers do LISTEN WORK after connecting, and then do a (UNIX)
select on the connection socket when they get zero results from the
(SQL) select.  This puts them to sleep until the next NOTIFY fires.
How to get the socket and do the (UNIX) select will depend on your
client library and language.

- John Burger
   MITRE

Re: Processing a work queue

From: "John D. Burger"
I wrote:

> I use a variant of The Tom Lane Solution previously pointed to;
> your Plan 1 is very similar.

Hmm, per that pointed-to post:

   http://archives.postgresql.org/pgsql-general/2003-05/msg00351.php

I decided to run a periodic vacuum on my work queue.  Lo and behold,
I get this:

   ERROR:  tuple concurrently updated

In addition, all of my workers locked up, apparently indefinitely.  I
presume this was because I was foolishly doing VACUUM FULL, which
locks the table.  But what exactly was going on?  Why did my workers
hang?  Thanks in advance, I am new to all this concurrency stuff ...

- John Burger
   MITRE

Re: Processing a work queue

From: Tom Lane
"John D. Burger" <john@mitre.org> writes:
> I decided to run a periodic vacuum on my work queue.  Lo and behold,
> I get this:

>    ERROR:  tuple concurrently updated

Which PG version is this, and do you have autovacuum enabled?  Awhile
back it was possible to get this error if autovac and a manual vacuum
hit the same table concurrently (the error actually stems from trying
to update pg_statistic concurrently during the ANALYZE phase).

> In addition, all of my workers locked up, apparently indefinitely.

[ squint... ]  That shouldn't happen.  Look into pg_locks to see if
you can determine who's waiting for what.
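Something along these lines (a sketch; pg_locks columns vary a bit
across versions):

select l.pid, l.mode, l.granted, c.relname
  from pg_locks l
  left join pg_class c on c.oid = l.relation
 order by l.granted desc, c.relname;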

            regards, tom lane

Re: Processing a work queue

From: "John D. Burger"
>> I decided to run a periodic vacuum on my work queue.  Lo and behold,
>> I get this:
>
>>    ERROR:  tuple concurrently updated
>
> Which PG version is this, and do you have autovacuum enabled?

7.4.8, so no autovac, right?

>> In addition, all of my workers locked up, apparently indefinitely.
>
> [ squint... ]  That shouldn't happen.  Look into pg_locks to see if
> you can determine who's waiting for what.

I don't want to recreate the problem right now, but I will
investigate later.  For what it's worth, while the workers were
locked up, I couldn't query the table in psql either.

- John D. Burger
   MITRE



Re: Processing a work queue

From: Chris Browne
john@mitre.org ("John D. Burger") writes:
> I wrote:
>
>> I use a variant of The Tom Lane Solution previously pointed to;
>> your Plan 1 is very similar.
>
> Hmm, per that pointed-to post:
>
>   http://archives.postgresql.org/pgsql-general/2003-05/msg00351.php
>
> I decided to run a periodic vacuum on my work queue.  Lo and behold,
> I get this:
>
>   ERROR:  tuple concurrently updated
>
> In addition, all of my workers locked up, apparently indefinitely.  I
> presume this was because I was foolishly doing VACUUM FULL, which
> locks the table.  But what exactly was going on?  Why did my workers
> hang?  Thanks in advance, I am new to all this concurrency stuff ...

The error would be the result of two concurrent attempts to ANALYZE
the table; ANALYZE updates pg_catalog.pg_statistic, and so two
concurrent ANALYZEs on the same table will try to update some of the
same tuples.

That's quite separate from anything surrounding VACUUM FULL...
The phenomena that you can expect are thus:

- VACUUM FULL needs exclusive access to the table it is working on, so
  it will have to wait for completion of any accesses by other
  connections that are in progress.

- As soon as you request VACUUM FULL, any other connections that
  request access to the table will be blocked until the VACUUM FULL
  completes.
--
select 'cbbrowne' || '@' || 'linuxdatabases.info';
http://cbbrowne.com/info/lsf.html
Never criticize anybody until  you have walked  a mile in their shoes,
because by that time you will be a mile away and have their shoes.
-- email sig, Brian Servis

Re: Processing a work queue

From: Lexington Luthor
Steve Crawford wrote:
>
> begin;
>
> select item-id, item-info
>    from the-queue
>    where available
>    order by priority
>    limit 1
>    for update;
>
> update the-queue
>   set status = 'assigned'
>   where item-id = previously-selected-item-id;
>
> commit;
>

I do something similar in one of my apps:

BEGIN;

update the_queue
   set status = 'assigned'
 where item_id = (select item_id
                    from the_queue
                   where status = 'available'
                   order by priority
                   limit 1
                     for update)
 returning item_id, item_info;

COMMIT;

This should be safer and faster.

Regards,
LL

Re: Processing a work queue

From: Andrew - Supernews
On 2007-04-26, Steve Crawford <scrawford@pinpointresearch.com> wrote:
> Anyone have any ideas on how to handle a work queue?

Advisory locks (userlocks in pre-8.2).

--
Andrew, Supernews
http://www.supernews.com - individual and corporate NNTP services

Re: Processing a work queue

From: "John D. Burger"
Andrew - Supernews wrote:

>> Anyone have any ideas on how to handle a work queue?
>
> Advisory locks (userlocks in pre-8.2).

Can someone explain why these are a better fit than whatever locks
SELECT FOR UPDATE acquires?

Thanks.


- John D. Burger
   MITRE



Re: Processing a work queue

From: "Merlin Moncure"
On 4/30/07, John D. Burger <john@mitre.org> wrote:
> Andrew - Supernews wrote:
>
> >> Anyone have any ideas on how to handle a work queue?
> >
> > Advisory locks (userlocks in pre-8.2).
>
> Can someone explain why these are a better fit than whatever locks
> SELECT FOR UPDATE acquires?

They are fast, and can be held for sub-transaction durations, which is
sometimes useful when dealing with these types of problems.

merlin

Re: Processing a work queue

From: "Merlin Moncure"
On 4/30/07, John D. Burger <john@mitre.org> wrote:
> Can someone explain why [advisory locks] are a better fit than whatever locks
> SELECT FOR UPDATE acquires?

ok, here's an example.  I was thinking that my sequence idea might not
be safe because of race conditions revolving around querying the
sequence table.  Here is how I might use advisory locks to eliminate the
race condition:

create table job (job_id serial primary key);
create sequence worker;

-- get next job
select
  pg_advisory_lock(1),
  (
    case
      when (select last_value from worker)
           < (select last_value from job_job_id_seq)
      then (select job from job where job_id = (select nextval('worker')))
      else null::job
    end
  ) as job,
  pg_advisory_unlock(1);

a couple of notes here:
* this may not actually be safe; just fooling around
* does not account for is_called
* assumes left to right evaluation of expressions (dangerous?)

Here we are using an advisory lock as a guard around the check-sequence/
advance-sequence step.  The idea is to prevent the race where somebody
increments worker after we last looked at it.

Advisory locks can be held for sub-transaction duration or even (as in
this example) sub-query duration.  This query can be dropped into a
much larger transaction without ruining concurrency... no standard
type of lock can be released like that.
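to illustrate that last point (a hypothetical sketch; the lock key 1 is
arbitrary):

begin;
select pg_advisory_lock(1);    -- acquire the guard
-- ... the guarded check-and-advance from above ...
select pg_advisory_unlock(1);  -- released here, transaction still open
-- ... arbitrary further work, with the lock already released ...
commit;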

merlin

Re: Processing a work queue

From: Alban Hertroys
Merlin Moncure wrote:
> ok, here's an example.  I was thinking that my sequence idea might not
> be safe because of race conditions revolving around querying the
> sequence table.  Here is how I might use advisory locks to eliminate
> the race condition:

I've seen your name pop up regularly on this list (or are you from
freebsd-stable?), so you kind of got me scratching my head, wondering
whether you really don't understand sequences. Kind of hard to
imagine... Maybe I don't understand what you're asking.

Sequences are safe in concurrent use.
* Nextval() always returns a new number, so no two concurrent sessions
can get the same one.
* Currval() is only valid within one session after calling nextval(), so
its value cannot have been modified by another session.

Why do you expect to need locking?

--
Alban Hertroys
alban@magproductions.nl

magproductions b.v.

T: ++31(0)534346874
F: ++31(0)534346876
M:
I: www.magproductions.nl
A: Postbus 416
   7500 AK Enschede

// Integrate Your World //

Re: Processing a work queue

From: "Merlin Moncure"
On 5/1/07, Alban Hertroys <alban@magproductions.nl> wrote:
> Merlin Moncure wrote:
> > ok, here's an example.  I was thinking that my sequence idea might not
> > be safe because of race conditions revolving around querying the
> > sequence table.  Here is how I might use advisory locks to eliminate
> > the race condition:
>
> I've seen your name pop up regularly on this list (or are you from
> freebsd-stable?), so you kind of got me scratching my head, wondering
> whether you really don't understand sequences. Kind of hard to
> imagine... Maybe I don't understand what you're asking.

been posting here for years :-)

> Sequences are safe in concurrent use.
> * Nextval() always returns a new number, so no two concurrent sessions
> can get the same one.
> * Currval() is only valid within one session after calling nextval(), so
> its value cannot have been modified by another session.
>
> Why do you expect to need locking?

take another look at my example.  there are two things happening that
have to be logically combined into one operation. one is checking the
last_value column of two sequences and the other is the nextval().
the advisory lock protects against this:

session a: worker last_value < job last_value..true!
session b: worker last_value < job last_value..true!
session a: increments worker
session b: increments worker

this will cause a job to get skipped.  My first go at this example
didn't have the locks in there, and I was thinking I introduced a race
(I'm almost sure of it).

the advisory lock serializes those two operations, but only those two
operations.  sequences are naturally safe, from the point of view of
generating unique numbers, when used with nextval(), setval(), etc., but
not necessarily with

select last_value from s;

from the point of view of comparing that value with something else and
incrementing it if and only if that condition is true in a consistent
context.

merlin

Re: Processing a work queue

From: Alban Hertroys
Merlin Moncure wrote:
> take another look at my example.  there are two things happening that
> have to be logically combined into one operation. one is checking the
> last_value column of two sequences and the other is the nextval().
> the advisory lock protects against this:
>
> session a: worker last_value < job last_value..true!
> session b: worker last_value < job last_value..true!
> session a: increments worker
> session b: increments worker

Hmm, now I think I see what your problem is.

You're accessing the sequence from different sessions, but you need the
last value that was used in another session, right? That's why you query
last_value.

You're basically fighting the assumption that sequences are built on:
that different sessions must get different values.

> this will cause a job to get skipped.  My first go at this example
> didn't have the locks in there, and I was thinking I introduced a race
> (I'm almost sure of it).

Yes, probably. Session a will need to prevent session b from running
jobs until it's done. Or maybe not entirely...

I suppose it knows beforehand when it will be done, based on the data it
sees at the start of the session - it may be able to tell other sessions
where to start. Let it reserve the current queue for its own use, so to
speak.

That basically moves the locking problem around, but it seems better than
locking until it's done - other sessions can do work in the meantime.

Whether that improves your performance pretty much depends on the queue
sizes, the frequency of processing and the load processing causes...
Heh, I just realized we have a work queue table here as well; I'd like
to know the result.

> generating unique numbers, when used with nextval(), setval(), etc., but
> not necessarily with
>
> select last_value from s;
>
> from the point of view of comparing that value with something else and
> incrementing it if and only if that condition is true in a consistent
> context.

Indeed, that's one of the weaknesses of last_val. With your example I
now understand why it even exists; I always thought it an odd and
dangerous feature.

--
Alban Hertroys
alban@magproductions.nl

magproductions b.v.

T: ++31(0)534346874
F: ++31(0)534346876
M:
I: www.magproductions.nl
A: Postbus 416
   7500 AK Enschede

// Integrate Your World //

Re: Processing a work queue

From: "Merlin Moncure"
On 5/1/07, Alban Hertroys <alban@magproductions.nl> wrote:
> Whether that improves your performance pretty much depends on the queue
> sizes, the frequency of processing and the load processing causes...
> Heh, I just realized we have a work queue table here as well; I'd like
> to know the result.

the optimization here is that we are not updating job.status to flag
which jobs are done.  this is an extremely high-concurrency solution,
but the advantage is moot if the jobs are not extremely short.  you can
insert into a completed-jobs table, but you don't have to check it to
get the next undone job.  it might be impractical to do it this way for
real jobs...

thinking about it some more, the only way to skip a job would be if the
race condition happened at or near the end of the queue.

> > generating unique numbers, when used with nextval(), setval(), etc., but
> > not necessarily with
> > select last_value from s;
> >
> > from the point of view of comparing that value with something else and
> > incrementing it if and only if that condition is true in a consistent
> > context.
>
> Indeed, that's one of the weaknesses of last_val. With your example I
> now understand why it even exists; I always thought it an odd and
> dangerous feature.

small clarification here... I'm not actually looking at the lastval()
function (the last value returned by a sequence in this session) but at
the 'last_value' column of the sequences (the last value returned to any
session for this sequence).  a cooperative lock is the only way to
prevent other sessions from jerking the sequence around via the
'non-mvcc' sequence functions.
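to make the distinction concrete (an illustrative sketch; lastval() is
8.1+):

-- session-local: the value nextval() most recently returned in THIS session
select lastval();

-- global and non-mvcc: the value most recently handed out to ANY session
select last_value from worker;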

merlin

Re: Processing a work queue

From: Andrew - Supernews
On 2007-04-30, "John D. Burger" <john@mitre.org> wrote:
> Andrew - Supernews wrote:
>
>>> Anyone have any ideas on how to handle a work queue?
>>
>> Advisory locks (userlocks in pre-8.2).
>
> Can someone explain why these are a better fit than whatever locks
> SELECT FOR UPDATE acquires?

They can be acquired without blocking, and they are non-transactional
(and can therefore be held for long periods of time while you work on
the item - this lets you either detect cases where a queue runner died
before completing an item, or, where appropriate, automatically release
such items back into the queue).

The nonblocking bit avoids the need for backoff-and-retry when two queue
runners both try to fetch from the queue at the same time - using
advisory locks, they each get a (different) item, rather than one
getting a serialization failure.
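A sketch of one way to apply that (illustrative, not from the thread;
assumes 8.2's pg_try_advisory_lock and the queue table sketched at the
top of the thread): each worker walks the queue in priority order and
claims the first item whose advisory lock it can grab without blocking.

create or replace function claim_with_advisory_lock() returns integer as $$
declare
    cand record;
begin
    for cand in
        select item_id from the_queue
         where status = 'available'
         order by priority
    loop
        if pg_try_advisory_lock(cand.item_id) then
            -- this session now owns the item; re-check status before
            -- working, since the snapshot above may be stale, and call
            -- pg_advisory_unlock(item_id) when done.  If the worker dies,
            -- the lock dies with its session, so the item can be reclaimed.
            return cand.item_id;
        end if;
    end loop;
    return null;  -- nothing claimable right now
end;
$$ language plpgsql;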

--
Andrew, Supernews
http://www.supernews.com - individual and corporate NNTP services