Make CLUSTER MVCC-safe - Mailing list pgsql-patches

From Heikki Linnakangas
Subject Make CLUSTER MVCC-safe
Date
Msg-id 460025FD.6080208@enterprisedb.com
Whole thread Raw
Responses Re: Make CLUSTER MVCC-safe
Re: Make CLUSTER MVCC-safe
Re: Make CLUSTER MVCC-safe
List pgsql-patches
This patch makes CLUSTER MVCC-safe. Visibility information and update
chains are preserved like in VACUUM FULL.

I created a new generic rewriteheap-facility to handle rewriting tables
in a visibility-preserving manner. All the update chain tracking is done
in rewriteheap.c, the caller is responsible for supplying the stream of
tuples.

CLUSTER is currently the only user of the facility, but I'm envisioning
we might have other users in the future. For example, a version of
VACUUM FULL that rewrites the whole table. We could also use it to make
ALTER TABLE MVCC-safe, but there are some issues with that. For example,
what to do if RECENTLY_DEAD tuples don't satisfy a newly added constraint.

One complication in the implementation was the fact that heap_insert
overwrites the visibility information, and it doesn't write the full
tuple header to WAL. I ended up implementing a special-purpose
raw_heap_insert function instead, which is optimized for bulk inserting
a lot of tuples, knowing that we have exclusive access to the heap.
raw_heap_insert keeps the current buffer locked over calls, until it
gets full, and inserts the whole page to WAL as a single record using
the existing XLOG_HEAP_NEWPAGE record type.

This makes CLUSTER a more viable alternative to VACUUM FULL. One
motivation for making CLUSTER MVCC-safe is that if some poor soul runs
pg_dump to make a backup concurrently with CLUSTER, the clustered tables
will appear to be empty in the dump file.

The documentation doesn't say anything about CLUSTER not being MVCC-safe, so
I suppose there's no need to touch the docs. I sent a doc patch earlier
to add a note about it, that doc patch should still be applied to older
release branches, IMO.

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com
Index: src/backend/access/heap/heapam.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/heap/heapam.c,v
retrieving revision 1.228
diff -c -r1.228 heapam.c
*** src/backend/access/heap/heapam.c    9 Feb 2007 03:35:33 -0000    1.228
--- src/backend/access/heap/heapam.c    20 Mar 2007 17:55:16 -0000
***************
*** 3284,3289 ****
--- 3284,3326 ----
      return log_heap_update(reln, oldbuf, from, newbuf, newtup, true);
  }

