diff --git a/configure b/configure
index 83abe872aa..e7fffcea86 100755
--- a/configure
+++ b/configure
@@ -11037,6 +11037,84 @@ _ACEOF
 
 fi
 
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for main in -lrt" >&5
+$as_echo_n "checking for main in -lrt... " >&6; }
+if ${ac_cv_lib_rt_main+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lrt $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+
+int
+main ()
+{
+return main ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_rt_main=yes
+else
+  ac_cv_lib_rt_main=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_rt_main" >&5
+$as_echo "$ac_cv_lib_rt_main" >&6; }
+if test "x$ac_cv_lib_rt_main" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBRT 1
+_ACEOF
+
+  LIBS="-lrt $LIBS"
+
+fi
+
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for main in -laio" >&5
+$as_echo_n "checking for main in -laio... " >&6; }
+if ${ac_cv_lib_aio_main+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-laio $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+
+int
+main ()
+{
+return main ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_aio_main=yes
+else
+  ac_cv_lib_aio_main=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_aio_main" >&5
+$as_echo "$ac_cv_lib_aio_main" >&6; }
+if test "x$ac_cv_lib_aio_main" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBAIO 1
+_ACEOF
+
+  LIBS="-laio $LIBS"
+
+fi
+
 { $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing setproctitle" >&5
 $as_echo_n "checking for library containing setproctitle... " >&6; }
 if ${ac_cv_search_setproctitle+:} false; then :
diff --git a/configure.in b/configure.in
index ecdf172396..1d2f9b9d4c 100644
--- a/configure.in
+++ b/configure.in
@@ -1139,6 +1139,8 @@ AC_SUBST(PTHREAD_LIBS)
 ##
 
 AC_CHECK_LIB(m, main)
+AC_CHECK_LIB(rt, main)
+AC_CHECK_LIB(aio, main)
 AC_SEARCH_LIBS(setproctitle, util)
 AC_SEARCH_LIBS(dlopen, dl)
 AC_SEARCH_LIBS(socket, [socket ws2_32])
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f9980cf80c..94a3258438 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -473,6 +473,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
 							   bool *foundPtr);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
 static void AtProcExit_Buffers(int code, Datum arg);
+static int PerformAIORequest(int start_buffer_id, int num_buffers);
 static void CheckForBufferLeaks(void);
 static int	rnode_comparator(const void *p1, const void *p2);
 static int	buffertag_comparator(const void *p1, const void *p2);
@@ -705,6 +706,148 @@ ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
 							 mode, strategy, &hit);
 }
 
