Re: [HACKERS] Parallel Hash take II - Mailing list pgsql-hackers

From: Andres Freund
Subject: Re: [HACKERS] Parallel Hash take II
Date: 2017-11-07 21:01:55
Msg-id: 20171107210155.kuksdd324kgz5oev@alap3.anarazel.de
In response to: Re: [HACKERS] Parallel Hash take II (Thomas Munro <thomas.munro@enterprisedb.com>)
Responses: Re: [HACKERS] Parallel Hash take II (Peter Geoghegan <pg@bowt.ie>)
           Re: [HACKERS] Parallel Hash take II (Robert Haas <robertmhaas@gmail.com>)
List: pgsql-hackers
Hi,

Here's a review of v24.

+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+delete from bigger_than_it_looks where id <= 19000;
+vacuum bigger_than_it_looks;
+analyze bigger_than_it_looks;
+insert into bigger_than_it_looks
+  select generate_series(1, 19000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';

It seems kinda easier to just manipulate ndistinct and reltuples...
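
I.e. something like this (untested sketch; the pg_class update needs
superuser, which the regression tests run as anyway):

create table bigger_than_it_looks as
  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
alter table bigger_than_it_looks set (parallel_workers = 2);
analyze bigger_than_it_looks;
-- now override what the planner will see
update pg_class set reltuples = 1000
  where relname = 'bigger_than_it_looks';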


+set max_parallel_workers_per_gather = 0;
+set work_mem = '4MB';

I hope there's a fair amount of slop here - with different
architectures you're going to see quite noticeable size differences.

+-- The "good" case: batches required, but we plan the right number; we
+-- plan for 16 batches, and we stick to that number, and peak memory
+-- usage says within our work_mem budget
+-- non-parallel
+set max_parallel_workers_per_gather = 0;
+set work_mem = '128kB';

So how do we know that's actually the case we're testing, rather than
something arbitrarily different? IIRC there are tests somewhere that
just filter the json explain output down to the relevant parts...
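
E.g. a plpgsql helper that recursively digs the Hash node out of the
json plan, something like this (sketch, untested):

create or replace function find_hash(node json) returns json
language plpgsql as
$$
declare
  x json;
  child json;
begin
  if node->>'Node Type' = 'Hash' then
    return node;
  end if;
  for child in select json_array_elements(node->'Plans')
  loop
    x := find_hash(child);
    if x is not null then
      return x;
    end if;
  end loop;
  return null;
end;
$$;

IIRC the json format exposes "Hash Batches", "Original Hash Batches"
and "Peak Memory Usage", which is exactly what these tests claim to be
about.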





+/*
+ * Build the name for a given segment of a given BufFile.
+ */
+static void
+MakeSharedSegmentName(char *name, const char *buffile_name, int segment)
+{
+    snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
+}

Not a fan of this name - you're not "making" a filename here (as in
allocating one, or such). I think I'd just remove the Make prefix.



+/*
+ * Open a file that was previously created in another backend with
+ * BufFileCreateShared in the same SharedFileSet using the same name.  The
+ * backend that created the file must have called BufFileClose() or
+ * BufFileExport() to make sure that it is ready to be opened by other
+ * backends and render it read-only.
+ */

Is it actually guaranteed that it's another backend / do we rely on
that?

+BufFile *
+BufFileOpenShared(SharedFileSet *fileset, const char *name)
+{

+    /*
+     * If we didn't find any files at all, then no BufFile exists with this
+     * tag.
+     */
+    if (nfiles == 0)
+        return NULL;

s/tag/name/?


+/*
+ * Delete a BufFile that was created by BufFileCreateShared in the given
+ * SharedFileSet using the given name.
+ *
+ * It is not necessary to delete files explicitly with this function.  It is
+ * provided only as a way to delete files proactively, rather than waiting for
+ * the SharedFileSet to be cleaned up.
+ *
+ * Only one backend should attempt to delete a given name, and should know
+ * that it exists and has been exported or closed.
+ */
+void
+BufFileDeleteShared(SharedFileSet *fileset, const char *name)
+{
+    char        segment_name[MAXPGPATH];
+    int            segment = 0;
+    bool        found = false;
+
+    /*
+     * We don't know how many segments the file has.  We'll keep deleting
+     * until we run out.  If we don't manage to find even an initial segment,
+     * raise an error.
+     */
+    for (;;)
+    {
+        MakeSharedSegmentName(segment_name, name, segment);
+        if (!SharedFileSetDelete(fileset, segment_name, true))
+            break;
+        found = true;
+        ++segment;
+    }

Hm. Do we properly delete all the files via the resowner mechanism if
this fails midway? I.e. if there are no leading segments, but later
ones still exist? Also, I wonder if this loop doesn't need a CFI check.
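
I.e. at minimum something like:

    for (;;)
    {
        CHECK_FOR_INTERRUPTS();

        MakeSharedSegmentName(segment_name, name, segment);
        if (!SharedFileSetDelete(fileset, segment_name, true))
            break;
        found = true;
        ++segment;
    }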

+void
+PathNameCreateTemporaryDir(const char *basedir, const char *directory)
+{
+    if (mkdir(directory, S_IRWXU) < 0)
+    {
+        if (errno == EEXIST)
+            return;
+
+        /*
+         * Failed.  Try to create basedir first in case it's missing. Tolerate
+         * ENOENT to close a race against another process following the same
+         * algorithm.
+         */
+        if (mkdir(basedir, S_IRWXU) < 0 && errno != ENOENT)
+            elog(ERROR, "cannot create temporary directory \"%s\": %m",
+                 basedir);

ENOENT or EEXIST?
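
I assume the intent was to tolerate the directory already existing,
i.e. something like:

        /*
         * Failed.  Try to create basedir first in case it's missing.
         * Tolerate EEXIST to close a race against another process
         * following the same algorithm.
         */
        if (mkdir(basedir, S_IRWXU) < 0 && errno != EEXIST)
            elog(ERROR, "cannot create temporary directory \"%s\": %m",
                 basedir);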



+File
+PathNameCreateTemporaryFile(const char *path, bool error_on_failure)
+{
+    File        file;
+
+    /*
+     * Open the file.  Note: we don't use O_EXCL, in case there is an orphaned
+     * temp file that can be reused.
+     */
+    file = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY);
+    if (file <= 0)
+    {
+        if (error_on_failure)
+            elog(ERROR, "could not create temporary file \"%s\": %m", path);
+        else
+            return file;
+    }
+
+    /* Mark it for temp_file_limit accounting. */
+    VfdCache[file].fdstate |= FD_TEMP_FILE_LIMIT;
+
+    /*
+     * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we still
+     * want to make sure they get closed at end of xact.
+     */
+    ResourceOwnerEnlargeFiles(CurrentResourceOwner);
+    ResourceOwnerRememberFile(CurrentResourceOwner, file);
+    VfdCache[file].resowner = CurrentResourceOwner;

So maybe I'm being pedantic here, but wouldn't the right order be to do
ResourceOwnerEnlargeFiles() *before* creating the file? It's a memory
allocating operation, so it can fail, which'd leak the file.
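
I.e. roughly (sketch):

    /* Reserve resowner array space first - this allocates and can fail. */
    ResourceOwnerEnlargeFiles(CurrentResourceOwner);

    file = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY);
    if (file <= 0)
    {
        if (error_on_failure)
            elog(ERROR, "could not create temporary file \"%s\": %m", path);
        else
            return file;
    }

    /* Mark it for temp_file_limit accounting. */
    VfdCache[file].fdstate |= FD_TEMP_FILE_LIMIT;

    ResourceOwnerRememberFile(CurrentResourceOwner, file);
    VfdCache[file].resowner = CurrentResourceOwner;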

+/*
+ * Open a file that was created with PathNameCreateTemporaryFile, possibly in
+ * another backend.  Files opened this way don't count agains the

s/agains/against/

+ * temp_file_limit of the caller, are read-only and are automatically closed
+ * at the end of the transaction but are not deleted on close.
+ */
+File
+PathNameOpenTemporaryFile(const char *path)
+{
+    File        file;
+
+    /* We open the file read-only. */
+    file = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
+
+    /* If no such file, then we don't raise an error. */
+    if (file <= 0 && errno != ENOENT)
+        elog(ERROR, "could not open temporary file \"%s\": %m", path);
+
+    if (file > 0)
+    {
+        /*
+         * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we
+         * still want to make sure they get closed at end of xact.
+         */
+        ResourceOwnerEnlargeFiles(CurrentResourceOwner);
+        ResourceOwnerRememberFile(CurrentResourceOwner, file);
+        VfdCache[file].resowner = CurrentResourceOwner;

Same complaint as above, ResourceOwnerEnlargeFiles() should be done
earlier.


+/*
+ * Delete a file by pathname.  Return true if the file existed, false if
+ * didn't.
+ */
+bool
+PathNameDeleteTemporaryFile(const char *path, bool error_on_failure)
+{
+    struct stat filestats;
+    int            stat_errno;
+
+    /* Get the final size for pgstat reporting. */
+    if (stat(path, &filestats) != 0)
+        stat_errno = errno;
+    else
+        stat_errno = 0;
+
+    /*
+     * Unlike FileClose's automatic file deletion code, we tolerate
+     * non-existence to support BufFileDeleteShared which doesn't know how
+     * many segments it has to delete until it runs out.
+     */
+    if (stat_errno == ENOENT)
+        return false;
+
+    if (unlink(path) < 0)
+    {
+        if (errno != ENOENT)
+            elog(error_on_failure ? ERROR : LOG,
+                 "cannot unlink temporary file \"%s\": %m", path);
+        return false;
+    }
+
+    if (stat_errno == 0)
+        ReportTemporaryFileUsage(path, filestats.st_size);
+    else
+    {
+        errno = stat_errno;
+        elog(LOG, "could not stat file \"%s\": %m", path);
+    }

All these messages are "not expected to ever happen" ones, right?

+    return true;
+}
 /*
  * close a file when done with it
  */
@@ -1537,10 +1747,17 @@ FileClose(File file)
         Delete(file);
     }
+    if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
+    {
+        /* Subtract its size from current usage (do first in case of error) */
+        temporary_files_size -= vfdP->fileSize;
+        vfdP->fileSize = 0;
+    }

So, is it right to do this unconditionally and without regard for
errors? If the file isn't deleted, its size shouldn't be subtracted
from temporary_files_size. I guess you're managing that through the
flag, but that's not entirely obvious.

diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c
new file mode 100644
index 00000000000..6da80838b37
--- /dev/null
+++ b/src/backend/storage/file/sharedfileset.c
@@ -0,0 +1,240 @@
+/*-------------------------------------------------------------------------
+ *
+ * sharedfileset.c
+ *      Shared temporary file management.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *      src/backend/storage/file/sharedfileset.c
+ *
+ *-------------------------------------------------------------------------
+ */

A slightly bigger comment wouldn't hurt.
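
E.g. a few sentences about lifetime and ownership; wording obviously
yours to pick, but along the lines of:

 * SharedFileSets provide a temporary namespace (think directory) so
 * that files can be discovered by name, and a shared ownership
 * mechanism so that the files are deleted when the last attached
 * backend detaches, or after a crash - whichever comes first.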



+/*
+ * Attach to a set of directories that was created with SharedFileSetInit.
+ */
+void
+SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
+{
+    bool        success;
+
+    SpinLockAcquire(&fileset->mutex);
+    if (fileset->refcnt == 0)
+        success = false;

I've not finished reading through this yet, but is this safe? If the
segment's gone, is the spinlock guaranteed to still be a spinlock? I
suspect this isn't a problem because only the underlying data is
removed, while the SharedFileSet itself stays alive?

+static void
+GetSharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace)
+{
+    char        tempdirpath[MAXPGPATH];
+    
+    GetTempTablespacePath(tempdirpath, tablespace);
+    snprintf(path, MAXPGPATH, "%s/%s%d.%d.sharedfileset" PG_TEMP_SUBDIR_SUFFIX,
+             tempdirpath, PG_TEMP_FILE_PREFIX,
+             fileset->creator_pid, fileset->number);
+}

+/*
+ * Sorting hat to determine which tablespace a given shared temporary file
+ * belongs in.
+ */
+static Oid
+ChooseTablespace(const SharedFileSet *fileset, const char *name)
+{
+    uint32        hash = hash_any((const unsigned char *) name, strlen(name));
+
+    return fileset->tablespaces[hash % fileset->ntablespaces];
+}

Hm. I wonder if just round-robining through these wouldn't be a better approach.


+/*
+ * Compute the full path of a file in a SharedFileSet.
+ */
+static void
+GetSharedFilePath(char *path, SharedFileSet *fileset, const char *name)
+{
+    char        dirpath[MAXPGPATH];
+
+    GetSharedFileSetPath(dirpath, fileset, ChooseTablespace(fileset, name));
+    snprintf(path, MAXPGPATH, "%s/" PG_TEMP_FILE_PREFIX ".%s", dirpath, name);
+}
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 4c35ccf65eb..8b91d5a6ebe 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -528,16 +528,6 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
                 PrintRelCacheLeakWarning(res);
             RelationClose(res);
         }
 
-
-        /* Ditto for dynamic shared memory segments */
-        while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
-        {
-            dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
-
-            if (isCommit)
-                PrintDSMLeakWarning(res);
-            dsm_detach(res);
-        }
     }
     else if (phase == RESOURCE_RELEASE_LOCKS)
     {
@@ -654,6 +644,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
                 PrintFileLeakWarning(res);
             FileClose(res);
         }
 
+
+        /* Ditto for dynamic shared memory segments */
+        while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
+        {
+            dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
+
+            if (isCommit)
+                PrintDSMLeakWarning(res);
+            dsm_detach(res);
+        }
     }

Is that entirely unproblematic? Are there any DSM callbacks that rely on
locks still being held? Please split this part into a separate commit
with such analysis.



+/* The initial size of chunks in pages. */
+#define STS_MIN_CHUNK_PAGES 4

This could use a quick description of how you arrived at that specific
value.


+/* Chunk written to disk. */
+typedef struct SharedTuplestoreChunk
+{
+    int            npages;            /* Size of this chunk in BLCKSZ pages. */
+    int            ntuples;        /* Number of tuples in this chunk. */
+    char        data[FLEXIBLE_ARRAY_MEMBER];
+} SharedTuplestoreChunk;
+
+/* Per-participant shared state. */
+typedef struct SharedTuplestoreParticipant
+{
+    slock_t        mutex;
+    BlockNumber    read_page;        /* Page number for next read. */
+    BlockNumber    npages;            /* Number of pages written. */
+    bool        writing;        /* Used only for assertions. */
+
+    /*
+     * We need variable sized chunks, because we might be asked to store
+     * gigantic tuples.  To avoid the locking contention that would come from
+     * reading chunk sizes from disk, we store the chunk size for ranges of
+     * the file in a compact format in memory.  chunk_pages starts out at
+     * STS_MIN_CHUNK_PAGES and then doubles each time we reach a page listed
+     * in chunk_expansion_log.
+     */
+    BlockNumber    chunk_expansion_log[sizeof(BlockNumber) * CHAR_BIT];
+    int            chunk_expansions;
+    int            chunk_expansion;
+    int            chunk_pages;

This needs more explanation.

+/*
+ * Initialize a SharedTuplestore in existing shared memory.  There must be
+ * space for sts_estimate(participants) bytes.  If flags is set to the value
+ * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
+ * eagerly (but this isn't yet implemented).

s/is set to the value/includes the value/ - otherwise it's not really
a flags argument.
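
With actual flags semantics callers would then test for inclusion
rather than equality, i.e. (eager_cleanup is made up):

    if ((flags & SHARED_TUPLESTORE_SINGLE_PASS) != 0)
        eager_cleanup = true;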


+ * Tuples that are stored may optionally carry a piece of fixed sized
+ * meta-data which will be retrieved along with the tuple.  This is useful for
+ * the hash codes used for multi-batch hash joins, but could have other
+ * applications.

"hash codes"?



+/*
+ * Prepare to rescan.  Only participant should call this.  After it returns,
+ * all participants should call sts_begin_parallel_scan() and then loop over
+ * sts_parallel_scan_next().
+ */

s/should/may/?  Also, maybe document what happens to in-progress reads
(or rather, that they're not allowed to exist)?


+/*
+ * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
+ * pointer to meta data of that size must be provided.
+ */
+void
+sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
+             MinimalTuple tuple)
+{

+    /* Do we have space? */
+    size = accessor->sts->meta_data_size + tuple->t_len;
+    if (accessor->write_pointer + size >= accessor->write_end)
+    {
+        /* Try flushing to see if that creates enough space. */
+        if (accessor->write_chunk != NULL)
+            sts_flush_chunk(accessor);
+
+        /*
+         * It may still not be enough in the case of a gigantic tuple, or if
+         * we haven't created a chunk buffer at all yet.
+         */
+        if (accessor->write_pointer + size >= accessor->write_end)
+        {
+            SharedTuplestoreParticipant *participant;
+            size_t    space_needed;
+            int        pages_needed;
+
+            /* How many pages to hold this data and the chunk header? */
+            space_needed = offsetof(SharedTuplestoreChunk, data) + size;
+            pages_needed = (space_needed + (BLCKSZ - 1)) / BLCKSZ;
+            pages_needed = Max(pages_needed, STS_MIN_CHUNK_PAGES);
+
+            /*
+             * Double the chunk size until it's big enough, and record that
+             * fact in the shared expansion log so that readers know about it.
+             */
+            participant = &accessor->sts->participants[accessor->participant];
+            while (accessor->write_pages < pages_needed)
+            {
+                accessor->write_pages *= 2;
+                participant->chunk_expansion_log[participant->chunk_expansions++] =
+                    accessor->write_page;
+            }

Hm. Isn't that going to be pretty unfunny if you have one large tuple
followed by lots of small ones? IIUC the chunk size never shrinks
again, so a single huge tuple would inflate all subsequently written
chunks.


+            /* Create the output buffer. */
+            if (accessor->write_chunk != NULL)
+                pfree(accessor->write_chunk);
+            accessor->write_chunk = (SharedTuplestoreChunk *)
+                palloc0(accessor->write_pages * BLCKSZ);

Are we guaranteed to be in a long-lived memory context here?


+/*
+ * Get the next tuple in the current parallel scan.
+ */
+MinimalTuple
+sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+    SharedTuplestoreParticipant *p;
+    BlockNumber    read_page;
+    int            chunk_pages;
+    bool        eof;
+
+    for (;;)
+    {
+        /* Can we read more tuples from the current chunk? */
+        if (likely(accessor->read_ntuples < accessor->read_ntuples_available))
+            return sts_read_tuple(accessor, meta_data);

I'm not convinced this is a good use of likely/unlikely (the branch
isn't particularly biased, and this isn't performance-critical enough).

+        /* Find the location of a new chunk to read. */
+        p = &accessor->sts->participants[accessor->read_participant];
+
+        SpinLockAcquire(&p->mutex);
+        eof = p->read_page >= p->npages;
+        if (!eof)
+        {
+            /*
+             * Figure out how big this chunk is.  It will almost always be the
+             * same as the last chunk loaded, but if there is one or more
+             * entry in the chunk expansion log for this page then we know
+             * that it doubled that number of times.  This avoids the need to
+             * do IO to adjust the read head, so we don't need to hold up
+             * concurrent readers.  (An alternative to this extremely rarely
+             * run loop would be to use more space storing the new size in the
+             * log so we'd have 'if' instead of 'while'.)
+             */
+            read_page = p->read_page;
+            while (p->chunk_expansion < p->chunk_expansions &&
+                   p->chunk_expansion_log[p->chunk_expansion] == p->read_page)
+            {
+                p->chunk_pages *= 2;
+                p->chunk_expansion++;
+            }
+            chunk_pages = p->chunk_pages;
+
+            /* The next reader will start after this chunk. */
+            p->read_page += chunk_pages;
+        }
+        SpinLockRelease(&p->mutex);

This looks more like the job of an lwlock rather than a spinlock.
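
I.e. something like this instead (sketch; struct abbreviated, and the
tranche name is made up):

    typedef struct SharedTuplestoreParticipant
    {
        LWLock        lock;        /* instead of slock_t mutex */
        BlockNumber   read_page;
        BlockNumber   npages;
        /* chunk bookkeeping as before */
    } SharedTuplestoreParticipant;

    /* at initialization time */
    LWLockInitialize(&participant->lock, LWTRANCHE_SHARED_TUPLESTORE);

    /* in sts_parallel_scan_next() */
    LWLockAcquire(&p->lock, LW_EXCLUSIVE);
    eof = p->read_page >= p->npages;
    if (!eof)
    {
        /* same chunk-size / read-head logic as now */
    }
    LWLockRelease(&p->lock);

That'd also give you wait event reporting for free.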



+/*
+ * Create the name used for our shared BufFiles.
+ */
+static void
+make_name(char *name, SharedTuplestoreAccessor *accessor, int participant)
+{
+    snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
+}

Name's a bit generic. And it's still not really "making" anything ;)


Going to buy some groceries and then look at the next patches.

- Andres

