Parallel tuplesort (for parallel B-Tree index creation) - Mailing list pgsql-hackers
From: Peter Geoghegan
Subject: Parallel tuplesort (for parallel B-Tree index creation)
Date:
Msg-id: CAM3SWZQKM=Pzc=CAHzRixKjp2eO5Q0Jg1SoFQqeXFQ647JiwqQ@mail.gmail.com
Responses: Re: Parallel tuplesort (for parallel B-Tree index creation)
List: pgsql-hackers
As some of you know, I've been working on parallel sort. I think I've gone as long as I can without feedback on the design (and I see that we're accepting stuff for September CF now), so I'd like to share what I came up with. This project is something that I've worked on inconsistently since late last year. It can be thought of as the Postgres 10 follow-up to the 9.6 work on external sorting.

Attached WIP patch series:

* Adds a parallel sorting capability to tuplesort.c.

* Adds a new client of this capability: btbuild()/nbtsort.c can now create B-Trees in parallel.

Most of the complexity here relates to the first item; the tuplesort module has been extended to support sorting in parallel. This is usable in principle by every existing tuplesort caller, without any restriction imposed by the newly expanded tuplesort.h interface. So, for example, randomAccess MinimalTuple support has been added, although it goes unused for now.

I went with CREATE INDEX as the first client of parallel sort in part because the cost model and so on can be relatively straightforward. Even CLUSTER uses the optimizer to determine if a sort strategy is appropriate, and that would need to be taught about parallelism if its tuplesort is to be parallelized. I suppose that I'll probably try to get CLUSTER (with a tuplesort) done in the Postgres 10 development cycle too, but not just yet. For now, I would prefer to focus discussion on tuplesort itself. If you can only look at one part of this patch, please look at the high-level description of the interface/caller contract that was added to tuplesort.h.

Performance
===========

Without further ado, I'll demonstrate how the patch series improves performance in one case. This benchmark was run on an AWS server with many disks. A d2.4xlarge instance was used, with 16 vCPUs, 122 GiB RAM, 12 x 2 TB HDDs, running Amazon Linux. Apparently, this AWS instance type can sustain 1,750 MB/second of I/O, which I was able to verify during testing (when a parallel sequential scan ran, iotop reported read throughput slightly above that for multi-second bursts). Disks were configured in software RAID0. These instances have disks that are optimized for sequential performance, which suits the patch quite well. I don't usually trust AWS EC2 for performance testing, but it seemed to work well here (results were pretty consistent).

Setup:

CREATE TABLE parallel_sort_test AS
    SELECT hashint8(i) randint,
           md5(i::text) collate "C" padding1,
           md5(i::text || '2') collate "C" padding2
    FROM generate_series(0, 1e9::bigint) i;

CHECKPOINT;

This leaves us with a parallel_sort_test table that is 94 GB in size.

SET maintenance_work_mem = '8GB';

-- Serial case (external sort, should closely match master branch):
CREATE INDEX serial_idx ON parallel_sort_test (randint) WITH (parallel_workers = 0);

Total time: 00:15:42.15

-- Patch with 8 tuplesort "sort-and-scan" workers (leader process participates as a worker here):
CREATE INDEX patch_8_idx ON parallel_sort_test (randint) WITH (parallel_workers = 7);

Total time: 00:06:03.86

As you can see, the parallel case is 2.58x faster (while using more memory, though it's not the case that a higher maintenance_work_mem setting speeds up the serial/baseline index build). 8 workers are a bit faster than 4, but not by much (not shown). 16 are a bit slower, but not by much (not shown).
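If you want to see the trace_sort output below for yourself, something along these lines in a psql session should be enough to surface it (trace_sort is available because TRACE_SORT is defined by default):

SET trace_sort = on;            -- have tuplesort.c report its progress as LOG messages
SET client_min_messages = log;  -- show those LOG messages in this session
-- then re-run the CREATE INDEX statements from above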
trace_sort output for "serial_idx" case:

"""
begin index sort: unique = f, workMem = 8388608, randomAccess = f
switching to external sort with 501 tapes: CPU 7.81s/25.54u sec elapsed 33.95 sec
*** SNIP ***
performsort done (except 7-way final merge): CPU 53.52s/666.89u sec elapsed 731.67 sec
external sort ended, 2443786 disk blocks used: CPU 74.40s/854.52u sec elapsed 942.15 sec
"""

trace_sort output for "patch_8_idx" case:

"""
begin index sort: unique = f, workMem = 8388608, randomAccess = f
*** SNIP ***
sized memtuples 1.62x from worker's 130254158 (3052832 KB) to 210895910 (4942873 KB) for leader merge (0 KB batch memory conserved)
*** SNIP ***
tape -1/7 initially used 411907 KB of 430693 KB batch (0.956) and 26361986 out of 26361987 slots (1.000)
performsort done (except 8-way final merge): CPU 12.28s/101.76u sec elapsed 129.01 sec
parallel external sort ended, 2443805 disk blocks used: CPU 30.08s/318.15u sec elapsed 363.86 sec
"""

This is roughly the degree of improvement that I expected when I first undertook this project late last year. As I go into in more detail below, I believe that we haven't exhausted all avenues to make parallel CREATE INDEX faster still, but I do think that what's left on the table is not enormous.

There is less benefit when sorting on a C locale text attribute, because the overhead of merging dominates parallel sorts, and that's even more pronounced with text. So, many text cases tend to work out at about only 2x - 2.2x faster. We could work on this indirectly. I've seen cases where a CREATE INDEX ended up more than 3x faster, though. I benchmarked this case in the interest of simplicity (the serial case is intended to be comparable, making the test fair).

Encouragingly, as you can see from the trace_sort output, the 8 parallel workers are 5.67x faster at getting to the final merge (a merge that the leader still performs serially). Note that the final merge for each CREATE INDEX is comparable (7 runs vs. 8 runs from each of 8 workers). Not bad!

Design: New, key concepts for tuplesort.c
=========================================

The heap is scanned in parallel, and worker processes also merge in parallel if required (it isn't required in the example above). The implementation makes heavy use of existing external sort infrastructure. In fact, it's almost the case that the implementation is a generalization of external sorting that allows workers to perform heap scanning and run sorting independently, with tapes then "unified" in the leader process for merging. At that point, the state held by the leader is more or less consistent with the leader being a serial external sort process that has reached its merge phase in the conventional manner (serially).

The steps callers must take are described fully in tuplesort.h. The general idea is that a Tuplesortstate is aware that it might not be a self-contained sort; it may instead be one part of a parallel sort operation. You might say that the tuplesort caller must "build its own sort" from participant worker process Tuplesortstates. The caller creates a dynamic shared memory segment + TOC for each parallel sort operation (there could be more than one concurrent sort operation, of course), passes that to tuplesort to initialize and manage, and creates a "leader" Tuplesortstate in private memory, plus one or more "worker" Tuplesortstates, each presumably managed by a different parallel worker process. tuplesort.c does most of the heavy lifting, including having processes wait on each other to respect its ordering dependencies.
The caller is responsible for spawning workers to do the work, reporting details of the workers to tuplesort through shared memory, and having workers call tuplesort to actually perform sorting. The caller consumes the final output through the leader Tuplesortstate in the leader process. I think that this division of labor works well for us.

Tape unification
----------------

Sort operations have a unique identifier, generated before any workers are launched, using a scheme based on the leader's PID and a unique temp file number. This makes all on-disk state (temp files managed by logtape.c) discoverable by the leader process. State in shared memory is sized in proportion to the number of workers, so the only thing about the data being sorted that gets passed around in shared memory is a little logtape.c metadata for tapes, describing for example how large each constituent BufFile is (a BufFile associated with one particular worker's tapeset).

(See below also for notes on buffile.c's role in all of this, fd.c and resource management, etc.)

workMem
-------

Each worker process claims workMem as if it were an independent node.

The new implementation reuses much of what was originally designed for external sorts. As such, parallel sorts are necessarily external sorts, even when the workMem (i.e. maintenance_work_mem) budget could in principle allow for parallel sorting to take place entirely in memory. The implementation arguably *insists* on making such cases external sorts, when they don't really need to be. This is much less of a problem than you might think, since the 9.6 work on external sorting does somewhat blur the distinction between internal and external sorts (just consider how much time trace_sort indicates is spent waiting on writes in workers; it's typically a small part of the total time spent). Since parallel sort is really only compelling for large sorts, it makes sense to make them external, or at least to prioritize the cases that should be performed externally.

Anyway, workMem-not-exceeded cases require special handling to not completely waste memory. Statistics about worker observations are used at later stages, to at least avoid blatant waste, and to ensure that memory is used optimally more generally.

Merging
=======

The model that I've come up with is that every worker process is guaranteed to output one materialized run onto one tape, for the leader to merge within its "unified" tapeset. This is the case regardless of how much workMem is available, or any other factor. The leader always assumes that the worker runs/tapes are present and discoverable based only on the number of known-launched worker processes, and a little metadata on each that is passed through shared memory.

Producing one output run/materialized tape from all input tuples in a worker often happens without the worker running out of workMem, which you saw above. A straight quicksort and dump of all tuples is therefore possible, without any merging required in the worker. Alternatively, it may prove necessary to do some amount of merging in each worker to generate one materialized output run. This case is handled in the same way as a randomAccess case that requires one materialized output tape to support random access by the caller. This worker merging does necessitate another pass over all temp files for the worker, but that's a much lower cost than you might imagine, in part because the newly expanded use of batch memory makes merging here cache efficient.
Batch allocation is used for all merging involved here, not just the leader's own final on-the-fly merge, so merging is consistently cache efficient. (Workers that must merge on their own are therefore similar to traditional randomAccess callers, so these cases become important enough to optimize with the batch memory patch, although that's still independently useful.)

No merging in parallel
----------------------

Currently, merging worker *output* runs may only occur in the leader process. In other words, we always keep n worker processes busy with scanning-and-sorting (and maybe some merging), but then all processes but the leader process grind to a halt (note that the leader process can participate as a scan-and-sort tuplesort worker, just as it will everywhere else, which is why I specified "parallel_workers = 7" but talked about 8 workers).

One leader process is kept busy with merging these n output runs on the fly, so things will bottleneck on that, which you saw in the example above. As already described, workers will sometimes merge in parallel, but only their own runs -- never another worker's runs.

I did attempt to address the leader merge bottleneck by implementing cross-worker run merging in workers. I got as far as implementing a very rough version of this, but initial results were disappointing, and so that was not pursued further than the experimentation stage. Parallel merging is a possible future improvement that could be added to what I've come up with, but I don't think that it will move the needle in a really noticeable way.

Partitioning for parallelism (samplesort style "bucketing")
-----------------------------------------------------------

Perhaps a partition-based approach would be more effective than parallel merging (e.g., redistribute slices of worker runs across workers along predetermined partition boundaries, sort a range of values within dedicated workers, then concatenate to get the final result, a bit like the in-memory samplesort algorithm). That approach would not suit CREATE INDEX, because the approach's great strength is that the workers can run in parallel for the entire duration, since there is no merge bottleneck (this assumes good partition boundaries, which is a somewhat risky assumption). Parallel CREATE INDEX wants something where the workers can independently write the index, and independently WAL log, and independently create a unified set of internal pages, all of which is hard.

This patch series will tend to proportionally speed up CREATE INDEX statements at a level that is comparable to other major database systems. That's enough progress for one release. I think that partitioning to sort is more useful for query execution than for utility statements like CREATE INDEX.

Partitioning and merge joins
----------------------------

Robert has often speculated about what it would take to make merge joins work well in parallel. I think that "range distribution"/bucketing will prove an important component of that. It's just too useful to aggregate tuples in shared memory initially, and have workers sort them without any serial merge bottleneck; arguments about misestimations, data skew, and so on should not deter us from this, long term. This approach has minimal IPC overhead, especially with regard to LWLock contention. This kind of redistribution probably belongs in a Gather-like node, though, which has access to the context necessary to determine a range, and even dynamically alter the range in the event of a misestimation.
Under this scheme, tuplesort.c just needs to be instructed that these worker-private Tuplesortstates are range-partitioned (i.e., the sorts are virtually independent, as far as it's concerned). That's a bit messy, but it is still probably the way to go for merge joins and other sort-reliant executor nodes.

buffile.c, and "unification"
============================

There has been significant new infrastructure added to make logtape.c aware of workers. buffile.c has in turn been taught about unification as a first class part of the abstraction, with low-level management of certain details occurring within fd.c. So, "tape unification" within processes to open other backends' logical tapes to generate a unified logical tapeset for the leader to merge is added. This is probably the single biggest source of complexity for the patch, since I must consider:

* Creating a general, reusable abstraction for other possible BufFile users (logtape.c only has to serve tuplesort.c, though).

* Logical tape free space management.

* Resource management, file lifetime, etc. fd.c resource management can now close a file at xact end for temp files, while not deleting it in the leader backend (only the "owning" worker backend deletes the temp file it owns).

* Crash safety (e.g., when to truncate existing temp files, and when not to).

CREATE INDEX user interface
===========================

There are two ways of determining how many parallel workers a CREATE INDEX requests:

* A cost model, which is closely based on create_plain_partial_paths() at the moment. This needs more work, particularly to model things like maintenance_work_mem. Even so, it isn't terrible.

* A parallel_workers storage parameter, which completely bypasses the cost model. This is the "DBA knows best" approach, and is what I've consistently used during testing. Corey Huinker has privately assisted me with performance testing the patch, using his own datasets. Testing has exclusively used the storage parameter.

I've added a new GUC, max_parallel_workers_maintenance, which is essentially the utility statement equivalent of max_parallel_workers_per_gather. This is clearly necessary, since we're using up to maintenance_work_mem per worker, which is of course typically much higher than work_mem. I didn't feel the need to create a new maintenance-wise variant GUC for things like min_parallel_relation_size, though. Only this one new GUC is added (plus the new storage parameter, parallel_workers, not to be confused with the existing table storage parameter of the same name).

I am much more concerned about the tuplesort.h interface than the CREATE INDEX user interface as such. The user interface is merely a facade on top of tuplesort.c and nbtsort.c (and not one that I'm particularly attached to).

--
Peter Geoghegan