From 2b5af6a1e1a73b614057b8a6b9e1e1d822b7baa8 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Thu, 10 Mar 2016 10:50:59 +0800 Subject: [PATCH 7/7] Introduce TAP recovery tests for failover slots --- src/test/recovery/t/007_failover_slots.pl | 367 ++++++++++++++++++++++++++++++ 1 file changed, 367 insertions(+) create mode 100644 src/test/recovery/t/007_failover_slots.pl diff --git a/src/test/recovery/t/007_failover_slots.pl b/src/test/recovery/t/007_failover_slots.pl new file mode 100644 index 0000000..8524e20 --- /dev/null +++ b/src/test/recovery/t/007_failover_slots.pl @@ -0,0 +1,367 @@ +# +# Test failover slots +# +use strict; +use warnings; +use bigint; +use PostgresNode; +use TestLib; +use Test::More; +use RecursiveCopy; +use File::Copy; +use File::Basename qw(basename); +use List::Util qw(); +use Data::Dumper; + +use Carp 'verbose'; +$SIG{ __DIE__ } = sub { Carp::confess( @_ ) }; + +sub lsn_to_bigint +{ + my ($lsn) = @_; + my ($high, $low) = split("/",$lsn); + return hex($high) * 2**32 + hex($low); +} + +sub get_slot_info +{ + my ($node, $slot_name) = @_; + + my $esc_slot_name = $slot_name; + $esc_slot_name =~ s/'/''/g; + my @selectlist = ('slot_name', 'plugin', 'slot_type', 'database', 'active_pid', 'xmin', 'catalog_xmin', 'restart_lsn', 'confirmed_flush_lsn'); + my $row = $node->safe_psql('postgres', "SELECT " . join(', ', @selectlist) . " FROM pg_replication_slots WHERE slot_name = '$esc_slot_name';", + extra_params => ['-z']); + chomp $row; + my @fields = split("\0", $row); + if (scalar @fields != scalar @selectlist) + { + die "Select-list '@selectlist' didn't match length of result-list '@fields'"; + } + my %slotinfo; + for (my $i = 0; $i < scalar @selectlist; $i++) + { + $slotinfo{$selectlist[$i]} = $fields[$i]; + } + return \%slotinfo; +} + +sub diag_slotinfo +{ + my ($info, $msg) = @_; + diag "slot " . $info->{slot_name} . ": " . Dumper($info); +} + +sub wait_for_catchup +{ + my ($node_master, $node_replica) = @_; + + my $master_lsn = $node_master->safe_psql('postgres', 'SELECT pg_current_xlog_insert_location()'); + diag "waiting for " . $node_replica->name . " to catch up to $master_lsn on " . $node_master->name; + my $ret = $node_replica->poll_query_until('postgres', + "SELECT pg_last_xlog_replay_location() >= '$master_lsn'::pg_lsn;"); + BAIL_OUT('replica failed to catch up') unless $ret; + my $replica_lsn = $node_replica->safe_psql('postgres', 'SELECT pg_last_xlog_replay_location()'); + diag "Replica is caught up to $replica_lsn, past required LSN $master_lsn"; +} + +sub read_slot_updates_from_xlog +{ + my ($node, $timeline) = @_; + my ($stdout, $stderr) = ('', ''); + # Look at master xlogs and examine sequence advances + my $wal_pattern = sprintf("%s/pg_xlog/%08X" . ("?" x 16), $node->data_dir, $timeline); + my @wal = glob $wal_pattern; + my $firstwal = List::Util::minstr(@wal); + my $lastwal = basename(List::Util::maxstr(@wal)); + diag "decoding xlog on " . $node->name . " from $firstwal to $lastwal"; + IPC::Run::run ['pg_xlogdump', $firstwal, $lastwal], '>', \$stdout, '2>', \$stderr; + like($stderr, qr/invalid record length at [0-9A-F]+\/[0-9A-F]+: wanted 24, got 0/, + 'pg_xlogdump exits with expected error'); + my @slots = grep(/ReplicationSlot/, split(/\n/, $stdout)); + + # Parse the dumped xlog data + my @slot_updates = (); + for my $slot (@slots) { + if (my @matches = ($slot =~ /lsn: ([[:xdigit:]]{1,8}\/[[:xdigit:]]{1,8}), prev [[:xdigit:]]{1,8}\/[[:xdigit:]]{1,8}, desc: UPDATE of slot (\w+) with restart ([[:xdigit:]]{1,8}\/[[:xdigit:]]{1,8}) and xid ([[:digit:]]+) confirmed to ([[:xdigit:]]{1,8}\/[[:xdigit:]]{1,8})/)) + { + my %slot_update = ( + action => 'UPDATE', + log_lsn => $1, slot_name => $2, restart_lsn => $3, + xid => $4, confirm_lsn => $5 + ); + diag "Replication slot create/advance: $slot_update{slot_name} advanced to $slot_update{confirm_lsn} with restart $slot_update{restart_lsn} and $slot_update{xid} in xlog entry $slot_update{log_lsn}"; + push @slot_updates, \%slot_update; + } + elsif ($slot =~ /DELETE/) + { + diag "Replication slot delete: $slot"; + } + else + { + die "Slot xlog entry didn't match pattern: $slot"; + } + } + return \@slot_updates; +} + +sub check_slot_wal_update +{ + my ($entry, $slotname, %params) = @_; + + ok(defined($entry), 'xlog entry exists for slot $slotname'); + SKIP: { + skip 'Expected xlog entry was undef' unless defined($entry); + my %entry = %{$entry}; undef $entry; + diag "Examining decoded slot update xlog entry: " . Dumper(\%entry); + is($entry{action}, 'UPDATE', "action is an update"); + is($entry{slot_name}, $slotname, "action affects slot " . $slotname); + + cmp_ok(lsn_to_bigint($entry{restart_lsn}), "le", + lsn_to_bigint($entry{log_lsn}), + "restart_lsn is no greater than LSN when logged"); + + cmp_ok(lsn_to_bigint($entry{confirm_lsn}), "le", + lsn_to_bigint($entry{log_lsn}), + "confirm_lsn is no greater than LSN when logged"); + + cmp_ok(lsn_to_bigint($entry{confirm_lsn}), "ge", + lsn_to_bigint($entry{restart_lsn}), + 'confirm_lsn equal to or ahead of restart_lsn'); + + cmp_ok(lsn_to_bigint($entry{restart_lsn}), "le", + lsn_to_bigint($params{expect_max_restart_lsn}), + 'restart_lsn is at or before expected') + if ($params{expect_max_restart_lsn}); + + cmp_ok(lsn_to_bigint($entry{restart_lsn}), "ge", + lsn_to_bigint($params{expect_min_restart_lsn}), + 'restart_lsn is at or after expected') + if ($params{expect_min_restart_lsn}); + + cmp_ok(lsn_to_bigint($entry{confirm_lsn}), "le", + lsn_to_bigint($params{expect_max_confirm_lsn}), + 'confirm_lsn is at or before expected') + if ($params{expect_max_confirm_lsn}); + + cmp_ok(lsn_to_bigint($entry{confirm_lsn}), "ge", + lsn_to_bigint($params{expect_min_confirm_lsn}), + 'confirm_lsn is at or after expected') + if ($params{expect_min_confirm_lsn}); + } +} + +sub test_read_from_slot +{ + my ($node, $slot, $expected) = @_; + my $slot_quoted = $slot; + $slot_quoted =~ s/'/''/g; + my ($ret, $stdout, $stderr) = $node->psql('postgres', + "SELECT data FROM pg_logical_slot_peek_changes('$slot_quoted', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');" + ); + is($ret, 0, "replaying from slot $slot is successful"); + is($stderr, '', "replay from slot $slot produces no stderr"); + if (defined($expected)) { + is($stdout, $expected, "slot $slot returned expected output"); + } + return $stderr; +} + +sub wait_for_end_of_recovery +{ + my ($node) = @_; + $node->poll_query_until('postgres', + "SELECT NOT pg_is_in_recovery();"); +} + +diag ""; + + + +my ($stdout, $stderr, $ret, $slotinfo); + +# Initialize master node +my $node_master = get_new_node('master'); +$node_master->init(allows_streaming => 1, has_archiving => 1); +$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n"); +$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n"); +$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n"); +$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug3'\n"); +$node_master->dump_info; +$node_master->start; + +my $master_beforecreate_bb_lsn = $node_master->safe_psql('postgres', + "SELECT pg_current_xlog_insert_location()"); + +diag "master LSN is $master_beforecreate_bb_lsn before creation of bb_failover"; + +$node_master->safe_psql('postgres', +"SELECT pg_create_logical_replication_slot('bb_failover', 'test_decoding', true);" +); +my $bb_beforeconsume_si = get_slot_info($node_master, 'bb_failover'); +diag_slotinfo $bb_beforeconsume_si, 'bb_beforeconsume'; + +$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);"); +$node_master->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('consumed');"); +($ret, $stdout, $stderr) = $node_master->psql('postgres', + "SELECT data FROM pg_logical_slot_get_changes('bb_failover', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"); +is($ret, 0, 'replaying from bb_failover on master is successful'); +is( $stdout, q(BEGIN +table public.decoding: INSERT: blah[text]:'consumed' +COMMIT), 'decoded expected data from slot bb_failover on master'); +is($stderr, '', 'replay from slot bb_failover produces no stderr'); + +my $bb_afterconsume_si = get_slot_info($node_master, 'bb_failover'); +diag_slotinfo $bb_afterconsume_si, 'bb_afterconsume'; + +($ret, $stdout, $stderr) = $node_master->psql('postgres', + "SELECT data FROM pg_logical_slot_get_changes('bb_failover', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"); +is ($ret, 0, 'no error reading empty slot changes after get'); +is ($stdout, '', 'no new changes to read from slot after get'); + +$node_master->safe_psql('postgres', 'CHECKPOINT;'); + +$node_master->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('beforebb');"); +$node_master->safe_psql('postgres', 'CHECKPOINT;'); + +my $backup_name = 'b1'; +$node_master->backup_fs_hot($backup_name); + +my $node_replica = get_new_node('replica'); +$node_replica->init_from_backup( + $node_master, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_replica->start; + +my $master_beforecreate_ab_lsn = $node_master->safe_psql('postgres', + "SELECT pg_current_xlog_insert_location()"); + +diag "master LSN is $master_beforecreate_ab_lsn before creation of ab_failover"; + +$node_master->safe_psql('postgres', +"SELECT pg_create_logical_replication_slot('ab_failover', 'test_decoding', true);" +); + +my $ab_beforeconsume_si = get_slot_info($node_master, 'ab_failover'); +diag_slotinfo $ab_beforeconsume_si, 'ab_beforeconsume'; + +$node_master->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('afterbb');"); + +wait_for_catchup($node_master, $node_replica); + +$stdout = $node_master->safe_psql('postgres', 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name'); +is($stdout, "ab_failover\nbb_failover", 'Both failover slots exist on master'); + + +# Verify that only the before base_backup slot is on the replica +$stdout = $node_replica->safe_psql('postgres', 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name'); +is($stdout, "ab_failover\nbb_failover", 'Both failover slots exist on replica') + or BAIL_OUT('Remaining tests meaningless'); + +# Boom, crash +$node_master->stop('fast'); + +my @slot_updates = @{ read_slot_updates_from_xlog($node_master, 1) }; + +# +# Decode the WAL from the master and make sure the expected entries and only the +# expected entries are present. +# +# We want to see two WAL entries, one for each slot. There won't be another entry +# for the slot advance because right now we don't write out WAL when a slot's confirmed +# location advances, only when the flush location or xmin advance. The restart lsn +# and confirmed flush LSN in the slot's WAL record must not be less than the LSN +# of the master before we created the slot and not greater than the position we saw +# in pg_replication_slots after slot creation. +# + +check_slot_wal_update($slot_updates[0], 'bb_failover', + expect_min_restart_lsn => $master_beforecreate_bb_lsn, + expect_min_confirm_lsn => $master_beforecreate_bb_lsn, + expect_max_restart_lsn => $bb_beforeconsume_si->{restart_lsn}, + expect_max_confirm_lsn => $bb_beforeconsume_si->{confirmed_flush_lsn}); + +check_slot_wal_update($slot_updates[1], 'ab_failover', + expect_min_restart_lsn => $master_beforecreate_ab_lsn, + expect_min_confirm_lsn => $master_beforecreate_ab_lsn, + expect_max_restart_lsn => $ab_beforeconsume_si->{restart_lsn}, + expect_max_confirm_lsn => $ab_beforeconsume_si->{confirmed_flush_lsn}); + +# Consuming from a slot does not cause the slot to be written out even on +# CHECKPOINT. Since nothing else would have dirtied the slot, there should +# be no more WAL entries for failover slots. +# +# The client is expected to keep track of the confirmed LSN and skip replaying +# data it's already seen. +ok(!defined($slot_updates[3]), 'Third xlog entry does not exist'); + +$node_replica->promote; + +wait_for_end_of_recovery($node_replica); + +$node_replica->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('after failover');"); + +my $bb_afterpromote_si = get_slot_info($node_replica, 'bb_failover'); +diag_slotinfo $bb_afterpromote_si, 'bb_afterpromote'; +# Because the confirmed LSN didn't get logged, the replica should have the slot +# at the position it was created at, not the position after we consumed data. +is($bb_afterpromote_si->{confirmed_flush_lsn}, $bb_beforeconsume_si->{confirmed_flush_lsn}, + 'slot bb_failover confirmed pos on replica has gone backwards'); +# the restart position won't have advanced either since we didn't log any new +# entries for it and we haven't done enough work to trigger a flush. +is($bb_afterpromote_si->{restart_lsn}, $bb_beforeconsume_si->{restart_lsn}, + 'slot bb_failover restart position is unchanged'); + +# Same for the after-basebackup slot. +my $ab_afterpromote_si = get_slot_info($node_replica, 'ab_failover'); +diag_slotinfo $ab_afterpromote_si, 'ab_afterpromote'; +is($ab_afterpromote_si->{confirmed_flush_lsn}, $ab_beforeconsume_si->{confirmed_flush_lsn}, + 'slot ab_failover confirmed pos on replica has gone backwards'); +is($ab_afterpromote_si->{restart_lsn}, $ab_beforeconsume_si->{restart_lsn}, + 'slot ab_failover restart position is unchanged'); + + + + +# Can replay from slot ab, following the timeline switch +test_read_from_slot($node_replica, 'ab_failover', q(BEGIN +table public.decoding: INSERT: blah[text]:'afterbb' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'after failover' +COMMIT)); + +# Can replay from slot bb too +# +# Note that we expect to see data that we already replayed on the master here +# because the confirm lsn won't be flushed on the master and will go backwards. +# +# See http://www.postgresql.org/message-id/CAMsr+YGSaTRGqPcx9qx4eOcizWsa27XjKEiPSOtAJE8OfiXT-g@mail.gmail.com +# +# (If Pg is patched to flush all slots on shutdown then this will change, but +# it'll still be able to go backwards on an unclean shutdown). +# +test_read_from_slot($node_replica, 'bb_failover', q(BEGIN +table public.decoding: INSERT: blah[text]:'consumed' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'beforebb' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'afterbb' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'after failover' +COMMIT)); + +$node_replica->stop('fast'); + +# We don't need the standby anymore +$node_replica->teardown_node(); + +done_testing(); -- 2.1.0