+/*
+ * PerformAIORequest -- write out a run of dirty shared buffers as one batch.
+ *
+ * Examines num_buffers consecutive buffers starting at start_buffer_id.  Each
+ * buffer that is valid and dirty, and whose content lock and I/O lock can both
+ * be taken without waiting, is flagged BM_IO_IN_PROGRESS and queued.  The
+ * queued pages go to smgraio() in a single call; afterwards the I/O flags are
+ * cleared (BM_DIRTY too, unless the page was dirtied again meanwhile), the
+ * locks are released and the pins dropped.
+ *
+ * Returns the number of buffers that were written.
+ *
+ * The buffer header spinlock is never held across an LWLock call or a
+ * resource-owner call.
+ *
+ * NOTE(review): PageSetChecksumCopy() may return the same process-local
+ * scratch page on every call when data checksums are enabled; if so, all
+ * requests of a batch alias one page image.  Confirm against bufpage.c.
+ *
+ * NOTE(review): an ERROR raised part-way through a batch leaves the buffers
+ * queued so far flagged BM_IO_IN_PROGRESS.  Confirm that abort handling
+ * copes with more than one such buffer.
+ */
+static int
+PerformAIORequest (
+	int	start_buffer_id,
+	int	num_buffers
+) {
+	int			ii;
+	int			num_requests = 0;
+	int			buf_id = start_buffer_id;
+	aio_req_t	req[MAX_AIO_BATCH_SIZE];
+	BufferDesc *buf;
+	uint32		buf_state;
+
+	Assert(num_buffers <= MAX_AIO_BATCH_SIZE);
+	Assert((start_buffer_id + num_buffers) <= NBuffers);
+
+	memset(req, 0, (sizeof(req)));
+
+	for (ii = 0; ii < num_buffers; ++ii) {
+		XLogRecPtr	recptr;
+
+		buf = GetBufferDescriptor(buf_id++);
+
+		/* Cheap unlocked peek; the flags are rechecked under the spinlock */
+		buf_state = pg_atomic_read_u32(&buf->state);
+		if (!(buf_state & BM_VALID)
+			|| !(buf_state & BM_DIRTY)
+			|| (buf_state & BM_IO_IN_PROGRESS)) {
+			continue;
+		}
+
+		/* Make room for the pin before the header spinlock is taken */
+		ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+		ReservePrivateRefCountEntry();
+
+		buf_state = LockBufHdr(buf);
+		if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) {
+			UnlockBufHdr(buf, buf_state);
+			continue;
+		}
+
+		/* Pin the buffer; PinBuffer_Locked() also drops the header spinlock */
+		PinBuffer_Locked(buf);
+
+		/* Take both LWLocks without waiting and without the spinlock held */
+		if (!LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
+									  LW_SHARED)) {
+			UnpinBuffer(buf, true);
+			continue;
+		}
+		if (!LWLockConditionalAcquire(BufferDescriptorGetIOLock(buf),
+									  LW_EXCLUSIVE)) {
+			LWLockRelease(BufferDescriptorGetContentLock(buf));
+			UnpinBuffer(buf, true);
+			continue;
+		}
+
+		/* Recheck the flags now that both LWLocks are held */
+		buf_state = LockBufHdr(buf);
+		if ((buf_state & BM_IO_IN_PROGRESS) || !(buf_state & BM_DIRTY)) {
+			UnlockBufHdr(buf, buf_state);
+			LWLockRelease(BufferDescriptorGetIOLock(buf));
+			LWLockRelease(BufferDescriptorGetContentLock(buf));
+			UnpinBuffer(buf, true);
+			continue;
+		}
+
+		recptr = BufferGetLSN(buf);
+
+		buf_state &= ~BM_JUST_DIRTIED;
+		buf_state |= BM_IO_IN_PROGRESS;
+
+		UnlockBufHdr(buf, buf_state);
+
+		if (buf_state & BM_PERMANENT) {
+			XLogFlush(recptr);
+		}
+
+		req[ii].reln = smgropen(buf->tag.rnode, InvalidBackendId);
+		req[ii].bnum = buf->tag.blockNum;
+		req[ii].fnum = buf->tag.forkNum;
+		req[ii].buf = PageSetChecksumCopy((Page) BufHdrGetBlock(buf),
+										  buf->tag.blockNum);
+		req[ii].isValid = true;
+
+		++num_requests;
+	}
+
+	if (0 < num_requests) {
+		/* Tell the storage manager to do its job */
+		if (!(smgraio(req))) {
+			elog(PANIC, "SMGRAIO error!\n");
+		}
+
+		pgBufferUsage.shared_blks_written += num_requests;
+
+		buf_id = start_buffer_id;
+		for (ii = 0; ii < num_buffers; ++ii) {
+			buf = GetBufferDescriptor(buf_id++);
+
+			if (!req[ii].isValid) {
+				continue;
+			}
+
+			/* Update the flags under the spinlock and drop it straight away */
+			buf_state = LockBufHdr(buf);
+			buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
+			if (!(buf_state & BM_JUST_DIRTIED)) {
+				buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
+			}
+			UnlockBufHdr(buf, buf_state);
+
+			LWLockRelease(BufferDescriptorGetIOLock(buf));
+			LWLockRelease(BufferDescriptorGetContentLock(buf));
+			UnpinBuffer(buf, true);
+		}
+	}
+
+	return (num_requests);
+} /* PerformAIORequest() */
 
 /*
  * ReadBuffer_common -- common logic for all ReadBuffer variants
@@ -2293,29 +2436,57 @@ BgBufferSync(WritebackContext *wb_context)
 	reusable_buffers = reusable_buffers_est;
 
 	/* Execute the LRU scan */
