vacuum.c refactoring - Mailing list pgsql-patches

From Manfred Koizar
Subject vacuum.c refactoring
Date
Msg-id 4gstb0du485kb5dlcajluvs8h2vvakr8dh@email.aon.at
Whole thread Raw
Responses Re: vacuum.c refactoring  (Bruce Momjian <pgman@candle.pha.pa.us>)
List pgsql-patches
   . rename variables
     . cur_buffer -> dst_buffer
     . ToPage -> dst_page
     . cur_page -> dst_vacpage
   . move variable declarations into block where variable is used
   . various Asserts instead of elog(ERROR, ...)
   . extract functionality from repair_frag() into new routines
     . move_chain_tuple()
     . move_plain_tuple()
     . update_hint_bits()
   . create type ExecContext
   . add comments

This patch is not intended to change any behaviour.  It passes make
check, make installcheck and some manual tests.  It might be hard to
review, because some lines are affected by more than one change.  If
it's too much to swallow at once, I can provide it in smaller chunks ...

Servus
 Manfred
diff -Ncr ../base/src/backend/commands/vacuum.c src/backend/commands/vacuum.c
*** ../base/src/backend/commands/vacuum.c    Mon May 31 21:24:05 2004
--- src/backend/commands/vacuum.c    Wed Jun  2 21:46:59 2004
***************
*** 99,104 ****
--- 99,162 ----
      VTupleLink    vtlinks;
  } VRelStats;

+ /*----------------------------------------------------------------------
+  * ExecContext:
+  *
+  * As these variables always appear together, we put them into one struct
+  * and pull initialization and cleanup into separate routines.
+  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
+  * accurately:  It is *used* only in move_xxx_tuple(), but because this
+  * routine is called many times, we initialize the struct just once in
+  * repair_frag() and pass it on to move_xxx_tuple().
+  */
+ typedef struct ExecContextData
+ {
+     ResultRelInfo *resultRelInfo;
+     EState       *estate;
+     TupleTable    tupleTable;
+     TupleTableSlot *slot;
+ } ExecContextData;
+ typedef ExecContextData *ExecContext;
+
+ static void
+ ExecContext_Init(ExecContext ec, Relation rel)
+ {
+     TupleDesc    tupdesc = RelationGetDescr(rel);
+
+     /*
+      * We need a ResultRelInfo and an EState so we can use the regular
+      * executor's index-entry-making machinery.
+      */
+     ec->estate = CreateExecutorState();
+
+     ec->resultRelInfo = makeNode(ResultRelInfo);
+     ec->resultRelInfo->ri_RangeTableIndex = 1;        /* dummy */
+     ec->resultRelInfo->ri_RelationDesc = rel;
+     ec->resultRelInfo->ri_TrigDesc = NULL;    /* we don't fire triggers */
+
+     ExecOpenIndices(ec->resultRelInfo);
+
+     ec->estate->es_result_relations = ec->resultRelInfo;
+     ec->estate->es_num_result_relations = 1;
+     ec->estate->es_result_relation_info = ec->resultRelInfo;
+
+     /* Set up a dummy tuple table too */
+     ec->tupleTable = ExecCreateTupleTable(1);
+     ec->slot = ExecAllocTableSlot(ec->tupleTable);
+     ExecSetSlotDescriptor(ec->slot, tupdesc, false);
+ }
+
+ static void
+ ExecContext_Finish(ExecContext ec)
+ {
+     ExecDropTupleTable(ec->tupleTable, true);
+     ExecCloseIndices(ec->resultRelInfo);
+     FreeExecutorState(ec->estate);
+ }
+ /*
+  * End of ExecContext Implementation
+  *----------------------------------------------------------------------
+  */

  static MemoryContext vac_context = NULL;

***************
*** 122,127 ****
--- 180,196 ----
  static void repair_frag(VRelStats *vacrelstats, Relation onerel,
              VacPageList vacuum_pages, VacPageList fraged_pages,
              int nindexes, Relation *Irel);
+ static void move_chain_tuple(Relation rel,
+                      Buffer old_buf, Page old_page, HeapTuple old_tup,
+                      Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
+                      ExecContext ec, ItemPointer ctid, bool cleanVpd);
+ static void move_plain_tuple(Relation rel,
+                      Buffer old_buf, Page old_page, HeapTuple old_tup,
+                      Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
+                      ExecContext ec);
+ static void update_hint_bits(Relation rel, VacPageList fraged_pages,
+                     int num_fraged_pages, BlockNumber last_move_dest_block,
+                     int num_moved);
  static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
              VacPageList vacpagelist);
  static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
