Re: Logical Replication - behavior of ALTER PUBLICATION .. DROP TABLE and ALTER SUBSCRIPTION .. REFRESH PUBLICATION - Mailing list pgsql-hackers

From Bharath Rupireddy
Subject Re: Logical Replication - behavior of ALTER PUBLICATION .. DROP TABLE and ALTER SUBSCRIPTION .. REFRESH PUBLICATION
Date
Msg-id CALj2ACXtSDu8sFwPVHzd89nTBo1tr2ZwQU6Asf_zDhqZ5oYE2g@mail.gmail.com
Whole thread Raw
In response to Re: Logical Replication - behavior of ALTER PUBLICATION .. DROP TABLE and ALTER SUBSCRIPTION .. REFRESH PUBLICATION  (Amit Kapila <amit.kapila16@gmail.com>)
Responses Re: Logical Replication - behavior of ALTER PUBLICATION .. DROP TABLE and ALTER SUBSCRIPTION .. REFRESH PUBLICATION  (Amit Kapila <amit.kapila16@gmail.com>)
List pgsql-hackers
On Tue, Jan 12, 2021 at 12:06 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > Here's my analysis:
> > 1) in the publisher, alter publication drop table successfully
> > removes(PublicationDropTables) the table from the catalogue
> > pg_publication_rel
> > 2) in the subscriber, alter subscription refresh publication
> > successfully removes the table from the catalogue pg_subscription_rel
> > (AlterSubscription_refresh->RemoveSubscriptionRel)
> > so far so good
> >
>
> Here, it should register the worker to stop on commit, and then on
> commit it should call AtEOXact_ApplyLauncher to stop the apply worker.
> Once the apply worker is stopped, the corresponding WALSender will
> also be stopped. Something here is not happening as per expected
> behavior.

On the subscriber, an entry for worker stop is created in AlterSubscription_refresh --> logicalrep_worker_stop_at_commit. At the end of txn, in AtEOXact_ApplyLauncher, we try to stop that worker, but it cannot be stopped because logicalrep_worker_find returns null (AtEOXact_ApplyLauncher --> logicalrep_worker_stop --> logicalrep_worker_find). The worker entry for that subscriber is having relid as 0 [1], due to which the following if condition will not be hit. The apply worker on the subscriber related to the subscription on which refresh publication was run is not closed. It looks like relid 0 is valid because it will be applicable only during the table sync phase, the comment in the LogicalRepWorker structure says that.

And also, I think, expecting the apply worker to be closed this way doesn't make sense because the apply worker is a per-subscription base, and the subscription can have other tables too.

    /* Search for attached worker for a given subscription id. */
    for (i = 0; i < max_logical_replication_workers; i++)
    {
        LogicalRepWorker *w = &LogicalRepCtx->workers[i];

        if (w->in_use && w->subid == subid && w->relid == relid &&
            (!only_running || w->proc))
        {
            res = w;
            break;
        }
    }

[1]
(gdb) p subid
$5 = 16391
(gdb) p relid
$6 = 16388
(gdb) p *w
$4 = {launch_time = 663760343317760, in_use = true, generation = 1, proc = 0x7fdfd9a7cc90,
  dbid = 12872, userid = 10, subid = 16391, relid = 0, relstate = 0 '\000', relstate_lsn = 0,
  relmutex = 0 '\000', last_lsn = 22798424, last_send_time = 663760483945980,
  last_recv_time = 663760483946087, reply_lsn = 22798424, reply_time = 663760483945980}

postgres=# select * from pg_stat_get_subscription(16391);
 subid | relid |  pid   | received_lsn |        last_msg_send_time        |      last_msg_receipt_time       | latest_end_lsn |         latest_end_time          
-------+-------+--------+--------------+----------------------------------+----------------------------------+----------------+----------------------------------
 16391 |       | 466779 | 0/15BE140    | 2021-01-12 15:26:48.778813+05:30 | 2021-01-12 15:26:48.778878+05:30 | 0/15BE140      | 2021-01-12 15:26:48.778813+05:30
(1 row)

> > 3) after the insertion into the table in the publisher(remember that
> > it's dropped from the publication in (1)), the walsender process is
> > unable detect that the table has been dropped from the publication
> > i.e. it doesn't look at the pg_publication_rel catalogue or some
> > other, but it only does is_publishable_relation() check which returns
> > true in pgoutput_change(). Maybe the walsender should look at the
> > catalogue pg_publication_rel in is_publishable_relation()?
> >
>
> We must be somewhere checking pg_publication_rel before sending the
> decoded change because otherwise, we would have sent the changes for
> the table which are not even part of this publication. I think you can
> try to create a separate table that is not part of the publication
> under test and see how the changes for that are filtered.

As pointed out by japin in the next thread, the walsender process in the publisher uses RelationSyncCache to hold the relations to which the insertions need to be sent to the subscriber. The RelationSyncCache gets created during startup of the walsender process(pgoutput_startup->init_rel_sync_cache) and also rel_sync_cache_publication_cb callback gets registered. So, if any alters happen to the pg_publication_rel catalog table, the callback rel_sync_cache_publication_cb is called, all the entries in the RelationSyncCache are marked as invalid, with the expectation that on the next use of any cache entry in get_rel_sync_entry (pgoutput_change->get_rel_sync_entry), that entry is validated again.

In the use case, the invalidation happens as expected in rel_sync_cache_publication_cb and while revalidating the entry in get_rel_sync_entry, since there is only one publication to which the given relation is attached to, the pubids will be null and we don't set the entry->pubactions.pubinsert/update/delete/truncate to false, so the publisher keeps publishing the inserts to the relation.

/* Validate the entry */
if (!entry->replicate_valid)
{
List   *pubids = GetRelationPublications(relid);

But, we cannot right away set to entry->pubactions.pubinsert/update/delete/truncate to false, when there are no publications attached to the given relation, because in that case, we don't know whether the user has run the alter subscription...refresh publication; on the subscriber.

If we want to achieve the drop table behaviour what's stated in the document (i.e. after publisher drops a table from the publication, the subscriber keeps receiving the data until refresh publication is run on it), the right place to set those false i.e. tell the walsender process not to publish the insertions further, is if there is a way in the publisher we could know that the user has run the alter subscription...refresh publication on the subscriber. But I don't see currently, the subscriber informing back to the publisher after AlterSubscription_refresh().

Thoughts?

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com

pgsql-hackers by date:

Previous
From: Li Japin
Date:
Subject: Re: Logical Replication - behavior of ALTER PUBLICATION .. DROP TABLE and ALTER SUBSCRIPTION .. REFRESH PUBLICATION
Next
From: Bharath Rupireddy
Date:
Subject: Re: Logical Replication - behavior of ALTER PUBLICATION .. DROP TABLE and ALTER SUBSCRIPTION .. REFRESH PUBLICATION