From 31a80bc3ebc7629390bedc0d1ef53c9f03677e1d Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 4 Mar 2022 15:45:40 +0900 Subject: [PATCH] fixup! Optionally disable subscriptions on error --- src/backend/catalog/pg_subscription.c | 39 +++++++ src/backend/replication/logical/worker.c | 142 +++++++++-------------- src/include/catalog/pg_subscription.h | 1 + 3 files changed, 95 insertions(+), 87 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d2beebcc9d..1fc95ad867 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -157,6 +157,45 @@ FreeSubscription(Subscription *sub) pfree(sub); } +/* + * Disable the given subscription. + */ +void +DisableSubscription(Oid subid) +{ + Relation rel; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + HeapTuple tup; + + /* Look up our subscription in the catalog */ + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for subscription %u", subid); + + LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Set the subscription to disabled. */ + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false); + replaces[Anum_pg_subscription_subenabled - 1] = true; + + /* Update the catalog */ + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + heap_freetuple(tup); + + table_close(rel, NoLock); +} + /* * get_subscription_oid - given a subscription name, look up the OID * diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 71b007719c..eba75bb48c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -136,7 +136,6 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" -#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" @@ -304,6 +303,8 @@ static void store_flush_position(XLogRecPtr remote_lsn); static void maybe_reread_subscription(void); +static void worker_post_error_processing(void); + /* prototype needed because of stream_commit */ static void apply_dispatch(StringInfo s); @@ -2802,57 +2803,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received) walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); } -/* - * Disable the current subscription. - */ -static void -DisableSubscriptionOnError(void) -{ - Relation rel; - bool nulls[Natts_pg_subscription]; - bool replaces[Natts_pg_subscription]; - Datum values[Natts_pg_subscription]; - HeapTuple tup; - - /* Disable the subscription in a fresh transaction */ - StartTransactionCommand(); - - /* Look up our subscription in the catalog */ - rel = table_open(SubscriptionRelationId, RowExclusiveLock); - tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, - ObjectIdGetDatum(MySubscription->oid)); - - if (!HeapTupleIsValid(tup)) - elog(ERROR, "subscription \"%s\" does not exist", - MySubscription->name); - - LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, AccessExclusiveLock); - - /* Form a new tuple. */ - memset(values, 0, sizeof(values)); - memset(nulls, false, sizeof(nulls)); - memset(replaces, false, sizeof(replaces)); - - /* Set the subscription to disabled. */ - values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false); - replaces[Anum_pg_subscription_subenabled - 1] = true; - - /* Update the catalog */ - tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, - replaces); - CatalogTupleUpdate(rel, &tup->t_self, tup); - heap_freetuple(tup); - - table_close(rel, RowExclusiveLock); - - CommitTransactionCommand(); - - /* Notify the subscription has been disabled */ - ereport(LOG, - errmsg("logical replication subscription \"%s\" has been be disabled due to an error", - MySubscription->name)); -} - /* * Send a Standby Status Update message to server. * @@ -3443,25 +3393,16 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) PG_CATCH(); { /* - * First, ensure that we log the error message so that it won't be - * lost if some other internal error occurs in the following code. - * Then, abort the current transaction and send the stats message of - * the table synchronization failure in an idle state. + * Abort the current transaction with the error, do post-error + * processing. */ - HOLD_INTERRUPTS(); - EmitErrorReport(); - FlushErrorState(); - AbortOutOfAnyTransaction(); - RESUME_INTERRUPTS(); - pgstat_report_subscription_error(MySubscription->oid, false); - - if (MySubscription->disableonerr) - { - DisableSubscriptionOnError(); - proc_exit(0); - } + worker_post_error_processing(); - PG_RE_THROW(); + /* + * Similar to apply worker cases, the worker exits without re-throwing + * the error since it's already reported. + */ + proc_exit(1); } PG_END_TRY(); @@ -3484,26 +3425,19 @@ start_apply(XLogRecPtr origin_startpos) PG_CATCH(); { /* - * First, ensure that we log the error message so that it won't be - * lost if some other internal error occurs in the following code. - * Then, abort the current transaction and send the stats message of - * the apply failure in an idle state. + * Abort the current transaction with the error, do post-error + * processing and exits. */ - HOLD_INTERRUPTS(); - EmitErrorReport(); - FlushErrorState(); - AbortOutOfAnyTransaction(); - RESUME_INTERRUPTS(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); - - if (MySubscription->disableonerr) - { - DisableSubscriptionOnError(); - proc_exit(0); - } + worker_post_error_processing(); - PG_RE_THROW(); - } + /* + * The worker exits without re-throwing the error since it's already + * reported. So we don't get in further PG_CATCH() blocks, but it's + * okay since this PG_TRY()/PG_CATCH() block is the last one in the + * logical replication workers. + */ + proc_exit(1); +} PG_END_TRY(); } @@ -3726,6 +3660,40 @@ ApplyWorkerMain(Datum main_arg) proc_exit(0); } +/* + * Abort and cleanup the current transaction, then do post-error processing. + * This function must be called in a PG_CATCH() block. + */ +static void +worker_post_error_processing(void) +{ + /* Emit the error message, and recover from the error state to an idle state */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + FlushErrorState(); + AbortOutOfAnyTransaction(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed during table synchronization */ + pgstat_report_subscription_error(MyLogicalRepWorker->subid, + !am_tablesync_worker()); + + /* Disable the subscription if required */ + if (MySubscription->disableonerr) + { + StartTransactionCommand(); + DisableSubscription(MySubscription->oid); + CommitTransactionCommand(); + + /* Notify the subscription has been disabled */ + ereport(LOG, + errmsg("logical replication subscription \"%s\" has been be disabled due to an error", + MySubscription->name)); + } +} + /* * Is current process a logical replication worker? */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 90b1eb53aa..e2befaf351 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -117,6 +117,7 @@ typedef struct Subscription extern Subscription *GetSubscription(Oid subid, bool missing_ok); extern void FreeSubscription(Subscription *sub); +extern void DisableSubscription(Oid subid); extern Oid get_subscription_oid(const char *subname, bool missing_ok); extern char *get_subscription_name(Oid subid, bool missing_ok); -- 2.24.3 (Apple Git-128)