Re: vacuum.c refactoring - Mailing list pgsql-patches

From Bruce Momjian
Subject Re: vacuum.c refactoring
Date
Msg-id 200406081359.i58Dxf701152@candle.pha.pa.us
Whole thread Raw
In response to vacuum.c refactoring  (Manfred Koizar <mkoi-pg@aon.at>)
List pgsql-patches
Patch applied.  Thanks.

---------------------------------------------------------------------------


Manfred Koizar wrote:
>    . rename variables
>      . cur_buffer -> dst_buffer
>      . ToPage -> dst_page
>      . cur_page -> dst_vacpage
>    . move variable declarations into block where variable is used
>    . various Asserts instead of elog(ERROR, ...)
>    . extract functionality from repair_frag() into new routines
>      . move_chain_tuple()
>      . move_plain_tuple()
>      . update_hint_bits()
>    . create type ExecContext
>    . add comments
>
> This patch does not intend to change any behaviour.  It passes make
> check, make installcheck and some manual tests.  It might be hard to
> review, because some lines are affected by more than one change.  If
> it's too much to swallow at once, I can provide it in smaller chunks ...
>
> Servus
>  Manfred

> diff -Ncr ../base/src/backend/commands/vacuum.c src/backend/commands/vacuum.c
> *** ../base/src/backend/commands/vacuum.c    Mon May 31 21:24:05 2004
> --- src/backend/commands/vacuum.c    Wed Jun  2 21:46:59 2004
> ***************
> *** 99,104 ****
> --- 99,162 ----
>       VTupleLink    vtlinks;
>   } VRelStats;
>
> + /*----------------------------------------------------------------------
> +  * ExecContext:
> +  *
> +  * As these variables always appear together, we put them into one struct
> +  * and pull initialization and cleanup into separate routines.
> +  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
> +  * accurately:  It is *used* only in move_xxx_tuple(), but because this
> +  * routine is called many times, we initialize the struct just once in
> +  * repair_frag() and pass it on to move_xxx_tuple().
> +  *
> +  * Members, all filled in by ExecContext_Init() for a single relation:
> +  *   resultRelInfo  - ResultRelInfo describing that relation; its
> +  *                    indexes are opened with ExecOpenIndices()
> +  *   estate         - executor state from CreateExecutorState(), with
> +  *                    resultRelInfo registered as its only result relation
> +  *   tupleTable     - tuple table created with room for one slot
> +  *   slot           - the slot allocated from tupleTable, set up with the
> +  *                    relation's tuple descriptor
> +  *
> +  * ExecContext is the pointer type; callers own an ExecContextData and
> +  * pass its address.
> +  */
> + typedef struct ExecContextData
> + {
> +     ResultRelInfo *resultRelInfo;
> +     EState       *estate;
> +     TupleTable    tupleTable;
> +     TupleTableSlot *slot;
> + } ExecContextData;
> + typedef ExecContextData *ExecContext;
> +
> + static void
> + ExecContext_Init(ExecContext ec, Relation rel)
> + {
> +     TupleDesc    tupdesc = RelationGetDescr(rel);
> +
> +     /*
> +      * Fill in every member of *ec for the relation rel.  The caller
> +      * provides the storage for *ec; what is acquired here is released
> +      * again by ExecContext_Finish().
> +      *
> +      * We need a ResultRelInfo and an EState so we can use the regular
> +      * executor's index-entry-making machinery.
> +      */
> +     ec->estate = CreateExecutorState();
> +
> +     ec->resultRelInfo = makeNode(ResultRelInfo);
> +     ec->resultRelInfo->ri_RangeTableIndex = 1;        /* dummy */
> +     ec->resultRelInfo->ri_RelationDesc = rel;
> +     ec->resultRelInfo->ri_TrigDesc = NULL;    /* we don't fire triggers */
> +
> +     ExecOpenIndices(ec->resultRelInfo);    /* closed in ExecContext_Finish() */
> +
> +     ec->estate->es_result_relations = ec->resultRelInfo;
> +     ec->estate->es_num_result_relations = 1;
> +     ec->estate->es_result_relation_info = ec->resultRelInfo;
> +
> +     /* Set up a dummy tuple table too: one slot, described by rel's tupdesc */
> +     ec->tupleTable = ExecCreateTupleTable(1);
> +     ec->slot = ExecAllocTableSlot(ec->tupleTable);
> +     ExecSetSlotDescriptor(ec->slot, tupdesc, false);
> + }
> +
> + static void
> + ExecContext_Finish(ExecContext ec)
> + {    /* Release what ExecContext_Init() set up; *ec itself is not freed here. */
> +     ExecDropTupleTable(ec->tupleTable, true);    /* NOTE(review): "true" presumably frees the slots too - confirm */
> +     ExecCloseIndices(ec->resultRelInfo);    /* matches ExecOpenIndices() */
> +     FreeExecutorState(ec->estate);        /* matches CreateExecutorState() */
> + }
> + /*
> +  * End of ExecContext Implementation
> +  *----------------------------------------------------------------------
> +  */
>
>   static MemoryContext vac_context = NULL;
>
> ***************
> *** 122,127 ****
> --- 180,196 ----
>   static void repair_frag(VRelStats *vacrelstats, Relation onerel,
>               VacPageList vacuum_pages, VacPageList fraged_pages,
>               int nindexes, Relation *Irel);
> + static void move_chain_tuple(Relation rel,
> +                      Buffer old_buf, Page old_page, HeapTuple old_tup,
> +                      Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
> +                      ExecContext ec, ItemPointer ctid, bool cleanVpd);
> + static void move_plain_tuple(Relation rel,
> +                      Buffer old_buf, Page old_page, HeapTuple old_tup,
> +                      Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
> +                      ExecContext ec);
> + static void update_hint_bits(Relation rel, VacPageList fraged_pages,
> +                     int num_fraged_pages, BlockNumber last_move_dest_block,
> +                     int num_moved);
>   static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
>               VacPageList vacpagelist);
>   static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
> ***************
> *** 675,681 ****
>   static void
>   vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
>   {
> !     TransactionId myXID;
>       Relation    relation;
>       HeapScanDesc scan;
>       HeapTuple    tuple;
> --- 744,750 ----
>   static void
>   vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
>   {
> !     TransactionId myXID = GetCurrentTransactionId();
>       Relation    relation;
>       HeapScanDesc scan;
>       HeapTuple    tuple;
> ***************
> *** 683,689 ****
>       bool        vacuumAlreadyWrapped = false;
>       bool        frozenAlreadyWrapped = false;
>
> -     myXID = GetCurrentTransactionId();
>
>       relation = heap_openr(DatabaseRelationName, AccessShareLock);
>
> --- 752,757 ----
> ***************
> *** 1059,1075 ****
>   {
>       BlockNumber nblocks,
>                   blkno;
> -     ItemId        itemid;
> -     Buffer        buf;
>       HeapTupleData tuple;
> -     OffsetNumber offnum,
> -                 maxoff;
> -     bool        pgchanged,
> -                 tupgone,
> -                 notup;
>       char       *relname;
> !     VacPage        vacpage,
> !                 vacpagecopy;
>       BlockNumber empty_pages,
>                   empty_end_pages;
>       double        num_tuples,
> --- 1127,1135 ----
>   {
>       BlockNumber nblocks,
>                   blkno;
>       HeapTupleData tuple;
>       char       *relname;
> !     VacPage        vacpage;
>       BlockNumber empty_pages,
>                   empty_end_pages;
>       double        num_tuples,
> ***************
> *** 1080,1086 ****
>                   usable_free_space;
>       Size        min_tlen = MaxTupleSize;
>       Size        max_tlen = 0;
> -     int            i;
>       bool        do_shrinking = true;
>       VTupleLink    vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
>       int            num_vtlinks = 0;
> --- 1140,1145 ----
> ***************
> *** 1113,1118 ****
> --- 1172,1182 ----
>                       tempPage = NULL;
>           bool        do_reap,
>                       do_frag;
> +         Buffer        buf;
> +         OffsetNumber offnum,
> +                     maxoff;
> +         bool        pgchanged,
> +                     notup;
>
>           vacuum_delay_point();
>
> ***************
> *** 1125,1130 ****
> --- 1189,1196 ----
>
>           if (PageIsNew(page))
>           {
> +             VacPage    vacpagecopy;
> +
>               ereport(WARNING,
>               (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
>                       relname, blkno)));
> ***************
> *** 1142,1147 ****
> --- 1208,1215 ----
>
>           if (PageIsEmpty(page))
>           {
> +             VacPage    vacpagecopy;
> +
>               vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
>               free_space += vacpage->free;
>               empty_pages++;
> ***************
> *** 1161,1168 ****
>                offnum = OffsetNumberNext(offnum))
>           {
>               uint16        sv_infomask;
> !
> !             itemid = PageGetItemId(page, offnum);
>
>               /*
>                * Collect un-used items too - it's possible to have indexes
> --- 1229,1236 ----
>                offnum = OffsetNumberNext(offnum))
>           {
>               uint16        sv_infomask;
> !             ItemId        itemid = PageGetItemId(page, offnum);
> !             bool        tupgone = false;
>
>               /*
>                * Collect un-used items too - it's possible to have indexes
> ***************
> *** 1180,1186 ****
>               tuple.t_len = ItemIdGetLength(itemid);
>               ItemPointerSet(&(tuple.t_self), blkno, offnum);
>
> -             tupgone = false;
>               sv_infomask = tuple.t_data->t_infomask;
>
>               switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
> --- 1248,1253 ----
> ***************
> *** 1269,1275 ****
>                       do_shrinking = false;
>                       break;
>                   default:
> !                     elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
>                       break;
>               }
>
> --- 1336,1343 ----
>                       do_shrinking = false;
>                       break;
>                   default:
> !                     /* unexpected HeapTupleSatisfiesVacuum result */
> !                     Assert(false);
>                       break;
>               }
>
> ***************
> *** 1344,1350 ****
>
>           if (do_reap || do_frag)
>           {
> !             vacpagecopy = copy_vac_page(vacpage);
>               if (do_reap)
>                   vpage_insert(vacuum_pages, vacpagecopy);
>               if (do_frag)
> --- 1412,1418 ----
>
>           if (do_reap || do_frag)
>           {
> !             VacPage    vacpagecopy = copy_vac_page(vacpage);
>               if (do_reap)
>                   vpage_insert(vacuum_pages, vacpagecopy);
>               if (do_frag)
> ***************
> *** 1390,1395 ****
> --- 1458,1465 ----
>        */
>       if (do_shrinking)
>       {
> +         int            i;
> +
>           Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
>           fraged_pages->num_pages -= empty_end_pages;
>           usable_free_space = 0;
> ***************
> *** 1453,1528 ****
>               VacPageList vacuum_pages, VacPageList fraged_pages,
>               int nindexes, Relation *Irel)
>   {
> !     TransactionId myXID;
> !     CommandId    myCID;
> !     Buffer        buf,
> !                 cur_buffer;
>       BlockNumber nblocks,
>                   blkno;
>       BlockNumber last_move_dest_block = 0,
>                   last_vacuum_block;
> !     Page        page,
> !                 ToPage = NULL;
> !     OffsetNumber offnum,
> !                 maxoff,
> !                 newoff,
> !                 max_offset;
> !     ItemId        itemid,
> !                 newitemid;
> !     HeapTupleData tuple,
> !                 newtup;
> !     TupleDesc    tupdesc;
> !     ResultRelInfo *resultRelInfo;
> !     EState       *estate;
> !     TupleTable    tupleTable;
> !     TupleTableSlot *slot;
>       VacPageListData Nvacpagelist;
> !     VacPage        cur_page = NULL,
>                   last_vacuum_page,
>                   vacpage,
>                  *curpage;
> -     int            cur_item = 0;
>       int            i;
> !     Size        tuple_len;
> !     int            num_moved,
>                   num_fraged_pages,
>                   vacuumed_pages;
> !     int            checked_moved,
> !                 num_tuples,
> !                 keep_tuples = 0;
> !     bool        isempty,
> !                 dowrite,
> !                 chain_tuple_moved;
>       VacRUsage    ru0;
>
>       vac_init_rusage(&ru0);
>
> !     myXID = GetCurrentTransactionId();
> !     myCID = GetCurrentCommandId();
> !
> !     tupdesc = RelationGetDescr(onerel);
> !
> !     /*
> !      * We need a ResultRelInfo and an EState so we can use the regular
> !      * executor's index-entry-making machinery.
> !      */
> !     estate = CreateExecutorState();
> !
> !     resultRelInfo = makeNode(ResultRelInfo);
> !     resultRelInfo->ri_RangeTableIndex = 1;        /* dummy */
> !     resultRelInfo->ri_RelationDesc = onerel;
> !     resultRelInfo->ri_TrigDesc = NULL;    /* we don't fire triggers */
> !
> !     ExecOpenIndices(resultRelInfo);
> !
> !     estate->es_result_relations = resultRelInfo;
> !     estate->es_num_result_relations = 1;
> !     estate->es_result_relation_info = resultRelInfo;
> !
> !     /* Set up a dummy tuple table too */
> !     tupleTable = ExecCreateTupleTable(1);
> !     slot = ExecAllocTableSlot(tupleTable);
> !     ExecSetSlotDescriptor(slot, tupdesc, false);
>
>       Nvacpagelist.num_pages = 0;
>       num_fraged_pages = fraged_pages->num_pages;
> --- 1523,1551 ----
>               VacPageList vacuum_pages, VacPageList fraged_pages,
>               int nindexes, Relation *Irel)
>   {
> !     TransactionId myXID = GetCurrentTransactionId();
> !     Buffer        dst_buffer = InvalidBuffer;
>       BlockNumber nblocks,
>                   blkno;
>       BlockNumber last_move_dest_block = 0,
>                   last_vacuum_block;
> !     Page        dst_page = NULL;
> !     ExecContextData    ec;
>       VacPageListData Nvacpagelist;
> !     VacPage        dst_vacpage = NULL,
>                   last_vacuum_page,
>                   vacpage,
>                  *curpage;
>       int            i;
> !     int            num_moved = 0,
>                   num_fraged_pages,
>                   vacuumed_pages;
> !     int            keep_tuples = 0;
>       VacRUsage    ru0;
>
>       vac_init_rusage(&ru0);
>
> !     ExecContext_Init(&ec, onerel);
>
>       Nvacpagelist.num_pages = 0;
>       num_fraged_pages = fraged_pages->num_pages;
> ***************
> *** 1539,1546 ****
>           last_vacuum_page = NULL;
>           last_vacuum_block = InvalidBlockNumber;
>       }
> -     cur_buffer = InvalidBuffer;
> -     num_moved = 0;
>
>       vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
>       vacpage->offsets_used = vacpage->offsets_free = 0;
> --- 1562,1567 ----
> ***************
> *** 1560,1565 ****
> --- 1581,1594 ----
>            blkno > last_move_dest_block;
>            blkno--)
>       {
> +         Buffer            buf;
> +         Page            page;
> +         OffsetNumber    offnum,
> +                         maxoff;
> +         bool            isempty,
> +                         dowrite,
> +                         chain_tuple_moved;
> +
>           vacuum_delay_point();
>
>           /*
> ***************
> *** 1635,1641 ****
>                offnum <= maxoff;
>                offnum = OffsetNumberNext(offnum))
>           {
> !             itemid = PageGetItemId(page, offnum);
>
>               if (!ItemIdIsUsed(itemid))
>                   continue;
> --- 1664,1672 ----
>                offnum <= maxoff;
>                offnum = OffsetNumberNext(offnum))
>           {
> !             Size            tuple_len;
> !             HeapTupleData    tuple;
> !             ItemId            itemid = PageGetItemId(page, offnum);
>
>               if (!ItemIdIsUsed(itemid))
>                   continue;
> ***************
> *** 1645,1689 ****
>               tuple_len = tuple.t_len = ItemIdGetLength(itemid);
>               ItemPointerSet(&(tuple.t_self), blkno, offnum);
>
>               if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
>               {
> !                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
> !                     elog(ERROR, "HEAP_MOVED_IN was not expected");
>
>                   /*
>                    * If this (chain) tuple is moved by me already then I
>                    * have to check is it in vacpage or not - i.e. is it
>                    * moved while cleaning this page or some previous one.
>                    */
> !                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
> !                 {
> !                     if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
> !                         elog(ERROR, "invalid XVAC in tuple header");
> !                     if (keep_tuples == 0)
> !                         continue;
> !                     if (chain_tuple_moved)        /* some chains was moved
> !                                                  * while */
> !                     {            /* cleaning this page */
> !                         Assert(vacpage->offsets_free > 0);
> !                         for (i = 0; i < vacpage->offsets_free; i++)
> !                         {
> !                             if (vacpage->offsets[i] == offnum)
> !                                 break;
> !                         }
> !                         if (i >= vacpage->offsets_free) /* not found */
> !                         {
> !                             vacpage->offsets[vacpage->offsets_free++] = offnum;
> !                             keep_tuples--;
> !                         }
>                       }
> !                     else
>                       {
>                           vacpage->offsets[vacpage->offsets_free++] = offnum;
>                           keep_tuples--;
>                       }
> -                     continue;
>                   }
> !                 elog(ERROR, "HEAP_MOVED_OFF was expected");
>               }
>
>               /*
> --- 1676,1746 ----
>               tuple_len = tuple.t_len = ItemIdGetLength(itemid);
>               ItemPointerSet(&(tuple.t_self), blkno, offnum);
>
> +             /*
> +              * VACUUM FULL has an exclusive lock on the relation.  So
> +              * normally no other transaction can have pending INSERTs or
> +              * DELETEs in this relation.  A tuple is either
> +              *   (a) a tuple in a system catalog, inserted or deleted by
> +              *       a not yet committed transaction or
> +              *   (b) dead (XMIN_INVALID or XMAX_COMMITTED) or
> +              *   (c) inserted by a committed xact (XMIN_COMMITTED) or
> +              *   (d) moved by the currently running VACUUM.
> +              * In case (a) we wouldn't be in repair_frag() at all.
> +              * In case (b) we cannot be here, because scan_heap() has
> +              * already marked the item as unused, see continue above.
> +              * Case (c) is what normally is to be expected.
> +              * Case (d) is only possible if a whole tuple chain has been
> +              * moved while processing this or a higher numbered block.
> +              */
>               if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
>               {
> !                 /*
> !                  * There cannot be another concurrently running VACUUM.  If
> !                  * the tuple had been moved in by a previous VACUUM, the
> !                  * visibility check would have set XMIN_COMMITTED.  If the
> !                  * tuple had been moved in by the currently running VACUUM,
> !                  * the loop would have been terminated.  We had
> !                  * elog(ERROR, ...) here, but as we are testing for a
> !                  * can't-happen condition, Assert() seems more appropriate.
> !                  */
> !                 Assert(!(tuple.t_data->t_infomask & HEAP_MOVED_IN));
>
>                   /*
>                    * If this (chain) tuple is moved by me already then I
>                    * have to check is it in vacpage or not - i.e. is it
>                    * moved while cleaning this page or some previous one.
>                    */
> !                 Assert(tuple.t_data->t_infomask & HEAP_MOVED_OFF);
> !                 /*
> !                  * MOVED_OFF by another VACUUM would have caused the
> !                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
> !                  */
> !                 Assert(HeapTupleHeaderGetXvac(tuple.t_data) == myXID);
> !
> !                 /* Can't we Assert(keep_tuples > 0) here? */
> !                 if (keep_tuples == 0)
> !                     continue;
> !                 if (chain_tuple_moved)        /* some chains was moved
> !                                              * while */
> !                 {            /* cleaning this page */
> !                     Assert(vacpage->offsets_free > 0);
> !                     for (i = 0; i < vacpage->offsets_free; i++)
> !                     {
> !                         if (vacpage->offsets[i] == offnum)
> !                             break;
>                       }
> !                     if (i >= vacpage->offsets_free) /* not found */
>                       {
>                           vacpage->offsets[vacpage->offsets_free++] = offnum;
>                           keep_tuples--;
>                       }
>                   }
> !                 else
> !                 {
> !                     vacpage->offsets[vacpage->offsets_free++] = offnum;
> !                     keep_tuples--;
> !                 }
> !                 continue;
>               }
>
>               /*
> ***************
> *** 1716,1723 ****
>                   Buffer        Cbuf = buf;
>                   bool        freeCbuf = false;
>                   bool        chain_move_failed = false;
> -                 Page        Cpage;
> -                 ItemId        Citemid;
>                   ItemPointerData Ctid;
>                   HeapTupleData tp = tuple;
>                   Size        tlen = tuple_len;
> --- 1773,1778 ----
> ***************
> *** 1728,1737 ****
>                   int            to_item = 0;
>                   int            ti;
>
> !                 if (cur_buffer != InvalidBuffer)
>                   {
> !                     WriteBuffer(cur_buffer);
> !                     cur_buffer = InvalidBuffer;
>                   }
>
>                   /* Quick exit if we have no vtlinks to search in */
> --- 1783,1792 ----
>                   int            to_item = 0;
>                   int            ti;
>
> !                 if (dst_buffer != InvalidBuffer)
>                   {
> !                     WriteBuffer(dst_buffer);
> !                     dst_buffer = InvalidBuffer;
>                   }
>
>                   /* Quick exit if we have no vtlinks to search in */
> ***************
> *** 1754,1759 ****
> --- 1809,1818 ----
>                          !(ItemPointerEquals(&(tp.t_self),
>                                              &(tp.t_data->t_ctid))))
>                   {
> +                     Page        Cpage;
> +                     ItemId        Citemid;
> +                     ItemPointerData Ctid;
> +
>                       Ctid = tp.t_data->t_ctid;
>                       if (freeCbuf)
>                           ReleaseBuffer(Cbuf);
> ***************
> *** 1929,1940 ****
>                   }
>
>                   /*
> !                  * Okay, move the whle tuple chain
>                    */
>                   ItemPointerSetInvalid(&Ctid);
>                   for (ti = 0; ti < num_vtmove; ti++)
>                   {
>                       VacPage        destvacpage = vtmove[ti].vacpage;
>
>                       /* Get page to move from */
>                       tuple.t_self = vtmove[ti].tid;
> --- 1988,2001 ----
>                   }
>
>                   /*
> !                  * Okay, move the whole tuple chain
>                    */
>                   ItemPointerSetInvalid(&Ctid);
>                   for (ti = 0; ti < num_vtmove; ti++)
>                   {
>                       VacPage        destvacpage = vtmove[ti].vacpage;
> +                     Page        Cpage;
> +                     ItemId        Citemid;
>
>                       /* Get page to move from */
>                       tuple.t_self = vtmove[ti].tid;
> ***************
> *** 1942,1954 ****
>                                ItemPointerGetBlockNumber(&(tuple.t_self)));
>
>                       /* Get page to move to */
> !                     cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
>
> !                     LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
> !                     if (cur_buffer != Cbuf)
>                           LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
>
> !                     ToPage = BufferGetPage(cur_buffer);
>                       Cpage = BufferGetPage(Cbuf);
>
>                       Citemid = PageGetItemId(Cpage,
> --- 2003,2015 ----
>                                ItemPointerGetBlockNumber(&(tuple.t_self)));
>
>                       /* Get page to move to */
> !                     dst_buffer = ReadBuffer(onerel, destvacpage->blkno);
>
> !                     LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
> !                     if (dst_buffer != Cbuf)
>                           LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
>
> !                     dst_page = BufferGetPage(dst_buffer);
>                       Cpage = BufferGetPage(Cbuf);
>
>                       Citemid = PageGetItemId(Cpage,
> ***************
> *** 1961,2081 ****
>                        * make a copy of the source tuple, and then mark the
>                        * source tuple MOVED_OFF.
>                        */
> !                     heap_copytuple_with_tuple(&tuple, &newtup);
> !
> !                     /*
> !                      * register invalidation of source tuple in catcaches.
> !                      */
> !                     CacheInvalidateHeapTuple(onerel, &tuple);
> !
> !                     /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
> !                     START_CRIT_SECTION();
> !
> !                     tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> !                                                   HEAP_XMIN_INVALID |
> !                                                   HEAP_MOVED_IN);
> !                     tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
> !                     HeapTupleHeaderSetXvac(tuple.t_data, myXID);
> !
> !                     /*
> !                      * If this page was not used before - clean it.
> !                      *
> !                      * NOTE: a nasty bug used to lurk here.  It is possible
> !                      * for the source and destination pages to be the same
> !                      * (since this tuple-chain member can be on a page
> !                      * lower than the one we're currently processing in
> !                      * the outer loop).  If that's true, then after
> !                      * vacuum_page() the source tuple will have been
> !                      * moved, and tuple.t_data will be pointing at
> !                      * garbage.  Therefore we must do everything that uses
> !                      * tuple.t_data BEFORE this step!!
> !                      *
> !                      * This path is different from the other callers of
> !                      * vacuum_page, because we have already incremented
> !                      * the vacpage's offsets_used field to account for the
> !                      * tuple(s) we expect to move onto the page. Therefore
> !                      * vacuum_page's check for offsets_used == 0 is wrong.
> !                      * But since that's a good debugging check for all
> !                      * other callers, we work around it here rather than
> !                      * remove it.
> !                      */
> !                     if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
> !                     {
> !                         int            sv_offsets_used = destvacpage->offsets_used;
> !
> !                         destvacpage->offsets_used = 0;
> !                         vacuum_page(onerel, cur_buffer, destvacpage);
> !                         destvacpage->offsets_used = sv_offsets_used;
> !                     }
> !
> !                     /*
> !                      * Update the state of the copied tuple, and store it
> !                      * on the destination page.
> !                      */
> !                     newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> !                                                    HEAP_XMIN_INVALID |
> !                                                    HEAP_MOVED_OFF);
> !                     newtup.t_data->t_infomask |= HEAP_MOVED_IN;
> !                     HeapTupleHeaderSetXvac(newtup.t_data, myXID);
> !                     newoff = PageAddItem(ToPage,
> !                                          (Item) newtup.t_data,
> !                                          tuple_len,
> !                                          InvalidOffsetNumber,
> !                                          LP_USED);
> !                     if (newoff == InvalidOffsetNumber)
> !                     {
> !                         elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
> !                           (unsigned long) tuple_len, destvacpage->blkno);
> !                     }
> !                     newitemid = PageGetItemId(ToPage, newoff);
> !                     pfree(newtup.t_data);
> !                     newtup.t_datamcxt = NULL;
> !                     newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
> !                     ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
> !
> !                     /* XLOG stuff */
> !                     if (!onerel->rd_istemp)
> !                     {
> !                         XLogRecPtr    recptr =
> !                         log_heap_move(onerel, Cbuf, tuple.t_self,
> !                                       cur_buffer, &newtup);
> !
> !                         if (Cbuf != cur_buffer)
> !                         {
> !                             PageSetLSN(Cpage, recptr);
> !                             PageSetSUI(Cpage, ThisStartUpID);
> !                         }
> !                         PageSetLSN(ToPage, recptr);
> !                         PageSetSUI(ToPage, ThisStartUpID);
> !                     }
> !                     else
> !                     {
> !                         /*
> !                          * No XLOG record, but still need to flag that XID
> !                          * exists on disk
> !                          */
> !                         MyXactMadeTempRelUpdate = true;
> !                     }
> !
> !                     END_CRIT_SECTION();
>
>                       if (destvacpage->blkno > last_move_dest_block)
>                           last_move_dest_block = destvacpage->blkno;
>
>                       /*
> -                      * Set new tuple's t_ctid pointing to itself for last
> -                      * tuple in chain, and to next tuple in chain
> -                      * otherwise.
> -                      */
> -                     if (!ItemPointerIsValid(&Ctid))
> -                         newtup.t_data->t_ctid = newtup.t_self;
> -                     else
> -                         newtup.t_data->t_ctid = Ctid;
> -                     Ctid = newtup.t_self;
> -
> -                     num_moved++;
> -
> -                     /*
>                        * Remember that we moved tuple from the current page
>                        * (corresponding index tuple will be cleaned).
>                        */
> --- 2022,2036 ----
>                        * make a copy of the source tuple, and then mark the
>                        * source tuple MOVED_OFF.
>                        */
> !                     move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
> !                                      dst_buffer, dst_page, destvacpage,
> !                                      &ec, &Ctid, vtmove[ti].cleanVpd);
>
> +                     num_moved++;
>                       if (destvacpage->blkno > last_move_dest_block)
>                           last_move_dest_block = destvacpage->blkno;
>
>                       /*
>                        * Remember that we moved tuple from the current page
>                        * (corresponding index tuple will be cleaned).
>                        */
> ***************
> *** 2085,2107 ****
>                       else
>                           keep_tuples++;
>
> !                     LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
> !                     if (cur_buffer != Cbuf)
> !                         LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
> !
> !                     /* Create index entries for the moved tuple */
> !                     if (resultRelInfo->ri_NumIndices > 0)
> !                     {
> !                         ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
> !                         ExecInsertIndexTuples(slot, &(newtup.t_self),
> !                                               estate, true);
> !                     }
> !
> !                     WriteBuffer(cur_buffer);
>                       WriteBuffer(Cbuf);
>                   }                /* end of move-the-tuple-chain loop */
>
> !                 cur_buffer = InvalidBuffer;
>                   pfree(vtmove);
>                   chain_tuple_moved = true;
>
> --- 2040,2050 ----
>                       else
>                           keep_tuples++;
>
> !                     WriteBuffer(dst_buffer);
>                       WriteBuffer(Cbuf);
>                   }                /* end of move-the-tuple-chain loop */
>
> !                 dst_buffer = InvalidBuffer;
>                   pfree(vtmove);
>                   chain_tuple_moved = true;
>
> ***************
> *** 2110,2122 ****
>               }                    /* end of is-tuple-in-chain test */
>
>               /* try to find new page for this tuple */
> !             if (cur_buffer == InvalidBuffer ||
> !                 !enough_space(cur_page, tuple_len))
>               {
> !                 if (cur_buffer != InvalidBuffer)
>                   {
> !                     WriteBuffer(cur_buffer);
> !                     cur_buffer = InvalidBuffer;
>                   }
>                   for (i = 0; i < num_fraged_pages; i++)
>                   {
> --- 2053,2065 ----
>               }                    /* end of is-tuple-in-chain test */
>
>               /* try to find new page for this tuple */
> !             if (dst_buffer == InvalidBuffer ||
> !                 !enough_space(dst_vacpage, tuple_len))
>               {
> !                 if (dst_buffer != InvalidBuffer)
>                   {
> !                     WriteBuffer(dst_buffer);
> !                     dst_buffer = InvalidBuffer;
>                   }
>                   for (i = 0; i < num_fraged_pages; i++)
>                   {
> ***************
> *** 2125,2234 ****
>                   }
>                   if (i == num_fraged_pages)
>                       break;        /* can't move item anywhere */
> !                 cur_item = i;
> !                 cur_page = fraged_pages->pagedesc[cur_item];
> !                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
> !                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
> !                 ToPage = BufferGetPage(cur_buffer);
>                   /* if this page was not used before - clean it */
> !                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
> !                     vacuum_page(onerel, cur_buffer, cur_page);
>               }
>               else
> !                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
>
>               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
>
> !             /* copy tuple */
> !             heap_copytuple_with_tuple(&tuple, &newtup);
> !
> !             /*
> !              * register invalidation of source tuple in catcaches.
> !              *
> !              * (Note: we do not need to register the copied tuple, because we
> !              * are not changing the tuple contents and so there cannot be
> !              * any need to flush negative catcache entries.)
> !              */
> !             CacheInvalidateHeapTuple(onerel, &tuple);
>
> -             /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
> -             START_CRIT_SECTION();
>
> !             /*
> !              * Mark new tuple as MOVED_IN by me.
> !              */
> !             newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> !                                            HEAP_XMIN_INVALID |
> !                                            HEAP_MOVED_OFF);
> !             newtup.t_data->t_infomask |= HEAP_MOVED_IN;
> !             HeapTupleHeaderSetXvac(newtup.t_data, myXID);
> !
> !             /* add tuple to the page */
> !             newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
> !                                  InvalidOffsetNumber, LP_USED);
> !             if (newoff == InvalidOffsetNumber)
> !             {
> !                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
> !                      (unsigned long) tuple_len,
> !                      cur_page->blkno, (unsigned long) cur_page->free,
> !                      cur_page->offsets_used, cur_page->offsets_free);
> !             }
> !             newitemid = PageGetItemId(ToPage, newoff);
> !             pfree(newtup.t_data);
> !             newtup.t_datamcxt = NULL;
> !             newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
> !             ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
> !             newtup.t_self = newtup.t_data->t_ctid;
>
>               /*
> !              * Mark old tuple as MOVED_OFF by me.
>                */
> -             tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> -                                           HEAP_XMIN_INVALID |
> -                                           HEAP_MOVED_IN);
> -             tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
> -             HeapTupleHeaderSetXvac(tuple.t_data, myXID);
> -
> -             /* XLOG stuff */
> -             if (!onerel->rd_istemp)
> -             {
> -                 XLogRecPtr    recptr =
> -                 log_heap_move(onerel, buf, tuple.t_self,
> -                               cur_buffer, &newtup);
> -
> -                 PageSetLSN(page, recptr);
> -                 PageSetSUI(page, ThisStartUpID);
> -                 PageSetLSN(ToPage, recptr);
> -                 PageSetSUI(ToPage, ThisStartUpID);
> -             }
> -             else
> -             {
> -                 /*
> -                  * No XLOG record, but still need to flag that XID exists
> -                  * on disk
> -                  */
> -                 MyXactMadeTempRelUpdate = true;
> -             }
> -
> -             END_CRIT_SECTION();
> -
> -             cur_page->offsets_used++;
> -             num_moved++;
> -             cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
> -             if (cur_page->blkno > last_move_dest_block)
> -                 last_move_dest_block = cur_page->blkno;
> -
>               vacpage->offsets[vacpage->offsets_free++] = offnum;
> -
> -             LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
> -             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
> -
> -             /* insert index' tuples if needed */
> -             if (resultRelInfo->ri_NumIndices > 0)
> -             {
> -                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
> -                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
> -             }
>           }                        /* walk along page */
>
>           /*
> --- 2068,2099 ----
>                   }
>                   if (i == num_fraged_pages)
>                       break;        /* can't move item anywhere */
> !                 dst_vacpage = fraged_pages->pagedesc[i];
> !                 dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
> !                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
> !                 dst_page = BufferGetPage(dst_buffer);
>                   /* if this page was not used before - clean it */
> !                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
> !                     vacuum_page(onerel, dst_buffer, dst_vacpage);
>               }
>               else
> !                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
>
>               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
>
> !             move_plain_tuple(onerel, buf, page, &tuple,
> !                              dst_buffer, dst_page, dst_vacpage, &ec);
>
>
> !             num_moved++;
> !             if (dst_vacpage->blkno > last_move_dest_block)
> !                 last_move_dest_block = dst_vacpage->blkno;
>
>               /*
> !              * Remember that we moved tuple from the current page
> !              * (corresponding index tuple will be cleaned).
>                */
>               vacpage->offsets[vacpage->offsets_free++] = offnum;
>           }                        /* walk along page */
>
>           /*
> ***************
> *** 2249,2284 ****
>                    off <= maxoff;
>                    off = OffsetNumberNext(off))
>               {
> !                 itemid = PageGetItemId(page, off);
>                   if (!ItemIdIsUsed(itemid))
>                       continue;
> !                 tuple.t_datamcxt = NULL;
> !                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
> !                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
>                       continue;
> !                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
> !                     elog(ERROR, "HEAP_MOVED_IN was not expected");
> !                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
>                   {
> !                     if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
> !                         elog(ERROR, "invalid XVAC in tuple header");
> !                     /* some chains was moved while */
> !                     if (chain_tuple_moved)
> !                     {            /* cleaning this page */
> !                         Assert(vacpage->offsets_free > 0);
> !                         for (i = 0; i < vacpage->offsets_free; i++)
> !                         {
> !                             if (vacpage->offsets[i] == off)
> !                                 break;
> !                         }
> !                         if (i >= vacpage->offsets_free) /* not found */
> !                         {
> !                             vacpage->offsets[vacpage->offsets_free++] = off;
> !                             Assert(keep_tuples > 0);
> !                             keep_tuples--;
> !                         }
>                       }
> !                     else
>                       {
>                           vacpage->offsets[vacpage->offsets_free++] = off;
>                           Assert(keep_tuples > 0);
> --- 2114,2144 ----
>                    off <= maxoff;
>                    off = OffsetNumberNext(off))
>               {
> !                 ItemId    itemid = PageGetItemId(page, off);
> !                 HeapTupleHeader    htup;
> !
>                   if (!ItemIdIsUsed(itemid))
>                       continue;
> !                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
> !                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
>                       continue;
> !                 /*
> !                  * See the comments in the walk-along-page loop above for why
> !                  * we have Asserts here instead of if (...) elog(ERROR).
> !                  */
> !                 Assert(!(htup->t_infomask & HEAP_MOVED_IN));
> !                 Assert(htup->t_infomask & HEAP_MOVED_OFF);
> !                 Assert(HeapTupleHeaderGetXvac(htup) == myXID);
> !                 if (chain_tuple_moved)
>                   {
> !                     /* some chains were moved while cleaning this page */
> !                     Assert(vacpage->offsets_free > 0);
> !                     for (i = 0; i < vacpage->offsets_free; i++)
> !                     {
> !                         if (vacpage->offsets[i] == off)
> !                             break;
>                       }
> !                     if (i >= vacpage->offsets_free) /* not found */
>                       {
>                           vacpage->offsets[vacpage->offsets_free++] = off;
>                           Assert(keep_tuples > 0);
> ***************
> *** 2286,2292 ****
>                       }
>                   }
>                   else
> !                     elog(ERROR, "HEAP_MOVED_OFF was expected");
>               }
>           }
>
> --- 2146,2156 ----
>                       }
>                   }
>                   else
> !                 {
> !                     vacpage->offsets[vacpage->offsets_free++] = off;
> !                     Assert(keep_tuples > 0);
> !                     keep_tuples--;
> !                 }
>               }
>           }
>
> ***************
> *** 2312,2321 ****
>
>       blkno++;                    /* new number of blocks */
>
> !     if (cur_buffer != InvalidBuffer)
>       {
>           Assert(num_moved > 0);
> !         WriteBuffer(cur_buffer);
>       }
>
>       if (num_moved > 0)
> --- 2176,2185 ----
>
>       blkno++;                    /* new number of blocks */
>
> !     if (dst_buffer != InvalidBuffer)
>       {
>           Assert(num_moved > 0);
> !         WriteBuffer(dst_buffer);
>       }
>
>       if (num_moved > 0)
> ***************
> *** 2348,2353 ****
> --- 2212,2220 ----
>           Assert((*curpage)->blkno < blkno);
>           if ((*curpage)->offsets_used == 0)
>           {
> +             Buffer        buf;
> +             Page        page;
> +
>               /* this page was not used as a move target, so must clean it */
>               buf = ReadBuffer(onerel, (*curpage)->blkno);
>               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
> ***************
> *** 2363,2424 ****
>        * Now scan all the pages that we moved tuples onto and update tuple
>        * status bits.  This is not really necessary, but will save time for
>        * future transactions examining these tuples.
> -      *
> -      * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
> -      * pages that were move source pages but not move dest pages.  One
> -      * also wonders whether it wouldn't be better to skip this step and
> -      * let the tuple status updates happen someplace that's not holding an
> -      * exclusive lock on the relation.
>        */
> !     checked_moved = 0;
> !     for (i = 0, curpage = fraged_pages->pagedesc;
> !          i < num_fraged_pages;
> !          i++, curpage++)
> !     {
> !         vacuum_delay_point();
> !
> !         Assert((*curpage)->blkno < blkno);
> !         if ((*curpage)->blkno > last_move_dest_block)
> !             break;                /* no need to scan any further */
> !         if ((*curpage)->offsets_used == 0)
> !             continue;            /* this page was never used as a move dest */
> !         buf = ReadBuffer(onerel, (*curpage)->blkno);
> !         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
> !         page = BufferGetPage(buf);
> !         num_tuples = 0;
> !         max_offset = PageGetMaxOffsetNumber(page);
> !         for (newoff = FirstOffsetNumber;
> !              newoff <= max_offset;
> !              newoff = OffsetNumberNext(newoff))
> !         {
> !             itemid = PageGetItemId(page, newoff);
> !             if (!ItemIdIsUsed(itemid))
> !                 continue;
> !             tuple.t_datamcxt = NULL;
> !             tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
> !             if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
> !             {
> !                 if (!(tuple.t_data->t_infomask & HEAP_MOVED))
> !                     elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
> !                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
> !                     elog(ERROR, "invalid XVAC in tuple header");
> !                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
> !                 {
> !                     tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
> !                     tuple.t_data->t_infomask &= ~HEAP_MOVED;
> !                     num_tuples++;
> !                 }
> !                 else
> !                     tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
> !             }
> !         }
> !         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
> !         WriteBuffer(buf);
> !         Assert((*curpage)->offsets_used == num_tuples);
> !         checked_moved += num_tuples;
> !     }
> !     Assert(num_moved == checked_moved);
> !
>       /*
>        * It'd be cleaner to make this report at the bottom of this routine,
>        * but then the rusage would double-count the second pass of index
> --- 2230,2239 ----
>        * Now scan all the pages that we moved tuples onto and update tuple
>        * status bits.  This is not really necessary, but will save time for
>        * future transactions examining these tuples.
>        */
> !     update_hint_bits(onerel, fraged_pages, num_fraged_pages,
> !                      last_move_dest_block, num_moved);
> !
>       /*
>        * It'd be cleaner to make this report at the bottom of this routine,
>        * but then the rusage would double-count the second pass of index
> ***************
> *** 2455,2460 ****
> --- 2270,2282 ----
>                   *vpleft = *vpright;
>                   *vpright = vpsave;
>               }
> +             /*
> +              * keep_tuples is the number of tuples that have been moved
> +              * off a page during chain moves but not been scanned over
> +              * subsequently.  The tuple ids of these tuples are not
> +              * recorded as free offsets for any VacPage, so they will not
> +              * be cleared from the indexes.
> +              */
>               Assert(keep_tuples >= 0);
>               for (i = 0; i < nindexes; i++)
>                   vacuum_index(&Nvacpagelist, Irel[i],
> ***************
> *** 2465,2500 ****
>           if (vacpage->blkno == (blkno - 1) &&
>               vacpage->offsets_free > 0)
>           {
> !             OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
> !             int            uncnt;
>
>               buf = ReadBuffer(onerel, vacpage->blkno);
>               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
>               page = BufferGetPage(buf);
> -             num_tuples = 0;
>               maxoff = PageGetMaxOffsetNumber(page);
>               for (offnum = FirstOffsetNumber;
>                    offnum <= maxoff;
>                    offnum = OffsetNumberNext(offnum))
>               {
> !                 itemid = PageGetItemId(page, offnum);
>                   if (!ItemIdIsUsed(itemid))
>                       continue;
> !                 tuple.t_datamcxt = NULL;
> !                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
>
> !                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
> !                 {
> !                     if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
> !                     {
> !                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
> !                             elog(ERROR, "invalid XVAC in tuple header");
> !                         itemid->lp_flags &= ~LP_USED;
> !                         num_tuples++;
> !                     }
> !                     else
> !                         elog(ERROR, "HEAP_MOVED_OFF was expected");
> !                 }
>
>               }
>               Assert(vacpage->offsets_free == num_tuples);
> --- 2287,2327 ----
>           if (vacpage->blkno == (blkno - 1) &&
>               vacpage->offsets_free > 0)
>           {
> !             Buffer            buf;
> !             Page            page;
> !             OffsetNumber    unused[BLCKSZ / sizeof(OffsetNumber)];
> !             OffsetNumber    offnum,
> !                             maxoff;
> !             int                uncnt;
> !             int                num_tuples = 0;
>
>               buf = ReadBuffer(onerel, vacpage->blkno);
>               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
>               page = BufferGetPage(buf);
>               maxoff = PageGetMaxOffsetNumber(page);
>               for (offnum = FirstOffsetNumber;
>                    offnum <= maxoff;
>                    offnum = OffsetNumberNext(offnum))
>               {
> !                 ItemId    itemid = PageGetItemId(page, offnum);
> !                 HeapTupleHeader htup;
> !
>                   if (!ItemIdIsUsed(itemid))
>                       continue;
> !                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
> !                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
> !                     continue;
>
> !                 /*
> !                  * See the comments in the walk-along-page loop above for why
> !                  * we have Asserts here instead of if (...) elog(ERROR).
> !                  */
> !                 Assert(!(htup->t_infomask & HEAP_MOVED_IN));
> !                 Assert(htup->t_infomask & HEAP_MOVED_OFF);
> !                 Assert(HeapTupleHeaderGetXvac(htup) == myXID);
> !
> !                 itemid->lp_flags &= ~LP_USED;
> !                 num_tuples++;
>
>               }
>               Assert(vacpage->offsets_free == num_tuples);
> ***************
> *** 2554,2564 ****
>       if (vacrelstats->vtlinks != NULL)
>           pfree(vacrelstats->vtlinks);
>
> !     ExecDropTupleTable(tupleTable, true);
>
> !     ExecCloseIndices(resultRelInfo);
>
> !     FreeExecutorState(estate);
>   }
>
>   /*
> --- 2381,2717 ----
>       if (vacrelstats->vtlinks != NULL)
>           pfree(vacrelstats->vtlinks);
>
> !     ExecContext_Finish(&ec);
> ! }
> !
> ! /*
> !  *    move_chain_tuple() -- move one tuple that is part of a tuple chain
> !  *
> !  *        This routine moves old_tup from old_page to dst_page.
> !  *        old_page and dst_page might be the same page.
> !  *        On entry old_buf and dst_buf are locked exclusively, both locks (or
> !  *        the single lock, if this is an intra-page-move) are released before
> !  *        exit.
> !  *
> !  *        Yes, a routine with ten parameters is ugly, but it's still better
> !  *        than having these 120 lines of code in repair_frag() which is
> !  *        already too long and almost unreadable.
> !  */
> ! static void
> ! move_chain_tuple(Relation rel,
> !                  Buffer old_buf, Page old_page, HeapTuple old_tup,
> !                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
> !                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
> ! {
> !     TransactionId myXID = GetCurrentTransactionId();
> !     HeapTupleData    newtup;
> !     OffsetNumber    newoff;
> !     ItemId            newitemid;
> !     Size            tuple_len = old_tup->t_len;
> !
> !     heap_copytuple_with_tuple(old_tup, &newtup);
> !
> !     /*
> !      * register invalidation of source tuple in catcaches.
> !      */
> !     CacheInvalidateHeapTuple(rel, old_tup);
> !
> !     /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
> !     START_CRIT_SECTION();
> !
> !     old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> !                                   HEAP_XMIN_INVALID |
> !                                   HEAP_MOVED_IN);
> !     old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
> !     HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
> !
> !     /*
> !      * If this page was not used before - clean it.
> !      *
> !      * NOTE: a nasty bug used to lurk here.  It is possible
> !      * for the source and destination pages to be the same
> !      * (since this tuple-chain member can be on a page
> !      * lower than the one we're currently processing in
> !      * the outer loop).  If that's true, then after
> !      * vacuum_page() the source tuple will have been
> !      * moved, and old_tup->t_data will be pointing at
> !      * garbage.  Therefore we must do everything that uses
> !      * old_tup->t_data BEFORE this step!!
> !      *
> !      * This path is different from the other callers of
> !      * vacuum_page, because we have already incremented
> !      * the vacpage's offsets_used field to account for the
> !      * tuple(s) we expect to move onto the page. Therefore
> !      * vacuum_page's check for offsets_used == 0 is wrong.
> !      * But since that's a good debugging check for all
> !      * other callers, we work around it here rather than
> !      * remove it.
> !      */
> !     if (!PageIsEmpty(dst_page) && cleanVpd)
> !     {
> !         int        sv_offsets_used = dst_vacpage->offsets_used;
> !
> !         dst_vacpage->offsets_used = 0;
> !         vacuum_page(rel, dst_buf, dst_vacpage);
> !         dst_vacpage->offsets_used = sv_offsets_used;
> !     }
>
> !     /*
> !      * Update the state of the copied tuple, and store it
> !      * on the destination page.
> !      */
> !     newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> !                                    HEAP_XMIN_INVALID |
> !                                    HEAP_MOVED_OFF);
> !     newtup.t_data->t_infomask |= HEAP_MOVED_IN;
> !     HeapTupleHeaderSetXvac(newtup.t_data, myXID);
> !     newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
> !                          InvalidOffsetNumber, LP_USED);
> !     if (newoff == InvalidOffsetNumber)
> !     {
> !         elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
> !           (unsigned long) tuple_len, dst_vacpage->blkno);
> !     }
> !     newitemid = PageGetItemId(dst_page, newoff);
> !     pfree(newtup.t_data);
> !     newtup.t_datamcxt = NULL;
> !     newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
> !     ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
> !
> !     /* XLOG stuff */
> !     if (!rel->rd_istemp)
> !     {
> !         XLogRecPtr    recptr = log_heap_move(rel, old_buf, old_tup->t_self,
> !                                            dst_buf, &newtup);
> !
> !         if (old_buf != dst_buf)
> !         {
> !             PageSetLSN(old_page, recptr);
> !             PageSetSUI(old_page, ThisStartUpID);
> !         }
> !         PageSetLSN(dst_page, recptr);
> !         PageSetSUI(dst_page, ThisStartUpID);
> !     }
> !     else
> !     {
> !         /*
> !          * No XLOG record, but still need to flag that XID
> !          * exists on disk
> !          */
> !         MyXactMadeTempRelUpdate = true;
> !     }
> !
> !     END_CRIT_SECTION();
> !
> !     /*
> !      * Set new tuple's t_ctid pointing to itself for last
> !      * tuple in chain, and to next tuple in chain
> !      * otherwise.
> !      */
> !     /* Is this ok after log_heap_move() and END_CRIT_SECTION()? */
> !     if (!ItemPointerIsValid(ctid))
> !         newtup.t_data->t_ctid = newtup.t_self;
> !     else
> !         newtup.t_data->t_ctid = *ctid;
> !     *ctid = newtup.t_self;
>
> !     LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
> !     if (dst_buf != old_buf)
> !         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
> !
> !     /* Create index entries for the moved tuple */
> !     if (ec->resultRelInfo->ri_NumIndices > 0)
> !     {
> !         ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
> !         ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
> !     }
> ! }
> !
> ! /*
> !  *    move_plain_tuple() -- move one tuple that is not part of a chain
> !  *
> !  *        This routine moves old_tup from old_page to dst_page.
> !  *        On entry old_buf and dst_buf are locked exclusively, both locks are
> !  *        released before exit.
> !  *
> !  *        Yes, a routine with eight parameters is ugly, but it's still better
> !  *        than having these 90 lines of code in repair_frag() which is already
> !  *        too long and almost unreadable.
> !  */
> ! static void
> ! move_plain_tuple(Relation rel,
> !                  Buffer old_buf, Page old_page, HeapTuple old_tup,
> !                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
> !                  ExecContext ec)
> ! {
> !     TransactionId myXID = GetCurrentTransactionId();
> !     HeapTupleData    newtup;
> !     OffsetNumber    newoff;
> !     ItemId            newitemid;
> !     Size            tuple_len = old_tup->t_len;
> !
> !     /* copy tuple */
> !     heap_copytuple_with_tuple(old_tup, &newtup);
> !
> !     /*
> !      * register invalidation of source tuple in catcaches.
> !      *
> !      * (Note: we do not need to register the copied tuple, because we
> !      * are not changing the tuple contents and so there cannot be
> !      * any need to flush negative catcache entries.)
> !      */
> !     CacheInvalidateHeapTuple(rel, old_tup);
> !
> !     /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
> !     START_CRIT_SECTION();
> !
> !     /*
> !      * Mark new tuple as MOVED_IN by me.
> !      */
> !     newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> !                                    HEAP_XMIN_INVALID |
> !                                    HEAP_MOVED_OFF);
> !     newtup.t_data->t_infomask |= HEAP_MOVED_IN;
> !     HeapTupleHeaderSetXvac(newtup.t_data, myXID);
> !
> !     /* add tuple to the page */
> !     newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
> !                          InvalidOffsetNumber, LP_USED);
> !     if (newoff == InvalidOffsetNumber)
> !     {
> !         elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
> !              (unsigned long) tuple_len,
> !              dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
> !              dst_vacpage->offsets_used, dst_vacpage->offsets_free);
> !     }
> !     newitemid = PageGetItemId(dst_page, newoff);
> !     pfree(newtup.t_data);
> !     newtup.t_datamcxt = NULL;
> !     newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
> !     ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
> !     newtup.t_self = newtup.t_data->t_ctid;
> !
> !     /*
> !      * Mark old tuple as MOVED_OFF by me.
> !      */
> !     old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> !                                   HEAP_XMIN_INVALID |
> !                                   HEAP_MOVED_IN);
> !     old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
> !     HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
> !
> !     /* XLOG stuff */
> !     if (!rel->rd_istemp)
> !     {
> !         XLogRecPtr    recptr = log_heap_move(rel, old_buf, old_tup->t_self,
> !                                            dst_buf, &newtup);
> !
> !         PageSetLSN(old_page, recptr);
> !         PageSetSUI(old_page, ThisStartUpID);
> !         PageSetLSN(dst_page, recptr);
> !         PageSetSUI(dst_page, ThisStartUpID);
> !     }
> !     else
> !     {
> !         /*
> !          * No XLOG record, but still need to flag that XID exists
> !          * on disk
> !          */
> !         MyXactMadeTempRelUpdate = true;
> !     }
> !
> !     END_CRIT_SECTION();
> !
> !     dst_vacpage->free = ((PageHeader) dst_page)->pd_upper -
> !                         ((PageHeader) dst_page)->pd_lower;
> !     LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
> !     LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
> !
> !     dst_vacpage->offsets_used++;
> !
> !     /* insert index tuples if needed */
> !     if (ec->resultRelInfo->ri_NumIndices > 0)
> !     {
> !         ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
> !         ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
> !     }
> ! }
> !
> ! /*
> !  *    update_hint_bits() -- update hint bits in destination pages
> !  *
> !  * Scan all the pages that we moved tuples onto and update tuple
> !  * status bits.  This is not really necessary, but will save time for
> !  * future transactions examining these tuples.
> !  *
> !  * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
> !  * pages that were move source pages but not move dest pages.  One
> !  * also wonders whether it wouldn't be better to skip this step and
> !  * let the tuple status updates happen someplace that's not holding an
> !  * exclusive lock on the relation.
> !  */
> ! static void
> ! update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
> !                  BlockNumber last_move_dest_block, int num_moved)
> ! {
> !     int            checked_moved = 0;    /* sum of per-page num_tuples */
> !     int            i;
> !     VacPage       *curpage;
> !
> !     for (i = 0, curpage = fraged_pages->pagedesc;
> !          i < num_fraged_pages;
> !          i++, curpage++)
> !     {
> !         Buffer            buf;
> !         Page            page;
> !         OffsetNumber    max_offset;
> !         OffsetNumber    off;
> !         int                num_tuples = 0;    /* MOVED_IN tuples fixed on this page */
> !
> !         vacuum_delay_point();
> !
> !         if ((*curpage)->blkno > last_move_dest_block)
> !             break;                /* no need to scan any further */
> !         if ((*curpage)->offsets_used == 0)
> !             continue;            /* this page was never used as a move dest */
> !         buf = ReadBuffer(rel, (*curpage)->blkno);
> !         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);    /* t_infomask is modified below */
> !         page = BufferGetPage(buf);
> !         max_offset = PageGetMaxOffsetNumber(page);
> !         for (off = FirstOffsetNumber;
> !              off <= max_offset;
> !              off = OffsetNumberNext(off))
> !         {
> !             ItemId    itemid = PageGetItemId(page, off);
> !             HeapTupleHeader    htup;
> !
> !             if (!ItemIdIsUsed(itemid))
> !                 continue;        /* skip unused item ids */
> !             htup = (HeapTupleHeader) PageGetItem(page, itemid);
> !             if (htup->t_infomask & HEAP_XMIN_COMMITTED)
> !                 continue;        /* XMIN_COMMITTED already set; leave tuple alone */
> !             /*
> !              * See comments in the walk-along-page loop above, why we
> !              * have Asserts here instead of if (...) elog(ERROR).  The
> !              * difference here is that we may see MOVED_IN.
> !              */
> !             Assert(htup->t_infomask & HEAP_MOVED);
> !             Assert(HeapTupleHeaderGetXvac(htup) == GetCurrentTransactionId());    /* xvac must be our own XID */
> !             if (htup->t_infomask & HEAP_MOVED_IN)
> !             {
> !                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
> !                 htup->t_infomask &= ~HEAP_MOVED;    /* clear the HEAP_MOVED bits */
> !                 num_tuples++;
> !             }
> !             else
> !                 htup->t_infomask |= HEAP_XMIN_INVALID;    /* HEAP_MOVED bit set, but not MOVED_IN */
> !         }
> !         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
> !         WriteBuffer(buf);
> !         Assert((*curpage)->offsets_used == num_tuples);    /* per-page count must match offsets_used */
> !         checked_moved += num_tuples;
> !     }
> !     Assert(num_moved == checked_moved);    /* grand total must match caller's num_moved */
>   }
>
>   /*

>
> ---------------------------(end of broadcast)---------------------------
> TIP 6: Have you searched our list archives?
>
>                http://archives.postgresql.org

--
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 359-1001
  +  If your life is a hard drive,     |  13 Roberts Road
  +  Christ can be your backup.        |  Newtown Square, Pennsylvania 19073

pgsql-patches by date:

Previous
From: Bruce Momjian
Date:
Subject: Re: Stylistic changes in bufmgr.c
Next
From: Fabien COELHO
Date:
Subject: fix schema ownership for database owner on first connection