From: Andres Freund
Subject: Re: [HACKERS] WIP: [[Parallel] Shared] Hash
Msg-id: 20170216023617.eszzvg2daiarhqpu@alap3.anarazel.de
In response to: Re: [HACKERS] WIP: [[Parallel] Shared] Hash (Thomas Munro <thomas.munro@enterprisedb.com>)
List: pgsql-hackers

Hi,

On 2017-02-13 23:57:00 +1300, Thomas Munro wrote:
> Here's a new version to fix the problems reported by Rafia above.  The
> patch descriptions are as before but it starts from 0002 because 0001
> was committed as 7c5d8c16 (thanks, Andres).

FWIW, I'd appreciate it if you added a short commit message to the
individual patches - I find it helpful to have a little bit more context
while looking at them than just the titles.  Alternatively you can
include that text when re-posting the series, but it's imo just as easy
to have a short commit message (and just use format-patch).

I'm for now using [1] as context.


0002-hj-add-dtrace-probes-v5.patch

Hm. I'm personally very unenthusiastic about adding more of these, and
would rather rip all of them out.  I tend to believe that static
probes simply aren't a good approach for anything requiring a lot of
detail.  But whatever.


0003-hj-refactor-memory-accounting-v5.patch
@@ -424,15 +422,29 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
     if (ntuples <= 0.0)
         ntuples = 1000.0;
 
-    /*
-     * Estimate tupsize based on footprint of tuple in hashtable... note this
-     * does not allow for any palloc overhead.  The manipulations of spaceUsed
-     * don't count palloc overhead either.
-     */
+    /* Estimate tupsize based on footprint of tuple in hashtable. */

palloc overhead is still unaccounted for, no?  In the chunked case that
might not be much, I realize (so the comment should probably have been
updated when chunking was introduced).
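
For context, the estimate in question is, IIRC unchanged by this patch:

    tupsize = HJTUPLE_OVERHEAD +
        MAXALIGN(SizeofMinimalTupleHeader) +
        MAXALIGN(tupwidth);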

-    Size        spaceUsed;        /* memory space currently used by tuples */
+    Size        spaceUsed;        /* memory space currently used by hashtable */

It's not really the full hashtable, is it? The ->buckets array appears
to still be unaccounted for.
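
If the comment is to be accurate, I suppose wherever the bucket array is
(re)allocated we'd also need something roughly like this (untested sketch):

    /* hypothetical: also charge the bucket array against spaceUsed */
    hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
    if (hashtable->spaceUsed > hashtable->spacePeak)
        hashtable->spacePeak = hashtable->spaceUsed;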

Looks ok.


0004-hj-refactor-batch-increases-v5.patch

@@ -1693,10 +1689,12 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 }
 
 /*
- * Allocate 'size' bytes from the currently active HashMemoryChunk
+ * Allocate 'size' bytes from the currently active HashMemoryChunk.  If
+ * 'respect_work_mem' is true, this may cause the number of batches to be
+ * increased in an attempt to shrink the hash table.
  */
 static void *
-dense_alloc(HashJoinTable hashtable, Size size)
+dense_alloc(HashJoinTable hashtable, Size size, bool respect_work_mem)
 {
     HashMemoryChunk newChunk;
     char       *ptr;
 
@@ -1710,6 +1708,15 @@ dense_alloc(HashJoinTable hashtable, Size size)
      */
     if (size > HASH_CHUNK_THRESHOLD)
     {
+        if (respect_work_mem &&
+            hashtable->growEnabled &&
+            hashtable->spaceUsed + HASH_CHUNK_HEADER_SIZE + size >
+            hashtable->spaceAllowed)
+        {
+            /* work_mem would be exceeded: try to shrink hash table */
+            ExecHashIncreaseNumBatches(hashtable);
+        }
+

Isn't it kinda weird to do this from within dense_alloc()?  I mean that
dumps a lot of data to disk, frees a bunch of memory and so on - not
exactly what "dense_alloc" implies.  Isn't the free()ing part also
dangerous, because the caller might actually use some of that memory,
like e.g. in ExecHashRemoveNextSkewBucket() or such?  I haven't looked
deeply enough to check whether that's an active bug, but it seems like
an invitation for one if not.
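
To illustrate the kind of pattern I'm worried about (hypothetical,
simplified from ExecHashRemoveNextSkewBucket()):

    int            bucketno;
    int            batchno;
    HashJoinTuple  copyTuple;

    ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
    if (batchno == hashtable->curbatch)
    {
        /*
         * dense_alloc() may now call ExecHashIncreaseNumBatches(), which
         * frees chunks and changes nbatch: the batchno decision above can
         * be stale by the time we insert, and any pointer the caller holds
         * into an existing chunk may be dangling.
         */
        copyTuple = (HashJoinTuple) dense_alloc(hashtable, tupleSize, true);
        memcpy(copyTuple, hashTuple, tupleSize);
    }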


0005-hj-refactor-unmatched-v5.patch

I'm a bit confused as to why the unmatched tuple scan is a good
parallelism target, but I might see later...

0006-hj-barrier-v5.patch

Skipping that here.


0007-hj-exec-detach-node-v5.patch

Hm. You write elsewhere:
> By the time ExecEndNode() runs in workers, ExecShutdownNode() has
> already run.  That's done on purpose because, for example, the hash
> table needs to survive longer than the parallel environment to allow
> EXPLAIN to peek at it.  But it means that the Gather node has thrown
> out the shared memory before any parallel-aware node below it gets to
> run its Shutdown and End methods.  So I invented ExecDetachNode()
> which runs before ExecShutdownNode(), giving parallel-aware nodes a
> chance to say goodbye before their shared memory vanishes.  Better
> ideas?

To me that is a weakness in the ExecShutdownNode() API - imo child nodes
should get the chance to shut down before the upper-level node.
ExecInitNode/ExecEndNode etc. give individual nodes the freedom to do
things in the right order, but ExecShutdownNode() doesn't.  I don't
quite see why we'd want to invent a separate ExecDetachNode() that'd be
called immediately before ExecShutdownNode().

An easy way to change that would be to return early in
ExecShutdownNode()'s T_GatherState case, and delegate the responsibility
of calling it on Gather's children to ExecShutdownGather().
Alternatively we could make it a full-blown thing like ExecInitNode()
that every node needs to implement, but that seems a bit painful.
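
I.e. something roughly like this (untested, just to show the shape I mean):

/* execProcnode.c */
bool
ExecShutdownNode(PlanState *node)
{
    if (node == NULL)
        return false;

    switch (nodeTag(node))
    {
        case T_GatherState:
            /* Gather shuts down its own children, in the right order */
            ExecShutdownGather((GatherState *) node);
            return false;
        default:
            break;
    }

    return planstate_tree_walker(node, ExecShutdownNode, NULL);
}

/* nodeGather.c */
void
ExecShutdownGather(GatherState *node)
{
    /* shut down the child subtree before its shared memory goes away */
    ExecShutdownNode(outerPlanState(node));

    ExecShutdownGatherWorkers(node);
    if (node->pei != NULL)
    {
        ExecParallelCleanup(node->pei);
        node->pei = NULL;
    }
}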

Or have I missed something here?

Random aside: I've wondered before whether having to provide all executor
callbacks this way is a weakness of our executor integration, and whether
it shouldn't be a struct of callbacks instead...
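
Something like (names entirely made up):

typedef struct ExecutorNodeMethods
{
    TupleTableSlot *(*exec) (PlanState *node);
    void        (*end) (PlanState *node);
    void        (*estimate_dsm) (PlanState *node, ParallelContext *pcxt);
    void        (*initialize_dsm) (PlanState *node, ParallelContext *pcxt);
    void        (*initialize_worker) (PlanState *node, shm_toc *toc);
    void        (*shutdown) (PlanState *node);
} ExecutorNodeMethods;

so execProcnode.c could dispatch through a per-node methods pointer instead
of one big switch per entry point.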


0008-hj-shared-single-batch-v5.patch

First-off: I wonder if we should get the HASHPATH_TABLE_SHARED_SERIAL
path committed first. ISTM that's already quite beneficial, and there's
a good chunk of problems that we could push out initially.

This desperately needs tests.

Have you measured whether the new branches in nodeHash[join] slow down
non-parallel executions?  I do wonder if it'd not be better to put the
common code in helper functions and have separate
T_SharedHashJoin/T_SharedHash types.  If you have both a parallel and a
non-parallel hash in the same query, the branches will be hard to
predict...

I think the synchronization protocol with the various phases needs to be
documented somewhere.  Probably in nodeHashjoin.c's header.

The state machine code in MultiExecHash() also needs more comments,
including the fact that avoiding repeating work is done by "electing"
leaders via BarrierWait().

I wonder if it wouldn't be better to inline the necessary code into the
switch (with fall-throughs), instead of those gotos, putting some of the
relevant code (particularly the scanning of the child node) into
separate functions.
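
Very rough sketch of the structure I have in mind (the phase names beyond
PHJ_PHASE_BUILDING and both helpers are made up, and I'm guessing at the
Barrier API details):

    switch (BarrierPhase(barrier))
    {
        case PHJ_PHASE_INIT:
            /* one participant is elected to create the shared table */
            if (BarrierWait(barrier, WAIT_EVENT_HASH_BEGINNING))
                ExecHashCreateSharedTable(hashtable);   /* hypothetical */
            /* FALL THROUGH */
        case PHJ_PHASE_CREATING:
            BarrierWait(barrier, WAIT_EVENT_HASH_CREATING);
            /* FALL THROUGH */
        case PHJ_PHASE_BUILDING:
            /* scanning of the child node moved into a helper */
            ExecHashBuildFromChild(node, hashtable);    /* hypothetical */
            BarrierWait(barrier, WAIT_EVENT_HASH_BUILDING);
            break;
        default:
            /* arrived late: the table is already built, just attach */
            break;
    }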

+ build:
+    if (HashJoinTableIsShared(hashtable))
+    {
+        /* Make sure our local state is up-to-date so we can build. */
+        Assert(BarrierPhase(barrier) == PHJ_PHASE_BUILDING);
+        ExecHashUpdate(hashtable);
+    }
+
     /*
      * set expression context
      */
@@ -128,18 +197,78 @@ MultiExecHash(HashState *node)

Why is the parallel code before variable initialization stuff like
setting up the econtext?


> Introduces hash joins with "Shared Hash" and "Parallel Shared Hash"
> nodes, for single-batch joins only.

We don't necessarily know that ahead of time.  So this isn't something
that we could actually apply separately, right?

    /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
-    if (hashtable->nbuckets != hashtable->nbuckets_optimal)
-        ExecHashIncreaseNumBuckets(hashtable);
+    ExecHashUpdate(hashtable);
+    ExecHashIncreaseNumBuckets(hashtable);

It's kinda weird that we had the nearly redundant nbuckets !=
nbuckets_optimal checks before...


+static void *
+dense_alloc_shared(HashJoinTable hashtable,
+                   Size size,
+                   dsa_pointer *shared)

Hm. I wonder if HASH_CHUNK_SIZE being only 32kB isn't going to be a
bottleneck here.


@@ -195,6 +238,40 @@ ExecHashJoin(HashJoinState *node)
                if (TupIsNull(outerTupleSlot))
                {
                /* end of batch, or maybe whole join */
 
+
+                    if (HashJoinTableIsShared(hashtable))
+                    {
+                        /*
+                         * An important optimization: if this is a
+                         * single-batch join and not an outer join, there is
+                         * no reason to synchronize again when we've finished
+                         * probing.
+                         */
+                        Assert(BarrierPhase(&hashtable->shared->barrier) ==
+                               PHJ_PHASE_PROBING);
+                        if (hashtable->nbatch == 1 && !HJ_FILL_INNER(node))
+                            return NULL;    /* end of join */
+

I think it's a bit weird that the parallel path now has an exit path
that the non-parallel path doesn't have.


+     * If this is a shared hash table, there is an extra charge for inserting
+     * each tuple into the shared hash table to cover memory synchronization
+     * overhead, compared to a private hash table.  There is no extra charge
+     * for probing the hash table for outer path rows, on the basis that
+     * read-only access to a shared hash table shouldn't be any more
+     * expensive.
+     *
+     * cpu_shared_tuple_cost acts as a tie-breaker controlling whether we
+     * prefer HASHPATH_TABLE_PRIVATE or HASHPATH_TABLE_SHARED_SERIAL plans
+     * when the hash table fits in work_mem, since the cost is otherwise the
+     * same.  If it is positive, then we'll prefer private hash tables, even
+     * though that means that we'll be running N copies of the inner plan.
+     * Running N copies of the inner plan in parallel is not considered more
+     * expensive than running 1 copy of the inner plan while N-1 participants
+     * do nothing, despite doing less work in total.
+     */
+    if (table_type != HASHPATH_TABLE_PRIVATE)
+        startup_cost += cpu_shared_tuple_cost * inner_path_rows;
+
+    /*
+     * If this is a parallel shared hash table, then the value we have for
+     * inner_rows refers only to the rows returned by each participant.  For
+     * shared hash table size estimation, we need the total number, so we need
+     * to undo the division.
+     */
+    if (table_type == HASHPATH_TABLE_SHARED_PARALLEL)
+        inner_path_rows_total *= get_parallel_divisor(inner_path);
+
+    /*

Is the per-tuple cost really the same for HASHPATH_TABLE_SHARED_SERIAL
and PARALLEL?

Don't we also need to somehow account for the more expensive hash-probes
in the HASHPATH_TABLE_SHARED_* cases? Seems quite possible that we'll
otherwise tend to use shared tables for small hashed tables that are
looked up very frequently, even though a private one will likely be
faster.
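
E.g. something like (cpu_shared_probe_cost being a made-up GUC, and the
exact multiplier debatable):

    /* hypothetical: probes into a shared hash table are a bit more expensive */
    if (table_type != HASHPATH_TABLE_PRIVATE)
        run_cost += cpu_shared_probe_cost * outer_path_rows;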


+    /*
+     * Set the table as sharable if appropriate, with parallel or serial
+     * building.  If parallel, the executor will also need an estimate of the
+     * total number of rows expected from all participants.
+     */

Oh. I was about to comment that "sharable" is wrong, just to discover
it's a valid North American spelling. Weird.


@@ -2096,6 +2096,7 @@ create_mergejoin_path(PlannerInfo *root,
  * 'required_outer' is the set of required outer rels
  * 'hashclauses' are the RestrictInfo nodes to use as hash clauses
  *        (this should be a subset of the restrict_clauses list)
+ * 'table_type' to select [[Parallel] Shared] Hash
  */
 HashPath *
 create_hashjoin_path(PlannerInfo *root,

Reminds me that you're not denoting the Parallel bit in EXPLAIN right
now - intentionally so?

/*
- * To reduce palloc overhead, the HashJoinTuples for the current batch are
- * packed in 32kB buffers instead of pallocing each tuple individually.
+ * To reduce palloc/dsa_allocate overhead, the HashJoinTuples for the current
+ * batch are packed in 32kB buffers instead of pallocing each tuple
+ * individually.

s/palloc\/dsa_allocate/allocator/?


@@ -112,8 +121,12 @@ typedef struct HashMemoryChunkData
     size_t        maxlen;            /* size of the buffer holding the tuples */
     size_t        used;            /* number of buffer bytes already used */
 
-    struct HashMemoryChunkData *next;    /* pointer to the next chunk (linked
-                                         * list) */
+    /* pointer to the next chunk (linked list) */
+    union
+    {
+        dsa_pointer shared;
+        struct HashMemoryChunkData *unshared;
+    } next;

This'll increase memory usage on some platforms, e.g. when using
spinlock-backed atomics.  I tend to think that that's fine, but it's
probably worth calling out.


--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -787,7 +787,15 @@ typedef enum
     WAIT_EVENT_MQ_SEND,
     WAIT_EVENT_PARALLEL_FINISH,
     WAIT_EVENT_SAFE_SNAPSHOT,
-    WAIT_EVENT_SYNC_REP
+    WAIT_EVENT_SYNC_REP,
+    WAIT_EVENT_HASH_BEGINNING,
+    WAIT_EVENT_HASH_CREATING,
+    WAIT_EVENT_HASH_BUILDING,
+    WAIT_EVENT_HASH_RESIZING,
+    WAIT_EVENT_HASH_REINSERTING,
+    WAIT_EVENT_HASH_UNMATCHED,
+    WAIT_EVENT_HASHJOIN_PROBING,
+    WAIT_EVENT_HASHJOIN_REWINDING
 } WaitEventIPC;

Hm. That seems a bit on the detailed side - if we're going that way it
seems likely that we'll end up with hundreds of wait events. I don't
think gradually evolving wait events into something like a query
progress framework is a good idea.



That's it for now...

- Andres

[1] http://archives.postgresql.org/message-id/CAEepm%3D1D4-tP7j7UAgT_j4ZX2j4Ehe1qgZQWFKBMb8F76UW5Rg%40mail.gmail.com