***************
*** 675,681 ****
  static void
  vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
  {
!     TransactionId myXID;
      Relation    relation;
      HeapScanDesc scan;
      HeapTuple    tuple;
--- 744,750 ----
  static void
  vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
  {
!     TransactionId myXID = GetCurrentTransactionId();
      Relation    relation;
      HeapScanDesc scan;
      HeapTuple    tuple;
***************
*** 683,689 ****
      bool        vacuumAlreadyWrapped = false;
      bool        frozenAlreadyWrapped = false;

-     myXID = GetCurrentTransactionId();

      relation = heap_openr(DatabaseRelationName, AccessShareLock);

--- 752,757 ----
***************
*** 1059,1075 ****
  {
      BlockNumber nblocks,
                  blkno;
-     ItemId        itemid;
-     Buffer        buf;
      HeapTupleData tuple;
-     OffsetNumber offnum,
-                 maxoff;
-     bool        pgchanged,
-                 tupgone,
-                 notup;
      char       *relname;
!     VacPage        vacpage,
!                 vacpagecopy;
      BlockNumber empty_pages,
                  empty_end_pages;
      double        num_tuples,
--- 1127,1135 ----
  {
      BlockNumber nblocks,
                  blkno;
      HeapTupleData tuple;
      char       *relname;
!     VacPage        vacpage;
      BlockNumber empty_pages,
                  empty_end_pages;
      double        num_tuples,
***************
*** 1080,1086 ****
                  usable_free_space;
      Size        min_tlen = MaxTupleSize;
      Size        max_tlen = 0;
-     int            i;
      bool        do_shrinking = true;
      VTupleLink    vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
      int            num_vtlinks = 0;
--- 1140,1145 ----
***************
*** 1113,1118 ****
--- 1172,1182 ----
                      tempPage = NULL;
          bool        do_reap,
                      do_frag;
+         Buffer        buf;
+         OffsetNumber offnum,
+                     maxoff;
+         bool        pgchanged,
+                     notup;

          vacuum_delay_point();

***************
*** 1125,1130 ****
--- 1189,1196 ----

          if (PageIsNew(page))
          {
+             VacPage    vacpagecopy;
+
              ereport(WARNING,
              (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
                      relname, blkno)));
***************
*** 1142,1147 ****
--- 1208,1215 ----

          if (PageIsEmpty(page))
          {
+             VacPage    vacpagecopy;
+
              vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
              free_space += vacpage->free;
              empty_pages++;
***************
*** 1161,1168 ****
               offnum = OffsetNumberNext(offnum))
          {
              uint16        sv_infomask;
!
!             itemid = PageGetItemId(page, offnum);

              /*
               * Collect un-used items too - it's possible to have indexes
--- 1229,1236 ----
               offnum = OffsetNumberNext(offnum))
          {
              uint16        sv_infomask;
!             ItemId        itemid = PageGetItemId(page, offnum);
!             bool        tupgone = false;

              /*
               * Collect un-used items too - it's possible to have indexes
***************
*** 1180,1186 ****
              tuple.t_len = ItemIdGetLength(itemid);
              ItemPointerSet(&(tuple.t_self), blkno, offnum);

-             tupgone = false;
              sv_infomask = tuple.t_data->t_infomask;

              switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
--- 1248,1253 ----
***************
*** 1269,1275 ****
                      do_shrinking = false;
                      break;
                  default:
!                     elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
                      break;
              }

--- 1336,1343 ----
                      do_shrinking = false;
                      break;
                  default:
!                     /* unexpected HeapTupleSatisfiesVacuum result */
!                     Assert(false);
                      break;
              }

***************
*** 1344,1350 ****

          if (do_reap || do_frag)
          {
!             vacpagecopy = copy_vac_page(vacpage);
              if (do_reap)
                  vpage_insert(vacuum_pages, vacpagecopy);
              if (do_frag)
--- 1412,1418 ----

          if (do_reap || do_frag)
          {
!             VacPage    vacpagecopy = copy_vac_page(vacpage);
              if (do_reap)
                  vpage_insert(vacuum_pages, vacpagecopy);
              if (do_frag)
***************
*** 1390,1395 ****
--- 1458,1465 ----
       */
      if (do_shrinking)
      {
+         int            i;
+
          Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
          fraged_pages->num_pages -= empty_end_pages;
          usable_free_space = 0;
***************
*** 1453,1528 ****
              VacPageList vacuum_pages, VacPageList fraged_pages,
              int nindexes, Relation *Irel)
  {
!     TransactionId myXID;
!     CommandId    myCID;
!     Buffer        buf,
!                 cur_buffer;
      BlockNumber nblocks,
                  blkno;
      BlockNumber last_move_dest_block = 0,
                  last_vacuum_block;
!     Page        page,
!                 ToPage = NULL;
!     OffsetNumber offnum,
!                 maxoff,
!                 newoff,
!                 max_offset;
!     ItemId        itemid,
!                 newitemid;
!     HeapTupleData tuple,
!                 newtup;
!     TupleDesc    tupdesc;
!     ResultRelInfo *resultRelInfo;
!     EState       *estate;
!     TupleTable    tupleTable;
!     TupleTableSlot *slot;
      VacPageListData Nvacpagelist;
!     VacPage        cur_page = NULL,
                  last_vacuum_page,
                  vacpage,
                 *curpage;
-     int            cur_item = 0;
      int            i;
!     Size        tuple_len;
!     int            num_moved,
                  num_fraged_pages,
                  vacuumed_pages;
!     int            checked_moved,
!                 num_tuples,
!                 keep_tuples = 0;
!     bool        isempty,
!                 dowrite,
!                 chain_tuple_moved;
      VacRUsage    ru0;

      vac_init_rusage(&ru0);

!     myXID = GetCurrentTransactionId();
!     myCID = GetCurrentCommandId();
!
!     tupdesc = RelationGetDescr(onerel);
!
!     /*
!      * We need a ResultRelInfo and an EState so we can use the regular
!      * executor's index-entry-making machinery.
!      */
!     estate = CreateExecutorState();
!
!     resultRelInfo = makeNode(ResultRelInfo);
!     resultRelInfo->ri_RangeTableIndex = 1;        /* dummy */
!     resultRelInfo->ri_RelationDesc = onerel;
!     resultRelInfo->ri_TrigDesc = NULL;    /* we don't fire triggers */
!
!     ExecOpenIndices(resultRelInfo);
!
!     estate->es_result_relations = resultRelInfo;
!     estate->es_num_result_relations = 1;
!     estate->es_result_relation_info = resultRelInfo;
!
!     /* Set up a dummy tuple table too */
!     tupleTable = ExecCreateTupleTable(1);
!     slot = ExecAllocTableSlot(tupleTable);
!     ExecSetSlotDescriptor(slot, tupdesc, false);

      Nvacpagelist.num_pages = 0;
      num_fraged_pages = fraged_pages->num_pages;
--- 1523,1551 ----
              VacPageList vacuum_pages, VacPageList fraged_pages,
              int nindexes, Relation *Irel)
  {
!     TransactionId myXID = GetCurrentTransactionId();
!     Buffer        dst_buffer = InvalidBuffer;
      BlockNumber nblocks,
                  blkno;
      BlockNumber last_move_dest_block = 0,
                  last_vacuum_block;
!     Page        dst_page = NULL;
!     ExecContextData    ec;
      VacPageListData Nvacpagelist;
!     VacPage        dst_vacpage = NULL,
                  last_vacuum_page,
                  vacpage,
                 *curpage;
      int            i;
!     int            num_moved = 0,
                  num_fraged_pages,
                  vacuumed_pages;
!     int            keep_tuples = 0;
      VacRUsage    ru0;

      vac_init_rusage(&ru0);

!     ExecContext_Init(&ec, onerel);

      Nvacpagelist.num_pages = 0;
      num_fraged_pages = fraged_pages->num_pages;
***************
*** 1539,1546 ****
          last_vacuum_page = NULL;
          last_vacuum_block = InvalidBlockNumber;
      }
-     cur_buffer = InvalidBuffer;
-     num_moved = 0;

      vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
      vacpage->offsets_used = vacpage->offsets_free = 0;
--- 1562,1567 ----
***************
*** 1560,1565 ****
--- 1581,1594 ----
           blkno > last_move_dest_block;
           blkno--)
      {
+         Buffer            buf;
+         Page            page;
+         OffsetNumber    offnum,
+                         maxoff;
+         bool            isempty,
+                         dowrite,
+                         chain_tuple_moved;
+
          vacuum_delay_point();

          /*
***************
*** 1635,1641 ****
               offnum <= maxoff;
               offnum = OffsetNumberNext(offnum))
          {
!             itemid = PageGetItemId(page, offnum);

              if (!ItemIdIsUsed(itemid))
                  continue;
--- 1664,1672 ----
               offnum <= maxoff;
               offnum = OffsetNumberNext(offnum))
          {
!             Size            tuple_len;
!             HeapTupleData    tuple;
!             ItemId            itemid = PageGetItemId(page, offnum);

              if (!ItemIdIsUsed(itemid))
                  continue;
***************
*** 1645,1689 ****
              tuple_len = tuple.t_len = ItemIdGetLength(itemid);
              ItemPointerSet(&(tuple.t_self), blkno, offnum);

              if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
              {
!                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
!                     elog(ERROR, "HEAP_MOVED_IN was not expected");

                  /*
                   * If this (chain) tuple is moved by me already then I
                   * have to check is it in vacpage or not - i.e. is it
                   * moved while cleaning this page or some previous one.
                   */
!                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
!                 {
!                     if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
!                         elog(ERROR, "invalid XVAC in tuple header");
!                     if (keep_tuples == 0)
!                         continue;
!                     if (chain_tuple_moved)        /* some chains was moved
!                                                  * while */
!                     {            /* cleaning this page */
!                         Assert(vacpage->offsets_free > 0);
!                         for (i = 0; i < vacpage->offsets_free; i++)
!                         {
!                             if (vacpage->offsets[i] == offnum)
!                                 break;
!                         }
!                         if (i >= vacpage->offsets_free) /* not found */
!                         {
!                             vacpage->offsets[vacpage->offsets_free++] = offnum;
!                             keep_tuples--;
!                         }
                      }
!                     else
                      {
                          vacpage->offsets[vacpage->offsets_free++] = offnum;
                          keep_tuples--;
                      }
-                     continue;
                  }
!                 elog(ERROR, "HEAP_MOVED_OFF was expected");
              }

              /*
--- 1676,1746 ----
              tuple_len = tuple.t_len = ItemIdGetLength(itemid);
              ItemPointerSet(&(tuple.t_self), blkno, offnum);

+             /*
+              * VACUUM FULL has an exclusive lock on the relation.  So
+              * normally no other transaction can have pending INSERTs or
+              * DELETEs in this relation.  A tuple is either
+              *   (a) a tuple in a system catalog, inserted or deleted by
+              *       a not yet committed transaction or
+              *   (b) dead (XMIN_INVALID or XMAX_COMMITTED) or
+              *   (c) inserted by a committed xact (XMIN_COMMITTED) or
+              *   (d) moved by the currently running VACUUM.
+              * In case (a) we wouldn't be in repair_frag() at all.
+              * In case (b) we cannot be here, because scan_heap() has
+              * already marked the item as unused, see continue above.
+              * Case (c) is what normally is to be expected.
+              * Case (d) is only possible, if a whole tuple chain has been
+              * moved while processing this or a higher numbered block.
+              */
              if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
              {
!                 /*
!                  * There cannot be another concurrently running VACUUM.  If
!                  * the tuple had been moved in by a previous VACUUM, the
!                  * visibility check would have set XMIN_COMMITTED.  If the
!                  * tuple had been moved in by the currently running VACUUM,
!                  * the loop would have been terminated.  We had
!                  * elog(ERROR, ...) here, but as we are testing for a
!                  * can't-happen condition, Assert() seems more appropriate.
!                  */
!                 Assert(!(tuple.t_data->t_infomask & HEAP_MOVED_IN));

                  /*
                   * If this (chain) tuple is moved by me already then I
                   * have to check is it in vacpage or not - i.e. is it
                   * moved while cleaning this page or some previous one.
                   */
!                 Assert(tuple.t_data->t_infomask & HEAP_MOVED_OFF);
!                 /*
!                  * MOVED_OFF by another VACUUM would have caused the
!                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
!                  */
!                 Assert(HeapTupleHeaderGetXvac(tuple.t_data) == myXID);
!
!                 /* Can't we Assert(keep_tuples > 0) here? */
!                 if (keep_tuples == 0)
!                     continue;
!                 if (chain_tuple_moved)        /* some chains was moved
!                                              * while */
!                 {            /* cleaning this page */
!                     Assert(vacpage->offsets_free > 0);
!                     for (i = 0; i < vacpage->offsets_free; i++)
!                     {
!                         if (vacpage->offsets[i] == offnum)
!                             break;
                      }
!                     if (i >= vacpage->offsets_free) /* not found */
                      {
                          vacpage->offsets[vacpage->offsets_free++] = offnum;
                          keep_tuples--;
                      }
                  }
!                 else
!                 {
!                     vacpage->offsets[vacpage->offsets_free++] = offnum;
!                     keep_tuples--;
!                 }
!                 continue;
              }

              /*
***************
*** 1716,1723 ****
                  Buffer        Cbuf = buf;
                  bool        freeCbuf = false;
                  bool        chain_move_failed = false;
-                 Page        Cpage;
-                 ItemId        Citemid;
                  ItemPointerData Ctid;
                  HeapTupleData tp = tuple;
                  Size        tlen = tuple_len;
--- 1773,1778 ----
***************
*** 1728,1737 ****
                  int            to_item = 0;
                  int            ti;

!                 if (cur_buffer != InvalidBuffer)
                  {
!                     WriteBuffer(cur_buffer);
!                     cur_buffer = InvalidBuffer;
                  }

                  /* Quick exit if we have no vtlinks to search in */
--- 1783,1792 ----
                  int            to_item = 0;
                  int            ti;

!                 if (dst_buffer != InvalidBuffer)
                  {
!                     WriteBuffer(dst_buffer);
!                     dst_buffer = InvalidBuffer;
                  }

                  /* Quick exit if we have no vtlinks to search in */
***************
*** 1754,1759 ****
--- 1809,1818 ----
                         !(ItemPointerEquals(&(tp.t_self),
                                             &(tp.t_data->t_ctid))))
                  {
+                     Page        Cpage;
+                     ItemId        Citemid;
+                     ItemPointerData Ctid;
+
                      Ctid = tp.t_data->t_ctid;
                      if (freeCbuf)
                          ReleaseBuffer(Cbuf);
***************
*** 1929,1940 ****
                  }

                  /*
!                  * Okay, move the whle tuple chain
                   */
                  ItemPointerSetInvalid(&Ctid);
                  for (ti = 0; ti < num_vtmove; ti++)
                  {
                      VacPage        destvacpage = vtmove[ti].vacpage;

                      /* Get page to move from */
                      tuple.t_self = vtmove[ti].tid;
--- 1988,2001 ----
                  }

                  /*
!                  * Okay, move the whole tuple chain
                   */
                  ItemPointerSetInvalid(&Ctid);
                  for (ti = 0; ti < num_vtmove; ti++)
                  {
                      VacPage        destvacpage = vtmove[ti].vacpage;
+                     Page        Cpage;
+                     ItemId        Citemid;

                      /* Get page to move from */
                      tuple.t_self = vtmove[ti].tid;
***************
*** 1942,1954 ****
                               ItemPointerGetBlockNumber(&(tuple.t_self)));

                      /* Get page to move to */
!                     cur_buffer = ReadBuffer(onerel, destvacpage->blkno);

!                     LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
!                     if (cur_buffer != Cbuf)
                          LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);

!                     ToPage = BufferGetPage(cur_buffer);
                      Cpage = BufferGetPage(Cbuf);

                      Citemid = PageGetItemId(Cpage,
--- 2003,2015 ----
                               ItemPointerGetBlockNumber(&(tuple.t_self)));

                      /* Get page to move to */
!                     dst_buffer = ReadBuffer(onerel, destvacpage->blkno);

!                     LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
!                     if (dst_buffer != Cbuf)
                          LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);

!                     dst_page = BufferGetPage(dst_buffer);
                      Cpage = BufferGetPage(Cbuf);

                      Citemid = PageGetItemId(Cpage,
***************
*** 1961,2081 ****
                       * make a copy of the source tuple, and then mark the
                       * source tuple MOVED_OFF.
                       */
!                     heap_copytuple_with_tuple(&tuple, &newtup);
!
!                     /*
!                      * register invalidation of source tuple in catcaches.
!                      */
!                     CacheInvalidateHeapTuple(onerel, &tuple);
!
!                     /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
!                     START_CRIT_SECTION();
!
!                     tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
!                                                   HEAP_XMIN_INVALID |
!                                                   HEAP_MOVED_IN);
!                     tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
!                     HeapTupleHeaderSetXvac(tuple.t_data, myXID);
!
!                     /*
!                      * If this page was not used before - clean it.
!                      *
!                      * NOTE: a nasty bug used to lurk here.  It is possible
!                      * for the source and destination pages to be the same
!                      * (since this tuple-chain member can be on a page
!                      * lower than the one we're currently processing in
!                      * the outer loop).  If that's true, then after
!                      * vacuum_page() the source tuple will have been
!                      * moved, and tuple.t_data will be pointing at
!                      * garbage.  Therefore we must do everything that uses
!                      * tuple.t_data BEFORE this step!!
!                      *
!                      * This path is different from the other callers of
!                      * vacuum_page, because we have already incremented
!                      * the vacpage's offsets_used field to account for the
!                      * tuple(s) we expect to move onto the page. Therefore
!                      * vacuum_page's check for offsets_used == 0 is wrong.
!                      * But since that's a good debugging check for all
!                      * other callers, we work around it here rather than
!                      * remove it.
!                      */
!                     if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
!                     {
!                         int            sv_offsets_used = destvacpage->offsets_used;
!
!                         destvacpage->offsets_used = 0;
!                         vacuum_page(onerel, cur_buffer, destvacpage);
!                         destvacpage->offsets_used = sv_offsets_used;
!                     }
!
!                     /*
!                      * Update the state of the copied tuple, and store it
!                      * on the destination page.
!                      */
!                     newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
!                                                    HEAP_XMIN_INVALID |
!                                                    HEAP_MOVED_OFF);
!                     newtup.t_data->t_infomask |= HEAP_MOVED_IN;
!                     HeapTupleHeaderSetXvac(newtup.t_data, myXID);
!                     newoff = PageAddItem(ToPage,
!                                          (Item) newtup.t_data,
!                                          tuple_len,
!                                          InvalidOffsetNumber,
!                                          LP_USED);
!                     if (newoff == InvalidOffsetNumber)
!                     {
!                         elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
!                           (unsigned long) tuple_len, destvacpage->blkno);
!                     }
!                     newitemid = PageGetItemId(ToPage, newoff);
!                     pfree(newtup.t_data);
!                     newtup.t_datamcxt = NULL;
!                     newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
!                     ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
!
!                     /* XLOG stuff */
!                     if (!onerel->rd_istemp)
!                     {
!                         XLogRecPtr    recptr =
!                         log_heap_move(onerel, Cbuf, tuple.t_self,
!                                       cur_buffer, &newtup);
!
!                         if (Cbuf != cur_buffer)
!                         {
!                             PageSetLSN(Cpage, recptr);
!                             PageSetSUI(Cpage, ThisStartUpID);
!                         }
!                         PageSetLSN(ToPage, recptr);
!                         PageSetSUI(ToPage, ThisStartUpID);
!                     }
!                     else
!                     {
!                         /*
!                          * No XLOG record, but still need to flag that XID
!                          * exists on disk
!                          */
!                         MyXactMadeTempRelUpdate = true;
!                     }
!
!                     END_CRIT_SECTION();

                      if (destvacpage->blkno > last_move_dest_block)
                          last_move_dest_block = destvacpage->blkno;

                      /*
-                      * Set new tuple's t_ctid pointing to itself for last
-                      * tuple in chain, and to next tuple in chain
-                      * otherwise.
-                      */
-                     if (!ItemPointerIsValid(&Ctid))
-                         newtup.t_data->t_ctid = newtup.t_self;
-                     else
-                         newtup.t_data->t_ctid = Ctid;
-                     Ctid = newtup.t_self;
-
-                     num_moved++;
-
-                     /*
                       * Remember that we moved tuple from the current page
                       * (corresponding index tuple will be cleaned).
                       */
