Re: Make CLUSTER MVCC-safe - Mailing list pgsql-patches

From Heikki Linnakangas
Subject Re: Make CLUSTER MVCC-safe
Date
Msg-id 460B7B61.5090902@enterprisedb.com
Whole thread Raw
In response to Make CLUSTER MVCC-safe  (Heikki Linnakangas <heikki@enterprisedb.com>)
Responses Re: Make CLUSTER MVCC-safe
Re: Make CLUSTER MVCC-safe
Re: Make CLUSTER MVCC-safe
List pgsql-patches
Here's an update that fixes a conflict caused by Tom's recent commit of
Simon's patch to skip WAL-inserts when archiving is not enabled.

Heikki Linnakangas wrote:
> This patch makes CLUSTER MVCC-safe. Visibility information and update
> chains are preserved like in VACUUM FULL.
>
> I created a new generic rewriteheap-facility to handle rewriting tables
> in a visibility-preserving manner. All the update chain tracking is done
> in rewriteheap.c, the caller is responsible for supplying the stream of
> tuples.
>
> CLUSTER is currently the only user of the facility, but I'm envisioning
> we might have other users in the future. For example, a version of
> VACUUM FULL that rewrites the whole table. We could also use it to make
> ALTER TABLE MVCC-safe, but there's some issues with that. For example,
> what to do if RECENTLY_DEAD tuples don't satisfy a newly added constraint.
>
> One complication in the implementation was the fact that heap_insert
> overwrites the visibility information, and it doesn't write the full
> tuple header to WAL. I ended up implementing a special-purpose
> raw_heap_insert function instead, which is optimized for bulk inserting
> a lot of tuples, knowing that we have exclusive access to the heap.
> raw_heap_insert keeps the current buffer locked over calls, until it
> gets full, and inserts the whole page to WAL as a single record using
> the existing XLOG_HEAP_NEWPAGE record type.
>
> This makes CLUSTER a more viable alternative to VACUUM FULL. One
> motivation for making CLUSTER MVCC-safe is that if some poor soul runs
> pg_dump to make a backup concurrently with CLUSTER, the clustered tables
> will appear to be empty in the dump file.
>
> The documentation doesn't say anything about CLUSTER not being MVCC-safe, so
> I suppose there's no need to touch the docs. I sent a doc patch earlier
> to add a note about it, that doc patch should still be applied to older
> release branches, IMO.

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com
Index: src/backend/access/heap/heapam.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/heap/heapam.c,v
retrieving revision 1.230
diff -c -r1.230 heapam.c
*** src/backend/access/heap/heapam.c    29 Mar 2007 00:15:37 -0000    1.230
--- src/backend/access/heap/heapam.c    29 Mar 2007 08:27:21 -0000
***************
*** 3280,3285 ****
--- 3280,3322 ----
      return log_heap_update(reln, oldbuf, from, newbuf, newtup, true);
  }

