First group of logging changes for PITR - Mailing list pgsql-patches

From J. R. Nield
Subject First group of logging changes for PITR
Date
Msg-id 1026244592.11285.418.camel@localhost.localdomain
Whole thread Raw
List pgsql-patches
Here's a draft patch that adds additional logging code to certain
resource managers.

Note that some operations like CREATE DATABASE are still not logged.

An example to test it might be:

  <<ensure wal_debug = 16 in postgresql.conf>>

  <<startup -D ./data>>
  CREATE DATABASE test;
  <<backup ./data/global/pg_control while dirty>>
  <<shutdown -D ./data>>

  <<backup all of ./data except from pg_xlog into ./data_old>>
  <<startup -D ./data>>

  CREATE TABLE FOO...
  INSERT BUNCH OF RECORDS...
  CREATE INDEX ON FOO...
  etc, etc.

  <<shutdown -D ./data>>

  <<copy logfiles from ./data/pg_xlog into ./data_old/pg_xlog>>
  <<copy backup pg_control into ./data_old/global>>
  <<startup -D ./data_old and watch the recovery run>>

If you find anything that doesn't recover properly, other than
CREATE/DROP DATABASE and the rtree/GiST indexes, please let me know
about it.

This patch implements logging changes for:

=== RM_SMGR create/truncate/delete ===
  Note that logging of files pending for unlink has been moved into
smgr.c from xact.c::RecordTransactionCommit, and they now get logged after
the commit record. The xlog code imposes a 2^15 limit on the size of a
record, which we have to avoid exceeding. Logging the files separately
will not be a problem for correctness.

=== RM_BTREE _bt_load operations logged properly ===
  See nbtree.c and nbtsort.c

=== Incomplete start of XLOG archiver ===
  See xlog.c and guc.c

--
J. R. Nield
jrnield@usol.com


*** ./src/backend/access/nbtree/nbtpage.c.orig    Thu Jun 20 18:18:12 2002
--- ./src/backend/access/nbtree/nbtpage.c    Mon Jul  8 17:58:09 2002
***************
*** 66,72 ****
--- 66,75 ----
          elog(ERROR, "Cannot initialize non-empty btree %s",
               RelationGetRelationName(rel));

+
      buf = ReadBuffer(rel, P_NEW);
+
+     START_CRIT_SECTION();
      pg = BufferGetPage(buf);
      _bt_pageinit(pg, BufferGetPageSize(buf));

***************
*** 79,86 ****
--- 82,115 ----
      op = (BTPageOpaque) PageGetSpecialPointer(pg);
      op->btpo_flags = BTP_META;

+     /* Log BTreeInit record for metapage (and implicitly the RelFileNode as
+      * well)
+      */
+     {
+         XLogRecPtr            lsn;
+         XLogRecData            rdata;
+         xl_btree_metapinit    xlrec;
+
+         xlrec.node        = rel->rd_node;
+         xlrec.magic        = BTREE_MAGIC;
+         xlrec.version    = BTREE_VERSION;
+
+         rdata.buffer    = InvalidBuffer;
+         rdata.data        = (char *) &xlrec;
+         rdata.len        = SizeOfBtreeMetapinit;
+         rdata.next        = NULL;
+
+
+         lsn = XLogInsert(RM_BTREE_ID, XLOG_BTREE_METAPINIT, &rdata);
+
+         PageSetLSN(BufferGetPage(buf), lsn);
+         PageSetSUI(BufferGetPage(buf), ThisStartUpID);
+     }
+
      WriteBuffer(buf);

+     END_CRIT_SECTION();
+
      /* all done */
      if (USELOCKING)
          UnlockRelation(rel, AccessExclusiveLock);
***************
*** 430,441 ****
--- 459,505 ----
      metaopaque = (BTPageOpaque) PageGetSpecialPointer(metap);
      Assert(metaopaque->btpo_flags & BTP_META);
      metad = BTPageGetMeta(metap);
+
+     START_CRIT_SECTION();
+
      metad->btm_root = rootbknum;
      if (level == 0)                /* called from _do_insert */
          metad->btm_level += 1;
      else
          metad->btm_level = level;        /* called from btsort */
+
+     /* Log root change. Same as from _bt_getroot and _bt_newroot.
+      *
+      * Note that this was not logged before from here, because _bt_metaproot is
+      * only called from _bt_uppershutdown during _bt_load, and index builds
+      * were not logged before PITR support was added.
+      *
+      * jrnield 2002-07-08
+      */
+     {
+         XLogRecPtr            lsn;
+         XLogRecData            rdata;
+         xl_btree_newroot    xlrec;
+
+         xlrec.node        = rel->rd_node;
+         xlrec.level        = metad->btm_level;
+         BlockIdSet(&xlrec.rootblk, metad->btm_root);
+
+         rdata.buffer    = InvalidBuffer;
+         rdata.data        = (char *) &xlrec;
+         rdata.len        = SizeOfBtreeNewroot;
+         rdata.next        = NULL;
+
+         lsn = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, &rdata);
+
+         PageSetLSN(BufferGetPage(metabuf), lsn);
+         PageSetSUI(BufferGetPage(metabuf), ThisStartUpID);
+
+     }
+
      _bt_wrtbuf(rel, metabuf);
+
+     END_CRIT_SECTION();
  }

  /*
*** ./src/backend/access/nbtree/nbtree.c.orig    Thu Jun 20 18:18:12 2002
--- ./src/backend/access/nbtree/nbtree.c    Mon Jul  8 17:58:57 2002
***************
*** 1096,1101 ****
--- 1096,1200 ----
      UnlockAndWriteBuffer(metabuf);
  }

+ static void
+ btree_xlog_metapinit(bool redo, XLogRecPtr lsn, XLogRecord *record)
+ {    /* Replays XLOG_BTREE_METAPINIT: re-creates block 0 as the btree metapage */
+     xl_btree_metapinit    *xlrec;
+     Relation            reln;
+     Buffer                buf;
+     Page                pg;
+     BTMetaPageData        *metapg;
+     BTPageOpaque        op;
+
+     if ( !redo )    /* only REDO is implemented for this record type */
+         elog(PANIC, "btree_xlog_metapinit: UNDO unimplemented");
+
+     xlrec    = (xl_btree_metapinit *)XLogRecGetData(record);
+     reln    = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
+
+     /* Replay is impossible without the relation, so PANIC if it did not open.
+      * XXX: TODO: all the other REDO functions should do this too. jrnield 2002-07-08
+      */
+     if ( !RelationIsValid(reln) )
+         elog(PANIC, "btree_xlog_metapinit: unable to open relation %u/%u",
+                 xlrec->node.tblNode, xlrec->node.relNode);
+
+
+     buf        = XLogReadBuffer(true, reln, (BlockNumber)0);    /* block 0 is the metapage */
+     if ( !BufferIsValid(buf) )
+         elog(PANIC, "btree_xlog_metapinit: failed to read metapage");
+
+     pg        = BufferGetPage(buf);
+
+     _bt_pageinit(pg, BufferGetPageSize(buf));    /* page is rebuilt unconditionally; no LSN comparison is made */
+
+     metapg    = BTPageGetMeta(pg);
+
+     metapg->btm_magic    = xlrec->magic;    /* magic and version come from the WAL record */
+     metapg->btm_version    = xlrec->version;
+     metapg->btm_root    = P_NONE;    /* root and level are reset rather than logged */
+     metapg->btm_level    = 0;
+
+     op = (BTPageOpaque) PageGetSpecialPointer(pg);
+     op->btpo_flags = BTP_META;
+
+     PageSetLSN(pg, lsn);    /* stamp the page with this record's LSN */
+     PageSetSUI(pg, ThisStartUpID);
+     UnlockAndWriteBuffer(buf);
+ }
+
+ static void
+ btree_xlog_buildadd(bool redo, XLogRecPtr lsn, XLogRecord *record)
+ {    /* Replays XLOG_BTREE_BUILDADD: restores a full index page image carried in the record */
+     Relation            reln;
+     xl_btree_buildadd    *xlrec;
+     Buffer                buf;
+     Page                pg;
+
+     if ( !redo )    /* only REDO is implemented for this record type */
+         elog(PANIC, "btree_xlog_buildadd: UNDO unimplemented");
+
+     xlrec    = (xl_btree_buildadd *) XLogRecGetData(record);
+     reln    = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
+
+     if ( !RelationIsValid(reln) )
+         elog(PANIC, "btree_xlog_buildadd: unable to open relation %u/%u",
+                 xlrec->node.tblNode, xlrec->node.relNode);
+
+     buf    = XLogReadBuffer(true, reln, xlrec->blkno);
+     if ( !BufferIsValid(buf) )
+         elog(PANIC, "btree_xlog_buildadd: error reading block %u",
+                 xlrec->blkno);
+
+     pg = BufferGetPage(buf);
+
+     if ( XLByteLE(lsn, PageGetLSN(pg)) )    /* page LSN is not behind this record (per XLByteLE): leave the page alone */
+     {
+         UnlockAndReleaseBuffer(buf);
+         return;
+     }
+
+     /* We do full block backups for pages generated by _bt_load
+      * because they are almost entirely packed with new data.
+      *
+      * We don't use the XLOG "backup blocks" system, because
+      * that is for writing pre-image backups, and we want to
+      * be able to turn that off when possible.
+      */
+     memcpy(BufferGetBlock(buf), xlrec->idxpage, xlrec->idxpagesz);    /* NOTE(review): idxpagesz is used without a check against the buffer size */
+
+     PageSetLSN(pg, lsn);    /* applied after the copy, so it overrides the LSN held in the copied image */
+     PageSetSUI(pg, ThisStartUpID);
+     UnlockAndWriteBuffer(buf);
+ }
+
+ static void
+ btree_xlog_finishlevel(bool redo, XLogRecPtr lsn, XLogRecord *record)
+ {
+     /* For now, everything is the same: the record is replayed as a BUILDADD record */
+     btree_xlog_buildadd(redo, lsn, record);    /* NOTE(review): assumes xl_btree_finishlevel has the same layout as xl_btree_buildadd -- confirm */
+ }
+
  void
  btree_redo(XLogRecPtr lsn, XLogRecord *record)
  {
***************
*** 1112,1117 ****
--- 1211,1222 ----
          btree_xlog_split(true, true, lsn, record);        /* new item on the left */
      else if (info == XLOG_BTREE_NEWROOT)
          btree_xlog_newroot(true, lsn, record);
+     else if (info == XLOG_BTREE_METAPINIT)
+         btree_xlog_metapinit(true, lsn, record);
+     else if (info == XLOG_BTREE_BUILDADD)
+         btree_xlog_buildadd(true, lsn, record);
+     else if (info == XLOG_BTREE_FINISHLEVEL)
+         btree_xlog_finishlevel(true, lsn, record);
      else
          elog(PANIC, "btree_redo: unknown op code %u", info);
  }
