Thread: Implementing queue semantics (novice)

Implementing queue semantics (novice)

From
KÖPFERL Robert
Date:
Hi,

since I am new to writing stored procedures, I'd like to ask first, before I
make a mistake.

I want to implement some kind of queue (FIFO). There are n users/processes
that add new records to a table, and there are m consumers that take out
these records and process them.
It is, however, possible for a consumer to die or lose its connection, and
records must not be left unprocessed. It is acceptable for them to be
processed twice instead.

This seems to me to be a rather common problem, but also one with atomicity
holes to fall into.
How is this commonly implemented?


I can imagine an 'add' and a 'get' function together with one additional
'processed' timestamp column?



Thanks for helping me get this right.


Re: Implementing queue semantics (novice)

From
Andrew Hammond
Date:
The name for what you're looking to build is a concurrent batch
processing system. Here's a basic one.

-- adding entries (producers)

BEGIN;

INSERT INTO queue (queue_id, processing_pid, processing_start,
    processing_status, foreign_id)
VALUES (DEFAULT, NULL, NULL,
    (SELECT queue_status_id FROM queue_status WHERE name = 'pending'),
    ?);  -- id of the record to be processed

COMMIT;


-- removing entries (consumers): claim the oldest pending entry

BEGIN;

SELECT queue_id, foreign_id FROM queue
WHERE processing_status = (SELECT queue_status_id FROM queue_status
    WHERE name = 'pending')
ORDER BY queue_id LIMIT 1
FOR UPDATE;

-- no row returned: nothing to claim right now; commit and retry later

UPDATE queue
SET processing_pid = ?,
    processing_start = now(),
    processing_status = (SELECT queue_status_id FROM queue_status
        WHERE name = 'active')
WHERE queue_id = ?;

COMMIT;

-- client code does whatever it's going to do here

BEGIN;

SELECT 1 FROM queue
WHERE queue_id = ? AND processing_pid = ?
FOR UPDATE;

-- confirm that it exists, then archive the row before deleting it

INSERT INTO queue_history (queue_id, processing_pid, processing_start,
    processing_complete, processing_status, foreign_id)
SELECT queue_id, processing_pid, processing_start, now(),
    (SELECT queue_status_id FROM queue_status WHERE name = 'done'),
    foreign_id
FROM queue
WHERE queue_id = ?;

DELETE FROM queue WHERE queue_id = ?;

COMMIT;

-- a separate process reaps orphaned entries should processing fail.

BEGIN;

SELECT queue_id, processing_pid FROM queue
WHERE now() - processing_start > 'some reasonable interval'::interval
AND processing_status = (SELECT queue_status_id FROM queue_status
    WHERE name = 'active')
FOR UPDATE;

-- for each entry, check to see if the PID is still running;
-- if it is not, return the entry to the queue

UPDATE queue
SET
    processing_pid = NULL,
    processing_start = NULL,
    processing_status = (SELECT queue_status_id FROM queue_status
        WHERE name = 'pending')
WHERE queue_id = ?;

COMMIT;

There are more complicated approaches available. If you plan to have
multiple machines processing, you probably want to add a processing_node
column too, since a PID only identifies a process on one machine.
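
The statements above assume that the queue, queue_status and queue_history
tables already exist. As a rough sketch only, they could be defined along the
following lines. The column types, the NOT NULL and REFERENCES constraints,
the text type chosen for processing_node and the index name
queue_pending_idx are all assumptions made for illustration; only the table
and column names come from the queries above.

```sql
-- Lookup table for the status names used in the queries.
CREATE TABLE queue_status (
    queue_status_id serial PRIMARY KEY,
    name            text NOT NULL UNIQUE
);

INSERT INTO queue_status (name) VALUES ('pending');
INSERT INTO queue_status (name) VALUES ('active');
INSERT INTO queue_status (name) VALUES ('done');

-- Work waiting to be claimed or currently being processed.
CREATE TABLE queue (
    -- serial ids follow insertion order closely enough for ORDER BY queue_id
    -- to behave as a FIFO; strict commit order is not guaranteed
    queue_id          serial PRIMARY KEY,
    foreign_id        integer NOT NULL,  -- assumed: key of the record to process
    processing_status integer NOT NULL
                      REFERENCES queue_status (queue_status_id),
    processing_pid    integer,           -- NULL until a consumer claims the row
    processing_node   text,              -- assumed: host name of the claiming machine
    processing_start  timestamp with time zone
);

-- Finished work, copied here when an entry is removed from the queue.
CREATE TABLE queue_history (
    queue_id            integer PRIMARY KEY,
    foreign_id          integer NOT NULL,
    processing_status   integer NOT NULL
                        REFERENCES queue_status (queue_status_id),
    processing_pid      integer,
    processing_node     text,
    processing_start    timestamp with time zone,
    processing_complete timestamp with time zone NOT NULL
);

-- Helps the "oldest pending entry" lookup as the queue grows.
CREATE INDEX queue_pending_idx ON queue (processing_status, queue_id);
```

With a processing_node column in place, the claiming UPDATE would set it next
to processing_pid, and the reaper would check each PID on the machine named
in that column rather than locally.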


--
Andrew Hammond    416-673-4138    ahammond@ca.afilias.info
Database Administrator, Afilias Canada Corp.
CB83 2838 4B67 D40F D086 3568 81FC E7E5 27AF 4A9A


Re: Implementing queue semantics (novice)

From
KÖPFERL Robert
Date:
Thank you for this rather detailed example.
I have already learned something and avoided a mistake. There should be
enough here to implement such a queue.  :-)


> -----Original Message-----
> From: Andrew Hammond [mailto:ahammond@ca.afilias.info]
> Sent: Wednesday, 12 January 2005 17:19
> To: KÖPFERL Robert
> Cc: 'pgsql-sql@postgresql.org'
> Subject: Re: [SQL] Implementing queue semantics (novice)