From 40df802e38e18c96a4672062f1305405c1641a4a Mon Sep 17 00:00:00 2001 From: Juan Jose Santamaria Flecha Date: Mon, 14 Aug 2023 11:10:06 -0400 Subject: [PATCH] =?UTF-8?q?Allow=20parallel=20plan=20for=20referential=20i?= =?UTF-8?q?ntegrity=20checks=20Based=20on=20previous=20work=20from=20Fr?= =?UTF-8?q?=C3=A9d=C3=A9ric=20Yhuel?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/utils/adt/ri_triggers.c | 19 ++++++++++++++++--- src/test/regress/expected/alter_table.out | 10 ++++++++++ src/test/regress/sql/alter_table.sql | 11 +++++++++++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index 6945d99..b7f92ed 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -1383,8 +1383,10 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) const char *pk_only; int save_nestlevel; char workmembuf[32]; + char maxmntworkers[4]; int spi_result; SPIPlanPtr qplan; + SPIPrepareOptions options; riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false); @@ -1531,6 +1533,8 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) } } appendStringInfoChar(&querybuf, ')'); + elog(DEBUG2, "The RI_Initial_Check() query string built is \"%s\"", + querybuf.data); /* * Temporarily increase work_mem so that the check query can be executed @@ -1540,7 +1544,8 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) * this seems to meet the criteria for being considered a "maintenance" * operation, and accordingly we use maintenance_work_mem. However, we * must also set hash_mem_multiplier to 1, since it is surely not okay to - * let that get applied to the maintenance_work_mem value. + * let that get applied to the maintenance_work_mem value. In the same + * fashion, cap parallel processes by max_parallel_maintenance_workers. * * We use the equivalent of a function SET option to allow the setting to * persist for exactly the duration of the check query. guc.c also takes @@ -1549,12 +1554,18 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) save_nestlevel = NewGUCNestLevel(); snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem); + /* max_parallel_maintenance_workers <= 1024, so maxmntworkers is char[4] */ + snprintf(maxmntworkers, sizeof(maxmntworkers), "%d", + max_parallel_maintenance_workers); (void) set_config_option("work_mem", workmembuf, PGC_USERSET, PGC_S_SESSION, GUC_ACTION_SAVE, true, 0, false); (void) set_config_option("hash_mem_multiplier", "1", PGC_USERSET, PGC_S_SESSION, GUC_ACTION_SAVE, true, 0, false); + (void) set_config_option("max_parallel_workers_per_gather", maxmntworkers, + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_SAVE, true, 0, false); if (SPI_connect() != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed"); @@ -1563,10 +1574,12 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) * Generate the plan. We don't need to cache it, and there are no * arguments to the plan. */ - qplan = SPI_prepare(querybuf.data, 0, NULL); + memset(&options, 0, sizeof(options)); + options.cursorOptions |= CURSOR_OPT_PARALLEL_OK; + qplan = SPI_prepare_extended(querybuf.data, &options); if (qplan == NULL) - elog(ERROR, "SPI_prepare returned %s for %s", + elog(ERROR, "SPI_prepare_extended returned %s for %s", SPI_result_code_string(SPI_result), querybuf.data); /* diff --git a/src/test/regress/expected/alter_table.out b/src/test/regress/expected/alter_table.out index cd814ff..0d2a634 100644 --- a/src/test/regress/expected/alter_table.out +++ b/src/test/regress/expected/alter_table.out @@ -4658,3 +4658,13 @@ drop publication pub1; drop schema alter1 cascade; drop schema alter2 cascade; NOTICE: drop cascades to table alter2.t1 +-- ALTER TABLE operations that can be parallel +CREATE TABLE parallel_pk_table (a int) WITH (autovacuum_enabled = off); +CREATE TABLE parallel_fk_table (a int) WITH (autovacuum_enabled = off); +SET max_parallel_maintenance_workers TO 4; +SET parallel_setup_cost TO 0; +SET parallel_tuple_cost TO 0; +SET parallel_leader_participation TO 0; +SET min_parallel_table_scan_size TO 0; +ALTER TABLE parallel_pk_table ADD PRIMARY KEY (a); +ALTER TABLE parallel_fk_table ADD CONSTRAINT parallel_fk FOREIGN KEY (a) REFERENCES parallel_pk_table (a); diff --git a/src/test/regress/sql/alter_table.sql b/src/test/regress/sql/alter_table.sql index ff8c498..6256bdb 100644 --- a/src/test/regress/sql/alter_table.sql +++ b/src/test/regress/sql/alter_table.sql @@ -3066,3 +3066,14 @@ alter table alter1.t1 set schema alter2; drop publication pub1; drop schema alter1 cascade; drop schema alter2 cascade; + +-- ALTER TABLE operations that can be parallel +CREATE TABLE parallel_pk_table (a int) WITH (autovacuum_enabled = off); +CREATE TABLE parallel_fk_table (a int) WITH (autovacuum_enabled = off); +SET max_parallel_maintenance_workers TO 4; +SET parallel_setup_cost TO 0; +SET parallel_tuple_cost TO 0; +SET parallel_leader_participation TO 0; +SET min_parallel_table_scan_size TO 0; +ALTER TABLE parallel_pk_table ADD PRIMARY KEY (a); +ALTER TABLE parallel_fk_table ADD CONSTRAINT parallel_fk FOREIGN KEY (a) REFERENCES parallel_pk_table (a); -- 2.11.0