-	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
-	{
-		int			sync_state = SyncOneBuffer(next_to_clean, true,
-											   wb_context);
-
-		if (++next_to_clean >= NBuffers)
-		{
-			next_to_clean = 0;
-			next_passes++;
+	if (EnableAsyncIO) {
+		/*
+		 * No strategy other than to keep it as clean as possible.
+		 */
+		saved_info_valid = false;
+		while (num_to_scan > 0) {
+			int request_size = Min(num_to_scan, max_asyncio_events);
+			int batch_written;
+
+			/* Never let a batch run past the end of the buffer pool */
+			if ((next_to_clean + request_size) > NBuffers) {
+				request_size = (NBuffers - next_to_clean);
+			}
+
+			/* Count this batch only; num_written is a running total */
+			batch_written = PerformAIORequest(next_to_clean, request_size);
+			num_written += batch_written;
+			reusable_buffers += batch_written;
+
+			next_to_clean += request_size;
+			if (next_to_clean >= NBuffers) {
+				next_to_clean = 0;
+				++next_passes;
+			}
+			num_to_scan -= request_size;
 		}
-		num_to_scan--;
-
-		if (sync_state & BUF_WRITTEN)
+	} else {
+		while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
 		{
-			reusable_buffers++;
-			if (++num_written >= bgwriter_lru_maxpages)
+			int sync_state = SyncOneBuffer(next_to_clean, true,
+					wb_context);
+
+			if (++next_to_clean >= NBuffers)
 			{
-				BgWriterStats.m_maxwritten_clean++;
-				break;
+				next_to_clean = 0;
+				next_passes++;
+			}
+			num_to_scan--;
+
+			if (sync_state & BUF_WRITTEN)
+			{
+				reusable_buffers++;
+				if (++num_written >= bgwriter_lru_maxpages)
+				{
+					BgWriterStats.m_maxwritten_clean++;
+					break;
+				}
 			}
+			else if (sync_state & 
BUF_REUSABLE)
+				reusable_buffers++;
 		}
-		else if (sync_state & BUF_REUSABLE)
-			reusable_buffers++;
 	}
 
 	BgWriterStats.m_buf_written_clean += num_written;
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 6ffd7b3306..21c4e6f355 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -434,7 +434,13 @@ InitLocalBuffers(void)
 
 	/* Allocate and zero buffer headers and auxiliary arrays */
 	LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
-	LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
+
+	/* NOTE(review): this aligns the pointer array, not the buffers it points to */
+	LocalBufferBlockPointers =
+		(Block *) DIOBUFFERALIGN(malloc((sizeof(Block) * nbufs) + ALIGNOF_DIRECTIO));
+	if (LocalBufferBlockPointers)	/* allocation failure is reported just below */
+		MemSet((void *) LocalBufferBlockPointers, 0, (sizeof(Block) * nbufs));
+
 	LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
 	if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
 		ereport(FATAL,
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 7dc6dd2f15..032057bf6b 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -271,6 +271,11 @@ static Oid *tempTableSpaces = NULL;
 static int	numTempTableSpaces = -1;
 static int	nextTempTableSpace = 0;
 
+#ifdef USE_POSIX_AIO
+# include <aio.h>
+#else
+# include <libaio.h>
+#endif /* USE_POSIX_AIO */
 
 /*--------------------
  *
@@ -336,6 +341,11 @@ static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel);
 static int	fsync_parent_path(const char *fname, int elevel);
 
 
+/* Asynchronous and Direct I/O GUCs */
+bool EnableDirectIO = false;
+bool EnableAsyncIO = false;
+int max_asyncio_events = MAX_AIO_BATCH_SIZE;
+
 /*
  * pg_fsync --- do fsync with or without writethrough
  */
@@ -1693,6 +1703,192 @@ OpenTemporaryFileInTablespace(Oid tblspcOid, bool rejectError)
 	return file;
 }
 
+#ifdef USE_POSIX_AIO
+/*
+ * FileAIO (POSIX AIO variant) -- write a batch of blocks with lio_listio().
+ *
+ * The first max_asyncio_events entries of req[] are examined; entries whose
+ * isValid flag is clear are skipped.  Every remaining entry writes BLCKSZ
+ * bytes from req[].buf to the file req[].file at byte offset req[].offset.
+ * The call does not return until all queued writes have finished.
+ *
+ * Returns true on success.  A write that did not transfer exactly BLCKSZ
+ * bytes is reported through ereport(ERROR).
+ */
+bool
+FileAIO (
+	aio_req_t *req
+) {
+	struct aiocb	aio_array[MAX_AIO_BATCH_SIZE];
+	struct aiocb   *aio_list[MAX_AIO_BATCH_SIZE];
+	int				rc;
+	int				ii;
+	int				aio_nent = 0;
+
+	memset(aio_array, 0, sizeof(aio_array));
+
+	for (ii = 0; ii < max_asyncio_events; ++ii) {
+		if (!req[ii].isValid) {
+			continue;
+		}
+
+		Assert(FileIsValid(req[ii].file));
+		rc = FileAccess(req[ii].file);
+		if (rc < 0) {
+			elog(ERROR, "FileAccess returned %d at line %d in %s",
+				 rc, __LINE__, __func__);
+		}
+
+		aio_array[aio_nent].aio_lio_opcode = LIO_WRITE;
+		aio_array[aio_nent].aio_fildes = VfdCache[req[ii].file].fd;
+		aio_array[aio_nent].aio_buf = req[ii].buf;
+		aio_array[aio_nent].aio_nbytes = BLCKSZ;
+		aio_array[aio_nent].aio_offset = req[ii].offset;
+		aio_list[aio_nent] = &aio_array[aio_nent];
+
+		++aio_nent;
+	}
+
+	rc = lio_listio(LIO_WAIT, aio_list, aio_nent, NULL);
+	if (rc) {
+		if (EINTR == errno) {
+			/*
+			 * A signal interrupted the wait, but the writes are still queued.
+			 * Wait for them one at a time.  Their status is collected below,
+			 * once per request, because aio_return() may be called only once
+			 * for each completed operation.
+			 */
+			for (ii = 0; ii < aio_nent; ++ii) {
+				const struct aiocb *pending = aio_list[ii];
+
+				while (EINPROGRESS == aio_error(pending)) {
+					(void) aio_suspend(&pending, 1, NULL);
+				}
+			}
+			elog(LOG, "completed EINTR handling");
+		} else {
+			ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("lio_listio returned: %m")));
+		}
+	}
+
+	for (ii = 0; ii < aio_nent; ++ii) {
+		int		aio_errno = aio_error(aio_list[ii]);
+		ssize_t	aio_ret = aio_return(aio_list[ii]);
+
+		if (BLCKSZ != aio_ret) {
+			errno = aio_errno;
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("lio_listio returned: %m")));
+		}
+	}
+
+	return (true);
+} /* FileAIO() */
+#else	/* use libaio */
+/*
+ * FileAIO (libaio variant) -- write a batch of blocks with io_submit().
+ *
+ * Takes the same request array as the POSIX variant.  All valid entries are
+ * submitted in one io_submit() call and the function then waits until every
+ * one of them has completed.
+ *
+ * Returns true on success.  A submission or wait failure is a PANIC, since
+ * the state of the queued writes is then unknown; a completed write that did
+ * not transfer exactly BLCKSZ bytes is an ERROR.
+ */
+bool
+FileAIO (
+	aio_req_t *req
+) {
+	int					rc, i, num_iocbs, num_reaped;
+	static io_context_t	ctx;
+	static bool			initialized = false;
+	struct iocb			array[MAX_AIO_BATCH_SIZE];
+	struct iocb		   *list[MAX_AIO_BATCH_SIZE];
+	struct io_event		reap[MAX_AIO_BATCH_SIZE];
+
+	/* We only need to do io_setup once */
+	if (!initialized) {
+		memset(&ctx, 0x0, sizeof(ctx));
+		rc = io_setup(MAX_AIO_BATCH_SIZE, &ctx);
+		if (rc) {
+			elog(PANIC, "io_setup returned %d", rc);
+		}
+		initialized = true;
+	}
+
+	memset(array, 0x0, sizeof(array));
+	memset(reap, 0x0, sizeof(reap));
+
+	/*
+	 * Iterate over the AIO request structure adding each buffer to
+	 * the list.
+	 */
+	for (i = 0, num_iocbs = 0
+		 ; i < max_asyncio_events
+		 ; ++i) {
+		if (!req[i].isValid) {
+			continue;
+		}
+
+		Assert(FileIsValid(req[i].file));
+		rc = FileAccess(req[i].file);
+		if (rc < 0) {
+			elog(ERROR, "FileAccess returned %d at line %d in %s",
+				 rc, __LINE__, __func__);
+		}
+
+		array[i].aio_lio_opcode = IO_CMD_PWRITE;
+		array[i].aio_fildes = VfdCache[req[i].file].fd;
+		array[i].u.c.buf = req[i].buf;
+		array[i].u.c.nbytes = BLCKSZ;
+		array[i].u.c.offset = req[i].offset;
+		list[num_iocbs++] = &array[i];
+	}
+
+	/* Submit the request for processing */
+	elog(DEBUG1, "submitting libaio request containing %d iocbs", num_iocbs);
+	rc = io_submit(ctx, num_iocbs, list);
+	if (rc != num_iocbs) {
+		/* libaio hands back a negative errno value instead of setting errno */
+		errno = (rc < 0) ? -rc : 0;
+		elog(PANIC, "io_submit returned %d nr_submitted: %d errno: %d %m", rc,
+			 num_iocbs, errno);
+	}
+
+	/*
+	 * Collect one completion per submitted write.  An interrupted wait is
+	 * simply restarted; the writes themselves are never submitted twice.
+	 */
+	num_reaped = 0;
+	while (num_reaped < num_iocbs) {
+		rc = io_getevents(ctx, 1, num_iocbs - num_reaped,
+						  reap + num_reaped, NULL);
+		if (rc < 0) {
+			if (-EINTR == rc) {
+				continue;
+			}
+			elog(PANIC, "io_getevents failed (nr=%d rc=%d)", num_iocbs, rc);
+		}
+		num_reaped += rc;
+	}
+
+	/* A write only counts if it moved the whole block without error */
+	for (i = 0; i < num_iocbs; ++i) {
+		if ((reap[i].res2 != 0) || (reap[i].res != BLCKSZ)) {
+			elog(ERROR, "libaio write failed (res=%ld res2=%ld)",
+				 (long) reap[i].res, (long) reap[i].res2);
+		}
+	}
+
+	return (true);
+} /* FileAIO() */
+#endif /* USE_POSIX_AIO */
+
+
 
 /*
  * Create a new file. The directory containing it must already exist. Files
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 4c14e51c67..c6c2fcff49 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -130,7 +130,7 @@ LWLockPadded *MainLWLockArray = NULL;
  * occasionally the number can be much higher; for example, the pg_buffercache
  * extension locks all buffer partitions simultaneously.
  */
