Thread: [HACKERS] Parallel Hash take II

[HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
Hi hackers,

Here is a new version of my parallel-aware hash join patchset.  I've
dropped 'shared' from the feature name and EXPLAIN output since that's
now implied by the word "Parallel" (that only made sense in earlier
versions that had Shared Hash and Parallel Shared Hash, but a Shared
Hash with just one participant building it didn't turn out to be very
useful so I dropped it a few versions ago).  I figured for this new
round I should create a new thread, but took the liberty of copying
the CC list from the previous one[1].

The main changes are:

1.  Implemented the skew optimisation for parallel-aware mode.  The
general approach is the same as the regular hash table: insert with a
CAS loop.  The details of memory budget management are different
though.  It grants chunks of budget to participants as needed even
though allocation is still per-tuple, and it has to deal with
concurrent bucket removal.  I removed one level of indirection from
the skew hash table: in this version hashtable->skewBucket is an array
of HashSkewBucket instead of pointers to HashSkewBuckets allocated
separately.  That makes the hash table array twice as big but avoids
one pointer hop when probing an active bucket; that refactoring was
not strictly necessary but made the changes to support parallel build
simpler.
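
To make the insertion idea concrete, here is a minimal sketch with made-up
names (not the patch's actual code): each bucket head is assumed to be a
pg_atomic_uint64 holding a dsa_pointer, and the tuple header is assumed to
carry a "next" dsa_pointer that is set before the tuple is published.  The
budget granting and concurrent bucket removal described above wrap extra
steps around this loop; the sketch shows only the lock-free publish itself.

#include "port/atomics.h"
#include "utils/dsa.h"

typedef struct SkewTupleSketch
{
    dsa_pointer next;           /* next tuple in this bucket's chain */
    /* hash value and tuple data would follow in a real tuple */
} SkewTupleSketch;

/* Push a tuple onto the head of a shared skew bucket chain, lock-free. */
static void
skew_bucket_push_sketch(pg_atomic_uint64 *bucket_head,
                        SkewTupleSketch *tuple, dsa_pointer tuple_shared)
{
    uint64      old_head = pg_atomic_read_u64(bucket_head);

    do
    {
        /* Link the new tuple to the current chain head before publishing. */
        tuple->next = (dsa_pointer) old_head;

        /*
         * Try to swing the bucket head to the new tuple.  On failure,
         * old_head is reloaded with the current value and we retry.
         */
    } while (!pg_atomic_compare_exchange_u64(bucket_head, &old_head,
                                             (uint64) tuple_shared));
}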

2.  Simplified costing.  There is now just one control knob
"parallel_synchronization_cost", which I charge for each time the
participants will wait for each other at a barrier, to be set high
enough to dissuade the planner from using Parallel Hash for tiny hash
tables that would be faster in a parallel-oblivious hash join.
Earlier ideas about modelling the cost of shared memory access didn't
work out.

Status:  I think there are probably some thinkos in the new skew
stuff.  I think I need some new ideas about how to refactor things so
that there isn't quite so much "if-shared-then-this-else-that".  I
think I should build some kind of test mode to control barriers so
that I can test the permutations of participant arrival phase
exhaustively.  I need to propose an empirically derived default for
the GUC.  There are several other details I would like to tidy up and
improve.  That said, I wanted to post what I have as a checkpoint now
that I have the major remaining piece (skew optimisation) more-or-less
working and the costing at a place that I think makes sense.

I attach some queries to exercise various interesting cases.  I would
like to get something like these into fast-running regression test
format.

Note that this patch requires the shared record typmod patch[2] in
theory, since shared hash table tuples might reference blessed record
types, but there is no API dependency so you can use this patch set
without applying that one.  If anyone knows how to actually provoke a
parallel hash join that puts RECORD types into the hash table, I'd be
very interested to hear about it, but certainly for TPC and similar
testing that other patch set is not necessary.

Of the TPC-H queries, I find that Q3, Q5, Q7, Q8, Q9, Q10, Q12, Q14,
Q16, Q18, Q20 and Q21 make use of Parallel Hash nodes (I tested with
neqjoinsel-fix-v3.patch[3] also applied, which avoids some but not all
craziness in Q21).  For examples that also include a
parallel-oblivious Hash see Q8 and Q10: in those queries you can see
the planner deciding that it's not worth paying
parallel_synchronization_cost = 10 to load the 25 row "nation" table.
I'll report on performance separately.

[1] https://www.postgresql.org/message-id/flat/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com
[2] https://www.postgresql.org/message-id/CAEepm=0ZtQ-SpsgCyzzYpsXS6e=kZWqk3g5Ygn3MDV7A8dabUA@mail.gmail.com
[3]
https://www.postgresql.org/message-id/CAEepm%3D3%3DNHHko3oOzpik%2BggLy17AO%2Bpx3rGYrg3x_x05%2BBr9-A%40mail.gmail.com

-- 
Thomas Munro
http://www.enterprisedb.com


Attachment

Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,

On 2017-07-26 20:12:56 +1200, Thomas Munro wrote:
> Here is a new version of my parallel-aware hash join patchset.

Yay!

Working on reviewing this. Will send separate emails for individual
patch reviews.


> 2.  Simplified costing.  There is now just one control knob
> "parallel_synchronization_cost", which I charge for each time the
> participants will wait for each other at a barrier, to be set high
> enough to dissuade the planner from using Parallel Hash for tiny hash
> tables that would be faster in a parallel-oblivious hash join.
> Earlier ideas about modelling the cost of shared memory access didn't
> work out.

Hm. You say, "didn't work out" - could you expand a bit on that? I'm
quite doubtful that just accounting for barriers will be good enough.


> I'll report on performance separately.

Looking forward to that ;)

Greetings,

Andres Freund



Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Tue, Aug 1, 2017 at 9:28 AM, Andres Freund <andres@anarazel.de> wrote:
> On 2017-07-26 20:12:56 +1200, Thomas Munro wrote:
>> 2.  Simplified costing.  There is now just one control knob
>> "parallel_synchronization_cost", which I charge for each time the
>> participants will wait for each other at a barrier, to be set high
>> enough to dissuade the planner from using Parallel Hash for tiny hash
>> tables that would be faster in a parallel-oblivious hash join.
>> Earlier ideas about modelling the cost of shared memory access didn't
>> work out.
>
> Hm. You say, "didn't work out" - could you expand a bit on that? I'm
> quite doubtful that just accounting for barriers will be good enough.

The earlier approach and some variants I played with were based on the
idea that we should try to estimate the cost of using shared memory.
But there's no precedent for costing the cache hierarchy beyond disk
vs memory, and it depends so much on your hardware (NUMA vs UMA) and
the data distribution.  I have no doubt that variations in memory
access costs are important (for example, it was data distribution that
determined whether big-cache-oblivious-shared-hash-table or
MonetDB-style cache-aware approach won in that paper I've mentioned
here before[1]), but it seems like a hard problem and I didn't feel
like it was necessary.  Or do you have a different idea here?

Another point is that in the earlier versions I was trying to teach
the planner how to choose among Hash, Shared Hash and Parallel Shared
Hash.  The difference in costing between Hash and Shared Hash (one
worker builds, all workers probe) was important and sensitive, because
the only difference between them would be the cost of memory sharing.
When I dropped Shared Hash from the patch set, it no longer seemed
necessary to try to deal with such subtle costing, because Hash and
Parallel Hash (now without the word 'Shared') already have wildly
different costs: the latter is divided over N CPUs.  So I felt I could
get away with a much blunter instrument: just something to avoid
parallel build overheads for tiny tables like TPC-H "nation".

I still wanted something that makes intuitive sense and that could be
set using experimental evidence though.  Parallel_synchronization_cost
is an estimate of how long the average backend will have to wait for
the last backend to complete the phase and arrive at each barrier.
The most interesting case is the build phase: how long will the
last backend make us wait before probing can begin?  Well, that
depends on the parallel grain.  Currently, the ultimate source of all
parallelism in our executor is Parallel Seq Scan and Parallel Index
Scan, and they hand out a page at a time.  Of course, any number of
nodes may sit between the hash join and the scan, and one of them
might include a function that sleeps for 100 years in one backend or
performs a join that generates wildly different numbers of tuples in
each backend.  I don't know what to do about that, other than to
assume we have perfectly spherical cows and reason on the basis of an
expected parallel grain reaching us from the scans.

One thing to note about parallel_synchronization_cost is that the cost
units, where 1 is traditionally the cost of scanning a page, actually
make *some* kind of sense here, though it's a bit tenuous: the last
worker to complete is the one that scans the final pages, while the
others see the scan finished.  What's really wanted here is not simply
page scanning cost but rather a percentage of the total cost that
represents how much extra work the lanterne rouge of backends has to
do.

Two relevant projects here are:

1.  David Rowley proposes changing the seq scan grain[2], perhaps
adaptively.  I suppose as this number increases the time at which two
workers finish can vary more widely.
2.  The parallel-append project introduces a completely different type
of granularity based on unrelated and separately costed subplans
rather than pages.  Perhaps there are things that could be done here
to model the fact that some workers might finish a long time before
others, but I don't know.

Perhaps what parallel hash really needs is not a user-controlled
parallel_synchronization_cost, but some number produced by the planner
to describe the expected distribution of tuple counts over workers.
Armed with something like that and the cost per tuple you might be
able to estimate how long we expect hash join barriers to make you
wait without introducing any new GUCs at all.  I thought about some of
these things a bit but it seemed like a big research project of its
own and I was persuaded in an off-list discussion by Robert to try to
find the simplest thing that would avoid parallel-aware hash for
little tables that are already built very cheaply.

[1] http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.225.3495
[2] https://www.postgresql.org/message-id/CAKJS1f-XhfQ2-%3D85wgYo5b3WtEs%3Dys%3D2Rsq%3DNuvnmaV4ZsM1XQ%40mail.gmail.com

-- 
Thomas Munro
http://www.enterprisedb.com



Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Wed 26 Jul 2017 19:58:20 NZST
Subject: [PATCH] Add support for parallel-aware hash joins.

Hi,

WRT the main patch:

- Echoing concerns from other threads (Robert: ping): I'm doubtful that it makes sense to size the number of parallel
workers solely based on the parallel scan node's size.  I don't think it's this patch's job to change that, but to me it
seriously amplifies that - I'd bet there's a lot of cases with nontrivial joins where the benefit from parallelism on the
join level is bigger than on the scan level itself.  And the number of rows in the upper nodes might also be bigger than
on the scan node level, making it more important to have higher number of nodes.

- If I understand the code in initial_cost_hashjoin() correctly, we count the synchronization overhead once,
independent of the number of workers.  But on the other hand we calculate the throughput by dividing by the number of
workers.  Do you think that's right?

- I haven't really grokked the deadlock issue you address. Could you expand the comments on that? Possibly somewhere
central referenced by the various parts.

- maybe I'm overly paranoid, but it might not be bad to add some extra checks for ExecReScanHashJoin ensuring that it
doesn't get called when workers are still doing something.

- seems like you're dereffing tuple unnecessarily here:

+    /*
+     * If we detached a chain of tuples, transfer them to the main hash table
+     * or batch storage.
+     */
+    if (regainable_space > 0)
+    {
+        HashJoinTuple tuple;
+
+        tuple = (HashJoinTuple)
+            dsa_get_address(hashtable->area, detached_chain_shared);
+        ExecHashTransferSkewTuples(hashtable, detached_chain,
+                                   detached_chain_shared);
+
+        /* Remove from the total space used. */
+        LWLockAcquire(&hashtable->shared->chunk_lock, LW_EXCLUSIVE);
+        Assert(hashtable->shared->size >= regainable_space);
+        hashtable->shared->size -= regainable_space;
+        LWLockRelease(&hashtable->shared->chunk_lock);
+
+        /*
+         * If the bucket we removed is the same as the bucket the caller just
+         * overflowed, then we can forget about the overflowing part of the
+         * tuple.  It's been moved out of the skew hash table.  Otherwise, the
+         * caller will call again; eventually we'll either succeed in
+         * allocating space for the overflow or reach this case.
+         */
+        if (bucket_to_remove == bucketno)
+        {
+            hashtable->spaceUsedSkew = 0;
+            hashtable->spaceAllowedSkew = 0;
+        }
+    }


- The names here could probably improved some:
+        case WAIT_EVENT_HASH_SHRINKING1:
+            event_name = "Hash/Shrinking1";
+            break;
+        case WAIT_EVENT_HASH_SHRINKING2:
+            event_name = "Hash/Shrinking2";
+            break;
+        case WAIT_EVENT_HASH_SHRINKING3:
+            event_name = "Hash/Shrinking3";
+            break;
+        case WAIT_EVENT_HASH_SHRINKING4:
+            event_name = "Hash/Shrinking4";

- why are we restricting rows_total bit to parallel aware?

+    /*
+     * If parallel-aware, the executor will also need an estimate of the total
+     * number of rows expected from all participants so that it can size the
+     * shared hash table.
+     */
+    if (best_path->jpath.path.parallel_aware)
+    {
+        hash_plan->plan.parallel_aware = true;
+        hash_plan->rows_total = best_path->inner_rows_total;
+    }
+

- seems we need a few more tests - I don't think the existing tests are properly going to exercise the skew stuff,
multiple batches, etc? This is nontrivial code, I'd really like to see a high test coverage of the new code.

- might not hurt to reindent before the final submission

- Unsurprisingly, please implement the FIXME ;)


Regards,

Andres



Re: [HACKERS] Parallel Hash take II

From
Robert Haas
Date:
On Mon, Jul 31, 2017 at 9:11 PM, Andres Freund <andres@anarazel.de> wrote:
> - Echoing concerns from other threads (Robert: ping): I'm doubtful that
>   it makes sense to size the number of parallel workers solely based on
>   the parallel scan node's size.  I don't think it's this patch's job to
>   change that, but to me it seriously amplifies that - I'd bet there's a
>   lot of cases with nontrivial joins where the benefit from parallelism
>   on the join level is bigger than on the scan level itself.  And the
>   number of rows in the upper nodes might also be bigger than on the
>   scan node level, making it more important to have higher number of
>   nodes.

Well, I feel like a broken record here but ... yeah, I agree we need
to improve that.  It's probably generally true that the more parallel
operators we add, the more potential benefit there is in doing
something about that problem.  But, like you say, not in this patch.

http://postgr.es/m/CA+TgmoYL-SQZ2gRL2DpenAzOBd5+SW30QB=A4CseWtOgejz4aQ@mail.gmail.com

I think we could improve things significantly by generating multiple
partial paths with different number of parallel workers, instead of
just picking a number of workers based on the table size and going
with it.  For that to work, though, you'd need something built into
the costing to discourage picking paths with too many workers.  And
you'd need to be OK with planning taking a lot longer when parallelism
is involved, because you'd be carrying around more paths for longer.
There are other problems to solve, too.

I still think, though, that it's highly worthwhile to get at least a
few more parallel operators - and this one in particular - done before
we attack that problem in earnest.  Even with a dumb calculation of
the number of workers, this helps a lot.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Tue, Aug 1, 2017 at 1:11 PM, Andres Freund <andres@anarazel.de> wrote:
> WRT the main patch:

Thanks for the review.  I will respond soon, but for now I just wanted
to post a rebased version (no changes) because v16 no longer applies.

-- 
Thomas Munro
http://www.enterprisedb.com


Attachment

Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Wed, Aug 2, 2017 at 10:06 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> On Tue, Aug 1, 2017 at 1:11 PM, Andres Freund <andres@anarazel.de> wrote:
>> WRT the main patch:
>
> Thanks for the review.  I will respond soon, but for now I just wanted
> to post a rebased version (no changes) because v16 no longer applies.

Rebased with better commit messages.  Sorry for the changed patch
names, I switched to using git-format properly...  (I'll be posting a
new version with some bigger changes to the 0010 patch and some
answers to good questions you've asked soon.)

-- 
Thomas Munro
http://www.enterprisedb.com


Attachment

Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
Here's a new rebased and debugged patch set.

On Tue, Aug 1, 2017 at 1:11 PM, Andres Freund <andres@anarazel.de> wrote:
> - Echoing concerns from other threads (Robert: ping): I'm doubtful that
>   it makes sense to size the number of parallel workers solely based on
>   the parallel scan node's size.  I don't think it's this patch's job to
>   change that, but to me it seriously amplifies that - I'd bet there's a
>   lot of cases with nontrivial joins where the benefit from parallelism
>   on the join level is bigger than on the scan level itself.  And the
>   number of rows in the upper nodes might also be bigger than on the
>   scan node level, making it more important to have higher number of
>   nodes.

Agreed that this is bogus.  The number of workers is really determined
by the outer path (the probe side), except that if the inner path (the
build side) is not big enough to warrant parallel workers at all then
parallelism is inhibited on that side.  That prevents small tables
from being loaded by Parallel Hash.  That is something we want, but
it's probably not doing it for the right reasons with the right
threshold -- about which more below.

> - If I understand the code in initial_cost_hashjoin() correctly, we
>   count the synchronization overhead once, independent of the number of
>   workers.  But on the other hand we calculate the throughput by
>   dividing by the number of workers.  Do you think that's right?

It's how long you think the average participant will have to wait for
the last participant to arrive, and I think that's mainly determined
by the parallel grain, not the number of workers.  If you're a worker
that has reached the end of a scan, the best case is that every other
worker has already reached the end too and the worst case is that
another worker read the last granule (currently page) just before you
hit the end, so you'll have to wait for it to process a granule's
worth of work.

To show this I used dtrace to measure the number of microseconds spent
waiting at the barrier before probing while running a 5 million row
self-join 100 times, and got the following histograms:

1 worker:

           value  ------------- Distribution ------------- count
             < 0 |                                         0
               0 |@@@@@@@@@@@@@@@@@@@@@@                   110
              20 |                                         1
              40 |@                                        5
              60 |@@@@@                                    24
              80 |@@@                                      14
             100 |@                                        5
             120 |@@@                                      16
             140 |@@@                                      17
             160 |@@                                       8
             180 |                                         0

2 workers:

           value  ------------- Distribution ------------- count
             < 0 |                                         0
               0 |@@@@@@@@@@@@@@                           107
              20 |                                         1
              40 |@@@                                      21
              60 |@@@                                      25
              80 |@@                                       16
             100 |@@                                       14
             120 |@@@@@                                    38
             140 |@@@@@@@                                  51
             160 |@@@                                      20
             180 |                                         3
             200 |                                         1
             220 |                                         3
             240 |                                         0

3 workers:

           value  ------------- Distribution ------------- count
             < 0 |                                         0
               0 |@@@@@@@@@@@                              113
              20 |@@                                       15
              40 |@@@                                      29
              60 |@@@@                                     35
              80 |@@@@                                     37
             100 |@@@@@@                                   56
             120 |@@@@@                                    51
             140 |@@@                                      31
             160 |@@                                       21
             180 |@                                        6

4 workers:

           value  ------------- Distribution ------------- count
             < 0 |                                         0
               0 |@@@@@@@@@@                               121
              20 |                                         4
              40 |@@@                                      39
              60 |@@                                       29
              80 |@@                                       24
             100 |@@@@@@@                                  88
             120 |@@@@@@@                                  82
             140 |@@@@@                                    58
             160 |@@                                       26
             180 |@                                        15
             200 |@                                        9
             220 |                                         4
             240 |                                         1
             260 |                                         0

I didn't know what to expect above my machine's core count of 4, but
this is for 8:

           value  ------------- Distribution ------------- count
             < 0 |                                         0
               0 |@@@@@                                    116
              20 |                                         2
              40 |@@                                       36
              60 |@@@                                      69
              80 |@@@@                                     95
             100 |@@@@@                                    113
             120 |@@@                                      74
             140 |@@@                                      71
             160 |@@                                       44
             180 |@@                                       36
             200 |@                                        30
             220 |@                                        14
             240 |@                                        18
             260 |                                         8
             280 |                                         3
             300 |                                         4

It's true that the fraction of waits that go into the 0-20us bucket
(because the last to arrive at a barrier doesn't have to wait at all)
decreases as you add more workers, but above 1 worker the main story
is the bell curve (?) we see clustered around 100-120us, and it
doesn't seem to be moving.

If we call the fraction of samples outside the 0-20us bucket
"wait_probability" and call their average wait time
"expected_wait_cost", then one way to estimate this is something like:

   wait_probability * expected_wait_cost
 = (1 - 1 / participants) * (tuples_per_grain * cost_per_tuple * 0.5)

I don't think we can do that today, because we don't have access to
tuples_per_grain from the subplan.  That would in theory come
ultimately from the scan, adjusted as appropriate by selectivity
estimates.  The grain could in future be more than one page at a time
as proposed by David Rowley and others, or "it's complicated" for a
Parallel Append.  But I'm not sure if that's correct, doable or worth
doing, hence my attempt to provide a single knob to model this for
now.
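
As a toy function (hypothetical names, not code from the patch set), that
estimate is simply:

static double
expected_barrier_wait_cost(int participants,
                           double tuples_per_grain,
                           double cost_per_tuple)
{
    /* Chance that we are not the last participant to arrive. */
    double      wait_probability = 1.0 - 1.0 / participants;

    /* On average the last arriver is half way through its final granule. */
    double      expected_wait_cost = tuples_per_grain * cost_per_tuple * 0.5;

    return wait_probability * expected_wait_cost;
}

...but as noted above, tuples_per_grain isn't currently available to the
join from the underlying scans.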

I did some experiments to find a value of
parallel_synchronization_cost that avoids Parallel Hash when it won't
pay off, like this:

 * a "big" table with 1 million rows to be the outer relation
 * a "small" table with a range of sizes from 5k to 100k rows to hash
 * both tables have a unique integer key "a" and a 60 byte text column "b"
 * query (a): SELECT COUNT(*) FROM big JOIN small USING (a)
 * query (b): ... WHERE length(small.b) * 2 - length(small.b) = length(small.b)
 * work_mem set high enough that we never have multiple batches
 * one warmup run and then the median of 3 measurements
 * all default except min_parallel_table_scan_size = 0
 * 4 core developer machine
 * -O2, no asserts

Just to be clear:  The following numbers aren't supposed to be
impressive and are way shorter than the queries that Parallel Hash
feature is really intended to help with.  That's because we're
searching for the threshold below which Parallel Hash *doesn't* help,
and that involves running queries where there isn't much to hash.  The
times are for the complete query (ie include probing too, not just the
hash table build), and show "parallel-oblivious-hash-join-time ->
parallel-aware-hash-join-time" for queries "a" and "b" on patched
master.  I also compared with unpatched master to confirm that the
parallel-oblivious times on the left of the arrows match unpatched
master's, modulo a bit of noise.

1 worker:

 5,000 rows hashed:  (a) 157ms -> 166ms, (b) 166ms -> 183ms
 7,500 rows hashed:  (a) 162ms -> 174ms, (b) 176ms -> 182ms
10,000 rows hashed:  (a) 161ms -> 170ms, (b) 181ms -> 210ms
12,500 rows hashed:  (a) 169ms -> 175ms, (b) 194ms -> 188ms
15,000 rows hashed:  (a) 175ms -> 181ms, (b) 199ms -> 195ms
17,500 rows hashed:  (a) 173ms -> 175ms, (b) 201ms -> 202ms
20,000 rows hashed:  (a) 179ms -> 179ms, (b) 210ms -> 195ms <== a & b threshold
30,000 rows hashed:  (a) 196ms -> 192ms, (b) 244ms -> 218ms
40,000 rows hashed:  (a) 201ms -> 197ms, (b) 265ms -> 228ms
50,000 rows hashed:  (a) 217ms -> 251ms, (b) 294ms -> 249ms
60,000 rows hashed:  (a) 228ms -> 222ms, (b) 324ms -> 268ms
70,000 rows hashed:  (a) 230ms -> 214ms, (b) 338ms -> 275ms
80,000 rows hashed:  (a) 243ms -> 229ms, (b) 366ms -> 291ms
90,000 rows hashed:  (a) 256ms -> 239ms, (b) 391ms -> 311ms
100,000 rows hashed: (a) 266ms -> 248ms, (b) 420ms -> 326ms

2 workers:

 5,000 rows hashed:  (a) 110ms -> 115ms, (b) 118ms -> 127ms
 7,500 rows hashed:  (a) 115ms -> 128ms, (b) 131ms -> 128ms
10,000 rows hashed:  (a) 114ms -> 116ms, (b) 135ms -> 148ms
12,500 rows hashed:  (a) 126ms -> 126ms, (b) 145ms -> 131ms
15,000 rows hashed:  (a) 134ms -> 142ms, (b) 151ms -> 134ms
17,500 rows hashed:  (a) 125ms -> 122ms, (b) 153ms -> 147ms <== a & b threshold
20,000 rows hashed:  (a) 126ms -> 124ms, (b) 160ms -> 136ms
30,000 rows hashed:  (a) 144ms -> 132ms, (b) 191ms -> 152ms
40,000 rows hashed:  (a) 165ms -> 151ms, (b) 213ms -> 158ms
50,000 rows hashed:  (a) 161ms -> 143ms, (b) 240ms -> 171ms
60,000 rows hashed:  (a) 171ms -> 150ms, (b) 266ms -> 186ms
70,000 rows hashed:  (a) 176ms -> 151ms, (b) 283ms -> 190ms
80,000 rows hashed:  (a) 181ms -> 156ms, (b) 315ms -> 204ms
90,000 rows hashed:  (a) 189ms -> 164ms, (b) 338ms -> 214ms
100,000 rows hashed: (a) 207ms -> 177ms, (b) 362ms -> 232ms

3 workers:

 5,000 rows hashed:  (a)  90ms -> 103ms, (b) 107ms -> 118ms
 7,500 rows hashed:  (a) 106ms -> 104ms, (b) 115ms -> 118ms
10,000 rows hashed:  (a) 100ms ->  95ms, (b) 121ms -> 110ms <== b threshold
12,500 rows hashed:  (a) 103ms -> 120ms, (b) 134ms -> 113ms
15,000 rows hashed:  (a) 134ms -> 110ms, (b) 142ms -> 116ms <== a threshold
17,500 rows hashed:  (a) 110ms -> 104ms, (b) 146ms -> 123ms
20,000 rows hashed:  (a) 107ms -> 103ms, (b) 151ms -> 120ms
30,000 rows hashed:  (a) 124ms -> 110ms, (b) 183ms -> 135ms
40,000 rows hashed:  (a) 125ms -> 108ms, (b) 209ms -> 137ms
50,000 rows hashed:  (a) 133ms -> 115ms, (b) 238ms -> 150ms
60,000 rows hashed:  (a) 143ms -> 119ms, (b) 266ms -> 159ms
70,000 rows hashed:  (a) 146ms -> 120ms, (b) 288ms -> 165ms
80,000 rows hashed:  (a) 150ms -> 129ms, (b) 316ms -> 176ms
90,000 rows hashed:  (a) 159ms -> 126ms, (b) 343ms -> 187ms
100,000 rows hashed: (a) 176ms -> 136ms, (b) 370ms -> 195ms

4 workers:

 5,000 rows hashed:  (a)  93ms -> 103ms, (b) 109ms -> 117ms
 7,500 rows hashed:  (a) 106ms -> 102ms, (b) 121ms -> 115ms <== b threshold
10,000 rows hashed:  (a)  99ms -> 100ms, (b) 126ms -> 113ms
12,500 rows hashed:  (a) 107ms -> 102ms, (b) 137ms -> 117ms <== a threshold
15,000 rows hashed:  (a) 111ms -> 107ms, (b) 145ms -> 115ms
17,500 rows hashed:  (a) 110ms ->  10ms, (b) 151ms -> 118ms
20,000 rows hashed:  (a) 108ms -> 103ms, (b) 160ms -> 120ms
30,000 rows hashed:  (a) 120ms -> 108ms, (b) 196ms -> 127ms
40,000 rows hashed:  (a) 129ms -> 109ms, (b) 225ms -> 134ms
50,000 rows hashed:  (a) 140ms -> 121ms, (b) 262ms -> 148ms
60,000 rows hashed:  (a) 152ms -> 123ms, (b) 294ms -> 154ms
70,000 rows hashed:  (a) 157ms -> 122ms, (b) 322ms -> 165ms
80,000 rows hashed:  (a) 154ms -> 138ms, (b) 372ms -> 201ms
90,000 rows hashed:  (a) 186ms -> 122ms, (b) 408ms -> 180ms
100,000 rows hashed: (a) 170ms -> 124ms, (b) 421ms -> 186ms

I found that a good value of parallel_synchronization_cost that
enables Parallel Hash somewhere around those thresholds is 250 for
these test queries, so I have set that as the default in the new patch
set.

All of this might be considered moot, because I still needed to frob
min_parallel_table_scan_size to get a Parallel Hash below 90,000 rows
anyway due to the policies in compute_parallel_worker().  So really
there is no danger of tables like the TPC-H "nation" and "region"
tables being loaded by Parallel Hash even if you set
parallel_synchronization_cost to 0, and probably no reason to worry to
much about its default value for now.  It could probably be argued
that we shouldn't have the GUC at all, but at least it provides a
handy way to enable and disable Parallel Hash!

One hidden factor here is that it takes a while for workers to start
up and the leader can scan thousands of rows before they arrive.  This
effect will presumably be exaggerated on systems with slow
fork/equivalent (Windows, some commercial Unices IIRC), and minimised
by someone writing a patch to reuse parallel workers.  I haven't tried
to investigate that effect because it doesn't seem very interesting or
likely to persist, but it may contribute to the experimental thresholds I
observed.

> - I haven't really grokked the deadlock issue you address. Could you
>   expand the comments on that? Possibly somewhere central referenced by
>   the various parts.

The central place is leader_gate.c.  What's wrong with the explanation in there?

Let me restate the problem here, and the three solutions I considered:

Problem: The leader must never be allowed to wait for other
participants that have emitted tuples (it doesn't matter whether that
waiting takes the form of latches, condition variables, barriers,
shm_queues or anything else).  Any participant that has emitted tuples
might currently be blocked waiting for the leader to drain the tuple
queue, so a deadlock could be created.

Concrete example: In this case, once we get past PHJ_PHASE_PROBING we
have to allow only the leader or the workers to continue.  Otherwise
some worker might be trying to finish probing by emitting tuples,
while the leader might be in BarrierWait() waiting for everyone to
finish probing.  This problems affects only outer joins (they have
wait to start PHJ_PHASE_UNMATCHED after probing) and multibatch joins
(they wait to be able to load the next batch).

Solution 1:  LeaderGate is a simple mechanism for reaching consensus
on whether the leader or a set of workers will be allowed to run after
a certain point, in this case the end of probing.  Concretely this
means that either the leader or any workers will drop out early at
that point, leaving nothing left to do.  This is made slightly more
complicated by the fact that we don't know up front if there are any
workers yet.

Solution 2:  Teach tuple queues to spill to disk instead of blocking
when full.  I think this behaviour should probably only be activated
while the leader is running the plan rather than draining tuple
queues; the current block-when-full behaviour would still be
appropriate if the leader is simply unable to drain queues fast
enough.  Then the deadlock risk would go away.

Solution 3:  An asynchronous executor model where you don't actually
wait synchronously at barriers -- instead you detach and go and do
something else, but come back and reattach when there is progress to
be made.  I have some ideas about that but they are dependent on the
async execution project reaching a fairly advanced state first.

When I wrote it, I figured that leader_gate.c was cheap and would do
for now, but I have to admit that it's quite confusing and it sucks
that later batches lose a core.  I'm now thinking that 2 may be a
better idea.  My first thought is that Gather needs a way to advertise
that it's busy while running the plan, shm_mq needs a slightly
different all-or-nothing nowait mode, and TupleQueue needs to write to
a shared tuplestore or other temp file-backed mechanism when
appropriate.  Thoughts?
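
To sketch the sending side of that idea (hypothetical helper names;
sts_puttuple() is from the shared tuplestore patch set, and a real version
would also need the all-or-nothing nowait mode, since shm_mq_send() can
currently write a partial message before returning SHM_MQ_WOULD_BLOCK):

#include "access/htup_details.h"
#include "storage/shm_mq.h"
#include "utils/sharedtuplestore.h"     /* from the shared tuplestore patches */

static bool
tqueue_send_or_spill(shm_mq_handle *queue,
                     SharedTuplestoreAccessor *spill,
                     MinimalTuple tuple, bool leader_is_busy)
{
    shm_mq_result result;

    /* Try a non-blocking send first. */
    result = shm_mq_send(queue, tuple->t_len, tuple, true);
    if (result == SHM_MQ_SUCCESS)
        return true;
    if (result == SHM_MQ_DETACHED)
        return false;           /* receiver has gone away */

    /*
     * Queue full.  If the leader is executing the plan rather than draining
     * queues, divert the tuple to temp-file-backed storage instead of
     * blocking, so we can never deadlock against it.
     */
    if (leader_is_busy)
    {
        sts_puttuple(spill, NULL, tuple);
        return true;
    }

    /* Otherwise blocking is safe, as today. */
    return shm_mq_send(queue, tuple->t_len, tuple, false) == SHM_MQ_SUCCESS;
}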

> - maybe I'm overly paranoid, but it might not be bad to add some extra
>   checks for ExecReScanHashJoin ensuring that it doesn't get called when
>   workers are still doing something.

Check out ExecReScanGather(): it shuts down and waits for all workers
to complete, which makes the assumptions in ExecReScanHashJoin() true.
If a node below Gather but above Hash Join could initiate a rescan
then the assumptions would not hold.  I am not sure what it would mean
though and we don't generate any such plans today to my knowledge.  It
doesn't seem to make sense for the inner side of Nested Loop to be
partial.  Have I missed something here?

It looks like some details may have changed here due to 41b0dd98 and
nearby commits, and I may need to implement at least ReInitializeDSM.

I also need a regression test to hit the rescan but I'm not sure how
to write one currently.  In an earlier version of this patch set I
could do it by setting shared_tuple_cost (a GUC I no longer have) to a
negative number, which essentially turned our optimiser into a
pessimiser capable of producing a nested loop that rescans a gather
node, forking workers for every row...

> - seems like you're dereffing tuple unnecessarily here:
>
> +               tuple = (HashJoinTuple)
> +                       dsa_get_address(hashtable->area, detached_chain_shared);
> +               ExecHashTransferSkewTuples(hashtable, detached_chain,

Yes, here lurked a bug, fixed.

> - The names here could probably improved some:
> +               case WAIT_EVENT_HASH_SHRINKING1:
> +                       event_name = "Hash/Shrinking1";
> +                       break;
> +               case WAIT_EVENT_HASH_SHRINKING2:
> +                       event_name = "Hash/Shrinking2";
> +                       break;
> +               case WAIT_EVENT_HASH_SHRINKING3:
> +                       event_name = "Hash/Shrinking3";
> +                       break;
> +               case WAIT_EVENT_HASH_SHRINKING4:
> +                       event_name = "Hash/Shrinking4";

Fixed.

> - why are we restricting rows_total bit to parallel aware?
>
> +       /*
> +        * If parallel-aware, the executor will also need an estimate of the total
> +        * number of rows expected from all participants so that it can size the
> +        * shared hash table.
> +        */
> +       if (best_path->jpath.path.parallel_aware)
> +       {
> +               hash_plan->plan.parallel_aware = true;
> +               hash_plan->rows_total = best_path->inner_rows_total;
> +       }
> +

I could set it unconditionally and then skip this bit that receives the number:

    rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;

Do you think it would be better to push plan_rows_total into Plan instead?

> - seems we need a few more test - I don't think the existing tests are
>   properly going to exercise the skew stuff, multiple batches, etc?
>   This is nontrivial code, I'd really like to see a high test coverage
>   of the new code.

I've added some regression tests in a patch to apply before making any
changes.  You would have to change the "explain (costs off)" to
"explain analyze" to verify the claims I put in comments about the
number of batches and peak memory usage in the work_mem management
tests.  I chose to put them into join.sql, and then a later patch adds
parallel-aware versions.  (An alternative would be to put them into
select.sql and select_parallel.sql, but it seemed better to keep the
non-parallel, parallel with parallel-oblivious join and parallel-aware
cases next to each other.)

While testing I found a timing bug that could produce incorrect query
results via the empty hash table optimisation, because it saw an
incorrect value of hashtable->totalTuples == 0.  Fixed (see code in
the "finish:" case in MultiExecHash()).

Last week I finally figured out a way to test different startup
timings, considering the complexity created by the "flash mob" problem
I described when I first proposed dynamic barriers[1].  If you build
with -DBARRIER_DEBUG in the attached patch set you get a new GUC
"barrier_attach_sequence" which you can set like this:

  SET barrier_attach_phases = 'HashJoin.barrier:2,7,0';

That list of numbers tells it which phase each participant should
simulate attaching at.  In that example the leader will attach at
phase 2 (PHJ_PHASE_BUILDING), worker 0 will attach at 7
(PHJ_PHASE_RESETTING_BATCH(1)) and worker 1 will attach at 0
(PHJ_PHASE_BEGINNING).  Note that *someone* has to start at 0 or bad
things will happen.

Using this technique I can now use simple scripts to test every case
in the switch statements that appear in three places in the patch.
See attached file parallel-hash-attach-phases.sql.

I'm not sure whether, and if so how, to package any such tests for the
regression suite, since they require a special debug build.

Ideally I would also like to find a way to tell Gather not to run the
plan in the leader (a bit like single_copy mode, except allowing
multiple workers to run the plan, and raising an error out if no
workers could be launched).

> - might not hurt to reindent before the final submission

Will do.

> - Unsurprisingly, please implement the FIXME ;)

This must refer to a note about cleaning up skew buckets after they're
not needed, which I've now done.

Some other things:

Previously I failed to initialise the atomics in the shared skew hash
table correctly, and also I used memset to overwrite atomics when
loading a new batch.  This worked on modern systems but would of
course fail when using emulated atomics.  Fixed in the attached.

In the process I discovered that initialising and clearing large hash
tables this way is quite a lot slower than memset on my machine under
simple test conditions.  I think it might be worth experimenting with
array-oriented atomic operations that have a specialisation for 0
that just uses memset if it can (something like
pg_atomic_init_u64_array(base, stride, n, 0)).  I also think it may be
interesting to parallelise the initialisation and reset of the hash
table, since I've seen cases where I have 7 backends waiting on a
barrier while one initialises a couple of GB of memory for several
seconds.  Those are just small optimisations though and I'm not
planning to investigate them until after the basic patch is in
committable form.
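
For what it's worth, a rough sketch of that helper might look like this
(pg_atomic_init_u64_array() is not an existing API, and the memset shortcut
assumes that an all-zeroes pg_atomic_uint64 is a valid zero, which only
holds when atomics aren't simulated):

#include <string.h>

#include "port/atomics.h"

static void
pg_atomic_init_u64_array(void *base, size_t stride, size_t n, uint64 value)
{
    size_t      i;

#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
    /* Real atomics: all-zeroes is a valid representation of zero. */
    if (value == 0 && stride == sizeof(pg_atomic_uint64))
    {
        memset(base, 0, n * stride);
        return;
    }
#endif
    /* Fall back to initialising each element individually. */
    for (i = 0; i < n; i++)
        pg_atomic_init_u64((pg_atomic_uint64 *) ((char *) base + i * stride),
                           value);
}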

[1] https://www.postgresql.org/message-id/CAEepm=3yJ65sQZUAhfF3S7UfEv83X_rnH5a4-JXmqxGQRQ+7qQ@mail.gmail.com

-- 
Thomas Munro
http://www.enterprisedb.com


Attachment

Re: [HACKERS] Parallel Hash take II

From
Robert Haas
Date:
On Thu, Aug 31, 2017 at 8:53 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> Solution 2:  Teach tuple queues to spill to disk instead of blocking
> when full.  I think this behaviour should probably only be activated
> while the leader is running the plan rather than draining tuple
> queues; the current block-when-full behaviour would still be
> appropriate if the leader is simply unable to drain queues fast
> enough.  Then the deadlock risk would go away.
>
> When I wrote it, I figured that leader_gate.c was cheap and would do
> for now, but I have to admit that it's quite confusing and it sucks
> that later batches lose a core.  I'm now thinking that 2 may be a
> better idea.  My first thought is that Gather needs a way to advertise
> that it's busy while running the plan, shm_mq needs a slightly
> different all-or-nothing nowait mode, and TupleQueue needs to write to
> a shared tuplestore or other temp file-backed mechanism when
> appropriate.  Thoughts?

The problem with solution 2 is that it might lead to either (a)
unbounded amounts of stuff getting spooled to files or (b) small spool
files being repeatedly created and deleted depending on how the leader
is spending its time.  If you could spill only when the leader is
actually waiting for the worker, that might be OK.

> Check out ExecReScanGather(): it shuts down and waits for all workers
> to complete, which makes the assumptions in ExecReScanHashJoin() true.
> If a node below Gather but above Hash Join could initiate a rescan
> then the assumptions would not hold.  I am not sure what it would mean
> though and we don't generate any such plans today to my knowledge.  It
> doesn't seem to make sense for the inner side of Nested Loop to be
> partial.  Have I missed something here?

I bet this could happen, although recent commits have demonstrated
that my knowledge of how PostgreSQL handles rescans is less than
compendious.  Suppose there's a Nested Loop below the Gather and above
the Hash Join, implementing a join condition that can't give rise to a
parameterized path, like a.x + b.x = 0.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Sat, Sep 2, 2017 at 5:13 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> On Thu, Aug 31, 2017 at 8:53 AM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> Check out ExecReScanGather(): it shuts down and waits for all workers
>> to complete, which makes the assumptions in ExecReScanHashJoin() true.
>> If a node below Gather but above Hash Join could initiate a rescan
>> then the assumptions would not hold.  I am not sure what it would mean
>> though and we don't generate any such plans today to my knowledge.  It
>> doesn't seem to make sense for the inner side of Nested Loop to be
>> partial.  Have I missed something here?
>
> I bet this could happen, although recent commits have demonstrated
> that my knowledge of how PostgreSQL handles rescans is less than
> compendious.  Suppose there's a Nested Loop below the Gather and above
> the Hash Join, implementing a join condition that can't give rise to a
> parameterized path, like a.x + b.x = 0.

Hmm.  I still don't see how that could produce a rescan of a partial
path without an intervening Gather, and I would really like to get to
the bottom of this.

At the risk of mansplaining the code that you wrote and turning out to
be wrong:  A Nested Loop can't ever have a partial path on the inner
side.  Under certain circumstances it can have a partial path on the
outer side, because its own results are partial, but for each outer
row it needs to do a total (non-partial) scan of the inner side so
that it can reliably find or not find matches.  Therefore we'll never
rescan partial paths directly, we'll only ever rescan partial paths
indirectly via a Gatheroid node that will synchronise the rescan of
all children to produce a non-partial result.

There may be more reasons to rescan that I'm not thinking of.  But the
whole idea of a rescan seems to make sense only for non-partial paths.
What would it even mean for a worker process to decide to rescan (say)
a Seq Scan without any kind of consensus?

Thought experiment: I suppose we could consider replacing Gather's
clunky shut-down-and-relaunch-workers synchronisation technique with a
new protocol where the Gather node sends a 'rescan!' message to each
worker and then discards their tuples until it receives 'OK, rescan
starts here', and then each parallel-aware node type supplies its own
rescan synchronisation logic as appropriate.  For example, Seq Scan
would somehow need to elect one participant to run
heap_parallelscan_reinitialize and others would wait until it has
done.  This might not be worth the effort, but thinking about this
problem helped me see that rescan of a partial plan without a Gather
node to coordinate doesn't make any sense.
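
For illustration only, using the barrier API from this patch set, the
election part of that might look something like this (hypothetical
function, not proposed code):

static void
parallel_seqscan_rescan_sketch(ParallelHeapScanDesc pscan, Barrier *barrier)
{
    if (BarrierArriveAndWait(barrier, 0 /* wait event */ ))
    {
        /* Exactly one participant returns true and does the serial work. */
        heap_parallelscan_reinitialize(pscan);
    }

    /* Everyone waits here until the elected participant has finished. */
    BarrierArriveAndWait(barrier, 0 /* wait event */ );
}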

Am I wrong?

-- 
Thomas Munro
http://www.enterprisedb.com



Re: [HACKERS] Parallel Hash take II

From
Robert Haas
Date:
On Fri, Sep 1, 2017 at 6:32 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> On Sat, Sep 2, 2017 at 5:13 AM, Robert Haas <robertmhaas@gmail.com> wrote:
>> On Thu, Aug 31, 2017 at 8:53 AM, Thomas Munro
>> <thomas.munro@enterprisedb.com> wrote:
>>> Check out ExecReScanGather(): it shuts down and waits for all workers
>>> to complete, which makes the assumptions in ExecReScanHashJoin() true.
>>> If a node below Gather but above Hash Join could initiate a rescan
>>> then the assumptions would not hold.  I am not sure what it would mean
>>> though and we don't generate any such plans today to my knowledge.  It
>>> doesn't seem to make sense for the inner side of Nested Loop to be
>>> partial.  Have I missed something here?
>>
>> I bet this could happen, although recent commits have demonstrated
>> that my knowledge of how PostgreSQL handles rescans is less than
>> compendious.  Suppose there's a Nested Loop below the Gather and above
>> the Hash Join, implementing a join condition that can't give rise to a
>> parameterized path, like a.x + b.x = 0.
>
> Hmm.  I still don't see how that could produce a rescan of a partial
> path without an intervening Gather, and I would really like to get to
> the bottom of this.

I'm thinking about something like this:

Gather
-> Nested Loop
  -> Parallel Seq Scan
  -> Hash Join
    -> Seq Scan
    -> Parallel Hash
      -> Parallel Seq Scan

The hash join has to be rescanned for every iteration of the nested loop.

Maybe I'm confused.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Sat, Sep 2, 2017 at 10:45 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> On Fri, Sep 1, 2017 at 6:32 PM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> On Sat, Sep 2, 2017 at 5:13 AM, Robert Haas <robertmhaas@gmail.com> wrote:
>>> On Thu, Aug 31, 2017 at 8:53 AM, Thomas Munro
>>> <thomas.munro@enterprisedb.com> wrote:
>>>> Check out ExecReScanGather(): it shuts down and waits for all workers
>>>> to complete, which makes the assumptions in ExecReScanHashJoin() true.
>>>> If a node below Gather but above Hash Join could initiate a rescan
>>>> then the assumptions would not hold.  I am not sure what it would mean
>>>> though and we don't generate any such plans today to my knowledge.  It
>>>> doesn't seem to make sense for the inner side of Nested Loop to be
>>>> partial.  Have I missed something here?
>>>
>>> I bet this could happen, although recent commits have demonstrated
>>> that my knowledge of how PostgreSQL handles rescans is less than
>>> compendious.  Suppose there's a Nested Loop below the Gather and above
>>> the Hash Join, implementing a join condition that can't give rise to a
>>> parameterized path, like a.x + b.x = 0.
>>
>> Hmm.  I still don't see how that could produce a rescan of a partial
>> path without an intervening Gather, and I would really like to get to
>> the bottom of this.
>
> I'm thinking about something like this:
>
> Gather
> -> Nested Loop
>   -> Parallel Seq Scan
>   -> Hash Join
>     -> Seq Scan
>     -> Parallel Hash
>       -> Parallel Seq Scan
>
> The hash join has to be rescanned for every iteration of the nested loop.

I think you mean:

 Gather
 -> Nested Loop
   -> Parallel Seq Scan
   -> Parallel Hash Join
     -> Parallel Seq Scan
     -> Parallel Hash
       -> Parallel Seq Scan

... but we can't make plans like that and they would produce nonsense
output.  The Nested Loop's inner plan is partial, but
consider_parallel_nestloop only makes plans with parallel-safe but
non-partial ("complete") inner paths.

/*
 * consider_parallel_nestloop
 *    Try to build partial paths for a joinrel by joining a partial path for the
 *    outer relation to a complete path for the inner relation.
 */

-- 
Thomas Munro
http://www.enterprisedb.com



Re: [HACKERS] Parallel Hash take II

From
Robert Haas
Date:
On Fri, Sep 1, 2017 at 7:42 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
>> I'm thinking about something like this:
>>
>> Gather
>> -> Nested Loop
>>   -> Parallel Seq Scan
>>   -> Hash Join
>>     -> Seq Scan
>>     -> Parallel Hash
>>       -> Parallel Seq Scan
>>
>> The hash join has to be rescanned for every iteration of the nested loop.
>
> I think you mean:
>
>  Gather
>  -> Nested Loop
>    -> Parallel Seq Scan
>    -> Parallel Hash Join
>      -> Parallel Seq Scan
>      -> Parallel Hash
>        -> Parallel Seq Scan

I don't, though, because that's nonsense.  Maybe what I wrote is also
nonsense, but it is at least different nonsense.

Let's try it again with some table names:

Gather
-> Nested Loop
  -> Parallel Seq Scan on a
  -> (Parallel?) Hash Join
    -> Seq Scan on b (NOT A PARALLEL SEQ SCAN)
    -> Parallel Hash
      -> Parallel Seq Scan on c

I argue that this is a potentially valid plan.  b, of course, has to
be scanned in its entirety by every worker every time through, which
is why it's not a Parallel Seq Scan, but that requirement does not
apply to c.  If we take all the rows in c and stick them into a
DSM-based hash table, we can reuse them every time the hash join is
rescanned and, AFAICS, that should work just fine, and it's probably a
win over letting each worker build a separate copy of the hash table
on c, too.

Of course, there's the "small" problem that I have no idea what to do
if the b-c join is (or becomes) multi-batch.  When I was thinking
about this before, I was imagining that this case might Just Work with
your patch provided that you could generate a plan shaped like this,
but now I see that that's not actually true, because of multiple
batches.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: [HACKERS] Parallel Hash take II

From
Prabhat Sahu
Date:

On Thu, Aug 31, 2017 at 6:23 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
Here's a new rebased and debugged patch set.

Hi Thomas,

I have applied the recent patch (v19) and started testing this feature, and I got a crash with the below test case.

with default settings in the "postgresql.conf" file

create table tab1 (a int, b text);
create table tab2 (a int, b text);
insert into tab1 (select x, x||'_b' from generate_series(1,200000) x);
insert into tab2 (select x%20000, x%20000||'_b' from generate_series(1,200000) x);
ANALYZE;
select * from tab1 t1, tab2 t2, tab1 t3 where t1.a = t2.a and  t2.b = t3.b order by 1;

WARNING:  terminating connection because of crash of another server process
DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
HINT:  In a moment you should be able to reconnect to the database and repeat your command.
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.
!> 

Kindly check, if you can reproduce this at your end.


Thanks & Regards,

Prabhat Kumar Sahu
Mob: 7758988455
Skype ID: prabhat.sahu1984

 

Re: [HACKERS] Parallel Hash take II

From
Prabhat Sahu
Date:
Hi Thomas,

Setting lower "shared_buffers" and "work_mem" as below, the query crashes but I am able to see the explain plan.

shared_buffers = 1MB
work_mem = 1MB
max_parallel_workers_per_gather = 4
max_parallel_workers = 8
enable_mergejoin = off
enable_nestloop = off
enable_hashjoin = on
force_parallel_mode = on
seq_page_cost = 0.1
random_page_cost = 0.1
effective_cache_size = 128MB
parallel_tuple_cost = 0
parallel_setup_cost = 0
parallel_synchronization_cost = 0

CREATE TABLE t1 (a int, b text);
INSERT INTO t1 (SELECT x%20000, x%20000||'_b' FROM generate_series(1,200000) x);
ANALYZE;

postgres=# explain select * from t1, t1 t2 where t1.a = t2.a;
                                       QUERY PLAN                                        
-----------------------------------------------------------------------------------------
 Gather  (cost=2852.86..16362.74 rows=2069147 width=22)
   Workers Planned: 1
   ->  Parallel Hash Join  (cost=2852.86..16362.74 rows=1217145 width=22)
         Hash Cond: (t1.a = t2.a)
         ->  Parallel Seq Scan on t1  (cost=0.00..1284.57 rows=117647 width=11)
         ->  Parallel Hash  (cost=1284.57..1284.57 rows=117647 width=11)
               ->  Parallel Seq Scan on t1 t2  (cost=0.00..1284.57 rows=117647 width=11)
(7 rows)

postgres=# select * from t1, t1 t2 where t1.a = t2.a;
WARNING:  terminating connection because of crash of another server process
DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
HINT:  In a moment you should be able to reconnect to the database and repeat your command.
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.
!> 


-- After assigning more shared_buffers (10MB) and work_mem (10MB), the query executes successfully.

Kindly check, if you can reproduce this at your end.

Thanks & Regards,

Prabhat Kumar Sahu
Mob: 7758988455
Skype ID: prabhat.sahu1984


On Wed, Sep 13, 2017 at 12:34 PM, Prabhat Sahu <prabhat.sahu@enterprisedb.com> wrote:

On Thu, Aug 31, 2017 at 6:23 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
Here's a new rebased and debugged patch set.

Hi Thomas,

I have applied the recent patch (v19) and started testing this feature, and I got a crash with the below test case.

with default settings in the "postgresql.conf" file

create table tab1 (a int, b text);
create table tab2 (a int, b text);
insert into tab1 (select x, x||'_b' from generate_series(1,200000) x);
insert into tab2 (select x%20000, x%20000||'_b' from generate_series(1,200000) x);
ANALYZE;
select * from tab1 t1, tab2 t2, tab1 t3 where t1.a = t2.a and  t2.b = t3.b order by 1;

WARNING:  terminating connection because of crash of another server process
DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
HINT:  In a moment you should be able to reconnect to the database and repeat your command.
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.
!> 

Kindly check, if you can reproduce this at your end.


Thanks & Regards,

Prabhat Kumar Sahu
Mob: 7758988455
Skype ID: prabhat.sahu1984

 

Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Thu, Sep 14, 2017 at 12:51 AM, Prabhat Sahu
<prabhat.sahu@enterprisedb.com> wrote:
> Setting lower "shared_buffers" and "work_mem" as below, the query crashes, but I am able to see the explain plan.

Thanks Prabhat.  A small thinko in the batch reset code means that it
sometimes thinks the shared skew hash table is present and tries to
probe it after batch 1.  I have a fix for that and I will post a new
patch set just as soon as I have a good regression test figured out.

-- 
Thomas Munro
http://www.enterprisedb.com



Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Thu, Sep 14, 2017 at 11:57 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> On Thu, Sep 14, 2017 at 12:51 AM, Prabhat Sahu
> <prabhat.sahu@enterprisedb.com> wrote:
>> Setting lower "shared_buffers" and "work_mem" as below, the query crashes, but I am able to see the explain plan.
>
> Thanks Prabhat.  A small thinko in the batch reset code means that it
> sometimes thinks the shared skew hash table is present and tries to
> probe it after batch 1.  I have a fix for that and I will post a new
> patch set just as soon as I have a good regression test figured out.

Fixed in the attached version, by adding a missing
"hashtable->shared->num_skew_buckets = 0;" to ExecHashFreeSkewTable().
I did some incidental tidying of the regression tests, but didn't
manage to find a version of your example small enough to put in the
regression tests.  I also discovered some other things:

1.  Multi-batch Parallel Hash Join could occasionally produce a
resowner warning about a leaked temporary File associated with
SharedTupleStore objects.  Fixed by making sure we call routines that
close all file handles in ExecHashTableDetach().

2.  Since last time I tested, a lot fewer TPCH queries choose a
Parallel Hash plan.  Not sure why yet.  Possibly because Gather Merge
and other things got better.  Will investigate.

3.  Gather Merge and Parallel Hash Join may have a deadlock problem.
Gather Merge needs to block waiting for tuples, but workers wait
for all participants (including the leader) to reach barriers.  TPCH
Q18 (with a certain set of indexes and settings, YMMV) has Gather
Merge over Sort over Parallel Hash Join, and although it usually runs
successfully I have observed one deadlock.  Ouch.  This seems to be a
more fundamental problem than the blocked TupleQueue scenario.  Not
sure what to do about that.

-- 
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Robert Haas
Date:
On Thu, Sep 14, 2017 at 10:01 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> 3.  Gather Merge and Parallel Hash Join may have a deadlock problem.
> Gather Merge needs to block waiting for tuples, but workers wait
> for all participants (including the leader) to reach barriers.  TPCH
> Q18 (with a certain set of indexes and settings, YMMV) has Gather
> Merge over Sort over Parallel Hash Join, and although it usually runs
> successfully I have observed one deadlock.  Ouch.  This seems to be a
> more fundamental problem than the blocked TupleQueue scenario.  Not
> sure what to do about that.

Thomas and I spent about an hour and a half brainstorming about this
just now.  Parallel query doesn't really have a documented deadlock
avoidance strategy, yet all committed and proposed patches other than
this one manage to avoid deadlock.  This one has had a number of
problems crop up in this area, so it struck me that it might be
violating a rule which every other patch was following.  I struggled
for a bit and finally managed to articulate what I think the
deadlock-avoidance rule is that is generally followed by other
committed and proposed patches:

<rule>
Once you enter a state in which other participants might wait for you,
you must exit that state before doing anything that might wait for
another participant.
</rule>

From this, it's easy to see that the waits-for graph can't contain any
cycles: if every parallel query node obeys the above rule, then a
given node can have in-arcs or out-arcs, but not both.  I also believe
it to be the case that every existing node follows this rule.  For
instance, Gather and Gather Merge wait for workers, but they aren't at
that point doing anything that can make the workers wait for them.
Parallel Bitmap Heap Scan waits for the leader to finish building the
bitmap, but that leader never waits for anyone else while building the
bitmap.  Parallel Index(-Only) Scan waits for the process advancing
the scan to reach the next page, but that process never waits for any
other while so doing.  Other types of parallel nodes -- including the
proposed Parallel Append node, which is an interesting case because
like Parallel Hash it appears in the "middle" of the parallel portion
of the plan tree rather than the root like Gather or the leaves like a
parallel scan -- don't wait at all, except for short
spinlock-protected or LWLock-protected critical sections during which
they surely don't go into any sort of long-term wait (which would be
unacceptable for other reasons anyway).

Parallel hash violates this rule only in the case of a multi-batch
hash join, and for only one reason: to avoid blowing out work_mem.
Since, consistent with resource management decisions elsewhere, each
participant is entitled to an amount of memory equal to work_mem, the
shared hash table can and does use up to (participants * work_mem),
which means that we must wait for everybody to be done with the hash
table for batch N before building the hash table for batch N+1.  More
properly, if the hash table for the current batch happens to be
smaller than the absolute maximum amount of memory we can use, we can
build the hash table for the next batch up to the point where all the
memory is used, but must then pause and wait for the old hash table to
go away before continuing.  But that means that the process for which
we are waiting violated the rule mentioned above: by not being done
with the memory, it's making other processes wait, and by returning a
tuple, it's allowing other parts of the executor to do arbitrary
computations which can themselves wait.  So, kaboom.

One simple and stupid way to avoid this deadlock is to reduce the
memory budget for the shared hash table to work_mem and remove the
barriers that prevent more than one such hash table from existing at a
time.  In the worst case, we still use (participants * work_mem),
frequently we'll use less, but there are no longer any waits for
processes that might not even be running the parallel hash node
(ignoring for the moment the problem of right and full parallel hash
joins, which might need more thought).  So no deadlock.

We can do better.  First, as long as nbatches == 1, we can use a hash
table of up to size (participants * work_mem); if we have to switch to
multiple batches, then just increase the number of batches enough that
the current memory usage drops below work_mem.  Second, following an
idea originally by Ashutosh Bapat whose relevance to this issue Thomas
Munro realized during our discussion, we can make all the batches
small enough to fit in work_mem (rather than participants * work_mem
as the current patch does) and spread them across the workers (in the
style of Parallel Append, including potentially deploying multiple
workers against the same batch if there are fewer batches than
workers).  Then, single-batch parallel hash joins use the maximum
allowable memory always, and multi-batch parallel hash joins use the
maximum allowable memory after the first batch.  Not perfect, but not
bad, and definitely better than deadlocking.  Further refinements
might be possible.
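
To make that concrete, here is a rough sketch of the budgeting rule being
described (hypothetical function and names, not code from the patch):

static void
adjust_nbatch_for_budget(size_t inner_size, int nparticipants,
                         size_t work_mem_bytes, int *nbatch)
{
    if (*nbatch == 1)
    {
        /* Single batch: the shared table may use the combined budget. */
        if (inner_size <= (size_t) nparticipants * work_mem_bytes)
            return;
        *nbatch = 2;            /* forced to go multi-batch */
    }

    /* Multi-batch: keep doubling until one batch fits in one work_mem. */
    while (inner_size / *nbatch > work_mem_bytes)
        *nbatch *= 2;
}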

If we don't adopt some approach along these lines, then I think we've
got to articulate some alternative deadlock-avoidance rule and make
sure every parallel query facility follows it.  I welcome ideas on
that front, but I don't think the rule mentioned above is a bad one,
and I'd obviously like to minimize the amount of rework that we need
to do.  Assuming we do settle on the above rule, it clearly needs to
be documented someplace -- not sure of the place.  I think that it
doesn't belong in README.parallel because it's an executor-specific
rule, not necessarily a general rule to which other users of
parallelism must adhere; they can choose their own strategies.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: [HACKERS] Parallel Hash take II

From
Peter Geoghegan
Date:
On Mon, Sep 18, 2017 at 1:06 PM, Robert Haas <robertmhaas@gmail.com> wrote:
> If we don't adopt some approach along these lines, then I think we've
> got to articulate some alternative deadlock-avoidance rule and make
> sure every parallel query facility follows it.  I welcome ideas on
> that front, but I don't think the rule mentioned above is a bad one,
> and I'd obviously like to minimize the amount of rework that we need
> to do.  Assuming we do settle on the above rule, it clearly needs to
> be documented someplace -- not sure of the place.  I think that it
> doesn't belong in README.parallel because it's an executor-specific
> rule, not necessarily a general rule to which other users of
> parallelism must adhere; they can choose their own strategies.

+1

Graefe's "Query Evaluation Techniques for Large Databases" has several
pages on deadlock avoidance strategies. It was written almost 25 years
ago, but still has some good insights IMV (you'll recall that Graefe
is the author of the Volcano paper; this reference paper seems like
his follow-up). Apparently, deadlock avoidance strategy becomes
important for parallel sort with partitioning. You may be able to get
some ideas from there. And even if you don't, his handling of the
topic is very deliberate and high level, which suggests that ours
should be, too.

-- 
Peter Geoghegan



Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Thu, Sep 21, 2017 at 5:49 AM, Peter Geoghegan <pg@bowt.ie> wrote:
> Graefe's "Query Evaluation Techniques for Large Databases" has several
> pages on deadlock avoidance strategies. It was written almost 25 years
> ago, but still has some good insights IMV (you'll recall that Graefe
> is the author of the Volcano paper; this reference paper seems like
> his follow-up). Apparently, deadlock avoidance strategy becomes
> important for parallel sort with partitioning. You may be able to get
> some ideas from there. And even if you don't, his handling of the
> topic is very deliberate and high level, which suggests that ours
> should be, too.

Very interesting and certainly relevant (the parts I've read so far),
though we don't have multiple consumers.  Multiplexing one thread so
that it is both a consumer and a producer is an extra twist though.

-- 
Thomas Munro
http://www.enterprisedb.com



Re: [HACKERS] Parallel Hash take II

From
Rushabh Lathia
Date:
The v20 patch set (I was trying the 0008 and 0009 patches) does not apply
cleanly on the latest commit, and I also get a compilation error due to the
refactoring in the commit below.

commit 0c5803b450e0cc29b3527df3f352e6f18a038cc6
Author: Peter Eisentraut <peter_e@gmx.net>
Date:   Sat Sep 23 09:49:22 2017 -0400

    Refactor new file permission handling



On Mon, Sep 25, 2017 at 11:38 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
On Thu, Sep 21, 2017 at 5:49 AM, Peter Geoghegan <pg@bowt.ie> wrote:
> Graefe's "Query Evaluation Techniques for Large Databases" has several
> pages on deadlock avoidance strategies. It was written almost 25 years
> ago, but still has some good insights IMV (you'll recall that Graefe
> is the author of the Volcano paper; this reference paper seems like
> his follow-up). Apparently, deadlock avoidance strategy becomes
> important for parallel sort with partitioning. You may be able to get
> some ideas from there. And even if you don't, his handling of the
> topic is very deliberate and high level, which suggests that ours
> should be, too.

Very interesting and certainly relevant (the parts I've read so far),
though we don't have multiple consumers.  Multiplexing one thread so
that it is both a consumer and a producer is an extra twist though.

--
Thomas Munro
http://www.enterprisedb.com





--
Rushabh Lathia

Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Thu, Oct 5, 2017 at 7:07 PM, Rushabh Lathia <rushabh.lathia@gmail.com> wrote:
> The v20 patch set (I was trying the 0008 and 0009 patches) does not apply
> cleanly on the latest commit, and I also get a compilation error due to
> the refactoring in the commit below.
>
> commit 0c5803b450e0cc29b3527df3f352e6f18a038cc6

Hi Rushabh

I am about to post a new patch set that fixes the deadlock problem,
but in the meantime here is a rebase of those two patches (numbers
changed to 0006 + 0007).  In the next version I think I should
probably remove that 'stripe' concept.  The idea was to spread
temporary files over the available temporary tablespaces, but it's a
terrible API, since you have to promise to use the same stripe number
when opening the same name later... Maybe I should use a hash of the
name for that instead.  Thoughts?

-- 
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Prabhat Sahu
Date:
Hi Thomas,

I was testing this feature with the v20 patch, and I got a crash while doing large joins with small work_mem and lots of workers, as below.

-- Machine Configuration: (d1.xlarge) CPUs: 8, RAM: 16GB, Size: 640GB

-- postgres.conf setting as below:
work_mem = 64kB
max_parallel_workers_per_gather = 128
max_parallel_workers = 64
enable_mergejoin = off
enable_nestloop = off
enable_hashjoin = on
force_parallel_mode = on
seq_page_cost = 0.1
random_page_cost = 0.1
effective_cache_size = 128MB
parallel_tuple_cost = 0
parallel_setup_cost = 0
parallel_synchronization_cost = 0

-- created only one table "lineitem" of size 93GB.
postgres=# select pg_size_pretty(pg_total_relation_size('lineitem'));
 pg_size_pretty
----------------
 93 GB
(1 row)

[centos@centos-prabhat bin]$ vi test10.sql
explain (analyze, costs off)
select  count(*) from lineitem t1 join lineitem t2 using(l_suppkey) join lineitem t3 using(l_suppkey);
select  count(*) from lineitem t1 join lineitem t2 using(l_suppkey) join lineitem t3 using(l_suppkey);

[centos@centos-prabhat bin]$ ./psql postgres -a -f test10.sql > test10.out

[centos@centos-prabhat bin]$ vi test10.out
explain (analyze, costs off)
select  count(*) from lineitem t1 join lineitem t2 using(l_suppkey) join lineitem t3 using(l_suppkey);
psql:test10.sql:2: WARNING:  terminating connection because of crash of another server process
DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
HINT:  In a moment you should be able to reconnect to the database and repeat your command.
psql:test10.sql:2: server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.
psql:test10.sql:2: connection to server was lost


Kindly check, if you can reproduce the same at your end.



Thanks & Regards,

Prabhat Kumar Sahu
Mob: 7758988455
Skype ID: prabhat.sahu1984


On Thu, Oct 5, 2017 at 1:15 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
On Thu, Oct 5, 2017 at 7:07 PM, Rushabh Lathia <rushabh.lathia@gmail.com> wrote:
> The v20 patch set (I was trying the 0008 and 0009 patches) does not apply
> cleanly on the latest commit, and I also get a compilation error due to
> the refactoring in the commit below.
>
> commit 0c5803b450e0cc29b3527df3f352e6f18a038cc6

Hi Rushabh

I am about to post a new patch set that fixes the deadlock problem,
but in the meantime here is a rebase of those two patches (numbers
changed to 0006 + 0007).  In the next version I think I should
probably remove that 'stripe' concept.  The idea was to spread
temporary files over the available temporary tablespaces, but it's a
terrible API, since you have to promise to use the same stripe number
when opening the same name later... Maybe I should use a hash of the
name for that instead.  Thoughts?

Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Tue, Sep 19, 2017 at 8:06 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> On Thu, Sep 14, 2017 at 10:01 AM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> 3.  Gather Merge and Parallel Hash Join may have a deadlock problem.
>
> [...]
>
> Thomas and I spent about an hour and a half brainstorming about this
> just now.
>
> [...]
>
> First, as long as nbatches == 1, we can use a hash
> table of up to size (participants * work_mem); if we have to switch to
> multiple batches, then just increase the number of batches enough that
> the current memory usage drops below work_mem.  Second, following an
> idea originally by Ashutosh Bapat whose relevance to this issue Thomas
> Munro realized during our discussion, we can make all the batches
> small enough to fit in work_mem (rather than participants * work_mem
> as the current patch does) and spread them across the workers (in the
> style of Parallel Append, including potentially deploying multiple
> workers against the same batch if there are fewer batches than
> workers).

Here is an updated patch set that does that ^.

Assorted details:

1.  To avoid deadlocks, we can only wait at barriers when we know that
all other attached backends have either arrived already or are
actively executing the code preceding the barrier wait, so that
progress is guaranteed.  The rule is that executor nodes can remain
attached to a barrier after they've emitted a tuple, which is useful
for resource management (ie avoids inventing a separate reference
counting scheme), but must never again wait for it.  With that
programming rule there can be no deadlock between executor nodes (a
sketch of this pattern appears after this list).

2.  Multiple batches are processed at the same time in parallel,
rather than being processed serially.  Participants try to spread
themselves out over different batches to reduce contention.

3.  I no longer try to handle outer joins.  I have an idea for how to
do that while adhering to the above deadlock-avoidance programming
rule, but I would like to consider that for a later patch.

4.  I moved most of the parallel-aware code into ExecParallelHash*()
functions that exist alongside the private hash table versions.  This
avoids uglifying the existing hash join code and introducing
conditional branches all over the place, as requested by Andres.  This
made some of the earlier refactoring patches unnecessary.

5.  Inner batch repartitioning, if required, is now completed up front
for Parallel Hash.  Rather than waiting until we try to load hash
tables back into memory to discover that they don't fit, this version
tracks the size of hash table contents while writing the batches out.
That change has various pros and cons, but its main purpose is to
remove dependencies between batches.

6.  Outer batch repartitioning is now done up front too, if it's
necessary.  This removes the dependencies that exist between batch 0
and later batches, but means that outer batch 0 is now written to disk
for multi-batch joins.  I don't see any way to avoid that while
adhering to the deadlock avoidance rule stated above.  If we've
already started emitting tuples for batch 0 (by returning control to
higher nodes) then we have no deadlock-free way to wait for the scan
of the outer relation to finish, so we can't safely process later
batches.  Therefore we must buffer batch 0's tuples.  This renders the
skew optimisation useless.

7.  There is now some book-keeping state for each batch.  For typical
cases the total space used is negligible but obviously you can
contrive degenerate cases where it eats a lot of memory (say by
creating a million batches, which is unlikely to work well for other
reasons).  I have some ideas on reducing their size, but on the other
hand I also have some ideas on increasing it profitably... (this is
the perfect place to put the Bloom filters discussed elsewhere that
would  make up for the loss of the skew optimisation, for selective
joins; a subject for another day).

8.  Barrier API extended slightly.  (1) BarrierWait() is renamed to
BarrierArriveAndWait().  (2) BarrierArriveAndDetach() is new.  The new
function is the mechanism required to get from PHJ_BATCH_PROBING to
PHJ_BATCH_DONE without waiting, and corresponds to the operation known
as Phaser.arriveAndDeregister() in Java (and maybe also
barrier::arrive_and_drop() in the C++ concurrency TS and Clock.drop()
in X10, not sure).

9.  I got rid of PARALLEL_KEY_EXECUTOR_NODE_NTH().  Previously I
wanted more than one reserved shm_toc key per executor node.  Robert
didn't like that.

10.  I got rid of "LeaderGate".  That earlier attempt at deadlock
avoidance clearly missed the mark.  (I think it probably defended
against the Gather + full TupleQueue deadlock but not the
GatherMerge-induced deadlock so it was a useless non-general
solution.)

11.  The previous incarnation of SharedTuplestore had a built-in
concept of partitions, which allowed the number of partitions to be
expanded any time but only allowed one partition to be read back in at
a time.  That worked for the earlier kind of sequential partition
processing but doesn't work for parallel partition processing.  I
chose to get rid of the built-in partitioning concept and create
separate SharedTuplestores for each batch, since I now have a place to
store per-batch information in memory.

12.  The previous incarnation of BufFileSet had a concept of "stripe"
used for distributing temporary files over tablespaces; this is now
gone from the API.  Files are distributed over your configured
temp_tablespaces without help from client code.

13.  For now the rescan optimisation (ie reusing the hash table) is
not enabled for Parallel Hash.  I had to provide
ExecHashJoinReInitializeDSM (for 41b0dd98), but haven't figured out
how to reach it yet and didn't want to wait any longer before posting
a new patch so that function is effectively blind code and will
probably require adjustments.
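
To illustrate points 1 and 8 above, here is the general shape of the
programming rule in code form (illustrative only; the struct, helper
functions and wait event are invented for the example, and the real logic
in nodeHashjoin.c is considerably more involved):

static void
example_build_then_probe(ExampleBatch *batch)
{
    MinimalTuple tuple;

    /* Build phase: no tuples emitted yet, so waiting is deadlock-free. */
    while ((tuple = next_inner_tuple()) != NULL)
        insert_into_shared_table(batch, tuple);
    BarrierArriveAndWait(&batch->barrier, WAIT_EVENT_EXAMPLE_BUILD);

    /* Probe phase: the parent node may now be waiting on our output... */
    while ((tuple = probe_shared_table(batch)) != NULL)
        emit_to_parent(tuple);

    /* ...so we must never wait on this barrier again; detach instead. */
    BarrierArriveAndDetach(&batch->barrier);
}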

I've also attached a test that shows a worker starting up and
attaching at every phase.

-- 
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Tue, Oct 24, 2017 at 10:10 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> Here is an updated patch set that does that ^.

It's a bit hard to understand what's going on with the v21 patch set I
posted yesterday because EXPLAIN ANALYZE doesn't tell you anything
interesting.  Also, if you apply the multiplex_gather patch[1] I
posted recently and set multiplex_gather to off then it doesn't tell
you anything at all, because the leader has no hash table (I suppose
that could happen with unpatched master given sufficiently bad
timing).  Here's a new version with an extra patch that adds some
basic information about load balancing to EXPLAIN ANALYZE, inspired by
what commit bf11e7ee did for Sort.

Example output:

enable_parallel_hash = on, multiplex_gather = on:

 ->  Parallel Hash (actual rows=1000000 loops=3)
       Buckets: 131072  Batches: 16
       Leader:    Shared Memory Usage: 3552kB  Hashed: 396120  Batches Probed: 7
       Worker 0:  Shared Memory Usage: 3552kB  Hashed: 276640  Batches Probed: 6
       Worker 1:  Shared Memory Usage: 3552kB  Hashed: 327240  Batches Probed: 6
       ->  Parallel Seq Scan on simple s (actual rows=333333 loops=3)

 ->  Parallel Hash (actual rows=10000000 loops=8)
       Buckets: 131072  Batches: 256
       Leader:    Shared Memory Usage: 2688kB  Hashed: 1347720
Batches Probed: 36
       Worker 0:  Shared Memory Usage: 2688kB  Hashed: 1131360
Batches Probed: 33
       Worker 1:  Shared Memory Usage: 2688kB  Hashed: 1123560
Batches Probed: 38
       Worker 2:  Shared Memory Usage: 2688kB  Hashed: 1231920
Batches Probed: 38
       Worker 3:  Shared Memory Usage: 2688kB  Hashed: 1272720
Batches Probed: 34
       Worker 4:  Shared Memory Usage: 2688kB  Hashed: 1234800
Batches Probed: 33
       Worker 5:  Shared Memory Usage: 2688kB  Hashed: 1294680
Batches Probed: 37
       Worker 6:  Shared Memory Usage: 2688kB  Hashed: 1363240
Batches Probed: 35
       ->  Parallel Seq Scan on big s2 (actual rows=1250000 loops=8)

enable_parallel_hash = on, multiplex_gather = off (ie no leader participation):

 ->  Parallel Hash (actual rows=1000000 loops=2)
       Buckets: 131072  Batches: 16
       Worker 0:  Shared Memory Usage: 3520kB  Hashed: 475920  Batches Probed: 9
       Worker 1:  Shared Memory Usage: 3520kB  Hashed: 524080  Batches Probed: 8
       ->  Parallel Seq Scan on simple s (actual rows=500000 loops=2)

enable_parallel_hash = off, multiplex_gather = on:

 ->  Hash (actual rows=1000000 loops=3)
       Buckets: 131072  Batches: 16
       Leader:    Memory Usage: 3227kB
       Worker 0:  Memory Usage: 3227kB
       Worker 1:  Memory Usage: 3227kB
       ->  Seq Scan on simple s (actual rows=1000000 loops=3)

enable_parallel_hash = off, multiplex_gather = off:

 ->  Hash (actual rows=1000000 loops=2)
       Buckets: 131072  Batches: 16
       Worker 0:  Memory Usage: 3227kB
       Worker 1:  Memory Usage: 3227kB
       ->  Seq Scan on simple s (actual rows=1000000 loops=2)

parallelism disabled (traditional single-line output, unchanged):

 ->  Hash (actual rows=1000000 loops=1)
       Buckets: 131072  Batches: 16  Memory Usage: 3227kB
       ->  Seq Scan on simple s (actual rows=1000000 loops=1)

(It actually says "Tuples Hashed", not "Hashed" but I edited the above
to fit on a standard punchcard.)  Thoughts?

[1] https://www.postgresql.org/message-id/CAEepm%3D2U%2B%2BLp3bNTv2Bv_kkr5NE2pOyHhxU%3DG0YTa4ZhSYhHiw%40mail.gmail.com

-- 
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Rushabh Lathia
Date:
While rebasing the parallel-B-tree-index-build patch on top of the v22
patch set, I found some cosmetic review points:

1) BufFileSetEstimate is removed but it's still in buffile.h

+extern size_t BufFileSetEstimate(int stripes);


2) BufFileSetCreate is renamed to BufFileSetInit, but it is still used in
the comment below:

* Attach to a set of named BufFiles that was created with BufFileSetCreate.

Thanks,

On Wed, Oct 25, 2017 at 11:33 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
On Tue, Oct 24, 2017 at 10:10 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> Here is an updated patch set that does that ^.

It's a bit hard to understand what's going on with the v21 patch set I
posted yesterday because EXPLAIN ANALYZE doesn't tell you anything
interesting.  Also, if you apply the multiplex_gather patch[1] I
posted recently and set multiplex_gather to off then it doesn't tell
you anything at all, because the leader has no hash table (I suppose
that could happen with unpatched master given sufficiently bad
timing).  Here's a new version with an extra patch that adds some
basic information about load balancing to EXPLAIN ANALYZE, inspired by
what commit bf11e7ee did for Sort.

Example output:

enable_parallel_hash = on, multiplex_gather = on:

 ->  Parallel Hash (actual rows=1000000 loops=3)
       Buckets: 131072  Batches: 16
       Leader:    Shared Memory Usage: 3552kB  Hashed: 396120  Batches Probed: 7
       Worker 0:  Shared Memory Usage: 3552kB  Hashed: 276640  Batches Probed: 6
       Worker 1:  Shared Memory Usage: 3552kB  Hashed: 327240  Batches Probed: 6
       ->  Parallel Seq Scan on simple s (actual rows=333333 loops=3)

 ->  Parallel Hash (actual rows=10000000 loops=8)
       Buckets: 131072  Batches: 256
       Leader:    Shared Memory Usage: 2688kB  Hashed: 1347720
Batches Probed: 36
       Worker 0:  Shared Memory Usage: 2688kB  Hashed: 1131360
Batches Probed: 33
       Worker 1:  Shared Memory Usage: 2688kB  Hashed: 1123560
Batches Probed: 38
       Worker 2:  Shared Memory Usage: 2688kB  Hashed: 1231920
Batches Probed: 38
       Worker 3:  Shared Memory Usage: 2688kB  Hashed: 1272720
Batches Probed: 34
       Worker 4:  Shared Memory Usage: 2688kB  Hashed: 1234800
Batches Probed: 33
       Worker 5:  Shared Memory Usage: 2688kB  Hashed: 1294680
Batches Probed: 37
       Worker 6:  Shared Memory Usage: 2688kB  Hashed: 1363240
Batches Probed: 35
       ->  Parallel Seq Scan on big s2 (actual rows=1250000 loops=8)

enable_parallel_hash = on, multiplex_gather = off (ie no leader participation):

 ->  Parallel Hash (actual rows=1000000 loops=2)
       Buckets: 131072  Batches: 16
       Worker 0:  Shared Memory Usage: 3520kB  Hashed: 475920  Batches Probed: 9
       Worker 1:  Shared Memory Usage: 3520kB  Hashed: 524080  Batches Probed: 8
       ->  Parallel Seq Scan on simple s (actual rows=500000 loops=2)

enable_parallel_hash = off, multiplex_gather = on:

 ->  Hash (actual rows=1000000 loops=3)
       Buckets: 131072  Batches: 16
       Leader:    Memory Usage: 3227kB
       Worker 0:  Memory Usage: 3227kB
       Worker 1:  Memory Usage: 3227kB
       ->  Seq Scan on simple s (actual rows=1000000 loops=3)

enable_parallel_hash = off, multiplex_gather = off:

 ->  Hash (actual rows=1000000 loops=2)
       Buckets: 131072  Batches: 16
       Worker 0:  Memory Usage: 3227kB
       Worker 1:  Memory Usage: 3227kB
       ->  Seq Scan on simple s (actual rows=1000000 loops=2)

parallelism disabled (traditional single-line output, unchanged):

 ->  Hash (actual rows=1000000 loops=1)
       Buckets: 131072  Batches: 16  Memory Usage: 3227kB
       ->  Seq Scan on simple s (actual rows=1000000 loops=1)

(It actually says "Tuples Hashed", not "Hashed" but I edited the above
to fit on a standard punchcard.)  Thoughts?

[1] https://www.postgresql.org/message-id/CAEepm%3D2U%2B%2BLp3bNTv2Bv_kkr5NE2pOyHhxU%3DG0YTa4ZhSYhHiw%40mail.gmail.com



--
Rushabh Lathia

Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Fri, Oct 27, 2017 at 12:24 AM, Rushabh Lathia
<rushabh.lathia@gmail.com> wrote:
> While rebasing the parallel-B-tree-index-build patch on top of the v22
> patch set, I found some cosmetic review points:

Thanks!

> 1) BufFileSetEstimate is removed but it's still in buffile.h
>
> +extern size_t BufFileSetEstimate(int stripes);

Fixed.

> 2) BufFileSetCreate is renamed to BufFileSetInit, but it is still used in
> the comment below:
>
> * Attach to a set of named BufFiles that was created with BufFileSetCreate.

Fixed.

Other minor tweaks: I fixed a harmless warning from Visual C++ and
added a CHECK_FOR_INTERRUPTS() to
ExecParallelHashJoinPartitionOuter()'s loop.

Two other changes:

1.  Improved concurrency for sharedtuplestore.c.

Last week I investigated some test failures on AppVeyor CI and
discovered a small problem with sharedtuplestore.c on Windows: it
could occasionally get ERROR_ACCESS_DENIED errors when attempting to
open files that were concurrently being unlinked (unlinking is not
atomic on Windows, see pgsql-bugs #14243 for another manifestation).
That code was a bit sloppy (though not incorrect), and was easy to fix
by doing some checks in a different order, but...

While hacking on that I realised that sharedtuplestore.c's parallel
scan efficiency was pretty terrible anyway, so I made an improvement
that I'd earlier threatened to make in a comment.  Instead of holding
a per-file lock while reading individual tuples, it now works in
page-multiple-sized chunks.  Readers only acquire a spinlock when they
need a new chunk, don't hold any locks while doing IO, and never read
overlapping pages.  From a user perspective, this means that EXPLAIN
(BUFFERS) temporary buffer read counts are approximately the same as
for the equivalent non-parallel hash join, because each worker reads a
disjoint set of temporary file pages instead of reading interleaved
tuples from the same pages, and there is no more LWLock "shared
tuplestore" that can show up in wait_event when backends pile up on
the same batch.  It writes very slightly more than it reads because of
unread pages at the end of the final chunk (because it reads back in
tuple-at-a-time and thus page-at-a-time, not whole chunk at a time --
I considered reading whole chunks and then returning pointer to
MinimalTuples in the chunk, but that required MAXALIGNing data in the
files on disk which made the files noticeably bigger).

So, to summarise, there are now three layers of contention avoidance
strategy being used by Parallel Hash Join for scanning batches in
parallel:

i)  Participants in a Parallel Hash Join try to work on different
batches so they avoid scanning the same SharedTuplestore completely.
That's visible with EXPLAIN ANALYZE as "Batches Probed" (that shows
the number of outer batches scanned; it doesn't seem worth the pixels
to show "Batches Loaded" for the number of inner batches scanned which
may be lower).

ii)  When that's not possible, they start out reading from different
backing files by starting with the one they wrote themselves and then
go around the full set.  That means they don't contend on the per-file
read-head lock until a reader drains a whole file and then chooses
another one that's still being read by someone else.

iii)  [New in this version] Since they might still finish up reading
from the same file (and often do towards the end of a join), the
tuples are chopped into multi-page chunks and participants are
allocated chunks using a spinlock-protected counter.  This is quite a
lot like Parallel Sequential Scan, with some extra complications due
to variable sized chunks.
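
As a simplified sketch of strategy iii, the reader side could claim chunks
roughly like this (the field names match the patch's
SharedTuplestoreParticipant struct, but the function itself is illustrative
rather than the patch's exact code):

static bool
claim_next_chunk(SharedTuplestoreParticipant *p,
                 BlockNumber *first_page, int chunk_pages)
{
    bool        success = false;

    /* Claim a disjoint page range under the spinlock... */
    SpinLockAcquire(&p->mutex);
    if (p->read_page < p->npages)
    {
        *first_page = p->read_page;
        p->read_page += chunk_pages;
        success = true;
    }
    SpinLockRelease(&p->mutex);

    /* ...then the caller reads those pages without holding any lock. */
    return success;
}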

2.  Improved oversized tuple handling.

I added a new regression test case to exercise the oversized tuple
support in ExecParallelHashLoadTuple() and sts_puttuple(), as
requested by Andres a while back.  (Thanks to Andrew Gierth for a
pointer on how to get detoasted tuples into a hash join table without
disabling parallelism.)  While testing that I realised that my
defences against some degenerate behaviour with very small work_mem
weren't quite good enough.  Previously, I adjusted space_allowed to
make sure every backend could allocate at least one memory chunk
without triggering an increase in the number of batches.  Now, I leave
space_allowed alone but explicitly allow every backend to allocate at
least one chunk ignoring the memory budget (whether regular chunk size
or oversized tuple size), to avoid futile repeated batch increases
when a single monster tuple is never going to fit in work_mem.
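
The shape of that rule is roughly the following (hypothetical helper, not
the patch's code):

static bool
chunk_allocation_allowed(size_t chunk_size, size_t space_used,
                         size_t space_allowed, bool have_a_chunk_already)
{
    /* A participant's first chunk is always permitted, even over budget,
     * so one oversized tuple can't provoke futile repeated nbatch
     * increases; later chunks must respect the shared space budget. */
    if (!have_a_chunk_already)
        return true;
    return space_used + chunk_size <= space_allowed;
}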

A couple of stupid things outstanding:

1.  EXPLAIN ANALYZE for Parallel Hash "actual" shows the complete row
count, which is interesting to know (or not?  maybe I should show it
somewhere else?), but the costing shows the partial row estimate used
for costing purposes.
2.  The BufFileSet's temporary directory gets created even if you
don't need it for batches.  Duh.
3.  I don't have a good query rescan regression query yet.  I wish I
could write my own query plans to test the executor.

-- 
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Tue, Aug 1, 2017 at 9:28 AM, Andres Freund <andres@anarazel.de> wrote:
> On 2017-07-26 20:12:56 +1200, Thomas Munro wrote:
>> I'll report on performance separately.
>
> Looking forward to that ;)

Here are some experimental results from a Xeon E5-2695 v3 with a ton
of RAM and spinning disks (EDB lab server "scylla").  I used TPC-H
dbgen scale 10 with the additional indexes suggested by Tomas
Vondra[1].  10GB of source data (= 23GB pgdata dir) is obviously quite
small as these things go, and I hope we'll run some of these with a
much larger scale soon (it's a big job), but it's enough to runs
queries for tens of seconds to minutes so it's definitely in parallel
query territory and shows some pretty interesting effects IMHO.

First, here's a stupid self-join as a warm-up.  The query is SELECT
COUNT(*) FROM lineitem r JOIN lineitem s USING (l_orderkey,
l_linenumber), where lineitem is ~60 million rows.

(1) With work_mem set sky-high so no batching is required, how much
speed-up does each worker contribute with this feature off (= same as
unpatched master) and on?  In this table, each cell shows the speed-up
compared to w=0 (no workers):
 parallel_hash |  w=0  |  w=1  |  w=2  |  w=3  |  w=4  |  w=5  |  w=6
---------------+-------+-------+-------+-------+-------+-------+-------
 off           | 1.00x | 1.42x | 1.66x | 1.76x | 1.66x | 1.68x | 1.69x
 on            | 1.00x | 1.76x | 2.54x | 3.21x | 3.80x | 4.30x | 4.79x

(2) With work_mem set to 128MB this query needs 32 batches.  Again,
how much speed-up does each worker contribute with the feature off and
on?
 parallel_hash |  w=0  |  w=1  |  w=2  |  w=3  |  w=4  |  w=5  |  w=6
---------------+-------+-------+-------+-------+-------+-------+-------
 off           | 1.00x | 1.37x | 1.49x | 1.32x | 1.67x | 1.63x | 1.64x
 on            | 1.00x | 1.94x | 2.72x | 2.82x | 3.02x | 4.64x | 5.98x

I haven't tried to grok the shape of that curve yet.  Interestingly
(not shown here) the 32 batch parallel hash actually starts to beat
the single-batch parallel hash somewhere around 5-6 workers, and at 15
workers it achieves 9.53x speed-up compared to w=0 and is still
gaining as you add more workers, whereas the single-batch version tops
out around 8 workers.  This may be in part due to the trade-offs
discussed in "Design and Evaluation of Main Memory Hash Join
Algorithms for Multi-core CPUs" (short version: partitioning up front
can pay off by reducing cache misses at various levels and some
research databases would consider that), but I would think we're
probably pretty far away from that frontier and there are probably
other more basic problems.  Investigation/profiling required.

Next, here are some numbers from the TPC-H queries.  I included only
queries where a Parallel Hash was selected by the planner.  I stopped
at w=6 because that's the highest number of workers the planner would
pick by default at that scale.  (If I'm getting the maths right, TPC-H
scale 300's lineitem table should inspire about 10 workers to get out
of bed; you get an extra worker each time a table triples in size.)
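
For reference, that tripling rule works roughly like the following sketch
(a simplification of the planner's size-based heuristic, not its exact
code):

static int
workers_for_heap_pages(double heap_pages, double min_scan_size_pages,
                       int max_workers)
{
    int         workers = 1;
    double      threshold = min_scan_size_pages * 3.0;

    /* One more worker each time the relation triples in size. */
    while (heap_pages >= threshold && workers < max_workers)
    {
        workers++;
        threshold *= 3.0;
    }
    return workers;
}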

(3) What is the speed-up with enable_parallel_hash = on vs
enable_parallel_hash = off?  Here is the result table for various
numbers of workers, with work_mem set to 1GB.
 query |  w=0  |  w=1  |  w=2  |  w=3  |  w=4  |  w=5  |  w=6
-------+-------+-------+-------+-------+-------+-------+-------
     3 | 1.02x | 1.16x | 1.37x | 1.79x | 1.95x | 2.29x | 2.44x
     5 | 1.03x | 1.15x | 1.20x | 1.44x | 1.95x | 2.05x | 1.34x
     7 | 1.02x | 1.26x | 1.54x | 2.18x | 2.57x | 1.25x | 1.32x
     8 | 1.00x | 1.56x | 1.49x | 1.47x | 1.40x | 0.55x | 0.55x
     9 | 1.02x | 1.24x | 1.35x | 1.50x | 1.59x | 1.82x | 1.82x
    10 | 1.02x | 1.16x | 1.19x | 1.44x | 1.51x | 1.75x | 1.83x
    12 | 1.01x | 1.22x | 1.53x | 0.72x | 0.74x | 0.73x | 0.99x
    14 | 1.00x | 1.08x | 1.18x | 1.33x | 1.41x | 1.54x | 1.52x
    16 | 1.01x | 1.22x | 1.10x | 1.10x | 1.10x | 1.11x | 1.10x
    18 | 0.99x | 1.07x | 1.05x | 0.99x | 0.99x | 0.99x | 1.03x
    21 | 1.00x | 1.28x | 1.24x | 1.34x | 0.18x | 0.19x | 0.23x

Some commentary on the cases where the performance was apparently hurt
by the feature: for Q21 with w=3 workers and above with
enable_parallel_hash = off the planner switched from a hash join to a
nested loop and that turned out to be better, but with
enable_parallel_hash = on it didn't give up on hash join until there
were 6 workers.  Something similar happened with Q8 around 5 workers.
Q21 has some major cardinality estimation problems as discussed
elsewhere, and on this run I didn't think to apply the patch that
fixes (?) that.  In other words, as far as I can tell, all of those
are cases where there is possibly room for general planner improvement
outside this project: the point at which we flip from one plan type to
another moves around, not necessarily indicating a problem with
Parallel Hash as an executor node.  That isn't to say I'm not
interested in understanding the causes better and trying to fix them
if I can.

(4) The same comparison, with work_mem set to 128MB resulting in more batching:
 query |  w=0  |  w=1  |  w=2  |  w=3  |  w=4  |  w=5  |  w=6
-------+-------+-------+-------+-------+-------+-------+-------
     3 | 1.03x | 1.23x | 1.44x | 1.76x | 1.97x | 2.23x | 2.55x
     5 | 1.01x | 1.07x | 1.25x | 1.44x | 1.79x | 2.05x | 1.31x
     7 | 1.02x | 1.42x | 1.73x | 1.26x | 1.20x | 1.28x | 1.33x
     8 | 1.01x | 1.57x | 1.51x | 1.49x | 1.41x | 0.55x | 0.52x
     9 | 0.99x | 1.14x | 1.43x | 1.56x | 1.82x | 1.96x | 2.06x
    10 | 1.02x | 1.08x | 1.24x | 1.38x | 1.51x | 1.54x | 1.65x
    12 | 1.02x | 1.02x | 0.71x | 0.73x | 0.73x | 0.99x | 0.99x
    14 | 1.03x | 1.06x | 1.19x | 1.37x | 1.59x | 1.58x | 1.59x
    16 | 1.00x | 1.21x | 1.10x | 1.09x | 1.13x | 1.12x | 1.12x
    18 | 0.98x | 1.22x | 1.28x | 1.21x | 1.10x | 0.98x | 0.95x
    21 | 0.96x | 1.25x | 1.56x | 0.41x | 0.41x | 0.87x | 1.18x

Similar, with the inflection points moving around a bit.

(5) Another way to look at the data is to see how much speed-up each
new worker gives you with and without this feature, as I did for the
self-join above.  In this table, there are two lines for each query.
The first line shows the speed-up as we add more workers with
enable_parallel_hash = off (should be same as unpatched master), and
the second line shows the speed-up as we add more workers, with
enable_parallel_hash = on.
 query |  w=0  |  w=1  |  w=2  |  w=3  |  w=4  |  w=5  |  w=6
-------+-------+-------+-------+-------+-------+-------+-------
     3 | 1.00x | 1.60x | 2.00x | 2.07x | 2.27x | 2.23x | 2.39x
     3 | 1.00x | 1.83x | 2.68x | 3.64x | 4.35x | 5.02x | 5.72x
-------+-------+-------+-------+-------+-------+-------+-------
     5 | 1.00x | 1.58x | 2.14x | 2.36x | 2.30x | 2.57x | 8.68x
     5 | 1.00x | 1.75x | 2.49x | 3.29x | 4.34x | 5.09x | 11.28x
-------+-------+-------+-------+-------+-------+-------+-------
     7 | 1.00x | 1.44x | 1.75x | 1.61x | 1.67x | 4.02x | 4.35x
     7 | 1.00x | 1.78x | 2.66x | 3.44x | 4.24x | 4.93x | 5.64x
-------+-------+-------+-------+-------+-------+-------+-------
     8 | 1.00x | 1.19x | 1.28x | 1.31x | 1.36x | 3.30x | 3.34x
     8 | 1.00x | 1.85x | 1.90x | 1.93x | 1.91x | 1.82x | 1.85x
-------+-------+-------+-------+-------+-------+-------+-------
     9 | 1.00x | 1.59x | 2.19x | 2.52x | 2.81x | 2.76x | 2.74x
     9 | 1.00x | 1.94x | 2.88x | 3.69x | 4.38x | 4.92x | 4.89x
-------+-------+-------+-------+-------+-------+-------+-------
    10 | 1.00x | 1.45x | 1.92x | 2.19x | 2.36x | 2.28x | 2.49x
    10 | 1.00x | 1.65x | 2.25x | 3.10x | 3.48x | 3.91x | 4.48x
-------+-------+-------+-------+-------+-------+-------+-------
    12 | 1.00x | 1.50x | 1.76x | 4.71x | 5.66x | 6.61x | 7.61x
    12 | 1.00x | 1.81x | 2.65x | 3.36x | 4.14x | 4.78x | 7.43x
-------+-------+-------+-------+-------+-------+-------+-------
    14 | 1.00x | 1.40x | 1.68x | 1.86x | 1.97x | 1.95x | 1.95x
    14 | 1.00x | 1.50x | 1.98x | 2.47x | 2.76x | 2.98x | 2.95x
-------+-------+-------+-------+-------+-------+-------+-------
    16 | 1.00x | 1.01x | 1.25x | 1.31x | 1.35x | 1.38x | 1.39x
    16 | 1.00x | 1.22x | 1.36x | 1.43x | 1.47x | 1.53x | 1.53x
-------+-------+-------+-------+-------+-------+-------+-------
    18 | 1.00x | 0.86x | 0.93x | 1.08x | 1.11x | 1.22x | 1.15x
    18 | 1.00x | 0.93x | 0.98x | 1.08x | 1.11x | 1.22x | 1.20x
-------+-------+-------+-------+-------+-------+-------+-------
    21 | 1.00x | 1.12x | 0.49x | 0.59x | 5.10x | 5.67x | 5.18x
    21 | 1.00x | 1.44x | 0.62x | 0.80x | 0.95x | 1.08x | 1.22x

[1] https://github.com/tvondra/pg_tpch/blob/master/dss/tpch-index.sql

-- 
Thomas Munro
http://www.enterprisedb.com



Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Mon, Oct 30, 2017 at 1:49 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> A couple of stupid things outstanding:
>
> 1.  EXPLAIN ANALYZE for Parallel Hash "actual" shows the complete row
> count, which is interesting to know (or not?  maybe I should show it
> somewhere else?), but the costing shows the partial row estimate used
> for costing purposes.

Fixed.

> 2.  The BufFileSet's temporary directory gets created even if you
> don't need it for batches.  Duh.

Fixed.

I also refactored shared temporary files a bit while looking into
this.  The shared file ownership mechanism is now promoted to its own
translation unit sharedfileset.c and it now works with fd.c files.
buffile.c can still make use of it.  That seems like a better division
of labour.

> 3.  I don't have a good query rescan regression query yet.  I wish I
> could write my own query plans to test the executor.

I found a query that rescans a parallel-aware hash join and added a
couple of variants to the regression tests.

I also decluttered the EXPLAIN ANALYZE output for enable_parallel_hash
= off a bit.

-- 
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,

Here's a review of v24

+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+delete from bigger_than_it_looks where id <= 19000;
+vacuum bigger_than_it_looks;
+analyze bigger_than_it_looks;
+insert into bigger_than_it_looks
+  select generate_series(1, 19000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';

It seems kinda easier to just manipulate ndistinct and reltuples...


+set max_parallel_workers_per_gather = 0;
+set work_mem = '4MB';

I hope there's a fair amount of slop here - with different archs you're
going to see quite some size differences.

+-- The "good" case: batches required, but we plan the right number; we
+-- plan for 16 batches, and we stick to that number, and peak memory
+-- usage says within our work_mem budget
+-- non-parallel
+set max_parallel_workers_per_gather = 0;
+set work_mem = '128kB';

So how do we know that's actually the case we're testing rather than
something arbitrarily different? There's IIRC tests somewhere that just
filter the json explain output to the right parts...





+/*
+ * Build the name for a given segment of a given BufFile.
+ */
+static void
+MakeSharedSegmentName(char *name, const char *buffile_name, int segment)
+{
+    snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
+}

Not a fan of this name - you're not "making" a filename here (as in
allocating or such). I think I'd just remove the Make prefix.



+/*
+ * Open a file that was previously created in another backend with
+ * BufFileCreateShared in the same SharedFileSet using the same name.  The
+ * backend that created the file must have called BufFileClose() or
+ * BufFileExport() to make sure that it is ready to be opened by other
+ * backends and render it read-only.
+ */

Is it actually guaranteed that it's another backend / do we rely on
that?

+BufFile *
+BufFileOpenShared(SharedFileSet *fileset, const char *name)
+{

+    /*
+     * If we didn't find any files at all, then no BufFile exists with this
+     * tag.
+     */
+    if (nfiles == 0)
+        return NULL;

s/tag/name/?


+/*
+ * Delete a BufFile that was created by BufFileCreateShared in the given
+ * SharedFileSet using the given name.
+ *
+ * It is not necessary to delete files explicitly with this function.  It is
+ * provided only as a way to delete files proactively, rather than waiting for
+ * the SharedFileSet to be cleaned up.
+ *
+ * Only one backend should attempt to delete a given name, and should know
+ * that it exists and has been exported or closed.
+ */
+void
+BufFileDeleteShared(SharedFileSet *fileset, const char *name)
+{
+    char        segment_name[MAXPGPATH];
+    int            segment = 0;
+    bool        found = false;
+
+    /*
+     * We don't know how many segments the file has.  We'll keep deleting
+     * until we run out.  If we don't manage to find even an initial segment,
+     * raise an error.
+     */
+    for (;;)
+    {
+        MakeSharedSegmentName(segment_name, name, segment);
+        if (!SharedFileSetDelete(fileset, segment_name, true))
+            break;
+        found = true;
+        ++segment;
+    }

Hm. Do we properly delete all the files via the resowner mechanism if
this fails midway? I.e. if there are no leading segments? Also wonder if
this doesn't need a CFI check.

+void
+PathNameCreateTemporaryDir(const char *basedir, const char *directory)
+{
+    if (mkdir(directory, S_IRWXU) < 0)
+    {
+        if (errno == EEXIST)
+            return;
+
+        /*
+         * Failed.  Try to create basedir first in case it's missing. Tolerate
+         * ENOENT to close a race against another process following the same
+         * algorithm.
+         */
+        if (mkdir(basedir, S_IRWXU) < 0 && errno != ENOENT)
+            elog(ERROR, "cannot create temporary directory \"%s\": %m",
+                 basedir);

ENOENT or EEXIST?



+File
+PathNameCreateTemporaryFile(const char *path, bool error_on_failure)
+{
+    File        file;
+
+    /*
+     * Open the file.  Note: we don't use O_EXCL, in case there is an orphaned
+     * temp file that can be reused.
+     */
+    file = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY);
+    if (file <= 0)
+    {
+        if (error_on_failure)
+            elog(ERROR, "could not create temporary file \"%s\": %m", path);
+        else
+            return file;
+    }
+
+    /* Mark it for temp_file_limit accounting. */
+    VfdCache[file].fdstate |= FD_TEMP_FILE_LIMIT;
+
+    /*
+     * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we still
+     * want to make sure they get closed at end of xact.
+     */
+    ResourceOwnerEnlargeFiles(CurrentResourceOwner);
+    ResourceOwnerRememberFile(CurrentResourceOwner, file);
+    VfdCache[file].resowner = CurrentResourceOwner;

So maybe I'm being pedantic here, but wouldn't the right order be to do
ResourceOwnerEnlargeFiles() *before* creating the file? It's a memory
allocating operation, so it can fail, which'd leak the file.
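
For what it's worth, the suggested ordering would look roughly like this
(the same calls as the quoted hunk, just rearranged; a sketch, not a
finished replacement):

    /*
     * Reserve the resource owner slot first, so that a failure here cannot
     * leak an already-created file.
     */
    ResourceOwnerEnlargeFiles(CurrentResourceOwner);

    file = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY);
    if (file <= 0)
    {
        if (error_on_failure)
            elog(ERROR, "could not create temporary file \"%s\": %m", path);
        else
            return file;
    }

    /* Mark it for temp_file_limit accounting. */
    VfdCache[file].fdstate |= FD_TEMP_FILE_LIMIT;

    ResourceOwnerRememberFile(CurrentResourceOwner, file);
    VfdCache[file].resowner = CurrentResourceOwner;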

+/*
+ * Open a file that was created with PathNameCreateTemporaryFile, possibly in
+ * another backend.  Files opened this way don't count agains the

s/agains/against/

+ * temp_file_limit of the caller, are read-only and are automatically closed
+ * at the end of the transaction but are not deleted on close.
+ */
+File
+PathNameOpenTemporaryFile(const char *path)
+{
+    File        file;
+
+    /* We open the file read-only. */
+    file = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
+
+    /* If no such file, then we don't raise an error. */
+    if (file <= 0 && errno != ENOENT)
+        elog(ERROR, "could not open temporary file \"%s\": %m", path);
+
+    if (file > 0)
+    {
+        /*
+         * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we
+         * still want to make sure they get closed at end of xact.
+         */
+        ResourceOwnerEnlargeFiles(CurrentResourceOwner);
+        ResourceOwnerRememberFile(CurrentResourceOwner, file);
+        VfdCache[file].resowner = CurrentResourceOwner;

Same complaint as above, ResourceOwnerEnlargeFiles() should be done
earlier.


+/*
+ * Delete a file by pathname.  Return true if the file existed, false if
+ * didn't.
+ */
+bool
+PathNameDeleteTemporaryFile(const char *path, bool error_on_failure)
+{
+    struct stat filestats;
+    int            stat_errno;
+
+    /* Get the final size for pgstat reporting. */
+    if (stat(path, &filestats) != 0)
+        stat_errno = errno;
+    else
+        stat_errno = 0;
+
+    /*
+     * Unlike FileClose's automatic file deletion code, we tolerate
+     * non-existence to support BufFileDeleteShared which doesn't know how
+     * many segments it has to delete until it runs out.
+     */
+    if (stat_errno == ENOENT)
+        return false;
+
+    if (unlink(path) < 0)
+    {
+        if (errno != ENOENT)
+            elog(error_on_failure ? ERROR : LOG,
+                 "cannot unlink temporary file \"%s\": %m", path);
+        return false;
+    }
+
+    if (stat_errno == 0)
+        ReportTemporaryFileUsage(path, filestats.st_size);
+    else
+    {
+        errno = stat_errno;
+        elog(LOG, "could not stat file \"%s\": %m", path);
+    }

All these messages are "not expected to ever happen" ones, right?

+    return true;
+}
+
 /*
  * close a file when done with it
  */
@@ -1537,10 +1747,17 @@ FileClose(File file)
         Delete(file);
     }
+    if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
+    {
+        /* Subtract its size from current usage (do first in case of error) */
+        temporary_files_size -= vfdP->fileSize;
+        vfdP->fileSize = 0;
+    }

So, is it right to do so unconditionally and without regard for errors?
If the file isn't deleted, it shouldn't be subtracted from fileSize. I
guess you're managing that through the flag, but that's not entirely
obvious.

diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c
new file mode 100644
index 00000000000..6da80838b37
--- /dev/null
+++ b/src/backend/storage/file/sharedfileset.c
@@ -0,0 +1,240 @@
+/*-------------------------------------------------------------------------
+ *
+ * sharedfileset.c
+ *      Shared temporary file management.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *      src/backend/storage/file/sharedfileset.c
+ *
+ *-------------------------------------------------------------------------
+ */

A slightly bigger comment wouldn't hurt.



+/*
+ * Attach to a set of directories that was created with SharedFileSetInit.
+ */
+void
+SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
+{
+    bool        success;
+
+    SpinLockAcquire(&fileset->mutex);
+    if (fileset->refcnt == 0)
+        success = false;

I've not finished reading through this, but is this safe?  If the
segment's gone, is the spinlock guaranteed to still be a spinlock?  I
suspect this isn't a problem because just the underlying data is
removed, but the SharedFileSet stays alive?

+static void
+GetSharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace)
+{
+    char        tempdirpath[MAXPGPATH];
+    
+    GetTempTablespacePath(tempdirpath, tablespace);
+    snprintf(path, MAXPGPATH, "%s/%s%d.%d.sharedfileset" PG_TEMP_SUBDIR_SUFFIX,
+             tempdirpath, PG_TEMP_FILE_PREFIX,
+             fileset->creator_pid, fileset->number);
+}

+/*
+ * Sorting hat to determine which tablespace a given shared temporary file
+ * belongs in.
+ */
+static Oid
+ChooseTablespace(const SharedFileSet *fileset, const char *name)
+{
+    uint32        hash = hash_any((const unsigned char *) name, strlen(name));
+
+    return fileset->tablespaces[hash % fileset->ntablespaces];
+}

Hm. I wonder if just round-robin through these isn't a better approach.


+/*
+ * Compute the full path of a file in a SharedFileSet.
+ */
+static void
+GetSharedFilePath(char *path, SharedFileSet *fileset, const char *name)
+{
+    char        dirpath[MAXPGPATH];
+
+    GetSharedFileSetPath(dirpath, fileset, ChooseTablespace(fileset, name));
+    snprintf(path, MAXPGPATH, "%s/" PG_TEMP_FILE_PREFIX ".%s", dirpath, name);
+}
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 4c35ccf65eb..8b91d5a6ebe 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -528,16 +528,6 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
                PrintRelCacheLeakWarning(res);
            RelationClose(res);
        }
 
-
-        /* Ditto for dynamic shared memory segments */
-        while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
-        {
-            dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
-
-            if (isCommit)
-                PrintDSMLeakWarning(res);
-            dsm_detach(res);
-        }
    }
    else if (phase == RESOURCE_RELEASE_LOCKS)
    {
@@ -654,6 +644,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
                PrintFileLeakWarning(res);
            FileClose(res);
        }
 
+
+        /* Ditto for dynamic shared memory segments */
+        while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
+        {
+            dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
+
+            if (isCommit)
+                PrintDSMLeakWarning(res);
+            dsm_detach(res);
+        }
    }

Is that entirely unproblematic? Are there any DSM callbacks that rely on
locks still being held? Please split this part into a separate commit
with such analysis.



+/* The initial size of chunks in pages. */
+#define STS_MIN_CHUNK_PAGES 4

Could use a quick description of how you've arrived at that specific
value.


+/* Chunk written to disk. */
+typedef struct SharedTuplestoreChunk
+{
+    int            npages;            /* Size of this chunk in BLCKSZ pages. */
+    int            ntuples;        /* Number of tuples in this chunk. */
+    char        data[FLEXIBLE_ARRAY_MEMBER];
+} SharedTuplestoreChunk;
+
+/* Per-participant shared state. */
+typedef struct SharedTuplestoreParticipant
+{
+    slock_t        mutex;
+    BlockNumber    read_page;        /* Page number for next read. */
+    BlockNumber    npages;            /* Number of pages written. */
+    bool        writing;        /* Used only for assertions. */
+
+    /*
+     * We need variable sized chunks, because we might be asked to store
+     * gigantic tuples.  To avoid the locking contention that would come from
+     * reading chunk sizes from disk, we store the chunk size for ranges of
+     * the file in a compact format in memory.  chunk_pages starts out at
+     * STS_MIN_CHUNK_PAGES and then doubles each time we reach a page listed
+     * in chunk_expansion_log.
+     */
+    BlockNumber    chunk_expansion_log[sizeof(BlockNumber) * CHAR_BIT];
+    int            chunk_expansions;
+    int            chunk_expansion;
+    int            chunk_pages;

This needs more explanation.

+/*
+ * Initialize a SharedTuplestore in existing shared memory.  There must be
+ * space for sts_estimate(participants) bytes.  If flags is set to the value
+ * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
+ * eagerly (but this isn't yet implemented).

s/is set to the value/includes the value/ - otherwise it's not really
a flags argument.


+ * Tuples that are stored may optionally carry a piece of fixed sized
+ * meta-data which will be retrieved along with the tuple.  This is useful for
+ * the hash codes used for multi-batch hash joins, but could have other
+ * applications.

"hash codes"?



+/*
+ * Prepare to rescan.  Only participant should call this.  After it returns,
+ * all participants should call sts_begin_parallel_scan() and then loop over
+ * sts_parallel_scan_next().
+ */

s/should/may/?  Also maybe document what happens with in-progress reads
(or rather them not being allowed to exist)?


+/*
+ * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
+ * pointer to meta data of that size must be provided.
+ */
+void
+sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
+             MinimalTuple tuple)
+{

+    /* Do we have space? */
+    size = accessor->sts->meta_data_size + tuple->t_len;
+    if (accessor->write_pointer + size >= accessor->write_end)
+    {
+        /* Try flushing to see if that creates enough space. */
+        if (accessor->write_chunk != NULL)
+            sts_flush_chunk(accessor);
+
+        /*
+         * It may still not be enough in the case of a gigantic tuple, or if
+         * we haven't created a chunk buffer at all yet.
+         */
+        if (accessor->write_pointer + size >= accessor->write_end)
+        {
+            SharedTuplestoreParticipant *participant;
+            size_t    space_needed;
+            int        pages_needed;
+
+            /* How many pages to hold this data and the chunk header? */
+            space_needed = offsetof(SharedTuplestoreChunk, data) + size;
+            pages_needed = (space_needed + (BLCKSZ - 1)) / BLCKSZ;
+            pages_needed = Max(pages_needed, STS_MIN_CHUNK_PAGES);
+
+            /*
+             * Double the chunk size until it's big enough, and record that
+             * fact in the shared expansion log so that readers know about it.
+             */
+            participant = &accessor->sts->participants[accessor->participant];
+            while (accessor->write_pages < pages_needed)
+            {
+                accessor->write_pages *= 2;
+                participant->chunk_expansion_log[participant->chunk_expansions++] =
+                    accessor->write_page;
+            }

Hm. Isn't that going to be pretty unfunny if you have one large and a
lot of small tuples?
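To make the concern concrete (rough numbers, assuming BLCKSZ = 8192 and
reading the quoted doubling loop literally): STS_MIN_CHUNK_PAGES = 4
gives 32kB chunks initially; a single ~1MB tuple needs ~129 pages, so
write_pages doubles 4 -> 8 -> 16 -> 32 -> 64 -> 128 -> 256 and six
entries land in the expansion log.  Nothing shown here ever shrinks
write_pages again, so every later tuple from this participant, however
small, is also written into 2MB chunks.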


+            /* Create the output buffer. */
+            if (accessor->write_chunk != NULL)
+                pfree(accessor->write_chunk);
+            accessor->write_chunk = (SharedTuplestoreChunk *)
+                palloc0(accessor->write_pages * BLCKSZ);

Are we guaranteed to be in a long-lived memory context here?


+/*
+ * Get the next tuple in the current parallel scan.
+ */
+MinimalTuple
+sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+    SharedTuplestoreParticipant *p;
+    BlockNumber    read_page;
+    int            chunk_pages;
+    bool        eof;
+
+    for (;;)
+    {
+        /* Can we read more tuples from the current chunk? */
+        if (likely(accessor->read_ntuples < accessor->read_ntuples_available))
+            return sts_read_tuple(accessor, meta_data);

I'm not convinced this is a good use of likely/unlikely (not biased and
not performance critical enough).

+        /* Find the location of a new chunk to read. */
+        p = &accessor->sts->participants[accessor->read_participant];
+
+        SpinLockAcquire(&p->mutex);
+        eof = p->read_page >= p->npages;
+        if (!eof)
+        {
+            /*
+             * Figure out how big this chunk is.  It will almost always be the
+             * same as the last chunk loaded, but if there is one or more
+             * entry in the chunk expansion log for this page then we know
+             * that it doubled that number of times.  This avoids the need to
+             * do IO to adjust the read head, so we don't need to hold up
+             * concurrent readers.  (An alternative to this extremely rarely
+             * run loop would be to use more space storing the new size in the
+             * log so we'd have 'if' instead of 'while'.)
+             */
+            read_page = p->read_page;
+            while (p->chunk_expansion < p->chunk_expansions &&
+                   p->chunk_expansion_log[p->chunk_expansion] == p->read_page)
+            {
+                p->chunk_pages *= 2;
+                p->chunk_expansion++;
+            }
+            chunk_pages = p->chunk_pages;
+
+            /* The next reader will start after this chunk. */
+            p->read_page += chunk_pages;
+        }
+        SpinLockRelease(&p->mutex);

This looks more like the job of an lwlock rather than a spinlock.
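I.e. roughly (a sketch; the lock field and its tranche would need to be
added to SharedTuplestoreParticipant, the rest is from the quoted code):

        LWLockAcquire(&p->lock, LW_EXCLUSIVE);
        eof = p->read_page >= p->npages;
        if (!eof)
        {
            read_page = p->read_page;
            while (p->chunk_expansion < p->chunk_expansions &&
                   p->chunk_expansion_log[p->chunk_expansion] == p->read_page)
            {
                p->chunk_pages *= 2;
                p->chunk_expansion++;
            }
            chunk_pages = p->chunk_pages;

            /* The next reader will start after this chunk. */
            p->read_page += chunk_pages;
        }
        LWLockRelease(&p->lock);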



+/*
+ * Create the name used for our shared BufFiles.
+ */
+static void
+make_name(char *name, SharedTuplestoreAccessor *accessor, int participant)
+{
+    snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
+}

Name's a bit generic. And it's still not really making ;)


Going to buy some groceries and then look at the next patches.

- Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Re: [HACKERS] Parallel Hash take II

From
Peter Geoghegan
Date:
On Tue, Nov 7, 2017 at 1:01 PM, Andres Freund <andres@anarazel.de> wrote:
> +/*
> + * Build the name for a given segment of a given BufFile.
> + */
> +static void
> +MakeSharedSegmentName(char *name, const char *buffile_name, int segment)
> +{
> +       snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
> +}
>
> Not a fan of this name - you're not "making" a filename here (as in
> allocating or such). I think I'd just remove the Make prefix.

+1

Can we document the theory behind file naming here, if that isn't in
the latest version? This is a routine private to parallel hash join
(or shared tuplestore), not BufFile. Maybe BufFile should have some
opinion on this, though. Just as a matter of style.

> +/*
> + * Delete a BufFile that was created by BufFileCreateShared in the given
> + * SharedFileSet using the given name.
> + *
> + * It is not necessary to delete files explicitly with this function.  It is
> + * provided only as a way to delete files proactively, rather than waiting for
> + * the SharedFileSet to be cleaned up.
> + *
> + * Only one backend should attempt to delete a given name, and should know
> + * that it exists and has been exported or closed.
> + */

This part is new to me. We now want one backend to delete a given
filename. What changed? Please provide a Message-Id reference if that
will help me to understand.

For now, I'm going to guess that this development had something to do
with the need to deal with virtual FDs that do a close() on an FD to
keep under backend limits. Do I have that right?

> +       /*
> +        * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we still
> +        * want to make sure they get closed at end of xact.
> +        */
> +       ResourceOwnerEnlargeFiles(CurrentResourceOwner);
> +       ResourceOwnerRememberFile(CurrentResourceOwner, file);
> +       VfdCache[file].resowner = CurrentResourceOwner;
>
> So maybe I'm being pedantic here, but wouldn't the right order be to do
> ResourceOwnerEnlargeFiles() *before* creating the file? It's a memory
> allocating operation, so it can fail, which'd leak the file.

I remember going to pains to get this right with my own unifiable
BufFile concept. I'm going to wait for an answer to my question about
file deletion + resowners before commenting further, though.

> +       if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
> +       {
> +               /* Subtract its size from current usage (do first in case of error) */
> +               temporary_files_size -= vfdP->fileSize;
> +               vfdP->fileSize = 0;
> +       }
>
> So, is it right to do so unconditionally and without regard for errors?
> If the file isn't deleted, it shouldn't be subtracted from fileSize. I
> guess you're managing that through the flag, but that's not entirely
> obvious.

I think that the problem here is that the accounting is expected to
always work. It's not like there is a resowner style error path in
which temporary_files_size gets reset to 0.

> Is that entirely unproblematic? Are there any DSM callbacks that rely on
> locks still being held? Please split this part into a separate commit
> with such analysis.

I was always confused about the proper use of DSM callbacks myself.
They seemed generally underdocumented.

> +                       /* Create the output buffer. */
> +                       if (accessor->write_chunk != NULL)
> +                               pfree(accessor->write_chunk);
> +                       accessor->write_chunk = (SharedTuplestoreChunk *)
> +                               palloc0(accessor->write_pages * BLCKSZ);
>
> Are we guaranteed to be in a long-lived memory context here?

I imagine that Thomas looked to tuplestore_begin_heap() + interXact as
a kind of precedent here. See comments above that function.

-- 
Peter Geoghegan


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Re: [HACKERS] Parallel Hash take II

From
Robert Haas
Date:
On Tue, Nov 7, 2017 at 4:01 PM, Andres Freund <andres@anarazel.de> wrote:
> +       ResourceOwnerEnlargeFiles(CurrentResourceOwner);
> +       ResourceOwnerRememberFile(CurrentResourceOwner, file);
> +       VfdCache[file].resowner = CurrentResourceOwner;
>
> So maybe I'm being pedantic here, but wouldn't the right order be to do
> ResourceOwnerEnlargeFiles() *before* creating the file? It's a memory
> allocating operation, so it can fail, which'd leak the file.

That's not pedantic ... that's a very sound criticism.  IMHO, anyway.

> diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
> index 4c35ccf65eb..8b91d5a6ebe 100644
> --- a/src/backend/utils/resowner/resowner.c
> +++ b/src/backend/utils/resowner/resowner.c
> @@ -528,16 +528,6 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
>                                 PrintRelCacheLeakWarning(res);
>                         RelationClose(res);
>                 }
> -
> -               /* Ditto for dynamic shared memory segments */
> -               while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
> -               {
> -                       dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
> -
> -                       if (isCommit)
> -                               PrintDSMLeakWarning(res);
> -                       dsm_detach(res);
> -               }
>         }
>         else if (phase == RESOURCE_RELEASE_LOCKS)
>         {
> @@ -654,6 +644,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
>                                 PrintFileLeakWarning(res);
>                         FileClose(res);
>                 }
> +
> +               /* Ditto for dynamic shared memory segments */
> +               while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
> +               {
> +                       dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
> +
> +                       if (isCommit)
> +                               PrintDSMLeakWarning(res);
> +                       dsm_detach(res);
> +               }
>         }
>
> Is that entirely unproblematic? Are there any DSM callbacks that rely on
> locks still being held? Please split this part into a separate commit
> with such analysis.

FWIW, I think this change is a really good idea (I recommended it to
Thomas at some stage, I think).  The current positioning was decided
by me at a very early stage of parallel query development where I
reasoned as follows (1) the first thing we're going to implement is
going to be parallel quicksort, (2) that's going to allocate a huge
amount of DSM, (3) therefore we should try to free it as early as
possible.  However, I now think that was wrongheaded, and not just
because parallel quicksort didn't turn out to be the first thing we
developed.  Memory is the very last resource we should release when
aborting a transaction, because any other resource we have is tracked
using data structures that are stored in memory.  Throwing the memory
away before the end therefore makes life very difficult. That's why,
for backend-private memory, we clean up most everything else in
AbortTransaction() and then only destroy memory contexts in
CleanupTransaction().  This change doesn't go as far, but it's in the
same general direction, and I think rightly so.  My error was in
thinking that the primary use of memory would be for storing data, but
really it's about where you put your control structures.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Wed, Nov 8, 2017 at 10:32 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> On Tue, Nov 7, 2017 at 4:01 PM, Andres Freund <andres@anarazel.de> wrote:
>> diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
>> index 4c35ccf65eb..8b91d5a6ebe 100644
>> --- a/src/backend/utils/resowner/resowner.c
>> +++ b/src/backend/utils/resowner/resowner.c
>> @@ -528,16 +528,6 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
>>                                 PrintRelCacheLeakWarning(res);
>>                         RelationClose(res);
>>                 }
>> -
>> -               /* Ditto for dynamic shared memory segments */
>> -               while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
>> -               {
>> -                       dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
>> -
>> -                       if (isCommit)
>> -                               PrintDSMLeakWarning(res);
>> -                       dsm_detach(res);
>> -               }
>>         }
>>         else if (phase == RESOURCE_RELEASE_LOCKS)
>>         {
>> @@ -654,6 +644,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
>>                                 PrintFileLeakWarning(res);
>>                         FileClose(res);
>>                 }
>> +
>> +               /* Ditto for dynamic shared memory segments */
>> +               while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
>> +               {
>> +                       dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
>> +
>> +                       if (isCommit)
>> +                               PrintDSMLeakWarning(res);
>> +                       dsm_detach(res);
>> +               }
>>         }
>>
>> Is that entirely unproblematic? Are there any DSM callbacks that rely on
>> locks still being held? Please split this part into a separate commit
>> with such analysis.
>
> FWIW, I think this change is a really good idea (I recommended it to
> Thomas at some stage, I think).

Yeah, it was Robert's suggestion; I thought I needed *something* like
this but was hesitant for the niggling reason that Andres mentions:
what if someone somewhere (including code outside our source tree)
depends on this ordering because of unlocking etc?

At that time I thought that my clean-up logic wasn't going to work on
Windows without this reordering, because we were potentially closing
file handles after unlinking the files, and I was under the impression
that Windows wouldn't like that.  Since then I've learned that Windows
does actually allow it, but only if all file handles were opened with
the FILE_SHARE_DELETE flag.  We always do that (see src/port/open.c),
so in fact this change is probably not needed for my patch set (theory
not tested).  I will put it in a separate patch as requested by
Andres, because it's generally a good idea anyway for the reasons that
Robert explained (ie you probably always want to clean up memory last,
since it might contain the meta-data/locks/control objects/whatever
you'll need to clean up anything else).

-- 
Thomas Munro
http://www.enterprisedb.com


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
Hi Peter,

See responses to a couple of points below.  I'll respond to the other
points separately (ie with code/comment changes).

On Wed, Nov 8, 2017 at 10:32 AM, Peter Geoghegan <pg@bowt.ie> wrote:
> On Tue, Nov 7, 2017 at 1:01 PM, Andres Freund <andres@anarazel.de> wrote:
>> +/*
>> + * Delete a BufFile that was created by BufFileCreateShared in the given
>> + * SharedFileSet using the given name.
>> + *
>> + * It is not necessary to delete files explicitly with this function.  It is
>> + * provided only as a way to delete files proactively, rather than waiting for
>> + * the SharedFileSet to be cleaned up.
>> + *
>> + * Only one backend should attempt to delete a given name, and should know
>> + * that it exists and has been exported or closed.
>> + */
>
> This part is new to me. We now want one backend to delete a given
> filename. What changed? Please provide a Message-Id reference if that
> will help me to understand.
>
> For now, I'm going to guess that this development had something to do
> with the need to deal with virtual FDs that do a close() on an FD to
> keep under backend limits. Do I have that right?

No -- this is simply an option available to client code that wants to
clean up individual temporary files earlier.  Such client code is
responsible for meeting the synchronisation requirements described in
the comment, but it's entirely optional.  For example:
SharedTuplestore is backed by multiple files and it could take the
opportunity to delete individual files after they've been scanned, if
you told it you were going to scan only once
(SHARED_TUPLESTORE_SINGLE_PASS) and if it could prove that no other
backend could still need to read from it, as mentioned in a comment in
sts_end_parallel_scan().

However, since I changed to the page/chunk based model (spinlock while
advancing block counter, mimicking Parallel Seq Scan) instead of the
earlier Fisher Price version (LWLock while reading each tuple with a
shared read head maintained with tell/seek), I don't actually do that.
Hitting the end of the file no longer means that no one else is
reading from the file (someone else might still be reading from an
earlier chunk even though you've finished reading the final chunk in
the file, and the vfd systems means that they must be free to close
and reopen the file at any time).  In the current patch version the
files are cleaned up wholesale at two times: SharedFileSet cleanup
triggered by DSM destruction, and SharedFileSet reset triggered by
rescan.  Practically, it's always the former case.  It's vanishingly
rare that you'd actually want to be rescanning a Parallel Hash that
spills to disk but in that case we delete the old files and recreate,
and that case is tested in the regression tests.

If it bothers you that I have an API there that I'm not actually using
yet, I will remove it.

>> +       if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
>> +       {
>> +               /* Subtract its size from current usage (do first in case of error) */
>> +               temporary_files_size -= vfdP->fileSize;
>> +               vfdP->fileSize = 0;
>> +       }
>>
>> So, is it right to do so unconditionally and without regard for errors?
>> If the file isn't deleted, it shouldn't be subtracted from fileSize. I
>> guess you're managing that through the flag, but that's not entirely
>> obvious.
>
> I think that the problem here is that the accounting is expected to
> always work. It's not like there is a resowner style error path in
> which temporary_files_size gets reset to 0.

But there is a resowner error path in which File handles get
automatically closed and temporary_files_size gets adjusted.  The
counter goes up when you create, and goes down when you close or
resowner closes for you.  Eventually you either close and the
bookkeeping is consistent or you crash and it doesn't matter.  And
some kind of freak multiple close attempt is guarded against by
setting the files size to 0 so we can't double-subtract.  Do you see a
bug?

None of this has any impact on whether files are leaked: either
SharedFileSet removes the files, or you crash (or take a filesystem
snapshot, etc) and RemovePgTempFiles() mops them up at the next clean
startup.

-- 
Thomas Munro
http://www.enterprisedb.com


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,
 * avoids wasting memory on duplicated hash tables
 * avoids wasting disk space on duplicated batch files
 * avoids wasting CPU executing duplicate subplans
 

What's the last one referring to?





+static void
+MultiExecParallelHash(HashState *node)
+{

+    switch (BarrierPhase(build_barrier))
+    {
+        case PHJ_BUILD_ALLOCATING:
+
+            /*
+             * Either I just allocated the initial hash table in
+             * ExecHashTableCreate(), or someone else is doing that.  Either
+             * way, wait for everyone to arrive here so we can proceed, and
+             * then fall through.
+             */
+            BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING);

Can you add a /* fallthrough */ comment here? GCC warns if you don't.
While we currently have lotsa other places not having the annotation,
it seems reasonable to have it in new code.
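I.e. right after the BarrierArriveAndWait() call quoted above, before
the next case label:

            BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING);
            /* fallthrough */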


+        case PHJ_BUILD_HASHING_INNER:
+
+            /*
+             * It's time to begin hashing, or if we just arrived here then
+             * hashing is already underway, so join in that effort.  While
+             * hashing we have to be prepared to help increase the number of
+             * batches or buckets at any time, and if we arrived here when
+             * that was already underway we'll have to help complete that work
+             * immediately so that it's safe to access batches and buckets
+             * below.
+             */
+            if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
+                PHJ_GROW_BATCHES_ELECTING)
+                ExecParallelHashIncreaseNumBatches(hashtable);
+            if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
+                PHJ_GROW_BUCKETS_ELECTING)
+                ExecParallelHashIncreaseNumBuckets(hashtable);
+            ExecParallelHashEnsureBatchAccessors(hashtable);

"accessors" sounds a bit weird for a bunch of pointers, but maybe that's
just my ESL senses tingling wrongly.



 /* ----------------------------------------------------------------
@@ -240,12 +427,15 @@ ExecEndHash(HashState *node)
  * ----------------------------------------------------------------
  */
 HashJoinTable
-ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
+ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)

+    /*
+     * Parallel Hash tries to use the combined work_mem of all workers to
+     * avoid the need to batch.  If that won't work, it falls back to work_mem
+     * per worker and tries to process batches in parallel.
+     */

One day we're going to need a better approach to this. I have no idea
how, but this per-node, and now per_node * max_parallelism, approach has
only implementation simplicity as its benefit.





+static HashJoinTuple
+ExecParallelHashLoadTuple(HashJoinTable hashtable, MinimalTuple tuple,
+                          dsa_pointer *shared)
+{

+static void
+ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
+{



+/*
+ * Get the first tuple in a given bucket identified by number.
+ */
+static HashJoinTuple
+ExecHashFirstTupleInBucket(HashJoinTable hashtable, int bucketno)
+{
+    if (hashtable->parallel_state)
+    {
+        dsa_pointer p =
+        dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);

Can you make this, and possibly a few other places, more readable by
introducing a temporary variable?
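E.g. something like this (a sketch; the dsa_get_address() call and the
non-parallel branch are guesses at the parts of the function not quoted):

static HashJoinTuple
ExecHashFirstTupleInBucket(HashJoinTable hashtable, int bucketno)
{
    if (hashtable->parallel_state)
    {
        dsa_pointer first;

        first = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);
        return (HashJoinTuple) dsa_get_address(hashtable->area, first);
    }
    else
        return hashtable->buckets.unshared[bucketno];
}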


+/*
+ * Insert a tuple at the front of a chain of tuples in DSA memory atomically.
+ */
+static void
+ExecParallelHashPushTuple(dsa_pointer_atomic *head,
+                          HashJoinTuple tuple,
+                          dsa_pointer tuple_shared)
+{
+    do
+    {
+        tuple->next.shared = dsa_pointer_atomic_read(head);
+    } while (!dsa_pointer_atomic_compare_exchange(head,
+                                                  &tuple->next.shared,
+                                                  tuple_shared));
+}

This is hard to read.
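E.g. a more spelled-out version (a sketch, intended to behave exactly
like the quoted function):

static void
ExecParallelHashPushTuple(dsa_pointer_atomic *head,
                          HashJoinTuple tuple,
                          dsa_pointer tuple_shared)
{
    for (;;)
    {
        /* Snapshot the current bucket head. */
        dsa_pointer old_head = dsa_pointer_atomic_read(head);

        /* Link our tuple in front of it. */
        tuple->next.shared = old_head;

        /* Publish it, unless someone else pushed first; then retry. */
        if (dsa_pointer_atomic_compare_exchange(head,
                                                &tuple->next.shared,
                                                tuple_shared))
            break;
    }
}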


+ * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
+ * be used repeatedly as required to coordinate expansions in the number of
+ * batches or buckets.  Their phases are as follows:
+ *
+ *   PHJ_GROW_BATCHES_ELECTING       -- initial state
+ *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
+ *   PHJ_GROW_BATCHES_REPARTITIONING -- all rep
s/rep/repartition/?


 #include "access/htup_details.h"
+#include "access/parallel.h"
 #include "executor/executor.h"
 #include "executor/hashjoin.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
-
+#include "utils/sharedtuplestore.h"

deletes a separator newline.

 /* ----------------------------------------------------------------
@@ -138,6 +236,18 @@ ExecHashJoin(PlanState *pstate)
                    /* no chance to not build the hash table */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
 
+                else if (hashNode->parallel_state != NULL)
+                {
+                    /*
+                     * The empty-outer optimization is not implemented for
+                     * shared hash tables, because no one participant can
+                     * determine that there are no outer tuples, and it's not
+                     * yet clear that it's worth the synchronization overhead
+                     * of reaching consensus to figure that out.  So we have
+                     * to build the hash table.
+                     */
+                    node->hj_FirstOuterTupleSlot = NULL;
+                }

Hm. Isn't MultiExecParallelHash already doing so?


-                node->hj_JoinState = HJ_NEED_NEW_OUTER;
+                if (hashtable->parallel_state)
+                {
+                    Barrier    *build_barrier;
+
+                    build_barrier = &hashtable->parallel_state->build_barrier;
+                    if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
+                    {
+                        /*
+                         * If multi-batch, we need to hash the outer relation
+                         * up front.
+                         */
+                        if (hashtable->nbatch > 1)
+                            ExecParallelHashJoinPartitionOuter(node);
+                        BarrierArriveAndWait(build_barrier,
+                                             WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
+                    }
+                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+
+                    /* Each backend should now select a batch to work on. */
+                    hashtable->curbatch = -1;
+                    node->hj_JoinState = HJ_NEED_NEW_BATCH;
+
+                    continue;
+                }
+                else
+                    node->hj_JoinState = HJ_NEED_NEW_OUTER;

You know what I'm going to say about all these branches, and sigh.

If we don't split this into two versions, we at least should store
hashNode->parallel_state in a local var, so the compiler doesn't have to
pull that out of memory after every external function call (of which
there are a lot). In common cases it'll end up in a callee saved
registers, and most of the called functions won't be too register
starved (on x86-64).



+/*
+ * Choose a batch to work on, and attach to it.  Returns true if successful,
+ * false if there are no more batches.
+ */
+static bool
+ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
+{


+                    /*
+                     * This batch is ready to probe.  Return control to
+                     * caller. We stay attached to batch_barrier so that the
+                     * hash table stays alive until everyone's finish probing

*finished?


+                case PHJ_BATCH_DONE:
+
+                    /*
+                     * Already done.  Detach and go around again (if any
+                     * remain).
+                     */
+                    BarrierDetach(batch_barrier);
+
+                    /*
+                     * We didn't work on this batch, but we need to observe
+                     * its size for EXPLAIN.
+                     */
+                    ExecParallelHashUpdateSpacePeak(hashtable, batchno);
+                    hashtable->batches[batchno].done = true;
+                    hashtable->curbatch = -1;
+                    break;

Hm, maybe I'm missing something, but why is it guaranteed that "we
didn't work on this batch"?



+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+    /*
+     * By the time ExecEndHashJoin runs in a worker, shared memory has been
+     * destroyed.  So this is our last chance to do any shared memory cleanup.
+     */

This comment doesn't really make much sense to me.


+void
+ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
+{

could use a header comment.



a) The executor side is starting to look good.
b) This is a lot of code.
c) I'm tired, planner has to wait till tomorrow.

- Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,

@@ -747,7 +747,7 @@ try_hashjoin_path(PlannerInfo *root,
      * never have any output pathkeys, per comments in create_hashjoin_path.
      */
     initial_cost_hashjoin(root, &workspace, jointype, hashclauses,
-                          outer_path, inner_path, extra);
+                          outer_path, inner_path, extra, false);
     if (add_path_precheck(joinrel,
                           workspace.startup_cost, workspace.total_cost,
@@ -761,6 +761,7 @@ try_hashjoin_path(PlannerInfo *root,
                                       extra,
                                       outer_path,
                                       inner_path,
+                                      false,    /* parallel_hash */
                                       extra->restrictlist,
                                       required_outer,
                                       hashclauses));
@@ -776,6 +777,10 @@ try_hashjoin_path(PlannerInfo *root,
  * try_partial_hashjoin_path
  *      Consider a partial hashjoin join path; if it appears useful, push it into
  *      the joinrel's partial_pathlist via add_partial_path().
+ *      The outer side is partial.  If parallel_hash is true, then the inner path
+ *      must be partial and will be run in parallel to create one or more shared
+ *      hash tables; otherwise the inner path must be complete and a copy of it
+ *      is run in every process to create separate identical private hash tables.
  */

When do we have "or more shared hash tables" rather than one? Are you
thinking about subordinate nodes?


@@ -1839,6 +1846,10 @@ hash_inner_and_outer(PlannerInfo *root,
          * able to properly guarantee uniqueness.  Similarly, we can't handle
          * JOIN_FULL and JOIN_RIGHT, because they can produce false null
          * extended rows.  Also, the resulting path must not be parameterized.
+         * We should be able to support JOIN_FULL and JOIN_RIGHT for Parallel
+         * Hash, since in that case we're back to a single hash table with a
+         * single set of match bits for each batch, but that will require
+         * figuring out a deadlock-free way to wait for the probe to finish.
          */

s/should be able/would be able/?



index 6a45b68e5df..2d38a5efae8 100644
--- a/src/backend/storage/ipc/barrier.c
+++ b/src/backend/storage/ipc/barrier.c
@@ -451,7 +451,6 @@ BarrierDetachImpl(Barrier *barrier, bool arrive)
         release = true;
         barrier->arrived = 0;
         ++barrier->phase;
-        Assert(barrier->selected);
         barrier->selected = false;
     }

Uh, what?




diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out
index 35523bd8065..40a076d976f 100644
--- a/src/test/regress/expected/join.out
+++ b/src/test/regress/expected/join.out
@@ -5821,6 +5821,9 @@ analyze extremely_skewed;
 insert into extremely_skewed
   select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' from generate_series(1, 19000);
 
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);

I'm doubtful this is actually going to be a wide tuple - this'll get
compressed down quite a bit, no?

postgres[26465][1]=# SELECT octet_length(t), pg_column_size(t) FROM wide ;
┌──────────────┬────────────────┐
│ octet_length │ pg_column_size │
├──────────────┼────────────────┤
│       320000 │           3671 │
│       320000 │           3671 │
└──────────────┴────────────────┘
(2 rows)


(and yes, it's ridiculous that a compressed datum of that size still
takes up 3kb)



+-- parallel with parallel-aware hash join
+set max_parallel_workers_per_gather = 2;
+set work_mem = '128kB';
+set enable_parallel_hash = on;

I think it'd be better if we structured the file so we just set GUCs
with SET LOCAL inside a transaction.


+-- parallel with parallel-aware hash join
+set max_parallel_workers_per_gather = 2;
+set work_mem = '64kB';
+set enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                              QUERY PLAN                               
+-----------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on extremely_skewed s
+(9 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)

As written before, I think it'd be good if we extracted the number of
batches from json output to be sure things are going sensibly.


All-in-all this part looks fairly boring.


- Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
Hi Andres and Peter,

Please see below for inline responses to your feedback.  New patch attached.

On Wed, Nov 8, 2017 at 10:01 AM, Andres Freund <andres@anarazel.de> wrote:
> +set min_parallel_table_scan_size = 0;
> +set parallel_setup_cost = 0;
> +-- Make a simple relation with well distributed keys and correctly
> +-- estimated size.
> +create table simple as
> +  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
> +alter table simple set (parallel_workers = 2);
> +analyze simple;
> +-- Make a relation whose size we will under-estimate.  We want stats
> +-- to say 1000 rows, but actually there are 20,000 rows.
> +create table bigger_than_it_looks as
> +  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
> +alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
> +alter table bigger_than_it_looks set (parallel_workers = 2);
> +delete from bigger_than_it_looks where id <= 19000;
> +vacuum bigger_than_it_looks;
> +analyze bigger_than_it_looks;
> +insert into bigger_than_it_looks
> +  select generate_series(1, 19000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
>
> It seems kinda easier to just manipulate ndistinct and reltuples...

Done.

> +set max_parallel_workers_per_gather = 0;
> +set work_mem = '4MB';
>
> I hope there's a fair amount of slop here - with different archs you're
> going to see quite some size differences.

Yeah, this is a problem I wrestled with.  See next.

> +-- The "good" case: batches required, but we plan the right number; we
> +-- plan for 16 batches, and we stick to that number, and peak memory
> +-- usage says within our work_mem budget
> +-- non-parallel
> +set max_parallel_workers_per_gather = 0;
> +set work_mem = '128kB';
>
> So how do we know that's actually the case we're testing rather than
> something arbitrarily different? There's IIRC tests somewhere that just
> filter the json explain output to the right parts...

Yeah, good idea.  My earlier attempts to dump out the hash join
dimensions ran into problems with architecture sensitivity and then
some run-to-run non-determinism in the parallel case (due to varying
fragmentation depending on how many workers get involved in time).
The attached version tells you about batch growth without reporting
the exact numbers, except in the "ugly" case where we know that there
is only one possible outcome because the extreme skew detector is
guaranteed to go off after the first nbatch increase (I got rid of all
other tuples except ones with the same key to make this true).

This exercise did reveal a bug in
0008-Show-hash-join-per-worker-information-in-EXPLAIN-ANA.patch
though: it is capturing shared instrumentation too soon in the
non-Parallel Hash case so the nbatch reported by EXPLAIN ANALYZE might
be too low if we grew while probing.  Oops.  Will post a fix for that.

> +/*
> + * Build the name for a given segment of a given BufFile.
> + */
> +static void
> +MakeSharedSegmentName(char *name, const char *buffile_name, int segment)
> +{
> +       snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
> +}
>
> Not a fan of this name - you're not "making" a filename here (as in
> allocating or such). I think I'd just remove the Make prefix.

Done.  I also changed some similar code where I'd used GetXXX when
building paths.

> +/*
> + * Open a file that was previously created in another backend with
> + * BufFileCreateShared in the same SharedFileSet using the same name.  The
> + * backend that created the file must have called BufFileClose() or
> + * BufFileExport() to make sure that it is ready to be opened by other
> + * backends and render it read-only.
> + */
>
> Is it actually guaranteed that it's another backend / do we rely on
> that?

No, it could be any backend that is attached to the SharedFileSet,
including the current one.  Wording improved.

> +BufFile *
> +BufFileOpenShared(SharedFileSet *fileset, const char *name)
> +{
>
> +       /*
> +        * If we didn't find any files at all, then no BufFile exists with this
> +        * tag.
> +        */
> +       if (nfiles == 0)
> +               return NULL;
>
> s/taag/name/?

Fixed.

> +/*
> + * Delete a BufFile that was created by BufFileCreateShared in the given
> + * SharedFileSet using the given name.
> + *
> + * It is not necessary to delete files explicitly with this function.  It is
> + * provided only as a way to delete files proactively, rather than waiting for
> + * the SharedFileSet to be cleaned up.
> + *
> + * Only one backend should attempt to delete a given name, and should know
> + * that it exists and has been exported or closed.
> + */
> +void
> +BufFileDeleteShared(SharedFileSet *fileset, const char *name)
> +{
> +       char            segment_name[MAXPGPATH];
> +       int                     segment = 0;
> +       bool            found = false;
> +
> +       /*
> +        * We don't know how many segments the file has.  We'll keep deleting
> +        * until we run out.  If we don't manage to find even an initial segment,
> +        * raise an error.
> +        */
> +       for (;;)
> +       {
> +               MakeSharedSegmentName(segment_name, name, segment);
> +               if (!SharedFileSetDelete(fileset, segment_name, true))
> +                       break;
> +               found = true;
> +               ++segment;
> +       }
>
> Hm. Do we properly delete all the files via the resowner mechanism if
> this fails midway? I.e. if there are no leading segments? Also wonder if
> this doesn't need a CFI check.

The resowner mechanism recursively deletes everything, so order
doesn't matter here.  CFI added.

> +void
> +PathNameCreateTemporaryDir(const char *basedir, const char *directory)
> +{
> +       if (mkdir(directory, S_IRWXU) < 0)
> +       {
> +               if (errno == EEXIST)
> +                       return;
> +
> +               /*
> +                * Failed.  Try to create basedir first in case it's missing. Tolerate
> +                * ENOENT to close a race against another process following the same
> +                * algorithm.
> +                */
> +               if (mkdir(basedir, S_IRWXU) < 0 && errno != ENOENT)
> +                       elog(ERROR, "cannot create temporary directory \"%s\": %m",
> +                                basedir);
>
> ENOENT or EEXIST?

Oops, right, fixed.

> +File
> +PathNameCreateTemporaryFile(const char *path, bool error_on_failure)
> +{
> +       File            file;
> +
> +       /*
> +        * Open the file.  Note: we don't use O_EXCL, in case there is an orphaned
> +        * temp file that can be reused.
> +        */
> +       file = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY);
> +       if (file <= 0)
> +       {
> +               if (error_on_failure)
> +                       elog(ERROR, "could not create temporary file \"%s\": %m", path);
> +               else
> +                       return file;
> +       }
> +
> +       /* Mark it for temp_file_limit accounting. */
> +       VfdCache[file].fdstate |= FD_TEMP_FILE_LIMIT;
> +
> +       /*
> +        * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we still
> +        * want to make sure they get closed at end of xact.
> +        */
> +       ResourceOwnerEnlargeFiles(CurrentResourceOwner);
> +       ResourceOwnerRememberFile(CurrentResourceOwner, file);
> +       VfdCache[file].resowner = CurrentResourceOwner;
>
> So maybe I'm being pedantic here, but wouldn't the right order be to do
> ResourceOwnerEnlargeFiles() *before* creating the file? It's a memory
> allocating operation, so it can fail, which'd leak the file.

Fixed.  See also commit c5269472.

> +/*
> + * Open a file that was created with PathNameCreateTemporaryFile, possibly in
> + * another backend.  Files opened this way don't count agains the
>
> s/agains/against/

Fixed.

> + * temp_file_limit of the caller, are read-only and are automatically closed
> + * at the end of the transaction but are not deleted on close.
> + */
> +File
> +PathNameOpenTemporaryFile(const char *path)
> +{
> +       File            file;
> +
> +       /* We open the file read-only. */
> +       file = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
> +
> +       /* If no such file, then we don't raise an error. */
> +       if (file <= 0 && errno != ENOENT)
> +               elog(ERROR, "could not open temporary file \"%s\": %m", path);
> +
> +       if (file > 0)
> +       {
> +               /*
> +                * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we
> +                * still want to make sure they get closed at end of xact.
> +                */
> +               ResourceOwnerEnlargeFiles(CurrentResourceOwner);
> +               ResourceOwnerRememberFile(CurrentResourceOwner, file);
> +               VfdCache[file].resowner = CurrentResourceOwner;
>
> Same complaint as above, ResourceOwnerEnlargeFiles() should be done
> earlier.

Fixed.

> +/*
> + * Delete a file by pathname.  Return true if the file existed, false if
> + * didn't.
> + */
> +bool
> +PathNameDeleteTemporaryFile(const char *path, bool error_on_failure)
> +{
> +       struct stat filestats;
> +       int                     stat_errno;
> +
> +       /* Get the final size for pgstat reporting. */
> +       if (stat(path, &filestats) != 0)
> +               stat_errno = errno;
> +       else
> +               stat_errno = 0;
> +
> +       /*
> +        * Unlike FileClose's automatic file deletion code, we tolerate
> +        * non-existence to support BufFileDeleteShared which doesn't know how
> +        * many segments it has to delete until it runs out.
> +        */
> +       if (stat_errno == ENOENT)
> +               return false;
> +
> +       if (unlink(path) < 0)
> +       {
> +               if (errno != ENOENT)
> +                       elog(error_on_failure ? ERROR : LOG,
> +                                "cannot unlink temporary file \"%s\": %m", path);
> +               return false;
> +       }
> +
> +       if (stat_errno == 0)
> +               ReportTemporaryFileUsage(path, filestats.st_size);
> +       else
> +       {
> +               errno = stat_errno;
> +               elog(LOG, "could not stat file \"%s\": %m", path);
> +       }
>
> All these messages are "not expected to ever happen" ones, right?

You'd have to suffer a nasty filesystem failure, a read-only remount,
or someone manually messing with permissions.  Not sure where the line is,
but I've changed all of these new elog calls to ereport.

> +       return true;
> +}
> +
>  /*
>   * close a file when done with it
>   */
> @@ -1537,10 +1747,17 @@ FileClose(File file)
>                 Delete(file);
>         }
>
> +       if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
> +       {
> +               /* Subtract its size from current usage (do first in case of error) */
> +               temporary_files_size -= vfdP->fileSize;
> +               vfdP->fileSize = 0;
> +       }
>
> So, is it right to do so unconditionally and without regard for errors?
> If the file isn't deleted, it shouldn't be subtracted from fileSize. I
> guess you're managing that through the flag, but that's not entirely
> obvious.

I think it is.  Reasoning:  The existing behaviour of fd.c is that if
we don't manage to delete temporary files, we'll LOG something and
forget about them (they'll be cleaned up eventually by a clean restart
or human intervention).  If you have a filesystem that lets you create
and write files but not unlink them then Postgres will eventually eat
all your disk.  The alternative would be not to adjust
temporary_files_size so that we prevent this backend from creating
more temporary files: that might make some kind of sense but it
wouldn't solve any real operational problems: you could still eat all
the disk space by disconnecting and reconnecting to get a new session
with a new temp filespace allowance.  If we want a true space limit
we'll need to design one, but that seems out of scope for this
discussion.

> diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c
> new file mode 100644
> index 00000000000..6da80838b37
> --- /dev/null
> +++ b/src/backend/storage/file/sharedfileset.c
> @@ -0,0 +1,240 @@
> +/*-------------------------------------------------------------------------
> + *
> + * sharedfileset.c
> + *       Shared temporary file management.
> + *
> + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
> + * Portions Copyright (c) 1994, Regents of the University of California
> + *
> + * IDENTIFICATION
> + *       src/backend/storage/file/sharedfileset.c
> + *
> + *-------------------------------------------------------------------------
> + */
>
> A slightly bigger comment wouldn't hurt.

Done.

> +/*
> + * Attach to a set of directories that was created with SharedFileSetInit.
> + */
> +void
> +SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
> +{
> +       bool            success;
> +
> +       SpinLockAcquire(&fileset->mutex);
> +       if (fileset->refcnt == 0)
> +               success = false;
>
> I've not finished reading through this, but is this safe? If the
> segment's gone, is the spinlock guaranteed to still be a spinlock?  I
> suspect this isn't a problem because just the underlying data is
> removed, but the SharedFileSet stays alive?

If you have a dsm_segment object, you are attached to it and it exists
(ie is mapped in your backend).  The spinlock is guaranteed to be a
spinlock.  The race being guarded against here is: a worker starts up
and attaches to the DSM segment and then tries to attach to the
SharedFileSet but finds that the other backends have all detached and
the refcount is 0.  That's actually quite unlikely, because they
detach from the SharedFileSet as part of the process of detaching from
the DSM segment, so there is a pretty narrow window of time in which
SharedFileSet's refcount is 0 but the DSM segment still exists (they
haven't detached from that yet) and you managed to attach to the DSM
segment.

This is a variant of the problem SQL Smith found and I fixed in commit
fddf45b38097d14301d249fbeebca32e40233bd2.  I think in general anything
that has its own reference count or other kind of state indicating "I
have been destroyed, you can't use me" and lives in a DSM segment
needs protection against this race.  In general, I think parallel
workers that start up late (say in an error case where the leader gave
up or something) are bound to show racy error messages ranging from
"can't attach to this invalid DSM handle" to "I attached to the DSM
segment, but I can't seem to attach to <thing> inside it", which are
all just versions of "hey, where'd everyone go?" with different
timing.

> +/*
> + * Sorting hat to determine which tablespace a given shared temporary file
> + * belongs in.
> + */
> +static Oid
> +ChooseTablespace(const SharedFileSet *fileset, const char *name)
> +{
> +       uint32          hash = hash_any((const unsigned char *) name, strlen(name));
> +
> +       return fileset->tablespaces[hash % fileset->ntablespaces];
> +}
>
> Hm. I wonder if just round-robin through these isn't a better approach.

The problem is that all backends opening that file by name need to
agree on which tablespace it lives in, and I don't have per-file shmem
space to remember them.  In an earlier version I required all callers
to tell me which "stripe number" the file lived in and to promise that
they would use the same stripe number for the same filename every time
in every backend, so then maybe the calling code could somehow figure
out how to round-robin the stripe numbers... but that seemed at best
clumsy.  This way seemed much tidier, and has only one downside I
could think of: when you have only 2 files in the set there is a 50%
chance that they both finish up in the same tablespace.  With any more
than that it should quickly approach even load balancing.  It didn't
seem worth trying very hard to solve that problem, since people don't
really use temp_tablespaces for serious load balancing anyway AFAIK.

> +/*
> + * Compute the full path of a file in a SharedFileSet.
> + */
> +static void
> +GetSharedFilePath(char *path, SharedFileSet *fileset, const char *name)
> +{
> +       char            dirpath[MAXPGPATH];
> +
> +       GetSharedFileSetPath(dirpath, fileset, ChooseTablespace(fileset, name));
> +       snprintf(path, MAXPGPATH, "%s/" PG_TEMP_FILE_PREFIX ".%s", dirpath, name);
> +}
> diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
> index 4c35ccf65eb..8b91d5a6ebe 100644
> --- a/src/backend/utils/resowner/resowner.c
> +++ b/src/backend/utils/resowner/resowner.c
> @@ -528,16 +528,6 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
>                                 PrintRelCacheLeakWarning(res);
>                         RelationClose(res);
>                 }
> -
> -               /* Ditto for dynamic shared memory segments */
> -               while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
> -               {
> -                       dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
> -
> -                       if (isCommit)
> -                               PrintDSMLeakWarning(res);
> -                       dsm_detach(res);
> -               }
>         }
>         else if (phase == RESOURCE_RELEASE_LOCKS)
>         {
> @@ -654,6 +644,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
>                                 PrintFileLeakWarning(res);
>                         FileClose(res);
>                 }
> +
> +               /* Ditto for dynamic shared memory segments */
> +               while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
> +               {
> +                       dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
> +
> +                       if (isCommit)
> +                               PrintDSMLeakWarning(res);
> +                       dsm_detach(res);
> +               }
>         }
>
> Is that entirely unproblematic? Are there any DSM callbacks that rely on
> locks still being held? Please split this part into a separate commit
> with such analysis.

I've removed this change.  As far as I know I was wrong about Windows
needing this change for my patch set (due to FILE_SHARE_DELETE).

> +/* The initial size of chunks in pages. */
> +#define STS_MIN_CHUNK_PAGES 4
>
> Could use quick description at how you've arrived at that specific
> value.

Done.  The new comment is:

/*
 * The initial size of chunks, in pages.  This is somewhat arbitrarily set to
 * match the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of
 * tuples at approximately the same rate as it allocates new chunks of memory
 * to insert them into.
 */

> +/* Chunk written to disk. */
> +typedef struct SharedTuplestoreChunk
> +{
> +       int                     npages;                 /* Size of this chunk in BLCKSZ pages. */
> +       int                     ntuples;                /* Number of tuples in this chunk. */
> +       char            data[FLEXIBLE_ARRAY_MEMBER];
> +} SharedTuplestoreChunk;
> +
> +/* Per-participant shared state. */
> +typedef struct SharedTuplestoreParticipant
> +{
> +       slock_t         mutex;
> +       BlockNumber     read_page;              /* Page number for next read. */
> +       BlockNumber     npages;                 /* Number of pages written. */
> +       bool            writing;                /* Used only for assertions. */
> +
> +       /*
> +        * We need variable sized chunks, because we might be asked to store
> +        * gigantic tuples.  To avoid the locking contention that would come from
> +        * reading chunk sizes from disk, we store the chunk size for ranges of
> +        * the file in a compact format in memory.  chunk_pages starts out at
> +        * STS_MIN_CHUNK_PAGES and then doubles each time we reach a page listed
> +        * in chunk_expansion_log.
> +        */
> +       BlockNumber     chunk_expansion_log[sizeof(BlockNumber) * CHAR_BIT];
> +       int                     chunk_expansions;
> +       int                     chunk_expansion;
> +       int                     chunk_pages;
>
> This needs more explanation.

I hope this is now explained a bit better -- see also a few points further down.

> +/*
> + * Initialize a SharedTuplestore in existing shared memory.  There must be
> + * space for sts_estimate(participants) bytes.  If flags is set to the value
> + * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
> + * eagerly (but this isn't yet implemented).
>
> s/iset set to the value/includes the value/ - otherwise it's not really
> a flags argument.

Fixed.

> + * Tuples that are stored may optionally carry a piece of fixed sized
> + * meta-data which will be retrieved along with the tuple.  This is useful for
> + * the hash codes used for multi-batch hash joins, but could have other
> + * applications.
>
> "hash codes"?

Fixed.

> +/*
> + * Prepare to rescan.  Only participant should call this.  After it returns,
> + * all participants should call sts_begin_parallel_scan() and then loop over
> + * sts_parallel_scan_next().
> + */
>
> s/should/may/?  Also maybe document what happens with in-progress reads
> (or rather them not being allowed to exist)?

Done.  Note that this interface is not currently used, but it could
obviously be used for rescans.  In an earlier version this needed to
be called even for the first scan, but I got rid of that requirement.

> +/*
> + * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
> + * pointer to meta data of that size must be provided.
> + */
> +void
> +sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
> +                        MinimalTuple tuple)
> +{
>
> +       /* Do we have space? */
> +       size = accessor->sts->meta_data_size + tuple->t_len;
> +       if (accessor->write_pointer + size >= accessor->write_end)
> +       {
> +               /* Try flushing to see if that creates enough space. */
> +               if (accessor->write_chunk != NULL)
> +                       sts_flush_chunk(accessor);
> +
> +               /*
> +                * It may still not be enough in the case of a gigantic tuple, or if
> +                * we haven't created a chunk buffer at all yet.
> +                */
> +               if (accessor->write_pointer + size >= accessor->write_end)
> +               {
> +                       SharedTuplestoreParticipant *participant;
> +                       size_t  space_needed;
> +                       int             pages_needed;
> +
> +                       /* How many pages to hold this data and the chunk header? */
> +                       space_needed = offsetof(SharedTuplestoreChunk, data) + size;
> +                       pages_needed = (space_needed + (BLCKSZ - 1)) / BLCKSZ;
> +                       pages_needed = Max(pages_needed, STS_MIN_CHUNK_PAGES);
> +
> +                       /*
> +                        * Double the chunk size until it's big enough, and record that
> +                        * fact in the shared expansion log so that readers know about it.
> +                        */
> +                       participant = &accessor->sts->participants[accessor->participant];
> +                       while (accessor->write_pages < pages_needed)
> +                       {
> +                               accessor->write_pages *= 2;
> +                               participant->chunk_expansion_log[participant->chunk_expansions++] =
> +                                       accessor->write_page;
> +                       }
>
> Hm. Isn't that going to be pretty unfunny if you have one large and a
> lot of small tuples?

It will increase the parallel scan grain size, and then keep that size
for the rest of the contents of one backend's output file.  I am aware
of two downsides to using a large parallel grain:

1.  It determines the amount of unfairness when we run out of data:
it's the maximum amount of extra data that the unlucky last worker can
finish up with after all the others have finished.  I think this
effect is reduced by higher level factors: when a reader runs out of
data in one backend's file, it'll start reading another backend's
file.  If it's hit the end of all backends' files and this is an outer
batch, Parallel Hash will just go and work on another batch
immediately.

2.  It affects the read-ahead heuristics.  On Linux, if the parallel
scan grain size is larger than the read-ahead window and some other
backend advances the block counter between your reads then the scan
looks like random IO.  Suppose you have the default read-ahead window
size of 512kB.  You need to hit a tuple over 256kB in size *and* be
unlucky enough to have multiple backends reading the same file
concurrently (which I try to avoid as discussed elsewhere) to befuddle
the read-ahead heuristics.  If this is ever a problem we could
consider explicit read-ahead advice.

A couple of alternatives seemed like bad ideas: we could read the
chunk size from the chunk headers, but then each read would be
dependent on (ie have to wait for) the preceding read, like Parallel
(btree) Index Scan and unlike Parallel Seq Scan; or we could track
changes in chunk size in shared memory in a more sophisticated way
that allows decreasing, but that would no longer be small, simple and
fixed size as I have it.

Better ideas?

BTW this code is covered by the regression test.

> +                       /* Create the output buffer. */
> +                       if (accessor->write_chunk != NULL)
> +                               pfree(accessor->write_chunk);
> +                       accessor->write_chunk = (SharedTuplestoreChunk *)
> +                               palloc0(accessor->write_pages * BLCKSZ);
>
> Are we guaranteed to be in a long-lived memory context here?

I changed it so that it captures CurrentMemoryContext when you
initialise or attach and uses that for allocating buffers.
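
In other words, something along these lines -- a minimal sketch with
assumed field names, not the exact patch:

typedef struct SharedTuplestoreAccessor
{
    /* ... */
    MemoryContext context;               /* captured at initialize/attach time */
    SharedTuplestoreChunk *write_chunk;   /* buffer allocated in 'context' */
    /* ... */
} SharedTuplestoreAccessor;

/* In sts_initialize()/sts_attach(): */
accessor->context = CurrentMemoryContext;

/* Later, whenever the write buffer has to be (re)created: */
if (accessor->write_chunk != NULL)
    pfree(accessor->write_chunk);
accessor->write_chunk = (SharedTuplestoreChunk *)
    MemoryContextAllocZero(accessor->context,
                           accessor->write_pages * BLCKSZ);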

> +/*
> + * Get the next tuple in the current parallel scan.
> + */
> +MinimalTuple
> +sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
> +{
> +       SharedTuplestoreParticipant *p;
> +       BlockNumber     read_page;
> +       int                     chunk_pages;
> +       bool            eof;
> +
> +       for (;;)
> +       {
> +               /* Can we read more tuples from the current chunk? */
> +               if (likely(accessor->read_ntuples < accessor->read_ntuples_available))
> +                       return sts_read_tuple(accessor, meta_data);
>
> I'm not convinced this is a good use of likely/unlikely (not biased and
> not performance critical enough).

Removed.

> +               /* Find the location of a new chunk to read. */
> +               p = &accessor->sts->participants[accessor->read_participant];
> +
> +               SpinLockAcquire(&p->mutex);
> +               eof = p->read_page >= p->npages;
> +               if (!eof)
> +               {
> +                       /*
> +                        * Figure out how big this chunk is.  It will almost always be the
> +                        * same as the last chunk loaded, but if there is one or more
> +                        * entry in the chunk expansion log for this page then we know
> +                        * that it doubled that number of times.  This avoids the need to
> +                        * do IO to adjust the read head, so we don't need to hold up
> +                        * concurrent readers.  (An alternative to this extremely rarely
> +                        * run loop would be to use more space storing the new size in the
> +                        * log so we'd have 'if' instead of 'while'.)
> +                        */
> +                       read_page = p->read_page;
> +                       while (p->chunk_expansion < p->chunk_expansions &&
> +                                  p->chunk_expansion_log[p->chunk_expansion] == p->read_page)
> +                       {
> +                               p->chunk_pages *= 2;
> +                               p->chunk_expansion++;
> +                       }
> +                       chunk_pages = p->chunk_pages;
> +
> +                       /* The next reader will start after this chunk. */
> +                       p->read_page += chunk_pages;
> +               }
> +               SpinLockRelease(&p->mutex);
>
> This looks more like the job of an lwlock rather than a spinlock.

I switched to the alternative algorithm mentioned in parentheses in
the comment.  It uses a bit more space, but that loop is gone.  In my
mind this is much like Parallel Seq Scan: we acquire a spinlock just
to advance the block pointer.  The added complication is that we also
check if the chunk size has changed, which clang renders as this many
instructions:

postgres[0x10047eee0] <+176>:  movslq 0x144(%r15,%rbx), %rcx
postgres[0x10047eee8] <+184>:  cmpl   0x140(%r15,%rbx), %ecx
postgres[0x10047eef0] <+192>:  jge    0x10047ef16               ; <+230> at sharedtuplestore.c:489
postgres[0x10047eef2] <+194>:  leaq   (%r15,%rbx), %rdx
postgres[0x10047eef6] <+198>:  cmpl   %r12d, 0x40(%rdx,%rcx,8)
postgres[0x10047eefb] <+203>:  jne    0x10047ef16               ; <+230> at sharedtuplestore.c:489
postgres[0x10047eefd] <+205>:  leaq   0x144(%r15,%rbx), %rsi
postgres[0x10047ef05] <+213>:  leal   0x1(%rcx), %edi
postgres[0x10047ef08] <+216>:  movl   %edi, (%rsi)
postgres[0x10047ef0a] <+218>:  movl   0x44(%rdx,%rcx,8), %ecx
postgres[0x10047ef0e] <+222>:  movl   %ecx, 0x148(%r15,%rbx)
postgres[0x10047ef16] <+230>:  movl   0x148(%r15,%rbx), %r15d

That should be OK, right?
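
For reference, the new critical section now looks roughly like this (a
sketch; the layout of the expansion log entries is assumed):

SpinLockAcquire(&p->mutex);
eof = p->read_page >= p->npages;
if (!eof)
{
    /*
     * Each log entry records the page at which the chunk size changed and
     * the new size in pages, so a single test replaces the old loop.
     */
    if (p->chunk_expansion < p->chunk_expansions &&
        p->chunk_expansion_log[p->chunk_expansion].page == p->read_page)
        p->chunk_pages =
            p->chunk_expansion_log[p->chunk_expansion++].chunk_pages;

    read_page = p->read_page;
    chunk_pages = p->chunk_pages;

    /* The next reader will start after this chunk. */
    p->read_page += chunk_pages;
}
SpinLockRelease(&p->mutex);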

> +/*
> + * Create the name used for our shared BufFiles.
> + */
> +static void
> +make_name(char *name, SharedTuplestoreAccessor *accessor, int participant)
> +{
> +       snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
> +}
>
> Name's a bit generic. And it's still not really making ;)

Renamed to sts_filename.

On Wed, Nov 8, 2017 at 10:32 AM, Peter Geoghegan <pg@bowt.ie> wrote:
> On Tue, Nov 7, 2017 at 1:01 PM, Andres Freund <andres@anarazel.de> wrote:
>> +/*
>> + * Build the name for a given segment of a given BufFile.
>> + */
>> +static void
>> +MakeSharedSegmentName(char *name, const char *buffile_name, int segment)
>> +{
>> +       snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
>> +}
>>
>> Not a fan of this name - you're not "making" a filename here (as in
>> allocating or such). I think I'd just remove the Make prefix.
>
> +1
>
> Can we document the theory behind file naming here, if that isn't in
> the latest version? This is a routine private to parallel hash join
> (or shared tuplestore), not Buffile. Maybe Buffile should have some
> opinion on this, though. Just as a matter of style.

I have added some text about naming to BufFileCreateShared().

>> +                       /* Create the output buffer. */
>> +                       if (accessor->write_chunk != NULL)
>> +                               pfree(accessor->write_chunk);
>> +                       accessor->write_chunk = (SharedTuplestoreChunk *)
>> +                               palloc0(accessor->write_pages * BLCKSZ);
>>
>> Are we guaranteed to be in a long-lived memory context here?
>
> I imagine that Thomas looked to tuplestore_begin_heap() + interXact as
> a kind of precedent here. See comments above that function.

Yeah, but I had missed something important:  I now capture the current
memory context when you initialise or attach to a SharedTuplestore.
That's the point at which the SharedTuplestoreAccessor is allocated,
and the caller needs to make sure that the right memory context is
active, but now that I hold onto it I can use the same one for future
buffer allocations.  Like the code highlighted above.  In nodeHash.c I
now make sure that context is always hashtable->hashCxt.

On Wed, Nov 8, 2017 at 4:40 PM, Andres Freund <andres@anarazel.de> wrote:
> Hi,
>
>  * avoids wasting memory on duplicated hash tables
>  * avoids wasting disk space on duplicated batch files
>  * avoids wasting CPU executing duplicate subplans
>
> What's the last one referring to?

Erm, I was saying the same thing two different ways (with the
following line).  Fixed.

> +static void
> +MultiExecParallelHash(HashState *node)
> +{
>
> +       switch (BarrierPhase(build_barrier))
> +       {
> +               case PHJ_BUILD_ALLOCATING:
> +
> +                       /*
> +                        * Either I just allocated the initial hash table in
> +                        * ExecHashTableCreate(), or someone else is doing that.  Either
> +                        * way, wait for everyone to arrive here so we can proceed, and
> +                        * then fall through.
> +                        */
> +                       BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING);
>
> Can you add a /* fallthrough */ comment here? Gcc is warning if you
> don't. While we currently have lotsa other places not having the
> annotation, it seem reasonable to have it in new code.

Fixed all warnings from GCC 7.2 -Wimplicit-fallthrough=3.

> +               case PHJ_BUILD_HASHING_INNER:
> +
> +                       /*
> +                        * It's time to begin hashing, or if we just arrived here then
> +                        * hashing is already underway, so join in that effort.  While
> +                        * hashing we have to be prepared to help increase the number of
> +                        * batches or buckets at any time, and if we arrived here when
> +                        * that was already underway we'll have to help complete that work
> +                        * immediately so that it's safe to access batches and buckets
> +                        * below.
> +                        */
> +                       if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
> +                               PHJ_GROW_BATCHES_ELECTING)
> +                               ExecParallelHashIncreaseNumBatches(hashtable);
> +                       if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
> +                               PHJ_GROW_BUCKETS_ELECTING)
> +                               ExecParallelHashIncreaseNumBuckets(hashtable);
> +                       ExecParallelHashEnsureBatchAccessors(hashtable);
>
> "accessors" sounds a bit weird for a bunch of pointers, but maybe that's
> just my ESL senses tingling wrongly.

Pointers to ParallelHashJoinBatchAccessor objects.  That's where we
keep this backend's pointers into ParallelHashJoinBatch objects and
some other backend-local state.  It's a pattern: you have some shared
object of type Foo, and every backend interacting with it will also
need an object of type FooAccessor if a simple pointer to the shared
object isn't enough.  If you have a better name than 'Accessor' for
this then I'm all ears.
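
The general shape of the pattern, with purely illustrative names:

/* Lives in shared memory; one per shared object. */
typedef struct Foo
{
    int         nparticipants;
    /* ... state that all participants must agree on ... */
} Foo;

/* Lives in backend-local memory; one per attached backend. */
typedef struct FooAccessor
{
    Foo        *shared;         /* pointer to the shared object */
    int         participant;    /* this backend's slot */
    /* ... per-backend cursors, buffers and caches ... */
} FooAccessor;

extern FooAccessor *foo_attach(Foo *shared);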

> /* ----------------------------------------------------------------
> @@ -240,12 +427,15 @@ ExecEndHash(HashState *node)
>   * ----------------------------------------------------------------
>   */
>  HashJoinTable
> -ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
> +ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
>
> +       /*
> +        * Parallel Hash tries to use the combined work_mem of all workers to
> +        * avoid the need to batch.  If that won't work, it falls back to work_mem
> +        * per worker and tries to process batches in parallel.
> +        */
>
> One day we're going to need a better approach to this. I have no idea
> how, but this per-node, and now per_node * max_parallelism, approach has
> only implementation simplicity as its benefit.

I agree, and I am interested in that subject.  In the meantime, I
think it'd be pretty unfair if parallel-oblivious hash join and
sort-merge join and every other parallel plan get to use work_mem * p
(and in some cases waste it with duplicate data), but Parallel Hash
isn't allowed to do the same (and put it to good use).

> +static HashJoinTuple
> +ExecParallelHashLoadTuple(HashJoinTable hashtable, MinimalTuple tuple,
> +                                                 dsa_pointer *shared)
> +{
>
> +static void
> +ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
> +{
>
>
>
> +/*
> + * Get the first tuple in a given bucket identified by number.
> + */
> +static HashJoinTuple
> +ExecHashFirstTupleInBucket(HashJoinTable hashtable, int bucketno)
> +{
> +       if (hashtable->parallel_state)
> +       {
> +               dsa_pointer p =
> +               dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);
>
> Can you make this, and possibly a few other places, more readable by
> introducing a temporary variable?

Done.

> +/*
> + * Insert a tuple at the front of a chain of tuples in DSA memory atomically.
> + */
> +static void
> +ExecParallelHashPushTuple(dsa_pointer_atomic *head,
> +                                                 HashJoinTuple tuple,
> +                                                 dsa_pointer tuple_shared)
> +{
> +       do
> +       {
> +               tuple->next.shared = dsa_pointer_atomic_read(head);
> +       } while (!dsa_pointer_atomic_compare_exchange(head,
> +                                                                                                 &tuple->next.shared,
> +                                                                                                 tuple_shared));
> +}
>
> This is hard to read.

Changed to a for loop with a break.
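
It now reads roughly like this (sketch of the reworked loop):

static void
ExecParallelHashPushTuple(dsa_pointer_atomic *head,
                          HashJoinTuple tuple,
                          dsa_pointer tuple_shared)
{
    for (;;)
    {
        /* Make the new tuple point at the current head... */
        tuple->next.shared = dsa_pointer_atomic_read(head);
        /* ... and swing the head to the new tuple, if it hasn't moved. */
        if (dsa_pointer_atomic_compare_exchange(head,
                                                &tuple->next.shared,
                                                tuple_shared))
            break;
    }
}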

> + * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
> + * be used repeatedly as required to coordinate expansions in the number of
> + * batches or buckets.  Their phases are as follows:
> + *
> + *   PHJ_GROW_BATCHES_ELECTING       -- initial state
> + *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
> + *   PHJ_GROW_BATCHES_REPARTITIONING -- all rep
> s/rep/repartition/?

Fixed.

> -
> +#include "utils/sharedtuplestore.h"
>
> deletes a separator newline.

Fixed.

>  /* ----------------------------------------------------------------
> @@ -138,6 +236,18 @@ ExecHashJoin(PlanState *pstate)
>                                         /* no chance to not build the hash table */
>                                         node->hj_FirstOuterTupleSlot = NULL;
>                                 }
> +                               else if (hashNode->parallel_state != NULL)
> +                               {
> +                                       /*
> +                                        * The empty-outer optimization is not implemented for
> +                                        * shared hash tables, because no one participant can
> +                                        * determine that there are no outer tuples, and it's not
> +                                        * yet clear that it's worth the synchronization overhead
> +                                        * of reaching consensus to figure that out.  So we have
> +                                        * to build the hash table.
> +                                        */
> +                                       node->hj_FirstOuterTupleSlot = NULL;
> +                               }
>
> Hm. Isn't MultiExecParallelHash already doing so?

I do support the empty-inner (empty table) optimisation.
MultiExecParallelHash makes sure that all workers agree on the total
number of inner tuples.  Therefore they agree on whether to give up
early and not bother scanning the outer relation.  The synchronisation
point required to implement this was unavoidable anyway (it's the
"main" synchronisation point in this algorithm: everyone has to be
finished building the hash table before anyone can probe it).

I don't support the empty-outer optimisation.  I could support that
like this: all workers try to pull one tuple from the outer relation,
before even building the hash table.  Then they would synchronise with
each other and see if anybody managed to get a tuple.  If none of them
did, then we can give up even earlier and skip building the hash
table.  The question is: is it worth introducing an extra
synchronisation point to reach consensus?  It seems unlikely that the
outer relation is empty, so my answer is "no".  There may be other
complications I haven't thought of.
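
To spell out what that consensus step might look like -- purely
illustrative, not implemented, and the flag, barrier and wait event
names are made up:

/* Each participant pulls at most one outer tuple before building. */
slot = ExecProcNode(outerPlanState(hjstate));
if (!TupIsNull(slot))
    pstate->outer_has_tuples = true;    /* any participant may set this */

/*
 * Wait for everyone, then see whether anyone found a tuple.  (A pulled
 * tuple would have to be stashed for later, like the existing
 * first-outer-tuple logic does.)
 */
BarrierArriveAndWait(&pstate->outer_consensus_barrier,
                     WAIT_EVENT_HASH_OUTER_CONSENSUS);
if (!pstate->outer_has_tuples)
    return NULL;                        /* no outer tuples: skip the build */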

> -                               node->hj_JoinState = HJ_NEED_NEW_OUTER;
> +                               if (hashtable->parallel_state)
> +                               {
> +                                       Barrier    *build_barrier;
> +
> +                                       build_barrier = &hashtable->parallel_state->build_barrier;
> +                                       if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
> +                                       {
> +                                               /*
> +                                                * If multi-batch, we need to hash the outer relation
> +                                                * up front.
> +                                                */
> +                                               if (hashtable->nbatch > 1)
> +                                                       ExecParallelHashJoinPartitionOuter(node);
> +                                               BarrierArriveAndWait(build_barrier,
> +                                                                        WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
> +                                       }
> +                                       Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
> +
> +                                       /* Each backend should now select a batch to work on. */
> +                                       hashtable->curbatch = -1;
> +                                       node->hj_JoinState = HJ_NEED_NEW_BATCH;
> +
> +                                       continue;
> +                               }
> +                               else
> +                                       node->hj_JoinState = HJ_NEED_NEW_OUTER;
>
> You know what I'm going to say about all these branches, and sigh.

BTW this is not per-tuple code -- it runs once at the end of hashing.
Not sure what you're looking for here.

> If we don't split this into two versions, we at least should store
> hashNode->parallel_state in a local var, so the compiler doesn't have to
> pull that out of memory after every external function call (of which
> there are a lot). In common cases it'll end up in a callee saved
> registers, and most of the called functions won't be too register
> starved (on x86-64).

Hmm.  Well I did that already in v24 -- in many places there is now a
local variable called pstate.

> +/*
> + * Choose a batch to work on, and attach to it.  Returns true if successful,
> + * false if there are no more batches.
> + */
> +static bool
> +ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
> +{
>
>
> +                                       /*
> +                                        * This batch is ready to probe.  Return control to
> +                                        * caller. We stay attached to batch_barrier so that the
> +                                        * hash table stays alive until everyone's finish probing
>
> *finished?

Fixed.

> +                               case PHJ_BATCH_DONE:
> +
> +                                       /*
> +                                        * Already done.  Detach and go around again (if any
> +                                        * remain).
> +                                        */
> +                                       BarrierDetach(batch_barrier);
> +
> +                                       /*
> +                                        * We didn't work on this batch, but we need to observe
> +                                        * its size for EXPLAIN.
> +                                        */
> +                                       ExecParallelHashUpdateSpacePeak(hashtable, batchno);
> +                                       hashtable->batches[batchno].done = true;
> +                                       hashtable->curbatch = -1;
> +                                       break;
>
> Hm, maybe I'm missing something, but why is it guaranteed that "we
> didn't work on this batch"?

We just attached to it (see switch(BarrierAttach(batch_barrier))) and
jumped to this label.  If it's PHJ_BATCH_DONE, then we know that some
other backend(s) did all the work and brought it to that state,
because otherwise our own backend-local flag
hashtable->batches[batchno].done would have been true and we'd have
skipped it.

In any case -- sorry about this -- the following patch that adds
EXPLAIN ANALYZE support actually removes that little bit of code
there.  In the version you quote I wanted to make sure that every
backend had a peak at every batch's peak size.   The follow-on patch
introduces extra shm state to instrument stuff better so that we get
per worker data into explain.c.

> +void
> +ExecShutdownHashJoin(HashJoinState *node)
> +{
> +       /*
> +        * By the time ExecEndHashJoin runs in a worker, shared memory has been
> +        * destroyed.  So this is our last chance to do any shared memory cleanup.
> +        */
>
> This comment doesn't really make much sense to me.

Tried again.  The point is that ExecShutdownXXX has to leave things in
a state where ExecEndXXX won't try to follow any pointers into DSM
segments, because by the time it runs they'll already be unmapped.
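
Concretely, it now has roughly this shape -- a sketch, assuming detach
helpers along these lines:

void
ExecShutdownHashJoin(HashJoinState *node)
{
    if (node->hj_HashTable)
    {
        /*
         * Detach from shared state while the DSM segment is still mapped,
         * so that ExecEndHashJoin won't find any dangling pointers into it.
         */
        ExecHashTableDetachBatch(node->hj_HashTable);
        ExecHashTableDetach(node->hj_HashTable);
    }
}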

> +void
> +ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
> +{
>
> could use a header comment.

Standard boiler-plate copied.

> a) The executor side is starting to look good.

Thanks!

> b) This is a lot of code.

Yeah.

On Thu, Nov 9, 2017 at 8:12 AM, Andres Freund <andres@anarazel.de> wrote:
> @@ -747,7 +747,7 @@ try_hashjoin_path(PlannerInfo *root,
>          * never have any output pathkeys, per comments in create_hashjoin_path.
>          */
>         initial_cost_hashjoin(root, &workspace, jointype, hashclauses,
> -                                                 outer_path, inner_path, extra);
> +                                                 outer_path, inner_path, extra, false);
>
>         if (add_path_precheck(joinrel,
>                                                   workspace.startup_cost, workspace.total_cost,
> @@ -761,6 +761,7 @@ try_hashjoin_path(PlannerInfo *root,
>                                                                           extra,
>                                                                           outer_path,
>                                                                           inner_path,
> +                                                                         false,        /* parallel_hash */
>                                                                           extra->restrictlist,
>                                                                           required_outer,
>                                                                           hashclauses));
> @@ -776,6 +777,10 @@ try_hashjoin_path(PlannerInfo *root,
>   * try_partial_hashjoin_path
>   *       Consider a partial hashjoin join path; if it appears useful, push it into
>   *       the joinrel's partial_pathlist via add_partial_path().
> + *       The outer side is partial.  If parallel_hash is true, then the inner path
> + *       must be partial and will be run in parallel to create one or more shared
> + *       hash tables; otherwise the inner path must be complete and a copy of it
> + *       is run in every process to create separate identical private hash tables.
>   */
>
> When do we have "or more shared hash tables" rather than one? Are you
> thinking about subordinate nodes?

If there are multiple batches, we'll load several hash tables into
memory at the same time.

> @@ -1839,6 +1846,10 @@ hash_inner_and_outer(PlannerInfo *root,
>                  * able to properly guarantee uniqueness.  Similarly, we can't handle
>                  * JOIN_FULL and JOIN_RIGHT, because they can produce false null
>                  * extended rows.  Also, the resulting path must not be parameterized.
> +                * We should be able to support JOIN_FULL and JOIN_RIGHT for Parallel
> +                * Hash, since in that case we're back to a single hash table with a
> +                * single set of match bits for each batch, but that will require
> +                * figuring out a deadlock-free way to wait for the probe to finish.
>                  */
>
> s/should be able/would be able/?

Done.  It would definitely work (and worked in the earlier
deadlock-prone design).

> index 6a45b68e5df..2d38a5efae8 100644
> --- a/src/backend/storage/ipc/barrier.c
> +++ b/src/backend/storage/ipc/barrier.c
> @@ -451,7 +451,6 @@ BarrierDetachImpl(Barrier *barrier, bool arrive)
>                 release = true;
>                 barrier->arrived = 0;
>                 ++barrier->phase;
> -               Assert(barrier->selected);
>                 barrier->selected = false;
>         }
>
> Uh, what?

A fixup squashed into the wrong patch.  Fixed.

As for the change itself: when I introduced BarrierArriveAndDetach()
it became possible for the barrier to advance through phases without
anyone being selected, so the assertion wasn't valid.

> diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out
> index 35523bd8065..40a076d976f 100644
> --- a/src/test/regress/expected/join.out
> +++ b/src/test/regress/expected/join.out
> @@ -5821,6 +5821,9 @@ analyze extremely_skewed;
>  insert into extremely_skewed
>    select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
>    from generate_series(1, 19000);
> +-- Make a relation with a couple of enormous tuples.
> +create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
> +alter table wide set (parallel_workers = 2);
>
> I'm doubtful this is actually going to be a wide tuple - this'll get
> compressed down quite a bit, no?
>
> postgres[26465][1]=# SELECT octet_length(t), pg_column_size(t) FROM wide ;
> ┌──────────────┬────────────────┐
> │ octet_length │ pg_column_size │
> ├──────────────┼────────────────┤
> │       320000 │           3671 │
> │       320000 │           3671 │
> └──────────────┴────────────────┘
> (2 rows)
>
>
> (and yes, it's ridiculous that a compressed datum of that size still
> takes up 3kb)

The tuple is small on disk, but it allows me to create a large tuple
in the hash table with the following incantation (hat-tip to Andrew
Gierth for this trick):

  select length(max(s.t))
  from wide left join (select id, coalesce(t, '') || '' as t from wide) s
  using (id);

A non-strict expression and a left join result in a projected,
detoasted, decompressed monster tuple, which you can confirm by sticking
an elog into ExecParallelHashLoadTuple() like so:

2017-11-10 13:57:07.193 NZDT [7337] LOG:  tuple size = 320040
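
The call itself is just a one-liner along these lines (exact placement
in ExecParallelHashLoadTuple() assumed):

    elog(LOG, "tuple size = %u", tuple->t_len);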

> +-- parallel with parallel-aware hash join
> +set max_parallel_workers_per_gather = 2;
> +set work_mem = '128kB';
> +set enable_parallel_hash = on;
>
> I think it'd be better if we structured the file so we just sat guc's
> with SET LOCAL inside a transaction.

I wrapped the whole region of join.sql concerned with hash joins in a
transaction that rolls back, so I don't have to write LOCAL.  That's
just as good, right?

> All-in-all this part looks fairly boring.

Thank you.

--
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,

On 2017-11-14 01:30:30 +1300, Thomas Munro wrote:
> > +-- The "good" case: batches required, but we plan the right number; we
> > +-- plan for 16 batches, and we stick to that number, and peak memory
> > +-- usage says within our work_mem budget
> > +-- non-parallel
> > +set max_parallel_workers_per_gather = 0;
> > +set work_mem = '128kB';
> >
> > So how do we know that's actually the case we're testing rather than
> > something arbitrarily different? There's IIRC tests somewhere that just
> > filter the json explain output to the right parts...
> 
> Yeah, good idea.  My earlier attempts to dump out the hash join
> dimensions ran into problems with architecture sensitivity and then
> some run-to-run non-determinism in the parallel case (due to varying
> fragmentation depending on how many workers get involved in time).
> The attached version tells you about batch growth without reporting
> the exact numbers, except in the "ugly" case where we know that there
> is only one possible outcome because the extreme skew detector is
> guaranteed to go off after the first nbatch increase (I got rid of all
> other tuples except ones with the same key to make this true).

Hm. The way you access this doesn't quite seem right:
+--
+-- exercises for the hash join code
+--
+begin;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+-- Extract bucket and batch counts from an explain analyze plan.  In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  line text;
+  matches text[];
+begin
+  for line in
+    execute 'explain analyze ' || query
+  loop
+    matches := (regexp_matches(line, '  Batches: ([0-9]+) \(originally ([0-9]+)\)'));
+    if matches is not null then
+      original := matches[2]::int;
+      final := matches[1]::int;
+      return next;
+    else
+      matches := regexp_matches(line, '  Batches: ([0-9]+)');
+      if matches is not null then
+        original := matches[1]::int;
+        final := original;
+        return next;
+      end if;
+    end if;
+  end loop;
+end;
+$$;

Why not use format json and access the output that way? Then you can be
sure you access the right part of the tree and such?

> > +       else
> > +       {
> > +               errno = stat_errno;
> > +               elog(LOG, "could not stat file \"%s\": %m", path);
> > +       }
> >
> > All these messages are "not expected to ever happen" ones, right?
> 
> You'd have to suffer a nasty filesystem failure, remount read-only or
> manually with permissions or something.  Not sure where the line is,
> but I've changed all of these new elog calls to ereport.

Oh, I'd been fine keeping them as elogs. The one exception would have
been out-of-space cases which'll occur in practice.


> > +       if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
> > +       {
> > +               /* Subtract its size from current usage (do first in case of error) */
> > +               temporary_files_size -= vfdP->fileSize;
> > +               vfdP->fileSize = 0;
> > +       }
> >
> > So, is it right to do so unconditionally and without regard for errors?
> > If the file isn't deleted, it shouldn't be subtracted from fileSize. I
> > guess you're managing that through the flag, but that's not entirely
> > obvious.
> 
> I think it is.  Reasoning:  The existing behaviour of fd.c is that if
> we don't manage to delete temporary files, we'll LOG something and
> forget about them (they'll be cleaned up eventually by a clean restart
> or human intervention).

IOW: Never ;)


> > +/*
> > + * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
> > + * pointer to meta data of that size must be provided.
> > + */
> > +void
> > +sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
> > +                        MinimalTuple tuple)
> > +{
> >
> > +       /* Do we have space? */
> > +       size = accessor->sts->meta_data_size + tuple->t_len;
> > +       if (accessor->write_pointer + size >= accessor->write_end)
> > +       {
> > +               /* Try flushing to see if that creates enough space. */
> > +               if (accessor->write_chunk != NULL)
> > +                       sts_flush_chunk(accessor);
> > +
> > +               /*
> > +                * It may still not be enough in the case of a gigantic tuple, or if
> > +                * we haven't created a chunk buffer at all yet.
> > +                */
> > +               if (accessor->write_pointer + size >= accessor->write_end)
> > +               {
> > +                       SharedTuplestoreParticipant *participant;
> > +                       size_t  space_needed;
> > +                       int             pages_needed;
> > +
> > +                       /* How many pages to hold this data and the chunk header? */
> > +                       space_needed = offsetof(SharedTuplestoreChunk, data) + size;
> > +                       pages_needed = (space_needed + (BLCKSZ - 1)) / BLCKSZ;
> > +                       pages_needed = Max(pages_needed, STS_MIN_CHUNK_PAGES);
> > +
> > +                       /*
> > +                        * Double the chunk size until it's big enough, and record that
> > +                        * fact in the shared expansion log so that readers know about it.
> > +                        */
> > +                       participant = &accessor->sts->participants[accessor->participant];
> > +                       while (accessor->write_pages < pages_needed)
> > +                       {
> > +                               accessor->write_pages *= 2;
> > +                               participant->chunk_expansion_log[participant->chunk_expansions++] =
> > +                                       accessor->write_page;
> > +                       }
> >
> > Hm. Isn't that going to be pretty unfunny if you have one large and a
> > lot of small tuples?
> 
> It will increase the parallel scan grain size, and then keep that size
> for the rest of the contents of one backend's output file.  I am aware
> of two downsides to using a large parallel grain:

> 1.  It determines the amount of unfairness when we run out of data:
> it's the maximum amount of extra data that the unlucky last worker can
> finish up with after all the others have finished.  I think this
> effect is reduced by higher level factors: when a reader runs out of
> data in one backend's file, it'll start reading another backend's
> file.  If it's hit the end of all backends' files and this is an outer
> batch, Parallel Hash will just go and work on another batch
> immediately.

Consider e.g. what happens if there's the occasional 500MB datum, and
the rest's very small...


> Better ideas?

Not really. I'm more than a bit suspicious of this solution, but I don't
really have a great suggestion otherwise.  One way to combat extreme
size skew would be to put very large datums into different files.

But I think we probably can go with your approach for now, ignoring my
failure prone spidey senses ;)


> > +               /* Find the location of a new chunk to read. */
> > +               p = &accessor->sts->participants[accessor->read_participant];
> > +
> > +               SpinLockAcquire(&p->mutex);
> > +               eof = p->read_page >= p->npages;
> > +               if (!eof)
> > +               {
> > +                       /*
> > +                        * Figure out how big this chunk is.  It will almost always be the
> > +                        * same as the last chunk loaded, but if there is one or more
> > +                        * entry in the chunk expansion log for this page then we know
> > +                        * that it doubled that number of times.  This avoids the need to
> > +                        * do IO to adjust the read head, so we don't need to hold up
> > +                        * concurrent readers.  (An alternative to this extremely rarely
> > +                        * run loop would be to use more space storing the new size in the
> > +                        * log so we'd have 'if' instead of 'while'.)
> > +                        */
> > +                       read_page = p->read_page;
> > +                       while (p->chunk_expansion < p->chunk_expansions &&
> > +                                  p->chunk_expansion_log[p->chunk_expansion] == p->read_page)
> > +                       {
> > +                               p->chunk_pages *= 2;
> > +                               p->chunk_expansion++;
> > +                       }
> > +                       chunk_pages = p->chunk_pages;
> > +
> > +                       /* The next reader will start after this chunk. */
> > +                       p->read_page += chunk_pages;
> > +               }
> > +               SpinLockRelease(&p->mutex);
> >
> > This looks more like the job of an lwlock rather than a spinlock.
> 
> I switched to the alternative algorithm mentioned in parentheses in
> the comment.  It uses a bit more space, but that loop is gone.  In my
> mind this is much like Parallel Seq Scan: we acquire a spinlock just
> to advance the block pointer.  The added complication is that we also
> check if the chunk size has changed, which clang renders as this many
> instructions:
> 
> postgres[0x10047eee0] <+176>:  movslq 0x144(%r15,%rbx), %rcx
> postgres[0x10047eee8] <+184>:  cmpl   0x140(%r15,%rbx), %ecx
> postgres[0x10047eef0] <+192>:  jge    0x10047ef16               ; <+230> at sharedtuplestore.c:489
> postgres[0x10047eef2] <+194>:  leaq   (%r15,%rbx), %rdx
> postgres[0x10047eef6] <+198>:  cmpl   %r12d, 0x40(%rdx,%rcx,8)
> postgres[0x10047eefb] <+203>:  jne    0x10047ef16               ; <+230> at sharedtuplestore.c:489
> postgres[0x10047eefd] <+205>:  leaq   0x144(%r15,%rbx), %rsi
> postgres[0x10047ef05] <+213>:  leal   0x1(%rcx), %edi
> postgres[0x10047ef08] <+216>:  movl   %edi, (%rsi)
> postgres[0x10047ef0a] <+218>:  movl   0x44(%rdx,%rcx,8), %ecx
> postgres[0x10047ef0e] <+222>:  movl   %ecx, 0x148(%r15,%rbx)
> postgres[0x10047ef16] <+230>:  movl   0x148(%r15,%rbx), %r15d
> 
> That should be OK, right?

It's not too bad. Personally I'm of the opinion though that pretty much
no new spinlocks should be added - their worst-case performance
characteristics are bad enough for that to be only worth the
experimentation in cases where each cycle really matters and where
contention is unlikely.


> > One day we're going to need a better approach to this. I have no idea
> > how, but this per-node, and now per_node * max_parallelism, approach has
> > only implementation simplicity as its benefit.
> 
> I agree, and I am interested in that subject.  In the meantime, I
> think it'd be pretty unfair if parallel-oblivious hash join and
> sort-merge join and every other parallel plan get to use work_mem * p
> (and in some cases waste it with duplicate data), but Parallel Hash
> isn't allowed to do the same (and put it to good use).

I'm not sure I care about fairness between pieces of code ;) 

> > -                               node->hj_JoinState = HJ_NEED_NEW_OUTER;
> > +                               if (hashtable->parallel_state)
> > +                               {
> > +                                       Barrier    *build_barrier;
> > +
> > +                                       build_barrier = &hashtable->parallel_state->build_barrier;
> > +                                       if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
> > +                                       {
> > +                                               /*
> > +                                                * If multi-batch, we need to hash the outer relation
> > +                                                * up front.
> > +                                                */
> > +                                               if (hashtable->nbatch > 1)
> > +                                                       ExecParallelHashJoinPartitionOuter(node);
> > +                                               BarrierArriveAndWait(build_barrier,
> > +                                                                        WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
> > +                                       }
> > +                                       Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
> > +
> > +                                       /* Each backend should now select a batch to work on. */
> > +                                       hashtable->curbatch = -1;
> > +                                       node->hj_JoinState = HJ_NEED_NEW_BATCH;
> > +
> > +                                       continue;
> > +                               }
> > +                               else
> > +                                       node->hj_JoinState = HJ_NEED_NEW_OUTER;
> >
> > You know what I'm going to say about all these branches, and sigh.
> 
> BTW this is not per-tuple code -- it runs once at the end of hashing.
> Not sure what you're looking for here.

It was more a general statement about all the branches in nodeHashjoin,
than about these specific branches. Should've made that clearer. There's
definitely branches in very common parts:

        case HJ_NEED_NEW_OUTER:

            /*
             * We don't have an outer tuple, try to get the next one
             */
            if (hashtable->parallel_state)
                outerTupleSlot =
                    ExecParallelHashJoinOuterGetTuple(outerNode, node,
                                                      &hashvalue);
            else
                outerTupleSlot =
                    ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);


I don't think you should do so now, but I think a reasonable approach
here would be to move the HJ_BUILD_HASHTABLE code into a separate
function (it really can't be hot). Then have specialized ExecHashJoin()
versions for parallel/non-parallel and potentially for outer/inner/anti.


> > If we don't split this into two versions, we at least should store
> > hashNode->parallel_state in a local var, so the compiler doesn't have to
> > pull that out of memory after every external function call (of which
> > there are a lot). In common cases it'll end up in a callee saved
> > registers, and most of the called functions won't be too register
> > starved (on x86-64).
> 
> Hmm.  Well I did that already in v24 -- in many places there is now a
> local variable called pstate.

See above piece of code, and a few others, in nodeHash.


> > I think it'd be better if we structured the file so we just sat guc's
> > with SET LOCAL inside a transaction.
> 
> I wrapped the whole region of join.sql concerned with hash joins in a
> transaction that rolls back, so I don't have to write LOCAL.  That's
> just as good, right?

Not really imo. Being able to read a test without going through all
previous ones is a lot better.

Greetings,

Andres Freund


Re: [HACKERS] Parallel Hash take II

From
Robert Haas
Date:
On Tue, Nov 14, 2017 at 4:24 PM, Andres Freund <andres@anarazel.de> wrote:
>> I agree, and I am interested in that subject.  In the meantime, I
>> think it'd be pretty unfair if parallel-oblivious hash join and
>> sort-merge join and every other parallel plan get to use work_mem * p
>> (and in some cases waste it with duplicate data), but Parallel Hash
>> isn't allowed to do the same (and put it to good use).
>
> I'm not sure I care about fairness between pieces of code ;)

I realize you're sort of joking here, but I think it's necessary to
care about fairness between pieces of code.

I mean, the very first version of this patch that Thomas submitted was
benchmarked by Rafia and had phenomenally good performance
characteristics.  That turned out to be because it wasn't respecting
work_mem; you can often do a lot better with more memory, and
generally you can't do nearly as well with less.  To make comparisons
meaningful, they have to be comparisons between algorithms that use
the same amount of memory.  And it's not just about testing.  If we
add an algorithm that will run twice as fast with equal memory but
only allow it half as much, it will probably never get picked and the
whole patch is a waste of time.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
On 2017-11-15 08:37:11 -0500, Robert Haas wrote:
> On Tue, Nov 14, 2017 at 4:24 PM, Andres Freund <andres@anarazel.de> wrote:
> >> I agree, and I am interested in that subject.  In the meantime, I
> >> think it'd be pretty unfair if parallel-oblivious hash join and
> >> sort-merge join and every other parallel plan get to use work_mem * p
> >> (and in some cases waste it with duplicate data), but Parallel Hash
> >> isn't allowed to do the same (and put it to good use).
> >
> > I'm not sure I care about fairness between pieces of code ;)
> 
> I realize you're sort of joking here, but I think it's necessary to
> care about fairness between pieces of code.

Indeed I kinda was.


> I mean, the very first version of this patch that Thomas submitted was
> benchmarked by Rafia and had phenomenally good performance
> characteristics.  That turned out to be because it wasn't respecting
> work_mem; you can often do a lot better with more memory, and
> generally you can't do nearly as well with less.  To make comparisons
> meaningful, they have to be comparisons between algorithms that use
> the same amount of memory.  And it's not just about testing.  If we
> add an algorithm that will run twice as fast with equal memory but
> only allow it half as much, it will probably never get picked and the
> whole patch is a waste of time.

But this does bug me, and I think it's what made me pause here to make a
bad joke.  The way that parallelism treats work_mem makes it even more
useless of a config knob than it was before.  Parallelism, especially
after this patch, shouldn't compete / be benchmarked against a
single-process run with the same work_mem. To make it "fair" you need to
compare parallelism against a single threaded run with work_mem *
max_parallelism.

Thomas argues that this makes hashjoins be treated fairly vis-a-vis
parallel-oblivious hash join etc. And I think he has somewhat of a
point. But I don't think it's quite right either: In several of these
cases the planner will not prefer the multi-process plan because it uses
more work_mem; that's a cost to be paid. Whereas this'll optimize towards
using work_mem * max_parallel_workers_per_gather amount of memory.

This makes it pretty much impossible to afterwards tune work_mem on a
server in a reasonable manner. Previously you'd tune it to something
like free_server_memory - (max_connections * work_mem *
80%_most_complex_query). Which you can't really do anymore now, you'd
also need to multiply by max_parallel_workers_per_gather. Which means
that you might end up "forcing" parallelism on a bunch of plans that'd
normally execute in too short a time to make parallelism worth it.


I don't really have a good answer to "but what should we otherwise do",
but I'm doubtful this is quite the right answer.


Greetings,

Andres Freund


Re: [HACKERS] Parallel Hash take II

From
Peter Geoghegan
Date:
On Wed, Nov 15, 2017 at 10:35 AM, Andres Freund <andres@anarazel.de> wrote:
>> I realize you're sort of joking here, but I think it's necessary to
>> care about fairness between pieces of code.
>
> Indeed I kinda was.

When I posted v1 of parallel CREATE INDEX, it followed the hash join
model of giving workMem (maintenance_work_mem) to every worker. Robert
suggested that my comparison with a serial case was therefore not
representative, since I was using much more memory. I actually changed
the patch to use a single maintenance_work_mem share for the entire
operation afterwards, which seemed to work better. And, it made very
little difference to performance for my original benchmark in the end,
so I was arguably wasting memory in v1.

>> I mean, the very first version of this patch that Thomas submitted was
>> benchmarked by Rafia and had phenomenally good performance
>> characteristics.  That turned out to be because it wasn't respecting
>> work_mem; you can often do a lot better with more memory, and
>> generally you can't do nearly as well with less.  To make comparisons
>> meaningful, they have to be comparisons between algorithms that use
>> the same amount of memory.  And it's not just about testing.  If we
>> add an algorithm that will run twice as fast with equal memory but
>> only allow it half as much, it will probably never get picked and the
>> whole patch is a waste of time.

The contrast with the situation with Thomas and his hash join patch is
interesting. Hash join is *much* more sensitive to the availability of
memory than a sort operation is.

> I don't really have a good answer to "but what should we otherwise do",
> but I'm doubtful this is quite the right answer.

I think that the work_mem model should be replaced by something that
centrally budgets memory. It would make sense to be less generous with
sorts and more generous with hash joins when memory is in short
supply, for example, and a model like this can make that possible. The
work_mem model has always forced users to be far too conservative.
Workloads are very complicated, and always having users target the
worst case leaves a lot to be desired.

-- 
Peter Geoghegan


Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,

On 2017-11-15 10:57:35 -0800, Peter Geoghegan wrote:
> > I don't really have a good answer to "but what should we otherwise do",
> > but I'm doubtful this is quite the right answer.
> 
> I think that the work_mem model should be replaced by something that
> centrally budgets memory. It would make sense to be less generous with
> sorts and more generous with hash joins when memory is in short
> supply, for example, and a model like this can make that possible. The
> work_mem model has always forced users to be far too conservative.
> Workloads are very complicated, and always having users target the
> worst case leaves a lot to be desired.

Obviously that's a nice and worthwhile goal, but it seems more than a bit
out of reach for this patchset.

Greetings,

Andres Freund


Re: [HACKERS] Parallel Hash take II

From
Robert Haas
Date:
On Wed, Nov 15, 2017 at 1:35 PM, Andres Freund <andres@anarazel.de> wrote:
> But this does bug me, and I think it's what made me pause here to make a
> bad joke.  The way that parallelism treats work_mem makes it even more
> useless of a config knob than it was before.  Parallelism, especially
> after this patch, shouldn't compete / be benchmarked against a
> single-process run with the same work_mem. To make it "fair" you need to
> compare parallelism against a single threaded run with work_mem *
> max_parallelism.

I don't really know how to do a fair comparison between a parallel
plan and a non-parallel plan.  Even if the parallel plan contains zero
nodes that use work_mem, it might still use more memory than the
non-parallel plan, because a new backend uses a bunch of memory.  If
you really want a comparison that is fair on the basis of memory
usage, you have to take that into account somehow.

But even then, the parallel plan is also almost certainly consuming
more CPU cycles to produce the same results.  Parallelism is all about
trading away efficiency for execution time.  Not just because of
current planner and executor limitations, but intrinsically, parallel
plans are less efficient.  The globally optimal solution on a system
that is short on either memory or CPU cycles is to turn parallelism
off.

> Thomas argues that this makes hashjoins be treated faily vis-a-vi
> parallel-oblivious hash join etc. And I think he has somewhat of a
> point. But I don't think it's quite right either: In several of these
> cases the planner will not prefer the multi-process plan because it uses
> more work_mem, it's a cost to be paid. Whereas this'll optimize towards
> using work_mem * max_parallel_workers_per_gather amount of memory.

In general, we have no framework for evaluating the global impact on
the system of our decisions.  Not just with parallelism, but in
general, plans that use memory are going to typically beat plans that
don't, because using more memory is a good way to make things run
faster, so the cost goes down, and the cost is what matters.
Everything optimizes for eating as many resources as possible, even if
there is only an extremely marginal gain over a more costly plan that
uses dramatically fewer resources.

A good example of this is a parallel bitmap heap scan vs. a parallel
sequential scan.  With enough workers, we switch from having one
worker scan the index and then having all workers do a joint scan of
the heap -- to just performing a parallel sequential scan.  Because of
Moore's law, as you add workers, waiting for the non-parallel index
scan to build the bitmap eventually looks less desirable than
accepting that you're going to uselessly scan a lot of pages -- you
stop caring, because you have enough raw power to just blast through
it.

The best non-parallel example I can think of off-hand is sorts.
Sometimes, reducing the amount of memory available for a sort really
doesn't cost very much in terms of execution time, but we always run a
sort with the full allotted work_mem, even if that's gigantic.  We
won't use it all if the data is smaller than work_mem, but if there's
batching going on then we will, even if it doesn't really help.

> This makes it pretty much impossible to afterwards tune work_mem on a
> server in a reasonable manner. Previously you'd tune it to something
> like free_server_memory - (max_connections * work_mem *
> 80%_most_complex_query). Which you can't really do anymore now, you'd
> also need to multiply by max_parallel_workers_per_gather. Which means
> that you might end up "forcing" paralellism on a bunch of plans that'd
> normally execute in too short a time to make parallelism worth it.

I think you just need to use max_connections +
Min(max_parallel_workers, max_worker_processes) instead of
max_connections.  You can't use parallel query for every query at the
same time with reasonable settings...
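
To make that concrete with purely illustrative numbers (hypothetical
settings, not recommendations): with max_connections = 100, work_mem =
32MB, max_parallel_workers = 8 and max_worker_processes = 8, the number
of processes that can be running work_mem-hungry nodes at once is

    max_connections + Min(max_parallel_workers, max_worker_processes)
        = 100 + 8
        = 108

so the budgeting formula quoted above would use 108 rather than 100 as
the multiplier for work_mem * 80%_most_complex_query.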

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Thu, Nov 16, 2017 at 7:35 AM, Andres Freund <andres@anarazel.de> wrote:
> On 2017-11-15 08:37:11 -0500, Robert Haas wrote:
>> I mean, the very first version of this patch that Thomas submitted was
>> benchmarked by Rafia and had phenomenally good performance
>> characteristics.  That turned out to be because it wasn't respecting
>> work_mem; you can often do a lot better with more memory, and
>> generally you can't do nearly as well with less.  To make comparisons
>> meaningful, they have to be comparisons between algorithms that use
>> the same amount of memory.  And it's not just about testing.  If we
>> add an algorithm that will run twice as fast with equal memory but
>> only allow it half as much, it will probably never get picked and the
>> whole patch is a waste of time.
>
> But this does bug me, and I think it's what made me pause here to make a
> bad joke.  The way that parallelism treats work_mem makes it even more
> useless of a config knob than it was before.  Parallelism, especially
> after this patch, shouldn't compete / be benchmarked against a
> single-process run with the same work_mem. To make it "fair" you need to
> compare parallelism against a single threaded run with work_mem *
> max_parallelism.
>
> Thomas argues that this makes hashjoins be treated faily vis-a-vi
> parallel-oblivious hash join etc. And I think he has somewhat of a
> point. But I don't think it's quite right either: In several of these
> cases the planner will not prefer the multi-process plan because it uses
> more work_mem, it's a cost to be paid. Whereas this'll optimize towards
> using work_mem * max_parallel_workers_per_gather amount of memory.
>
> This makes it pretty much impossible to afterwards tune work_mem on a
> server in a reasonable manner. Previously you'd tune it to something
> like free_server_memory - (max_connections * work_mem *
> 80%_most_complex_query). Which you can't really do anymore now, you'd
> also need to multiply by max_parallel_workers_per_gather. Which means
> that you might end up "forcing" paralellism on a bunch of plans that'd
> normally execute in too short a time to make parallelism worth it.

Currently our way of choosing the number of workers is 'rule based':
we use a simple formula that takes relation sizes and some GUCs and
per-relation options as inputs.  The comparison against non-parallel
plans is cost based of course, but we won't consider any other number
of workers.

Suppose we had 'cost based' worker number selection instead:  simply
try planning with various different worker counts and pick the winner.
Then I think we'd see the moral hazard in this scheme more clearly:
the planner effectively has a free memory printing press.  It will
think that it's a good idea to use a huge number of workers to get
more and more work_mem-sized hash tables or in-memory sorts into
memory (whether that's with partition-wise join, Parallel Hash, or
something else).

We could switch to a model where work_mem is divided by the number of
workers.  Parallel Hash would be able to use a full work_mem by
combining them, and parallel-oblivious Hash would be able to use only
work_mem / participants.  That'd be the other way to give Parallel
Hash a fair amount of memory compared to the competition, but I didn't
propose that because it'd be a change to the already-released
behaviour.  Then I'd have been saying "hey, look at this new plan I
wrote, it performs really well if you first tie the other plans' shoe
laces together".  It may actually be better though, even without
Parallel Hash in the picture.

-- 
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Thu, Nov 16, 2017 at 7:57 AM, Peter Geoghegan <pg@bowt.ie> wrote:
> The contrast with the situation with Thomas and his hash join patch is
> interesting. Hash join is *much* more sensitive to the availability of
> memory than a sort operation is.
>
>> I don't really have a good answer to "but what should we otherwise do",
>> but I'm doubtful this is quite the right answer.
>
> I think that the work_mem model should be replaced by something that
> centrally budgets memory. It would make sense to be less generous with
> sorts and more generous with hash joins when memory is in short
> supply, for example, and a model like this can make that possible. The
> work_mem model has always forced users to be far too conservative.
> Workloads are very complicated, and always having users target the
> worst case leaves a lot to be desired.

In the old days, Oracle had only simple per-operation memory limits
too, and that applied to every operation in every thread just like our
work_mem.  It's interesting that they had separate knobs for sort and
hash though, and defaulted to giving hash twice as much.

With a whole-plan memory target, our planner would probably begin to
plan join order differently to minimise the number of hash tables in
memory at once, like other RDBMSs.  Not sure how the plan-wide target
should work though -- try many plans, giving different portions of
budget to different subplans?  That should work fine if you like
O(please-melt-my-computer), especially if combined with a similar
approach to choosing worker numbers.  Some kind of feedback system?
Seems like a different kind of planner, but I have no clue.  If you
have ideas/papers/references, it'd be great to see a new thread on
that subject.

-- 
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Thu, Nov 16, 2017 at 8:09 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> On Wed, Nov 15, 2017 at 1:35 PM, Andres Freund <andres@anarazel.de> wrote:
>> But this does bug me, and I think it's what made me pause here to make a
>> bad joke.  The way that parallelism treats work_mem makes it even more
>> useless of a config knob than it was before.  Parallelism, especially
>> after this patch, shouldn't compete / be benchmarked against a
>> single-process run with the same work_mem. To make it "fair" you need to
>> compare parallelism against a single threaded run with work_mem *
>> max_parallelism.
>
> I don't really know how to do a fair comparison between a parallel
> plan and a non-parallel plan.  Even if the parallel plan contains zero
> nodes that use work_mem, it might still use more memory than the
> non-parallel plan, because a new backend uses a bunch of memory.  If
> you really want a comparison that is fair on the basis of memory
> usage, you have to take that into account somehow.
>
> But even then, the parallel plan is also almost certainly consuming
> more CPU cycles to produce the same results.  Parallelism is all about
> trading away efficiency for execution time.  Not just because of
> current planner and executor limitations, but intrinsically, parallel
> plans are less efficient.  The globally optimal solution on a system
> that is short on either memory or CPU cycles is to turn parallelism
> off.

The guys who worked on the first attempt at Parallel Query for
Berkeley POSTGRES (and then ripped that out, moving to another project
called XPRS which I have found no trace of, perhaps it finished up in
some commercial RDBMS) wrote this[1]:

"The objective function that XPRS uses for query optimization is a
combination of resource consumption and response time as follows:
 cost = resource consumption + w * response time

Here w is a system-specific weighting factor. A small w mostly
optimizes resource consumption, while a large w mostly optimizes
response time. Resource consumption is measured by the number of disk
pages accessed and number of tuples processed, while response time is
the elapsed time for executing the query."

[1] http://db.cs.berkeley.edu/papers/ERL-M93-28.pdf

-- 
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,

On 2017-11-15 14:09:13 -0500, Robert Haas wrote:
> On Wed, Nov 15, 2017 at 1:35 PM, Andres Freund <andres@anarazel.de> wrote:
> > But this does bug me, and I think it's what made me pause here to make a
> > bad joke.  The way that parallelism treats work_mem makes it even more
> > useless of a config knob than it was before.  Parallelism, especially
> > after this patch, shouldn't compete / be benchmarked against a
> > single-process run with the same work_mem. To make it "fair" you need to
> > compare parallelism against a single threaded run with work_mem *
> > max_parallelism.
> 
> I don't really know how to do a fair comparison between a parallel
> plan and a non-parallel plan.  Even if the parallel plan contains zero
> nodes that use work_mem, it might still use more memory than the
> non-parallel plan, because a new backend uses a bunch of memory.  If
> you really want a comparison that is fair on the basis of memory
> usage, you have to take that into account somehow.

That's not quite what I'm concerned about.  Consider something
(completely artificial) like:

tpch_5[18786][1]=# SET work_mem = '50MB';
tpch_5[18786][1]=# EXPLAIN SELECT c_name, count(*) FROM orders JOIN customer ON (o_custkey = c_custkey) WHERE
o_orderdate BETWEEN '1995-01-01' AND '1995-01-05' GROUP BY 1 ORDER BY count(*) DESC LIMIT 10;
                                                       QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=77344.16..77344.19 rows=10 width=27)
   ->  Sort  (cost=77344.16..77379.68 rows=14206 width=27)
         Sort Key: (count(*)) DESC
         ->  HashAggregate  (cost=76895.12..77037.18 rows=14206 width=27)
               Group Key: customer.c_name
               ->  Hash Join  (cost=35347.04..76824.09 rows=14206 width=19)
                     Hash Cond: (orders.o_custkey = customer.c_custkey)
                     ->  Bitmap Heap Scan on orders  (cost=302.04..41599.74 rows=14206 width=4)
                           Recheck Cond: ((o_orderdate >= '1995-01-01'::date) AND (o_orderdate <= '1995-01-05'::date))
                           ->  Bitmap Index Scan on i_o_orderdate  (cost=0.00..298.49 rows=14206 width=0)
                                 Index Cond: ((o_orderdate >= '1995-01-01'::date) AND (o_orderdate <= '1995-01-05'::date))
                     ->  Hash  (cost=25670.00..25670.00 rows=750000 width=23)
                           ->  Seq Scan on customer  (cost=0.00..25670.00 rows=750000 width=23)
(13 rows)

tpch_5[18786][1]=# SET work_mem = '10MB';
tpch_5[18786][1]=# EXPLAIN SELECT c_name, count(*) FROM orders JOIN customer ON (o_custkey = c_custkey) WHERE
o_orderdate BETWEEN '1995-01-01' AND '1995-01-05' GROUP BY 1 ORDER BY count(*) DESC LIMIT 10;
                                                          QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=82847.92..82847.94 rows=10 width=27)
   ->  Sort  (cost=82847.92..82883.43 rows=14206 width=27)
         Sort Key: (count(*)) DESC
         ->  HashAggregate  (cost=82398.87..82540.93 rows=14206 width=27)
               Group Key: customer.c_name
               ->  Merge Join  (cost=42580.44..82327.84 rows=14206 width=19)
                     Merge Cond: (customer.c_custkey = orders.o_custkey)
                     ->  Index Scan using i_c_custkey on customer  (cost=0.42..37663.43 rows=750000 width=23)
                     ->  Sort  (cost=42579.54..42615.05 rows=14206 width=4)
                           Sort Key: orders.o_custkey
                           ->  Bitmap Heap Scan on orders  (cost=302.04..41599.74 rows=14206 width=4)
                                 Recheck Cond: ((o_orderdate >= '1995-01-01'::date) AND (o_orderdate <= '1995-01-05'::date))
                                 ->  Bitmap Index Scan on i_o_orderdate  (cost=0.00..298.49 rows=14206 width=0)
                                       Index Cond: ((o_orderdate >= '1995-01-01'::date) AND (o_orderdate <= '1995-01-05'::date))
(14 rows)

Note how the plan switched from a hashjoin to a mergejoin solely on the
basis of different work_mem settings, and that there are obviously
different costs associated with the two plans.

What I'm basically worried about is that the *only* reason for some
plans to choose to use parallelism is that, essentially, the effective
amount of work_mem differs between the plans: the parallel one uses
(max_parallel_workers_per_gather + 1) * work_mem. Which might push
queries to use parallelism even if it's not actually beneficial in
reducing runtime.


Thomas' earlier comparison of this behaviour with e.g. parallel
oblivious hash nodes does *NOT* seem apt to me. There's currently
effectively no cost pressure for/against parallelism for those (even if
there potentially should be). Which means they do not favor parallel
queries solely because they're allowed to use more memory, and thus it's
far less likely that each of those nodes uses the maximum allotted
work_mem.


I think it's wrong to just multiply the amount of work_mem that way, and
it'll bite us. Introducing a separate guc, perhaps inheriting from
work_mem if set to -1, that limits the amount of memory inside a
parallel node seems saner. That value then would not be multiplied by
the chosen worker number.


> But even then, the parallel plan is also almost certainly consuming
> more CPU cycles to produce the same results.  Parallelism is all about
> trading away efficiency for execution time.  Not just because of
> current planner and executor limitations, but intrinsically, parallel
> plans are less efficient.  The globally optimal solution on a system
> that is short on either memory or CPU cycles is to turn parallelism
> off.

I do think that our parallelism isn't properly tunable on that front.  I
think we really need something like a 'parallel_resource_efficiency'
[0..1] GUC.

As far as I understand it we currently cost a gather's startup cost
once, not per worker. That startup cost effectively includes redundant
work like materializing a table, building parallel oblivious hashtables,
resorting the same table for a mergejoin etc...

That's fine if your goal is solely to return a single query as fast as
possible, even if doubling the resources will only give you a minimal
cost advantage.  If instead your goal is to optimize both for individual
query performance and overall system throughput that's obviously not
good.

I think we should cost the startup cost of parallel nodes more like
startup_cost * (max_parallel_workers_per_gather * parallel_resource_efficiency + 1)

which'd allow tuning query performance to use parallelism even for
tiny benefits (parallel_resource_efficiency = 0), and conversely tune it
so it only gets used if the overall loss of efficiency is small
(parallel_resource_efficiency = 0.9 or such).

(skipping over a lot of  details for such a proposal)
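
To make the shape of that concrete, a minimal sketch (the GUC and the
function here are hypothetical, and the surrounding costing machinery is
hand-waved):

/* Hypothetical GUC, range 0.0 .. 1.0; not part of any current patch. */
double      parallel_resource_efficiency = 0.5;

/*
 * Sketch only: charge the redundant per-worker startup work in
 * proportion to how much we care about overall efficiency, rather than
 * charging it once regardless of the number of workers.
 */
static double
parallel_startup_cost(double startup_cost, int nworkers)
{
    return startup_cost * (nworkers * parallel_resource_efficiency + 1.0);
}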

Now, currently that'd have the weakness that we sometimes would end up
not using parallelism, because at the determined amount of parallelism
it's not beneficial to use it, even though it'd still be worthwhile to
use a lower level of parallelism. But given we currently don't plan for
multiple degrees of parallelism that seems the right thing to do if you
care about efficiency / overall throughput.


Greetings,

Andres Freund


Re: [HACKERS] Parallel Hash take II

From
Robert Haas
Date:
On Thu, Nov 16, 2017 at 6:42 PM, Andres Freund <andres@anarazel.de> wrote:
> What I'm basically worried about is that the *only* reason for some
> plans to choose to use parallelism is that essentially the effective
> amount of work_mem between the plans is that the parallel one uses
> (max_parallel_workers_per_gather + 1) * work_mem. Which might push
> queries to use parallelism even if it's not actually beneficial in
> reducing runtime.

I don't get it.  If we switch to using parallelism and the runtime
doesn't go down, that just means the costing is wrong.  The costing is
supposed to reflect the runtime.

It's true that adding parallel hash may tend to increase the amount of
memory that gets used in practice.  But it seems to me that any plan
that uses memory and is sometimes chosen over a non-memory-using plan
does that.

> Thomas' earlier comparison of this behaviour with e.g. parallel
> oblivious hash nodes does *NOT* seem apt to me. There's currently
> effectively no cost pressure for/against parallelism for those (even if
> there potentially should). Which means they do not favor parallel
> queries solely because they're allowed to use more memory, and thus it's
> far less likely that every of those nodes uses the maximum alloted
> work_mem.

I agree that there is no cost pressure for or against parallelism.
The design which I have been pursuing with parallel query up to this
point is that cost represents execution time, so minimizing cost means
minimizing execution time, and that's the goal.  If we want, we can
put a tax on parallel query plans, so that they're only chosen when
they are *substantially* cheaper than non-parallel plans, or even that
a plan with a few workers is to be preferred over a plan with more
workers unless the gains are sufficiently substantial.  But I don't
think the right way to do that is to prevent a parallel hash from
using as much memory as a non-parallel hash in the same spot would
use.

Rather, what we could do is, for example, have a knob that multiplies
the cost of a partial path by a floating-point value when we insert a
Gather/Gather Merge node.  If you want to always pick the cheapest
path regardless of whether it uses parallelism, set the GUC to 1.0.
If you only want to pick a parallel query path if it's twice as cheap
as the best non-parallel path, set the GUC to 2.0.
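
A sketch of the shape of that knob (the GUC name is invented here, and
this is not how path comparison actually works today):

/* Hypothetical GUC: 1.0 means just pick the cheapest plan. */
double      parallel_plan_cost_multiplier = 1.0;

/*
 * Sketch: keep the completed parallel plan only if it beats the best
 * serial plan by a big enough margin.
 */
static bool
prefer_parallel_plan(double parallel_total_cost, double serial_total_cost)
{
    return parallel_total_cost * parallel_plan_cost_multiplier <
        serial_total_cost;
}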

> I think it's wrong to just multiply the amount of work_mem that way, and
> it'll bite use. Introducing a separate guc, perhaps inheriting from
> work_mem if set to -1, that limits the amount of memory inside a
> parallel node seems saner. That value then would not be multiplied with
> the chosen worker number.

I don't mind having an option to override the amount of memory that
parallel hash is allowed to use, but I'm also not yet convinced that
we have a real problem that needs solving.

> As far as I understand it we currently cost a gather's startup cost
> once, not per worker.

Yes - because cost is supposed to measure execution time, and the
workers start all at once, not sequentially.  The startup time for the
workers as a whole doesn't get much longer as the number of workers
rises.  It might be that the formula should be 1000 + 50/worker or
something rather than a constant, but I doubt it matters very much.

> That startup cost effectively include redundant
> work like materializing a table, building parallel oblivious hashtables,
> resorting the same table for a mergejoin etc...

Right, because that work all contributes to total execution time.
Actually, when the same work is going to be redone in multiple
workers, we should really inflate the cost, because if two workers
each sort the same table, it takes each of them longer than it would
take a single worker to sort the table for itself.  That's one of the
biggest problems with parallel costing right now, from what I've seen:
we're too willing to do the same work over in each of 4-6
participants, not realizing that they're going to contend for buffer
locks and I/O bandwidth.

> That's fine if your goal is solely to return a single query as fast as
> possible, even if doubling the resources will only give you a minimal
> cost advantage.  If instead your goal is to optimize both for individual
> query performance and overall system throughput that's obviously not
> good.

I agree.

> I think we should cost the startup cost of paralell nodes more like
> startup_cost * (max_parallel_workers_per_gather * parallel_resource_efficiency + 1)
>
> which'd allow to tune query performance for using parallelism even for
> tiny benefits (parallel_resource_efficiency = 0), and conversely tune it
> so it only gets used if the overall loss of efficiency is small
> (parallel_resource_efficiency = 0.9 or such).

No, I think that's wrong.  I think you need to apply the adjustment at
the end of the (parallel) planning process, not incrementally to each
node.  Otherwise, the costs assigned to the individual nodes become
some unholy amalgam of execution time and total resource expenditure.
I think that will cause strange and stupid plan choices.  See also
notes above.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,

On 2017-11-14 01:30:30 +1300, Thomas Munro wrote:
> New patch attached.

(I've committed some of the preliminary work)

Looking at 0005-Add-infrastructure-for-sharing-temporary-files-betwe.patch:
- The created path/filenames seem really redundant:
base/pgsql_tmp/pgsql_tmp11160.9.sharedfileset.d/pgsql_tmp.o3of8.p0.0
 Including pgsql_tmp no less than three times seems a bit absurd.
 I'm quite inclined to just remove all but the first.

- There seems to be a moment where we could leak temporary file directories:

File
SharedFileSetCreate(SharedFileSet *fileset, const char *name)
{
    char        path[MAXPGPATH];
    File        file;

    SharedFilePath(path, fileset, name);
    file = PathNameCreateTemporaryFile(path, false);

    /* If we failed, see if we need to create the directory on demand. */
    if (file <= 0)
    {
        char        tempdirpath[MAXPGPATH];
        char        filesetpath[MAXPGPATH];
        Oid         tablespace = ChooseTablespace(fileset, name);

        TempTablespacePath(tempdirpath, tablespace);
        SharedFileSetPath(filesetpath, fileset, tablespace);
        PathNameCreateTemporaryDir(tempdirpath, filesetpath);
        file = PathNameCreateTemporaryFile(path, true);
    }

    return file;
}

  The resowner handling is done in PathNameCreateTemporaryFile(). But if
  we fail after creating the directory we'll not have a resowner for
  that. That's probably not too bad.
 

- related to the last point, I'm kinda wondering why we need sub-fileset
  resowners? Given we're dealing with error paths in resowners I'm not
  quite seeing the point - we're not going to want to roll back
  sub-parts of a fileset, no?
 

- If we want to keep these resowners, shouldn't we unregister them in PathNameDeleteTemporaryFile?

- PathNameCreateTemporaryFile() and OpenTemporaryFile() now overlap quite
  a bit. Can't we rejigger things to base the second on the first? At the
  very least the comments need to work out the difference more closely.
 

- It's not clear to me why it's correct to have the vfdP->fdstate &
  FD_TEMPORARY handling in FileClose() be independent of the file being
  deleted. At the very least there needs to be a comment explaining why
  we chose that behaviour.
 

- I think we need to document somewhere that the temp_file_limit in a
  shared file set applies independently for each participant that's
  writing something.  We also should discuss whether that's actually
  sane behaviour.
 

Greetings,

Andres Freund


Re: [HACKERS] Parallel Hash take II

From
Peter Geoghegan
Date:
On Fri, Nov 17, 2017 at 1:55 PM, Andres Freund <andres@anarazel.de> wrote:
> Looking at 0005-Add-infrastructure-for-sharing-temporary-files-betwe.patch:
> - The created path/filenames seem really redundant:
>   base/pgsql_tmp/pgsql_tmp11160.9.sharedfileset.d/pgsql_tmp.o3of8.p0.0
>
>   Including pgsql_tmp no less than three times seems a bit absurd.
>
>   I'm quite inclined to just remove all but the first.

+1

> - It's not clear to me why it's correct to have the vfdP->fdstate & FD_TEMPORARY
>   handling in FileClose() be independent of the file being deleted. At
>   the very least there needs to be a comment explaining why we chose
>   that behaviour.

Isn't that just because only one backend is supposed to delete the
file, but they all must close it and do temp_file_limit accounting?
Sorry if I missed something (my explanation seems too obvious to be
correct).

> - I think we need to document somehwere that the temp_file_limit in a
>   shared file set applies independently for each participant that's
>   writing something.  We also should discuss whether that's actually
>   sane behaviour.

This is already the documented behavior of temp_file_limit, fwiw.

Another question for Thomas: Is it okay that routines like
BufFileOpenShared() introduce new palloc()s (not repalloc()s) to
buffile.c, given that struct BufFile already contains this?:

/*
 * resowner is the ResourceOwner to use for underlying temp files.  (We
 * don't need to remember the memory context we're using explicitly,
 * because after creation we only repalloc our arrays larger.)
 */
ResourceOwner resowner;

Maybe we need to remember the original caller's memory context, too?
Either that, or the contract/comments for memory contexts need to be
revised.

-- 
Peter Geoghegan


Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,

On 2017-11-14 01:30:30 +1300, Thomas Munro wrote:
> New patch attached.

0002-Add-a-barrier-primitive-for-synchronizing-backends.patch
- Intro starts with:
+ *
+ * This implementation of barriers allows for static sets of participants
+ * known up front, or dynamic sets of participants which processes can join or
  that's a bit low-level, no? Should explain what they're for, not
  everyone's going to be familiar with the concept.
 

- The comment for BarrierInit() reads a bit weird:
+ * Initialize this barrier, setting a static number of participants that we
+ * will wait for at each computation phase.  To use a dynamic number of
+ * participants, this number should be zero, and BarrierAttach and
  Set a static number! Except maybe not?

- I'm not entirely convinced that we want the barrier debug stuff merged,
  what are your feelings about that?  It's like half the code, and adds
  some complexity to the non debug code... If we indeed want to keep it,
  it probably should be documented in config.sgml? And get an entry in
  pg_config_manual.h?
 

- Can we add assertions ensuring nobody attaches to a static barrier?

- If I understand correctly currently the first participant to arrive at
  a barrier is going to be selected, and the last wakes everyone up.
  Wouldn't it be better to have the last arrived participant be selected?
  It already got a scheduler timeslice, its memory is most likely to be
  in cache etc? Given that in cases where selection plays a role it's
  going to be blocking everyone else, using the process most likely to
  finish first seems like a good idea.  I guess the BarrierDetach()
  implementation would be slightly more complex?
 


Regards,

Andres


Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
Hi Andres and Peter,

Here's a new patch set with responses to the last batch of review comments.

On Wed, Nov 15, 2017 at 10:24 AM, Andres Freund <andres@anarazel.de> wrote:
> Hm. The way you access this doesn't quite seem right:
> ...
> +      matches := regexp_matches(line, '  Batches: ([0-9]+)');
> ...
>
> Why not use format json and access the output that way? Then you can be
> sure you access the right part of the tree and such?

Okay, done.

>> 1.  It determines the amount of unfairness when we run out of data:
>> it's the maximum amount of extra data that the unlucky last worker can
>> finish up with after all the others have finished.  I think this
>> effect is reduced by higher level factors: when a reader runs out of
>> data in one backend's file, it'll start reading another backend's
>> file.  If it's hit the end of all backends' files and this is an outer
>> batch, Parallel Hash will just go and work on another batch
>> immediately.
>
> Consider e.g. what happens if there's the occasional 500MB datum, and
> the rest's very small...
>
>> Better ideas?
>
> Not really. I'm more than a bit suspicous of this solution, but I don't
> really have a great suggestion otherwise.  One way to combat extreme
> size skew would be to put very large datums into different files.

Right.  I considered opening a separate file for each chunk size (4
page, 8 page, 16 page, ...).  Then each file has uniform chunk size,
so you're not stuck with a large chunk size after one monster tuple
comes down the pipe.  I didn't propose that because it leads to even
more file descriptors being used.  I'd like to move towards fewer file
descriptors, because it's a cap on the number of partitions you can
reasonably use.  Perhaps in future we could develop some kind of
general purpose file space manager that would let us allocate extents
within a file, and then SharedTuplestore could allocate extents for
each chunk size.  Not today though.

> But I think we probably can go with your approach for now, ignoring my
> failure prone spidey senses ;)

Cool.

>> > This looks more like the job of an lwlock rather than a spinlock.
>>
>> ... assembler ...
>>
>> That should be OK, right?
>
> It's not too bad. Personally I'm of the opinion though that pretty much
> no new spinlocks should be added - their worst case performance
> characteristics are bad enough for that to be only worth the
> experimentation in case swhere each cycle really matters and where
> contention is unlikely.

Changed to LWLock.

>> > One day we're going to need a better approach to this. I have no idea
>> > how, but this per-node, and now per_node * max_parallelism, approach has
>> > only implementation simplicity as its benefit.
>>
>> I agree, and I am interested in that subject.  In the meantime, I
>> think it'd be pretty unfair if parallel-oblivious hash join and
>> sort-merge join and every other parallel plan get to use work_mem * p
>> (and in some cases waste it with duplicate data), but Parallel Hash
>> isn't allowed to do the same (and put it to good use).
>
> I'm not sure I care about fairness between pieces of code ;)

I'll leave that discussion to run in a separate subthread...

>> BTW this is not per-tuple code -- it runs once at the end of hashing.
>> Not sure what you're looking for here.
>
> It was more a general statement about all the branches in nodeHashjoin,
> than about these specific branches. Should've made that clearer. There's
> definitely branches in very common parts:
>
> ...
>
> I don't think you should do so now, but I think a reasonable approach
> here would be to move the HJ_BUILD_HASHTABLE code into a separate
> function (it really can't be hot). Then have specialized ExecHashJoin()
> versions for parallel/non-parallel and potentially for outer/inner/anti.

Okay, here's my proposal for how to get new branches out of the
per-tuple path.  I have separated ExecHashJoin() and
ExecParallelHashJoin() functions.  They call an inline function
ExecHashJoinImpl() with a constant parameter, so that I effectively
instantiate two variants with the unwanted branches removed by
constant folding.  Then the appropriate variant is installed as the
ExecProcNode function pointer.

Just "inline" wasn't enough though.  I defined
pg_attribute_always_inline to force that on GCC/Clang et al and MSVC.
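
In outline the result looks something like this (a simplified sketch of
the shape of the code, not the patch itself):

static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
    /*
     * The full hash join state machine lives here.  Because "parallel"
     * is a compile-time constant at each call site below, branches like
     * the following are folded away when this function is inlined:
     */
    if (parallel)
    {
        /* parallel-aware build/probe, barriers, shared batches ... */
    }
    else
    {
        /* private hash table, as before ... */
    }

    return NULL;                /* placeholder for the real result */
}

static TupleTableSlot *
ExecHashJoin(PlanState *pstate)
{
    return ExecHashJoinImpl(pstate, false);
}

static TupleTableSlot *
ExecParallelHashJoin(PlanState *pstate)
{
    return ExecHashJoinImpl(pstate, true);
}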

I also created a separate function ExecParallelScanHashBucket().  I
guess I could have extended the above trick into ExecScanHashBucket()
and further too, but it's called from a different translation unit,
and I also don't want to get too carried away with this technique.  I
chose to supply different functions.

So -- that's introducing a couple of new techniques into the tree.
Treating ExecProcNode as a configuration point for a single node type,
and the function instantiation trick.  Thoughts?

>> > If we don't split this into two versions, we at least should store
>> > hashNode->parallel_state in a local var, so the compiler doesn't have to
>> > pull that out of memory after every external function call (of which
>> > there are a lot). In common cases it'll end up in a callee saved
>> > registers, and most of the called functions won't be too register
>> > starved (on x86-64).
>>
>> Hmm.  Well I did that already in v24 -- in many places there is now a
>> local variable called pstate.
>
> See above piece of code, and a few others, in nodeHash.

I tackled some more of these.

>> > I think it'd be better if we structured the file so we just sat guc's
>> > with SET LOCAL inside a transaction.
>>
>> I wrapped the whole region of join.sql concerned with hash joins in a
>> transaction that rolls back, so I don't have to write LOCAL.  That's
>> just as good, right?
>
> Not really imo. Being able to read a test without going through all
> previous ones is a lot better.

Added savepoint/rollback to savepoint around each individual test.
You still need to do the setup at the top of the section (create
tables etc, set a couple of gucs).

On Sat, Nov 18, 2017 at 10:55 AM, Andres Freund <andres@anarazel.de> wrote:
> (I've commit some of the preliminary work)

Thanks!

> Looking at 0005-Add-infrastructure-for-sharing-temporary-files-betwe.patch:

[For things quoted and commented on by Peter, see further down]

> - There seems to be a moment where could leak temporary file
>   directories:
> [...]
>                 PathNameCreateTemporaryDir(tempdirpath, filesetpath);
>                 file = PathNameCreateTemporaryFile(path, true);
> [...]
>
>   The resowner handling is done in PathNameCreateTemporaryFile(). But if
>   we fail after creating the directory we'll not have a resowner for
>   that. That's probably not too bad.

It doesn't matter because the resowner handling in
PathNameCreateTemporaryFile() is only concerned with closing file
handles, not deleting stuff.  In this world cleanup is done by
SharedFileSet via DSM detach hooks, and that directory is already
covered.

> - related to the last point, I'm kinda wondering why we need sub-fileset
>   resowners? Given we're dealing with error paths in resowners I'm not
>   quite seeing the point - we're not going to want to roll back
>   sub-parts of of a fileset, no?

It's just to make sure the individual file handles don't leak.  Even
though this system takes care of deleting all the files and
directories, it's still a programming error to forget to close the
BufFile objects, and we want the resowner machinery to complain about
leaked File objects if you forget (unless error path, in which case it
just silently closes them).  That's not a change.

> - If we want to keep these resowners, shouldn't we unregister them in
>   PathNameDeleteTemporaryFile?

Unlinking the file is entirely separate from having it open.  See above.

> - PathNameCreateTemporaryFile() and OpenTemporaryFile() now overlap
>   quite a bit. Can't we rejigger things to base the second on the first?
>   At the very least the comments need to work out the difference more
>   closely.

Ok, I have refactored this.  There is now a static function
RegisterTemporaryFile() which contains the common part called by
OpenTemporaryFile(), PathNameCreateTemporaryFile() and
PathNameOpenTemporaryFile().  That ensures the file handle is closed
automatically via *two* mechanisms: resowner and AtEOXact_Files.

Those three functions put different flags into fdstate though:

OpenTemporaryFile(): FD_DELETE_AT_CLOSE | FD_TEMP_FILE_LIMIT
PathNameCreateTemporaryFile(): FD_TEMP_FILE_LIMIT
PathNameOpenTemporaryFile(): 0

Explanation:  The file handles get closed automatically in all three
cases (unless disabled with interXact).  Only the anonymous private
temporary files get deleted at close.  Named (shared) temporary files
are the caller's problem, and in this case SharedFileSet will unlink
them when it deletes the tree they live it.  Only files you created
count against your temporary file limit.  If you open a file someone
else created, we don't double-count it.

On Sat, Nov 18, 2017 at 11:22 AM, Peter Geoghegan <pg@bowt.ie> wrote:
> On Fri, Nov 17, 2017 at 1:55 PM, Andres Freund <andres@anarazel.de> wrote:
>> Looking at 0005-Add-infrastructure-for-sharing-temporary-files-betwe.patch:
>> - The created path/filenames seem really redundant:
>>   base/pgsql_tmp/pgsql_tmp11160.9.sharedfileset.d/pgsql_tmp.o3of8.p0.0
>>
>>   Including pgsql_tmp no less than three times seems a bit absurd.
>>
>>   I'm quite inclined to just remove all but the first.
>
> +1

Yeah.  That was because I wanted to apply the same function
recursively, and it only deletes things beginning with the prefix.  In
this version I pass down a flag to say that it should delete
everything after the first level.  That means that I removed the last
one from your example but kept the first two.  I had only added the
third.  The first two are prior art.

Now it looks like this:

  base/pgsql_tmp/pgsql_tmp84528.0.sharedfileset/o999of1024.p1.0

If you create stuff in there that doesn't start with pgsql_tmp then at
startup it refuses to delete it.  Someone might value that ancient
feature, so it should probably be discussed somewhere more visible
than this and handled in a different commit if you want to change
that, no?  Example:

2017-11-22 19:23:31.055 NZDT [84606] LOG:  unexpected file found in
temporary-files directory: "base/pgsql_tmp/random_name"

>> - It's not clear to me why it's correct to have the vfdP->fdstate & FD_TEMPORARY
>>   handling in FileClose() be independent of the file being deleted. At
>>   the very least there needs to be a comment explaining why we chose
>>   that behaviour.
>
> Isn't that just because only one backend is supposed to delete the
> file, but they all must close it and do temp_file_limit accounting?
> Sorry if I missed something (my explanation seems too obvious to be
> correct).

Stepping back: the goal of this exercise is to split up or replace
different behaviours associated with temporary files so they could be
enabled individually.  Deleting files: now done at DSM segment
destruction.  Closing file handles: done in every backend.  Tracking
temp file space usage: only done in backends that created a file
(private or shared), but not in backends that merely opened a file
created by someone else.  Does this make sense?

>> - I think we need to document somehwere that the temp_file_limit in a
>>   shared file set applies independently for each participant that's
>>   writing something.  We also should discuss whether that's actually
>>   sane behaviour.
>
> This is already the documented behavior of temp_file_limit, fwiw.

We could definitely come up with something better.  My proposal is
that each backend gets to create temp_file_limit worth of
temporary stuff and hold a file descriptor open.  When the file
descriptor is closed, it stops counting against the creator's file
limit, but it still exists.  It exists until someone deletes it, which
happens on rescan or end of join in my current patch.  It needs to
keep existing so that other people can open it.  A better system would
be one that applies temp_file_limit to your *session*, that is, your
leader process + any workers it might create.  That would require a
lot more book-keeping and communication than I have: writing to a file
would have to consume from a shared-memory counter or something.  Do
you think we need to do that?

> Another question for Thomas: Is it okay that routines like
> BufFileOpenShared() introduce new palloc()s (not repalloc()s) to
> buffile.c, given that struct BufFile already contains this?:
>
> /*
>  * resowner is the ResourceOwner to use for underlying temp files.  (We
>  * don't need to remember the memory context we're using explicitly,
>  * because after creation we only repalloc our arrays larger.)
>  */
> ResourceOwner resowner;
>
> Maybe we need to remember the original caller's memory context, too?
> Either that, or the contract/comments for memory contexts need to be
> revised.

BufFileCreateShared (new), BufFileOpenShared (new), BufFileCreateTemp
(old) are all constructors of BufFile objects.  They all use palloc
and thus capture the caller's CurrentMemoryContext implicitly.  I
continue that tradition in the new functions.  The new kinds of
BufFile don't ever allocate any more memory, but if they ever did
they'd need to either use repalloc or start capturing the memory
context during construction explicitly.

On Sat, Nov 18, 2017 at 12:20 PM, Andres Freund <andres@anarazel.de> wrote:
> 0002-Add-a-barrier-primitive-for-synchronizing-backends.patch
> - Intro starts with:
> + *
> + * This implementation of barriers allows for static sets of participants
> + * known up front, or dynamic sets of participants which processes can join or
>   that's a bit low-level, no? Should explain what they're for, not
>   everyone's going to be familiar with the concept.

I have added a one sentence description from the Wikipedia article
(with attribution).  We reference Wikipedia in a couple of other
places.

> - The comment for BarrierInit() reads a bit weird:
> + * Initialize this barrier, setting a static number of participants that we
> + * will wait for at each computation phase.  To use a dynamic number of
> + * participants, this number should be zero, and BarrierAttach and
>   Set a static number! Except maybe not?

Fixed.

> - I'm not entirely convinced that we want the barrier debug stuff
>   merged, what are your feelings about that?  It's like half the code,
>   and adds some complexity to the non debug code... If we indeed want to
>   keep it, it probably should be documented in config.sgml? And get an
>   entry in pg_config_manual.h?

I found it useful during development, but I've ripped it out of this version.

> - Can we add assertions ensuring nobody attaches to a static barrier?

Done.

> - If I understand correctly currently the first participant to arrive at
>   a barrier is going to be selected, and the last wakes everyone
>   up. Wouldn't it be better to do have the last arrived participant be
>   selected? It already got a scheduler timeslice, it's memory is most
>   likely to be in cache etc? Given that in cases where selection plays a
>   role it's going to be blocking everyone else, using the process most
>   likely to finish first seems like a good idea.  I guess the
>   BarrierDetach() implementation would be slightly more complex?

Good idea.  Now done that way.  BarrierDetach doesn't get more
complicated, but BarrierArriveAndWait() needs to be prepared to be
elected when it's woken up, just in case the barrier only advanced
because of someone detaching.  I changed to a model where instead of
setting and clearing a flag, I record the phase number when someone is
elected.  If that doesn't match the current phase then no one has been
elected yet.  Also, I standardised on 'elect' as the word for this.

-- 
Thomas Munro
http://www.enterprisedb.com

Attachment

Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Thu, Nov 23, 2017 at 12:36 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> Here's a new patch set with responses to the last batch of review comments.

Rebased on top of the recent SGML->XML change. Also:

1.  The final patch in the v26 patchset extended EXPLAIN ANALYZE
output to show per-worker information.  I'm withdrawing that patch for
now.  If you want to see how many tuples each backend hashed you can
already do that with (ANALYZE, VERBOSE).  It's a pre-existing bug that
you don't get batch/bucket/size info when Hash Join doesn't run in the
leader, and it's a pre-existing bug that EXPLAIN doesn't show
information for the leader separately.  I decided that it's not this
patchset's job to fix that stuff, and it's not entirely clear what the
best approach is anyway.  Let's discuss the way that information is
captured and displayed separately from the Parallel Hash feature.

2.  I found a way to crash v26 by starting a worker very late.  Fixed.

Unfortunately I saw a one-off case of an assertion failure in
ExecParallelHashRepartitionRest()/sts_begin_parallel_scan() on Travis
CI that I can't explain.  I haven't been able to reproduce it there or
on any other machine since.  I am still looking into it.

-- 
Thomas Munro
http://www.enterprisedb.com

Attachment

Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Mon, Nov 27, 2017 at 10:25 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> On Thu, Nov 23, 2017 at 12:36 AM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> Here's a new patch set with responses to the last batch of review comments.
>
> Rebased on top of the recent SGML->XML change.

Andres asked me off-list how I tested the barrier.c case where a
backend detaches, releasing other waiters.  There are special cases in
BarrierArriveAndWait() and BarrierDetach() for that to make sure that
the phase advances and waiters are released if they were only waiting
for this one backend to arrive, and that exactly one of them is
"elected" for any serial work.  Normally the last to arrive is elected
(see earlier discussion about why that's a good idea), but if the one
that would be last detaches instead of arriving then we'll have to
elect someone else.  Here is a throw-away test harness that can be
used to exercise that case.  CREATE EXTENSION test_barrier; SELECT
test_barrier_detach_releases(1);.  Adding a sleep before BarrierDetach
can be used to influence the race, and adding elog messages to
barrier.c can be used to see when the special path is taken.

-- 
Thomas Munro
http://www.enterprisedb.com

Attachment

Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
On 2017-11-30 14:17:51 +1300, Thomas Munro wrote:
> On Mon, Nov 27, 2017 at 10:25 PM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
> > On Thu, Nov 23, 2017 at 12:36 AM, Thomas Munro
> > <thomas.munro@enterprisedb.com> wrote:
> >> Here's a new patch set with responses to the last batch of review comments.
> >
> > Rebased on top of the recent SGML->XML change.
>
> Andres asked me off-list how I tested the barrier.c case where a
> backend detaches, releasing other waiters.  There are special cases in
> BarrierArriveAndWait() and BarrierDetach() for that to make sure that
> the phase advances and waiters are released if they were only waiting
> for this one backend to arrive, and that exactly one of them is
> "elected" for any serial work.  Normally the last to arrive is elected
> (see earlier discussion about why that's a good idea), but if the one
> that would be last detaches instead of arriving then we'll have to
> elect someone else.  Here is a throw-away test harness that can be
> used to exercise that case.  CREATE EXTENSION test_barrier; SELECT
> test_barrier_detach_releases(1);.  Adding a sleep before BarrierDetach
> can be used to influence the race, and adding elog messages to
> barrier.c can be used to see when the special path is taken.

Played some with this, and confirmed that that case
works. Pushed. Interestingly my first few experiments with delaying the
master using pg_usleep() were *not* successful because the leader gets
signalled by the workers successfully having started, interrupting the
sleep...

I've not made any meaningful changes, there was a * missing in an empty
comment line, and I think I've added a single comma.

I do wonder if it's maybe worth adding a single graf about error
handling to the intro? It may not be obvious that this requires
installing a hook calling detach.

- Andres


Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,

On 2017-11-27 22:25:12 +1300, Thomas Munro wrote:
Pushed 0003-Add-infrastructure-for-sharing-temporary-files-betwe.patch
after minor adjustments (some including conversing with you).

Changes:
- Changed an impossible elog() into an Assert().
- changed SharedFileSet->counter, and the backend static variable, to
  uint32. Not impossible that a 32bit int overflows over the course of a
  few weeks, and we a) imo shouldn't unnecessarily rely on signed
  overflow being defined b) a negative number would look weird, even if
  well defined (-fwrapv et al).
- Added a small comment about arbitrary-ness of the 8 in Oid
  tablespaces[8].
- pgindent'ed

Questions:
- Right now RemovePgTempFilesInDir() will recurse into appropriately
  named directories, and when it recurses it doesn't require the same
  name pattern checks. I think that's good, but I think it'd be prudent
  to be a bit more paranoid and prevent recursing into symlinked
  subdirectories.
- As we don't clean temp files after crash-restarts it isn't impossible
  to have a bunch of crash-restarts and end up with pids *and* per-pid
  shared file set counters reused. Which'd lead to conflicts. Do we care?

Greetings,

Andres Freund


Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
On 2017-11-27 22:25:12 +1300, Thomas Munro wrote:
> On Thu, Nov 23, 2017 at 12:36 AM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
> > Here's a new patch set with responses to the last batch of review comments.

Looking at 0004-Add-shared-tuplestores.patch

Comments:
- I'd rename mutex to lock. Seems quite possible that we end up with shared
  lockers too.
- Expand "Simple mechanism for sharing tuples between backends." intro
  comment - that doesn't really seem like a meaningful description of
  the mechanism. Should probably mention that it's similar to
  tuplestores etc...
- I'm still concerned about the chunking mechanism. How about this
  sketch of an alternative solution?

  Chunks are always the same length. To avoid having to read the length
  from disk while holding a lock, introduce continuation chunks which
  store the amount of space needed by the overlarge tuple at the
  start. The reading process stays largely the same, except that if a
  backend reads a chunk that's a continuation, it reads the length of
  the continuation and skips ahead. That could mean that more than one
  backend read continuation chunks, but the window is small and there's
  normally not going to be that many huge tuples (otherwise things are
  slow anyway).  (See the sketch after this list.)
- why are we using a separate hardcoded 32 for sts names? Why not just
  go for NAMEDATALEN or such?
- I'd replace most of the "should's" in comments with "need".
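
To spell the continuation-chunk idea out a bit more, a rough sketch (the
constant, the header struct and the helper function names below are
invented for illustration):

#define STS_CHUNK_SIZE  (4 * BLCKSZ)    /* every chunk has the same length */

typedef struct ChunkHeader
{
    bool        is_continuation;    /* overflow from an overlarge tuple? */
    uint32      overflow_bytes;     /* if so, bytes left including this chunk */
} ChunkHeader;

static MinimalTuple
sts_read_next(SharedTuplestoreAccessor *accessor, char *buffer)
{
    for (;;)
    {
        int64       chunkno = claim_next_chunk(accessor);  /* lock held briefly */
        ChunkHeader hdr;

        read_chunk(accessor, chunkno, &hdr, buffer);        /* no lock held */
        if (hdr.is_continuation)
        {
            /*
             * Overlarge tuple: advance past the rest of it rather than
             * trying to parse tuples out of this chunk.
             */
            skip_chunks(accessor, hdr.overflow_bytes);
            continue;
        }
        return first_tuple_in(buffer);
    }
}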

Greetings,

Andres Freund


Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Sat, Dec 2, 2017 at 1:55 PM, Andres Freund <andres@anarazel.de> wrote:
> Pushed 0003-Add-infrastructure-for-sharing-temporary-files-betwe.patch
> after minor adjustments (some including conversing with you).

Thank you!

> Questions:
> - Right now RemovePgTempFilesInDir() will recurse into appropriately
>   named directories, and when it recurses it doesn't require the same
>   name pattern checks. I think that's good, but I think it'd be prudent
>   to be a bit more paranoid and prevent recursing into symlinked
>   subdirectories.

That's why it uses lstat(), so that it sees symlinks rather than what
they point to. It only recurses if S_ISDIR(), and it unlinks anything
else.  That means that it unlinks any symlinks rather than (say)
following them and deleting stuff outside the temporary directory
tree.  Example:

$ mkdir /tmp/foo
$ touch /tmp/foo/canary
$ mkdir -p $PGDATA/base/pgsql_tmp/pgsql_tmpXXX/bar
$ ln -s /tmp/foo $PGDATA/base/pgsql_tmp/pgsql_tmpXXX/foo
$ ls $PGDATA/base/pgsql_tmp/pgsql_tmpXXX
bar foo
$ postgres/bin/postgres -D $PGDATA
... ^C ...
$ ls $PGDATA/base/pgsql_tmp
$ ls /tmp/foo
canary

Make sense?

> - As we don't clean temp files after crash-restarts it isn't impossible
>   to have a bunch of crash-restarts and end up with pids *and* per-pid
>   shared file set counters reused. Which'd lead to conflicts. Do we care?

We don't care.  PathNameCreateTemporaryDir() on a path that already
exists will return silently.  PathNameCreateTemporaryFile() on a path
that already exists, as mentioned in a comment and following an
existing convention, will open and truncate it.  So either it was
really an orphan and that is a continuation of our traditional
recycling behaviour, or the calling code has a bug and used the same
path twice and it's not going to end well.

Another type of collision we could have in theory is like this:  One
backend creates a SharedFileSet, and various other backends attach to
it.  The creator detaches and exits.  Later another process is created
with the same PID, and creates a new SharedFileSet, and happens to
have the same counter number, but the original SharedFileSet is still
in existence because there are still running backends attached to it.
Then things get ugly.  But this seems improbable, at least as long as
the usage pattern is that leader processes are the only ones creating
SharedFileSet objects and outlive their parallel workers in non-crash
outcomes.  If you think that's too flimsy, I suppose it could be fixed
by replacing the per-backend counter variable with a cluster-wide
atomic counter.  Then the PID component would be redundant, but I'd
suggest keeping it anyway for its diagnostic value.
Thoughts?
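
If we ever went that way, a minimal sketch might look like the
following (the struct, the helper name and the shared-memory wiring are
assumptions, not part of the patch; only pg_atomic_fetch_add_u64() and
MyProcPid are existing facilities):

#include "postgres.h"

#include "miscadmin.h"
#include "port/atomics.h"

/* Hypothetical cluster-wide counter living in shared memory. */
typedef struct SharedFileSetCounter
{
    pg_atomic_uint64 next_set_number;
} SharedFileSetCounter;

static SharedFileSetCounter *sfs_counter;   /* assumed to be wired up at startup */

/*
 * Build a name that can't collide even if PIDs are reused, keeping the
 * PID purely for its diagnostic value.
 */
static void
shared_fileset_name_sketch(char *name, size_t size)
{
    uint64      n = pg_atomic_fetch_add_u64(&sfs_counter->next_set_number, 1);

    snprintf(name, size, "%d.%llu.fileset",
             MyProcPid, (unsigned long long) n);
}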

Just a reminder: a couple of problems have come up recently in the
Parallel Hash Join patch itself, so please don't consider that one
ready for commit quite yet.  They are: (1) Handling the case where
there is no DSA area because we're running a parallel-aware plan in
non-parallel mode due to lack of resources; (2) Investigating a rare
assertion failure.  For (1), that may depend on another patch that
I'll post shortly to kill "es_query_dsa" and, come to think of it, for
(2) it's possible that the problem is in either one of the remaining
patches -- SharedTuplestore or Parallel Hash Join -- so please hold
off on committing either of those until I've got to the bottom of
that.

-- 
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Sat, Dec 2, 2017 at 3:54 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> On Sat, Dec 2, 2017 at 1:55 PM, Andres Freund <andres@anarazel.de> wrote:
>> - As we don't clean temp files after crash-restarts it isn't impossible
>>   to have a bunch of crash-restarts and end up with pids *and* per-pid
>>   shared file set counters reused. Which'd lead to conflicts. Do we care?
>
> We don't care.  PathNameCreateTemporaryDir() on a path that already
> exists will return silently.  PathNameCreateTemporaryFile() on a path
> that already exists, as mentioned in a comment and following an
> existing convention, will open and truncate it.  So either it was
> really an orphan and that is a continuation of our traditional
> recycling behaviour, or the calling code has a bug and used the same
> path twice and it's not going to end well.

On further reflection, there are problems with that higher up.  (1)
Even though PathNameCreateTemporaryFile() will happily truncate and
reuse an orphaned file when BufFileCreateShared() calls it,
BufFileOpenShared() could get confused by the orphaned files.  It
could believe that XXX.1 is a continuation of XXX.0, when in fact it
is junk left over from a crash restart.  Perhaps BufFileCreateShared()
needs to delete XXX.{N+1} if it exists, whenever it creates XXX.{N}.
That will create a gap in the series of existing files that will cause
BufFileOpenShared()'s search to terminate.  (2) BufFileOpenShared()
thinks that the absence of an XXX.0 file means there is no BufFile by
this name, when it could mistakenly open pre-crash junk due to a
colliding name.  I use that condition as information, but I think I
can fix that easily by using SharedTuplestoreParticipant::npages == 0
to detect that there should be no file, rather than trying to open it,
and then I can define this problem away by declaring that
BufFileOpenShared() on a name that you didn't call
BufFileCreateShared() on invokes undefined behaviour.  I will make it
so.
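
A minimal sketch of the fix for (1), with invented helper names standing
in for the real BufFile/fd.c calls:

/*
 * When creating segment file XXX.N, also remove any stale XXX.(N+1) left
 * over from before a crash-restart.  That leaves a gap in the sequence,
 * so a BufFileOpenShared()-style search stops before it can wander into
 * pre-crash junk.
 */
static void
create_segment_sketch(const char *base_name, int segment)
{
    char        this_path[MAXPGPATH];
    char        next_path[MAXPGPATH];

    snprintf(this_path, sizeof(this_path), "%s.%d", base_name, segment);
    snprintf(next_path, sizeof(next_path), "%s.%d", base_name, segment + 1);

    /* Create (or truncate-and-reuse) the segment we actually want. */
    create_temporary_file(this_path);           /* assumed helper */

    /* Unlink a possible orphan from a previous crashed lifetime, if any. */
    delete_temporary_file_if_exists(next_path); /* assumed helper */
}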

-- 
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
On 2017-12-02 15:54:29 +1300, Thomas Munro wrote:
> On Sat, Dec 2, 2017 at 1:55 PM, Andres Freund <andres@anarazel.de> wrote:
> > - Right now RemovePgTempFilesInDir() will recurse into appropriately
> >   named directories, and when it recurses it doesn't require the same
> >   name pattern checks. I think that's good, but I think it'd be prudent
> >   to be a bit more paranoid and prevent recursing into symlinked
> >   subdirectories.
> 
> That's why it uses lstat(), so that it sees symlinks rather than what
> they point to. It only recurses if S_ISDIR(), and it unlinks anything
> else.

Right. I'd somehow confused myself by thinking one'd need an explicit
S_ISLNK check...


> Just a reminder: a couple of problems have come up recently in the
> Parallel Hash Join patch itself, so please don't consider that one
> ready for commit quite yet.  They are: (1) Handling the case where
> there is no DSA area because we're running a parallel-aware plan in
> non-parallel mode due to lack of resources; (2) Investigating a rare
> assertion failure.  For (1), that may depend on another patch that
> I'll post shortly to kill "es_query_dsa" and, come to think of it, for
> (2) it's possible that the problem is in either one of the remaining
> patches -- SharedTuplestore or Parallel Hash Join -- so please hold
> off on committing either of those until I've got to the bottom of
> that.

I'm a bit tempted to press ahead regardless of these issues. With your
consent obviously. ISTM we're pretty close to the point where this needs
to be exposed more widely and that'll surely bring more issues to light.

Greetings,

Andres Freund


Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Sat, Dec 2, 2017 at 4:46 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> On Sat, Dec 2, 2017 at 3:54 PM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> On Sat, Dec 2, 2017 at 1:55 PM, Andres Freund <andres@anarazel.de> wrote:
>>> - As we don't clean temp files after crash-restarts it isn't impossible
>>>   to have a bunch of crash-restarts and end up with pids *and* per-pid
>>>   shared file set counters reused. Which'd lead to conflicts. Do we care?
>>
>> We don't care.  PathNameCreateTemporaryDir() on a path that already
>> exists will return silently.  PathNameCreateTemporaryFile() on a path
>> that already exists, as mentioned in a comment and following an
>> existing convention, will open and truncate it.  So either it was
>> really an orphan and that is a continuation of our traditional
>> recycling behaviour, or the calling code has a bug and used the same
>> path twice and it's not going to end well.
>
> On further reflection, there are problems with that higher up.  (1)
> Even though PathNameCreateTemporaryFile() will happily truncate and
> reuse an orphaned file when BufFileCreateShared() calls it,
> BufFileOpenShared() could get confused by the orphaned files.  It
> could believe that XXX.1 is a continuation of XXX.0, when in fact it
> is junk left over from a crash restart.  Perhaps BufFileCreateShared()
> needs to delete XXX.{N+1} if it exists, whenever it creates XXX.{N}.
> That will create a gap in the series of existing files that will cause
> BufFileOpenShared()'s search to terminate.  (2) BufFileOpenShared()
> thinks that the absence of an XXX.0 file means there is no BufFile by
> this name, when it could mistakenly open pre-crash junk due to a
> colliding name.  I use that condition as information, but I think I
> can fix that easily by using SharedTuplestoreParticipant::npages == 0
> to detect that there should be no file, rather than trying to open it,
> and then I can define this problem away by declaring that
> BufFileOpenShared() on a name that you didn't call
> BufFileCreateShared() on invokes undefined behaviour.  I will make it
> so.

Here is a patch to deal with that problem.  Thoughts?

I suppose if we wanted to make this type of problem go away, but still
keep files for forensic purposes, we could introduce a "restart
number", and stick it into the top level temporary directory's name.
That way you'd never be able to collide with files created before a
crash-restart, and we could add O_EXCL so it'd become an error to try
to create the same filename again.

I'll post a new SharedTuplestore and Parallel Hash patch set shortly.

-- 
Thomas Munro
http://www.enterprisedb.com

Attachment

Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Fri, Dec 8, 2017 at 12:07 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> I suppose if we wanted to make this type of problem go away, but still
> keep files for forensic purposes, we could introduce a "restart
> number", and stick it into the top level temporary directory's name.
> That way you'd never be able to collide with files created before a
> crash-restart, and we could add O_EXCL so it'd become an error to try
> to create the same filename again.

Or we could teach crash-restart to move the top level directory (in
each tablespace) to pgsql_tmp.old, so we'd keep the temporary files
from one previous lifetime only.  That'd prevent unlimited space
eating in multiple crash scenarios, and we could more comfortably say
that it's entirely safe to delete that directory manually in cases
like this:

https://www.postgresql.org/message-id/flat/4f3c89a224ff4660baa62a2b79fb0f1d%40ITUPW-EXMBOX3B.UniNet.unisa.edu.au

-- 
Thomas Munro
http://www.enterprisedb.com


Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
On 2017-12-08 12:07:04 +1300, Thomas Munro wrote:
> I suppose if we wanted to make this type of problem go away, but still
> keep files for forensic purposes, we could introduce a "restart
> number", and stick it into the top level temporary directory's name.
> That way you'd never be able to collide with files created before a
> crash-restart, and we could add O_EXCL so it'd become an error to try
> to create the same filename again.

I'm deeply unconvinced by the "forensic" argument to not do temp file
cleanup after crash restarts. That causes problems like the one we're
debating upthread in the first place, so I'm wholly unconvinced we
should add to that further by adding another layer of complexity.

My personal opinion is that we should just do temp file cleanup after
crash restarts, and document restart_after_crash = false as the solution
for investigating crashes.  I don't want to hold up this patch with a
discussion of that however, so I'm ok with your fix.

Greetings,

Andres Freund


Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Sat, Dec 2, 2017 at 3:46 PM, Andres Freund <andres@anarazel.de> wrote:
> Looking at 0004-Add-shared-tuplestores.patch
>
> Comments:
> - I'd rename mutex to lock. Seems quite possible that we end up with shared
>   lockers too.

Done.

> - Expand "Simple mechanism for sharing tuples between backends." intro
>   comment - that doesn't really seem like a meaningful description of
>   the mechanism. Should probably mention that it's similar to
>   tuplestores etc...

Done.

> - I'm still concerned about the chunking mechanism. How about this
>   sketch of an alternative solution?
>
>   Chunks are always the same length. To avoid having to read the length
>   from disk while holding a lock, introduce continuation chunks which
>   store the amount of space needed by the overlarge tuple at the
>   start. The reading process stays largely the same, except that if a
>   backend reads a chunk that's a continuation, it reads the length of
>   the continuation and skips ahead. That could mean that more than one
>   backend reads continuation chunks, but the window is small and there's
>   normally not going to be that many huge tuples (otherwise things are
>   slow anyway).

Done.

I've also included a simple test harness that can be used to drive
SharedTuplestore independently of Parallel Hash, but that patch is not
for commit.  See example of usage in the commit message.
(Incidentally I noticed that ParallelWorkerNumber is not marked
PGDLLIMPORT, so the test harness fails to build on Windows CI.)

> - why are we using a separate hardcoded 32 for sts names? Why not just
>   go for NAMEDATALEN or such?

Done.

> - I'd replace most of the "should's" in comments with "need".

Done.

Another problem I discovered is that v27's way of installing a
different function into ExecProcNode in ExecInitHashJoin() was broken:
it didn't allow for the possibility that there is no DSA area
available due to lack of resources.  ExecInitNode() is too soon to
decide.  My solution is to provide a way for executor nodes to change
their ExecProcNode functions at any later time, which requires a way
for execProcnode.c to redo any wrapper functions.
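
The entry point I have in mind is a small function along these lines (a
sketch; names and details are provisional):

/*
 * Let a node swap its ExecProcNode implementation after ExecInitNode()
 * has already run.  execProcnode.c re-installs its first-call wrapper so
 * that any instrumentation / stack-depth wrapping is re-applied around
 * the new function.
 */
void
ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function)
{
    node->ExecProcNodeReal = function;
    node->ExecProcNode = ExecProcNodeFirst;     /* re-wrap on next call */
}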

-- 
Thomas Munro
http://www.enterprisedb.com

Attachment

Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,

Looking at the latest version of the tuplestore patch:


diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
new file mode 100644
index 00000000000..d1233221a58
--- /dev/null
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -0,0 +1,583 @@
+/*-------------------------------------------------------------------------
+ *
+ * sharedtuplestore.c
+ *      Simple mechanism for sharing tuples between backends.
+ *
+ * This module provides a shared temporary tuple storage mechanism, providing
+ * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
+ * can write to a SharedTuplestore, and then multiple backends can later scan
+ * the stored tuples.  Currently, the only scan type supported is a parallel
+ * scan where each backend reads an arbitrary subset of the tuples that were
+ * written.

Cool.


+/* Chunk written to disk. */
+typedef struct SharedTuplestoreChunk
+{
+    int            ntuples;        /* Number of tuples in this chunk. */
+    bool        overflow;        /* Continuation of previous chunk? */
+    char        data[FLEXIBLE_ARRAY_MEMBER];
+} SharedTuplestoreChunk;

Ah. I was thinking we could have the 'overflow' variable be an int,
indicating the remaining length of the oversized tuple. That'd allow us
to skip ahead to the end of the oversized tuple in concurrent processes
after hitting it.
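
Very roughly, something like this (a sketch; anything beyond the fields
already in the patch is an assumption):

/* Chunk written to disk, with 'overflow' as a count rather than a flag. */
typedef struct SharedTuplestoreChunk
{
    int         ntuples;        /* Number of tuples in this chunk. */
    int         overflow;       /* 0 for a regular chunk; otherwise the
                                 * number of chunks (including this one)
                                 * still covered by the oversized tuple
                                 * that began in an earlier chunk. */
    char        data[FLEXIBLE_ARRAY_MEMBER];
} SharedTuplestoreChunk;

A reader landing on such a chunk could then advance the shared read head
past the whole oversized tuple in one step, rather than visiting each
overflow chunk in turn.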



+/*
+ * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
+ * pointer to meta data of that size must be provided.
+ */
+void
+sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
+             MinimalTuple tuple)
+{
+    size_t        size;
+
+    /* Do we have our own file yet? */
+    if (accessor->write_file == NULL)
+    {
+        SharedTuplestoreParticipant *participant;
+        char        name[MAXPGPATH];
+
+        /* Create one.  Only this backend will write into it. */
+        sts_filename(name, accessor, accessor->participant);
+        accessor->write_file = BufFileCreateShared(accessor->fileset, name);
+
+        /* Set up the shared state for this backend's file. */
+        participant = &accessor->sts->participants[accessor->participant];
+        participant->writing = true;    /* for assertions only */
+    }
+
+    /* Do we have space? */
+    size = accessor->sts->meta_data_size + tuple->t_len;
+    if (accessor->write_pointer + size >= accessor->write_end)
+    {
+        if (accessor->write_chunk == NULL)
+        {
+            /* First time through.  Allocate chunk. */
+            accessor->write_chunk = (SharedTuplestoreChunk *)
+                MemoryContextAllocZero(accessor->context,
+                                       STS_CHUNK_PAGES * BLCKSZ);
+            accessor->write_chunk->ntuples = 0;
+            accessor->write_pointer = &accessor->write_chunk->data[0];
+            accessor->write_end = (char *)
+                accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
+        }
+        else
+        {
+            /* See if flushing helps. */
+            sts_flush_chunk(accessor);
+        }
+
+        /* It may still not be enough in the case of a gigantic tuple. */
+        if (accessor->write_pointer + size >= accessor->write_end)
+        {
+            size_t        written;
+
+            /*
+             * We'll write the beginning of the oversized tuple, and then
+             * write the rest in some number of 'overflow' chunks.
+             */
+            if (accessor->write_pointer + accessor->sts->meta_data_size >=
+                accessor->write_end)
+                elog(ERROR, "meta-data too long");

That seems more like an Assert than a proper elog? Given that we're
calculating size just a few lines above...


+            if (accessor->sts->meta_data_size > 0)
+                memcpy(accessor->write_pointer, meta_data,
+                       accessor->sts->meta_data_size);
+            written = accessor->write_end - accessor->write_pointer -
+                accessor->sts->meta_data_size;
+            memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
+                   tuple, written);

Also, shouldn't the same Assert() be here as well if you have it above?

+static MinimalTuple
+sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+    MinimalTuple tuple;
+    uint32        size;
+    size_t        remaining_size;
+    size_t        this_chunk_size;
+    char       *destination;
+
+    /*
+     * We'll keep track of bytes read from this chunk so that we can detect an
+     * overflowing tuple and switch to reading overflow pages.
+     */
+    if (accessor->sts->meta_data_size > 0)
+    {
+        if (BufFileRead(accessor->read_file,
+                        meta_data,
+                        accessor->sts->meta_data_size) !=
+            accessor->sts->meta_data_size)
+            ereport(ERROR,
+                    (errcode_for_file_access(),
+                     errmsg("could not read from shared tuplestore temporary file"),
+                     errdetail("Short read while reading meta-data")));

The errdetail doesn't follow the style guide (not a sentence ending with
.), and seems internal-ish. I'm ok with keeping it, but perhaps we
should change all these to be errdetail_internal()? Seems pointless to
translate all of them.
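
Concretely, I mean this form (illustrative only), which keeps the detail
string out of the translation catalog and fixes the sentence style:

            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not read from shared tuplestore temporary file"),
                     errdetail_internal("Short read while reading meta-data.")));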

+MinimalTuple
+sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+    SharedTuplestoreParticipant *p;
+    BlockNumber    read_page;
+    bool        eof;
+
+    for (;;)
+    {
+        /* Can we read more tuples from the current chunk? */
+        if (accessor->read_ntuples < accessor->read_ntuples_available)
+            return sts_read_tuple(accessor, meta_data);
+
+        /* Find the location of a new chunk to read. */
+        p = &accessor->sts->participants[accessor->read_participant];
+
+        LWLockAcquire(&p->lock, LW_EXCLUSIVE);
+        eof = p->read_page >= p->npages;
+        if (!eof)
+        {
+            read_page = p->read_page;
+            p->read_page += STS_CHUNK_PAGES;
+        }
+        LWLockRelease(&p->lock);

So if we went to the world I'm suggesting, with overflow containing the
length till the end of the tuple, this'd probably would have to look a
bit different.


+        if (!eof)
+        {
+            SharedTuplestoreChunk chunk_header;
+
+            /* Make sure we have the file open. */
+            if (accessor->read_file == NULL)
+            {
+                char        name[MAXPGPATH];
+
+                sts_filename(name, accessor, accessor->read_participant);
+                accessor->read_file =
+                    BufFileOpenShared(accessor->fileset, name);
+                if (accessor->read_file == NULL)
+                    elog(ERROR, "could not open temporary file %s", name);

Isn't this more an Assert or just not anything? There's no way
BufFileOpenShared should ever return NULL, no?

+
+            /* Seek and load the chunk header. */
+            if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
+                ereport(ERROR,
+                        (errcode_for_file_access(),
+                         errmsg("could not read from shared tuplestore temporary file"),
+                         errdetail("Could not seek to next block")));
+            if (BufFileRead(accessor->read_file, &chunk_header,
+                            offsetof(SharedTuplestoreChunk, data)) !=
+                offsetof(SharedTuplestoreChunk, data))
+                ereport(ERROR,
+                        (errcode_for_file_access(),
+                         errmsg("could not read from shared tuplestore temporary file"),
+                         errdetail("Short read while reading chunk header")));
+
+            /* If this is an overflow chunk, we skip it. */
+            if (chunk_header.overflow)
+                continue;
+
+            accessor->read_ntuples = 0;
+            accessor->read_ntuples_available = chunk_header.ntuples;
+            accessor->read_bytes = offsetof(SharedTuplestoreChunk, data);

Perhaps somewhere around here comment that we'll just loop around and
call sts_read_tuple() in the next loop iteration?


Greetings,

Andres Freund


Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Thu, Dec 14, 2017 at 11:45 AM, Andres Freund <andres@anarazel.de> wrote:
> +       bool            overflow;               /* Continuation of previous chunk? */
> +       char            data[FLEXIBLE_ARRAY_MEMBER];
> +} SharedTuplestoreChunk;
>
> Ah. I was thinking we could have the 'overflow' variable be an int,
> indicating the remaining length of the oversized tuple. That'd allow us
> to skip ahead to the end of the oversized tuple in concurrent processes
> after hitting it.

Right, that is a bit better as it avoids extra read-skip cycles for
multi-overflow-chunk cases.  Done that way.

> +                       if (accessor->write_pointer + accessor->sts->meta_data_size >=
> +                               accessor->write_end)
> +                               elog(ERROR, "meta-data too long");
>
> That seems more like an Assert than a proper elog? Given that we're
> calculating size just a few lines above...

It's an error because the logic is not smart enough to split the
optional meta-data and tuple size over multiple chunks.  I have added
comments there to explain.  That error can be reached by CALL
test_sharedtuplestore(1, 1, 1, 32756, 1), but 32755 is OK.  My goal
here is to support arbitrarily large tuples, not arbitrarily large
per-tuple meta-data, since for my use case I only need 4 bytes (a hash
value).  This could be improved if required by later features
(probably anyone wanting more general meta-data would want variable
sized meta-data anyway, whereas this is fixed, and it would also be
nice if oversized tuples didn't have to start at the beginning of a
new chunk).

I fixed two nearby fencepost bugs: I made the limit that triggers that
error smaller by sizeof(uint32) and fixed a problem when small tuples
appear after an oversize tuple in a final overflow chunk (found by
hacking the test module to create mixtures of different sized tuples).

> +                       if (accessor->sts->meta_data_size > 0)
> +                               memcpy(accessor->write_pointer, meta_data,
> +                                          accessor->sts->meta_data_size);
> +                       written = accessor->write_end - accessor->write_pointer -
> +                               accessor->sts->meta_data_size;
> +                       memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
> +                                  tuple, written);
>
> Also, shouldn't the same Assert() be here as well if you have it above?

No, when it comes to the tuple we just write as much of it as will
fit, and write the rest in the loop below.  Comments improved to make
that clear.

> +                       ereport(ERROR,
> +                                       (errcode_for_file_access(),
> +                                        errmsg("could not read from shared tuplestore temporary file"),
> +                                        errdetail("Short read while reading meta-data")));
>
> The errdetail doesn't follow the style guide (not a sentence ending with
> .), and seems internal-ish. I'm ok with keeping it, but perhaps we
> should change all these to be errdetail_internal()? Seems pointless to
> translate all of them.

Done.

> +               LWLockAcquire(&p->lock, LW_EXCLUSIVE);
> +               eof = p->read_page >= p->npages;
> +               if (!eof)
> +               {
> +                       read_page = p->read_page;
> +                       p->read_page += STS_CHUNK_PAGES;
> +               }
> +               LWLockRelease(&p->lock);
>
> So if we went to the world I'm suggesting, with overflow containing the
> length till the end of the tuple, this'd probably would have to look a
> bit different.

Yeah.  I almost wanted to change it back to a spinlock but now it's
grown bigger again...

> +               if (!eof)
> +               {
> +                       SharedTuplestoreChunk chunk_header;
> +
> +                       /* Make sure we have the file open. */
> +                       if (accessor->read_file == NULL)
> +                       {
> +                               char            name[MAXPGPATH];
> +
> +                               sts_filename(name, accessor, accessor->read_participant);
> +                               accessor->read_file =
> +                                       BufFileOpenShared(accessor->fileset, name);
> +                               if (accessor->read_file == NULL)
> +                                       elog(ERROR, "could not open temporary file %s", name);
>
> Isn't this more an Assert or just not anything? There's no way
> BufFileOpenShared should ever return NULL, no?

Right.  As of commit 923e8dee this can no longer return NULL (instead
it would raise an error), so I removed this redundant check.

> +                       /* If this is an overflow chunk, we skip it. */
> +                       if (chunk_header.overflow)
> +                               continue;
> +
> +                       accessor->read_ntuples = 0;
> +                       accessor->read_ntuples_available = chunk_header.ntuples;
> +                       accessor->read_bytes = offsetof(SharedTuplestoreChunk, data);
>
> Perhaps somewhere around here comment that we'll just loop around and
> call sts_read_tuple() in the next loop iteration?

Done.

-- 
Thomas Munro
http://www.enterprisedb.com

Attachment

Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,

Looking at the main patch (v28).

First off: This looks pretty good, the code's quite readable now
(especially compared to earlier versions), the comments are good. Really
like the nodeHash split, and the always inline hackery in nodeHashjoin.
Think we're getting really really close.

  *     ExecHashJoinImpl
  *
- *     This function implements the Hybrid Hashjoin algorithm.
+ *     This function implements the Hybrid Hashjoin algorithm.  By forcing it
+ *     to be always inline many compilers are able to specialize it for
+ *     parallel = true/false without repeating the code.
  *

what about adding the above explanation for the always inline?
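
For context, the structure in question is roughly this (abbreviated from
the patch; treat the exact attribute macro as an assumption):

/*
 * One implementation with a constant 'parallel' argument, forced inline;
 * two trivial wrappers.  A sufficiently smart compiler then emits two
 * specialized copies with the unreachable branches removed.
 */
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
    /* ... hybrid hash join, with if (parallel) branches ... */
    return NULL;                /* placeholder in this sketch */
}

static TupleTableSlot *
ExecHashJoin(PlanState *pstate)
{
    return ExecHashJoinImpl(pstate, false);
}

static TupleTableSlot *
ExecParallelHashJoin(PlanState *pstate)
{
    return ExecHashJoinImpl(pstate, true);
}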


+        /*
+         * So far we have no idea whether there are any other participants,
+         * and if so, what phase they are working on.  The only thing we care
+         * about at this point is whether someone has already created the
+         * SharedHashJoinBatch objects, the main hash table for batch 0 and
+         * (if necessary) the skew hash table yet.  One backend will be
+         * elected to do that now if necessary.
+         */

The 'yet' sounds a bit weird in combination with the 'already'.


+ static void
+ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
+ ...
+         case PHJ_GROW_BATCHES_ELECTING:
+             /* Elect one participant to prepare the operation. */

That's a good chunk of code. I'm ever so slightly inclined to put that
into a separate function. But I'm not sure it'd look good. Feel entirely
free to disregard.


+ static HashJoinTuple
+ ExecParallelHashLoadTuple(HashJoinTable hashtable, MinimalTuple tuple,
+                           dsa_pointer *shared)

Not really happy with the name. ExecParallelHashTableInsert() calling
ExecParallelHashLoadTuple() to insert a tuple into the hashtable doesn't
quite strike me as right; the naming similarity to
ExecParallelHashTableLoad seems problematic too.
ExecParallelHashAllocTuple() or such?

One could argue it'd not be a bad idea to keep a similar split as
dense_alloc() and memcpy() have, but I'm not really convinced by
myself. Hm.


+        case PHJ_GROW_BATCHES_ELECTING:
+            /* Elect one participant to prepare the operation. */

Imo that comment could use a one-line summary of what preparing means.


+                    /*
+                     * We probably also need a smaller bucket array.  How many
+                     * tuples do we expect per batch, assuming we have only
+                     * half of them so far?

Makes sense, but did cost me a minute of thinking. Maybe add a short
explanation why.


+        case PHJ_GROW_BATCHES_ALLOCATING:
+            /* Wait for the above to be finished. */
+            BarrierArriveAndWait(&pstate->grow_batches_barrier,
+                                 WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING);
+            /* Fall through. */

Just to make sure I understand: The "empty" phase is to protect the
process of the electee doing the sizing calculations etc?  And the
reason that's not possible to do just by waiting for
PHJ_GROW_BATCHES_REPARTITIONING is that somebody could dynamically
arrive, i.e. it'd not be needed in a statically sized barrier?  Pretty
tired here, sorry ;)


+            /* reset temp memory each time to avoid leaks from qual expr */
+            ResetExprContext(econtext);
+
+            if (ExecQual(hjclauses, econtext))

I personally think it's better to avoid this pattern and store the
result of the ExecQual() in a variable, ResetExprContext() and then act
on the result.  No need to keep memory around for longer, and for bigger
contexts you're more likely to have all the metadata in cache.

I'd previously thought about introducing ExecQualAndReset() or such...
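
i.e. something along these lines (just a sketch of what I mean, it
doesn't exist yet):

/* Evaluate a qual, then immediately reset the per-tuple memory context. */
static inline bool
ExecQualAndReset(ExprState *state, ExprContext *econtext)
{
    bool        ret = ExecQual(state, econtext);

    /* Free whatever the qual evaluation leaked into the per-tuple context. */
    ResetExprContext(econtext);

    return ret;
}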


  * IDENTIFICATION
  *      src/backend/executor/nodeHashjoin.c
  *
+ * PARALLELISM
+ *

This is a pretty good explanation. How about adding a reference to it
from nodeHash.c's header?



+static TupleTableSlot *            /* return: a tuple or NULL */
+ExecHashJoin(PlanState *pstate)
+{
+    /*
+     * On sufficiently smart compilers this should be inlined with the
+     * parallel-aware branches removed.
+     */
+    return ExecHashJoinImpl(pstate, false);

Ah, the explanation I desired above is here. Still seems good to have a
comment at the somewhat suspicious use of always_inline.


+
+    /*
+     * If we started up so late that the shared batches have been freed
+     * already by ExecHashTableDetach(), then we are finished.
+     */
+    if (!DsaPointerIsValid(hashtable->parallel_state->batches))
+        return false;

This is really the only place that weird condition is detected? And why
is that possible in the first place? And if possible, shouldn't we have
detected it earlier?  Also, if possible, what prevents this from
occurring in a way where that test fails because pstate->batches has
been freed but not yet reset?


+                    while ((tuple = sts_parallel_scan_next(hashtable->batches[batchno].inner_tuples,
+                                                           &hashvalue)))

That's a fairly long line. Couldn't this whole block made more readable by
using a temp variable for inner_tuples?


Ok, eyes are closing against my will.  This looks pretty damn close.

- Andres


Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
On 2017-12-14 21:06:34 +1300, Thomas Munro wrote:
> On Thu, Dec 14, 2017 at 11:45 AM, Andres Freund <andres@anarazel.de> wrote:
> > +                       if (accessor->write_pointer + accessor->sts->meta_data_size >=
> > +                               accessor->write_end)
> > +                               elog(ERROR, "meta-data too long");
> >
> > That seems more like an Assert than a proper elog? Given that we're
> > calculating size just a few lines above...
> 
> It's an error because the logic is not smart enough to split the
> optional meta-data and tuple size over multiple chunks.  I have added
> comments there to explain.

I don't see how that requires it to be an elog rather than an assert. As
far as I can tell this is only reachable if
meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE. For anything
smaller the sts_flush_chunk() a few lines above would have started a new
chunk.

IOW, we should detect this at sts_initialize() time, elog there, and
Assert() out in puttuple.
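
Roughly (a sketch; the exact names and comparisons in the committed
version may differ slightly):

    /* In sts_initialize(): reject impossible meta-data sizes up front. */
    if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
        elog(ERROR, "meta-data too long");

    /* In sts_puttuple(): by then this can only be an internal logic error. */
    Assert(accessor->write_pointer + accessor->sts->meta_data_size <
           accessor->write_end);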

I've changed the code like that, fixed my own < vs >= confusion during
the conversion, fixed a typo, and pushed.  If you're not ok with that
change, we can easily whack this around.

I've not changed this, but I wonder whether we should rename
sts_puttuple() to sts_put_tuple(), for consistency with
sts_read_tuple(). Or the other way round. Or...

Greetings,

Andres Freund


Re: [HACKERS] Parallel Hash take II

From
Thomas Munro
Date:
On Thu, Dec 14, 2017 at 10:12 PM, Andres Freund <andres@anarazel.de> wrote:
> Looking at the main patch (v28).
>
> First off: This looks pretty good, the code's quite readable now
> (especially compared to earlier versions), the comments are good. Really
> like the nodeHash split, and the always inline hackery in nodeHashjoin.
> Think we're getting really really close.

Thanks!

>   *     ExecHashJoinImpl
>   *
> - *     This function implements the Hybrid Hashjoin algorithm.
> + *     This function implements the Hybrid Hashjoin algorithm.  By forcing it
> + *     to be always inline many compilers are able to specialize it for
> + *     parallel = true/false without repeating the code.
>   *
>
> what about adding the above explanation for the always inline?

Ok, I added something like that.

> +               /*
> +                * So far we have no idea whether there are any other participants,
> +                * and if so, what phase they are working on.  The only thing we care
> +                * about at this point is whether someone has already created the
> +                * SharedHashJoinBatch objects, the main hash table for batch 0 and
> +                * (if necessary) the skew hash table yet.  One backend will be
> +                * elected to do that now if necessary.
> +                */
>
> The 'yet' sounds a bit weird in combination with the 'already'.

Fixed.  Also removed an outdated mention of skew hash table.

> + static void
> + ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
> + ...
> +               case PHJ_GROW_BATCHES_ELECTING:
> +                       /* Elect one participant to prepare the operation. */
>
> That's a good chunk of code. I'm ever so slightly inclined to put that
> into a separate function. But I'm not sure it'd look good. Feel entirely
> free to disregard.

Disregarding for now...

> + static HashJoinTuple
> + ExecParallelHashLoadTuple(HashJoinTable hashtable, MinimalTuple tuple,
> +                                                 dsa_pointer *shared)
>
> Not really happy with the name. ExecParallelHashTableInsert() calling
> ExecParallelHashLoadTuple() to insert a tuple into the hashtable doesn't
> quite strike me as right; the naming similarity to
> ExecParallelHashTableLoad seems problematic too.
> ExecParallelHashAllocTuple() or such?
>
> One could argue it'd not be a bad idea to keep a similar split as
> dense_alloc() and memcpy() have, but I'm not really convinced by
> myself. Hm.

Yeah, the names are confusing.  So:

1.  I changed ExecParallelHashTableLoad() to
ExecParallelHashTableInsertCurrentBatch().  It's just like
ExecParallelHashTableInsert() except that it's not prepared to send
tuples to a different batch or to run out of memory (it's used only
for loading batches from disk where the total size was already checked
by ExecParallelHashTablePrealloc(), and in the parallel case there is
no 'send forward to future batch' requirement so we might as well skip
the conditional stuff).

2.  I changed ExecParallelHashLoadTuple() to
ExecParallelHashTupleAlloc(), and made it so that the caller must call
memcpy so that it's more like dense_alloc().  I have now been working
on PostgreSQL long enough that arbitrary differences in case
conventions don't even hurt my eyes anymore.

I also renamed ExecParallelHashTableAllocate() to
ExecParallelHashTableAlloc(), and ExecParallelPreallocate() to
ExecParallelHashTuplePrealloc(), to conform with that style.

> +               case PHJ_GROW_BATCHES_ELECTING:
> +                       /* Elect one participant to prepare the operation. */
>
> Imo that comment could use a one-line summary of what preparing means.

Done.

> +                                       /*
> +                                        * We probably also need a smaller bucket array.  How many
> +                                        * tuples do we expect per batch, assuming we have only
> +                                        * half of them so far?
>
> Makes sense, but did cost me a minute of thinking. Maybe add a short
> explanation why.

Done.

> +               case PHJ_GROW_BATCHES_ALLOCATING:
> +                       /* Wait for the above to be finished. */
> +                       BarrierArriveAndWait(&pstate->grow_batches_barrier,
> +                                                                WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING);
> +                       /* Fall through. */
>
> Just to make sure I understand: The "empty" phase is to protect the
> process of the electee doing the sizing calculations etc?  And the
> reason that's not possible to do just by waiting for
> PHJ_GROW_BATCHES_REPARTITIONING is that somebody could dynamically
> arrive, i.e. it'd not be needed in a statically sized barrier?

Yeah, it's where you wait for the serial phase above to be finished.
It needs an entry point of its own for the reason you said: someone
might attach while the allocation is taking place, and therefore needs
to wait for the allocation phase to finish.  By doing it this way
instead of (say) putting a wait at the start of the
PHJ_GROW_BATCHES_REPARTITIONING case, we allow someone to start
running at PHJ_GROW_BATCHES_REPARTITIONING phase and do useful work
immediately.  Does that make sense?
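
In barrier terms the shape is roughly this (a sketch; the phase names are
the patch's, the wait events and surrounding structure are simplified):

    switch (BarrierPhase(&pstate->grow_batches_barrier))
    {
        case PHJ_GROW_BATCHES_ELECTING:
            /* Serial phase: exactly one backend prepares the new batches. */
            if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                     WAIT_EVENT_HASH_GROW_BATCHES_ELECTING))
            {
                /* ... sizing calculations and allocation happen here ... */
            }
            /* Fall through. */

        case PHJ_GROW_BATCHES_ALLOCATING:
            /* Everyone, including late attachers, waits for the allocation. */
            BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                 WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING);
            /* Fall through. */

        case PHJ_GROW_BATCHES_REPARTITIONING:
            /* Anyone attaching here can start repartitioning immediately. */
            /* ... repartition ... */
            break;
    }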

> +                       /* reset temp memory each time to avoid leaks from qual expr */
> +                       ResetExprContext(econtext);
> +
> +                       if (ExecQual(hjclauses, econtext))
>
> I personally think it's better to avoid this pattern and store the
> result of the ExecQual() in a variable, ResetExprContext() and then act
> on the result.  No need to keep memory around for longer, and for bigger
> contexts you're more likely to have all the metadata in cache.
>
> I'd previously thought about introducing ExecQualAndReset() or such...

Makes sense, but this is code that is identical in
ExecScanHashBucket() so I think we should keep it this way for now,
and explore expression context lifetime improvements in a separate
patch?  Looks like the same change could be made in other nodes too.

>   * IDENTIFICATION
>   *       src/backend/executor/nodeHashjoin.c
>   *
> + * PARALLELISM
> + *
>
> This is a pretty good explanation. How about adding a reference to it
> from nodeHash.c's header?

Done.

> +static TupleTableSlot *                        /* return: a tuple or NULL */
> +ExecHashJoin(PlanState *pstate)
> +{
> +       /*
> +        * On sufficiently smart compilers this should be inlined with the
> +        * parallel-aware branches removed.
> +        */
> +       return ExecHashJoinImpl(pstate, false);
>
> Ah, the explanation I desired above is here. Still seems good to have a
> comment at the somewhat suspicious use of always_inline.

Done.

> +
> +       /*
> +        * If we started up so late that the shared batches have been freed
> +        * already by ExecHashTableDetach(), then we are finished.
> +        */
> +       if (!DsaPointerIsValid(hashtable->parallel_state->batches))
> +               return false;
>
> This is really the only place that weird condition is detected? And why
> is that possible in the first place? And if possible, shouldn't we have
> detected it earlier?  Also, if possible, what prevents this from
> occurring in a way where that test fails because pstate->batches has
> been freed but not yet reset?

ExecParallelHashJoinNewBatch(), where this code appears, is generally
the place that we discover that we're finished.  Normally we discover
that we're finished by seeing that there are no batches left to chew
on, by inspecting the per-batch state in shmem.  This weird condition
arises when a worker starts up so late that the join is finished and
the shmem space used to tracks batches has already been freed.  I
agree that that was badly explained and there was in fact something a
bit kooky about that coding.  I have now changed it so that
ExecParallelHashEnsureBatchAccessors() detects this case and has a
better comment to explain it, and ExecParallelHashJoinNewBatch() now
just looks out for hashtable->batches == NULL with a comment referring
to the other place.

You can hit this case by hacking the code thus:

--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -986,6 +986,10 @@ ParallelWorkerMain(Datum main_arg)
        Assert(ParallelWorkerNumber == -1);
        memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));

+       elog(LOG, "delaying startup to stagger workers ...");
+       pg_usleep(ParallelWorkerNumber * 2000000);
+       elog(LOG, "... continuing");
+

... and then:

  create table foo as
    select generate_series(1, 1000000)::int a,
    'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'::text b;
  analyze foo;
  set parallel_leader_participation = off;
  explain analyze select count(*) from foo r join foo s using (a);

The point is that worker 0 completes the entire multi-batch join and
cleans up, freeing the parallel batch info, and then worker 1 arrives
2 seconds later and it can't even look up the batch info to figure out
which batches are finished.  It has to be like that, because worker 0
doesn't even know about worker 1, and as far as it's concerned the
last one out turns out the lights.

> +                                       while ((tuple = sts_parallel_scan_next(hashtable->batches[batchno].inner_tuples,
> +                                                                              &hashvalue)))
>
> That's a fairly long line. Couldn't this whole block made more readable by
> using a temp variable for inner_tuples?

Done.

While running through every test I could think of I discovered that
the multi-batch parallel-aware hash join with rescan regression test
failed to be multi-batch on 32 bit x86, so I needed to tweak the
number of rows in the table called "bar".

-- 
Thomas Munro
http://www.enterprisedb.com

Attachment

Re: [HACKERS] Parallel Hash take II

From
Andres Freund
Date:
Hi,

> > Not really happy with the name. ExecParallelHashTableInsert() calling
> > ExecParallelHashLoadTuple() to insert a tuple into the hashtable doesn't
> > quite strike me as right; the naming similarity to
> > ExecParallelHashTableLoad seems problematic too.
> > ExecParallelHashAllocTuple() or such?
> >
> > One could argue it'd not be a bad idea to keep a similar split as
> > dense_alloc() and memcpy() have, but I'm not really convinced by
> > myself. Hm.
>
> Yeah, the names are confusing.  So:

Cool.


> > Just to make sure I understand: The "empty" phase is to protect the
> > process of the electee doing the sizing calculations etc?  And the
> > reason that's not possible to do just by waiting for
> > PHJ_GROW_BATCHES_REPARTITIONING is that somebody could dynamically
> > arrive, i.e. it'd not be needed in a statically sized barrier?
>
> Yeah, it's where you wait for the serial phase above to be finished.
> [ Explanation ] Does that make sense?

Yes.


>
> > +                       /* reset temp memory each time to avoid leaks from qual expr */
> > +                       ResetExprContext(econtext);
> > +
> > +                       if (ExecQual(hjclauses, econtext))
> >
> > I personally think it's better to avoid this pattern and store the
> > result of the ExecQual() in a variable, ResetExprContext() and then act
> > on the result.  No need to keep memory around for longer, and for bigger
> > contexts you're more likely to have all the metadata in cache.
> >
> > I'd previously thought about introducing ExecQualAndReset() or such...
>
> Makes sense, but this is code that is identical in
> ExecScanHashBucket() so I think we should keep it this way for now,
> and explore expression context lifetime improvements in a separate
> patch?  Looks like the same change could be made in other nodes too.

Ok.


> > +
> > +       /*
> > +        * If we started up so late that the shared batches have been freed
> > +        * already by ExecHashTableDetach(), then we are finished.
> > +        */
> > +       if (!DsaPointerIsValid(hashtable->parallel_state->batches))
> > +               return false;
> >
> > This is really the only place that weird condition is detected? And why
> > is that possible in the first place? And if possible, shouldn't we have
> > detected it earlier?  Also, if possible, what prevents this from
> > occurring in a way where that test fails because pstate->batches has
> > been freed but not yet reset?
>
> ExecParallelHashJoinNewBatch(), where this code appears, is generally
> the place that we discover that we're finished.  Normally we discover
> that we're finished by seeing that there are no batches left to chew
> on, by inspecting the per-batch state in shmem.  This weird condition
> arises when a worker starts up so late that the join is finished and
> the shmem space used to tracks batches has already been freed.  I
> agree that that was badly explained and there was in fact something a
> bit kooky about that coding.  I have now changed it so that
> ExecParallelHashEnsureBatchAccessors() detects this case and has a
> better comment to explain it, and ExecParallelHashJoinNewBatch() now
> just looks out for hashtable->batches == NULL with a comment referring
> to the other place.

Thanks for updating.


My compiler complained that ExecHashJoinImpl() might not be
inlinable. That's just because you declared it always_inline without
actually making it an inline function...


Pushed.  Yeha!  Congrats, this has been quite the project.


I suspect we'll find a bunch of problems, both on the planning and
execution side, but I think at this point we're much more likely to find
and resolve these in-tree vs. out of tree.



Btw, I see dsa_get_address() show up pretty prominently in profiles. I
kinda wonder if there's some cases where we could ameliorate the cost by
recognizing that a bunch of lookups are all going to reside in the same
segment.

- Andres