Re: Warn when parallel restoring a custom dump without data offsets - Mailing list pgsql-hackers

From Tom Lane
Subject Re: Warn when parallel restoring a custom dump without data offsets
Date
Msg-id 3172545.1594845659@sss.pgh.pa.us
Whole thread Raw
In response to Re: Warn when parallel restoring a custom dump without data offsets  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses Re: Warn when parallel restoring a custom dump without data offsets
List pgsql-hackers
I wrote:
> The attached 0001 rewrites your 0001 as per the above ideas (dropping
> the proposed doc change for now), and includes your 0004 for simplicity.
> I'm including your 0002 verbatim just so the cfbot will be able to do a
> meaningful test on 0001; but as stated, I don't really want to commit it.

I spent some more time testing this, by trying to dump and restore the
core regression database.  I immediately noticed that I sometimes got
"ftell mismatch with expected position -- ftell used" warnings, though
it was a bit variable depending on the -j level.  The reason was fairly
apparent on looking at the code: we had various fseeko() calls in code
paths that did not bother to correct ctx->filePos afterwards.  In fact,
*none* of the four existing fseeko calls in pg_backup_custom.c did so.
It's fairly surprising that that hadn't caused a problem up to now.

I started to add adjustments of ctx->filePos after all the fseeko calls,
but then began to wonder why we don't just rip the variable out entirely.
The only places where we need it are those that set dataPos for data blocks,
but that's an entirely pointless activity if we don't have seek
capability, because we're not going to be able to rewrite the TOC
to emit the updated values.

Hence, the 0000 patch attached rips out ctx->filePos, and then
0001 is the currently-discussed patch rebased on that.  I also added
an additional refinement, which is to track the furthest point we've
scanned to while looking for data blocks in an offset-less file.
If we have seek capability, then when we need to resume looking for
data blocks we can search forward from that spot rather than from
wherever we happened to have stopped.  This fixes an additional source
of potentially-O(N^2) behavior if we have to restore blocks in a
very out-of-order fashion.  I'm not sure that it makes much difference
in common cases, but with this we can say positively that we don't
scan the same block more than once per worker process.

I'm still unhappy about the proposed test case (0002), but now
I have a more concrete reason for that: it didn't catch this bug,
so the coverage is still pretty miserable.

Dump-and-restore-the-regression-database used to be a pretty common
manual test for pg_dump, but we never got around to automating it,
possibly because we figured that the pg_upgrade test script covers
that ground.  It's becoming gruesomely clear that pg_upgrade is a
distinct operating mode that doesn't necessarily have the same bugs.
So I'm inclined to feel that what we ought to do is automate a test
of that sort; but first we'll have to fix the existing bugs described
at [1][2].

Given the current state of affairs, I'm inclined to commit the
attached with no new test coverage, and then come back and look
at better testing after the other bugs are dealt with.

            regards, tom lane

[1] https://www.postgresql.org/message-id/3169466.1594841366%40sss.pgh.pa.us
[2] https://www.postgresql.org/message-id/3170626.1594842723%40sss.pgh.pa.us

diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 6ab122242c..3a9881d601 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -70,14 +70,12 @@ typedef struct
 {
     CompressorState *cs;
     int            hasSeek;
-    pgoff_t        filePos;
-    pgoff_t        dataStart;
 } lclContext;

 typedef struct
 {
     int            dataState;
-    pgoff_t        dataPos;
+    pgoff_t        dataPos;        /* valid only if dataState=K_OFFSET_POS_SET */
 } lclTocEntry;


@@ -144,8 +142,6 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)
     AH->lo_buf_size = LOBBUFSIZE;
     AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);

-    ctx->filePos = 0;
-
     /*
      * Now open the file
      */
@@ -185,7 +181,6 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)

         ReadHead(AH);
         ReadToc(AH);
-        ctx->dataStart = _getFilePos(AH, ctx);
     }

 }
@@ -290,7 +285,8 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
     lclTocEntry *tctx = (lclTocEntry *) te->formatData;

     tctx->dataPos = _getFilePos(AH, ctx);
