From 24126e3765fec4f9706185c6b7e35b1da14ae653 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 2 Aug 2021 14:27:40 +0900 Subject: [PATCH v11 4/5] Add skip_xid option to ALTER SUBSCRIPTION. If incoming change violates any constraint, logical replication stops until it's resolved. This commit introduces another way to skip the transaction in question. The user can specify XID by ALTER SUBSCRIPTION ... SET (skip_xid = XXX), updating pg_subscription.subskipxid field, telling the apply worker to skip the transaction. The apply worker skips all data modification changes within the specified transaction. After skipping the transaciton the apply worker clears pg_subscription.subskipxid. Also, it clears the error statistics of the subscription in pg_stat_subscription_errors system view as well in order the user not to get confused. It's done by sending the message for clearing a subscription error to the stats collector. --- doc/src/sgml/logical-replication.sgml | 49 ++++- doc/src/sgml/ref/alter_subscription.sgml | 32 ++- src/backend/catalog/pg_subscription.c | 10 + src/backend/commands/subscriptioncmds.c | 45 +++- src/backend/postmaster/pgstat.c | 44 +++- src/backend/replication/logical/worker.c | 201 ++++++++++++++++- src/include/catalog/pg_subscription.h | 4 + src/include/nodes/parsenodes.h | 1 + src/include/pgstat.h | 7 +- src/test/regress/expected/subscription.out | 13 ++ src/test/regress/sql/subscription.sql | 11 + src/test/subscription/t/024_skip_xact.pl | 244 +++++++++++++++++++++ 12 files changed, 636 insertions(+), 25 deletions(-) create mode 100644 src/test/subscription/t/024_skip_xact.pl diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 88646bc859..d558dcfe81 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -333,20 +333,63 @@ A conflict will produce an error and will stop the replication; it must be resolved manually by the user. Details about the conflict can be found in - the subscriber's server log. + as well as the + subscriber's server log. The resolution can be done either by changing data on the subscriber so that it does not conflict with the incoming change or by skipping the - transaction that conflicts with the existing data. The transaction can be - skipped by calling the + transaction that conflicts with the existing data. When a conflict produce + an error, it is shown in pg_stat_subscription_errors + view as follows: + + + +postgres=# SELECT * FROM pg_stat_subscription_errors; +-[ RECORD 1 ]--------+----------------------------------------------------------- +datname | postgres +subid | 16395 +subname | test_sub +relid | 16385 +command | INSERT +xid | 716 +failure_source | apply +failure_count | 50 +last_failure | 2021-07-21 21:16:02.781779+00 +last_failure_message | duplicate key value violates unique constraint "test_pkey" +stats_reset | + + + + and it is also shown in subscriber's server log as follows: + + + +ERROR: duplicate key value violates unique constraint "test_pkey" +DETAIL: Key (id)=(1) already exists. +CONTEXT: processing remote data during "INSERT" for replication target relation "public.test" in transaction 716 with commit timestamp 2021-07-15 21:54:58.802874+00 + + + + The transaction ID that contains the change violating the constraint can be + found from those outputs (transaction ID 740 in the above case). The transaction + can be skipped by setting skip_xid to the subscription + by ALTER SUBSCRIPTION ... SET. Alternatively, the transaction + can also be skipped by calling the pg_replication_origin_advance() function with a node_name corresponding to the subscription name, and a position. The current position of origins can be seen in the pg_replication_origin_status system view. + + + In either way, those should be used as a last resort. They skip the whole + transaction including changes that may not violate any constraint and easily + make subscriber inconsistent if a user specifies the wrong transaction ID or + the position of origin. + diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 8c3c28b7e7..cfb318e08c 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -205,8 +205,36 @@ ALTER SUBSCRIPTION name RENAME TO < streaming. - The parameters that can be reset are: streaming, - binary, synchronous_commit. + The parameters that can be reset are: slot_name, + synchronous_commit, binary, + streaming, and following parameter: + + + + + skip_xid (xid) + + + If incoming data violates any constraints the logical replication + will stop until it is resolved. The resolution can be done either + by changing data on the subscriber so that it doesn't conflict with + incoming change or by skipping the whole transaction. This option + specifies transaction ID that logical replication worker skips to + apply. The logical replication worker skips all data modification + changes within the specified transaction. Therefore, since it skips + the whole transaction including the changes that may not violate the + constraint, it should only be used as a last resort. This option has + no effect for the transaction that is already prepared with enabling + two_phase on susbscriber. After the logical + replication successfully skips the transaction, the transaction ID + (stored in + pg_subscription.subskipxid) + is cleared. See for + the details of logical replication conflicts. + + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 25021e25a4..cb22cd7463 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -104,6 +104,16 @@ GetSubscription(Oid subid, bool missing_ok) Assert(!isnull); sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum)); + /* Get skip XID */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subskipxid, + &isnull); + if (!isnull) + sub->skipxid = DatumGetTransactionId(datum); + else + sub->skipxid = InvalidTransactionId; + ReleaseSysCache(tup); return sub; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index cc390ce95a..188f3e42fd 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,6 +60,7 @@ #define SUBOPT_BINARY 0x00000080 #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 +#define SUBOPT_SKIP_XID 0x00000400 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -81,6 +82,7 @@ typedef struct SubOpts bool binary; bool streaming; bool twophase; + TransactionId skip_xid; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -129,6 +131,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->streaming = false; if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) opts->twophase = false; + if (IsSet(supported_opts, SUBOPT_SKIP_XID)) + opts->skip_xid = InvalidTransactionId; /* Parse options */ foreach(lc, stmt_options) @@ -261,6 +265,29 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT; opts->twophase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "skip_xid") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_SKIP_XID)) + errorConflictingDefElem(defel, pstate); + + if (!is_reset) + { + char *xid_str = defGetString(defel); + TransactionId xid; + + /* Parse the argument as TransactionId */ + xid = DatumGetTransactionId(DirectFunctionCall1(xidin, + CStringGetDatum(xid_str))); + + if (!TransactionIdIsNormal(xid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid transaction id"))); + opts->skip_xid = xid; + } + + opts->specified_opts |= SUBOPT_SKIP_XID; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -485,6 +512,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); + nulls[Anum_pg_subscription_subskipxid - 1] = true; values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -885,7 +913,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING); + SUBOPT_STREAMING | SUBOPT_SKIP_XID); parse_subscription_options(pstate, stmt->options, supported_opts, &opts, false); @@ -934,6 +962,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_substream - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_SKIP_XID)) + { + values[Anum_pg_subscription_subskipxid - 1] = + TransactionIdGetDatum(opts.skip_xid); + replaces[Anum_pg_subscription_subskipxid - 1] = true; + } + update_tuple = true; break; } @@ -941,7 +976,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, case ALTER_SUBSCRIPTION_RESET_OPTIONS: { supported_opts = (SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING); + SUBOPT_STREAMING | SUBOPT_SKIP_XID); parse_subscription_options(pstate, stmt->options, supported_opts, &opts, true); @@ -967,6 +1002,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_substream - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_SKIP_XID)) + { + nulls[Anum_pg_subscription_subskipxid - 1] = + replaces[Anum_pg_subscription_subskipxid - 1] = true; + } + update_tuple = true; break; } diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 7e3938d0d3..911a031b60 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -1743,11 +1743,32 @@ pgstat_reset_subscription_error(Oid subid, Oid subrelid) pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERR); msg.m_subid = subid; msg.m_subrelid = subrelid; + msg.m_clear = false; msg.m_reset = true; pgstat_send(&msg, offsetof(PgStat_MsgSubscriptionErr, m_reset) + sizeof(bool)); } +/* ---------- + * pgstat_clear_subscription_error() - + * + * Tell the collector about clear the error of subscription. + * ---------- + */ +void +pgstat_clear_subscription_error(Oid subid, Oid subrelid) +{ + PgStat_MsgSubscriptionErr msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERR); + msg.m_subid = subid; + msg.m_subrelid = subrelid; + msg.m_clear = true; + msg.m_reset = false; + + pgstat_send(&msg, offsetof(PgStat_MsgSubscriptionErr, m_reset) + sizeof(bool)); +} + /* ---------- * pgstat_report_autovac() - * @@ -2034,6 +2055,7 @@ pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid, msg.m_subid = subid; msg.m_subrelid = subrelid; msg.m_reset = false; + msg.m_clear = false; msg.m_databaseid = MyDatabaseId; msg.m_relid = relid; msg.m_command = command; @@ -6134,27 +6156,37 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len) static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len) { + PgStat_StatSubErrEntry *errent; - bool create = !msg->m_reset; + bool create = !(msg->m_reset || msg->m_clear); /* Get subscription error */ errent = pgstat_get_subscription_error_entry(msg->m_subid, msg->m_subrelid, create); - if (msg->m_reset) + if (msg->m_reset || msg->m_clear) { + Assert(!(msg->m_reset && msg->m_clear)); + if (errent == NULL) return; - /* reset fields and set reset timestamp */ errent->relid = InvalidOid; errent->command = 0; errent->xid = InvalidTransactionId; errent->failure_count = 0; - errent->last_failure = 0; - errent->last_errmsg[0] = '\0'; - errent->stat_reset_timestamp = GetCurrentTimestamp(); + + /* + * If the reset is requested, reset more fields and set the reset + * timestamp. + */ + if (msg->m_reset) + { + errent->last_failure = 0; + errent->last_errmsg[0] = '\0'; + errent->stat_reset_timestamp = GetCurrentTimestamp(); + } } else { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ebe6f53b5d..ae6c9f147c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -136,6 +136,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" @@ -269,6 +270,21 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; +/* + * skipping_xid is a valid XID if we're skipping all data modification changes + * (INSERT/DELETE/UPDATE/TRUNCATE) of the specified transaction in MySubscription->skipxid. + * Please note that we don’t skip receiving the changes particularly in streaming + * cases, since we decide whether or not to skip applying the changes when starting + * to apply. Once starting skipping changes, we copy the XID to skipping_xid and + * then don't stop skipping until we skip the whole transaction even if the + * subscription is invalidated and* MySubscription->skipxid gets changed or reset. + * When stopping the skipping behavior, we reset the skip XID (subskipxid) in the + * pg_subscription catalog and associate origin status to the transaction that resets + * the skip XID so that we can start streaming from the next transaction. + */ +static TransactionId skipping_xid = InvalidTransactionId; +#define is_skipping_changes() (TransactionIdIsValid(skipping_xid)) + /* * Hash table for storing the streaming xid information along with shared file * set for streaming and subxact files. @@ -355,6 +371,9 @@ static void apply_error_callback(void *arg); static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts); static inline void reset_apply_error_context_info(void); +static void maybe_start_skipping_changes(TransactionId xid); +static void stop_skipping_changes(XLogRecPtr origin_lsn, TimestampTz origin_committs); + /* * Should this worker apply changes for given relation. * @@ -809,6 +828,11 @@ apply_handle_begin(StringInfo s) remote_final_lsn = begin_data.final_lsn; + /* + * Enable skipping all changes of this transaction if specified. + */ + maybe_start_skipping_changes(begin_data.xid); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -833,7 +857,18 @@ apply_handle_commit(StringInfo s) LSN_FORMAT_ARGS(commit_data.commit_lsn), LSN_FORMAT_ARGS(remote_final_lsn)))); - apply_handle_commit_internal(&commit_data); + /* + * Stop the skipping transaction if enabled. Otherwise, commit the changes + * that are just applied. + */ + if (is_skipping_changes()) + { + stop_skipping_changes(commit_data.end_lsn, commit_data.committime); + store_flush_position(commit_data.end_lsn); + in_remote_transaction = false; + } + else + apply_handle_commit_internal(&commit_data); /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); @@ -861,6 +896,9 @@ apply_handle_begin_prepare(StringInfo s) remote_final_lsn = begin_data.prepare_lsn; + /* Enable skipping all changes of this transaction if specified */ + maybe_start_skipping_changes(begin_data.xid); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -919,9 +957,10 @@ apply_handle_prepare(StringInfo s) /* * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction. It is done this way because at - * commit prepared time, we won't know whether we have skipped preparing a - * transaction because of no change. + * change has happened in this transaction, possibly because we're + * skipping data-modification changes of this transaction. It is done this + * way because at commit prepared time, we won't know whether we have + * skipped preparing a transaction because of no change. * * XXX, We can optimize such that at commit prepared time, we first check * whether we have prepared the transaction or not but that doesn't seem @@ -935,6 +974,10 @@ apply_handle_prepare(StringInfo s) CommitTransactionCommand(); pgstat_report_stat(false); + /* Stop the skipping changes if enabled */ + if (is_skipping_changes()) + stop_skipping_changes(InvalidXLogRecPtr, 0); + store_flush_position(prepare_data.end_lsn); in_remote_transaction = false; @@ -1066,6 +1109,9 @@ apply_handle_stream_prepare(StringInfo s) elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); + /* Enable skipping all changes of this transaction if specified. */ + maybe_start_skipping_changes(prepare_data.xid); + /* Replay all the spooled operations. */ apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn); @@ -1076,6 +1122,10 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); + /* Stop the skipping changes if enabled */ + if (is_skipping_changes()) + stop_skipping_changes(InvalidXLogRecPtr, 0); + store_flush_position(prepare_data.end_lsn); in_remote_transaction = false; @@ -1101,9 +1151,10 @@ apply_handle_origin(StringInfo s) { /* * ORIGIN message can only come inside streaming transaction or inside - * remote transaction and before any actual writes. + * remote transaction and before any actual writes unless we're skipping + * changes of the transaction. */ - if (!in_streamed_transaction && + if (!in_streamed_transaction && !is_skipping_changes() && (!in_remote_transaction || (IsTransactionState() && !am_tablesync_worker()))) ereport(ERROR, @@ -1125,6 +1176,9 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("duplicate STREAM START message"))); + /* extract XID of the top-level transaction */ + stream_xid = logicalrep_read_stream_start(s, &first_segment); + /* * Start a transaction on stream start, this transaction will be committed * on the stream stop unless it is a tablesync worker in which case it @@ -1137,9 +1191,6 @@ apply_handle_stream_start(StringInfo s) /* notify handle methods we're processing a remote transaction */ in_streamed_transaction = true; - /* extract XID of the top-level transaction */ - stream_xid = logicalrep_read_stream_start(s, &first_segment); - if (!TransactionIdIsValid(stream_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1221,6 +1272,7 @@ apply_handle_stream_abort(StringInfo s) errmsg_internal("STREAM ABORT message without STREAM STOP"))); logicalrep_read_stream_abort(s, &xid, &subxid); + maybe_start_skipping_changes(xid); /* * If the two XIDs are the same, it's in fact abort of toplevel xact, so @@ -1314,6 +1366,10 @@ apply_handle_stream_abort(StringInfo s) CommitTransactionCommand(); } + /* Stop the skipping transaction if enabled */ + if (is_skipping_changes()) + stop_skipping_changes(InvalidXLogRecPtr, 0); + reset_apply_error_context_info(); } @@ -1463,9 +1519,23 @@ apply_handle_stream_commit(StringInfo s) elog(DEBUG1, "received commit for streamed transaction %u", xid); + /* Enable skipping all changes of this transaction if specified */ + maybe_start_skipping_changes(xid); + apply_spooled_messages(xid, commit_data.commit_lsn); - apply_handle_commit_internal(&commit_data); + if (is_skipping_changes()) + { + stop_skipping_changes(commit_data.end_lsn, commit_data.committime); + + store_flush_position(commit_data.end_lsn); + in_remote_transaction = false; + } + else + { + /* commit the streamed transaction */ + apply_handle_commit_internal(&commit_data); + } /* unlink the files with serialized changes and subxact info */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); @@ -2351,6 +2421,17 @@ apply_dispatch(StringInfo s) LogicalRepMsgType action = pq_getmsgbyte(s); LogicalRepMsgType saved_command; + /* + * Skip all data-modification changes if we're skipping changes of this + * transaction. + */ + if (is_skipping_changes() && + (action == LOGICAL_REP_MSG_INSERT || + action == LOGICAL_REP_MSG_UPDATE || + action == LOGICAL_REP_MSG_DELETE || + action == LOGICAL_REP_MSG_TRUNCATE)) + return; + /* * Set the current command being applied. Since this function can be * called recusively when applying spooled changes, save the current @@ -3791,3 +3872,103 @@ reset_apply_error_context_info(void) apply_error_callback_arg.remote_attnum = -1; set_apply_error_context_xact(InvalidTransactionId, 0); } + +/* + * Start skipping changes of the transaction if the given XID matches the + * transaction ID specified by skip_xid option. + */ +static void +maybe_start_skipping_changes(TransactionId xid) +{ + Assert(!is_skipping_changes()); + Assert(!TransactionIdIsValid(skipping_xid)); + Assert(!in_remote_transaction); + Assert(!in_streamed_transaction); + + if (!TransactionIdIsValid(MySubscription->skipxid) || + MySubscription->skipxid != xid) + return; + + skipping_xid = xid; + ereport(LOG, + errmsg("start skipping logical replication transaction with xid %u", + skipping_xid)); +} + +/* + * Stop skipping changes and reset the skip XID. Also, reset the skip XID + * (pg_subscription.subskipxid). If origin_lsn and origin_committs are valid, we + * set origin state to the transaction commit that resets the skip XID so that we + * can start streaming from the transaction next to the one that we just skipped. + */ +static void +stop_skipping_changes(XLogRecPtr origin_lsn, TimestampTz origin_committs) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + Assert(is_skipping_changes()); + + ereport(LOG, + (errmsg("done skipping logical replication transaction with xid %u", + skipping_xid))); + + /* Stop skipping changes */ + skipping_xid = InvalidTransactionId; + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + if (!IsTransactionState()) + StartTransactionCommand(); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + + /* Fetch the existing tuple. */ + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + TransactionIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + /* Set subskipxid to null */ + nulls[Anum_pg_subscription_subskipxid - 1] = true; + replaces[Anum_pg_subscription_subskipxid - 1] = true; + + /* Update the system catalog to reset the skip XID */ + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + heap_freetuple(tup); + table_close(rel, RowExclusiveLock); + + if (!XLogRecPtrIsInvalid(origin_lsn)) + { + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = origin_lsn; + replorigin_session_origin_timestamp = origin_committs; + } + + CommitTransactionCommand(); + pgstat_report_stat(false); + + /* + * Clear the error statistics of this subscription to let users know the + * subscription is no longer getting stuck by the conflict. + * + * The message for clearing the error statistics can be lost but that's + * okay. The user can know the logical replication is working fine in + * other ways, for example, checking pg_stat_subscription view. And the + * user is able to reset the single subscription error statistics by + * pg_reset_subscription_error SQL function. + */ + pgstat_clear_subscription_error(MySubscription->oid, InvalidOid); +} diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 21061493ea..beaa6e646d 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -67,6 +67,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW char subtwophasestate; /* Stream two-phase transactions */ + TransactionId subskipxid BKI_FORCE_NULL; /* All changes associated with + * this XID are skipped */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -103,6 +106,7 @@ typedef struct Subscription * binary format */ bool stream; /* Allow streaming in-progress transactions. */ char twophasestate; /* Allow streaming two-phase transactions */ + TransactionId skipxid; /* All changes of the XID are skipped */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 3f55d63425..93bfef0e9c 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3677,6 +3677,7 @@ typedef struct AlterSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + TransactionId skip_xid; /* XID to skip */ } AlterSubscriptionStmt; typedef struct DropSubscriptionStmt diff --git a/src/include/pgstat.h b/src/include/pgstat.h index a6914a24e5..6775736b2b 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -536,7 +536,7 @@ typedef struct PgStat_MsgReplSlot /* ---------- * PgStat_MsgSubscriptionErr Sent by the apply worker or the table sync worker to - * update/reset the error happening during logical + * update/reset/clear the error happening during logical * replication. * ---------- */ @@ -554,7 +554,9 @@ typedef struct PgStat_MsgSubscriptionErr Oid m_subid; Oid m_subrelid; - /* The reset message uses below field */ + /* The clear and reset messages use below fields */ + bool m_clear; /* clear all fields except for last_failure and + * last_errmsg */ bool m_reset; /* Reset all fields and set reset_stats * timestamp */ @@ -1111,6 +1113,7 @@ extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type t extern void pgstat_reset_slru_counter(const char *); extern void pgstat_reset_replslot_counter(const char *name); extern void pgstat_reset_subscription_error(Oid subid, Oid subrelid); +extern void pgstat_clear_subscription_error(Oid subid, Oid subrelid); extern void pgstat_report_autovac(Oid dboid); extern void pgstat_report_vacuum(Oid tableoid, bool shared, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index b87f67fe55..217b5fabd1 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -296,6 +296,19 @@ ERROR: unrecognized subscription parameter: "enabled" -- fail - RESET must not include values ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off); ERROR: RESET must not include values for parameters +-- it works +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 3); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = '4294967295'); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid); +-- fail - invalid XID +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1.1); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 0); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 2); +ERROR: invalid transaction id \dRs+ List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index aa90560691..4c9d25f0a4 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -231,6 +231,17 @@ ALTER SUBSCRIPTION regress_testsub RESET (enabled); -- fail - RESET must not include values ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off); +-- it works +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 3); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = '4294967295'); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid); + +-- fail - invalid XID +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1.1); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 0); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 2); + \dRs+ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/subscription/t/024_skip_xact.pl b/src/test/subscription/t/024_skip_xact.pl new file mode 100644 index 0000000000..affb663803 --- /dev/null +++ b/src/test/subscription/t/024_skip_xact.pl @@ -0,0 +1,244 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Tests for skipping logical replication transactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 10; + +# Test if the error reported on pg_subscription_errors view is expected. +sub test_subscription_error +{ + my ($node, $source, $relname, $expected_error, $msg) = @_; + + # Wait for the error statistics to be updated. + $node->poll_query_until( + 'postgres', qq[ +SELECT count(1) > 0 FROM pg_stat_subscription_errors +WHERE relid = '$relname'::regclass AND failure_source = '$source'; +]) or die "Timed out while waiting for statistics to be updated"; + + my $result = $node->safe_psql( + 'postgres', + qq[ +SELECT datname, subname, command, relid::regclass, failure_source, failure_count > 0 +FROM pg_stat_subscription_errors +WHERE relid = '$relname'::regclass AND failure_source = '$source'; +]); + is($result, $expected_error, $msg); +} + +# Check the error reported on pg_stat_subscription view and skip the failed +# transaction. +sub test_skip_subscription_error +{ + my ($node, $source, $subname, $relname, $expected_error, $msg) = @_; + + # Check the reported error. + test_subscription_error($node, $source, $relname, $expected_error, $msg); + + # Get XID of the failed transaction. + my $skipxid = $node->safe_psql( + 'postgres', + "SELECT xid FROM pg_stat_subscription_errors WHERE relid = '$relname'::regclass"); + $node->safe_psql('postgres', + "ALTER SUBSCRIPTION $subname SET (skip_xid = '$skipxid')"); + + # Restart the subscriber to restart logical replication without interval. + $node->restart; + + # Wait for the failed transaction to be skipped. + $node->poll_query_until( + 'postgres', + qq[ +SELECT subskipxid IS NULL FROM pg_subscription +WHERE subname = '$subname' +]) or die "Timed out while waiting for the transaction to be skipped"; + + # Also wait for the error details to be cleared. + $node->poll_query_until( + 'postgres', + qq[ +SELECT command IS NULL FROM pg_stat_subscription_errors +WHERE subname = '$subname' AND failure_source = '$source'; +]) or die "Timed out while waiting for the transaction to be skipped"; +} + +# Create publisher node. +my $node_publisher = PostgresNode->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq[ +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +]); +$node_publisher->start; + +# Create subscriber node. +my $node_subscriber = PostgresNode->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); + +# don't overflow the server log with error messages. +$node_subscriber->append_conf('postgresql.conf', + qq[ +max_prepared_transactions = 10 +wal_retrieve_retry_interval = 5s +]); +$node_subscriber->start; + +# Initial table setup on both publisher and subscriber. On subscriber we create +# the same tables but with primary keys. Also, insert some data that will conflict +# with the data replicated from publisher later. +$node_publisher->safe_psql('postgres', + q[ +BEGIN; +CREATE TABLE test_tab1 (a int); +CREATE TABLE test_tab2 (a int); +CREATE TABLE test_tab_streaming (a int, b text); +INSERT INTO test_tab1 VALUES (1); +INSERT INTO test_tab2 VALUES (1); +COMMIT; +]); +$node_subscriber->safe_psql('postgres', + q[ +BEGIN; +CREATE TABLE test_tab1 (a int primary key); +CREATE TABLE test_tab2 (a int primary key); +CREATE TABLE test_tab_streaming (a int primary key, b text); +INSERT INTO test_tab2 VALUES (1); +INSERT INTO test_tab_streaming SELECT 10000, md5(10000::text); +COMMIT; +]); + +# Check if there is no subscription errors before starting logical replication. +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_subscription_errors"); +is($result, qq(0), 'check no subscription error'); + +# Setup publications. +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + q[ +CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2; +CREATE PUBLICATION tap_pub_streaming FOR TABLE test_tab_streaming; +]); + +# Create subscriptions. The table sync for test_tab2 on tap_sub will enter to +# infinite error due to violating the unique constraint. +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (two_phase = on);"); +my $appname_streaming = 'tap_sub_streaming'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub_streaming CONNECTION '$publisher_connstr application_name=$appname_streaming' PUBLICATION tap_pub_streaming WITH (streaming = on, two_phase = on);"); + +$node_publisher->wait_for_catchup($appname); +$node_publisher->wait_for_catchup($appname_streaming); + +# Wait for initial table sync for test_tab1 and test_tab_streaming to finish. +$node_subscriber->poll_query_until('postgres', + q[ +SELECT count(1) = 2 FROM pg_subscription_rel +WHERE srrelid in ('test_tab1'::regclass, 'test_tab_streaming'::regclass) AND srsubstate = 'r' +]) or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the initial data. +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(a) FROM test_tab1"); +is($result, q(1), 'check initial data was copied to subscriber'); + +# Insert more data to test_tab1, raising an error on the subscriber due to violating +# the unique constraint on test_tab1. Then skip the transaction in question. +$node_publisher->safe_psql('postgres', + qq[ +BEGIN; +INSERT INTO test_tab1 VALUES (1); +COMMIT; +]); +test_skip_subscription_error($node_subscriber, + 'apply', 'tap_sub', 'test_tab1', + qq(postgres|tap_sub|INSERT|test_tab1|apply|t), + 'skip the error reported by the apply worker'); + +# Check the table sync worker's error in the view. +test_subscription_error($node_subscriber, + 'tablesync', 'test_tab2', + qq(postgres|tap_sub||test_tab2|tablesync|t), + 'skip the error reported by the table sync worker'); + +# Insert enough rows to test_tab_streaming to exceed the 64kB limit, also raising an +# error on the subscriber during applying spooled changes for the same reason. Then +# skip the transactio in question. +$node_publisher->safe_psql('postgres', + qq[ +BEGIN; +INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i); +COMMIT; +]); +test_skip_subscription_error($node_subscriber, + 'apply', 'tap_sub_streaming', 'test_tab_streaming', + qq(postgres|tap_sub_streaming|INSERT|test_tab_streaming|apply|t), + 'skip the error reported by the table sync worker during during applying streaming changes'); + +# Insert data to test_tab1 and test_tab_streaming that don't conflict. +$node_publisher->safe_psql( + 'postgres', + "INSERT INTO test_tab1 VALUES (2)"); +$node_publisher->safe_psql( + 'postgres', + "INSERT INTO test_tab_streaming VALUES (10001, md5(10001::text))"); + +$node_publisher->wait_for_catchup($appname); +$node_publisher->wait_for_catchup($appname_streaming); + +# Check the data is successfully replicated after skipping the transactions. +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM test_tab1"); +is($result, q(1 +2), "subscription gets changes after skipped transaction"); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM test_tab_streaming"); +is($result, q(2), "subscription gets changes after skipped streamed transaction"); + +# Tests for skipping the transactions that are prepared and stream_prepared. We insert +# the same data as the previous tests but prepare the transactions. Those insertions +# raise an error on the subscriptions. Then we skip the transactions in question. +$node_publisher->safe_psql('postgres', + qq[ +BEGIN; +INSERT INTO test_tab1 VALUES (1); +PREPARE TRANSACTION 'skip_sub1'; +COMMIT PREPARED 'skip_sub1'; +]); +test_skip_subscription_error($node_subscriber, + 'apply', 'tap_sub', 'test_tab1', + qq(postgres|tap_sub|INSERT|test_tab1|apply|t), + 'skip the error on changes of the prepared transaction'); + +$node_publisher->safe_psql('postgres', + qq[ +BEGIN; +INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i); +PREPARE TRANSACTION 'skip_sub2'; +COMMIT PREPARED 'skip_sub2'; +]); +test_skip_subscription_error($node_subscriber, + 'apply', 'tap_sub_streaming', 'test_tab_streaming', + qq(postgres|tap_sub_streaming|INSERT|test_tab_streaming|apply|t), + 'skip the error on changes of the prepared-streamed transaction'); + +# Check if the view doesn't show any entries after dropping the subscriptions. +$node_subscriber->safe_psql( + 'postgres', + q[ +DROP SUBSCRIPTION tap_sub; +DROP SUBSCRIPTION tap_sub_streaming; +]); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_subscription_errors"); +is($result, q(0), 'no error after dropping subscription'); -- 2.24.3 (Apple Git-128)