***************
*** 1132,1137 ****
--- 1237,1248 ----
          btree_xlog_split(false, true, lsn, record);        /* new item on the left */
      else if (info == XLOG_BTREE_NEWROOT)
          btree_xlog_newroot(false, lsn, record);
+     else if (info == XLOG_BTREE_METAPINIT)
+         btree_xlog_metapinit(false, lsn, record);
+     else if (info == XLOG_BTREE_BUILDADD)
+         btree_xlog_buildadd(false, lsn, record);
+     else if (info == XLOG_BTREE_FINISHLEVEL)
+         btree_xlog_finishlevel(false, lsn, record);
      else
          elog(PANIC, "btree_undo: unknown op code %u", info);
  }
***************
*** 1184,1189 ****
--- 1295,1321 ----
                  xlrec->node.tblNode, xlrec->node.relNode,
                  BlockIdGetBlockNumber(&xlrec->rootblk));
      }
+     else if (info == XLOG_BTREE_METAPINIT)
+     {
+         xl_btree_metapinit *xlrec = (xl_btree_metapinit *) rec;
+
+         sprintf(buf + strlen(buf), "init: node %u/%u",
+                 xlrec->node.tblNode, xlrec->node.relNode);
+     }
+     else if (info == XLOG_BTREE_BUILDADD)
+     {
+         xl_btree_buildadd  *xlrec = (xl_btree_buildadd *) rec;
+
+         sprintf(buf + strlen(buf), "insert sort/load page: node %u/%u; blk %u",
+                 xlrec->node.tblNode, xlrec->node.relNode, xlrec->blkno);
+     }
+     else if (info == XLOG_BTREE_FINISHLEVEL )
+     {
+         xl_btree_finishlevel *xlrec = (xl_btree_finishlevel *) rec;
+
+         sprintf(buf + strlen(buf), "finish sort/load level: node %u/%u; blk %u",
+                 xlrec->node.tblNode, xlrec->node.relNode, xlrec->blkno);
+     }
      else
          strcat(buf, "UNKNOWN");
  }
*** ./src/backend/access/nbtree/nbtsort.c.orig    Fri Jul  5 00:31:36 2002
--- ./src/backend/access/nbtree/nbtsort.c    Tue Jul  9 15:03:18 2002
***************
*** 433,438 ****
--- 433,464 ----
                  BufferGetBlockNumber(state->btps_next->btps_buf);
          }

+         /* Log BTreeBuild Entry for new index page */
+         {
+             XLogRecPtr            lsn;
+             XLogRecData            rdata[2];
+             xl_btree_buildadd    xlrec;
+
+             xlrec.node        = index->rd_node;
+             xlrec.blkno        = BufferGetBlockNumber(obuf);
+             xlrec.idxpagesz    = BLCKSZ;
+
+             rdata[0].buffer    = InvalidBuffer;
+             rdata[0].data    = (char *) &xlrec;
+             rdata[0].len    = SizeOfBtreeBuildAdd;
+             rdata[0].next    = &rdata[1];
+
+             rdata[1].buffer = InvalidBuffer;
+             rdata[1].data    = (char *) BufferGetBlock(obuf);
+             rdata[1].len    = BufferGetPageSize(obuf);
+             rdata[1].next    = NULL;
+
+             lsn = XLogInsert(RM_BTREE_ID, XLOG_BTREE_BUILDADD, rdata);
+
+             PageSetLSN(BufferGetPage(obuf), lsn);
+             PageSetSUI(BufferGetPage(obuf), ThisStartUpID);
+         }
+
          /*
           * Write out the old page.    We never want to see it again, so we
           * can give up our lock (if we had one; most likely BuildingBtree
***************
*** 516,521 ****
--- 542,575 ----
           * slid back one slot.    Then we can dump out the page.
           */
          _bt_slideleft(index, s->btps_buf, s->btps_page);
+
+         /* Log the last page in the level */
+         {
+             XLogRecPtr                lsn;
+             XLogRecData                rdata[2];
+             xl_btree_finishlevel    xlrec;
+
+             xlrec.node                = index->rd_node;
+             xlrec.blkno                = BufferGetBlockNumber(s->btps_buf);
+             xlrec.idxpagesz            = BufferGetPageSize(s->btps_buf);
+
+             rdata[0].buffer            = InvalidBuffer;
+             rdata[0].data            = (char *) &xlrec;
+             rdata[0].len            = SizeOfBtreeFinishLevel;
+             rdata[0].next            = &rdata[1];
+
+             rdata[1].buffer            = InvalidBuffer;
+             rdata[1].data            = (char *)BufferGetBlock(s->btps_buf);
+             rdata[1].len            = BufferGetPageSize(s->btps_buf);
+             rdata[1].next            = NULL;
+
+             lsn = XLogInsert(RM_BTREE_ID, XLOG_BTREE_FINISHLEVEL, rdata);
+
+             PageSetLSN(BufferGetPage(s->btps_buf), lsn);
+             PageSetSUI(BufferGetPage(s->btps_buf), ThisStartUpID);
+
+         }
+
          _bt_wrtbuf(index, s->btps_buf);
      }
  }
*** ./src/backend/access/transam/xact.c.orig    Thu Jun 20 18:18:13 2002
--- ./src/backend/access/transam/xact.c    Tue Jul  9 12:10:28 2002
***************
*** 556,562 ****
              rdata.next = NULL;

              /*
!              * XXX SHOULD SAVE ARRAY OF RELFILENODE-s TO DROP
               */
              recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, &rdata);
          }
--- 556,566 ----
              rdata.next = NULL;

              /*
!              * RelFileNodes to be deleted are saved AFTER commit
!              * by the RM_SMGR.
!              *
!              * If we crash between now and the delete operation,
!              * then we will leak files.
               */
              recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, &rdata);
          }
***************
*** 711,717 ****
          START_CRIT_SECTION();

          /*
!          * SHOULD SAVE ARRAY OF RELFILENODE-s TO DROP
           */
          recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, &rdata);

--- 715,725 ----
          START_CRIT_SECTION();

          /*
!          * RelFileNodes to be deleted are saved AFTER abort by
!          * the RM_SMGR.
!          *
!          * If we crash between now and the delete operation,
!          * then we will leak files.
           */
          recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, &rdata);