-#define MAX_SIMUL_LWLOCKS 200
+#define MAX_SIMUL_LWLOCKS 1024
 
 /* struct representing the LWLocks we're holding */
 typedef struct LWLockHandle
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index e0b020da11..2c219706dd 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -87,6 +87,20 @@ typedef struct _MdfdVec
 
 static MemoryContext MdCxt;		/* context for all MdfdVec objects */
 
+/* Direct I/O Support */
+static char *localDirectIOBuffer = NULL;
+static char *alignedDirectIOBuffer = NULL;
+
+/* Direct I/O flags passed to open() */
+#define DIRECT_IO_FLAGS ((EnableDirectIO) ? (PG_O_DIRECT | O_SYNC) : (0))
+
+/* bool DIRECTIO_BUFFER_REQUIRED (void *ptr); */
+#define DIRECTIO_BUFFER_REQUIRED(ptr) \
+	(EnableDirectIO && (((uintptr_t) (ptr)) & (ALIGNOF_DIRECTIO - 1)) != 0)
+	/*
+	** Returns true iff Direct I/O is enabled and the given pointer isn't
+	** properly aligned.
+	*/
 
 /* Populate a file tag describing an md.c segment file. */
 #define INIT_MD_FILETAG(a,xx_rnode,xx_forknum,xx_segno) \
