Thread: parallelism and sorting
Hi,

I've been thinking about how parallelism interacts with sorting over the last few days and I wanted to share a few preliminary thoughts. I definitely don't have all the answers worked out here yet, so thoughts are welcome. But here are a few observations:

1. Parallel sort is useful both within parallel queries and for utility commands like CREATE INDEX. Index builds can take a long time, and that time is often dominated by the time needed to sort the data, so being able to do that faster would be good.

2. Within parallel query, there are two reasons to care about data that is in sorted order. First, we might need to deliver the results to the user in a particular order, because they've specified ORDER BY whatever. Second, the optimal join strategy might be a merge join, which requires that both relations be sorted according to the join key.[1]

3. The current Gather node reads tuples from the workers in round-robin fashion, skipping over workers that don't have a tuple ready yet when it needs one. It seems potentially useful to have a Gather Merge node which would assume that the results from each worker are ordered with respect to each other, and do a final merge pass over those. Then we could get the toplevel query ordering we want using a plan like this:

Gather Merge
-> Sort
  -> Parallel Seq Scan on foo
      Filter: something

4. Gather Merge would be an executor node, and thus not available to any code that uses tuplesort.c directly. Also, it seems fairly mediocre for merge joins. The best we could do is something like this:[2]

Merge Join
-> Gather Merge
  -> Sort
    -> Parallel Seq Scan
-> Gather Merge
  -> Sort
    -> Parallel Seq Scan

The problem with this plan is that the join itself is not done in parallel, only the sorting.
That's not great, especially if there are more joins that need to be done afterwards, necessarily not in parallel.[2] It's possible that one side of the join could be an Index Scan rather than Gather Merge -> Sort -> Parallel Seq Scan, but that doesn't change the overall picture here much.

5. Really nailing the merge join case seems to require partitioning both relations in a fashion compatible with the join attribute, and then joining the partitions separately. Consider an operator Repartition which reads rows from its child plan and returns those where hash(joincol) % NumberOfWorkers == MyWorkerNumber. The rest are sent to the worker whose worker number is hash(joincol) % NumberOfWorkers and are returned by its copy of the corresponding Repartition operator. Then we could express a merge join reasonably well as:

Gather (Merge)
-> Merge Join
  -> Sort
    -> Repartition
      -> Parallel Seq Scan
  -> Sort
    -> Repartition
      -> Parallel Seq Scan

The Gather could be a Gather Merge if the merge join ordering matches the final output ordering, or a simple Gather if it doesn't. Additional join steps could be inserted between the Gather (Merge) operator and the merge join. So this is a big improvement over the plan shown under point #4. However, it's probably still not optimal, because we probably want to have substantially more partitions than there are workers. Otherwise, if some workers finish before others, it's hard to spread the load. Getting this right probably requires some sort of cooperation between Gather and Repartition where they agree on a number of partitions and then the workers repeatedly pick a partition, run the plan for that partition, and then loop around to get the next unfinished partition until all are completed.

6. Even without repartitioning, if one side of the join has a usable index, we could instead do this:

Gather (Merge)
-> Merge Join
  -> Sort
    -> Parallel Seq Scan
  -> Index Scan

However, this might not be a good idea: we'll scan the index once per worker.
If we had a Parallel Index Scan which worked like a Parallel Seq Scan, in that it returned only a subset of the results in each worker but in the same order that the non-parallel version would have returned them, we could instead do this, which might or might not be better:

Gather (Merge)
-> Merge Join
  -> Sort
    -> Repartition
      -> Parallel Seq Scan
  -> Repartition
    -> Parallel Index Scan

Here we scan the index just once (spread across all the workers) but we've got to repartition the rows we read from it, so I'm not sure how that's going to work out.

Parallel index scan is of course useful apart from merge joins, because you can do something like this to preserve the ordering it creates:

Gather Merge
-> Nested Loop
  -> Parallel Index Scan on a
  -> Index Scan on b
      Index Qual: b.x = a.x

7. Another option, instead of or in addition to introducing a Repartition operator, is to make the sort itself parallel-aware. Each worker reads rows until it fills work_mem, quicksorts them, and dumps them out as a run. Suppose there are few enough runs that we don't need multiple merge passes, and that we have some way of making every worker aware of every run produced by any worker. Then any one or more of the workers can get the sorted results out by performing a final merge pass over the runs we produced. We could support various models for reading the results of the sort: return every tuple to every worker; return every tuple to some worker but don't return any given tuple to more than one worker; return all tuples in the leader. So if we just want to sort a big pile of tuples, the plan can now look like this:

Gather
-> Parallel Sort
    Output Mode: Leader Only
  -> Parallel Seq Scan

I'm not sure if that's better or worse or exactly equivalent to the Gather Merge -> Sort -> Parallel Seq Scan approach.
If we want to do a parallel merge join, we now have options like this:

Gather (Merge)
-> Merge Join
  -> Parallel Sort
      Output Mode: Each Tuple Once
    -> Parallel Seq Scan
  -> Index Scan

Or:

Gather (Merge)
-> Merge Join
  -> Repartition
    -> Parallel Sort
        Output Mode: Each Tuple Once
      -> Parallel Seq Scan
  -> Repartition
    -> Parallel Sort
        Output Mode: Each Tuple To Every Worker
      -> Parallel Seq Scan

OK, that's all I've got. So in the space of one email, I've proposed executor nodes for Gather Merge, Repartition, Parallel Index Scan, and Parallel Sort (with three different output modes). And I don't know which ones are actually most interesting, or whether we need them all. Whee! Nor do I know whether any of this can work for code that currently uses tuplesort.c directly. Double whee!

Thoughts welcome.

Thanks,

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

[1] Nested loops preserve the input ordering, but there is no special reason for the input to have an ordering in the first place unless it's useful for a merge join higher up in the plan tree or unless it matches the final query ordering. Hash joins do not benefit from any particular input ordering, and in fact they destroy the input ordering if they go to multiple batches; so we always treat the output of a hash join as unordered.

[2] Currently, Gather nodes cannot appear in a plan tree directly or indirectly beneath other Gather nodes, partly because it's not exactly clear what the semantics of such a thing would be. Therefore, the plan shown here precludes a parallel join between the output of the merge join and anything else.
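
To make the Repartition idea from point 5 a bit more concrete, here is a minimal Python sketch. It is only an illustration under assumptions, not PostgreSQL code: the names repartition, merge_join and partitioned_merge_join are hypothetical, Python's built-in hash() stands in for the hash function, and a plain loop stands in for the workers. It shows that routing both inputs by hash(joincol) % NumberOfWorkers lets each worker sort and merge-join its own partition independently.

```python
def repartition(rows, key, num_workers):
    """Route each row to the worker numbered hash(key(row)) % num_workers."""
    parts = [[] for _ in range(num_workers)]
    for row in rows:
        parts[hash(key(row)) % num_workers].append(row)
    return parts


def merge_join(left, right, key):
    """Join two inputs that are already sorted on key; returns (l, r) pairs."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = key(left[i]), key(right[j])
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of right-hand rows sharing this key.
            j_end = j
            while j_end < len(right) and key(right[j_end]) == lk:
                j_end += 1
            # Pair every left-hand row with this key against that run.
            while i < len(left) and key(left[i]) == lk:
                for r in right[j:j_end]:
                    out.append((left[i], r))
                i += 1
            j = j_end
    return out


def partitioned_merge_join(left, right, key, num_workers):
    """Each loop iteration stands in for one worker's Sort -> Merge Join."""
    lparts = repartition(left, key, num_workers)
    rparts = repartition(right, key, num_workers)
    result = []
    for lp, rp in zip(lparts, rparts):
        result.extend(merge_join(sorted(lp, key=key), sorted(rp, key=key), key))
    return result
```

Because matching join keys always hash to the same worker number, the union of the per-worker results is the full join. The sketch uses exactly one partition per worker, so it inherits the load-balancing weakness described in point 5.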
On Mon, Nov 23, 2015 at 05:01:43PM -0500, Robert Haas wrote:
> Hi,
>
> [snip]
>
> If we had a Parallel Index Scan which worked like a Parallel Seq
> Scan,

That sounds like a very handy thing to have. Any idea whether it's possible for 9.6? Is there any of the Parallel Seq Scan code that looks like it could be reused or slightly generalized for the implementation?

Cheers,
David.
--
David Fetter <david@fetter.org> http://fetter.org/
Phone: +1 415 235 3778 AIM: dfetter666 Yahoo!: dfetter
Skype: davidfetter XMPP: david.fetter@gmail.com

Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate
On Mon, Nov 23, 2015 at 5:38 PM, David Fetter <david@fetter.org> wrote:
> That sounds like a very handy thing to have. Any idea whether it's
> possible for 9.6? Is there any of the Parallel Seq Scan code that
> looks like it could be reused or slightly generalized for the
> implementation?

I think it would be a good idea to pattern a hypothetical Parallel Index Scan feature after what we did in commits ee7ca559fcf404f9a3bd99da85c8f4ea9fbc2e92 and f0661c4e8c44c0ec7acd4ea7c82e85b265447398, which are only about 500 lines of code combined, but I don't expect any direct code reuse to be possible. However:

1. Parallel Seq Scan is easier because we have, at present, only one heapam API. Parallel Index Scan is likely to be more complicated because we need to deal not only with the indexam API but also with the individual access methods (btree, etc.).

2. In Parallel Seq Scan, the determination of what page to scan next isn't dependent on the contents of any page previously scanned. In Parallel Index Scan, it is. Therefore, the amount of effective parallelism is likely to be less. This doesn't mean that trying to parallelize things here is worthless: one backend can be fetching the next index page while some other backend is processing the tuples from a page previously read.

3. Without Gather Merge, it figures to be mostly useless, because a straight Gather node is order-destroying.

I'm not prepared to speculate on whether this will get done for 9.6 at this point. I'll say it would be nice. :-)

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Mon, Nov 23, 2015 at 2:01 PM, Robert Haas <robertmhaas@gmail.com> wrote:
> I've been thinking about how parallelism interacts with sorting over
> the last few days and I wanted to share a few preliminary thoughts. I
> definitely don't have all the answers worked out here yet, so thoughts
> are welcome.

I think it's definitely a good idea to have some high level discussion of these issues now. My responses will in some cases also be high level and aspirational.

> 2. Within parallel query, there are two reasons to care about data
> that is in sorted order. First, we might need to deliver the results
> to the user in a particular order, because they've specified ORDER BY
> whatever. Second, the optimal join strategy might be a merge join,
> which requires that both relations be sorted according to the join
> key.[1]

I gather the distinction you're making here is between a Sort node, and a node that happens to use a tuplesort without an explicit Sort node (like a "COUNT(DISTINCT(foo))" *Aggregate* node -- *not* a GroupAggregate node). I am a little concerned cases like this might accidentally not benefit due to not explicitly having a Sort node, as you refer to below.

Beyond that, CREATE INDEX and CLUSTER utility cases will also need to be parallelized without all this executor infrastructure.

> 3. The current Gather node reads tuples from the workers in
> round-robin fashion, skipping over workers that don't have a tuple
> ready yet when it needs one. It seems potentially useful to have a
> Gather Merge node which would assume that the results from each worker
> are ordered with respect to each other, and do a final merge pass over
> those. Then we could get the toplevel query ordering we want using a
> plan like this:
>
> Gather Merge
> -> Sort
>   -> Parallel Seq Scan on foo
>       Filter: something

I am of course strongly of the opinion that extending the new, improved, but pending approach to external sorts [1] is the way to go.
Using the filesystem as "poor man's shared memory" when you can actually afford real shared memory now seems like much less of a problem than I thought in the past. More on that later.

The problem I see here is that having real executor nodes, while preserving various useful properties of an on-the-fly merge, implies a degree of cross-node promiscuity that I think won't fly. For one thing, all my tricks with memory pooling during the final on-the-fly merge become iffy, to say the least. For another, the children cannot very well feed SortTuples to the parent using the usual TupleTableSlot mechanism -- we benefit plenty from reuse of SortSupport and so on during the merge. Who would want to reconstruct something SortTuple-like on the other side (within the Gather Merge)? Besides, if one child cannot produce tuples in time, then unlike in many other cases there is a legitimate need to hold everything up. I think we should parallelize the Merge in a later release -- possibly much later.

It should probably still be presented to users more or less as you outline -- it will just be an implementation detail. IOW, what explain.c calls "special child plans".

Actually, the answer here is probably simple -- as you suggest separately, the "Gather Merge" actually does almost the same thing as our existing on-the-fly merge step within tuplesort.c. The difference is only that it gets information about runs to merge from the workers when they finish the sort. There is a little bit of bookkeeping in shared memory, plus we revise the tuplesort.c interface to allow what is essentially my new approach to external sorts to happen in phases managed at a slightly lower level by the tuplesort client. The existing interface is preserved, plus a "build your own sort" interface. Clients continue to pass back and forth a little opaque state for tuplesort's benefit, some of which is stored by a Gather-like node in shared memory, but that's it.
We need a new tuplesort_end() variant, to free memory early, without releasing tapes, just for this, and the caller needs to know a bit about runs (or that partitioning is a consequence of the number of workers actually available).

> 4. Gather Merge would be an executor node, and thus not available to
> any code that uses tuplesort.c directly. Also, it seems fairly
> mediocre for merge joins. The best we could do is something like
> this:[2]
>
> Merge Join
> -> Gather Merge
>   -> Sort
>     -> Parallel Seq Scan
> -> Gather Merge
>   -> Sort
>     -> Parallel Seq Scan
>
> The problem with this plan is that the join itself is not done in
> parallel, only the sorting. That's not great, especially if there are
> more joins that need to be done afterwards, necessarily not in
> parallel.[2] It's possible that one side of the join could be an
> Index Scan rather than Gather Merge -> Sort -> Parallel Seq Scan, but
> that doesn't change the overall picture here much.

That's not a huge problem, because at least the Sort is the really expensive part.

Have you tried contriving a merge join test case with a really cheap sort or pair of sorts? I'd try coming up with a case with perfectly sorted inputs into the sort nodes, and compare that with an equivalent case with random inputs. Our silly "bubble sort best case" quicksort optimization for pre-sorted input will make the sort artificially very cheap. That might provide guidance on how much parallelizing the synchronization of relations (merging) can be expected to help. That cost will not grow O(n log n), an observation that crops up in a few places.

The funny thing about linearithmic growth is that if you have enough memory to do a big enough sort (or sort of a run), which these days you probably do when a parallel sort is recommended, it will come to dominate everything.
Suddenly, problems like having to write out runs matter way less, and asynchronous I/O becomes merely a nice-to-have, where 10 or 15 years ago it was very important for parallel sort. Merging might be in a similar category.

We should consider that Postgres development will benefit from coming late to parallel sort, since the current large memory sizes and large database sizes (and to a lesser extent the properties of modern block devices) significantly reduce what in the past were larger problems. More research is required, but it seems like something worth considering.

What I found really interesting during my experiments with the new approach to sorting (simple hybrid sort-merge strategy) was how performance was very consistent past a certain work_mem setting (about 1GB IIRC). Lower settings that were still above that fuzzy threshold resulted in a shorter, maybe even much shorter time spent sorting runs. And yet, (at least without abbreviated keys, and especially for the pass-by-value datum case) the later cost of merging grew surprisingly well in-line with whatever the reduced time spent sorting runs was. This indicated to me that linearithmic growth dominated. It also indicates that we can perhaps partition very strategically, without regard for anything other than keeping n workers busy, making assigning work to workers relatively straightforward.

The whole idea that it's okay that there is a huge gulf between internal and external sort is a bad, old-fashioned idea that needs to die. Blurring the distinction between the two has benefits all over the place, since for example it greatly reduces the cost of a misestimation. Besides, big sorts will tend to need to be external sorts.

I also think that sort is something that is more amenable to being usefully parallelized than any other thing -- nothing else is so computationally intensive.
As Nyberg et al put it in 1994:

"""
Reducing data cache misses can be an intractable problem if the data references are made by a very large number of instructions. For instance, code to execute the TPC-A benchmarks is usually characterized by a very large number of basic blocks that do not loop. In this environment, it is very difficult to understand the data access patterns, let alone modify them to reduce cache misses. In contrast, sorting belongs to a class of programs that make a very large number of data accesses from a small amount of looping code. In this environment, it is feasible to control data accesses via algorithm or data structure modifications.
"""

More recently, it has been predicted that trends in CPU development -- more cores, *less* memory bandwidth per core (the per-core trend for memory bandwidth is not mere stagnation, it's _regression_) will tend to favor merge joins in the long term. Hash joins cannot scale as well, primarily due to the memory bandwidth bottleneck, but also because hashing is not amenable to using SIMD instructions, which we can hope to eventually benefit from with sorting.

The point is that parallel sorting is relatively easy to get significant benefits from as compared to parallelizing other things, and maybe an orientation towards using merge joins more frequently is a good long term goal to have.

> 5. Really nailing the merge join case seems to require partitioning
> both relations in a fashion compatible with the join attribute, and
> then joining the partitions separately. Consider an operator
> Repartition which reads rows from its child plan and returns those
> where hash(joincol) % NumberOfWorkers == MyWorkerNumber.

I'll have to think about that some more.

> 7. Another option, instead or in addition to introducing a Repartition
> operator, is to make the sort itself parallel-aware. Each worker
> reads rows until it fills work_mem, quicksorts them, and dumps them
> out as a run.
> Suppose there are few enough runs that we don't need
> multiple merge passes, and that we have some way of making every
> worker available of every run performed by any worker. Then any one
> or more of the workers can get the sorted results out by performing a
> final merge pass over the runs we produced. We could support various
> models for reading the results of the sort: return every tuple to
> every worker, return every tuple to some worker but don't return any
> given tuple to more than one worker; return all tuples in the leader.
> So if we just want to sort a big pile of tuples, the plan can now look
> like this:
>
> Gather
> -> Parallel Sort
>     Output Mode: Leader Only
>   -> Parallel Seq Scan
>
> I'm not sure if that's better or worse or exactly equivalent to the
> Gather Merge > Sort > Parallel Seq Scan approach. If we want to do a
> parallel merge join, we now have options like this:

As I went into already, my tentative view is that I think this is better.

> OK, that's all I've got. So in the space of one email, I've proposed
> executor nodes for Gather Merge, Repartition, Partial Index Scan, and
> Parallel Sort (with three different output modes). And I don't know
> which ones are actually most interesting, or whether we need them all.
> Whee! Nor do I know whether any of this can work for code that
> currently uses tuplesort.c directly. Double whee!

Sometimes it's appropriate to talk about things in a hand-wavey fashion. We don't do enough of that.

Closing thought: work_mem is a bad model for sorting in the long run, since higher settings won't help much past a certain threshold.

We need to come up with a model that basically only allows a very high effective work_mem setting when that is enough to do the sort fully internally (and possibly when we weren't going to parallelize the sort anyway, since that may at least initially only work for external sorts).
Otherwise, we might as well size an effective work_mem setting according to what our n workers require, or at a level past the threshold at which the benefits of a higher setting are almost noise. This is perhaps also a stepping stone to admission control. Finally, it also empowers us to provide wiggle-room to allow a worker an "effective work_mem burst", sufficient to not have an additional tiny run, which seems like a good idea, and simplifies the partitioning model that the planner needs.

[1] https://commitfest.postgresql.org/7/317/

--
Peter Geoghegan
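
The run-then-merge scheme quoted above from point 7 (each worker fills its memory budget, sorts, dumps a run, and a single final pass merges every run) can be sketched in a few lines of Python. This is a toy illustration under assumptions, not tuplesort.c: make_runs and parallel_sort_sketch are hypothetical names, a row count stands in for work_mem, in-memory lists stand in for runs on tape, and sorted() stands in for quicksort.

```python
import heapq


def make_runs(rows, max_rows_in_memory):
    """Cut one worker's input into sorted runs of at most max_rows_in_memory rows."""
    runs, buf = [], []
    for row in rows:
        buf.append(row)
        if len(buf) == max_rows_in_memory:
            runs.append(sorted(buf))   # stand-in for quicksort + dump to tape
            buf = []
    if buf:
        runs.append(sorted(buf))       # final, possibly tiny, run
    return runs


def parallel_sort_sketch(worker_inputs, max_rows_in_memory):
    """Collect the runs from every worker, then do one final merge pass.

    Returns the sorted rows and the number of runs that were merged.
    """
    all_runs = []
    for rows in worker_inputs:         # each iteration stands in for one worker
        all_runs.extend(make_runs(rows, max_rows_in_memory))
    return list(heapq.merge(*all_runs)), len(all_runs)
```

Note how a worker whose input slightly exceeds its budget produces an extra tiny run, which is the situation the "effective work_mem burst" idea above is meant to avoid.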
On Mon, Nov 23, 2015 at 8:45 PM, Peter Geoghegan <pg@heroku.com> wrote:
>> 2. Within parallel query, there are two reasons to care about data
>> that is in sorted order. First, we might need to deliver the results
>> to the user in a particular order, because they've specified ORDER BY
>> whatever. Second, the optimal join strategy might be a merge join,
>> which requires that both relations be sorted according to the join
>> key.[1]
>
> I gather the distinction you're making here is between a Sort node,
> and a node that happens to use a tuplesort without an explicit Sort
> node (like a "COUNT(DISTINCT(foo))" *Aggregate* node -- *not* a
> GroupAggregate node).

Yes. Or things that aren't part of the executor at all.

> I am a little concerned cases like this might
> accidentally not benefit due to not explicitly having a Sort node, as
> you refer to below.

A valid concern.

> Beyond that, CREATE INDEX and CLUSTER utility
> cases will also need to be parallelized without all this executor
> infrastructure.

Or, alternatively, CREATE INDEX and CLUSTER could be refactored to use the executor. This might sound crazy, but maybe it's not. Perhaps we could have the executor tree output correctly-formed index tuples that get funneled into a new kind of DestReceiver that puts them into the index. I don't know if that's a GOOD idea, but it's an idea.

> The problem I see here is that having real executor nodes, while
> preserving various useful properties of an on-the-fly merge implies a
> degree of cross-node promiscuity that I think won't fly. For one
> thing, all my tricks with memory pooling during the final on-the-fly
> merge become iffy, to say the least. For another, the children cannot
> very well feed SortTuples to the parent using the usual TupleTableSlot
> mechanism -- we benefit plenty from reuse of SortSupport and so on
> during the merge. Who would want to reconstruct something
> SortTuple-like on the other side (within the Gather Merge)?
> Besides,
> if one child cannot produce tuples in time, unlike many things there
> is a legitimate need to hold everything up. I think we should
> parallelize the Merge in a later release -- possibly much later.

The implementation I have in mind for Gather Merge is as follows. Currently, a Gather node has two TupleTableSlots - one for tuples that the leader generates itself by running the plan before the workers get started or when they can't keep up, and a second for tuples read from the workers. What I plan to do is refactor it so that there is one TupleTableSlot per worker. If we're doing a standard Gather, we simply return a tuple from whichever slot we manage to fill first. If we're doing a Gather Merge, we fill every slot, then build a heap of the tuples and return the lowest one. When we need the next tuple, we refill that slot, restore the heap property, lather, rinse, repeat. This is basically the same way MergeAppend works, but instead of reading tuples from multiple subplans, we're reading them from multiple workers. There's really no cross-node promiscuity here - whatever is under the Gather Merge neither knows nor cares what the Gather Merge will do with the tuples, and it does not need to be fed by an explicit sort any more than MergeAppend does.

>> 4. Gather Merge would be an executor node, and thus not available to
>> any code that uses tuplesort.c directly. Also, it seems fairly
>> mediocre for merge joins. The best we could do is something like
>> this:[2]
>>
>> Merge Join
>> -> Gather Merge
>>   -> Sort
>>     -> Parallel Seq Scan
>> -> Gather Merge
>>   -> Sort
>>     -> Parallel Seq Scan
>>
>> The problem with this plan is that the join itself is not done in
>> parallel, only the sorting.
>> That's not great, especially if there are
>> more joins that need to be done afterwards, necessarily not in
>> parallel.[2] It's possible that one side of the join could be an
>> Index Scan rather than Gather Merge -> Sort -> Parallel Seq Scan, but
>> that doesn't change the overall picture here much.
>
> That's not a huge problem, because at least the Sort is the really
> expensive part.

OK, but suppose you need to do a hash or nested loop join to another table after the merge join. With this approach, you cannot parallelize that.

> Have you tried contriving a merge join test case with a really cheap
> sort or pair of sorts?

No. My real-world experience, back before I became a full-time hacker, was that hash joins were often faster than nested loops, and merge joins were dog slow. I dunno if that's representative of other people's experience, or whether subsequent releases have changed the picture.

> What I found really interesting during my experiments with the new
> approach to sorting (simple hybrid sort-merge strategy) was how
> performance was very consistent past a certain work_mem setting (about
> 1GB IIRC). Lower settings greater than that fuzzy threshold resulted
> in a shorter, maybe even much shorter time spent sorting runs. And
> yet, (at least without abbreviated keys, and especially for the
> pass-by-value datum case) the later cost of merging grew surprisingly
> well in-line with whatever the reduced time spent sorting runs was.
> This indicated to me that linearithmic growth dominated. It also
> indicates that we can perhaps partition very strategically, without
> regard for anything other than keeping n workers busy, making
> assigning work to workers relatively straightforward.

Agreed on that last part. It's interesting to think about what further operations we can do with a built-in partitioning notion that permits us to recast a join-of-appends as an append-of-joins, a standard technique for making partitioned parallelism work well at scale.
But in the meantime, and maybe even in the long term, the algorithm we've actually implemented, where Parallel Seq Scan partitions the relation block-by-block, has a lot to recommend it. Worker imbalance is avoided because each worker slurps up data as fast as it can, and even if that speed varies from worker to worker for whatever reason, we still keep all the workers busy until the whole computation is done.

> The point is that parallel sorting is relatively easy to get
> significant benefits from as compared to parallelizing other things,
> and maybe an orientation towards using merge joins more frequently is
> a good long term goal to have.

Maybe. I don't think anyone's done a lot of work to compare the speed of merge joins to the speed of say hash joins on a modern version of PostgreSQL in situations where both algorithms are practical. That seems like an essential prerequisite to any thought of changing the cost model.

>> 7. Another option, instead or in addition to introducing a Repartition
>> operator, is to make the sort itself parallel-aware. Each worker
>> reads rows until it fills work_mem, quicksorts them, and dumps them
>> out as a run. Suppose there are few enough runs that we don't need
>> multiple merge passes, and that we have some way of making every
>> worker available of every run performed by any worker. Then any one
>> or more of the workers can get the sorted results out by performing a
>> final merge pass over the runs we produced. We could support various
>> models for reading the results of the sort: return every tuple to
>> every worker, return every tuple to some worker but don't return any
>> given tuple to more than one worker; return all tuples in the leader.
>> So if we just want to sort a big pile of tuples, the plan can now look
>> like this:
>>
>> Gather
>> -> Parallel Sort
>>     Output Mode: Leader Only
>>   -> Parallel Seq Scan
>>
>> I'm not sure if that's better or worse or exactly equivalent to the
>> Gather Merge > Sort > Parallel Seq Scan approach. If we want to do a
>> parallel merge join, we now have options like this:
>
> As I went into already, my tentative view is that I think this is better.

OK, that's helpful to know. I'm quite sure we need Gather Merge no matter what, because of stuff like:

Gather Merge
-> Nested Loop
  -> Index Scan on foo
  -> Index Scan on bar
      Index Cond: bar.x = foo.x

Without Gather Merge, there's no way to do a parallel join that preserves the ordering provided by the outer index scan, so the query will involve an explicit sort where it need not.

The interesting question in my mind isn't so much whether we need Gather Merge, because I am pretty well sure we do, but what else we need, and it sounds like you're saying a parallel-aware tuplesort.c ought to be on the list. Good enough. What would this actually look like, from an API point of view? I think probably:

1. Caller creates a ParallelContext.
2. Caller creates a parallel-aware tuplesort using some tuplesort.h API.
3. Caller calls LaunchParallelWorkers(pcxt), arranging for each worker to "attach" to the parallel-aware tuplesort.
4. Workers, and caller if desired, put data into the tuplesort to be sorted. When done, they perform the sort.
5. When all workers have performed the sort, the sorted data can be read by one or all workers in the usual way.
6. After the workers exit, the original process destroys the parallel context.

> Closing thought: work_mem is a bad model for sorting in the long run,
> since higher settings won't help much past a certain threshold.
>
> We need to come up with a model that basically only allows a very high
> effective work_mem setting when that is enough to do the sort fully
> internally (and possibly when we weren't going to parallelize the sort
> anyway, since that may at least initially only work for external
> sorts). Otherwise, we might as well size an effective work_mem setting
> according to what our n workers require, or at a level past the
> threshold at which the benefits of a higher setting are almost noise.
> This is perhaps also a stepping stone to admission control. Finally,
> it also empowers us to provide wiggle-room to allow a worker an
> "effective work_mem burst", sufficient to not have an additional tiny
> run, which seems like a good idea, and simplifies the partitioning
> model that the planner needs.

Interesting idea.

One idea about parallel sort is that perhaps if multiple workers feed data into the sort, they can each just sort what they have and then merge the results. So there's no real distinction between internal and external for parallel sorts; but a parallel sort always involves a final merge.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
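
The Gather Merge scheme described in this message (one slot per worker, a heap over the slot contents, refill the slot that was just emitted) can be sketched as follows. This is a minimal Python illustration, assuming each worker's stream is already sorted; gather_merge and _DONE are hypothetical names, Python iterators stand in for the per-worker tuple queues, and nothing here is the actual executor code.

```python
import heapq

_DONE = object()  # sentinel: this worker has no more tuples


def gather_merge(worker_streams):
    """Yield tuples in sorted order from several already-sorted worker streams."""
    iters = [iter(s) for s in worker_streams]
    heap = []
    # Fill one slot per worker, then build a heap of the slot contents.
    for worker_no, it in enumerate(iters):
        first = next(it, _DONE)
        if first is not _DONE:
            heap.append((first, worker_no))
    heapq.heapify(heap)
    while heap:
        value, worker_no = heap[0]      # lowest tuple across all slots
        yield value
        # Refill the slot we just emitted from and restore the heap property.
        nxt = next(iters[worker_no], _DONE)
        if nxt is _DONE:
            heapq.heappop(heap)         # that worker is exhausted
        else:
            heapq.heapreplace(heap, (nxt, worker_no))
```

For example, list(gather_merge([[1, 4, 9], [2, 3, 10]])) walks both streams in lockstep and never needs to see more than one pending tuple per worker. Python's standard heapq.merge does the same job; the loop is spelled out here only to mirror the slot-and-heap description.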
On Tue, Nov 24, 2015 at 8:59 AM, Robert Haas <robertmhaas@gmail.com> wrote:
One idea about parallel sort is that perhaps if multiple workers feed
data into the sort, they can each just sort what they have and then
merge the results.
Sounds like a good approach for parallel sorting. However, a small extension
to it that could avoid merging the final results is for the workers allocated
to the sort to perform range-based sorting. As a simple example, to sort
integers from 1-100, worker-1 would be responsible for sorting any integer
between 1-50 and worker-2 for sorting integers from 51-100; the master
backend then just needs to ensure that it first returns the tuples from
worker-1 and then those from worker-2. I think it has some similarity to
your idea 5 (use of Repartition), but it is not exactly the same.

With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Tue, Nov 24, 2015 at 5:29 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> On Mon, Nov 23, 2015 at 8:45 PM, Peter Geoghegan <pg@heroku.com> wrote:
>> Beyond that, CREATE INDEX and CLUSTER utility
>> cases will also need to be parallelized without all this executor
>> infrastructure.
>
> Or, alternatively, CREATE INDEX and CLUSTER could be refactored to use
> the executor. This might sound crazy, but maybe it's not. Perhaps
> we could have the executor tree output correctly-formed index tuples
> that get funneled into a new kind of DestReceiver that puts them into
> the index. I don't know if that's a GOOD idea, but it's an idea.

Having CREATE INDEX use the executor seems like a useful idea for
reasons unrelated to parallelism.

The use case I have in mind is a table containing multiple years' worth
of (approximately) time series data, where the overwhelming majority of
queries are explicitly interested in recent data. Having a partial
index with WHERE tstamp > $some_recent_tstamp cutting out 90+% of
tuples was extremely helpful for performance, both because of the
smaller index size and because fewer tuples have to be processed. This
index needs to be periodically rebuilt with a newer timestamp constant,
and the rebuild would be a lot faster if it could use the existing
index to perform an index-only scan of 10% of the data instead of
scanning and sorting the full table.

Ants Aasma
-- 
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de
On Tue, Nov 24, 2015 at 7:59 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Tue, Nov 24, 2015 at 8:59 AM, Robert Haas <robertmhaas@gmail.com> wrote:
>> One idea about parallel sort is that perhaps if multiple workers feed
>> data into the sort, they can each just sort what they have and then
>> merge the results.
>
> Sounds like a good approach for parallel sorting. However, a small extension
> to it that could avoid merging the final results is for the workers allocated
> to the sort to perform range-based sorting. As a simple example, to sort
> integers from 1-100, worker-1 would be responsible for sorting any integer
> between 1-50 and worker-2 for sorting integers from 51-100; the master
> backend then just needs to ensure that it first returns the tuples from
> worker-1 and then those from worker-2. I think it has some similarity to
> your idea 5 (use of Repartition), but it is not exactly the same.

This is not so easy to accomplish for a couple of reasons. First, how
would you know where to partition the range? That would work fine if
you had all the data in sorted order to begin with, but of course if
you had that you wouldn't be sorting it. Second, remember that the
data is probably arriving in separate streams in each worker - e.g.
the sort may be being fed by a parallel sequential scan. If you do
what I'm proposing, those workers don't need to communicate with each
other except for the final merge at the end; but to do what you're
proposing, you'd need to move each tuple from the worker that got it
originally to the correct worker. I would guess that would be at
least as expensive as the final merge pass you are hoping to avoid,
and maybe significantly moreso.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
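To make the second objection concrete, the sketch below counts how many
tuples would have to change workers under range partitioning when each
worker's input stream arrives in arbitrary order. The boundary (50) and
both input streams are made-up illustration data, and owner_of is a
hypothetical helper, not anything from PostgreSQL.

```python
def owner_of(value, upper_bounds):
    """Return the index of the worker whose range contains value.

    upper_bounds[i] is the inclusive upper bound of worker i's range;
    the last worker takes everything above the final bound.
    """
    for worker, bound in enumerate(upper_bounds):
        if value <= bound:
            return worker
    return len(upper_bounds)


# Assumed setup: worker 0 owns 1-50, worker 1 owns 51-100.
upper_bounds = [50]

# Tuples as they might arrive from, say, a parallel sequential scan.
streams = [
    [12, 97, 51, 3, 64],  # read by worker 0
    [45, 88, 7, 70, 29],  # read by worker 1
]

# A tuple must move whenever the worker that read it does not own its range.
moved = sum(
    1
    for reader, stream in enumerate(streams)
    for value in stream
    if owner_of(value, upper_bounds) != reader
)
total = sum(len(stream) for stream in streams)
print(f"{moved} of {total} tuples must be sent to another worker")
```

In this invented input more than half of the tuples cross workers,
which is the traffic the sort-then-merge scheme avoids until the final
merge.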
On Tue, Nov 24, 2015 at 7:53 PM, Robert Haas <robertmhaas@gmail.com> wrote:
>
> On Tue, Nov 24, 2015 at 7:59 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
> > On Tue, Nov 24, 2015 at 8:59 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> >> One idea about parallel sort is that perhaps if multiple workers feed
> >> data into the sort, they can each just sort what they have and then
> >> merge the results.
> >
> > Sounds like a good approach for parallel sorting. However, a small extension
> > to it that could avoid merging the final results is for the workers allocated
> > to the sort to perform range-based sorting. As a simple example, to sort
> > integers from 1-100, worker-1 would be responsible for sorting any integer
> > between 1-50 and worker-2 for sorting integers from 51-100; the master
> > backend then just needs to ensure that it first returns the tuples from
> > worker-1 and then those from worker-2. I think it has some similarity to
> > your idea 5 (use of Repartition), but it is not exactly the same.
>
> This is not so easy to accomplish for a couple of reasons. First, how
> would you know where to partition the range?
I was thinking of forming the range map by referring to the histogram from
the stats.
>
> That would work fine if
> you had all the data in sorted order to begin with, but of course if
> you had that you wouldn't be sorting it. Second, remember that the
> data is probably arriving in separate streams in each worker - e.g.
> the sort may be being fed by a parallel sequential scan.
True. At the moment I am not sure what the best way to reduce that
overhead is, but maybe some form of min tuple could be used for that.
>
> If you do
> what I'm proposing, those workers don't need to communicate with each
> other except for the final merge at the end; but to do what you're
> proposing, you'd need to move each tuple from the worker that got it
> originally to the correct worker. I would guess that would be at
> least as expensive as the final merge pass you are hoping to avoid,
> and maybe significantly moreso.
>
I think we can evaluate the pros and cons of each approach and then proceed
with whichever is more promising.
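
One possible reading of the "range map from the histogram" suggestion,
as a rough sketch: take equi-depth histogram bounds (such as the
histogram_bounds column of pg_stats) and use evenly spaced bounds as
split points. The bounds and data below are invented, build_range_map
and route are hypothetical names, and the sketch assumes at least as
many histogram buckets as workers. A stale or unrepresentative
histogram would give uneven partitions.

```python
import bisect


def build_range_map(histogram_bounds, n_workers):
    """Pick n_workers - 1 split points from equi-depth histogram bounds.

    Each bucket is assumed to hold about the same number of rows, so
    evenly spaced bounds should give roughly equal partitions.
    """
    n_buckets = len(histogram_bounds) - 1
    return [
        histogram_bounds[(i * n_buckets) // n_workers]
        for i in range(1, n_workers)
    ]


def route(value, split_points):
    """Return the 0-based worker responsible for value."""
    return bisect.bisect_left(split_points, value)


# Invented bounds for a skewed column: 8 buckets, hence 9 bounds.
histogram_bounds = [1, 4, 9, 15, 22, 40, 63, 81, 100]
split_points = build_range_map(histogram_bounds, n_workers=4)
print(split_points)

data = [57, 2, 99, 23, 15, 8, 71, 40, 64, 10]
partitions = [[] for _ in range(4)]
for value in data:
    partitions[route(value, split_points)].append(value)

# Each worker sorts its own range; concatenating them needs no merge pass.
result = [v for part in partitions for v in sorted(part)]
print(result)
```

Note that the routing loop is exactly the per-tuple redistribution
questioned earlier in the thread; the sketch only shows where the split
points could come from, not that the approach is cheaper overall.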
On 11/23/15 5:47 PM, Robert Haas wrote:
> 2. In Parallel Seq Scan, the determination of what page to scan next
> isn't dependent on the contents of any page previously scanned. In
> Parallel Index Scan, it is. Therefore, the amount of effective
> parallelism is likely to be less. This doesn't mean that trying to
> parallelize things here is worthless: one backend can be fetching the
> next index page while some other backend is processing the tuples from
> a page previously read.

Presumably we could simulate that today by asking the kernel for the
next page in advance, like we do for seqscans when
effective_io_concurrency > 1. My guess is a parallel worker won't help
there.

Where a parallel worker might provide a lot of benefit is separating
index scanning from heap scanning (to check visibility or satisfy a
filter). It wouldn't surprise me if a single worker reading an index
could keep a number of children busy retrieving heap tuples and
processing them. It might be nice if an index scan node just fired up
its own workers and talked to them directly.
-- 
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
On 11/24/15 7:10 AM, Ants Aasma wrote:
> The use case I have in mind is a table containing multiple years' worth
> of (approximately) time series data, where the overwhelming majority of
> queries are explicitly interested in recent data. Having a partial
> index with WHERE tstamp > $some_recent_tstamp cutting out 90+% of
> tuples was extremely helpful for performance, both because of the
> smaller index size and because fewer tuples have to be processed. This
> index needs to be periodically rebuilt with a newer timestamp constant,
> and the rebuild would be a lot faster if it could use the existing
> index to perform an index-only scan of 10% of the data instead of
> scanning and sorting the full table.

There are other cases where you'd want to build an index off an existing
index as well. It's not that uncommon to have small, specialized
indexes that are fully or partially a subset of another index.
-- 
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
On Tue, Nov 24, 2015 at 7:44 PM, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
> On 11/23/15 5:47 PM, Robert Haas wrote:
>> 2. In Parallel Seq Scan, the determination of what page to scan next
>> isn't dependent on the contents of any page previously scanned. In
>> Parallel Index Scan, it is. Therefore, the amount of effective
>> parallelism is likely to be less. This doesn't mean that trying to
>> parallelize things here is worthless: one backend can be fetching the
>> next index page while some other backend is processing the tuples from
>> a page previously read.
>
> Presumably we could simulate that today by asking the kernel for the next
> page in advance, like we do for seqscans when effective_io_concurrency > 1.

We don't do any such thing. We prefetch for bitmap heap scans, not seq scans.

> My guess is a parallel worker won't help there.
> Where a parallel worker might provide a lot of benefit is separating index
> scanning from heap scanning (to check visibility or satisfy a filter). It
> wouldn't surprise me if a single worker reading an index could keep a number
> of children busy retrieving heap tuples and processing them.

Fortunately, the design I'm describing permits that exact thing.

> It might be
> nice if an index scan node just fired up its own workers and talked to them
> directly.

That would be a bad idea, I'm pretty sure. Passing tuples between
workers is expensive and needs to be minimized. I am quite confident
that the right model for making parallelism better is to push as much
stuff beneath the Gather node as possible - that is, each worker
should have as many different things as possible that it can do
without incurring communication overhead. Single-purpose workers that
only assist with one part of the computation and then relay data to
some other process are exactly what we want to avoid.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company