From 202e813150db4b13ed3b4002a82b235622b7968b Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 28 Jun 2021 13:18:58 +0900 Subject: [PATCH v4 3/3] Add skip_xid option to ALTER SUBSCRIPTION. If an incoming change violates any constraint, logical replication stops until it's resolved. This commit introduces another way to skip the transaction in question. The user can specify XID by ALTER SUBSCRIPTION ... SET (skip_xid = XXX), updating pg_subscription.subskipxid field, telling the apply worker to skip the transaction. The apply worker skips all data modification changes within the specified transaction. After skipping the transaction the apply worker clears subskipxid. It also clears the error statistics of the subscription in pg_stat_subscription_errors system view. To reset skip_xid parameter (and other parameters), this commit also adds a RESET command to ALTER SUBSCRIPTION. --- doc/src/sgml/logical-replication.sgml | 49 +++- doc/src/sgml/ref/alter_subscription.sgml | 46 +++- src/backend/catalog/pg_subscription.c | 10 + src/backend/commands/subscriptioncmds.c | 146 +++++++++-- src/backend/parser/gram.y | 11 +- src/backend/postmaster/pgstat.c | 41 +++- src/backend/replication/logical/worker.c | 273 +++++++++++++++++---- src/include/catalog/pg_subscription.h | 4 + src/include/nodes/parsenodes.h | 4 +- src/include/pgstat.h | 4 +- src/test/regress/expected/subscription.out | 22 ++ src/test/regress/sql/subscription.sql | 19 ++ src/test/subscription/t/023_skip_xact.pl | 185 ++++++++++++++ 13 files changed, 729 insertions(+), 85 deletions(-) create mode 100644 src/test/subscription/t/023_skip_xact.pl diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 88646bc859..992d8b4ac1 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -333,20 +333,63 @@ A conflict will produce an error and will stop the replication; it must be resolved manually by the user. 
Details about the conflict can be found in - the subscriber's server log. + as well as the + subscriber's server log. The resolution can be done either by changing data on the subscriber so that it does not conflict with the incoming change or by skipping the - transaction that conflicts with the existing data. The transaction can be - skipped by calling the + transaction that conflicts with the existing data. When a conflict produces + an error, it is shown in pg_stat_subscription_errors + view as follows: + + + +postgres=# SELECT * FROM pg_stat_subscription_errors; +-[ RECORD 1 ]--------+----------------------------------------------------------- +datname | postgres +subid | 16395 +subname | test_sub +relid | 16385 +command | INSERT +xid | 716 +failure_source | apply +failure_count | 50 +last_failure | 2021-07-21 21:16:02.781779+00 +last_failure_message | duplicate key value violates unique constraint "test_pkey" +stats_reset | + + + + and it is also shown in the subscriber's server log as follows: + + + +ERROR: duplicate key value violates unique constraint "test_pkey" +DETAIL: Key (id)=(1) already exists. +CONTEXT: during apply of "INSERT" for relation "public.test" in transaction with xid 716 committs 2021-07-15 21:54:58.802874+00 + + + + The transaction ID that contains the change violating the constraint can be + found from those outputs (transaction ID 716 in the above case). The transaction + can be skipped by setting skip_xid to the subscription + by ALTER SUBSCRIPTION ... SET. Alternatively, the transaction + can also be skipped by calling the pg_replication_origin_advance() function with a node_name corresponding to the subscription name, and a position. The current position of origins can be seen in the pg_replication_origin_status system view. + + + In either way, those should be used as a last resort. 
They skip the whole + transaction including changes that may not violate any constraint and easily + make subscriber inconsistent if a user specifies the wrong transaction ID or + the position of origin. + diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index a6f994450d..591f554fc7 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -29,6 +29,7 @@ ALTER SUBSCRIPTION name REFRESH PUB ALTER SUBSCRIPTION name ENABLE ALTER SUBSCRIPTION name DISABLE ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] ) +ALTER SUBSCRIPTION name RESET ( subscription_parameter [, ... ] ) ALTER SUBSCRIPTION name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER SUBSCRIPTION name RENAME TO new_name @@ -192,15 +193,46 @@ ALTER SUBSCRIPTION name RENAME TO < SET ( subscription_parameter [= value] [, ... ] ) + RESET ( subscription_parameter [, ... ] ) - This clause alters parameters originally set by - . See there for more - information. The parameters that can be altered - are slot_name, - synchronous_commit, - binary, and - streaming. + This clause sets or resets a subscription option. The parameters that can be + set are the parameters originally set by : + slot_name, synchronous_commit, + binary, streaming, and following + parameter: + + + + + skip_xid (xid) + + + If incoming data violates any constraints the logical replication + will stop until it is resolved. The resolution can be done either + by changing data on the subscriber so that it doesn't conflict with + incoming change or by skipping the whole transaction. This option + specifies transaction ID that logical replication worker skips to + apply. The logical replication worker skips all data modification + changes within the specified transaction. 
Therefore, since it skips + the whole transaction including the changes that may not violate the + constraint, it should only be used as a last resort. This option has + no effect for the transaction that is already prepared with enabling + two_phase on the subscriber. After the logical + replication successfully skips the transaction, the transaction ID + (stored in + pg_subscription.subskipxid) + is cleared. See for + the details of logical replication conflicts. + + + + + + + The parameters that can be reset are: streaming, + binary, synchronous_commit, + and skip_xid. diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d76bdff36a..8ecc55150e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -104,6 +104,16 @@ GetSubscription(Oid subid, bool missing_ok) Assert(!isnull); sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum)); + /* Get skip XID */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subskipxid, + &isnull); + if (!isnull) + sub->skipxid = DatumGetTransactionId(datum); + else + sub->skipxid = InvalidTransactionId; + ReleaseSysCache(tup); return sub; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index da02d3bbfa..0cc965c056 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,6 +60,7 @@ #define SUBOPT_BINARY 0x00000080 #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 +#define SUBOPT_SKIP_XID 0x00000400 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -81,6 +82,7 @@ typedef struct SubOpts bool binary; bool streaming; bool twophase; + TransactionId skip_xid; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -99,7 +101,8 @@ static void ReportSlotConnectionError(HTAB *rstates, Oid subid, char *slotname, */ static 
void parse_subscription_options(ParseState *pstate, List *stmt_options, - bits32 supported_opts, SubOpts *opts) + bits32 supported_opts, SubOpts *opts, + bool is_reset) { ListCell *lc; @@ -128,12 +131,23 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->streaming = false; if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) opts->twophase = false; + if (IsSet(supported_opts, SUBOPT_SKIP_XID)) + opts->skip_xid = InvalidTransactionId; /* Parse options */ foreach(lc, stmt_options) { DefElem *defel = (DefElem *) lfirst(lc); + if (is_reset) + { + if (defel->arg != NULL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("RESET must not include values for parameters"))); + + } + if (IsSet(supported_opts, SUBOPT_CONNECT) && strcmp(defel->defname, "connect") == 0) { @@ -141,7 +155,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_CONNECT; - opts->connect = defGetBoolean(defel); + if (!is_reset) + opts->connect = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_ENABLED) && strcmp(defel->defname, "enabled") == 0) @@ -150,7 +165,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_ENABLED; - opts->enabled = defGetBoolean(defel); + if (!is_reset) + opts->enabled = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) && strcmp(defel->defname, "create_slot") == 0) @@ -159,7 +175,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_CREATE_SLOT; - opts->create_slot = defGetBoolean(defel); + if (!is_reset) + opts->create_slot = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) && strcmp(defel->defname, "slot_name") == 0) @@ -168,7 +185,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, 
errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_SLOT_NAME; - opts->slot_name = defGetString(defel); + if (!is_reset) + opts->slot_name = defGetString(defel); /* Setting slot_name = NONE is treated as no slot name. */ if (strcmp(opts->slot_name, "none") == 0) @@ -183,7 +201,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_COPY_DATA; - opts->copy_data = defGetBoolean(defel); + if (!is_reset) + opts->copy_data = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) && strcmp(defel->defname, "synchronous_commit") == 0) @@ -192,12 +211,18 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT; - opts->synchronous_commit = defGetString(defel); + if (!is_reset) + { + opts->synchronous_commit = defGetString(defel); - /* Test if the given value is valid for synchronous_commit GUC. */ - (void) set_config_option("synchronous_commit", opts->synchronous_commit, - PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, - false, 0, false); + /* + * Test if the given value is valid for synchronous_commit + * GUC. 
+ */ + (void) set_config_option("synchronous_commit", opts->synchronous_commit, + PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, + false, 0, false); + } } else if (IsSet(supported_opts, SUBOPT_REFRESH) && strcmp(defel->defname, "refresh") == 0) @@ -206,7 +231,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_REFRESH; - opts->refresh = defGetBoolean(defel); + if (!is_reset) + opts->refresh = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_BINARY) && strcmp(defel->defname, "binary") == 0) @@ -215,7 +241,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_BINARY; - opts->binary = defGetBoolean(defel); + if (!is_reset) + opts->binary = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_STREAMING) && strcmp(defel->defname, "streaming") == 0) @@ -224,7 +251,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_STREAMING; - opts->streaming = defGetBoolean(defel); + if (!is_reset) + opts->streaming = defGetBoolean(defel); } else if (strcmp(defel->defname, "two_phase") == 0) { @@ -245,7 +273,31 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT; - opts->twophase = defGetBoolean(defel); + if (!is_reset) + opts->twophase = defGetBoolean(defel); + } + else if (IsSet(supported_opts, SUBOPT_SKIP_XID) && strcmp(defel->defname, "skip_xid") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_SKIP_XID)) + errorConflictingDefElem(defel, pstate); + + if (!is_reset) + { + char *xid_str = defGetString(defel); + TransactionId xid; + + /* Parse the argument as TransactionId */ + xid = DatumGetTransactionId(DirectFunctionCall1(xidin, + CStringGetDatum(xid_str))); + + if (!TransactionIdIsNormal(xid)) + ereport(ERROR, 
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid transaction id"))); + opts->skip_xid = xid; + } + + opts->specified_opts |= SUBOPT_SKIP_XID; } else ereport(ERROR, @@ -416,7 +468,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT); - parse_subscription_options(pstate, stmt->options, supported_opts, &opts); + parse_subscription_options(pstate, stmt->options, supported_opts, &opts, + false); /* * Since creating a replication slot is not transactional, rolling back @@ -489,6 +542,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); + nulls[Anum_pg_subscription_subskipxid - 1] = true; values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -885,14 +939,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, switch (stmt->kind) { - case ALTER_SUBSCRIPTION_OPTIONS: + case ALTER_SUBSCRIPTION_SET_OPTIONS: { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING); + SUBOPT_STREAMING | SUBOPT_SKIP_XID); parse_subscription_options(pstate, stmt->options, - supported_opts, &opts); + supported_opts, &opts, false); if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) { @@ -944,14 +998,60 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_substream - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_SKIP_XID)) + { + values[Anum_pg_subscription_subskipxid - 1] = + TransactionIdGetDatum(opts.skip_xid); + replaces[Anum_pg_subscription_subskipxid - 1] = true; + } + update_tuple = true; break; } + case ALTER_SUBSCRIPTION_RESET_OPTIONS: + { + supported_opts = (SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_STREAMING | + SUBOPT_BINARY | SUBOPT_SKIP_XID); + + 
parse_subscription_options(pstate, stmt->options, + supported_opts, &opts, true); + + if (IsSet(opts.specified_opts, SUBOPT_SYNCHRONOUS_COMMIT)) + { + values[Anum_pg_subscription_subsynccommit - 1] = + CStringGetTextDatum("off"); + replaces[Anum_pg_subscription_subsynccommit - 1] = true; + } + + if (IsSet(opts.specified_opts, SUBOPT_STREAMING)) + { + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(false); + replaces[Anum_pg_subscription_substream - 1] = true; + } + + if (IsSet(opts.specified_opts, SUBOPT_BINARY)) + { + values[Anum_pg_subscription_subbinary - 1] = + BoolGetDatum(false); + replaces[Anum_pg_subscription_subbinary - 1] = true; + } + + if (IsSet(opts.specified_opts, SUBOPT_SKIP_XID)) + { + nulls[Anum_pg_subscription_subskipxid - 1] = + replaces[Anum_pg_subscription_subskipxid - 1] = true; + } + + update_tuple = true; + break; + } case ALTER_SUBSCRIPTION_ENABLED: { parse_subscription_options(pstate, stmt->options, - SUBOPT_ENABLED, &opts); + SUBOPT_ENABLED, &opts, false); + Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED)); if (!sub->slotname && opts.enabled) @@ -986,7 +1086,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH; parse_subscription_options(pstate, stmt->options, - supported_opts, &opts); + supported_opts, &opts, false); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -1036,7 +1136,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, supported_opts |= SUBOPT_COPY_DATA; parse_subscription_options(pstate, stmt->options, - supported_opts, &opts); + supported_opts, &opts, false); publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname); values[Anum_pg_subscription_subpublications - 1] = @@ -1084,7 +1184,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("ALTER SUBSCRIPTION ... 
REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(pstate, stmt->options, - SUBOPT_COPY_DATA, &opts); + SUBOPT_COPY_DATA, &opts, false); /* * The subscription option "two_phase" requires that diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 39a2849eba..bcf85e8980 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9707,7 +9707,16 @@ AlterSubscriptionStmt: { AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); - n->kind = ALTER_SUBSCRIPTION_OPTIONS; + n->kind = ALTER_SUBSCRIPTION_SET_OPTIONS; + n->subname = $3; + n->options = $5; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name RESET definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_RESET_OPTIONS; n->subname = $3; n->options = $5; $$ = (Node *)n; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 4a35e6640f..d3f0a2ea2f 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -1699,6 +1699,27 @@ pgstat_reset_subscription_error(Oid subid, Oid subrelid) msg.m_subid = subid; msg.m_subrelid = subrelid; msg.m_reset = true; + msg.m_clear = false; + + pgstat_send(&msg, offsetof(PgStat_MsgSubscriptionErr, m_reset) + sizeof(bool)); +} + +/* ---------- + * pgstat_clear_subscription_error() - + * + * Tell the collector to clear the error of a subscription. 
+ * ---------- + */ +void +pgstat_clear_subscription_error(Oid subid, Oid subrelid) +{ + PgStat_MsgSubscriptionErr msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERR); + msg.m_subid = subid; + msg.m_subrelid = subrelid; + msg.m_reset = false; + msg.m_clear = true; pgstat_send(&msg, offsetof(PgStat_MsgSubscriptionErr, m_reset) + sizeof(bool)); } @@ -2046,6 +2067,7 @@ pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid, msg.m_databaseid = MyDatabaseId; msg.m_relid = relid; msg.m_reset = false; + msg.m_clear = false; msg.m_command = command; msg.m_xid = xid; msg.m_last_failure = GetCurrentTimestamp(); @@ -6078,26 +6100,37 @@ static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len) { PgStat_StatSubRelErrEntry *relerrent; - bool create = !msg->m_reset; + bool create = !(msg->m_reset || msg->m_clear); /* Get subscription error */ relerrent = pgstat_get_subscription_rel_error_entry(msg->m_subid, msg->m_subrelid, create); - if (msg->m_reset) + if (msg->m_reset || msg->m_clear) { + Assert(!(msg->m_reset && msg->m_clear)); + if (!relerrent) return; - /* reset fields and set reset timestamp */ + /* reset fields */ relerrent->relid = InvalidOid; relerrent->command = 0; relerrent->xid = InvalidTransactionId; relerrent->failure_count = 0; relerrent->last_failure = 0; relerrent->errmsg[0] = '\0'; - relerrent->stat_reset_timestamp = GetCurrentTimestamp(); + + /* + * If the reset is requested, reset more fields and set the reset + * timestamp. 
+ */ + if (msg->m_reset) + { + relerrent->failure_count = 0; + relerrent->stat_reset_timestamp = GetCurrentTimestamp(); + } } else { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4f9c4e9014..56d78ba905 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -136,6 +136,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" @@ -277,6 +278,21 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; +/* + * skipping_changes is true if we're skipping all data modification changes of the + * specified transaction in MySubscription->skipxid copied to skipping_xid. Please + * note that we don’t skip receiving the changes particularly in streaming cases, + * since we decide whether or not to skip applying the changes when starting to apply. + * Once starting skipping changes, we copy XID to skipping_xid and then don't stop + * skipping until we skip the whole transaction even if the subscription is invalidated + * and MySubscription->skipxid gets changed or reset. When stopping the skipping + * behavior, we reset the skip XID (subskipxid) in the pg_subscription and associate + * origin status to the transaction that resets the skip XID so that we can start + * streaming from the next transaction. + */ +static bool skipping_changes = false; +static TransactionId skipping_xid = InvalidTransactionId; + /* * Hash table for storing the streaming xid information along with shared file * set for streaming and subxact files. 
@@ -360,6 +376,9 @@ static void set_apply_error_context_rel(LogicalRepRelMapEntry *rel); static void reset_apply_error_context_rel(void); static void reset_apply_error_context_info(void); +static void maybe_start_skipping_changes(TransactionId xid); +static void stop_skipping_changes(XLogRecPtr origin_lsn, TimestampTz origin_committs); + /* * Should this worker apply changes for given relation. * @@ -857,6 +876,11 @@ apply_handle_begin(StringInfo s) remote_final_lsn = begin_data.final_lsn; + /* + * Start skipping all changes of this transaction if specified + */ + maybe_start_skipping_changes(begin_data.xid); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -881,7 +905,18 @@ apply_handle_commit(StringInfo s) LSN_FORMAT_ARGS(commit_data.commit_lsn), LSN_FORMAT_ARGS(remote_final_lsn)))); - apply_handle_commit_internal(&commit_data); + /* + * Stop the skipping transaction if enabled. Otherwise, commit the changes + * that are just applied. + */ + if (skipping_changes) + { + stop_skipping_changes(commit_data.end_lsn, commit_data.committime); + store_flush_position(commit_data.end_lsn); + in_remote_transaction = false; + } + else + apply_handle_commit_internal(&commit_data); /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); @@ -910,6 +945,9 @@ apply_handle_begin_prepare(StringInfo s) remote_final_lsn = begin_data.prepare_lsn; + /* Start skipping all changes of this transaction if specified */ + maybe_start_skipping_changes(begin_data.xid); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -934,47 +972,57 @@ apply_handle_prepare(StringInfo s) LSN_FORMAT_ARGS(remote_final_lsn)))); /* - * Compute unique GID for two_phase transactions. We don't use GID of - * prepared transaction sent by server as that can lead to deadlock when - * we have multiple subscriptions from same node point to publications on - * the same node. 
See comments atop worker.c + * Prepare transaction if we haven't skipped the changes of this + * transaction. */ - TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, - gid, sizeof(gid)); + if (skipping_changes) + stop_skipping_changes(prepare_data.end_lsn, prepare_data.prepare_time); + else + { + /* + * Compute unique GID for two_phase transactions. We don't use GID of + * prepared transaction sent by server as that can lead to deadlock + * when we have multiple subscriptions from same node point to + * publications on the same node. See comments atop worker.c + */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, + gid, sizeof(gid)); - /* - * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction. It is done this way because at - * commit prepared time, we won't know whether we have skipped preparing a - * transaction because of no change. - * - * XXX, We can optimize such that at commit prepared time, we first check - * whether we have prepared the transaction or not but that doesn't seem - * worthwhile because such cases shouldn't be common. - */ - begin_replication_step(); + /* + * Unlike commit, here, we always prepare the transaction even though + * no change has happened in this transaction. It is done this way + * because at commit prepared time, we won't know whether we have + * skipped preparing a transaction because of no change. + * + * XXX, We can optimize such that at commit prepared time, we first + * check whether we have prepared the transaction or not but that + * doesn't seem worthwhile because such cases shouldn't be common. + */ + begin_replication_step(); - /* - * BeginTransactionBlock is necessary to balance the EndTransactionBlock - * called within the PrepareTransactionBlock below. - */ - BeginTransactionBlock(); - CommitTransactionCommand(); /* Completes the preceding Begin command. 
*/ + /* + * BeginTransactionBlock is necessary to balance the + * EndTransactionBlock called within the PrepareTransactionBlock + * below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); /* Completes the preceding Begin command. */ - /* - * Update origin state so we can restart streaming from correct position - * in case of crash. - */ - replorigin_session_origin_lsn = prepare_data.end_lsn; - replorigin_session_origin_timestamp = prepare_data.prepare_time; + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.prepare_time; - PrepareTransactionBlock(gid); - end_replication_step(); - CommitTransactionCommand(); - pgstat_report_stat(false); + PrepareTransactionBlock(gid); + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn); + } + store_flush_position(prepare_data.end_lsn); in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ @@ -1087,9 +1135,10 @@ apply_handle_origin(StringInfo s) { /* * ORIGIN message can only come inside streaming transaction or inside - * remote transaction and before any actual writes. + * remote transaction and before any actual writes unless we're skipping + * changes of the transaction. 
*/ - if (!in_streamed_transaction && + if (!in_streamed_transaction && !skipping_changes && (!in_remote_transaction || (IsTransactionState() && !am_tablesync_worker()))) ereport(ERROR, @@ -1111,6 +1160,9 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("duplicate STREAM START message"))); + /* extract XID of the top-level transaction */ + stream_xid = logicalrep_read_stream_start(s, &first_segment); + /* * Start a transaction on stream start, this transaction will be committed * on the stream stop unless it is a tablesync worker in which case it @@ -1123,9 +1175,6 @@ apply_handle_stream_start(StringInfo s) /* notify handle methods we're processing a remote transaction */ in_streamed_transaction = true; - /* extract XID of the top-level transaction */ - stream_xid = logicalrep_read_stream_start(s, &first_segment); - if (!TransactionIdIsValid(stream_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1207,6 +1256,7 @@ apply_handle_stream_abort(StringInfo s) errmsg_internal("STREAM ABORT message without STREAM STOP"))); logicalrep_read_stream_abort(s, &xid, &subxid); + maybe_start_skipping_changes(xid); /* * If the two XIDs are the same, it's in fact abort of toplevel xact, so @@ -1299,6 +1349,10 @@ apply_handle_stream_abort(StringInfo s) CommitTransactionCommand(); } + /* Stop the skipping transaction if enabled */ + if (skipping_changes) + stop_skipping_changes(InvalidXLogRecPtr, 0); + reset_apply_error_context_info(); } @@ -1309,11 +1363,11 @@ static void apply_handle_stream_commit(StringInfo s) { TransactionId xid; + LogicalRepCommitData commit_data; StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; - LogicalRepCommitData commit_data; StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; @@ -1327,8 +1381,13 @@ apply_handle_stream_commit(StringInfo s) apply_error_callback_arg.remote_xid = xid; apply_error_callback_arg.committs = commit_data.committime; + remote_final_lsn = 
commit_data.commit_lsn; + elog(DEBUG1, "received commit for streamed transaction %u", xid); + /* Start skipping all changes of this transaction if specified */ + maybe_start_skipping_changes(xid); + /* Make sure we have an open transaction */ begin_replication_step(); @@ -1360,13 +1419,12 @@ apply_handle_stream_commit(StringInfo s) MemoryContextSwitchTo(oldcxt); - remote_final_lsn = commit_data.commit_lsn; - /* * Make sure the handle apply_dispatch methods are aware we're in a remote * transaction. */ in_remote_transaction = true; + pgstat_report_activity(STATE_RUNNING, NULL); end_replication_step(); @@ -1439,7 +1497,17 @@ apply_handle_stream_commit(StringInfo s) elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); - apply_handle_commit_internal(&commit_data); + if (skipping_changes) + { + stop_skipping_changes(commit_data.end_lsn, commit_data.committime); + store_flush_position(commit_data.end_lsn); + in_remote_transaction = false; + } + else + { + /* commit the streamed transaction */ + apply_handle_commit_internal(&commit_data); + } /* unlink the files with serialized changes and subxact info */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); @@ -1448,7 +1516,6 @@ apply_handle_stream_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); - reset_apply_error_context_info(); } @@ -2328,6 +2395,17 @@ apply_dispatch(StringInfo s) LogicalRepMsgType action = pq_getmsgbyte(s); ErrorContextCallback errcallback; + /* + * Skip all data-modification changes if we're skipping changes of this + * transaction. + */ + if (skipping_changes && + (action == LOGICAL_REP_MSG_INSERT || + action == LOGICAL_REP_MSG_UPDATE || + action == LOGICAL_REP_MSG_DELETE || + action == LOGICAL_REP_MSG_TRUNCATE)) + return; + /* * Push apply error context callback. Other fields will be filled during * applying the change. 
@@ -3788,3 +3866,108 @@ reset_logicalrep_error_context_rel(void) apply_error_callback_arg.relname = NULL; } } + +/* + * Start skipping changes of the transaction if the given XID matches the + * transaction ID specified by the skip_xid option. + */ +static void +maybe_start_skipping_changes(TransactionId xid) +{ + Assert(!skipping_changes); + Assert(!TransactionIdIsValid(skipping_xid)); + Assert(!in_remote_transaction); + Assert(!in_streamed_transaction); + + if (!TransactionIdIsValid(MySubscription->skipxid) || + MySubscription->skipxid != xid) + return; + + skipping_changes = true; + skipping_xid = xid; + ereport(LOG, + errmsg("start skipping logical replication transaction with xid %u", + skipping_xid)); +} + +/* + * Stop skipping changes and reset the skip XID. + * + * If origin_lsn and origin_committs are valid, we set origin state to the + * transaction commit that resets the skip XID so that we can start streaming + * from the transaction next to the one that we just skipped. + */ +static void +stop_skipping_changes(XLogRecPtr origin_lsn, TimestampTz origin_committs) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + Assert(skipping_changes); + Assert(TransactionIdIsValid(skipping_xid)); + Assert(in_remote_transaction); + + ereport(LOG, + (errmsg("done skipping logical replication transaction with xid %u", + skipping_xid))); + + /* Stop skipping changes */ + skipping_changes = false; + skipping_xid = InvalidTransactionId; + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + if (!IsTransactionState()) + StartTransactionCommand(); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + + /* Fetch a modifiable copy of the subscription's tuple, keyed by OID. 
*/ + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + /* Set subskipxid to null */ + nulls[Anum_pg_subscription_subskipxid - 1] = true; + replaces[Anum_pg_subscription_subskipxid - 1] = true; + + /* Update the system catalog to reset the skip XID */ + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + heap_freetuple(tup); + table_close(rel, RowExclusiveLock); + + if (!XLogRecPtrIsInvalid(origin_lsn)) + { + /* + * Update origin state so we can restart streaming from the correct + * position in case of crash. + */ + replorigin_session_origin_lsn = origin_lsn; + replorigin_session_origin_timestamp = origin_committs; + } + + CommitTransactionCommand(); + pgstat_report_stat(false); + + /* + * Clear the error statistics of this subscription to let users know the + * subscription is no longer getting stuck by the conflict. + * + * The message for clearing the error statistics can be lost but that's + * okay. The user can know the logical replication is working fine in + * other ways, for example, checking pg_stat_subscription view. And the + * user is able to reset the single subscription error statistics by + * pg_reset_subscription_error SQL function. 
+ */ + pgstat_clear_subscription_error(MySubscription->oid, InvalidOid); +} diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 21061493ea..beaa6e646d 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -67,6 +67,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW char subtwophasestate; /* Stream two-phase transactions */ + TransactionId subskipxid BKI_FORCE_NULL; /* All changes associated with + * this XID are skipped */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -103,6 +106,7 @@ typedef struct Subscription * binary format */ bool stream; /* Allow streaming in-progress transactions. */ char twophasestate; /* Allow streaming two-phase transactions */ + TransactionId skipxid; /* All changes of the XID are skipped */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index e28248af32..af5c16abfa 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3659,7 +3659,8 @@ typedef struct CreateSubscriptionStmt typedef enum AlterSubscriptionType { - ALTER_SUBSCRIPTION_OPTIONS, + ALTER_SUBSCRIPTION_SET_OPTIONS, + ALTER_SUBSCRIPTION_RESET_OPTIONS, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION, @@ -3676,6 +3677,7 @@ typedef struct AlterSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + TransactionId skip_xid; /* XID to skip */ } AlterSubscriptionStmt; typedef struct DropSubscriptionStmt diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 
1104886bef..4a1185a4f6 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -563,9 +563,10 @@ typedef struct PgStat_MsgSubscriptionErr Oid m_subid; Oid m_subrelid; - /* The clear messages use below field */ + /* The clear messages use below fields */ bool m_reset; /* clear all fields and set reset_stats * timestamp */ + bool m_clear; /* clear all fields except for total_failure */ /* The error report message uses below fields */ Oid m_databaseid; @@ -1101,6 +1102,7 @@ extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type t extern void pgstat_reset_slru_counter(const char *); extern void pgstat_reset_replslot_counter(const char *name); extern void pgstat_reset_subscription_error(Oid subid, Oid subrelid); +extern void pgstat_clear_subscription_error(Oid subid, Oid subrelid); extern void pgstat_report_autovac(Oid dboid); extern void pgstat_report_vacuum(Oid tableoid, bool shared, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 67f92b3878..e2ec685f78 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -286,6 +286,28 @@ ERROR: unrecognized subscription parameter: "two_phase" ALTER SUBSCRIPTION regress_testsub SET (streaming = true); ERROR: cannot set streaming = true for two-phase enabled subscription ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +-- it works +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 3); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = '4294967295'); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid, synchronous_commit, binary, streaming); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid); +-- fail - invalid XID +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1.1); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 0); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1); +ERROR: invalid transaction id 
+ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 2); +ERROR: invalid transaction id +-- fail - unsupported parameters +ALTER SUBSCRIPTION regress_testsub RESET (connect); +ERROR: unrecognized subscription parameter: "connect" +ALTER SUBSCRIPTION regress_testsub RESET (enabled); +ERROR: unrecognized subscription parameter: "enabled" +-- fail - RESET must not include values +ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off); +ERROR: RESET must not include values for parameters \dRs+ List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 88743ab33b..2412b28422 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -220,6 +220,25 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +-- it works +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 3); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = '4294967295'); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid, synchronous_commit, binary, streaming); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid); + +-- fail - invalid XID +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1.1); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 0); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 2); + +-- fail - unsupported parameters +ALTER SUBSCRIPTION regress_testsub RESET (connect); +ALTER SUBSCRIPTION regress_testsub RESET (enabled); + +-- fail - RESET must not include values +ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off); + \dRs+ DROP SUBSCRIPTION regress_testsub; diff --git a/src/test/subscription/t/023_skip_xact.pl b/src/test/subscription/t/023_skip_xact.pl new file mode 100644 index 0000000000..7b29828cce --- /dev/null +++ 
b/src/test/subscription/t/023_skip_xact.pl @@ -0,0 +1,185 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test skipping logical replication transactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 8; + +sub test_subscription_error +{ + my ($node, $expected, $source, $relname, $msg) = @_; + + # Wait until the view reports an error for this relation and failure source. + $node->poll_query_until( + 'postgres', qq[ +SELECT count(1) > 0 FROM pg_stat_subscription_errors +WHERE relid = '$relname'::regclass AND failure_source = '$source'; +]) or die "Timed out while waiting for statistics to be updated"; + + my $result = $node->safe_psql( + 'postgres', + qq[ +SELECT datname, subname, command, relid::regclass, failure_source, failure_count > 0 +FROM pg_stat_subscription_errors +WHERE relid = '$relname'::regclass AND failure_source = '$source'; +]); + is($result, $expected, $msg); +} + +# Create publisher node. +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node. +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); + +# Don't overflow the server log with error messages. +$node_subscriber->append_conf('postgresql.conf', + 'wal_retrieve_retry_interval = 5s'); +$node_subscriber->start; + +# Initial table setup on both publisher and subscriber. On the subscriber we +# create the same tables but with primary keys. Also, insert some data that will +# conflict with the data replicated from the publisher later. 
+$node_publisher->safe_psql('postgres', + q[ +BEGIN; +CREATE TABLE test_tab1 (a int); +CREATE TABLE test_tab2 (a int); +CREATE TABLE test_tab_streaming (a int, b text); +INSERT INTO test_tab1 VALUES (1); +INSERT INTO test_tab2 VALUES (1); +COMMIT; +]); +$node_subscriber->safe_psql('postgres', + q[ +BEGIN; +CREATE TABLE test_tab1 (a int primary key); +CREATE TABLE test_tab2 (a int primary key); +CREATE TABLE test_tab_streaming (a int primary key, b text); +INSERT INTO test_tab2 VALUES (1); +INSERT INTO test_tab_streaming SELECT 10000, md5(10000::text); +COMMIT; +]); + +# Check that there are no subscription errors before starting logical replication. +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_subscription_errors"); +is($result, qq(0), 'check no subscription error'); + +# Set up logical replication. +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + q[ +CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2; +CREATE PUBLICATION tap_pub_streaming FOR TABLE test_tab_streaming; +]); + +# Start logical replication. The table sync for test_tab2 on tap_sub will keep +# failing because it violates the unique constraint. 
+my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (two_phase = on);"); +my $appname_streaming = 'tap_sub_streaming'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub_streaming CONNECTION '$publisher_connstr application_name=$appname_streaming' PUBLICATION tap_pub_streaming WITH (streaming = on);"); + + +$node_publisher->wait_for_catchup($appname); +$node_publisher->wait_for_catchup($appname_streaming); + +# Also wait for initial table sync for test_tab1 and test_tab_streaming to finish. 
+$node_subscriber->poll_query_until('postgres', + q[ +SELECT count(1) = 2 FROM pg_subscription_rel +WHERE srrelid in ('test_tab1'::regclass, 'test_tab_streaming'::regclass) AND srsubstate = 'r' +]) or die "Timed out while waiting for subscriber to synchronize data"; + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(a) FROM test_tab1"); +is($result, q(1), 'check initial data was copied to subscriber'); + +# Insert more data into test_tab1, raising an error on the subscriber due to violating +# the unique constraint on test_tab1. +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab1 VALUES (1)"); + +# Insert enough rows into test_tab_streaming to exceed the 64kB +# logical_decoding_work_mem limit, also raising an error on the subscriber for the same reason. +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);"); + +# Check that the errors on both subscriptions are reported. +test_subscription_error($node_subscriber, qq(postgres|tap_sub|INSERT|test_tab1|apply|t), + 'apply', 'test_tab1', 'error reporting by the apply worker'); +test_subscription_error($node_subscriber, qq(postgres|tap_sub||test_tab2|tablesync|t), + 'tablesync', 'test_tab2', 'error reporting by the table sync worker'); +test_subscription_error($node_subscriber, qq(postgres|tap_sub_streaming|INSERT|test_tab_streaming|apply|t), + 'apply', 'test_tab_streaming', 'error reporting by the apply worker'); + +# Set skip_xid on each subscription to the XID of its failing transaction. 
+my $skip_xid1 = $node_subscriber->safe_psql( + 'postgres', + "SELECT xid FROM pg_stat_subscription_errors WHERE relid = 'test_tab1'::regclass"); +my $skip_xid2 = $node_subscriber->safe_psql( + 'postgres', + "SELECT xid FROM pg_stat_subscription_errors WHERE relid = 'test_tab_streaming'::regclass"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET (skip_xid = $skip_xid1)"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_streaming SET (skip_xid = $skip_xid2)"); + +# Restart the subscriber to restart logical replication without waiting for the retry interval. +$node_subscriber->restart; + +# Wait for the transactions in question to be skipped. +$node_subscriber->poll_query_until( + 'postgres', + q[ +SELECT count(1) = 2 FROM pg_subscription +WHERE subname in ('tap_sub', 'tap_sub_streaming') AND subskipxid IS NULL +]) or die "Timed out while waiting for the transaction to be skipped"; + +# Insert data into test_tab1 that doesn't conflict. +$node_publisher->safe_psql( + 'postgres', + "INSERT INTO test_tab1 VALUES (2)"); + +# Also, insert data into test_tab_streaming that doesn't conflict. +$node_publisher->safe_psql( + 'postgres', + "INSERT INTO test_tab_streaming VALUES (10001, md5(10001::text))"); + +$node_publisher->wait_for_catchup($appname); +$node_publisher->wait_for_catchup($appname_streaming); + +# Check that the data is successfully replicated after skipping the transactions. +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM test_tab1"); +is($result, q(1 +2), "subscription gets changes after skipped transaction"); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM test_tab_streaming"); +is($result, q(2), "subscription gets changes after skipped transaction"); + +# Check that the view shows no entries after dropping the subscriptions. 
+$node_subscriber->safe_psql( + 'postgres', + q[ +DROP SUBSCRIPTION tap_sub; +DROP SUBSCRIPTION tap_sub_streaming; +]); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_subscription_errors"); +is($result, q(0), 'no error after dropping subscription'); -- 2.24.3 (Apple Git-128)