Thread: WIP: Barriers

WIP: Barriers

From
Thomas Munro
Date:
Hi hackers,

I would like to propose "barriers" for Postgres processes.  A barrier
is a very simple mechanism for coordinating parallel computation, as
found in many threading libraries.

First, you initialise a Barrier object somewhere in shared memory,
most likely in the DSM segment used by parallel query, by calling
BarrierInit(&barrier, nworkers).  Then workers can call
BarrierWait(&barrier) when they want to block until all workers arrive
at the barrier.  When the final worker arrives, BarrierWait returns in
all workers, releasing them to continue their work.  One arbitrary
worker receives a different return value as a way of "electing" it to
perform serial phases of computation.  For parallel phases of
computation, the return value can be ignored.  For example, there may
be preparation, merging, or post-processing phases which must be done
by just one worker, interspersed with phases where all workers do
something.

My use case for this is coordinating the phases of parallel hash
joins, but I strongly suspect there are other cases.  Parallel sort
springs to mind, which is why I wanted to post this separately and
earlier than my larger patch series, to get feedback from people
working on other parallel features.

A problem that I'm still grappling with is how to deal with workers
that fail to launch.  What I'm proposing so far is based on static
worker sets, where you can only give the number of workers at
initialisation time, just like pthread_barrier_init.  Some other
libraries allow for adjustable worker sets, and I'm wondering if a
parallel leader might need to be able to adjust the barrier when it
hears of a worker not starting.  More on that soon.

Please see the attached WIP patch.  I had an earlier version with its
own waitlists and signalling machinery etc, but I've now rebased it to
depend on Robert Haas's proposed condition variables, making this code
much shorter and sweeter.  So it depends on his
condition-variable-vX.patch[1], which in turn depends on my
lwlocks-in-dsm-vX.patch[2] (for proclist).

When Michaël Paquier's work on naming wait points[3] lands, I plan to
include event IDs as an extra argument to BarrierWait which will be
passed though so as to show up in pg_stat_activity.  Then you'll be
able to see where workers are waiting for each other!  For now I
didn't want to tangle this up with yet another patch.

I thought about using a different name to avoid colliding with
barrier.h and overloading the term: there are of course also compiler
barriers and memory barriers.  But then I realised that that header
was basically vacant real estate, and 'barrier' is the super-well
established standard term for this parallel computing primitive.

I'd be grateful for any thoughts, feedback, flames etc.

[1] https://www.postgresql.org/message-id/flat/CA%2BTgmoaj2aPti0yho7FeEf2qt-JgQPRWb0gci_o1Hfr%3DC56Xng%40mail.gmail.com
[2] https://www.postgresql.org/message-id/flat/CAEepm%3D0Vvr9zgwHt67RwuTfwMEby1GiGptBk3xFPDbbgEtZgMg%40mail.gmail.com
[3] https://www.postgresql.org/message-id/flat/CAB7nPqTGhFOUHag1eJrvsKn8-E5fpqvhM7aL0tAfsDzjQG_YKQ@mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com

Attachment

Re: WIP: Barriers

From
konstantin knizhnik
Date:
Hi Thomas,

Barriers are really very simple and convenient mechanism for process synchronization.
But it is actually a special case of semaphores: having semaphore primitive it is trivial to implement a barrier.
We have semaphores in Postgres, but ... them can not be used by extensions: there is fixed number of semaphores
allocatedbased on maximal number of connections and there is no mechanism for requesting additional semaphores.  


Rober has recently proposed conditional variables, which are also very useful. Right now we have spinlocks, LW-locks
andlatches. 
From my point of view it is not enough. While creating various extensions for Postgres I always fill lack of such
synchronizationprimitive as events (condition variables) and semaphores. Events and semaphores are similar and it is
possibleto implement any of them based on another. But from user's point of view them have different semantic and use
cases,so it is better to provide both of them. 

I wonder if we should provide system-abstraction-layer (SAL) for Postgres, where we can isolate all system dependent
codeand which will be available for vore developers as well as for developers of extensions?  The obvious candidates
forSAL are: 
1. Synchronization primitives (locks, events, semaphores, mutexes, spinlocks, latches, barriers)
2. Shared memory
3. File access
4. Network sockets
5. Process control (fork, ...)

Certainly it requires a lot of refactoring but will make Postgres code much more elegant, easer to read and maintain.
Also it is not necessary to do all this changes  in one step: here we do not need atomic transactions:)
We can start for example with synchronization primitives, as far as in any case a lot of changes are proposed here.

