Re: Refactor pg_rewind code and make it work against a standby - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: Refactor pg_rewind code and make it work against a standby
Date
Msg-id 20200918.164150.1688011206252014871.horikyota.ntt@gmail.com
In response to Re: Refactor pg_rewind code and make it work against a standby  (Heikki Linnakangas <hlinnaka@iki.fi>)
Responses Re: Refactor pg_rewind code and make it work against a standby  (Soumyadeep Chakraborty <soumyadeep2007@gmail.com>)
Re: Refactor pg_rewind code and make it work against a standby  (Heikki Linnakangas <hlinnaka@iki.fi>)
List pgsql-hackers
Hello.

It needed rebasing. (Attached)

At Tue, 25 Aug 2020 16:32:02 +0300, Heikki Linnakangas <hlinnaka@iki.fi> wrote in 
> On 20/08/2020 11:32, Kyotaro Horiguchi wrote:
> > 0002: Rewording that old->target and new->source makes the meaning far
> 
> Good idea! I changed the patch that way.

Looks good.

> > 0003: Thomas is proposing a sort template. It could be used if committed.


+     * If the block is beyond the EOF in the source system, or the file doesn't
+     * doesn'exist in the source at all, we're going to truncate/remove it away

"the file doesn't doesn'exist" should read "the file doesn't exist".

I don't think filemap_finalize needs to iterate over filemap twice.
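A minimal sketch of the single-pass idea, assuming the number of entries is known before the loop (a hash table normally tracks how many members it holds). Everything here is a simplified, hypothetical stand-in: the types, decide_action() (a much-reduced decide_file_action()), and the linked list used in place of walking the hash table. The ordering is assumed to follow final_filemap_cmp(): by action, with removals last and in reverse path order so that files are removed before their parent directories. Since the sort key depends on the action, deciding and collecting in the same loop and then sorting once is enough.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in enum; REMOVE must be the largest value to sort last. */
typedef enum
{
    FILE_ACTION_UNDECIDED,
    FILE_ACTION_NONE,
    FILE_ACTION_COPY,
    FILE_ACTION_REMOVE
} file_action_t;

typedef struct file_entry_t
{
    const char *path;
    bool        source_exists;
    bool        target_exists;
    file_action_t action;
    struct file_entry_t *next;  /* stand-in for walking the hash table */
} file_entry_t;

typedef struct
{
    int         narray;
    file_entry_t **array;
} filemap_t;

/* Much-reduced stand-in for decide_file_action() */
static file_action_t
decide_action(const file_entry_t *e)
{
    if (e->source_exists && !e->target_exists)
        return FILE_ACTION_COPY;
    if (!e->source_exists && e->target_exists)
        return FILE_ACTION_REMOVE;
    return FILE_ACTION_NONE;
}

/* Order by action; removals in reverse path order (files before dirs). */
static int
final_cmp(const void *a, const void *b)
{
    const file_entry_t *fa = *(file_entry_t *const *) a;
    const file_entry_t *fb = *(file_entry_t *const *) b;

    if (fa->action != fb->action)
        return fa->action < fb->action ? -1 : 1;
    if (fa->action == FILE_ACTION_REMOVE)
        return strcmp(fb->path, fa->path);
    return strcmp(fa->path, fb->path);
}

/* One loop decides every action and fills the array; then sort once. */
static filemap_t *
finalize_single_pass(file_entry_t *head, int nentries)
{
    filemap_t  *map = malloc(sizeof(filemap_t));

    assert(map != NULL);        /* pg_malloc() would exit on failure */
    map->array = malloc(sizeof(file_entry_t *) * nentries);
    assert(map->array != NULL || nentries == 0);
    map->narray = 0;

    for (file_entry_t *e = head; e != NULL; e = e->next)
    {
        e->action = decide_action(e);
        map->array[map->narray++] = e;
    }
    assert(map->narray == nentries);

    qsort(map->array, map->narray, sizeof(file_entry_t *), final_cmp);
    return map;
}
```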

hash_string_pointer is a copy of the one in pg_verifybackup.c. Is it
worth moving it into hashfn.h or hashfn.c?
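For reference, a sketch of the helper as it might look if moved into a shared header such as common/hashfn.h. The pg_verifybackup.c version hashes the string's bytes with hash_bytes(); to keep this compilable on its own, hash_bytes() is replaced here by a 32-bit FNV-1a stand-in, which is not the hash PostgreSQL uses. The point is only that the key is the string's contents, not the pointer value.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/*
 * Stand-in for hash_bytes() so the sketch compiles alone.
 * This is 32-bit FNV-1a, NOT PostgreSQL's hash function.
 */
static uint32_t
stand_in_hash_bytes(const unsigned char *k, int keylen)
{
    uint32_t    h = 2166136261u;    /* FNV offset basis */

    for (int i = 0; i < keylen; i++)
    {
        h ^= k[i];
        h *= 16777619u;             /* FNV prime */
    }
    return h;
}

/* Hash the string a pointer points to (its contents, not its address). */
static inline uint32_t
hash_string_pointer(const char *s)
{
    assert(s != NULL);
    return stand_in_hash_bytes((const unsigned char *) s, (int) strlen(s));
}
```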

> --- a/src/bin/pg_rewind/pg_rewind.c
> +++ b/src/bin/pg_rewind/pg_rewind.c
> ...
> +    filemap_t  *filemap;
> ..
> +    filemap_init();
> ...
> +    filemap = filemap_finalize();

I was a bit confused by this, and realized that what filemap_init
initializes is *not* the filemap but the filehash. So, for example,
shouldn't the functions be named something like this?

filehash_init()
filemap = filehash_finalize()/create_filemap()


> > 0004:
> >   Many of the functions' names get an additional word "local",
> >   but I don't clearly get the meaning. But it's about linguistic
> >   sense, and I'm not a good judge of that..
> >   -rewind_copy_file_range(const char *path, off_t begin, off_t end, bool
> >   -trunc)
> > +local_fetch_file_range(rewind_source *source, const char *path,
> > uint64 off,
> >   The function actually copies the source range to the target file, so
> >   "fetch" might suggest a somewhat different meaning, but it's about
> >   linguistics (omitted..).
> 
> Hmm. It is "fetching" the range from the source server, and writing it
> to the target. The term makes more sense with a libpq source. Perhaps
> this function should be called "local_copy_range" or something, but
> it'd also be nice to have "fetch" in the name because the function
> pointer it's assigned to is called "queue_fetch_range".

Thanks. Yeah, libpq_fetch_file makes sense. I agree with the name.
The refactoring looks good to me.
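To illustrate the naming point: a minimal sketch of a source abstraction whose queue_fetch_range callback is filled in by a local-directory implementation. The struct layout, the in-memory buffers standing in for files, and the trailing len parameter are assumptions for illustration, not the patch's definitions. A libpq source would batch the request (hence "queue"), while a local source can simply do the copy right away (hence "fetch").

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical, much-simplified stand-in for the patch's rewind_source. */
typedef struct rewind_source
{
    /* Fetch [off, off + len) of 'path' from the source, write it to target. */
    void        (*queue_fetch_range) (struct rewind_source *source,
                                      const char *path, uint64_t off, size_t len);
} rewind_source;

/* A local-directory source; buffers stand in for one file on each side. */
typedef struct
{
    rewind_source common;       /* must be first so the cast below is valid */
    const unsigned char *source_data;
    unsigned char *target_data;
    size_t      size;           /* size of both buffers */
} local_source;

/* A local source has nothing to queue: copy the range immediately. */
static void
local_fetch_file_range(rewind_source *source, const char *path,
                       uint64_t off, size_t len)
{
    local_source *ls = (local_source *) source;

    (void) path;                /* a real version would open <datadir>/<path> */
    assert(off + len <= ls->size);
    memcpy(ls->target_data + off, ls->source_data + off, len);
}

static void
init_local_source(local_source *ls, const unsigned char *src,
                  unsigned char *dst, size_t size)
{
    ls->common.queue_fetch_range = local_fetch_file_range;
    ls->source_data = src;
    ls->target_data = dst;
    ls->size = size;
}
```

Callers only see the function pointer, so the implementation name can say "fetch" while the interface says "queue".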

> > I'm going to continue reviewing this later.
> 
> Thanks! Attached is a new set of patches. The only meaningful change
> is in the 2nd patch, which I modified per your suggestion. Also, I
> moved the logic to decide each file's fate into a new subroutine
> called decide_file_action().

That looks good.

0005:

+    /*
+     * We don't intend do any updates.  Put the connection in read-only mode
+     * to keep us honest.
+     */
     run_simple_command(conn, "SET default_transaction_read_only = off");

The comment has been wrong ever since 0004 added it, but that's not a
problem since 0005 was meant to fix it. However, the command sets
default_transaction_read_only to "off"; we need it turned "on" in
order to be really honest :p

> /*
>  * Also check that full_page_writes is enabled.  We can get torn pages if
>  * a page is modified while we read it with pg_read_binary_file(), and we
>  * rely on full page images to fix them.
>  */
> str = run_simple_query(conn, "SHOW full_page_writes");
> if (strcmp(str, "on") != 0)
>     pg_fatal("full_page_writes must be enabled in the source server");
> pg_free(str);

This part isn't changed by this patch set. But if we allow connecting
to a standby, this check can be tricked by setting full_page_writes to
"off" on the primary and "on" on the standby (FWIW, though). Some
protective measure might be necessary. (Maybe the standby should be
required to have the same value as the primary.)


+            thislen = Min(len, CHUNK_SIZE - prev->length);
+            src->request_queue[src->num_requests - 1].length += thislen;

prev == &src->request_queue[src->num_requests - 1] here, so this can
simply be "prev->length += thislen;".
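A minimal sketch of the coalescing step written in terms of prev directly. The definitions are assumed stand-ins, not copied from the patch: CHUNK_SIZE's value, the fixed-capacity queue, and the fetch_range_request fields are illustrative. A request that continues the previous one in the same file extends it up to CHUNK_SIZE, and whatever remains is queued in CHUNK_SIZE pieces.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define CHUNK_SIZE      1000000     /* assumed chunk size for this sketch */
#define MAX_REQUESTS    16          /* hypothetical fixed queue capacity */
#define Min(x, y)       ((x) < (y) ? (x) : (y))

typedef struct
{
    const char *path;               /* not copied; caller keeps it alive */
    uint64_t    offset;
    uint32_t    length;
} fetch_range_request;

typedef struct
{
    fetch_range_request request_queue[MAX_REQUESTS];
    int         num_requests;
} fetch_queue;

static void
queue_fetch_range(fetch_queue *src, const char *path, uint64_t off, uint64_t len)
{
    /* Does this request continue the previous one? Then extend that one. */
    if (src->num_requests > 0)
    {
        fetch_range_request *prev = &src->request_queue[src->num_requests - 1];

        if (strcmp(prev->path, path) == 0 &&
            prev->offset + prev->length == off &&
            prev->length < CHUNK_SIZE)
        {
            uint32_t    thislen =
                (uint32_t) Min(len, (uint64_t) (CHUNK_SIZE - prev->length));

            prev->length += thislen;    /* prev already is the last entry */
            off += thislen;
            len -= thislen;
        }
    }

    /* Queue whatever remains, in pieces of at most CHUNK_SIZE bytes. */
    while (len > 0)
    {
        uint32_t    thislen = (uint32_t) Min(len, (uint64_t) CHUNK_SIZE);
        fetch_range_request *rq;

        assert(src->num_requests < MAX_REQUESTS);   /* real code would flush */
        rq = &src->request_queue[src->num_requests++];
        rq->path = path;
        rq->offset = off;
        rq->length = thislen;
        off += thislen;
        len -= thislen;
    }
}
```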


+        if (chunksize > rq->length)
+        {
+            pg_fatal("received more than requested for file \"%s\"",
+                     rq->path);
+            /* receiving less is OK, though */

When we receive less than requested, don't we need to truncate the
target file, though?
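If the target should mirror a source file that came back short, one possible handling is sketched below. It assumes the received bytes have already been written at rq_offset, and that a short chunk means the source file ends at rq_offset + chunksize; handle_received_chunk() is a hypothetical helper, not part of the patch. Whether truncating here is needed at all, or WAL replay takes care of it, is exactly the open question.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helper, called after 'chunksize' bytes have been written to
 * the target at 'rq_offset' in answer to a request for 'rq_length' bytes.
 * Returns 0 on success and -1 on error.
 */
static int
handle_received_chunk(int target_fd, uint64_t rq_offset,
                      uint64_t rq_length, uint64_t chunksize)
{
    assert(target_fd >= 0);

    /* More than requested: pg_rewind treats this as fatal. */
    if (chunksize > rq_length)
        return -1;

    /*
     * Less than requested: assume the source file ends at
     * rq_offset + chunksize, and cut the target file to the same length.
     */
    if (chunksize < rq_length &&
        ftruncate(target_fd, (off_t) (rq_offset + chunksize)) != 0)
        return -1;

    return 0;
}
```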


+         * Source is a local data directory. It should've shut down cleanly,
+         * and we must to the latest shutdown checkpoint.

"and we must to the" => "and we must replay to the" ?

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 6cfbb5a686e5b87159aed9769b1e52859bae02b4 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 19 Aug 2020 15:34:35 +0300
Subject: [PATCH v3 1/5] pg_rewind: Move syncTargetDirectory() to file_ops.c

For consistency. All the other low-level functions that operate on the
target directory are in file_ops.c.
---
 src/bin/pg_rewind/file_ops.c  | 19 +++++++++++++++++++
 src/bin/pg_rewind/file_ops.h  |  1 +
 src/bin/pg_rewind/pg_rewind.c | 22 +---------------------
 src/bin/pg_rewind/pg_rewind.h |  1 +
 4 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/src/bin/pg_rewind/file_ops.c b/src/bin/pg_rewind/file_ops.c
index b3bf091c54..55439db20b 100644
--- a/src/bin/pg_rewind/file_ops.c
+++ b/src/bin/pg_rewind/file_ops.c
@@ -19,6 +19,7 @@
 #include <unistd.h>
 
 #include "common/file_perm.h"
+#include "common/file_utils.h"
 #include "file_ops.h"
 #include "filemap.h"
 #include "pg_rewind.h"
@@ -266,6 +267,24 @@ remove_target_symlink(const char *path)
                  dstpath);
 }
 
+/*
+ * Sync target data directory to ensure that modifications are safely on disk.
+ *
+ * We do this once, for the whole data directory, for performance reasons.  At
+ * the end of pg_rewind's run, the kernel is likely to already have flushed
+ * most dirty buffers to disk.  Additionally fsync_pgdata uses a two-pass
+ * approach (only initiating writeback in the first pass), which often reduces
+ * the overall amount of IO noticeably.
+ */
+void
+sync_target_dir(void)
+{
+    if (!do_sync || dry_run)
+        return;
+
+    fsync_pgdata(datadir_target, PG_VERSION_NUM);
+}
+
 
 /*
  * Read a file into memory. The file to be read is <datadir>/<path>.
diff --git a/src/bin/pg_rewind/file_ops.h b/src/bin/pg_rewind/file_ops.h
index 025f24141c..d8466385cf 100644
--- a/src/bin/pg_rewind/file_ops.h
+++ b/src/bin/pg_rewind/file_ops.h
@@ -19,6 +19,7 @@ extern void remove_target_file(const char *path, bool missing_ok);
 extern void truncate_target_file(const char *path, off_t newsize);
 extern void create_target(file_entry_t *t);
 extern void remove_target(file_entry_t *t);
+extern void sync_target_dir(void);
 
 extern char *slurpFile(const char *datadir, const char *path, size_t *filesize);
 
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 0ec52cb032..5a7ab764db 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -20,7 +20,6 @@
 #include "catalog/pg_control.h"
 #include "common/controldata_utils.h"
 #include "common/file_perm.h"
-#include "common/file_utils.h"
 #include "common/restricted_token.h"
 #include "common/string.h"
 #include "fe_utils/recovery_gen.h"
@@ -38,7 +37,6 @@ static void createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli,
 
 static void digestControlFile(ControlFileData *ControlFile, char *source,
                               size_t size);
-static void syncTargetDirectory(void);
 static void getRestoreCommand(const char *argv0);
 static void sanityChecks(void);
 static void findCommonAncestorTimeline(XLogRecPtr *recptr, int *tliIndex);
@@ -455,7 +453,7 @@ main(int argc, char **argv)
 
     if (showprogress)
         pg_log_info("syncing target data directory");
-    syncTargetDirectory();
+    sync_target_dir();
 
     if (writerecoveryconf && !dry_run)
         WriteRecoveryConfig(conn, datadir_target,
@@ -803,24 +801,6 @@ digestControlFile(ControlFileData *ControlFile, char *src, size_t size)
     checkControlFile(ControlFile);
 }
 
-/*
- * Sync target data directory to ensure that modifications are safely on disk.
- *
- * We do this once, for the whole data directory, for performance reasons.  At
- * the end of pg_rewind's run, the kernel is likely to already have flushed
- * most dirty buffers to disk.  Additionally fsync_pgdata uses a two-pass
- * approach (only initiating writeback in the first pass), which often reduces
- * the overall amount of IO noticeably.
- */
-static void
-syncTargetDirectory(void)
-{
-    if (!do_sync || dry_run)
-        return;
-
-    fsync_pgdata(datadir_target, PG_VERSION_NUM);
-}
-
 /*
  * Get value of GUC parameter restore_command from the target cluster.
  *
diff --git a/src/bin/pg_rewind/pg_rewind.h b/src/bin/pg_rewind/pg_rewind.h
index 8a9319ed67..67f90c2a38 100644
--- a/src/bin/pg_rewind/pg_rewind.h
+++ b/src/bin/pg_rewind/pg_rewind.h
@@ -24,6 +24,7 @@ extern char *datadir_source;
 extern char *connstr_source;
 extern bool showprogress;
 extern bool dry_run;
+extern bool do_sync;
 extern int    WalSegSz;
 
 /* Target history */
-- 
2.18.4

From 3f96ae5ba68bdcba6dd32ae77f3b5d4f4dc5b1d6 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 19 Aug 2020 15:34:37 +0300
Subject: [PATCH v3 2/5] Refactor pg_rewind for more clear decision making.

Deciding what to do with each file is now a separate step after all the
necessary information has been gathered. It is more clear that way.
Previously, the decision-making was divided between process_source_file()
and process_target_file(), and it was a bit hard to piece together what the
overall rules were.
---
 src/bin/pg_rewind/copy_fetch.c  |  14 +-
 src/bin/pg_rewind/file_ops.c    |  16 +-
 src/bin/pg_rewind/filemap.c     | 558 +++++++++++++++++---------------
 src/bin/pg_rewind/filemap.h     |  69 ++--
 src/bin/pg_rewind/libpq_fetch.c |  12 +-
 src/bin/pg_rewind/parsexlog.c   |   2 +-
 src/bin/pg_rewind/pg_rewind.c   |   8 +-
 7 files changed, 373 insertions(+), 306 deletions(-)

diff --git a/src/bin/pg_rewind/copy_fetch.c b/src/bin/pg_rewind/copy_fetch.c
index 1edab5f186..18fad32600 100644
--- a/src/bin/pg_rewind/copy_fetch.c
+++ b/src/bin/pg_rewind/copy_fetch.c
@@ -210,7 +210,7 @@ copy_executeFileMap(filemap_t *map)
     for (i = 0; i < map->narray; i++)
     {
         entry = map->array[i];
-        execute_pagemap(&entry->pagemap, entry->path);
+        execute_pagemap(&entry->target_modified_pages, entry->path);
 
         switch (entry->action)
         {
@@ -219,16 +219,16 @@ copy_executeFileMap(filemap_t *map)
                 break;
 
             case FILE_ACTION_COPY:
-                rewind_copy_file_range(entry->path, 0, entry->newsize, true);
+                rewind_copy_file_range(entry->path, 0, entry->source_size, true);
                 break;
 
             case FILE_ACTION_TRUNCATE:
-                truncate_target_file(entry->path, entry->newsize);
+                truncate_target_file(entry->path, entry->source_size);
                 break;
 
             case FILE_ACTION_COPY_TAIL:
-                rewind_copy_file_range(entry->path, entry->oldsize,
-                                       entry->newsize, false);
+                rewind_copy_file_range(entry->path, entry->target_size,
+                                       entry->source_size, false);
                 break;
 
             case FILE_ACTION_CREATE:
@@ -238,6 +238,10 @@ copy_executeFileMap(filemap_t *map)
             case FILE_ACTION_REMOVE:
                 remove_target(entry);
                 break;
+
+            case FILE_ACTION_UNDECIDED:
+                pg_fatal("no action decided for \"%s\"", entry->path);
+                break;
         }
     }
 
diff --git a/src/bin/pg_rewind/file_ops.c b/src/bin/pg_rewind/file_ops.c
index 55439db20b..ec37d0b2e0 100644
--- a/src/bin/pg_rewind/file_ops.c
+++ b/src/bin/pg_rewind/file_ops.c
@@ -126,8 +126,9 @@ void
 remove_target(file_entry_t *entry)
 {
     Assert(entry->action == FILE_ACTION_REMOVE);
+    Assert(entry->target_exists);
 
-    switch (entry->type)
+    switch (entry->target_type)
     {
         case FILE_TYPE_DIRECTORY:
             remove_target_dir(entry->path);
@@ -140,6 +141,10 @@ remove_target(file_entry_t *entry)
         case FILE_TYPE_SYMLINK:
             remove_target_symlink(entry->path);
             break;
+
+        case FILE_TYPE_UNDEFINED:
+            pg_fatal("undefined file type for \"%s\"", entry->path);
+            break;
     }
 }
 
@@ -147,21 +152,26 @@ void
 create_target(file_entry_t *entry)
 {
     Assert(entry->action == FILE_ACTION_CREATE);
+    Assert(!entry->target_exists);
 
-    switch (entry->type)
+    switch (entry->source_type)
     {
         case FILE_TYPE_DIRECTORY:
             create_target_dir(entry->path);
             break;
 
         case FILE_TYPE_SYMLINK:
-            create_target_symlink(entry->path, entry->link_target);
+            create_target_symlink(entry->path, entry->source_link_target);
             break;
 
         case FILE_TYPE_REGULAR:
             /* can't happen. Regular files are created with open_target_file. */
             pg_fatal("invalid action (CREATE) for regular file");
             break;
+
+        case FILE_TYPE_UNDEFINED:
+            pg_fatal("undefined file type for \"%s\"", entry->path);
+            break;
     }
 }
 
diff --git a/src/bin/pg_rewind/filemap.c b/src/bin/pg_rewind/filemap.c
index 1abc257177..6d19d2be61 100644
--- a/src/bin/pg_rewind/filemap.c
+++ b/src/bin/pg_rewind/filemap.c
@@ -26,6 +26,8 @@ static bool isRelDataFile(const char *path);
 static char *datasegpath(RelFileNode rnode, ForkNumber forknum,
                          BlockNumber segno);
 static int    path_cmp(const void *a, const void *b);
+
+static file_entry_t *get_filemap_entry(const char *path, bool create);
 static int    final_filemap_cmp(const void *a, const void *b);
 static void filemap_list_to_array(filemap_t *map);
 static bool check_file_excluded(const char *path, bool is_source);
@@ -146,6 +148,65 @@ filemap_create(void)
     filemap = map;
 }
 
+/* Look up or create entry for 'path' */
+static file_entry_t *
+get_filemap_entry(const char *path, bool create)
+{
+    filemap_t  *map = filemap;
+    file_entry_t *entry;
+    file_entry_t **e;
+    file_entry_t key;
+    file_entry_t *key_ptr;
+
+    if (map->array)
+    {
+        key.path = (char *) path;
+        key_ptr = &key;
+        e = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
+                    path_cmp);
+    }
+    else
+        e = NULL;
+
+    if (e)
+        entry = *e;
+    else if (!create)
+        entry = NULL;
+    else
+    {
+        /* Create a new entry for this file */
+        entry = pg_malloc(sizeof(file_entry_t));
+        entry->path = pg_strdup(path);
+        entry->isrelfile = isRelDataFile(path);
+        entry->action = FILE_ACTION_UNDECIDED;
+
+        entry->target_exists = false;
+        entry->target_type = FILE_TYPE_UNDEFINED;
+        entry->target_size = 0;
+        entry->target_link_target = NULL;
+        entry->target_modified_pages.bitmap = NULL;
+        entry->target_modified_pages.bitmapsize = 0;
+
+        entry->source_exists = false;
+        entry->source_type = FILE_TYPE_UNDEFINED;
+        entry->source_size = 0;
+        entry->source_link_target = NULL;
+
+        entry->next = NULL;
+
+        if (map->last)
+        {
+            map->last->next = entry;
+            map->last = entry;
+        }
+        else
+            map->first = map->last = entry;
+        map->nlist++;
+    }
+
+    return entry;
+}
+
 /*
  * Callback for processing source file list.
  *
@@ -154,25 +215,12 @@ filemap_create(void)
  * exists in the target and whether the size matches.
  */
 void
-process_source_file(const char *path, file_type_t type, size_t newsize,
+process_source_file(const char *path, file_type_t type, size_t size,
                     const char *link_target)
 {
-    bool        exists;
-    char        localpath[MAXPGPATH];
-    struct stat statbuf;
-    filemap_t  *map = filemap;
-    file_action_t action = FILE_ACTION_NONE;
-    size_t        oldsize = 0;
     file_entry_t *entry;
 
-    Assert(map->array == NULL);
-
-    /*
-     * Skip any files matching the exclusion filters. This has the effect to
-     * remove all those files on the target.
-     */
-    if (check_file_excluded(path, true))
-        return;
+    Assert(filemap->array == NULL);
 
     /*
      * Pretend that pg_wal is a directory, even if it's really a symlink. We
@@ -182,16 +230,6 @@ process_source_file(const char *path, file_type_t type, size_t newsize,
     if (strcmp(path, "pg_wal") == 0 && type == FILE_TYPE_SYMLINK)
         type = FILE_TYPE_DIRECTORY;
 
-    /*
-     * Skip temporary files, .../pgsql_tmp/... and .../pgsql_tmp.* in source.
-     * This has the effect that all temporary files in the destination will be
-     * removed.
-     */
-    if (strstr(path, "/" PG_TEMP_FILE_PREFIX) != NULL)
-        return;
-    if (strstr(path, "/" PG_TEMP_FILES_DIR "/") != NULL)
-        return;
-
     /*
      * sanity check: a filename that looks like a data file better be a
      * regular file
@@ -199,142 +237,12 @@ process_source_file(const char *path, file_type_t type, size_t newsize,
     if (type != FILE_TYPE_REGULAR && isRelDataFile(path))
         pg_fatal("data file \"%s\" in source is not a regular file", path);
 
-    snprintf(localpath, sizeof(localpath), "%s/%s", datadir_target, path);
-
-    /* Does the corresponding file exist in the target data dir? */
-    if (lstat(localpath, &statbuf) < 0)
-    {
-        if (errno != ENOENT)
-            pg_fatal("could not stat file \"%s\": %m",
-                     localpath);
-
-        exists = false;
-    }
-    else
-        exists = true;
-
-    switch (type)
-    {
-        case FILE_TYPE_DIRECTORY:
-            if (exists && !S_ISDIR(statbuf.st_mode) && strcmp(path, "pg_wal") != 0)
-            {
-                /* it's a directory in source, but not in target. Strange.. */
-                pg_fatal("\"%s\" is not a directory", localpath);
-            }
-
-            if (!exists)
-                action = FILE_ACTION_CREATE;
-            else
-                action = FILE_ACTION_NONE;
-            oldsize = 0;
-            break;
-
-        case FILE_TYPE_SYMLINK:
-            if (exists &&
-#ifndef WIN32
-                !S_ISLNK(statbuf.st_mode)
-#else
-                !pgwin32_is_junction(localpath)
-#endif
-                )
-            {
-                /*
-                 * It's a symbolic link in source, but not in target.
-                 * Strange..
-                 */
-                pg_fatal("\"%s\" is not a symbolic link", localpath);
-            }
-
-            if (!exists)
-                action = FILE_ACTION_CREATE;
-            else
-                action = FILE_ACTION_NONE;
-            oldsize = 0;
-            break;
-
-        case FILE_TYPE_REGULAR:
-            if (exists && !S_ISREG(statbuf.st_mode))
-                pg_fatal("\"%s\" is not a regular file", localpath);
-
-            if (!exists || !isRelDataFile(path))
-            {
-                /*
-                 * File exists in source, but not in target. Or it's a
-                 * non-data file that we have no special processing for. Copy
-                 * it in toto.
-                 *
-                 * An exception: PG_VERSIONs should be identical, but avoid
-                 * overwriting it for paranoia.
-                 */
-                if (pg_str_endswith(path, "PG_VERSION"))
-                {
-                    action = FILE_ACTION_NONE;
-                    oldsize = statbuf.st_size;
-                }
-                else
-                {
-                    action = FILE_ACTION_COPY;
-                    oldsize = 0;
-                }
-            }
-            else
-            {
-                /*
-                 * It's a data file that exists in both.
-                 *
-                 * If it's larger in target, we can truncate it. There will
-                 * also be a WAL record of the truncation in the source
-                 * system, so WAL replay would eventually truncate the target
-                 * too, but we might as well do it now.
-                 *
-                 * If it's smaller in the target, it means that it has been
-                 * truncated in the target, or enlarged in the source, or
-                 * both. If it was truncated in the target, we need to copy
-                 * the missing tail from the source system. If it was enlarged
-                 * in the source system, there will be WAL records in the
-                 * source system for the new blocks, so we wouldn't need to
-                 * copy them here. But we don't know which scenario we're
-                 * dealing with, and there's no harm in copying the missing
-                 * blocks now, so do it now.
-                 *
-                 * If it's the same size, do nothing here. Any blocks modified
-                 * in the target will be copied based on parsing the target
-                 * system's WAL, and any blocks modified in the source will be
-                 * updated after rewinding, when the source system's WAL is
-                 * replayed.
-                 */
-                oldsize = statbuf.st_size;
-                if (oldsize < newsize)
-                    action = FILE_ACTION_COPY_TAIL;
-                else if (oldsize > newsize)
-                    action = FILE_ACTION_TRUNCATE;
-                else
-                    action = FILE_ACTION_NONE;
-            }
-            break;
-    }
-
-    /* Create a new entry for this file */
-    entry = pg_malloc(sizeof(file_entry_t));
-    entry->path = pg_strdup(path);
-    entry->type = type;
-    entry->action = action;
-    entry->oldsize = oldsize;
-    entry->newsize = newsize;
-    entry->link_target = link_target ? pg_strdup(link_target) : NULL;
-    entry->next = NULL;
-    entry->pagemap.bitmap = NULL;
-    entry->pagemap.bitmapsize = 0;
-    entry->isrelfile = isRelDataFile(path);
-
-    if (map->last)
-    {
-        map->last->next = entry;
-        map->last = entry;
-    }
-    else
-        map->first = map->last = entry;
-    map->nlist++;
+    /* Remember this source file */
+    entry = get_filemap_entry(path, true);
+    entry->source_exists = true;
+    entry->source_type = type;
+    entry->source_size = size;
+    entry->source_link_target = link_target ? pg_strdup(link_target) : NULL;
 }
 
 /*
@@ -345,12 +253,9 @@ process_source_file(const char *path, file_type_t type, size_t newsize,
  * deletion.
  */
 void
-process_target_file(const char *path, file_type_t type, size_t oldsize,
+process_target_file(const char *path, file_type_t type, size_t size,
                     const char *link_target)
 {
-    bool        exists;
-    file_entry_t key;
-    file_entry_t *key_ptr;
     filemap_t  *map = filemap;
     file_entry_t *entry;
 
@@ -377,120 +282,76 @@ process_target_file(const char *path, file_type_t type, size_t oldsize,
     }
 
     /*
-     * Like in process_source_file, pretend that xlog is always a  directory.
+     * Like in process_source_file, pretend that pg_wal is always a directory.
      */
     if (strcmp(path, "pg_wal") == 0 && type == FILE_TYPE_SYMLINK)
         type = FILE_TYPE_DIRECTORY;
 
-    key.path = (char *) path;
-    key_ptr = &key;
-    exists = (bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
-                      path_cmp) != NULL);
-
-    /* Remove any file or folder that doesn't exist in the source system. */
-    if (!exists)
-    {
-        entry = pg_malloc(sizeof(file_entry_t));
-        entry->path = pg_strdup(path);
-        entry->type = type;
-        entry->action = FILE_ACTION_REMOVE;
-        entry->oldsize = oldsize;
-        entry->newsize = 0;
-        entry->link_target = link_target ? pg_strdup(link_target) : NULL;
-        entry->next = NULL;
-        entry->pagemap.bitmap = NULL;
-        entry->pagemap.bitmapsize = 0;
-        entry->isrelfile = isRelDataFile(path);
-
-        if (map->last == NULL)
-            map->first = entry;
-        else
-            map->last->next = entry;
-        map->last = entry;
-        map->nlist++;
-    }
-    else
-    {
-        /*
-         * We already handled all files that exist in the source system in
-         * process_source_file().
-         */
-    }
+    /* Remember this target file */
+    entry = get_filemap_entry(path, true);
+    entry->target_exists = true;
+    entry->target_type = type;
+    entry->target_size = size;
+    entry->target_link_target = link_target ? pg_strdup(link_target) : NULL;
 }
 
 /*
  * This callback gets called while we read the WAL in the target, for every
- * block that have changed in the target system. It makes note of all the
+ * block that have changed in the target system.  It makes note of all the
  * changed blocks in the pagemap of the file.
+ *
+ * NOTE: All the files on both systems must have already been added to the
+ * file map!
  */
 void
-process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno)
+process_target_wal_block_change(ForkNumber forknum, RelFileNode rnode,
+                                BlockNumber blkno)
 {
     char       *path;
-    file_entry_t key;
-    file_entry_t *key_ptr;
     file_entry_t *entry;
     BlockNumber blkno_inseg;
     int            segno;
-    filemap_t  *map = filemap;
-    file_entry_t **e;
 
-    Assert(map->array);
+    Assert(filemap->array);
 
     segno = blkno / RELSEG_SIZE;
     blkno_inseg = blkno % RELSEG_SIZE;
 
     path = datasegpath(rnode, forknum, segno);
-
-    key.path = (char *) path;
-    key_ptr = &key;
-
-    e = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
-                path_cmp);
-    if (e)
-        entry = *e;
-    else
-        entry = NULL;
+    entry = get_filemap_entry(path, false);
     pfree(path);
 
     if (entry)
     {
+        int64        end_offset;
+
         Assert(entry->isrelfile);
 
-        switch (entry->action)
-        {
-            case FILE_ACTION_NONE:
-            case FILE_ACTION_TRUNCATE:
-                /* skip if we're truncating away the modified block anyway */
-                if ((blkno_inseg + 1) * BLCKSZ <= entry->newsize)
-                    datapagemap_add(&entry->pagemap, blkno_inseg);
-                break;
-
-            case FILE_ACTION_COPY_TAIL:
-
-                /*
-                 * skip the modified block if it is part of the "tail" that
-                 * we're copying anyway.
-                 */
-                if ((blkno_inseg + 1) * BLCKSZ <= entry->oldsize)
-                    datapagemap_add(&entry->pagemap, blkno_inseg);
-                break;
-
-            case FILE_ACTION_COPY:
-            case FILE_ACTION_REMOVE:
-                break;
-
-            case FILE_ACTION_CREATE:
-                pg_fatal("unexpected page modification for directory or symbolic link \"%s\"", entry->path);
-        }
+        if (entry->target_type != FILE_TYPE_REGULAR)
+            pg_fatal("unexpected page modification for directory or symbolic link \"%s\"",
+                     entry->path);
+
+        /*
+         * If the block beyond the EOF in the source system, no need to
+         * remember it now, because we're going to truncate it away from the
+         * target anyway. Also no need to remember the block if it's beyond
+         * the current EOF in the target system; we will copy it over with the
+         * "tail" from the source system, anyway.
+         */
+        end_offset = (blkno_inseg + 1) * BLCKSZ;
+        if (end_offset <= entry->source_size &&
+            end_offset <= entry->target_size)
+            datapagemap_add(&entry->target_modified_pages, blkno_inseg);
     }
     else
     {
         /*
          * If we don't have any record of this file in the file map, it means
-         * that it's a relation that doesn't exist in the source system, and
-         * it was subsequently removed in the target system, too. We can
-         * safely ignore it.
+         * that it's a relation that doesn't exist in the source system.  It
+         * could exist in the target system; we haven't moved the target-only
+         * entries from the linked list to the array yet!  But in any case, if
+         * it doesn't exist in the source it will be removed from the target
+         * too, and we can safely ignore it.
          */
     }
 }
@@ -581,16 +442,6 @@ filemap_list_to_array(filemap_t *map)
     map->first = map->last = NULL;
 }
 
-void
-filemap_finalize(void)
-{
-    filemap_t  *map = filemap;
-
-    filemap_list_to_array(map);
-    qsort(map->array, map->narray, sizeof(file_entry_t *),
-          final_filemap_cmp);
-}
-
 static const char *
 action_to_str(file_action_t action)
 {
@@ -631,26 +482,26 @@ calculate_totals(void)
     {
         entry = map->array[i];
 
-        if (entry->type != FILE_TYPE_REGULAR)
+        if (entry->source_type != FILE_TYPE_REGULAR)
             continue;
 
-        map->total_size += entry->newsize;
+        map->total_size += entry->source_size;
 
         if (entry->action == FILE_ACTION_COPY)
         {
-            map->fetch_size += entry->newsize;
+            map->fetch_size += entry->source_size;
             continue;
         }
 
         if (entry->action == FILE_ACTION_COPY_TAIL)
-            map->fetch_size += (entry->newsize - entry->oldsize);
+            map->fetch_size += (entry->source_size - entry->target_size);
 
-        if (entry->pagemap.bitmapsize > 0)
+        if (entry->target_modified_pages.bitmapsize > 0)
         {
             datapagemap_iterator_t *iter;
             BlockNumber blk;
 
-            iter = datapagemap_iterate(&entry->pagemap);
+            iter = datapagemap_iterate(&entry->target_modified_pages);
             while (datapagemap_next(iter, &blk))
                 map->fetch_size += BLCKSZ;
 
@@ -670,13 +521,13 @@ print_filemap(void)
     {
         entry = map->array[i];
         if (entry->action != FILE_ACTION_NONE ||
-            entry->pagemap.bitmapsize > 0)
+            entry->target_modified_pages.bitmapsize > 0)
         {
             pg_log_debug("%s (%s)", entry->path,
                          action_to_str(entry->action));
 
-            if (entry->pagemap.bitmapsize > 0)
-                datapagemap_print(&entry->pagemap);
+            if (entry->target_modified_pages.bitmapsize > 0)
+                datapagemap_print(&entry->target_modified_pages);
         }
     }
     fflush(stdout);
@@ -825,3 +676,180 @@ final_filemap_cmp(const void *a, const void *b)
     else
         return strcmp(fa->path, fb->path);
 }
+
+/*
+ * Decide what action to perform to a file.
+ */
+static file_action_t
+decide_file_action(file_entry_t *entry)
+{
+    const char *path = entry->path;
+
+    /*
+     * Don't touch the control file. It is handled specially, after copying
+     * all the other files.
+     */
+    if (strcmp(path, "global/pg_control") == 0)
+        return FILE_ACTION_NONE;
+
+    /*
+     * Remove all files matching the exclusion filters in the target.
+     */
+    if (check_file_excluded(path, true))
+    {
+        if (entry->target_exists)
+            return FILE_ACTION_REMOVE;
+        else
+            return FILE_ACTION_NONE;
+    }
+
+    /*
+     * Also remove all temporary files, .../pgsql_tmp/... and .../pgsql_tmp.*
+     * in the target.
+     */
+    if (strstr(path, "/" PG_TEMP_FILE_PREFIX) != NULL ||
+        strstr(path, "/" PG_TEMP_FILES_DIR "/") != NULL)
+    {
+        if (entry->target_exists)
+            return FILE_ACTION_REMOVE;
+        else
+            return FILE_ACTION_NONE;
+    }
+
+    /*
+     * Handle cases where the file is missing from one of the systems.
+     */
+    if (!entry->target_exists && entry->source_exists)
+    {
+        /*
+         * File exists in source, but not in target. Copy it in toto. (If it's
+         * a relation data file, WAL replay after rewinding should re-create
+         * it anyway. But there's no harm in copying it now.)
+         */
+        switch (entry->source_type)
+        {
+            case FILE_TYPE_DIRECTORY:
+            case FILE_TYPE_SYMLINK:
+                return FILE_ACTION_CREATE;
+            case FILE_TYPE_REGULAR:
+                return FILE_ACTION_COPY;
+            case FILE_TYPE_UNDEFINED:
+                pg_fatal("unknown file type for \"%s\"", entry->path);
+                break;
+        }
+    }
+    else if (entry->target_exists && !entry->source_exists)
+    {
+        /* File exists in target, but not source. Remove it. */
+        return FILE_ACTION_REMOVE;
+    }
+    else if (!entry->target_exists && !entry->source_exists)
+    {
+        /*
+         * Doesn't exist in either server. Why does it have an entry in the
+         * first place??
+         */
+        return FILE_ACTION_NONE;
+    }
+
+    /*
+     * Otherwise, the file exists on both systems
+     */
+    Assert(entry->target_exists && entry->source_exists);
+
+    if (entry->source_type != entry->target_type)
+    {
+        /* But it's a different kind of object. Strange.. */
+        pg_fatal("file \"%s\" is of different type in source and target", entry->path);
+    }
+
+    /*
+     * PG_VERSION files should be identical on both systems, but avoid
+     * overwriting them for paranoia.
+     */
+    if (pg_str_endswith(entry->path, "PG_VERSION"))
+        return FILE_ACTION_NONE;
+
+    switch (entry->source_type)
+    {
+        case FILE_TYPE_DIRECTORY:
+            return FILE_ACTION_NONE;
+
+        case FILE_TYPE_SYMLINK:
+            /* FIXME: Check if it points to the same target? */
+            return FILE_ACTION_NONE;
+
+        case FILE_TYPE_REGULAR:
+            if (!entry->isrelfile)
+            {
+                /*
+                 * It's a non-data file that we have no special processing
+                 * for. Copy it in toto.
+                 */
+                return FILE_ACTION_COPY;
+            }
+            else
+            {
+                /*
+                 * It's a data file that exists in both systems.
+                 *
+                 * If it's larger in target, we can truncate it. There will
+                 * also be a WAL record of the truncation in the source
+                 * system, so WAL replay would eventually truncate the target
+                 * too, but we might as well do it now.
+                 *
+                 * If it's smaller in the target, it means that it has been
+                 * truncated in the target, or enlarged in the source, or
+                 * both. If it was truncated in the target, we need to copy
+                 * the missing tail from the source system. If it was enlarged
+                 * in the source system, there will be WAL records in the
+                 * source system for the new blocks, so we wouldn't need to
+                 * copy them here. But we don't know which scenario we're
+                 * dealing with, and there's no harm in copying the missing
+                 * blocks now, so do it now.
+                 *
+                 * If it's the same size, do nothing here. Any blocks modified
+                 * in the target will be copied based on parsing the target
+                 * system's WAL, and any blocks modified in the source will be
+                 * updated after rewinding, when the source system's WAL is
+                 * replayed.
+                 */
+                if (entry->target_size < entry->source_size)
+                    return FILE_ACTION_COPY_TAIL;
+                else if (entry->target_size > entry->source_size)
+                    return FILE_ACTION_TRUNCATE;
+                else
+                    return FILE_ACTION_NONE;
+            }
+            break;
+
+        case FILE_TYPE_UNDEFINED:
+            pg_fatal("unknown file type for \"%s\"", path);
+            break;
+    }
+
+    /* unreachable */
+    pg_fatal("could not decide what to do with file \"%s\"", path);
+}
+
+/*
+ * Decide what to do with each file.
+ */
+void
+filemap_finalize()
+{
+    int            i;
+
+    filemap_list_to_array(filemap);
+
+    for (i = 0; i < filemap->narray; i++)
+    {
+        file_entry_t *entry = filemap->array[i];
+
+        entry->action = decide_file_action(entry);
+    }
+
+    /* Sort the actions to the order that they should be performed */
+    qsort(filemap->array, filemap->narray, sizeof(file_entry_t *),
+          final_filemap_cmp);
+}
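To review the relation-file branch of decide_file_action() in isolation, here is a minimal standalone sketch of only the size comparison. It assumes the file already exists on both systems as a regular relation data file. The enum and function names are hypothetical stand-ins, not pg_rewind's own.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the subset of file_action_t used here. */
typedef enum
{
    SKETCH_COPY_TAIL,           /* fetch bytes [target_size, source_size) */
    SKETCH_NONE,                /* leave it to WAL-based block copying */
    SKETCH_TRUNCATE             /* truncate the target to source_size */
} sketch_action_t;

/* Size comparison for a relation data file present on both systems. */
static sketch_action_t
decide_relfile_action(size_t target_size, size_t source_size)
{
    if (target_size < source_size)
        return SKETCH_COPY_TAIL;
    else if (target_size > source_size)
        return SKETCH_TRUNCATE;
    else
        return SKETCH_NONE;
}

/* Number of bytes a COPY_TAIL action would fetch (0 for other actions). */
static size_t
tail_fetch_bytes(size_t target_size, size_t source_size)
{
    return target_size < source_size ? source_size - target_size : 0;
}
```

The tail byte count matches what calculate_totals() adds to fetch_size for a COPY_TAIL entry, namely source_size - target_size.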
diff --git a/src/bin/pg_rewind/filemap.h b/src/bin/pg_rewind/filemap.h
index 0cb7425170..a5e8df57f4 100644
--- a/src/bin/pg_rewind/filemap.h
+++ b/src/bin/pg_rewind/filemap.h
@@ -14,17 +14,21 @@
 
 /*
  * For every file found in the local or remote system, we have a file entry
- * which says what we are going to do with the file. For relation files,
- * there is also a page map, marking pages in the file that were changed
- * locally.
- *
- * The enum values are sorted in the order we want actions to be processed.
+ * that contains information about the file on both systems.  For relation
+ * files, there is also a page map that marks pages in the file that were
+ * changed in the target after the last common checkpoint.  Each entry also
+ * contains an 'action' field, which says what we are going to do with the
+ * file.
  */
+
+/* these enum values are sorted in the order we want actions to be processed */
 typedef enum
 {
+    FILE_ACTION_UNDECIDED = 0,    /* not decided yet */
+
     FILE_ACTION_CREATE,            /* create local directory or symbolic link */
     FILE_ACTION_COPY,            /* copy whole file, overwriting if exists */
-    FILE_ACTION_COPY_TAIL,        /* copy tail from 'oldsize' to 'newsize' */
+    FILE_ACTION_COPY_TAIL,        /* copy tail from 'target_size' to 'source_size' */
     FILE_ACTION_NONE,            /* no action (we might still copy modified
                                  * blocks based on the parsed WAL) */
     FILE_ACTION_TRUNCATE,        /* truncate local file to 'newsize' bytes */
@@ -33,6 +37,8 @@ typedef enum
 
 typedef enum
 {
+    FILE_TYPE_UNDEFINED = 0,
+
     FILE_TYPE_REGULAR,
     FILE_TYPE_DIRECTORY,
     FILE_TYPE_SYMLINK
@@ -41,19 +47,30 @@ typedef enum
 typedef struct file_entry_t
 {
     char       *path;
-    file_type_t type;
-
-    file_action_t action;
-
-    /* for a regular file */
-    size_t        oldsize;
-    size_t        newsize;
     bool        isrelfile;        /* is it a relation data file? */
 
-    datapagemap_t pagemap;
+    /*
+     * Status of the file in the target.
+     */
+    bool        target_exists;
+    file_type_t target_type;
+    size_t        target_size; /* for a regular file */
+    char       *target_link_target; /* for a symlink */
 
-    /* for a symlink */
-    char       *link_target;
+    datapagemap_t target_modified_pages;
+
+    /*
+     * Status of the file in the source.
+     */
+    bool        source_exists;
+    file_type_t source_type;
+    size_t        source_size;
+    char       *source_link_target; /* for a symlink */
+
+    /*
+     * What will we do to the file?
+     */
+    file_action_t action;
 
     struct file_entry_t *next;
 } file_entry_t;
@@ -70,20 +87,19 @@ typedef struct filemap_t
 
     /*
      * After processing all the remote files, the entries in the linked list
-     * are moved to this array. After processing local files, too, all the
+     * are moved to this array.  After processing local files, too, all the
      * local entries are added to the array by filemap_finalize, and sorted in
-     * the final order. After filemap_finalize, all the entries are in the
+     * the final order.  After filemap_finalize, all the entries are in the
      * array, and the linked list is empty.
      */
     file_entry_t **array;
     int            narray;            /* current length of array */
 
     /*
-     * Summary information. total_size is the total size of the source
-     * cluster, and fetch_size is the number of bytes that needs to be copied.
+     * Summary information.
      */
-    uint64        total_size;
-    uint64        fetch_size;
+    uint64        total_size;        /* total size of the source cluster */
+    uint64        fetch_size;        /* number of bytes that needs to be copied */
 } filemap_t;
 
 extern filemap_t *filemap;
@@ -94,11 +110,12 @@ extern void print_filemap(void);
 
 /* Functions for populating the filemap */
 extern void process_source_file(const char *path, file_type_t type,
-                                size_t newsize, const char *link_target);
+                                size_t size, const char *link_target);
 extern void process_target_file(const char *path, file_type_t type,
-                                size_t newsize, const char *link_target);
-extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
-                                 BlockNumber blkno);
+                                size_t size, const char *link_target);
+extern void process_target_wal_block_change(ForkNumber forknum,
+                                            RelFileNode rnode,
+                                            BlockNumber blkno);
 extern void filemap_finalize(void);
 
 #endif                            /* FILEMAP_H */
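The comment on file_action_t says the enum order is the order in which actions are executed. The following is a minimal sketch of a qsort comparator built on that idea. The names are hypothetical. REMOVE is assumed to be the last enum value, and removals are assumed to run in reverse path order so that a directory's contents are removed before the directory itself. It is modelled on final_filemap_cmp(), not copied from it.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical actions; declaration order is execution order. */
typedef enum
{
    ACT_CREATE,
    ACT_COPY,
    ACT_COPY_TAIL,
    ACT_NONE,
    ACT_TRUNCATE,
    ACT_REMOVE                  /* assumed to be last */
} act_t;

typedef struct
{
    const char *path;
    act_t       action;
} entry_t;

/* qsort comparator over an array of entry_t pointers. */
static int
entry_cmp(const void *a, const void *b)
{
    const entry_t *fa = *(const entry_t *const *) a;
    const entry_t *fb = *(const entry_t *const *) b;

    if (fa->action != fb->action)
        return fa->action < fb->action ? -1 : 1;

    /* Reverse path order for removals: "dir/file" sorts before "dir". */
    if (fa->action == ACT_REMOVE)
        return strcmp(fb->path, fa->path);
    return strcmp(fa->path, fb->path);
}

static void
sort_entries(entry_t **entries, size_t n)
{
    qsort(entries, n, sizeof(entry_t *), entry_cmp);
}
```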
diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_fetch.c
index bf4dfc23b9..7fc9161b8c 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_fetch.c
@@ -465,7 +465,7 @@ libpq_executeFileMap(filemap_t *map)
         entry = map->array[i];
 
         /* If this is a relation file, copy the modified blocks */
-        execute_pagemap(&entry->pagemap, entry->path);
+        execute_pagemap(&entry->target_modified_pages, entry->path);
 
         switch (entry->action)
         {
@@ -476,15 +476,15 @@ libpq_executeFileMap(filemap_t *map)
             case FILE_ACTION_COPY:
                 /* Truncate the old file out of the way, if any */
                 open_target_file(entry->path, true);
-                fetch_file_range(entry->path, 0, entry->newsize);
+                fetch_file_range(entry->path, 0, entry->source_size);
                 break;
 
             case FILE_ACTION_TRUNCATE:
-                truncate_target_file(entry->path, entry->newsize);
+                truncate_target_file(entry->path, entry->source_size);
                 break;
 
             case FILE_ACTION_COPY_TAIL:
-                fetch_file_range(entry->path, entry->oldsize, entry->newsize);
+                fetch_file_range(entry->path, entry->target_size, entry->source_size);
                 break;
 
             case FILE_ACTION_REMOVE:
@@ -494,6 +494,10 @@ libpq_executeFileMap(filemap_t *map)
             case FILE_ACTION_CREATE:
                 create_target(entry);
                 break;
+
+            case FILE_ACTION_UNDECIDED:
+                pg_fatal("no action decided for \"%s\"", entry->path);
+                break;
         }
     }
 
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 2229c86f9a..2baeb74ae9 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -436,6 +436,6 @@ extractPageInfo(XLogReaderState *record)
         if (forknum != MAIN_FORKNUM)
             continue;
 
-        process_block_change(forknum, rnode, blkno);
+        process_target_wal_block_change(forknum, rnode, blkno);
     }
 }
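process_target_wal_block_change() maps each block that extractPageInfo() passes to it onto a segment file, then checks the block against both file sizes. Below is a small standalone sketch of that arithmetic. It assumes the default build values BLCKSZ = 8192 and RELSEG_SIZE = 131072 (1 GB segments). The helper names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Default build values; a given build may be configured differently. */
#define SKETCH_BLCKSZ       8192
#define SKETCH_RELSEG_SIZE  131072  /* blocks per 1 GB segment file */

/* Split a relation block number into segment number and block-in-segment. */
static void
split_block(uint32_t blkno, uint32_t *segno, uint32_t *blkno_inseg)
{
    *segno = blkno / SKETCH_RELSEG_SIZE;
    *blkno_inseg = blkno % SKETCH_RELSEG_SIZE;
}

/*
 * Should a WAL-modified block be recorded in the page map?  Only if the
 * whole block lies within the file on both systems.  Otherwise truncation
 * or tail copying already takes care of it.
 */
static bool
block_needs_copy(uint32_t blkno_inseg, uint64_t source_size,
                 uint64_t target_size)
{
    uint64_t    end_offset = ((uint64_t) blkno_inseg + 1) * SKETCH_BLCKSZ;

    return end_offset <= source_size && end_offset <= target_size;
}
```

With these defaults, block 131073 of a relation lands in segment 1, at block 1 within that segment.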
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 5a7ab764db..e0ed1759cb 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -369,7 +369,7 @@ main(int argc, char **argv)
                 chkpttli);
 
     /*
-     * Build the filemap, by comparing the source and target data directories.
+     * Collect information about all files in the target and source systems.
      */
     filemap_create();
     if (showprogress)
@@ -390,8 +390,12 @@ main(int argc, char **argv)
         pg_log_info("reading WAL in target");
     extractPageMap(datadir_target, chkptrec, lastcommontliIndex,
                    ControlFile_target.checkPoint, restore_command);
+
+    /*
+     * We have collected all information we need from both systems. Decide
+     * what to do with each file.
+     */
     filemap_finalize();
-
     if (showprogress)
         calculate_totals();
 
-- 
2.18.4

From 8c5bc72c286a5ca97a3df8e12d8c0fdf35456ead Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 19 Aug 2020 15:34:39 +0300
Subject: [PATCH v3 3/5] pg_rewind: Replace the hybrid list+array data
 structure with simplehash.

Now that simplehash can be used in frontend code, let's make use of it.
---
 src/bin/pg_rewind/copy_fetch.c  |   4 +-
 src/bin/pg_rewind/fetch.c       |   2 +-
 src/bin/pg_rewind/fetch.h       |   2 +-
 src/bin/pg_rewind/filemap.c     | 286 ++++++++++++++------------------
 src/bin/pg_rewind/filemap.h     |  67 +++-----
 src/bin/pg_rewind/libpq_fetch.c |   4 +-
 src/bin/pg_rewind/pg_rewind.c   |  12 +-
 7 files changed, 168 insertions(+), 209 deletions(-)
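
For readers unfamiliar with simplehash, the following sketch shows the insert-or-lookup pattern that insert_filehash_entry() uses, built on a tiny fixed-size, open-addressing table. This is not simplehash.h. It uses FNV-1a in place of hash_bytes(), never grows, and all names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_CAPACITY 64      /* power of two; this sketch never grows */

typedef struct
{
    bool        in_use;         /* plays the role of simplehash's 'status' */
    const char *path;           /* key; the caller keeps the string alive */
    bool        source_exists;
    bool        target_exists;
} sketch_entry;

static sketch_entry sketch_table[SKETCH_CAPACITY];

/* FNV-1a string hash, used here in place of PostgreSQL's hash_bytes(). */
static uint32_t
sketch_hash(const char *s)
{
    uint32_t    h = 2166136261u;

    for (; *s != '\0'; s++)
    {
        h ^= (unsigned char) *s;
        h *= 16777619u;
    }
    return h;
}

/*
 * Return the entry for 'path', creating an empty one if it is not there
 * yet.  *found reports which case happened.  Returns NULL if the table is full.
 */
static sketch_entry *
sketch_insert(const char *path, bool *found)
{
    uint32_t    i = sketch_hash(path) & (SKETCH_CAPACITY - 1);

    for (int n = 0; n < SKETCH_CAPACITY; n++)
    {
        sketch_entry *e = &sketch_table[i];

        if (!e->in_use)
        {
            e->in_use = true;
            e->path = path;
            e->source_exists = false;
            e->target_exists = false;
            *found = false;
            return e;
        }
        if (strcmp(e->path, path) == 0)
        {
            *found = true;
            return e;
        }
        i = (i + 1) & (SKETCH_CAPACITY - 1);    /* linear probing */
    }
    return NULL;
}

/* Return the entry for 'path', or NULL if it was never inserted. */
static sketch_entry *
sketch_lookup(const char *path)
{
    uint32_t    i = sketch_hash(path) & (SKETCH_CAPACITY - 1);

    for (int n = 0; n < SKETCH_CAPACITY && sketch_table[i].in_use; n++)
    {
        if (strcmp(sketch_table[i].path, path) == 0)
            return &sketch_table[i];
        i = (i + 1) & (SKETCH_CAPACITY - 1);
    }
    return NULL;
}
```

Duplicate detection, as done in process_source_file(), then amounts to checking the returned entry's source_exists flag after the insert.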

diff --git a/src/bin/pg_rewind/copy_fetch.c b/src/bin/pg_rewind/copy_fetch.c
index 18fad32600..61aed8018b 100644
--- a/src/bin/pg_rewind/copy_fetch.c
+++ b/src/bin/pg_rewind/copy_fetch.c
@@ -207,9 +207,9 @@ copy_executeFileMap(filemap_t *map)
     file_entry_t *entry;
     int            i;
 
-    for (i = 0; i < map->narray; i++)
+    for (i = 0; i < map->nactions; i++)
     {
-        entry = map->array[i];
+        entry = map->actions[i];
         execute_pagemap(&entry->target_modified_pages, entry->path);
 
         switch (entry->action)
diff --git a/src/bin/pg_rewind/fetch.c b/src/bin/pg_rewind/fetch.c
index f18fe5386e..f41d0f295e 100644
--- a/src/bin/pg_rewind/fetch.c
+++ b/src/bin/pg_rewind/fetch.c
@@ -37,7 +37,7 @@ fetchSourceFileList(void)
  * Fetch all relation data files that are marked in the given data page map.
  */
 void
-executeFileMap(void)
+execute_file_actions(filemap_t *filemap)
 {
     if (datadir_source)
         copy_executeFileMap(filemap);
diff --git a/src/bin/pg_rewind/fetch.h b/src/bin/pg_rewind/fetch.h
index 7cf8b6ea09..b20df8b153 100644
--- a/src/bin/pg_rewind/fetch.h
+++ b/src/bin/pg_rewind/fetch.h
@@ -25,7 +25,7 @@
  */
 extern void fetchSourceFileList(void);
 extern char *fetchFile(const char *filename, size_t *filesize);
-extern void executeFileMap(void);
+extern void execute_file_actions(filemap_t *filemap);
 
 /* in libpq_fetch.c */
 extern void libpqProcessFileList(void);
diff --git a/src/bin/pg_rewind/filemap.c b/src/bin/pg_rewind/filemap.c
index 6d19d2be61..e6e037d1cd 100644
--- a/src/bin/pg_rewind/filemap.c
+++ b/src/bin/pg_rewind/filemap.c
@@ -3,6 +3,19 @@
  * filemap.c
  *      A data structure for keeping track of files that have changed.
  *
+ * This source file contains the logic to decide what to do with different
+ * kinds of files, and the data structure to support it.  Before modifying
+ * anything, pg_rewind collects information about all the files and their
+ * attributes in the target and source data directories.  It also scans the
+ * WAL in the target, and collects information about data blocks that
+ * were changed.  All this information is stored in a hash table, using the
+ * file path, relative to the root of the data directory, as the key.
+ *
+ * After collecting all the information required, the filemap_finalize()
+ * function scans the hash table and decides what action needs to be taken
+ * for each file.  Finally, it copies the entries into an array, sorted in
+ * the order in which the actions should be executed.
+ *
  * Copyright (c) 2013-2020, PostgreSQL Global Development Group
  *
  *-------------------------------------------------------------------------
@@ -14,22 +27,39 @@
 #include <unistd.h>
 
 #include "catalog/pg_tablespace_d.h"
+#include "common/hashfn.h"
 #include "common/string.h"
 #include "datapagemap.h"
 #include "filemap.h"
 #include "pg_rewind.h"
 #include "storage/fd.h"
 
-filemap_t  *filemap = NULL;
+/*
+ * Define a hash table which we can use to store information about the files
+ * in the source and target data directories, keyed by their relative path.
+ */
+static uint32 hash_string_pointer(const char *s);
+#define SH_PREFIX        filehash
+#define SH_ELEMENT_TYPE    file_entry_t
+#define SH_KEY_TYPE        const char *
+#define    SH_KEY            path
+#define SH_HASH_KEY(tb, key)    hash_string_pointer(key)
+#define SH_EQUAL(tb, a, b)        (strcmp(a, b) == 0)
+#define    SH_SCOPE        static inline
+#define SH_RAW_ALLOCATOR    pg_malloc0
+#define SH_DECLARE
+#define SH_DEFINE
+#include "lib/simplehash.h"
+
+static filehash_hash *filehash;
 
 static bool isRelDataFile(const char *path);
 static char *datasegpath(RelFileNode rnode, ForkNumber forknum,
                          BlockNumber segno);
-static int    path_cmp(const void *a, const void *b);
 
-static file_entry_t *get_filemap_entry(const char *path, bool create);
+static file_entry_t *insert_filehash_entry(const char *path);
+static file_entry_t *lookup_filehash_entry(const char *path);
 static int    final_filemap_cmp(const void *a, const void *b);
-static void filemap_list_to_array(filemap_t *map);
 static bool check_file_excluded(const char *path, bool is_source);
 
 /*
@@ -131,54 +161,26 @@ static const struct exclude_list_item excludeFiles[] =
 };
 
 /*
- * Create a new file map (stored in the global pointer "filemap").
+ * Initialize the hash table for the file map.
  */
 void
-filemap_create(void)
+filemap_init(void)
 {
-    filemap_t  *map;
-
-    map = pg_malloc(sizeof(filemap_t));
-    map->first = map->last = NULL;
-    map->nlist = 0;
-    map->array = NULL;
-    map->narray = 0;
-
-    Assert(filemap == NULL);
-    filemap = map;
+    filehash = filehash_create(1000, NULL);
 }
 
-/* Look up or create entry for 'path' */
+/* Look up entry for 'path', creating a new one if it doesn't exist */
 static file_entry_t *
-get_filemap_entry(const char *path, bool create)
+insert_filehash_entry(const char *path)
 {
-    filemap_t  *map = filemap;
     file_entry_t *entry;
-    file_entry_t **e;
-    file_entry_t key;
-    file_entry_t *key_ptr;
+    bool        found;
 
-    if (map->array)
+    entry = filehash_insert(filehash, path, &found);
+    if (!found)
     {
-        key.path = (char *) path;
-        key_ptr = &key;
-        e = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
-                    path_cmp);
-    }
-    else
-        e = NULL;
-
-    if (e)
-        entry = *e;
-    else if (!create)
-        entry = NULL;
-    else
-    {
-        /* Create a new entry for this file */
-        entry = pg_malloc(sizeof(file_entry_t));
         entry->path = pg_strdup(path);
         entry->isrelfile = isRelDataFile(path);
-        entry->action = FILE_ACTION_UNDECIDED;
 
         entry->target_exists = false;
         entry->target_type = FILE_TYPE_UNDEFINED;
@@ -192,21 +194,18 @@ get_filemap_entry(const char *path, bool create)
         entry->source_size = 0;
         entry->source_link_target = NULL;
 
-        entry->next = NULL;
-
-        if (map->last)
-        {
-            map->last->next = entry;
-            map->last = entry;
-        }
-        else
-            map->first = map->last = entry;
-        map->nlist++;
+        entry->action = FILE_ACTION_UNDECIDED;
     }
 
     return entry;
 }
 
+static file_entry_t *
+lookup_filehash_entry(const char *path)
+{
+    return filehash_lookup(filehash, path);
+}
+
 /*
  * Callback for processing source file list.
  *
@@ -220,8 +219,6 @@ process_source_file(const char *path, file_type_t type, size_t size,
 {
     file_entry_t *entry;
 
-    Assert(filemap->array == NULL);
-
     /*
      * Pretend that pg_wal is a directory, even if it's really a symlink. We
      * don't want to mess with the symlink itself, nor complain if it's a
@@ -238,7 +235,9 @@ process_source_file(const char *path, file_type_t type, size_t size,
         pg_fatal("data file \"%s\" in source is not a regular file", path);
 
     /* Remember this source file */
-    entry = get_filemap_entry(path, true);
+    entry = insert_filehash_entry(path);
+    if (entry->source_exists)
+        pg_fatal("duplicate source file \"%s\"", path);
     entry->source_exists = true;
     entry->source_type = type;
     entry->source_size = size;
@@ -256,7 +255,6 @@ void
 process_target_file(const char *path, file_type_t type, size_t size,
                     const char *link_target)
 {
-    filemap_t  *map = filemap;
     file_entry_t *entry;
 
     /*
@@ -265,22 +263,6 @@ process_target_file(const char *path, file_type_t type, size_t size,
      * the source data folder when processing the source files.
      */
 
-    if (map->array == NULL)
-    {
-        /* on first call, initialize lookup array */
-        if (map->nlist == 0)
-        {
-            /* should not happen */
-            pg_fatal("source file list is empty");
-        }
-
-        filemap_list_to_array(map);
-
-        Assert(map->array != NULL);
-
-        qsort(map->array, map->narray, sizeof(file_entry_t *), path_cmp);
-    }
-
     /*
      * Like in process_source_file, pretend that pg_wal is always a directory.
      */
@@ -288,7 +270,9 @@ process_target_file(const char *path, file_type_t type, size_t size,
         type = FILE_TYPE_DIRECTORY;
 
     /* Remember this target file */
-    entry = get_filemap_entry(path, true);
+    entry = insert_filehash_entry(path);
+    if (entry->target_exists)
+        pg_fatal("duplicate target file \"%s\"", path);
     entry->target_exists = true;
     entry->target_type = type;
     entry->target_size = size;
@@ -301,7 +285,7 @@ process_target_file(const char *path, file_type_t type, size_t size,
  * changed blocks in the pagemap of the file.
  *
  * NOTE: All the files on both systems must have already been added to the
- * file map!
+ * hash table!
  */
 void
 process_target_wal_block_change(ForkNumber forknum, RelFileNode rnode,
@@ -312,47 +296,45 @@ process_target_wal_block_change(ForkNumber forknum, RelFileNode rnode,
     BlockNumber blkno_inseg;
     int            segno;
 
-    Assert(filemap->array);
-
     segno = blkno / RELSEG_SIZE;
     blkno_inseg = blkno % RELSEG_SIZE;
 
     path = datasegpath(rnode, forknum, segno);
-    entry = get_filemap_entry(path, false);
+    entry = lookup_filehash_entry(path);
     pfree(path);
 
+    /*
+     * If the block still exists in both systems, remember it. Otherwise we
+     * can safely ignore it.
+     *
+     * If the block is beyond the EOF in the source system, or the file
+     * doesn't exist in the source at all, we're going to truncate/remove it
+     * from the target anyway. Likewise, if it doesn't exist in the target
+     * anymore, we will copy it over with the "tail" from the source system,
+     * anyway.
+     *
+     * It is possible to find WAL for a file that doesn't exist on either
+     * system anymore. It means that the relation was dropped later in the
+     * target system, and independently on the source system too, or that
+     * it was created and dropped in the target system and it never existed
+     * in the source. Either way, we can safely ignore it.
+     */
     if (entry)
     {
-        int64        end_offset;
-
         Assert(entry->isrelfile);
 
         if (entry->target_type != FILE_TYPE_REGULAR)
             pg_fatal("unexpected page modification for directory or symbolic link \"%s\"",
                      entry->path);
 
-        /*
-         * If the block beyond the EOF in the source system, no need to
-         * remember it now, because we're going to truncate it away from the
-         * target anyway. Also no need to remember the block if it's beyond
-         * the current EOF in the target system; we will copy it over with the
-         * "tail" from the source system, anyway.
-         */
-        end_offset = (blkno_inseg + 1) * BLCKSZ;
-        if (end_offset <= entry->source_size &&
-            end_offset <= entry->target_size)
-            datapagemap_add(&entry->target_modified_pages, blkno_inseg);
-    }
-    else
-    {
-        /*
-         * If we don't have any record of this file in the file map, it means
-         * that it's a relation that doesn't exist in the source system.  It
-         * could exist in the target system; we haven't moved the target-only
-         * entries from the linked list to the array yet!  But in any case, if
-         * it doesn't exist in the source it will be removed from the target
-         * too, and we can safely ignore it.
-         */
+        if (entry->target_exists && entry->source_exists)
+        {
+            off_t        end_offset;
+
+            end_offset = (blkno_inseg + 1) * BLCKSZ;
+            if (end_offset <= entry->source_size && end_offset <= entry->target_size)
+                datapagemap_add(&entry->target_modified_pages, blkno_inseg);
+        }
     }
 }
 
@@ -414,34 +396,6 @@ check_file_excluded(const char *path, bool is_source)
     return false;
 }
 
-/*
- * Convert the linked list of entries in map->first/last to the array,
- * map->array.
- */
-static void
-filemap_list_to_array(filemap_t *map)
-{
-    int            narray;
-    file_entry_t *entry,
-               *next;
-
-    map->array = (file_entry_t **)
-        pg_realloc(map->array,
-                   (map->nlist + map->narray) * sizeof(file_entry_t *));
-
-    narray = map->narray;
-    for (entry = map->first; entry != NULL; entry = next)
-    {
-        map->array[narray++] = entry;
-        next = entry->next;
-        entry->next = NULL;
-    }
-    Assert(narray == map->nlist + map->narray);
-    map->narray = narray;
-    map->nlist = 0;
-    map->first = map->last = NULL;
-}
-
 static const char *
 action_to_str(file_action_t action)
 {
@@ -469,32 +423,31 @@ action_to_str(file_action_t action)
  * Calculate the totals needed for progress reports.
  */
 void
-calculate_totals(void)
+calculate_totals(filemap_t *filemap)
 {
     file_entry_t *entry;
     int            i;
-    filemap_t  *map = filemap;
 
-    map->total_size = 0;
-    map->fetch_size = 0;
+    filemap->total_size = 0;
+    filemap->fetch_size = 0;
 
-    for (i = 0; i < map->narray; i++)
+    for (i = 0; i < filemap->nactions; i++)
     {
-        entry = map->array[i];
+        entry = filemap->actions[i];
 
         if (entry->source_type != FILE_TYPE_REGULAR)
             continue;
 
-        map->total_size += entry->source_size;
+        filemap->total_size += entry->source_size;
 
         if (entry->action == FILE_ACTION_COPY)
         {
-            map->fetch_size += entry->source_size;
+            filemap->fetch_size += entry->source_size;
             continue;
         }
 
         if (entry->action == FILE_ACTION_COPY_TAIL)
-            map->fetch_size += (entry->source_size - entry->target_size);
+            filemap->fetch_size += (entry->source_size - entry->target_size);
 
         if (entry->target_modified_pages.bitmapsize > 0)
         {
@@ -503,7 +456,7 @@ calculate_totals(void)
 
             iter = datapagemap_iterate(&entry->target_modified_pages);
             while (datapagemap_next(iter, &blk))
-                map->fetch_size += BLCKSZ;
+                filemap->fetch_size += BLCKSZ;
 
             pg_free(iter);
         }
@@ -511,15 +464,14 @@ calculate_totals(void)
 }
 
 void
-print_filemap(void)
+print_filemap(filemap_t *filemap)
 {
-    filemap_t  *map = filemap;
     file_entry_t *entry;
     int            i;
 
-    for (i = 0; i < map->narray; i++)
+    for (i = 0; i < filemap->nactions; i++)
     {
-        entry = map->array[i];
+        entry = filemap->actions[i];
         if (entry->action != FILE_ACTION_NONE ||
             entry->target_modified_pages.bitmapsize > 0)
         {
@@ -641,15 +593,6 @@ datasegpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
         return path;
 }
 
-static int
-path_cmp(const void *a, const void *b)
-{
-    file_entry_t *fa = *((file_entry_t **) a);
-    file_entry_t *fb = *((file_entry_t **) b);
-
-    return strcmp(fa->path, fb->path);
-}
-
 /*
  * In the final stage, the filemap is sorted so that removals come last.
  * From disk space usage point of view, it would be better to do removals
@@ -835,21 +778,48 @@ decide_file_action(file_entry_t *entry)
 /*
  * Decide what to do with each file.
  */
-void
+filemap_t *
 filemap_finalize()
 {
     int            i;
+    filehash_iterator it;
+    file_entry_t *entry;
+    filemap_t  *filemap;
 
-    filemap_list_to_array(filemap);
-
-    for (i = 0; i < filemap->narray; i++)
+    filehash_start_iterate(filehash, &it);
+    while ((entry = filehash_iterate(filehash, &it)) != NULL)
     {
-        file_entry_t *entry = filemap->array[i];
-
         entry->action = decide_file_action(entry);
     }
 
-    /* Sort the actions to the order that they should be performed */
-    qsort(filemap->array, filemap->narray, sizeof(file_entry_t *),
+    /*
+     * Turn the hash table into an array, sorted in the order that the actions
+     * should be performed.
+     */
+    filemap = pg_malloc(offsetof(filemap_t, actions) +
+                        filehash->members * sizeof(file_entry_t *));
+    filemap->nactions = filehash->members;
+    filehash_start_iterate(filehash, &it);
+    i = 0;
+    while ((entry = filehash_iterate(filehash, &it)) != NULL)
+    {
+        filemap->actions[i++] = entry;
+    }
+
+    qsort(filemap->actions, filemap->nactions, sizeof(file_entry_t *),
           final_filemap_cmp);
+
+    return filemap;
+}
+
+
+/*
+ * Helper function for filemap hash table.
+ */
+static uint32
+hash_string_pointer(const char *s)
+{
+    unsigned char *ss = (unsigned char *) s;
+
+    return hash_bytes(ss, strlen(s));
 }
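filemap_finalize() allocates filemap_t and its trailing 'actions' array in a single pg_malloc() call sized with offsetof(), then sorts the pointer array in place. Below is a minimal sketch of the same pattern in plain C99, with malloc() in place of pg_malloc(). The type and function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

typedef struct
{
    const char *path;
    int         action;         /* smaller value = executed earlier */
} fa_entry;

typedef struct
{
    int         nactions;
    fa_entry   *actions[];      /* C99 flexible array member */
} fa_map;

static int
fa_cmp(const void *a, const void *b)
{
    const fa_entry *ea = *(const fa_entry *const *) a;
    const fa_entry *eb = *(const fa_entry *const *) b;

    if (ea->action != eb->action)
        return ea->action < eb->action ? -1 : 1;
    return strcmp(ea->path, eb->path);
}

/* Build a sorted, single-allocation array of pointers into 'entries'. */
static fa_map *
fa_build(fa_entry *entries, int n)
{
    fa_map     *map = malloc(offsetof(fa_map, actions) +
                             (size_t) n * sizeof(fa_entry *));

    if (map == NULL)
        return NULL;
    map->nactions = n;
    for (int i = 0; i < n; i++)
        map->actions[i] = &entries[i];
    qsort(map->actions, (size_t) n, sizeof(fa_entry *), fa_cmp);
    return map;
}
```

Passing map->actions rather than &map->actions gives qsort() a pointer to the first element. Both expressions hold the same address, but the former states the intent directly.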
diff --git a/src/bin/pg_rewind/filemap.h b/src/bin/pg_rewind/filemap.h
index a5e8df57f4..3660ffe099 100644
--- a/src/bin/pg_rewind/filemap.h
+++ b/src/bin/pg_rewind/filemap.h
@@ -12,15 +12,6 @@
 #include "storage/block.h"
 #include "storage/relfilenode.h"
 
-/*
- * For every file found in the local or remote system, we have a file entry
- * that contains information about the file on both systems.  For relation
- * files, there is also a page map that marks pages in the file that were
- * changed in the target after the last common checkpoint.  Each entry also
- * contains an 'action' field, which says what we are going to do with the
- * file.
- */
-
 /* these enum values are sorted in the order we want actions to be processed */
 typedef enum
 {
@@ -44,9 +35,21 @@ typedef enum
     FILE_TYPE_SYMLINK
 } file_type_t;
 
+/*
+ * For every file found in the local or remote system, we have a file entry
+ * that contains information about the file on both systems.  For relation
+ * files, there is also a page map that marks pages in the file that were
+ * changed in the target after the last common checkpoint.
+ *
+ * When gathering information, these are kept in a hash table, private to
+ * filemap.c. filemap_finalize() fills in the 'action' field, sorts all the
+ * entries, and returns them in an array, ready for executing the actions.
+ */
 typedef struct file_entry_t
 {
-    char       *path;
+    uint32        status;            /* hash status */
+
+    const char *path;
     bool        isrelfile;        /* is it a relation data file? */
 
     /*
@@ -71,44 +74,25 @@ typedef struct file_entry_t
      * What will we do to the file?
      */
     file_action_t action;
-
-    struct file_entry_t *next;
 } file_entry_t;
 
+/*
+ * This represents the final decisions on what to do with each file.
+ * The 'actions' array contains an entry for each file, sorted in the order
+ * in which their actions should be executed.
+ */
 typedef struct filemap_t
 {
-    /*
-     * New entries are accumulated to a linked list, in process_source_file
-     * and process_target_file.
-     */
-    file_entry_t *first;
-    file_entry_t *last;
-    int            nlist;            /* number of entries currently in list */
-
-    /*
-     * After processing all the remote files, the entries in the linked list
-     * are moved to this array.  After processing local files, too, all the
-     * local entries are added to the array by filemap_finalize, and sorted in
-     * the final order.  After filemap_finalize, all the entries are in the
-     * array, and the linked list is empty.
-     */
-    file_entry_t **array;
-    int            narray;            /* current length of array */
-
-    /*
-     * Summary information.
-     */
+    /* Summary information, filled by calculate_totals() */
     uint64        total_size;        /* total size of the source cluster */
     uint64        fetch_size;        /* number of bytes that needs to be copied */
+
+    int            nactions;        /* size of 'actions' array */
+    file_entry_t *actions[FLEXIBLE_ARRAY_MEMBER];
 } filemap_t;
 
-extern filemap_t *filemap;
-
-extern void filemap_create(void);
-extern void calculate_totals(void);
-extern void print_filemap(void);
-
 /* Functions for populating the filemap */
+extern void filemap_init(void);
 extern void process_source_file(const char *path, file_type_t type,
                                 size_t size, const char *link_target);
 extern void process_target_file(const char *path, file_type_t type,
@@ -116,6 +100,9 @@ extern void process_target_file(const char *path, file_type_t type,
 extern void process_target_wal_block_change(ForkNumber forknum,
                                             RelFileNode rnode,
                                             BlockNumber blkno);
-extern void filemap_finalize(void);
+
+extern filemap_t *filemap_finalize(void);
+extern void calculate_totals(filemap_t *filemap);
+extern void print_filemap(filemap_t *filemap);
 
 #endif                            /* FILEMAP_H */
diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_fetch.c
index 7fc9161b8c..9c541bb73d 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_fetch.c
@@ -460,9 +460,9 @@ libpq_executeFileMap(filemap_t *map)
                  PQresultErrorMessage(res));
     PQclear(res);
 
-    for (i = 0; i < map->narray; i++)
+    for (i = 0; i < map->nactions; i++)
     {
-        entry = map->array[i];
+        entry = map->actions[i];
 
         /* If this is a relation file, copy the modified blocks */
         execute_pagemap(&entry->target_modified_pages, entry->path);
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index e0ed1759cb..13b4eab52b 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -129,6 +129,7 @@ main(int argc, char **argv)
     TimeLineID    endtli;
     ControlFileData ControlFile_new;
     bool        writerecoveryconf = false;
+    filemap_t  *filemap;
 
     pg_logging_init(argv[0]);
     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_rewind"));
@@ -371,10 +372,11 @@ main(int argc, char **argv)
     /*
      * Collect information about all files in the target and source systems.
      */
-    filemap_create();
     if (showprogress)
         pg_log_info("reading source file list");
+    filemap_init();
     fetchSourceFileList();
+
     if (showprogress)
         pg_log_info("reading target file list");
     traverse_datadir(datadir_target, &process_target_file);
@@ -395,13 +397,13 @@ main(int argc, char **argv)
      * We have collected all information we need from both systems. Decide
      * what to do with each file.
      */
-    filemap_finalize();
+    filemap = filemap_finalize();
     if (showprogress)
-        calculate_totals();
+        calculate_totals(filemap);
 
     /* this is too verbose even for verbose mode */
     if (debug)
-        print_filemap();
+        print_filemap(filemap);
 
     /*
      * Ok, we're ready to start copying things over.
@@ -421,7 +423,7 @@ main(int argc, char **argv)
      * modified the target directory and there is no turning back!
      */
 
-    executeFileMap();
+    execute_file_actions(filemap);
 
     progress_report(true);
 
-- 
2.18.4

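The filemap.h change above keeps file entries in a hash table while information is being gathered, and filemap_finalize() then flattens them into the 'actions' flexible array member and sorts it. A minimal standalone sketch of that two-phase pattern follows. It is not the patch's code: the names are hypothetical, it uses a hand-rolled chained hash table and a djb2 hash instead of simplehash and hash_bytes(), and it sorts by a simplified (action, path) key rather than final_filemap_cmp().

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified stand-ins for file_entry_t and filemap_t. */
typedef enum { ACTION_NONE, ACTION_COPY, ACTION_REMOVE } action_t;

typedef struct entry
{
    const char   *path;
    action_t      action;
    struct entry *next;         /* hash chain (the patch uses simplehash) */
} entry;

typedef struct
{
    int     nactions;           /* size of 'actions' */
    entry  *actions[];          /* C99 flexible array member */
} filemap;

#define NBUCKETS 64
static entry *buckets[NBUCKETS];
static int    nentries;

/* djb2 string hash; stands in for hash_bytes() */
static unsigned
hash_str(const char *s)
{
    unsigned h = 5381;

    while (*s)
        h = h * 33 + (unsigned char) *s++;
    return h;
}

/* Phase 1: look up 'path', creating an entry the first time it is seen. */
static entry *
lookup_or_insert(const char *path)
{
    unsigned b = hash_str(path) % NBUCKETS;
    entry   *e;

    for (e = buckets[b]; e != NULL; e = e->next)
        if (strcmp(e->path, path) == 0)
            return e;

    e = calloc(1, sizeof(entry));   /* action starts as ACTION_NONE */
    assert(e != NULL);
    e->path = path;
    e->next = buckets[b];
    buckets[b] = e;
    nentries++;
    return e;
}

/* Simplified ordering: by action, then by path. */
static int
entry_cmp(const void *a, const void *b)
{
    const entry *ea = *(entry *const *) a;
    const entry *eb = *(entry *const *) b;

    if (ea->action != eb->action)
        return ea->action < eb->action ? -1 : 1;
    return strcmp(ea->path, eb->path);
}

/* Phase 2: flatten the hash table into one sorted array. */
static filemap *
finalize(void)
{
    filemap *map = malloc(sizeof(filemap) + nentries * sizeof(entry *));
    int      n = 0;

    assert(map != NULL);
    for (int b = 0; b < NBUCKETS; b++)
        for (entry *e = buckets[b]; e != NULL; e = e->next)
            map->actions[n++] = e;
    map->nactions = n;
    qsort(map->actions, n, sizeof(entry *), entry_cmp);
    return map;
}
```

Because 'actions' is a flexible array member, a single malloc() covers both the header fields and the array, and qsort() can be handed map->actions directly. Looking up an existing path returns the same entry, so the source and target passes can both update one record per file.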
From 45130573b47b1e8260387383a50012c0f9ebc263 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 19 Aug 2020 15:34:41 +0300
Subject: [PATCH v3 4/5] pg_rewind: Refactor the abstraction to fetch from
 local/libpq source.

The copy_executeFileMap() and libpq_executeFileMap() functions contained
basically the same logic, just calling different functions to fetch the
source files. Refactor so that the common logic is in one place,
execute_file_actions().

This makes the abstraction of a "source" server clearer, by introducing
a common abstract class (to borrow the object-oriented programming term)
that represents all the operations that can be done on the source server.
There are two implementations of it, one for fetching via libpq, and
another for fetching from a local directory. This adds some code, but
makes it easier to understand what's going on.
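To illustrate the "abstract class" idea, here is a minimal standalone C sketch; it is not the patch's code. The names (source, immediate_source, batched_source, fetch_two_blocks) are hypothetical, and "fetching" only counts bytes. As with rewind_source, each implementation embeds the table of function pointers as its first member, so generic code works with a plain source pointer while each implementation casts it back to its own type. One implementation executes every request at once, as local_fetch_file_range() does; the other defers the work until finish_fetch(), which is the freedom queue_fetch_range() gives the libpq source.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical, cut-down analogue of rewind_source. */
typedef struct source
{
    void        (*queue_fetch_range) (struct source *, const char *path,
                                      uint64_t off, size_t len);
    void        (*finish_fetch) (struct source *);
    void        (*destroy) (struct source *);
} source;

/* Implementation 1: executes each request at once (like local_source). */
typedef struct
{
    source      common;         /* first member, so 'source *' casts back */
    size_t      bytes_copied;
} immediate_source;

static void
imm_queue(source *s, const char *path, uint64_t off, size_t len)
{
    (void) path;
    (void) off;
    ((immediate_source *) s)->bytes_copied += len;  /* "copy" right away */
}

static void
imm_finish(source *s)
{
    (void) s;                   /* nothing was queued */
}

/* Implementation 2: queues requests, runs them in finish_fetch. */
typedef struct
{
    source      common;
    size_t      pending;        /* bytes queued but not yet fetched */
    size_t      bytes_copied;
} batched_source;

static void
bat_queue(source *s, const char *path, uint64_t off, size_t len)
{
    (void) path;
    (void) off;
    ((batched_source *) s)->pending += len;
}

static void
bat_finish(source *s)
{
    batched_source *src = (batched_source *) s;

    src->bytes_copied += src->pending;  /* one batch for all requests */
    src->pending = 0;
}

static void
generic_destroy(source *s)
{
    free(s);                    /* 'common' shares the struct's address */
}

source *
init_immediate_source(void)
{
    immediate_source *src = calloc(1, sizeof(immediate_source));

    assert(src != NULL);
    src->common.queue_fetch_range = imm_queue;
    src->common.finish_fetch = imm_finish;
    src->common.destroy = generic_destroy;
    return &src->common;
}

source *
init_batched_source(void)
{
    batched_source *src = calloc(1, sizeof(batched_source));

    assert(src != NULL);
    src->common.queue_fetch_range = bat_queue;
    src->common.finish_fetch = bat_finish;
    src->common.destroy = generic_destroy;
    return &src->common;
}

/* Caller code is the same for both, as in execute_file_actions(). */
void
fetch_two_blocks(source *s)
{
    s->queue_fetch_range(s, "base/1/1234", 0, 8192);
    s->queue_fetch_range(s, "base/1/1234", 8192, 8192);
    s->finish_fetch(s);
}
```

The caller produces the same end result with either implementation; only the point at which the bytes are actually moved differs, which is why callers must invoke finish_fetch() before relying on the target files.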
---
 src/bin/pg_rewind/copy_fetch.c  | 239 +++++----------------
 src/bin/pg_rewind/fetch.c       |  97 ++++++---
 src/bin/pg_rewind/fetch.h       |  76 +++++--
 src/bin/pg_rewind/file_ops.c    | 129 +++++++++++-
 src/bin/pg_rewind/file_ops.h    |   3 +
 src/bin/pg_rewind/libpq_fetch.c | 363 +++++++++++++++-----------------
 src/bin/pg_rewind/pg_rewind.c   |  70 ++++--
 src/bin/pg_rewind/pg_rewind.h   |   5 -
 8 files changed, 528 insertions(+), 454 deletions(-)

diff --git a/src/bin/pg_rewind/copy_fetch.c b/src/bin/pg_rewind/copy_fetch.c
index 61aed8018b..9927a45a07 100644
--- a/src/bin/pg_rewind/copy_fetch.c
+++ b/src/bin/pg_rewind/copy_fetch.c
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * copy_fetch.c
- *      Functions for using a data directory as the source.
+ *      Functions for using a local data directory as the source.
  *
  * Portions Copyright (c) 2013-2020, PostgreSQL Global Development Group
  *
@@ -9,8 +9,6 @@
  */
 #include "postgres_fe.h"
 
-#include <sys/stat.h>
-#include <dirent.h>
 #include <fcntl.h>
 #include <unistd.h>
 
@@ -20,146 +18,70 @@
 #include "filemap.h"
 #include "pg_rewind.h"
 
-static void recurse_dir(const char *datadir, const char *path,
-                        process_file_callback_t callback);
+typedef struct
+{
+    rewind_source common;    /* common interface functions */
+
+    const char *datadir;    /* path to the source data directory */
+} local_source;
 
-static void execute_pagemap(datapagemap_t *pagemap, const char *path);
+static void local_traverse_files(rewind_source *source,
+                                 process_file_callback_t callback);
+static char *local_fetch_file(rewind_source *source, const char *path,
+                              size_t *filesize);
+static void local_fetch_file_range(rewind_source *source, const char *path,
+                                   uint64 off, size_t len);
+static void local_finish_fetch(rewind_source *source);
+static void local_destroy(rewind_source *source);
 
-/*
- * Traverse through all files in a data directory, calling 'callback'
- * for each file.
- */
-void
-traverse_datadir(const char *datadir, process_file_callback_t callback)
+rewind_source *
+init_local_source(const char *datadir)
 {
-    recurse_dir(datadir, NULL, callback);
+    local_source *src;
+
+    src = pg_malloc0(sizeof(local_source));
+
+    src->common.traverse_files = local_traverse_files;
+    src->common.fetch_file = local_fetch_file;
+    src->common.queue_fetch_range = local_fetch_file_range;
+    src->common.finish_fetch = local_finish_fetch;
+    src->common.get_current_wal_insert_lsn = NULL;
+    src->common.destroy = local_destroy;
+
+    src->datadir = datadir;
+
+    return &src->common;
 }
 
-/*
- * recursive part of traverse_datadir
- *
- * parentpath is the current subdirectory's path relative to datadir,
- * or NULL at the top level.
- */
 static void
-recurse_dir(const char *datadir, const char *parentpath,
-            process_file_callback_t callback)
+local_traverse_files(rewind_source *source, process_file_callback_t callback)
 {
-    DIR           *xldir;
-    struct dirent *xlde;
-    char        fullparentpath[MAXPGPATH];
+    traverse_datadir(((local_source *) source)->datadir, callback);
+}
 
-    if (parentpath)
-        snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath);
-    else
-        snprintf(fullparentpath, MAXPGPATH, "%s", datadir);
-
-    xldir = opendir(fullparentpath);
-    if (xldir == NULL)
-        pg_fatal("could not open directory \"%s\": %m",
-                 fullparentpath);
-
-    while (errno = 0, (xlde = readdir(xldir)) != NULL)
-    {
-        struct stat fst;
-        char        fullpath[MAXPGPATH * 2];
-        char        path[MAXPGPATH * 2];
-
-        if (strcmp(xlde->d_name, ".") == 0 ||
-            strcmp(xlde->d_name, "..") == 0)
-            continue;
-
-        snprintf(fullpath, sizeof(fullpath), "%s/%s", fullparentpath, xlde->d_name);
-
-        if (lstat(fullpath, &fst) < 0)
-        {
-            if (errno == ENOENT)
-            {
-                /*
-                 * File doesn't exist anymore. This is ok, if the new primary
-                 * is running and the file was just removed. If it was a data
-                 * file, there should be a WAL record of the removal. If it
-                 * was something else, it couldn't have been anyway.
-                 *
-                 * TODO: But complain if we're processing the target dir!
-                 */
-            }
-            else
-                pg_fatal("could not stat file \"%s\": %m",
-                         fullpath);
-        }
-
-        if (parentpath)
-            snprintf(path, sizeof(path), "%s/%s", parentpath, xlde->d_name);
-        else
-            snprintf(path, sizeof(path), "%s", xlde->d_name);
-
-        if (S_ISREG(fst.st_mode))
-            callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL);
-        else if (S_ISDIR(fst.st_mode))
-        {
-            callback(path, FILE_TYPE_DIRECTORY, 0, NULL);
-            /* recurse to handle subdirectories */
-            recurse_dir(datadir, path, callback);
-        }
-#ifndef WIN32
-        else if (S_ISLNK(fst.st_mode))
-#else
-        else if (pgwin32_is_junction(fullpath))
-#endif
-        {
-#if defined(HAVE_READLINK) || defined(WIN32)
-            char        link_target[MAXPGPATH];
-            int            len;
-
-            len = readlink(fullpath, link_target, sizeof(link_target));
-            if (len < 0)
-                pg_fatal("could not read symbolic link \"%s\": %m",
-                         fullpath);
-            if (len >= sizeof(link_target))
-                pg_fatal("symbolic link \"%s\" target is too long",
-                         fullpath);
-            link_target[len] = '\0';
-
-            callback(path, FILE_TYPE_SYMLINK, 0, link_target);
-
-            /*
-             * If it's a symlink within pg_tblspc, we need to recurse into it,
-             * to process all the tablespaces.  We also follow a symlink if
-             * it's for pg_wal.  Symlinks elsewhere are ignored.
-             */
-            if ((parentpath && strcmp(parentpath, "pg_tblspc") == 0) ||
-                strcmp(path, "pg_wal") == 0)
-                recurse_dir(datadir, path, callback);
-#else
-            pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform",
-                     fullpath);
-#endif                            /* HAVE_READLINK */
-        }
-    }
-
-    if (errno)
-        pg_fatal("could not read directory \"%s\": %m",
-                 fullparentpath);
-
-    if (closedir(xldir))
-        pg_fatal("could not close directory \"%s\": %m",
-                 fullparentpath);
+static char *
+local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
+{
+    return slurpFile(((local_source *) source)->datadir, path, filesize);
 }
 
 /*
- * Copy a file from source to target, between 'begin' and 'end' offsets.
+ * Copy a file from source to target, starting at 'off', for 'len' bytes.
  *
  * If 'trunc' is true, any existing file with the same name is truncated.
  */
 static void
-rewind_copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
+local_fetch_file_range(rewind_source *source, const char *path, uint64 off,
+                       size_t len)
 {
+    const char *datadir = ((local_source *) source)->datadir;
     PGAlignedBlock buf;
     char        srcpath[MAXPGPATH];
     int            srcfd;
+    uint64        begin = off;
+    uint64        end = off + len;
 
-    snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir_source, path);
+    snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
 
     srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
     if (srcfd < 0)
@@ -169,7 +91,7 @@ rewind_copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
     if (lseek(srcfd, begin, SEEK_SET) == -1)
         pg_fatal("could not seek in source file: %m");
 
-    open_target_file(path, trunc);
+    open_target_file(path, false);
 
     while (end - begin > 0)
     {
@@ -197,70 +119,17 @@ rewind_copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
         pg_fatal("could not close file \"%s\": %m", srcpath);
 }
 
-/*
- * Copy all relation data files from datadir_source to datadir_target, which
- * are marked in the given data page map.
- */
-void
-copy_executeFileMap(filemap_t *map)
+static void
+local_finish_fetch(rewind_source *source)
 {
-    file_entry_t *entry;
-    int            i;
-
-    for (i = 0; i < map->nactions; i++)
-    {
-        entry = map->actions[i];
-        execute_pagemap(&entry->target_modified_pages, entry->path);
-
-        switch (entry->action)
-        {
-            case FILE_ACTION_NONE:
-                /* ok, do nothing.. */
-                break;
-
-            case FILE_ACTION_COPY:
-                rewind_copy_file_range(entry->path, 0, entry->source_size, true);
-                break;
-
-            case FILE_ACTION_TRUNCATE:
-                truncate_target_file(entry->path, entry->source_size);
-                break;
-
-            case FILE_ACTION_COPY_TAIL:
-                rewind_copy_file_range(entry->path, entry->target_size,
-                                       entry->source_size, false);
-                break;
-
-            case FILE_ACTION_CREATE:
-                create_target(entry);
-                break;
-
-            case FILE_ACTION_REMOVE:
-                remove_target(entry);
-                break;
-
-            case FILE_ACTION_UNDECIDED:
-                pg_fatal("no action decided for \"%s\"", entry->path);
-                break;
-        }
-    }
-
-    close_target_file();
+    /*
+     * Nothing to do; local_fetch_file_range() performs the fetching
+     * immediately.
+     */
 }
 
 static void
-execute_pagemap(datapagemap_t *pagemap, const char *path)
+local_destroy(rewind_source *source)
 {
-    datapagemap_iterator_t *iter;
-    BlockNumber blkno;
-    off_t        offset;
-
-    iter = datapagemap_iterate(pagemap);
-    while (datapagemap_next(iter, &blkno))
-    {
-        offset = blkno * BLCKSZ;
-        rewind_copy_file_range(path, offset, offset + BLCKSZ, false);
-        /* Ok, this block has now been copied from new data dir to old */
-    }
-    pg_free(iter);
+    pg_free(source);
 }
diff --git a/src/bin/pg_rewind/fetch.c b/src/bin/pg_rewind/fetch.c
index f41d0f295e..c8ee38f8e0 100644
--- a/src/bin/pg_rewind/fetch.c
+++ b/src/bin/pg_rewind/fetch.c
@@ -24,37 +24,78 @@
 #include "filemap.h"
 #include "pg_rewind.h"
 
-void
-fetchSourceFileList(void)
-{
-    if (datadir_source)
-        traverse_datadir(datadir_source, &process_source_file);
-    else
-        libpqProcessFileList();
-}
-
 /*
- * Fetch all relation data files that are marked in the given data page map.
+ * Execute the actions in the file map, fetching data from the source
+ * system as needed.
  */
 void
-execute_file_actions(filemap_t *filemap)
+execute_file_actions(filemap_t *filemap, rewind_source *source)
 {
-    if (datadir_source)
-        copy_executeFileMap(filemap);
-    else
-        libpq_executeFileMap(filemap);
-}
+    int            i;
 
-/*
- * Fetch a single file into a malloc'd buffer. The file size is returned
- * in *filesize. The returned buffer is always zero-terminated, which is
- * handy for text files.
- */
-char *
-fetchFile(const char *filename, size_t *filesize)
-{
-    if (datadir_source)
-        return slurpFile(datadir_source, filename, filesize);
-    else
-        return libpqGetFile(filename, filesize);
+    for (i = 0; i < filemap->nactions; i++)
+    {
+        file_entry_t *entry = filemap->actions[i];
+        datapagemap_iterator_t *iter;
+        BlockNumber blkno;
+        off_t        offset;
+
+        /*
+         * If this is a relation file, copy the modified blocks.
+         *
+         * This is in addition to any other changes.
+         */
+        iter = datapagemap_iterate(&entry->target_modified_pages);
+        while (datapagemap_next(iter, &blkno))
+        {
+            offset = blkno * BLCKSZ;
+
+            source->queue_fetch_range(source, entry->path, offset, BLCKSZ);
+        }
+        pg_free(iter);
+
+        switch (entry->action)
+        {
+            case FILE_ACTION_NONE:
+                /* nothing else to do */
+                break;
+
+            case FILE_ACTION_COPY:
+                /* Truncate the old file out of the way, if any */
+                open_target_file(entry->path, true);
+                source->queue_fetch_range(source, entry->path,
+                                          0, entry->source_size);
+                break;
+
+            case FILE_ACTION_TRUNCATE:
+                truncate_target_file(entry->path, entry->source_size);
+                break;
+
+            case FILE_ACTION_COPY_TAIL:
+                source->queue_fetch_range(source, entry->path,
+                                          entry->target_size,
+                                          entry->source_size - entry->target_size);
+                break;
+
+            case FILE_ACTION_REMOVE:
+                remove_target(entry);
+                break;
+
+            case FILE_ACTION_CREATE:
+                create_target(entry);
+                break;
+
+            case FILE_ACTION_UNDECIDED:
+                pg_fatal("no action decided for \"%s\"", entry->path);
+                break;
+        }
+    }
+
+    /*
+     * All the file ranges that need to be fetched have now been queued up
+     * with the source. Now, actually fetch them.
+     */
+    source->finish_fetch(source);
+
+    close_target_file();
 }
diff --git a/src/bin/pg_rewind/fetch.h b/src/bin/pg_rewind/fetch.h
index b20df8b153..8be1a9582d 100644
--- a/src/bin/pg_rewind/fetch.h
+++ b/src/bin/pg_rewind/fetch.h
@@ -1,12 +1,12 @@
 /*-------------------------------------------------------------------------
  *
  * fetch.h
- *      Fetching data from a local or remote data directory.
+ *      Abstraction for fetching from source server.
  *
- * This file includes the prototypes for functions used to copy files from
- * one data directory to another. The source to copy from can be a local
- * directory (copy method), or a remote PostgreSQL server (libpq fetch
- * method).
+ * The source server can be either a libpq connection to a live system, or
+ * a local data directory. The 'rewind_source' struct abstracts the
+ * operations to fetch data from the source system, so that the rest of
+ * the code doesn't need to care what kind of source it's dealing with.
  *
  * Copyright (c) 2013-2020, PostgreSQL Global Development Group
  *
@@ -16,29 +16,63 @@
 #define FETCH_H
 
 #include "access/xlogdefs.h"
-
+#include "file_ops.h"
 #include "filemap.h"
+#include "libpq-fe.h"
+
+typedef struct rewind_source
+{
+    /*
+     * Traverse all files in the source data directory, and call 'callback'
+     * on each file.
+     */
+    void (*traverse_files) (struct rewind_source *,
+                            process_file_callback_t callback);
+
+    /*
+     * Fetch a single file into a malloc'd buffer. The file size is returned
+     * in *filesize. The returned buffer is always zero-terminated, which is
+     * handy for text files.
+     */
+    char *(*fetch_file) (struct rewind_source *, const char *path,
+                         size_t *filesize);
+
+    /*
+     * Request to fetch (part of) a file in the source system, and write it
+     * to the corresponding file in the target system. The source implementation
+     * may queue up the request and execute it later when convenient. Call
+     * finish_fetch() to flush the queue and execute all requests.
+     */
+    void (*queue_fetch_range) (struct rewind_source *, const char *path,
+                               uint64 offset, size_t len);
+
+    /*
+     * Execute all requests queued up with queue_fetch_range().
+     */
+    void (*finish_fetch) (struct rewind_source *);
+
+    /*
+     * Get the current WAL insert position in the source system.
+     */
+    XLogRecPtr (*get_current_wal_insert_lsn) (struct rewind_source *);
+
+    /*
+     * Free this rewind_source object.
+     */
+    void (*destroy) (struct rewind_source *);
+
+} rewind_source;
+
 
 /*
- * Common interface. Calls the copy or libpq method depending on global
- * config options.
+ * Execute all the actions in 'filemap'.
  */
-extern void fetchSourceFileList(void);
-extern char *fetchFile(const char *filename, size_t *filesize);
-extern void execute_file_actions(filemap_t *filemap);
+extern void execute_file_actions(filemap_t *filemap, rewind_source *source);
 
 /* in libpq_fetch.c */
-extern void libpqProcessFileList(void);
-extern char *libpqGetFile(const char *filename, size_t *filesize);
-extern void libpq_executeFileMap(filemap_t *map);
-
-extern void libpqConnect(const char *connstr);
-extern XLogRecPtr libpqGetCurrentXlogInsertLocation(void);
+extern rewind_source *init_libpq_source(PGconn *conn);
 
 /* in copy_fetch.c */
-extern void copy_executeFileMap(filemap_t *map);
-
-typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target);
-extern void traverse_datadir(const char *datadir, process_file_callback_t callback);
+extern rewind_source *init_local_source(const char *datadir);
 
 #endif                            /* FETCH_H */
diff --git a/src/bin/pg_rewind/file_ops.c b/src/bin/pg_rewind/file_ops.c
index ec37d0b2e0..4ae343888e 100644
--- a/src/bin/pg_rewind/file_ops.c
+++ b/src/bin/pg_rewind/file_ops.c
@@ -15,6 +15,7 @@
 #include "postgres_fe.h"
 
 #include <sys/stat.h>
+#include <dirent.h>
 #include <fcntl.h>
 #include <unistd.h>
 
@@ -35,6 +36,9 @@ static void remove_target_dir(const char *path);
 static void create_target_symlink(const char *path, const char *link);
 static void remove_target_symlink(const char *path);
 
+static void recurse_dir(const char *datadir, const char *parentpath,
+                        process_file_callback_t callback);
+
 /*
  * Open a target file for writing. If 'trunc' is true and the file already
  * exists, it will be truncated.
@@ -305,9 +309,6 @@ sync_target_dir(void)
  * buffer is actually *filesize + 1. That's handy when reading a text file.
  * This function can be used to read binary files as well, you can just
  * ignore the zero-terminator in that case.
- *
- * This function is used to implement the fetchFile function in the "fetch"
- * interface (see fetch.c), but is also called directly.
  */
 char *
 slurpFile(const char *datadir, const char *path, size_t *filesize)
@@ -352,3 +353,125 @@ slurpFile(const char *datadir, const char *path, size_t *filesize)
         *filesize = len;
     return buffer;
 }
+
+/*
+ * Traverse through all files in a data directory, calling 'callback'
+ * for each file.
+ */
+void
+traverse_datadir(const char *datadir, process_file_callback_t callback)
+{
+    recurse_dir(datadir, NULL, callback);
+}
+
+/*
+ * recursive part of traverse_datadir
+ *
+ * parentpath is the current subdirectory's path relative to datadir,
+ * or NULL at the top level.
+ */
+static void
+recurse_dir(const char *datadir, const char *parentpath,
+            process_file_callback_t callback)
+{
+    DIR           *xldir;
+    struct dirent *xlde;
+    char        fullparentpath[MAXPGPATH];
+
+    if (parentpath)
+        snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath);
+    else
+        snprintf(fullparentpath, MAXPGPATH, "%s", datadir);
+
+    xldir = opendir(fullparentpath);
+    if (xldir == NULL)
+        pg_fatal("could not open directory \"%s\": %m",
+                 fullparentpath);
+
+    while (errno = 0, (xlde = readdir(xldir)) != NULL)
+    {
+        struct stat fst;
+        char        fullpath[MAXPGPATH * 2];
+        char        path[MAXPGPATH * 2];
+
+        if (strcmp(xlde->d_name, ".") == 0 ||
+            strcmp(xlde->d_name, "..") == 0)
+            continue;
+
+        snprintf(fullpath, sizeof(fullpath), "%s/%s", fullparentpath, xlde->d_name);
+
+        if (lstat(fullpath, &fst) < 0)
+        {
+            if (errno == ENOENT)
+            {
+                /*
+                 * File doesn't exist anymore. This is ok, if the new primary
+                 * is running and the file was just removed. If it was a data
+                 * file, there should be a WAL record of the removal. If it
+                 * was something else, it couldn't have been anyway.
+                 *
+                 * TODO: But complain if we're processing the target dir!
+                 */
+            }
+            else
+                pg_fatal("could not stat file \"%s\": %m",
+                         fullpath);
+        }
+
+        if (parentpath)
+            snprintf(path, sizeof(path), "%s/%s", parentpath, xlde->d_name);
+        else
+            snprintf(path, sizeof(path), "%s", xlde->d_name);
+
+        if (S_ISREG(fst.st_mode))
+            callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL);
+        else if (S_ISDIR(fst.st_mode))
+        {
+            callback(path, FILE_TYPE_DIRECTORY, 0, NULL);
+            /* recurse to handle subdirectories */
+            recurse_dir(datadir, path, callback);
+        }
+#ifndef WIN32
+        else if (S_ISLNK(fst.st_mode))
+#else
+        else if (pgwin32_is_junction(fullpath))
+#endif
+        {
+#if defined(HAVE_READLINK) || defined(WIN32)
+            char        link_target[MAXPGPATH];
+            int            len;
+
+            len = readlink(fullpath, link_target, sizeof(link_target));
+            if (len < 0)
+                pg_fatal("could not read symbolic link \"%s\": %m",
+                         fullpath);
+            if (len >= sizeof(link_target))
+                pg_fatal("symbolic link \"%s\" target is too long",
+                         fullpath);
+            link_target[len] = '\0';
+
+            callback(path, FILE_TYPE_SYMLINK, 0, link_target);
+
+            /*
+             * If it's a symlink within pg_tblspc, we need to recurse into it,
+             * to process all the tablespaces.  We also follow a symlink if
+             * it's for pg_wal.  Symlinks elsewhere are ignored.
+             */
+            if ((parentpath && strcmp(parentpath, "pg_tblspc") == 0) ||
+                strcmp(path, "pg_wal") == 0)
+                recurse_dir(datadir, path, callback);
+#else
+            pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform",
+                     fullpath);
+#endif                            /* HAVE_READLINK */
+        }
+    }
+
+    if (errno)
+        pg_fatal("could not read directory \"%s\": %m",
+                 fullparentpath);
+
+    if (closedir(xldir))
+        pg_fatal("could not close directory \"%s\": %m",
+                 fullparentpath);
+}
diff --git a/src/bin/pg_rewind/file_ops.h b/src/bin/pg_rewind/file_ops.h
index d8466385cf..c763085976 100644
--- a/src/bin/pg_rewind/file_ops.h
+++ b/src/bin/pg_rewind/file_ops.h
@@ -23,4 +23,7 @@ extern void sync_target_dir(void);
 
 extern char *slurpFile(const char *datadir, const char *path, size_t *filesize);
 
+typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target);
+extern void traverse_datadir(const char *datadir, process_file_callback_t callback);
+
 #endif                            /* FILE_OPS_H */
diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_fetch.c
index 9c541bb73d..52c4e147e1 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_fetch.c
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * libpq_fetch.c
- *      Functions for fetching files from a remote server.
+ *      Functions for fetching files from a remote server via libpq.
  *
  * Copyright (c) 2013-2020, PostgreSQL Global Development Group
  *
@@ -9,11 +9,6 @@
  */
 #include "postgres_fe.h"
 
-#include <sys/stat.h>
-#include <dirent.h>
-#include <fcntl.h>
-#include <unistd.h>
-
 #include "catalog/pg_type_d.h"
 #include "common/connect.h"
 #include "datapagemap.h"
@@ -23,8 +18,6 @@
 #include "pg_rewind.h"
 #include "port/pg_bswap.h"
 
-PGconn       *conn = NULL;
-
 /*
  * Files are fetched max CHUNKSIZE bytes at a time.
  *
@@ -34,30 +27,73 @@ PGconn       *conn = NULL;
  */
 #define CHUNKSIZE 1000000
 
-static void receiveFileChunks(const char *sql);
-static void execute_pagemap(datapagemap_t *pagemap, const char *path);
-static char *run_simple_query(const char *sql);
-static void run_simple_command(const char *sql);
+typedef struct
+{
+    rewind_source common;    /* common interface functions */
+
+    PGconn       *conn;
+} libpq_source;
+
+static void init_libpq_conn(PGconn *conn);
+static char *run_simple_query(PGconn *conn, const char *sql);
+static void run_simple_command(PGconn *conn, const char *sql);
+
+/* public interface functions */
+static void libpq_traverse_files(rewind_source *source,
+                                 process_file_callback_t callback);
+static char *libpq_fetch_file(rewind_source *source, const char *path,
+                              size_t *filesize);
+static void libpq_queue_fetch_range(rewind_source *source, const char *path,
+                                    uint64 off, size_t len);
+static void libpq_finish_fetch(rewind_source *source);
+static void libpq_destroy(rewind_source *source);
+static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
+
+/*
+ * Create a new libpq source.
+ *
+ * The caller has already established the connection, but should not try
+ * to use it while the source is active.
+ */
+rewind_source *
+init_libpq_source(PGconn *conn)
+{
+    libpq_source *src;
+
+    init_libpq_conn(conn);
+
+    src = pg_malloc0(sizeof(libpq_source));
+
+    src->common.traverse_files = libpq_traverse_files;
+    src->common.fetch_file = libpq_fetch_file;
+    src->common.queue_fetch_range = libpq_queue_fetch_range;
+    src->common.finish_fetch = libpq_finish_fetch;
+    src->common.get_current_wal_insert_lsn = libpq_get_current_wal_insert_lsn;
+    src->common.destroy = libpq_destroy;
+
+    src->conn = conn;
+
+    return &src->common;
+}
 
-void
-libpqConnect(const char *connstr)
+/*
+ * Initialize a libpq connection for use.
+ */
+static void
+init_libpq_conn(PGconn *conn)
 {
+    PGresult   *res;
     char       *str;
-    PGresult   *res;
-
-    conn = PQconnectdb(connstr);
-    if (PQstatus(conn) == CONNECTION_BAD)
-        pg_fatal("could not connect to server: %s",
-                 PQerrorMessage(conn));
-
-    if (showprogress)
-        pg_log_info("connected to server");
 
     /* disable all types of timeouts */
-    run_simple_command("SET statement_timeout = 0");
-    run_simple_command("SET lock_timeout = 0");
-    run_simple_command("SET idle_in_transaction_session_timeout = 0");
+    run_simple_command(conn, "SET statement_timeout = 0");
+    run_simple_command(conn, "SET lock_timeout = 0");
+    run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
 
+    /* we don't intend do any updates. Put the connection in read-only mode to keep us honest */
+    run_simple_command(conn, "SET default_transaction_read_only = off");
+
+    /* secure search_path */
     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     if (PQresultStatus(res) != PGRES_TUPLES_OK)
         pg_fatal("could not clear search_path: %s",
@@ -70,7 +106,7 @@ libpqConnect(const char *connstr)
      * currently because we use a temporary table. Better to check for it
      * explicitly than error out, for a better error message.
      */
-    str = run_simple_query("SELECT pg_is_in_recovery()");
+    str = run_simple_query(conn, "SELECT pg_is_in_recovery()");
     if (strcmp(str, "f") != 0)
         pg_fatal("source server must not be in recovery mode");
     pg_free(str);
@@ -80,27 +116,31 @@ libpqConnect(const char *connstr)
      * a page is modified while we read it with pg_read_binary_file(), and we
      * rely on full page images to fix them.
      */
-    str = run_simple_query("SHOW full_page_writes");
+    str = run_simple_query(conn, "SHOW full_page_writes");
     if (strcmp(str, "on") != 0)
         pg_fatal("full_page_writes must be enabled in the source server");
     pg_free(str);
 
     /*
-     * Although we don't do any "real" updates, we do work with a temporary
-     * table. We don't care about synchronous commit for that. It doesn't
-     * otherwise matter much, but if the server is using synchronous
-     * replication, and replication isn't working for some reason, we don't
-     * want to get stuck, waiting for it to start working again.
+     * First create a temporary table, and COPY to load it with the list of
+     * blocks that we need to fetch.
      */
-    run_simple_command("SET synchronous_commit = off");
+    run_simple_command(conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)");
+
+    res = PQexec(conn, "COPY fetchchunks FROM STDIN");
+    if (PQresultStatus(res) != PGRES_COPY_IN)
+        pg_fatal("could not send file list: %s",
+                 PQresultErrorMessage(res));
+    PQclear(res);
 }
 
 /*
- * Runs a query that returns a single value.
+ * Run a query that returns a single value.
+ *
  * The result should be pg_free'd after use.
  */
 static char *
-run_simple_query(const char *sql)
+run_simple_query(PGconn *conn, const char *sql)
 {
     PGresult   *res;
     char       *result;
@@ -123,11 +163,12 @@ run_simple_query(const char *sql)
 }
 
 /*
- * Runs a command.
+ * Run a command.
+ *
  * In the event of a failure, exit immediately.
  */
 static void
-run_simple_command(const char *sql)
+run_simple_command(PGconn *conn, const char *sql)
 {
     PGresult   *res;
 
@@ -141,17 +182,18 @@ run_simple_command(const char *sql)
 }
 
 /*
- * Calls pg_current_wal_insert_lsn() function
+ * Call the pg_current_wal_insert_lsn() function in the remote system.
  */
-XLogRecPtr
-libpqGetCurrentXlogInsertLocation(void)
+static XLogRecPtr
+libpq_get_current_wal_insert_lsn(rewind_source *source)
 {
+    PGconn       *conn = ((libpq_source *) source)->conn;
     XLogRecPtr    result;
     uint32        hi;
     uint32        lo;
     char       *val;
 
-    val = run_simple_query("SELECT pg_current_wal_insert_lsn()");
+    val = run_simple_query(conn, "SELECT pg_current_wal_insert_lsn()");
 
     if (sscanf(val, "%X/%X", &hi, &lo) != 2)
         pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
@@ -166,9 +208,10 @@ libpqGetCurrentXlogInsertLocation(void)
 /*
  * Get a list of all files in the data directory.
  */
-void
-libpqProcessFileList(void)
+static void
+libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
 {
+    PGconn *conn = ((libpq_source *) source)->conn;
     PGresult   *res;
     const char *sql;
     int            i;
@@ -246,6 +289,48 @@ libpqProcessFileList(void)
     PQclear(res);
 }
 
+/*
+ * Queue up a request to fetch a piece of a file from remote system.
+ */
+static void
+libpq_queue_fetch_range(rewind_source *source, const char *path, uint64 off,
+                        size_t len)
+{
+    libpq_source *src = (libpq_source *) source;
+    uint64        begin = off;
+    uint64        end = off + len;
+
+    /*
+     * Write the file range to a temporary table in the server.
+     *
+     * The range is sent to the server as a COPY formatted line, to be inserted
+     * into the 'fetchchunks' temporary table. The libpq_finish_fetch() uses
+     * the temporary table to actually fetch the data.
+     */
+
+    /* Split the range into CHUNKSIZE chunks */
+    while (end - begin > 0)
+    {
+        char        linebuf[MAXPGPATH + 23];
+        unsigned int len;
+
+        /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
+        if (end - begin > CHUNKSIZE)
+            len = CHUNKSIZE;
+        else
+            len = (unsigned int) (end - begin);
+
+        begin += len;
+
+        snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
+
+        if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1)
+            pg_fatal("could not send COPY data: %s",
+                     PQerrorMessage(src->conn));
+    }
+}
+
+
 /*----
  * Runs a query, which returns pieces of files from the remote source data
  * directory, and overwrites the corresponding parts of target files with
@@ -256,20 +341,46 @@ libpqProcessFileList(void)
  * chunk    bytea    -- file content
  *----
  */
+/*
+ * Receive all the queued chunks and write them to the target data directory.
+ */
 static void
-receiveFileChunks(const char *sql)
+libpq_finish_fetch(rewind_source *source)
 {
+    libpq_source *src = (libpq_source *) source;
     PGresult   *res;
+    const char *sql;
 
-    if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
-        pg_fatal("could not send query: %s", PQerrorMessage(conn));
+    if (PQputCopyEnd(src->conn, NULL) != 1)
+        pg_fatal("could not send end-of-COPY: %s",
+                 PQerrorMessage(src->conn));
+
+    while ((res = PQgetResult(src->conn)) != NULL)
+    {
+        if (PQresultStatus(res) != PGRES_COMMAND_OK)
+            pg_fatal("unexpected result while sending file list: %s",
+                     PQresultErrorMessage(res));
+        PQclear(res);
+    }
+
+    /*
+     * We've now copied the list of file ranges that we need to fetch to the
+     * temporary table. Now, actually fetch all of those ranges.
+     */
+    sql =
+        "SELECT path, begin,\n"
+        "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
+        "FROM fetchchunks\n";
+
+    if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
+        pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
 
     pg_log_debug("getting file chunks");
 
-    if (PQsetSingleRowMode(conn) != 1)
+    if (PQsetSingleRowMode(src->conn) != 1)
         pg_fatal("could not set libpq connection to single row mode");
 
-    while ((res = PQgetResult(conn)) != NULL)
+    while ((res = PQgetResult(src->conn)) != NULL)
     {
         char       *filename;
         int            filenamelen;
@@ -363,28 +474,29 @@ receiveFileChunks(const char *sql)
 }
 
 /*
- * Receive a single file as a malloc'd buffer.
+ * Fetch a single file as a malloc'd buffer.
  */
-char *
-libpqGetFile(const char *filename, size_t *filesize)
+static char *
+libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
 {
+    PGconn       *conn = ((libpq_source *) source)->conn;
     PGresult   *res;
     char       *result;
     int            len;
     const char *paramValues[1];
 
-    paramValues[0] = filename;
+    paramValues[0] = path;
     res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
                        1, NULL, paramValues, NULL, NULL, 1);
 
     if (PQresultStatus(res) != PGRES_TUPLES_OK)
         pg_fatal("could not fetch remote file \"%s\": %s",
-                 filename, PQresultErrorMessage(res));
+                 path, PQresultErrorMessage(res));
 
     /* sanity check the result set */
     if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
         pg_fatal("unexpected result set while fetching remote file \"%s\"",
-                 filename);
+                 path);
 
     /* Read result to local variables */
     len = PQgetlength(res, 0, 0);
@@ -394,7 +506,7 @@ libpqGetFile(const char *filename, size_t *filesize)
 
     PQclear(res);
 
-    pg_log_debug("fetched file \"%s\", length %d", filename, len);
+    pg_log_debug("fetched file \"%s\", length %d", path, len);
 
     if (filesize)
         *filesize = len;
@@ -402,142 +514,11 @@ libpqGetFile(const char *filename, size_t *filesize)
 }
 
 /*
- * Write a file range to a temporary table in the server.
- *
- * The range is sent to the server as a COPY formatted line, to be inserted
- * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
- * function to actually fetch the data.
+ * Close a libpq source.
  */
 static void
-fetch_file_range(const char *path, uint64 begin, uint64 end)
+libpq_destroy(rewind_source *source)
 {
-    char        linebuf[MAXPGPATH + 23];
-
-    /* Split the range into CHUNKSIZE chunks */
-    while (end - begin > 0)
-    {
-        unsigned int len;
-
-        /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
-        if (end - begin > CHUNKSIZE)
-            len = CHUNKSIZE;
-        else
-            len = (unsigned int) (end - begin);
-
-        snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
-
-        if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
-            pg_fatal("could not send COPY data: %s",
-                     PQerrorMessage(conn));
-
-        begin += len;
-    }
-}
-
-/*
- * Fetch all changed blocks from remote source data directory.
- */
-void
-libpq_executeFileMap(filemap_t *map)
-{
-    file_entry_t *entry;
-    const char *sql;
-    PGresult   *res;
-    int            i;
-
-    /*
-     * First create a temporary table, and load it with the blocks that we
-     * need to fetch.
-     */
-    sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
-    run_simple_command(sql);
-
-    sql = "COPY fetchchunks FROM STDIN";
-    res = PQexec(conn, sql);
-
-    if (PQresultStatus(res) != PGRES_COPY_IN)
-        pg_fatal("could not send file list: %s",
-                 PQresultErrorMessage(res));
-    PQclear(res);
-
-    for (i = 0; i < map->nactions; i++)
-    {
-        entry = map->actions[i];
-
-        /* If this is a relation file, copy the modified blocks */
-        execute_pagemap(&entry->target_modified_pages, entry->path);
-
-        switch (entry->action)
-        {
-            case FILE_ACTION_NONE:
-                /* nothing else to do */
-                break;
-
-            case FILE_ACTION_COPY:
-                /* Truncate the old file out of the way, if any */
-                open_target_file(entry->path, true);
-                fetch_file_range(entry->path, 0, entry->source_size);
-                break;
-
-            case FILE_ACTION_TRUNCATE:
-                truncate_target_file(entry->path, entry->source_size);
-                break;
-
-            case FILE_ACTION_COPY_TAIL:
-                fetch_file_range(entry->path, entry->target_size, entry->source_size);
-                break;
-
-            case FILE_ACTION_REMOVE:
-                remove_target(entry);
-                break;
-
-            case FILE_ACTION_CREATE:
-                create_target(entry);
-                break;
-
-            case FILE_ACTION_UNDECIDED:
-                pg_fatal("no action decided for \"%s\"", entry->path);
-                break;
-        }
-    }
-
-    if (PQputCopyEnd(conn, NULL) != 1)
-        pg_fatal("could not send end-of-COPY: %s",
-                 PQerrorMessage(conn));
-
-    while ((res = PQgetResult(conn)) != NULL)
-    {
-        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-            pg_fatal("unexpected result while sending file list: %s",
-                     PQresultErrorMessage(res));
-        PQclear(res);
-    }
-
-    /*
-     * We've now copied the list of file ranges that we need to fetch to the
-     * temporary table. Now, actually fetch all of those ranges.
-     */
-    sql =
-        "SELECT path, begin,\n"
-        "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
-        "FROM fetchchunks\n";
-
-    receiveFileChunks(sql);
-}
-
-static void
-execute_pagemap(datapagemap_t *pagemap, const char *path)
-{
-    datapagemap_iterator_t *iter;
-    BlockNumber blkno;
-    off_t        offset;
-
-    iter = datapagemap_iterate(pagemap);
-    while (datapagemap_next(iter, &blkno))
-    {
-        offset = blkno * BLCKSZ;
-
-        fetch_file_range(path, offset, offset + BLCKSZ);
-    }
-    pg_free(iter);
+    pfree(source);
+    /* NOTE: we don't close the connection here, as it was not opened by us. */
 }
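
The libpq_fetch.c changes above turn the old global libpq* functions into a rewind_source interface. rewind_source is a struct of function pointers, and libpq_source embeds it as its first member, so &src->common can be handed to the caller and cast back inside each callback. Below is a minimal, self-contained sketch of that pattern. demo_source, its prefix field and the two-callback interface are hypothetical simplifications, not the patch's actual definitions.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Reduced stand-in for rewind_source: a table of function pointers. */
typedef struct rewind_source rewind_source;
struct rewind_source
{
    char       *(*fetch_file) (rewind_source *source, const char *path,
                               size_t *filesize);
    void        (*destroy) (rewind_source *source);
};

/* Hypothetical implementation; 'common' must be the first member. */
typedef struct
{
    rewind_source common;
    const char *prefix;         /* stands in for the PGconn in libpq_source */
} demo_source;

static char *
demo_fetch_file(rewind_source *source, const char *path, size_t *filesize)
{
    /* Safe cast: 'common' is at offset 0 of demo_source. */
    demo_source *src = (demo_source *) source;
    size_t      len = strlen(src->prefix) + strlen(path);
    char       *buf = malloc(len + 1);

    if (buf == NULL)
        return NULL;
    strcpy(buf, src->prefix);
    strcat(buf, path);
    if (filesize)
        *filesize = len;
    return buf;
}

static void
demo_destroy(rewind_source *source)
{
    free(source);               /* frees the whole demo_source */
}

static rewind_source *
init_demo_source(const char *prefix)
{
    demo_source *src = calloc(1, sizeof(demo_source));

    if (src == NULL)
        return NULL;
    src->common.fetch_file = demo_fetch_file;
    src->common.destroy = demo_destroy;
    src->prefix = prefix;
    return &src->common;
}
```

Callers only ever see rewind_source *, which is why main() in pg_rewind.c can call source->fetch_file(source, ...) without knowing whether the data comes from a libpq connection or a local directory.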
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 13b4eab52b..c5ee70272a 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -35,8 +35,8 @@ static void usage(const char *progname);
 static void createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli,
                               XLogRecPtr checkpointloc);
 
-static void digestControlFile(ControlFileData *ControlFile, char *source,
-                              size_t size);
+static void digestControlFile(ControlFileData *ControlFile,
+                              const char *content, size_t size);
 static void getRestoreCommand(const char *argv0);
 static void sanityChecks(void);
 static void findCommonAncestorTimeline(XLogRecPtr *recptr, int *tliIndex);
@@ -69,6 +69,8 @@ int            targetNentries;
 uint64        fetch_size;
 uint64        fetch_done;
 
+static PGconn *conn;
+static rewind_source *source;
 
 static void
 usage(const char *progname)
@@ -269,19 +271,29 @@ main(int argc, char **argv)
 
     atexit(disconnect_atexit);
 
-    /* Connect to remote server */
-    if (connstr_source)
-        libpqConnect(connstr_source);
-
     /*
-     * Ok, we have all the options and we're ready to start. Read in all the
-     * information we need from both clusters.
+     * Ok, we have all the options and we're ready to start. First, connect
+     * to remote server.
      */
-    buffer = slurpFile(datadir_target, "global/pg_control", &size);
-    digestControlFile(&ControlFile_target, buffer, size);
-    pg_free(buffer);
+    if (connstr_source)
+    {
+        conn = PQconnectdb(connstr_source);
+
+        if (PQstatus(conn) == CONNECTION_BAD)
+            pg_fatal("could not connect to server: %s",
+                     PQerrorMessage(conn));
+
+        if (showprogress)
+            pg_log_info("connected to server");
+
+        source = init_libpq_source(conn);
+    }
+    else
+        source = init_local_source(datadir_source);
 
     /*
+     * Check the status of the target instance.
+     *
      * If the target instance was not cleanly shut down, start and stop the
      * target cluster once in single-user mode to enforce recovery to finish,
      * ensuring that the cluster can be used by pg_rewind.  Note that if
@@ -289,6 +301,10 @@ main(int argc, char **argv)
      * need to make sure by themselves that the target cluster is in a clean
      * state.
      */
+    buffer = slurpFile(datadir_target, "global/pg_control", &size);
+    digestControlFile(&ControlFile_target, buffer, size);
+    pg_free(buffer);
+
     if (!no_ensure_shutdown &&
         ControlFile_target.state != DB_SHUTDOWNED &&
         ControlFile_target.state != DB_SHUTDOWNED_IN_RECOVERY)
@@ -300,17 +316,20 @@ main(int argc, char **argv)
         pg_free(buffer);
     }
 
-    buffer = fetchFile("global/pg_control", &size);
+    buffer = source->fetch_file(source, "global/pg_control", &size);
     digestControlFile(&ControlFile_source, buffer, size);
     pg_free(buffer);
 
     sanityChecks();
 
     /*
+     * Find the common ancestor timeline between the clusters.
+     *
      * If both clusters are already on the same timeline, there's nothing to
      * do.
      */
-    if (ControlFile_target.checkPointCopy.ThisTimeLineID == ControlFile_source.checkPointCopy.ThisTimeLineID)
+    if (ControlFile_target.checkPointCopy.ThisTimeLineID ==
+        ControlFile_source.checkPointCopy.ThisTimeLineID)
     {
         pg_log_info("source and target cluster are on the same timeline");
         rewind_needed = false;
@@ -370,12 +389,12 @@ main(int argc, char **argv)
                 chkpttli);
 
     /*
-     * Collect information about all files in the target and source systems.
+     * Collect information about all files in both data directories.
      */
     if (showprogress)
         pg_log_info("reading source file list");
     filemap_init();
-    fetchSourceFileList();
+    source->traverse_files(source, &process_source_file);
 
     if (showprogress)
         pg_log_info("reading target file list");
@@ -423,7 +442,7 @@ main(int argc, char **argv)
      * modified the target directory and there is no turning back!
      */
 
-    execute_file_actions(filemap);
+    execute_file_actions(filemap, source);
 
     progress_report(true);
 
@@ -443,7 +462,7 @@ main(int argc, char **argv)
 
     if (connstr_source)
     {
-        endrec = libpqGetCurrentXlogInsertLocation();
+        endrec = source->get_current_wal_insert_lsn(source);
         endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
     }
     else
@@ -465,6 +484,14 @@ main(int argc, char **argv)
         WriteRecoveryConfig(conn, datadir_target,
                             GenerateRecoveryConfig(conn, NULL));
 
+    /* don't need the source connection anymore */
+    source->destroy(source);
+    if (conn)
+    {
+        PQfinish(conn);
+        conn = NULL;
+    }
+
     pg_log_info("Done!");
 
     return 0;
@@ -627,7 +654,7 @@ getTimelineHistory(ControlFileData *controlFile, int *nentries)
 
         /* Get history file from appropriate source */
         if (controlFile == &ControlFile_source)
-            histfile = fetchFile(path, NULL);
+            histfile = source->fetch_file(source, path, NULL);
         else if (controlFile == &ControlFile_target)
             histfile = slurpFile(datadir_target, path, NULL);
         else
@@ -783,16 +810,17 @@ checkControlFile(ControlFileData *ControlFile)
 }
 
 /*
- * Verify control file contents in the buffer src, and copy it to *ControlFile.
+ * Verify control file contents in the buffer 'content', and copy it to *ControlFile.
  */
 static void
-digestControlFile(ControlFileData *ControlFile, char *src, size_t size)
+digestControlFile(ControlFileData *ControlFile,
+                  const char *content, size_t size)
 {
     if (size != PG_CONTROL_FILE_SIZE)
         pg_fatal("unexpected control file size %d, expected %d",
                  (int) size, PG_CONTROL_FILE_SIZE);
 
-    memcpy(ControlFile, src, sizeof(ControlFileData));
+    memcpy(ControlFile, content, sizeof(ControlFileData));
 
     /* set and validate WalSegSz */
     WalSegSz = ControlFile->xlog_seg_size;
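
libpq_get_current_wal_insert_lsn() parses the "%X/%X" text returned by pg_current_wal_insert_lsn(). The step that combines the two halves is not visible in the hunk above. The sketch below assumes the usual encoding of an LSN as (hi << 32) | lo. parse_lsn() is a hypothetical helper that returns an error code instead of calling pg_fatal().

```c
#include <stdint.h>
#include <stdio.h>

/*
 * Parse an LSN in PostgreSQL's textual "hi/lo" hexadecimal form, e.g.
 * "0/3000060", into a 64-bit value.  Returns 0 on success, -1 if the
 * string does not contain two hex numbers separated by '/'.
 */
static int
parse_lsn(const char *str, uint64_t *lsn)
{
    unsigned int hi;
    unsigned int lo;

    if (sscanf(str, "%X/%X", &hi, &lo) != 2)
        return -1;

    /* High 32 bits come before the slash, low 32 bits after it. */
    *lsn = ((uint64_t) hi << 32) | lo;
    return 0;
}
```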
diff --git a/src/bin/pg_rewind/pg_rewind.h b/src/bin/pg_rewind/pg_rewind.h
index 67f90c2a38..0dc3dbd525 100644
--- a/src/bin/pg_rewind/pg_rewind.h
+++ b/src/bin/pg_rewind/pg_rewind.h
@@ -20,8 +20,6 @@
 
 /* Configuration options */
 extern char *datadir_target;
-extern char *datadir_source;
-extern char *connstr_source;
 extern bool showprogress;
 extern bool dry_run;
 extern bool do_sync;
@@ -31,9 +29,6 @@ extern int    WalSegSz;
 extern TimeLineHistoryEntry *targetHistory;
 extern int    targetNentries;
 
-/* general state */
-extern PGconn *conn;
-
 /* Progress counters */
 extern uint64 fetch_size;
 extern uint64 fetch_done;
-- 
2.18.4

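In the patch above, libpq_queue_fetch_range() still sends each requested range to the server as COPY lines for the fetchchunks temporary table, split into pieces of at most CHUNKSIZE bytes. Each line must carry the start offset of its own piece, so the offset should be advanced only after the line has been formatted. Below is a standalone sketch of that splitting step. It writes the lines into an array instead of calling PQputCopyData(), and split_range_to_copy_lines() is a hypothetical name.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

#define CHUNKSIZE 1000000       /* same value as in the patch */

/*
 * Hypothetical helper: split the byte range [begin, end) of 'path' into
 * pieces of at most CHUNKSIZE bytes and format each one as a COPY text
 * line "path<TAB>offset<TAB>len\n" into out[].  Returns the number of
 * lines written, or -1 if 'maxlines' is too small.
 */
static int
split_range_to_copy_lines(const char *path, uint64_t begin, uint64_t end,
                          char out[][128], int maxlines)
{
    int         n = 0;

    while (begin < end)
    {
        unsigned int len;

        if (n == maxlines)
            return -1;

        /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
        if (end - begin > CHUNKSIZE)
            len = CHUNKSIZE;
        else
            len = (unsigned int) (end - begin);

        /* Format the line with this piece's own start offset... */
        snprintf(out[n++], 128, "%s\t%" PRIu64 "\t%u\n", path, begin, len);

        /* ...and only then move on to the next piece. */
        begin += len;
    }
    return n;
}
```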
From 9575133ca5da2b2f6827828cb0c26c6122b328f9 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 19 Aug 2020 15:34:43 +0300
Subject: [PATCH v3 5/5] Allow pg_rewind to use a standby server as the source
 system.

Until now, a hot standby server could not be used as the source, because
pg_rewind creates a temporary table in the source system to hold the
list of file ranges that need to be fetched, and a standby cannot create
tables. Refactor the code to queue up the file fetch requests in
pg_rewind's memory instead, so that the temporary table is no longer
needed.

Also update the logic to compute 'minRecoveryPoint' correctly when the
source is a standby server.
---
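
The in-memory queue that replaces the temporary table works in two steps. First, a new request that continues the previous one (same path pointer, contiguous offset) is merged into it, up to CHUNK_SIZE. Second, whatever remains is split into CHUNK_SIZE pieces, and the queue is drained first whenever it already holds MAX_CHUNKS_PER_QUERY entries. The sketch below models only that bookkeeping. flush_queue() is a hypothetical stand-in for process_queued_fetch_requests() that simply empties the queue instead of running the fetch query.

```c
#include <stddef.h>
#include <stdint.h>

#define CHUNK_SIZE (1024 * 1024)
#define MAX_CHUNKS_PER_QUERY 1000

typedef struct
{
    const char *path;           /* compared by pointer, as in the patch */
    uint64_t    offset;
    uint32_t    length;
} fetch_range_request;

typedef struct
{
    int         num_requests;
    int         num_flushes;    /* how many times the queue was drained */
    fetch_range_request queue[MAX_CHUNKS_PER_QUERY];
} request_queue;

/* Hypothetical stand-in for process_queued_fetch_requests(). */
static void
flush_queue(request_queue *q)
{
    q->num_flushes++;
    q->num_requests = 0;
}

static void
queue_fetch_range(request_queue *q, const char *path, uint64_t off, size_t len)
{
    /* Merge with the previous request if this one continues it. */
    if (q->num_requests > 0)
    {
        fetch_range_request *prev = &q->queue[q->num_requests - 1];

        if (prev->path == path &&
            prev->offset + prev->length == off &&
            prev->length < CHUNK_SIZE)
        {
            uint32_t    room = CHUNK_SIZE - prev->length;
            uint32_t    thislen = (len < room) ? (uint32_t) len : room;

            prev->length += thislen;
            off += thislen;
            len -= thislen;
        }
    }

    /* Split whatever remains into pieces of at most CHUNK_SIZE bytes. */
    while (len > 0)
    {
        uint32_t    thislen;

        if (q->num_requests == MAX_CHUNKS_PER_QUERY)
            flush_queue(q);

        thislen = (len < CHUNK_SIZE) ? (uint32_t) len : CHUNK_SIZE;
        q->queue[q->num_requests].path = path;
        q->queue[q->num_requests].offset = off;
        q->queue[q->num_requests].length = thislen;
        q->num_requests++;

        off += thislen;
        len -= thislen;
    }
}
```

Because the path is compared by pointer, two requests for the same file reached through different string pointers are simply not merged. As the XXX comment in the patch notes, that costs efficiency but not correctness.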
 src/bin/pg_rewind/libpq_fetch.c | 281 ++++++++++++++++++++++----------
 src/bin/pg_rewind/pg_rewind.c   |  67 ++++++--
 2 files changed, 253 insertions(+), 95 deletions(-)

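process_queued_fetch_requests() in the diff below passes the queued paths, offsets and lengths to the prepared statement fetch_chunks_stmt as three PostgreSQL array literals, which the query expands with unnest(). The body of appendArrayEscapedString() is not part of this excerpt. The escaping here is therefore an assumption based on the documented array input syntax: each element is double-quoted, and embedded double quotes and backslashes are backslash-escaped. The helper names are hypothetical, and the lengths array would be built the same way as the offsets.

```c
#include <stdio.h>
#include <string.h>

/*
 * Append 'str' to buf as one double-quoted element of a PostgreSQL array
 * literal, backslash-escaping '"' and '\'.  Hypothetical stand-in for
 * appendArrayEscapedString(); the caller must ensure buf is large enough.
 */
static void
append_array_escaped(char *buf, const char *str)
{
    char       *p = buf + strlen(buf);

    *p++ = '"';
    for (; *str; str++)
    {
        if (*str == '"' || *str == '\\')
            *p++ = '\\';
        *p++ = *str;
    }
    *p++ = '"';
    *p = '\0';
}

/* Build "{...}" text[] and int8[] literals from two parallel arrays. */
static void
build_fetch_params(const char *const *paths, const long long *offsets, int n,
                   char *paths_out, char *offsets_out)
{
    strcpy(paths_out, "{");
    strcpy(offsets_out, "{");
    for (int i = 0; i < n; i++)
    {
        if (i > 0)
        {
            strcat(paths_out, ",");
            strcat(offsets_out, ",");
        }
        append_array_escaped(paths_out, paths[i]);
        sprintf(offsets_out + strlen(offsets_out), "%lld", offsets[i]);
    }
    strcat(paths_out, "}");
    strcat(offsets_out, "}");
}
```

Quoting every element, rather than only those that need it, keeps the encoder simple and also protects values such as an empty string or the word NULL.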
diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_fetch.c
index 52c4e147e1..f61a424299 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_fetch.c
@@ -15,39 +15,61 @@
 #include "fetch.h"
 #include "file_ops.h"
 #include "filemap.h"
+#include "lib/stringinfo.h"
 #include "pg_rewind.h"
 #include "port/pg_bswap.h"
 
 /*
- * Files are fetched max CHUNKSIZE bytes at a time.
- *
- * (This only applies to files that are copied in whole, or for truncated
- * files where we copy the tail. Relation files, where we know the individual
- * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
+ * Files are fetched max CHUNK_SIZE bytes at a time, and with a
+ * maximum of MAX_CHUNKS_PER_QUERY chunks in a single query.
  */
-#define CHUNKSIZE 1000000
+#define CHUNK_SIZE (1024 * 1024)
+#define MAX_CHUNKS_PER_QUERY 1000
+
+/* represents the request to fetch a piece of a file from the source */
+typedef struct
+{
+    const char *path;        /* path relative to data directory root */
+    uint64        offset;
+    uint32        length;
+} fetch_range_request;
 
 typedef struct
 {
     rewind_source common;    /* common interface functions */
 
     PGconn       *conn;
+
+    /*
+     * Queue of chunks that have been requested with the queue_fetch_range()
+     * function, but have not been fetched from the remote server yet.
+     */
+    int            num_requests;
+    fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY];
+
+    /* temporary space for process_queued_fetch_requests() */
+    StringInfoData paths;
+    StringInfoData offsets;
+    StringInfoData lengths;
 } libpq_source;
 
 static void init_libpq_conn(PGconn *conn);
 static char *run_simple_query(PGconn *conn, const char *sql);
 static void run_simple_command(PGconn *conn, const char *sql);
+static void appendArrayEscapedString(StringInfo buf, const char *str);
+
+static void process_queued_fetch_requests(libpq_source *src);
 
 /* public interface functions */
 static void libpq_traverse_files(rewind_source *source,
                                  process_file_callback_t callback);
-static char *libpq_fetch_file(rewind_source *source, const char *path,
-                              size_t *filesize);
 static void libpq_queue_fetch_range(rewind_source *source, const char *path,
                                     uint64 off, size_t len);
 static void libpq_finish_fetch(rewind_source *source);
-static void libpq_destroy(rewind_source *source);
+static char *libpq_fetch_file(rewind_source *source, const char *path,
+                              size_t *filesize);
 static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
+static void libpq_destroy(rewind_source *source);
 
 /*
  * Create a new libpq source.
@@ -73,6 +95,10 @@ init_libpq_source(PGconn *conn)
 
     src->conn = conn;
 
+    initStringInfo(&src->paths);
+    initStringInfo(&src->offsets);
+    initStringInfo(&src->lengths);
+
     return &src->common;
 }
 
@@ -90,7 +116,10 @@ init_libpq_conn(PGconn *conn)
     run_simple_command(conn, "SET lock_timeout = 0");
     run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
 
-    /* we don't intend do any updates. Put the connection in read-only mode to keep us honest */
+    /*
+     * We don't intend to do any updates.  Put the connection in read-only mode
+     * to keep us honest.
+     */
     run_simple_command(conn, "SET default_transaction_read_only = off");
 
     /* secure search_path */
@@ -100,17 +129,6 @@ init_libpq_conn(PGconn *conn)
                  PQresultErrorMessage(res));
     PQclear(res);
 
-    /*
-     * Check that the server is not in hot standby mode. There is no
-     * fundamental reason that couldn't be made to work, but it doesn't
-     * currently because we use a temporary table. Better to check for it
-     * explicitly than error out, for a better error message.
-     */
-    str = run_simple_query(conn, "SELECT pg_is_in_recovery()");
-    if (strcmp(str, "f") != 0)
-        pg_fatal("source server must not be in recovery mode");
-    pg_free(str);
-
     /*
      * Also check that full_page_writes is enabled.  We can get torn pages if
      * a page is modified while we read it with pg_read_binary_file(), and we
@@ -121,15 +139,15 @@ init_libpq_conn(PGconn *conn)
         pg_fatal("full_page_writes must be enabled in the source server");
     pg_free(str);
 
-    /*
-     * First create a temporary table, and COPY to load it with the list of
-     * blocks that we need to fetch.
-     */
-    run_simple_command(conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)");
+    /* Prepare a statement we'll use to fetch files */
+    res = PQprepare(conn, "fetch_chunks_stmt",
+                    "SELECT path, begin,\n"
+                    "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
+                    "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
+                    3, NULL);
 
-    res = PQexec(conn, "COPY fetchchunks FROM STDIN");
-    if (PQresultStatus(res) != PGRES_COPY_IN)
-        pg_fatal("could not send file list: %s",
+    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+        pg_fatal("could not prepare statement to fetch file contents: %s",
                  PQresultErrorMessage(res));
     PQclear(res);
 }
@@ -297,91 +315,143 @@ libpq_queue_fetch_range(rewind_source *source, const char *path, uint64 off,
                         size_t len)
 {
     libpq_source *src = (libpq_source *) source;
-    uint64        begin = off;
-    uint64        end = off + len;
 
     /*
-     * Write the file range to a temporary table in the server.
+     * Does this request happen to be a continuation of the previous chunk?
+     * If so, merge it with the previous one.
      *
-     * The range is sent to the server as a COPY formatted line, to be inserted
-     * into the 'fetchchunks' temporary table. The libpq_finish_fetch() uses
-     * the temporary table to actually fetch the data.
+     * XXX: We use pointer equality to compare the path. That's good enough
+     * for our purposes; the caller always passes the same pointer for the
+     * same filename. If it didn't, we would fail to merge requests, but it
+     * wouldn't affect correctness.
      */
-
-    /* Split the range into CHUNKSIZE chunks */
-    while (end - begin > 0)
+    if (src->num_requests > 0)
     {
-        char        linebuf[MAXPGPATH + 23];
-        unsigned int len;
+        fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
+
+        if (prev->offset + prev->length == off &&
+            prev->length < CHUNK_SIZE &&
+            prev->path == path)
+        {
+            /*
+             * Extend the previous request to cover as much of this new request
+             * as possible, without exceeding CHUNK_SIZE.
+             */
+            int32        thislen;
+
+            thislen = Min(len, CHUNK_SIZE - prev->length);
+            src->request_queue[src->num_requests - 1].length += thislen;
 
-        /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
-        if (end - begin > CHUNKSIZE)
-            len = CHUNKSIZE;
-        else
-            len = (unsigned int) (end - begin);
+            off += thislen;
+            len -= thislen;
+
+            /*
+             * Fall through to create new requests for any remaining 'len' that
+             * didn't fit in the previous chunk.
+             */
+        }
+    }
+
+    /* Divide the request into pieces of CHUNK_SIZE bytes each */
+    while (len > 0)
+    {
+        int32        thislen;
 
-        begin += len;
+        /* if the queue is full, perform all the work queued up so far */
+        if (src->num_requests == MAX_CHUNKS_PER_QUERY)
+            process_queued_fetch_requests(src);
 
-        snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
+        thislen = Min(len, CHUNK_SIZE);
+        src->request_queue[src->num_requests].path = path;
+        src->request_queue[src->num_requests].offset = off;
+        src->request_queue[src->num_requests].length = thislen;
+        src->num_requests++;
 
-        if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1)
-            pg_fatal("could not send COPY data: %s",
-                     PQerrorMessage(src->conn));
+        off += thislen;
+        len -= thislen;
     }
 }
 
-
-/*----
- * Runs a query, which returns pieces of files from the remote source data
- * directory, and overwrites the corresponding parts of target files with
- * the received parts. The result set is expected to be of format:
- *
- * path        text    -- path in the data directory, e.g "base/1/123"
- * begin    int8    -- offset within the file
- * chunk    bytea    -- file content
- *----
- */
 /*
- * Receive all the queued chunks and write them to the target data directory.
+ * Fetch all the queued chunks and write them to the target data directory.
  */
 static void
 libpq_finish_fetch(rewind_source *source)
 {
-    libpq_source *src = (libpq_source *) source;
+    process_queued_fetch_requests((libpq_source *) source);
+}
+
+/*
+ * Receive all the queued chunks and write them to the target data directory.
+ */
+static void
+process_queued_fetch_requests(libpq_source *src)
+{
+    const char *params[3];
     PGresult   *res;
-    const char *sql;
+    int            chunkno;
 
-    if (PQputCopyEnd(src->conn, NULL) != 1)
-        pg_fatal("could not send end-of-COPY: %s",
-                 PQerrorMessage(src->conn));
+    if (src->num_requests == 0)
+        return;
 
-    while ((res = PQgetResult(src->conn)) != NULL)
+    pg_log_debug("getting %d file chunks", src->num_requests);
+
+    /*
+     * The prepared statement, 'fetch_chunks_stmt', takes three arrays
+     * with the same length as parameters: paths, offsets and lengths.
+     * Construct the string representations of the parameter arrays.
+     */
+    resetStringInfo(&src->paths);
+    resetStringInfo(&src->offsets);
+    resetStringInfo(&src->lengths);
+
+    appendStringInfoChar(&src->paths, '{');
+    appendStringInfoChar(&src->offsets, '{');
+    appendStringInfoChar(&src->lengths, '{');
+    for (int i = 0; i < src->num_requests; i++)
     {
-        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-            pg_fatal("unexpected result while sending file list: %s",
-                     PQresultErrorMessage(res));
-        PQclear(res);
+        fetch_range_request *rq = &src->request_queue[i];
+
+        if (i > 0)
+        {
+            appendStringInfoChar(&src->paths, ',');
+            appendStringInfoChar(&src->offsets, ',');
+            appendStringInfoChar(&src->lengths, ',');
+        }
+
+        appendArrayEscapedString(&src->paths, rq->path);
+        appendStringInfo(&src->offsets, INT64_FORMAT, rq->offset);
+        appendStringInfo(&src->lengths, "%d", rq->length);
     }
+    appendStringInfoChar(&src->paths, '}');
+    appendStringInfoChar(&src->offsets, '}');
+    appendStringInfoChar(&src->lengths, '}');
 
     /*
-     * We've now copied the list of file ranges that we need to fetch to the
-     * temporary table. Now, actually fetch all of those ranges.
+     * Execute the prepared statement.
      */
-    sql =
-        "SELECT path, begin,\n"
-        "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
-        "FROM fetchchunks\n";
+    params[0] = src->paths.data;
+    params[1] = src->offsets.data;
+    params[2] = src->lengths.data;
 
-    if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
+    if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
         pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
 
-    pg_log_debug("getting file chunks");
-
     if (PQsetSingleRowMode(src->conn) != 1)
         pg_fatal("could not set libpq connection to single row mode");
 
+    /*----
+     * The result set is of format:
+     *
+     * path        text    -- path in the data directory, e.g "base/1/123"
+     * begin    int8    -- offset within the file
+     * chunk    bytea    -- file content
+     *----
+     */
+    chunkno = 0;
     while ((res = PQgetResult(src->conn)) != NULL)
     {
+        fetch_range_request *rq = &src->request_queue[chunkno];
         char       *filename;
         int            filenamelen;
         int64        chunkoff;
@@ -402,6 +472,9 @@ libpq_finish_fetch(rewind_source *source)
                          PQresultErrorMessage(res));
         }
 
+        if (chunkno >= src->num_requests)
+            pg_fatal("received more data chunks than requested");
+
         /* sanity check the result set */
         if (PQnfields(res) != 3 || PQntuples(res) != 1)
             pg_fatal("unexpected result set size while fetching remote files");
@@ -446,9 +519,7 @@ libpq_finish_fetch(rewind_source *source)
          * If a file has been deleted on the source, remove it on the target
          * as well.  Note that multiple unlink() calls may happen on the same
          * file if multiple data chunks are associated with it, hence ignore
-         * unconditionally anything missing.  If this file is not a relation
-         * data file, then it has been already truncated when creating the
-         * file chunk list at the previous execution of the filemap.
+         * unconditionally anything missing.
          */
         if (PQgetisnull(res, 0, 2))
         {
@@ -463,14 +534,54 @@ libpq_finish_fetch(rewind_source *source)
         pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
                      filename, (long long int) chunkoff, chunksize);
 
+        if (strcmp(filename, rq->path) != 0)
+        {
+            pg_fatal("received data for file \"%s\", when requested for \"%s\"",
+                     filename, rq->path);
+        }
+        if (chunkoff != rq->offset)
+            pg_fatal("received data at offset %lld of file \"%s\", when requested for offset %lld",
+                     (long long int) chunkoff, rq->path, (long long int) rq->offset);
+        /* receiving less than requested is OK, though */
+        if (chunksize > rq->length)
+            pg_fatal("received more than requested for file \"%s\"",
+                     rq->path);
+
         open_target_file(filename, false);
-
         write_target_range(chunk, chunkoff, chunksize);
 
         pg_free(filename);
 
         PQclear(res);
+        chunkno++;
     }
+    if (chunkno != src->num_requests)
+        pg_fatal("unexpected number of data chunks received");
+
+    src->num_requests = 0;
+}
+
+/*
+ * Escape a string to be used as an element in a text array constant.
+ */
+static void
+appendArrayEscapedString(StringInfo buf, const char *str)
+{
+    appendStringInfoCharMacro(buf, '\"');
+    while (*str)
+    {
+        char        ch = *str;
+
+        if (ch == '"' || ch == '\\')
+            appendStringInfoCharMacro(buf, '\\');
+
+        appendStringInfoCharMacro(buf, ch);
+
+        str++;
+    }
+    appendStringInfoCharMacro(buf, '\"');
 }
 
 /*
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index c5ee70272a..e4e773deeb 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -45,6 +45,7 @@ static void disconnect_atexit(void);
 
 static ControlFileData ControlFile_target;
 static ControlFileData ControlFile_source;
+static ControlFileData ControlFile_source_after;
 
 const char *progname;
 int            WalSegSz;
@@ -446,30 +447,76 @@ main(int argc, char **argv)
 
     progress_report(true);
 
+    /*
+     * Fetch the control file from the source last. This ensures that the
+     * minRecoveryPoint is up-to-date.
+     */
+    buffer = source->fetch_file(source, "global/pg_control", &size);
+    digestControlFile(&ControlFile_source_after, buffer, size);
+    pg_free(buffer);
+
+    /*
+     * Sanity check: If the source is a local system, the control file should
+     * not have changed since we started.
+     *
+     * XXX: We assume it hasn't been modified, but actually, what could go
+     * wrong? The logic handles a libpq source that's modified concurrently,
+     * why not a local datadir?
+     */
+    if (datadir_source &&
+        memcmp(&ControlFile_source, &ControlFile_source_after,
+               sizeof(ControlFileData)) != 0)
+    {
+        pg_fatal("source system was modified while pg_rewind was running");
+    }
+
     if (showprogress)
         pg_log_info("creating backup label and updating control file");
     createBackupLabel(chkptredo, chkpttli, chkptrec);
 
     /*
      * Update control file of target. Make it ready to perform archive
-     * recovery when restarting.
+     * recovery when restarting, starting from the last common checkpoint.
      *
-     * minRecoveryPoint is set to the current WAL insert location in the
-     * source server. Like in an online backup, it's important that we recover
-     * all the WAL that was generated while we copied the files over.
+     * Like in an online backup, it's important that we replay all the WAL
+     * that was generated while we copied the files over. To enforce that,
+     * set 'minRecoveryPoint' in the control file.
      */
-    memcpy(&ControlFile_new, &ControlFile_source, sizeof(ControlFileData));
-
     if (connstr_source)
     {
-        endrec = source->get_current_wal_insert_lsn(source);
-        endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+        if (ControlFile_source_after.state == DB_IN_ARCHIVE_RECOVERY)
+        {
+            /*
+             * Source is a standby server. We must replay to its
+             * minRecoveryPoint.
+             */
+            endrec = ControlFile_source_after.minRecoveryPoint;
+            endtli = ControlFile_source_after.minRecoveryPointTLI;
+        }
+        else
+        {
+            /*
+             * Source is a primary (non-standby) server. We must recover up to
+             * the last WAL insert location.
+             */
+            if (ControlFile_source_after.state != DB_IN_PRODUCTION)
+                pg_fatal("source system was in unexpected state at end of rewind");
+
+            endrec = source->get_current_wal_insert_lsn(source);
+            endtli = ControlFile_source_after.checkPointCopy.ThisTimeLineID;
+        }
     }
     else
     {
-        endrec = ControlFile_source.checkPoint;
-        endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+        /*
+         * Source is a local data directory. It should have shut down cleanly,
+         * and we must replay up to its latest shutdown checkpoint.
+         */
+        endrec = ControlFile_source_after.checkPoint;
+        endtli = ControlFile_source_after.checkPointCopy.ThisTimeLineID;
     }
+
+    memcpy(&ControlFile_new, &ControlFile_source_after, sizeof(ControlFileData));
     ControlFile_new.minRecoveryPoint = endrec;
     ControlFile_new.minRecoveryPointTLI = endtli;
     ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY;
-- 
2.18.4