--- 2022,2036 ----
                       * make a copy of the source tuple, and then mark the
                       * source tuple MOVED_OFF.
                       */
!                     move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
!                                      dst_buffer, dst_page, destvacpage,
!                                      &ec, &Ctid, vtmove[ti].cleanVpd);

+                     num_moved++;
                      if (destvacpage->blkno > last_move_dest_block)
                          last_move_dest_block = destvacpage->blkno;

                      /*
                       * Remember that we moved tuple from the current page
                       * (corresponding index tuple will be cleaned).
                       */
***************
*** 2085,2107 ****
                      else
                          keep_tuples++;

!                     LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
!                     if (cur_buffer != Cbuf)
!                         LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
!
!                     /* Create index entries for the moved tuple */
!                     if (resultRelInfo->ri_NumIndices > 0)
!                     {
!                         ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
!                         ExecInsertIndexTuples(slot, &(newtup.t_self),
!                                               estate, true);
!                     }
!
!                     WriteBuffer(cur_buffer);
                      WriteBuffer(Cbuf);
                  }                /* end of move-the-tuple-chain loop */

!                 cur_buffer = InvalidBuffer;
                  pfree(vtmove);
                  chain_tuple_moved = true;

--- 2040,2050 ----
                      else
                          keep_tuples++;