Parallel execution is one of the most promising approach to improve Postgres performance. I do not mean just parallel
executionof single query. 
Parallel vacuum, parallel index creation, parallel sort, ... And to implement all this stuff we definitely need
convenientand efficient synchronization primitives. 
The set of such primitives can be discussed. IMHO it should include RW-locks (current LW-locks), mutexes (spinlock +
somemechanism to wait), events (condition variables), semaphores and barriers (based on semaphores). Latches can be
leftfor backward compatibility or be replaced with events. 
I wonder if somebody has measured  how much times latches (signal+socket) are slower then posix semaphores or
conditionalvariables? 



On Aug 14, 2016, at 2:18 AM, Thomas Munro wrote:

> Hi hackers,
>
> I would like to propose "barriers" for Postgres processes.  A barrier
> is a very simple mechanism for coordinating parallel computation, as
> found in many threading libraries.
>
> First, you initialise a Barrier object somewhere in shared memory,
> most likely in the DSM segment used by parallel query, by calling
> BarrierInit(&barrier, nworkers).  Then workers can call
> BarrierWait(&barrier) when they want to block until all workers arrive
> at the barrier.  When the final worker arrives, BarrierWait returns in
> all workers, releasing them to continue their work.  One arbitrary
> worker receives a different return value as a way of "electing" it to
> perform serial phases of computation.  For parallel phases of
> computation, the return value can be ignored.  For example, there may
> be preparation, merging, or post-processing phases which must be done
> by just one worker, interspersed with phases where all workers do
> something.
>
> My use case for this is coordinating the phases of parallel hash
> joins, but I strongly suspect there are other cases.  Parallel sort
> springs to mind, which is why I wanted to post this separately and
> earlier than my larger patch series, to get feedback from people
> working on other parallel features.
>
> A problem that I'm still grappling with is how to deal with workers
> that fail to launch.  What I'm proposing so far is based on static
> worker sets, where you can only give the number of workers at
> initialisation time, just like pthread_barrier_init.  Some other
> libraries allow for adjustable worker sets, and I'm wondering if a
> parallel leader might need to be able to adjust the barrier when it
> hears of a worker not starting.  More on that soon.
>
> Please see the attached WIP patch.  I had an earlier version with its
> own waitlists and signalling machinery etc, but I've now rebased it to
> depend on Robert Haas's proposed condition variables, making this code
> much shorter and sweeter.  So it depends on his
> condition-variable-vX.patch[1], which in turn depends on my
> lwlocks-in-dsm-vX.patch[2] (for proclist).
>
> When Michaël Paquier's work on naming wait points[3] lands, I plan to
> include event IDs as an extra argument to BarrierWait which will be
> passed though so as to show up in pg_stat_activity.  Then you'll be
> able to see where workers are waiting for each other!  For now I
> didn't want to tangle this up with yet another patch.
>
> I thought about using a different name to avoid colliding with
> barrier.h and overloading the term: there are of course also compiler
> barriers and memory barriers.  But then I realised that that header
> was basically vacant real estate, and 'barrier' is the super-well
> established standard term for this parallel computing primitive.
>
> I'd be grateful for any thoughts, feedback, flames etc.
>
> [1]
https://www.postgresql.org/message-id/flat/CA%2BTgmoaj2aPti0yho7FeEf2qt-JgQPRWb0gci_o1Hfr%3DC56Xng%40mail.gmail.com
> [2] https://www.postgresql.org/message-id/flat/CAEepm%3D0Vvr9zgwHt67RwuTfwMEby1GiGptBk3xFPDbbgEtZgMg%40mail.gmail.com
> [3] https://www.postgresql.org/message-id/flat/CAB7nPqTGhFOUHag1eJrvsKn8-E5fpqvhM7aL0tAfsDzjQG_YKQ@mail.gmail.com
>
> --
> Thomas Munro
> http://www.enterprisedb.com
> <barrier-v1.patch>
> --
> Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
> To make changes to your subscription:
> http://www.postgresql.org/mailpref/pgsql-hackers




Re: WIP: Barriers

From
Thomas Munro
Date:
On Sun, Aug 14, 2016 at 6:54 PM, konstantin knizhnik
<k.knizhnik@postgrespro.ru> wrote:
> Barriers are really very simple and convenient mechanism for process synchronization.
> But it is actually a special case of semaphores: having semaphore primitive it is trivial to implement a barrier.
> We have semaphores in Postgres, but ... them can not be used by extensions: there is fixed number of semaphores
allocatedbased on maximal number of connections and there is no mechanism for requesting additional semaphores. 

