From 426c80f427ebd9d5817d10a1e557c543cedf631d Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Tue, 15 Jun 2021 01:52:39 -0400 Subject: [PATCH v5] Add option to set two-phase in CREATE_REPLICATION_SLOT command. CREATE_REPLICATION_SLOT modified to support two-phase encoding in the slot. This will allow the decoding of commands like PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED for slots created with this option. --- doc/src/sgml/protocol.sgml | 16 +++++++++++++++- src/backend/replication/repl_gram.y | 12 ++++++++++++ src/backend/replication/repl_scanner.l | 1 + src/backend/replication/walsender.c | 18 +++++++++++++++--- 4 files changed, 43 insertions(+), 4 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index bc2a2fe..205fbd2 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1914,7 +1914,7 @@ The commands accepted in replication mode are: - CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT | NOEXPORT_SNAPSHOT | USE_SNAPSHOT ] } + CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT | NOEXPORT_SNAPSHOT | USE_SNAPSHOT | TWO_PHASE ] } CREATE_REPLICATION_SLOT @@ -1956,6 +1956,20 @@ The commands accepted in replication mode are: + TWO_PHASE + + + Specify that this logical replication slot supports decoding of two-phase + transactions. With this option, two-phase commands like + PREPARE TRANSACTION, COMMIT PREPARED + and ROLLBACK PREPARED are decoded and transmitted. + The transaction will be decoded and transmitted at + PREPARE TRANSACTION time. + + + + + RESERVE_WAL diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index eb283a8..eead144 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void); %token K_SLOT %token K_RESERVE_WAL %token K_TEMPORARY +%token K_TWO_PHASE %token K_EXPORT_SNAPSHOT %token K_NOEXPORT_SNAPSHOT %token K_USE_SNAPSHOT @@ -102,6 +103,7 @@ static SQLCmd *make_sqlcmd(void); %type plugin_opt_arg %type opt_slot var_name %type opt_temporary +%type opt_two_phase %type create_slot_opt_list %type create_slot_opt @@ -283,6 +285,11 @@ create_slot_opt: $$ = makeDefElem("reserve_wal", (Node *)makeInteger(true), -1); } + | K_TWO_PHASE + { + $$ = makeDefElem("two_phase", + (Node *)makeInteger(true), -1); + } ; /* DROP_REPLICATION_SLOT slot */ @@ -365,6 +372,11 @@ opt_temporary: | /* EMPTY */ { $$ = false; } ; +opt_two_phase: + K_TWO_PHASE { $$ = true; } + | /* EMPTY */ { $$ = false; } + ; + opt_slot: K_SLOT IDENT { $$ = $2; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index dcc3c3f..c038a63 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -103,6 +103,7 @@ RESERVE_WAL { return K_RESERVE_WAL; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } TEMPORARY { return K_TEMPORARY; } +TWO_PHASE { return K_TWO_PHASE; } EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; } NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; } USE_SNAPSHOT { return K_USE_SNAPSHOT; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3224536..92c755f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -863,11 +863,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, - CRSSnapshotAction *snapshot_action) + CRSSnapshotAction *snapshot_action, + bool *two_phase) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; + bool two_phase_given = false; /* Parse options */ foreach(lc, cmd->options) @@ -905,6 +907,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, reserve_wal_given = true; *reserve_wal = true; } + else if (strcmp(defel->defname, "two_phase") == 0) + { + if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + two_phase_given = true; + *two_phase = true; + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -920,6 +931,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) char xloc[MAXFNAMELEN]; char *slot_name; bool reserve_wal = false; + bool two_phase = false; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; @@ -929,7 +941,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); - parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action); + parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); /* setup state for WalSndSegmentOpen */ sendTimeLineIsHistoric = false; @@ -954,7 +966,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - false); + two_phase); } if (cmd->kind == REPLICATION_KIND_LOGICAL) -- 1.8.3.1