Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers

From: Drouvot, Bertrand
Subject: Re: Minimal logical decoding on standbys
Msg-id: a1975e6c-df78-a24f-232f-c6386d38130f@amazon.com
In response to: Re: Minimal logical decoding on standbys (Andres Freund <andres@anarazel.de>)
List: pgsql-hackers

Hi Andres,

On 4/6/21 8:02 PM, Andres Freund wrote:
> Hi,
>
> On 2021-04-06 14:30:29 +0200, Drouvot, Bertrand wrote:
>>  From 827295f74aff9c627ee722f541a6c7cc6d4133cf Mon Sep 17 00:00:00 2001
>> From: bdrouvotAWS <bdrouvot@amazon.com>
>> Date: Tue, 6 Apr 2021 11:59:23 +0000
>> Subject: [PATCH v15 1/5] Allow logical decoding on standby.
>>
>> Allow a logical slot to be created on standby. Restrict its usage
>> or its creation if wal_level on primary is less than logical.
>> During slot creation, its restart_lsn is set to the last replayed
>> LSN. Effectively, a logical slot creation on standby waits for an
>> xl_running_xact record to arrive from primary. Conflicting slots
>> would be handled in next commits.
>>
>> Andres Freund and Amit Khandekar.
> I think more people have worked on this by now...
>
> Does this strike you as an accurate description?
>
> Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
> Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas
>
>> --- a/src/backend/replication/logical/logical.c
>> +++ b/src/backend/replication/logical/logical.c
>> @@ -119,23 +119,22 @@ CheckLogicalDecodingRequirements(void)
>>                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>>                                 errmsg("logical decoding requires a database connection")));
>>
>> -     /* ----
>> -      * TODO: We got to change that someday soon...
>> -      *
>> -      * There's basically three things missing to allow this:
>> -      * 1) We need to be able to correctly and quickly identify the timeline a
>> -      *        LSN belongs to
>> -      * 2) We need to force hot_standby_feedback to be enabled at all times so
>> -      *        the primary cannot remove rows we need.
>> -      * 3) support dropping replication slots referring to a database, in
>> -      *        dbase_redo. There can't be any active ones due to HS recovery
>> -      *        conflicts, so that should be relatively easy.
>> -      * ----
>> -      */
>>        if (RecoveryInProgress())
>> -             ereport(ERROR,
>> -                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
>> -                              errmsg("logical decoding cannot be used while in recovery")));
> Maybe I am just missing something right now, and maybe I'm being a bit
> overly pedantic, but I don't immediately see how 0001 is correct without
> 0002 and 0003? I think it'd be better to first introduce the conflict
> information, then check for conflicts, and only after that allow
> decoding on standbys?
>
>
>> diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
>> index 6f8810e149..6a21cba362 100644
>> --- a/src/backend/access/transam/xlog.c
>> +++ b/src/backend/access/transam/xlog.c
>> @@ -5080,6 +5080,17 @@ LocalProcessControlFile(bool reset)
>>        ReadControlFile();
>>   }
>>
>> +/*
>> + * Get the wal_level from the control file. For a standby, this value should be
>> + * considered as its active wal_level, because it may be different from what
>> + * was originally configured on standby.
>> + */
>> +WalLevel
>> +GetActiveWalLevel(void)
>> +{
>> +     return ControlFile->wal_level;
>> +}
>> +
> This strikes me as error-prone - there's nothing in the function name
> that this should mainly (only?) be used during recovery...
>
>
>> +             if (SlotIsPhysical(slot))
>> +                     restart_lsn = GetRedoRecPtr();
>> +             else if (RecoveryInProgress())
>> +             {
>> +                     restart_lsn = GetXLogReplayRecPtr(NULL);
>> +                     /*
>> +                      * Replay pointer may point one past the end of the record. If that
>> +                      * is a XLOG page boundary, it will not be a valid LSN for the
>> +                      * start of a record, so bump it up past the page header.
>> +                      */
>> +                     if (!XRecOffIsValid(restart_lsn))
>> +                     {
>> +                             if (restart_lsn % XLOG_BLCKSZ != 0)
>> +                                     elog(ERROR, "invalid replay pointer");
>> +
>> +                             /* For the first page of a segment file, it's a long header */
>> +                             if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0)
>> +                                     restart_lsn += SizeOfXLogLongPHD;
>> +                             else
>> +                                     restart_lsn += SizeOfXLogShortPHD;
>> +                     }
>> +             }
> This seems like a layering violation to me. I don't think stuff like
> this should be outside of xlog[reader].c, and definitely not in
> ReplicationSlotReserveWal().
>
> Relevant discussion (which totally escaped my mind):
> https://postgr.es/m/CAJ3gD9csOr0LoYoMK9NnfBk0RZmvHXcJAFWFd2EuL%3DNOfz7PVA%40mail.gmail.com
>
>
>> +             else
>> +                     restart_lsn = GetXLogInsertRecPtr();
>> +
>> +             SpinLockAcquire(&slot->mutex);
>> +             slot->data.restart_lsn = restart_lsn;
>> +             SpinLockRelease(&slot->mutex);
>> +
>>                if (!RecoveryInProgress() && SlotIsLogical(slot))
>>                {
>>                        XLogRecPtr      flushptr;
>>
>> -                     /* start at current insert position */
>> -                     restart_lsn = GetXLogInsertRecPtr();
>> -                     SpinLockAcquire(&slot->mutex);
>> -                     slot->data.restart_lsn = restart_lsn;
>> -                     SpinLockRelease(&slot->mutex);
>> -
>>                        /* make sure we have enough information to start */
>>                        flushptr = LogStandbySnapshot();
>>
>>                        /* and make sure it's fsynced to disk */
>>                        XLogFlush(flushptr);
>>                }
>> -             else
>> -             {
>> -                     restart_lsn = GetRedoRecPtr();
>> -                     SpinLockAcquire(&slot->mutex);
>> -                     slot->data.restart_lsn = restart_lsn;
>> -                     SpinLockRelease(&slot->mutex);
>> -             }
>>
>>                /* prevent WAL removal as fast as possible */
>>                ReplicationSlotsComputeRequiredLSN();
> I think I'd move the LogStandbySnapshot() piece out of the entire
> loop. There's no reason for logging multiple ones if we then just end up
> failing because of the XLogGetLastRemovedSegno() check.
>
>
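To make the suggestion above concrete, here is a rough standalone simulation of the control flow with the snapshot logged once, after the retry loop has settled on a restart_lsn. Everything in it is a stub: stub_current_lsn(), stub_wal_still_available() and stub_log_standby_snapshot() are hypothetical stand-ins for the real server calls, and the loop is only assumed to retry when the reserved WAL was removed concurrently.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

/* Stub state standing in for the server; all names here are hypothetical. */
static int	snapshots_logged = 0;
static int	pending_removals = 2;	/* retries forced by concurrent WAL removal */
static XLogRecPtr next_lsn = 1000;

/* Stand-in for reading the current insert/replay/redo position. */
static XLogRecPtr
stub_current_lsn(void)
{
	next_lsn += 100;
	return next_lsn;
}

/* Returns false while the segment holding lsn was "removed" concurrently. */
static bool
stub_wal_still_available(XLogRecPtr lsn)
{
	(void) lsn;
	if (pending_removals > 0)
	{
		pending_removals--;
		return false;
	}
	return true;
}

/* Stand-in for LogStandbySnapshot() followed by XLogFlush(). */
static void
stub_log_standby_snapshot(void)
{
	snapshots_logged++;
}

/* Reserve WAL first, retrying as needed; log the snapshot only once. */
static XLogRecPtr
reserve_wal_sketch(bool slot_is_logical, bool in_recovery)
{
	XLogRecPtr	restart_lsn;

	for (;;)
	{
		restart_lsn = stub_current_lsn();
		if (stub_wal_still_available(restart_lsn))
			break;				/* reservation held, stop retrying */
	}

	/* Outside the loop: retries no longer produce extra snapshots. */
	if (slot_is_logical && !in_recovery)
	{
		assert(restart_lsn != 0);
		stub_log_standby_snapshot();
	}
	return restart_lsn;
}
```

In this simulation two iterations fail before the reservation holds, yet only one snapshot is logged, and none at all when the slot is created during recovery.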
>> diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
>> index 178d49710a..6c4c26c2fe 100644
>> --- a/src/include/access/heapam_xlog.h
>> +++ b/src/include/access/heapam_xlog.h
>> @@ -239,6 +239,7 @@ typedef struct xl_heap_update
>>    */
>>   typedef struct xl_heap_clean
>>   {
>> +     bool            onCatalogTable;
>>        TransactionId latestRemovedXid;
>>        uint16          nredirected;
>>        uint16          ndead;
>> @@ -254,6 +255,7 @@ typedef struct xl_heap_clean
>>    */
>>   typedef struct xl_heap_cleanup_info
>>   {
>> +     bool            onCatalogTable;
>>        RelFileNode node;
>>        TransactionId latestRemovedXid;
>>   } xl_heap_cleanup_info;
>> @@ -334,6 +336,7 @@ typedef struct xl_heap_freeze_tuple
>>    */
>>   typedef struct xl_heap_freeze_page
>>   {
>> +     bool            onCatalogTable;
>>        TransactionId cutoff_xid;
>>        uint16          ntuples;
>>   } xl_heap_freeze_page;
>> @@ -348,6 +351,7 @@ typedef struct xl_heap_freeze_page
>>    */
>>   typedef struct xl_heap_visible
>>   {
>> +     bool            onCatalogTable;
>>        TransactionId cutoff_xid;
>>        uint8           flags;
>>   } xl_heap_visible;
> Reminder to self: This needs a WAL version bump.
>
>> diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
>> index 9a3a03e520..3405070d63 100644
>> --- a/src/include/utils/rel.h
>> +++ b/src/include/utils/rel.h
>> @@ -16,6 +16,7 @@
>>
>>   #include "access/tupdesc.h"
>>   #include "access/xlog.h"
>> +#include "catalog/catalog.h"
>>   #include "catalog/pg_class.h"
>>   #include "catalog/pg_index.h"
>>   #include "catalog/pg_publication.h"
> Not clear why this is in this patch?
>
>
>
>> diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
>> index 5ba776e789..03c5dbea48 100644
>> --- a/src/backend/postmaster/pgstat.c
>> +++ b/src/backend/postmaster/pgstat.c
>> @@ -2928,6 +2928,24 @@ pgstat_send_archiver(const char *xlog, bool failed)
>>        pgstat_send(&msg, sizeof(msg));
>>   }
>>
>> +/* ----------
>> + * pgstat_send_droplogicalslot() -
>> + *
>> + *   Tell the collector about a logical slot being dropped
>> + *   due to conflict.
>> + * ----------
>> + */
>> +void
>> +pgstat_send_droplogicalslot(Oid dbOid)
>> +{
>> +     PgStat_MsgRecoveryConflict msg;
>> +
>> +     pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYCONFLICT);
>> +     msg.m_databaseid = dbOid;
>> +     msg.m_reason = PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT;
>> +     pgstat_send(&msg, sizeof(msg));
>> +}
> Why do we have this in addition to pgstat_report_replslot_drop()? ISTM
> that we should instead add a reason parameter to
> pgstat_report_replslot_drop()?
>
>
>> +/*
>> + * Resolve recovery conflicts with logical slots.
>> + *
>> + * When xid is valid, it means that rows older than xid might have been
>> + * removed.
> I don't think the past tense is correct - the rows better not be removed
> yet on the standby, otherwise we'd potentially do something random in
> decoding.
>
>
>> diff --git a/src/test/recovery/t/024_standby_logical_decoding_xmins.pl b/src/test/recovery/t/024_standby_logical_decoding_xmins.pl
>> new file mode 100644
>> index 0000000000..d654d79526
>> --- /dev/null
>> +++ b/src/test/recovery/t/024_standby_logical_decoding_xmins.pl
>> @@ -0,0 +1,272 @@
>> +# logical decoding on a standby : ensure xmins are appropriately updated
>> +
>> +use strict;
>> +use warnings;
>> +
>> +use PostgresNode;
>> +use TestLib;
>> +use Test::More tests => 23;
>> +use RecursiveCopy;
>> +use File::Copy;
>> +use Time::HiRes qw(usleep);
> Several of these don't actually seem to be used?
>
>
>> +########################
>> +# Initialize master node
>> +########################
> (I'll rename these to primary/replica)
>
>
>> +$node_master->init(allows_streaming => 1, has_archiving => 1);
>> +$node_master->append_conf('postgresql.conf', q{
>> +wal_level = 'logical'
>> +max_replication_slots = 4
>> +max_wal_senders = 4
>> +log_min_messages = 'debug2'
>> +log_error_verbosity = verbose
>> +# very promptly terminate conflicting backends
>> +max_standby_streaming_delay = '2s'
>> +});
> Why is this done on the primary, rather than on the standby?
>
>
>> +################################
>> +# Catalog xmins should advance after standby logical slot fetches the changes.
>> +################################
>> +
>> +# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
>> +# we hold down xmin.
> I don't know what that means.
>
>
>> +$node_master->safe_psql('postgres', qq[CREATE TABLE catalog_increase_1();]);
>> +$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)');
>> +for my $i (0 .. 2000)
>> +{
>> +    $node_master->safe_psql('postgres', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
>> +}
> Forking 2000 psql processes is pretty expensive, especially on slower
> machines. What is this supposed to test?
>
>
>> +($ret, $stdout, $stderr) = $node_standby->psql('postgres',
>> +     qq[SELECT data FROM pg_logical_slot_get_changes('$standby_slotname', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts','1', 'include-timestamp', '0')]);
>> +is($ret, 0, 'replay of big series succeeded');
>> +isnt($stdout, '', 'replayed some rows');
> Nothing is being replayed...
>
>
>
>> +######################
>> +# Upstream oldestXid should not go past downstream catalog_xmin
>> +######################
>> +
>> +# First burn some xids on the master in another DB, so we push the master's
>> +# nextXid ahead.
>> +foreach my $i (1 .. 100)
>> +{
>> +     $node_master->safe_psql('postgres', 'SELECT txid_current()');
>> +}
>> +
>> +# Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
>> +# past our needed xmin. The only way we have visibility into that is to force
>> +# a checkpoint.
>> +$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
>> +foreach my $dbname ('template1', 'postgres', 'postgres', 'template0')
>> +{
>> +     $node_master->safe_psql($dbname, 'VACUUM FREEZE');
>> +}
>> +$node_master->safe_psql('postgres', 'CHECKPOINT');
>> +IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
>> +     or die "pg_controldata failed with $?";
>> +my @checkpoint = split('\n', $stdout);
>> +my $oldestXid = '';
>> +foreach my $line (@checkpoint)
>> +{
>> +     if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
>> +     {
>> +             $oldestXid = $1;
>> +     }
>> +}
>> +die 'no oldestXID found in checkpoint' unless $oldestXid;
>> +
>> +cmp_ok($oldestXid, "<=", $node_standby->slot($standby_slotname)->{'catalog_xmin'},
>> +        'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
>> +
>> +$node_master->safe_psql('postgres',
>> +     "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
>> +
> I am thinking of removing this test. It doesn't seem to test anything
> really related to the issue at hand, and seems complicated (needing to
> update datallowconn, manually triggering checkpoints, parsing
> pg_controldata output).
>
>
>> +# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
>> +# given boolean condition to be true to ensure we've reached a quiescent state
>> +sub wait_for_xmins
>> +{
>> +     my ($node, $slotname, $check_expr) = @_;
>> +
>> +     $node->poll_query_until(
>> +             'postgres', qq[
>> +             SELECT $check_expr
>> +             FROM pg_catalog.pg_replication_slots
>> +             WHERE slot_name = '$slotname';
>> +     ]) or die "Timed out waiting for slot xmins to advance";
>> +}
>> +
>> +# Verify that pg_stat_database_conflicts.confl_logicalslot has been updated
>> +sub check_confl_logicalslot
>> +{
>> +     ok( $node_standby->poll_query_until(
>> +             'postgres',
>> +             "select (confl_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
>> +             'confl_logicalslot updated') or die "Timed out waiting confl_logicalslot to be updated";
>> +}
>> +
> Given that this hardcodes a specific number of conflicting slots etc,
> there doesn't seem much point in making this a function...
>
>
>> +# Acquire one of the standby logical slots created by create_logical_slots()
>> +sub make_slot_active
>> +{
>> +     my $slot_user_handle;
>> +
>> +     # make sure activeslot is in use
>> +     print "starting pg_recvlogical\n";
>> +     $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node_standby->connstr('testdb'), '-S', 'activeslot','-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
>> +
>> +     while (!$node_standby->slot('activeslot')->{'active_pid'})
>> +     {
>> +             usleep(100_000);
>> +             print "waiting for slot to become active\n";
>> +     }
>> +     return $slot_user_handle;
>> +}
> It's a bad idea to not have timeouts in things like this - if there's a
> problem, it'll lead to the test never returning. Things like
> poll_query_until() have timeouts to deal with this, but this doesn't.
>
>
>> +# Check if all the slots on standby are dropped. These include the 'activeslot'
>> +# that was acquired by make_slot_active(), and the non-active 'dropslot'.
>> +sub check_slots_dropped
>> +{
>> +     my ($slot_user_handle) = @_;
>> +     my $return;
>> +
>> +     is($node_standby->slot('dropslot')->{'slot_type'}, '', 'dropslot on standby dropped');
>> +     is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
>> +
>> +     # our client should've terminated in response to the walsender error
>> +     eval {
>> +             $slot_user_handle->finish;
>> +     };
>> +     $return = $?;
>> +     cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero\n");
>> +     if ($return) {
>> +             like($stderr, qr/conflict with recovery/, 'recvlogical recovery conflict');
>> +             like($stderr, qr/must be dropped/, 'recvlogical error detail');
>> +     }
> Why do we need to use eval{} for things like checking if a program
> finished?
>
>
>> @@ -297,6 +297,24 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
>>        may consume changes from a slot at any given time.
>>       </para>
>>
>> +    <para>
>> +     A logical replication slot can also be created on a hot standby. To prevent
>> +     <command>VACUUM</command> from removing required rows from the system
>> +     catalogs, <varname>hot_standby_feedback</varname> should be set on the
>> +     standby. In spite of that, if any required rows get removed, the slot gets
>> +     dropped. Existing logical slots on standby also get dropped if wal_level
>> +     on primary is reduced to less than 'logical'.
>> +    </para>
> I think this should add that it's very advisable to use a physical slot
> between primary and standby. Otherwise hot_standby_feedback will work,
> but only while the connection is alive - as soon as it breaks, a node
> gets restarted, ...
>
> Greetings,
>
> Andres Freund

Thanks for your feedback! I'll look at it.

Before that, I am sharing v16 (a rebase of v15, needed due to commit
8523492d4e).

Bertrand