Probably because they are kernel objects requiring extra resource
management.  I'm hoping for something that can be created dynamically
in DSM segments with no cleanup, and that aspect of semaphores is
problematic.

> [...]  I wonder if somebody has measured  how much times latches (signal+socket) are slower then posix semaphores or
conditionalvariables? 

I'm not sure if it makes sense for us to use POSIX conditional
variables: they require using POSIX mutexes, and we're pretty heavily
invested in the use of lwlocks that are hooked into our error handling
system, and spinlocks.

I'd be curious to know if you can make a better barrier with
semaphores.  I've attached a little test module which can be used to
measure the time for N processes to synchronise at M barrier wait
points.  You can run with SELECT barrier_test(<nworkers>, <nbarriers>,
<implementation>), where implementation 0 uses the barrier patch I
posted and you can add another implementation as 1.  This patch
requires lwlocks-in-dsm-v3.patch, condition-variable-v2.patch,
barrier-v1.patch, in that order.

This implementation is using a spinlock for the arrival counter, and
signals (via Robert's condition variables and latches) for waking up
peer processes when the counter reaches the target.  I realise that
using signals for this sort of thing is a bit unusual outside the
Postgres universe,  but won't a semaphore-based implementation require
just as many system calls, context switches and scheduling operations?

--
Thomas Munro
http://www.enterprisedb.com

Attachment

Re: WIP: Barriers

From
Robert Haas
Date:
On Sat, Aug 13, 2016 at 7:18 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> I would like to propose "barriers" for Postgres processes.  A barrier
> is a very simple mechanism for coordinating parallel computation, as
> found in many threading libraries.
>
> First, you initialise a Barrier object somewhere in shared memory,
> most likely in the DSM segment used by parallel query, by calling
> BarrierInit(&barrier, nworkers).  Then workers can call
> BarrierWait(&barrier) when they want to block until all workers arrive
> at the barrier.  When the final worker arrives, BarrierWait returns in
> all workers, releasing them to continue their work.  One arbitrary
> worker receives a different return value as a way of "electing" it to
> perform serial phases of computation.  For parallel phases of
> computation, the return value can be ignored.  For example, there may
> be preparation, merging, or post-processing phases which must be done
> by just one worker, interspersed with phases where all workers do
> something.
>
> My use case for this is coordinating the phases of parallel hash
> joins, but I strongly suspect there are other cases.  Parallel sort
> springs to mind, which is why I wanted to post this separately and
> earlier than my larger patch series, to get feedback from people
> working on other parallel features.

I was thinking about this over the weekend and I started to wonder
whether this is really going to work well for hash joins.  For
example, suppose that 6GB of work_mem is available and the projected
size of the hash table is 8GB.  Clearly, we're going to need 2
batches, but, if our estimates are accurate and the tuples split
evenly between batches, each batch will be only 4GB!  That means that
we can build up to 2GB of the hash table for the next batch before
we've finished with the hash table for the previous batch.  It seems
like a really good idea to try to take advantage of that as much as
possible.

The simple version of this is that when a worker gets done with its
own probe phase for batch X, it can immediately start building the
hash table for phase X+1, stopping if it fills up the unused portion
of work_mem before the old hash table goes away.  Of course, there are
some tricky issues with reading tapes that were originally created by
other backends, but if I understand correctly, Peter Geoghegan has
already done some work on that problem, and it seems like something we
can eventually solve, even if not in the first version.

The more complicated version of this is that we might want to delegate
one or more workers to start building as much of the next-batch hash
table as will fit instead of assisting with the current probe phase.
Once work_mem is full, they join the probe phase and continue until
it's done.  Again, tape management is an issue.  But you can see that
if you can make this work, in this example, you can reduce the
enforced pause between batches by about 50%; half the work is already
done by the time the old hash table goes away.  I bet that has a
chance of being fairly significant, especially for hash joins that
have tons of batches.  I once saw a 64-batch hash join outperform a
nested loop with inner index scan!

Anyway, my point here is that I'm not sure whether the barrier
mechanic is going to work well for computations with overlapping
phases, and I suspect that overlapping phases is going to be an
important technique, so we should make sure not to settle into a
synchronization model that makes it hard.

> A problem that I'm still grappling with is how to deal with workers
> that fail to launch.  What I'm proposing so far is based on static
> worker sets, where you can only give the number of workers at
> initialisation time, just like pthread_barrier_init.  Some other
> libraries allow for adjustable worker sets, and I'm wondering if a
> parallel leader might need to be able to adjust the barrier when it
> hears of a worker not starting.  More on that soon.

I think tying this into the number of workers for the parallel context
in general is going to get you into trouble.  For example, suppose
that we have an Append plan and beneath that we have two children of
each of which is a Hash Join.  Right now, Append is dumb, so it will
blindly throw all of the workers at the first Hash Join and then, as
they emerge, it will throw them at the second one.  However, we've had
previous discussions which I'm too lazy to look up right now about
making a Parallel Append that would split the workers between the two
hash joins; if one of them finished, then those workers could go join
the other hash join in medias res.

Now, in this world, it's clearly very bad if each hash join waits for
"all of the workers" to finish a given phase before beginning the next
phase.  In fact, it's probably going to result in both hash joins
hanging, followed by everybody waiting for each other forever.  The
actual condition that must be verified is that there are no workers
which are going to keep trying to probe the old hash table after we
blow it up.  In other words, we need to verify that every outer tuple
for the current batch has been joined.  I'm not sure how tight we want
to make the accounting, but consider the following plan:

Hash Join
-> Parallel Seq Scan on a
-> Hash -> Seq Scan on b

Whenever a worker first pulls a tuple from a, it claims an entire page
of tuples from the scan.  A hazard then exists until that backend has
pulled every tuple from that page and joined them all.  There is then
no hazard - for that backend - until it pulls the first tuple from the
next page.

A sort of dumb way of handling all this is to assume that once a
worker joins the hash join, it won't go off and do anything else until
the hash join is done.  Under that assumption, you just need some sort
of BarrierAttach() operation; workers that have never attached the
barrier aren't participating in the hash join at all and so they are
irrelevant - and now you know how many workers you need to await,
because you can keep a count how many have attached.  Perhaps you
simply turn away any workers that arrive after batch 0 is complete.

That approach is not entirely satisfying, though.  As discussed on the
thread about asynchronous and vectorized execution, it's desirable
that, when a particular branch of a parallel query can't use any more
workers - e.g. because they are all waiting for the next phase to
begin - those workers can leave and go do something else.
Contrariwise, if some workers are busy with another branch of the
query tree and they finish all the work over there, it's desirable for
those workers to be able to come join our branch of the query tree to
help out.  So it seems like it would be nicer here to be precise about
the bookkeeping: (1) track the number of workers that actually have a
hazard for this portion of the query tree right now and (2) be
prepared for a future day in which we will wish to allow a worker
which has no such active hazard to depart the query tree for a time or
indefinitely.  The fly in the ointment, of course, is that the
existence of the hazard depends on state we don't readily have access
to at the point where we want to make the decision...  and I don't
have a good idea how to solve that problem.  We probably want to do
something as simple as possible for now, but at the same try not to
make future improvements along these lines any more difficult than
necessary.

> I thought about using a different name to avoid colliding with
> barrier.h and overloading the term: there are of course also compiler
> barriers and memory barriers.  But then I realised that that header
> was basically vacant real estate, and 'barrier' is the super-well
> established standard term for this parallel computing primitive.

If we're going to remove barrier.h, I think that should be a separate
commit from creating a new barrier.h.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: WIP: Barriers

From
Konstantin Knizhnik
Date:
On 15.08.2016 15:42, Thomas Munro wrote:
> This implementation is using a spinlock for the arrival counter, and 
> signals (via Robert's condition variables and latches) for waking up 
> peer processes when the counter reaches the target. I realise that 
> using signals for this sort of thing is a bit unusual outside the 
> Postgres universe, but won't a semaphore-based implementation require 
> just as many system calls, context switches and scheduling operations? 

Yes, you are right.
I never expected that this combination of signal+local socket+select can 
provide performance comparable with pthread_cond_t.
I have implemented simple test where two background workers are 
emulating request-response round-trip using latches and pthread primitives.
Result (average round-trip time) was 7.49 microseconds for Postgres 
latches vs. 4.59 microseconds for pthread_cond_timedwait.




#define N_ROUNDTRIPS 1000000
#define WAIT_LATCH_TIMEOUT 60000

static void PongLatch(Datum arg)
{    int i;    timestamp_t start;    int result;
    BackgroundWorkerUnblockSignals();
    Mtm->pong = MyProc->pgprocno;    ResetLatch(&MyProc->procLatch);    MtmSleep(1000000);    Assert(Mtm->ping);

    for (i = 0; i <= N_ROUNDTRIPS; i++) {        result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, 
WAIT_LATCH_TIMEOUT);        Assert(result & WL_LATCH_SET);        ResetLatch(&MyProc->procLatch);
SetLatch(&ProcGlobal->allProcs[Mtm->ping].procLatch);        if (i == 0) {               start = MtmGetSystemTime();
   }    }    fprintf(stderr, "Average roundrip time: %f microsconds\n", 
 
(double)(MtmGetSystemTime() - start) /  N_ROUNDTRIPS);
}

static void PingLatch(Datum arg)
{    int i;    timestamp_t start;    int result;
    BackgroundWorkerUnblockSignals();
    Mtm->ping = MyProc->pgprocno;    ResetLatch(&MyProc->procLatch);    MtmSleep(1000000);    Assert(Mtm->pong);
    for (i = 0; i <= N_ROUNDTRIPS; i++) {
SetLatch(&ProcGlobal->allProcs[Mtm->pong].procLatch);        result = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET|WL_TIMEOUT,
 
WAIT_LATCH_TIMEOUT);        Assert(result & WL_LATCH_SET);        ResetLatch(&MyProc->procLatch);        if (i == 0) {
            start = MtmGetSystemTime();        }    }    fprintf(stderr, "Average roundrip time: %f microseconds\n", 
 
(double)(MtmGetSystemTime() - start) /  N_ROUNDTRIPS);
}


