From fa507e1aa7923bb46b907847c2d6555c78a2219c Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Fri, 11 Feb 2022 09:43:57 -0800 Subject: [PATCH v4 8/8] Move removal of spilled logical slot data to custodian. If there are many such files, startup can take much longer than necessary. To handle this, startup creates a new slot directory, copies the state file, and swaps the new directory with the old one. The custodian then asynchronously cleans up the old slot directory. --- src/backend/access/transam/xlog.c | 15 +- src/backend/postmaster/custodian.c | 14 + .../replication/logical/reorderbuffer.c | 292 +++++++++++++++++- src/backend/replication/slot.c | 4 + src/include/replication/reorderbuffer.h | 1 + 5 files changed, 317 insertions(+), 9 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 07aaee1c07..4d18798387 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7155,18 +7155,21 @@ StartupXLOG(void) checkPoint.newestCommitTsXid); XLogCtl->ckptFullXid = checkPoint.nextXid; - /* - * Initialize replication slots, before there's a chance to remove - * required resources. - */ - StartupReplicationSlots(); - /* * Startup logical state, needs to be setup now so we have proper data * during crash recovery. + * + * NB: This also performs some important cleanup that must be done prior to + * other replication slot steps (e.g., StartupReplicationSlots()). */ StartupReorderBuffer(); + /* + * Initialize replication slots, before there's a chance to remove + * required resources. + */ + StartupReplicationSlots(); + /* * Startup CLOG. This must be done after ShmemVariableCache->nextXid has * been initialized and before we accept connections or begin WAL replay. diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c index 9c5479b5cf..fdc614b1bd 100644 --- a/src/backend/postmaster/custodian.c +++ b/src/backend/postmaster/custodian.c @@ -41,6 +41,7 @@ #include "pgstat.h" #include "postmaster/custodian.h" #include "postmaster/interrupt.h" +#include "replication/reorderbuffer.h" #include "replication/snapbuild.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" @@ -209,6 +210,19 @@ CustodianMain(void) */ RemovePgTempFiles(false, false); + /* + * Remove any replication slot directories that have been staged for + * deletion. Since slot directories can accumulate many files, removing + * all of the files during startup (which we used to do) can take a very + * long time. To avoid delaying startup, we simply have startup rename + * the slot directories, and we clean them up here. + * + * Replication slot directories are not staged or cleaned in single-user + * mode, so we don't need any extra handling outside of the custodian + * process for this. + */ + RemoveStagedSlotDirectories(); + /* * Remove serialized snapshots that are no longer required by any * logical replication slot. diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c2d9be81fa..ab51e41229 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -126,15 +126,19 @@ #include "access/xlog_internal.h" #include "catalog/catalog.h" #include "commands/sequence.h" +#include "common/string.h" #include "lib/binaryheap.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/interrupt.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" #include "replication/slot.h" #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" +#include "storage/copydir.h" #include "storage/fd.h" +#include "storage/proc.h" #include "storage/sinval.h" #include "utils/builtins.h" #include "utils/combocid.h" @@ -297,12 +301,15 @@ static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); +static void ReorderBufferCleanup(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid); +static void StageSlotDirForRemoval(const char *slotname, const char *slotpath); +static void RemoveStagedSlotDirectory(const char *path); /* * --------------------------------------- @@ -4835,6 +4842,202 @@ ReorderBufferCleanupSerializedTXNs(const char *slotname) FreeDir(spill_dir); } +/* + * Cleanup everything in the logical slot directory except for the "state" file. + * This is specially written for StartupReorderBuffer(), which has special logic + * to handle crashes at inconvenient times. + * + * NB: If anything except for the "state" file cannot be removed after startup, + * this will need to be updated. + */ +static void +ReorderBufferCleanup(const char *slotname) +{ + char path[MAXPGPATH]; + char newpath[MAXPGPATH]; + char statepath[MAXPGPATH]; + char newstatepath[MAXPGPATH]; + struct stat statbuf; + + sprintf(path, "pg_replslot/%s", slotname); + sprintf(newpath, "pg_replslot/%s.new", slotname); + sprintf(statepath, "pg_replslot/%s/state", slotname); + sprintf(newstatepath, "pg_replslot/%s.new/state", slotname); + + /* we're only handling directories here, skip if it's not ours */ + if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) + return; + + /* + * Build our new slot directory, suffixed with ".new". The caller (likely + * StartupReorderBuffer()) should have already ensured that any pre-existing + * ".new" directories leftover after a crash have been cleaned up. + */ + if (MakePGDirectory(newpath) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create directory \"%s\": %m", newpath))); + + copy_file(statepath, newstatepath); + + fsync_fname(newstatepath, false); + fsync_fname(newpath, true); + fsync_fname("pg_replslot", true); + + /* + * Move the slot directory aside for cleanup by the custodian. After this + * step, there will be no slot directory. StartupReorderBuffer() has + * special logic to make sure we don't lose the slot if we crash at this + * point. + */ + StageSlotDirForRemoval(slotname, path); + + /* + * Move our ".new" directory to become our new slot directory. + */ + if (rename(newpath, path) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rename file \"%s\": %m", newpath))); + + fsync_fname(path, true); + fsync_fname("pg_replslot", true); +} + +/* + * This function renames the given directory with a special suffix that the + * custodian will know to look for. An integer is appended to the end of the + * new directory name in case previously staged slot directories have not yet + * been removed. + */ +static void +StageSlotDirForRemoval(const char *slotname, const char *slotpath) +{ + char stage_path[MAXPGPATH]; + + /* + * Find a name for the stage directory. We just increment an integer at the + * end of the name until we find one that doesn't exist. + */ + for (int n = 0; n <= INT_MAX; n++) + { + DIR *dir; + + sprintf(stage_path, "pg_replslot/%s.to_remove_%d", slotname, n); + + dir = AllocateDir(stage_path); + if (dir == NULL) + { + if (errno == ENOENT) + break; + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open directory \"%s\": %m", + stage_path))); + } + FreeDir(dir); + + stage_path[0] = '\0'; + } + + /* + * In the unlikely event that we couldn't find a name for the stage + * directory, bail out. + */ + if (stage_path[0] == '\0') + ereport(ERROR, + (errmsg("could not stage \"%s\" for deletion", + slotpath))); + + /* + * Rename the slot directory. + */ + if (rename(slotpath, stage_path) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rename file \"%s\": %m", slotpath))); + + fsync_fname(stage_path, true); + fsync_fname("pg_replslot", true); +} + +/* + * Remove slot directories that have been staged for deletion by + * ReorderBufferCleanup(). + */ +void +RemoveStagedSlotDirectories(void) +{ + DIR *dir; + struct dirent *de; + + dir = AllocateDir("pg_replslot"); + while (!ShutdownRequestPending && + (de = ReadDir(dir, "pg_replslot")) != NULL) + { + struct stat st; + char path[MAXPGPATH]; + + if (strstr(de->d_name, ".to_remove") == NULL) + continue; + + sprintf(path, "pg_replslot/%s", de->d_name); + if (lstat(path, &st) != 0) + ereport(ERROR, + (errmsg("could not stat file \"%s\": %m", path))); + + if (!S_ISDIR(st.st_mode)) + continue; + + RemoveStagedSlotDirectory(path); + } + FreeDir(dir); +} + +/* + * Removes one slot directory that has been staged for deletion by + * ReorderBufferCleanup(). If a shutdown request is pending, exit as soon as + * possible. + */ +static void +RemoveStagedSlotDirectory(const char *path) +{ + DIR *dir; + struct dirent *de; + + dir = AllocateDir(path); + while (!ShutdownRequestPending && + (de = ReadDir(dir, path)) != NULL) + { + struct stat st; + char filepath[MAXPGPATH]; + + if (strcmp(de->d_name, ".") == 0 || + strcmp(de->d_name, "..") == 0) + continue; + + sprintf(filepath, "%s/%s", path, de->d_name); + + if (lstat(filepath, &st) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", filepath))); + else if (S_ISDIR(st.st_mode)) + RemoveStagedSlotDirectory(filepath); + else if (unlink(filepath) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", filepath))); + } + FreeDir(dir); + + if (rmdir(path) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not remove directory \"%s\": %m", path))); +} + /* * Given a replication slot, transaction ID and segment number, fill in the * corresponding spill file into 'path', which is a caller-owned buffer of size @@ -4863,6 +5066,83 @@ StartupReorderBuffer(void) DIR *logical_dir; struct dirent *logical_de; + /* + * First, handle any ".new" directories that were leftover after a crash. + * These are created and swapped with the actual replication slot + * directories so that cleanup of spilled data can be done asynchronously by + * the custodian. + */ + logical_dir = AllocateDir("pg_replslot"); + while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL) + { + char name[NAMEDATALEN]; + char path[NAMEDATALEN + 12]; + struct stat statbuf; + + if (strcmp(logical_de->d_name, ".") == 0 || + strcmp(logical_de->d_name, "..") == 0) + continue; + + /* + * Make sure it's a valid ".new" directory. + */ + if (!pg_str_endswith(logical_de->d_name, ".new") || + strlen(logical_de->d_name) >= NAMEDATALEN + 4) + continue; + + strncpy(name, logical_de->d_name, sizeof(name)); + name[strlen(logical_de->d_name) - 4] = '\0'; + if (!ReplicationSlotValidateName(name, DEBUG2)) + continue; + + sprintf(path, "pg_replslot/%s", name); + if (lstat(path, &statbuf) == 0) + { + if (!S_ISDIR(statbuf.st_mode)) + continue; + + /* + * If the original directory still exists, just delete the ".new" + * directory. We'll try again when we call ReorderBufferCleanup() + * later on. + */ + if (!rmtree(path, true)) + ereport(ERROR, + (errmsg("could not remove directory \"%s\"", path))); + } + else if (errno == ENOENT) + { + char newpath[NAMEDATALEN + 16]; + + /* + * If the original directory is gone, we need to rename the ".new" + * directory to take its place. We know that the ".new" directory + * is ready to be the real deal if we previously made it far enough + * to delete the original directory. + */ + sprintf(newpath, "pg_replslot/%s", logical_de->d_name); + if (rename(newpath, path) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rename file \"%s\" to \"%s\": %m", + newpath, path))); + + fsync_fname(path, true); + } + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", path))); + + fsync_fname("pg_replslot", true); + } + FreeDir(logical_dir); + + /* + * Now we can proceed with deleting all spilled data. (This actually just + * moves the directories aside so that the custodian can clean it up + * asynchronously.) + */ logical_dir = AllocateDir("pg_replslot"); while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL) { @@ -4875,12 +5155,18 @@ StartupReorderBuffer(void) continue; /* - * ok, has to be a surviving logical slot, iterate and delete - * everything starting with xid-* + * ok, has to be a surviving logical slot, delete everything except for + * state */ - ReorderBufferCleanupSerializedTXNs(logical_de->d_name); + ReorderBufferCleanup(logical_de->d_name); } FreeDir(logical_dir); + + /* + * Wake up the custodian so it cleans up our old slot data. + */ + if (ProcGlobal->custodianLatch) + SetLatch(ProcGlobal->custodianLatch); } /* --------------------------------------- diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index e5e0cf8768..c45f8cf94d 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1430,6 +1430,10 @@ StartupReplicationSlots(void) continue; } + /* if it's an old slot directory that's staged for removal, ignore it */ + if (strstr(replication_de->d_name, ".to_remove") != NULL) + continue; + /* looks like a slot in a normal state, restore */ RestoreSlotFromDisk(replication_de->d_name); } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 859424bbd9..ff56ae0b22 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -719,6 +719,7 @@ TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); void StartupReorderBuffer(void); +void RemoveStagedSlotDirectories(void); bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, RelFileNode rnode, bool created); -- 2.25.1