From a0976847db01a5160e0915b5f2127fd0ada10f49 Mon Sep 17 00:00:00 2001
From: houzj <houzj.fnst@cn.fujitsu.com>
Date: Mon, 22 Mar 2021 09:49:00 +0800
Subject: [PATCH] extend safery check to support parallel insert into fk
 relation

Currently, we cannot support parallel insert into a fk relation in all cases.

For example:
When inserting into a table with a foreign key, if the referenced table can also be modified by
the INSERT command, we will need to do CommandCounterIncrement to let the modification
on the referenced table be visible for the RI check, which is not supported in a parallel worker.

So, extend the parallel-safety check to treat the following cases (could modify referenced table) as parallel restricted:
1) foreign key and primary key are on the same table(INSERT's target table).
  (referenced and referencing are the same table)
2) referenced and referencing table are both partition of INSERT's target table.

(Note: modifyingCTE and function with modifying statement could also modifiy the referenced table,
However, the current parallel safety checks already treat it as unsafe, so we do not need to
do anything about it.)

Except for the above cases, we treat other cases as parallel safe.
---
 src/backend/optimizer/util/clauses.c | 85 +++++++++++++++++++++++++++++++++++-
 src/include/catalog/pg_proc.dat      |  8 ++--
 2 files changed, 88 insertions(+), 5 deletions(-)

diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index c6be4f8..b8bd0b0 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -100,6 +100,7 @@ typedef struct
 	RangeTblEntry *target_rte;	/* query's target relation if any */
 	CmdType		command_type;	/* query's command type */
 	PlannerGlobal *planner_global;	/* global info for planner invocation */
+	List	   *pk_rels;		/* OIDs of relations referenced by target relation */
 } max_parallel_hazard_context;
 
 static bool contain_agg_clause_walker(Node *node, void *context);
@@ -582,6 +583,7 @@ max_parallel_hazard(Query *parse, PlannerGlobal *glob)
 		rt_fetch(parse->resultRelation, parse->rtable) : NULL;
 	context.command_type = parse->commandType;
 	context.planner_global = glob;
+	context.pk_rels = NIL;
 	(void) max_parallel_hazard_walker((Node *) parse, &context);
 
 	return context.max_hazard;
@@ -618,6 +620,7 @@ is_parallel_safe(PlannerInfo *root, Node *node)
 	context.target_rte = NULL;
 	context.command_type = CMD_UNKNOWN;
 	context.planner_global = NULL;
+	context.pk_rels = NIL;
 
 	/*
 	 * The params that refer to the same or parent query level are considered
@@ -873,7 +876,7 @@ static bool
 target_rel_max_parallel_hazard(max_parallel_hazard_context *context)
 {
 	bool		max_hazard_found;
-
+	ListCell   *lc;
 	Relation	targetRel;
 
 	/*
@@ -884,6 +887,36 @@ target_rel_max_parallel_hazard(max_parallel_hazard_context *context)
 	max_hazard_found = target_rel_max_parallel_hazard_recurse(targetRel,
 															  context->command_type,
 															  context);
+
+	/*
+	 * Check if target relation is one of PK relations.
+	 */
+	if (!max_hazard_found && context->max_hazard == PROPARALLEL_SAFE)
+	{
+		if (list_member_oid(context->pk_rels, context->target_rte->relid))
+		{
+			max_hazard_found = max_parallel_hazard_test(PROPARALLEL_RESTRICTED,
+														context);
+		}
+	}
+
+	/*
+	 * check if any pk relation is partition of the target table.
+	 */
+	if (context->max_hazard == PROPARALLEL_SAFE)
+	{
+		foreach(lc, context->pk_rels)
+		{
+			if (list_member_oid(context->planner_global->partitionOids,
+							   lfirst_oid(lc)))
+			{
+				max_hazard_found = max_parallel_hazard_test(PROPARALLEL_RESTRICTED,
+															context);
+				break;
+			}
+		}
+	}
+
 	table_close(targetRel, NoLock);
 
 	return max_hazard_found;
@@ -971,10 +1004,60 @@ target_rel_trigger_max_parallel_hazard(Relation rel,
 	 */
 	for (i = 0; i < rel->trigdesc->numtriggers; i++)
 	{
+		int			trigtype;
 		Oid			tgfoid = rel->trigdesc->triggers[i].tgfoid;
 
 		if (max_parallel_hazard_test(func_parallel(tgfoid), context))
 			return true;
+
+		/*
+		 * If the trigger type is RI_TRIGGER_FK, this indicates a FK exists in
+		 * the relation, this should result in creation of new CommandIds if
+		 * PK relation is modified.
+		 *
+		 * However, creation of new CommandIds is not supported in a
+		 * parallel worker(but is safe in the parallel leader). So, treat all
+		 * scenarios that may modify PK relation as parallel-restricted.
+		 */
+		trigtype = RI_FKey_trigger_type(tgfoid);
+		if (trigtype == RI_TRIGGER_FK)
+		{
+			HeapTuple	tup;
+			Form_pg_constraint conForm;
+			Oid constraintOid;
+
+			constraintOid = rel->trigdesc->triggers[i].tgconstraint;
+
+			/*
+			 * Fetch the pg_constraint row so we can fill in the entry.
+			 */
+			tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constraintOid));
+			if (!HeapTupleIsValid(tup)) /* should not happen */
+				elog(ERROR, "cache lookup failed for constraint %u",
+					constraintOid);
+
+			conForm = (Form_pg_constraint) GETSTRUCT(tup);
+			if (conForm->contype != CONSTRAINT_FOREIGN) /* should not happen */
+				elog(ERROR, "constraint %u is not a foreign key constraint",
+					constraintOid);
+
+			/*
+			 * If FK relation and PK relation are not the same,
+			 * they could be partitions of the same table.
+			 * Remember Oids of PK relation for the later check.
+			 */
+			if (conForm->confrelid != conForm->conrelid)
+				context->pk_rels = lappend_oid(context->pk_rels, conForm->confrelid);
+
+			/*
+			 * If FK relation and PK relation are the same, the PK relation
+			 * could be modfied.
+			 */
+			else if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
+				return true;
+
+			ReleaseSysCache(tup);
+		}
 	}
 
 	return false;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index e259531..64db87e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -3745,11 +3745,11 @@
 
 # Generic referential integrity constraint triggers
 { oid => '1644', descr => 'referential integrity FOREIGN KEY ... REFERENCES',
-  proname => 'RI_FKey_check_ins', provolatile => 'v', proparallel => 'r',
-  prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_ins' },
+  proname => 'RI_FKey_check_ins', provolatile => 'v', prorettype => 'trigger',
+  proargtypes => '', prosrc => 'RI_FKey_check_ins' },
 { oid => '1645', descr => 'referential integrity FOREIGN KEY ... REFERENCES',
-  proname => 'RI_FKey_check_upd', provolatile => 'v', proparallel => 'r',
-  prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_upd' },
+  proname => 'RI_FKey_check_upd', provolatile => 'v', prorettype => 'trigger',
+  proargtypes => '', prosrc => 'RI_FKey_check_upd' },
 { oid => '1646', descr => 'referential integrity ON DELETE CASCADE',
   proname => 'RI_FKey_cascade_del', provolatile => 'v', prorettype => 'trigger',
   proargtypes => '', prosrc => 'RI_FKey_cascade_del' },
-- 
2.7.2.windows.1

