From cba667cf3da39b8cfe9aebfce251e31ea59bf639 Mon Sep 17 00:00:00 2001 From: Julien Rouhaud Date: Fri, 14 Apr 2023 13:49:09 +0800 Subject: [PATCH v23 1/3] Always persist to disk logical slots during a shutdown checkpoint. It's entirely possible for a logical slot to have a confirmed_flush_lsn higher than the last value saved on disk while not being marked as dirty. It's currently not a problem to lose that value during a clean shutdown / restart cycle, but a later patch adding support for pg_upgrade of publications and logical slots will rely on that value being properly persisted to disk. Author: Julien Rouhaud Reviewed-by: Wang Wei, Peter Smith, Masahiko Sawada --- contrib/test_decoding/meson.build | 1 + contrib/test_decoding/t/002_always_persist.pl | 74 +++++++++++++++++++ src/backend/access/transam/xlog.c | 2 +- src/backend/replication/slot.c | 25 ++++--- src/include/replication/slot.h | 2 +- 5 files changed, 92 insertions(+), 12 deletions(-) create mode 100644 contrib/test_decoding/t/002_always_persist.pl diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build index 7b05cc25a3..12afb9ea8c 100644 --- a/contrib/test_decoding/meson.build +++ b/contrib/test_decoding/meson.build @@ -72,6 +72,7 @@ tests += { 'tap': { 'tests': [ 't/001_repl_stats.pl', + 't/002_always_persist.pl', ], }, } diff --git a/contrib/test_decoding/t/002_always_persist.pl b/contrib/test_decoding/t/002_always_persist.pl new file mode 100644 index 0000000000..cf78953eef --- /dev/null +++ b/contrib/test_decoding/t/002_always_persist.pl @@ -0,0 +1,74 @@ + +# Copyright (c) 2023, PostgreSQL Global Development Group + +# Test logical replication slots are always persisted to disk during a shutdown +# checkpoint. 
+ +use strict; +use warnings; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Test set-up +my $node = PostgreSQL::Test::Cluster->new('test'); +$node->init(allows_streaming => 'logical'); +$node->append_conf('postgresql.conf', q{ +autovacuum = off +checkpoint_timeout = 1h +}); + +$node->start; + +# Create table +$node->safe_psql('postgres', "CREATE TABLE test (id int)"); + +# Create replication slot +$node->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('regression_slot1', 'test_decoding');" +); + +# Insert some data +$node->safe_psql('postgres', + "INSERT INTO test VALUES (generate_series(1, 5));"); + +# Consume WAL records +$node->safe_psql('postgres', + "SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL);" +); + +# Shutdown the node once to do shutdown checkpoint +$node->stop(); + +# Fetch checkPoint from the control file itself +my ($stdout, $stderr) = run_command([ 'pg_controldata', $node->data_dir ]); + +my @control_data = split("\n", $stdout); +my $latest_checkpoint = undef; +foreach (@control_data) +{ + if ($_ =~ /^Latest checkpoint location:\s*(.*)$/mg) + { + $latest_checkpoint = $1; + last; + } +} +die "No checkPoint in control file found\n" + unless defined($latest_checkpoint); + +# Boot the node again and check confirmed_flush_lsn. If the slot has been +# persisted, the LSN becomes the same as the latest checkpoint location, which +# means the SHUTDOWN_CHECKPOINT record. 
+$node->start(); +my $confirmed_flush = $node->safe_psql('postgres', + "SELECT confirmed_flush_lsn FROM pg_replication_slots;" +); + +# Compare confirmed_flush_lsn and checkPoint +ok($confirmed_flush eq $latest_checkpoint, + "Check confirmed_flush is same as latest checkpoint location"); + +# Shutdown +$node->stop; + +done_testing(); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 60c0b7ec3a..6dced61cf4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7026,7 +7026,7 @@ static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags) { CheckPointRelationMap(); - CheckPointReplicationSlots(); + CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN); CheckPointSnapBuild(); CheckPointLogicalRewriteHeap(); CheckPointReplicationOrigin(); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 1dc27264f6..4d1e2d193e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -109,7 +109,8 @@ static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); static void CreateSlotOnDisk(ReplicationSlot *slot); -static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel); +static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel, + bool is_shutdown); /* * Report shared-memory space needed by ReplicationSlotsShmemInit. @@ -783,7 +784,7 @@ ReplicationSlotSave(void) Assert(MyReplicationSlot != NULL); sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name)); - SaveSlotToPath(MyReplicationSlot, path, ERROR); + SaveSlotToPath(MyReplicationSlot, path, ERROR, false); } /* @@ -1565,11 +1566,10 @@ restart: /* * Flush all replication slots to disk. * - * This needn't actually be part of a checkpoint, but it's a convenient - * location. + * is_shutdown is true in case of a shutdown checkpoint. 
*/ void -CheckPointReplicationSlots(void) +CheckPointReplicationSlots(bool is_shutdown) { int i; @@ -1594,7 +1594,7 @@ CheckPointReplicationSlots(void) /* save the slot to disk, locking is handled in SaveSlotToPath() */ sprintf(path, "pg_replslot/%s", NameStr(s->data.name)); - SaveSlotToPath(s, path, LOG); + SaveSlotToPath(s, path, LOG, is_shutdown); } LWLockRelease(ReplicationSlotAllocationLock); } @@ -1700,7 +1700,7 @@ CreateSlotOnDisk(ReplicationSlot *slot) /* Write the actual state file. */ slot->dirty = true; /* signal that we really need to write */ - SaveSlotToPath(slot, tmppath, ERROR); + SaveSlotToPath(slot, tmppath, ERROR, false); /* Rename the directory into place. */ if (rename(tmppath, path) != 0) @@ -1726,7 +1726,8 @@ CreateSlotOnDisk(ReplicationSlot *slot) * Shared functionality between saving and creating a replication slot. */ static void -SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) +SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel, + bool is_shutdown) { char tmppath[MAXPGPATH]; char path[MAXPGPATH]; @@ -1740,8 +1741,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) slot->just_dirtied = false; SpinLockRelease(&slot->mutex); - /* and don't do anything if there's nothing to write */ - if (!was_dirty) + /* + * Don't do anything if there's nothing to write, unless this is called for + * a logical slot during a shutdown checkpoint, as we want to persist the + * confirmed_flush LSN in that case, even if that's the only modification. 
+ */ + if (!was_dirty && !(SlotIsLogical(slot) && is_shutdown)) return; LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a8a89dc784..7ca37c9f70 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -241,7 +241,7 @@ extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslo extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); extern void StartupReplicationSlots(void); -extern void CheckPointReplicationSlots(void); +extern void CheckPointReplicationSlots(bool is_shutdown); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); -- 2.27.0