While awaiting the awesomeness of the upcoming "skip locked" feature in 9.5, I need to handle a work queue.
Does anyone see any glaring issues or subtle nuances with the basic method below, which combines CTEs with queue-handling methods posted by depesz, on the PG wiki, and elsewhere?
Note that there appears to be a slight potential for a race condition which would occasionally cause one worker to fail to get a record, but the application code handles that issue fine.
The work is sent to an externally hosted API which will ultimately reply to a callback API at our end, so obviously there's a lot of other stuff in the system to update final results, recover from lost work, add to the queue, etc. I'm just asking about the sanity of the queue-processing query itself:
with next_up as (
    select the_id
    from queuetest
    where not sent_for_processing
        and pg_try_advisory_xact_lock(12345, the_id)
    order by the_priority
    limit 1
    for update
)
update queuetest
set sent_for_processing = true
where the_id = (select the_id from next_up)
returning the_work_to_do;
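(For reference, a minimal table shape consistent with the query: the column names are taken from the query itself, but the types and constraints here are assumptions.)

-- Hypothetical definition of queuetest; only the column names come
-- from the query above, the types and constraints are guesses.
create table queuetest (
    the_id               bigint primary key,
    the_priority         integer not null,
    sent_for_processing  boolean not null default false,
    the_work_to_do       text not null
);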
This will only be sane if the inner query can use an index to satisfy the "order by". Otherwise it is going to read every row in order to sort them, taking the advisory lock on every row as it goes, and you will run out of shared memory (advisory locks live in the shared lock table, which has a fixed size). Of course, if it were sorting each time, it would probably be too slow anyway.
And it has to be a partial index:
(the_priority) where not sent_for_processing
Because if you just have an index on the_priority, the sub-select will start getting inefficient once all the lowest-numbered priority items are marked as sent.
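Spelled out as DDL, something like this (the index name is just an example):

-- Partial index: covers only unsent rows, so the planner can walk it
-- in priority order and stop at the first row it can lock.
create index queuetest_pending_priority_idx
    on queuetest (the_priority)
    where not sent_for_processing;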
Also, you probably want to make sent_for_processing some kind of token or timestamp, to make it easier to detect lost work. In that case NULL would mean not yet sent, so the partial index would be "where sent_for_processing is null".
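A sketch of that variant, assuming the boolean column and index from above (names and types are illustrative):

-- Migrate the assumed boolean column to a nullable timestamp: NULL now
-- means "not yet sent" and a value records when work was handed out.
drop index if exists queuetest_pending_priority_idx;

alter table queuetest
    alter column sent_for_processing drop default,
    alter column sent_for_processing drop not null,
    alter column sent_for_processing type timestamptz
        using case when sent_for_processing then now() end;

create index queuetest_pending_priority_idx
    on queuetest (the_priority)
    where sent_for_processing is null;

-- The queue query then tests for NULL and stamps the hand-out time:
with next_up as (
    select the_id
    from queuetest
    where sent_for_processing is null
        and pg_try_advisory_xact_lock(12345, the_id)
    order by the_priority
    limit 1
    for update
)
update queuetest
set sent_for_processing = now()
where the_id = (select the_id from next_up)
returning the_work_to_do;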