+ /*
+  * Perform XLogInsert of a full page image to WAL. Caller must make sure
+  * the page contents on disk are consistent with the page inserted to WAL.
+  */
+ XLogRecPtr
+ log_newpage(RelFileNode *rnode, BlockNumber blkno, Page page)
+ {
+     xl_heap_newpage xlrec;
+     XLogRecPtr    recptr;
+     XLogRecData rdata[2];
+
+     /* NO ELOG(ERROR) from here till newpage op is logged */
+     START_CRIT_SECTION();
+
+     xlrec.node = *rnode;
+     xlrec.blkno = blkno;
+
+     rdata[0].data = (char *) &xlrec;
+     rdata[0].len = SizeOfHeapNewpage;
+     rdata[0].buffer = InvalidBuffer;
+     rdata[0].next = &(rdata[1]);
+
+     rdata[1].data = (char *) page;
+     rdata[1].len = BLCKSZ;
+     rdata[1].buffer = InvalidBuffer;
+     rdata[1].next = NULL;
+
+     recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
+
+     PageSetLSN(page, recptr);
+     PageSetTLI(page, ThisTimeLineID);
+
+     END_CRIT_SECTION();
+
+     return recptr;
+ }
+
  static void
  heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
  {
Index: src/backend/access/nbtree/nbtsort.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/nbtree/nbtsort.c,v
retrieving revision 1.110
diff -c -r1.110 nbtsort.c
*** src/backend/access/nbtree/nbtsort.c    9 Jan 2007 02:14:10 -0000    1.110
--- src/backend/access/nbtree/nbtsort.c    20 Mar 2007 17:58:20 -0000
***************
*** 64,69 ****
--- 64,70 ----

  #include "postgres.h"

+ #include "access/heapam.h"
  #include "access/nbtree.h"
  #include "miscadmin.h"
  #include "storage/smgr.h"
***************
*** 265,296 ****
      if (wstate->btws_use_wal)
      {
          /* We use the heap NEWPAGE record type for this */
!         xl_heap_newpage xlrec;
!         XLogRecPtr    recptr;
!         XLogRecData rdata[2];
!
!         /* NO ELOG(ERROR) from here till newpage op is logged */
!         START_CRIT_SECTION();
!
!         xlrec.node = wstate->index->rd_node;
!         xlrec.blkno = blkno;
!
!         rdata[0].data = (char *) &xlrec;
!         rdata[0].len = SizeOfHeapNewpage;
!         rdata[0].buffer = InvalidBuffer;
!         rdata[0].next = &(rdata[1]);
!
!         rdata[1].data = (char *) page;
!         rdata[1].len = BLCKSZ;
!         rdata[1].buffer = InvalidBuffer;
!         rdata[1].next = NULL;
!
!         recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
!
!         PageSetLSN(page, recptr);
!         PageSetTLI(page, ThisTimeLineID);
!
!         END_CRIT_SECTION();
      }
      else
      {
--- 266,272 ----
      if (wstate->btws_use_wal)
      {
          /* We use the heap NEWPAGE record type for this */
!         log_newpage(&wstate->index->rd_node, blkno, page);
      }
      else
      {
Index: src/backend/commands/Makefile
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/Makefile,v
retrieving revision 1.35
diff -c -r1.35 Makefile
*** src/backend/commands/Makefile    20 Jan 2007 17:16:11 -0000    1.35
--- src/backend/commands/Makefile    20 Mar 2007 17:04:58 -0000
***************
*** 16,22 ****
      conversioncmds.o copy.o \
      dbcommands.o define.o explain.o functioncmds.o \
      indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \
!     portalcmds.o prepare.o proclang.o \
      schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \
      typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o

--- 16,22 ----
      conversioncmds.o copy.o \
      dbcommands.o define.o explain.o functioncmds.o \
      indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \
!     rewriteheap.o portalcmds.o prepare.o proclang.o \
      schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \
      typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o

Index: src/backend/commands/cluster.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/cluster.c,v
retrieving revision 1.157
diff -c -r1.157 cluster.c
*** src/backend/commands/cluster.c    13 Mar 2007 00:33:39 -0000    1.157
--- src/backend/commands/cluster.c    20 Mar 2007 17:56:43 -0000
***************
*** 20,25 ****
--- 20,26 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/xact.h"
+ #include "access/transam.h"
  #include "catalog/catalog.h"
  #include "catalog/dependency.h"
  #include "catalog/heap.h"
***************
*** 36,41 ****
--- 37,44 ----
  #include "utils/memutils.h"
  #include "utils/syscache.h"
  #include "utils/relcache.h"
+ #include "storage/procarray.h"
+ #include "commands/rewriteheap.h"


  /*
***************
*** 653,658 ****
--- 656,663 ----
      char       *nulls;
      IndexScanDesc scan;
      HeapTuple    tuple;
+     TransactionId OldestXmin;
+     RewriteState rwstate;

      /*
       * Open the relations we need.
***************
*** 675,711 ****
      nulls = (char *) palloc(natts * sizeof(char));
      memset(nulls, 'n', natts * sizeof(char));

      /*
       * Scan through the OldHeap on the OldIndex and copy each tuple into the
       * NewHeap.
       */
      scan = index_beginscan(OldHeap, OldIndex,
!                            SnapshotNow, 0, (ScanKey) NULL);

      while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
      {
          /*
           * We cannot simply pass the tuple to heap_insert(), for several
           * reasons:
           *
!          * 1. heap_insert() will overwrite the commit-status fields of the
!          * tuple it's handed.  This would trash the source relation, which is
!          * bad news if we abort later on.  (This was a bug in releases thru
!          * 7.0)
!          *
!          * 2. We'd like to squeeze out the values of any dropped columns, both
           * to save space and to ensure we have no corner-case failures. (It's
           * possible for example that the new table hasn't got a TOAST table
           * and so is unable to store any large values of dropped cols.)
           *
!          * 3. The tuple might not even be legal for the new table; this is
           * currently only known to happen as an after-effect of ALTER TABLE
           * SET WITHOUT OIDS.
           *
           * So, we must reconstruct the tuple from component Datums.
           */
-         HeapTuple    copiedTuple;
-         int            i;

          heap_deformtuple(tuple, oldTupDesc, values, nulls);

--- 680,731 ----
      nulls = (char *) palloc(natts * sizeof(char));
      memset(nulls, 'n', natts * sizeof(char));

+     /* Get an OldestXmin to use to weed out dead tuples */
+     OldestXmin = GetOldestXmin(OldHeap->rd_rel->relisshared, false);
+
+     /* Initialize the rewrite operation */
+     rwstate = begin_heap_rewrite(NewHeap, OldestXmin, true);
+
      /*
       * Scan through the OldHeap on the OldIndex and copy each tuple into the
       * NewHeap.
       */
      scan = index_beginscan(OldHeap, OldIndex,
!                            SnapshotAny, 0, (ScanKey) NULL);

      while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
      {
+         HeapTuple    copiedTuple;
+         int            i;
+         bool        isdead;
+
+         LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
+         isdead = HeapTupleSatisfiesVacuum(tuple->t_data,
+                                           OldestXmin,
+                                           scan->xs_cbuf) == HEAPTUPLE_DEAD;
+         LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
+
+         if(isdead)
+         {
+             rewrite_heap_dead_tuple(rwstate, tuple);
+             continue;
+         }
+
          /*
           * We cannot simply pass the tuple to heap_insert(), for several
           * reasons:
           *
!          * 1. We'd like to squeeze out the values of any dropped columns, both
           * to save space and to ensure we have no corner-case failures. (It's
           * possible for example that the new table hasn't got a TOAST table
           * and so is unable to store any large values of dropped cols.)
           *
!          * 2. The tuple might not even be legal for the new table; this is
           * currently only known to happen as an after-effect of ALTER TABLE
           * SET WITHOUT OIDS.
           *
           * So, we must reconstruct the tuple from component Datums.
           */

          heap_deformtuple(tuple, oldTupDesc, values, nulls);

***************
*** 722,734 ****
          if (NewHeap->rd_rel->relhasoids)
              HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));

!         simple_heap_insert(NewHeap, copiedTuple);

          heap_freetuple(copiedTuple);

          CHECK_FOR_INTERRUPTS();
      }

      index_endscan(scan);

      pfree(values);
--- 742,756 ----
          if (NewHeap->rd_rel->relhasoids)
              HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));

