Re: Support logical replication of DDLs - Mailing list pgsql-hackers

From Ajin Cherian
Subject Re: Support logical replication of DDLs
Date
Msg-id CAFPTHDbp_opg-RcKHhvRuXiZx_xdS9Pz06QM6uZRyPPb62R7Sg@mail.gmail.com
In response to Re: Support logical replication of DDLs  (Peter Smith <smithpb2250@gmail.com>)
List pgsql-hackers
On Tue, Nov 15, 2022 at 10:57 AM Peter Smith <smithpb2250@gmail.com> wrote:
>
> Here are some review comments for v32-0002
>
> ======
>
> 1. Commit message
>
> Comment says:
> While creating a publication, we register a command end
> trigger that deparses the DDL as a JSON blob, and WAL logs it. The event
> trigger is automatically removed at the time of drop publication.
>
> SUGGESTION (uppercase the SQL)
> During CREATE PUBLICATION we register a command end trigger that
> deparses the DDL as a JSON blob, and WAL logs it. The event
> trigger is automatically removed at the time of DROP PUBLICATION.
>
> ~~~

Fixed.

>
> 2.
>
> Comment says:
> This is a POC patch to show how using event triggers and DDL deparsing
> facilities we can implement DDL replication. So, the implementation is
> restricted to CREATE TABLE/ALTER TABLE/DROP TABLE commands.
>
> ~
>
> Still correct or old comment gone stale?
>

Removed.

> ~~~
>
> 3.
>
> Comment says:
> Note that the replication for ALTER INDEX command is still under
> progress.
>
> ~
>
> Still correct or old comment gone stale?
>

Removed.

> ======
>
> 4. GENERAL - Patch order.
>
> Somehow, I feel this v32-0002 patch and the v32-0001 patch should be
> swapped. IIUC this one seems to me to be the "core" framework for the
> DDL message replication but the other 0001 was more like just the
> implementations of all the supported different *kinds* of DDL JSON blobs.
> So actually this patch seems more like the mandatory one and the other
> one can just evolve as it gets more supported JSON.
>

I think a big patch reordering is planned for future versions, based on
this comment and Alvaro's comment. Skipping this for now.

> ~~~
>
> 5. GENERAL - naming
>
> The DDL suffix 'msg' or 'message' seemed sometimes unnecessary because
> there is no ambiguity that this is a message for DDL replication, so
> the shorter name conveys the same amount of information, doesn't it?
>
> e.g. Maybe reconsider some of these ones (probably there are others)...
>
> src/include/replication/decode.h
> logicalddlmsg_decode -> Why not call this function logicalddl_decode?
>
> src/include/replication/logicalproto.h:
> LOGICAL_REP_MSG_DDLMESSAGE -> Why not call it 'LOGICAL_REP_MSG_DDL'?
> logicalrep_write_ddlmessage -> Why not call this function logicalrep_write_ddl?
> logicalrep_read_ddlmessage -> Why not call this function logicalrep_read_ddl?
>
> src/include/replication/output_plugin.h:
> 'ddlmessage_cb' -> Why not call it 'ddl_cb'?
> 'stream_ddlmessage_cb' -> Why not call it 'stream_ddl_cb'?
>
> src/include/replication/reorderbuffer.h:
> - 'REORDER_BUFFER_CHANGE_DDLMESSAGE' -> Why not call it 'REORDER_BUFFER_CHANGE_DDL'?
> - 'ddlmsg' -> Why not call it 'ddl'?
> - 'ddlmessage' -> Why not call it 'ddl'?
> - 'stream_ddlmessage' -> Why not call it 'stream_ddl'?
>

Fixed.

> ======
>
> src/backend/access/rmgrdesc/Makefile
>
> 6.
>
> @@ -19,6 +19,7 @@ OBJS = \
>   hashdesc.o \
>   heapdesc.o \
>   logicalmsgdesc.o \
> + logicalddlmsgdesc.o \
>
> Change should be in alphabetical order.
>

Fixed.

> ======
>
> src/backend/access/rmgrdesc/logicalddlmsgdesc.c
>
> 7. logicalddlmsg_identify
>
> +const char *
> +logicalddlmsg_identify(uint8 info)
> +{
> + if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE)
> + return "DDL MESSAGE";
> +
> + return NULL;
> +}
>
> The logicalrep_message_type (see below) said "DDL", so maybe this
> should also just say "DDL" instead of "DDL MESSAGE"
>
> @@ -1218,6 +1264,8 @@ logicalrep_message_type(LogicalRepMsgType action)
>   return "TYPE";
>   case LOGICAL_REP_MSG_MESSAGE:
>   return "MESSAGE";
> + case LOGICAL_REP_MSG_DDLMESSAGE:
> + return "DDL";
>

Fixed.

> ======
>
> src/backend/commands/event_trigger.c
>
> 8. start/end
>
> +/*
> + * publication_deparse_ddl_command_start
> + *
> + * Deparse the ddl command and log it.
> + */
> +Datum
> +publication_deparse_ddl_command_start(PG_FUNCTION_ARGS)
> ...
> +/*
> + * publication_deparse_ddl_command_end
> + *
> + * Deparse the ddl command and log it.
> + */
> +Datum
> +publication_deparse_ddl_command_end(PG_FUNCTION_ARGS)
>
> The start/end function comments are the same -- there should be some
> more explanation to say what they are for.
>

Updated with a more detailed explanation.

> ~~~
>
> 9. publication_deparse_ddl_command_start
>
> + char    *command = psprintf("Drop table command start");
>
> Huh? So this function is only for this specific case of DROP TABLE? If
> correct, then I think that should be commented on or asserted
> somewhere.
>

Updated the comments to specify this.

> ~
>
> 10.
>
> + /* extract the relid from the parse tree */
> + foreach(cell1, stmt->objects)
>
> Uppercase comment
>

Fixed.

> ~
>
> 11.
>
> + if (relpersist == RELPERSISTENCE_TEMP)
> + {
> + table_close(relation, NoLock);
> + continue;
> + }
> +
> + LogLogicalDDLMessage("deparse", address.objectId, DCT_TableDropStart,
> + command, strlen(command) + 1);
> +
> + if (relation)
> + table_close(relation, NoLock);
>
> This code looks overly complex. Can't it just be like below?
>
> SUGGESTION
>
> if (relpersist != RELPERSISTENCE_TEMP)
> LogLogicalDDLMessage("deparse", address.objectId, DCT_TableDropStart,
> command, strlen(command) + 1);
>
> if (relation)
> table_close(relation, NoLock);
>
> ~~~
>
> 12. publication_deparse_table_rewrite
>
> + if (relpersist == RELPERSISTENCE_TEMP)
> + return PointerGetDatum(NULL);
> +
> + /* Deparse the DDL command and WAL log it to allow decoding of the same. */
> + json_string = deparse_utility_command(cmd, false);
> +
> + if (json_string != NULL)
> + LogLogicalDDLMessage("deparse", cmd->d.alterTable.objectId, DCT_TableAlter,
> + json_string, strlen(json_string) + 1);
> +
> + return PointerGetDatum(NULL);
>
> Similar to the previous comment, I think this can be simplified so there is
> only one return
>
> SUGGESTION
>
> if (relpersist != RELPERSISTENCE_TEMP)
> {
> /* Deparse the DDL command and WAL log it to allow decoding of the same. */
> json_string = deparse_utility_command(cmd, false);
>
> if (json_string != NULL)
> LogLogicalDDLMessage("deparse", cmd->d.alterTable.objectId, DCT_TableAlter,
> json_string, strlen(json_string) + 1);
> }
>
> return PointerGetDatum(NULL);
>

Fixed as described above.

> ~~~
>
> 13. publication_deparse_ddl_command_end
>
> + if (relpersist == RELPERSISTENCE_TEMP)
> + continue;
> +
> + /*
> + * Deparse the DDL command and WAL log it to allow decoding of the
> + * same.
> + */
> + json_string = deparse_utility_command(cmd, false);
> +
> + if (json_string == NULL)
> + continue;
> +
> + LogLogicalDDLMessage("deparse", relid, type, json_string,
> + strlen(json_string) + 1);
>
> Maybe this logic is simpler without all the continue?
>
> SUGGESTION
>
> if (relpersist != RELPERSISTENCE_TEMP)
> {
> /*
> * Deparse the DDL command and WAL log it to allow decoding of the
> * same.
> */
> json_string = deparse_utility_command(cmd, false);
>
> if (json_string != NULL)
> LogLogicalDDLMessage("deparse", relid, type, json_string,
> strlen(json_string) + 1);
> }
>

Fixed.

> ~
>
> 14.
>
> + if (strcmp(obj->objecttype, "table") == 0)
> + cmdtype = DCT_TableDropEnd;
> + else if (strcmp(obj->objecttype, "sequence") == 0 ||
> + strcmp(obj->objecttype, "schema") == 0 ||
> + strcmp(obj->objecttype, "index") == 0 ||
> + strcmp(obj->objecttype, "function") == 0 ||
> + strcmp(obj->objecttype, "procedure") == 0 ||
> + strcmp(obj->objecttype, "operator") == 0 ||
> + strcmp(obj->objecttype, "operator class") == 0 ||
> + strcmp(obj->objecttype, "operator family") == 0 ||
> + strcmp(obj->objecttype, "cast") == 0 ||
> + strcmp(obj->objecttype, "type") == 0 ||
> + strcmp(obj->objecttype, "domain") == 0 ||
> + strcmp(obj->objecttype, "trigger") == 0 ||
> + strcmp(obj->objecttype, "conversion") == 0 ||
> + strcmp(obj->objecttype, "policy") == 0 ||
> + strcmp(obj->objecttype, "rule") == 0 ||
> + strcmp(obj->objecttype, "extension") == 0 ||
> + strcmp(obj->objecttype, "foreign-data wrapper") == 0 ||
> + strcmp(obj->objecttype, "text search configuration") == 0 ||
> + strcmp(obj->objecttype, "text search dictionary") == 0 ||
> + strcmp(obj->objecttype, "text search parser") == 0 ||
> + strcmp(obj->objecttype, "text search template") == 0 ||
> + strcmp(obj->objecttype, "transform") == 0 ||
> + strcmp(obj->objecttype, "server") == 0 ||
> + strcmp(obj->objecttype, "collation") == 0 ||
> + strcmp(obj->objecttype, "user mapping") == 0 ||
> + strcmp(obj->objecttype, "language") == 0 ||
> + strcmp(obj->objecttype, "view") == 0 ||
> + strcmp(obj->objecttype, "materialized view") == 0 ||
> + strcmp(obj->objecttype, "statistics object") == 0 ||
> + strcmp(obj->objecttype, "access method") == 0)
> + cmdtype = DCT_ObjectDrop;
> + else
> + continue;
> +
> + /* Change foreign-data wrapper to foreign data wrapper */
> + if (strncmp(obj->objecttype, "foreign-data wrapper", 20) == 0)
> + {
> + tmptype = pstrdup("foreign data wrapper");
> + command = deparse_drop_command(obj->objidentity, tmptype,
> +    stmt->behavior);
> + }
> +
> + /* Change statistics object to statistics */
> + else if (strncmp(obj->objecttype, "statistics object",
> + strlen("statistics object")) == 0)
> + {
> + tmptype = pstrdup("statistics");
> + command = deparse_drop_command(obj->objidentity, tmptype,
> +    stmt->behavior);
> + }
> +
> + /*
> + * object identity needs to be modified to make the drop work
> + *
> + * FROM: <role> on server <servername> TO  : for >role> server
> + * <servername>
> + *
> + */
> + else if (strncmp(obj->objecttype, "user mapping", 12) == 0)
> + {
> + char    *on_server;
> +
> + tmptype = palloc(strlen(obj->objidentity) + 2);
> + on_server = strstr(obj->objidentity, "on server");
> +
> + sprintf((char *) tmptype, "for ");
> + strncat((char *) tmptype, obj->objidentity, on_server - obj->objidentity);
> + strcat((char *) tmptype, on_server + 3);
> + command = deparse_drop_command(tmptype, obj->objecttype,
> +    stmt->behavior);
> + }
> + else
> + command = deparse_drop_command(obj->objidentity, obj->objecttype,
> +    stmt->behavior);
>
> 14a.
> Why are some of these implemented as strcmp and others are implemented
> as strncmp?
>
> ~
>
> 14b.
> The mass strcmp seems inefficient. The same could be done in other ways like:
> - use a single strstr call (where all the possibilities are in one large string)
> - pass string representation of some enum and just switch on it
> - etc.
>
> ~

I considered modifying this, but searching one large string could
produce a false positive when characters from adjoining words happen to
form another object type name.
This will definitely require refactoring, but I am not sure what the
best way is. Maybe a simple 'for' loop searching an array of strings is
better.
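
A minimal standalone sketch of the array-plus-loop idea. The object-type names are copied from the strcmp chain quoted in comment 14; DCT_None and drop_command_type() are hypothetical names used only for illustration. Each entry is compared with a whole-string strcmp, so a fragment such as "operator cl" cannot match, which avoids the false-positive concern with a single concatenated string.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define lengthof(array) (sizeof(array) / sizeof((array)[0]))

/* Hypothetical subset of the patch's command types. */
typedef enum
{
    DCT_None = 0,               /* hypothetical: object type not replicated */
    DCT_TableDropEnd,
    DCT_ObjectDrop
} DeparsedCommandType;

/* Object types replicated via a generic DROP (from the strcmp chain). */
static const char *const droppable_object_types[] = {
    "access method", "cast", "collation", "conversion", "domain",
    "extension", "foreign-data wrapper", "function", "index", "language",
    "materialized view", "operator", "operator class", "operator family",
    "policy", "procedure", "rule", "schema", "sequence", "server",
    "statistics object", "text search configuration",
    "text search dictionary", "text search parser",
    "text search template", "transform", "trigger", "type",
    "user mapping", "view"
};

/* Map an objecttype string to a command type; DCT_None means "skip". */
static DeparsedCommandType
drop_command_type(const char *objecttype)
{
    assert(objecttype != NULL);

    if (strcmp(objecttype, "table") == 0)
        return DCT_TableDropEnd;

    for (size_t i = 0; i < lengthof(droppable_object_types); i++)
    {
        /* Whole-string comparison: no partial or overlapping matches. */
        if (strcmp(objecttype, droppable_object_types[i]) == 0)
            return DCT_ObjectDrop;
    }
    return DCT_None;
}
```

The caller would then replace the "else continue;" branch with a check for DCT_None. The array happens to be in strcmp order, so bsearch() from <stdlib.h> could replace the linear scan if the list grows; for about 30 entries a loop is probably fine.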

>
> 15.
>
> + /*
> + * object identity needs to be modified to make the drop work
> + *
> + * FROM: <role> on server <servername> TO  : for >role> server
> + * <servername>
> + *
> + */
>
> The comment needs fixing.
>

Fixed.
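
For reference, the identity rewrite described by that comment can be sketched as a standalone helper. user_mapping_drop_identity() is hypothetical and uses malloc() where the backend would use palloc(). It assumes objidentity has the form "<role> on server <servername>", as the patch comment states, and produces "for <role> server <servername>", matching the DROP USER MAPPING FOR ... SERVER ... syntax. Unlike the quoted code, it matches " on server " with the surrounding spaces and returns NULL rather than using a NULL strstr() result when the pattern is absent.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Rewrite "<role> on server <servername>" into
 * "for <role> server <servername>".
 * Returns a malloc'd string, or NULL if the input does not match.
 */
static char *
user_mapping_drop_identity(const char *objidentity)
{
    const char *sep = " on server ";
    const char *pos;
    const char *servername;
    size_t      role_len;
    size_t      len;
    char       *result;

    assert(objidentity != NULL);

    pos = strstr(objidentity, sep);
    if (pos == NULL)
        return NULL;

    role_len = (size_t) (pos - objidentity);
    servername = pos + strlen(sep);

    /* "for " + role + " server " + servername + terminating NUL */
    len = strlen("for ") + role_len + strlen(" server ") +
        strlen(servername) + 1;
    result = malloc(len);
    if (result == NULL)
        return NULL;

    snprintf(result, len, "for %.*s server %s",
             (int) role_len, objidentity, servername);
    return result;
}
```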

> ~
>
> 16.
>
> + if (command == NULL)
> + continue;
> +
> + LogLogicalDDLMessage("deparse", obj->address.objectId, cmdtype,
> + command, strlen(command) + 1);
>
> SUGGESTION
>
> if (command)
> LogLogicalDDLMessage("deparse", obj->address.objectId, cmdtype,
> command, strlen(command) + 1);
>

Fixed.

> ======
>
> src/backend/commands/publicationcmds.c
>
>
> 17. CreateDDLReplicaEventTrigger
>
> + static const char *trigger_name_prefix = "pg_deparse_trig_%s_%u";
> + static const char *trigger_func_prefix = "publication_deparse_%s";
>
> 17a.
> I felt the ddl deparse trigger name should have the name "ddl" in it somewhere
>
It is there in the second half of the string:
1. ddl_command_end
2. ddl_command_start


> ~
>
> 17b.
> Why are these called "prefixes" ?? - They looked more just like name
> format strings to me.
>

The caller supplies the second half of the string (the event name);
this function sets only the first half, hence "prefix".
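
To illustrate the naming scheme, here is a standalone sketch that expands the two format strings quoted in comment 17 for a given event name. build_names() is hypothetical, and the assumption that the %u argument is the publication OID is not confirmed by the quoted code. With the event names ddl_command_start, ddl_command_end and table_rewrite, the function names line up with publication_deparse_ddl_command_start(), publication_deparse_ddl_command_end() and publication_deparse_table_rewrite() from the patch.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Format strings as quoted from CreateDDLReplicaEventTrigger(). */
static const char *trigger_name_prefix = "pg_deparse_trig_%s_%u";
static const char *trigger_func_prefix = "publication_deparse_%s";

/*
 * Hypothetical helper: expand both formats for one event.  The %u
 * argument is assumed to be the publication OID.
 */
static void
build_names(const char *eventname, unsigned int puboid,
            char *trigname, size_t trigname_len,
            char *funcname, size_t funcname_len)
{
    int         n;

    n = snprintf(trigname, trigname_len, trigger_name_prefix,
                 eventname, puboid);
    assert(n >= 0 && (size_t) n < trigname_len);    /* not truncated */

    n = snprintf(funcname, funcname_len, trigger_func_prefix, eventname);
    assert(n >= 0 && (size_t) n < funcname_len);
}
```

Even with a 10-digit OID, the longest trigger name ("pg_deparse_trig_ddl_command_start_" plus the OID) is 44 bytes, so it stays under PostgreSQL's default 63-byte identifier limit.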

> ~~~
>
> 18. CreatePublication
>
> + /*
> + * Create an event trigger to allow logging of DDL statements.
> + *
> + * TODO: We need to find a better syntax to allow replication of DDL
> + * statements.
> + *
> + * XXX: This code is just to show the replication of CREATE/ALTER/DROP
> + * TABLE works. We need to enhance this once the approach for DDL
> + * replication is finalized.
> + */
> + if (pubactions.pubddl)
>
> This comment needs updating.
>

Fixed.

> ~
>
> 19.
>
> + CommandTag end_commands[] = {
> + CMDTAG_CREATE_ACCESS_METHOD,
> + CMDTAG_DROP_ACCESS_METHOD,
> + CMDTAG_ALTER_DEFAULT_PRIVILEGES,
> + CMDTAG_COMMENT,
> + CMDTAG_CREATE_LANGUAGE,
> + CMDTAG_ALTER_LANGUAGE,
> + CMDTAG_DROP_LANGUAGE,
> + CMDTAG_CREATE_VIEW,
> + CMDTAG_ALTER_VIEW,
> + CMDTAG_DROP_VIEW,
> + CMDTAG_CREATE_MATERIALIZED_VIEW,
>
> 19a.
> Some better ordering (e.g. A-Z) can be done here, and maybe use blank
> lines to make the groupings more obvious.
>

Fixed.

> ~
>
> 19b.
> Wouldn't it be better to declare these static?
>
>

Fixed.

> ======
>
> src/backend/replication/logical/Makefile
>
> 20.
>
>  OBJS = \
>   decode.o \
> + ddlmessage.o\
>   launcher.o \
> Change should be in alphabetical order.
>

Fixed.

> ======
>
> src/backend/replication/logical/ddlmessage.c
>
> 21. File Comment
>
> + * Unlike generic logical messages, these DDL messages have only transactional
> + * mode.Note by default DDLs in PostgreSQL are transactional.
>
> Missing space before "Note"
>

Fixed.

> ~~~
>
> 22. LogLogicalDDLMessage
>
> + /*
> + * Ensure we have a valid transaction id.
> + */
> + Assert(IsTransactionState());
> + GetCurrentTransactionId();
>
> Single line comment should be OK here
>

Fixed.

> ~
>
> 23.
>
> + /* trailing zero is critical; see logicalddlmsg_desc */
>
> Uppercase comment
>

Fixed.

> ~
>
> 24.
>
> + /* allow origin filtering */
>
> Uppercase comment
>

Fixed.

> ======
>
> src/backend/replication/logical/proto.c
>
> 25. logicalrep_read_ddlmessage
>
> + uint8 flags;
> + char *msg;
> +
> + //TODO double check when do we need to get TransactionId.
> +
> + flags = pq_getmsgint(in, 1);
> + if (flags != 0)
> + elog(ERROR, "unrecognized flags %u in ddl message", flags);
> + *lsn = pq_getmsgint64(in);
> + *prefix = pq_getmsgstring(in);
> + *sz = pq_getmsgint(in, 4);
> + msg = (char *) pq_getmsgbytes(in, *sz);
> +
> + return msg;
>
> 25a.
> This code will fail if the associated *write* function has sent a xid.
> Maybe additional param is needed to tell it when to read the xid?
>

Removed; the xid is no longer sent, as it is not required.

> ~
>
> 25b.
> Will be tidier to have a blank line after the elog
>

Fixed.

> ~~~
>
> 26. logicalrep_write_ddlmessage
>
> + /* transaction ID (if not valid, we're not streaming) */
> + if (TransactionIdIsValid(xid))
> + pq_sendint32(out, xid);
>
> Perhaps this "write" function should *always* write the xid even if it
> is invalid because then the "read" function will know to always read
> it.
>

Changed it to never send the xid.
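
The underlying point of 25a/26 is that the reader must consume exactly the fields the writer emitted. A standalone sketch of the resulting layout (flags, LSN, NUL-terminated prefix, size, message, with no xid) follows. MsgBuf, write_ddl() and read_ddl() are hypothetical stand-ins for StringInfo and the pq_send*/pq_getmsg* functions; integers are written big-endian because those functions use network byte order. Since the layout no longer depends on whether the xid is valid, the read side needs no extra parameter.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical fixed-size stand-in for StringInfo. */
typedef struct
{
    unsigned char data[256];
    size_t      len;            /* bytes written so far */
    size_t      cursor;         /* next byte to read */
} MsgBuf;

static void
put_bytes(MsgBuf *b, const void *p, size_t n)
{
    assert(b->len + n <= sizeof(b->data));
    memcpy(b->data + b->len, p, n);
    b->len += n;
}

/* Big-endian, like the network byte order used by pq_sendintNN(). */
static void
put_uint(MsgBuf *b, uint64_t v, int nbytes)
{
    for (int i = nbytes - 1; i >= 0; i--)
    {
        unsigned char c = (unsigned char) (v >> (8 * i));

        put_bytes(b, &c, 1);
    }
}

static uint64_t
get_uint(MsgBuf *b, int nbytes)
{
    uint64_t    v = 0;

    assert(b->cursor + (size_t) nbytes <= b->len);
    for (int i = 0; i < nbytes; i++)
        v = (v << 8) | b->data[b->cursor++];
    return v;
}

/* Writer: no xid, so the layout never depends on streaming. */
static void
write_ddl(MsgBuf *b, uint64_t lsn, const char *prefix,
          const char *msg, uint32_t sz)
{
    put_uint(b, 0, 1);                          /* flags */
    put_uint(b, lsn, 8);                        /* LSN */
    put_bytes(b, prefix, strlen(prefix) + 1);   /* prefix incl. NUL */
    put_uint(b, sz, 4);                         /* message size */
    put_bytes(b, msg, sz);                      /* message body */
}

/* Reader: consumes exactly the fields write_ddl() produced, in order. */
static const char *
read_ddl(MsgBuf *b, uint64_t *lsn, const char **prefix, uint32_t *sz)
{
    const unsigned char *start;
    const unsigned char *nul;
    const char *msg;

    if (get_uint(b, 1) != 0)
        return NULL;            /* the patch raises ERROR here */
    *lsn = get_uint(b, 8);

    start = b->data + b->cursor;
    nul = memchr(start, '\0', b->len - b->cursor);
    if (nul == NULL)
        return NULL;
    *prefix = (const char *) start;
    b->cursor += (size_t) (nul - start) + 1;

    *sz = (uint32_t) get_uint(b, 4);
    if (b->cursor + *sz > b->len)
        return NULL;
    msg = (const char *) b->data + b->cursor;
    b->cursor += *sz;
    return msg;
}
```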

> ======
>
> src/backend/replication/logical/reorderbuffer.c
>
> 27. ReorderBufferQueueDDLMessage
>
> + Assert(xid != InvalidTransactionId);
>
> SUGGESTION
> Assert(TransactionIdIsValid(xid));
>

Fixed.

> ~~~
>
> 28. ReorderBufferSerializeChange
>
> + data += sizeof(int);
> + memcpy(data, change->data.ddlmsg.prefix,
> +    prefix_size);
> + data += prefix_size;
>
> Unnecessary wrapping of memcpy.
>

Fixed.

> ~
>
> 29.
>
> + memcpy(data, &change->data.ddlmsg.cmdtype, sizeof(int));
> + data += sizeof(int);
>
> Would that be better to write as:
>
> sizeof(DeparsedCommandType) instead of sizeof(int)
>

Fixed.

> ~~~
>
> 30. ReorderBufferChangeSize
>
> + case REORDER_BUFFER_CHANGE_DDLMESSAGE:
> + {
> + Size prefix_size = strlen(change->data.ddlmsg.prefix) + 1;
> +
> + sz += prefix_size + change->data.ddlmsg.message_size +
> + sizeof(Size) + sizeof(Size) + sizeof(Oid) + sizeof(int);
>
> sizeof(DeparsedCommandType) instead of sizeof(int)
>

Fixed.
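
Background for 29/30: the C standard leaves the underlying type of an enum implementation-defined (compatible with char or some signed or unsigned integer type), so sizeof(DeparsedCommandType) is safer than sizeof(int). The sketch below assumes a field order (prefix size, message size, relid, cmdtype, prefix, message) that may not match the patch; DDLChange, ddl_change_size() and ddl_change_serialize() are hypothetical. Computing the size and writing the bytes from the same sizeof() terms lets the final assert catch any drift between the size accounting and the serializer.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef size_t Size;            /* as in PostgreSQL's c.h */
typedef unsigned int Oid;       /* as in postgres_ext.h */

/* Hypothetical subset of the patch's enum. */
typedef enum
{
    DCT_TableDropStart,
    DCT_TableDropEnd,
    DCT_TableAlter,
    DCT_ObjectDrop
} DeparsedCommandType;

/* Hypothetical mirror of the DDL change payload. */
typedef struct
{
    const char *prefix;
    Oid         relid;
    DeparsedCommandType cmdtype;
    Size        message_size;
    const char *message;
} DDLChange;

/* Size accounting, using the same sizeof() terms the serializer writes. */
static Size
ddl_change_size(const DDLChange *c)
{
    Size        prefix_size = strlen(c->prefix) + 1;

    return sizeof(Size) + sizeof(Size) + sizeof(Oid) +
        sizeof(DeparsedCommandType) + prefix_size + c->message_size;
}

static char *
ddl_change_serialize(const DDLChange *c, Size *len)
{
    Size        prefix_size = strlen(c->prefix) + 1;
    char       *buf;
    char       *data;

    *len = ddl_change_size(c);
    buf = data = malloc(*len);
    if (buf == NULL)
        return NULL;

    memcpy(data, &prefix_size, sizeof(Size));
    data += sizeof(Size);
    memcpy(data, &c->message_size, sizeof(Size));
    data += sizeof(Size);
    memcpy(data, &c->relid, sizeof(Oid));
    data += sizeof(Oid);
    memcpy(data, &c->cmdtype, sizeof(DeparsedCommandType));
    data += sizeof(DeparsedCommandType);
    memcpy(data, c->prefix, prefix_size);
    data += prefix_size;
    memcpy(data, c->message, c->message_size);
    data += c->message_size;

    /* Every byte counted by ddl_change_size() was written, no more. */
    assert((Size) (data - buf) == *len);
    return buf;
}
```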

I am breaking this reply into two mails; the next set of comments will be addressed in the next mail.

regards,
Ajin Cherian
Fujitsu Australia


