From 9e043f64e34eb0ebb104d19ff532365551b89ab6 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 29 Mar 2023 21:04:32 +0800 Subject: [PATCH 3/5] Deparser for INDEX DDL commands --- doc/src/sgml/logical-replication.sgml | 12 +- src/backend/commands/ddl_deparse.c | 375 ++++++++++++++++++++++++++ 2 files changed, 384 insertions(+), 3 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 9b5663e1f5..d916610b46 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1493,6 +1493,12 @@ test_sub=# SELECT * FROM t1 ORDER BY id; + + + index: this option enables replication of Index DDL commands, + which include CREATE/ALTER/DROP INDEX. + + @@ -1593,7 +1599,7 @@ ALTER PUBLICATION mypub SET (ddl = 'table'); ALTER INDEX - - + X @@ -1873,7 +1879,7 @@ ALTER PUBLICATION mypub SET (ddl = 'table'); CREATE INDEX - - + X @@ -2128,7 +2134,7 @@ ALTER PUBLICATION mypub SET (ddl = 'table'); DROP INDEX - - + X diff --git a/src/backend/commands/ddl_deparse.c b/src/backend/commands/ddl_deparse.c index 9098220a9c..57befc7855 100644 --- a/src/backend/commands/ddl_deparse.c +++ b/src/backend/commands/ddl_deparse.c @@ -1014,6 +1014,244 @@ obtainConstraints(List *elements, Oid relationId, Oid domainId) return elements; } +/* + * Return an index definition, split into several pieces. + * + * A large amount of code is duplicated from pg_get_indexdef_worker, but + * control flow is different enough that it doesn't seem worth keeping them + * together. + */ +static void +pg_get_indexdef_detailed(Oid indexrelid, + char **index_am, + char **definition, + char **reloptions, + char **tablespace, + char **whereClause) +{ + HeapTuple ht_idx; + HeapTuple ht_idxrel; + HeapTuple ht_am; + Form_pg_index idxrec; + Form_pg_class idxrelrec; + Form_pg_am amrec; + IndexAmRoutine *amroutine; + List *indexprs; + ListCell *indexpr_item; + List *context; + Oid indrelid; + int keyno; + Datum indcollDatum; + Datum indclassDatum; + Datum indoptionDatum; + bool isnull; + oidvector *indcollation; + oidvector *indclass; + int2vector *indoption; + StringInfoData definitionBuf; + + *tablespace = NULL; + *whereClause = NULL; + + /* Fetch the pg_index tuple by the Oid of the index */ + ht_idx = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexrelid)); + if (!HeapTupleIsValid(ht_idx)) + elog(ERROR, "cache lookup failed for index with OID %u", indexrelid); + idxrec = (Form_pg_index) GETSTRUCT(ht_idx); + + indrelid = idxrec->indrelid; + Assert(indexrelid == idxrec->indexrelid); + + /* Must get indcollation, indclass, and indoption the hard way */ + indcollDatum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indcollation, &isnull); + Assert(!isnull); + indcollation = (oidvector *) DatumGetPointer(indcollDatum); + + indclassDatum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indclass, &isnull); + Assert(!isnull); + indclass = (oidvector *) DatumGetPointer(indclassDatum); + + indoptionDatum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indoption, &isnull); + Assert(!isnull); + indoption = (int2vector *) DatumGetPointer(indoptionDatum); + + /* Fetch the pg_class tuple of the index relation */ + ht_idxrel = SearchSysCache1(RELOID, ObjectIdGetDatum(indexrelid)); + if (!HeapTupleIsValid(ht_idxrel)) + elog(ERROR, "cache lookup failed for relation with OID %u", indexrelid); + idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel); + + /* Fetch the pg_am tuple of the index' access method */ + ht_am = SearchSysCache1(AMOID, ObjectIdGetDatum(idxrelrec->relam)); + if (!HeapTupleIsValid(ht_am)) + elog(ERROR, "cache lookup failed for access method with OID %u", + idxrelrec->relam); + amrec = (Form_pg_am) GETSTRUCT(ht_am); + + /* + * Get the index expressions, if any. (NOTE: we do not use the relcache + * versions of the expressions and predicate, because we want to display + * non-const-folded expressions.) + */ + if (!heap_attisnull(ht_idx, Anum_pg_index_indexprs, NULL)) + { + Datum exprsDatum; + char *exprsString; + + exprsDatum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indexprs, &isnull); + Assert(!isnull); + exprsString = TextDatumGetCString(exprsDatum); + indexprs = (List *) stringToNode(exprsString); + pfree(exprsString); + } + else + indexprs = NIL; + + indexpr_item = list_head(indexprs); + + context = deparse_context_for(get_rel_name(indrelid), indrelid); + + initStringInfo(&definitionBuf); + + /* Output index AM */ + *index_am = pstrdup(quote_identifier(NameStr(amrec->amname))); + + /* Fetch the index AM's API struct */ + amroutine = GetIndexAmRoutine(amrec->amhandler); + + /* + * Output index definition. Note the outer parens must be supplied by + * caller. + */ + appendStringInfoString(&definitionBuf, "("); + for (keyno = 0; keyno < idxrec->indnatts; keyno++) + { + AttrNumber attnum = idxrec->indkey.values[keyno]; + int16 opt = indoption->values[keyno]; + Oid keycoltype; + Oid keycolcollation; + + /* Print INCLUDE to divide key and non-key attrs. */ + if (keyno == idxrec->indnkeyatts) + { + appendStringInfoString(&definitionBuf, ") INCLUDE ("); + } + else + appendStringInfoString(&definitionBuf, keyno == 0 ? "" : ", "); + + if (attnum != 0) + { + /* Simple index column */ + char *attname; + int32 keycoltypmod; + + attname = get_attname(indrelid, attnum, false); + appendStringInfoString(&definitionBuf, quote_identifier(attname)); + get_atttypetypmodcoll(indrelid, attnum, + &keycoltype, &keycoltypmod, + &keycolcollation); + } + else + { + /* Expressional index */ + Node *indexkey; + char *str; + + if (indexpr_item == NULL) + elog(ERROR, "too few entries in indexprs list"); + indexkey = (Node *) lfirst(indexpr_item); + indexpr_item = lnext(indexprs, indexpr_item); + + /* Deparse */ + str = deparse_expression(indexkey, context, false, false); + + /* Need parens if it's not a bare function call */ + if (indexkey && IsA(indexkey, FuncExpr) && + ((FuncExpr *) indexkey)->funcformat == COERCE_EXPLICIT_CALL) + appendStringInfoString(&definitionBuf, str); + else + appendStringInfo(&definitionBuf, "(%s)", str); + + keycoltype = exprType(indexkey); + keycolcollation = exprCollation(indexkey); + } + + /* Print additional decoration for (selected) key columns, even if default */ + if (keyno < idxrec->indnkeyatts) + { + Oid indcoll = indcollation->values[keyno]; + if (OidIsValid(indcoll)) + appendStringInfo(&definitionBuf, " COLLATE %s", + generate_collation_name((indcoll))); + + /* Add the operator class name, even if default */ + get_opclass_name(indclass->values[keyno], InvalidOid, &definitionBuf); + + /* Add options if relevant */ + if (amroutine->amcanorder) + { + /* If it supports sort ordering, report DESC and NULLS opts */ + if (opt & INDOPTION_DESC) + { + appendStringInfoString(&definitionBuf, " DESC"); + /* NULLS FIRST is the default in this case */ + if (!(opt & INDOPTION_NULLS_FIRST)) + appendStringInfoString(&definitionBuf, " NULLS LAST"); + } + else + { + if (opt & INDOPTION_NULLS_FIRST) + appendStringInfoString(&definitionBuf, " NULLS FIRST"); + } + } + + /* XXX excludeOps thingy was here; do we need anything? */ + } + } + appendStringInfoString(&definitionBuf, ")"); + *definition = definitionBuf.data; + + /* Output reloptions */ + *reloptions = flatten_reloptions(indexrelid); + + /* Output tablespace */ + { + Oid tblspc; + + tblspc = get_rel_tablespace(indexrelid); + if (OidIsValid(tblspc)) + *tablespace = pstrdup(quote_identifier(get_tablespace_name(tblspc))); + } + + /* Report index predicate, if any */ + if (!heap_attisnull(ht_idx, Anum_pg_index_indpred, NULL)) + { + Node *node; + Datum predDatum; + char *predString; + + /* Convert text string to node tree */ + predDatum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indpred, &isnull); + Assert(!isnull); + predString = TextDatumGetCString(predDatum); + node = (Node *) stringToNode(predString); + pfree(predString); + + /* Deparse */ + *whereClause = deparse_expression(node, context, false, false); + } + + /* Clean up */ + ReleaseSysCache(ht_idx); + ReleaseSysCache(ht_idxrel); + ReleaseSysCache(ht_am); +} + /* * Obtain the deparsed default value for the given column of the given table. * @@ -1927,6 +2165,102 @@ deparse_TableElements(Relation relation, List *tableElements, List *dpcontext, return elements; } +/* + * Deparse an IndexStmt. + * + * Given an index OID and the parse tree that created it, return an ObjTree + * representing the creation command. + * + * If the index corresponds to a constraint, NULL is returned. + * + * Verbose syntax + * CREATE %{unique}s INDEX %{concurrently}s %{if_not_exists}s %{name}I ON + * %{only}s %{table}D USING %{index_am}s %{definition}s %{nulls_not_distinct}s + * %{with}s %{tablespace}s %{where_clause}s + */ +static ObjTree * +deparse_IndexStmt(Oid objectId, Node *parsetree) +{ + IndexStmt *node = (IndexStmt *) parsetree; + ObjTree *ret; + ObjTree *tmp_obj; + Relation idxrel; + Relation heaprel; + char *index_am; + char *definition; + char *reloptions; + char *tablespace; + char *whereClause; + + if (node->primary || node->isconstraint) + { + /* + * Indexes for PRIMARY KEY and other constraints are output + * separately; return empty here. + */ + return NULL; + } + + idxrel = relation_open(objectId, AccessShareLock); + heaprel = relation_open(idxrel->rd_index->indrelid, AccessShareLock); + + pg_get_indexdef_detailed(objectId, + &index_am, &definition, &reloptions, + &tablespace, &whereClause); + + ret = new_objtree_VA("CREATE %{unique}s INDEX %{concurrently}s %{if_not_exists}s %{name}I ON %{only}s %{table}D USING %{index_am}s %{definition}s", 8, + "unique", ObjTypeString, + node->unique ? "UNIQUE" : "", + "concurrently", ObjTypeString, + node->concurrent ? "CONCURRENTLY" : "", + "if_not_exists", ObjTypeString, + node->if_not_exists ? "IF NOT EXISTS" : "", + "only", ObjTypeString, + node->relation->inh ? "" : "ONLY", + "name", ObjTypeString, + RelationGetRelationName(idxrel), + "table", ObjTypeObject, + new_objtree_for_qualname(heaprel->rd_rel->relnamespace, + RelationGetRelationName(heaprel)), + "index_am", ObjTypeString, index_am, + "definition", ObjTypeString, definition); + + /* nulls_not_distinct */ + if (node->nulls_not_distinct) + append_format_string(ret, "NULLS NOT DISTINCT"); + else + append_format_string(ret, "NULLS DISTINCT"); + + /* reloptions */ + tmp_obj = new_objtree("WITH"); + if (reloptions) + append_string_object(tmp_obj, "(%{opts}s)", "opts", reloptions); + else + append_not_present(tmp_obj); + append_object_object(ret, "%{with}s", tmp_obj); + + /* tablespace */ + tmp_obj = new_objtree("TABLESPACE"); + if (tablespace) + append_string_object(tmp_obj, "%{tablespace}s", "tablespace", tablespace); + else + append_not_present(tmp_obj); + append_object_object(ret, "%{tablespace}s", tmp_obj); + + /* WHERE clause */ + tmp_obj = new_objtree("WHERE"); + if (whereClause) + append_string_object(tmp_obj, "%{where}s", "where", whereClause); + else + append_not_present(tmp_obj); + append_object_object(ret, "%{where_clause}s", tmp_obj); + + table_close(idxrel, AccessShareLock); + table_close(heaprel, AccessShareLock); + + return ret; +} + /* * Deparse a CreateStmt (CREATE TABLE). * @@ -3142,6 +3476,7 @@ deparse_RenameStmt(ObjectAddress address, Node *parsetree) */ switch (node->renameType) { + case OBJECT_INDEX: case OBJECT_TABLE: relation = relation_open(address.objectId, AccessShareLock); schemaId = RelationGetNamespace(relation); @@ -3224,6 +3559,40 @@ deparse_RenameStmt(ObjectAddress address, Node *parsetree) return ret; } +/* + * Deparse a AlterObjectDependsStmt (ALTER ... DEPENDS ON ...). + * + * Verbose syntax + * ALTER INDEX %{identity}D %{no}s DEPENDS ON EXTENSION %{newname}I + */ +static ObjTree * +deparse_AlterDependStmt(Oid objectId, Node *parsetree) +{ + AlterObjectDependsStmt *node = (AlterObjectDependsStmt *) parsetree; + ObjTree *ret = NULL; + + if (node->objectType == OBJECT_INDEX) + { + ObjTree *qualified; + Relation relation = relation_open(objectId, AccessShareLock); + + qualified = new_objtree_for_qualname(relation->rd_rel->relnamespace, + node->relation->relname); + relation_close(relation, AccessShareLock); + + ret = new_objtree_VA("ALTER INDEX %{identity}D %{no}s DEPENDS ON EXTENSION %{newname}I", 3, + "identity", ObjTypeObject, qualified, + "no", ObjTypeString, + node->remove ? "NO" : "", + "newname", ObjTypeString, strVal(node->extname)); + } + else + elog(LOG, "unrecognized node type in deparse command: %d", + (int) nodeTag(parsetree)); + + return ret; +} + /* * Handle deparsing of simple commands. * @@ -3246,6 +3615,9 @@ deparse_simple_command(CollectedCommand *cmd) /* This switch needs to handle everything that ProcessUtilitySlow does */ switch (nodeTag(parsetree)) { + case T_AlterObjectDependsStmt: + return deparse_AlterDependStmt(objectId, parsetree); + case T_AlterObjectSchemaStmt: return deparse_AlterObjectSchemaStmt(cmd->d.simple.address, parsetree, @@ -3257,6 +3629,9 @@ deparse_simple_command(CollectedCommand *cmd) case T_CreateStmt: return deparse_CreateStmt(objectId, parsetree); + case T_IndexStmt: + return deparse_IndexStmt(objectId, parsetree); + case T_RenameStmt: return deparse_RenameStmt(cmd->d.simple.address, parsetree); -- 2.39.1.windows.1