***************
*** 1588,1599 ****
      if (info == XLOG_XACT_COMMIT)
      {
          TransactionIdCommit(record->xl_xid);
-         /* SHOULD REMOVE FILES OF ALL DROPPED RELATIONS */
      }
      else if (info == XLOG_XACT_ABORT)
      {
          TransactionIdAbort(record->xl_xid);
-         /* SHOULD REMOVE FILES OF ALL FAILED-TO-BE-CREATED RELATIONS */
      }
      else
          elog(PANIC, "xact_redo: unknown op code %u", info);
--- 1596,1605 ----
*** ./src/backend/access/transam/xlog.c.orig    Thu Jun 20 18:18:13 2002
--- ./src/backend/access/transam/xlog.c    Tue Jul  9 14:55:22 2002
***************
*** 91,98 ****
  int            XLOG_DEBUG = 0;
  char       *XLOG_sync_method = NULL;
  const char    XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
! char        XLOG_archive_dir[MAXPGPATH];        /* null string means
                                                   * delete 'em */

  /*
   * XLOGfileslop is used in the code as the allowed "fuzz" in the number of
--- 91,99 ----
  int            XLOG_DEBUG = 0;
  char       *XLOG_sync_method = NULL;
  const char    XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
! char       *XLOG_archive_dir = NULL;            /* null string means
                                                   * delete 'em */