-    tctx->dataState = K_OFFSET_POS_SET;
+    if (tctx->dataPos >= 0)
+        tctx->dataState = K_OFFSET_POS_SET;

     _WriteByte(AH, BLK_DATA);    /* Block type */
     WriteInt(AH, te->dumpId);    /* For sanity check */
@@ -350,7 +346,8 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te)
     lclTocEntry *tctx = (lclTocEntry *) te->formatData;

     tctx->dataPos = _getFilePos(AH, ctx);
-    tctx->dataState = K_OFFSET_POS_SET;
+    if (tctx->dataPos >= 0)
+        tctx->dataState = K_OFFSET_POS_SET;

     _WriteByte(AH, BLK_BLOBS);    /* Block type */
     WriteInt(AH, te->dumpId);    /* For sanity check */
@@ -551,7 +548,6 @@ _skipBlobs(ArchiveHandle *AH)
 static void
 _skipData(ArchiveHandle *AH)
 {
-    lclContext *ctx = (lclContext *) AH->formatData;
     size_t        blkLen;
     char       *buf = NULL;
     int            buflen = 0;
@@ -575,8 +571,6 @@ _skipData(ArchiveHandle *AH)
                 fatal("could not read from input file: %m");
         }

-        ctx->filePos += blkLen;
-
         blkLen = ReadInt(AH);
     }

@@ -594,12 +588,10 @@ _skipData(ArchiveHandle *AH)
 static int
 _WriteByte(ArchiveHandle *AH, const int i)
 {
-    lclContext *ctx = (lclContext *) AH->formatData;
     int            res;

     if ((res = fputc(i, AH->FH)) == EOF)
         WRITE_ERROR_EXIT;
-    ctx->filePos += 1;

     return 1;
 }
@@ -615,13 +607,11 @@ _WriteByte(ArchiveHandle *AH, const int i)
 static int
 _ReadByte(ArchiveHandle *AH)
 {
-    lclContext *ctx = (lclContext *) AH->formatData;
     int            res;

     res = getc(AH->FH);
     if (res == EOF)
         READ_ERROR_EXIT(AH->FH);
-    ctx->filePos += 1;
     return res;
 }

@@ -635,11 +625,8 @@ _ReadByte(ArchiveHandle *AH)
 static void
 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
 {
-    lclContext *ctx = (lclContext *) AH->formatData;
-
     if (fwrite(buf, 1, len, AH->FH) != len)
         WRITE_ERROR_EXIT;
-    ctx->filePos += len;
 }

 /*
@@ -652,11 +639,8 @@ _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
 static void
 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
 {
-    lclContext *ctx = (lclContext *) AH->formatData;
-
     if (fread(buf, 1, len, AH->FH) != len)
         READ_ERROR_EXIT(AH->FH);
-    ctx->filePos += len;
 }

 /*
@@ -688,7 +672,6 @@ _CloseArchive(ArchiveHandle *AH)
         if (tpos < 0 && ctx->hasSeek)
             fatal("could not determine seek position in archive file: %m");
         WriteToc(AH);
-        ctx->dataStart = _getFilePos(AH, ctx);
         WriteDataChunks(AH, NULL);

         /*
@@ -862,30 +845,24 @@ _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)

 /*
  * Get the current position in the archive file.
+ *
+ * With a non-seekable archive file, we may not be able to obtain the
+ * file position.  If so, just return -1.  It's not too important in
+ * that case because we won't be able to rewrite the TOC to fill in
+ * data block offsets anyway.
  */
 static pgoff_t
 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
 {
     pgoff_t        pos;

-    if (ctx->hasSeek)
+    pos = ftello(AH->FH);
+    if (pos < 0)
     {
-        /*
-         * Prior to 1.7 (pg7.3) we relied on the internally maintained
-         * pointer.  Now we rely on ftello() always, unless the file has been
-         * found to not support it.  For debugging purposes, print a warning
-         * if the internal pointer disagrees, so that we're more likely to
-         * notice if something's broken about the internal position tracking.
-         */
-        pos = ftello(AH->FH);
-        if (pos < 0)
+        /* Not expected if we found we can seek. */
+        if (ctx->hasSeek)
             fatal("could not determine seek position in archive file: %m");
-
-        if (pos != ctx->filePos)
-            pg_log_warning("ftell mismatch with expected position -- ftell used");
     }
-    else
-        pos = ctx->filePos;
     return pos;
 }

