
From Andres Freund
Subject Re: [HACKERS] Parallel Hash take II
Date
Msg-id 20171114212439.rfj26sdhhcoohyhz@alap3.anarazel.de
In response to Re: [HACKERS] Parallel Hash take II  (Thomas Munro <thomas.munro@enterprisedb.com>)
Responses Re: [HACKERS] Parallel Hash take II  (Robert Haas <robertmhaas@gmail.com>)
List pgsql-hackers
Hi,

On 2017-11-14 01:30:30 +1300, Thomas Munro wrote:
> > +-- The "good" case: batches required, but we plan the right number; we
> > +-- plan for 16 batches, and we stick to that number, and peak memory
> > +-- usage stays within our work_mem budget
> > +-- non-parallel
> > +set max_parallel_workers_per_gather = 0;
> > +set work_mem = '128kB';
> >
> > So how do we know that's actually the case we're testing rather than
> > something arbitrarily different? There are IIRC tests somewhere that just
> > filter the JSON explain output down to the relevant parts...
> 
> Yeah, good idea.  My earlier attempts to dump out the hash join
> dimensions ran into problems with architecture sensitivity and then
> some run-to-run non-determinism in the parallel case (due to varying
> fragmentation depending on how many workers get involved in time).
> The attached version tells you about batch growth without reporting
> the exact numbers, except in the "ugly" case where we know that there
> is only one possible outcome because the extreme skew detector is
> guaranteed to go off after the first nbatch increase (I got rid of all
> other tuples except ones with the same key to make this true).

Hm. The way you access this doesn't quite seem right:
+--
+-- exercises for the hash join code
+--
+begin;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+-- Extract bucket and batch counts from an explain analyze plan.  In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  line text;
+  matches text[];
+begin
+  for line in
+    execute 'explain analyze ' || query
+  loop
+    matches := (regexp_matches(line, '  Batches: ([0-9]+) \(originally ([0-9]+)\)'));
+    if matches is not null then
+      original := matches[2]::int;
+      final := matches[1]::int;
+      return next;
+    else
+      matches := regexp_matches(line, '  Batches: ([0-9]+)');
+      if matches is not null then
+        original := matches[1]::int;
+        final := original;
+        return next;
+      end if;
+    end if;
+  end loop;
+end;
+$$;

Why not use format json and access the output that way? Then you can be
sure you access the right part of the tree and such?
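
Untested sketch of the kind of thing I mean (the JSON keys are what
explain.c emits for hash nodes, IIRC; function names invented):

create or replace function find_hash(node json)
returns json language plpgsql
as
$$
declare
  x json;
  child json;
begin
  -- Recurse through the plan tree until we hit the Hash node.
  if node->>'Node Type' = 'Hash' then
    return node;
  end if;
  for child in select json_array_elements(node -> 'Plans')
  loop
    x := find_hash(child);
    if x is not null then
      return x;
    end if;
  end loop;
  return null;
end;
$$;

create or replace function hash_join_batches(query text)
returns table (original int, final int) language plpgsql
as
$$
declare
  whole_plan json;
  hash_node json;
begin
  for whole_plan in
    execute 'explain (analyze, format ''json'') ' || query
  loop
    -- Pull the counts straight out of the Hash node, instead of
    -- pattern-matching on the text output.
    hash_node := find_hash(json_extract_path(whole_plan, '0', 'Plan'));
    original := hash_node->>'Original Hash Batches';
    final := hash_node->>'Hash Batches';
    return next;
  end loop;
end;
$$;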

> > +       else
> > +       {
> > +               errno = stat_errno;
> > +               elog(LOG, "could not stat file \"%s\": %m", path);
> > +       }
> >
> > All these messages are "not expected to ever happen" ones, right?
> 
> You'd have to suffer a nasty filesystem failure, a read-only remount, or
> manual interference with permissions or something.  Not sure where the
> line is, but I've changed all of these new elog calls to ereport.

Oh, I'd been fine keeping them as elogs. The one exception would have
been out-of-space cases which'll occur in practice.
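
For those the stock pattern seems fine (just a sketch, with a made-up
path variable):

    ereport(ERROR,
            (errcode_for_file_access(),
             errmsg("could not write to file \"%s\": %m", path)));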


> > +       if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
> > +       {
> > +               /* Subtract its size from current usage (do first in case of error) */
> > +               temporary_files_size -= vfdP->fileSize;
> > +               vfdP->fileSize = 0;
> > +       }
> >
> > So, is it right to do so unconditionally and without regard for errors?
> > If the file isn't deleted, it shouldn't be subtracted from fileSize. I
> > guess you're managing that through the flag, but that's not entirely
> > obvious.
> 
> I think it is.  Reasoning:  The existing behaviour of fd.c is that if
> we don't manage to delete temporary files, we'll LOG something and
> forget about them (they'll be cleaned up eventually by a clean restart
> or human intervention).

IOW: Never ;)


> > +/*
> > + * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
> > + * pointer to meta data of that size must be provided.
> > + */
> > +void
> > +sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
> > +                        MinimalTuple tuple)
> > +{
> >
> > +       /* Do we have space? */
> > +       size = accessor->sts->meta_data_size + tuple->t_len;
> > +       if (accessor->write_pointer + size >= accessor->write_end)
> > +       {
> > +               /* Try flushing to see if that creates enough space. */
> > +               if (accessor->write_chunk != NULL)
> > +                       sts_flush_chunk(accessor);
> > +
> > +               /*
> > +                * It may still not be enough in the case of a gigantic tuple, or if
> > +                * we haven't created a chunk buffer at all yet.
> > +                */
> > +               if (accessor->write_pointer + size >= accessor->write_end)
> > +               {
> > +                       SharedTuplestoreParticipant *participant;
> > +                       size_t  space_needed;
> > +                       int             pages_needed;
> > +
> > +                       /* How many pages to hold this data and the chunk header? */
> > +                       space_needed = offsetof(SharedTuplestoreChunk, data) + size;
> > +                       pages_needed = (space_needed + (BLCKSZ - 1)) / BLCKSZ;
> > +                       pages_needed = Max(pages_needed, STS_MIN_CHUNK_PAGES);
> > +
> > +                       /*
> > +                        * Double the chunk size until it's big enough, and record that
> > +                        * fact in the shared expansion log so that readers know about it.
> > +                        */
> > +                       participant = &accessor->sts->participants[accessor->participant];
> > +                       while (accessor->write_pages < pages_needed)
> > +                       {
> > +                               accessor->write_pages *= 2;
> > +                               participant->chunk_expansion_log[participant->chunk_expansions++] =
> > +                                       accessor->write_page;
> > +                       }
> >
> > Hm. Isn't that going to be pretty unfunny if you have one large tuple
> > and a lot of small ones?
> 
> It will increase the parallel scan grain size, and then keep that size
> for the rest of the contents of one backend's output file.  I am aware
> of two downsides to using a large parallel grain:

> 1.  It determines the amount of unfairness when we run out of data:
> it's the maximum amount of extra data that the unlucky last worker can
> finish up with after all the others have finished.  I think this
> effect is reduced by higher level factors: when a reader runs out of
> data in one backend's file, it'll start reading another backend's
> file.  If it's hit the end of all backends' files and this is an outer
> batch, Parallel Hash will just go and work on another batch
> immediately.

Consider e.g. what happens if there's the occasional 500MB datum, and
the rest's very small...


> Better ideas?

Not really. I'm more than a bit suspicious of this solution, but I don't
really have a great suggestion otherwise.  One way to combat extreme
size skew would be to put very large datums into different files.

But I think we probably can go with your approach for now, ignoring my
failure prone spidey senses ;)


> > +               /* Find the location of a new chunk to read. */
> > +               p = &accessor->sts->participants[accessor->read_participant];
> > +
> > +               SpinLockAcquire(&p->mutex);
> > +               eof = p->read_page >= p->npages;
> > +               if (!eof)
> > +               {
> > +                       /*
> > +                        * Figure out how big this chunk is.  It will almost always be the
> > +                        * same as the last chunk loaded, but if there are one or more
> > +                        * entries in the chunk expansion log for this page then we know
> > +                        * that it doubled that number of times.  This avoids the need to
> > +                        * do IO to adjust the read head, so we don't need to hold up
> > +                        * concurrent readers.  (An alternative to this extremely rarely
> > +                        * run loop would be to use more space storing the new size in the
> > +                        * log so we'd have 'if' instead of 'while'.)
> > +                        */
> > +                       read_page = p->read_page;
> > +                       while (p->chunk_expansion < p->chunk_expansions &&
> > +                                  p->chunk_expansion_log[p->chunk_expansion] == p->read_page)
> > +                       {
> > +                               p->chunk_pages *= 2;
> > +                               p->chunk_expansion++;
> > +                       }
> > +                       chunk_pages = p->chunk_pages;
> > +
> > +                       /* The next reader will start after this chunk. */
> > +                       p->read_page += chunk_pages;
> > +               }
> > +               SpinLockRelease(&p->mutex);
> >
> > This looks more like the job of an lwlock rather than a spinlock.
> 
> I switched to the alternative algorithm mentioned in parentheses in
> the comment.  It uses a bit more space, but that loop is gone.  In my
> mind this is much like Parallel Seq Scan: we acquire a spinlock just
> to advance the block pointer.  The added complication is that we also
> check if the chunk size has changed, which clang renders as this many
> instructions:
> 
> postgres[0x10047eee0] <+176>:  movslq 0x144(%r15,%rbx), %rcx
> postgres[0x10047eee8] <+184>:  cmpl   0x140(%r15,%rbx), %ecx
> postgres[0x10047eef0] <+192>:  jge    0x10047ef16               ; <+230> at sharedtuplestore.c:489
> postgres[0x10047eef2] <+194>:  leaq   (%r15,%rbx), %rdx
> postgres[0x10047eef6] <+198>:  cmpl   %r12d, 0x40(%rdx,%rcx,8)
> postgres[0x10047eefb] <+203>:  jne    0x10047ef16               ; <+230> at sharedtuplestore.c:489
> postgres[0x10047eefd] <+205>:  leaq   0x144(%r15,%rbx), %rsi
> postgres[0x10047ef05] <+213>:  leal   0x1(%rcx), %edi
> postgres[0x10047ef08] <+216>:  movl   %edi, (%rsi)
> postgres[0x10047ef0a] <+218>:  movl   0x44(%rdx,%rcx,8), %ecx
> postgres[0x10047ef0e] <+222>:  movl   %ecx, 0x148(%r15,%rbx)
> postgres[0x10047ef16] <+230>:  movl   0x148(%r15,%rbx), %r15d
> 
> That should be OK, right?

It's not too bad. Personally I'm of the opinion though that pretty much
no new spinlocks should be added - their worst-case performance
characteristics are bad enough that they're only worth the
experimentation in cases where each cycle really matters and where
contention is unlikely.
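
For reference, the same critical section would look about like this under
an LWLock (sketch only; assumes a per-participant LWLock, set up with
LWLockInitialize(), in place of the spinlock):

    LWLockAcquire(&p->lock, LW_EXCLUSIVE);
    eof = p->read_page >= p->npages;
    if (!eof)
    {
        read_page = p->read_page;
        chunk_pages = p->chunk_pages;
        /* The next reader will start after this chunk. */
        p->read_page += chunk_pages;
    }
    LWLockRelease(&p->lock);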


> > One day we're going to need a better approach to this. I have no idea
> > how, but this per-node, and now per-node * max_parallelism, approach has
> > only implementation simplicity as its benefit.
> 
> I agree, and I am interested in that subject.  In the meantime, I
> think it'd be pretty unfair if parallel-oblivious hash join and
> sort-merge join and every other parallel plan get to use work_mem * p
> (and in some cases waste it with duplicate data), but Parallel Hash
> isn't allowed to do the same (and put it to good use).

I'm not sure I care about fairness between pieces of code ;) 

> > -                               node->hj_JoinState = HJ_NEED_NEW_OUTER;
> > +                               if (hashtable->parallel_state)
> > +                               {
> > +                                       Barrier    *build_barrier;
> > +
> > +                                       build_barrier = &hashtable->parallel_state->build_barrier;
> > +                                       if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
> > +                                       {
> > +                                               /*
> > +                                                * If multi-batch, we need to hash the outer relation
> > +                                                * up front.
> > +                                                */
> > +                                               if (hashtable->nbatch > 1)
> > +                                                       ExecParallelHashJoinPartitionOuter(node);
> > +                                               BarrierArriveAndWait(build_barrier,
> > +                                                                    WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
> > +                                       }
> > +                                       Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
> > +
> > +                                       /* Each backend should now select a batch to work on. */
> > +                                       hashtable->curbatch = -1;
> > +                                       node->hj_JoinState = HJ_NEED_NEW_BATCH;
> > +
> > +                                       continue;
> > +                               }
> > +                               else
> > +                                       node->hj_JoinState = HJ_NEED_NEW_OUTER;
> >
> > You know what I'm going to say about all these branches, and sigh.
> 
> BTW this is not per-tuple code -- it runs once at the end of hashing.
> Not sure what you're looking for here.

It was more a general statement about all the branches in nodeHashjoin
than about these specific branches. Should've made that clearer. There
are definitely branches in very common parts:

        case HJ_NEED_NEW_OUTER:

            /*
             * We don't have an outer tuple, try to get the next one
             */
            if (hashtable->parallel_state)
                outerTupleSlot =
                    ExecParallelHashJoinOuterGetTuple(outerNode, node,
                                                      &hashvalue);
            else
                outerTupleSlot =
                    ExecHashJoinOuterGetTuple(outerNode, node,
                                              &hashvalue);


I don't think you should do so now, but I think a reasonable approach
here would be to move the HJ_BUILD_HASHTABLE code into a separate
function (it really can't be hot). Then have specialized ExecHashJoin()
versions for parallel/non-parallel and potentially for outer/inner/anti.
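
Roughly this shape (sketch; names invented, body elided):

    static inline TupleTableSlot *
    ExecHashJoinImpl(PlanState *pstate, bool parallel)
    {
        /*
         * The existing HJ_* state machine goes here, with the cold
         * HJ_BUILD_HASHTABLE step moved out into a separate function.
         * "parallel" is a compile-time constant in each caller below, so
         * after inlining the per-tuple branches on it fold away.
         */
        return NULL;            /* placeholder for the real body */
    }

    TupleTableSlot *
    ExecHashJoin(PlanState *pstate)
    {
        return ExecHashJoinImpl(pstate, false);
    }

    TupleTableSlot *
    ExecParallelHashJoin(PlanState *pstate)
    {
        return ExecHashJoinImpl(pstate, true);
    }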


> > If we don't split this into two versions, we at least should store
> > hashNode->parallel_state in a local var, so the compiler doesn't have to
> > pull that out of memory after every external function call (of which
> > there are a lot). In common cases it'll end up in a callee saved
> > registers, and most of the called functions won't be too register
> > starved (on x86-64).
> 
> Hmm.  Well I did that already in v24 -- in many places there is now a
> local variable called pstate.

See above piece of code, and a few others, in nodeHash.
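
I.e. hoist it once at the top (sketch):

    /*
     * Read the shared-memory pointer once and keep it in a local, so the
     * compiler can hold it in a callee-saved register across the external
     * function calls instead of reloading it after each one.
     */
    ParallelHashJoinState *pstate = hashtable->parallel_state;

    if (pstate != NULL)
        outerTupleSlot = ExecParallelHashJoinOuterGetTuple(outerNode, node,
                                                           &hashvalue);
    else
        outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode, node,
                                                   &hashvalue);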


> > I think it'd be better if we structured the file so we just set GUCs
> > with SET LOCAL inside a transaction.
> 
> I wrapped the whole region of join.sql concerned with hash joins in a
> transaction that rolls back, so I don't have to write LOCAL.  That's
> just as good, right?

Not really imo. Being able to read a test without going through all
previous ones is a lot better.
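
I.e. something like this per test (sketch; table name and settings
arbitrary):

    begin;
    set local min_parallel_table_scan_size = 0;
    set local parallel_setup_cost = 0;
    set local work_mem = '128kB';
    -- the actual test query
    select count(*) from simple r join simple s using (id);
    rollback;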

Greetings,

Andres Freund

