From db2df233fb2a0db44a2beb884d68cf783435822f Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 9 Jun 2021 06:38:07 -0400 Subject: [PATCH v3] Add support for two-phase decoding in pg_recvlogical. Modified streamutils to pass in two-phase option when calling CREATE_REPLICATION_SLOT. Added new option --two-phase in pg_recvlogical to allow decoding of two-phase transactions. --- doc/src/sgml/logicaldecoding.sgml | 20 ++++++++++++++++++-- doc/src/sgml/ref/pg_recvlogical.sgml | 16 ++++++++++++++++ src/bin/pg_basebackup/pg_basebackup.c | 2 +- src/bin/pg_basebackup/pg_receivewal.c | 2 +- src/bin/pg_basebackup/pg_recvlogical.c | 19 +++++++++++++++++-- src/bin/pg_basebackup/streamutil.c | 11 +++++++++-- src/bin/pg_basebackup/streamutil.h | 2 +- 7 files changed, 63 insertions(+), 9 deletions(-) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index d2c6e15..9212984 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -144,14 +144,14 @@ postgres=# SELECT pg_drop_replication_slot('regression_slot'); - The following example shows how logical decoding is controlled over the + The following examples shows how logical decoding is controlled over the streaming replication protocol, using the program included in the PostgreSQL distribution. This requires that client authentication is set up to allow replication connections (see ) and that max_wal_senders is set sufficiently high to allow - an additional connection. + an additional connection. The second example enables two-phase decoding. $ pg_recvlogical -d postgres --slot=test --create-slot @@ -164,8 +164,24 @@ table public.data: INSERT: id[integer]:4 data[text]:'4' COMMIT 693 ControlC $ pg_recvlogical -d postgres --slot=test --drop-slot + +$ pg_recvlogical -d postgres --slot=test --create-slot --two-phase +$ pg_recvlogical -d postgres --slot=test --start -f - +ControlZ +$ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';" +$ fg +BEGIN 694 +table public.data: INSERT: id[integer]:5 data[text]:'5' +PREPARE TRANSACTION 'test', txid 694 +ControlZ +$ psql -d postgres -c "COMMIT PREPARED 'test';" +$ fg +COMMIT PREPARED 'test', txid 694 +ControlC +$ pg_recvlogical -d postgres --slot=test --drop-slot + The following example shows SQL interface that can be used to decode prepared transactions. Before you use two-phase commit commands, you must set diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml index 6b1d98d..57c7e1b 100644 --- a/doc/src/sgml/ref/pg_recvlogical.sgml +++ b/doc/src/sgml/ref/pg_recvlogical.sgml @@ -65,6 +65,11 @@ PostgreSQL documentation , for the database specified by . + + + The can be specified with + to enable two-phase decoding. + @@ -265,6 +270,17 @@ PostgreSQL documentation + + + + + + + Enables two-phase decoding. This option should only be used with + + + + diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 16d8929..8bb0acf 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -646,7 +646,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) if (temp_replication_slot || create_slot) { if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL, - temp_replication_slot, true, true, false)) + temp_replication_slot, true, true, false, false)) exit(1); if (verbose) diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 0d15012..c1334fa 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -741,7 +741,7 @@ main(int argc, char **argv) pg_log_info("creating replication slot \"%s\"", replication_slot); if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false, - slot_exists_ok)) + slot_exists_ok, false)) exit(1); exit(0); } diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 5efec16..729082b 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -35,6 +35,7 @@ /* Global Options */ static char *outfile = NULL; static int verbose = 0; +static bool two_phase = false; static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int fsync_interval = 10 * 1000; /* 10 sec = default */ @@ -94,6 +95,7 @@ usage(void) " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000)); printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n")); printf(_(" -v, --verbose output verbose messages\n")); + printf(_(" -t, --two-phase enable two-phase decoding when creating a slot\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -?, --help show this help, then exit\n")); printf(_("\nConnection options:\n")); @@ -678,6 +680,7 @@ main(int argc, char **argv) {"fsync-interval", required_argument, NULL, 'F'}, {"no-loop", no_argument, NULL, 'n'}, {"verbose", no_argument, NULL, 'v'}, + {"two-phase", no_argument, NULL, 't'}, {"version", no_argument, NULL, 'V'}, {"help", no_argument, NULL, '?'}, /* connection options */ @@ -726,7 +729,7 @@ main(int argc, char **argv) } } - while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:", + while ((c = getopt_long(argc, argv, "E:f:F:nvtd:h:p:U:wWI:o:P:s:S:", long_options, &option_index)) != -1) { switch (c) @@ -749,6 +752,9 @@ main(int argc, char **argv) case 'v': verbose++; break; + case 't': + two_phase = true; + break; /* connection options */ case 'd': dbname = pg_strdup(optarg); @@ -920,6 +926,15 @@ main(int argc, char **argv) exit(1); } + if (two_phase && !do_create_slot) + { + pg_log_error("--two-phase may only be specified with --create-slot"); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + #ifndef WIN32 pqsignal(SIGINT, sigint_handler); pqsignal(SIGHUP, sighup_handler); @@ -976,7 +991,7 @@ main(int argc, char **argv) pg_log_info("creating replication slot \"%s\"", replication_slot); if (!CreateReplicationSlot(conn, replication_slot, plugin, false, - false, false, slot_exists_ok)) + false, false, slot_exists_ok, two_phase)) exit(1); startpos = InvalidXLogRecPtr; } diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index 99daf0e..a437533 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -486,7 +486,7 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, bool CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, bool is_temporary, bool is_physical, bool reserve_wal, - bool slot_exists_ok) + bool slot_exists_ok, bool two_phase) { PQExpBuffer query; PGresult *res; @@ -495,12 +495,14 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, Assert((is_physical && plugin == NULL) || (!is_physical && plugin != NULL)); + Assert(!(two_phase && is_physical)); Assert(slot_name != NULL); /* Build query */ appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name); if (is_temporary) appendPQExpBufferStr(query, " TEMPORARY"); + if (is_physical) { appendPQExpBufferStr(query, " PHYSICAL"); @@ -509,7 +511,12 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, } else { - appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin); + appendPQExpBuffer(query, " LOGICAL", plugin); + if (two_phase) + appendPQExpBufferStr(query, " TWO_PHASE "); + + appendPQExpBuffer(query, "\"%s\"", plugin); + if (PQserverVersion(conn) >= 100000) /* pg_recvlogical doesn't use an exported snapshot, so suppress */ appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT"); diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h index 10f87ad..504803b 100644 --- a/src/bin/pg_basebackup/streamutil.h +++ b/src/bin/pg_basebackup/streamutil.h @@ -34,7 +34,7 @@ extern PGconn *GetConnection(void); extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, bool is_temporary, bool is_physical, bool reserve_wal, - bool slot_exists_ok); + bool slot_exists_ok, bool two_phase); extern bool DropReplicationSlot(PGconn *conn, const char *slot_name); extern bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, -- 1.8.3.1