+ const char  XLOG_archive_dir_default[] = "";

  /*
   * XLOGfileslop is used in the code as the allowed "fuzz" in the number of
***************
*** 334,339 ****
--- 335,355 ----
          } \
      } while (0)

+ #define LogSegPairEQ(logId_lhs, logSeg_lhs, logId_rhs, logSeg_rhs) /* true when both (logId, logSeg) pairs are equal */ \
+        ((logId_lhs) == (logId_rhs) && (logSeg_lhs) == (logSeg_rhs))
+
+ #define LogSegPairLT(logId_lhs, logSeg_lhs, logId_rhs, logSeg_rhs) /* strict "<": logId decides, logSeg breaks ties */ \
+      ( \
+        ((logId_lhs) < (logId_rhs)) || /* note: the logId arguments are expanded twice */ \
+        ((logId_lhs) == (logId_rhs) && (logSeg_lhs) < (logSeg_rhs)) \
+      )
+
+ #define LogSegPairLE(logId_lhs, logSeg_lhs, logId_rhs, logSeg_rhs) /* "<=": logId decides, logSeg breaks ties */ \
+      ( \
+        ((logId_lhs) < (logId_rhs)) || /* note: the logId arguments are expanded twice */ \
+        ((logId_lhs) == (logId_rhs) && (logSeg_lhs) <= (logSeg_rhs)) \
+      )
+
  /*
   * Compute ID and segment from an XLogRecPtr.
   *
***************
*** 453,458 ****
--- 469,481 ----
  static void xlog_outrec(char *buf, XLogRecord *record);
  static void issue_xlog_fsync(void);

+ static int32 ReadAndSortDirectory(char *dpath,
+                      char ***sorted_pointers,
+                      char **directory_entries,
+                      bool (*filter_function)(const char *, void *),
+                      void *ff_context,
+                      int  (*sortcomp)(const char *, const char *) );
+ static bool LogFileFilterFunction(const char *fname, void *ctx);

  /*
   * Insert an XLOG record having the specified RMID and info bytes,
***************
*** 1496,1501 ****
--- 1519,1529 ----
              NextLogSeg(log, seg);
              XLogFileName(path, log, seg);
          }
+         /* On BEOS and CYGWIN, we're about to call rename, so make sure
+          * the file didn't exist.
+          */
+         if ( errno != ENOENT )
+             { elog(PANIC, "Unexpected error from open(%s): %m", path); }
      }

      /*
***************
*** 1583,1656 ****
  }

  /*
   * Remove or move offline all log files older or equal to passed log/seg#
   *
   * endptr is current (or recent) end of xlog; this is used to determine
   * whether we want to recycle rather than delete no-longer-wanted log files.
   */
  static void
  MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr)
  {
      uint32        endlogId;
      uint32        endlogSeg;
-     DIR           *xldir;
-     struct dirent *xlde;
      char        lastoff[32];
      char        path[MAXPGPATH];

      XLByteToPrevSeg(endptr, endlogId, endlogSeg);

-     xldir = opendir(XLogDir);
-     if (xldir == NULL)
-         elog(PANIC, "could not open transaction log directory (%s): %m",
-              XLogDir);
-
      sprintf(lastoff, "%08X%08X", log, seg);

!     errno = 0;
!     while ((xlde = readdir(xldir)) != NULL)
      {
!         if (strlen(xlde->d_name) == 16 &&
!             strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
!             strcmp(xlde->d_name, lastoff) <= 0)
!         {
!             snprintf(path, MAXPGPATH, "%s/%s", XLogDir, xlde->d_name);
!             if (XLOG_archive_dir[0])
!             {
!                 elog(LOG, "archiving transaction log file %s",
!                      xlde->d_name);
!                 elog(WARNING, "archiving log files is not implemented!");
              }
              else
              {
!                 /*
!                  * Before deleting the file, see if it can be recycled as
!                  * a future log segment.  We allow recycling segments up
!                  * to XLOGfiles + XLOGfileslop segments beyond the current
!                  * XLOG location.
!                  */
!                 if (InstallXLogFileSegment(endlogId, endlogSeg, path,
!                                            true, XLOGfiles + XLOGfileslop,
!                                            true))
!                 {
!                     elog(LOG, "recycled transaction log file %s",
!                          xlde->d_name);
!                 }
!                 else
!                 {
!                     /* No need for any more future segments... */
!                     elog(LOG, "removing transaction log file %s",
!                          xlde->d_name);
!                     unlink(path);
!                 }
              }
          }
-         errno = 0;
      }
-     if (errno)
-         elog(PANIC, "could not read transaction log directory (%s): %m",
-              XLogDir);
-     closedir(xldir);
  }

  /*
--- 1611,1836 ----
  }

  /*
+  * Read while handling all the stupid EINTR conditions
+  * allowed by POSIX.
+  */
+ static ssize_t
+ xlog_read_nointr(int fd, void *buf, size_t count)
+ {    /* Returns what read() returned: bytes read, 0 at EOF, or -1 on error */
+     off_t    pos;
+     int        save_errno = errno;    /* put back on return if nothing failed */
+     ssize_t    rbytes;
+
+   L_restart_read:
+     /* Save our current file position */
+     if ( (pos = lseek(fd, (off_t)0, SEEK_CUR)) == (off_t)-1 )
+     {
+         elog(WARNING, "seek failed: %m");
+         return -1;
+     }
+
+     /* Retry reads that were interrupted before transferring any data.
+      * (The previous loop also required rbytes == 0, which can never hold
+      * together with rbytes < 0, so EINTR was never actually retried.)
+      */
+     do { errno = 0; rbytes = read(fd, buf, count); }
+         while ( rbytes < 0 && errno == EINTR );
+
+     /* This handles the partial read EINTR allowed by POSIX */
+     if ( rbytes >= 0 && (size_t) rbytes < count && errno == EINTR )
+     {
+         if ( (pos = lseek(fd, pos, SEEK_SET)) == (off_t)-1 )
+         {
+             elog(WARNING, "seek failed: %m");
+             return -1;
+         }
+         goto L_restart_read;
+     }
+
+     if ( !errno )
+         { errno = save_errno; }
+     return rbytes;
+ }
+
+ /* Copy srcpath to a newly created archpath and fsync the copy.  Returns
+  * true on success; on failure emits a WARNING, removes the target file
+  * if this call created it, and returns false. */
+ static bool
+ ArchiveCopyFile(char *srcpath, char *archpath)
+ {
+     unsigned char    buf[BLCKSZ * 8]; /* XXX: should be in GUC */
+     int                srcfd=-1, archfd=-1;    /* -1 means "not open" for L_cleanup */
+     ssize_t            rbytes, wbytes;
+
+     if ( (srcfd = BasicOpenFile(srcpath, O_RDONLY|PG_BINARY,
+                                 S_IRUSR|S_IWUSR)) < 0 )
+     {
+         elog(WARNING, "unable to open archive source file %s: %m",
+                      srcpath);
+         goto L_cleanup;
+     }
+
+     if ( (archfd = BasicOpenFile(archpath, O_CREAT|O_EXCL|O_WRONLY|PG_BINARY,
+                                 S_IRUSR|S_IWUSR)) < 0 )
+     {
+         elog(WARNING, "unable to create archive target file %s: %m",
+                      archpath);
+         goto L_cleanup;
+     }
+
+     do {
+         if ( (rbytes = xlog_read_nointr(srcfd, buf, sizeof(buf))) < 0 )
+         {
+             elog(WARNING, "could not read archive source: %m");
+             goto L_cleanup;
+         }
+
+         /* End of file: nothing left to copy.  (This takes the place of a
+          * second "rbytes < 0" test that could never be reached.)
+          */
+         if ( rbytes == 0 )
+             { break; }
+
+         errno = 0;
+         while ( (wbytes = write(archfd, buf, rbytes)) < 0  &&
+                 errno == EINTR )
+             { /* NOTHING */; }
+
+         if ( wbytes < 0 )
+         {
+             elog(WARNING, "write to archive target file failed: %m");
+             goto L_cleanup;
+         }
+
+         if ( wbytes != rbytes )
+         {
+             elog(WARNING,
+                     "partial write to archive target file (req %d, wrote %d)",
+                     (int) rbytes, (int) wbytes);    /* ssize_t must not be passed for %d */
+             goto L_cleanup;
+         }
+     } while (rbytes > 0);
+
+     close(srcfd);
+     srcfd = -1;
+     if ( pg_fsync(archfd) < 0 )
+     {
+         elog(WARNING, "unable to fsync archive target file: %m");
+         goto L_cleanup;
+     }
+     close(archfd);
+     archfd = -1;
+     return true;
+
+   L_cleanup:
+     if ( srcfd >= 0 )
+         { close(srcfd); }
+     if ( archfd >= 0 )    /* only true when this call created the target */
+     {
+         close(archfd);
+         if ( unlink(archpath) < 0 )
+             { elog(WARNING, "unable to remove file %s: %m", archpath); }
+         else
+             { elog(WARNING, "removed archive target file %s", archpath); }
+     }
+     return false;
+ }
+
+ /*
   * Remove or move offline all log files older or equal to passed log/seg#
   *
   * endptr is current (or recent) end of xlog; this is used to determine
   * whether we want to recycle rather than delete no-longer-wanted log files.
+  *
+  * This is called only from CreateCheckPoint(), so we will hold the
+  * CheckpointLock. It repeatedly acquires and releases the ControlFileLock
+  * while archiving each log file.
   */
  static void
  MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr)
  {
      uint32        endlogId;
      uint32        endlogSeg;
      char        lastoff[32];
      char        path[MAXPGPATH];
+     char        archpath[MAXPGPATH];
+     int32        narchfiles;
+     char      **archfilp;
+     char        *archfiles;
+     int32        i;

      XLByteToPrevSeg(endptr, endlogId, endlogSeg);

      sprintf(lastoff, "%08X%08X", log, seg);

!     if ( (narchfiles = ReadAndSortDirectory(XLogDir, &archfilp, &archfiles,
!                                               LogFileFilterFunction, lastoff,
!                                               strcasecmp)) < 0 )
!     {
!         elog(PANIC, "unable to read and sort log directory (%s): %m",
!                 XLogDir);
!     }
!
!     /* nothing to move offline */
!     if ( narchfiles == 0 )
!         { return; }
!
!     for (i=0; i < narchfiles; i++)
      {
!         snprintf(path, MAXPGPATH, "%s/%s", XLogDir, archfilp[i]);
!
!         if (XLOG_archive_dir[0])
!         {
!             bool archok;
!
!             snprintf(archpath, MAXPGPATH, "%s/%s.arch", XLOG_archive_dir, archfilp[i]);
!             elog(LOG, "archiving transaction log file %s",
!                  archfilp[i]);
!
!             /* No need to hold ControlFileLock to do this,
!              * because we are sure that nobody can touch files
!              * with (log, seg) <= lastoff except us.
!              */
!             archok = ArchiveCopyFile(path, archpath);
!
!             if ( archok )
!             {
!                 elog(LOG, "archived log file %s to %s",
!                         archfilp[i], archpath);
              }
              else
              {
!                 /* XXX: Should be a config option to panic here instead */
!                 elog(WARNING, "ARCHIVING FAILED: unable to archive log file %s",
!                         path);
!                 elog(LOG,
!                     "removal and cleanup of old log files terminated because of archive failure");
!                 return;    /* NOTE(review): archfilp/archfiles are never pfree'd, here or at the end */
              }
+
+         }
+
+         /*
+          * Before deleting the file, see if it can be recycled as
+          * a future log segment.  We allow recycling segments up
+          * to XLOGfiles + XLOGfileslop segments beyond the current
+          * XLOG location.
+          */
+         if (InstallXLogFileSegment(endlogId, endlogSeg, path,
+                                    true, XLOGfiles + XLOGfileslop,
+                                    true))
+         {
+             elog(LOG, "recycled transaction log file %s",
+                  archfilp[i]);
+         }
+         else
+         {
+             /* No need for any more future segments... */
+             elog(LOG, "removing transaction log file %s",
+                  archfilp[i]);
+             unlink(path);
          }
      }
  }

  /*
***************
*** 2471,2476 ****
--- 2651,2781 ----
      return buf;
  }

+ static bool
+ LSNShouldBeInArchive(XLogRecPtr *lsn)
+ {
+     /* XXX: TODO: Make this function; for now it is a stub that ignores lsn */
+
+     return false;
+ }
+
+ static bool
+ LogFileFilterFunction(const char *fname, void *ctx)
+ {    /* Directory filter: accept names of 16 uppercase hex digits that do not sort after ctx */
+     char *lastoff = (char *)ctx;    /* ctx is the cutoff file name to compare against */
+
+     Assert(ctx);
+     if (strlen(fname) == 16 &&
+         strspn(fname, "0123456789ABCDEF") == 16 &&
+         strcmp(fname, lastoff) <= 0)    /* plain byte-wise comparison, cutoff itself included */
+     {
+         return true;
+     }
+     return false;
+ }
+
+ /* qsort() adapter for ReadAndSortDirectory: elements are "char *" slots */
+ static int (*rasd_sortcomp)(const char *, const char *) = NULL;
+ static int
+ rasd_compare(const void *lhs, const void *rhs)
+ {
+     return rasd_sortcomp(*(char * const *) lhs, *(char * const *) rhs);
+ }
+
+ /* Collect the entries of dpath accepted by filter_function, sorted with
+  * sortcomp.  Returns the entry count; *sorted_pointers (sorted name
+  * pointers) and *directory_entries (name storage) are palloc'd and should
+  * be pfree'd by the caller.  Returns 0 with both outputs NULL when nothing
+  * matched, and -1 (after a WARNING) on failure.
+  */
+ static int32
+ ReadAndSortDirectory(char *dpath,
+                      char ***sorted_pointers,
+                      char **directory_entries,
+                      bool (*filter_function)(const char *, void *),
+                      void *ff_context,
+                      int  (*sortcomp)(const char *, const char *) )
+ {
+     Size    nbytes = 0;
+     int32    nelem = 0, i = 0, j = 0;
+     struct dirent *dent;
+     /* These must be NULL for 'L_cleanup' label (see end) */
+     DIR *    dir   = NULL;
+     char *  names = NULL;
+     char ** ptrs  = NULL;
+
+     errno = 0;
+     if ( !(dir = opendir(dpath)) )
+     {
+         elog(WARNING, "could not open directory (%s): %m", dpath);
+         return -1;
+     }
+
+     /* Pass 1 to figure-out how much memory we need */
+     while ( (dent = readdir(dir)) )
+     {
+         if ( filter_function(dent->d_name, ff_context) )
+         {
+             nelem++;
+             nbytes += sizeof(char) * (strlen(dent->d_name) + 1);
+         }
+     }
+     if (errno)
+     {
+         elog(WARNING, "could not read directory (%s): %m", dpath);
+         goto L_cleanup;
+     }
+
+     if ( nelem == 0 )
+     {
+         *sorted_pointers     = NULL;
+         *directory_entries     = NULL;
+         closedir(dir);
+         return 0;
+     }
+     names = (char *)palloc(nbytes);
+     ptrs  = (char **)palloc( sizeof(char *) * nelem );
+
+     /* Pass 2 to read the filtered directory into memory */
+     rewinddir(dir);
+     errno = 0;        /* don't mistake a stale value for a readdir failure */
+     while ( i < nelem && j < nbytes && (dent = readdir(dir)) )
+     {
+         if ( filter_function(dent->d_name, ff_context) )
+         {
+             ptrs[i] = &names[j];
+             strncpy(ptrs[i], dent->d_name, nbytes - j);
+             j += strlen(dent->d_name) + 1;
+             i++;
+         }
+     }
+     if (errno)
+     {
+         elog(WARNING, "could not read directory (pass 2) (%s): %m", dpath);
+         goto L_cleanup;
+     }
+     if ( i != nelem || j != nbytes )
+     {
+         elog(WARNING, "directory %s was changed unexpectedly", dpath);
+         goto L_cleanup;
+     }
+
+     /* Sort through the adapter: qsort hands its comparator "char **" */
+     rasd_sortcomp = sortcomp;
+     qsort(ptrs, nelem, sizeof(char *), rasd_compare);
+     closedir(dir);
+     *sorted_pointers     = ptrs;
+     *directory_entries     = names;
+     return nelem;
+   L_cleanup:
+     if ( dir ) { closedir(dir); }
+     *sorted_pointers    = NULL;
+     *directory_entries    = NULL;
+     if ( names ) { pfree(names); }
+     if ( ptrs ) { pfree(ptrs); }
+     return -1;
+ }
+
  /*
   * This must be called ONCE during postmaster or standalone-backend startup
   */
