Re: Implementing queue semantics (novice) - Mailing list pgsql-sql
From | KÖPFERL Robert |
---|---|
Subject | Re: Implementing queue semantics (novice) |
Date | |
Msg-id | ED4E30DD9C43D5118DFB00508BBBA76EB16541@neptun.sonorys.at Whole thread Raw |
In response to | Implementing queue semantics (novice) (KÖPFERL Robert <robert.koepferl@sonorys.at>) |
List | pgsql-sql |
Thank you for this rather detailed example. I already learned something and omitted a fault. There should be enogh to implement such a Queue. :-) > -----Original Message----- > From: Andrew Hammond [mailto:ahammond@ca.afilias.info] > Sent: Mittwoch, 12. Jänner 2005 17:19 > To: KÖPFERL Robert > Cc: 'pgsql-sql@postgresql.org' > Subject: Re: [SQL] Implementing queue semantics (novice) > > > -----BEGIN PGP SIGNED MESSAGE----- > Hash: SHA1 > > The name for what you're looking to build is a concurrent batch > processing system. Here's a basic one. > > - -- adding processes > > BEGIN; > > INSERT INTO queue (queue_id, processing_pid, processing_start, > ~ processing_status, foreign_id) > VALUES (DEFAULT, NULL, NULL, > ~ (SELECT queue_status_id FROM queue_status WHERE name = 'pending'), > ~ foreign_id); > > COMMIT; > > > - -- removing processes > > BEGIN; > > SELECT queue_id, foreign_id FROM queue > WHERE processing_status = (SELECT queue_status_id FROM queue_status > ~ WHERE name = 'pending') > ORDER BY queue_id LIMIT 1 > FOR UPDATE; > > UPDATE queue > SET processing_pid = ?, > ~ processing_start = now(), > ~ processing_status = (SELECT queue_status_id FROM queue_status WHERE > ~ name = 'active') > WHERE id = ?; > > COMMIT; > > - -- client code does whatever it's going to do here > > BEGIN; > > SELECT 1 FROM queue > WHERE queue_id = ? AND processing_pid = ? > FOR UPDATE; > > - -- confirm that it exists > > DELETE FROM queue WHERE queue_id = ? > > INSERT INTO queue_history (queue_id, processing_pid, processing_start, > ~ processing_complete, processing_status, foreign_id) > VALUES (queue_id, processing_pid, processing_start, now(), > ~ (SELECT queue_status_id FROM queue_status WHERE name = 'done'), > ~ foreign_id); > > COMMIT; > > - -- a seperate process reaps orphaned entries should processing fail. > > BEGIN; > SELECT queue_id, processing_pid FROM queue > WHERE now() - processing_start > 'some reasonable interval'::interval > AND processing_status = (SELECT queue_status_id FROM > queue_status WHERE > ~ name = 'active' FOR UPDATE; > > - -- for each entry, check to see if the PID is still running > > UPDATE queue > SET > ~ processing_pid = NULL, > ~ processing_start = NULL, > ~ processing_status = (SELECT id FROM queue_status WHERE name > = 'pending') > WHERE id = ?; > > COMMIT; > > There are more complicated approaches available. If you plan to have > multiple machines processing, you probably want to add a > processing_node > entry too. > > > KÖPFERL Robert wrote: > | Hi, > | > | since I am new to writing stored procedures I'd like to ask > first bevore I > | do a mistake. > | > | I want to implement some kind of queue (fifo). There are n > users/processes > | that add new records to a table and there are m consumers > that take out > | these records and process them. > | It's however possible for a consumer to die or loose > connection while > | records must not be unprocessed. They may rather be processed twice. > | > | This seems to me as a rather common problem. But also with > atomicy-holes to > | fall into. > | How is this commonly implemented? > | > | > | I can imagine an 'add' and a 'get' function together with > one aditional > | 'processed' timestamp-column? > | > | > | > | Thanks for helping me do the right. > | > | ---------------------------(end of > broadcast)--------------------------- > | TIP 4: Don't 'kill -9' the postmaster > > > - -- > Andrew Hammond 416-673-4138 ahammond@ca.afilias.info > Database Administrator, Afilias Canada Corp. > CB83 2838 4B67 D40F D086 3568 81FC E7E5 27AF 4A9A > -----BEGIN PGP SIGNATURE----- > Version: GnuPG v1.2.5 (GNU/Linux) > > iD8DBQFB5U3kgfzn5SevSpoRAoesAKCAZkr61I5knCw9tIr8rlO0xri7YACgifrn > N01nXZY8UKmIlTnGkngHKUo= > =UXRk > -----END PGP SIGNATURE----- >