Re: Support logical replication of DDLs - Mailing list pgsql-hackers

From Ajin Cherian
Subject Re: Support logical replication of DDLs
Date
Msg-id CAFPTHDbaFByXyzEts+wZR5JLcCit50_S_j7qMF4tNeSvSuxZDA@mail.gmail.com
In response to Re: Support logical replication of DDLs  (Peter Smith <smithpb2250@gmail.com>)
Responses Re: Support logical replication of DDLs  (Zheng Li <zhengli10@gmail.com>)
List pgsql-hackers
On Tue, Nov 15, 2022 at 10:57 AM Peter Smith <smithpb2250@gmail.com> wrote:
> ======
>
> src/backend/replication/logical/worker.c
>
> 32. preprocess_create_table
>
> +/* Remove the data population from the command */
> +static void
> +preprocess_create_table(RawStmt *command)
>
> The comment is too short. Needs more explanation than this.
>

fixed.

> ~~~
>
> 33. handle_create_table
>
> +/*
> + * Handle CREATE TABLE command
> + *
> + * Call AddSubscriptionRelState for CREATE TABEL command to set the relstate to
> + * SUBREL_STATE_READY so DML changes on this new table can be replicated without
> + * having to manually run "alter subscription ... refresh publication"
> + */
>
> Typo "TABEL"
>

fixed.

> ~~~
>
> 34. handle_create_table
>
> + switch (commandTag)
> + {
> + case CMDTAG_CREATE_TABLE:
> + {
> + CreateStmt *cstmt = (CreateStmt *) command->stmt;
> +
> + rv = cstmt->relation;
> + }
> + break;
> + default:
> + break;
> + }
> +
> + if (!rv)
> + return;
>
> This switch seems overcomplicated since the function only cares about
> CMDTAG_CREATE_TABLE.
>
> SUGGESTION
>
> if (commandTag == CMDTAG_CREATE_TABLE)
> {
> CreateStmt *cstmt = (CreateStmt *) command->stmt;
> rv = cstmt->relation;
> }
> else
> {
> return;
> }
>

fixed as suggested.

> ~
>
> 35.
>
> + if (relnamespace != InvalidOid)
> + relid = get_relname_relid(relname, relnamespace);
> + else
> + relid = RelnameGetRelid(relname);
> +
> + if (relid != InvalidOid)
> + {
>
> 35a.
> Maybe better to use the OidIsValid() macro for these places
>

fixed.


> ~
>
> 35b.
> I'm not 100% sure of this logic. Is it even *possible* for these to be
> InvalidOid -- e.g. I thought the CREATE TABLE would have failed
> already if this was the case. Maybe these checks can be changed to
> Asserts?
>

Theoretically somebody could have deleted the table in the meantime.
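
A minimal sketch of why the validity check stays rather than becoming an Assert. The Oid, InvalidOid and OidIsValid definitions below are stand-ins that mirror the ones in the PostgreSQL headers; lookup_relid() and maybe_add_relstate() are hypothetical helpers invented for illustration and are not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-ins mirroring PostgreSQL's definitions (postgres_ext.h and c.h). */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(objectId) ((bool) ((objectId) != InvalidOid))

/*
 * Hypothetical lookup: returns InvalidOid when the table no longer exists,
 * e.g. because it was dropped after the CREATE TABLE was applied.
 */
static Oid
lookup_relid(bool table_exists)
{
	return table_exists ? (Oid) 16384 : InvalidOid;
}

/*
 * Hypothetical caller: returns true if the relation state would be
 * registered, false if the step is skipped because the table vanished.
 */
static bool
maybe_add_relstate(bool table_exists)
{
	Oid			relid = lookup_relid(table_exists);

	if (!OidIsValid(relid))
		return false;			/* skip quietly instead of asserting */

	return true;
}
```

With an Assert in place of the check, the concurrent-drop case would abort an assert-enabled build instead of being skipped.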

> ~~~
>
> 36. apply_handle_ddl
>
> +
> +static void
> +apply_handle_ddl(StringInfo s)
>
> Missing function comment
>

added comment.

> ======
>
> src/backend/replication/pgoutput/pgoutput.c
>
> 37. pgoutput_change
>
> @@ -1377,9 +1386,22 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>   ReorderBufferChangeType action = change->action;
>   TupleTableSlot *old_slot = NULL;
>   TupleTableSlot *new_slot = NULL;
> + bool table_rewrite = false;
>
>   update_replication_progress(ctx, false);
>
> + /*
> + * For heap rewrites, we might need to replicate them if the rewritten
> + * table publishes rewrite ddl message. So get the actual relation here
> + * and check the pubaction later.
> + */
> + if (relation->rd_rel->relrewrite)
> + {
> + table_rewrite = true;
> + relation = RelationIdGetRelation(relation->rd_rel->relrewrite);
> + targetrel = relation;
> + }
> +
>   if (!is_publishable_relation(relation))
>   return;
>
> @@ -1413,6 +1435,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>   Assert(false);
>   }
>
> + /*
> + * We don't publish table rewrite change unless we publish the rewrite ddl
> + * message.
> + */
> + if (table_rewrite && !relentry->pubactions.pubddl)
> + return;
> +
>
> Something does not seem right. Other code later in this function takes
> care to call RelationClose(relation), but in the above change, the
> logic is just returning without closing anything.
>

There is existing code just above this that returns in the same way when
the publication does not publish the change's action.

> ~~~
>
> 38. pgoutput_message
>
> @@ -1671,8 +1714,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>
>  static void
>  pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> - XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
> - const char *message)
> + XLogRecPtr message_lsn, bool transactional,
> + const char *prefix, Size sz, const char *message)
>  {
>
> This change of wrapping seems unrelated, so should not be done in this patch.
>

removed.

> ~~~
>
> 39. pgoutput_ddlmessage
>
> +static void
> +pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> + XLogRecPtr message_lsn,
> + const char *prefix, Oid relid, DeparsedCommandType cmdtype,
> + Size sz, const char *message)
>
> Missing function comment.
>

Added comment.

> ~
>
> 40.
>
> + switch (cmdtype)
>
> 40a.
> Might be tidier to have a consistent space *before* each case of this switch.
>

fixed.

> ~
>
> 40b.
> I felt it was too confusing having some of the switch case break and
> some of the switch cases return from the function -- e.g It seems
> difficult to know what conditions will execute the code that follows
> the switch. Maybe all this needs to be refactored somehow, or just
> commented on more.
>

added more comments.

> ======
>
> src/bin/pg_dump/pg_dump.c
>
> 41. getPublications
>
> - if (fout->remoteVersion >= 130000)
> + if (fout->remoteVersion >= 150000)
>
> Should be >= 160000, right?
>

fixed.
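
For reference, a small sketch of how these version gates read. It relies on the fact that PostgreSQL 10 and later report the server version as major * 10000 + minor, so 160000 is the first release of version 16. The helper names and the FIRST_VERSION_WITH_PUBDDL constant are hypothetical, and the assumption that pubddl first ships in version 16 comes from the review comments, not from any released catalog.

```c
#include <assert.h>
#include <stdbool.h>

/* PostgreSQL 10 and later encode the version as major * 10000 + minor. */
static int
server_major_version(int version_num)
{
	assert(version_num >= 100000);	/* older servers use a different scheme */
	return version_num / 10000;
}

/* Assumption: the pubddl column first appears in version 16. */
#define FIRST_VERSION_WITH_PUBDDL 160000

/* Returns true if a server of this version is expected to have pubddl. */
static bool
server_has_pubddl(int version_num)
{
	return version_num >= FIRST_VERSION_WITH_PUBDDL;
}
```

A gate of 140000 or 150000 would make pg_dump or psql ask an existing version 14 or 15 server for a column that it does not have.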

> ~
>
> 42.
>
>   else if (fout->remoteVersion >= 110000)
>   appendPQExpBufferStr(query,
>   "SELECT p.tableoid, p.oid, p.pubname, "
>   "p.pubowner, "
> - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
> + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false as p.pubddl, false AS pubviaroot "
>   "FROM pg_publication p");
>   else
>   appendPQExpBufferStr(query,
>   "SELECT p.tableoid, p.oid, p.pubname, "
>   "p.pubowner, "
> - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
> + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false as p.pubddl, false AS pubviaroot "
>   "FROM pg_publication p");
>
> Use uppercase 'AS' for consistency with other code.
>

fixed.

> ======
>
> src/bin/pg_dump/pg_dump.h
>
> 43. PublicationInfo
>
> @@ -620,6 +620,7 @@ typedef struct _PublicationInfo
>   bool pubdelete;
>   bool pubtruncate;
>   bool pubviaroot;
> + bool pubddl;
>  } PublicationInfo;
>
> IMO the new member should be adjacent to the other 'publish' parameter
> values like pubdelete/pubtruncate.
>

I have moved this member up.

> ======
>
> src/bin/psql/describe.c
>
> 44. listPublications
>
> + if (pset.sversion >= 140000)
> + appendPQExpBuffer(&buf,
> +   ",\n  pubddl AS \"%s\"",
> +   gettext_noop("DDLs"));
>
> 44a.
> Should that be 160000?
>

updated.

> ~
>
> 44b.
> IMO it would be better if the "DDLs" column appeared adjacent to the
> other 'publish' parameter option values. (e.g. these are not even the
> same column ordering as pg_dump).
>
> ~~~
>
> 45. describePublications
>
>   has_pubtruncate = (pset.sversion >= 110000);
>   has_pubviaroot = (pset.sversion >= 130000);
> + has_pubddl =  (pset.sversion >= 150000);
>
> Shouldn't that be 160000?
>
> ~
>
> 46.
>
> @@ -6313,6 +6319,9 @@ describePublications(const char *pattern)
>   if (has_pubviaroot)
>   appendPQExpBufferStr(&buf,
>   ", pubviaroot");
> + if (has_pubddl)
> + appendPQExpBufferStr(&buf,
> + ", pubddl");
>
> IMO it would be better if the "DDLs" column appeared adjacent to the
> other 'publish' parameter option values. (e.g. these are not even the
> same column ordering as pg_dump).
>

Will fix this in a future patch.

>
> ======
>
> src/include/catalog/pg_proc.dat
>
> 47.
>
> +{ oid => '4644', descr => 'trigger for ddl command deparse',
> +  proname => 'publication_deparse_ddl_command_end', prorettype => 'event_trigger',
> +  proargtypes => '', prosrc => 'publication_deparse_ddl_command_end' },
>
> Why doesn't the description say 'end'?
>

fixed this.

> ======
>
> src/include/catalog/pg_publication.h
>
> 48. FormData_pg_publication
>
> +
> + /* true if table creations are published */
> + bool pubddl;
>  } FormData_pg_publication;
>
> Why just table publications? I thought it was for EVERYTHING.
>

fixed.

> ~~~
>
> 49. PublicationActions
>
> + bool pubddl;
>  } PublicationActions;
>
> This might be OK for POC, but for the real feature, I think this
> should be more fine-grained than this all-or-nothing DDL.
>

yes, we will need to rethink this.

> ======
>
> src/include/replication/ddlmessage.h
>
> 50.
>
> +{
> + Oid dbId; /* database Oid emitted from */
> + Size prefix_size; /* length of prefix */
> + Oid relid; /* id of the table */
> + DeparsedCommandType cmdtype; /* type of sql command */
> + Size message_size; /* size of the message */
> +
> + /*
> + * payload, including null-terminated prefix of length prefix_size
> + */
> + char message[FLEXIBLE_ARRAY_MEMBER];
> +} xl_logical_ddl_message;
>
>
> 50a.
> The prefix_size comment needs to say /* length of the prefix
> (including '\0' terminator) */
>

fixed.
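
A minimal sketch of the payload layout that the corrected comment describes: the prefix is stored first, including its '\0' terminator, and the message body follows immediately after it. DemoDdlMessage, demo_build() and demo_body() are hypothetical names used only for illustration; the real xl_logical_ddl_message has more members and is built by the patch's own code, not by malloc.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Simplified stand-in for the record discussed above. */
typedef struct DemoDdlMessage
{
	size_t		prefix_size;	/* length of the prefix, including '\0' */
	size_t		message_size;	/* size of the body that follows the prefix */
	char		message[];		/* prefix + '\0', then the body */
} DemoDdlMessage;

/* Build a record; the caller frees the result. Returns NULL on OOM. */
static DemoDdlMessage *
demo_build(const char *prefix, const char *body, size_t body_len)
{
	size_t		prefix_size = strlen(prefix) + 1;	/* count the terminator */
	DemoDdlMessage *m;

	m = malloc(offsetof(DemoDdlMessage, message) + prefix_size + body_len);
	if (m == NULL)
		return NULL;

	m->prefix_size = prefix_size;
	m->message_size = body_len;
	memcpy(m->message, prefix, prefix_size);
	memcpy(m->message + prefix_size, body, body_len);
	return m;
}

/* The body starts right after the terminated prefix. */
static const char *
demo_body(const DemoDdlMessage *m)
{
	return m->message + m->prefix_size;
}
```

Because prefix_size already counts the terminator, a reader can treat message as a C string to get the prefix and add prefix_size to reach the body without any further adjustment.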


> ~
>
> 50b.
> 'relid' seems specific to TABLE DDL. Will future versions have many
> more Oid members here? Or should this be a union member or a generic
> name like 'objid'?
>

It is specific to tables; it is only used to check whether the table is
part of the publication.
All other objects are handled as a catch-all.

> ~~~
>
> 51. XLOG_LOGICAL_DDL_MESSAGE
>
> +/* RMGR API*/
> +#define XLOG_LOGICAL_DDL_MESSAGE 0x00
>
> 0x00 is same value as XLOG_LOGICAL_MESSAGE in message.h. That doesn't
> seem correct because then how will those different messages be
> identified?
>

Currently logical messages are not handled by subscriptions, so the
same value is overloaded.

> ======
>
> src/include/replication/logicalproto.h
>
> 52. LogicalRepMsgType
>
> @@ -61,6 +61,7 @@ typedef enum LogicalRepMsgType
>   LOGICAL_REP_MSG_RELATION = 'R',
>   LOGICAL_REP_MSG_TYPE = 'Y',
>   LOGICAL_REP_MSG_MESSAGE = 'M',
> + LOGICAL_REP_MSG_DDLMESSAGE = 'L',
>   LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
>
> The name already includes _MSG_ so why say MESSAGE again? IMO this
> should be called just LOGICAL_REP_MSG_DDL. See general comment.
>

fixed.

> ~~~
>
> 53.
>
>  extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
> - bool transactional, const char *prefix, Size sz, const char *message);
> + bool transactional, const char *prefix,
> + Size sz, const char *message);
>
> Modifying the wrapping of this unrelated function should not be done
> in this patch.
>

fixed.

> ======
>
> src/include/replication/reorderbuffer.h
>
> 54. REORDER_BUFFER_CHANGE_DDLMESSAGE
>
> @@ -56,6 +58,7 @@ typedef enum ReorderBufferChangeType
>   REORDER_BUFFER_CHANGE_INSERT,
>   REORDER_BUFFER_CHANGE_UPDATE,
>   REORDER_BUFFER_CHANGE_DELETE,
> + REORDER_BUFFER_CHANGE_DDLMESSAGE,
>
> Why not call it REORDER_BUFFER_CHANGE_DDL? -- see general review comment
>

fixed.

> ~~~
>
> 55. ReorderBufferChange
>
> + /* DDL Message. */
> + struct
> + {
> + char    *prefix;
> + Size message_size;
> + char    *message;
> + Oid relid;
> + DeparsedCommandType cmdtype;
> + } ddlmsg;
> +
>
> Why not call it ddl? -- see general review comment
>

fixed.

> ======
>
> src/test/regress/expected/psql.out
>
> 56.
>
>  \dRp "no.such.publication"
> -                              List of publications
> - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
> -------+-------+------------+---------+---------+---------+-----------+----------
> +                                 List of publications
> + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs
> +------+-------+------------+---------+---------+---------+-----------+----------+------
>  (0 rows)
>
> I wondered if "DDLs" belongs adjacent to the
> Inserts/Updates/Deletes/Truncates because those are the other "publish"
> parameters just like this.
>

Will fix this in a future patch.

> ======
>
> src/test/regress/expected/publication.out
>
> 57.
>
> (Ditto comment for psql.out)
>
> I wondered if "DDLs" belongs adjacent to the
> Inserts/Updates/Deletes/Truncates because those are the other "publish"
> parameters just like this.
>
> ~~~
>
> 58.
>
> Looks like there is a missing regress test case where you actually set
> the publish='ddl' and then verify that the DDLs column is correctly
> set 't'?
>

Lots of tests are missing; I will add them in a future patch.


> ======
>
> 59. MISC = typedefs.list
>
> Some typedefs.list changes are missing from this patch. At least
> the following:
>
> e.g.
> - DeparsedCommandType (from ddlmessage.h)
> - xl_logical_ddl_message (from ddlmessage.h)
> - LogicalDecodeDDLMessageCB (from output_plugin.h)
> - LogicalDecodeStreamDDLMessageCB (from output_plugin.h)
> - ReorderBufferDDLMessageCB (from reorderbuffer.h)
> - ReorderBufferStreamDDLMessageCB (from reorderbuffer.h)
>

added.

regards,
Ajin Cherian
Fujitsu Australia