***************
*** 2545,2550 ****
--- 2850,2868 ----
          else
              elog(PANIC, "unable to locate a valid checkpoint record");
      }
+
+     if ( LSNShouldBeInArchive(&checkPointLoc) )
+     {
+         elog(PANIC, "The control file says you need to do recovery\n"
+                     "\tfrom the log archives.\n"
+                     "\tYou must be running a standalone backend to do that.");
+     }
+
+     /* XXX: TODO: Prevent startup in DB_SHUTDOWNED from
+      * old logfile from truncating the log when it finds
+      * a valid checkpoint-shutdown record at startup.
+      */
+
      LastRec = RecPtr = checkPointLoc;
      memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
      wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
***************
*** 3274,3279 ****
--- 3592,3628 ----
  /*
   * GUC support
   */
+
+ const char *
+ assign_xlog_archive_dir(const char *newval, bool doit, bool interactive)
+ {    /* Validates an archive directory setting; returns newval if accepted, NULL if rejected */
+     struct stat st;    /* doit and interactive are not consulted below */
+
+     if ( newval[0] == '\0' )    /* the empty string is accepted without any checks */
+     {
+         return newval;
+     }
+
+
+     if ( stat(newval, &st) < 0 )    /* the path must exist and be stat-able */
+     {
+         elog(WARNING, "unable to stat archive destination \"%s\": %m",
+                 (char *)newval);
+         return NULL;
+     }
+
+     if ( !S_ISDIR(st.st_mode) )    /* ...and it must be a directory */
+     {
+         elog(WARNING, "archive destination \"%s\" is not a directory",
+                 (char *)newval);
+         return NULL;
+     }
+
+     /* XXX: Should check to ensure write permission, etc. */
+
+     return newval;
+ }
+
  const char *
  assign_xlog_sync_method(const char *method, bool doit, bool interactive)
  {
*** ./src/backend/storage/smgr/md.c.orig    Sat Jun 22 17:31:55 2002
--- ./src/backend/storage/smgr/md.c    Thu Jul  4 07:08:57 2002
***************
*** 76,81 ****
--- 76,83 ----
  static MdfdVec *_mdfd_getseg(Relation reln, BlockNumber blkno);

  static int    _mdfd_blind_getseg(RelFileNode rnode, BlockNumber blkno);
+ static MdfdVec * _mdfd_blind_openseg(RelFileNode rfnode,
+                                      BlockNumber segno, int oflags);

  static int    _fdvec_alloc(void);
  static void _fdvec_free(int);
***************
*** 860,865 ****
--- 862,1004 ----
  }

  /*
+  *  mdrecreate() -- Create a file if it doesn't exist
+  *    Returns SM_SUCCESS if rfnode's file could be opened (O_CREAT makes
+  *    it when missing), SM_FAIL otherwise.  Called during XLOG recovery.
+  */
+ int
+ mdrecreate(RelFileNode rfnode)
+ {
+     int        fd;
+     char    *path;
+     /* the relpath() result is pfree'd below, once the open has been attempted */
+     path = relpath(rfnode);
+
+     fd = FileNameOpenFile(path, O_RDWR | O_CREAT | PG_BINARY, 0600);
+
+     pfree(path);
+     /* Only the outcome of the open matters here, so the file is closed again at once. */
+     if ( fd >= 0 )
+     {
+         FileClose(fd);
+         return SM_SUCCESS;
+     }
+     return SM_FAIL;
+ }
+ /* mdreunlink() -- mdunlink() for recovery: a failure with errno == ENOENT is reported as success. */
+ int
+ mdreunlink(RelFileNode rfnode)
+ {
+     int        res;
+
+     res = mdunlink(rfnode);
+     /* NOTE(review): assumes errno still holds the value set by the failing call inside mdunlink -- confirm */
+     if ( res != SM_SUCCESS && errno == ENOENT )
+     {
+         return SM_SUCCESS;
+     }
+     return res;
+ }
+
+ /*
+  *  mdretruncate() -- Redo a truncation of rfnode to nblcks blocks
+  *
+  *    Called during XLOG recovery.  Returns SM_SUCCESS if the relation is now
+  *    nblcks blocks long or was already no longer; SM_FAIL on open/truncate failure.
+  */
+ int
+ mdretruncate(RelFileNode rfnode, BlockNumber nblcks)
+ {
+     MdfdVec     *v;
+     BlockNumber realnb;
+     BlockNumber lastseg = 0;    /* number of the last segment found */
+     BlockNumber target = 0;     /* segment that will become the last one */
+     BlockNumber keep = nblcks;  /* blocks to keep in the target segment */
+     BlockNumber segno;
+
+     /* Find the size of the relation, and save that in realnb. */
+     for (;;)
+     {
+         /*
+          * Segments after the first are opened with O_CREAT, so an empty one
+          * may appear after a full last segment; unnecessary but harmless.
+          */
+         v = _mdfd_blind_openseg(rfnode, lastseg, lastseg > 0 ? O_CREAT : 0);
+         if ( v == (MdfdVec *) NULL )
+         {
+             elog(WARNING,
+                  "cannot count blocks for %u/%u.%u -- open failed: %m",
+                  rfnode.tblNode, rfnode.relNode, lastseg);
+             return SM_FAIL;
+         }
+         realnb = _mdnblocks(v->mdfd_vfd, BLCKSZ);
+         FileClose(v->mdfd_vfd);
+         pfree(v);
+ #ifndef LET_OS_MANAGE_FILESIZE
+         if (realnb > ((BlockNumber) RELSEG_SIZE))
+             elog(FATAL, "segment too big in mdnblocks!");
+         if (realnb == ((BlockNumber) RELSEG_SIZE))
+         {
+             lastseg++;        /* exactly full: look at the next segment */
+             continue;
+         }
+         realnb = (lastseg * ((BlockNumber) RELSEG_SIZE)) + realnb;
+ #endif
+         break;
+     }
+
+     if ( nblcks > realnb )
+     {
+         elog(WARNING,
+             "mdretruncate request to truncate rfnode of size %u blocks to %u",
+             realnb, nblcks);
+         return SM_SUCCESS; /* Yes, succeed here */
+     }
+     else if ( nblcks == realnb )
+         return SM_SUCCESS;
+
+ #ifndef LET_OS_MANAGE_FILESIZE
+     target = nblcks / RELSEG_SIZE;
+     keep = nblcks % RELSEG_SIZE;
+ #endif
+     /* Truncate the target segment; FileTruncate takes a length in bytes */
+     v = _mdfd_blind_openseg(rfnode, target, 0);
+     if ( v == (MdfdVec *) NULL )
+     {
+         elog(WARNING, "mdretruncate: cannot open %u/%u.%u: %m",
+                 rfnode.tblNode, rfnode.relNode, target);
+         return SM_FAIL;
+     }
+     if ( FileTruncate(v->mdfd_vfd, keep * BLCKSZ) < 0 )
+     {
+         elog(WARNING, "mdretruncate: FileTruncate failed (%d, %u/%u): %m",
+                 v->mdfd_vfd, target, keep);
+         FileClose(v->mdfd_vfd);
+         pfree(v);
+         return SM_FAIL;
+     }
+     FileClose(v->mdfd_vfd);
+     pfree(v);
+
+     /* Delete any trailing segments */
+     for ( segno = target + 1; segno <= lastseg; segno++ )
+     {
+         v = _mdfd_blind_openseg(rfnode, segno, 0);
+         if ( v == (MdfdVec *) NULL )
+         {
+             elog(WARNING, "mdretruncate: cannot open %u/%u.%u: %m",
+                     rfnode.tblNode, rfnode.relNode, segno);
+             return SM_FAIL;
+         }
+         FileTruncate(v->mdfd_vfd, 0);
+         FileUnlink(v->mdfd_vfd);
+         pfree(v);
+     }
+     return SM_SUCCESS;
+ }
+
+
+ /*
   *    _fdvec_alloc () -- grab a free (or new) md file descriptor vector.
   *
   */