!                     WriteBuffer(dst_buffer);
                      WriteBuffer(Cbuf);
                  }                /* end of move-the-tuple-chain loop */

!                 dst_buffer = InvalidBuffer;
                  pfree(vtmove);
                  chain_tuple_moved = true;

***************
*** 2110,2122 ****
              }                    /* end of is-tuple-in-chain test */

              /* try to find new page for this tuple */
!             if (cur_buffer == InvalidBuffer ||
!                 !enough_space(cur_page, tuple_len))
              {
!                 if (cur_buffer != InvalidBuffer)
                  {
!                     WriteBuffer(cur_buffer);
!                     cur_buffer = InvalidBuffer;
                  }
                  for (i = 0; i < num_fraged_pages; i++)
                  {
--- 2053,2065 ----
              }                    /* end of is-tuple-in-chain test */

              /* try to find new page for this tuple */
!             if (dst_buffer == InvalidBuffer ||
!                 !enough_space(dst_vacpage, tuple_len))
              {
!                 if (dst_buffer != InvalidBuffer)
                  {
!                     WriteBuffer(dst_buffer);
!                     dst_buffer = InvalidBuffer;
                  }
                  for (i = 0; i < num_fraged_pages; i++)
                  {
***************
*** 2125,2234 ****
                  }
                  if (i == num_fraged_pages)
                      break;        /* can't move item anywhere */
!                 cur_item = i;
!                 cur_page = fraged_pages->pagedesc[cur_item];
!                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
!                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
!                 ToPage = BufferGetPage(cur_buffer);
                  /* if this page was not used before - clean it */
!                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
!                     vacuum_page(onerel, cur_buffer, cur_page);
              }
              else
!                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);

              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

