Thread: Re: [HACKERS] [postgresql 10 beta3] unrecognized node type: 90
On Fri, Aug 11, 2017 at 11:59:14AM -0400, Tom Lane wrote:
> "Adam, Etienne (Nokia-TECH/Issy Les Moulineaux)" <etienne.adam@nokia.com> writes:
> > ERROR: XX000: unrecognized node type: 90
> > LOCATION: ExecReScan, execAmi.c:284
>
> (gdb) p (NodeTag) 90
> $1 = T_GatherMergeState
>
> So, apparently somebody wrote ExecReScanGatherMerge, but never bothered
> to plug it into ExecReScan.  From which we may draw depressing conclusions
> about how much it's been tested.

[Action required within three days.  This is a generic notification.]

The above-described topic is currently a PostgreSQL 10 open item.  Robert,
since you committed the patch believed to have created it, you own this open
item.  If some other commit is more relevant or if this does not belong as a
v10 open item, please let us know.  Otherwise, please observe the policy on
open item ownership[1] and send a status update within three calendar days of
this message.  Include a date for your subsequent status update.  Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10.  Consequently, I will appreciate your efforts
toward speedy resolution.  Thanks.

[1] https://www.postgresql.org/message-id/20170404140717.GA2675809%40tornado.leadboat.com
On Tue, Aug 15, 2017 at 8:22 AM, Noah Misch <noah@leadboat.com> wrote:
> On Fri, Aug 11, 2017 at 11:59:14AM -0400, Tom Lane wrote:
>> "Adam, Etienne (Nokia-TECH/Issy Les Moulineaux)" <etienne.adam@nokia.com> writes:
>> > ERROR: XX000: unrecognized node type: 90
>> > LOCATION: ExecReScan, execAmi.c:284
>>
>> (gdb) p (NodeTag) 90
>> $1 = T_GatherMergeState
>>
>> So, apparently somebody wrote ExecReScanGatherMerge, but never bothered
>> to plug it into ExecReScan.

Attached patch fixes the issue for me.  I have locally verified that
the gather merge gets executed in rescan path.  I haven't added a test
case for the same as having gather or gather merge on the inner side
of join can be time-consuming.  However, if you or others feel that it
is important to have a test to cover this code path, then I can try to
produce one.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
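(For context: ExecReScan() in execAmi.c dispatches on the executor node tag,
and the GatherMerge case was simply missing.  The attached patch is not
inlined in the archive, but the missing hookup is presumably of roughly this
shape -- a sketch for illustration, not the patch itself:)

    switch (nodeTag(node))
    {
            /* ... other node types ... */

        case T_GatherState:
            ExecReScanGather((GatherState *) node);
            break;

        case T_GatherMergeState:        /* the case that was missing */
            ExecReScanGatherMerge((GatherMergeState *) node);
            break;

            /* ... */
    }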
On Tue, Aug 15, 2017 at 7:31 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Tue, Aug 15, 2017 at 8:22 AM, Noah Misch <noah@leadboat.com> wrote:
>> On Fri, Aug 11, 2017 at 11:59:14AM -0400, Tom Lane wrote:
>>> "Adam, Etienne (Nokia-TECH/Issy Les Moulineaux)" <etienne.adam@nokia.com> writes:
>>> > ERROR: XX000: unrecognized node type: 90
>>> > LOCATION: ExecReScan, execAmi.c:284
>>>
>>> (gdb) p (NodeTag) 90
>>> $1 = T_GatherMergeState
>>>
>>> So, apparently somebody wrote ExecReScanGatherMerge, but never bothered
>>> to plug it into ExecReScan.
>
> Attached patch fixes the issue for me.  I have locally verified that
> the gather merge gets executed in rescan path.  I haven't added a test
> case for the same as having gather or gather merge on the inner side
> of join can be time-consuming.  However, if you or others feel that it
> is important to have a test to cover this code path, then I can try to
> produce one.

Committed.

I believe that between this commit and the test-coverage commit from
Andres, this open item is reasonably well addressed.  If someone
thinks more needs to be done, please specify.  Thanks.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Robert Haas <robertmhaas@gmail.com> writes:
> On Tue, Aug 15, 2017 at 7:31 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
>> Attached patch fixes the issue for me.  I have locally verified that
>> the gather merge gets executed in rescan path.  I haven't added a test
>> case for the same as having gather or gather merge on the inner side
>> of join can be time-consuming.  However, if you or others feel that it
>> is important to have a test to cover this code path, then I can try to
>> produce one.

> Committed.

> I believe that between this commit and the test-coverage commit from
> Andres, this open item is reasonably well addressed.  If someone
> thinks more needs to be done, please specify.  Thanks.

How big a deal do we think test coverage is?  It looks like
ExecReScanGatherMerge is identical logic to ExecReScanGather,
which *is* covered according to coverage.postgresql.org, but
it wouldn't be too surprising if they diverge in future.

I should think it wouldn't be that expensive to create a test
case, if you already have test cases that invoke GatherMerge.
Adding a right join against a VALUES clause with a small number of
entries, and a non-mergeable/hashable join clause, ought to do it.

			regards, tom lane
On Tue, Aug 15, 2017 at 9:46 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> How big a deal do we think test coverage is?  It looks like
> ExecReScanGatherMerge is identical logic to ExecReScanGather,
> which *is* covered according to coverage.postgresql.org, but
> it wouldn't be too surprising if they diverge in future.
>
> I should think it wouldn't be that expensive to create a test
> case, if you already have test cases that invoke GatherMerge.
> Adding a right join against a VALUES clause with a small number of
> entries, and a non-mergeable/hashable join clause, ought to do it.

I chatted with Amit about this -- he's planning to look into it.  I
assume we'll hear from him tomorrow about this, but for official
status update purposes I'll set a next-update date of one week from
today (August 23rd).

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Tue, Aug 15, 2017 at 7:16 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> Robert Haas <robertmhaas@gmail.com> writes:
>> On Tue, Aug 15, 2017 at 7:31 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
>>> Attached patch fixes the issue for me.  I have locally verified that
>>> the gather merge gets executed in rescan path.  I haven't added a test
>>> case for the same as having gather or gather merge on the inner side
>>> of join can be time-consuming.  However, if you or others feel that it
>>> is important to have a test to cover this code path, then I can try to
>>> produce one.
>
>> Committed.
>
>> I believe that between this commit and the test-coverage commit from
>> Andres, this open item is reasonably well addressed.  If someone
>> thinks more needs to be done, please specify.  Thanks.
>
> How big a deal do we think test coverage is?  It looks like
> ExecReScanGatherMerge is identical logic to ExecReScanGather,
> which *is* covered according to coverage.postgresql.org, but
> it wouldn't be too surprising if they diverge in future.
>
> I should think it wouldn't be that expensive to create a test
> case, if you already have test cases that invoke GatherMerge.
> Adding a right join against a VALUES clause with a small number of
> entries, and a non-mergeable/hashable join clause, ought to do it.
>

I have done some experiments based on this idea to generate a test,
but I think it is not as straightforward as it appears.  Below are
some of my experiments:

Query that uses GatherMerge in regression tests
---------------------------------------------------------------------
regression=# explain (costs off) select string4, count((unique2))
from tenk1 group by string4 order by string4;
                    QUERY PLAN
----------------------------------------------------
 Finalize GroupAggregate
   Group Key: string4
   ->  Gather Merge
         Workers Planned: 2
         ->  Partial GroupAggregate
               Group Key: string4
               ->  Sort
                     Sort Key: string4
                     ->  Parallel Seq Scan on tenk1
(9 rows)

Modified Query
----------------------
regression=# explain (costs off) select tenk1.hundred, count((unique2))
from tenk1 right join (values (1,100), (2,200)) as t (two, hundred)
on t.two > 1 and tenk1.hundred > 1 group by tenk1.hundred order by
tenk1.hundred;
                                QUERY PLAN
--------------------------------------------------------------------------
 Sort
   Sort Key: tenk1.hundred
   ->  HashAggregate
         Group Key: tenk1.hundred
         ->  Nested Loop Left Join
               Join Filter: ("*VALUES*".column1 > 1)
               ->  Values Scan on "*VALUES*"
               ->  Gather
                     Workers Planned: 4
                     ->  Parallel Index Scan using tenk1_hundred on tenk1
                           Index Cond: (hundred > 1)
(11 rows)

The cost of GatherMerge is always higher than Gather in this case,
which is expected, as GatherMerge has to perform some additional work.
I am not aware of a way to push the grouping and sorting below the
parallel node (Gather/GatherMerge) in this case; if there were such a
possibility, the planner might prefer GatherMerge.

Another way to make it parallel is, add a new guc enable_gather
similar to enable_gathermerge and then set that to off, it will prefer
GatherMerge in that case.  I think it is anyway good to have such a
guc.  I will go and do it this way unless you have a better idea.

Note - enable_gathermerge is not present in postgresql.conf.  I think
we should add it in the postgresql.conf.sample file.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
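(For readers unfamiliar with how the planner's enable_* switches are wired
up: each one is a boolean entry in guc.c plus a disable_cost penalty in
costsize.c.  The enable_gather knob Amit is proposing does not exist; a rough
sketch of what it would presumably look like, mirroring how enable_gathermerge
is handled, with the usual extern declaration added in cost.h:)

    /* guc.c: a ConfigureNamesBool[] entry, modeled on enable_gathermerge */
    {
        {"enable_gather", PGC_USERSET, QUERY_TUNING_METHOD,
            gettext_noop("Enables the planner's use of gather plans."),
            NULL
        },
        &enable_gather,
        true,
        NULL, NULL, NULL
    },

    /* costsize.c, in cost_gather(): discourage, rather than forbid, the plan */
    if (!enable_gather)
        startup_cost += disable_cost;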
On Thu, Aug 17, 2017 at 4:37 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
> Note - enable_gathermerge is not present in postgresql.conf.  I think
> we should add it in the postgresql.conf.sample file.

+1

See also https://www.postgresql.org/message-id/CAEepm=0B7yM9MZSviq1d-hnt4KoaRVeJvSfAyVfykNV-pVDqug@mail.gmail.com .

--
Thomas Munro
http://www.enterprisedb.com
On Thu, Aug 17, 2017 at 10:07 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Tue, Aug 15, 2017 at 7:16 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>>
>>> I believe that between this commit and the test-coverage commit from
>>> Andres, this open item is reasonably well addressed.  If someone
>>> thinks more needs to be done, please specify.  Thanks.
>>
>> How big a deal do we think test coverage is?  It looks like
>> ExecReScanGatherMerge is identical logic to ExecReScanGather,
>> which *is* covered according to coverage.postgresql.org, but
>> it wouldn't be too surprising if they diverge in future.
>>
>> I should think it wouldn't be that expensive to create a test
>> case, if you already have test cases that invoke GatherMerge.
>> Adding a right join against a VALUES clause with a small number of
>> entries, and a non-mergeable/hashable join clause, ought to do it.
>>
>
> Another way to make it parallel is, add a new guc enable_gather
> similar to enable_gathermerge and then set that to off, it will prefer
> GatherMerge in that case.  I think it is anyway good to have such a
> guc.  I will go and do it this way unless you have a better idea.
>

Going by the above, I have created two separate patches: the first
introduces a new guc enable_gather, and the second tests the rescan
behavior of gather merge.

I have found a problem in the rescan path of gather merge: it is not
initializing the gather merge state, which is required to initialize
the heap for processing of tuples.  I think this should have been
caught earlier, but probably I didn't notice it because in the
previous tests the left side would not have passed enough rows to hit
this case.  I have fixed it in the attached patch
(execrescan_gathermerge_v2).

> Note - enable_gathermerge is not present in postgresql.conf.  I think
> we should add it in the postgresql.conf.sample file.
>

Thomas has already posted a patch to handle this problem.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
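(The attached execrescan_gathermerge_v2 patch isn't inlined in the archive.
Judging from the description, the fix presumably amounts to clearing the
node's "initialized" flags in ExecReScanGatherMerge so that the worker setup
and the merge heap get rebuilt on the next call; the field names below are an
assumption based on the GatherMergeState struct, not an excerpt of the patch:)

    /* ExecReScanGatherMerge(): force re-initialization on the next
     * ExecGatherMerge() call (assumed field names) */
    node->initialized = false;      /* relaunch/reattach workers */
    node->gm_initialized = false;   /* rebuild per-reader slots and the binary heap */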
Amit Kapila <amit.kapila16@gmail.com> writes:
> On Tue, Aug 15, 2017 at 7:16 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> I should think it wouldn't be that expensive to create a test
>> case, if you already have test cases that invoke GatherMerge.
>> Adding a right join against a VALUES clause with a small number of
>> entries, and a non-mergeable/hashable join clause, ought to do it.

> I have done some experiments based on this idea to generate a test,
> but I think it is not as straightforward as it appears.

I did this (the first 4 SETs duplicate what's already used in
select_parallel.sql):

regression=# set parallel_setup_cost=0;
SET
regression=# set parallel_tuple_cost=0;
SET
regression=# set min_parallel_table_scan_size=0;
SET
regression=# set max_parallel_workers_per_gather=4;
SET
regression=# set enable_hashagg TO 0;
SET
regression=# set enable_material TO 0;
SET
regression=# explain select * from (select string4, count((unique2))
from tenk1 group by string4 order by string4) ss right join
(values(1),(2)) v(x) on true;
                                             QUERY PLAN
--------------------------------------------------------------------------------------------------
 Nested Loop Left Join  (cost=524.15..1086.77 rows=8 width=76)
   ->  Values Scan on "*VALUES*"  (cost=0.00..0.03 rows=2 width=4)
   ->  Finalize GroupAggregate  (cost=524.15..543.29 rows=4 width=72)
         Group Key: tenk1.string4
         ->  Gather Merge  (cost=524.15..543.17 rows=16 width=72)
               Workers Planned: 4
               ->  Partial GroupAggregate  (cost=524.10..542.89 rows=4 width=72)
                     Group Key: tenk1.string4
                     ->  Sort  (cost=524.10..530.35 rows=2500 width=68)
                           Sort Key: tenk1.string4
                           ->  Parallel Seq Scan on tenk1  (cost=0.00..383.00 rows=2500 width=68)
(11 rows)

regression=# select * from (select string4, count((unique2))
from tenk1 group by string4 order by string4) ss right join
(values(1),(2)) v(x) on true;
server closed the connection unexpectedly

So, not only is it not that hard to reach ExecReScanGatherMerge,
but there is indeed a bug to fix there somewhere.  The stack
trace indicates that the failure occurs in a later execution
of ExecGatherMerge:

Program terminated with signal 11, Segmentation fault.
#0  0x000000000064b4e4 in swap_nodes (heap=0x15a9440) at binaryheap.c:223
223             heap->bh_nodes[a] = heap->bh_nodes[b];
(gdb) bt
#0  0x000000000064b4e4 in swap_nodes (heap=0x15a9440) at binaryheap.c:223
#1  binaryheap_remove_first (heap=0x15a9440) at binaryheap.c:189
#2  0x0000000000634196 in gather_merge_getnext (pstate=<value optimized out>) at nodeGatherMerge.c:479
#3  ExecGatherMerge (pstate=<value optimized out>) at nodeGatherMerge.c:241
#4  0x00000000006251fe in ExecProcNode (aggstate=0x157a6d0) at ../../../src/include/executor/executor.h:249
#5  fetch_input_tuple (aggstate=0x157a6d0) at nodeAgg.c:688
#6  0x0000000000629264 in agg_retrieve_direct (pstate=<value optimized out>) at nodeAgg.c:2313
#7  ExecAgg (pstate=<value optimized out>) at nodeAgg.c:2124
#8  0x00000000006396ef in ExecProcNode (pstate=0x1579d98) at ../../../src/include/executor/executor.h:249
#9  ExecNestLoop (pstate=0x1579d98) at nodeNestloop.c:160
#10 0x000000000061bc3f in ExecProcNode (queryDesc=0x14d5570, direction=<value optimized out>, count=0, execute_once=-104 '\230') at ../../../src/include/executor/executor.h:249
#11 ExecutePlan (queryDesc=0x14d5570, direction=<value optimized out>, count=0, execute_once=-104 '\230') at execMain.c:1693
#12 standard_ExecutorRun (queryDesc=0x14d5570, direction=<value optimized out>, count=0, execute_once=-104 '\230') at execMain.c:362

			regards, tom lane
On Thu, Aug 17, 2017 at 7:49 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> Amit Kapila <amit.kapila16@gmail.com> writes:
>> On Tue, Aug 15, 2017 at 7:16 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>>> I should think it wouldn't be that expensive to create a test
>>> case, if you already have test cases that invoke GatherMerge.
>>> Adding a right join against a VALUES clause with a small number of
>>> entries, and a non-mergeable/hashable join clause, ought to do it.
>
>> I have done some experiments based on this idea to generate a test,
>> but I think it is not as straightforward as it appears.
>
> I did this (the first 4 SETs duplicate what's already used in
> select_parallel.sql):
>
> regression=# set parallel_setup_cost=0;
> SET
> regression=# set parallel_tuple_cost=0;
> SET
> regression=# set min_parallel_table_scan_size=0;
> SET
> regression=# set max_parallel_workers_per_gather=4;
> SET
> regression=# set enable_hashagg TO 0;
> SET
> regression=# set enable_material TO 0;
> SET
> regression=# explain select * from (select string4, count((unique2))
> from tenk1 group by string4 order by string4) ss right join
> (values(1),(2)) v(x) on true;
>                                              QUERY PLAN
> --------------------------------------------------------------------------------------------------
>  Nested Loop Left Join  (cost=524.15..1086.77 rows=8 width=76)
>    ->  Values Scan on "*VALUES*"  (cost=0.00..0.03 rows=2 width=4)
>    ->  Finalize GroupAggregate  (cost=524.15..543.29 rows=4 width=72)
>          Group Key: tenk1.string4
>          ->  Gather Merge  (cost=524.15..543.17 rows=16 width=72)
>                Workers Planned: 4
>                ->  Partial GroupAggregate  (cost=524.10..542.89 rows=4 width=72)
>                      Group Key: tenk1.string4
>                      ->  Sort  (cost=524.10..530.35 rows=2500 width=68)
>                            Sort Key: tenk1.string4
>                            ->  Parallel Seq Scan on tenk1  (cost=0.00..383.00 rows=2500 width=68)
> (11 rows)
>
> regression=# select * from (select string4, count((unique2))
> from tenk1 group by string4 order by string4) ss right join
> (values(1),(2)) v(x) on true;
> server closed the connection unexpectedly
>
>
> So, not only is it not that hard to reach ExecReScanGatherMerge,
> but there is indeed a bug to fix there somewhere.  The stack
> trace indicates that the failure occurs in a later execution
> of ExecGatherMerge:
>

This will be fixed by the patch [1] (execrescan_gathermerge_v2.patch)
I posted sometime back.  The test case is slightly different, but
maybe I can repost the patch with your test case.

[1] - https://www.postgresql.org/message-id/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ%40mail.gmail.com

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Thu, Aug 17, 2017 at 8:08 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Thu, Aug 17, 2017 at 7:49 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> Amit Kapila <amit.kapila16@gmail.com> writes:
>>> On Tue, Aug 15, 2017 at 7:16 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>>>> I should think it wouldn't be that expensive to create a test
>>>> case, if you already have test cases that invoke GatherMerge.
>>>> Adding a right join against a VALUES clause with a small number of
>>>> entries, and a non-mergeable/hashable join clause, ought to do it.
>>
>>> I have done some experiments based on this idea to generate a test,
>>> but I think it is not as straightforward as it appears.
>>
>> I did this (the first 4 SETs duplicate what's already used in
>> select_parallel.sql):
>>
>> regression=# set parallel_setup_cost=0;
>> SET
>> regression=# set parallel_tuple_cost=0;
>> SET
>> regression=# set min_parallel_table_scan_size=0;
>> SET
>> regression=# set max_parallel_workers_per_gather=4;
>> SET
>> regression=# set enable_hashagg TO 0;
>> SET
>> regression=# set enable_material TO 0;
>> SET
>> regression=# explain select * from (select string4, count((unique2))
>> from tenk1 group by string4 order by string4) ss right join
>> (values(1),(2)) v(x) on true;
>>                                              QUERY PLAN
>> --------------------------------------------------------------------------------------------------
>>  Nested Loop Left Join  (cost=524.15..1086.77 rows=8 width=76)
>>    ->  Values Scan on "*VALUES*"  (cost=0.00..0.03 rows=2 width=4)
>>    ->  Finalize GroupAggregate  (cost=524.15..543.29 rows=4 width=72)
>>          Group Key: tenk1.string4
>>          ->  Gather Merge  (cost=524.15..543.17 rows=16 width=72)
>>                Workers Planned: 4
>>                ->  Partial GroupAggregate  (cost=524.10..542.89 rows=4 width=72)
>>                      Group Key: tenk1.string4
>>                      ->  Sort  (cost=524.10..530.35 rows=2500 width=68)
>>                            Sort Key: tenk1.string4
>>                            ->  Parallel Seq Scan on tenk1  (cost=0.00..383.00 rows=2500 width=68)
>> (11 rows)
>>
>> regression=# select * from (select string4, count((unique2))
>> from tenk1 group by string4 order by string4) ss right join
>> (values(1),(2)) v(x) on true;
>> server closed the connection unexpectedly
>>
>>
>> So, not only is it not that hard to reach ExecReScanGatherMerge,
>> but there is indeed a bug to fix there somewhere.  The stack
>> trace indicates that the failure occurs in a later execution
>> of ExecGatherMerge:
>>
>
> This will be fixed by the patch [1] (execrescan_gathermerge_v2.patch)
> I posted sometime back.  The test case is slightly different, but
> maybe I can repost the patch with your test case.
>

I have kept the fix as it is but changed the test to match your test.

I think the other patch posted above to add a new guc for
enable_gather is still relevant and important.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
Amit Kapila <amit.kapila16@gmail.com> writes:
> I think the other patch posted above to add a new guc for
> enable_gather is still relevant and important.

FWIW, I'm pretty much -1 on that.  I don't see that it has any
real-world use; somebody who wants to turn that off would likely
really want to turn off parallelism altogether.  We have mucho
knobs in that area already.

			regards, tom lane
Amit Kapila <amit.kapila16@gmail.com> writes:
>> This will be fixed by the patch [1] (execrescan_gathermerge_v2.patch)
>> I posted sometime back.  The test case is slightly different, but
>> maybe I can repost the patch with your test case.

> I have kept the fix as it is but changed the test to match your test.

Pushed, with minor tidying of the test case.  I think we can now
close this open item.

			regards, tom lane
I wrote:
> Pushed, with minor tidying of the test case.  I think we can now
> close this open item.

Nope, spoke too soon.  See buildfarm.

(Man, am I glad I insisted on a test case.)

			regards, tom lane
On Thu, Aug 17, 2017 at 2:06 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> I wrote:
>> Pushed, with minor tidying of the test case.  I think we can now
>> close this open item.
>
> Nope, spoke too soon.  See buildfarm.
>
> (Man, am I glad I insisted on a test case.)

Whoa, that's not good.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Robert Haas <robertmhaas@gmail.com> writes:
> On Thu, Aug 17, 2017 at 2:06 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> Nope, spoke too soon.  See buildfarm.

> Whoa, that's not good.

Ah-hah, I see my dromedary box is one of the ones failing, so I'll
have a look there.  I can't reproduce it on my other machines.

I'm a bit suspicious that it's got something to do with getting a
different number of workers during restart.  Whether that's the issue
or not, though, it sure seems like a rescan leaks an unpleasantly
large amount of memory.  I wonder if we shouldn't refactor this so
that the per-reader structs can be reused.

			regards, tom lane
I wrote:
> Ah-hah, I see my dromedary box is one of the ones failing, so I'll
> have a look there.  I can't reproduce it on my other machines.

OK, so this is a whole lot more broken than I thought :-(.

Bear in mind that the plan for this (omitting uninteresting detail) is

        Nested Loop Left Join
          ->  Values Scan on "*VALUES*"
          ->  Finalize GroupAggregate
                ->  Gather Merge
                      ->  Partial GroupAggregate
                            ->  Sort
                                  ->  Parallel Seq Scan on tenk1

What seems to be happening is that:

1. On the first pass, the parallel seqscan work gets doled out to several
workers, plus the leader, as expected.

2. When the nestloop rescans its right input, ExecReScanGatherMerge
supposes that this is good enough to handle rescanning its subnodes:

    ExecReScan(node->ps.lefttree);

Leaving aside the question of why that doesn't look like nearly every
other child rescan call, what happens is that that invokes ExecReScanAgg,
which does the more usual thing:

    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);

and so that invokes ExecReScanSort, and then behold what ExecReScanSort
thinks it can optimize away:

     * If subnode is to be rescanned then we forget previous sort results; we
     * have to re-read the subplan and re-sort.  Also must re-sort if the
     * bounded-sort parameters changed or we didn't select randomAccess.
     *
     * Otherwise we can just rewind and rescan the sorted output.

So we never get to ExecReScanSeqScan, and thus not to heap_rescan,
with the effect that parallel_scan->phs_nallocated never gets reset.

3. On the next pass, we fire up all the workers as expected, but they all
perceive phs_nallocated >= rs_nblocks and conclude they have nothing to
do.  Meanwhile, in the leader, nodeSort just re-emits the sorted data it
had last time.  Net effect is that the GatherMerge node returns only the
fraction of the data that was scanned by the leader in the first pass.

4. The fact that the test succeeds on many machines implies that the
leader process is usually doing *all* of the work.  This is in itself not
very good.  Even on the machines where it fails, the fact that the tuple
counts are usually a pretty large fraction of the expected values
indicates that the leader usually did most of the work.  We need to take
a closer look at why that is.

However, the bottom line here is that parallel scan is completely broken
for rescans, and it's not (solely) the fault of nodeGatherMerge; rather,
the issue is that nobody bothered to wire up parallelism to the rescan
parameterization mechanism.  I imagine that related bugs can be
demonstrated in 9.6 with little effort.

I think that the correct fix probably involves marking each parallel scan
plan node as dependent on a pseudo executor parameter, which the parent
Gather or GatherMerge node would flag as being changed on each rescan.
This would cue the plan layers in between that they cannot optimize on the
assumption that the leader's instance of the parallel scan will produce
exactly the same rows as it did last time, even when "nothing else
changed".  The "wtParam" pseudo parameter that's used for communication
between RecursiveUnion and its descendant WorkTableScan node is a good
model for what needs to happen.

A possibly-simpler fix would be to abandon the idea that the leader
should do any of the work, but I imagine that will be unpopular.

As I mentioned, I'm outta here for the next week.  I'd be willing to
work on, or review, a patch for this when I get back.

			regards, tom lane

PS: while I was trying to decipher this, I found three or four other
minor bugs or near-bugs in nodeGatherMerge.c.
But none of them seem to be triggering in this example. I plan to do a round of code-review-ish fixes there when I get back.
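(The contrast Tom is drawing, in code form: ExecReScanGatherMerge rescans its
child unconditionally, while the conventional pattern -- quoted above from
ExecReScanAgg -- only does so when no changed parameter will already force a
rescan, which in turn is what lets ExecReScanSort decide it can simply re-emit
its previous sorted output:)

    /* nodeGatherMerge.c, as it stood: always force a child rescan */
    ExecReScan(node->ps.lefttree);

    /* the conventional pattern (e.g. ExecReScanAgg): leave the rescan to
     * ExecProcNode when chgParam is set, and call it directly otherwise */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);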
On Fri, Aug 18, 2017 at 3:50 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> I wrote:
>> Ah-hah, I see my dromedary box is one of the ones failing, so I'll
>> have a look there.  I can't reproduce it on my other machines.
>
> OK, so this is a whole lot more broken than I thought :-(.
>
> Bear in mind that the plan for this (omitting uninteresting detail) is
>
>         Nested Loop Left Join
>           ->  Values Scan on "*VALUES*"
>           ->  Finalize GroupAggregate
>                 ->  Gather Merge
>                       ->  Partial GroupAggregate
>                             ->  Sort
>                                   ->  Parallel Seq Scan on tenk1
>
> What seems to be happening is that:
>
> 1. On the first pass, the parallel seqscan work gets doled out to several
> workers, plus the leader, as expected.
>
> 2. When the nestloop rescans its right input, ExecReScanGatherMerge
> supposes that this is good enough to handle rescanning its subnodes:
>
>     ExecReScan(node->ps.lefttree);
>
> Leaving aside the question of why that doesn't look like nearly every
> other child rescan call, what happens is that that invokes ExecReScanAgg,
> which does the more usual thing:
>
>     if (outerPlan->chgParam == NULL)
>         ExecReScan(outerPlan);
>
> and so that invokes ExecReScanSort, and then behold what ExecReScanSort
> thinks it can optimize away:
>
>      * If subnode is to be rescanned then we forget previous sort results; we
>      * have to re-read the subplan and re-sort.  Also must re-sort if the
>      * bounded-sort parameters changed or we didn't select randomAccess.
>      *
>      * Otherwise we can just rewind and rescan the sorted output.
>
> So we never get to ExecReScanSeqScan, and thus not to heap_rescan,
> with the effect that parallel_scan->phs_nallocated never gets reset.
>
> 3. On the next pass, we fire up all the workers as expected, but they all
> perceive phs_nallocated >= rs_nblocks and conclude they have nothing to
> do.  Meanwhile, in the leader, nodeSort just re-emits the sorted data it
> had last time.  Net effect is that the GatherMerge node returns only the
> fraction of the data that was scanned by the leader in the first pass.
>
> 4. The fact that the test succeeds on many machines implies that the
> leader process is usually doing *all* of the work.  This is in itself not
> very good.  Even on the machines where it fails, the fact that the tuple
> counts are usually a pretty large fraction of the expected values
> indicates that the leader usually did most of the work.  We need to take
> a closer look at why that is.
>
> However, the bottom line here is that parallel scan is completely broken
> for rescans, and it's not (solely) the fault of nodeGatherMerge; rather,
> the issue is that nobody bothered to wire up parallelism to the rescan
> parameterization mechanism.
>

I think we don't generate parallel plans for parameterized paths, so
I am not sure whether any work is required in that area.

> I imagine that related bugs can be
> demonstrated in 9.6 with little effort.
>
> I think that the correct fix probably involves marking each parallel scan
> plan node as dependent on a pseudo executor parameter, which the parent
> Gather or GatherMerge node would flag as being changed on each rescan.
> This would cue the plan layers in between that they cannot optimize on the
> assumption that the leader's instance of the parallel scan will produce
> exactly the same rows as it did last time, even when "nothing else
> changed".  The "wtParam" pseudo parameter that's used for communication
> between RecursiveUnion and its descendant WorkTableScan node is a good
> model for what needs to happen.
>

Yeah, that seems like a good idea.
I think another way could be to *not* optimize rescanning when we are
in parallel mode (IsInParallelMode()); that might be more restrictive
than what you are suggesting, but it would be somewhat simpler.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
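(A rough sketch of what that alternative might look like, in ExecReScanSort's
"can we just rewind the previous output?" test; the IsInParallelMode() clause
is the hypothetical addition, the rest follows nodeSort.c's existing
structure:)

    if (outerPlan->chgParam != NULL ||
        node->bounded != node->bounded_Done ||
        node->bound != node->bound_Done ||
        !node->randomAccess ||
        IsInParallelMode())     /* hypothetical: never reuse sort output in parallel mode */
    {
        /* forget the previous results; re-read and re-sort the subplan */
        node->sort_Done = false;
        tuplesort_end((Tuplesortstate *) node->tuplesortstate);
        node->tuplesortstate = NULL;

        /* if chgParam of subnode is non-NULL, ExecProcNode will rescan it */
        if (outerPlan->chgParam == NULL)
            ExecReScan(outerPlan);
    }
    else
        tuplesort_rescan((Tuplesortstate *) node->tuplesortstate);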
I wrote:
> 4. The fact that the test succeeds on many machines implies that the
> leader process is usually doing *all* of the work.  This is in itself not
> very good.  Even on the machines where it fails, the fact that the tuple
> counts are usually a pretty large fraction of the expected values
> indicates that the leader usually did most of the work.  We need to take
> a closer look at why that is.

I've spent some time poking into this, and it seems the bottom line
is that the time needed to launch a parallel worker and get it started
on running the query plan is comparable to the time needed to scan all
10000 rows of tenk1.  Maybe this isn't surprising, I dunno; but it
doesn't give me a warm feeling about how much exercise the parallel
scan machinery is really getting in select_parallel.sql.

Not sure what to do about that --- I don't really want to make the
regression test case large enough (and slow enough) to provide a more
realistic scenario.

In the meantime, I've been able to make the failure reproducible by
the expedient of sticking "pg_usleep(1000)" into
heap_parallelscan_nextpage(), thus slowing down the leader's scan
enough so that the workers can get started.

> However, the bottom line here is that parallel scan is completely broken
> for rescans, and it's not (solely) the fault of nodeGatherMerge; rather,
> the issue is that nobody bothered to wire up parallelism to the rescan
> parameterization mechanism.  I imagine that related bugs can be
> demonstrated in 9.6 with little effort.

I've so far been unable to break it for cases involving only Gather.
The issue is triggered, basically, by having a Sort or HashAgg node
below Gather[Merge], since either of those can decide that they don't
need to rescan their child.  While it's not terribly hard to get the
planner to make such plans, you always end up with another Sort or
HashAgg above the Gather, and that masks the problem because the upper
node makes the same decision that it needn't rescan its child,
protecting the Gather from being run more than once.

The particular plan shape that a2b70c89c used,

        ->  Finalize GroupAggregate
              ->  Gather Merge
                    ->  Partial GroupAggregate
                          ->  Sort
                                ->  Parallel Seq Scan on tenk1

does exhibit the problem, but it requires GatherMerge so that no extra
sort is needed above the parallel subplan.  This may mean that we don't
need to risk back-patching the fix into 9.6.  I'm not totally convinced
of that yet, but I can't show that it's needed given 9.6's limited
support for parallelism.

			regards, tom lane
I wrote: > I think that the correct fix probably involves marking each parallel scan > plan node as dependent on a pseudo executor parameter, which the parent > Gather or GatherMerge node would flag as being changed on each rescan. > This would cue the plan layers in between that they cannot optimize on the > assumption that the leader's instance of the parallel scan will produce > exactly the same rows as it did last time, even when "nothing else > changed". The "wtParam" pseudo parameter that's used for communication > between RecursiveUnion and its descendant WorkTableScan node is a good > model for what needs to happen. Here is a draft patch for this. It's a bit different from wtParam in that the special parameter isn't allocated until createplan.c time, so that we don't eat a parameter slot if we end up choosing a non-parallel plan; but otherwise things are comparable. I could use some feedback on whether this is marking dependent child nodes sanely. As written, any plan node that's marked parallel_aware is assumed to need a dependency on the parent Gather or GatherMerge's rescan param --- and the planner will now bitch if a parallel_aware plan node is not under any such Gather. Is this reasonable? I do not see any documentation that defines the parallel_aware field with enough clarity to be very sure about it. I included the heap_parallelscan_nextpage hack I'm using to make the original failure reproducible, but that hunk is not meant for commit. Also, the regression test case is the same as in a2b70c89c. regards, tom lane diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index ff03c68..c478897 100644 *** a/src/backend/access/heap/heapam.c --- b/src/backend/access/heap/heapam.c *************** heap_parallelscan_nextpage(HeapScanDesc *** 1763,1768 **** --- 1763,1770 ---- ss_report_location(scan->rs_rd, parallel_scan->phs_startblock); } + pg_usleep(random()/1000000); + return page; } diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index e8d94ee..3aa819f 100644 *** a/src/backend/executor/nodeGather.c --- b/src/backend/executor/nodeGather.c *************** ExecShutdownGather(GatherState *node) *** 430,435 **** --- 430,438 ---- void ExecReScanGather(GatherState *node) { + Gather *gather = (Gather *) node->ps.plan; + PlanState *outerPlan = outerPlanState(node); + /* * Re-initialize the parallel workers to perform rescan of relation. We * want to gracefully shutdown all the workers so that they should be able *************** ExecReScanGather(GatherState *node) *** 443,447 **** if (node->pei) ExecParallelReinitialize(node->pei); ! ExecReScan(node->ps.lefttree); } --- 446,467 ---- if (node->pei) ExecParallelReinitialize(node->pei); ! /* ! * Set child node's chgParam to tell it that the next scan might deliver a ! * different set of rows within the leader process. (The overall rowset ! * shouldn't change, but the leader process's subset might; hence nodes ! * between here and the parallel table scan node mustn't optimize on the ! * assumption of an unchanging rowset.) ! */ ! if (gather->rescan_param >= 0) ! outerPlan->chgParam = bms_add_member(outerPlan->chgParam, ! gather->rescan_param); ! ! ! /* ! * if chgParam of subnode is not null then plan will be re-scanned by ! * first ExecProcNode. ! */ ! if (outerPlan->chgParam == NULL) ! 
ExecReScan(outerPlan); } diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 64c6239..e8c70df 100644 *** a/src/backend/executor/nodeGatherMerge.c --- b/src/backend/executor/nodeGatherMerge.c *************** ExecShutdownGatherMergeWorkers(GatherMer *** 325,330 **** --- 325,333 ---- void ExecReScanGatherMerge(GatherMergeState *node) { + GatherMerge *gm = (GatherMerge *) node->ps.plan; + PlanState *outerPlan = outerPlanState(node); + /* * Re-initialize the parallel workers to perform rescan of relation. We * want to gracefully shutdown all the workers so that they should be able *************** ExecReScanGatherMerge(GatherMergeState * *** 339,345 **** if (node->pei) ExecParallelReinitialize(node->pei); ! ExecReScan(node->ps.lefttree); } /* --- 342,365 ---- if (node->pei) ExecParallelReinitialize(node->pei); ! /* ! * Set child node's chgParam to tell it that the next scan might deliver a ! * different set of rows within the leader process. (The overall rowset ! * shouldn't change, but the leader process's subset might; hence nodes ! * between here and the parallel table scan node mustn't optimize on the ! * assumption of an unchanging rowset.) ! */ ! if (gm->rescan_param >= 0) ! outerPlan->chgParam = bms_add_member(outerPlan->chgParam, ! gm->rescan_param); ! ! ! /* ! * if chgParam of subnode is not null then plan will be re-scanned by ! * first ExecProcNode. ! */ ! if (outerPlan->chgParam == NULL) ! ExecReScan(outerPlan); } /* diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 7204169..f9ddf4e 100644 *** a/src/backend/nodes/copyfuncs.c --- b/src/backend/nodes/copyfuncs.c *************** _copyGather(const Gather *from) *** 361,366 **** --- 361,367 ---- * copy remainder of node */ COPY_SCALAR_FIELD(num_workers); + COPY_SCALAR_FIELD(rescan_param); COPY_SCALAR_FIELD(single_copy); COPY_SCALAR_FIELD(invisible); *************** _copyGatherMerge(const GatherMerge *from *** 384,389 **** --- 385,391 ---- * copy remainder of node */ COPY_SCALAR_FIELD(num_workers); + COPY_SCALAR_FIELD(rescan_param); COPY_SCALAR_FIELD(numCols); COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber)); COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid)); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 5ce3c7c..9ee3e23 100644 *** a/src/backend/nodes/outfuncs.c --- b/src/backend/nodes/outfuncs.c *************** _outGather(StringInfo str, const Gather *** 479,484 **** --- 479,485 ---- _outPlanInfo(str, (const Plan *) node); WRITE_INT_FIELD(num_workers); + WRITE_INT_FIELD(rescan_param); WRITE_BOOL_FIELD(single_copy); WRITE_BOOL_FIELD(invisible); } *************** _outGatherMerge(StringInfo str, const Ga *** 493,498 **** --- 494,500 ---- _outPlanInfo(str, (const Plan *) node); WRITE_INT_FIELD(num_workers); + WRITE_INT_FIELD(rescan_param); WRITE_INT_FIELD(numCols); appendStringInfoString(str, " :sortColIdx"); diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 86c811d..67b9e19 100644 *** a/src/backend/nodes/readfuncs.c --- b/src/backend/nodes/readfuncs.c *************** _readGather(void) *** 2163,2168 **** --- 2163,2169 ---- ReadCommonPlan(&local_node->plan); READ_INT_FIELD(num_workers); + READ_INT_FIELD(rescan_param); READ_BOOL_FIELD(single_copy); READ_BOOL_FIELD(invisible); *************** _readGatherMerge(void) *** 2180,2185 **** --- 2181,2187 ---- ReadCommonPlan(&local_node->plan); READ_INT_FIELD(num_workers); + READ_INT_FIELD(rescan_param); READ_INT_FIELD(numCols); 
READ_ATTRNUMBER_ARRAY(sortColIdx, local_node->numCols); READ_OID_ARRAY(sortOperators, local_node->numCols); diff --git a/src/backend/optimizer/README b/src/backend/optimizer/README index fc0fca4..62242e8 100644 *** a/src/backend/optimizer/README --- b/src/backend/optimizer/README *************** RelOptInfo - a relation or joined r *** 374,379 **** --- 374,380 ---- MaterialPath - a Material plan node UniquePath - remove duplicate rows (either by hashing or sorting) GatherPath - collect the results of parallel workers + GatherMergePath - collect parallel results, preserving their common sort order ProjectionPath - a Result plan node with child (used for projection) ProjectSetPath - a ProjectSet plan node applied to some sub-path SortPath - a Sort plan node applied to some sub-path *************** either by an entire query or some portio *** 1030,1036 **** some of that work can be done by one or more worker processes, which are called parallel workers. Parallel workers are a subtype of dynamic background workers; see src/backend/access/transam/README.parallel for a ! fuller description. Academic literature on parallel query suggests that that parallel execution strategies can be divided into essentially two categories: pipelined parallelism, where the execution of the query is divided into multiple stages and each stage is handled by a separate --- 1031,1037 ---- some of that work can be done by one or more worker processes, which are called parallel workers. Parallel workers are a subtype of dynamic background workers; see src/backend/access/transam/README.parallel for a ! fuller description. The academic literature on parallel query suggests that parallel execution strategies can be divided into essentially two categories: pipelined parallelism, where the execution of the query is divided into multiple stages and each stage is handled by a separate *************** that the underlying table be partitioned *** 1046,1061 **** there is some method of dividing the data from at least one of the base tables involved in the relation across multiple processes, (2) allowing each process to handle its own portion of the data, and then (3) ! collecting the results. Requirements (2) and (3) is satisfied by the ! executor node Gather, which launches any number of worker processes and ! executes its single child plan in all of them (and perhaps in the leader ! also, if the children aren't generating enough data to keep the leader ! busy). Requirement (1) is handled by the SeqScan node: when invoked ! with parallel_aware = true, this node will, in effect, partition the ! table on a block by block basis, returning a subset of the tuples from ! the relation in each worker where that SeqScan is executed. A similar ! scheme could be (and probably should be) implemented for bitmap heap ! scans. Just as we do for non-parallel access methods, we build Paths to represent access strategies that can be used in a parallel plan. These --- 1047,1060 ---- there is some method of dividing the data from at least one of the base tables involved in the relation across multiple processes, (2) allowing each process to handle its own portion of the data, and then (3) ! collecting the results. Requirements (2) and (3) are satisfied by the ! executor node Gather (or GatherMerge), which launches any number of worker ! processes and executes its single child plan in all of them, and perhaps ! in the leader also, if the children aren't generating enough data to keep ! the leader busy. 
Requirement (1) is handled by the table scan node: when ! invoked with parallel_aware = true, this node will, in effect, partition ! the table on a block by block basis, returning a subset of the tuples from ! the relation in each worker where that scan node is executed. Just as we do for non-parallel access methods, we build Paths to represent access strategies that can be used in a parallel plan. These diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 5c934f2..2821662 100644 *** a/src/backend/optimizer/plan/createplan.c --- b/src/backend/optimizer/plan/createplan.c *************** static Unique *make_unique_from_sortclau *** 267,273 **** static Unique *make_unique_from_pathkeys(Plan *lefttree, List *pathkeys, int numCols); static Gather *make_gather(List *qptlist, List *qpqual, ! int nworkers, bool single_copy, Plan *subplan); static SetOp *make_setop(SetOpCmd cmd, SetOpStrategy strategy, Plan *lefttree, List *distinctList, AttrNumber flagColIdx, int firstFlag, long numGroups); --- 267,273 ---- static Unique *make_unique_from_pathkeys(Plan *lefttree, List *pathkeys, int numCols); static Gather *make_gather(List *qptlist, List *qpqual, ! int nworkers, int rescan_param, bool single_copy, Plan *subplan); static SetOp *make_setop(SetOpCmd cmd, SetOpStrategy strategy, Plan *lefttree, List *distinctList, AttrNumber flagColIdx, int firstFlag, long numGroups); *************** create_gather_plan(PlannerInfo *root, Ga *** 1471,1476 **** --- 1471,1477 ---- gather_plan = make_gather(tlist, NIL, best_path->num_workers, + SS_assign_special_param(root), best_path->single_copy, subplan); *************** create_gather_merge_plan(PlannerInfo *ro *** 1505,1510 **** --- 1506,1514 ---- gm_plan->num_workers = best_path->num_workers; copy_generic_path_info(&gm_plan->plan, &best_path->path); + /* Assign the rescan Param. */ + gm_plan->rescan_param = SS_assign_special_param(root); + /* Gather Merge is pointless with no pathkeys; use Gather instead. */ Assert(pathkeys != NIL); *************** static Gather * *** 6238,6243 **** --- 6242,6248 ---- make_gather(List *qptlist, List *qpqual, int nworkers, + int rescan_param, bool single_copy, Plan *subplan) { *************** make_gather(List *qptlist, *** 6249,6254 **** --- 6254,6260 ---- plan->lefttree = subplan; plan->righttree = NULL; node->num_workers = nworkers; + node->rescan_param = rescan_param; node->single_copy = single_copy; node->invisible = false; diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index fdef00a..9662302 100644 *** a/src/backend/optimizer/plan/planner.c --- b/src/backend/optimizer/plan/planner.c *************** standard_planner(Query *parse, int curso *** 375,380 **** --- 375,386 ---- gather->invisible = (force_parallel_mode == FORCE_PARALLEL_REGRESS); /* + * Since this Gather has no parallel-aware descendants to signal to, + * we don't need a rescan Param. + */ + gather->rescan_param = -1; + + /* * Ideally we'd use cost_gather here, but setting up dummy path data * to satisfy it doesn't seem much cleaner than knowing what it does. 
*/ diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index ffbd3ee..1103984 100644 *** a/src/backend/optimizer/plan/subselect.c --- b/src/backend/optimizer/plan/subselect.c *************** static Node *process_sublinks_mutator(No *** 79,84 **** --- 79,85 ---- process_sublinks_context *context); static Bitmapset *finalize_plan(PlannerInfo *root, Plan *plan, + int gather_param, Bitmapset *valid_params, Bitmapset *scan_params); static bool finalize_primnode(Node *node, finalize_primnode_context *context); *************** void *** 2217,2228 **** SS_finalize_plan(PlannerInfo *root, Plan *plan) { /* No setup needed, just recurse through plan tree. */ ! (void) finalize_plan(root, plan, root->outer_params, NULL); } /* * Recursive processing of all nodes in the plan tree * * valid_params is the set of param IDs supplied by outer plan levels * that are valid to reference in this plan node or its children. * --- 2218,2232 ---- SS_finalize_plan(PlannerInfo *root, Plan *plan) { /* No setup needed, just recurse through plan tree. */ ! (void) finalize_plan(root, plan, -1, root->outer_params, NULL); } /* * Recursive processing of all nodes in the plan tree * + * gather_param is the rescan_param of an ancestral Gather/GatherMerge, + * or -1 if there is none. + * * valid_params is the set of param IDs supplied by outer plan levels * that are valid to reference in this plan node or its children. * *************** SS_finalize_plan(PlannerInfo *root, Plan *** 2249,2255 **** * can be handled more cleanly. */ static Bitmapset * ! finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params, Bitmapset *scan_params) { finalize_primnode_context context; --- 2253,2261 ---- * can be handled more cleanly. */ static Bitmapset * ! finalize_plan(PlannerInfo *root, Plan *plan, ! int gather_param, ! Bitmapset *valid_params, Bitmapset *scan_params) { finalize_primnode_context context; *************** finalize_plan(PlannerInfo *root, Plan *p *** 2302,2307 **** --- 2308,2325 ---- finalize_primnode((Node *) plan->targetlist, &context); finalize_primnode((Node *) plan->qual, &context); + /* + * If it's a parallel-aware scan node, mark it as dependent on the parent + * Gather/GatherMerge's rescan Param. 
+ */ + if (plan->parallel_aware) + { + if (gather_param < 0) + elog(ERROR, "parallel-aware plan node is not below a Gather"); + context.paramids = + bms_add_member(context.paramids, gather_param); + } + /* Check additional node-type-specific fields */ switch (nodeTag(plan)) { *************** finalize_plan(PlannerInfo *root, Plan *p *** 2512,2517 **** --- 2530,2536 ---- bms_add_members(context.paramids, finalize_plan(root, (Plan *) lfirst(lc), + gather_param, valid_params, scan_params)); } *************** finalize_plan(PlannerInfo *root, Plan *p *** 2542,2547 **** --- 2561,2567 ---- bms_add_members(context.paramids, finalize_plan(root, (Plan *) lfirst(l), + gather_param, valid_params, scan_params)); } *************** finalize_plan(PlannerInfo *root, Plan *p *** 2558,2563 **** --- 2578,2584 ---- bms_add_members(context.paramids, finalize_plan(root, (Plan *) lfirst(l), + gather_param, valid_params, scan_params)); } *************** finalize_plan(PlannerInfo *root, Plan *p *** 2574,2579 **** --- 2595,2601 ---- bms_add_members(context.paramids, finalize_plan(root, (Plan *) lfirst(l), + gather_param, valid_params, scan_params)); } *************** finalize_plan(PlannerInfo *root, Plan *p *** 2590,2595 **** --- 2612,2618 ---- bms_add_members(context.paramids, finalize_plan(root, (Plan *) lfirst(l), + gather_param, valid_params, scan_params)); } *************** finalize_plan(PlannerInfo *root, Plan *p *** 2606,2611 **** --- 2629,2635 ---- bms_add_members(context.paramids, finalize_plan(root, (Plan *) lfirst(l), + gather_param, valid_params, scan_params)); } *************** finalize_plan(PlannerInfo *root, Plan *p *** 2697,2709 **** &context); break; case T_ProjectSet: case T_Hash: case T_Material: case T_Sort: case T_Unique: - case T_Gather: - case T_GatherMerge: case T_SetOp: case T_Group: /* no node-type-specific fields need fixing */ --- 2721,2771 ---- &context); break; + case T_Gather: + /* child nodes are allowed to reference rescan_param, if any */ + locally_added_param = ((Gather *) plan)->rescan_param; + if (locally_added_param >= 0) + { + valid_params = bms_add_member(bms_copy(valid_params), + locally_added_param); + + /* + * We currently don't support nested Gathers. The issue so + * far as this function is concerned would be how to identify + * which child nodes depend on which Gather. + */ + Assert(gather_param < 0); + /* Pass down rescan_param to child parallel-aware nodes */ + gather_param = locally_added_param; + } + /* rescan_param does *not* get added to scan_params */ + break; + + case T_GatherMerge: + /* child nodes are allowed to reference rescan_param, if any */ + locally_added_param = ((GatherMerge *) plan)->rescan_param; + if (locally_added_param >= 0) + { + valid_params = bms_add_member(bms_copy(valid_params), + locally_added_param); + + /* + * We currently don't support nested Gathers. The issue so + * far as this function is concerned would be how to identify + * which child nodes depend on which Gather. 
+ */ + Assert(gather_param < 0); + /* Pass down rescan_param to child parallel-aware nodes */ + gather_param = locally_added_param; + } + /* rescan_param does *not* get added to scan_params */ + break; + case T_ProjectSet: case T_Hash: case T_Material: case T_Sort: case T_Unique: case T_SetOp: case T_Group: /* no node-type-specific fields need fixing */ *************** finalize_plan(PlannerInfo *root, Plan *p *** 2717,2722 **** --- 2779,2785 ---- /* Process left and right child plans, if any */ child_params = finalize_plan(root, plan->lefttree, + gather_param, valid_params, scan_params); context.paramids = bms_add_members(context.paramids, child_params); *************** finalize_plan(PlannerInfo *root, Plan *p *** 2726,2731 **** --- 2789,2795 ---- /* right child can reference nestloop_params as well as valid_params */ child_params = finalize_plan(root, plan->righttree, + gather_param, bms_union(nestloop_params, valid_params), scan_params); /* ... and they don't count as parameters used at my level */ *************** finalize_plan(PlannerInfo *root, Plan *p *** 2737,2742 **** --- 2801,2807 ---- /* easy case */ child_params = finalize_plan(root, plan->righttree, + gather_param, valid_params, scan_params); } diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 7c51e7f..a382331 100644 *** a/src/include/nodes/plannodes.h --- b/src/include/nodes/plannodes.h *************** typedef struct Unique *** 825,837 **** /* ------------ * gather node * ------------ */ typedef struct Gather { Plan plan; ! int num_workers; ! bool single_copy; bool invisible; /* suppress EXPLAIN display (for testing)? */ } Gather; --- 825,845 ---- /* ------------ * gather node + * + * Note: rescan_param is the ID of a PARAM_EXEC parameter slot. That slot + * will never actually contain a value, but the Gather node must flag it as + * having changed whenever it is rescanned. The child parallel-aware scan + * nodes are marked as depending on that parameter, so that the rescan + * machinery is aware that their output is likely to change across rescans. + * In some cases we don't need a rescan Param, so rescan_param is set to -1. * ------------ */ typedef struct Gather { Plan plan; ! int num_workers; /* planned number of worker processes */ ! int rescan_param; /* ID of Param that signals a rescan, or -1 */ ! bool single_copy; /* don't execute plan more than once */ bool invisible; /* suppress EXPLAIN display (for testing)? */ } Gather; *************** typedef struct Gather *** 842,848 **** typedef struct GatherMerge { Plan plan; ! int num_workers; /* remaining fields are just like the sort-key info in struct Sort */ int numCols; /* number of sort-key columns */ AttrNumber *sortColIdx; /* their indexes in the target list */ --- 850,857 ---- typedef struct GatherMerge { Plan plan; ! int num_workers; /* planned number of worker processes */ ! int rescan_param; /* ID of Param that signals a rescan, or -1 */ /* remaining fields are just like the sort-key info in struct Sort */ int numCols; /* number of sort-key columns */ AttrNumber *sortColIdx; /* their indexes in the target list */ diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 3ccc9d1..bd27088 100644 *** a/src/include/nodes/relation.h --- b/src/include/nodes/relation.h *************** typedef struct GatherPath *** 1268,1276 **** } GatherPath; /* ! * GatherMergePath runs several copies of a plan in parallel and ! * collects the results. For gather merge parallel leader always execute the ! * plan. 
*/ typedef struct GatherMergePath { --- 1268,1276 ---- } GatherPath; /* ! * GatherMergePath runs several copies of a plan in parallel and collects ! * the results, preserving their common sort order. For gather merge, the ! * parallel leader always executes the plan too, so we don't need single_copy. */ typedef struct GatherMergePath { diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 084f0f0..c5bb500 100644 *** a/src/test/regress/expected/select_parallel.out --- b/src/test/regress/expected/select_parallel.out *************** select count(*) from tenk1 group by twen *** 300,305 **** --- 300,348 ---- 500 (20 rows) + --test rescan behavior of gather merge + set enable_material = false; + explain (costs off) + select * from + (select string4, count(unique2) + from tenk1 group by string4 order by string4) ss + right join (values (1),(2),(3)) v(x) on true; + QUERY PLAN + ---------------------------------------------------------- + Nested Loop Left Join + -> Values Scan on "*VALUES*" + -> Finalize GroupAggregate + Group Key: tenk1.string4 + -> Gather Merge + Workers Planned: 4 + -> Partial GroupAggregate + Group Key: tenk1.string4 + -> Sort + Sort Key: tenk1.string4 + -> Parallel Seq Scan on tenk1 + (11 rows) + + select * from + (select string4, count(unique2) + from tenk1 group by string4 order by string4) ss + right join (values (1),(2),(3)) v(x) on true; + string4 | count | x + ---------+-------+--- + AAAAxx | 2500 | 1 + HHHHxx | 2500 | 1 + OOOOxx | 2500 | 1 + VVVVxx | 2500 | 1 + AAAAxx | 2500 | 2 + HHHHxx | 2500 | 2 + OOOOxx | 2500 | 2 + VVVVxx | 2500 | 2 + AAAAxx | 2500 | 3 + HHHHxx | 2500 | 3 + OOOOxx | 2500 | 3 + VVVVxx | 2500 | 3 + (12 rows) + + reset enable_material; -- gather merge test with 0 worker set max_parallel_workers = 0; explain (costs off) diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index 58c3f59..bb322b0 100644 *** a/src/test/regress/sql/select_parallel.sql --- b/src/test/regress/sql/select_parallel.sql *************** explain (costs off) *** 118,123 **** --- 118,139 ---- select count(*) from tenk1 group by twenty; + --test rescan behavior of gather merge + set enable_material = false; + + explain (costs off) + select * from + (select string4, count(unique2) + from tenk1 group by string4 order by string4) ss + right join (values (1),(2),(3)) v(x) on true; + + select * from + (select string4, count(unique2) + from tenk1 group by string4 order by string4) ss + right join (values (1),(2),(3)) v(x) on true; + + reset enable_material; + -- gather merge test with 0 worker set max_parallel_workers = 0; explain (costs off) -- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers
On Mon, Aug 28, 2017 at 1:59 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote: > I wrote: >> I think that the correct fix probably involves marking each parallel scan >> plan node as dependent on a pseudo executor parameter, which the parent >> Gather or GatherMerge node would flag as being changed on each rescan. >> This would cue the plan layers in between that they cannot optimize on the >> assumption that the leader's instance of the parallel scan will produce >> exactly the same rows as it did last time, even when "nothing else >> changed". The "wtParam" pseudo parameter that's used for communication >> between RecursiveUnion and its descendant WorkTableScan node is a good >> model for what needs to happen. > > Here is a draft patch for this.

! /* ! * Set child node's chgParam to tell it that the next scan might deliver a ! * different set of rows within the leader process. (The overall rowset ! * shouldn't change, but the leader process's subset might; hence nodes ! * between here and the parallel table scan node mustn't optimize on the ! * assumption of an unchanging rowset.) ! */ ! if (gm->rescan_param >= 0) ! outerPlan->chgParam = bms_add_member(outerPlan->chgParam, ! gm->rescan_param); ! ! ! /* ! * if chgParam of subnode is not null then plan will be re-scanned by ! * first ExecProcNode. ! */ ! if (outerPlan->chgParam == NULL) ! ExecReScan(outerPlan);

With this change, it is quite possible that during rescans workers will not do any work. I think this will allow workers to launch before rescan (for sequence scan) can reset the scan descriptor in the leader, which means that workers will still see the old value and assume that the scan is finished and come out without doing any work. Now, this won't produce wrong results because the leader will scan the whole relation by itself in such a case, but it might be inefficient.

> It's a bit different from wtParam in > that the special parameter isn't allocated until createplan.c time, > so that we don't eat a parameter slot if we end up choosing a non-parallel > plan; but otherwise things are comparable. > > I could use some feedback on whether this is marking dependent child nodes > sanely. As written, any plan node that's marked parallel_aware is assumed > to need a dependency on the parent Gather or GatherMerge's rescan param > --- and the planner will now bitch if a parallel_aware plan node is not > under any such Gather. Is this reasonable? I think so. -- With Regards, Amit Kapila. EnterpriseDB: http://www.enterprisedb.com
Amit Kapila <amit.kapila16@gmail.com> writes: > With this change, it is quite possible that during rescans workers > will not do any work. Um, what's different about that than before? regards, tom lane
On Mon, Aug 28, 2017 at 6:01 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: > Amit Kapila <amit.kapila16@gmail.com> writes: >> With this change, it is quite possible that during rescans workers >> will not do any work. > > Um, what's different about that than before? Earlier, we perform the rescan of all the nodes before ExecProcNode, so workers will always start (restart) after the scan descriptor is initialized. Now, as is evident from the discussion in this thread, that was not the right thing for gather merge, because some nodes, like Sort, perform an optimization due to which rescan for the lower nodes will never be called. So, we need to ensure in some way that we don't skip rescanning below such nodes, and one way to achieve that is what you have done in the patch, but it seems to have a side effect. -- With Regards, Amit Kapila. EnterpriseDB: http://www.enterprisedb.com
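For context, the Sort optimization in question is the usual rescan shortcut: a node that has already materialized its result skips rescanning its child when the child's chgParam is NULL. A paraphrased sketch, modeled loosely on nodeSort.c and simplified here (not verbatim source):

void
ExecReScanSort_sketch(SortState *node)
{
	PlanState  *outerPlan = outerPlanState(node);

	/* If we never sorted, there is nothing to reset yet. */
	if (!node->sort_Done)
		return;

	if (outerPlan->chgParam == NULL)
	{
		/*
		 * No parameter of the child has changed, so assume its output is
		 * unchanged too: just rewind the existing tuplesort result.  Note
		 * that ExecReScan is NOT called on the child, which is why a
		 * parallel scan underneath never gets reset unless something (such
		 * as the proposed rescan_param) marks it as changed.
		 */
		tuplesort_rescan((Tuplesortstate *) node->tuplesortstate);
	}
	else
	{
		/* Child output may differ: forget the old result and re-sort. */
		node->sort_Done = false;
	}
}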
Amit Kapila <amit.kapila16@gmail.com> writes: > On Mon, Aug 28, 2017 at 6:01 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: >> Um, what's different about that than before? > Earlier, we perform the rescan of all the nodes before ExecProcNode, > so workers will always start (restart) after the scan descriptor is > initialized. If what you're complaining about is that I put back the "if (outerPlan->chgParam == NULL)" test to allow postponement of the recursive ExecReScan call, I'm afraid that it's mere wishful thinking that omitting that test in nodeGather did anything. The nodes underneath the Gather are likely to do the same thing, so that the parallel table scan node itself is going to get a postponed rescan call anyway. See e.g. ExecReScanNestLoop(). I see your point that there's inadequate interlocking between resetting the parallel scan's shared state and starting a fresh set of workers, but that's a pre-existing bug. In practice I doubt it makes any difference, because according to my testing the leader will generally reach the table scan long before any workers do. It'd be nice to do better though. I'm inclined to think that what's needed is to move the reset of the shared state into a new "ExecParallelReInitializeDSM" plan tree walk, which would have to occur before we start the new set of workers. regards, tom lane
On Mon, Aug 28, 2017 at 6:34 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: > Amit Kapila <amit.kapila16@gmail.com> writes: >> On Mon, Aug 28, 2017 at 6:01 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: >>> Um, what's different about that than before? > >> Earlier, we perform the rescan of all the nodes before ExecProcNode, >> so workers will always start (restart) after the scan descriptor is >> initialized. > > If what you're complaining about is that I put back the "if > (outerPlan->chgParam == NULL)" test to allow postponement of the > recursive ExecReScan call, I'm afraid that it's mere wishful > thinking that omitting that test in nodeGather did anything. > The nodes underneath the Gather are likely to do the same thing, > so that the parallel table scan node itself is going to get a > postponed rescan call anyway. See e.g. ExecReScanNestLoop(). Previously, outerPlan->chgParam would be NULL, so I think rescans won't be postponed. IIRC, I debugged this during the development of this code, and rescans were not postponed. I don't deny that in some cases it might get delayed, but in simple cases it was done immediately. I agree that, in general, the proposed fix is better than having nothing, but I am not sure whether we need it for Gather as well, considering we have not been able to demonstrate a failing case. > I see your point that there's inadequate interlocking between resetting > the parallel scan's shared state and starting a fresh set of workers, > but that's a pre-existing bug. In practice I doubt it makes any > difference, because according to my testing the leader will generally > reach the table scan long before any workers do. It'd be nice to do > better though. Agreed. BTW, I have mentioned above that we could avoid skipping the optimization in the rescan path if we are in parallel mode. I think that would not be as elegant a solution as your patch, but it wouldn't have this problem. -- With Regards, Amit Kapila. EnterpriseDB: http://www.enterprisedb.com
Amit Kapila <amit.kapila16@gmail.com> writes: > On Mon, Aug 28, 2017 at 6:34 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: >> If what you're complaining about is that I put back the "if >> (outerPlan->chgParam == NULL)" test to allow postponement of the >> recursive ExecReScan call, I'm afraid that it's mere wishful >> thinking that omitting that test in nodeGather did anything. > Previously, outerPlan->chgParam would be NULL, so I think rescans won't > be postponed. That seems like an unacceptably fragile assumption. Even if it happens to be true today, we would need to fix it sooner or later. (And I kinda suspect it's possible to break it today, anyway. Treating PARAM_EXEC Params as parallel-restricted seems to lock out the easiest cases, but we have param slots that don't correspond to any Param node, eg for recursive union worktables. replace_nestloop_params is also a source of PARAM_EXEC Params that won't be detected during is_parallel_safe() tests, because it happens later.) regards, tom lane
On Mon, Aug 28, 2017 at 10:47 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote: > That seems like an unacceptably fragile assumption. Even if it happens to > be true today, we would need to fix it sooner or later. (And I kinda > suspect it's possible to break it today, anyway. Treating PARAM_EXEC > Params as parallel-restricted seems to lock out the easiest cases, but we > have param slots that don't correspond to any Param node, eg for recursive > union worktables. replace_nestloop_params is also a source of PARAM_EXEC > Params that won't be detected during is_parallel_safe() tests, because it > happens later.) Those particular cases are, I think, handled. The CTE case is handled by considering CTE scans as parallel-restricted, and the nestloop case is handled by insisting that all partial paths must be unparameterized. You can join a partial path to a parameterized non-partial path to make a new partial path, but neither the original partial path nor the resulting one can itself be parameterized. - fuller description. Academic literature on parallel query suggests that + fuller description. The academic literature on parallel query suggests That sentence isn't wrong as written. I don't really understand the part about depending on a parallel-aware node. I mean, there should always be one, except in the single-copy-Gather case, but why is it right to depend on that rather than anything else? What happens when the Parallel Hash patch goes in and we have multiple parallel-aware scan nodes (plus a parallel-aware Hash node) under the same Gather? -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
Robert Haas <robertmhaas@gmail.com> writes: > On Mon, Aug 28, 2017 at 10:47 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote: > - fuller description. Academic literature on parallel query suggests that > + fuller description. The academic literature on parallel query suggests > That sentence isn't wrong as written. Count the "that"s (you're failing to notice the next line). > I don't really understand the part about depending on a parallel-aware > node. I mean, there should always be one, except in the > single-copy-Gather case, but why is it right to depend on that rather > than anything else? What happens when the Parallel Hash patch goes in > and we have multiple parallel-aware scan nodes (plus a parallel-aware > Hash node) under the same Gather? Well, that's what I'm asking. AFAICS we only really need the scan node(s) to be marked as depending on the Gather's rescan parameter. It would not, however, hurt anything for nodes above them to be so marked as well --- and even if we didn't explicitly mark them, those nodes would end up depending on the parameter anyway because of the way that parameter dependency propagation works. I think the question boils down to whether a "parallel_aware" node would ever not be underneath a related Gather. regards, tom lane
On Mon, Aug 28, 2017 at 3:00 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: >> That sentence isn't wrong as written. > > Count the "that"s (you're failing to notice the next line). OK, true. But "Academic literature" -> "The academic literature" is just second-guessing, I think. >> I don't really understand the part about depending on a parallel-aware >> node. I mean, there should always be one, except in the >> single-copy-Gather case, but why is it right to depend on that rather >> than anything else? What happens when the Parallel Hash patch goes in >> and we have multiple parallel-aware scan nodes (plus a parallel-aware >> Hash node) under the same Gather? > > Well, that's what I'm asking. AFAICS we only really need the scan node(s) > to be marked as depending on the Gather's rescan parameter. It would not, > however, hurt anything for nodes above them to be so marked as well --- > and even if we didn't explicitly mark them, those nodes would end up > depending on the parameter anyway because of the way that parameter > dependency propagation works. I think the question boils down to whether > a "parallel_aware" node would ever not be underneath a related Gather.

There should never be a parallel_aware node that's not beneath a Gather or Gather Merge; I don't know what the meaning of such a plan would be, so I think we're safe against such a thing appearing in the future. What I'm unclear about is what happens with nodes that aren't directly in the chain between the Gather and the parallel-aware node. For instance:

Something
-> Gather
   -> Merge Join
      -> Sort
         -> Parallel Seq Scan on a
      -> Merge Join
         -> Sort
            -> Seq Scan on b
         -> Sort
            -> Seq Scan on c

If the Gather gets rescanned, is it OK to force a re-sort of a but not of b or c? Hmm, maybe so. The workers are going to have to do the sorts of b and c since any workers from a previous scan are GONE, but if the leader's done that work, it's still good. Similarly:

Something
-> Gather
   -> Merge Join
      -> Sort
         -> Parallel Seq Scan on a
      -> Hash Join
         -> Index Scan on b
         -> Hash
            -> Seq Scan on c

If the leader's got an existing hash table built on c, it can reuse it. The workers will need to build one. Now consider Parallel Hash (not yet committed), where we might get this:

Something
-> Gather
   -> Merge Join
      -> Sort
         -> Parallel Seq Scan on a
      -> Hash Join
         -> Index Scan on b
         -> Parallel Hash
            -> Parallel Seq Scan on c

Now what? We clearly still need to force a re-sort of a, but what about the shared hash table built on c? If we've still got that hash table and it was a single-batch join, there's probably no need to rescan it even though both the Parallel Hash node and the Parallel Seq Scan beneath it are parallel-aware. In this case all workers collaborated to build a shared hash table covering all rows from c; if we've still got that, it's still good. In fact, even the workers can reuse that hash table even though, for them, it'll not really be a re-scan at all. -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
Robert Haas <robertmhaas@gmail.com> writes: > On Mon, Aug 28, 2017 at 3:00 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: >> Count the "that"s (you're failing to notice the next line). > OK, true. But "Academic literature" -> "The academic literature" is > just second-guessing, I think. No, it was more to avoid reflowing the paragraph (or leaving a weirdly short line). > There should never be a parallel_aware node that's not beneath a > Gather or Gather Merge; I don't know what the meaning of such a plan > would be, so I think we're safe against such a thing appearing in the > future. What I'm unclear about is what happens with nodes that aren't > directly in the chain between the Gather and the parallel-aware node. Nothing. The parallel-aware node(s) get marked as dependent on the rescan parameter, and then that marking bubbles up to their ancestor nodes (up to the Gather). Nodes that are not ancestral to any parallel-aware node are unchanged.

> Now consider Parallel Hash (not yet committed), where we might get this:
>
> Something
> -> Gather
>    -> Merge Join
>       -> Sort
>          -> Parallel Seq Scan on a
>       -> Hash Join
>          -> Index Scan on b
>          -> Parallel Hash
>             -> Parallel Seq Scan on c
>
> Now what? We clearly still need to force a re-sort of a, but what
> about the shared hash table built on c? If we've still got that hash
> table and it was a single-batch join, there's probably no need to
> rescan it even though both the Parallel Hash node and the Parallel Seq
> Scan beneath it are parallel-aware. In this case all workers
> collaborated to build a shared hash table covering all rows from c; if
> we've still got that, it's still good. In fact, even the workers can
> reuse that hash table even though, for them, it'll not really be a
> re-scan at all.

Well, I'd say that's something for the parallel hash patch to resolve. Yes, if the contents of the hash table are expected to be the same regardless of how many workers were involved, then we shouldn't need to rebuild it after a rescan of the Gather. That would mean not marking either Parallel Hash or its descendant Parallel Seq Scan as dependent on the Gather's rescan param. That's not terribly hard to mechanize within the structure of this patch: just ignore the param at/below the ParallelHash. Cowboy coding would be, perhaps,

 	if (plan->parallel_aware)
 	{
 		if (gather_param < 0)
 			elog(ERROR, "parallel-aware plan node is not below a Gather");
+		if (IsA(plan, Hash))
+			gather_param = -1;
+		else
 		context.paramids = bms_add_member(context.paramids,
 										  gather_param);
 	}

but probably we should think of a more arm's-length way to do it. Maybe parallel_aware should have more than two values, depending on whether the result of the node is context-dependent or not. regards, tom lane
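To picture what "more than two values" could mean, here is a purely hypothetical sketch; the enum and its members are invented for illustration and do not exist in the tree:

typedef enum ParallelAwareness
{
	PA_NONE,				/* behaves the same with or without workers */
	PA_PARTITIONS_OUTPUT,	/* output depends on which participant runs it,
							 * e.g. Parallel Seq Scan; needs the rescan param */
	PA_SHARED_RESULT		/* coordinates through DSM but produces the same
							 * result regardless of workers, e.g. the proposed
							 * Parallel Hash; could skip the rescan param */
} ParallelAwareness;

With a distinction like that, finalize_plan could add the Gather's rescan param only to nodes in the second category.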
On Mon, Aug 28, 2017 at 6:35 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: > but probably we should think of a more arm's-length way to do it. > Maybe parallel_aware should have more than two values, depending > on whether the result of the node is context-dependent or not. My original intent for the parallel_aware flag was for it to signal whether the plan node was going to do something functionally different when in parallel mode. For scans, that's come to mean "partition the input among the workers", and there doesn't seem to be any other sensible meaning. I don't have a good idea what it's going to mean for non-scan nodes yet. Parallel Hash will be the first parallel-aware non-scan node, and it uses the flag to mean that the hash table is in dynamic shared memory, so that the inner side can be partial (which is otherwise not possible). I'm guessing that is going to be a common meaning for nodes that store stuff - it's easy to imagine Parallel Materialize, Parallel Sort, and Parallel HashAggregate with similar semantics. There's also a proposed patch for Parallel Append, where it signals that DSM is being used to coordinate task scheduling and load balancing. It seems likely the whole concept of parallel_aware is only a zero-order approximation to what we really want. This bug is, IMHO, the first really tangible evidence of the concept creaking around the edges, but I've kind of had a feeling for a while that it wasn't likely to be problem-free. I'm still not sure exactly what the right answer will turn out to be. -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
Robert Haas <robertmhaas@gmail.com> writes: > On Mon, Aug 28, 2017 at 6:35 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: >> Maybe parallel_aware should have more than two values, depending >> on whether the result of the node is context-dependent or not. > It seems likely the whole concept of parallel_aware is only a > zero-order approximation to what we really want. Yeah, I agree --- but it's also clear that we don't yet know what it should be. We'll have to work that out as we accrete more functionality. In the meantime, I think what we should do is commit the bug fix more or less as I have it, and then work on Amit's concern about losing parallel efficiency by separating the resetting of shared parallel-scan state into a new plan tree traversal that's done before launching new worker processes. The only real alternative is to lobotomize the existing rescan optimizations, and that seems like a really poor choice from here. regards, tom lane
On Mon, Aug 28, 2017 at 10:17 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: > In the meantime, I think what we should do is commit the bug fix more or > less as I have it, and then work on Amit's concern about losing parallel > efficiency by separating the resetting of shared parallel-scan state > into a new plan tree traversal that's done before launching new worker > processes. The only real alternative is to lobotomize the existing rescan > optimizations, and that seems like a really poor choice from here. There's already ExecParallelReinitialize, which could be made to walk the nodes in addition to what it does already, but I don't understand exactly what else needs fixing. -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
On Tue, Aug 29, 2017 at 8:32 AM, Robert Haas <robertmhaas@gmail.com> wrote: > On Mon, Aug 28, 2017 at 10:17 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: >> In the meantime, I think what we should do is commit the bug fix more or >> less as I have it, and then work on Amit's concern about losing parallel >> efficiency by separating the resetting of shared parallel-scan state >> into a new plan tree traversal that's done before launching new worker >> processes. Sounds like a reasonable plan to me. >> The only real alternative is to lobotomize the existing rescan >> optimizations, and that seems like a really poor choice from here. > > There's already ExecParallelReinitialize, which could be made to walk > the nodes in addition to what it does already, but I don't understand > exactly what else needs fixing. Sure, but it is not advisable to reset state of all the nodes below gather at that place, otherwise, it will be more or less like we are forcing rescan of each node. I think there we can reset the shared parallel state of parallel-aware nodes (or anything related) and then allow rescan to reset the master backend specific state for all nodes beneath gather. -- With Regards, Amit Kapila. EnterpriseDB: http://www.enterprisedb.com
Amit Kapila <amit.kapila16@gmail.com> writes: > On Tue, Aug 29, 2017 at 8:32 AM, Robert Haas <robertmhaas@gmail.com> wrote: >> There's already ExecParallelReinitialize, which could be made to walk >> the nodes in addition to what it does already, but I don't understand >> exactly what else needs fixing. > Sure, but it is not advisable to reset state of all the nodes below > gather at that place, otherwise, it will be more or less like we are > forcing rescan of each node. I think there we can reset the shared > parallel state of parallel-aware nodes (or anything related) and then > allow rescan to reset the master backend specific state for all nodes > beneath gather. Right, the idea is to make this happen separately from the "rescan" logic. In general, it's a good idea for ExecReScanFoo to do as little as possible, so that you don't pay if a node is rescanned more than once before it's asked to do anything, or indeed if no rows are ever demanded from it at all. Attached is a WIP patch along this line. It's unfinished because I've not done the tedium of extending the FDW and CustomScan APIs to support this new type of per-node operation; but that part seems straightforward enough. The core code is complete and survived light testing. I'm pretty happy with the results --- note in particular how we get rid of some very dubious coding in ExecReScanIndexScan and ExecReScanIndexOnlyScan. If you try the test case from a2b70c89c on this patch alone, you'll notice that instead of sometimes reporting too-small counts during the rescans, it pretty consistently reports too-large counts. This is because we are now successfully resetting the shared state for the parallel seqscan, but we haven't done anything about the leader's HashAgg node deciding that it can re-use its old hashtable. So on the first scan, the leader typically scans all or most of the table because of its startup time advantage, and saves those counts in its hashtable. On later scans, the workers read all of the table while the leader decides it need do no scanning. So we get counts that reflect all of the table (from the workers) plus whatever part of the table the leader read the first time. So this by no means removes the need for my other patch. If no objections, I'll do the additional legwork and push. As before, I think we can probably get away without fixing 9.6, even though it's nominally got the same bug. regards, tom lane diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index ff03c68..e29c5ad 100644 *** a/src/backend/access/heap/heapam.c --- b/src/backend/access/heap/heapam.c *************** heap_rescan(HeapScanDesc scan, *** 1531,1551 **** * reinitialize scan descriptor */ initscan(scan, key, true); - - /* - * reset parallel scan, if present - */ - if (scan->rs_parallel != NULL) - { - ParallelHeapScanDesc parallel_scan; - - /* - * Caller is responsible for making sure that all workers have - * finished the scan before calling this. - */ - parallel_scan = scan->rs_parallel; - pg_atomic_write_u64(¶llel_scan->phs_nallocated, 0); - } } /* ---------------- --- 1531,1536 ---- *************** heap_parallelscan_initialize(ParallelHea *** 1643,1648 **** --- 1628,1646 ---- } /* ---------------- + * heap_parallelscan_reinitialize - reset a parallel scan + * + * Call this in the leader process. Caller is responsible for + * making sure that all workers have finished the scan beforehand. 
+ * ---------------- + */ + void + heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan) + { + pg_atomic_write_u64(¶llel_scan->phs_nallocated, 0); + } + + /* ---------------- * heap_beginscan_parallel - join a parallel scan * * Caller must hold a suitable lock on the correct relation. diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index ce47f1d..d8cdb0e 100644 *** a/src/backend/executor/execParallel.c --- b/src/backend/executor/execParallel.c *************** static bool ExecParallelInitializeDSM(Pl *** 109,114 **** --- 109,116 ---- ExecParallelInitializeDSMContext *d); static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize); + static bool ExecParallelReInitializeDSM(PlanState *planstate, + ParallelContext *pcxt); static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation); *************** ExecParallelSetupTupleQueues(ParallelCon *** 365,382 **** } /* - * Re-initialize the parallel executor info such that it can be reused by - * workers. - */ - void - ExecParallelReinitialize(ParallelExecutorInfo *pei) - { - ReinitializeParallelDSM(pei->pcxt); - pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); - pei->finished = false; - } - - /* * Sets up the required infrastructure for backend workers to perform * execution and return results to the main backend. */ --- 367,372 ---- *************** ExecInitParallelPlan(PlanState *planstat *** 567,573 **** ExecParallelInitializeDSM(planstate, &d); /* ! * Make sure that the world hasn't shifted under our feat. This could * probably just be an Assert(), but let's be conservative for now. */ if (e.nnodes != d.nnodes) --- 557,563 ---- ExecParallelInitializeDSM(planstate, &d); /* ! * Make sure that the world hasn't shifted under our feet. This could * probably just be an Assert(), but let's be conservative for now. */ if (e.nnodes != d.nnodes) *************** ExecInitParallelPlan(PlanState *planstat *** 578,583 **** --- 568,639 ---- } /* + * Re-initialize the parallel executor shared memory state before launching + * a fresh batch of workers. + */ + void + ExecParallelReinitialize(PlanState *planstate, + ParallelExecutorInfo *pei) + { + ReinitializeParallelDSM(pei->pcxt); + pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); + pei->finished = false; + + /* Traverse plan tree and let each child node reset associated state. */ + ExecParallelReInitializeDSM(planstate, pei->pcxt); + } + + /* + * Traverse plan tree to reinitialize per-node dynamic shared memory state + */ + static bool + ExecParallelReInitializeDSM(PlanState *planstate, + ParallelContext *pcxt) + { + if (planstate == NULL) + return false; + + /* + * Call reinitializers for parallel-aware plan nodes. 
+ */ + if (planstate->plan->parallel_aware) + { + switch (nodeTag(planstate)) + { + case T_SeqScanState: + ExecSeqScanReInitializeDSM((SeqScanState *) planstate, + pcxt); + break; + case T_IndexScanState: + ExecIndexScanReInitializeDSM((IndexScanState *) planstate, + pcxt); + break; + case T_IndexOnlyScanState: + ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate, + pcxt); + break; + case T_ForeignScanState: + ExecForeignScanReInitializeDSM((ForeignScanState *) planstate, + pcxt); + break; + case T_CustomScanState: + ExecCustomScanReInitializeDSM((CustomScanState *) planstate, + pcxt); + break; + case T_BitmapHeapScanState: + ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate, + pcxt); + break; + + default: + break; + } + } + + return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt); + } + + /* * Copy instrumentation information about this node and its descendants from * dynamic shared memory. */ diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c index 79f534e..f7e55e0 100644 *** a/src/backend/executor/nodeBitmapHeapscan.c --- b/src/backend/executor/nodeBitmapHeapscan.c *************** ExecReScanBitmapHeapScan(BitmapHeapScanS *** 705,727 **** node->shared_tbmiterator = NULL; node->shared_prefetch_iterator = NULL; - /* Reset parallel bitmap state, if present */ - if (node->pstate) - { - dsa_area *dsa = node->ss.ps.state->es_query_dsa; - - node->pstate->state = BM_INITIAL; - - if (DsaPointerIsValid(node->pstate->tbmiterator)) - tbm_free_shared_area(dsa, node->pstate->tbmiterator); - - if (DsaPointerIsValid(node->pstate->prefetch_iterator)) - tbm_free_shared_area(dsa, node->pstate->prefetch_iterator); - - node->pstate->tbmiterator = InvalidDsaPointer; - node->pstate->prefetch_iterator = InvalidDsaPointer; - } - ExecScanReScan(&node->ss); /* --- 705,710 ---- *************** ExecBitmapHeapInitializeDSM(BitmapHeapSc *** 1000,1005 **** --- 983,1013 ---- } /* ---------------------------------------------------------------- + * ExecBitmapHeapReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ + void + ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, + ParallelContext *pcxt) + { + ParallelBitmapHeapState *pstate = node->pstate; + dsa_area *dsa = node->ss.ps.state->es_query_dsa; + + pstate->state = BM_INITIAL; + + if (DsaPointerIsValid(pstate->tbmiterator)) + tbm_free_shared_area(dsa, pstate->tbmiterator); + + if (DsaPointerIsValid(pstate->prefetch_iterator)) + tbm_free_shared_area(dsa, pstate->prefetch_iterator); + + pstate->tbmiterator = InvalidDsaPointer; + pstate->prefetch_iterator = InvalidDsaPointer; + } + + /* ---------------------------------------------------------------- * ExecBitmapHeapInitializeWorker * * Copy relevant information from TOC into planstate. 
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index fb7645b..7ec72d7 100644 *** a/src/backend/executor/nodeCustom.c --- b/src/backend/executor/nodeCustom.c *************** ExecCustomScanInitializeDSM(CustomScanSt *** 195,200 **** --- 195,206 ---- } void + ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt) + { + /* XXX */ + } + + void ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) { const CustomExecMethods *methods = node->methods; diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 140e82e..779d0db 100644 *** a/src/backend/executor/nodeForeignscan.c --- b/src/backend/executor/nodeForeignscan.c *************** ExecForeignScanInitializeDSM(ForeignScan *** 332,338 **** } /* ---------------------------------------------------------------- ! * ExecForeignScanInitializeDSM * * Initialization according to the parallel coordination information * ---------------------------------------------------------------- --- 332,350 ---- } /* ---------------------------------------------------------------- ! * ExecForeignScanReInitializeDSM ! * ! * Reset shared state before beginning a fresh scan. ! * ---------------------------------------------------------------- ! */ ! void ! ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) ! { ! /* XXX */ ! } ! ! /* ---------------------------------------------------------------- ! * ExecForeignScanInitializeWorker * * Initialization according to the parallel coordination information * ---------------------------------------------------------------- diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index e8d94ee..b8e24d9 100644 *** a/src/backend/executor/nodeGather.c --- b/src/backend/executor/nodeGather.c *************** ExecGather(PlanState *pstate) *** 152,162 **** { ParallelContext *pcxt; ! /* Initialize the workers required to execute Gather node. */ if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, gather->num_workers); /* * Register backend workers. We might not get as many as we --- 152,165 ---- { ParallelContext *pcxt; ! /* Initialize, or re-initialize, shared state needed by workers. */ if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, gather->num_workers); + else + ExecParallelReinitialize(node->ps.lefttree, + node->pei); /* * Register backend workers. We might not get as many as we *************** ExecShutdownGather(GatherState *node) *** 430,447 **** void ExecReScanGather(GatherState *node) { ! /* ! * Re-initialize the parallel workers to perform rescan of relation. We ! * want to gracefully shutdown all the workers so that they should be able ! * to propagate any error or other information to master backend before ! * dying. Parallel context will be reused for rescan. ! */ ExecShutdownGatherWorkers(node); node->initialized = false; - if (node->pei) - ExecParallelReinitialize(node->pei); - ExecReScan(node->ps.lefttree); } --- 433,443 ---- void ExecReScanGather(GatherState *node) { ! 
/* Make sure any existing workers are gracefully shut down */ ExecShutdownGatherWorkers(node); + /* Mark node so that shared state will be rebuilt at next call */ node->initialized = false; ExecReScan(node->ps.lefttree); } diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 64c6239..26fde0b 100644 *** a/src/backend/executor/nodeGatherMerge.c --- b/src/backend/executor/nodeGatherMerge.c *************** ExecGatherMerge(PlanState *pstate) *** 186,196 **** { ParallelContext *pcxt; ! /* Initialize data structures for workers. */ if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, gm->num_workers); /* Try to launch workers. */ pcxt = node->pei->pcxt; --- 186,199 ---- { ParallelContext *pcxt; ! /* Initialize, or re-initialize, shared state needed by workers. */ if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, gm->num_workers); + else + ExecParallelReinitialize(node->ps.lefttree, + node->pei); /* Try to launch workers. */ pcxt = node->pei->pcxt; *************** ExecShutdownGatherMergeWorkers(GatherMer *** 325,344 **** void ExecReScanGatherMerge(GatherMergeState *node) { ! /* ! * Re-initialize the parallel workers to perform rescan of relation. We ! * want to gracefully shutdown all the workers so that they should be able ! * to propagate any error or other information to master backend before ! * dying. Parallel context will be reused for rescan. ! */ ExecShutdownGatherMergeWorkers(node); node->initialized = false; node->gm_initialized = false; - if (node->pei) - ExecParallelReinitialize(node->pei); - ExecReScan(node->ps.lefttree); } --- 328,340 ---- void ExecReScanGatherMerge(GatherMergeState *node) { ! /* Make sure any existing workers are gracefully shut down */ ExecShutdownGatherMergeWorkers(node); + /* Mark node so that shared state will be rebuilt at next call */ node->initialized = false; node->gm_initialized = false; ExecReScan(node->ps.lefttree); } diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c index fe7ba3f..5351cb8 100644 *** a/src/backend/executor/nodeIndexonlyscan.c --- b/src/backend/executor/nodeIndexonlyscan.c *************** *** 25,30 **** --- 25,31 ---- * parallel index-only scan * ExecIndexOnlyScanInitializeDSM initialize DSM for parallel * index-only scan + * ExecIndexOnlyScanReInitializeDSM reinitialize DSM for fresh scan * ExecIndexOnlyScanInitializeWorker attach to DSM info in parallel worker */ #include "postgres.h" *************** ExecIndexOnlyScan(PlanState *pstate) *** 336,351 **** void ExecReScanIndexOnlyScan(IndexOnlyScanState *node) { - bool reset_parallel_scan = true; - - /* - * If we are here to just update the scan keys, then don't reset parallel - * scan. For detailed reason behind this look in the comments for - * ExecReScanIndexScan. - */ - if (node->ioss_NumRuntimeKeys != 0 && !node->ioss_RuntimeKeysReady) - reset_parallel_scan = false; - /* * If we are doing runtime key calculations (ie, any of the index key * values weren't simple Consts), compute the new key values. 
But first, --- 337,342 ---- *************** ExecReScanIndexOnlyScan(IndexOnlyScanSta *** 366,380 **** /* reset index scan */ if (node->ioss_ScanDesc) - { - index_rescan(node->ioss_ScanDesc, node->ioss_ScanKeys, node->ioss_NumScanKeys, node->ioss_OrderByKeys, node->ioss_NumOrderByKeys); - if (reset_parallel_scan && node->ioss_ScanDesc->parallel_scan) - index_parallelrescan(node->ioss_ScanDesc); - } ExecScanReScan(&node->ss); } --- 357,366 ---- *************** ExecIndexOnlyScanInitializeDSM(IndexOnly *** 672,677 **** --- 658,676 ---- } /* ---------------------------------------------------------------- + * ExecIndexOnlyScanReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ + void + ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, + ParallelContext *pcxt) + { + index_parallelrescan(node->ioss_ScanDesc); + } + + /* ---------------------------------------------------------------- * ExecIndexOnlyScanInitializeWorker * * Copy relevant information from TOC into planstate. diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c index 404076d..638b17b 100644 *** a/src/backend/executor/nodeIndexscan.c --- b/src/backend/executor/nodeIndexscan.c *************** *** 24,29 **** --- 24,30 ---- * ExecIndexRestrPos restores scan position. * ExecIndexScanEstimate estimates DSM space needed for parallel index scan * ExecIndexScanInitializeDSM initialize DSM for parallel indexscan + * ExecIndexScanReInitializeDSM reinitialize DSM for fresh scan * ExecIndexScanInitializeWorker attach to DSM info in parallel worker */ #include "postgres.h" *************** ExecIndexScan(PlanState *pstate) *** 577,594 **** void ExecReScanIndexScan(IndexScanState *node) { - bool reset_parallel_scan = true; - - /* - * If we are here to just update the scan keys, then don't reset parallel - * scan. We don't want each of the participating process in the parallel - * scan to update the shared parallel scan state at the start of the scan. - * It is quite possible that one of the participants has already begun - * scanning the index when another has yet to start it. - */ - if (node->iss_NumRuntimeKeys != 0 && !node->iss_RuntimeKeysReady) - reset_parallel_scan = false; - /* * If we are doing runtime key calculations (ie, any of the index key * values weren't simple Consts), compute the new key values. But first, --- 578,583 ---- *************** ExecReScanIndexScan(IndexScanState *node *** 614,634 **** reorderqueue_pop(node); } ! /* ! * Reset (parallel) index scan. For parallel-aware nodes, the scan ! * descriptor is initialized during actual execution of node and we can ! * reach here before that (ex. during execution of nest loop join). So, ! * avoid updating the scan descriptor at that time. ! */ if (node->iss_ScanDesc) - { index_rescan(node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys, node->iss_NumOrderByKeys); - - if (reset_parallel_scan && node->iss_ScanDesc->parallel_scan) - index_parallelrescan(node->iss_ScanDesc); - } node->iss_ReachedEnd = false; ExecScanReScan(&node->ss); --- 603,613 ---- reorderqueue_pop(node); } ! 
/* reset index scan */ if (node->iss_ScanDesc) index_rescan(node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys, node->iss_NumOrderByKeys); node->iss_ReachedEnd = false; ExecScanReScan(&node->ss); *************** ExecIndexScanInitializeDSM(IndexScanStat *** 1717,1722 **** --- 1696,1714 ---- } /* ---------------------------------------------------------------- + * ExecIndexScanReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ + void + ExecIndexScanReInitializeDSM(IndexScanState *node, + ParallelContext *pcxt) + { + index_parallelrescan(node->iss_ScanDesc); + } + + /* ---------------------------------------------------------------- * ExecIndexScanInitializeWorker * * Copy relevant information from TOC into planstate. diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 5c49d4c..d4ac939 100644 *** a/src/backend/executor/nodeSeqscan.c --- b/src/backend/executor/nodeSeqscan.c *************** *** 22,27 **** --- 22,28 ---- * * ExecSeqScanEstimate estimates DSM space needed for parallel scan * ExecSeqScanInitializeDSM initialize DSM for parallel scan + * ExecSeqScanReInitializeDSM reinitialize DSM for fresh parallel scan * ExecSeqScanInitializeWorker attach to DSM info in parallel worker */ #include "postgres.h" *************** ExecSeqScanInitializeDSM(SeqScanState *n *** 325,330 **** --- 326,346 ---- } /* ---------------------------------------------------------------- + * ExecSeqScanReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ + void + ExecSeqScanReInitializeDSM(SeqScanState *node, + ParallelContext *pcxt) + { + HeapScanDesc scan = node->ss.ss_currentScanDesc; + + heap_parallelscan_reinitialize(scan->rs_parallel); + } + + /* ---------------------------------------------------------------- * ExecSeqScanInitializeWorker * * Copy relevant information from TOC into planstate. diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index b2132e7..4e41024 100644 *** a/src/include/access/heapam.h --- b/src/include/access/heapam.h *************** extern HeapTuple heap_getnext(HeapScanDe *** 130,135 **** --- 130,136 ---- extern Size heap_parallelscan_estimate(Snapshot snapshot); extern void heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, Snapshot snapshot); + extern void heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan); extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc); extern bool heap_fetch(Relation relation, Snapshot snapshot, diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index bd0a87f..a651224 100644 *** a/src/include/executor/execParallel.h --- b/src/include/executor/execParallel.h *************** extern ParallelExecutorInfo *ExecInitPar *** 36,42 **** EState *estate, int nworkers); extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei); ! extern void ExecParallelReinitialize(ParallelExecutorInfo *pei); extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); --- 36,43 ---- EState *estate, int nworkers); extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei); ! extern void ExecParallelReinitialize(PlanState *planstate, ! 
ParallelExecutorInfo *pei); extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); diff --git a/src/include/executor/nodeBitmapHeapscan.h b/src/include/executor/nodeBitmapHeapscan.h index c77694c..10844a4 100644 *** a/src/include/executor/nodeBitmapHeapscan.h --- b/src/include/executor/nodeBitmapHeapscan.h *************** extern void ExecBitmapHeapEstimate(Bitma *** 24,29 **** --- 24,31 ---- ParallelContext *pcxt); extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, ParallelContext *pcxt); + extern void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, + ParallelContext *pcxt); extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc); diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h index a1cc63a..25767b6 100644 *** a/src/include/executor/nodeCustom.h --- b/src/include/executor/nodeCustom.h *************** extern void ExecCustomScanEstimate(Custo *** 34,39 **** --- 34,41 ---- ParallelContext *pcxt); extern void ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt); + extern void ExecCustomScanReInitializeDSM(CustomScanState *node, + ParallelContext *pcxt); extern void ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc); extern void ExecShutdownCustomScan(CustomScanState *node); diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index 0b66259..0354c2c 100644 *** a/src/include/executor/nodeForeignscan.h --- b/src/include/executor/nodeForeignscan.h *************** extern void ExecForeignScanEstimate(Fore *** 25,30 **** --- 25,32 ---- ParallelContext *pcxt); extern void ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt); + extern void ExecForeignScanReInitializeDSM(ForeignScanState *node, + ParallelContext *pcxt); extern void ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc); extern void ExecShutdownForeignScan(ForeignScanState *node); diff --git a/src/include/executor/nodeIndexonlyscan.h b/src/include/executor/nodeIndexonlyscan.h index c8a709c..690b5db 100644 *** a/src/include/executor/nodeIndexonlyscan.h --- b/src/include/executor/nodeIndexonlyscan.h *************** extern void ExecIndexOnlyScanEstimate(In *** 28,33 **** --- 28,35 ---- ParallelContext *pcxt); extern void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node, ParallelContext *pcxt); + extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, + ParallelContext *pcxt); extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, shm_toc *toc); diff --git a/src/include/executor/nodeIndexscan.h b/src/include/executor/nodeIndexscan.h index 1668e34..0670e87 100644 *** a/src/include/executor/nodeIndexscan.h --- b/src/include/executor/nodeIndexscan.h *************** extern void ExecIndexRestrPos(IndexScanS *** 24,29 **** --- 24,30 ---- extern void ExecReScanIndexScan(IndexScanState *node); extern void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt); extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt); + extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt); extern void ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc); /* diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h index 0fba79f..eb96799 100644 *** a/src/include/executor/nodeSeqscan.h --- b/src/include/executor/nodeSeqscan.h *************** extern void ExecReScanSeqScan(SeqScanSta *** 24,29 **** --- 24,30 ---- 
/* parallel scan support */ extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt); + extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc); #endif /* NODESEQSCAN_H */ -- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers
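As a side note on the unfinished FDW piece: one plausible way the /* XXX */ stub could eventually be filled in is sketched below. The callback name and signature are assumptions for illustration, not part of the posted WIP patch.

void
ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
{
	FdwRoutine *fdwroutine = node->fdwroutine;

	/* Hypothetical optional callback letting the FDW reset its DSM state */
	if (fdwroutine->ReInitializeDSMForeignScan != NULL)
	{
		int			plan_node_id = node->ss.ps.plan->plan_node_id;
		void	   *coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);

		fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
	}
}

A matching callback for CustomScan providers could fill in the ExecCustomScanReInitializeDSM stub the same way.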
On Tue, Aug 29, 2017 at 10:05 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: > Amit Kapila <amit.kapila16@gmail.com> writes: >> On Tue, Aug 29, 2017 at 8:32 AM, Robert Haas <robertmhaas@gmail.com> wrote: >>> There's already ExecParallelReinitialize, which could be made to walk >>> the nodes in addition to what it does already, but I don't understand >>> exactly what else needs fixing. > >> Sure, but it is not advisable to reset state of all the nodes below >> gather at that place, otherwise, it will be more or less like we are >> forcing rescan of each node. I think there we can reset the shared >> parallel state of parallel-aware nodes (or anything related) and then >> allow rescan to reset the master backend specific state for all nodes >> beneath gather. > > Right, the idea is to make this happen separately from the "rescan" > logic. In general, it's a good idea for ExecReScanFoo to do as little > as possible, so that you don't pay if a node is rescanned more than > once before it's asked to do anything, or indeed if no rows are ever > demanded from it at all. > > Attached is a WIP patch along this line. The idea looks sane to me. > It's unfinished because > I've not done the tedium of extending the FDW and CustomScan APIs > to support this new type of per-node operation; but that part seems > straightforward enough. The core code is complete and survived > light testing. I have also played a bit with both of the patches together and didn't find any problems. In your second patch, I have a minor comment. void ExecReScanGather(GatherState *node) { ! /* Make sure any existing workers are gracefully shut down */ ExecShutdownGatherWorkers(node); The above call doesn't ensure the shutdown. It just ensures that we receive all messages from parallel workers. Basically, it doesn't call WaitForParallelWorkersToExit. > I'm pretty happy with the results --- note in > particular how we get rid of some very dubious coding in > ExecReScanIndexScan and ExecReScanIndexOnlyScan. > > If you try the test case from a2b70c89c on this patch alone, you'll notice > that instead of sometimes reporting too-small counts during the rescans, > it pretty consistently reports too-large counts. This is because we are > now successfully resetting the shared state for the parallel seqscan, but > we haven't done anything about the leader's HashAgg node deciding that it > can re-use its old hashtable. So on the first scan, the leader typically > scans all or most of the table because of its startup time advantage, and > saves those counts in its hashtable. On later scans, the workers read all > of the table while the leader decides it need do no scanning. So we get > counts that reflect all of the table (from the workers) plus whatever part > of the table the leader read the first time. So this by no means removes > the need for my other patch. > > If no objections, I'll do the additional legwork and push. No objections. > As before, > I think we can probably get away without fixing 9.6, even though it's > nominally got the same bug. +1. -- With Regards, Amit Kapila. EnterpriseDB: http://www.enterprisedb.com
Amit Kapila <amit.kapila16@gmail.com> writes: > On Tue, Aug 29, 2017 at 10:05 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: > ! /* Make sure any existing workers are gracefully shut down */ > ExecShutdownGatherWorkers(node); > The above call doesn't ensure the shutdown. It just ensures that we > receive all messages from parallel workers. Basically, it doesn't > call WaitForParallelWorkersToExit. Perhaps you should submit a patch to rename ExecShutdownGatherWorkers to something less misleading, then. But the previous comment there was even more wrong :-( regards, tom lane
On Wed, Aug 30, 2017 at 7:39 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote: > Amit Kapila <amit.kapila16@gmail.com> writes: >> On Tue, Aug 29, 2017 at 10:05 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: >> ! /* Make sure any existing workers are gracefully shut down */ >> ExecShutdownGatherWorkers(node); > >> The above call doesn't ensure the shutdown. It just ensures that we >> receive all messages from parallel workers. Basically, it doesn't >> call WaitForParallelWorkersToExit. > > Perhaps you should submit a patch to rename ExecShutdownGatherWorkers > to something less misleading, then. But the previous comment there > was even more wrong :-( Your (Tom's) proposed comment doesn't seem wrong to me. Shutting down workers consists of several stages. We destroy the tuple queues -- which will cause them to cease generating tuples once they notice -- then we wait for them to send us an 'X' message to indicate that they've shut down cleanly -- then they actually exit -- then the postmaster notices and releases their slots for reuse. After ExecShutdownGatherWorkers has completed, the first two of those things have finished but the third and fourth may not be quite done yet. I'd say it's fair to say, at that point, that the workers are gracefully shut down. -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
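Mapping those stages onto the functions named earlier in the thread, a rough paraphrase (call structure approximate, not verbatim source) looks like this:

static void
ExecShutdownGatherWorkers_sketch(GatherState *node)
{
	int		i;

	/*
	 * Stage 1: destroy the tuple queue readers; once the workers notice,
	 * they stop generating tuples.
	 */
	for (i = 0; i < node->nreaders; ++i)
		DestroyTupleQueueReader(node->reader[i]);

	/*
	 * Stage 2: wait for each worker to send its 'X' (clean shutdown)
	 * message; ExecParallelFinish ends up in WaitForParallelWorkersToFinish.
	 */
	ExecParallelFinish(node->pei);

	/*
	 * Stages 3 and 4 (the workers actually exiting and the postmaster
	 * reclaiming their slots) happen later; WaitForParallelWorkersToExit
	 * is reached when the ParallelContext itself is destroyed.
	 */
}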
Amit Kapila <amit.kapila16@gmail.com> writes: > On Tue, Aug 29, 2017 at 10:05 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote: >> If no objections, I'll do the additional legwork and push. > No objections. Done. Out of curiosity, I pushed just the rescan-param patch to the buildfarm to start with, to see if anything would fall over, and indeed some things did:

* prairiedog has shown several instances of a parallel bitmap heap scan test failing with too many rows being retrieved. I think what's happening there is that the leader's ExecReScanBitmapHeapScan call happens slowly enough that the worker(s) have already retrieved some rows using the old shared state. We'd determined that the equivalent case for a plain seqscan would result in no failure because the workers would think they had nothing to do, but this evidently isn't true for a parallel bitmap scan.

* prairiedog and loach have both shown failures with the test case from a2b70c89c, in which the *first* scan produces too many rows and then the later ones are fine. This befuddled me initially, but then I remembered that nodeNestloop.c will unconditionally do an ExecReScan call on its inner plan before the first ExecProcNode call. With the modified code from 7df2c1f8d, this results in the leader's Gather node's top child having a pending rescan on it due to a chgParam bit. That's serviced when we do the first ExecProcNode call on the child, after having started the workers. So that's another way in which a ReScan call can happen in the leader when workers are already running, and if the workers have already scanned some pages then those pages will get scanned again.

So I think this is all fixed up by 41b0dd987, but evidently those patches are not nearly as independent as I first thought. regards, tom lane
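The nodeNestloop.c behavior mentioned above is the per-outer-tuple rescan of the inner side, which fires even on the very first cycle. A paraphrased fragment (simplified, not verbatim source):

static TupleTableSlot *
ExecNestLoop_sketch(NestLoopState *node)
{
	PlanState  *outerPlan = outerPlanState(node);
	PlanState  *innerPlan = innerPlanState(node);
	TupleTableSlot *outerTupleSlot;
	TupleTableSlot *innerTupleSlot;

	for (;;)
	{
		if (node->nl_NeedNewOuter)
		{
			outerTupleSlot = ExecProcNode(outerPlan);
			if (TupIsNull(outerTupleSlot))
				return NULL;	/* no more outer tuples: join is done */
			node->nl_NeedNewOuter = false;

			/*
			 * Rescan the inner plan for the new outer tuple.  Note that this
			 * happens even on the very first cycle, before the inner plan has
			 * returned anything, so a Gather anywhere in the inner plan can
			 * receive a ReScan call before it has ever been executed.
			 */
			ExecReScan(innerPlan);
		}

		innerTupleSlot = ExecProcNode(innerPlan);
		if (TupIsNull(innerTupleSlot))
		{
			node->nl_NeedNewOuter = true;	/* advance to next outer tuple */
			continue;
		}

		/* qual checking and projection omitted in this sketch */
		return innerTupleSlot;
	}
}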