From fe55e489e9cfa30149d9886072d3fd420138093c Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 11 May 2022 02:09:50 -0400 Subject: [PATCH v2 2/2] Support DDL replication. To support DDL replication, we use event trigger and DDL deparsing facilities. While creating a publication, we register a command end trigger that deparses the DDL as a JSON blob, and WAL logs it. The event trigger is automatically removed at the time of drop publication. The WALSender decodes the WAL and sends it downstream similar to other DML commands. The subscriber then converts JSON back to the DDL command string and executes it. In the subscriber, we also add the newly added rel to pg_subscription_rel so that the DML changes on the new table can be replicated without having to manually run "ALTER SUBSCRIPTION ... REFRESH PUBLICATION". This is a POC patch to show how using event triggers and DDL deparsing facilities we can implement DDL replication. So, the implementation is restricted to a simple CREATE TABLE command. --- src/backend/access/rmgrdesc/Makefile | 1 + src/backend/access/rmgrdesc/logicalddlmsgdesc.c | 51 ++++ src/backend/access/transam/rmgr.c | 1 + src/backend/commands/event_trigger.c | 44 +++ src/backend/commands/publicationcmds.c | 50 +++ src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/ddlmessage.c | 83 +++++ src/backend/replication/logical/decode.c | 40 +++ src/backend/replication/logical/logical.c | 88 ++++++ src/backend/replication/logical/proto.c | 52 +++- src/backend/replication/logical/reorderbuffer.c | 123 +++++++- src/backend/replication/logical/worker.c | 232 ++++++++++++++ src/backend/replication/pgoutput/pgoutput.c | 39 ++- src/backend/utils/adt/ri_triggers.c | 2 + src/backend/utils/cache/relcache.c | 1 + src/bin/pg_dump/pg_dump.c | 27 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_waldump/rmgrdesc.c | 1 + src/bin/psql/describe.c | 17 +- src/include/access/rmgrlist.h | 1 + src/include/catalog/pg_proc.dat | 3 + src/include/catalog/pg_publication.h | 4 + src/include/replication/ddlmessage.h | 41 +++ src/include/replication/decode.h | 1 + src/include/replication/logicalproto.h | 7 +- src/include/replication/output_plugin.h | 23 ++ src/include/replication/reorderbuffer.h | 30 ++ src/test/regress/expected/psql.out | 6 +- src/test/regress/expected/publication.out | 388 ++++++++++++------------ 29 files changed, 1149 insertions(+), 209 deletions(-) create mode 100644 src/backend/access/rmgrdesc/logicalddlmsgdesc.c create mode 100644 src/backend/replication/logical/ddlmessage.c create mode 100644 src/include/replication/ddlmessage.h diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index f88d72f..b8e29e8 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -19,6 +19,7 @@ OBJS = \ hashdesc.o \ heapdesc.o \ logicalmsgdesc.o \ + logicalddlmsgdesc.o \ mxactdesc.o \ nbtdesc.o \ relmapdesc.o \ diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c new file mode 100644 index 0000000..4dc7591 --- /dev/null +++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c @@ -0,0 +1,51 @@ +/*------------------------------------------------------------------------- + * + * logicalddlmsgdesc.c + * rmgr descriptor routines for replication/logical/ddlmessage.c + * + * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/logicalddlmsgdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "replication/ddlmessage.h" + +void +logicalddlmsg_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_LOGICAL_DDL_MESSAGE) + { + xl_logical_ddl_message *xlrec = (xl_logical_ddl_message *) rec; + char *prefix = xlrec->message; + char *message = xlrec->message + xlrec->prefix_size; + char *sep = ""; + + Assert(prefix[xlrec->prefix_size] != '\0'); + + appendStringInfo(buf, "prefix \"%s\"; payload (%zu bytes): ", + prefix, xlrec->message_size); + /* Write message payload as a series of hex bytes */ + for (int cnt = 0; cnt < xlrec->message_size; cnt++) + { + appendStringInfo(buf, "%s%02X", sep, (unsigned char) message[cnt]); + sep = " "; + } + } +} + +const char * +logicalddlmsg_identify(uint8 info) +{ + if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE) + return "DDL MESSAGE"; + + return NULL; +} diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index e1d6ebb..b97bbe9 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -27,6 +27,7 @@ #include "fmgr.h" #include "funcapi.h" #include "miscadmin.h" +#include "replication/ddlmessage.h" #include "replication/decode.h" #include "replication/message.h" #include "replication/origin.h" diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 4642527..8446a00 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -37,8 +37,11 @@ #include "miscadmin.h" #include "parser/parse_func.h" #include "pgstat.h" +#include "replication/ddlmessage.h" +#include "replication/message.h" #include "tcop/deparse_utility.h" #include "tcop/utility.h" +#include "tcop/ddl_deparse.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/evtcache.h" @@ -2180,3 +2183,44 @@ stringify_adefprivs_objtype(ObjectType objtype) return "???"; /* keep compiler quiet */ } + +/* + * publication_ddl_deparse + * + * Deparse the ddl command and log it. + */ +Datum +publication_ddl_deparse(PG_FUNCTION_ARGS) +{ + ListCell *lc; + + if (!CALLED_AS_EVENT_TRIGGER(fcinfo)) + elog(ERROR, "not fired by event trigger manager"); + + foreach(lc, currentEventTriggerState->commandList) + { + CollectedCommand *cmd = lfirst(lc); + char *json_string; + static int i; + + if (cmd->type == SCT_Simple && + !OidIsValid(cmd->d.simple.address.objectId)) + continue; + + /* + * Fixme: This is just for the development purpose and needs to be + * removed later. + */ + elog(LOG, "DDL command no. %d", ++i); + + /* Deparse the DDL command and WAL log it to allow decoding of the same. */ + json_string = deparse_utility_command(cmd); + + if (json_string == NULL) + continue; + + LogLogicalDDLMessage("deparse", json_string, strlen(json_string) + 1); + } + + return PointerGetDatum(NULL); +} diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 6df0e66..f41c22a 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -37,10 +37,12 @@ #include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" +#include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "parser/parse_clause.h" #include "parser/parse_collate.h" #include "parser/parse_relation.h" +#include "parser/parser.h" #include "storage/lmgr.h" #include "utils/acl.h" #include "utils/array.h" @@ -95,6 +97,7 @@ parse_publication_options(ParseState *pstate, pubactions->pubupdate = true; pubactions->pubdelete = true; pubactions->pubtruncate = true; + pubactions->pubcreate = false; *publish_via_partition_root = false; /* Parse options */ @@ -141,6 +144,8 @@ parse_publication_options(ParseState *pstate, pubactions->pubdelete = true; else if (strcmp(publish_opt, "truncate") == 0) pubactions->pubtruncate = true; + else if (strcmp(publish_opt, "create") == 0) + pubactions->pubcreate = true; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -755,6 +760,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) { Relation rel; ObjectAddress myself; + ObjectAddress referenced; Oid puboid; bool nulls[Natts_pg_publication]; Datum values[Natts_pg_publication]; @@ -819,6 +825,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) BoolGetDatum(pubactions.pubdelete); values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); + values[Anum_pg_publication_pubcreate - 1] = + BoolGetDatum(pubactions.pubcreate); values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); @@ -881,6 +889,45 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) } } + /* + * Create an event trigger to allow logging of DDL statements. + * + * TODO: We need to find a better syntax to allow replication of DDL + * statements. + * + * XXX: This code is just to show the replication of CREATE TABLE works. + * We need to enhance this once the approach for DDL replication is + * finalized. + */ + if (stmt->for_all_tables && pubactions.pubcreate) + { + CreateEventTrigStmt *ddl_trigg; + Node *args = NULL; + List *tags = NIL; + Oid event_trig_id; + char trigger_name[NAMEDATALEN]; + + ddl_trigg = makeNode(CreateEventTrigStmt); + + snprintf(trigger_name, sizeof(trigger_name), "pg_deparse_trig_%u", + puboid); + ddl_trigg->trigname = pstrdup(trigger_name); + ddl_trigg->eventname = "ddl_command_end"; + ddl_trigg->funcname = SystemFuncName("publication_ddl_deparse"); + + args = (Node *) makeString(pstrdup(GetCommandTagName(CMDTAG_CREATE_TABLE))); + tags = list_make1(args); + ddl_trigg->whenclause = list_make1(makeDefElem("tag", (Node *) tags, -1)); + event_trig_id = CreateEventTrigger(ddl_trigg); + + /* + * Register the event trigger as internally dependent on the + * publication. + */ + ObjectAddressSet(referenced, EventTriggerRelationId, event_trig_id); + recordDependencyOn(&referenced, &myself, DEPENDENCY_INTERNAL); + } + table_close(rel, RowExclusiveLock); InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); @@ -1021,6 +1068,9 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); replaces[Anum_pg_publication_pubtruncate - 1] = true; + + values[Anum_pg_publication_pubcreate - 1] = BoolGetDatum(pubactions.pubcreate); + replaces[Anum_pg_publication_pubcreate - 1] = true; } if (publish_via_partition_root_given) diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c4e2fde..f3eeb67 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) OBJS = \ decode.o \ + ddlmessage.o\ launcher.o \ logical.o \ logicalfuncs.o \ diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c new file mode 100644 index 0000000..8e0be0f --- /dev/null +++ b/src/backend/replication/logical/ddlmessage.c @@ -0,0 +1,83 @@ +/*------------------------------------------------------------------------- + * + * ddlmessage.c + * Logical DDL messages. + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/ddlmessage.c + * + * NOTES + * + * Logical DDL messages allow XLOG logging of DDL command strings that + * get passed to the logical decoding plugin. In normal XLOG processing they + * are same as NOOP. + * + * Unlike generic logical messages, these DDL messages have only transactional + * mode.Note by default DDLs in PostgreSQL are transactional. + * + * These messages are part of current transaction and will be sent to + * decoding plugin using in a same way as DML operations. + * + * Every message carries prefix to avoid conflicts between different decoding + * plugins. The plugin authors must take extra care to use unique prefix, + * good options seems to be for example to use the name of the extension. + * + * --------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "access/xloginsert.h" +#include "catalog/namespace.h" +#include "miscadmin.h" +#include "nodes/execnodes.h" +#include "replication/logical.h" +#include "replication/ddlmessage.h" +#include "utils/memutils.h" + +/* + * Write logical decoding DDL message into XLog. + */ +XLogRecPtr +LogLogicalDDLMessage(const char *prefix, const char *message, size_t size) +{ + xl_logical_ddl_message xlrec; + + /* + * Ensure we have a valid transaction id. + */ + Assert(IsTransactionState()); + GetCurrentTransactionId(); + + xlrec.dbId = MyDatabaseId; + /* trailing zero is critical; see logicalddlmsg_desc */ + xlrec.prefix_size = strlen(prefix) + 1; + xlrec.message_size = size; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalDDLMessage); + XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size); + XLogRegisterData(unconstify(char *, message), size); + + /* allow origin filtering */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + return XLogInsert(RM_LOGICALDDLMSG_ID, XLOG_LOGICAL_DDL_MESSAGE); +} + +/* + * Redo is basically just noop for logical decoding ddl messages. + */ +void +logicalddlmsg_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info != XLOG_LOGICAL_DDL_MESSAGE) + elog(PANIC, "logicalddlmsg_redo: unknown op code %u", info); + + /* This is only interesting for logical decoding, see decode.c. */ +} diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 6303647..14dc9c4 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -36,6 +36,7 @@ #include "access/xlogutils.h" #include "catalog/pg_control.h" #include "replication/decode.h" +#include "replication/ddlmessage.h" #include "replication/logical.h" #include "replication/message.h" #include "replication/origin.h" @@ -604,6 +605,45 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } /* + * Handle rmgr LOGICALDDLMSG_ID records for DecodeRecordIntoReorderBuffer(). + */ +void +logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + SnapBuild *builder = ctx->snapshot_builder; + XLogReaderState *r = buf->record; + TransactionId xid = XLogRecGetXid(r); + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + RepOriginId origin_id = XLogRecGetOrigin(r); + xl_logical_ddl_message *message; + + if (info != XLOG_LOGICAL_DDL_MESSAGE) + elog(ERROR, "unexpected RM_LOGICALDDLMSG_ID record type: %u", info); + + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + + /* + * If we don't have snapshot or we are just fast-forwarding, there is no + * point in decoding ddl messages. + */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) + return; + + message = (xl_logical_ddl_message *) XLogRecGetData(r); + + if (message->dbId != ctx->slot->data.database || + FilterByOrigin(ctx, origin_id)) + return; + + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + ReorderBufferQueueDDLMessage(ctx->reorder, xid, buf->endptr, + message->message, /* first part of message is prefix */ + message->message_size, + message->message + message->prefix_size); +} + +/* * Consolidated commit record handling between the different form of commit * records. * diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 788769d..d01c143 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -73,6 +73,9 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, const char *prefix, + Size message_size, const char *message); /* streaming callbacks */ static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -90,6 +93,10 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + const char *prefix, + Size message_size, const char *message); static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); @@ -218,6 +225,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + ctx->reorder->ddlmessage = ddlmessage_cb_wrapper; /* * To support streaming, we require start/stop/abort/commit/change @@ -234,6 +242,7 @@ StartupDecodingContext(List *output_plugin_options, (ctx->callbacks.stream_commit_cb != NULL) || (ctx->callbacks.stream_change_cb != NULL) || (ctx->callbacks.stream_message_cb != NULL) || + (ctx->callbacks.stream_ddlmessage_cb != NULL) || (ctx->callbacks.stream_truncate_cb != NULL); /* @@ -251,6 +260,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->stream_commit = stream_commit_cb_wrapper; ctx->reorder->stream_change = stream_change_cb_wrapper; ctx->reorder->stream_message = stream_message_cb_wrapper; + ctx->reorder->stream_ddlmessage = stream_ddlmessage_cb_wrapper; ctx->reorder->stream_truncate = stream_truncate_cb_wrapper; @@ -1206,6 +1216,43 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void +ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + const char *prefix, Size message_size, + const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + if (ctx->callbacks.ddlmessage_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "ddlmessage"; + state.report_location = message_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, prefix, message_size, + message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn) { @@ -1511,6 +1558,47 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void +stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + const char *prefix, Size message_size, + const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* this callback is optional */ + if (ctx->callbacks.stream_ddlmessage_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_ddlmessage"; + state.report_location = message_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, prefix, message_size, + message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index ff8513e..eaec031 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -640,8 +640,8 @@ logicalrep_read_truncate(StringInfo in, */ void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, - bool transactional, const char *prefix, Size sz, - const char *message) + bool transactional, const char *prefix, + Size sz, const char *message) { uint8 flags = 0; @@ -663,6 +663,52 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, } /* + * Read DDL MESSAGE from stream + */ +char * +logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, + const char **prefix, + Size *sz) +{ + uint8 flags; + char *msg; + + //TODO double check when do we need to get TransactionId. + + flags = pq_getmsgint(in, 1); + if (flags != 0) + elog(ERROR, "unrecognized flags %u in ddl message", flags); + *lsn = pq_getmsgint64(in); + *prefix = pq_getmsgstring(in); + *sz = pq_getmsgint(in, 4); + msg = (char *) pq_getmsgbytes(in, *sz); + + return msg; +} + +/* + * Write DDL MESSAGE to stream + */ +void +logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn, + const char *prefix, Size sz, const char *message) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_DDLMESSAGE); + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + pq_sendint8(out, flags); + pq_sendint64(out, lsn); + pq_sendstring(out, prefix); + pq_sendint32(out, sz); + pq_sendbytes(out, message, sz); +} + +/* * Write relation description to the output stream. */ void @@ -1218,6 +1264,8 @@ logicalrep_message_type(LogicalRepMsgType action) return "TYPE"; case LOGICAL_REP_MSG_MESSAGE: return "MESSAGE"; + case LOGICAL_REP_MSG_DDLMESSAGE: + return "DDL"; case LOGICAL_REP_MSG_BEGIN_PREPARE: return "BEGIN PREPARE"; case LOGICAL_REP_MSG_PREPARE: diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 6887dc2..1c04296 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -512,6 +512,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, pfree(change->data.msg.message); change->data.msg.message = NULL; break; + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + if (change->data.ddlmsg.prefix != NULL) + pfree(change->data.ddlmsg.prefix); + change->data.ddlmsg.prefix = NULL; + if (change->data.ddlmsg.message != NULL) + pfree(change->data.ddlmsg.message); + change->data.ddlmsg.message = NULL; + break; case REORDER_BUFFER_CHANGE_INVALIDATION: if (change->data.inval.invalidations) pfree(change->data.inval.invalidations); @@ -867,6 +875,33 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } /* + * A transactional DDL message is queued to be processed upon commit. + */ +void +ReorderBufferQueueDDLMessage(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, const char *prefix, + Size message_size, const char *message) +{ + MemoryContext oldcontext; + ReorderBufferChange *change; + + Assert(xid != InvalidTransactionId); + + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE; + change->data.ddlmsg.prefix = pstrdup(prefix); + change->data.ddlmsg.message_size = message_size; + change->data.ddlmsg.message = palloc(message_size); + memcpy(change->data.ddlmsg.message, message, message_size); + + ReorderBufferQueueChange(rb, xid, lsn, change, false); + + MemoryContextSwitchTo(oldcontext); +} + +/* * AssertTXNLsnOrder * Verify LSN ordering of transaction lists in the reorderbuffer * @@ -1958,6 +1993,25 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, } /* + * Helper function for ReorderBufferProcessTXN for applying the DDL message. + */ +static inline void +ReorderBufferApplyDDLMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferChange *change, bool streaming) +{ + if (streaming) + rb->stream_ddlmessage(rb, txn, change->lsn, + change->data.ddlmsg.prefix, + change->data.ddlmsg.message_size, + change->data.ddlmsg.message); + else + rb->ddlmessage(rb, txn, change->lsn, + change->data.ddlmsg.prefix, + change->data.ddlmsg.message_size, + change->data.ddlmsg.message); +} + +/* * Function to store the command id and snapshot at the end of the current * stream so that we can reuse the same while sending the next stream. */ @@ -2335,6 +2389,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferApplyMessage(rb, txn, change, streaming); break; + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + ReorderBufferApplyDDLMessage(rb, txn, change, streaming); + break; + case REORDER_BUFFER_CHANGE_INVALIDATION: /* Execute the invalidation messages locally */ ReorderBufferExecuteInvalidations( @@ -3711,6 +3769,36 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + { + char *data; + Size prefix_size = strlen(change->data.ddlmsg.prefix) + 1; + + sz += prefix_size + change->data.ddlmsg.message_size + + sizeof(Size) + sizeof(Size); + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + /* write the prefix including the size */ + memcpy(data, &prefix_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.ddlmsg.prefix, + prefix_size); + data += prefix_size; + + /* write the message including the size */ + memcpy(data, &change->data.ddlmsg.message_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.ddlmsg.message, + change->data.ddlmsg.message_size); + data += change->data.ddlmsg.message_size; + + break; + } case REORDER_BUFFER_CHANGE_INVALIDATION: { char *data; @@ -4025,6 +4113,15 @@ ReorderBufferChangeSize(ReorderBufferChange *change) break; } + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + { + Size prefix_size = strlen(change->data.ddlmsg.prefix) + 1; + + sz += prefix_size + change->data.ddlmsg.message_size + + sizeof(Size) + sizeof(Size); + + break; + } case REORDER_BUFFER_CHANGE_INVALIDATION: { sz += sizeof(SharedInvalidationMessage) * @@ -4283,8 +4380,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* read prefix */ memcpy(&prefix_size, data, sizeof(Size)); data += sizeof(Size); - change->data.msg.prefix = MemoryContextAlloc(rb->context, - prefix_size); + change->data.msg.prefix = MemoryContextAlloc(rb->context, prefix_size); memcpy(change->data.msg.prefix, data, prefix_size); Assert(change->data.msg.prefix[prefix_size - 1] == '\0'); data += prefix_size; @@ -4300,6 +4396,29 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + { + Size prefix_size; + + /* read prefix */ + memcpy(&prefix_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.ddlmsg.prefix = MemoryContextAlloc(rb->context, prefix_size); + memcpy(change->data.ddlmsg.prefix, data, prefix_size); + Assert(change->data.ddlmsg.prefix[prefix_size - 1] == '\0'); + data += prefix_size; + + /* read the message */ + memcpy(&change->data.msg.message_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.msg.message = MemoryContextAlloc(rb->context, + change->data.msg.message_size); + memcpy(change->data.msg.message, data, + change->data.msg.message_size); + data += change->data.msg.message_size; + + break; + } case REORDER_BUFFER_CHANGE_INVALIDATION: { Size inval_size = sizeof(SharedInvalidationMessage) * diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7da7823..fc5e42cf 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -156,6 +156,7 @@ #include "miscadmin.h" #include "nodes/makefuncs.h" #include "optimizer/optimizer.h" +#include "parser/analyze.h" #include "pgstat.h" #include "postmaster/bgworker.h" #include "postmaster/interrupt.h" @@ -179,7 +180,10 @@ #include "storage/lmgr.h" #include "storage/proc.h" #include "storage/procarray.h" +#include "tcop/ddl_deparse.h" +#include "tcop/pquery.h" #include "tcop/tcopprot.h" +#include "tcop/utility.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/catcache.h" @@ -2445,6 +2449,230 @@ apply_handle_truncate(StringInfo s) end_replication_step(); } +/* Remove the data population from the command */ +static void +preprocess_create_table(RawStmt *command) +{ + CommandTag commandTag; + + commandTag = CreateCommandTag((Node *)command); + + switch (commandTag) + { + case CMDTAG_CREATE_TABLE_AS: + case CMDTAG_SELECT_INTO: + { + CreateTableAsStmt *castmt = (CreateTableAsStmt *) command->stmt; + if (castmt->objtype == OBJECT_TABLE) + { + /* + * Force skipping data population to avoid data + * inconsistency. Data should be replicated from the + * publisher instead. + */ + castmt->into->skipData = true; + } + } + break; + case CMDTAG_SELECT: + { + SelectStmt *sstmt = (SelectStmt *) command->stmt; + + if (sstmt->intoClause != NULL) + { + /* + * Force skipping data population to avoid data + * inconsistency. Data should be replicated from the + * publisher instead. + */ + sstmt->intoClause->skipData = true; + } + } + break; + default: + break; + } +} + +/* + * Handle CREATE TABLE command + * + * Call AddSubscriptionRelState for CREATE TABEL command to set the relstate to + * SUBREL_STATE_READY so DML changes on this new table can be replicated without + * having to manually run "alter subscription ... refresh publication" + */ +static void +handle_create_table(RawStmt *command) +{ + CommandTag commandTag; + RangeVar *rv = NULL; + Oid relid; + Oid relnamespace = InvalidOid; + char *schemaname = NULL; + char *relname = NULL; + + commandTag = CreateCommandTag((Node *) command); + + switch (commandTag) + { + case CMDTAG_CREATE_TABLE: + { + CreateStmt *cstmt = (CreateStmt *) command->stmt; + rv = cstmt->relation; + } + break; + default: + break; + } + + if (!rv) + return; + + schemaname = rv->schemaname; + relname = rv->relname; + + if (schemaname != NULL) + relnamespace = get_namespace_oid(schemaname, false); + + if (relnamespace != InvalidOid) + relid = get_relname_relid(relname, relnamespace); + else + relid = RelnameGetRelid(relname); + + if (relid != InvalidOid) + { + AddSubscriptionRelState(MySubscription->oid, relid, + SUBREL_STATE_READY, + InvalidXLogRecPtr); + ereport(DEBUG1, + (errmsg_internal("table \"%s\" added to subscription \"%s\"", + relname, MySubscription->name))); + } +} + +static void +apply_handle_ddl(StringInfo s) +{ + XLogRecPtr lsn; + const char *prefix = NULL; + char *message = NULL; + char *ddl_command; + Size sz; + List *parsetree_list; + ListCell *parsetree_item; + DestReceiver *receiver; + MemoryContext oldcontext; + const char *save_debug_query_string = debug_query_string; + + message = logicalrep_read_ddlmessage(s, &lsn, &prefix, &sz); + + /* Make sure we are in a transaction command */ + begin_replication_step(); + + ddl_command = ddl_deparse_json_to_string(message); + debug_query_string = ddl_command; + + /* DestNone for logical replication */ + receiver = CreateDestReceiver(DestNone); + parsetree_list = pg_parse_query(ddl_command); + + foreach(parsetree_item, parsetree_list) + { + List *plantree_list; + List *querytree_list; + RawStmt *command = (RawStmt *) lfirst(parsetree_item); + CommandTag commandTag; + MemoryContext per_parsetree_context = NULL; + Portal portal; + bool snapshot_set = false; + + commandTag = CreateCommandTag((Node *) command); + + /* If we got a cancel signal in parsing or prior command, quit */ + CHECK_FOR_INTERRUPTS(); + + /* Remove data population from the command */ + preprocess_create_table(command); + + /* + * Set up a snapshot if parse analysis/planning will need one. + */ + if (analyze_requires_snapshot(command)) + { + PushActiveSnapshot(GetTransactionSnapshot()); + snapshot_set = true; + } + + /* + * We do the work for each parsetree in a short-lived context, to + * limit the memory used when there are many commands in the string. + */ + per_parsetree_context = + AllocSetContextCreate(CurrentMemoryContext, + "execute_sql_string per-statement context", + ALLOCSET_DEFAULT_SIZES); + oldcontext = MemoryContextSwitchTo(per_parsetree_context); + + querytree_list = pg_analyze_and_rewrite_fixedparams(command, + ddl_command, + NULL, 0, NULL); + + plantree_list = pg_plan_queries(querytree_list, ddl_command, 0, NULL); + + /* Done with the snapshot used for parsing/planning */ + if (snapshot_set) + PopActiveSnapshot(); + + portal = CreatePortal("logical replication", true, true); + + /* + * We don't have to copy anything into the portal, because everything + * we are passing here is in ApplyMessageContext or the + * per_parsetree_context, and so will outlive the portal anyway. + */ + PortalDefineQuery(portal, + NULL, + ddl_command, + commandTag, + plantree_list, + NULL); + + /* + * Start the portal. No parameters here. + */ + PortalStart(portal, NULL, 0, InvalidSnapshot); + + /* + * Switch back to transaction context for execution. + */ + MemoryContextSwitchTo(oldcontext); + + (void) PortalRun(portal, + FETCH_ALL, + true, + true, + receiver, + receiver, + NULL); + + PortalDrop(portal, false); + + CommandCounterIncrement(); + + /* + * Table created by DDL replication (database level) is automatically + * added to the subscription here. + */ + handle_create_table(command); + + /* Now we may drop the per-parsetree context, if one was created. */ + MemoryContextDelete(per_parsetree_context); + } + + debug_query_string = save_debug_query_string; + end_replication_step(); +} + /* * Logical replication protocol message dispatcher. @@ -2510,6 +2738,10 @@ apply_dispatch(StringInfo s) */ break; + case LOGICAL_REP_MSG_DDLMESSAGE: + apply_handle_ddl(s); + break; + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); break; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index b197bfd..5ec9c3e 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -54,6 +54,9 @@ static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pgoutput_ddlmessage(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + const char *prefix, Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, @@ -255,6 +258,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; cb->message_cb = pgoutput_message; + cb->ddlmessage_cb = pgoutput_ddlmessage; cb->commit_cb = pgoutput_commit_txn; cb->begin_prepare_cb = pgoutput_begin_prepare_txn; @@ -271,6 +275,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; cb->stream_message_cb = pgoutput_message; + cb->stream_ddlmessage_cb = pgoutput_ddlmessage; cb->stream_truncate_cb = pgoutput_truncate; /* transaction streaming - two-phase commit */ cb->stream_prepare_cb = pgoutput_stream_prepare_txn; @@ -1649,8 +1654,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, - const char *message) + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size sz, const char *message) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; TransactionId xid = InvalidTransactionId; @@ -1689,6 +1694,36 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +static void +pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + const char *prefix, Size sz, const char *message) +{ + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + TransactionId xid = InvalidTransactionId; + + /* + * Remember the xid for the message in streaming mode. See + * pgoutput_change. + */ + if (in_streaming) + xid = txn->xid; + + + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_ddlmessage(ctx->out, + xid, + message_lsn, + prefix, + sz, + message); + OutputPluginWrite(ctx, true); +} + /* * Currently we always forward. */ diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index 01d4c22..9649c5b 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -33,6 +33,7 @@ #include "catalog/pg_operator.h" #include "catalog/pg_type.h" #include "commands/trigger.h" +#include "commands/event_trigger.h" #include "executor/executor.h" #include "executor/spi.h" #include "lib/ilist.h" @@ -40,6 +41,7 @@ #include "parser/parse_coerce.h" #include "parser/parse_relation.h" #include "storage/bufmgr.h" +#include "tcop/ddl_deparse.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/datum.h" diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 43f14c2..b850e7c 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5634,6 +5634,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) pubdesc->pubactions.pubupdate |= pubform->pubupdate; pubdesc->pubactions.pubdelete |= pubform->pubdelete; pubdesc->pubactions.pubtruncate |= pubform->pubtruncate; + pubdesc->pubactions.pubcreate |= pubform->pubcreate; /* * Check if all columns referenced in the filter expression are part of diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 786d592..b4ee0f9 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3867,6 +3867,7 @@ getPublications(Archive *fout, int *numPublications) int i_pubupdate; int i_pubdelete; int i_pubtruncate; + int i_pubcreate; int i_pubviaroot; int i, ntups; @@ -3882,23 +3883,29 @@ getPublications(Archive *fout, int *numPublications) resetPQExpBuffer(query); /* Get the publications. */ - if (fout->remoteVersion >= 130000) + if (fout->remoteVersion >= 150000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "p.pubowner, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubcreate, p.pubviaroot " + "FROM pg_publication p"); + else if (fout->remoteVersion >= 130000) + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "p.pubowner, " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false as p.pubcreate, p.pubviaroot " "FROM pg_publication p"); else if (fout->remoteVersion >= 110000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "p.pubowner, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false as p.pubcreate, false AS pubviaroot " "FROM pg_publication p"); else appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "p.pubowner, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false as p.pubcreate, false AS pubviaroot " "FROM pg_publication p"); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); @@ -3914,6 +3921,7 @@ getPublications(Archive *fout, int *numPublications) i_pubupdate = PQfnumber(res, "pubupdate"); i_pubdelete = PQfnumber(res, "pubdelete"); i_pubtruncate = PQfnumber(res, "pubtruncate"); + i_pubcreate = PQfnumber(res, "pubcreate"); i_pubviaroot = PQfnumber(res, "pubviaroot"); pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); @@ -3937,6 +3945,8 @@ getPublications(Archive *fout, int *numPublications) (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0); pubinfo[i].pubtruncate = (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0); + pubinfo[i].pubcreate = + (strcmp(PQgetvalue(res, i, i_pubcreate), "t") == 0); pubinfo[i].pubviaroot = (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0); @@ -4016,6 +4026,15 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo) first = false; } + if (pubinfo->pubcreate) + { + if (!first) + appendPQExpBufferStr(query, ", "); + + appendPQExpBufferStr(query, "create"); + first = false; + } + appendPQExpBufferStr(query, "'"); if (pubinfo->pubviaroot) diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 1d21c29..170271e 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -620,6 +620,7 @@ typedef struct _PublicationInfo bool pubdelete; bool pubtruncate; bool pubviaroot; + bool pubcreate; } PublicationInfo; /* diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 6b8c17b..792f438 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -27,6 +27,7 @@ #include "commands/sequence.h" #include "commands/tablespace.h" #include "replication/message.h" +#include "replication/ddlmessage.h" #include "replication/origin.h" #include "rmgrdesc.h" #include "storage/standbydefs.h" diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 31df8b7..2814a13 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6050,7 +6050,7 @@ listPublications(const char *pattern) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6085,6 +6085,10 @@ listPublications(const char *pattern) appendPQExpBuffer(&buf, ",\n pubviaroot AS \"%s\"", gettext_noop("Via root")); + if (pset.sversion >= 140000) + appendPQExpBuffer(&buf, + ",\n pubcreate AS \"%s\"", + gettext_noop("Creates")); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -6172,6 +6176,7 @@ describePublications(const char *pattern) PGresult *res; bool has_pubtruncate; bool has_pubviaroot; + bool has_pubcreate; PQExpBufferData title; printTableContent cont; @@ -6188,6 +6193,7 @@ describePublications(const char *pattern) has_pubtruncate = (pset.sversion >= 110000); has_pubviaroot = (pset.sversion >= 130000); + has_pubcreate = (pset.sversion >= 150000); initPQExpBuffer(&buf); @@ -6201,6 +6207,9 @@ describePublications(const char *pattern) if (has_pubviaroot) appendPQExpBufferStr(&buf, ", pubviaroot"); + if (has_pubcreate) + appendPQExpBufferStr(&buf, + ", pubcreate"); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -6249,6 +6258,8 @@ describePublications(const char *pattern) ncols++; if (has_pubviaroot) ncols++; + if (has_pubcreate) + ncols++; initPQExpBuffer(&title); printfPQExpBuffer(&title, _("Publication %s"), pubname); @@ -6263,6 +6274,8 @@ describePublications(const char *pattern) printTableAddHeader(&cont, gettext_noop("Truncates"), true, align); if (has_pubviaroot) printTableAddHeader(&cont, gettext_noop("Via root"), true, align); + if (has_pubcreate) + printTableAddHeader(&cont, gettext_noop("Creates"), true, align); printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); @@ -6273,6 +6286,8 @@ describePublications(const char *pattern) printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false); if (has_pubviaroot) printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false); + if (has_pubcreate) + printTableAddCell(&cont, PQgetvalue(res, i, 9), false, false); if (!puballtables) { diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 9a74721..9de3b8f 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) +PG_RMGR(RM_LOGICALDDLMSG_ID, "LogicalDDLMessage", logicalddlmsg_redo, logicalddlmsg_desc, logicalddlmsg_identify, NULL, NULL, NULL, logicalddlmsg_decode) diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index ba908b3..7f0d2ca 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11887,5 +11887,8 @@ { oid => '4643', descr => 'json to string', proname => 'ddl_deparse_expand_command', prorettype => 'text', proargtypes => 'text', prosrc => 'ddl_deparse_expand_command' }, +{ oid => '4644', descr => 'trigger for ddl command deparse', + proname => 'publication_ddl_deparse', prorettype => 'event_trigger', + proargtypes => '', prosrc => 'publication_ddl_deparse' }, ] diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 29b1856..f954907 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -54,6 +54,9 @@ CATALOG(pg_publication,6104,PublicationRelationId) /* true if partition changes are published using root schema */ bool pubviaroot; + + /* true if table creations are published */ + bool pubcreate; } FormData_pg_publication; /* ---------------- @@ -72,6 +75,7 @@ typedef struct PublicationActions bool pubupdate; bool pubdelete; bool pubtruncate; + bool pubcreate; } PublicationActions; typedef struct PublicationDesc diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h new file mode 100644 index 0000000..bc95f3c --- /dev/null +++ b/src/include/replication/ddlmessage.h @@ -0,0 +1,41 @@ +/*------------------------------------------------------------------------- + * ddlmessage.h + * Exports from replication/logical/ddlmessage.c + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * src/include/replication/ddlmessage.h + *------------------------------------------------------------------------- + */ +#ifndef PG_LOGICAL_DDL_MESSAGE_H +#define PG_LOGICAL_DDL_MESSAGE_H + +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "access/xlogreader.h" + +/* + * Generic logical decoding DDL message wal record. + */ +typedef struct xl_logical_ddl_message +{ + Oid dbId; /* database Oid emitted from */ + Size prefix_size; /* length of prefix */ + Size message_size; /* size of the message */ + /* + * payload, including null-terminated prefix of length prefix_size + */ + char message[FLEXIBLE_ARRAY_MEMBER]; +} xl_logical_ddl_message; + +#define SizeOfLogicalDDLMessage (offsetof(xl_logical_ddl_message, message)) + +extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, const char *ddl_message, size_t size); + +/* RMGR API*/ +#define XLOG_LOGICAL_DDL_MESSAGE 0x00 +void logicalddlmsg_redo(XLogReaderState *record); +void logicalddlmsg_desc(StringInfo buf, XLogReaderState *record); +const char *logicalddlmsg_identify(uint8 info); + +#endif /* PG_LOGICAL_DDL_MESSAGE_H */ diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index a33c2a7..697867a 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -27,6 +27,7 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index a771ab8..2abccb8 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -61,6 +61,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', + LOGICAL_REP_MSG_DDLMESSAGE = 'L', LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', LOGICAL_REP_MSG_PREPARE = 'P', LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', @@ -229,7 +230,11 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, - bool transactional, const char *prefix, Size sz, const char *message); + bool transactional, const char *prefix, + Size sz, const char *message); +extern void logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn, + const char *prefix, Size sz, const char *message); +extern char *logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, const char **prefix, Size *sz); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 539dc8e..fa7a8b8 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -89,6 +89,16 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, const char *message); /* + * Called for the logical decoding DDL messages. + */ +typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + const char *prefix, + Size message_size, + const char *message); + +/* * Filter changes by origin. */ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, @@ -200,6 +210,17 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx const char *message); /* + * Callback for streaming logical decoding DDL messages from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + const char *prefix, + Size message_size, + const char *message); + +/* * Callback for streaming truncates from in-progress transactions. */ typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, @@ -219,6 +240,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeDDLMessageCB ddlmessage_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; @@ -237,6 +259,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamDDLMessageCB stream_ddlmessage_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index f12e75d..a6a9133 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -56,6 +56,7 @@ typedef enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, + REORDER_BUFFER_CHANGE_DDLMESSAGE, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, @@ -130,6 +131,14 @@ typedef struct ReorderBufferChange char *message; } msg; + /* DDL Message. */ + struct + { + char *prefix; + Size message_size; + char *message; + } ddlmsg; + /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */ Snapshot snapshot; @@ -430,6 +439,14 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* DDL message callback signature */ +typedef void (*ReorderBufferDDLMessageCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + const char *prefix, + Size sz, + const char *message); + /* begin prepare callback signature */ typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -496,6 +513,15 @@ typedef void (*ReorderBufferStreamMessageCB) ( const char *prefix, Size sz, const char *message); +/* stream DDL message callback signature */ +typedef void (*ReorderBufferStreamDDLMessageCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + const char *prefix, + Size sz, + const char *message); + /* stream truncate callback signature */ typedef void (*ReorderBufferStreamTruncateCB) ( ReorderBuffer *rb, @@ -541,6 +567,7 @@ struct ReorderBuffer ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + ReorderBufferDDLMessageCB ddlmessage; /* * Callbacks to be called when streaming a transaction at prepare time. @@ -560,6 +587,7 @@ struct ReorderBuffer ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamDDLMessageCB stream_ddlmessage; ReorderBufferStreamTruncateCB stream_truncate; /* @@ -635,6 +663,8 @@ void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); +void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + const char *prefix, Size message_size, const char *message); void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out index 1c5b5d2..ed546c6 100644 --- a/src/test/regress/expected/psql.out +++ b/src/test/regress/expected/psql.out @@ -5969,9 +5969,9 @@ List of schemas (0 rows) \dRp "no.such.publication" - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root -------+-------+------------+---------+---------+---------+-----------+---------- + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +------+-------+------------+---------+---------+---------+-----------+----------+--------- (0 rows) \dRs "no.such.subscription" diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 398c0f3..ff56ece 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -30,20 +30,20 @@ ERROR: conflicting or redundant options LINE 1: ...ub_xxx WITH (publish_via_partition_root = 'true', publish_vi... ^ \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f - testpub_default | regress_publication_user | f | f | t | f | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f | f + testpub_default | regress_publication_user | f | f | t | f | f | f | f (2 rows) ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete'); \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f - testpub_default | regress_publication_user | f | t | t | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f | f + testpub_default | regress_publication_user | f | t | t | t | f | f | f (2 rows) --- adding tables @@ -87,10 +87,10 @@ RESET client_min_messages; -- should be able to add schema to 'FOR TABLE' publication ALTER PUBLICATION testpub_fortable ADD ALL TABLES IN SCHEMA pub_test; \dRp+ testpub_fortable - Publication testpub_fortable - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortable + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables: "public.testpub_tbl1" Tables from schemas: @@ -99,20 +99,20 @@ Tables from schemas: -- should be able to drop schema from 'FOR TABLE' publication ALTER PUBLICATION testpub_fortable DROP ALL TABLES IN SCHEMA pub_test; \dRp+ testpub_fortable - Publication testpub_fortable - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortable + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables: "public.testpub_tbl1" -- should be able to set schema to 'FOR TABLE' publication ALTER PUBLICATION testpub_fortable SET ALL TABLES IN SCHEMA pub_test; \dRp+ testpub_fortable - Publication testpub_fortable - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortable + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test" @@ -134,10 +134,10 @@ ERROR: relation "testpub_nopk" is not part of the publication -- should be able to set table to schema publication ALTER PUBLICATION testpub_forschema SET TABLE pub_test.testpub_nopk; \dRp+ testpub_forschema - Publication testpub_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables: "pub_test.testpub_nopk" @@ -159,10 +159,10 @@ Publications: "testpub_foralltables" \dRp+ testpub_foralltables - Publication testpub_foralltables - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | t | t | t | f | f | f + Publication testpub_foralltables + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | t | t | t | f | f | f | f (1 row) DROP TABLE testpub_tbl2; @@ -174,19 +174,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; RESET client_min_messages; \dRp+ testpub3 - Publication testpub3 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub3 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables: "public.testpub_tbl3" "public.testpub_tbl3a" \dRp+ testpub4 - Publication testpub4 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub4 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables: "public.testpub_tbl3" @@ -207,10 +207,10 @@ UPDATE testpub_parted1 SET a = 1; -- only parent is listed as being in publication, not the partition ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; \dRp+ testpub_forparted - Publication testpub_forparted - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables: "public.testpub_parted" @@ -223,10 +223,10 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1; UPDATE testpub_parted1 SET a = 1; ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true); \dRp+ testpub_forparted - Publication testpub_forparted - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | t + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | t | f Tables: "public.testpub_parted" @@ -255,10 +255,10 @@ SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5) WITH (publish = 'insert'); RESET client_min_messages; \dRp+ testpub5 - Publication testpub5 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | f | f + Publication testpub5 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | f | f | f | f | f Tables: "public.testpub_rf_tbl1" "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5)) @@ -271,10 +271,10 @@ Tables: ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000); \dRp+ testpub5 - Publication testpub5 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | f | f + Publication testpub5 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | f | f | f | f | f Tables: "public.testpub_rf_tbl1" "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5)) @@ -290,10 +290,10 @@ Publications: ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2; \dRp+ testpub5 - Publication testpub5 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | f | f + Publication testpub5 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | f | f | f | f | f Tables: "public.testpub_rf_tbl1" "public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000)) @@ -301,10 +301,10 @@ Tables: -- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression) ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500); \dRp+ testpub5 - Publication testpub5 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | f | f + Publication testpub5 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | f | f | f | f | f Tables: "public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500)) @@ -337,10 +337,10 @@ SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999) WITH (publish = 'insert'); RESET client_min_messages; \dRp+ testpub_syntax1 - Publication testpub_syntax1 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | f | f + Publication testpub_syntax1 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | f | f | f | f | f Tables: "public.testpub_rf_tbl1" "public.testpub_rf_tbl3" WHERE (e < 999) @@ -350,10 +350,10 @@ SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_schema1.testpub_rf_tbl5 WHERE (h < 999) WITH (publish = 'insert'); RESET client_min_messages; \dRp+ testpub_syntax2 - Publication testpub_syntax2 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | f | f + Publication testpub_syntax2 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | f | f | f | f | f Tables: "public.testpub_rf_tbl1" "testpub_rf_schema1.testpub_rf_tbl5" WHERE (h < 999) @@ -676,10 +676,10 @@ CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate'); RESET client_min_messages; ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok \dRp+ testpub_table_ins - Publication testpub_table_ins - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | t | f + Publication testpub_table_ins + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | f | f | t | f | f Tables: "public.testpub_tbl5" (a) @@ -821,10 +821,10 @@ CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c)); ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey; ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1); \dRp+ testpub_both_filters - Publication testpub_both_filters - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_both_filters + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables: "public.testpub_tbl_both_filters" (a, c) WHERE (c <> 1) @@ -1029,10 +1029,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; ERROR: publication "testpub_fortbl" already exists \dRp+ testpub_fortbl - Publication testpub_fortbl - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortbl + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -1070,10 +1070,10 @@ Publications: "testpub_fortbl" \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | f | f + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | f | f | f Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -1151,10 +1151,10 @@ REVOKE CREATE ON DATABASE regression FROM regress_publication_user2; DROP TABLE testpub_parted; DROP TABLE testpub_tbl1; \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | f | f + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | f | f | f (1 row) -- fail - must be owner of publication @@ -1164,20 +1164,20 @@ ERROR: must be owner of publication testpub_default RESET ROLE; ALTER PUBLICATION testpub_default RENAME TO testpub_foo; \dRp testpub_foo - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root --------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpub_foo | regress_publication_user | f | t | t | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +-------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------- + testpub_foo | regress_publication_user | f | t | t | t | f | f | f (1 row) -- rename back to keep the rest simple ALTER PUBLICATION testpub_foo RENAME TO testpub_default; ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ------------------+---------------------------+------------+---------+---------+---------+-----------+---------- - testpub_default | regress_publication_user2 | f | t | t | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +-----------------+---------------------------+------------+---------+---------+---------+-----------+----------+--------- + testpub_default | regress_publication_user2 | f | t | t | t | f | f | f (1 row) -- adding schemas and tables @@ -1193,19 +1193,19 @@ CREATE TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"(id int); SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub1_forschema FOR ALL TABLES IN SCHEMA pub_test1; \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" CREATE PUBLICATION testpub2_forschema FOR ALL TABLES IN SCHEMA pub_test1, pub_test2, pub_test3; \dRp+ testpub2_forschema - Publication testpub2_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub2_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" "pub_test2" @@ -1219,44 +1219,44 @@ CREATE PUBLICATION testpub6_forschema FOR ALL TABLES IN SCHEMA "CURRENT_SCHEMA", CREATE PUBLICATION testpub_fortable FOR TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"; RESET client_min_messages; \dRp+ testpub3_forschema - Publication testpub3_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub3_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "public" \dRp+ testpub4_forschema - Publication testpub4_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub4_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "CURRENT_SCHEMA" \dRp+ testpub5_forschema - Publication testpub5_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub5_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "CURRENT_SCHEMA" "public" \dRp+ testpub6_forschema - Publication testpub6_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub6_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "CURRENT_SCHEMA" "public" \dRp+ testpub_fortable - Publication testpub_fortable - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortable + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables: "CURRENT_SCHEMA.CURRENT_SCHEMA" @@ -1290,10 +1290,10 @@ ERROR: schema "testpub_view" does not exist -- dropping the schema should reflect the change in publication DROP SCHEMA pub_test3; \dRp+ testpub2_forschema - Publication testpub2_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub2_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" "pub_test2" @@ -1301,20 +1301,20 @@ Tables from schemas: -- renaming the schema should reflect the change in publication ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed; \dRp+ testpub2_forschema - Publication testpub2_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub2_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1_renamed" "pub_test2" ALTER SCHEMA pub_test1_renamed RENAME to pub_test1; \dRp+ testpub2_forschema - Publication testpub2_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub2_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" "pub_test2" @@ -1322,10 +1322,10 @@ Tables from schemas: -- alter publication add schema ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test2; \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" "pub_test2" @@ -1334,10 +1334,10 @@ Tables from schemas: ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA non_existent_schema; ERROR: schema "non_existent_schema" does not exist \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" "pub_test2" @@ -1346,10 +1346,10 @@ Tables from schemas: ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test1; ERROR: schema "pub_test1" is already member of publication "testpub1_forschema" \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" "pub_test2" @@ -1357,10 +1357,10 @@ Tables from schemas: -- alter publication drop schema ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2; \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" @@ -1368,10 +1368,10 @@ Tables from schemas: ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2; ERROR: tables from schema "pub_test2" are not part of the publication \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" @@ -1379,29 +1379,29 @@ Tables from schemas: ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA non_existent_schema; ERROR: schema "non_existent_schema" does not exist \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" -- drop all schemas ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test1; \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f (1 row) -- alter publication set multiple schema ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test2; \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" "pub_test2" @@ -1410,10 +1410,10 @@ Tables from schemas: ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schema; ERROR: schema "non_existent_schema" does not exist \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" "pub_test2" @@ -1422,10 +1422,10 @@ Tables from schemas: -- removing the duplicate schemas ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1; \dRp+ testpub1_forschema - Publication testpub1_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" @@ -1504,18 +1504,18 @@ SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub3_forschema; RESET client_min_messages; \dRp+ testpub3_forschema - Publication testpub3_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub3_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f (1 row) ALTER PUBLICATION testpub3_forschema SET ALL TABLES IN SCHEMA pub_test1; \dRp+ testpub3_forschema - Publication testpub3_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub3_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables from schemas: "pub_test1" @@ -1525,20 +1525,20 @@ CREATE PUBLICATION testpub_forschema_fortable FOR ALL TABLES IN SCHEMA pub_test1 CREATE PUBLICATION testpub_fortable_forschema FOR TABLE pub_test2.tbl1, ALL TABLES IN SCHEMA pub_test1; RESET client_min_messages; \dRp+ testpub_forschema_fortable - Publication testpub_forschema_fortable - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_forschema_fortable + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables: "pub_test2.tbl1" Tables from schemas: "pub_test1" \dRp+ testpub_fortable_forschema - Publication testpub_fortable_forschema - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortable_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Creates +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | f Tables: "pub_test2.tbl1" Tables from schemas: -- 1.8.3.1