!             /* copy tuple */
!             heap_copytuple_with_tuple(&tuple, &newtup);
!
!             /*
!              * register invalidation of source tuple in catcaches.
!              *
!              * (Note: we do not need to register the copied tuple, because we
!              * are not changing the tuple contents and so there cannot be
!              * any need to flush negative catcache entries.)
!              */
!             CacheInvalidateHeapTuple(onerel, &tuple);

-             /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
-             START_CRIT_SECTION();

!             /*
!              * Mark new tuple as MOVED_IN by me.
!              */
!             newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
!                                            HEAP_XMIN_INVALID |
!                                            HEAP_MOVED_OFF);
!             newtup.t_data->t_infomask |= HEAP_MOVED_IN;
!             HeapTupleHeaderSetXvac(newtup.t_data, myXID);
!
!             /* add tuple to the page */
!             newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
!                                  InvalidOffsetNumber, LP_USED);
!             if (newoff == InvalidOffsetNumber)
!             {
!                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
!                      (unsigned long) tuple_len,
!                      cur_page->blkno, (unsigned long) cur_page->free,
!                      cur_page->offsets_used, cur_page->offsets_free);
!             }
!             newitemid = PageGetItemId(ToPage, newoff);
!             pfree(newtup.t_data);
!             newtup.t_datamcxt = NULL;
!             newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
!             ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
!             newtup.t_self = newtup.t_data->t_ctid;

              /*
!              * Mark old tuple as MOVED_OFF by me.
               */
-             tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
-                                           HEAP_XMIN_INVALID |
-                                           HEAP_MOVED_IN);
-             tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
-             HeapTupleHeaderSetXvac(tuple.t_data, myXID);
-
-             /* XLOG stuff */
-             if (!onerel->rd_istemp)
-             {
-                 XLogRecPtr    recptr =
-                 log_heap_move(onerel, buf, tuple.t_self,
-                               cur_buffer, &newtup);
-
-                 PageSetLSN(page, recptr);
-                 PageSetSUI(page, ThisStartUpID);
-                 PageSetLSN(ToPage, recptr);
-                 PageSetSUI(ToPage, ThisStartUpID);
-             }
-             else
-             {
-                 /*
-                  * No XLOG record, but still need to flag that XID exists
-                  * on disk
-                  */
-                 MyXactMadeTempRelUpdate = true;
-             }
-
-             END_CRIT_SECTION();
-
-             cur_page->offsets_used++;
-             num_moved++;
-             cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
-             if (cur_page->blkno > last_move_dest_block)
-                 last_move_dest_block = cur_page->blkno;
-
              vacpage->offsets[vacpage->offsets_free++] = offnum;
-
-             LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
-             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
-
-             /* insert index' tuples if needed */
-             if (resultRelInfo->ri_NumIndices > 0)
-             {
-                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
-                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
-             }
          }                        /* walk along page */

          /*
--- 2068,2099 ----
                  }
                  if (i == num_fraged_pages)
                      break;        /* can't move item anywhere */
!                 dst_vacpage = fraged_pages->pagedesc[i];
!                 dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
!                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
!                 dst_page = BufferGetPage(dst_buffer);
                  /* if this page was not used before - clean it */
!                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
!                     vacuum_page(onerel, dst_buffer, dst_vacpage);
              }
              else
!                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);

              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

!             move_plain_tuple(onerel, buf, page, &tuple,
!                              dst_buffer, dst_page, dst_vacpage, &ec);


!             num_moved++;
!             if (dst_vacpage->blkno > last_move_dest_block)
!                 last_move_dest_block = dst_vacpage->blkno;

              /*
!              * Remember that we moved tuple from the current page
!              * (corresponding index tuple will be cleaned).
               */
              vacpage->offsets[vacpage->offsets_free++] = offnum;
          }                        /* walk along page */

          /*
***************
*** 2249,2284 ****
                   off <= maxoff;
                   off = OffsetNumberNext(off))
              {
!                 itemid = PageGetItemId(page, off);
                  if (!ItemIdIsUsed(itemid))
                      continue;
!                 tuple.t_datamcxt = NULL;
!                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
!                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
                      continue;
!                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
!                     elog(ERROR, "HEAP_MOVED_IN was not expected");
!                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
                  {
!                     if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
!                         elog(ERROR, "invalid XVAC in tuple header");
!                     /* some chains was moved while */
!                     if (chain_tuple_moved)
!                     {            /* cleaning this page */
!                         Assert(vacpage->offsets_free > 0);
!                         for (i = 0; i < vacpage->offsets_free; i++)
!                         {
!                             if (vacpage->offsets[i] == off)
!                                 break;
!                         }
!                         if (i >= vacpage->offsets_free) /* not found */
!                         {
!                             vacpage->offsets[vacpage->offsets_free++] = off;
!                             Assert(keep_tuples > 0);
!                             keep_tuples--;
!                         }
                      }
