Thread: [HACKERS] Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE
Hello everybody,

Here is something I observed in my recent experimentation: on changing the value of PARALLEL_TUPLE_QUEUE_SIZE to 6553600, the performance of a TPC-H query improved by more than 50%. Specifically, with this change, q12 completes in 14 seconds, whereas it was taking 45 seconds on head. There wasn't any change in the plan structure; only the time spent at Gather Merge was reduced, which gave this improvement.

This clearly suggests that the current value of PARALLEL_TUPLE_QUEUE_SIZE is not the best one for all queries, and that modifying it is very likely to improve performance significantly. One way to do this would be to expose the parameter as another GUC, just like min_parallel_table_scan_size, etc.

The attached .txt file gives the plans at head and with this change; a patch setting PARALLEL_TUPLE_QUEUE_SIZE to 6553600 is attached as well.

Thoughts?

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/
Attachment
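For context, PARALLEL_TUPLE_QUEUE_SIZE is a compile-time constant in the executor's parallel-query code; a minimal sketch of the kind of change being benchmarked here (6553600 is simply 100 x 64kB, the value from the mail above, not a recommended setting):

    /* src/backend/executor/execParallel.c -- sketch, not the attached patch */

    /* Size of each worker-to-leader tuple queue: 64kB on head. */
    #define PARALLEL_TUPLE_QUEUE_SIZE       65536

    /*
     * Experimental value tried in this thread (100x larger, ~6.4MB):
     * #define PARALLEL_TUPLE_QUEUE_SIZE    6553600
     */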
On Tue, May 30, 2017 at 5:28 AM, Rafia Sabih <rafia.sabih@enterprisedb.com> wrote:
> Hello everybody,
>
> Here is a thing I observed in my recent experimentation, on changing
> the value of PARALLEL_TUPLE_QUEUE_SIZE to 6553600, the performance of
> a TPC-H query is improved by more than 50%.

How many tuples are being gathered? This could happen if the workers are waiting for the leader to make space in the queue after it's filled. By increasing the queue size we might be reducing the waiting time for the workers. In that case, it may be better to check why the leader is not pulling rows faster. How does the performance vary with different values of PARALLEL_TUPLE_QUEUE_SIZE?

> Specifically, with this change, q12 completes in 14 seconds which was
> taking 45 seconds on head. There wasn't any change in the plan
> structure, just the time at gather-merge reduced which gave this
> improvement.
>
> This clearly says that the current value of PARALLEL_TUPLE_QUEUE_SIZE
> is not the best one for all the queries, rather some modification in
> it is very likely to improve performance significantly. One way to do
> is to give this parameters as another GUC just like
> min_parallel_table_scan_size, etc.

A GUC may help.

> Attached .txt file gives the plan at head and with this patch,
> additionally patch is attached for setting PARALLEL_TUPLE_QUEUE_SIZE
> to 6553600 too.

Increasing that number would require more DSM, which may not be available. Also, I don't see any analysis as to why 6553600 was chosen. Is it optimal? Does it work for all kinds of workloads?

--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
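If it were exposed as a GUC, a hedged sketch of what a guc.c entry might look like (the name, default, and bounds here are invented purely for illustration, not a proposal from this thread):

    /* Sketch of a hypothetical entry in guc.c's ConfigureNamesInt[] */
    {
        {"parallel_tuple_queue_size", PGC_USERSET, RESOURCES_MEM,
            gettext_noop("Sets the size of the tuple queue between a parallel "
                         "worker and the leader."),
            NULL,
            GUC_UNIT_KB
        },
        &parallel_tuple_queue_size,     /* hypothetical int variable, in kB */
        64, 16, MAX_KILOBYTES,          /* illustrative default/min/max */
        NULL, NULL, NULL
    },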
On Tue, May 30, 2017 at 6:50 AM, Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote:
> Increasing that number would require increased DSM which may not be
> available. Also, I don't see any analysis as to why 6553600 is chosen?
> Is it optimal? Does that work for all kinds of work loads?

Picky, picky. The point is that Rafia has discovered that a large increase can sometimes significantly improve performance. I don't think she's necessarily proposing that (or anything else) as a final value that we should definitely use; she's just getting the conversation started.

I did a little bit of brief experimentation on this same topic a long time ago and didn't see an improvement from boosting the queue size beyond 64k, but Rafia is testing Gather Merge rather than Gather and, as I say, my test was very brief. I think it would be a good idea to try to get a complete picture here. Does this help on any query that returns many tuples through the Gather? Only the ones that use Gather Merge? Some queries but not others, with no obvious pattern? Only this query?

Blindly adding a GUC because we found one query that would be faster with a different value is not the right solution. If we don't even know why a larger value is needed here and (maybe) not elsewhere, then how will any user possibly know how to tune the GUC? And do we really want the user to have to keep adjusting a GUC before each query to get maximum performance? I think we need to understand the whole picture here, and then decide what to do. Ideally this would auto-tune, but we can't write code for that without a more complete picture of the behavior.

BTW, there are a couple of reasons I originally picked 64k here. One is that making it smaller was very noticeably terrible in my testing, while making it bigger didn't help much. The other is that I figured 64k was small enough that nobody would care about the memory utilization. I'm not sure we can assume the same thing if we make this bigger. It's probably fine to use a 6.4M tuple queue for each worker if work_mem is set to something big, but maybe not if work_mem is set to the default of 4MB.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On 2017-05-30 07:27:12 -0400, Robert Haas wrote:
> The other is that I figured 64k was small enough that nobody would
> care about the memory utilization. I'm not sure we can assume the
> same thing if we make this bigger. It's probably fine to use a 6.4M
> tuple queue for each worker if work_mem is set to something big, but
> maybe not if work_mem is set to the default of 4MB.

Probably not. It might also end up being detrimental performance-wise, because we start touching more memory.

I guess it'd make sense to set it in the planner, based on a) the size of work_mem and b) the number of expected tuples.

I do wonder whether the larger size fixes some scheduling issue (i.e. while some backend is scheduled out, the other side of the queue can continue), or whether it's largely triggered by fixable contention inside the queue. I'd guess it's a bit of both. It should be measurable in some cases, by comparing the amount of time spent blocking on reading the queue (or continuing because the queue is empty), writing to the queue (should always result in blocking), and waiting for the spinlock.

- Andres
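A minimal sketch of the kind of planner-side sizing Andres describes, with entirely hypothetical names and heuristics (the per-tuple overhead and the work_mem cap below are assumptions for illustration, not anything proposed in this thread):

    /*
     * Hypothetical helper: size the tuple queue from the expected output of
     * the parallel node, but never below the current 64kB default and never
     * above a per-worker share of work_mem.  Not a real API.
     */
    static Size
    choose_tuple_queue_size(double expected_rows, int expected_width,
                            int num_workers)
    {
        Size    base = 65536;                       /* current default, 64kB */
        Size    per_tuple = expected_width + 24;    /* assumed framing overhead */
        Size    want = (Size) (expected_rows * per_tuple / num_workers);
        Size    cap = (Size) work_mem * 1024L / num_workers;    /* work_mem is in kB */

        return Max(base, Min(want, cap));
    }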
> I did a little bit of brief experimentation on this same topic a long
> time ago and didn't see an improvement from boosting the queue size
> beyond 64k but Rafia is testing Gather Merge rather than Gather and,
> as I say, my test was very brief.  I think it would be a good idea to
> try to get a complete picture here.  Does this help on any query that
> returns many tuples through the Gather?  Only the ones that use Gather
> Merge?  Some queries but not others with no obvious pattern?  Only
> this query?

Yes, we need to get answers to those questions. I guess performance measurements varying one parameter at a time would help us make the right decision. Some of the relevant parameters I could quickly think of are: the number of tuples received by Gather, the size of the tuples, Gather vs. Gather Merge, and the number of workers. There may be more.

I am guessing that the number of tuples that can fit in the queue = (size of queue - headers) / (size of tuple + per-tuple header). The larger the tuple, the fewer tuples a worker can queue up, and so the higher the chance that it will wait for the leader to empty the queue. For Gather Merge that varies a lot depending upon how the data is distributed across workers, and it's probably more susceptible to variations in the number of tuples that fit in the queue. The more workers there are, the busier the leader will be, and thus the higher the chance of workers waiting for the leader to empty the queue; but a balancing effect in that case is that each worker will queue fewer rows. Measurements would help us see how these balancing factors actually play out.

> Blindly adding a GUC because we found one query that would be faster
> with a different value is not the right solution.  If we don't even
> know why a larger value is needed here and (maybe) not elsewhere, then
> how will any user possibly know how to tune the GUC?  And do we really
> want the user to have to keep adjusting a GUC before each query to get
> maximum performance?  I think we need to understand the whole picture
> here, and then decide what to do.  Ideally this would auto-tune, but
> we can't write code for that without a more complete picture of the
> behavior.

We will need a way for the user to cap the memory allocated, and a GUC looks like a better way to do that. I agree that the GUC as a tuning parameter will be much less useful.

> BTW, there are a couple of reasons I originally picked 64k here.  One
> is that making it smaller was very noticeably terrible in my testing,
> while making it bigger didn't help much.  The other is that I figured
> 64k was small enough that nobody would care about the memory
> utilization.  I'm not sure we can assume the same thing if we make
> this bigger.  It's probably fine to use a 6.4M tuple queue for each
> worker if work_mem is set to something big, but maybe not if work_mem
> is set to the default of 4MB.

AFAIK, work_mem comes from memory private to the process, whereas this memory will come from the shared memory pool. There are different OS-level settings for those, and thus linking the size of the parallel tuple queue with work_mem may not always work. But I agree that the size of work_mem is indicative of the size of data that needs to be processed in general, and hence can be used as a good estimate of the required size of the queue.

--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
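To put rough numbers on that formula (the overhead figures below are assumptions for illustration, not exact shm_mq accounting): with the default 64kB queue and a ~129-byte tuple, such as the TPC-H lineitem rows in the plans later in this thread,

    tuples_per_queue ~= (65536 - queue_header) / (tuple_size + per_tuple_header)
                     ~= (65536 - 64) / (129 + 8)
                     ~= 477 tuples

so a worker that runs even slightly ahead of the leader fills its queue almost immediately and then has to wait.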
On Wed, May 31, 2017 at 2:35 AM, Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote:
> AFAIK, work_mem comes from memory private to the process whereas this
> memory will come from the shared memory pool.

I don't think that really matters. The point of limits like work_mem is to avoid driving the machine into swap. Allocating shared memory might error out rather than causing swapping in some cases on some systems, but any difference between private and shared memory is not the real issue here. The issue is overall memory consumption.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Tue, May 30, 2017 at 4:57 PM, Robert Haas <robertmhaas@gmail.com> wrote:
> I did a little bit of brief experimentation on this same topic a long
> time ago and didn't see an improvement from boosting the queue size
> beyond 64k but Rafia is testing Gather Merge rather than Gather and,
> as I say, my test was very brief.  I think it would be a good idea to
> try to get a complete picture here.  Does this help on any query that
> returns many tuples through the Gather?  Only the ones that use Gather
> Merge?  Some queries but not others with no obvious pattern?  Only
> this query?

I did further exploration, trying other values of PARALLEL_TUPLE_QUEUE_SIZE and trying different queries, and here are my findings:
- even on setting PARALLEL_TUPLE_QUEUE_SIZE to 655360, there isn't much improvement in q12 itself.
- there is no other TPC-H query which shows significant improvement at 6553600. There is a small improvement in q3, which also uses Gather Merge.
- as per perf analysis of q12 on head and with the patch, the percentage for ExecGatherMerge is 18% with the patch and 98% on head, and similarly for gather_merge_readnext and gather_merge_writenext.

As per my understanding, it looks like this increase in tuple queue size helps only Gather Merge, particularly in the case where there is enough stalling by the master in Gather Merge because it is maintaining the sort order. In q12, for example, the index is unclustered and Gather Merge is just above a Parallel Index Scan; thus, to maintain the order, the workers likely have to wait a long time until their in-sequence tuple is consumed by the master. Something like this might be happening: the master takes one tuple from worker 1, then say the next 10 tuples from worker 2, and so on, finally returning to worker 1; so once worker 1 has produced enough to fill its queue, it sits idle. Hence, increasing the tuple queue size helps the workers keep working for longer, and this improves performance.

In other cases like q3, q18, etc., Gather Merge is above a Sort, Partial GroupAggregate, etc.; there the chances of stalls are comparatively lower, since the scan of the relation uses the primary key and the tuples in the blocks are therefore likely to already be in order. Similar was the case for many other TPC-H queries. Another thing is that in the TPC-H benchmark queries the number of tuples at Gather Merge is fairly low most of the time, so I'll try to test this on some custom queries which exhibit the aforementioned case.

> Blindly adding a GUC because we found one query that would be faster
> with a different value is not the right solution.  If we don't even
> know why a larger value is needed here and (maybe) not elsewhere, then
> how will any user possibly know how to tune the GUC?  And do we really
> want the user to have to keep adjusting a GUC before each query to get
> maximum performance?  I think we need to understand the whole picture
> here, and then decide what to do.  Ideally this would auto-tune, but
> we can't write code for that without a more complete picture of the
> behavior.

Yeah, maybe for the scenario discussed above a GUC is not the best idea, but perhaps the planner could use something that captures the relation between the index ordering and the physical ordering of the tuples, along with the number of tuples, etc., to decide the value of PARALLEL_TUPLE_QUEUE_SIZE. E.g. if the index is the primary key, then the physical order is the same as the index order, and if this is the sort key then there would be few stalls at Gather Merge; but if it is an unclustered index, then the physical order is very different from the index order and workers are likely to stall more, so keep a higher value of PARALLEL_TUPLE_QUEUE_SIZE based on the number of tuples. Again, I am not concluding anything yet, as this is too little experimentation to ascertain anything; I'll continue the experiments and would be grateful for more suggestions.

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/
On 2017-06-01 18:41:20 +0530, Rafia Sabih wrote:
> As per my understanding it looks like this increase in tuple queue
> size is helping only gather-merge. Particularly, in the case where it
> is enough stalling by master in gather-merge because it is maintaining
> the sort-order. Like in q12 the index is unclustered and gather-merge
> is just above parallel index scan, thus, it is likely that to maintain
> the order the workers have to wait long for the in-sequence tuple is
> attained by the master.

I wonder if there's some way we could make this problem a bit less bad. One underlying problem is that we don't know what the current boundary on each worker is unless it returns a tuple. I.e. even if some worker is guaranteed not to return any further tuples below another worker's last tuple, Gather Merge won't know about that until it finds another matching tuple.

Perhaps, for some subsets, we could make the workers update that boundary without producing a tuple that Gather will actually return? In the, probably reasonably common, case of having merge joins below the Gather, it shouldn't be very hard to do so. Imagine e.g. that every worker gets a "slot" in a DSM where it can point to a tuple (managed by dsa.c, to deal with variable-length keys) that contains the current boundary. For a merge join it'd not be troublesome to occasionally - although what constitutes "occasionally" isn't easy; perhaps the master signals the worker? - put a new boundary tuple there, even if it doesn't find a match. It's probably harder for cases where most of the filtering happens far below the top-level worker node.

- Andres
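Purely to illustrate the shape of that idea, a hypothetical per-worker boundary slot might look something like the following (all names invented; nothing like this exists in the tree):

    /*
     * One slot per Gather Merge participant, living in a DSM segment.  The
     * worker publishes a copy of its current lower-bound tuple through dsa.c;
     * the leader may read it to learn that the worker cannot return anything
     * sorting below that boundary, even before a matching tuple shows up.
     */
    typedef struct GMWorkerBoundary
    {
        slock_t     mutex;              /* protects the fields below */
        dsa_pointer boundary_tuple;     /* current boundary, or InvalidDsaPointer */
        uint32      generation;         /* bumped whenever the boundary advances */
    } GMWorkerBoundary;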
On Thu, Jun 1, 2017 at 6:41 PM, Rafia Sabih <rafia.sabih@enterprisedb.com> wrote:
> On Tue, May 30, 2017 at 4:57 PM, Robert Haas <robertmhaas@gmail.com> wrote:
>
>> I did a little bit of brief experimentation on this same topic a long
>> time ago and didn't see an improvement from boosting the queue size
>> beyond 64k but Rafia is testing Gather Merge rather than Gather and,
>> as I say, my test was very brief.  I think it would be a good idea to
>> try to get a complete picture here.  Does this help on any query that
>> returns many tuples through the Gather?  Only the ones that use Gather
>> Merge?  Some queries but not others with no obvious pattern?  Only
>> this query?
>
> I did further exploration trying other values of
> PARALLEL_TUPLE_QUEUE_SIZE and trying different queries and here are my
> findings,
> - on even setting PARALLEL_TUPLE_QUEUE_SIZE to 655360, there isn't
> much improvement in q12 itself.
> - there is no other TPC-H query which is showing significant
> improvement on 6553600 itself. There is a small improvement in q3
> which is also using gather-merge.
> - as per perf analysis of q12 on head and patch, the %age of
> ExecGatherMerge is 18% with patch and 98% on head, and similar with
> gather_merge_readnext and gather_merge_writenext.
>
> As per my understanding it looks like this increase in tuple queue
> size is helping only gather-merge. Particularly, in the case where it
> is enough stalling by master in gather-merge because it is maintaining
> the sort-order. Like in q12 the index is unclustered and gather-merge
> is just above parallel index scan, thus, it is likely that to maintain
> the order the workers have to wait long for the in-sequence tuple is
> attained by the master. Something like this might be happening, master
> takes one tuple from worker 1, then next say 10 tuples from worker 2
> and so on, and then finally returning to worker1, so, one worker 1 has
> done enough that filled it's queue it sits idle. Hence, on increasing
> the tuple queue size helps in workers to keep on working for longer
> and this is improving the performance.

Your reasoning sounds sensible to me. I think another way to attack this problem is to maintain a local queue in each of the workers, used when the shared memory queue becomes full. Basically, we can extend your "Faster processing at Gather node" patch [1] such that instead of a fixed-size local queue, we extend it when the shm queue becomes full. I think that way we can handle both problems (a worker won't stall if the shm queues are full, and workers can do batched writes into the shm queue to avoid the shm queue communication overhead) in a similar way.

[1] - https://www.postgresql.org/message-id/CAOGQiiMwhOd5-iKZnizn%2BEdzZmB0bc3xa6rKXQgvhbnQ29zCJg%40mail.gmail.com

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Fri, Jun 2, 2017 at 9:01 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
> Your reasoning sounds sensible to me. I think the other way to attack
> this problem is that we can maintain some local queue in each of the
> workers when the shared memory queue becomes full. Basically, we can
> extend your "Faster processing at Gather node" patch [1] such that
> instead of fixed sized local queue, we can extend it when the shm
> queue become full. I think that way we can handle both the problems
> (worker won't stall if shm queues are full and workers can do batched
> writes in shm queue to avoid the shm queue communication overhead) in
> a similar way.

We still have to bound the amount of memory that we use for queueing data in some way.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Fri, Jun 2, 2017 at 6:38 PM, Robert Haas <robertmhaas@gmail.com> wrote:
> On Fri, Jun 2, 2017 at 9:01 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
>> Your reasoning sounds sensible to me. I think the other way to attack
>> this problem is that we can maintain some local queue in each of the
>> workers when the shared memory queue becomes full. Basically, we can
>> extend your "Faster processing at Gather node" patch [1] such that
>> instead of fixed sized local queue, we can extend it when the shm
>> queue become full. I think that way we can handle both the problems
>> (worker won't stall if shm queues are full and workers can do batched
>> writes in shm queue to avoid the shm queue communication overhead) in
>> a similar way.
>
> We still have to bound the amount of memory that we use for queueing
> data in some way.

Yeah, probably up to work_mem (or some percentage of work_mem). If we want an extendable solution, then we might want to back it up with some file; however, we might not need to go that far. I think we can do some experiments to see how much additional memory is sufficient to give us the maximum benefit.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Fri, Jun 2, 2017 at 9:15 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Fri, Jun 2, 2017 at 6:38 PM, Robert Haas <robertmhaas@gmail.com> wrote:
>> On Fri, Jun 2, 2017 at 9:01 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
>>> Your reasoning sounds sensible to me. I think the other way to attack
>>> this problem is that we can maintain some local queue in each of the
>>> workers when the shared memory queue becomes full. Basically, we can
>>> extend your "Faster processing at Gather node" patch [1] such that
>>> instead of fixed sized local queue, we can extend it when the shm
>>> queue become full. I think that way we can handle both the problems
>>> (worker won't stall if shm queues are full and workers can do batched
>>> writes in shm queue to avoid the shm queue communication overhead) in
>>> a similar way.
>>
>> We still have to bound the amount of memory that we use for queueing
>> data in some way.
>
> Yeah, probably till work_mem (or some percentage of work_mem). If we
> want to have some extendable solution then we might want to back it up
> with some file, however, we might not need to go that far. I think we
> can do some experiments to see how much additional memory is
> sufficient to give us maximum benefit.

Yes, I think that's important. Also, I think we still need a better understanding of the cases in which the benefit is there.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Fri, Jun 2, 2017 at 6:31 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
> Your reasoning sounds sensible to me. I think the other way to attack
> this problem is that we can maintain some local queue in each of the
> workers when the shared memory queue becomes full. Basically, we can
> extend your "Faster processing at Gather node" patch [1] such that
> instead of fixed sized local queue, we can extend it when the shm
> queue become full. I think that way we can handle both the problems
> (worker won't stall if shm queues are full and workers can do batched
> writes in shm queue to avoid the shm queue communication overhead) in
> a similar way.
>
> [1] - https://www.postgresql.org/message-id/CAOGQiiMwhOd5-iKZnizn%2BEdzZmB0bc3xa6rKXQgvhbnQ29zCJg%40mail.gmail.com

I worked on this idea of using a local queue as a temporary buffer to write the tuples to when the master is busy and the shared queue is full, and it gives quite some improvement in query performance.

Design: At a basic level, the design of this patch can be explained as follows. Similar to shm_mq, there is a new structure local_mq which is private to each worker. Once the shared queue is full, we write the tuple to the local queue. Since the local queue is never shared, we do not need any locking to write to it; hence writing to the local queue is a cheap operation. Once the local queue is at least 5% full (that is what I've kept for this version, but we might need to modify it), we copy the data from the local to the shared queue. In case both queues are full, we wait until the master reads from the shared queue, then copy some data from the local to the shared queue until the required space is available, and subsequently write the tuple to the local queue. If at any instant the local queue becomes empty, then we write the tuple to the shared queue itself, provided there is space. At the time of worker shutdown, we copy all the data from the local queue to the shared queue. (A simplified sketch of this send path follows the performance summary below.)

For this version of the patch I have kept the size of the local queue = 100 * PARALLEL_TUPLE_QUEUE_SIZE = 6553600, which might not be the best value, and I am open to suggestions for modifying it. It is kept that way for the scenarios where the Gather/Gather Merge node is slow; I expect that when the master is busy it may be busy for quite some time, or the amount of data to be processed is high, and we would not want our workers to wait that long.

Performance: These experiments are on TPC-H scale factor 20. The patch gives around 20-30% performance improvement in queries with selectivity of around 20-30%.
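As referenced above, here is a highly simplified sketch of the send path the design describes. The helper names are placeholders (the real patch works inside shm_mq.c in terms of shm_mq_send and shm_mq_handle), so treat this as an illustration of the control flow, not the actual code:

    /* Illustrative pseudo-C of the buffered send path -- not the attached patch. */
    shm_mq_result
    send_tuple_buffered(shm_mq_handle *mqh, Size tuple_size, void *tuple)
    {
        /* Fast path: local queue empty and the shared queue can take the tuple. */
        if (local_queue_is_empty(mqh) && shared_space(mqh) >= tuple_size)
            return write_to_shared(mqh, tuple_size, tuple);

        if (local_space(mqh) >= tuple_size)
            write_to_local(mqh, tuple_size, tuple);     /* no locking needed */
        else
        {
            /* Both queues full: wait for the leader to drain the shared queue. */
            while (shared_space(mqh) < tuple_size)
            {
                WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
                ResetLatch(MyLatch);
                CHECK_FOR_INTERRUPTS();
            }
            copy_local_to_shared(mqh);                  /* make room locally */
            write_to_local(mqh, tuple_size, tuple);
        }

        /* Once the local queue is >= 5% full, push a batch into shared memory. */
        if (local_fill_fraction(mqh) >= 0.05 && shared_space(mqh) > 0)
            copy_local_to_shared(mqh);

        return SHM_MQ_SUCCESS;
    }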
Head: Default plan
explain analyse select * from lineitem where l_extendedprice < 50000 and l_orderkey < 15000000;
                                                                        QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Index Scan using idx_lineitem_orderkey on lineitem  (cost=0.57..334367.85 rows=10313822 width=129) (actual time=0.057..26389.587 rows=10258702 loops=1)
   Index Cond: (l_orderkey < 15000000)
   Filter: (l_extendedprice < '50000'::numeric)
   Rows Removed by Filter: 4737888
 Planning time: 1.686 ms
 Execution time: 27402.801 ms
(6 rows)

Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000 and l_orderkey < 15000000;
                                                                        QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=0.57..193789.78 rows=10313822 width=129) (actual time=0.354..41153.916 rows=10258702 loops=1)
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel Index Scan using idx_lineitem_orderkey on lineitem  (cost=0.57..193789.78 rows=2578456 width=129) (actual time=0.062..6530.167 rows=2051740 loops=5)
         Index Cond: (l_orderkey < 15000000)
         Filter: (l_extendedprice < '50000'::numeric)
         Rows Removed by Filter: 947578
 Planning time: 0.383 ms
 Execution time: 42027.645 ms
(9 rows)

Patch: Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000 and l_orderkey < 15000000;
                                                                        QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=0.57..193789.78 rows=10313822 width=129) (actual time=0.413..16690.294 rows=10258702 loops=1)
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel Index Scan using idx_lineitem_orderkey on lineitem  (cost=0.57..193789.78 rows=2578456 width=129) (actual time=0.047..6185.527 rows=2051740 loops=5)
         Index Cond: (l_orderkey < 15000000)
         Filter: (l_extendedprice < '50000'::numeric)
         Rows Removed by Filter: 947578
 Planning time: 0.406 ms
 Execution time: 17616.750 ms
(9 rows)

Head: Default plan
explain analyse select * from lineitem where l_extendedprice < 50000 and l_orderkey < 30000000;
                                                                        QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Index Scan using idx_lineitem_orderkey on lineitem  (cost=0.57..684102.33 rows=21101661 width=129) (actual time=0.131..55532.251 rows=20519918 loops=1)
   Index Cond: (l_orderkey < 30000000)
   Filter: (l_extendedprice < '50000'::numeric)
   Rows Removed by Filter: 9479875
 Planning time: 0.318 ms
 Execution time: 57436.251 ms
(6 rows)

Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000 and l_orderkey < 30000000;
                                                                        QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=0.57..396485.31 rows=21101661 width=129) (actual time=0.557..69190.640 rows=20519918 loops=1)
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel Index Scan using idx_lineitem_orderkey on lineitem  (cost=0.57..396485.31 rows=5275415 width=129) (actual time=0.106..12797.711 rows=4103984 loops=5)
         Index Cond: (l_orderkey < 30000000)
         Filter: (l_extendedprice < '50000'::numeric)
         Rows Removed by Filter: 1895975
 Planning time: 0.393 ms
 Execution time: 70924.801 ms
(9 rows)
Patch: Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000 and l_orderkey < 30000000;
                                                                        QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=0.57..396485.31 rows=21101661 width=129) (actual time=0.424..31677.524 rows=20519918 loops=1)
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel Index Scan using idx_lineitem_orderkey on lineitem  (cost=0.57..396485.31 rows=5275415 width=129) (actual time=0.075..12811.910 rows=4103984 loops=5)
         Index Cond: (l_orderkey < 30000000)
         Filter: (l_extendedprice < '50000'::numeric)
         Rows Removed by Filter: 1895975
 Planning time: 0.462 ms
 Execution time: 33440.322 ms
(9 rows)

Head: Default plan
explain analyse select * from lineitem where l_extendedprice < 50000 and l_orderkey < 60000000;
                                                                        QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Index Scan using idx_lineitem_orderkey on lineitem  (cost=0.57..1337265.07 rows=41248987 width=129) (actual time=0.070..107944.729 rows=41035759 loops=1)
   Index Cond: (l_orderkey < 60000000)
   Filter: (l_extendedprice < '50000'::numeric)
   Rows Removed by Filter: 18950286
 Planning time: 2.021 ms
 Execution time: 111963.420 ms
(6 rows)

Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000 and l_orderkey < 60000000;
                                                                        QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=0.00..692896.08 rows=41248987 width=129) (actual time=0.354..141432.886 rows=41035759 loops=1)
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel Seq Scan on lineitem  (cost=0.00..692896.08 rows=10312247 width=129) (actual time=0.029..31678.105 rows=8207152 loops=5)
         Filter: ((l_extendedprice < '50000'::numeric) AND (l_orderkey < 60000000))
         Rows Removed by Filter: 15791770
 Planning time: 1.883 ms
 Execution time: 144859.515 ms
(8 rows)

Patch: Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000 and l_orderkey < 60000000;
                                                                        QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=0.00..692896.08 rows=41248987 width=129) (actual time=0.350..78312.666 rows=41035759 loops=1)
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel Seq Scan on lineitem  (cost=0.00..692896.08 rows=10312247 width=129) (actual time=0.027..31867.170 rows=8207152 loops=5)
         Filter: ((l_extendedprice < '50000'::numeric) AND (l_orderkey < 60000000))
         Rows Removed by Filter: 15791770
 Planning time: 0.439 ms
 Execution time: 82057.225 ms
(8 rows)

Apart from these, Q12 from the benchmark queries shows good improvement with this patch.
Head:
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1001.19..426457.34 rows=1 width=27) (actual time=42770.491..42770.491 rows=1 loops=1)
   ->  GroupAggregate  (cost=1001.19..2979194.24 rows=7 width=27) (actual time=42770.489..42770.489 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1001.19..2969127.63 rows=575231 width=27) (actual time=11.355..42224.843 rows=311095 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Nested Loop  (cost=1.13..2899612.01 rows=143808 width=27) (actual time=0.346..10385.472 rows=62906 loops=5)
                     ->  Parallel Index Scan using idx_l_shipmode on lineitem  (cost=0.57..2796168.46 rows=143808 width=19) (actual time=0.280..9004.095 rows=62906 loops=5)
                           Index Cond: (l_shipmode = ANY ('{"REG AIR",RAIL}'::bpchar[]))
                           Filter: ((l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1995-01-01'::date) AND (l_receiptdate < '1996-01-01 00:00:00'::timestamp without time zone))
                           Rows Removed by Filter: 3402367
                     ->  Index Scan using orders_pkey on orders  (cost=0.56..0.72 rows=1 width=20) (actual time=0.020..0.020 rows=1 loops=314530)
                           Index Cond: (o_orderkey = lineitem.l_orderkey)
 Planning time: 1.202 ms
 Execution time: 42841.895 ms
(15 rows)

Patch:
----------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1001.19..426457.34 rows=1 width=27) (actual time=19461.653..19461.654 rows=1 loops=1)
   ->  GroupAggregate  (cost=1001.19..2979194.24 rows=7 width=27) (actual time=19461.651..19461.651 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1001.19..2969127.63 rows=575231 width=27) (actual time=10.239..18783.386 rows=311095 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Nested Loop  (cost=1.13..2899612.01 rows=143808 width=27) (actual time=0.376..19109.107 rows=66104 loops=5)
                     ->  Parallel Index Scan using idx_l_shipmode on lineitem  (cost=0.57..2796168.46 rows=143808 width=19) (actual time=0.310..16615.236 rows=66104 loops=5)
                           Index Cond: (l_shipmode = ANY ('{"REG AIR",RAIL}'::bpchar[]))
                           Filter: ((l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1995-01-01'::date) AND (l_receiptdate < '1996-01-01 00:00:00'::timestamp without time zone))
                           Rows Removed by Filter: 3574492
                     ->  Index Scan using orders_pkey on orders  (cost=0.56..0.72 rows=1 width=20) (actual time=0.034..0.034 rows=1 loops=330519)
                           Index Cond: (o_orderkey = lineitem.l_orderkey)
 Planning time: 3.498 ms
 Execution time: 19661.054 ms
(15 rows)

This suggests that with such an idea the range of selectivities at which parallelism improves query performance can be extended.

Credits: I would like to extend thanks to my colleagues Dilip Kumar, Amit Kapila, and Robert Haas for their discussions and words of encouragement throughout the development of this patch.

Feedback and suggestions are welcome.

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/
Attachment
On Wed, Sep 6, 2017 at 4:14 PM, Rafia Sabih <rafia.sabih@enterprisedb.com> wrote:
> I worked on this idea of using local queue as a temporary buffer to
> write the tuples when master is busy and shared queue is full, and it
> gives quite some improvement in the query performance.

I have done some initial review of this patch and I have some comments.

/* this is actual size for this tuple which will be written in queue */
+ tuple_size = MAXALIGN(sizeof(Size)) + MAXALIGN(iov.len);
+
+ /* create and attach a local queue, if it is not yet created */
+ if (mqh->mqh_local_queue == NULL)
+ mqh = local_mq_attach(mqh);

I think we can create the local queue the first time we need it. So basically you can move this code inside the else part, where we first identify that there is no space in the shared queue.
------
+ /* write in local queue if there is enough space*/
+ if (local_space > tuple_size)

I think the condition should be if (local_space >= tuple_size).
------
+ while(shm_space <= 0)
+ {
+ if (shm_mq_is_detached(mqh->mqh_queue))
+ return SHM_MQ_DETACHED;
+
+ shm_space = space_in_shm(mqh->mqh_queue);
+ }

Instead of waiting in a CPU-intensive while loop, we should wait on some latch. Why can't we use the wait latch of the shared queue? Whenever some space is freed the latch will be set; then we can recheck the space, and if it's enough we can write to the shared queue, otherwise wait on the latch again.

Check other similar occurrences.
---------
+ if (read_local_queue(lq, true) && shm_space > 0)
+ copy_local_to_shared(lq, mqh, false);
+

Instead of adding shm_space > 0 to the check, it can be an Assert or nothing, because the above loop will only break if shm_space > 0.
----
+ /*
+ * create a local queue, the size of this queue should be way higher
+ * than PARALLEL_TUPLE_QUEUE_SIZE
+ */
+ char *mq;
+ Size len;
+
+ len = 6553600;

Create some macro which is a multiple of PARALLEL_TUPLE_QUEUE_SIZE.
-------
+ /* this local queue is not required anymore, hence free the space. */
+ pfree(mqh->mqh_local_queue);
+ return;
+}

I think you can remove the return at the end of the void function.
-----
In the empty_queue(shm_mq_handle *mqh) function, I see that only the last exit path frees the local queue, but not all of them, even though the local queue has been created.
----
Other cosmetic issues.
-----------------------------
+/* check the space availability in local queue */
+static uint64
+space_in_local(local_mq *lq, Size tuple_size)
+{
+ uint64 read, written, used, available, ringsize, writer_offset, reader_offset;
+
+ ringsize = lq->mq_ring_size;
+ read = lq->mq_bytes_read;
+ written = lq->mq_bytes_written;
+ used = written - read;
+ available = ringsize - used;
+

Alignment is not correct; I think you can run pgindent once.
----
+ /* check is there is required space in shared queue */

The statement needs rewording: "check if there is required space in shared queue"?
-----
+ /* write in local queue if there is enough space*/

Space missing before the comment end.
-----
+
+/* Routines required for local queue */
+
+/*
+ * Initialize a new local message queue, this is kept quite similar to shm_mq_create.
+ */

Header comment formatting is not in sync with the other functions.
-----
+}
+/* routine to create and attach local_mq to the shm_mq_handle */

One blank line needed between the two functions.
-----
+ bool detached;
+
+ detached = false;

A better way is: bool detached = false;
-----
Compilation warning
--------------------
shm_mq.c: In function ‘write_in_local_queue’:
shm_mq.c:1489:32: warning: variable ‘tuple_size’ set but not used [-Wunused-but-set-variable]
 uint64 bytes_written, nbytes, tuple_size;

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
On Sun, Sep 17, 2017 at 9:10 PM, Dilip Kumar <dilipbalaut@gmail.com> wrote:
> On Wed, Sep 6, 2017 at 4:14 PM, Rafia Sabih
> <rafia.sabih@enterprisedb.com> wrote:
>
>> I worked on this idea of using local queue as a temporary buffer to
>> write the tuples when master is busy and shared queue is full, and it
>> gives quite some improvement in the query performance.
>
> I have done some initial review of this patch and I have some comments.

Thanks for the review.

> /* this is actual size for this tuple which will be written in queue */
> + tuple_size = MAXALIGN(sizeof(Size)) + MAXALIGN(iov.len);
> +
> + /* create and attach a local queue, if it is not yet created */
> + if (mqh->mqh_local_queue == NULL)
> + mqh = local_mq_attach(mqh);
>
> I think we can create the local queue when first time we need it. So
> basically you can move this code inside else part where we first
> identify that there is no space in the shared queue.

Done.

> ------
> + /* write in local queue if there is enough space*/
> + if (local_space > tuple_size)
>
> I think the condition should be if (local_space >= tuple_size)

I did this to be on the safer side; anyhow, modified.

> ------
> + while(shm_space <= 0)
> + {
> + if (shm_mq_is_detached(mqh->mqh_queue))
> + return SHM_MQ_DETACHED;
> +
> + shm_space = space_in_shm(mqh->mqh_queue);
> + }
>
> Instead of waiting in CPU intensive while loop we should wait on some
> latch, why can't we use wait latch of the shared queue and whenever
> some space free, the latch will be set then we can recheck the space
> and if it's enough we can write to share queue otherwise wait on the
> latch again.
>
> Check other similar occurrences.

Done.

> ---------
> + if (read_local_queue(lq, true) && shm_space > 0)
> + copy_local_to_shared(lq, mqh, false);
> +
> Instead of adding shm_space > 0 in check it can be Assert or nothing
> because above loop will only break if shm_space > 0.
> ----

Done.

> + /*
> + * create a local queue, the size of this queue should be way higher
> + * than PARALLEL_TUPLE_QUEUE_SIZE
> + */
> + char *mq;
> + Size len;
> +
> + len = 6553600;
>
> Create some macro which is multiple of PARALLEL_TUPLE_QUEUE_SIZE,

Done.

> -------
> + /* this local queue is not required anymore, hence free the space. */
> + pfree(mqh->mqh_local_queue);
> + return;
> +}
>
> I think you can remove the return at the end of the void function.
> -----

Done.

> In empty_queue(shm_mq_handle *mqh) function I see that only last exit
> path frees the local queue but not all even though local queue is
> created.
> ----

Modified.

> Other cosmetic issues.
> -----------------------------
> +/* check the space availability in local queue */
> +static uint64
> +space_in_local(local_mq *lq, Size tuple_size)
> +{
> + uint64 read, written, used, available, ringsize, writer_offset, reader_offset;
> +
> + ringsize = lq->mq_ring_size;
> + read = lq->mq_bytes_read;
> + written = lq->mq_bytes_written;
> + used = written - read;
> + available = ringsize - used;
> +
> Alignment is not correct, I think you can run pgindent once.
> ----
>
> + /* check is there is required space in shared queue */
> statement need refactoring. "check if there is required space in shared queue" ?
>
> -----
>
> + /* write in local queue if there is enough space*/
> space missing before comment end.
>
> -----
>
> +
> +/* Routines required for local queue */
> +
> +/*
> + * Initialize a new local message queue, this is kept quite similar
> to shm_mq_create.
> + */
>
> Header comments formatting is not in sync with other functions.
>
> -----
>
> +}
> +/* routine to create and attach local_mq to the shm_mq_handle */
>
> one blank line between two functions.
>
> -----

Ran pgindent for these issues.

> + bool detached;
> +
> + detached = false;
>
> a better way is bool detached = false;
>
> -----

Done.

> Compilation warning
> --------------------
> shm_mq.c: In function ‘write_in_local_queue’:
> shm_mq.c:1489:32: warning: variable ‘tuple_size’ set but not used
> [-Wunused-but-set-variable]
> uint64 bytes_written, nbytes, tuple_size;

That might be the case when not configured with cassert; however, it is removed in the current version anyway.

Please find the attached file for the revised version.

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/
Attachment
On Thu, Sep 21, 2017 at 4:50 PM, Rafia Sabih <rafia.sabih@enterprisedb.com> wrote:
> On Sun, Sep 17, 2017 at 9:10 PM, Dilip Kumar <dilipbalaut@gmail.com> wrote:
>> On Wed, Sep 6, 2017 at 4:14 PM, Rafia Sabih
>> <rafia.sabih@enterprisedb.com> wrote:
>
> Please find the attached file for the revised version.

Thanks for the updated patch; I have some more comments.

+static shm_mq_handle *local_mq_attach(shm_mq_handle *mqh);
+static uint64 space_in_local(local_mq * lq, Size tuple_size);
+static bool read_local_queue(local_mq * lq, bool shm_mq_full);

local_mq * lq -> local_mq *lq
Same for other places as well.
---
+static uint64 space_in_shm(shm_mq *mq);
+
+static uint64 space_in_local(local_mq * lq, Size tuple_size);

We had better use Size here instead of uint64.
---
+ available = ringsize - used;
+
+ ringsize = lq->mq_ring_size;
+ writer_offset = lq->mq_bytes_written % ringsize;
+ reader_offset = lq->mq_bytes_read % ringsize;
+
+ if (writer_offset + tuple_size < ringsize && reader_offset < writer_offset)
+ available = (ringsize - writer_offset);

Even though there is space in the queue, if the tuple needs to wrap around then we are not allowing it to be written into the local queue. If there is some strong reason behind that, please add comments to explain it.
---
+ if (shm_mq_full || (written - read) >= .05 * lq->mq_ring_size)
+ return true;
+
+ else
+ return true;
+}

Seems like you want to return 'false' in the else case.
----
+ read_offset = lq->mq_bytes_read % lq->mq_ring_size;
+ available = space_in_shm(mqh->mqh_queue);
+
+ /* always read data in the aligned form */
+ to_read = MAXALIGN_DOWN(Min(used, available));
+
+ /*
+ * if the amount of data to be send from local queue involves wrapping of
+ * local queue, then send only the data till the end of queue right now
+ * and rest later.
+ */
+ if (lq->mq_bytes_read % lq->mq_ring_size + to_read > lq->mq_ring_size)

You can directly use "read_offset" instead of recalculating lq->mq_bytes_read % lq->mq_ring_size.
----
+ do
+ {
+ if (shm_mq_is_detached(mqh->mqh_queue))
+ return SHM_MQ_DETACHED;
+
+ shm_space = space_in_shm(mqh->mqh_queue);
+
+ /*
+ * cannot send data to shared queue, unless there is required
+ * space, so wait till we get some space, since we cannot
+ * write anymore in local queue as of now
+ */
+ WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
+
+ /* Reset the latch so we don't spin. */
+ ResetLatch(MyLatch);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
+
+ shm_space = space_in_shm(mqh->mqh_queue);
+
+ if (read_local_queue(lq, true) && shm_space > 0)
+ copy_local_to_shared(lq, mqh, false);
+
+ local_space = space_in_local(lq, tuple_size);
+
+ } while (local_space <= tuple_size);
+

1. Just after getting the shm_space, you are calling WaitLatch without even checking whether that space is sufficient to send the tuple.
2. Every time after the latch is set (maybe some space was freed in the shared queue) you are calling copy_local_to_shared to send as much data as possible from the local to the shared queue, but that doesn't even guarantee that we will have sufficient space in the local queue to accommodate the current tuple.

I think instead of calling copy_local_to_shared multiple times (which will internally acquire the mutex), after the latch is set you can check the shared queue space and not attempt copy_local_to_shared unless shm_space >= tuple_size.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
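For clarity, a sketch of what the waiting loop might look like with both of those points folded in (reusing the helper names from the fragments quoted above; illustrative only, not tested code):

    do
    {
        if (shm_mq_is_detached(mqh->mqh_queue))
            return SHM_MQ_DETACHED;

        shm_space = space_in_shm(mqh->mqh_queue);

        /* Only push a batch once the shared queue can really take it. */
        if (shm_space >= tuple_size && read_local_queue(lq, true))
            copy_local_to_shared(lq, mqh, false);

        local_space = space_in_local(lq, tuple_size);
        if (local_space > tuple_size)
            break;              /* room for the current tuple locally */

        /* Otherwise sleep until the leader frees some space. */
        WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
        ResetLatch(MyLatch);
        CHECK_FOR_INTERRUPTS();
    } while (true);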
On Thu, Sep 21, 2017 at 10:34 PM, Dilip Kumar <dilipbalaut@gmail.com> wrote:
> On Thu, Sep 21, 2017 at 4:50 PM, Rafia Sabih
> <rafia.sabih@enterprisedb.com> wrote:
>> On Sun, Sep 17, 2017 at 9:10 PM, Dilip Kumar <dilipbalaut@gmail.com> wrote:
>>> On Wed, Sep 6, 2017 at 4:14 PM, Rafia Sabih
>>> <rafia.sabih@enterprisedb.com> wrote:
>>
>> Please find the attached file for the revised version.
>
> Thanks for the updated patch, I have some more comments.

Again, thanks for the review. Please find attached the updated patch.

> +static shm_mq_handle *local_mq_attach(shm_mq_handle *mqh);
> +static uint64 space_in_local(local_mq * lq, Size tuple_size);
> +static bool read_local_queue(local_mq * lq, bool shm_mq_full);
>
> local_mq * lq -> local_mq *lq
> same for other places as well.
> ---

Done. This is something pgindent does; anyhow, corrected it.

> +static uint64 space_in_shm(shm_mq *mq);
> +
> +static uint64 space_in_local(local_mq * lq, Size tuple_size);
>
> we better use Size here instead if uint64
> ---

Done.

> + available = ringsize - used;
> +
> + ringsize = lq->mq_ring_size;
> + writer_offset = lq->mq_bytes_written % ringsize;
> + reader_offset = lq->mq_bytes_read % ringsize;
> +
> + if (writer_offset + tuple_size < ringsize && reader_offset < writer_offset)
> + available = (ringsize - writer_offset);
>
> even though there is space in queue but tuple need rotation then we
> are not allowing it to write into the local queue. If there is some
> strong reason behind that, please add comments to explain the same.
> ---

Corrected; it will just return the available space now.

> + if (shm_mq_full || (written - read) >= .05 * lq->mq_ring_size)
> + return true;
> +
> + else
> + return true;
> +}
>
> Seems like you want to return 'false' in the else case.
> ----

Yes, and done.

> + read_offset = lq->mq_bytes_read % lq->mq_ring_size;
> + available = space_in_shm(mqh->mqh_queue);
> +
> + /* always read data in the aligned form */
> + to_read = MAXALIGN_DOWN(Min(used, available));
> +
> + /*
> + * if the amount of data to be send from local queue involves wrapping of
> + * local queue, then send only the data till the end of queue right now
> + * and rest later.
> + */
> + if (lq->mq_bytes_read % lq->mq_ring_size + to_read > lq->mq_ring_size)
>
> You can directly use "read_offset" instead of recalculating
> lq->mq_bytes_read % lq->mq_ring_size.
> ----

Done.

> + do
> + {
> + if (shm_mq_is_detached(mqh->mqh_queue))
> + return SHM_MQ_DETACHED;
> +
> + shm_space = space_in_shm(mqh->mqh_queue);
> +
> + /*
> + * cannot send data to shared queue, unless there is required
> + * space, so wait till we get some space, since we cannot
> + * write anymore in local queue as of now
> + */
> + WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
> +
> + /* Reset the latch so we don't spin. */
> + ResetLatch(MyLatch);
> +
> + /* An interrupt may have occurred while we were waiting. */
> + CHECK_FOR_INTERRUPTS();
> +
> + shm_space = space_in_shm(mqh->mqh_queue);
> +
> + if (read_local_queue(lq, true) && shm_space > 0)
> + copy_local_to_shared(lq, mqh, false);
> +
> + local_space = space_in_local(lq, tuple_size);
> +
> + } while (local_space <= tuple_size);
> +
>
> 1. Just after getting the shm_space, you are calling WaitLatch,
> without even checking whether that space is sufficient to send the
> tuple.
> 2. Every time after latch is set (maybe some space freed in the shared
> queue) you are calling copy_local_to_shared to send as much data as
> possible from local to shared queue, but that doesn't even guarantee
> that we will have sufficient space in the local queue to accommodate
> the current tuple.
>
> I think calling copy_local_to_shared multiple time (which will
> internally acquire mutex), after latch is set you can check the shared
> queue space, don't attempt copy_local_to_shared unless
> shm_space >= tuple_size

Done.

> --
> Regards,
> Dilip Kumar
> EnterpriseDB: http://www.enterprisedb.com

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/