From c1ab5d01abbc65f6ebcd43edb5c6116bf122c6aa Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 29 Mar 2023 21:20:00 +0800 Subject: [PATCH 4/5] Apply the DDL change as that same user that executed the DDL on publisher. 1. Change event trigger functions to collect the current role in CollectedCommand. 2. Change Deparser function deparse_utility_command to encode owner role in the top-level element such as {myowner:role_name, fmt:..., identity:...} of the deparsed jsonb output for commands that create database objects. Also change function deparse_ddl_json_to_string to retrieve the myowner element from a jsonb string. 3. Introduce a new subscription option match_ddl_owner: when turned on, the apply worker will apply DDL messages in the role retrieved from the "myowner" field of the deparsed jsonb string. The default value of match_ddl_owner is on. --- src/backend/catalog/pg_subscription.c | 1 + src/backend/commands/ddl_deparse.c | 62 ++++++-- src/backend/commands/ddl_json.c | 25 +++- src/backend/commands/event_trigger.c | 5 + src/backend/commands/subscriptioncmds.c | 27 +++- src/backend/replication/logical/ddltrigger.c | 6 +- src/backend/replication/logical/worker.c | 22 ++- src/bin/pg_dump/pg_dump.c | 16 ++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 8 +- src/include/catalog/pg_subscription.h | 5 + src/include/tcop/ddl_deparse.h | 4 +- src/include/tcop/deparse_utility.h | 1 + src/test/regress/expected/subscription.out | 144 +++++++++---------- 14 files changed, 228 insertions(+), 99 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d322b9482c..d4f482ab3b 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->matchddlowner = subform->submatchddlowner; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/commands/ddl_deparse.c b/src/backend/commands/ddl_deparse.c index 57befc7855..7c4198bf46 100644 --- a/src/backend/commands/ddl_deparse.c +++ b/src/backend/commands/ddl_deparse.c @@ -165,7 +165,7 @@ static ObjElem *new_object(ObjType type, char *name); static ObjTree *new_objtree_for_qualname_id(Oid classId, Oid objectId); static ObjElem *new_object_object(ObjTree *value); static ObjTree *new_objtree_VA(char *fmt, int numobjs,...); -static JsonbValue *objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state); +static JsonbValue *objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state, char *owner); static char *RelationGetColumnDefault(Relation rel, AttrNumber attno, List *dpcontext, List **exprs); @@ -808,14 +808,39 @@ objtree_fmt_to_jsonb_element(JsonbParseState *state, ObjTree *tree) } /* - * Create a JSONB representation from an ObjTree. + * Process the role string into the output parse state. + */ +static void +role_to_jsonb_element(JsonbParseState *state, char *owner) +{ + JsonbValue key; + JsonbValue val; + + if (owner == NULL) + return; + + /* Push the key first */ + key.type = jbvString; + key.val.string.val = "myowner"; + key.val.string.len = strlen(key.val.string.val); + pushJsonbValue(&state, WJB_KEY, &key); + + /* Then process the role string */ + val.type = jbvString; + val.val.string.len = strlen(owner); + val.val.string.val = owner; + pushJsonbValue(&state, WJB_VALUE, &val); +} + +/* + * Create a JSONB representation from an ObjTree and its owner (if given). */ static Jsonb * -objtree_to_jsonb(ObjTree *tree) +objtree_to_jsonb(ObjTree *tree, char *owner) { JsonbValue *value; - value = objtree_to_jsonb_rec(tree, NULL); + value = objtree_to_jsonb_rec(tree, NULL, owner); return JsonbValueToJsonb(value); } @@ -867,7 +892,7 @@ objtree_to_jsonb_element(JsonbParseState *state, ObjElem *object, case ObjTypeObject: /* Recursively add the object into the existing parse state */ - objtree_to_jsonb_rec(object->value.object, state); + objtree_to_jsonb_rec(object->value.object, state, NULL); break; case ObjTypeArray: @@ -895,12 +920,13 @@ objtree_to_jsonb_element(JsonbParseState *state, ObjElem *object, * Recursive helper for objtree_to_jsonb. */ static JsonbValue * -objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state) +objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state, char *owner) { slist_iter iter; pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL); + role_to_jsonb_element(state, owner); objtree_fmt_to_jsonb_element(state, tree); slist_foreach(iter, &tree->params) @@ -3375,7 +3401,7 @@ deparse_drop_command(const char *objidentity, const char *objecttype, "present", ObjTypeBool, behavior == DROP_CASCADE); append_object_object(stmt, "%{cascade}s", tmp_obj); - jsonb = objtree_to_jsonb(stmt); + jsonb = objtree_to_jsonb(stmt, NULL /* Owner/role can be skipped for drop command */); command = JsonbToCString(&str, &jsonb->root, JSONB_ESTIMATED_LEN); return command; @@ -3599,7 +3625,7 @@ deparse_AlterDependStmt(Oid objectId, Node *parsetree) * This function should cover all cases handled in ProcessUtilitySlow. */ static ObjTree * -deparse_simple_command(CollectedCommand *cmd) +deparse_simple_command(CollectedCommand *cmd, bool *include_owner) { Oid objectId; Node *parsetree; @@ -3616,14 +3642,17 @@ deparse_simple_command(CollectedCommand *cmd) switch (nodeTag(parsetree)) { case T_AlterObjectDependsStmt: + *include_owner = false; return deparse_AlterDependStmt(objectId, parsetree); case T_AlterObjectSchemaStmt: + *include_owner = false; return deparse_AlterObjectSchemaStmt(cmd->d.simple.address, parsetree, cmd->d.simple.secondaryObject); case T_AlterOwnerStmt: + *include_owner = false; return deparse_AlterOwnerStmt(cmd->d.simple.address, parsetree); case T_CreateStmt: @@ -3633,6 +3662,7 @@ deparse_simple_command(CollectedCommand *cmd) return deparse_IndexStmt(objectId, parsetree); case T_RenameStmt: + *include_owner = false; return deparse_RenameStmt(cmd->d.simple.address, parsetree); default: @@ -3645,9 +3675,15 @@ deparse_simple_command(CollectedCommand *cmd) /* * Workhorse to deparse a CollectedCommand. + * + * include_owner indicates if the owner/role of the command should be + * included in the deparsed Json output. It is set to false for any commands + * that don't CREATE database objects (ALTER commands for example), this is + * to avoid encoding and sending the owner to downstream for replay as it is + * unnecessary for such commands. */ char * -deparse_utility_command(CollectedCommand *cmd, bool verbose_mode) +deparse_utility_command(CollectedCommand *cmd, bool include_owner, bool verbose_mode) { OverrideSearchPath *overridePath; MemoryContext oldcxt; @@ -3688,10 +3724,11 @@ deparse_utility_command(CollectedCommand *cmd, bool verbose_mode) switch (cmd->type) { case SCT_Simple: - tree = deparse_simple_command(cmd); + tree = deparse_simple_command(cmd, &include_owner); break; case SCT_AlterTable: tree = deparse_AlterRelation(cmd); + include_owner = false; break; case SCT_CreateTableAs: tree = deparse_CreateTableAsStmt(cmd); @@ -3706,7 +3743,8 @@ deparse_utility_command(CollectedCommand *cmd, bool verbose_mode) { Jsonb *jsonb; - jsonb = objtree_to_jsonb(tree); + jsonb = include_owner ? objtree_to_jsonb(tree, cmd->role) : + objtree_to_jsonb(tree, NULL); command = JsonbToCString(&str, &jsonb->root, JSONB_ESTIMATED_LEN); } @@ -3732,7 +3770,7 @@ ddl_deparse_to_json(PG_FUNCTION_ARGS) CollectedCommand *cmd = (CollectedCommand *) PG_GETARG_POINTER(0); char *command; - command = deparse_utility_command(cmd, true); + command = deparse_utility_command(cmd, false, true); if (command) PG_RETURN_TEXT_P(cstring_to_text(command)); diff --git a/src/backend/commands/ddl_json.c b/src/backend/commands/ddl_json.c index 3a57d2697c..76cefb9487 100644 --- a/src/backend/commands/ddl_json.c +++ b/src/backend/commands/ddl_json.c @@ -718,7 +718,7 @@ expand_jsonb_array(StringInfo buf, char *param, * Workhorse for ddl_deparse_expand_command. */ char * -deparse_ddl_json_to_string(char *json_str) +deparse_ddl_json_to_string(char *json_str, char** owner) { Datum d; Jsonb *jsonb; @@ -729,6 +729,27 @@ deparse_ddl_json_to_string(char *json_str) d = DirectFunctionCall1(jsonb_in, PointerGetDatum(json_str)); jsonb = (Jsonb *) DatumGetPointer(d); + if (owner != NULL) + { + const char *key = "myowner"; + JsonbValue *value; + + value = getKeyJsonValueFromContainer(&jsonb->root, key, strlen(key), NULL); + if (value) + { + char *str; + + /* value->val.string.val may not be NULL terminated */ + str = palloc(value->val.string.len + 1); + memcpy(str, value->val.string.val, value->val.string.len); + str[value->val.string.len] = '\0'; + *owner = str; + } + else + /* myowner is not given in this jsonb, e.g. for Drop Commands */ + *owner = NULL; + } + expand_fmt_recursive(buf, &jsonb->root); return buf->data; @@ -765,7 +786,7 @@ ddl_deparse_expand_command(PG_FUNCTION_ARGS) json_str = text_to_cstring(json); - PG_RETURN_TEXT_P(cstring_to_text(deparse_ddl_json_to_string(json_str))); + PG_RETURN_TEXT_P(cstring_to_text(deparse_ddl_json_to_string(json_str, NULL))); } /* diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 71d2c43afc..38c11f000c 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -881,6 +881,7 @@ EventTriggerTableInitWriteStart(Node *parsetree) command->type = (stmt->objtype == OBJECT_TABLE) ? SCT_CreateTableAs : SCT_Simple; command->in_extension = creating_extension; + command->role = GetUserNameFromId(GetUserId(), false); command->d.ctas.address = InvalidObjectAddress; command->d.ctas.real_create = NULL; command->parsetree = copyObject(parsetree); @@ -1626,6 +1627,7 @@ EventTriggerCollectSimpleCommand(ObjectAddress address, command->type = SCT_Simple; command->in_extension = creating_extension; + command->role = GetUserNameFromId(GetUserId(), false); command->d.simple.address = address; command->d.simple.secondaryObject = secondaryObject; @@ -1662,6 +1664,7 @@ EventTriggerAlterTableStart(Node *parsetree) command->type = SCT_AlterTable; command->in_extension = creating_extension; + command->role = GetUserNameFromId(GetUserId(), false); command->d.alterTable.classId = RelationRelationId; command->d.alterTable.objectId = InvalidOid; @@ -1929,6 +1932,7 @@ EventTriggerCollectGrant(InternalGrant *istmt) command = palloc(sizeof(CollectedCommand)); command->type = SCT_Grant; command->in_extension = creating_extension; + command->role = GetUserNameFromId(GetUserId(), false); command->d.grant.istmt = icopy; command->parsetree = NULL; @@ -1960,6 +1964,7 @@ EventTriggerCollectAlterOpFam(AlterOpFamilyStmt *stmt, Oid opfamoid, command = palloc(sizeof(CollectedCommand)); command->type = SCT_AlterOpFamily; command->in_extension = creating_extension; + command->role = GetUserNameFromId(GetUserId(), false); ObjectAddressSet(command->d.opfam.address, OperatorFamilyRelationId, opfamoid); command->d.opfam.operators = operators; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 93a238412a..a5c5f6e2fc 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -66,6 +66,7 @@ #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_LSN 0x00000800 #define SUBOPT_ORIGIN 0x00001000 +#define SUBOPT_MATCH_DDL_OWNER 0x00002000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -89,6 +90,7 @@ typedef struct SubOpts bool twophase; bool disableonerr; char *origin; + bool matchddlowner; XLogRecPtr lsn; } SubOpts; @@ -146,6 +148,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->disableonerr = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_MATCH_DDL_OWNER)) + opts->matchddlowner = true; /* Parse options */ foreach(lc, stmt_options) @@ -324,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_MATCH_DDL_OWNER) && + strcmp(defel->defname, "match_ddl_owner") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_MATCH_DDL_OWNER)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_MATCH_DDL_OWNER; + opts->matchddlowner = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -560,7 +573,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | - SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN); + SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN | + SUBOPT_MATCH_DDL_OWNER); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -649,6 +663,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, publicationListToArray(publications); values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + values[Anum_pg_subscription_submatchddlowner - 1] = BoolGetDatum(opts.matchddlowner); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -1054,7 +1069,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | - SUBOPT_ORIGIN); + SUBOPT_ORIGIN | SUBOPT_MATCH_DDL_OWNER); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1118,6 +1133,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_suborigin - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_MATCH_DDL_OWNER)) + { + values[Anum_pg_subscription_submatchddlowner - 1] + = BoolGetDatum(opts.matchddlowner); + replaces[Anum_pg_subscription_submatchddlowner - 1] + = true; + } + update_tuple = true; break; } diff --git a/src/backend/replication/logical/ddltrigger.c b/src/backend/replication/logical/ddltrigger.c index 70018a305c..1625dc1093 100644 --- a/src/backend/replication/logical/ddltrigger.c +++ b/src/backend/replication/logical/ddltrigger.c @@ -122,7 +122,7 @@ publication_deparse_table_rewrite(PG_FUNCTION_ARGS) if (relpersist != RELPERSISTENCE_TEMP) { /* Deparse the DDL command and WAL log it to allow decoding of the same. */ - json_string = deparse_utility_command(cmd, false); + json_string = deparse_utility_command(cmd, true, false); if (json_string != NULL) LogLogicalDDLMessage("deparse", cmd->d.alterTable.objectId, DCT_TableAlter, @@ -200,7 +200,7 @@ publication_deparse_ddl_command_end(PG_FUNCTION_ARGS) * Deparse the DDL command and WAL log it to allow decoding of the * same. */ - json_string = deparse_utility_command(cmd, false); + json_string = deparse_utility_command(cmd, true, false); if (json_string != NULL) LogLogicalDDLMessage("deparse", relid, type, json_string, @@ -350,7 +350,7 @@ publication_deparse_table_init_write(PG_FUNCTION_ARGS) return PointerGetDatum(NULL); /* Deparse the DDL command and WAL log it to allow decoding of the same. */ - json_string = deparse_utility_command(cmd, false); + json_string = deparse_utility_command(cmd, true, false); if (json_string != NULL) LogLogicalDDLMessage("deparse", cmd->d.simple.address.objectId, DCT_SimpleCmd, diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 9e053a3d83..6710893499 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3349,21 +3349,33 @@ apply_handle_ddl(StringInfo s) const char *prefix = NULL; char *message = NULL; char *ddl_command; + char *owner; Size sz; List *parsetree_list; ListCell *parsetree_item; DestReceiver *receiver; MemoryContext oldcontext; const char *save_debug_query_string = debug_query_string; + int save_nestlevel = 0; message = logicalrep_read_ddl(s, &lsn, &prefix, &sz); /* Make sure we are in a transaction command */ begin_replication_step(); - ddl_command = deparse_ddl_json_to_string(message); + ddl_command = deparse_ddl_json_to_string(message, &owner); debug_query_string = ddl_command; + if (MySubscription->matchddlowner && owner) + { + /* + * Set the current role to the owner that executed the command on the + * publication server. + */ + save_nestlevel = NewGUCNestLevel(); + SetConfigOption("role", owner, PGC_INTERNAL, PGC_S_OVERRIDE); + } + /* DestNone for logical replication */ receiver = CreateDestReceiver(DestNone); parsetree_list = pg_parse_query(ddl_command); @@ -3461,6 +3473,14 @@ apply_handle_ddl(StringInfo s) MemoryContextDelete(per_parsetree_context); } + /* + * Restore the GUC variables we set above. + */ + if (save_nestlevel > 0) + { + AtEOXact_GUC(true, save_nestlevel); + } + debug_query_string = save_debug_query_string; end_replication_step(); } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index fe50ddf591..4b3bb35752 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4622,6 +4622,7 @@ getSubscriptions(Archive *fout) int i_subsynccommit; int i_subpublications; int i_subbinary; + int i_submatchddlowner; int i, ntups; @@ -4674,9 +4675,14 @@ getSubscriptions(Archive *fout) LOGICALREP_TWOPHASE_STATE_DISABLED); if (fout->remoteVersion >= 160000) - appendPQExpBufferStr(query, " s.suborigin\n"); + appendPQExpBufferStr(query, + " s.suborigin,\n" + " s.submatchddlowner\n"); else - appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY); + appendPQExpBuffer(query, + " '%s' AS suborigin,\n" + " false AS submatchddlowner\n", + LOGICALREP_ORIGIN_ANY); appendPQExpBufferStr(query, "FROM pg_subscription s\n" @@ -4704,6 +4710,7 @@ getSubscriptions(Archive *fout) i_subtwophasestate = PQfnumber(res, "subtwophasestate"); i_subdisableonerr = PQfnumber(res, "subdisableonerr"); i_suborigin = PQfnumber(res, "suborigin"); + i_submatchddlowner = PQfnumber(res, "submatchddlowner"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4734,6 +4741,8 @@ getSubscriptions(Archive *fout) subinfo[i].subdisableonerr = pg_strdup(PQgetvalue(res, i, i_subdisableonerr)); subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); + subinfo[i].submatchddlowner = + pg_strdup(PQgetvalue(res, i, i_submatchddlowner)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4812,6 +4821,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0) appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin); + if (strcmp(subinfo->submatchddlowner, "f") == 0) + appendPQExpBufferStr(query, ", match_ddl_owner = false"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index a5952c53ff..ae90b2cd84 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -664,6 +664,7 @@ typedef struct _SubscriptionInfo char *suborigin; char *subsynccommit; char *subpublications; + char *submatchddlowner; } SubscriptionInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index a41500deed..700a858ac8 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6510,7 +6510,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6567,8 +6567,10 @@ describeSubscriptions(const char *pattern, bool verbose) if (pset.sversion >= 160000) appendPQExpBuffer(&buf, - ", suborigin AS \"%s\"\n", - gettext_noop("Origin")); + ", suborigin AS \"%s\"\n" + ", submatchddlowner AS \"%s\"\n", + gettext_noop("Origin"), + gettext_noop("Match DDL owner")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index b0f2a1705d..17af7c7750 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -88,6 +88,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subdisableonerr; /* True if a worker error should cause the * subscription to be disabled */ + bool submatchddlowner; /* True if replicated objects by DDL replication + * should match the original owner on the publisher */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -137,6 +140,8 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + bool matchddlowner; /* Indicates if replicated objects by DDL replication + * should match the original owner on the publisher */ } Subscription; /* Disallow streaming in-progress transactions. */ diff --git a/src/include/tcop/ddl_deparse.h b/src/include/tcop/ddl_deparse.h index 1a2702c5ac..2b7754bfac 100644 --- a/src/include/tcop/ddl_deparse.h +++ b/src/include/tcop/ddl_deparse.h @@ -14,8 +14,8 @@ #include "tcop/deparse_utility.h" -extern char *deparse_utility_command(CollectedCommand *cmd, bool verbose_mode); -extern char *deparse_ddl_json_to_string(char *jsonb); +extern char *deparse_utility_command(CollectedCommand *cmd, bool include_owner, bool verbose_mode); +extern char *deparse_ddl_json_to_string(char *jsonb, char** owner); extern char *deparse_drop_command(const char *objidentity, const char *objecttype, DropBehavior behavior); diff --git a/src/include/tcop/deparse_utility.h b/src/include/tcop/deparse_utility.h index a4a12377b8..87a761bb3e 100644 --- a/src/include/tcop/deparse_utility.h +++ b/src/include/tcop/deparse_utility.h @@ -48,6 +48,7 @@ typedef struct CollectedCommand CollectedCommandType type; bool in_extension; + char *role; Node *parsetree; union diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 3f99b14394..af894f75b9 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -143,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -163,10 +163,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -175,10 +175,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -210,10 +210,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -247,19 +247,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -271,27 +271,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -306,10 +306,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -324,10 +324,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -363,10 +363,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -375,10 +375,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -388,10 +388,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -404,18 +404,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); -- 2.39.1.windows.1