!                     else
                      {
                          vacpage->offsets[vacpage->offsets_free++] = off;
                          Assert(keep_tuples > 0);
--- 2114,2144 ----
                   off <= maxoff;
                   off = OffsetNumberNext(off))
              {
!                 ItemId    itemid = PageGetItemId(page, off);
!                 HeapTupleHeader    htup;
!
                  if (!ItemIdIsUsed(itemid))
                      continue;
!                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
!                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
                      continue;
!                 /*
!                 ** See the comments in the walk-along-page loop above for why
!                 ** we have Asserts here instead of if (...) elog(ERROR).
!                 */
!                 Assert(!(htup->t_infomask & HEAP_MOVED_IN));
!                 Assert(htup->t_infomask & HEAP_MOVED_OFF);
!                 Assert(HeapTupleHeaderGetXvac(htup) == myXID);
!                 if (chain_tuple_moved)
                  {
!                     /* some chains were moved while cleaning this page */
!                     Assert(vacpage->offsets_free > 0);
!                     for (i = 0; i < vacpage->offsets_free; i++)
!                     {
!                         if (vacpage->offsets[i] == off)
!                             break;
                      }
!                     if (i >= vacpage->offsets_free) /* not found */
                      {
                          vacpage->offsets[vacpage->offsets_free++] = off;
                          Assert(keep_tuples > 0);
***************
*** 2286,2292 ****
                      }
                  }
                  else
!                     elog(ERROR, "HEAP_MOVED_OFF was expected");
              }
          }

--- 2146,2156 ----
                      }
                  }
                  else
!                 {
!                     vacpage->offsets[vacpage->offsets_free++] = off;
!                     Assert(keep_tuples > 0);
!                     keep_tuples--;
!                 }
              }
          }

***************
*** 2312,2321 ****

      blkno++;                    /* new number of blocks */

!     if (cur_buffer != InvalidBuffer)
      {
          Assert(num_moved > 0);
!         WriteBuffer(cur_buffer);
      }

      if (num_moved > 0)
--- 2176,2185 ----

      blkno++;                    /* new number of blocks */

!     if (dst_buffer != InvalidBuffer)
      {
          Assert(num_moved > 0);
!         WriteBuffer(dst_buffer);
      }

      if (num_moved > 0)
