Thread: Autovacuum launcher patch

Autovacuum launcher patch

From: Alvaro Herrera

Hello,

This patch separates autovacuum into two families of processes: one is the
"launcher", in charge of examining statistics and deciding when to start
a worker.  The other is the worker, which is started by the postmaster
under command of the launcher, and processes what the launcher tells it
to process (by way of setting info up in shared memory).

The postmaster treats workers as regular backends; they are listed in
the backend list, so when it wants to shut down, it'll send them SIGTERM
just as it does to every other backend, meaning it'll Just Work(tm).

The launcher is a dummy process; it never connects to any database.
Right now, the scheduling is more or less the same as before: it'll only
start a single worker, which will process a whole database.  Or rather,
all tables in it that are determined to need vacuuming, per the old
rules.  Currently, the launcher first examines the last autovacuum time
to determine which database to vacuum; the worker then examines the
stats to determine which tables to vacuum.  Eventually this will need to
be changed so that the launcher tells the worker exactly what table to
work on.
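
As a rough sketch of the first half of that two-step scheme, the launcher
could pick the database whose last autovacuum is oldest.  The "oldest
first" rule, the DbEntry struct and pick_database are assumptions made for
illustration; the mail only says the launcher looks at the last autovacuum
time.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <time.h>

/* Hypothetical per-database record; field names are illustrative only. */
typedef struct
{
    uint32_t    oid;                /* database OID */
    time_t      last_autovac_time;  /* 0 means "never vacuumed" */
} DbEntry;

/*
 * Return the entry with the oldest last autovacuum time, or NULL when the
 * list is empty.  On ties the earlier entry wins.
 */
static const DbEntry *
pick_database(const DbEntry *dbs, size_t n)
{
    const DbEntry *best = NULL;

    assert(dbs != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
    {
        if (best == NULL || dbs[i].last_autovac_time < best->last_autovac_time)
            best = &dbs[i];
    }
    return best;
}
```

The worker would then connect to the chosen database and decide per table,
which is the part that needs catalog and statistics access.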

I've been wondering how to make the scheduling work in the future, when
we need to have the launcher read stuff from catalogs to configure the
scheduling ...  Maybe the solution will be to store flatfiles based on
the catalogs, like we do for pg_database and pg_authid.

Comments are welcome.

--
Alvaro Herrera                                http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.

Re: Autovacuum launcher patch

From: Tom Lane

Alvaro Herrera <alvherre@commandprompt.com> writes:
> The launcher is a dummy process; it never connects to any database.
> ...  Eventually this will need to
> be changed so that the launcher tells the worker exactly what table to
> work on.

I detect a certain lack of clarity of thinking here.  Either the
launcher can read databases or it can't.  Do you intend to solve the
problem of all the transaction/catcache infrastructure being designed
on the assumption of being in exactly one database?

I'd suggest sticking to something closer to the current two-phase design
where you make some preliminary decision which database to send a worker
to, and then the worker determines exactly what to do once it can look
around inside the DB.  Possibly we need some back-signaling mechanism
whereby a worker can tell the launcher "hey boss, send help" if it sees
that there are enough tables that need work, but I'm unenthused about
having the launcher figure that out itself.

            regards, tom lane

Re: Autovacuum launcher patch

From: Alvaro Herrera

Tom Lane wrote:
> Alvaro Herrera <alvherre@commandprompt.com> writes:
> > The launcher is a dummy process; it never connects to any database.
> > ...  Eventually this will need to
> > be changed so that the launcher tells the worker exactly what table to
> > work on.
>
> I detect a certain lack of clarity of thinking here.  Either the
> launcher can read databases or it can't.  Do you intend to solve the
> problem of all the transaction/catcache infrastructure being designed
> on the assumption of being in exactly one database?

I had the same thought, but then I realized that most of the needed data
is actually stored in pgstat, so we don't need to connect to any
database to get it.  (Hmm, except pg_class.reltuples).
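
The reltuples caveat matters because, per the PostgreSQL documentation,
the vacuum trigger compares the number of dead tuples against a base
threshold plus a scale factor times the table's tuple count.  A small
sketch of that check follows; needs_vacuum and its parameter names are
made up here, with base_threshold and scale_factor standing in for the
autovacuum_vacuum_threshold and autovacuum_vacuum_scale_factor settings.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * dead_tuples would come from the stats collector; reltuples is the one
 * input that lives in pg_class, i.e. inside the database.
 */
static bool
needs_vacuum(double dead_tuples, double reltuples,
             double base_threshold, double scale_factor)
{
    double      threshold;

    assert(reltuples >= 0.0);
    threshold = base_threshold + scale_factor * reltuples;
    return dead_tuples > threshold;
}
```

With illustrative values of 50 and 0.2, a table of 1000 tuples would be
vacuumed once more than 250 of them are dead.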

What will probably live in databases will be the scheduling catalogs,
but I think I suggested that we could solve that problem by storing the
contents of those in plain text files, like pg_database.

I don't think this is a fundamental problem with the current patch
though.  I've refrained from committing it mostly because I'd like
someone else to eyeball it just for safety, so I'll still allow for
several days to pass (unless there is a rush for getting it in ...)

> I'd suggest sticking to something closer to the current two-phase design
> where you make some preliminary decision which database to send a worker
> to, and then the worker determines exactly what to do once it can look
> around inside the DB.  Possibly we need some back-signaling mechanism
> whereby a worker can tell the launcher "hey boss, send help" if it sees
> that there are enough tables that need work, but I'm unenthused about
> having the launcher figure that out itself.

Hmm, yeah, we'll probably need some communication channel eventually.

--
Alvaro Herrera                                http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.

Re: Autovacuum launcher patch

From: Markus Schiltknecht

Alvaro Herrera wrote:
> Hmm, I remember eyeballing that code.  Would you mind sending me a URL
> to that file, or something?  Or maybe send me the files themselves?

Sure, here's a patch against current CVS. To get it to compile, please
remove all the functions that refer to "buffer" and "buffer.h".

Remember that it's a work in progress. It has flaws. One issue that
currently bugs me is that processes can deadlock if they keep trying to
create a message (IMessageCreate) but fail because the queue is full of
messages for themselves. A process should thus always try to fetch
messages (via IMessageCheck) and remove pending ones before retrying to
send one. That's not always practical.
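
The "drain before retrying" rule can be shown with a toy model.  ToyQueue,
toy_send, toy_drain and send_with_drain are hypothetical and much simpler
than the patch's queue: each slot holds only a recipient id, and 0 marks a
free slot.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_SLOTS 4

/* Toy stand-in for the shared queue: one recipient id per slot, 0 = free. */
typedef struct
{
    int         recipient[QUEUE_SLOTS];
} ToyQueue;

/* Try to enqueue a message; fails when every slot is occupied. */
static bool
toy_send(ToyQueue *q, int recipient)
{
    assert(q != NULL && recipient != 0);
    for (size_t i = 0; i < QUEUE_SLOTS; i++)
    {
        if (q->recipient[i] == 0)
        {
            q->recipient[i] = recipient;
            return true;
        }
    }
    return false;
}

/* Consume every message addressed to self; returns how many were freed. */
static int
toy_drain(ToyQueue *q, int self)
{
    int         freed = 0;

    for (size_t i = 0; i < QUEUE_SLOTS; i++)
    {
        if (q->recipient[i] == self)
        {
            q->recipient[i] = 0;
            freed++;
        }
    }
    return freed;
}

/*
 * Send, emptying our own inbox whenever the queue is full.  Gives up when
 * draining frees nothing, because the queue is then full of other
 * processes' messages and only they can make room.
 */
static bool
send_with_drain(ToyQueue *q, int self, int recipient)
{
    while (!toy_send(q, recipient))
    {
        if (toy_drain(q, self) == 0)
            return false;
    }
    return true;
}
```

A sender that only retried toy_send in a loop would spin forever in the
first case, which is the deadlock described above.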

One design limitation is that you have to know how large your message
is as soon as you reserve (shared) memory for it, but that's intended.
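
To make the size requirement concrete, here is a sketch of how many bytes
a single message would occupy: header plus payload, rounded up to 8 bytes
as the patch's IMSG_ALIGN does.  MsgHeader, align8 and reserved_bytes are
hypothetical names; unlike the patch's 32-bit mask, this version masks
with a size_t so it also works for 64-bit sizes.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical header with the same three fields as the patch's IMessage. */
typedef struct
{
    int32_t     sender;
    int32_t     recipient;
    int32_t     size;       /* payload bytes, header not included */
} MsgHeader;

/* Round n up to the next multiple of 8. */
static size_t
align8(size_t n)
{
    return (n + 7) & ~(size_t) 7;
}

/* Bytes one message occupies in the queue: header plus payload, aligned. */
static size_t
reserved_bytes(size_t payload_size)
{
    /* guard against overflow in the addition below */
    assert(payload_size <= SIZE_MAX - sizeof(MsgHeader) - 7);
    return align8(sizeof(MsgHeader) + payload_size);
}
```

Because the reservation is computed once from the payload size, a message
cannot grow after it has been created.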

At least I've stress tested the wrap-around code and it seems to work.
No guarantees, though ;-)
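
For readers unfamiliar with the wrap-around case, the following is a
simplified sketch of reserving space in a circular buffer using offsets
instead of pointers.  Ring and ring_reserve are hypothetical and are not
the patch's code; in particular, the terminating empty header the patch
writes after each message is not modelled here.

```c
#include <assert.h>
#include <stddef.h>

#define RING_FULL ((size_t) -1)

/* Offsets into a byte buffer; start == end means the ring is empty. */
typedef struct
{
    size_t      start;      /* oldest live byte */
    size_t      end;        /* next free byte */
    size_t      capacity;   /* total buffer size */
} Ring;

/*
 * Reserve need bytes and return their offset, or RING_FULL.  Strict
 * comparisons keep end from ever catching up with start, so a full ring
 * is never mistaken for an empty one.
 */
static size_t
ring_reserve(Ring *r, size_t need)
{
    size_t      off;

    assert(r != NULL && need > 0);

    if (r->end >= r->start)
    {
        /* Not wrapped: try the tail of the buffer first. */
        if (r->capacity - r->end > need)
        {
            off = r->end;
            r->end += need;
            return off;
        }
        /* Tail too small: wrap if the space before start is big enough. */
        if (r->start > need)
        {
            r->end = need;
            return 0;
        }
        return RING_FULL;
    }

    /* Already wrapped: free space is the gap between end and start. */
    if (r->start - r->end > need)
    {
        off = r->end;
        r->end += need;
        return off;
    }
    return RING_FULL;
}
```

Stress testing such code mostly means exercising the two branches where
the reservation moves back to offset 0 or is refused.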

Regards

Markus

#
# old_revision [9a68fa59cb0ca3246f03880664062abb98f1a61a]
#
# add_file "src/backend/storage/ipc/imsg.c"
#  content [3e84c6372a47612a2fe233fee6b122808135580e]
#
# add_file "src/include/storage/imsg.h"
#  content [3cf37b12a00b90f65b8393fc5e27c98d772dc22b]
#
# patch "src/backend/storage/ipc/Makefile"
#  from [71276ab6483aebbb27f87c988d77ab876611f190]
#    to [9a99101d3e8bbfe52c97763db536804e94371828]
#
# patch "src/backend/storage/ipc/ipci.c"
#  from [177f266b4668190a6ab1f2902305f7b7e577ef8d]
#    to [1971e2122ba4455c8b9784e70059d917fdf4f4c8]
#
============================================================
--- src/backend/storage/ipc/imsg.c    3e84c6372a47612a2fe233fee6b122808135580e
+++ src/backend/storage/ipc/imsg.c    3e84c6372a47612a2fe233fee6b122808135580e
@@ -0,0 +1,375 @@
+/*-------------------------------------------------------------------------
+ *
+ * imsg.c
+ *    internal messages from process to process sent via shared memory.
+ *
+ *
+ * Copyright (c) 2006, Markus Schiltknecht <markus@bluegap.ch>
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <unistd.h>
+#include <signal.h>
+#include <string.h>
+
+#ifdef HAVE_SYS_FILIO_H
+#include <sys/filio.h>
+#endif
+
+#include <sys/ioctl.h>
+
+#include "postgres.h"
+#include "miscadmin.h"
+#include "storage/proc.h"
+#include "storage/imsg.h"
+#include "storage/ipc.h"
+#include "storage/buffer.h"
+#include "storage/spin.h"
+#include "utils/elog.h"
+
+/* global variable pointing to the shmem area */
+IMessageCtlData *IMessageCtl = NULL;
+
+/*
+ * Initialization of shared memory for internal messages.
+ */
+int
+IMessageShmemSize(void)
+{
+    return MAXALIGN(IMessageBufferSize);
+}
+
+void
+IMessageShmemInit(void)
+{
+    bool        foundIMessageCtl;
+
+#ifdef IMSG_DEBUG
+    elog(DEBUG3, "IMessageShmemInit(): initializing shared memory");
+#endif
+
+    IMessageCtl = (IMessageCtlData *)
+        ShmemInitStruct("IMsgCtl",
+                        MAXALIGN(IMessageBufferSize),
+                        &foundIMessageCtl);
+
+    if (foundIMessageCtl)
+        return;
+
+    /* empty the control structure and all message descriptors */
+    memset(IMessageCtl, 0, MAXALIGN(IMessageBufferSize));
+
+    /* initialize start and end pointers */
+    IMessageCtl->queue_start = (IMessage*) IMSG_BUFFER_START(IMessageCtl);
+    IMessageCtl->queue_end = (IMessage*) IMSG_BUFFER_START(IMessageCtl);
+
+    SpinLockInit(&IMessageCtl->msgs_lck);
+}
+
+/*
+ *   IMessageCreate
+ *
+ * creates a new but deactivated message within the queue, returning the
+ * message header of the newly created message.
+ */
+IMessage*
+IMessageCreate(int recipient, int msg_size)
+{
+    IMessage       *msg;
+    int                remaining_space;
+
+#ifdef IMSG_DEBUG
+    elog(DEBUG3, "IMessageCreate(): recipient: %d, size: %d",
+        recipient, msg_size);
+#endif
+
+    /* assert a reasonable maximum message size */
+    Assert(msg_size < (MAXALIGN(IMessageBufferSize) / 4));
+
+    START_CRIT_SECTION();
+    {
+        /* use volatile pointer to prevent code rearrangement */
+        volatile IMessageCtlData *imsgctl = IMessageCtl;
+
+        SpinLockAcquire(&imsgctl->msgs_lck);
+
+        /*
+         * Check if there is enough space for the message plus the
+         * terminating header
+         */
+        if (imsgctl->queue_end < imsgctl->queue_start)
+            remaining_space = (int) imsgctl->queue_start -
+                              (int) imsgctl->queue_end;
+        else
+            remaining_space = (int) IMSG_BUFFER_END(imsgctl) -
+                              (int) imsgctl->queue_end;
+
+        if (remaining_space < (MAXALIGN(IMessageBufferSize) / 8))
+        {
+#ifdef IMSG_DEBUG
+            elog(DEBUG3, "IMessageCreate(): cleanup starting");
+#endif
+
+            /* Clean up messages that have been removed. */
+            while (imsgctl->queue_start->recipient == 0)
+            {
+                if (imsgctl->queue_start > imsgctl->queue_end)
+                {
+                    if ((imsgctl->queue_start->sender == 0) &&
+                        (imsgctl->queue_start->recipient == 0))
+                    {
+#ifdef IMSG_DEBUG
+                        elog(DEBUG3, "IMessageCreate(): cleanup wrapped");
+#endif
+                        imsgctl->queue_start = (IMessage*) IMSG_BUFFER_START(imsgctl);
+                        continue;
+                    }
+                }
+                else if (imsgctl->queue_start >= imsgctl->queue_end)
+                    break;
+
+                imsgctl->queue_start = (IMessage*) (
+                    (int) imsgctl->queue_start +
+                    IMSG_ALIGN(imsgctl->queue_start->size +
+                               sizeof(IMessage)));
+            }
+
+            /* recalc remaining space */
+            if (imsgctl->queue_end < imsgctl->queue_start)
+                remaining_space = (int) imsgctl->queue_start -
+                                  (int) imsgctl->queue_end;
+            else
+                remaining_space = (int) IMSG_BUFFER_END(imsgctl) -
+                                  (int) imsgctl->queue_end;
+
+        }
+
+        if (IMSG_ALIGN(msg_size + 2 * sizeof(IMessage)) < remaining_space)
+        {
+            msg = (IMessage*) imsgctl->queue_end;
+            imsgctl->queue_end = (IMessage*) ((int) imsgctl->queue_end +
+                                 IMSG_ALIGN(msg_size + sizeof(IMessage)));
+        }
+        else
+        {
+            remaining_space = (int) imsgctl->queue_start -
+                              (int) IMSG_BUFFER_START(imsgctl);
+#ifdef IMSG_DEBUG
+            elog(DEBUG5, "IMessageCreate:    remaining wrap space: %d",
+                 remaining_space);
+#endif
+
+            /* There is not enough space. But maybe we can wrap around? */
+            if ((imsgctl->queue_end >= imsgctl->queue_start) &&
+                ((int) IMSG_BUFFER_START(imsgctl) +
+                IMSG_ALIGN(msg_size + 2 * sizeof(IMessage)) <
+                (int) imsgctl->queue_start))
+            {
+                /* Yes, wrap around */
+#ifdef IMSG_DEBUG
+                elog(DEBUG5, "IMessageCreate: wrapped around.");
+#endif
+                msg = (IMessage*) IMSG_BUFFER_START(imsgctl);
+                imsgctl->queue_end = (IMessage*) ((int) msg +
+                                    IMSG_ALIGN(msg_size + sizeof(IMessage)));
+            }
+            else
+            {
+                /* TODO: correct error handling; unlock first, elog(ERROR) won't return */
+                SpinLockRelease(&imsgctl->msgs_lck);
+                elog(ERROR, "Not enough space within IMessages buffer.");
+                return NULL;
+            }
+        }
+
+        /* initialize the message as inactive */
+        msg->sender = 0;
+        msg->recipient = recipient;
+        msg->size = msg_size;
+
+        /* clean the following block */
+        imsgctl->queue_end->sender = 0;
+        imsgctl->queue_end->recipient = 0;
+
+        /* queue editing finished */
+        SpinLockRelease(&imsgctl->msgs_lck);
+
+#ifdef IMSG_DEBUG
+    elog(DEBUG3, "IMessageCreate(): created at %08X size: %d (next: %08X)",
+         (int) msg, msg->size, (unsigned int) imsgctl->queue_end);
+#endif
+    }
+    END_CRIT_SECTION();
+
+    return msg;
+}
+
+void
+IMessageForward(IMessage *msg, int new_recipient)
+{
+    msg->recipient = new_recipient;
+    msg->sender = 0;
+
+    IMessageActivate(msg);
+}
+
+void
+IMessageActivate(IMessage *msg)
+{
+    msg->sender = MyProcPid;
+
+    /* TODO: use PGPROC to determine if the recipient wants to be signaled,
+     *       probably we can save that signaling step in certain occasions.
+     */
+
+    /* send a signal to the recipient */
+    kill(msg->recipient, SIGUSR1);
+}
+
+/*
+ *   IMessageRemove
+ *
+ * Marks a message as removable by setting the recipient to null. The message
+ * will eventually be removed during creation of new messages, see
+ * IMessageCreate().
+ */
+void
+IMessageRemove(IMessage *msg)
+{
+    msg->recipient = 0;
+}
+
+/*
+ *   IMessageCheck
+ *
+ * Checks if there is a message in the queue for this process. Returns null
+ * if there is no message for this process, the message header otherwise. The
+ * message remains in the queue and should be removed by IMessageRemove().
+ */
+IMessage*
+IMessageCheck(void)
+{
+    IMessage       *msg,
+                   *res;
+
+    res = NULL;
+    START_CRIT_SECTION();
+    {
+        /* use volatile pointer to prevent code rearrangement */
+        volatile IMessageCtlData *imsgctl = IMessageCtl;
+
+        SpinLockAcquire(&imsgctl->msgs_lck);
+
+        /* Loop through the queue from the start. Wrapping might be
+         * required */
+        msg = imsgctl->queue_start;
+        while (1)
+        {
+            if (((int) msg >= (int) imsgctl->queue_start) &&
+                ((int) imsgctl->queue_start > (int) imsgctl->queue_end))
+            {
+                if ((msg->sender == 0) &&
+                    (msg->recipient == 0))
+                {
+                    msg = (IMessage*) IMSG_BUFFER_START(imsgctl);
+                    continue;
+                }
+            }
+            else if (msg >= imsgctl->queue_end)
+                break;
+
+            if ((msg->sender != 0) && (msg->recipient == MyProcPid))
+            {
+                res = msg;
+                break;
+            }
+
+            msg = (IMessage*) ((int) msg +
+                    IMSG_ALIGN(msg->size + sizeof(IMessage)));
+        }
+
+        SpinLockRelease(&imsgctl->msgs_lck);
+    }
+    END_CRIT_SECTION();
+
+#ifdef IMSG_DEBUG
+    if (res == NULL)
+        elog(DEBUG3, "IMessageCheck(): no new message for %d.", MyProcPid);
+    else
+        elog(DEBUG3, "IMessageCheck(): new message of size %d for %d.",
+                msg->size, MyProcPid);
+#endif
+
+    return res;
+}
+
+/*
+ *   IMessageAwait
+ *
+ * Waits for a message but leaves the message in the queue.
+ */
+IMessage*
+IMessageAwait(void)
+{
+    IMessage       *msg;
+    struct timeval    tv;
+
+    msg = IMessageCheck();
+    while (!msg)
+    {
+        /*
+         * TODO: we want to wait for signals here. Check if select() is
+         * appropriate. Maybe pause() is better, but how about portability?
+         * However, make sure we have a timeout here, since we could
+         * probably miss a signal.
+         */
+        tv.tv_sec = 2;
+        tv.tv_usec = 0;
+        select(1, NULL, NULL, NULL, &tv);
+        // pause();
+        msg = IMessageCheck();
+    }
+
+    return msg;
+}
+
+/*
+ *   IMessageGetReadBuffer
+ *
+ * gets a readable buffer for the given message
+ */
+buffer *
+IMessageGetReadBuffer(IMessage *msg)
+{
+    buffer *b = palloc(sizeof(buffer));
+
+    Assert(msg);
+    Assert(msg->size > 0);
+
+    init_buffer(b, IMSG_DATA(msg), msg->size, NULL, NULL, NULL);
+    b->fill_size = msg->size;
+
+    return b;
+}
+
+/*
+ *   IMessageGetWriteBuffer
+ *
+ * gets a writeable buffer for the given message
+ */
+buffer *
+IMessageGetWriteBuffer(IMessage *msg)
+{
+    buffer *b = palloc(sizeof(buffer));
+
+    init_buffer(b, IMSG_DATA(msg), msg->size, NULL, NULL, NULL);
+
+    return b;
+}
+
+void
+IMessageFreeBuffer(buffer *b)
+{
+    pfree(b);
+}
============================================================
--- src/include/storage/imsg.h    3cf37b12a00b90f65b8393fc5e27c98d772dc22b
+++ src/include/storage/imsg.h    3cf37b12a00b90f65b8393fc5e27c98d772dc22b
@@ -0,0 +1,85 @@
+/*-------------------------------------------------------------------------
+ *
+ * imsg.h
+ *    internal messages from process to process sent via shared memory.
+ *
+ *
+ * Copyright (c) 2006, Markus Schiltknecht <markus@bluegap.ch>
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef IMSG_H
+#define IMSG_H
+
+#include <sys/types.h>
+#include "storage/spin.h"
+#include "storage/buffer.h"
+
+/* TODO: replace with GUC variable to be configurable */
+#define IMessageBufferSize 8388608        /* 8 MB */
+
+/* alignment for messages (8 bytes) */
+#define IMSG_ALIGN(size) (((size) + 7) & 0xFFFFFFF8)
+
+/* convenience macros for buffer access */
+#define IMSG_BUFFER_START(imsgctl) ((int) \
+            (IMSG_ALIGN((int) imsgctl + sizeof(IMessageCtlData))))
+
+#define IMSG_BUFFER_END(imsgctl) ((int) \
+            (IMSG_ALIGN((int) imsgctl + MAXALIGN(IMessageBufferSize))))
+
+/* get a data pointer from the header */
+#define IMSG_DATA(imsg) ((void*) ((int) imsg + sizeof(IMessage)))
+
+/*
+ * Message descriptor in front of the message
+ */
+typedef struct
+{
+    /* pid of the sender, null means not yet activated message */
+    pid_t        sender;
+
+    /* pid of the recipient, null meaning has already been received */
+    pid_t        recipient;
+
+    /* message size following, but not including this header */
+    int            size;
+} IMessage;
+
+/*
+ * shared-memory pool for internal messages.
+ */
+typedef struct
+{
+    /* currently active messages */
+    unsigned int        count_messages;
+
+    /* start of messages within the cycling queue */
+    IMessage           *queue_start;
+
+    /* next free place, just after the last message */
+    IMessage           *queue_end;
+
+    /* lock for editing the message queue */
+    slock_t                msgs_lck;
+} IMessageCtlData;
+
+/* the global variable storing pointer to the shared memory area */
+extern IMessageCtlData *IMessageCtl;
+
+/* routines to send and receive internal messages */
+extern int IMessageShmemSize(void);
+extern void IMessageShmemInit(void);
+extern IMessage* IMessageCreate(int recipient, int msg_size);
+extern void IMessageForward(IMessage *msg, int new_recipient);
+extern void IMessageActivate(IMessage *msg);
+extern void IMessageRemove(IMessage *msg);
+extern IMessage* IMessageCheck(void);
+extern IMessage* IMessageAwait(void);
+
+extern buffer *IMessageGetReadBuffer(IMessage *msg);
+extern buffer *IMessageGetWriteBuffer(IMessage *msg);
+extern void IMessageFreeBuffer(buffer *b);
+
+#endif   /* IMSG_H */
============================================================
--- src/backend/storage/ipc/Makefile    71276ab6483aebbb27f87c988d77ab876611f190
+++ src/backend/storage/ipc/Makefile    9a99101d3e8bbfe52c97763db536804e94371828
@@ -16,7 +16,7 @@ OBJS = ipc.o ipci.o pmsignal.o procarray
 endif

 OBJS = ipc.o ipci.o pmsignal.o procarray.o shmem.o shmqueue.o \
-    sinval.o sinvaladt.o
+    sinval.o sinvaladt.o imsg.o buffer.o

 all: SUBSYS.o

============================================================
--- src/backend/storage/ipc/ipci.c    177f266b4668190a6ab1f2902305f7b7e577ef8d
+++ src/backend/storage/ipc/ipci.c    1971e2122ba4455c8b9784e70059d917fdf4f4c8
@@ -24,6 +24,7 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
 #include "storage/freespace.h"
+#include "storage/imsg.h"
 #include "storage/ipc.h"
 #include "storage/pg_shmem.h"
 #include "storage/pmsignal.h"
@@ -110,6 +111,7 @@ CreateSharedMemoryAndSemaphores(bool mak
         size = add_size(size, FreeSpaceShmemSize());
         size = add_size(size, BgWriterShmemSize());
         size = add_size(size, BTreeShmemSize());
+        size = add_size(size, IMessageShmemSize());
 #ifdef EXEC_BACKEND
         size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -178,6 +180,7 @@ CreateSharedMemoryAndSemaphores(bool mak
     SUBTRANSShmemInit();
     TwoPhaseShmemInit();
     MultiXactShmemInit();
+    IMessageShmemInit();
     InitBufferPool();

     /*