@@ -150,6 +164,15 @@ mdinit(void)
 	MdCxt = AllocSetContextCreate(TopMemoryContext,
 								  "MdSmgr",
 								  ALLOCSET_DEFAULT_SIZES);
+
+	/*
+	 * Initialize the buffer used for handling unaligned reads/writes when
+	 * Direct I/O is enabled.
+	 */
+	if (EnableDirectIO) {
+		localDirectIOBuffer = (char *) MemoryContextAlloc(MdCxt, BLCKSZ + ALIGNOF_DIRECTIO);	/* lives in MdCxt, not in the caller's context */
+		alignedDirectIOBuffer = (char *) DIOBUFFERALIGN(localDirectIOBuffer);
+	}
 }
 
 /*
@@ -201,14 +224,14 @@ mdcreate(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
 
 	path = relpath(reln->smgr_rnode, forkNum);
 
-	fd = PathNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
+	fd = PathNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY | DIRECT_IO_FLAGS);
 
 	if (fd < 0)
 	{
 		int			save_errno = errno;
 
 		if (isRedo)
-			fd = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+			fd = PathNameOpenFile(path, O_RDWR | PG_BINARY | DIRECT_IO_FLAGS);
 		if (fd < 0)
 		{
 			/* be sure to report the error reported by create, not open */
@@ -390,6 +413,7 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	off_t		seekpos;
 	int			nbytes;
 	MdfdVec    *v;
+	char	   *ptr;
 
 	/* This assert is too expensive to have on normally ... */
 #ifdef CHECK_WRITE_VS_EXTEND
@@ -414,7 +438,15 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
 
