From 5c7710edbc55b8590ac977ca96cea41465a88e13 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 7 Jul 2023 12:20:25 +0900 Subject: [PATCH v2 2/2] PoC: intitial table schema synchronization in logical replication. --- src/backend/catalog/pg_publication.c | 20 +- src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 299 +++++++++++++++++--- src/backend/replication/logical/tablesync.c | 4 +- src/include/catalog/pg_proc.dat | 8 +- src/include/catalog/pg_publication.h | 2 +- src/test/regress/expected/rules.out | 2 +- 7 files changed, 285 insertions(+), 52 deletions(-) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index c488b6370b..b392458642 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -802,7 +802,7 @@ GetAllTablesPublications(void) * root partitioned tables. */ List * -GetAllTablesPublicationRelations(bool pubviaroot) +GetAllTablesPublicationRelations(bool pubviaroot, bool include_all_partitions) { Relation classRel; ScanKeyData key[1]; @@ -825,13 +825,13 @@ GetAllTablesPublicationRelations(bool pubviaroot) Oid relid = relForm->oid; if (is_publishable_class(relid, relForm) && - !(relForm->relispartition && pubviaroot)) + (include_all_partitions || !(relForm->relispartition && pubviaroot))) result = lappend_oid(result, relid); } table_endscan(scan); - if (pubviaroot) + if (pubviaroot || include_all_partitions) { ScanKeyInit(&key[0], Anum_pg_class_relkind, @@ -846,7 +846,7 @@ GetAllTablesPublicationRelations(bool pubviaroot) Oid relid = relForm->oid; if (is_publishable_class(relid, relForm) && - !relForm->relispartition) + (include_all_partitions || !relForm->relispartition)) result = lappend_oid(result, relid); } @@ -1057,6 +1057,7 @@ Datum pg_get_publication_tables(PG_FUNCTION_ARGS) { #define NUM_PUBLICATION_TABLES_ELEM 4 + bool include_all_partitions = PG_GETARG_BOOL(0); FuncCallContext *funcctx; List *table_infos = NIL; @@ -1081,7 +1082,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * Deconstruct the parameter into elements where each element is a * publication name. */ - arr = PG_GETARG_ARRAYTYPE_P(0); + arr = PG_GETARG_ARRAYTYPE_P(1); deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT, &elems, NULL, &nelems); @@ -1101,17 +1102,22 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * those. Otherwise, get the partitioned table itself. */ if (pub_elem->alltables) - pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot); + pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot, + include_all_partitions); else { List *relids, *schemarelids; relids = GetPublicationRelations(pub_elem->oid, + include_all_partitions ? + PUBLICATION_PART_ALL : pub_elem->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid, + include_all_partitions ? + PUBLICATION_PART_ALL : pub_elem->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); @@ -1148,7 +1154,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * data of the child table to be double-published on the subscriber * side. */ - if (viaroot) + if (viaroot && !include_all_partitions) filter_partitions(table_infos); /* Construct a tuple descriptor for the result rows. */ diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c18fea8362..1698dd5374 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -376,7 +376,7 @@ CREATE VIEW pg_publication_tables AS ) AS attnames, pg_get_expr(GPT.qual, GPT.relid) AS rowfilter FROM pg_publication P, - LATERAL pg_get_publication_tables(P.pubname) GPT, + LATERAL pg_get_publication_tables(false, P.pubname) GPT, pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) WHERE C.oid = GPT.relid; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index d4e798baeb..bdcfb8829e 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -33,6 +33,7 @@ #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" #include "executor/executor.h" +#include "executor/spi.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "pgstat.h" @@ -71,6 +72,7 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_LSN 0x00002000 #define SUBOPT_ORIGIN 0x00004000 +#define SUBOPT_COPY_SCHEMA 0x00008000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -87,6 +89,7 @@ typedef struct SubOpts bool connect; bool enabled; bool create_slot; + bool copy_schema; bool copy_data; bool refresh; bool binary; @@ -99,13 +102,16 @@ typedef struct SubOpts XLogRecPtr lsn; } SubOpts; -static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *fetch_table_list(WalReceiverConn *wrconn, List *publications, + bool all_partitions); static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); +static void check_pg_dump_available(void); +static void synchronize_table_schema(char *conninfo, List *tables, char *snapshot_name); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -139,6 +145,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->enabled = true; if (IsSet(supported_opts, SUBOPT_CREATE_SLOT)) opts->create_slot = true; + if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA)) + opts->copy_schema = false; if (IsSet(supported_opts, SUBOPT_COPY_DATA)) opts->copy_data = true; if (IsSet(supported_opts, SUBOPT_REFRESH)) @@ -205,6 +213,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, else ReplicationSlotValidateName(opts->slot_name, ERROR); } + else if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA) && + strcmp(defel->defname, "copy_schema") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_COPY_SCHEMA; + opts->copy_schema = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_COPY_DATA) && strcmp(defel->defname, "copy_data") == 0) { @@ -388,12 +405,30 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errmsg("%s and %s are mutually exclusive options", "connect = false", "copy_data = true"))); + if (opts->copy_schema && + IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s are mutually exclusive options", + "connect = false", "copy_schema = true"))); + /* Change the defaults of other options. */ opts->enabled = false; opts->create_slot = false; + opts->copy_schema = false; opts->copy_data = false; } + /* + * The initial schema sync needs a snapshot that is created and exported + * while creating a replication slot. + */ + if (!opts->create_slot && IsSet(supported_opts, SUBOPT_CONNECT) && + opts->copy_schema) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("subscription with %s must also set %s", + "copy_schema = on", "create_slot = true"))); /* * Do additional checking for disallowed combination when slot_name = NONE * was used. @@ -591,7 +626,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | SUBOPT_COPY_SCHEMA); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -636,6 +671,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, errmsg("password_required=false is superuser-only"), errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser."))); + if (opts.copy_schema) + check_pg_dump_available(); + /* * If built with appropriate switch, whine when regression-testing * conventions for subscription names are violated. @@ -750,31 +788,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, check_publications_origin(wrconn, publications, opts.copy_data, opts.origin, NULL, 0, stmt->subname); - /* - * Set sync state based on if we were asked to do data copy or - * not. - */ - table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; - /* * Get the table list from publisher and build local table status * info. */ - tables = fetch_table_list(wrconn, publications); - foreach(lc, tables) - { - RangeVar *rv = (RangeVar *) lfirst(lc); - Oid relid; - - relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); - - AddSubscriptionRelState(subid, relid, table_state, - InvalidXLogRecPtr); - } + tables = fetch_table_list(wrconn, publications, false); /* * If requested, create permanent slot for the subscription. We @@ -783,10 +801,22 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, */ if (opts.create_slot) { + List *tables_schema_sync; bool twophase_enabled = false; + char *snapshot_name = NULL; Assert(opts.slot_name); + /* + * Get the list of tables in publications including both partitioned + * tables and theirs partitions. We have to fetch the list from the + * publisher before creating the replication slot below. Otherwise, + * the exported snapshot will be invalidated when fetching the table + * list. + */ + if (opts.copy_schema) + tables_schema_sync = fetch_table_list(wrconn, publications, true); + /* * Even if two_phase is set, don't create the slot with * two-phase enabled. Will enable it once all the tables are @@ -806,8 +836,16 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, if (opts.twophase && !opts.copy_data && tables != NIL) twophase_enabled = true; - walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, - CRS_NOEXPORT_SNAPSHOT, NULL); + snapshot_name = walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, + opts.copy_schema ? CRS_EXPORT_SNAPSHOT: + CRS_NOEXPORT_SNAPSHOT, NULL); + + /* Synchronize schemas of tables that will be subscribed */ + if (opts.copy_schema) + { + Assert(snapshot_name != NULL); + synchronize_table_schema(conninfo, tables_schema_sync, snapshot_name); + } if (twophase_enabled) UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); @@ -816,6 +854,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, (errmsg("created replication slot \"%s\" on publisher", opts.slot_name))); } + + /* + * Set sync state based on if we were asked to do data copy or + * not. + */ + table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + foreach(lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + + AddSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr); + } + } PG_FINALLY(); { @@ -843,7 +902,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, } static void -AlterSubscription_refresh(Subscription *sub, bool copy_data, +AlterSubscription_refresh(Subscription *sub, bool copy_data, bool copy_schema, List *validate_publications) { char *err; @@ -868,6 +927,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); + if (copy_schema) + check_pg_dump_available(); + /* Try to connect to the publisher. */ must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired; wrconn = walrcv_connect(sub->conninfo, true, must_use_password, @@ -883,7 +945,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, check_publications(wrconn, validate_publications); /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + pubrel_names = fetch_table_list(wrconn, sub->publications, false); /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid, false); @@ -909,6 +971,35 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sub->origin, subrel_local_oids, subrel_count, sub->name); + if (copy_schema) + { + List *rels_schema_sync = NIL; + + foreach(lc, pubrel_names) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, true); + + if (!bsearch(&relid, subrel_local_oids, + subrel_count, sizeof(Oid), oid_cmp)) + rels_schema_sync = lappend(rels_schema_sync, rv); + } + + /* + * Synchronize table schemas for tables that are not present + * on the subscriber node. + * + * XXX: There is a window between creating the table and the + * tablesync worker starts processing it. If there is a DDL + * for the table, the data sync could fail. + */ + if (list_length(rels_schema_sync) > 0) + synchronize_table_schema(sub->conninfo, rels_schema_sync, + NULL); + } + /* * Rels that we want to remove from subscription and drop any slots * and origins corresponding to them. @@ -1252,7 +1343,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, case ALTER_SUBSCRIPTION_SET_PUBLICATION: { - supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH; + supported_opts = SUBOPT_COPY_DATA | SUBOPT_COPY_SCHEMA | SUBOPT_REFRESH; parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1286,7 +1377,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, opts.copy_data, + AlterSubscription_refresh(sub, opts.copy_data, opts.copy_schema, stmt->publication); } @@ -1299,7 +1390,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, List *publist; bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; - supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA; + supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA | SUBOPT_COPY_SCHEMA; parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1345,7 +1436,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Refresh the new list of publications. */ sub->publications = publist; - AlterSubscription_refresh(sub, opts.copy_data, + AlterSubscription_refresh(sub, opts.copy_data, opts.copy_schema, validate_publications); } @@ -1360,7 +1451,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(pstate, stmt->options, - SUBOPT_COPY_DATA, &opts); + SUBOPT_COPY_DATA | SUBOPT_COPY_SCHEMA, &opts); /* * The subscription option "two_phase" requires that @@ -1387,7 +1478,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); - AlterSubscription_refresh(sub, opts.copy_data, NULL); + AlterSubscription_refresh(sub, opts.copy_data, opts.copy_schema, NULL); break; } @@ -1957,7 +2048,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, appendStringInfoString(&cmd, "SELECT DISTINCT P.pubname AS pubname\n" "FROM pg_publication P,\n" - " LATERAL pg_get_publication_tables(P.pubname) GPT\n" + " LATERAL pg_get_publication_tables(false, P.pubname) GPT\n" " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n" " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" "WHERE C.oid = GPT.relid AND P.pubname IN ("); @@ -2044,7 +2135,8 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, * list and row filter are specified for different publications. */ static List * -fetch_table_list(WalReceiverConn *wrconn, List *publications) +fetch_table_list(WalReceiverConn *wrconn, List *publications, + bool all_partitions) { WalRcvExecResult *res; StringInfoData cmd; @@ -2081,10 +2173,11 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n" " FROM pg_class c\n" " JOIN pg_namespace n ON n.oid = c.relnamespace\n" - " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n" + " JOIN ( SELECT (pg_get_publication_tables(%s, VARIADIC array_agg(pubname::text))).*\n" " FROM pg_publication\n" " WHERE pubname IN ( %s )) AS gpt\n" " ON gpt.relid = c.oid\n", + all_partitions ? "true" : "false", pub_names.data); pfree(pub_names.data); @@ -2290,6 +2383,140 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char * return oldpublist; } +/* + * Check and raise an ERROR if table schema copy is requested but pg_dump command is + * neither not found nor executable. + */ +static void +check_pg_dump_available(void) +{ + char path[MAXPGPATH]; + + if (find_my_exec("pg_dump", path) < 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not find \"%s\" in executable path", "pg_dump"))); +} + +/* + * Fetch the table schema of the given table and restore into the subscriber. + * + * XXX currently it doesn't schema (IOW namespace) so the schema has to already + * be present on the subscriber, is that okay? or do we want to create it? But + * if we want to do that, we need to consider the case where the schema has + * non-default options. + */ +static void +synchronize_table_schema(char *conninfo, List *tables, char *snapshot_name) +{ + StringInfoData command; + FILE *handle; + char full_path[MAXPGPATH]; + ListCell *lc; + int ret; + + /* + * We've checked the availability of pg_dump at a CREATE SUBSCRIPTION or a + * ALTER SUBSCRIPTION ... REFRESH PUBLICATION time, but check it again in + * case the pg_dump command becomes unavailable. + */ + if (find_my_exec("pg_dump", full_path) < 0) + ereport(ERROR, + (errmsg("could not find \"%s\" in executable path", "pg_dump"))); + + /* + * Construct pg_dump command. We dump only the table definition without + * any table-dependent objects such as indexes and triggers. Also, we specify + * the snapshot that has been exported while creating the replication slot for + * tablesync. The table name in --table option must be quoted to avoid the + * table name from being interpreted as a regular expression. + * + * Since the publisher could be a different major version PostgreSQL, we + * use --quote-all-identifiers option. + * + * The outputs are redirected to this backend's input and executed via SPI. + * + * XXX: who should be the owner of the new table? + */ + initStringInfo(&command); + appendStringInfo(&command, + "%s --format=p --schema-only --username %s --dbname \"%s\" --no-table-dependents --quote-all-identifiers --file -", + full_path, GetUserNameFromId(GetUserId(), true), + conninfo); + + if (snapshot_name) + appendStringInfo(&command, " --snapshot=%s", snapshot_name); + + foreach(lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + + /* + * Error if the table is already present on the subscriber. Please note + * that concurrent DDLs can create the table as we don't acquire any lock + * on the table. + * + * XXX: do we want to overwrite it (or optionally)? + */ + if (OidIsValid(RangeVarGetRelid(rv, AccessShareLock, true))) + ereport(ERROR, + (errmsg("existing table %s cannot synchronize table schema", + rv->relname))); + + appendStringInfo(&command, " --table '%s'", + quote_qualified_identifier(rv->schemaname, rv->relname)); + } + + /* Open SPI context. */ + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + PG_TRY(); + { + char buf[1024]; + StringInfoData querybuf; + + elog(DEBUG3, "executing pg_dump command \"%s\"", command.data); + + handle = OpenPipeStream(command.data, "r"); + if (handle == NULL) + elog(ERROR, "could not execute command \"%s\": %m", command.data); + + initStringInfo(&querybuf); + + /* + * Gathering all commands into one string. Since we dump only schema of the + * particular table, the command would not be long. + */ + while (fgets(buf, sizeof(buf), handle)) + appendStringInfoString(&querybuf, buf); + + /* + * If the pg_dump command failed, there is no output in the result handle + * and the pg_dump's error messages are written into the server log. + */ + if (querybuf.len == 0) + elog(ERROR, "failed to execute command \"%s\"", command.data); + + elog(DEBUG5, "executing dumped DDLs %s", querybuf.data); + ret = SPI_exec(querybuf.data, 0); + if (ret != SPI_OK_UTILITY && ret != SPI_OK_SELECT) + elog(ERROR, "SPI_exec returned %d: %s", ret, querybuf.data); + } + PG_FINALLY(); + { + ClosePipeStream(handle); + } + PG_END_TRY(); + + /* Close SPI context */ + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); + + /* make the newly created table visible to us */ + CommandCounterIncrement(); +} + /* * Extract the streaming mode value from a DefElem. This is like * defGetBoolean() but also accepts the special value of "parallel". diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6d461654ab..d870a9f69c 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -846,7 +846,7 @@ fetch_remote_table_info(char *nspname, char *relname, " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)" " THEN NULL ELSE gpt.attrs END)" " FROM pg_publication p," - " LATERAL pg_get_publication_tables(p.pubname) gpt," + " LATERAL pg_get_publication_tables(false, p.pubname) gpt," " pg_class c" " WHERE gpt.relid = %u AND c.oid = gpt.relid" " AND p.pubname IN ( %s )", @@ -1028,7 +1028,7 @@ fetch_remote_table_info(char *nspname, char *relname, appendStringInfo(&cmd, "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)" " FROM pg_publication p," - " LATERAL pg_get_publication_tables(p.pubname) gpt" + " LATERAL pg_get_publication_tables(false, p.pubname) gpt" " WHERE gpt.relid = %u" " AND p.pubname IN ( %s )", lrel->remoteid, diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 6996073989..ec55a22fe1 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11823,10 +11823,10 @@ descr => 'get information of the tables that are part of the specified publications', proname => 'pg_get_publication_tables', prorows => '1000', provariadic => 'text', proretset => 't', provolatile => 's', - prorettype => 'record', proargtypes => '_text', - proallargtypes => '{_text,oid,oid,int2vector,pg_node_tree}', - proargmodes => '{v,o,o,o,o}', - proargnames => '{pubname,pubid,relid,attrs,qual}', + prorettype => 'record', proargtypes => 'bool _text', + proallargtypes => '{bool,_text,oid,oid,int2vector,pg_node_tree}', + proargmodes => '{i,v,o,o,o,o}', + proargnames => '{include_all_partitions,pubname,pubid,relid,attrs,qual}', prosrc => 'pg_get_publication_tables' }, { oid => '6121', descr => 'returns whether a relation can be part of a publication', diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 6ecaa2a01e..dd3c27d319 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -132,7 +132,7 @@ typedef enum PublicationPartOpt extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); -extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern List *GetAllTablesPublicationRelations(bool pubviaroot, bool include_all_partitions); extern List *GetPublicationSchemas(Oid pubid); extern List *GetSchemaPublications(Oid schemaid); extern List *GetSchemaPublicationRelations(Oid schemaid, diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 7fd81e6a7d..fd805595bd 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1449,7 +1449,7 @@ pg_publication_tables| SELECT p.pubname, WHERE ((a.attrelid = gpt.relid) AND (a.attnum = ANY ((gpt.attrs)::smallint[])))) AS attnames, pg_get_expr(gpt.qual, gpt.relid) AS rowfilter FROM pg_publication p, - LATERAL pg_get_publication_tables(VARIADIC ARRAY[(p.pubname)::text]) gpt(pubid, relid, attrs, qual), + LATERAL pg_get_publication_tables(false, VARIADIC ARRAY[(p.pubname)::text]) gpt(pubid, relid, attrs, qual), (pg_class c JOIN pg_namespace n ON ((n.oid = c.relnamespace))) WHERE (c.oid = gpt.relid); -- 2.31.1