+ /*
+  * Perform XLogInsert of a full page image to WAL. Caller must make sure
+  * the page contents on disk are consistent with the page inserted to WAL.
+  */
+ XLogRecPtr
+ log_newpage(RelFileNode *rnode, BlockNumber blkno, Page page)
+ {
+     xl_heap_newpage xlrec;
+     XLogRecPtr    recptr;
+     XLogRecData rdata[2];
+
+     /* NO ELOG(ERROR) from here till newpage op is logged */
+     START_CRIT_SECTION();
+
+     xlrec.node = *rnode;
+     xlrec.blkno = blkno;
+
+     rdata[0].data = (char *) &xlrec;
+     rdata[0].len = SizeOfHeapNewpage;
+     rdata[0].buffer = InvalidBuffer;
+     rdata[0].next = &(rdata[1]);
+
+     rdata[1].data = (char *) page;
+     rdata[1].len = BLCKSZ;
+     rdata[1].buffer = InvalidBuffer;
+     rdata[1].next = NULL;
+
+     recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
+
+     PageSetLSN(page, recptr);
+     PageSetTLI(page, ThisTimeLineID);
+
+     END_CRIT_SECTION();
+
+     return recptr;
+ }
+
  static void
  heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
  {
Index: src/backend/access/nbtree/nbtsort.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/nbtree/nbtsort.c,v
retrieving revision 1.110
diff -c -r1.110 nbtsort.c
*** src/backend/access/nbtree/nbtsort.c    9 Jan 2007 02:14:10 -0000    1.110
--- src/backend/access/nbtree/nbtsort.c    20 Mar 2007 17:58:20 -0000
***************
*** 64,69 ****
--- 64,70 ----

  #include "postgres.h"

+ #include "access/heapam.h"
  #include "access/nbtree.h"
  #include "miscadmin.h"
  #include "storage/smgr.h"
***************
*** 265,296 ****
      if (wstate->btws_use_wal)
      {
          /* We use the heap NEWPAGE record type for this */
!         xl_heap_newpage xlrec;
!         XLogRecPtr    recptr;
!         XLogRecData rdata[2];
!
!         /* NO ELOG(ERROR) from here till newpage op is logged */
!         START_CRIT_SECTION();
!
!         xlrec.node = wstate->index->rd_node;
!         xlrec.blkno = blkno;
!
!         rdata[0].data = (char *) &xlrec;
!         rdata[0].len = SizeOfHeapNewpage;
!         rdata[0].buffer = InvalidBuffer;
!         rdata[0].next = &(rdata[1]);
!
!         rdata[1].data = (char *) page;
!         rdata[1].len = BLCKSZ;
!         rdata[1].buffer = InvalidBuffer;
!         rdata[1].next = NULL;
!
!         recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
!
!         PageSetLSN(page, recptr);
!         PageSetTLI(page, ThisTimeLineID);
!
!         END_CRIT_SECTION();
      }
      else
      {
--- 266,272 ----
      if (wstate->btws_use_wal)
      {
          /* We use the heap NEWPAGE record type for this */
!         log_newpage(&wstate->index->rd_node, blkno, page);
      }
      else
      {
Index: src/backend/commands/Makefile
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/Makefile,v
retrieving revision 1.35
diff -c -r1.35 Makefile
*** src/backend/commands/Makefile    20 Jan 2007 17:16:11 -0000    1.35
--- src/backend/commands/Makefile    20 Mar 2007 17:04:58 -0000
***************
*** 16,22 ****
      conversioncmds.o copy.o \
      dbcommands.o define.o explain.o functioncmds.o \
      indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \
!     portalcmds.o prepare.o proclang.o \
      schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \
      typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o

--- 16,22 ----
      conversioncmds.o copy.o \
      dbcommands.o define.o explain.o functioncmds.o \
      indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \
!     rewriteheap.o portalcmds.o prepare.o proclang.o \
      schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \
      typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o

Index: src/backend/commands/cluster.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/cluster.c,v
retrieving revision 1.158
diff -c -r1.158 cluster.c
*** src/backend/commands/cluster.c    29 Mar 2007 00:15:37 -0000    1.158
--- src/backend/commands/cluster.c    29 Mar 2007 08:28:57 -0000
***************
*** 20,25 ****
--- 20,26 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/xact.h"
+ #include "access/transam.h"
  #include "catalog/catalog.h"
  #include "catalog/dependency.h"
  #include "catalog/heap.h"
***************
*** 36,41 ****
--- 37,44 ----
  #include "utils/memutils.h"
  #include "utils/syscache.h"
  #include "utils/relcache.h"
+ #include "storage/procarray.h"
+ #include "commands/rewriteheap.h"


  /*
***************
*** 653,659 ****
      char       *nulls;
      IndexScanDesc scan;
      HeapTuple    tuple;
!     CommandId    mycid = GetCurrentCommandId();
      bool        use_wal;

      /*
--- 656,663 ----
      char       *nulls;
      IndexScanDesc scan;
      HeapTuple    tuple;
!     TransactionId OldestXmin;
!     RewriteState rwstate;
      bool        use_wal;

      /*
***************
*** 677,682 ****
--- 681,689 ----
      nulls = (char *) palloc(natts * sizeof(char));
      memset(nulls, 'n', natts * sizeof(char));

+     /* Get an OldestXmin to use to weed out dead tuples */
+     OldestXmin = GetOldestXmin(OldHeap->rd_rel->relisshared, false);
+
      /*
       * We need to log the copied data in WAL iff WAL archiving is enabled AND
       * it's not a temp rel.  (Since we know the target relation is new and
***************
*** 688,724 ****
      /* use_wal off requires rd_targblock be initially invalid */
      Assert(NewHeap->rd_targblock == InvalidBlockNumber);

      /*
       * Scan through the OldHeap on the OldIndex and copy each tuple into the
       * NewHeap.
       */
      scan = index_beginscan(OldHeap, OldIndex,
!                            SnapshotNow, 0, (ScanKey) NULL);

      while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
      {
          /*
           * We cannot simply pass the tuple to heap_insert(), for several
           * reasons:
           *
!          * 1. heap_insert() will overwrite the commit-status fields of the
!          * tuple it's handed.  This would trash the source relation, which is
!          * bad news if we abort later on.  (This was a bug in releases thru
!          * 7.0)
!          *
!          * 2. We'd like to squeeze out the values of any dropped columns, both
           * to save space and to ensure we have no corner-case failures. (It's
           * possible for example that the new table hasn't got a TOAST table
           * and so is unable to store any large values of dropped cols.)
           *
!          * 3. The tuple might not even be legal for the new table; this is
           * currently only known to happen as an after-effect of ALTER TABLE
           * SET WITHOUT OIDS.
           *
           * So, we must reconstruct the tuple from component Datums.
           */
-         HeapTuple    copiedTuple;
-         int            i;

          heap_deformtuple(tuple, oldTupDesc, values, nulls);

--- 695,743 ----
      /* use_wal off requires rd_targblock be initially invalid */
      Assert(NewHeap->rd_targblock == InvalidBlockNumber);

+     /* Initialize the rewrite operation */
+     rwstate = begin_heap_rewrite(NewHeap, OldestXmin, use_wal);
+
      /*
       * Scan through the OldHeap on the OldIndex and copy each tuple into the
       * NewHeap.
       */
      scan = index_beginscan(OldHeap, OldIndex,
!                            SnapshotAny, 0, (ScanKey) NULL);

      while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
      {
+         HeapTuple    copiedTuple;
+         int            i;
+         bool        isdead;
+
+         LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
+         isdead = HeapTupleSatisfiesVacuum(tuple->t_data,
+                                           OldestXmin,
+                                           scan->xs_cbuf) == HEAPTUPLE_DEAD;
+         LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
+
+         if(isdead)
+         {
+             rewrite_heap_dead_tuple(rwstate, tuple);
+             continue;
+         }
+
          /*
           * We cannot simply pass the tuple to heap_insert(), for several
           * reasons:
           *
!          * 1. We'd like to squeeze out the values of any dropped columns, both
           * to save space and to ensure we have no corner-case failures. (It's
           * possible for example that the new table hasn't got a TOAST table
           * and so is unable to store any large values of dropped cols.)
           *
!          * 2. The tuple might not even be legal for the new table; this is
           * currently only known to happen as an after-effect of ALTER TABLE
           * SET WITHOUT OIDS.
           *
           * So, we must reconstruct the tuple from component Datums.
           */

          heap_deformtuple(tuple, oldTupDesc, values, nulls);

***************
*** 735,747 ****
          if (NewHeap->rd_rel->relhasoids)
              HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));

!         heap_insert(NewHeap, copiedTuple, mycid, use_wal, false);

          heap_freetuple(copiedTuple);

          CHECK_FOR_INTERRUPTS();
      }

      index_endscan(scan);

      pfree(values);
--- 754,768 ----
          if (NewHeap->rd_rel->relhasoids)
              HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));