@@ -897,7 +874,6 @@ _getFilePos(ArchiveHandle *AH, lclContext *ctx)
 static void
 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
 {
-    lclContext *ctx = (lclContext *) AH->formatData;
     int            byt;

     /*
@@ -918,7 +894,6 @@ _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
             *id = 0;            /* don't return an uninitialized value */
             return;
         }
-        ctx->filePos += 1;
     }

     *id = ReadInt(AH);
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 3a9881d601..971e6adf48 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -70,6 +70,8 @@ typedef struct
 {
     CompressorState *cs;
     int            hasSeek;
+    /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
+    pgoff_t        lastFilePos;    /* position after last data block we've read */
 } lclContext;

 typedef struct
@@ -181,8 +183,13 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)

         ReadHead(AH);
         ReadToc(AH);
-    }

+        /*
+         * Remember location of first data block (i.e., the point after TOC)
+         * in case we have to search for desired data blocks.
+         */
+        ctx->lastFilePos = _getFilePos(AH, ctx);
+    }
 }

 /*
@@ -418,13 +425,62 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
     {
         /*
          * We cannot seek directly to the desired block.  Instead, skip over
-         * block headers until we find the one we want.  This could fail if we
-         * are asked to restore items out-of-order.
+         * block headers until we find the one we want.  Remember the
+         * positions of skipped-over blocks, so that if we later decide we
+         * need to read one, we'll be able to seek to it.
+         *
+         * When our input file is seekable, we can do the search starting from
+         * the point after the last data block we scanned in previous
+         * iterations of this function.
          */
-        _readBlockHeader(AH, &blkType, &id);
+        if (ctx->hasSeek)
+        {
+            if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
+                fatal("error during file seek: %m");
+        }

-        while (blkType != EOF && id != te->dumpId)
+        for (;;)
         {
+            pgoff_t        thisBlkPos = _getFilePos(AH, ctx);
+
+            _readBlockHeader(AH, &blkType, &id);
+
+            if (blkType == EOF || id == te->dumpId)
+                break;
+
+            /* Remember the block position, if we got one */
+            if (thisBlkPos >= 0)
+            {
+                TocEntry   *otherte = getTocEntryByDumpId(AH, id);
+
+                if (otherte && otherte->formatData)
+                {
+                    lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
+
+                    /*
+                     * Note: on Windows, multiple threads might access/update
+                     * the same lclTocEntry concurrently, but that should be
+                     * safe as long as we update dataPos before dataState.
+                     * Ideally, we'd use pg_write_barrier() to enforce that,
+                     * but the needed infrastructure doesn't exist in frontend
+                     * code.  But Windows only runs on machines with strong
+                     * store ordering, so it should be okay for now.
+                     */
+                    if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
+                    {
+                        othertctx->dataPos = thisBlkPos;
+                        othertctx->dataState = K_OFFSET_POS_SET;
+                    }
+                    else if (othertctx->dataPos != thisBlkPos ||
+                             othertctx->dataState != K_OFFSET_POS_SET)
+                    {
+                        /* sanity check */
+                        pg_log_warning("data block %d has wrong seek position",
+                                       id);
+                    }
+                }
+            }
+
             switch (blkType)
             {
                 case BLK_DATA:
@@ -440,7 +496,6 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
                           blkType);
                     break;
             }
-            _readBlockHeader(AH, &blkType, &id);
         }
     }
     else
@@ -452,20 +507,18 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
         _readBlockHeader(AH, &blkType, &id);
     }

