From b1cd3e9ab91203bbc5ce77e314ea646ee590a9ed Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Sat, 20 Mar 2021 11:49:15 +1100 Subject: [PATCH v65 1/3] Add support for prepared transactions to built-in logical replication. To add support for streaming transactions at prepare time into the built-in logical replication, we need to do the below things: * Modify the output plugin (pgoutput) to implement the new two-phase API callbacks, by leveraging the extended replication protocol. * Modify the replication apply worker, to properly handle two-phase transactions by replaying them on prepare. * Add a new SUBSCRIPTION option "two_phase" to allow users to enable it. We enable the two_phase once the initial data sync is over. * Add a new option to enable two_phase while creating a slot. We don't use this option in the patch but this will allow the outside replication solutions using streaming replication protocol to use it. We don't support the below operations: * ALTER SUBSCRIPTION REFRESH PUBLICATION when two_phase enabled. * ALTER SUBSCRIPTION SET PUBLICATION WITH (refresh = true) when two_phase enabled. * Prepare API for in-progress transactions is not supported. We however must explicitly disable replication of two-phase transactions during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover, we don't have a replication connection open so we don't know where to send the data anyway. 
Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Sawada Masahiko, C Vignesh, Dilip Kumar, Takamichi Osumi Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com --- doc/src/sgml/catalogs.sgml | 12 + doc/src/sgml/protocol.sgml | 16 +- doc/src/sgml/ref/alter_subscription.sgml | 4 +- doc/src/sgml/ref/create_subscription.sgml | 36 +++ src/backend/access/transam/twophase.c | 68 +++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 121 +++++++- .../libpqwalreceiver/libpqwalreceiver.c | 11 +- src/backend/replication/logical/decode.c | 2 +- src/backend/replication/logical/logical.c | 37 ++- src/backend/replication/logical/origin.c | 7 +- src/backend/replication/logical/proto.c | 212 +++++++++++++ src/backend/replication/logical/reorderbuffer.c | 4 +- src/backend/replication/logical/snapbuild.c | 32 +- src/backend/replication/logical/tablesync.c | 204 ++++++++++--- src/backend/replication/logical/worker.c | 328 ++++++++++++++++++++- src/backend/replication/pgoutput/pgoutput.c | 207 ++++++++++--- src/backend/replication/repl_gram.y | 16 +- src/backend/replication/repl_scanner.l | 1 + src/backend/replication/slot.c | 1 + src/backend/replication/walreceiver.c | 2 +- src/bin/pg_dump/pg_dump.c | 20 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 13 +- src/bin/psql/tab-complete.c | 2 +- src/include/access/twophase.h | 2 + src/include/catalog/pg_subscription.h | 8 + src/include/replication/logical.h | 7 +- src/include/replication/logicalproto.h | 77 ++++- src/include/replication/reorderbuffer.h | 2 +- src/include/replication/slot.h | 7 +- src/include/replication/snapbuild.h | 5 +- src/include/replication/walreceiver.h | 6 +- src/include/replication/worker_internal.h | 3 + 
src/test/regress/expected/subscription.out | 93 ++++-- src/test/regress/sql/subscription.sql | 25 ++ src/tools/pgindent/typedefs.list | 3 + 38 files changed, 1437 insertions(+), 161 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 68d1960..00d43d9 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7589,6 +7589,18 @@ SCRAM-SHA-256$<iteration count>:&l + subtwophasestate char + + + State code: + d = two_phase mode was not requested, so is disabled; + p = two_phase mode was requested, but is pending enablement; + e = two_phase mode was requested, and is enabled. + + + + + subconninfo text diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 43092fe..c285ef7 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1914,7 +1914,7 @@ The commands accepted in replication mode are: - CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT | NOEXPORT_SNAPSHOT | USE_SNAPSHOT ] } + CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] [ TWO_PHASE ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT | NOEXPORT_SNAPSHOT | USE_SNAPSHOT ] } CREATE_REPLICATION_SLOT @@ -1956,6 +1956,20 @@ The commands accepted in replication mode are: + TWO_PHASE + + + Specify that this replication slot supports decode of two-phase + transactions. With this option, two-phase commands like + PREPARE TRANSACTION, COMMIT PREPARED + and ROLLBACK PREPARED are decoded and transmitted. + The transaction will be decoded and transmitted at + PREPARE TRANSACTION time. + + + + + RESERVE_WAL diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 0adf68e..85cc8bb 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -64,7 +64,9 @@ ALTER SUBSCRIPTION name RENAME TO < Commands ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION and ALTER SUBSCRIPTION ... SET PUBLICATION ... with refresh - option as true cannot be executed inside a transaction block. + option as true cannot be executed inside a transaction block. They also + cannot be executed with copy_data = true if the + subscription is using two_phase commit. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index e812bee..a5c9158 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -237,6 +237,42 @@ CREATE SUBSCRIPTION subscription_name + + + The streaming option cannot be used along with + two_phase option. + + + + + + two_phase (boolean) + + + Specifies whether two-phase commit is enabled for this subscription. + The default is false. + + + + When two-phase commit is enabled then the decoded transactions are sent + to the subscriber on the PREPARE TRANSACTION. By default, the transaction + prepared on the publisher is decoded as a normal transaction at commit. + + + + The two-phase commit implementation requires that the replication has + successfully passed the initial table synchronization phase. This means even when + two_phase is enabled for the subscription, the internal two-phase state remains + temporarily "pending" until the initialization phase is completed. See column + subtwophasestate of + pg_subscription to know the actual two-phase state. + + + + The two_phase option cannot be used along with + streaming option. + + + diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 6023e7c..c58c46d 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2445,3 +2445,71 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) RemoveTwoPhaseFile(xid, giveWarning); RemoveGXact(gxact); } + +/* + * LookupGXact + * Check if the prepared transaction with the given GID and lsn is around. 
+ * + * Note that we always compare with the LSN where prepare ends because that is + * what is stored as origin_lsn in the 2PC file. + * + * This function is primarily used to check if the prepared transaction + * received from the upstream (remote node) already exists. Checking only GID + * is not sufficient because a different prepared xact with the same GID can + * exist on the same node. So, we are ensuring to match origin_lsn and + * origin_timestamp of prepared xact to avoid the possibility of a match of + * prepared xact from two different nodes. + */ +bool +LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, + TimestampTz origin_prepare_timestamp) +{ + int i; + bool found = false; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs. */ + if ((gxact->valid && strcmp(gxact->gid, gid) == 0)) + { + char *buf; + TwoPhaseFileHeader *hdr; + + /* + * We are neither expecting the collisions of GXACTs (same gid) + * between publisher and subscribers nor the apply worker restarts + * after prepared xacts, so we perform all I/O while holding + * TwoPhaseStateLock for simplicity. + * + * To move the I/O out of the lock, we need to ensure that no + * other backend commits the prepared xact in the meantime. We can + * do this optimization if we encounter many collisions in GID + * between publisher and subscriber. 
+ */ + if (gxact->ondisk) + buf = ReadTwoPhaseFile(gxact->xid, false); + else + { + Assert(gxact->prepare_start_lsn); + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); + } + + hdr = (TwoPhaseFileHeader *) buf; + + if (hdr->origin_lsn == prepare_end_lsn && + hdr->origin_timestamp == origin_prepare_timestamp) + { + found = true; + pfree(buf); + break; + } + + pfree(buf); + } + } + LWLockRelease(TwoPhaseStateLock); + return found; +} diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 4039768..658d9f8 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -68,6 +68,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->enabled = subform->subenabled; sub->binary = subform->subbinary; sub->stream = subform->substream; + sub->twophasestate = subform->subtwophasestate; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 0dca65d..d453902 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1180,7 +1180,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are readable. 
REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subslotname, subpublications) +GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subslotname, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index bfd3514..f1856d7 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -67,7 +67,8 @@ parse_subscription_options(List *options, char **synchronous_commit, bool *refresh, bool *binary_given, bool *binary, - bool *streaming_given, bool *streaming) + bool *streaming_given, bool *streaming, + bool *twophase_given, bool *twophase) { ListCell *lc; bool connect_given = false; @@ -108,6 +109,11 @@ parse_subscription_options(List *options, *streaming_given = false; *streaming = false; } + if (twophase) + { + *twophase_given = false; + *twophase = false; + } /* Parse options */ foreach(lc, options) @@ -213,6 +219,32 @@ parse_subscription_options(List *options, *streaming_given = true; *streaming = defGetBoolean(defel); } + else if (strcmp(defel->defname, "two_phase") == 0) + { + /* + * Do not allow toggling of two_phase option. Doing so could + * cause missing of transactions and lead to an inconsistent + * replica. See comments atop worker.c + * + * Note: twophase == NULL indicates that this call originated + * from AlterSubscription. 
+ */ + if (!twophase) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot alter two_phase option"))); + + } + + if (*twophase_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *twophase_given = true; + *twophase = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -283,6 +315,21 @@ parse_subscription_options(List *options, errmsg("subscription with %s must also set %s", "slot_name = NONE", "create_slot = false"))); } + + /* + * Do additional checking for the disallowed combination of two_phase and + * streaming. While streaming and two_phase can theoretically be supported, + * it needs more analysis to allow them together. + */ + if (twophase && *twophase_given && *twophase) + { + if (streaming && *streaming_given && *streaming) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s are mutually exclusive options", + "two_phase = true", "streaming = true"))); + } + } /* @@ -358,6 +405,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool copy_data; bool streaming; bool streaming_given; + bool twophase; + bool twophase_given; char *synchronous_commit; char *conninfo; char *slotname; @@ -382,7 +431,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) &synchronous_commit, NULL, /* no "refresh" */ &binary_given, &binary, - &streaming_given, &streaming); + &streaming_given, &streaming, + &twophase_given, &twophase); /* * Since creating a replication slot is not transactional, rolling back @@ -450,6 +500,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming); + values[Anum_pg_subscription_subtwophasestate - 1] = + CharGetDatum(twophase ? 
+ LOGICALREP_TWOPHASE_STATE_PENDING : + LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -528,7 +582,16 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) { Assert(slotname); - walrcv_create_slot(wrconn, slotname, false, + /* + * Even if two_phase is set, don't create the slot with + * two-phase enabled. Will enable it once all the tables are + * synced and ready. This avoids race-conditions like prepared + * transactions being skipped due to changes not being applied + * due to checks in should_apply_changes_for_rel() when + * tablesync for the corresponding tables are in progress. See + * comments atop worker.c. + */ + walrcv_create_slot(wrconn, slotname, false, false, CRS_NOEXPORT_SNAPSHOT, NULL); ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", @@ -835,7 +898,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) &synchronous_commit, NULL, /* no "refresh" */ &binary_given, &binary, - &streaming_given, &streaming); + &streaming_given, &streaming, + NULL, NULL /* no "two_phase" */); if (slotname_given) { @@ -869,6 +933,12 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) if (streaming_given) { + if ((sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) && streaming) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot set %s for two-phase enabled subscription", + "streaming = true"))); + values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming); replaces[Anum_pg_subscription_substream - 1] = true; @@ -892,7 +962,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ NULL, NULL, /* no "binary" */ - NULL, NULL); /* no streaming */ + NULL, NULL, /* no "streaming" */ + NULL, NULL); /* no "two_phase" */ Assert(enabled_given); if (!sub->slotname && enabled) @@ -937,7 +1008,8 @@ 
AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) NULL, /* no "synchronous_commit" */ &refresh, NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + NULL, NULL, /* no "streaming" */ + NULL, NULL); /* no "two_phase" */ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); replaces[Anum_pg_subscription_subpublications - 1] = true; @@ -953,6 +1025,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); + /* See ALTER_SUBSCRIPTION_REFRESH for details of why this is not allowed. */ + if (sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED && copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"), + errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh = false, or with copy_data = false" + ", or use DROP/CREATE SUBSCRIPTION."))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); /* Make sure refresh sees the new list of publications. */ @@ -982,7 +1062,34 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + NULL, NULL, /* no "streaming" */ + NULL, NULL); /* no "two_phase" */ + + /* + * The subscription two_phase commit implementation requires + * that replication has passed the initial table + * synchronization phase before the two_phase becomes properly + * enabled. + * + * But, having reached this two-phase commit "enabled" state we + * must not allow any subsequent table initialization to occur. + * So the ALTER SUBSCRIPTION ... REFRESH is disallowed when + * the user had requested two_phase = on mode. 
+ * + * The exception to this restriction is when copy_data = false, + * because when copy_data is false the tablesync will start + * already in READY state and will exit directly without doing + * anything which could interfere with the apply worker's + * message handling. + * + * For more details see comments atop worker.c. + */ + if (sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED && copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"), + errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false" + ", or use DROP/CREATE SUBSCRIPTION."))); PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 021c1b3..58c813f 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -73,6 +73,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, static char *libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, + bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); @@ -433,6 +434,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 140000) appendStringInfoString(&cmd, ", streaming 'on'"); + /* set the two_phase option only if the caller specifies it. 
*/ + if (options->proto.logical.twophase && + PQserverVersion(conn->streamConn) >= 140000) + appendStringInfoString(&cmd, ", two_phase 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) @@ -833,7 +839,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) */ static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, - bool temporary, CRSSnapshotAction snapshot_action, + bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn) { PGresult *res; @@ -847,6 +853,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, if (temporary) appendStringInfoString(&cmd, " TEMPORARY"); + if (two_phase) + appendStringInfoString(&cmd, " TWO_PHASE"); + if (conn->logical) { appendStringInfoString(&cmd, " LOGICAL pgoutput"); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 5f59613..6a90b56 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -730,7 +730,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, if (two_phase) { ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, - SnapBuildInitialConsistentPoint(ctx->snapshot_builder), + SnapBuildGetTwoPhaseAt(ctx->snapshot_builder), commit_time, origin_id, origin_lsn, parsed->twophase_gid, true); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 37b75de..5143d8f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, - need_full_snapshot, slot->data.initial_consistent_point); + need_full_snapshot, 
slot->data.two_phase_at); ctx->reorder->private_data = ctx; @@ -432,10 +432,19 @@ CreateInitDecodingContext(const char *plugin, MemoryContextSwitchTo(old_context); /* - * We allow decoding of prepared transactions iff the two_phase option is - * enabled at the time of slot creation. + * We allow decoding of prepared transactions when the two_phase is enabled + * at the time of slot creation, or when the two_phase option is given at + * the streaming start. */ - ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->twophase &= (ctx->twophase_opt_given || slot->data.two_phase); + + /* Mark slot to allow two_phase decoding if not already marked */ + if (ctx->twophase && !slot->data.two_phase) + { + slot->data.two_phase = true; + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + } ctx->reorder->output_rewrites = ctx->options.receive_rewrites; @@ -538,10 +547,21 @@ CreateDecodingContext(XLogRecPtr start_lsn, MemoryContextSwitchTo(old_context); /* - * We allow decoding of prepared transactions iff the two_phase option is - * enabled at the time of slot creation. + * We allow decoding of prepared transactions when the two_phase is enabled + * at the time of slot creation, or when the two_phase option is given at + * the streaming start. 
*/ - ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->twophase &= (ctx->twophase_opt_given || slot->data.two_phase); + + /* Mark slot to allow two_phase decoding if not already marked */ + if (ctx->twophase && !slot->data.two_phase) + { + slot->data.two_phase = true; + slot->data.two_phase_at = start_lsn; + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn); + } ctx->reorder->output_rewrites = ctx->options.receive_rewrites; @@ -602,7 +622,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) SpinLockAcquire(&slot->mutex); slot->data.confirmed_flush = ctx->reader->EndRecPtr; - slot->data.initial_consistent_point = ctx->reader->EndRecPtr; + if (slot->data.two_phase) + slot->data.two_phase_at = ctx->reader->EndRecPtr; SpinLockRelease(&slot->mutex); } diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 39471fd..b258174 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -973,8 +973,11 @@ replorigin_advance(RepOriginId node, /* * Due to - harmless - race conditions during a checkpoint we could see - * values here that are older than the ones we already have in memory. - * Don't overwrite those. + * values here that are older than the ones we already have in memory. We + * could also see older values for prepared transactions when the prepare + * is sent at a later point of time along with commit prepared and there + * are other transactions commits between prepare and commit prepared. See + * ReorderBufferFinishPrepared. Don't overwrite those. 
*/ if (go_backward || replication_state->remote_lsn < remote_commit) replication_state->remote_lsn = remote_commit; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index f2c85ca..21487ec 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -106,6 +106,218 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) } /* + * Write BEGIN PREPARE to the output stream. + */ +void +logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE); + + /* fixed fields */ + pq_sendint64(out, txn->final_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction BEGIN PREPARE from the stream. + */ +void +logicalrep_read_begin_prepare(StringInfo in, LogicalRepBeginPrepareData *begin_data) +{ + /* read fields */ + begin_data->final_lsn = pq_getmsgint64(in); + if (begin_data->final_lsn == InvalidXLogRecPtr) + elog(ERROR, "final_lsn not set in begin message"); + begin_data->end_lsn = pq_getmsgint64(in); + if (begin_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn not set in begin message"); + begin_data->committime = pq_getmsgint64(in); + begin_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(begin_data->gid, pq_getmsgstring(in)); +} + +/* + * Write PREPARE to the output stream. + */ +void +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE); + + /* + * This should only ever happen for two-phase commit transactions. In + * which case we expect to have a valid GID. 
+ */ + Assert(txn->gid != NULL); + Assert(rbtxn_prepared(txn)); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction PREPARE from the stream. + */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in prepare message", flags); + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + if (prepare_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_lsn is not set in prepare message"); + prepare_data->end_lsn = pq_getmsgint64(in); + if (prepare_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn is not set in prepare message"); + prepare_data->preparetime = pq_getmsgint64(in); + prepare_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); +} + +/* + * Write COMMIT PREPARED to the output stream. + */ +void +logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED); + + /* + * This should only ever happen for two-phase commit transactions. In + * which case we expect to have a valid GID. Additionally, the transaction + * must be prepared. See ReorderBufferFinishPrepared. + */ + Assert(txn->gid != NULL); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, commit_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction COMMIT PREPARED from the stream. 
+ */ +void +logicalrep_read_commit_prepared(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in commit prepare message", flags); + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + if (prepare_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_lsn is not set in commit prepared message"); + prepare_data->end_lsn = pq_getmsgint64(in); + if (prepare_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn is not set in commit prepared message"); + prepare_data->preparetime = pq_getmsgint64(in); + prepare_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); +} + +/* + * Write ROLLBACK PREPARED to the output stream. + */ +void +logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED); + + /* + * This should only ever happen for two-phase commit transactions. In + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_end_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, prepare_time); + pq_sendint64(out, txn->commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction ROLLBACK PREPARED from the stream. 
+ */ +void +logicalrep_read_rollback_prepared(StringInfo in, + LogicalRepRollbackPreparedTxnData *rollback_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in rollback prepare message", flags); + + /* read fields */ + rollback_data->prepare_end_lsn = pq_getmsgint64(in); + if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_end_lsn is not set in rollback prepared message"); + rollback_data->rollback_end_lsn = pq_getmsgint64(in); + if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "rollback_end_lsn is not set in rollback prepared message"); + rollback_data->preparetime = pq_getmsgint64(in); + rollback_data->rollbacktime = pq_getmsgint64(in); + rollback_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(rollback_data->gid, pq_getmsgstring(in)); +} + +/* * Write ORIGIN to the output stream. */ void diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c291b05..ccb786e 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2672,7 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - XLogRecPtr initial_consistent_point, + XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit) { @@ -2703,7 +2703,7 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, * prepare if it was not decoded earlier. We don't need to decode the xact * for aborts if it is not done already. 
*/ - if ((txn->final_lsn < initial_consistent_point) && is_commit) + if ((txn->final_lsn < two_phase_at) && is_commit) { txn->txn_flags |= RBTXN_PREPARE; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index ed3acad..12f0cf9 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -165,15 +165,14 @@ struct SnapBuild XLogRecPtr start_decoding_at; /* - * LSN at which we found a consistent point at the time of slot creation. - * This is also the point where we have exported a snapshot for the - * initial copy. + * LSN at which two-phase decoding was enabled or LSN at which we found a + * consistent point at the time of slot creation. * - * The prepared transactions that are not covered by initial snapshot - * needs to be sent later along with commit prepared and they must be - * before this point. + * The prepared transactions that were skipped because previously two-phase + * was not enabled or are not covered by initial snapshot needs to be sent + * later along with commit prepared and they must be before this point. 
*/ - XLogRecPtr initial_consistent_point; + XLogRecPtr two_phase_at; /* * Don't start decoding WAL until the "xl_running_xacts" information @@ -281,7 +280,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, - XLogRecPtr initial_consistent_point) + XLogRecPtr two_phase_at) { MemoryContext context; MemoryContext oldcontext; @@ -309,7 +308,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; builder->building_full_snapshot = need_full_snapshot; - builder->initial_consistent_point = initial_consistent_point; + builder->two_phase_at = two_phase_at; MemoryContextSwitchTo(oldcontext); @@ -370,12 +369,21 @@ SnapBuildCurrentState(SnapBuild *builder) } /* - * Return the LSN at which the snapshot was exported + * Return the LSN at which the two-phase decoding was first enabled. */ XLogRecPtr -SnapBuildInitialConsistentPoint(SnapBuild *builder) +SnapBuildGetTwoPhaseAt(SnapBuild *builder) { - return builder->initial_consistent_point; + return builder->two_phase_at; +} + +/* + * Set the LSN at which two-phase decoding is enabled. 
+ */ +void +SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr) +{ + builder->two_phase_at = ptr; } /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6ed3181..233649e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -96,6 +96,7 @@ #include "access/table.h" #include "access/xact.h" +#include "catalog/indexing.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" @@ -114,8 +115,11 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" static bool table_states_valid = false; +static List *table_states_not_ready = NIL; +static void FetchTableStates(bool *started_tx); StringInfo copybuf = NULL; @@ -359,7 +363,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Oid relid; TimestampTz last_start_time; }; - static List *table_states = NIL; static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; @@ -367,42 +370,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - if (!table_states_valid) - { - MemoryContext oldctx; - List *rstates; - ListCell *lc; - SubscriptionRelState *rstate; - - /* Clean the old list. */ - list_free_deep(table_states); - table_states = NIL; - - StartTransactionCommand(); - started_tx = true; - - /* Fetch all non-ready tables. */ - rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); - - /* Allocate the tracking info in a permanent memory context. 
*/ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) - { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states = lappend(table_states, rstate); - } - MemoryContextSwitchTo(oldctx); - - table_states_valid = true; - } + FetchTableStates(&started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid * immediate restarts. We don't need it if there are no tables that need * syncing. */ - if (table_states && !last_start_times) + if (table_states_not_ready && !last_start_times) { HASHCTL ctl; @@ -416,16 +391,36 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * Clean up the hash table when we're done with all tables (just to * release the bit of memory). */ - else if (!table_states && last_start_times) + else if (!table_states_not_ready && last_start_times) { hash_destroy(last_start_times); last_start_times = NULL; } /* + * Even when the two_phase mode is requested by the user, it remains as + * 'pending' until all tablesyncs have reached READY state. + * + * When this happens, we restart the apply worker and (if the conditions + * are still ok) then the two_phase tri-state will become properly 'enabled' + * at that time. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING) + { + if (AllTablesyncsReady()) + { + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will restart so two_phase can be enabled", + MySubscription->name))); + + proc_exit(0); + } + } + + /* * Process all tables that are being synchronized. */ - foreach(lc, table_states) + foreach(lc, table_states_not_ready) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -1058,7 +1053,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * slot leading to a dangling slot on the server. 
*/ HOLD_INTERRUPTS(); - walrcv_create_slot(wrconn, slotname, false /* permanent */ , + walrcv_create_slot(wrconn, slotname, false /* permanent */ , false /* two_phase */ , CRS_USE_SNAPSHOT, origin_startpos); RESUME_INTERRUPTS(); @@ -1144,3 +1139,144 @@ copy_table_done: wait_for_worker_state_change(SUBREL_STATE_CATCHUP); return slotname; } + +/* + * Common code to fetch the up-to-date sync state info into the static lists. + */ +static void +FetchTableStates(bool *started_tx) +{ + *started_tx = false; + + if (!table_states_valid) + { + MemoryContext oldctx; + List *rstates; + ListCell *lc; + SubscriptionRelState *rstate; + + /* Clean the old lists. */ + list_free_deep(table_states_not_ready); + table_states_not_ready = NIL; + + StartTransactionCommand(); + *started_tx = true; + + /* Fetch all non-ready tables. */ + rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + foreach(lc, rstates) + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, rstate); + } + MemoryContextSwitchTo(oldctx); + + table_states_valid = true; + } +} + +/* + * Are all tablesyncs READY? + */ +bool +AllTablesyncsReady(void) +{ + bool found_busy = false; + bool started_tx = false; + int count = 0; + ListCell *lc; + + /* We need up-to-date sync state info for subscription tables here. */ + FetchTableStates(&started_tx); + + /* + * Process all not-READY tables. + */ + foreach(lc, table_states_not_ready) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + + count++; + /* + * When the process_syncing_tables_for_apply changes the state from + * SYNCDONE to READY, that change is actually written directly into + * the list element of table_states_not_ready. 
+ * + * The table_states_valid flag is not immediately updated, so + * FetchTableStates does not rebuild the "table_states_not_ready" list + * because it is unaware that it needs to. + * + * It means the "table_states_not_ready" list might end up having + * a READY state in it even though there was none when it was + * initially created. + * + * This is why the code is testing for READY. And because a READY in the + * table_states_not_ready list is the exception rather than the rule it + * means we will nearly always break from this loop at the first + * iteration. + */ + if (rstate->state != SUBREL_STATE_READY) + { + found_busy = true; + break; + } + } + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(false); + } + + /* When no tablesyncs are busy, then all are READY */ + return !found_busy; +} + +/* + * Update the pg_subscription two_phase state of the current subscription. + */ +void +UpdateTwoPhaseState(char new_state) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED || + new_state == LOGICALREP_TWOPHASE_STATE_PENDING || + new_state == LOGICALREP_TWOPHASE_STATE_ENABLED); + + if (!IsTransactionState()) + StartTransactionCommand(); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(MySubscription->oid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, + "cache lookup failed for subscription oid %u", + MySubscription->oid); + + /* Form a new tuple. 
*/ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* And update/set two_phase state */ + values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), + values, nulls, replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + heap_freetuple(tup); + table_close(rel, RowExclusiveLock); + + CommitTransactionCommand(); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 21d304a..1768268 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -49,6 +49,74 @@ * a new way to pass filenames to BufFile APIs so that we are allowed to open * the file we desired across multiple stream-open calls for the same * transaction. + * + * TWO_PHASE TRANSACTIONS + * ---------------------- + * Two phase transactions are replayed at prepare and then committed or + * rolled back at commit prepared and rollback prepared respectively. It is + * possible to have a prepared transaction that arrives at the apply worker + * when the tablesync is busy doing the initial copy. In this case, the apply + * worker skips all the prepared operations [e.g. inserts] while the tablesync + * was still busy (see the condition of should_apply_changes_for_rel). The + * tablesync worker might not get such a prepared transaction because say it + * was prior to the initial consistent point but might have got some later + * commits. Now, the tablesync worker will exit without doing anything for the + * prepared transaction skipped by the apply worker as the sync location for it + * will be already ahead of the apply worker's current location. 
This would lead + * to an "empty prepare", because later when the apply worker does the commit + * prepare, there is nothing in it (the inserts were skipped earlier). + * + * To avoid this, and similar prepare confusions the subscription two_phase + * commit is enabled only after the initial sync is over. The two_phase option + * has been implemented as a tri-state with values DISABLED, PENDING, and + * ENABLED. + * + * Even if the user specifies they want a subscription with two_phase = on, + * internally it will start with a tri-state of PENDING which only becomes + * ENABLED after all tablesync initializations are completed - i.e. when all + * tablesync workers have reached their READY state. In other words, the value + * PENDING is only a temporary state for subscription start-up. + * + * Until the two_phase is properly available (ENABLED) the subscription will + * behave as if two_phase = off. When the apply worker detects that all + * tablesyncs have become READY (while the tri-state was PENDING) it will + * restart the apply worker process. This happens in + * process_syncing_tables_for_apply. + * + * When the (re-started) apply worker finds that all tablesyncs are READY for a + * two_phase tri-state of PENDING it calls walrcv_startstreaming to enable the + * publisher for two-phase commit and updates the tri-state value + * PENDING -> ENABLED. Now, it is possible that during the time we have not + * enabled two_phase, the publisher (replication server) would have skipped some + * prepares but we ensure that such prepares are sent along with commit + * prepare, see ReorderBufferFinishPrepared. + * + * If ever a user needs to be aware of the tri-state value, they can fetch it + * from the pg_subscription catalog (see column subtwophasestate). + * + * We don't allow to toggle two_phase option of a subscription because it can + * lead to an inconsistent replica. 
Consider, initially, it was on and we have + * received some prepare then we turn it off, now at commit time the server + * will send the entire transaction data along with the commit. With some more + * analysis, we can allow changing this option from off to on but not sure if + * that alone would be useful. + * + * Finally, to avoid problems mentioned in previous paragraphs from any + * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on' + * to 'off' and then again back to 'on') there is a restriction for + * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted for + * two_phase = on, except when copy_data = false. + * + * We can get prepare of the same GID more than once for the genuine cases + * where we have defined multiple subscriptions for publications on the same + * server and prepared transaction has operations on tables subscribed to those + * subscriptions. For such cases, if we use the GID sent by publisher one of + * the prepares will be successful and others will fail, in which case the + * server will send them again. Now, this can lead to a deadlock if user has + * set synchronous_standby_names for all the subscriptions on subscriber. To + * avoid such deadlocks, we generate a unique GID (consisting of origin_id of + * subscription and xid of prepared transaction) for each prepare transaction + * on the subscriber. 
*------------------------------------------------------------------------- */ @@ -59,6 +127,7 @@ #include "access/table.h" #include "access/tableam.h" +#include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" @@ -246,6 +315,10 @@ static void apply_handle_tuple_routing(ResultRelInfo *relinfo, LogicalRepRelMapEntry *relmapentry, CmdType operation); +/* Compute GID for two_phase transactions */ +static void TwoPhaseTransactionGid(RepOriginId originid, TransactionId xid, + char* gid, int szgid); + /* * Should this worker apply changes for given relation. * @@ -720,6 +793,180 @@ apply_handle_commit(StringInfo s) } /* + * Handle BEGIN PREPARE message. + */ +static void +apply_handle_begin_prepare(StringInfo s) +{ + LogicalRepBeginPrepareData begin_data; + char gid[GIDSIZE] PG_USED_FOR_ASSERTS_ONLY; + + /* Tablesync should never receive prepare. */ + Assert(!am_tablesync_worker()); + + logicalrep_read_begin_prepare(s, &begin_data); + + /* The gid must not already be prepared. */ + TwoPhaseTransactionGid(replorigin_session_origin, begin_data.xid, gid, + sizeof(gid)); + Assert(!LookupGXact(gid, begin_data.end_lsn, begin_data.committime)); + + remote_final_lsn = begin_data.final_lsn; + + in_remote_transaction = true; + + pgstat_report_activity(STATE_RUNNING, NULL); +} + +/* + * Handle PREPARE message. + */ +static void +apply_handle_prepare(StringInfo s) +{ + LogicalRepPreparedTxnData prepare_data; + char gid[GIDSIZE]; + + logicalrep_read_prepare(s, &prepare_data); + + Assert(prepare_data.prepare_lsn == remote_final_lsn); + + /* + * Compute unique GID for two_phase transactions. We don't use GID of + * prepared transaction sent by server as that can lead to deadlock when we + * have multiple subscriptions from same node point to publications on the + * same node. 
See comments atop worker.c + */ + TwoPhaseTransactionGid(replorigin_session_origin, prepare_data.xid, gid, + sizeof(gid)); + + /* + * Unlike commit, here, we always prepare the transaction even though no + * change has happened in this transaction. It is done this way because + * at commit prepared time, we won't know whether we have skipped + * preparing a transaction because of no change. + * + * XXX, We can optimize such that at commit prepared time, we first check + * whether we have prepared the transaction or not but that doesn't seem + * worth because such cases shouldn't be common. + */ + ensure_transaction(); + + /* + * BeginTransactionBlock is necessary to balance the + * EndTransactionBlock called within the PrepareTransactionBlock + * below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); + + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.preparetime; + + PrepareTransactionBlock(gid); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle a COMMIT PREPARED of a previously PREPARED transaction. + */ +static void +apply_handle_commit_prepared(StringInfo s) +{ + LogicalRepPreparedTxnData prepare_data; + char gid[GIDSIZE]; + + logicalrep_read_commit_prepared(s, &prepare_data); + + /* Compute GID for two_phase transactions. */ + TwoPhaseTransactionGid(replorigin_session_origin, prepare_data.xid, gid, + sizeof(gid)); + + /* there is no transaction when COMMIT PREPARED is called */ + ensure_transaction(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. 
+ */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.preparetime; + + FinishPreparedTransaction(gid, true); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION. + */ +static void +apply_handle_rollback_prepared(StringInfo s) +{ + LogicalRepRollbackPreparedTxnData rollback_data; + char gid[GIDSIZE]; + + logicalrep_read_rollback_prepared(s, &rollback_data); + + /* Compute GID for two_phase transactions. */ + TwoPhaseTransactionGid(replorigin_session_origin, rollback_data.xid, gid, + sizeof(gid)); + + /* + * It is possible that we haven't received prepare because it occurred + * before walsender reached a consistent point in which case we need to + * skip rollback prepared. + */ + if (LookupGXact(gid, rollback_data.prepare_end_lsn, + rollback_data.preparetime)) + { + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = rollback_data.rollback_end_lsn; + replorigin_session_origin_timestamp = rollback_data.rollbacktime; + + /* there is no transaction when ABORT/ROLLBACK PREPARED is called */ + ensure_transaction(); + FinishPreparedTransaction(gid, false); + CommitTransactionCommand(); + } + + pgstat_report_stat(false); + + store_flush_position(rollback_data.rollback_end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(rollback_data.rollback_end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle ORIGIN message. 
* * TODO, support tracking of multiple origins @@ -1954,6 +2201,28 @@ apply_dispatch(StringInfo s) case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); return; + + case LOGICAL_REP_MSG_BEGIN_PREPARE: + apply_handle_begin_prepare(s); + return; + + case LOGICAL_REP_MSG_PREPARE: + apply_handle_prepare(s); + return; + + case LOGICAL_REP_MSG_COMMIT_PREPARED: + apply_handle_commit_prepared(s); + return; + + case LOGICAL_REP_MSG_ROLLBACK_PREPARED: + apply_handle_rollback_prepared(s); + return; + + case LOGICAL_REP_MSG_STREAM_PREPARE: + /* Streaming with two-phase is not supported */ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid logical replication message type \"%c\"", action))); } ereport(ERROR, @@ -2430,6 +2699,9 @@ maybe_reread_subscription(void) /* !slotname should never happen when enabled is true. */ Assert(newsub->slotname); + /* two-phase should not be altered */ + Assert(newsub->twophasestate == MySubscription->twophasestate); + /* * Exit if any parameter that affects the remote connection was changed. * The launcher will start a new worker. @@ -2913,6 +3185,22 @@ cleanup_subxact_info() subxact_data.nsubxacts_max = 0; } +/* + * Form the prepared transaction GID for two_phase transactions. + * + * Return the GID in the supplied buffer. 
+ */ +static void +TwoPhaseTransactionGid(RepOriginId originid, TransactionId xid, + char *gid, int szgid) +{ + /* Origin and Transaction ids must be valid */ + Assert(originid != InvalidRepOriginId); + Assert(TransactionIdIsValid(xid)); + + snprintf(gid, szgid, "pg_%u_%u", originid, xid); +} + /* Logical Replication Apply worker entry point */ void ApplyWorkerMain(Datum main_arg) @@ -3085,9 +3373,45 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; + options.proto.logical.twophase = false; - /* Start normal logical streaming replication. */ - walrcv_startstreaming(wrconn, &options); + if (!am_tablesync_worker()) + { + /* + * Even when the two_phase mode is requested by the user, it remains as + * the tri-state PENDING until all tablesyncs have reached READY state. + * Only then, can it become properly ENABLED. + */ + bool all_tables_ready = AllTablesyncsReady(); + + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + all_tables_ready) + { + /* Start streaming with two_phase enabled */ + options.proto.logical.twophase = true; + walrcv_startstreaming(wrconn, &options); + + UpdateTwoPhaseState(LOGICALREP_TWOPHASE_STATE_ENABLED); + MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + } + else + { + walrcv_startstreaming(wrconn, &options); + } + + ereport(DEBUG1, + (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s", + MySubscription->name, + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : + "?"))); + } + else + { + /* Start normal logical streaming replication. */ + walrcv_startstreaming(wrconn, &options); + } /* Run the main loop. 
*/ LogicalRepApplyLoop(origin_startpos); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 1b993fb..fc4b1ad 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -47,6 +47,16 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, @@ -66,6 +76,9 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx); +static void send_repl_origin(LogicalDecodingContext *ctx, + RepOriginId origin_id, XLogRecPtr origin_lsn, + bool send_origin); /* * Entry in the map used to remember which relation schemas we sent. 
@@ -143,6 +156,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; cb->commit_cb = pgoutput_commit_txn; + + cb->begin_prepare_cb = pgoutput_begin_prepare_txn; + cb->prepare_cb = pgoutput_prepare_txn; + cb->commit_prepared_cb = pgoutput_commit_prepared_txn; + cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -153,18 +171,22 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; cb->stream_truncate_cb = pgoutput_truncate; + /* transaction streaming - two-phase commit */ + cb->stream_prepare_cb = NULL; } static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names, bool *binary, - bool *enable_streaming) + bool *enable_streaming, + bool *enable_twophase) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; bool binary_option_given = false; bool streaming_given = false; + bool twophase_given = false; *binary = false; @@ -232,9 +254,30 @@ parse_output_parameters(List *options, uint32 *protocol_version, *enable_streaming = defGetBoolean(defel); } + else if (strcmp(defel->defname, "two_phase") == 0) + { + if (twophase_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + twophase_given = true; + + *enable_twophase = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } + + /* + * Do additional checking for the disallowed combination of two_phase and + * streaming. While streaming and two_phase can theoretically be supported, + * it needs more analysis to allow them together. 
+ */ + if (*enable_twophase && *enable_streaming) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s are mutually exclusive options", + "two_phase", "streaming"))); } /* @@ -245,6 +288,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { bool enable_streaming = false; + bool enable_twophase = false; PGOutputData *data = palloc0(sizeof(PGOutputData)); /* Create our memory context for private allocations. */ @@ -269,7 +313,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, &data->protocol_version, &data->publication_names, &data->binary, - &enable_streaming); + &enable_streaming, + &enable_twophase); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM) @@ -310,6 +355,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* Also remember we're currently not streaming any transaction. */ in_streaming = false; + /* + * Here, we just check whether the two-phase option is passed by plugin + * and decide whether to enable it at later point of time. It remains + * enabled if the previous start-up has done so. But we only allow the + * option to be passed in with sufficient version of the protocol, and + * when the output plugin supports it. + */ + if (!enable_twophase) + ctx->twophase_opt_given = false; + else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher", + data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM))); + else if (!ctx->twophase) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("two-phase commit requested, but not supported by output plugin"))); + else + ctx->twophase_opt_given = true; + /* Init publication state. 
*/ data->publications = NIL; publications_valid = false; @@ -322,8 +388,12 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, } else { - /* Disable the streaming during the slot initialization mode. */ + /* + * Disable the streaming and prepared transactions during the slot + * initialization mode. + */ ctx->streaming = false; + ctx->twophase = false; } } @@ -338,29 +408,8 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); - if (send_replication_origin) - { - char *origin; - - /*---------- - * XXX: which behaviour do we want here? - * - * Alternatives: - * - don't send origin message if origin name not found - * (that's what we do now) - * - throw error - that will break replication, not good - * - send some special "unknown" origin - *---------- - */ - if (replorigin_by_oid(txn->origin_id, true, &origin)) - { - /* Message boundary */ - OutputPluginWrite(ctx, false); - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_origin(ctx->out, origin, txn->origin_lsn); - } - - } + send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, + send_replication_origin); OutputPluginWrite(ctx, true); } @@ -380,6 +429,68 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* + * BEGIN PREPARE callback + */ +static void +pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ + bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + + OutputPluginPrepareWrite(ctx, !send_replication_origin); + logicalrep_write_begin_prepare(ctx->out, txn); + + send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, + send_replication_origin); + + OutputPluginWrite(ctx, true); +} + +/* + * PREPARE callback + */ +static void +pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, 
true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * COMMIT PREPARED callback + */ +static void +pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * ROLLBACK PREPARED callback + */ +static void +pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, + prepare_time); + OutputPluginWrite(ctx, true); +} + +/* * Write the current schema of the relation and its ancestor (if any) if not * done yet. */ @@ -778,18 +889,8 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx, OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn)); - if (send_replication_origin) - { - char *origin; - - if (replorigin_by_oid(txn->origin_id, true, &origin)) - { - /* Message boundary */ - OutputPluginWrite(ctx, false); - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr); - } - } + send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr, + send_replication_origin); OutputPluginWrite(ctx, true); @@ -1195,3 +1296,33 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) entry->pubactions.pubtruncate = false; } } + +/* Send Replication origin */ +static void +send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, + XLogRecPtr origin_lsn, bool send_origin) +{ + if (send_origin) + { + char *origin; + + /*---------- + * XXX: which behaviour do we want here? 
+ * + * Alternatives: + * - don't send origin message if origin name not found + * (that's what we do now) + * - throw error - that will break replication, not good + * - send some special "unknown" origin + *---------- + */ + if (replorigin_by_oid(origin_id, true, &origin)) + { + /* Message boundary */ + OutputPluginWrite(ctx, false); + OutputPluginPrepareWrite(ctx, true); + + logicalrep_write_origin(ctx->out, origin, origin_lsn); + } + } +} diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index eb283a8..8c1f353 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void); %token K_SLOT %token K_RESERVE_WAL %token K_TEMPORARY +%token K_TWO_PHASE %token K_EXPORT_SNAPSHOT %token K_NOEXPORT_SNAPSHOT %token K_USE_SNAPSHOT @@ -102,6 +103,7 @@ static SQLCmd *make_sqlcmd(void); %type plugin_opt_arg %type opt_slot var_name %type opt_temporary +%type opt_two_phase %type create_slot_opt_list %type create_slot_opt @@ -241,16 +243,17 @@ create_replication_slot: cmd->options = $5; $$ = (Node *) cmd; } - /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ - | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list + /* CREATE_REPLICATION_SLOT slot TEMPORARY TWO_PHASE LOGICAL plugin */ + | K_CREATE_REPLICATION_SLOT IDENT opt_temporary opt_two_phase K_LOGICAL IDENT create_slot_opt_list { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); cmd->kind = REPLICATION_KIND_LOGICAL; cmd->slotname = $2; cmd->temporary = $3; - cmd->plugin = $5; - cmd->options = $6; + cmd->two_phase = $4; + cmd->plugin = $6; + cmd->options = $7; $$ = (Node *) cmd; } ; @@ -365,6 +368,11 @@ opt_temporary: | /* EMPTY */ { $$ = false; } ; +opt_two_phase: + K_TWO_PHASE { $$ = true; } + | /* EMPTY */ { $$ = false; } + ; + opt_slot: K_SLOT IDENT { $$ = $2; } diff --git a/src/backend/replication/repl_scanner.l 
b/src/backend/replication/repl_scanner.l index dcc3c3f..c038a63 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -103,6 +103,7 @@ RESERVE_WAL { return K_RESERVE_WAL; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } TEMPORARY { return K_TEMPORARY; } +TWO_PHASE { return K_TWO_PHASE; } EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; } NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; } USE_SNAPSHOT { return K_USE_SNAPSHOT; } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 75a087c..91224e0 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -285,6 +285,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.database = db_specific ? MyDatabaseId : InvalidOid; slot->data.persistency = persistency; slot->data.two_phase = two_phase; + slot->data.two_phase_at = InvalidXLogRecPtr; /* and then data only present in shared memory */ slot->just_dirtied = false; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 8532296..8e7edae 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -364,7 +364,7 @@ WalReceiverMain(void) "pg_walreceiver_%lld", (long long int) walrcv_get_backend_pid(wrconn)); - walrcv_create_slot(wrconn, slotname, true, 0, NULL); + walrcv_create_slot(wrconn, slotname, true, false, 0, NULL); SpinLockAcquire(&walrcv->mutex); strlcpy(walrcv->slotname, slotname, NAMEDATALEN); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index da6cc05..dddd5a3 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -51,6 +51,7 @@ #include "catalog/pg_largeobject_d.h" #include "catalog/pg_largeobject_metadata_d.h" #include "catalog/pg_proc_d.h" +#include "catalog/pg_subscription.h" /* For 2PC tri-state. 
*/ #include "catalog/pg_trigger_d.h" #include "catalog/pg_type_d.h" #include "common/connect.h" @@ -4278,6 +4279,7 @@ getSubscriptions(Archive *fout) int i_subname; int i_rolname; int i_substream; + int i_subtwophasestate; int i_subconninfo; int i_subslotname; int i_subsynccommit; @@ -4321,9 +4323,16 @@ getSubscriptions(Archive *fout) appendPQExpBufferStr(query, " false AS subbinary,\n"); if (fout->remoteVersion >= 140000) - appendPQExpBufferStr(query, " s.substream\n"); + appendPQExpBufferStr(query, " s.substream,\n"); else - appendPQExpBufferStr(query, " false AS substream\n"); + appendPQExpBufferStr(query, " false AS substream,\n"); + + if (fout->remoteVersion >= 140000) + appendPQExpBufferStr(query, " s.subtwophasestate\n"); + else + appendPQExpBuffer(query, + " '%c' AS subtwophasestate\n", + LOGICALREP_TWOPHASE_STATE_DISABLED); appendPQExpBufferStr(query, "FROM pg_subscription s\n" @@ -4344,6 +4353,7 @@ getSubscriptions(Archive *fout) i_subpublications = PQfnumber(res, "subpublications"); i_subbinary = PQfnumber(res, "subbinary"); i_substream = PQfnumber(res, "substream"); + i_subtwophasestate = PQfnumber(res, "subtwophasestate"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4369,6 +4379,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subbinary)); subinfo[i].substream = pg_strdup(PQgetvalue(res, i, i_substream)); + subinfo[i].subtwophasestate = + pg_strdup(PQgetvalue(res, i, i_subtwophasestate)); if (strlen(subinfo[i].rolname) == 0) pg_log_warning("owner of subscription \"%s\" appears to be invalid", @@ -4396,6 +4408,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) char **pubnames = NULL; int npubnames = 0; int i; + char two_phase_disabled[] = { LOGICALREP_TWOPHASE_STATE_DISABLED, '\0' }; if (!(subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) return; @@ -4437,6 +4450,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->substream, "f") != 0) appendPQExpBufferStr(query, 
", streaming = on"); + if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0) + appendPQExpBufferStr(query, ", two_phase = on"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 5340843..70c072d 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -639,6 +639,7 @@ typedef struct _SubscriptionInfo char *subslotname; char *subbinary; char *substream; + char *subtwophasestate; char *subsynccommit; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index eeac0ef..fd0d90e 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6098,7 +6098,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false}; + false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6124,13 +6124,18 @@ describeSubscriptions(const char *pattern, bool verbose) if (verbose) { - /* Binary mode and streaming are only supported in v14 and higher */ + /* + * Binary, streaming, and two_phase are only supported in v14 and + * higher + */ if (pset.sversion >= 140000) appendPQExpBuffer(&buf, ", subbinary AS \"%s\"\n" - ", substream AS \"%s\"\n", + ", substream AS \"%s\"\n" + ", subtwophasestate AS \"%s\"\n", gettext_noop("Binary"), - gettext_noop("Streaming")); + gettext_noop("Streaming"), + gettext_noop("Two phase commit")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index b67f4ea..7957e2c 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -2764,7 +2764,7 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... 
WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("copy_data", "connect", "create_slot", "enabled", - "slot_name", "synchronous_commit"); + "slot_name", "synchronous_commit", "streaming", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 91786da..e27e1a8 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -58,4 +58,6 @@ extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn, RepOriginId origin_id); extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); +extern bool LookupGXact(const char *gid, XLogRecPtr prepare_at_lsn, + TimestampTz origin_prepare_timestamp); #endif /* TWOPHASE_H */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index a5d6efd..3f06d1f 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -22,6 +22,11 @@ #include "nodes/pg_list.h" +/* two_phase tri-state values. */ +#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd' +#define LOGICALREP_TWOPHASE_STATE_PENDING 'p' +#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e' + /* ---------------- * pg_subscription definition. cpp turns this into * typedef struct FormData_pg_subscription @@ -54,6 +59,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool substream; /* Stream in-progress transactions. */ + char subtwophasestate; /* Decode 2PC PREPARE? */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -91,6 +98,7 @@ typedef struct Subscription bool binary; /* Indicates if the subscription wants data in * binary format */ bool stream; /* Allow streaming in-progress transactions. */ + char twophasestate; /* Decode 2PC PREPARE? 
*/ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index c253403..5c1ce7e 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -85,11 +85,16 @@ typedef struct LogicalDecodingContext bool streaming; /* - * Does the output plugin support two-phase decoding, and is it enabled? + * Does the output plugin support two-phase decoding? */ bool twophase; /* + * Is two-phase option given by output plugin? + */ + bool twophase_opt_given; + + /* * State for writing output. */ bool accept_writes; diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index fa4c372..eedfc3c 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -13,6 +13,7 @@ #ifndef LOGICAL_PROTO_H #define LOGICAL_PROTO_H +#include "access/xact.h" #include "replication/reorderbuffer.h" #include "utils/rel.h" @@ -27,10 +28,16 @@ * * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with * support for streaming large transactions. + * + * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with + * support for two-phase commit PREPARE decoding. This has the same protocol + * version requirement as LOGICALREP_PROTO_STREAM_VERSION_NUM because these + * features were both introduced in the same release (PG14).
*/ #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 #define LOGICALREP_PROTO_VERSION_NUM 1 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 +#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 2 #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM /* @@ -54,10 +61,15 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', + LOGICAL_REP_MSG_PREPARE = 'P', + LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', + LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c', - LOGICAL_REP_MSG_STREAM_ABORT = 'A' + LOGICAL_REP_MSG_STREAM_ABORT = 'A', + LOGICAL_REP_MSG_STREAM_PREPARE = 'p' } LogicalRepMsgType; /* @@ -114,6 +126,7 @@ typedef struct LogicalRepBeginData TransactionId xid; } LogicalRepBeginData; +/* Commit (and abort) information */ typedef struct LogicalRepCommitData { XLogRecPtr commit_lsn; @@ -121,6 +134,50 @@ typedef struct LogicalRepCommitData TimestampTz committime; } LogicalRepCommitData; + +/* Begin Prepare information */ +typedef struct LogicalRepBeginPrepareData +{ + XLogRecPtr final_lsn; + XLogRecPtr end_lsn; + TimestampTz committime; + TransactionId xid; + char gid[GIDSIZE]; +} LogicalRepBeginPrepareData; + +/* + * Prepared transaction protocol information. This same structure is used to + * hold the information for prepare, and commit prepared transaction. + * prepare_lsn and preparetime are used to store commit lsn and + * commit time for commit prepared. + */ +typedef struct LogicalRepPreparedTxnData +{ + XLogRecPtr prepare_lsn; + XLogRecPtr end_lsn; + TimestampTz preparetime; + TransactionId xid; + char gid[GIDSIZE]; +} LogicalRepPreparedTxnData; + +/* + * Rollback Prepared transaction protocol information. 
The prepare information + * prepare_end_lsn and prepare_time are used to check if the downstream has + * received this prepared transaction in which case it can apply the rollback, + * otherwise, it can skip the rollback operation. The gid alone is not + * sufficient because the downstream node can have a prepared transaction with + * the same identifier. + */ +typedef struct LogicalRepRollbackPreparedTxnData +{ + XLogRecPtr prepare_end_lsn; + XLogRecPtr rollback_end_lsn; + TimestampTz preparetime; + TimestampTz rollbacktime; + TransactionId xid; + char gid[GIDSIZE]; +} LogicalRepRollbackPreparedTxnData; + extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn); extern void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data); @@ -128,6 +185,24 @@ extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); extern void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data); +extern void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn); +extern void logicalrep_read_begin_prepare(StringInfo in, + LogicalRepBeginPrepareData *begin_data); +extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern void logicalrep_read_prepare(StringInfo in, + LogicalRepPreparedTxnData *prepare_data); +extern void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +extern void logicalrep_read_commit_prepared(StringInfo in, + LogicalRepPreparedTxnData *prepare_data); +extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); +extern void logicalrep_read_rollback_prepared(StringInfo in, + LogicalRepRollbackPreparedTxnData *rollback_data); + + extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr
*origin_lsn); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 565a961..6c9f2c6 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -643,7 +643,7 @@ void ReorderBufferCommit(ReorderBuffer *, TransactionId, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - XLogRecPtr initial_consistent_point, + XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 1ad5e6c..db68551 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -92,11 +92,10 @@ typedef struct ReplicationSlotPersistentData XLogRecPtr confirmed_flush; /* - * LSN at which we found a consistent point at the time of slot creation. - * This is also the point where we have exported a snapshot for the - * initial copy. + * LSN at which we enabled two_phase commit for this slot or LSN at which + * we found a consistent point at the time of slot creation. */ - XLogRecPtr initial_consistent_point; + XLogRecPtr two_phase_at; /* * Allow decoding of prepared transactions? 
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index fbabce6..de72124 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -62,7 +62,7 @@ extern void CheckPointSnapBuild(void); extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, - XLogRecPtr initial_consistent_point); + XLogRecPtr two_phase_at); extern void FreeSnapshotBuilder(SnapBuild *cache); extern void SnapBuildSnapDecRefcount(Snapshot snap); @@ -76,7 +76,8 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid); extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); -extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder); +extern XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder); +extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr); extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 4fd7c25..9edf907 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -181,6 +181,7 @@ typedef struct List *publication_names; /* String list of publications */ bool binary; /* Ask publisher to use binary */ bool streaming; /* Streaming of large transactions */ + bool twophase; /* Enable 2PC decoding of PREPARE */ } logical; } proto; } WalRcvStreamOptions; @@ -347,6 +348,7 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, const char *slotname, bool temporary, + bool two_phase, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); @@ -420,8 +422,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ 
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) +#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 1cac75e..daf6ad4 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -86,6 +86,9 @@ extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname, int szorgname); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); +extern bool AllTablesyncsReady(void); +extern void UpdateTwoPhaseState(char new_state); + void process_syncing_tables(XLogRecPtr current_lsn); void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue); diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 14a4302..a9664e8 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + 
Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -91,10 +91,10 @@ ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | off | dbname=regress_doesnotexist2 (1 row) BEGIN; @@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. 
\dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | local | dbname=regress_doesnotexist2 (1 row) -- rename back to keep the rest simple @@ -162,19 +162,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; @@ -185,19 +185,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not 
subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; @@ -224,6 +224,43 @@ ALTER SUBSCRIPTION regress_testsub DISABLE; ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; DROP FUNCTION 
func; +-- fail - two_phase must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo); +ERROR: two_phase requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist +(1 row) + +--fail - alter of two_phase option not supported. +ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); +ERROR: cannot alter two_phase option +--fail - cannot set streaming when two_phase enabled +ALTER SUBSCRIPTION regress_testsub SET (streaming = true); +ERROR: cannot set streaming = true for two-phase enabled subscription +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist +(1 row) + +DROP SUBSCRIPTION regress_testsub; +-- fail - two_phase and streaming are mutually exclusive. 
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true); +ERROR: two_phase = true and streaming = true are mutually exclusive options +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +------+-------+---------+-------------+--------+-----------+------------------+--------------------+---------- +(0 rows) + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 81e65e5..13e0c20 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -171,6 +171,31 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; DROP FUNCTION func; +-- fail - two_phase must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); + +\dRs+ +--fail - alter of two_phase option not supported. +ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); + +--fail - cannot set streaming when two_phase enabled +ALTER SUBSCRIPTION regress_testsub SET (streaming = true); + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); + +\dRs+ + +DROP SUBSCRIPTION regress_testsub; + +-- fail - two_phase and streaming are mutually exclusive. 
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true); + +\dRs+ + + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 1d1d5d2..1f3038f 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1343,12 +1343,15 @@ LogicalOutputPluginWriterPrepareWrite LogicalOutputPluginWriterUpdateProgress LogicalOutputPluginWriterWrite LogicalRepBeginData +LogicalRepBeginPrepareData LogicalRepCommitData LogicalRepCtxStruct LogicalRepPartMapEntry +LogicalRepPreparedTxnData LogicalRepRelId LogicalRepRelMapEntry LogicalRepRelation +LogicalRepRollbackPreparedTxnData LogicalRepTupleData LogicalRepTyp LogicalRepWorker -- 1.8.3.1