static BackgroundWorker Pinger = {    "ping",    BGWORKER_SHMEM_ACCESS,// | BGWORKER_BACKEND_DATABASE_CONNECTION,
BgWorkerStart_ConsistentState,   BGW_NEVER_RESTART,    PingLatch
 
};

static BackgroundWorker Ponger = {    "pong",    BGWORKER_SHMEM_ACCESS,// | BGWORKER_BACKEND_DATABASE_CONNECTION,
BgWorkerStart_ConsistentState,   BGW_NEVER_RESTART,    PongLatch
 
};

static void PingPong()
{    RegisterBackgroundWorker(&Pinger);    RegisterBackgroundWorker(&Ponger);
}




-- 
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company




Re: WIP: Barriers

From
Peter Geoghegan
Date:
On Mon, Aug 15, 2016 at 6:55 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> The simple version of this is that when a worker gets done with its
> own probe phase for batch X, it can immediately start building the
> hash table for phase X+1, stopping if it fills up the unused portion
> of work_mem before the old hash table goes away.  Of course, there are
> some tricky issues with reading tapes that were originally created by
> other backends, but if I understand correctly, Peter Geoghegan has
> already done some work on that problem, and it seems like something we
> can eventually solve, even if not in the first version.

The tape vs. BufFile vs. fd.c file handle distinctions get
*confusing*. Thomas and I have hashed this out (pun intended), but I
should summarize.