-    /* Produce suitable failure message if we fell off end of file */
+    /*
+     * If we reached EOF without finding the block we want, then either it
+     * doesn't exist, or it does but we lack the ability to seek back to it.
+     */
     if (blkType == EOF)
     {
-        if (tctx->dataState == K_OFFSET_POS_NOT_SET)
-            fatal("could not find block ID %d in archive -- "
-                  "possibly due to out-of-order restore request, "
-                  "which cannot be handled due to lack of data offsets in archive",
-                  te->dumpId);
-        else if (!ctx->hasSeek)
+        if (!ctx->hasSeek)
             fatal("could not find block ID %d in archive -- "
                   "possibly due to out-of-order restore request, "
                   "which cannot be handled due to non-seekable input file",
                   te->dumpId);
-        else                    /* huh, the dataPos led us to EOF? */
+        else
             fatal("could not find block ID %d in archive -- "
                   "possibly corrupt archive",
                   te->dumpId);
@@ -491,6 +544,20 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
                   blkType);
             break;
     }
+
+    /*
+     * If our input file is seekable but lacks data offsets, update our
+     * knowledge of where to start future searches from.  (Note that we did
+     * not update the current TE's dataState/dataPos.  We could have, but
+     * there is no point since it will not be visited again.)
+     */
+    if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
+    {
+        pgoff_t        curPos = _getFilePos(AH, ctx);
+
+        if (curPos > ctx->lastFilePos)
+            ctx->lastFilePos = curPos;
+    }
 }

 /*
@@ -548,6 +615,7 @@ _skipBlobs(ArchiveHandle *AH)
 static void
 _skipData(ArchiveHandle *AH)
 {
+    lclContext *ctx = (lclContext *) AH->formatData;
     size_t        blkLen;
     char       *buf = NULL;
     int            buflen = 0;
@@ -556,19 +624,27 @@ _skipData(ArchiveHandle *AH)
     blkLen = ReadInt(AH);
     while (blkLen != 0)
     {
-        if (blkLen > buflen)
+        if (ctx->hasSeek)
         {
-            if (buf)
-                free(buf);
-            buf = (char *) pg_malloc(blkLen);
-            buflen = blkLen;
+            if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
+                fatal("error during file seek: %m");
         }
-        if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
+        else
         {
-            if (feof(AH->FH))
-                fatal("could not read from input file: end of file");
-            else
-                fatal("could not read from input file: %m");
+            if (blkLen > buflen)
+            {
+                if (buf)
+                    free(buf);
+                buf = (char *) pg_malloc(blkLen);
+                buflen = blkLen;
+            }
+            if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
+            {
+                if (feof(AH->FH))
+                    fatal("could not read from input file: end of file");
+                else
+                    fatal("could not read from input file: %m");
+            }
         }

         blkLen = ReadInt(AH);
@@ -804,6 +880,9 @@ _Clone(ArchiveHandle *AH)
 {
     lclContext *ctx = (lclContext *) AH->formatData;

+    /*
+     * Each thread must have private lclContext working state.
+     */
     AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
     memcpy(AH->formatData, ctx, sizeof(lclContext));
     ctx = (lclContext *) AH->formatData;
@@ -813,10 +892,13 @@ _Clone(ArchiveHandle *AH)
         fatal("compressor active");

     /*
+     * We intentionally do not clone TOC-entry-local state: it's useful to
+     * share knowledge about where the data blocks are across threads.
+     * _PrintTocData has to be careful about the order of operations on that
+     * state, though.
+     *
      * Note: we do not make a local lo_buf because we expect at most one BLOBS
-     * entry per archive, so no parallelism is possible.  Likewise,
-     * TOC-entry-local state isn't an issue because any one TOC entry is
-     * touched by just one worker child.
+     * entry per archive, so no parallelism is possible.
      */
 }


pgsql-hackers by date:

Previous
From: Christoph Berg
Date:
Subject: Re: gs_group_1 crashing on 13beta2/s390x
Next
From: Tom Lane
Date:
Subject: Re: gs_group_1 crashing on 13beta2/s390x