From 5d786d24e09efbc746305838a5c57d38b0edcff3 Mon Sep 17 00:00:00 2001 From: Alexander Korotkov Date: Mon, 30 Mar 2026 01:28:34 +0300 Subject: [PATCH v16 2/3] Common infrastructure to MergeAppend and Append nodes This commit introduces the execAppend.c file to centralize executor operations that can be shared between MergeAppend and Append nodes. The logic for initializing and ending the nodes, as well as initializing asynchronous execution, is very similar for both MergeAppend and Append. To reduce redundancy, the duplicated code has been moved to execAppend.c, allowing these nodes to reuse the common implementation. The code responsible for actually fetching the tuples remains specific to each node and was not refactored into execAppend.c to preserve the unique execution requirements of each node type. Discussion: https://postgr.es/m/59be194c5a409fb9fc9f2031581b8a44%40postgrespro.ru Author: Matheus Alcantara Co-authored-by: Alexander Korotkov Reviewed-by: Alexander Pyhalov Reviewed-by: Alena Rybakina --- contrib/pg_overexplain/pg_overexplain.c | 8 +- contrib/pg_plan_advice/pgpa_scan.c | 4 +- contrib/pg_plan_advice/pgpa_walker.c | 8 +- contrib/postgres_fdw/postgres_fdw.c | 12 +- src/backend/commands/explain.c | 26 +- src/backend/executor/Makefile | 1 + src/backend/executor/execAmi.c | 2 +- src/backend/executor/execAppend.c | 404 +++++++++++++++++++ src/backend/executor/execCurrent.c | 4 +- src/backend/executor/execProcnode.c | 8 +- src/backend/executor/meson.build | 1 + src/backend/executor/nodeAppend.c | 514 ++++-------------------- src/backend/executor/nodeMergeAppend.c | 188 ++------- src/backend/nodes/nodeFuncs.c | 8 +- src/backend/optimizer/plan/createplan.c | 46 +-- src/backend/optimizer/plan/setrefs.c | 48 +-- src/backend/optimizer/plan/subselect.c | 4 +- src/backend/utils/adt/ruleutils.c | 8 +- src/include/executor/execAppend.h | 60 +++ src/include/nodes/execnodes.h | 70 ++-- src/include/nodes/plannodes.h | 63 ++- src/tools/pgindent/typedefs.list | 2 + 22 
files changed, 751 insertions(+), 738 deletions(-) create mode 100644 src/backend/executor/execAppend.c create mode 100644 src/include/executor/execAppend.h diff --git a/contrib/pg_overexplain/pg_overexplain.c b/contrib/pg_overexplain/pg_overexplain.c index c2b90493cc6..7361834c544 100644 --- a/contrib/pg_overexplain/pg_overexplain.c +++ b/contrib/pg_overexplain/pg_overexplain.c @@ -232,18 +232,18 @@ overexplain_per_node_hook(PlanState *planstate, List *ancestors, break; case T_Append: overexplain_bitmapset("Append RTIs", - ((Append *) plan)->apprelids, + ((Append *) plan)->ap.apprelids, es); overexplain_bitmapset_list("Child Append RTIs", - ((Append *) plan)->child_append_relid_sets, + ((Append *) plan)->ap.child_append_relid_sets, es); break; case T_MergeAppend: overexplain_bitmapset("Append RTIs", - ((MergeAppend *) plan)->apprelids, + ((MergeAppend *) plan)->ap.apprelids, es); overexplain_bitmapset_list("Child Append RTIs", - ((MergeAppend *) plan)->child_append_relid_sets, + ((MergeAppend *) plan)->ap.child_append_relid_sets, es); break; case T_Result: diff --git a/contrib/pg_plan_advice/pgpa_scan.c b/contrib/pg_plan_advice/pgpa_scan.c index 5f210f2b725..18b4ef0fb03 100644 --- a/contrib/pg_plan_advice/pgpa_scan.c +++ b/contrib/pg_plan_advice/pgpa_scan.c @@ -142,7 +142,7 @@ pgpa_build_scan(pgpa_plan_walker_context *walker, Plan *plan, /* Be sure to account for pulled-up scans. */ child_append_relid_sets = - ((Append *) plan)->child_append_relid_sets; + ((Append *) plan)->ap.child_append_relid_sets; break; case T_MergeAppend: /* Same logic here as for Append, above. */ @@ -154,7 +154,7 @@ pgpa_build_scan(pgpa_plan_walker_context *walker, Plan *plan, /* Be sure to account for pulled-up scans. 
*/ child_append_relid_sets = - ((MergeAppend *) plan)->child_append_relid_sets; + ((MergeAppend *) plan)->ap.child_append_relid_sets; break; default: strategy = PGPA_SCAN_ORDINARY; diff --git a/contrib/pg_plan_advice/pgpa_walker.c b/contrib/pg_plan_advice/pgpa_walker.c index e32684d2075..f1bd6617005 100644 --- a/contrib/pg_plan_advice/pgpa_walker.c +++ b/contrib/pg_plan_advice/pgpa_walker.c @@ -440,14 +440,14 @@ pgpa_walk_recursively(pgpa_plan_walker_context *walker, Plan *plan, { Append *aplan = (Append *) plan; - extraplans = aplan->appendplans; + extraplans = aplan->ap.subplans; } break; case T_MergeAppend: { MergeAppend *maplan = (MergeAppend *) plan; - extraplans = maplan->mergeplans; + extraplans = maplan->ap.subplans; } break; case T_BitmapAnd: @@ -570,9 +570,9 @@ pgpa_relids(Plan *plan) else if (IsA(plan, ForeignScan)) return ((ForeignScan *) plan)->fs_relids; else if (IsA(plan, Append)) - return ((Append *) plan)->apprelids; + return ((Append *) plan)->ap.apprelids; else if (IsA(plan, MergeAppend)) - return ((MergeAppend *) plan)->apprelids; + return ((MergeAppend *) plan)->ap.apprelids; return NULL; } diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 41e47cc795b..7416d09c7e2 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -2413,8 +2413,8 @@ find_modifytable_subplan(PlannerInfo *root, { Append *appendplan = (Append *) subplan; - if (subplan_index < list_length(appendplan->appendplans)) - subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index); + if (subplan_index < list_length(appendplan->ap.subplans)) + subplan = (Plan *) list_nth(appendplan->ap.subplans, subplan_index); } else if (IsA(subplan, Result) && outerPlan(subplan) != NULL && @@ -2422,8 +2422,8 @@ find_modifytable_subplan(PlannerInfo *root, { Append *appendplan = (Append *) outerPlan(subplan); - if (subplan_index < list_length(appendplan->appendplans)) - subplan = (Plan *) 
list_nth(appendplan->appendplans, subplan_index); + if (subplan_index < list_length(appendplan->ap.subplans)) + subplan = (Plan *) list_nth(appendplan->ap.subplans, subplan_index); } /* Now, have we got a ForeignScan on the desired rel? */ @@ -7215,7 +7215,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; AppendState *requestor = (AppendState *) areq->requestor; - WaitEventSet *set = requestor->as_eventset; + WaitEventSet *set = requestor->as.eventset; /* This should not be called unless callback_pending */ Assert(areq->callback_pending); @@ -7257,7 +7257,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) * below, because we might otherwise end up with no configured events * other than the postmaster death event. */ - if (!bms_is_empty(requestor->as_needrequest)) + if (!bms_is_empty(requestor->as.needrequest)) return; if (GetNumRegisteredWaitEvents(set) > 1) return; diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index e4b70166b0e..a0ed8c3fc3a 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1228,11 +1228,11 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used) break; case T_Append: *rels_used = bms_add_members(*rels_used, - ((Append *) plan)->apprelids); + ((Append *) plan)->ap.apprelids); break; case T_MergeAppend: *rels_used = bms_add_members(*rels_used, - ((MergeAppend *) plan)->apprelids); + ((MergeAppend *) plan)->ap.apprelids); break; case T_Result: *rels_used = bms_add_members(*rels_used, @@ -1276,7 +1276,7 @@ plan_is_disabled(Plan *plan) * includes any run-time pruned children. Ignoring those could give * us the incorrect number of disabled nodes. */ - foreach(lc, aplan->appendplans) + foreach(lc, aplan->ap.subplans) { Plan *subplan = lfirst(lc); @@ -1293,7 +1293,7 @@ plan_is_disabled(Plan *plan) * includes any run-time pruned children. 
Ignoring those could give * us the incorrect number of disabled nodes. */ - foreach(lc, maplan->mergeplans) + foreach(lc, maplan->ap.subplans) { Plan *subplan = lfirst(lc); @@ -2340,13 +2340,13 @@ ExplainNode(PlanState *planstate, List *ancestors, switch (nodeTag(plan)) { case T_Append: - ExplainMissingMembers(((AppendState *) planstate)->as_nplans, - list_length(((Append *) plan)->appendplans), + ExplainMissingMembers(((AppendState *) planstate)->as.nplans, + list_length(((Append *) plan)->ap.subplans), es); break; case T_MergeAppend: - ExplainMissingMembers(((MergeAppendState *) planstate)->ms_nplans, - list_length(((MergeAppend *) plan)->mergeplans), + ExplainMissingMembers(((MergeAppendState *) planstate)->ms.nplans, + list_length(((MergeAppend *) plan)->ap.subplans), es); break; default: @@ -2390,13 +2390,13 @@ ExplainNode(PlanState *planstate, List *ancestors, switch (nodeTag(plan)) { case T_Append: - ExplainMemberNodes(((AppendState *) planstate)->appendplans, - ((AppendState *) planstate)->as_nplans, + ExplainMemberNodes(((AppendState *) planstate)->as.plans, + ((AppendState *) planstate)->as.nplans, ancestors, es); break; case T_MergeAppend: - ExplainMemberNodes(((MergeAppendState *) planstate)->mergeplans, - ((MergeAppendState *) planstate)->ms_nplans, + ExplainMemberNodes(((MergeAppendState *) planstate)->ms.plans, + ((MergeAppendState *) planstate)->ms.nplans, ancestors, es); break; case T_BitmapAnd: @@ -2610,7 +2610,7 @@ static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors, ExplainState *es) { - MergeAppend *plan = (MergeAppend *) mstate->ps.plan; + MergeAppend *plan = (MergeAppend *) mstate->ms.ps.plan; show_sort_group_keys((PlanState *) mstate, "Sort Key", plan->numCols, 0, plan->sortColIdx, diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index 11118d0ce02..66b62fca921 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -15,6 +15,7 @@ include 
$(top_builddir)/src/Makefile.global OBJS = \ execAmi.o \ execAsync.o \ + execAppend.o \ execCurrent.o \ execExpr.o \ execExprInterp.o \ diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 37fe03fdc37..23cb2f82281 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -538,7 +538,7 @@ ExecSupportsBackwardScan(Plan *node) if (((Append *) node)->nasyncplans > 0) return false; - foreach(l, ((Append *) node)->appendplans) + foreach(l, ((Append *) node)->ap.subplans) { if (!ExecSupportsBackwardScan((Plan *) lfirst(l))) return false; diff --git a/src/backend/executor/execAppend.c b/src/backend/executor/execAppend.c new file mode 100644 index 00000000000..6233dbe7b30 --- /dev/null +++ b/src/backend/executor/execAppend.c @@ -0,0 +1,404 @@ +/*------------------------------------------------------------------------- + * + * execAppend.c + * This code provides support functions for executing MergeAppend and + * Append nodes. + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/execAppend.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "executor/execAppend.h" +#include "executor/execAsync.h" +#include "executor/execPartition.h" +#include "executor/executor.h" +#include "miscadmin.h" +#include "storage/latch.h" +#include "storage/waiteventset.h" + +#define EVENT_BUFFER_SIZE 16 + +/* Begin all of the subscans of an Appender node. 
*/ +void +ExecInitAppender(AppenderState *state, + Appender *node, + EState *estate, + int eflags, + int first_partial_plan, + int *first_valid_partial_plan) +{ + PlanState **appendplanstates; + const TupleTableSlotOps *appendops; + Bitmapset *validsubplans; + Bitmapset *asyncplans; + int nplans; + int nasyncplans; + int firstvalid; + int i, + j; + + /* If run-time partition pruning is enabled, then set that up now */ + if (node->part_prune_index >= 0) + { + PartitionPruneState *prunestate; + + /* + * Set up pruning data structure. This also initializes the set of + * subplans to initialize (validsubplans) by taking into account the + * result of performing initial pruning if any. + */ + prunestate = ExecInitPartitionExecPruning(&state->ps, + list_length(node->subplans), + node->part_prune_index, + node->apprelids, + &validsubplans); + state->prune_state = prunestate; + nplans = bms_num_members(validsubplans); + + /* + * When no run-time pruning is required and there's at least one + * subplan, we can fill valid_subplans immediately, preventing later + * calls to ExecFindMatchingSubPlans. + */ + if (!prunestate->do_exec_prune && nplans > 0) + { + state->valid_subplans = bms_add_range(NULL, 0, nplans - 1); + state->valid_subplans_identified = true; + } + } + else + { + nplans = list_length(node->subplans); + + /* + * When run-time partition pruning is not enabled we can just mark all + * subplans as valid; they must also all be initialized. + */ + Assert(nplans > 0); + state->valid_subplans = validsubplans = + bms_add_range(NULL, 0, nplans - 1); + state->valid_subplans_identified = true; + state->prune_state = NULL; + } + + appendplanstates = palloc0_array(PlanState *, nplans); + + /* + * call ExecInitNode on each of the valid plans to be executed and save + * the results into the appendplanstates array. + * + * While at it, find out the first valid partial plan. 
+ */ + j = 0; + asyncplans = NULL; + nasyncplans = 0; + firstvalid = nplans; + i = -1; + while ((i = bms_next_member(validsubplans, i)) >= 0) + { + Plan *initNode = (Plan *) list_nth(node->subplans, i); + + /* + * Record async subplans. When executing EvalPlanQual, we treat them + * as sync ones; don't do this when initializing an EvalPlanQual plan + * tree. + */ + if (initNode->async_capable && estate->es_epq_active == NULL) + { + asyncplans = bms_add_member(asyncplans, j); + nasyncplans++; + } + + /* + * Record the lowest appendplans index which is a valid partial plan. + */ + if (first_valid_partial_plan && i >= first_partial_plan && j < firstvalid) + firstvalid = j; + + appendplanstates[j++] = ExecInitNode(initNode, estate, eflags); + } + + if (first_valid_partial_plan) + *first_valid_partial_plan = firstvalid; + + state->plans = appendplanstates; + state->nplans = nplans; + + /* + * Initialize Append's result tuple type and slot. If the child plans all + * produce the same fixed slot type, we can use that slot type; otherwise + * make a virtual slot. (Note that the result slot itself is used only to + * return a null tuple at end of execution; real tuples are returned to + * the caller in the children's own result slots. What we are doing here + * is allowing the parent plan node to optimize if the Append will return + * only one kind of slot.) 
+ */ + appendops = ExecGetCommonSlotOps(appendplanstates, j); + if (appendops != NULL) + { + ExecInitResultTupleSlotTL(&state->ps, appendops); + } + else + { + ExecInitResultTupleSlotTL(&state->ps, &TTSOpsVirtual); + /* show that the output slot type is not fixed */ + state->ps.resultopsset = true; + state->ps.resultopsfixed = false; + } + + /* Initialize async state */ + state->asyncplans = asyncplans; + state->nasyncplans = nasyncplans; + state->asyncrequests = NULL; + state->asyncresults = NULL; + state->needrequest = NULL; + state->eventset = NULL; + state->valid_asyncplans = NULL; + + if (nasyncplans > 0) + { + state->asyncrequests = palloc0_array(AsyncRequest *, nplans); + + i = -1; + while ((i = bms_next_member(asyncplans, i)) >= 0) + { + AsyncRequest *areq; + + areq = palloc_object(AsyncRequest); + areq->requestor = (PlanState *) state; + areq->requestee = appendplanstates[i]; + areq->request_index = i; + areq->callback_pending = false; + areq->request_complete = false; + areq->result = NULL; + + state->asyncrequests[i] = areq; + } + + state->asyncresults = palloc0_array(TupleTableSlot *, nplans); + } + + /* + * Miscellaneous initialization + */ + state->ps.ps_ProjInfo = NULL; +} + +void +ExecReScanAppender(AppenderState *node) +{ + int i; + int nasyncplans = node->nasyncplans; + + /* + * If any PARAM_EXEC Params used in pruning expressions have changed, then + * we'd better unset the valid subplans so that they are reselected for + * the new parameter values. + */ + if (node->prune_state && + bms_overlap(node->ps.chgParam, + node->prune_state->execparamids)) + { + node->valid_subplans_identified = false; + bms_free(node->valid_subplans); + node->valid_subplans = NULL; + bms_free(node->valid_asyncplans); + node->valid_asyncplans = NULL; + } + + for (i = 0; i < node->nplans; i++) + { + PlanState *subnode = node->plans[i]; + + /* + * ExecReScan doesn't know about my subplans, so I have to do + * changed-parameter signaling myself. 
+ */ + if (node->ps.chgParam != NULL) + UpdateChangedParamSet(subnode, node->ps.chgParam); + + /* + * If chgParam of subnode is not null then plan will be re-scanned by + * first ExecProcNode. + */ + if (subnode->chgParam == NULL) + ExecReScan(subnode); + } + + /* Reset async state */ + if (nasyncplans > 0) + { + i = -1; + while ((i = bms_next_member(node->asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->asyncrequests[i]; + + areq->callback_pending = false; + areq->request_complete = false; + areq->result = NULL; + } + + bms_free(node->needrequest); + node->needrequest = NULL; + } +} + +/* Wait or poll for file descriptor events and fire callbacks. */ +void +ExecAppenderAsyncEventWait(AppenderState *node, int timeout, uint32 wait_event_info) +{ + int nevents = node->nasyncplans + 2; /* one for PM death and + * one for latch */ + int noccurred; + int i; + WaitEvent occurred_event[EVENT_BUFFER_SIZE]; + + Assert(node->eventset == NULL); + + node->eventset = CreateWaitEventSet(CurrentResourceOwner, nevents); + AddWaitEventToSet(node->eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + NULL, NULL); + + /* Give each waiting subplan a chance to add an event. */ + i = -1; + while ((i = bms_next_member(node->asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->asyncrequests[i]; + + if (areq->callback_pending) + ExecAsyncConfigureWait(areq); + } + + /* + * No need for further processing if none of the subplans configured any + * events. + */ + if (GetNumRegisteredWaitEvents(node->eventset) == 1) + { + FreeWaitEventSet(node->eventset); + node->eventset = NULL; + return; + } + + /* + * Add the process latch to the set, so that we wake up to process the + * standard interrupts with CHECK_FOR_INTERRUPTS(). + * + * NOTE: For historical reasons, it's important that this is added to the + * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely, + * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if + * any other events are in the set. 
That's a poor design, it's + * questionable for postgres_fdw to be doing that in the first place, but + * we cannot change it now. The pattern has possibly been copied to other + * extensions too. + */ + AddWaitEventToSet(node->eventset, WL_LATCH_SET, PGINVALID_SOCKET, + MyLatch, NULL); + + /* Return at most EVENT_BUFFER_SIZE events in one call. */ + if (nevents > EVENT_BUFFER_SIZE) + nevents = EVENT_BUFFER_SIZE; + + /* Wait until at least one event occurs. */ + noccurred = WaitEventSetWait(node->eventset, timeout, occurred_event, + nevents, wait_event_info); + + + FreeWaitEventSet(node->eventset); + node->eventset = NULL; + if (noccurred == 0) + return; + + + /* Deliver notifications. */ + for (i = 0; i < noccurred; i++) + { + WaitEvent *w = &occurred_event[i]; + + /* + * Each waiting subplan should have registered its wait event with + * user_data pointing back to its AsyncRequest. + */ + if ((w->events & WL_SOCKET_READABLE) != 0) + { + AsyncRequest *areq = (AsyncRequest *) w->user_data; + + if (areq->callback_pending) + { + /* + * Mark it as no longer needing a callback. We must do this + * before dispatching the callback in case the callback resets + * the flag. + */ + areq->callback_pending = false; + + /* Do the actual work. */ + ExecAsyncNotify(areq); + } + } + + /* Handle standard interrupts */ + if ((w->events & WL_LATCH_SET) != 0) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + } +} + +/* Begin executing async-capable subplans. */ +void +ExecAppenderAsyncBegin(AppenderState *node) +{ + int i; + + /* Backward scan is not supported by async-aware Appends. */ + Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + + /* We should never be called when there are no subplans */ + Assert(node->nplans > 0); + + /* We should never be called when there are no async subplans. */ + Assert(node->nasyncplans > 0); + + /* Make a request for each of the valid async subplans. 
*/ + i = -1; + while ((i = bms_next_member(node->valid_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->asyncrequests[i]; + + Assert(areq->request_index == i); + Assert(!areq->callback_pending); + + /* Do the actual work. */ + ExecAsyncRequest(areq); + } +} + +/* Shuts down the subplans of an Appender node. */ +void +ExecEndAppender(AppenderState *node) +{ + PlanState **subplans; + int nplans; + int i; + + /* + * get information from the node + */ + subplans = node->plans; + nplans = node->nplans; + + /* + * shut down each of the subscans + */ + for (i = 0; i < nplans; i++) + ExecEndNode(subplans[i]); +} diff --git a/src/backend/executor/execCurrent.c b/src/backend/executor/execCurrent.c index 99f2b2d0c6f..37f5c7fd2c5 100644 --- a/src/backend/executor/execCurrent.c +++ b/src/backend/executor/execCurrent.c @@ -375,9 +375,9 @@ search_plan_tree(PlanState *node, Oid table_oid, AppendState *astate = (AppendState *) node; int i; - for (i = 0; i < astate->as_nplans; i++) + for (i = 0; i < astate->as.nplans; i++) { - ScanState *elem = search_plan_tree(astate->appendplans[i], + ScanState *elem = search_plan_tree(astate->as.plans[i], table_oid, pending_rescan); diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index d35976925ae..8282e86635b 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -911,8 +911,8 @@ ExecSetTupleBound(int64 tuples_needed, PlanState *child_node) AppendState *aState = (AppendState *) child_node; int i; - for (i = 0; i < aState->as_nplans; i++) - ExecSetTupleBound(tuples_needed, aState->appendplans[i]); + for (i = 0; i < aState->as.nplans; i++) + ExecSetTupleBound(tuples_needed, aState->as.plans[i]); } else if (IsA(child_node, MergeAppendState)) { @@ -924,8 +924,8 @@ ExecSetTupleBound(int64 tuples_needed, PlanState *child_node) MergeAppendState *maState = (MergeAppendState *) child_node; int i; - for (i = 0; i < maState->ms_nplans; i++) - 
ExecSetTupleBound(tuples_needed, maState->mergeplans[i]); + for (i = 0; i < maState->ms.nplans; i++) + ExecSetTupleBound(tuples_needed, maState->ms.plans[i]); } else if (IsA(child_node, ResultState)) { diff --git a/src/backend/executor/meson.build b/src/backend/executor/meson.build index dc45be0b2ce..06191b22142 100644 --- a/src/backend/executor/meson.build +++ b/src/backend/executor/meson.build @@ -3,6 +3,7 @@ backend_sources += files( 'execAmi.c', 'execAsync.c', + 'execAppend.c', 'execCurrent.c', 'execExpr.c', 'execExprInterp.c', diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 85c85569b5e..975ae87a036 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -57,6 +57,7 @@ #include "postgres.h" +#include "executor/execAppend.h" #include "executor/execAsync.h" #include "executor/execPartition.h" #include "executor/executor.h" @@ -111,15 +112,6 @@ AppendState * ExecInitAppend(Append *node, EState *estate, int eflags) { AppendState *appendstate = makeNode(AppendState); - PlanState **appendplanstates; - const TupleTableSlotOps *appendops; - Bitmapset *validsubplans; - Bitmapset *asyncplans; - int nplans; - int nasyncplans; - int firstvalid; - int i, - j; /* check for unsupported flags */ Assert(!(eflags & EXEC_FLAG_MARK)); @@ -127,167 +119,27 @@ ExecInitAppend(Append *node, EState *estate, int eflags) /* * create new AppendState for our append node */ - appendstate->ps.plan = (Plan *) node; - appendstate->ps.state = estate; - appendstate->ps.ExecProcNode = ExecAppend; + appendstate->as.ps.plan = (Plan *) node; + appendstate->as.ps.state = estate; + appendstate->as.ps.ExecProcNode = ExecAppend; /* Let choose_next_subplan_* function handle setting the first subplan */ appendstate->as_whichplan = INVALID_SUBPLAN_INDEX; appendstate->as_syncdone = false; appendstate->as_begun = false; - /* If run-time partition pruning is enabled, then set that up now */ - if (node->part_prune_index >= 0) - { - 
PartitionPruneState *prunestate; - - /* - * Set up pruning data structure. This also initializes the set of - * subplans to initialize (validsubplans) by taking into account the - * result of performing initial pruning if any. - */ - prunestate = ExecInitPartitionExecPruning(&appendstate->ps, - list_length(node->appendplans), - node->part_prune_index, - node->apprelids, - &validsubplans); - appendstate->as_prune_state = prunestate; - nplans = bms_num_members(validsubplans); - - /* - * When no run-time pruning is required and there's at least one - * subplan, we can fill as_valid_subplans immediately, preventing - * later calls to ExecFindMatchingSubPlans. - */ - if (!prunestate->do_exec_prune && nplans > 0) - { - appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1); - appendstate->as_valid_subplans_identified = true; - } - } - else - { - nplans = list_length(node->appendplans); - - /* - * When run-time partition pruning is not enabled we can just mark all - * subplans as valid; they must also all be initialized. - */ - Assert(nplans > 0); - appendstate->as_valid_subplans = validsubplans = - bms_add_range(NULL, 0, nplans - 1); - appendstate->as_valid_subplans_identified = true; - appendstate->as_prune_state = NULL; - } - - appendplanstates = (PlanState **) palloc(nplans * - sizeof(PlanState *)); - - /* - * call ExecInitNode on each of the valid plans to be executed and save - * the results into the appendplanstates array. - * - * While at it, find out the first valid partial plan. - */ - j = 0; - asyncplans = NULL; - nasyncplans = 0; - firstvalid = nplans; - i = -1; - while ((i = bms_next_member(validsubplans, i)) >= 0) - { - Plan *initNode = (Plan *) list_nth(node->appendplans, i); - - /* - * Record async subplans. When executing EvalPlanQual, we treat them - * as sync ones; don't do this when initializing an EvalPlanQual plan - * tree. 
- */ - if (initNode->async_capable && estate->es_epq_active == NULL) - { - asyncplans = bms_add_member(asyncplans, j); - nasyncplans++; - } - - /* - * Record the lowest appendplans index which is a valid partial plan. - */ - if (i >= node->first_partial_plan && j < firstvalid) - firstvalid = j; - - appendplanstates[j++] = ExecInitNode(initNode, estate, eflags); - } - - appendstate->as_first_partial_plan = firstvalid; - appendstate->appendplans = appendplanstates; - appendstate->as_nplans = nplans; + /* Initialize common fields */ + ExecInitAppender(&appendstate->as, + &node->ap, + estate, + eflags, + node->first_partial_plan, + &appendstate->as_first_partial_plan); - /* - * Initialize Append's result tuple type and slot. If the child plans all - * produce the same fixed slot type, we can use that slot type; otherwise - * make a virtual slot. (Note that the result slot itself is used only to - * return a null tuple at end of execution; real tuples are returned to - * the caller in the children's own result slots. What we are doing here - * is allowing the parent plan node to optimize if the Append will return - * only one kind of slot.) 
- */ - appendops = ExecGetCommonSlotOps(appendplanstates, j); - if (appendops != NULL) - { - ExecInitResultTupleSlotTL(&appendstate->ps, appendops); - } - else - { - ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual); - /* show that the output slot type is not fixed */ - appendstate->ps.resultopsset = true; - appendstate->ps.resultopsfixed = false; - } + if (appendstate->as.nasyncplans > 0 && appendstate->as.valid_subplans_identified) + classify_matching_subplans(appendstate); - /* Initialize async state */ - appendstate->as_asyncplans = asyncplans; - appendstate->as_nasyncplans = nasyncplans; - appendstate->as_asyncrequests = NULL; - appendstate->as_asyncresults = NULL; - appendstate->as_nasyncresults = 0; appendstate->as_nasyncremain = 0; - appendstate->as_needrequest = NULL; - appendstate->as_eventset = NULL; - appendstate->as_valid_asyncplans = NULL; - - if (nasyncplans > 0) - { - appendstate->as_asyncrequests = (AsyncRequest **) - palloc0(nplans * sizeof(AsyncRequest *)); - - i = -1; - while ((i = bms_next_member(asyncplans, i)) >= 0) - { - AsyncRequest *areq; - - areq = palloc_object(AsyncRequest); - areq->requestor = (PlanState *) appendstate; - areq->requestee = appendplanstates[i]; - areq->request_index = i; - areq->callback_pending = false; - areq->request_complete = false; - areq->result = NULL; - - appendstate->as_asyncrequests[i] = areq; - } - - appendstate->as_asyncresults = (TupleTableSlot **) - palloc0(nasyncplans * sizeof(TupleTableSlot *)); - - if (appendstate->as_valid_subplans_identified) - classify_matching_subplans(appendstate); - } - - /* - * Miscellaneous initialization - */ - - appendstate->ps.ps_ProjInfo = NULL; /* For parallel query, this will be overridden later. 
*/ appendstate->choose_next_subplan = choose_next_subplan_locally; @@ -317,11 +169,11 @@ ExecAppend(PlanState *pstate) Assert(!node->as_syncdone); /* Nothing to do if there are no subplans */ - if (node->as_nplans == 0) - return ExecClearTuple(node->ps.ps_ResultTupleSlot); + if (node->as.nplans == 0) + return ExecClearTuple(node->as.ps.ps_ResultTupleSlot); /* If there are any async subplans, begin executing them. */ - if (node->as_nasyncplans > 0) + if (node->as.nasyncplans > 0) ExecAppendAsyncBegin(node); /* @@ -329,11 +181,11 @@ ExecAppend(PlanState *pstate) * proceeding. */ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) - return ExecClearTuple(node->ps.ps_ResultTupleSlot); + return ExecClearTuple(node->as.ps.ps_ResultTupleSlot); Assert(node->as_syncdone || (node->as_whichplan >= 0 && - node->as_whichplan < node->as_nplans)); + node->as_whichplan < node->as.nplans)); /* And we're initialized. */ node->as_begun = true; @@ -348,19 +200,19 @@ ExecAppend(PlanState *pstate) /* * try to get a tuple from an async subplan if any */ - if (node->as_syncdone || !bms_is_empty(node->as_needrequest)) + if (node->as_syncdone || !bms_is_empty(node->as.needrequest)) { if (ExecAppendAsyncGetNext(node, &result)) return result; Assert(!node->as_syncdone); - Assert(bms_is_empty(node->as_needrequest)); + Assert(bms_is_empty(node->as.needrequest)); } /* * figure out which sync subplan we are currently processing */ - Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); - subnode = node->appendplans[node->as_whichplan]; + Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as.nplans); + subnode = node->as.plans[node->as_whichplan]; /* * get a tuple from the subplan @@ -387,7 +239,7 @@ ExecAppend(PlanState *pstate) /* choose new sync subplan; if no sync/async subplans, we're done */ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) - return ExecClearTuple(node->ps.ps_ResultTupleSlot); + return 
ExecClearTuple(node->as.ps.ps_ResultTupleSlot); } } @@ -402,81 +254,22 @@ ExecAppend(PlanState *pstate) void ExecEndAppend(AppendState *node) { - PlanState **appendplans; - int nplans; - int i; - - /* - * get information from the node - */ - appendplans = node->appendplans; - nplans = node->as_nplans; - - /* - * shut down each of the subscans - */ - for (i = 0; i < nplans; i++) - ExecEndNode(appendplans[i]); + ExecEndAppender(&node->as); } void ExecReScanAppend(AppendState *node) { - int nasyncplans = node->as_nasyncplans; - int i; - - /* - * If any PARAM_EXEC Params used in pruning expressions have changed, then - * we'd better unset the valid subplans so that they are reselected for - * the new parameter values. - */ - if (node->as_prune_state && - bms_overlap(node->ps.chgParam, - node->as_prune_state->execparamids)) - { - node->as_valid_subplans_identified = false; - bms_free(node->as_valid_subplans); - node->as_valid_subplans = NULL; - bms_free(node->as_valid_asyncplans); - node->as_valid_asyncplans = NULL; - } - for (i = 0; i < node->as_nplans; i++) - { - PlanState *subnode = node->appendplans[i]; + int nasyncplans = node->as.nasyncplans; - /* - * ExecReScan doesn't know about my subplans, so I have to do - * changed-parameter signaling myself. - */ - if (node->ps.chgParam != NULL) - UpdateChangedParamSet(subnode, node->ps.chgParam); - - /* - * If chgParam of subnode is not null then plan will be re-scanned by - * first ExecProcNode or by first ExecAsyncRequest. 
- */ - if (subnode->chgParam == NULL) - ExecReScan(subnode); - } + ExecReScanAppender(&node->as); - /* Reset async state */ + /* Reset specific append async state */ if (nasyncplans > 0) { - i = -1; - while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) - { - AsyncRequest *areq = node->as_asyncrequests[i]; - - areq->callback_pending = false; - areq->request_complete = false; - areq->result = NULL; - } - node->as_nasyncresults = 0; node->as_nasyncremain = 0; - bms_free(node->as_needrequest); - node->as_needrequest = NULL; } /* Let choose_next_subplan_* function handle setting the first subplan */ @@ -503,7 +296,7 @@ ExecAppendEstimate(AppendState *node, { node->pstate_len = add_size(offsetof(ParallelAppendState, pa_finished), - sizeof(bool) * node->as_nplans); + sizeof(bool) * node->as.nplans); shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len); shm_toc_estimate_keys(&pcxt->estimator, 1); @@ -525,7 +318,7 @@ ExecAppendInitializeDSM(AppendState *node, pstate = shm_toc_allocate(pcxt->toc, node->pstate_len); memset(pstate, 0, node->pstate_len); LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND); - shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate); + shm_toc_insert(pcxt->toc, node->as.ps.plan->plan_node_id, pstate); node->as_pstate = pstate; node->choose_next_subplan = choose_next_subplan_for_leader; @@ -543,7 +336,7 @@ ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt) ParallelAppendState *pstate = node->as_pstate; pstate->pa_next_plan = 0; - memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans); + memset(pstate->pa_finished, 0, sizeof(bool) * node->as.nplans); } /* ---------------------------------------------------------------- @@ -556,7 +349,7 @@ ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt) void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) { - node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false); + node->as_pstate = 
shm_toc_lookup(pwcxt->toc, node->as.ps.plan->plan_node_id, false); node->choose_next_subplan = choose_next_subplan_for_worker; } @@ -574,7 +367,7 @@ choose_next_subplan_locally(AppendState *node) int nextplan; /* We should never be called when there are no subplans */ - Assert(node->as_nplans > 0); + Assert(node->as.nplans > 0); /* Nothing to do if syncdone */ if (node->as_syncdone) @@ -589,33 +382,33 @@ choose_next_subplan_locally(AppendState *node) */ if (whichplan == INVALID_SUBPLAN_INDEX) { - if (node->as_nasyncplans > 0) + if (node->as.nasyncplans > 0) { /* We'd have filled as_valid_subplans already */ - Assert(node->as_valid_subplans_identified); + Assert(node->as.valid_subplans_identified); } - else if (!node->as_valid_subplans_identified) + else if (!node->as.valid_subplans_identified) { - node->as_valid_subplans = - ExecFindMatchingSubPlans(node->as_prune_state, false, NULL); - node->as_valid_subplans_identified = true; + node->as.valid_subplans = + ExecFindMatchingSubPlans(node->as.prune_state, false, NULL); + node->as.valid_subplans_identified = true; } whichplan = -1; } /* Ensure whichplan is within the expected range */ - Assert(whichplan >= -1 && whichplan <= node->as_nplans); + Assert(whichplan >= -1 && whichplan <= node->as.nplans); - if (ScanDirectionIsForward(node->ps.state->es_direction)) - nextplan = bms_next_member(node->as_valid_subplans, whichplan); + if (ScanDirectionIsForward(node->as.ps.state->es_direction)) + nextplan = bms_next_member(node->as.valid_subplans, whichplan); else - nextplan = bms_prev_member(node->as_valid_subplans, whichplan); + nextplan = bms_prev_member(node->as.valid_subplans, whichplan); if (nextplan < 0) { /* Set as_syncdone if in async mode */ - if (node->as_nasyncplans > 0) + if (node->as.nasyncplans > 0) node->as_syncdone = true; return false; } @@ -639,10 +432,10 @@ choose_next_subplan_for_leader(AppendState *node) ParallelAppendState *pstate = node->as_pstate; /* Backward scan is not supported by parallel-aware 
plans */ - Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + Assert(ScanDirectionIsForward(node->as.ps.state->es_direction)); /* We should never be called when there are no subplans */ - Assert(node->as_nplans > 0); + Assert(node->as.nplans > 0); LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); @@ -654,18 +447,18 @@ choose_next_subplan_for_leader(AppendState *node) else { /* Start with last subplan. */ - node->as_whichplan = node->as_nplans - 1; + node->as_whichplan = node->as.nplans - 1; /* * If we've yet to determine the valid subplans then do so now. If * run-time pruning is disabled then the valid subplans will always be * set to all subplans. */ - if (!node->as_valid_subplans_identified) + if (!node->as.valid_subplans_identified) { - node->as_valid_subplans = - ExecFindMatchingSubPlans(node->as_prune_state, false, NULL); - node->as_valid_subplans_identified = true; + node->as.valid_subplans = + ExecFindMatchingSubPlans(node->as.prune_state, false, NULL); + node->as.valid_subplans_identified = true; /* * Mark each invalid plan as finished to allow the loop below to @@ -721,10 +514,10 @@ choose_next_subplan_for_worker(AppendState *node) ParallelAppendState *pstate = node->as_pstate; /* Backward scan is not supported by parallel-aware plans */ - Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + Assert(ScanDirectionIsForward(node->as.ps.state->es_direction)); /* We should never be called when there are no subplans */ - Assert(node->as_nplans > 0); + Assert(node->as.nplans > 0); LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); @@ -737,11 +530,11 @@ choose_next_subplan_for_worker(AppendState *node) * run-time pruning is disabled then the valid subplans will always be set * to all subplans. 
*/ - else if (!node->as_valid_subplans_identified) + else if (!node->as.valid_subplans_identified) { - node->as_valid_subplans = - ExecFindMatchingSubPlans(node->as_prune_state, false, NULL); - node->as_valid_subplans_identified = true; + node->as.valid_subplans = + ExecFindMatchingSubPlans(node->as.prune_state, false, NULL); + node->as.valid_subplans_identified = true; mark_invalid_subplans_as_finished(node); } @@ -761,7 +554,7 @@ choose_next_subplan_for_worker(AppendState *node) { int nextplan; - nextplan = bms_next_member(node->as_valid_subplans, + nextplan = bms_next_member(node->as.valid_subplans, pstate->pa_next_plan); if (nextplan >= 0) { @@ -774,7 +567,7 @@ choose_next_subplan_for_worker(AppendState *node) * Try looping back to the first valid partial plan, if there is * one. If there isn't, arrange to bail out below. */ - nextplan = bms_next_member(node->as_valid_subplans, + nextplan = bms_next_member(node->as.valid_subplans, node->as_first_partial_plan - 1); pstate->pa_next_plan = nextplan < 0 ? node->as_whichplan : nextplan; @@ -799,7 +592,7 @@ choose_next_subplan_for_worker(AppendState *node) /* Pick the plan we found, and advance pa_next_plan one more time. 
*/ node->as_whichplan = pstate->pa_next_plan; - pstate->pa_next_plan = bms_next_member(node->as_valid_subplans, + pstate->pa_next_plan = bms_next_member(node->as.valid_subplans, pstate->pa_next_plan); /* @@ -808,7 +601,7 @@ choose_next_subplan_for_worker(AppendState *node) */ if (pstate->pa_next_plan < 0) { - int nextplan = bms_next_member(node->as_valid_subplans, + int nextplan = bms_next_member(node->as.valid_subplans, node->as_first_partial_plan - 1); if (nextplan >= 0) @@ -850,16 +643,16 @@ mark_invalid_subplans_as_finished(AppendState *node) Assert(node->as_pstate); /* Shouldn't have been called when run-time pruning is not enabled */ - Assert(node->as_prune_state); + Assert(node->as.prune_state); /* Nothing to do if all plans are valid */ - if (bms_num_members(node->as_valid_subplans) == node->as_nplans) + if (bms_num_members(node->as.valid_subplans) == node->as.nplans) return; /* Mark all non-valid plans as finished */ - for (i = 0; i < node->as_nplans; i++) + for (i = 0; i < node->as.nplans; i++) { - if (!bms_is_member(i, node->as_valid_subplans)) + if (!bms_is_member(i, node->as.valid_subplans)) node->as_pstate->pa_finished[i] = true; } } @@ -878,47 +671,25 @@ mark_invalid_subplans_as_finished(AppendState *node) static void ExecAppendAsyncBegin(AppendState *node) { - int i; - - /* Backward scan is not supported by async-aware Appends. */ - Assert(ScanDirectionIsForward(node->ps.state->es_direction)); - - /* We should never be called when there are no subplans */ - Assert(node->as_nplans > 0); - - /* We should never be called when there are no async subplans. */ - Assert(node->as_nasyncplans > 0); - /* If we've yet to determine the valid subplans then do so now. 
*/ - if (!node->as_valid_subplans_identified) + if (!node->as.valid_subplans_identified) { - node->as_valid_subplans = - ExecFindMatchingSubPlans(node->as_prune_state, false, NULL); - node->as_valid_subplans_identified = true; + node->as.valid_subplans = + ExecFindMatchingSubPlans(node->as.prune_state, false, NULL); + node->as.valid_subplans_identified = true; classify_matching_subplans(node); } /* Initialize state variables. */ - node->as_syncdone = bms_is_empty(node->as_valid_subplans); - node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans); + node->as_syncdone = bms_is_empty(node->as.valid_subplans); + node->as_nasyncremain = bms_num_members(node->as.valid_asyncplans); /* Nothing to do if there are no valid async subplans. */ if (node->as_nasyncremain == 0) return; - /* Make a request for each of the valid async subplans. */ - i = -1; - while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0) - { - AsyncRequest *areq = node->as_asyncrequests[i]; - - Assert(areq->request_index == i); - Assert(!areq->callback_pending); - - /* Do the actual work. */ - ExecAsyncRequest(areq); - } + ExecAppenderAsyncBegin(&node->as); } /* ---------------------------------------------------------------- @@ -963,7 +734,7 @@ ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result) if (node->as_syncdone) { Assert(node->as_nasyncremain == 0); - *result = ExecClearTuple(node->ps.ps_ResultTupleSlot); + *result = ExecClearTuple(node->as.ps.ps_ResultTupleSlot); return true; } @@ -983,7 +754,7 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) int i; /* Nothing to do if there are no async subplans needing a new request. 
*/ - if (bms_is_empty(node->as_needrequest)) + if (bms_is_empty(node->as.needrequest)) { Assert(node->as_nasyncresults == 0); return false; @@ -996,17 +767,17 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) if (node->as_nasyncresults > 0) { --node->as_nasyncresults; - *result = node->as_asyncresults[node->as_nasyncresults]; + *result = node->as.asyncresults[node->as_nasyncresults]; return true; } /* Make a new request for each of the async subplans that need it. */ - needrequest = node->as_needrequest; - node->as_needrequest = NULL; + needrequest = node->as.needrequest; + node->as.needrequest = NULL; i = -1; while ((i = bms_next_member(needrequest, i)) >= 0) { - AsyncRequest *areq = node->as_asyncrequests[i]; + AsyncRequest *areq = node->as.asyncrequests[i]; /* Do the actual work. */ ExecAsyncRequest(areq); @@ -1017,7 +788,7 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) if (node->as_nasyncresults > 0) { --node->as_nasyncresults; - *result = node->as_asyncresults[node->as_nasyncresults]; + *result = node->as.asyncresults[node->as_nasyncresults]; return true; } @@ -1033,105 +804,12 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) static void ExecAppendAsyncEventWait(AppendState *node) { - int nevents = node->as_nasyncplans + 2; long timeout = node->as_syncdone ? -1 : 0; - WaitEvent occurred_event[EVENT_BUFFER_SIZE]; - int noccurred; - int i; /* We should never be called when there are no valid async subplans. */ Assert(node->as_nasyncremain > 0); - Assert(node->as_eventset == NULL); - node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents); - AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); - - /* Give each waiting subplan a chance to add an event. 
*/ - i = -1; - while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) - { - AsyncRequest *areq = node->as_asyncrequests[i]; - - if (areq->callback_pending) - ExecAsyncConfigureWait(areq); - } - - /* - * No need for further processing if none of the subplans configured any - * events. - */ - if (GetNumRegisteredWaitEvents(node->as_eventset) == 1) - { - FreeWaitEventSet(node->as_eventset); - node->as_eventset = NULL; - return; - } - - /* - * Add the process latch to the set, so that we wake up to process the - * standard interrupts with CHECK_FOR_INTERRUPTS(). - * - * NOTE: For historical reasons, it's important that this is added to the - * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely, - * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if - * any other events are in the set. That's a poor design, it's - * questionable for postgres_fdw to be doing that in the first place, but - * we cannot change it now. The pattern has possibly been copied to other - * extensions too. - */ - AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); - - /* Return at most EVENT_BUFFER_SIZE events in one call. */ - if (nevents > EVENT_BUFFER_SIZE) - nevents = EVENT_BUFFER_SIZE; - - /* - * If the timeout is -1, wait until at least one event occurs. If the - * timeout is 0, poll for events, but do not wait at all. - */ - noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event, - nevents, WAIT_EVENT_APPEND_READY); - FreeWaitEventSet(node->as_eventset); - node->as_eventset = NULL; - if (noccurred == 0) - return; - - /* Deliver notifications. */ - for (i = 0; i < noccurred; i++) - { - WaitEvent *w = &occurred_event[i]; - - /* - * Each waiting subplan should have registered its wait event with - * user_data pointing back to its AsyncRequest. 
- */ - if ((w->events & WL_SOCKET_READABLE) != 0) - { - AsyncRequest *areq = (AsyncRequest *) w->user_data; - - if (areq->callback_pending) - { - /* - * Mark it as no longer needing a callback. We must do this - * before dispatching the callback in case the callback resets - * the flag. - */ - areq->callback_pending = false; - - /* Do the actual work. */ - ExecAsyncNotify(areq); - } - } - - /* Handle standard interrupts */ - if ((w->events & WL_LATCH_SET) != 0) - { - ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); - } - } + ExecAppenderAsyncEventWait(&node->as, timeout, WAIT_EVENT_APPEND_READY); } /* ---------------------------------------------------------------- @@ -1167,14 +845,14 @@ ExecAsyncAppendResponse(AsyncRequest *areq) } /* Save result so we can return it. */ - Assert(node->as_nasyncresults < node->as_nasyncplans); - node->as_asyncresults[node->as_nasyncresults++] = slot; + Assert(node->as_nasyncresults < node->as.nasyncplans); + node->as.asyncresults[node->as_nasyncresults++] = slot; /* * Mark the subplan that returned a result as ready for a new request. We * don't launch another one here immediately because it might complete. */ - node->as_needrequest = bms_add_member(node->as_needrequest, + node->as.needrequest = bms_add_member(node->as.needrequest, areq->request_index); } @@ -1189,34 +867,20 @@ ExecAsyncAppendResponse(AsyncRequest *areq) static void classify_matching_subplans(AppendState *node) { - Bitmapset *valid_asyncplans; - - Assert(node->as_valid_subplans_identified); - Assert(node->as_valid_asyncplans == NULL); + Assert(node->as.valid_subplans_identified); /* Nothing to do if there are no valid subplans. */ - if (bms_is_empty(node->as_valid_subplans)) + if (bms_is_empty(node->as.valid_subplans)) { node->as_syncdone = true; node->as_nasyncremain = 0; return; } - /* Nothing to do if there are no valid async subplans. */ - if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans)) - { + /* No valid async subplans identified. 
*/ + if (!classify_matching_subplans_common( + &node->as.valid_subplans, + node->as.asyncplans, + &node->as.valid_asyncplans)) node->as_nasyncremain = 0; - return; - } - - /* Get valid async subplans. */ - valid_asyncplans = bms_intersect(node->as_asyncplans, - node->as_valid_subplans); - - /* Adjust the valid subplans to contain sync subplans only. */ - node->as_valid_subplans = bms_del_members(node->as_valid_subplans, - valid_asyncplans); - - /* Save valid async subplans. */ - node->as_valid_asyncplans = valid_asyncplans; } diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c index 72eebd50bdf..cd03b2bc7f8 100644 --- a/src/backend/executor/nodeMergeAppend.c +++ b/src/backend/executor/nodeMergeAppend.c @@ -38,6 +38,7 @@ #include "postgres.h" +#include "executor/execAppend.h" #include "executor/executor.h" #include "executor/execPartition.h" #include "executor/nodeMergeAppend.h" @@ -66,12 +67,7 @@ MergeAppendState * ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) { MergeAppendState *mergestate = makeNode(MergeAppendState); - PlanState **mergeplanstates; - const TupleTableSlotOps *mergeops; - Bitmapset *validsubplans; - int nplans; - int i, - j; + int i; /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -79,98 +75,22 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) /* * create new MergeAppendState for our node */ - mergestate->ps.plan = (Plan *) node; - mergestate->ps.state = estate; - mergestate->ps.ExecProcNode = ExecMergeAppend; - - /* If run-time partition pruning is enabled, then set that up now */ - if (node->part_prune_index >= 0) - { - PartitionPruneState *prunestate; - - /* - * Set up pruning data structure. This also initializes the set of - * subplans to initialize (validsubplans) by taking into account the - * result of performing initial pruning if any. 
- */ - prunestate = ExecInitPartitionExecPruning(&mergestate->ps, - list_length(node->mergeplans), - node->part_prune_index, - node->apprelids, - &validsubplans); - mergestate->ms_prune_state = prunestate; - nplans = bms_num_members(validsubplans); - - /* - * When no run-time pruning is required and there's at least one - * subplan, we can fill ms_valid_subplans immediately, preventing - * later calls to ExecFindMatchingSubPlans. - */ - if (!prunestate->do_exec_prune && nplans > 0) - mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1); - } - else - { - nplans = list_length(node->mergeplans); - - /* - * When run-time partition pruning is not enabled we can just mark all - * subplans as valid; they must also all be initialized. - */ - Assert(nplans > 0); - mergestate->ms_valid_subplans = validsubplans = - bms_add_range(NULL, 0, nplans - 1); - mergestate->ms_prune_state = NULL; - } - - mergeplanstates = palloc_array(PlanState *, nplans); - mergestate->mergeplans = mergeplanstates; - mergestate->ms_nplans = nplans; - - mergestate->ms_slots = palloc0_array(TupleTableSlot *, nplans); - mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots, + mergestate->ms.ps.plan = (Plan *) node; + mergestate->ms.ps.state = estate; + mergestate->ms.ps.ExecProcNode = ExecMergeAppend; + + /* Initialize common fields */ + ExecInitAppender(&mergestate->ms, + &node->ap, + estate, + eflags, + -1, + NULL); + + mergestate->ms_slots = palloc0_array(TupleTableSlot *, mergestate->ms.nplans); + mergestate->ms_heap = binaryheap_allocate(mergestate->ms.nplans, heap_compare_slots, mergestate); - /* - * call ExecInitNode on each of the valid plans to be executed and save - * the results into the mergeplanstates array. 
- */ - j = 0; - i = -1; - while ((i = bms_next_member(validsubplans, i)) >= 0) - { - Plan *initNode = (Plan *) list_nth(node->mergeplans, i); - - mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags); - } - - /* - * Initialize MergeAppend's result tuple type and slot. If the child - * plans all produce the same fixed slot type, we can use that slot type; - * otherwise make a virtual slot. (Note that the result slot itself is - * used only to return a null tuple at end of execution; real tuples are - * returned to the caller in the children's own result slots. What we are - * doing here is allowing the parent plan node to optimize if the - * MergeAppend will return only one kind of slot.) - */ - mergeops = ExecGetCommonSlotOps(mergeplanstates, j); - if (mergeops != NULL) - { - ExecInitResultTupleSlotTL(&mergestate->ps, mergeops); - } - else - { - ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual); - /* show that the output slot type is not fixed */ - mergestate->ps.resultopsset = true; - mergestate->ps.resultopsfixed = false; - } - - /* - * Miscellaneous initialization - */ - mergestate->ps.ps_ProjInfo = NULL; - /* * initialize sort-key information */ @@ -224,26 +144,25 @@ ExecMergeAppend(PlanState *pstate) if (!node->ms_initialized) { /* Nothing to do if all subplans were pruned */ - if (node->ms_nplans == 0) - return ExecClearTuple(node->ps.ps_ResultTupleSlot); + if (node->ms.nplans == 0) + return ExecClearTuple(node->ms.ps.ps_ResultTupleSlot); - /* - * If we've yet to determine the valid subplans then do so now. If - * run-time pruning is disabled then the valid subplans will always be - * set to all subplans. - */ - if (node->ms_valid_subplans == NULL) - node->ms_valid_subplans = - ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL); + /* If we've yet to determine the valid subplans then do so now. 
*/ + if (!node->ms.valid_subplans_identified) + { + node->ms.valid_subplans = + ExecFindMatchingSubPlans(node->ms.prune_state, false, NULL); + node->ms.valid_subplans_identified = true; + } /* * First time through: pull the first tuple from each valid subplan, * and set up the heap. */ i = -1; - while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0) + while ((i = bms_next_member(node->ms.valid_subplans, i)) >= 0) { - node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); + node->ms_slots[i] = ExecProcNode(node->ms.plans[i]); if (!TupIsNull(node->ms_slots[i])) binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i)); } @@ -261,7 +180,7 @@ ExecMergeAppend(PlanState *pstate) * to not pull tuples until necessary.) */ i = DatumGetInt32(binaryheap_first(node->ms_heap)); - node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); + node->ms_slots[i] = ExecProcNode(node->ms.plans[i]); if (!TupIsNull(node->ms_slots[i])) binaryheap_replace_first(node->ms_heap, Int32GetDatum(i)); else @@ -271,7 +190,7 @@ ExecMergeAppend(PlanState *pstate) if (binaryheap_empty(node->ms_heap)) { /* All the subplans are exhausted, and so is the heap */ - result = ExecClearTuple(node->ps.ps_ResultTupleSlot); + result = ExecClearTuple(node->ms.ps.ps_ResultTupleSlot); } else { @@ -335,59 +254,14 @@ heap_compare_slots(Datum a, Datum b, void *arg) void ExecEndMergeAppend(MergeAppendState *node) { - PlanState **mergeplans; - int nplans; - int i; - - /* - * get information from the node - */ - mergeplans = node->mergeplans; - nplans = node->ms_nplans; - - /* - * shut down each of the subscans - */ - for (i = 0; i < nplans; i++) - ExecEndNode(mergeplans[i]); + ExecEndAppender(&node->ms); } void ExecReScanMergeAppend(MergeAppendState *node) { - int i; - - /* - * If any PARAM_EXEC Params used in pruning expressions have changed, then - * we'd better unset the valid subplans so that they are reselected for - * the new parameter values. 
- */ - if (node->ms_prune_state && - bms_overlap(node->ps.chgParam, - node->ms_prune_state->execparamids)) - { - bms_free(node->ms_valid_subplans); - node->ms_valid_subplans = NULL; - } + ExecReScanAppender(&node->ms); - for (i = 0; i < node->ms_nplans; i++) - { - PlanState *subnode = node->mergeplans[i]; - - /* - * ExecReScan doesn't know about my subplans, so I have to do - * changed-parameter signaling myself. - */ - if (node->ps.chgParam != NULL) - UpdateChangedParamSet(subnode, node->ps.chgParam); - - /* - * If chgParam of subnode is not null then plan will be re-scanned by - * first ExecProcNode. - */ - if (subnode->chgParam == NULL) - ExecReScan(subnode); - } binaryheap_reset(node->ms_heap); node->ms_initialized = false; } diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c index 6a850349cf7..5124e787b14 100644 --- a/src/backend/nodes/nodeFuncs.c +++ b/src/backend/nodes/nodeFuncs.c @@ -4825,14 +4825,14 @@ planstate_tree_walker_impl(PlanState *planstate, switch (nodeTag(plan)) { case T_Append: - if (planstate_walk_members(((AppendState *) planstate)->appendplans, - ((AppendState *) planstate)->as_nplans, + if (planstate_walk_members(((AppendState *) planstate)->as.plans, + ((AppendState *) planstate)->as.nplans, walker, context)) return true; break; case T_MergeAppend: - if (planstate_walk_members(((MergeAppendState *) planstate)->mergeplans, - ((MergeAppendState *) planstate)->ms_nplans, + if (planstate_walk_members(((MergeAppendState *) planstate)->ms.plans, + ((MergeAppendState *) planstate)->ms.nplans, walker, context)) return true; break; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 26d0dbc2e1d..b23643fedb8 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -1261,12 +1261,12 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) * child plans, to make cross-checking the sort info easier. 
*/ plan = makeNode(Append); - plan->plan.targetlist = tlist; - plan->plan.qual = NIL; - plan->plan.lefttree = NULL; - plan->plan.righttree = NULL; - plan->apprelids = rel->relids; - plan->child_append_relid_sets = best_path->child_append_relid_sets; + plan->ap.plan.targetlist = tlist; + plan->ap.plan.qual = NIL; + plan->ap.plan.lefttree = NULL; + plan->ap.plan.righttree = NULL; + plan->ap.apprelids = rel->relids; + plan->ap.child_append_relid_sets = best_path->child_append_relid_sets; if (pathkeys != NIL) { @@ -1285,7 +1285,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) &nodeSortOperators, &nodeCollations, &nodeNullsFirst); - tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist)); + tlist_was_changed = (orig_tlist_length != list_length(plan->ap.plan.targetlist)); } /* If appropriate, consider async append */ @@ -1395,7 +1395,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } /* Set below if we find quals that we can use to run-time prune */ - plan->part_prune_index = -1; + plan->ap.part_prune_index = -1; /* * If any quals exist, they may be useful to perform further partition @@ -1420,16 +1420,16 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } if (prunequal != NIL) - plan->part_prune_index = make_partition_pruneinfo(root, rel, - best_path->subpaths, - prunequal); + plan->ap.part_prune_index = make_partition_pruneinfo(root, rel, + best_path->subpaths, + prunequal); } - plan->appendplans = subplans; + plan->ap.subplans = subplans; plan->nasyncplans = nasyncplans; plan->first_partial_plan = best_path->first_partial_path; - copy_generic_path_info(&plan->plan, (Path *) best_path); + copy_generic_path_info(&plan->ap.plan, (Path *) best_path); /* * If prepare_sort_from_pathkeys added sort columns, but we were told to @@ -1438,9 +1438,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) */ if (tlist_was_changed && (flags & (CP_EXACT_TLIST | 
CP_SMALL_TLIST))) { - tlist = list_copy_head(plan->plan.targetlist, orig_tlist_length); + tlist = list_copy_head(plan->ap.plan.targetlist, orig_tlist_length); return inject_projection_plan((Plan *) plan, tlist, - plan->plan.parallel_safe); + plan->ap.plan.parallel_safe); } else return (Plan *) plan; @@ -1458,7 +1458,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, int flags) { MergeAppend *node = makeNode(MergeAppend); - Plan *plan = &node->plan; + Plan *plan = &node->ap.plan; List *tlist = build_path_tlist(root, &best_path->path); int orig_tlist_length = list_length(tlist); bool tlist_was_changed; @@ -1478,8 +1478,8 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, plan->qual = NIL; plan->lefttree = NULL; plan->righttree = NULL; - node->apprelids = rel->relids; - node->child_append_relid_sets = best_path->child_append_relid_sets; + node->ap.apprelids = rel->relids; + node->ap.child_append_relid_sets = best_path->child_append_relid_sets; /* * Compute sort column info, and adjust MergeAppend's tlist as needed. 
@@ -1585,7 +1585,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, } /* Set below if we find quals that we can use to run-time prune */ - node->part_prune_index = -1; + node->ap.part_prune_index = -1; /* * If any quals exist, they may be useful to perform further partition @@ -1602,12 +1602,12 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, Assert(best_path->path.param_info == NULL); if (prunequal != NIL) - node->part_prune_index = make_partition_pruneinfo(root, rel, - best_path->subpaths, - prunequal); + node->ap.part_prune_index = make_partition_pruneinfo(root, rel, + best_path->subpaths, + prunequal); } - node->mergeplans = subplans; + node->ap.subplans = subplans; /* * If prepare_sort_from_pathkeys added sort columns, but we were told to diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index ff0e875f2a2..10c49c98ea9 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -1881,10 +1881,10 @@ set_append_references(PlannerInfo *root, * check quals. If it's got exactly one child plan, then it's not doing * anything useful at all, and we can strip it out. */ - Assert(aplan->plan.qual == NIL); + Assert(aplan->ap.plan.qual == NIL); /* First, we gotta recurse on the children */ - foreach(l, aplan->appendplans) + foreach(l, aplan->ap.subplans) { lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset); } @@ -1897,11 +1897,11 @@ set_append_references(PlannerInfo *root, * plan may execute the non-parallel aware child multiple times. (If you * change these rules, update create_append_path to match.) 
*/ - if (list_length(aplan->appendplans) == 1) + if (list_length(aplan->ap.subplans) == 1) { - Plan *p = (Plan *) linitial(aplan->appendplans); + Plan *p = (Plan *) linitial(aplan->ap.subplans); - if (p->parallel_aware == aplan->plan.parallel_aware) + if (p->parallel_aware == aplan->ap.plan.parallel_aware) { Plan *result; @@ -1909,7 +1909,7 @@ set_append_references(PlannerInfo *root, /* Remember that we removed an Append */ record_elided_node(root->glob, p->plan_node_id, T_Append, - offset_relid_set(aplan->apprelids, rtoffset)); + offset_relid_set(aplan->ap.apprelids, rtoffset)); return result; } @@ -1922,19 +1922,19 @@ set_append_references(PlannerInfo *root, */ set_dummy_tlist_references((Plan *) aplan, rtoffset); - aplan->apprelids = offset_relid_set(aplan->apprelids, rtoffset); + aplan->ap.apprelids = offset_relid_set(aplan->ap.apprelids, rtoffset); /* * Add PartitionPruneInfo, if any, to PlannerGlobal and update the index. * Also update the RT indexes present in it to add the offset. */ - if (aplan->part_prune_index >= 0) - aplan->part_prune_index = - register_partpruneinfo(root, aplan->part_prune_index, rtoffset); + if (aplan->ap.part_prune_index >= 0) + aplan->ap.part_prune_index = + register_partpruneinfo(root, aplan->ap.part_prune_index, rtoffset); /* We don't need to recurse to lefttree or righttree ... */ - Assert(aplan->plan.lefttree == NULL); - Assert(aplan->plan.righttree == NULL); + Assert(aplan->ap.plan.lefttree == NULL); + Assert(aplan->ap.plan.righttree == NULL); return (Plan *) aplan; } @@ -1958,10 +1958,10 @@ set_mergeappend_references(PlannerInfo *root, * or check quals. If it's got exactly one child plan, then it's not * doing anything useful at all, and we can strip it out. 
*/ - Assert(mplan->plan.qual == NIL); + Assert(mplan->ap.plan.qual == NIL); /* First, we gotta recurse on the children */ - foreach(l, mplan->mergeplans) + foreach(l, mplan->ap.subplans) { lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset); } @@ -1975,11 +1975,11 @@ set_mergeappend_references(PlannerInfo *root, * multiple times. (If you change these rules, update * create_merge_append_path to match.) */ - if (list_length(mplan->mergeplans) == 1) + if (list_length(mplan->ap.subplans) == 1) { - Plan *p = (Plan *) linitial(mplan->mergeplans); + Plan *p = (Plan *) linitial(mplan->ap.subplans); - if (p->parallel_aware == mplan->plan.parallel_aware) + if (p->parallel_aware == mplan->ap.plan.parallel_aware) { Plan *result; @@ -1987,7 +1987,7 @@ set_mergeappend_references(PlannerInfo *root, /* Remember that we removed a MergeAppend */ record_elided_node(root->glob, p->plan_node_id, T_MergeAppend, - offset_relid_set(mplan->apprelids, rtoffset)); + offset_relid_set(mplan->ap.apprelids, rtoffset)); return result; } @@ -2000,19 +2000,19 @@ set_mergeappend_references(PlannerInfo *root, */ set_dummy_tlist_references((Plan *) mplan, rtoffset); - mplan->apprelids = offset_relid_set(mplan->apprelids, rtoffset); + mplan->ap.apprelids = offset_relid_set(mplan->ap.apprelids, rtoffset); /* * Add PartitionPruneInfo, if any, to PlannerGlobal and update the index. * Also update the RT indexes present in it to add the offset. */ - if (mplan->part_prune_index >= 0) - mplan->part_prune_index = - register_partpruneinfo(root, mplan->part_prune_index, rtoffset); + if (mplan->ap.part_prune_index >= 0) + mplan->ap.part_prune_index = + register_partpruneinfo(root, mplan->ap.part_prune_index, rtoffset); /* We don't need to recurse to lefttree or righttree ... 
*/ - Assert(mplan->plan.lefttree == NULL); - Assert(mplan->plan.righttree == NULL); + Assert(mplan->ap.plan.lefttree == NULL); + Assert(mplan->ap.plan.righttree == NULL); return (Plan *) mplan; } diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index ccec1eaa7fe..2da13102a75 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -2904,7 +2904,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, case T_Append: { - foreach(l, ((Append *) plan)->appendplans) + foreach(l, ((Append *) plan)->ap.subplans) { context.paramids = bms_add_members(context.paramids, @@ -2919,7 +2919,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, case T_MergeAppend: { - foreach(l, ((MergeAppend *) plan)->mergeplans) + foreach(l, ((MergeAppend *) plan)->ap.subplans) { context.paramids = bms_add_members(context.paramids, diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 7bc12589e40..36a7d736fda 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -5520,9 +5520,9 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan) * natural choice. 
*/ if (IsA(plan, Append)) - dpns->outer_plan = linitial(((Append *) plan)->appendplans); + dpns->outer_plan = linitial(((Append *) plan)->ap.subplans); else if (IsA(plan, MergeAppend)) - dpns->outer_plan = linitial(((MergeAppend *) plan)->mergeplans); + dpns->outer_plan = linitial(((MergeAppend *) plan)->ap.subplans); else dpns->outer_plan = outerPlan(plan); @@ -8498,10 +8498,10 @@ resolve_special_varno(Node *node, deparse_context *context, if (IsA(dpns->plan, Append)) context->appendparents = bms_union(context->appendparents, - ((Append *) dpns->plan)->apprelids); + ((Append *) dpns->plan)->ap.apprelids); else if (IsA(dpns->plan, MergeAppend)) context->appendparents = bms_union(context->appendparents, - ((MergeAppend *) dpns->plan)->apprelids); + ((MergeAppend *) dpns->plan)->ap.apprelids); push_child_plan(dpns, dpns->outer_plan, &save_dpns); resolve_special_varno((Node *) tle->expr, context, diff --git a/src/include/executor/execAppend.h b/src/include/executor/execAppend.h new file mode 100644 index 00000000000..b6751c9b233 --- /dev/null +++ b/src/include/executor/execAppend.h @@ -0,0 +1,60 @@ +/*------------------------------------------------------------------------- + * execAppend.h + * Support functions for MergeAppend and Append nodes. 
+ * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/executor/execAppend.h + *------------------------------------------------------------------------- + */ + +#ifndef EXECAPPEND_H +#define EXECAPPEND_H + +#include "nodes/execnodes.h" + +void ExecInitAppender(AppenderState *state, + Appender *node, + EState *estate, + int eflags, + int first_partial_plan, + int *first_valid_partial_plan); + +void ExecEndAppender(AppenderState *node); + +void ExecReScanAppender(AppenderState *node); + +void ExecAppenderAsyncBegin(AppenderState *node); + +void ExecAppenderAsyncEventWait(AppenderState *node, + int timeout, + uint32 wait_event_info); + +/* Common part of classify_matching_subplans() for Append and MergeAppend */ +static inline bool +classify_matching_subplans_common(Bitmapset **valid_subplans, + Bitmapset *asyncplans, + Bitmapset **valid_asyncplans) +{ + Assert(*valid_asyncplans == NULL); + + /* Checked by classify_matching_subplans() */ + Assert(!bms_is_empty(*valid_subplans)); + + /* Nothing to do if there are no valid async subplans. */ + if (!bms_overlap(*valid_subplans, asyncplans)) + return false; + + /* Get valid async subplans. */ + *valid_asyncplans = bms_intersect(asyncplans, + *valid_subplans); + + /* Adjust the valid subplans to contain sync subplans only. */ + *valid_subplans = bms_del_members(*valid_subplans, + *valid_asyncplans); + return true; +} + +#endif /* EXECAPPEND_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 684e398f824..0b93c004727 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1484,6 +1484,36 @@ typedef struct ModifyTableState List *mt_mergeJoinConditions; } ModifyTableState; +/* ---------------- + * AppenderState information + * + * Common base for AppendState and MergeAppendState. 
+ * Contains fields shared by both node types: the array of subplan + * states, asynchronous execution infrastructure, and partition + * pruning state. + * ---------------- + */ +typedef struct AppenderState +{ + PlanState ps; /* its first field is NodeTag */ + PlanState **plans; /* array of PlanStates for my inputs */ + int nplans; + + /* Asynchronous execution state */ + Bitmapset *asyncplans; /* asynchronous plans indexes */ + int nasyncplans; /* # of asynchronous plans */ + AsyncRequest **asyncrequests; /* array of AsyncRequests */ + TupleTableSlot **asyncresults; /* unreturned results of async plans */ + Bitmapset *needrequest; /* asynchronous plans needing a new request */ + struct WaitEventSet *eventset; /* WaitEventSet for file descriptor waits */ + + /* Partition pruning state */ + struct PartitionPruneState *prune_state; + bool valid_subplans_identified; + Bitmapset *valid_subplans; + Bitmapset *valid_asyncplans; /* valid asynchronous plans indexes */ +} AppenderState; + /* ---------------- * AppendState information * @@ -1505,31 +1535,20 @@ struct PartitionPruneState; struct AppendState { - PlanState ps; /* its first field is NodeTag */ - PlanState **appendplans; /* array of PlanStates for my inputs */ - int as_nplans; + AppenderState as; + int as_whichplan; bool as_begun; /* false means need to initialize */ - Bitmapset *as_asyncplans; /* asynchronous plans indexes */ - int as_nasyncplans; /* # of asynchronous plans */ - AsyncRequest **as_asyncrequests; /* array of AsyncRequests */ - TupleTableSlot **as_asyncresults; /* unreturned results of async plans */ - int as_nasyncresults; /* # of valid entries in as_asyncresults */ - bool as_syncdone; /* true if all synchronous plans done in - * asynchronous mode, else false */ + int as_nasyncresults; /* # of valid entries in asyncresults */ + bool as_syncdone; /* all sync plans done in async mode? 
*/ int as_nasyncremain; /* # of remaining asynchronous plans */ - Bitmapset *as_needrequest; /* asynchronous plans needing a new request */ - struct WaitEventSet *as_eventset; /* WaitEventSet used to configure file - * descriptor wait events */ - int as_first_partial_plan; /* Index of 'appendplans' containing - * the first partial plan */ - ParallelAppendState *as_pstate; /* parallel coordination info */ - Size pstate_len; /* size of parallel coordination info */ - struct PartitionPruneState *as_prune_state; - bool as_valid_subplans_identified; /* is as_valid_subplans valid? */ - Bitmapset *as_valid_subplans; - Bitmapset *as_valid_asyncplans; /* valid asynchronous plans indexes */ - bool (*choose_next_subplan) (AppendState *); + int as_first_partial_plan; /* index in 'plans' of first partial plan */ + + /* Parallel append specific */ + ParallelAppendState *as_pstate; + Size pstate_len; + + bool (*choose_next_subplan) (struct AppendState *); }; /* ---------------- @@ -1549,16 +1568,13 @@ struct AppendState */ typedef struct MergeAppendState { - PlanState ps; /* its first field is NodeTag */ - PlanState **mergeplans; /* array of PlanStates for my inputs */ - int ms_nplans; + AppenderState ms; + int ms_nkeys; SortSupport ms_sortkeys; /* array of length ms_nkeys */ - TupleTableSlot **ms_slots; /* array of length ms_nplans */ + TupleTableSlot **ms_slots; /* array of length ms.nplans */ struct binaryheap *ms_heap; /* binary heap of slot indices */ bool ms_initialized; /* are subplans started? */ - struct PartitionPruneState *ms_prune_state; - Bitmapset *ms_valid_subplans; } MergeAppendState; /* ---------------- diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index b6185825fcb..cdfd29e8ae0 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -386,6 +386,29 @@ typedef struct ModifyTable struct PartitionPruneInfo; /* forward reference to struct below */
+ * Contains fields shared by both node types: the list of subplans, + * appendrel identifiers, and run-time partition pruning info. + * ---------------- + */ +typedef struct Appender +{ + Plan plan; /* its first field is NodeTag */ + Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */ + List *child_append_relid_sets; /* sets of RTIs of appendrels + * consolidated into this node */ + List *subplans; /* List of Plans (formerly + * appendplans/mergeplans) */ + + /* + * Index into PlannedStmt.partPruneInfos and parallel lists in EState. Set + * to -1 if no run-time pruning is used. + */ + int part_prune_index; +} Appender; + /* ---------------- * Append node - * Generate the concatenation of the results of sub-plans. @@ -393,32 +416,16 @@ struct PartitionPruneInfo; /* forward reference to struct below */ */ typedef struct Append { - Plan plan; - - /* RTIs of appendrel(s) formed by this node */ - Bitmapset *apprelids; - - /* sets of RTIs of appendrels consolidated into this node */ - List *child_append_relid_sets; - - /* plans to run */ - List *appendplans; + Appender ap; /* # of asynchronous plans */ int nasyncplans; /* - * All 'appendplans' preceding this index are non-partial plans. All - * 'appendplans' from this index onwards are partial plans. + * All 'subplans' preceding this index are non-partial plans. All + * 'subplans' from this index onwards are partial plans. */ int first_partial_plan; - - /* - * Index into PlannedStmt.partPruneInfos and parallel lists in EState: - * es_part_prune_states and es_part_prune_results. Set to -1 if no - * run-time pruning is used. 
- */ - int part_prune_index; } Append; /* ---------------- @@ -428,16 +435,7 @@ typedef struct Append */ typedef struct MergeAppend { - Plan plan; - - /* RTIs of appendrel(s) formed by this node */ - Bitmapset *apprelids; - - /* sets of RTIs of appendrels consolidated into this node */ - List *child_append_relid_sets; - - /* plans to run */ - List *mergeplans; + Appender ap; /* these fields are just like the sort-key info in struct Sort: */ @@ -455,13 +453,6 @@ typedef struct MergeAppend /* NULLS FIRST/LAST directions */ bool *nullsFirst pg_node_attr(array_size(numCols)); - - /* - * Index into PlannedStmt.partPruneInfos and parallel lists in EState: - * es_part_prune_states and es_part_prune_results. Set to -1 if no - * run-time pruning is used. - */ - int part_prune_index; } MergeAppend; /* ---------------- diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e3c1007abdf..c4899b03d9a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -126,6 +126,8 @@ AnlExprData AnlIndexData AnyArrayType Append +Appender +AppenderState AppendPath AppendPathInput AppendRelInfo -- 2.39.5 (Apple Git-154)