Re: Memory leak in GIN index build - Mailing list pgsql-hackers

From Tom Lane
Subject Re: Memory leak in GIN index build
Date
Msg-id 23849.1460989984@sss.pgh.pa.us
In response to Re: Memory leak in GIN index build  (Julien Rouhaud <julien.rouhaud@dalibo.com>)
Responses Re: Memory leak in GIN index build  (Julien Rouhaud <julien.rouhaud@dalibo.com>)
List pgsql-hackers
Julien Rouhaud <julien.rouhaud@dalibo.com> writes:
> On 16/04/2016 20:45, Tom Lane wrote:
>> I think this needs to be redesigned so that the critical section and WAL
>> insertion calls all happen within a single straight-line piece of code.
>>
>> We could try making that place be ginPlaceToPage().  So far as
>> registerLeafRecompressWALData is concerned, that does not look that hard;
>> it could palloc and fill the WAL-data buffer the same as it's doing now,
>> then pass it back up to be registered (and eventually pfreed) in
>> ginPlaceToPage.  However, that would also mean postponing the call of
>> dataPlaceToPageLeafRecompress(), which seems much messier.  I assume
>> the data it's working from is mostly in the tmpCtx that
>> dataPlaceToPageLeaf wants to free before returning.  Maybe we could
>> move creation/destruction of the tmpCtx out to ginPlaceToPage, as well?
>>
>> The other line of thought is to move the WAL work that ginPlaceToPage
>> does down into the subroutines.  That would probably result in some
>> duplication of code, but it might end up being a cleaner solution.

> I'm not really confident, but I'll give it a try.  Probably with your
> second solution, which looks easier.

I poked at this over the weekend, and got more unhappy the more I poked.
Aside from the memory leak, there are multiple coding-rule violations
beyond the one you noted about the scope of the critical sections.
One example is that in the page-split path for an inner page, we modify
the contents of childbuf long before getting into the critical section.
The number of out-of-date comments was depressingly large as well.

I ended up deciding that the most reasonable fix was along the lines of
my first alternative above.  In the attached draft patches, the
"placeToPage" method is split into two methods, "beginPlaceToPage" and
"execPlaceToPage", where the second method is what's supposed to happen
inside the critical section for a non-page-splitting update.  Nothing
that's done inside beginPlaceToPage is critical.
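
For readers who want the shape of that split without reading the whole
patch, here is a minimal, self-contained sketch.  It is not taken from the
patch: ToyPage, toy_begin_place, toy_exec_place, toy_place_to_page and the
crit_section_depth counter are all hypothetical stand-ins, and the
"critical section" is just a counter rather than PostgreSQL's
START_CRIT_SECTION/END_CRIT_SECTION.  The point is only that the first
phase may allocate and may fail but never touches the page, while the
second phase applies a precomputed change and has nothing left that can
fail.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

#define PAGE_CAPACITY 4

/* Hypothetical result codes, loosely modeled on GPTP_NO_WORK etc. */
typedef enum { PTP_NO_WORK, PTP_INSERT, PTP_SPLIT } PlaceRC;

/* Toy page: a fixed-size array of integers (assumption, not a GIN page). */
typedef struct
{
    int items[PAGE_CAPACITY];
    int nitems;
} ToyPage;

/* Stand-in for the critical-section nesting counter. */
static int crit_section_depth = 0;

/*
 * Phase 1: decide what to do.  May allocate and may fail; must not
 * modify the page.  On PTP_INSERT, *workspace carries data to phase 2.
 */
static PlaceRC
toy_begin_place(const ToyPage *page, int newitem, void **workspace)
{
    int *pending;

    for (int i = 0; i < page->nitems; i++)
        if (page->items[i] == newitem)
            return PTP_NO_WORK;     /* duplicate, nothing to do */

    if (page->nitems >= PAGE_CAPACITY)
        return PTP_SPLIT;           /* won't fit; split not modeled here */

    pending = malloc(sizeof(int));
    if (pending == NULL)
        abort();                    /* failing here is safe: page untouched */
    *pending = newitem;
    *workspace = pending;
    return PTP_INSERT;
}

/*
 * Phase 2: apply the precomputed change.  Runs inside the (toy) critical
 * section, so it does no allocation and has no failure paths.
 */
static void
toy_exec_place(ToyPage *page, const void *workspace)
{
    assert(crit_section_depth > 0);
    page->items[page->nitems++] = *(const int *) workspace;
}

/* Driver: returns true if the insertion is finished, false on "split". */
static bool
toy_place_to_page(ToyPage *page, int newitem)
{
    void   *workspace = NULL;
    bool    finished;
    PlaceRC rc = toy_begin_place(page, newitem, &workspace);

    if (rc == PTP_INSERT)
    {
        crit_section_depth++;       /* enter toy critical section */
        toy_exec_place(page, workspace);
        crit_section_depth--;       /* leave toy critical section */
        finished = true;
    }
    else
        finished = (rc == PTP_NO_WORK);

    free(workspace);                /* single cleanup point; free(NULL) is OK */
    return finished;
}
```

Calling toy_place_to_page() on a page that still has room appends the item
and returns true; a duplicate also returns true without changing the page;
a full page returns false, which in this sketch simply means "the caller
would have to handle a split".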

I've tested this to the extent of verifying that it passes make
check-world and stops the memory leak in your test case, but it could use
more eyeballs on it.

Attached are draft patches against HEAD and 9.5 (they're the same except
for BufferGetPage calls).  I think we'd also want to back-patch to 9.4,
but that will take considerable additional work because of the XLOG API
rewrite that happened in 9.5.  I also noted that some of the coding-rule
violations seem to be new in 9.5, so the problems may be less severe in
9.4 --- the memory leak definitely exists there, though.

            regards, tom lane

diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
index e593b2b..fd36ecf 100644
*** a/src/backend/access/gin/ginbtree.c
--- b/src/backend/access/gin/ginbtree.c
***************
*** 17,22 ****
--- 17,23 ----
  #include "access/gin_private.h"
  #include "access/xloginsert.h"
  #include "miscadmin.h"
+ #include "utils/memutils.h"
  #include "utils/rel.h"

  static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
*************** ginFindParents(GinBtree btree, GinBtreeS
*** 312,326 ****
   * Insert a new item to a page.
   *
   * Returns true if the insertion was finished. On false, the page was split and
!  * the parent needs to be updated. (a root split returns true as it doesn't
!  * need any further action by the caller to complete)
   *
   * When inserting a downlink to an internal page, 'childbuf' contains the
   * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
!  * atomically with the insert. Also, the existing item at the given location
!  * is updated to point to 'updateblkno'.
   *
   * stack->buffer is locked on entry, and is kept locked.
   */
  static bool
  ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
--- 313,328 ----
   * Insert a new item to a page.
   *
   * Returns true if the insertion was finished. On false, the page was split and
!  * the parent needs to be updated. (A root split returns true as it doesn't
!  * need any further action by the caller to complete.)
   *
   * When inserting a downlink to an internal page, 'childbuf' contains the
   * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
!  * atomically with the insert. Also, the existing item at offset stack->off
!  * in the target page is updated to point to updateblkno.
   *
   * stack->buffer is locked on entry, and is kept locked.
+  * Likewise for childbuf, if given.
   */
  static bool
  ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 329,339 ****
--- 331,358 ----
  {
      Page        page = BufferGetPage(stack->buffer, NULL, NULL,
                                       BGP_NO_SNAPSHOT_TEST);
+     bool        result;
      GinPlaceToPageRC rc;
      uint16        xlflags = 0;
      Page        childpage = NULL;
      Page        newlpage = NULL,
                  newrpage = NULL;
+     void       *ptp_workspace = NULL;
+     MemoryContext tmpCxt;
+     MemoryContext oldCxt;
+
+     /*
+      * We do all the work of this function and its subfunctions in a temporary
+      * memory context.  This avoids leakages and simplifies APIs, since some
+      * subfunctions allocate storage that has to survive until we've finished
+      * the WAL insertion.
+      */
+     tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
+                                    "ginPlaceToPage temporary context",
+                                    ALLOCSET_DEFAULT_MINSIZE,
+                                    ALLOCSET_DEFAULT_INITSIZE,
+                                    ALLOCSET_DEFAULT_MAXSIZE);
+     oldCxt = MemoryContextSwitchTo(tmpCxt);

      if (GinPageIsData(page))
          xlflags |= GIN_INSERT_ISDATA;
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 351,390 ****
      }

      /*
!      * Try to put the incoming tuple on the page. placeToPage will decide if
!      * the page needs to be split.
!      *
!      * WAL-logging this operation is a bit funny:
!      *
!      * We're responsible for calling XLogBeginInsert() and XLogInsert().
!      * XLogBeginInsert() must be called before placeToPage, because
!      * placeToPage can register some data to the WAL record.
!      *
!      * If placeToPage returns INSERTED, placeToPage has already called
!      * START_CRIT_SECTION() and XLogBeginInsert(), and registered any data
!      * required to replay the operation, in block index 0. We're responsible
!      * for filling in the main data portion of the WAL record, calling
!      * XLogInsert(), and END_CRIT_SECTION.
!      *
!      * If placeToPage returns SPLIT, we're wholly responsible for WAL logging.
!      * Splits happen infrequently, so we just make a full-page image of all
!      * the pages involved.
       */
!     rc = btree->placeToPage(btree, stack->buffer, stack,
!                             insertdata, updateblkno,
!                             &newlpage, &newrpage);
!     if (rc == UNMODIFIED)
      {
!         XLogResetInsertion();
!         return true;
      }
!     else if (rc == INSERTED)
      {
!         /* placeToPage did START_CRIT_SECTION() */
          MarkBufferDirty(stack->buffer);

          /* An insert to an internal page finishes the split of the child. */
!         if (childbuf != InvalidBuffer)
          {
              GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
              MarkBufferDirty(childbuf);
--- 370,411 ----
      }

      /*
!      * See if the incoming tuple will fit on the page.  beginPlaceToPage will
!      * decide if the page needs to be split, and will compute the split
!      * contents if so.  See comments for beginPlaceToPage and execPlaceToPage
!      * functions for more details of the API here.
       */
!     rc = btree->beginPlaceToPage(btree, stack->buffer, stack,
!                                  insertdata, updateblkno,
!                                  &ptp_workspace,
!                                  &newlpage, &newrpage);
!
!     if (rc == GPTP_NO_WORK)
      {
!         /* Nothing to do */
!         result = true;
      }
!     else if (rc == GPTP_INSERT)
      {
!         /* It will fit, perform the insertion */
!         START_CRIT_SECTION();
!
!         if (RelationNeedsWAL(btree->index))
!         {
!             XLogBeginInsert();
!             XLogRegisterBuffer(0, stack->buffer, REGBUF_STANDARD);
!             if (BufferIsValid(childbuf))
!                 XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
!         }
!
!         /* Perform the page update, and register any extra WAL data */
!         btree->execPlaceToPage(btree, stack->buffer, stack,
!                                insertdata, updateblkno, ptp_workspace);
!
          MarkBufferDirty(stack->buffer);

          /* An insert to an internal page finishes the split of the child. */
!         if (BufferIsValid(childbuf))
          {
              GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
              MarkBufferDirty(childbuf);
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 396,416 ****
              ginxlogInsert xlrec;
              BlockIdData childblknos[2];

-             /*
-              * placetopage already registered stack->buffer as block 0.
-              */
              xlrec.flags = xlflags;

-             if (childbuf != InvalidBuffer)
-                 XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
-
              XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));

              /*
               * Log information about child if this was an insertion of a
               * downlink.
               */
!             if (childbuf != InvalidBuffer)
              {
                  BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
                  BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
--- 417,431 ----
              ginxlogInsert xlrec;
              BlockIdData childblknos[2];

              xlrec.flags = xlflags;

              XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));

              /*
               * Log information about child if this was an insertion of a
               * downlink.
               */
!             if (BufferIsValid(childbuf))
              {
                  BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
                  BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 420,442 ****

              recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
              PageSetLSN(page, recptr);
!             if (childbuf != InvalidBuffer)
                  PageSetLSN(childpage, recptr);
          }

          END_CRIT_SECTION();

!         return true;
      }
!     else if (rc == SPLIT)
      {
!         /* Didn't fit, had to split */
          Buffer        rbuffer;
          BlockNumber savedRightLink;
          ginxlogSplit data;
          Buffer        lbuffer = InvalidBuffer;
          Page        newrootpg = NULL;

          rbuffer = GinNewBuffer(btree->index);

          /* During index build, count the new page */
--- 435,463 ----

              recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
              PageSetLSN(page, recptr);
!             if (BufferIsValid(childbuf))
                  PageSetLSN(childpage, recptr);
          }

          END_CRIT_SECTION();

!         /* Insertion is complete. */
!         result = true;
      }
!     else if (rc == GPTP_SPLIT)
      {
!         /*
!          * Didn't fit, need to split.  The split has been computed in newlpage
!          * and newrpage, which are pointers to palloc'd pages, not associated
!          * with buffers.  stack->buffer is not touched yet.
!          */
          Buffer        rbuffer;
          BlockNumber savedRightLink;
          ginxlogSplit data;
          Buffer        lbuffer = InvalidBuffer;
          Page        newrootpg = NULL;

+         /* Get a new index page to become the right page */
          rbuffer = GinNewBuffer(btree->index);

          /* During index build, count the new page */
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 450,469 ****

          savedRightLink = GinPageGetOpaque(page)->rightlink;

!         /*
!          * newlpage and newrpage are pointers to memory pages, not associated
!          * with buffers. stack->buffer is not touched yet.
!          */
!
          data.node = btree->index->rd_node;
          data.flags = xlflags;
!         if (childbuf != InvalidBuffer)
          {
-             Page        childpage = BufferGetPage(childbuf, NULL, NULL,
-                                                   BGP_NO_SNAPSHOT_TEST);
-
-             GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
-
              data.leftChildBlkno = BufferGetBlockNumber(childbuf);
              data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
          }
--- 471,481 ----

          savedRightLink = GinPageGetOpaque(page)->rightlink;

!         /* Begin setting up WAL record */
          data.node = btree->index->rd_node;
          data.flags = xlflags;
!         if (BufferIsValid(childbuf))
          {
              data.leftChildBlkno = BufferGetBlockNumber(childbuf);
              data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
          }
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 473,484 ****
          if (stack->parent == NULL)
          {
              /*
!              * split root, so we need to allocate new left page and place
!              * pointer on root to left and right page
               */
              lbuffer = GinNewBuffer(btree->index);

!             /* During index build, count the newly-added root page */
              if (buildStats)
              {
                  if (btree->isData)
--- 485,496 ----
          if (stack->parent == NULL)
          {
              /*
!              * splitting the root, so we need to allocate new left page and
!              * place pointers to left and right page on root page.
               */
              lbuffer = GinNewBuffer(btree->index);

!             /* During index build, count the new left page */
              if (buildStats)
              {
                  if (btree->isData)
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 495,503 ****

              /*
               * Construct a new root page containing downlinks to the new left
!              * and right pages. (do this in a temporary copy first rather than
!              * overwriting the original page directly, so that we can still
!              * abort gracefully if this fails.)
               */
              newrootpg = PageGetTempPage(newrpage);
              GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
--- 507,515 ----

              /*
               * Construct a new root page containing downlinks to the new left
!              * and right pages.  (Do this in a temporary copy rather than
!              * overwriting the original page directly, since we're not in the
!              * critical section yet.)
               */
              newrootpg = PageGetTempPage(newrpage);
              GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 508,514 ****
          }
          else
          {
!             /* split non-root page */
              data.rrlink = savedRightLink;

              GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
--- 520,526 ----
          }
          else
          {
!             /* splitting a non-root page */
              data.rrlink = savedRightLink;

              GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 517,550 ****
          }

          /*
!          * Ok, we have the new contents of the left page in a temporary copy
!          * now (newlpage), and the newly-allocated right block has been filled
!          * in. The original page is still unchanged.
           *
           * If this is a root split, we also have a temporary page containing
!          * the new contents of the root. Copy the new left page to a
!          * newly-allocated block, and initialize the (original) root page the
!          * new copy. Otherwise, copy over the temporary copy of the new left
!          * page over the old left page.
           */

          START_CRIT_SECTION();

          MarkBufferDirty(rbuffer);
          MarkBufferDirty(stack->buffer);
-         if (BufferIsValid(childbuf))
-             MarkBufferDirty(childbuf);

          /*
!          * Restore the temporary copies over the real buffers. But don't free
!          * the temporary copies yet, WAL record data points to them.
           */
          if (stack->parent == NULL)
          {
              MarkBufferDirty(lbuffer);
!             memcpy(BufferGetPage(stack->buffer, NULL, NULL,
!                                  BGP_NO_SNAPSHOT_TEST),
!                    newrootpg, BLCKSZ);
              memcpy(BufferGetPage(lbuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST),
                     newlpage, BLCKSZ);
              memcpy(BufferGetPage(rbuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST),
--- 529,555 ----
          }

          /*
!          * OK, we have the new contents of the left page in a temporary copy
!          * now (newlpage), and likewise for the new contents of the
!          * newly-allocated right block. The original page is still unchanged.
           *
           * If this is a root split, we also have a temporary page containing
!          * the new contents of the root.
           */

          START_CRIT_SECTION();

          MarkBufferDirty(rbuffer);
          MarkBufferDirty(stack->buffer);

          /*
!          * Restore the temporary copies over the real buffers.
           */
          if (stack->parent == NULL)
          {
+             /* Splitting the root, three pages to update */
              MarkBufferDirty(lbuffer);
!             memcpy(page, newrootpg, BLCKSZ);
              memcpy(BufferGetPage(lbuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST),
                     newlpage, BLCKSZ);
              memcpy(BufferGetPage(rbuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST),
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 552,564 ****
          }
          else
          {
!             memcpy(BufferGetPage(stack->buffer, NULL, NULL,
!                                  BGP_NO_SNAPSHOT_TEST),
!                    newlpage, BLCKSZ);
              memcpy(BufferGetPage(rbuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST),
                     newrpage, BLCKSZ);
          }

          /* write WAL record */
          if (RelationNeedsWAL(btree->index))
          {
--- 557,575 ----
          }
          else
          {
!             /* Normal split, only two pages to update */
!             memcpy(page, newlpage, BLCKSZ);
              memcpy(BufferGetPage(rbuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST),
                     newrpage, BLCKSZ);
          }

+         /* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
+         if (BufferIsValid(childbuf))
+         {
+             GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
+             MarkBufferDirty(childbuf);
+         }
+
          /* write WAL record */
          if (RelationNeedsWAL(btree->index))
          {
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 583,596 ****
                  XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
              }
              if (BufferIsValid(childbuf))
!                 XLogRegisterBuffer(3, childbuf, 0);

              XLogRegisterData((char *) &data, sizeof(ginxlogSplit));

              recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
!             PageSetLSN(BufferGetPage(stack->buffer, NULL, NULL,
!                                      BGP_NO_SNAPSHOT_TEST),
!                        recptr);
              PageSetLSN(BufferGetPage(rbuffer, NULL, NULL,
                                       BGP_NO_SNAPSHOT_TEST),
                         recptr);
--- 594,606 ----
                  XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
              }
              if (BufferIsValid(childbuf))
!                 XLogRegisterBuffer(3, childbuf, REGBUF_STANDARD);

              XLogRegisterData((char *) &data, sizeof(ginxlogSplit));

              recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
!
!             PageSetLSN(page, recptr);
              PageSetLSN(BufferGetPage(rbuffer, NULL, NULL,
                                       BGP_NO_SNAPSHOT_TEST),
                         recptr);
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 604,636 ****
          END_CRIT_SECTION();

          /*
!          * We can release the lock on the right page now, but keep the
!          * original buffer locked.
           */
          UnlockReleaseBuffer(rbuffer);
          if (stack->parent == NULL)
              UnlockReleaseBuffer(lbuffer);

-         pfree(newlpage);
-         pfree(newrpage);
-         if (newrootpg)
-             pfree(newrootpg);
-
          /*
           * If we split the root, we're done. Otherwise the split is not
           * complete until the downlink for the new page has been inserted to
           * the parent.
           */
!         if (stack->parent == NULL)
!             return true;
!         else
!             return false;
      }
      else
      {
!         elog(ERROR, "unknown return code from GIN placeToPage method: %d", rc);
!         return false;            /* keep compiler quiet */
      }
  }

  /*
--- 614,644 ----
          END_CRIT_SECTION();

          /*
!          * We can release the locks/pins on the new pages now, but keep
!          * stack->buffer locked.  childbuf doesn't get unlocked either.
           */
          UnlockReleaseBuffer(rbuffer);
          if (stack->parent == NULL)
              UnlockReleaseBuffer(lbuffer);

          /*
           * If we split the root, we're done. Otherwise the split is not
           * complete until the downlink for the new page has been inserted to
           * the parent.
           */
!         result = (stack->parent == NULL);
      }
      else
      {
!         elog(ERROR, "invalid return code from GIN beginPlaceToPage method: %d", rc);
!         result = false;            /* keep compiler quiet */
      }
+
+     /* Clean up temp context */
+     MemoryContextSwitchTo(oldCxt);
+     MemoryContextDelete(tmpCxt);
+
+     return result;
  }

  /*
diff --git a/src/backend/access/gin/gindatapage.c b/src/backend/access/gin/gindatapage.c
index ed3d917..499c4fb 100644
*** a/src/backend/access/gin/gindatapage.c
--- b/src/backend/access/gin/gindatapage.c
***************
*** 18,24 ****
  #include "access/xloginsert.h"
  #include "lib/ilist.h"
  #include "miscadmin.h"
- #include "utils/memutils.h"
  #include "utils/rel.h"

  /*
--- 18,23 ----
*************** typedef struct
*** 57,62 ****
--- 56,68 ----
      int            rsize;            /* total size on right page */

      bool        oldformat;        /* page is in pre-9.4 format on disk */
+
+     /*
+      * If we need WAL data representing the reconstructed leaf page, it's
+      * stored here by computeLeafRecompressWALData.
+      */
+     char       *walinfo;        /* buffer start */
+     int            walinfolen;        /* and length */
  } disassembledLeaf;

  typedef struct
*************** static bool leafRepackItems(disassembled
*** 105,114 ****
  static bool addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems,
                 int nNewItems);

! static void registerLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf);
  static void dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf);
! static void dataPlaceToPageLeafSplit(Buffer buf,
!                          disassembledLeaf *leaf,
                           ItemPointerData lbound, ItemPointerData rbound,
                           Page lpage, Page rpage);

--- 111,119 ----
  static bool addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems,
                 int nNewItems);

! static void computeLeafRecompressWALData(disassembledLeaf *leaf);
  static void dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf);
! static void dataPlaceToPageLeafSplit(disassembledLeaf *leaf,
                           ItemPointerData lbound, ItemPointerData rbound,
                           Page lpage, Page rpage);

*************** GinPageDeletePostingItem(Page page, Offs
*** 423,433 ****
  }

  /*
!  * Places keys to leaf data page and fills WAL record.
   */
  static GinPlaceToPageRC
! dataPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                     void *insertdata, Page *newlpage, Page *newrpage)
  {
      GinBtreeDataLeafInsertData *items = insertdata;
      ItemPointer newItems = &items->items[items->curitem];
--- 428,449 ----
  }

  /*
!  * Prepare to insert data on a leaf data page.
!  *
!  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
!  * before we enter the insertion critical section.  *ptp_workspace can be
!  * set to pass information along to the execPlaceToPage function.
!  *
!  * If it won't fit, perform a page split and return two temporary page
!  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
!  *
!  * In neither case should the given page buffer be modified here.
   */
  static GinPlaceToPageRC
! dataBeginPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                          void *insertdata,
!                          void **ptp_workspace,
!                          Page *newlpage, Page *newrpage)
  {
      GinBtreeDataLeafInsertData *items = insertdata;
      ItemPointer newItems = &items->items[items->curitem];
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 440,454 ****
      bool        append;
      int            segsize;
      Size        freespace;
-     MemoryContext tmpCxt;
-     MemoryContext oldCxt;
      disassembledLeaf *leaf;
      leafSegmentInfo *lastleftinfo;
      ItemPointerData maxOldItem;
      ItemPointerData remaining;

-     Assert(GinPageIsData(page));
-
      rbound = *GinDataPageGetRightBound(page);

      /*
--- 456,466 ----
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 472,489 ****
          maxitems = i;
      }

!     /*
!      * The following operations do quite a lot of small memory allocations,
!      * create a temporary memory context so that we don't need to keep track
!      * of them individually.
!      */
!     tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
!                                    "Gin split temporary context",
!                                    ALLOCSET_DEFAULT_MINSIZE,
!                                    ALLOCSET_DEFAULT_INITSIZE,
!                                    ALLOCSET_DEFAULT_MAXSIZE);
!     oldCxt = MemoryContextSwitchTo(tmpCxt);
!
      leaf = disassembleLeaf(page);

      /*
--- 484,490 ----
          maxitems = i;
      }

!     /* Disassemble the data on the page */
      leaf = disassembleLeaf(page);

      /*
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 548,563 ****
          maxitems = Min(maxitems, nnewsegments * MinTuplesPerSegment);
      }

!     /* Add the new items to the segments */
      if (!addItemsToLeaf(leaf, newItems, maxitems))
      {
          /* all items were duplicates, we have nothing to do */
          items->curitem += maxitems;

!         MemoryContextSwitchTo(oldCxt);
!         MemoryContextDelete(tmpCxt);
!
!         return UNMODIFIED;
      }

      /*
--- 549,561 ----
          maxitems = Min(maxitems, nnewsegments * MinTuplesPerSegment);
      }

!     /* Add the new items to the segment list */
      if (!addItemsToLeaf(leaf, newItems, maxitems))
      {
          /* all items were duplicates, we have nothing to do */
          items->curitem += maxitems;

!         return GPTP_NO_WORK;
      }

      /*
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 590,611 ****
      if (!needsplit)
      {
          /*
!          * Great, all the items fit on a single page. Construct a WAL record
!          * describing the changes we made, and write the segments back to the
!          * page.
!          *
!          * Once we start modifying the page, there's no turning back. The
!          * caller is responsible for calling END_CRIT_SECTION() after writing
!          * the WAL record.
           */
-         MemoryContextSwitchTo(oldCxt);
          if (RelationNeedsWAL(btree->index))
!         {
!             XLogBeginInsert();
!             registerLeafRecompressWALData(buf, leaf);
!         }
!         START_CRIT_SECTION();
!         dataPlaceToPageLeafRecompress(buf, leaf);

          if (append)
              elog(DEBUG2, "appended %d new items to block %u; %d bytes (%d to go)",
--- 588,604 ----
      if (!needsplit)
      {
          /*
!          * Great, all the items fit on a single page.  If needed, prepare data
!          * for a WAL record describing the changes we'll make.
           */
          if (RelationNeedsWAL(btree->index))
!             computeLeafRecompressWALData(leaf);
!
!         /*
!          * We're ready to enter the critical section, but
!          * dataExecPlaceToPageLeaf will need access to the "leaf" data.
!          */
!         *ptp_workspace = leaf;

          if (append)
              elog(DEBUG2, "appended %d new items to block %u; %d bytes (%d to go)",
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 619,625 ****
      else
      {
          /*
!          * Had to split.
           *
           * leafRepackItems already divided the segments between the left and
           * the right page. It filled the left page as full as possible, and
--- 612,618 ----
      else
      {
          /*
!          * Have to split.
           *
           * leafRepackItems already divided the segments between the left and
           * the right page. It filled the left page as full as possible, and
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 631,637 ****
           * until they're balanced.
           *
           * As a further heuristic, when appending items to the end of the
!          * page, try make the left page 75% full, one the assumption that
           * subsequent insertions will probably also go to the end. This packs
           * the index somewhat tighter when appending to a table, which is very
           * common.
--- 624,630 ----
           * until they're balanced.
           *
           * As a further heuristic, when appending items to the end of the
!          * page, try to make the left page 75% full, on the assumption that
           * subsequent insertions will probably also go to the end. This packs
           * the index somewhat tighter when appending to a table, which is very
           * common.
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 680,689 ****
                                                         &lastleftinfo->nitems);
          lbound = lastleftinfo->items[lastleftinfo->nitems - 1];

!         *newlpage = MemoryContextAlloc(oldCxt, BLCKSZ);
!         *newrpage = MemoryContextAlloc(oldCxt, BLCKSZ);

!         dataPlaceToPageLeafSplit(buf, leaf, lbound, rbound,
                                   *newlpage, *newrpage);

          Assert(GinPageRightMost(page) ||
--- 673,685 ----
                                                         &lastleftinfo->nitems);
          lbound = lastleftinfo->items[lastleftinfo->nitems - 1];

!         /*
!          * Now allocate a couple of temporary page images, and fill them.
!          */
!         *newlpage = palloc(BLCKSZ);
!         *newrpage = palloc(BLCKSZ);

!         dataPlaceToPageLeafSplit(leaf, lbound, rbound,
                                   *newlpage, *newrpage);

          Assert(GinPageRightMost(page) ||
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 700,711 ****
                   items->nitem - items->curitem - maxitems);
      }

-     MemoryContextSwitchTo(oldCxt);
-     MemoryContextDelete(tmpCxt);
-
      items->curitem += maxitems;

!     return needsplit ? SPLIT : INSERTED;
  }

  /*
--- 696,726 ----
                   items->nitem - items->curitem - maxitems);
      }

      items->curitem += maxitems;

!     return needsplit ? GPTP_SPLIT : GPTP_INSERT;
! }
!
! /*
!  * Perform data insertion after beginPlaceToPage has decided it will fit.
!  *
!  * This is invoked within a critical section, and XLOG record creation (if
!  * needed) is already started.  The target buffer is registered in slot 0.
!  */
! static void
! dataExecPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                         void *insertdata, void *ptp_workspace)
! {
!     disassembledLeaf *leaf = (disassembledLeaf *) ptp_workspace;
!
!     /* Apply changes to page */
!     dataPlaceToPageLeafRecompress(buf, leaf);
!
!     /* If needed, register WAL data built by computeLeafRecompressWALData */
!     if (RelationNeedsWAL(btree->index))
!     {
!         XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
!     }
  }

  /*
*************** ginVacuumPostingTreeLeaf(Relation indexr
*** 816,826 ****
          }

          if (RelationNeedsWAL(indexrel))
!         {
!             XLogBeginInsert();
!             registerLeafRecompressWALData(buffer, leaf);
!         }
          START_CRIT_SECTION();
          dataPlaceToPageLeafRecompress(buffer, leaf);

          MarkBufferDirty(buffer);
--- 831,841 ----
          }

          if (RelationNeedsWAL(indexrel))
!             computeLeafRecompressWALData(leaf);
!
!         /* Apply changes to page */
          START_CRIT_SECTION();
+
          dataPlaceToPageLeafRecompress(buffer, leaf);

          MarkBufferDirty(buffer);
*************** ginVacuumPostingTreeLeaf(Relation indexr
*** 829,834 ****
--- 844,852 ----
          {
              XLogRecPtr    recptr;

+             XLogBeginInsert();
+             XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+             XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
              recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE);
              PageSetLSN(page, recptr);
          }
*************** ginVacuumPostingTreeLeaf(Relation indexr
*** 839,848 ****

  /*
   * Construct a ginxlogRecompressDataLeaf record representing the changes
!  * in *leaf.
   */
  static void
! registerLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf)
  {
      int            nmodified = 0;
      char       *walbufbegin;
--- 857,867 ----

  /*
   * Construct a ginxlogRecompressDataLeaf record representing the changes
!  * in *leaf.  (Because this requires a palloc, we have to do it before
!  * we enter the critical section that actually updates the page.)
   */
  static void
! computeLeafRecompressWALData(disassembledLeaf *leaf)
  {
      int            nmodified = 0;
      char       *walbufbegin;
*************** registerLeafRecompressWALData(Buffer buf
*** 933,950 ****
              segno++;
      }

!
!     XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
!     XLogRegisterBufData(0, walbufbegin, walbufend - walbufbegin);
!
  }

  /*
   * Assemble a disassembled posting tree leaf page back to a buffer.
   *
!  * *prdata is filled with WAL information about this operation. The caller
!  * is responsible for inserting to the WAL, along with any other information
!  * about the operation that triggered this recompression.
   *
   * NOTE: The segment pointers must not point directly to the same buffer,
   * except for segments that have not been modified and whose preceding
--- 952,966 ----
              segno++;
      }

!     /* Pass back the constructed info via *leaf */
!     leaf->walinfo = walbufbegin;
!     leaf->walinfolen = walbufend - walbufbegin;
  }

  /*
   * Assemble a disassembled posting tree leaf page back to a buffer.
   *
!  * This just updates the target buffer; WAL stuff is caller's responsibility.
   *
   * NOTE: The segment pointers must not point directly to the same buffer,
   * except for segments that have not been modified and whose preceding
*************** dataPlaceToPageLeafRecompress(Buffer buf
*** 1003,1013 ****
   * segments to two pages instead of one.
   *
   * This is different from the non-split cases in that this does not modify
!  * the original page directly, but to temporary in-memory copies of the new
!  * left and right pages.
   */
  static void
! dataPlaceToPageLeafSplit(Buffer buf, disassembledLeaf *leaf,
                           ItemPointerData lbound, ItemPointerData rbound,
                           Page lpage, Page rpage)
  {
--- 1019,1029 ----
   * segments to two pages instead of one.
   *
   * This is different from the non-split cases in that this does not modify
!  * the original page directly, but writes to temporary in-memory copies of
!  * the new left and right pages.
   */
  static void
! dataPlaceToPageLeafSplit(disassembledLeaf *leaf,
                           ItemPointerData lbound, ItemPointerData rbound,
                           Page lpage, Page rpage)
  {
*************** dataPlaceToPageLeafSplit(Buffer buf, dis
*** 1076,1114 ****
  }

  /*
!  * Place a PostingItem to page, and fill a WAL record.
   *
!  * If the item doesn't fit, returns false without modifying the page.
   *
!  * In addition to inserting the given item, the downlink of the existing item
!  * at 'off' is updated to point to 'updateblkno'.
   *
!  * On INSERTED, registers the buffer as buffer ID 0, with data.
!  * On SPLIT, returns rdata that represents the split pages in *prdata.
   */
  static GinPlaceToPageRC
! dataPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                         void *insertdata, BlockNumber updateblkno,
!                         Page *newlpage, Page *newrpage)
  {
      Page        page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
-     OffsetNumber off = stack->off;
-     PostingItem *pitem;
-
-     /* this must be static so it can be returned to caller */
-     static ginxlogInsertDataInternal data;

!     /* split if we have to */
      if (GinNonLeafDataPageGetFreeSpace(page) < sizeof(PostingItem))
      {
          dataSplitPageInternal(btree, buf, stack, insertdata, updateblkno,
                                newlpage, newrpage);
!         return SPLIT;
      }

!     Assert(GinPageIsData(page));

!     START_CRIT_SECTION();

      /* Update existing downlink to point to next page (on internal page) */
      pitem = GinDataPageGetPostingItem(page, off);
--- 1092,1146 ----
  }

  /*
!  * Prepare to insert data on an internal data page.
   *
!  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
!  * before we enter the insertion critical section.  *ptp_workspace can be
!  * set to pass information along to the execPlaceToPage function.
   *
!  * If it won't fit, perform a page split and return two temporary page
!  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
   *
!  * In neither case should the given page buffer be modified here.
!  *
!  * Note: on insertion to an internal node, in addition to inserting the given
!  * item, the downlink of the existing item at stack->off will be updated to
!  * point to updateblkno.
   */
  static GinPlaceToPageRC
! dataBeginPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                              void *insertdata, BlockNumber updateblkno,
!                              void **ptp_workspace,
!                              Page *newlpage, Page *newrpage)
  {
      Page        page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);

!     /* If it doesn't fit, deal with split case */
      if (GinNonLeafDataPageGetFreeSpace(page) < sizeof(PostingItem))
      {
          dataSplitPageInternal(btree, buf, stack, insertdata, updateblkno,
                                newlpage, newrpage);
!         return GPTP_SPLIT;
      }

!     /* Else, we're ready to proceed with insertion */
!     return GPTP_INSERT;
! }

! /*
!  * Perform data insertion after beginPlaceToPage has decided it will fit.
!  *
!  * This is invoked within a critical section, and XLOG record creation (if
!  * needed) is already started.  The target buffer is registered in slot 0.
!  */
! static void
! dataExecPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                             void *insertdata, BlockNumber updateblkno,
!                             void *ptp_workspace)
! {
!     Page        page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
!     OffsetNumber off = stack->off;
!     PostingItem *pitem;

      /* Update existing downlink to point to next page (on internal page) */
      pitem = GinDataPageGetPostingItem(page, off);
*************** dataPlaceToPageInternal(GinBtree btree,
*** 1120,1162 ****

      if (RelationNeedsWAL(btree->index))
      {
          data.offset = off;
          data.newitem = *pitem;

-         XLogBeginInsert();
-         XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
          XLogRegisterBufData(0, (char *) &data,
                              sizeof(ginxlogInsertDataInternal));
      }
-
-     return INSERTED;
  }

  /*
!  * Places an item (or items) to a posting tree. Calls relevant function of
!  * internal of leaf page because they are handled very differently.
   */
  static GinPlaceToPageRC
! dataPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                 void *insertdata, BlockNumber updateblkno,
!                 Page *newlpage, Page *newrpage)
  {
      Page        page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);

      Assert(GinPageIsData(page));

      if (GinPageIsLeaf(page))
!         return dataPlaceToPageLeaf(btree, buf, stack, insertdata,
!                                    newlpage, newrpage);
      else
!         return dataPlaceToPageInternal(btree, buf, stack,
!                                        insertdata, updateblkno,
!                                        newlpage, newrpage);
  }

  /*
!  * Split page and fill WAL record. Returns a new temp buffer filled with data
!  * that should go to the left page. The original buffer is left untouched.
   */
  static void
  dataSplitPageInternal(GinBtree btree, Buffer origbuf,
--- 1152,1241 ----

      if (RelationNeedsWAL(btree->index))
      {
+         /*
+          * This must be static, because it has to survive until XLogInsert,
+          * and we can't palloc here.  Ugly, but the XLogInsert infrastructure
+          * isn't reentrant anyway.
+          */
+         static ginxlogInsertDataInternal data;
+
          data.offset = off;
          data.newitem = *pitem;

          XLogRegisterBufData(0, (char *) &data,
                              sizeof(ginxlogInsertDataInternal));
      }
  }

  /*
!  * Prepare to insert data on a posting-tree data page.
!  *
!  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
!  * before we enter the insertion critical section.  *ptp_workspace can be
!  * set to pass information along to the execPlaceToPage function.
!  *
!  * If it won't fit, perform a page split and return two temporary page
!  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
!  *
!  * In neither case should the given page buffer be modified here.
!  *
!  * Note: on insertion to an internal node, in addition to inserting the given
!  * item, the downlink of the existing item at stack->off will be updated to
!  * point to updateblkno.
!  *
!  * Calls relevant function for internal or leaf page because they are handled
!  * very differently.
   */
  static GinPlaceToPageRC
! dataBeginPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                      void *insertdata, BlockNumber updateblkno,
!                      void **ptp_workspace,
!                      Page *newlpage, Page *newrpage)
  {
      Page        page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);

      Assert(GinPageIsData(page));

      if (GinPageIsLeaf(page))
!         return dataBeginPlaceToPageLeaf(btree, buf, stack, insertdata,
!                                         ptp_workspace,
!                                         newlpage, newrpage);
      else
!         return dataBeginPlaceToPageInternal(btree, buf, stack,
!                                             insertdata, updateblkno,
!                                             ptp_workspace,
!                                             newlpage, newrpage);
  }

  /*
!  * Perform data insertion after beginPlaceToPage has decided it will fit.
!  *
!  * This is invoked within a critical section, and XLOG record creation (if
!  * needed) is already started.  The target buffer is registered in slot 0.
!  *
!  * Calls relevant function for internal or leaf page because they are handled
!  * very differently.
!  */
! static void
! dataExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                     void *insertdata, BlockNumber updateblkno,
!                     void *ptp_workspace)
! {
!     Page        page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
!
!     if (GinPageIsLeaf(page))
!         dataExecPlaceToPageLeaf(btree, buf, stack, insertdata,
!                                 ptp_workspace);
!     else
!         dataExecPlaceToPageInternal(btree, buf, stack, insertdata,
!                                     updateblkno, ptp_workspace);
! }
!
! /*
!  * Split internal page and insert new data.
!  *
!  * Returns new temp pages to *newlpage and *newrpage.
!  * The original buffer is left untouched.
   */
  static void
  dataSplitPageInternal(GinBtree btree, Buffer origbuf,
*************** dataSplitPageInternal(GinBtree btree, Bu
*** 1231,1236 ****
--- 1310,1316 ----
      /* set up right bound for right page */
      *GinDataPageGetRightBound(rpage) = oldbound;

+     /* return temp pages to caller */
      *newlpage = lpage;
      *newrpage = rpage;
  }
*************** ginPrepareDataScan(GinBtree btree, Relat
*** 1789,1795 ****
      btree->isMoveRight = dataIsMoveRight;
      btree->findItem = NULL;
      btree->findChildPtr = dataFindChildPtr;
!     btree->placeToPage = dataPlaceToPage;
      btree->fillRoot = ginDataFillRoot;
      btree->prepareDownlink = dataPrepareDownlink;

--- 1869,1876 ----
      btree->isMoveRight = dataIsMoveRight;
      btree->findItem = NULL;
      btree->findChildPtr = dataFindChildPtr;
!     btree->beginPlaceToPage = dataBeginPlaceToPage;
!     btree->execPlaceToPage = dataExecPlaceToPage;
      btree->fillRoot = ginDataFillRoot;
      btree->prepareDownlink = dataPrepareDownlink;

diff --git a/src/backend/access/gin/ginentrypage.c b/src/backend/access/gin/ginentrypage.c
index 8a5d9e1..53a394e 100644
*** a/src/backend/access/gin/ginentrypage.c
--- b/src/backend/access/gin/ginentrypage.c
***************
*** 21,27 ****

  static void entrySplitPage(GinBtree btree, Buffer origbuf,
                 GinBtreeStack *stack,
!                void *insertPayload,
                 BlockNumber updateblkno,
                 Page *newlpage, Page *newrpage);

--- 21,27 ----

  static void entrySplitPage(GinBtree btree, Buffer origbuf,
                 GinBtreeStack *stack,
!                GinBtreeEntryInsertData *insertData,
                 BlockNumber updateblkno,
                 Page *newlpage, Page *newrpage);

*************** entryPreparePage(GinBtree btree, Page pa
*** 510,548 ****
  }

  /*
!  * Place tuple on page and fills WAL record
   *
!  * If the tuple doesn't fit, returns false without modifying the page.
   *
!  * On insertion to an internal node, in addition to inserting the given item,
!  * the downlink of the existing item at 'off' is updated to point to
!  * 'updateblkno'.
   *
!  * On INSERTED, registers the buffer as buffer ID 0, with data.
!  * On SPLIT, returns rdata that represents the split pages in *prdata.
   */
  static GinPlaceToPageRC
! entryPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                  void *insertPayload, BlockNumber updateblkno,
!                  Page *newlpage, Page *newrpage)
  {
      GinBtreeEntryInsertData *insertData = insertPayload;
-     Page        page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
      OffsetNumber off = stack->off;
-     OffsetNumber placed;

!     /* this must be static so it can be returned to caller. */
!     static ginxlogInsertEntry data;
!
!     /* quick exit if it doesn't fit */
      if (!entryIsEnoughSpace(btree, buf, off, insertData))
      {
!         entrySplitPage(btree, buf, stack, insertPayload, updateblkno,
                         newlpage, newrpage);
!         return SPLIT;
      }

!     START_CRIT_SECTION();

      entryPreparePage(btree, page, off, insertData, updateblkno);

--- 510,566 ----
  }

  /*
!  * Prepare to insert data on an entry page.
   *
!  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
!  * before we enter the insertion critical section.  *ptp_workspace can be
!  * set to pass information along to the execPlaceToPage function.
   *
!  * If it won't fit, perform a page split and return two temporary page
!  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
   *
!  * In neither case should the given page buffer be modified here.
!  *
!  * Note: on insertion to an internal node, in addition to inserting the given
!  * item, the downlink of the existing item at stack->off will be updated to
!  * point to updateblkno.
   */
  static GinPlaceToPageRC
! entryBeginPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                       void *insertPayload, BlockNumber updateblkno,
!                       void **ptp_workspace,
!                       Page *newlpage, Page *newrpage)
  {
      GinBtreeEntryInsertData *insertData = insertPayload;
      OffsetNumber off = stack->off;

!     /* If it doesn't fit, deal with split case */
      if (!entryIsEnoughSpace(btree, buf, off, insertData))
      {
!         entrySplitPage(btree, buf, stack, insertData, updateblkno,
                         newlpage, newrpage);
!         return GPTP_SPLIT;
      }

!     /* Else, we're ready to proceed with insertion */
!     return GPTP_INSERT;
! }
!
! /*
!  * Perform data insertion after beginPlaceToPage has decided it will fit.
!  *
!  * This is invoked within a critical section, and XLOG record creation (if
!  * needed) is already started.  The target buffer is registered in slot 0.
!  */
! static void
! entryExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                      void *insertPayload, BlockNumber updateblkno,
!                      void *ptp_workspace)
! {
!     GinBtreeEntryInsertData *insertData = insertPayload;
!     Page        page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
!     OffsetNumber off = stack->off;
!     OffsetNumber placed;

      entryPreparePage(btree, page, off, insertData, updateblkno);

*************** entryPlaceToPage(GinBtree btree, Buffer
*** 556,589 ****

      if (RelationNeedsWAL(btree->index))
      {
          data.isDelete = insertData->isDelete;
          data.offset = off;

-         XLogBeginInsert();
-         XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
          XLogRegisterBufData(0, (char *) &data,
                              offsetof(ginxlogInsertEntry, tuple));
          XLogRegisterBufData(0, (char *) insertData->entry,
                              IndexTupleSize(insertData->entry));
      }
-
-     return INSERTED;
  }

  /*
!  * Place tuple and split page, original buffer(lbuf) leaves untouched,
!  * returns shadow pages filled with new data.
!  * Tuples are distributed between pages by equal size on its, not
!  * an equal number!
   */
  static void
  entrySplitPage(GinBtree btree, Buffer origbuf,
                 GinBtreeStack *stack,
!                void *insertPayload,
                 BlockNumber updateblkno,
                 Page *newlpage, Page *newrpage)
  {
-     GinBtreeEntryInsertData *insertData = insertPayload;
      OffsetNumber off = stack->off;
      OffsetNumber i,
                  maxoff,
--- 574,609 ----

      if (RelationNeedsWAL(btree->index))
      {
+         /*
+          * This must be static, because it has to survive until XLogInsert,
+          * and we can't palloc here.  Ugly, but the XLogInsert infrastructure
+          * isn't reentrant anyway.
+          */
+         static ginxlogInsertEntry data;
+
          data.isDelete = insertData->isDelete;
          data.offset = off;

          XLogRegisterBufData(0, (char *) &data,
                              offsetof(ginxlogInsertEntry, tuple));
          XLogRegisterBufData(0, (char *) insertData->entry,
                              IndexTupleSize(insertData->entry));
      }
  }

  /*
!  * Split entry page and insert new data.
!  *
!  * Returns new temp pages to *newlpage and *newrpage.
!  * The original buffer is left untouched.
   */
  static void
  entrySplitPage(GinBtree btree, Buffer origbuf,
                 GinBtreeStack *stack,
!                GinBtreeEntryInsertData *insertData,
                 BlockNumber updateblkno,
                 Page *newlpage, Page *newrpage)
  {
      OffsetNumber off = stack->off;
      OffsetNumber i,
                  maxoff,
*************** entrySplitPage(GinBtree btree, Buffer or
*** 650,655 ****
--- 670,679 ----
      {
          itup = (IndexTuple) ptr;

+         /*
+          * Decide where to split.  We try to equalize the pages' total data
+          * size, not number of tuples.
+          */
          if (lsize > totalsize / 2)
          {
              if (separator == InvalidOffsetNumber)
*************** entrySplitPage(GinBtree btree, Buffer or
*** 667,672 ****
--- 691,697 ----
          ptr += MAXALIGN(IndexTupleSize(itup));
      }

+     /* return temp pages to caller */
      *newlpage = lpage;
      *newrpage = rpage;
  }
*************** ginPrepareEntryScan(GinBtree btree, Offs
*** 735,741 ****
      btree->isMoveRight = entryIsMoveRight;
      btree->findItem = entryLocateLeafEntry;
      btree->findChildPtr = entryFindChildPtr;
!     btree->placeToPage = entryPlaceToPage;
      btree->fillRoot = ginEntryFillRoot;
      btree->prepareDownlink = entryPrepareDownlink;

--- 760,767 ----
      btree->isMoveRight = entryIsMoveRight;
      btree->findItem = entryLocateLeafEntry;
      btree->findChildPtr = entryFindChildPtr;
!     btree->beginPlaceToPage = entryBeginPlaceToPage;
!     btree->execPlaceToPage = entryExecPlaceToPage;
      btree->fillRoot = ginEntryFillRoot;
      btree->prepareDownlink = entryPrepareDownlink;

diff --git a/src/include/access/gin_private.h b/src/include/access/gin_private.h
index a7d4a90..afb558b 100644
*** a/src/include/access/gin_private.h
--- b/src/include/access/gin_private.h
*************** typedef struct GinBtreeStack
*** 641,652 ****

  typedef struct GinBtreeData *GinBtree;

! /* Return codes for GinBtreeData.placeToPage method */
  typedef enum
  {
!     UNMODIFIED,
!     INSERTED,
!     SPLIT
  } GinPlaceToPageRC;

  typedef struct GinBtreeData
--- 641,652 ----

  typedef struct GinBtreeData *GinBtree;

! /* Return codes for GinBtreeData.beginPlaceToPage method */
  typedef enum
  {
!     GPTP_NO_WORK,
!     GPTP_INSERT,
!     GPTP_SPLIT
  } GinPlaceToPageRC;

  typedef struct GinBtreeData
*************** typedef struct GinBtreeData
*** 659,665 ****

      /* insert methods */
      OffsetNumber (*findChildPtr) (GinBtree, Page, BlockNumber, OffsetNumber);
!     GinPlaceToPageRC (*placeToPage) (GinBtree, Buffer, GinBtreeStack *, void *, BlockNumber, Page *, Page *);
      void       *(*prepareDownlink) (GinBtree, Buffer);
      void        (*fillRoot) (GinBtree, Page, BlockNumber, Page, BlockNumber, Page);

--- 659,666 ----

      /* insert methods */
      OffsetNumber (*findChildPtr) (GinBtree, Page, BlockNumber, OffsetNumber);
!     GinPlaceToPageRC (*beginPlaceToPage) (GinBtree, Buffer, GinBtreeStack *, void *, BlockNumber, void **, Page *, Page *);
!     void        (*execPlaceToPage) (GinBtree, Buffer, GinBtreeStack *, void *, BlockNumber, void *);
      void       *(*prepareDownlink) (GinBtree, Buffer);
      void        (*fillRoot) (GinBtree, Page, BlockNumber, Page, BlockNumber, Page);

diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
index f0ff91a..9b57b77 100644
*** a/src/backend/access/gin/ginbtree.c
--- b/src/backend/access/gin/ginbtree.c
***************
*** 17,22 ****
--- 17,23 ----
  #include "access/gin_private.h"
  #include "access/xloginsert.h"
  #include "miscadmin.h"
+ #include "utils/memutils.h"
  #include "utils/rel.h"

  static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
*************** ginFindParents(GinBtree btree, GinBtreeS
*** 310,324 ****
   * Insert a new item to a page.
   *
   * Returns true if the insertion was finished. On false, the page was split and
!  * the parent needs to be updated. (a root split returns true as it doesn't
!  * need any further action by the caller to complete)
   *
   * When inserting a downlink to an internal page, 'childbuf' contains the
   * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
!  * atomically with the insert. Also, the existing item at the given location
!  * is updated to point to 'updateblkno'.
   *
   * stack->buffer is locked on entry, and is kept locked.
   */
  static bool
  ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
--- 311,326 ----
   * Insert a new item to a page.
   *
   * Returns true if the insertion was finished. On false, the page was split and
!  * the parent needs to be updated. (A root split returns true as it doesn't
!  * need any further action by the caller to complete.)
   *
   * When inserting a downlink to an internal page, 'childbuf' contains the
   * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
!  * atomically with the insert. Also, the existing item at offset stack->off
!  * in the target page is updated to point to updateblkno.
   *
   * stack->buffer is locked on entry, and is kept locked.
+  * Likewise for childbuf, if given.
   */
  static bool
  ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 326,336 ****
--- 328,355 ----
                 Buffer childbuf, GinStatsData *buildStats)
  {
      Page        page = BufferGetPage(stack->buffer);
+     bool        result;
      GinPlaceToPageRC rc;
      uint16        xlflags = 0;
      Page        childpage = NULL;
      Page        newlpage = NULL,
                  newrpage = NULL;
+     void       *ptp_workspace = NULL;
+     MemoryContext tmpCxt;
+     MemoryContext oldCxt;
+
+     /*
+      * We do all the work of this function and its subfunctions in a temporary
+      * memory context.  This avoids leakages and simplifies APIs, since some
+      * subfunctions allocate storage that has to survive until we've finished
+      * the WAL insertion.
+      */
+     tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
+                                    "ginPlaceToPage temporary context",
+                                    ALLOCSET_DEFAULT_MINSIZE,
+                                    ALLOCSET_DEFAULT_INITSIZE,
+                                    ALLOCSET_DEFAULT_MAXSIZE);
+     oldCxt = MemoryContextSwitchTo(tmpCxt);

      if (GinPageIsData(page))
          xlflags |= GIN_INSERT_ISDATA;
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 348,387 ****
      }

      /*
!      * Try to put the incoming tuple on the page. placeToPage will decide if
!      * the page needs to be split.
!      *
!      * WAL-logging this operation is a bit funny:
!      *
!      * We're responsible for calling XLogBeginInsert() and XLogInsert().
!      * XLogBeginInsert() must be called before placeToPage, because
!      * placeToPage can register some data to the WAL record.
!      *
!      * If placeToPage returns INSERTED, placeToPage has already called
!      * START_CRIT_SECTION() and XLogBeginInsert(), and registered any data
!      * required to replay the operation, in block index 0. We're responsible
!      * for filling in the main data portion of the WAL record, calling
!      * XLogInsert(), and END_CRIT_SECTION.
!      *
!      * If placeToPage returns SPLIT, we're wholly responsible for WAL logging.
!      * Splits happen infrequently, so we just make a full-page image of all
!      * the pages involved.
       */
!     rc = btree->placeToPage(btree, stack->buffer, stack,
!                             insertdata, updateblkno,
!                             &newlpage, &newrpage);
!     if (rc == UNMODIFIED)
      {
!         XLogResetInsertion();
!         return true;
      }
!     else if (rc == INSERTED)
      {
!         /* placeToPage did START_CRIT_SECTION() */
          MarkBufferDirty(stack->buffer);

          /* An insert to an internal page finishes the split of the child. */
!         if (childbuf != InvalidBuffer)
          {
              GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
              MarkBufferDirty(childbuf);
--- 367,408 ----
      }

      /*
!      * See if the incoming tuple will fit on the page.  beginPlaceToPage will
!      * decide if the page needs to be split, and will compute the split
!      * contents if so.  See comments for beginPlaceToPage and execPlaceToPage
!      * functions for more details of the API here.
       */
!     rc = btree->beginPlaceToPage(btree, stack->buffer, stack,
!                                  insertdata, updateblkno,
!                                  &ptp_workspace,
!                                  &newlpage, &newrpage);
!
!     if (rc == GPTP_NO_WORK)
      {
!         /* Nothing to do */
!         result = true;
      }
!     else if (rc == GPTP_INSERT)
      {
!         /* It will fit, perform the insertion */
!         START_CRIT_SECTION();
!
!         if (RelationNeedsWAL(btree->index))
!         {
!             XLogBeginInsert();
!             XLogRegisterBuffer(0, stack->buffer, REGBUF_STANDARD);
!             if (BufferIsValid(childbuf))
!                 XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
!         }
!
!         /* Perform the page update, and register any extra WAL data */
!         btree->execPlaceToPage(btree, stack->buffer, stack,
!                                insertdata, updateblkno, ptp_workspace);
!
          MarkBufferDirty(stack->buffer);

          /* An insert to an internal page finishes the split of the child. */
!         if (BufferIsValid(childbuf))
          {
              GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
              MarkBufferDirty(childbuf);
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 393,413 ****
              ginxlogInsert xlrec;
              BlockIdData childblknos[2];

-             /*
-              * placetopage already registered stack->buffer as block 0.
-              */
              xlrec.flags = xlflags;

-             if (childbuf != InvalidBuffer)
-                 XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
-
              XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));

              /*
               * Log information about child if this was an insertion of a
               * downlink.
               */
!             if (childbuf != InvalidBuffer)
              {
                  BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
                  BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
--- 414,428 ----
              ginxlogInsert xlrec;
              BlockIdData childblknos[2];

              xlrec.flags = xlflags;

              XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));

              /*
               * Log information about child if this was an insertion of a
               * downlink.
               */
!             if (BufferIsValid(childbuf))
              {
                  BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
                  BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 417,439 ****

              recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
              PageSetLSN(page, recptr);
!             if (childbuf != InvalidBuffer)
                  PageSetLSN(childpage, recptr);
          }

          END_CRIT_SECTION();

!         return true;
      }
!     else if (rc == SPLIT)
      {
!         /* Didn't fit, had to split */
          Buffer        rbuffer;
          BlockNumber savedRightLink;
          ginxlogSplit data;
          Buffer        lbuffer = InvalidBuffer;
          Page        newrootpg = NULL;

          rbuffer = GinNewBuffer(btree->index);

          /* During index build, count the new page */
--- 432,460 ----

              recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
              PageSetLSN(page, recptr);
!             if (BufferIsValid(childbuf))
                  PageSetLSN(childpage, recptr);
          }

          END_CRIT_SECTION();

!         /* Insertion is complete. */
!         result = true;
      }
!     else if (rc == GPTP_SPLIT)
      {
!         /*
!          * Didn't fit, need to split.  The split has been computed in newlpage
!          * and newrpage, which are pointers to palloc'd pages, not associated
!          * with buffers.  stack->buffer is not touched yet.
!          */
          Buffer        rbuffer;
          BlockNumber savedRightLink;
          ginxlogSplit data;
          Buffer        lbuffer = InvalidBuffer;
          Page        newrootpg = NULL;

+         /* Get a new index page to become the right page */
          rbuffer = GinNewBuffer(btree->index);

          /* During index build, count the new page */
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 447,465 ****

          savedRightLink = GinPageGetOpaque(page)->rightlink;

!         /*
!          * newlpage and newrpage are pointers to memory pages, not associated
!          * with buffers. stack->buffer is not touched yet.
!          */
!
          data.node = btree->index->rd_node;
          data.flags = xlflags;
!         if (childbuf != InvalidBuffer)
          {
-             Page        childpage = BufferGetPage(childbuf);
-
-             GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
-
              data.leftChildBlkno = BufferGetBlockNumber(childbuf);
              data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
          }
--- 468,478 ----

          savedRightLink = GinPageGetOpaque(page)->rightlink;

!         /* Begin setting up WAL record */
          data.node = btree->index->rd_node;
          data.flags = xlflags;
!         if (BufferIsValid(childbuf))
          {
              data.leftChildBlkno = BufferGetBlockNumber(childbuf);
              data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
          }
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 469,480 ****
          if (stack->parent == NULL)
          {
              /*
!              * split root, so we need to allocate new left page and place
!              * pointer on root to left and right page
               */
              lbuffer = GinNewBuffer(btree->index);

!             /* During index build, count the newly-added root page */
              if (buildStats)
              {
                  if (btree->isData)
--- 482,493 ----
          if (stack->parent == NULL)
          {
              /*
!              * splitting the root, so we need to allocate new left page and
!              * place pointers to left and right page on root page.
               */
              lbuffer = GinNewBuffer(btree->index);

!             /* During index build, count the new left page */
              if (buildStats)
              {
                  if (btree->isData)
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 491,499 ****

              /*
               * Construct a new root page containing downlinks to the new left
!              * and right pages. (do this in a temporary copy first rather than
!              * overwriting the original page directly, so that we can still
!              * abort gracefully if this fails.)
               */
              newrootpg = PageGetTempPage(newrpage);
              GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
--- 504,512 ----

              /*
               * Construct a new root page containing downlinks to the new left
!              * and right pages.  (Do this in a temporary copy rather than
!              * overwriting the original page directly, since we're not in the
!              * critical section yet.)
               */
              newrootpg = PageGetTempPage(newrpage);
              GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 504,510 ****
          }
          else
          {
!             /* split non-root page */
              data.rrlink = savedRightLink;

              GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
--- 517,523 ----
          }
          else
          {
!             /* splitting a non-root page */
              data.rrlink = savedRightLink;

              GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 513,553 ****
          }

          /*
!          * Ok, we have the new contents of the left page in a temporary copy
!          * now (newlpage), and the newly-allocated right block has been filled
!          * in. The original page is still unchanged.
           *
           * If this is a root split, we also have a temporary page containing
!          * the new contents of the root. Copy the new left page to a
!          * newly-allocated block, and initialize the (original) root page the
!          * new copy. Otherwise, copy over the temporary copy of the new left
!          * page over the old left page.
           */

          START_CRIT_SECTION();

          MarkBufferDirty(rbuffer);
          MarkBufferDirty(stack->buffer);
-         if (BufferIsValid(childbuf))
-             MarkBufferDirty(childbuf);

          /*
!          * Restore the temporary copies over the real buffers. But don't free
!          * the temporary copies yet, WAL record data points to them.
           */
          if (stack->parent == NULL)
          {
              MarkBufferDirty(lbuffer);
!             memcpy(BufferGetPage(stack->buffer), newrootpg, BLCKSZ);
              memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ);
              memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
          }
          else
          {
!             memcpy(BufferGetPage(stack->buffer), newlpage, BLCKSZ);
              memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
          }

          /* write WAL record */
          if (RelationNeedsWAL(btree->index))
          {
--- 526,569 ----
          }

          /*
!          * OK, we have the new contents of the left page in a temporary copy
!          * now (newlpage), and likewise for the new contents of the
!          * newly-allocated right block. The original page is still unchanged.
           *
           * If this is a root split, we also have a temporary page containing
!          * the new contents of the root.
           */

          START_CRIT_SECTION();

          MarkBufferDirty(rbuffer);
          MarkBufferDirty(stack->buffer);

          /*
!          * Restore the temporary copies over the real buffers.
           */
          if (stack->parent == NULL)
          {
+             /* Splitting the root, three pages to update */
              MarkBufferDirty(lbuffer);
!             memcpy(page, newrootpg, BLCKSZ);
              memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ);
              memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
          }
          else
          {
!             /* Normal split, only two pages to update */
!             memcpy(page, newlpage, BLCKSZ);
              memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
          }

+         /* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
+         if (BufferIsValid(childbuf))
+         {
+             GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
+             MarkBufferDirty(childbuf);
+         }
+
          /* write WAL record */
          if (RelationNeedsWAL(btree->index))
          {
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 572,583 ****
                  XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
              }
              if (BufferIsValid(childbuf))
!                 XLogRegisterBuffer(3, childbuf, 0);

              XLogRegisterData((char *) &data, sizeof(ginxlogSplit));

              recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
!             PageSetLSN(BufferGetPage(stack->buffer), recptr);
              PageSetLSN(BufferGetPage(rbuffer), recptr);
              if (stack->parent == NULL)
                  PageSetLSN(BufferGetPage(lbuffer), recptr);
--- 588,600 ----
                  XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
              }
              if (BufferIsValid(childbuf))
!                 XLogRegisterBuffer(3, childbuf, REGBUF_STANDARD);

              XLogRegisterData((char *) &data, sizeof(ginxlogSplit));

              recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
!
!             PageSetLSN(page, recptr);
              PageSetLSN(BufferGetPage(rbuffer), recptr);
              if (stack->parent == NULL)
                  PageSetLSN(BufferGetPage(lbuffer), recptr);
*************** ginPlaceToPage(GinBtree btree, GinBtreeS
*** 587,619 ****
          END_CRIT_SECTION();

          /*
!          * We can release the lock on the right page now, but keep the
!          * original buffer locked.
           */
          UnlockReleaseBuffer(rbuffer);
          if (stack->parent == NULL)
              UnlockReleaseBuffer(lbuffer);

-         pfree(newlpage);
-         pfree(newrpage);
-         if (newrootpg)
-             pfree(newrootpg);
-
          /*
           * If we split the root, we're done. Otherwise the split is not
           * complete until the downlink for the new page has been inserted to
           * the parent.
           */
!         if (stack->parent == NULL)
!             return true;
!         else
!             return false;
      }
      else
      {
!         elog(ERROR, "unknown return code from GIN placeToPage method: %d", rc);
!         return false;            /* keep compiler quiet */
      }
  }

  /*
--- 604,634 ----
          END_CRIT_SECTION();

          /*
!          * We can release the locks/pins on the new pages now, but keep
!          * stack->buffer locked.  childbuf doesn't get unlocked either.
           */
          UnlockReleaseBuffer(rbuffer);
          if (stack->parent == NULL)
              UnlockReleaseBuffer(lbuffer);

          /*
           * If we split the root, we're done. Otherwise the split is not
           * complete until the downlink for the new page has been inserted to
           * the parent.
           */
!         result = (stack->parent == NULL);
      }
      else
      {
!         elog(ERROR, "invalid return code from GIN placeToPage method: %d", rc);
!         result = false;            /* keep compiler quiet */
      }
+
+     /* Clean up temp context */
+     MemoryContextSwitchTo(oldCxt);
+     MemoryContextDelete(tmpCxt);
+
+     return result;
  }

  /*
diff --git a/src/backend/access/gin/gindatapage.c b/src/backend/access/gin/gindatapage.c
index ec8c94b..feac59d 100644
*** a/src/backend/access/gin/gindatapage.c
--- b/src/backend/access/gin/gindatapage.c
***************
*** 18,24 ****
  #include "access/xloginsert.h"
  #include "lib/ilist.h"
  #include "miscadmin.h"
- #include "utils/memutils.h"
  #include "utils/rel.h"

  /*
--- 18,23 ----
*************** typedef struct
*** 57,62 ****
--- 56,68 ----
      int            rsize;            /* total size on right page */

      bool        oldformat;        /* page is in pre-9.4 format on disk */
+
+     /*
+      * If we need WAL data representing the reconstructed leaf page, it's
+      * stored here by computeLeafRecompressWALData.
+      */
+     char       *walinfo;        /* buffer start */
+     int            walinfolen;        /* and length */
  } disassembledLeaf;

  typedef struct
*************** static bool leafRepackItems(disassembled
*** 105,114 ****
  static bool addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems,
                 int nNewItems);

! static void registerLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf);
  static void dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf);
! static void dataPlaceToPageLeafSplit(Buffer buf,
!                          disassembledLeaf *leaf,
                           ItemPointerData lbound, ItemPointerData rbound,
                           Page lpage, Page rpage);

--- 111,119 ----
  static bool addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems,
                 int nNewItems);

! static void computeLeafRecompressWALData(disassembledLeaf *leaf);
  static void dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf);
! static void dataPlaceToPageLeafSplit(disassembledLeaf *leaf,
                           ItemPointerData lbound, ItemPointerData rbound,
                           Page lpage, Page rpage);

*************** GinPageDeletePostingItem(Page page, Offs
*** 423,433 ****
  }

  /*
!  * Places keys to leaf data page and fills WAL record.
   */
  static GinPlaceToPageRC
! dataPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                     void *insertdata, Page *newlpage, Page *newrpage)
  {
      GinBtreeDataLeafInsertData *items = insertdata;
      ItemPointer newItems = &items->items[items->curitem];
--- 428,449 ----
  }

  /*
!  * Prepare to insert data on a leaf data page.
!  *
!  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
!  * before we enter the insertion critical section.  *ptp_workspace can be
!  * set to pass information along to the execPlaceToPage function.
!  *
!  * If it won't fit, perform a page split and return two temporary page
!  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
!  *
!  * In neither case should the given page buffer be modified here.
   */
  static GinPlaceToPageRC
! dataBeginPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                          void *insertdata,
!                          void **ptp_workspace,
!                          Page *newlpage, Page *newrpage)
  {
      GinBtreeDataLeafInsertData *items = insertdata;
      ItemPointer newItems = &items->items[items->curitem];
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 440,454 ****
      bool        append;
      int            segsize;
      Size        freespace;
-     MemoryContext tmpCxt;
-     MemoryContext oldCxt;
      disassembledLeaf *leaf;
      leafSegmentInfo *lastleftinfo;
      ItemPointerData maxOldItem;
      ItemPointerData remaining;

-     Assert(GinPageIsData(page));
-
      rbound = *GinDataPageGetRightBound(page);

      /*
--- 456,466 ----
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 472,489 ****
          maxitems = i;
      }

!     /*
!      * The following operations do quite a lot of small memory allocations,
!      * create a temporary memory context so that we don't need to keep track
!      * of them individually.
!      */
!     tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
!                                    "Gin split temporary context",
!                                    ALLOCSET_DEFAULT_MINSIZE,
!                                    ALLOCSET_DEFAULT_INITSIZE,
!                                    ALLOCSET_DEFAULT_MAXSIZE);
!     oldCxt = MemoryContextSwitchTo(tmpCxt);
!
      leaf = disassembleLeaf(page);

      /*
--- 484,490 ----
          maxitems = i;
      }

!     /* Disassemble the data on the page */
      leaf = disassembleLeaf(page);

      /*
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 548,563 ****
          maxitems = Min(maxitems, nnewsegments * MinTuplesPerSegment);
      }

!     /* Add the new items to the segments */
      if (!addItemsToLeaf(leaf, newItems, maxitems))
      {
          /* all items were duplicates, we have nothing to do */
          items->curitem += maxitems;

!         MemoryContextSwitchTo(oldCxt);
!         MemoryContextDelete(tmpCxt);
!
!         return UNMODIFIED;
      }

      /*
--- 549,561 ----
          maxitems = Min(maxitems, nnewsegments * MinTuplesPerSegment);
      }

!     /* Add the new items to the segment list */
      if (!addItemsToLeaf(leaf, newItems, maxitems))
      {
          /* all items were duplicates, we have nothing to do */
          items->curitem += maxitems;

!         return GPTP_NO_WORK;
      }

      /*
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 590,611 ****
      if (!needsplit)
      {
          /*
!          * Great, all the items fit on a single page. Construct a WAL record
!          * describing the changes we made, and write the segments back to the
!          * page.
!          *
!          * Once we start modifying the page, there's no turning back. The
!          * caller is responsible for calling END_CRIT_SECTION() after writing
!          * the WAL record.
           */
-         MemoryContextSwitchTo(oldCxt);
          if (RelationNeedsWAL(btree->index))
!         {
!             XLogBeginInsert();
!             registerLeafRecompressWALData(buf, leaf);
!         }
!         START_CRIT_SECTION();
!         dataPlaceToPageLeafRecompress(buf, leaf);

          if (append)
              elog(DEBUG2, "appended %d new items to block %u; %d bytes (%d to go)",
--- 588,604 ----
      if (!needsplit)
      {
          /*
!          * Great, all the items fit on a single page.  If needed, prepare data
!          * for a WAL record describing the changes we'll make.
           */
          if (RelationNeedsWAL(btree->index))
!             computeLeafRecompressWALData(leaf);
!
!         /*
!          * We're ready to enter the critical section, but
!          * dataExecPlaceToPageLeaf will need access to the "leaf" data.
!          */
!         *ptp_workspace = leaf;

          if (append)
              elog(DEBUG2, "appended %d new items to block %u; %d bytes (%d to go)",
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 619,625 ****
      else
      {
          /*
!          * Had to split.
           *
           * leafRepackItems already divided the segments between the left and
           * the right page. It filled the left page as full as possible, and
--- 612,618 ----
      else
      {
          /*
!          * Have to split.
           *
           * leafRepackItems already divided the segments between the left and
           * the right page. It filled the left page as full as possible, and
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 631,637 ****
           * until they're balanced.
           *
           * As a further heuristic, when appending items to the end of the
!          * page, try make the left page 75% full, one the assumption that
           * subsequent insertions will probably also go to the end. This packs
           * the index somewhat tighter when appending to a table, which is very
           * common.
--- 624,630 ----
           * until they're balanced.
           *
           * As a further heuristic, when appending items to the end of the
!          * page, try to make the left page 75% full, on the assumption that
           * subsequent insertions will probably also go to the end. This packs
           * the index somewhat tighter when appending to a table, which is very
           * common.
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 680,689 ****
                                                         &lastleftinfo->nitems);
          lbound = lastleftinfo->items[lastleftinfo->nitems - 1];

!         *newlpage = MemoryContextAlloc(oldCxt, BLCKSZ);
!         *newrpage = MemoryContextAlloc(oldCxt, BLCKSZ);

!         dataPlaceToPageLeafSplit(buf, leaf, lbound, rbound,
                                   *newlpage, *newrpage);

          Assert(GinPageRightMost(page) ||
--- 673,685 ----
                                                         &lastleftinfo->nitems);
          lbound = lastleftinfo->items[lastleftinfo->nitems - 1];

!         /*
!          * Now allocate a couple of temporary page images, and fill them.
!          */
!         *newlpage = palloc(BLCKSZ);
!         *newrpage = palloc(BLCKSZ);

!         dataPlaceToPageLeafSplit(leaf, lbound, rbound,
                                   *newlpage, *newrpage);

          Assert(GinPageRightMost(page) ||
*************** dataPlaceToPageLeaf(GinBtree btree, Buff
*** 700,711 ****
                   items->nitem - items->curitem - maxitems);
      }

-     MemoryContextSwitchTo(oldCxt);
-     MemoryContextDelete(tmpCxt);
-
      items->curitem += maxitems;

!     return needsplit ? SPLIT : INSERTED;
  }

  /*
--- 696,726 ----
                   items->nitem - items->curitem - maxitems);
      }

      items->curitem += maxitems;

!     return needsplit ? GPTP_SPLIT : GPTP_INSERT;
! }
!
! /*
!  * Perform data insertion after beginPlaceToPage has decided it will fit.
!  *
!  * This is invoked within a critical section, and XLOG record creation (if
!  * needed) is already started.  The target buffer is registered in slot 0.
!  */
! static void
! dataExecPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                         void *insertdata, void *ptp_workspace)
! {
!     disassembledLeaf *leaf = (disassembledLeaf *) ptp_workspace;
!
!     /* Apply changes to page */
!     dataPlaceToPageLeafRecompress(buf, leaf);
!
!     /* If needed, register WAL data built by computeLeafRecompressWALData */
!     if (RelationNeedsWAL(btree->index))
!     {
!         XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
!     }
  }

  /*
*************** ginVacuumPostingTreeLeaf(Relation indexr
*** 816,826 ****
          }

          if (RelationNeedsWAL(indexrel))
!         {
!             XLogBeginInsert();
!             registerLeafRecompressWALData(buffer, leaf);
!         }
          START_CRIT_SECTION();
          dataPlaceToPageLeafRecompress(buffer, leaf);

          MarkBufferDirty(buffer);
--- 831,841 ----
          }

          if (RelationNeedsWAL(indexrel))
!             computeLeafRecompressWALData(leaf);
!
!         /* Apply changes to page */
          START_CRIT_SECTION();
+
          dataPlaceToPageLeafRecompress(buffer, leaf);

          MarkBufferDirty(buffer);
*************** ginVacuumPostingTreeLeaf(Relation indexr
*** 829,834 ****
--- 844,852 ----
          {
              XLogRecPtr    recptr;

+             XLogBeginInsert();
+             XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+             XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
              recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE);
              PageSetLSN(page, recptr);
          }
*************** ginVacuumPostingTreeLeaf(Relation indexr
*** 839,848 ****

  /*
   * Construct a ginxlogRecompressDataLeaf record representing the changes
!  * in *leaf.
   */
  static void
! registerLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf)
  {
      int            nmodified = 0;
      char       *walbufbegin;
--- 857,867 ----

  /*
   * Construct a ginxlogRecompressDataLeaf record representing the changes
!  * in *leaf.  (Because this requires a palloc, we have to do it before
!  * we enter the critical section that actually updates the page.)
   */
  static void
! computeLeafRecompressWALData(disassembledLeaf *leaf)
  {
      int            nmodified = 0;
      char       *walbufbegin;
*************** registerLeafRecompressWALData(Buffer buf
*** 933,950 ****
              segno++;
      }

!
!     XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
!     XLogRegisterBufData(0, walbufbegin, walbufend - walbufbegin);
!
  }

  /*
   * Assemble a disassembled posting tree leaf page back to a buffer.
   *
!  * *prdata is filled with WAL information about this operation. The caller
!  * is responsible for inserting to the WAL, along with any other information
!  * about the operation that triggered this recompression.
   *
   * NOTE: The segment pointers must not point directly to the same buffer,
   * except for segments that have not been modified and whose preceding
--- 952,966 ----
              segno++;
      }

!     /* Pass back the constructed info via *leaf */
!     leaf->walinfo = walbufbegin;
!     leaf->walinfolen = walbufend - walbufbegin;
  }

  /*
   * Assemble a disassembled posting tree leaf page back to a buffer.
   *
!  * This just updates the target buffer; WAL stuff is caller's responsibility.
   *
   * NOTE: The segment pointers must not point directly to the same buffer,
   * except for segments that have not been modified and whose preceding
*************** dataPlaceToPageLeafRecompress(Buffer buf
*** 1003,1013 ****
   * segments to two pages instead of one.
   *
   * This is different from the non-split cases in that this does not modify
!  * the original page directly, but to temporary in-memory copies of the new
!  * left and right pages.
   */
  static void
! dataPlaceToPageLeafSplit(Buffer buf, disassembledLeaf *leaf,
                           ItemPointerData lbound, ItemPointerData rbound,
                           Page lpage, Page rpage)
  {
--- 1019,1029 ----
   * segments to two pages instead of one.
   *
   * This is different from the non-split cases in that this does not modify
!  * the original page directly, but writes to temporary in-memory copies of
!  * the new left and right pages.
   */
  static void
! dataPlaceToPageLeafSplit(disassembledLeaf *leaf,
                           ItemPointerData lbound, ItemPointerData rbound,
                           Page lpage, Page rpage)
  {
*************** dataPlaceToPageLeafSplit(Buffer buf, dis
*** 1076,1114 ****
  }

  /*
!  * Place a PostingItem to page, and fill a WAL record.
   *
!  * If the item doesn't fit, returns false without modifying the page.
   *
!  * In addition to inserting the given item, the downlink of the existing item
!  * at 'off' is updated to point to 'updateblkno'.
   *
!  * On INSERTED, registers the buffer as buffer ID 0, with data.
!  * On SPLIT, returns rdata that represents the split pages in *prdata.
   */
  static GinPlaceToPageRC
! dataPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                         void *insertdata, BlockNumber updateblkno,
!                         Page *newlpage, Page *newrpage)
  {
      Page        page = BufferGetPage(buf);
-     OffsetNumber off = stack->off;
-     PostingItem *pitem;
-
-     /* this must be static so it can be returned to caller */
-     static ginxlogInsertDataInternal data;

!     /* split if we have to */
      if (GinNonLeafDataPageGetFreeSpace(page) < sizeof(PostingItem))
      {
          dataSplitPageInternal(btree, buf, stack, insertdata, updateblkno,
                                newlpage, newrpage);
!         return SPLIT;
      }

!     Assert(GinPageIsData(page));

!     START_CRIT_SECTION();

      /* Update existing downlink to point to next page (on internal page) */
      pitem = GinDataPageGetPostingItem(page, off);
--- 1092,1146 ----
  }

  /*
!  * Prepare to insert data on an internal data page.
   *
!  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
!  * before we enter the insertion critical section.  *ptp_workspace can be
!  * set to pass information along to the execPlaceToPage function.
   *
!  * If it won't fit, perform a page split and return two temporary page
!  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
   *
!  * In neither case should the given page buffer be modified here.
!  *
!  * Note: on insertion to an internal node, in addition to inserting the given
!  * item, the downlink of the existing item at stack->off will be updated to
!  * point to updateblkno.
   */
  static GinPlaceToPageRC
! dataBeginPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                              void *insertdata, BlockNumber updateblkno,
!                              void **ptp_workspace,
!                              Page *newlpage, Page *newrpage)
  {
      Page        page = BufferGetPage(buf);

!     /* If it doesn't fit, deal with split case */
      if (GinNonLeafDataPageGetFreeSpace(page) < sizeof(PostingItem))
      {
          dataSplitPageInternal(btree, buf, stack, insertdata, updateblkno,
                                newlpage, newrpage);
!         return GPTP_SPLIT;
      }

!     /* Else, we're ready to proceed with insertion */
!     return GPTP_INSERT;
! }

! /*
!  * Perform data insertion after beginPlaceToPage has decided it will fit.
!  *
!  * This is invoked within a critical section, and XLOG record creation (if
!  * needed) is already started.  The target buffer is registered in slot 0.
!  */
! static void
! dataExecPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                             void *insertdata, BlockNumber updateblkno,
!                             void *ptp_workspace)
! {
!     Page        page = BufferGetPage(buf);
!     OffsetNumber off = stack->off;
!     PostingItem *pitem;

      /* Update existing downlink to point to next page (on internal page) */
      pitem = GinDataPageGetPostingItem(page, off);
*************** dataPlaceToPageInternal(GinBtree btree,
*** 1120,1162 ****

      if (RelationNeedsWAL(btree->index))
      {
          data.offset = off;
          data.newitem = *pitem;

-         XLogBeginInsert();
-         XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
          XLogRegisterBufData(0, (char *) &data,
                              sizeof(ginxlogInsertDataInternal));
      }
-
-     return INSERTED;
  }

  /*
!  * Places an item (or items) to a posting tree. Calls relevant function of
!  * internal of leaf page because they are handled very differently.
   */
  static GinPlaceToPageRC
! dataPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                 void *insertdata, BlockNumber updateblkno,
!                 Page *newlpage, Page *newrpage)
  {
      Page        page = BufferGetPage(buf);

      Assert(GinPageIsData(page));

      if (GinPageIsLeaf(page))
!         return dataPlaceToPageLeaf(btree, buf, stack, insertdata,
!                                    newlpage, newrpage);
      else
!         return dataPlaceToPageInternal(btree, buf, stack,
!                                        insertdata, updateblkno,
!                                        newlpage, newrpage);
  }

  /*
!  * Split page and fill WAL record. Returns a new temp buffer filled with data
!  * that should go to the left page. The original buffer is left untouched.
   */
  static void
  dataSplitPageInternal(GinBtree btree, Buffer origbuf,
--- 1152,1241 ----

      if (RelationNeedsWAL(btree->index))
      {
+         /*
+          * This must be static, because it has to survive until XLogInsert,
+          * and we can't palloc here.  Ugly, but the XLogInsert infrastructure
+          * isn't reentrant anyway.
+          */
+         static ginxlogInsertDataInternal data;
+
          data.offset = off;
          data.newitem = *pitem;

          XLogRegisterBufData(0, (char *) &data,
                              sizeof(ginxlogInsertDataInternal));
      }
  }

  /*
!  * Prepare to insert data on a posting-tree data page.
!  *
!  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
!  * before we enter the insertion critical section.  *ptp_workspace can be
!  * set to pass information along to the execPlaceToPage function.
!  *
!  * If it won't fit, perform a page split and return two temporary page
!  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
!  *
!  * In neither case should the given page buffer be modified here.
!  *
!  * Note: on insertion to an internal node, in addition to inserting the given
!  * item, the downlink of the existing item at stack->off will be updated to
!  * point to updateblkno.
!  *
!  * Calls relevant function for internal or leaf page because they are handled
!  * very differently.
   */
  static GinPlaceToPageRC
! dataBeginPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                      void *insertdata, BlockNumber updateblkno,
!                      void **ptp_workspace,
!                      Page *newlpage, Page *newrpage)
  {
      Page        page = BufferGetPage(buf);

      Assert(GinPageIsData(page));

      if (GinPageIsLeaf(page))
!         return dataBeginPlaceToPageLeaf(btree, buf, stack, insertdata,
!                                         ptp_workspace,
!                                         newlpage, newrpage);
      else
!         return dataBeginPlaceToPageInternal(btree, buf, stack,
!                                             insertdata, updateblkno,
!                                             ptp_workspace,
!                                             newlpage, newrpage);
  }

  /*
!  * Perform data insertion after beginPlaceToPage has decided it will fit.
!  *
!  * This is invoked within a critical section, and XLOG record creation (if
!  * needed) is already started.  The target buffer is registered in slot 0.
!  *
!  * Calls relevant function for internal or leaf page because they are handled
!  * very differently.
!  */
! static void
! dataExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                     void *insertdata, BlockNumber updateblkno,
!                     void *ptp_workspace)
! {
!     Page        page = BufferGetPage(buf);
!
!     if (GinPageIsLeaf(page))
!         dataExecPlaceToPageLeaf(btree, buf, stack, insertdata,
!                                 ptp_workspace);
!     else
!         dataExecPlaceToPageInternal(btree, buf, stack, insertdata,
!                                     updateblkno, ptp_workspace);
! }
!
! /*
!  * Split internal page and insert new data.
!  *
!  * Returns new temp pages to *newlpage and *newrpage.
!  * The original buffer is left untouched.
   */
  static void
  dataSplitPageInternal(GinBtree btree, Buffer origbuf,
*************** dataSplitPageInternal(GinBtree btree, Bu
*** 1231,1236 ****
--- 1310,1316 ----
      /* set up right bound for right page */
      *GinDataPageGetRightBound(rpage) = oldbound;

+     /* return temp pages to caller */
      *newlpage = lpage;
      *newrpage = rpage;
  }
*************** ginPrepareDataScan(GinBtree btree, Relat
*** 1789,1795 ****
      btree->isMoveRight = dataIsMoveRight;
      btree->findItem = NULL;
      btree->findChildPtr = dataFindChildPtr;
!     btree->placeToPage = dataPlaceToPage;
      btree->fillRoot = ginDataFillRoot;
      btree->prepareDownlink = dataPrepareDownlink;

--- 1869,1876 ----
      btree->isMoveRight = dataIsMoveRight;
      btree->findItem = NULL;
      btree->findChildPtr = dataFindChildPtr;
!     btree->beginPlaceToPage = dataBeginPlaceToPage;
!     btree->execPlaceToPage = dataExecPlaceToPage;
      btree->fillRoot = ginDataFillRoot;
      btree->prepareDownlink = dataPrepareDownlink;

diff --git a/src/backend/access/gin/ginentrypage.c b/src/backend/access/gin/ginentrypage.c
index c912e60..a022f50 100644
*** a/src/backend/access/gin/ginentrypage.c
--- b/src/backend/access/gin/ginentrypage.c
***************
*** 21,27 ****

  static void entrySplitPage(GinBtree btree, Buffer origbuf,
                 GinBtreeStack *stack,
!                void *insertPayload,
                 BlockNumber updateblkno,
                 Page *newlpage, Page *newrpage);

--- 21,27 ----

  static void entrySplitPage(GinBtree btree, Buffer origbuf,
                 GinBtreeStack *stack,
!                GinBtreeEntryInsertData *insertData,
                 BlockNumber updateblkno,
                 Page *newlpage, Page *newrpage);

*************** entryPreparePage(GinBtree btree, Page pa
*** 508,546 ****
  }

  /*
!  * Place tuple on page and fills WAL record
   *
!  * If the tuple doesn't fit, returns false without modifying the page.
   *
!  * On insertion to an internal node, in addition to inserting the given item,
!  * the downlink of the existing item at 'off' is updated to point to
!  * 'updateblkno'.
   *
!  * On INSERTED, registers the buffer as buffer ID 0, with data.
!  * On SPLIT, returns rdata that represents the split pages in *prdata.
   */
  static GinPlaceToPageRC
! entryPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                  void *insertPayload, BlockNumber updateblkno,
!                  Page *newlpage, Page *newrpage)
  {
      GinBtreeEntryInsertData *insertData = insertPayload;
-     Page        page = BufferGetPage(buf);
      OffsetNumber off = stack->off;
-     OffsetNumber placed;

!     /* this must be static so it can be returned to caller. */
!     static ginxlogInsertEntry data;
!
!     /* quick exit if it doesn't fit */
      if (!entryIsEnoughSpace(btree, buf, off, insertData))
      {
!         entrySplitPage(btree, buf, stack, insertPayload, updateblkno,
                         newlpage, newrpage);
!         return SPLIT;
      }

!     START_CRIT_SECTION();

      entryPreparePage(btree, page, off, insertData, updateblkno);

--- 508,564 ----
  }

  /*
!  * Prepare to insert data on an entry page.
   *
!  * If it will fit, return GPTP_INSERT after doing whatever setup is needed
!  * before we enter the insertion critical section.  *ptp_workspace can be
!  * set to pass information along to the execPlaceToPage function.
   *
!  * If it won't fit, perform a page split and return two temporary page
!  * images into *newlpage and *newrpage, with result GPTP_SPLIT.
   *
!  * In neither case should the given page buffer be modified here.
!  *
!  * Note: on insertion to an internal node, in addition to inserting the given
!  * item, the downlink of the existing item at stack->off will be updated to
!  * point to updateblkno.
   */
  static GinPlaceToPageRC
! entryBeginPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                       void *insertPayload, BlockNumber updateblkno,
!                       void **ptp_workspace,
!                       Page *newlpage, Page *newrpage)
  {
      GinBtreeEntryInsertData *insertData = insertPayload;
      OffsetNumber off = stack->off;

!     /* If it doesn't fit, deal with split case */
      if (!entryIsEnoughSpace(btree, buf, off, insertData))
      {
!         entrySplitPage(btree, buf, stack, insertData, updateblkno,
                         newlpage, newrpage);
!         return GPTP_SPLIT;
      }

!     /* Else, we're ready to proceed with insertion */
!     return GPTP_INSERT;
! }
!
! /*
!  * Perform data insertion after beginPlaceToPage has decided it will fit.
!  *
!  * This is invoked within a critical section, and XLOG record creation (if
!  * needed) is already started.  The target buffer is registered in slot 0.
!  */
! static void
! entryExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
!                      void *insertPayload, BlockNumber updateblkno,
!                      void *ptp_workspace)
! {
!     GinBtreeEntryInsertData *insertData = insertPayload;
!     Page        page = BufferGetPage(buf);
!     OffsetNumber off = stack->off;
!     OffsetNumber placed;

      entryPreparePage(btree, page, off, insertData, updateblkno);

*************** entryPlaceToPage(GinBtree btree, Buffer
*** 554,587 ****

      if (RelationNeedsWAL(btree->index))
      {
          data.isDelete = insertData->isDelete;
          data.offset = off;

-         XLogBeginInsert();
-         XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
          XLogRegisterBufData(0, (char *) &data,
                              offsetof(ginxlogInsertEntry, tuple));
          XLogRegisterBufData(0, (char *) insertData->entry,
                              IndexTupleSize(insertData->entry));
      }
-
-     return INSERTED;
  }

  /*
!  * Place tuple and split page, original buffer(lbuf) leaves untouched,
!  * returns shadow pages filled with new data.
!  * Tuples are distributed between pages by equal size on its, not
!  * an equal number!
   */
  static void
  entrySplitPage(GinBtree btree, Buffer origbuf,
                 GinBtreeStack *stack,
!                void *insertPayload,
                 BlockNumber updateblkno,
                 Page *newlpage, Page *newrpage)
  {
-     GinBtreeEntryInsertData *insertData = insertPayload;
      OffsetNumber off = stack->off;
      OffsetNumber i,
                  maxoff,
--- 572,607 ----

      if (RelationNeedsWAL(btree->index))
      {
+         /*
+          * This must be static, because it has to survive until XLogInsert,
+          * and we can't palloc here.  Ugly, but the XLogInsert infrastructure
+          * isn't reentrant anyway.
+          */
+         static ginxlogInsertEntry data;
+
          data.isDelete = insertData->isDelete;
          data.offset = off;

          XLogRegisterBufData(0, (char *) &data,
                              offsetof(ginxlogInsertEntry, tuple));
          XLogRegisterBufData(0, (char *) insertData->entry,
                              IndexTupleSize(insertData->entry));
      }
  }

  /*
!  * Split entry page and insert new data.
!  *
!  * Returns new temp pages to *newlpage and *newrpage.
!  * The original buffer is left untouched.
   */
  static void
  entrySplitPage(GinBtree btree, Buffer origbuf,
                 GinBtreeStack *stack,
!                GinBtreeEntryInsertData *insertData,
                 BlockNumber updateblkno,
                 Page *newlpage, Page *newrpage)
  {
      OffsetNumber off = stack->off;
      OffsetNumber i,
                  maxoff,
*************** entrySplitPage(GinBtree btree, Buffer or
*** 646,651 ****
--- 666,675 ----
      {
          itup = (IndexTuple) ptr;

+         /*
+          * Decide where to split.  We try to equalize the pages' total data
+          * size, not number of tuples.
+          */
          if (lsize > totalsize / 2)
          {
              if (separator == InvalidOffsetNumber)
*************** entrySplitPage(GinBtree btree, Buffer or
*** 663,668 ****
--- 687,693 ----
          ptr += MAXALIGN(IndexTupleSize(itup));
      }

+     /* return temp pages to caller */
      *newlpage = lpage;
      *newrpage = rpage;
  }
*************** ginPrepareEntryScan(GinBtree btree, Offs
*** 731,737 ****
      btree->isMoveRight = entryIsMoveRight;
      btree->findItem = entryLocateLeafEntry;
      btree->findChildPtr = entryFindChildPtr;
!     btree->placeToPage = entryPlaceToPage;
      btree->fillRoot = ginEntryFillRoot;
      btree->prepareDownlink = entryPrepareDownlink;

--- 756,763 ----
      btree->isMoveRight = entryIsMoveRight;
      btree->findItem = entryLocateLeafEntry;
      btree->findChildPtr = entryFindChildPtr;
!     btree->beginPlaceToPage = entryBeginPlaceToPage;
!     btree->execPlaceToPage = entryExecPlaceToPage;
      btree->fillRoot = ginEntryFillRoot;
      btree->prepareDownlink = entryPrepareDownlink;

diff --git a/src/include/access/gin_private.h b/src/include/access/gin_private.h
index 06c5780..c937011 100644
*** a/src/include/access/gin_private.h
--- b/src/include/access/gin_private.h
*************** typedef struct GinBtreeStack
*** 637,648 ****

  typedef struct GinBtreeData *GinBtree;

! /* Return codes for GinBtreeData.placeToPage method */
  typedef enum
  {
!     UNMODIFIED,
!     INSERTED,
!     SPLIT
  } GinPlaceToPageRC;

  typedef struct GinBtreeData
--- 637,648 ----

  typedef struct GinBtreeData *GinBtree;

! /* Return codes for GinBtreeData.beginPlaceToPage method */
  typedef enum
  {
!     GPTP_NO_WORK,
!     GPTP_INSERT,
!     GPTP_SPLIT
  } GinPlaceToPageRC;

  typedef struct GinBtreeData
*************** typedef struct GinBtreeData
*** 655,661 ****

      /* insert methods */
      OffsetNumber (*findChildPtr) (GinBtree, Page, BlockNumber, OffsetNumber);
!     GinPlaceToPageRC (*placeToPage) (GinBtree, Buffer, GinBtreeStack *, void *, BlockNumber, Page *, Page *);
      void       *(*prepareDownlink) (GinBtree, Buffer);
      void        (*fillRoot) (GinBtree, Page, BlockNumber, Page, BlockNumber, Page);

--- 655,662 ----

      /* insert methods */
      OffsetNumber (*findChildPtr) (GinBtree, Page, BlockNumber, OffsetNumber);
!     GinPlaceToPageRC (*beginPlaceToPage) (GinBtree, Buffer, GinBtreeStack *, void *, BlockNumber, void **, Page *, Page *);
!     void        (*execPlaceToPage) (GinBtree, Buffer, GinBtreeStack *, void *, BlockNumber, void *);
      void       *(*prepareDownlink) (GinBtree, Buffer);
      void        (*fillRoot) (GinBtree, Page, BlockNumber, Page, BlockNumber, Page);


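For readers skimming the patch, the begin/exec split described in the new comments can be illustrated with a toy model. This is only a sketch under stated assumptions, not the GIN code: ToyPage, toy_begin_place, toy_exec_place, toy_place_to_page and the in_crit_section flag are all hypothetical names, and malloc stands in for palloc. The begin step may allocate and must leave the page alone; the exec step runs inside the (simulated) critical section, does no allocation, and only applies what begin already worked out.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define PAGE_CAPACITY 4

/* Hypothetical stand-ins for GinPlaceToPageRC and a page image */
typedef enum { PTP_NO_WORK, PTP_INSERT, PTP_SPLIT } PlaceRC;
typedef struct { int nitems; int items[PAGE_CAPACITY]; } ToyPage;

/* Simulated critical-section flag (assumption; not a PostgreSQL API) */
static int in_crit_section = 0;

/*
 * Phase 1: decide whether the item fits.  May allocate, must not modify
 * *page.  On PTP_INSERT, *workspace carries the precomputed position.
 * On PTP_SPLIT, *newl and *newr receive freshly built temp page images.
 */
static PlaceRC
toy_begin_place(const ToyPage *page, int newitem, void **workspace,
                ToyPage **newl, ToyPage **newr)
{
    int     all[PAGE_CAPACITY + 1];
    int     pos = 0;
    int     total;
    ToyPage *l, *r;

    assert(!in_crit_section);
    while (pos < page->nitems && page->items[pos] < newitem)
        pos++;

    if (page->nitems < PAGE_CAPACITY)
    {
        int *ws = malloc(sizeof(int));
        if (ws == NULL)
            abort();
        *ws = pos;
        *workspace = ws;
        return PTP_INSERT;
    }

    /* No room: merge old items and the new one, then divide them */
    memcpy(all, page->items, pos * sizeof(int));
    all[pos] = newitem;
    memcpy(all + pos + 1, page->items + pos,
           (page->nitems - pos) * sizeof(int));
    total = page->nitems + 1;

    l = calloc(1, sizeof(ToyPage));
    r = calloc(1, sizeof(ToyPage));
    if (l == NULL || r == NULL)
        abort();
    l->nitems = total / 2;
    r->nitems = total - l->nitems;
    memcpy(l->items, all, l->nitems * sizeof(int));
    memcpy(r->items, all + l->nitems, r->nitems * sizeof(int));
    *newl = l;
    *newr = r;
    return PTP_SPLIT;
}

/* Phase 2: runs inside the critical section; no allocation, cannot fail */
static void
toy_exec_place(ToyPage *page, int newitem, void *workspace)
{
    int pos = *(int *) workspace;

    assert(in_crit_section);
    memmove(page->items + pos + 1, page->items + pos,
            (page->nitems - pos) * sizeof(int));
    page->items[pos] = newitem;
    page->nitems++;
}

/* Caller: the critical section lives in one straight-line piece of code */
static PlaceRC
toy_place_to_page(ToyPage *page, int newitem, ToyPage **newl, ToyPage **newr)
{
    void   *ws = NULL;
    PlaceRC rc = toy_begin_place(page, newitem, &ws, newl, newr);

    if (rc == PTP_INSERT)
    {
        in_crit_section = 1;
        toy_exec_place(page, newitem, ws);
        in_crit_section = 0;
        free(ws);
    }
    return rc;
}
```

Calling toy_place_to_page on a zero-initialized ToyPage inserts items in sorted order until the page is full; the next call returns PTP_SPLIT with two temp images and leaves the original page unmodified, which is the property the patch's comments ask of the real beginPlaceToPage methods.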