!         rewrite_heap_tuple(rwstate, tuple, copiedTuple);

          heap_freetuple(copiedTuple);

          CHECK_FOR_INTERRUPTS();
      }

+     end_heap_rewrite(rwstate);
+
      index_endscan(scan);

      pfree(values);
Index: src/backend/commands/rewriteheap.c
===================================================================
RCS file: src/backend/commands/rewriteheap.c
diff -N src/backend/commands/rewriteheap.c
*** /dev/null    1 Jan 1970 00:00:00 -0000
--- src/backend/commands/rewriteheap.c    29 Mar 2007 08:31:00 -0000
***************
*** 0 ****
--- 1,543 ----
+ /*-------------------------------------------------------------------------
+  *
+  * rewriteheap.c
+  *      support functions to rewrite tables.
+  *
+  * These functions provide a facility to completely rewrite a heap, while
+  * preserving visibility information and update chains.
+  *
+  *
+  * INTERFACE
+  *
+  * The caller is responsible for creating the new heap, all catalog
+  * changes, supplying the tuples to be written to the new heap, and
+  * rebuilding indexes. The caller must hold an AccessExclusiveLock on the
+  * table.
+  *
+  * To use the facility:
+  *
+  * begin_heap_rewrite
+  * while(fetch next tuple)
+  * {
+  *     if(tuple is dead)
+  *         rewrite_heap_dead_tuple
+  *     else
+  *     {
+  *         do any transformations here if required
+  *         rewrite_heap_tuple
+  *     }
+  * }
+  * end_heap_rewrite
+  *
+  * The rewrite facility holds the current page it's inserting into pinned
+  * and locked over rewrite_heap_tuple calls. To avoid deadlock, the caller
+  * must not access the new relation while a rewrite is in progress.
+  *
+  * The contents of the new relation shouldn't be relied on until
+  * end_heap_rewrite is called.
+  *
+  *
+  * IMPLEMENTATION
+  *
+  * While we copy the tuples, we need to keep track of ctid-references to
+  * handle update chains correctly. Since the new versions of tuples will
+  * have different ctids than the original ones, if we just copied
+  * everything to the new table the ctid pointers would be wrong. For each
+  * ctid reference from A -> B, we can encounter either A first or B first:
+  *
+  * If we encounter A first, we'll store the tuple in the unresolved_ctids
+  * hash table. When we later encounter B, we remove A from the hash table,
+  * fix the ctid to point to the new location of B, and insert both A and B
+  * to the new heap.
+  *
+  * If we encounter B first, we'll put an entry in the old_new_tid_map hash
+  * table and insert B to the new heap right away. When we later encounter
+  * A, we get the new location of B from the table.
+  *
+  * Note that a tuple in the middle of a chain is both A and B at the same
+  * time. Entries in the hash tables can be removed as soon as the later
+  * tuple is encountered. That helps to keep the memory usage down. At the
+  * end, both tables are usually empty; we should have encountered both A
+  * and B for each ctid reference. However, if there's a chain like this:
+  * RECENTLY_DEAD -> DEAD -> LIVE as determined by HeapTupleSatisfiesVacuum,
+  * we can encounter the dead tuple first, and skip it, and bump into the
+  * RECENTLY_DEAD tuple later. The RECENTLY_DEAD tuple would be added
+  * to unresolved_ctids, and stay there until end of the rewrite.
+  *
+  * Using in-memory hash tables means that we use some memory for each live
+  * update chain in the table, from the time we find one end of the
+  * reference until we find the other end. That shouldn't be a problem in
+  * practice, but if you do something like an UPDATE without a where-clause
+  * on a large table, and then run CLUSTER in the same transaction, you
+  * might run out of memory. It doesn't seem worthwhile to add support to
+  * spill-to-disk, there shouldn't be that many RECENTLY_DEAD tuples in a
+  * table under normal circumstances, and even if we do fail halfway through
+  * a CLUSTER, the old table is still valid.
+  *
+  * We can't use the normal heap_insert function to insert into the new
+  * heap, because heap_insert overwrites the visibility information.
+  * We use a special-purpose raw_heap_insert function instead, which
+  * is optimized for bulk inserting a lot of tuples, knowing that we have
+  * exclusive access to the heap. raw_heap_insert keeps the current buffer
+  * we're inserting into locked over calls, until it gets full. The page
+  * is inserted to WAL as a single record.
+  *
+  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994-5, Regents of the University of California
+  *
+  * IDENTIFICATION
+  *      $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+
+ #include "access/heapam.h"
+ #include "access/transam.h"
+ #include "access/tuptoaster.h"
+ #include "commands/rewriteheap.h"
+ #include "utils/memutils.h"
+
+ /*
+  * State associated with a rewrite operation. This is opaque to the user
+  * of the rewrite facility.
+  */
+ typedef struct RewriteStateData
+ {
+     Relation        rs_new_rel;            /* new heap */
+     bool            rs_use_wal;            /* WAL-log inserts to new heap? */
+     MemoryContext    rs_cxt;                /* for hash tables and entries and
+                                          * tuples in them */
+     TransactionId    rs_oldest_xmin;        /* oldest xmin used by caller to
+                                          * determine tuple visibility */
+     Buffer            rs_buf;                /* current buffer we're inserting to,
+                                          * exclusively locked; or InvalidBuffer */
+     HTAB           *rs_unresolved_ctids;    /* tuples whose t_ctid target is not yet copied */
+     HTAB           *rs_old_new_tid_map;    /* old TID -> new TID of already-copied tuples */
+ } RewriteStateData;
+
+ /* entry structures for the hash tables */
+
+ typedef struct UnresolvedCtidData {
+     ItemPointerData old_ctid;    /* hash key: t_ctid of the tuple in old heap */
+     ItemPointerData old_tid;    /* the tuple's own location in the old heap */
+     HeapTuple tuple;            /* copy of the rewritten tuple, not yet inserted */
+ } UnresolvedCtidData;
+
+ typedef UnresolvedCtidData *UnresolvedCtid;
+
+ typedef struct OldToNewMappingData {
+     ItemPointerData old_tid;    /* hash key: location in the old heap */
+     ItemPointerData new_tid;    /* location in the new heap */
+ } OldToNewMappingData;
+
+ typedef OldToNewMappingData *OldToNewMapping;
+
+
+ /* prototypes for internal functions */
+ static void raw_heap_insert(RewriteState state, HeapTuple tup);
+
+
+ /*
+  * Begin a rewrite of a table
+  *
+  * new_heap        new, empty heap relation to insert tuples to
+  * oldest_xmin    xid used by the caller to determine which tuples
+  *                are dead.
+  * use_wal        should the inserts to the new heap be WAL-logged
+  *
+  * Returns an opaque RewriteState, allocated in current memory context,
+  * to be used in subsequent calls to the other functions.
+  */
+ RewriteState
+ begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin,
+                    bool use_wal)
+ {
+     RewriteState state;
+     HASHCTL        hash_ctl;
+
+     state = palloc(sizeof(RewriteStateData));
+     state->rs_cxt = AllocSetContextCreate(CurrentMemoryContext,
+                                           "Table rewrite",
+                                           ALLOCSET_DEFAULT_MINSIZE,
+                                           ALLOCSET_DEFAULT_INITSIZE,
+                                           ALLOCSET_DEFAULT_MAXSIZE);
+
+     state->rs_oldest_xmin = oldest_xmin;
+     state->rs_new_rel = new_heap;
+     state->rs_use_wal = use_wal;
+     state->rs_buf = InvalidBuffer;
+
+     /* Initialize hash tables used to track update chains */
+     memset(&hash_ctl, 0, sizeof(hash_ctl));
+     hash_ctl.keysize = sizeof(ItemPointerData);
+     hash_ctl.entrysize = sizeof(UnresolvedCtidData);
+     hash_ctl.hcxt = state->rs_cxt;
+     hash_ctl.hash = tag_hash;
+
+     state->rs_unresolved_ctids =
+         hash_create("Rewrite / Unresolved ctids",
+                     128, /* arbitrary initial size */
+                     &hash_ctl,
+                     HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+     hash_ctl.entrysize = sizeof(OldToNewMappingData);    /* other settings are reused */
+
+     state->rs_old_new_tid_map =
+         hash_create("Rewrite / Old to new tid map",
+                     128, /* arbitrary initial size */
+                     &hash_ctl,
+                     HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+     return state;
+ }
+
+ /*
+  * End a rewrite.
+  *
+  * state and any other resources are freed.
+  */
+ void
+ end_heap_rewrite(RewriteState state)
+ {
+     HASH_SEQ_STATUS seq_status;
+     UnresolvedCtid unresolved;
+
+     /* Write any remaining tuples in the unresolved_ctids table. If we have
+      * any left, they should in fact be dead, but let's err on the safe
+      * side.
+      */
+     hash_seq_init(&seq_status, state->rs_unresolved_ctids);
+
+     while((unresolved = hash_seq_search(&seq_status)) != NULL)
+     {
+         ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
+         raw_heap_insert(state, unresolved->tuple);
+
+         /* don't bother removing and pfreeing the entry, we're going
+          * to free the whole memory context after the scan anyway
+          */
+     }
+
+     /* WAL-log and unlock the last page */
+     if(BufferIsValid(state->rs_buf))
+     {
+         if(state->rs_use_wal)
+             log_newpage(&state->rs_new_rel->rd_node,
+                         BufferGetBlockNumber(state->rs_buf),
+                         BufferGetPage(state->rs_buf));
+
+         MarkBufferDirty(state->rs_buf);
+
+         UnlockReleaseBuffer(state->rs_buf);
+         state->rs_buf = InvalidBuffer;
+     }
+
+     MemoryContextDelete(state->rs_cxt);
+     pfree(state);
+ }
+
+ /*
+  * Register a dead tuple with an ongoing rewrite. Dead tuples are not
+  * copied to the new table, but we still make note of them so that we
+  * can release some resources earlier.
+  */
+ void
+ rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
+ {
+     /* If we have already seen an earlier tuple in the update chain that
+      * points to this tuple, let's forget about that earlier tuple. It's
+      * in fact dead as well, our simple xmax < OldestXmin test in
+      * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It
+      * happens when xmin of a tuple is greater than xmax, which sounds
+      * counter-intuitive but is perfectly valid.
+      *
+      * We don't bother to try to detect the situation the other way
+      * round, when we encounter the dead tuple first and then the
+      * recently dead one that points to it. If that happens, we'll
+      * have some unmatched entries in the unresolved_ctids hash table
+      * at the end. That can happen anyway, because a vacuum might
+      * have removed the dead tuple in the chain before us.
+      */
+     hash_search(state->rs_unresolved_ctids,
+                 (void *) &(old_tuple->t_self),
+                 HASH_REMOVE, NULL);
+ }
+
+ /*
+  * Add a tuple to the new heap.
+  *
+  * Visibility information is copied from the original tuple.
+  *
+  * state        opaque state as returned by begin_heap_rewrite
+  * old_tuple    original tuple in the old heap
+  * new_tuple    new, rewritten tuple to be inserted to new heap
+  */
+ void
+ rewrite_heap_tuple(RewriteState state,
+                    HeapTuple old_tuple, HeapTuple new_tuple)
+ {
+     MemoryContext saved_cxt;
+     ItemPointerData old_tid;
+
+     saved_cxt = MemoryContextSwitchTo(state->rs_cxt);
+
+     /* Preserve the visibility information. Also preserve the HEAP_UPDATED
+      * flag; if the original tuple was an updated row version, so is the
+      * new one.
+      */
+     memcpy(&new_tuple->t_data->t_choice.t_heap,
+            &old_tuple->t_data->t_choice.t_heap,
+            sizeof(HeapTupleFields));
+
+     new_tuple->t_data->t_infomask &= ~(HEAP_XACT_MASK | HEAP_UPDATED);
+     new_tuple->t_data->t_infomask |=
+         old_tuple->t_data->t_infomask & (HEAP_XACT_MASK | HEAP_UPDATED);
+
+     /* Invalid ctid means that ctid should point to the tuple itself.
+      * We'll override it below if the tuple is part of an update chain.
+      */
+     ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
+
+     /* When we encounter a tuple that has been updated, check
+      * the old-to-new mapping hash table. If a match is found there,
+      * we've already copied the tuple that t_ctid points to, so we
+      * set the ctid of this tuple to point to the new location, and insert
+      * the new tuple. Otherwise we store the tuple in the unresolved_ctids
+      * hash table and deal with it later, after inserting the tuple t_ctid
+      * points to.
+      */
+     if (!(old_tuple->t_data->t_infomask & (HEAP_XMAX_INVALID |
+                                        HEAP_IS_LOCKED)) &&
+         !(ItemPointerEquals(&(old_tuple->t_self),
+                             &(old_tuple->t_data->t_ctid))))
+     {
+         OldToNewMapping mapping;
+
+         mapping = (OldToNewMapping)
+             hash_search(state->rs_old_new_tid_map,
+                         (void *) &(old_tuple->t_data->t_ctid),
+                         HASH_FIND, NULL);
+
+         if (mapping != NULL)
+         {
+             /* We've already copied the tuple t_ctid points to.
+              * Use the new tid of that tuple */
+             new_tuple->t_data->t_ctid = mapping->new_tid;
+
+             hash_search(state->rs_old_new_tid_map,
+                         (void *) &(old_tuple->t_data->t_ctid),
+                         HASH_REMOVE, NULL);
+         }
+         else
+         {
+             /* We haven't seen the tuple t_ctid points to yet. Stash
+              * it into unresolved_ctids table and deal with it later.
+              */
+             UnresolvedCtid unresolved;
+             bool found;
+
+             unresolved = hash_search(state->rs_unresolved_ctids,
+                                      (void *) &(old_tuple->t_data->t_ctid),
+                                      HASH_ENTER, &found);
+             Assert(!found);
+
+             unresolved->old_tid = old_tuple->t_self;
+             unresolved->tuple = heap_copytuple(new_tuple);
+
+             MemoryContextSwitchTo(saved_cxt);
+             return;
+         }
+     }
+
+     old_tid = old_tuple->t_self;
+
+     for(;;)
+     {
+         ItemPointerData new_tid;
+
+         /* Insert the tuple */
+         raw_heap_insert(state, new_tuple);
+         new_tid = new_tuple->t_self;
+
+         /*
+          * When we encounter a tuple that's an updated version
+          * of a row, check if the previous tuple in the update chain
+          * is still visible to someone. The previous tuple's xmax
+          * is equal to this tuple's xmin, so we can do the same check
+          * HeapTupleSatisfiesVacuum would do for the previous tuple,
+          * using xmin of this tuple. We know that xmin of this tuple
+          * committed, otherwise this tuple would be dead and the caller
+          * wouldn't have passed it to us in the first place.
+          */
+         if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
+             !TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
+                                    state->rs_oldest_xmin))
+         {
+             /* The previous tuple in the update chain is RECENTLY_DEAD.
+              * Let's see if we've seen it already. */
+             UnresolvedCtid unresolved;
+
+             unresolved = hash_search(state->rs_unresolved_ctids,
+                                      (void *) &old_tid,
+                                      HASH_FIND, NULL);
+
+             if (unresolved != NULL)
+             {
+                 /* We have seen and memorized the previous tuple already.
+                  * Now that we know where we inserted the tuple its t_ctid
+                  * points to, fix its t_ctid and insert it to the new heap.
+                  */
+                 HeapTuple        prev_tup     = unresolved->tuple;
+                 ItemPointerData prev_old_tid = unresolved->old_tid;
+
+                 prev_tup->t_data->t_ctid = new_tid;
+
+                 hash_search(state->rs_unresolved_ctids,
+                             (void *) &old_tid,
+                             HASH_REMOVE, NULL);
+
+                 /* loop back to insert the previous tuple in the chain */
+                 old_tid = prev_old_tid;
+                 new_tuple = prev_tup;
+                 continue;
+             }
+             else
+             {
+                 /* Remember the new tid of this tuple. We'll use it to set
+                  * the ctid when we find the previous tuple in the chain
+                  */
+                 OldToNewMapping mapping;
+                 bool found;
+
+                 mapping = hash_search(state->rs_old_new_tid_map,
+                                       (void *) &old_tid,
+                                       HASH_ENTER, &found);
+                 Assert(!found);
+
+                 mapping->new_tid = new_tid;
+             }
+         }
+         break;
+     }
+
+     MemoryContextSwitchTo(saved_cxt);
+ }
+
+ /*
+  * Insert a tuple to the new relation.
+  *
+  * t_self of the tuple is set to the new TID of the tuple. If t_ctid of the
+  * tuple is invalid on entry, the stored copy's t_ctid is set to the new TID.
+  */
+ static void
+ raw_heap_insert(RewriteState state, HeapTuple new_tup)
+ {
+     BlockNumber        blkno    = InvalidBlockNumber;
+     Page            page    = NULL;
+     Size            pageFreeSpace, saveFreeSpace;
+     Size            len;
+     OffsetNumber    newoff;
+     HeapTuple        tup;
+
+     /*
+      * If the new tuple is too big for storage or contains already toasted
+      * out-of-line attributes from some other relation, invoke the toaster.
+      *
+      * Note: below this point, tup is the data we actually intend to store
+      * into the relation; new_tup is the caller's original untoasted data.
+      */
+     if (HeapTupleHasExternal(new_tup) || new_tup->t_len > TOAST_TUPLE_THRESHOLD)
+         tup = toast_insert_or_update(state->rs_new_rel, new_tup, NULL,
+                                      state->rs_use_wal, false);
+     else
+         tup = new_tup;
+
+     len = MAXALIGN(tup->t_len);        /* be conservative */
+
+     /*
+      * If we're gonna fail for oversize tuple, do it right away
+      */
+     if (len > MaxHeapTupleSize)
+         ereport(ERROR,
+                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                  errmsg("row is too big: size %lu, maximum size %lu",
+                         (unsigned long) len,
+                         (unsigned long) MaxHeapTupleSize)));
+
+     /* Compute desired extra freespace due to fillfactor option */
+     saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
+                                                    HEAP_DEFAULT_FILLFACTOR);
+
+     /* If the current page lacks room, WAL-log it if needed and release it. */
+     if (BufferIsValid(state->rs_buf))
+     {
+         blkno = BufferGetBlockNumber(state->rs_buf);
+         page = (Page) BufferGetPage(state->rs_buf);
+         pageFreeSpace = PageGetFreeSpace(page);
+
+         if (len + saveFreeSpace > pageFreeSpace)
+         {
+             if(state->rs_use_wal)
+                 log_newpage(&state->rs_new_rel->rd_node, blkno, page);
+
+             MarkBufferDirty(state->rs_buf);
+
+             UnlockReleaseBuffer(state->rs_buf);
+             state->rs_buf = InvalidBuffer;
+         }
+     }
+
+     if (!BufferIsValid(state->rs_buf))
+     {
+         /* There's no current buffer to insert to. Extend relation to get
+          * a new page.
+          */
+         state->rs_buf = ReadBuffer(state->rs_new_rel, P_NEW);
+         blkno = BufferGetBlockNumber(state->rs_buf);
+         page = (Page) BufferGetPage(state->rs_buf);
+
+         LockBuffer(state->rs_buf, BUFFER_LOCK_EXCLUSIVE);
+
+         /*
+          * We need to initialize the empty new page.  Double-check that it
+          * really is empty (this should never happen, but if it does we
+          * don't want to risk wiping out valid data).
+          */
+
+         if (!PageIsNew((PageHeader) page))
+             elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
+                  BufferGetBlockNumber(state->rs_buf),
+                  RelationGetRelationName(state->rs_new_rel));
+
+         PageInit(page, BufferGetPageSize(state->rs_buf), 0);
+     }
+
+     /* We now have a page pinned and locked to insert the new tuple to */
+
+     newoff = PageAddItem(page, (Item) tup->t_data, len,
+                          InvalidOffsetNumber, LP_USED);
+     if (newoff == InvalidOffsetNumber)
+         elog(ERROR, "failed to add tuple");
+
+     /* Update t_self to the actual position where it was stored */
+     ItemPointerSet(&(new_tup->t_self), blkno, newoff);
+
+     /* Insert the correct position into CTID of the stored tuple, too,
+      * if the caller didn't supply a valid CTID.
+      */
+     if(!ItemPointerIsValid(&new_tup->t_data->t_ctid))
+     {
+         ItemId            newitemid;
+         HeapTupleHeader onpage_tup;
+
+         newitemid = PageGetItemId(page, newoff);
+         onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
+
+         onpage_tup->t_ctid = new_tup->t_self;
+     }
+
+     /* If tup is a private copy, release it. */
+     if (tup != new_tup)
+         heap_freetuple(tup);
+ }
Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/tablecmds.c,v
retrieving revision 1.218
diff -c -r1.218 tablecmds.c
*** src/backend/commands/tablecmds.c    19 Mar 2007 23:38:29 -0000    1.218
--- src/backend/commands/tablecmds.c    20 Mar 2007 18:59:08 -0000
***************
*** 5855,5890 ****
      {
          smgrread(src, blkno, buf);

-         /* XLOG stuff */
          if (use_wal)
!         {
!             xl_heap_newpage xlrec;
!             XLogRecPtr    recptr;
!             XLogRecData rdata[2];
!
!             /* NO ELOG(ERROR) from here till newpage op is logged */
!             START_CRIT_SECTION();
!
!             xlrec.node = dst->smgr_rnode;
!             xlrec.blkno = blkno;
!
!             rdata[0].data = (char *) &xlrec;
!             rdata[0].len = SizeOfHeapNewpage;
!             rdata[0].buffer = InvalidBuffer;
!             rdata[0].next = &(rdata[1]);
!
!             rdata[1].data = (char *) page;
!             rdata[1].len = BLCKSZ;
!             rdata[1].buffer = InvalidBuffer;
!             rdata[1].next = NULL;
!
!             recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
!
!             PageSetLSN(page, recptr);
!             PageSetTLI(page, ThisTimeLineID);
!
!             END_CRIT_SECTION();
!         }

          /*
           * Now write the page.    We say isTemp = true even if it's not a temp
--- 5855,5862 ----
      {
          smgrread(src, blkno, buf);

          if (use_wal)
!             log_newpage(&dst->smgr_rnode, blkno, page);

          /*
           * Now write the page.    We say isTemp = true even if it's not a temp
Index: src/include/access/heapam.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/heapam.h,v
retrieving revision 1.121
diff -c -r1.121 heapam.h
*** src/include/access/heapam.h    29 Mar 2007 00:15:39 -0000    1.121
--- src/include/access/heapam.h    29 Mar 2007 08:27:34 -0000
***************
*** 194,199 ****
--- 194,200 ----
  extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
                                    TransactionId cutoff_xid,
                                    OffsetNumber *offsets, int offcnt);
+ extern XLogRecPtr log_newpage(RelFileNode *rnode, BlockNumber blk, Page page);

  /* in common/heaptuple.c */
  extern Size heap_compute_data_size(TupleDesc tupleDesc,
Index: src/include/commands/rewriteheap.h
===================================================================
RCS file: src/include/commands/rewriteheap.h
diff -N src/include/commands/rewriteheap.h
*** /dev/null    1 Jan 1970 00:00:00 -0000
--- src/include/commands/rewriteheap.h    20 Mar 2007 17:52:15 -0000
***************
*** 0 ****
--- 1,29 ----
+ /*-------------------------------------------------------------------------
+  *
+  * rewriteheap.h
+  *      header file for heap rewrite support functions
+  *
+  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994-5, Regents of the University of California
+  *
+  * $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef REWRITE_HEAP_H
+ #define REWRITE_HEAP_H
+
+ #include "access/htup.h"
+ #include "utils/rel.h"
+
+ /* opaque handle: the RewriteStateData struct is defined only in rewriteheap.c */
+ typedef struct RewriteStateData *RewriteState;
+
+ extern RewriteState begin_heap_rewrite(Relation NewHeap,
+                                        TransactionId OldestXmin, bool use_wal);
+ extern void end_heap_rewrite(RewriteState state);
+ extern void rewrite_heap_dead_tuple(RewriteState state, HeapTuple oldTuple);
+ extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple,
+                                HeapTuple newTuple);
+
+ #endif   /* REWRITE_HEAP_H */
Index: src/test/regress/expected/cluster.out
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/test/regress/expected/cluster.out,v
retrieving revision 1.17
diff -c -r1.17 cluster.out
*** src/test/regress/expected/cluster.out    7 Jul 2005 20:40:01 -0000    1.17
--- src/test/regress/expected/cluster.out    20 Mar 2007 16:50:04 -0000
***************
*** 382,389 ****
--- 382,428 ----
   2
  (2 rows)

+ -- Test MVCC-safety of cluster. There isn't much we can do to verify the
+ -- results with a single backend...
+ CREATE TABLE clustertest (key int PRIMARY KEY);
+ NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "clustertest_pkey" for table "clustertest"
+ INSERT INTO clustertest VALUES (10);
+ INSERT INTO clustertest VALUES (20);
+ INSERT INTO clustertest VALUES (30);
+ INSERT INTO clustertest VALUES (40);
+ INSERT INTO clustertest VALUES (50);
+ -- Test update where the old row version is found first in the scan
+ UPDATE clustertest SET key = 100 WHERE key = 10;
+ -- Test update where the new row version is found first in the scan
+ UPDATE clustertest SET key = 35 WHERE key = 40;
+ -- Test longer update chain (50 -> 60 -> 70 -> 80)
+ UPDATE clustertest SET key = 60 WHERE key = 50;
+ UPDATE clustertest SET key = 70 WHERE key = 60;
+ UPDATE clustertest SET key = 80 WHERE key = 70;
+ SELECT * FROM clustertest;
+  key
+ -----
+   20
+   30
+  100
+   35
+   80
+ (5 rows)
+
+ CLUSTER clustertest_pkey ON clustertest;
+ SELECT * FROM clustertest;
+  key
+ -----
+   20
+   30
+   35
+   80
+  100
+ (5 rows)
+
  -- clean up
  \c -
+ DROP TABLE clustertest;
  DROP TABLE clstr_1;
  DROP TABLE clstr_2;
  DROP TABLE clstr_3;
Index: src/test/regress/sql/cluster.sql
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/test/regress/sql/cluster.sql,v
retrieving revision 1.9
diff -c -r1.9 cluster.sql
*** src/test/regress/sql/cluster.sql    7 Jul 2005 20:40:02 -0000    1.9
--- src/test/regress/sql/cluster.sql    20 Mar 2007 17:52:48 -0000
***************
*** 153,160 ****
--- 153,187 ----
  CLUSTER clstr_1;
  SELECT * FROM clstr_1;

+ -- Test MVCC-safety of cluster. There isn't much we can do to verify the
+ -- results with a single backend...
+
+ CREATE TABLE clustertest (key int PRIMARY KEY);
+
+ INSERT INTO clustertest VALUES (10);
+ INSERT INTO clustertest VALUES (20);
+ INSERT INTO clustertest VALUES (30);
+ INSERT INTO clustertest VALUES (40);
+ INSERT INTO clustertest VALUES (50);
+
+ -- Test update where the old row version is found first in the scan
+ UPDATE clustertest SET key = 100 WHERE key = 10;
+
+ -- Test update where the new row version is found first in the scan
+ UPDATE clustertest SET key = 35 WHERE key = 40;
+
+ -- Test longer update chain (50 -> 60 -> 70 -> 80)
+ UPDATE clustertest SET key = 60 WHERE key = 50;
+ UPDATE clustertest SET key = 70 WHERE key = 60;
+ UPDATE clustertest SET key = 80 WHERE key = 70;
+
+ SELECT * FROM clustertest;
+ CLUSTER clustertest_pkey ON clustertest;
+ SELECT * FROM clustertest;
+
  -- clean up
  \c -
+ DROP TABLE clustertest;
  DROP TABLE clstr_1;
  DROP TABLE clstr_2;
  DROP TABLE clstr_3;

pgsql-patches by date:

Previous
From: Hartmut Benz
Date:
Subject: Small addition to PGInterval
Next
From: Heikki Linnakangas
Date:
Subject: Re: Small addition to PGInterval