From aaeb397c352267dbeddcb8e98f71d827cc5c1027 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Mon, 12 Apr 2021 19:58:47 +1000 Subject: [PATCH v72] Add support for prepared transactions to built-in logical replication. To add support for streaming transactions at prepare time into the built-in logical replication, we need to do the below things: * Modify the output plugin (pgoutput) to implement the new two-phase API callbacks, by leveraging the extended replication protocol. * Modify the replication apply worker, to properly handle two-phase transactions by replaying them on prepare. * Add a new SUBSCRIPTION option "two_phase" to allow users to enable it. We enable the two_phase once the initial data sync is over. * Add a new option to enable two_phase while creating a slot. We don't use this option in the patch but this will allow the outside replication solutions using streaming replication protocol to use it. * Adds new subscription TAP tests, and new subscription.sql regression tests. * Updates PG doumentation. We don't support the below operations: * ALTER SUBSCRIPTION REFRESH PUBLICATION when two_phase enabled. * ALTER SUBSCRIPTION {SET|ADD|DROP} PUBLICATION WITH (refresh = true) when two_phase enabled. * Prepare API for in-progress transactions is not supported. We however must explicitly disable replication of two-phase transactions during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover, we don't have a replication connection open so we don't know where to send the data anyway. Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi Tested-By: Haiying Tang Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com --- doc/src/sgml/catalogs.sgml | 12 + doc/src/sgml/logicaldecoding.sgml | 6 +- doc/src/sgml/protocol.sgml | 313 ++++++++++++++++++- doc/src/sgml/ref/alter_subscription.sgml | 5 + doc/src/sgml/ref/create_subscription.sgml | 37 +++ src/backend/access/transam/twophase.c | 68 ++++ src/backend/catalog/pg_subscription.c | 35 +++ src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 135 +++++++- .../libpqwalreceiver/libpqwalreceiver.c | 10 +- src/backend/replication/logical/decode.c | 10 +- src/backend/replication/logical/logical.c | 37 ++- src/backend/replication/logical/origin.c | 7 +- src/backend/replication/logical/proto.c | 212 +++++++++++++ src/backend/replication/logical/reorderbuffer.c | 13 +- src/backend/replication/logical/snapbuild.c | 33 +- src/backend/replication/logical/tablesync.c | 200 ++++++++++-- src/backend/replication/logical/worker.c | 341 ++++++++++++++++++++- src/backend/replication/pgoutput/pgoutput.c | 201 +++++++++--- src/backend/replication/repl_gram.y | 16 +- src/backend/replication/repl_scanner.l | 1 + src/backend/replication/slot.c | 1 + src/backend/replication/walreceiver.c | 2 +- src/bin/pg_dump/pg_dump.c | 20 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 13 +- src/bin/psql/tab-complete.c | 2 +- src/include/access/twophase.h | 2 + src/include/catalog/pg_subscription.h | 11 + src/include/catalog/pg_subscription_rel.h | 1 + src/include/replication/logical.h | 7 +- src/include/replication/logicalproto.h | 65 +++- src/include/replication/pgoutput.h | 1 + src/include/replication/reorderbuffer.h | 2 +- src/include/replication/slot.h | 7 +- src/include/replication/snapbuild.h | 5 +- src/include/replication/walreceiver.h | 6 +- src/include/replication/worker_internal.h | 3 + src/test/regress/expected/subscription.out | 109 ++++--- src/test/regress/sql/subscription.sql | 25 ++ src/test/subscription/t/021_twophase.pl | 293 ++++++++++++++++++ src/test/subscription/t/022_twophase_cascade.pl | 236 ++++++++++++++ src/tools/pgindent/typedefs.list | 2 + 43 files changed, 2318 insertions(+), 190 deletions(-) create mode 100644 src/test/subscription/t/021_twophase.pl create mode 100644 src/test/subscription/t/022_twophase_cascade.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 2656786..cd6f064 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7632,6 +7632,18 @@ SCRAM-SHA-256$<iteration count>:&l + subtwophasestate char + + + State code: + d = two_phase mode was not requested, so is disabled; + p = two_phase mode was requested, but is pending enablement; + e = two_phase mode was requested, and is enabled. + + + + + subconninfo text diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 5d049cd..97ac503 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -1250,9 +1250,9 @@ stream_commit_cb(...); <-- commit of the streamed transaction The logical replication solution that builds distributed two phase commit using this feature can deadlock if the prepared transaction has locked - [user] catalog tables exclusively. They need to inform users to not have - locks on catalog tables (via explicit LOCK command) in - such transactions. + [user] catalog tables exclusively. To avoid this users must refrain from + having locks on catalog tables (via explicit LOCK command) + in such transactions. diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 2f4dde3..48044dd 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1914,7 +1914,7 @@ The commands accepted in replication mode are: - CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT | NOEXPORT_SNAPSHOT | USE_SNAPSHOT ] } + CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] [ TWO_PHASE ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT | NOEXPORT_SNAPSHOT | USE_SNAPSHOT ] } CREATE_REPLICATION_SLOT @@ -1956,6 +1956,20 @@ The commands accepted in replication mode are: + TWO_PHASE + + + Specify that this replication slot supports decode of two-phase + transactions. With this option, two-phase commands like + PREPARE TRANSACTION, COMMIT PREPARED + and ROLLBACK PREPARED are decoded and transmitted. + The transaction will be decoded and transmitted at + PREPARE TRANSACTION time. + + + + + RESERVE_WAL @@ -2797,11 +2811,17 @@ The commands accepted in replication mode are: - Protocol version. Currently versions 1 and - 2 are supported. The version 2 - is supported only for server version 14 and above, and it allows - streaming of large in-progress transactions. - + Protocol version. Currently versions 1, 2, + and 3 are supported. + + + Version 2 is supported only for server version 14 + and above, and it allows streaming of large in-progress transactions. + + + Version 3 is supported only for server version 15 + and above, and it allows streaming of two-phase transactions. + @@ -2857,10 +2877,11 @@ The commands accepted in replication mode are: The logical replication protocol sends individual transactions one by one. This means that all messages between a pair of Begin and Commit messages - belong to the same transaction. It also sends changes of large in-progress - transactions between a pair of Stream Start and Stream Stop messages. The - last stream of such a transaction contains Stream Commit or Stream Abort - message. + belong to the same transaction. Similarly, all messages between a pair of + Begin Prepare and Commit Prepared messages belong to the same transaction. + It also sends changes of large in-progress transactions between a pair of + Stream Start and Stream Stop messages. The last stream of such a transaction + contains Stream Commit or Stream Abort message. @@ -7364,6 +7385,278 @@ Stream Abort + + + +The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared) +are available since protocol version 3. + + + + + + +Begin Prepare + + + + + + +Byte1('b') + + Identifies this message as the beginning of a two-phase transaction message. + + + + +Int64 + + The LSN of the prepare. + + + + +Int64 + + The end LSN of the transaction. + + + + +Int64 + + Timestamp of the prepare transaction. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int32 + + Xid of the subtransaction (will be same as xid of the transaction for top-level + transactions). + + + + +String + + The user defined GID of the two-phase transaction. + + + + + + + + + + + +Prepare + + + + + + +Byte1('P') + + Identifies this message as a two-phase prepare transaction message. + + + + +Int8 + + Flags; currently unused (must be 0). + + + + +Int64 + + The LSN of the prepare. + + + + +Int64 + + The end LSN of the transaction. + + + + +Int64 + + Timestamp of the prepare transaction. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int32 + + Xid of the subtransaction (will be same as xid of the transaction for top-level + transactions). + + + + +String + + The user defined GID of the two-phase transaction. + + + + + + + + + + + +Commit Prepared + + + + + + +Byte1('K') + + Identifies this message as the commit of a two-phase transaction message. + + + + +Int8 + + Flags; currently unused (must be 0). + + + + +Int64 + + The LSN of the prepare. + + + + +Int64 + + The end LSN of the transaction. + + + + +Int64 + + Timestamp of the commit transaction. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int32 + + Xid of the subtransaction (will be same as xid of the transaction for top-level + transactions). + + + + +String + + The user defined GID of the two-phase transaction. + + + + + + + + + + + +Rollback Prepared + + + + + + +Byte1('A') + + Identifies this message as the rollback of a two-phase transaction message. + + + + +Int8 + + Flags; currently unused (must be 0). + + + + +Int64 + + The LSN of the prepare. + + + + +Int64 + + The end LSN of the transaction. + + + + +Int64 + + Timestamp of the prepare. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int64 + + Timestamp of the rollback transaction. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int32 + + Xid of the subtransaction (will be same as xid of the transaction for top-level + transactions). + + + + +String + + The user defined GID of the two-phase transaction. + + + + + + + + + + + The following message parts are shared by the above messages. diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 367ac81..2408e10 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -67,6 +67,11 @@ ALTER SUBSCRIPTION name RENAME TO < Commands ALTER SUBSCRIPTION ... REFRESH PUBLICATION and ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ... with refresh option as true cannot be executed inside a transaction block. + + These commands also cannot be executed with copy_data = true + when the subscription has two_phase commit enabled. See + column subtwophasestate of + to know the actual two-phase state. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index e812bee..bbef613 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -237,6 +237,43 @@ CREATE SUBSCRIPTION subscription_name + + + The streaming option cannot be used along with + two_phase option. + + + + + + two_phase (boolean) + + + Specifies whether two-phase commit is enabled for this subscription. + The default is false. + + + + When two-phase commit is enabled then the decoded transactions are sent + to the subscriber on the PREPARE TRANSACTION. By default, the transaction + prepared on publisher is decoded as normal transaction at commit. + + + + The two-phase commit implementation requires that the replication has + successfully passed the initial table synchronization phase. This means + even when two_phase is enabled for the subscription, the internal + two-phase state remains temporarily "pending" until the initialization + phase is completed. See column + subtwophasestate of + to know the actual two-phase state. + + + + The two_phase option cannot be used along with + streaming option. + + diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index b658134..a8725e6 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2460,3 +2460,71 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) RemoveTwoPhaseFile(xid, giveWarning); RemoveGXact(gxact); } + +/* + * LookupGXact + * Check if the prepared transaction with the given GID and lsn is around. + * + * Note that we always compare with the LSN where prepare ends because that is + * what is stored as origin_lsn in the 2PC file. + * + * This function is primarily used to check if the prepared transaction + * received from the upstream (remote node) already exists. Checking only GID + * is not sufficient because a different prepared xact with the same GID can + * exist on the same node. So, we are ensuring to match origin_lsn and + * origin_timestamp of prepared xact to avoid the possibility of a match of + * prepared xact from two different nodes. + */ +bool +LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, + TimestampTz origin_prepare_timestamp) +{ + int i; + bool found = false; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs. */ + if ((gxact->valid && strcmp(gxact->gid, gid) == 0)) + { + char *buf; + TwoPhaseFileHeader *hdr; + + /* + * We are neither expecting the collisions of GXACTs (same gid) + * between publisher and subscribers nor the apply worker restarts + * after prepared xacts, so we perform all I/O while holding + * TwoPhaseStateLock for simplicity. + * + * To move the I/O out of the lock, we need to ensure that no + * other backend commits the prepared xact in the meantime. We can + * do this optimization if we encounter many collisions in GID + * between publisher and subscriber. + */ + if (gxact->ondisk) + buf = ReadTwoPhaseFile(gxact->xid, false); + else + { + Assert(gxact->prepare_start_lsn); + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); + } + + hdr = (TwoPhaseFileHeader *) buf; + + if (hdr->origin_lsn == prepare_end_lsn && + hdr->origin_timestamp == origin_prepare_timestamp) + { + found = true; + pfree(buf); + break; + } + + pfree(buf); + } + } + LWLockRelease(TwoPhaseStateLock); + return found; +} diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 4039768..2b4b699 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -68,6 +68,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->enabled = subform->subenabled; sub->binary = subform->subbinary; sub->stream = subform->substream; + sub->twophasestate = subform->subtwophasestate; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, @@ -449,6 +450,40 @@ RemoveSubscriptionRel(Oid subid, Oid relid) table_close(rel, RowExclusiveLock); } +/* + * Does the subscription have any relations? + * + * Use this function only to know true/false, and when you have no need for the + * List returned by GetSubscriptionRelations. + */ +bool +HasSubscriptionRelations(Oid subid) +{ + Relation rel; + int nkeys = 0; + ScanKeyData skey[2]; + SysScanDesc scan; + bool has_subrels = false; + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(subid)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, nkeys, skey); + + /* If even a single tuple exists then the subscription has tables. */ + has_subrels = HeapTupleIsValid(systable_getnext(scan)); + + /* Cleanup */ + systable_endscan(scan); + table_close(rel, AccessShareLock); + + return has_subrels; +} /* * Get all relations for subscription. diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 451db2e..9abe43d 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1264,7 +1264,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are readable. REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subslotname, subpublications) +GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subslotname, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 517c8ed..55deef8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -69,7 +69,8 @@ parse_subscription_options(List *options, char **synchronous_commit, bool *refresh, bool *binary_given, bool *binary, - bool *streaming_given, bool *streaming) + bool *streaming_given, bool *streaming, + bool *twophase_given, bool *twophase) { ListCell *lc; bool connect_given = false; @@ -110,6 +111,11 @@ parse_subscription_options(List *options, *streaming_given = false; *streaming = false; } + if (twophase) + { + *twophase_given = false; + *twophase = false; + } /* Parse options */ foreach(lc, options) @@ -215,6 +221,29 @@ parse_subscription_options(List *options, *streaming_given = true; *streaming = defGetBoolean(defel); } + else if (strcmp(defel->defname, "two_phase") == 0) + { + /* + * Do not allow toggling of two_phase option. Doing so could cause + * missing of transactions and lead to an inconsistent replica. + * See comments atop worker.c + * + * Note: twophase == NULL indicates that this call originated from + * AlterSubscription. + */ + if (!twophase) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot alter two_phase option"))); + + if (*twophase_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *twophase_given = true; + *twophase = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -285,6 +314,21 @@ parse_subscription_options(List *options, errmsg("subscription with %s must also set %s", "slot_name = NONE", "create_slot = false"))); } + + /* + * Do additional checking for the disallowed combination of two_phase and + * streaming. While streaming and two_phase can theoretically be + * supported, it needs more analysis to allow them together. + */ + if (twophase && *twophase_given && *twophase) + { + if (streaming && *streaming_given && *streaming) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s are mutually exclusive options", + "two_phase = true", "streaming = true"))); + } + } /* @@ -337,6 +381,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool copy_data; bool streaming; bool streaming_given; + bool twophase; + bool twophase_given; char *synchronous_commit; char *conninfo; char *slotname; @@ -361,7 +407,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) &synchronous_commit, NULL, /* no "refresh" */ &binary_given, &binary, - &streaming_given, &streaming); + &streaming_given, &streaming, + &twophase_given, &twophase); /* * Since creating a replication slot is not transactional, rolling back @@ -429,6 +476,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming); + values[Anum_pg_subscription_subtwophasestate - 1] = + CharGetDatum(twophase ? + LOGICALREP_TWOPHASE_STATE_PENDING : + LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -507,7 +558,16 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) { Assert(slotname); - walrcv_create_slot(wrconn, slotname, false, + /* + * Even if two_phase is set, don't create the slot with + * two-phase enabled. Will enable it once all the tables are + * synced and ready. This avoids race-conditions like prepared + * transactions being skipped due to changes not being applied + * due to checks in should_apply_changes_for_rel() when + * tablesync for the corresponding tables are in progress. See + * comments atop worker.c. + */ + walrcv_create_slot(wrconn, slotname, false, false, CRS_NOEXPORT_SNAPSHOT, NULL); ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", @@ -814,7 +874,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) &synchronous_commit, NULL, /* no "refresh" */ &binary_given, &binary, - &streaming_given, &streaming); + &streaming_given, &streaming, + NULL, NULL /* no "two_phase" */ ); if (slotname_given) { @@ -848,6 +909,12 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) if (streaming_given) { + if ((sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) && streaming) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot set %s for two-phase enabled subscription", + "streaming = true"))); + values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming); replaces[Anum_pg_subscription_substream - 1] = true; @@ -871,7 +938,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ NULL, NULL, /* no "binary" */ - NULL, NULL); /* no streaming */ + NULL, NULL, /* no "streaming" */ + NULL, NULL); /* no "two_phase" */ Assert(enabled_given); if (!sub->slotname && enabled) @@ -916,7 +984,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) NULL, /* no "synchronous_commit" */ &refresh, NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + NULL, NULL, /* no "streaming" */ + NULL, NULL); /* no "two_phase" */ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); replaces[Anum_pg_subscription_subpublications - 1] = true; @@ -932,6 +1001,17 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); + /* + * See ALTER_SUBSCRIPTION_REFRESH for details why this is + * not allowed. + */ + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"), + errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh = false, or with copy_data = false" + ", or use DROP/CREATE SUBSCRIPTION."))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); /* Make sure refresh sees the new list of publications. */ @@ -963,7 +1043,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) NULL, /* no "synchronous_commit" */ &refresh, NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + NULL, NULL, /* no "streaming" */ + NULL, NULL); /* no "two_phase" */ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publist); @@ -980,6 +1061,17 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); + /* + * See ALTER_SUBSCRIPTION_REFRESH for details why this is + * not allowed. + */ + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"), + errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh = false, or with copy_data = false" + ", or use DROP/CREATE SUBSCRIPTION."))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); /* Only refresh the added/dropped list of publications. */ @@ -1009,7 +1101,34 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + NULL, NULL, /* no "streaming" */ + NULL, NULL); /* no "two_phase" */ + + /* + * The subscription two_phase commit implementation requires + * that replication has passed the initial table + * synchronization phase before the two_phase becomes properly + * enabled. + * + * But, having reached this two-phase commit "enabled" state + * we must not allow any subsequent table initialization to + * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed + * when the the user had requested two_phase = on mode. + * + * The exception to this restriction is when copy_data = + * false, because when copy_data is false the tablesync will + * start already in READY state and will exit directly without + * doing anything which could interfere with the apply + * worker's message handling. + * + * For more details see comments atop worker.c. + */ + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"), + errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false" + ", or use DROP/CREATE SUBSCRIPTION."))); PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 021c1b3..eb03c53 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -73,6 +73,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, static char *libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, + bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); @@ -433,6 +434,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 140000) appendStringInfoString(&cmd, ", streaming 'on'"); + if (options->proto.logical.twophase && + PQserverVersion(conn->streamConn) >= 140000) + appendStringInfoString(&cmd, ", two_phase 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) @@ -833,7 +838,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) */ static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, - bool temporary, CRSSnapshotAction snapshot_action, + bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn) { PGresult *res; @@ -847,6 +852,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, if (temporary) appendStringInfoString(&cmd, " TEMPORARY"); + if (two_phase) + appendStringInfoString(&cmd, " TWO_PHASE"); + if (conn->logical) { appendStringInfoString(&cmd, " LOGICAL pgoutput"); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 7924581..99f2afc 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -374,11 +374,9 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * * XXX Now, this can even lead to a deadlock if the prepare * transaction is waiting to get it logically replicated for - * distributed 2PC. Currently, we don't have an in-core - * implementation of prepares for distributed 2PC but some - * out-of-core logical replication solution can have such an - * implementation. They need to inform users to not have locks - * on catalog tables in such transactions. + * distributed 2PC. This can be avoided by disallowing to + * prepare transactions that have locked [user] catalog tables + * exclusively. */ DecodePrepare(ctx, buf, &parsed); break; @@ -735,7 +733,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, if (two_phase) { ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, - SnapBuildInitialConsistentPoint(ctx->snapshot_builder), + SnapBuildGetTwoPhaseAt(ctx->snapshot_builder), commit_time, origin_id, origin_lsn, parsed->twophase_gid, true); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 4f6e87f..954dbb5 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -209,7 +209,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, - need_full_snapshot, slot->data.initial_consistent_point); + need_full_snapshot, slot->data.two_phase_at); ctx->reorder->private_data = ctx; @@ -435,10 +435,19 @@ CreateInitDecodingContext(const char *plugin, MemoryContextSwitchTo(old_context); /* - * We allow decoding of prepared transactions iff the two_phase option is - * enabled at the time of slot creation. + * We allow decoding of prepared transactions when the two_phase is + * enabled at the time of slot creation, or when the two_phase option is + * given at the streaming start. */ - ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->twophase &= (ctx->twophase_opt_given || slot->data.two_phase); + + /* Mark slot to allow two_phase decoding if not already marked */ + if (ctx->twophase && !slot->data.two_phase) + { + slot->data.two_phase = true; + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + } ctx->reorder->output_rewrites = ctx->options.receive_rewrites; @@ -542,10 +551,21 @@ CreateDecodingContext(XLogRecPtr start_lsn, MemoryContextSwitchTo(old_context); /* - * We allow decoding of prepared transactions iff the two_phase option is - * enabled at the time of slot creation. + * We allow decoding of prepared transactions when the two_phase is + * enabled at the time of slot creation, or when the two_phase option is + * given at the streaming start. */ - ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->twophase &= (ctx->twophase_opt_given || slot->data.two_phase); + + /* Mark slot to allow two_phase decoding if not already marked */ + if (ctx->twophase && !slot->data.two_phase) + { + slot->data.two_phase = true; + slot->data.two_phase_at = start_lsn; + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn); + } ctx->reorder->output_rewrites = ctx->options.receive_rewrites; @@ -612,7 +632,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) SpinLockAcquire(&slot->mutex); slot->data.confirmed_flush = ctx->reader->EndRecPtr; - slot->data.initial_consistent_point = ctx->reader->EndRecPtr; + if (slot->data.two_phase) + slot->data.two_phase_at = ctx->reader->EndRecPtr; SpinLockRelease(&slot->mutex); } diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 39471fd..b258174 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -973,8 +973,11 @@ replorigin_advance(RepOriginId node, /* * Due to - harmless - race conditions during a checkpoint we could see - * values here that are older than the ones we already have in memory. - * Don't overwrite those. + * values here that are older than the ones we already have in memory. We + * could also see older values for prepared transactions when the prepare + * is sent at a later point of time along with commit prepared and there + * are other transactions commits between prepare and commit prepared. See + * ReorderBufferFinishPrepared. Don't overwrite those. */ if (go_backward || replication_state->remote_lsn < remote_commit) replication_state->remote_lsn = remote_commit; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 2a1f983..9977eae 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -107,6 +107,218 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) } /* + * Write BEGIN PREPARE to the output stream. + */ +void +logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE); + + /* fixed fields */ + pq_sendint64(out, txn->final_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction BEGIN PREPARE from the stream. + */ +void +logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data) +{ + /* read fields */ + begin_data->prepare_lsn = pq_getmsgint64(in); + if (begin_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "final_lsn not set in begin message"); + begin_data->end_lsn = pq_getmsgint64(in); + if (begin_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn not set in begin message"); + begin_data->preparetime = pq_getmsgint64(in); + begin_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(begin_data->gid, pq_getmsgstring(in)); +} + +/* + * Write PREPARE to the output stream. + */ +void +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE); + + /* + * This should only ever happen for two-phase commit transactions. In + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + Assert(rbtxn_prepared(txn)); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction PREPARE from the stream. + */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in prepare message", flags); + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + if (prepare_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_lsn is not set in prepare message"); + prepare_data->end_lsn = pq_getmsgint64(in); + if (prepare_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn is not set in prepare message"); + prepare_data->preparetime = pq_getmsgint64(in); + prepare_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); +} + +/* + * Write COMMIT PREPARED to the output stream. + */ +void +logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED); + + /* + * This should only ever happen for two-phase commit transactions. In + * which case we expect to have a valid GID. Additionally, the transaction + * must be prepared. See ReorderBufferFinishPrepared. + */ + Assert(txn->gid != NULL); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, commit_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction COMMIT PREPARED from the stream. + */ +void +logicalrep_read_commit_prepared(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in commit prepare message", flags); + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + if (prepare_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_lsn is not set in commit prepared message"); + prepare_data->end_lsn = pq_getmsgint64(in); + if (prepare_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn is not set in commit prepared message"); + prepare_data->preparetime = pq_getmsgint64(in); + prepare_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); +} + +/* + * Write ROLLBACK PREPARED to the output stream. + */ +void +logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED); + + /* + * This should only ever happen for two-phase commit transactions. In + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_end_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, prepare_time); + pq_sendint64(out, txn->commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction ROLLBACK PREPARED from the stream. + */ +void +logicalrep_read_rollback_prepared(StringInfo in, + LogicalRepRollbackPreparedTxnData *rollback_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in rollback prepare message", flags); + + /* read fields */ + rollback_data->prepare_end_lsn = pq_getmsgint64(in); + if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_end_lsn is not set in commit prepared message"); + rollback_data->rollback_end_lsn = pq_getmsgint64(in); + if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "rollback_end_lsn is not set in commit prepared message"); + rollback_data->preparetime = pq_getmsgint64(in); + rollback_data->rollbacktime = pq_getmsgint64(in); + rollback_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(rollback_data->gid, pq_getmsgstring(in)); +} + +/* * Write ORIGIN to the output stream. */ void diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 52d0628..92c7fa7 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2680,7 +2680,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - XLogRecPtr initial_consistent_point, + XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit) { @@ -2706,12 +2706,13 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, /* * It is possible that this transaction is not decoded at prepare time - * either because by that time we didn't have a consistent snapshot or it - * was decoded earlier but we have restarted. We only need to send the - * prepare if it was not decoded earlier. We don't need to decode the xact - * for aborts if it is not done already. + * either because by that time we didn't have a consistent snapshot, or + * two_phase was not enabled, or it was decoded earlier but we have + * restarted. We only need to send the prepare if it was not decoded + * earlier. We don't need to decode the xact for aborts if it is not done + * already. */ - if ((txn->final_lsn < initial_consistent_point) && is_commit) + if ((txn->final_lsn < two_phase_at) && is_commit) { txn->txn_flags |= RBTXN_PREPARE; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index c5a8125..0daff13 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -165,15 +165,15 @@ struct SnapBuild XLogRecPtr start_decoding_at; /* - * LSN at which we found a consistent point at the time of slot creation. - * This is also the point where we have exported a snapshot for the - * initial copy. + * LSN at which two-phase decoding was enabled or LSN at which we found a + * consistent point at the time of slot creation. * - * The prepared transactions that are not covered by initial snapshot - * needs to be sent later along with commit prepared and they must be - * before this point. + * The prepared transactions that were skipped because previously + * two-phase was not enabled or are not covered by initial snapshot needs + * to be sent later along with commit prepared and they must be before + * this point. */ - XLogRecPtr initial_consistent_point; + XLogRecPtr two_phase_at; /* * Don't start decoding WAL until the "xl_running_xacts" information @@ -281,7 +281,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, - XLogRecPtr initial_consistent_point) + XLogRecPtr two_phase_at) { MemoryContext context; MemoryContext oldcontext; @@ -309,7 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; builder->building_full_snapshot = need_full_snapshot; - builder->initial_consistent_point = initial_consistent_point; + builder->two_phase_at = two_phase_at; MemoryContextSwitchTo(oldcontext); @@ -370,12 +370,21 @@ SnapBuildCurrentState(SnapBuild *builder) } /* - * Return the LSN at which the snapshot was exported + * Return the LSN at which the two-phase decoding was first enabled. */ XLogRecPtr -SnapBuildInitialConsistentPoint(SnapBuild *builder) +SnapBuildGetTwoPhaseAt(SnapBuild *builder) { - return builder->initial_consistent_point; + return builder->two_phase_at; +} + +/* + * Set the LSN at which two-phase decoding is enabled. + */ +void +SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr) +{ + builder->two_phase_at = ptr; } /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 0638f5c..a1bae34 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -96,6 +96,7 @@ #include "access/table.h" #include "access/xact.h" +#include "catalog/indexing.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" @@ -114,8 +115,11 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" static bool table_states_valid = false; +static List *table_states_not_ready = NIL; +static bool FetchTableStates(bool *started_tx); StringInfo copybuf = NULL; @@ -359,7 +363,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Oid relid; TimestampTz last_start_time; }; - static List *table_states = NIL; static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; @@ -367,42 +370,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - if (!table_states_valid) - { - MemoryContext oldctx; - List *rstates; - ListCell *lc; - SubscriptionRelState *rstate; - - /* Clean the old list. */ - list_free_deep(table_states); - table_states = NIL; - - StartTransactionCommand(); - started_tx = true; - - /* Fetch all non-ready tables. */ - rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); - - /* Allocate the tracking info in a permanent memory context. */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) - { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states = lappend(table_states, rstate); - } - MemoryContextSwitchTo(oldctx); - - table_states_valid = true; - } + FetchTableStates(&started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid * immediate restarts. We don't need it if there are no tables that need * syncing. */ - if (table_states && !last_start_times) + if (table_states_not_ready && !last_start_times) { HASHCTL ctl; @@ -416,16 +391,37 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * Clean up the hash table when we're done with all tables (just to * release the bit of memory). */ - else if (!table_states && last_start_times) + else if (!table_states_not_ready && last_start_times) { hash_destroy(last_start_times); last_start_times = NULL; } /* + * Even when the two_phase mode is requested by the user, it remains as + * 'pending' until all tablesyncs have reached READY state. + * + * When this happens, we restart the apply worker and (if the conditions + * are still ok) then the two_phase tri-state will become properly + * 'enabled' at that time. + * + * Note: If the subscription has no tables then leave the state as PENDING, + * which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will restart so two_phase can be enabled", + MySubscription->name))); + + proc_exit(0); + } + + /* * Process all tables that are being synchronized. */ - foreach(lc, table_states) + foreach(lc, table_states_not_ready) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -1058,7 +1054,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * slot leading to a dangling slot on the server. */ HOLD_INTERRUPTS(); - walrcv_create_slot(wrconn, slotname, false /* permanent */ , + walrcv_create_slot(wrconn, slotname, false /* permanent */ , false /* two_phase */ , CRS_USE_SNAPSHOT, origin_startpos); RESUME_INTERRUPTS(); @@ -1144,3 +1140,139 @@ copy_table_done: wait_for_worker_state_change(SUBREL_STATE_CATCHUP); return slotname; } + +/* + * Common code to fetch the up-to-date sync state info into the static lists. + * + * Returns true if subscription has 1 or more tables, else false. + */ +static bool +FetchTableStates(bool *started_tx) +{ + static int has_subrels = false; + + *started_tx = false; + + if (!table_states_valid) + { + MemoryContext oldctx; + List *rstates; + ListCell *lc; + SubscriptionRelState *rstate; + + /* Clean the old lists. */ + list_free_deep(table_states_not_ready); + table_states_not_ready = NIL; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + *started_tx = true; + } + + /* Fetch all non-ready tables. */ + rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + foreach(lc, rstates) + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, rstate); + } + MemoryContextSwitchTo(oldctx); + + /* + * Does the subscription have tables? + * + * If there were not-READY relations found then we know it does. But if + * table_state_no_ready was empty we still need to check again to see + * if there are 0 tables. + */ + has_subrels = (list_length(table_states_not_ready) > 0) || + HasSubscriptionRelations(MySubscription->oid); + + table_states_valid = true; + } + + return has_subrels; +} + +/* + * If the subscription has no tables then return false. + * + * Otherwise, are all tablesyncs READY? + * + * Note: This function is not suitable to be called from outside of apply or + * tablesync workers because MySubscription needs to be already initialized. + */ +bool +AllTablesyncsReady(void) +{ + bool found_busy = false; + bool started_tx = false; + bool has_subrels = false; + + /* We need up-to-date sync state info for subscription tables here. */ + has_subrels = FetchTableStates(&started_tx); + + found_busy = list_length(table_states_not_ready) > 0; + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(false); + } + + /* + * When there are no tables, then return false. + * When no tablesyncs are busy, then all are READY + */ + return has_subrels && !found_busy; +} + +/* + * Update the pg_subscription two_phase state of the current subscription. + */ +void +UpdateTwoPhaseState(char new_state) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED || + new_state == LOGICALREP_TWOPHASE_STATE_PENDING || + new_state == LOGICALREP_TWOPHASE_STATE_ENABLED); + + if (!IsTransactionState()) + StartTransactionCommand(); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(MySubscription->oid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, + "cache lookup failed for subscription oid %u", + MySubscription->oid); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* And update/set two_phase ENABLED */ + values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), + values, nulls, replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + heap_freetuple(tup); + table_close(rel, RowExclusiveLock); + + CommitTransactionCommand(); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fb3ba5c..5c1ae6b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -49,6 +49,78 @@ * a new way to pass filenames to BufFile APIs so that we are allowed to open * the file we desired across multiple stream-open calls for the same * transaction. + * + * TWO_PHASE TRANSACTIONS + * ---------------------- + * Two phase transactions are replayed at prepare and then committed or + * rolled back at commit prepared and rollback prepared respectively. It is + * possible to have a prepared transaction that arrives at the apply worker + * when the tablesync is busy doing the initial copy. In this case, the apply + * worker skips all the prepared operations [e.g. inserts] while the tablesync + * was still busy (see the condition of should_apply_changes_for_rel). The + * tablesync worker might not get such a prepared transaction because say it + * was prior to the initial consistent point but might have got some later + * commits. Now, the tablesync worker will exit without doing anything for the + * prepared transaction skipped by the apply worker as the sync location for it + * will be already ahead of the apply worker's current location. This would lead + * to an "empty prepare", because later when the apply worker does the commit + * prepare, there is nothing in it (the inserts were skipped earlier). + * + * To avoid this, and similar prepare confusions the subscription two_phase + * commit is enabled only after the initial sync is over. The two_phase option + * has been implemented as a tri-state with values DISABLED, PENDING, and + * ENABLED. + * + * Even if the user specifies they want a subscription with two_phase = on, + * internally it will start with a tri-state of PENDING which only becomes + * ENABLED after all tablesync initializations are completed - i.e. when all + * tablesync workers have reached their READY state. In other words, the value + * PENDING is only a temporary state for subscription start-up. + * + * Until the two_phase is properly available (ENABLED) the subscription will + * behave as if two_phase = off. When the apply worker detects that all + * tablesyncs have become READY (while the tri-state was PENDING) it will + * restart the apply worker process. This happens in + * process_sync_tables_for_apply. + * + * When the (re-started) apply worker finds that all tablesyncs are READY for a + * two_phase tri-state of PENDING it calls wal_startstreaming to enable the + * publisher for two-phase commit and updates the tri-state value + * PENDING -> ENABLED. Now, it is possible that during the time we have not + * enabled two_phase, the publisher (replication server) would have skipped some + * prepares but we ensure that such prepares are sent along with commit + * prepare, see ReorderBufferFinishPrepared. + * + * If the subscription has no tables then a two_phase tri-state PENDING is + * left unchanged. This lets the user still do an ALTER TABLE REFRESH + * PUBLICATION which might otherwise be disallowed (see below). + * + * If ever a user needs to be aware of the tri-state value, they can fetch it + * from the pg_subscription catalog (see column subtwophasestate). + * + * We don't allow to toggle two_phase option of a subscription because it can + * lead to the inconsistent replica. Consider, initially, it was on and we have + * received some prepare then we turn it off, now at commit time the server + * will send the entire transaction data along with the commit. With some more + * analysis, we can allow changing this option from off to on but not sure if + * that alone would be useful. + * + * Finally, to avoid problems mentioned in previous paragraphs from any + * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on' + * to 'off' and then again back to 'on') there is a restriction for + * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when + * the two_phase tri-state is ENABLED, except when copy_data = false. + * + * We can get prepare of the same GID more than once for the genuine cases + * where we have defined multiple subscriptions for publications on the same + * server and prepared transaction has operations on tables subscribed to those + * subscriptions. For such cases, if we use the GID sent by publisher one of + * the prepares will be successful and others will fail, in which case the + * server will send them again. Now, this can lead to a deadlock if user has + * set synchronous_standby_names for all the subscriptions on subscriber. To + * avoid such deadlocks, we generate a unique GID (consisting of the + * subscription oid and the xid of the prepared transaction) for each prepare + * transaction on the subscriber. *------------------------------------------------------------------------- */ @@ -59,6 +131,7 @@ #include "access/table.h" #include "access/tableam.h" +#include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" @@ -246,6 +319,9 @@ static void apply_handle_tuple_routing(ResultRelInfo *relinfo, LogicalRepRelMapEntry *relmapentry, CmdType operation); +/* Compute GID for two_phase transactions */ +static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); + /* * Should this worker apply changes for given relation. * @@ -720,6 +796,180 @@ apply_handle_commit(StringInfo s) } /* + * Handle BEGIN PREPARE message. + */ +static void +apply_handle_begin_prepare(StringInfo s) +{ + LogicalRepPreparedTxnData begin_data; + char gid[GIDSIZE]; + + /* Tablesync should never receive prepare. */ + Assert(!am_tablesync_worker()); + + logicalrep_read_begin_prepare(s, &begin_data); + + /* The gid must not already be prepared. */ + TwoPhaseTransactionGid(MySubscription->oid, begin_data.xid, + gid, sizeof(gid)); + Assert(!LookupGXact(gid, begin_data.end_lsn, begin_data.preparetime)); + + remote_final_lsn = begin_data.prepare_lsn; + + in_remote_transaction = true; + + pgstat_report_activity(STATE_RUNNING, NULL); +} + +/* + * Handle PREPARE message. + */ +static void +apply_handle_prepare(StringInfo s) +{ + LogicalRepPreparedTxnData prepare_data; + char gid[GIDSIZE]; + + logicalrep_read_prepare(s, &prepare_data); + + Assert(prepare_data.prepare_lsn == remote_final_lsn); + + /* + * Compute unique GID for two_phase transactions. We don't use GID of + * prepared transaction sent by server as that can lead to deadlock when + * we have multiple subscriptions from same node point to publications on + * the same node. See comments atop worker.c + */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, + gid, sizeof(gid)); + + /* + * Unlike commit, here, we always prepare the transaction even though no + * change has happened in this transaction. It is done this way because at + * commit prepared time, we won't know whether we have skipped preparing a + * transaction because of no change. + * + * XXX, We can optimize such that at commit prepared time, we first check + * whether we have prepared the transaction or not but that doesn't seem + * worthwhile because such cases shouldn't be common. + */ + ensure_transaction(); + + /* + * BeginTransactionBlock is necessary to balance the EndTransactionBlock + * called within the PrepareTransactionBlock below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.preparetime; + + PrepareTransactionBlock(gid); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle a COMMIT PREPARED of a previously PREPARED transaction. + */ +static void +apply_handle_commit_prepared(StringInfo s) +{ + LogicalRepPreparedTxnData prepare_data; + char gid[GIDSIZE]; + + logicalrep_read_commit_prepared(s, &prepare_data); + + /* Compute GID for two_phase transactions. */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, + gid, sizeof(gid)); + + /* there is no transaction when COMMIT PREPARED is called */ + ensure_transaction(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.preparetime; + + FinishPreparedTransaction(gid, true); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION. + */ +static void +apply_handle_rollback_prepared(StringInfo s) +{ + LogicalRepRollbackPreparedTxnData rollback_data; + char gid[GIDSIZE]; + + logicalrep_read_rollback_prepared(s, &rollback_data); + + /* Compute GID for two_phase transactions. */ + TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, + gid, sizeof(gid)); + + /* + * It is possible that we haven't received prepare because it occurred + * before walsender reached a consistent point or the two_phase was still + * not enabled by that time, so in such cases, we need to skip rollback + * prepared. + */ + if (LookupGXact(gid, rollback_data.prepare_end_lsn, + rollback_data.preparetime)) + { + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = rollback_data.rollback_end_lsn; + replorigin_session_origin_timestamp = rollback_data.rollbacktime; + + /* there is no transaction when ABORT/ROLLBACK PREPARED is called */ + ensure_transaction(); + FinishPreparedTransaction(gid, false); + CommitTransactionCommand(); + } + + pgstat_report_stat(false); + + store_flush_position(rollback_data.rollback_end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(rollback_data.rollback_end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle ORIGIN message. * * TODO, support tracking of multiple origins @@ -1970,6 +2220,22 @@ apply_dispatch(StringInfo s) case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); return; + + case LOGICAL_REP_MSG_BEGIN_PREPARE: + apply_handle_begin_prepare(s); + return; + + case LOGICAL_REP_MSG_PREPARE: + apply_handle_prepare(s); + return; + + case LOGICAL_REP_MSG_COMMIT_PREPARED: + apply_handle_commit_prepared(s); + return; + + case LOGICAL_REP_MSG_ROLLBACK_PREPARED: + apply_handle_rollback_prepared(s); + return; } ereport(ERROR, @@ -2446,6 +2712,9 @@ maybe_reread_subscription(void) /* !slotname should never happen when enabled is true. */ Assert(newsub->slotname); + /* two-phase should not be altered */ + Assert(newsub->twophasestate == MySubscription->twophasestate); + /* * Exit if any parameter that affects the remote connection was changed. * The launcher will start a new worker. @@ -2932,6 +3201,20 @@ cleanup_subxact_info() subxact_data.nsubxacts_max = 0; } +/* + * Form the prepared transaction GID for two_phase transactions. + * + * Return the GID in the supplied buffer. + */ +static void +TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid) +{ + Assert(subid != InvalidRepOriginId); + Assert(TransactionIdIsValid(xid)); + + snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid); +} + /* Logical Replication Apply worker entry point */ void ApplyWorkerMain(Datum main_arg) @@ -3098,15 +3381,67 @@ ApplyWorkerMain(Datum main_arg) options.logical = true; options.startpoint = origin_startpos; options.slotname = myslotname; + /* + * FIXME - 9/April. The below code is a temporary hack to set the protocol + * version 3 (for two_phase) for server version 140000, even though this + * feature did not make it into the PG 14 release. + * + * When the PG 15 development officially starts someone will update the + * PG_VERSION_NUM (pg_config.h) to be 150000, and when that happens we need + * to revisit this code to remove this hack and write the code properly. + * + * e.g. + * if >= 15000 use LOGICALREP_PROTO_TWOPHASE_VERSION_NUM + * else if >= 14000 use LOGICALREP_PROTO_STREAM_VERSION_NUM + * else use LOGICALREP_PROTO_TWOPHASE_VERSION_NUM + */ options.proto.logical.proto_version = walrcv_server_version(wrconn) >= 140000 ? - LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM; + LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; + options.proto.logical.twophase = false; - /* Start normal logical streaming replication. */ - walrcv_startstreaming(wrconn, &options); + if (!am_tablesync_worker()) + { + /* + * Even when the two_phase mode is requested by the user, it remains + * as the tri-state PENDING until all tablesyncs have reached READY + * state. Only then, can it become properly ENABLED. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + /* Start streaming with two_phase enabled */ + options.proto.logical.twophase = true; + walrcv_startstreaming(wrconn, &options); + + UpdateTwoPhaseState(LOGICALREP_TWOPHASE_STATE_ENABLED); + MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + } + else + { + walrcv_startstreaming(wrconn, &options); + } + + ereport(DEBUG1, + (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.", + MySubscription->name, + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : + "?"))); + } + else + { + /* Start normal logical streaming replication. */ + walrcv_startstreaming(wrconn, &options); + } /* Run the main loop. */ LogicalRepApplyLoop(origin_startpos); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index f68348d..ecf9b9a 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -51,6 +51,16 @@ static void pgoutput_message(LogicalDecodingContext *ctx, Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, @@ -70,6 +80,9 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx); +static void send_repl_origin(LogicalDecodingContext *ctx, + RepOriginId origin_id, XLogRecPtr origin_lsn, + bool send_origin); /* * Entry in the map used to remember which relation schemas we sent. @@ -148,6 +161,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->truncate_cb = pgoutput_truncate; cb->message_cb = pgoutput_message; cb->commit_cb = pgoutput_commit_txn; + + cb->begin_prepare_cb = pgoutput_begin_prepare_txn; + cb->prepare_cb = pgoutput_prepare_txn; + cb->commit_prepared_cb = pgoutput_commit_prepared_txn; + cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -159,6 +177,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_change_cb = pgoutput_change; cb->stream_message_cb = pgoutput_message; cb->stream_truncate_cb = pgoutput_truncate; + /* transaction streaming - two-phase commit */ + cb->stream_prepare_cb = NULL; } static void @@ -170,10 +190,12 @@ parse_output_parameters(List *options, PGOutputData *data) bool binary_option_given = false; bool messages_option_given = false; bool streaming_given = false; + bool two_phase_option_given = false; data->binary = false; data->streaming = false; data->messages = false; + data->two_phase = false; foreach(lc, options) { @@ -249,8 +271,29 @@ parse_output_parameters(List *options, PGOutputData *data) data->streaming = defGetBoolean(defel); } + else if (strcmp(defel->defname, "two_phase") == 0) + { + if (two_phase_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + two_phase_option_given = true; + + data->two_phase = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); + + /* + * Do additional checking for the disallowed combination of two_phase and + * streaming. While streaming and two_phase can theoretically be + * supported, it needs more analysis to allow them together. + */ + if (data->two_phase && data->streaming) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s are mutually exclusive options", + "two_phase", "streaming"))); } } @@ -322,6 +365,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* Also remember we're currently not streaming any transaction. */ in_streaming = false; + /* + * Here, we just check whether the two-phase option is passed by + * plugin and decide whether to enable it at later point of time. It + * remains enabled if the previous start-up has done so. But we only + * allow the option to be passed in with sufficient version of the + * protocol, and when the output plugin supports it. + */ + if (!data->two_phase) + ctx->twophase_opt_given = false; + else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher", + data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM))); + else if (!ctx->twophase) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("two-phase commit requested, but not supported by output plugin"))); + else + ctx->twophase_opt_given = true; + /* Init publication state. */ data->publications = NIL; publications_valid = false; @@ -334,8 +398,12 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, } else { - /* Disable the streaming during the slot initialization mode. */ + /* + * Disable the streaming and prepared transactions during the slot + * initialization mode. + */ ctx->streaming = false; + ctx->twophase = false; } } @@ -350,29 +418,8 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); - if (send_replication_origin) - { - char *origin; - - /*---------- - * XXX: which behaviour do we want here? - * - * Alternatives: - * - don't send origin message if origin name not found - * (that's what we do now) - * - throw error - that will break replication, not good - * - send some special "unknown" origin - *---------- - */ - if (replorigin_by_oid(txn->origin_id, true, &origin)) - { - /* Message boundary */ - OutputPluginWrite(ctx, false); - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_origin(ctx->out, origin, txn->origin_lsn); - } - - } + send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, + send_replication_origin); OutputPluginWrite(ctx, true); } @@ -392,6 +439,68 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* + * BEGIN PREPARE callback + */ +static void +pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ + bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + + OutputPluginPrepareWrite(ctx, !send_replication_origin); + logicalrep_write_begin_prepare(ctx->out, txn); + + send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, + send_replication_origin); + + OutputPluginWrite(ctx, true); +} + +/* + * PREPARE callback + */ +static void +pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * COMMIT PREPARED callback + */ +static void +pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * ROLLBACK PREPARED callback + */ +static void +pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, + prepare_time); + OutputPluginWrite(ctx, true); +} + +/* * Write the current schema of the relation and its ancestor (if any) if not * done yet. */ @@ -819,18 +928,8 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx, OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn)); - if (send_replication_origin) - { - char *origin; - - if (replorigin_by_oid(txn->origin_id, true, &origin)) - { - /* Message boundary */ - OutputPluginWrite(ctx, false); - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr); - } - } + send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr, + send_replication_origin); OutputPluginWrite(ctx, true); @@ -1236,3 +1335,33 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) entry->pubactions.pubtruncate = false; } } + +/* Send Replication origin */ +static void +send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, + XLogRecPtr origin_lsn, bool send_origin) +{ + if (send_origin) + { + char *origin; + + /*---------- + * XXX: which behaviour do we want here? + * + * Alternatives: + * - don't send origin message if origin name not found + * (that's what we do now) + * - throw error - that will break replication, not good + * - send some special "unknown" origin + *---------- + */ + if (replorigin_by_oid(origin_id, true, &origin)) + { + /* Message boundary */ + OutputPluginWrite(ctx, false); + OutputPluginPrepareWrite(ctx, true); + + logicalrep_write_origin(ctx->out, origin, origin_lsn); + } + } +} diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index eb283a8..8c1f353 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void); %token K_SLOT %token K_RESERVE_WAL %token K_TEMPORARY +%token K_TWO_PHASE %token K_EXPORT_SNAPSHOT %token K_NOEXPORT_SNAPSHOT %token K_USE_SNAPSHOT @@ -102,6 +103,7 @@ static SQLCmd *make_sqlcmd(void); %type plugin_opt_arg %type opt_slot var_name %type opt_temporary +%type opt_two_phase %type create_slot_opt_list %type create_slot_opt @@ -241,16 +243,17 @@ create_replication_slot: cmd->options = $5; $$ = (Node *) cmd; } - /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ - | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list + /* CREATE_REPLICATION_SLOT slot TEMPORARY TWO_PHASE LOGICAL plugin */ + | K_CREATE_REPLICATION_SLOT IDENT opt_temporary opt_two_phase K_LOGICAL IDENT create_slot_opt_list { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); cmd->kind = REPLICATION_KIND_LOGICAL; cmd->slotname = $2; cmd->temporary = $3; - cmd->plugin = $5; - cmd->options = $6; + cmd->two_phase = $4; + cmd->plugin = $6; + cmd->options = $7; $$ = (Node *) cmd; } ; @@ -365,6 +368,11 @@ opt_temporary: | /* EMPTY */ { $$ = false; } ; +opt_two_phase: + K_TWO_PHASE { $$ = true; } + | /* EMPTY */ { $$ = false; } + ; + opt_slot: K_SLOT IDENT { $$ = $2; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index dcc3c3f..c038a63 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -103,6 +103,7 @@ RESERVE_WAL { return K_RESERVE_WAL; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } TEMPORARY { return K_TEMPORARY; } +TWO_PHASE { return K_TWO_PHASE; } EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; } NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; } USE_SNAPSHOT { return K_USE_SNAPSHOT; } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 75a087c..91224e0 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -285,6 +285,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.database = db_specific ? MyDatabaseId : InvalidOid; slot->data.persistency = persistency; slot->data.two_phase = two_phase; + slot->data.two_phase_at = InvalidXLogRecPtr; /* and then data only present in shared memory */ slot->just_dirtied = false; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 9a0e380..92f3373 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -365,7 +365,7 @@ WalReceiverMain(void) "pg_walreceiver_%lld", (long long int) walrcv_get_backend_pid(wrconn)); - walrcv_create_slot(wrconn, slotname, true, 0, NULL); + walrcv_create_slot(wrconn, slotname, true, false, 0, NULL); SpinLockAcquire(&walrcv->mutex); strlcpy(walrcv->slotname, slotname, NAMEDATALEN); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index d0ea489..5941868 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -51,6 +51,7 @@ #include "catalog/pg_largeobject_d.h" #include "catalog/pg_largeobject_metadata_d.h" #include "catalog/pg_proc_d.h" +#include "catalog/pg_subscription.h" #include "catalog/pg_trigger_d.h" #include "catalog/pg_type_d.h" #include "common/connect.h" @@ -4368,6 +4369,7 @@ getSubscriptions(Archive *fout) int i_subname; int i_rolname; int i_substream; + int i_subtwophasestate; int i_subconninfo; int i_subslotname; int i_subsynccommit; @@ -4411,9 +4413,16 @@ getSubscriptions(Archive *fout) appendPQExpBufferStr(query, " false AS subbinary,\n"); if (fout->remoteVersion >= 140000) - appendPQExpBufferStr(query, " s.substream\n"); + appendPQExpBufferStr(query, " s.substream,\n"); else - appendPQExpBufferStr(query, " false AS substream\n"); + appendPQExpBufferStr(query, " false AS substream,\n"); + + if (fout->remoteVersion >= 140000) + appendPQExpBufferStr(query, " s.subtwophasestate\n"); + else + appendPQExpBuffer(query, + " '%c' AS subtwophasestate\n", + LOGICALREP_TWOPHASE_STATE_DISABLED); appendPQExpBufferStr(query, "FROM pg_subscription s\n" @@ -4434,6 +4443,7 @@ getSubscriptions(Archive *fout) i_subpublications = PQfnumber(res, "subpublications"); i_subbinary = PQfnumber(res, "subbinary"); i_substream = PQfnumber(res, "substream"); + i_subtwophasestate = PQfnumber(res, "subtwophasestate"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4459,6 +4469,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subbinary)); subinfo[i].substream = pg_strdup(PQgetvalue(res, i, i_substream)); + subinfo[i].subtwophasestate = + pg_strdup(PQgetvalue(res, i, i_subtwophasestate)); if (strlen(subinfo[i].rolname) == 0) pg_log_warning("owner of subscription \"%s\" appears to be invalid", @@ -4486,6 +4498,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) char **pubnames = NULL; int npubnames = 0; int i; + char two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'}; if (!(subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) return; @@ -4527,6 +4540,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->substream, "f") != 0) appendPQExpBufferStr(query, ", streaming = on"); + if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0) + appendPQExpBufferStr(query, ", two_phase = on"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 5340843..70c072d 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -639,6 +639,7 @@ typedef struct _SubscriptionInfo char *subslotname; char *subbinary; char *substream; + char *subtwophasestate; char *subsynccommit; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index fdc2a89..682b5ff 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6389,7 +6389,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false}; + false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6415,13 +6415,18 @@ describeSubscriptions(const char *pattern, bool verbose) if (verbose) { - /* Binary mode and streaming are only supported in v14 and higher */ + /* + * Binary, streaming, and two_phase are only supported in v14 and + * higher + */ if (pset.sversion >= 140000) appendPQExpBuffer(&buf, ", subbinary AS \"%s\"\n" - ", substream AS \"%s\"\n", + ", substream AS \"%s\"\n" + ", subtwophasestate AS \"%s\"\n", gettext_noop("Binary"), - gettext_noop("Streaming")); + gettext_noop("Streaming"), + gettext_noop("Two phase commit")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 26ac786..8bab45c 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -2765,7 +2765,7 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("copy_data", "connect", "create_slot", "enabled", - "slot_name", "synchronous_commit"); + "slot_name", "synchronous_commit", "streaming", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 91786da..e27e1a8 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -58,4 +58,6 @@ extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn, RepOriginId origin_id); extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); +extern bool LookupGXact(const char *gid, XLogRecPtr prepare_at_lsn, + TimestampTz origin_prepare_timestamp); #endif /* TWOPHASE_H */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index a5d6efd..ca9814f 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -22,6 +22,14 @@ #include "nodes/pg_list.h" +/* + * two_phase tri-state values. See comments atop worker.c to know more about + * these states. + */ +#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd' +#define LOGICALREP_TWOPHASE_STATE_PENDING 'p' +#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e' + /* ---------------- * pg_subscription definition. cpp turns this into * typedef struct FormData_pg_subscription @@ -54,6 +62,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool substream; /* Stream in-progress transactions. */ + char subtwophasestate; /* Decode 2PC PREPARE? */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -91,6 +101,7 @@ typedef struct Subscription bool binary; /* Indicates if the subscription wants data in * binary format */ bool stream; /* Allow streaming in-progress transactions. */ + char twophasestate; /* Decode 2PC PREPARE? */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index ed94f57..765e9b5 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -88,6 +88,7 @@ extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); +extern bool HasSubscriptionRelations(Oid subid); extern List *GetSubscriptionRelations(Oid subid); extern List *GetSubscriptionNotReadyRelations(Oid subid); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 7dfcb7b..72f049b 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -88,11 +88,16 @@ typedef struct LogicalDecodingContext bool streaming; /* - * Does the output plugin support two-phase decoding, and is it enabled? + * Does the output plugin support two-phase decoding. */ bool twophase; /* + * Is two-phase option given by output plugin? + */ + bool twophase_opt_given; + + /* * State for writing output. */ bool accept_writes; diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 55b90c0..540d8ee 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -13,6 +13,7 @@ #ifndef LOGICAL_PROTO_H #define LOGICAL_PROTO_H +#include "access/xact.h" #include "replication/reorderbuffer.h" #include "utils/rel.h" @@ -26,12 +27,16 @@ * connect time. * * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with - * support for streaming large transactions. + * support for streaming large transactions. Introduced in PG14. + * + * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with + * support for two-phase commit PREPARE decoding. Introduced in PG15. */ #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 #define LOGICALREP_PROTO_VERSION_NUM 1 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 -#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM +#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3 +#define LOGICALREP_PROTO_MAX_VERSION_NUM 3 /* * Logical message types @@ -55,6 +60,10 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', + LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', + LOGICAL_REP_MSG_PREPARE = 'P', + LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', + LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c', @@ -115,6 +124,7 @@ typedef struct LogicalRepBeginData TransactionId xid; } LogicalRepBeginData; +/* Commit (and abort) information */ typedef struct LogicalRepCommitData { XLogRecPtr commit_lsn; @@ -122,6 +132,39 @@ typedef struct LogicalRepCommitData TimestampTz committime; } LogicalRepCommitData; +/* + * Prepared transaction protocol information. This same structure is used to + * hold the information for begin_prepare, prepare, and commit prepared + * transaction. prepare_lsn and preparetime are used to store commit lsn and + * commit time for commit prepared. + */ +typedef struct LogicalRepPreparedTxnData +{ + XLogRecPtr prepare_lsn; + XLogRecPtr end_lsn; + TimestampTz preparetime; + TransactionId xid; + char gid[GIDSIZE]; +} LogicalRepPreparedTxnData; + +/* + * Rollback Prepared transaction protocol information. The prepare information + * prepare_end_lsn and prepare_time are used to check if the downstream has + * received this prepared transaction in which case it can apply the rollback, + * otherwise, it can skip the rollback operation. The gid alone is not + * sufficient because the downstream node can have a prepared transaction with + * same identifier. + */ +typedef struct LogicalRepRollbackPreparedTxnData +{ + XLogRecPtr prepare_end_lsn; + XLogRecPtr rollback_end_lsn; + TimestampTz preparetime; + TimestampTz rollbacktime; + TransactionId xid; + char gid[GIDSIZE]; +} LogicalRepRollbackPreparedTxnData; + extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn); extern void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data); @@ -129,6 +172,24 @@ extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); extern void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data); +extern void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn); +extern void logicalrep_read_begin_prepare(StringInfo in, + LogicalRepPreparedTxnData *begin_data); +extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern void logicalrep_read_prepare(StringInfo in, + LogicalRepPreparedTxnData *prepare_data); +extern void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +extern void logicalrep_read_commit_prepared(StringInfo in, + LogicalRepPreparedTxnData *prepare_data); +extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); +extern void logicalrep_read_rollback_prepared(StringInfo in, + LogicalRepRollbackPreparedTxnData *rollback_data); + + extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 51e7c03..0dc460f 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -27,6 +27,7 @@ typedef struct PGOutputData bool binary; bool streaming; bool messages; + bool two_phase; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 565a961..6c9f2c6 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -643,7 +643,7 @@ void ReorderBufferCommit(ReorderBuffer *, TransactionId, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - XLogRecPtr initial_consistent_point, + XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 1ad5e6c..db68551 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -92,11 +92,10 @@ typedef struct ReplicationSlotPersistentData XLogRecPtr confirmed_flush; /* - * LSN at which we found a consistent point at the time of slot creation. - * This is also the point where we have exported a snapshot for the - * initial copy. + * LSN at which we enabled two_phase commit for this slot or LSN at which + * we found a consistent point at the time of slot creation. */ - XLogRecPtr initial_consistent_point; + XLogRecPtr two_phase_at; /* * Allow decoding of prepared transactions? diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index fbabce6..de72124 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -62,7 +62,7 @@ extern void CheckPointSnapBuild(void); extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, - XLogRecPtr initial_consistent_point); + XLogRecPtr two_phase_at); extern void FreeSnapshotBuilder(SnapBuild *cache); extern void SnapBuildSnapDecRefcount(Snapshot snap); @@ -76,7 +76,8 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid); extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); -extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder); +extern XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder); +extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr); extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 4fd7c25..9edf907 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -181,6 +181,7 @@ typedef struct List *publication_names; /* String list of publications */ bool binary; /* Ask publisher to use binary */ bool streaming; /* Streaming of large transactions */ + bool twophase; /* Enable 2PC decoding of PREPARE */ } logical; } proto; } WalRcvStreamOptions; @@ -347,6 +348,7 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, const char *slotname, bool temporary, + bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); @@ -420,8 +422,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) +#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 1cac75e..daf6ad4 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -86,6 +86,9 @@ extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname, int szorgname); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); +extern bool AllTablesyncsReady(void); +extern void UpdateTwoPhaseState(char new_state); + void process_syncing_tables(XLogRecPtr current_lsn); void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue); diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 09576c1..a10231e 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -91,10 +91,10 @@ ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | off | dbname=regress_doesnotexist2 (1 row) BEGIN; @@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | local | dbname=regress_doesnotexist2 (1 row) -- rename back to keep the rest simple @@ -162,19 +162,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; @@ -185,19 +185,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist (1 row) -- fail - publication already exists @@ -212,10 +212,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-----------------------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | off | dbname=regress_doesnotexist (1 row) -- fail - publication used more then once @@ -233,10 +233,10 @@ ERROR: unrecognized subscription parameter: "copy_data" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; @@ -263,6 +263,43 @@ ALTER SUBSCRIPTION regress_testsub DISABLE; ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; DROP FUNCTION func; +-- fail - two_phase must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo); +ERROR: two_phase requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist +(1 row) + +--fail - alter of two_phase option not supported. +ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); +ERROR: cannot alter two_phase option +--fail - cannot set streaming when two_phase enabled +ALTER SUBSCRIPTION regress_testsub SET (streaming = true); +ERROR: cannot set streaming = true for two-phase enabled subscription +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist +(1 row) + +DROP SUBSCRIPTION regress_testsub; +-- fail - two_phase and streaming are mutually exclusive. +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true); +ERROR: two_phase = true and streaming = true are mutually exclusive options +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +------+-------+---------+-------------+--------+-----------+------------------+--------------------+---------- +(0 rows) + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 308c098..b732871 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -202,6 +202,31 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; DROP FUNCTION func; +-- fail - two_phase must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); + +\dRs+ +--fail - alter of two_phase option not supported. +ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); + +--fail - cannot set streaming when two_phase enabled +ALTER SUBSCRIPTION regress_testsub SET (streaming = true); + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); + +\dRs+ + +DROP SUBSCRIPTION regress_testsub; + +-- fail - two_phase and streaming are mutually exclusive. +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true); + +\dRs+ + + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl new file mode 100644 index 0000000..364e6eb --- /dev/null +++ b/src/test/subscription/t/021_twophase.pl @@ -0,0 +1,293 @@ +# logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 20; + +############################### +# Setup +############################### + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_subscriber->start; + +# Create some pre-existing content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full SELECT generate_series(1,10); + PREPARE TRANSACTION 'some_initial_data'; + COMMIT PREPARED 'some_initial_data';"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE tab_full"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub + WITH (two_phase = on)"); + +# Wait for subscriber to finish initialization +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Also wait for two-phase to be enabled +my $twophase_query = + "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');"; +$node_subscriber->poll_query_until('postgres', $twophase_query) + or die "Timed out while waiting for subscriber to enable twophase"; + +############################### +# check that 2PC gets replicated to subscriber +# then COMMIT PREPARED +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (11); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets committed on subscriber +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;"); +is($result, qq(1), 'Row inserted via 2PC has committed on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber'); + +############################### +# check that 2PC gets replicated to subscriber +# then ROLLBACK PREPARED +############################### + +$node_publisher->safe_psql('postgres'," + BEGIN; + INSERT INTO tab_full VALUES (12); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets aborted on subscriber +$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is aborted on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;"); +is($result, qq(0), 'Row inserted via 2PC is not present on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# Check that ROLLBACK PREPARED is decoded properly on crash restart +# (publisher and subscriber crash) +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (12); + INSERT INTO tab_full VALUES (13); + PREPARE TRANSACTION 'test_prepared_tab';"); + +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); + +$node_publisher->start; +$node_subscriber->start; + +# rollback post the restart +$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';"); +$node_publisher->wait_for_catchup($appname); + +# check inserts are rolled back +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (12,13);"); +is($result, qq(0), 'Rows rolled back are not on the subscriber'); + +############################### +# Check that COMMIT PREPARED is decoded properly on crash restart +# (publisher and subscriber crash) +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (12); + INSERT INTO tab_full VALUES (13); + PREPARE TRANSACTION 'test_prepared_tab';"); + +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); + +$node_publisher->start; +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->wait_for_catchup($appname); + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (12,13);"); +is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); + +############################### +# Check that COMMIT PREPARED is decoded properly on crash restart +# (subscriber only crash) +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (14); + INSERT INTO tab_full VALUES (15); + PREPARE TRANSACTION 'test_prepared_tab';"); + +$node_subscriber->stop('immediate'); +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->wait_for_catchup($appname); + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (14,15);"); +is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); + +############################### +# Check that COMMIT PREPARED is decoded properly on crash restart +# (publisher only crash) +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (16); + INSERT INTO tab_full VALUES (17); + PREPARE TRANSACTION 'test_prepared_tab';"); + +$node_publisher->stop('immediate'); +$node_publisher->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->wait_for_catchup($appname); + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (16,17);"); +is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); + +############################### +# Test nested transaction with 2PC +############################### + +# check that 2PC gets replicated to subscriber +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (21); + SAVEPOINT sp_inner; + INSERT INTO tab_full VALUES (22); + ROLLBACK TO SAVEPOINT sp_inner; + PREPARE TRANSACTION 'outer'; + "); +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# COMMIT +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'outer';"); + +$node_publisher->wait_for_catchup($appname); + +# check the tx state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber'); + +# check inserts are visible. 22 should be rolled back. 21 should be committed. +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (21);"); +is($result, qq(1), 'Rows committed are on the subscriber'); +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (22);"); +is($result, qq(0), 'Rows rolled back are not on the subscriber'); + +############################### +# Test using empty GID +############################### + +# check that 2PC gets replicated to subscriber +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (51); + PREPARE TRANSACTION '';"); +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# ROLLBACK +$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED '';"); + +# check that 2PC gets aborted on subscriber +$node_publisher->wait_for_catchup($appname); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# check all the cleanup +############################### + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), 'check subscription relation status was dropped on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl new file mode 100644 index 0000000..76b224a --- /dev/null +++ b/src/test/subscription/t/022_twophase_cascade.pl @@ -0,0 +1,236 @@ +# Test cascading logical replication of 2PC. +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 29; + +############################### +# Setup a cascade of pub/sub nodes. +# node_A -> node_B -> node_C +############################### + +# Initialize nodes +# node_A +my $node_A = get_new_node('node_A'); +$node_A->init(allows_streaming => 'logical'); +$node_A->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_A->start; +# node_B +my $node_B = get_new_node('node_B'); +$node_B->init(allows_streaming => 'logical'); +$node_B->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_B->start; +# node_C +my $node_C = get_new_node('node_C'); +$node_C->init(allows_streaming => 'logical'); +$node_C->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_C->start; + +# Create some pre-existing content on node_A +$node_A->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_A->safe_psql('postgres', " + INSERT INTO tab_full SELECT generate_series(1,10);"); + +# Create the same tables on node_B amd node_C +$node_B->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_C->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication + +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; +$node_A->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full"); +my $appname_B = 'tap_sub_B'; +$node_B->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub_B + CONNECTION '$node_A_connstr application_name=$appname_B' + PUBLICATION tap_pub_A + WITH (two_phase = on)"); + +# node_B (pub) -> node_C (sub) +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; +$node_B->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full"); +my $appname_C = 'tap_sub_C'; +$node_C->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub_C + CONNECTION '$node_B_connstr application_name=$appname_C' + PUBLICATION tap_pub_B + WITH (two_phase = on)"); + +# Wait for subscribers to finish initialization +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# Also wait for two-phase to be enabled +my $twophase_query = "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');"; +$node_B->poll_query_until('postgres', $twophase_query) + or die "Timed out while waiting for subscriber to enable twophase"; +$node_C->poll_query_until('postgres', $twophase_query) + or die "Timed out while waiting for subscriber to enable twophase"; + +is(1,1, "Cascade setup is complete"); + +my $result; + +############################### +# check that 2PC gets replicated to subscriber(s) +# then COMMIT PREPARED +############################### + +# 2PC PREPARE +$node_A->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (11); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the tx state is prepared on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC COMMIT +$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check that transaction was committed on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;"); +is($result, qq(1), 'Row inserted via 2PC has committed on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;"); +is($result, qq(1), 'Row inserted via 2PC has committed on subscriber C'); + +# check the tx state is ended on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber C'); + +############################### +# check that 2PC gets replicated to subscriber(s) +# then ROLLBACK PREPARED +############################### + +# 2PC PREPARE +$node_A->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (12); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the tx state is prepared on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC ROLLBACK +$node_A->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check that transaction is aborted on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;"); +is($result, qq(0), 'Row inserted via 2PC is not present on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;"); +is($result, qq(0), 'Row inserted via 2PC is not present on subscriber C'); + +# check the tx state is ended on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber C'); + +############################### +# Test nested transactions with 2PC +############################### + +# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT +$node_A->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (21); + SAVEPOINT sp_inner; + INSERT INTO tab_full VALUES (22); + ROLLBACK TO SAVEPOINT sp_inner; + PREPARE TRANSACTION 'outer'; + "); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the tx state prepared on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC COMMIT +$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the tx state is ended on subscriber +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber C'); + +# check inserts are visible at subscriber(s). +# 22 should be rolled back. +# 21 should be committed. +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (21);"); +is($result, qq(1), 'Rows committed are present on subscriber B'); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (22);"); +is($result, qq(0), 'Rows rolled back are not present on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (21);"); +is($result, qq(1), 'Rows committed are present on subscriber C'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (22);"); +is($result, qq(0), 'Rows rolled back are not present on subscriber C'); + +############################### +# check all the cleanup +############################### + +# cleanup the node_B => node_C pub/sub +$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C"); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber node C'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), 'check subscription relation status was dropped on subscriber node C'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber node C'); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher node B'); + +# cleanup the node_A => node_B pub/sub +$node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B"); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber node B'); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), 'check subscription relation status was dropped on subscriber node B'); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber node B'); +$result = $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher node A'); + +# shutdown +$node_C->stop('fast'); +$node_B->stop('fast'); +$node_A->stop('fast'); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c7aff67..2b89efc 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1347,9 +1347,11 @@ LogicalRepBeginData LogicalRepCommitData LogicalRepCtxStruct LogicalRepPartMapEntry +LogicalRepPreparedTxnData LogicalRepRelId LogicalRepRelMapEntry LogicalRepRelation +LogicalRepRollbackPreparedTxnData LogicalRepTupleData LogicalRepTyp LogicalRepWorker -- 1.8.3.1