From 18f71a62fcdfbdcf490c16fac44e58a4a09086f5 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 16 Feb 2023 02:23:45 -0500 Subject: [PATCH 8/8] Allow replicated objects to have the same owner from the publisher. Specifically, the changes include: 1. Change event trigger functions to collect the current role in CollectedCommand. 2. Change Deparser function deparse_utility_command to encode owner role in the top-level element such as {myowner:role_name, fmt:..., identity:...} of the deparsed jsonb output for commands that create database objects. Also change function deparse_ddl_json_to_string to retrieve the myowner element from a jsonb string. 3. Introduce a new subscription option match_ddl_owner: when turned on, the apply worker will apply DDL messages in the role retrieved from the "myowner" field of the deparsed jsonb string. The default value of match_ddl_owner is on. --- src/backend/catalog/pg_subscription.c | 1 + src/backend/commands/ddl_deparse.c | 85 +++++++++-- src/backend/commands/ddl_json.c | 25 ++- src/backend/commands/event_trigger.c | 6 + src/backend/commands/subscriptioncmds.c | 27 +++- src/backend/replication/logical/ddltrigger.c | 6 +- src/backend/replication/logical/worker.c | 22 ++- src/bin/pg_dump/pg_dump.c | 16 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 8 +- src/include/catalog/pg_subscription.h | 5 + src/include/tcop/ddl_deparse.h | 4 +- src/include/tcop/deparse_utility.h | 1 + src/test/regress/expected/subscription.out | 144 +++++++++--------- .../subscription/t/032_ddl_replication.pl | 17 +++ 15 files changed, 269 insertions(+), 99 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index a56ae311c3..022aab0fa9 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->matchddlowner = subform->submatchddlowner; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/ddl_deparse.c b/src/backend/commands/ddl_deparse.c index fe2f715fc6..5eab104e12 100644 --- a/src/backend/commands/ddl_deparse.c +++ b/src/backend/commands/ddl_deparse.c @@ -169,7 +169,7 @@ static ObjElem *new_object_object(ObjTree *value); static ObjTree *new_objtree_VA(char *fmt, int numobjs,...); static ObjTree *new_objtree(char *fmt); static ObjElem *new_string_object(char *value); -static JsonbValue *objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state); +static JsonbValue *objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state, char *owner); static void pg_get_indexdef_detailed(Oid indexrelid, char **index_am, char **definition, @@ -990,14 +990,39 @@ objtree_fmt_to_jsonb_element(JsonbParseState *state, ObjTree *tree) } /* - * Create a JSONB representation from an ObjTree. + * Process the role string into the output parse state. + */ +static void +role_to_jsonb_element(JsonbParseState *state, char *owner) +{ + JsonbValue key; + JsonbValue val; + + if (owner == NULL) + return; + + /* Push the key first */ + key.type = jbvString; + key.val.string.val = "myowner"; + key.val.string.len = strlen(key.val.string.val); + pushJsonbValue(&state, WJB_KEY, &key); + + /* Then process the role string */ + val.type = jbvString; + val.val.string.len = strlen(owner); + val.val.string.val = owner; + pushJsonbValue(&state, WJB_VALUE, &val); +} + +/* + * Create a JSONB representation from an ObjTree and its owner (if given). */ static Jsonb * -objtree_to_jsonb(ObjTree *tree) +objtree_to_jsonb(ObjTree *tree, char *owner) { JsonbValue *value; - value = objtree_to_jsonb_rec(tree, NULL); + value = objtree_to_jsonb_rec(tree, NULL, owner); return JsonbValueToJsonb(value); } @@ -1049,7 +1074,7 @@ objtree_to_jsonb_element(JsonbParseState *state, ObjElem *object, case ObjTypeObject: /* Recursively add the object into the existing parse state */ - objtree_to_jsonb_rec(object->value.object, state); + objtree_to_jsonb_rec(object->value.object, state, NULL); break; case ObjTypeArray: @@ -1077,12 +1102,13 @@ objtree_to_jsonb_element(JsonbParseState *state, ObjElem *object, * Recursive helper for objtree_to_jsonb. */ static JsonbValue * -objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state) +objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state, char *owner) { slist_iter iter; pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL); + role_to_jsonb_element(state, owner); objtree_fmt_to_jsonb_element(state, tree); slist_foreach(iter, &tree->params) @@ -3709,7 +3735,7 @@ deparse_drop_command(const char *objidentity, const char *objecttype, "present", ObjTypeBool, behavior == DROP_CASCADE); append_object_object(stmt, "%{cascade}s", tmp_obj); - jsonb = objtree_to_jsonb(stmt); + jsonb = objtree_to_jsonb(stmt, NULL /* Owner/role can be skipped for drop command */); command = JsonbToCString(&str, &jsonb->root, JSONB_ESTIMATED_LEN); return command; @@ -9651,7 +9677,7 @@ deparse_CreateAmStmt(Oid objectId, Node *parsetree) * This function should cover all cases handled in ProcessUtilitySlow. */ static ObjTree * -deparse_simple_command(CollectedCommand *cmd) +deparse_simple_command(CollectedCommand *cmd, bool *include_owner) { Oid objectId; Node *parsetree; @@ -9668,64 +9694,83 @@ deparse_simple_command(CollectedCommand *cmd) switch (nodeTag(parsetree)) { case T_AlterCollationStmt: + *include_owner = false; return deparse_AlterCollation(objectId, parsetree); case T_AlterDomainStmt: + *include_owner = false; return deparse_AlterDomainStmt(objectId, parsetree, cmd->d.simple.secondaryObject); case T_AlterEnumStmt: + *include_owner = false; return deparse_AlterEnumStmt(objectId, parsetree); case T_AlterExtensionContentsStmt: + *include_owner = false; return deparse_AlterExtensionContentsStmt(objectId, parsetree, cmd->d.simple.secondaryObject); case T_AlterExtensionStmt: + *include_owner = false; return deparse_AlterExtensionStmt(objectId, parsetree); case T_AlterFdwStmt: + *include_owner = false; return deparse_AlterFdwStmt(objectId, parsetree); case T_AlterForeignServerStmt: + *include_owner = false; return deparse_AlterForeignServerStmt(objectId, parsetree); case T_AlterFunctionStmt: + *include_owner = false; return deparse_AlterFunction(objectId, parsetree); case T_AlterObjectDependsStmt: + *include_owner = false; return deparse_AlterDependStmt(objectId, parsetree); case T_AlterObjectSchemaStmt: + *include_owner = false; return deparse_AlterObjectSchemaStmt(cmd->d.simple.address, parsetree, cmd->d.simple.secondaryObject); case T_AlterOperatorStmt: + *include_owner = false; return deparse_AlterOperatorStmt(objectId, parsetree); case T_AlterOwnerStmt: + *include_owner = false; return deparse_AlterOwnerStmt(cmd->d.simple.address, parsetree); case T_AlterPolicyStmt: + *include_owner = false; return deparse_AlterPolicyStmt(objectId, parsetree); case T_AlterSeqStmt: + *include_owner = false; return deparse_AlterSeqStmt(objectId, parsetree); case T_AlterStatsStmt: + *include_owner = false; return deparse_AlterStatsStmt(objectId, parsetree); case T_AlterTSDictionaryStmt: + *include_owner = false; return deparse_AlterTSDictionaryStmt(objectId, parsetree); case T_AlterTypeStmt: + *include_owner = false; return deparse_AlterTypeSetStmt(objectId, parsetree); case T_AlterUserMappingStmt: + *include_owner = false; return deparse_AlterUserMappingStmt(objectId, parsetree); case T_CommentStmt: + *include_owner = false; return deparse_CommentStmt(cmd->d.simple.address, parsetree); case T_CompositeTypeStmt: @@ -9805,9 +9850,11 @@ deparse_simple_command(CollectedCommand *cmd) return deparse_IndexStmt(objectId, parsetree); case T_RefreshMatViewStmt: + *include_owner = false; return deparse_RefreshMatViewStmt(objectId, parsetree); case T_RenameStmt: + *include_owner = false; return deparse_RenameStmt(cmd->d.simple.address, parsetree); case T_RuleStmt: @@ -9826,9 +9873,15 @@ deparse_simple_command(CollectedCommand *cmd) /* * Workhorse to deparse a CollectedCommand. + * + * include_owner indicates if the owner/role of the command should be + * included in the deparsed Json output. It is set to false for any commands + * that don't CREATE database objects (ALTER commands for example), this is + * to avoid encoding and sending the owner to downstream for replay as it is + * unnecessary for such commands. */ char * -deparse_utility_command(CollectedCommand *cmd, bool verbose_mode) +deparse_utility_command(CollectedCommand *cmd, bool include_owner, bool verbose_mode) { OverrideSearchPath *overridePath; MemoryContext oldcxt; @@ -9869,36 +9922,43 @@ deparse_utility_command(CollectedCommand *cmd, bool verbose_mode) switch (cmd->type) { case SCT_Simple: - tree = deparse_simple_command(cmd); + tree = deparse_simple_command(cmd, &include_owner); break; case SCT_AlterTable: tree = deparse_AlterRelation(cmd); + include_owner = false; break; case SCT_Grant: tree = deparse_GrantStmt(cmd); + include_owner = false; break; case SCT_CreateTableAs: tree = deparse_CreateTableAsStmt(cmd); break; case SCT_AlterOpFamily: + include_owner = false; tree = deparse_AlterOpFamily(cmd); break; case SCT_CreateOpClass: tree = deparse_CreateOpClassStmt(cmd); break; case SCT_AlterDefaultPrivileges: + include_owner = false; tree = deparse_AlterDefaultPrivilegesStmt(cmd); break; case SCT_CreatePublication: tree = deparse_CreatePublicationStmt(cmd); break; case SCT_AlterPublication: + include_owner = false; tree = deparse_AlterPublicationStmt(cmd); break; case SCT_AlterTSConfig: + include_owner = false; tree = deparse_AlterTSConfigurationStmt(cmd); break; case SCT_SecurityLabel: + include_owner = false; tree = deparse_SecLabelStmt(cmd); break; default: @@ -9911,7 +9971,8 @@ deparse_utility_command(CollectedCommand *cmd, bool verbose_mode) { Jsonb *jsonb; - jsonb = objtree_to_jsonb(tree); + jsonb = include_owner ? objtree_to_jsonb(tree, cmd->role) : + objtree_to_jsonb(tree, NULL); command = JsonbToCString(&str, &jsonb->root, JSONB_ESTIMATED_LEN); } @@ -9937,7 +9998,7 @@ ddl_deparse_to_json(PG_FUNCTION_ARGS) CollectedCommand *cmd = (CollectedCommand *) PG_GETARG_POINTER(0); char *command; - command = deparse_utility_command(cmd, true); + command = deparse_utility_command(cmd, false, true); if (command) PG_RETURN_TEXT_P(cstring_to_text(command)); diff --git a/src/backend/commands/ddl_json.c b/src/backend/commands/ddl_json.c index 2d7dff4bda..ef7f828466 100644 --- a/src/backend/commands/ddl_json.c +++ b/src/backend/commands/ddl_json.c @@ -714,7 +714,7 @@ expand_jsonb_array(StringInfo buf, char *param, * Workhorse for ddl_deparse_expand_command. */ char * -deparse_ddl_json_to_string(char *json_str) +deparse_ddl_json_to_string(char *json_str, char** owner) { Datum d; Jsonb *jsonb; @@ -725,6 +725,27 @@ deparse_ddl_json_to_string(char *json_str) d = DirectFunctionCall1(jsonb_in, PointerGetDatum(json_str)); jsonb = (Jsonb *) DatumGetPointer(d); + if (owner != NULL) + { + const char *key = "myowner"; + JsonbValue *value; + + value = getKeyJsonValueFromContainer(&jsonb->root, key, strlen(key), NULL); + if (value) + { + char *str; + + /* value->val.string.val may not be NULL terminated */ + str = palloc(value->val.string.len + 1); + memcpy(str, value->val.string.val, value->val.string.len); + str[value->val.string.len] = '\0'; + *owner = str; + } + else + /* myowner is not given in this jsonb, e.g. for Drop Commands */ + *owner = NULL; + } + expand_fmt_recursive(buf, &jsonb->root); return buf->data; @@ -761,7 +782,7 @@ ddl_deparse_expand_command(PG_FUNCTION_ARGS) json_str = text_to_cstring(json); - PG_RETURN_TEXT_P(cstring_to_text(deparse_ddl_json_to_string(json_str))); + PG_RETURN_TEXT_P(cstring_to_text(deparse_ddl_json_to_string(json_str, NULL))); } /* diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index ca67d6b289..dfa62feff2 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -882,6 +882,7 @@ EventTriggerTableInitWriteStart(Node *parsetree) command->type = (stmt->objtype == OBJECT_TABLE) ? SCT_CreateTableAs : SCT_Simple; command->in_extension = creating_extension; + command->role = GetUserNameFromId(GetUserId(), false); command->d.ctas.address = InvalidObjectAddress; command->d.ctas.real_create = NULL; command->parsetree = copyObject(parsetree); @@ -1627,6 +1628,7 @@ EventTriggerCollectSimpleCommand(ObjectAddress address, command->type = SCT_Simple; command->in_extension = creating_extension; + command->role = GetUserNameFromId(GetUserId(), false); command->d.simple.address = address; command->d.simple.secondaryObject = secondaryObject; @@ -1663,6 +1665,7 @@ EventTriggerAlterTableStart(Node *parsetree) command->type = SCT_AlterTable; command->in_extension = creating_extension; + command->role = GetUserNameFromId(GetUserId(), false); command->d.alterTable.classId = RelationRelationId; command->d.alterTable.objectId = InvalidOid; @@ -1930,6 +1933,7 @@ EventTriggerCollectGrant(InternalGrant *istmt) command = palloc(sizeof(CollectedCommand)); command->type = SCT_Grant; command->in_extension = creating_extension; + command->role = GetUserNameFromId(GetUserId(), false); command->d.grant.istmt = icopy; command->parsetree = NULL; @@ -1961,6 +1965,7 @@ EventTriggerCollectAlterOpFam(AlterOpFamilyStmt *stmt, Oid opfamoid, command = palloc(sizeof(CollectedCommand)); command->type = SCT_AlterOpFamily; command->in_extension = creating_extension; + command->role = GetUserNameFromId(GetUserId(), false); ObjectAddressSet(command->d.opfam.address, OperatorFamilyRelationId, opfamoid); command->d.opfam.operators = operators; @@ -2190,6 +2195,7 @@ EventTriggerCollectSecLabel(ObjectAddress address, char *provider, command = palloc0(sizeof(CollectedCommand)); command->type = SCT_SecurityLabel; command->in_extension = creating_extension; + command->role = GetUserNameFromId(GetUserId(), false); command->d.seclabel.address = address; command->d.seclabel.provider = provider; command->parsetree = (Node *) copyObject(stmt); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 464db6d247..f7bb73843e 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -66,6 +66,7 @@ #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_LSN 0x00000800 #define SUBOPT_ORIGIN 0x00001000 +#define SUBOPT_MATCH_DDL_OWNER 0x00002000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -89,6 +90,7 @@ typedef struct SubOpts bool twophase; bool disableonerr; char *origin; + bool matchddlowner; XLogRecPtr lsn; } SubOpts; @@ -146,6 +148,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->disableonerr = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_MATCH_DDL_OWNER)) + opts->matchddlowner = true; /* Parse options */ foreach(lc, stmt_options) @@ -324,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_MATCH_DDL_OWNER) && + strcmp(defel->defname, "match_ddl_owner") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_MATCH_DDL_OWNER)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_MATCH_DDL_OWNER; + opts->matchddlowner = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -560,7 +573,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | - SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN); + SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN | + SUBOPT_MATCH_DDL_OWNER); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -649,6 +663,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, publicationListToArray(publications); values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + values[Anum_pg_subscription_submatchddlowner - 1] = BoolGetDatum(opts.matchddlowner); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -1054,7 +1069,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | - SUBOPT_ORIGIN); + SUBOPT_ORIGIN | SUBOPT_MATCH_DDL_OWNER); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1118,6 +1133,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_suborigin - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_MATCH_DDL_OWNER)) + { + values[Anum_pg_subscription_submatchddlowner - 1] + = BoolGetDatum(opts.matchddlowner); + replaces[Anum_pg_subscription_submatchddlowner - 1] + = true; + } + update_tuple = true; break; } diff --git a/src/backend/replication/logical/ddltrigger.c b/src/backend/replication/logical/ddltrigger.c index 3cad469033..5c9afc3036 100644 --- a/src/backend/replication/logical/ddltrigger.c +++ b/src/backend/replication/logical/ddltrigger.c @@ -122,7 +122,7 @@ publication_deparse_table_rewrite(PG_FUNCTION_ARGS) if (relpersist != RELPERSISTENCE_TEMP) { /* Deparse the DDL command and WAL log it to allow decoding of the same. */ - json_string = deparse_utility_command(cmd, false); + json_string = deparse_utility_command(cmd, true, false); if (json_string != NULL) LogLogicalDDLMessage("deparse", cmd->d.alterTable.objectId, DCT_TableAlter, @@ -200,7 +200,7 @@ publication_deparse_ddl_command_end(PG_FUNCTION_ARGS) * Deparse the DDL command and WAL log it to allow decoding of the * same. */ - json_string = deparse_utility_command(cmd, false); + json_string = deparse_utility_command(cmd, true, false); if (json_string != NULL) LogLogicalDDLMessage("deparse", relid, type, json_string, @@ -348,7 +348,7 @@ publication_deparse_table_init_write(PG_FUNCTION_ARGS) return PointerGetDatum(NULL); /* Deparse the DDL command and WAL log it to allow decoding of the same. */ - json_string = deparse_utility_command(cmd, false); + json_string = deparse_utility_command(cmd, true, false); if (json_string != NULL) LogLogicalDDLMessage("deparse", cmd->d.simple.address.objectId, DCT_SimpleCmd, diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 1a8afb9a6b..01f021727a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3357,21 +3357,33 @@ apply_handle_ddl(StringInfo s) const char *prefix = NULL; char *message = NULL; char *ddl_command; + char *owner; Size sz; List *parsetree_list; ListCell *parsetree_item; DestReceiver *receiver; MemoryContext oldcontext; const char *save_debug_query_string = debug_query_string; + int save_nestlevel = 0; message = logicalrep_read_ddl(s, &lsn, &prefix, &sz); /* Make sure we are in a transaction command */ begin_replication_step(); - ddl_command = deparse_ddl_json_to_string(message); + ddl_command = deparse_ddl_json_to_string(message, &owner); debug_query_string = ddl_command; + if (MySubscription->matchddlowner && owner) + { + /* + * Set the current role to the owner that executed the command on the + * publication server. + */ + save_nestlevel = NewGUCNestLevel(); + SetConfigOption("role", owner, PGC_INTERNAL, PGC_S_OVERRIDE); + } + /* DestNone for logical replication */ receiver = CreateDestReceiver(DestNone); parsetree_list = pg_parse_query(ddl_command); @@ -3469,6 +3481,14 @@ apply_handle_ddl(StringInfo s) MemoryContextDelete(per_parsetree_context); } + /* + * Restore the GUC variables we set above. + */ + if (save_nestlevel > 0) + { + AtEOXact_GUC(true, save_nestlevel); + } + debug_query_string = save_debug_query_string; end_replication_step(); } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 1b2998021f..a0e2e2d9a9 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4520,6 +4520,7 @@ getSubscriptions(Archive *fout) int i_subsynccommit; int i_subpublications; int i_subbinary; + int i_submatchddlowner; int i, ntups; @@ -4572,9 +4573,14 @@ getSubscriptions(Archive *fout) LOGICALREP_TWOPHASE_STATE_DISABLED); if (fout->remoteVersion >= 160000) - appendPQExpBufferStr(query, " s.suborigin\n"); + appendPQExpBufferStr(query, + " s.suborigin,\n" + " s.submatchddlowner\n"); else - appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY); + appendPQExpBuffer(query, + " '%s' AS suborigin,\n" + " false AS submatchddlowner\n", + LOGICALREP_ORIGIN_ANY); appendPQExpBufferStr(query, "FROM pg_subscription s\n" @@ -4602,6 +4608,7 @@ getSubscriptions(Archive *fout) i_subtwophasestate = PQfnumber(res, "subtwophasestate"); i_subdisableonerr = PQfnumber(res, "subdisableonerr"); i_suborigin = PQfnumber(res, "suborigin"); + i_submatchddlowner = PQfnumber(res, "submatchddlowner"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4632,6 +4639,8 @@ getSubscriptions(Archive *fout) subinfo[i].subdisableonerr = pg_strdup(PQgetvalue(res, i, i_subdisableonerr)); subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); + subinfo[i].submatchddlowner = + pg_strdup(PQgetvalue(res, i, i_submatchddlowner)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4710,6 +4719,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0) appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin); + if (strcmp(subinfo->submatchddlowner, "f") == 0) + appendPQExpBufferStr(query, ", match_ddl_owner = false"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 3ea30ce6fb..56451d6779 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -663,6 +663,7 @@ typedef struct _SubscriptionInfo char *suborigin; char *subsynccommit; char *subpublications; + char *submatchddlowner; } SubscriptionInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 7832ef04cc..601db2a181 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6500,7 +6500,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6557,8 +6557,10 @@ describeSubscriptions(const char *pattern, bool verbose) if (pset.sversion >= 160000) appendPQExpBuffer(&buf, - ", suborigin AS \"%s\"\n", - gettext_noop("Origin")); + ", suborigin AS \"%s\"\n" + ", submatchddlowner AS \"%s\"\n", + gettext_noop("Origin"), + gettext_noop("Match DDL owner")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index b0f2a1705d..17af7c7750 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -88,6 +88,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subdisableonerr; /* True if a worker error should cause the * subscription to be disabled */ + bool submatchddlowner; /* True if replicated objects by DDL replication + * should match the original owner on the publisher */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -137,6 +140,8 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + bool matchddlowner; /* Indicates if replicated objects by DDL replication + * should match the original owner on the publisher */ } Subscription; /* Disallow streaming in-progress transactions. */ diff --git a/src/include/tcop/ddl_deparse.h b/src/include/tcop/ddl_deparse.h index d519cc73ed..c7f98a78fa 100644 --- a/src/include/tcop/ddl_deparse.h +++ b/src/include/tcop/ddl_deparse.h @@ -24,8 +24,8 @@ typedef struct publication_rel List *columnList; /* column list */ } publication_rel; -extern char *deparse_utility_command(CollectedCommand *cmd, bool verbose_mode); -extern char *deparse_ddl_json_to_string(char *jsonb); +extern char *deparse_utility_command(CollectedCommand *cmd, bool include_owner, bool verbose_mode); +extern char *deparse_ddl_json_to_string(char *jsonb, char** owner); extern char *deparse_drop_command(const char *objidentity, const char *objecttype, DropBehavior behavior); diff --git a/src/include/tcop/deparse_utility.h b/src/include/tcop/deparse_utility.h index b4e62fefc9..c177576a84 100644 --- a/src/include/tcop/deparse_utility.h +++ b/src/include/tcop/deparse_utility.h @@ -51,6 +51,7 @@ typedef struct CollectedCommand CollectedCommandType type; bool in_extension; + char *role; Node *parsetree; union diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 3f99b14394..af894f75b9 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -143,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -163,10 +163,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -175,10 +175,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -210,10 +210,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -247,19 +247,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -271,27 +271,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -306,10 +306,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -324,10 +324,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -363,10 +363,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -375,10 +375,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -388,10 +388,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -404,18 +404,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/subscription/t/032_ddl_replication.pl b/src/test/subscription/t/032_ddl_replication.pl index 4bc4ff2212..902d2034f5 100644 --- a/src/test/subscription/t/032_ddl_replication.pl +++ b/src/test/subscription/t/032_ddl_replication.pl @@ -457,6 +457,23 @@ is($result, qq(42), 'CREATE TABLE OF replicated'); $node_publisher->safe_psql('postgres', "DROP TABLE tmp"); $node_publisher->safe_psql('postgres', "DROP TYPE int42 cascade"); +# Test owner of replicated table on subscriber matches the owner on publisher when +# the match_ddl_owner subscription option is enabled +$node_publisher->safe_psql('postgres', "CREATE ROLE ddl_replication_user LOGIN SUPERUSER;"); + +$node_subscriber->safe_psql('postgres', "CREATE ROLE ddl_replication_user LOGIN SUPERUSER;"); +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION mysub SET (match_ddl_owner = true);"); + +$node_publisher->safe_psql('postgres', "SET SESSION AUTHORIZATION 'ddl_replication_user'; CREATE TABLE tmp (a int, b varchar);"); +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT tableowner from pg_catalog.pg_tables where tablename = 'tmp';"); +is($result, qq(ddl_replication_user), 'Owner of tmp is ddl_replication_user'); +$node_publisher->safe_psql('postgres', "DROP TABLE tmp"); +# reset match_ddl_owner +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION mysub SET (match_ddl_owner = false);"); + + pass "DDL replication tests passed:"; $node_subscriber->stop; -- 2.39.1