Thread: WIP: [[Parallel] Shared] Hash
Hi hackers,

In PostgreSQL 9.6, hash joins can be parallelised under certain conditions, but a copy of the hash table is built in every participating backend. That means that memory and CPU time are wasted. In many cases, that's OK: if the hash table contents are small and cheap to compute, then we don't really care; we're just happy that the probing can be done in parallel. But in cases where the hash table is large and/or expensive to build, we could do much better. I am working on that problem.

To recap the situation in 9.6, a hash join can appear below a Gather node and it looks much the same as a non-parallel hash join, except that it has a partial outer plan:

  -> Hash Join
       -> <partial outer plan>
       -> Hash
            -> <non-partial parallel-safe inner plan>

A partial plan is one that has some kind of 'scatter' operation as its ultimate source of tuples. Currently the only kind of scatter operation is a Parallel Seq Scan (but see also the Parallel Index Scan and Parallel Bitmap Scan proposals). The scatter operation enables parallelism in all the executor nodes above it, as far as the enclosing 'gather' operation which must appear somewhere above it. Currently the only kind of gather operation is a Gather node (but see also the Gather Merge proposal which adds a new one). The inner plan is built from a non-partial parallel-safe path and will be run in every worker.

Note that a Hash Join node in 9.6 isn't parallel-aware itself: it's not doing anything special at execution time to support parallelism. The planner has determined that correct partial results will be produced by this plan, but the executor nodes are blissfully unaware of parallelism.

PROPOSED NEW PLAN VARIANTS

Shortly I will post a patch which introduces two new hash join plan variants that are parallel-aware:

1. Parallel Hash Join with Shared Hash

  -> Parallel Hash Join
       -> <partial outer plan>
       -> Shared Hash
            -> <non-partial parallel-safe inner plan>

In this case, there is only one copy of the hash table and only one participant loads it. The other participants wait patiently for one chosen backend to finish building the hash table, and then they all wake up and probe. Call the number of participants P, being the number of workers + 1 (for the leader). Compared to a non-shared hash plan, we avoid wasting CPU and IO resources running P copies of the inner plan in parallel (something that is not well captured in our costing model for parallel query today), and we can allow ourselves to use a hash table P times larger while sticking to the same overall space target of work_mem * P.

2. Parallel Hash Join with Parallel Shared Hash

  -> Parallel Hash Join
       -> <partial outer plan>
       -> Parallel Shared Hash
            -> <partial inner plan>

In this case, the inner plan is run in parallel by all participants. We have the advantages of a shared hash table as described above, and now we can also divide the work of running the inner plan and hashing the resulting tuples by P participants. Note that Parallel Shared Hash is acting as a special kind of gather operation that is the counterpart to the scatter operation contained in the inner plan.

PERFORMANCE

So far I have been unable to measure any performance degradation compared with unpatched master for hash joins with non-shared hash. That's good, because it means that I didn't slow existing plans down when I introduced a bunch of conditional branches to the existing hash join code. Laptop testing shows greater than 2x speedups on several of the TPC-H queries with single batches, and no slowdowns. I will post test numbers on big rig hardware in the coming weeks when I have the batching code in more complete and stable shape.
IMPLEMENTATION

I have taken the approach of extending the existing hash join algorithm, rather than introducing separate hash join executor nodes or a fundamentally different algorithm. Here's a short description of what the patch does:

1. SHARED HASH TABLE

To share data between participants, the patch uses two other patches I have proposed: DSA areas[1], which provide a higher level interface to DSM segments to make programming with processes a little more like programming with threads, and in particular a per-parallel-query DSA area[2] that is made available to any executor node that needs some shared work space.

The patch uses atomic operations to push tuples into the hash table buckets while building, rehashing and loading, and then the hash table is immutable during probing (except for match flags used to implement outer joins). The existing memory chunk design is retained for dense allocation of tuples, which provides a convenient way to rehash the table when its size changes.

2. WORK COORDINATION

To coordinate parallel work, this patch uses two other patches: barriers[3], which implement a 'barrier' or 'phaser' synchronisation primitive, and those in turn use the condition variables proposed by Robert Haas. Barriers provide a way for participants to break work up into phases that they unanimously agree to enter together, which is a basic requirement for parallelising hash joins: it is not safe to insert into the hash table until exactly one participant has created it; it is not safe to probe the hash table until all participants have finished inserting into it; it is not safe to scan it for unmatched tuples until all participants have finished probing it; it is not safe to discard it and start loading the next batch until... you get the idea.
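The lock-free insertion mentioned above amounts to a compare-and-swap loop on each bucket's head pointer. Here is a stand-alone sketch of the idea using C11 atomics and ordinary pointers; it is not the patch's actual code (which works with offsets into shared memory), and the type and function names are invented for illustration:

```c
#include <stdatomic.h>
#include <stddef.h>

/*
 * Hypothetical minimal tuple header: each tuple carries a 'next' link so
 * that each bucket is a simple lock-free linked list.
 */
typedef struct HashTupleExample
{
    struct HashTupleExample *next;
    unsigned int hashvalue;
} HashTupleExample;

/*
 * Push a tuple onto a bucket with a CAS loop.  Many participants can call
 * this concurrently on the same bucket; the loop retries whenever another
 * participant won the race to update the head pointer.
 */
static void
bucket_push(_Atomic(HashTupleExample *) *bucket, HashTupleExample *tuple)
{
    HashTupleExample *head = atomic_load(bucket);

    do
    {
        tuple->next = head;     /* link to the head we last observed */
    } while (!atomic_compare_exchange_weak(bucket, &head, tuple));
}
```

Because tuples are only ever pushed, never removed, during the build phase, no ABA hazard arises, and once building is complete the structure can be probed without any synchronisation at all.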
You could also construct appropriate synchronisation using various other interlocking primitives or flow control systems, but fundamentally these wait points would exist at some level, and I think this way is quite clean and simple. YMMV.

If we had exactly W workers and the leader didn't participate, then we could use a simple pthread- or MPI-style barrier without an explicit notion of 'phase'. We would simply take the existing hash join code, add the shared hash table, add barrier waits at various points, make sure that all participants always hit all of those points in the same order, and it should All Just Work. But we have a variable party size and a dual-role leader process, and I want to highlight the specific problems that causes here, because they increase the patch size significantly:

Problem 1: We don't know how many workers will actually start. We know how many were planned, but at execution time we may have exhausted limits and actually get a smaller number. So we can't use "static" barriers like the classic barriers in POSIX or MPI where the group size is known up front. We need "dynamic" barriers with attach and detach operations. As soon as you have varying party size, you need some kind of explicit model of the current phase, so that a new participant can know what to do when it joins. For that reason, this patch uses a phase number to track progress through the parallel hash join. See MultiExecHash and ExecHashJoin, which have switch statements allowing a newly joined participant to synchronise their own state machine and program counter with the phase.

Problem 2: One participant is not like the others: Gather may or may not decide to run its subplan directly if the worker processes aren't producing any tuples (and the proposed Gather Merge is the same). The problem is that it also needs to consume tuples from the fixed-size queues of the regular workers.
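To illustrate the difference from a static barrier, here is a stand-alone sketch of a dynamic barrier with attach/detach operations and an explicit phase number, written with plain pthreads rather than the proposed barrier.c (which builds on condition variables in shared memory); all names are illustrative:

```c
#include <pthread.h>

/* A dynamic barrier: party size can change, and the phase is explicit. */
typedef struct DynamicBarrier
{
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int participants;   /* currently attached */
    int arrived;        /* arrived at the current phase */
    int phase;          /* monotonically increasing phase number */
} DynamicBarrier;

void
barrier_init(DynamicBarrier *b)
{
    pthread_mutex_init(&b->mutex, NULL);
    pthread_cond_init(&b->cond, NULL);
    b->participants = b->arrived = b->phase = 0;
}

/* A late joiner learns the current phase, so it can catch up. */
int
barrier_attach(DynamicBarrier *b)
{
    int phase;

    pthread_mutex_lock(&b->mutex);
    b->participants++;
    phase = b->phase;
    pthread_mutex_unlock(&b->mutex);
    return phase;
}

/* Returns 1 in exactly one participant per phase (useful for serial steps). */
int
barrier_wait(DynamicBarrier *b)
{
    int elected = 0;
    int start_phase;

    pthread_mutex_lock(&b->mutex);
    start_phase = b->phase;
    if (++b->arrived == b->participants)
    {
        elected = 1;            /* last to arrive advances the phase */
        b->arrived = 0;
        b->phase++;
        pthread_cond_broadcast(&b->cond);
    }
    else
    {
        while (b->phase == start_phase)
            pthread_cond_wait(&b->cond, &b->mutex);
    }
    pthread_mutex_unlock(&b->mutex);
    return elected;
}

void
barrier_detach(DynamicBarrier *b)
{
    pthread_mutex_lock(&b->mutex);
    if (--b->participants > 0 && b->arrived == b->participants)
    {
        /* The leaver was the last straggler: advance the phase for others. */
        b->arrived = 0;
        b->phase++;
        pthread_cond_broadcast(&b->cond);
    }
    pthread_mutex_unlock(&b->mutex);
}
```

The detach path shows why dynamic barriers are subtler than static ones: a departing participant may be the one everybody else is waiting for, so detaching can itself complete a phase.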
A deadlock could arise if the leader's plan blocks waiting for other participants while another participant has filled its output queue and is waiting for the leader to consume tuples. One way to avoid such deadlocks is to follow the rule that the leader should never wait for other participants if there is any possibility that they have emitted tuples.

The simplest way to do that would be to have shared hash plans refuse to run in the leader at all, by returning NULL to signal the end of this partial tuple stream, but then we'd lose a CPU compared to non-shared hash plans. The latest point at which the leader can exit while still respecting that rule is at the end of probing the first batch, and that is the approach taken by the patch currently. See ExecHashCheckForEarlyExit for logic and discussion. It would be better to be able to use the leader in later batches too, but as far as I can see that'd require changes that are out of scope for this patch. One idea would be an executor protocol change allowing plans running in the leader to detach and yield, saying 'I have no further tuples right now, but I'm not finished; try again later', and then reattach when called back. Clearly that sails close to asynchronous execution territory.

Problem 3: If the leader drops out after the first batch to solve problem 2, then it may leave behind batch files which must be processed by other participants. I had originally planned to defer work on batch file sharing until a later iteration, thinking that it would be a nice performance improvement to redistribute work from uneven batch files, but it turns out to be necessary for correct results because of participants exiting early. I am working on a very simple batch sharing system to start with: participants still generate their own batch files, and then new operations BufFileExport and BufFileImport are used to grant read-only access to the BufFile to other participants.
Each participant reads its own batch files entirely and then tries to read from every other participant's batch files until they are all exhausted, using a shared read head. The per-tuple locking granularity, extra seeking and needless buffering in every backend on batch file reads aren't great, and I'm still figuring out temporary file cleanup/ownership semantics. There may be an opportunity to make use of 'unified' BufFile concepts from Peter Geoghegan's work, or to create some new reusable shared tuple spilling infrastructure.

3. COSTING

For now, I have introduced a GUC called cpu_shared_tuple_cost which provides a straw-man model of the overhead of exchanging tuples via a shared hash table, and the extra process coordination required. If it's zero, then a non-shared hash plan (ie multiple copies) has the same cost as a shared hash plan, even though the non-shared hash plan wastefully runs P copies of the inner plan. If cost represents runtime and we assume perfectly spherical cows running without interference from each other, that makes some kind of sense, but it doesn't account for the wasted resources and contention caused by running the same plan in parallel. I don't know what to do about that yet. If cpu_shared_tuple_cost is a positive number, as it probably should be (more on that later), then shared hash tables look more expensive than non-shared ones, which is technically true (CPU cache sharing etc) but unhelpful, because what you lose there you tend to gain by not running all those plans in parallel. In other words, cpu_shared_tuple_cost doesn't really model the cost situation at all well, but it's a useful GUC for development purposes for now, as positive and negative numbers can be used to turn the feature on and off for testing.

As for work_mem, it seems to me that 9.6 already established that work_mem is a per-participant limit, and it would be only fair to let a shared plan use a total of work_mem * P too.
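Returning to the shared read head mentioned above: the "drain your own file first, then steal from the others" policy can be sketched as below. This is a single-process simulation of the reading order only, with a mutex standing in for the real interlocking and tuple counts standing in for actual BufFiles; all names and the participant count are invented:

```c
#include <pthread.h>

#define NPARTICIPANTS 3

/*
 * Shared read positions for one batch, protected by a lock.  In the real
 * patch, state like this would live in the per-query DSA area.
 */
typedef struct SharedBatchReader
{
    pthread_mutex_t mutex;
    int ntuples[NPARTICIPANTS];   /* tuples in each participant's file */
    int position[NPARTICIPANTS];  /* shared read head for each file */
} SharedBatchReader;

/*
 * Return the file number from which participant 'self' should read its
 * next tuple, or -1 if every file is exhausted.  Each participant drains
 * its own file first, then steals from the others in order.
 */
static int
batch_reader_next(SharedBatchReader *r, int self)
{
    int result = -1;

    pthread_mutex_lock(&r->mutex);
    for (int i = 0; i < NPARTICIPANTS; i++)
    {
        int file = (self + i) % NPARTICIPANTS;

        if (r->position[file] < r->ntuples[file])
        {
            r->position[file]++;    /* claim one tuple from this file */
            result = file;
            break;
        }
    }
    pthread_mutex_unlock(&r->mutex);
    return result;
}
```

When tuples are evenly distributed, each participant mostly touches only its own file and the lock is uncontended; contention grows as the distribution skews, which matches the trade-off described in the text.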
I am still working on work_mem accounting and reporting. Accounting for the parallelism in parallel shared hash plans is easy though: their estimated tuple count is already divided by P in the underlying partial path, and that is a fairly accurate characterisation of what's going to happen at execution time: it's often going to go a lot faster, and those plans are the real goal of this work.

STATUS

Obviously this is a work in progress. I am actively working on the following:

* rescan
* batch number increases
* skew buckets
* costing model and policy/accounting for work_mem
* shared batch file reading
* preloading next batch
* debugging and testing
* tidying and refactoring

The basic approach is visible and simple cases are working though, so I am submitting this WIP work for a round of review in the current commitfest and hoping to get some feedback and ideas. I will post the patch in a follow-up email shortly...

Thanks for reading!

[1] https://www.postgresql.org/message-id/flat/CAEepm=1z5WLuNoJ80PaCvz6EtG9dN0j-KuHcHtU6QEfcPP5-qA@mail.gmail.com#CAEepm=1z5WLuNoJ80PaCvz6EtG9dN0j-KuHcHtU6QEfcPP5-qA@mail.gmail.com
[2] https://www.postgresql.org/message-id/flat/CAEepm%3D0HmRefi1%2BxDJ99Gj5APHr8Qr05KZtAxrMj8b%2Bay3o6sA%40mail.gmail.com
[3] https://www.postgresql.org/message-id/flat/CAEepm%3D2_y7oi01OjA_wLvYcWMc9_d%3DLaoxrY3eiROCZkB_qakA%40mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com
Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> The basic approach is visible and simple cases are working though, so
> I am submitting this WIP work for a round of review in the current
> commitfest and hoping to get some feedback and ideas. I will post the
> patch in a follow-up email shortly...

Aloha,

Please find a WIP patch attached. Everything related to batch reading is not currently in a working state, which breaks multi-batch joins, but many single-batch cases work correctly. In an earlier version I had multi-batch joins working, but that was before I started tackling problems 2 and 3 listed in my earlier message. There is some error handling and resource cleanup missing, and doubtless some cases not handled correctly. But I thought it would be good to share this development snapshot for discussion, so I'm posting this as is, and will post an updated version when I've straightened out the batching code some more.

To apply parallel-hash-v1, first apply the following patches, in this order:

  condition-variable-v3.patch [1]
  remove-useless-barrier-header-v2.patch [2]
  barrier-v3.patch [2]
  dsa-v4.patch [3]
  dsa-area-for-executor-v1.patch [4]

When applying dsa-v4 on top of barrier-v3, it will reject a hunk in src/backend/storage/ipc/Makefile where they both add their object file. Simply add dsa.o to OBJS manually. Then you can apply parallel-hash-v1.patch, which is attached to this message.

[1] https://www.postgresql.org/message-id/flat/CA%2BTgmoaj2aPti0yho7FeEf2qt-JgQPRWb0gci_o1Hfr%3DC56Xng%40mail.gmail.com
[2] https://www.postgresql.org/message-id/CAEepm%3D1wrrzxh%3DSRCF_Hk4SZQ9BULy1vWsicx0EbgUf0B85vZQ%40mail.gmail.com
[3] https://www.postgresql.org/message-id/flat/CAEepm%3D1z5WLuNoJ80PaCvz6EtG9dN0j-KuHcHtU6QEfcPP5-qA%40mail.gmail.com
[4] https://www.postgresql.org/message-id/flat/CAEepm%3D0HmRefi1%2BxDJ99Gj5APHr8Qr05KZtAxrMj8b%2Bay3o6sA%40mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com
On Tue, Nov 1, 2016 at 5:33 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> Please find a WIP patch attached. Everything related to batch reading
> is not currently in a working state, which breaks multi-batch joins,
> but many single batch cases work correctly. In an earlier version I
> had multi-batch joins working but was before I started tackling
> problems 2 and 3 listed in my earlier message.

Here is a better version, with code to handle multi-batch joins. The BufFile sharing approach to reading other participants' batch files is a straw-man (perhaps what we really want would look more like a shared tuplestore?), but it solves the immediate problem I described earlier so I can focus on other aspects of the problem. There may be some issues with cleanup though; more on that soon.

Here's a summary of how this patch chops the hash join up into phases. The 'phase' is an integer that encodes the step we're up to in the algorithm, including the current batch number, and I represent that with macros like PHJ_PHASE_HASHING and PHJ_PHASE_PROBING_BATCH(42). Each phase is either serial, meaning that one participant does something special, or parallel, meaning that all participants do the same thing. It goes like this:

* PHJ_PHASE_INIT
  The initial phase, established by the leader before launching workers.

* PHJ_PHASE_CREATING
  Serial: One participant creates the hash table.

* PHJ_PHASE_HASHING
  Serial or parallel: Depending on the plan, one or all participants execute the inner plan to completion, building the hash table for batch 0 and possibly writing tuples to batch files on disk for future batches.

* PHJ_PHASE_RESIZING
  Serial: One participant resizes the hash table if necessary.

* PHJ_PHASE_REBUCKETING
  Parallel: If the hash table was resized, all participants rehash all the tuples in it and insert them into the buckets of the new, larger hash table.

* PHJ_PHASE_PROBING_BATCH(0)
  Parallel: All participants execute the outer plan to completion. For each tuple, they either probe the hash table if it's for the current batch, or write it out to a batch file if it's for a future batch. For each tuple matched in the hash table, they set the matched bit. When they are finished probing batch 0, they also preload tuples from inner batch 1 into a secondary hash table until work_mem is exhausted. (Note that at this time work_mem is occupied by the primary hash table: this is just a way to use any remaining work_mem and extract a little bit more parallelism, since otherwise every participant would be waiting for all participants to finish probing; instead we wait for all participants to finish probing AND for spare work_mem to run out.)

* PHJ_PHASE_UNMATCHED_BATCH(0)
  Parallel: For right/full joins, all participants then scan the hash table looking for unmatched tuples.

... now we are ready for batch 1 ...

* PHJ_PHASE_PROMOTING_BATCH(1)
  Serial: One participant promotes the secondary hash table to become the new primary hash table.

* PHJ_PHASE_LOADING_BATCH(1)
  Parallel: All participants finish loading inner batch 1 into the hash table (work that was started in the previous probing phase).

* PHJ_PHASE_PREPARING_BATCH(1)
  Serial: One participant resets the batch reading heads, so that we are ready to read from outer batch 1 and inner batch 2.

* PHJ_PHASE_PROBING_BATCH(1)
  Parallel: All participants read from outer batch 1 to probe the hash table, then read from inner batch 2 to preload tuples into the secondary hash table.

* PHJ_PHASE_UNMATCHED_BATCH(1)
  Parallel: For right/full joins, all participants then scan the hash table looking for unmatched tuples.

... now we are ready for batch 2 ...

Then all participants synchronise a final time to enter phase PHJ_PHASE_PROMOTING_BATCH(nbatch), which is one past the end and is the point at which it is safe to clean up. (There may be an optimisation where I can clean up after the last participant detaches instead; more on that soon.)
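A plausible way to encode the batch number into a single phase integer, as the macro names above suggest, is fixed-stride arithmetic: a few one-off initial phases followed by a fixed-size block of per-batch phases. The following is a guess at such a scheme for illustration, not the patch's actual definitions:

```c
/*
 * Illustrative phase encoding.  The stride and offsets here are invented;
 * the point is that the batch number and the step within the batch can be
 * recovered from the phase with division and remainder.
 */
#define PHJ_PHASE_INIT               0
#define PHJ_PHASE_CREATING           1
#define PHJ_PHASE_HASHING            2
#define PHJ_PHASE_RESIZING           3
#define PHJ_PHASE_REBUCKETING        4
#define PHJ_BATCH_FIRST_PHASE        5  /* first per-batch phase number */
#define PHJ_PHASES_PER_BATCH         5  /* promoting .. unmatched */

#define PHJ_PHASE_PROMOTING_BATCH(n) (PHJ_BATCH_FIRST_PHASE + (n) * PHJ_PHASES_PER_BATCH + 0)
#define PHJ_PHASE_LOADING_BATCH(n)   (PHJ_BATCH_FIRST_PHASE + (n) * PHJ_PHASES_PER_BATCH + 1)
#define PHJ_PHASE_PREPARING_BATCH(n) (PHJ_BATCH_FIRST_PHASE + (n) * PHJ_PHASES_PER_BATCH + 2)
#define PHJ_PHASE_PROBING_BATCH(n)   (PHJ_BATCH_FIRST_PHASE + (n) * PHJ_PHASES_PER_BATCH + 3)
#define PHJ_PHASE_UNMATCHED_BATCH(n) (PHJ_BATCH_FIRST_PHASE + (n) * PHJ_PHASES_PER_BATCH + 4)

/* Recover the batch number and step from a phase >= PHJ_BATCH_FIRST_PHASE. */
#define PHJ_PHASE_TO_BATCHNO(p)      (((p) - PHJ_BATCH_FIRST_PHASE) / PHJ_PHASES_PER_BATCH)
#define PHJ_PHASE_TO_STEP(p)         (((p) - PHJ_BATCH_FIRST_PHASE) % PHJ_PHASES_PER_BATCH)
```

With an encoding like this, a participant that attaches late can switch on PHJ_PHASE_TO_STEP(phase) to find its place in the state machine, which is roughly the job of the switch statements described for MultiExecHash and ExecHashJoin.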
Obviously I'm actively working on developing and stabilising all this. Some of the things I'm working on are: work_mem accounting, batch increases, rescans and figuring out if the resource management for those BufFiles is going to work. There are quite a lot of edge cases some of which I'm still figuring out, but I feel like this approach is workable. At this stage I want to share what I'm doing to see if others have feedback, ideas, blood curdling screams of horror, etc. I will have better patches and a set of test queries soon. Thanks for reading. -- Thomas Munro http://www.enterprisedb.com
This patch hasn't received any review yet, and it no longer applies cleanly to HEAD.
On Sat, Dec 3, 2016 at 1:38 AM, Haribabu Kommi <kommi.haribabu@gmail.com> wrote:
> Moved to next CF with "waiting on author" status.

Unfortunately it's been a bit trickier than I anticipated to get the interprocess batch file sharing and hash table shrinking working correctly, and I don't yet have a new patch in good enough shape to post in time for the January CF. More soon.

--
Thomas Munro
http://www.enterprisedb.com
On Sat, Dec 31, 2016 at 2:52 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> Unfortunately it's been a bit trickier than I anticipated to get the
> interprocess batch file sharing and hash table shrinking working
> correctly and I don't yet have a new patch in good enough shape to
> post in time for the January CF. More soon.

I noticed a bug in your latest revision:

> +    /*
> +     * In HJ_NEED_NEW_OUTER, we already selected the current inner batch for
> +     * reading from. If there is a shared hash table, we may have already
> +     * partially loaded the hash table in ExecHashJoinPreloadNextBatch.
> +     */
> +    Assert(hashtable->batch_reader.batchno = curbatch);
> +    Assert(hashtable->batch_reader.inner);

Obviously this isn't supposed to be an assignment.

--
Peter Geoghegan
On Mon, Jan 2, 2017 at 3:17 PM, Peter Geoghegan <pg@heroku.com> wrote:
> I noticed a bug in your latest revision:
>
>> +    /*
>> +     * In HJ_NEED_NEW_OUTER, we already selected the current inner batch for
>> +     * reading from. If there is a shared hash table, we may have already
>> +     * partially loaded the hash table in ExecHashJoinPreloadNextBatch.
>> +     */
>> +    Assert(hashtable->batch_reader.batchno = curbatch);
>> +    Assert(hashtable->batch_reader.inner);
>
> Obviously this isn't supposed to be an assignment.

Right, thanks! I will post a new rebased version soon with that and some other nearby problems fixed.

--
Thomas Munro
http://www.enterprisedb.com
On Tue, Jan 3, 2017 at 10:53 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> I will post a new rebased version soon with that and
> some other nearby problems fixed.

Here is a new WIP patch. I have plenty of things to tidy up (see note at end), but the main ideas are now pretty clear and I'd appreciate some feedback. The main changes since the last patch, other than debugging, are:

* the number of batches now increases if work_mem would be exceeded; the work of 'shrinking' the hash table in memory in that case is done in parallel
* work_mem accounting is done at chunk level, instead of per tuple
* interlocking has been rethought

Previously, I had some ideas about using some lock-free tricks for managing chunks of memory, but you may be relieved to hear that I abandoned those plans. Now, atomic ops are used only for one thing: pushing tuples into the shared hash table buckets. An LWLock called 'chunk_lock' protects various linked lists of chunks of memory, and also the shared work_mem accounting. The idea is that backends can work independently on HASH_CHUNK_SIZE blocks of tuples at a time in between needing to acquire that lock briefly.

Also, there is now a second barrier, used to coordinate hash table shrinking. This can happen any number of times during the PHJ_PHASE_HASHING and PHJ_PHASE_LOADING_BATCH(n) phases as required to stay under work_mem, so it needed to be a separate barrier.

The communication in this patch is a bit more complicated than in other nearby parallel query projects I've looked at; probably the worst bit is the leader deadlock avoidance stuff (see ExecHashCheckForEarlyExit), and the second worst bit is probably the switch statements for allowing participants to show up late and get in sync, which make that other problem even more annoying. Without those problems, and with just the right kind of reusable shared tuplestore, this would be a vastly simpler patch.
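Chunk-level accounting as described above might look something like the following sketch: memory is charged against a shared total one HASH_CHUNK_SIZE block at a time under a lock, and individual tuple allocations then just bump a backend-local pointer. The names, the mutex standing in for the LWLock, and the use of malloc rather than DSA allocation are all illustrative simplifications:

```c
#include <pthread.h>
#include <stdlib.h>

#define HASH_CHUNK_SIZE (32 * 1024)

/* Shared accounting for the whole join, protected by one lock. */
typedef struct SharedAccounting
{
    pthread_mutex_t chunk_lock;
    size_t size;        /* total bytes of chunks allocated so far */
    size_t limit;       /* the work_mem-derived budget, in bytes */
} SharedAccounting;

/* Backend-local state: the chunk this participant is currently filling. */
typedef struct LocalChunk
{
    char *base;
    size_t used;
} LocalChunk;

/*
 * Allocate space for one tuple.  The shared counter is touched only when
 * a new chunk is needed; a NULL return means the budget is exhausted,
 * which would be the signal to increase the number of batches and shrink.
 */
static void *
chunk_alloc(SharedAccounting *shared, LocalChunk *local, size_t tuple_size)
{
    if (local->base == NULL || local->used + tuple_size > HASH_CHUNK_SIZE)
    {
        int ok;

        pthread_mutex_lock(&shared->chunk_lock);
        ok = shared->size + HASH_CHUNK_SIZE <= shared->limit;
        if (ok)
            shared->size += HASH_CHUNK_SIZE;    /* charge a whole chunk */
        pthread_mutex_unlock(&shared->chunk_lock);
        if (!ok)
            return NULL;
        local->base = malloc(HASH_CHUNK_SIZE);
        local->used = 0;
    }
    local->used += tuple_size;                  /* bump, no lock needed */
    return local->base + local->used - tuple_size;
}
```

The attraction of this design is that the lock is taken once per chunk rather than once per tuple, so participants mostly run without synchronising, at the cost of the accounting being accurate only to chunk granularity.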
Those are not really fundamental problems of parallel joins using shared hash tables, but they're problems I don't have a better solution to right now.

Stepping back a bit, I am aware of the following approaches to hash join parallelism:

1. Run the inner plan and build a private hash table in each participant, and then scatter the outer plan arbitrarily across participants. This is what 9.6 does, and it's a good plan for small hash tables with fast inner plans, but a terrible plan for expensive or large inner plans. Communication overhead: zero. CPU overhead: runs the inner plan in k workers simultaneously. Memory overhead: builds k copies of the hash table. Disk overhead: may need to spill k copies of all batches to disk if work_mem is exceeded. Restrictions: can't do right/full joins, because there are no shared 'matched' flags.

2. Run a partition-wise hash join[1]. Communication overhead: zero. CPU overhead: zero. Memory overhead: zero. Disk overhead: zero. Restrictions: the schema must include compatible partition keys, and potential parallelism is limited by the number of partitions.

3. Repartition the data on the fly, and then run a partition-wise hash join. Communication overhead: every tuple on at least one and possibly both sides must be rerouted to the correct participant. CPU overhead: zero, once repartitioning is done. Memory overhead: none. Disk overhead: may need to spill partitions to disk if work_mem is exceeded.

4. Scatter both the inner and outer plans arbitrarily across participants (ie uncorrelated partitioning), and build a shared hash table. Communication overhead: synchronisation of build/probe phases, but no tuple rerouting. CPU overhead: none. Memory overhead: none. Disk overhead: may need to spill batches to disk. Restrictions: none in general, but currently we have to drop the leader after the first batch of a multi-batch join, due to our consumer/producer leader problem mentioned in earlier messages.

We have 1. This proposal aims to provide 4.
It seems we have 2 on the way (that technique works for all three join algorithms without any changes to the join operators and looks best by any measure, but is limited by the user's schema, ie it takes careful planning on the user's part instead of potentially helping any join). Other databases, including SQL Server, offer 3. I suspect that 4 is probably a better fit than 3 for Postgres today, because the communication overhead of shovelling nearly all tuples through extra tuple queues to route them to the right hash table would surely be very high, though I can see that it's very attractive to have a reusable tuple repartitioning operator and then run k disjoint communication-free joins (again, without code changes to the join operator, and to the benefit of all join operators).

About the shared batch reading code: this patch modifies BufFile so that a temporary file can be shared read-only with other participants, and then introduces a mechanism for coordinating shared reads. Each worker starts out reading all the tuples from the file that it wrote, before attempting to steal tuples from the files written by other participants, until there are none left anywhere. In the best case they all write out and then read back in just their own files with minimal contention, and contention rises as tuples are less evenly distributed among participants; but we never quite get the best case, because the leader always leaves behind a bunch of batches for the others to deal with when it quits early. Maybe I should separate all the batch reader stuff into another patch so it doesn't clutter the hash join code up so much? I will start reviewing Parallel Tuplesort shortly, which includes some related ideas.

Some assorted notes on the status: I need to do some thinking about the file cleanup logic, both explicit deletes at the earliest possible time, and failure/error paths.
Currently the creator of each file is responsible for cleaning it up, but I guess if the creator aborts early the file disappears underneath the others' feet, and then I guess they might raise a confusing error report that races against the root-cause error report; I'm looking into that. Rescans and skew buckets are not finished yet. The new chunk-queue based ExecScanHashTableForUnmatched isn't tested yet (it replaces an earlier version that was doing a bucket-by-bucket parallel scan). There are several places where I haven't changed the private hash table code to match the shared version because I'm not sure about that, in particular the idea of chunk-based accounting (which happens to be convenient for this code, but which I also believe to be more correct). I'm still trying to decide how to report the hash table tuple count and size: possibly the grand totals.

Generally, I need to do some tidying and provide a suite of queries that hits interesting cases. I hope to move on these things fairly quickly now that I've got the hash table resizing and batch sharing stuff working (a puzzle that kept me very busy for a while), though I'm taking a break for a bit to do some reviewing.

The test query I've been looking at recently is TPC-H Q9. With scale 1GB and work_mem = 64KB, I get a query plan that includes three different variants of Hash node: Hash (run in every backend, duplicate hash tables), Shared Hash (run in just one backend, but allowed to use the sum of work_mem of all the backends, so usually wins by avoiding batching), and Parallel Shared Hash (run in parallel and using the sum of work_mem). As an anecdatum, I see around 2.5x speedup against master, using only 2 workers in both cases, though it seems to be bimodal, either 2x or 2.8x, which I expect has something to do with that leader exit stuff; I'm looking into that. More on performance soon. Thanks for reading!
[1] https://www.postgresql.org/message-id/flat/CAFjFpRfQ8GrQvzp3jA2wnLqrHmaXna-urjm_UY9BqXj%3DEaDTSA%40mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com
On Sat, Jan 7, 2017 at 9:01 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > On Tue, Jan 3, 2017 at 10:53 PM, Thomas Munro > <thomas.munro@enterprisedb.com> wrote: >> I will post a new rebased version soon with that and >> some other nearby problems fixed. > > Here is a new WIP patch. I forgot to mention: this applies on top of barrier-v5.patch, over here: https://www.postgresql.org/message-id/CAEepm%3D3g3EC734kgriWseiJPfUQZeoMWdhAfzOc0ecewAa5uXg%40mail.gmail.com -- Thomas Munro http://www.enterprisedb.com
On Sat, Jan 7, 2017 at 9:01 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> On Tue, Jan 3, 2017 at 10:53 PM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> I will post a new rebased version soon with that and
>> some other nearby problems fixed.
>
> Here is a new WIP patch.

To make this easier to understand and to harmonise the logic used in a few places, I'm now planning to chop it up into a patch series, probably something like this:

1. Change existing hash join code to use chunk-based accounting
2. Change existing hash join code to use a new interface for dealing with batches
3. Add shared hash join support, single batch only
4. Add components for doing shared batch reading (unused)
5. Add multi-batch shared hash join support

--
Thomas Munro
http://www.enterprisedb.com
On Fri, Jan 6, 2017 at 12:01 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > Here is a new WIP patch. I have plenty of things to tidy up (see note > at end), but the main ideas are now pretty clear and I'd appreciate > some feedback. I have some review feedback for your V3. I've chosen to start with the buffile.c stuff, since of course it might share something with my parallel tuplesort patch. This isn't comprehensive, but I will have more comprehensive feedback soon. I'm not surprised that you've generally chosen to make shared BufFile management as simple as possible, with no special infrastructure other than the ability to hold open other backend temp files concurrently within a worker, and no writing to another worker's temp file, or shared read pointer. As you put it, everything is immutable. I couldn't see much opportunity for adding a lot of infrastructure that wasn't written explicitly as parallel hash join code/infrastructure. My sense is that that was a good decision. I doubted that you'd ever want some advanced, generic shared BufFile thing with multiple read pointers, built-in cache coherency, etc. (Robert seemed to think that you'd go that way, though.) Anyway, some more specific observations: * ISTM that this is the wrong thing for shared BufFiles: > +BufFile * > +BufFileImport(BufFileDescriptor *descriptor) > +{ ... > + file->isInterXact = true; /* prevent cleanup by this backend */ There is only one user of isInterXact = true BufFiles at present, tuplestore.c. It, in turn, only does so for cases that require persistent tuple stores. A quick audit of these tuplestore.c callers show this to just be cursor support code within portalmem.c. Here is the relevant tuplestore_begin_heap() rule that that code adheres to, unlike the code I've quoted above: * interXact: if true, the files used for on-disk storage persist beyond the* end of the current transaction. 
NOTE: It's the caller's responsibility to * create such a tuplestore in a memory context and resource owner that will * also survive transaction boundaries, and to ensure the tuplestore is closed * when it's no longer wanted. I don't think it's right for buffile.c to know anything about file paths directly -- I'd say that that's a modularity violation. PathNameOpenFile() is called by very few callers at the moment, all of them very low level (e.g. md.c), but you're using it within buffile.c to open a path to the file that you obtain from shared memory directly. This is buggy because the following code won't be reached in workers that call your BufFileImport() function: /* Mark it for deletion at close */ VfdCache[file].fdstate |= FD_TEMPORARY; /* Register it with the current resource owner */ if (!interXact) { VfdCache[file].fdstate |= FD_XACT_TEMPORARY; ResourceOwnerEnlargeFiles(CurrentResourceOwner); ResourceOwnerRememberFile(CurrentResourceOwner, file); VfdCache[file].resowner = CurrentResourceOwner; /* ensure cleanup happens at eoxact */ have_xact_temporary_files = true; } Certainly, you don't want the "Mark it for deletion at close" bit. Deletion should not happen at eoxact for non-owners-but-sharers (within FileClose()), but you *do* want CleanupTempFiles() to call FileClose() for the virtual file descriptors you've opened in the backend, to do some other cleanup. In general, you want to buy into resource ownership for workers. As things stand, I think that this will leak virtual file descriptors. That's really well hidden because there is a similar CleanupTempFiles() call at proc exit, I think. (Didn't take the time to make sure that that's what masked problems. I'm sure that you want minimal divergence with serial cases, resource-ownership-wise, in any case.) 
Instead of all this, I suggest copying some of my changes to fd.c, so that resource ownership within fd.c differentiates between a vfd that is owned by the backend in the conventional sense, including having a need to delete at eoxact, as well as a lesser form of ownership where deletion should not happen. Maybe you'll end up using my BufFileUnify interface [1] within workers (instead of just within the leader, as with parallel tuplesort), and have it handle all of that for you. Currently, that would mean that there'd be an unused/0 sized "local" segment for the unified BufFile, but I was thinking of making that not happen unless and until a new segment is actually needed, so even that minor wart wouldn't necessarily affect you. > Some assorted notes on the status: I need to do some thinking about > the file cleanup logic: both explicit deletes at the earliest possible > time, and failure/error paths. Currently the creator of each file is > responsible for cleaning it up, but I guess if the creator aborts > early the file disappears underneath the others' feet, and then I > guess they might raise a confusing error report that races against the > root cause error report; I'm looking into that. Rescans and skew > buckets not finished yet. The rescan code path seems to segfault when the regression tests are run. There is a NULL pointer dereference here: > @@ -985,6 +1855,14 @@ ExecReScanHashJoin(HashJoinState *node) > node->hj_HashTable = NULL; > node->hj_JoinState = HJ_BUILD_HASHTABLE; > > + if (HashJoinTableIsShared(node->hj_HashTable)) > + { > + /* Coordinate a rewind to the shared hash table creation phase. */ > + BarrierWaitSet(&hashNode->shared_table_data->barrier, > + PHJ_PHASE_BEGINNING, > + WAIT_EVENT_HASHJOIN_REWINDING3); > + } > + Clearly, HashJoinTableIsShared() should not be called when its argument (in this case node->hj_HashTable) is NULL. 
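[Editorial note: the split Peter suggests above, between a vfd that is fully owned (and deleted at eoxact) and one that is merely shared (closed but never deleted), can be sketched as a toy model. All names below are invented for illustration; the real fd.c tracks this with fdstate flags such as FD_TEMPORARY rather than a separate enum.]

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Toy model of differentiated vfd ownership: a conventional owner both
 * closes its descriptor and unlinks the underlying temp file at end of
 * transaction, while a mere sharer only closes its descriptor, leaving
 * the file for other participants.  Purely illustrative, not fd.c code.
 */
typedef enum
{
	VFD_OWNER,					/* created the file: close and unlink at eoxact */
	VFD_SHARER					/* attached to another backend's file: close only */
} VfdOwnership;

typedef struct
{
	VfdOwnership ownership;
	bool		is_open;
	bool		unlinked;
} ToyVfd;

static void
toy_vfd_eoxact_cleanup(ToyVfd *vfd)
{
	vfd->is_open = false;		/* every backend releases its descriptor */
	if (vfd->ownership == VFD_OWNER)
		vfd->unlinked = true;	/* only the owner deletes the file */
}
```

With this split, a worker that imports another backend's BufFile would get the sharer form, so end-of-transaction cleanup still closes its virtual file descriptor (avoiding the leak described above) without yanking the file out from under the other participants.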
In general, I think you should try to set expectations about what happens when the regression tests run up front, because that's usually the first thing reviewers do. Various compiler warnings on my system: /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHash.c:1376:7: warning: variable ‘size_before_shrink’ set but not used [-Wunused-but-set-variable] Size size_before_shrink = 0; ^ ... /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c: In function ‘ExecHashJoinCloseBatch’: /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:1548:28: warning: variable ‘participant’ set but not used [-Wunused-but-set-variable] HashJoinParticipantState *participant; ^ /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c: In function ‘ExecHashJoinRewindBatches’: /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:1587:23: warning: variable ‘batch_reader’ set but not used [-Wunused-but-set-variable] HashJoinBatchReader *batch_reader; ^ Is this change really needed?: > --- a/src/backend/executor/nodeSeqscan.c > +++ b/src/backend/executor/nodeSeqscan.c > @@ -31,6 +31,8 @@ > #include "executor/nodeSeqscan.h" > #include "utils/rel.h" > > +#include <unistd.h> > + > static void InitScanRelation(SeqScanState *node, EState *estate, int eflags); > static TupleTableSlot *SeqNext(SeqScanState *node); > That's all I have for now... [1] https://wiki.postgresql.org/wiki/Parallel_External_Sort#buffile.c.2C_and_BufFile_unification -- Peter Geoghegan
On Tue, Jan 10, 2017 at 8:56 PM, Peter Geoghegan <pg@heroku.com> wrote: > Instead of all this, I suggest copying some of my changes to fd.c, so > that resource ownership within fd.c differentiates between a vfd that > is owned by the backend in the conventional sense, including having a > need to delete at eoxact, as well as a lesser form of ownership where > deletion should not happen. If multiple processes are using the same file via the BufFile interface, I think that it is absolutely necessary that there should be a provision to track the "attach count" of the BufFile. Each process that reaches EOXact decrements the attach count and when it reaches 0, the process that reduced it to 0 removes the BufFile. I think anything that's based on the notion that leaders will remove files and workers won't is going to be fragile and limiting, and I am going to push hard against any such proposal. -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
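[Editorial note: Robert's attach-count scheme can be illustrated with a minimal sketch. The struct and function names here are hypothetical, and a real implementation would keep the counter in shared memory guarded by a spinlock or manipulated atomically.]

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Minimal sketch of the proposed "attach count": a counter tracks how
 * many backends have the shared BufFile attached.  Whichever backend
 * decrements it to zero removes the file -- regardless of whether that
 * backend is the leader or a worker.
 */
typedef struct
{
	int			refcount;		/* number of attached backends */
	bool		file_removed;	/* stand-in for unlinking the segments */
} SharedBufFileToy;

static void
shared_buffile_attach(SharedBufFileToy *shared)
{
	shared->refcount++;			/* real code: under a lock, at attach time */
}

static void
shared_buffile_detach(SharedBufFileToy *shared)
{
	/* Called from each backend's end-of-transaction cleanup. */
	if (--shared->refcount == 0)
		shared->file_removed = true;	/* last one out turns off the lights */
}
```

The point of the scheme is that the file's lifetime is tied to the last detach, not to any particular role: if the leader detaches first, a worker ends up doing the removal, and vice versa.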
On Wed, Jan 11, 2017 at 10:57 AM, Robert Haas <robertmhaas@gmail.com> wrote: > On Tue, Jan 10, 2017 at 8:56 PM, Peter Geoghegan <pg@heroku.com> wrote: >> Instead of all this, I suggest copying some of my changes to fd.c, so >> that resource ownership within fd.c differentiates between a vfd that >> is owned by the backend in the conventional sense, including having a >> need to delete at eoxact, as well as a lesser form of ownership where >> deletion should not happen. > > If multiple processes are using the same file via the BufFile > interface, I think that it is absolutely necessary that there should > be a provision to track the "attach count" of the BufFile. Each > process that reaches EOXact decrements the attach count and when it > reaches 0, the process that reduced it to 0 removes the BufFile. I > think anything that's based on the notion that leaders will remove > files and workers won't is going to be fragile and limiting, and I am > going to push hard against any such proposal. Okay. My BufFile unification approach happens to assume that backends clean up after themselves, but that isn't a rigid assumption (of course, these are always temp files, so we reason about them as temp files). It could be based on a refcount fairly easily, such that, as you say here, deletion of files occurs within workers (that "own" the files) only as a consequence of their being the last backend with a reference, that must therefore "turn out the lights" (delete the file). That seems consistent with what I've done within fd.c, and what I suggested to Thomas (that he more or less follow that approach). You'd probably still want to throw an error when workers ended up not deleting BufFile segments they owned, though, at least for parallel tuplesort. 
This idea is something that's much more limited than the SharedTemporaryFile() API that you sketched on the parallel sort thread, because it only concerns resource management, and not how to make access to the shared file concurrency safe in any special, standard way. I think that this resource management is something that should be managed by buffile.c (and the temp file routines within fd.c that are morally owned by buffile.c, their only caller). It shouldn't be necessary for a client of this new infrastructure, such as parallel tuplesort or parallel hash join, to know anything about file paths. Instead, they should be passing around some kind of minimal private-to-buffile state in shared memory that coordinates backends participating in BufFile unification. Private state created by buffile.c, and passed back to buffile.c. Everything should be encapsulated within buffile.c, IMV, making parallel implementations as close as possible to their serial implementations. -- Peter Geoghegan
On Wed, Jan 11, 2017 at 2:20 PM, Peter Geoghegan <pg@heroku.com> wrote: > You'd probably still want to throw an error when workers ended up not > deleting BufFile segments they owned, though, at least for parallel > tuplesort. Don't see why. > This idea is something that's much more limited than the > SharedTemporaryFile() API that you sketched on the parallel sort > thread, because it only concerns resource management, and not how to > make access to the shared file concurrency safe in any special, > standard way. Actually, I only intended that sketch to be about resource management. Sounds like I didn't explain very well. > Instead, they should be passing around some kind of minimal > private-to-buffile state in shared memory that coordinates backends > participating in BufFile unification. Private state created by > buffile.c, and passed back to buffile.c. Everything should be > encapsulated within buffile.c, IMV, making parallel implementations as > close as possible to their serial implementations. That seems reasonable although I haven't studied the details carefully as yet. -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
On Wed, Jan 11, 2017 at 11:20 AM, Peter Geoghegan <pg@heroku.com> wrote: >> If multiple processes are using the same file via the BufFile >> interface, I think that it is absolutely necessary that there should >> be a provision to track the "attach count" of the BufFile. Each >> process that reaches EOXact decrements the attach count and when it >> reaches 0, the process that reduced it to 0 removes the BufFile. I >> think anything that's based on the notion that leaders will remove >> files and workers won't is going to be fragile and limiting, and I am >> going to push hard against any such proposal. > > Okay. My BufFile unification approach happens to assume that backends > clean up after themselves, but that isn't a rigid assumption (of > course, these are always temp files, so we reason about them as temp > files). Also, to be clear, and to avoid confusion: I don't think anyone wants an approach "that's based on the notion that leaders will remove files and workers won't". All that has been suggested is that the backend that creates the file should be responsible for deleting the file, by definition. And, that any other backend that may have files owned by another backend must be sure to not try to access them after the owner deletes them. (Typically, that would be ensured by some barrier condition, some dependency, inherent to how the parallel operation is implemented.) I will implement the reference count thing. -- Peter Geoghegan
On Wed, Jan 11, 2017 at 12:05 PM, Robert Haas <robertmhaas@gmail.com> wrote: > On Wed, Jan 11, 2017 at 2:20 PM, Peter Geoghegan <pg@heroku.com> wrote: >> You'd probably still want to throw an error when workers ended up not >> deleting BufFile segments they owned, though, at least for parallel >> tuplesort. > > Don't see why. Simply because that's not expected as things stand -- why should the file go away in that context? (Admittedly, that doesn't seem like an excellent reason now.) I actually like the idea of a reference count, the more I think about it, since it doesn't actually have any tension with my original idea of ownership. If something like a randomAccess parallel tuplesort leader merge needs to write new segments (which it almost certainly *won't* anyway, due to my recent V7 changes), then it can still own those new segments itself, alone, and delete them on its own in the manner of conventional temp files, because we can still restrict the shared refcount mechanism to the deletion of "initial" segments. The refcount == 0 deleter only deletes those initial segments, and not any same-BufFile segments that might have been added (added to append to our unified BufFile within leader). I think that parallel hash join won't use this at all, and, as I said, it's only a theoretical requirement for parallel tuplesort, which will generally recycle blocks from worker temp files for its own writes all the time for randomAccess cases, the only cases that ever write within logtape.c. So, the only BufFile shared state needed, that must be maintained over time, is the refcount variable itself. The size of the "initial" BufFile (from which we derive number of new segments during unification) is passed, but it doesn't get maintained in shared memory. BufFile size remains a one way, one time message needed during unification. I only really need to tweak things in fd.c temp routines to make all this work. 
This is something I like because it makes certain theoretically useful things easier. Say, for example, we wanted to have tuplesort workers merge worker final materialized tapes (their final output), in order to arrange for the leader to have fewer than $NWORKER runs to merge at the end -- that's made easier by the refcount stuff. (I'm still not convinced that that's actually going to make CREATE INDEX faster. Still, it should, on general principle, be easy to write a patch that makes it happen -- a good overall design should leave things so that writing that prototype patch is easy.) >> This idea is something that's much more limited than the >> SharedTemporaryFile() API that you sketched on the parallel sort >> thread, because it only concerns resource management, and not how to >> make access to the shared file concurrency safe in any special, >> standard way. > > Actually, I only intended that sketch to be about resource management. > Sounds like I didn't explain very well. I'm glad to hear that, because I was very puzzled by what you said. I guess I was thrown off by "shared read pointers". I don't want to get into the business of flushing out dirty buffers, or making sure that every backend stays in lockstep about what the total size of the BufFile needs to be. It's so much simpler to just have clear "barriers" for each parallel operation, where backends present a large amount of immutable state to one other backend at the end, and tells it how big its BufFile is only once. (It's not quite immutable, since randomAccess recycle of temp files can happen within logtape.c, but the point is that there should be very little back and forth -- that needs to be severely restricted.) -- Peter Geoghegan
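[Editorial note: the "clear barrier" model Peter describes above -- workers fill in immutable state, one synchronisation point, then a single reader -- can be sketched with plain pthreads. This is only an analogy: PostgreSQL participants are processes coordinating through shared memory and the proposed Barrier facility, not threads, and every name below is invented.]

```c
#include <pthread.h>

#define NWORKERS 3

static int	worker_results[NWORKERS];

/* Each "worker" writes its result exactly once, then finishes. */
static void *
toy_worker(void *arg)
{
	long		i = (long) arg;

	worker_results[i] = (int) (i + 1) * 10;
	return NULL;
}

/*
 * The "leader": the joins stand in for the barrier.  After them, the
 * workers' state is immutable and can be read without further locking,
 * mirroring the one-way handoff of immutable state described above --
 * no flushing of dirty buffers, no lockstep agreement on file size.
 */
static int
toy_leader_sum(void)
{
	pthread_t	threads[NWORKERS];
	int			sum = 0;

	for (long i = 0; i < NWORKERS; i++)
		pthread_create(&threads[i], NULL, toy_worker, (void *) i);

	for (int i = 0; i < NWORKERS; i++)
		pthread_join(threads[i], NULL);		/* the "barrier" */

	for (int i = 0; i < NWORKERS; i++)
		sum += worker_results[i];			/* safe: happens-after the writes */
	return sum;
}
```

The design appeal is the same as in the thread: because all cross-participant communication happens at one well-defined point, there is very little back and forth to reason about.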
On Wed, Jan 11, 2017 at 2:56 PM, Peter Geoghegan <pg@heroku.com> wrote: > On Fri, Jan 6, 2017 at 12:01 PM, Thomas Munro > <thomas.munro@enterprisedb.com> wrote: >> Here is a new WIP patch. I have plenty of things to tidy up (see note >> at end), but the main ideas are now pretty clear and I'd appreciate >> some feedback. > > I have some review feedback for your V3. I've chosen to start with the > buffile.c stuff, since of course it might share something with my > parallel tuplesort patch. This isn't comprehensive, but I will have > more comprehensive feedback soon. Thanks! > I'm not surprised that you've generally chosen to make shared BufFile > management as simple as possible, with no special infrastructure other > than the ability to hold open other backend temp files concurrently > within a worker, and no writing to another worker's temp file, or > shared read pointer. As you put it, everything is immutable. I > couldn't see much opportunity for adding a lot of infrastructure that > wasn't written explicitly as parallel hash join code/infrastructure. > My sense is that that was a good decision. I doubted that you'd ever > want some advanced, generic shared BufFile thing with multiple read > pointers, built-in cache coherency, etc. (Robert seemed to think that > you'd go that way, though.) Right, this is extremely minimalist infrastructure. fd.c is unchanged. buffile.c only gains the power to export/import read-only views of BufFiles. There is no 'unification' of BufFiles: each hash join participant simply reads from the buffile it wrote, and then imports and reads from its peers' BufFiles, until all are exhausted; so the 'unification' is happening in caller code which knows about the set of participants and manages shared read positions. Clearly there are some ownership/cleanup issues to straighten out, but I think those problems are fixable (probably involving refcounts). 
I'm entirely willing to throw that away and use the unified BufFile concept, if it can be extended to support multiple readers of the data, where every participant unifies the set of files. I have so far assumed that it would be most efficient for each participant to read from the file that it wrote before trying to read from files written by other participants. I'm reading your patch now; more soon. > Anyway, some more specific observations: > > * ISTM that this is the wrong thing for shared BufFiles: > >> +BufFile * >> +BufFileImport(BufFileDescriptor *descriptor) >> +{ > ... >> + file->isInterXact = true; /* prevent cleanup by this backend */ > > There is only one user of isInterXact = true BufFiles at present, > tuplestore.c. It, in turn, only does so for cases that require > persistent tuple stores. A quick audit of these tuplestore.c callers > show this to just be cursor support code within portalmem.c. Here is > the relevant tuplestore_begin_heap() rule that that code adheres to, > unlike the code I've quoted above: > > * interXact: if true, the files used for on-disk storage persist beyond the > * end of the current transaction. NOTE: It's the caller's responsibility to > * create such a tuplestore in a memory context and resource owner that will > * also survive transaction boundaries, and to ensure the tuplestore is closed > * when it's no longer wanted. Hmm. Yes, that is an entirely bogus use of isInterXact. I am thinking about how to fix that with refcounts. > I don't think it's right for buffile.c to know anything about file > paths directly -- I'd say that that's a modularity violation. > PathNameOpenFile() is called by very few callers at the moment, all of > them very low level (e.g. md.c), but you're using it within buffile.c > to open a path to the file that you obtain from shared memory Hmm. I'm not seeing the modularity violation. 
buffile.c uses interfaces already exposed by fd.c to do this: OpenTemporaryFile, then FilePathName to find the path, then PathNameOpenFile to open from another process. I see that your approach instead has client code provide more meta data so that things can be discovered, which may well be a much better idea. > directly. This is buggy because the following code won't be reached in > workers that call your BufFileImport() function: > > /* Mark it for deletion at close */ > VfdCache[file].fdstate |= FD_TEMPORARY; > > /* Register it with the current resource owner */ > if (!interXact) > { > VfdCache[file].fdstate |= FD_XACT_TEMPORARY; > > ResourceOwnerEnlargeFiles(CurrentResourceOwner); > ResourceOwnerRememberFile(CurrentResourceOwner, file); > VfdCache[file].resowner = CurrentResourceOwner; > > /* ensure cleanup happens at eoxact */ > have_xact_temporary_files = true; > } Right, that is a problem. A refcount mode could fix that; virtual file descriptors would be closed in every backend using the current resource owner, and the files would be deleted when the last one turns out the lights. > Certainly, you don't want the "Mark it for deletion at close" bit. > Deletion should not happen at eoxact for non-owners-but-sharers > (within FileClose()), but you *do* want CleanupTempFiles() to call > FileClose() for the virtual file descriptors you've opened in the > backend, to do some other cleanup. In general, you want to buy into > resource ownership for workers. As things stand, I think that this > will leak virtual file descriptors. That's really well hidden because > there is a similar CleanupTempFiles() call at proc exit, I think. > (Didn't take the time to make sure that that's what masked problems. > I'm sure that you want minimal divergence with serial cases, > resource-ownership-wise, in any case.) 
> > Instead of all this, I suggest copying some of my changes to fd.c, so > that resource ownership within fd.c differentiates between a vfd that > is owned by the backend in the conventional sense, including having a > need to delete at eoxact, as well as a lesser form of ownership where > deletion should not happen. Maybe you'll end up using my BufFileUnify > interface [1] within workers (instead of just within the leader, as > with parallel tuplesort), and have it handle all of that for you. > Currently, that would mean that there'd be an unused/0 sized "local" > segment for the unified BufFile, but I was thinking of making that not > happen unless and until a new segment is actually needed, so even that > minor wart wouldn't necessarily affect you. Ok, I'm studying that code now. >> Some assorted notes on the status: I need to do some thinking about >> the file cleanup logic: both explicit deletes at the earliest possible >> time, and failure/error paths. Currently the creator of each file is >> responsible for cleaning it up, but I guess if the creator aborts >> early the file disappears underneath the others' feet, and then I >> guess they might raise a confusing error report that races against the >> root cause error report; I'm looking into that. Rescans and skew >> buckets not finished yet. > > The rescan code path seems to segfault when the regression tests are > run. There is a NULL pointer dereference here: > >> @@ -985,6 +1855,14 @@ ExecReScanHashJoin(HashJoinState *node) >> node->hj_HashTable = NULL; >> node->hj_JoinState = HJ_BUILD_HASHTABLE; >> >> + if (HashJoinTableIsShared(node->hj_HashTable)) >> + { >> + /* Coordinate a rewind to the shared hash table creation phase. */ >> + BarrierWaitSet(&hashNode->shared_table_data->barrier, >> + PHJ_PHASE_BEGINNING, >> + WAIT_EVENT_HASHJOIN_REWINDING3); >> + } >> + > > Clearly, HashJoinTableIsShared() should not be called when its > argument (in this case node->hj_HashTable) is NULL. 
> > In general, I think you should try to set expectations about what > happens when the regression tests run up front, because that's usually > the first thing reviewers do. Apologies, poor form. That block can be commented out for now because rescan support is obviously incomplete, and I didn't mean to post it that way. Doing so reveals two remaining test failures: "join" and "rowsecurity" managed to lose a couple of rows. Oops. I will figure out what I broke and have a fix for that in my next version. > Various compiler warnings on my system: > > /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHash.c:1376:7: > warning: variable ‘size_before_shrink’ set but not used > [-Wunused-but-set-variable] > Size size_before_shrink = 0; > ^ In this case it was only used in dtrace builds; I will make sure any such code is compiled out when in non-dtrace builds. > /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c: > In function ‘ExecHashJoinCloseBatch’: > /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:1548:28: > warning: variable ‘participant’ set but not used > [-Wunused-but-set-variable] > HashJoinParticipantState *participant; > ^ > /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c: > In function ‘ExecHashJoinRewindBatches’: > /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:1587:23: > warning: variable ‘batch_reader’ set but not used > [-Wunused-but-set-variable] > HashJoinBatchReader *batch_reader; > ^ > > Is this change really needed?: > >> --- a/src/backend/executor/nodeSeqscan.c >> +++ b/src/backend/executor/nodeSeqscan.c >> @@ -31,6 +31,8 @@ >> #include "executor/nodeSeqscan.h" >> #include "utils/rel.h" >> >> +#include <unistd.h> >> + >> static void InitScanRelation(SeqScanState *node, EState *estate, int eflags); >> static TupleTableSlot *SeqNext(SeqScanState *node); Right, will clean up. 
> That's all I have for now... Thanks! I'm away from my computer for a couple of days but will have a new patch series early next week, and hope to have a better handle on what's involved in adopting the 'unification' approach here instead. -- Thomas Munro http://www.enterprisedb.com
On Wed, Jan 11, 2017 at 2:56 PM, Peter Geoghegan <pg@heroku.com> wrote:
> On Fri, Jan 6, 2017 at 12:01 PM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> Here is a new WIP patch. I have plenty of things to tidy up (see note
>> at end), but the main ideas are now pretty clear and I'd appreciate
>> some feedback.
>
> I have some review feedback for your V3. I've chosen to start with the
> buffile.c stuff, since of course it might share something with my
> parallel tuplesort patch. This isn't comprehensive, but I will have
> more comprehensive feedback soon.
Thanks!
> I'm not surprised that you've generally chosen to make shared BufFile
> management as simple as possible, with no special infrastructure other
> than the ability to hold open other backend temp files concurrently
> within a worker, and no writing to another worker's temp file, or
> shared read pointer. As you put it, everything is immutable. I
> couldn't see much opportunity for adding a lot of infrastructure that
> wasn't written explicitly as parallel hash join code/infrastructure.
> My sense is that that was a good decision. I doubted that you'd ever
> want some advanced, generic shared BufFile thing with multiple read
> pointers, built-in cache coherency, etc. (Robert seemed to think that
> you'd go that way, though.)
Right, this is extremely minimalist infrastructure. fd.c is
unchanged. buffile.c only gains the power to export/import read-only
views of BufFiles. There is no 'unification' of BufFiles: each hash
join participant simply reads from the buffile it wrote, and then
imports and reads from its peers' BufFiles, until all are exhausted;
so the 'unification' is happening in caller code which knows about the
set of participants and manages shared read positions. Clearly there
are some ownership/cleanup issues to straighten out, but I think those
problems are fixable (probably involving refcounts).
I'm entirely willing to throw that away and use the unified BufFile
concept, if it can be extended to support multiple readers of the
data, where every participant unifies the set of files. I have so far
assumed that it would be most efficient for each participant to read
from the file that it wrote before trying to read from files written
by other participants. I'm reading your patch now; more soon.
> Anyway, some more specific observations:
>
> * ISTM that this is the wrong thing for shared BufFiles:
>
>> +BufFile *
>> +BufFileImport(BufFileDescriptor *descriptor)
>> +{
> ...
>> + file->isInterXact = true; /* prevent cleanup by this backend */
>
> There is only one user of isInterXact = true BufFiles at present,
> tuplestore.c. It, in turn, only does so for cases that require
> persistent tuple stores. A quick audit of these tuplestore.c callers
> show this to just be cursor support code within portalmem.c. Here is
> the relevant tuplestore_begin_heap() rule that that code adheres to,
> unlike the code I've quoted above:
>
> * interXact: if true, the files used for on-disk storage persist beyond the
> * end of the current transaction. NOTE: It's the caller's responsibility to
> * create such a tuplestore in a memory context and resource owner that will
> * also survive transaction boundaries, and to ensure the tuplestore is closed
> * when it's no longer wanted.
Hmm. Yes, that is an entirely bogus use of isInterXact. I am
thinking about how to fix that with refcounts.
> I don't think it's right for buffile.c to know anything about file
> paths directly -- I'd say that that's a modularity violation.
> PathNameOpenFile() is called by very few callers at the moment, all of
> them very low level (e.g. md.c), but you're using it within buffile.c
> to open a path to the file that you obtain from shared memory
Hmm. I'm not seeing the modularity violation. buffile.c uses
interfaces already exposed by fd.c to do this: OpenTemporaryFile,
then FilePathName to find the path, then PathNameOpenFile to open from
another process. I see that your approach instead has client code
provide more meta data so that things can be discovered, which may
well be a much better idea.
> directly. This is buggy because the following code won't be reached in
> workers that call your BufFileImport() function:
>
> /* Mark it for deletion at close */
> VfdCache[file].fdstate |= FD_TEMPORARY;
>
> /* Register it with the current resource owner */
> if (!interXact)
> {
> VfdCache[file].fdstate |= FD_XACT_TEMPORARY;
>
> ResourceOwnerEnlargeFiles(CurrentResourceOwner);
> ResourceOwnerRememberFile(CurrentResourceOwner, file);
> VfdCache[file].resowner = CurrentResourceOwner;
>
> /* ensure cleanup happens at eoxact */
> have_xact_temporary_files = true;
> }
Right, that is a problem. A refcount mode could fix that; virtual
file descriptors would be closed in every backend using the current
resource owner, and the files would be deleted when the last one turns
out the lights.
> Certainly, you don't want the "Mark it for deletion at close" bit.
> Deletion should not happen at eoxact for non-owners-but-sharers
> (within FileClose()), but you *do* want CleanupTempFiles() to call
> FileClose() for the virtual file descriptors you've opened in the
> backend, to do some other cleanup. In general, you want to buy into
> resource ownership for workers. As things stand, I think that this
> will leak virtual file descriptors. That's really well hidden because
> there is a similar CleanupTempFiles() call at proc exit, I think.
> (Didn't take the time to make sure that that's what masked problems.
> I'm sure that you want minimal divergence with serial cases,
> resource-ownership-wise, in any case.)
>
> Instead of all this, I suggest copying some of my changes to fd.c, so
> that resource ownership within fd.c differentiates between a vfd that
> is owned by the backend in the conventional sense, including having a
> need to delete at eoxact, as well as a lesser form of ownership where
> deletion should not happen. Maybe you'll end up using my BufFileUnify
> interface [1] within workers (instead of just within the leader, as
> with parallel tuplesort), and have it handle all of that for you.
> Currently, that would mean that there'd be an unused/0 sized "local"
> segment for the unified BufFile, but I was thinking of making that not
> happen unless and until a new segment is actually needed, so even that
> minor wart wouldn't necessarily affect you.
Ok, I'm studying that code now.Apologies, poor form. That block can be commented out for now because
>> Some assorted notes on the status: I need to do some thinking about
>> the file cleanup logic: both explicit deletes at the earliest possible
>> time, and failure/error paths. Currently the creator of each file is
>> responsible for cleaning it up, but I guess if the creator aborts
>> early the file disappears underneath the others' feet, and then I
>> guess they might raise a confusing error report that races against the
>> root cause error report; I'm looking into that. Rescans and skew
>> buckets not finished yet.
>
> The rescan code path seems to segfault when the regression tests are
> run. There is a NULL pointer dereference here:
>
>> @@ -985,6 +1855,14 @@ ExecReScanHashJoin(HashJoinState *node)
>> node->hj_HashTable = NULL;
>> node->hj_JoinState = HJ_BUILD_HASHTABLE;
>>
>> + if (HashJoinTableIsShared(node->hj_HashTable))
>> + {
>> + /* Coordinate a rewind to the shared hash table creation phase. */
>> + BarrierWaitSet(&hashNode->shared_table_data->barrier,
>> + PHJ_PHASE_BEGINNING,
>> + WAIT_EVENT_HASHJOIN_REWINDING3);
>> + }
>> +
>
> Clearly, HashJoinTableIsShared() should not be called when its
> argument (in this case node->hj_HashTable) is NULL.
>
> In general, I think you should try to set expectations about what
> happens when the regression tests run up front, because that's usually
> the first thing reviewers do.
Apologies, poor form. That block can be commented out for now because
rescan support is obviously incomplete, and I didn't mean to post it
that way. Doing so reveals two remaining test failures: "join" and
"rowsecurity" managed to lose a couple of rows. Oops. I will figure
out what I broke and have a fix for that in my next version.
> Various compiler warnings on my system:
>
> /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHash.c:1376:7:
> warning: variable ‘size_before_shrink’ set but not used
> [-Wunused-but-set-variable]
> Size size_before_shrink = 0;
> ^
In this case it was only used in dtrace builds; I will make sure any
such code is compiled out in non-dtrace builds.
> /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:
> In function ‘ExecHashJoinCloseBatch’:
> /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:1548:28:
> warning: variable ‘participant’ set but not used
> [-Wunused-but-set-variable]
> HashJoinParticipantState *participant;
> ^
> /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:
> In function ‘ExecHashJoinRewindBatches’:
> /home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:1587:23:
> warning: variable ‘batch_reader’ set but not used
> [-Wunused-but-set-variable]
> HashJoinBatchReader *batch_reader;
> ^
>
> Is this change really needed?:
>
>> --- a/src/backend/executor/nodeSeqscan.c
>> +++ b/src/backend/executor/nodeSeqscan.c
>> @@ -31,6 +31,8 @@
>> #include "executor/nodeSeqscan.h"
>> #include "utils/rel.h"
>>
>> +#include <unistd.h>
>> +
>> static void InitScanRelation(SeqScanState *node, EState *estate, int eflags);
>> static TupleTableSlot *SeqNext(SeqScanState *node);
Right, will clean up.
> That's all I have for now...
Thanks! I'm away from my computer for a couple of days but will have
a new patch series early next week, and hope to have a better handle
on what's involved in adopting the 'unification' approach here
instead.
--
Thomas Munro
http://www.enterprisedb.com

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, Jan 11, 2017 at 7:37 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> Hmm. Yes, that is an entirely bogus use of isInterXact. I am
> thinking about how to fix that with refcounts.

Cool. As I said, the way I'd introduce refcounts would not be very
different from what I've already done -- there'd still be a strong
adherence to the use of resource managers to clean up, with that
including exactly one particular backend doing the extra step of
deletion. The refcount only changes which backend does that extra step
in corner cases, which is conceptually a very minor change.

>> I don't think it's right for buffile.c to know anything about file
>> paths directly -- I'd say that that's a modularity violation.
>> PathNameOpenFile() is called by very few callers at the moment, all of
>> them very low level (e.g. md.c), but you're using it within buffile.c
>> to open a path to the file that you obtain from shared memory
>
> Hmm. I'm not seeing the modularity violation. buffile.c uses
> interfaces already exposed by fd.c to do this: OpenTemporaryFile,
> then FilePathName to find the path, then PathNameOpenFile to open from
> another process. I see that your approach instead has client code
> provide more meta data so that things can be discovered, which may
> well be a much better idea.

Indeed, my point was that the metadata thing would IMV be better.
buffile.c shouldn't need to know about file paths, etc. Instead, the
caller should pass BufFileImport()/BufFileUnify() simple private state
sufficient for the routine to discover all details itself, based on a
deterministic scheme. In my tuplesort patch, that piece of state is:

+/*
+ * BufFileOp is an identifier for a particular parallel operation involving
+ * temporary files.  Parallel temp file operations must be discoverable across
+ * processes based on these details.
+ *
+ * These fields should be set by BufFileGetIdent() within leader process.
+ * Identifier BufFileOp makes temp files from workers discoverable within
+ * leader.
+ */
+typedef struct BufFileOp
+{
+	/*
+	 * leaderPid is leader process PID.
+	 *
+	 * tempFileIdent is an identifier for a particular temp file (or parallel
+	 * temp file op) for the leader.  Needed to distinguish multiple parallel
+	 * temp file operations within a given leader process.
+	 */
+	int			leaderPid;
+	long		tempFileIdent;
+} BufFileOp;
+

> Right, that is a problem. A refcount mode could fix that; virtual
> file descriptors would be closed in every backend using the current
> resource owner, and the files would be deleted when the last one turns
> out the lights.

Yeah. That's basically what the BufFile unification process can provide
you with (or will, once I get around to implementing the refcount
thing, which shouldn't be too hard). As already noted, I'll also want
to make it defer creation of a leader-owned segment, unless and until
that proves necessary, which it never will for hash join.

Perhaps I should make superficial changes to unification in my patch to
suit your work, like renaming the field BufFileOp.leaderPid to
BufFileOp.ownerPid, without actually changing any behaviors, except as
noted in the last paragraph. Since you only require that backends be
able to open some other backend's temp file themselves for a short
while, that gives you everything you need. You'll be doing unification
in backends, and not just within the leader as in the tuplesort patch,
I believe, but that's just fine. All that matters is that you present
all data at once to a consuming backend via unification (since you
treat temp file contents as immutable, this will be true for hash join,
just as it is for tuplesort).

There is a good argument against my making such a tweak, however, which
is that maybe it's clearer to DBAs what's going on if temp file names
have the leader PID in them for all operations.
So, maybe BufFileOp.leaderPid isn't renamed to BufFileOp.ownerPid by
me; instead, you always make it the leader pid, while at the same time
having the leader dole out BufFileOp.tempFileIdent identifiers to each
worker as needed (see how I generate BufFileOps for an idea of what I
mean if it's not immediately clear). That's also an easy change, or at
least will be once the refcount thing is added.

--
Peter Geoghegan
On Fri, Jan 13, 2017 at 2:36 PM, Peter Geoghegan <pg@heroku.com> wrote:
> [...]
> Yeah. That's basically what the BufFile unification process can
> provide you with (or will, once I get around to implementing the
> refcount thing, which shouldn't be too hard). As already noted, I'll
> also want to make it defer creation of a leader-owned segment, unless
> and until that proves necessary, which it never will for hash join.

Hi Peter,

I have broken this up into a patch series, harmonised the private vs
shared hash table code paths better and fixed many things including the
problems with rescans and regression tests mentioned upthread. You'll
see that one of the patches is that throwaway BufFile import/export
facility, which I'll replace with your code as discussed.

The three 'refactor' patches change the existing hash join code to work
in terms of chunks in more places. These may be improvements in their
own right, but mainly they pave the way for parallelism. The later
patches introduce single-batch and then multi-batch shared tables. The
patches in the attached tarball are:

0001-nail-down-regression-test-row-order-v4.patch:

A couple of regression tests would fail with later refactoring that
changes the order of unmatched rows emitted by hash joins. So first,
let's fix that by adding ORDER BY in those places, without any code
changes.

0002-hj-add-dtrace-probes-v4.patch:

Before making any code changes, let's add some dtrace probes so that we
can measure time spent doing different phases of hash join work before
and after the later changes. The main problem with the probes as I have
them here (and the extra probes inserted by later patches in the
series) is that interesting query plans contain multiple hash joins so
these get all mixed up when you're trying to measure stuff, so perhaps
I should pass executor node IDs into all the probes. More on this
later.
(If people don't want dtrace probes in the executor, I'm happy to omit
them and maintain that kind of thing locally for my own testing
purposes...)

0003-hj-refactor-memory-accounting-v4.patch:

Modify the existing hash join code to work in terms of chunks when
estimating and later tracking memory usage. This is probably more
accurate than the current tuple-based approach, because it tries to
take into account the space used by chunk headers and the wasted space
in chunks. In practice the difference is probably small, but it's
arguably more accurate; I did this because I need chunk-based
accounting in the later patches. Also, make HASH_CHUNK_SIZE the actual
size of allocated chunks (ie the header information is included in that
size so we allocate exactly 32KB, not 32KB + a bit, for the benefit of
the dsa allocator which otherwise finishes up allocating 36KB).

0004-hj-refactor-batch-increases-v4.patch:

Modify the existing hash join code to detect work_mem exhaustion at the
point where chunks are allocated, instead of checking after every tuple
insertion. This matches the logic used for estimating, and more
importantly allows for some parallelism in later patches.

0005-hj-refactor-unmatched-v4.patch:

Modifies the existing hash join code to handle unmatched tuples in
right/full joins chunk-by-chunk. This is probably a cache-friendlier
scan order anyway, but the real goal is to provide a natural grain for
parallelism in a later patch.

0006-hj-barrier-v4.patch:

The patch from a nearby thread previously presented as a dependency of
this project. It might as well be considered part of this patch series.

0007-hj-exec-detach-node-v4.patch:

By the time ExecEndNode() runs in workers, ExecShutdownNode() has
already run. That's done on purpose because, for example, the hash
table needs to survive longer than the parallel environment to allow
EXPLAIN to peek at it.
But it means that the Gather node has thrown out the shared memory
before any parallel-aware node below it gets to run its Shutdown and
End methods. So I invented ExecDetachNode(), which runs before
ExecShutdownNode(), giving parallel-aware nodes a chance to say goodbye
before their shared memory vanishes. Better ideas?

0008-hj-shared-single-batch-v4.patch:

Introduces hash joins with "Shared Hash" and "Parallel Shared Hash"
nodes, for single-batch joins only. If the planner has a partial inner
plan, it'll pick a Parallel Shared Hash plan to divide that over K
participants. Failing that, if the planner has a parallel-safe inner
plan and thinks that it can avoid batching by using work_mem * K memory
(shared by all K participants), it will now use a Shared Hash.
Otherwise it'll typically use a Hash plan as before. Without the later
patches, it will blow through work_mem * K if it turns out to have
underestimated the hash table size, because it lacks infrastructure for
dealing with batches.

The trickiest thing at this point in the series is that participants
(workers and the leader) can show up at any time, so there are three
places that provide synchronisation with a parallel hash join that is
already in progress. Those can be seen in ExecHashTableCreate,
MultiExecHash and ExecHashJoin (HJ_BUILD_HASHTABLE case).

0009-hj-shared-buffile-strawman-v4.patch:

Simple code for sharing BufFiles between backends. This is standing in
for Peter G's BufFile sharing facility with refcount-based cleanup.

0010-hj-shared-multi-batch-v4.patch:

Adds support for multi-batch joins with shared hash tables. At this
point, more complications appear: deadlock avoidance with the leader,
batch file sharing and coordinated batch number increases (shrinking
the hash table) while building or loading.
Some thoughts:

* Although this patch series adds a ton of wait points, in the common
case of a single-batch inner join there is effectively only one:
participants wait for PHJ_PHASE_BUILDING to end and PHJ_PHASE_PROBING
to begin (resizing the hash table in between if necessary). For a
single-batch outer join, there is one more wait point: participants
wait for PHJ_PHASE_PROBING to end so that PHJ_PHASE_UNMATCHED can
begin. The length of the wait for PHJ_PHASE_BUILDING to finish is
limited by the grain of the scattered data being loaded into the hash
table: if the source of parallelism is Parallel Seq Scan, then the
worst-case scenario is that you run out of tuples to insert and twiddle
your thumbs while some other participant chews on the final pageful of
tuples. The wait for PHJ_PHASE_UNMATCHED (if applicable) is similarly
limited by the time it takes for the slowest participant to scan the
match bits of one chunk of tuples. All other phases and associated wait
points relate to multi-batch joins: either running out of work_mem and
needing to shrink the hash table, or coordinating the loading of the
various batches; in other words, ugly synchronisation only enters the
picture at the point where hash join starts doing IO because you don't
have enough work_mem.

* I wrestled with rescans for a long time; I think I have it right now!
The key thing to understand is that only the leader runs
ExecHashJoinReScan; new workers will be created for the next scan, so
the leader is able to get the barrier into the right state (attached
and fast-forwarded to PHJ_PHASE_PROBING if reusing the hash table,
detached and in the initial phase PHJ_PHASE_BEGINNING if we need to
recreate it).

* Skew table not supported yet.
* I removed the support for preloading data for the next batch; it
didn't seem to buy anything (it faithfully used up exactly all of your
work_mem for a brief moment, but since probing usually finishes very
close together in all participants anyway, no total execution time
seems to be saved) and added some complexity to the code; might be
worth revisiting, but I'm not hopeful.

* The thing where different backends attach at different phases of the
hash join obviously creates a fairly large bug surface. Of course we
can review the code and convince ourselves that it is correct, but what
is really needed is a test with 100% coverage that somehow arranges for
a worker to join at phases 0 to 12, and then perhaps also for the
leader to do the same; I have an idea for how to do that with a debug
build, more soon.

* Some of this needs to be more beautiful.

* With the patches up to 0008-hj-shared-single-batch.patch, I find that
typically I can get up to 3x or 4x speedups on queries like TPC-H Q9
that can benefit from a partial inner plan using Parallel Shared Hash
when work_mem is set 'just right', and at least some speedup on queries
without a partial inner plan but where the extra usable memory
available to Shared Hash can avoid the need for batching. (The best
cases I've seen probably combine these factors: avoiding batching and
dividing the work up.)

* With the full patch series up to 0010-hj-shared-multi-batch.patch, it
produces some terrible plans for some TPC-H queries right now, and I'm
investigating that. Up to this point I have been focused on getting the
multi-batch code to work correctly, but will now turn some attention to
planning and efficiency and figure out what's happening there.

--
Thomas Munro
http://www.enterprisedb.com
Hi Thomas,

On Fri, Jan 27, 2017 at 5:03 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> I have broken this up into a patch series, harmonised the private vs
> shared hash table code paths better and fixed many things including
> the problems with rescans and regression tests mentioned upthread.
> You'll see that one of the patches is that throwaway BufFile
> import/export facility, which I'll replace with your code as
> discussed.

I'll try to get back to this ASAP, but expect to be somewhat busy next
week. Next week will be my last week at Heroku. It was not an easy
decision for me to leave Heroku, but I felt it was time for a change. I
am very grateful to have had the opportunity. I have learned an awful
lot during my time at the company. It has been excellent to have an
employer that has been so supportive of my work on Postgres this whole
time.

--
Peter Geoghegan
On Sat, Jan 7, 2017 at 9:01 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> Stepping back a bit, I am aware of the following approaches to hash
> join parallelism:
>
> 1. Run the inner plan and build a private hash table in each
> participant [...].
>
> 2. Run a partition-wise hash join[1]. [...]
>
> 3. Repartition the data on the fly, and then run a partition-wise
> hash join. [...]
>
> 4. Scatter both the inner and outer plans arbitrarily across
> participants [...], and build a shared hash table. [...]
>
> [...] I suspect that 4 is probably a better
> fit than 3 for Postgres today, because the communication overhead of
> shovelling nearly all tuples through extra tuple queues to route them
> to the right hash table would surely be very high, though I can see
> that it's very attractive to have a reusable tuple repartitioning
> operator and then run k disjoint communication-free joins (again,
> without code change to the join operator, and to the benefit of all
> join operators).

On this topic, I recently stumbled on the 2011 paper "Design and
Evaluation of Main Memory Hash Join Algorithms for Multi-core CPUs"[1]
and found it reassuring. It compares simple shared hash tables to some
state-of-the-art repartitioning approaches (including the radix join
algorithm, which performs the amazing feat of building a lot of
cacheline-sized hash tables and then runs with minimal cache misses).

From the introduction:

"Second, we show that an algorithm that does not do any partitioning,
but simply constructs a single shared hash table on the build relation
often outperforms more complex algorithms. This simple
“no-partitioning” hash join algorithm is robust to sub-optimal
parameter choices by the optimizer, and does not require any knowledge
of the characteristics of the input to work well.
To the best of our knowledge, this simple hash join technique differs
from what is currently implemented in existing DBMSs for multi-core
hash join processing, and offers a tantalizingly simple, efficient, and
robust technique for implementing the hash join operation."

"Finally, we show that the simple “no-partitioning” hash join algorithm
takes advantage of intrinsic hardware optimizations to handle skew. As
a result, this simple hash join technique often benefits from skew and
its relative performance increases as the skew increases! This property
is a big advancement over the state-of-the-art methods, as it is
important to have methods that can gracefully handle skew in practice
[8]."

(That relates to SMT pipelining compensating for the extra cacheline
misses during probing by doing thread A's work while waiting for thread
B's memory to be fetched.)

From the conclusion:

"... Our results show that a simple hash join technique that does not
do any partitioning of the input relations often outperforms the other
more complex partitioning-based join alternatives. In addition, the
relative performance of this simple hash join technique rapidly
improves with increasing skew, and it outperforms every other algorithm
in the presence of even small amounts of skew."

For balance, the authors of a 2013 paper "Main-Memory Hash Joins on
Multi-Core CPUs: Tuning to the Underlying Hardware"[2] are less keen on
the simple "hardware-oblivious" "no partitioning" approach and don't
buy the other paper's ideas about SMT. Incidentally, their results on
the benefits of large (huge) pages are interesting (table VI) and
suggest that huge page support for DSM segments could be good here.

[1] https://pdfs.semanticscholar.org/9de4/b32f2c7b630a4f6aae6994a362a46c7c49e9.pdf
[2] https://www.inf.ethz.ch/personal/cagri.balkesen/publications/parallel-joins-icde13.pdf

--
Thomas Munro
http://www.enterprisedb.com
On Sat, Jan 28, 2017 at 10:03 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> I have broken this up into a patch series, harmonised the private vs
> shared hash table code paths better and fixed many things including
> the problems with rescans and regression tests mentioned upthread.
> You'll see that one of the patches is that throwaway BufFile
> import/export facility, which I'll replace with your code as
> discussed.

Patch moved to CF 2017-03.

--
Michael
> 0003-hj-refactor-memory-accounting-v4.patch:
>
> Modify the existing hash join code to work in terms of chunks when
> estimating and later tracking memory usage. This is probably more
> accurate than the current tuple-based approach, because it tries to
> take into account the space used by chunk headers and the wasted space
> in chunks. In practice the difference is probably small, but it's
> arguably more accurate; I did this because I need chunk-based
> accounting in the later patches. Also, make HASH_CHUNK_SIZE the actual
> size of allocated chunks (ie the header information is included in
> that size so we allocate exactly 32KB, not 32KB + a bit, for the
> benefit of the dsa allocator which otherwise finishes up allocating
> 36KB).

I looked at this patch. I agree that it accounts for the memory usage
more accurately. Here are a few comments.

spaceUsed is defined with the comment

    Size        spaceUsed;      /* memory space currently used by tuples */

In ExecHashTableCreate(), although the space is allocated for buckets,
no tuples are yet inserted, so no space is used by the tuples, so going
strictly by the comment, spaceUsed should be 0 in that function. But I
think the patch is accounting the spaceUsed more accurately. Without
this patch, the actual allocation might cross spaceAllowed without
being noticed. With this patch that's not possible. Probably we should
change the comment to say "memory space currently allocated".

However, ExecHashIncreaseNumBatches() may change the number of buckets;
the patch does not seem to account for spaceUsed changes because of
that.

Without this patch, ExecHashTableInsert() used to account for the space
used by a single tuple inserted. The patch moves this calculation into
dense_alloc() and accounts for out-of-bound allocation for larger
tuples. That's good.

The change in ExecChooseHashTableSize() too looks fine.

In ExecHashTableReset(), do we want to update spacePeak while setting
spaceUsed?
While this patch tracks space usage more accurately, I am afraid we
might be overdoing it; there's a reason why we don't track space usage
accurately now. But I think I will leave it to be judged by someone who
is more familiar with the code and possibly has historical perspective.

--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
On Wed, Feb 1, 2017 at 2:10 AM, Ashutosh Bapat
<ashutosh.bapat@enterprisedb.com> wrote:
>> 0003-hj-refactor-memory-accounting-v4.patch:
>> [...]
>
> I looked at this patch. I agree that it accounts the memory usage more
> accurately. Here are few comments.

Thanks for the review!

> spaceUsed is defined with comment
> Size spaceUsed; /* memory space currently used by tuples */
>
> In ExecHashTableCreate(), although the space is allocated for buckets, no
> tuples are yet inserted, so no space is used by the tuples, so going strictly
> by the comment, spaceUsed should be 0 in that function. But I think the patch
> is accounting the spaceUsed more accurately. Without this patch, the actual
> allocation might cross spaceAllowed without being noticed. With this patch
> that's not possible. Probably we should change the comment to say memory space
> currently allocated.

Right, that comment is out of date. It is now the space used by the
bucket array and the tuples. I will fix that in the next version.

> However, ExecHashIncreaseNumBatches() may change the
> number of buckets; the patch does not seem to account for spaceUsed changes
> because of that.

That's what this hunk is intended to do:

@@ -795,6 +808,12 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
 	TRACE_POSTGRESQL_HASH_INCREASE_BUCKETS(hashtable->nbuckets,
 										   hashtable->nbuckets_optimal);

+	/* account for the increase in space that will be used by buckets */
+	hashtable->spaceUsed += sizeof(HashJoinTuple) *
+		(hashtable->nbuckets_optimal - hashtable->nbuckets);
+	if (hashtable->spaceUsed > hashtable->spacePeak)
+		hashtable->spacePeak = hashtable->spaceUsed;
+
 	hashtable->nbuckets = hashtable->nbuckets_optimal;
 	hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;

It knows that spaceUsed already includes the old bucket array
(nbuckets), so it figures out how much bigger the new bucket array will
be (nbuckets_optimal - nbuckets) and adds that.
> Without this patch ExecHashTableInsert() used to account for the space used by
> a single tuple inserted. The patch moves this calculation in dense_alloc() and
> accounts for out-of-bound allocation for larger tuples. That's good.
>
> The change in ExecChooseHashTableSize() too looks fine.
>
> In ExecHashTableReset(), do we want to update spacePeak while setting
> spaceUsed.

I figured there was no way that the new spaceUsed value could be bigger
than spacePeak, because we threw out all chunks and have just the
bucket array, and we had that number of buckets before, so spacePeak
must at least have been set to a number >= this number either when we
expanded to this many buckets, or when we created the hashtable in the
first place. Perhaps I should Assert(hashtable->spaceUsed <=
hashtable->spacePeak).

> While this patch tracks space usage more accurately, I am afraid we might be
> overdoing it; a reason why we don't track space usage accurately now. But I
> think I will leave it to be judged by someone who is more familiar with the
> code and possibly has historical perspective.

Well, it's not doing more work; it doesn't make any practical
difference whatsoever, but it's technically doing less work than
master, by doing memory accounting only when acquiring a new 32KB
chunk. But if by overdoing it you mean that no one really cares about
the tiny increase in accuracy, so the patch on its own is a bit of a
waste of time, you're probably right. Depending on tuple size, you
could imagine something like 64 bytes of header and unused space per
32KB chunk that we're not accounting for, and that's only 0.2%. So I
probably wouldn't propose this refactoring just on accuracy grounds
alone.

This refactoring is intended to pave the way for shared memory
accounting in the later patches, which would otherwise generate ugly
IPC if done every time a tuple is allocated. I considered using atomic
add to count space per tuple, or maintaining per-backend local
subtotals and periodically summing. Then I realised that switching to
per-chunk accounting would fix the IPC problem AND be justifiable on
theoretical grounds. When we allocate a new 32KB chunk, we really are
using 32KB more of your memory.

--
Thomas Munro
http://www.enterprisedb.com
>> However, ExecHashIncreaseNumBatches() may change the
>> number of buckets; the patch does not seem to account for spaceUsed changes
>> because of that.
>
> That's what this hunk is intended to do:
>
> @@ -795,6 +808,12 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
>  	TRACE_POSTGRESQL_HASH_INCREASE_BUCKETS(hashtable->nbuckets,
>  										   hashtable->nbuckets_optimal);
>
> +	/* account for the increase in space that will be used by buckets */
> +	hashtable->spaceUsed += sizeof(HashJoinTuple) *
> +		(hashtable->nbuckets_optimal - hashtable->nbuckets);
> +	if (hashtable->spaceUsed > hashtable->spacePeak)
> +		hashtable->spacePeak = hashtable->spaceUsed;
> +

Sorry, I missed that hunk. You are right, it's getting accounted for.

>> In ExecHashTableReset(), do we want to update spacePeak while setting
>> spaceUsed.
>
> I figured there was no way that the new spaceUsed value could be
> bigger than spacePeak, because we threw out all chunks and have just
> the bucket array, and we had that number of buckets before, so
> spacePeak must at least have been set to a number >= this number
> either when we expanded to this many buckets, or when we created the
> hashtable in the first place. Perhaps I should
> Assert(hashtable->spaceUsed <= hashtable->spacePeak).

That would help; better if you explain this with a comment before the
Assert.

>> While this patch tracks space usage more accurately, I am afraid we might be
>> overdoing it; a reason why we don't track space usage accurately now. But I
>> think I will leave it to be judged by someone who is more familiar with the
>> code and possibly has historical perspective.
>
> Well it's not doing more work; it doesn't make any practical
> difference whatsoever but it's technically doing less work than
> master, by doing memory accounting only when acquiring a new 32KB
> chunk.

This patch does more work while counting the space used by buckets, I
guess. AFAIU, right now, that happens only after the hash table is
built completely. But that's fine. I am not worried about whether it's
less work or more.

> But if by overdoing it you mean that no one really cares about
> the tiny increase in accuracy so the patch on its own is a bit of a
> waste of time, you're probably right.

This is what I meant by overdoing; you have spelled it better.

> Depending on tuple size, you
> could imagine something like 64 bytes of header and unused space per
> 32KB chunk that we're not accounting for, and that's only 0.2%. So I
> probably wouldn't propose this refactoring just on accuracy grounds
> alone.
>
> This refactoring is intended to pave the way for shared memory
> accounting in the later patches, which would otherwise generate ugly
> IPC if done for every time a tuple is allocated. I considered using
> atomic add to count space per tuple, or maintaining per-backend local
> subtotals and periodically summing. Then I realised that switching to
> per-chunk accounting would fix the IPC problem AND be justifiable on
> theoretical grounds. When we allocate a new 32KB chunk, we really are
> using 32KB more of your memory.

+1.

Thanks for considering the comments.

--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
On Wed, Feb 1, 2017 at 10:13 AM, Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote: > > > > >> However, ExecHashIncreaseNumBatches() may change the > >> number of buckets; the patch does not seem to account for spaceUsed changes > >> because of that. > > > > That's what this hunk is intended to do: > > > > @@ -795,6 +808,12 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) > > TRACE_POSTGRESQL_HASH_INCREASE_BUCKETS(hashtable->nbuckets, > > > > hashtable->nbuckets_optimal); > > > > + /* account for the increase in space that will be used by buckets */ > > + hashtable->spaceUsed += sizeof(HashJoinTuple) * > > + (hashtable->nbuckets_optimal - hashtable->nbuckets); > > + if (hashtable->spaceUsed > hashtable->spacePeak) > > + hashtable->spacePeak = hashtable->spaceUsed; > > + > > Sorry, I missed that hunk. You are right, it's getting accounted for. > > >> > >> In ExecHashTableReset(), do we want to update spacePeak while setting > >> spaceUsed. > > > > I figured there was no way that the new spaceUsed value could be > > bigger than spacePeak, because we threw out all chunks and have just > > the bucket array, and we had that number of buckets before, so > > spacePeak must at least have been set to a number >= this number > > either when we expanded to this many buckets, or when we created the > > hashtable in the first place. Perhaps I should > > Assert(hashtable->spaceUsed <= hashtable->spacePeak). > > That would help, better if you explain this with a comment before Assert. > > > > >> While this patch tracks space usage more accurately, I am afraid we might be > >> overdoing it; a reason why we don't track space usage accurately now. But I > >> think I will leave it to be judged by someone who is more familiar with the > >> code and possibly has historical perspective. 
> > Well it's not doing more work; it doesn't make any practical
> > difference whatsoever but it's technically doing less work than
> > master, by doing memory accounting only when acquiring a new 32KB
> > chunk.
>
> This patch does more work while counting the space used by buckets, I
> guess. AFAIU, right now, that happens only after the hash table is
> built completely. But that's fine. I am not worried about whether
> it's less work or more.
>
> > But if by overdoing it you mean that no one really cares about
> > the tiny increase in accuracy so the patch on its own is a bit of a
> > waste of time, you're probably right.
>
> This is what I meant by overdoing; you have spelled it better.
>
> > Depending on tuple size, you
> > could imagine something like 64 bytes of header and unused space per
> > 32KB chunk that we're not accounting for, and that's only 0.2%.  So I
> > probably wouldn't propose this refactoring just on accuracy grounds
> > alone.
> >
> > This refactoring is intended to pave the way for shared memory
> > accounting in the later patches, which would otherwise generate ugly
> > IPC if done for every time a tuple is allocated.  I considered using
> > atomic add to count space per tuple, or maintaining per-backend local
> > subtotals and periodically summing.  Then I realised that switching to
> > per-chunk accounting would fix the IPC problem AND be justifiable on
> > theoretical grounds.  When we allocate a new 32KB chunk, we really are
> > using 32KB more of your memory.
>
> +1.
>
> Thanks for considering the comments.
Hello Thomas,
I was performing performance analysis of this set of patches on TPC-H at a higher scale factor and came across the following cases:
- Only 6 queries are using parallel hash
- Q8 is showing a regression from 8 seconds on head to 15 seconds with this patch set
- Of the remaining queries, most are not showing significant improvement in performance. The numbers are:

Query |    Head    | with patch
------|------------|------------
  3   |  72829.921 |  59915.961
  5   |  54815.123 |  55751.214
  7   |  41346.71  |  46149.742
  8   |   8801.814 |  15049.155
  9   |  62928.88  |  59077.909
 10   |  62446.136 |  61933.278

Could you please look into this regression case? Also let me know if the setup I am using is something that is expected to give such performance for your patch, or if there is anything else you might want to point out. Let me know if you need any more information for these tests.

Experimental setup is as follows:
Scale factor: 20
work_mem = 64 MB
max_parallel_workers_per_gather = 4
shared_buffers = 8GB
effective_cache_size = 10 GB
Additional indexes are on columns (all individually) l_shipdate, l_shipmode, o_comment, o_orderdate, c_mktsegment.

For the output plans on head and with this set of patches, please check the attached tar folder.

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Feb 2, 2017 at 3:34 AM, Rafia Sabih <rafia.sabih@enterprisedb.com> wrote:
> 9 | 62928.88 | 59077.909

Thanks Rafia.  At first glance this plan is using the Parallel Shared Hash in one place where it should pay off, that is loading the orders table, but the numbers are terrible.  I noticed that it uses batch files and then has to increase the number of batch files, generating a bunch of extra work, even though it apparently overestimated the number of rows; that only accounts for ~9 seconds of ~60, though.  I am investigating.

--
Thomas Munro
http://www.enterprisedb.com
On Thu, Feb 2, 2017 at 1:19 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> On Thu, Feb 2, 2017 at 3:34 AM, Rafia Sabih
> <rafia.sabih@enterprisedb.com> wrote:
>> 9 | 62928.88 | 59077.909
>
> Thanks Rafia.  At first glance this plan is using the Parallel Shared
> Hash in one place where it should pay off, that is loading the orders
> table, but the numbers are terrible.  I noticed that it uses batch
> files and then has to increase the number of batch files, generating a
> bunch of extra work, even though it apparently overestimated the
> number of rows, though that's only ~9 seconds of ~60.  I am
> investigating.

Hi Thomas,
Apart from the previously reported regression, one more issue appears in this set of patches. At times, a query using parallel hash hangs, and all the workers, including the master, show the following backtrace:

#0  0x00003fff880c7de8 in __epoll_wait_nocancel () from /lib64/power8/libc.so.6
#1  0x00000000104e2718 in WaitEventSetWaitBlock (set=0x100157bde90, cur_timeout=-1, occurred_events=0x3fffdbe69698, nevents=1) at latch.c:998
#2  0x00000000104e255c in WaitEventSetWait (set=0x100157bde90, timeout=-1, occurred_events=0x3fffdbe69698, nevents=1, wait_event_info=134217745) at latch.c:950
#3  0x0000000010512970 in ConditionVariableSleep (cv=0x3ffd736e05a4, wait_event_info=134217745) at condition_variable.c:132
#4  0x00000000104dbb1c in BarrierWaitSet (barrier=0x3ffd736e0594, new_phase=1, wait_event_info=134217745) at barrier.c:97
#5  0x00000000104dbb9c in BarrierWait (barrier=0x3ffd736e0594, wait_event_info=134217745) at barrier.c:127
#6  0x00000000103296a8 in ExecHashShrink (hashtable=0x3ffd73747dc0) at nodeHash.c:1075
#7  0x000000001032c46c in dense_alloc_shared (hashtable=0x3ffd73747dc0, size=40, shared=0x3fffdbe69eb8, respect_work_mem=1 '\001') at nodeHash.c:2618
#8  0x000000001032a2f0 in ExecHashTableInsert (hashtable=0x3ffd73747dc0, slot=0x100158f9e90, hashvalue=2389907270) at nodeHash.c:1476
#9  0x0000000010327fd0 in MultiExecHash (node=0x100158f9800) at nodeHash.c:296
#10 0x0000000010306730 in MultiExecProcNode (node=0x100158f9800) at execProcnode.c:577

The issue is not deterministic or straightforwardly reproducible: sometimes after make clean, etc., queries run, and sometimes they hang up again. I wanted to bring this to your notice hoping you might be faster than me in picking up the exact reason behind this anomaly.

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/
On Thu, Feb 2, 2017 at 4:57 PM, Rafia Sabih <rafia.sabih@enterprisedb.com> wrote:
> Apart from the previously reported regression, there appear one more
> issue in this set of patches. At times, running a query using parallel
> hash it hangs up and all the workers including the master shows the
> following backtrace,

Ugh, thanks.  Investigating.

--
Thomas Munro
http://www.enterprisedb.com
>
> 0004-hj-refactor-batch-increases-v4.patch:
>
> Modify the existing hash join code to detect work_mem exhaustion at
> the point where chunks are allocated, instead of checking after every
> tuple insertion.  This matches the logic used for estimating, and more
> importantly allows for some parallelism in later patches.

The patch has three changes:
1. Change dense_alloc() to accept a respect_work_mem argument and use it within the function.
2. Move the call to ExecHashIncreaseNumBatches() into dense_alloc() from ExecHashTableInsert() to account for memory before inserting a new tuple.
3. Check growEnabled before calling ExecHashIncreaseNumBatches().

I think checking growEnabled within ExecHashIncreaseNumBatches() is easier to maintain than checking at every caller. If someone is to add a caller tomorrow, s/he has to remember to add the check.

It might be better to add some comments in ExecHashRemoveNextSkewBucket() explaining why dense_alloc() should be called with respect_work_mem = false. ExecHashSkewTableInsert() does call ExecHashIncreaseNumBatches() after calling ExecHashRemoveNextSkewBucket() multiple times, so it looks like we do expect an increase in space used and thus go beyond work_mem for a short while. Is there a way we can handle this case in dense_alloc()?

Is it possible that increasing the number of batches changes the bucket number of the tuple being inserted? If so, should we recalculate the bucket and batch of the tuple being inserted?

--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
On Thu, Feb 2, 2017 at 4:57 PM, Rafia Sabih <rafia.sabih@enterprisedb.com> wrote:
> On Thu, Feb 2, 2017 at 1:19 AM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> On Thu, Feb 2, 2017 at 3:34 AM, Rafia Sabih
>> <rafia.sabih@enterprisedb.com> wrote:
>>> [ regressions ]
>>
>> Thanks Rafia.  At first glance this plan is using the Parallel Shared
>> Hash in one place where it should pay off, that is loading the orders
>> table, but the numbers are terrible.  I noticed that it uses batch
>> files and then has to increase the number of batch files, generating a
>> bunch of extra work, even though it apparently overestimated the
>> number of rows, though that's only ~9 seconds of ~60.  I am
>> investigating.
>
> Hi Thomas,
> Apart from the previously reported regression, there appear one more
> issue in this set of patches. At times, running a query using parallel
> hash it hangs up and all the workers including the master shows the
> following backtrace,

Here's a new version to fix the problems reported by Rafia above.  The patch descriptions are as before but it starts from 0002 because 0001 was committed as 7c5d8c16 (thanks, Andres).

First, some quick master-vs-patch numbers from the queries listed with regressions, using TPCH dbgen scale 10, work_mem = 64MB, max_parallel_workers_per_gather = 4, shared_buffers = 8GB (the numbers themselves are not comparable to Rafia's, as this is a different scale and different hardware).  Better except for Q5 and Q8, which for some mysterious reason plan only one worker and then lose.  I'm looking into that.

Q3   19917.682 ->  8649.822
Q5    4149.983 ->  4192.551
Q7   14453.721 -> 10303.911
Q8    1981.540 ->  8030.264
Q9   26928.102 -> 17384.607
Q10  16955.240 -> 14563.787

I plan to explore the performance space with a range of worker numbers and work_mem sizes and do some analysis; more soon.

Changes:

1.  Fixed two bugs that resulted in ExecHashShrink sometimes hanging, as reported by Rafia.
(1) When splitting the large v3 patch up into smaller patches for v4, I'd managed to lose the line that initialises shared->shrink_barrier, causing some occasional strange behaviour.  (2) I found a bug[1] in condition_variable.c that could cause hangs and fixed that via a separate patch, and the fix was committed as 3f3d60d3 (thanks, Robert).

2.  Simplified barrier.c by removing BarrierWaitSet(), because that turned out to be unnecessary to implement rescan as I'd originally thought, and was incompatible with the way BarrierDetach() works.  The latter assumes that the phase only ever increments, so that combination of features was broken.

3.  Sorted out the hash table sizing logic that was previously leading to some strange decisions about batches.  This involved putting the total estimated number of inner rows into the path and plan when there is a partial inner plan, because plan_rows only has the partial number.  I need to size the hash table correctly at execution time.  It seems a bit strange to do that specifically and only for Hash (see rows_total in the 0008 patch)...  should there be some more generic way?  Should total rows go into Plan rather than HashPlan, or perhaps the parallel divisor should go somewhere?

4.  Comments fixed and added based on Ashutosh's feedback on patch 0003.

5.  Various small bug fixes.

I've also attached a small set of test queries that hit the four "modes" (for want of a better word) of our hash join algorithm for dealing with different memory conditions, which I've nicknamed thus:

1.  "Good":  We estimate that the hash table will fit in work_mem, and at execution time it does.  This patch makes that more likely because [Parallel] Shared Hash gets to use more work_mem as discussed.

2.  "Bad":  We estimate that the hash table won't fit in work_mem, but that if we partition it into N batches using some bits from the hash value then each batch will fit in work_mem.  At execution time, each batch does indeed fit into work_mem.
This is not ideal, because we have to write out and read back in a fraction 1 - (1 / N) of the inner and outer tuples (ie all batches except the first one, although actually costsize.c always charges for all of them).  But it may still be better than other plans, and the IO is sequential.  Currently Shared Hash shouldn't be selected over (private) Hash if it would require batching anyway, due to the cpu_shared_tuple_cost tie-breaker: on the one hand it avoids a bunch of copies of the batch files being written out, but on the other it introduces a bunch of synchronisation overhead.  Parallel Shared Hash is fairly likely to be chosen if possible, due to the division of the inner relation's cost outweighing cpu_shared_tuple_cost.

3.  "Ugly":  We planned for "good" or "bad" mode, but we ran out of work_mem at some point during execution: this could be during the initial hash table load, or while loading a subsequent batch.  So now we double the number of batches, splitting the current batch and all batches that haven't been processed yet into two, in the hope of shrinking the hash table, while generating extra reading and writing of all as-yet unprocessed tuples.  This patch can do the shrinking work in parallel, which may help.

4.  "Fail":  After reaching "ugly" mode (and perhaps trying multiple times to shrink the hash table), we deduce that there is a kind of extreme skew that our partitioning scheme can never help with.  So we stop respecting work_mem and hope for the best.  The hash join may or may not be able to complete, depending on how much memory you can successfully allocate without melting the server or being killed by the OOM reaper.

The "ugly" mode was added in 2005[2], so before that we had only "good", "bad" and "fail".  We don't ever want to be in "ugly" or "fail" modes: a sort merge join would have been better, or in any case is guaranteed to be able to run to completion in the configured space.  However, at the point where we reach this condition, there isn't anything else we can do.
Some other interesting cases that hit new code are: rescan with a single batch (reuses the hash table contents), rescan with multiple batches (blows away and rebuilds the hash table), outer join (scans hash table for unmatched tuples).  Outer joins are obviously easy to test but rescans are a bit tricky to reach...  one way is to run TPCH Q9 with cpu_shared_tuple_cost = -10 (I think what's happening here is that it's essentially running the optimiser in reverse, and a nested loop rescanning a gather node (= fork/exit workers for every loop) is about the worst plan imaginable), but I haven't found a short and sweet test query for that yet.

Some assorted thoughts:

* Instead of abandoning our work_mem limit in "fail" mode, you might think we could probe the portion of the hash table that we managed to load so far, then rewind the outer batch and probe again using the next work_mem-sized portion of the same inner batch file.  This doesn't work though, because in the case of work_mem exhaustion during the initial batch it's too late to decide to start recording the initial outer batch, so we have no way to rewind.

* Instead of using the shared hash table for batch mode, we could do just the initial batch with a shared hash table, but drop back to smaller private hash tables for later batches and give each worker its own batch to work on until they're all done, with no further communication.  There are some problems with this though: inability to handle outer joins (just like parallel hash join in 9.6), a limit of work_mem (not work_mem * P) for the private hash tables, and load balancing/granularity problems with skewed data.  Thanks to my colleague Ashutosh Bapat for this off-list suggestion.

One of the unpleasant things about this patch is the risk of deadlock, as already discussed.  I wanted to mention an idea for how to get rid of this problem eventually.  I am aware of two ways that a deadlock could happen:

1.
A worker is waiting to write into its tuple queue (because the reader is not consuming fast enough and its fixed buffer has filled up), but the leader (which should be reading the tuple queue) is stuck waiting for the worker.  This is avoided currently with the early-exit protocol, at the cost of losing a CPU core after probing the first batch.

2.  Two different hash joins run in non-deterministic order.  Workers A and B have executed hash join nodes 1 and 2 at least once and attached to the barrier, and now worker A is in hash join node 1, and worker B is in hash join node 2 at a barrier wait point.  I am not aware of any executor nodes that could do that currently, but there is nothing to say that future nodes couldn't do that.  If I am wrong about that and this could happen today, that would be fatal for this patch in its current form.

Once we have asynchronous execution infrastructure, perhaps we could make those problems go away like this:

1.  Introduce a new way for barrier clients to try to advance to the next phase, but detach and return immediately if they would have to wait.

2.  Introduce a way for barriers to participate in the readiness protocol used for async execution, so that a barrier advance counts as a kind of readiness.  (The asynchronous scheduler probably doesn't need to know anything about that since it's based on latches which the WaitSet API already knows how to multiplex.)

3.  Teach Hash Join to yield instead of waiting at barriers, asking to be executed again when the barrier might have advanced.

4.  Make sure the Gather node is suitably asynchronicity-aware.  At a minimum it should be able to deal with the child plan yielding (in the case where it runs in the leader due to lack of better things to do) and be able to try that again when it needs to.
[1] https://www.postgresql.org/message-id/CAEepm%3D3a4VaPFnmwcdyUH8gE5_hW4tRvXQkpfQyrzgDQ9gJCYw%40mail.gmail.com
[2] https://www.postgresql.org/message-id/15661.1109887540@sss.pgh.pa.us
[3] 849074f9ae422c64501bb1d53ef840de870bf65c

--
Thomas Munro
http://www.enterprisedb.com
On Thu, Feb 9, 2017 at 2:03 AM, Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote:
>>
>> 0004-hj-refactor-batch-increases-v4.patch:
>>
>> Modify the existing hash join code to detect work_mem exhaustion at
>> the point where chunks are allocated, instead of checking after every
>> tuple insertion.  This matches the logic used for estimating, and more
>> importantly allows for some parallelism in later patches.
>
> The patch has three changes
> 1. change dense_alloc() to accept respect_workmem argument and use it
> within the function.
> 2. Move call to ExecHashIncreaseNumBatches() into dense_alloc() from
> ExecHashTableInsert() to account for memory before inserting new tuple
> 3. Check growEnabled before calling ExecHashIncreaseNumBatches().

Thanks for the review!

> I think checking growEnabled within ExecHashIncreaseNumBatches() is
> more easy to maintain that checking at every caller. If someone is to
> add a caller tomorrow, s/he has to remember to add the check.

Hmm.  Yeah.  In the later 0010 patch ExecHashIncreaseNumBatches will be used in a slightly different way -- not for making decisions or performing the hash table shrink, but only for reallocating the batch arrays.  I will see if putting the growEnabled check back in there in the 0004 patch and then refactoring in a later patch makes more sense to someone reviewing the patches independently, for the next version.

> It might be better to add some comments in
> ExecHashRemoveNextSkewBucket() explaining why dense_alloc() should be
> called with respect_work_mem = false? ExecHashSkewTableInsert() does
> call ExecHashIncreaseNumBatches() after calling
> ExecHashRemoveNextSkewBucket() multiple times, so it looks like we do
> expect increase in space used and thus go beyond work_mem for a short
> while. Is there a way we can handle this case in dense_alloc()?

Right, that needs some explanation, which I'll add for the next version.
The explanation is that while 'shrinking' the hash table, we may need to go over the work_mem limit by one chunk for a short time.  That is already true in master, but by moving the work_mem checks into dense_alloc I ran into the problem that dense_alloc might decide to shrink the hash table, which needs to call dense_alloc.  Shrinking works by spinning through all the chunks, copying only the tuples we want to keep into new chunks and freeing the old chunks as we go.  We will temporarily go one chunk over work_mem when we allocate the first new chunk but before we've freed the first old one.  We don't want shrink operations to trigger recursive shrink operations, so we disable respect for work_mem when calling dense_alloc from ExecHashIncreaseNumBatches.  In the course of regular hash table loading, we want to respect work_mem.

Looking at the v5 patch series I posted yesterday, I see that in fact ExecHashIncreaseNumBatches calls dense_alloc with respect_work_mem = true in the 0004 patch, and then I corrected that mistake in the 0008 patch; I'll move the correction back to the 0004 patch in the next version.  To reach ExecHashIncreaseNumBatches, see the "ugly" query in hj-test-queries.sql (posted with v5).

In ExecHashRemoveNextSkewBucket I preserved the existing behaviour of not caring about work_mem when performing the rare operation of copying a tuple from the skew bucket into a dense_alloc memory chunk so it can be inserted into a regular (non-skew) bucket.

> Is it possible that increasing the number of batches changes the
> bucket number of the tuple being inserted? If so, should we
> recalculate the bucket and batch of the tuple being inserted?

No -- see the function documentation for ExecHashGetBucketAndBatch.

--
Thomas Munro
http://www.enterprisedb.com
Out of archeological curiosity, I was digging around in the hash join code and RCS history from Postgres 4.2[1], and I was astounded to discover that it had a parallel executor for Sequent SMP systems and was capable of parallel hash joins as of 1991.  At first glance, it seems to follow approximately the same design as I propose: share a hash table, and use a barrier to coordinate the switch from build phase to probe phase and to deal with later batches.  It uses mmap to get space and then works with relative pointers.  See src/backend/executor/n_hash.c and src/backend/executor/n_hashjoin.c.  Some of this might be described in Wei Hong's PhD thesis[2], which I haven't had the pleasure of reading yet.

The parallel support is absent from the first commit in our repo (1996), but there are some vestiges like RelativeAddr and ABSADDR used to access the hash table (presumably needlessly) and also some mentions of parallel machines in comments that survived up until commit 26069a58 (1999).

[1] http://db.cs.berkeley.edu/postgres.html
[2] http://db.cs.berkeley.edu/papers/ERL-M93-28.pdf

--
Thomas Munro
http://www.enterprisedb.com
Hi,

On 2017-02-13 23:57:00 +1300, Thomas Munro wrote:
> Here's a new version to fix the problems reported by Rafia above.  The
> patch descriptions are as before but it starts from 0002 because 0001
> was committed as 7c5d8c16 (thanks, Andres).

FWIW, I'd appreciate it if you'd add a short commit message to the individual patches - I find it helpful to have a little bit more context while looking at them than just the titles.  Alternatively you can include that text when re-posting the series, but it's imo just as easy to have a short commit message (and just use format-patch).

I'm for now using [1] as context.

0002-hj-add-dtrace-probes-v5.patch

Hm.  I'm personally very unenthusiastic about adding more of these, and would rather rip all of them out.  I tend to believe that static probes simply aren't a good approach for anything requiring a lot of detail.  But whatever.

0003-hj-refactor-memory-accounting-v5.patch

@@ -424,15 +422,29 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	if (ntuples <= 0.0)
 		ntuples = 1000.0;

-	/*
-	 * Estimate tupsize based on footprint of tuple in hashtable... note this
-	 * does not allow for any palloc overhead.  The manipulations of spaceUsed
-	 * don't count palloc overhead either.
-	 */
+	/* Estimate tupsize based on footprint of tuple in hashtable. */

palloc overhead is still unaccounted for, no?  In the chunked case that might not be much, I realize that (so that comment should probably have been updated when chunking was introduced).

-	Size		spaceUsed;		/* memory space currently used by tuples */
+	Size		spaceUsed;		/* memory space currently used by hashtable */

It's not really the full hashtable, is it?  The ->buckets array appears to still be unaccounted for.

Looks ok.

0004-hj-refactor-batch-increases-v5.patch

@@ -1693,10 +1689,12 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 }

 /*
- * Allocate 'size' bytes from the currently active HashMemoryChunk
+ * Allocate 'size' bytes from the currently active HashMemoryChunk.
+ * If 'respect_work_mem' is true, this may cause the number of batches to be
+ * increased in an attempt to shrink the hash table.
  */
 static void *
-dense_alloc(HashJoinTable hashtable, Size size)
+dense_alloc(HashJoinTable hashtable, Size size, bool respect_work_mem)
 {
 	HashMemoryChunk newChunk;
 	char	   *ptr;

@@ -1710,6 +1708,15 @@ dense_alloc(HashJoinTable hashtable, Size size)
 	 */
 	if (size > HASH_CHUNK_THRESHOLD)
 	{
+		if (respect_work_mem &&
+			hashtable->growEnabled &&
+			hashtable->spaceUsed + HASH_CHUNK_HEADER_SIZE + size >
+			hashtable->spaceAllowed)
+		{
+			/* work_mem would be exceeded: try to shrink hash table */
+			ExecHashIncreaseNumBatches(hashtable);
+		}
+

Isn't it kinda weird to do this from within dense_alloc()?  I mean that dumps a lot of data to disk, frees a bunch of memory and so on - not exactly what "dense_alloc" implies.  Isn't the free()ing part also dangerous, because the caller might actually use some of that memory, like e.g. in ExecHashRemoveNextSkewBucket() or such.  I haven't looked deeply enough to check whether that's an active bug, but it seems like inviting one if not.

0005-hj-refactor-unmatched-v5.patch

I'm a bit confused as to why unmatched tuple scan is a good parallelism target, but I might see later...

0006-hj-barrier-v5.patch

Skipping that here.

0007-hj-exec-detach-node-v5.patch

Hm.  You write elsewhere:
> By the time ExecEndNode() runs in workers, ExecShutdownNode() has
> already run.  That's done on purpose because, for example, the hash
> table needs to survive longer than the parallel environment to allow
> EXPLAIN to peek at it.  But it means that the Gather node has thrown
> out the shared memory before any parallel-aware node below it gets to
> run its Shutdown and End methods.  So I invented ExecDetachNode()
> which runs before ExecShutdownNode(), giving parallel-aware nodes a
> chance to say goodbye before their shared memory vanishes.  Better
> ideas?
To me that is a weakness in the ExecShutdownNode() API - imo child nodes should get the chance to shut down before the upper-level node.  ExecInitNode/ExecEndNode etc give individual nodes the freedom to do things in the right order, but ExecShutdownNode() doesn't.  I don't quite see why we'd want to invent a separate ExecDetachNode() that'd be called immediately before ExecShutdownNode().

An easy way to change that would be to return in the ExecShutdownNode()'s T_GatherState case, and delegate the responsibility of calling it on Gather's children to ExecShutdownGather().  Alternatively we could make it a full-blown thing like ExecInitNode() that every node needs to implement, but that seems a bit painful.  Or have I missed something here?

Random aside: Wondered before if having to provide all executor callbacks is a weakness of our executor integration, and whether it shouldn't be a struct of callbacks instead...

0008-hj-shared-single-batch-v5.patch

First-off: I wonder if we should get the HASHPATH_TABLE_SHARED_SERIAL path committed first.  ISTM that's already quite beneficial, and there's a good chunk of problems that we could push out initially.

This desperately needs tests.

Have you measured whether the new branches in nodeHash[join] slow down non-parallel executions?  I do wonder if it'd not be better to put the common code in helper functions and have separate T_SharedHashJoin/T_SharedHash types.  If you both have a parallel and non-parallel hash in the same query, the branches will be hard to predict...

I think the synchronization protocol with the various phases needs to be documented somewhere.  Probably in nodeHashjoin.c's header.  The state machine code in MultiExecHash() also needs more comments, including the fact that avoiding repeating work is done by "electing" leaders via BarrierWait().
I wonder if it wouldn't be better to inline the necessary code into the switch (with fall-throughs), instead of those gotos; putting some of the relevant code (particularly the scanning of the child node) into separate functions.

+  build:
+	if (HashJoinTableIsShared(hashtable))
+	{
+		/* Make sure our local state is up-to-date so we can build. */
+		Assert(BarrierPhase(barrier) == PHJ_PHASE_BUILDING);
+		ExecHashUpdate(hashtable);
+	}
+
 	/*
 	 * set expression context
 	 */

@@ -128,18 +197,78 @@ MultiExecHash(HashState *node)

Why is the parallel code before variable initialization stuff like setting up econtext?

> Introduces hash joins with "Shared Hash" and "Parallel Shared Hash"
> nodes, for single-batch joins only.

We don't necessarily know that ahead of time.  So this isn't something that we could actually apply separately, right?

 	/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
-	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
-		ExecHashIncreaseNumBuckets(hashtable);
+	ExecHashUpdate(hashtable);
+	ExecHashIncreaseNumBuckets(hashtable);

It's kinda weird that we had the nearly redundant nbuckets != nbuckets_optimal checks before...

+static void *
+dense_alloc_shared(HashJoinTable hashtable,
+				   Size size,
+				   dsa_pointer *shared)

Hm.  I wonder if HASH_CHUNK_SIZE being only 32kb isn't going to be a bottleneck here.

@@ -195,6 +238,40 @@ ExecHashJoin(HashJoinState *node)
 				if (TupIsNull(outerTupleSlot))
 				{
 					/* end of batch, or maybe whole join */
+
+					if (HashJoinTableIsShared(hashtable))
+					{
+						/*
+						 * An important optimization: if this is a
+						 * single-batch join and not an outer join, there is
+						 * no reason to synchronize again when we've finished
+						 * probing.
+						 */
+						Assert(BarrierPhase(&hashtable->shared->barrier) ==
+							   PHJ_PHASE_PROBING);
+						if (hashtable->nbatch == 1 && !HJ_FILL_INNER(node))
+							return NULL;	/* end of join */
+

I think it's a bit weird that the parallel path now has an exit path that the non-parallel path doesn't have.
+ * If this is a shared hash table, there is an extra charge for inserting
+ * each tuple into the shared hash table to cover memory synchronization
+ * overhead, compared to a private hash table.  There is no extra charge
+ * for probing the hash table for outer path rows, on the basis that
+ * read-only access to a shared hash table shouldn't be any more
+ * expensive.
+ *
+ * cpu_shared_tuple_cost acts as a tie-breaker controlling whether we prefer
+ * HASHPATH_TABLE_PRIVATE or HASHPATH_TABLE_SHARED_SERIAL plans when the
+ * hash table fits in work_mem, since the cost is otherwise the same.  If
+ * it is positive, then we'll prefer private hash tables, even though that
+ * means that we'll be running N copies of the inner plan.  Running N
+ * copies of the inner plan in parallel is not considered
+ * more expensive than running 1 copy of the inner plan while N-1
+ * participants do nothing, despite doing less work in total.
+ */
+ if (table_type != HASHPATH_TABLE_PRIVATE)
+ 	startup_cost += cpu_shared_tuple_cost * inner_path_rows;
+
+ /*
+  * If this is a parallel shared hash table, then the value we have for
+  * inner_rows refers only to the rows returned by each participant.  For
+  * shared hash table size estimation, we need the total number, so we need
+  * to undo the division.
+  */
+ if (table_type == HASHPATH_TABLE_SHARED_PARALLEL)
+ 	inner_path_rows_total *= get_parallel_divisor(inner_path);
+

Is the per-tuple cost really the same for HASHPATH_TABLE_SHARED_SERIAL and PARALLEL?

Don't we also need to somehow account for the more expensive hash-probes in the HASHPATH_TABLE_SHARED_* cases?  Seems quite possible that we'll otherwise tend to use shared tables for small hashed tables that are looked up very frequently, even though a private one will likely be faster.

+	/*
+	 * Set the table as sharable if appropriate, with parallel or serial
+	 * building.
If parallel, the executor will also need an estimate of the + * total number of rows expected from all participants. + */ Oh. I was about to comment that sharable is wrong, just to discover it's valid in NA. Weird. @@ -2096,6 +2096,7 @@ create_mergejoin_path(PlannerInfo *root, * 'required_outer' is the set of required outer rels * 'hashclauses'are the RestrictInfo nodes to use as hash clauses * (this should be a subset of the restrict_clauseslist) + * 'table_type' to select [[Parallel] Shared] Hash */HashPath *create_hashjoin_path(PlannerInfo *root, Reminds me that you're not denoting the Parallel bit in explain right now - intentionally so? /* - * To reduce palloc overhead, the HashJoinTuples for the current batch are - * packed in 32kB buffers instead of pallocing each tuple individually. + * To reduce palloc/dsa_allocate overhead, the HashJoinTuples for the current + * batch are packed in 32kB buffers instead of pallocing each tuple + * individually. s/palloc\/dsa_allocate/allocator/? @@ -112,8 +121,12 @@ typedef struct HashMemoryChunkData size_t maxlen; /* size of the buffer holdingthe tuples */ size_t used; /* number of buffer bytes already used */ - struct HashMemoryChunkData *next; /* pointer to the next chunk (linked - * list) */ + /* pointer to the next chunk (linked list) */ + union + { + dsa_pointer shared; + struct HashMemoryChunkData *unshared; + } next; This'll increase memory usage on some platforms, e.g. when using spinlock backed atomics. I tend to think that that's fine, but it's probably worth calling out. 
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -787,7 +787,15 @@ typedef enum
 	WAIT_EVENT_MQ_SEND,
 	WAIT_EVENT_PARALLEL_FINISH,
 	WAIT_EVENT_SAFE_SNAPSHOT,
-	WAIT_EVENT_SYNC_REP
+	WAIT_EVENT_SYNC_REP,
+	WAIT_EVENT_HASH_BEGINNING,
+	WAIT_EVENT_HASH_CREATING,
+	WAIT_EVENT_HASH_BUILDING,
+	WAIT_EVENT_HASH_RESIZING,
+	WAIT_EVENT_HASH_REINSERTING,
+	WAIT_EVENT_HASH_UNMATCHED,
+	WAIT_EVENT_HASHJOIN_PROBING,
+	WAIT_EVENT_HASHJOIN_REWINDING
 } WaitEventIPC;

Hm. That seems a bit on the detailed side - if we're going that way it seems likely that we'll end up with hundreds of wait events. I don't think gradually evolving wait events into something like a query progress framework is a good idea.

That's it for now...

- Andres

[1] http://archives.postgresql.org/message-id/CAEepm%3D1D4-tP7j7UAgT_j4ZX2j4Ehe1qgZQWFKBMb8F76UW5Rg%40mail.gmail.com
Hi,

Just to summarize what you could read between the lines in the previous mail: from a higher-level POV the design here makes sense to me; I do, however, think a good chunk of code-level improvement is needed.

Regards,

Andres
On Thu, Feb 16, 2017 at 3:36 PM, Andres Freund <andres@anarazel.de> wrote:
> Hi,

Thanks for the review!

> FWIW, I'd appreciate if you'd added a short commit message to the
> individual patches - I find it helpful to have a little bit more context
> while looking at them than just the titles.  Alternatively you can
> include that text when re-posting the series, but it's imo just as easy
> to have a short commit message (and just use format-patch).
>
> I'm for now using [1] as context.

Ok, will do.

> 0002-hj-add-dtrace-probes-v5.patch
>
> Hm. I'm personally very unenthusiastic about adding more of these, and
> would rather rip all of them out.  I tend to believe that static
> problems simply aren't a good approach for anything requiring a lot of
> detail.  But whatever.

Ok, I will get rid of these. Apparently you aren't the only committer who hates these. (I have some other thoughts on that but will save them for another time.)

> 0003-hj-refactor-memory-accounting-v5.patch
> @@ -424,15 +422,29 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
>  	if (ntuples <= 0.0)
>  		ntuples = 1000.0;
>
> -	/*
> -	 * Estimate tupsize based on footprint of tuple in hashtable... note this
> -	 * does not allow for any palloc overhead.  The manipulations of spaceUsed
> -	 * don't count palloc overhead either.
> -	 */
> +	/* Estimate tupsize based on footprint of tuple in hashtable. */
>
> palloc overhead is still unaccounted for, no?  In the chunked case that
> might not be much, I realize that (so that comment should probably have
> been updated when chunking was introduced).

Yeah, it seemed like an obsolete comment, but I'll put it back as that isn't relevant to this patch.

> -	Size		spaceUsed;		/* memory space currently used by tuples */
> +	Size		spaceUsed;		/* memory space currently used by hashtable */
>
> It's not really the full hashtable, is it?  The ->buckets array appears
> to still be unaccounted for.
It is actually the full hash table, and that is a change in this patch. See ExecHashTableCreate and ExecHashTableReset where it is set to nbuckets * sizeof(HashJoinTuple), so that at all times it holds the total size of buckets + all chunks. Unlike the code in master, where it's just the sum of all tuples while building, but then the bucket space is added at the end in MultiExecHash.

> Looks ok.

Thanks!

> 0004-hj-refactor-batch-increases-v5.patch
>
> @@ -1693,10 +1689,12 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
>  }
>
>  /*
> - * Allocate 'size' bytes from the currently active HashMemoryChunk
> + * Allocate 'size' bytes from the currently active HashMemoryChunk.  If
> + * 'respect_work_mem' is true, this may cause the number of batches to be
> + * increased in an attempt to shrink the hash table.
>  */
>  static void *
> -dense_alloc(HashJoinTable hashtable, Size size)
> +dense_alloc(HashJoinTable hashtable, Size size, bool respect_work_mem)
>  {
>  	HashMemoryChunk newChunk;
>  	char	   *ptr;
> @@ -1710,6 +1708,15 @@ dense_alloc(HashJoinTable hashtable, Size size)
>  	 */
>  	if (size > HASH_CHUNK_THRESHOLD)
>  	{
> +		if (respect_work_mem &&
> +			hashtable->growEnabled &&
> +			hashtable->spaceUsed + HASH_CHUNK_HEADER_SIZE + size >
> +			hashtable->spaceAllowed)
> +		{
> +			/* work_mem would be exceeded: try to shrink hash table */
> +			ExecHashIncreaseNumBatches(hashtable);
> +		}
> +
>
> Isn't it kinda weird to do this from within dense_alloc()?  I mean that
> dumps a lot of data to disk, frees a bunch of memory and so on - not
> exactly what "dense_alloc" implies.

Hmm. Yeah I guess that is a bit weird. My problem was that in the shared case (later patch), when you call this function it has a fast path and a slow path: the fast path just hands out more space from the existing chunk, but the slow path acquires an LWLock and makes space management decisions which have to be done sort of "atomically".
In an earlier version I toyed with the idea of making dense_alloc return NULL if you said respect_work_mem = true and it determined that you need to increase the number of batches or go help other workers who have already started doing so. Then the batch increase stuff was not in here, but callers who say respect_work_mem = true (the build and reload loops) had to be prepared to loop and shrink if they get NULL, or some wrapper function needs to do that for them. I will try it that way again.

> Isn't the free()ing part also
> dangerous, because the caller might actually use some of that memory,
> like e.g. in ExecHashRemoveNextSkewBucket() or such.  I haven't looked
> deeply enough to check whether that's an active bug, but it seems like
> inviting one if not.

I'm not sure if I get what you mean here. ExecHashRemoveNextSkewBucket calls dense_alloc with respect_work_mem = false, so it's not going to enter that path.

> 0005-hj-refactor-unmatched-v5.patch
>
> I'm a bit confused as to why unmatched tuple scan is a good parallelism
> target, but I might see later...

Macroscopically because any time we can spread the resulting tuples over all participants, we enable parallelism in all executor nodes above this one in the plan. Suppose I made one worker do the unmatched scan while the others twiddled their thumbs; now some other join above me finishes up with potentially many tuples all in one process while the rest do nothing.

Microscopically because we may be spinning through 1GB of memory testing these bits, and the way that it is coded in master will do that in random order whereas this way will be in sequential order, globally and within each participant. (You could stuff the matched bits all up one end of each chunk, so that they'd all fit in a cacheline... but not suggesting that or any other micro-optimisation for the sake of it: the main reason is the macroscopic one.)

> 0006-hj-barrier-v5.patch
>
> Skipping that here.
>
> 0007-hj-exec-detach-node-v5.patch
>
> Hm.
> You write elsewhere:
>> By the time ExecEndNode() runs in workers, ExecShutdownNode() has
>> already run.  That's done on purpose because, for example, the hash
>> table needs to survive longer than the parallel environment to allow
>> EXPLAIN to peek at it.  But it means that the Gather node has thrown
>> out the shared memory before any parallel-aware node below it gets to
>> run its Shutdown and End methods.  So I invented ExecDetachNode()
>> which runs before ExecShutdownNode(), giving parallel-aware nodes a
>> chance to say goodbye before their shared memory vanishes.  Better
>> ideas?
>
> To me that is a weakness in the ExecShutdownNode() API - imo child nodes
> should get the chance to shutdown before the upper-level node.
> ExecInitNode/ExecEndNode etc give individual nodes the freedom to do
> things in the right order, but ExecShutdownNode() doesn't.  I don't
> quite see why we'd want to invent a separate ExecDetachNode() that'd be
> called immediately before ExecShutdownNode().

Hmm. Yes that makes sense, I think.

> An easy way to change that would be to return in the
> ExecShutdownNode()'s T_GatherState case, and delegate the responsibility
> of calling it on Gather's children to ExecShutdownGather().

That might work for the leader but maybe not for workers (?)

> Alternatively we could make it a full-blown thing like ExecInitNode()
> that every node needs to implement, but that seems a bit painful.
>
> Or have I missed something here?

Let me try a couple of ideas and get back to you.

> Random aside: Wondered before if having to provide all executor
> callbacks is a weakness of our executor integration, and whether it
> shouldn't be a struct of callbacks instead...
>
> 0008-hj-shared-single-batch-v5.patch
>
> First-off: I wonder if we should get the HASHPATH_TABLE_SHARED_SERIAL
> path committed first.  ISTM that's already quite beneficial, and there's
> a good chunk of problems that we could push out initially.
The reason I don't think we can do that is because single-batch hash joins can turn into multi-batch hash joins at execution time, unless you're prepared to use unbounded memory in rare cases. I don't think that's acceptable. I had the single batch shared hash code working reasonably well early on, and then came to understand that it couldn't really be committed without the full enchilada, because melting your server is not a reasonable thing to do if the estimates are off. Then I spent a really long time battling with the multi-batch case to get here!

> This desperately needs tests.

Will add.

> Have you measured whether the new branches in nodeHash[join] slow down
> non-parallel executions?  I do wonder if it'd not be better to have to
> put the common code in helper functions and have separate
> T_SharedHashJoin/T_SharedHash types.  If you both have a parallel and
> non-parallel hash in the same query, the branches will be hard to
> predict...

Huh. That is an interesting thought. Will look into that.

> I think the synchronization protocol with the various phases needs to be
> documented somewhere.  Probably in nodeHashjoin.c's header.

Will do.

> The state machine code in MultiExecHash() also needs more
> comments.  Including the fact that avoiding repeating work is done by
> "electing" leaders via BarrierWait().

Ok.

> I wonder if it wouldn't be better to inline the necessary code into the
> switch (with fall-throughs), instead of those gotos; putting some of the
> relevant code (particularly the scanning of the child node) into
> separate functions.

Right, this comes from a desire to keep the real code common for private and shared hash tables. I will look into other ways to structure it.

> + build:
> +	if (HashJoinTableIsShared(hashtable))
> +	{
> +		/* Make sure our local state is up-to-date so we can build. */
> +		Assert(BarrierPhase(barrier) == PHJ_PHASE_BUILDING);
> +		ExecHashUpdate(hashtable);
> +	}
> +
>  	/*
>  	 * set expression context
>  	 */
> @@ -128,18 +197,78 @@ MultiExecHash(HashState *node)
>
> Why is the parallel code before variable initialization stuff like
> setting up econtext?

Will move.

>> Introduces hash joins with "Shared Hash" and "Parallel Shared Hash"
>> nodes, for single-batch joins only.
>
> We don't necessarily know that ahead of time.  So this isn't something
> that we could actually apply separately, right?

Indeed, as mentioned above.

>  /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
> -	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
> -		ExecHashIncreaseNumBuckets(hashtable);
> +	ExecHashUpdate(hashtable);
> +	ExecHashIncreaseNumBuckets(hashtable);
>
> It's kinda weird that we had the nearly redundant nbuckets !=
> nbuckets_optimal checks before...

+1

> +static void *
> +dense_alloc_shared(HashJoinTable hashtable,
> +				   Size size,
> +				   dsa_pointer *shared)
>
> Hm. I wonder if HASH_CHUNK_SIZE being only 32kB isn't going to be a
> bottleneck here.

Yeah, I should benchmark some different sizes.

> @@ -195,6 +238,40 @@ ExecHashJoin(HashJoinState *node)
>  				if (TupIsNull(outerTupleSlot))
>  				{
>  					/* end of batch, or maybe whole join */
> +
> +					if (HashJoinTableIsShared(hashtable))
> +					{
> +						/*
> +						 * An important optimization: if this is a
> +						 * single-batch join and not an outer join, there is
> +						 * no reason to synchronize again when we've finished
> +						 * probing.
> +						 */
> +						Assert(BarrierPhase(&hashtable->shared->barrier) ==
> +							   PHJ_PHASE_PROBING);
> +						if (hashtable->nbatch == 1 && !HJ_FILL_INNER(node))
> +							return NULL;	/* end of join */
> +
>
> I think it's a bit weird that the parallel path now has an exit path
> that the non-parallel path doesn't have.

Indeed, but I think it's fairly clearly explained? Do you think there is something unsafe about exiting in that state?
> + * If this is a shared hash table, there is an extra charge for inserting
> + * each tuple into the shared hash table to cover memory synchronization
> + * overhead, compared to a private hash table.  There is no extra charge
> + * for probing the hash table for each outer path row, on the basis that
> + * read-only access to a shared hash table shouldn't be any more
> + * expensive.
> + *
> + * cpu_shared_tuple_cost acts as a tie-breaker controlling whether we prefer
> + * HASHPATH_TABLE_PRIVATE or HASHPATH_TABLE_SHARED_SERIAL plans when the
> + * hash table fits in work_mem, since the cost is otherwise the same.  If
> + * it is positive, then we'll prefer private hash tables, even though that
> + * means that we'll be running N copies of the inner plan.  Running N
> + * copies of the inner plan in parallel is not considered
> + * more expensive than running 1 copy of the inner plan while N-1
> + * participants do nothing, despite doing less work in total.
> + */
> +	if (table_type != HASHPATH_TABLE_PRIVATE)
> +		startup_cost += cpu_shared_tuple_cost * inner_path_rows;
> +
> +	/*
> +	 * If this is a parallel shared hash table, then the value we have for
> +	 * inner_rows refers only to the rows returned by each participant.  For
> +	 * shared hash table size estimation, we need the total number, so we need
> +	 * to undo the division.
> +	 */
> +	if (table_type == HASHPATH_TABLE_SHARED_PARALLEL)
> +		inner_path_rows_total *= get_parallel_divisor(inner_path);
> +
> +	/*
>
> Is the per-tuple cost really the same for HASHPATH_TABLE_SHARED_SERIAL
> and PARALLEL?

I *guess* the real cost for insertion depends on hard-to-estimate things like collision probability (many tuples into same bucket, also false sharing on same cacheline). I think the dynamic partitioning based parallel hash join systems would use the histogram to deal with balancing for their more coarse-grained disjoint version of this problem, but that seemed like overkill for this.
I just added a simple GUC cpu_shared_tuple_cost to model the cost for inserting, primarily as a tie-breaker so that we'd prefer private hash tables to shared ones, unless shared ones allow us to avoid batching or enable parallel build. Let me try to measure the difference in insertion speeds with a few interesting key distributions and get back to you. > Don't we also need to somehow account for the more expensive hash-probes > in the HASHPATH_TABLE_SHARED_* cases? Seems quite possible that we'll > otherwise tend to use shared tables for small hashed tables that are > looked up very frequently, even though a private one will likely be > faster. Hmm. I don't expect hash probes to be more expensive. Why should they be: DSA address decoding? I will try to measure that too. With the costing as I have it, we should use private tables for small relations unless there is a partial plan available. If there is a partial plan it usually looks better because it gets to divide the whole shemozzle by 2, 3, 8 or whatever. To avoid using shared tables for small cheap to build tables even if there is a partial plan available I think we might need an extra cost term which estimates the number of times we expect to have to wait for peers, and how long you might have to wait. The simple version might be a GUC "synchronization_cost", which is the cost per anticipated barrier wait. In a typical single batch inner join we could charge one of those (for the wait between building and probing), and for a single batch outer join we could charge two (you also have to wait to begin the outer scan). Then, if the subplan looks really expensive (say a big scan with a lot of filtering), we'll still go for the partial plan so we can divide the cost by P and we'll come out ahead even though we have to pay one synchronisation cost, but if it looks cheap (seq scan of tiny table) we won't bother with a partial plan because the synchronisation cost wouldn't pay for itself. Add more for extra batches. 
But... that's a bit bogus, because the real cost isn't really some kind of fixed "synchronisation" per se; it's how long you think it'll take between the moment the average participant finishes building (ie runs out of tuples to insert) and the moment the last participant finishes. That comes down to the granularity of parallelism and the cost per tuple. For example, parallel index scans and parallel sequential scans read whole pages at a time; so at some point you hit the end of the supply of tuples, but one of your peers might have up to one whole page worth to process, so however long that takes, that's how long you'll have to wait for that guy to be finished and reach the barrier. That's quite tricky to estimate, unless you have a way to ask a child path "how many times do I have to execute you to pull one 'granule' of data from your ultimate tuple source", and multiply that by the path's total cost / path's estimated rows, and then (say) guesstimate that on average you'll be twiddling your thumbs for half that many cost units. Or some better maths, but that sort of thing. Thoughts?

(I suppose a partition-wise join as subplan of a Hash node might introduce an extreme case of coarse granularity if it allows participants to process whole join partitions on their own, so that a barrier wait at end-of-hash-table-build might leave everyone waiting 24 hours for one peer to finish pulling tuples from the final join partition in its subplan...?!)

> +	/*
> +	 * Set the table as sharable if appropriate, with parallel or serial
> +	 * building.  If parallel, the executor will also need an estimate of the
> +	 * total number of rows expected from all participants.
> +	 */
>
> Oh. I was about to comment that sharable is wrong, just to discover it's
> valid in NA.  Weird.

It does look pretty weird now that you mention it!
I'll change it, because "shareable prevails by a 2:1 margin in American texts" according to http://grammarist.com/spelling/sharable-shareable/ , or maybe I'll change it to "shared".

> @@ -2096,6 +2096,7 @@ create_mergejoin_path(PlannerInfo *root,
>  * 'required_outer' is the set of required outer rels
>  * 'hashclauses' are the RestrictInfo nodes to use as hash clauses
>  *		(this should be a subset of the restrict_clauses list)
> + * 'table_type' to select [[Parallel] Shared] Hash
>  */
> HashPath *
> create_hashjoin_path(PlannerInfo *root,
>
> Reminds me that you're not denoting the Parallel bit in explain right
> now - intentionally so?

Yes I am... here are the three cases:

 Hash Join
 -> [... some parallel-safe plan ...]
 -> Hash
    -> [... some parallel-safe plan ...]

 Parallel Hash Join
 -> [... some partial plan ...]
 -> Shared Hash
    -> [... some parallel-safe plan ...]

 Parallel Hash Join
 -> [... some partial plan ...]
 -> Parallel Shared Hash
    -> [... some partial plan ...]

Make sense?

> /*
> - * To reduce palloc overhead, the HashJoinTuples for the current batch are
> - * packed in 32kB buffers instead of pallocing each tuple individually.
> + * To reduce palloc/dsa_allocate overhead, the HashJoinTuples for the current
> + * batch are packed in 32kB buffers instead of pallocing each tuple
> + * individually.
>
> s/palloc\/dsa_allocate/allocator/?

Ok.

> @@ -112,8 +121,12 @@ typedef struct HashMemoryChunkData
>  	size_t		maxlen;			/* size of the buffer holding the tuples */
>  	size_t		used;			/* number of buffer bytes already used */
>
> -	struct HashMemoryChunkData *next;	/* pointer to the next chunk (linked
> -										 * list) */
> +	/* pointer to the next chunk (linked list) */
> +	union
> +	{
> +		dsa_pointer shared;
> +		struct HashMemoryChunkData *unshared;
> +	} next;
>
> This'll increase memory usage on some platforms, e.g. when using
> spinlock backed atomics.  I tend to think that that's fine, but it's
> probably worth calling out.
In the code quoted above it won't because that's a plain dsa_pointer, not an atomic one. But yeah you're right about HashJoinBucketHead. I will note with comments. If I'm looking at the right column of https://wiki.postgresql.org/wiki/Atomics then concretely we're talking about 80386 (not the more general i386 architecture but the specific dead chip), ARM v5, PA-RISC and SparcV8 (and presumably you'd only bother turning on parallel query if you had an SMP configuration), so it's a technicality to consider but as long as it compiles and produces the right answer on those machines I assume it's OK, right? (Postgres 4.2 also supported parallel hash joins on Sequent 80386 SMP systems and put a spinlock into each bucket so anyone upgrading their Sequent system directly from Postgres 4.2 to a theoretical future PostgreSQL version with this patch will hopefully not consider this to be a regression.) On the other hand, I could get rid of the union for each bucket slot and instead have a union that points to the first bucket, so that such systems don't have to pay for the wider buckets-with-spinlocks even when using private hash tables. Will look into that. Actually I was meaning to ask you something about this: is it OK to memset all the bucket heads to zero when clearing the hash table or do I have to loop over them and pg_atomic_write_XXX(&x, 0) to avoid upsetting the emulated atomic state into a bad state? If that memset is not safe on emulated-atomics systems then I guess I should probably consider macros to select between a loop or memset depending on the implementation. 
> --- a/src/include/pgstat.h
> +++ b/src/include/pgstat.h
> @@ -787,7 +787,15 @@ typedef enum
>  	WAIT_EVENT_MQ_SEND,
>  	WAIT_EVENT_PARALLEL_FINISH,
>  	WAIT_EVENT_SAFE_SNAPSHOT,
> -	WAIT_EVENT_SYNC_REP
> +	WAIT_EVENT_SYNC_REP,
> +	WAIT_EVENT_HASH_BEGINNING,
> +	WAIT_EVENT_HASH_CREATING,
> +	WAIT_EVENT_HASH_BUILDING,
> +	WAIT_EVENT_HASH_RESIZING,
> +	WAIT_EVENT_HASH_REINSERTING,
> +	WAIT_EVENT_HASH_UNMATCHED,
> +	WAIT_EVENT_HASHJOIN_PROBING,
> +	WAIT_EVENT_HASHJOIN_REWINDING
> } WaitEventIPC;
>
> Hm. That seems a bit on the detailed side - if we're going that way it
> seems likely that we'll end up with hundreds of wait events.  I don't
> think gradually evolving wait events into something like a query
> progress framework is a good idea.

I thought the idea was to label each wait point in the source so that an expert can see exactly why we're waiting.

> That's it for now...

Thanks! Plenty for me to go away and think about. I will post a new version soon.

--
Thomas Munro
http://www.enterprisedb.com
On Wed, Feb 15, 2017 at 9:36 PM, Andres Freund <andres@anarazel.de> wrote:
> 0002-hj-add-dtrace-probes-v5.patch
>
> Hm. I'm personally very unenthusiastic about adding more of these, and
> would rather rip all of them out.  I tend to believe that static
> problems simply aren't a good approach for anything requiring a lot of
> detail.  But whatever.

I'm not a big fan of either static problems or static probes, myself.

> Isn't it kinda weird to do this from within dense_alloc()?  I mean that
> dumps a lot of data to disk, frees a bunch of memory and so on - not
> exactly what "dense_alloc" implies.  Isn't the free()ing part also
> dangerous, because the caller might actually use some of that memory,
> like e.g. in ExecHashRemoveNextSkewBucket() or such.  I haven't looked
> deeply enough to check whether that's an active bug, but it seems like
> inviting one if not.

I haven't looked at this, but one idea might be to just rename dense_alloc() to ExecHashBlahBlahSomething(). If there's a real abstraction layer problem here then we should definitely fix it, but maybe it's just the angle at which you hold your head.

> To me that is a weakness in the ExecShutdownNode() API - imo child nodes
> should get the chance to shutdown before the upper-level node.
> ExecInitNode/ExecEndNode etc give individual nodes the freedom to do
> things in the right order, but ExecShutdownNode() doesn't.  I don't
> quite see why we'd want to invent a separate ExecDetachNode() that'd be
> called immediately before ExecShutdownNode().

Interestingly, the same point came up on the Parallel Bitmap Heap Scan thread.

> An easy way to change that would be to return in the
> ExecShutdownNode()'s T_GatherState case, and delegate the responsibility
> of calling it on Gather's children to ExecShutdownGather().
> Alternatively we could make it a full-blown thing like ExecInitNode()
> that every node needs to implement, but that seems a bit painful.
I was thinking we should just switch things so that ExecShutdownNode() recurses first, and then does the current node. There's no real excuse for a node terminating the shutdown scan early, I think. > Or have I missed something here? > > Random aside: Wondered before if having to provide all executor > callbacks is a weakness of our executor integration, and whether it > shouldn't be a struct of callbacks instead... I honestly have no idea whether that would be better or worse from the CPU's point of view. > I think it's a bit weird that the parallel path now has an exit path > that the non-parallel path doesn't have. I'm not sure about this particular one, but in general those are pretty common. For example, look at the changes 569174f1be92be93f5366212cc46960d28a5c5cd made to _bt_first(). When you get there, you can discover that you aren't actually the first, and that in fact all the work is already complete, and there's nothing left for you to do but give up. > Hm. That seems a bit on the detailed side - if we're going that way it > seems likely that we'll end up with hundreds of wait events. I don't > think gradually evolving wait events into something like a query > progress framework is a good idea. I'm pretty strongly of the opinion that we should not reuse multiple wait events for the same purpose. The whole point of the wait event system is to identify what caused the wait. Having relatively recently done a ton of work to separate all of the waits in the system and identify them individually, I'm loathe to see us start melding things back together again. -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
On Thu, Feb 16, 2017 at 9:08 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > On Thu, Feb 16, 2017 at 3:36 PM, Andres Freund <andres@anarazel.de> wrote: >> That's it for now... > > Thanks! Plenty for me to go away and think about. I will post a new > version soon. I'm testing a new version which incorporates feedback from Andres and Ashutosh, and is refactored to use a new SharedBufFileSet component to handle batch files, replacing the straw-man implementation from the v5 patch series. I've set this to waiting-on-author and will post v6 tomorrow. -- Thomas Munro http://www.enterprisedb.com
On Wed, Mar 1, 2017 at 10:40 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > I'm testing a new version which incorporates feedback from Andres and > Ashutosh, and is refactored to use a new SharedBufFileSet component to > handle batch files, replacing the straw-man implementation from the v5 > patch series. I've set this to waiting-on-author and will post v6 > tomorrow. I created a system for reference counted partitioned temporary files called SharedBufFileSet: see 0007-hj-shared-buf-file.patch. Then I ripped out the code for sharing batch files that I previously had cluttering up nodeHashjoin.c, and refactored it into a new component called a SharedTuplestore which wraps a SharedBufFileSet and gives it a tuple-based interface: see 0008-hj-shared-tuplestore.patch. The name implies aspirations of becoming a more generally useful shared analogue of tuplestore, but for now it supports only the exact access pattern needed for hash join batches ($10 wrench). It creates temporary files like this: base/pgsql_tmp/pgsql_tmp[pid].[set].[partition].[participant].[segment] I'm not sure why nodeHashjoin.c is doing raw batchfile read/write operations anyway; why not use tuplestore.c for that (as tuplestore.c's comments incorrectly say is the case)? Maybe because Tuplestore's interface doesn't support storing the extra hash value. In SharedTuplestore I solved that problem by introducing an optional fixed sized piece of per-tuple meta-data. Another thing that is different about SharedTuplestore is that it supports partitions, which is convenient for this project and probably other parallel projects too. In order for workers to be able to participate in reference counting schemes based on DSM segment lifetime, I had to give the Exec*InitializeWorker() functions access to the dsm_segment object, whereas previously they received only the shm_toc in order to access its contents. 
I invented ParallelWorkerContext which has just two members 'seg' and 'toc': see 0005-hj-let-node-have-seg-in-worker.patch. I didn't touch the FDW API or custom scan API where they currently take toc, though I can see that there is an argument that they should; changing those APIs seems like a bigger deal. Another approach would be to use ParallelContext, as passed into ExecXXXInitializeDSM, with the members that are not applicable to workers zeroed out. Thoughts?

I got rid of the ExecDetachXXX stuff I had invented in the last version, because acf555bc fixed the problem a better way.

I found that I needed to use more than one toc entry for a single executor node, in order to reserve space for the inner and outer SharedTuplestore objects. So I invented a way to make more extra keys with PARALLEL_KEY_EXECUTOR_NTH(plan_node_id, N).

--
Thomas Munro
http://www.enterprisedb.com

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hi, On 2017-03-07 02:57:30 +1300, Thomas Munro wrote: > I'm not sure why nodeHashjoin.c is doing raw batchfile read/write > operations anyway; why not use tuplestore.c for that (as > tuplestore.c's comments incorrectly say is the case)? Another reason presumably is that using tuplestores would make it harder to control the amount of memory used - we do *not* want an extra set of work_mem used here, right? - Andres
Hi,

0001: Do hash join work_mem accounting in chunks.

Don't think there's much left to say.

0002: Check hash join work_mem usage at the point of chunk allocation.

Modify the existing hash join code to detect work_mem exhaustion at the point where chunks are allocated, instead of checking after every tuple insertion. This matches the logic used for estimating, and more importantly allows for some parallelism in later patches.

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 406c180..af1b66d 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -48,7 +48,8 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
                    int bucketNumber);
 static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);

-static void *dense_alloc(HashJoinTable hashtable, Size size);
+static void *dense_alloc(HashJoinTable hashtable, Size size,
+                         bool respect_work_mem);

I still dislike this, but maybe Robert's point of:

On 2017-02-16 08:57:21 -0500, Robert Haas wrote:
> On Wed, Feb 15, 2017 at 9:36 PM, Andres Freund <andres@anarazel.de> wrote:
> > Isn't it kinda weird to do this from within dense_alloc()? I mean that
> > dumps a lot of data to disk, frees a bunch of memory and so on - not
> > exactly what "dense_alloc" implies. Isn't the free()ing part also
> > dangerous, because the caller might actually use some of that memory,
> > like e.g. in ExecHashRemoveNextSkewBucket() or such. I haven't looked
> > deeply enough to check whether that's an active bug, but it seems like
> > inviting one if not.
>
> I haven't looked at this, but one idea might be to just rename
> dense_alloc() to ExecHashBlahBlahSomething(). If there's a real
> abstraction layer problem here then we should definitely fix it, but
> maybe it's just the angle at which you hold your head.

is enough.

0003: Scan for unmatched tuples in a hash join one chunk at a time.
@@ -1152,8 +1155,65 @@ bool
 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 {
     HashJoinTable hashtable = hjstate->hj_HashTable;
-    HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+    HashJoinTuple hashTuple;
+    MinimalTuple tuple;
+
+    /*
+     * First, process the queue of chunks holding tuples that are in regular
+     * (non-skew) buckets.
+     */
+    for (;;)
+    {
+        /* Do we need a new chunk to scan? */
+        if (hashtable->current_chunk == NULL)
+        {
+            /* Have we run out of chunks to scan? */
+            if (hashtable->unmatched_chunks == NULL)
+                break;
+
+            /* Pop the next chunk from the front of the queue. */
+            hashtable->current_chunk = hashtable->unmatched_chunks;
+            hashtable->unmatched_chunks = hashtable->current_chunk->next;
+            hashtable->current_chunk_index = 0;
+        }
+
+        /* Have we reached the end of this chunk yet? */
+        if (hashtable->current_chunk_index >= hashtable->current_chunk->used)
+        {
+            /* Go around again to get the next chunk from the queue. */
+            hashtable->current_chunk = NULL;
+            continue;
+        }
+
+        /* Take the next tuple from this chunk. */
+        hashTuple = (HashJoinTuple)
+            (hashtable->current_chunk->data + hashtable->current_chunk_index);
+        tuple = HJTUPLE_MINTUPLE(hashTuple);
+        hashtable->current_chunk_index +=
+            MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
+
+        /* Is it unmatched? */
+        if (!HeapTupleHeaderHasMatch(tuple))
+        {
+            TupleTableSlot *inntuple;
+
+            /* insert hashtable's tuple into exec slot */
+            inntuple = ExecStoreMinimalTuple(tuple,
+                                             hjstate->hj_HashTupleSlot,
+                                             false);    /* do not pfree */
+            econtext->ecxt_innertuple = inntuple;
+
+            /* reset context each time (see below for explanation) */
+            ResetExprContext(econtext);
+            return true;
+        }
+    }

I suspect this might actually be slower than the current/old logic, because the current_chunk tests are repeated every loop. I think retaining the two loops the previous code had makes sense, i.e. one to find a relevant chunk, and one to iterate through all tuples in a chunk, checking for an unmatched one.
Have you run a performance comparison pre/post this patch? I don't think there'd be a lot, but it seems important to verify that. I'd just run a tpc-h pre/post comparison (prewarmed, fully cache resident, parallelism disabled, hugepages is my personal recipe for the least run-over-run variance).

0004: Add a barrier primitive for synchronizing backends.

+/*-------------------------------------------------------------------------
+ *
+ * barrier.c
+ *    Barriers for synchronizing cooperating processes.
+ *
+ * Copyright (c) 2017, PostgreSQL Global Development Group
+ *
+ * This implementation of barriers allows for static sets of participants
+ * known up front, or dynamic sets of participants which processes can join
+ * or leave at any time.  In the dynamic case, a phase number can be used to
+ * track progress through a parallel algorithm; in the static case it isn't
+ * needed.

Why would a phase id generally not be needed in the static case? There's also further references to it ("Increments the current phase.") that don't quite jive with that.

+ * IDENTIFICATION
+ *    src/backend/storage/ipc/barrier.c

This could use a short example usage scenario. Without knowing existing usages of the "pattern", it's probably hard to grasp.

+ *-------------------------------------------------------------------------
+ */
+
+#include "storage/barrier.h"

Aren't you missing an include of postgres.h here? To quote postgres.h:

 *    This should be the first file included by PostgreSQL backend modules.
 *    Client-side code should include postgres_fe.h instead.
+bool
+BarrierWait(Barrier *barrier, uint32 wait_event_info)
+{
+    bool        first;
+    bool        last;
+    int         start_phase;
+    int         next_phase;
+
+    SpinLockAcquire(&barrier->mutex);
+    start_phase = barrier->phase;
+    next_phase = start_phase + 1;
+    ++barrier->arrived;
+    if (barrier->arrived == 1)
+        first = true;
+    else
+        first = false;
+    if (barrier->arrived == barrier->participants)
+    {
+        last = true;
+        barrier->arrived = 0;
+        barrier->phase = next_phase;
+    }
+    else
+        last = false;
+    SpinLockRelease(&barrier->mutex);

Hm. So what's the defined concurrency protocol for non-static barriers, when they attach after the spinlock here has been released? I think the concurrency aspects deserve some commentary. Afaics it'll correctly just count as the next phase - without any blocking - but that shouldn't have to be inferred. Things might get wonky if that new participant then starts waiting for the new phase, violating the assert below...

+    /*
+     * Otherwise we have to wait for the last participant to arrive and
+     * advance the phase.
+     */
+    ConditionVariablePrepareToSleep(&barrier->condition_variable);
+    for (;;)
+    {
+        bool        advanced;
+
+        SpinLockAcquire(&barrier->mutex);
+        Assert(barrier->phase == start_phase || barrier->phase == next_phase);
+        advanced = barrier->phase == next_phase;
+        SpinLockRelease(&barrier->mutex);
+        if (advanced)
+            break;
+        ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
+    }
+    ConditionVariableCancelSleep();
+
+    return first;
+}

+/*
+ * Detach from a barrier.  This may release other waiters from BarrierWait and
+ * advance the phase, if they were only waiting for this backend.  Return
+ * true if this participant was the last to detach.
+ */
+bool
+BarrierDetach(Barrier *barrier)
+{
+    bool        release;
+    bool        last;
+
+    SpinLockAcquire(&barrier->mutex);
+    Assert(barrier->participants > 0);
+    --barrier->participants;
+
+    /*
+     * If any other participants are waiting and we were the last participant
+     * waited for, release them.
+     */
+    if (barrier->participants > 0 &&
+        barrier->arrived == barrier->participants)
+    {
+        release = true;
+        barrier->arrived = 0;
+        barrier->phase++;
+    }
+    else
+        release = false;
+
+    last = barrier->participants == 0;
+    SpinLockRelease(&barrier->mutex);
+
+    if (release)
+        ConditionVariableBroadcast(&barrier->condition_variable);
+
+    return last;
+}

Doesn't this, again, run into danger of leading to an assert failure in the loop in BarrierWait?

+++ b/src/include/storage/barrier.h
@@ -0,0 +1,42 @@
+/*-------------------------------------------------------------------------
+ *
+ * barrier.h
+ *    Barriers for synchronizing workers.
+ *
+ * Copyright (c) 2017, PostgreSQL Global Development Group
+ *
+ * src/include/storage/barrier.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef BARRIER_H
+#define BARRIER_H
+
+/*
+ * For the header previously known as "barrier.h", please include
+ * "port/atomics.h", which deals with atomics, compiler barriers and memory
+ * barriers.
+ */
+
+#include "postgres.h"

Huh, that normally shouldn't be in a header. I see you introduced that in a bunch of other places too - that really doesn't look right to me.

- Andres
Andres Freund <andres@anarazel.de> writes:
> +++ b/src/include/storage/barrier.h
> +#include "postgres.h"

> Huh, that normally shouldn't be in a header. I see you introduced that
> in a bunch of other places too - that really doesn't look right to me.

That is absolutely not project style and is not acceptable.

The core reason why not is that postgres.h/postgres_fe.h/c.h have to be the *first* inclusion in every compilation, for arcane portability reasons you really don't want to know about. (Suffice it to say that on some platforms, stdio.h isn't all that std.) Our coding rule for that is that we put the appropriate one of these first in every .c file, while .h files always assume that it's been included already. As soon as you break that convention, it becomes unclear from looking at a .c file whether the ordering requirement has been satisfied. Also, since now you've moved the must-be-first requirement to some other header file(s), you risk breakage when somebody applies another project convention about alphabetizing #include references for all headers other than those magic ones.

In short, don't even think of doing this.

			regards, tom lane
On Wed, Mar 8, 2017 at 1:15 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> Andres Freund <andres@anarazel.de> writes:
>> +++ b/src/include/storage/barrier.h
>> +#include "postgres.h"
>
>> Huh, that normally shouldn't be in a header. I see you introduced that
>> in a bunch of other places too - that really doesn't look right to me.
>
> That is absolutely not project style and is not acceptable.
>
> The core reason why not is that postgres.h/postgres_fe.h/c.h have to be
> the *first* inclusion in every compilation, for arcane portability reasons
> you really don't want to know about.  (Suffice it to say that on some
> platforms, stdio.h isn't all that std.)  Our coding rule for that is that
> we put the appropriate one of these first in every .c file, while .h files
> always assume that it's been included already.  As soon as you break that
> convention, it becomes unclear from looking at a .c file whether the
> ordering requirement has been satisfied.  Also, since now you've moved
> the must-be-first requirement to some other header file(s), you risk
> breakage when somebody applies another project convention about
> alphabetizing #include references for all headers other than those magic
> ones.

Thanks for the explanation. Will post a new series addressing this and other complaints from Andres shortly.

--
Thomas Munro
http://www.enterprisedb.com
On Wed, Mar 8, 2017 at 12:58 PM, Andres Freund <andres@anarazel.de> wrote:
> 0002: Check hash join work_mem usage at the point of chunk allocation.
>
> Modify the existing hash join code to detect work_mem exhaustion at
> the point where chunks are allocated, instead of checking after every
> tuple insertion.  This matches the logic used for estimating, and more
> importantly allows for some parallelism in later patches.
>
> diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
> index 406c180..af1b66d 100644
> --- a/src/backend/executor/nodeHash.c
> +++ b/src/backend/executor/nodeHash.c
> @@ -48,7 +48,8 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
>                     int bucketNumber);
>  static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
>
> -static void *dense_alloc(HashJoinTable hashtable, Size size);
> +static void *dense_alloc(HashJoinTable hashtable, Size size,
> +                         bool respect_work_mem);
>
> I still dislike this, but maybe Robert's point of:
>
> On 2017-02-16 08:57:21 -0500, Robert Haas wrote:
>> On Wed, Feb 15, 2017 at 9:36 PM, Andres Freund <andres@anarazel.de> wrote:
>> > Isn't it kinda weird to do this from within dense_alloc()?  I mean that
>> > dumps a lot of data to disk, frees a bunch of memory and so on - not
>> > exactly what "dense_alloc" implies.  Isn't the free()ing part also
>> > dangerous, because the caller might actually use some of that memory,
>> > like e.g. in ExecHashRemoveNextSkewBucket() or such.  I haven't looked
>> > deeply enough to check whether that's an active bug, but it seems like
>> > inviting one if not.
>>
>> I haven't looked at this, but one idea might be to just rename
>> dense_alloc() to ExecHashBlahBlahSomething().  If there's a real
>> abstraction layer problem here then we should definitely fix it, but
>> maybe it's just the angle at which you hold your head.
>
> Is enough.

There is a problem here.
It can determine that it needs to increase the number of batches, effectively splitting the current batch, but then the caller goes on to insert the current tuple anyway, even though it may no longer belong in this batch. I will post a fix for that soon. I will also refactor it so that it doesn't do that work inside dense_alloc. You're right, that's too weird.

In the meantime, here is a new patch series addressing the other things you raised.

> 0003: Scan for unmatched tuples in a hash join one chunk at a time.
>
> @@ -1152,8 +1155,65 @@ bool
>  ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
>  {
>      HashJoinTable hashtable = hjstate->hj_HashTable;
> -    HashJoinTuple hashTuple = hjstate->hj_CurTuple;
> +    HashJoinTuple hashTuple;
> +    MinimalTuple tuple;
> +
> +    /*
> +     * First, process the queue of chunks holding tuples that are in regular
> +     * (non-skew) buckets.
> +     */
> +    for (;;)
> +    {
> +        /* Do we need a new chunk to scan? */
> +        if (hashtable->current_chunk == NULL)
> +        {
> +            /* Have we run out of chunks to scan? */
> +            if (hashtable->unmatched_chunks == NULL)
> +                break;
> +
> +            /* Pop the next chunk from the front of the queue. */
> +            hashtable->current_chunk = hashtable->unmatched_chunks;
> +            hashtable->unmatched_chunks = hashtable->current_chunk->next;
> +            hashtable->current_chunk_index = 0;
> +        }
> +
> +        /* Have we reached the end of this chunk yet? */
> +        if (hashtable->current_chunk_index >= hashtable->current_chunk->used)
> +        {
> +            /* Go around again to get the next chunk from the queue. */
> +            hashtable->current_chunk = NULL;
> +            continue;
> +        }
> +
> +        /* Take the next tuple from this chunk. */
> +        hashTuple = (HashJoinTuple)
> +            (hashtable->current_chunk->data + hashtable->current_chunk_index);
> +        tuple = HJTUPLE_MINTUPLE(hashTuple);
> +        hashtable->current_chunk_index +=
> +            MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
> +
> +        /* Is it unmatched?
> +         */
> +        if (!HeapTupleHeaderHasMatch(tuple))
> +        {
> +            TupleTableSlot *inntuple;
> +
> +            /* insert hashtable's tuple into exec slot */
> +            inntuple = ExecStoreMinimalTuple(tuple,
> +                                             hjstate->hj_HashTupleSlot,
> +                                             false);    /* do not pfree */
> +            econtext->ecxt_innertuple = inntuple;
> +
> +            /* reset context each time (see below for explanation) */
> +            ResetExprContext(econtext);
> +            return true;
> +        }
> +    }
>
> I suspect this might actually be slower than the current/old logic,
> because the current_chunk tests are repeated every loop.  I think
> retaining the two loops the previous code had makes sense, i.e. one to
> find a relevant chunk, and one to iterate through all tuples in a chunk,
> checking for an unmatched one.

Ok, I've updated it to use two loops as suggested. I couldn't measure any speedup as a result but it's probably better code that way.

> Have you run a performance comparison pre/post this patch?  I don't
> think there'd be a lot, but it seems important to verify that.  I'd just
> run a tpc-h pre/post comparison (prewarmed, fully cache resident,
> parallelism disabled, hugepages is my personal recipe for the least
> run-over-run variance).

I haven't been able to measure any difference in TPCH results yet. I tried to contrive a simple test where there is a measurable difference. I created a pair of tables and repeatedly ran two FULL OUTER JOIN queries. In Q1 no unmatched tuples are found in the hash table, and in Q2 every tuple in the hash table turns out to be unmatched. I consistently measure just over 10% improvement.
CREATE TABLE t1 AS
  SELECT generate_series(1, 10000000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
CREATE TABLE t2 AS
  SELECT generate_series(10000001, 20000000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
SET work_mem = '1GB';

-- Q1
SELECT COUNT(*) FROM t1 FULL OUTER JOIN t1 other USING (id);

-- Q2
SELECT COUNT(*) FROM t1 FULL OUTER JOIN t2 USING (id);

master:                               Q1 = 9.280s, Q2 = 9.645s
0003-hj-refactor-unmatched-v6.patch: Q1 = 8.341s, Q2 = 8.661s
0003-hj-refactor-unmatched-v7.patch: Q1 = 8.186s, Q2 = 8.642s

> 0004: Add a barrier primitive for synchronizing backends.
>
> +/*-------------------------------------------------------------------------
> + *
> + * barrier.c
> + *    Barriers for synchronizing cooperating processes.
> + *
> + * Copyright (c) 2017, PostgreSQL Global Development Group
> + *
> + * This implementation of barriers allows for static sets of participants
> + * known up front, or dynamic sets of participants which processes can join
> + * or leave at any time.  In the dynamic case, a phase number can be used to
> + * track progress through a parallel algorithm; in the static case it isn't
> + * needed.
>
> Why would a phase id generally not be needed in the static case?
> There's also further references to it ("Increments the current phase.")
> that dont quite jive with that.

I've extended that text at the top to explain. Short version: there is always a phase internally; that comment refers to the need for client code to examine it. Dynamic barrier users probably need to care what it is, since progress can be made while they're not attached so they need a way to find out about that after they attach, but static barriers generally don't need to care about the phase number because nothing can happen without explicit action from all participants so they should be in sync automatically. Hopefully the new comments explain that better.

> + * IDENTIFICATION
> + *    src/backend/storage/ipc/barrier.c
>
> This could use a short example usage scenario.
> Without knowing existing
> usages of the "pattern", it's probably hard to grasp.

Examples added.

> + *-------------------------------------------------------------------------
> + */
> +
> +#include "storage/barrier.h"
>
> Aren't you missing an include of postgres.h here?

Fixed.

> +bool
> +BarrierWait(Barrier *barrier, uint32 wait_event_info)
> +{
> +    bool        first;
> +    bool        last;
> +    int         start_phase;
> +    int         next_phase;
> +
> +    SpinLockAcquire(&barrier->mutex);
> +    start_phase = barrier->phase;
> +    next_phase = start_phase + 1;
> +    ++barrier->arrived;
> +    if (barrier->arrived == 1)
> +        first = true;
> +    else
> +        first = false;
> +    if (barrier->arrived == barrier->participants)
> +    {
> +        last = true;
> +        barrier->arrived = 0;
> +        barrier->phase = next_phase;
> +    }
> +    else
> +        last = false;
> +    SpinLockRelease(&barrier->mutex);
>
> Hm.  So what's the defined concurrency protocol for non-static barriers,
> when they attach after the spinlock here has been released?  I think the
> concurrency aspects deserve some commentary.  Afaics it'll correctly
> just count as the next phase - without any blocking - but that shouldn't
> have to be inferred.

It may join at start_phase or next_phase depending on what happened above. If we just advanced the phase (by being the last to arrive) then another backend that attaches will be joining at phase == next_phase, and if that new backend calls BarrierWait it'll be waiting for the phase after that.

> Things might get wonky if that new participant
> then starts waiting for the new phase, violating the assert below...
> +        Assert(barrier->phase == start_phase || barrier->phase == next_phase);

I've added a comment near that assertion that explains the reason the assertion holds.
Short version: The caller is attached, so there is no way for the phase to advance beyond next_phase without the caller's participation; the only possibilities to consider in the wait loop are "we're still waiting" or "the final participant arrived or detached, advancing the phase and releasing me". Put another way, no waiting backend can ever see the phase advance beyond next_phase, because in order to do so, the waiting backend would need to run BarrierWait again; barrier->arrived can never reach barrier->participants a second time while we're in that wait loop.

> +/*
> + * Detach from a barrier.  This may release other waiters from BarrierWait and
> + * advance the phase, if they were only waiting for this backend.  Return
> + * true if this participant was the last to detach.
> + */
> +bool
> +BarrierDetach(Barrier *barrier)
> +{
> +    bool        release;
> +    bool        last;
> +
> +    SpinLockAcquire(&barrier->mutex);
> +    Assert(barrier->participants > 0);
> +    --barrier->participants;
> +
> +    /*
> +     * If any other participants are waiting and we were the last participant
> +     * waited for, release them.
> +     */
> +    if (barrier->participants > 0 &&
> +        barrier->arrived == barrier->participants)
> +    {
> +        release = true;
> +        barrier->arrived = 0;
> +        barrier->phase++;
> +    }
> +    else
> +        release = false;
> +
> +    last = barrier->participants == 0;
> +    SpinLockRelease(&barrier->mutex);
> +
> +    if (release)
> +        ConditionVariableBroadcast(&barrier->condition_variable);
> +
> +    return last;
> +}
>
> Doesn't this, again, run into danger of leading to an assert failure in
> the loop in BarrierWait?

I believe this code is correct.
The assertion in BarrierWait can't fail, because waiters know that there is no way for the phase to get any further ahead without their help (because they are attached): again, the only possibilities are phase == start_phase (implying that they received a spurious condition variable signal) or phase == next_phase (the last backend being waited on has finally arrived or detached, allowing other participants to proceed).

I've attached a test module that starts N workers, and makes the workers attach, call BarrierWait a random number of times, then detach, and then rinse and repeat, until the phase reaches some large number and they all exit. This exercises every interleaving of attach, wait and detach. CREATE EXTENSION test_barrier, then something like SELECT test_barrier_reattach_random(4, 1000000) to verify that no assertions are thrown and it always completes.

> +#include "postgres.h"
>
> Huh, that normally shouldn't be in a header. I see you introduced that
> in a bunch of other places too - that really doesn't look right to me.

Fixed.

--
Thomas Munro
http://www.enterprisedb.com
On Thu, Mar 9, 2017 at 3:58 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > In the meantime, here is a new patch series addressing the other > things you raised. Please see my remarks on 0007-hj-shared-buf-file-v7.patch over on the "on_dsm_detach() callback and parallel tuplesort BufFile resource management" thread. They still apply to this latest version of the patch series. -- Peter Geoghegan
There is a problem here. It can determine that it needs to increaseOn Wed, Mar 8, 2017 at 12:58 PM, Andres Freund <andres@anarazel.de> wrote:
> 0002: Check hash join work_mem usage at the point of chunk allocation.
>
> Modify the existing hash join code to detect work_mem exhaustion at
> the point where chunks are allocated, instead of checking after every
> tuple insertion. This matches the logic used for estimating, and more
> importantly allows for some parallelism in later patches.
>
> diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/ nodeHash.c
> index 406c180..af1b66d 100644
> --- a/src/backend/executor/nodeHash.c
> +++ b/src/backend/executor/nodeHash.c
> @@ -48,7 +48,8 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
> int bucketNumber);
> static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
>
> -static void *dense_alloc(HashJoinTable hashtable, Size size);
> +static void *dense_alloc(HashJoinTable hashtable, Size size,
> + bool respect_work_mem);
>
> I still dislike this, but maybe Robert's point of:
>
> On 2017-02-16 08:57:21 -0500, Robert Haas wrote:
>> On Wed, Feb 15, 2017 at 9:36 PM, Andres Freund <andres@anarazel.de> wrote:
>> > Isn't it kinda weird to do this from within dense_alloc()? I mean that
>> > dumps a lot of data to disk, frees a bunch of memory and so on - not
>> > exactly what "dense_alloc" implies. Isn't the free()ing part also
>> > dangerous, because the caller might actually use some of that memory,
>> > like e.g. in ExecHashRemoveNextSkewBucket() or such. I haven't looked
>> > deeply enough to check whether that's an active bug, but it seems like
>> > inviting one if not.
>>
>> I haven't looked at this, but one idea might be to just rename
>> dense_alloc() to ExecHashBlahBlahSomething(). If there's a real
>> abstraction layer problem here then we should definitely fix it, but
>> maybe it's just the angle at which you hold your head.
>
> Is enough.
the number of batches, effectively splitting the current batch, but
then the caller goes on to insert the current tuple anyway, even
though it may no longer belong in this batch. I will post a fix for
that soon. I will also refactor it so that it doesn't do that work
inside dense_alloc. You're right, that's too weird.
In the meantime, here is a new patch series addressing the other
things you raised.Ok, I've updated it to use two loops as suggested. I couldn't measure
> 0003: Scan for unmatched tuples in a hash join one chunk at a time.
>
>
> @@ -1152,8 +1155,65 @@ bool
> ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
> {
> HashJoinTable hashtable = hjstate->hj_HashTable;
> - HashJoinTuple hashTuple = hjstate->hj_CurTuple;
> + HashJoinTuple hashTuple;
> + MinimalTuple tuple;
> +
> + /*
> + * First, process the queue of chunks holding tuples that are in regular
> + * (non-skew) buckets.
> + */
> + for (;;)
> + {
> + /* Do we need a new chunk to scan? */
> + if (hashtable->current_chunk == NULL)
> + {
> + /* Have we run out of chunks to scan? */
> + if (hashtable->unmatched_chunks == NULL)
> + break;
> +
> + /* Pop the next chunk from the front of the queue. */
> + hashtable->current_chunk = hashtable->unmatched_chunks;
> + hashtable->unmatched_chunks = hashtable->current_chunk->next;
> + hashtable->current_chunk_index = 0;
> + }
> +
> + /* Have we reached the end of this chunk yet? */
> + if (hashtable->current_chunk_index >= hashtable->current_chunk-> used)
> + {
> + /* Go around again to get the next chunk from the queue. */
> + hashtable->current_chunk = NULL;
> + continue;
> + }
> +
> + /* Take the next tuple from this chunk. */
> + hashTuple = (HashJoinTuple)
> + (hashtable->current_chunk->data + hashtable->current_chunk_ index);
> + tuple = HJTUPLE_MINTUPLE(hashTuple);
> + hashtable->current_chunk_index +=
> + MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
> +
> + /* Is it unmatched? */
> + if (!HeapTupleHeaderHasMatch(tuple))
> + {
> + TupleTableSlot *inntuple;
> +
> + /* insert hashtable's tuple into exec slot */
> + inntuple = ExecStoreMinimalTuple(tuple,
> + hjstate->hj_HashTupleSlot,
> + false); /* do not pfree */
> + econtext->ecxt_innertuple = inntuple;
> +
> + /* reset context each time (see below for explanation) */
> + ResetExprContext(econtext);
> + return true;
> + }
> + }
>
> I suspect this might actually be slower than the current/old logic,
> because the current_chunk tests are repeated every loop. I think
> retaining the two loops the previous code had makes sense, i.e. one to
> find a relevant chunk, and one to iterate through all tuples in a chunk,
> checking for an unmatched one.
any speedup as a result but it's probably better code that way.
> Have you run a performance comparison pre/post this patch? I don't
> think there'd be a lot, but it seems important to verify that. I'd just
> run a tpc-h pre/post comparison (prewarmed, fully cache resident,
> parallelism disabled, hugepages is my personal recipe for the least
> run-over-run variance).
I haven't been able to measure any difference in TPCH results yet. I
tried to contrive a simple test where there is a measurable
difference. I created a pair of tables and repeatedly ran two FULL
OUTER JOIN queries. In Q1 no unmatched tuples are found in the hash
table, and in Q2 every tuple in the hash table turns out to be
unmatched. I consistently measure just over 10% improvement.
CREATE TABLE t1 AS
SELECT generate_series(1, 10000000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa ';
CREATE TABLE t2 AS
SELECT generate_series(10000001, 20000000) AS id,
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa ';
SET work_mem = '1GB';
-- Q1
SELECT COUNT(*)
FROM t1 FULL OUTER JOIN t1 other USING (id);
-- Q2
SELECT COUNT(*)
FROM t1 FULL OUTER JOIN t2 USING (id);
master: Q1 = 9.280s, Q2 = 9.645s
0003-hj-refactor-unmatched-v6.patch: Q1 = 8.341s, Q2 = 8.661s
0003-hj-refactor-unmatched-v7.patch: Q1 = 8.186s, Q2 = 8.642s
> 0004: Add a barrier primitive for synchronizing backends.
>
>
> +/*--------------------------------------------------------- ----------------
> + *
> + * barrier.c
> + * Barriers for synchronizing cooperating processes.
> + *
> + * Copyright (c) 2017, PostgreSQL Global Development Group
> + *
> + * This implementation of barriers allows for static sets of participants
> + * known up front, or dynamic sets of participants which processes can join
> + * or leave at any time. In the dynamic case, a phase number can be used to
> + * track progress through a parallel algorithm; in the static case it isn't
> + * needed.
>
> Why would a phase id generally not be needed in the static case?
> There's also further references to it ("Increments the current phase.")
> that dont quite jive with that.
I've extended that text at the top to explain.
Short version: there is always a phase internally; that comment refers
to the need for client code to examine it. Dynamic barrier users
probably need to care what it is, since progress can be made while
they're not attached so they need a way to find out about that after
they attach, but static barriers generally don't need to care about
the phase number because nothing can happen without explicit action
from all participants so they should be in sync automatically.
Hopefully the new comments explain that better.
> + * IDENTIFICATION
> + * src/backend/storage/ipc/barrier.c
>
> This could use a short example usage scenario. Without knowing existing
> usages of the "pattern", it's probably hard to grasp.
Examples added.
> + *----------------------------------------------------------- --------------
> + */
> +
> +#include "storage/barrier.h"
>
> Aren't you missing an include of postgres.h here?
Fixed.
> +bool
> +BarrierWait(Barrier *barrier, uint32 wait_event_info)
> +{
> + bool first;
> + bool last;
> + int start_phase;
> + int next_phase;
> +
> + SpinLockAcquire(&barrier->mutex);
> + start_phase = barrier->phase;
> + next_phase = start_phase + 1;
> + ++barrier->arrived;
> + if (barrier->arrived == 1)
> + first = true;
> + else
> + first = false;
> + if (barrier->arrived == barrier->participants)
> + {
> + last = true;
> + barrier->arrived = 0;
> + barrier->phase = next_phase;
> + }
> + else
> + last = false;
> + SpinLockRelease(&barrier->mutex);
>
> Hm. So what's the defined concurrency protocol for non-static barriers,
> when they attach after the spinlock here has been released? I think the
> concurrency aspects deserve some commentary. Afaics it'll correctly
> just count as the next phase - without any blocking - but that shouldn't
> have to be inferred.
It may join at start_phase or next_phase, depending on what happened
above. If we just advanced the phase (by being the last to arrive),
then another backend that attaches will be joining at phase ==
next_phase, and if that new backend calls BarrierWait it'll be waiting
for the phase after that.
> Things might get wonky if that new participant
> then starts waiting for the new phase, violating the assert below...
> + Assert(barrier->phase == start_phase || barrier->phase == next_phase);
I've added a comment near that assertion that explains the reason the
assertion holds.
Short version: The caller is attached, so there is no way for the
phase to advance beyond next_phase without the caller's participation;
the only possibilities to consider in the wait loop are "we're still
waiting" or "the final participant arrived or detached, advancing the
phase and releasing me".
Put another way, no waiting backend can ever see phase advance beyond
next_phase, because in order to do so, the waiting backend would need
to run BarrierWait again; barrier->arrived can never reach
barrier->participants a second time while we're in that wait loop.
> +/*
> + * Detach from a barrier. This may release other waiters from BarrierWait and
> + * advance the phase, if they were only waiting for this backend. Return
> + * true if this participant was the last to detach.
> + */
> +bool
> +BarrierDetach(Barrier *barrier)
> +{
> + bool release;
> + bool last;
> +
> + SpinLockAcquire(&barrier->mutex);
> + Assert(barrier->participants > 0);
> + --barrier->participants;
> +
> + /*
> + * If any other participants are waiting and we were the last participant
> + * waited for, release them.
> + */
> + if (barrier->participants > 0 &&
> + barrier->arrived == barrier->participants)
> + {
> + release = true;
> + barrier->arrived = 0;
> + barrier->phase++;
> + }
> + else
> + release = false;
> +
> + last = barrier->participants == 0;
> + SpinLockRelease(&barrier->mutex);
> +
> + if (release)
> + ConditionVariableBroadcast(&barrier->condition_variable);
> +
> + return last;
> +}
>
> Doesn't this, again, run into danger of leading to an assert failure in
> the loop in BarrierWait?
I believe this code is correct. The assertion in BarrierWait can't
fail, because waiters know that there is no way for the phase to get
any further ahead without their help (because they are attached):
again, the only possibilities are phase == start_phase (implying that
they received a spurious condition variable signal) or phase ==
next_phase (the last backend being waited on has finally arrived or
detached, allowing other participants to proceed).
I've attached a test module that starts N workers, and makes the
workers attach, call BarrierWait a random number of times, then
detach, and then rinse and repeat, until the phase reaches some large
number and they all exit. This exercises every interleaving of the
attach, wait, detach. CREATE EXTENSION test_barrier, then something
like SELECT test_barrier_reattach_random(4, 1000000) to verify that no
assertions are thrown and it always completes.
> +#include "postgres.h"
>
> Huh, that normally shouldn't be in a header. I see you introduced that
> in a bunch of other places too - that really doesn't look right to me.
Fixed.
On Mon, Mar 13, 2017 at 8:40 PM, Rafia Sabih <rafia.sabih@enterprisedb.com> wrote:
> In an attempt to test v7 of this patch on TPC-H 20 scale factor I found a
> few regressions,
> Q21: 52 secs on HEAD and 400 secs with this patch

Thanks Rafia. Robert just pointed out off-list that there is a bogus 0 row estimate in here:

  -> Parallel Hash Semi Join (cost=1006599.34..1719227.30 rows=0 width=24) (actual time=38716.488..100933.250 rows=7315896 loops=5)

Will investigate, thanks.

> Q8: 8 secs on HEAD to 14 secs with patch

Also looking into this one.

--
Thomas Munro
http://www.enterprisedb.com
On Tue, Mar 14, 2017 at 8:03 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> On Mon, Mar 13, 2017 at 8:40 PM, Rafia Sabih
> <rafia.sabih@enterprisedb.com> wrote:
>> In an attempt to test v7 of this patch on TPC-H 20 scale factor I found a
>> few regressions,
>> Q21: 52 secs on HEAD and 400 secs with this patch
>
> Thanks Rafia. Robert just pointed out off-list that there is a bogus
> 0 row estimate in here:
>
>   -> Parallel Hash Semi Join (cost=1006599.34..1719227.30 rows=0
>      width=24) (actual time=38716.488..100933.250 rows=7315896 loops=5)
>
> Will investigate, thanks.

There are two problems here.

1. There is a pre-existing cardinality estimate problem for semi-joins with <> filters. The big Q21 regression reported by Rafia is caused by that phenomenon, probably exacerbated by another bug that allowed 0 cardinality estimates to percolate inside the planner. Estimates have been clamped at or above 1.0 since her report by commit 1ea60ad6. I started a new thread to discuss that because it's unrelated to this patch, except insofar as it confuses the planner about Q21 (with or without parallelism). Using one possible selectivity tweak suggested by Tom Lane, I was able to measure significant speedups on otherwise unpatched master:

https://www.postgresql.org/message-id/CAEepm%3D11BiYUkgXZNzMtYhXh4S3a9DwUP8O%2BF2_ZPeGzzJFPbw%40mail.gmail.com

2. If you compare master tweaked as above against the latest version of my patch series with the tweak, then the patched version always runs faster with 4 or more workers, but with only 1 or 2 workers Q21 is a bit slower... but not always. I realised that there was a bi-modal distribution of execution times. It looks like my 'early exit' protocol, designed to make tuple-queue deadlock impossible, is often causing us to lose a worker. I am working on that now.

I have code changes for Peter G's and Andres's feedback queued up and will send a v8 series shortly, hopefully with a fix for problem 2 above.
--
Thomas Munro
http://www.enterprisedb.com
Hi,

Here is a new version of the patch series addressing complaints from Rafia, Peter, Andres and Robert -- see below. First, two changes not already covered in this thread:

1. Today Robert asked me a question off-list that I hadn't previously considered: since I am sharing tuples between backends, don't I have the same kind of transient record remapping problems that tqueue.c has to deal with? The answer must be yes, and in fact it's a trickier version because there are N 'senders' and N 'receivers' communicating via the shared hash table. So I decided to avoid the problem by not planning shared hash tables if the tuples could include transient RECORD types: see tlist_references_transient_type() in 0007-hj-shared-single-batch-v8.patch. Perhaps in the future we can find a way for parallel query to keep local types in sync, so this restriction could be lifted. (I've tested this with a specially modified build, because I couldn't figure out how to actually get any transient types to be considered in a parallel query, but if someone has a suggestion for a good query for that I'd love to put one into the regression test.)

2. Earlier versions included support for Shared Hash (= one worker builds, other workers wait, but we get to use work_mem * P memory) and Parallel Shared Hash (= all workers build). Shared Hash is by now quite hard to reach, since so many hash join inner plans are now parallelisable. I decided to remove support for it from the latest patch series: I think it adds cognitive load and patch lines for little or no gain. With time running out, I thought that it would be better to rip it out for now to try to simplify things and avoid some difficult questions about how to cost that mode. It could be added with a separate patch after some more study if it really does make some sense.
>> On Mon, Mar 13, 2017 at 8:40 PM, Rafia Sabih
>> <rafia.sabih@enterprisedb.com> wrote:
>>> In an attempt to test v7 of this patch on TPC-H 20 scale factor I found a
>>> few regressions,
>>> Q21: 52 secs on HEAD and 400 secs with this patch

As already mentioned there is a planner bug which we can fix separately from this patch series. Until that is resolved, please see that other thread[1] for the extra tweak required for sane results when testing Q21.

Even with that tweak, there was a slight regression with fewer than 3 workers at 1GB for Q21. That turned out to be because the patched version was not always using as many workers as unpatched. To fix that, I had to rethink the deadlock avoidance system to make it a bit less conservative about giving up workers: see src/backend/utils/misc/leader_gate.c in 0007-hj-shared-single-batch-v8.patch.

Here are some speed-up numbers comparing master to patched that I recorded on TPC-H scale 10 with work_mem = 1GB. These are the queries whose plans change with the patch. Both master and v8 were patched with fix-neqseljoin-for-semi-joins.patch.
 query | w = 0 | w = 1 | w = 2 | w = 3 | w = 4 | w = 5 | w = 6 | w = 7 | w = 8
-------+-------+-------+-------+-------+-------+-------+-------+-------+-------
 Q3    | 0.94x | 1.06x | 1.25x | 1.46x | 1.64x | 1.87x | 1.99x | 1.67x | 1.67x
 Q5    | 1.17x | 1.03x | 1.23x | 1.27x | 1.44x | 0.56x | 0.95x | 0.94x | 1.16x
 Q7    | 1.13x | 1.04x | 1.31x | 1.06x | 1.15x | 1.28x | 1.31x | 1.35x | 1.13x
 Q8    | 0.99x | 1.13x | 1.23x | 1.22x | 1.36x | 0.42x | 0.82x | 0.78x | 0.81x
 Q9    | 1.16x | 0.95x | 1.92x | 1.68x | 1.90x | 1.89x | 2.02x | 2.05x | 1.81x
 Q10   | 1.01x | 1.03x | 1.08x | 1.10x | 1.16x | 1.17x | 1.09x | 1.01x | 1.07x
 Q12   | 1.03x | 1.19x | 1.42x | 0.75x | 0.74x | 1.00x | 0.99x | 1.00x | 1.01x
 Q13   | 1.10x | 1.66x | 1.99x | 1.00x | 1.12x | 1.00x | 1.12x | 1.01x | 1.13x
 Q14   | 0.97x | 1.13x | 1.22x | 1.45x | 1.43x | 1.55x | 1.55x | 1.50x | 1.45x
 Q16   | 1.02x | 1.13x | 1.07x | 1.09x | 1.10x | 1.10x | 1.13x | 1.10x | 1.11x
 Q18   | 1.05x | 1.43x | 1.33x | 1.21x | 1.07x | 1.57x | 1.76x | 1.09x | 1.09x
 Q21   | 0.99x | 1.01x | 1.07x | 1.18x | 1.28x | 1.37x | 1.63x | 1.26x | 1.60x

These tests are a bit short and noisy and clearly there are some strange dips in there that need some investigation but the trend is positive. Here are some numbers from some simple test joins, so that you can see the raw speedup of large hash joins without all the other things going on in those TPC-H plans.
I executed 1-join, 2-join and 3-join queries like this:

CREATE TABLE simple AS
  SELECT generate_series(1, 10000000) AS id,
         'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
ANALYZE simple;

SELECT COUNT(*) FROM simple r JOIN simple s USING (id);
SELECT COUNT(*) FROM simple r JOIN simple s USING (id) JOIN simple t USING (id);
SELECT COUNT(*) FROM simple r JOIN simple s USING (id) JOIN simple t USING (id) JOIN simple u USING (id);

Unpatched master can make probing go faster by adding workers, but not building, so in these self-joins the ability to scale with more CPUs is limited (here w = 1 shows the speedup compared to the base case of w = 0):

 joins | w = 0       | w = 1 | w = 2 | w = 3 | w = 4 | w = 5
-------+-------------+-------+-------+-------+-------+-------
     1 | 10746.395ms | 1.46x | 1.66x | 1.63x | 1.49x | 1.36x
     2 | 20057.117ms | 1.41x | 1.58x | 1.59x | 1.43x | 1.28x
     3 | 30108.872ms | 1.29x | 1.39x | 1.36x | 1.35x | 1.03x

With the patch, scalability is better because extra CPUs can be used for both of these phases (though probably limited here by my 4 core machine):

 joins | w = 0       | w = 1 | w = 2 | w = 3 | w = 4 | w = 5
-------+-------------+-------+-------+-------+-------+-------
     1 | 10820.613ms | 1.86x | 2.62x | 2.99x | 3.04x | 2.90x
     2 | 20348.011ms | 1.83x | 2.54x | 2.71x | 3.06x | 3.17x
     3 | 30074.413ms | 1.82x | 2.49x | 2.79x | 3.08x | 3.27x

On Thu, Feb 16, 2017 at 3:36 PM, Andres Freund <andres@anarazel.de> wrote:
> I think the synchronization protocol with the various phases needs to be
> documented somewhere. Probably in nodeHashjoin.c's header.

I will supply that shortly.

> Don't we also need to somehow account for the more expensive hash-probes
> in the HASHPATH_TABLE_SHARED_* cases? Seems quite possible that we'll
> otherwise tend to use shared tables for small hashed tables that are
> looked up very frequently, even though a private one will likely be
> faster.
In this version I have two GUCs:

cpu_shared_tuple_cost to account for the extra cost of building a shared hash table.

cpu_synchronization_cost to account for the cost of waiting for a barrier between building and probing, probing and unmatched-scan if outer, and so on for future batches.

I'm not yet sure what their default settings should be, but these provide the mechanism to discourage the case you're talking about.

On Wed, Mar 8, 2017 at 12:58 PM, Andres Freund <andres@anarazel.de> wrote:
> +static void *dense_alloc(HashJoinTable hashtable, Size size,
> +                         bool respect_work_mem);
>
> I still dislike this, but maybe Robert's point of:
> ...
> Is enough.

In this version I changed the name to load_(private|shared)_tuple, and made it return NULL to indicate that work_mem would be exceeded. The caller needs to handle that by trying to shrink the hash table. Is this better?

On Fri, Mar 10, 2017 at 3:02 PM, Peter Geoghegan <pg@bowt.ie> wrote:
> On Thu, Mar 9, 2017 at 4:29 PM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> Yeah, this seems to fall out of the requirement to manage a growable
>> number of partition files in a fixed space. I wonder how this could
>> go wrong. One way would be for a crash-restart to happen (which
>> leaves all temporary files in place by design, though it could clean
>> them up like a normal restart if it wanted to), followed by a very
>> long running cluster eventually generating the same (pid, set number)
>> pair. I think I see a simple way to defend against that, which I'll
>> write about in the PHJ thread.
>
> I am not expressing any real opinion about the idea of relying on or
> suppressing ENOENT-on-unlink() just yet. What's clear is that that's
> unorthodox. I seldom have any practical reason to make a distinction
> between unorthodox and unacceptable. It's usually easier to just not
> do the unorthodox thing. Maybe this is one of the rare exceptions.
In 0008-hj-shared-buf-file-v8.patch, the problem I mentioned above is addressed; see make_tagged_segment().

>> Thanks. I will respond with code and comments over on the PHJ thread.
>> Aside from the broken extendBufFile behaviour you mentioned, I'll look
>> into the general modularity complaints I'm hearing about fd.c and
>> buffile.c interaction.
>
> buffile.c should stop pretending to care about anything other than
> temp files, IMV. 100% of all clients that want temporary files go
> through buffile.c. 100% of all clients that want non-temp files (files
> which are not marked FD_TEMPORARY) access fd.c directly, rather than
> going through buffile.c.

I still need BufFile because I want buffering.

There are 3 separate characteristics enabled by flags with 'temporary' in their name. I think we should consider separating the concerns by splitting and renaming them:

1. Segmented BufFile behaviour. I propose renaming BufFile's isTemp member to isSegmented, because that is what it really does. I want that feature independently without getting confused about lifetimes. Tested with small MAX_PHYSICAL_FILESIZE as you suggested.

2. The temp_file_limit system. Currently this applies to fd.c files opened with FD_TEMPORARY. You're right that we shouldn't be able to escape that sanity check on disk space just because we want to manage disk file ownership differently. I propose that we create a new flag FD_TEMP_FILE_LIMIT that can be set independently of the flags controlling disk file lifetime. When working with SharedBufFileSet, the limit applies to each backend in respect of files it created, while it has them open. This seems a lot simpler than any shared-temp-file-limit type scheme and is vaguely similar to the way work_mem applies in each backend for parallel query.

3. Delete-on-close/delete-at-end-of-xact. I don't want to use that facility so I propose disconnecting it from the above.
We could rename those fd.c-internal flags FD_TEMPORARY and FD_XACT_TEMPORARY to FD_DELETE_AT_CLOSE and FD_DELETE_AT_EOXACT.

As shown in 0008-hj-shared-buf-file-v8.patch. Thoughts?

[1] https://www.postgresql.org/message-id/flat/CAEepm%3D270ze2hVxWkJw-5eKzc3AB4C9KpH3L2kih75R5pdSogg%40mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Mar 21, 2017 at 5:07 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
>> buffile.c should stop pretending to care about anything other than
>> temp files, IMV. 100% of all clients that want temporary files go
>> through buffile.c. 100% of all clients that want non-temp files (files
>> which are not marked FD_TEMPORARY) access fd.c directly, rather than
>> going through buffile.c.
>
> I still need BufFile because I want buffering.
>
> There are 3 separate characteristics enabled by flags with 'temporary'
> in their name. I think we should consider separating the concerns by
> splitting and renaming them:
>
> 1. Segmented BufFile behaviour. I propose renaming BufFile's isTemp
> member to isSegmented, because that is what it really does. I want
> that feature independently without getting confused about lifetimes.
> Tested with small MAX_PHYSICAL_FILESIZE as you suggested.

I would have proposed to get rid of the isTemp field entirely. It is always true with current usage, and only #ifdef NOT_USED code presumes that it could be any other way. BufFile is all about temp files, which ISTM should be formalized. The whole point of BufFile is to segment fd.c temp file segments. Who would ever want to use BufFile without that capability anyway?

> 2. The temp_file_limit system. Currently this applies to fd.c files
> opened with FD_TEMPORARY. You're right that we shouldn't be able to
> escape that sanity check on disk space just because we want to manage
> disk file ownership differently. I propose that we create a new flag
> FD_TEMP_FILE_LIMIT that can be set independently of the flags
> controlling disk file lifetime. When working with SharedBufFileSet,
> the limit applies to each backend in respect of files it created,
> while it has them open. This seems a lot simpler than any
> shared-temp-file-limit type scheme and is vaguely similar to the way
> work_mem applies in each backend for parallel query.
I agree that that makes sense as a user-visible behavior of temp_file_limit. This user-visible behavior is what I actually implemented for parallel CREATE INDEX.

> 3. Delete-on-close/delete-at-end-of-xact. I don't want to use that
> facility so I propose disconnecting it from the above. We could
> rename those fd.c-internal flags FD_TEMPORARY and FD_XACT_TEMPORARY to
> FD_DELETE_AT_CLOSE and FD_DELETE_AT_EOXACT.

This reliably unlink()s all files, albeit while relying on unlink() ENOENT as a condition that terminates deletion of one particular worker's BufFile's segments. However, because you effectively no longer use resowner.c, ISTM that there is still a resource leak in error paths. ResourceOwnerReleaseInternal() won't call FileClose() for temp-ish files (that are not quite temp files in the current sense) in the absence of any other place managing to do so, such as BufFileClose(). How can you be sure that you'll actually close() the FD itself (not vFD) within fd.c in the event of an error? Or Delete(), which does some LRU maintenance for backend's local VfdCache?

If I follow the new code correctly, then it doesn't matter that you've unlink()'d to take care of the more obvious resource management chore. You can still have a reference leak like this, if I'm not mistaken, because you still have backend local state (local VfdCache) that is left totally decoupled with the new "shadow resource manager" for shared BufFiles.

> As shown in 0008-hj-shared-buf-file-v8.patch. Thoughts?

A less serious issue I've also noticed is that you add palloc() calls, implicitly using the current memory context, within buffile.c. BufFileOpenTagged() has some, for example. However, there is a note that we don't need to save the memory context when we open a BufFile because we always repalloc(). That is no longer the case here.

--
Peter Geoghegan
On Tue, Mar 21, 2017 at 7:18 PM, Peter Geoghegan <pg@bowt.ie> wrote:
>> As shown in 0008-hj-shared-buf-file-v8.patch. Thoughts?
>
> A less serious issue I've also noticed is that you add palloc() calls,
> implicitly using the current memory context, within buffile.c.
> BufFileOpenTagged() has some, for example. However, there is a note
> that we don't need to save the memory context when we open a BufFile
> because we always repalloc(). That is no longer the case here.

Similarly, I think that your new type of BufFile has no need to save CurrentResourceOwner, because it won't ever actually be used. I suppose that you should at least note this in comments.

--
Peter Geoghegan
Hi,

Here is a new version addressing feedback from Peter and Andres. Please see below.

On Wed, Mar 22, 2017 at 3:18 PM, Peter Geoghegan <pg@bowt.ie> wrote:
> On Tue, Mar 21, 2017 at 5:07 AM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>>> buffile.c should stop pretending to care about anything other than
>>> temp files, IMV. 100% of all clients that want temporary files go
>>> through buffile.c. 100% of all clients that want non-temp files (files
>>> which are not marked FD_TEMPORARY) access fd.c directly, rather than
>>> going through buffile.c.
>>
>> I still need BufFile because I want buffering.
>>
>> There are 3 separate characteristics enabled by flags with 'temporary'
>> in their name. I think we should consider separating the concerns by
>> splitting and renaming them:
>>
>> 1. Segmented BufFile behaviour. I propose renaming BufFile's isTemp
>> member to isSegmented, because that is what it really does. I want
>> that feature independently without getting confused about lifetimes.
>> Tested with small MAX_PHYSICAL_FILESIZE as you suggested.
>
> I would have proposed to get rid of the isTemp field entirely. It is
> always true with current usage, and only #ifdef NOT_USED code presumes
> that it could be any other way. BufFile is all about temp files, which
> ISTM should be formalized. The whole point of BufFile is to segment
> fd.c temp file segments. Who would ever want to use BufFile without
> that capability anyway?

Yeah, it looks like you're probably right, but I guess others could have uses for BufFile that we don't know about. It doesn't seem like it hurts to leave the variable in existence.

>> 2. The temp_file_limit system. Currently this applies to fd.c files
>> opened with FD_TEMPORARY. You're right that we shouldn't be able to
>> escape that sanity check on disk space just because we want to manage
>> disk file ownership differently.
>> I propose that we create a new flag
>> FD_TEMP_FILE_LIMIT that can be set independently of the flags
>> controlling disk file lifetime. When working with SharedBufFileSet,
>> the limit applies to each backend in respect of files it created,
>> while it has them open. This seems a lot simpler than any
>> shared-temp-file-limit type scheme and is vaguely similar to the way
>> work_mem applies in each backend for parallel query.
>
> I agree that that makes sense as a user-visible behavior of
> temp_file_limit. This user-visible behavior is what I actually
> implemented for parallel CREATE INDEX.

Ok, good.

>> 3. Delete-on-close/delete-at-end-of-xact. I don't want to use that
>> facility so I propose disconnecting it from the above. We could
>> rename those fd.c-internal flags FD_TEMPORARY and FD_XACT_TEMPORARY to
>> FD_DELETE_AT_CLOSE and FD_DELETE_AT_EOXACT.
>
> This reliably unlink()s all files, albeit while relying on unlink()
> ENOENT as a condition that terminates deletion of one particular
> worker's BufFile's segments. However, because you effectively no
> longer use resowner.c, ISTM that there is still a resource leak in
> error paths. ResourceOwnerReleaseInternal() won't call FileClose() for
> temp-ish files (that are not quite temp files in the current sense) in
> the absence of any other place managing to do so, such as
> BufFileClose(). How can you be sure that you'll actually close() the
> FD itself (not vFD) within fd.c in the event of an error? Or Delete(),
> which does some LRU maintenance for backend's local VfdCache?

Yeah, I definitely need to use resowner.c. The only thing I want to opt out of is automatic file deletion in that code path.

> If I follow the new code correctly, then it doesn't matter that you've
> unlink()'d to take care of the more obvious resource management chore.
> You can still have a reference leak like this, if I'm not mistaken,
> because you still have backend local state (local VfdCache) that is
> left totally decoupled with the new "shadow resource manager" for
> shared BufFiles.

You're right. The attached version fixes these problems. The BufFiles created or opened in this new way now participate in both of our leak-detection and clean-up schemes: the one in resowner.c (because I'm now explicitly registering with it as I had failed to do before) and the one in CleanupTempFiles (because FD_CLOSE_AT_EOXACT is set, which I already had in the previous version for the creator, but not the opener of such a file). I tested by commenting out my explicit BufFileClose calls to check that resowner.c starts complaining, and then by commenting out the resowner registration too to check that CleanupTempFiles starts complaining.

>> As shown in 0008-hj-shared-buf-file-v8.patch. Thoughts?
>
> A less serious issue I've also noticed is that you add palloc() calls,
> implicitly using the current memory context, within buffile.c.
> BufFileOpenTagged() has some, for example. However, there is a note
> that we don't need to save the memory context when we open a BufFile
> because we always repalloc(). That is no longer the case here.

I don't see a problem here. BufFileOpenTagged() is similar to BufFileCreateTemp() which calls makeBufFile() and therefore returns a result that is allocated in the current memory context. This seems like the usual deal.

Thanks for the review!

On Wed, Mar 22, 2017 at 1:07 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> On Thu, Feb 16, 2017 at 3:36 PM, Andres Freund <andres@anarazel.de> wrote:
>> I think the synchronization protocol with the various phases needs to be
>> documented somewhere. Probably in nodeHashjoin.c's header.
>
> I will supply that shortly.

Added in the attached version.
--
Thomas Munro
http://www.enterprisedb.com
On Wed, Mar 22, 2017 at 3:17 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
>> If I follow the new code correctly, then it doesn't matter that you've
>> unlink()'d to take care of the more obvious resource management chore.
>> You can still have a reference leak like this, if I'm not mistaken,
>> because you still have backend local state (local VfdCache) that is
>> left totally decoupled with the new "shadow resource manager" for
>> shared BufFiles.
>
> You're right. The attached version fixes these problems. The
> BufFiles created or opened in this new way now participate in both of
> our leak-detection and clean-up schemes: the one in resowner.c
> (because I'm now explicitly registering with it as I had failed to do
> before) and the one in CleanupTempFiles (because FD_CLOSE_AT_EOXACT is
> set, which I already had in the previous version for the creator, but
> not the opener of such a file). I tested by commenting out my
> explicit BufFileClose calls to check that resowner.c starts
> complaining, and then by commenting out the resowner registration too
> to check that CleanupTempFiles starts complaining.

I took a quick look at your V9 today. This is by no means a comprehensive review of 0008-hj-shared-buf-file-v9.patch, but it's what I can manage right now.
The main change you made is well represented by the following part of the patch, where you decouple close at eoXact with delete at eoXact, with the intention of doing one but not the other for BufFiles that are shared:

> /* these are the assigned bits in fdstate below: */
> -#define FD_TEMPORARY (1 << 0) /* T = delete when closed */
> -#define FD_XACT_TEMPORARY (1 << 1) /* T = delete at eoXact */
> +#define FD_DELETE_AT_CLOSE (1 << 0) /* T = delete when closed */
> +#define FD_CLOSE_AT_EOXACT (1 << 1) /* T = close at eoXact */
> +#define FD_TEMP_FILE_LIMIT (1 << 2) /* T = respect temp_file_limit */

So, shared BufFile fd.c segments within backend local resource manager do not have FD_DELETE_AT_CLOSE set, because you mean to do that part yourself by means of generic shared cleanup through dynamic shared memory segment callback. So far so good.

However, I notice that the place that this happens instead, PathNameDelete(), does not repeat the fd.c step of doing a final stat(), and using the stats for a pgstat_report_tempfile(). So, a new pgstat_report_tempfile() call is simply missing. However, the more fundamental issue in my mind is: How can you fix that? Where would it go if you had it?

If you do the obvious thing of just placing that before the new unlink() within PathNameDelete(), on the theory that that needs parity with the fd.c stuff, that has non-obvious implications. Does the pgstat_report_tempfile() call need to happen when going through this path, for example?:

> +/*
> + * Destroy a shared BufFile early. Files are normally cleaned up
> + * automatically when all participants detach, but it might be useful to
> + * reclaim disk space sooner than that. The caller asserts that no backends
> + * will attempt to read from this file again and that only one backend will
> + * destroy it.
> + */
> +void
> +SharedBufFileDestroy(SharedBufFileSet *set, int partition, int participant)
> +{

The theory with the unlink()'ing function PathNameDelete(), I gather, is that it doesn't matter if it happens to be called more than once, say from a worker and then in an error handling path in the leader or whatever. Do I have that right?

Obviously the concern I have about that is that any stat() call you might add for the benefit of a new pgstat_report_tempfile() call, needed to keep parity with fd.c, now has a risk of double counting in error paths, if I'm not mistaken. We do need to do that accounting in the event of error, just as we do when there is no error, at least if current stats collector behavior is to be preserved. How can you determine which duplicate call here is the duplicate? In other words, how can you figure out which one is not supposed to pgstat_report_tempfile()? If the size of temp files in each worker is unknowable to the implementation in error paths, does it not follow that it's unknowable to the user that queries pg_stat_database?

Now, I don't imagine that this should stump you. Maybe I'm wrong about that possibility (that you cannot have exactly-once unlink()/stat()/whatever), or maybe I'm right and you can fix it while preserving existing behavior, for example by relying on unlink() reliably failing when called a second time, no matter how tight any race was. What exact semantics does unlink() have with concurrency, as far as the link itself goes?

If I'm not wrong about the general possibility, then maybe the existing behavior doesn't need to be preserved in error paths, which are after all exceptional -- it's not as if the statistics collector is currently highly reliable. It's not obvious that you are deliberately accepting of any of these risks or costs, though, which I think needs to be clearer, at a minimum. What trade-off are you making here?
Unfortunately, that's about the only useful piece of feedback that I can think of right now -- be more explicit about what is permissible and not permissible in this area, and do something with pgstat_report_tempfile(). This is a bit like the unlink()-ENOENT-to-terminate (ENOENT ignore) issue. There are no really hard questions here, but there certainly are some awkward questions.

--
Peter Geoghegan
Hi, Here is a new patch series responding to feedback from Peter and Andres: 1. Support pgstat_report_tempfile and log_temp_files, which I had overlooked as Peter pointed out. 2. Use a patch format that is acceptable to git am, per complaint off-list from Andres. (Not actually made with git format-patch; I need to learn some more git-fu, but they now apply cleanly with git am). On Thu, Mar 23, 2017 at 12:55 PM, Peter Geoghegan <pg@bowt.ie> wrote: > I took a quick look at your V9 today. This is by no means a > comprehensive review of 0008-hj-shared-buf-file-v9.patch, but it's > what I can manage right now. Thanks. I really appreciate your patience with the resource management stuff I had failed to think through. > ... > > However, I notice that the place that this happens instead, > PathNameDelete(), does not repeat the fd.c step of doing a final > stat(), and using the stats for a pgstat_report_tempfile(). So, a new > pgstat_report_tempfile() call is simply missing. However, the more > fundamental issue in my mind is: How can you fix that? Where would it > go if you had it? You're right. I may be missing something here (again), but it does seem straightforward to implement because we always delete each file that really exists exactly once (and sometimes we also try to delete files that don't exist due to imprecise meta-data, but that isn't harmful and we know when that turns out to be the case). > If you do the obvious thing of just placing that before the new > unlink() within PathNameDelete(), on the theory that that needs parity > with the fd.c stuff, that has non-obvious implications. Does the > pgstat_report_tempfile() call need to happen when going through this > path, for example?: > >> +/* >> + * Destroy a shared BufFile early. Files are normally cleaned up >> + * automatically when all participants detach, but it might be useful to >> + * reclaim disk space sooner than that. 
The caller asserts that no backends >> + * will attempt to read from this file again and that only one backend will >> + * destroy it. >> + */ >> +void >> +SharedBufFileDestroy(SharedBufFileSet *set, int partition, int participant) >> +{ Yes, I think it should definitely go into PathNameDeleteTemporaryFile() (formerly PathNameDelete()). > The theory with the unlink()'ing() function PathNameDelete(), I > gather, is that it doesn't matter if it happens to be called more than > once, say from a worker and then in an error handling path in the > leader or whatever. Do I have that right? Yes, it may be called for a file that doesn't exist either because it never existed, or because it has already been deleted. To recap, there are two reasons it needs to tolerate attempts to delete files that aren't there: 1. To be able to delete the fd.c files backing a BufFile given only a BufFileTag. We don't know how many segment files there are, but we know how to build the prefix of the filename so we try to delete [prefix].0, [prefix].1, [prefix].2 ... until we get ENOENT and terminate. I think this sort of thing would be more questionable for durable storage backing a database object, but for temporary files I can't think of a problem with it. 2. SharedBufFileSet doesn't actually know how many partitions exist, it just knows the *range* of partition numbers (because of its conflicting fixed space and increasable partitions requirements). From that information it can loop building BufFileTags for all backing files that *might* exist, and in practice they usually do because we don't tend to have a 'sparse' range of partitions. The error handling path isn't a special case: whoever is the last to detach from the DSM segment will delete all the files, whether that results from an error or not. Now someone might call SharedBufFileDestroy() to delete files sooner, but that can't happen at the same time as a detach cleanup (the caller is still attached). 
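The probe-until-ENOENT deletion loop described in (1) can be sketched like this. This is a hypothetical standalone sketch, not the patch's actual code: `delete_segments_by_probing` stands in for the combination of BufFileDeleteTagged and PathNameDeleteTemporaryFile, and plain POSIX unlink() stands in for the fd.c machinery.

```c
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

/*
 * Hypothetical sketch: delete the files backing a segmented temporary
 * file given only a filename prefix.  We don't know how many segment
 * files exist, so we unlink <prefix>.0, <prefix>.1, ... until unlink()
 * fails, treating ENOENT as the normal loop terminator.  Returns true
 * if at least one segment was deleted.
 */
static bool
delete_segments_by_probing(const char *prefix)
{
	char	path[1024];
	int		segment = 0;
	bool	found = false;

	for (;;)
	{
		snprintf(path, sizeof(path), "%s.%d", prefix, segment);
		if (unlink(path) != 0)
			break;				/* ENOENT (or an IO error) ends the scan */
		found = true;
		++segment;
	}
	return found;
}
```

Attempting the deletion a second time is harmless: every unlink() then fails with ENOENT and the function simply reports that nothing was found.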
As a small optimisation avoiding a bunch of pointless unlink syscalls, I shrink the SharedBufFileSet range if you happen to delete explicitly with a partition number at the extremities of the range, and it so happens that Parallel Hash Join explicitly deletes them in partition order as the join runs, so in practice the range is empty by the time SharedBufFileSet's cleanup runs and there is nothing to do, unless an error occurs. > Obviously the concern I have about that is that any stat() call you > might add for the benefit of a new pgstat_report_tempfile() call, > needed to keep parity with fd.c, now has a risk of double counting in > error paths, if I'm not mistaken. We do need to do that accounting in > the event of error, just as we do when there is no error, at least if > current stats collector behavior is to be preserved. How can you > determine which duplicate call here is the duplicate? In other words, > how can you figure out which one is not supposed to > pgstat_report_tempfile()? If the size of temp files in each worker is > unknowable to the implementation in error paths, does it not follow > that it's unknowable to the user that queries pg_stat_database? There is no double counting, if you only report after you successfully unlink (ie if you don't get ENOENT). In the attached patch I have refactored the reporting code into a small function, and I added a stat call to PathNameDeleteTemporaryFile() which differs from the FileClose() coding only in that it tolerates ENOENT. 
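The report-only-after-successful-unlink idea can be sketched as follows. Hypothetical names throughout: a plain counter stands in for pgstat_report_tempfile(), and POSIX stat()/unlink() stand in for the fd.c wrappers.

```c
#include <stdbool.h>
#include <sys/stat.h>
#include <unistd.h>

/* Stand-in for the statistics reported via pgstat_report_tempfile(). */
static long long reported_bytes = 0;

/*
 * Hypothetical sketch: stat() first, then unlink(), but report usage
 * only if the unlink() succeeded.  If two cleanup paths ever raced on
 * the same path, only one unlink() could succeed, so the file's size
 * would be counted exactly once; the loser sees ENOENT and reports
 * nothing.
 */
static bool
delete_and_report(const char *path)
{
	struct stat st;
	bool		have_size = (stat(path, &st) == 0);

	if (unlink(path) != 0)
		return false;			/* ENOENT or error: no report */

	if (have_size)
		reported_bytes += (long long) st.st_size;
	return true;
}
```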
Now when I SET log_temp_files = 1 and then \i hj-test-queries.sql[1] I see temporary file log messages resulting from both private and shared temporary files being deleted: 2017-03-23 18:59:55.999 NZDT [30895] LOG: temporary file: path "base/pgsql_tmp/pgsql_tmp30895.203", size 920400 2017-03-23 18:59:55.999 NZDT [30895] STATEMENT: EXPLAIN ANALYZE SELECT COUNT(*) FROM simple r JOIN bigger_than_it_looks s USING (id); 2017-03-23 19:00:03.007 NZDT [30903] LOG: temporary file: path "base/pgsql_tmp/pgsql_tmp30895.8.1.0.0", size 9749868 2017-03-23 19:00:03.007 NZDT [30903] STATEMENT: EXPLAIN ANALYZE SELECT COUNT(*) FROM simple r JOIN awkwardly_skewed s USING (id); Am I missing something? > Now, I don't imagine that this should stump you. Maybe I'm wrong about > that possibility (that you cannot have exactly once > unlink()/stat()/whatever), or maybe I'm right and you can fix it while > preserving existing behavior, for example by relying on unlink() > reliably failing when called a second time, no matter how tight any > race was. What exact semantics does unlink() have with concurrency, as > far as the link itself goes? On Unixoid systems at least, concurrent unlink() for the same file must surely only succeed in one process and fail with ENOENT in any others, but there is no chance for this to happen anyway: SharedBufFileDestroy() is documented as only callable once for a given set of parameters (even though nothing bad would happen if you broke that rule AFAIK), and the code in the later patch that uses it adheres to that rule, and the SharedBufFileSet cleanup can only run when the last person detaches so there can't be a concurrent call to SharedBufFileDestroy(). > If I'm not wrong about the general possibility, then maybe the > existing behavior doesn't need to be preserved in error paths, which > are after all exceptional -- it's not as if the statistics collector > is currently highly reliable. 
It's not obvious that you are > deliberately accepting of any of these risks or costs, though, which I > think needs to be clearer, at a minimum. What trade-off are you making > here? There seems no reason not to make every effort to keep the stats collector and logs posted on these files just as we do with regular private temporary files, and it was pure oversight that I didn't. Thanks! > Unfortunately, that's about the only useful piece of feedback that I > can think of right now -- be more explicit about what is permissible > and not permissible in this area, and do something with > pgstat_report_tempfile(). This is a bit like the > unlink()-ENOENT/-to-terminate (ENOENT ignore) issue. There are no > really hard questions here, but there certainly are some awkward > questions. Much appreciated. [1] https://www.postgresql.org/message-id/CAEepm%3D2PRCtpo6UL4RxSbp%3DOXpyty0dg3oT3Vyk0eb%3Dr8JwZhg@mail.gmail.com -- Thomas Munro http://www.enterprisedb.com -- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Mar 23, 2017 at 12:35 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > Thanks. I really appreciate your patience with the resource > management stuff I had failed to think through. It's a surprisingly difficult problem, that almost requires prototyping just to explain. No need to apologize. This is the process by which many hard problems end up being solved. >> However, I notice that the place that this happens instead, >> PathNameDelete(), does not repeat the fd.c step of doing a final >> stat(), and using the stats for a pgstat_report_tempfile(). So, a new >> pgstat_report_tempfile() call is simply missing. However, the more >> fundamental issue in my mind is: How can you fix that? Where would it >> go if you had it? > > You're right. I may be missing something here (again), but it does > seem straightforward to implement because we always delete each file > that really exists exactly once (and sometimes we also try to delete > files that don't exist due to imprecise meta-data, but that isn't > harmful and we know when that turns out to be the case). ISTM that your patch now shares a quality with parallel tuplesort: You may now hold files open after an unlink() of the original link/path that they were opened using. As Robert pointed out when discussing parallel tuplesort earlier in the week, that comes with the risk, however small, that the vFD cache will close() the file out from under us during LRU maintenance, resulting in a subsequent open() (at the tail-end of the vFD's lifetime) that fails unexpectedly. It's probably fine to assume that we can sanely close() the file ourselves in fd.c error paths despite a concurrent unlink(), since we never operate on the link itself, and there probably isn't much pressure on each backend's vFD cache. But, is that good enough? I can't say, though I suspect that this particular risk is one that's best avoided. 
I haven't tested out how much of a problem this might be for your patch, but I do know that resowner.c will call your shared mem segment callback before closing any backend local vFDs, so I can't imagine how it could be that this risk doesn't exist. FWIW, I briefly entertained the idea that we could pin a vFD for just a moment, ensuring that the real FD could not be close()'d out by vfdcache LRU maintenance, which would fix this problem for parallel tuplesort, I suppose. That may not be workable for PHJ, because PHJ would probably need to hold on to such a "pin" for much longer, owing to the lack of any explicit "handover" phase. -- Peter Geoghegan
On Sun, Mar 26, 2017 at 1:53 PM, Peter Geoghegan <pg@bowt.ie> wrote: > ISTM that your patch now shares a quality with parallel tuplesort: You > may now hold files open after an unlink() of the original link/path > that they were opened using. As Robert pointed out when discussing > parallel tuplesort earlier in the week, that comes with the risk, > however small, that the vFD cache will close() the file out from under > us during LRU maintenance, resulting in a subsequent open() (at the > tail-end of the vFD's lifetime) that fails unexpectedly. It's probably > fine to assume that we can sanely close() the file ourselves in fd.c > error paths despite a concurrent unlink(), since we never operate on > the link itself, and there probably isn't much pressure on each > backend's vFD cache. But, is that good enough? I can't say, though I > suspect that this particular risk is one that's best avoided. > > I haven't tested out how much of a problem this might be for your > patch, but I do know that resowner.c will call your shared mem segment > callback before closing any backend local vFDs, so I can't imagine how > it could be that this risk doesn't exist. I wouldn't have expected anything like that to be a problem, because FileClose() doesn't call FileAccess(). So IIUC it wouldn't ever try to reopen a kernel fd just to close it. But... what you said above must be a problem for Windows. I believe it doesn't allow files to be unlinked if they are open, and I see that DSM segments are cleaned up in resowner's phase == RESOURCE_RELEASE_BEFORE_LOCKS and files are closed in phase == RESOURCE_RELEASE_AFTER_LOCKS. Hmm. -- Thomas Munro http://www.enterprisedb.com
On Sat, Mar 25, 2017 at 7:56 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > On Sun, Mar 26, 2017 at 1:53 PM, Peter Geoghegan <pg@bowt.ie> wrote: >> ISTM that your patch now shares a quality with parallel tuplesort: You >> may now hold files open after an unlink() of the original link/path >> that they were opened using. As Robert pointed out when discussing >> parallel tuplesort earlier in the week, that comes with the risk, >> however small, that the vFD cache will close() the file out from under >> us during LRU maintenance, resulting in a subsequent open() (at the >> tail-end of the vFD's lifetime) that fails unexpectedly. It's probably >> fine to assume that we can sanely close() the file ourselves in fd.c >> error paths despite a concurrent unlink(), since we never operate on >> the link itself, and there probably isn't much pressure on each >> backend's vFD cache. But, is that good enough? I can't say, though I >> suspect that this particular risk is one that's best avoided. >> >> I haven't tested out how much of a problem this might be for your >> patch, but I do know that resowner.c will call your shared mem segment >> callback before closing any backend local vFDs, so I can't imagine how >> it could be that this risk doesn't exist. > > I wouldn't have expected anything like that to be a problem, because > FileClose() doesn't call FileAccess(). So IIUC it wouldn't ever try > to reopen a kernel fd just to close it. The concern is that something somewhere does. For example, mdread() calls FileRead(), which calls FileAccess(), ultimately because of some obscure catalog access. It's very hard to reason about things like that. -- Peter Geoghegan
Hi, SharedBufFile allows temporary files to be created by one backend and then exported for read-only access by other backends, with clean-up managed by reference counting associated with a DSM segment. This includes changes to fd.c and buffile.c to support new kinds of temporary file. diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index 4ca0ea4..a509c05 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c I think the new facilities should be explained in the file's header. @@ -68,9 +71,10 @@ struct BufFile * avoid making redundant FileSeek calls. */ - bool isTemp; /* can only add files if this is TRUE */ + bool isSegmented; /* can only add files if this is TRUE */ That's a bit of a weird and uncommented upon change. @@ -79,6 +83,8 @@ struct BufFile */ ResourceOwner resowner; + BufFileTag tag; /* for discoverability between backends */ Not perfectly happy with the name tag here, the name is a bit too similar to BufferTag - something quite different. +static void +make_tagged_path(char *tempdirpath, char *tempfilepath, + const BufFileTag *tag, int segment) +{ + if (tag->tablespace == DEFAULTTABLESPACE_OID || + tag->tablespace == GLOBALTABLESPACE_OID) + snprintf(tempdirpath, MAXPGPATH, "base/%s", PG_TEMP_FILES_DIR); + else + { + snprintf(tempdirpath, MAXPGPATH, "pg_tblspc/%u/%s/%s", + tag->tablespace, TABLESPACE_VERSION_DIRECTORY, + PG_TEMP_FILES_DIR); + } + + snprintf(tempfilepath, MAXPGPATH, "%s/%s%d.%d.%d.%d.%d", tempdirpath, + PG_TEMP_FILE_PREFIX, + tag->creator_pid, tag->set, tag->partition, tag->participant, + segment); Is there a risk that this ends up running afoul of filename length limits on some platforms? +} + +static File +make_tagged_segment(const BufFileTag *tag, int segment) +{ + File file; + char tempdirpath[MAXPGPATH]; + char tempfilepath[MAXPGPATH]; + + /* + * There is a remote chance that disk files with this (pid, set) pair + * already exists after a crash-restart. 
Since the presence of + * consecutively numbered segment files is used by BufFileOpenShared to + * determine the total size of a shared BufFile, we'll defend against + * confusion by unlinking segment 1 (if it exists) before creating segment + * 0. + */ Gah. Why on earth aren't we removing temp files when restarting, not just on the initial start? That seems completely wrong? If we do decide not to change this: Why is that sufficient? Doesn't the same problem exist for segments later than the first? +/* + * Open a file that was previously created in another backend with + * BufFileCreateShared. + */ +BufFile * +BufFileOpenTagged(const BufFileTag *tag) +{ + BufFile *file = (BufFile *) palloc(sizeof(BufFile)); + char tempdirpath[MAXPGPATH]; + char tempfilepath[MAXPGPATH]; + Size capacity = 1024; + File *files = palloc(sizeof(File) * capacity); + int nfiles = 0; + + /* + * We don't know how many segments there are, so we'll probe the + * filesystem to find out. + */ + for (;;) + { + /* See if we need to expand our file space. */ + if (nfiles + 1 > capacity) + { + capacity *= 2; + files = repalloc(files, sizeof(File) * capacity); + } + /* Try to load a segment. */ + make_tagged_path(tempdirpath, tempfilepath, tag, nfiles); + files[nfiles] = PathNameOpenTemporaryFile(tempfilepath); + if (files[nfiles] <= 0) + break; Isn't 0 a theoretically valid return value from PathNameOpenTemporaryFile? +/* + * Delete a BufFile that was created by BufFileCreateTagged. Return true if + * at least one segment was deleted; false indicates that no segment was + * found, or an error occurred while trying to delete. Errors are logged but + * the function returns normally because this is assumed to run in a clean-up + * path that might already involve an error. 
+ */ +bool +BufFileDeleteTagged(const BufFileTag *tag) +{ + char tempdirpath[MAXPGPATH]; + char tempfilepath[MAXPGPATH]; + int segment = 0; + bool found = false; + + /* + * We don't know if the BufFile really exists, because SharedBufFile + * tracks only the range of file numbers. If it does exist, we don't + * know how many 1GB segments it has, so we'll delete until we hit ENOENT or + * an IO error. + */ + for (;;) + { + make_tagged_path(tempdirpath, tempfilepath, tag, segment); + if (!PathNameDeleteTemporaryFile(tempfilepath, false)) + break; + found = true; + ++segment; + } + + return found; +} If we crash in the middle of this, we'll leave the later files abandoned, no? +/* + * BufFileSetReadOnly --- flush and make read-only, in preparation for sharing + */ +void +BufFileSetReadOnly(BufFile *file) +{ + BufFileFlush(file); + file->readOnly = true; +} That flag is unused, right? + * PathNameCreateTemporaryFile, PathNameOpenTemporaryFile and + * PathNameDeleteTemporaryFile are used for temporary files that may be shared + * between backends. A File created or opened with these functions is not + * automatically deleted when the file is closed, but it is automatically + * closed at end of transaction and counts against the temporary file limit of + * the backend that created it. Any File created this way must be explicitly + * deleted with PathNameDeleteTemporaryFile. Automatic file deletion is not + * provided because this interface is designed for use by buffile.c and + * indirectly by sharedbuffile.c to implement temporary files with shared + * ownership and cleanup. Hm. Those names are pretty easy to misunderstand, no? s/Temp/Shared/?

/* + * Called whenever a temporary file is deleted to report its size.
+ */ +static void +ReportTemporaryFileUsage(const char *path, off_t size) +{ + pgstat_report_tempfile(size); + + if (log_temp_files >= 0) + { + if ((size / 1024) >= log_temp_files) + ereport(LOG, + (errmsg("temporary file: path \"%s\", size %lu", + path, (unsigned long) size))); + } +} Man, the code for this sucks (not your fault). Shouldn't this properly be at the buffile.c level, where we could implement limits above 1GB properly? +/* + * Open a file that was created with PathNameCreateTemporaryFile in another + * backend. Files opened this way don't count against the temp_file_limit of + * the caller, are read-only and are automatically closed at the end of the + * transaction but are not deleted on close. + */ This really reinforces my issues with the naming scheme. This ain't a normal tempfile. +File +PathNameOpenTemporaryFile(char *tempfilepath) +{ + File file; + + /* + * Open the file. Note: we don't use O_EXCL, in case there is an orphaned + * temp file that can be reused. + */ + file = PathNameOpenFile(tempfilepath, O_RDONLY | PG_BINARY, 0); If so, wouldn't we need to truncate the file? + * A single SharedBufFileSet can manage any number of 'tagged' BufFiles that + * are shared between a fixed number of participating backends. Each shared + * BufFile can be written to by a single participant but can be read by any + * backend after it has been 'exported'. Once a given BufFile is exported, it + * becomes read-only and cannot be extended. To create a new shared BufFile, + * a participant needs its own distinct participant number, and needs to + * specify an arbitrary partition number for the file. To make it available + * to other backends, it must be explicitly exported, which flushes internal + * buffers and renders it read-only. To open a file that has been shared, a + * backend needs to know the number of the participant that created the file, + * and the partition number.
It is the responsibility of calling code to ensure + * that files are not accessed before they have been shared. Hm. One way to make this safer would be to rename files when exporting. Should be sufficient to do this to the first segment, I guess. + * Each file is identified by a partition number and a participant number, so + * that a SharedBufFileSet can be viewed as a 2D table of individual files. I think using "files" as a term here is a bit dangerous - they're individually segmented again, right? +/* + * The number of bytes of shared memory required to construct a + * SharedBufFileSet. + */ +Size +SharedBufFileSetSize(int participants) +{ + return offsetof(SharedBufFileSet, participants) + + sizeof(SharedBufFileParticipant) * participants; +} The function name sounds a bit like a function actually setting some size... s/Size/DetermineSize/? +/* + * Create a new file suitable for sharing. Each backend that calls this must + * use a distinct participant number. Behavior is undefined if a participant + * calls this more than once for the same partition number. Partitions should + * ideally be numbered consecutively or in as small a range as possible, + * because file cleanup will scan the range of known partitions looking for + * files. + */ Wonder if we shouldn't just create a directory for all such files. I'm a bit unhappy with the partition terminology around this. It's getting a bit confusing. We have partitions, participants and segments. Most of them could be understood as something entirely different from the meaning you have here... +static void +shared_buf_file_on_dsm_detach(dsm_segment *segment, Datum datum) +{ + bool unlink_files = false; + SharedBufFileSet *set = (SharedBufFileSet *) DatumGetPointer(datum); + + SpinLockAcquire(&set->mutex); + Assert(set->refcount > 0); + if (--set->refcount == 0) + unlink_files = true; + SpinLockRelease(&set->mutex); I'm a bit uncomfortable with releasing a refcount, and then still using the memory from the set... 
I don't think there's a concrete danger here as the code stands, but it's a fairly dangerous pattern. - Andres
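One way to keep that pattern safe is to make the decision under the lock and then touch nothing in the shared set afterwards. A hypothetical sketch, with a pthread mutex standing in for the patch's spinlock (names are illustrative, not the patch's):

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-in for the shared SharedBufFileSet header. */
typedef struct RefCountedSet
{
	pthread_mutex_t mutex;		/* stands in for the spinlock */
	int			refcount;
} RefCountedSet;

/*
 * Returns true if the caller was the last attached participant and is
 * therefore responsible for unlinking the files.  Anything the caller
 * needs for that cleanup should be read before, or under, the lock:
 * once the last participant has detached, the memory backing the set
 * must be assumed gone.
 */
static bool
detach_last(RefCountedSet *set)
{
	bool		unlink_files;

	pthread_mutex_lock(&set->mutex);
	unlink_files = (--set->refcount == 0);
	pthread_mutex_unlock(&set->mutex);
	return unlink_files;
}
```

The design point is that the boolean is the only thing that crosses the lock boundary; the cleanup itself then operates only on backend-local state (paths, tags) copied out earlier.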
On 2017-03-23 20:35:09 +1300, Thomas Munro wrote: > Here is a new patch series responding to feedback from Peter and Andres: + +/* Per-participant shared state. */ +typedef struct SharedTuplestoreParticipant +{ + LWLock lock; Hm. No padding (ala LWLockMinimallyPadded / LWLockPadded) - but that's probably ok, for now. + bool error; /* Error occurred flag. */ + bool eof; /* End of file reached. */ + int read_fileno; /* BufFile segment file number. */ + off_t read_offset; /* Offset within segment file. */ Hm. I wonder if it'd not be better to work with 64bit offsets, and just separate that out upon segment access. +/* The main data structure in shared memory. */ "main data structure" isn't particularly meaningful. +struct SharedTuplestore +{ + int reading_partition; + int nparticipants; + int flags; Maybe add a comment saying /* flag bits from SHARED_TUPLESTORE_* */? + Size meta_data_size; What's this? + SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]; I'd add a comment here, that there's further data after participants. +}; + +/* Per-participant backend-private state. */ +struct SharedTuplestoreAccessor +{ Hm. The name and it being backend-local are a bit conflicting. + int participant; /* My participant number. */ + SharedTuplestore *sts; /* The shared state. */ + int nfiles; /* Size of local files array. */ + BufFile **files; /* Files we have open locally for writing. */ Shouldn't this mention that it's indexed by partition? + BufFile *read_file; /* The current file to read from. */ + int read_partition; /* The current partition to read from. */ + int read_participant; /* The current participant to read from. */ + int read_fileno; /* BufFile segment file number. */ + off_t read_offset; /* Offset within segment file. */ +}; +/* + * Initialize a SharedTuplestore in existing shared memory. There must be + * space for sts_size(participants) bytes. 
If flags is set to the value + * SHARED_TUPLESTORE_SINGLE_PASS then each partition may only be read once, + * because underlying files will be deleted. Any reason not to use flags that are compatible with tuplestore.c? + * Tuples that are stored may optionally carry a piece of fixed sized + * meta-data which will be retrieved along with the tuple. This is useful for + * the hash codes used for multi-batch hash joins, but could have other + * applications. + */ +SharedTuplestoreAccessor * +sts_initialize(SharedTuplestore *sts, int participants, + int my_participant_number, + Size meta_data_size, + int flags, + dsm_segment *segment) +{ Not sure I like that the naming here has little in common with tuplestore.h's api. + +MinimalTuple +sts_gettuple(SharedTuplestoreAccessor *accessor, void *meta_data) +{ This needs docs. + SharedBufFileSet *fileset = GetSharedBufFileSet(accessor->sts); + MinimalTuple tuple = NULL; + + for (;;) + { ... + /* Check if this participant's file has already been entirely read. */ + if (participant->eof) + { + BufFileClose(accessor->read_file); + accessor->read_file = NULL; + LWLockRelease(&participant->lock); + continue; Why are we closing the file while holding the lock? + + /* Read the optional meta-data. */ + eof = false; + if (accessor->sts->meta_data_size > 0) + { + nread = BufFileRead(accessor->read_file, meta_data, + accessor->sts->meta_data_size); + if (nread == 0) + eof = true; + else if (nread != accessor->sts->meta_data_size) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from temporary file: %m"))); + } + + /* Read the size. */ + if (!eof) + { + nread = BufFileRead(accessor->read_file, &tuple_size, sizeof(tuple_size)); + if (nread == 0) + eof = true; Why is it legal to have EOF here, if metadata previously didn't have an EOF? Perhaps add an error if accessor->sts->meta_data_size != 0? 
+ if (eof) + { + participant->eof = true; + if ((accessor->sts->flags & SHARED_TUPLESTORE_SINGLE_PASS) != 0) + SharedBufFileDestroy(fileset, accessor->read_partition, + accessor->read_participant); + + participant->error = false; + LWLockRelease(&participant->lock); + + /* Move to next participant's file. */ + BufFileClose(accessor->read_file); + accessor->read_file = NULL; + continue; + } + + /* Read the tuple. */ + tuple = (MinimalTuple) palloc(tuple_size); + tuple->t_len = tuple_size; Hm. Constantly re-allocing this doesn't strike me as a good idea (not to mention that the API doesn't mention this is newly allocated). Seems like it'd be a better idea to have a per-accessor buffer where this can be stored in - increased in size when necessary. - Andres
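The per-accessor buffer suggested here could look something like the following sketch (hypothetical names; malloc/realloc stand in for palloc/repalloc, and error handling is omitted for brevity):

```c
#include <stdlib.h>

/*
 * Hypothetical per-accessor tuple buffer, reused across sts_gettuple()
 * calls instead of allocating a fresh tuple every time.
 */
typedef struct TupleBuffer
{
	char	   *data;
	size_t		capacity;
} TupleBuffer;

/*
 * Ensure the buffer can hold at least 'needed' bytes, growing
 * geometrically so repeated calls amortize to O(1) work per byte.
 * Returns a pointer to the (possibly moved) buffer.
 */
static char *
tuple_buffer_reserve(TupleBuffer *buf, size_t needed)
{
	if (buf->capacity < needed)
	{
		size_t		newcap = (buf->capacity > 0) ? buf->capacity : 64;

		while (newcap < needed)
			newcap *= 2;
		buf->data = realloc(buf->data, newcap);
		buf->capacity = newcap;
	}
	return buf->data;
}
```

In the accessor, sts_gettuple() would call something like `tuple_buffer_reserve(&accessor->buf, tuple_size)` and read into that, with the documented caveat that the returned tuple is only valid until the next call.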
On Mon, Mar 27, 2017 at 9:41 AM, Andres Freund <andres@anarazel.de> wrote: > Hi, > > > SharedBufFile allows temporary files to be created by one backend and > then exported for read-only access by other backends, with clean-up > managed by reference counting associated with a DSM segment. This includes > changes to fd.c and buffile.c to support new kinds of temporary file. > > > diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c > index 4ca0ea4..a509c05 100644 > --- a/src/backend/storage/file/buffile.c > +++ b/src/backend/storage/file/buffile.c > > I think the new facilities should be explained in the file's header. Will do. > @@ -68,9 +71,10 @@ struct BufFile > * avoid making redundant FileSeek calls. > */ > > - bool isTemp; /* can only add files if this is TRUE */ > + bool isSegmented; /* can only add files if this is TRUE */ > > That's a bit of a weird and uncommented upon change. I was trying to cut down on the number of places we use the word 'temporary' to activate various different behaviours. In this case, the only thing it controls is whether the BufFile is backed by one single fd.c File or many segments, so I figured it should be renamed. As Peter and you have pointed out, there may be a case for removing it altogether. > @@ -79,6 +83,8 @@ struct BufFile > */ > ResourceOwner resowner; > > + BufFileTag tag; /* for discoverability between backends */ > > Not perfectly happy with the name tag here, the name is a bit too > similar to BufferTag - something quite different. Yeah, will rename. 
> +static void > +make_tagged_path(char *tempdirpath, char *tempfilepath, > + const BufFileTag *tag, int segment) > +{ > + if (tag->tablespace == DEFAULTTABLESPACE_OID || > + tag->tablespace == GLOBALTABLESPACE_OID) > + snprintf(tempdirpath, MAXPGPATH, "base/%s", PG_TEMP_FILES_DIR); > + else > + { > + snprintf(tempdirpath, MAXPGPATH, "pg_tblspc/%u/%s/%s", > + tag->tablespace, TABLESPACE_VERSION_DIRECTORY, > + PG_TEMP_FILES_DIR); > + } > + > + snprintf(tempfilepath, MAXPGPATH, "%s/%s%d.%d.%d.%d.%d", tempdirpath, > + PG_TEMP_FILE_PREFIX, > + tag->creator_pid, tag->set, tag->partition, tag->participant, > + segment); > > Is there a risk that this ends up running afoul of filename length > limits on some platforms? Hmm. I didn't think so. Do we have a project guideline on maximum path lengths based on some kind of survey? There are various limits involved (filesystem and OS per-path-component limits, total limits, and the confusing PATH_MAX, MAX_PATH etc macros), but I was under the impression that these numbers were always at least 255. This scheme seems capable of producing ~50 bytes in the final component (admittedly more if int is 64 bits), and then nowhere near enough to reach a limit of that order in the earlier components. > +} > + > +static File > +make_tagged_segment(const BufFileTag *tag, int segment) > +{ > + File file; > + char tempdirpath[MAXPGPATH]; > + char tempfilepath[MAXPGPATH]; > + > + /* > + * There is a remote chance that disk files with this (pid, set) pair > + * already exists after a crash-restart. Since the presence of > + * consecutively numbered segment files is used by BufFileOpenShared to > + * determine the total size of a shared BufFile, we'll defend against > + * confusion by unlinking segment 1 (if it exists) before creating segment > + * 0. > + */ > > Gah. Why on earth aren't we removing temp files when restarting, not > just on the initial start? That seems completely wrong? See the comment above RemovePgTempFiles in fd.c. 
From comments on this list I understand that this is a subject that Robert and Tom don't agree on. I don't mind either way, but as long as RemovePgTempFiles works that way and my patch uses the existence of files to know how many files there are, I have to defend against that danger by making sure that I don't accidentally identify files from before a crash/restart as active. > If we do decide not to change this: Why is that sufficient? Doesn't the > same problem exist for segments later than the first? It does exist and it is handled. The comment really should say "unlinking segment N + 1 (if it exists) before creating segment N". Will update. > +/* > + * Open a file that was previously created in another backend with > + * BufFileCreateShared. > + */ > +BufFile * > +BufFileOpenTagged(const BufFileTag *tag) > +{ > + BufFile *file = (BufFile *) palloc(sizeof(BufFile)); > + char tempdirpath[MAXPGPATH]; > + char tempfilepath[MAXPGPATH]; > + Size capacity = 1024; > + File *files = palloc(sizeof(File) * capacity); > + int nfiles = 0; > + > + /* > + * We don't know how many segments there are, so we'll probe the > + * filesystem to find out. > + */ > + for (;;) > + { > + /* See if we need to expand our file space. */ > + if (nfiles + 1 > capacity) > + { > + capacity *= 2; > + files = repalloc(files, sizeof(File) * capacity); > + } > + /* Try to load a segment. */ > + make_tagged_path(tempdirpath, tempfilepath, tag, nfiles); > + files[nfiles] = PathNameOpenTemporaryFile(tempfilepath); > + if (files[nfiles] <= 0) > + break; > > Isn't 0 a theoretically valid return value from > PathNameOpenTemporaryFile? I was confused by that too, because it isn't the way normal OS fds work. But existing code dealing with Postgres vfd return values treats 0 as an error. See for example OpenTemporaryFile and OpenTemporaryFileInTablespace. > +/* > + * Delete a BufFile that was created by BufFileCreateTagged. 
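Returning briefly to the filename-length question above: the claim of roughly 50 bytes in the final path component can be checked mechanically. A standalone sketch, assuming the standard "pgsql_tmp" prefix and non-negative 32-bit values (negative values would add one byte per component):

```c
#include <limits.h>
#include <stdio.h>
#include <string.h>

/*
 * Compute the worst-case length of the final path component produced by
 * the "%s%d.%d.%d.%d.%d" naming scheme discussed above: prefix, then
 * creator_pid, set, partition, participant and segment, dot-separated.
 */
static size_t
worst_case_component_len(void)
{
	char	buf[256];

	snprintf(buf, sizeof(buf), "%s%d.%d.%d.%d.%d", "pgsql_tmp",
			 INT_MAX, INT_MAX, INT_MAX, INT_MAX, INT_MAX);
	return strlen(buf);
}
```

That works out to 9 prefix bytes + five 10-digit numbers + four dots = 63 bytes, comfortably under the 255-byte per-component limit common to mainstream filesystems.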
Return true if > + * at least one segment was deleted; false indicates that no segment was > + * found, or an error occurred while trying to delete. Errors are logged but > + * the function returns normally because this is assumed to run in a clean-up > + * path that might already involve an error. > + */ > +bool > +BufFileDeleteTagged(const BufFileTag *tag) > +{ > + char tempdirpath[MAXPGPATH]; > + char tempfilepath[MAXPGPATH]; > + int segment = 0; > + bool found = false; > + > + /* > + * We don't know if the BufFile really exists, because SharedBufFile > + * tracks only the range of file numbers. If it does exist, we don't > + * know how many 1GB segments it has, so we'll delete until we hit ENOENT or > + * an IO error. > + */ > + for (;;) > + { > + make_tagged_path(tempdirpath, tempfilepath, tag, segment); > + if (!PathNameDeleteTemporaryFile(tempfilepath, false)) > + break; > + found = true; > + ++segment; > + } > + > + return found; > +} > > If we crash in the middle of this, we'll leave the later files abandoned, > no? Yes. In general, there are places we can crash or unplug the server etc and leave files behind. In that case, RemovePgTempFiles cleans up (or declines to do so deliberately to support debugging, as discussed). > +/* > + * BufFileSetReadOnly --- flush and make read-only, in preparation for sharing > + */ > +void > +BufFileSetReadOnly(BufFile *file) > +{ > + BufFileFlush(file); > + file->readOnly = true; > +} > > That flag is unused, right? It's used for an assertion in BufFileWrite. Maybe could be elog(ERROR, ...) instead, but either way it's a debugging aid to report misuse. > + * PathNameCreateTemporaryFile, PathNameOpenTemporaryFile and > + * PathNameDeleteTemporaryFile are used for temporary files that may be shared > + * between backends.
A File created or opened with these functions is not > + * automatically deleted when the file is closed, but it is automatically > + * closed at end of transaction and counts against the temporary file limit of > + * the backend that created it. Any File created this way must be explicitly > + * deleted with PathNameDeleteTemporaryFile. Automatic file deletion is not > + * provided because this interface is designed for use by buffile.c and > + * indirectly by sharedbuffile.c to implement temporary files with shared > + * ownership and cleanup. > > Hm. Those names are pretty easy to misunderstand, no? s/Temp/Shared/? Hmm. Yeah these may be better. Will think about that. > /* > + * Called whenever a temporary file is deleted to report its size. > + */ > +static void > +ReportTemporaryFileUsage(const char *path, off_t size) > +{ > + pgstat_report_tempfile(size); > + > + if (log_temp_files >= 0) > + { > + if ((size / 1024) >= log_temp_files) > + ereport(LOG, > + (errmsg("temporary file: path \"%s\", size %lu", > + path, (unsigned long) size))); > + } > +} > > Man, the code for this sucks (not your fault). Shouldn't this properly > be at the buffile.c level, where we could implement limits above 1GB > properly? +1 > +/* > + * Open a file that was created with PathNameCreateTemporaryFile in another > + * backend. Files opened this way don't count against the temp_file_limit of > + * the caller, are read-only and are automatically closed at the end of the > + * transaction but are not deleted on close. > + */ > > This really reinforces my issues with the naming scheme. This ain't a > normal tempfile. It sort of makes sense if you consider that a 'named' temporary file is different... but yeah, point taken. > +File > +PathNameOpenTemporaryFile(char *tempfilepath) > +{ > + File file; > + > + /* > + * Open the file. Note: we don't use O_EXCL, in case there is an orphaned > + * temp file that can be reused.
> + */ > + file = PathNameOpenFile(tempfilepath, O_RDONLY | PG_BINARY, 0); > > If so, wouldn't we need to truncate the file? Yes, this lacks O_TRUNC. Thanks. > + * A single SharedBufFileSet can manage any number of 'tagged' BufFiles that > + * are shared between a fixed number of participating backends. Each shared > + * BufFile can be written to by a single participant but can be read by any > + * backend after it has been 'exported'. Once a given BufFile is exported, it > + * becomes read-only and cannot be extended. To create a new shared BufFile, > + * a participant needs its own distinct participant number, and needs to > + * specify an arbitrary partition number for the file. To make it available > + * to other backends, it must be explicitly exported, which flushes internal > + * buffers and renders it read-only. To open a file that has been shared, a > + * backend needs to know the number of the participant that created the file, > + * and the partition number. It is the responsibility of calling code to ensure > + * that files are not accessed before they have been shared. > > Hm. One way to make this safer would be to rename files when exporting. > Should be sufficient to do this to the first segment, I guess. Interesting idea. Will think about that. That comment isn't great and repeats itself. Will improve. > + * Each file is identified by a partition number and a participant number, so > + * that a SharedBufFileSet can be viewed as a 2D table of individual files. > > I think using "files" as a term here is a bit dangerous - they're > individually segmented again, right? True. It's a 2D matrix of BufFiles. The word "file" is super overloaded here. Will fix. > +/* > + * The number of bytes of shared memory required to construct a > + * SharedBufFileSet.
> + */ > +Size > +SharedBufFileSetSize(int participants) > +{ > + return offsetof(SharedBufFileSet, participants) + > + sizeof(SharedBufFileParticipant) * participants; > +} > > The function name sounds a bit like a function actuallize setting some > size... s/Size/DetermineSize/? Hmm yeah "set" as verb vs "set" as noun. I think "estimate" is the established word for this sort of thing (even though that seems strange because it sounds like it doesn't have to be exactly right: clearly in all these shmem-space-reservation functions it has to be exactly right). Will change. > > +/* > + * Create a new file suitable for sharing. Each backend that calls this must > + * use a distinct participant number. Behavior is undefined if a participant > + * calls this more than once for the same partition number. Partitions should > + * ideally be numbered consecutively or in as small a range as possible, > + * because file cleanup will scan the range of known partitions looking for > + * files. > + */ > > Wonder if we shouldn't just create a directory for all such files. Hmm. Yes, that could work well. Will try that. > I'm a bit unhappy with the partition terminology around this. It's > getting a bit confusing. We have partitions, participants and > segements. Most of them could be understood for something entirely > different than the meaning you have here... Ok. Let me try to explain and defend them and see if we can come up with something better. 1. Segments are what buffile.c already calls the individual capped-at-1GB files that it manages. They are an implementation detail that is not part of buffile.c's user interface. There seems to be no reason to change that. My understanding is that this was done to support pre-large-file filesystems/OSs which limited files to 2^31 or 2^32 bytes, and we decided to cap the segments at 1GB (maybe some ancient OS had a 2^30 limit, or maybe it was just a conservative choice that's easy for humans to think about). 
We could perhaps get rid of that entirely today without anyone complaining and just use one big file, though I don't know that and I'm not suggesting it. (One argument against that is that the parallel CREATE INDEX patch actually makes use of the segmented nature of BufFiles to splice them together, to 'unify' a bunch of worker LogicalTapeSets to create one LogicalTapeSet. That's off topic here but it's in the back of my mind as a potential client of this code. I'll have more to say about that over on the parallel CREATE INDEX thread shortly.) 2. Partitions here = 'batches'. The 'batches' used by the hash join code are formally partitions in all the literature on hash joins, and I bet that anyone else doing parallel work that involves sharing temporary disk files will run into the need for partitioning. I think you are complaining that we now have a database object called a PARTITION, and that may be a problem because we're overloading the term. But it's the same name because it's mathematically the same thing. We don't complain about the existence of 'lock tables' or 'hash tables' just because there is a database object called a TABLE. I considered other names for this, like "file number", but it was confusing and vague. I keep coming back to "partition" for this, because fundamentally this is for partitioning temporary data. I could maybe call it "file_partition" or something? 3. Participants are what I have taken to calling the processes involved in parallel query, when I mean the larger set that includes workers + leader. It may seem a little odd that such a thing appears in an API that deals with temporary files. But the basic idea here is that each participant gets to write out its own partial results, for each partition. Stepping back a bit, that means that there are two kinds of partitioning going on at the same time. Partitioning the keyspace into batch numbers, and then the arbitrary partitioning that comes from each participant processing partial plans.
This is how SharedBufFileSet finishes up managing a 2D matrix of BufFiles. You might argue that buffile.c shouldn't know about partitions and participants. The only thing I really need here is for BufFileTag (to be renamed) to be able to name things differently. Perhaps it should just include a char[] buffer for a name fragment, and the SharedBufFileSet should encode the partition and participant numbers into it, rather than exposing these rather higher level concepts to buffile.c. I will think about that. (Perhaps SharedBufFileSet should be called PartitionedBufFileSet?) > +static void > +shared_buf_file_on_dsm_detach(dsm_segment *segment, Datum datum) > +{ > + bool unlink_files = false; > + SharedBufFileSet *set = (SharedBufFileSet *) DatumGetPointer(datum); > + > + SpinLockAcquire(&set->mutex); > + Assert(set->refcount > 0); > + if (--set->refcount == 0) > + unlink_files = true; > + SpinLockRelease(&set->mutex); > > I'm a bit uncomfortable with releasing a refcount, and then still using > the memory from the set... I don't think there's a concrete danger > here as the code stands, but it's a fairly dangerous pattern. Will fix. -- Thomas Munro http://www.enterprisedb.com
On Mon, Mar 27, 2017 at 11:03 AM, Thomas Munro <thomas.munro@enterprisedb.com> >> Is there a risk that this ends up running afoul of filename length >> limits on some platforms? > > Hmm. I didn't think so. Do we have a project guideline on maximum > path lengths based on some kind of survey? There are various limits > involved (filesystem and OS per-path-component limits, total limits, > and the confusing PATH_MAX, MAX_PATH etc macros), but I was under the > impression that these numbers were always at least 255. This scheme > seems capable of producing ~50 bytes in the final component > (admittedly more if int is 64 bits), and then nowhere near enough to > reach a limit of that order in the earlier components. Err, plus prefix. Still seems unlikely to be too long. >> I'm a bit unhappy with the partition terminology around this. It's >> getting a bit confusing. We have partitions, participants and >> segements. Most of them could be understood for something entirely >> different than the meaning you have here... > > Ok. Let me try to explain and defend them and see if we can come up > with something better. > > 1. Segments are what buffile.c already calls the individual > capped-at-1GB files that it manages. They are an implementation > detail that is not part of buffile.c's user interface. There seems to > be no reason to change that. After reading your next email I realised this is not quite true: BufFileTell and BufFileSeek expose the existence of segments. -- Thomas Munro http://www.enterprisedb.com
On Sun, Mar 26, 2017 at 3:41 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: >> 1. Segments are what buffile.c already calls the individual >> capped-at-1GB files that it manages. They are an implementation >> detail that is not part of buffile.c's user interface. There seems to >> be no reason to change that. > > After reading your next email I realised this is not quite true: > BufFileTell and BufFileSeek expose the existence of segments. Yeah, that's something that tuplestore.c itself relies on. I always thought that the main practical reason why we have BufFile multiplex 1GB segments concerns use of temp_tablespaces, rather than considerations that matter only when using obsolete file systems:

/*
 * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE.
 * The reason is that we'd like large temporary BufFiles to be spread across
 * multiple tablespaces when available.
 */

Now, I tend to think that most installations that care about performance would be better off using RAID to stripe their one temp tablespace file system. But, I suppose this still makes sense when you have a number of file systems that happen to be available, and disk capacity is the main concern. PHJ uses one temp tablespace per worker, which I further suppose might not be as effective in balancing disk space usage. -- Peter Geoghegan
On Mon, Mar 27, 2017 at 12:12 PM, Peter Geoghegan <pg@bowt.ie> wrote: > On Sun, Mar 26, 2017 at 3:41 PM, Thomas Munro > <thomas.munro@enterprisedb.com> wrote: >>> 1. Segments are what buffile.c already calls the individual >>> capped-at-1GB files that it manages. They are an implementation >>> detail that is not part of buffile.c's user interface. There seems to >>> be no reason to change that. >> >> After reading your next email I realised this is not quite true: >> BufFileTell and BufFileSeek expose the existence of segments. > > Yeah, that's something that tuplestore.c itself relies on. > > I always thought that the main reason practical why we have BufFile > multiplex 1GB segments concerns use of temp_tablespaces, rather than > considerations that matter only when using obsolete file systems: > > /* > * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE. > * The reason is that we'd like large temporary BufFiles to be spread across > * multiple tablespaces when available. > */ > > Now, I tend to think that most installations that care about > performance would be better off using RAID to stripe their one temp > tablespace file system. But, I suppose this still makes sense when you > have a number of file systems that happen to be available, and disk > capacity is the main concern. PHJ uses one temp tablespace per worker, > which I further suppose might not be as effective in balancing disk > space usage. I was thinking about IO bandwidth balance rather than size. If you rotate through tablespaces segment-by-segment, won't you be exposed to phasing effects that could leave disk arrays idle for periods of time? Whereas if you assign them to participants, you can only get idle arrays if you have fewer participants than tablespaces. This seems like a fairly complex subtopic and I don't have a strong view on it. Clearly you could rotate through tablespaces on the basis of participant, partition, segment, some combination, or something else.
Doing it by participant seemed to me to be the least prone to IO imbalance caused by phasing effects (= segment based) or data distribution (= partition based), of the options I considered when I wrote it that way. Like you, I also tend to suspect that people would be more likely to use RAID type technologies to stripe things like this for both bandwidth and space reasons these days. Tablespaces seem to make more sense as a way of separating different classes of storage (fast/expensive, slow/cheap etc), not as an IO or space striping technique. I may be way off base there though... -- Thomas Munro http://www.enterprisedb.com
On Sun, Mar 26, 2017 at 6:50 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > Like you, I also tend to suspect that people would be more likely to > use RAID type technologies to stripe things like this for both > bandwidth and space reasons these days. Tablespaces seem to make more > sense as a way of separating different classes of storage > (fast/expensive, slow/cheap etc), not as an IO or space striping > technique. I agree. -- Peter Geoghegan
On Sun, Mar 26, 2017 at 3:56 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > But... what you said above must be a problem for Windows. I believe > it doesn't allow files to be unlinked if they are open, and I see that > DSM segments are cleaned up in resowner's phase == > RESOURCE_RELEASE_BEFORE_LOCKS and files are closed in phase == > RESOURCE_RELEASE_AFTER_LOCKS. I thought this last point about Windows might be fatal to my design, but it seems that Windows since at least version 2000 has support for Unixoid unlinkability via the special flag FILE_SHARE_DELETE. -- Thomas Munro http://www.enterprisedb.com
On 2017-03-23 20:35:09 +1300, Thomas Munro wrote: > Here is a new patch series responding to feedback from Peter and Andres: Here's a review of 0007 & 0010 together - they're going to have to be applied together anyway... diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index ac339fb566..775c9126c7 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3814,6 +3814,21 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class=" </listitem> </varlistentry> + <varlistentry id="guc-cpu-shared-tuple-cost" xreflabel="cpu_shared_tuple_cost"> + <term><varname>cpu_shared_tuple_cost</varname> (<type>floating point</type>) + <indexterm> + <primary><varname>cpu_shared_tuple_cost</> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Sets the planner's estimate of the cost of sharing rows in + memory during a parallel query. + The default is 0.001. + </para> + </listitem> + </varlistentry> + Isn't that really low in comparison to the other costs? I think specifying a bit more what this actually measures would be good too - is it putting the tuple in shared memory? Is it accessing it? + <varlistentry id="guc-cpu-synchronization-cost" xreflabel="cpu_synchronization_cost"> + <term><varname>cpu_synchronization_cost</varname> (<type>floating point</type>) + <indexterm> + <primary><varname>cpu_synchronization_cost</> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Sets the planner's estimate of the cost of waiting at synchronization + points for other processes while executing parallel queries. + The default is 1.0. + </para> + </listitem> + </varlistentry> Isn't this also really cheap in comparison to a, probably cached, seq page read? + if (HashJoinTableIsShared(hashtable)) + { + /* + * Synchronize parallel hash table builds. 
At this stage we know that + * the shared hash table has been created, but we don't know if our + * peers are still in MultiExecHash and if so how far through. We use + * the phase to synchronize with them. + */ + barrier = &hashtable->shared->barrier; + + switch (BarrierPhase(barrier)) + { + case PHJ_PHASE_BEGINNING: Note pgindent will indent this further. Might be worthwhile to try to pgindent the file, revert some of the unintended damage. /* * set expression context */ I'd still like this to be moved to the start. @@ -126,17 +202,79 @@ MultiExecHash(HashState *node) /* Not subject to skew optimization, so insert normally*/ ExecHashTableInsert(hashtable, slot, hashvalue); } - hashtable->totalTuples += 1; + hashtable->partialTuples += 1; + if (!HashJoinTableIsShared(hashtable)) + hashtable->totalTuples += 1; } } FWIW, I'd put HashJoinTableIsShared() into a local var - the compiler won't be able to do that on its own because external function calls could invalidate the result. That brings me to a related topic: Have you measured whether your changes cause performance differences? + finish_loading(hashtable); I find the sudden switch to a different naming scheme in the same file a bit jarring. + if (HashJoinTableIsShared(hashtable)) + BarrierDetach(&hashtable->shared->shrink_barrier); + + if (HashJoinTableIsShared(hashtable)) + { Consecutive if blocks with the same condition... + bool elected_to_resize; + + /* + * Wait for all backends to finish building. If only one worker is + * running the building phase because of a non-partial inner plan, the + * other workers will pile up here waiting. If multiple worker are + * building, they should finish close to each other in time. + */ That comment is outdated, isn't it? 
/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ - if (hashtable->nbuckets != hashtable->nbuckets_optimal) - ExecHashIncreaseNumBuckets(hashtable); + ExecHashUpdate(hashtable); + ExecHashIncreaseNumBuckets(hashtable); So this now doesn't actually increase the number of buckets anymore. + reinsert: + /* If the table was resized, insert tuples into the new buckets. */ + ExecHashUpdate(hashtable); + ExecHashReinsertAll(hashtable); ReinsertAll just happens to do nothing if we didn't have to resize... Not entirely obvious, sure reads as if it were unconditional. Also, it's not actually "All" when batching is in use, no? + post_resize: + if (HashJoinTableIsShared(hashtable)) + { + Assert(BarrierPhase(barrier) == PHJ_PHASE_RESIZING); + BarrierWait(barrier, WAIT_EVENT_HASH_RESIZING); + Assert(BarrierPhase(barrier) == PHJ_PHASE_REINSERTING); + } + + reinsert: + /* If the table was resized, insert tuples into the new buckets. */ + ExecHashUpdate(hashtable); + ExecHashReinsertAll(hashtable); Hm. So even non-resizing backends reach this - but they happen to not do anything because there's no work queued up, right? That's, uh, not obvious. For me the code here would be a good bit easier to read if we had a MultiExecHash and MultiExecParallelHash. Half of MultiExecHash is just if(IsShared) blocks, and copying would avoid potential slowdowns. + /* + * Set up for skew optimization, if possible and there's a need for + * more than one batch. (In a one-batch join, there's no point in + * it.) + */ + if (nbatch > 1) + ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); So there's no equivalent to the skew optimization for parallel query yet... It doesn't sound like that should be particulalry hard on first blush? static void -ExecHashIncreaseNumBatches(HashJoinTable hashtable) +ExecHashIncreaseNumBatches(HashJoinTable hashtable, int nbatch) So this doesn't actually increase the number of batches anymore... 
At the very least this should mention that the main work is done in ExecHashShrink. +/* + * Process the queue of chunks whose tuples need to be redistributed into the + * correct batches until it is empty. In the best case this will shrink the + * hash table, keeping about half of the tuples in memory and sending the rest + * to a future batch. + */ +static void +ExecHashShrink(HashJoinTable hashtable) Should mention this really only is meaningful after ExecHashIncreaseNumBatches has run. +{ + long ninmemory; + long nfreed; + dsa_pointer chunk_shared; + HashMemoryChunk chunk; - /* If know we need to resize nbuckets, we can do it while rebatching. */ - if (hashtable->nbuckets_optimal != hashtable->nbuckets) + if (HashJoinTableIsShared(hashtable)) { - /* we never decrease the number of buckets */ - Assert(hashtable->nbuckets_optimal > hashtable->nbuckets); + /* + * Since a newly launched participant could arrive while shrinking is + * already underway, we need to be able to jump to the correct place + * in this function. + */ + switch (PHJ_SHRINK_PHASE(BarrierPhase(&hashtable->shared->shrink_barrier))) + { + case PHJ_SHRINK_PHASE_BEGINNING: /* likely case */ + break; + case PHJ_SHRINK_PHASE_CLEARING: + goto clearing; + case PHJ_SHRINK_PHASE_WORKING: + goto working; + case PHJ_SHRINK_PHASE_DECIDING: + goto deciding; + } Hm, so we jump into different nesting levels here :/ ok, ENOTIME for today... 
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index f2c885afbe..87d8f3766e 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -6,10 +6,78 @@ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * * IDENTIFICATION * src/backend/executor/nodeHashjoin.c * + * NOTES: + * + * PARALLELISM + * + * Hash joins can participate in parallel queries in two ways: in + * non-parallel-aware mode, where each backend builds an identical hash table + * and then probes it with a partial outer relation, or parallel-aware mode + * where there is a shared hash table that all participants help to build. A + * parallel-aware hash join can save time and space by dividing the work up + * and sharing the result, but has extra communication overheads. There's a third, right? The hashjoin, and everything below it, could also not be parallel, but above it could be some parallel aware node (e.g. a parallel aware HJ). + * In both cases, hash joins use a private state machine to track progress + * through the hash join algorithm. That's not really parallel specific, right? Perhaps just say that parallel HJs use the normal state machine? + * In a parallel-aware hash join, there is also a shared 'phase' which + * co-operating backends use to synchronize their local state machine and + * program counter with the multi-process join. The phase is managed by a + * 'barrier' IPC primitive. Hm. I wonder if 'phase' shouldn't just be named sharedHashJoinState. Might be a bit easier to understand than a different terminology.
+ * The phases are as follows: + * + * PHJ_PHASE_BEGINNING -- initial phase, before any participant acts + * PHJ_PHASE_CREATING -- one participant creates the shmem hash table + * PHJ_PHASE_BUILDING -- all participants build the hash table + * PHJ_PHASE_RESIZING -- one participant decides whether to expand buckets + * PHJ_PHASE_REINSERTING -- all participants reinsert tuples if necessary + * PHJ_PHASE_PROBING -- all participants probe the hash table + * PHJ_PHASE_UNMATCHED -- all participants scan for unmatched tuples I think somewhere here - and probably around the sites it's happening - should mention that state transitions are done kinda implicitly via BarrierWait progressing to the numerically next phase. That's not entirely obvious (and actually limits what the barrier mechanism can be used for...). - Andres
On Mon, Mar 27, 2017 at 12:20 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > > On Sun, Mar 26, 2017 at 3:56 PM, Thomas Munro > <thomas.munro@enterprisedb.com> wrote: > > But... what you said above must be a problem for Windows. I believe > > it doesn't allow files to be unlinked if they are open, and I see that > > DSM segments are cleaned up in resowner's phase == > > RESOURCE_RELEASE_BEFORE_LOCKS and files are closed in phase == > > RESOURCE_RELEASE_AFTER_LOCKS. > > I thought this last point about Windows might be fatal to my design, > but it seems that Windows since at least version 2000 has support for > Unixoid unlinkability via the special flag FILE_SHARE_DELETE. On testing v10 of this patch over commit b54aad8e34bd6299093e965c50f4a23da96d7cc3 and applying the tweak mentioned in [1], for TPC-H queries I found the results quite encouraging.

Experimental setup:
TPC-H scale factor - 20
work_mem = 1GB
shared_buffers = 10GB
effective_cache_size = 10GB
random_page_cost = seq_page_cost = 0.1
max_parallel_workers_per_gather = 4

Performance numbers: (Time in seconds)
Query | Head | Patch
------+------+------
Q3    |   73 |   37
Q5    |   56 |   31
Q7    |   40 |   30
Q8    |    8 |    8
Q9    |   85 |   42
Q10   |   86 |   46
Q14   |   11 |    6
Q16   |   32 |   11
Q21   |   53 |   56

Please find the attached file for the explain analyse output of these queries on head as well as patch. Would be working on analysing the performance of this patch on 300 scale factor. [1] https://www.postgresql.org/message-id/flat/CAEepm%3D270ze2hVxWkJw-5eKzc3AB4C9KpH3L2kih75R5pdSogg%40mail.gmail.com -- Regards, Rafia Sabih EnterpriseDB: http://www.enterprisedb.com/
Hi Thomas, On 3/28/17 1:41 AM, Rafia Sabih wrote: > On Mon, Mar 27, 2017 at 12:20 PM, Thomas Munro >> >> I thought this last point about Windows might be fatal to my design, >> but it seems that Windows since at least version 2000 has support for >> Unixoid unlinkability via the special flag FILE_SHARE_DELETE. <...> > > Please find the attached file for the explain analyse output of these > queries on head as well as patch. > Would be working on analysing the performance of this patch on 300 scale factor. I have marked this submission "Waiting for Author". A new patch is needed to address Andres' comments and you should have a look at Rafia's results. Thanks, -- -David david@pgmasters.net
Hi, On 2017-03-27 22:33:03 -0700, Andres Freund wrote: > On 2017-03-23 20:35:09 +1300, Thomas Munro wrote: > > Here is a new patch series responding to feedback from Peter and Andres: > > Here's a review of 0007 & 0010 together - they're going to have to be > applied together anyway... > ... > ok, ENOTIME for today... Continuing, where I dropped of tiredly yesterday. - ExecHashJoinSaveTuple(tuple, - hashvalue, - &hashtable->innerBatchFile[batchno]); + if (HashJoinTableIsShared(hashtable)) + sts_puttuple(hashtable->shared_inner_batches, batchno, &hashvalue, + tuple); + else + ExecHashJoinSaveTuple(tuple, + hashvalue, + &hashtable->innerBatchFile[batchno]); }} Why isn't this done inside of ExecHashJoinSaveTuple? @@ -1280,6 +1785,68 @@ ExecHashTableReset(HashJoinTable hashtable) + /* Rewind the shared read heads for this batch, inner and outer. */ + sts_prepare_parallel_read(hashtable->shared_inner_batches, + curbatch); + sts_prepare_parallel_read(hashtable->shared_outer_batches, + curbatch); It feels somewhat wrong to do this in here, rather than on the callsites. + } + + /* + * Each participant needs to make sure that data it has written for + * this partition is now read-only and visible to other participants. + */ + sts_end_write(hashtable->shared_inner_batches, curbatch); + sts_end_write(hashtable->shared_outer_batches, curbatch); + + /* + * Wait again, so that all workers see the new hash table and can + * safely read from batch files from any participant because they have + * all ended writing. + */ + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_RESETTING_BATCH(curbatch)); + BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASH_RESETTING); + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_LOADING_BATCH(curbatch)); + ExecHashUpdate(hashtable); + + /* Forget the current chunks. 
*/ + hashtable->current_chunk = NULL; + return; + } /* * Release all the hash buckets and tuples acquired in the prior pass, and @@ -1289,10 +1856,10 @@ ExecHashTableReset(HashJoinTable hashtable) oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); /* Reallocate and reinitialize the hash bucket headers. */ - hashtable->buckets = (HashJoinTuple *) - palloc0(nbuckets * sizeof(HashJoinTuple)); + hashtable->buckets = (HashJoinBucketHead *) + palloc0(nbuckets * sizeof(HashJoinBucketHead)); - hashtable->spaceUsed = nbuckets * sizeof(HashJoinTuple); + hashtable->spaceUsed = nbuckets * sizeof(HashJoinBucketHead); /* Cannot be more than our previous peak; we had this size before. */ Assert(hashtable->spaceUsed <= hashtable->spacePeak); @@ -1301,6 +1868,22 @@ ExecHashTableReset(HashJoinTable hashtable) /* Forget the chunks (the memory was freed by the context reset above). */ hashtable->chunks = NULL; + + /* Rewind the shared read heads for this batch, inner and outer. */ + if (hashtable->innerBatchFile[curbatch] != NULL) + { + if (BufFileSeek(hashtable->innerBatchFile[curbatch], 0, 0L, SEEK_SET)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rewind hash-join temporary file: %m"))); + } + if (hashtable->outerBatchFile[curbatch] != NULL) + { + if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rewind hash-join temporary file: %m"))); + } } /* @@ -1310,12 +1893,21 @@ ExecHashTableReset(HashJoinTable hashtable) void ExecHashTableResetMatchFlags(HashJoinTable hashtable) { + dsa_pointer chunk_shared = InvalidDsaPointer; HashMemoryChunk chunk; HashJoinTuple tuple; int i; /* Reset all flags in the main table ... */ - chunk = hashtable->chunks; + if (HashJoinTableIsShared(hashtable)) + { + /* This only runs in the leader during rescan initialization.
*/ + Assert(!IsParallelWorker()); + hashtable->shared->chunk_work_queue = hashtable->shared->chunks; + chunk = pop_chunk_queue(hashtable, &chunk_shared); + } + else + chunk = hashtable->chunks; Hm - doesn't pop_chunk_queue empty the work queue? +/* + * Load a tuple into shared dense storage, like 'load_private_tuple'. This + * version is for shared hash tables. + */ +static HashJoinTuple +load_shared_tuple(HashJoinTable hashtable, MinimalTuple tuple, + dsa_pointer *shared, bool respect_work_mem) +{ Hm. Are there issues with "blessed" records being stored in shared memory? I seem to recall you talking about it, but I see nothing addressing the issue here? (later) Ah, I see - you just prohibit paralleism in that case - might be worth pointing to. + /* Check if some other participant has increased nbatch. */ + if (hashtable->shared->nbatch > hashtable->nbatch) + { + Assert(respect_work_mem); + ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch); + } + + /* Check if we need to help shrinking. */ + if (hashtable->shared->shrink_needed && respect_work_mem) + { + hashtable->current_chunk = NULL; + LWLockRelease(&hashtable->shared->chunk_lock); + return NULL; + } + + /* Oversized tuples get their own chunk. */ + if (size > HASH_CHUNK_THRESHOLD) + chunk_size = size + HASH_CHUNK_HEADER_SIZE; + else + chunk_size = HASH_CHUNK_SIZE; + + /* If appropriate, check if work_mem would be exceeded by a new chunk. */ + if (respect_work_mem && + hashtable->shared->grow_enabled && + hashtable->shared->nbatch <= MAX_BATCHES_BEFORE_INCREASES_STOP && + (hashtable->shared->size + + chunk_size) > (work_mem * 1024L * + hashtable->shared->planned_participants)) + { + /* + * It would be exceeded. Let's increase the number of batches, so we + * can try to shrink the hash table. 
+ */ + hashtable->shared->nbatch *= 2; + ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch); + hashtable->shared->chunk_work_queue = hashtable->shared->chunks; + hashtable->shared->chunks = InvalidDsaPointer; + hashtable->shared->shrink_needed = true; + hashtable->current_chunk = NULL; + LWLockRelease(&hashtable->shared->chunk_lock); + + /* The caller needs to shrink the hash table. */ + return NULL; + } Hm - we could end up calling ExecHashIncreaseNumBatches twice here? Probably harmless. /* ---------------------------------------------------------------- * ExecHashJoin @@ -129,6 +200,14 @@ ExecHashJoin(HashJoinState *node) /* no chance to not build the hash table */ node->hj_FirstOuterTupleSlot = NULL; } + else if (hashNode->shared_table_data != NULL) + { + /* + * The empty-outer optimization is not implemented for + * shared hash tables yet. + */ + node->hj_FirstOuterTupleSlot = NULL; Hm, why is this checking for the shared-ness of the join in a different manner? + if (HashJoinTableIsShared(hashtable)) + { + /* + * An important optimization: if this is a + * single-batch join and not an outer join, there is + * no reason to synchronize again when we've finished + * probing. + */ + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_PROBING_BATCH(hashtable->curbatch)); + if (hashtable->nbatch == 1 && !HJ_FILL_INNER(node)) + return NULL; /* end of join */ + + /* + * Check if we are a leader that can't go further than + * probing the first batch, to avoid risk of deadlock + * against workers. + */ + if (!LeaderGateCanContinue(&hashtable->shared->leader_gate)) + { + /* + * Other backends will need to handle all future + * batches written by me. We don't detach until + * after we've finished writing to all batches so + * that they are flushed, otherwise another + * participant might try to read them too soon. 
+ */ + sts_end_write_all_partitions(hashNode->shared_inner_batches); + sts_end_write_all_partitions(hashNode->shared_outer_batches); + BarrierDetach(&hashtable->shared->barrier); + hashtable->detached_early = true; + return NULL; + } + + /* + * We can't start searching for unmatched tuples until + * all participants have finished probing, so we + * synchronize here. + */ + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_PROBING_BATCH(hashtable->curbatch)); + if (BarrierWait(&hashtable->shared->barrier, + WAIT_EVENT_HASHJOIN_PROBING)) + { + /* Serial phase: prepare for unmatched. */ + if (HJ_FILL_INNER(node)) + { + hashtable->shared->chunk_work_queue = + hashtable->shared->chunks; + hashtable->shared->chunks = InvalidDsaPointer; + } + } Couldn't we skip that if this isn't an outer join? Not sure if the complication would be worth it... +void +ExecShutdownHashJoin(HashJoinState *node) +{ + /* + * By the time ExecEndHashJoin runs in a work, shared memory has been s/work/worker/ + * destroyed. So this is our last chance to do any shared memory cleanup. + */ + if (node->hj_HashTable) + ExecHashTableDetach(node->hj_HashTable); +} + There is no extra charge + * for probing the hash table for outer path row, on the basis that + * read-only access to a shared hash table shouldn't be any more + * expensive. + */ Hm, that's debatable. !shared will mostly be on the local numa node, shared probably not. * Get hash table size that executor would use for inner relation. * + * Shared hash tables are allowed to use the work_mem of all participants + * combined to make up for the fact that there is only one copy shared by + * all. Hm. I don't quite understand that reasoning. * XXX for the moment, always assume that skew optimization will be * performed. As long as SKEW_WORK_MEM_PERCENTis small, it's not worth * trying to determine that for sure. If we don't do skew for parallelism, should we skip that bit? - Andres
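The space check Andres quotes above compares the shared table's current size against the combined budget of work_mem times the number of planned participants, and doubles nbatch when a new chunk would not fit, so that re-batching can shrink the table. A minimal standalone sketch of just that accounting, with made-up names (the real code also manages chunk queues, locks, and the shrink flag):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified view of the shared hash table's space state. */
typedef struct SpaceSketch
{
    size_t  size;                   /* bytes currently allocated */
    int     nbatch;                 /* current number of batches */
    int     planned_participants;   /* workers + leader */
} SpaceSketch;

/*
 * Try to account for a new chunk.  Returns true if the chunk fits within
 * the combined budget of work_mem * participants; otherwise doubles nbatch
 * and returns false, meaning the caller must shrink the table by sending
 * roughly half of the tuples to future batches.
 */
bool
space_sketch_reserve(SpaceSketch *t, size_t chunk_size, size_t work_mem_kb)
{
    size_t  budget = work_mem_kb * 1024 * (size_t) t->planned_participants;

    if (t->size + chunk_size > budget)
    {
        /* Exceeded: increase the batch count so the table can shrink. */
        t->nbatch *= 2;
        return false;
    }
    t->size += chunk_size;
    return true;
}
```

With work_mem = 1MB and four participants, the table may grow to 4MB before the first nbatch increase, which is the "hash table P times larger" behaviour described at the top of the thread.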
On Tue, Mar 28, 2017 at 11:11 AM, Rafia Sabih
<rafia.sabih@enterprisedb.com> wrote:
> On Mon, Mar 27, 2017 at 12:20 PM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>>
>> On Sun, Mar 26, 2017 at 3:56 PM, Thomas Munro
>> <thomas.munro@enterprisedb.com> wrote:
>> > But... what you said above must be a problem for Windows. I believe
>> > it doesn't allow files to be unlinked if they are open, and I see that
>> > DSM segments are cleaned up in resowner's phase ==
>> > RESOURCE_RELEASE_BEFORE_LOCKS and files are closed in phase ==
>> > RESOURCE_RELEASE_AFTER_LOCKS.
>>
>> I thought this last point about Windows might be fatal to my design,
>> but it seems that Windows since at least version 2000 has support for
>> Unixoid unlinkability via the special flag FILE_SHARE_DELETE.
>
> On testing v10 of this patch over commit
> b54aad8e34bd6299093e965c50f4a23da96d7cc3 and applying the tweak
> mentioned in [1], for TPC-H queries I found the results quite
> encouraging,
>
> Experimental setup:
> TPC-H scale factor - 20
> work_mem = 1GB
> shared_buffers = 10GB
> effective_cache_size = 10GB
> random_page_cost = seq_page_cost = 0.1
> max_parallel_workers_per_gather = 4
>
> Performance numbers:
> (Time in seconds)
> Query | Head | Patch |
> -------------------------------
> Q3    |  73  |  37   |
> Q5    |  56  |  31   |
> Q7    |  40  |  30   |
> Q8    |   8  |   8   |
> Q9    |  85  |  42   |
> Q10   |  86  |  46   |
> Q14   |  11  |   6   |
> Q16   |  32  |  11   |
> Q21   |  53  |  56   |
>
> Please find the attached file for the explain analyse output of these
> queries on head as well as patch.
> Would be working on analysing the performance of this patch on 300
> scale factor.
>
> [1] https://www.postgresql.org/message-id/flat/CAEepm%3D270ze2hVxWkJw-5eKzc3AB4C9KpH3L2kih75R5pdSogg%40mail.gmail.com
> --

Before moving to a higher scale I tried playing around with the effects
of work_mem on this patch and came across the following results. All
settings are kept as before, with the exception of work_mem, which is
set to 64MB. Most of the queries showed similar performance except a
few; details are as follows (all times are given in ms):

Query | Head   | Patch
------+--------+--------
Q8    | 8720   | 8839
Q18   | 370710 | 384347
Q21   | 53270  | 65189

Clearly, the regression in Q8 and Q18 is minor, but that in Q21 is
significant. Just to confirm, I have applied the tweak mentioned in [1]
as before. For the explain analyse output of Q21 on head and with
patch, please check the attached file.

[1] https://www.postgresql.org/message-id/flat/CAEepm%3D270ze2hVxWkJw-5eKzc3AB4C9KpH3L2kih75R5pdSogg%40mail.gmail.com

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/
Hi hackers,

Thanks very much to Rafia for testing, and to Andres for his copious
review feedback. Here's a new version. Changes:

1. Keep all the backing files that are part of a BufFileSet in
subdirectories, as suggested by Andres. Now, instead of that unpopular
logic for scanning ranges of possible file paths to delete, we can just
blow away whole directories that group sets of related files.

2. Don't expose 'participant' and 'partition' concepts, which Andres
didn't like much, in the BufFile API. There is a new concept 'stripe'
which client code of BufFileSet can use to specify the participant
number in a more general way without saying so: it's really just a way
to spread files over tablespaces. I'm not sure if tablespaces are really
used that way much, but it seemed like Peter wasn't going to be too
happy with a proposal that didn't do *something* to respect the existing
temp_tablespaces GUC behaviour (and he'd be right). But I didn't think
it would make any kind of sense at all to stripe by 1GB segments as
private BufFiles do when writing from multiple processes, as I have
argued elsewhere, hence this scheme. The 'qunique' function used here
(basically poor man's std::unique) is one I proposed earlier, with the
name suggested by Tom Lane: see
https://www.postgresql.org/message-id/flat/CAEepm%3D2vmFTNpAmwbGGD2WaryM6T3hSDVKQPfUwjdD_5XY6vAA%40mail.gmail.com .

3. Merged the single-batch and multi-batch patches into one. Earlier I
had the idea that it was easier to review them in layers, since I hoped
people might catch a glimpse of the central simplicity without being hit
by a wall of multi-batch logic, but since Andres is reviewing and
disagrees, I give you 0010-hj-parallel-v11.patch, which weighs in at 32
files changed, 2278 insertions(+), 250 deletions(-).

4. Moved the DSM handling to the very end of resowner.c's cleanup.
Peter pointed out that it would otherwise happen before fd.c Files are
closed.
He was concerned about a different aspect of that which I'm not sure I fully understand, but at the very least it seemed to represent a significant problem for this design on Windows. I discussed this briefly with Robert off-list and he told me that there is probably no good reason for the ordering that we have, and what's more, there may be good arguments even outside this case for DSM segments being cleaned up as late as possible, now that they contain shared control information and not just tuple data as once had been imagined. I can't think of any reason why this would not be safe. Can you? 5. The empty inner relation optimisation implemented. Some smaller changes and miles of feedback inline below: On Mon, Mar 27, 2017 at 11:03 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > On Mon, Mar 27, 2017 at 9:41 AM, Andres Freund <andres@anarazel.de> wrote: >> SharedBufFile allows temporary files to be created by one backend and >> then exported for read-only access by other backends, with clean-up >> managed by reference counting associated with a DSM segment. This includes >> changes to fd.c and buffile.c to support new kinds of temporary file. >> >> >> diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c >> index 4ca0ea4..a509c05 100644 >> --- a/src/backend/storage/file/buffile.c >> +++ b/src/backend/storage/file/buffile.c >> >> I think the new facilities should be explained in the file's header. > > Will do. Done. >> @@ -68,9 +71,10 @@ struct BufFile >> * avoid making redundant FileSeek calls. >> */ >> >> - bool isTemp; /* can only add files if this is TRUE */ >> + bool isSegmented; /* can only add files if this is TRUE */ >> >> That's a bit of a weird and uncommented upon change. > > I was trying to cut down on the number of places we use the word > 'temporary' to activate various different behaviours. 
In this case, > the only thing it controls is whether the BufFile is backed by one > single fd.c File or many segments, so I figured it should be renamed. > > As Peter and you have pointed out, there may be a case for removing it > altogether. Done in 0007-hj-remove-buf-file-is-temp-v11.patch. >> @@ -79,6 +83,8 @@ struct BufFile >> */ >> ResourceOwner resowner; >> >> + BufFileTag tag; /* for discoverability between backends */ >> >> Not perfectly happy with the name tag here, the name is a bit too >> similar to BufferTag - something quite different. > > Yeah, will rename. Done. That existed only because I had sharedbuffile.c which needed special access to buffile.c via those weird 'tag' interfaces. In the new version that isn't required, and a new struct BufFileSet is provided by buffile.c/h. >> +static void >> +make_tagged_path(char *tempdirpath, char *tempfilepath, >> + const BufFileTag *tag, int segment) >> +{ >> + if (tag->tablespace == DEFAULTTABLESPACE_OID || >> + tag->tablespace == GLOBALTABLESPACE_OID) >> + snprintf(tempdirpath, MAXPGPATH, "base/%s", PG_TEMP_FILES_DIR); >> + else >> + { >> + snprintf(tempdirpath, MAXPGPATH, "pg_tblspc/%u/%s/%s", >> + tag->tablespace, TABLESPACE_VERSION_DIRECTORY, >> + PG_TEMP_FILES_DIR); >> + } >> + >> + snprintf(tempfilepath, MAXPGPATH, "%s/%s%d.%d.%d.%d.%d", tempdirpath, >> + PG_TEMP_FILE_PREFIX, >> + tag->creator_pid, tag->set, tag->partition, tag->participant, >> + segment); >> >> Is there a risk that this ends up running afoul of filename length >> limits on some platforms? The names are shorter now, and split over two levels: pgsql_tmp37303.2.set/pgsql_tmp.p30.b0.0 >> If we do decide not to change this: Why is that sufficient? Doesn't the >> same problem exist for segments later than the first? > > It does exist and it is handled. The comment really should say > "unlinking segment N + 1 (if it exists) before creating segment N". > Will update. I got rid of this. 
This doesn't come up anymore because the patch now blows away whole directories. There is never a case where files left over after a crash-restart would confuse us. There may be left over directories, but if we find that we can't create a directory, we try to delete it and all its contents first (ie to see if there was a leftover directory from before a crash-restart) and then try again, so individual segment files shouldn't be able to confuse us. >> + * PathNameCreateTemporaryFile, PathNameOpenTemporaryFile and >> + * PathNameDeleteTemporaryFile are used for temporary files that may be shared >> + * between backends. A File created or opened with these functions is not >> + * automatically deleted when the file is closed, but it is automatically >> + * closed and end of transaction and counts agains the temporary file limit of >> + * the backend that created it. Any File created this way must be explicitly >> + * deleted with PathNameDeleteTemporaryFile. Automatic file deletion is not >> + * provided because this interface is designed for use by buffile.c and >> + * indirectly by sharedbuffile.c to implement temporary files with shared >> + * ownership and cleanup. >> >> Hm. Those name are pretty easy to misunderstand, no? s/Temp/Shared/? > > Hmm. Yeah these may be better. Will think about that. I like these names. This is fd.c providing named temporary files. They are definitely temporary files still: they participate in the total temp limit and logging/pgstat and they are automatically closed. The only different things are: they have names permitting opening by other backends, and (it follows) are not automatically deleted on close. buffile.c takes over that duty using a BufFileSet. >> +File >> +PathNameOpenTemporaryFile(char *tempfilepath) >> +{ >> + File file; >> + >> + /* >> + * Open the file. Note: we don't use O_EXCL, in case there is an orphaned >> + * temp file that can be reused. 
>> + */ >> + file = PathNameOpenFile(tempfilepath, O_RDONLY | PG_BINARY, 0); >> >> If so, wouldn't we need to truncate the file? > > Yes, this lacks O_TRUNC. Thanks. Actually the reason I did that is because I wanted to open the file with O_RDONLY, which is incompatible with O_TRUNC. Misleading comment removed. >> + * A single SharedBufFileSet can manage any number of 'tagged' BufFiles that >> + * are shared between a fixed number of participating backends. Each shared >> + * BufFile can be written to by a single participant but can be read by any >> + * backend after it has been 'exported'. Once a given BufFile is exported, it >> + * becomes read-only and cannot be extended. To create a new shared BufFile, >> + * a participant needs its own distinct participant number, and needs to >> + * specify an arbitrary partition number for the file. To make it available >> + * to other backends, it must be explicitly exported, which flushes internal >> + * buffers and renders it read-only. To open a file that has been shared, a >> + * backend needs to know the number of the participant that created the file, >> + * and the partition number. It is the responsibily of calling code to ensure >> + * that files are not accessed before they have been shared. >> >> Hm. One way to make this safer would be to rename files when exporting. >> Should be sufficient to do this to the first segment, I guess. > > Interesting idea. Will think about that. That comment isn't great > and repeats itself. Will improve. Comment improved. I haven't investigated a file-renaming scheme for exporting files yet. >> + * Each file is identified by a partition number and a participant number, so >> + * that a SharedBufFileSet can be viewed as a 2D table of individual files. >> >> I think using "files" as a term here is a bit dangerous - they're >> individually segmented again, right? > > True. It's a 2D matrix of BufFiles. The word "file" is super > overloaded here. Will fix. No longer present. 
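The crash-restart handling described a little earlier (if creating a set's directory fails because a leftover from before a crash-restart is in the way, delete it and try again) can be sketched as a small standalone routine. This is a simplification with a made-up name, and it assumes the leftover directory is empty, whereas the real cleanup would first delete the files inside it:

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <sys/stat.h>
#include <unistd.h>

/*
 * Create a directory to hold a set of temporary files, tolerating a
 * leftover directory of the same name from before a crash-restart.
 * Sketch only: assumes any leftover is empty and removable with rmdir.
 */
bool
ensure_fresh_directory(const char *path)
{
    if (mkdir(path, S_IRWXU) == 0)
        return true;
    if (errno != EEXIST)
        return false;

    /* Leftover from a previous crash-restart: remove it and retry. */
    if (rmdir(path) != 0)
        return false;
    return mkdir(path, S_IRWXU) == 0;
}
```

The point of the scheme is that individual leftover segment files can never confuse a new backend, because the whole directory is recreated from scratch before any files are put in it.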
>> +/* >> + * The number of bytes of shared memory required to construct a >> + * SharedBufFileSet. >> + */ >> +Size >> +SharedBufFileSetSize(int participants) >> +{ >> + return offsetof(SharedBufFileSet, participants) + >> + sizeof(SharedBufFileParticipant) * participants; >> +} >> >> The function name sounds a bit like a function actuallize setting some >> size... s/Size/DetermineSize/? > > Hmm yeah "set" as verb vs "set" as noun. I think "estimate" is the > established word for this sort of thing (even though that seems > strange because it sounds like it doesn't have to be exactly right: > clearly in all these shmem-space-reservation functions it has to be > exactly right). Will change. Done. (Of course 'estimate' is both a noun and a verb too, and for extra points pronounced differently...) >> >> +/* >> + * Create a new file suitable for sharing. Each backend that calls this must >> + * use a distinct participant number. Behavior is undefined if a participant >> + * calls this more than once for the same partition number. Partitions should >> + * ideally be numbered consecutively or in as small a range as possible, >> + * because file cleanup will scan the range of known partitions looking for >> + * files. >> + */ >> >> Wonder if we shouldn't just create a directory for all such files. > > Hmm. Yes, that could work well. Will try that. Done. >> I'm a bit unhappy with the partition terminology around this. It's >> getting a bit confusing. We have partitions, participants and >> segements. Most of them could be understood for something entirely >> different than the meaning you have here... > > Ok. Let me try to explain [explanation...]. > > (Perhaps SharedBufFileSet should be called PartitionedBufFileSet?) I got rid of most of that terminology. Now I have BufFileSet which is a set of named BufFiles and it's up to client code to manage the namespace within it. 
SharedTuplestore happens to build names that include partition and participant numbers, but that's its business. There is also a 'stripe' number, which is used as a way to spread files across multiple temp_tablespaces. >> +static void >> +shared_buf_file_on_dsm_detach(dsm_segment *segment, Datum datum) >> +{ >> + bool unlink_files = false; >> + SharedBufFileSet *set = (SharedBufFileSet *) DatumGetPointer(datum); >> + >> + SpinLockAcquire(&set->mutex); >> + Assert(set->refcount > 0); >> + if (--set->refcount == 0) >> + unlink_files = true; >> + SpinLockRelease(&set->mutex); >> >> I'm a bit uncomfortable with releasing a refcount, and then still using >> the memory from the set... I don't think there's a concrete danger >> here as the code stands, but it's a fairly dangerous pattern. > > Will fix. I could fix that but I'd feel bad about doing more work while holding the spinlock (even though it can't possibly be contended because we are the last to detach). I have added a comment to explain that it's safe to continue accessing the DSM segment while in this function body. On Mon, Mar 27, 2017 at 10:47 AM, Andres Freund <andres@anarazel.de> wrote: > On 2017-03-23 20:35:09 +1300, Thomas Munro wrote: >> Here is a new patch series responding to feedback from Peter and Andres: > > + > +/* Per-participant shared state. */ > +typedef struct SharedTuplestoreParticipant > +{ > + LWLock lock; > > Hm. No padding (ala LWLockMinimallyPadded / LWLockPadded) - but that's > probably ok, for now. I hunted around but didn't see an idiom for making this whole struct cacheline-sized. > + bool error; /* Error occurred flag. */ > + bool eof; /* End of file reached. */ > + int read_fileno; /* BufFile segment file number. */ > + off_t read_offset; /* Offset within segment file. */ > > Hm. I wonder if it'd not be better to work with 64bit offsets, and just > separate that out upon segment access. This falls out of the current two-part BufFileTell and BufFileSeek interface. 
Since translation could be done trivially (single_address_space_offset = fileno * MAX_PHYSICAL_FILESIZE + offset), that might be a reasonable refactoring, but it seems to be material for a separate patch, considering that other client code would be affected, no? > +/* The main data structure in shared memory. */ > > "main data structure" isn't particularly meaningful. Fixed. > +struct SharedTuplestore > +{ > + int reading_partition; > + int nparticipants; > + int flags; > > Maybe add a comment saying /* flag bits from SHARED_TUPLESTORE_* */? Done. > + Size meta_data_size; > > What's this? Comments added to every struct member. > + SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]; > > I'd add a comment here, that there's further data after participants. Done. > +}; > > + > +/* Per-participant backend-private state. */ > +struct SharedTuplestoreAccessor > +{ > > Hm. The name and it being backend-local are a bit conflicting. Hmm. It's a (SharedTupleStore) Accessor, not a Shared (...). Not sure if we have an established convention for this kind of thing... > + int participant; /* My partitipant number. */ > + SharedTuplestore *sts; /* The shared state. */ > + int nfiles; /* Size of local files array. */ > + BufFile **files; /* Files we have open locally for writing. */ > > Shouldn't this mention that it's indexed by partition? Done. > + BufFile *read_file; /* The current file to read from. */ > + int read_partition; /* The current partition to read from. */ > + int read_participant; /* The current participant to read from. */ > + int read_fileno; /* BufFile segment file number. */ > + off_t read_offset; /* Offset within segment file. */ > +}; > > > +/* > + * Initialize a SharedTuplestore in existing shared memory. There must be > + * space for sts_size(participants) bytes. If flags is set to the value > + * SHARED_TUPLESTORE_SINGLE_PASS then each partition may only be read once, > + * because underlying files will be deleted. 
> > Any reason not to use flags that are compatible with tuplestore.c? tuplestore.c uses some executor.h flags like EXEC_FLAG_MARK. sharedtuplestore.c's interface and capabilities are extremely primitive and only really let it do exactly what I needed to do here. Namely, every participant writes into its own set of partition files, and then all together we perform a single "partial scan" in some undefined order to get all the tuples back and share them out between backends. Extending it to behave more like the real tuplestore may be interesting for other projects (dynamic partitioning etc) but it didn't seem like a good idea to speculate on what exactly would be needed. This particular flag means 'please delete individual backing files as we go after reading them', and I don't believe there is any equivalent; someone thought the private HJ should do that so I figured I should do it here too. > + * Tuples that are stored may optionally carry a piece of fixed sized > + * meta-data which will be retrieved along with the tuple. This is useful for > + * the hash codes used for multi-batch hash joins, but could have other > + * applications. > + */ > +SharedTuplestoreAccessor * > +sts_initialize(SharedTuplestore *sts, int participants, > + int my_participant_number, > + Size meta_data_size, > + int flags, > + dsm_segment *segment) > +{ > > Not sure I like that the naming here has little in common with > tuplestore.h's api. Hmm. I feel like its interface needs to be significantly different to express the things it needs to do, especially at initialisation. As for the tuple write/write interface, how would you improve this? sts_puttuple(...); sts_puttuple(...); ... sts_end_write_all_partitions(...); sts_prepare_partial_scan(...); /* in one backend only */ sts_begin_partial_scan(...); ... = sts_gettuple(...); ... = sts_gettuple(...); ... sts_end_partial_scan(...); One thought that I keep having: the private hash join code should also use tuplestore. 
But a smarter tuplestore that knows how to hold onto the hash value (the meta-data in my sharedtuplestore.c) and knows about partitions (batches). It would be nice if the private and shared batching code finished up harmonised in this respect. > + > +MinimalTuple > +sts_gettuple(SharedTuplestoreAccessor *accessor, void *meta_data) > +{ > > This needs docs. Done. > + SharedBufFileSet *fileset = GetSharedBufFileSet(accessor->sts); > + MinimalTuple tuple = NULL; > + > + for (;;) > + { > > ... > + /* Check if this participant's file has already been entirely read. */ > + if (participant->eof) > + { > + BufFileClose(accessor->read_file); > + accessor->read_file = NULL; > + LWLockRelease(&participant->lock); > + continue; > > Why are we closing the file while holding the lock? Fixed. > + > + /* Read the optional meta-data. */ > + eof = false; > + if (accessor->sts->meta_data_size > 0) > + { > + nread = BufFileRead(accessor->read_file, meta_data, > + accessor->sts->meta_data_size); > + if (nread == 0) > + eof = true; > + else if (nread != accessor->sts->meta_data_size) > + ereport(ERROR, > + (errcode_for_file_access(), > + errmsg("could not read from temporary file: %m"))); > + } > + > + /* Read the size. */ > + if (!eof) > + { > + nread = BufFileRead(accessor->read_file, &tuple_size, sizeof(tuple_size)); > + if (nread == 0) > + eof = true; > > Why is it legal to have EOF here, if metadata previously didn't have an > EOF? Perhaps add an error if accessor->sts->meta_data_size != 0? Improved comments. > + if (eof) > + { > + participant->eof = true; > + if ((accessor->sts->flags & SHARED_TUPLESTORE_SINGLE_PASS) != 0) > + SharedBufFileDestroy(fileset, accessor->read_partition, > + accessor->read_participant); > + > + participant->error = false; > + LWLockRelease(&participant->lock); > + > + /* Move to next participant's file. */ > + BufFileClose(accessor->read_file); > + accessor->read_file = NULL; > + continue; > + } > + > + /* Read the tuple. 
*/ > + tuple = (MinimalTuple) palloc(tuple_size); > + tuple->t_len = tuple_size; > > Hm. Constantly re-allocing this doesn't strike me as a good idea (not to > mention that the API doesn't mention this is newly allocated). Seems > like it'd be a better idea to have a per-accessor buffer where this can > be stored in - increased in size when necessary. Done. On Tue, Mar 28, 2017 at 6:33 PM, Andres Freund <andres@anarazel.de> wrote: > On 2017-03-23 20:35:09 +1300, Thomas Munro wrote: >> Here is a new patch series responding to feedback from Peter and Andres: > > Here's a review of 0007 & 0010 together - they're going to have to be > applied together anyway... I have now merged them FWIW. > diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml > index ac339fb566..775c9126c7 100644 > --- a/doc/src/sgml/config.sgml > +++ b/doc/src/sgml/config.sgml > @@ -3814,6 +3814,21 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class=" > </listitem> > </varlistentry> > > + <varlistentry id="guc-cpu-shared-tuple-cost" xreflabel="cpu_shared_tuple_cost"> > + <term><varname>cpu_shared_tuple_cost</varname> (<type>floating point</type>) > + <indexterm> > + <primary><varname>cpu_shared_tuple_cost</> configuration parameter</primary> > + </indexterm> > + </term> > + <listitem> > + <para> > + Sets the planner's estimate of the cost of sharing rows in > + memory during a parallel query. > + The default is 0.001. > + </para> > + </listitem> > + </varlistentry> > + > > Isn't that really low in comparison to the other costs? I think > specifying a bit more what this actually measures would be good too - is > it putting the tuple in shared memory? Is it accessing it? Yeah. It was really just to make the earlier Shared Hash consistently more expensive than private Hash, by a tiny amount. Then it wouldn't kick in until it could help you avoid batching. I will try to come up with some kind of argument based on data... 
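The per-accessor read buffer adopted above in response to the re-palloc complaint is a standard grow-only buffer: allocate once, grow geometrically when a larger tuple arrives, and reuse it for every read. A minimal sketch with hypothetical names, using malloc/realloc in place of palloc (the sketch omits out-of-memory handling for brevity):

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for the accessor's reusable tuple buffer. */
typedef struct ReadBufferSketch
{
    char   *data;
    size_t  capacity;
} ReadBufferSketch;

/*
 * Make sure the buffer can hold 'needed' bytes, growing geometrically so
 * that repeated reads of similar-sized tuples stop reallocating entirely.
 */
char *
read_buffer_reserve(ReadBufferSketch *buf, size_t needed)
{
    if (buf->capacity < needed)
    {
        size_t  newcap = buf->capacity > 0 ? buf->capacity : 64;

        while (newcap < needed)
            newcap *= 2;
        buf->data = realloc(buf->data, newcap);
        buf->capacity = newcap;
    }
    return buf->data;
}
```

Compared with a fresh palloc per tuple, the steady state is zero allocations per read, at the cost of the buffer staying at the high-water mark of tuple sizes seen.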
> + <varlistentry id="guc-cpu-synchronization-cost" xreflabel="cpu_synchronization_cost"> > + <term><varname>cpu_synchronization_cost</varname> (<type>floating point</type>) > + <indexterm> > + <primary><varname>cpu_synchronization_cost</> configuration parameter</primary> > + </indexterm> > + </term> > + <listitem> > + <para> > + Sets the planner's estimate of the cost of waiting at synchronization > + points for other processes while executing parallel queries. > + The default is 1.0. > + </para> > + </listitem> > + </varlistentry> > > Isn't this also really cheap in comparison to a, probably cached, seq > page read? It's not really the synchronisation primitive itself, which is fast, it's how long the other guys may spend doing other stuff before they reach the barrier. Currently we have a block granularity parallel query system, so really this is an estimation of how long the average participant will have to wait for the last of its peers to finish chewing on up to one page of tuples from its (ultimate) source of parallelism. Yeah I'm waffling a bit because I don't have a principled answer to this question yet... > + if (HashJoinTableIsShared(hashtable)) > + { > + /* > + * Synchronize parallel hash table builds. At this stage we know that > + * the shared hash table has been created, but we don't know if our > + * peers are still in MultiExecHash and if so how far through. We use > + * the phase to synchronize with them. > + */ > + barrier = &hashtable->shared->barrier; > + > + switch (BarrierPhase(barrier)) > + { > + case PHJ_PHASE_BEGINNING: > > Note pgindent will indent this further. Might be worthwhile to try to > pgindent the file, revert some of the unintended damage. Fixed switch statement indentation. I will try pgindent soon and see how badly it all breaks. > /* > * set expression context > */ > > I'd still like this to be moved to the start. Done. 
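The barrier behaviour the patch relies on throughout — BarrierWait returning true in exactly one participant, which then performs the "serial phase" work before the next phase begins — can be illustrated with a single-process toy model. This sketch only does the bookkeeping; a real barrier blocks the early arrivers until the last participant shows up, which is exactly the waiting that cpu_synchronization_cost tries to put a price on:

```c
#include <assert.h>
#include <stdbool.h>

/* Toy model of a phase barrier: no blocking, just the counters. */
typedef struct BarrierSketch
{
    int     phase;          /* how many times all participants have met */
    int     participants;   /* fixed number of attached participants */
    int     arrived;        /* arrivals so far in the current phase */
} BarrierSketch;

/*
 * Record one participant's arrival.  Returns true for exactly one
 * participant per phase (the last to arrive), which by convention is
 * elected to do any serial work; the phase number then advances for
 * everyone.  A real barrier would block the earlier arrivers here.
 */
bool
barrier_sketch_wait(BarrierSketch *b)
{
    if (++b->arrived < b->participants)
        return false;       /* not last: would block until phase change */
    b->arrived = 0;
    b->phase++;             /* everyone proceeds in the new phase */
    return true;            /* last arriver: elected for the serial phase */
}
```

This is why the code above can write things like "Serial phase: prepare for unmatched" inside an `if (BarrierWait(...))` block: only one backend runs that branch per phase.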
> @@ -126,17 +202,79 @@ MultiExecHash(HashState *node)
>                 /* Not subject to skew optimization, so insert normally */
>                 ExecHashTableInsert(hashtable, slot, hashvalue);
>             }
> -            hashtable->totalTuples += 1;
> +            hashtable->partialTuples += 1;
> +            if (!HashJoinTableIsShared(hashtable))
> +                hashtable->totalTuples += 1;
>         }
>     }
>
> FWIW, I'd put HashJoinTableIsShared() into a local var - the compiler
> won't be able to do that on its own because external function calls
> could invalidate the result.

Done in the hot loops.

> That brings me to a related topic: Have you measured whether your
> changes cause performance differences?

I have never succeeded in measuring any reproducible difference between
master with 0 workers and my patch with 0 workers on various contrived
queries and TPC-H queries (except the ones where my patch makes certain
outer joins faster for known reasons). I suspect it just spends too much
time ping-ponging in and out of the node for each tuple for tiny
differences in coding to show up. But I could be testing for the wrong
things...

> +    finish_loading(hashtable);
>
> I find the sudden switch to a different naming scheme in the same file a
> bit jarring.

Ok. I have now changed all of the static functions in nodeHash.c from
foo_bar to ExecHashFooBar.

> +    if (HashJoinTableIsShared(hashtable))
> +        BarrierDetach(&hashtable->shared->shrink_barrier);
> +
> +    if (HashJoinTableIsShared(hashtable))
> +    {
>
> Consecutive if blocks with the same condition...

Fixed.

> +        bool elected_to_resize;
> +
> +        /*
> +         * Wait for all backends to finish building. If only one worker is
> +         * running the building phase because of a non-partial inner plan, the
> +         * other workers will pile up here waiting. If multiple worker are
> +         * building, they should finish close to each other in time.
> +         */
>
> That comment is outdated, isn't it?

Yes, fixed.
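Andres's point about caching HashJoinTableIsShared() in a local variable is about loop-invariant code motion: when a hot loop makes external function calls, the compiler must assume those calls could change the table's state, so it cannot hoist the check itself and re-tests it on every tuple unless the programmer hoists it by hand. A generic sketch of the transformation with made-up names (not the actual executor code):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a hash table that may or may not be shared. */
typedef struct TableSketch
{
    void   *shared;         /* non-NULL for a shared hash table */
} TableSketch;

/* Stand-in for HashJoinTableIsShared(). */
bool
table_is_shared(TableSketch *t)
{
    return t->shared != NULL;
}

/*
 * Process n tuples, counting how many take the shared path.  The check is
 * evaluated once and kept in a local variable: because a real loop body
 * calls other functions, the compiler would otherwise have to assume
 * t->shared could change and re-read it on every iteration.
 */
int
count_shared_inserts(TableSketch *t, int n)
{
    bool    shared = table_is_shared(t);    /* hoisted out of the hot loop */
    int     count = 0;

    for (int i = 0; i < n; i++)
    {
        if (shared)
            count++;        /* the real code would take the shared path */
    }
    return count;
}
```

Whether the table is shared cannot change while tuples are being loaded, so the hoist is safe; it just makes explicit an invariant the compiler cannot prove.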
> /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ > - if (hashtable->nbuckets != hashtable->nbuckets_optimal) > - ExecHashIncreaseNumBuckets(hashtable); > + ExecHashUpdate(hashtable); > + ExecHashIncreaseNumBuckets(hashtable); > > So this now doesn't actually increase the number of buckets anymore. Well, that function always returned early if it found there were already enough buckets, so either the test at the call site or the test in the function was redundant. I have renamed it to ExecHashIncreaseNumBucketsIfNeeded() to make that clearer. > + reinsert: > + /* If the table was resized, insert tuples into the new buckets. */ > + ExecHashUpdate(hashtable); > + ExecHashReinsertAll(hashtable); > > ReinsertAll just happens to do nothing if we didn't have to > resize... Not entirely obvious, sure reads as if it were unconditional. > Also, it's not actually "All" when batching is in use, no? Renamed to ExecHashReinsertHashtableIfNeeded. > + post_resize: > + if (HashJoinTableIsShared(hashtable)) > + { > + Assert(BarrierPhase(barrier) == PHJ_PHASE_RESIZING); > + BarrierWait(barrier, WAIT_EVENT_HASH_RESIZING); > + Assert(BarrierPhase(barrier) == PHJ_PHASE_REINSERTING); > + } > + > + reinsert: > + /* If the table was resized, insert tuples into the new buckets. */ > + ExecHashUpdate(hashtable); > + ExecHashReinsertAll(hashtable); > > Hm. So even non-resizing backends reach this - but they happen to not > do anything because there's no work queued up, right? That's, uh, not > obvious. Added comments to that effect. > For me the code here would be a good bit easier to read if we had a > MultiExecHash and MultiExecParallelHash. Half of MultiExecHash is just > if(IsShared) blocks, and copying would avoid potential slowdowns. Hmm. Yeah, I have struggled with this question in several places. For example, I have ExecHashLoadPrivateTuple and ExecHashLoadSharedTuple because the intertwangled version was unbearable.
But in MultiExecHash's case, I feel there is some value in showing that the basic hash build steps are the same. The core loop, where the main action really happens, is unchanged. > + /* > + * Set up for skew optimization, if possible and there's a need for > + * more than one batch. (In a one-batch join, there's no point in > + * it.) > + */ > + if (nbatch > 1) > + ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); > > So there's no equivalent to the skew optimization for parallel query > yet... It doesn't sound like that should be particulalry hard on first > blush? Making the skew table shared, setting up buckets for MCVs, and building and probing it is easy. It's the work_mem exhaustion and shrinking and related jiggery-pokery that'll be tricky, but I'll shortly be looking at that with vigour and vim. Note that there may be one or two empty-relation optimisations that I haven't got yet because they involve a bit of extra communication. > static void > -ExecHashIncreaseNumBatches(HashJoinTable hashtable) > +ExecHashIncreaseNumBatches(HashJoinTable hashtable, int nbatch) > > So this doesn't actually increase the number of batches anymore... At > the very least this should mention that the main work is done in > ExecHashShrink. Yeah. Done. > +/* > + * Process the queue of chunks whose tuples need to be redistributed into the > + * correct batches until it is empty. In the best case this will shrink the > + * hash table, keeping about half of the tuples in memory and sending the rest > + * to a future batch. > + */ > +static void > +ExecHashShrink(HashJoinTable hashtable) > > Should mention this really only is meaningful after > ExecHashIncreaseNumBatches has run. Updated. > +{ > + long ninmemory; > + long nfreed; > + dsa_pointer chunk_shared; > + HashMemoryChunk chunk; > > - /* If know we need to resize nbuckets, we can do it while rebatching.
*/ > - if (hashtable->nbuckets_optimal != hashtable->nbuckets) > + if (HashJoinTableIsShared(hashtable)) > { > - /* we never decrease the number of buckets */ > - Assert(hashtable->nbuckets_optimal > hashtable->nbuckets); > + /* > + * Since a newly launched participant could arrive while shrinking is > + * already underway, we need to be able to jump to the correct place > + * in this function. > + */ > + switch (PHJ_SHRINK_PHASE(BarrierPhase(&hashtable->shared->shrink_barrier))) > + { > + case PHJ_SHRINK_PHASE_BEGINNING: /* likely case */ > + break; > + case PHJ_SHRINK_PHASE_CLEARING: > + goto clearing; > + case PHJ_SHRINK_PHASE_WORKING: > + goto working; > + case PHJ_SHRINK_PHASE_DECIDING: > + goto deciding; > + } > > Hm, so we jump into different nesting levels here :/ I rewrote this without goto. Mea culpa. > ok, ENOTIME for today... Thanks! Was enough to keep me busy for some time... > diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c > index f2c885afbe..87d8f3766e 100644 > --- a/src/backend/executor/nodeHashjoin.c > +++ b/src/backend/executor/nodeHashjoin.c > @@ -6,10 +6,78 @@ > * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group > * Portions Copyright (c) 1994, Regents of the University of California > * > - * > * IDENTIFICATION > * src/backend/executor/nodeHashjoin.c > * > + * NOTES: > + * > + * PARALLELISM > + * > + * Hash joins can participate in parallel queries in two ways: in > + * non-parallel-aware mode, where each backend builds an identical hash table > + * and then probes it with a partial outer relation, or parallel-aware mode > + * where there is a shared hash table that all participants help to build. A > + * parallel-aware hash join can save time and space by dividing the work up > + * and sharing the result, but has extra communication overheads. > > There's a third, right? 
The hashjoin, and everything below it, could > also not be parallel, but above it could be some parallel aware node > (e.g. a parallel aware HJ). Yeah, that's the same thing: it's not aware of parallelism. Its outer plan may be partial or not, and it doesn't even know. That's the distinction I'm trying to make clear: actually doing something special for parallelism. I've updated the text slightly to say that the outer plan may be partial or not in a hash join that is under Gather. > + * In both cases, hash joins use a private state machine to track progress > + * through the hash join algorithm. > > That's not really parallel specific, right? Perhaps just say that > parallel HJs use the normal state machine? Updated. > + * In a parallel-aware hash join, there is also a shared 'phase' which > + * co-operating backends use to synchronize their local state machine and > + * program counter with the multi-process join. The phase is managed by a > + * 'barrier' IPC primitive. > > Hm. I wonder if 'phase' shouldn't just be name > sharedHashJoinState. Might be a bit easier to understand than a > different terminology. Hmm. Well, it is a lot like a state machine, but it might be more confusing to have both local and shared 'state'. I think 'phases' of parallel computation are quite intuitive. I'm rather attached to this terminology...
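The goto-free rewrite of the shrink-phase resume discussed a little earlier can be sketched as a loop that simply enters at whatever phase the barrier reports, rather than jumping into the middle of the function. The phase names below are illustrative, loosely modeled on the PHJ_SHRINK_PHASE_* constants quoted above; the step bodies are placeholders.

```c
/*
 * Hypothetical sketch: a late-attaching participant resumes a multi-phase
 * operation by entering the loop at the current phase, no goto required.
 */
typedef enum
{
    SHRINK_BEGINNING,
    SHRINK_CLEARING,
    SHRINK_WORKING,
    SHRINK_DECIDING,
    SHRINK_DONE
} ShrinkPhase;

/* Run from start_phase to completion; returns the number of steps run. */
static int
run_shrink(ShrinkPhase start_phase)
{
    int         steps = 0;

    for (int phase = start_phase; phase != SHRINK_DONE; phase++)
    {
        switch (phase)
        {
            case SHRINK_BEGINNING:
                /* e.g. move the chunk list onto the shared work queue */
                break;
            case SHRINK_CLEARING:
                /* e.g. clear the bucket array */
                break;
            case SHRINK_WORKING:
                /* e.g. pop chunks and redistribute their tuples */
                break;
            case SHRINK_DECIDING:
                /* e.g. decide whether shrinking freed enough memory */
                break;
        }
        steps++;
        /* in the real code, a barrier wait here advances everyone together */
    }
    return steps;
}
```

A participant attaching at SHRINK_WORKING executes only the remaining two steps, which is exactly what the switch-with-goto version achieved, but with ordinary structured control flow.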
> + * The phases are as follows: > + * > + * PHJ_PHASE_BEGINNING -- initial phase, before any participant acts > + * PHJ_PHASE_CREATING -- one participant creates the shmem hash table > + * PHJ_PHASE_BUILDING -- all participants build the hash table > + * PHJ_PHASE_RESIZING -- one participant decides whether to expand buckets > + * PHJ_PHASE_REINSERTING -- all participants reinsert tuples if necessary > + * PHJ_PHASE_PROBING -- all participants probe the hash table > + * PHJ_PHASE_UNMATCHED -- all participants scan for unmatched tuples > > I think somewhere here - and probably around the sites it's happening - > should mention that state transitions are done kinda implicitly via > BarrierWait progressing to the numerically next phase. That's not > entirely obvious (and actually limits what the barrier mechanism can be > used for...). Yeah. Added comments. On Wed, Mar 29, 2017 at 9:31 AM, Andres Freund <andres@anarazel.de> wrote: > - ExecHashJoinSaveTuple(tuple, > - hashvalue, > - &hashtable->innerBatchFile[batchno]); > + if (HashJoinTableIsShared(hashtable)) > + sts_puttuple(hashtable->shared_inner_batches, batchno, &hashvalue, > + tuple); > + else > + ExecHashJoinSaveTuple(tuple, > + hashvalue, > + &hashtable->innerBatchFile[batchno]); > } > } > > Why isn't this done inside of ExecHashJoinSaveTuple? I had it that way earlier but the arguments got ugly. I suppose it could take an SOMETHING_INNER/SOMETHING_OUTER enum and a partition number. I wonder if SharedTuplestore should be able to handle the private case too... > @@ -1280,6 +1785,68 @@ ExecHashTableReset(HashJoinTable hashtable) > > + /* Rewind the shared read heads for this batch, inner and outer. */ > + sts_prepare_parallel_read(hashtable->shared_inner_batches, > + curbatch); > + sts_prepare_parallel_read(hashtable->shared_outer_batches, > + curbatch); > > It feels somewhat wrong to do this in here, rather than on the callsites. 
The private hash table code does the moral equivalent directly below: it uses BufFileSeek to rewind the current inner and outer batch to the start. > + } > + > + /* > + * Each participant needs to make sure that data it has written for > + * this partition is now read-only and visible to other participants. > + */ > + sts_end_write(hashtable->shared_inner_batches, curbatch); > + sts_end_write(hashtable->shared_outer_batches, curbatch); > + > + /* > + * Wait again, so that all workers see the new hash table and can > + * safely read from batch files from any participant because they have > + * all ended writing. > + */ > + Assert(BarrierPhase(&hashtable->shared->barrier) == > + PHJ_PHASE_RESETTING_BATCH(curbatch)); > + BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASH_RESETTING); > + Assert(BarrierPhase(&hashtable->shared->barrier) == > + PHJ_PHASE_LOADING_BATCH(curbatch)); > + ExecHashUpdate(hashtable); > + > + /* Forget the current chunks. */ > + hashtable->current_chunk = NULL; > + return; > + } > > /* > * Release all the hash buckets and tuples acquired in the prior pass, and > @@ -1289,10 +1856,10 @@ ExecHashTableReset(HashJoinTable hashtable) > oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); > > /* Reallocate and reinitialize the hash bucket headers. */ > - hashtable->buckets = (HashJoinTuple *) > - palloc0(nbuckets * sizeof(HashJoinTuple)); > + hashtable->buckets = (HashJoinBucketHead *) > + palloc0(nbuckets * sizeof(HashJoinBucketHead)); > > - hashtable->spaceUsed = nbuckets * sizeof(HashJoinTuple); > + hashtable->spaceUsed = nbuckets * sizeof(HashJoinBucketHead); > > /* Cannot be more than our previous peak; we had this size before. */ > Assert(hashtable->spaceUsed <= hashtable->spacePeak); > @@ -1301,6 +1868,22 @@ ExecHashTableReset(HashJoinTable hashtable) > > /* Forget the chunks (the memory was freed by the context reset above). */ > hashtable->chunks = NULL; > + > + /* Rewind the shared read heads for this batch, inner and outer. 
*/ > + if (hashtable->innerBatchFile[curbatch] != NULL) > + { > + if (BufFileSeek(hashtable->innerBatchFile[curbatch], 0, 0L, SEEK_SET)) > + ereport(ERROR, > + (errcode_for_file_access(), > + errmsg("could not rewind hash-join temporary file: %m"))); > + } > + if (hashtable->outerBatchFile[curbatch] != NULL) > + { > + if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET)) > + ereport(ERROR, > + (errcode_for_file_access(), > + errmsg("could not rewind hash-join temporary file: %m"))); > + } > } > > /* > @@ -1310,12 +1893,21 @@ ExecHashTableReset(HashJoinTable hashtable) > void > ExecHashTableResetMatchFlags(HashJoinTable hashtable) > { > + dsa_pointer chunk_shared = InvalidDsaPointer; > HashMemoryChunk chunk; > HashJoinTuple tuple; > int i; > > /* Reset all flags in the main table ... */ > - chunk = hashtable->chunks; > + if (HashJoinTableIsShared(hashtable)) > + { > + /* This only runs in the leader during rescan initialization. */ > + Assert(!IsParallelWorker()); > + hashtable->shared->chunk_work_queue = hashtable->shared->chunks; > + chunk = pop_chunk_queue(hashtable, &chunk_shared); > + } > + else > + chunk = hashtable->chunks; > > Hm - doesn't pop_chunk_queue empty the work queue? Well first it puts the main chunks onto the work queue, and then it pops them off one by one clearing flags until there is nothing left on the work queue. But this is only running in one backend. It's not very exciting. Do you see a bug here? > +/* > + * Load a tuple into shared dense storage, like 'load_private_tuple'. This > + * version is for shared hash tables. > + */ > +static HashJoinTuple > +load_shared_tuple(HashJoinTable hashtable, MinimalTuple tuple, > + dsa_pointer *shared, bool respect_work_mem) > +{ > > Hm. Are there issues with "blessed" records being stored in shared > memory? I seem to recall you talking about it, but I see nothing > addressing the issue here? (later) Ah, I see - you just prohibit > paralleism in that case - might be worth pointing to. 
Note added. I had difficulty testing that. I couldn't create anonymous ROW(...) values without the project moving above the hash table. Andrew Gierth showed me a way to prevent that with OFFSET 0 but that disabled parallelism. I tested that code by writing extra test code to dump the output of tlist_references_transient_type() on the tlists of various test paths not in a parallel query. Ideas welcome, as I feel like this belongs in a regression test. > + /* Check if some other participant has increased nbatch. */ > + if (hashtable->shared->nbatch > hashtable->nbatch) > + { > + Assert(respect_work_mem); > + ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch); > + } > + > + /* Check if we need to help shrinking. */ > + if (hashtable->shared->shrink_needed && respect_work_mem) > + { > + hashtable->current_chunk = NULL; > + LWLockRelease(&hashtable->shared->chunk_lock); > + return NULL; > + } > + > + /* Oversized tuples get their own chunk. */ > + if (size > HASH_CHUNK_THRESHOLD) > + chunk_size = size + HASH_CHUNK_HEADER_SIZE; > + else > + chunk_size = HASH_CHUNK_SIZE; > + > + /* If appropriate, check if work_mem would be exceeded by a new chunk. */ > + if (respect_work_mem && > + hashtable->shared->grow_enabled && > + hashtable->shared->nbatch <= MAX_BATCHES_BEFORE_INCREASES_STOP && > + (hashtable->shared->size + > + chunk_size) > (work_mem * 1024L * > + hashtable->shared->planned_participants)) > + { > + /* > + * It would be exceeded. Let's increase the number of batches, so we > + * can try to shrink the hash table. > + */ > + hashtable->shared->nbatch *= 2; > + ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch); > + hashtable->shared->chunk_work_queue = hashtable->shared->chunks; > + hashtable->shared->chunks = InvalidDsaPointer; > + hashtable->shared->shrink_needed = true; > + hashtable->current_chunk = NULL; > + LWLockRelease(&hashtable->shared->chunk_lock); > + > + /* The caller needs to shrink the hash table. 
*/ > + return NULL; > + } > > Hm - we could end up calling ExecHashIncreaseNumBatches twice here? > Probably harmless. Yes. In the code higher up we could observe that someone else has increased the number of batches: here we are just updating our local hashtable->nbatch. Then further down we could decide that it needs to be done again because we work out that this allocation will push us over the work_mem limit. Really that function just *sets* the number of batches. It's the code beginning hashtable->shared->nbatch *= 2 that is really increasing the number of batches and setting up the state for all participants to shrink the hash table and free up some memory. > > /* ---------------------------------------------------------------- > * ExecHashJoin > @@ -129,6 +200,14 @@ ExecHashJoin(HashJoinState *node) > /* no chance to not build the hash table */ > node->hj_FirstOuterTupleSlot = NULL; > } > + else if (hashNode->shared_table_data != NULL) > + { > + /* > + * The empty-outer optimization is not implemented for > + * shared hash tables yet. > + */ > + node->hj_FirstOuterTupleSlot = NULL; > > Hm, why is this checking for the shared-ness of the join in a different > manner? The usual manner is HashJoinTableIsShared(hashtable), but you see Assert(hashtable == NULL) a few lines earlier; this is the HJ_BUILD_HASHTABLE state where it hasn't been constructed yet. When ExecHashTableCreate (a bit further down) constructs it, it'll assign hashtable->shared = state->shared_table_data (to point to a bit of DSM memory). The reason the usual test is based on the HashJoinTable pointer usually called 'hashtable' is that it is passed around almost everywhere, so it's convenient to use. > + if (HashJoinTableIsShared(hashtable)) > + { > + /* > + * An important optimization: if this is a > + * single-batch join and not an outer join, there is > + * no reason to synchronize again when we've finished > + * probing.
> + */ > + Assert(BarrierPhase(&hashtable->shared->barrier) == > + PHJ_PHASE_PROBING_BATCH(hashtable->curbatch)); > + if (hashtable->nbatch == 1 && !HJ_FILL_INNER(node)) > + return NULL; /* end of join */ > + > + /* > + * Check if we are a leader that can't go further than > + * probing the first batch, to avoid risk of deadlock > + * against workers. > + */ > + if (!LeaderGateCanContinue(&hashtable->shared->leader_gate)) > + { > + /* > + * Other backends will need to handle all future > + * batches written by me. We don't detach until > + * after we've finished writing to all batches so > + * that they are flushed, otherwise another > + * participant might try to read them too soon. > + */ > + sts_end_write_all_partitions(hashNode->shared_inner_batches); > + sts_end_write_all_partitions(hashNode->shared_outer_batches); > + BarrierDetach(&hashtable->shared->barrier); > + hashtable->detached_early = true; > + return NULL; > + } > + > + /* > + * We can't start searching for unmatched tuples until > + * all participants have finished probing, so we > + * synchronize here. > + */ > + Assert(BarrierPhase(&hashtable->shared->barrier) == > + PHJ_PHASE_PROBING_BATCH(hashtable->curbatch)); > + if (BarrierWait(&hashtable->shared->barrier, > + WAIT_EVENT_HASHJOIN_PROBING)) > + { > + /* Serial phase: prepare for unmatched. */ > + if (HJ_FILL_INNER(node)) > + { > + hashtable->shared->chunk_work_queue = > + hashtable->shared->chunks; > + hashtable->shared->chunks = InvalidDsaPointer; > + } > + } > > Couldn't we skip that if this isn't an outer join? Not sure if the > complication would be worth it... Yes, well we don't even get this far in the very common case of a single-batch inner join (see the note above about an "important optimization"). If it's an outer join you need this, and if there are multiple batches it hardly matters if you have to go through this extra step. But you're right that there are a few missed opportunities here and there.
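A minimal sketch of the kind of phased barrier being used throughout this code: BarrierWait implicitly advances to the numerically next phase, and the last participant to arrive is "elected" (returns true) to perform any serial work for the new phase. This is an illustration only, with a simplified signature (no wait-event argument, no attach/detach), not the actual implementation in the patch.

```c
#include <pthread.h>
#include <stdbool.h>

/* A minimal phased barrier, loosely modeled on the one in the patch. */
typedef struct Barrier
{
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int             participants;
    int             arrived;
    int             phase;      /* advances by one at each BarrierWait */
} Barrier;

static void
BarrierInit(Barrier *b, int participants)
{
    pthread_mutex_init(&b->mutex, NULL);
    pthread_cond_init(&b->cond, NULL);
    b->participants = participants;
    b->arrived = 0;
    b->phase = 0;
}

static int
BarrierPhase(Barrier *b)
{
    return b->phase;
}

/*
 * Wait until all participants arrive.  The last one bumps the phase --
 * the implicit state transition -- and returns true so the caller can run
 * the serial work for the new phase before everyone proceeds.
 */
static bool
BarrierWait(Barrier *b)
{
    bool        elected = false;
    int         start_phase;

    pthread_mutex_lock(&b->mutex);
    start_phase = b->phase;
    if (++b->arrived == b->participants)
    {
        b->arrived = 0;
        b->phase++;             /* everyone moves to the next phase */
        elected = true;
        pthread_cond_broadcast(&b->cond);
    }
    else
    {
        while (b->phase == start_phase)
            pthread_cond_wait(&b->cond, &b->mutex);
    }
    pthread_mutex_unlock(&b->mutex);
    return elected;
}
```

With a single participant the wait never blocks, which is also why the assertions in the quoted code can pin down the exact phase before and after each BarrierWait call.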
> +void > +ExecShutdownHashJoin(HashJoinState *node) > +{ > + /* > + * By the time ExecEndHashJoin runs in a work, shared memory has been > > s/work/worker/ Fixed. > + * destroyed. So this is our last chance to do any shared memory cleanup. > + */ > + if (node->hj_HashTable) > + ExecHashTableDetach(node->hj_HashTable); > +} > > + There is no extra charge > + * for probing the hash table for outer path row, on the basis that > + * read-only access to a shared hash table shouldn't be any more > + * expensive. > + */ > > Hm, that's debatable. !shared will mostly be on the local numa node, > shared probably not. Agreed, NUMA surely changes the situation for probing. I wonder if it deserves a separate GUC. I'm actually quite hesitant to try to model things like that because it seems like a can of worms. I will try to come up with some numbers backed up with data though. Watch this space. > * Get hash table size that executor would use for inner relation. > * > + * Shared hash tables are allowed to use the work_mem of all participants > + * combined to make up for the fact that there is only one copy shared by > + * all. > > Hm. I don't quite understand that reasoning. Our model for memory usage limits is that every instance of an executor node is allowed to allocate up to work_mem. If I run a parallel hash join in 9.6 with 3 workers and I have set work_mem to 10MB, then the system will attempt to stay under 10MB in each participant, using up to 40MB across the 4 processes. The goal of Parallel Shared Hash is to divide the work of building the hash table up over the 4 backends, and combine the work_mem of the 4 backends to create a shared hash table. The total amount of memory used is the same, but we make much better use of it. Make sense? > * XXX for the moment, always assume that skew optimization will be > * performed. As long as SKEW_WORK_MEM_PERCENT is small, it's not worth > * trying to determine that for sure. 
> > If we don't do skew for parallelism, should we skip that bit? I am looking into the skew optimisation. Will report back on that soon, and also try to get some data relevant to costing. -- Thomas Munro http://www.enterprisedb.com
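The space accounting discussed above can be summarized in a short sketch: oversized tuples get a dedicated chunk, ordinary tuples share standard-sized chunks, a shared table's space target is work_mem pooled across all participants, and exceeding it triggers batch doubling. The constants and function names here are stand-ins for illustration, not the patch's actual definitions.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the constants used in the patch. */
#define HASH_CHUNK_SIZE         (32 * 1024)
#define HASH_CHUNK_HEADER_SIZE  16
#define HASH_CHUNK_THRESHOLD    (HASH_CHUNK_SIZE / 4)

/* Oversized tuples get a chunk of their own; others share standard chunks. */
static size_t
chunk_size_for(size_t tuple_size)
{
    if (tuple_size > HASH_CHUNK_THRESHOLD)
        return tuple_size + HASH_CHUNK_HEADER_SIZE;
    return HASH_CHUNK_SIZE;
}

/*
 * A shared hash table may use the pooled allowance of all participants:
 * work_mem (in kilobytes here) times workers + 1 (for the leader).  The
 * total is the same as P private copies each capped at work_mem; the
 * shared case just spends it on one bigger table.
 */
static long
space_target_kb(long work_mem_kb, int workers)
{
    return work_mem_kb * (workers + 1);
}

/*
 * Would allocating another chunk exceed the combined target?  If so, the
 * builder doubles nbatch and asks all participants to help shrink.
 */
static bool
needs_more_batches(size_t current_size, size_t chunk_size,
                   long work_mem_kb, int workers)
{
    return current_size + chunk_size >
        (size_t) space_target_kb(work_mem_kb, workers) * 1024;
}
```

For the example from the discussion -- work_mem of 10MB and 3 workers -- the shared table's target is 40MB, the same total the non-shared plan would spread over four private copies.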
Hi Thomas, On 2017-03-31 17:53:12 +1300, Thomas Munro wrote: > Thanks very much to Rafia for testing, and to Andres for his copious > review feedback. Here's a new version. Changes: I've not looked at that aspect, but one thing I think would be good is to first add a patch that increases coverage of nodeHash[join].c to nearly 100%. There are currently significant bits of nodeHash.c that aren't covered (skew optimization, large tuples). https://coverage.postgresql.org/src/backend/executor/nodeHash.c.gcov.html https://coverage.postgresql.org/src/backend/executor/nodeHashjoin.c.gcov.html - Andres
Hi, On 2017-03-31 17:53:12 +1300, Thomas Munro wrote: > Thanks very much to Rafia for testing, and to Andres for his copious > review feedback. Here's a new version. Changes: I unfortunately think it's too late to get this into v10. There's still heavy development going on, several pieces changed quite noticeably since the start of the CF and there's still features missing. Hence I think this unfortunately has to be pushed - as much as I'd have liked to have this in 10. Do you agree? Regards, Andres
On Tue, Apr 4, 2017 at 9:11 AM, Andres Freund <andres@anarazel.de> wrote: > Hi, > > On 2017-03-31 17:53:12 +1300, Thomas Munro wrote: >> Thanks very much to Rafia for testing, and to Andres for his copious >> review feedback. Here's a new version. Changes: > > I unfortunately think it's too late to get this into v10. There's still > heavy development going on, several pieces changed quite noticeably > since the start of the CF and there's still features missing. Hence I > think this unfortunately has to be pushed - as much as I'd have liked to > have this in 10. > > Do you agree? Agreed. Thank you very much Andres, Ashutosh, Peter, Rafia and Robert for all the review, testing and discussion so far. -- Thomas Munro http://www.enterprisedb.com
I got errors while patching on CentOS 7:
bash-4.2$ grep Hunk *.log | grep FAILED
0005-hj-leader-gate-v11.patch.log:Hunk #1 FAILED at 14.
0010-hj-parallel-v11.patch.log:Hunk #2 FAILED at 2850.
0010-hj-parallel-v11.patch.log:Hunk #1 FAILED at 21.
0010-hj-parallel-v11.patch.log:Hunk #3 FAILED at 622.
0010-hj-parallel-v11.patch.log:Hunk #6 FAILED at 687.
0010-hj-parallel-v11.patch.log:Hunk #1 FAILED at 21.
0010-hj-parallel-v11.patch.log:Hunk #3 FAILED at 153.
What is wrong? The sources were clean:
bash-4.2$ git status
# On branch master
nothing to commit, working directory clean
I was patching with the following commands:
patch -b -i ../.patches/parallel-shared-hash-v11/0001-hj-refactor-memory-accounting-v11.patch -p1 --verbose > ../.patches/parallel-shared-hash-v11/0001-hj-refactor-memory-accounting-v11.patch.log
patch -b -i ../.patches/parallel-shared-hash-v11/0002-hj-refactor-batch-increases-v11.patch -p1 --verbose > ../.patches/parallel-shared-hash-v11/0002-hj-refactor-batch-increases-v11.patch.log
patch -b -i ../.patches/parallel-shared-hash-v11/0003-hj-refactor-unmatched-v11.patch -p1 --verbose > ../.patches/parallel-shared-hash-v11/0003-hj-refactor-unmatched-v11.patch.log
patch -b -i ../.patches/parallel-shared-hash-v11/0004-hj-barrier-v11.patch -p1 --verbose > ../.patches/parallel-shared-hash-v11/0004-hj-barrier-v11.patch.log
patch -b -i ../.patches/parallel-shared-hash-v11/0005-hj-leader-gate-v11.patch -p1 --verbose > ../.patches/parallel-shared-hash-v11/0005-hj-leader-gate-v11.patch.log
patch -b -i ../.patches/parallel-shared-hash-v11/0006-hj-let-node-have-seg-in-worker-v11.patch -p1 --verbose > ../.patches/parallel-shared-hash-v11/0006-hj-let-node-have-seg-in-worker-v11.patch.log
patch -b -i ../.patches/parallel-shared-hash-v11/0007-hj-remove-buf-file-is-temp-v11.patch -p1 --verbose > ../.patches/parallel-shared-hash-v11/0007-hj-remove-buf-file-is-temp-v11.patch.log
patch -b -i ../.patches/parallel-shared-hash-v11/0008-hj-buf-file-set-v11.patch -p1 --verbose > ../.patches/parallel-shared-hash-v11/0008-hj-buf-file-set-v11.patch.log
patch -b -i ../.patches/parallel-shared-hash-v11/0009-hj-shared-tuplestore-v11.patch -p1 --verbose > ../.patches/parallel-shared-hash-v11/0009-hj-shared-tuplestore-v11.patch.log
patch -b -i ../.patches/parallel-shared-hash-v11/0010-hj-parallel-v11.patch -p1 --verbose > ../.patches/parallel-shared-hash-v11/0010-hj-parallel-v11.patch.log
Best Regards,
Oleg Golovanov
Moscow, Russia
Tuesday, April 4, 2017, 0:28 +03:00 from Thomas Munro <thomas.munro@enterprisedb.com>:
> Hi,
>
> On 2017-03-31 17:53:12 +1300, Thomas Munro wrote:
>> Thanks very much to Rafia for testing, and to Andres for his copious
>> review feedback. Here's a new version. Changes:
>
> I unfortunately think it's too late to get this into v10. There's still
> heavy development going on, several pieces changed quite noticeably
> since the start of the CF and there's still features missing. Hence I
> think this unfortunately has to be pushed - as much as I'd have liked to
> have this in 10.
>
> Do you agree?
Agreed.
Thank you very much Andres, Ashutosh, Peter, Rafia and Robert for all
the review, testing and discussion so far.
--
Thomas Munro
http://www.enterprisedb.com
On Thu, Apr 13, 2017 at 10:04 PM, Oleg Golovanov <rentech@mail.ru> wrote: > bash-4.2$ grep Hunk *.log | grep FAILED > 0005-hj-leader-gate-v11.patch.log:Hunk #1 FAILED at 14. > 0010-hj-parallel-v11.patch.log:Hunk #2 FAILED at 2850. > 0010-hj-parallel-v11.patch.log:Hunk #1 FAILED at 21. > 0010-hj-parallel-v11.patch.log:Hunk #3 FAILED at 622. > 0010-hj-parallel-v11.patch.log:Hunk #6 FAILED at 687. > 0010-hj-parallel-v11.patch.log:Hunk #1 FAILED at 21. > 0010-hj-parallel-v11.patch.log:Hunk #3 FAILED at 153. Hi Oleg Thanks for looking at this. It conflicted with commit 9c7f5229. Here is a rebased patch set. This version also removes some code for dealing with transient record types which didn't work out. I'm trying to deal with that problem separately[1] and in a general way so that the parallel hash join patch doesn't have to deal with it at all. [1] https://www.postgresql.org/message-id/CAEepm=0ZtQ-SpsgCyzzYpsXS6e=kZWqk3g5Ygn3MDV7A8dabUA@mail.gmail.com -- Thomas Munro http://www.enterprisedb.com -- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers
Thanks for the rebased patch set v12. Currently I am trying to use this patch set on my new test site and get the following:
Hmm... The next patch looks like a unified diff to me...
The text leading up to this was:
--------------------------
|diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
|index bdf15621c83..e9db8880161 100644
|--- a/src/include/access/parallel.h
|+++ b/src/include/access/parallel.h
--------------------------
patching file src/include/access/parallel.h
Using Plan A...
Hunk #1 FAILED at 58.
1 out of 1 hunk FAILED -- saving rejects to file src/include/access/parallel.h.rej
Can you update your patch set? The error comes from 0010-hj-parallel-v12.patch.
Best Regards,
Oleg Golovanov
Moscow, Russia
Thursday, April 13, 2017, 13:49 +03:00 from Thomas Munro <thomas.munro@enterprisedb.com>:
> bash-4.2$ grep Hunk *.log | grep FAILED
> 0005-hj-leader-gate-v11.patch.log:Hunk #1 FAILED at 14.
> 0010-hj-parallel-v11.patch.log:Hunk #2 FAILED at 2850.
> 0010-hj-parallel-v11.patch.log:Hunk #1 FAILED at 21.
> 0010-hj-parallel-v11.patch.log:Hunk #3 FAILED at 622.
> 0010-hj-parallel-v11.patch.log:Hunk #6 FAILED at 687.
> 0010-hj-parallel-v11.patch.log:Hunk #1 FAILED at 21.
> 0010-hj-parallel-v11.patch.log:Hunk #3 FAILED at 153.
Hi Oleg
Thanks for looking at this. It conflicted with commit 9c7f5229. Here
is a rebased patch set.
This version also removes some code for dealing with transient record
types which didn't work out. I'm trying to deal with that problem
separately[1] and in a general way so that the parallel hash join
patch doesn't have to deal with it at all.
[1] https://www.postgresql.org/message-id/CAEepm=0ZtQ-SpsgCyzzYpsXS6e=kZWqk3g5Ygn3MDV7A8dabUA@mail.gmail.com
--
Thomas Munro
http://www.enterprisedb.com
On Thu, Apr 27, 2017 at 5:13 AM, Oleg Golovanov <rentech@mail.ru> wrote: > Can you actualize your patch set? The error got from > 0010-hj-parallel-v12.patch. I really should get around to setting up a cron job to tell me about that. Here's a rebased version. The things currently on my list for this patch are: 1. Implement the skew optimisation. 2. Consider Andres's suggestion of splitting MultiExecHash into two functions, serial and parallel version, rather than having all those conditional blocks in there. 3. Figure out whether the shared BufFile stuff I propose would work well for Peter Geoghegan's parallel tuple sort patch, by trying it (I've made a start, more soon). 4. Figure out how the costing model needs to be tweaked, probably based on experimentation. I'm taking a short break to work on other things right now but will post a version with those changes soon. -- Thomas Munro http://www.enterprisedb.com
On Thu, Apr 27, 2017 at 11:03 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > On Thu, Apr 27, 2017 at 5:13 AM, Oleg Golovanov <rentech@mail.ru> wrote: >> Can you actualize your patch set? The error got from >> 0010-hj-parallel-v12.patch. > > I really should get around to setting up a cron job to tell me about > that. Here's a rebased version. Rebased. -- Thomas Munro http://www.enterprisedb.com
On Mon, May 22, 2017 at 6:39 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > On Thu, Apr 27, 2017 at 11:03 AM, Thomas Munro > <thomas.munro@enterprisedb.com> wrote: >> On Thu, Apr 27, 2017 at 5:13 AM, Oleg Golovanov <rentech@mail.ru> wrote: >>> Can you actualize your patch set? The error got from >>> 0010-hj-parallel-v12.patch. >> >> I really should get around to setting up a cron job to tell me about >> that. Here's a rebased version. > > Rebased. Rebased for the recent re-indent and shm_toc API change; no functional changes in this version. (I have a new patch set in the pipeline adding the skew optimisation and some other things, more on that soon.) -- Thomas Munro http://www.enterprisedb.com
On Wed, Jun 28, 2017 at 9:58 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > Rebased for the recent re-indent and shm_toc API change; no functional > changes in this version. > > (I have a new patch set in the pipeline adding the skew optimisation > and some other things, more on that soon.) This patch does not apply. And the thread has stalled for three months now, but I cannot see a review for what has been submitted. I am moving it to the next CF as waiting on author. Please provide a rebased version. If there are other threads on this topic, it would be nice to link them to the existing CF entry https://commitfest.postgresql.org/15/871/. -- Michael
On Thu, Nov 30, 2017 at 2:20 PM, Michael Paquier <michael.paquier@gmail.com> wrote: > This patch does not apply. And the thread has stalled for three months > now but I cannot see a review for what has been submitted. I am moving > it to next CF with waiting on author. Please provide a rebased > version. If there are other threads on this topic, it would be nice to > link them to the existing CF entry > https://commitfest.postgresql.org/15/871/.. Thanks. There is in fact a second thread with updated patches and current discussion, and it is listed in the CF entry. It would be nice if the CF could show more clearly which thread is 'active' (for the benefit of humans and also robots), and list any others as archived/old/history. -- Thomas Munro http://www.enterprisedb.com