Currently, and without bringing parallelism into it, Hash joins have
multiple BufFiles (two per batch -- innerBatchFile and
outerBatchFile), which are accessed as needed. External sorts have
only one BufFile, with multiple "logical tapes" within a single
"tapeset" effectively owning space within the BufFile -- that space
doesn't have to be contiguous, and can be reused *eagerly* within and
across logical tapes in tuplesort.c's tapeset. logtape.c is a kind of
block-orientated rudimentary filesystem built on top of one BufFile.
The only real advantage of having the logtape.c abstraction is that
moving stuff around (to sort it, when multiple passes are required)
can be accomplished with minimal wasted disk space (it's eagerly
reclaimed). This is less important today than it would have been in
the past.

Clearly, it doesn't make much sense to talk about logtape.c and
anything that isn't sorting, because it is very clearly written with
that purpose alone in mind. To avoid confusion, please only talk about
tapes when talking about sorting.

So:

* tuplesort.c always talks to logtape.c, which talks to buffile.c
(which talks to fd.c).

* Hash joins use buffile.c directly, though (and have multiple
buffiles, as already noted).

Now, I might still have something that Thomas can reuse, because
buffile.c was made to support "unification" of worker BufFiles in
general. Thomas would be using that interface, if any. I haven't
studied parallel hash join at all, but presumably the difference would
be that *multiple* BufFiles would be unified, such that a
concatenated/unified BufFile would be addressable within each worker,
one per batch. All of this assumes that there is a natural way of
unifying the various batches involved across all workers, of course.

This aspect would present some complexity for Thomas, I think
(comments from hashjoin.h):
* It is possible to increase nbatch on the fly if the in-memory hash table* gets too big.  The hash-value-to-batch
computationis arranged so that this* can only cause a tuple to go into a later batch than previously thought,* never
intoan earlier batch.  When we increase nbatch, we rescan the hash* table and dump out any tuples that are now of a
laterbatch to the correct* inner batch file.  Subsequently, while reading either inner or outer batch* files, we might
findtuples that no longer belong to the current batch;* if so, we just dump them out to the correct batch file.
 

