Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers
From | Drouvot, Bertrand |
---|---|
Subject | Re: Minimal logical decoding on standbys |
Date | |
Msg-id | a1975e6c-df78-a24f-232f-c6386d38130f@amazon.com |
In response to | Re: Minimal logical decoding on standbys (Andres Freund <andres@anarazel.de>) |
List | pgsql-hackers |
Hi Andres,

On 4/6/21 8:02 PM, Andres Freund wrote:
> Hi,
>
> On 2021-04-06 14:30:29 +0200, Drouvot, Bertrand wrote:
>> From 827295f74aff9c627ee722f541a6c7cc6d4133cf Mon Sep 17 00:00:00 2001
>> From: bdrouvotAWS <bdrouvot@amazon.com>
>> Date: Tue, 6 Apr 2021 11:59:23 +0000
>> Subject: [PATCH v15 1/5] Allow logical decoding on standby.
>>
>> Allow a logical slot to be created on standby. Restrict its usage
>> or its creation if wal_level on primary is less than logical.
>> During slot creation, it's restart_lsn is set to the last replayed
>> LSN. Effectively, a logical slot creation on standby waits for an
>> xl_running_xact record to arrive from primary. Conflicting slots
>> would be handled in next commits.
>>
>> Andres Freund and Amit Khandekar.
> I think more people have worked on this by now...
>
> Does this strike you as an accurate description?
>
> Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
> Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas
>
>> --- a/src/backend/replication/logical/logical.c
>> +++ b/src/backend/replication/logical/logical.c
>> @@ -119,23 +119,22 @@ CheckLogicalDecodingRequirements(void)
>>                  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>>                   errmsg("logical decoding requires a database connection")));
>>
>> -    /* ----
>> -     * TODO: We got to change that someday soon...
>> -     *
>> -     * There's basically three things missing to allow this:
>> -     * 1) We need to be able to correctly and quickly identify the timeline a
>> -     *    LSN belongs to
>> -     * 2) We need to force hot_standby_feedback to be enabled at all times so
>> -     *    the primary cannot remove rows we need.
>> -     * 3) support dropping replication slots referring to a database, in
>> -     *    dbase_redo. There can't be any active ones due to HS recovery
>> -     *    conflicts, so that should be relatively easy.
>> -     * ----
>> -     */
>>      if (RecoveryInProgress())
>> -        ereport(ERROR,
>> -                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
>> -                 errmsg("logical decoding cannot be used while in recovery")));
> Maybe I am just missing something right now, and maybe I'm being a bit
> overly pedantic, but I don't immediately see how 0001 is correct without
> 0002 and 0003? I think it'd be better to first introduce the conflict
> information, then check for conflicts, and only after that allow
> decoding on standbys?
>
>
>> diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
>> index 6f8810e149..6a21cba362 100644
>> --- a/src/backend/access/transam/xlog.c
>> +++ b/src/backend/access/transam/xlog.c
>> @@ -5080,6 +5080,17 @@ LocalProcessControlFile(bool reset)
>>      ReadControlFile();
>>  }
>>
>> +/*
>> + * Get the wal_level from the control file. For a standby, this value should be
>> + * considered as its active wal_level, because it may be different from what
>> + * was originally configured on standby.
>> + */
>> +WalLevel
>> +GetActiveWalLevel(void)
>> +{
>> +    return ControlFile->wal_level;
>> +}
>> +
> This strikes me as error-prone - there's nothing in the function name
> that this should mainly (only?) be used during recovery...
>
>
>> +        if (SlotIsPhysical(slot))
>> +            restart_lsn = GetRedoRecPtr();
>> +        else if (RecoveryInProgress())
>> +        {
>> +            restart_lsn = GetXLogReplayRecPtr(NULL);
>> +            /*
>> +             * Replay pointer may point one past the end of the record. If that
>> +             * is a XLOG page boundary, it will not be a valid LSN for the
>> +             * start of a record, so bump it up past the page header.
>> +             */
>> +            if (!XRecOffIsValid(restart_lsn))
>> +            {
>> +                if (restart_lsn % XLOG_BLCKSZ != 0)
>> +                    elog(ERROR, "invalid replay pointer");
>> +
>> +                /* For the first page of a segment file, it's a long header */
>> +                if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0)
>> +                    restart_lsn += SizeOfXLogLongPHD;
>> +                else
>> +                    restart_lsn += SizeOfXLogShortPHD;
>> +            }
>> +        }
> This seems like a layering violation to me. I don't think stuff like
> this should be outside of xlog[reader].c, and definitely not in
> ReplicationSlotReserveWal().
>
> Relevant discussion (which totally escaped my mind):
> https://postgr.es/m/CAJ3gD9csOr0LoYoMK9NnfBk0RZmvHXcJAFWFd2EuL%3DNOfz7PVA%40mail.gmail.com
>
>
>> +        else
>> +            restart_lsn = GetXLogInsertRecPtr();
>> +
>> +        SpinLockAcquire(&slot->mutex);
>> +        slot->data.restart_lsn = restart_lsn;
>> +        SpinLockRelease(&slot->mutex);
>> +
>>          if (!RecoveryInProgress() && SlotIsLogical(slot))
>>          {
>>              XLogRecPtr    flushptr;
>>
>> -            /* start at current insert position */
>> -            restart_lsn = GetXLogInsertRecPtr();
>> -            SpinLockAcquire(&slot->mutex);
>> -            slot->data.restart_lsn = restart_lsn;
>> -            SpinLockRelease(&slot->mutex);
>> -
>>              /* make sure we have enough information to start */
>>              flushptr = LogStandbySnapshot();
>>
>>              /* and make sure it's fsynced to disk */
>>              XLogFlush(flushptr);
>>          }
>> -        else
>> -        {
>> -            restart_lsn = GetRedoRecPtr();
>> -            SpinLockAcquire(&slot->mutex);
>> -            slot->data.restart_lsn = restart_lsn;
>> -            SpinLockRelease(&slot->mutex);
>> -        }
>>
>>          /* prevent WAL removal as fast as possible */
>>          ReplicationSlotsComputeRequiredLSN();
> I think I'd move the LogStandbySnapshot() piece out of the entire
> loop. There's no reason for logging multiple ones if we then just end up
> failing because of the XLogGetLastRemovedSegno() check.
>
>
>> diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
>> index 178d49710a..6c4c26c2fe 100644
>> --- a/src/include/access/heapam_xlog.h
>> +++ b/src/include/access/heapam_xlog.h
>> @@ -239,6 +239,7 @@ typedef struct xl_heap_update
>>   */
>>  typedef struct xl_heap_clean
>>  {
>> +    bool        onCatalogTable;
>>      TransactionId latestRemovedXid;
>>      uint16      nredirected;
>>      uint16      ndead;
>> @@ -254,6 +255,7 @@ typedef struct xl_heap_clean
>>   */
>>  typedef struct xl_heap_cleanup_info
>>  {
>> +    bool        onCatalogTable;
>>      RelFileNode node;
>>      TransactionId latestRemovedXid;
>>  } xl_heap_cleanup_info;
>> @@ -334,6 +336,7 @@ typedef struct xl_heap_freeze_tuple
>>   */
>>  typedef struct xl_heap_freeze_page
>>  {
>> +    bool        onCatalogTable;
>>      TransactionId cutoff_xid;
>>      uint16      ntuples;
>>  } xl_heap_freeze_page;
>> @@ -348,6 +351,7 @@ typedef struct xl_heap_freeze_page
>>   */
>>  typedef struct xl_heap_visible
>>  {
>> +    bool        onCatalogTable;
>>      TransactionId cutoff_xid;
>>      uint8       flags;
>>  } xl_heap_visible;
> Reminder to self: This needs a WAL version bump.
>
>> diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
>> index 9a3a03e520..3405070d63 100644
>> --- a/src/include/utils/rel.h
>> +++ b/src/include/utils/rel.h
>> @@ -16,6 +16,7 @@
>>
>>  #include "access/tupdesc.h"
>>  #include "access/xlog.h"
>> +#include "catalog/catalog.h"
>>  #include "catalog/pg_class.h"
>>  #include "catalog/pg_index.h"
>>  #include "catalog/pg_publication.h"
> Not clear why this is in this patch?
>
>
>
>> diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
>> index 5ba776e789..03c5dbea48 100644
>> --- a/src/backend/postmaster/pgstat.c
>> +++ b/src/backend/postmaster/pgstat.c
>> @@ -2928,6 +2928,24 @@ pgstat_send_archiver(const char *xlog, bool failed)
>>      pgstat_send(&msg, sizeof(msg));
>>  }
>>
>> +/* ----------
>> + * pgstat_send_droplogicalslot() -
>> + *
>> + *    Tell the collector about a logical slot being dropped
>> + *    due to conflict.
>> + * ----------
>> + */
>> +void
>> +pgstat_send_droplogicalslot(Oid dbOid)
>> +{
>> +    PgStat_MsgRecoveryConflict msg;
>> +
>> +    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYCONFLICT);
>> +    msg.m_databaseid = dbOid;
>> +    msg.m_reason = PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT;
>> +    pgstat_send(&msg, sizeof(msg));
>> +}
> Why do we have this in addition to pgstat_report_replslot_drop()? ISTM
> that we should instead add a reason parameter to
> pgstat_report_replslot_drop()?
>
>
>> +/*
>> + * Resolve recovery conflicts with logical slots.
>> + *
>> + * When xid is valid, it means that rows older than xid might have been
>> + * removed.
> I don't think the past tense is correct - the rows better not be removed
> yet on the standby, otherwise we'd potentially do something random in
> decoding.
>
>
>> diff --git a/src/test/recovery/t/024_standby_logical_decoding_xmins.pl b/src/test/recovery/t/024_standby_logical_decoding_xmins.pl
>> new file mode 100644
>> index 0000000000..d654d79526
>> --- /dev/null
>> +++ b/src/test/recovery/t/024_standby_logical_decoding_xmins.pl
>> @@ -0,0 +1,272 @@
>> +# logical decoding on a standby : ensure xmins are appropriately updated
>> +
>> +use strict;
>> +use warnings;
>> +
>> +use PostgresNode;
>> +use TestLib;
>> +use Test::More tests => 23;
>> +use RecursiveCopy;
>> +use File::Copy;
>> +use Time::HiRes qw(usleep);
> Several of these don't actually seem to be used?
>
>
>> +########################
>> +# Initialize master node
>> +########################
> (I'll rename these to primary/replica)
>
>
>> +$node_master->init(allows_streaming => 1, has_archiving => 1);
>> +$node_master->append_conf('postgresql.conf', q{
>> +wal_level = 'logical'
>> +max_replication_slots = 4
>> +max_wal_senders = 4
>> +log_min_messages = 'debug2'
>> +log_error_verbosity = verbose
>> +# very promptly terminate conflicting backends
>> +max_standby_streaming_delay = '2s'
>> +});
> Why is this done on the primary, rather than on the standby?
>
>
>> +################################
>> +# Catalog xmins should advance after standby logical slot fetches the changes.
>> +################################
>> +
>> +# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
>> +# we hold down xmin.
> I don't know what that means.
>
>
>> +$node_master->safe_psql('postgres', qq[CREATE TABLE catalog_increase_1();]);
>> +$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)');
>> +for my $i (0 .. 2000)
>> +{
>> +    $node_master->safe_psql('postgres', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
>> +}
> Forking 2000 psql processes is pretty expensive, especially on slower
> machines. What is this supposed to test?
>
>
>> +($ret, $stdout, $stderr) = $node_standby->psql('postgres',
>> +    qq[SELECT data FROM pg_logical_slot_get_changes('$standby_slotname', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts','1', 'include-timestamp', '0')]);
>> +is($ret, 0, 'replay of big series succeeded');
>> +isnt($stdout, '', 'replayed some rows');
> Nothing is being replayed...
>
>
>
>> +######################
>> +# Upstream oldestXid should not go past downstream catalog_xmin
>> +######################
>> +
>> +# First burn some xids on the master in another DB, so we push the master's
>> +# nextXid ahead.
>> +foreach my $i (1 .. 100)
>> +{
>> +    $node_master->safe_psql('postgres', 'SELECT txid_current()');
>> +}
>> +
>> +# Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
>> +# past our needed xmin. The only way we have visibility into that is to force
>> +# a checkpoint.
>> +$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
>> +foreach my $dbname ('template1', 'postgres', 'postgres', 'template0')
>> +{
>> +    $node_master->safe_psql($dbname, 'VACUUM FREEZE');
>> +}
>> +$node_master->safe_psql('postgres', 'CHECKPOINT');
>> +IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
>> +    or die "pg_controldata failed with $?";
>> +my @checkpoint = split('\n', $stdout);
>> +my $oldestXid = '';
>> +foreach my $line (@checkpoint)
>> +{
>> +    if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
>> +    {
>> +        $oldestXid = $1;
>> +    }
>> +}
>> +die 'no oldestXID found in checkpoint' unless $oldestXid;
>> +
>> +cmp_ok($oldestXid, "<=", $node_standby->slot($standby_slotname)->{'catalog_xmin'},
>> +    'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
>> +
>> +$node_master->safe_psql('postgres',
>> +    "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
>> +
> I am thinking of removing this test. It doesn't seem to test anything
> really related to the issue at hand, and seems complicated (needing to
> update datallowconn, manually triggering checkpoints, parsing
> pg_controldata output).
>
>
>> +# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
>> +# given boolean condition to be true to ensure we've reached a quiescent state
>> +sub wait_for_xmins
>> +{
>> +    my ($node, $slotname, $check_expr) = @_;
>> +
>> +    $node->poll_query_until(
>> +        'postgres', qq[
>> +        SELECT $check_expr
>> +        FROM pg_catalog.pg_replication_slots
>> +        WHERE slot_name = '$slotname';
>> +    ]) or die "Timed out waiting for slot xmins to advance";
>> +}
>> +
>> +# Verify that pg_stat_database_conflicts.confl_logicalslot has been updated
>> +sub check_confl_logicalslot
>> +{
>> +    ok( $node_standby->poll_query_until(
>> +        'postgres',
>> +        "select (confl_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
>> +        'confl_logicalslot updated') or die "Timed out waiting confl_logicalslot to be updated";
>> +}
>> +
> Given that this hardcodes a specific number of conflicting slots etc,
> there doesn't seem much point in making this a function...
>
>
>> +# Acquire one of the standby logical slots created by create_logical_slots()
>> +sub make_slot_active
>> +{
>> +    my $slot_user_handle;
>> +
>> +    # make sure activeslot is in use
>> +    print "starting pg_recvlogical\n";
>> +    $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node_standby->connstr('testdb'), '-S', 'activeslot','-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
>> +
>> +    while (!$node_standby->slot('activeslot')->{'active_pid'})
>> +    {
>> +        usleep(100_000);
>> +        print "waiting for slot to become active\n";
>> +    }
>> +    return $slot_user_handle;
>> +}
> It's a bad idea to not have timeouts in things like this - if there's a
> problem, it'll lead to the test never returning. Things like
> poll_query_until() have timeouts to deal with this, but this doesn't.
>
>
>> +# Check if all the slots on standby are dropped. These include the 'activeslot'
>> +# that was acquired by make_slot_active(), and the non-active 'dropslot'.
>> +sub check_slots_dropped
>> +{
>> +    my ($slot_user_handle) = @_;
>> +    my $return;
>> +
>> +    is($node_standby->slot('dropslot')->{'slot_type'}, '', 'dropslot on standby dropped');
>> +    is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
>> +
>> +    # our client should've terminated in response to the walsender error
>> +    eval {
>> +        $slot_user_handle->finish;
>> +    };
>> +    $return = $?;
>> +    cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero\n");
>> +    if ($return) {
>> +        like($stderr, qr/conflict with recovery/, 'recvlogical recovery conflict');
>> +        like($stderr, qr/must be dropped/, 'recvlogical error detail');
>> +    }
> Why do we need to use eval{} for things like checking if a program
> finished?
>
>
>> @@ -297,6 +297,24 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
>>       may consume changes from a slot at any given time.
>>      </para>
>>
>> +    <para>
>> +     A logical replication slot can also be created on a hot standby. To prevent
>> +     <command>VACUUM</command> from removing required rows from the system
>> +     catalogs, <varname>hot_standby_feedback</varname> should be set on the
>> +     standby. In spite of that, if any required rows get removed, the slot gets
>> +     dropped. Existing logical slots on standby also get dropped if wal_level
>> +     on primary is reduced to less than 'logical'.
>> +    </para>
> I think this should add that it's very advisable to use a physical slot
> between primary and standby. Otherwise hot_standby_feedback will work,
> but only while the connection is alive - as soon as it breaks, a node
> gets restarted, ...
>
> Greetings,
>
> Andres Freund

Thanks for your feedback! I'll look at it.

But prior to that, I am sharing v16 (a rebase of v15 needed due to 8523492d4e).

Bertrand