From cc69040e063a91acce0a7c7b9b4defe14500bb31 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 28 Jun 2021 13:18:58 +0900 Subject: [PATCH v1 1/3] Add ALTER SUBSCRIPTION SET SKIP TRANSACTION. --- src/backend/catalog/pg_subscription.c | 10 ++ src/backend/commands/subscriptioncmds.c | 21 +++ src/backend/parser/gram.y | 17 ++ src/backend/replication/logical/worker.c | 216 ++++++++++++++++++++--- src/include/catalog/pg_subscription.h | 4 + src/include/nodes/parsenodes.h | 5 +- 6 files changed, 243 insertions(+), 30 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 29fc4218cd..7b79de6351 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -103,6 +103,16 @@ GetSubscription(Oid subid, bool missing_ok) Assert(!isnull); sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum)); + /* Get skip XID */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subskipxid, + &isnull); + if (!isnull) + sub->skipxid = DatumGetTransactionId(datum); + else + sub->skipxid = InvalidTransactionId; + ReleaseSysCache(tup); return sub; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b862e59f1d..97fd56e371 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -429,6 +429,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming); + nulls[Anum_pg_subscription_subskipxid - 1] = true; values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -1020,6 +1021,26 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) break; } + case ALTER_SUBSCRIPTION_SET_SKIP_XID: + { + if (sub->skipxid != stmt->skip_xid) + { + values[Anum_pg_subscription_subskipxid - 1] = + TransactionIdGetDatum(stmt->skip_xid); + replaces[Anum_pg_subscription_subskipxid - 1] = true; + update_tuple = true; + } + + break; + } + case ALTER_SUBSCRIPTION_RESET_SKIP_XID: + { + nulls[Anum_pg_subscription_subskipxid - 1] = true; + replaces[Anum_pg_subscription_subskipxid - 1] = true; + update_tuple = true; + break; + } + default: elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", stmt->kind); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index eb24195438..ef9213570f 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9772,6 +9772,23 @@ AlterSubscriptionStmt: (Node *)makeInteger(false), @1)); $$ = (Node *)n; } + | ALTER SUBSCRIPTION name SET SKIP TRANSACTION Iconst + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_SET_SKIP_XID; + n->subname = $3; + n->skip_xid = $7; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name RESET SKIP TRANSACTION + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_RESET_SKIP_XID; + n->subname = $3; + $$ = (Node *)n; + } ; /***************************************************************************** diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index bbb659dad0..b90a8df166 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -62,6 +62,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" @@ -181,6 +182,19 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; +/* + * True if we're skipping changes of the specified transaction in + * MySubscription->skip_xid. Please note that We don’t skip receiving those + * changes. We decide whether or not to skip applying the changes when starting + * to apply. That is, for streamed transactions, we receive the streamed changes + * anyway and then cleanup streamed files when applying the stream-commit or + * stream-abort message. When stopping the skipping behavior, we reset the skip + * XID (subskipxid) in the pg_subscription and associate origin status to the + * transaction that resets the skip XID so that we can start streaming from the + * next transaction. + */ +static bool skipping_changes = false; + /* * Hash table for storing the streaming xid information along with shared file * set for streaming and subxact files. @@ -236,8 +250,7 @@ static void maybe_reread_subscription(void); /* prototype needed because of stream_commit */ static void apply_dispatch(StringInfo s); -static void apply_handle_commit_internal(StringInfo s, - LogicalRepCommitData *commit_data); +static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot); @@ -256,6 +269,13 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation); +static void apply_streamed_changes(TransactionId xid, + LogicalRepCommitData *commit_data); + +static bool start_skipping_changes(TransactionId xid); +static bool stop_skipping_changes(bool reset_xid, + LogicalRepCommitData *commit_data); + /* * Should this worker apply changes for given relation. @@ -771,7 +791,9 @@ apply_handle_begin(StringInfo s) remote_final_lsn = begin_data.final_lsn; - in_remote_transaction = true; + /* Start skipping all changes of this transaction if necessary */ + if (!start_skipping_changes(begin_data.xid)) + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); } @@ -795,7 +817,12 @@ apply_handle_commit(StringInfo s) LSN_FORMAT_ARGS(commit_data.commit_lsn), LSN_FORMAT_ARGS(remote_final_lsn)))); - apply_handle_commit_internal(s, &commit_data); + /* + * Stop the skipping transaction if enabled. Otherwise, commit the + * changes that are just applied. + */ + if (!stop_skipping_changes(true, &commit_data)) + apply_handle_commit_internal(&commit_data); /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); @@ -813,9 +840,10 @@ apply_handle_origin(StringInfo s) { /* * ORIGIN message can only come inside streaming transaction or inside - * remote transaction and before any actual writes. + * remote transaction and before any actual writes unless we're skipping + * changes of the transaction. */ - if (!in_streamed_transaction && + if (!in_streamed_transaction && !skipping_changes && (!in_remote_transaction || (IsTransactionState() && !am_tablesync_worker()))) ereport(ERROR, @@ -837,6 +865,9 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("duplicate STREAM START message"))); + /* extract XID of the top-level transaction */ + stream_xid = logicalrep_read_stream_start(s, &first_segment); + /* * Start a transaction on stream start, this transaction will be committed * on the stream stop unless it is a tablesync worker in which case it @@ -849,9 +880,6 @@ apply_handle_stream_start(StringInfo s) /* notify handle methods we're processing a remote transaction */ in_streamed_transaction = true; - /* extract XID of the top-level transaction */ - stream_xid = logicalrep_read_stream_start(s, &first_segment); - if (!TransactionIdIsValid(stream_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1016,6 +1044,9 @@ apply_handle_stream_abort(StringInfo s) end_replication_step(); CommitTransactionCommand(); } + + /* Stop the skipping transaction if enabled */ + stop_skipping_changes(true, NULL); } /* @@ -1025,14 +1056,7 @@ static void apply_handle_stream_commit(StringInfo s) { TransactionId xid; - StringInfoData s2; - int nchanges; - char path[MAXPGPATH]; - char *buffer = NULL; LogicalRepCommitData commit_data; - StreamXidHash *ent; - MemoryContext oldcxt; - BufFile *fd; if (in_streamed_transaction) ereport(ERROR, @@ -1041,8 +1065,40 @@ apply_handle_stream_commit(StringInfo s) xid = logicalrep_read_stream_commit(s, &commit_data); + remote_final_lsn = commit_data.commit_lsn; + elog(DEBUG1, "received commit for streamed transaction %u", xid); + /* + * Stop skipping transaction information if enabled. Otherwise, apply + * all streamed changes and commit the transaction. + */ + if (!stop_skipping_changes(true, &commit_data)) + apply_streamed_changes(xid, &commit_data); + + /* unlink the files with serialized changes and subxact info */ + stream_cleanup_files(MyLogicalRepWorker->subid, xid); + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Apply all streamed changes with the xid. + */ +static void +apply_streamed_changes(TransactionId xid, LogicalRepCommitData *commit_data) +{ + StringInfoData s2; + int nchanges; + char path[MAXPGPATH]; + char *buffer = NULL; + StreamXidHash *ent; + MemoryContext oldcxt; + BufFile *fd; + /* Make sure we have an open transaction */ begin_replication_step(); @@ -1074,8 +1130,6 @@ apply_handle_stream_commit(StringInfo s) MemoryContextSwitchTo(oldcxt); - remote_final_lsn = commit_data.commit_lsn; - /* * Make sure the handle apply_dispatch methods are aware we're in a remote * transaction. @@ -1153,22 +1207,15 @@ apply_handle_stream_commit(StringInfo s) elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); - apply_handle_commit_internal(s, &commit_data); - - /* unlink the files with serialized changes and subxact info */ - stream_cleanup_files(MyLogicalRepWorker->subid, xid); - - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); - - pgstat_report_activity(STATE_IDLE, NULL); + /* commit the streamed transaction */ + apply_handle_commit_internal(commit_data); } /* * Helper function for apply_handle_commit and apply_handle_stream_commit. */ static void -apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data) +apply_handle_commit_internal(LogicalRepCommitData *commit_data) { if (IsTransactionState()) { @@ -2020,6 +2067,17 @@ apply_dispatch(StringInfo s) { LogicalRepMsgType action = pq_getmsgbyte(s); + /* + * Skip all data-modification changes if we're skipping changes of this + * transaction. + */ + if (skipping_changes && + (action == LOGICAL_REP_MSG_INSERT || + action == LOGICAL_REP_MSG_UPDATE || + action == LOGICAL_REP_MSG_DELETE || + action == LOGICAL_REP_MSG_TRUNCATE)) + return; + switch (action) { case LOGICAL_REP_MSG_BEGIN: @@ -2309,7 +2367,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* confirm all writes so far */ send_feedback(last_received, false, false); - if (!in_remote_transaction && !in_streamed_transaction) + if (!in_remote_transaction && !in_streamed_transaction && + !skipping_changes) { /* * If we didn't get any transactions for a while there might be @@ -3254,3 +3313,102 @@ IsLogicalWorker(void) { return MyLogicalRepWorker != NULL; } + +/* + * Start skipping changes of the given transaction. Return true if we + * enabled the skipping behavior. + */ +static bool +start_skipping_changes(TransactionId xid) +{ + Assert(!in_remote_transaction); + Assert(!in_streamed_transaction); + + if (!TransactionIdIsValid(MySubscription->skipxid) || + MySubscription->skipxid != xid) + return false; + + skipping_changes = true; + ereport(LOG, + errmsg("start skipping logical replication transaction with xid %u", + xid)); + + return true; +} + +/* + * Stop skipping changes and reset the skip XID. Return true if we were + * skipping changes and have stopped it. + * + * reset_xid is true if the caller wants to reset the skip XID (subskipxid) + * after disabling the skipping behavior. Also, if *commit_data is non-NULL, + * we set origin state to the transaction commit that resets the skip XID so + * that we can start streaming from the transaction next to the one that we + * just skipped. + */ +static bool +stop_skipping_changes(bool reset_xid, LogicalRepCommitData *commit_data) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + if (!skipping_changes) + return false; + + Assert(TransactionIdIsValid(MySubscription->skipxid)); + Assert(!in_remote_transaction); + Assert(!in_streamed_transaction); + + /* Stop skipping changes */ + skipping_changes = false; + + ereport(LOG, + errmsg("done skipping logical replication transaction with xid %u", + MySubscription->skipxid)); + + if (!reset_xid) + return true; + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Update the system catalog to reset the skip XID */ + StartTransactionCommand(); + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + + /* Fetch the existing tuple. */ + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + TransactionIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + /* Set subskipxid to null */ + nulls[Anum_pg_subscription_subskipxid - 1] = true; + replaces[Anum_pg_subscription_subskipxid - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + heap_freetuple(tup); + table_close(rel, RowExclusiveLock); + + if (commit_data) + { + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = commit_data->end_lsn; + replorigin_session_origin_timestamp = commit_data->committime; + } + + CommitTransactionCommand(); + + return true; +} diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0060ebfb40..a13b936891 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -57,6 +57,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool substream; /* Stream in-progress transactions. */ + TransactionId subskipxid BKI_FORCE_NULL; /* All changes associated with + this XID are skipped */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -94,6 +97,7 @@ typedef struct Subscription bool binary; /* Indicates if the subscription wants data in * binary format */ bool stream; /* Allow streaming in-progress transactions. */ + TransactionId skipxid; /* All changes of the XID are skipped */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index def9651b34..49e5d3e1e5 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3664,7 +3664,9 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, - ALTER_SUBSCRIPTION_ENABLED + ALTER_SUBSCRIPTION_ENABLED, + ALTER_SUBSCRIPTION_SET_SKIP_XID, + ALTER_SUBSCRIPTION_RESET_SKIP_XID, } AlterSubscriptionType; typedef struct AlterSubscriptionStmt @@ -3675,6 +3677,7 @@ typedef struct AlterSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + TransactionId skip_xid; /* XID to skip */ } AlterSubscriptionStmt; typedef struct DropSubscriptionStmt -- 2.24.3 (Apple Git-128)