I'd be concerned about managing which backend was entitled to move
tuples across batches, and so on. One thing that I haven't had to
contend with is which backend "owns" which BufFile (or underlying fd.c
file handles). There is no ambiguity about that for me. Owners delete
the temp files on Xact end, and are the only ones entitled to write to
files, and only before unification. These latter restrictions might be
lifted if there was a good reason to do so.

-- 
Peter Geoghegan



Re: WIP: Barriers

From
Peter Geoghegan
Date:
On Mon, Aug 15, 2016 at 6:55 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> A sort of dumb way of handling all this is to assume that once a
> worker joins the hash join, it won't go off and do anything else until
> the hash join is done.  Under that assumption, you just need some sort
> of BarrierAttach() operation; workers that have never attached the
> barrier aren't participating in the hash join at all and so they are
> irrelevant - and now you know how many workers you need to await,
> because you can keep a count how many have attached.  Perhaps you
> simply turn away any workers that arrive after batch 0 is complete.

Is that really so bad? In general, I don't tend to think of workers as
the cost to worry about. Rather, we should be concerned about the
active use of CPU cores as our major cost.

-- 
Peter Geoghegan



Re: WIP: Barriers

From
Peter Geoghegan
Date:
On Sat, Aug 13, 2016 at 4:18 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> First, you initialise a Barrier object somewhere in shared memory,
> most likely in the DSM segment used by parallel query, by calling
> BarrierInit(&barrier, nworkers).  Then workers can call
> BarrierWait(&barrier) when they want to block until all workers arrive
> at the barrier.  When the final worker arrives, BarrierWait returns in
> all workers, releasing them to continue their work.  One arbitrary
> worker receives a different return value as a way of "electing" it to
> perform serial phases of computation.  For parallel phases of
> computation, the return value can be ignored.  For example, there may
> be preparation, merging, or post-processing phases which must be done
> by just one worker, interspersed with phases where all workers do
> something.

I think that this mechanism could be quite useful for sorting with
partitioning, which doesn't exist yet. What does exist is unlikely to
benefit from this over and above what Robert's "condition variables"
offer, because as it happens there is no need to "elect" a single
worker at all. The ordering dependencies happen to be quite naturally
across one leader process and one or more worker processes.

I do see value in this, though.

-- 
Peter Geoghegan



Re: WIP: Barriers

From
Thomas Munro
Date:
On Tue, Aug 16, 2016 at 1:55 AM, Robert Haas <robertmhaas@gmail.com> wrote:
> On Sat, Aug 13, 2016 at 7:18 PM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> My use case for this is coordinating the phases of parallel hash
>> joins, but I strongly suspect there are other cases.  Parallel sort
>> springs to mind, which is why I wanted to post this separately and
>> earlier than my larger patch series, to get feedback from people
>> working on other parallel features.
>
> I was thinking about this over the weekend and I started to wonder
> whether this is really going to work well for hash joins.  For
> example, suppose that 6GB of work_mem is available and the projected
> size of the hash table is 8GB.  Clearly, we're going to need 2
> batches, but, if our estimates are accurate and the tuples split
> evenly between batches, each batch will be only 4GB!  That means that
> we can build up to 2GB of the hash table for the next batch before
> we've finished with the hash table for the previous batch.  It seems
> like a really good idea to try to take advantage of that as much as
> possible.

Right.  Thanks for that example.  A naive approach with barriers
between building and probing does indeed introduce a kind of pipeline
stall where workers twiddle their thumbs, despite there being unused
work_mem and useful things that could be done concurrently to fill it.

> The simple version of this is that when a worker gets done with its
> own probe phase for batch X, it can immediately start building the
> hash table for phase X+1, stopping if it fills up the unused portion
> of work_mem before the old hash table goes away.  Of course, there are
> some tricky issues with reading tapes that were originally created by
> other backends, but if I understand correctly, Peter Geoghegan has
> already done some work on that problem, and it seems like something we
> can eventually solve, even if not in the first version.
>
> The more complicated version of this is that we might want to delegate
> one or more workers to start building as much of the next-batch hash
> table as will fit instead of assisting with the current probe phase.
> Once work_mem is full, they join the probe phase and continue until
> it's done.  Again, tape management is an issue.  But you can see that
> if you can make this work, in this example, you can reduce the
> enforced pause between batches by about 50%; half the work is already
> done by the time the old hash table goes away.  I bet that has a
> chance of being fairly significant, especially for hash joins that
> have tons of batches.  I once saw a 64-batch hash join outperform a
> nested loop with inner index scan!
>
> Anyway, my point here is that I'm not sure whether the barrier
> mechanic is going to work well for computations with overlapping
> phases, and I suspect that overlapping phases is going to be an
> important technique, so we should make sure not to settle into a
> synchronization model that makes it hard.