***************
*** 2348,2353 ****
--- 2212,2220 ----
          Assert((*curpage)->blkno < blkno);
          if ((*curpage)->offsets_used == 0)
          {
+             Buffer        buf;
+             Page        page;
+
              /* this page was not used as a move target, so must clean it */
              buf = ReadBuffer(onerel, (*curpage)->blkno);
              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
***************
*** 2363,2424 ****
       * Now scan all the pages that we moved tuples onto and update tuple
       * status bits.  This is not really necessary, but will save time for
       * future transactions examining these tuples.
-      *
-      * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
-      * pages that were move source pages but not move dest pages.  One
-      * also wonders whether it wouldn't be better to skip this step and
-      * let the tuple status updates happen someplace that's not holding an
-      * exclusive lock on the relation.
       */
!     checked_moved = 0;
!     for (i = 0, curpage = fraged_pages->pagedesc;
!          i < num_fraged_pages;
!          i++, curpage++)
!     {
!         vacuum_delay_point();
!
!         Assert((*curpage)->blkno < blkno);
!         if ((*curpage)->blkno > last_move_dest_block)
!             break;                /* no need to scan any further */
!         if ((*curpage)->offsets_used == 0)
!             continue;            /* this page was never used as a move dest */
!         buf = ReadBuffer(onerel, (*curpage)->blkno);
!         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
!         page = BufferGetPage(buf);
!         num_tuples = 0;
!         max_offset = PageGetMaxOffsetNumber(page);
!         for (newoff = FirstOffsetNumber;
!              newoff <= max_offset;
!              newoff = OffsetNumberNext(newoff))
!         {
!             itemid = PageGetItemId(page, newoff);
!             if (!ItemIdIsUsed(itemid))
!                 continue;
!             tuple.t_datamcxt = NULL;
!             tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
!             if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
!             {
!                 if (!(tuple.t_data->t_infomask & HEAP_MOVED))
!                     elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
!                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
!                     elog(ERROR, "invalid XVAC in tuple header");
!                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
!                 {
!                     tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
!                     tuple.t_data->t_infomask &= ~HEAP_MOVED;
!                     num_tuples++;
!                 }
!                 else
!                     tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
!             }
!         }
!         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
!         WriteBuffer(buf);
!         Assert((*curpage)->offsets_used == num_tuples);
!         checked_moved += num_tuples;
!     }
!     Assert(num_moved == checked_moved);
!
      /*
       * It'd be cleaner to make this report at the bottom of this routine,
       * but then the rusage would double-count the second pass of index
--- 2230,2239 ----
       * Now scan all the pages that we moved tuples onto and update tuple
       * status bits.  This is not really necessary, but will save time for
       * future transactions examining these tuples.
       */
!     update_hint_bits(onerel, fraged_pages, num_fraged_pages,
!                      last_move_dest_block, num_moved);
!
      /*
       * It'd be cleaner to make this report at the bottom of this routine,
       * but then the rusage would double-count the second pass of index
***************
*** 2455,2460 ****
--- 2270,2282 ----
                  *vpleft = *vpright;
                  *vpright = vpsave;
              }
+             /*
+              * keep_tuples is the number of tuples that have been moved
+              * off a page during chain moves but not been scanned over
+              * subsequently.  The tuple ids of these tuples are not
+              * recorded as free offsets for any VacPage, so they will not
+              * be cleared from the indexes.
+              */
              Assert(keep_tuples >= 0);
              for (i = 0; i < nindexes; i++)
                  vacuum_index(&Nvacpagelist, Irel[i],
***************
*** 2465,2500 ****
          if (vacpage->blkno == (blkno - 1) &&
              vacpage->offsets_free > 0)
          {
!             OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
!             int            uncnt;

              buf = ReadBuffer(onerel, vacpage->blkno);
              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
              page = BufferGetPage(buf);
-             num_tuples = 0;
              maxoff = PageGetMaxOffsetNumber(page);
              for (offnum = FirstOffsetNumber;
                   offnum <= maxoff;
                   offnum = OffsetNumberNext(offnum))
              {
!                 itemid = PageGetItemId(page, offnum);
                  if (!ItemIdIsUsed(itemid))
                      continue;
!                 tuple.t_datamcxt = NULL;
!                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);

!                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
!                 {
!                     if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
!                     {
!                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
!                             elog(ERROR, "invalid XVAC in tuple header");
!                         itemid->lp_flags &= ~LP_USED;
!                         num_tuples++;
!                     }
!                     else
!                         elog(ERROR, "HEAP_MOVED_OFF was expected");
!                 }

              }
              Assert(vacpage->offsets_free == num_tuples);
--- 2287,2327 ----
          if (vacpage->blkno == (blkno - 1) &&
              vacpage->offsets_free > 0)
          {
!             Buffer            buf;
!             Page            page;
!             OffsetNumber    unused[BLCKSZ / sizeof(OffsetNumber)];
!             OffsetNumber    offnum,
!                             maxoff;
!             int                uncnt;
!             int                num_tuples = 0;

              buf = ReadBuffer(onerel, vacpage->blkno);
              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
              page = BufferGetPage(buf);
              maxoff = PageGetMaxOffsetNumber(page);
              for (offnum = FirstOffsetNumber;
                   offnum <= maxoff;
                   offnum = OffsetNumberNext(offnum))
              {
!                 ItemId    itemid = PageGetItemId(page, offnum);
!                 HeapTupleHeader htup;
!
                  if (!ItemIdIsUsed(itemid))
                      continue;
!                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
!                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
!                     continue;

!                 /*
!                 ** See the comments in the walk-along-page loop above for why
!                 ** we have Asserts here instead of if (...) elog(ERROR).
!                 */
!                 Assert(!(htup->t_infomask & HEAP_MOVED_IN));
!                 Assert(htup->t_infomask & HEAP_MOVED_OFF);
!                 Assert(HeapTupleHeaderGetXvac(htup) == myXID);
!
!                 itemid->lp_flags &= ~LP_USED;
!                 num_tuples++;

              }
              Assert(vacpage->offsets_free == num_tuples);
***************
*** 2554,2564 ****
      if (vacrelstats->vtlinks != NULL)
          pfree(vacrelstats->vtlinks);

!     ExecDropTupleTable(tupleTable, true);

!     ExecCloseIndices(resultRelInfo);

!     FreeExecutorState(estate);
  }

  /*
--- 2381,2717 ----
      if (vacrelstats->vtlinks != NULL)
          pfree(vacrelstats->vtlinks);

!     ExecContext_Finish(&ec);
! }
!
! /*
!  *    move_chain_tuple() -- move one tuple that is part of a tuple chain
!  *
!  *        This routine moves old_tup from old_page to dst_page.
!  *        old_page and dst_page might be the same page.
!  *        On entry old_buf and dst_buf are locked exclusively; both locks (or
!  *        the single lock, if this is an intra-page move) are released before
!  *        exit.
!  *
!  *        Yes, a routine with ten parameters is ugly, but it's still better
!  *        than having these 120 lines of code in repair_frag() which is
!  *        already too long and almost unreadable.
!  */
! static void
! move_chain_tuple(Relation rel,
!                  Buffer old_buf, Page old_page, HeapTuple old_tup,
!                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
!                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
! {
!     TransactionId myXID = GetCurrentTransactionId();
!     HeapTupleData    newtup;
!     OffsetNumber    newoff;
!     ItemId            newitemid;
!     Size            tuple_len = old_tup->t_len;
!
!     heap_copytuple_with_tuple(old_tup, &newtup);
!
!     /*
!      * register invalidation of source tuple in catcaches.
!      */
!     CacheInvalidateHeapTuple(rel, old_tup);
!
!     /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
!     START_CRIT_SECTION();
!
!     old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
!                                   HEAP_XMIN_INVALID |
!                                   HEAP_MOVED_IN);
!     old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
!     HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
!
!     /*
!      * If this page was not used before - clean it.
!      *
!      * NOTE: a nasty bug used to lurk here.  It is possible
!      * for the source and destination pages to be the same
!      * (since this tuple-chain member can be on a page
!      * lower than the one we're currently processing in
!      * the outer loop).  If that's true, then after
!      * vacuum_page() the source tuple will have been
!      * moved, and old_tup->t_data will be pointing at
!      * garbage.  Therefore we must do everything that uses
!      * old_tup->t_data BEFORE this step!!
!      *
!      * This path is different from the other callers of
!      * vacuum_page, because we have already incremented
!      * the vacpage's offsets_used field to account for the
!      * tuple(s) we expect to move onto the page. Therefore
!      * vacuum_page's check for offsets_used == 0 is wrong.
!      * But since that's a good debugging check for all
!      * other callers, we work around it here rather than
!      * remove it.
!      */
!     if (!PageIsEmpty(dst_page) && cleanVpd)
!     {
!         int        sv_offsets_used = dst_vacpage->offsets_used;
!
!         dst_vacpage->offsets_used = 0;
!         vacuum_page(rel, dst_buf, dst_vacpage);
!         dst_vacpage->offsets_used = sv_offsets_used;
!     }

!     /*
!      * Update the state of the copied tuple, and store it
!      * on the destination page.
!      */
!     newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
!                                    HEAP_XMIN_INVALID |
!                                    HEAP_MOVED_OFF);
!     newtup.t_data->t_infomask |= HEAP_MOVED_IN;
!     HeapTupleHeaderSetXvac(newtup.t_data, myXID);
!     newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
!                          InvalidOffsetNumber, LP_USED);
!     if (newoff == InvalidOffsetNumber)
!     {
!         elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
!           (unsigned long) tuple_len, dst_vacpage->blkno);
!     }
!     newitemid = PageGetItemId(dst_page, newoff);
!     pfree(newtup.t_data);
!     newtup.t_datamcxt = NULL;
!     newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
!     ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
!
!     /* XLOG stuff */
!     if (!rel->rd_istemp)
!     {
!         XLogRecPtr    recptr = log_heap_move(rel, old_buf, old_tup->t_self,
!                                            dst_buf, &newtup);
!
!         if (old_buf != dst_buf)
!         {
!             PageSetLSN(old_page, recptr);
!             PageSetSUI(old_page, ThisStartUpID);
!         }
!         PageSetLSN(dst_page, recptr);
!         PageSetSUI(dst_page, ThisStartUpID);
!     }
!     else
!     {
!         /*
!          * No XLOG record, but still need to flag that XID
!          * exists on disk
!          */
!         MyXactMadeTempRelUpdate = true;
!     }
!
!     END_CRIT_SECTION();
!
!     /*
!      * Set new tuple's t_ctid pointing to itself for last
!      * tuple in chain, and to next tuple in chain
!      * otherwise.
!      */
!     /* Is this ok after log_heap_move() and END_CRIT_SECTION()? */
!     if (!ItemPointerIsValid(ctid))
!         newtup.t_data->t_ctid = newtup.t_self;
!     else
!         newtup.t_data->t_ctid = *ctid;
!     *ctid = newtup.t_self;