***************
*** 932,937 ****
--- 1071,1123 ----
  }

  static MdfdVec *
+ _mdfd_blind_openseg(RelFileNode rfnode, BlockNumber segno, int oflags)
+ {
+     MdfdVec    *v;
+     int            fd;
+     char       *path,
+                *fullpath;
+     /* Opens segment segno of rfnode by path name; returns a new MdfdVec, or NULL if the open fails. */
+     /* segment 0 is the bare relpath() name; later segments get a ".segno" suffix */
+     path = relpath(rfnode);
+     /* be sure we have enough space for the '.segno', if any */
+     if (segno > 0)
+     {
+         fullpath = (char *) palloc(strlen(path) + 12);
+         sprintf(fullpath, "%s.%u", path, segno);
+         pfree(path);
+     }
+     else
+         fullpath = path;
+
+     /* open the file; oflags is OR'ed into O_RDWR | PG_BINARY */
+     fd = FileNameOpenFile(fullpath, O_RDWR | PG_BINARY | oflags, 0600);
+
+     pfree(fullpath);
+
+     if (fd < 0)
+         return (MdfdVec *) NULL;
+
+     /* allocate an mdfdvec entry for it, in MdCxt */
+     v = (MdfdVec *) MemoryContextAlloc(MdCxt, sizeof(MdfdVec));
+
+     /* fill the entry */
+     v->mdfd_vfd = fd;
+     v->mdfd_flags = (uint16) 0;
+ #ifndef LET_OS_MANAGE_FILESIZE
+     v->mdfd_chain = (MdfdVec *) NULL;
+     /* the new entry is not linked to any other segment's entry */
+ #ifdef DIAGNOSTIC
+     if (_mdnblocks(fd, BLCKSZ) > ((BlockNumber) RELSEG_SIZE))
+         elog(FATAL, "segment too big on openseg!");
+ #endif
+ #endif
+
+     /* all done */
+     return v;
+ }
+
+ static MdfdVec *
  _mdfd_openseg(Relation reln, BlockNumber segno, int oflags)
  {
      MdfdVec    *v;
*** ./src/backend/storage/smgr/smgr.c.orig    Sat Jun 22 17:31:55 2002
--- ./src/backend/storage/smgr/smgr.c    Tue Jul  9 15:37:33 2002
***************
*** 22,27 ****
--- 22,28 ----
  #include "storage/ipc.h"
  #include "storage/smgr.h"
  #include "utils/memutils.h"
+ #include "miscadmin.h"


  static void smgrshutdown(void);
***************
*** 51,61 ****
--- 52,91 ----
      int            (*smgr_commit) (void);    /* may be NULL */
      int            (*smgr_abort) (void);    /* may be NULL */
      int            (*smgr_sync) (void);
+     int            (*smgr_recreate) (RelFileNode rnode); /* may be NULL */
+     int            (*smgr_reunlink) (RelFileNode rnode); /* may be NULL */
+     int            (*smgr_retruncate) (RelFileNode rnode, BlockNumber nblocks);
+         /* may be NULL */
  } f_smgr;

+ /* Either a storage manager supports all the recovery operations,
+  * or none of them.  SMGRRecoveryUnsupported is true when all three
+  * hooks are NULL; 'which' is expanded once per hook in each macro. */
+ #define SMGRRecoveryUnsupported(which) \
+ ( \
+   smgrsw[which].smgr_recreate == NULL && \
+   smgrsw[which].smgr_reunlink == NULL && \
+   smgrsw[which].smgr_retruncate == NULL \
+ )
+ /* True when all three hooks (recreate, reunlink, retruncate) are set */
+ #define SMGRSupportsRecovery(which) \
+ ( \
+    smgrsw[which].smgr_recreate && \
+    smgrsw[which].smgr_reunlink && \
+    smgrsw[which].smgr_retruncate \
+ )
+ /* A table entry with only some of the hooks set trips this assertion */
+ #define ASSERT_SMGR_RECOVERY_VALID(which) \
+     Assert(SMGRSupportsRecovery(which) || SMGRRecoveryUnsupported(which))
+
  /*
   *    The weird placement of commas in this init block is to keep the compiler
   *    happy, regardless of what storage managers we have (or don't have).
+  *
+  *    WARNING:
+  *    Now that file create/delete/truncate are logged in xlog, changing
+  *    storage manager ID of a storage manager will BREAK BACKWARDS
+  *    COMPATIBILITY with previous log files!
   */

  static f_smgr smgrsw[] = {
***************
*** 63,76 ****
      /* magnetic disk */
      {mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose,
          mdread, mdwrite, mdflush, mdblindwrt, mdmarkdirty, mdblindmarkdirty,
!         mdnblocks, mdtruncate, mdcommit, mdabort, mdsync
!     },

  #ifdef STABLE_MEMORY_STORAGE
      /* main memory */
      {mminit, mmshutdown, mmcreate, mmunlink, mmextend, mmopen, mmclose,
          mmread, mmwrite, mmflush, mmblindwrt, mmmarkdirty, mmblindmarkdirty,
!     mmnblocks, NULL, mmcommit, mmabort},
  #endif
  };

--- 93,110 ----
      /* magnetic disk */
      {mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose,
          mdread, mdwrite, mdflush, mdblindwrt, mdmarkdirty, mdblindmarkdirty,
!         mdnblocks, mdtruncate, mdcommit, mdabort, mdsync,
!         mdrecreate, mdreunlink, mdretruncate
!     }

  #ifdef STABLE_MEMORY_STORAGE
+     ,
      /* main memory */
      {mminit, mmshutdown, mmcreate, mmunlink, mmextend, mmopen, mmclose,
          mmread, mmwrite, mmflush, mmblindwrt, mmmarkdirty, mmblindmarkdirty,
!         mmnblocks, NULL, mmcommit, mmabort,
!         NULL, NULL, NULL
!     }
  #endif
  };

***************
*** 173,181 ****
--- 207,241 ----
      int            fd;
      PendingRelDelete *pending;

+     if ( SMGRSupportsRecovery(which) )
+     {
+         XLogRecPtr        lsn;
+         XLogRecData        rdata;
+         xl_smgr_create    xlrec;
+
+         xlrec.xlsf.smgrid     = which;
+         xlrec.xlsf.rfnode    = reln->rd_node;
+
+         rdata.buffer         = InvalidBuffer;
+         rdata.data            = (char *) &xlrec;
+         rdata.len            = sizeof(xlrec);
+         rdata.next            = NULL;
+
+         /* Because of WAL, failure must be a fatal error.
+          */
+         START_CRIT_SECTION();
+
+         lsn = XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE, &rdata);
+
+         /* No need to flush, because file not visible until commit */
+     }
+
      if ((fd = (*(smgrsw[which].smgr_create)) (reln)) < 0)
          elog(ERROR, "cannot create %s: %m", RelationGetRelationName(reln));

+     if ( SMGRSupportsRecovery(which) )
+         { END_CRIT_SECTION(); }
+
      /* Add the relation to the list of stuff to delete at abort */
      pending = (PendingRelDelete *)
          MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
***************
*** 457,469 ****
   *    smgrtruncate() -- Truncate supplied relation to a specified number
   *                        of blocks
   *
