Here's a draft patch that adds additional logging code to certain
resource managers.
Note that some operations like CREATE DATABASE are still not logged.
An example to test it might be:
<<ensure wal_debug = 16 in postgresql.conf>>
<<startup -D ./data>>
CREATE DATABASE test;
<<backup ./data/global/pg_control while dirty>>
<<shutdown -D ./data>>
<<backup all of ./data except for pg_xlog into ./data_old>>
<<startup -D ./data>>
CREATE TABLE FOO...
INSERT BUNCH OF RECORDS...
CREATE INDEX ON FOO...
etc, etc.
<<shutdown -D ./data>>
<<copy logfiles from ./data/pg_xlog into ./data_old/pg_xlog>>
<<copy backup pg_control into ./data_old/global>>
<<startup -D ./data_old and watch the recovery run>>
If you find anything that doesn't recover properly, other than
CREATE/DROP DATABASE and the rtree/GiST indexes, please let me know
about it.
This patch implements logging changes for:
=== RM_SMGR create/truncate/delete ===
Note that logging of files pending unlink has been moved from
xact.c::RecordTransactionCommit into smgr.c, and they now get logged
after the commit record. The xlog code limits the size of a record to
2^15 bytes, and we have to avoid exceeding that limit. Logging the
files separately will not be a problem for correctness.
=== RM_BTREE _bt_load operations logged properly ===
See nbtree.c and nbtsort.c
=== Incomplete start of XLOG archiver ===
See xlog.c and guc.c
--
J. R. Nield
jrnield@usol.com
*** ./src/backend/access/nbtree/nbtpage.c.orig Thu Jun 20 18:18:12 2002
--- ./src/backend/access/nbtree/nbtpage.c Mon Jul 8 17:58:09 2002
***************
*** 66,72 ****
--- 66,75 ----
elog(ERROR, "Cannot initialize non-empty btree %s",
RelationGetRelationName(rel));
+
buf = ReadBuffer(rel, P_NEW);
+
+ START_CRIT_SECTION();
pg = BufferGetPage(buf);
_bt_pageinit(pg, BufferGetPageSize(buf));
***************
*** 79,86 ****
--- 82,115 ----
op = (BTPageOpaque) PageGetSpecialPointer(pg);
op->btpo_flags = BTP_META;
+ /* Log BTreeInit record for metapage (and implicitly the RelFileNode as
+ * well)
+ */
+ {
+ XLogRecPtr lsn;
+ XLogRecData rdata;
+ xl_btree_metapinit xlrec;
+
+ xlrec.node = rel->rd_node;
+ xlrec.magic = BTREE_MAGIC;
+ xlrec.version = BTREE_VERSION;
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *) &xlrec;
+ rdata.len = SizeOfBtreeMetapinit;
+ rdata.next = NULL;
+
+
+ lsn = XLogInsert(RM_BTREE_ID, XLOG_BTREE_METAPINIT, &rdata);
+
+ PageSetLSN(BufferGetPage(buf), lsn);
+ PageSetSUI(BufferGetPage(buf), ThisStartUpID);
+ }
+
WriteBuffer(buf);
+ END_CRIT_SECTION();
+
/* all done */
if (USELOCKING)
UnlockRelation(rel, AccessExclusiveLock);
***************
*** 430,441 ****
--- 459,505 ----
metaopaque = (BTPageOpaque) PageGetSpecialPointer(metap);
Assert(metaopaque->btpo_flags & BTP_META);
metad = BTPageGetMeta(metap);
+
+ START_CRIT_SECTION();
+
metad->btm_root = rootbknum;
if (level == 0) /* called from _do_insert */
metad->btm_level += 1;
else
metad->btm_level = level; /* called from btsort */
+
+ /* Log root change. Same as from _bt_getroot and _bt_newroot.
+ *
+ * Note that this was not logged before from here, because _bt_metaproot is
+ * only called from _bt_uppershutdown during _bt_load, and index builds
+ * were not logged before PITR support was added.
+ *
+ * jrnield 2002-07-08
+ */
+ {
+ XLogRecPtr lsn;
+ XLogRecData rdata;
+ xl_btree_newroot xlrec;
+
+ xlrec.node = rel->rd_node;
+ xlrec.level = metad->btm_level;
+ BlockIdSet(&xlrec.rootblk, metad->btm_root);
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *) &xlrec;
+ rdata.len = SizeOfBtreeNewroot;
+ rdata.next = NULL;
+
+ lsn = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, &rdata);
+
+ PageSetLSN(BufferGetPage(metabuf), lsn);
+ PageSetSUI(BufferGetPage(metabuf), ThisStartUpID);
+
+ }
+
_bt_wrtbuf(rel, metabuf);
+
+ END_CRIT_SECTION();
}
/*
*** ./src/backend/access/nbtree/nbtree.c.orig Thu Jun 20 18:18:12 2002
--- ./src/backend/access/nbtree/nbtree.c Mon Jul 8 17:58:57 2002
***************
*** 1096,1101 ****
--- 1096,1200 ----
UnlockAndWriteBuffer(metabuf);
}
+ /*
+ * btree_xlog_metapinit -- replay an XLOG_BTREE_METAPINIT record.
+ *
+ * Rebuilds block 0 of the index named in the record as an empty btree
+ * metapage (no root, level 0) using the magic and version numbers stored
+ * in the record, then stamps the page with the record's LSN. Only REDO
+ * is supported; an UNDO request PANICs.
+ */
+ static void
+ btree_xlog_metapinit(bool redo, XLogRecPtr lsn, XLogRecord *record)
+ {
+ xl_btree_metapinit *xlrec;
+ Relation reln;
+ Buffer buf;
+ Page pg;
+ BTMetaPageData *metapg;
+ BTPageOpaque op;
+
+ if ( !redo )
+ elog(PANIC, "btree_xlog_metapinit: UNDO unimplemented");
+
+ xlrec = (xl_btree_metapinit *)XLogRecGetData(record);
+ reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
+
+ /* XXX: TODO: This should be done for all the other REDO functions now.
+ * jrnield 2002-07-08
+ */
+ if ( !RelationIsValid(reln) )
+ elog(PANIC, "btree_xlog_metapinit: unable to open relation %u/%u",
+ xlrec->node.tblNode, xlrec->node.relNode);
+
+
+ /* The metapage is always block 0 of the index */
+ buf = XLogReadBuffer(true, reln, (BlockNumber)0);
+ if ( !BufferIsValid(buf) )
+ elog(PANIC, "btree_xlog_metapinit: failed to read metapage");
+
+ pg = BufferGetPage(buf);
+
+ /*
+ * Unlike btree_xlog_buildadd, the page's current LSN is not examined:
+ * the metapage is rebuilt from scratch unconditionally.
+ */
+ _bt_pageinit(pg, BufferGetPageSize(buf));
+
+ metapg = BTPageGetMeta(pg);
+
+ /* A freshly initialized index has no root page yet */
+ metapg->btm_magic = xlrec->magic;
+ metapg->btm_version = xlrec->version;
+ metapg->btm_root = P_NONE;
+ metapg->btm_level = 0;
+
+ op = (BTPageOpaque) PageGetSpecialPointer(pg);
+ op->btpo_flags = BTP_META;
+
+ PageSetLSN(pg, lsn);
+ PageSetSUI(pg, ThisStartUpID);
+ UnlockAndWriteBuffer(buf);
+ }
+
+ /*
+  * btree_xlog_buildadd -- replay an XLOG_BTREE_BUILDADD record.
+  *
+  * The record carries a complete image of one index page written by
+  * _bt_load.  Unless the on-disk page already has an LSN at or beyond this
+  * record, the image is copied over the page and the page is stamped with
+  * the record's LSN.  Only REDO is supported; an UNDO request PANICs.
+  */
+ static void
+ btree_xlog_buildadd(bool redo, XLogRecPtr lsn, XLogRecord *record)
+ {
+ 	Relation	reln;
+ 	xl_btree_buildadd *xlrec;
+ 	Buffer		buf;
+ 	Page		pg;
+ 
+ 	if ( !redo )
+ 		elog(PANIC, "btree_xlog_buildadd: UNDO unimplemented");
+ 
+ 	xlrec = (xl_btree_buildadd *) XLogRecGetData(record);
+ 	reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
+ 
+ 	if ( !RelationIsValid(reln) )
+ 		elog(PANIC, "btree_xlog_buildadd: unable to open relation %u/%u",
+ 			 xlrec->node.tblNode, xlrec->node.relNode);
+ 
+ 	buf = XLogReadBuffer(true, reln, xlrec->blkno);
+ 	if ( !BufferIsValid(buf) )
+ 		elog(PANIC, "btree_xlog_buildadd: error reading block %u",
+ 			 xlrec->blkno);
+ 
+ 	pg = BufferGetPage(buf);
+ 
+ 	/* The page already reflects this record (or a later one): nothing to do */
+ 	if ( XLByteLE(lsn, PageGetLSN(pg)) )
+ 	{
+ 		UnlockAndReleaseBuffer(buf);
+ 		return;
+ 	}
+ 
+ 	/*
+ 	 * The image length comes from the log record.  Refuse to copy more
+ 	 * than one buffer page, otherwise a record written with a different
+ 	 * page size would overrun the buffer.
+ 	 */
+ 	if ( xlrec->idxpagesz > BufferGetPageSize(buf) )
+ 		elog(PANIC, "btree_xlog_buildadd: page image of %lu bytes exceeds page size %lu",
+ 			 (unsigned long) xlrec->idxpagesz,
+ 			 (unsigned long) BufferGetPageSize(buf));
+ 
+ 	/* We do full block backups for pages generated by _bt_load
+ 	 * because they are almost entirely packed with new data.
+ 	 *
+ 	 * We don't use the XLOG "backup blocks" system, because
+ 	 * that is for writing pre-image backups, and we want to
+ 	 * be able to turn that off when possible.
+ 	 */
+ 	memcpy(BufferGetBlock(buf), xlrec->idxpage, xlrec->idxpagesz);
+ 
+ 	PageSetLSN(pg, lsn);
+ 	PageSetSUI(pg, ThisStartUpID);
+ 	UnlockAndWriteBuffer(buf);
+ }
+
+ /*
+ * btree_xlog_finishlevel -- replay an XLOG_BTREE_FINISHLEVEL record.
+ *
+ * Simply delegates to btree_xlog_buildadd.
+ * NOTE(review): this relies on xl_btree_finishlevel having the same layout
+ * as xl_btree_buildadd; neither declaration is visible here -- confirm.
+ */
+ static void
+ btree_xlog_finishlevel(bool redo, XLogRecPtr lsn, XLogRecord *record)
+ {
+ /* For now, everything is the same */
+ btree_xlog_buildadd(redo, lsn, record);
+ }
+
void
btree_redo(XLogRecPtr lsn, XLogRecord *record)
{
***************
*** 1112,1117 ****
--- 1211,1222 ----
btree_xlog_split(true, true, lsn, record); /* new item on the left */
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(true, lsn, record);
+ else if (info == XLOG_BTREE_METAPINIT)
+ btree_xlog_metapinit(true, lsn, record);
+ else if (info == XLOG_BTREE_BUILDADD)
+ btree_xlog_buildadd(true, lsn, record);
+ else if (info == XLOG_BTREE_FINISHLEVEL)
+ btree_xlog_finishlevel(true, lsn, record);
else
elog(PANIC, "btree_redo: unknown op code %u", info);
}
***************
*** 1132,1137 ****
--- 1237,1248 ----
btree_xlog_split(false, true, lsn, record); /* new item on the left */
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(false, lsn, record);
+ else if (info == XLOG_BTREE_METAPINIT)
+ btree_xlog_metapinit(false, lsn, record);
+ else if (info == XLOG_BTREE_BUILDADD)
+ btree_xlog_buildadd(false, lsn, record);
+ else if (info == XLOG_BTREE_FINISHLEVEL)
+ btree_xlog_finishlevel(false, lsn, record);
else
elog(PANIC, "btree_undo: unknown op code %u", info);
}
***************
*** 1184,1189 ****
--- 1295,1321 ----
xlrec->node.tblNode, xlrec->node.relNode,
BlockIdGetBlockNumber(&xlrec->rootblk));
}
+ else if (info == XLOG_BTREE_METAPINIT)
+ {
+ xl_btree_metapinit *xlrec = (xl_btree_metapinit *) rec;
+
+ sprintf(buf + strlen(buf), "init: node %u/%u",
+ xlrec->node.tblNode, xlrec->node.relNode);
+ }
+ else if (info == XLOG_BTREE_BUILDADD)
+ {
+ xl_btree_buildadd *xlrec = (xl_btree_buildadd *) rec;
+
+ sprintf(buf + strlen(buf), "insert sort/load page: node %u/%u; blk %u",
+ xlrec->node.tblNode, xlrec->node.relNode, xlrec->blkno);
+ }
+ else if (info == XLOG_BTREE_FINISHLEVEL )
+ {
+ xl_btree_finishlevel *xlrec = (xl_btree_finishlevel *) rec;
+
+ sprintf(buf + strlen(buf), "finish sort/load level: node %u/%u; blk %u",
+ xlrec->node.tblNode, xlrec->node.relNode, xlrec->blkno);
+ }
else
strcat(buf, "UNKNOWN");
}
*** ./src/backend/access/nbtree/nbtsort.c.orig Fri Jul 5 00:31:36 2002
--- ./src/backend/access/nbtree/nbtsort.c Tue Jul 9 15:03:18 2002
***************
*** 433,438 ****
--- 433,464 ----
BufferGetBlockNumber(state->btps_next->btps_buf);
}
+ /* Log BTreeBuild Entry for new index page */
+ {
+ XLogRecPtr lsn;
+ XLogRecData rdata[2];
+ xl_btree_buildadd xlrec;
+
+ xlrec.node = index->rd_node;
+ xlrec.blkno = BufferGetBlockNumber(obuf);
+ xlrec.idxpagesz = BLCKSZ;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = SizeOfBtreeBuildAdd;
+ rdata[0].next = &rdata[1];
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) BufferGetBlock(obuf);
+ rdata[1].len = BufferGetPageSize(obuf);
+ rdata[1].next = NULL;
+
+ lsn = XLogInsert(RM_BTREE_ID, XLOG_BTREE_BUILDADD, rdata);
+
+ PageSetLSN(BufferGetPage(obuf), lsn);
+ PageSetSUI(BufferGetPage(obuf), ThisStartUpID);
+ }
+
/*
* Write out the old page. We never want to see it again, so we
* can give up our lock (if we had one; most likely BuildingBtree
***************
*** 516,521 ****
--- 542,575 ----
* slid back one slot. Then we can dump out the page.
*/
_bt_slideleft(index, s->btps_buf, s->btps_page);
+
+ /* Log the last page in the level */
+ {
+ XLogRecPtr lsn;
+ XLogRecData rdata[2];
+ xl_btree_finishlevel xlrec;
+
+ xlrec.node = index->rd_node;
+ xlrec.blkno = BufferGetBlockNumber(s->btps_buf);
+ xlrec.idxpagesz = BufferGetPageSize(s->btps_buf);
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = SizeOfBtreeFinishLevel;
+ rdata[0].next = &rdata[1];
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *)BufferGetBlock(s->btps_buf);
+ rdata[1].len = BufferGetPageSize(s->btps_buf);
+ rdata[1].next = NULL;
+
+ lsn = XLogInsert(RM_BTREE_ID, XLOG_BTREE_FINISHLEVEL, rdata);
+
+ PageSetLSN(BufferGetPage(s->btps_buf), lsn);
+ PageSetSUI(BufferGetPage(s->btps_buf), ThisStartUpID);
+
+ }
+
_bt_wrtbuf(index, s->btps_buf);
}
}
*** ./src/backend/access/transam/xact.c.orig Thu Jun 20 18:18:13 2002
--- ./src/backend/access/transam/xact.c Tue Jul 9 12:10:28 2002
***************
*** 556,562 ****
rdata.next = NULL;
/*
! * XXX SHOULD SAVE ARRAY OF RELFILENODE-s TO DROP
*/
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, &rdata);
}
--- 556,566 ----
rdata.next = NULL;
/*
! * RelFileNodes to be deleted are saved AFTER commit
! * by the RM_SMGR.
! *
! * If we crash between now and the delete operation,
! * then we will leak files.
*/
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, &rdata);
}
***************
*** 711,717 ****
START_CRIT_SECTION();
/*
! * SHOULD SAVE ARRAY OF RELFILENODE-s TO DROP
*/
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, &rdata);
--- 715,725 ----
START_CRIT_SECTION();
/*
! * RelFileNodes to be deleted are saved AFTER abort by
! * the RM_SMGR.
! *
! * If we crash between now and the delete operation,
! * then we will leak files.
*/
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, &rdata);
***************
*** 1588,1599 ****
if (info == XLOG_XACT_COMMIT)
{
TransactionIdCommit(record->xl_xid);
- /* SHOULD REMOVE FILES OF ALL DROPPED RELATIONS */
}
else if (info == XLOG_XACT_ABORT)
{
TransactionIdAbort(record->xl_xid);
- /* SHOULD REMOVE FILES OF ALL FAILED-TO-BE-CREATED RELATIONS */
}
else
elog(PANIC, "xact_redo: unknown op code %u", info);
--- 1596,1605 ----
*** ./src/backend/access/transam/xlog.c.orig Thu Jun 20 18:18:13 2002
--- ./src/backend/access/transam/xlog.c Tue Jul 9 14:55:22 2002
***************
*** 91,98 ****
int XLOG_DEBUG = 0;
char *XLOG_sync_method = NULL;
const char XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
! char XLOG_archive_dir[MAXPGPATH]; /* null string means
* delete 'em */
/*
* XLOGfileslop is used in the code as the allowed "fuzz" in the number of
--- 91,99 ----
int XLOG_DEBUG = 0;
char *XLOG_sync_method = NULL;
const char XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
! char *XLOG_archive_dir = NULL; /* null string means
* delete 'em */
+ const char XLOG_archive_dir_default[] = "";
/*
* XLOGfileslop is used in the code as the allowed "fuzz" in the number of
***************
*** 334,339 ****
--- 335,355 ----
} \
} while (0)
+ /*
+ * Comparisons of (logId, logSeg) pairs: EQ is equality, LT and LE order
+ * the pairs by logId first and by logSeg second.
+ *
+ * Arguments may be evaluated more than once, so do not pass expressions
+ * with side effects.
+ */
+ #define LogSegPairEQ(logId_lhs, logSeg_lhs, logId_rhs, logSeg_rhs) \
+ ((logId_lhs) == (logId_rhs) && (logSeg_lhs) == (logSeg_rhs))
+
+ #define LogSegPairLT(logId_lhs, logSeg_lhs, logId_rhs, logSeg_rhs) \
+ ( \
+ ((logId_lhs) < (logId_rhs)) || \
+ ((logId_lhs) == (logId_rhs) && (logSeg_lhs) < (logSeg_rhs)) \
+ )
+
+ #define LogSegPairLE(logId_lhs, logSeg_lhs, logId_rhs, logSeg_rhs) \
+ ( \
+ ((logId_lhs) < (logId_rhs)) || \
+ ((logId_lhs) == (logId_rhs) && (logSeg_lhs) <= (logSeg_rhs)) \
+ )
+
/*
* Compute ID and segment from an XLogRecPtr.
*
***************
*** 453,458 ****
--- 469,481 ----
static void xlog_outrec(char *buf, XLogRecord *record);
static void issue_xlog_fsync(void);
+ static int32 ReadAndSortDirectory(char *dpath,
+ char ***sorted_pointers,
+ char **directory_entries,
+ bool (*filter_function)(const char *, void *),
+ void *ff_context,
+ int (*sortcomp)(const char *, const char *) );
+ static bool LogFileFilterFunction(const char *fname, void *ctx);
/*
* Insert an XLOG record having the specified RMID and info bytes,
***************
*** 1496,1501 ****
--- 1519,1529 ----
NextLogSeg(log, seg);
XLogFileName(path, log, seg);
}
+ /* On BEOS and CYGWIN, we're about to call rename, so make sure
+ * the file didn't exist.
+ */
+ if ( errno != ENOENT )
+ { elog(PANIC, "Unexpected error from open(%s): %m", path); }
}
/*
***************
*** 1583,1656 ****
}
/*
* Remove or move offline all log files older or equal to passed log/seg#
*
* endptr is current (or recent) end of xlog; this is used to determine
* whether we want to recycle rather than delete no-longer-wanted log files.
*/
static void
MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr)
{
uint32 endlogId;
uint32 endlogSeg;
- DIR *xldir;
- struct dirent *xlde;
char lastoff[32];
char path[MAXPGPATH];
XLByteToPrevSeg(endptr, endlogId, endlogSeg);
- xldir = opendir(XLogDir);
- if (xldir == NULL)
- elog(PANIC, "could not open transaction log directory (%s): %m",
- XLogDir);
-
sprintf(lastoff, "%08X%08X", log, seg);
! errno = 0;
! while ((xlde = readdir(xldir)) != NULL)
{
! if (strlen(xlde->d_name) == 16 &&
! strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
! strcmp(xlde->d_name, lastoff) <= 0)
! {
! snprintf(path, MAXPGPATH, "%s/%s", XLogDir, xlde->d_name);
! if (XLOG_archive_dir[0])
! {
! elog(LOG, "archiving transaction log file %s",
! xlde->d_name);
! elog(WARNING, "archiving log files is not implemented!");
}
else
{
! /*
! * Before deleting the file, see if it can be recycled as
! * a future log segment. We allow recycling segments up
! * to XLOGfiles + XLOGfileslop segments beyond the current
! * XLOG location.
! */
! if (InstallXLogFileSegment(endlogId, endlogSeg, path,
! true, XLOGfiles + XLOGfileslop,
! true))
! {
! elog(LOG, "recycled transaction log file %s",
! xlde->d_name);
! }
! else
! {
! /* No need for any more future segments... */
! elog(LOG, "removing transaction log file %s",
! xlde->d_name);
! unlink(path);
! }
}
}
- errno = 0;
}
- if (errno)
- elog(PANIC, "could not read transaction log directory (%s): %m",
- XLogDir);
- closedir(xldir);
}
/*
--- 1611,1836 ----
}
/*
+ * Read while handling all the stupid EINTR conditions
+ * allowed by POSIX.
+ */
+ static ssize_t
+ xlog_read_nointr(int fd, void *buf, size_t count)
+ {
+ 	/*
+ 	 * Returns what read() returns: the number of bytes read, 0 at end of
+ 	 * file, or -1 on error.  A failing lseek() is reported with a WARNING
+ 	 * and -1 as well.  An attempt that ends with errno == EINTR and fewer
+ 	 * than 'count' bytes is undone by seeking back to where it started and
+ 	 * is then repeated.  If errno is still zero at the end, the value it
+ 	 * had on entry is restored.
+ 	 */
+ 	off_t		pos;
+ 	int			save_errno = errno;
+ 	ssize_t		rbytes;
+ 
+ 	for (;;)
+ 	{
+ 		/* Save our current file position */
+ 		if ( (pos = lseek(fd, (off_t)0, SEEK_CUR)) == (off_t)-1 )
+ 		{
+ 			elog(WARNING, "seek failed: %m");
+ 			return -1;
+ 		}
+ 
+ 		/*
+ 		 * Clear errno before every attempt: a successful read() does not
+ 		 * reset it, so an EINTR left over from an earlier attempt would
+ 		 * otherwise make a legitimate short read look interrupted.
+ 		 */
+ 		errno = 0;
+ 		rbytes = read(fd, buf, count);
+ 
+ 		/*
+ 		 * Finished unless the call was interrupted and came up short.
+ 		 * rbytes is tested for sign before the cast so that -1 is never
+ 		 * compared as a huge unsigned value.
+ 		 */
+ 		if ( errno != EINTR ||
+ 			 (rbytes >= 0 && (size_t) rbytes >= count) )
+ 			break;
+ 
+ 		/* Undo any partial transfer and retry from the same offset */
+ 		if ( lseek(fd, pos, SEEK_SET) == (off_t)-1 )
+ 		{
+ 			elog(WARNING, "seek failed: %m");
+ 			return -1;
+ 		}
+ 	}
+ 
+ 	if ( !errno )
+ 	{ errno = save_errno; }
+ 	return rbytes;
+ }
+
+ /*
+  * ArchiveCopyFile -- copy srcpath to a newly created file archpath.
+  *
+  * The target is opened with O_CREAT|O_EXCL, so an already existing file is
+  * an error.  The copy is fsync'ed before success is reported.  Returns true
+  * on success.  On failure a WARNING is logged, the target file is removed
+  * if this call created it, and false is returned.
+  */
+ static bool
+ ArchiveCopyFile(char *srcpath, char *archpath)
+ {
+ 	unsigned char buf[BLCKSZ * 8];	/* XXX: should be in GUC */
+ 	int			srcfd = -1,
+ 				archfd = -1;
+ 	ssize_t		rbytes,
+ 				wbytes;
+ 
+ 	if ( (srcfd = BasicOpenFile(srcpath, O_RDONLY|PG_BINARY,
+ 								S_IRUSR|S_IWUSR)) < 0 )
+ 	{
+ 		elog(WARNING, "unable to open archive source file %s: %m",
+ 			 srcpath);
+ 		goto L_cleanup;
+ 	}
+ 
+ 	if ( (archfd = BasicOpenFile(archpath, O_CREAT|O_EXCL|O_WRONLY|PG_BINARY,
+ 								 S_IRUSR|S_IWUSR)) < 0 )
+ 	{
+ 		elog(WARNING, "unable to create archive target file %s: %m",
+ 			 archpath);
+ 		goto L_cleanup;
+ 	}
+ 
+ 	for (;;)
+ 	{
+ 		if ( (rbytes = xlog_read_nointr(srcfd, buf, sizeof(buf))) < 0 )
+ 		{
+ 			elog(WARNING, "could not read archive source: %m");
+ 			goto L_cleanup;
+ 		}
+ 
+ 		/* End of source file: everything has been copied */
+ 		if ( rbytes == 0 )
+ 			break;
+ 
+ 		errno = 0;
+ 		while ( (wbytes = write(archfd, buf, rbytes)) < 0 &&
+ 				errno == EINTR )
+ 		{ /* NOTHING */; }
+ 
+ 		if ( wbytes < 0 )
+ 		{
+ 			elog(WARNING, "write to archive target file failed: %m");
+ 			goto L_cleanup;
+ 		}
+ 
+ 		/* A short write is treated as a failure, not retried */
+ 		if ( wbytes != rbytes )
+ 		{
+ 			/* ssize_t is not int everywhere; print through long */
+ 			elog(WARNING,
+ 				 "partial write to archive target file (req %ld, wrote %ld)",
+ 				 (long) rbytes, (long) wbytes);
+ 			goto L_cleanup;
+ 		}
+ 	}
+ 
+ 	close(srcfd);
+ 	srcfd = -1;
+ 	if ( pg_fsync(archfd) < 0 )
+ 	{
+ 		elog(WARNING, "unable to fsync archive target file: %m");
+ 		goto L_cleanup;
+ 	}
+ 	close(archfd);
+ 	archfd = -1;
+ 	return true;
+ 
+ L_cleanup:
+ 	if ( srcfd >= 0 )
+ 	{ close(srcfd); }
+ 	/* archfd >= 0 means this call created the target, so remove it */
+ 	if ( archfd >= 0 )
+ 	{
+ 		close(archfd);
+ 		if ( unlink(archpath) < 0 )
+ 		{ elog(WARNING, "unable to remove file %s: %m", archpath); }
+ 		else
+ 		{ elog(WARNING, "removed archive target file %s", archpath); }
+ 	}
+ 	return false;
+ }
+
+ /*
* Remove or move offline all log files older or equal to passed log/seg#
*
* endptr is current (or recent) end of xlog; this is used to determine
* whether we want to recycle rather than delete no-longer-wanted log files.
+ *
+ * This is called only from CreateCheckPoint(), so we will hold the
+ * CheckpointLock. It repeatedly acquires and releases the ControlFileLock
+ * while archiving each log file.
*/
static void
MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr)
{
uint32 endlogId;
uint32 endlogSeg;
char lastoff[32];
char path[MAXPGPATH];
+ char archpath[MAXPGPATH];
+ int32 narchfiles;
+ char **archfilp;
+ char *archfiles;
+ int32 i;
XLByteToPrevSeg(endptr, endlogId, endlogSeg);
sprintf(lastoff, "%08X%08X", log, seg);
! /* Collect, in sorted order, every segment file named at or before lastoff */
! if ( (narchfiles = ReadAndSortDirectory(XLogDir, &archfilp, &archfiles,
! LogFileFilterFunction, lastoff,
! strcasecmp)) < 0 )
! {
! elog(PANIC, "unable to read and sort log directory (%s): %m",
! XLogDir);
! }
!
! /* nothing to move offline (no arrays are allocated in this case) */
! if ( narchfiles == 0 )
! { return; }
!
! for (i=0; i < narchfiles; i++)
{
! snprintf(path, MAXPGPATH, "%s/%s", XLogDir, archfilp[i]);
!
! if (XLOG_archive_dir[0])
! {
! bool archok;
!
! snprintf(archpath, MAXPGPATH, "%s/%s.arch", XLOG_archive_dir, archfilp[i]);
! elog(LOG, "archiving transaction log file %s",
! archfilp[i]);
!
! /* No need to hold ControlFileLock to do this,
! * because we are sure that nobody can touch files
! * with (log, seg) < lastoff except us.
! */
! archok = ArchiveCopyFile(path, archpath);
!
! if ( archok )
! {
! elog(LOG, "archived log file %s to %s",
! archfilp[i], archpath);
}
else
{
! /* XXX: Should be a config option to panic here instead */
! elog(WARNING, "ARCHIVING FAILED: unable to archive log file %s",
! path);
! elog(LOG,
! "removal and cleanup of old log files terminated because of archive failure");
! /* Release the directory listing before giving up */
! pfree(archfilp);
! pfree(archfiles);
! return;
}
+
+ }
+
+ /*
+ * Before deleting the file, see if it can be recycled as
+ * a future log segment. We allow recycling segments up
+ * to XLOGfiles + XLOGfileslop segments beyond the current
+ * XLOG location.
+ */
+ if (InstallXLogFileSegment(endlogId, endlogSeg, path,
+ true, XLOGfiles + XLOGfileslop,
+ true))
+ {
+ elog(LOG, "recycled transaction log file %s",
+ archfilp[i]);
+ }
+ else
+ {
+ /* No need for any more future segments... */
+ elog(LOG, "removing transaction log file %s",
+ archfilp[i]);
+ unlink(path);
}
}
+
+ /*
+ * ReadAndSortDirectory palloc'd both arrays and handed them to us;
+ * free them so that repeated checkpoints do not accumulate them.
+ */
+ pfree(archfilp);
+ pfree(archfiles);
}
/*
***************
*** 2471,2476 ****
--- 2651,2781 ----
return buf;
}
+ /*
+ * LSNShouldBeInArchive -- placeholder that always returns false for now.
+ *
+ * The lsn argument is currently ignored.
+ */
+ static bool
+ LSNShouldBeInArchive(XLogRecPtr *lsn)
+ {
+ /* XXX: TODO: Make this function */
+
+ return false;
+ }
+
+ /*
+  * LogFileFilterFunction -- directory filter selecting old log segments.
+  *
+  * Accepts fname when it is exactly 16 upper-case hexadecimal digits and
+  * does not sort after the NUL-terminated name passed through ctx.
+  */
+ static bool
+ LogFileFilterFunction(const char *fname, void *ctx)
+ {
+ 	char	   *cutoff = (char *) ctx;
+ 
+ 	Assert(ctx);
+ 
+ 	/* Anything that is not 16 characters long cannot be a segment file */
+ 	if ( strlen(fname) != 16 )
+ 		return false;
+ 
+ 	/* ... and all 16 of them must be upper-case hex digits */
+ 	if ( strspn(fname, "0123456789ABCDEF") != 16 )
+ 		return false;
+ 
+ 	/* Keep only names at or before the cutoff */
+ 	return strcmp(fname, cutoff) <= 0;
+ }
+
+ /* String comparator used by the qsort() call currently in progress */
+ static int (*rasd_sortcomp) (const char *, const char *) = NULL;
+ 
+ /*
+  * qsort() passes its callback pointers to the array elements, i.e. char **
+  * here.  Dereference once so that the string comparator sees the strings
+  * themselves rather than the bytes of the pointer array.
+  */
+ static int
+ rasd_qsort_cmp(const void *lhs, const void *rhs)
+ {
+ 	return rasd_sortcomp(*(char * const *) lhs, *(char * const *) rhs);
+ }
+ 
+ /*
+  * ReadAndSortDirectory -- list the entries of dpath accepted by
+  * filter_function, sorted with sortcomp.
+  *
+  * On success the number of entries is returned.  *sorted_pointers is a
+  * palloc'd array of that many string pointers and *directory_entries is the
+  * palloc'd buffer holding the strings; the caller must pfree both.  When no
+  * entry matches, 0 is returned and both outputs are NULL.
+  *
+  * On failure a WARNING is logged, both outputs are set to NULL and -1 is
+  * returned, with errno holding the error of the failing call (or 0 when
+  * the directory merely changed between the two passes).
+  */
+ static int32
+ ReadAndSortDirectory(char *dpath,
+ 					 char ***sorted_pointers,
+ 					 char **directory_entries,
+ 					 bool (*filter_function)(const char *, void *),
+ 					 void *ff_context,
+ 					 int (*sortcomp)(const char *, const char *) )
+ {
+ 	Size		nbytes;
+ 	Size		used;
+ 	int32		nelem;
+ 	int32		i;
+ 	int			save_errno = 0;
+ 	struct dirent *dent;
+ 	/* These must be NULL for 'L_cleanup' label (see end) */
+ 	DIR		   *dir = NULL;
+ 	char	   *names = NULL;
+ 	char	  **ptrs = NULL;
+ 
+ 	errno = 0;
+ 
+ 	if ( !(dir = opendir(dpath)) )
+ 	{
+ 		save_errno = errno;
+ 		elog(WARNING, "could not open directory (%s): %m", dpath);
+ 		goto L_cleanup;
+ 	}
+ 
+ 	/* Pass 1 to figure-out how much memory we need */
+ 	nbytes = 0;
+ 	nelem = 0;
+ 	errno = 0;		/* readdir() reports errors only through errno */
+ 	while ( (dent = readdir(dir)) )
+ 	{
+ 		if ( filter_function(dent->d_name, ff_context) )
+ 		{
+ 			nelem++;
+ 			nbytes += sizeof(char) * (strlen(dent->d_name) + 1);
+ 		}
+ 	}
+ 	if (errno)
+ 	{
+ 		save_errno = errno;
+ 		elog(WARNING, "could not read directory (%s): %m", dpath);
+ 		goto L_cleanup;
+ 	}
+ 
+ 	if ( nelem == 0 )
+ 	{
+ 		*sorted_pointers = NULL;
+ 		*directory_entries = NULL;
+ 		closedir(dir);
+ 		return 0;
+ 	}
+ 
+ 	names = (char *)palloc(nbytes);
+ 	ptrs = (char **)palloc( sizeof(char *) * nelem );
+ 
+ 	/* Pass 2 to read the filtered directory into memory */
+ 	rewinddir(dir);
+ 	errno = 0;		/* start pass 2 with a clean slate as well */
+ 	i = 0;
+ 	used = 0;
+ 	while ( i < nelem && (dent = readdir(dir)) )
+ 	{
+ 		Size		len;
+ 
+ 		if ( !filter_function(dent->d_name, ff_context) )
+ 			continue;
+ 
+ 		len = strlen(dent->d_name) + 1;
+ 		/* A name that no longer fits means the directory changed */
+ 		if ( len > nbytes - used )
+ 			break;
+ 
+ 		ptrs[i] = names + used;
+ 		memcpy(ptrs[i], dent->d_name, len);	/* includes the terminator */
+ 		used += len;
+ 		i++;
+ 	}
+ 	if (errno)
+ 	{
+ 		save_errno = errno;
+ 		elog(WARNING, "could not read directory (pass 2) (%s): %m", dpath);
+ 		goto L_cleanup;
+ 	}
+ 
+ 	if ( i != nelem || used != nbytes )
+ 	{
+ 		elog(WARNING, "directory %s was changed unexpectedly", dpath);
+ 		goto L_cleanup;
+ 	}
+ 
+ 	/* Sort what we got */
+ 	rasd_sortcomp = sortcomp;
+ 	qsort(ptrs, nelem, sizeof(char *), rasd_qsort_cmp);
+ 	rasd_sortcomp = NULL;
+ 
+ 	closedir(dir);
+ 	*sorted_pointers = ptrs;
+ 	*directory_entries = names;
+ 	return nelem;
+ 
+ L_cleanup:
+ 
+ 	if ( dir )
+ 	{ closedir(dir); }
+ 	*sorted_pointers = NULL;
+ 	*directory_entries = NULL;
+ 	if ( names )
+ 	{ pfree(names); }
+ 	if ( ptrs )
+ 	{ pfree(ptrs); }
+ 	/* Let the caller's %m describe the original failure */
+ 	errno = save_errno;
+ 	return -1;
+ 
+ }
+
/*
* This must be called ONCE during postmaster or standalone-backend startup
*/
***************
*** 2545,2550 ****
--- 2850,2868 ----
else
elog(PANIC, "unable to locate a valid checkpoint record");
}
+
+ if ( LSNShouldBeInArchive(&checkPointLoc) )
+ {
+ elog(PANIC, "The control file says you need to do recovery\n"
+ "\tfrom the log archives.\n"
+ "\tYou must be running a standalone backend to do that.");
+ }
+
+ /* XXX: TODO: Prevent startup in DB_SHUTDOWNED from
+ * old logfile from truncating the log when it finds
+ * a valid checkpoint-shutdown record at startup.
+ */
+
LastRec = RecPtr = checkPointLoc;
memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
***************
*** 3274,3279 ****
--- 3592,3628 ----
/*
* GUC support
*/
+
+ /*
+  * assign_xlog_archive_dir -- validate a new archive directory setting.
+  *
+  * Returns newval when it is acceptable and NULL (after a WARNING) when it
+  * is not.  The empty string is accepted without further checks; anything
+  * else must name an existing directory.  The doit and interactive
+  * arguments are not used here.
+  */
+ const char *
+ assign_xlog_archive_dir(const char *newval, bool doit, bool interactive)
+ {
+ 	struct stat	statbuf;
+ 
+ 	/* Nothing to validate for an empty setting */
+ 	if ( *newval == '\0' )
+ 		return newval;
+ 
+ 	if ( stat(newval, &statbuf) < 0 )
+ 	{
+ 		elog(WARNING, "unable to stat archive destination \"%s\": %m",
+ 			 (char *)newval);
+ 		return NULL;
+ 	}
+ 
+ 	if ( S_ISDIR(statbuf.st_mode) )
+ 	{
+ 		/* XXX: Should check to ensure write permission, etc. */
+ 		return newval;
+ 	}
+ 
+ 	elog(WARNING, "archive destination \"%s\" is not a directory",
+ 		 (char *)newval);
+ 	return NULL;
+ }
+
const char *
assign_xlog_sync_method(const char *method, bool doit, bool interactive)
{
*** ./src/backend/storage/smgr/md.c.orig Sat Jun 22 17:31:55 2002
--- ./src/backend/storage/smgr/md.c Thu Jul 4 07:08:57 2002
***************
*** 76,81 ****
--- 76,83 ----
static MdfdVec *_mdfd_getseg(Relation reln, BlockNumber blkno);
static int _mdfd_blind_getseg(RelFileNode rnode, BlockNumber blkno);
+ static MdfdVec * _mdfd_blind_openseg(RelFileNode rfnode,
+ BlockNumber segno, int oflags);
static int _fdvec_alloc(void);
static void _fdvec_free(int);
***************
*** 860,865 ****
--- 862,1004 ----
}
/*
+ * mdrecreate() -- Create a file if it doesn't exist
+ *
+ * Called during XLOG recovery.
+ */
+ int
+ mdrecreate(RelFileNode rfnode)
+ {
+ 	char	   *fname = relpath(rfnode);
+ 	int			vfd;
+ 
+ 	/* O_CREAT without O_EXCL: an already existing file is simply opened */
+ 	vfd = FileNameOpenFile(fname, O_RDWR | O_CREAT | PG_BINARY, 0600);
+ 	pfree(fname);
+ 
+ 	if ( vfd < 0 )
+ 		return SM_FAIL;
+ 
+ 	/* We only wanted the file to exist; no need to keep it open */
+ 	FileClose(vfd);
+ 	return SM_SUCCESS;
+ }
+
+ /*
+  * mdreunlink() -- Remove a relation's file, tolerating its absence
+  *
+  * Same as mdunlink(), except that a failure with errno == ENOENT is
+  * reported as success.
+  * NOTE(review): this assumes mdunlink() leaves errno from the failing call
+  * intact -- confirm.
+  */
+ int
+ mdreunlink(RelFileNode rfnode)
+ {
+ 	int			status = mdunlink(rfnode);
+ 
+ 	if ( status == SM_SUCCESS )
+ 		return status;
+ 
+ 	/* The file being gone already is just what we wanted */
+ 	return (errno == ENOENT) ? SM_SUCCESS : status;
+ }
+
+ /*
+  * mdretruncate() -- Truncate a relation to nblcks blocks, by RelFileNode
+  *
+  * Opens the relation's segment files directly, counts its blocks, and
+  * shortens it when it is longer than nblcks.  A relation that is already
+  * nblcks long, or shorter (which draws a WARNING), is left alone and
+  * reported as success.  Returns SM_SUCCESS or SM_FAIL.  Every file and
+  * MdfdVec opened here is released again before returning.
+  */
+ int
+ mdretruncate(RelFileNode rfnode, BlockNumber nblcks)
+ {
+ 	MdfdVec    *first;
+ 	BlockNumber realnb = 0;
+ 	bool		do_truncate = false;
+ 	int			result = SM_SUCCESS;
+ 
+ #ifndef LET_OS_MANAGE_FILESIZE
+ 	MdfdVec    *v;
+ 	MdfdVec    *next;
+ 	BlockNumber segno;
+ #endif
+ 
+ 	first = _mdfd_blind_openseg(rfnode, 0, O_RDWR | PG_BINARY);
+ 	if (first == (MdfdVec *) NULL)
+ 	{
+ 		elog(WARNING, "mdretruncate: cannot open %u/%u: %m",
+ 			 rfnode.tblNode, rfnode.relNode);
+ 		return SM_FAIL;
+ 	}
+ 
+ 	/*
+ 	 * Finds the size of the relation, and saves that in
+ 	 * realnb.
+ 	 */
+ #ifndef LET_OS_MANAGE_FILESIZE
+ 	segno = 0;
+ 	v = first;
+ 	for (;;)
+ 	{
+ 		realnb = _mdnblocks(v->mdfd_vfd, BLCKSZ);
+ 		if (realnb > ((BlockNumber) RELSEG_SIZE))
+ 			elog(FATAL, "segment too big in mdnblocks!");
+ 		if (realnb < ((BlockNumber) RELSEG_SIZE))
+ 		{
+ 			realnb = (segno * ((BlockNumber) RELSEG_SIZE)) + realnb;
+ 			break;
+ 		}
+ 
+ 		/*
+ 		 * If segment is exactly RELSEG_SIZE, advance to next one.
+ 		 */
+ 		segno++;
+ 
+ 		if (v->mdfd_chain == (MdfdVec *) NULL)
+ 		{
+ 			/*
+ 			 * Because we pass O_CREAT, we will create the next segment
+ 			 * (with zero length) immediately, if the last segment is of
+ 			 * length REL_SEGSIZE. This is unnecessary but harmless, and
+ 			 * testing for the case would take more cycles than it seems
+ 			 * worth.
+ 			 */
+ 			v->mdfd_chain = _mdfd_blind_openseg(rfnode, segno, O_CREAT);
+ 			if (v->mdfd_chain == (MdfdVec *) NULL)
+ 			{
+ 				elog(WARNING,
+ 					 "cannot count blocks for %u/%u.%u -- open failed: %m",
+ 					 rfnode.tblNode, rfnode.relNode, segno);
+ 				result = SM_FAIL;
+ 				break;
+ 			}
+ 		}
+ 
+ 		v = v->mdfd_chain;
+ 	}
+ #else
+ 	realnb = _mdnblocks(first->mdfd_vfd, BLCKSZ);
+ #endif
+ 
+ 	if ( result == SM_SUCCESS )
+ 	{
+ 		if ( nblcks > realnb )
+ 		{
+ 			/* Yes, succeed here */
+ 			elog(WARNING,
+ 				 "mdretruncate request to truncate rfnode of size %u blocks to %u",
+ 				 realnb, nblcks);
+ 		}
+ 		else if ( nblcks < realnb )
+ 			do_truncate = true;
+ 	}
+ 
+ #ifndef LET_OS_MANAGE_FILESIZE
+ 	/*
+ 	 * Walk the chain built above, in segment order.  The segment that
+ 	 * contains block nblcks is cut to length; segments starting beyond
+ 	 * nblcks are removed; earlier segments stay untouched.  Each vector is
+ 	 * released as we pass it, whether or not anything is truncated.
+ 	 */
+ 	segno = 0;
+ 	for (v = first; v != (MdfdVec *) NULL; v = next, segno++)
+ 	{
+ 		BlockNumber segstart = segno * ((BlockNumber) RELSEG_SIZE);
+ 		bool		unlinked = false;
+ 
+ 		next = v->mdfd_chain;
+ 
+ 		if ( do_truncate && result == SM_SUCCESS )
+ 		{
+ 			if ( segstart > nblcks )
+ 			{
+ 				/* Delete any trailing segments */
+ 				FileTruncate(v->mdfd_vfd, 0);
+ 				/*
+ 				 * mdtruncate() frees the vector right after FileUnlink()
+ 				 * with no separate close, so we do the same here.
+ 				 */
+ 				FileUnlink(v->mdfd_vfd);
+ 				unlinked = true;
+ 			}
+ 			else if ( nblcks - segstart < ((BlockNumber) RELSEG_SIZE) )
+ 			{
+ 				/* FileTruncate wants a length in bytes, not in blocks */
+ 				if ( FileTruncate(v->mdfd_vfd,
+ 								  (nblcks - segstart) * BLCKSZ) < 0 )
+ 				{
+ 					elog(WARNING, "mdretruncate: FileTruncate failed (%d, %u/%u): %m",
+ 						 v->mdfd_vfd, segno, nblcks - segstart);
+ 					result = SM_FAIL;
+ 				}
+ 			}
+ 		}
+ 
+ 		if ( !unlinked )
+ 			FileClose(v->mdfd_vfd);
+ 		pfree(v);
+ 	}
+ #else
+ 	/* FileTruncate wants a length in bytes, not in blocks */
+ 	if ( do_truncate && FileTruncate(first->mdfd_vfd, nblcks * BLCKSZ) < 0 )
+ 	{
+ 		elog(WARNING, "mdretruncate: FileTruncate failed (%d, %u): %m",
+ 			 first->mdfd_vfd, nblcks);
+ 		result = SM_FAIL;
+ 	}
+ 	FileClose(first->mdfd_vfd);
+ 	pfree(first);
+ #endif
+ 
+ 	return result;
+ }
+
+
+ /*
* _fdvec_alloc () -- grab a free (or new) md file descriptor vector.
*
*/
***************
*** 932,937 ****
--- 1071,1123 ----
}
+ /*
+ * _mdfd_blind_openseg() -- open one segment file given only a RelFileNode
+ *
+ * Opens segment 'segno' of the relation file (segment 0 is the bare
+ * relation path, later segments are "path.segno") with
+ * O_RDWR | PG_BINARY | oflags and returns a new MdfdVec for it, allocated
+ * in MdCxt. Returns NULL if the file cannot be opened.
+ */
static MdfdVec *
+ _mdfd_blind_openseg(RelFileNode rfnode, BlockNumber segno, int oflags)
+ {
+ MdfdVec *v;
+ int fd;
+ char *path,
+ *fullpath;
+
+ /* be sure we have enough space for the '.segno', if any */
+ path = relpath(rfnode);
+
+ if (segno > 0)
+ {
+ /* 12 extra bytes: '.', the digits of segno, and the terminator */
+ fullpath = (char *) palloc(strlen(path) + 12);
+ sprintf(fullpath, "%s.%u", path, segno);
+ pfree(path);
+ }
+ else
+ fullpath = path;
+
+ /* open the file */
+ fd = FileNameOpenFile(fullpath, O_RDWR | PG_BINARY | oflags, 0600);
+
+ /* the name is no longer needed, whether or not the open succeeded */
+ pfree(fullpath);
+
+ if (fd < 0)
+ return (MdfdVec *) NULL;
+
+ /* allocate an mdfdvec entry for it */
+ v = (MdfdVec *) MemoryContextAlloc(MdCxt, sizeof(MdfdVec));
+
+ /* fill the entry */
+ v->mdfd_vfd = fd;
+ v->mdfd_flags = (uint16) 0;
+ #ifndef LET_OS_MANAGE_FILESIZE
+ /* the caller links further segments onto the chain if it wants them */
+ v->mdfd_chain = (MdfdVec *) NULL;
+
+ #ifdef DIAGNOSTIC
+ if (_mdnblocks(fd, BLCKSZ) > ((BlockNumber) RELSEG_SIZE))
+ elog(FATAL, "segment too big on openseg!");
+ #endif
+ #endif
+
+ /* all done */
+ return v;
+ }
+
+ static MdfdVec *
_mdfd_openseg(Relation reln, BlockNumber segno, int oflags)
{
MdfdVec *v;
*** ./src/backend/storage/smgr/smgr.c.orig Sat Jun 22 17:31:55 2002
--- ./src/backend/storage/smgr/smgr.c Tue Jul 9 15:37:33 2002
***************
*** 22,27 ****
--- 22,28 ----
#include "storage/ipc.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
+ #include "miscadmin.h"
static void smgrshutdown(void);
***************
*** 51,61 ****
--- 52,91 ----
int (*smgr_commit) (void); /* may be NULL */
int (*smgr_abort) (void); /* may be NULL */
int (*smgr_sync) (void);
+ int (*smgr_recreate) (RelFileNode rnode); /* may be NULL */
+ int (*smgr_reunlink) (RelFileNode rnode); /* may be NULL */
+ int (*smgr_retruncate) (RelFileNode rnode, BlockNumber nblocks);
+ /* may be NULL */
} f_smgr;
+ /* Either a storage manager supports all the recovery operations,
+ * or none of them.
+ */
+ #define SMGRRecoveryUnsupported(which) \
+ ( \
+ smgrsw[which].smgr_recreate == NULL && \
+ smgrsw[which].smgr_reunlink == NULL && \
+ smgrsw[which].smgr_retruncate == NULL \
+ )
+
+ #define SMGRSupportsRecovery(which) \
+ ( \
+ smgrsw[which].smgr_recreate && \
+ smgrsw[which].smgr_reunlink && \
+ smgrsw[which].smgr_retruncate \
+ )
+
+ #define ASSERT_SMGR_RECOVERY_VALID(which) \
+ Assert(SMGRSupportsRecovery(which) || SMGRRecoveryUnsupported(which))
+
/*
* The weird placement of commas in this init block is to keep the compiler
* happy, regardless of what storage managers we have (or don't have).
+ *
+ * WARNING:
+ * Now that file create/delete/truncate are logged in xlog, changing
+ * storage manager ID of a storage manager will BREAK BACKWARDS
+ * COMPATABILITY with previous log files!
*/
static f_smgr smgrsw[] = {
***************
*** 63,76 ****
/* magnetic disk */
{mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose,
mdread, mdwrite, mdflush, mdblindwrt, mdmarkdirty, mdblindmarkdirty,
! mdnblocks, mdtruncate, mdcommit, mdabort, mdsync
! },
#ifdef STABLE_MEMORY_STORAGE
/* main memory */
{mminit, mmshutdown, mmcreate, mmunlink, mmextend, mmopen, mmclose,
mmread, mmwrite, mmflush, mmblindwrt, mmmarkdirty, mmblindmarkdirty,
! mmnblocks, NULL, mmcommit, mmabort},
#endif
};
--- 93,110 ----
/* magnetic disk */
{mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose,
mdread, mdwrite, mdflush, mdblindwrt, mdmarkdirty, mdblindmarkdirty,
! mdnblocks, mdtruncate, mdcommit, mdabort, mdsync,
! mdrecreate, mdreunlink, mdretruncate
! }
#ifdef STABLE_MEMORY_STORAGE
+ ,
/* main memory */
{mminit, mmshutdown, mmcreate, mmunlink, mmextend, mmopen, mmclose,
mmread, mmwrite, mmflush, mmblindwrt, mmmarkdirty, mmblindmarkdirty,
! mmnblocks, NULL, mmcommit, mmabort,
! NULL, NULL, NULL
! }
#endif
};
***************
*** 173,181 ****
--- 207,241 ----
int fd;
PendingRelDelete *pending;
+ if ( SMGRSupportsRecovery(which) )
+ {
+ XLogRecPtr lsn;
+ XLogRecData rdata;
+ xl_smgr_create xlrec;
+
+ xlrec.xlsf.smgrid = which;
+ xlrec.xlsf.rfnode = reln->rd_node;
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *) &xlrec;
+ rdata.len = sizeof(xlrec);
+ rdata.next = NULL;
+
+ /* Because of WAL, failure must be a fatal error.
+ */
+ START_CRIT_SECTION();
+
+ lsn = XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE, &rdata);
+
+ /* No need to flush, because file not visible until commit */
+ }
+
if ((fd = (*(smgrsw[which].smgr_create)) (reln)) < 0)
elog(ERROR, "cannot create %s: %m", RelationGetRelationName(reln));
+ if ( SMGRSupportsRecovery(which) )
+ { END_CRIT_SECTION(); }
+
/* Add the relation to the list of stuff to delete at abort */
pending = (PendingRelDelete *)
MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
***************
*** 457,469 ****
* smgrtruncate() -- Truncate supplied relation to a specified number
* of blocks
*
! * Returns the number of blocks on success, aborts the current
! * transaction on failure.
*/
BlockNumber
smgrtruncate(int16 which, Relation reln, BlockNumber nblocks)
{
! BlockNumber newblks;
newblks = nblocks;
if (smgrsw[which].smgr_truncate)
--- 517,559 ----
* smgrtruncate() -- Truncate supplied relation to a specified number
* of blocks
*
! * Returns the number of blocks on success, aborts the entire
! * system on failure.
*/
BlockNumber
smgrtruncate(int16 which, Relation reln, BlockNumber nblocks)
{
! BlockNumber newblks;
! XLogRecPtr lsn;
! XLogRecData rdata;
! xl_smgr_truncate xlrec;
!
! if ( SMGRSupportsRecovery(which) )
! {
! xlrec.xlblkno = nblocks;
! xlrec.xlsf.smgrid = which;
! xlrec.xlsf.rfnode = reln->rd_node;
!
! rdata.buffer = InvalidBuffer;
! rdata.data = (char *) &xlrec;
! rdata.len = sizeof(xlrec);
! rdata.next = NULL;
! }
!
! /* If truncate fails, we have to bring the system down.
! * If we want the log to be infinitely replayable, then the
! * operation must succeed sometime, either now or in recovery.
! *
! * That means that if an impossible request is logged here,
! * and the request can't be detected as such before it is logged
! * then the recoverability chain is broken and the user will have
! * to reset the logs and do a full backup.
! *
! */
!
! if ( (newblks = smgrnblocks(which, reln)) < nblocks )
! elog(ERROR, "request to truncate file %u blocks long to %u blocks",
! newblks, nblocks);
newblks = nblocks;
if (smgrsw[which].smgr_truncate)
***************
*** 477,497 ****
--- 567,679 ----
nblocks, MaxBlockNumber,
0, NULL, NULL);
+ if ( SMGRSupportsRecovery(which) )
+ {
+ START_CRIT_SECTION();
+
+ lsn = XLogInsert(RM_SMGR_ID, XLOG_NO_TRAN|XLOG_SMGR_TRUNC, &rdata);
+
+ /* Must flush */
+ XLogFlush(lsn);
+ }
+
newblks = (*(smgrsw[which].smgr_truncate)) (reln, nblocks);
if (newblks == InvalidBlockNumber)
elog(ERROR, "cannot truncate %s to %u blocks: %m",
RelationGetRelationName(reln), nblocks);
+
+ if ( SMGRSupportsRecovery(which) )
+ {
+ END_CRIT_SECTION();
+ }
}
return newblks;
}
+
/*
* smgrDoPendingDeletes() -- take care of relation deletes at end of xact.
*/
int
smgrDoPendingDeletes(bool isCommit)
{
+ PendingRelDelete *prd;
+
+ /* If there is anything to delete, then
+ * make XLOG entries for REDO
+ *
+ * Note that this does not need to be in a critical section,
+ * because of the way we handle files (but it might leak files on crash).
+ */
+ if ( pendingDeletes )
+ {
+ #define SMGR_MAXLOGRECSZ 8192
+ XLogRecData rdata;
+ xl_smgr_delete *xlrec;
+ unsigned char bdata[SMGR_MAXLOGRECSZ];
+ uint32 i;
+ uint32 nfmax;
+
+ /* Calculate the maximum number of xl_smgr_file's that
+ * will fit within our record size limit
+ */
+ nfmax = (SMGR_MAXLOGRECSZ - SizeOfXlSmgrDelete(0)) /
+ (SizeOfXlSmgrDelete(1)-SizeOfXlSmgrDelete(0));
+
+ Assert(SizeOfXlSmgrDelete(nfmax) < SMGR_MAXLOGRECSZ &&
+ nfmax > 1);
+
+ #undef SMGR_MAXLOGRECSZ
+ /* Log the files to delete.
+ *
+ * If there are more than 'nfmax' files, then we break them
+ * into separate log records.
+ */
+ xlrec = (xl_smgr_delete *)bdata;
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *)xlrec;
+ rdata.next = NULL;
+
+ for (i=0, prd = pendingDeletes;
+ prd;
+ prd = prd->next )
+ {
+ if ( prd->atCommit == isCommit )
+ {
+ xlrec->xlsfiles[i].rfnode = prd->relnode;
+ xlrec->xlsfiles[i].smgrid = prd->which;
+
+ i++;
+
+ if ( i >= nfmax )
+ {
+ Assert(i == nfmax);
+
+ xlrec->xlnfiles = i;
+ rdata.len = SizeOfXlSmgrDelete(xlrec->xlnfiles);
+
+ XLogInsert(RM_SMGR_ID, XLOG_SMGR_DELETE|XLOG_NO_TRAN,
+ &rdata);
+
+ i = 0;
+ }
+ }
+ }
+
+ /* Get the final chunk */
+ if ( i != 0 )
+ {
+ xlrec->xlnfiles = i;
+ rdata.len = SizeOfXlSmgrDelete(xlrec->xlnfiles);
+
+ XLogInsert(RM_SMGR_ID, XLOG_SMGR_DELETE|XLOG_NO_TRAN, &rdata);
+ }
+
+ }
+
+
while (pendingDeletes != NULL)
{
PendingRelDelete *pending = pendingDeletes;
***************
*** 591,596 ****
--- 773,853 ----
return SM_SUCCESS;
}
+ /*
+ * smgrrecreate(), smgrreunlink(), smgrretruncate()
+ *
+ * Redo the appropriate filesystem operation during REDO recovery.
+ *
+ * Return SM_SUCCESS if successful, SM_FAIL if operation cannot
+ * be redone.
+ */
+
+ /*
+  * smgrrecreate() -- redo the creation of file 'rnode' under manager 'which'.
+  *
+  * Returns SM_FAIL when 'which' is not a valid index into smgrsw[], and
+  * SM_SUCCESS (after a NOTICE) when the manager has no redo hooks.
+  * Otherwise the hook's own result is returned, with a WARNING logged when
+  * that result is negative.
+  */
+ int
+ smgrrecreate(int16 which, RelFileNode rnode)
+ {
+ 	int			ret;
+
+ 	/*
+ 	 * 'which' comes straight out of a log record (see smgr_redo), so it may
+ 	 * name a manager this build does not have; refuse it rather than read
+ 	 * past the end of smgrsw[].
+ 	 */
+ 	if (which < 0 || which >= (int) (sizeof(smgrsw) / sizeof(smgrsw[0])))
+ 		return SM_FAIL;
+
+ 	ASSERT_SMGR_RECOVERY_VALID(which);
+ 	if (smgrsw[which].smgr_recreate == NULL)
+ 	{
+ 		elog(NOTICE,
+ 			 "recovery attempted on inappropriate storage manager type %d",
+ 			 (int) which);
+ 		return SM_SUCCESS;
+ 	}
+
+ 	/*
+ 	 * NOTE(review): failure is detected as "< 0" here, while smgr_redo()
+ 	 * compares the returned value with SM_SUCCESS -- confirm that the hook
+ 	 * returns SM_SUCCESS when it succeeds.
+ 	 */
+ 	if ((ret = smgrsw[which].smgr_recreate(rnode)) < 0)
+ 		elog(WARNING, "recovery-creation failed on %s: %m",
+ 			 DatumGetCString(DirectFunctionCall1(smgrout,
+ 												 Int16GetDatum(which))));
+
+ 	return ret;
+ }
+
+ /*
+  * smgrreunlink() -- redo the removal of file 'rnode' under manager 'which'.
+  *
+  * Returns SM_FAIL when 'which' is not a valid index into smgrsw[], and
+  * SM_SUCCESS (after a NOTICE) when the manager has no redo hooks.
+  * Otherwise the hook's own result is returned, with a WARNING logged when
+  * that result is negative.
+  */
+ int
+ smgrreunlink(int16 which, RelFileNode rnode)
+ {
+ 	int			ret;
+
+ 	/*
+ 	 * 'which' comes straight out of a log record (see smgr_redo), so it may
+ 	 * name a manager this build does not have; refuse it rather than read
+ 	 * past the end of smgrsw[].
+ 	 */
+ 	if (which < 0 || which >= (int) (sizeof(smgrsw) / sizeof(smgrsw[0])))
+ 		return SM_FAIL;
+
+ 	ASSERT_SMGR_RECOVERY_VALID(which);
+ 	if (smgrsw[which].smgr_reunlink == NULL)
+ 	{
+ 		elog(NOTICE,
+ 			 "recovery attempted on inappropriate storage manager type %d",
+ 			 (int) which);
+ 		return SM_SUCCESS;
+ 	}
+
+ 	/*
+ 	 * NOTE(review): failure is detected as "< 0" here, while smgr_redo()
+ 	 * compares the returned value with SM_SUCCESS -- confirm that the hook
+ 	 * returns SM_SUCCESS when it succeeds.
+ 	 */
+ 	if ((ret = smgrsw[which].smgr_reunlink(rnode)) < 0)
+ 		elog(WARNING, "recovery-deletion failed on %s: %m",
+ 			 DatumGetCString(DirectFunctionCall1(smgrout,
+ 												 Int16GetDatum(which))));
+
+ 	return ret;
+ }
+
+ /*
+  * smgrretruncate() -- redo the truncation of file 'rnode' to 'nblocks'
+  *					   blocks under manager 'which'.
+  *
+  * Returns SM_FAIL when 'which' is not a valid index into smgrsw[], and
+  * SM_SUCCESS (after a NOTICE) when the manager has no redo hooks.
+  * Otherwise the hook's own result is returned, with a WARNING logged when
+  * that result is negative.
+  */
+ int
+ smgrretruncate(int16 which, RelFileNode rnode, BlockNumber nblocks)
+ {
+ 	int			ret;
+
+ 	/*
+ 	 * 'which' comes straight out of a log record (see smgr_redo), so it may
+ 	 * name a manager this build does not have; refuse it rather than read
+ 	 * past the end of smgrsw[].
+ 	 */
+ 	if (which < 0 || which >= (int) (sizeof(smgrsw) / sizeof(smgrsw[0])))
+ 		return SM_FAIL;
+
+ 	ASSERT_SMGR_RECOVERY_VALID(which);
+ 	if (smgrsw[which].smgr_retruncate == NULL)
+ 	{
+ 		elog(NOTICE,
+ 			 "recovery attempted on inappropriate storage manager type %d",
+ 			 (int) which);
+ 		return SM_SUCCESS;
+ 	}
+
+ 	/*
+ 	 * NOTE(review): failure is detected as "< 0" here, while smgr_redo()
+ 	 * compares the returned value with SM_SUCCESS -- confirm that the hook
+ 	 * returns SM_SUCCESS when it succeeds.
+ 	 */
+ 	if ((ret = smgrsw[which].smgr_retruncate(rnode, nblocks)) < 0)
+ 		elog(WARNING, "recovery-truncate failed on %s: %m",
+ 			 DatumGetCString(DirectFunctionCall1(smgrout,
+ 												 Int16GetDatum(which))));
+
+ 	return ret;
+ }
+
#ifdef NOT_USED
bool
smgriswo(int16 smgrno)
***************
*** 605,610 ****
--- 862,919 ----
void
smgr_redo(XLogRecPtr lsn, XLogRecord *record)
{
+ 	/*
+ 	 * Replay one SMGR log record: re-create, re-truncate or re-unlink the
+ 	 * file(s) it names.  Any operation that does not come back with
+ 	 * SM_SUCCESS, and any unrecognised record type, is reported with
+ 	 * elog(PANIC).  'lsn' is not used.
+ 	 */
+ 	uint8		op = record->xl_info & ~XLR_INFO_MASK;
+
+ 	switch (op)
+ 	{
+ 		case XLOG_SMGR_CREATE:
+ 			{
+ 				xl_smgr_create *crec;
+
+ 				crec = (xl_smgr_create *) XLogRecGetData(record);
+ 				if (smgrrecreate(crec->xlsf.smgrid,
+ 								 crec->xlsf.rfnode) != SM_SUCCESS)
+ 					elog(PANIC, "Unable to create file (tblNode=%u, relNode=%u)",
+ 						 crec->xlsf.rfnode.tblNode,
+ 						 crec->xlsf.rfnode.relNode);
+ 				break;
+ 			}
+
+ 		case XLOG_SMGR_TRUNC:
+ 			{
+ 				xl_smgr_truncate *trec;
+
+ 				trec = (xl_smgr_truncate *) XLogRecGetData(record);
+ 				if (smgrretruncate(trec->xlsf.smgrid, trec->xlsf.rfnode,
+ 								   trec->xlblkno) != SM_SUCCESS)
+ 					elog(PANIC, "Unable to truncate file (tblNode=%u, relNode=%u)\n"
+ 						 "\tto BlockNumber %u",
+ 						 trec->xlsf.rfnode.tblNode,
+ 						 trec->xlsf.rfnode.relNode,
+ 						 trec->xlblkno);
+ 				break;
+ 			}
+
+ 		case XLOG_SMGR_DELETE:
+ 			{
+ 				xl_smgr_delete *drec;
+ 				uint32		nth;
+
+ 				/* One record may list several files; unlink them in order. */
+ 				drec = (xl_smgr_delete *) XLogRecGetData(record);
+ 				for (nth = 0; nth < drec->xlnfiles; nth++)
+ 				{
+ 					if (smgrreunlink(drec->xlsfiles[nth].smgrid,
+ 									 drec->xlsfiles[nth].rfnode) != SM_SUCCESS)
+ 						elog(PANIC, "Unable to unlink file node %u/%u",
+ 							 drec->xlsfiles[nth].rfnode.tblNode,
+ 							 drec->xlsfiles[nth].rfnode.relNode);
+ 				}
+ 				break;
+ 			}
+
+ 		default:
+ 			elog(PANIC, "Unknown log record type for SMGR");
+ 	}
}
void
***************
*** 615,618 ****
--- 924,970 ----
void
smgr_desc(char *buf, uint8 xl_info, char *rec)
{
+ 	/*
+ 	 * Write a one-line description of the SMGR record body 'rec' into 'buf'.
+ 	 * The text is produced with sprintf() and no length is passed in, so the
+ 	 * caller has to supply a buffer that is large enough.
+ 	 */
+ 	uint8		op = xl_info & ~XLR_INFO_MASK;
+
+ 	switch (op)
+ 	{
+ 		case XLOG_SMGR_CREATE:
+ 			{
+ 				xl_smgr_create *crec = (xl_smgr_create *) rec;
+
+ 				sprintf(buf,
+ 						"SMGR Redo Create File: tblNode=%u, relNode=%u, smgrid=%d",
+ 						crec->xlsf.rfnode.tblNode,
+ 						crec->xlsf.rfnode.relNode,
+ 						(int) crec->xlsf.smgrid);
+ 				break;
+ 			}
+
+ 		case XLOG_SMGR_TRUNC:
+ 			{
+ 				xl_smgr_truncate *trec = (xl_smgr_truncate *) rec;
+
+ 				sprintf(buf,
+ 						"SMGR Redo Truncate File: tblNode=%u, relNode=%u, smgrid=%d,"
+ 						" BlockNumber=%u",
+ 						trec->xlsf.rfnode.tblNode,
+ 						trec->xlsf.rfnode.relNode,
+ 						(int) trec->xlsf.smgrid,
+ 						trec->xlblkno);
+ 				break;
+ 			}
+
+ 		case XLOG_SMGR_DELETE:
+ 			{
+ 				xl_smgr_delete *drec = (xl_smgr_delete *) rec;
+
+ 				sprintf(buf,
+ 						"SMGR Redo Delete Files: %u files to delete",
+ 						drec->xlnfiles);
+ 				break;
+ 			}
+
+ 		default:
+ 			sprintf(buf, "SMGR Unknown Operation ID %d", (int) op);
+ 	}
}
+
+
+
+
+
+
+
*** ./src/backend/utils/misc/guc.c.orig Sat Jun 15 20:09:12 2002
--- ./src/backend/utils/misc/guc.c Fri Jul 5 01:47:17 2002
***************
*** 800,805 ****
--- 800,810 ----
},
{
+ { "wal_archive_dest", PGC_POSTMASTER }, &XLOG_archive_dir,
+ XLOG_archive_dir_default, assign_xlog_archive_dir, NULL
+ },
+
+ {
{ NULL, 0 }, NULL, NULL, NULL, NULL
}
};
*** ./src/include/access/xlog.h.orig Sat Jun 22 17:32:02 2002
--- ./src/include/access/xlog.h Fri Jul 5 01:50:07 2002
***************
*** 188,193 ****
--- 188,195 ----
extern int XLOG_DEBUG;
extern char *XLOG_sync_method;
extern const char XLOG_sync_method_default[];
+ extern char *XLOG_archive_dir;
+ extern const char XLOG_archive_dir_default[];
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
***************
*** 216,221 ****
--- 218,225 ----
*/
extern XLogRecPtr GetUndoRecPtr(void);
+ extern const char *assign_xlog_archive_dir(const char *dir,
+ bool doit, bool interactive);
extern const char *assign_xlog_sync_method(const char *method,
bool doit, bool interactive);
*** ./src/include/storage/smgr.h.orig Sat Jun 22 17:32:06 2002
--- ./src/include/storage/smgr.h Tue Jul 9 15:14:39 2002
***************
*** 19,24 ****
--- 19,66 ----
#include "storage/block.h"
#include "utils/rel.h"
+ typedef struct xl_smgr_file {	/* names one file inside an smgr log record */
+ RelFileNode rfnode;	/* the file; loggers copy it from reln->rd_node or prd->relnode */
+ int16 smgrid;	/* storage manager id; redo passes it on as 'which' */
+ } xl_smgr_file;
+
+
+ /* All info needed to re-create the file in recovery */
+ typedef struct xl_smgr_create {
+ 	xl_smgr_file xlsf;		/* the file that was created */
+ } xl_smgr_create;
+
+ /*
+  * Size of an xl_smgr_create record body.  Written with sizeof(xl_smgr_file),
+  * which gives the same length as the sizeof(xlrec) that smgrcreate() logs.
+  */
+ #define SizeOfXlSmgrCreate (offsetof(xl_smgr_create, xlsf) + sizeof(xl_smgr_file))
+
+ typedef struct xl_smgr_truncate {	/* body of an XLOG_SMGR_TRUNC record */
+ BlockNumber xlblkno;	/* block count to truncate to (smgrtruncate's nblocks) */
+ xl_smgr_file xlsf;	/* the file being truncated */
+ } xl_smgr_truncate;
+
+ /* All info needed to re-delete a group of files.
+ * smgrDoPendingDeletes() logs these as XLOG_SMGR_DELETE records of their
+ * own, starting a new record whenever the file list fills one up.
+ */
+ typedef struct xl_smgr_delete {
+ uint32 xlnfiles;	/* number of entries filled in xlsfiles[] */
+ xl_smgr_file xlsfiles[1]; /* VARIABLE SIZE */
+ } xl_smgr_delete;
+
+ /* Bytes taken by an xl_smgr_delete that carries no file entries. */
+ #define SizeOfBaseXlSmgrDelete (offsetof(xl_smgr_delete, xlnfiles) + \
+ 		sizeof(uint32) )
+
+ /*
+  * Bytes taken by an xl_smgr_delete carrying 'nfiles' entries.  The argument
+  * is parenthesized at each use so that an expression such as (a + b)
+  * expands correctly; note that it is still evaluated more than once.
+  */
+ #define SizeOfXlSmgrDelete(nfiles) \
+ 	( \
+ 		((nfiles) == 0) ? SizeOfBaseXlSmgrDelete : \
+ 		offsetof(xl_smgr_delete, xlsfiles) + \
+ 		(nfiles) * sizeof(xl_smgr_file) \
+ 	)
+
+ /* Size of the delete record 'rec', computed from its own xlnfiles count. */
+ #define GetSizeOfXlSmgrDeleteRec(rec) SizeOfXlSmgrDelete((rec)->xlnfiles)
+
+ /* XLOG gives us high 4 bits */
+ #define XLOG_SMGR_CREATE 0x10	/* body is xl_smgr_create; redo calls smgrrecreate() */
+ #define XLOG_SMGR_TRUNC 0x20	/* body is xl_smgr_truncate; redo calls smgrretruncate() */
+ #define XLOG_SMGR_DELETE 0x30	/* body is xl_smgr_delete; redo calls smgrreunlink() per file */
#define SM_FAIL 0
#define SM_SUCCESS 1
***************
*** 50,55 ****
--- 92,101 ----
extern int smgrcommit(void);
extern int smgrabort(void);
extern int smgrsync(void);
+ extern int smgrrecreate(int16 which, RelFileNode rnode);
+ extern int smgrreunlink(int16 which, RelFileNode rnode);
+ extern int smgrretruncate(int16 which, RelFileNode rnode,
+ BlockNumber nblocks);
extern void smgr_redo(XLogRecPtr lsn, XLogRecord *record);
extern void smgr_undo(XLogRecPtr lsn, XLogRecord *record);
***************
*** 77,82 ****
--- 123,131 ----
extern int mdcommit(void);
extern int mdabort(void);
extern int mdsync(void);
+ extern int mdrecreate(RelFileNode rnode);
+ extern int mdreunlink(RelFileNode rnode);
+ extern int mdretruncate(RelFileNode rnode, BlockNumber nblocks);
/* mm.c */
extern int mminit(void);