-	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
+	/* Write from the aligned bounce buffer iff Direct I/O requires it */
+	if (DIRECTIO_BUFFER_REQUIRED(buffer)) {
+		memcpy(alignedDirectIOBuffer, buffer, BLCKSZ);
+		ptr = alignedDirectIOBuffer;
+	} else {
+		ptr = buffer;
+	}
+
+	if ((nbytes = FileWrite(v->mdfd_vfd, ptr, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
 	{
 		if (nbytes < 0)
 			ereport(ERROR,
@@ -460,7 +492,7 @@ mdopenfork(SMgrRelation reln, ForkNumber forknum, int behavior)
 
 	path = relpath(reln->smgr_rnode, forknum);
 
-	fd = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+	fd = PathNameOpenFile(path, O_RDWR | PG_BINARY | DIRECT_IO_FLAGS);
 
 	if (fd < 0)
 	{
@@ -556,6 +588,10 @@ void
 mdwriteback(SMgrRelation reln, ForkNumber forknum,
 			BlockNumber blocknum, BlockNumber nblocks)
 {
+	if (EnableDirectIO) {
+		return;
+	}
+
 	/*
 	 * Issue flush requests in as few requests as 
possible; have to split at
 	 * segment boundaries though, since those are actually separate files.
@@ -608,6 +644,7 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	off_t		seekpos;
 	int			nbytes;
 	MdfdVec    *v;
+	char	   *ptr;
 
 	TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum,
 										reln->smgr_rnode.node.spcNode,
@@ -622,7 +659,14 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
 
-	nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ);
+	/* Read into the aligned bounce buffer iff Direct I/O requires it */
+	ptr = DIRECTIO_BUFFER_REQUIRED(buffer) ? alignedDirectIOBuffer : buffer;
+
+	nbytes = FileRead(v->mdfd_vfd, ptr, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ);
+
+	/* Hand bounced data back, or the caller never sees what was read */
+	if (ptr != buffer && nbytes > 0)
+		memcpy(buffer, ptr, nbytes);
 
 	TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum,
 									   reln->smgr_rnode.node.spcNode,
@@ -673,6 +717,7 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	off_t		seekpos;
 	int			nbytes;
 	MdfdVec    *v;
+	char	   *ptr;
 
 	/* This assert is too expensive to have on normally ... */
 #ifdef CHECK_WRITE_VS_EXTEND
@@ -692,7 +737,15 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
 
-	nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
+	/* Write from the aligned bounce buffer iff Direct I/O requires it */
+	if (DIRECTIO_BUFFER_REQUIRED(buffer)) {
+		memcpy(alignedDirectIOBuffer, buffer, BLCKSZ);
+		ptr = alignedDirectIOBuffer;
+	} else {
+		ptr = buffer;
+	}
+
+	nbytes = FileWrite(v->mdfd_vfd, ptr, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
 
 	TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum,
 										reln->smgr_rnode.node.spcNode,
@@ -894,6 +947,10 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
 	int			segno;
 	int			min_inactive_seg;
 
+	if (EnableDirectIO) {
+		return;
+	}
+
 	/*
 	 * NOTE: mdnblocks makes sure we have opened all active segments, so that
 	 * fsync loop will get them all!
@@ -930,6 +987,38 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
 
 		segno--;
 	}
+
+}
+
+/*
+ * mdaio() -- Asynchronous I/O request.
+ *
+ * For each valid entry of req[], finds the segment file holding block bnum
+ * of the given relation fork, then stores that segment's virtual file
+ * descriptor and the block's byte offset within the segment in the entry.
+ * The filled-in batch is passed to FileAIO() and its result is returned.
+ *
+ * Only the first max_asyncio_events entries of req[] are examined.
+ */
+bool
+mdaio (aio_req_t *req)
+{
+	int i;
+	MdfdVec *v;
+
+	Assert(req != NULL);
+
+	for (i = 0; i < max_asyncio_events; ++i) {
+		if (req[i].isValid == false)
+			continue;
+
+		v = _mdfd_getseg(req[i].reln, req[i].fnum, req[i].bnum, false, EXTENSION_FAIL);
+		req[i].file = v->mdfd_vfd;
+		req[i].offset = (off_t) BLCKSZ *(req[i].bnum % ((BlockNumber) RELSEG_SIZE));
+		Assert(req[i].offset < (off_t) BLCKSZ * RELSEG_SIZE);
+	}
+
+	return FileAIO(req);
 }
 
 /*
@@ -1120,7 +1209,7 @@ _mdfd_openseg(SMgrRelation reln, ForkNumber forknum, BlockNumber segno,
 	fullpath = _mdfd_segpath(reln, forknum, segno);
 
 	/* open the file */
-	fd = PathNameOpenFile(fullpath, O_RDWR | PG_BINARY | oflags);
+	fd = PathNameOpenFile(fullpath, O_RDWR | PG_BINARY | DIRECT_IO_FLAGS | oflags);
 
 	pfree(fullpath);
 
@@ -1324,7 +1413,7 @@ mdsyncfiletag(const FileTag *ftag, char *path)
 		strlcpy(path, p, MAXPGPATH);
 		pfree(p);
 
-		file = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+		file = PathNameOpenFile(path, O_RDWR | PG_BINARY | DIRECT_IO_FLAGS);
 		if (file < 0)
 			return -1;
 		need_to_close = true;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index b053a4dc76..d42b730335 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -61,6 +61,7 @@ typedef struct f_smgr
 	void		(*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
 								  BlockNumber nblocks);
 	void		(*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
+	bool		(*smgr_aio) (aio_req_t *);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -81,6 +82,7 @@ static const f_smgr smgrsw[] = {
 		.smgr_nblocks = mdnblocks,
 		.smgr_truncate = mdtruncate,
 		.smgr_immedsync = mdimmedsync,
+		.smgr_aio = mdaio,
 	}
 };
 
@@ -709,3 +711,16 @@ AtEOXact_SMgr(void)
 			smgrclose(rel);
 	}
 }
+
+/*
+ * smgraio() -- Asynchronous I/O request.
+ *
+ * Hands the whole request batch to the storage manager in slot 0 of
+ * smgrsw[]; the reln stored in the individual requests is not consulted.
+ * Returns whatever that storage manager's smgr_aio callback returns.
+ */
+bool
+smgraio (aio_req_t *req)
+{
+	return (*(smgrsw[0].smgr_aio)) (req);
+}
+
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 5bdc02fce2..36f672e312 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2060,6 +2060,24 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"enable_directio", PGC_POSTMASTER, DEVELOPER_OPTIONS,
+			gettext_noop("Enables Direct I/O."),
+			NULL
+		},
+		&EnableDirectIO,
+		false, NULL, NULL, NULL
+	},
+
+	{
+		{"enable_asyncio", PGC_POSTMASTER, DEVELOPER_OPTIONS,
+			gettext_noop("Enables Asynchronous I/O."),
+			NULL
+		},
+		&EnableAsyncIO,
+		false, NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3400,6 +3418,15 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, assign_tcp_user_timeout, show_tcp_user_timeout
 	},
 
+	{
+		{"max_asyncio_events", PGC_POSTMASTER, RESOURCES_KERNEL,
+			gettext_noop("Sets the maximum number of asynchronous I/O events per request."),
+			NULL
+		},
+		&max_asyncio_events,
+		16, 1, MAX_AIO_BATCH_SIZE, NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 995b6ca155..bf94d422e6 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -770,6 +770,18 @@
 #include_if_exists = '...'		# include file only if it exists
 #include = '...'			
# include file +#------------------------------------------------------------------------------ +# AIO/DIO Options +#------------------------------------------------------------------------------ + +# Enables usage of direct I/O (O_DIRECT) +#enable_directio = off + +# Enables usage of asynchronous I/O (libaio or POSIX) +#enable_asyncio = off + +# The maximum number of asynchronous I/O events per call (minimum 1; the upper limit is platform-dependent) +#max_asyncio_events = 16 #------------------------------------------------------------------------------ # CUSTOMIZED OPTIONS diff --git a/src/include/c.h b/src/include/c.h index d72b23afe4..c7e8a63fb4 100644 --- a/src/include/c.h +++ b/src/include/c.h @@ -691,6 +691,7 @@ typedef NameData *Name; #define MAXALIGN(LEN) TYPEALIGN(MAXIMUM_ALIGNOF, (LEN)) /* MAXALIGN covers only built-in types, not buffers */ #define BUFFERALIGN(LEN) TYPEALIGN(ALIGNOF_BUFFER, (LEN)) +#define DIOBUFFERALIGN(LEN) TYPEALIGN(ALIGNOF_DIRECTIO, (LEN)) #define CACHELINEALIGN(LEN) TYPEALIGN(PG_CACHE_LINE_SIZE, (LEN)) #define TYPEALIGN_DOWN(ALIGNVAL,LEN) \ diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index c199cd46d2..a31c8fcfb5 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -310,6 +310,9 @@ /* Define to 1 if you have the `ldap_initialize' function. */ #undef HAVE_LDAP_INITIALIZE +/* Define to 1 if you have the `aio' library (-laio). */ +#undef HAVE_LIBAIO + /* Define to 1 if you have the `crypto' library (-lcrypto). */ #undef HAVE_LIBCRYPTO @@ -328,6 +331,9 @@ /* Define if you have a function readline library */ #undef HAVE_LIBREADLINE +/* Define to 1 if you have the `rt' library (-lrt). */ +#undef HAVE_LIBRT + /* Define to 1 if you have the `selinux' library (-lselinux). 
*/
 #undef HAVE_LIBSELINUX
 
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 8f3ec6bde1..3ccf2fb812 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -122,6 +122,12 @@
  */
 #define ALIGNOF_BUFFER	32
 
+/*
+ * Preferred alignment for Direct I/O buffers. This should generally be done
+ * on a 512-byte boundary.
+ */
+#define ALIGNOF_DIRECTIO 512
+
 /*
  * If EXEC_BACKEND is defined, the postmaster uses an alternative method for
  * starting subprocesses: Instead of simply using fork(), as is standard on
@@ -145,6 +151,15 @@
 #define USE_POSIX_FADVISE
 #endif
 
+/*
+ * USE_POSIX_AIO controls whether Postgres will use POSIX's version of
+ * asynchronous I/O, or whether we'll use libaio. By default, we'd
+ * prefer to use libaio where available (i.e. on Linux.)
+ */
+#ifndef HAVE_LIBAIO
+# define USE_POSIX_AIO
+#endif
+
 /*
  * USE_PREFETCH code should be compiled only if we have a way to implement
  * prefetching. (This is decoupled from USE_POSIX_FADVISE because there
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 8cd125d7df..54125d0fd0 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -44,20 +44,37 @@
 #define FD_H
 
 #include <dirent.h>
+#include "storage/smgr.h"
+#ifdef USE_POSIX_AIO
+# include <aio.h>
+#else
+# include <libaio.h>
+#endif /* USE_POSIX_AIO */
 
 
 typedef int File;
 
-
 /* GUC parameter */
 extern PGDLLIMPORT int max_files_per_process;
 extern PGDLLIMPORT bool data_sync_retry;
+extern PGDLLIMPORT bool EnableDirectIO;
+extern PGDLLIMPORT bool EnableAsyncIO;
+extern PGDLLIMPORT int max_asyncio_events;
 
 /*
  * This is private to fd.c, but exported for save/restore_backend_variables()
  */
 extern int	max_safe_fds;
 
+/*
+ * The platform-dependent maximum number of AIO requests per batch.
+ */ +#ifdef AIO_LISTIO_MAX +#define MAX_AIO_BATCH_SIZE AIO_LISTIO_MAX +#else +#define MAX_AIO_BATCH_SIZE 16 /* fallback when AIO_LISTIO_MAX is not defined */ +#endif /* AIO_LISTIO_MAX */ + /* * On Windows, we have to interpret EACCES as possibly meaning the same as * ENOENT, because if a file is unlinked-but-not-yet-gone on that platform, @@ -91,6 +108,7 @@ extern char *FilePathName(File file); extern int FileGetRawDesc(File file); extern int FileGetRawFlags(File file); extern mode_t FileGetRawMode(File file); +extern bool FileAIO(aio_req_t *req); /* writes every valid entry of the req[] batch */ /* Operations used for sharing named temporary files */ extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure); diff --git a/src/include/storage/md.h b/src/include/storage/md.h index 07fd1bb7d0..2e167beb90 100644 --- a/src/include/storage/md.h +++ b/src/include/storage/md.h @@ -40,6 +40,7 @@ extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum); extern void mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks); extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum); +extern bool mdaio(aio_req_t *); /* fills in file/offset of each valid entry, then calls FileAIO() */ extern void ForgetDatabaseSyncRequests(Oid dbid); extern void DropRelationFiles(RelFileNode *delrels, int ndelrels, bool isRedo); diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index bb8428f27f..816c16b38f 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -76,6 +76,17 @@ typedef struct SMgrRelationData typedef SMgrRelationData *SMgrRelation; +/* Asynchronous I/O request structure: one entry describes one block write; callers pass an array of these as a batch */ +typedef struct { + bool isValid; /* entries with this flag clear are skipped by mdaio() and FileAIO() */ + SMgrRelation reln; /* relation the block belongs to */ + BlockNumber bnum; /* block number within the fork */ + ForkNumber fnum; /* fork the block belongs to */ + off_t offset; /* byte offset within the segment file; filled in by mdaio() */ + int file; /* virtual file descriptor of the segment; filled in by mdaio() */ + char *buf; /* page image to write (BLCKSZ bytes) */ +} aio_req_t; + #define SmgrIsTemp(smgr) \ RelFileNodeBackendIsTemp((smgr)->smgr_rnode) @@ -105,6 +116,7 @@ extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum); extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nblocks); extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum); +extern 
bool smgraio(aio_req_t *); /* hands a request batch to the storage manager's smgr_aio callback */ extern void AtEOXact_SMgr(void); #endif /* SMGR_H */