diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 441445927e..c46e32ba37 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -160,7 +160,6 @@ typedef enum TBlockState TBLOCK_BEGIN, /* starting transaction block */ TBLOCK_INPROGRESS, /* live transaction */ TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN */ - TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */ TBLOCK_END, /* COMMIT received */ TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */ TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */ @@ -994,7 +993,6 @@ ExitParallelMode(void) TransactionState s = CurrentTransactionState; Assert(s->parallelModeLevel > 0); - Assert(s->parallelModeLevel > 1 || !ParallelContextActive()); --s->parallelModeLevel; } @@ -2076,7 +2074,7 @@ CommitTransaction(void) TransactionId latestXid; bool is_parallel_worker; - is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); + is_parallel_worker = (ParallelCurrentXids != NULL); /* Enforce parallel mode restrictions during parallel worker commit. */ if (is_parallel_worker) @@ -2658,7 +2656,7 @@ AbortTransaction(void) /* * check the current transaction state */ - is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); + is_parallel_worker = (ParallelCurrentXids != NULL); if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE) elog(WARNING, "AbortTransaction while in %s state", TransStateAsString(s->state)); @@ -2877,7 +2875,6 @@ StartTransactionCommand(void) /* These cases are invalid. */ case TBLOCK_STARTED: case TBLOCK_BEGIN: - case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -2948,11 +2945,9 @@ CommitTransactionCommand(void) /* * These shouldn't happen. TBLOCK_DEFAULT means the previous * StartTransactionCommand didn't set the STARTED state - * appropriately, while TBLOCK_PARALLEL_INPROGRESS should be ended - * by EndParallelWorkerTransaction(), not this function. + * appropriately. */ case TBLOCK_DEFAULT: - case TBLOCK_PARALLEL_INPROGRESS: elog(FATAL, "CommitTransactionCommand: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -3265,7 +3260,6 @@ AbortCurrentTransaction(void) * ABORT state. We will stay in ABORT until we get a ROLLBACK. */ case TBLOCK_INPROGRESS: - case TBLOCK_PARALLEL_INPROGRESS: AbortTransaction(); s->blockState = TBLOCK_ABORT; /* CleanupTransaction happens when we exit TBLOCK_ABORT_END */ @@ -3660,7 +3654,6 @@ BeginTransactionBlock(void) * Already a transaction block in progress. */ case TBLOCK_INPROGRESS: - case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBINPROGRESS: case TBLOCK_ABORT: case TBLOCK_SUBABORT: @@ -3868,15 +3861,6 @@ EndTransactionBlock(bool chain) result = true; break; - /* - * The user issued a COMMIT that somehow ran inside a parallel - * worker. We can't cope with that. - */ - case TBLOCK_PARALLEL_INPROGRESS: - ereport(FATAL, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot commit during a parallel operation"))); - break; /* These cases are invalid. */ case TBLOCK_DEFAULT: @@ -3991,16 +3975,6 @@ UserAbortTransactionBlock(bool chain) s->blockState = TBLOCK_ABORT_PENDING; break; - /* - * The user issued an ABORT that somehow ran inside a parallel - * worker. We can't cope with that. - */ - case TBLOCK_PARALLEL_INPROGRESS: - ereport(FATAL, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot abort during a parallel operation"))); - break; - /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: @@ -4141,7 +4115,6 @@ DefineSavepoint(const char *name) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: - case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -4217,7 +4190,6 @@ ReleaseSavepoint(const char *name) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: - case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -4328,7 +4300,6 @@ RollbackToSavepoint(const char *name) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: - case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -4408,21 +4379,6 @@ BeginInternalSubTransaction(const char *name) { TransactionState s = CurrentTransactionState; - /* - * Workers synchronize transaction state at the beginning of each parallel - * operation, so we can't account for new subtransactions after that - * point. We might be able to make an exception for the type of - * subtransaction established by this function, which is typically used in - * contexts where we're going to release or roll back the subtransaction - * before proceeding further, so that no enduring change to the - * transaction state occurs. For now, however, we prohibit this case along - * with all the others. - */ - if (IsInParallelMode()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot start subtransactions during a parallel operation"))); - switch (s->blockState) { case TBLOCK_STARTED: @@ -4446,7 +4402,6 @@ BeginInternalSubTransaction(const char *name) /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: - case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_SUBRELEASE: case TBLOCK_SUBCOMMIT: @@ -4479,18 +4434,6 @@ ReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; - /* - * Workers synchronize transaction state at the beginning of each parallel - * operation, so we can't account for commit of subtransactions after that - * point. This should not happen anyway. Code calling this would - * typically have called BeginInternalSubTransaction() first, failing - * there. - */ - if (IsInParallelMode()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot commit subtransactions during a parallel operation"))); - if (s->blockState != TBLOCK_SUBINPROGRESS) elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s", BlockStateAsString(s->blockState)); @@ -4533,7 +4476,6 @@ RollbackAndReleaseCurrentSubTransaction(void) case TBLOCK_STARTED: case TBLOCK_BEGIN: case TBLOCK_IMPLICIT_INPROGRESS: - case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_INPROGRESS: case TBLOCK_END: @@ -4614,7 +4556,6 @@ AbortOutOfAnyTransaction(void) case TBLOCK_BEGIN: case TBLOCK_INPROGRESS: case TBLOCK_IMPLICIT_INPROGRESS: - case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_END: case TBLOCK_ABORT_PENDING: case TBLOCK_PREPARE: @@ -4725,7 +4666,6 @@ TransactionBlockStatusCode(void) case TBLOCK_SUBBEGIN: case TBLOCK_INPROGRESS: case TBLOCK_IMPLICIT_INPROGRESS: - case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBINPROGRESS: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -5147,7 +5087,7 @@ PushTransaction(void) s->blockState = TBLOCK_SUBBEGIN; GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; - s->parallelModeLevel = 0; + s->parallelModeLevel = p->parallelModeLevel; /* Inherit from parent */ s->assigned = false; CurrentTransactionState = s; @@ -5317,7 +5257,7 @@ StartParallelWorkerTransaction(char *tstatespace) nParallelCurrentXids = tstate->nParallelCurrentXids; ParallelCurrentXids = &tstate->parallelCurrentXids[0]; - CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS; + CurrentTransactionState->blockState = TBLOCK_INPROGRESS; } /* @@ -5327,7 +5267,7 @@ StartParallelWorkerTransaction(char *tstatespace) void EndParallelWorkerTransaction(void) { - Assert(CurrentTransactionState->blockState == TBLOCK_PARALLEL_INPROGRESS); + Assert(CurrentTransactionState->blockState == TBLOCK_INPROGRESS); CommitTransaction(); CurrentTransactionState->blockState = TBLOCK_DEFAULT; } @@ -5401,8 +5341,6 @@ BlockStateAsString(TBlockState blockState) return "INPROGRESS"; case TBLOCK_IMPLICIT_INPROGRESS: return "IMPLICIT_INPROGRESS"; - case TBLOCK_PARALLEL_INPROGRESS: - return "PARALLEL_INPROGRESS"; case TBLOCK_END: return "END"; case TBLOCK_ABORT: diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index b3ce4bae53..9dd93c4a5c 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1534,6 +1534,12 @@ ExecutePlan(EState *estate, use_parallel_mode = false; estate->es_use_parallel_mode = use_parallel_mode; + +#if 0 + elog(NOTICE, "ExecutePlan calling with use_parallel_mode %d. execute_once: %d, nestinglevel: %d, IsParallelWorker: %d, IsParallelMode: %d", + use_parallel_mode, execute_once, GetCurrentTransactionNestLevel(), IsParallelWorker(), IsInParallelMode()); +#endif + if (use_parallel_mode) EnterParallelMode(); diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 4ea1aa7dfd..625338a8b8 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -1194,4 +1194,88 @@ SELECT 1 FROM tenk1_vw_sec Filter: (f1 < tenk1_vw_sec.unique1) (9 rows) +-- test sub-transactions inside a parallel worker +CREATE TABLE subtrans_tab(id int); +INSERT INTO subtrans_tab(id) SELECT generate_series(1, 1000000, 1); +CREATE FUNCTION subtrans_func_2(inp int) RETURNS int AS +$$ +DECLARE + ret int = -1; + err bool = false; +BEGIN + SELECT id INTO ret FROM subtrans_tab WHERE id = inp; + delete from tab ; -- Should fail since we are inside worker + RAISE NOTICE 'delete successful'; -- Should not reach here if inside worker + RETURN ret; +EXCEPTION + WHEN division_by_zero THEN err = true; + WHEN others THEN + err = true; + RETURN ret; +END +$$ language plpgsql parallel safe; +CREATE FUNCTION subtrans_func(inp int) RETURNS int AS +$$ + DECLARE + tempint int; + ret int = -1; +BEGIN + BEGIN + BEGIN + -- Force division_by_zero for some inputs + tempint = 1/(inp%5); + -- This tries to generate new xid in yet another sub-transaction + tempint = subtrans_func_2(inp); + IF tempint <> inp THEN + RAISE NOTICE 'returned % from subtrans_tab did not match inp %', ret, inp; + END IF; + -- Even after func2() has failed in it's own sub-transaction, we + -- should be able to access tables from worker + SELECT id INTO ret FROM subtrans_tab WHERE id = inp; + IF ret <> inp THEN + RAISE NOTICE 'returned % from subtrans_tab did not match inp %', ret, inp; + END IF; + + EXCEPTION WHEN division_by_zero THEN + ret = -1; + END; + + -- Even after a divide-by-zero EXCEPTION above, we should be able to + -- access tables from worker + SELECT id INTO ret FROM subtrans_tab WHERE id = inp; + IF ret <> inp THEN + RAISE NOTICE 'returned % from subtrans_tab did not match inp %', ret, inp; + END IF; + + EXCEPTION WHEN division_by_zero THEN -- Just for sake of more nest levels + RAISE NOTICE 'division_by_zero EXCEPTION caught'; + END; + + RETURN ret; -- If all goes as expected above, ret should be equal to inp +END $$ language plpgsql parallel safe; +SELECT subtrans_func(id) FROM subtrans_tab WHERE id % 50000 = 0 order by 1; + subtrans_func +--------------- + 50000 + 100000 + 150000 + 200000 + 250000 + 300000 + 350000 + 400000 + 450000 + 500000 + 550000 + 600000 + 650000 + 700000 + 750000 + 800000 + 850000 + 900000 + 950000 + 1000000 +(20 rows) + rollback; diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index f924731248..aef02a860d 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -455,4 +455,67 @@ EXPLAIN (COSTS OFF) SELECT 1 FROM tenk1_vw_sec WHERE (SELECT sum(f1) FROM int4_tbl WHERE f1 < unique1) < 100; +-- test sub-transactions inside a parallel worker +CREATE TABLE subtrans_tab(id int); +INSERT INTO subtrans_tab(id) SELECT generate_series(1, 1000000, 1); +CREATE FUNCTION subtrans_func_2(inp int) RETURNS int AS +$$ +DECLARE + ret int = -1; + err bool = false; +BEGIN + SELECT id INTO ret FROM subtrans_tab WHERE id = inp; + delete from tab ; -- Should fail since we are inside worker + RAISE NOTICE 'delete successful'; -- Should not reach here if inside worker + RETURN ret; +EXCEPTION + WHEN division_by_zero THEN err = true; + WHEN others THEN + err = true; + RETURN ret; +END +$$ language plpgsql parallel safe; + +CREATE FUNCTION subtrans_func(inp int) RETURNS int AS +$$ + DECLARE + tempint int; + ret int = -1; +BEGIN + BEGIN + BEGIN + -- Force division_by_zero for some inputs + tempint = 1/(inp%5); + -- This tries to generate new xid in yet another sub-transaction + tempint = subtrans_func_2(inp); + IF tempint <> inp THEN + RAISE NOTICE 'returned % from subtrans_tab did not match inp %', ret, inp; + END IF; + -- Even after func2() has failed in it's own sub-transaction, we + -- should be able to access tables from worker + SELECT id INTO ret FROM subtrans_tab WHERE id = inp; + IF ret <> inp THEN + RAISE NOTICE 'returned % from subtrans_tab did not match inp %', ret, inp; + END IF; + + EXCEPTION WHEN division_by_zero THEN + ret = -1; + END; + + -- Even after a divide-by-zero EXCEPTION above, we should be able to + -- access tables from worker + SELECT id INTO ret FROM subtrans_tab WHERE id = inp; + IF ret <> inp THEN + RAISE NOTICE 'returned % from subtrans_tab did not match inp %', ret, inp; + END IF; + + EXCEPTION WHEN division_by_zero THEN -- Just for sake of more nest levels + RAISE NOTICE 'division_by_zero EXCEPTION caught'; + END; + + RETURN ret; -- If all goes as expected above, ret should be equal to inp +END $$ language plpgsql parallel safe; + +SELECT subtrans_func(id) FROM subtrans_tab WHERE id % 50000 = 0 order by 1; + rollback;