Thread: WIP: New Page API

From: Zdenek Kotala
Date:

I have finished the first prototype of the new page API. It contains several new
functions, such as:

Pointer PageGetUpperPointer(Page page);

void PageClearPrunable(Page page);
bool PageIsComprimable(Page page);

void PageReserveLinp(Page page);
void PageReleaseLinp(Page page);

LocationIndex PageGetLower(Page page);
LocationIndex PageGetUpper(Page page);
LocationIndex PageGetSpecial(Page page);
void PageSetLower(Page page, LocationIndex lower);
void PageSetUpper(Page page, LocationIndex upper);

Page PageGetTempPage(Page page, bool copy);
Page PageGetTempPageCopySpecial(Page page);
void PageRestoreTempPage(Page tempPage, Page oldPage);

Size PageGetSpecialSize(Page page);
Size PageGetDataSize(Page page);

bool PageLayoutIsValid(Page page);


The main point of the new API is to handle multiple page layout versions. The
current implementation does not use any speed optimizations, and the performance
gap is about 5% (big thanks to Paul van den Bogaard for benchmarking). In the
final version I plan to implement the hottest functions as macros (or inline
functions ;-) ), and because most structure members are located at the same
place in every layout, I will remove the extra switch from the code there.
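
To illustrate the trade-off, here is a minimal sketch of the two accessor
styles. Everything in it (DemoPageHeader, DemoPageData, the demo_* functions,
and the assumption that layouts 3 and 4 share the field offset) is hypothetical
and simplified; it is not the real PageHeaderData or the code from the patch.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define DEMO_BLCKSZ 8192

/* Hypothetical, simplified header; NOT the real PostgreSQL PageHeaderData. */
typedef struct DemoPageHeader
{
    uint16_t lower;             /* offset to start of free space */
    uint16_t upper;             /* offset to end of free space */
    uint16_t special;           /* offset to start of special space */
    uint16_t pagesize_version;  /* page size OR'ed with layout version */
} DemoPageHeader;

typedef struct DemoPageData
{
    DemoPageHeader hdr;
    unsigned char  body[DEMO_BLCKSZ - sizeof(DemoPageHeader)];
} DemoPageData;

/* Zero the page and stamp it with the given (assumed) layout version. */
static void
demo_page_init(DemoPageData *page, unsigned version)
{
    assert(page != NULL);
    memset(page, 0, sizeof(*page));
    page->hdr.lower = (uint16_t) sizeof(DemoPageHeader);
    page->hdr.upper = DEMO_BLCKSZ;
    page->hdr.special = DEMO_BLCKSZ;
    page->hdr.pagesize_version = (uint16_t) (DEMO_BLCKSZ | version);
}

/* The page size is a multiple of 256, so the low byte holds the version. */
static inline unsigned
demo_page_version(const DemoPageData *page)
{
    return page->hdr.pagesize_version & 0x00FF;
}

/* Prototype style: dispatch on the layout version on every call. */
static int
demo_get_lower_switch(const DemoPageData *page)
{
    switch (demo_page_version(page))
    {
        case 3:                 /* assumed to share the offset with 4 */
        case 4:
            return page->hdr.lower;
        default:
            return -1;          /* unsupported layout */
    }
}

/*
 * Optimized style: no switch. This is only valid while every supported
 * layout keeps the field at the same offset.
 */
static inline int
demo_get_lower_fast(const DemoPageData *page)
{
    return page->hdr.lower;
}
```

The switch form can reject unknown layouts; the inline form cannot, so a
validity check such as PageLayoutIsValid() would have to run before it is used.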

I also collected the number of calls with DTrace and got the following result:

<snip>
   PageGetHeapFreeSpace                                            984
   PageGetSpecialSize                                             1170
   PageGetFreeSpace                                               1200
   PageGetExactFreeSpace                                          1399
   PageGetUpper                                                   1419
   PageGetLower                                                   1618
   PageGetLSN                                                     2936
   PageGetMaxOffsetNumber                                         5504
   PageGetSpecialPointer                                         13534
   PageGetItemId                                                 71074
   PageGetItem                                                   76629

I plan to remove PageGetItemId and replace it with other functions such as
PageGetHeapTuple and so on. The reason is that the ItemId flags changed
between layout versions 3 and 4. Also, in many places it is called like this:

itemId = PageGetItemId(page, offnum);
item = PageGetItem(page, itemId);
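
A combined accessor could look roughly like the sketch below. The names
(DemoItemId, DemoPage, demo_page_get_tuple) and the simplified layout are
assumptions for illustration only; the real ItemIdData packs its fields into
bit fields, and PageGetHeapTuple does not exist yet.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_PAGE_SIZE 8192
#define DEMO_MAX_ITEMS 16

/* Hypothetical line pointer; the real ItemIdData uses bit fields. */
typedef struct DemoItemId
{
    uint16_t off;               /* byte offset of the item from page start */
    uint16_t len;               /* item length in bytes; 0 = no storage */
} DemoItemId;

/* Hypothetical page: a tiny header overlaid on the raw page bytes. */
typedef union DemoPage
{
    struct
    {
        uint16_t   version;     /* page layout version */
        uint16_t   nitems;      /* number of line pointers in use */
        DemoItemId linp[DEMO_MAX_ITEMS];
    } hdr;
    unsigned char bytes[DEMO_PAGE_SIZE];
} DemoPage;

/*
 * Look up the line pointer and return the item in one call, so the caller
 * never sees the version-specific ItemId representation. Returns NULL for
 * an out-of-range offset, an unsupported layout, or an item without storage.
 */
static void *
demo_page_get_tuple(DemoPage *page, unsigned offnum)
{
    const DemoItemId *id;

    assert(page != NULL);
    if (offnum < 1 || offnum > page->hdr.nitems || offnum > DEMO_MAX_ITEMS)
        return NULL;

    switch (page->hdr.version)
    {
        case 4:
            id = &page->hdr.linp[offnum - 1];   /* offsets are 1-based */
            break;
        default:
            return NULL;        /* other layouts would decode differently */
    }

    if (id->len == 0)
        return NULL;
    return page->bytes + id->off;
}
```

With something like this, only the accessor has to know how the line pointer
flags are encoded in each layout version.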

I'm also thinking about adding the following function:

PageSetXLOG(page, TLI, LSN) - it would replace the PageSetLSN(); PageSetTLI();
sequence in the code
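
As a rough sketch of the idea, with hypothetical types (DemoXLogRecPtr and
DemoPageHeader are simplified stand-ins, and demo_page_set_xlog is not an
existing function):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a WAL position. */
typedef struct DemoXLogRecPtr
{
    uint32_t xlogid;            /* log file number */
    uint32_t xrecoff;           /* byte offset within the log file */
} DemoXLogRecPtr;

/* Hypothetical header holding only the two fields of interest. */
typedef struct DemoPageHeader
{
    DemoXLogRecPtr lsn;         /* position of the last change to the page */
    uint16_t       tli;         /* timeline, stored truncated to 16 bits */
} DemoPageHeader;

/*
 * Set both WAL-related fields in one call, so callers cannot update the
 * LSN and forget the timeline (or the other way round).
 */
static inline void
demo_page_set_xlog(DemoPageHeader *hdr, uint32_t tli, DemoXLogRecPtr lsn)
{
    assert(hdr != NULL);
    hdr->lsn = lsn;
    hdr->tli = (uint16_t) tli;
}
```

A single entry point also leaves just one place to adapt if a future layout
stores these fields differently.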

I'm not happy with the PageSetLower() function, which is used in nbtree, but I
have no idea yet how to improve it.

Please let me know your comments. I have attached the prototype patch.

    Thanks, Zdenek


--
Zdenek Kotala              Sun Microsystems
Prague, Czech Republic     http://sun.com/postgresql

diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/gin/gindatapage.c
--- a/src/backend/access/gin/gindatapage.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/gin/gindatapage.c    Mon Aug 11 15:58:27 2008 +0200
@@ -445,7 +445,7 @@
     char       *ptr;
     OffsetNumber separator;
     ItemPointer bound;
-    Page        lpage = GinPageGetCopyPage(BufferGetPage(lbuf));
+    Page        lpage = PageGetTempPage(BufferGetPage(lbuf), true);
     ItemPointerData oldbound = *GinDataPageGetRightBound(lpage);
     int            sizeofitem = GinSizeOfItem(lpage);
     OffsetNumber maxoff = GinPageGetOpaque(lpage)->maxoff;
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/gin/ginentrypage.c
--- a/src/backend/access/gin/ginentrypage.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/gin/ginentrypage.c    Mon Aug 11 15:58:27 2008 +0200
@@ -458,7 +458,7 @@
                 leftrightmost = NULL;
     static ginxlogSplit data;
     Page        page;
-    Page        lpage = GinPageGetCopyPage(BufferGetPage(lbuf));
+    Page        lpage = PageGetTempPage(BufferGetPage(lbuf), true);
     Page        rpage = BufferGetPage(rbuf);
     Size        pageSize = PageGetPageSize(lpage);

diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/gin/ginutil.c
--- a/src/backend/access/gin/ginutil.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/gin/ginutil.c    Mon Aug 11 15:58:27 2008 +0200
@@ -309,21 +309,6 @@
     return entries;
 }

-/*
- * It's analog of PageGetTempPage(), but copies whole page
- */
-Page
-GinPageGetCopyPage(Page page)
-{
-    Size        pageSize = PageGetPageSize(page);
-    Page        tmppage;
-
-    tmppage = (Page) palloc(pageSize);
-    memcpy(tmppage, page, pageSize);
-
-    return tmppage;
-}
-
 Datum
 ginoptions(PG_FUNCTION_ARGS)
 {
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/gin/ginvacuum.c
--- a/src/backend/access/gin/ginvacuum.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/gin/ginvacuum.c    Mon Aug 11 15:58:27 2008 +0200
@@ -529,7 +529,7 @@
                      * On first difference we create temporary page in memory
                      * and copies content in to it.
                      */
-                    tmppage = GinPageGetCopyPage(origpage);
+                    tmppage = PageGetTempPage(origpage, true);

                     if (newN > 0)
                     {
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/gist/gist.c
--- a/src/backend/access/gist/gist.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/gist/gist.c    Mon Aug 11 15:58:27 2008 +0200
@@ -339,7 +339,7 @@
              * we must create temporary page to operate
              */
             dist->buffer = state->stack->buffer;
-            dist->page = PageGetTempPage(BufferGetPage(dist->buffer), sizeof(GISTPageOpaqueData));
+            dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer));

             /* clean all flags except F_LEAF */
             GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/gist/gistvacuum.c
--- a/src/backend/access/gist/gistvacuum.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/gist/gistvacuum.c    Mon Aug 11 15:58:27 2008 +0200
@@ -140,18 +140,6 @@
     END_CRIT_SECTION();

     UnlockReleaseBuffer(buffer);
-}
-
-static Page
-GistPageGetCopyPage(Page page)
-{
-    Size        pageSize = PageGetPageSize(page);
-    Page        tmppage;
-
-    tmppage = (Page) palloc(pageSize);
-    memcpy(tmppage, page, pageSize);
-
-    return tmppage;
 }

 static ArrayTuple
@@ -322,7 +310,7 @@
         addon = (IndexTuple *) palloc(sizeof(IndexTuple) * lenaddon);

         /* get copy of page to work */
-        tempPage = GistPageGetCopyPage(page);
+        tempPage = PageGetTempPage(page, true);

         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
         {
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/hash/hash.c
--- a/src/backend/access/hash/hash.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/hash/hash.c    Mon Aug 11 15:58:27 2008 +0200
@@ -527,7 +527,7 @@
      * each bucket.
      */
     metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
-    metap = (HashMetaPage) BufferGetPage(metabuf);
+    metap =  HashPageGetMeta(BufferGetPage(metabuf));
     orig_maxbucket = metap->hashm_maxbucket;
     orig_ntuples = metap->hashm_ntuples;
     memcpy(&local_metapage, metap, sizeof(local_metapage));
@@ -629,7 +629,7 @@

     /* Write-lock metapage and check for split since we started */
     metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE, LH_META_PAGE);
-    metap = (HashMetaPage) BufferGetPage(metabuf);
+    metap = HashPageGetMeta(BufferGetPage(metabuf));

     if (cur_maxbucket != metap->hashm_maxbucket)
     {
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/hash/hashinsert.c
--- a/src/backend/access/hash/hashinsert.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/hash/hashinsert.c    Mon Aug 11 15:58:27 2008 +0200
@@ -69,7 +69,7 @@

     /* Read the metapage */
     metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
-    metap = (HashMetaPage) BufferGetPage(metabuf);
+    metap = HashPageGetMeta(BufferGetPage(metabuf));

     /*
      * Check whether the item can fit on a hash page at all. (Eventually, we
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/hash/hashovfl.c
--- a/src/backend/access/hash/hashovfl.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/hash/hashovfl.c    Mon Aug 11 15:58:27 2008 +0200
@@ -187,7 +187,7 @@
     _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);

     _hash_checkpage(rel, metabuf, LH_META_PAGE);
-    metap = (HashMetaPage) BufferGetPage(metabuf);
+    metap = HashPageGetMeta(BufferGetPage(metabuf));

     /* start search at hashm_firstfree */
     orig_firstfree = metap->hashm_firstfree;
@@ -450,7 +450,7 @@

     /* Read the metapage so we can determine which bitmap page to use */
     metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
-    metap = (HashMetaPage) BufferGetPage(metabuf);
+    metap = HashPageGetMeta(BufferGetPage(metabuf));

     /* Identify which bit to set */
     ovflbitno = blkno_to_bitno(metap, ovflblkno);
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/hash/hashpage.c
--- a/src/backend/access/hash/hashpage.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/hash/hashpage.c    Mon Aug 11 15:58:27 2008 +0200
@@ -395,20 +395,18 @@
     pageopaque->hasho_flag = LH_META_PAGE;
     pageopaque->hasho_page_id = HASHO_PAGE_ID;

-    metap = (HashMetaPage) pg;
+    metap = HashPageGetMeta(pg);

     metap->hashm_magic = HASH_MAGIC;
     metap->hashm_version = HASH_VERSION;
     metap->hashm_ntuples = 0;
     metap->hashm_nmaps = 0;
     metap->hashm_ffactor = ffactor;
-    metap->hashm_bsize = BufferGetPageSize(metabuf);
+    metap->hashm_bsize = HashGetMaxBitmapSize(pg);
     /* find largest bitmap array size that will fit in page size */
     for (i = _hash_log2(metap->hashm_bsize); i > 0; --i)
     {
-        if ((1 << i) <= (metap->hashm_bsize -
-                         (MAXALIGN(sizeof(PageHeaderData)) +
-                          MAXALIGN(sizeof(HashPageOpaqueData)))))
+        if ((1 << i) <= metap->hashm_bsize)
             break;
     }
     Assert(i > 0);
@@ -532,7 +530,7 @@
     _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);

     _hash_checkpage(rel, metabuf, LH_META_PAGE);
-    metap = (HashMetaPage) BufferGetPage(metabuf);
+    metap = HashPageGetMeta(BufferGetPage(metabuf));

     /*
      * Check to see if split is still needed; someone else might have already
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/hash/hashsearch.c
--- a/src/backend/access/hash/hashsearch.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/hash/hashsearch.c    Mon Aug 11 15:58:27 2008 +0200
@@ -186,7 +186,7 @@

     /* Read the metapage */
     metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
-    metap = (HashMetaPage) BufferGetPage(metabuf);
+    metap = HashPageGetMeta(BufferGetPage(metabuf));

     /*
      * Compute the target bucket number, and convert to block number.
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/hash/hashutil.c
--- a/src/backend/access/hash/hashutil.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/hash/hashutil.c    Mon Aug 11 15:58:27 2008 +0200
@@ -190,7 +190,7 @@
      */
     if (flags == LH_META_PAGE)
     {
-        HashMetaPage metap = (HashMetaPage) page;
+        HashMetaPage metap = HashPageGetMeta(page);

         if (metap->hashm_magic != HASH_MAGIC)
             ereport(ERROR,
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/heap/pruneheap.c
--- a/src/backend/access/heap/pruneheap.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/heap/pruneheap.c    Mon Aug 11 15:58:27 2008 +0200
@@ -236,7 +236,10 @@
          * Update the page's pd_prune_xid field to either zero, or the lowest
          * XID of any soon-prunable tuple.
          */
-        ((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
+        if( prstate.new_prune_xid != InvalidTransactionId)
+            PageSetPrunable(page, prstate.new_prune_xid);
+        else
+            PageClearPrunable(page);

         /*
          * Also clear the "page is full" flag, since there's no point in
@@ -275,10 +278,10 @@
          * point in repeating the prune/defrag process until something else
          * happens to the page.
          */
-        if (((PageHeader) page)->pd_prune_xid != prstate.new_prune_xid ||
+        if (PageGetPrunable(page) != prstate.new_prune_xid ||
             PageIsFull(page))
         {
-            ((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
+            PageSetPrunable(page, prstate.new_prune_xid);
             PageClearFull(page);
             SetBufferCommitInfoNeedsSave(buffer);
         }
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/nbtree/nbtinsert.c
--- a/src/backend/access/nbtree/nbtinsert.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/nbtree/nbtinsert.c    Mon Aug 11 15:58:27 2008 +0200
@@ -793,7 +793,7 @@

     rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
     origpage = BufferGetPage(buf);
-    leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
+    leftpage = PageGetTempPage(origpage, false);
     rightpage = BufferGetPage(rbuf);

     _bt_pageinit(leftpage, BufferGetPageSize(buf));
@@ -1115,10 +1115,8 @@
         lastrdata->next = lastrdata + 1;
         lastrdata++;

-        lastrdata->data = (char *) rightpage +
-            ((PageHeader) rightpage)->pd_upper;
-        lastrdata->len = ((PageHeader) rightpage)->pd_special -
-            ((PageHeader) rightpage)->pd_upper;
+        lastrdata->data = PageGetUpperPointer(rightpage);
+        lastrdata->len = PageGetDataSize(rightpage);
         lastrdata->buffer = InvalidBuffer;

         /* Log the right sibling, because we've changed its' prev-pointer. */
@@ -1772,13 +1770,8 @@
         rdata[0].buffer = InvalidBuffer;
         rdata[0].next = &(rdata[1]);

-        /*
-         * Direct access to page is not good but faster - we should implement
-         * some new func in page API.
-         */
-        rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
-        rdata[1].len = ((PageHeader) rootpage)->pd_special -
-            ((PageHeader) rootpage)->pd_upper;
+        rdata[1].data = PageGetUpperPointer(rootpage);
+        rdata[1].len = PageGetDataSize(rootpage);
         rdata[1].buffer = InvalidBuffer;
         rdata[1].next = NULL;

diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/nbtree/nbtpage.c
--- a/src/backend/access/nbtree/nbtpage.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/nbtree/nbtpage.c    Mon Aug 11 15:58:27 2008 +0200
@@ -58,8 +58,7 @@
      * Set pd_lower just past the end of the metadata.    This is not essential
      * but it makes the page look compressible to xlog.c.
      */
-    ((PageHeader) page)->pd_lower =
-        ((char *) metad + sizeof(BTMetaPageData)) - (char *) page;
+    PageSetLower(page, ((char *) metad + sizeof(BTMetaPageData)) - (char *) page);
 }

 /*
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/nbtree/nbtsort.c
--- a/src/backend/access/nbtree/nbtsort.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/nbtree/nbtsort.c    Mon Aug 11 15:58:27 2008 +0200
@@ -249,7 +249,7 @@
     opaque->btpo_cycleid = 0;

     /* Make the P_HIKEY line pointer appear allocated */
-    ((PageHeader) page)->pd_lower += sizeof(ItemIdData);
+    PageReserveLinp(page);

     return page;
 }
@@ -366,7 +366,7 @@
             *previi = *thisii;
             previi = thisii;
         }
-        ((PageHeader) page)->pd_lower -= sizeof(ItemIdData);
+        PageReleaseLinp(page);
     }
 }

@@ -523,7 +523,7 @@
         hii = PageGetItemId(opage, P_HIKEY);
         *hii = *ii;
         ItemIdSetUnused(ii);    /* redundant */
-        ((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+        PageReleaseLinp(opage);

         /*
          * Link the old page into its parent, using its minimum key. If we
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/nbtree/nbtxlog.c
--- a/src/backend/access/nbtree/nbtxlog.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/nbtree/nbtxlog.c    Mon Aug 11 15:58:27 2008 +0200
@@ -180,9 +180,7 @@
      * Set pd_lower just past the end of the metadata.    This is not essential
      * but it makes the page look compressible to xlog.c.
      */
-    ((PageHeader) metapg)->pd_lower =
-        ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;
-
+    PageSetLower(metapg, ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg);
     PageSetLSN(metapg, lsn);
     PageSetTLI(metapg, ThisTimeLineID);
     MarkBufferDirty(metabuf);
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/access/transam/xlog.c
--- a/src/backend/access/transam/xlog.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/access/transam/xlog.c    Mon Aug 11 15:58:27 2008 +0200
@@ -1040,15 +1040,10 @@
         if (rdata->buffer_std)
         {
             /* Assume we can omit data between pd_lower and pd_upper */
-            uint16        lower = ((PageHeader) page)->pd_lower;
-            uint16        upper = ((PageHeader) page)->pd_upper;
-
-            if (lower >= SizeOfPageHeaderData &&
-                upper > lower &&
-                upper <= BLCKSZ)
-            {
-                bkpb->hole_offset = lower;
-                bkpb->hole_length = upper - lower;
+            if (PageIsComprimable(page))
+            {
+                bkpb->hole_offset = PageGetLower(page);
+                bkpb->hole_length = PageGetExactFreeSpace(page);
             }
             else
             {
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/commands/sequence.c
--- a/src/backend/commands/sequence.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/commands/sequence.c    Mon Aug 11 15:58:27 2008 +0200
@@ -387,9 +387,8 @@
         rdata[0].buffer = InvalidBuffer;
         rdata[0].next = &(rdata[1]);

-        rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
-        rdata[1].len = ((PageHeader) page)->pd_special -
-            ((PageHeader) page)->pd_upper;
+        rdata[1].data = PageGetUpperPointer(page);
+        rdata[1].len = PageGetDataSize(page);
         rdata[1].buffer = InvalidBuffer;
         rdata[1].next = NULL;

@@ -618,9 +617,8 @@
         seq->is_called = true;
         seq->log_cnt = 0;

-        rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
-        rdata[1].len = ((PageHeader) page)->pd_special -
-            ((PageHeader) page)->pd_upper;
+        rdata[1].data = PageGetUpperPointer(page);
+        rdata[1].len = PageGetDataSize(page);
         rdata[1].buffer = InvalidBuffer;
         rdata[1].next = NULL;

@@ -794,9 +792,8 @@
         seq->is_called = true;
         seq->log_cnt = 0;

-        rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
-        rdata[1].len = ((PageHeader) page)->pd_special -
-            ((PageHeader) page)->pd_upper;
+        rdata[1].data = PageGetUpperPointer(page);
+        rdata[1].len = PageGetDataSize(page);
         rdata[1].buffer = InvalidBuffer;
         rdata[1].next = NULL;

diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/storage/buffer/bufmgr.c
--- a/src/backend/storage/buffer/bufmgr.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/storage/buffer/bufmgr.c    Mon Aug 11 15:58:27 2008 +0200
@@ -356,7 +356,7 @@
             smgrread(smgr, blockNum, (char *) bufBlock);

             /* check for garbage data */
-            if (!PageHeaderIsValid((PageHeader) bufBlock))
+            if (!PageLayoutIsValid((Page) bufBlock))
             {
                 if (zero_damaged_pages)
                 {
diff -r 879ef18ad9cb -r 3e71cf34dced src/backend/storage/page/bufpage.c
--- a/src/backend/storage/page/bufpage.c    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/backend/storage/page/bufpage.c    Mon Aug 11 15:58:27 2008 +0200
@@ -8,15 +8,19 @@
  *
  *
  * IDENTIFICATION
- *      $PostgreSQL: pgsql/src/backend/storage/page/bufpage.c,v 1.79 2008/05/13 15:44:08 momjian Exp $
+ *      $PostgreSQL: pgsql/src/backend/storage/page/bufpage.c,v 1.78 2008/02/10 20:39:08 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"

 #include "access/htup.h"
+#include "access/transam.h"
 #include "storage/bufpage.h"

+
+static bool PageLayoutIsValid_04(Page page);
+bool PageIsZeroed(Page page);

 /* ----------------------------------------------------------------
  *                        Page support functions
@@ -25,12 +29,13 @@

 /*
  * PageInit
- *        Initializes the contents of a page.
+ *        Initializes the contents of a page. We allow to initialize page only
+ *      in latest Page Layout Version.
  */
 void
 PageInit(Page page, Size pageSize, Size specialSize)
 {
-    PageHeader    p = (PageHeader) page;
+    PageHeader_04    p = (PageHeader_04) page;

     specialSize = MAXALIGN(specialSize);

@@ -50,7 +55,7 @@


 /*
- * PageHeaderIsValid
+ * PageLayoutIsValid
  *        Check that the header fields of a page appear valid.
  *
  * This is called when a page has just been read in from disk.    The idea is
@@ -68,23 +73,26 @@
  * will clean up such a page and make it usable.
  */
 bool
-PageHeaderIsValid(PageHeader page)
+PageLayoutIsValid(Page page)
+{
+    /* Check normal case */
+    switch(PageGetPageLayoutVersion(page))
+    {
+        case 4 : return(PageLayoutIsValid_04(page));
+        case 0 : return(PageIsZeroed(page));
+    }
+    return false;
+}
+
+/*
+ * Check all-zeroes case
+ */
+bool
+PageIsZeroed(Page page)
 {
     char       *pagebytes;
     int            i;

-    /* Check normal case */
-    if (PageGetPageSize(page) == BLCKSZ &&
-        PageGetPageLayoutVersion(page) == PG_PAGE_LAYOUT_VERSION &&
-        (page->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
-        page->pd_lower >= SizeOfPageHeaderData &&
-        page->pd_lower <= page->pd_upper &&
-        page->pd_upper <= page->pd_special &&
-        page->pd_special <= BLCKSZ &&
-        page->pd_special == MAXALIGN(page->pd_special))
-        return true;
-
-    /* Check all-zeroes case */
     pagebytes = (char *) page;
     for (i = 0; i < BLCKSZ; i++)
     {
@@ -92,6 +100,21 @@
             return false;
     }
     return true;
+}
+
+bool PageLayoutIsValid_04(Page page)
+{
+    PageHeader_04 phdr = (PageHeader_04)page;
+     if(
+         PageGetPageSize(page) == BLCKSZ &&
+        (phdr->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
+        phdr->pd_lower >= SizeOfPageHeaderData &&
+        phdr->pd_lower <= phdr->pd_upper &&
+        phdr->pd_upper <= phdr->pd_special &&
+        phdr->pd_special <= BLCKSZ &&
+        phdr->pd_special == MAXALIGN(phdr->pd_special))
+        return true;
+    return false;
 }


@@ -123,7 +146,7 @@
             bool overwrite,
             bool is_heap)
 {
-    PageHeader    phdr = (PageHeader) page;
+    PageHeader_04    phdr = (PageHeader_04) page;
     Size        alignedSize;
     int            lower;
     int            upper;
@@ -131,6 +154,9 @@
     OffsetNumber limit;
     bool        needshuffle = false;

+    /* We allow add new items only on the new page layout - TODO indexes? */
+    if( PageGetPageLayoutVersion(page) != PG_PAGE_LAYOUT_VERSION )
+        elog(PANIC, "Add item on old page layout version is forbidden.");
     /*
      * Be wary about corrupted page pointers
      */
@@ -156,7 +182,7 @@
         {
             if (offsetNumber < limit)
             {
-                itemId = PageGetItemId(phdr, offsetNumber);
+                itemId = PageGetItemId(page, offsetNumber);
                 if (ItemIdIsUsed(itemId) || ItemIdHasStorage(itemId))
                 {
                     elog(WARNING, "will not overwrite a used ItemId");
@@ -174,7 +200,7 @@
     {
         /* offsetNumber was not passed in, so find a free slot */
         /* if no free slot, we'll put it at limit (1st open slot) */
-        if (PageHasFreeLinePointers(phdr))
+        if (PageHasFreeLinePointers(page))
         {
             /*
              * Look for "recyclable" (unused) ItemId.  We check for no storage
@@ -183,14 +209,14 @@
              */
             for (offsetNumber = 1; offsetNumber < limit; offsetNumber++)
             {
-                itemId = PageGetItemId(phdr, offsetNumber);
+                itemId = PageGetItemId(page, offsetNumber);
                 if (!ItemIdIsUsed(itemId) && !ItemIdHasStorage(itemId))
                     break;
             }
             if (offsetNumber >= limit)
             {
                 /* the hint is wrong, so reset it */
-                PageClearHasFreeLinePointers(phdr);
+                PageClearHasFreeLinePointers(page);
             }
         }
         else
@@ -233,7 +259,7 @@
     /*
      * OK to insert the item.  First, shuffle the existing pointers if needed.
      */
-    itemId = PageGetItemId(phdr, offsetNumber);
+    itemId = PageGetItemId(page, offsetNumber);

     if (needshuffle)
         memmove(itemId + 1, itemId,
@@ -254,31 +280,47 @@

 /*
  * PageGetTempPage
- *        Get a temporary page in local memory for special processing
+ *        Get a temporary page in local memory for special processing. Caller is
+ * responsible to init page.
  */
 Page
-PageGetTempPage(Page page, Size specialSize)
+PageGetTempPage(Page page, bool copy)
+{
+    Size        pageSize;
+    Size        size;
+    Page        temp;
+
+    pageSize = PageGetPageSize(page);
+    temp = (Page) palloc(pageSize);
+
+    if( copy )
+    {
+        memcpy(temp, page, pageSize);
+    }
+
+    return temp;
+}
+
+/*
+ * PageGetTempPageCopySpecial
+ *        Get a temporary page in local memory for special processing
+ * XXX: only consumer now is gistplacetopage(), so maybe we don't even want it
+ */
+Page
+PageGetTempPageCopySpecial(Page page)
 {
     Size        pageSize;
     Page        temp;
-    PageHeader    thdr;

     pageSize = PageGetPageSize(page);
     temp = (Page) palloc(pageSize);
-    thdr = (PageHeader) temp;

-    /* copy old page in */
-    memcpy(temp, page, pageSize);
-
-    /* set high, low water marks */
-    thdr->pd_lower = SizeOfPageHeaderData;
-    thdr->pd_upper = pageSize - MAXALIGN(specialSize);
-
-    /* clear out the middle */
-    MemSet((char *) temp + thdr->pd_lower, 0, thdr->pd_upper - thdr->pd_lower);
+    PageInit(temp, pageSize, PageGetSpecialSize(page));
+    memcpy(PageGetSpecialPointer(temp), PageGetSpecialPointer(page), PageGetSpecialSize(page));

     return temp;
 }
+

 /*
  * PageRestoreTempPage
@@ -329,9 +371,9 @@
 void
 PageRepairFragmentation(Page page)
 {
-    Offset        pd_lower = ((PageHeader) page)->pd_lower;
-    Offset        pd_upper = ((PageHeader) page)->pd_upper;
-    Offset        pd_special = ((PageHeader) page)->pd_special;
+    Offset        pd_lower = PageGetLower(page);
+    Offset        pd_upper = PageGetUpper(page);
+    Offset        pd_special = PageGetSpecial(page);
     itemIdSort    itemidbase,
                 itemidptr;
     ItemId        lp;
@@ -380,7 +422,7 @@
     if (nstorage == 0)
     {
         /* Page is completely empty, so just reset it quickly */
-        ((PageHeader) page)->pd_upper = pd_special;
+        PageSetUpper(page, pd_special);
     }
     else
     {                            /* nstorage != 0 */
@@ -430,7 +472,7 @@
             lp->lp_off = upper;
         }

-        ((PageHeader) page)->pd_upper = upper;
+        PageSetUpper(page, upper);

         pfree(itemidbase);
     }
@@ -459,8 +501,7 @@
      * Use signed arithmetic here so that we behave sensibly if pd_lower >
      * pd_upper.
      */
-    space = (int) ((PageHeader) page)->pd_upper -
-        (int) ((PageHeader) page)->pd_lower;
+    space = PageGetExactFreeSpace(page);

     if (space < (int) sizeof(ItemIdData))
         return 0;
@@ -483,8 +524,7 @@
      * Use signed arithmetic here so that we behave sensibly if pd_lower >
      * pd_upper.
      */
-    space = (int) ((PageHeader) page)->pd_upper -
-        (int) ((PageHeader) page)->pd_lower;
+    space = (int)PageGetUpper(page) - (int)PageGetLower(page);

     if (space < 0)
         return 0;
@@ -524,7 +564,7 @@
         nline = PageGetMaxOffsetNumber(page);
         if (nline >= MaxHeapTuplesPerPage)
         {
-            if (PageHasFreeLinePointers((PageHeader) page))
+            if (PageHasFreeLinePointers(page))
             {
                 /*
                  * Since this is just a hint, we must confirm that there is
@@ -571,7 +611,7 @@
 void
 PageIndexTupleDelete(Page page, OffsetNumber offnum)
 {
-    PageHeader    phdr = (PageHeader) page;
+    PageHeader_04    phdr = (PageHeader_04) page; /* TODO PGU */
     char       *addr;
     ItemId        tup;
     Size        size;
@@ -656,7 +696,7 @@
         nline--;                /* there's one less than when we started */
         for (i = 1; i <= nline; i++)
         {
-            ItemId        ii = PageGetItemId(phdr, i);
+            ItemId        ii = PageGetItemId(page, i);

             Assert(ItemIdHasStorage(ii));
             if (ItemIdGetOffset(ii) <= offset)
@@ -677,7 +717,7 @@
 void
 PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 {
-    PageHeader    phdr = (PageHeader) page;
+    PageHeader_04    phdr = (PageHeader_04) page; /* TODO PGU */
     Offset        pd_lower = phdr->pd_lower;
     Offset        pd_upper = phdr->pd_upper;
     Offset        pd_special = phdr->pd_special;
@@ -797,3 +837,371 @@

     pfree(itemidbase);
 }
+
+
+
+/*
+ * PageGetItemId
+ *        Returns an item identifier of a page.
+ */
+ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
+{
+    AssertMacro(offsetNumber > 0);
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return (ItemId) (& ((PageHeader_04) page)->pd_linp[(offsetNumber) - 1]) ;
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetItemId.");
+}
+
+/*
+ * PageGetContents
+ *        To be used in case the page does not contain item pointers.
+ */
+Pointer PageGetContents(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return (Pointer) (&((PageHeader_04) (page))->pd_linp[0]);
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetContents.");
+}
+
+/* ----------------
+ *        page special data macros
+ * ----------------
+ */
+/*
+ * PageGetSpecialSize
+ *        Returns size of special space on a page.
+ */
+Size PageGetSpecialSize(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return (Size) PageGetPageSize(page) - ((PageHeader_04)(page))->pd_special;
+
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetSpecialSize.");
+}
+
+Size PageGetDataSize(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return (Size) ((PageHeader_04)(page))->pd_special - ((PageHeader_04)(page))->pd_upper;
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetDataSize.");
+}
+
+/*
+ * PageGetSpecialPointer
+ *        Returns pointer to special space on a page.
+ */
+Pointer PageGetSpecialPointer(Page page)
+{
+    AssertMacro(PageIsValid(page));
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return page + ((PageHeader_04)(page))->pd_special;
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetSpecialPointer.");
+}
+
+/*
+ * PageGetUpperPointer
+ *        Returns pointer to first data (upper) on a page.
+ */
+Pointer PageGetUpperPointer(Page page)
+{
+    AssertMacro(PageIsValid(page));
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return page + ((PageHeader_04)(page))->pd_upper;
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetUpperPointer.");
+}
+
+void PageSetLower(Page page, LocationIndex lower)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : ((PageHeader_04) page)->pd_lower = lower;
+                break;
+        default:  elog(PANIC, "Unsupported page layout in function PageSetLower.");
+    }
+}
+
+void PageSetUpper(Page page, LocationIndex upper)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : ((PageHeader_04) page)->pd_upper = upper;
+                break;
+        default:  elog(PANIC, "Unsupported page layout in function PageSetUpper.");
+    }
+}
+
+
+/*
+ * PageGetItem
+ *        Retrieves an item on the given page.
+ *
+ * Note:
+ *        This does not change the status of any of the resources passed.
+ *        The semantics may change in the future.
+ */
+Item PageGetItem(Page page, ItemId itemId)
+{
+    AssertMacro(PageIsValid(page));
+    AssertMacro(ItemIdHasStorage(itemId));
+    switch ( PageGetPageLayoutVersion(page) ) /* TODO not necessary */
+    {
+        case 4 : return (Item) (page + ItemIdGetOffset(itemId));
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetItem.");
+}
+
+
+void PageReserveLinp(Page page)
+{
+    AssertMacro(PageIsValid(page));
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : ((PageHeader_04) page)->pd_lower += sizeof(ItemIdData);
+                AssertMacro(((PageHeader_04) page)->pd_lower <= ((PageHeader_04) page)->pd_upper );
+                break;
+        default: elog(PANIC, "Unsupported page layout in function PageReserveLinp.");
+    }
+}
+
+void PageReleaseLinp(Page page)
+{
+    AssertMacro(PageIsValid(page));
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : ((PageHeader_04) page)->pd_lower -= sizeof(ItemIdData);
+                AssertMacro(((PageHeader_04) page)->pd_lower >= SizeOfPageHeaderData);
+                break;
+        default: elog(PANIC, "Unsupported page layout in function PageReleaseLinp.");
+    }
+}
+
+
+/*
+ * PageGetMaxOffsetNumber
+ *        Returns the maximum offset number used by the given page.
+ *        Since offset numbers are 1-based, this is also the number
+ *        of items on the page.
+ *
+ *        NOTE: if the page is not initialized (pd_lower == 0), we must
+ *        return zero to ensure sane behavior.  As a function, this
+ *        evaluates its argument only once, unlike the old macro.
+ */
+int PageGetMaxOffsetNumber(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : {
+                    PageHeader_04 header = (PageHeader_04) (page);
+                     return header->pd_lower <= SizeOfPageHeaderData ? 0 :
+                     (header->pd_lower - SizeOfPageHeaderData) / sizeof(ItemIdData);
+                 }
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetMaxOffsetNumber.");
+    return 0;
+}
+
+/*
+ * Additional functions for access to page headers
+ */
+XLogRecPtr PageGetLSN(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return ((PageHeader_04) page)->pd_lsn;
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetLSN.");
+}
+
+LocationIndex PageGetLower(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return ((PageHeader_04) page)->pd_lower;
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetLower.");
+    return 0;
+}
+
+LocationIndex PageGetUpper(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return ((PageHeader_04) page)->pd_upper;
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetUpper.");
+    return 0;
+}
+
+LocationIndex PageGetSpecial(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return ((PageHeader_04) page)->pd_special;
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetSpecial.");
+    return 0;
+}
+
+
+void PageSetLSN(Page page, XLogRecPtr lsn)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : ((PageHeader_04) page)->pd_lsn = lsn;
+                break;
+        default: elog(PANIC, "Unsupported page layout in function PageSetLSN.");
+    }
+}
+
+/* NOTE: only the 16 least significant bits are stored */
+TimeLineID PageGetTLI(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return ((PageHeader_04) (page))->pd_tli;
+    }
+    elog(PANIC, "Unsupported page layout in function PageGetTLI.");
+}
+
+void PageSetTLI(Page page, TimeLineID tli)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : ((PageHeader_04) (page))->pd_tli = (uint16) (tli);
+                break;
+        default: elog(PANIC, "Unsupported page layout in function PageSetTLI.");
+    }
+}
+
+bool PageHasFreeLinePointers(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return (((PageHeader_04) (page))->pd_flags & PD_HAS_FREE_LINES) != 0;
+        default: return false;
+    }
+}
+
+void PageSetHasFreeLinePointers(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : ((PageHeader_04) (page))->pd_flags |= PD_HAS_FREE_LINES;
+                 break;
+        default: elog(PANIC, "HasFreeLinePointers is not supported on page layout version %i",
+                PageGetPageLayoutVersion(page));
+    }
+}
+
+void PageClearHasFreeLinePointers(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : ((PageHeader_04) (page))->pd_flags &= ~PD_HAS_FREE_LINES;
+                  break;
+        default: elog(PANIC, "HasFreeLinePointers is not supported on page layout version %i",
+                PageGetPageLayoutVersion(page));
+    }
+}
+
+bool PageIsFull(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return (((PageHeader_04) (page))->pd_flags & PD_PAGE_FULL) != 0;
+    }
+    return true; /* no space on old data page */
+}
+
+void PageSetFull(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : ((PageHeader_04) (page))->pd_flags |= PD_PAGE_FULL;
+                  break;
+        default: elog(PANIC, "PageSetFull is not supported on page layout version %i",
+                PageGetPageLayoutVersion(page));
+    }
+}
+
+
+void PageClearFull(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : ((PageHeader_04) (page))->pd_flags &= ~PD_PAGE_FULL;
+                  break;
+        default: elog(PANIC, "PageClearFull is not supported on page layout version %i",
+                PageGetPageLayoutVersion(page));
+    }
+}
+
+bool PageIsPrunable(Page page, TransactionId oldestxmin)
+{
+    AssertMacro(TransactionIdIsNormal(oldestxmin));
+    return (
+        TransactionIdIsValid(((PageHeader_04) page)->pd_prune_xid) &&
+        TransactionIdPrecedes(((PageHeader_04) page)->pd_prune_xid, oldestxmin) );
+}
+
+TransactionId PageGetPrunable(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : return ((PageHeader_04) page)->pd_prune_xid;
+    }
+    elog(PANIC, "PageGetPrunable is not supported on page layout version %i",
+        PageGetPageLayoutVersion(page));
+}
+
+void PageSetPrunable(Page page, TransactionId xid)
+{
+    Assert(TransactionIdIsNormal(xid));
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : if (!TransactionIdIsValid(((PageHeader_04) (page))->pd_prune_xid) ||
+                TransactionIdPrecedes(xid, ((PageHeader_04) (page))->pd_prune_xid))
+                ((PageHeader_04) (page))->pd_prune_xid = (xid);
+                break;
+        default: elog(PANIC, "PageSetPrunable is not supported on page layout version %i",
+                PageGetPageLayoutVersion(page));
+    }
+}
+
+void PageClearPrunable(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : ((PageHeader_04) page)->pd_prune_xid = InvalidTransactionId;
+                break;
+        default: elog(PANIC, "PageClearPrunable is not supported on page layout version %i",
+                PageGetPageLayoutVersion(page));
+    }
+}
+
+bool PageIsComprimable(Page page)
+{
+    switch ( PageGetPageLayoutVersion(page) )
+    {
+        case 4 : {
+                    PageHeader_04 ph = (PageHeader_04) page;
+                    return(ph->pd_lower >= SizeOfPageHeaderData &&
+                        ph->pd_upper > ph->pd_lower &&
+                        ph->pd_upper <= BLCKSZ);
+                  }
+        default: elog(PANIC, "IsComprimable is not supported on page layout version %i",
+                PageGetPageLayoutVersion(page));
+    }
+}
diff -r 879ef18ad9cb -r 3e71cf34dced src/include/access/gin.h
--- a/src/include/access/gin.h    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/include/access/gin.h    Mon Aug 11 15:58:27 2008 +0200
@@ -246,7 +246,6 @@
 extern Datum *extractEntriesS(GinState *ginstate, OffsetNumber attnum, Datum value,
                 int32 *nentries, bool *needUnique);
 extern Datum *extractEntriesSU(GinState *ginstate, OffsetNumber attnum, Datum value, int32 *nentries);
-extern Page GinPageGetCopyPage(Page page);

 extern Datum gin_index_getattr(GinState *ginstate, IndexTuple tuple);
 extern OffsetNumber gintuple_get_attrnum(GinState *ginstate, IndexTuple tuple);
diff -r 879ef18ad9cb -r 3e71cf34dced src/include/access/hash.h
--- a/src/include/access/hash.h    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/include/access/hash.h    Mon Aug 11 15:58:27 2008 +0200
@@ -138,7 +138,7 @@

 typedef struct HashMetaPageData
 {
-    PageHeaderData hashm_phdr;    /* pad for page header (do not use) */
+    PageHeaderData_04 hashm_phdr;  /* pad for page header (do not use) */
     uint32        hashm_magic;    /* magic no. for hash tables */
     uint32        hashm_version;    /* version ID */
     double        hashm_ntuples;    /* number of tuples stored in the table */
@@ -191,8 +191,20 @@
 #define BMPGSZ_BIT(metap)        ((metap)->hashm_bmsize << BYTE_TO_BIT)
 #define BMPG_SHIFT(metap)        ((metap)->hashm_bmshift)
 #define BMPG_MASK(metap)        (BMPGSZ_BIT(metap) - 1)
-#define HashPageGetBitmap(pg) \
-    ((uint32 *) (((char *) (pg)) + MAXALIGN(sizeof(PageHeaderData))))
+/*
+ * Note: Hash bitmap and metapage could be placed at MAXALIGN(SizeOfPageHeaderData),
+ * but for backward compatibility (in-place upgrade) we waste ~4 bytes.
+ */
+#define HashPageGetBitmap(page) \
+    ((uint32 *) (((char *) (page)) + MAXALIGN(SizeOfPageHeaderData + sizeof(ItemIdData) )))
+
+#define HashGetMaxBitmapSize(page) \
+    ( PageGetPageSize((Page) (page)) - \
+    ( MAXALIGN(SizeOfPageHeaderData + sizeof(ItemIdData)) + \
+      MAXALIGN(sizeof(HashPageOpaqueData))) )
+
+#define HashPageGetMeta(page) \
+ ((HashMetaPage)(page))  /* TODO check if page has layout 4 */

 /*
  * The number of bits in an ovflpage bitmap word.
diff -r 879ef18ad9cb -r 3e71cf34dced src/include/storage/bufpage.h
--- a/src/include/storage/bufpage.h    Tue Aug 05 21:28:29 2008 +0000
+++ b/src/include/storage/bufpage.h    Mon Aug 11 15:58:27 2008 +0200
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/bufpage.h,v 1.82 2008/07/13 21:50:04 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/storage/bufpage.h,v 1.78 2008/05/12 00:00:53 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -119,7 +119,7 @@
  * On the high end, we can only support pages up to 32KB because lp_off/lp_len
  * are 15 bits.
  */
-typedef struct PageHeaderData
+typedef struct PageHeaderData_04
 {
     /* XXX LSN is member of *any* block, not only page-organized ones */
     XLogRecPtr    pd_lsn;            /* LSN: next byte after last byte of xlog
@@ -133,9 +133,9 @@
     uint16        pd_pagesize_version;
     TransactionId pd_prune_xid; /* oldest prunable XID, or zero if none */
     ItemIdData    pd_linp[1];        /* beginning of line pointer array */
-} PageHeaderData;
+} PageHeaderData_04;

-typedef PageHeaderData *PageHeader;
+typedef PageHeaderData_04 *PageHeader_04;

 /*
  * pd_flags contains the following flag bits.  Undefined bits are initialized
@@ -179,40 +179,22 @@
 #define PageIsValid(page) PointerIsValid(page)

 /*
- * line pointer(s) do not count as part of header
+ * line pointer does not count as part of header
  */
-#define SizeOfPageHeaderData (offsetof(PageHeaderData, pd_linp))
+#define SizeOfPageHeaderData (offsetof(PageHeaderData_04, pd_linp[0]))

 /*
  * PageIsEmpty
  *        returns true iff no itemid has been allocated on the page
  */
 #define PageIsEmpty(page) \
-    (((PageHeader) (page))->pd_lower <= SizeOfPageHeaderData)
+    (((PageHeader_04) (page))->pd_lower <= SizeOfPageHeaderData)

 /*
  * PageIsNew
  *        returns true iff page has not been initialized (by PageInit)
  */
-#define PageIsNew(page) (((PageHeader) (page))->pd_upper == 0)
-
-/*
- * PageGetItemId
- *        Returns an item identifier of a page.
- */
-#define PageGetItemId(page, offsetNumber) \
-    ((ItemId) (&((PageHeader) (page))->pd_linp[(offsetNumber) - 1]))
-
-/*
- * PageGetContents
- *        To be used in case the page does not contain item pointers.
- *
- * Note: prior to 8.3 this was not guaranteed to yield a MAXALIGN'd result.
- * Now it is.  Beware of old code that might think the offset to the contents
- * is just SizeOfPageHeaderData rather than MAXALIGN(SizeOfPageHeaderData).
- */
-#define PageGetContents(page) \
-    ((char *) (page) + MAXALIGN(SizeOfPageHeaderData))
+#define PageIsNew(page) (((PageHeader_04) (page))->pd_upper == 0)

 /* ----------------
  *        macros to access page size info
@@ -234,14 +216,14 @@
  * however, it can be called on a page that is not stored in a buffer.
  */
 #define PageGetPageSize(page) \
-    ((Size) (((PageHeader) (page))->pd_pagesize_version & (uint16) 0xFF00))
+    ((Size) (((PageHeader_04) (page))->pd_pagesize_version & (uint16) 0xFF00))

 /*
  * PageGetPageLayoutVersion
  *        Returns the page layout version of a page.
  */
 #define PageGetPageLayoutVersion(page) \
-    (((PageHeader) (page))->pd_pagesize_version & 0x00FF)
+    (((PageHeader_04) (page))->pd_pagesize_version & 0x00FF)

 /*
  * PageSetPageSizeAndVersion
@@ -254,120 +236,64 @@
 ( \
     AssertMacro(((size) & 0xFF00) == (size)), \
     AssertMacro(((version) & 0x00FF) == (version)), \
-    ((PageHeader) (page))->pd_pagesize_version = (size) | (version) \
+    ((PageHeader_04) (page))->pd_pagesize_version = (size) | (version) \
 )

-/* ----------------
- *        page special data macros
- * ----------------
- */
-/*
- * PageGetSpecialSize
- *        Returns size of special space on a page.
- */
-#define PageGetSpecialSize(page) \
-    ((uint16) (PageGetPageSize(page) - ((PageHeader)(page))->pd_special))
-
-/*
- * PageGetSpecialPointer
- *        Returns pointer to special space on a page.
- */
-#define PageGetSpecialPointer(page) \
-( \
-    AssertMacro(PageIsValid(page)), \
-    (char *) ((char *) (page) + ((PageHeader) (page))->pd_special) \
-)
-
-/*
- * PageGetItem
- *        Retrieves an item on the given page.
- *
- * Note:
- *        This does not change the status of any of the resources passed.
- *        The semantics may change in the future.
- */
-#define PageGetItem(page, itemId) \
-( \
-    AssertMacro(PageIsValid(page)), \
-    AssertMacro(ItemIdHasStorage(itemId)), \
-    (Item)(((char *)(page)) + ItemIdGetOffset(itemId)) \
-)
-
-/*
- * PageGetMaxOffsetNumber
- *        Returns the maximum offset number used by the given page.
- *        Since offset numbers are 1-based, this is also the number
- *        of items on the page.
- *
- *        NOTE: if the page is not initialized (pd_lower == 0), we must
- *        return zero to ensure sane behavior.  Accept double evaluation
- *        of the argument so that we can ensure this.
- */
-#define PageGetMaxOffsetNumber(page) \
-    (((PageHeader) (page))->pd_lower <= SizeOfPageHeaderData ? 0 : \
-     ((((PageHeader) (page))->pd_lower - SizeOfPageHeaderData) \
-      / sizeof(ItemIdData)))
-
-/*
- * Additional macros for access to page headers
- */
-#define PageGetLSN(page) \
-    (((PageHeader) (page))->pd_lsn)
-#define PageSetLSN(page, lsn) \
-    (((PageHeader) (page))->pd_lsn = (lsn))
-
-/* NOTE: only the 16 least significant bits are stored */
-#define PageGetTLI(page) \
-    (((PageHeader) (page))->pd_tli)
-#define PageSetTLI(page, tli) \
-    (((PageHeader) (page))->pd_tli = (uint16) (tli))
-
-#define PageHasFreeLinePointers(page) \
-    (((PageHeader) (page))->pd_flags & PD_HAS_FREE_LINES)
-#define PageSetHasFreeLinePointers(page) \
-    (((PageHeader) (page))->pd_flags |= PD_HAS_FREE_LINES)
-#define PageClearHasFreeLinePointers(page) \
-    (((PageHeader) (page))->pd_flags &= ~PD_HAS_FREE_LINES)
-
-#define PageIsFull(page) \
-    (((PageHeader) (page))->pd_flags & PD_PAGE_FULL)
-#define PageSetFull(page) \
-    (((PageHeader) (page))->pd_flags |= PD_PAGE_FULL)
-#define PageClearFull(page) \
-    (((PageHeader) (page))->pd_flags &= ~PD_PAGE_FULL)
-
-#define PageIsPrunable(page, oldestxmin) \
-( \
-    AssertMacro(TransactionIdIsNormal(oldestxmin)), \
-    TransactionIdIsValid(((PageHeader) (page))->pd_prune_xid) && \
-    TransactionIdPrecedes(((PageHeader) (page))->pd_prune_xid, oldestxmin) \
-)
-#define PageSetPrunable(page, xid) \
-do { \
-    Assert(TransactionIdIsNormal(xid)); \
-    if (!TransactionIdIsValid(((PageHeader) (page))->pd_prune_xid) || \
-        TransactionIdPrecedes(xid, ((PageHeader) (page))->pd_prune_xid)) \
-        ((PageHeader) (page))->pd_prune_xid = (xid); \
-} while (0)
-#define PageClearPrunable(page) \
-    (((PageHeader) (page))->pd_prune_xid = InvalidTransactionId)


 /* ----------------------------------------------------------------
  *        extern declarations
  * ----------------------------------------------------------------
  */
+extern Pointer PageGetContents(Page page);
+extern Pointer PageGetSpecialPointer(Page page);
+extern Pointer PageGetUpperPointer(Page page);

-extern void PageInit(Page page, Size pageSize, Size specialSize);
-extern bool PageHeaderIsValid(PageHeader page);
-extern OffsetNumber PageAddItem(Page page, Item item, Size size,
-            OffsetNumber offsetNumber, bool overwrite, bool is_heap);
-extern Page PageGetTempPage(Page page, Size specialSize);
+extern ItemId PageGetItemId(Page page, OffsetNumber offsetNumber);
+extern Item PageGetItem(Page page, ItemId itemId);
+extern int PageGetMaxOffsetNumber(Page page);
+
+extern XLogRecPtr PageGetLSN(Page page);
+extern void PageSetLSN(Page page, XLogRecPtr lsn);
+extern TimeLineID PageGetTLI(Page page);
+extern void PageSetTLI(Page page, TimeLineID tli);
+
+extern bool PageHasFreeLinePointers(Page page);
+extern void PageSetHasFreeLinePointers(Page page);
+extern void PageClearHasFreeLinePointers(Page page);
+extern bool PageIsFull(Page page);
+extern void PageSetFull(Page page);
+extern void PageClearFull(Page page);
+extern bool PageIsPrunable(Page page, TransactionId oldestxmin);
+extern void PageSetPrunable(Page page, TransactionId xid);
+extern TransactionId PageGetPrunable(Page page);
+extern void PageClearPrunable(Page page);
+extern bool PageIsComprimable(Page page);
+
+extern void PageReserveLinp(Page page);
+extern void PageReleaseLinp(Page page);
+
+extern LocationIndex PageGetLower(Page page);
+extern LocationIndex PageGetUpper(Page page);
+extern LocationIndex PageGetSpecial(Page page);
+extern void PageSetLower(Page page, LocationIndex lower);
+extern void PageSetUpper(Page page, LocationIndex upper);
+
+extern Page PageGetTempPage(Page page, bool copy);
+extern Page PageGetTempPageCopySpecial(Page page);
 extern void PageRestoreTempPage(Page tempPage, Page oldPage);
-extern void PageRepairFragmentation(Page page);
+
 extern Size PageGetFreeSpace(Page page);
 extern Size PageGetExactFreeSpace(Page page);
 extern Size PageGetHeapFreeSpace(Page page);
+extern Size PageGetSpecialSize(Page page);
+extern Size PageGetDataSize(Page page);
+
+extern void PageInit(Page page, Size pageSize, Size specialSize);
+extern bool PageLayoutIsValid(Page page);
+extern OffsetNumber PageAddItem(Page page, Item item, Size size,
+            OffsetNumber offsetNumber, bool overwrite, bool is_heap);
+extern void PageRepairFragmentation(Page page);
 extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
 extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);
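
Editorial note on the patch above: every accessor dispatches on PageGetPageLayoutVersion(), which reads the low byte of pd_pagesize_version, while PageGetPageSize() reads the high byte. The following is only a minimal stand-alone sketch of that packing, assuming plain C99 integer types. The helper names pack_size_version, unpack_size and unpack_version are hypothetical and do not appear in the patch.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical stand-alone helpers that mirror the bit masks used by
 * PageSetPageSizeAndVersion, PageGetPageSize and PageGetPageLayoutVersion
 * in the patch. They are illustrations, not PostgreSQL functions.
 */

/* Combine a page size (a multiple of 256) and a layout version into 16 bits. */
static uint16_t pack_size_version(uint16_t size, uint8_t version)
{
    /* Same precondition as the AssertMacro on size in the patch. */
    assert((size & 0xFF00) == size);
    return (uint16_t) (size | version);
}

/* High byte: page size, as in PageGetPageSize. */
static unsigned unpack_size(uint16_t packed)
{
    return packed & 0xFF00;
}

/* Low byte: layout version, as in PageGetPageLayoutVersion. */
static unsigned unpack_version(uint16_t packed)
{
    return packed & 0x00FF;
}
```

For example, packing a size of 8192 with version 4 and unpacking the result gives back 8192 and 4. Because the size occupies only the high byte, it must be a multiple of 256, which is what the assertion checks.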


Re: WIP: New Page API

From
Markus Wanner
Date:
Hi,

Zdenek Kotala wrote:
> I finished first prototype of new page API.

This might seem obvious to you, but could you please describe the 
problem you are trying to solve with this new page API? How does it 
relate to the relation forks, that have just been committed?

Regards

Markus



Re: WIP: New Page API

From
Zdenek Kotala
Date:
Markus Wanner wrote:
> Hi,
> 
> Zdenek Kotala wrote:
>> I finished first prototype of new page API.
> 
> This might seem obvious to you, but could you please describe the 
> problem you are trying to solve with this new page API? How does it 
> relate to the relation forks, that have just been committed?
> 

Hi Markus,

It is not related to relation forks. The idea is to teach PostgreSQL to read
old data page structures. It is part of the in-place upgrade project, which was
discussed at PGCon this year. You can find more information at:

http://wiki.postgresql.org/wiki/In-place_upgrade
http://www.pgcon.org/2008/schedule/events/87.en.html

This is a prototype of the first part. Multi-version tuple processing and tuple
size limits are the next part.

    Zdenek
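
Editorial illustration of the idea described in this message: a single accessor can serve pages written in different layout versions by switching on the version stored in the page. This is only a sketch under stated assumptions. DemoHeaderV3 and DemoHeaderV4 are made-up layouts, not the real PostgreSQL page headers, and all demo_* names are hypothetical. The only thing taken from the patch is the pattern of a switch on the layout version with an error path for unknown versions.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Made-up "old" layout: NOT the real PostgreSQL header. */
typedef struct
{
    uint16_t pagesize_version;  /* size in high byte, version in low byte */
    uint16_t lower;
} DemoHeaderV3;

/* Made-up "new" layout: an extra field shifts the position of lower. */
typedef struct
{
    uint16_t pagesize_version;  /* same position in both layouts */
    uint16_t flags;
    uint16_t lower;
} DemoHeaderV4;

/* Read the layout version; it sits at the same offset in every layout. */
static int demo_layout_version(const unsigned char *page)
{
    uint16_t v;

    memcpy(&v, page, sizeof v);
    return v & 0x00FF;
}

/*
 * Version-dispatching accessor. Returns -1 for an unknown layout,
 * where the patch would call elog(PANIC, ...).
 */
static int demo_get_lower(const unsigned char *page)
{
    switch (demo_layout_version(page))
    {
        case 3:
            {
                DemoHeaderV3 h;

                memcpy(&h, page, sizeof h);
                return h.lower;
            }
        case 4:
            {
                DemoHeaderV4 h;

                memcpy(&h, page, sizeof h);
                return h.lower;
            }
    }
    return -1;
}

/* Write a header of the requested demo layout into a page buffer. */
static void demo_init(unsigned char *page, int version, uint16_t lower)
{
    assert(version == 3 || version == 4);
    if (version == 3)
    {
        DemoHeaderV3 h = {(uint16_t) (8192 | 3), lower};

        memcpy(page, &h, sizeof h);
    }
    else
    {
        DemoHeaderV4 h = {(uint16_t) (8192 | 4), 0, lower};

        memcpy(page, &h, sizeof h);
    }
}
```

Callers only ever use demo_get_lower() and never cast the page to a concrete header type, so adding another layout means adding one case per accessor rather than touching every call site.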


Re: WIP: New Page API

From
Markus Wanner
Date:
Hi Zdenek,

Zdenek Kotala wrote:
> It is not related to relation forks. The idea is to teach PostgreSQL to
> read old data page structures. It is part of the in-place upgrade project,
> which was discussed at PGCon this year. You can find more information at:
> 
> http://wiki.postgresql.org/wiki/In-place_upgrade
> http://www.pgcon.org/2008/schedule/events/87.en.html

Ah, that's where the idea is coming from. Thanks for the pointers.

Regards

Markus Wanner