Re: Re: [COMMITTERS] pgsql: Make standby server continuously retry restoring the next WAL - Mailing list pgsql-hackers
From | Heikki Linnakangas |
---|---|
Subject | Re: Re: [COMMITTERS] pgsql: Make standby server continuously retry restoring the next WAL |
Date | |
Msg-id | 4BA361E4.7020309@enterprisedb.com Whole thread Raw |
In response to | Re: Re: [COMMITTERS] pgsql: Make standby server continuously retry restoring the next WAL (Simon Riggs <simon@2ndQuadrant.com>) |
Responses |
Re: Re: [COMMITTERS] pgsql: Make standby server continuously retry restoring the next WAL
Re: Re: [COMMITTERS] pgsql: Make standby server continuously retry restoring the next WAL
Re: Re: [COMMITTERS] pgsql: Make standby server continuously retry restoring the next WAL |
List | pgsql-hackers |
Simon Riggs wrote: > On Thu, 2010-03-18 at 23:27 +0900, Fujii Masao wrote: > >> I agree that this is a bigger problem. Since the standby always starts >> walreceiver before replaying any WAL files in pg_xlog, walreceiver tries >> to receive the WAL files following the REDO starting point even if they >> have already been in pg_xlog. IOW, the same WAL files might be shipped >> from the primary to the standby many times. This behavior is unsmart, >> and should be addressed. > > We might also have written half a file many times. The files in pg_xlog > are suspect whereas the files in the archive are not. If we have both we > should prefer the archive. Yep. Here's a patch I've been playing with. The idea is that in standby mode, the server keeps trying to make progress in the recovery by: a) restoring files from archive, b) replaying files from pg_xlog, or c) streaming from master. When recovery reaches an invalid WAL record, typically caused by a half-written WAL file, it closes the file and moves on to the next source. If an error is found in a file restored from archive or in a portion just streamed from master, however, a PANIC is thrown, because errors are never expected in archived WAL or in WAL streamed from the master. When a file is streamed from master, it's left in pg_xlog, so it's found there after a standby restart, and recovery can progress to the same point as before the restart. It also means that you can copy partial WAL files to pg_xlog at any time and have them replayed within a few seconds. The code structure is a bit spaghetti-like, I'm afraid. Any suggestions on how to improve that are welcome. -- Heikki Linnakangas EnterpriseDB http://www.enterprisedb.com *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 445,464 **** static uint32 openLogSeg = 0; static uint32 openLogOff = 0; /* * These variables are used similarly to the ones above, but for reading * the XLOG.
Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which * will be just past that page. readLen indicates how much of the current ! * page has been read into readBuf. */ static int readFile = -1; static uint32 readId = 0; static uint32 readSeg = 0; static uint32 readOff = 0; static uint32 readLen = 0; ! /* Is the currently open segment being streamed from primary? */ ! static bool readStreamed = false; /* Buffer for currently read page (XLOG_BLCKSZ bytes) */ static char *readBuf = NULL; --- 445,477 ---- static uint32 openLogOff = 0; /* + * Codes indicating where we got a WAL file from during recovery, or where + * to attempt to get one. + */ + #define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */ + #define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */ + #define XLOG_FROM_STREAM (1<<2) /* Streamed from master */ + + /* * These variables are used similarly to the ones above, but for reading * the XLOG. Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which * will be just past that page. readLen indicates how much of the current ! * page has been read into readBuf, and readSource indicates where we got ! * the currently open file from. */ static int readFile = -1; static uint32 readId = 0; static uint32 readSeg = 0; static uint32 readOff = 0; static uint32 readLen = 0; + static int readSource = 0; /* XLOG_FROM_* code */ ! /* ! * Keeps track of which sources we've tried to read the current WAL ! * record from and failed. ! */ ! static int failedSources = 0; /* Buffer for currently read page (XLOG_BLCKSZ bytes) */ static char *readBuf = NULL; *************** *** 512,520 **** static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, bool find_free, int *max_advance, bool use_lock); static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, ! 
bool fromArchive, bool notexistOk); static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, ! bool fromArchive); static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, bool randAccess); static void XLogFileClose(void); --- 525,533 ---- bool find_free, int *max_advance, bool use_lock); static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, ! int source, bool notexistOk); static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, ! int sources); static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, bool randAccess); static void XLogFileClose(void); *************** *** 2567,2573 **** XLogFileOpen(uint32 log, uint32 seg) */ static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, ! bool fromArchive, bool notfoundOk) { char xlogfname[MAXFNAMELEN]; char activitymsg[MAXFNAMELEN + 16]; --- 2580,2586 ---- */ static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, ! int source, bool notfoundOk) { char xlogfname[MAXFNAMELEN]; char activitymsg[MAXFNAMELEN + 16]; *************** *** 2576,2598 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, XLogFileName(xlogfname, tli, log, seg); ! if (fromArchive) { ! /* Report recovery progress in PS display */ ! snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", ! xlogfname); ! set_ps_display(activitymsg, false); ! restoredFromArchive = RestoreArchivedFile(path, xlogfname, ! "RECOVERYXLOG", ! XLogSegSize); ! if (!restoredFromArchive) ! return -1; ! } ! else ! { ! XLogFilePath(path, tli, log, seg); ! restoredFromArchive = false; } fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); --- 2589,2616 ---- XLogFileName(xlogfname, tli, log, seg); ! switch (source) { ! case XLOG_FROM_ARCHIVE: ! /* Report recovery progress in PS display */ ! snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", ! xlogfname); ! set_ps_display(activitymsg, false); ! 
restoredFromArchive = RestoreArchivedFile(path, xlogfname, ! "RECOVERYXLOG", ! XLogSegSize); ! if (!restoredFromArchive) ! return -1; ! break; ! ! case XLOG_FROM_PG_XLOG: ! XLogFilePath(path, tli, log, seg); ! restoredFromArchive = false; ! break; ! ! default: ! elog(ERROR, "invalid XLogFileRead source %d", source); } fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); *************** *** 2606,2611 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, --- 2624,2631 ---- xlogfname); set_ps_display(activitymsg, false); + readSource = source; + return fd; } if (errno != ENOENT || !notfoundOk) /* unexpected failure? */ *************** *** 2624,2630 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, * searched in pg_xlog if not found in archive. */ static int ! XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive) { char path[MAXPGPATH]; ListCell *cell; --- 2644,2650 ---- * searched in pg_xlog if not found in archive. */ static int ! XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, int sources) { char path[MAXPGPATH]; ListCell *cell; *************** *** 2647,2666 **** XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive) if (tli < curFileTLI) break; /* don't bother looking at too-old TLIs */ ! fd = XLogFileRead(log, seg, emode, tli, fromArchive, true); ! if (fd != -1) ! return fd; ! /* ! * If not in StandbyMode, fall back to searching pg_xlog. In ! * StandbyMode we're streaming segments from the primary to pg_xlog, ! * and we mustn't confuse the (possibly partial) segments in pg_xlog ! * with complete segments ready to be applied. We rather wait for the ! * records to arrive through streaming. ! */ ! if (!StandbyMode && fromArchive) { ! fd = XLogFileRead(log, seg, emode, tli, false, true); if (fd != -1) return fd; } --- 2667,2685 ---- if (tli < curFileTLI) break; /* don't bother looking at too-old TLIs */ ! if (sources & XLOG_FROM_ARCHIVE) ! { ! 
fd = XLogFileRead(log, seg, emode, tli, XLOG_FROM_ARCHIVE, true); ! if (fd != -1) ! { ! elog(DEBUG1, "got WAL segment from archive"); ! return fd; ! } ! } ! if (sources & XLOG_FROM_PG_XLOG) { ! fd = XLogFileRead(log, seg, emode, tli, XLOG_FROM_PG_XLOG, true); if (fd != -1) return fd; } *************** *** 3530,3545 **** ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) uint32 pageHeaderSize; int emode; - /* - * We don't expect any invalid records during streaming recovery: we - * should never hit the end of WAL because we wait for it to be streamed. - * Therefore treat any broken WAL as PANIC, instead of failing over. - */ - if (StandbyMode) - emode = PANIC; - else - emode = emode_arg; - if (readBuf == NULL) { /* --- 3549,3554 ---- *************** *** 3591,3600 **** ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) randAccess = true; /* allow curFileTLI to go backwards too */ } /* Read the page containing the record */ ! if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess)) return NULL; pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ; if (targetRecOff == 0) --- 3600,3623 ---- randAccess = true; /* allow curFileTLI to go backwards too */ } + /* This is the first try read this page. */ + failedSources = 0; + retry: /* Read the page containing the record */ ! if (!XLogPageRead(RecPtr, emode_arg, fetching_ckpt, randAccess)) return NULL; + /* + * We don't expect any invalid records in archive or in records streamed + * from master: we should never hit the end of WAL because we wait for it + * to be streamed. Therefore treat any broken WAL as PANIC, instead of + * failing over. 
+ */ + if (readSource == XLOG_FROM_STREAM || readSource == XLOG_FROM_ARCHIVE) + emode = PANIC; + else + emode = emode_arg; + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ; if (targetRecOff == 0) *************** *** 3828,3833 **** next_record_is_invalid:; --- 3851,3864 ---- close(readFile); readFile = -1; } + + /* In standby-mode, retry from another source */ + if (StandbyMode) + { + failedSources |= readSource; + goto retry; + } + return NULL; } *************** *** 8698,8704 **** StartupProcessMain(void) * as for waiting for the requested WAL record to arrive in standby mode. */ static bool ! XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, bool randAccess) { static XLogRecPtr receivedUpto = {0, 0}; --- 8729,8735 ---- * as for waiting for the requested WAL record to arrive in standby mode. */ static bool ! XLogPageRead(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt, bool randAccess) { static XLogRecPtr receivedUpto = {0, 0}; *************** *** 8707,8719 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, uint32 targetRecOff; uint32 targetId; uint32 targetSeg; XLByteToSeg(*RecPtr, targetId, targetSeg); targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ; /* Fast exit if we have read the record in the current buffer already */ ! if (targetId == readId && targetSeg == readSeg && targetPageOff == readOff && targetRecOff < readLen) return true; --- 8738,8752 ---- uint32 targetRecOff; uint32 targetId; uint32 targetSeg; + int emode; + static pg_time_t last_fail_time = 0; XLByteToSeg(*RecPtr, targetId, targetSeg); targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ; /* Fast exit if we have read the record in the current buffer already */ ! 
if (failedSources == 0 && targetId == readId && targetSeg == readSeg && targetPageOff == readOff && targetRecOff < readLen) return true; *************** *** 8725,8742 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, { close(readFile); readFile = -1; } XLByteToSeg(*RecPtr, readId, readSeg); /* See if we need to retrieve more data */ if (readFile < 0 || ! (readStreamed && !XLByteLT(*RecPtr, receivedUpto))) { if (StandbyMode) { - bool last_restore_failed = false; - /* * In standby mode, wait for the requested record to become * available, either via restore_command succeeding to restore the --- 8758,8775 ---- { close(readFile); readFile = -1; + readSource = 0; } XLByteToSeg(*RecPtr, readId, readSeg); + retry: /* See if we need to retrieve more data */ if (readFile < 0 || ! (readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto))) { if (StandbyMode) { /* * In standby mode, wait for the requested record to become * available, either via restore_command succeeding to restore the *************** *** 8746,8751 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, --- 8779,8786 ---- { if (WalRcvInProgress()) { + failedSources = 0; + /* * While walreceiver is active, wait for new WAL to arrive * from primary. *************** *** 8761,8775 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, { readFile = XLogFileRead(readId, readSeg, PANIC, ! recoveryTargetTLI, false, false); switched_segment = true; ! readStreamed = true; } break; } if (CheckForStandbyTrigger()) ! goto next_record_is_invalid; /* * When streaming is active, we want to react quickly when --- 8796,8811 ---- { readFile = XLogFileRead(readId, readSeg, PANIC, ! recoveryTargetTLI, ! XLOG_FROM_PG_XLOG, false); switched_segment = true; ! readSource = XLOG_FROM_STREAM; } break; } if (CheckForStandbyTrigger()) ! 
goto triggered; /* * When streaming is active, we want to react quickly when *************** *** 8779,8784 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, --- 8815,8823 ---- } else { + int sources; + pg_time_t now; + /* * Until walreceiver manages to reconnect, poll the * archive. *************** *** 8791,8828 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, /* Reset curFileTLI if random fetch. */ if (randAccess) curFileTLI = 0; ! readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true); switched_segment = true; - readStreamed = false; if (readFile != -1) { - elog(DEBUG1, "got WAL segment from archive"); break; } /* ! * If we succeeded restoring some segments from archive ! * since the last connection attempt (or we haven't tried ! * streaming yet, retry immediately. But if we haven't, ! * assume the problem is persistent, so be less ! * aggressive. */ ! if (last_restore_failed) { ! /* ! * Check to see if the trigger file exists. Note that ! * we do this only after failure, so when you create ! * the trigger file, we still finish replaying as much ! * as we can before failover. ! */ ! if (CheckForStandbyTrigger()) ! goto next_record_is_invalid; ! pg_usleep(5000000L); /* 5 seconds */ } ! last_restore_failed = true; /* ! * Nope, not found in archive. Try to stream it. * * If fetching_ckpt is TRUE, RecPtr points to the initial * checkpoint location. In that case, we use RedoStartLSN --- 8830,8877 ---- /* Reset curFileTLI if random fetch. */ if (randAccess) curFileTLI = 0; ! ! /* ! * Try to restore the file from archive, or read an ! * existing file from pg_xlog. ! */ ! sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG; ! sources &= ~failedSources; ! readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, ! sources); switched_segment = true; if (readFile != -1) { break; } /* ! * Nope, not found in archive. ! */ ! ! /* ! * Check to see if the trigger file exists. Note that ! * we do this only after failure, so when you create ! 
* the trigger file, we still finish replaying as much ! * as we can from archive and pg_xlog before failover. */ ! if (CheckForStandbyTrigger()) ! goto triggered; ! ! /* ! * Sleep if it hasn't been long since last attempt. ! */ ! now = (pg_time_t) time(NULL); ! if ((now - last_fail_time) < 5) { ! pg_usleep(1000000L * (5 - (now - last_fail_time))); ! now = (pg_time_t) time(NULL); } ! last_fail_time = now; /* ! * If primary_conninfo is set, launch walreceiver to ! * try to stream the missing WAL. * * If fetching_ckpt is TRUE, RecPtr points to the initial * checkpoint location. In that case, we use RedoStartLSN *************** *** 8847,8859 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, /* In archive or crash recovery. */ if (readFile < 0) { /* Reset curFileTLI if random fetch. */ if (randAccess) curFileTLI = 0; ! readFile = XLogFileReadAnyTLI(readId, readSeg, emode, ! InArchiveRecovery); switched_segment = true; - readStreamed = false; if (readFile < 0) return false; } --- 8896,8914 ---- /* In archive or crash recovery. */ if (readFile < 0) { + int sources; /* Reset curFileTLI if random fetch. */ if (randAccess) curFileTLI = 0; ! ! sources = XLOG_FROM_PG_XLOG; ! if (InArchiveRecovery) ! sources |= XLOG_FROM_ARCHIVE; ! sources &= ~failedSources; ! ! readFile = XLogFileReadAnyTLI(readId, readSeg, emode_arg, ! sources); switched_segment = true; if (readFile < 0) return false; } *************** *** 8861,8878 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, } /* ! * At this point, we have the right segment open and we know the requested ! * record is in it. */ Assert(readFile != -1); /* * If the current segment is being streamed from master, calculate how * much of the current page we have received already. We know the * requested record has been received, but this is for the benefit of * future calls, to allow quick exit at the top of this function. */ ! 
if (readStreamed) { if (RecPtr->xlogid != receivedUpto.xlogid || (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ)) --- 8916,8944 ---- } /* ! * At this point, we have the right segment open and if we're streaming ! * we know the requested record is in it. */ Assert(readFile != -1); /* + * We don't expect any invalid records in archive or in records streamed + * from master: we should never hit the end of WAL because we wait for it + * to be streamed. Therefore treat any broken WAL as PANIC, instead of + * failing over. + */ + if (readSource == XLOG_FROM_STREAM || readSource == XLOG_FROM_ARCHIVE) + emode = PANIC; + else + emode = emode_arg; + + /* * If the current segment is being streamed from master, calculate how * much of the current page we have received already. We know the * requested record has been received, but this is for the benefit of * future calls, to allow quick exit at the top of this function. */ ! if (readSource == XLOG_FROM_STREAM) { if (RecPtr->xlogid != receivedUpto.xlogid || (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ)) *************** *** 8936,8946 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, return true; next_record_is_invalid: if (readFile >= 0) close(readFile); readFile = -1; - readStreamed = false; readLen = 0; return false; } --- 9002,9026 ---- return true; next_record_is_invalid: + failedSources |= readSource; + + if (readFile >= 0) + close(readFile); + readFile = -1; + readLen = 0; + readSource = 0; + + if (StandbyMode) + goto retry; + else + return false; + + triggered: if (readFile >= 0) close(readFile); readFile = -1; readLen = 0; + readSource = 0; return false; }
pgsql-hackers by date: