Re: Serialization, Locking...implement processing Queue with a table - Mailing list pgsql-general

From D. Dante Lorenso
Subject Re: Serialization, Locking...implement processing Queue with a table
Date
Msg-id 04ad01c318bc$a7c798e0$1564a8c0@ROMULUS
In response to Serialization, Locking...implement processing Queue with a table  ("D. Dante Lorenso" <dante@lorenso.com>)
Responses Re: Serialization, Locking...implement processing Queue with a table
List pgsql-general
Tom,

Thanks for the excellent reply.  I was tossing solutions
back and forth and came across this one, but I don't like
the idea of failing on a transaction and having to retry
it after a delay, so I've come up with this...

What do you think of my alternative solution?

In Java, I have a function like this which begins a transaction,
locks a dummy table exclusively, and then runs the stored
procedure to reserve the next record for processing:

---------- 8< -------------------- 8< --------------------
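// Uses java.sql.Connection, PreparedStatement, ResultSet, SQLException, Statement.
public int reserveQueuedImport(int pid) throws SQLException {
    Connection conn = LeadDBConnection.getConnection();

    // Lock and reserve inside one explicit transaction.
    conn.setAutoCommit(false);
    try (Statement lock = conn.createStatement();
         PreparedStatement pstmt = conn.prepareStatement(
             "SELECT reserve_next_import(?) AS import_id")) {
        // Other callers block here until this transaction ends.
        lock.execute("LOCK TABLE import_lock IN EXCLUSIVE MODE");

        // Reserve an import (for processing), and return its ID.
        pstmt.setInt(1, pid);
        try (ResultSet rec = pstmt.executeQuery()) {
            // get the value from the first row and first column
            rec.next();
            int importId = rec.getInt(1);
            conn.commit();  // releases the table lock
            return importId;
        }
    } catch (SQLException e) {
        conn.rollback();  // also releases the table lock
        throw e;
    }
}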


---------- 8< -------------------- 8< --------------------

Meanwhile, the PL/PGSQL stored procedure looks like this:

---------- 8< -------------------- 8< --------------------
CREATE OR REPLACE FUNCTION reserve_next_import (bigint)
RETURNS bigint AS'
DECLARE
    processor_id ALIAS FOR $1;
    my_import_id BIGINT;
BEGIN
    -- initialize the id
    my_import_id := -1;

    -- Find the import ID we wish to reserve and get a lock on that row
    SELECT import_id INTO my_import_id
    FROM import
    WHERE import_state = ''Q''
    AND import_processor_id IS NULL
    ORDER BY import_id
    LIMIT 1
    FOR UPDATE;

    -- abort if there are no queued rows
    IF NOT FOUND THEN
        RETURN (-1);
    END IF;

    -- now go reserve the record with our processor id
    UPDATE import SET
        import_processor_id = processor_id,
        import_prc_start = NULL,
        import_prc_end = NULL,
        import_state = ''R''
    WHERE import_id = my_import_id;

    -- this is the row we reserved...
    RETURN (my_import_id);
END;
'LANGUAGE 'plpgsql';
---------- 8< -------------------- 8< --------------------

What I've done is use
'LOCK TABLE import_lock IN EXCLUSIVE MODE;' to create a
'synchronized' block around the code which reserves the
item in the queue.  This way, only one application or
thread can run the PL/PGSQL function at a given time.
Applications will BLOCK while they wait at the
LOCK call, but that's more desirable than the Fail/Retry
approach, eh?
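
To make the 'synchronized' analogy concrete, here is a minimal
in-memory Java sketch.  No database is involved: a ReentrantLock
stands in for the LOCK on import_lock and an array stands in for
the import table.  The names ReserveSketch, reserveNext and demo
are made up for illustration, and the real behavior still depends
on PostgreSQL's own lock semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/** In-memory analogy only; every name here is hypothetical. */
class ReserveSketch {
    // Stands in for the empty import_lock table.
    private final ReentrantLock importLock = new ReentrantLock();
    // Stands in for the import table: index = import_id,
    // value = reserving processor id, or null while still queued.
    private final Integer[] processorIds;

    ReserveSketch(int queuedRows) {
        processorIds = new Integer[queuedRows];
    }

    /** Mirrors reserve_next_import: returns the reserved id, or -1 if none is queued. */
    int reserveNext(int pid) {
        importLock.lock();            // other callers block here, like the LOCK TABLE call
        try {
            for (int id = 0; id < processorIds.length; id++) {
                if (processorIds[id] == null) {
                    processorIds[id] = pid;   // reserve the lowest queued id
                    return id;
                }
            }
            return -1;                // nothing left in the queue
        } finally {
            importLock.unlock();      // analogous to the lock being released at COMMIT
        }
    }

    /** Drains the queue with several threads; true if every id was handed out exactly once. */
    static boolean demo(int rows, int workers) {
        ReserveSketch queue = new ReserveSketch(rows);
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicBoolean duplicate = new AtomicBoolean(false);
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            final int pid = w + 1;
            Thread t = new Thread(() -> {
                int id;
                while ((id = queue.reserveNext(pid)) != -1) {
                    if (!seen.add(id)) {
                        duplicate.set(true);  // same id returned twice
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !duplicate.get() && seen.size() == rows;
    }
}
```

Calling ReserveSketch.demo(100, 8) drains a 100-row queue with
eight threads and reports whether each id was reserved exactly
once.  Waiting callers simply block on the lock, so there is
nothing to retry.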

Can you confirm that this solution will perform as I expect
while keeping the transaction isolation level at
READ COMMITTED instead of SERIALIZABLE?

Oh, yeah, and as a note, the only purpose of the
'import_lock' table is to provide an object to LOCK on for
this code.  This table is empty and is not used for any other
purpose.  Are there any other 'lighter' objects I can create
or lock on in PostgreSQL than a table like this?

Dante

D. Dante Lorenso
dante@lorenso.com
972-333-4139


----- Original Message -----
From: "Tom Lane" <tgl@sss.pgh.pa.us>
To: "D. Dante Lorenso" <dante@lorenso.com>
Cc: <pgsql-general@postgresql.org>
Sent: Monday, May 12, 2003 9:23 AM
Subject: Re: [GENERAL] Serialization, Locking...implement processing Queue
with a table


> "D. Dante Lorenso" <dante@lorenso.com> writes:
> > How should I go about implementing a synchronized process id
> > queue that will select one unique row from a table at a time
> > and make the selection be safe with concurrent accesses?
>
> You can find various discussions of this in the archives, but a
> reasonable way to proceed is:
>
> 1. The table of pending or in-process jobs has a column "processor_id"
> that is zero for pending jobs and equal to the (unique) processor number
> for active jobs.  (Assume for the moment that completed jobs are removed
> from the table entirely.)
>
> 2. When idle, you try to reserve a job like so:
>
> BEGIN;
> SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
> SELECT job_id, ... FROM job_table
>     WHERE processor_id = 0 LIMIT 1 FOR UPDATE;
>
> The SELECT has three possible outcomes:
>
> 2a: One row is returned.  You do
>
> UPDATE job_table SET processor_id = $me
>     WHERE job_id = $jobid;
> COMMIT;
>
> and then go about executing the job.  When done, delete the row from
> job_table and try to get another one.
>
> 2b: No row is returned: no jobs are pending.  Commit your transaction,
> sleep for an appropriate delay period, and try again.
>
> 2c: You get a "can't serialize" failure.  This will happen if two
> processors try to reserve the same row at the same time.  In this case,
> roll back your transaction, sleep for a short interval (maybe a few
> msec) and try again.  You don't want to sleep as long as normal in this
> case, since there might be another available job.
>
> (Note that you *cannot* do this in a plpgsql function, since it cannot
> start or commit a transaction; these commands have got to be directly
> issued by the application.)
>
>
> Assuming that there aren't a vast number of pending jobs at any time,
> this should work pretty well without even bothering with an index on
> job_table.  You will want to vacuum it often though (at least every few
> hundred job completions, I'd think).
>
> Now, what if you wanted to remember completed jobs?  I'd actually
> recommend transferring the records of completed jobs to a different
> table.  But if you really want to keep them in the same table, maybe
> add a boolean "completed" field, and make the initial SELECT be
>
> SELECT job_id, ... FROM job_table
>     WHERE processor_id = 0 AND NOT completed LIMIT 1 FOR UPDATE;
>
> Now you *will* need an index to keep things speedy.  I'd try a partial
> index on processor_id with condition "NOT completed".  You'll still
> need frequent vacuums.
>
> regards, tom lane
>
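
For comparison, the Fail/Retry rule in step 2c of the quoted
message could be sketched in Java roughly as follows.  This is
only an illustration under assumptions: the Attempt interface,
reserveWithRetry, maxTries and backoffMillis are hypothetical
names, and it assumes the driver reports the "can't serialize"
failure with SQLSTATE 40001 (serialization_failure), which may
not hold for every server or driver version.

```java
import java.sql.SQLException;

/** Hypothetical retry helper; not part of JDBC or PostgreSQL. */
class SerializableRetry {
    /**
     * One complete reservation attempt.  The implementation is assumed to
     * run BEGIN ... COMMIT itself and to roll back before rethrowing.
     */
    interface Attempt {
        long run() throws SQLException;
    }

    // SQLSTATE for serialization_failure (assumed to be reported by the driver).
    static final String SERIALIZATION_FAILURE = "40001";

    /** Retries only on serialization failures, sleeping briefly in between. */
    static long reserveWithRetry(Attempt attempt, int maxTries, long backoffMillis)
            throws SQLException {
        for (int tries = 1; ; tries++) {
            try {
                return attempt.run();
            } catch (SQLException e) {
                boolean retryable = SERIALIZATION_FAILURE.equals(e.getSQLState());
                if (!retryable || tries >= maxTries) {
                    throw e;          // a different error, or out of attempts
                }
                try {
                    Thread.sleep(backoffMillis);   // short pause, a few msec
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```

The "no row returned" case (2b) is deliberately left to the
caller, since it calls for a longer sleep than a serialization
failure does.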

