Thread: initial pruning in parallel append
Hi,

In an off-list chat, Robert suggested that it might be a good idea to look more closely into $subject, especially in the context of the project of moving the locking of child tables / partitions to the ExecInitNode() phase when executing cached generic plans [1].

Robert's point is that a worker's output of initial pruning, which consists of the set of child subplans (of a parallel-aware Append or MergeAppend) it considers valid for execution, may not be the same as the leader's and that of other workers. If that does indeed happen, it may confuse the Append's parallel-execution code, possibly even cause crashes, because the ParallelAppendState set up by the leader assumes a certain number and identity (?) of valid-for-execution subplans. So he suggests that initial pruning should only be done once, in the leader, and the result of that put into the EState for ExecInitParallelPlan() to serialize and pass down to workers. Each worker would simply consume that as-is to set the valid-for-execution child subplans in its copy of AppendState, instead of doing the initial pruning again.

Actually, earlier patches at [1] had implemented that mechanism (remembering the result of initial pruning and using it at a later time and place), because the earlier design there was to move the initial pruning on the nodes in a cached generic plan tree from ExecInitNode() to GetCachedPlan(). The result of initial pruning done in the latter would be passed down to and consumed in the former using what were called PartitionPruneResult nodes.

Maybe that stuff could be resurrected, though I was wondering if the risk of the same initial pruning steps returning different results when performed repeatedly in *one query lifetime* isn't pretty minimal or maybe rather non-existent? I think that's because performing initial pruning steps entails computing constant and/or stable expressions and comparing them with an unchanging set of partition bound values, with comparison functions whose result is also presumed to be stable. Then there's also the step of mapping the partition indexes as they appear in the PartitionDesc to the indexes of their subplans under Append/MergeAppend using the information contained in PartitionPruneInfo (subplan_map), and the result of that mapping should be immutable too.

I considered that the comparison functions that match_clause_to_partition_key() obtains by calling get_opfamily_proc() may in fact not be stable, though that doesn't seem to be a worry at least with the out-of-the-box pg_amproc collection:

select amproc, p.provolatile from pg_amproc, pg_proc p where amproc = p.oid and p.provolatile <> 'i';
          amproc           | provolatile
---------------------------+-------------
 date_cmp_timestamptz      | s
 timestamp_cmp_timestamptz | s
 timestamptz_cmp_date      | s
 timestamptz_cmp_timestamp | s
 pg_catalog.in_range       | s
(5 rows)

Is it possible for a user to add a volatile procedure to pg_amproc? If that's possible, match_clause_to_partition_key() may pick one as a comparison function for pruning, because it doesn't actually check the procedure's provolatile before doing so. I'd hope not, though I would like to be sure in order to support what I wrote above.

--
Thanks, Amit Langote
EDB: http://www.enterprisedb.com

[1] https://commitfest.postgresql.org/43/3478/
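[For illustration, the subplan_map mapping step described above might look roughly like the following simplified sketch. The helper is hypothetical, not the actual executor code; the real logic lives in execPartition.c and consults PartitionedRelPruningData.subplan_map.]

    #include "postgres.h"
    #include "nodes/bitmapset.h"

    /*
     * Simplified sketch (hypothetical helper, not actual PostgreSQL code):
     * translate the set of surviving partition indexes, as they appear in
     * the PartitionDesc, into the set of subplan indexes under
     * Append/MergeAppend, using a subplan_map[] like the one kept in
     * PartitionedRelPruningData.
     */
    static Bitmapset *
    map_live_partitions_to_subplans(const int *subplan_map, Bitmapset *live_parts)
    {
        Bitmapset  *validsubplans = NULL;
        int         i = -1;

        while ((i = bms_next_member(live_parts, i)) >= 0)
        {
            int         subplan_idx = subplan_map[i];

            /* -1 means the planner already pruned this partition's subplan. */
            if (subplan_idx >= 0)
                validsubplans = bms_add_member(validsubplans, subplan_idx);
        }

        return validsubplans;
    }

[Since this mapping only consults fixed arrays built at executor startup, it is deterministic; any run-to-run variation would have to come from the expression evaluation that produces live_parts.]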
On Tue, Jun 27, 2023 at 9:23 AM Amit Langote <amitlangote09@gmail.com> wrote:
> Maybe that stuff could be resurrected, though I was wondering if the
> risk of the same initial pruning steps returning different results
> when performed repeatedly in *one query lifetime* isn't pretty
> minimal or maybe rather non-existent? I think that's because
> performing initial pruning steps entails computing constant and/or
> stable expressions and comparing them with an unchanging set of
> partition bound values, with comparison functions whose result is also
> presumed to be stable. Then there's also the step of mapping the
> partition indexes as they appear in the PartitionDesc to the indexes
> of their subplans under Append/MergeAppend using the information
> contained in PartitionPruneInfo (subplan_map), and the result of that
> mapping should be immutable too.
>
> I considered that the comparison functions that
> match_clause_to_partition_key() obtains by calling get_opfamily_proc()
> may in fact not be stable, though that doesn't seem to be a worry at
> least with the out-of-the-box pg_amproc collection:

I think it could be acceptable if a stable function not actually being stable results in some kind of internal error message, hopefully one that in some way hints to the user what the problem was. But crashing because some expression was supposed to be stable and wasn't is a bridge too far, especially if, as I think would be the case here, the crash happens in a part of the code that is far removed from where the problem was introduced.

The real issue here is about how much trust you can place in a given invariant. If, in a single function, we initialize a value to 0 and thereafter only ever increment it, we can logically reason that if we ever see a value less than zero, there must have been an overflow. It is true that if our function calls some other function, that other function could access data through a garbage pointer and possibly corrupt the value of our function's local variable, but that's extremely unlikely, and we can basically decide that we're not going to care about it, because such code is likely to crash anyway before too long.

But now consider an invariant that implicates a larger amount of code, e.g. you must always hold a buffer pin before accessing the buffer contents. In many cases, it's fairly easy to verify that this must be so in any given piece of code, but there are problems: some code that does buffer access is complicated enough that it's hard to fully verify, especially when buffer pins are held across long periods, and, what is probably worse, there are tons of different places in the code that access buffers. Hence, we've had bugs in this area, and likely will have bugs in this area again. In theory, with a sufficient amount of really careful work, you can find all of the problems, but in practice it's pretty difficult. Nonetheless, we just have to accept the risk that we're going to crash if a bug in this area does exist, because there's no real way to cope with the contents of the buffer that you're accessing being swapped out while you're in the middle of looking at it, or even modifying it.

But the present case is different in a couple of ways. First, there's probably even more code involved, including a good bit of it that's not in core but is user-defined. Second, we've generally made a decision up until now that we don't want to have a hard dependency on stable functions actually being stable. If they aren't, and for example you're using index expressions, your queries may return wrong answers, but you won't get weird internal error messages, and you won't get a crash. I think the bar for this feature is the same.

--
Robert Haas
EDB: http://www.enterprisedb.com
Robert Haas <robertmhaas@gmail.com> writes:
> ... Second, we've generally made a
> decision up until now that we don't want to have a hard dependency on
> stable functions actually being stable. If they aren't, and for
> example you're using index expressions, your queries may return wrong
> answers, but you won't get weird internal error messages, and you
> won't get a crash. I think the bar for this feature is the same.

Yeah, I agree --- wrong answers may be acceptable in such a case, but crashes or unintelligible error messages aren't. There are practical reasons for being tolerant here, notably that it's not that easy for users to get their volatility markings right.

In the case at hand, I think that means that allowing workers to do pruning would require hardening the parallel append code against the situation where their pruning results vary. Maybe, instead of passing the pruning results *down*, we could pass them *up* to the leader and then throw an error if they're different?

regards, tom lane
On Mon, Aug 7, 2023 at 22:29 Tom Lane <tgl@sss.pgh.pa.us> wrote:
> Robert Haas <robertmhaas@gmail.com> writes:
> > ... Second, we've generally made a
> > decision up until now that we don't want to have a hard dependency on
> > stable functions actually being stable. If they aren't, and for
> > example you're using index expressions, your queries may return wrong
> > answers, but you won't get weird internal error messages, and you
> > won't get a crash. I think the bar for this feature is the same.
>
> Yeah, I agree --- wrong answers may be acceptable in such a case, but
> crashes or unintelligible error messages aren't. There are practical
> reasons for being tolerant here, notably that it's not that easy
> for users to get their volatility markings right.
>
> In the case at hand, I think that means that allowing workers to do
> pruning would require hardening the parallel append code against the
> situation where their pruning results vary. Maybe, instead of passing
> the pruning results *down*, we could pass them *up* to the leader and
> then throw an error if they're different?
Note we’re talking here about “initial” pruning that occurs during ExecInitNode(). Workers are only launched during ExecGather[Merge](), which thereafter do ExecInitNode() on their copy of the plan tree. So if we are to pass the pruning results for cross-checking, it will have to be from the leader to workers.
Thanks, Amit Langote
EDB: http://www.enterprisedb.com
On Mon, Aug 7, 2023 at 10:25 AM Amit Langote <amitlangote09@gmail.com> wrote:
> Note we’re talking here about “initial” pruning that occurs during
> ExecInitNode(). Workers are only launched during ExecGather[Merge](),
> which thereafter do ExecInitNode() on their copy of the plan tree. So
> if we are to pass the pruning results for cross-checking, it will have
> to be from the leader to workers.

That doesn't seem like a big problem because there aren't many node types that do pruning, right? I think we're just talking about Append and MergeAppend, or something like that, right? You just need the ExecWhateverEstimate function to budget some DSM space to store the information, which can basically just be a bitmap over the set of child plans, and the ExecWhateverInitializeDSM copies the information into that DSM space, and ExecWhateverInitializeWorker() copies the information from the shared space back into the local node (or maybe just points to it, if the representation is sufficiently compatible). I feel like this is an hour or two's worth of coding, unless I'm missing something, and WAY easier than trying to reason about what happens if expression evaluation isn't as stable as we'd like it to be.

--
Robert Haas
EDB: http://www.enterprisedb.com
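[A rough sketch of the Estimate / InitializeDSM / InitializeWorker pattern described above. SharedPruneResult and the shared_prune_result_* helpers are invented for illustration, not actual nodeAppend.c code; the existing ExecAppendEstimate()/ExecAppendInitializeDSM()/ExecAppendInitializeWorker() are where calls shaped like these would plausibly live.]

    #include "postgres.h"
    #include "nodes/bitmapset.h"
    #include "storage/shmem.h"

    /* Hypothetical shape of the leader's pruning result as stored in DSM. */
    typedef struct SharedPruneResult
    {
        int         nsubplans;                      /* number of child plans */
        bool        valid[FLEXIBLE_ARRAY_MEMBER];   /* one flag per child plan */
    } SharedPruneResult;

    /* For ExecAppendEstimate(): how much DSM space the bitmap needs. */
    static Size
    shared_prune_result_size(int nsubplans)
    {
        return add_size(offsetof(SharedPruneResult, valid),
                        mul_size(sizeof(bool), nsubplans));
    }

    /* For ExecAppendInitializeDSM(): leader copies its pruning result into DSM. */
    static void
    shared_prune_result_fill(SharedPruneResult *shared,
                             Bitmapset *validsubplans, int nsubplans)
    {
        shared->nsubplans = nsubplans;
        for (int i = 0; i < nsubplans; i++)
            shared->valid[i] = bms_is_member(i, validsubplans);
    }

    /* For ExecAppendInitializeWorker(): worker rebuilds its local set from DSM. */
    static Bitmapset *
    shared_prune_result_read(SharedPruneResult *shared)
    {
        Bitmapset  *validsubplans = NULL;

        for (int i = 0; i < shared->nsubplans; i++)
        {
            if (shared->valid[i])
                validsubplans = bms_add_member(validsubplans, i);
        }
        return validsubplans;
    }

[The question raised in the next message is not about how to copy the bitmap but about *when* the worker-side read could run relative to ExecInitNode().]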
On Tue, Aug 8, 2023 at 12:53 AM Robert Haas <robertmhaas@gmail.com> wrote:
> On Mon, Aug 7, 2023 at 10:25 AM Amit Langote <amitlangote09@gmail.com> wrote:
> > Note we’re talking here about “initial” pruning that occurs during
> > ExecInitNode(). Workers are only launched during ExecGather[Merge](),
> > which thereafter do ExecInitNode() on their copy of the plan tree. So
> > if we are to pass the pruning results for cross-checking, it will have
> > to be from the leader to workers.
>
> That doesn't seem like a big problem because there aren't many node
> types that do pruning, right? I think we're just talking about Append
> and MergeAppend, or something like that, right?

MergeAppend can't be parallel-aware atm, so only Append.

> You just need the
> ExecWhateverEstimate function to budget some DSM space to store the
> information, which can basically just be a bitmap over the set of
> child plans, and the ExecWhateverInitializeDSM copies the information
> into that DSM space, and ExecWhateverInitializeWorker() copies the
> information from the shared space back into the local node (or maybe
> just points to it, if the representation is sufficiently compatible).
> I feel like this is an hour or two's worth of coding, unless I'm
> missing something, and WAY easier than trying to reason about what
> happens if expression evaluation isn't as stable as we'd like it to
> be.

OK, I agree that we'd better share the pruning result between the leader and workers.

I hadn't thought about putting the pruning result into Append's DSM (ParallelAppendState), which is what you're describing IIUC. I looked into it, though I'm not sure if it can be made to work given the way things are on the worker side, or at least not without some reshuffling of code in ParallelQueryMain(). The pruning result will have to be available in ExecInitAppend(), but because the worker reads the DSM only after finishing the plan tree initialization, it won't be. Perhaps we can integrate ExecParallelInitializeWorker()'s responsibilities into ExecutorStart() / ExecInitNode() somehow?

So change the ordering of the following code in ParallelQueryMain():

    /* Start up the executor */
    queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
    ExecutorStart(queryDesc, fpes->eflags);

    /* Special executor initialization steps for parallel workers */
    queryDesc->planstate->state->es_query_dsa = area;
    if (DsaPointerIsValid(fpes->param_exec))
    {
        char       *paramexec_space;

        paramexec_space = dsa_get_address(area, fpes->param_exec);
        RestoreParamExecParams(paramexec_space, queryDesc->estate);
    }
    pwcxt.toc = toc;
    pwcxt.seg = seg;
    ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);

Looking inside ExecParallelInitializeWorker():

    static bool
    ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
    {
        if (planstate == NULL)
            return false;

        switch (nodeTag(planstate))
        {
            case T_SeqScanState:
                if (planstate->plan->parallel_aware)
                    ExecSeqScanInitializeWorker((SeqScanState *) planstate,
                                                pwcxt);

I guess that'd mean putting the if (planstate->plan->parallel_aware) block seen here at the end of ExecInitSeqScan() and so on.

Or we could consider something like the patch I mentioned in my 1st email. The idea there was to pass the pruning result via a separate channel, not the DSM chunk linked into the PlanState tree. To wit, on the leader side, ExecInitParallelPlan() puts the serialized List-of-Bitmapset into the shm_toc with a dedicated PARALLEL_KEY, alongside PlannedStmt, ParamListInfo, etc. The List-of-Bitmapset is initialized during the leader's ExecInitNode().
On the worker side, ExecParallelGetQueryDesc() reads the List-of-Bitmapset string and puts the resulting node into the QueryDesc, which ParallelQueryMain() then uses to do ExecutorStart(), which copies the pointer to EState.es_part_prune_results. ExecInitAppend() consults EState.es_part_prune_results and uses the Bitmapset from there, if present, instead of performing initial pruning.

I'm assuming it's not too ugly if ExecInitAppend() uses IsParallelWorker() to decide whether it should be writing to EState.es_part_prune_results or reading from it -- the former if in the leader and the latter if in a worker.

If we are to go with this approach we will need to un-revert ec386948948c, which moved PartitionPruneInfo nodes out of Append/MergeAppend nodes to a List in PlannedStmt (copied into EState.es_part_prune_infos), such that es_part_prune_results mirrors es_part_prune_infos.

Thoughts?

--
Thanks, Amit Langote
EDB: http://www.enterprisedb.com
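[A rough sketch of the separate-channel approach described above, under stated assumptions: PARALLEL_KEY_PRUNE_RESULTS and both helper functions are hypothetical names, es_part_prune_results is the field the patch would add, and the details of round-tripping a List of Bitmapsets through nodeToString()/stringToNode() are glossed over. The corresponding shm_toc_estimate_chunk()/shm_toc_estimate_keys() calls in the estimate phase are omitted.]

    #include "postgres.h"
    #include "access/parallel.h"
    #include "nodes/nodes.h"
    #include "nodes/pg_list.h"
    #include "storage/shm_toc.h"

    /* Hypothetical toc key; the real PARALLEL_KEY_* values live in execParallel.c. */
    #define PARALLEL_KEY_PRUNE_RESULTS  UINT64CONST(0xE000000000000100)

    /*
     * Leader side, from ExecInitParallelPlan(): serialize the List of
     * Bitmapsets built during the leader's ExecInitNode() and stash it in
     * the shm_toc, next to PlannedStmt, ParamListInfo, etc.
     */
    static void
    store_prune_results(ParallelContext *pcxt, List *part_prune_results)
    {
        char       *serialized = nodeToString(part_prune_results);
        Size        len = strlen(serialized) + 1;
        char       *space = shm_toc_allocate(pcxt->toc, len);

        memcpy(space, serialized, len);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_PRUNE_RESULTS, space);
    }

    /*
     * Worker side, from ExecParallelGetQueryDesc(): read the string back so
     * the recovered List can be handed to the QueryDesc / EState and
     * consulted by ExecInitAppend() instead of re-pruning.
     */
    static List *
    restore_prune_results(shm_toc *toc)
    {
        char       *serialized = shm_toc_lookup(toc, PARALLEL_KEY_PRUNE_RESULTS, false);

        return (List *) stringToNode(serialized);
    }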
On Tue, Aug 8, 2023 at 2:58 AM Amit Langote <amitlangote09@gmail.com> wrote:
> > That doesn't seem like a big problem because there aren't many node
> > types that do pruning, right? I think we're just talking about Append
> > and MergeAppend, or something like that, right?
>
> MergeAppend can't be parallel-aware atm, so only Append.

Well, the question isn't whether it's parallel-aware, but whether startup-time pruning happens there.

> So change the ordering of the following code in ParallelQueryMain():

Yeah, that would be a reasonable thing to do.

> Or we could consider something like the patch I mentioned in my 1st
> email. The idea there was to pass the pruning result via a separate
> channel, not the DSM chunk linked into the PlanState tree. To wit, on
> the leader side, ExecInitParallelPlan() puts the serialized
> List-of-Bitmapset into the shm_toc with a dedicated PARALLEL_KEY,
> alongside PlannedStmt, ParamListInfo, etc. The List-of-Bitmapset is
> initialized during the leader's ExecInitNode(). On the worker side,
> ExecParallelGetQueryDesc() reads the List-of-Bitmapset string and puts
> the resulting node into the QueryDesc, which ParallelQueryMain() then
> uses to do ExecutorStart(), which copies the pointer to
> EState.es_part_prune_results. ExecInitAppend() consults
> EState.es_part_prune_results and uses the Bitmapset from there, if
> present, instead of performing initial pruning.

This also seems reasonable.

> I'm assuming it's not
> too ugly if ExecInitAppend() uses IsParallelWorker() to decide whether
> it should be writing to EState.es_part_prune_results or reading from
> it -- the former if in the leader and the latter if in a worker.

I don't think that's too ugly. I mean, you have to have an if statement someplace.

> If we
> are to go with this approach we will need to un-revert ec386948948c,
> which moved PartitionPruneInfo nodes out of Append/MergeAppend nodes
> to a List in PlannedStmt (copied into EState.es_part_prune_infos),
> such that es_part_prune_results mirrors es_part_prune_infos.

The comment for the revert (which was 5472743d9e8583638a897b47558066167cc14583) points to https://www.postgresql.org/message-id/4191508.1674157166@sss.pgh.pa.us as the reason, but it's not very clear to me why that email led to this being reverted. In any event, I agree that if we go with your idea to pass this via a separate PARALLEL_KEY, unreverting that patch seems to make sense, because otherwise I think we don't have a fast way to find the nodes that contain the state that we care about.

--
Robert Haas
EDB: http://www.enterprisedb.com
On Tue, Aug 8, 2023 at 11:16 PM Robert Haas <robertmhaas@gmail.com> wrote:
> On Tue, Aug 8, 2023 at 2:58 AM Amit Langote <amitlangote09@gmail.com> wrote:
> > Or we could consider something like the patch I mentioned in my 1st
> > email. The idea there was to pass the pruning result via a separate
> > channel, not the DSM chunk linked into the PlanState tree. To wit, on
> > the leader side, ExecInitParallelPlan() puts the serialized
> > List-of-Bitmapset into the shm_toc with a dedicated PARALLEL_KEY,
> > alongside PlannedStmt, ParamListInfo, etc. The List-of-Bitmapset is
> > initialized during the leader's ExecInitNode(). On the worker side,
> > ExecParallelGetQueryDesc() reads the List-of-Bitmapset string and puts
> > the resulting node into the QueryDesc, which ParallelQueryMain() then
> > uses to do ExecutorStart(), which copies the pointer to
> > EState.es_part_prune_results. ExecInitAppend() consults
> > EState.es_part_prune_results and uses the Bitmapset from there, if
> > present, instead of performing initial pruning.
>
> This also seems reasonable.
>
> > I'm assuming it's not
> > too ugly if ExecInitAppend() uses IsParallelWorker() to decide whether
> > it should be writing to EState.es_part_prune_results or reading from
> > it -- the former if in the leader and the latter if in a worker.
>
> I don't think that's too ugly. I mean, you have to have an if statement
> someplace.

Yes, that makes sense.

It's just that I thought maybe I haven't thought hard enough about options before adding a new IsParallelWorker(), because I don't find too many instances of IsParallelWorker() in the generic executor code. I think that's because most parallel worker-specific logic lives in execParallel.c or in Exec*Worker() functions outside that file, so the generic code remains parallel query agnostic as much as possible.

> > If we
> > are to go with this approach we will need to un-revert ec386948948c,
> > which moved PartitionPruneInfo nodes out of Append/MergeAppend nodes
> > to a List in PlannedStmt (copied into EState.es_part_prune_infos),
> > such that es_part_prune_results mirrors es_part_prune_infos.
>
> The comment for the revert (which was
> 5472743d9e8583638a897b47558066167cc14583) points to
> https://www.postgresql.org/message-id/4191508.1674157166@sss.pgh.pa.us
> as the reason, but it's not very clear to me why that email led to
> this being reverted. In any event, I agree that if we go with your
> idea to pass this via a separate PARALLEL_KEY, unreverting that patch
> seems to make sense, because otherwise I think we don't have a fast
> way to find the nodes that contain the state that we care about.

OK, I've attached the unreverted patch that adds EState.es_part_prune_infos as 0001.

0002 adds EState.es_part_prune_results. The parallel query leader stores the bitmapset of initially valid subplans, obtained by performing the initial pruning steps contained in a given PartitionPruneInfo, into that list at the same index as the PartitionPruneInfo's index in es_part_prune_infos. ExecInitParallelPlan() serializes es_part_prune_results and stores it in the DSM. A worker initializes es_part_prune_results in its own EState by reading the leader's value from the DSM, and for each PartitionPruneInfo in its own copy of EState.es_part_prune_infos, gets the set of initially valid subplans by referring to es_part_prune_results in lieu of performing initial pruning again.

Should workers, as Tom says, instead do the pruning and cross-check the result to give an error if it doesn't match the leader's? The error message can't specifically point out which function is at fault, though a user would at least know that they have functions in their database with wrong volatility markings.

--
Thanks, Amit Langote
EDB: http://www.enterprisedb.com
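[A minimal sketch of what that cross-check could look like in a worker's ExecInitAppend(). The variable names are illustrative, not from the actual patch: validsubplans would be the worker's freshly recomputed set, leader_validsubplans the Bitmapset recovered from the serialized es_part_prune_results.]

    /*
     * Hypothetical cross-check in a worker's ExecInitAppend(): compare the
     * locally recomputed pruning result against the leader's.
     */
    if (!bms_equal(validsubplans, leader_validsubplans))
        ereport(ERROR,
                (errcode(ERRCODE_INTERNAL_ERROR),
                 errmsg("initial pruning result in parallel worker differs from the leader's"),
                 errhint("This can happen if functions used in partition pruning are incorrectly marked IMMUTABLE or STABLE.")));

[As noted above, the message can hint at mis-marked volatility but cannot name the offending function.]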
On Wed, Aug 9, 2023 at 6:22 AM Amit Langote <amitlangote09@gmail.com> wrote:
> > > I'm assuming it's not
> > > too ugly if ExecInitAppend() uses IsParallelWorker() to decide whether
> > > it should be writing to EState.es_part_prune_results or reading from
> > > it -- the former if in the leader and the latter if in a worker.
> >
> > I don't think that's too ugly. I mean, you have to have an if statement
> > someplace.
>
> Yes, that makes sense.
>
> It's just that I thought maybe I haven't thought hard enough about
> options before adding a new IsParallelWorker(), because I don't find
> too many instances of IsParallelWorker() in the generic executor code.
> I think that's because most parallel worker-specific logic lives in
> execParallel.c or in Exec*Worker() functions outside that file, so the
> generic code remains parallel query agnostic as much as possible.

Oh, actually, there is an issue here. IsParallelWorker() is not the right test. Imagine that there's a parallel query which launches some workers, and one of those calls a user-defined function which again uses parallelism, launching more workers. This may not be possible today, I don't really remember, but the test should be "am I a parallel worker with respect to this plan?" not "am I a parallel worker at all?". Not quite sure what the best way to code that is. If we could just test whether we have a ParallelWorkerContext, it would be easy...

--
Robert Haas
EDB: http://www.enterprisedb.com
On Wed, Aug 9, 2023 at 9:48 PM Robert Haas <robertmhaas@gmail.com> wrote:
> On Wed, Aug 9, 2023 at 6:22 AM Amit Langote <amitlangote09@gmail.com> wrote:
> > It's just that I thought maybe I haven't thought hard enough about
> > options before adding a new IsParallelWorker(), because I don't find
> > too many instances of IsParallelWorker() in the generic executor code.
> > I think that's because most parallel worker-specific logic lives in
> > execParallel.c or in Exec*Worker() functions outside that file, so the
> > generic code remains parallel query agnostic as much as possible.
>
> Oh, actually, there is an issue here. IsParallelWorker() is not the
> right test. Imagine that there's a parallel query which launches some
> workers, and one of those calls a user-defined function which again
> uses parallelism, launching more workers. This may not be possible
> today, I don't really remember, but the test should be "am I a
> parallel worker with respect to this plan?" not "am I a parallel
> worker at all?". Not quite sure what the best way to code that is. If
> we could just test whether we have a ParallelWorkerContext, it would
> be easy...

I checked enough to be sure that IsParallelWorker() is reliable at the time of ExecutorStart() / ExecInitNode() in ParallelQueryMain() in a worker. However, ParallelWorkerContext is not available at that point. Here's the relevant part of ParallelQueryMain():

    /* Start up the executor */
    queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
    ExecutorStart(queryDesc, fpes->eflags);

    /* Special executor initialization steps for parallel workers */
    queryDesc->planstate->state->es_query_dsa = area;
    if (DsaPointerIsValid(fpes->param_exec))
    {
        char       *paramexec_space;

        paramexec_space = dsa_get_address(area, fpes->param_exec);
        RestoreParamExecParams(paramexec_space, queryDesc->estate);
    }
    pwcxt.toc = toc;
    pwcxt.seg = seg;
    ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);

BTW, we do also use IsParallelWorker() in ExecGetRangeTableRelation(), which also probably only runs during ExecInitNode(), same as the ExecInitPartitionPruning() that this patch adds it to.

--
Thanks, Amit Langote
EDB: http://www.enterprisedb.com
On Wed, Aug 9, 2023 at 8:57 AM Amit Langote <amitlangote09@gmail.com> wrote:
> I checked enough to be sure that IsParallelWorker() is reliable at the
> time of ExecutorStart() / ExecInitNode() in ParallelQueryMain() in a
> worker. However, ParallelWorkerContext is not available at that
> point. Here's the relevant part of ParallelQueryMain():
>
>     /* Start up the executor */
>     queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
>     ExecutorStart(queryDesc, fpes->eflags);
>
>     /* Special executor initialization steps for parallel workers */
>     queryDesc->planstate->state->es_query_dsa = area;
>     if (DsaPointerIsValid(fpes->param_exec))
>     {
>         char       *paramexec_space;
>
>         paramexec_space = dsa_get_address(area, fpes->param_exec);
>         RestoreParamExecParams(paramexec_space, queryDesc->estate);
>     }
>     pwcxt.toc = toc;
>     pwcxt.seg = seg;
>     ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
>
> BTW, we do also use IsParallelWorker() in ExecGetRangeTableRelation(),
> which also probably only runs during ExecInitNode(), same as the
> ExecInitPartitionPruning() that this patch adds it to.

I don't know if that's a great idea, but I guess if we're already doing it, it doesn't hurt to expand the use a little bit.

--
Robert Haas
EDB: http://www.enterprisedb.com