Re: Proposal : For Auto-Prewarm. - Mailing list pgsql-hackers

From andres@anarazel.de (Andres Freund)
Subject Re: Proposal : For Auto-Prewarm.
Date
Msg-id 20170405224222.l2nwmg5j5sd4q6qr@alap3.anarazel.de
In response to Re: [HACKERS] Proposal : For Auto-Prewarm.  (Mithun Cy <mithun.cy@enterprisedb.com>)
List pgsql-hackers
On 2017-03-13 18:45:00 +0530, Mithun Cy wrote:
> I have implemented a similar logic now. The prewarm bgworker will
> launch a sub-worker per database in the dump file. And, each
> sub-worker will load its database block info. The sub-workers will be
> launched only after previous one is finished. All of this will only
> start if the database has reached a consistent state.

Hm. For replay performance it'd possibly be good to start earlier,
before reaching consistency.  Is there an issue starting earlier?


> diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
> new file mode 100644
> index 0000000..f4b34ca
> --- /dev/null
> +++ b/contrib/pg_prewarm/autoprewarm.c
> @@ -0,0 +1,1137 @@
> +/*-------------------------------------------------------------------------
> + *
> + * autoprewarm.c
> + *
> + * -- Automatically prewarm the shared buffer pool when server restarts.

Don't think we usually use -- here.


> + *    Copyright (c) 2013-2017, PostgreSQL Global Development Group

Hm, that's a bit of a weird date range.


> + *    IDENTIFICATION
> + *        contrib/pg_prewarm.c/autoprewarm.c
> + *-------------------------------------------------------------------------
> + */

The pg_prewarm.c in there looks like some search & replace gone awry.



> +#include "postgres.h"
> +#include <unistd.h>
> +
> +/* These are always necessary for a bgworker. */
> +#include "miscadmin.h"
> +#include "postmaster/bgworker.h"
> +#include "storage/ipc.h"
> +#include "storage/latch.h"
> +#include "storage/lwlock.h"
> +#include "storage/proc.h"
> +#include "storage/shmem.h"
> +
> +/* These are necessary for prewarm utilities. */
> +#include "pgstat.h"
> +#include "storage/buf_internals.h"
> +#include "storage/smgr.h"
> +#include "utils/memutils.h"
> +#include "utils/resowner.h"
> +#include "utils/guc.h"
> +#include "catalog/pg_class.h"
> +#include "catalog/pg_type.h"
> +#include "executor/spi.h"
> +#include "access/xact.h"
> +#include "utils/rel.h"
> +#include "port/atomics.h"

I'd rather just sort these alphabetically.




I think this should rather be in the initial header.

> +/*
> + * autoprewarm :
> + *
> + * What is it?
> + * ===========
> + * A bgworker which automatically records information about blocks which were
> + * present in buffer pool before server shutdown and then prewarm the buffer
> + * pool upon server restart with those blocks.
> + *
> + * How does it work?
> + * =================
> + * When the shared library "pg_prewarm" is preloaded, a
> + * bgworker "autoprewarm" is launched immediately after the server has reached
> + * consistent state. The bgworker will start loading blocks recorded in the
> + * format BlockInfoRecord
> + * <<DatabaseId,TableSpaceId,RelationId,Forknum,BlockNum>> in
> + * $PGDATA/AUTOPREWARM_FILE, until there is a free buffer left in the buffer
> + * pool. This way we do not replace any new blocks which were loaded either by
> + * the recovery process or the querying clients.

s/until there is a/until there is no/?


> +/*
> + * ============================================================================
> + * ===========================     SIGNAL HANDLERS    ===========================
> + * ============================================================================
> + */

Hm...

> +static void sigtermHandler(SIGNAL_ARGS);
> +static void sighupHandler(SIGNAL_ARGS);

I don't think that's a casing we commonly use.  We mostly use CamelCase
or underscore_case.


> +/*
> + *    Signal handler for SIGUSR1.
> + */
> +static void
> +sigusr1Handler(SIGNAL_ARGS)
> +{
> +    int            save_errno = errno;
> +
> +    if (MyProc)
> +        SetLatch(&MyProc->procLatch);
> +
> +    errno = save_errno;
> +}

Hm, what's this one for?


> +/*
> + * Shared state information about the running autoprewarm bgworker.
> + */
> +typedef struct AutoPrewarmSharedState
> +{
> +    pg_atomic_uint32 current_task;        /* current tasks performed by
> +                                         * autoprewarm workers. */
> +} AutoPrewarmSharedState;

Hm.  Why do we need atomics here?  I thought there's no concurrency?


> +/*
> + * sort_cmp_func - compare function used for qsort().
> + */
> +static int
> +sort_cmp_func(const void *p, const void *q)
> +{

rename to blockinfo_cmp?



> +static AutoPrewarmTask
> +get_autoprewarm_task(AutoPrewarmTask todo_task)
> +{
> +    bool        found;
> +
> +    state = NULL;
> +
> +    LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
> +    state = ShmemInitStruct("autoprewarm",
> +                            sizeof(AutoPrewarmSharedState),
> +                            &found);
> +    if (!found)
> +        pg_atomic_write_u32(&(state->current_task), todo_task);

Superfluous parens (repeated a lot).


> +    LWLockRelease(AddinShmemInitLock);
> +
> +    /* If found check if we can go ahead. */
> +    if (found)
> +    {
> +        if (pg_atomic_read_u32(&(state->current_task)) ==
> +            TASK_PREWARM_BUFFERPOOL)

You repeat the read in every branch - why don't you store it in a
variable instead?

That aside, the use of an atomic doesn't seem to actually gain us
anything here.  If we need control over concurrency it seems a lot
better to instead use a lwlock or spinlock.  There's no contention here,
using lock-free stuff just increases complexity without a corresponding
benefit.
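
To illustrate the suggestion: a sketch (not a drop-in patch; field and constant names are taken from the patch, the rest is assumed) of how the shared state could be guarded by a plain spinlock instead of an atomic:

```c
/* Sketch only: shared state guarded by a spinlock rather than an atomic. */
typedef struct AutoPrewarmSharedState
{
	slock_t		mutex;			/* protects current_task */
	uint32		current_task;
} AutoPrewarmSharedState;

/*
 * Read and, where allowed, transition the task while holding the lock.
 * The whole read-check-update sequence becomes trivially race-free, with
 * no need to reason about CAS retry semantics.
 */
SpinLockAcquire(&state->mutex);
task = state->current_task;
if (task == TASK_DUMP_IMMEDIATE_ONCE &&
	todo_task == TASK_DUMP_IMMEDIATE_ONCE)
	state->current_task = TASK_DUMP_BUFFERPOOL_INFO;
SpinLockRelease(&state->mutex);
```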


> +        {
> +            if (todo_task == TASK_PREWARM_BUFFERPOOL)
> +            {
> +                /*
> +                 * we were prewarming and we are back to do same, time to
> +                 * abort prewarming and move to dumping.
> +                 */

I'm not sure what "back to do same" should mean here - changing to a
different type of task surely is not the same.


> +                pg_atomic_write_u32(&(state->current_task),
> +                                    TASK_DUMP_BUFFERPOOL_INFO);
> +                return TASK_DUMP_BUFFERPOOL_INFO;
> +            }
> +            else
> +                return TASK_END;    /* rest all cannot proceed further. */

What does that comment mean?


> +        }
> +        else if (pg_atomic_read_u32(&(state->current_task)) ==
> +                 TASK_DUMP_IMMEDIATE_ONCE)
> +        {
> +            uint32        current_state = TASK_DUMP_IMMEDIATE_ONCE;
> +
> +            /* We cannot do a TASK_PREWARM_BUFFERPOOL but rest can go ahead */
> +            if (todo_task == TASK_DUMP_IMMEDIATE_ONCE)
> +                return TASK_DUMP_IMMEDIATE_ONCE;
> +
> +            if (todo_task == TASK_PREWARM_BUFFERPOOL)
> +                todo_task = TASK_DUMP_BUFFERPOOL_INFO;    /* skip to do dump only */
> +
> +            /*
> +             * first guy who can atomically set the current_task get the
> +             * opportunity to proceed further
> +             */
> +            if (pg_atomic_compare_exchange_u32(&(state->current_task),
> +                                               &current_state,
> +                                               TASK_DUMP_BUFFERPOOL_INFO))
> +            {
> +                /* Wow! We won the race proceed with the task. */
> +                return TASK_DUMP_BUFFERPOOL_INFO;
> +            }
> +            else
> +                return TASK_END;

Note that it's not generally guaranteed that any
pg_atomic_compare_exchange_u32 actually wins, it could temporarily fail
for all.


> +/*
> + * getnextblockinfo -- given a BlkType get its next BlockInfoRecord from the
> + *                       dump file.
> + */
> +static BlkType
> +getnextblockinfo(FILE *file, BlockInfoRecord *currblkinfo, BlkType reqblock,
> +                 BlockInfoRecord *newblkinfo)
> +{
> +    BlkType        nextblk;
> +
> +    while (true)
> +    {
> +        /* get next block. */
> +        if (5 != fscanf(file, "%u,%u,%u,%u,%u\n", &(newblkinfo->database),
> +                        &(newblkinfo->spcNode), &(newblkinfo->filenode),
> +                        (uint32 *) &(newblkinfo->forknum),
> +                        &(newblkinfo->blocknum)))
> +            return BLKTYPE_END; /* No more valid entry hence stop processing. */

Hm.  Is it actually helpful to store the file as text?  That's commonly
going to increase the size of the file quite considerably, no?

> +/*
> + * GetRelOid -- given a filenode get its relation oid.
> + */
> +static Oid
> +get_reloid(Oid filenode)
> +{

Function and comment don't agree on naming.


But what is this actually used for?  I thought Robert, in
http://archives.postgresql.org/message-id/CA%2BTgmoa%3DUqCL2mR%2B9WTq05tB3Up-z4Sv2wkzkDxDwBP7Mj_2_w%40mail.gmail.com
suggested storing the filenode in the dump, and then to use
RelidByRelfilenode to get the corresponding relation?

It seems a lot better to use relfilenodes, because otherwise table
rewrites will lead to reloading wrong things.


> +    int            ret;
> +    Oid            relationid;
> +    bool        isnull;
> +    Datum        value[1] = {ObjectIdGetDatum(filenode)};
> +    StringInfoData buf;
> +    Oid            ptype[1] = {OIDOID};
> +
> +    initStringInfo(&buf);
> +    appendStringInfo(&buf,
> +            "select oid from pg_class where pg_relation_filenode(oid) = $1");
> +
> +    ret = SPI_execute_with_args(buf.data, 1, (Oid *) &ptype, (Datum *) &value,
> +                                NULL, true, 1);
> +
> +    if (ret != SPI_OK_SELECT)
> +        ereport(FATAL, (errmsg("SPI_execute failed: error code %d", ret)));
> +
> +    if (SPI_processed < 1)
> +        return InvalidOid;
> +
> +    relationid = DatumGetObjectId(SPI_getbinval(SPI_tuptable->vals[0],
> +                                                SPI_tuptable->tupdesc,
> +                                                1, &isnull));
> +    if (isnull)
> +        return InvalidOid;
> +
> +    return relationid;
> +}

Doing this via SPI doesn't strike me as a good idea - that's really
quite expensive.  Why not call the underlying function directly?
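
For instance, following Robert's suggestion, a sketch of the lookup without SPI (assumes the worker is inside a transaction; RelidByRelfilenode() is declared in utils/relfilenodemap.h and takes the pg_class-style tablespace oid, i.e. 0 for the default tablespace):

```c
#include "utils/relfilenodemap.h"

/*
 * Sketch: map a <tablespace, relfilenode> pair from the dump straight to a
 * relation oid, no SPI and no catalog-wide scan per lookup.  Returns
 * InvalidOid if no mapping exists (e.g. the relation was since rewritten
 * or dropped).
 */
static Oid
get_reloid(Oid spcNode, Oid filenode)
{
	return RelidByRelfilenode(spcNode, filenode);
}
```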


> +/*
> + * load_one_database -- start of prewarm sub-worker, this will try to load
> + * blocks of one database starting from block info position passed by main
> + * prewarm worker.
> + */
> +void
> +load_one_database(Datum main_arg)
> +{

> +    /* check if file exists and open file in read mode. */
> +    snprintf(dump_file_path, sizeof(dump_file_path), "%s", AUTOPREWARM_FILE);
> +    file = fopen(dump_file_path, PG_BINARY_R);
> +    if (!file)
> +        return;                    /* No file to load. */

Shouldn't this be an error case?  In which case is it ok for the file to
be gone after we launched the worker?


> +    /*
> +     * It should be a block info belonging to a new database. Or else dump
> +     * file is corrupted better to end the loading of bocks now.
> +     */
> +    if (loadblocktype != BLKTYPE_NEW_DATABASE)
> +        goto end_load;            /* should we raise a voice here? */

Yes, this should raise an error.



> +            case BLKTYPE_NEW_RELATION:
> +
> +                /*
> +                 * release lock on previous relation.
> +                 */
> +                if (rel)
> +                {
> +                    relation_close(rel, AccessShareLock);
> +                    rel = NULL;
> +                }
> +
> +                loadblocktype = BLKTYPE_NEW_RELATION;
> +
> +                /*
> +                 * lock new relation.
> +                 */
> +                reloid = get_reloid(toload_block.filenode);
> +
> +                if (!OidIsValid(reloid))
> +                    break;
> +
> +                rel = try_relation_open(reloid, AccessShareLock);
> +                if (!rel)
> +                    break;
> +                RelationOpenSmgr(rel);

Now I'm confused.  Your get_reloid used pg_relation_filenode() to map
from relation oid to filenode - and then you're using it to lock the
relation?  Something's wrong.


> +            case BLKTYPE_NEW_FORK:
> +
> +                /*
> +                 * check if fork exists and if block is within the range
> +                 */
> +                loadblocktype = BLKTYPE_NEW_FORK;
> +                if (            /* toload_block.forknum > InvalidForkNumber &&
> +                                 * toload_block.forknum <= MAX_FORKNUM && */
> +                    !smgrexists(rel->rd_smgr, toload_block.forknum))
> +                    break;

Huh? What's with that commented out section of code?


> +            case BLKTYPE_NEW_BLOCK:
> +
> +                /* check if blocknum is valid and with in fork file size. */
> +                if (toload_block.blocknum >= nblocks)
> +                {
> +                    /* move to next forknum. */
> +                    loadblocktype = BLKTYPE_NEW_FORK;
> +                    break;
> +                }

Hm. Why does the size of the underlying file allow us to skip to the
next fork? Don't we have to read all the pending dump records?


> +                buf = ReadBufferExtended(rel, toload_block.forknum,
> +                                         toload_block.blocknum, RBM_NORMAL,
> +                                         NULL);
> +                if (BufferIsValid(buf))
> +                {
> +                    ReleaseBuffer(buf);
> +                }
> +
> +                loadblocktype = BLKTYPE_NEW_BLOCK;
> +                break;

Hm. RBM_NORMAL will error out in a bunch of cases, is that ok?

> +    if (have_dbconnection)
> +    {
> +        SPI_finish();
> +        PopActiveSnapshot();
> +        CommitTransactionCommand();
> +    }
> +    return;
> +}

Are we really ok keeping open a transaction through all of this? That
could potentially be quite long, no?  How about doing that on a per-file
basis, or even moving to session locks altogether?



> +/* This sub-module is for periodically dumping buffer pool's block info into
> + * a dump file AUTOPREWARM_FILE.
> + * Each entry of block info looks like this:
> + * <DatabaseId,TableSpaceId,RelationId,Forknum,BlockNum> and we shall call it
> + * as BlockInfoRecord.
> + *
> + * Contents of AUTOPREWARM_FILE has been formated such a way that
> + * blockInfoRecord of each database can be given to different prewarm workers.
> + *
> + *    format of AUTOPREWAM_FILE
> + *    =======================================
> + *    [offset position of database map table]
> + *    [sorted BlockInfoRecords..............]
> + *    [database map table]
> + *    =======================================

This doesn't mention storing things as ascii, instead of binary...


> + *    The [database map table] is sequence of offset in file which will point to
> + *    first BlockInfoRecords of each database in the dump. The prewarm worker
> + *    will read this offset one by one in sequence and ask its subworker to seek
> + *    to this position and then start loading the BlockInfoRecords one by one
> + *    until it see a BlockInfoRecords of a different database than it is actually
> + *    connected to.
> + *    NOTE : We store off_t inside file so the dump file will not be portable to
> + *    be used across systems where sizeof off_t is different from each other.
> + */

Why are we using off_t? Shouldn't this just be BlockNumber?


> +static uint32
> +dump_now(void)
> +{
> +    static char dump_file_path[MAXPGPATH],

> +
> +    for (num_blocks = 0, i = 0; i < NBuffers; i++)
> +    {
> +        uint32        buf_state;
> +
> +        bufHdr = GetBufferDescriptor(i);
> +
> +        /* lock each buffer header before inspecting. */
> +        buf_state = LockBufHdr(bufHdr);
> +
> +        if (buf_state & BM_TAG_VALID)
> +        {
> +            block_info_array[num_blocks].database = bufHdr->tag.rnode.dbNode;
> +            block_info_array[num_blocks].spcNode = bufHdr->tag.rnode.spcNode;
> +            block_info_array[num_blocks].filenode = bufHdr->tag.rnode.relNode;
> +            block_info_array[num_blocks].forknum = bufHdr->tag.forkNum;
> +            block_info_array[num_blocks].blocknum = bufHdr->tag.blockNum;
> +            ++num_blocks;
> +        }
> +
> +        UnlockBufHdr(bufHdr, buf_state);

> +    }
> +
> +    /* sorting now only to avoid sorting while loading. */

"sorting while loading"? You mean random accesses?

> +    pg_qsort(block_info_array, num_blocks, sizeof(BlockInfoRecord),
> +             sort_cmp_func);



> +    snprintf(transient_dump_file_path, sizeof(dump_file_path),
> +             "%s.%d", AUTOPREWARM_FILE, MyProcPid);
> +    file = fopen(transient_dump_file_path, "w");
> +    if (file == NULL)
> +        ereport(ERROR,
> +                (errcode_for_file_access(),
> +                 errmsg("autoprewarm: could not open \"%s\": %m",
> +                        dump_file_path)));

What if that file already exists? You're not truncating it.  Also, what
if we error out in the middle of this? We'll leak an fd.  I think this
needs to use OpenTransientFile etc.
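
Roughly what that could look like (a sketch, variable names carried over from the patch; OpenTransientFile() from storage/fd.h registers the fd with the resource machinery so it is closed on error, and O_TRUNC handles a leftover file):

```c
#include "storage/fd.h"

/*
 * Sketch: open the transient dump file truncated; the fd is tracked and
 * released automatically if we error out mid-dump.
 */
fd = OpenTransientFile(transient_dump_file_path,
					   O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
					   S_IRUSR | S_IWUSR);
if (fd < 0)
	ereport(ERROR,
			(errcode_for_file_access(),
			 errmsg("autoprewarm: could not open \"%s\": %m",
					transient_dump_file_path)));

/* ... write the records ... */

CloseTransientFile(fd);

/* atomically install the finished dump under its final name */
if (rename(transient_dump_file_path, dump_file_path) < 0)
	ereport(ERROR,
			(errcode_for_file_access(),
			 errmsg("autoprewarm: could not rename \"%s\" to \"%s\": %m",
					transient_dump_file_path, dump_file_path)));
```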

> +    snprintf(dump_file_path, sizeof(dump_file_path),
> +             "%s", AUTOPREWARM_FILE);
> +    ret = fprintf(file, "%020jd\n", (intmax_t) 0);
> +    if (ret < 0)
> +    {
> +        fclose(file);
> +        ereport(ERROR,
> +                (errcode_for_file_access(),
> +                 errmsg("autoprewarm: error writing to \"%s\" : %m",
> +                        dump_file_path)));
> +    }
> +
> +    database_map_table[num_db++] = ftello(file);
> +
> +    for (i = 0; i < num_blocks; i++)
> +    {
> +        if (i > 0 && block_info_array[i].database != prev_database)
> +        {
> +            if (num_db == database_map_table_size)
> +            {
> +                database_map_table_size *= 2;    /* double and repalloc. */
> +                database_map_table =
> +                    (off_t *) repalloc(database_map_table,
> +                                    sizeof(off_t) * database_map_table_size);
> +            }
> +            fflush(file);
> +            database_map_table[num_db++] = ftello(file);
> +        }
> +
> +        ret = fprintf(file, "%u,%u,%u,%u,%u\n",
> +                      block_info_array[i].database,
> +                      block_info_array[i].spcNode,
> +                      block_info_array[i].filenode,
> +                      (uint32) block_info_array[i].forknum,
> +                      block_info_array[i].blocknum);
> +        if (ret < 0)
> +        {
> +            fclose(file);
> +            ereport(ERROR,
> +                    (errcode_for_file_access(),
> +                     errmsg("autoprewarm: error writing to \"%s\" : %m",
> +                            dump_file_path)));
> +        }
> +
> +        prev_database = block_info_array[i].database;
> +    }

I think we should check for interrupts somewhere in that (and the
preceding) loop.

> +/*
> + * dump_block_info_periodically - at regular intervals, which is defined by GUC
> + * dump_interval, dump the info of blocks which are present in buffer pool.
> + */
> +void
> +dump_block_info_periodically()
> +{

Suggest adding void to the parameter list.


> +    pg_time_t    last_dump_time = (pg_time_t) time(NULL);
> +
> +    while (!got_sigterm)
> +    {
> +        int            rc;
> +        pg_time_t    now;
> +        int            elapsed_secs = 0,
> +                    timeout = AT_PWARM_DEFAULT_DUMP_INTERVAL;
> +
> +        if (dump_interval > AT_PWARM_DUMP_AT_SHUTDOWN_ONLY)
> +        {
> +            now = (pg_time_t) time(NULL);
> +            elapsed_secs = now - last_dump_time;
> +
> +            if (elapsed_secs > dump_interval)
> +            {
> +                dump_now();
> +                if (got_sigterm)
> +                    return;        /* got shutdown signal just after a dump. And,
> +                                 * I think better to return now. */
> +                last_dump_time = (pg_time_t) time(NULL);
> +                elapsed_secs = 0;
> +            }
> +
> +            timeout = dump_interval - elapsed_secs;
> +        }

I suggest using GetCurrentTimestamp() and TimestampDifferenceExceeds()
instead.
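
I.e. something along these lines (sketch only, reusing the patch's names; both helpers come from utils/timestamp.h, and dump_interval is in seconds so the threshold is dump_interval * 1000 msec):

```c
#include "utils/timestamp.h"

TimestampTz last_dump_time = GetCurrentTimestamp();

while (!got_sigterm)
{
	if (dump_interval > AT_PWARM_DUMP_AT_SHUTDOWN_ONLY &&
		TimestampDifferenceExceeds(last_dump_time,
								   GetCurrentTimestamp(),
								   dump_interval * 1000))
	{
		dump_now();
		last_dump_time = GetCurrentTimestamp();
	}

	/* ... compute remaining timeout and WaitLatch() as before ... */
}
```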


> +        /* Has been set not to dump. Nothing more to do. */
> +        if (dump_interval == AT_PWARM_OFF)
> +            return;
> +
> +        ResetLatch(&MyProc->procLatch);
> +        rc = WaitLatch(&MyProc->procLatch,
> +                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
> +                       timeout * 1000, PG_WAIT_EXTENSION);
> +
> +        if (rc & WL_POSTMASTER_DEATH)
> +            proc_exit(1);
> +
> +        /*
> +         * In case of a SIGHUP, just reload the configuration.
> +         */
> +        if (got_sighup)
> +        {
> +            got_sighup = false;
> +            ProcessConfigFile(PGC_SIGHUP);
> +        }
> +    }
> +
> +    /* One last block meta info dump while postmaster shutdown. */
> +    if (dump_interval != AT_PWARM_OFF)
> +        dump_now();

Uh, afaics we'll also do this if somebody SIGTERMed the process
interactively?


> +/* Extension's entry point. */
> +void
> +_PG_init(void)
> +{
> +    BackgroundWorker autoprewarm;
> +
> +    /* Define custom GUC variables. */
> +    DefineCustomIntVariable("pg_prewarm.dump_interval",
> +                       "Sets the maximum time between two buffer pool dumps",
> +                            "If set to Zero, timer based dumping is disabled."
> +                            " If set to -1, stops the running autoprewarm.",
> +                            &dump_interval,
> +                            AT_PWARM_DEFAULT_DUMP_INTERVAL,
> +                            AT_PWARM_OFF, INT_MAX / 1000,
> +                            PGC_SIGHUP,
> +                            GUC_UNIT_S,
> +                            NULL,
> +                            NULL,
> +                            NULL);
> +
> +    /* if not run as a preloaded library, nothing more to do here! */
> +    if (!process_shared_preload_libraries_in_progress)
> +        return;
> +
> +    DefineCustomStringVariable("pg_prewarm.default_database",
> +                "default database to connect if dump has not recorded same.",
> +                               NULL,
> +                               &default_database,
> +                               "postgres",
> +                               PGC_POSTMASTER,
> +                               0,
> +                               NULL,
> +                               NULL,
> +                               NULL);

I don't think it's a good idea to make guc registration depending on
process_shared_preload_libraries_in_progress.


You should also use EmitWarningsOnPlaceholders() somewhere here.




I also wonder whether we don't need to use prefetch to actually make
this fast enough.


I think it's pretty clear that this needs a bit more work and thus won't
be ready for v10.  Moved to the next CF.


- Andres
