From 4a2abc82db9ab37699f09df9be86f150c58db3cf Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 28 Jun 2021 13:18:58 +0900 Subject: [PATCH v2 3/3] Add skip_xid option to ALTER SUBSCRIPTION. --- doc/src/sgml/logical-replication.sgml | 33 ++- doc/src/sgml/ref/alter_subscription.sgml | 47 +++- src/backend/catalog/pg_subscription.c | 10 + src/backend/commands/subscriptioncmds.c | 138 +++++++++-- src/backend/parser/gram.y | 11 +- src/backend/replication/logical/worker.c | 252 +++++++++++++++++---- src/include/catalog/pg_subscription.h | 4 + src/include/nodes/parsenodes.h | 4 +- src/test/regress/expected/subscription.out | 24 ++ src/test/regress/sql/subscription.sql | 20 ++ src/test/subscription/t/023_skip_xact.pl | 185 +++++++++++++++ 11 files changed, 645 insertions(+), 83 deletions(-) create mode 100644 src/test/subscription/t/023_skip_xact.pl diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 88646bc859..d222e64122 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -333,14 +333,41 @@ A conflict will produce an error and will stop the replication; it must be resolved manually by the user. Details about the conflict can be found in - the subscriber's server log. + as well as the + subscriber's server log. The resolution can be done either by changing data on the subscriber so that it does not conflict with the incoming change or by skipping the - transaction that conflicts with the existing data. The transaction can be - skipped by calling the + transaction that conflicts with the existing data. When a conflict produce + an error, it is shown in pg_stat_subscription_errors + view as follows: + + + +postgres=# SELECT * FROM pg_stat_subscription_errors; + datname | subname | relid | command | xid | failure_source | failure_count | last_failure | last_failure_message +----------+----------+-------+---------+-----+----------------+---------------+-------------------------------+------------------------------------------------------------ + postgres | test_sub | 16385 | INSERT | 740 | apply | 1 | 2021-07-15 21:54:58.804595+00 | duplicate key value violates unique constraint "test_pkey" + + + + and it is also shown in subscriber's server log as follows: + + + +ERROR: duplicate key value violates unique constraint "test_pkey" +DETAIL: Key (id)=(1) already exists. +CONTEXT: during apply of "INSERT" for relation "public.test" in transaction with xid 740 committs 2021-07-15 21:54:58.802874+00 + + + + The transaction ID to skip (740 in above cases) can be found in those outputs. + The transaction can be skipped by setting skip_xid to + the subscription by ALTER SUBSCRIPTION ... SET. + Alternatively, The transaction can also be skipped by calling the + pg_replication_origin_advance() function with a node_name corresponding to the subscription name, and a position. The current position of origins can be seen in the diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index a6f994450d..e961f83eca 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -29,6 +29,7 @@ ALTER SUBSCRIPTION name REFRESH PUB ALTER SUBSCRIPTION name ENABLE ALTER SUBSCRIPTION name DISABLE ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] ) +ALTER SUBSCRIPTION name RESET ( subscription_parameter [, ... ] ) ALTER SUBSCRIPTION name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER SUBSCRIPTION name RENAME TO new_name @@ -192,15 +193,47 @@ ALTER SUBSCRIPTION name RENAME TO < SET ( subscription_parameter [= value] [, ... ] ) + RESET ( subscription_parameter [, ... ] ) - This clause alters parameters originally set by - . See there for more - information. The parameters that can be altered - are slot_name, - synchronous_commit, - binary, and - streaming. + This clause sets or resets a subscription option. The parameters that can be + set are the parameters originally set by : + slot_name, synchronous_commit, + binary, streaming, and following + parameter: + + + + + skip_xid (xid) + + + If incoming data violates any constraints the logical replication + will stop until it is resolved (See + for the details). + The resolution can be done either by changing data on the + subscriber so that it doesn't conflict with incoming change or by + skipping the whole transaction. This option specifies transaction + ID that logical replication worker skips to apply. The logical + replication worker skips all data modification changes of the + specified transaction. Therefore, since it skips the whole + transaction including the changes that don't violate the constraint, + it should only be used as a last resort. This option has no effect + for the transaction that is already prepared with enabling + two_phase on susbscriber. After the logical + replication successfully skips the transaction, the transaction ID + (stored in + pg_subscription.subskipxid) + is cleared. + + + + + + + The parameters that can be reset are: streaming, + binary, synchronous_commit, + and skip_xid. diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 25021e25a4..cb22cd7463 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -104,6 +104,16 @@ GetSubscription(Oid subid, bool missing_ok) Assert(!isnull); sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum)); + /* Get skip XID */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subskipxid, + &isnull); + if (!isnull) + sub->skipxid = DatumGetTransactionId(datum); + else + sub->skipxid = InvalidTransactionId; + ReleaseSysCache(tup); return sub; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 239d263f83..b0a4b1de60 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,6 +60,7 @@ #define SUBOPT_BINARY 0x00000080 #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 +#define SUBOPT_SKIP_XID 0x00000400 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -81,6 +82,7 @@ typedef struct SubOpts bool binary; bool streaming; bool twophase; + TransactionId skip_xid; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -99,7 +101,8 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, */ static void parse_subscription_options(ParseState *pstate, List *stmt_options, - bits32 supported_opts, SubOpts *opts) + bits32 supported_opts, SubOpts *opts, + bool is_reset) { ListCell *lc; @@ -128,12 +131,23 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->streaming = false; if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) opts->twophase = false; + if (IsSet(supported_opts, SUBOPT_SKIP_XID)) + opts->skip_xid = InvalidTransactionId; /* Parse options */ foreach(lc, stmt_options) { DefElem *defel = (DefElem *) lfirst(lc); + if (is_reset) + { + if (defel->arg != NULL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("RESET must not include values for parameters"))); + + } + if (IsSet(supported_opts, SUBOPT_CONNECT) && strcmp(defel->defname, "connect") == 0) { @@ -141,7 +155,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_CONNECT; - opts->connect = defGetBoolean(defel); + if (!is_reset) + opts->connect = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_ENABLED) && strcmp(defel->defname, "enabled") == 0) @@ -150,7 +165,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_ENABLED; - opts->enabled = defGetBoolean(defel); + if (!is_reset) + opts->enabled = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) && strcmp(defel->defname, "create_slot") == 0) @@ -159,7 +175,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_CREATE_SLOT; - opts->create_slot = defGetBoolean(defel); + if (!is_reset) + opts->create_slot = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) && strcmp(defel->defname, "slot_name") == 0) @@ -168,7 +185,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_SLOT_NAME; - opts->slot_name = defGetString(defel); + if (!is_reset) + opts->slot_name = defGetString(defel); /* Setting slot_name = NONE is treated as no slot name. */ if (strcmp(opts->slot_name, "none") == 0) @@ -181,7 +199,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_COPY_DATA; - opts->copy_data = defGetBoolean(defel); + if (!is_reset) + opts->copy_data = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) && strcmp(defel->defname, "synchronous_commit") == 0) @@ -190,12 +209,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT; - opts->synchronous_commit = defGetString(defel); + if (!is_reset) + { + opts->synchronous_commit = defGetString(defel); - /* Test if the given value is valid for synchronous_commit GUC. */ - (void) set_config_option("synchronous_commit", opts->synchronous_commit, - PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, - false, 0, false); + /* Test if the given value is valid for synchronous_commit GUC. */ + (void) set_config_option("synchronous_commit", opts->synchronous_commit, + PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, + false, 0, false); + } } else if (IsSet(supported_opts, SUBOPT_REFRESH) && strcmp(defel->defname, "refresh") == 0) @@ -204,7 +226,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_REFRESH; - opts->refresh = defGetBoolean(defel); + if (!is_reset) + opts->refresh = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_BINARY) && strcmp(defel->defname, "binary") == 0) @@ -213,7 +236,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_BINARY; - opts->binary = defGetBoolean(defel); + if (!is_reset) + opts->binary = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_STREAMING) && strcmp(defel->defname, "streaming") == 0) @@ -222,7 +246,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_STREAMING; - opts->streaming = defGetBoolean(defel); + if (!is_reset) + opts->streaming = defGetBoolean(defel); } else if (strcmp(defel->defname, "two_phase") == 0) { @@ -243,7 +268,26 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT; - opts->twophase = defGetBoolean(defel); + if (!is_reset) + opts->twophase = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "skip_xid") == 0) + { + if (!is_reset) + { + int64 arg; + TransactionId xid; + + arg = defGetInt64(defel); + xid = (TransactionId) arg; + if (arg < 0 || !TransactionIdIsNormal(xid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid transaction id"))); + opts->skip_xid = xid; + } + + opts->specified_opts |= SUBOPT_SKIP_XID; } else ereport(ERROR, @@ -414,7 +458,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT); - parse_subscription_options(pstate, stmt->options, supported_opts, &opts); + parse_subscription_options(pstate, stmt->options, supported_opts, &opts, + false); /* * Since creating a replication slot is not transactional, rolling back @@ -487,6 +532,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); + nulls[Anum_pg_subscription_subskipxid - 1] = true; values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -883,14 +929,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, switch (stmt->kind) { - case ALTER_SUBSCRIPTION_OPTIONS: + case ALTER_SUBSCRIPTION_SET_OPTIONS: { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING); + SUBOPT_STREAMING | SUBOPT_SKIP_XID); parse_subscription_options(pstate, stmt->options, - supported_opts, &opts); + supported_opts, &opts, false); if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) { @@ -935,14 +981,60 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_substream - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_SKIP_XID)) + { + values[Anum_pg_subscription_subskipxid - 1] = + TransactionIdGetDatum(opts.skip_xid); + replaces[Anum_pg_subscription_subskipxid - 1] = true; + } + update_tuple = true; break; } + case ALTER_SUBSCRIPTION_RESET_OPTIONS: + { + supported_opts = (SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_STREAMING | + SUBOPT_BINARY | SUBOPT_SKIP_XID); + + parse_subscription_options(pstate, stmt->options, + supported_opts, &opts, true); + + if (IsSet(opts.specified_opts, SUBOPT_SYNCHRONOUS_COMMIT)) + { + values[Anum_pg_subscription_subsynccommit - 1] = + CStringGetTextDatum("off"); + replaces[Anum_pg_subscription_subsynccommit - 1] = true; + } + + if (IsSet(opts.specified_opts, SUBOPT_STREAMING)) + { + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(false); + replaces[Anum_pg_subscription_substream - 1] = true; + } + + if (IsSet(opts.specified_opts, SUBOPT_BINARY)) + { + values[Anum_pg_subscription_subbinary - 1] = + BoolGetDatum(false); + replaces[Anum_pg_subscription_subbinary - 1] = true; + } + + if (IsSet(opts.specified_opts, SUBOPT_SKIP_XID)) + { + nulls[Anum_pg_subscription_subskipxid - 1] = + replaces[Anum_pg_subscription_subskipxid - 1] = true; + } + + update_tuple = true; + break; + } case ALTER_SUBSCRIPTION_ENABLED: { parse_subscription_options(pstate, stmt->options, - SUBOPT_ENABLED, &opts); + SUBOPT_ENABLED, &opts, false); + Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED)); if (!sub->slotname && opts.enabled) @@ -977,7 +1069,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH; parse_subscription_options(pstate, stmt->options, - supported_opts, &opts); + supported_opts, &opts, false); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -1027,7 +1119,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, supported_opts |= SUBOPT_COPY_DATA; parse_subscription_options(pstate, stmt->options, - supported_opts, &opts); + supported_opts, &opts, false); publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname); values[Anum_pg_subscription_subpublications - 1] = @@ -1075,7 +1167,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(pstate, stmt->options, - SUBOPT_COPY_DATA, &opts); + SUBOPT_COPY_DATA, &opts, false); /* * The subscription option "two_phase" requires that diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 10da5c5c51..41a1d333f6 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9699,7 +9699,16 @@ AlterSubscriptionStmt: { AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); - n->kind = ALTER_SUBSCRIPTION_OPTIONS; + n->kind = ALTER_SUBSCRIPTION_SET_OPTIONS; + n->subname = $3; + n->options = $5; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name RESET definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_RESET_OPTIONS; n->subname = $3; n->options = $5; $$ = (Node *)n; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7c2ec983bb..e09929206f 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -136,6 +136,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" @@ -277,6 +278,16 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; +/* + * True if we're skipping changes of the specified transaction in + * MySubscription->skip_xid. Please note that we don’t skip receiving the changes + * since we decide whether or not to skip applying the changes when starting to + * apply. When stopping the skipping behavior, we reset the skip XID (subskipxid) + * in the pg_subscription and associate origin status to the transaction that resets + * the skip XID so that we can start streaming from the next transaction. + */ +static bool skipping_changes = false; + /* * Hash table for storing the streaming xid information along with shared file * set for streaming and subxact files. @@ -332,8 +343,7 @@ static void maybe_reread_subscription(void); /* prototype needed because of stream_commit */ static void apply_dispatch(StringInfo s); -static void apply_handle_commit_internal(StringInfo s, - LogicalRepCommitData *commit_data); +static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot); @@ -361,6 +371,9 @@ static void set_apply_error_context_rel(LogicalRepRelMapEntry *rel); static void reset_apply_error_context_rel(void); static void reset_apply_error_context_info(void); +static void maybe_start_skipping_changes(TransactionId xid); +static void stop_skipping_changes(XLogRecPtr origin_lsn, TimestampTz origin_committs); + /* * Should this worker apply changes for given relation. * @@ -858,6 +871,9 @@ apply_handle_begin(StringInfo s) remote_final_lsn = begin_data.final_lsn; + /* Start skipping all changes of this transaction if specified */ + maybe_start_skipping_changes(begin_data.xid); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -882,7 +898,18 @@ apply_handle_commit(StringInfo s) LSN_FORMAT_ARGS(commit_data.commit_lsn), LSN_FORMAT_ARGS(remote_final_lsn)))); - apply_handle_commit_internal(s, &commit_data); + /* + * Stop the skipping transaction if enabled. Otherwise, commit the + * changes that are just applied. + */ + if (skipping_changes) + { + stop_skipping_changes(commit_data.end_lsn, commit_data.committime); + store_flush_position(commit_data.end_lsn); + in_remote_transaction = false; + } + else + apply_handle_commit_internal(&commit_data); /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); @@ -910,6 +937,9 @@ apply_handle_begin_prepare(StringInfo s) remote_final_lsn = begin_data.prepare_lsn; + /* Start skipping all changes of this transaction if specified */ + maybe_start_skipping_changes(begin_data.xid); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -936,47 +966,55 @@ apply_handle_prepare(StringInfo s) LSN_FORMAT_ARGS(remote_final_lsn)))); /* - * Compute unique GID for two_phase transactions. We don't use GID of - * prepared transaction sent by server as that can lead to deadlock when - * we have multiple subscriptions from same node point to publications on - * the same node. See comments atop worker.c + * Prepare transaction if we haven't skipped the changes of this transaction. */ - TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, - gid, sizeof(gid)); + if (skipping_changes) + stop_skipping_changes(prepare_data.end_lsn, prepare_data.prepare_time); + else + { + /* + * Compute unique GID for two_phase transactions. We don't use GID of + * prepared transaction sent by server as that can lead to deadlock when + * we have multiple subscriptions from same node point to publications on + * the same node. See comments atop worker.c + */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, + gid, sizeof(gid)); - /* - * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction. It is done this way because at - * commit prepared time, we won't know whether we have skipped preparing a - * transaction because of no change. - * - * XXX, We can optimize such that at commit prepared time, we first check - * whether we have prepared the transaction or not but that doesn't seem - * worthwhile because such cases shouldn't be common. - */ - begin_replication_step(); + /* + * Unlike commit, here, we always prepare the transaction even though no + * change has happened in this transaction. It is done this way because at + * commit prepared time, we won't know whether we have skipped preparing a + * transaction because of no change. + * + * XXX, We can optimize such that at commit prepared time, we first check + * whether we have prepared the transaction or not but that doesn't seem + * worthwhile because such cases shouldn't be common. + */ + begin_replication_step(); - /* - * BeginTransactionBlock is necessary to balance the EndTransactionBlock - * called within the PrepareTransactionBlock below. - */ - BeginTransactionBlock(); - CommitTransactionCommand(); /* Completes the preceding Begin command. */ + /* + * BeginTransactionBlock is necessary to balance the EndTransactionBlock + * called within the PrepareTransactionBlock below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); /* Completes the preceding Begin command. */ - /* - * Update origin state so we can restart streaming from correct position - * in case of crash. - */ - replorigin_session_origin_lsn = prepare_data.end_lsn; - replorigin_session_origin_timestamp = prepare_data.prepare_time; + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.prepare_time; - PrepareTransactionBlock(gid); - end_replication_step(); - CommitTransactionCommand(); - pgstat_report_stat(false); + PrepareTransactionBlock(gid); + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn); + } + store_flush_position(prepare_data.end_lsn); in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ @@ -1089,9 +1127,10 @@ apply_handle_origin(StringInfo s) { /* * ORIGIN message can only come inside streaming transaction or inside - * remote transaction and before any actual writes. + * remote transaction and before any actual writes unless we're skipping + * changes of the transaction. */ - if (!in_streamed_transaction && + if (!in_streamed_transaction && !skipping_changes && (!in_remote_transaction || (IsTransactionState() && !am_tablesync_worker()))) ereport(ERROR, @@ -1113,6 +1152,9 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("duplicate STREAM START message"))); + /* extract XID of the top-level transaction */ + stream_xid = logicalrep_read_stream_start(s, &first_segment); + /* * Start a transaction on stream start, this transaction will be committed * on the stream stop unless it is a tablesync worker in which case it @@ -1125,9 +1167,6 @@ apply_handle_stream_start(StringInfo s) /* notify handle methods we're processing a remote transaction */ in_streamed_transaction = true; - /* extract XID of the top-level transaction */ - stream_xid = logicalrep_read_stream_start(s, &first_segment); - if (!TransactionIdIsValid(stream_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1209,6 +1248,7 @@ apply_handle_stream_abort(StringInfo s) errmsg_internal("STREAM ABORT message without STREAM STOP"))); logicalrep_read_stream_abort(s, &xid, &subxid); + maybe_start_skipping_changes(xid); /* * If the two XIDs are the same, it's in fact abort of toplevel xact, so @@ -1301,6 +1341,10 @@ apply_handle_stream_abort(StringInfo s) CommitTransactionCommand(); } + /* Stop the skipping transaction if enabled */ + if (skipping_changes) + stop_skipping_changes(InvalidXLogRecPtr, 0); + reset_apply_error_context_info(); } @@ -1311,11 +1355,11 @@ static void apply_handle_stream_commit(StringInfo s) { TransactionId xid; + LogicalRepCommitData commit_data; StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; - LogicalRepCommitData commit_data; StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; @@ -1329,8 +1373,13 @@ apply_handle_stream_commit(StringInfo s) apply_error_callback_arg.remote_xid = xid; apply_error_callback_arg.committs = commit_data.committime; + remote_final_lsn = commit_data.commit_lsn; + elog(DEBUG1, "received commit for streamed transaction %u", xid); + /* Start skipping all changes of this transaction if specified */ + maybe_start_skipping_changes(xid); + /* Make sure we have an open transaction */ begin_replication_step(); @@ -1362,13 +1411,12 @@ apply_handle_stream_commit(StringInfo s) MemoryContextSwitchTo(oldcxt); - remote_final_lsn = commit_data.commit_lsn; - /* * Make sure the handle apply_dispatch methods are aware we're in a remote * transaction. */ in_remote_transaction = true; + pgstat_report_activity(STATE_RUNNING, NULL); end_replication_step(); @@ -1441,7 +1489,17 @@ apply_handle_stream_commit(StringInfo s) elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); - apply_handle_commit_internal(s, &commit_data); + if (skipping_changes) + { + stop_skipping_changes(commit_data.end_lsn, commit_data.committime); + store_flush_position(commit_data.end_lsn); + in_remote_transaction = false; + } + else + { + /* commit the streamed transaction */ + apply_handle_commit_internal(&commit_data); + } /* unlink the files with serialized changes and subxact info */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); @@ -1450,7 +1508,6 @@ apply_handle_stream_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); - reset_apply_error_context_info(); } @@ -1458,7 +1515,7 @@ apply_handle_stream_commit(StringInfo s) * Helper function for apply_handle_commit and apply_handle_stream_commit. */ static void -apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data) +apply_handle_commit_internal(LogicalRepCommitData *commit_data) { if (IsTransactionState()) { @@ -2330,6 +2387,17 @@ apply_dispatch(StringInfo s) LogicalRepMsgType action = pq_getmsgbyte(s); ErrorContextCallback errcallback; + /* + * Skip all data-modification changes if we're skipping changes of this + * transaction. + */ + if (skipping_changes && + (action == LOGICAL_REP_MSG_INSERT || + action == LOGICAL_REP_MSG_UPDATE || + action == LOGICAL_REP_MSG_DELETE || + action == LOGICAL_REP_MSG_TRUNCATE)) + return; + /* * Push apply error context callback. Other fields will be filled * during applying the change. @@ -3789,3 +3857,91 @@ reset_logicalrep_error_context_rel(void) apply_error_callback_arg.relname = NULL; } } + +/* + * Start skipping changes of the transaction if the given XID matches the + * transaction ID specified by skip_xid option. + */ +static void +maybe_start_skipping_changes(TransactionId xid) +{ + Assert(!skipping_changes); + Assert(!in_remote_transaction); + Assert(!in_streamed_transaction); + + if (!TransactionIdIsValid(MySubscription->skipxid) || + MySubscription->skipxid != xid) + return; + + skipping_changes = true; + ereport(LOG, + errmsg("start skipping logical replication transaction with xid %u", + xid)); +} + +/* + * Stop skipping changes and reset the skip XID. + * + * If origin_lsn and origin_committs are valid, we set origin state to the + * transaction commit that resets the skip XID so that we can start streaming + * from the transaction next to the one that we just skipped. + */ +static void +stop_skipping_changes(XLogRecPtr origin_lsn, TimestampTz origin_committs) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + Assert(skipping_changes); + Assert(TransactionIdIsValid(MySubscription->skipxid)); + Assert(in_remote_transaction); + + /* Stop skipping changes */ + skipping_changes = false; + ereport(LOG, + errmsg("done skipping logical replication transaction with xid %u", + MySubscription->skipxid)); + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Update the system catalog to reset the skip XID */ + if (!IsTransactionState()) + StartTransactionCommand(); + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + + /* Fetch the existing tuple. */ + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + TransactionIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + /* Set subskipxid to null */ + nulls[Anum_pg_subscription_subskipxid - 1] = true; + replaces[Anum_pg_subscription_subskipxid - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + heap_freetuple(tup); + table_close(rel, RowExclusiveLock); + + if (!XLogRecPtrIsInvalid(origin_lsn)) + { + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = origin_lsn; + replorigin_session_origin_timestamp = origin_committs; + } + + CommitTransactionCommand(); + pgstat_report_stat(false); +} diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 21061493ea..e5a95a02ec 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -67,6 +67,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW char subtwophasestate; /* Stream two-phase transactions */ + TransactionId subskipxid BKI_FORCE_NULL; /* All changes associated with + this XID are skipped */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -103,6 +106,7 @@ typedef struct Subscription * binary format */ bool stream; /* Allow streaming in-progress transactions. */ char twophasestate; /* Allow streaming two-phase transactions */ + TransactionId skipxid; /* All changes of the XID are skipped */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index def9651b34..2c6d321284 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3658,7 +3658,8 @@ typedef struct CreateSubscriptionStmt typedef enum AlterSubscriptionType { - ALTER_SUBSCRIPTION_OPTIONS, + ALTER_SUBSCRIPTION_SET_OPTIONS, + ALTER_SUBSCRIPTION_RESET_OPTIONS, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION, @@ -3675,6 +3676,7 @@ typedef struct AlterSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + TransactionId skip_xid; /* XID to skip */ } AlterSubscriptionStmt; typedef struct DropSubscriptionStmt diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index ad6b4e4bd3..11c9da4162 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -283,6 +283,30 @@ ERROR: unrecognized subscription parameter: "two_phase" ALTER SUBSCRIPTION regress_testsub SET (streaming = true); ERROR: cannot set streaming = true for two-phase enabled subscription ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +-- it works +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 3); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 4294967295); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid, synchronous_commit, binary, streaming); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid); +-- fail - invalid XID +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 4294967296); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = -1); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 0); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 2); +ERROR: invalid transaction id +-- fail - RESET unsupporting +ALTER SUBSCRIPTION regress_testsub RESET (connect); +ERROR: unrecognized subscription parameter: "connect" +ALTER SUBSCRIPTION regress_testsub RESET (enabled); +ERROR: unrecognized subscription parameter: "enabled" +-- fail - RESET must not include values +ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off); +ERROR: RESET must not include values for parameters \dRs+ List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index b732871407..1db0a6d22f 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -217,6 +217,26 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +-- it works +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 3); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 4294967295); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid, synchronous_commit, binary, streaming); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid); + +-- fail - invalid XID +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 4294967296); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = -1); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 0); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 2); + +-- fail - RESET unsupporting +ALTER SUBSCRIPTION regress_testsub RESET (connect); +ALTER SUBSCRIPTION regress_testsub RESET (enabled); + +-- fail - RESET must not include values +ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off); + \dRs+ DROP SUBSCRIPTION regress_testsub; diff --git a/src/test/subscription/t/023_skip_xact.pl b/src/test/subscription/t/023_skip_xact.pl new file mode 100644 index 0000000000..7b29828cce --- /dev/null +++ b/src/test/subscription/t/023_skip_xact.pl @@ -0,0 +1,185 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test skipping logical replication transactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 8; + +sub test_subscription_error +{ + my ($node, $expected, $source, $relname, $msg) = @_; + + # Wait for the error statistics to be updated. + $node->poll_query_until( + 'postgres', qq[ +SELECT count(1) > 0 FROM pg_stat_subscription_errors +WHERE relid = '$relname'::regclass AND failure_source = '$source'; +]) or die "Timed out while waiting for statistics to be updated"; + + my $result = $node->safe_psql( + 'postgres', + qq[ +SELECT datname, subname, command, relid::regclass, failure_source, failure_count > 0 +FROM pg_stat_subscription_errors +WHERE relid = '$relname'::regclass AND failure_source = '$source'; +]); + is($result, $expected, $msg); +} + +# Create publisher node. +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node. +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); + +# don't overflow the server log with error messages. +$node_subscriber->append_conf('postgresql.conf', + 'wal_retrieve_retry_interval = 5s'); +$node_subscriber->start; + +# Initial table setup on both publisher and subscriber. On subscriber we create +# the same tables but with primary keys. Also, insert some data that will conflict +# with the data replicated from publisher later. +$node_publisher->safe_psql('postgres', + q[ +BEGIN; +CREATE TABLE test_tab1 (a int); +CREATE TABLE test_tab2 (a int); +CREATE TABLE test_tab_streaming (a int, b text); +INSERT INTO test_tab1 VALUES (1); +INSERT INTO test_tab2 VALUES (1); +COMMIT; +]); +$node_subscriber->safe_psql('postgres', + q[ +BEGIN; +CREATE TABLE test_tab1 (a int primary key); +CREATE TABLE test_tab2 (a int primary key); +CREATE TABLE test_tab_streaming (a int primary key, b text); +INSERT INTO test_tab2 VALUES (1); +INSERT INTO test_tab_streaming SELECT 10000, md5(10000::text); +COMMIT; +]); + +# Check if there is no subscription errors before starting logical replication. +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_subscription_errors"); +is($result, qq(0), 'check no subscription error'); + +# Setup logical replication. +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + q[ +CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2; +CREATE PUBLICATION tap_pub_streaming FOR TABLE test_tab_streaming; +]); + +# Start logical replication. The table sync for test_tab2 on tap_sub will enter +# infinite error due to violating the unique constraint. +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (two_phase = on);"); +my $appname_streaming = 'tap_sub_streaming'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub_streaming CONNECTION '$publisher_connstr application_name=$appname_streaming' PUBLICATION tap_pub_streaming WITH (streaming = on);"); + + +$node_publisher->wait_for_catchup($appname); +$node_publisher->wait_for_catchup($appname_streaming); + +# Also wait for initial table sync for test_tab1 and test_tab_streaming to finish. +$node_subscriber->poll_query_until('postgres', + q[ +SELECT count(1) = 2 FROM pg_subscription_rel +WHERE srrelid in ('test_tab1'::regclass, 'test_tab_streaming'::regclass) AND srsubstate = 'r' +]) or die "Timed out while waiting for subscriber to synchronize data"; + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(a) FROM test_tab1"); +is($result, q(1), 'check initial data was copied to subscriber'); + +# Insert more data to test_tab1, raising an error on the subscriber due to violating +# the unique constraint on test_tab1. +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab1 VALUES (1)"); + +# Insert enough rows to test_tab_streaming to exceed the 64kB limit, also raising an +# error on the subscriber for the same reason. +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);"); + +# Check both two errors on tap_sub subscription are reported. +test_subscription_error($node_subscriber, qq(postgres|tap_sub|INSERT|test_tab1|apply|t), + 'apply', 'test_tab1', 'error reporting by the apply worker'); +test_subscription_error($node_subscriber, qq(postgres|tap_sub||test_tab2|tablesync|t), + 'tablesync', 'test_tab2', 'error reporting by the table sync worker'); +test_subscription_error($node_subscriber, qq(postgres|tap_sub_streaming|INSERT|test_tab_streaming|apply|t), + 'apply', 'test_tab_streaming', 'error reporting by the apply worker'); + +# Set XIDs of the transactions in question to the subscriptions to skip. +my $skip_xid1 = $node_subscriber->safe_psql( + 'postgres', + "SELECT xid FROM pg_stat_subscription_errors WHERE relid = 'test_tab1'::regclass"); +my $skip_xid2 = $node_subscriber->safe_psql( + 'postgres', + "SELECT xid FROM pg_stat_subscription_errors WHERE relid = 'test_tab_streaming'::regclass"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET (skip_xid = $skip_xid1)"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_streaming SET (skip_xid = $skip_xid2)"); + +# Restart the subscriber to restart logical replication without interval. +$node_subscriber->restart; + +# Wait for the transaction in question is skipped. +$node_subscriber->poll_query_until( + 'postgres', + q[ +SELECT count(1) = 2 FROM pg_subscription +WHERE subname in ('tap_sub', 'tap_sub_streaming') AND subskipxid IS NULL +]) or die "Timed out while waiting for the transaction to be skipped"; + +# Insert data to test_tab1 that doesn't conflict. +$node_publisher->safe_psql( + 'postgres', + "INSERT INTO test_tab1 VALUES (2)"); + +# Also, insert data to test_tab_streaming that doesn't conflict. +$node_publisher->safe_psql( + 'postgres', + "INSERT INTO test_tab_streaming VALUES (10001, md5(10001::text))"); + +$node_publisher->wait_for_catchup($appname); +$node_publisher->wait_for_catchup($appname_streaming); + +# Check the data is successfully replicated after skipping the transaction. +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM test_tab1"); +is($result, q(1 +2), "subscription gets changes after skipped transaction"); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM test_tab_streaming"); +is($result, q(2), "subscription gets changes after skipped transaction"); + +# Check if the view doesn't show any entries after dropping the subscription. +$node_subscriber->safe_psql( + 'postgres', + q[ +DROP SUBSCRIPTION tap_sub; +DROP SUBSCRIPTION tap_sub_streaming; +]); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_subscription_errors"); +is($result, q(0), 'no error after dropping subscription'); -- 2.24.3 (Apple Git-128)