
PostgreSQL as advanced job queuing system

From: ushi
Hello List,

i am playing with the idea to implement a job queuing system using PostgreSQL. To meet requirements the system needs to offer some advanced features compared to "classic" queuing systems:

- users can create new queues at any time
- queues have priorities
- priorities of queues can change at any time
- jobs in queues with the highest priority should be processed first

A simple schema could look like this:

    create table queues (
      id integer primary key,
      priority integer not null default 0
    );

    create table jobs (
      id serial primary key,
      queue_id integer not null references queues(id)
    );

Right now i am stuck writing an efficient query for dequeuing. This is what i got so far:

    insert into queues (id, priority) values (1, 0), (2, 1), (3, 1);
    insert into jobs (queue_id) select 1 from generate_series(1, 1000000);
    insert into jobs (queue_id) select 2 from generate_series(1, 1000000);
    insert into jobs (queue_id) select 3 from generate_series(1, 1000000);

Here is a simple dequeuing query that is super efficient, but ignores priority:

    select * from jobs limit 1 for update of jobs skip locked;
    --  id | queue_id
    -- ----+----------
    --   1 |        1
    -- (1 row)
    --
    -- Time: 2.641 ms

This is my naive query obeying priority. This is very slow and i could not get it any faster:

    select *
    from jobs
    join queues on queues.id = jobs.queue_id
    order by priority desc
    limit 1
    for update of jobs skip locked;
    --    id    | queue_id | id | priority
    -- ---------+----------+----+----------
    --  1000001 |        2 |  2 |        1
    -- (1 row)
    --
    -- Time: 1116.631 ms (00:01.117)

Here is the query plan:

    --                                                                QUERY PLAN
    -- -----------------------------------------------------------------------------------------------------------------------------------------
    --  Limit  (cost=66225.61..66225.63 rows=1 width=28) (actual time=1333.139..1333.142 rows=1 loops=1)
    --    ->  LockRows  (cost=66225.61..103725.61 rows=3000000 width=28) (actual time=1333.137..1333.139 rows=1 loops=1)
    --          ->  Sort  (cost=66225.61..73725.61 rows=3000000 width=28) (actual time=1333.110..1333.112 rows=1 loops=1)
    --                Sort Key: queues.priority DESC
    --                Sort Method: external merge  Disk: 111568kB
    --                ->  Hash Join  (cost=60.85..51225.61 rows=3000000 width=28) (actual time=0.064..660.317 rows=3000000 loops=1)
    --                      Hash Cond: (jobs.queue_id = queues.id)
    --                      ->  Seq Scan on jobs  (cost=0.00..43275.00 rows=3000000 width=14) (actual time=0.027..253.868 rows=3000000 loops=1)
    --                      ->  Hash  (cost=32.60..32.60 rows=2260 width=14) (actual time=0.021..0.022 rows=3 loops=1)
    --                            Buckets: 4096  Batches: 1  Memory Usage: 33kB
    --                            ->  Seq Scan on queues  (cost=0.00..32.60 rows=2260 width=14) (actual time=0.011..0.013 rows=3 loops=1)
    --  Planning Time: 0.430 ms
    --  Execution Time: 1347.448 ms
    -- (13 rows)

Any ideas for a more efficient implementation?

Thank you for your time,
ushi




Re: PostgreSQL as advanced job queuing system

From: Dominique Devienne
On Fri, Mar 22, 2024 at 12:58 PM ushi <ushi@mailbox.org> wrote:
> i am playing with the idea to implement a job queuing system using PostgreSQL.

FYI, two bookmarks I have on this subject, that I plan to revisit eventually:
* https://news.ycombinator.com/item?id=20020501
* https://www.enterprisedb.com/blog/listening-postgres-how-listen-and-notify-syntax-promote-high-availability-application-layer

If others have links to good articles on this subject, or good input to give in this thread, I'd be interested. Thanks, --DD

Re: PostgreSQL as advanced job queuing system

From: Robert Treat
On Fri, Mar 22, 2024 at 8:05 AM Dominique Devienne <ddevienne@gmail.com> wrote:
>
> On Fri, Mar 22, 2024 at 12:58 PM ushi <ushi@mailbox.org> wrote:
>>
>> i am playing with the idea to implement a job queuing system using PostgreSQL.
>
>
> FYI, two bookmarks I have on this subject, that I plan to revisit eventually:
> * https://news.ycombinator.com/item?id=20020501
> * https://www.enterprisedb.com/blog/listening-postgres-how-listen-and-notify-syntax-promote-high-availability-application-layer
>
> If others have links to good articles on this subject, or good input to give in this thread, I'd be interested. Thanks, --DD

This is a well worn topic within the postgres community, with a number
of different implementations, but a couple of links that are probably
worth looking at would be:
- https://wiki.postgresql.org/wiki/PGQ_Tutorial, probably the first
queue system that gained wide adoption
- https://brandur.org/river, a new queue system based on postgres/go,
which also has a link to an article about why the authors had ditched
postgres-based queueing in favor of Redis some years before, which is
worth a read to understand some of the issues that Postgres has as the
basis for a queue system.

And yeah, I suspect this may become a hot topic again now that Redis
is moving away from open source:
https://redis.com/blog/redis-adopts-dual-source-available-licensing/

Robert Treat
https://xzilla.net



Re: PostgreSQL as advanced job queuing system

From: "Peter J. Holzer"
On 2024-03-22 12:43:40 +0100, ushi wrote:
> i am playing with the idea to implement a job queuing system using
> PostgreSQL. To meet requirements the system needs to offer some advanced
> features compared to "classic" queuing systems:
>
> - users can create new queues at any time
> - queues have priorities
> - priorities of queues can change at any time
> - jobs in queues with the highest priority should be processed first
>
> A simple schema could look like this:
>
>    create table queues (
>      id integer primary key,
>      priority integer not null default 0
>    );
>
>    create table jobs (
>      id serial primary key,
>      queue_id integer not null references queues(id)
>    );
>
> Right now i am stuck writing an efficient query for dequeuing. This is what i got so far:
>
>    insert into queues (id, priority) values (1, 0), (2, 1), (3, 1);
>    insert into jobs (queue_id) select 1 from generate_series(1, 1000000);
>    insert into jobs (queue_id) select 2 from generate_series(1, 1000000);
>    insert into jobs (queue_id) select 3 from generate_series(1, 1000000);
>
[...]
>
> This is my naive query obeying priority. This is very slow and i could not get it any faster:
>
>    select *
>    from jobs
>    join queues on queues.id = jobs.queue_id
>    order by priority desc
>    limit 1
>    for update of jobs skip locked;
>    --    id    | queue_id | id | priority
>    -- ---------+----------+----+----------
>    --  1000001 |        2 |  2 |        1
>    -- (1 row)
>    --
>    -- Time: 1116.631 ms (00:01.117)
>
> Here is the query plan:
>
>   --                                                                QUERY PLAN
>   -- -----------------------------------------------------------------------------------------------------------------------------------------
>   --  Limit  (cost=66225.61..66225.63 rows=1 width=28) (actual time=1333.139..1333.142 rows=1 loops=1)
>   --    ->  LockRows  (cost=66225.61..103725.61 rows=3000000 width=28) (actual time=1333.137..1333.139 rows=1 loops=1)
>   --          ->  Sort  (cost=66225.61..73725.61 rows=3000000 width=28) (actual time=1333.110..1333.112 rows=1 loops=1)
>   --                Sort Key: queues.priority DESC
>   --                Sort Method: external merge  Disk: 111568kB
                                   ^^^^^^^^^^^^^^^^^^^^
                                   You might want to increase work_mem.
>   --                ->  Hash Join  (cost=60.85..51225.61 rows=3000000 width=28) (actual time=0.064..660.317 rows=3000000 loops=1)
[...]
>   --  Planning Time: 0.430 ms
>   --  Execution Time: 1347.448 ms
>   -- (13 rows)

Is 3 queues with 1 million jobs each a realistic scenario in your case?

If you have 3000 queues with 1000 jobs each instead the planner can use
an index on queues(priority):

hjp=> explain(analyze)   select *
   from jobs
   join queues on queues.id = jobs.queue_id
   order by priority desc, jobs.id asc
   limit 1
;

                                                              QUERY PLAN
─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
 Limit  (cost=1222.42..1222.47 rows=1 width=16) (actual time=9.128..9.129 rows=1 loops=1)
   ->  Incremental Sort  (cost=1222.42..159673.21 rows=3000000 width=16) (actual time=9.127..9.127 rows=1 loops=1)
         Sort Key: queues.priority DESC, jobs.id
         Presorted Key: queues.priority
         Full-sort Groups: 1  Sort Method: top-N heapsort  Average Memory: 25kB  Peak Memory: 25kB
         Pre-sorted Groups: 1  Sort Method: top-N heapsort  Average Memory: 25kB  Peak Memory: 25kB
         ->  Nested Loop  (cost=0.71..107171.21 rows=3000000 width=16) (actual time=0.037..5.824 rows=27001 loops=1)
               ->  Index Scan Backward using queues_priority_idx on queues  (cost=0.28..121.21 rows=3000 width=8) (actual time=0.018..0.059 rows=28 loops=1)
               ->  Index Only Scan using jobs_queue_id_id_idx on jobs  (cost=0.43..25.68 rows=1000 width=8) (actual time=0.011..0.119 rows=964 loops=28)
                     Index Cond: (queue_id = queues.id)
                     Heap Fetches: 0
 Planning Time: 0.362 ms
 Execution Time: 9.158 ms
(13 rows)

hjp=> \d queues
                 Table "public.queues"
╔══════════╤═════════╤═══════════╤══════════╤═════════╗
║  Column  │  Type   │ Collation │ Nullable │ Default ║
╟──────────┼─────────┼───────────┼──────────┼─────────╢
║ id       │ integer │           │ not null │         ║
║ priority │ integer │           │ not null │ 0       ║
╚══════════╧═════════╧═══════════╧══════════╧═════════╝
Indexes:
    "queues_pkey" PRIMARY KEY, btree (id)
    "queues_priority_idx" btree (priority)
Referenced by:
    TABLE "jobs" CONSTRAINT "jobs_queue_id_fkey" FOREIGN KEY (queue_id) REFERENCES queues(id)

hjp=> \d jobs
                              Table "public.jobs"
╔══════════╤═════════╤═══════════╤══════════╤══════════════════════════════════╗
║  Column  │  Type   │ Collation │ Nullable │             Default              ║
╟──────────┼─────────┼───────────┼──────────┼──────────────────────────────────╢
║ id       │ integer │           │ not null │ nextval('jobs_id_seq'::regclass) ║
║ queue_id │ integer │           │ not null │                                  ║
╚══════════╧═════════╧═══════════╧══════════╧══════════════════════════════════╝
Indexes:
    "jobs_pkey" PRIMARY KEY, btree (id)
    "jobs_queue_id_id_idx" btree (queue_id, id)
    "jobs_queue_id_idx" btree (queue_id)
Foreign-key constraints:
    "jobs_queue_id_fkey" FOREIGN KEY (queue_id) REFERENCES queues(id)
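For reference, a sketch of the DDL behind the plan above. The index names match the \d output; note that the dequeue query below adds FOR UPDATE ... SKIP LOCKED, which was not part of the EXPLAIN shown and may change the chosen plan:

```sql
-- Indexes that enable the backward index scan + incremental sort:
create index queues_priority_idx on queues (priority);
create index jobs_queue_id_id_idx on jobs (queue_id, id);

-- Priority-ordered dequeue; SKIP LOCKED lets concurrent workers
-- pass over rows another transaction has already claimed.
select jobs.*
from jobs
join queues on queues.id = jobs.queue_id
order by queues.priority desc, jobs.id asc
limit 1
for update of jobs skip locked;
```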


If you do have very few very long queues it might be faster to query
each queue separately.
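The closing remark — querying each queue separately — might be sketched like this; the function name and NULL-on-empty convention are illustrative, not from the thread:

```sql
-- Hypothetical sketch: walk queues from highest priority down and
-- return the first job we can lock; NULL means nothing is dequeuable.
create or replace function dequeue_one() returns integer
language plpgsql as $$
declare
  q record;
  job_id integer;
begin
  for q in select id from queues order by priority desc loop
    select j.id into job_id
    from jobs j
    where j.queue_id = q.id
    order by j.id
    limit 1
    for update skip locked;
    if found then
      return job_id;  -- caller processes/deletes it in the same transaction
    end if;
  end loop;
  return null;
end;
$$;
```

Each per-queue probe is a cheap scan on jobs(queue_id, id), so the cost grows with the number of queues rather than the number of jobs.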

        hp

--
   _  | Peter J. Holzer    | Story must make more sense than reality.
|_|_) |                    |
| |   | hjp@hjp.at         |    -- Charles Stross, "Creative writing
__/   | http://www.hjp.at/ |       challenge!"


Re: PostgreSQL as advanced job queuing system

From: Fred Habash
We developed a home-grown queue system using Postgres, but its performance was largely hindered by queue tables bloating and the need to continuously vacuum them. It did not scale whatsoever. With some workarounds, we ended up designing three sets of queue tables, switching between them based on some queue stats, vacuuming the inactive set, and repeating.
We kept this queue system for low-SLA app components. For others, we switched to Kafka. Investing in learning and implementing purpose-built queue systems pays off for the long term.
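The three-table rotation described above could look roughly like this; all names are hypothetical, and the switching logic based on "queue stats" is elided:

```sql
-- Hypothetical sketch of rotating queue tables: writers insert into
-- whichever table is currently marked active, so the inactive tables
-- can be vacuumed (or truncated once drained) without slowing the queue.
create table jobs_a (id bigserial primary key, queue_id integer not null);
create table jobs_b (like jobs_a including all);
create table jobs_c (like jobs_a including all);

create table active_jobs_table (tablename text not null);
insert into active_jobs_table values ('jobs_a');

-- A maintenance job would periodically rotate, e.g.:
--   update active_jobs_table set tablename = 'jobs_b';
--   vacuum jobs_a;   -- or: truncate jobs_a; once fully drained
```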

On Fri, Mar 22, 2024 at 8:31 AM Robert Treat <rob@xzilla.net> wrote:
> On Fri, Mar 22, 2024 at 8:05 AM Dominique Devienne <ddevienne@gmail.com> wrote:
> >
> > On Fri, Mar 22, 2024 at 12:58 PM ushi <ushi@mailbox.org> wrote:
> >>
> >> i am playing with the idea to implement a job queuing system using PostgreSQL.
> >
> >
> > FYI, two bookmarks I have on this subject, that I plan to revisit eventually:
> > * https://news.ycombinator.com/item?id=20020501
> > * https://www.enterprisedb.com/blog/listening-postgres-how-listen-and-notify-syntax-promote-high-availability-application-layer
> >
> > If others have links to good articles on this subject, or good input to give in this thread, I'd be interested. Thanks, --DD
>
> This is a well worn topic within the postgres community, with a number
> of different implementations, but a couple of links that are probably
> worth looking at would be:
> - https://wiki.postgresql.org/wiki/PGQ_Tutorial, probably the first
> queue system that gained wide adoption
> - https://brandur.org/river, a new queue system based on postgres/go,
> which also has a link to an article about why the authors had ditched
> postgres based queueing in favor of redis some years before which is
> worth a read to understand some of the issues that Postgres has as the
> basis for a queue system.
>
> And yeah, I suspect this may become a hot topic again now that Redis
> is moving away from open source:
> https://redis.com/blog/redis-adopts-dual-source-available-licensing/
>
> Robert Treat
> https://xzilla.net




--

----------------------------------------
Thank you


Re: PostgreSQL as advanced job queuing system

From: Thiemo Kellner

On 22.03.2024 at 14:15, Fred Habash wrote:
> We developed a home-grown queue system using Postgres, but its 
> performance was largely hindered by que tables bloating and the need to 
> continuously vacuum them. It did not scale whatsoever. With some 
> workarounds, we ended up designing three sets of queue tables, switching 
> between them based on some queue stats, vacuum the inactive set, and repeat.
> We kept this queue system for low SLA app components. For others, we 
> switched to Kafka. Investing in learning and implementing purpose built 
> queue systems pays off for the long term.

I wonder whether one should, per https://youtu.be/VEWXmdjzIpQ&t=543, choose not to 
scale either.



Re: PostgreSQL as advanced job queuing system

From: Shaheed Haque
Generally, I'd suggest you think carefully about the nature of the jobs, and draw up a list of must-have properties (performance of course, but also things like whether jobs have to survive planned or unplanned outages, be visible across a WAN, numbers of readers and writers, delivery guarantees, etc etc) and then decide on make versus "buy". Distributed systems are hard, and hard to get right. 

On Fri, 22 Mar 2024, 16:17 Thiemo Kellner, <thiemo@gelassene-pferde.biz> wrote:


On 22.03.2024 at 14:15, Fred Habash wrote:
> We developed a home-grown queue system using Postgres, but its
> performance was largely hindered by que tables bloating and the need to
> continuously vacuum them. It did not scale whatsoever. With some
> workarounds, we ended up designing three sets of queue tables, switching
> between them based on some queue stats, vacuum the inactive set, and repeat.
> We kept this queue system for low SLA app components. For others, we
> switched to Kafka. Investing in learning and implementing purpose built
> queue systems pays off for the long term.

I wonder whether one should, per https://youtu.be/VEWXmdjzIpQ&t=543, choose not to
scale either.


Re: PostgreSQL as advanced job queuing system

From: Merlin Moncure
On Fri, Mar 22, 2024 at 6:58 AM ushi <ushi@mailbox.org> wrote:
> Hello List,
>
> i am playing with the idea to implement a job queuing system using PostgreSQL. To meet requirements the system needs to offer some advanced
> features compared to "classic" queuing systems:
>
> - users can create new queues at any time
> - queues have priorities
> - priorities of queues can change at any time
> - jobs in queues with the highest priority should be processed first

You can definitely do this.  I wrote an enterprise scheduler, called pgtask, which orchestrates a very large amount of work each night.  It's running a distributed enterprise analytics batch environment in the vein of Airflow.  Here's a couple of screenshots.  It's a single-threaded stored procedure architecture that uses dblink calls to distribute the work.



I'm running at a pretty high scale and it's reliable.  It's not really positioned as a proper queue, but more of a graph orchestration system, and it supports realtime and quasi-realtime.  The main bottleneck is that dblink does not have an epoll-style 'wait for first to finish or fail' (it really needs one), forcing a loop in SQL which bounds the active connections a bit.

Being able to manage the state explicitly in the database is wonderful, if you know what you're doing.

merlin

Re: PostgreSQL as advanced job queuing system

From: Dominique Devienne
On Sat, Mar 23, 2024 at 3:13 AM Merlin Moncure <mmoncure@gmail.com> wrote:
> On Fri, Mar 22, 2024 at 6:58 AM ushi <ushi@mailbox.org> wrote:
>> the idea to implement a job queuing system using PostgreSQL.
>
> I wrote an enterprise scheduler, called pgtask, which orchestrates a very large amount of work [...]

Hi. Anything you can share? OSS? Doesn't look like it...
If it's not, a more detailed higher-level architecture overview would be nice.
 
> I'm running at a pretty high scale and it's reliable. [...]

Sounds great.
 
> Being able to manage the state explicitly in the database is wonderful, if you know what you're doing.

Indeed. I want to reduce the complexity of the stack as much as possible, and rely only on PostgreSQL, if possible.
We don't need a super duper queue or full distributed system. So a PostgreSQL-based solution ought to be enough. --DD

Re: PostgreSQL as advanced job queuing system

From: Merlin Moncure
On Mon, Mar 25, 2024 at 4:43 AM Dominique Devienne <ddevienne@gmail.com> wrote:
> On Sat, Mar 23, 2024 at 3:13 AM Merlin Moncure <mmoncure@gmail.com> wrote:
>> On Fri, Mar 22, 2024 at 6:58 AM ushi <ushi@mailbox.org> wrote:
>>> the idea to implement a job queuing system using PostgreSQL.
>>
>> I wrote an enterprise scheduler, called pgtask, which orchestrates a very large amount of work [...]
>
> Hi. Anything you can share? OSS? Doesn't look like it...
> If it's not, a more detailed higher-level architecture overview would be nice.

let me float that, I would love to project-ize this.  Stay tuned

merlin

Re: PostgreSQL as advanced job queuing system

From: Justin Clift
On 2024-03-25 23:44, Merlin Moncure wrote:
> On Mon, Mar 25, 2024 at 4:43 AM Dominique Devienne <ddevienne@gmail.com> wrote:
<snip>
>> Hi. Anything you can share? OSS? Doesn't look like it...
>> If it's not, a more detailed higher-level architecture overview would be nice.
> 
> let me float that, I would love to project-ize this.  Stay tuned

Hopefully it gets approved.  More battle-tested queue systems are welcome,
especially those that have been used at non-trivial scales. :D

+ Justin