I have a draft scheme worked out to do what you called the "simple
version", though not in the first version I will post (the "naive
version").  I don't really want to go into all the details in this
thread, not least because I'm still working on it, but I do want to
point out that barriers as a synchronisation primitive are not at all
incompatible with overlapping work: on the contrary, they can help
coordinate it while keeping the code tractable.

Rough sketch*:  There will be a secondary hash table, used for
preloading the next batch.  As in the naive version, there is a
barrier after the hash table is loaded: you can't start probing an
incomplete hash table.  But after probing the primary table for batch
n, workers will immediately begin preloading the secondary table with
tuples for batch n + 1, until either work_mem is exhausted or batch n
+ 1 is entirely loaded.  Then will they synchronise, elect one worker
to promote the secondary table to primary, synchronise again, finish
loading the newly promoted primary table, and then rince and repeat:
synchronise to coordinate the beginning of probing for batch n + 1...

Of course there are also several other phases relating to
initialization, hashing, resizing and special cases for first and last
batches, as I will describe in a future thread with code.

*Actual patch may not resemble illustration.

>> A problem that I'm still grappling with is how to deal with workers
>> that fail to launch.  What I'm proposing so far is based on static
>> worker sets, where you can only give the number of workers at
>> initialisation time, just like pthread_barrier_init.  Some other
>> libraries allow for adjustable worker sets, and I'm wondering if a
>> parallel leader might need to be able to adjust the barrier when it
>> hears of a worker not starting.  More on that soon.
>
> I think tying this into the number of workers for the parallel context
> in general is going to get you into trouble.  For example, suppose
> that we have an Append plan and beneath that we have two children of
> each of which is a Hash Join.  Right now, Append is dumb, so it will
> blindly throw all of the workers at the first Hash Join and then, as
> they emerge, it will throw them at the second one.  However, we've had
> previous discussions which I'm too lazy to look up right now about
> making a Parallel Append that would split the workers between the two
> hash joins; if one of them finished, then those workers could go join
> the other hash join in medias res.

Yeah.  I continued grappling with this and found a way forward that I
am quite happy with for now.

Picture an N-lane Olympic swimming pool with N identical swimmers at
the starting blocks waiting for the gun to fire, with a rule that they
must wait for all swimmers to reach the end before the next heat of
the race begins.  That's what textbook parallel computing of the type
supported by pthread_barrier_t looks like, and what barrier-v1.patch
provided.

Postgres parallel query is not at all like that: it's more like a
flash mob pool party.  The referee fires the starting gun, and some
number of swimmers show up some time soonish, or not, and they jump in
anywhere they like; the referee (the backend executing the gather
node) may get bored and decide to dive-bomb into the middle of it all
too.  In future, the executor may send in new swimmers at random by
parachute!  OK, sorry for the tortured analogy... but the point is
that the size of the party may change at any time and new workers need
to synchronise with the active phase.

Here's a new version of barriers extended to allow for dynamic worker
parties.  It's something like .NET's System.Threading.Barrier, Java's
java.util.concurrent.Phaser and X10's clock, so I have at least a
small amount of confidence that it's not completely crackpot.  This
addition allows new workers to arrive at any time and get in sync with
the ongoing work: for example, if the gather node feels the impulse to
jump in, it will join the current activity and pitch in.

There are some complexities I'm still working out: it would be nice to
figure out how to remain attached to the barrier if I know for certain
that the caller will call again (basically an ExecutorRun(..., ..., 0)
loop, as opposed to a one-off call from a gather node).

> [...]
> That approach is not entirely satisfying, though.  As discussed on the
> thread about asynchronous and vectorized execution, it's desirable
> that, when a particular branch of a parallel query can't use any more
> workers - e.g. because they are all waiting for the next phase to
> begin - those workers can leave and go do something else.
> [...]

I believe my current approach is not incompatible with those future
work scheduling ideas.  The key requirement seems to be the ability to
detach and walk away without holding anyone else up, and reattach and
help out with whatever is currently going on as appropriate, while
synchronising work as appropriate.  From the point of view of an
executor node, there are basically two things you need to support a
user space coroutine/green thread/fibre/event style scheduler as I see
it: a way to yield and a way to continue from the right point, and the
latter is already covered.  Whatever yield-like operations are
invented in future will hopefully just slot into the right places,
without changing the underlying parallel hash join algorithm.