!  *        Returns the number of blocks on success, aborts the current
!  *        transaction on failure.
   */
  BlockNumber
  smgrtruncate(int16 which, Relation reln, BlockNumber nblocks)
  {
!     BlockNumber newblks;

      newblks = nblocks;
      if (smgrsw[which].smgr_truncate)
--- 517,559 ----
   *    smgrtruncate() -- Truncate supplied relation to a specified number
   *                        of blocks
   *
!  *        Returns the number of blocks on success, aborts the entire
!  *        system on failure.
   */
  BlockNumber
  smgrtruncate(int16 which, Relation reln, BlockNumber nblocks)
  {
!     BlockNumber         newblks;
!     XLogRecPtr            lsn;
!     XLogRecData            rdata;
!     xl_smgr_truncate    xlrec;
!
!     if ( SMGRSupportsRecovery(which) )
!     {
!         xlrec.xlblkno         = nblocks;
!         xlrec.xlsf.smgrid    = which;
!         xlrec.xlsf.rfnode    = reln->rd_node;
!
!         rdata.buffer        = InvalidBuffer;
!         rdata.data            = (char *) &xlrec;
!         rdata.len            = sizeof(xlrec);
!         rdata.next            = NULL;
!     }
!
!     /* If truncate fails, we have to bring the system down.
!      * If we want the log to be infinitely replayable, then the
!      * operation must succeed sometime, either now or in recovery.
!      *
!      * That means that if an impossible request is logged here,
!      * and the request can't be detected as such before it is logged
!      * then the recoverability chain is broken and the user will have
!      * to reset the logs and do a full backup.
!      *
!      */
!
!     if ( (newblks = smgrnblocks(which, reln)) < nblocks )
!         elog(ERROR, "request to truncate file %u blocks long to %u blocks",
!                 newblks, nblocks);

      newblks = nblocks;
      if (smgrsw[which].smgr_truncate)
***************
*** 477,497 ****
--- 567,679 ----
                               nblocks, MaxBlockNumber,
                               0, NULL, NULL);

+         if ( SMGRSupportsRecovery(which) )
+         {
+             START_CRIT_SECTION();
+
+             lsn = XLogInsert(RM_SMGR_ID, XLOG_NO_TRAN|XLOG_SMGR_TRUNC, &rdata);
+
+             /* Must flush */
+             XLogFlush(lsn);
+         }
+
          newblks = (*(smgrsw[which].smgr_truncate)) (reln, nblocks);
          if (newblks == InvalidBlockNumber)
              elog(ERROR, "cannot truncate %s to %u blocks: %m",
                   RelationGetRelationName(reln), nblocks);
+
+         if ( SMGRSupportsRecovery(which) )
+         {
+             END_CRIT_SECTION();
+         }
      }

      return newblks;
  }

+
  /*
   * smgrDoPendingDeletes() -- take care of relation deletes at end of xact.
   */
  int
  smgrDoPendingDeletes(bool isCommit)
  {
+     PendingRelDelete    *prd;
+
+     /* If there is anything to delete, then
+      * make XLOG entries for REDO
+      *
+      * Note that this does not need to be in a critical section,
+      * because of the way we handle files (but it might leak files on crash).
+      */
+     if ( pendingDeletes )
+     {
+ #define SMGR_MAXLOGRECSZ    8192
+         XLogRecData        rdata;
+         xl_smgr_delete    *xlrec;
+         unsigned char    bdata[SMGR_MAXLOGRECSZ];
+         uint32            i;
+         uint32            nfmax;
+
+         /* Calculate the maximum number of xl_smgr_file's that
+          * will fit within our record size limit
+          */
+         nfmax = (SMGR_MAXLOGRECSZ - SizeOfXlSmgrDelete(0)) /
+                 (SizeOfXlSmgrDelete(1)-SizeOfXlSmgrDelete(0));
+
+         Assert(SizeOfXlSmgrDelete(nfmax) < SMGR_MAXLOGRECSZ &&
+                 nfmax > 1);
+
+ #undef  SMGR_MAXLOGRECSZ
+         /* Log the files to delete.
+          *
+          * If there are more than 'nfmax' files, then we break them
+          * into separate log records.
+          */
+         xlrec = (xl_smgr_delete *)bdata;
+
+         rdata.buffer = InvalidBuffer;
+         rdata.data     = (char *)xlrec;
+         rdata.next     = NULL;
+
+         for (i=0, prd = pendingDeletes;
+                 prd;
+                 prd = prd->next )
+         {
+             if ( prd->atCommit == isCommit )
+             {
+                 xlrec->xlsfiles[i].rfnode = prd->relnode;
+                 xlrec->xlsfiles[i].smgrid = prd->which;
+
+                 i++;
+
+                 if ( i >= nfmax )
+                 {
+                     Assert(i == nfmax);
+
+                     xlrec->xlnfiles = i;
+                     rdata.len = SizeOfXlSmgrDelete(xlrec->xlnfiles);
+
+                     XLogInsert(RM_SMGR_ID, XLOG_SMGR_DELETE|XLOG_NO_TRAN,
+                             &rdata);
+
+                     i = 0;
+                 }
+             }
+         }
+
+         /* Get the final chunk */
+         if ( i != 0 )
+         {
+             xlrec->xlnfiles = i;
+             rdata.len = SizeOfXlSmgrDelete(xlrec->xlnfiles);
+
+             XLogInsert(RM_SMGR_ID, XLOG_SMGR_DELETE|XLOG_NO_TRAN, &rdata);
+         }
+
+     }
+
+
      while (pendingDeletes != NULL)
      {
          PendingRelDelete *pending = pendingDeletes;
***************
*** 591,596 ****
--- 773,853 ----
      return SM_SUCCESS;
  }

+ /*
+  * smgrrecreate(), smgrreunlink(), smgrretruncate()
+  *
+  * Redo the appropriate filesystem operation during REDO recovery.
+  *
+  * Return SM_SUCCESS if successful, SM_FAIL if operation cannot
+  * be redone.
+  */
+
+ int
+ smgrrecreate(int16 which, RelFileNode rnode)
+ {
+     int ret;
+     /* A storage manager with no recovery hooks has nothing to redo: say so and report success. */
+     ASSERT_SMGR_RECOVERY_VALID(which);
+     if ( ! smgrsw[which].smgr_recreate )
+     {
+         elog(NOTICE,
+                 "recovery attempted on inappropriate storage manager type %d",
+                 (int)which);
+         return SM_SUCCESS;
+     }
+     /* SM_FAIL is 0, so failure must be detected by comparing with SM_SUCCESS, not with < 0. */
+     if ( (ret = smgrsw[which].smgr_recreate(rnode)) != SM_SUCCESS )
+         elog(WARNING, "recovery-creation failed on %s: %m",
+                  DatumGetCString(DirectFunctionCall1(smgrout,
+                                                  Int16GetDatum(which))));
+
+     return ret;
+ }
+ /* smgrreunlink() -- redo the deletion of rnode; returns the hook's SM_SUCCESS / SM_FAIL result. */
+ int
+ smgrreunlink(int16 which, RelFileNode rnode)
+ {
+     int ret;
+     /* A storage manager with no recovery hooks has nothing to redo: say so and report success. */
+     ASSERT_SMGR_RECOVERY_VALID(which);
+     if ( ! smgrsw[which].smgr_reunlink )
+     {
+         elog(NOTICE,
+                 "recovery attempted on inappropriate storage manager type %d",
+                 (int)which);
+         return SM_SUCCESS;
+     }
+     /* SM_FAIL is 0, so failure must be detected by comparing with SM_SUCCESS, not with < 0. */
+     if ( (ret = smgrsw[which].smgr_reunlink(rnode)) != SM_SUCCESS )
+         elog(WARNING, "recovery-deletion failed on %s: %m",
+                  DatumGetCString(DirectFunctionCall1(smgrout,
+                                                  Int16GetDatum(which))));
+
+     return ret;
+ }
+ /* smgrretruncate() -- redo a truncation of rnode to nblocks; returns the hook's SM_SUCCESS / SM_FAIL result. */
+ int
+ smgrretruncate(int16 which, RelFileNode rnode, BlockNumber nblocks)
+ {
+     int ret;
+     /* A storage manager with no recovery hooks has nothing to redo: say so and report success. */
+     ASSERT_SMGR_RECOVERY_VALID(which);
+     if ( ! smgrsw[which].smgr_retruncate )
+     {
+         elog(NOTICE,
+                 "recovery attempted on inappropriate storage manager type %d",
+                 (int)which);
+         return SM_SUCCESS;
+     }
+     /* SM_FAIL is 0, so failure must be detected by comparing with SM_SUCCESS, not with < 0. */
+     if ( (ret = smgrsw[which].smgr_retruncate(rnode, nblocks)) != SM_SUCCESS )
+         elog(WARNING, "recovery-truncate failed on %s: %m",
+                  DatumGetCString(DirectFunctionCall1(smgrout,
+                                                  Int16GetDatum(which))));
+
+     return ret;
+ }
+
  #ifdef NOT_USED
  bool
  smgriswo(int16 smgrno)
***************
*** 605,610 ****
--- 862,919 ----
  void
  smgr_redo(XLogRecPtr lsn, XLogRecord *record)
  {
+     uint8        smgrinfo = record->xl_info & ~XLR_INFO_MASK;
+     /* Replay one SMGR record, chosen by the xl_info bits outside XLR_INFO_MASK; any failure is a PANIC. */
+     if ( smgrinfo == XLOG_SMGR_CREATE )
+     {
+         xl_smgr_create        *xlrec =
+             (xl_smgr_create *)XLogRecGetData(record);
+
+         if ( smgrrecreate(xlrec->xlsf.smgrid, xlrec->xlsf.rfnode)
+                 != SM_SUCCESS )
+         {
+             elog(PANIC, "Unable to create file (tblNode=%u, relNode=%u)",
+                     xlrec->xlsf.rfnode.tblNode,
+                     xlrec->xlsf.rfnode.relNode);
+         }
+     }
+     else if ( smgrinfo == XLOG_SMGR_TRUNC )
+     {
+         xl_smgr_truncate    *xlrec =
+             (xl_smgr_truncate *)XLogRecGetData(record);
+
+         if ( smgrretruncate(xlrec->xlsf.smgrid, xlrec->xlsf.rfnode,
+                         xlrec->xlblkno) != SM_SUCCESS )
+         {
+             elog(PANIC, "Unable to truncate file (tblNode=%u, relNode=%u)\n"
+                         "\tto BlockNumber %u",
+                         xlrec->xlsf.rfnode.tblNode,
+                         xlrec->xlsf.rfnode.relNode,
+                         xlrec->xlblkno);
+         }
+     }
+     else if ( smgrinfo == XLOG_SMGR_DELETE )
+     {
+         uint32                i;
+         xl_smgr_delete        *xlrec =
+             (xl_smgr_delete *)XLogRecGetData(record);
+         /* one record may name several files; each is unlinked in turn */
+         for ( i=0; i<xlrec->xlnfiles; i++ )
+         {
+             if ( smgrreunlink(xlrec->xlsfiles[i].smgrid,
+                                  xlrec->xlsfiles[i].rfnode) != SM_SUCCESS)
+             {
+                 elog(PANIC, "Unable to unlink file node %u/%u",
+                         xlrec->xlsfiles[i].rfnode.tblNode,
+                         xlrec->xlsfiles[i].rfnode.relNode);
+             }
+         }
+
+     }
+     else
+     {
+         elog(PANIC, "Unknown log record type for SMGR");
+     }
  }

  void
***************
*** 615,618 ****
--- 924,970 ----
  void
  smgr_desc(char *buf, uint8 xl_info, char *rec)
  {
+     uint8        smgrinfo = xl_info & ~XLR_INFO_MASK;
+     /* Describe one SMGR record.  The text is appended: rm_desc callers have already written a prefix into buf. */
+     if ( smgrinfo == XLOG_SMGR_CREATE )
+     {
+         xl_smgr_create        *xlrec = (xl_smgr_create *)rec;
+
+         sprintf(buf + strlen(buf),
+                 "SMGR Redo Create File: tblNode=%u, relNode=%u, smgrid=%d",
+                 xlrec->xlsf.rfnode.tblNode,
+                 xlrec->xlsf.rfnode.relNode,
+                 (int) xlrec->xlsf.smgrid);
+     }
+     else if ( smgrinfo == XLOG_SMGR_TRUNC )
+     {
+         xl_smgr_truncate    *xlrec = (xl_smgr_truncate *)rec;
+
+         sprintf(buf + strlen(buf),
+                 "SMGR Redo Truncate File: tblNode=%u, relNode=%u, smgrid=%d,"
+                 " BlockNumber=%u",
+                 xlrec->xlsf.rfnode.tblNode,
+                 xlrec->xlsf.rfnode.relNode,
+                 (int) xlrec->xlsf.smgrid,
+                 xlrec->xlblkno);
+     }
+     else if ( smgrinfo == XLOG_SMGR_DELETE )
+     {
+         xl_smgr_delete        *xlrec = (xl_smgr_delete *)rec;
+
+         sprintf(buf + strlen(buf),
+                 "SMGR Redo Delete Files: %u files to delete",
+                 xlrec->xlnfiles);
+     }
+     else
+     {
+         sprintf(buf + strlen(buf), "SMGR Unknown Operation ID %d", (int) smgrinfo);
+     }
  }
+
+
+
+
+
+
+
*** ./src/backend/utils/misc/guc.c.orig    Sat Jun 15 20:09:12 2002
--- ./src/backend/utils/misc/guc.c    Fri Jul  5 01:47:17 2002
***************
*** 800,805 ****
--- 800,810 ----
      },

      {
+         { "wal_archive_dest", PGC_POSTMASTER }, &XLOG_archive_dir,
+         XLOG_archive_dir_default, assign_xlog_archive_dir, NULL
+     },
+
+     {
          { NULL, 0 }, NULL, NULL, NULL, NULL
      }
  };
