From ff658e5fecf317888ede6f11827be50495ec4f00 Mon Sep 17 00:00:00 2001 From: erthalion <9erthalion6@gmail.com> Date: Mon, 10 Dec 2018 21:51:15 +0100 Subject: [PATCH 3/3] Walreceiver WAL IO using PMDK --- src/backend/replication/walreceiver.c | 66 +++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 26 deletions(-) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 9643c2ed7b..e03b660109 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -61,6 +61,7 @@ #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/ipc.h" +#include "storage/pmem.h" #include "storage/pmsignal.h" #include "storage/procarray.h" #include "utils/builtins.h" @@ -91,6 +92,7 @@ static int recvFile = -1; static TimeLineID recvFileTLI = 0; static XLogSegNo recvSegNo = 0; static uint32 recvOff = 0; +void *mappedFileAddr = NULL; /* * Flags set by interrupt handlers of walreceiver for later service in the @@ -599,12 +601,12 @@ WalReceiverMain(void) * End of WAL reached on the requested timeline. Close the last * segment, and await for new orders from the startup process. */ - if (recvFile >= 0) + if (recvFile >= 0 || mappedFileAddr != NULL) { char xlogfname[MAXFNAMELEN]; XLogWalRcvFlush(false); - if (close(recvFile) != 0) + if (do_XLogFileClose(recvFile, mappedFileAddr) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not close log segment %s: %m", @@ -621,6 +623,7 @@ WalReceiverMain(void) XLogArchiveNotify(xlogfname); } recvFile = -1; + mappedFileAddr = NULL; elog(DEBUG1, "walreceiver ended streaming and awaits new instructions"); WalRcvWaitForStartPosition(&startpoint, &startpointTLI); @@ -931,7 +934,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) { int segbytes; - if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) + if ((recvFile < 0 && mappedFileAddr == NULL) || + !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) { bool use_existent; @@ -939,7 +943,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) * fsync() and close current file before we switch to next one. We * would otherwise have to reopen this file to fsync it later */ - if (recvFile >= 0) + if (recvFile >= 0 || mappedFileAddr != NULL) { char xlogfname[MAXFNAMELEN]; @@ -950,7 +954,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) * process soon, so we don't advise the OS to release cache * pages associated with the file like XLogFileClose() does. */ - if (close(recvFile) != 0) + if (do_XLogFileClose(recvFile, mappedFileAddr) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not close log segment %s: %m", @@ -967,11 +971,12 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) XLogArchiveNotify(xlogfname); } recvFile = -1; + mappedFileAddr = NULL; /* Create/use new log file */ XLByteToSeg(recptr, recvSegNo, wal_segment_size); use_existent = true; - recvFile = XLogFileInit(recvSegNo, &use_existent, true); + recvFile = XLogFileInit(recvSegNo, &use_existent, true, &mappedFileAddr); recvFileTLI = ThisTimeLineID; recvOff = 0; } @@ -987,30 +992,39 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) /* Need to seek in the file? */ if (recvOff != startoff) { - if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(recvFileTLI, recvSegNo), - startoff))); + if (!mappedFileAddr) + if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not seek in log segment %s to offset %u: %m", + XLogFileNameP(recvFileTLI, recvSegNo), + startoff))); recvOff = startoff; } - /* OK to write the logs */ - errno = 0; - - byteswritten = write(recvFile, buf, segbytes); - if (byteswritten <= 0) + if (mappedFileAddr) { - /* if write didn't set errno, assume no disk space */ - if (errno == 0) - errno = ENOSPC; - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not write to log segment %s " - "at offset %u, length %lu: %m", - XLogFileNameP(recvFileTLI, recvSegNo), - recvOff, (unsigned long) segbytes))); + PmemFileWrite((char *)mappedFileAddr+startoff, buf, segbytes); + byteswritten = segbytes; + } + else + { + /* OK to write the logs */ + errno = 0; + + byteswritten = write(recvFile, buf, segbytes); + if (byteswritten <= 0) + { + /* if write didn't set errno, assume no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to log segment %s " + "at offset %u, length %lu: %m", + XLogFileNameP(recvFileTLI, recvSegNo), + recvOff, (unsigned long) segbytes))); + } } /* Update state for write */ -- 2.16.4