!         rewrite_heap_tuple(rwstate, tuple, copiedTuple);

          heap_freetuple(copiedTuple);

          CHECK_FOR_INTERRUPTS();
      }

+     end_heap_rewrite(rwstate);
+
      index_endscan(scan);

      pfree(values);
Index: src/backend/commands/rewriteheap.c
===================================================================
RCS file: src/backend/commands/rewriteheap.c
diff -N src/backend/commands/rewriteheap.c
*** /dev/null    1 Jan 1970 00:00:00 -0000
--- src/backend/commands/rewriteheap.c    20 Mar 2007 17:57:32 -0000
***************
*** 0 ****
--- 1,543 ----
+ /*-------------------------------------------------------------------------
+  *
+  * rewriteheap.c
+  *      support functions to rewrite tables.
+  *
+  * These functions provide a facility to completely rewrite a heap, while
+  * preserving visibility information and update chains.
+  *
+  *
+  * INTERFACE
+  *
+  * The caller is responsible for creating the new heap, all catalog
+  * changes, supplying the tuples to be written to the new heap, and
+  * rebuilding indexes. The caller must hold an AccessExclusiveLock on the
+  * table.
+  *
+  * To use the facility:
+  *
+  * begin_heap_rewrite
+  * while(fetch next tuple)
+  * {
+  *     if(tuple is dead)
+  *         rewrite_heap_dead_tuple
+  *     else
+  *     {
+  *         do any transformations here if required
+  *         rewrite_heap_tuple
+  *     }
+  * }
+  * end_heap_rewrite
+  *
+  * The rewrite facility holds the current page it's inserting into pinned
+  * and locked over rewrite_heap_tuple calls. To avoid deadlock, the caller
+  * must not access the new relation while a rewrite is in progress.
+  *
+  * The contents of the new relation shouldn't be relied on until
+  * end_heap_rewrite is called.
+  *
+  *
+  * IMPLEMENTATION
+  *
+  * While we copy the tuples, we need to keep track of ctid-references to
+  * handle update chains correctly. Since the new versions of tuples will
+  * have different ctids than the original ones, if we just copied
+  * everything to the new table the ctid pointers would be wrong. For each
+  * ctid reference from A -> B, we can encounter either A first or B first:
+  *
+  * If we encounter A first, we'll store the tuple in the unresolved_ctids
+  * hash table. When we later encounter B, we remove A from the hash table,
+  * fix the ctid to point to the new location of B, and insert both A and B
+  * to the new heap.
+  *
+  * If we encounter B first, we'll put an entry in the old_new_tid_map hash
+  * table and insert B to the new heap right away. When we later encounter
+  * A, we get the new location of B from the table.
+  *
+  * Note that a tuple in the middle of a chain is both A and B at the same
+  * time. Entries in the hash tables can be removed as soon as the later
+  * tuple is encountered. That helps to keep the memory usage down. At the
+  * end, both tables are usually empty; we should have encountered both A
+  * and B for each ctid reference. However, if there's a chain like this:
+  * RECENTLY_DEAD -> DEAD -> LIVE as determined by HeapTupleSatisfiesVacuum,
+  * we can encounter the dead tuple first, and skip it, and bump into the
+  * RECENTLY_DEAD tuple later. The RECENTLY_DEAD tuple would be added
+  * to unresolved_ctids, and stay there until end of the rewrite.
+  *
+  * Using in-memory hash tables means that we use some memory for each live
+  * update chain in the table, from the time we find one end of the
+  * reference until we find the other end. That shouldn't be a problem in
+  * practice, but if you do something like an UPDATE without a where-clause
+  * on a large table, and then run CLUSTER in the same transaction, you
+  * might run out of memory. It doesn't seem worthwhile to add support to
+  * spill-to-disk, there shouldn't be that many RECENTLY_DEAD tuples in a
+  * table under normal circumstances, and even if we do fail halfway through
+  * a CLUSTER, the old table is still valid.
+  *
+  * We can't use the normal heap_insert function to insert into the new
+  * heap, because heap_insert overwrites the visibility information.
+  * We use a special-purpose raw_heap_insert function instead, which
+  * is optimized for bulk inserting a lot of tuples, knowing that we have
+  * exclusive access to the heap. raw_heap_insert keeps the current buffer
+  * we're inserting into locked over calls, until it gets full. The page
+  * is inserted to WAL as a single record.
+  *
+  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994-5, Regents of the University of California
+  *
+  * IDENTIFICATION
+  *      $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+
+ #include "access/heapam.h"
+ #include "access/transam.h"
+ #include "access/tuptoaster.h"
+ #include "commands/rewriteheap.h"
+ #include "utils/memutils.h"
+
+ /*
+  * State associated with a rewrite operation. This is opaque to the user
+  * of the rewrite facility.
+  */
+ typedef struct RewriteStateData
+ {
+     Relation        rs_new_rel;            /* new heap */
+     bool            rs_use_wal;            /* WAL-log inserts to new heap? */
+     MemoryContext    rs_cxt;                /* for hash tables and entries and
+                                          * tuples in them */
+     TransactionId    rs_oldest_xmin;        /* oldest xmin used by caller to
+                                          * determine tuple visibility */
+     Buffer            rs_buf;                /* current buffer we're inserting
+                                          * to, exclusively locked */
+     HTAB           *rs_unresolved_ctids;
+     HTAB           *rs_old_new_tid_map;
+ } RewriteStateData;
+
+ /* entry structures for the hash tables */
+
+ typedef struct UnresolvedCtidData {
+     ItemPointerData old_ctid;    /* t_ctid-pointer of the tuple in old heap */
+     ItemPointerData old_tid;    /* location in the old heap */
+     HeapTuple tuple;            /* tuple contents */
+ } UnresolvedCtidData;
+
+ typedef UnresolvedCtidData *UnresolvedCtid;
+
+ typedef struct OldToNewMappingData {
+     ItemPointerData old_tid;    /* location in the old heap */
+     ItemPointerData new_tid;    /* location in the new heap */
+ } OldToNewMappingData;
+
+ typedef OldToNewMappingData *OldToNewMapping;
+
+
+ /* prototypes for internal functions */
+ static void raw_heap_insert(RewriteState state, HeapTuple tup);
+
+
+ /*
+  * Begin a rewrite of a table
+  *
+  * new_heap        new, empty heap relation to insert tuples to
+  * oldest_xmin    xid used by the caller to determine which tuples
+  *                are dead.
+  * use_wal        should the inserts to the new heap be WAL-logged
+  *
+  * Returns an opaque RewriteState, allocated in current memory context,
+  * to be used in subsequent calls to the other functions.
+  */
+ RewriteState
+ begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin,
+                    bool use_wal)
+ {
+     RewriteState state;
+     HASHCTL        hash_ctl;
+
+     state = palloc(sizeof(RewriteStateData));
+     state->rs_cxt = AllocSetContextCreate(CurrentMemoryContext,
+                                           "Table rewrite",
+                                           ALLOCSET_DEFAULT_MINSIZE,
+                                           ALLOCSET_DEFAULT_INITSIZE,
+                                           ALLOCSET_DEFAULT_MAXSIZE);
+
+     state->rs_oldest_xmin = oldest_xmin;
+     state->rs_new_rel = new_heap;
+     state->rs_use_wal = use_wal;
+     state->rs_buf = InvalidBuffer;
+
+     /* Initialize hash tables used to track update chains */
+     memset(&hash_ctl, 0, sizeof(hash_ctl));
+     hash_ctl.keysize = sizeof(ItemPointerData);
+     hash_ctl.entrysize = sizeof(UnresolvedCtidData);
+     hash_ctl.hcxt = state->rs_cxt;
+     hash_ctl.hash = tag_hash;
+
+     state->rs_unresolved_ctids =
+         hash_create("Rewrite / Unresolved ctids",
+                     128, /* arbitrary initial size */
+                     &hash_ctl,
+                     HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+     hash_ctl.entrysize = sizeof(OldToNewMappingData);
+
+     state->rs_old_new_tid_map =
+         hash_create("Rewrite / Old to new tid map",
+                     128, /* arbitrary initial size */
+                     &hash_ctl,
+                     HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+     return state;
+ }
+
+ /*
+  * End a rewrite.
+  *
+  * state and any other resources are freed.
+  */
+ void
+ end_heap_rewrite(RewriteState state)
+ {
+     HASH_SEQ_STATUS seq_status;
+     UnresolvedCtid unresolved;
+
+     /* Write any remaining tuples in the unresolved_ctids table. If we have
+      * any left, they should in fact be dead, but let's err on the safe
+      * side.
+      */
+     hash_seq_init(&seq_status, state->rs_unresolved_ctids);
+
+     while((unresolved = hash_seq_search(&seq_status)) != NULL)
+     {
+         ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
+         raw_heap_insert(state, unresolved->tuple);
+
+         /* don't bother removing and pfreeing the entry, we're going
+          * to free the whole memory context after the scan anyway
+          */
+     }
+
+     /* WAL-log and unlock the last page */
+     if(BufferIsValid(state->rs_buf))
+     {
+         if(state->rs_use_wal)
+             log_newpage(&state->rs_new_rel->rd_node,
+                         BufferGetBlockNumber(state->rs_buf),
+                         BufferGetPage(state->rs_buf));
+
+         MarkBufferDirty(state->rs_buf);
+
+         UnlockReleaseBuffer(state->rs_buf);
+         state->rs_buf = InvalidBuffer;
+     }
+
+     MemoryContextDelete(state->rs_cxt);
+     pfree(state);
+ }
+
+ /*
+  * Register a dead tuple with an ongoing rewrite. Dead tuples are not
+  * copied to the new table, but we still make note of them so that we
+  * can release some resources earlier.
+  */
+ void
+ rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
+ {
+     /* If we have already seen an earlier tuple in the update chain that
+      * points to this tuple, let's forget about that earlier tuple. It's
+      * in fact dead as well, our simple xmax < OldestXmin test in
+      * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It
+      * happens when xmin of a tuple is greater than xmax, which sounds
+      * counter-intuitive but is perfectly valid.
+      *
+      * We don't bother to try to detect the situation the other way
+      * round, when we encounter the dead tuple first and then the
+      * recently dead one that points to it. If that happens, we'll
+      * have some unmatched entries in the unresolved_ctids hash table
+      * at the end. That can happen anyway, because a vacuum might
+      * have removed the dead tuple in the chain before us.
+      */
+     hash_search(state->rs_unresolved_ctids,
+                 (void *) &(old_tuple->t_self),
+                 HASH_REMOVE, NULL);
+ }
+
+ /*
+  * Add a tuple to the new heap.
+  *
+  * Visibility information is copied from the original tuple.
+  *
+  * state        opaque state as returned by begin_heap_rewrite
+  * old_tuple    original tuple in the old heap
+  * new_tuple    new, rewritten tuple to be inserted to new heap
+  */
+ void
+ rewrite_heap_tuple(RewriteState state,
+                    HeapTuple old_tuple, HeapTuple new_tuple)
+ {
+     MemoryContext saved_cxt;
+     ItemPointerData old_tid;
+
+     saved_cxt = MemoryContextSwitchTo(state->rs_cxt);
+
+     /* Preserve the visibility information. Also preserve the HEAP_UPDATED
+      * flag; if the original tuple was an updated row version, so is the
+      * new one.
+      */
+     memcpy(&new_tuple->t_data->t_choice.t_heap,
+            &old_tuple->t_data->t_choice.t_heap,
+            sizeof(HeapTupleFields));
+
+     new_tuple->t_data->t_infomask &= ~(HEAP_XACT_MASK | HEAP_UPDATED);
+     new_tuple->t_data->t_infomask |=
+         old_tuple->t_data->t_infomask & (HEAP_XACT_MASK | HEAP_UPDATED);
+
+     /* Invalid ctid means that ctid should point to the tuple itself.
+      * We'll override it below if the tuple is part of an update chain.
+      */
+     ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
+
+     /* When we encounter a tuple that has been updated, check
+      * the old-to-new mapping hash table. If a match is found there,
+      * we've already copied the tuple that t_ctid points to, so we
+      * set the ctid of this tuple to point to the new location, and insert
+      * the new tuple. Otherwise we store the tuple in the unresolved_ctids
+      * hash table and deal with it later, after inserting the tuple t_ctid
+      * points to.
+      */
+     if (!(old_tuple->t_data->t_infomask & (HEAP_XMAX_INVALID |
+                                        HEAP_IS_LOCKED)) &&
+         !(ItemPointerEquals(&(old_tuple->t_self),
+                             &(old_tuple->t_data->t_ctid))))
+     {
+         OldToNewMapping mapping;
+
+         mapping = (OldToNewMapping)
+             hash_search(state->rs_old_new_tid_map,
+                         (void *) &(old_tuple->t_data->t_ctid),
+                         HASH_FIND, NULL);
+
+         if (mapping != NULL)
+         {
+             /* We've already copied the tuple t_ctid points to.
+              * Use the new tid of that tuple */
+             new_tuple->t_data->t_ctid = mapping->new_tid;
+
+             hash_search(state->rs_old_new_tid_map,
+                         (void *) &(old_tuple->t_data->t_ctid),
+                         HASH_REMOVE, NULL);
+         }
+         else
+         {
+             /* We haven't seen the tuple t_ctid points to yet. Stash
+              * it into unresolved_ctids table and deal with it later.
+              */
+             UnresolvedCtid unresolved;
+             bool found;
+
+             unresolved = hash_search(state->rs_unresolved_ctids,
+                                      (void *) &(old_tuple->t_data->t_ctid),
+                                      HASH_ENTER, &found);
+             Assert(!found);
+
+             unresolved->old_tid = old_tuple->t_self;
+             unresolved->tuple = heap_copytuple(new_tuple);
+
+             MemoryContextSwitchTo(saved_cxt);
+             return;
+         }
+     }
+
+     old_tid = old_tuple->t_self;
+
+     for(;;)
+     {
+         ItemPointerData new_tid;
+
+         /* Insert the tuple */
+         raw_heap_insert(state, new_tuple);
+         new_tid = new_tuple->t_self;
+
+         /*
+          * When we encounter a tuple that's an updated version
+          * of a row, check if the previous tuple in the update chain
+          * is still visible to someone. The previous tuple's xmax
+          * is equal to this tuple's xmin, so we can do the same check
+          * HeapTupleSatisfiesVacuum would do for the previous tuple,
+          * using xmin of this tuple. We know that xmin of this tuple
+          * committed, otherwise this tuple would be dead and the caller
+          * wouldn't have passed it to us in the first place.
+          */
+         if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
+             !TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
+                                    state->rs_oldest_xmin))
+         {
+             /* The previous tuple in the update chain is RECENTLY_DEAD.
+              * Let's see if we've seen it already. */
+             UnresolvedCtid unresolved;
+
+             unresolved = hash_search(state->rs_unresolved_ctids,
+                                      (void *) &old_tid,
+                                      HASH_FIND, NULL);
+
+             if (unresolved != NULL)
+             {
+                 /* We have seen and memorized the previous tuple already.
+                  * Now that we know where we inserted the tuple its t_ctid
+                  * points to, fix its t_ctid and insert it to the new heap.
+                  */
+                 HeapTuple        prev_tup     = unresolved->tuple;
+                 ItemPointerData prev_old_tid = unresolved->old_tid;
+
+                 prev_tup->t_data->t_ctid = new_tid;
+
+                 hash_search(state->rs_unresolved_ctids,
+                             (void *) &old_tid,
+                             HASH_REMOVE, NULL);
+
+                 /* loop back to insert the previous tuple in the chain */
+                 old_tid = prev_old_tid;
+                 new_tuple = prev_tup;
+                 continue;
+             }
+             else
+             {
+                 /* Remember the new tid of this tuple. We'll use it to set
+                  * the ctid when we find the previous tuple in the chain
+                  */
+                 OldToNewMapping mapping;
+                 bool found;
+
+                 mapping = hash_search(state->rs_old_new_tid_map,
+                                       (void *) &old_tid,
+                                       HASH_ENTER, &found);
+                 Assert(!found);
+
+                 mapping->new_tid = new_tid;
+             }
+         }
+         break;
+     }
+
+     MemoryContextSwitchTo(saved_cxt);
+ }
+
+ /*
+  * Insert a tuple to the new relation.
+  *
+  * t_self of the tuple is set to the new TID of the tuple. If t_ctid of the
+  * tuple is invalid on entry, it's replaced with the new TID as well.
+  */
+ static void
+ raw_heap_insert(RewriteState state, HeapTuple new_tup)
+ {
+     BlockNumber        blkno    = InvalidBlockNumber;
+     Page            page    = NULL;
+     Size            pageFreeSpace, saveFreeSpace;
+     Size            len;
+     OffsetNumber    newoff;
+     HeapTuple        tup;
+
+     /*
+      * If the new tuple is too big for storage or contains already toasted
+      * out-of-line attributes from some other relation, invoke the toaster.
+      *
+      * Note: below this point, tup is the data we actually intend to store
+      * into the relation; new_tup is the caller's original untoasted data.
+      */
+     if (HeapTupleHasExternal(new_tup) || new_tup->t_len > TOAST_TUPLE_THRESHOLD)
+         tup = toast_insert_or_update(state->rs_new_rel, new_tup, NULL,
+                                      state->rs_use_wal);
+     else
+         tup = new_tup;
+
+     len = MAXALIGN(tup->t_len);        /* be conservative */
+
+     /*
+      * If we're gonna fail for oversize tuple, do it right away
+      */
+     if (len > MaxHeapTupleSize)
+         ereport(ERROR,
+                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                  errmsg("row is too big: size %lu, maximum size %lu",
+                         (unsigned long) len,
+                         (unsigned long) MaxHeapTupleSize)));
+
+     /* Compute desired extra freespace due to fillfactor option */
+     saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
+                                                    HEAP_DEFAULT_FILLFACTOR);
+
+     /* Now we can check to see if there's enough free space here. */
+     if (BufferIsValid(state->rs_buf))
+     {
+         blkno = BufferGetBlockNumber(state->rs_buf);
+         page = (Page) BufferGetPage(state->rs_buf);
+         pageFreeSpace = PageGetFreeSpace(page);
+
+         if (len + saveFreeSpace > pageFreeSpace)
+         {
+             if(state->rs_use_wal)
+                 log_newpage(&state->rs_new_rel->rd_node, blkno, page);
+
+             MarkBufferDirty(state->rs_buf);
+
+             UnlockReleaseBuffer(state->rs_buf);
+             state->rs_buf = InvalidBuffer;
+         }
+     }
+
+     if (!BufferIsValid(state->rs_buf))
+     {
+         /* There's no current buffer to insert to. Extend relation to get
+          * a new page.
+          */
+         state->rs_buf = ReadBuffer(state->rs_new_rel, P_NEW);
+         blkno = BufferGetBlockNumber(state->rs_buf);
+         page = (Page) BufferGetPage(state->rs_buf);
+
+         LockBuffer(state->rs_buf, BUFFER_LOCK_EXCLUSIVE);
+
+         /*
+          * We need to initialize the empty new page.  Double-check that it
+          * really is empty (this should never happen, but if it does we
+          * don't want to risk wiping out valid data).
+          */
+
+         if (!PageIsNew((PageHeader) page))
+             elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
+                  BufferGetBlockNumber(state->rs_buf),
+                  RelationGetRelationName(state->rs_new_rel));
+
+         PageInit(page, BufferGetPageSize(state->rs_buf), 0);
+     }
+
+     /* We now have a page pinned and locked to insert the new tuple to */
+
+     newoff = PageAddItem(page, (Item) tup->t_data, len,
+                          InvalidOffsetNumber, LP_USED);
+     if (newoff == InvalidOffsetNumber)
+         elog(ERROR, "failed to add tuple");
+
+     /* Update t_self to the actual position where it was stored */
+     ItemPointerSet(&(new_tup->t_self), blkno, newoff);
+
+     /* Insert the correct position into CTID of the stored tuple, too,
+      * if the caller didn't supply a valid CTID.
+      */
+     if(!ItemPointerIsValid(&new_tup->t_data->t_ctid))
+     {
+         ItemId            newitemid;
+         HeapTupleHeader onpage_tup;
+
+         newitemid = PageGetItemId(page, newoff);
+         onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
+
+         onpage_tup->t_ctid = new_tup->t_self;
+     }
+
+     /* If tup is a private copy, release it. */
+     if (tup != new_tup)
+         heap_freetuple(tup);
+ }
Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/tablecmds.c,v
retrieving revision 1.217
diff -c -r1.217 tablecmds.c
*** src/backend/commands/tablecmds.c    13 Mar 2007 00:33:39 -0000    1.217
--- src/backend/commands/tablecmds.c    20 Mar 2007 17:56:31 -0000
***************
*** 5813,5848 ****
      {
          smgrread(src, blkno, buf);

-         /* XLOG stuff */
          if (use_wal)
!         {
!             xl_heap_newpage xlrec;
!             XLogRecPtr    recptr;
!             XLogRecData rdata[2];
!
!             /* NO ELOG(ERROR) from here till newpage op is logged */
!             START_CRIT_SECTION();
!
!             xlrec.node = dst->smgr_rnode;
!             xlrec.blkno = blkno;
!
!             rdata[0].data = (char *) &xlrec;
!             rdata[0].len = SizeOfHeapNewpage;
!             rdata[0].buffer = InvalidBuffer;
!             rdata[0].next = &(rdata[1]);
!
!             rdata[1].data = (char *) page;
!             rdata[1].len = BLCKSZ;
!             rdata[1].buffer = InvalidBuffer;
!             rdata[1].next = NULL;
!
!             recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
!
!             PageSetLSN(page, recptr);
!             PageSetTLI(page, ThisTimeLineID);
!
!             END_CRIT_SECTION();
!         }

          /*
           * Now write the page.    We say isTemp = true even if it's not a temp
--- 5813,5820 ----
      {
          smgrread(src, blkno, buf);

          if (use_wal)
!             log_newpage(&dst->smgr_rnode, blkno, page);

          /*
           * Now write the page.    We say isTemp = true even if it's not a temp
Index: src/include/access/heapam.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/heapam.h,v
retrieving revision 1.120
diff -c -r1.120 heapam.h
*** src/include/access/heapam.h    25 Jan 2007 02:17:26 -0000    1.120
--- src/include/access/heapam.h    20 Mar 2007 17:55:30 -0000
***************
*** 197,202 ****
--- 197,203 ----
  extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
                                    TransactionId cutoff_xid,
                                    OffsetNumber *offsets, int offcnt);
+ extern XLogRecPtr log_newpage(RelFileNode *rnode, BlockNumber blk, Page page);

  /* in common/heaptuple.c */
  extern Size heap_compute_data_size(TupleDesc tupleDesc,
Index: src/include/commands/rewriteheap.h
===================================================================
RCS file: src/include/commands/rewriteheap.h
diff -N src/include/commands/rewriteheap.h
*** /dev/null    1 Jan 1970 00:00:00 -0000
--- src/include/commands/rewriteheap.h    20 Mar 2007 17:52:15 -0000
***************
*** 0 ****
--- 1,29 ----
+ /*-------------------------------------------------------------------------
+  *
+  * rewriteheap.h
+  *      header file for heap rewrite support functions
+  *
+  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994-5, Regents of the University of California
+  *
+  * $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef REWRITE_HEAP_H
+ #define REWRITE_HEAP_H
+
+ #include "access/htup.h"
+ #include "utils/rel.h"
+
+ /* opaque handle; the struct definition is private to rewriteheap.c */
+ typedef struct RewriteStateData *RewriteState;
+
+ extern RewriteState begin_heap_rewrite(Relation NewHeap,
+                                        TransactionId OldestXmin, bool use_wal);
+ extern void end_heap_rewrite(RewriteState state);
+ extern void rewrite_heap_dead_tuple(RewriteState state, HeapTuple oldTuple);
+ extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple,
+                                HeapTuple newTuple);
+
+ #endif   /* REWRITE_HEAP_H */
Index: src/test/regress/expected/cluster.out
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/test/regress/expected/cluster.out,v
retrieving revision 1.17
diff -c -r1.17 cluster.out
*** src/test/regress/expected/cluster.out    7 Jul 2005 20:40:01 -0000    1.17
--- src/test/regress/expected/cluster.out    20 Mar 2007 16:50:04 -0000
***************
*** 382,389 ****
--- 382,428 ----
   2
  (2 rows)

+ -- Test MVCC-safety of cluster. There isn't much we can do to verify the
+ -- results with a single backend...
+ CREATE TABLE clustertest (key int PRIMARY KEY);
+ NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "clustertest_pkey" for table "clustertest"
+ INSERT INTO clustertest VALUES (10);
+ INSERT INTO clustertest VALUES (20);
+ INSERT INTO clustertest VALUES (30);
+ INSERT INTO clustertest VALUES (40);
+ INSERT INTO clustertest VALUES (50);
+ -- Test update where the old row version is found first in the scan
+ UPDATE clustertest SET key = 100 WHERE key = 10;
+ -- Test update where the new row version is found first in the scan
+ UPDATE clustertest SET key = 35 WHERE key = 40;
+ -- Test longer update chain
+ UPDATE clustertest SET key = 60 WHERE key = 50;
+ UPDATE clustertest SET key = 70 WHERE key = 60;
+ UPDATE clustertest SET key = 80 WHERE key = 70;
+ SELECT * FROM clustertest;
+  key
+ -----
+   20
+   30
+  100
+   35
+   80
+ (5 rows)
+
+ CLUSTER clustertest_pkey ON clustertest;
+ SELECT * FROM clustertest;
+  key
+ -----
+   20
+   30
+   35
+   80
+  100
+ (5 rows)
+
  -- clean up
  \c -
+ DROP TABLE clustertest;
  DROP TABLE clstr_1;
  DROP TABLE clstr_2;
  DROP TABLE clstr_3;
Index: src/test/regress/sql/cluster.sql
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/test/regress/sql/cluster.sql,v
retrieving revision 1.9
diff -c -r1.9 cluster.sql
*** src/test/regress/sql/cluster.sql    7 Jul 2005 20:40:02 -0000    1.9
--- src/test/regress/sql/cluster.sql    20 Mar 2007 17:52:48 -0000
***************
*** 153,160 ****
--- 153,187 ----
  CLUSTER clstr_1;
  SELECT * FROM clstr_1;

+ -- Test MVCC-safety of cluster. There isn't much we can do to verify the
+ -- results with a single backend...
+
+ CREATE TABLE clustertest (key int PRIMARY KEY);
+
+ INSERT INTO clustertest VALUES (10);
+ INSERT INTO clustertest VALUES (20);
+ INSERT INTO clustertest VALUES (30);
+ INSERT INTO clustertest VALUES (40);
+ INSERT INTO clustertest VALUES (50);
+
+ -- Test update where the old row version is found first in the scan
+ UPDATE clustertest SET key = 100 WHERE key = 10;
+
+ -- Test update where the new row version is found first in the scan
+ UPDATE clustertest SET key = 35 WHERE key = 40;
+
+ -- Test longer update chain
+ UPDATE clustertest SET key = 60 WHERE key = 50;
+ UPDATE clustertest SET key = 70 WHERE key = 60;
+ UPDATE clustertest SET key = 80 WHERE key = 70;
+
+ SELECT * FROM clustertest;
+ CLUSTER clustertest_pkey ON clustertest;
+ SELECT * FROM clustertest;
+
  -- clean up
  \c -
+ DROP TABLE clustertest;
  DROP TABLE clstr_1;
  DROP TABLE clstr_2;
  DROP TABLE clstr_3;

pgsql-patches by date:

Previous
From: Bruce Momjian
Date:
Subject: Re: pgbench on mingw needs fflush
Next
From: Neil Conway
Date:
Subject: Re: patch adding new regexp functions