*** ./src/include/access/xlog.h.orig    Sat Jun 22 17:32:02 2002
--- ./src/include/access/xlog.h    Fri Jul  5 01:50:07 2002
***************
*** 188,193 ****
--- 188,195 ----
  extern int    XLOG_DEBUG;
  extern char *XLOG_sync_method;
  extern const char XLOG_sync_method_default[];
+ extern char *XLOG_archive_dir;
+ extern const char XLOG_archive_dir_default[];


  extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
***************
*** 216,221 ****
--- 218,225 ----
   */
  extern XLogRecPtr GetUndoRecPtr(void);

+ extern const char *assign_xlog_archive_dir(const char *dir,
+                                            bool doit, bool interactive);
  extern const char *assign_xlog_sync_method(const char *method,
                                             bool doit, bool interactive);

*** ./src/include/storage/smgr.h.orig    Sat Jun 22 17:32:06 2002
--- ./src/include/storage/smgr.h    Tue Jul  9 15:14:39 2002
***************
*** 19,24 ****
--- 19,66 ----
  #include "storage/block.h"
  #include "utils/rel.h"

+ typedef struct xl_smgr_file {
+     RelFileNode        rfnode;        /* file the operation applies to */
+     int16            smgrid;        /* storage manager: index into smgrsw[] */
+ } xl_smgr_file;
+ #define SizeOfXlSmgrFile    (offsetof(xl_smgr_file, smgrid) + sizeof(int16))
+
+ /* All info needed to re-create the file in recovery */
+ typedef struct xl_smgr_create {
+     xl_smgr_file    xlsf;
+ } xl_smgr_create;
+
+ #define SizeOfXlSmgrCreate    (offsetof(xl_smgr_create, xlsf) + SizeOfXlSmgrFile)
+
+ typedef struct xl_smgr_truncate {
+     BlockNumber        xlblkno;    /* length to truncate the file to, in blocks */
+     xl_smgr_file    xlsf;
+ } xl_smgr_truncate;
+
+ /* All info needed to re-delete a group of files.
+  * Written as XLOG_SMGR_DELETE records by smgrDoPendingDeletes() in smgr.c.
+  */
+ typedef struct xl_smgr_delete {
+     uint32            xlnfiles;
+     xl_smgr_file    xlsfiles[1]; /* VARIABLE SIZE */
+ } xl_smgr_delete;
+
+ #define SizeOfBaseXlSmgrDelete (offsetof(xl_smgr_delete, xlnfiles) + \
+                                     sizeof(uint32) )
+
+ #define SizeOfXlSmgrDelete(nfiles) \
+ ( \
+    (!(nfiles))? SizeOfBaseXlSmgrDelete : \
+            offsetof(xl_smgr_delete, xlsfiles) + \
+         (nfiles) * sizeof(xl_smgr_file) \
+ )
+
+ #define GetSizeOfXlSmgrDeleteRec(rec)    SizeOfXlSmgrDelete((rec)->xlnfiles)
+
+ /* XLOG gives us high 4 bits */
+ #define XLOG_SMGR_CREATE    0x10
+ #define XLOG_SMGR_TRUNC        0x20
+ #define XLOG_SMGR_DELETE    0x30

  #define SM_FAIL            0
  #define SM_SUCCESS        1
***************
*** 50,55 ****
--- 92,101 ----
  extern int    smgrcommit(void);
  extern int    smgrabort(void);
  extern int    smgrsync(void);
+ extern int    smgrrecreate(int16 which, RelFileNode rnode);
+ extern int    smgrreunlink(int16 which, RelFileNode rnode);
+ extern int    smgrretruncate(int16 which, RelFileNode rnode,
+                             BlockNumber nblocks);

  extern void smgr_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void smgr_undo(XLogRecPtr lsn, XLogRecord *record);
***************
*** 77,82 ****
--- 123,131 ----
  extern int    mdcommit(void);
  extern int    mdabort(void);
  extern int    mdsync(void);
+ extern int    mdrecreate(RelFileNode rnode);
+ extern int    mdreunlink(RelFileNode rnode);
+ extern int    mdretruncate(RelFileNode rnode, BlockNumber nblocks);

  /* mm.c */
  extern int    mminit(void);

pgsql-patches by date:

Previous
From: "eggli"
Date:
Subject: New Full Text Index using contrib/fulltextindex which now able to processing Traditional Chinese characters(Big5 encoding)
Next
From: nconway@klamath.dyndns.org (Neil Conway)
Date:
Subject: Re: UNIQUE predicate