From 64e8bffde370c1b4fcef17a5a372080cda012660 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Mon, 15 Oct 2018 22:48:05 +1300 Subject: [PATCH] Refactor the checkpointer request queue. 1. Decouple the the checkpoint queue machinery from md.c, so that future SMGR implementations can also use it. 2. Keep file descriptors open to avoid losing errors on some OSes. Craig Ringer discovered that our practice of closing files and then reopening them in the checkpointer so it can call fsync(2) could lose track of write-back errors on Linux. Change to a model where file descriptors are sent to the checkpointer via the ancillary data mechanism of Unix domain sockets, and the oldest file descriptor for each given file is kept open, so that the write-back errors cannot be lost. On Windows, a pipe is the most natural replacement for a Unix domin socket, but unfortunately pipes don't support multiplexing via WSAEventSelect(), as used by our WaitEventSet machninery. So use "overlapped" IO, and add the ability to wait for IO completion to WaitEventSet. A new wait event flag WL_WIN32_HANDLE is provided on Windows only, and used to wait for asynchronous read and write operations over the checkpointer pipe. For now file descriptors are not transferred via the pipe on Windows. 3. Consider fsync failures to be fatal; the status of data written before a failed fsync(2) is unknown on Linux, even after a later successful fsync(2) call, so it is unsafe to complete a checkpoint. Author: Andres Freund and Thomas Munro Reviewed-by: Thomas Munro, Dmitry Dolgov Discussion: Discussion: https://postgr.es/m/20180427222842.in2e4mibx45zdth5%40alap3.anarazel.de Discussion: https://postgr.es/m/CAMsr+YHh+5Oq4xziwwoEfhoTZgr07vdGG+hu=1adXx59aTeaoQ@mail.gmail.com --- src/backend/access/transam/xlog.c | 9 +- src/backend/bootstrap/bootstrap.c | 1 + src/backend/commands/dbcommands.c | 2 +- src/backend/commands/tablespace.c | 2 +- src/backend/postmaster/bgwriter.c | 1 + src/backend/postmaster/checkpointer.c | 544 +++++++++------ src/backend/postmaster/postmaster.c | 123 +++- src/backend/storage/buffer/bufmgr.c | 2 + src/backend/storage/file/fd.c | 217 +++++- src/backend/storage/freespace/freespace.c | 5 +- src/backend/storage/ipc/ipci.c | 2 + src/backend/storage/ipc/latch.c | 12 + src/backend/storage/smgr/Makefile | 2 +- src/backend/storage/smgr/md.c | 791 ++------------------- src/backend/storage/smgr/smgr.c | 63 +- src/backend/storage/smgr/smgrsync.c | 803 ++++++++++++++++++++++ src/backend/tcop/utility.c | 2 +- src/backend/utils/misc/guc.c | 1 + src/include/postmaster/bgwriter.h | 24 +- src/include/postmaster/checkpointer.h | 71 ++ src/include/postmaster/postmaster.h | 9 + src/include/storage/fd.h | 11 + src/include/storage/latch.h | 1 + src/include/storage/smgr.h | 24 +- src/include/storage/smgrsync.h | 37 + 25 files changed, 1711 insertions(+), 1048 deletions(-) create mode 100644 src/backend/storage/smgr/smgrsync.c create mode 100644 src/include/postmaster/checkpointer.h create mode 100644 src/include/storage/smgrsync.h diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 7375a78ffcf..62c5f7e9b96 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -44,6 +44,7 @@ #include "pgstat.h" #include "port/atomics.h" #include "postmaster/bgwriter.h" +#include "postmaster/checkpointer.h" #include "postmaster/walwriter.h" #include "postmaster/startup.h" #include "replication/basebackup.h" @@ -64,6 +65,7 @@ #include "storage/procarray.h" #include "storage/reinit.h" #include "storage/smgr.h" +#include "storage/smgrsync.h" #include "storage/spin.h" #include "utils/backend_random.h" #include "utils/builtins.h" @@ -8777,8 +8779,10 @@ CreateCheckPoint(int flags) * Note: because it is possible for log_checkpoints to change while a * checkpoint proceeds, we always accumulate stats, even if * log_checkpoints is currently off. + * + * Note #2: this is reset at the end of the checkpoint, not here, because + * we might have to fsync before getting here (see smgrsync()). */ - MemSet(&CheckpointStats, 0, sizeof(CheckpointStats)); CheckpointStats.ckpt_start_t = GetCurrentTimestamp(); /* @@ -9141,6 +9145,9 @@ CreateCheckPoint(int flags) CheckpointStats.ckpt_segs_recycled); LWLockRelease(CheckpointLock); + + /* reset stats */ + MemSet(&CheckpointStats, 0, sizeof(CheckpointStats)); } /* diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 578af2e66d8..43bc24953a4 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -31,6 +31,7 @@ #include "pg_getopt.h" #include "pgstat.h" #include "postmaster/bgwriter.h" +#include "postmaster/checkpointer.h" #include "postmaster/startup.h" #include "postmaster/walwriter.h" #include "replication/walreceiver.h" diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index 5342f217c02..4d56db8d7b8 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -47,7 +47,7 @@ #include "mb/pg_wchar.h" #include "miscadmin.h" #include "pgstat.h" -#include "postmaster/bgwriter.h" +#include "postmaster/checkpointer.h" #include "replication/slot.h" #include "storage/copydir.h" #include "storage/fd.h" diff --git a/src/backend/commands/tablespace.c b/src/backend/commands/tablespace.c index f7e9160a4f6..3096a2c904d 100644 --- a/src/backend/commands/tablespace.c +++ b/src/backend/commands/tablespace.c @@ -70,7 +70,7 @@ #include "commands/tablespace.h" #include "common/file_perm.h" #include "miscadmin.h" -#include "postmaster/bgwriter.h" +#include "postmaster/checkpointer.h" #include "storage/fd.h" #include "storage/lmgr.h" #include "storage/standby.h" diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index b1e9bb2c537..d373449e3f7 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -44,6 +44,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/bgwriter.h" +#include "postmaster/checkpointer.h" #include "storage/bufmgr.h" #include "storage/buf_internals.h" #include "storage/condition_variable.h" diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index 1a033093c53..29d3f937292 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -46,7 +46,10 @@ #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" +#include "port/atomics.h" #include "postmaster/bgwriter.h" +#include "postmaster/checkpointer.h" +#include "postmaster/postmaster.h" #include "replication/syncrep.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" @@ -101,19 +104,21 @@ * * The requests array holds fsync requests sent by backends and not yet * absorbed by the checkpointer. - * - * Unlike the checkpoint fields, num_backend_writes, num_backend_fsync, and - * the requests fields are protected by CheckpointerCommLock. *---------- */ typedef struct { - RelFileNode rnode; - ForkNumber forknum; - BlockNumber segno; /* see md.c for special values */ + uint32 type; + SmgrFileTag tag; + bool contains_fd; + int ckpt_started; + uint64 open_seq; /* might add a real request-type field later; not needed yet */ } CheckpointerRequest; +#define CKPT_REQUEST_RNODE 1 +#define CKPT_REQUEST_SYN 2 + typedef struct { pid_t checkpointer_pid; /* PID (0 if not started) */ @@ -126,12 +131,9 @@ typedef struct int ckpt_flags; /* checkpoint flags, as defined in xlog.h */ - uint32 num_backend_writes; /* counts user backend buffer writes */ - uint32 num_backend_fsync; /* counts user backend fsync calls */ - - int num_requests; /* current # of requests */ - int max_requests; /* allocated array size */ - CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER]; + pg_atomic_uint32 num_backend_writes; /* counts user backend buffer writes */ + pg_atomic_uint32 num_backend_fsync; /* counts user backend fsync calls */ + pg_atomic_uint64 ckpt_cycle; /* cycle */ } CheckpointerShmemStruct; static CheckpointerShmemStruct *CheckpointerShmem; @@ -171,8 +173,9 @@ static pg_time_t last_xlog_switch_time; static void CheckArchiveTimeout(void); static bool IsCheckpointOnSchedule(double progress); static bool ImmediateCheckpointRequested(void); -static bool CompactCheckpointerRequestQueue(void); static void UpdateSharedMemoryConfig(void); +static void SendFsyncRequest(CheckpointerRequest *request, int fd); +static bool AbsorbFsyncRequest(bool stop_at_current_cycle); /* Signal handlers */ @@ -182,6 +185,11 @@ static void ReqCheckpointHandler(SIGNAL_ARGS); static void chkpt_sigusr1_handler(SIGNAL_ARGS); static void ReqShutdownHandler(SIGNAL_ARGS); +#ifdef WIN32 +/* State used to track in-progress asynchronous fsync pipe reads. */ +static OVERLAPPED absorb_overlapped; +static HANDLE *absorb_read_in_progress; +#endif /* * Main entry point for checkpointer process @@ -194,6 +202,7 @@ CheckpointerMain(void) { sigjmp_buf local_sigjmp_buf; MemoryContext checkpointer_context; + WaitEventSet *wes; CheckpointerShmem->checkpointer_pid = MyProcPid; @@ -334,6 +343,21 @@ CheckpointerMain(void) */ ProcGlobal->checkpointerLatch = &MyProc->procLatch; + /* Create reusable WaitEventSet. */ + wes = CreateWaitEventSet(TopMemoryContext, 3); + AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, + NULL); + AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); +#ifndef WIN32 + AddWaitEventToSet(wes, WL_SOCKET_READABLE, fsync_fds[FSYNC_FD_PROCESS], + NULL, NULL); +#else + absorb_overlapped.hEvent = CreateEvent(NULL, TRUE, TRUE, + "fsync pipe read completion"); + AddWaitEventToSet(wes, WL_WIN32_HANDLE, PGINVALID_SOCKET, NULL, + &absorb_overlapped.hEvent); +#endif + /* * Loop forever */ @@ -345,6 +369,7 @@ CheckpointerMain(void) int elapsed_secs; int cur_timeout; int rc; + WaitEvent event; /* Clear any already-pending wakeups */ ResetLatch(MyLatch); @@ -545,16 +570,14 @@ CheckpointerMain(void) cur_timeout = Min(cur_timeout, XLogArchiveTimeout - elapsed_secs); } - rc = WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, - cur_timeout * 1000L /* convert to ms */ , - WAIT_EVENT_CHECKPOINTER_MAIN); + rc = WaitEventSetWait(wes, cur_timeout * 1000, &event, 1, 0); + Assert(rc > 0); /* * Emergency bailout if postmaster has died. This is to avoid the * necessity for manual cleanup of all postmaster children. */ - if (rc & WL_POSTMASTER_DEATH) + if (event.events == WL_POSTMASTER_DEATH) exit(1); } } @@ -890,16 +913,7 @@ ReqShutdownHandler(SIGNAL_ARGS) Size CheckpointerShmemSize(void) { - Size size; - - /* - * Currently, the size of the requests[] array is arbitrarily set equal to - * NBuffers. This may prove too large or small ... - */ - size = offsetof(CheckpointerShmemStruct, requests); - size = add_size(size, mul_size(NBuffers, sizeof(CheckpointerRequest))); - - return size; + return sizeof(CheckpointerShmemStruct); } /* @@ -920,13 +934,13 @@ CheckpointerShmemInit(void) if (!found) { /* - * First time through, so initialize. Note that we zero the whole - * requests array; this is so that CompactCheckpointerRequestQueue can - * assume that any pad bytes in the request structs are zeroes. + * First time through, so initialize. */ MemSet(CheckpointerShmem, 0, size); SpinLockInit(&CheckpointerShmem->ckpt_lck); - CheckpointerShmem->max_requests = NBuffers; + pg_atomic_init_u64(&CheckpointerShmem->ckpt_cycle, 0); + pg_atomic_init_u32(&CheckpointerShmem->num_backend_writes, 0); + pg_atomic_init_u32(&CheckpointerShmem->num_backend_fsync, 0); } } @@ -1102,181 +1116,84 @@ RequestCheckpoint(int flags) * is theoretically possible a backend fsync might still be necessary, if * the queue is full and contains no duplicate entries. In that case, we * let the backend know by returning false. + * + * We add the cycle counter to the message. That is an unsynchronized read + * of the shared memory counter, but it doesn't matter if it is arbitrarily + * old since it is only used to limit unnecessary extra queue draining in + * AbsorbAllFsyncRequests(). */ -bool -ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno) +void +ForwardFsyncRequest(const SmgrFileTag *tag, File file) { - CheckpointerRequest *request; - bool too_full; + CheckpointerRequest request = {0}; if (!IsUnderPostmaster) - return false; /* probably shouldn't even get here */ + elog(ERROR, "ForwardFsyncRequest must not be called in single user mode"); if (AmCheckpointerProcess()) elog(ERROR, "ForwardFsyncRequest must not be called in checkpointer"); - LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE); - - /* Count all backend writes regardless of if they fit in the queue */ - if (!AmBackgroundWriterProcess()) - CheckpointerShmem->num_backend_writes++; + request.type = CKPT_REQUEST_RNODE; + request.tag = *tag; +#ifdef CHECKPOINTER_TRANSFER_FILES + request.contains_fd = file != -1; +#else + request.contains_fd = false; +#endif /* - * If the checkpointer isn't running or the request queue is full, the - * backend will have to perform its own fsync request. But before forcing - * that to happen, we can try to compact the request queue. + * Tell the checkpointer the sequence number of the most recent open, so + * that it can be sure to hold the older file descriptor. */ - if (CheckpointerShmem->checkpointer_pid == 0 || - (CheckpointerShmem->num_requests >= CheckpointerShmem->max_requests && - !CompactCheckpointerRequestQueue())) - { - /* - * Count the subset of writes where backends have to do their own - * fsync - */ - if (!AmBackgroundWriterProcess()) - CheckpointerShmem->num_backend_fsync++; - LWLockRelease(CheckpointerCommLock); - return false; - } + request.open_seq = request.contains_fd ? FileGetOpenSeq(file) : (uint64) -1; - /* OK, insert request */ - request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++]; - request->rnode = rnode; - request->forknum = forknum; - request->segno = segno; - - /* If queue is more than half full, nudge the checkpointer to empty it */ - too_full = (CheckpointerShmem->num_requests >= - CheckpointerShmem->max_requests / 2); - - LWLockRelease(CheckpointerCommLock); - - /* ... but not till after we release the lock */ - if (too_full && ProcGlobal->checkpointerLatch) - SetLatch(ProcGlobal->checkpointerLatch); + /* + * We read ckpt_started without synchronization. It is used to prevent + * AbsorbAllFsyncRequests() from reading new values from after a + * checkpoint began. A slightly out-of-date value here will only cause + * it to do a little bit more work than strictly necessary, but that's + * OK. + */ + request.ckpt_started = CheckpointerShmem->ckpt_started; - return true; + SendFsyncRequest(&request, + request.contains_fd ? FileGetRawDesc(file) : -1); } /* - * CompactCheckpointerRequestQueue - * Remove duplicates from the request queue to avoid backend fsyncs. - * Returns "true" if any entries were removed. - * - * Although a full fsync request queue is not common, it can lead to severe - * performance problems when it does happen. So far, this situation has - * only been observed to occur when the system is under heavy write load, - * and especially during the "sync" phase of a checkpoint. Without this - * logic, each backend begins doing an fsync for every block written, which - * gets very expensive and can slow down the whole system. + * AbsorbFsyncRequests + * Retrieve queued fsync requests and pass them to local smgr. Stop when + * resources would be exhausted by absorbing more. * - * Trying to do this every time the queue is full could lose if there - * aren't any removable entries. But that should be vanishingly rare in - * practice: there's one queue entry per shared buffer. + * This is exported because we want to continue accepting requests during + * smgrsync(). */ -static bool -CompactCheckpointerRequestQueue(void) +void +AbsorbFsyncRequests(void) { - struct CheckpointerSlotMapping - { - CheckpointerRequest request; - int slot; - }; - - int n, - preserve_count; - int num_skipped = 0; - HASHCTL ctl; - HTAB *htab; - bool *skip_slot; - - /* must hold CheckpointerCommLock in exclusive mode */ - Assert(LWLockHeldByMe(CheckpointerCommLock)); - - /* Initialize skip_slot array */ - skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests); - - /* Initialize temporary hash table */ - MemSet(&ctl, 0, sizeof(ctl)); - ctl.keysize = sizeof(CheckpointerRequest); - ctl.entrysize = sizeof(struct CheckpointerSlotMapping); - ctl.hcxt = CurrentMemoryContext; - - htab = hash_create("CompactCheckpointerRequestQueue", - CheckpointerShmem->num_requests, - &ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - - /* - * The basic idea here is that a request can be skipped if it's followed - * by a later, identical request. It might seem more sensible to work - * backwards from the end of the queue and check whether a request is - * *preceded* by an earlier, identical request, in the hopes of doing less - * copying. But that might change the semantics, if there's an - * intervening FORGET_RELATION_FSYNC or FORGET_DATABASE_FSYNC request, so - * we do it this way. It would be possible to be even smarter if we made - * the code below understand the specific semantics of such requests (it - * could blow away preceding entries that would end up being canceled - * anyhow), but it's not clear that the extra complexity would buy us - * anything. - */ - for (n = 0; n < CheckpointerShmem->num_requests; n++) - { - CheckpointerRequest *request; - struct CheckpointerSlotMapping *slotmap; - bool found; - - /* - * We use the request struct directly as a hashtable key. This - * assumes that any padding bytes in the structs are consistently the - * same, which should be okay because we zeroed them in - * CheckpointerShmemInit. Note also that RelFileNode had better - * contain no pad bytes. - */ - request = &CheckpointerShmem->requests[n]; - slotmap = hash_search(htab, request, HASH_ENTER, &found); - if (found) - { - /* Duplicate, so mark the previous occurrence as skippable */ - skip_slot[slotmap->slot] = true; - num_skipped++; - } - /* Remember slot containing latest occurrence of this request value */ - slotmap->slot = n; - } + if (!AmCheckpointerProcess()) + return; - /* Done with the hash table. */ - hash_destroy(htab); + /* Transfer stats counts into pending pgstats message */ + BgWriterStats.m_buf_written_backend += + pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0); + BgWriterStats.m_buf_fsync_backend += + pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0); - /* If no duplicates, we're out of luck. */ - if (!num_skipped) + while (true) { - pfree(skip_slot); - return false; - } + if (!FlushFsyncRequestQueueIfNecessary()) + break; - /* We found some duplicates; remove them. */ - preserve_count = 0; - for (n = 0; n < CheckpointerShmem->num_requests; n++) - { - if (skip_slot[n]) - continue; - CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n]; + if (!AbsorbFsyncRequest(false)) + break; } - ereport(DEBUG1, - (errmsg("compacted fsync request queue from %d entries to %d entries", - CheckpointerShmem->num_requests, preserve_count))); - CheckpointerShmem->num_requests = preserve_count; - - /* Cleanup. */ - pfree(skip_slot); - return true; } /* - * AbsorbFsyncRequests - * Retrieve queued fsync requests and pass them to local smgr. + * AbsorbAllFsyncRequests + * Retrieve all already pending fsync requests and pass them to local + * smgr. * * This is exported because it must be called during CreateCheckPoint; * we have to be sure we have accepted all pending requests just before @@ -1284,54 +1201,121 @@ CompactCheckpointerRequestQueue(void) * non-checkpointer processes, do nothing if not checkpointer. */ void -AbsorbFsyncRequests(void) +AbsorbAllFsyncRequests(void) { - CheckpointerRequest *requests = NULL; - CheckpointerRequest *request; - int n; - if (!AmCheckpointerProcess()) return; - LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE); - /* Transfer stats counts into pending pgstats message */ - BgWriterStats.m_buf_written_backend += CheckpointerShmem->num_backend_writes; - BgWriterStats.m_buf_fsync_backend += CheckpointerShmem->num_backend_fsync; - - CheckpointerShmem->num_backend_writes = 0; - CheckpointerShmem->num_backend_fsync = 0; + BgWriterStats.m_buf_written_backend += + pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0); + BgWriterStats.m_buf_fsync_backend += + pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0); - /* - * We try to avoid holding the lock for a long time by copying the request - * array, and processing the requests after releasing the lock. - * - * Once we have cleared the requests from shared memory, we have to PANIC - * if we then fail to absorb them (eg, because our hashtable runs out of - * memory). This is because the system cannot run safely if we are unable - * to fsync what we have been told to fsync. Fortunately, the hashtable - * is so small that the problem is quite unlikely to arise in practice. - */ - n = CheckpointerShmem->num_requests; - if (n > 0) + for (;;) { - requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest)); - memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest)); + if (!FlushFsyncRequestQueueIfNecessary()) + elog(FATAL, "may not happen"); + + if (!AbsorbFsyncRequest(true)) + break; } +} + +/* + * AbsorbFsyncRequest + * Retrieve one queued fsync request and pass them to local smgr. + */ +static bool +AbsorbFsyncRequest(bool stop_at_current_cycle) +{ + static CheckpointerRequest req; + int fd = -1; +#ifndef WIN32 + int ret; +#else + DWORD bytes_read; +#endif + + ReleaseLruFiles(); START_CRIT_SECTION(); +#ifndef WIN32 + ret = pg_uds_recv_with_fd(fsync_fds[FSYNC_FD_PROCESS], + &req, + sizeof(req), + &fd); + if (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) + { + END_CRIT_SECTION(); + return false; + } + else if (ret < 0) + elog(ERROR, "recvmsg failed: %m"); +#else + if (!absorb_read_in_progress) + { + if (!ReadFile(fsyncPipe[FSYNC_FD_PROCESS], + &req, + sizeof(req), + &bytes_read, + &absorb_overlapped)) + { + if (GetLastError() != ERROR_IO_PENDING) + { + _dosmaperr(GetLastError()); + elog(ERROR, "can't begin read from fsync pipe: %m"); + } - CheckpointerShmem->num_requests = 0; + /* + * An asynchronous read has begun. We'll tell caller to call us + * back when the event indicates completion. + */ + absorb_read_in_progress = &absorb_overlapped.hEvent; + END_CRIT_SECTION(); + return false; + } + /* The read completed synchronously. 'req' is now populated. */ + } + if (absorb_read_in_progress) + { + /* Completed yet? */ + if (!GetOverlappedResult(fsyncPipe[FSYNC_FD_PROCESS], + &absorb_overlapped, + &bytes_read, + false)) + { + if (GetLastError() == ERROR_IO_INCOMPLETE) + { + /* Nope. Spurious event? Tell caller to wait some more. */ + END_CRIT_SECTION(); + return false; + } + _dosmaperr(GetLastError()); + elog(ERROR, "can't complete from fsync pipe: %m"); + } + /* The asynchronous read completed. 'req' is now populated. */ + absorb_read_in_progress = NULL; + } - LWLockRelease(CheckpointerCommLock); + /* Check message size. */ + if (bytes_read != sizeof(req)) + elog(ERROR, "unexpected short read on fsync pipe"); +#endif - for (request = requests; n > 0; request++, n--) - RememberFsyncRequest(request->rnode, request->forknum, request->segno); + if (req.contains_fd != (fd != -1)) + { + elog(FATAL, "message should have fd associated, but doesn't"); + } + RememberFsyncRequest(&req.tag, fd, req.open_seq); END_CRIT_SECTION(); - if (requests) - pfree(requests); + if (stop_at_current_cycle && + req.ckpt_started == CheckpointerShmem->ckpt_started) + return false; + + return true; } /* @@ -1374,3 +1358,139 @@ FirstCallSinceLastCheckpoint(void) return FirstCall; } + +uint64 +GetCheckpointSyncCycle(void) +{ + return pg_atomic_read_u64(&CheckpointerShmem->ckpt_cycle); +} + +uint64 +IncCheckpointSyncCycle(void) +{ + return pg_atomic_fetch_add_u64(&CheckpointerShmem->ckpt_cycle, 1); +} + +void +CountBackendWrite(void) +{ + pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_writes, 1); +} + +/* + * Send a message to the checkpointer's fsync socket (Unix) or pipe (Windows). + * This is essentially a blocking call (there is no CHECK_FOR_INTERRUPTS, and + * even if there were it'd be surpressed since callers hold a lock), except + * that we don't ignore postmaster death so we need an event loop. + * + * The code is rather different on Windows, because there we have to begin the + * write and then wait for it to complete, while on Unix we have to wait until + * we can do the write. + */ +static void +SendFsyncRequest(CheckpointerRequest *request, int fd) +{ +#ifndef WIN32 + ssize_t ret; + int rc; + + while (true) + { + ret = pg_uds_send_with_fd(fsync_fds[FSYNC_FD_SUBMIT], + request, + sizeof(*request), + request->contains_fd ? fd : -1); + + if (ret >= 0) + { + /* + * Don't think short writes will ever happen in realistic + * implementations, but better make sure that's true... + */ + if (ret != sizeof(*request)) + elog(FATAL, "unexpected short write to fsync request socket"); + break; + } + else if (errno == EWOULDBLOCK || errno == EAGAIN +#ifdef __darwin__ + || errno == EMSGSIZE || errno == ENOBUFS +#endif + ) + { + /* + * Testing on macOS 10.13 showed occasional EMSGSIZE or + * ENOBUFS errors, which could be handled by retrying. Unless + * the problem also shows up on other systems, let's handle those + * only for that OS. + */ + + /* Blocked on write - wait for socket to become readable */ + rc = WaitLatchOrSocket(NULL, + WL_SOCKET_WRITEABLE | WL_POSTMASTER_DEATH, + fsync_fds[FSYNC_FD_SUBMIT], -1, 0); + if (rc & WL_POSTMASTER_DEATH) + exit(1); + } + else + ereport(FATAL, (errmsg("could not send fsync request: %m"))); + } + +#else /* WIN32 */ + { + OVERLAPPED overlapped = {0}; + DWORD nwritten; + int rc; + + overlapped.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL); + + if (!WriteFile(fsyncPipe[FSYNC_FD_SUBMIT], + request, + sizeof(*request), + &nwritten, + &overlapped)) + { + WaitEventSet *wes; + WaitEvent event; + + /* Handle unexpected errors. */ + if (GetLastError() != ERROR_IO_PENDING) + { + _dosmaperr(GetLastError()); + CloseHandle(overlapped.hEvent); + ereport(FATAL, (errmsg("could not send fsync request: %m"))); + } + + /* Wait for asynchronous IO to complete. */ + wes = CreateWaitEventSet(TopMemoryContext, 3); + AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, + NULL); + AddWaitEventToSet(wes, WL_WIN32_HANDLE, PGINVALID_SOCKET, NULL, + &overlapped.hEvent); + for (;;) + { + rc = WaitEventSetWait(wes, -1, &event, 1, 0); + Assert(rc > 0); + if (event.events == WL_POSTMASTER_DEATH) + exit(1); + if (event.events == WL_WIN32_HANDLE) + { + if (!GetOverlappedResult(fsyncPipe[FSYNC_FD_SUBMIT], &overlapped, + &nwritten, FALSE)) + { + _dosmaperr(GetLastError()); + CloseHandle(overlapped.hEvent); + ereport(FATAL, (errmsg("could not get result of sending fsync request: %m"))); + } + if (nwritten > 0) + break; + } + } + FreeWaitEventSet(wes); + } + + CloseHandle(overlapped.hEvent); + if (nwritten != sizeof(*request)) + elog(FATAL, "unexpected short write to fsync request pipe"); + } +#endif +} diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 41de140ae01..8ec71d13fa7 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -70,6 +70,7 @@ #include #include #include +#include #include #include #include @@ -434,6 +435,7 @@ static pid_t StartChildProcess(AuxProcType type); static void StartAutovacuumWorker(void); static void MaybeStartWalReceiver(void); static void InitPostmasterDeathWatchHandle(void); +static void InitFsyncFdSocketPair(void); /* * Archiver is allowed to start up at the current postmaster state? @@ -523,9 +525,11 @@ typedef struct HANDLE PostmasterHandle; HANDLE initial_signal_pipe; HANDLE syslogPipe[2]; + HANDLE fsyncPipe[2]; #else int postmaster_alive_fds[2]; int syslogPipe[2]; + int fsync_fds[2]; #endif char my_exec_path[MAXPGPATH]; char pkglib_path[MAXPGPATH]; @@ -568,6 +572,12 @@ int postmaster_alive_fds[2] = {-1, -1}; HANDLE PostmasterHandle; #endif +#ifndef WIN32 +int fsync_fds[2] = {-1, -1}; +#else +HANDLE fsyncPipe[2] = {0, 0}; +#endif + /* * Postmaster main entry point */ @@ -1195,6 +1205,11 @@ PostmasterMain(int argc, char *argv[]) */ InitPostmasterDeathWatchHandle(); + /* + * Initialize socket pair used to transport file descriptors over. + */ + InitFsyncFdSocketPair(); + #ifdef WIN32 /* @@ -5994,7 +6009,8 @@ extern pg_time_t first_syslogger_file_time; #define write_inheritable_socket(dest, src, childpid) ((*(dest) = (src)), true) #define read_inheritable_socket(dest, src) (*(dest) = *(src)) #else -static bool write_duplicated_handle(HANDLE *dest, HANDLE src, HANDLE child); +static bool write_duplicated_handle(HANDLE *dest, HANDLE src, HANDLE child, + bool close_source); static bool write_inheritable_socket(InheritableSocket *dest, SOCKET src, pid_t childPid); static void read_inheritable_socket(SOCKET *dest, InheritableSocket *src); @@ -6058,11 +6074,20 @@ save_backend_variables(BackendParameters *param, Port *port, param->PostmasterHandle = PostmasterHandle; if (!write_duplicated_handle(¶m->initial_signal_pipe, pgwin32_create_signal_listener(childPid), - childProcess)) + childProcess, true)) + return false; + if (!write_duplicated_handle(¶m->fsyncPipe[0], + fsyncPipe[0], + childProcess, false)) + return false; + if (!write_duplicated_handle(¶m->fsyncPipe[1], + fsyncPipe[1], + childProcess, false)) return false; #else memcpy(¶m->postmaster_alive_fds, &postmaster_alive_fds, sizeof(postmaster_alive_fds)); + memcpy(¶m->fsync_fds, &fsync_fds, sizeof(fsync_fds)); #endif memcpy(¶m->syslogPipe, &syslogPipe, sizeof(syslogPipe)); @@ -6083,7 +6108,8 @@ save_backend_variables(BackendParameters *param, Port *port, * process instance of the handle to the parameter file. */ static bool -write_duplicated_handle(HANDLE *dest, HANDLE src, HANDLE childProcess) +write_duplicated_handle(HANDLE *dest, HANDLE src, HANDLE childProcess, + bool close_source) { HANDLE hChild = INVALID_HANDLE_VALUE; @@ -6093,7 +6119,8 @@ write_duplicated_handle(HANDLE *dest, HANDLE src, HANDLE childProcess) &hChild, 0, TRUE, - DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS)) + (close_source ? DUPLICATE_CLOSE_SOURCE : 0) | + DUPLICATE_SAME_ACCESS)) { ereport(LOG, (errmsg_internal("could not duplicate handle to be written to backend parameter file: error code %lu", @@ -6289,9 +6316,12 @@ restore_backend_variables(BackendParameters *param, Port *port) #ifdef WIN32 PostmasterHandle = param->PostmasterHandle; pgwin32_initial_signal_pipe = param->initial_signal_pipe; + fsyncPipe[0] = param->fsyncPipe[0]; + fsyncPipe[1] = param->fsyncPipe[1]; #else memcpy(&postmaster_alive_fds, ¶m->postmaster_alive_fds, sizeof(postmaster_alive_fds)); + memcpy(&fsync_fds, ¶m->fsync_fds, sizeof(fsync_fds)); #endif memcpy(&syslogPipe, ¶m->syslogPipe, sizeof(syslogPipe)); @@ -6468,3 +6498,88 @@ InitPostmasterDeathWatchHandle(void) GetLastError()))); #endif /* WIN32 */ } + +/* Create socket used for requesting fsyncs by checkpointer */ +static void +InitFsyncFdSocketPair(void) +{ + Assert(MyProcPid == PostmasterPid); + +#ifndef WIN32 + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fsync_fds) < 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg_internal("could not create fsync sockets: %m"))); + /* + * Set O_NONBLOCK on both fds. + */ + if (fcntl(fsync_fds[FSYNC_FD_PROCESS], F_SETFL, O_NONBLOCK) == -1) + ereport(FATAL, + (errcode_for_socket_access(), + errmsg_internal("could not set fsync process socket to nonblocking mode: %m"))); +#ifndef EXEC_BACKEND + if (fcntl(fsync_fds[FSYNC_FD_PROCESS], F_SETFD, FD_CLOEXEC) == -1) + ereport(FATAL, + (errcode_for_socket_access(), + errmsg_internal("could not set fsync process socket to close-on-exec mode: %m"))); +#endif + + if (fcntl(fsync_fds[FSYNC_FD_SUBMIT], F_SETFL, O_NONBLOCK) == -1) + ereport(FATAL, + (errcode_for_socket_access(), + errmsg_internal("could not set fsync submit socket to nonblocking mode: %m"))); +#ifndef EXEC_BACKEND + if (fcntl(fsync_fds[FSYNC_FD_SUBMIT], F_SETFD, FD_CLOEXEC) == -1) + ereport(FATAL, + (errcode_for_socket_access(), + errmsg_internal("could not set fsync submit socket to close-on-exec mode: %m"))); +#endif +#else + { + UCHAR pipename[MAX_PATH]; + SECURITY_ATTRIBUTES sa; + + memset(&sa, 0, sizeof(sa)); + + /* + * We'll create a named pipe, because anonymous pipes don't allow + * overlapped (= async) IO or message-orient communication. We'll + * open both ends of it here, and then duplicate them into all child + * processes in save_backend_variables(). First, open the server end. + */ + snprintf(pipename, sizeof(pipename), "\\\\.\\Pipe\\fsync_pipe.%08x", + GetCurrentProcessId()); + fsyncPipe[FSYNC_FD_PROCESS] = CreateNamedPipeA(pipename, + PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_MESSAGE | PIPE_WAIT, + 1, + 4096, + 4096, + -1, + &sa); + if (!fsyncPipe[FSYNC_FD_PROCESS]) + { + _dosmaperr(GetLastError()); + ereport(FATAL, + (errcode_for_file_access(), + errmsg_internal("could not create server end of fsync pipe: %m"))); + } + + /* Now open the client end. */ + fsyncPipe[FSYNC_FD_SUBMIT] = CreateFileA(pipename, + GENERIC_WRITE, + 0, + &sa, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, + NULL); + if (!fsyncPipe[FSYNC_FD_SUBMIT]) + { + _dosmaperr(GetLastError()); + ereport(FATAL, + (errcode_for_file_access(), + errmsg_internal("could not create client end of fsync pipe: %m"))); + } + } +#endif +} diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 01eabe57063..256cc5e0217 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -42,11 +42,13 @@ #include "pg_trace.h" #include "pgstat.h" #include "postmaster/bgwriter.h" +#include "postmaster/checkpointer.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/ipc.h" #include "storage/proc.h" #include "storage/smgr.h" +#include "storage/smgrsync.h" #include "storage/standby.h" #include "utils/rel.h" #include "utils/resowner_private.h" diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index 8dd51f17674..d5c8328b5d6 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -85,6 +85,7 @@ #include "catalog/pg_tablespace.h" #include "common/file_perm.h" #include "pgstat.h" +#include "port/atomics.h" #include "portability/mem.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -180,6 +181,7 @@ int max_safe_fds = 32; /* default if not changed */ #define FD_DELETE_AT_CLOSE (1 << 0) /* T = delete when closed */ #define FD_CLOSE_AT_EOXACT (1 << 1) /* T = close at eoXact */ #define FD_TEMP_FILE_LIMIT (1 << 2) /* T = respect temp_file_limit */ +#define FD_NOT_IN_LRU (1 << 3) /* T = not in LRU */ typedef struct vfd { @@ -195,6 +197,7 @@ typedef struct vfd /* NB: fileName is malloc'd, and must be free'd when closing the VFD */ int fileFlags; /* open(2) flags for (re)opening the file */ mode_t fileMode; /* mode to pass to open(2) */ + uint64 open_seq; /* sequence number of opened file */ } Vfd; /* @@ -304,7 +307,6 @@ static void LruDelete(File file); static void Insert(File file); static int LruInsert(File file); static bool ReleaseLruFile(void); -static void ReleaseLruFiles(void); static File AllocateVfd(void); static void FreeVfd(File file); @@ -333,6 +335,13 @@ static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel); static int fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel); static int fsync_parent_path(const char *fname, int elevel); +/* Shared memory state. */ +typedef struct +{ + pg_atomic_uint64 open_seq; +} FdSharedData; + +static FdSharedData *fd_shared; /* * pg_fsync --- do fsync with or without writethrough @@ -789,6 +798,20 @@ InitFileAccess(void) on_proc_exit(AtProcExit_Files, 0); } +/* + * Initialize shared memory state. This is called after shared memory is + * ready. + */ +void +FileShmemInit(void) +{ + bool found; + + fd_shared = ShmemInitStruct("fd_shared", sizeof(*fd_shared), &found); + if (!found) + pg_atomic_init_u64(&fd_shared->open_seq, 0); +} + /* * count_usable_fds --- count how many FDs the system will let us open, * and estimate how many are already open. @@ -1113,6 +1136,8 @@ LruInsert(File file) { ++nfile; } + vfdP->open_seq = + pg_atomic_fetch_add_u64(&fd_shared->open_seq, 1); /* * Seek to the right position. We need no special case for seekPos @@ -1176,7 +1201,7 @@ ReleaseLruFile(void) * Release kernel FDs as needed to get under the max_safe_fds limit. * After calling this, it's OK to try to open another file. */ -static void +void ReleaseLruFiles(void) { while (nfile + numAllocatedDescs >= max_safe_fds) @@ -1289,9 +1314,11 @@ FileAccess(File file) * We now know that the file is open and that it is not the last one * accessed, so we need to move it to the head of the Lru ring. */ - - Delete(file); - Insert(file); + if (!(VfdCache[file].fdstate & FD_NOT_IN_LRU)) + { + Delete(file); + Insert(file); + } } return 0; @@ -1410,6 +1437,58 @@ PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode) vfdP->fileSize = 0; vfdP->fdstate = 0x0; vfdP->resowner = NULL; + vfdP->open_seq = pg_atomic_fetch_add_u64(&fd_shared->open_seq, 1); + + return file; +} + +/* + * Open a File for a pre-existing file descriptor. + * + * Note that these files will not be closed in an LRU basis, therefore the + * caller is responsible for limiting the number of open file descriptors. + * + * The passed in name is purely for informational purposes. + */ +File +FileOpenForFd(int fd, const char *fileName, uint64 open_seq) +{ + char *fnamecopy; + File file; + Vfd *vfdP; + + /* + * We need a malloc'd copy of the file name; fail cleanly if no room. + */ + fnamecopy = strdup(fileName); + if (fnamecopy == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + + file = AllocateVfd(); + vfdP = &VfdCache[file]; + + /* Close excess kernel FDs. */ + ReleaseLruFiles(); + + vfdP->fd = fd; + ++nfile; + + DO_DB(elog(LOG, "FileOpenForFd: success %d/%d (%s)", + file, fd, fnamecopy)); + + /* NB: Explicitly not inserted into LRU! */ + + vfdP->fileName = fnamecopy; + /* Saved flags are adjusted to be OK for re-opening file */ + vfdP->fileFlags = 0; + vfdP->fileMode = 0; + vfdP->seekPos = 0; + vfdP->fileSize = 0; + vfdP->fdstate = FD_NOT_IN_LRU; + vfdP->resowner = NULL; + vfdP->open_seq = open_seq; return file; } @@ -1760,7 +1839,11 @@ FileClose(File file) vfdP->fd = VFD_CLOSED; /* remove the file from the lru ring */ - Delete(file); + if (!(vfdP->fdstate & FD_NOT_IN_LRU)) + { + vfdP->fdstate &= ~FD_NOT_IN_LRU; + Delete(file); + } } if (vfdP->fdstate & FD_TEMP_FILE_LIMIT) @@ -2232,6 +2315,10 @@ int FileGetRawDesc(File file) { Assert(FileIsValid(file)); + + if (FileAccess(file)) + return -1; + return VfdCache[file].fd; } @@ -2255,6 +2342,17 @@ FileGetRawMode(File file) return VfdCache[file].fileMode; } +/* + * Get the opening sequence number of this file. This number is captured + * after the file was opened but before anything was written to the file, + */ +uint64 +FileGetOpenSeq(File file) +{ + Assert(FileIsValid(file)); + return VfdCache[file].open_seq; +} + /* * Make room for another allocatedDescs[] array entry if needed and possible. * Returns true if an array element is available. @@ -3572,3 +3670,110 @@ MakePGDirectory(const char *directoryName) { return mkdir(directoryName, pg_dir_create_mode); } + +#ifndef WIN32 + +/* + * Send data over a unix domain socket, optionally (when fd != -1) including a + * file descriptor. + */ +ssize_t +pg_uds_send_with_fd(int sock, void *buf, ssize_t buflen, int fd) +{ + ssize_t size; + struct msghdr msg = {0}; + struct iovec iov = {0}; + /* cmsg header, union for correct alignment */ + union + { + struct cmsghdr cmsghdr; + char control[CMSG_SPACE(sizeof (int))]; + } cmsgu; + struct cmsghdr *cmsg; + + memset(&cmsgu, 0, sizeof(cmsgu)); + iov.iov_base = buf; + iov.iov_len = buflen; + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + if (fd >= 0) + { + msg.msg_control = cmsgu.control; + msg.msg_controllen = sizeof(cmsgu.control); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_len = CMSG_LEN(sizeof (int)); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + + *((int *) CMSG_DATA(cmsg)) = fd; + } + + size = sendmsg(sock, &msg, 0); + + /* errors are returned directly */ + return size; +} + +/* + * Receive data from a unix domain socket. If a file is sent over the socket, + * store it in *fd. + */ +ssize_t +pg_uds_recv_with_fd(int sock, void *buf, ssize_t bufsize, int *fd) +{ + ssize_t size; + struct msghdr msg; + struct iovec iov; + /* cmsg header, union for correct alignment */ + union + { + struct cmsghdr cmsghdr; + char control[CMSG_SPACE(sizeof (int))]; + } cmsgu; + struct cmsghdr *cmsg; + + Assert(fd != NULL); + + iov.iov_base = buf; + iov.iov_len = bufsize; + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = cmsgu.control; + msg.msg_controllen = sizeof(cmsgu.control); + + size = recvmsg (sock, &msg, 0); + + if (size < 0) + { + *fd = -1; + return size; + } + + cmsg = CMSG_FIRSTHDR(&msg); + if (cmsg && cmsg->cmsg_len == CMSG_LEN(sizeof(int))) + { + if (cmsg->cmsg_level != SOL_SOCKET) + elog(FATAL, "unexpected cmsg_level"); + + if (cmsg->cmsg_type != SCM_RIGHTS) + elog(FATAL, "unexpected cmsg_type"); + + *fd = *((int *) CMSG_DATA(cmsg)); + + /* FIXME: check / handle additional cmsg structures */ + } + else + *fd = -1; + + return size; +} + +#endif diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c index 7c4ad1c4494..2b47824aab9 100644 --- a/src/backend/storage/freespace/freespace.c +++ b/src/backend/storage/freespace/freespace.c @@ -556,7 +556,7 @@ fsm_readbuf(Relation rel, FSMAddress addr, bool extend) * not on extension.) */ if (rel->rd_smgr->smgr_fsm_nblocks == InvalidBlockNumber || - blkno >= rel->rd_smgr->smgr_fsm_nblocks) + rel->rd_smgr->smgr_fsm_nblocks == 0) { if (smgrexists(rel->rd_smgr, FSM_FORKNUM)) rel->rd_smgr->smgr_fsm_nblocks = smgrnblocks(rel->rd_smgr, @@ -564,6 +564,9 @@ fsm_readbuf(Relation rel, FSMAddress addr, bool extend) else rel->rd_smgr->smgr_fsm_nblocks = 0; } + else if (blkno >= rel->rd_smgr->smgr_fsm_nblocks) + rel->rd_smgr->smgr_fsm_nblocks = smgrnblocks(rel->rd_smgr, + FSM_FORKNUM); /* Handle requests beyond EOF */ if (blkno >= rel->rd_smgr->smgr_fsm_nblocks) diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 0c86a581c03..efbd25b84da 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -27,6 +27,7 @@ #include "postmaster/autovacuum.h" #include "postmaster/bgworker_internals.h" #include "postmaster/bgwriter.h" +#include "postmaster/checkpointer.h" #include "postmaster/postmaster.h" #include "replication/logicallauncher.h" #include "replication/slot.h" @@ -270,6 +271,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) SyncScanShmemInit(); AsyncShmemInit(); BackendRandomShmemInit(); + FileShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index f6dda9cc9ac..081d399eefc 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -878,6 +878,12 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) { *handle = PostmasterHandle; } +#ifdef WIN32 + else if (event->events == WL_WIN32_HANDLE) + { + *handle = *(HANDLE *)event->user_data; + } +#endif else { int flags = FD_CLOSE; /* always check for errors/EOF */ @@ -1453,6 +1459,12 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, returned_events++; } } + else if (cur_event->events & WL_WIN32_HANDLE) + { + occurred_events->events |= WL_WIN32_HANDLE; + occurred_events++; + returned_events++; + } return returned_events; } diff --git a/src/backend/storage/smgr/Makefile b/src/backend/storage/smgr/Makefile index 2b95cb0df16..c9c4be325ed 100644 --- a/src/backend/storage/smgr/Makefile +++ b/src/backend/storage/smgr/Makefile @@ -12,6 +12,6 @@ subdir = src/backend/storage/smgr top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = md.o smgr.o smgrtype.o +OBJS = md.o smgr.o smgrsync.o smgrtype.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index f4374d077be..d6bff3b6e03 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -30,37 +30,24 @@ #include "access/xlog.h" #include "pgstat.h" #include "portability/instr_time.h" -#include "postmaster/bgwriter.h" +#include "postmaster/checkpointer.h" #include "storage/fd.h" #include "storage/bufmgr.h" #include "storage/relfilenode.h" #include "storage/smgr.h" +#include "storage/smgrsync.h" #include "utils/hsearch.h" #include "utils/memutils.h" #include "pg_trace.h" -/* intervals for calling AbsorbFsyncRequests in mdsync and mdpostckpt */ -#define FSYNCS_PER_ABSORB 10 -#define UNLINKS_PER_ABSORB 10 - -/* - * Special values for the segno arg to RememberFsyncRequest. - * - * Note that CompactCheckpointerRequestQueue assumes that it's OK to remove an - * fsync request from the queue if an identical, subsequent request is found. - * See comments there before making changes here. - */ -#define FORGET_RELATION_FSYNC (InvalidBlockNumber) -#define FORGET_DATABASE_FSYNC (InvalidBlockNumber-1) -#define UNLINK_RELATION_REQUEST (InvalidBlockNumber-2) /* * On Windows, we have to interpret EACCES as possibly meaning the same as * ENOENT, because if a file is unlinked-but-not-yet-gone on that platform, * that's what you get. Ugh. This code is designed so that we don't * actually believe these cases are okay without further evidence (namely, - * a pending fsync request getting canceled ... see mdsync). + * a pending fsync request getting canceled ... see smgrsync). */ #ifndef WIN32 #define FILE_POSSIBLY_DELETED(err) ((err) == ENOENT) @@ -110,6 +97,7 @@ typedef struct _MdfdVec { File mdfd_vfd; /* fd number in fd.c's pool */ BlockNumber mdfd_segno; /* segment number, from 0 */ + uint64 mdfd_dirtied_cycle; } MdfdVec; static MemoryContext MdCxt; /* context for all MdfdVec objects */ @@ -134,30 +122,9 @@ static MemoryContext MdCxt; /* context for all MdfdVec objects */ * (Regular backends do not track pending operations locally, but forward * them to the checkpointer.) */ -typedef uint16 CycleCtr; /* can be any convenient integer size */ +typedef uint32 CycleCtr; /* can be any convenient integer size */ -typedef struct -{ - RelFileNode rnode; /* hash table key (must be first!) */ - CycleCtr cycle_ctr; /* mdsync_cycle_ctr of oldest request */ - /* requests[f] has bit n set if we need to fsync segment n of fork f */ - Bitmapset *requests[MAX_FORKNUM + 1]; - /* canceled[f] is true if we canceled fsyncs for fork "recently" */ - bool canceled[MAX_FORKNUM + 1]; -} PendingOperationEntry; - -typedef struct -{ - RelFileNode rnode; /* the dead relation to delete */ - CycleCtr cycle_ctr; /* mdckpt_cycle_ctr when request was made */ -} PendingUnlinkEntry; -static HTAB *pendingOpsTable = NULL; -static List *pendingUnlinks = NIL; -static MemoryContext pendingOpsCxt; /* context for the above */ - -static CycleCtr mdsync_cycle_ctr = 0; -static CycleCtr mdckpt_cycle_ctr = 0; /*** behavior for mdopen & _mdfd_getseg ***/ @@ -184,8 +151,7 @@ static void mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo); static MdfdVec *mdopen(SMgrRelation reln, ForkNumber forknum, int behavior); static void register_dirty_segment(SMgrRelation reln, ForkNumber forknum, - MdfdVec *seg); -static void register_unlink(RelFileNodeBackend rnode); + MdfdVec *seg); static void _fdvec_resize(SMgrRelation reln, ForkNumber forknum, int nseg); @@ -208,64 +174,6 @@ mdinit(void) MdCxt = AllocSetContextCreate(TopMemoryContext, "MdSmgr", ALLOCSET_DEFAULT_SIZES); - - /* - * Create pending-operations hashtable if we need it. Currently, we need - * it if we are standalone (not under a postmaster) or if we are a startup - * or checkpointer auxiliary process. - */ - if (!IsUnderPostmaster || AmStartupProcess() || AmCheckpointerProcess()) - { - HASHCTL hash_ctl; - - /* - * XXX: The checkpointer needs to add entries to the pending ops table - * when absorbing fsync requests. That is done within a critical - * section, which isn't usually allowed, but we make an exception. It - * means that there's a theoretical possibility that you run out of - * memory while absorbing fsync requests, which leads to a PANIC. - * Fortunately the hash table is small so that's unlikely to happen in - * practice. - */ - pendingOpsCxt = AllocSetContextCreate(MdCxt, - "Pending ops context", - ALLOCSET_DEFAULT_SIZES); - MemoryContextAllowInCriticalSection(pendingOpsCxt, true); - - MemSet(&hash_ctl, 0, sizeof(hash_ctl)); - hash_ctl.keysize = sizeof(RelFileNode); - hash_ctl.entrysize = sizeof(PendingOperationEntry); - hash_ctl.hcxt = pendingOpsCxt; - pendingOpsTable = hash_create("Pending Ops Table", - 100L, - &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - pendingUnlinks = NIL; - } -} - -/* - * In archive recovery, we rely on checkpointer to do fsyncs, but we will have - * already created the pendingOpsTable during initialization of the startup - * process. Calling this function drops the local pendingOpsTable so that - * subsequent requests will be forwarded to checkpointer. - */ -void -SetForwardFsyncRequests(void) -{ - /* Perform any pending fsyncs we may have queued up, then drop table */ - if (pendingOpsTable) - { - mdsync(); - hash_destroy(pendingOpsTable); - } - pendingOpsTable = NULL; - - /* - * We should not have any pending unlink requests, since mdunlink doesn't - * queue unlink requests when isRedo. - */ - Assert(pendingUnlinks == NIL); } /* @@ -334,6 +242,7 @@ mdcreate(SMgrRelation reln, ForkNumber forkNum, bool isRedo) mdfd = &reln->md_seg_fds[forkNum][0]; mdfd->mdfd_vfd = fd; mdfd->mdfd_segno = 0; + mdfd->mdfd_dirtied_cycle = GetCheckpointSyncCycle() - 1; } /* @@ -388,7 +297,7 @@ mdunlink(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo) { /* * We have to clean out any pending fsync requests for the doomed - * relation, else the next mdsync() will fail. There can't be any such + * relation, else the next smgrsync() will fail. There can't be any such * requests for a temp relation, though. We can send just one request * even when deleting multiple forks, since the fsync queuing code accepts * the "InvalidForkNumber = all forks" convention. @@ -448,7 +357,7 @@ mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo) errmsg("could not truncate file \"%s\": %m", path))); /* Register request to unlink first segment later */ - register_unlink(rnode); + UnlinkAfterCheckpoint(rnode); } /* @@ -555,7 +464,16 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, } if (!skipFsync && !SmgrIsTemp(reln)) - register_dirty_segment(reln, forknum, v); + { + SmgrFileTag tag; + + tag.node = reln->smgr_rnode.node; + tag.forknum = forknum; + tag.segno = v->mdfd_segno; + v->mdfd_dirtied_cycle = FsyncAtCheckpoint(&tag, + v->mdfd_vfd, + v->mdfd_dirtied_cycle); + } Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE)); } @@ -615,6 +533,7 @@ mdopen(SMgrRelation reln, ForkNumber forknum, int behavior) mdfd = &reln->md_seg_fds[forknum][0]; mdfd->mdfd_vfd = fd; mdfd->mdfd_segno = 0; + mdfd->mdfd_dirtied_cycle = GetCheckpointSyncCycle() - 1; Assert(_mdnblocks(reln, forknum, mdfd) <= ((BlockNumber) RELSEG_SIZE)); @@ -858,7 +777,16 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, } if (!skipFsync && !SmgrIsTemp(reln)) - register_dirty_segment(reln, forknum, v); + { + SmgrFileTag tag; + + tag.node = reln->smgr_rnode.node; + tag.forknum = forknum; + tag.segno = v->mdfd_segno; + v->mdfd_dirtied_cycle = FsyncAtCheckpoint(&tag, + v->mdfd_vfd, + v->mdfd_dirtied_cycle); + } } /* @@ -1048,660 +976,38 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum) } /* - * mdsync() -- Sync previous writes to stable storage. - */ -void -mdsync(void) -{ - static bool mdsync_in_progress = false; - - HASH_SEQ_STATUS hstat; - PendingOperationEntry *entry; - int absorb_counter; - - /* Statistics on sync times */ - int processed = 0; - instr_time sync_start, - sync_end, - sync_diff; - uint64 elapsed; - uint64 longest = 0; - uint64 total_elapsed = 0; - - /* - * This is only called during checkpoints, and checkpoints should only - * occur in processes that have created a pendingOpsTable. - */ - if (!pendingOpsTable) - elog(ERROR, "cannot sync without a pendingOpsTable"); - - /* - * If we are in the checkpointer, the sync had better include all fsync - * requests that were queued by backends up to this point. The tightest - * race condition that could occur is that a buffer that must be written - * and fsync'd for the checkpoint could have been dumped by a backend just - * before it was visited by BufferSync(). We know the backend will have - * queued an fsync request before clearing the buffer's dirtybit, so we - * are safe as long as we do an Absorb after completing BufferSync(). - */ - AbsorbFsyncRequests(); - - /* - * To avoid excess fsync'ing (in the worst case, maybe a never-terminating - * checkpoint), we want to ignore fsync requests that are entered into the - * hashtable after this point --- they should be processed next time, - * instead. We use mdsync_cycle_ctr to tell old entries apart from new - * ones: new ones will have cycle_ctr equal to the incremented value of - * mdsync_cycle_ctr. - * - * In normal circumstances, all entries present in the table at this point - * will have cycle_ctr exactly equal to the current (about to be old) - * value of mdsync_cycle_ctr. However, if we fail partway through the - * fsync'ing loop, then older values of cycle_ctr might remain when we - * come back here to try again. Repeated checkpoint failures would - * eventually wrap the counter around to the point where an old entry - * might appear new, causing us to skip it, possibly allowing a checkpoint - * to succeed that should not have. To forestall wraparound, any time the - * previous mdsync() failed to complete, run through the table and - * forcibly set cycle_ctr = mdsync_cycle_ctr. - * - * Think not to merge this loop with the main loop, as the problem is - * exactly that that loop may fail before having visited all the entries. - * From a performance point of view it doesn't matter anyway, as this path - * will never be taken in a system that's functioning normally. - */ - if (mdsync_in_progress) - { - /* prior try failed, so update any stale cycle_ctr values */ - hash_seq_init(&hstat, pendingOpsTable); - while ((entry = (PendingOperationEntry *) hash_seq_search(&hstat)) != NULL) - { - entry->cycle_ctr = mdsync_cycle_ctr; - } - } - - /* Advance counter so that new hashtable entries are distinguishable */ - mdsync_cycle_ctr++; - - /* Set flag to detect failure if we don't reach the end of the loop */ - mdsync_in_progress = true; - - /* Now scan the hashtable for fsync requests to process */ - absorb_counter = FSYNCS_PER_ABSORB; - hash_seq_init(&hstat, pendingOpsTable); - while ((entry = (PendingOperationEntry *) hash_seq_search(&hstat)) != NULL) - { - ForkNumber forknum; - - /* - * If the entry is new then don't process it this time; it might - * contain multiple fsync-request bits, but they are all new. Note - * "continue" bypasses the hash-remove call at the bottom of the loop. - */ - if (entry->cycle_ctr == mdsync_cycle_ctr) - continue; - - /* Else assert we haven't missed it */ - Assert((CycleCtr) (entry->cycle_ctr + 1) == mdsync_cycle_ctr); - - /* - * Scan over the forks and segments represented by the entry. - * - * The bitmap manipulations are slightly tricky, because we can call - * AbsorbFsyncRequests() inside the loop and that could result in - * bms_add_member() modifying and even re-palloc'ing the bitmapsets. - * This is okay because we unlink each bitmapset from the hashtable - * entry before scanning it. That means that any incoming fsync - * requests will be processed now if they reach the table before we - * begin to scan their fork. - */ - for (forknum = 0; forknum <= MAX_FORKNUM; forknum++) - { - Bitmapset *requests = entry->requests[forknum]; - int segno; - - entry->requests[forknum] = NULL; - entry->canceled[forknum] = false; - - while ((segno = bms_first_member(requests)) >= 0) - { - int failures; - - /* - * If fsync is off then we don't have to bother opening the - * file at all. (We delay checking until this point so that - * changing fsync on the fly behaves sensibly.) - */ - if (!enableFsync) - continue; - - /* - * If in checkpointer, we want to absorb pending requests - * every so often to prevent overflow of the fsync request - * queue. It is unspecified whether newly-added entries will - * be visited by hash_seq_search, but we don't care since we - * don't need to process them anyway. - */ - if (--absorb_counter <= 0) - { - AbsorbFsyncRequests(); - absorb_counter = FSYNCS_PER_ABSORB; - } - - /* - * The fsync table could contain requests to fsync segments - * that have been deleted (unlinked) by the time we get to - * them. Rather than just hoping an ENOENT (or EACCES on - * Windows) error can be ignored, what we do on error is - * absorb pending requests and then retry. Since mdunlink() - * queues a "cancel" message before actually unlinking, the - * fsync request is guaranteed to be marked canceled after the - * absorb if it really was this case. DROP DATABASE likewise - * has to tell us to forget fsync requests before it starts - * deletions. - */ - for (failures = 0;; failures++) /* loop exits at "break" */ - { - SMgrRelation reln; - MdfdVec *seg; - char *path; - int save_errno; - - /* - * Find or create an smgr hash entry for this relation. - * This may seem a bit unclean -- md calling smgr? But - * it's really the best solution. It ensures that the - * open file reference isn't permanently leaked if we get - * an error here. (You may say "but an unreferenced - * SMgrRelation is still a leak!" Not really, because the - * only case in which a checkpoint is done by a process - * that isn't about to shut down is in the checkpointer, - * and it will periodically do smgrcloseall(). This fact - * justifies our not closing the reln in the success path - * either, which is a good thing since in non-checkpointer - * cases we couldn't safely do that.) - */ - reln = smgropen(entry->rnode, InvalidBackendId); - - /* Attempt to open and fsync the target segment */ - seg = _mdfd_getseg(reln, forknum, - (BlockNumber) segno * (BlockNumber) RELSEG_SIZE, - false, - EXTENSION_RETURN_NULL - | EXTENSION_DONT_CHECK_SIZE); - - INSTR_TIME_SET_CURRENT(sync_start); - - if (seg != NULL && - FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) >= 0) - { - /* Success; update statistics about sync timing */ - INSTR_TIME_SET_CURRENT(sync_end); - sync_diff = sync_end; - INSTR_TIME_SUBTRACT(sync_diff, sync_start); - elapsed = INSTR_TIME_GET_MICROSEC(sync_diff); - if (elapsed > longest) - longest = elapsed; - total_elapsed += elapsed; - processed++; - if (log_checkpoints) - elog(DEBUG1, "checkpoint sync: number=%d file=%s time=%.3f msec", - processed, - FilePathName(seg->mdfd_vfd), - (double) elapsed / 1000); - - break; /* out of retry loop */ - } - - /* Compute file name for use in message */ - save_errno = errno; - path = _mdfd_segpath(reln, forknum, (BlockNumber) segno); - errno = save_errno; - - /* - * It is possible that the relation has been dropped or - * truncated since the fsync request was entered. - * Therefore, allow ENOENT, but only if we didn't fail - * already on this file. This applies both for - * _mdfd_getseg() and for FileSync, since fd.c might have - * closed the file behind our back. - * - * XXX is there any point in allowing more than one retry? - * Don't see one at the moment, but easy to change the - * test here if so. - */ - if (!FILE_POSSIBLY_DELETED(errno) || - failures > 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not fsync file \"%s\": %m", - path))); - else - ereport(DEBUG1, - (errcode_for_file_access(), - errmsg("could not fsync file \"%s\" but retrying: %m", - path))); - pfree(path); - - /* - * Absorb incoming requests and check to see if a cancel - * arrived for this relation fork. - */ - AbsorbFsyncRequests(); - absorb_counter = FSYNCS_PER_ABSORB; /* might as well... */ - - if (entry->canceled[forknum]) - break; - } /* end retry loop */ - } - bms_free(requests); - } - - /* - * We've finished everything that was requested before we started to - * scan the entry. If no new requests have been inserted meanwhile, - * remove the entry. Otherwise, update its cycle counter, as all the - * requests now in it must have arrived during this cycle. - */ - for (forknum = 0; forknum <= MAX_FORKNUM; forknum++) - { - if (entry->requests[forknum] != NULL) - break; - } - if (forknum <= MAX_FORKNUM) - entry->cycle_ctr = mdsync_cycle_ctr; - else - { - /* Okay to remove it */ - if (hash_search(pendingOpsTable, &entry->rnode, - HASH_REMOVE, NULL) == NULL) - elog(ERROR, "pendingOpsTable corrupted"); - } - } /* end loop over hashtable entries */ - - /* Return sync performance metrics for report at checkpoint end */ - CheckpointStats.ckpt_sync_rels = processed; - CheckpointStats.ckpt_longest_sync = longest; - CheckpointStats.ckpt_agg_sync_time = total_elapsed; - - /* Flag successful completion of mdsync */ - mdsync_in_progress = false; -} - -/* - * mdpreckpt() -- Do pre-checkpoint work - * - * To distinguish unlink requests that arrived before this checkpoint - * started from those that arrived during the checkpoint, we use a cycle - * counter similar to the one we use for fsync requests. That cycle - * counter is incremented here. - * - * This must be called *before* the checkpoint REDO point is determined. - * That ensures that we won't delete files too soon. - * - * Note that we can't do anything here that depends on the assumption - * that the checkpoint will be completed. - */ -void -mdpreckpt(void) -{ - /* - * Any unlink requests arriving after this point will be assigned the next - * cycle counter, and won't be unlinked until next checkpoint. - */ - mdckpt_cycle_ctr++; -} - -/* - * mdpostckpt() -- Do post-checkpoint work - * - * Remove any lingering files that can now be safely removed. + * Return the filename for the specified segment of the relation. The + * returned string is palloc'd. */ void -mdpostckpt(void) +mdpath(const SmgrFileTag *tag, char *out) { - int absorb_counter; - - absorb_counter = UNLINKS_PER_ABSORB; - while (pendingUnlinks != NIL) - { - PendingUnlinkEntry *entry = (PendingUnlinkEntry *) linitial(pendingUnlinks); - char *path; - - /* - * New entries are appended to the end, so if the entry is new we've - * reached the end of old entries. - * - * Note: if just the right number of consecutive checkpoints fail, we - * could be fooled here by cycle_ctr wraparound. However, the only - * consequence is that we'd delay unlinking for one more checkpoint, - * which is perfectly tolerable. - */ - if (entry->cycle_ctr == mdckpt_cycle_ctr) - break; + char *path; - /* Unlink the file */ - path = relpathperm(entry->rnode, MAIN_FORKNUM); - if (unlink(path) < 0) - { - /* - * There's a race condition, when the database is dropped at the - * same time that we process the pending unlink requests. If the - * DROP DATABASE deletes the file before we do, we will get ENOENT - * here. rmtree() also has to ignore ENOENT errors, to deal with - * the possibility that we delete the file first. - */ - if (errno != ENOENT) - ereport(WARNING, - (errcode_for_file_access(), - errmsg("could not remove file \"%s\": %m", path))); - } - pfree(path); + path = relpathperm(tag->node, tag->forknum); - /* And remove the list entry */ - pendingUnlinks = list_delete_first(pendingUnlinks); - pfree(entry); + if (tag->segno > 0) + snprintf(out, MAXPGPATH, "%s.%u", path, tag->segno); + else + snprintf(out, MAXPGPATH, "%s", path); - /* - * As in mdsync, we don't want to stop absorbing fsync requests for a - * long time when there are many deletions to be done. We can safely - * call AbsorbFsyncRequests() at this point in the loop (note it might - * try to delete list entries). - */ - if (--absorb_counter <= 0) - { - AbsorbFsyncRequests(); - absorb_counter = UNLINKS_PER_ABSORB; - } - } + pfree(path); } /* * register_dirty_segment() -- Mark a relation segment as needing fsync - * - * If there is a local pending-ops table, just make an entry in it for - * mdsync to process later. Otherwise, try to pass off the fsync request - * to the checkpointer process. If that fails, just do the fsync - * locally before returning (we hope this will not happen often enough - * to be a performance problem). */ static void register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg) { - /* Temp relations should never be fsync'd */ - Assert(!SmgrIsTemp(reln)); - - if (pendingOpsTable) - { - /* push it into local pending-ops table */ - RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno); - } - else - { - if (ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno)) - return; /* passed it off successfully */ - - ereport(DEBUG1, - (errmsg("could not forward fsync request because request queue is full"))); - - if (FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not fsync file \"%s\": %m", - FilePathName(seg->mdfd_vfd)))); - } -} - -/* - * register_unlink() -- Schedule a file to be deleted after next checkpoint - * - * We don't bother passing in the fork number, because this is only used - * with main forks. - * - * As with register_dirty_segment, this could involve either a local or - * a remote pending-ops table. - */ -static void -register_unlink(RelFileNodeBackend rnode) -{ - /* Should never be used with temp relations */ - Assert(!RelFileNodeBackendIsTemp(rnode)); - - if (pendingOpsTable) - { - /* push it into local pending-ops table */ - RememberFsyncRequest(rnode.node, MAIN_FORKNUM, - UNLINK_RELATION_REQUEST); - } - else - { - /* - * Notify the checkpointer about it. If we fail to queue the request - * message, we have to sleep and try again, because we can't simply - * delete the file now. Ugly, but hopefully won't happen often. - * - * XXX should we just leave the file orphaned instead? - */ - Assert(IsUnderPostmaster); - while (!ForwardFsyncRequest(rnode.node, MAIN_FORKNUM, - UNLINK_RELATION_REQUEST)) - pg_usleep(10000L); /* 10 msec seems a good number */ - } -} - -/* - * RememberFsyncRequest() -- callback from checkpointer side of fsync request - * - * We stuff fsync requests into the local hash table for execution - * during the checkpointer's next checkpoint. UNLINK requests go into a - * separate linked list, however, because they get processed separately. - * - * The range of possible segment numbers is way less than the range of - * BlockNumber, so we can reserve high values of segno for special purposes. - * We define three: - * - FORGET_RELATION_FSYNC means to cancel pending fsyncs for a relation, - * either for one fork, or all forks if forknum is InvalidForkNumber - * - FORGET_DATABASE_FSYNC means to cancel pending fsyncs for a whole database - * - UNLINK_RELATION_REQUEST is a request to delete the file after the next - * checkpoint. - * Note also that we're assuming real segment numbers don't exceed INT_MAX. - * - * (Handling FORGET_DATABASE_FSYNC requests is a tad slow because the hash - * table has to be searched linearly, but dropping a database is a pretty - * heavyweight operation anyhow, so we'll live with it.) - */ -void -RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno) -{ - Assert(pendingOpsTable); - - if (segno == FORGET_RELATION_FSYNC) - { - /* Remove any pending requests for the relation (one or all forks) */ - PendingOperationEntry *entry; - - entry = (PendingOperationEntry *) hash_search(pendingOpsTable, - &rnode, - HASH_FIND, - NULL); - if (entry) - { - /* - * We can't just delete the entry since mdsync could have an - * active hashtable scan. Instead we delete the bitmapsets; this - * is safe because of the way mdsync is coded. We also set the - * "canceled" flags so that mdsync can tell that a cancel arrived - * for the fork(s). - */ - if (forknum == InvalidForkNumber) - { - /* remove requests for all forks */ - for (forknum = 0; forknum <= MAX_FORKNUM; forknum++) - { - bms_free(entry->requests[forknum]); - entry->requests[forknum] = NULL; - entry->canceled[forknum] = true; - } - } - else - { - /* remove requests for single fork */ - bms_free(entry->requests[forknum]); - entry->requests[forknum] = NULL; - entry->canceled[forknum] = true; - } - } - } - else if (segno == FORGET_DATABASE_FSYNC) - { - /* Remove any pending requests for the entire database */ - HASH_SEQ_STATUS hstat; - PendingOperationEntry *entry; - ListCell *cell, - *prev, - *next; - - /* Remove fsync requests */ - hash_seq_init(&hstat, pendingOpsTable); - while ((entry = (PendingOperationEntry *) hash_seq_search(&hstat)) != NULL) - { - if (entry->rnode.dbNode == rnode.dbNode) - { - /* remove requests for all forks */ - for (forknum = 0; forknum <= MAX_FORKNUM; forknum++) - { - bms_free(entry->requests[forknum]); - entry->requests[forknum] = NULL; - entry->canceled[forknum] = true; - } - } - } - - /* Remove unlink requests */ - prev = NULL; - for (cell = list_head(pendingUnlinks); cell; cell = next) - { - PendingUnlinkEntry *entry = (PendingUnlinkEntry *) lfirst(cell); - - next = lnext(cell); - if (entry->rnode.dbNode == rnode.dbNode) - { - pendingUnlinks = list_delete_cell(pendingUnlinks, cell, prev); - pfree(entry); - } - else - prev = cell; - } - } - else if (segno == UNLINK_RELATION_REQUEST) - { - /* Unlink request: put it in the linked list */ - MemoryContext oldcxt = MemoryContextSwitchTo(pendingOpsCxt); - PendingUnlinkEntry *entry; - - /* PendingUnlinkEntry doesn't store forknum, since it's always MAIN */ - Assert(forknum == MAIN_FORKNUM); - - entry = palloc(sizeof(PendingUnlinkEntry)); - entry->rnode = rnode; - entry->cycle_ctr = mdckpt_cycle_ctr; - - pendingUnlinks = lappend(pendingUnlinks, entry); - - MemoryContextSwitchTo(oldcxt); - } - else - { - /* Normal case: enter a request to fsync this segment */ - MemoryContext oldcxt = MemoryContextSwitchTo(pendingOpsCxt); - PendingOperationEntry *entry; - bool found; - - entry = (PendingOperationEntry *) hash_search(pendingOpsTable, - &rnode, - HASH_ENTER, - &found); - /* if new entry, initialize it */ - if (!found) - { - entry->cycle_ctr = mdsync_cycle_ctr; - MemSet(entry->requests, 0, sizeof(entry->requests)); - MemSet(entry->canceled, 0, sizeof(entry->canceled)); - } - - /* - * NB: it's intentional that we don't change cycle_ctr if the entry - * already exists. The cycle_ctr must represent the oldest fsync - * request that could be in the entry. - */ - - entry->requests[forknum] = bms_add_member(entry->requests[forknum], - (int) segno); - - MemoryContextSwitchTo(oldcxt); - } -} - -/* - * ForgetRelationFsyncRequests -- forget any fsyncs for a relation fork - * - * forknum == InvalidForkNumber means all forks, although this code doesn't - * actually know that, since it's just forwarding the request elsewhere. - */ -void -ForgetRelationFsyncRequests(RelFileNode rnode, ForkNumber forknum) -{ - if (pendingOpsTable) - { - /* standalone backend or startup process: fsync state is local */ - RememberFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC); - } - else if (IsUnderPostmaster) - { - /* - * Notify the checkpointer about it. If we fail to queue the cancel - * message, we have to sleep and try again ... ugly, but hopefully - * won't happen often. - * - * XXX should we CHECK_FOR_INTERRUPTS in this loop? Escaping with an - * error would leave the no-longer-used file still present on disk, - * which would be bad, so I'm inclined to assume that the checkpointer - * will always empty the queue soon. - */ - while (!ForwardFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC)) - pg_usleep(10000L); /* 10 msec seems a good number */ - - /* - * Note we don't wait for the checkpointer to actually absorb the - * cancel message; see mdsync() for the implications. - */ - } -} - -/* - * ForgetDatabaseFsyncRequests -- forget any fsyncs and unlinks for a DB - */ -void -ForgetDatabaseFsyncRequests(Oid dbid) -{ - RelFileNode rnode; - - rnode.dbNode = dbid; - rnode.spcNode = 0; - rnode.relNode = 0; - - if (pendingOpsTable) - { - /* standalone backend or startup process: fsync state is local */ - RememberFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC); - } - else if (IsUnderPostmaster) - { - /* see notes in ForgetRelationFsyncRequests */ - while (!ForwardFsyncRequest(rnode, InvalidForkNumber, - FORGET_DATABASE_FSYNC)) - pg_usleep(10000L); /* 10 msec seems a good number */ - } + SmgrFileTag tag; + + tag.node = reln->smgr_rnode.node; + tag.forknum = forknum; + tag.segno = seg->mdfd_segno; + seg->mdfd_dirtied_cycle = FsyncAtCheckpoint(&tag, + seg->mdfd_vfd, + seg->mdfd_dirtied_cycle); } /* @@ -1831,6 +1137,7 @@ _mdfd_openseg(SMgrRelation reln, ForkNumber forknum, BlockNumber segno, v = &reln->md_seg_fds[forknum][segno]; v->mdfd_vfd = fd; v->mdfd_segno = segno; + v->mdfd_dirtied_cycle = GetCheckpointSyncCycle() - 1; Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE)); diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 189342ef86a..c36ba4298b7 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -21,6 +21,7 @@ #include "storage/bufmgr.h" #include "storage/ipc.h" #include "storage/smgr.h" +#include "storage/smgrsync.h" #include "utils/hsearch.h" #include "utils/inval.h" @@ -59,9 +60,7 @@ typedef struct f_smgr void (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks); void (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum); - void (*smgr_pre_ckpt) (void); /* may be NULL */ - void (*smgr_sync) (void); /* may be NULL */ - void (*smgr_post_ckpt) (void); /* may be NULL */ + void (*smgr_path) (const SmgrFileTag *tag, char *out); } f_smgr; @@ -82,9 +81,7 @@ static const f_smgr smgrsw[] = { .smgr_nblocks = mdnblocks, .smgr_truncate = mdtruncate, .smgr_immedsync = mdimmedsync, - .smgr_pre_ckpt = mdpreckpt, - .smgr_sync = mdsync, - .smgr_post_ckpt = mdpostckpt + .smgr_path = mdpath } }; @@ -104,6 +101,15 @@ static void smgrshutdown(int code, Datum arg); static void add_to_unowned_list(SMgrRelation reln); static void remove_from_unowned_list(SMgrRelation reln); +/* + * For now there is only one implementation. If more are added, we'll need to + * be able to dispatch based on a file tag. + */ +static inline int +which_for_file_tag(const SmgrFileTag *tag) +{ + return 0; +} /* * smgrinit(), smgrshutdown() -- Initialize or shut down storage @@ -118,6 +124,8 @@ smgrinit(void) { int i; + smgrsync_init(); + for (i = 0; i < NSmgr; i++) { if (smgrsw[i].smgr_init) @@ -751,50 +759,13 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum) smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum); } - -/* - * smgrpreckpt() -- Prepare for checkpoint. - */ -void -smgrpreckpt(void) -{ - int i; - - for (i = 0; i < NSmgr; i++) - { - if (smgrsw[i].smgr_pre_ckpt) - smgrsw[i].smgr_pre_ckpt(); - } -} - /* - * smgrsync() -- Sync files to disk during checkpoint. + * smgrpath() -- Expand a tag to a path. */ void -smgrsync(void) +smgrpath(const SmgrFileTag *tag, char *out) { - int i; - - for (i = 0; i < NSmgr; i++) - { - if (smgrsw[i].smgr_sync) - smgrsw[i].smgr_sync(); - } -} - -/* - * smgrpostckpt() -- Post-checkpoint cleanup. - */ -void -smgrpostckpt(void) -{ - int i; - - for (i = 0; i < NSmgr; i++) - { - if (smgrsw[i].smgr_post_ckpt) - smgrsw[i].smgr_post_ckpt(); - } + smgrsw[which_for_file_tag(tag)].smgr_path(tag, out); } /* diff --git a/src/backend/storage/smgr/smgrsync.c b/src/backend/storage/smgr/smgrsync.c new file mode 100644 index 00000000000..f4aad18054d --- /dev/null +++ b/src/backend/storage/smgr/smgrsync.c @@ -0,0 +1,803 @@ +/*------------------------------------------------------------------------- + * + * smgrsync.c + * management of file synchronization. + * + * This modules tracks which files need to be fsynced or unlinked at the + * next checkpoint, and performs those actions. Normally the work is done + * when called by the checkpointer, but it is also done in standalone mode + * and startup. + * + * Originally this logic was inside md.c, but it is now made more general, + * for reuse by other SMGR implementations that work with files. + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/storage/smgr/smgrsync.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include + +#include "access/xlog.h" +#include "miscadmin.h" +#include "nodes/pg_list.h" +#include "pgstat.h" +#include "portability/instr_time.h" +#include "postmaster/bgwriter.h" +#include "postmaster/checkpointer.h" +#include "storage/relfilenode.h" +#include "storage/smgrsync.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" + + +/* + * Special values for the segno member of SmgrFileTag. + * + * Note that CompactCheckpointerRequestQueue assumes that it's OK to remove an + * fsync request from the queue if an identical, subsequent request is found. + * See comments there before making changes here. + */ +#define FORGET_RELATION_FSYNC (InvalidBlockNumber) +#define FORGET_DATABASE_FSYNC (InvalidBlockNumber-1) +#define UNLINK_RELATION_REQUEST (InvalidBlockNumber-2) + +/* intervals for calling AbsorbFsyncRequests in smgrsync and smgrpostckpt */ +#define FSYNCS_PER_ABSORB 10 +#define UNLINKS_PER_ABSORB 10 + +/* + * An entry in the hash table of files that need to be flushed for the next + * checkpoint. + */ +typedef struct PendingFsyncEntry +{ + SmgrFileTag tag; + File file; + uint64 cycle_ctr; +} PendingFsyncEntry; + +typedef struct PendingUnlinkEntry +{ + RelFileNode rnode; /* the dead relation to delete */ + uint64 cycle_ctr; /* ckpt_cycle_ctr when request was made */ +} PendingUnlinkEntry; + +static uint32 open_fsync_queue_files = 0; +static bool sync_in_progress = false; +static uint64 ckpt_cycle_ctr = 0; + +static HTAB *pendingFsyncTable = NULL; +static List *pendingUnlinks = NIL; +static MemoryContext pendingOpsCxt; /* context for the above */ + +static void syncpass(bool include_current); + +/* + * Initialize the pending operations state, if necessary. + */ +void +smgrsync_init(void) +{ + /* + * Create pending-operations hashtable if we need it. Currently, we need + * it if we are standalone (not under a postmaster) or if we are a startup + * or checkpointer auxiliary process. + */ + if (!IsUnderPostmaster || AmStartupProcess() || AmCheckpointerProcess()) + { + HASHCTL hash_ctl; + + /* + * XXX: The checkpointer needs to add entries to the pending ops table + * when absorbing fsync requests. That is done within a critical + * section, which isn't usually allowed, but we make an exception. It + * means that there's a theoretical possibility that you run out of + * memory while absorbing fsync requests, which leads to a PANIC. + * Fortunately the hash table is small so that's unlikely to happen in + * practice. + */ + pendingOpsCxt = AllocSetContextCreate(TopMemoryContext, + "Pending ops context", + ALLOCSET_DEFAULT_SIZES); + MemoryContextAllowInCriticalSection(pendingOpsCxt, true); + + MemSet(&hash_ctl, 0, sizeof(hash_ctl)); + hash_ctl.keysize = sizeof(SmgrFileTag); + hash_ctl.entrysize = sizeof(PendingFsyncEntry); + hash_ctl.hcxt = pendingOpsCxt; + pendingFsyncTable = hash_create("Pending Ops Table", + 100L, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + pendingUnlinks = NIL; + } +} + +/* + * Do pre-checkpoint work. + * + * To distinguish unlink requests that arrived before this checkpoint + * started from those that arrived during the checkpoint, we use a cycle + * counter similar to the one we use for fsync requests. That cycle + * counter is incremented here. + * + * This must be called *before* the checkpoint REDO point is determined. + * That ensures that we won't delete files too soon. + * + * Note that we can't do anything here that depends on the assumption + * that the checkpoint will be completed. + */ +void +smgrpreckpt(void) +{ + /* + * Any unlink requests arriving after this point will be assigned the next + * cycle counter, and won't be unlinked until next checkpoint. + */ + ckpt_cycle_ctr++; +} + +/* + * Sync previous writes to stable storage. + */ +void +smgrsync(void) +{ + /* + * This is only called during checkpoints, and checkpoints should only + * occur in processes that have created a pendingFsyncTable. + */ + if (!pendingFsyncTable) + elog(ERROR, "cannot sync without a pendingFsyncTable"); + + /* + * If we are in the checkpointer, the sync had better include all fsync + * requests that were queued by backends up to this point. The tightest + * race condition that could occur is that a buffer that must be written + * and fsync'd for the checkpoint could have been dumped by a backend just + * before it was visited by BufferSync(). We know the backend will have + * queued an fsync request before clearing the buffer's dirtybit, so we + * are safe as long as we do an Absorb after completing BufferSync(). + */ + AbsorbAllFsyncRequests(); + + syncpass(false); +} + +/* + * Do one pass over the the fsync request hashtable and perform the necessary + * fsyncs. Increments the sync cycle counter. + * + * If include_current is true perform all fsyncs (this is done if too many + * files are open), otherwise only perform the fsyncs belonging to the cycle + * valid at call time. + */ +static void +syncpass(bool include_current) +{ + HASH_SEQ_STATUS hstat; + PendingFsyncEntry *entry; + int absorb_counter; + + /* Statistics on sync times */ + instr_time sync_start, + sync_end, + sync_diff; + uint64 elapsed; + int processed = CheckpointStats.ckpt_sync_rels; + uint64 longest = CheckpointStats.ckpt_longest_sync; + uint64 total_elapsed = CheckpointStats.ckpt_agg_sync_time; + + /* + * To avoid excess fsync'ing (in the worst case, maybe a never-terminating + * checkpoint), we want to ignore fsync requests that are entered into the + * hashtable after this point --- they should be processed next time, + * instead. We use GetCheckpointSyncCycle() to tell old entries apart + * from new ones: new ones will have cycle_ctr equal to + * IncCheckpointSyncCycle(). + * + * In normal circumstances, all entries present in the table at this point + * will have cycle_ctr exactly equal to the current (about to be old) + * value of sync_cycle_ctr. However, if we fail partway through the + * fsync'ing loop, then older values of cycle_ctr might remain when we + * come back here to try again. Repeated checkpoint failures would + * eventually wrap the counter around to the point where an old entry + * might appear new, causing us to skip it, possibly allowing a checkpoint + * to succeed that should not have. To forestall wraparound, any time the + * previous smgrsync() failed to complete, run through the table and + * forcibly set cycle_ctr = sync_cycle_ctr. + * + * Think not to merge this loop with the main loop, as the problem is + * exactly that that loop may fail before having visited all the entries. + * From a performance point of view it doesn't matter anyway, as this path + * will never be taken in a system that's functioning normally. + */ + if (sync_in_progress) + { + /* prior try failed, so update any stale cycle_ctr values */ + hash_seq_init(&hstat, pendingFsyncTable); + while ((entry = (PendingFsyncEntry *) hash_seq_search(&hstat)) != NULL) + entry->cycle_ctr = GetCheckpointSyncCycle(); + } + + /* Set flag to detect failure if we don't reach the end of the loop */ + sync_in_progress = true; + + /* Advance counter so that new hashtable entries are distinguishable */ + IncCheckpointSyncCycle(); + + /* Now scan the hashtable for fsync requests to process */ + absorb_counter = FSYNCS_PER_ABSORB; + hash_seq_init(&hstat, pendingFsyncTable); + while ((entry = (PendingFsyncEntry *) hash_seq_search(&hstat))) + { + /* + * If processing fsync requests because of too may file handles, close + * regardless of cycle. Otherwise nothing to be closed might be found, + * and we want to make room as quickly as possible so more requests + * can be absorbed. + */ + if (!include_current) + { + /* If the entry is new then don't process it this time. */ + if (entry->cycle_ctr == GetCheckpointSyncCycle()) + continue; + + /* Else assert we haven't missed it */ + Assert((entry->cycle_ctr + 1) == GetCheckpointSyncCycle()); + } + + /* + * If fsync is off then we don't have to bother opening the file at + * all. (We delay checking until this point so that changing fsync on + * the fly behaves sensibly.) + * + * XXX: Why is that an important goal? Doesn't give any interesting + * guarantees afaict? + */ + if (enableFsync) + { + File file; + + /* + * The fsync table could contain requests to fsync segments that + * have been deleted (unlinked) by the time we get to them. That + * used to be problematic, but now we have a filehandle to the + * deleted file. That means we might fsync an empty file + * superfluously, in a relatively tight window, which is + * acceptable. + */ + INSTR_TIME_SET_CURRENT(sync_start); + + if (entry->file == -1) + { + /* + * If we aren't transferring file descriptors directly to the + * checkpointer on this platform, we'll have to convert the + * tag to the path and open it (and close it again below). + */ + char path[MAXPGPATH]; + + smgrpath(&entry->tag, path); + file = PathNameOpenFile(path, O_RDWR | PG_BINARY); + if (file < 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" to fsync: %m", + path))); + } + else + { + /* + * Otherwise, we have kept the file descriptor from the oldest + * request for the same tag. + */ + file = entry->file; + } + + if (FileSync(file, WAIT_EVENT_DATA_FILE_SYNC) < 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + FilePathName(file)))); + + /* Success; update statistics about sync timing */ + INSTR_TIME_SET_CURRENT(sync_end); + sync_diff = sync_end; + INSTR_TIME_SUBTRACT(sync_diff, sync_start); + elapsed = INSTR_TIME_GET_MICROSEC(sync_diff); + if (elapsed > longest) + longest = elapsed; + total_elapsed += elapsed; + processed++; + + if (log_checkpoints) + ereport(DEBUG1, + (errmsg("checkpoint sync: number=%d file=%s time=%.3f msec", + processed, + FilePathName(file), + (double) elapsed / 1000), + errhidestmt(true), + errhidecontext(true))); + + if (entry->file == -1) + FileClose(file); + } + + if (entry->file >= 0) + { + /* + * Close file. XXX: centralize code. + */ + Assert(open_fsync_queue_files > 0); + open_fsync_queue_files--; + FileClose(entry->file); + entry->file = -1; + } + + /* Remove the entry. */ + if (hash_search(pendingFsyncTable, &entry->tag, HASH_REMOVE, NULL) == NULL) + elog(ERROR, "pendingFsyncTable corrupted"); + + /* + * If in checkpointer, we want to absorb pending requests every so + * often to prevent overflow of the fsync request queue. It is + * unspecified whether newly-added entries will be visited by + * hash_seq_search, but we don't care since we don't need to + * process them anyway. + */ + if (absorb_counter-- <= 0) + { + /* + * Don't absorb if too many files are open. This pass will + * soon close some, so check again later. + */ + if (open_fsync_queue_files < ((max_safe_fds * 7) / 10)) + AbsorbFsyncRequests(); + absorb_counter = FSYNCS_PER_ABSORB; + } + } /* end loop over hashtable entries */ + + /* Flag successful completion of syncpass */ + sync_in_progress = false; + + /* Maintain sync performance metrics for report at checkpoint end */ + CheckpointStats.ckpt_sync_rels = processed; + CheckpointStats.ckpt_longest_sync = longest; + CheckpointStats.ckpt_agg_sync_time = total_elapsed; +} + +/* + * Do post-checkpoint work. + * + * Remove any lingering files that can now be safely removed. + */ +void +smgrpostckpt(void) +{ + int absorb_counter; + + absorb_counter = UNLINKS_PER_ABSORB; + while (pendingUnlinks != NIL) + { + PendingUnlinkEntry *entry = (PendingUnlinkEntry *) linitial(pendingUnlinks); + char *path; + + /* + * New entries are appended to the end, so if the entry is new we've + * reached the end of old entries. + * + * Note: if just the right number of consecutive checkpoints fail, we + * could be fooled here by cycle_ctr wraparound. However, the only + * consequence is that we'd delay unlinking for one more checkpoint, + * which is perfectly tolerable. + */ + if (entry->cycle_ctr == ckpt_cycle_ctr) + break; + + /* Unlink the file */ + path = relpathperm(entry->rnode, MAIN_FORKNUM); + if (unlink(path) < 0) + { + /* + * There's a race condition, when the database is dropped at the + * same time that we process the pending unlink requests. If the + * DROP DATABASE deletes the file before we do, we will get ENOENT + * here. rmtree() also has to ignore ENOENT errors, to deal with + * the possibility that we delete the file first. + */ + if (errno != ENOENT) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", path))); + } + pfree(path); + + /* And remove the list entry */ + pendingUnlinks = list_delete_first(pendingUnlinks); + pfree(entry); + + /* + * As in smgrsync, we don't want to stop absorbing fsync requests for a + * long time when there are many deletions to be done. We can safely + * call AbsorbFsyncRequests() at this point in the loop (note it might + * try to delete list entries). + */ + if (--absorb_counter <= 0) + { + /* XXX: Centralize this condition */ + if (open_fsync_queue_files < ((max_safe_fds * 7) / 10)) + AbsorbFsyncRequests(); + absorb_counter = UNLINKS_PER_ABSORB; + } + } +} + + +/* + * FsyncAtCheckpoint() -- Mark a relation segment as needing fsync + * + * If there is a local pending-ops table, just make an entry in it for + * smgrsync to process later. Otherwise, try to pass off the fsync request + * to the checkpointer process. + */ +uint64 +FsyncAtCheckpoint(const SmgrFileTag *tag, File file, uint64 last_cycle) +{ + uint64 cycle; + + pg_memory_barrier(); + cycle = GetCheckpointSyncCycle(); + + /* + * For historical reasons checkpointer keeps track of the number of time + * backends perform writes themselves. + */ + if (!AmBackgroundWriterProcess()) + CountBackendWrite(); + + /* Don't repeatedly register the same segment as dirty. */ + if (last_cycle == cycle) + return cycle; + + if (pendingFsyncTable) + { + int fd; + + /* + * Push it into local pending-ops table. + * + * Gotta duplicate the fd - we can't have fd.c close it behind our + * back, as that'd lead to losing error reporting guarantees on + * Linux. RememberFsyncRequest() will manage the lifetime. + */ + ReleaseLruFiles(); + fd = dup(FileGetRawDesc(file)); + if (fd < 0) + elog(ERROR, "couldn't dup: %m"); + RememberFsyncRequest(tag, fd, FileGetOpenSeq(file)); + } + else + ForwardFsyncRequest(tag, file); + + return cycle; +} + +/* + * Schedule a file to be deleted after next checkpoint. + * + * As with FsyncAtCheckpoint, this could involve either a local or a remote + * pending-ops table. + */ +void +UnlinkAfterCheckpoint(RelFileNodeBackend rnode) +{ + SmgrFileTag tag; + + tag.node = rnode.node; + tag.forknum = MAIN_FORKNUM; + tag.segno = UNLINK_RELATION_REQUEST; + + /* Should never be used with temp relations */ + Assert(!RelFileNodeBackendIsTemp(rnode)); + + if (pendingFsyncTable) + { + /* push it into local pending-ops table */ + RememberFsyncRequest(&tag, -1, 0); + } + else + { + /* Notify the checkpointer about it. */ + Assert(IsUnderPostmaster); + ForwardFsyncRequest(&tag, -1); + } +} + +/* + * In archive recovery, we rely on checkpointer to do fsyncs, but we will have + * already created the pendingFsyncTable during initialization of the startup + * process. Calling this function drops the local pendingFsyncTable so that + * subsequent requests will be forwarded to checkpointer. + */ +void +SetForwardFsyncRequests(void) +{ + /* Perform any pending fsyncs we may have queued up, then drop table */ + if (pendingFsyncTable) + { + smgrsync(); + hash_destroy(pendingFsyncTable); + } + pendingFsyncTable = NULL; + + /* + * We should not have any pending unlink requests, since mdunlink doesn't + * queue unlink requests when isRedo. + */ + Assert(pendingUnlinks == NIL); +} + + +/* + * RememberFsyncRequest() -- callback from checkpointer side of fsync request + * + * We stuff fsync requests into the local hash table for execution + * during the checkpointer's next checkpoint. UNLINK requests go into a + * separate linked list, however, because they get processed separately. + * + * The range of possible segment numbers is way less than the range of + * BlockNumber, so we can reserve high values of segno for special purposes. + * We define three: + * - FORGET_RELATION_FSYNC means to cancel pending fsyncs for a relation, + * either for one fork, or all forks if forknum is InvalidForkNumber + * - FORGET_DATABASE_FSYNC means to cancel pending fsyncs for a whole database + * - UNLINK_RELATION_REQUEST is a request to delete the file after the next + * checkpoint. + * Note also that we're assuming real segment numbers don't exceed INT_MAX. + * + * (Handling FORGET_DATABASE_FSYNC requests is a tad slow because the hash + * table has to be searched linearly, but dropping a database is a pretty + * heavyweight operation anyhow, so we'll live with it.) + */ +void +RememberFsyncRequest(const SmgrFileTag *tag, int fd, uint64 open_seq) +{ + Assert(pendingFsyncTable); + + if (tag->segno == FORGET_RELATION_FSYNC || + tag->segno == FORGET_DATABASE_FSYNC) + { + HASH_SEQ_STATUS hstat; + PendingFsyncEntry *entry; + + /* Remove fsync requests */ + hash_seq_init(&hstat, pendingFsyncTable); + while ((entry = (PendingFsyncEntry *) hash_seq_search(&hstat)) != NULL) + { + if ((tag->segno == FORGET_RELATION_FSYNC && + tag->node.dbNode == entry->tag.node.dbNode && + tag->node.relNode == entry->tag.node.relNode && + (tag->forknum == InvalidForkNumber || + tag->forknum == entry->tag.forknum)) || + (tag->segno == FORGET_DATABASE_FSYNC && + tag->node.dbNode == entry->tag.node.dbNode)) + { + if (entry->file != -1) + { + Assert(open_fsync_queue_files > 0); + open_fsync_queue_files--; + FileClose(entry->file); + } + hash_search(pendingFsyncTable, entry, HASH_REMOVE, NULL); + } + } + + /* Remove unlink requests */ + if (tag->segno == FORGET_DATABASE_FSYNC) + { + ListCell *cell, + *next, + *prev; + + prev = NULL; + for (cell = list_head(pendingUnlinks); cell; cell = next) + { + PendingUnlinkEntry *entry = (PendingUnlinkEntry *) lfirst(cell); + + next = lnext(cell); + if (tag->node.dbNode == entry->rnode.dbNode) + { + pendingUnlinks = list_delete_cell(pendingUnlinks, cell, + prev); + pfree(entry); + } + else + prev = cell; + } + } + } + else if (tag->segno == UNLINK_RELATION_REQUEST) + { + /* Unlink request: put it in the linked list */ + MemoryContext oldcxt = MemoryContextSwitchTo(pendingOpsCxt); + PendingUnlinkEntry *entry; + + /* PendingUnlinkEntry doesn't store forknum, since it's always MAIN */ + Assert(tag->forknum == MAIN_FORKNUM); + + entry = palloc(sizeof(PendingUnlinkEntry)); + entry->rnode = tag->node; + entry->cycle_ctr = ckpt_cycle_ctr; + + pendingUnlinks = lappend(pendingUnlinks, entry); + + MemoryContextSwitchTo(oldcxt); + } + else + { + /* Normal case: enter a request to fsync this segment */ + MemoryContext oldcxt = MemoryContextSwitchTo(pendingOpsCxt); + PendingFsyncEntry *entry; + bool found; + + entry = (PendingFsyncEntry *) hash_search(pendingFsyncTable, + tag, + HASH_ENTER, + &found); + /* if new entry, initialize it */ + if (!found) + { + entry->file = -1; + entry->cycle_ctr = GetCheckpointSyncCycle(); + } + + /* + * NB: it's intentional that we don't change cycle_ctr if the entry + * already exists. The cycle_ctr must represent the oldest fsync + * request that could be in the entry. + */ + + if (fd >= 0) + { + File existing_file; + File new_file; + + /* + * If we didn't have a file already, or we did have a file but it + * was opened later than this one, we'll keep the newly arrived + * one. + */ + existing_file = entry->file; + if (existing_file == -1 || + FileGetOpenSeq(existing_file) > open_seq) + { + char path[MAXPGPATH]; + + smgrpath(tag, path); + + new_file = FileOpenForFd(fd, path, open_seq); + if (new_file < 0) + elog(ERROR, "cannot open file"); + /* caller must have reserved entry */ + entry->file = new_file; + + if (existing_file != -1) + FileClose(existing_file); + else + open_fsync_queue_files++; + } + else + { + /* + * File is already open. Have to keep the older fd, errors + * might only be reported to it, thus close the one we just + * got. + * + * XXX: check for errors. + */ + close(fd); + } + + FlushFsyncRequestQueueIfNecessary(); + } + + MemoryContextSwitchTo(oldcxt); + } +} + +/* + * Flush the fsync request queue enough to make sure there's room for at least + * one more entry. + */ +bool +FlushFsyncRequestQueueIfNecessary(void) +{ + if (sync_in_progress) + return false; + + while (true) + { + if (open_fsync_queue_files >= ((max_safe_fds * 7) / 10)) + { + elog(DEBUG1, + "flush fsync request queue due to %u open files", + open_fsync_queue_files); + syncpass(true); + elog(DEBUG1, + "flushed fsync request, now at %u open files", + open_fsync_queue_files); + } + else + break; + } + + return true; +} + +/* + * ForgetRelationFsyncRequests -- forget any fsyncs for a relation fork + * + * forknum == InvalidForkNumber means all forks, although this code doesn't + * actually know that, since it's just forwarding the request elsewhere. + */ +void +ForgetRelationFsyncRequests(RelFileNode rnode, ForkNumber forknum) +{ + SmgrFileTag tag; + + /* Create a special "forget relation" tag. */ + tag.node = rnode; + tag.forknum = forknum; + tag.segno = FORGET_RELATION_FSYNC; + + if (pendingFsyncTable) + { + /* standalone backend or startup process: fsync state is local */ + RememberFsyncRequest(&tag, -1, 0); + } + else if (IsUnderPostmaster) + { + /* Notify the checkpointer about it. */ + ForwardFsyncRequest(&tag, -1); + + /* + * Note we don't wait for the checkpointer to actually absorb the + * cancel message; see smgrsync() for the implications. + */ + } +} + +/* + * ForgetDatabaseFsyncRequests -- forget any fsyncs and unlinks for a DB + */ +void +ForgetDatabaseFsyncRequests(Oid dbid) +{ + SmgrFileTag tag; + + /* Create a special "forget database" tag. */ + tag.node.dbNode = dbid; + tag.node.spcNode = 0; + tag.node.relNode = 0; + tag.forknum = InvalidForkNumber; + tag.segno = FORGET_DATABASE_FSYNC; + + if (pendingFsyncTable) + { + /* standalone backend or startup process: fsync state is local */ + RememberFsyncRequest(&tag, -1, 0); + } + else if (IsUnderPostmaster) + { + /* see notes in ForgetRelationFsyncRequests */ + ForwardFsyncRequest(&tag, -1); + } +} diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index ede1621d3ea..019b48e1507 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -59,7 +59,7 @@ #include "commands/view.h" #include "miscadmin.h" #include "parser/parse_utilcmd.h" -#include "postmaster/bgwriter.h" +#include "postmaster/checkpointer.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteRemove.h" #include "storage/fd.h" diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 2317e8be6be..9fdd39cb97f 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -60,6 +60,7 @@ #include "postmaster/autovacuum.h" #include "postmaster/bgworker_internals.h" #include "postmaster/bgwriter.h" +#include "postmaster/checkpointer.h" #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h index 941c6aba7d1..137c748dfaf 100644 --- a/src/include/postmaster/bgwriter.h +++ b/src/include/postmaster/bgwriter.h @@ -1,10 +1,7 @@ /*------------------------------------------------------------------------- * * bgwriter.h - * Exports from postmaster/bgwriter.c and postmaster/checkpointer.c. - * - * The bgwriter process used to handle checkpointing duties too. Now - * there is a separate process, but we did not bother to split this header. + * Exports from postmaster/bgwriter.c. * * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group * @@ -15,29 +12,10 @@ #ifndef _BGWRITER_H #define _BGWRITER_H -#include "storage/block.h" -#include "storage/relfilenode.h" - - /* GUC options */ extern int BgWriterDelay; -extern int CheckPointTimeout; -extern int CheckPointWarning; -extern double CheckPointCompletionTarget; extern void BackgroundWriterMain(void) pg_attribute_noreturn(); -extern void CheckpointerMain(void) pg_attribute_noreturn(); - -extern void RequestCheckpoint(int flags); -extern void CheckpointWriteDelay(int flags, double progress); - -extern bool ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, - BlockNumber segno); -extern void AbsorbFsyncRequests(void); - -extern Size CheckpointerShmemSize(void); -extern void CheckpointerShmemInit(void); -extern bool FirstCallSinceLastCheckpoint(void); #endif /* _BGWRITER_H */ diff --git a/src/include/postmaster/checkpointer.h b/src/include/postmaster/checkpointer.h new file mode 100644 index 00000000000..252a94f2909 --- /dev/null +++ b/src/include/postmaster/checkpointer.h @@ -0,0 +1,71 @@ +/*------------------------------------------------------------------------- + * + * checkpointer.h + * Exports from postmaster/checkpointer.c. + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * + * src/include/postmaster/checkpointer.h + * + *------------------------------------------------------------------------- + */ +#ifndef CHECKPOINTER_H +#define CHECKPOINTER_H + +#include "storage/smgr.h" +#include "storage/smgrsync.h" + +/* + * Control whether we transfer file descriptors to the checkpointer, to + * preserve error state on certain kernels. We don't yet have support for + * sending files on Windows (it's entirely possible but it's not clear whether + * it would actually be useful for anything on that platform). The macro is + * here just so that it can be commented out to test the non-fd-passing code + * path on Unix systems. + */ +#ifndef WIN32 +#define CHECKPOINTER_TRANSFER_FILES +#endif + +/* GUC options */ +extern int CheckPointTimeout; +extern int CheckPointWarning; +extern double CheckPointCompletionTarget; + +/* The type used for counting checkpoint cycles. */ +typedef uint32 CheckpointCycle; + +/* + * A tag identifying a file to be flushed by the checkpointer. This is + * convertible to the file's path, but it's convenient to have a small fixed + * sized object to use as a hash table key. + */ +typedef struct DirtyFileTag +{ + RelFileNode node; + ForkNumber forknum; + int segno; +} DirtyFileTag; + +extern void CheckpointerMain(void) pg_attribute_noreturn(); +extern CheckpointCycle register_dirty_file(const DirtyFileTag *tag, + File file, + CheckpointCycle last_cycle); + +extern void ForwardFsyncRequest(const SmgrFileTag *tag, File fd); +extern void RequestCheckpoint(int flags); +extern void CheckpointWriteDelay(int flags, double progress); + +extern void AbsorbFsyncRequests(void); +extern void AbsorbAllFsyncRequests(void); + +extern Size CheckpointerShmemSize(void); +extern void CheckpointerShmemInit(void); + +extern uint64 GetCheckpointSyncCycle(void); +extern uint64 IncCheckpointSyncCycle(void); + +extern bool FirstCallSinceLastCheckpoint(void); +extern void CountBackendWrite(void); + +#endif diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 1877eef2391..821fd2d1ad2 100644 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -44,6 +44,15 @@ extern int postmaster_alive_fds[2]; #define POSTMASTER_FD_OWN 1 /* kept open by postmaster only */ #endif +#define FSYNC_FD_SUBMIT 0 +#define FSYNC_FD_PROCESS 1 + +#ifndef WIN32 +extern int fsync_fds[2]; +#else +extern HANDLE fsyncPipe[2]; +#endif + extern PGDLLIMPORT const char *progname; extern void PostmasterMain(int argc, char *argv[]) pg_attribute_noreturn(); diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index 8e7c9728f4b..d952acf714e 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -65,6 +65,7 @@ extern int max_safe_fds; /* Operations on virtual Files --- equivalent to Unix kernel file ops */ extern File PathNameOpenFile(const char *fileName, int fileFlags); extern File PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode); +extern File FileOpenForFd(int fd, const char *fileName, uint64 open_seq); extern File OpenTemporaryFile(bool interXact); extern void FileClose(File file); extern int FilePrefetch(File file, off_t offset, int amount, uint32 wait_event_info); @@ -78,6 +79,8 @@ extern char *FilePathName(File file); extern int FileGetRawDesc(File file); extern int FileGetRawFlags(File file); extern mode_t FileGetRawMode(File file); +extern uint64 FileGetOpenSeq(File file); +extern void FileSetOpenSeq(File file, uint64 seq); /* Operations used for sharing named temporary files */ extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure); @@ -116,6 +119,7 @@ extern int MakePGDirectory(const char *directoryName); /* Miscellaneous support routines */ extern void InitFileAccess(void); +extern void FileShmemInit(void); extern void set_max_safe_fds(void); extern void closeAllVfds(void); extern void SetTempTablespaces(Oid *tableSpaces, int numSpaces); @@ -127,6 +131,7 @@ extern void AtEOSubXact_Files(bool isCommit, SubTransactionId mySubid, SubTransactionId parentSubid); extern void RemovePgTempFiles(void); extern bool looks_like_temp_rel_name(const char *name); +extern void ReleaseLruFiles(void); extern int pg_fsync(int fd); extern int pg_fsync_no_writethrough(int fd); @@ -143,4 +148,10 @@ extern void SyncDataDirectory(void); #define PG_TEMP_FILES_DIR "pgsql_tmp" #define PG_TEMP_FILE_PREFIX "pgsql_tmp" +#ifndef WIN32 +/* XXX; This should probably go elsewhere */ +ssize_t pg_uds_send_with_fd(int sock, void *buf, ssize_t buflen, int fd); +ssize_t pg_uds_recv_with_fd(int sock, void *buf, ssize_t bufsize, int *fd); +#endif + #endif /* FD_H */ diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index fd8735b7f5f..a74eedfe4e9 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -128,6 +128,7 @@ typedef struct Latch #define WL_POSTMASTER_DEATH (1 << 4) #ifdef WIN32 #define WL_SOCKET_CONNECTED (1 << 5) +#define WL_WIN32_HANDLE (1 << 6) #else /* avoid having to deal with case on platforms not requiring it */ #define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index c843bbc9692..dc22efbe0a8 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -77,6 +77,18 @@ typedef struct SMgrRelationData typedef SMgrRelationData *SMgrRelation; +/* + * A tag identifying a file to be flushed at the next checkpoint. This is + * convertible to the file's path, but it's convenient to have a small fixed + * sized object to use as a hash table key. + */ +typedef struct SmgrFileTag +{ + RelFileNode node; + ForkNumber forknum; + int segno; +} SmgrFileTag; + #define SmgrIsTemp(smgr) \ RelFileNodeBackendIsTemp((smgr)->smgr_rnode) @@ -106,9 +118,7 @@ extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum); extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks); extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum); -extern void smgrpreckpt(void); -extern void smgrsync(void); -extern void smgrpostckpt(void); +extern void smgrpath(const SmgrFileTag *tag, char *out); extern void AtEOXact_SMgr(void); @@ -134,13 +144,9 @@ extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum); extern void mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks); extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum); -extern void mdpreckpt(void); -extern void mdsync(void); -extern void mdpostckpt(void); +extern void mdpath(const SmgrFileTag *tag, char *out); -extern void SetForwardFsyncRequests(void); -extern void RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, - BlockNumber segno); +extern bool FlushFsyncRequestQueueIfNecessary(void); extern void ForgetRelationFsyncRequests(RelFileNode rnode, ForkNumber forknum); extern void ForgetDatabaseFsyncRequests(Oid dbid); extern void DropRelationFiles(RelFileNode *delrels, int ndelrels, bool isRedo); diff --git a/src/include/storage/smgrsync.h b/src/include/storage/smgrsync.h new file mode 100644 index 00000000000..f32bb22a7cc --- /dev/null +++ b/src/include/storage/smgrsync.h @@ -0,0 +1,37 @@ +/*------------------------------------------------------------------------- + * + * smgrsync.h + * management of file synchronization + * + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/smgrpending.h + * + *------------------------------------------------------------------------- + */ +#ifndef SMGRSYNC_H +#define SMGRSYNC_H + +#include "postgres.h" + +#include "storage/fd.h" + + +extern void smgrsync_init(void); +extern void smgrpreckpt(void); +extern void smgrsync(void); +extern void smgrpostckpt(void); + +extern void UnlinkAfterCheckpoint(RelFileNodeBackend rnode); +extern uint64 FsyncAtCheckpoint(const SmgrFileTag *tag, + File file, + uint64 last_cycle); +extern void RememberFsyncRequest(const SmgrFileTag *tag, + int fd, + uint64 open_seq); +extern void SetForwardFsyncRequests(void); + + +#endif -- 2.17.1 (Apple Git-112)