From 67d7b1d16d8b2710e38d2b094ac9dc27acbfed40 Mon Sep 17 00:00:00 2001 From: Takashi Menjo Date: Mon, 16 Mar 2020 11:14:00 +0900 Subject: [PATCH v3 03/10] Use WAL segments as WAL buffers Please run ./configure with LIBS=-lpmem to build. Note that we ignore wal_sync_method from here. --- src/backend/access/transam/xlog.c | 968 +++++++++++------------------- 1 file changed, 366 insertions(+), 602 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 5bf79e1d8c..a20fadbb55 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -18,9 +18,11 @@ #include #include #include +#include #include #include #include +#include #include "access/clog.h" #include "access/commit_ts.h" @@ -623,24 +625,8 @@ typedef struct XLogCtlData XLogwrtResult LogwrtResult; /* - * Latest initialized page in the cache (last byte position + 1). - * - * To change the identity of a buffer (and InitializedUpTo), you need to - * hold WALBufMappingLock. To change the identity of a buffer that's - * still dirty, the old page needs to be written out first, and for that - * you need WALWriteLock, and you need to ensure that there are no - * in-progress insertions to the page by calling - * WaitXLogInsertionsToFinish(). + * This value does not change after startup. */ - XLogRecPtr InitializedUpTo; - - /* - * These values do not change after startup, although the pointed-to pages - * and xlblocks values certainly do. xlblocks values are protected by - * WALBufMappingLock. - */ - char *pages; /* buffers for unwritten XLOG pages */ - XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */ int XLogCacheBlck; /* highest allocated xlog buffer index */ /* @@ -804,9 +790,26 @@ static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "strea * openLogSegNo identifies the segment. These variables are only used to * write the XLOG, and so will normally refer to the active segment. 
* Note: call Reserve/ReleaseExternalFD to track consumption of this FD. + * + * mappedPages is mmap(2)-ed address for an open log file segment. + * It is used as WAL buffer instead of XLogCtl->pages. + * + * pmemMapped is true if mappedPages is on PMEM. */ static int openLogFile = -1; static XLogSegNo openLogSegNo = 0; +static char *mappedPages = NULL; +static bool pmemMapped = 0; + +/* 2MiB hugepage mask used by XLogFileMapHint */ +#define PG_HUGEPAGE_MASK ((((uintptr_t) 1) << 21) - 1) + +#ifndef MAP_SHARED_VALIDATE +#define MAP_SHARED_VALIDATE 0x3 +#endif +#ifndef MAP_SYNC +#define MAP_SYNC 0x80000 +#endif /* * These variables are used similarly to the ones above, but for reading @@ -911,12 +914,15 @@ static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); -static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic); static bool XLogCheckpointNeeded(XLogSegNo new_segno); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible); static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, bool find_free, XLogSegNo max_segno, bool use_lock); +static void *XLogFileMapHint(void); +static void *XLogFileMapUtil(void *hint, int fd, bool dax); +static char *XLogFileMap(XLogSegNo segno, bool *is_pmem); +static void XLogFileUnmap(char *pages, XLogSegNo segno); static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); @@ -979,7 +985,6 @@ static void checkXLogConsistency(XLogReaderState *record); static void WALInsertLockAcquire(void); static void WALInsertLockAcquireExclusive(void); static void WALInsertLockRelease(void); -static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt); /* * Insert an XLOG record represented by an already-constructed chain of data @@ -1623,27 +1628,7 @@ 
CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, */ while (CurrPos < EndPos) { - /* - * The minimal action to flush the page would be to call - * WALInsertLockUpdateInsertingAt(CurrPos) followed by - * AdvanceXLInsertBuffer(...). The page would be left initialized - * mostly to zeros, except for the page header (always the short - * variant, as this is never a segment's first page). - * - * The large vistas of zeros are good for compressibility, but the - * headers interrupting them every XLOG_BLCKSZ (with values that - * differ from page to page) are not. The effect varies with - * compression tool, but bzip2 for instance compresses about an - * order of magnitude worse if those headers are left in place. - * - * Rather than complicating AdvanceXLInsertBuffer itself (which is - * called in heavily-loaded circumstances as well as this lightly- - * loaded one) with variant behavior, we just use GetXLogBuffer - * (which itself calls the two methods we need) to get the pointer - * and zero most of the page. Then we just zero the page header. - */ - currpos = GetXLogBuffer(CurrPos); - MemSet(currpos, 0, SizeOfXLogShortPHD); + /* XXX We assume that XLogFileInit does what we did here */ CurrPos += XLOG_BLCKSZ; } @@ -1757,29 +1742,6 @@ WALInsertLockRelease(void) } } -/* - * Update our insertingAt value, to let others know that we've finished - * inserting up to that point. - */ -static void -WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt) -{ - if (holdingAllLocks) - { - /* - * We use the last lock to mark our actual position, see comments in - * WALInsertLockAcquireExclusive. - */ - LWLockUpdateVar(&WALInsertLocks[NUM_XLOGINSERT_LOCKS - 1].l.lock, - &WALInsertLocks[NUM_XLOGINSERT_LOCKS - 1].l.insertingAt, - insertingAt); - } - else - LWLockUpdateVar(&WALInsertLocks[MyLockNo].l.lock, - &WALInsertLocks[MyLockNo].l.insertingAt, - insertingAt); -} - /* * Wait for any WAL insertions < upto to finish. 
* @@ -1881,123 +1843,37 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) /* * Get a pointer to the right location in the WAL buffer containing the * given XLogRecPtr. - * - * If the page is not initialized yet, it is initialized. That might require - * evicting an old dirty buffer from the buffer cache, which means I/O. - * - * The caller must ensure that the page containing the requested location - * isn't evicted yet, and won't be evicted. The way to ensure that is to - * hold onto a WAL insertion lock with the insertingAt position set to - * something <= ptr. GetXLogBuffer() will update insertingAt if it needs - * to evict an old page from the buffer. (This means that once you call - * GetXLogBuffer() with a given 'ptr', you must not access anything before - * that point anymore, and must not call GetXLogBuffer() with an older 'ptr' - * later, because older buffers might be recycled already) */ static char * GetXLogBuffer(XLogRecPtr ptr) { - int idx; - XLogRecPtr endptr; - static uint64 cachedPage = 0; - static char *cachedPos = NULL; - XLogRecPtr expectedEndPtr; + int idx; + XLogPageHeader page; + XLogSegNo segno; - /* - * Fast path for the common case that we need to access again the same - * page as last time. - */ - if (ptr / XLOG_BLCKSZ == cachedPage) + /* shut-up compiler if not --enable-cassert */ + (void) page; + + XLByteToSeg(ptr, segno, wal_segment_size); + if (segno != openLogSegNo) { - Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC); - Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ)); - return cachedPos + ptr % XLOG_BLCKSZ; + /* Unmap the current segment if mapped */ + if (mappedPages != NULL) + XLogFileUnmap(mappedPages, openLogSegNo); + + /* Map the segment we need */ + mappedPages = XLogFileMap(segno, &pmemMapped); + Assert(mappedPages != NULL); + openLogSegNo = segno; } - /* - * The XLog buffer cache is organized so that a page is always loaded to a - * particular buffer. 
That way we can easily calculate the buffer a given - * page must be loaded into, from the XLogRecPtr alone. - */ idx = XLogRecPtrToBufIdx(ptr); + page = (XLogPageHeader) (mappedPages + idx * (Size) XLOG_BLCKSZ); - /* - * See what page is loaded in the buffer at the moment. It could be the - * page we're looking for, or something older. It can't be anything newer - * - that would imply the page we're looking for has already been written - * out to disk and evicted, and the caller is responsible for making sure - * that doesn't happen. - * - * However, we don't hold a lock while we read the value. If someone has - * just initialized the page, it's possible that we get a "torn read" of - * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In - * that case we will see a bogus value. That's ok, we'll grab the mapping - * lock (in AdvanceXLInsertBuffer) and retry if we see anything else than - * the page we're looking for. But it means that when we do this unlocked - * read, we might see a value that appears to be ahead of the page we're - * looking for. Don't PANIC on that, until we've verified the value while - * holding the lock. - */ - expectedEndPtr = ptr; - expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ; + Assert(page->xlp_magic == XLOG_PAGE_MAGIC); + Assert(page->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ)); - endptr = XLogCtl->xlblocks[idx]; - if (expectedEndPtr != endptr) - { - XLogRecPtr initializedUpto; - - /* - * Before calling AdvanceXLInsertBuffer(), which can block, let others - * know how far we're finished with inserting the record. - * - * NB: If 'ptr' points to just after the page header, advertise a - * position at the beginning of the page rather than 'ptr' itself. If - * there are no other insertions running, someone might try to flush - * up to our advertised location. If we advertised a position after - * the page header, someone might try to flush the page header, even - * though page might actually not be initialized yet. 
As the first - * inserter on the page, we are effectively responsible for making - * sure that it's initialized, before we let insertingAt to move past - * the page header. - */ - if (ptr % XLOG_BLCKSZ == SizeOfXLogShortPHD && - XLogSegmentOffset(ptr, wal_segment_size) > XLOG_BLCKSZ) - initializedUpto = ptr - SizeOfXLogShortPHD; - else if (ptr % XLOG_BLCKSZ == SizeOfXLogLongPHD && - XLogSegmentOffset(ptr, wal_segment_size) < XLOG_BLCKSZ) - initializedUpto = ptr - SizeOfXLogLongPHD; - else - initializedUpto = ptr; - - WALInsertLockUpdateInsertingAt(initializedUpto); - - AdvanceXLInsertBuffer(ptr, false); - endptr = XLogCtl->xlblocks[idx]; - - if (expectedEndPtr != endptr) - elog(PANIC, "could not find WAL buffer for %X/%X", - (uint32) (ptr >> 32), (uint32) ptr); - } - else - { - /* - * Make sure the initialization of the page is visible to us, and - * won't arrive later to overwrite the WAL data we write on the page. - */ - pg_memory_barrier(); - } - - /* - * Found the buffer holding this page. Return a pointer to the right - * offset within the page. - */ - cachedPage = ptr / XLOG_BLCKSZ; - cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ; - - Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC); - Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ)); - - return cachedPos + ptr % XLOG_BLCKSZ; + return mappedPages + ptr % wal_segment_size; } /* @@ -2125,179 +2001,6 @@ XLogRecPtrToBytePos(XLogRecPtr ptr) return result; } -/* - * Initialize XLOG buffers, writing out old buffers if they still contain - * unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is - * true, initialize as many pages as we can without having to write out - * unwritten data. Any new pages are initialized to zeros, with pages headers - * initialized properly. 
- */ -static void -AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) -{ - XLogCtlInsert *Insert = &XLogCtl->Insert; - int nextidx; - XLogRecPtr OldPageRqstPtr; - XLogwrtRqst WriteRqst; - XLogRecPtr NewPageEndPtr = InvalidXLogRecPtr; - XLogRecPtr NewPageBeginPtr; - XLogPageHeader NewPage; - int npages = 0; - - LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE); - - /* - * Now that we have the lock, check if someone initialized the page - * already. - */ - while (upto >= XLogCtl->InitializedUpTo || opportunistic) - { - nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo); - - /* - * Get ending-offset of the buffer page we need to replace (this may - * be zero if the buffer hasn't been used yet). Fall through if it's - * already written out. - */ - OldPageRqstPtr = XLogCtl->xlblocks[nextidx]; - if (LogwrtResult.Write < OldPageRqstPtr) - { - /* - * Nope, got work to do. If we just want to pre-initialize as much - * as we can without flushing, give up now. - */ - if (opportunistic) - break; - - /* Before waiting, get info_lck and update LogwrtResult */ - SpinLockAcquire(&XLogCtl->info_lck); - if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr) - XLogCtl->LogwrtRqst.Write = OldPageRqstPtr; - LogwrtResult = XLogCtl->LogwrtResult; - SpinLockRelease(&XLogCtl->info_lck); - - /* - * Now that we have an up-to-date LogwrtResult value, see if we - * still need to write it or if someone else already did. - */ - if (LogwrtResult.Write < OldPageRqstPtr) - { - /* - * Must acquire write lock. Release WALBufMappingLock first, - * to make sure that all insertions that we need to wait for - * can finish (up to this same position). Otherwise we risk - * deadlock. 
- */ - LWLockRelease(WALBufMappingLock); - - WaitXLogInsertionsToFinish(OldPageRqstPtr); - - LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); - - LogwrtResult = XLogCtl->LogwrtResult; - if (LogwrtResult.Write >= OldPageRqstPtr) - { - /* OK, someone wrote it already */ - LWLockRelease(WALWriteLock); - } - else - { - /* Have to write it ourselves */ - TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START(); - WriteRqst.Write = OldPageRqstPtr; - WriteRqst.Flush = 0; - XLogWrite(WriteRqst, false); - LWLockRelease(WALWriteLock); - WalStats.m_wal_buffers_full++; - TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE(); - } - /* Re-acquire WALBufMappingLock and retry */ - LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE); - continue; - } - } - - /* - * Now the next buffer slot is free and we can set it up to be the - * next output page. - */ - NewPageBeginPtr = XLogCtl->InitializedUpTo; - NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ; - - Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx); - - NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ); - - /* - * Be sure to re-zero the buffer so that bytes beyond what we've - * written will look like zeroes and not valid XLOG records... - */ - MemSet((char *) NewPage, 0, XLOG_BLCKSZ); - - /* - * Fill the new page's header - */ - NewPage->xlp_magic = XLOG_PAGE_MAGIC; - - /* NewPage->xlp_info = 0; */ /* done by memset */ - NewPage->xlp_tli = ThisTimeLineID; - NewPage->xlp_pageaddr = NewPageBeginPtr; - - /* NewPage->xlp_rem_len = 0; */ /* done by memset */ - - /* - * If online backup is not in progress, mark the header to indicate - * that WAL records beginning in this page have removable backup - * blocks. This allows the WAL archiver to know whether it is safe to - * compress archived WAL data by transforming full-block records into - * the non-full-block format. 
It is sufficient to record this at the - * page level because we force a page switch (in fact a segment - * switch) when starting a backup, so the flag will be off before any - * records can be written during the backup. At the end of a backup, - * the last page will be marked as all unsafe when perhaps only part - * is unsafe, but at worst the archiver would miss the opportunity to - * compress a few records. - */ - if (!Insert->forcePageWrites) - NewPage->xlp_info |= XLP_BKP_REMOVABLE; - - /* - * If first page of an XLOG segment file, make it a long header. - */ - if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0) - { - XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage; - - NewLongPage->xlp_sysid = ControlFile->system_identifier; - NewLongPage->xlp_seg_size = wal_segment_size; - NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ; - NewPage->xlp_info |= XLP_LONG_HEADER; - } - - /* - * Make sure the initialization of the page becomes visible to others - * before the xlblocks update. GetXLogBuffer() reads xlblocks without - * holding a lock. - */ - pg_write_barrier(); - - *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr; - - XLogCtl->InitializedUpTo = NewPageEndPtr; - - npages++; - } - LWLockRelease(WALBufMappingLock); - -#ifdef WAL_DEBUG - if (XLOG_DEBUG && npages > 0) - { - elog(DEBUG1, "initialized %d pages, up to %X/%X", - npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr); - } -#endif -} - /* * Calculate CheckPointSegments based on max_wal_size_mb and * checkpoint_completion_target. 
@@ -2426,14 +2129,9 @@ XLogCheckpointNeeded(XLogSegNo new_segno) static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible) { - bool ispartialpage; - bool last_iteration; bool finishing_seg; - bool use_existent; - int curridx; - int npages; - int startidx; - uint32 startoffset; + XLogSegNo rqstLogSegNo; + XLogSegNo segno; /* We should always be inside a critical section here */ Assert(CritSectionCount > 0); @@ -2443,233 +2141,149 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) */ LogwrtResult = XLogCtl->LogwrtResult; - /* - * Since successive pages in the xlog cache are consecutively allocated, - * we can usually gather multiple pages together and issue just one - * write() call. npages is the number of pages we have determined can be - * written together; startidx is the cache block index of the first one, - * and startoffset is the file offset at which it should go. The latter - * two variables are only valid when npages > 0, but we must initialize - * all of them to keep the compiler quiet. - */ - npages = 0; - startidx = 0; - startoffset = 0; + /* Fast return if not requested to flush */ + if (WriteRqst.Flush == 0) + return; + Assert(WriteRqst.Flush == WriteRqst.Write); /* - * Within the loop, curridx is the cache block index of the page to - * consider writing. Begin at the buffer containing the next unwritten - * page, or last partially written page. + * Call pmem_persist() or pmem_msync() for each segment file that contains + * records to be flushed. */ - curridx = XLogRecPtrToBufIdx(LogwrtResult.Write); - - while (LogwrtResult.Write < WriteRqst.Write) + XLByteToPrevSeg(WriteRqst.Flush, rqstLogSegNo, wal_segment_size); + XLByteToSeg(LogwrtResult.Flush, segno, wal_segment_size); + while (segno <= rqstLogSegNo) { - /* - * Make sure we're not ahead of the insert process. This could happen - * if we're passed a bogus WriteRqst.Write that is past the end of the - * last page that's been initialized by AdvanceXLInsertBuffer. 
- */ - XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx]; + bool is_pmem; + char *addr; + char *p; + Size len; + XLogRecPtr BeginPtr; + XLogRecPtr EndPtr; - if (LogwrtResult.Write >= EndPtr) - elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", - (uint32) (LogwrtResult.Write >> 32), - (uint32) LogwrtResult.Write, - (uint32) (EndPtr >> 32), (uint32) EndPtr); - - /* Advance LogwrtResult.Write to end of current buffer page */ - LogwrtResult.Write = EndPtr; - ispartialpage = WriteRqst.Write < LogwrtResult.Write; - - if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo, - wal_segment_size)) + /* Check if the segment is not mapped yet */ + if (segno != openLogSegNo) { + /* Map newly */ + is_pmem = 0; + addr = XLogFileMap(segno, &is_pmem); + /* - * Switch to new logfile segment. We cannot have any pending - * pages here (since we dump what we have at segment end). + * Use the mapped above as WAL buffer of this process for the + * future. Note that it might be unmapped within this loop. 
*/ - Assert(npages == 0); - if (openLogFile >= 0) - XLogFileClose(); - XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo, - wal_segment_size); - - /* create/use new log file */ - use_existent = true; - openLogFile = XLogFileInit(openLogSegNo, &use_existent, true); - ReserveExternalFD(); + if (openLogSegNo == 0) + { + pmemMapped = is_pmem; + mappedPages = addr; + openLogSegNo = segno; + } } - - /* Make sure we have the current logfile open */ - if (openLogFile < 0) + else { - XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo, - wal_segment_size); - openLogFile = XLogFileOpen(openLogSegNo); - ReserveExternalFD(); + /* Or use existent mapping */ + is_pmem = pmemMapped; + addr = mappedPages; } + Assert(addr != NULL); + Assert(mappedPages != NULL); + Assert(openLogSegNo > 0); - /* Add current page to the set of pending pages-to-dump */ - if (npages == 0) - { - /* first of group */ - startidx = curridx; - startoffset = XLogSegmentOffset(LogwrtResult.Write - XLOG_BLCKSZ, - wal_segment_size); - } - npages++; + /* Find beginning position to be flushed */ + BeginPtr = segno * wal_segment_size; + if (BeginPtr < LogwrtResult.Flush) + BeginPtr = LogwrtResult.Flush; + + /* Find ending position to be flushed */ + EndPtr = (segno + 1) * wal_segment_size; + if (EndPtr > WriteRqst.Flush) + EndPtr = WriteRqst.Flush; + + /* Convert LSN to memory address */ + Assert(BeginPtr <= EndPtr); + p = addr + BeginPtr % wal_segment_size; + len = (Size) (EndPtr - BeginPtr); /* - * Dump the set if this will be the last loop iteration, or if we are - * at the last page of the cache area (since the next page won't be - * contiguous in memory), or if we are at the end of the logfile - * segment. + * Do cache-flush or msync. + * + * Note that pmem_msync() does backoff to the page boundary. 
*/ - last_iteration = WriteRqst.Write <= LogwrtResult.Write; - - finishing_seg = !ispartialpage && - (startoffset + npages * XLOG_BLCKSZ) >= wal_segment_size; - - if (last_iteration || - curridx == XLogCtl->XLogCacheBlck || - finishing_seg) + if (is_pmem) { - char *from; - Size nbytes; - Size nleft; - int written; - - /* OK to write the page(s) */ - from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ; - nbytes = npages * (Size) XLOG_BLCKSZ; - nleft = nbytes; - do + pgstat_report_wait_start(WAIT_EVENT_WAL_SYNC); + pmem_persist(p, len); + pgstat_report_wait_end(); + } + else + { + pgstat_report_wait_start(WAIT_EVENT_WAL_SYNC); + if (pmem_msync(p, len)) { - errno = 0; - pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE); - written = pg_pwrite(openLogFile, from, nleft, startoffset); + char xlogfname[MAXFNAMELEN]; + int save_errno; + pgstat_report_wait_end(); - if (written <= 0) - { - char xlogfname[MAXFNAMELEN]; - int save_errno; - if (errno == EINTR) - continue; + save_errno = errno; + /* Name the segment being synced; it may not be the open one */ + XLogFileName(xlogfname, ThisTimeLineID, segno, wal_segment_size); + errno = save_errno; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not msync to log file %s " + "at address %p, length %zu: %m", + xlogfname, p, len))); + } + pgstat_report_wait_end(); + } + LogwrtResult.Flush = LogwrtResult.Write = EndPtr; - save_errno = errno; - XLogFileName(xlogfname, ThisTimeLineID, openLogSegNo, - wal_segment_size); - errno = save_errno; - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not write to log file %s " - "at offset %u, length %zu: %m", - xlogfname, startoffset, nleft))); - } - nleft -= written; - from += written; - startoffset += written; - } while (nleft > 0); + /* Check if whole my WAL buffers are synchronized to the segment */ + finishing_seg = (LogwrtResult.Flush % wal_segment_size == 0) && + XLByteInPrevSeg(LogwrtResult.Flush, openLogSegNo, + wal_segment_size); - npages = 0; - - /* - * If we just wrote the whole last page of a logfile segment, - * 
fsync the segment immediately. This avoids having to go back - * and re-open prior segments when an fsync request comes along - * later. Doing it here ensures that one and only one backend will - * perform this fsync. - * - * This is also the right place to notify the Archiver that the - * segment is ready to copy to archival storage, and to update the - * timer for archive_timeout, and to signal for a checkpoint if - * too many logfile segments have been used since the last - * checkpoint. - */ + if (segno != openLogSegNo || finishing_seg) + { + XLogFileUnmap(addr, segno); if (finishing_seg) { - issue_xlog_fsync(openLogFile, openLogSegNo); - - /* signal that we need to wakeup walsenders later */ - WalSndWakeupRequest(); - - LogwrtResult.Flush = LogwrtResult.Write; /* end of page */ - - if (XLogArchivingActive()) - XLogArchiveNotifySeg(openLogSegNo); - - XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL); - XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush; - - /* - * Request a checkpoint if we've consumed too much xlog since - * the last one. For speed, we first check using the local - * copy of RedoRecPtr, which might be out of date; if it looks - * like a checkpoint is needed, forcibly update RedoRecPtr and - * recheck. 
- */ - if (IsUnderPostmaster && XLogCheckpointNeeded(openLogSegNo)) - { - (void) GetRedoRecPtr(); - if (XLogCheckpointNeeded(openLogSegNo)) - RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); - } + Assert(segno == openLogSegNo); + mappedPages = NULL; + openLogSegNo = 0; } - } - if (ispartialpage) - { - /* Only asked to write a partial page */ - LogwrtResult.Write = WriteRqst.Write; - break; - } - curridx = NextBufIdx(curridx); + /* signal that we need to wakeup walsenders later */ + WalSndWakeupRequest(); - /* If flexible, break out of loop as soon as we wrote something */ - if (flexible && npages == 0) - break; - } + if (XLogArchivingActive()) + XLogArchiveNotifySeg(segno); - Assert(npages == 0); + XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL); + XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush; - /* - * If asked to flush, do so - */ - if (LogwrtResult.Flush < WriteRqst.Flush && - LogwrtResult.Flush < LogwrtResult.Write) - - { - /* - * Could get here without iterating above loop, in which case we might - * have no open file or the wrong one. However, we do not need to - * fsync more than one file. - */ - if (sync_method != SYNC_METHOD_OPEN && - sync_method != SYNC_METHOD_OPEN_DSYNC) - { - if (openLogFile >= 0 && - !XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo, - wal_segment_size)) - XLogFileClose(); - if (openLogFile < 0) + /* + * Request a checkpoint if we've consumed too much xlog since + * the last one. For speed, we first check using the local + * copy of RedoRecPtr, which might be out of date; if it looks + * like a checkpoint is needed, forcibly update RedoRecPtr and + * recheck. 
+ */ + if (IsUnderPostmaster && XLogCheckpointNeeded(segno)) { - XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo, - wal_segment_size); - openLogFile = XLogFileOpen(openLogSegNo); - ReserveExternalFD(); + (void) GetRedoRecPtr(); + if (XLogCheckpointNeeded(segno)) + RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); } - - issue_xlog_fsync(openLogFile, openLogSegNo); } - /* signal that we need to wakeup walsenders later */ - WalSndWakeupRequest(); - - LogwrtResult.Flush = LogwrtResult.Write; + ++segno; } + /* signal that we need to wakeup walsenders later */ + WalSndWakeupRequest(); + /* * Update shared-memory status * @@ -3090,6 +2704,16 @@ XLogBackgroundFlush(void) XLogFileClose(); } } + else if (mappedPages != NULL) + { + if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo, + wal_segment_size)) + { + XLogFileUnmap(mappedPages, openLogSegNo); + mappedPages = NULL; + openLogSegNo = 0; + } + } return false; } @@ -3156,12 +2780,6 @@ XLogBackgroundFlush(void) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(); - /* - * Great, done. To take some work off the critical path, try to initialize - * as many of the no-longer-needed WAL buffers for future use as we can. - */ - AdvanceXLInsertBuffer(InvalidXLogRecPtr, true); - /* * If we determined that we need to write data, but somebody else * wrote/flushed already, it should be considered as being active, to @@ -3315,9 +2933,26 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock) memset(zbuffer.data, 0, XLOG_BLCKSZ); pgstat_report_wait_start(WAIT_EVENT_WAL_INIT_WRITE); - save_errno = 0; - if (wal_init_zero) + + /* + * Allocate the file by posix_fallocate(3) to utilize hugepage and reduce + * overhead of page fault. Note that posix_fallocate(3) does not set errno + * on error. Instead, it returns an error number directly. + */ + save_errno = posix_fallocate(fd, 0, wal_segment_size); + + if (save_errno) { + /* + * Do nothing on error. Go to pgstat_report_wait_end(). 
+ */ + } + else if (wal_init_zero) + { + XLogCtlInsert *Insert = &XLogCtl->Insert; + XLogPageHeader NewPage = (XLogPageHeader) zbuffer.data; + XLogRecPtr NewPageBeginPtr = logsegno * wal_segment_size; + /* * Zero-fill the file. With this setting, we do this the hard way to * ensure that all the file space has really been allocated. On @@ -3329,6 +2964,48 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock) */ for (nbytes = 0; nbytes < wal_segment_size; nbytes += XLOG_BLCKSZ) { + memset(NewPage, 0, SizeOfXLogLongPHD); + + /* + * Fill the new page's header + */ + NewPage->xlp_magic = XLOG_PAGE_MAGIC; + + /* NewPage->xlp_info = 0; */ /* done by memset */ + NewPage->xlp_tli = ThisTimeLineID; + NewPage->xlp_pageaddr = NewPageBeginPtr; + + /* NewPage->xlp_rem_len = 0; */ /* done by memset */ + + /* + * If online backup is not in progress, mark the header to indicate + * that WAL records beginning in this page have removable backup + * blocks. This allows the WAL archiver to know whether it is safe to + * compress archived WAL data by transforming full-block records into + * the non-full-block format. It is sufficient to record this at the + * page level because we force a page switch (in fact a segment + * switch) when starting a backup, so the flag will be off before any + * records can be written during the backup. At the end of a backup, + * the last page will be marked as all unsafe when perhaps only part + * is unsafe, but at worst the archiver would miss the opportunity to + * compress a few records. + */ + if (!Insert->forcePageWrites) + NewPage->xlp_info |= XLP_BKP_REMOVABLE; + + /* + * If first page of an XLOG segment file, make it a long header. 
+ */ + if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0) + { + XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage; + + NewLongPage->xlp_sysid = ControlFile->system_identifier; + NewLongPage->xlp_seg_size = wal_segment_size; + NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ; + NewPage->xlp_info |= XLP_LONG_HEADER; + } + errno = 0; if (write(fd, zbuffer.data, XLOG_BLCKSZ) != XLOG_BLCKSZ) { @@ -3336,6 +3013,8 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock) save_errno = errno ? errno : ENOSPC; break; } + + NewPageBeginPtr += XLOG_BLCKSZ; } } else @@ -3651,6 +3330,138 @@ InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, return true; } +/* + * Get a hint address for hugepage boundary mapping. + * + * Returns non-NULL if success, or PANICs otherwise. + */ +static void * +XLogFileMapHint(void) +{ + void *hint; + Size len; + + len = (Size) wal_segment_size + PG_HUGEPAGE_MASK + 1; + hint = mmap(NULL, len, PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_ANONYMOUS, -1, 0); + + if (hint == MAP_FAILED) + elog(PANIC, "could not get hint address"); + + if (munmap(hint, len) != 0) + elog(PANIC, "could not unmap hint address"); + + /* Go forward onto the nearest hugepage boundary */ + return (void *) (((uintptr_t) hint + PG_HUGEPAGE_MASK) & ~PG_HUGEPAGE_MASK); +} + +static void * +XLogFileMapUtil(void *hint, int fd, bool dax) +{ + int flags; + + if (dax) + flags = MAP_SHARED_VALIDATE | MAP_SYNC; + else + flags = MAP_SHARED; + + return mmap(hint, wal_segment_size, PROT_READ | PROT_WRITE, flags, fd, 0); +} + +/* + * Memory-map a pre-existing logfile segment for WAL buffers. + * + * If success, it returns non-NULL and is_pmem is set whether the file is on + * PMEM or not. Otherwise, it PANICs. 
+ */
+static char *
+XLogFileMap(XLogSegNo segno, bool *is_pmem)
+{
+	char		path[MAXPGPATH];
+	char	   *addr;
+	void	   *hint;
+	int			fd;
+	struct stat stat_buf;
+
+	XLogFilePath(path, ThisTimeLineID, segno, wal_segment_size);
+
+	fd = BasicOpenFile(path, O_RDWR | PG_BINARY);
+	if (fd < 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m", path)));
+
+	if (fstat(fd, &stat_buf) != 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not fstat file \"%s\": %m", path)));
+
+	if (stat_buf.st_size != wal_segment_size)
+		elog(PANIC,
+			 "invalid logfile segment size; path \"%s\" actual %d expected %d",
+			 path, (int) stat_buf.st_size, wal_segment_size);
+
+	hint = XLogFileMapHint();
+
+	/*
+	 * Try DAX mapping first (dax=true).
+	 *
+	 * Fall back to a regular mapping (dax=false) only when the DAX attempt is
+	 * rejected with EOPNOTSUPP or EINVAL, taken here to mean "not supported".
+	 */
+	addr = XLogFileMapUtil(hint, fd, true);
+	*is_pmem = (addr != MAP_FAILED);
+
+	if (addr == MAP_FAILED && (errno == EOPNOTSUPP || errno == EINVAL))
+		addr = XLogFileMapUtil(hint, fd, false);
+
+	/*
+	 * Any failure still pending here is fatal, whichever attempt it came
+	 * from.  MAP_FAILED is not NULL, so returning it would slip past the
+	 * callers' NULL assertions and crash on the first access to the pages.
+	 * errno still belongs to the mmap call that failed last.
+	 */
+	if (addr == MAP_FAILED)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not mmap file \"%s\": %m", path)));
+
+	/* Check if the logfile segment is mapped onto hugepage boundary */
+	if ((uintptr_t) addr & PG_HUGEPAGE_MASK)
+		elog(WARNING,
+			 "logfile segment is not mapped onto hugepage boundary; path \"%s\" actual %p expected %p",
+			 path, addr, hint);
+
+	/* We don't need the file descriptor anymore, so close it */
+	if (close(fd) != 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not close file \"%s\": %m", path)));
+
+	return addr;
+}
+
+/*
+ * Unmap a given logfile segment for WAL buffer.
+ */ +static void +XLogFileUnmap(char *pages, XLogSegNo segno) +{ + Assert(pages != NULL); + + if (munmap(pages, wal_segment_size) != 0) + { + char xlogfname[MAXFNAMELEN]; + int save_errno = errno; + + XLogFileName(xlogfname, ThisTimeLineID, segno, wal_segment_size); + errno = save_errno; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not unmap file \"%s\": %m", xlogfname))); + } +} + /* * Open a pre-existing logfile segment for writing. */ @@ -5070,12 +4881,6 @@ XLOGShmemSize(void) /* WAL insertion locks, plus alignment */ size = add_size(size, mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1)); - /* xlblocks array */ - size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers)); - /* extra alignment padding for XLOG I/O buffers */ - size = add_size(size, XLOG_BLCKSZ); - /* and the buffers themselves */ - size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers)); /* * Note: we don't count ControlFileData, it comes out of the "slop factor" @@ -5149,10 +4954,6 @@ XLOGShmemInit(void) * needed here. */ allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData); - XLogCtl->xlblocks = (XLogRecPtr *) allocptr; - memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers); - allocptr += sizeof(XLogRecPtr) * XLOGbuffers; - /* WAL insertion locks. Ensure they're aligned to the full padded size */ allocptr += sizeof(WALInsertLockPadded) - @@ -5168,15 +4969,6 @@ XLOGShmemInit(void) WALInsertLocks[i].l.lastImportantAt = InvalidXLogRecPtr; } - /* - * Align the start of the page buffers to a full xlog block size boundary. - * This simplifies some calculations in XLOG insertion. It is also - * required for O_DIRECT. - */ - allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr); - XLogCtl->pages = allocptr; - memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers); - /* * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill * in additional info.) 
@@ -7717,40 +7509,12 @@ StartupXLOG(void) Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog); /* - * Tricky point here: readBuf contains the *last* block that the LastRec - * record spans, not the one it starts in. The last block is indeed the - * one we want to use. + * We do NOT need the if-else block that once existed here: because WAL + * segment files themselves serve as the WAL buffers, the last block is + * "already on the buffers." + * + * XXX We assume there is no torn record. */ - if (EndOfLog % XLOG_BLCKSZ != 0) - { - char *page; - int len; - int firstIdx; - XLogRecPtr pageBeginPtr; - - pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ); - Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size)); - - firstIdx = XLogRecPtrToBufIdx(EndOfLog); - - /* Copy the valid part of the last block, and zero the rest */ - page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ]; - len = EndOfLog % XLOG_BLCKSZ; - memcpy(page, xlogreader->readBuf, len); - memset(page + len, 0, XLOG_BLCKSZ - len); - - XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ; - XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ; - } - else - { - /* - * There is no partial block to copy. Just set InitializedUpTo, and - * let the first attempt to insert a log record to initialize the next - * buffer. - */ - XLogCtl->InitializedUpTo = EndOfLog; - } LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; -- 2.25.1