!     LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
!     if (dst_buf != old_buf)
!         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
!
!     /* Create index entries for the moved tuple */
!     if (ec->resultRelInfo->ri_NumIndices > 0)
!     {
!         ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
!         ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
!     }
! }
!
! /*
!  *    move_plain_tuple() -- move one tuple that is not part of a chain
!  *
!  *        This routine moves old_tup from old_page to dst_page.
!  *        On entry old_buf and dst_buf are locked exclusively; both locks are
!  *        released before exit.
!  *
!  *        Yes, a routine with eight parameters is ugly, but it's still better
!  *        than having these 90 lines of code in repair_frag() which is already
!  *        too long and almost unreadable.
!  */
! static void
! move_plain_tuple(Relation rel,
!                  Buffer old_buf, Page old_page, HeapTuple old_tup,
!                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
!                  ExecContext ec)
! {
!     TransactionId myXID = GetCurrentTransactionId();
!     HeapTupleData    newtup;
!     OffsetNumber    newoff;
!     ItemId            newitemid;
!     Size            tuple_len = old_tup->t_len;
!
!     /* copy tuple */
!     heap_copytuple_with_tuple(old_tup, &newtup);
!
!     /*
!      * register invalidation of source tuple in catcaches.
!      *
!      * (Note: we do not need to register the copied tuple, because we
!      * are not changing the tuple contents and so there cannot be
!      * any need to flush negative catcache entries.)
!      */
!     CacheInvalidateHeapTuple(rel, old_tup);
!
!     /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
!     START_CRIT_SECTION();
!
!     /*
!      * Mark new tuple as MOVED_IN by me.
!      */
!     newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
!                                    HEAP_XMIN_INVALID |
!                                    HEAP_MOVED_OFF);
!     newtup.t_data->t_infomask |= HEAP_MOVED_IN;
!     HeapTupleHeaderSetXvac(newtup.t_data, myXID);
!
!     /* add tuple to the page */
!     newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
!                          InvalidOffsetNumber, LP_USED);
!     if (newoff == InvalidOffsetNumber)
!     {
!         elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
!              (unsigned long) tuple_len,
!              dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
!              dst_vacpage->offsets_used, dst_vacpage->offsets_free);
!     }
!     newitemid = PageGetItemId(dst_page, newoff);
!     pfree(newtup.t_data);
!     newtup.t_datamcxt = NULL;
!     newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
!     ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
!     newtup.t_self = newtup.t_data->t_ctid;
!
!     /*
!      * Mark old tuple as MOVED_OFF by me.
!      */
!     old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
!                                   HEAP_XMIN_INVALID |
!                                   HEAP_MOVED_IN);
!     old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
!     HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
!
!     /* XLOG stuff */
!     if (!rel->rd_istemp)
!     {
!         XLogRecPtr    recptr = log_heap_move(rel, old_buf, old_tup->t_self,
!                                            dst_buf, &newtup);
!
!         PageSetLSN(old_page, recptr);
!         PageSetSUI(old_page, ThisStartUpID);
!         PageSetLSN(dst_page, recptr);
!         PageSetSUI(dst_page, ThisStartUpID);
!     }
!     else
!     {
!         /*
!          * No XLOG record, but still need to flag that XID exists
!          * on disk
!          */
!         MyXactMadeTempRelUpdate = true;
!     }
!
!     END_CRIT_SECTION();
!
!     dst_vacpage->free = ((PageHeader) dst_page)->pd_upper -
!                         ((PageHeader) dst_page)->pd_lower;
!     LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
!     LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
!
!     dst_vacpage->offsets_used++;
!
!     /* insert index tuples if needed */
!     if (ec->resultRelInfo->ri_NumIndices > 0)
!     {
!         ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
!         ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
!     }
! }
!
! /*
!  *    update_hint_bits() -- update hint bits in destination pages
!  *
!  * Scan all the pages that we moved tuples onto and update tuple
!  * status bits.  This is not really necessary, but will save time for
!  * future transactions examining these tuples.
!  *
!  * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
!  * pages that were move source pages but not move dest pages.  One
!  * also wonders whether it wouldn't be better to skip this step and
!  * let the tuple status updates happen someplace that's not holding an
!  * exclusive lock on the relation.
!  */
! static void
! update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
!                  BlockNumber last_move_dest_block, int num_moved)
! {
!     int            checked_moved = 0;
!     int            i;
!     VacPage       *curpage;
!
!     for (i = 0, curpage = fraged_pages->pagedesc;
!          i < num_fraged_pages;
!          i++, curpage++)
!     {
!         Buffer            buf;
!         Page            page;
!         OffsetNumber    max_offset;
!         OffsetNumber    off;
!         int                num_tuples = 0;
!
!         vacuum_delay_point();
!
!         if ((*curpage)->blkno > last_move_dest_block)
!             break;                /* no need to scan any further */
!         if ((*curpage)->offsets_used == 0)
!             continue;            /* this page was never used as a move dest */
!         buf = ReadBuffer(rel, (*curpage)->blkno);
!         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
!         page = BufferGetPage(buf);
!         max_offset = PageGetMaxOffsetNumber(page);
!         for (off = FirstOffsetNumber;
!              off <= max_offset;
!              off = OffsetNumberNext(off))
!         {
!             ItemId    itemid = PageGetItemId(page, off);
!             HeapTupleHeader    htup;
!
!             if (!ItemIdIsUsed(itemid))
!                 continue;
!             htup = (HeapTupleHeader) PageGetItem(page, itemid);
!             if (htup->t_infomask & HEAP_XMIN_COMMITTED)
!                 continue;
!             /*
!              * See the comments in the walk-along-page loop of repair_frag()
!              * for why we have Asserts here instead of if (...) elog(ERROR).
!              * The difference here is that we may see MOVED_IN.
!              */
!             Assert(htup->t_infomask & HEAP_MOVED);
!             Assert(HeapTupleHeaderGetXvac(htup) == GetCurrentTransactionId());
!             if (htup->t_infomask & HEAP_MOVED_IN)
!             {
!                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
!                 htup->t_infomask &= ~HEAP_MOVED;
!                 num_tuples++;
!             }
!             else
!                 htup->t_infomask |= HEAP_XMIN_INVALID;
!         }
!         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
!         WriteBuffer(buf);
!         Assert((*curpage)->offsets_used == num_tuples);
!         checked_moved += num_tuples;
!     }
!     Assert(num_moved == checked_moved);
  }

  /*

pgsql-patches by date:

Previous
From: Andreas Pflug
Date:
Subject: Re: Compiling libpq with VisualC
Next
From: Manfred Koizar
Date:
Subject: Stylistic changes in bufmgr.c