From 74f703ced88de4b16a1d216b1fc42d2ac1f77e73 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Fri, 19 Mar 2021 19:46:32 +0530 Subject: [PATCH v63 2/3] Misc changes by Amit. --- src/backend/commands/subscriptioncmds.c | 21 ++++++++-------- .../libpqwalreceiver/libpqwalreceiver.c | 6 ++--- src/backend/replication/logical/logical.c | 29 ++++++++++------------ src/backend/replication/logical/snapbuild.c | 8 +++--- src/backend/replication/logical/worker.c | 25 +++++++++++++++---- src/backend/replication/pgoutput/pgoutput.c | 19 ++++++++------ src/backend/replication/repl_gram.y | 2 +- src/backend/replication/walreceiver.c | 2 +- src/include/replication/logical.h | 2 +- src/include/replication/slot.h | 3 ++- src/include/replication/walreceiver.h | 6 ++--- 11 files changed, 70 insertions(+), 53 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 015b2dc..79178a9 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -224,7 +224,7 @@ parse_subscription_options(List *options, /* * Do not allow toggling of two_phase option, this could * cause missing of transactions and lead to an inconsistent - * replica. + * replica. See comments atop worker.c. */ if (!twophase) { @@ -321,8 +321,6 @@ parse_subscription_options(List *options, * Do additional checking for the disallowed combination of two_phase and * streaming. While streaming and two_phase can theoretically be supported, * it needs more analysis to allow them together. - * Hence, disabling this combination till that issue can - * be addressed. */ if (twophase && *twophase_given && *twophase) { @@ -584,13 +582,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) if (create_slot) { Assert(slotname); + /* - * Even if two_phase is set, don't create the slot with two-phase - * enabled. Will enable it once all the tables are synced and ready. 
- * This avoids race-conditions like prepared transactions being - * skippped due to changes not being applied due to checks in - * should_apply_changes_for_rel() when tablesync for the - * corresponding tables are in progress. + * Even if two_phase is set, don't create the slot with + * two-phase enabled. Will enable it once all the tables are + * synced and ready. This avoids race-conditions like prepared + * transactions being skipped due to changes not being applied + * due to checks in should_apply_changes_for_rel() when + * tablesync for the corresponding tables are in progress. See + * comments atop worker.c. */ walrcv_create_slot(wrconn, slotname, false, false, CRS_NOEXPORT_SNAPSHOT, NULL); @@ -1083,8 +1083,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) * anything which could interfere with the apply worker's * message handling. * - * For more details see the "TWO_PHASE COMMIT TRI-STATE LOGIC" - * comment atop worker.c + * For more details see comments atop worker.c. 
*/ if (sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED && copy_data) ereport(ERROR, diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index c8f0539..58c813f 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -63,7 +63,7 @@ static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len); static bool libpqrcv_startstreaming(WalReceiverConn *conn, - const WalRcvStreamOptions *options, bool two_phase); + const WalRcvStreamOptions *options); static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli); static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, @@ -394,7 +394,7 @@ libpqrcv_server_version(WalReceiverConn *conn) */ static bool libpqrcv_startstreaming(WalReceiverConn *conn, - const WalRcvStreamOptions *options, bool two_phase) + const WalRcvStreamOptions *options) { StringInfoData cmd; PGresult *res; @@ -435,7 +435,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfoString(&cmd, ", streaming 'on'"); /* set the two_phase option only if the caller specifies it. */ - if (options->proto.logical.twophase && two_phase && + if (options->proto.logical.twophase && PQserverVersion(conn->streamConn) >= 140000) appendStringInfoString(&cmd, ", two_phase 'on'"); diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index d944b02..5143d8f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -432,20 +432,18 @@ CreateInitDecodingContext(const char *plugin, MemoryContextSwitchTo(old_context); /* - * We allow decoding of prepared transactions when - * (a) the two_phase is enabled at the time of slot creation, - * or (b) when the two_phase option is passed in at restart. 
+ * We allow decoding of prepared transactions when the two_phase is enabled + * at the time of slot creation, or when the two_phase option is given at + * the streaming start. */ - ctx->twophase &= (ctx->twophase_opt_given || MyReplicationSlot->data.two_phase); + ctx->twophase &= (ctx->twophase_opt_given || slot->data.two_phase); - /* Set two_phase_at LSN only if it hasn't already been set. */ - if (ctx->twophase && !MyReplicationSlot->data.two_phase_at) + /* Mark slot to allow two_phase decoding if not already marked */ + if (ctx->twophase && !slot->data.two_phase) { - MyReplicationSlot->data.two_phase_at = restart_lsn; slot->data.two_phase = true; ReplicationSlotMarkDirty(); ReplicationSlotSave(); - SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, restart_lsn); } ctx->reorder->output_rewrites = ctx->options.receive_rewrites; @@ -537,7 +535,6 @@ CreateDecodingContext(XLogRecPtr start_lsn, start_lsn = slot->data.confirmed_flush; } - ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, fast_forward, xl_routine, prepare_write, @@ -550,17 +547,17 @@ CreateDecodingContext(XLogRecPtr start_lsn, MemoryContextSwitchTo(old_context); /* - * We allow decoding of prepared transactions when - * (a) the two_phase is enabled at the time of slot creation, - * or (b) when the two_phase option is passed in at restart. + * We allow decoding of prepared transactions when the two_phase is enabled + * at the time of slot creation, or when the two_phase option is given at + * the streaming start. */ - ctx->twophase &= (ctx->twophase_opt_given || MyReplicationSlot->data.two_phase); + ctx->twophase &= (ctx->twophase_opt_given || slot->data.two_phase); - /* Set two_phase_at LSN only if it hasn't already been set. 
*/ - if (ctx->twophase && !MyReplicationSlot->data.two_phase_at) + /* Mark slot to allow two_phase decoding if not already marked */ + if (ctx->twophase && !slot->data.two_phase) { - MyReplicationSlot->data.two_phase_at = start_lsn; slot->data.two_phase = true; + slot->data.two_phase_at = start_lsn; ReplicationSlotMarkDirty(); ReplicationSlotSave(); SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index fddcc64..12f0cf9 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -165,10 +165,12 @@ struct SnapBuild XLogRecPtr start_decoding_at; /* - * LSN at which two-phase decoding was enabled. + * LSN at which two-phase decoding was enabled or LSN at which we found a + * consistent point at the time of slot creation. * - * The prepared transactions from an LSN < this LSN - * needs to be sent later along with commit prepared. + * The prepared transactions that were skipped because previously two-phase + * was not enabled or are not covered by initial snapshot need to be sent + * later along with commit prepared and they must be before this point. */ XLogRecPtr two_phase_at; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 1a76c38..26f8069 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -816,6 +816,19 @@ apply_handle_prepare(StringInfo s) Assert(prepare_data.prepare_lsn == remote_final_lsn); + /* + * Unlike commit, here, we always prepare the transaction even though no + * change has happened in this transaction. It is done this way because + * at commit prepared time, we won't know whether we have skipped + * preparing a transaction because of no change. 
+ * + * XXX, We can optimize such that at commit prepared time, we first check + * whether we have prepared the transaction or not but that doesn't seem + * worth it because such cases shouldn't be common. Also, as of now, the + * two different subscriptions can receive the same prepared transaction + * GID and can cause confusion at the time of commit prepared if we skip + * preparing the transaction. + */ ensure_transaction(); /* @@ -3313,7 +3326,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; - options.proto.logical.twophase = MySubscription->twophasestate; + options.proto.logical.twophase = false; if (!am_tablesync_worker()) { @@ -3324,16 +3337,18 @@ ApplyWorkerMain(Datum main_arg) */ bool all_tables_ready = AllTablesyncsREADY(); - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && all_tables_ready) + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + all_tables_ready) { /* Start streaming with two_phase enabled */ - walrcv_startstreaming(wrconn, &options, true); + options.proto.logical.twophase = true; + walrcv_startstreaming(wrconn, &options); UpdateTwoPhaseState(LOGICALREP_TWOPHASE_STATE_ENABLED); MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; } else { - walrcv_startstreaming(wrconn, &options, false); + walrcv_startstreaming(wrconn, &options); } ereport(LOG, @@ -3347,7 +3362,7 @@ ApplyWorkerMain(Datum main_arg) else { /* Start normal logical streaming replication. */ - walrcv_startstreaming(wrconn, &options, false); + walrcv_startstreaming(wrconn, &options); } /* Run the main loop. 
*/ diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9dc7fc2..fc4b1ad 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -267,14 +267,17 @@ parse_output_parameters(List *options, uint32 *protocol_version, else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } + /* - * Enabling both streaming and two-phase is not a currently supported - * combination, could be supported in future. + * Do additional checking for the disallowed combination of two_phase and + * streaming. While streaming and two_phase can theoretically be supported, + * it needs more analysis to allow them together. */ if (*enable_twophase && *enable_streaming) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("unsupported combination of options"))); + errmsg("%s and %s are mutually exclusive options", + "two_phase", "streaming"))); } /* @@ -353,11 +356,11 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, in_streaming = false; /* - * We don't decide if two-phae is disabled here. Just mark the option - * as passed in or not. Two-phase could remain enabled because a previous - * start-up enabled in. But we only allow the option to be passed in - * with sufficient version of the protocol, and when the output plugin - * supports it. + * Here, we just check whether the two-phase option is passed by plugin + * and decide whether to enable it at later point of time. It remains + * enabled if the previous start-up has done so. But we only allow the + * option to be passed in with sufficient version of the protocol, and + * when the output plugin supports it. 
*/ if (!enable_twophase) ctx->twophase_opt_given = false; diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 27fdbd7..8c1f353 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -325,7 +325,7 @@ start_replication: } ; -/* START_REPLICATION SLOT slot LOGICAL %X/%X options*/ +/* START_REPLICATION SLOT slot LOGICAL %X/%X options */ start_logical_replication: K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options { diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 1325c29..8e7edae 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -388,7 +388,7 @@ WalReceiverMain(void) options.slotname = slotname[0] != '\0' ? slotname : NULL; options.proto.physical.startpointTLI = startpointTLI; ThisTimeLineID = startpointTLI; - if (walrcv_startstreaming(wrconn, &options, false)) + if (walrcv_startstreaming(wrconn, &options)) { if (first_stream) ereport(LOG, diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 6548b27..5c1ce7e 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -90,7 +90,7 @@ typedef struct LogicalDecodingContext bool twophase; /* - * Does output plugin wants to turn on two-phase? + * Is two-phase option given by output plugin? */ bool twophase_opt_given; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 1f4b253..db68551 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -92,7 +92,8 @@ typedef struct ReplicationSlotPersistentData XLogRecPtr confirmed_flush; /* - * LSN at which we enabled two_phase commit for this slot. + * LSN at which we enabled two_phase commit for this slot or LSN at which + * we found a consistent point at the time of slot creation. 
*/ XLogRecPtr two_phase_at; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index e5b6329..9edf907 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -302,7 +302,7 @@ typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn, * didn't switch to copy-mode. */ typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn, - const WalRcvStreamOptions *options, bool two_phase); + const WalRcvStreamOptions *options); /* * walrcv_endstreaming_fn @@ -414,8 +414,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) -#define walrcv_startstreaming(conn, options, two_phase) \ - WalReceiverFunctions->walrcv_startstreaming(conn, options, two_phase) +#define walrcv_startstreaming(conn, options) \ + WalReceiverFunctions->walrcv_startstreaming(conn, options) #define walrcv_endstreaming(conn, next_tli) \ WalReceiverFunctions->walrcv_endstreaming(conn, next_tli) #define walrcv_receive(conn, buffer, wait_fd) \ -- 1.8.3.1