Re: [HACKERS] Get stuck when dropping a subscription during synchronizing table

From Kyotaro HORIGUCHI
Subject Re: [HACKERS] Get stuck when dropping a subscription during synchronizing table
Date
Msg-id 20170515.200219.10007602.horiguchi.kyotaro@lab.ntt.co.jp
In response to Re: [HACKERS] Get stuck when dropping a subscription during synchronizing table  (Masahiko Sawada <sawada.mshk@gmail.com>)
Responses Re: [HACKERS] Get stuck when dropping a subscription during synchronizing table  (Masahiko Sawada <sawada.mshk@gmail.com>)
List pgsql-hackers
Hello,

At Fri, 12 May 2017 17:24:07 +0900, Masahiko Sawada <sawada.mshk@gmail.com> wrote in
<CAD21AoDJihMvdiZv7d_bpMPUK1G379WfxWpeanmJVn1KvEGy0Q@mail.gmail.com>
> On Fri, May 12, 2017 at 11:24 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> > On Thu, May 11, 2017 at 6:16 PM, Petr Jelinek
> > <petr.jelinek@2ndquadrant.com> wrote:
> >> On 11/05/17 10:10, Masahiko Sawada wrote:
> >>> On Thu, May 11, 2017 at 4:06 PM, Michael Paquier
> >>> <michael.paquier@gmail.com> wrote:
> >>>> On Wed, May 10, 2017 at 11:57 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> >>>>> Barring any objections, I'll add these two issues to the open items.
> >>>>
> >>>> It seems to me that those open items have not been added yet to the
> >>>> list. If I am following correctly, they could be defined as follows:
> >>>> - Dropping subscription may get stuck if done during tablesync.
> >>>> -- Analyze deadlock issues with DROP SUBSCRIPTION and apply worker process.
> >>
> >> I think the solution to this is to reintroduce the LWLock that was
> >> removed and replaced with the exclusive lock on the catalog [1]. I am afraid
> >> that the correct way of handling this is to take both the LWLock and the
> >> catalog lock (first the LWLock, under which we kill the workers, and then
> >> the catalog lock, so that something that prevents the launcher from
> >> restarting them is held till the end of the transaction).
> >
> > I agree with reintroducing the LWLock, stopping the logical rep worker
> > first and then modifying the catalog. That way we can reduce the catalog
> > lock level (maybe to RowExclusiveLock) so that the apply worker can see
> > it. Also, I think we need to do more to avoid holding the LWLock until
> > the end of the transaction, because holding an LWLock until the end of
> > a transaction is not a good idea and could cause a deadlock. For
> > example, we can commit the transaction in DropSubscription after
> > cleaning up the pg_subscription record and all its dependencies, and
> > then start a new transaction for the remaining work. Of course we would
> > also need to disallow executing DROP SUBSCRIPTION inside a user
> > transaction.
> 
> Attached are two draft patches to solve these issues.
> 
> The attached 0001 patch reintroduces LogicalRepLauncherLock and makes DROP
> SUBSCRIPTION keep holding it until commit. To prevent a possible deadlock,
> I disallowed calling DROP SUBSCRIPTION in a transaction block. But there
> might be a more sensible solution for this. Please give me feedback.

+     * Protect against launcher restarting the worker. This lock will
+     * be released at commit.

This is wrong. COMMIT doesn't release left-over LWLocks; only ABORT
does (more precisely, that release seems intended for the ERROR
path). So with this patch, the second DROP SUBSCRIPTION gets stuck
on the LWLock acquired by the first one. And, as Petr said, holding
an LWLock for such a long duration seems bad.
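To make the failure mode concrete, here is a toy model in plain C of a lock that, as described above, survives commit and is released only on the abort path. It is not PostgreSQL code: ToyLock, toy_try_acquire(), toy_commit(), toy_abort() and toy_drop_subscription() are hypothetical names, and a failed non-blocking acquire stands in for the point where a real LWLockAcquire() would wait forever.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Toy model, not PostgreSQL code: a lock that survives "commit" and is
 * released only on the "abort" (ERROR) path.
 */
typedef struct ToyLock
{
    bool held;
} ToyLock;

/* Non-blocking acquire: succeeds only if nobody holds the lock. */
static bool
toy_try_acquire(ToyLock *lock)
{
    if (lock->held)
        return false;
    lock->held = true;
    return true;
}

/* Commit path: left-over locks are NOT released. */
static void
toy_commit(ToyLock *lock)
{
    (void) lock;
}

/* Abort path: left-over locks are released. */
static void
toy_abort(ToyLock *lock)
{
    lock->held = false;
}

/*
 * One DROP SUBSCRIPTION that takes the lock and ends its transaction with
 * commit or abort.  Returns false where a real LWLockAcquire() would block.
 */
static bool
toy_drop_subscription(ToyLock *lock, bool commit)
{
    if (!toy_try_acquire(lock))
        return false;
    if (commit)
        toy_commit(lock);
    else
        toy_abort(lock);
    return true;
}
```

The first committed drop leaves the lock held, so a second drop can never obtain it; after an aborted drop the lock is free again.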

The cause seems to be that workers ignore SIGTERM under certain
conditions. One of the choke points is GetSubscription, the other
is get_subscription_list. I think we can handle both cases
without LWLocks.

The attached patch does that.
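Stripped of PostgreSQL specifics, the pattern used in ApplyWorkerMain is: try the lock without blocking, and between attempts sleep and check a flag set by the SIGTERM handler, so a worker whose subscription is being dropped can still exit. A minimal sketch, assuming a C11 atomic_flag as a stand-in for ConditionalLockRelationOid() and nanosleep() as a stand-in for pg_usleep(). The names got_sigterm, handle_sigterm(), try_lock() and lock_or_obey_sigterm() are hypothetical, the function returns false where the patch calls proc_exit(0), and max_tries exists only so the sketch can be tested (a negative value retries forever, as the patch does).

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <time.h>

/* Set asynchronously by the SIGTERM handler, like got_SIGTERM in the patch. */
static volatile sig_atomic_t got_sigterm = 0;

static void
handle_sigterm(int signo)
{
    (void) signo;
    got_sigterm = 1;
}

/* Non-blocking acquire: stand-in for ConditionalLockRelationOid(). */
static bool
try_lock(atomic_flag *lock)
{
    /* test_and_set returns the previous value; false means we got the lock */
    return !atomic_flag_test_and_set(lock);
}

/*
 * Returns true once the lock is held, or false if SIGTERM arrived while
 * waiting (where the patch would call proc_exit(0)), or if max_tries
 * non-negative attempts all failed.
 */
static bool
lock_or_obey_sigterm(atomic_flag *lock, long sleep_usec, int max_tries)
{
    struct timespec nap = { sleep_usec / 1000000L,
                            (sleep_usec % 1000000L) * 1000L };

    for (int i = 0; max_tries < 0 || i < max_tries; i++)
    {
        if (try_lock(lock))
            return true;

        nanosleep(&nap, NULL);

        /* Check the flag after every nap so a pending SIGTERM is obeyed. */
        if (got_sigterm)
            return false;
    }
    return false;
}
```

Because the flag is only checked between naps, a pending SIGTERM can go unnoticed for up to about one sleep interval, which is why the choice of that interval matters.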

- heap_close + UnlockRelationOid in get_subscription_list() is equivalent to a single heap_close (or relation_close) with AccessShareLock, but I
chose this form because it looks symmetric with the ConditionalLockRelationOid() call.

- 0.5 seconds for the sleep in ApplyWorkerMain is quite arbitrary. NAPTIME_PER_CYCLE * 1000 could be used instead.

- NULL MySubscription without SIGTERM might not need to be an ERROR.
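On the units in the second point: pg_usleep() takes microseconds, so a millisecond nap time has to be multiplied by 1000 before being passed to it. The sketch below assumes NAPTIME_PER_CYCLE is expressed in milliseconds with a value of 1000 ms; ASSUMED_NAPTIME_PER_CYCLE_MS and msec_to_usec() are hypothetical names used only for illustration.

```c
#include <assert.h>

/* pg_usleep() takes microseconds; a nap time in milliseconds needs * 1000. */
#define USECS_PER_MSEC 1000L

/* Assumption for illustration only: a 1 s nap time expressed in ms. */
#define ASSUMED_NAPTIME_PER_CYCLE_MS 1000L

static long
msec_to_usec(long msec)
{
    return msec * USECS_PER_MSEC;
}
```

Under that assumption the retry interval would grow from the patch's 500000 us (0.5 s) to 1000000 us (1 s), so SIGTERM could take up to about a second to be noticed.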

Any more thoughts?


FYI, I reproduced the problem with the following steps. For me,
this reliably reproduces it without inserting any artificial
delays.

# Create 5 tables with 100000 rows each on the publisher
create table t1 (a int);
...
create table t5 (a int);
insert into t1 (select * from generate_series(0, 99999) a);
...
insert into t5 (select * from generate_series(0, 99999) a);
create publication p1 for table t1, t2, t3, t4, t5;


# On the subscriber: create the same tables, subscribe, wait 1 second, then unsubscribe.
create table t1 (a int);
...
create table t5 (a int);
truncate t1, t2, t3, t4, t5; create subscription s1 CONNECTION 'host=/tmp port=5432 dbname=postgres' publication p1; select pg_sleep(1); drop subscription s1;

The test can be repeated by re-entering the last line.

> >>>> -- Avoid orphaned tablesync worker if apply worker exits before
> >>>> changing its status.
> >>>
> >>
> >> The behavior question I have about this is whether sync workers should
> >> die when the apply worker dies (i.e. they are tied to the apply worker)
> >> or whether they should be tied to the subscription.
> >>
> >> I guess taking down all the sync workers when the apply worker has exited
> >> is easier to solve. Of course it means that if the apply worker restarts
> >> in the middle of table synchronization, the table synchronization will
> >> have to start from scratch. That being said, in normal operation the apply
> >> worker should only exit/restart if the subscription has changed or has been
> >> dropped/disabled, and I think sync workers want to exit/restart in that
> >> situation as well.
> >
> > I agree that sync workers are tied to the apply worker.
> >
> >>
> >> So, for example, having a shmem detach hook for an apply worker (or
> >> reusing the existing one) that searches for all the other workers of the
> >> same subscription and shuts them down as well sounds like a solution to this.
> >
> > Seems like a reasonable solution.
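The detach-hook idea amounts to a collect-then-stop pass over the worker slots, which is roughly what logicalrep_sync_workers_stop() in the attached patch does: gather the relids of this subscription's table sync workers (slots with a valid relid) while holding LogicalRepWorkerLock in shared mode, release the lock, then stop each worker. Below is a self-contained toy version. ToyWorker, the fixed-size workers array, toy_worker_stop() and toy_sync_workers_stop() are hypothetical stand-ins; locking is omitted, and "stopping" simply frees the slot instead of signalling a process and waiting for it to detach.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int Oid;          /* stand-in for PostgreSQL's Oid */
#define InvalidOid ((Oid) 0)
#define TOY_MAX_WORKERS 8          /* stand-in for max_logical_replication_workers */

typedef struct ToyWorker
{
    bool in_use;
    Oid  subid;
    Oid  relid;                    /* InvalidOid for an apply worker */
} ToyWorker;

static ToyWorker workers[TOY_MAX_WORKERS];

/* Stand-in for logicalrep_worker_stop(): frees the matching slot. */
static void
toy_worker_stop(Oid subid, Oid relid)
{
    for (int i = 0; i < TOY_MAX_WORKERS; i++)
    {
        ToyWorker *w = &workers[i];

        if (w->in_use && w->subid == subid && w->relid == relid)
            w->in_use = false;
    }
}

/*
 * Phase 1: collect the relids of this subscription's table sync workers
 * (the real code does this under LogicalRepWorkerLock in shared mode).
 * Phase 2: stop each of them.  Returns how many were stopped.
 */
static int
toy_sync_workers_stop(Oid subid)
{
    Oid relids[TOY_MAX_WORKERS];
    int nrelids = 0;

    for (int i = 0; i < TOY_MAX_WORKERS; i++)
    {
        ToyWorker *w = &workers[i];

        if (w->in_use && w->subid == subid && w->relid != InvalidOid)
            relids[nrelids++] = w->relid;
    }

    for (int i = 0; i < nrelids; i++)
        toy_worker_stop(subid, relids[i]);

    return nrelids;
}
```

Releasing the shared lock before stopping matters in the real code, presumably because logicalrep_worker_stop() takes LogicalRepWorkerLock itself while it waits for the worker to detach.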

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/src/backend/replication/logical/launcher.c
--- b/src/backend/replication/logical/launcher.c
***************
*** 42,47 ****
--- 42,48 ----
  #include "replication/worker_internal.h"
  
  #include "storage/ipc.h"
+ #include "storage/lmgr.h"
  #include "storage/proc.h"
  #include "storage/procarray.h"
  #include "storage/procsignal.h"
***************
*** 116,122 **** get_subscription_list(void)
      StartTransactionCommand();
      (void) GetTransactionSnapshot();
  
!     rel = heap_open(SubscriptionRelationId, AccessShareLock);
      scan = heap_beginscan_catalog(rel, 0, NULL);
  
      while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
--- 117,134 ----
      StartTransactionCommand();
      (void) GetTransactionSnapshot();
  
!     /*
!      * This lock cannot be acquired while subscription commands are updating
!      * the relation. We can safely skip this cycle in that case.
!      */
!     if (!ConditionalLockRelationOid(SubscriptionRelationId, AccessShareLock))
!     {
!         CommitTransactionCommand();
!         return NIL;
!     }
! 
!     rel = heap_open(SubscriptionRelationId, NoLock);
! 
      scan = heap_beginscan_catalog(rel, 0, NULL);
  
      while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
***************
*** 146,152 **** get_subscription_list(void)
      }
  
      heap_endscan(scan);
!     heap_close(rel, AccessShareLock);
  
      CommitTransactionCommand();
  
--- 158,165 ----
      }
  
      heap_endscan(scan);
!     heap_close(rel, NoLock);
!     UnlockRelationOid(SubscriptionRelationId, AccessShareLock);
  
      CommitTransactionCommand();
  
***************
*** 403,410 **** retry:
  }
  
  /*
   * Stop the logical replication worker and wait until it detaches from the
!  * slot.
   */
  void
  logicalrep_worker_stop(Oid subid, Oid relid)
--- 416,468 ----
  }
  
  /*
+  * Stop all table sync workers associated with the given subid.
+  *
+  * This function is called by the apply worker. Since table sync
+  * workers of a subscription are launched only by its apply worker,
+  * we don't need to acquire
+  * LogicalRepLauncherLock here.
+  */
+ void
+ logicalrep_sync_workers_stop(Oid subid)
+ {
+     List *relid_list = NIL;
+     ListCell *cell;
+     int    i;
+ 
+     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ 
+     /*
+      * Walk the workers array and collect the relids of the table sync
+      * workers that belong to the given subscription.
+      */
+     for (i = 0; i < max_logical_replication_workers; i++)
+     {
+         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+ 
+         if (w->in_use && w->subid == subid &&
+             OidIsValid(w->relid))
+             relid_list = lappend_oid(relid_list, w->relid);
+     }
+ 
+     LWLockRelease(LogicalRepWorkerLock);
+ 
+     /* Return if there is no table sync worker associated with myself */
+     if (relid_list == NIL)
+         return;
+ 
+     foreach (cell, relid_list)
+     {
+         Oid    relid = lfirst_oid(cell);
+ 
+         logicalrep_worker_stop(subid, relid);
+     }
+ }
+ 
+ /*
   * Stop the logical replication worker and wait until it detaches from the
!  * slot. This function can be called by both the logical replication launcher
!  * and the apply worker, to stop an apply worker or a table sync worker.
   */
  void
  logicalrep_worker_stop(Oid subid, Oid relid)
***************
*** 570,575 **** logicalrep_worker_attach(int slot)
--- 628,637 ----
  static void
  logicalrep_worker_detach(void)
  {
+     /* If this is an apply worker, stop all of its table sync workers. */
+     if (!am_tablesync_worker())
+         logicalrep_sync_workers_stop(MyLogicalRepWorker->subid);
+ 
      /* Block concurrent access. */
      LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
  
*** a/src/backend/replication/logical/worker.c
--- b/src/backend/replication/logical/worker.c
***************
*** 1455,1468 **** ApplyWorkerMain(Datum main_arg)
      char           *myslotname;
      WalRcvStreamOptions options;
  
-     /* Attach to slot */
-     logicalrep_worker_attach(worker_slot);
- 
      /* Setup signal handling */
      pqsignal(SIGHUP, logicalrep_worker_sighup);
      pqsignal(SIGTERM, logicalrep_worker_sigterm);
      BackgroundWorkerUnblockSignals();
  
      /* Initialise stats to a sanish value */
      MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
          MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
--- 1455,1471 ----
      char           *myslotname;
      WalRcvStreamOptions options;
  
      /* Setup signal handling */
      pqsignal(SIGHUP, logicalrep_worker_sighup);
      pqsignal(SIGTERM, logicalrep_worker_sigterm);
      BackgroundWorkerUnblockSignals();
  
+     /*
+      * Attach to slot. This should be after signal handling setup since
+      * signals may come as soon as attached.
+      */
+     logicalrep_worker_attach(worker_slot);
+ 
      /* Initialise stats to a sanish value */
      MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
          MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
***************
*** 1492,1498 **** ApplyWorkerMain(Datum main_arg)
                                               ALLOCSET_DEFAULT_SIZES);
      StartTransactionCommand();
      oldctx = MemoryContextSwitchTo(ApplyContext);
!     MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
      MySubscriptionValid = true;
      MemoryContextSwitchTo(oldctx);
  
--- 1495,1532 ----
                                               ALLOCSET_DEFAULT_SIZES);
      StartTransactionCommand();
      oldctx = MemoryContextSwitchTo(ApplyContext);
! 
!     /*
!      * Wait until the catalog is available. The subscription for this worker
!      * might already have been dropped; in that case we should receive
!      * SIGTERM, so obey it.
!      */
!     while (!ConditionalLockRelationOid(SubscriptionRelationId, AccessShareLock))
!     {
!         pg_usleep(500 * 1000L); /* 0.5s */
! 
!         /* We have apparently been killed; exit silently. */
!         if (got_SIGTERM)
!             proc_exit(0);
!     }
! 
!     MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
!     UnlockRelationOid(SubscriptionRelationId, AccessShareLock);
! 
!     /* There's a race condition here. Check if MySubscription is valid. */
!     if (MySubscription == NULL)
!     {
!         /* If we got SIGTERM, we have been explicitly killed. */
!         if (got_SIGTERM)
!             proc_exit(0);
! 
!         /* Otherwise something unexpected happened. */
!         ereport(ERROR,
!                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
!                  errmsg("subscription %u for this worker not found",
!                         MyLogicalRepWorker->subid)));
!     }
! 
      MySubscriptionValid = true;
      MemoryContextSwitchTo(oldctx);
  
*** a/src/include/replication/worker_internal.h
--- b/src/include/replication/worker_internal.h
***************
*** 78,83 **** extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
--- 78,84 ----
  extern void logicalrep_worker_stop(Oid subid, Oid relid);
  extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
  extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
  
+ extern void logicalrep_sync_workers_stop(Oid subid);
  extern int logicalrep_sync_worker_count(Oid subid);
