Thread: WIP: Barriers
Hi hackers,

I would like to propose "barriers" for Postgres processes. A barrier is a very simple mechanism for coordinating parallel computation, as found in many threading libraries.

First, you initialise a Barrier object somewhere in shared memory, most likely in the DSM segment used by parallel query, by calling BarrierInit(&barrier, nworkers). Then workers can call BarrierWait(&barrier) when they want to block until all workers arrive at the barrier. When the final worker arrives, BarrierWait returns in all workers, releasing them to continue their work. One arbitrary worker receives a different return value as a way of "electing" it to perform serial phases of computation. For parallel phases of computation, the return value can be ignored. For example, there may be preparation, merging, or post-processing phases which must be done by just one worker, interspersed with phases where all workers do something.

My use case for this is coordinating the phases of parallel hash joins, but I strongly suspect there are other cases. Parallel sort springs to mind, which is why I wanted to post this separately and earlier than my larger patch series, to get feedback from people working on other parallel features.

A problem that I'm still grappling with is how to deal with workers that fail to launch. What I'm proposing so far is based on static worker sets, where you can only give the number of workers at initialisation time, just like pthread_barrier_init. Some other libraries allow for adjustable worker sets, and I'm wondering if a parallel leader might need to be able to adjust the barrier when it hears of a worker not starting. More on that soon.

Please see the attached WIP patch. I had an earlier version with its own waitlists and signalling machinery etc, but I've now rebased it to depend on Robert Haas's proposed condition variables, making this code much shorter and sweeter. So it depends on his condition-variable-vX.patch[1], which in turn depends on my lwlocks-in-dsm-vX.patch[2] (for proclist).

When Michaël Paquier's work on naming wait points[3] lands, I plan to include event IDs as an extra argument to BarrierWait which will be passed through so as to show up in pg_stat_activity. Then you'll be able to see where workers are waiting for each other! For now I didn't want to tangle this up with yet another patch.

I thought about using a different name to avoid colliding with barrier.h and overloading the term: there are of course also compiler barriers and memory barriers. But then I realised that that header was basically vacant real estate, and 'barrier' is the super-well established standard term for this parallel computing primitive.

I'd be grateful for any thoughts, feedback, flames etc.

[1] https://www.postgresql.org/message-id/flat/CA%2BTgmoaj2aPti0yho7FeEf2qt-JgQPRWb0gci_o1Hfr%3DC56Xng%40mail.gmail.com
[2] https://www.postgresql.org/message-id/flat/CAEepm%3D0Vvr9zgwHt67RwuTfwMEby1GiGptBk3xFPDbbgEtZgMg%40mail.gmail.com
[3] https://www.postgresql.org/message-id/flat/CAB7nPqTGhFOUHag1eJrvsKn8-E5fpqvhM7aL0tAfsDzjQG_YKQ@mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com
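[To make the intended usage concrete, here is a minimal sketch of the pattern described above. The do_* functions and the "shared" struct are hypothetical placeholders, and the elected worker is assumed to be the one that sees a distinct return value, per the description.]

    /* Once, in the leader, with the Barrier in the parallel DSM segment: */
    BarrierInit(&shared->barrier, nworkers);

    /* In every worker: */
    do_parallel_build(shared);              /* parallel phase */
    if (BarrierWait(&shared->barrier))
        do_serial_merge(shared);            /* one elected worker only */
    BarrierWait(&shared->barrier);          /* others wait for the merge */
    do_parallel_probe(shared);              /* parallel phase */

Note that a second BarrierWait is needed after the serial phase: the first one releases all workers simultaneously, so the non-elected workers must wait again until the elected worker has finished its serial work.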
Attachment
Hi Thomas,

Barriers are a really simple and convenient mechanism for process synchronization. But a barrier is actually a special case of a semaphore: given a semaphore primitive, it is trivial to implement a barrier. We have semaphores in Postgres, but ... they cannot be used by extensions: a fixed number of semaphores is allocated based on the maximal number of connections, and there is no mechanism for requesting additional semaphores.

Robert has recently proposed condition variables, which are also very useful. Right now we have spinlocks, LW-locks and latches. From my point of view that is not enough. While creating various extensions for Postgres I have always felt the lack of synchronization primitives such as events (condition variables) and semaphores. Events and semaphores are similar, and it is possible to implement either of them on top of the other. But from the user's point of view they have different semantics and use cases, so it is better to provide both of them.

I wonder if we should provide a system abstraction layer (SAL) for Postgres, in which we could isolate all system-dependent code and which would be available to core developers as well as to developers of extensions. The obvious candidates for SAL are:

1. Synchronization primitives (locks, events, semaphores, mutexes, spinlocks, latches, barriers)
2. Shared memory
3. File access
4. Network sockets
5. Process control (fork, ...)

Certainly it requires a lot of refactoring, but it would make the Postgres code much more elegant and easier to read and maintain. Also it is not necessary to make all these changes in one step: here we do not need atomic transactions :) We can start, for example, with the synchronization primitives, since a lot of changes are being proposed in that area in any case.

Parallel execution is one of the most promising approaches to improving Postgres performance. I do not mean just parallel execution of a single query: parallel vacuum, parallel index creation, parallel sort, ... And to implement all this stuff we definitely need convenient and efficient synchronization primitives. The set of such primitives can be discussed. IMHO it should include RW-locks (the current LW-locks), mutexes (a spinlock plus some mechanism to wait), events (condition variables), semaphores and barriers (based on semaphores). Latches can be left for backward compatibility or be replaced with events.

I wonder if somebody has measured how much slower latches (signal+socket) are than POSIX semaphores or condition variables?

On Aug 14, 2016, at 2:18 AM, Thomas Munro wrote:
> [...]
On Sun, Aug 14, 2016 at 6:54 PM, konstantin knizhnik <k.knizhnik@postgrespro.ru> wrote:
> Barriers are a really simple and convenient mechanism for process synchronization.
> But a barrier is actually a special case of a semaphore: given a semaphore primitive, it is trivial to implement a barrier.
> We have semaphores in Postgres, but ... they cannot be used by extensions: a fixed number of semaphores is allocated based on the maximal number of connections, and there is no mechanism for requesting additional semaphores.

Probably because they are kernel objects requiring extra resource management. I'm hoping for something that can be created dynamically in DSM segments with no cleanup, and that aspect of semaphores is problematic.

> [...] I wonder if somebody has measured how much slower latches (signal+socket) are than POSIX semaphores or condition variables?

I'm not sure if it makes sense for us to use POSIX condition variables: they require using POSIX mutexes, and we're pretty heavily invested in the use of lwlocks that are hooked into our error handling system, and spinlocks.

I'd be curious to know if you can make a better barrier with semaphores. I've attached a little test module which can be used to measure the time for N processes to synchronise at M barrier wait points. You can run with SELECT barrier_test(<nworkers>, <nbarriers>, <implementation>), where implementation 0 uses the barrier patch I posted and you can add another implementation as 1. This patch requires lwlocks-in-dsm-v3.patch, condition-variable-v2.patch, barrier-v1.patch, in that order.

This implementation is using a spinlock for the arrival counter, and signals (via Robert's condition variables and latches) for waking up peer processes when the counter reaches the target. I realise that using signals for this sort of thing is a bit unusual outside the Postgres universe, but won't a semaphore-based implementation require just as many system calls, context switches and scheduling operations?

--
Thomas Munro
http://www.enterprisedb.com
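[For reference, the heart of such an implementation reduces to roughly the following. This is a simplified sketch based on the description above, not the patch's actual code; the Barrier field names are invented, and the condition-variable calls follow Robert's proposed API, whose later committed form also takes a wait-event argument.]

    bool
    BarrierWait(Barrier *barrier)
    {
        bool    last;
        int     generation;

        SpinLockAcquire(&barrier->mutex);
        generation = barrier->generation;
        last = (++barrier->arrived == barrier->nworkers);
        if (last)
        {
            barrier->arrived = 0;       /* reset for the next wait point */
            barrier->generation++;      /* release the current generation */
        }
        SpinLockRelease(&barrier->mutex);

        if (last)
            ConditionVariableBroadcast(&barrier->cv);   /* wake all peers */
        else
        {
            ConditionVariablePrepareToSleep(&barrier->cv);
            for (;;)
            {
                bool    released;

                SpinLockAcquire(&barrier->mutex);
                released = (barrier->generation != generation);
                SpinLockRelease(&barrier->mutex);
                if (released)
                    break;
                ConditionVariableSleep(&barrier->cv);   /* sleeps on our latch */
            }
            ConditionVariableCancelSleep();
        }
        return last;    /* the last worker to arrive is "elected" */
    }

The generation counter makes the wait robust against spurious wakeups and lets the same Barrier be reused for the next synchronisation point.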
Attachment
On Sat, Aug 13, 2016 at 7:18 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> I would like to propose "barriers" for Postgres processes. A barrier
> is a very simple mechanism for coordinating parallel computation, as
> found in many threading libraries.
>
> First, you initialise a Barrier object somewhere in shared memory,
> most likely in the DSM segment used by parallel query, by calling
> BarrierInit(&barrier, nworkers). Then workers can call
> BarrierWait(&barrier) when they want to block until all workers arrive
> at the barrier. When the final worker arrives, BarrierWait returns in
> all workers, releasing them to continue their work. One arbitrary
> worker receives a different return value as a way of "electing" it to
> perform serial phases of computation. For parallel phases of
> computation, the return value can be ignored. For example, there may
> be preparation, merging, or post-processing phases which must be done
> by just one worker, interspersed with phases where all workers do
> something.
>
> My use case for this is coordinating the phases of parallel hash
> joins, but I strongly suspect there are other cases. Parallel sort
> springs to mind, which is why I wanted to post this separately and
> earlier than my larger patch series, to get feedback from people
> working on other parallel features.

I was thinking about this over the weekend and I started to wonder whether this is really going to work well for hash joins. For example, suppose that 6GB of work_mem is available and the projected size of the hash table is 8GB. Clearly, we're going to need 2 batches, but, if our estimates are accurate and the tuples split evenly between batches, each batch will be only 4GB! That means that we can build up to 2GB of the hash table for the next batch before we've finished with the hash table for the previous batch. It seems like a really good idea to try to take advantage of that as much as possible.

The simple version of this is that when a worker gets done with its own probe phase for batch X, it can immediately start building the hash table for phase X+1, stopping if it fills up the unused portion of work_mem before the old hash table goes away. Of course, there are some tricky issues with reading tapes that were originally created by other backends, but if I understand correctly, Peter Geoghegan has already done some work on that problem, and it seems like something we can eventually solve, even if not in the first version.

The more complicated version of this is that we might want to delegate one or more workers to start building as much of the next-batch hash table as will fit instead of assisting with the current probe phase. Once work_mem is full, they join the probe phase and continue until it's done. Again, tape management is an issue. But you can see that if you can make this work, in this example, you can reduce the enforced pause between batches by about 50%; half the work is already done by the time the old hash table goes away. I bet that has a chance of being fairly significant, especially for hash joins that have tons of batches. I once saw a 64-batch hash join outperform a nested loop with inner index scan!

Anyway, my point here is that I'm not sure whether the barrier mechanic is going to work well for computations with overlapping phases, and I suspect that overlapping phases is going to be an important technique, so we should make sure not to settle into a synchronization model that makes it hard.
> A problem that I'm still grappling with is how to deal with workers
> that fail to launch. What I'm proposing so far is based on static
> worker sets, where you can only give the number of workers at
> initialisation time, just like pthread_barrier_init. Some other
> libraries allow for adjustable worker sets, and I'm wondering if a
> parallel leader might need to be able to adjust the barrier when it
> hears of a worker not starting. More on that soon.

I think tying this into the number of workers for the parallel context in general is going to get you into trouble. For example, suppose that we have an Append plan and beneath that we have two children, each of which is a Hash Join. Right now, Append is dumb, so it will blindly throw all of the workers at the first Hash Join and then, as they emerge, it will throw them at the second one. However, we've had previous discussions, which I'm too lazy to look up right now, about making a Parallel Append that would split the workers between the two hash joins; if one of them finished, then those workers could go join the other hash join in medias res. Now, in this world, it's clearly very bad if each hash join waits for "all of the workers" to finish a given phase before beginning the next phase. In fact, it's probably going to result in both hash joins hanging, followed by everybody waiting for each other forever.

The actual condition that must be verified is that there are no workers which are going to keep trying to probe the old hash table after we blow it up. In other words, we need to verify that every outer tuple for the current batch has been joined. I'm not sure how tight we want to make the accounting, but consider the following plan:

Hash Join
  -> Parallel Seq Scan on a
  -> Hash
    -> Seq Scan on b

Whenever a worker first pulls a tuple from a, it claims an entire page of tuples from the scan. A hazard then exists until that backend has pulled every tuple from that page and joined them all. There is then no hazard - for that backend - until it pulls the first tuple from the next page.

A sort of dumb way of handling all this is to assume that once a worker joins the hash join, it won't go off and do anything else until the hash join is done. Under that assumption, you just need some sort of BarrierAttach() operation; workers that have never attached the barrier aren't participating in the hash join at all and so they are irrelevant - and now you know how many workers you need to await, because you can keep a count of how many have attached. Perhaps you simply turn away any workers that arrive after batch 0 is complete.

That approach is not entirely satisfying, though. As discussed on the thread about asynchronous and vectorized execution, it's desirable that, when a particular branch of a parallel query can't use any more workers - e.g. because they are all waiting for the next phase to begin - those workers can leave and go do something else. Contrariwise, if some workers are busy with another branch of the query tree and they finish all the work over there, it's desirable for those workers to be able to come join our branch of the query tree to help out. So it seems like it would be nicer here to be precise about the bookkeeping: (1) track the number of workers that actually have a hazard for this portion of the query tree right now and (2) be prepared for a future day in which we will wish to allow a worker which has no such active hazard to depart the query tree for a time or indefinitely.
The fly in the ointment, of course, is that the existence of the hazard depends on state we don't readily have access to at the point where we want to make the decision... and I don't have a good idea how to solve that problem. We probably want to do something as simple as possible for now, but at the same time try not to make future improvements along these lines any more difficult than necessary.

> I thought about using a different name to avoid colliding with
> barrier.h and overloading the term: there are of course also compiler
> barriers and memory barriers. But then I realised that that header
> was basically vacant real estate, and 'barrier' is the super-well
> established standard term for this parallel computing primitive.

If we're going to remove barrier.h, I think that should be a separate commit from creating a new barrier.h.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On 15.08.2016 15:42, Thomas Munro wrote:
> This implementation is using a spinlock for the arrival counter, and
> signals (via Robert's condition variables and latches) for waking up
> peer processes when the counter reaches the target. I realise that
> using signals for this sort of thing is a bit unusual outside the
> Postgres universe, but won't a semaphore-based implementation require
> just as many system calls, context switches and scheduling operations?

Yes, you are right. I never expected that this combination of signal + local socket + select could provide performance comparable with pthread_cond_t. I have implemented a simple test where two background workers emulate a request-response round trip using latches and pthread primitives. The result (average round-trip time) was 7.49 microseconds for Postgres latches vs. 4.59 microseconds for pthread_cond_timedwait.

#define N_ROUNDTRIPS 1000000
#define WAIT_LATCH_TIMEOUT 60000

static void PongLatch(Datum arg)
{
    int i;
    timestamp_t start;
    int result;

    BackgroundWorkerUnblockSignals();
    Mtm->pong = MyProc->pgprocno;
    ResetLatch(&MyProc->procLatch);
    MtmSleep(1000000);
    Assert(Mtm->ping);

    for (i = 0; i <= N_ROUNDTRIPS; i++)
    {
        result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, WAIT_LATCH_TIMEOUT);
        Assert(result & WL_LATCH_SET);
        ResetLatch(&MyProc->procLatch);
        SetLatch(&ProcGlobal->allProcs[Mtm->ping].procLatch);
        if (i == 0)
            start = MtmGetSystemTime();
    }
    fprintf(stderr, "Average roundtrip time: %f microseconds\n",
            (double)(MtmGetSystemTime() - start) / N_ROUNDTRIPS);
}

static void PingLatch(Datum arg)
{
    int i;
    timestamp_t start;
    int result;

    BackgroundWorkerUnblockSignals();
    Mtm->ping = MyProc->pgprocno;
    ResetLatch(&MyProc->procLatch);
    MtmSleep(1000000);
    Assert(Mtm->pong);

    for (i = 0; i <= N_ROUNDTRIPS; i++)
    {
        SetLatch(&ProcGlobal->allProcs[Mtm->pong].procLatch);
        result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, WAIT_LATCH_TIMEOUT);
        Assert(result & WL_LATCH_SET);
        ResetLatch(&MyProc->procLatch);
        if (i == 0)
            start = MtmGetSystemTime();
    }
    fprintf(stderr, "Average roundtrip time: %f microseconds\n",
            (double)(MtmGetSystemTime() - start) / N_ROUNDTRIPS);
}

static BackgroundWorker Pinger = {
    "ping",
    BGWORKER_SHMEM_ACCESS, // | BGWORKER_BACKEND_DATABASE_CONNECTION,
    BgWorkerStart_ConsistentState,
    BGW_NEVER_RESTART,
    PingLatch
};

static BackgroundWorker Ponger = {
    "pong",
    BGWORKER_SHMEM_ACCESS, // | BGWORKER_BACKEND_DATABASE_CONNECTION,
    BgWorkerStart_ConsistentState,
    BGW_NEVER_RESTART,
    PongLatch
};

static void PingPong()
{
    RegisterBackgroundWorker(&Pinger);
    RegisterBackgroundWorker(&Ponger);
}

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
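[For comparison, the pthread side of such a ping-pong presumably boils down to something like the following. This is a reconstruction, not Konstantin's actual pthread test; his version used pthread_cond_timedwait rather than pthread_cond_wait.]

    #include <pthread.h>

    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    static int turn;    /* whose move it is: 0 (ping) or 1 (pong) */

    /* One round trip as seen from thread "me": pass the ball to the peer,
     * then sleep until it comes back. Each side calls this in a loop. */
    static void
    roundtrip(int me)
    {
        pthread_mutex_lock(&mutex);
        turn = !me;                     /* hand over to the peer */
        pthread_cond_signal(&cond);     /* wake it */
        while (turn != me)              /* wait for our turn again */
            pthread_cond_wait(&cond, &mutex);
        pthread_mutex_unlock(&mutex);
    }

The measured gap (7.49 vs. 4.59 microseconds) then reflects the extra cost of the signal + socket + select machinery behind SetLatch/WaitLatch relative to a condition variable, which is typically futex-based on Linux.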
On Mon, Aug 15, 2016 at 6:55 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> The simple version of this is that when a worker gets done with its
> own probe phase for batch X, it can immediately start building the
> hash table for phase X+1, stopping if it fills up the unused portion
> of work_mem before the old hash table goes away. Of course, there are
> some tricky issues with reading tapes that were originally created by
> other backends, but if I understand correctly, Peter Geoghegan has
> already done some work on that problem, and it seems like something we
> can eventually solve, even if not in the first version.

The tape vs. BufFile vs. fd.c file handle distinctions get *confusing*. Thomas and I have hashed this out (pun intended), but I should summarize.

Currently, and without bringing parallelism into it, hash joins have multiple BufFiles (two per batch -- innerBatchFile and outerBatchFile), which are accessed as needed. External sorts have only one BufFile, with multiple "logical tapes" within a single "tapeset" effectively owning space within the BufFile -- that space doesn't have to be contiguous, and can be reused *eagerly* within and across logical tapes in tuplesort.c's tapeset. logtape.c is a kind of block-orientated rudimentary filesystem built on top of one BufFile. The only real advantage of having the logtape.c abstraction is that moving stuff around (to sort it, when multiple passes are required) can be accomplished with minimal wasted disk space (it's eagerly reclaimed). This is less important today than it would have been in the past.

Clearly, it doesn't make much sense to talk about logtape.c and anything that isn't sorting, because it is very clearly written with that purpose alone in mind. To avoid confusion, please only talk about tapes when talking about sorting. So:

* tuplesort.c always talks to logtape.c, which talks to buffile.c (which talks to fd.c).

* Hash joins use buffile.c directly, though (and have multiple buffiles, as already noted).

Now, I might still have something that Thomas can reuse, because buffile.c was made to support "unification" of worker BufFiles in general. Thomas would be using that interface, if any. I haven't studied parallel hash join at all, but presumably the difference would be that *multiple* BufFiles would be unified, such that a concatenated/unified BufFile would be addressable within each worker, one per batch.

All of this assumes that there is a natural way of unifying the various batches involved across all workers, of course. This aspect would present some complexity for Thomas, I think (comments from hashjoin.h):

 * It is possible to increase nbatch on the fly if the in-memory hash table
 * gets too big. The hash-value-to-batch computation is arranged so that this
 * can only cause a tuple to go into a later batch than previously thought,
 * never into an earlier batch. When we increase nbatch, we rescan the hash
 * table and dump out any tuples that are now of a later batch to the correct
 * inner batch file. Subsequently, while reading either inner or outer batch
 * files, we might find tuples that no longer belong to the current batch;
 * if so, we just dump them out to the correct batch file.

I'd be concerned about managing which backend was entitled to move tuples across batches, and so on. One thing that I haven't had to contend with is which backend "owns" which BufFile (or underlying fd.c file handles). There is no ambiguity about that for me.
Owners delete the temp files on Xact end, and are the only ones entitled to write to files, and only before unification. These latter restrictions might be lifted if there was a good reason to do so. -- Peter Geoghegan
On Mon, Aug 15, 2016 at 6:55 AM, Robert Haas <robertmhaas@gmail.com> wrote: > A sort of dumb way of handling all this is to assume that once a > worker joins the hash join, it won't go off and do anything else until > the hash join is done. Under that assumption, you just need some sort > of BarrierAttach() operation; workers that have never attached the > barrier aren't participating in the hash join at all and so they are > irrelevant - and now you know how many workers you need to await, > because you can keep a count how many have attached. Perhaps you > simply turn away any workers that arrive after batch 0 is complete. Is that really so bad? In general, I don't tend to think of workers as the cost to worry about. Rather, we should be concerned about the active use of CPU cores as our major cost. -- Peter Geoghegan
On Sat, Aug 13, 2016 at 4:18 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > First, you initialise a Barrier object somewhere in shared memory, > most likely in the DSM segment used by parallel query, by calling > BarrierInit(&barrier, nworkers). Then workers can call > BarrierWait(&barrier) when they want to block until all workers arrive > at the barrier. When the final worker arrives, BarrierWait returns in > all workers, releasing them to continue their work. One arbitrary > worker receives a different return value as a way of "electing" it to > perform serial phases of computation. For parallel phases of > computation, the return value can be ignored. For example, there may > be preparation, merging, or post-processing phases which must be done > by just one worker, interspersed with phases where all workers do > something. I think that this mechanism could be quite useful for sorting with partitioning, which doesn't exist yet. What does exist is unlikely to benefit from this over and above what Robert's "condition variables" offer, because as it happens there is no need to "elect" a single worker at all. The ordering dependencies happen to be quite naturally across one leader process and one or more worker processes. I do see value in this, though. -- Peter Geoghegan
On Tue, Aug 16, 2016 at 1:55 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> On Sat, Aug 13, 2016 at 7:18 PM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> My use case for this is coordinating the phases of parallel hash
>> joins, but I strongly suspect there are other cases. Parallel sort
>> springs to mind, which is why I wanted to post this separately and
>> earlier than my larger patch series, to get feedback from people
>> working on other parallel features.
>
> I was thinking about this over the weekend and I started to wonder
> whether this is really going to work well for hash joins. For
> example, suppose that 6GB of work_mem is available and the projected
> size of the hash table is 8GB. Clearly, we're going to need 2
> batches, but, if our estimates are accurate and the tuples split
> evenly between batches, each batch will be only 4GB! That means that
> we can build up to 2GB of the hash table for the next batch before
> we've finished with the hash table for the previous batch. It seems
> like a really good idea to try to take advantage of that as much as
> possible.

Right. Thanks for that example. A naive approach with barriers between building and probing does indeed introduce a kind of pipeline stall where workers twiddle their thumbs, despite there being unused work_mem and useful things that could be done concurrently to fill it.

> The simple version of this is that when a worker gets done with its
> own probe phase for batch X, it can immediately start building the
> hash table for phase X+1, stopping if it fills up the unused portion
> of work_mem before the old hash table goes away. Of course, there are
> some tricky issues with reading tapes that were originally created by
> other backends, but if I understand correctly, Peter Geoghegan has
> already done some work on that problem, and it seems like something we
> can eventually solve, even if not in the first version.
>
> The more complicated version of this is that we might want to delegate
> one or more workers to start building as much of the next-batch hash
> table as will fit instead of assisting with the current probe phase.
> Once work_mem is full, they join the probe phase and continue until
> it's done. Again, tape management is an issue. But you can see that
> if you can make this work, in this example, you can reduce the
> enforced pause between batches by about 50%; half the work is already
> done by the time the old hash table goes away. I bet that has a
> chance of being fairly significant, especially for hash joins that
> have tons of batches. I once saw a 64-batch hash join outperform a
> nested loop with inner index scan!
>
> Anyway, my point here is that I'm not sure whether the barrier
> mechanic is going to work well for computations with overlapping
> phases, and I suspect that overlapping phases is going to be an
> important technique, so we should make sure not to settle into a
> synchronization model that makes it hard.

I have a draft scheme worked out to do what you called the "simple version", though not in the first version I will post (the "naive version"). I don't really want to go into all the details in this thread, not least because I'm still working on it, but I do want to point out that barriers as a synchronisation primitive are not at all incompatible with overlapping work: on the contrary, they can help coordinate it while keeping the code tractable.

Rough sketch*: There will be a secondary hash table, used for preloading the next batch.
As in the naive version, there is a barrier after the hash table is loaded: you can't start probing an incomplete hash table. But after probing the primary table for batch n, workers will immediately begin preloading the secondary table with tuples for batch n + 1, until either work_mem is exhausted or batch n + 1 is entirely loaded. Then they will synchronise, elect one worker to promote the secondary table to primary, synchronise again, finish loading the newly promoted primary table, and then rinse and repeat: synchronise to coordinate the beginning of probing for batch n + 1... Of course there are also several other phases relating to initialization, hashing, resizing and special cases for first and last batches, as I will describe in a future thread with code.

*Actual patch may not resemble illustration.

>> A problem that I'm still grappling with is how to deal with workers
>> that fail to launch. What I'm proposing so far is based on static
>> worker sets, where you can only give the number of workers at
>> initialisation time, just like pthread_barrier_init. Some other
>> libraries allow for adjustable worker sets, and I'm wondering if a
>> parallel leader might need to be able to adjust the barrier when it
>> hears of a worker not starting. More on that soon.
>
> I think tying this into the number of workers for the parallel context
> in general is going to get you into trouble. For example, suppose
> that we have an Append plan and beneath that we have two children,
> each of which is a Hash Join. Right now, Append is dumb, so it will
> blindly throw all of the workers at the first Hash Join and then, as
> they emerge, it will throw them at the second one. However, we've had
> previous discussions, which I'm too lazy to look up right now, about
> making a Parallel Append that would split the workers between the two
> hash joins; if one of them finished, then those workers could go join
> the other hash join in medias res.

Yeah. I continued grappling with this and found a way forward that I am quite happy with for now. Picture an N-lane Olympic swimming pool with N identical swimmers at the starting blocks waiting for the gun to fire, with a rule that they must wait for all swimmers to reach the end before the next heat of the race begins. That's what textbook parallel computing of the type supported by pthread_barrier_t looks like, and what barrier-v1.patch provided. Postgres parallel query is not at all like that: it's more like a flash mob pool party. The referee fires the starting gun, and some number of swimmers show up some time soonish, or not, and they jump in anywhere they like; the referee (the backend executing the gather node) may get bored and decide to dive-bomb into the middle of it all too. In future, the executor may send in new swimmers at random by parachute! OK, sorry for the tortured analogy... but the point is that the size of the party may change at any time and new workers need to synchronise with the active phase.

Here's a new version of barriers extended to allow for dynamic worker parties. It's something like .NET's System.Threading.Barrier, Java's java.util.concurrent.Phaser and X10's clock, so I have at least a small amount of confidence that it's not completely crackpot. This addition allows new workers to arrive at any time and get in sync with the ongoing work: for example, if the gather node feels the impulse to jump in, it will join the current activity and pitch in.
There are some complexities I'm still working out: it would be nice to figure out how to remain attached to the barrier if I know for certain that the caller will call again (basically an ExecutorRun(..., ..., 0) loop, as opposed to a one-off call from a gather node).

> [...]
> That approach is not entirely satisfying, though. As discussed on the
> thread about asynchronous and vectorized execution, it's desirable
> that, when a particular branch of a parallel query can't use any more
> workers - e.g. because they are all waiting for the next phase to
> begin - those workers can leave and go do something else.
> [...]

I believe my current approach is not incompatible with those future work scheduling ideas. The key requirement seems to be the ability to detach and walk away without holding anyone else up, and reattach and help out with whatever is currently going on as appropriate, while synchronising work as appropriate. From the point of view of an executor node, there are basically two things you need to support a user space coroutine/green thread/fibre/event style scheduler as I see it: a way to yield and a way to continue from the right point, and the latter is already covered. Whatever yield-like operations are invented in future will hopefully just slot into the right places, without changing the underlying parallel hash join algorithm.

>> I thought about using a different name to avoid colliding with
>> barrier.h and overloading the term: there are of course also compiler
>> barriers and memory barriers. But then I realised that that header
>> was basically vacant real estate, and 'barrier' is the super-well
>> established standard term for this parallel computing primitive.
>
> If we're going to remove barrier.h, I think that should be a separate
> commit from creating a new barrier.h.

OK. Here's a patch to remove the old header, and the v2 barrier patch which adds phases and attach/detach. As before, it depends on condition-variable-v2.patch.

--
Thomas Munro
http://www.enterprisedb.com
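[To illustrate how the dynamic version is meant to be used, here is a rough per-worker sketch of the batch loop described above. Everything except the Barrier* calls is an invented placeholder, and the exact v2 function names may differ from this sketch.]

    /* Join the party whenever we happen to start up; the returned phase
     * tells a late arriver which step the others are currently on (a real
     * implementation would switch on it to jump in at the right step). */
    int phase = BarrierAttach(&shared->barrier);

    while (!all_batches_done(shared))
    {
        probe_primary_table(shared);        /* probe batch n */
        preload_secondary_table(shared);    /* load batch n + 1 until done
                                             * or work_mem is exhausted */
        if (BarrierWait(&shared->barrier))
            promote_secondary_table(shared);    /* one elected worker */
        BarrierWait(&shared->barrier);      /* promotion visible to all */
        finish_loading_primary_table(shared);
        BarrierWait(&shared->barrier);      /* can't probe a partial table */
    }
    BarrierDetach(&shared->barrier);        /* leave without blocking peers */

The detach operation is what makes the flash-mob model work: a departing worker stops being counted, so the remaining participants can still advance through the phases without waiting for it.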
Attachment
On Thu, Aug 18, 2016 at 1:55 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote: > On Tue, Aug 16, 2016 at 1:55 AM, Robert Haas <robertmhaas@gmail.com> wrote: >> If we're going to remove barrier.h, I think that should be a separate >> commit from creating a new barrier.h. > > OK. Here's a patch to remove the old header, and the v2 barrier patch > which adds phases and attach/detach. As before, it depends on > condition-variable-v2.patch. Here's a new version which is rebased and adds support for passing wait_event through to pg_stat_activity. -- Thomas Munro http://www.enterprisedb.com
Attachment
On Tue, Nov 1, 2016 at 5:03 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> On Thu, Aug 18, 2016 at 1:55 PM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> On Tue, Aug 16, 2016 at 1:55 AM, Robert Haas <robertmhaas@gmail.com> wrote:
>>> If we're going to remove barrier.h, I think that should be a separate
>>> commit from creating a new barrier.h.
>>
>> OK. Here's a patch to remove the old header, and the v2 barrier patch
>> which adds phases and attach/detach. As before, it depends on
>> condition-variable-v2.patch.
>
> Here's a new version which is rebased and adds support for passing
> wait_event through to pg_stat_activity.

Here's a version updated for the new condition variables interface which has just been committed as e8ac886c. Some comments improved.

--
Thomas Munro
http://www.enterprisedb.com
Attachment
On Tue, Nov 22, 2016 at 4:42 PM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> On Tue, Nov 1, 2016 at 5:03 PM, Thomas Munro
>> Here's a new version which is rebased and adds support for passing
>> wait_event through to pg_stat_activity.
>
> Here's a version updated for the new condition variables interface
> which has just been committed as e8ac886c. Some comments improved.

The code here looks OK. A few thoughts:

- I'm a little unsure whether it's a good idea to remove the existing barrier.h and immediately add a new barrier.h that does something totally different. It's tempting to try to find another name for these things just to avoid that. Then again, I bet there's not much non-core code that is relying on the existing barrier.h, so maybe it's OK. On the third hand, the risk of confusing developers may be non-nil even if we don't confuse any code. I think I'll go commit remove-useless-barrier-v4.patch now and see if anyone shows up to complain...

- Should parallel bitmap heap scan be using this to decide who runs the subplan? It looks like it would work for what is needed in that case, and it looks like it would be simpler (and probably more correct) than what's there now. PBMState could go away completely, I think.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
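[For what it's worth, the election idiom being suggested for parallel bitmap heap scan might look something like this. This is a sketch with invented names, glossing over the late-attachment and failed-launch subtleties discussed upthread.]

    /* Every participating worker arrives here before scanning. */
    if (BarrierWait(&pstate->barrier))
    {
        /* Exactly one worker is elected to execute the bitmap subplan. */
        pstate->tbm = build_shared_tidbitmap(node);
    }
    BarrierWait(&pstate->barrier);      /* everyone waits for the bitmap */
    scan_using_shared_tidbitmap(node);  /* all workers proceed in parallel */

Two barrier waits would then stand in for the hand-rolled PBMState state machine, which appears to be the point of the suggestion.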
On Wed, Nov 23, 2016 at 2:28 PM, Robert Haas <robertmhaas@gmail.com> wrote:
> The code here looks OK. A few thoughts:
>
> - I'm a little unsure whether it's a good idea to remove the existing
> barrier.h and immediately add a new barrier.h that does something
> totally different. It's tempting to try to find another name for
> these things just to avoid that. Then again, I bet there's not much
> non-core code that is relying on the existing barrier.h, so maybe it's
> OK. On the third hand, the risk of confusing developers may be
> non-nil even if we don't confuse any code. I think I'll go commit
> remove-useless-barrier-v4.patch now and see if anyone shows up to
> complain...

Here is a new version with updated copyright messages and some dtrace probes which provide an easy way to measure the time spent waiting at barriers.

--
Thomas Munro
http://www.enterprisedb.com