>> I thought about using a different name to avoid colliding with
>> barrier.h and overloading the term: there are of course also compiler
>> barriers and memory barriers.  But then I realised that that header
>> was basically vacant real estate, and 'barrier' is the super-well
>> established standard term for this parallel computing primitive.
>
> If we're going to remove barrier.h, I think that should be a separate
> commit from creating a new barrier.h.

OK.  Here's a patch to remove the old header, and the v2 barrier patch
which adds phases and attach/detach.  As before, it depends on
condition-variable-v2.patch.

--
Thomas Munro
http://www.enterprisedb.com

Attachment

Re: WIP: Barriers

From
Thomas Munro
Date:
On Thu, Aug 18, 2016 at 1:55 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> On Tue, Aug 16, 2016 at 1:55 AM, Robert Haas <robertmhaas@gmail.com> wrote:
>> If we're going to remove barrier.h, I think that should be a separate
>> commit from creating a new barrier.h.
>
> OK.  Here's a patch to remove the old header, and the v2 barrier patch
> which adds phases and attach/detach.  As before, it depends on
> condition-variable-v2.patch.

Here's a new version which is rebased and adds support for passing
wait_event through to pg_stat_activity.

--
Thomas Munro
http://www.enterprisedb.com

Attachment

Re: WIP: Barriers

From
Thomas Munro
Date:
On Tue, Nov 1, 2016 at 5:03 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> On Thu, Aug 18, 2016 at 1:55 PM, Thomas Munro
> <thomas.munro@enterprisedb.com> wrote:
>> On Tue, Aug 16, 2016 at 1:55 AM, Robert Haas <robertmhaas@gmail.com> wrote:
>>> If we're going to remove barrier.h, I think that should be a separate
>>> commit from creating a new barrier.h.
>>
>> OK.  Here's a patch to remove the old header, and the v2 barrier patch
>> which adds phases and attach/detach.  As before, it depends on
>> condition-variable-v2.patch.
>
> Here's a new version which is rebased and adds support for passing
> wait_event through to pg_stat_activity.

Here's a version updated for the new conditional variables interface
which has just been committed as e8ac886c.  Some comments improved.

--
Thomas Munro
http://www.enterprisedb.com

Attachment

Re: WIP: Barriers

From
Robert Haas
Date:
On Tue, Nov 22, 2016 at 4:42 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
> On Tue, Nov 1, 2016 at 5:03 PM, Thomas Munro
>> Here's a new version which is rebased and adds support for passing
>> wait_event through to pg_stat_activity.
>
> Here's a version updated for the new conditional variables interface
> which has just been committed as e8ac886c.  Some comments improved.

The code here looks OK.  A few thoughts:

- I'm a little unsure whether it's a good idea to remove the existing
barrier.h and immediately add a new barrier.h that does something
totally different.  It's tempting to try to find another name for
these things just to avoid that.  Then again, I bet there's not much
non-core code that is relying on the existing barrier.h, so maybe it's
OK.  On the third hand, the risk of confusing developers may be
non-nil even if we don't confuse any code.  I think I'll go commit
remove-useless-barrier-v4.patch now and see if anyone shows up to
complain...

- Should parallel bitmap heap scan be using this to decide who runs
the subplan?  It looks like it would work for what is needed in that
case, and it looks like it would be simpler (and probably more
correct) than what's there now.  PBMState could go away completely, I
think.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: [HACKERS] WIP: Barriers

From
Thomas Munro
Date:
On Wed, Nov 23, 2016 at 2:28 PM, Robert Haas <robertmhaas@gmail.com> wrote:
> The code here looks OK.  A few thoughts:
>
> - I'm a little unsure whether it's a good idea to remove the existing
> barrier.h and immediately add a new barrier.h that does something
> totally different.  It's tempting to try to find another name for
> these things just to avoid that.  Then again, I bet there's not much
> non-core code that is relying on the existing barrier.h, so maybe it's
> OK.  On the third hand, the risk of confusing developers may be
> non-nil even if we don't confuse any code.  I think I'll go commit
> remove-useless-barrier-v4.patch now and see if anyone shows up to
> complain...

Here is a new version with updated copyright messages and some dtrace
probes which provide an easy way to measure the time spend waiting at
barriers.

-- 
Thomas Munro
http://www.enterprisedb.com

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Attachment