From 0a9b98a857e8de02394dc8aefe365a72e50a222e Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Tue, 5 Dec 2017 13:32:45 +0800 Subject: [PATCH v1] Clean up reorder buffer files when starting logical decoding We could fail to clean up reorder buffer files if the walsender exited due to a client disconnect, because we'd skip both the normal exit and error paths. --- src/backend/replication/logical/reorderbuffer.c | 84 +++++++++++++++---------- 1 file changed, 50 insertions(+), 34 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index fa95bab..bc562e2 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -196,6 +196,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferCleanSerializedTXNs(const char *slotname); static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, @@ -214,7 +215,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t /* - * Allocate a new ReorderBuffer + * Allocate a new ReorderBuffer and clean out any old serialized state from + * prior ReorderBuffer instances for the same slot. */ ReorderBuffer * ReorderBufferAllocate(void) @@ -223,6 +225,8 @@ ReorderBufferAllocate(void) HASHCTL hash_ctl; MemoryContext new_ctx; + Assert(MyReplicationSlot != NULL); + /* allocate memory in own context, to have better accountability */ new_ctx = AllocSetContextCreate(CurrentMemoryContext, "ReorderBuffer", @@ -266,6 +270,9 @@ ReorderBufferAllocate(void) dlist_init(&buffer->toplevel_by_lsn); + /* Ensure there's no stale data from prior uses of this slot */ + ReorderBufferCleanSerializedTXNs(NameStr(MyReplicationSlot->data.name)); + return buffer; } @@ -2551,6 +2558,47 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* + * Remove any leftover serialized reorder buffers from a slot directory after a + * prior crash or decoding session exit. + */ +static void +ReorderBufferCleanSerializedTXNs(const char *slotname) +{ + DIR *spill_dir; + struct dirent *spill_de; + struct stat statbuf; + char path[MAXPGPATH * 2 + 12]; + + sprintf(path, "pg_replslot/%s", slotname); + + /* we're only handling directories here, skip if it's not our's */ + if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) + return; + + spill_dir = AllocateDir(path); + while ((spill_de = ReadDir(spill_dir, path)) != NULL) + { + if (strcmp(spill_de->d_name, ".") == 0 || + strcmp(spill_de->d_name, "..") == 0) + continue; + + /* only look at names that can be ours */ + if (strncmp(spill_de->d_name, "xid", 3) == 0) + { + sprintf(path, "pg_replslot/%s/%s", slotname, + spill_de->d_name); + + if (unlink(path) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", + path))); + } + } + FreeDir(spill_dir); +} + +/* * Delete all data spilled to disk after we've restarted/crashed. It will be * recreated when the respective slots are reused. */ @@ -2560,15 +2608,9 @@ StartupReorderBuffer(void) DIR *logical_dir; struct dirent *logical_de; - DIR *spill_dir; - struct dirent *spill_de; - logical_dir = AllocateDir("pg_replslot"); while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL) { - struct stat statbuf; - char path[MAXPGPATH * 2 + 12]; - if (strcmp(logical_de->d_name, ".") == 0 || strcmp(logical_de->d_name, "..") == 0) continue; @@ -2581,33 +2623,7 @@ StartupReorderBuffer(void) * ok, has to be a surviving logical slot, iterate and delete * everything starting with xid-* */ - sprintf(path, "pg_replslot/%s", logical_de->d_name); - - /* we're only creating directories here, skip if it's not our's */ - if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) - continue; - - spill_dir = AllocateDir(path); - while ((spill_de = ReadDir(spill_dir, path)) != NULL) - { - if (strcmp(spill_de->d_name, ".") == 0 || - strcmp(spill_de->d_name, "..") == 0) - continue; - - /* only look at names that can be ours */ - if (strncmp(spill_de->d_name, "xid", 3) == 0) - { - sprintf(path, "pg_replslot/%s/%s", logical_de->d_name, - spill_de->d_name); - - if (unlink(path) != 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not remove file \"%s\": %m", - path))); - } - } - FreeDir(spill_dir); + ReorderBufferCleanSerializedTXNs(logical_de->d_name); } FreeDir(logical_dir); } -- 2.9.5