Thread: Refactor pg_rewind code and make it work against a standby

Refactor pg_rewind code and make it work against a standby

From: Heikki Linnakangas
Hi,

I started to hack on making pg_rewind crash-safe (see [1]), but I
quickly got side-tracked into refactoring and tidying up the code in
general. I ended up with this series of patches:

The first four patches are pure refactoring that makes the code and the
logic more readable. Tom Lane commented about the messy comments earlier
(see [2]), and I hope these patches will alleviate that confusion. See the
commit messages for details.

The last patch refactors the logic in libpq_fetch.c, so that it no 
longer uses a temporary table in the source system. That allows using a 
hot standby server as the pg_rewind source.

This doesn't do anything about pg_rewind's crash-safety yet, but I'll
try to work on that after these patches.

[1] 
https://www.postgresql.org/message-id/d8dcc760-8780-5084-f066-6d663801d2e2%40iki.fi

[2] https://www.postgresql.org/message-id/30255.1522711675%40sss.pgh.pa.us

- Heikki

Attachment

Re: Refactor pg_rewind code and make it work against a standby

From: Kyotaro Horiguchi
Hello.

At Wed, 19 Aug 2020 15:50:16 +0300, Heikki Linnakangas <hlinnaka@iki.fi> wrote in 
> Hi,
> 
> I started to hack on making pg_rewind crash-safe (see [1]), but I
> quickly got side-tracked into refactoring and tidying up the code
> in general. I ended up with this series of patches:

^^;

> The first four patches are pure refactoring that makes the code and the
> logic more readable. Tom Lane commented about the messy comments
> earlier (see [2]), and I hope these patches will alleviate that
> confusion. See the commit messages for details.

0001: It looks fine. The new location is reasonable, although adding one
    more extern is a bit annoying. But I don't object to it.

0002: Renaming old->target and new->source makes the meaning far
   clearer. Moving the core decision-making code into filemap_finalize
   is reasonable.

    By the way, some of the rules still remain in
    process_source/target_file: for example, treating a pg_wal symlink
    as a directory, skipping temporary directories, and the excluded
    files. The number of excluded files doesn't seem large, so applying
    the exclusion early doesn't seem to give much advantage. They seem
    movable to filemap_finalize to me, and we could get rid of the
    callbacks by doing so. Is there any reason the remaining rules
    should stay in the callbacks?

0003: Thomas is proposing a sort template. It could be used here if it gets committed.

0004:

 Many of the functions get an additional word "local" in their names,
 but I don't clearly get the meaning. That's a matter of linguistic
 sense, though, and I'm not good at that.
 
-rewind_copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
+local_fetch_file_range(rewind_source *source, const char *path, uint64 off,

 The function actually copies the source range to the target file, so
 "fetch" might suggest a somewhat different meaning, but again that's
 about linguistics (omitted..).


> The last patch refactors the logic in libpq_fetch.c, so that it no
> longer uses a temporary table in the source system. That allows using
> a hot standby server as the pg_rewind source.

That sounds nice.

> This doesn't do anything about pg_rewind's crash-safety yet, but I'll
> try to work on that after these patches.
> 
> [1]
> https://www.postgresql.org/message-id/d8dcc760-8780-5084-f066-6d663801d2e2%40iki.fi
> 
> [2]
> https://www.postgresql.org/message-id/30255.1522711675%40sss.pgh.pa.us
> 
> - Heikki

I'm going to continue reviewing this later.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Refactor pg_rewind code and make it work against a standby

From: Heikki Linnakangas
On 20/08/2020 11:32, Kyotaro Horiguchi wrote:
> 0002: Renaming old->target and new->source makes the meaning far
>    clearer. Moving the core decision-making code into filemap_finalize
>    is reasonable.
> 
>     By the way, some of the rules still remain in
>     process_source/target_file: for example, treating a pg_wal symlink
>     as a directory, skipping temporary directories, and the excluded
>     files. The number of excluded files doesn't seem large, so applying
>     the exclusion early doesn't seem to give much advantage. They seem
>     movable to filemap_finalize to me, and we could get rid of the
>     callbacks by doing so. Is there any reason the remaining rules
>     should stay in the callbacks?

Good idea! I changed the patch that way.
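To illustrate what moving those rules out of the callbacks amounts to, here is a minimal sketch (not the code in the new patch) of the temporary-file test lifted out of process_source_file() into a standalone predicate that a finalize step could call. The two strstr() checks are the ones the patch removes; `is_temp_path` is a hypothetical name, and the two macros are redefined locally with the values from PostgreSQL's storage/fd.h so the fragment compiles on its own.

```c
#include <stdbool.h>
#include <string.h>

/* Same values as PostgreSQL's storage/fd.h, redefined so this compiles alone. */
#define PG_TEMP_FILES_DIR "pgsql_tmp"
#define PG_TEMP_FILE_PREFIX "pgsql_tmp"

/*
 * Hypothetical helper: returns true if 'path' (relative to the data
 * directory) names a temporary file or something inside a temporary
 * directory.  These are the two strstr() tests that used to live in
 * process_source_file().
 */
static bool
is_temp_path(const char *path)
{
	/* .../pgsql_tmp* : a temp file, or the temp directory itself */
	if (strstr(path, "/" PG_TEMP_FILE_PREFIX) != NULL)
		return true;
	/* .../pgsql_tmp/... : anything below a temp directory */
	if (strstr(path, "/" PG_TEMP_FILES_DIR "/") != NULL)
		return true;
	return false;
}
```

A finalize step could then treat any source entry for which this returns true as if it didn't exist in the source, so the target's copy gets removed, which is the same effect the old early return in the callback had.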

> 0003: Thomas is proposing a sort template. It could be used here if it gets committed.
> 
> 0004:
> 
>   Many of the functions get an additional word "local" in their names,
>   but I don't clearly get the meaning. That's a matter of linguistic
>   sense, though, and I'm not good at that.
>   
> -rewind_copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
> +local_fetch_file_range(rewind_source *source, const char *path, uint64 off,
> 
>   The function actually copies the source range to the target file, so
>   "fetch" might suggest a somewhat different meaning, but again that's
>   about linguistics (omitted..).

Hmm. It is "fetching" the range from the source server, and writing it 
to the target. The term makes more sense with a libpq source. Perhaps 
this function should be called "local_copy_range" or something, but it'd 
also be nice to have "fetch" in the name because the function pointer 
it's assigned to is called "queue_fetch_range".
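A rough sketch of the function-pointer arrangement being discussed, assuming a rewind_source struct whose queue_fetch_range member each source implementation fills in. Only `rewind_source` and `queue_fetch_range` come from the patch; `local_source`, `init_local_source` and `finish_fetch` are hypothetical, and the local callback merely counts requests where the real one would copy file data.

```c
#include <stddef.h>
#include <stdint.h>

typedef struct rewind_source rewind_source;

/* Operations every source kind provides (only queue_fetch_range is from the patch). */
struct rewind_source
{
	void		(*queue_fetch_range) (rewind_source *source, const char *path,
									  uint64_t off, size_t len);
	void		(*finish_fetch) (rewind_source *source);	/* hypothetical */
};

/* Hypothetical local-directory source; 'common' must be the first member. */
typedef struct
{
	rewind_source common;
	const char *datadir;
	int			nrequests;
	uint64_t	nbytes;
} local_source;

static void
local_queue_fetch_range(rewind_source *source, const char *path,
						uint64_t off, size_t len)
{
	local_source *src = (local_source *) source;	/* valid: common is first */

	/* A real local source would copy datadir/path [off, off + len) here. */
	(void) path;
	(void) off;
	src->nrequests++;
	src->nbytes += len;
}

static void
local_finish_fetch(rewind_source *source)
{
	(void) source;				/* nothing is buffered for a local source */
}

static rewind_source *
init_local_source(local_source *src, const char *datadir)
{
	src->common.queue_fetch_range = local_queue_fetch_range;
	src->common.finish_fetch = local_finish_fetch;
	src->datadir = datadir;
	src->nrequests = 0;
	src->nbytes = 0;
	return &src->common;
}
```

With this shape the caller only ever invokes source->queue_fetch_range(...), so a "fetch" verb in the local implementation's name lines up with the interface it implements, even though for a local source the work is really a copy.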

> I'm going to continue reviewing this later.

Thanks! Attached is a new set of patches. The only meaningful change is 
in the 2nd patch, which I modified per your suggestion. Also, I moved 
the logic to decide each file's fate into a new subroutine called 
decide_file_action().
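For reference, the per-file rules that used to be split between process_source_file() and process_target_file() can be written as one function of the gathered source/target facts. This is a sketch modeled on the removed code, not the patch's decide_file_action(): the struct keeps only the fields it needs, a source/target type mismatch returns FILE_ACTION_UNDECIDED where the real code raises an error, and applying the PG_VERSION exception only when the file exists on both sides is an assumption.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef enum
{
	FILE_TYPE_UNDEFINED, FILE_TYPE_REGULAR, FILE_TYPE_DIRECTORY, FILE_TYPE_SYMLINK
} file_type_t;

typedef enum
{
	FILE_ACTION_UNDECIDED,
	FILE_ACTION_CREATE,			/* create directory or symlink */
	FILE_ACTION_COPY,			/* copy the whole file */
	FILE_ACTION_COPY_TAIL,		/* copy from target_size up to source_size */
	FILE_ACTION_NONE,
	FILE_ACTION_TRUNCATE,		/* truncate target to source_size */
	FILE_ACTION_REMOVE
} file_action_t;

/* Trimmed-down entry: only the fields the decision looks at. */
typedef struct
{
	const char *path;
	bool		isrelfile;
	bool		source_exists;
	file_type_t source_type;
	size_t		source_size;
	bool		target_exists;
	file_type_t target_type;
	size_t		target_size;
} file_entry_t;

static bool
ends_with(const char *s, const char *suffix)
{
	size_t		slen = strlen(s);
	size_t		xlen = strlen(suffix);

	return slen >= xlen && strcmp(s + slen - xlen, suffix) == 0;
}

static file_action_t
decide_file_action_sketch(const file_entry_t *e)
{
	/* Only in the target (or filtered out of the source): remove it. */
	if (!e->source_exists)
		return e->target_exists ? FILE_ACTION_REMOVE : FILE_ACTION_NONE;

	/* Only in the source: copy a file, create a directory or symlink. */
	if (!e->target_exists)
		return e->source_type == FILE_TYPE_REGULAR ?
			FILE_ACTION_COPY : FILE_ACTION_CREATE;

	/* In both, but of different kinds: the real code reports an error. */
	if (e->source_type != e->target_type)
		return FILE_ACTION_UNDECIDED;

	/* Directory or symlink present on both sides: nothing to do. */
	if (e->source_type != FILE_TYPE_REGULAR)
		return FILE_ACTION_NONE;

	/* Non-relation files are copied wholesale, except PG_VERSION. */
	if (!e->isrelfile)
		return ends_with(e->path, "PG_VERSION") ?
			FILE_ACTION_NONE : FILE_ACTION_COPY;

	/* Relation file in both: reconcile sizes; WAL takes care of the rest. */
	if (e->target_size < e->source_size)
		return FILE_ACTION_COPY_TAIL;
	if (e->target_size > e->source_size)
		return FILE_ACTION_TRUNCATE;
	return FILE_ACTION_NONE;
}
```

Having the decision as a pure function of the collected facts is what makes it possible to gather everything first and decide afterwards, which is the point of the restructuring.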

- Heikki

Attachment

Re: Refactor pg_rewind code and make it work against a standby

From
Kyotaro Horiguchi
Date:
Hello.

It needed rebasing. (Attached)

At Tue, 25 Aug 2020 16:32:02 +0300, Heikki Linnakangas <hlinnaka@iki.fi> wrote in 
> On 20/08/2020 11:32, Kyotaro Horiguchi wrote:
> > 0002: Rewording that old->target and new->source makes the meaning far
> 
> Good idea! I changed the patch that way.

Looks good.

> > 0003: Thomas is proposing a sort template. It could be used here if it gets committed.


+     * If the block is beyond the EOF in the source system, or the file doesn't
+     * doesn'exist in the source at all, we're going to truncate/remove it away

"the file doesn't doesn'exist"
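The condition described in that comment boils down to a small predicate: a block modified in the target only needs to be remembered if it lies below EOF on both sides, because blocks past the source's EOF get truncated away and blocks past the target's EOF are copied with the tail anyway. A minimal sketch, using the hypothetical name `should_remember_block` and fixing BLCKSZ at PostgreSQL's default of 8192:

```c
#include <stdbool.h>
#include <stdint.h>

#define BLCKSZ 8192				/* PostgreSQL's default block size */

/*
 * Hypothetical helper: should a target-side modification of block
 * 'blkno_inseg' be recorded in the page map?
 */
static bool
should_remember_block(uint32_t blkno_inseg, uint64_t source_size,
					  uint64_t target_size)
{
	/* byte offset just past the end of this block */
	uint64_t	end_offset = ((uint64_t) blkno_inseg + 1) * BLCKSZ;

	/* beyond source EOF: truncated away; beyond target EOF: copied as tail */
	return end_offset <= source_size && end_offset <= target_size;
}
```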

I don't think filemap_finalize needs to iterate over filemap twice.

hash_string_pointer is a copy of the one in pg_verifybackup.c. Is it
worth moving it into hashfn.h or hashfn.c?
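For context, the helper in question just hashes a NUL-terminated string by its bytes. The pg_verifybackup.c version calls PostgreSQL's hash_bytes(); the sketch below substitutes 32-bit FNV-1a (`hash_bytes_standin`, a hypothetical name) purely so it compiles without PostgreSQL headers.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-in for PostgreSQL's hash_bytes(): 32-bit FNV-1a, for self-containment only. */
static uint32_t
hash_bytes_standin(const unsigned char *k, size_t len)
{
	uint32_t	h = 2166136261u;	/* FNV offset basis */

	for (size_t i = 0; i < len; i++)
	{
		h ^= k[i];
		h *= 16777619u;			/* FNV prime */
	}
	return h;
}

/* Shape of the shared helper: hash a C string by its bytes, excluding the NUL. */
static uint32_t
hash_string_pointer(const char *s)
{
	return hash_bytes_standin((const unsigned char *) s, strlen(s));
}
```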

> --- a/src/bin/pg_rewind/pg_rewind.c
> +++ b/src/bin/pg_rewind/pg_rewind.c
> ...
> +    filemap_t  *filemap;
> ..
> +    filemap_init();
> ...
> +    filemap = filemap_finalize();

I'm a bit confused by this, and realized that what filemap_init
initializes is *not* the filemap, but the filehash. So, for example,
maybe the functions should be named something like this?

filehash_init()
filemap = filehash_finalize()/create_filemap()
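The two phases those names would describe can be pictured as: collect entries while scanning (in the rebased patch, in the filehash keyed by path), then finalize them into a path-sorted array that later lookups binary-search. A self-contained sketch of the finalize-and-lookup half, using a plain array in place of the hash table; `entry_t`, `build_sorted_map` and `lookup` are hypothetical, while the pointer-array bsearch() mirrors get_filemap_entry() in the v3 patch.

```c
#include <stdlib.h>
#include <string.h>

typedef struct
{
	const char *path;
	int			action;
} entry_t;

/* Comparator over an array of entry_t pointers, in the style of path_cmp. */
static int
path_cmp(const void *a, const void *b)
{
	const entry_t *fa = *(const entry_t *const *) a;
	const entry_t *fb = *(const entry_t *const *) b;

	return strcmp(fa->path, fb->path);
}

/* "Finalize": turn the collected entries into a path-sorted pointer array. */
static entry_t **
build_sorted_map(entry_t *entries, size_t n)
{
	entry_t   **arr = malloc(n * sizeof(entry_t *));

	if (arr == NULL)
		return NULL;
	for (size_t i = 0; i < n; i++)
		arr[i] = &entries[i];
	qsort(arr, n, sizeof(entry_t *), path_cmp);
	return arr;
}

/* Binary search by path; returns NULL if the path is not in the map. */
static entry_t *
lookup(entry_t **arr, size_t n, const char *path)
{
	entry_t		key = {path, 0};
	entry_t    *key_ptr = &key;
	entry_t   **e = bsearch(&key_ptr, arr, n, sizeof(entry_t *), path_cmp);

	return e ? *e : NULL;
}
```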


> > 0004:
> >   Many of the functions get an additional word "local" in their names,
> >   but I don't clearly get the meaning. That's a matter of linguistic
> >   sense, though, and I'm not good at that.
> > -rewind_copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
> > +local_fetch_file_range(rewind_source *source, const char *path, uint64 off,
> >   The function actually copies the source range to the target file, so
> >   "fetch" might suggest a somewhat different meaning, but again that's
> >   about linguistics (omitted..).
> 
> Hmm. It is "fetching" the range from the source server, and writing it
> to the target. The term makes more sense with a libpq source. Perhaps
> this function should be called "local_copy_range" or something, but
> it'd also be nice to have "fetch" in the name because the function
> pointer it's assigned to is called "queue_fetch_range".

Thanks. Yeah, libpq_fetch_file makes sense. I agree with the name.
The refactoring looks good to me.

> > I'm going to continue reviewing this later.
> 
> Thanks! Attached is a new set of patches. The only meaningful change
> is in the 2nd patch, which I modified per your suggestion. Also, I
> moved the logic to decide each file's fate into a new subroutine
> called decide_file_action().

That looks good.

0005:

+    /*
+     * We don't intend do any updates.  Put the connection in read-only mode
+     * to keep us honest.
+     */
     run_simple_command(conn, "SET default_transaction_read_only = off");

The comment has been wrong ever since 0004 added it, but that's not a
problem since 0005 fixes it. However, we need the variable turned
on in order to be really honest :p

> /*
>  * Also check that full_page_writes is enabled.  We can get torn pages if
>  * a page is modified while we read it with pg_read_binary_file(), and we
>  * rely on full page images to fix them.
>  */
> str = run_simple_query(conn, "SHOW full_page_writes");
> if (strcmp(str, "on") != 0)
>     pg_fatal("full_page_writes must be enabled in the source server");
> pg_free(str);

This part is not changed by this patch set. But if we allow
connecting to a standby, this check can be tricked by setting it "off"
on the primary and "on" on the standby (FWIW, though). Some protective
measure might be necessary. (Maybe the standby should be required to
have the same value as the primary.)


+            thislen = Min(len, CHUNK_SIZE - prev->length);
+            src->request_queue[src->num_requests - 1].length += thislen;

prev == &src->request_queue[src->num_requests - 1] here, so this could
simply be written as prev->length += thislen.
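For illustration, a self-contained sketch of the kind of coalescing that fragment belongs to: a new range that continues the last queued request for the same file is folded into it, up to CHUNK_SIZE, and anything left over starts a new request. Only request_queue, num_requests, prev, thislen and CHUNK_SIZE come from the fragment; the struct layout, the tiny CHUNK_SIZE value and `queue_range()` are assumptions.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define CHUNK_SIZE 16			/* hypothetical; tiny to keep the example readable */
#define MAX_REQUESTS 64

typedef struct
{
	const char *path;			/* not copied: caller keeps it alive */
	uint64_t	offset;
	size_t		length;
} fetch_range_request;

typedef struct
{
	fetch_range_request request_queue[MAX_REQUESTS];
	int			num_requests;
} fetch_queue;

/* Queue [off, off + len) of 'path'; returns 0 on success, -1 if the queue is full. */
static int
queue_range(fetch_queue *src, const char *path, uint64_t off, size_t len)
{
	while (len > 0)
	{
		size_t		thislen;

		if (src->num_requests > 0)
		{
			fetch_range_request *prev = &src->request_queue[src->num_requests - 1];

			/* Contiguous with the previous request for the same file, and room left? */
			if (strcmp(prev->path, path) == 0 &&
				prev->offset + prev->length == off &&
				prev->length < CHUNK_SIZE)
			{
				size_t		room = CHUNK_SIZE - prev->length;

				thislen = len < room ? len : room;
				prev->length += thislen;	/* prev already is the last entry */
				off += thislen;
				len -= thislen;
				continue;
			}
		}

		if (src->num_requests >= MAX_REQUESTS)
			return -1;
		thislen = len < CHUNK_SIZE ? len : CHUNK_SIZE;
		fetch_range_request *rq = &src->request_queue[src->num_requests++];

		rq->path = path;
		rq->offset = off;
		rq->length = thislen;
		off += thislen;
		len -= thislen;
	}
	return 0;
}
```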


+        if (chunksize > rq->length)
+        {
+            pg_fatal("received more than requested for file \"%s\"",
+                     rq->path);
+            /* receiving less is OK, though */

Don't we need to truncate the target file, though?


+         * Source is a local data directory. It should've shut down cleanly,
+         * and we must to the latest shutdown checkpoint.

"and we must to the" => "and we must replay to the" ?

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 6cfbb5a686e5b87159aed9769b1e52859bae02b4 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 19 Aug 2020 15:34:35 +0300
Subject: [PATCH v3 1/5] pg_rewind: Move syncTargetDirectory() to file_ops.c

For consistency. All the other low-level functions that operate on the
target directory are in file_ops.c.
---
 src/bin/pg_rewind/file_ops.c  | 19 +++++++++++++++++++
 src/bin/pg_rewind/file_ops.h  |  1 +
 src/bin/pg_rewind/pg_rewind.c | 22 +---------------------
 src/bin/pg_rewind/pg_rewind.h |  1 +
 4 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/src/bin/pg_rewind/file_ops.c b/src/bin/pg_rewind/file_ops.c
index b3bf091c54..55439db20b 100644
--- a/src/bin/pg_rewind/file_ops.c
+++ b/src/bin/pg_rewind/file_ops.c
@@ -19,6 +19,7 @@
 #include <unistd.h>
 
 #include "common/file_perm.h"
+#include "common/file_utils.h"
 #include "file_ops.h"
 #include "filemap.h"
 #include "pg_rewind.h"
@@ -266,6 +267,24 @@ remove_target_symlink(const char *path)
                  dstpath);
 }
 
+/*
+ * Sync target data directory to ensure that modifications are safely on disk.
+ *
+ * We do this once, for the whole data directory, for performance reasons.  At
+ * the end of pg_rewind's run, the kernel is likely to already have flushed
+ * most dirty buffers to disk.  Additionally fsync_pgdata uses a two-pass
+ * approach (only initiating writeback in the first pass), which often reduces
+ * the overall amount of IO noticeably.
+ */
+void
+sync_target_dir(void)
+{
+    if (!do_sync || dry_run)
+        return;
+
+    fsync_pgdata(datadir_target, PG_VERSION_NUM);
+}
+
 
 /*
  * Read a file into memory. The file to be read is <datadir>/<path>.
diff --git a/src/bin/pg_rewind/file_ops.h b/src/bin/pg_rewind/file_ops.h
index 025f24141c..d8466385cf 100644
--- a/src/bin/pg_rewind/file_ops.h
+++ b/src/bin/pg_rewind/file_ops.h
@@ -19,6 +19,7 @@ extern void remove_target_file(const char *path, bool missing_ok);
 extern void truncate_target_file(const char *path, off_t newsize);
 extern void create_target(file_entry_t *t);
 extern void remove_target(file_entry_t *t);
+extern void sync_target_dir(void);
 
 extern char *slurpFile(const char *datadir, const char *path, size_t *filesize);
 
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 0ec52cb032..5a7ab764db 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -20,7 +20,6 @@
 #include "catalog/pg_control.h"
 #include "common/controldata_utils.h"
 #include "common/file_perm.h"
-#include "common/file_utils.h"
 #include "common/restricted_token.h"
 #include "common/string.h"
 #include "fe_utils/recovery_gen.h"
@@ -38,7 +37,6 @@ static void createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli,
 
 static void digestControlFile(ControlFileData *ControlFile, char *source,
                               size_t size);
-static void syncTargetDirectory(void);
 static void getRestoreCommand(const char *argv0);
 static void sanityChecks(void);
 static void findCommonAncestorTimeline(XLogRecPtr *recptr, int *tliIndex);
@@ -455,7 +453,7 @@ main(int argc, char **argv)
 
     if (showprogress)
         pg_log_info("syncing target data directory");
-    syncTargetDirectory();
+    sync_target_dir();
 
     if (writerecoveryconf && !dry_run)
         WriteRecoveryConfig(conn, datadir_target,
@@ -803,24 +801,6 @@ digestControlFile(ControlFileData *ControlFile, char *src, size_t size)
     checkControlFile(ControlFile);
 }
 
-/*
- * Sync target data directory to ensure that modifications are safely on disk.
- *
- * We do this once, for the whole data directory, for performance reasons.  At
- * the end of pg_rewind's run, the kernel is likely to already have flushed
- * most dirty buffers to disk.  Additionally fsync_pgdata uses a two-pass
- * approach (only initiating writeback in the first pass), which often reduces
- * the overall amount of IO noticeably.
- */
-static void
-syncTargetDirectory(void)
-{
-    if (!do_sync || dry_run)
-        return;
-
-    fsync_pgdata(datadir_target, PG_VERSION_NUM);
-}
-
 /*
  * Get value of GUC parameter restore_command from the target cluster.
  *
diff --git a/src/bin/pg_rewind/pg_rewind.h b/src/bin/pg_rewind/pg_rewind.h
index 8a9319ed67..67f90c2a38 100644
--- a/src/bin/pg_rewind/pg_rewind.h
+++ b/src/bin/pg_rewind/pg_rewind.h
@@ -24,6 +24,7 @@ extern char *datadir_source;
 extern char *connstr_source;
 extern bool showprogress;
 extern bool dry_run;
+extern bool do_sync;
 extern int    WalSegSz;
 
 /* Target history */
-- 
2.18.4

From 3f96ae5ba68bdcba6dd32ae77f3b5d4f4dc5b1d6 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 19 Aug 2020 15:34:37 +0300
Subject: [PATCH v3 2/5] Refactor pg_rewind for more clear decision making.

Deciding what to do with each file is now a separate step after all the
necessary information has been gathered. It is clearer that way.
Previously, the decision-making was divided between process_source_file()
and process_target_file(), and it was a bit hard to piece together what the
overall rules were.
---
 src/bin/pg_rewind/copy_fetch.c  |  14 +-
 src/bin/pg_rewind/file_ops.c    |  16 +-
 src/bin/pg_rewind/filemap.c     | 558 +++++++++++++++++---------------
 src/bin/pg_rewind/filemap.h     |  69 ++--
 src/bin/pg_rewind/libpq_fetch.c |  12 +-
 src/bin/pg_rewind/parsexlog.c   |   2 +-
 src/bin/pg_rewind/pg_rewind.c   |   8 +-
 7 files changed, 373 insertions(+), 306 deletions(-)

diff --git a/src/bin/pg_rewind/copy_fetch.c b/src/bin/pg_rewind/copy_fetch.c
index 1edab5f186..18fad32600 100644
--- a/src/bin/pg_rewind/copy_fetch.c
+++ b/src/bin/pg_rewind/copy_fetch.c
@@ -210,7 +210,7 @@ copy_executeFileMap(filemap_t *map)
     for (i = 0; i < map->narray; i++)
     {
         entry = map->array[i];
-        execute_pagemap(&entry->pagemap, entry->path);
+        execute_pagemap(&entry->target_modified_pages, entry->path);
 
         switch (entry->action)
         {
@@ -219,16 +219,16 @@ copy_executeFileMap(filemap_t *map)
                 break;
 
             case FILE_ACTION_COPY:
-                rewind_copy_file_range(entry->path, 0, entry->newsize, true);
+                rewind_copy_file_range(entry->path, 0, entry->source_size, true);
                 break;
 
             case FILE_ACTION_TRUNCATE:
-                truncate_target_file(entry->path, entry->newsize);
+                truncate_target_file(entry->path, entry->source_size);
                 break;
 
             case FILE_ACTION_COPY_TAIL:
-                rewind_copy_file_range(entry->path, entry->oldsize,
-                                       entry->newsize, false);
+                rewind_copy_file_range(entry->path, entry->target_size,
+                                       entry->source_size, false);
                 break;
 
             case FILE_ACTION_CREATE:
@@ -238,6 +238,10 @@ copy_executeFileMap(filemap_t *map)
             case FILE_ACTION_REMOVE:
                 remove_target(entry);
                 break;
+
+            case FILE_ACTION_UNDECIDED:
+                pg_fatal("no action decided for \"%s\"", entry->path);
+                break;
         }
     }
 
diff --git a/src/bin/pg_rewind/file_ops.c b/src/bin/pg_rewind/file_ops.c
index 55439db20b..ec37d0b2e0 100644
--- a/src/bin/pg_rewind/file_ops.c
+++ b/src/bin/pg_rewind/file_ops.c
@@ -126,8 +126,9 @@ void
 remove_target(file_entry_t *entry)
 {
     Assert(entry->action == FILE_ACTION_REMOVE);
+    Assert(entry->target_exists);
 
-    switch (entry->type)
+    switch (entry->target_type)
     {
         case FILE_TYPE_DIRECTORY:
             remove_target_dir(entry->path);
@@ -140,6 +141,10 @@ remove_target(file_entry_t *entry)
         case FILE_TYPE_SYMLINK:
             remove_target_symlink(entry->path);
             break;
+
+        case FILE_TYPE_UNDEFINED:
+            pg_fatal("undefined file type for \"%s\"", entry->path);
+            break;
     }
 }
 
@@ -147,21 +152,26 @@ void
 create_target(file_entry_t *entry)
 {
     Assert(entry->action == FILE_ACTION_CREATE);
+    Assert(!entry->target_exists);
 
-    switch (entry->type)
+    switch (entry->source_type)
     {
         case FILE_TYPE_DIRECTORY:
             create_target_dir(entry->path);
             break;
 
         case FILE_TYPE_SYMLINK:
-            create_target_symlink(entry->path, entry->link_target);
+            create_target_symlink(entry->path, entry->source_link_target);
             break;
 
         case FILE_TYPE_REGULAR:
             /* can't happen. Regular files are created with open_target_file. */
             pg_fatal("invalid action (CREATE) for regular file");
             break;
+
+        case FILE_TYPE_UNDEFINED:
+            pg_fatal("undefined file type for \"%s\"", entry->path);
+            break;
     }
 }
 
diff --git a/src/bin/pg_rewind/filemap.c b/src/bin/pg_rewind/filemap.c
index 1abc257177..6d19d2be61 100644
--- a/src/bin/pg_rewind/filemap.c
+++ b/src/bin/pg_rewind/filemap.c
@@ -26,6 +26,8 @@ static bool isRelDataFile(const char *path);
 static char *datasegpath(RelFileNode rnode, ForkNumber forknum,
                          BlockNumber segno);
 static int    path_cmp(const void *a, const void *b);
+
+static file_entry_t *get_filemap_entry(const char *path, bool create);
 static int    final_filemap_cmp(const void *a, const void *b);
 static void filemap_list_to_array(filemap_t *map);
 static bool check_file_excluded(const char *path, bool is_source);
@@ -146,6 +148,65 @@ filemap_create(void)
     filemap = map;
 }
 
+/* Look up or create entry for 'path' */
+static file_entry_t *
+get_filemap_entry(const char *path, bool create)
+{
+    filemap_t  *map = filemap;
+    file_entry_t *entry;
+    file_entry_t **e;
+    file_entry_t key;
+    file_entry_t *key_ptr;
+
+    if (map->array)
+    {
+        key.path = (char *) path;
+        key_ptr = &key;
+        e = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
+                    path_cmp);
+    }
+    else
+        e = NULL;
+
+    if (e)
+        entry = *e;
+    else if (!create)
+        entry = NULL;
+    else
+    {
+        /* Create a new entry for this file */
+        entry = pg_malloc(sizeof(file_entry_t));
+        entry->path = pg_strdup(path);
+        entry->isrelfile = isRelDataFile(path);
+        entry->action = FILE_ACTION_UNDECIDED;
+
+        entry->target_exists = false;
+        entry->target_type = FILE_TYPE_UNDEFINED;
+        entry->target_size = 0;
+        entry->target_link_target = NULL;
+        entry->target_modified_pages.bitmap = NULL;
+        entry->target_modified_pages.bitmapsize = 0;
+
+        entry->source_exists = false;
+        entry->source_type = FILE_TYPE_UNDEFINED;
+        entry->source_size = 0;
+        entry->source_link_target = NULL;
+
+        entry->next = NULL;
+
+        if (map->last)
+        {
+            map->last->next = entry;
+            map->last = entry;
+        }
+        else
+            map->first = map->last = entry;
+        map->nlist++;
+    }
+
+    return entry;
+}
+
 /*
  * Callback for processing source file list.
  *
@@ -154,25 +215,12 @@ filemap_create(void)
  * exists in the target and whether the size matches.
  */
 void
-process_source_file(const char *path, file_type_t type, size_t newsize,
+process_source_file(const char *path, file_type_t type, size_t size,
                     const char *link_target)
 {
-    bool        exists;
-    char        localpath[MAXPGPATH];
-    struct stat statbuf;
-    filemap_t  *map = filemap;
-    file_action_t action = FILE_ACTION_NONE;
-    size_t        oldsize = 0;
     file_entry_t *entry;
 
-    Assert(map->array == NULL);
-
-    /*
-     * Skip any files matching the exclusion filters. This has the effect to
-     * remove all those files on the target.
-     */
-    if (check_file_excluded(path, true))
-        return;
+    Assert(filemap->array == NULL);
 
     /*
      * Pretend that pg_wal is a directory, even if it's really a symlink. We
@@ -182,16 +230,6 @@ process_source_file(const char *path, file_type_t type, size_t newsize,
     if (strcmp(path, "pg_wal") == 0 && type == FILE_TYPE_SYMLINK)
         type = FILE_TYPE_DIRECTORY;
 
-    /*
-     * Skip temporary files, .../pgsql_tmp/... and .../pgsql_tmp.* in source.
-     * This has the effect that all temporary files in the destination will be
-     * removed.
-     */
-    if (strstr(path, "/" PG_TEMP_FILE_PREFIX) != NULL)
-        return;
-    if (strstr(path, "/" PG_TEMP_FILES_DIR "/") != NULL)
-        return;
-
     /*
      * sanity check: a filename that looks like a data file better be a
      * regular file
@@ -199,142 +237,12 @@ process_source_file(const char *path, file_type_t type, size_t newsize,
     if (type != FILE_TYPE_REGULAR && isRelDataFile(path))
         pg_fatal("data file \"%s\" in source is not a regular file", path);
 
-    snprintf(localpath, sizeof(localpath), "%s/%s", datadir_target, path);
-
-    /* Does the corresponding file exist in the target data dir? */
-    if (lstat(localpath, &statbuf) < 0)
-    {
-        if (errno != ENOENT)
-            pg_fatal("could not stat file \"%s\": %m",
-                     localpath);
-
-        exists = false;
-    }
-    else
-        exists = true;
-
-    switch (type)
-    {
-        case FILE_TYPE_DIRECTORY:
-            if (exists && !S_ISDIR(statbuf.st_mode) && strcmp(path, "pg_wal") != 0)
-            {
-                /* it's a directory in source, but not in target. Strange.. */
-                pg_fatal("\"%s\" is not a directory", localpath);
-            }
-
-            if (!exists)
-                action = FILE_ACTION_CREATE;
-            else
-                action = FILE_ACTION_NONE;
-            oldsize = 0;
-            break;
-
-        case FILE_TYPE_SYMLINK:
-            if (exists &&
-#ifndef WIN32
-                !S_ISLNK(statbuf.st_mode)
-#else
-                !pgwin32_is_junction(localpath)
-#endif
-                )
-            {
-                /*
-                 * It's a symbolic link in source, but not in target.
-                 * Strange..
-                 */
-                pg_fatal("\"%s\" is not a symbolic link", localpath);
-            }
-
-            if (!exists)
-                action = FILE_ACTION_CREATE;
-            else
-                action = FILE_ACTION_NONE;
-            oldsize = 0;
-            break;
-
-        case FILE_TYPE_REGULAR:
-            if (exists && !S_ISREG(statbuf.st_mode))
-                pg_fatal("\"%s\" is not a regular file", localpath);
-
-            if (!exists || !isRelDataFile(path))
-            {
-                /*
-                 * File exists in source, but not in target. Or it's a
-                 * non-data file that we have no special processing for. Copy
-                 * it in toto.
-                 *
-                 * An exception: PG_VERSIONs should be identical, but avoid
-                 * overwriting it for paranoia.
-                 */
-                if (pg_str_endswith(path, "PG_VERSION"))
-                {
-                    action = FILE_ACTION_NONE;
-                    oldsize = statbuf.st_size;
-                }
-                else
-                {
-                    action = FILE_ACTION_COPY;
-                    oldsize = 0;
-                }
-            }
-            else
-            {
-                /*
-                 * It's a data file that exists in both.
-                 *
-                 * If it's larger in target, we can truncate it. There will
-                 * also be a WAL record of the truncation in the source
-                 * system, so WAL replay would eventually truncate the target
-                 * too, but we might as well do it now.
-                 *
-                 * If it's smaller in the target, it means that it has been
-                 * truncated in the target, or enlarged in the source, or
-                 * both. If it was truncated in the target, we need to copy
-                 * the missing tail from the source system. If it was enlarged
-                 * in the source system, there will be WAL records in the
-                 * source system for the new blocks, so we wouldn't need to
-                 * copy them here. But we don't know which scenario we're
-                 * dealing with, and there's no harm in copying the missing
-                 * blocks now, so do it now.
-                 *
-                 * If it's the same size, do nothing here. Any blocks modified
-                 * in the target will be copied based on parsing the target
-                 * system's WAL, and any blocks modified in the source will be
-                 * updated after rewinding, when the source system's WAL is
-                 * replayed.
-                 */
-                oldsize = statbuf.st_size;
-                if (oldsize < newsize)
-                    action = FILE_ACTION_COPY_TAIL;
-                else if (oldsize > newsize)
-                    action = FILE_ACTION_TRUNCATE;
-                else
-                    action = FILE_ACTION_NONE;
-            }
-            break;
-    }
-
-    /* Create a new entry for this file */
-    entry = pg_malloc(sizeof(file_entry_t));
-    entry->path = pg_strdup(path);
-    entry->type = type;
-    entry->action = action;
-    entry->oldsize = oldsize;
-    entry->newsize = newsize;
-    entry->link_target = link_target ? pg_strdup(link_target) : NULL;
-    entry->next = NULL;
-    entry->pagemap.bitmap = NULL;
-    entry->pagemap.bitmapsize = 0;
-    entry->isrelfile = isRelDataFile(path);
-
-    if (map->last)
-    {
-        map->last->next = entry;
-        map->last = entry;
-    }
-    else
-        map->first = map->last = entry;
-    map->nlist++;
+    /* Remember this source file */
+    entry = get_filemap_entry(path, true);
+    entry->source_exists = true;
+    entry->source_type = type;
+    entry->source_size = size;
+    entry->source_link_target = link_target ? pg_strdup(link_target) : NULL;
 }
 
 /*
@@ -345,12 +253,9 @@ process_source_file(const char *path, file_type_t type, size_t newsize,
  * deletion.
  */
 void
-process_target_file(const char *path, file_type_t type, size_t oldsize,
+process_target_file(const char *path, file_type_t type, size_t size,
                     const char *link_target)
 {
-    bool        exists;
-    file_entry_t key;
-    file_entry_t *key_ptr;
     filemap_t  *map = filemap;
     file_entry_t *entry;
 
@@ -377,120 +282,76 @@ process_target_file(const char *path, file_type_t type, size_t oldsize,
     }
 
     /*
-     * Like in process_source_file, pretend that xlog is always a  directory.
+     * Like in process_source_file, pretend that pg_wal is always a directory.
      */
     if (strcmp(path, "pg_wal") == 0 && type == FILE_TYPE_SYMLINK)
         type = FILE_TYPE_DIRECTORY;
 
-    key.path = (char *) path;
-    key_ptr = &key;
-    exists = (bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
-                      path_cmp) != NULL);
-
-    /* Remove any file or folder that doesn't exist in the source system. */
-    if (!exists)
-    {
-        entry = pg_malloc(sizeof(file_entry_t));
-        entry->path = pg_strdup(path);
-        entry->type = type;
-        entry->action = FILE_ACTION_REMOVE;
-        entry->oldsize = oldsize;
-        entry->newsize = 0;
-        entry->link_target = link_target ? pg_strdup(link_target) : NULL;
-        entry->next = NULL;
-        entry->pagemap.bitmap = NULL;
-        entry->pagemap.bitmapsize = 0;
-        entry->isrelfile = isRelDataFile(path);
-
-        if (map->last == NULL)
-            map->first = entry;
-        else
-            map->last->next = entry;
-        map->last = entry;
-        map->nlist++;
-    }
-    else
-    {
-        /*
-         * We already handled all files that exist in the source system in
-         * process_source_file().
-         */
-    }
+    /* Remember this target file */
+    entry = get_filemap_entry(path, true);
+    entry->target_exists = true;
+    entry->target_type = type;
+    entry->target_size = size;
+    entry->target_link_target = link_target ? pg_strdup(link_target) : NULL;
 }
 
 /*
  * This callback gets called while we read the WAL in the target, for every
- * block that have changed in the target system. It makes note of all the
+ * block that have changed in the target system.  It makes note of all the
  * changed blocks in the pagemap of the file.
+ *
+ * NOTE: All the files on both systems must have already been added to the
+ * file map!
  */
 void
-process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno)
+process_target_wal_block_change(ForkNumber forknum, RelFileNode rnode,
+                                BlockNumber blkno)
 {
     char       *path;
-    file_entry_t key;
-    file_entry_t *key_ptr;
     file_entry_t *entry;
     BlockNumber blkno_inseg;
     int            segno;
-    filemap_t  *map = filemap;
-    file_entry_t **e;
 
-    Assert(map->array);
+    Assert(filemap->array);
 
     segno = blkno / RELSEG_SIZE;
     blkno_inseg = blkno % RELSEG_SIZE;
 
     path = datasegpath(rnode, forknum, segno);
-
-    key.path = (char *) path;
-    key_ptr = &key;
-
-    e = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
-                path_cmp);
-    if (e)
-        entry = *e;
-    else
-        entry = NULL;
+    entry = get_filemap_entry(path, false);
     pfree(path);
 
     if (entry)
     {
+        int64        end_offset;
+
         Assert(entry->isrelfile);
 
-        switch (entry->action)
-        {
-            case FILE_ACTION_NONE:
-            case FILE_ACTION_TRUNCATE:
-                /* skip if we're truncating away the modified block anyway */
-                if ((blkno_inseg + 1) * BLCKSZ <= entry->newsize)
-                    datapagemap_add(&entry->pagemap, blkno_inseg);
-                break;
-
-            case FILE_ACTION_COPY_TAIL:
-
-                /*
-                 * skip the modified block if it is part of the "tail" that
-                 * we're copying anyway.
-                 */
-                if ((blkno_inseg + 1) * BLCKSZ <= entry->oldsize)
-                    datapagemap_add(&entry->pagemap, blkno_inseg);
-                break;
-
-            case FILE_ACTION_COPY:
-            case FILE_ACTION_REMOVE:
-                break;
-
-            case FILE_ACTION_CREATE:
-                pg_fatal("unexpected page modification for directory or symbolic link \"%s\"", entry->path);
-        }
+        if (entry->target_type != FILE_TYPE_REGULAR)
+            pg_fatal("unexpected page modification for directory or symbolic link \"%s\"",
+                     entry->path);
+
+        /*
+         * If the block beyond the EOF in the source system, no need to
+         * remember it now, because we're going to truncate it away from the
+         * target anyway. Also no need to remember the block if it's beyond
+         * the current EOF in the target system; we will copy it over with the
+         * "tail" from the source system, anyway.
+         */
+        end_offset = (blkno_inseg + 1) * BLCKSZ;
+        if (end_offset <= entry->source_size &&
+            end_offset <= entry->target_size)
+            datapagemap_add(&entry->target_modified_pages, blkno_inseg);
     }
     else
     {
         /*
          * If we don't have any record of this file in the file map, it means
-         * that it's a relation that doesn't exist in the source system, and
-         * it was subsequently removed in the target system, too. We can
-         * safely ignore it.
+         * that it's a relation that doesn't exist in the source system.  It
+         * could exist in the target system; we haven't moved the target-only
+         * entries from the linked list to the array yet!  But in any case, if
+         * it doesn't exist in the source it will be removed from the target
+         * too, and we can safely ignore it.
          */
     }
 }
@@ -581,16 +442,6 @@ filemap_list_to_array(filemap_t *map)
     map->first = map->last = NULL;
 }
 
-void
-filemap_finalize(void)
-{
-    filemap_t  *map = filemap;
-
-    filemap_list_to_array(map);
-    qsort(map->array, map->narray, sizeof(file_entry_t *),
-          final_filemap_cmp);
-}
-
 static const char *
 action_to_str(file_action_t action)
 {
@@ -631,26 +482,26 @@ calculate_totals(void)
     {
         entry = map->array[i];
 
-        if (entry->type != FILE_TYPE_REGULAR)
+        if (entry->source_type != FILE_TYPE_REGULAR)
             continue;
 
-        map->total_size += entry->newsize;
+        map->total_size += entry->source_size;
 
         if (entry->action == FILE_ACTION_COPY)
         {
-            map->fetch_size += entry->newsize;
+            map->fetch_size += entry->source_size;
             continue;
         }
 
         if (entry->action == FILE_ACTION_COPY_TAIL)
-            map->fetch_size += (entry->newsize - entry->oldsize);
+            map->fetch_size += (entry->source_size - entry->target_size);
 
-        if (entry->pagemap.bitmapsize > 0)
+        if (entry->target_modified_pages.bitmapsize > 0)
         {
             datapagemap_iterator_t *iter;
             BlockNumber blk;
 
-            iter = datapagemap_iterate(&entry->pagemap);
+            iter = datapagemap_iterate(&entry->target_modified_pages);
             while (datapagemap_next(iter, &blk))
                 map->fetch_size += BLCKSZ;
 
@@ -670,13 +521,13 @@ print_filemap(void)
     {
         entry = map->array[i];
         if (entry->action != FILE_ACTION_NONE ||
-            entry->pagemap.bitmapsize > 0)
+            entry->target_modified_pages.bitmapsize > 0)
         {
             pg_log_debug("%s (%s)", entry->path,
                          action_to_str(entry->action));
 
-            if (entry->pagemap.bitmapsize > 0)
-                datapagemap_print(&entry->pagemap);
+            if (entry->target_modified_pages.bitmapsize > 0)
+                datapagemap_print(&entry->target_modified_pages);
         }
     }
     fflush(stdout);
@@ -825,3 +676,180 @@ final_filemap_cmp(const void *a, const void *b)
     else
         return strcmp(fa->path, fb->path);
 }
+
+/*
+ * Decide what action to perform on a file.
+ */
+static file_action_t
+decide_file_action(file_entry_t *entry)
+{
+    const char *path = entry->path;
+
+    /*
+     * Don't touch the control file. It is handled specially, after copying
+     * all the other files.
+     */
+    if (strcmp(path, "global/pg_control") == 0)
+        return FILE_ACTION_NONE;
+
+    /*
+     * Remove all files matching the exclusion filters in the target.
+     */
+    if (check_file_excluded(path, true))
+    {
+        if (entry->target_exists)
+            return FILE_ACTION_REMOVE;
+        else
+            return FILE_ACTION_NONE;
+    }
+
+    /*
+     * Also remove all temporary files, .../pgsql_tmp/... and .../pgsql_tmp.*
+     * in the target.
+     */
+    if (strstr(path, "/" PG_TEMP_FILE_PREFIX) != NULL ||
+        strstr(path, "/" PG_TEMP_FILES_DIR "/") != NULL)
+    {
+        if (entry->target_exists)
+            return FILE_ACTION_REMOVE;
+        else
+            return FILE_ACTION_NONE;
+    }
+
+    /*
+     * Handle cases where the file is missing from one of the systems.
+     */
+    if (!entry->target_exists && entry->source_exists)
+    {
+        /*
+         * File exists in source, but not in target. Copy it in toto. (If it's
+         * a relation data file, WAL replay after rewinding should re-create
+         * it anyway. But there's no harm in copying it now.)
+         */
+        switch (entry->source_type)
+        {
+            case FILE_TYPE_DIRECTORY:
+            case FILE_TYPE_SYMLINK:
+                return FILE_ACTION_CREATE;
+            case FILE_TYPE_REGULAR:
+                return FILE_ACTION_COPY;
+            case FILE_TYPE_UNDEFINED:
+                pg_fatal("unknown file type for \"%s\"", entry->path);
+                break;
+        }
+    }
+    else if (entry->target_exists && !entry->source_exists)
+    {
+        /* File exists in target, but not source. Remove it. */
+        return FILE_ACTION_REMOVE;
+    }
+    else if (!entry->target_exists && !entry->source_exists)
+    {
+        /*
+         * Doesn't exist in either server. Why does it have an entry in the
+         * first place??
+         */
+        return FILE_ACTION_NONE;
+    }
+
+    /*
+     * Otherwise, the file exists on both systems
+     */
+    Assert(entry->target_exists && entry->source_exists);
+
+    if (entry->source_type != entry->target_type)
+    {
+        /* But it's a different kind of object. Strange. */
+        pg_fatal("file \"%s\" is of different type in source and target", entry->path);
+    }
+
+    /*
+     * PG_VERSION files should be identical on both systems, but avoid
+     * overwriting them for paranoia.
+     */
+    if (pg_str_endswith(entry->path, "PG_VERSION"))
+        return FILE_ACTION_NONE;
+
+    switch (entry->source_type)
+    {
+        case FILE_TYPE_DIRECTORY:
+            return FILE_ACTION_NONE;
+
+        case FILE_TYPE_SYMLINK:
+            /* FIXME: Check if it points to the same target? */
+            return FILE_ACTION_NONE;
+
+        case FILE_TYPE_REGULAR:
+            if (!entry->isrelfile)
+            {
+                /*
+                 * It's a non-data file that we have no special processing
+                 * for. Copy it in toto.
+                 */
+                return FILE_ACTION_COPY;
+            }
+            else
+            {
+                /*
+                 * It's a data file that exists in both systems.
+                 *
+                 * If it's larger in target, we can truncate it. There will
+                 * also be a WAL record of the truncation in the source
+                 * system, so WAL replay would eventually truncate the target
+                 * too, but we might as well do it now.
+                 *
+                 * If it's smaller in the target, it means that it has been
+                 * truncated in the target, or enlarged in the source, or
+                 * both. If it was truncated in the target, we need to copy
+                 * the missing tail from the source system. If it was enlarged
+                 * in the source system, there will be WAL records in the
+                 * source system for the new blocks, so we wouldn't need to
+                 * copy them here. But we don't know which scenario we're
+                 * dealing with, and there's no harm in copying the missing
+                 * blocks now, so do it now.
+                 *
+                 * If it's the same size, do nothing here. Any blocks modified
+                 * in the target will be copied based on parsing the target
+                 * system's WAL, and any blocks modified in the source will be
+                 * updated after rewinding, when the source system's WAL is
+                 * replayed.
+                 */
+                if (entry->target_size < entry->source_size)
+                    return FILE_ACTION_COPY_TAIL;
+                else if (entry->target_size > entry->source_size)
+                    return FILE_ACTION_TRUNCATE;
+                else
+                    return FILE_ACTION_NONE;
+            }
+            break;
+
+        case FILE_TYPE_UNDEFINED:
+            pg_fatal("unknown file type for \"%s\"", path);
+            break;
+    }
+
+    /* unreachable */
+    pg_fatal("could not decide what to do with file \"%s\"", path);
+}
+
+/*
+ * Decide what to do with each file.
+ */
+void
+filemap_finalize()
+{
+    int            i;
+
+    filemap_list_to_array(filemap);
+
+    for (i = 0; i < filemap->narray; i++)
+    {
+        file_entry_t *entry = filemap->array[i];
+
+        entry->action = decide_file_action(entry);
+    }
+
+    /* Sort the actions to the order that they should be performed */
+    qsort(filemap->array, filemap->narray, sizeof(file_entry_t *),
+          final_filemap_cmp);
+}
diff --git a/src/bin/pg_rewind/filemap.h b/src/bin/pg_rewind/filemap.h
index 0cb7425170..a5e8df57f4 100644
--- a/src/bin/pg_rewind/filemap.h
+++ b/src/bin/pg_rewind/filemap.h
@@ -14,17 +14,21 @@
 
 /*
  * For every file found in the local or remote system, we have a file entry
- * which says what we are going to do with the file. For relation files,
- * there is also a page map, marking pages in the file that were changed
- * locally.
- *
- * The enum values are sorted in the order we want actions to be processed.
+ * that contains information about the file on both systems.  For relation
+ * files, there is also a page map that marks pages in the file that were
+ * changed in the target after the last common checkpoint.  Each entry also
+ * contains an 'action' field, which says what we are going to do with the
+ * file.
  */
+
+/* these enum values are sorted in the order we want actions to be processed */
 typedef enum
 {
+    FILE_ACTION_UNDECIDED = 0,    /* not decided yet */
+
     FILE_ACTION_CREATE,            /* create local directory or symbolic link */
     FILE_ACTION_COPY,            /* copy whole file, overwriting if exists */
-    FILE_ACTION_COPY_TAIL,        /* copy tail from 'oldsize' to 'newsize' */
+    FILE_ACTION_COPY_TAIL,        /* copy tail from 'target_size' to 'source_size' */
     FILE_ACTION_NONE,            /* no action (we might still copy modified
                                  * blocks based on the parsed WAL) */
     FILE_ACTION_TRUNCATE,        /* truncate local file to 'newsize' bytes */
@@ -33,6 +37,8 @@ typedef enum
 
 typedef enum
 {
+    FILE_TYPE_UNDEFINED = 0,
+
     FILE_TYPE_REGULAR,
     FILE_TYPE_DIRECTORY,
     FILE_TYPE_SYMLINK
@@ -41,19 +47,30 @@ typedef enum
 typedef struct file_entry_t
 {
     char       *path;
-    file_type_t type;
-
-    file_action_t action;
-
-    /* for a regular file */
-    size_t        oldsize;
-    size_t        newsize;
     bool        isrelfile;        /* is it a relation data file? */
 
-    datapagemap_t pagemap;
+    /*
+     * Status of the file in the target.
+     */
+    bool        target_exists;
+    file_type_t target_type;
+    size_t        target_size; /* for a regular file */
+    char       *target_link_target; /* for a symlink */
 
-    /* for a symlink */
-    char       *link_target;
+    datapagemap_t target_modified_pages;
+
+    /*
+     * Status of the file in the source.
+     */
+    bool        source_exists;
+    file_type_t source_type;
+    size_t        source_size;
+    char       *source_link_target; /* for a symlink */
+
+    /*
+     * What will we do to the file?
+     */
+    file_action_t action;
 
     struct file_entry_t *next;
 } file_entry_t;
@@ -70,20 +87,19 @@ typedef struct filemap_t
 
     /*
      * After processing all the remote files, the entries in the linked list
-     * are moved to this array. After processing local files, too, all the
+     * are moved to this array.  After processing local files, too, all the
      * local entries are added to the array by filemap_finalize, and sorted in
-     * the final order. After filemap_finalize, all the entries are in the
+     * the final order.  After filemap_finalize, all the entries are in the
      * array, and the linked list is empty.
      */
     file_entry_t **array;
     int            narray;            /* current length of array */
 
     /*
-     * Summary information. total_size is the total size of the source
-     * cluster, and fetch_size is the number of bytes that needs to be copied.
+     * Summary information.
      */
-    uint64        total_size;
-    uint64        fetch_size;
+    uint64        total_size;        /* total size of the source cluster */
+    uint64        fetch_size;        /* number of bytes that needs to be copied */
 } filemap_t;
 
 extern filemap_t *filemap;
@@ -94,11 +110,12 @@ extern void print_filemap(void);
 
 /* Functions for populating the filemap */
 extern void process_source_file(const char *path, file_type_t type,
-                                size_t newsize, const char *link_target);
+                                size_t size, const char *link_target);
 extern void process_target_file(const char *path, file_type_t type,
-                                size_t newsize, const char *link_target);
-extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
-                                 BlockNumber blkno);
+                                size_t size, const char *link_target);
+extern void process_target_wal_block_change(ForkNumber forknum,
+                                            RelFileNode rnode,
+                                            BlockNumber blkno);
 extern void filemap_finalize(void);
 
 #endif                            /* FILEMAP_H */
diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_fetch.c
index bf4dfc23b9..7fc9161b8c 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_fetch.c
@@ -465,7 +465,7 @@ libpq_executeFileMap(filemap_t *map)
         entry = map->array[i];
 
         /* If this is a relation file, copy the modified blocks */
-        execute_pagemap(&entry->pagemap, entry->path);
+        execute_pagemap(&entry->target_modified_pages, entry->path);
 
         switch (entry->action)
         {
@@ -476,15 +476,15 @@ libpq_executeFileMap(filemap_t *map)
             case FILE_ACTION_COPY:
                 /* Truncate the old file out of the way, if any */
                 open_target_file(entry->path, true);
-                fetch_file_range(entry->path, 0, entry->newsize);
+                fetch_file_range(entry->path, 0, entry->source_size);
                 break;
 
             case FILE_ACTION_TRUNCATE:
-                truncate_target_file(entry->path, entry->newsize);
+                truncate_target_file(entry->path, entry->source_size);
                 break;
 
             case FILE_ACTION_COPY_TAIL:
-                fetch_file_range(entry->path, entry->oldsize, entry->newsize);
+                fetch_file_range(entry->path, entry->target_size, entry->source_size);
                 break;
 
             case FILE_ACTION_REMOVE:
@@ -494,6 +494,10 @@ libpq_executeFileMap(filemap_t *map)
             case FILE_ACTION_CREATE:
                 create_target(entry);
                 break;
+
+            case FILE_ACTION_UNDECIDED:
+                pg_fatal("no action decided for \"%s\"", entry->path);
+                break;
         }
     }
 
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 2229c86f9a..2baeb74ae9 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -436,6 +436,6 @@ extractPageInfo(XLogReaderState *record)
         if (forknum != MAIN_FORKNUM)
             continue;
 
-        process_block_change(forknum, rnode, blkno);
+        process_target_wal_block_change(forknum, rnode, blkno);
     }
 }
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 5a7ab764db..e0ed1759cb 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -369,7 +369,7 @@ main(int argc, char **argv)
                 chkpttli);
 
     /*
-     * Build the filemap, by comparing the source and target data directories.
+     * Collect information about all files in the target and source systems.
      */
     filemap_create();
     if (showprogress)
@@ -390,8 +390,12 @@ main(int argc, char **argv)
         pg_log_info("reading WAL in target");
     extractPageMap(datadir_target, chkptrec, lastcommontliIndex,
                    ControlFile_target.checkPoint, restore_command);
+
+    /*
+     * We have collected all information we need from both systems. Decide
+     * what to do with each file.
+     */
     filemap_finalize();
-
     if (showprogress)
         calculate_totals();
 
-- 
2.18.4
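
A reviewer's sketch, not part of the patch: the size comparison that decide_file_action() applies to a relation data file present on both systems, together with the page-map filter in process_target_wal_block_change(), can be exercised in isolation as below. All sketch_* names are hypothetical, and SKETCH_BLCKSZ / SKETCH_RELSEG_SIZE assume the default 8 kB blocks and 1 GB segments instead of reading pg_config.h.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed defaults; pg_rewind takes BLCKSZ and RELSEG_SIZE from pg_config.h */
#define SKETCH_BLCKSZ       8192
#define SKETCH_RELSEG_SIZE  131072  /* blocks per 1 GB segment at 8 kB blocks */

typedef enum
{
    SKETCH_ACTION_COPY_TAIL,    /* fetch bytes [target_size, source_size) */
    SKETCH_ACTION_NONE,         /* rely on WAL-derived page map only */
    SKETCH_ACTION_TRUNCATE      /* cut the target file down to source_size */
} sketch_action_t;

/* Relation segment that exists on both systems: only the sizes matter. */
static sketch_action_t
sketch_relfile_action(size_t target_size, size_t source_size)
{
    if (target_size < source_size)
        return SKETCH_ACTION_COPY_TAIL;
    else if (target_size > source_size)
        return SKETCH_ACTION_TRUNCATE;
    else
        return SKETCH_ACTION_NONE;
}

/*
 * Should a block modified in the target's WAL go into the page map?
 * Only if it ends at or below EOF on both sides; a block past the source
 * EOF is truncated away, and one past the target EOF arrives with the
 * copied tail.
 */
static bool
sketch_remember_block(uint32_t blkno, size_t target_size, size_t source_size)
{
    uint32_t    blkno_inseg = blkno % SKETCH_RELSEG_SIZE;
    uint64_t    end_offset = ((uint64_t) blkno_inseg + 1) * SKETCH_BLCKSZ;

    return end_offset <= source_size && end_offset <= target_size;
}
```

For example, with an 8 kB target segment and a 16 kB source segment the file gets COPY_TAIL, and a WAL change to block 1 is not recorded because the tail copy fetches that block anyway.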

From 8c5bc72c286a5ca97a3df8e12d8c0fdf35456ead Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 19 Aug 2020 15:34:39 +0300
Subject: [PATCH v3 3/5] pg_rewind: Replace the hybrid list+array data
 structure with simplehash.

Now that simplehash can be used in frontend code, let's make use of it.
---
 src/bin/pg_rewind/copy_fetch.c  |   4 +-
 src/bin/pg_rewind/fetch.c       |   2 +-
 src/bin/pg_rewind/fetch.h       |   2 +-
 src/bin/pg_rewind/filemap.c     | 286 ++++++++++++++------------------
 src/bin/pg_rewind/filemap.h     |  67 +++-----
 src/bin/pg_rewind/libpq_fetch.c |   4 +-
 src/bin/pg_rewind/pg_rewind.c   |  12 +-
 7 files changed, 168 insertions(+), 209 deletions(-)
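
For readers unfamiliar with simplehash: the insert-or-lookup pattern that replaces get_filemap_entry() can be approximated by the toy open-addressing table below. It is a hypothetical stand-in, not simplehash.h itself: it uses a fixed 64-slot array, linear probing and an FNV-1a hash in place of hash_string_pointer(), and never resizes.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Toy stand-in for file_entry_t; only the fields needed here. */
typedef struct sketch_entry
{
    char       *path;           /* NULL means the slot is empty */
    bool        source_exists;
    bool        target_exists;
} sketch_entry;

#define SKETCH_NSLOTS 64        /* fixed size; this sketch never grows */

static sketch_entry sketch_table[SKETCH_NSLOTS];

/* FNV-1a string hash (simplehash in the patch uses hash_string_pointer()) */
static uint32_t
sketch_hash(const char *s)
{
    uint32_t    h = 2166136261u;

    for (; *s; s++)
    {
        h ^= (unsigned char) *s;
        h *= 16777619u;
    }
    return h;
}

/* Return the entry for 'path', creating it if needed; *found says which. */
static sketch_entry *
sketch_insert(const char *path, bool *found)
{
    uint32_t    i = sketch_hash(path) % SKETCH_NSLOTS;

    for (uint32_t probes = 0; probes < SKETCH_NSLOTS; probes++)
    {
        sketch_entry *e = &sketch_table[i];

        if (e->path == NULL)
        {
            size_t      len = strlen(path) + 1;

            e->path = malloc(len);
            if (e->path == NULL)
                abort();
            memcpy(e->path, path, len);
            *found = false;
            return e;
        }
        if (strcmp(e->path, path) == 0)
        {
            *found = true;
            return e;
        }
        i = (i + 1) % SKETCH_NSLOTS;    /* linear probing */
    }
    abort();                    /* table full */
}

/* Return the entry for 'path', or NULL if it was never inserted. */
static sketch_entry *
sketch_lookup(const char *path)
{
    uint32_t    i = sketch_hash(path) % SKETCH_NSLOTS;

    for (uint32_t probes = 0; probes < SKETCH_NSLOTS; probes++)
    {
        sketch_entry *e = &sketch_table[i];

        if (e->path == NULL)
            return NULL;
        if (strcmp(e->path, path) == 0)
            return e;
        i = (i + 1) % SKETCH_NSLOTS;
    }
    return NULL;
}
```

The 'found' flag is what allows process_source_file() and process_target_file() to notice a path reported twice by the same side, while the separate lookup path suits process_target_wal_block_change(), which must not create entries.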

diff --git a/src/bin/pg_rewind/copy_fetch.c b/src/bin/pg_rewind/copy_fetch.c
index 18fad32600..61aed8018b 100644
--- a/src/bin/pg_rewind/copy_fetch.c
+++ b/src/bin/pg_rewind/copy_fetch.c
@@ -207,9 +207,9 @@ copy_executeFileMap(filemap_t *map)
     file_entry_t *entry;
     int            i;
 
-    for (i = 0; i < map->narray; i++)
+    for (i = 0; i < map->nactions; i++)
     {
-        entry = map->array[i];
+        entry = map->actions[i];
         execute_pagemap(&entry->target_modified_pages, entry->path);
 
         switch (entry->action)
diff --git a/src/bin/pg_rewind/fetch.c b/src/bin/pg_rewind/fetch.c
index f18fe5386e..f41d0f295e 100644
--- a/src/bin/pg_rewind/fetch.c
+++ b/src/bin/pg_rewind/fetch.c
@@ -37,7 +37,7 @@ fetchSourceFileList(void)
  * Fetch all relation data files that are marked in the given data page map.
  */
 void
-executeFileMap(void)
+execute_file_actions(filemap_t *filemap)
 {
     if (datadir_source)
         copy_executeFileMap(filemap);
diff --git a/src/bin/pg_rewind/fetch.h b/src/bin/pg_rewind/fetch.h
index 7cf8b6ea09..b20df8b153 100644
--- a/src/bin/pg_rewind/fetch.h
+++ b/src/bin/pg_rewind/fetch.h
@@ -25,7 +25,7 @@
  */
 extern void fetchSourceFileList(void);
 extern char *fetchFile(const char *filename, size_t *filesize);
-extern void executeFileMap(void);
+extern void execute_file_actions(filemap_t *filemap);
 
 /* in libpq_fetch.c */
 extern void libpqProcessFileList(void);
diff --git a/src/bin/pg_rewind/filemap.c b/src/bin/pg_rewind/filemap.c
index 6d19d2be61..e6e037d1cd 100644
--- a/src/bin/pg_rewind/filemap.c
+++ b/src/bin/pg_rewind/filemap.c
@@ -3,6 +3,19 @@
  * filemap.c
  *      A data structure for keeping track of files that have changed.
  *
+ * This source file contains the logic to decide what to do with different
+ * kinds of files, and the data structure to support it.  Before modifying
+ * anything, pg_rewind collects information about all the files and their
+ * attributes in the target and source data directories.  It also scans the
+ * WAL log in the target, and collects information about data blocks that
+ * were changed.  All this information is stored in a hash table, using the
+ * file path, relative to the root of the data directory, as the key.
+ *
+ * After collecting all the information required, the filemap_finalize()
+ * function scans the hash table and decides what action needs to be taken
+ * for each file.  Finally, it sorts the array to the final order that the
+ * actions should be executed in.
+ *
  * Copyright (c) 2013-2020, PostgreSQL Global Development Group
  *
  *-------------------------------------------------------------------------
@@ -14,22 +27,39 @@
 #include <unistd.h>
 
 #include "catalog/pg_tablespace_d.h"
+#include "common/hashfn.h"
 #include "common/string.h"
 #include "datapagemap.h"
 #include "filemap.h"
 #include "pg_rewind.h"
 #include "storage/fd.h"
 
-filemap_t  *filemap = NULL;
+/*
+ * Define a hash table which we can use to store information about the files
+ * appearing in the source and target systems.
+ */
+static uint32 hash_string_pointer(const char *s);
+#define SH_PREFIX        filehash
+#define SH_ELEMENT_TYPE    file_entry_t
+#define SH_KEY_TYPE        const char *
+#define    SH_KEY            path
+#define SH_HASH_KEY(tb, key)    hash_string_pointer(key)
+#define SH_EQUAL(tb, a, b)        (strcmp(a, b) == 0)
+#define    SH_SCOPE        static inline
+#define SH_RAW_ALLOCATOR    pg_malloc0
+#define SH_DECLARE
+#define SH_DEFINE
+#include "lib/simplehash.h"
+
+static filehash_hash *filehash;
 
 static bool isRelDataFile(const char *path);
 static char *datasegpath(RelFileNode rnode, ForkNumber forknum,
                          BlockNumber segno);
-static int    path_cmp(const void *a, const void *b);
 
-static file_entry_t *get_filemap_entry(const char *path, bool create);
+static file_entry_t *insert_filehash_entry(const char *path);
+static file_entry_t *lookup_filehash_entry(const char *path);
 static int    final_filemap_cmp(const void *a, const void *b);
-static void filemap_list_to_array(filemap_t *map);
 static bool check_file_excluded(const char *path, bool is_source);
 
 /*
@@ -131,54 +161,26 @@ static const struct exclude_list_item excludeFiles[] =
 };
 
 /*
- * Create a new file map (stored in the global pointer "filemap").
+ * Initialize the hash table for the file map.
  */
 void
-filemap_create(void)
+filemap_init(void)
 {
-    filemap_t  *map;
-
-    map = pg_malloc(sizeof(filemap_t));
-    map->first = map->last = NULL;
-    map->nlist = 0;
-    map->array = NULL;
-    map->narray = 0;
-
-    Assert(filemap == NULL);
-    filemap = map;
+    filehash = filehash_create(1000, NULL);
 }
 
-/* Look up or create entry for 'path' */
+/* Look up entry for 'path', creating a new one if it doesn't exist */
 static file_entry_t *
-get_filemap_entry(const char *path, bool create)
+insert_filehash_entry(const char *path)
 {
-    filemap_t  *map = filemap;
     file_entry_t *entry;
-    file_entry_t **e;
-    file_entry_t key;
-    file_entry_t *key_ptr;
+    bool        found;
 
-    if (map->array)
+    entry = filehash_insert(filehash, path, &found);
+    if (!found)
     {
-        key.path = (char *) path;
-        key_ptr = &key;
-        e = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
-                    path_cmp);
-    }
-    else
-        e = NULL;
-
-    if (e)
-        entry = *e;
-    else if (!create)
-        entry = NULL;
-    else
-    {
-        /* Create a new entry for this file */
-        entry = pg_malloc(sizeof(file_entry_t));
         entry->path = pg_strdup(path);
         entry->isrelfile = isRelDataFile(path);
-        entry->action = FILE_ACTION_UNDECIDED;
 
         entry->target_exists = false;
         entry->target_type = FILE_TYPE_UNDEFINED;
@@ -192,21 +194,18 @@ get_filemap_entry(const char *path, bool create)
         entry->source_size = 0;
         entry->source_link_target = NULL;
 
-        entry->next = NULL;
-
-        if (map->last)
-        {
-            map->last->next = entry;
-            map->last = entry;
-        }
-        else
-            map->first = map->last = entry;
-        map->nlist++;
+        entry->action = FILE_ACTION_UNDECIDED;
     }
 
     return entry;
 }
 
+static file_entry_t *
+lookup_filehash_entry(const char *path)
+{
+    return filehash_lookup(filehash, path);
+}
+
 /*
  * Callback for processing source file list.
  *
@@ -220,8 +219,6 @@ process_source_file(const char *path, file_type_t type, size_t size,
 {
     file_entry_t *entry;
 
-    Assert(filemap->array == NULL);
-
     /*
      * Pretend that pg_wal is a directory, even if it's really a symlink. We
      * don't want to mess with the symlink itself, nor complain if it's a
@@ -238,7 +235,9 @@ process_source_file(const char *path, file_type_t type, size_t size,
         pg_fatal("data file \"%s\" in source is not a regular file", path);
 
     /* Remember this source file */
-    entry = get_filemap_entry(path, true);
+    entry = insert_filehash_entry(path);
+    if (entry->source_exists)
+        pg_fatal("duplicate source file \"%s\"", path);
     entry->source_exists = true;
     entry->source_type = type;
     entry->source_size = size;
@@ -256,7 +255,6 @@ void
 process_target_file(const char *path, file_type_t type, size_t size,
                     const char *link_target)
 {
-    filemap_t  *map = filemap;
     file_entry_t *entry;
 
     /*
@@ -265,22 +263,6 @@ process_target_file(const char *path, file_type_t type, size_t size,
      * the source data folder when processing the source files.
      */
 
-    if (map->array == NULL)
-    {
-        /* on first call, initialize lookup array */
-        if (map->nlist == 0)
-        {
-            /* should not happen */
-            pg_fatal("source file list is empty");
-        }
-
-        filemap_list_to_array(map);
-
-        Assert(map->array != NULL);
-
-        qsort(map->array, map->narray, sizeof(file_entry_t *), path_cmp);
-    }
-
     /*
      * Like in process_source_file, pretend that pg_wal is always a directory.
      */
@@ -288,7 +270,9 @@ process_target_file(const char *path, file_type_t type, size_t size,
         type = FILE_TYPE_DIRECTORY;
 
     /* Remember this target file */
-    entry = get_filemap_entry(path, true);
+    entry = insert_filehash_entry(path);
+    if (entry->target_exists)
+        pg_fatal("duplicate target file \"%s\"", path);
     entry->target_exists = true;
     entry->target_type = type;
     entry->target_size = size;
@@ -301,7 +285,7 @@ process_target_file(const char *path, file_type_t type, size_t size,
  * changed blocks in the pagemap of the file.
  *
  * NOTE: All the files on both systems must have already been added to the
- * file map!
+ * hash table!
  */
 void
 process_target_wal_block_change(ForkNumber forknum, RelFileNode rnode,
@@ -312,47 +296,45 @@ process_target_wal_block_change(ForkNumber forknum, RelFileNode rnode,
     BlockNumber blkno_inseg;
     int            segno;
 
-    Assert(filemap->array);
-
     segno = blkno / RELSEG_SIZE;
     blkno_inseg = blkno % RELSEG_SIZE;
 
     path = datasegpath(rnode, forknum, segno);
-    entry = get_filemap_entry(path, false);
+    entry = lookup_filehash_entry(path);
     pfree(path);
 
+    /*
+     * If the block still exists in both systems, remember it. Otherwise we
+     * can safely ignore it.
+     *
+     * If the block is beyond the EOF in the source system, or the file doesn't
+     * exist in the source at all, we're going to truncate/remove it away
+     * from the target anyway. Likewise, if it doesn't exist in the target
+     * anymore, we will copy it over with the "tail" from the source system,
+     * anyway.
+     *
+     * It is possible to find WAL for a file that doesn't exist on either
+     * system anymore. It means that the relation was dropped later in the
+     * target system, and independently on the source system too, or that
+     * it was created and dropped in the target system and it never existed
+     * in the source. Either way, we can safely ignore it.
+     */
     if (entry)
     {
-        int64        end_offset;
-
         Assert(entry->isrelfile);
 
         if (entry->target_type != FILE_TYPE_REGULAR)
             pg_fatal("unexpected page modification for directory or symbolic link \"%s\"",
                      entry->path);
 
-        /*
-         * If the block beyond the EOF in the source system, no need to
-         * remember it now, because we're going to truncate it away from the
-         * target anyway. Also no need to remember the block if it's beyond
-         * the current EOF in the target system; we will copy it over with the
-         * "tail" from the source system, anyway.
-         */
-        end_offset = (blkno_inseg + 1) * BLCKSZ;
-        if (end_offset <= entry->source_size &&
-            end_offset <= entry->target_size)
-            datapagemap_add(&entry->target_modified_pages, blkno_inseg);
-    }
-    else
-    {
-        /*
-         * If we don't have any record of this file in the file map, it means
-         * that it's a relation that doesn't exist in the source system.  It
-         * could exist in the target system; we haven't moved the target-only
-         * entries from the linked list to the array yet!  But in any case, if
-         * it doesn't exist in the source it will be removed from the target
-         * too, and we can safely ignore it.
-         */
+        if (entry->target_exists && entry->source_exists)
+        {
+            off_t        end_offset;
+
+            end_offset = (blkno_inseg + 1) * BLCKSZ;
+            if (end_offset <= entry->source_size && end_offset <= entry->target_size)
+                datapagemap_add(&entry->target_modified_pages, blkno_inseg);
+        }
     }
 }
 
@@ -414,34 +396,6 @@ check_file_excluded(const char *path, bool is_source)
     return false;
 }
 
-/*
- * Convert the linked list of entries in map->first/last to the array,
- * map->array.
- */
-static void
-filemap_list_to_array(filemap_t *map)
-{
-    int            narray;
-    file_entry_t *entry,
-               *next;
-
-    map->array = (file_entry_t **)
-        pg_realloc(map->array,
-                   (map->nlist + map->narray) * sizeof(file_entry_t *));
-
-    narray = map->narray;
-    for (entry = map->first; entry != NULL; entry = next)
-    {
-        map->array[narray++] = entry;
-        next = entry->next;
-        entry->next = NULL;
-    }
-    Assert(narray == map->nlist + map->narray);
-    map->narray = narray;
-    map->nlist = 0;
-    map->first = map->last = NULL;
-}
-
 static const char *
 action_to_str(file_action_t action)
 {
@@ -469,32 +423,31 @@ action_to_str(file_action_t action)
  * Calculate the totals needed for progress reports.
  */
 void
-calculate_totals(void)
+calculate_totals(filemap_t *filemap)
 {
     file_entry_t *entry;
     int            i;
-    filemap_t  *map = filemap;
 
-    map->total_size = 0;
-    map->fetch_size = 0;
+    filemap->total_size = 0;
+    filemap->fetch_size = 0;
 
-    for (i = 0; i < map->narray; i++)
+    for (i = 0; i < filemap->nactions; i++)
     {
-        entry = map->array[i];
+        entry = filemap->actions[i];
 
         if (entry->source_type != FILE_TYPE_REGULAR)
             continue;
 
-        map->total_size += entry->source_size;
+        filemap->total_size += entry->source_size;
 
         if (entry->action == FILE_ACTION_COPY)
         {
-            map->fetch_size += entry->source_size;
+            filemap->fetch_size += entry->source_size;
             continue;
         }
 
         if (entry->action == FILE_ACTION_COPY_TAIL)
-            map->fetch_size += (entry->source_size - entry->target_size);
+            filemap->fetch_size += (entry->source_size - entry->target_size);
 
         if (entry->target_modified_pages.bitmapsize > 0)
         {
@@ -503,7 +456,7 @@ calculate_totals(void)
 
             iter = datapagemap_iterate(&entry->target_modified_pages);
             while (datapagemap_next(iter, &blk))
-                map->fetch_size += BLCKSZ;
+                filemap->fetch_size += BLCKSZ;
 
             pg_free(iter);
         }
@@ -511,15 +464,14 @@ calculate_totals(void)
 }
 
 void
-print_filemap(void)
+print_filemap(filemap_t *filemap)
 {
-    filemap_t  *map = filemap;
     file_entry_t *entry;
     int            i;
 
-    for (i = 0; i < map->narray; i++)
+    for (i = 0; i < filemap->nactions; i++)
     {
-        entry = map->array[i];
+        entry = filemap->actions[i];
         if (entry->action != FILE_ACTION_NONE ||
             entry->target_modified_pages.bitmapsize > 0)
         {
@@ -641,15 +593,6 @@ datasegpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
         return path;
 }
 
-static int
-path_cmp(const void *a, const void *b)
-{
-    file_entry_t *fa = *((file_entry_t **) a);
-    file_entry_t *fb = *((file_entry_t **) b);
-
-    return strcmp(fa->path, fb->path);
-}
-
 /*
  * In the final stage, the filemap is sorted so that removals come last.
  * From disk space usage point of view, it would be better to do removals
@@ -835,21 +778,48 @@ decide_file_action(file_entry_t *entry)
 /*
  * Decide what to do with each file.
  */
-void
+filemap_t *
 filemap_finalize()
 {
     int            i;
+    filehash_iterator it;
+    file_entry_t *entry;
+    filemap_t  *filemap;
 
-    filemap_list_to_array(filemap);
-
-    for (i = 0; i < filemap->narray; i++)
+    filehash_start_iterate(filehash, &it);
+    while ((entry = filehash_iterate(filehash, &it)) != NULL)
     {
-        file_entry_t *entry = filemap->array[i];
-
         entry->action = decide_file_action(entry);
     }
 
-    /* Sort the actions to the order that they should be performed */
-    qsort(filemap->array, filemap->narray, sizeof(file_entry_t *),
+    /*
+     * Turn the hash table into an array, sorted in the order that the actions
+     * should be performed.
+     */
+    filemap = pg_malloc(offsetof(filemap_t, actions) +
+                        filehash->members * sizeof(file_entry_t *));
+    filemap->nactions = filehash->members;
+    filehash_start_iterate(filehash, &it);
+    i = 0;
+    while ((entry = filehash_iterate(filehash, &it)) != NULL)
+    {
+        filemap->actions[i++] = entry;
+    }
+
+    qsort(filemap->actions, filemap->nactions, sizeof(file_entry_t *),
           final_filemap_cmp);
+
+    return filemap;
+}
+
+
+/*
+ * Helper function for filemap hash table.
+ */
+static uint32
+hash_string_pointer(const char *s)
+{
+    unsigned char *ss = (unsigned char *) s;
+
+    return hash_bytes(ss, strlen(s));
 }
diff --git a/src/bin/pg_rewind/filemap.h b/src/bin/pg_rewind/filemap.h
index a5e8df57f4..3660ffe099 100644
--- a/src/bin/pg_rewind/filemap.h
+++ b/src/bin/pg_rewind/filemap.h
@@ -12,15 +12,6 @@
 #include "storage/block.h"
 #include "storage/relfilenode.h"
 
-/*
- * For every file found in the local or remote system, we have a file entry
- * that contains information about the file on both systems.  For relation
- * files, there is also a page map that marks pages in the file that were
- * changed in the target after the last common checkpoint.  Each entry also
- * contains an 'action' field, which says what we are going to do with the
- * file.
- */
-
 /* these enum values are sorted in the order we want actions to be processed */
 typedef enum
 {
@@ -44,9 +35,21 @@ typedef enum
     FILE_TYPE_SYMLINK
 } file_type_t;
 
+/*
+ * For every file found in the local or remote system, we have a file entry
+ * that contains information about the file on both systems.  For relation
+ * files, there is also a page map that marks pages in the file that were
+ * changed in the target after the last common checkpoint.
+ *
+ * When gathering information, these are kept in a hash table, private to
+ * filemap.c. filemap_finalize() fills in the 'action' field, sorts all the
+ * entries, and returns them in an array, ready for executing the actions.
+ */
 typedef struct file_entry_t
 {
-    char       *path;
+    uint32        status;            /* hash status */
+
+    const char *path;
     bool        isrelfile;        /* is it a relation data file? */
 
     /*
@@ -71,44 +74,25 @@ typedef struct file_entry_t
      * What will we do to the file?
      */
     file_action_t action;
-
-    struct file_entry_t *next;
 } file_entry_t;
 
+/*
+ * This represents the final decisions on what to do with each file.
+ * The 'actions' array contains an entry for each file, sorted in the
+ * order that their actions should be executed.
+ */
 typedef struct filemap_t
 {
-    /*
-     * New entries are accumulated to a linked list, in process_source_file
-     * and process_target_file.
-     */
-    file_entry_t *first;
-    file_entry_t *last;
-    int            nlist;            /* number of entries currently in list */
-
-    /*
-     * After processing all the remote files, the entries in the linked list
-     * are moved to this array.  After processing local files, too, all the
-     * local entries are added to the array by filemap_finalize, and sorted in
-     * the final order.  After filemap_finalize, all the entries are in the
-     * array, and the linked list is empty.
-     */
-    file_entry_t **array;
-    int            narray;            /* current length of array */
-
-    /*
-     * Summary information.
-     */
+    /* Summary information, filled by calculate_totals() */
     uint64        total_size;        /* total size of the source cluster */
     uint64        fetch_size;        /* number of bytes that needs to be copied */
+
+    int            nactions;        /* size of 'actions' array */
+    file_entry_t *actions[FLEXIBLE_ARRAY_MEMBER];
 } filemap_t;
 
-extern filemap_t *filemap;
-
-extern void filemap_create(void);
-extern void calculate_totals(void);
-extern void print_filemap(void);
-
 /* Functions for populating the filemap */
+extern void filemap_init(void);
 extern void process_source_file(const char *path, file_type_t type,
                                 size_t size, const char *link_target);
 extern void process_target_file(const char *path, file_type_t type,
@@ -116,6 +100,9 @@ extern void process_target_file(const char *path, file_type_t type,
 extern void process_target_wal_block_change(ForkNumber forknum,
                                             RelFileNode rnode,
                                             BlockNumber blkno);
-extern void filemap_finalize(void);
+
+extern filemap_t *filemap_finalize(void);
+extern void calculate_totals(filemap_t *filemap);
+extern void print_filemap(filemap_t *filemap);
 
 #endif                            /* FILEMAP_H */
diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_fetch.c
index 7fc9161b8c..9c541bb73d 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_fetch.c
@@ -460,9 +460,9 @@ libpq_executeFileMap(filemap_t *map)
                  PQresultErrorMessage(res));
     PQclear(res);
 
-    for (i = 0; i < map->narray; i++)
+    for (i = 0; i < map->nactions; i++)
     {
-        entry = map->array[i];
+        entry = map->actions[i];
 
         /* If this is a relation file, copy the modified blocks */
         execute_pagemap(&entry->target_modified_pages, entry->path);
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index e0ed1759cb..13b4eab52b 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -129,6 +129,7 @@ main(int argc, char **argv)
     TimeLineID    endtli;
     ControlFileData ControlFile_new;
     bool        writerecoveryconf = false;
+    filemap_t  *filemap;
 
     pg_logging_init(argv[0]);
     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_rewind"));
@@ -371,10 +372,11 @@ main(int argc, char **argv)
     /*
      * Collect information about all files in the target and source systems.
      */
-    filemap_create();
     if (showprogress)
         pg_log_info("reading source file list");
+    filemap_init();
     fetchSourceFileList();
+
     if (showprogress)
         pg_log_info("reading target file list");
     traverse_datadir(datadir_target, &process_target_file);
@@ -395,13 +397,13 @@ main(int argc, char **argv)
      * We have collected all information we need from both systems. Decide
      * what to do with each file.
      */
-    filemap_finalize();
+    filemap = filemap_finalize();
     if (showprogress)
-        calculate_totals();
+        calculate_totals(filemap);
 
     /* this is too verbose even for verbose mode */
     if (debug)
-        print_filemap();
+        print_filemap(filemap);
 
     /*
      * Ok, we're ready to start copying things over.
@@ -421,7 +423,7 @@ main(int argc, char **argv)
      * modified the target directory and there is no turning back!
      */
 
-    executeFileMap();
+    execute_file_actions(filemap);
 
     progress_report(true);
 
-- 
2.18.4
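
The reworked filemap_finalize() in the patch above allocates filemap_t with a flexible array member sized via offsetof(), then sorts the array of entry pointers with qsort(). The following is a minimal, self-contained sketch of that idiom. entry_t, actionlist_t, build_actionlist() and the ordering in action_cmp() are hypothetical simplifications, not pg_rewind's file_entry_t or final_filemap_cmp(), and plain malloc() stands in for pg_malloc().

```c
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for file_entry_t: only the fields needed here. */
typedef struct entry
{
    const char *path;
    int         action;         /* smaller value = performed earlier */
} entry_t;

/* Hypothetical stand-in for filemap_t: a header plus a flexible array. */
typedef struct actionlist
{
    int         nactions;
    entry_t    *actions[];      /* C99 flexible array member */
} actionlist_t;

/* qsort() passes pointers to the array elements, i.e. entry_t **. */
static int
action_cmp(const void *a, const void *b)
{
    const entry_t *ea = *(entry_t *const *) a;
    const entry_t *eb = *(entry_t *const *) b;

    /* Simplified ordering (assumption): by action, then by path. */
    if (ea->action != eb->action)
        return (ea->action < eb->action) ? -1 : 1;
    return strcmp(ea->path, eb->path);
}

/*
 * Allocate the header and 'n' trailing pointers in a single block, point
 * them at the entries, and sort them.  Returns NULL on allocation failure.
 */
static actionlist_t *
build_actionlist(entry_t *entries, int n)
{
    actionlist_t *list;

    list = malloc(offsetof(actionlist_t, actions) + n * sizeof(entry_t *));
    if (list == NULL)
        return NULL;
    list->nactions = n;
    for (int i = 0; i < n; i++)
        list->actions[i] = &entries[i];

    /* Pass the array itself (it decays to entry_t **), not its address. */
    qsort(list->actions, list->nactions, sizeof(entry_t *), action_cmp);
    return list;
}
```

Because qsort() hands the comparator pointers to the array elements, each argument is an entry_t ** that must be dereferenced once before its fields can be compared.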

From 45130573b47b1e8260387383a50012c0f9ebc263 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 19 Aug 2020 15:34:41 +0300
Subject: [PATCH v3 4/5] pg_rewind: Refactor the abstraction to fetch from
 local/libpq source.

copy_executeFileMap() and libpq_executeFileMap() contained basically the
same logic, just calling different functions to fetch the source files.
Refactor so that the common logic is in one place, execute_file_actions().

This makes the abstraction of a "source" server clearer by introducing a
common abstract class (to borrow the object-oriented programming term)
that represents all the operations that can be performed on the source
server. There are two implementations of it: one that fetches via libpq,
and another that fetches from a local directory. This adds some code, but
makes it easier to understand what's going on.
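
In C, an "abstract class" like this is usually expressed as a struct of function pointers, with each implementation embedding that struct as its first member so that a pointer to the base can be cast back to the implementation. The sketch below illustrates the pattern under simplified assumptions: source_t, memory_source, init_memory_source() and copy_tail() are hypothetical names, and the in-memory implementation merely stands in for the local-directory and libpq sources.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical "abstract class": operations every source must provide. */
typedef struct source
{
    size_t      (*fetch_range) (struct source *src, size_t off, size_t len,
                                char *dst);
    void        (*destroy) (struct source *src);
} source_t;

/* Hypothetical "subclass": the base struct must be the first member. */
typedef struct
{
    source_t    common;
    const char *data;           /* in-memory "file" standing in for a source */
    size_t      size;
} memory_source;

/* Copy up to 'len' bytes starting at 'off'; returns bytes copied. */
static size_t
memory_fetch_range(source_t *src, size_t off, size_t len, char *dst)
{
    memory_source *ms = (memory_source *) src;  /* valid: common is first */

    if (off >= ms->size)
        return 0;
    if (len > ms->size - off)
        len = ms->size - off;
    memcpy(dst, ms->data + off, len);
    return len;
}

static void
memory_destroy(source_t *src)
{
    free(src);
}

static source_t *
init_memory_source(const char *data, size_t size)
{
    memory_source *ms = calloc(1, sizeof(memory_source));

    if (ms == NULL)
        return NULL;
    ms->common.fetch_range = memory_fetch_range;
    ms->common.destroy = memory_destroy;
    ms->data = data;
    ms->size = size;
    return &ms->common;
}

/*
 * Generic caller: uses only the function pointers, so it works with any
 * implementation.  Assumes to >= from.
 */
static size_t
copy_tail(source_t *src, size_t from, size_t to, char *dst)
{
    return src->fetch_range(src, from, to - from, dst);
}
```

The generic code never needs to know which concrete source it holds. Adding another kind of source means writing a new init function that fills in the same table.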
---
 src/bin/pg_rewind/copy_fetch.c  | 239 +++++----------------
 src/bin/pg_rewind/fetch.c       |  97 ++++++---
 src/bin/pg_rewind/fetch.h       |  76 +++++--
 src/bin/pg_rewind/file_ops.c    | 129 +++++++++++-
 src/bin/pg_rewind/file_ops.h    |   3 +
 src/bin/pg_rewind/libpq_fetch.c | 363 +++++++++++++++-----------------
 src/bin/pg_rewind/pg_rewind.c   |  70 ++++--
 src/bin/pg_rewind/pg_rewind.h   |   5 -
 8 files changed, 528 insertions(+), 454 deletions(-)

diff --git a/src/bin/pg_rewind/copy_fetch.c b/src/bin/pg_rewind/copy_fetch.c
index 61aed8018b..9927a45a07 100644
--- a/src/bin/pg_rewind/copy_fetch.c
+++ b/src/bin/pg_rewind/copy_fetch.c
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * copy_fetch.c
- *      Functions for using a data directory as the source.
+ *      Functions for using a local data directory as the source.
  *
  * Portions Copyright (c) 2013-2020, PostgreSQL Global Development Group
  *
@@ -9,8 +9,6 @@
  */
 #include "postgres_fe.h"
 
-#include <sys/stat.h>
-#include <dirent.h>
 #include <fcntl.h>
 #include <unistd.h>
 
@@ -20,146 +18,70 @@
 #include "filemap.h"
 #include "pg_rewind.h"
 
-static void recurse_dir(const char *datadir, const char *path,
-                        process_file_callback_t callback);
+typedef struct
+{
+    rewind_source common;    /* common interface functions */
+
+    const char *datadir;    /* path to the source data directory */
+} local_source;
 
-static void execute_pagemap(datapagemap_t *pagemap, const char *path);
+static void local_traverse_files(rewind_source *source,
+                                 process_file_callback_t callback);
+static char *local_fetch_file(rewind_source *source, const char *path,
+                              size_t *filesize);
+static void local_fetch_file_range(rewind_source *source, const char *path,
+                                   uint64 off, size_t len);
+static void local_finish_fetch(rewind_source *source);
+static void local_destroy(rewind_source *source);
 
-/*
- * Traverse through all files in a data directory, calling 'callback'
- * for each file.
- */
-void
-traverse_datadir(const char *datadir, process_file_callback_t callback)
+rewind_source *
+init_local_source(const char *datadir)
 {
-    recurse_dir(datadir, NULL, callback);
+    local_source *src;
+
+    src = pg_malloc0(sizeof(local_source));
+
+    src->common.traverse_files = local_traverse_files;
+    src->common.fetch_file = local_fetch_file;
+    src->common.queue_fetch_range = local_fetch_file_range;
+    src->common.finish_fetch = local_finish_fetch;
+    src->common.get_current_wal_insert_lsn = NULL;
+    src->common.destroy = local_destroy;
+
+    src->datadir = datadir;
+
+    return &src->common;
 }
 
-/*
- * recursive part of traverse_datadir
- *
- * parentpath is the current subdirectory's path relative to datadir,
- * or NULL at the top level.
- */
 static void
-recurse_dir(const char *datadir, const char *parentpath,
-            process_file_callback_t callback)
+local_traverse_files(rewind_source *source, process_file_callback_t callback)
 {
-    DIR           *xldir;
-    struct dirent *xlde;
-    char        fullparentpath[MAXPGPATH];
+    traverse_datadir(((local_source *) source)->datadir, callback);
+}
 
-    if (parentpath)
-        snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath);
-    else
-        snprintf(fullparentpath, MAXPGPATH, "%s", datadir);
-
-    xldir = opendir(fullparentpath);
-    if (xldir == NULL)
-        pg_fatal("could not open directory \"%s\": %m",
-                 fullparentpath);
-
-    while (errno = 0, (xlde = readdir(xldir)) != NULL)
-    {
-        struct stat fst;
-        char        fullpath[MAXPGPATH * 2];
-        char        path[MAXPGPATH * 2];
-
-        if (strcmp(xlde->d_name, ".") == 0 ||
-            strcmp(xlde->d_name, "..") == 0)
-            continue;
-
-        snprintf(fullpath, sizeof(fullpath), "%s/%s", fullparentpath, xlde->d_name);
-
-        if (lstat(fullpath, &fst) < 0)
-        {
-            if (errno == ENOENT)
-            {
-                /*
-                 * File doesn't exist anymore. This is ok, if the new primary
-                 * is running and the file was just removed. If it was a data
-                 * file, there should be a WAL record of the removal. If it
-                 * was something else, it couldn't have been anyway.
-                 *
-                 * TODO: But complain if we're processing the target dir!
-                 */
-            }
-            else
-                pg_fatal("could not stat file \"%s\": %m",
-                         fullpath);
-        }
-
-        if (parentpath)
-            snprintf(path, sizeof(path), "%s/%s", parentpath, xlde->d_name);
-        else
-            snprintf(path, sizeof(path), "%s", xlde->d_name);
-
-        if (S_ISREG(fst.st_mode))
-            callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL);
-        else if (S_ISDIR(fst.st_mode))
-        {
-            callback(path, FILE_TYPE_DIRECTORY, 0, NULL);
-            /* recurse to handle subdirectories */
-            recurse_dir(datadir, path, callback);
-        }
-#ifndef WIN32
-        else if (S_ISLNK(fst.st_mode))
-#else
-        else if (pgwin32_is_junction(fullpath))
-#endif
-        {
-#if defined(HAVE_READLINK) || defined(WIN32)
-            char        link_target[MAXPGPATH];
-            int            len;
-
-            len = readlink(fullpath, link_target, sizeof(link_target));
-            if (len < 0)
-                pg_fatal("could not read symbolic link \"%s\": %m",
-                         fullpath);
-            if (len >= sizeof(link_target))
-                pg_fatal("symbolic link \"%s\" target is too long",
-                         fullpath);
-            link_target[len] = '\0';
-
-            callback(path, FILE_TYPE_SYMLINK, 0, link_target);
-
-            /*
-             * If it's a symlink within pg_tblspc, we need to recurse into it,
-             * to process all the tablespaces.  We also follow a symlink if
-             * it's for pg_wal.  Symlinks elsewhere are ignored.
-             */
-            if ((parentpath && strcmp(parentpath, "pg_tblspc") == 0) ||
-                strcmp(path, "pg_wal") == 0)
-                recurse_dir(datadir, path, callback);
-#else
-            pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform",
-                     fullpath);
-#endif                            /* HAVE_READLINK */
-        }
-    }
-
-    if (errno)
-        pg_fatal("could not read directory \"%s\": %m",
-                 fullparentpath);
-
-    if (closedir(xldir))
-        pg_fatal("could not close directory \"%s\": %m",
-                 fullparentpath);
+static char *
+local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
+{
+    return slurpFile(((local_source *) source)->datadir, path, filesize);
 }
 
 /*
- * Copy a file from source to target, between 'begin' and 'end' offsets.
+ * Copy a file from source to target, starting at 'off', for 'len' bytes.
  *
  * If 'trunc' is true, any existing file with the same name is truncated.
  */
 static void
-rewind_copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
+local_fetch_file_range(rewind_source *source, const char *path, uint64 off,
+                       size_t len)
 {
+    const char *datadir = ((local_source *) source)->datadir;
     PGAlignedBlock buf;
     char        srcpath[MAXPGPATH];
     int            srcfd;
+    uint64        begin = off;
+    uint64        end = off + len;
 
-    snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir_source, path);
+    snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
 
     srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
     if (srcfd < 0)
@@ -169,7 +91,7 @@ rewind_copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
     if (lseek(srcfd, begin, SEEK_SET) == -1)
         pg_fatal("could not seek in source file: %m");
 
-    open_target_file(path, trunc);
+    open_target_file(path, false);
 
     while (end - begin > 0)
     {
@@ -197,70 +119,17 @@ rewind_copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
         pg_fatal("could not close file \"%s\": %m", srcpath);
 }
 
-/*
- * Copy all relation data files from datadir_source to datadir_target, which
- * are marked in the given data page map.
- */
-void
-copy_executeFileMap(filemap_t *map)
+static void
+local_finish_fetch(rewind_source *source)
 {
-    file_entry_t *entry;
-    int            i;
-
-    for (i = 0; i < map->nactions; i++)
-    {
-        entry = map->actions[i];
-        execute_pagemap(&entry->target_modified_pages, entry->path);
-
-        switch (entry->action)
-        {
-            case FILE_ACTION_NONE:
-                /* ok, do nothing.. */
-                break;
-
-            case FILE_ACTION_COPY:
-                rewind_copy_file_range(entry->path, 0, entry->source_size, true);
-                break;
-
-            case FILE_ACTION_TRUNCATE:
-                truncate_target_file(entry->path, entry->source_size);
-                break;
-
-            case FILE_ACTION_COPY_TAIL:
-                rewind_copy_file_range(entry->path, entry->target_size,
-                                       entry->source_size, false);
-                break;
-
-            case FILE_ACTION_CREATE:
-                create_target(entry);
-                break;
-
-            case FILE_ACTION_REMOVE:
-                remove_target(entry);
-                break;
-
-            case FILE_ACTION_UNDECIDED:
-                pg_fatal("no action decided for \"%s\"", entry->path);
-                break;
-        }
-    }
-
-    close_target_file();
+    /*
+     * Nothing to do, local_fetch_file_range() performs the fetching
+     * immediately.
+     */
 }
 
 static void
-execute_pagemap(datapagemap_t *pagemap, const char *path)
+local_destroy(rewind_source *source)
 {
-    datapagemap_iterator_t *iter;
-    BlockNumber blkno;
-    off_t        offset;
-
-    iter = datapagemap_iterate(pagemap);
-    while (datapagemap_next(iter, &blkno))
-    {
-        offset = blkno * BLCKSZ;
-        rewind_copy_file_range(path, offset, offset + BLCKSZ, false);
-        /* Ok, this block has now been copied from new data dir to old */
-    }
-    pg_free(iter);
+    pfree(source);
 }
diff --git a/src/bin/pg_rewind/fetch.c b/src/bin/pg_rewind/fetch.c
index f41d0f295e..c8ee38f8e0 100644
--- a/src/bin/pg_rewind/fetch.c
+++ b/src/bin/pg_rewind/fetch.c
@@ -24,37 +24,78 @@
 #include "filemap.h"
 #include "pg_rewind.h"
 
-void
-fetchSourceFileList(void)
-{
-    if (datadir_source)
-        traverse_datadir(datadir_source, &process_source_file);
-    else
-        libpqProcessFileList();
-}
-
 /*
- * Fetch all relation data files that are marked in the given data page map.
+ * Execute the actions in the file map, fetching data from the source
+ * system as needed.
  */
 void
-execute_file_actions(filemap_t *filemap)
+execute_file_actions(filemap_t *filemap, rewind_source *source)
 {
-    if (datadir_source)
-        copy_executeFileMap(filemap);
-    else
-        libpq_executeFileMap(filemap);
-}
+    int            i;
 
-/*
- * Fetch a single file into a malloc'd buffer. The file size is returned
- * in *filesize. The returned buffer is always zero-terminated, which is
- * handy for text files.
- */
-char *
-fetchFile(const char *filename, size_t *filesize)
-{
-    if (datadir_source)
-        return slurpFile(datadir_source, filename, filesize);
-    else
-        return libpqGetFile(filename, filesize);
+    for (i = 0; i < filemap->nactions; i++)
+    {
+        file_entry_t *entry = filemap->actions[i];
+        datapagemap_iterator_t *iter;
+        BlockNumber blkno;
+        off_t        offset;
+
+        /*
+         * If this is a relation file, copy the modified blocks.
+         *
+         * This is in addition to any other changes.
+         */
+        iter = datapagemap_iterate(&entry->target_modified_pages);
+        while (datapagemap_next(iter, &blkno))
+        {
+            offset = blkno * BLCKSZ;
+
+            source->queue_fetch_range(source, entry->path, offset, BLCKSZ);
+        }
+        pg_free(iter);
+
+        switch (entry->action)
+        {
+            case FILE_ACTION_NONE:
+                /* nothing else to do */
+                break;
+
+            case FILE_ACTION_COPY:
+                /* Truncate the old file out of the way, if any */
+                open_target_file(entry->path, true);
+                source->queue_fetch_range(source, entry->path,
+                                          0, entry->source_size);
+                break;
+
+            case FILE_ACTION_TRUNCATE:
+                truncate_target_file(entry->path, entry->source_size);
+                break;
+
+            case FILE_ACTION_COPY_TAIL:
+                source->queue_fetch_range(source, entry->path,
+                                          entry->target_size,
+                                          entry->source_size - entry->target_size);
+                break;
+
+            case FILE_ACTION_REMOVE:
+                remove_target(entry);
+                break;
+
+            case FILE_ACTION_CREATE:
+                create_target(entry);
+                break;
+
+            case FILE_ACTION_UNDECIDED:
+                pg_fatal("no action decided for \"%s\"", entry->path);
+                break;
+        }
+    }
+
+    /*
+     * We've now queued up all the file ranges that we need to fetch. Now,
+     * actually fetch all of those ranges.
+     */
+    source->finish_fetch(source);
+
+    close_target_file();
 }
diff --git a/src/bin/pg_rewind/fetch.h b/src/bin/pg_rewind/fetch.h
index b20df8b153..8be1a9582d 100644
--- a/src/bin/pg_rewind/fetch.h
+++ b/src/bin/pg_rewind/fetch.h
@@ -1,12 +1,12 @@
 /*-------------------------------------------------------------------------
  *
  * fetch.h
- *      Fetching data from a local or remote data directory.
+ *      Abstraction for fetching from source server.
  *
- * This file includes the prototypes for functions used to copy files from
- * one data directory to another. The source to copy from can be a local
- * directory (copy method), or a remote PostgreSQL server (libpq fetch
- * method).
+ * The source server can be either a libpq connection to a live system, or
+ * a local data directory. The 'rewind_source' struct abstracts the
+ * operations to fetch data from the source system, so that the rest of
+ * the code doesn't need to care what kind of a source it's dealing with.
  *
  * Copyright (c) 2013-2020, PostgreSQL Global Development Group
  *
@@ -16,29 +16,63 @@
 #define FETCH_H
 
 #include "access/xlogdefs.h"
-
+#include "file_ops.h"
 #include "filemap.h"
+#include "libpq-fe.h"
+
+typedef struct rewind_source
+{
+    /*
+     * Traverse all files in the source data directory, and call 'callback'
+     * on each file.
+     */
+    void (*traverse_files) (struct rewind_source *,
+                            process_file_callback_t callback);
+
+    /*
+     * Fetch a single file into a malloc'd buffer. The file size is returned
+     * in *filesize. The returned buffer is always zero-terminated, which is
+     * handy for text files.
+     */
+    char *(*fetch_file) (struct rewind_source *, const char *path,
+                         size_t *filesize);
+
+    /*
+     * Request to fetch (part of) a file in the source system, and write it
+     * to the corresponding file in the target system. The source implementation
+     * may queue up the request and execute it later when convenient. Call
+     * finish_fetch() to flush the queue and execute all requests.
+     */
+    void (*queue_fetch_range) (struct rewind_source *, const char *path,
+                               uint64 offset, size_t len);
+
+    /*
+     * Execute all requests queued up with queue_fetch_range().
+     */
+    void (*finish_fetch) (struct rewind_source *);
+
+    /*
+     * Get the current WAL insert position in the source system.
+     */
+    XLogRecPtr (*get_current_wal_insert_lsn) (struct rewind_source *);
+
+    /*
+     * Free this rewind_source object.
+     */
+    void (*destroy) (struct rewind_source *);
+
+} rewind_source;
+
 
 /*
- * Common interface. Calls the copy or libpq method depending on global
- * config options.
+ * Execute all the actions in 'filemap'.
  */
-extern void fetchSourceFileList(void);
-extern char *fetchFile(const char *filename, size_t *filesize);
-extern void execute_file_actions(filemap_t *filemap);
+extern void execute_file_actions(filemap_t *filemap, rewind_source *source);
 
 /* in libpq_fetch.c */
-extern void libpqProcessFileList(void);
-extern char *libpqGetFile(const char *filename, size_t *filesize);
-extern void libpq_executeFileMap(filemap_t *map);
-
-extern void libpqConnect(const char *connstr);
-extern XLogRecPtr libpqGetCurrentXlogInsertLocation(void);
+extern rewind_source *init_libpq_source(PGconn *conn);
 
 /* in copy_fetch.c */
-extern void copy_executeFileMap(filemap_t *map);
-
-typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target);
-extern void traverse_datadir(const char *datadir, process_file_callback_t callback);
+extern rewind_source *init_local_source(const char *datadir);
 
 #endif                            /* FETCH_H */
diff --git a/src/bin/pg_rewind/file_ops.c b/src/bin/pg_rewind/file_ops.c
index ec37d0b2e0..4ae343888e 100644
--- a/src/bin/pg_rewind/file_ops.c
+++ b/src/bin/pg_rewind/file_ops.c
@@ -15,6 +15,7 @@
 #include "postgres_fe.h"
 
 #include <sys/stat.h>
+#include <dirent.h>
 #include <fcntl.h>
 #include <unistd.h>
 
@@ -35,6 +36,9 @@ static void remove_target_dir(const char *path);
 static void create_target_symlink(const char *path, const char *link);
 static void remove_target_symlink(const char *path);
 
+static void recurse_dir(const char *datadir, const char *parentpath,
+                        process_file_callback_t callback);
+
 /*
  * Open a target file for writing. If 'trunc' is true and the file already
  * exists, it will be truncated.
@@ -305,9 +309,6 @@ sync_target_dir(void)
  * buffer is actually *filesize + 1. That's handy when reading a text file.
  * This function can be used to read binary files as well, you can just
  * ignore the zero-terminator in that case.
- *
- * This function is used to implement the fetchFile function in the "fetch"
- * interface (see fetch.c), but is also called directly.
  */
 char *
 slurpFile(const char *datadir, const char *path, size_t *filesize)
@@ -352,3 +353,125 @@ slurpFile(const char *datadir, const char *path, size_t *filesize)
         *filesize = len;
     return buffer;
 }
+
+/*
+ * Traverse through all files in a data directory, calling 'callback'
+ * for each file.
+ */
+void
+traverse_datadir(const char *datadir, process_file_callback_t callback)
+{
+    recurse_dir(datadir, NULL, callback);
+}
+
+/*
+ * recursive part of traverse_datadir
+ *
+ * parentpath is the current subdirectory's path relative to datadir,
+ * or NULL at the top level.
+ */
+static void
+recurse_dir(const char *datadir, const char *parentpath,
+            process_file_callback_t callback)
+{
+    DIR           *xldir;
+    struct dirent *xlde;
+    char        fullparentpath[MAXPGPATH];
+
+    if (parentpath)
+        snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath);
+    else
+        snprintf(fullparentpath, MAXPGPATH, "%s", datadir);
+
+    xldir = opendir(fullparentpath);
+    if (xldir == NULL)
+        pg_fatal("could not open directory \"%s\": %m",
+                 fullparentpath);
+
+    while (errno = 0, (xlde = readdir(xldir)) != NULL)
+    {
+        struct stat fst;
+        char        fullpath[MAXPGPATH * 2];
+        char        path[MAXPGPATH * 2];
+
+        if (strcmp(xlde->d_name, ".") == 0 ||
+            strcmp(xlde->d_name, "..") == 0)
+            continue;
+
+        snprintf(fullpath, sizeof(fullpath), "%s/%s", fullparentpath, xlde->d_name);
+
+        if (lstat(fullpath, &fst) < 0)
+        {
+            if (errno == ENOENT)
+            {
+                /*
+                 * File doesn't exist anymore. This is ok, if the new primary
+                 * is running and the file was just removed. If it was a data
+                 * file, there should be a WAL record of the removal. If it
+                 * was something else, it couldn't have been anyway.
+                 *
+                 * TODO: But complain if we're processing the target dir!
+                 */
+            }
+            else
+                pg_fatal("could not stat file \"%s\": %m",
+                         fullpath);
+        }
+
+        if (parentpath)
+            snprintf(path, sizeof(path), "%s/%s", parentpath, xlde->d_name);
+        else
+            snprintf(path, sizeof(path), "%s", xlde->d_name);
+
+        if (S_ISREG(fst.st_mode))
+            callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL);
+        else if (S_ISDIR(fst.st_mode))
+        {
+            callback(path, FILE_TYPE_DIRECTORY, 0, NULL);
+            /* recurse to handle subdirectories */
+            recurse_dir(datadir, path, callback);
+        }
+#ifndef WIN32
+        else if (S_ISLNK(fst.st_mode))
+#else
+        else if (pgwin32_is_junction(fullpath))
+#endif
+        {
+#if defined(HAVE_READLINK) || defined(WIN32)
+            char        link_target[MAXPGPATH];
+            int            len;
+
+            len = readlink(fullpath, link_target, sizeof(link_target));
+            if (len < 0)
+                pg_fatal("could not read symbolic link \"%s\": %m",
+                         fullpath);
+            if (len >= sizeof(link_target))
+                pg_fatal("symbolic link \"%s\" target is too long",
+                         fullpath);
+            link_target[len] = '\0';
+
+            callback(path, FILE_TYPE_SYMLINK, 0, link_target);
+
+            /*
+             * If it's a symlink within pg_tblspc, we need to recurse into it,
+             * to process all the tablespaces.  We also follow a symlink if
+             * it's for pg_wal.  Symlinks elsewhere are ignored.
+             */
+            if ((parentpath && strcmp(parentpath, "pg_tblspc") == 0) ||
+                strcmp(path, "pg_wal") == 0)
+                recurse_dir(datadir, path, callback);
+#else
+            pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform",
+                     fullpath);
+#endif                            /* HAVE_READLINK */
+        }
+    }
+
+    if (errno)
+        pg_fatal("could not read directory \"%s\": %m",
+                 fullparentpath);
+
+    if (closedir(xldir))
+        pg_fatal("could not close directory \"%s\": %m",
+                 fullparentpath);
+}
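recurse_dir() above walks a directory tree with opendir()/readdir()/lstat(), hands every entry to a callback, and recurses into subdirectories. What follows is a minimal standalone sketch of the same walk, not pg_rewind code. The names (sketch_walk, sketch_count_cb, sketch_file_type) and the extra `void *arg` callback parameter are assumptions made for illustration. Errors are returned instead of going through pg_fatal(), and symlinks are only reported, never followed. pg_rewind, by contrast, follows symlinks under pg_tblspc and for pg_wal.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <dirent.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>

#define SKETCH_MAXPATH 4096

typedef enum { SKETCH_FILE_REGULAR, SKETCH_FILE_DIRECTORY, SKETCH_FILE_SYMLINK } sketch_file_type;

/* Hypothetical callback: 'path' is relative to the root of the walk. */
typedef void (*sketch_callback) (const char *path, sketch_file_type type,
                                 size_t size, void *arg);

/* Returns 0 on success, or -1 with errno set on an unexpected error. */
static int
sketch_walk(const char *root, const char *relpath, sketch_callback cb, void *arg)
{
    char        dirpath[SKETCH_MAXPATH];
    DIR        *dir;
    struct dirent *de;

    if (relpath)
        snprintf(dirpath, sizeof(dirpath), "%s/%s", root, relpath);
    else
        snprintf(dirpath, sizeof(dirpath), "%s", root);
    if ((dir = opendir(dirpath)) == NULL)
        return -1;

    /* errno is reset before each readdir() so end-of-dir and failure differ */
    while (errno = 0, (de = readdir(dir)) != NULL)
    {
        char        fullpath[SKETCH_MAXPATH];
        char        path[SKETCH_MAXPATH];
        struct stat st;

        if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
            continue;
        snprintf(fullpath, sizeof(fullpath), "%s/%s", dirpath, de->d_name);
        if (relpath)
            snprintf(path, sizeof(path), "%s/%s", relpath, de->d_name);
        else
            snprintf(path, sizeof(path), "%s", de->d_name);

        if (lstat(fullpath, &st) < 0)
        {
            if (errno == ENOENT)
                continue;       /* removed concurrently; 'st' is not valid */
            closedir(dir);
            return -1;
        }

        if (S_ISREG(st.st_mode))
            cb(path, SKETCH_FILE_REGULAR, (size_t) st.st_size, arg);
        else if (S_ISDIR(st.st_mode))
        {
            cb(path, SKETCH_FILE_DIRECTORY, 0, arg);
            if (sketch_walk(root, path, cb, arg) < 0)
            {
                closedir(dir);
                return -1;
            }
        }
        else if (S_ISLNK(st.st_mode))
            cb(path, SKETCH_FILE_SYMLINK, 0, arg);  /* reported, not followed */
    }

    if (errno != 0)             /* readdir() failed */
    {
        int         save_errno = errno;

        closedir(dir);
        errno = save_errno;
        return -1;
    }
    return closedir(dir);
}

/* Example callback: tally what the walk saw. */
struct sketch_totals { int nfiles; int ndirs; int nlinks; size_t nbytes; };

static void
sketch_count_cb(const char *path, sketch_file_type type, size_t size, void *arg)
{
    struct sketch_totals *t = arg;

    (void) path;
    if (type == SKETCH_FILE_REGULAR)
    {
        t->nfiles++;
        t->nbytes += size;
    }
    else if (type == SKETCH_FILE_DIRECTORY)
        t->ndirs++;
    else
        t->nlinks++;
}
```

For example, sketch_walk(datadir, NULL, sketch_count_cb, &totals) with a zeroed struct sketch_totals counts the regular files and directories under datadir and sums the file sizes. The patch also resets errno inside the loop condition. That reset is what lets the code distinguish a readdir() failure from the normal end of the directory.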
diff --git a/src/bin/pg_rewind/file_ops.h b/src/bin/pg_rewind/file_ops.h
index d8466385cf..c763085976 100644
--- a/src/bin/pg_rewind/file_ops.h
+++ b/src/bin/pg_rewind/file_ops.h
@@ -23,4 +23,7 @@ extern void sync_target_dir(void);
 
 extern char *slurpFile(const char *datadir, const char *path, size_t *filesize);
 
+typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target);
+extern void traverse_datadir(const char *datadir, process_file_callback_t callback);
+
 #endif                            /* FILE_OPS_H */
diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_fetch.c
index 9c541bb73d..52c4e147e1 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_fetch.c
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * libpq_fetch.c
- *      Functions for fetching files from a remote server.
+ *      Functions for fetching files from a remote server via libpq.
  *
  * Copyright (c) 2013-2020, PostgreSQL Global Development Group
  *
@@ -9,11 +9,6 @@
  */
 #include "postgres_fe.h"
 
-#include <sys/stat.h>
-#include <dirent.h>
-#include <fcntl.h>
-#include <unistd.h>
-
 #include "catalog/pg_type_d.h"
 #include "common/connect.h"
 #include "datapagemap.h"
@@ -23,8 +18,6 @@
 #include "pg_rewind.h"
 #include "port/pg_bswap.h"
 
-PGconn       *conn = NULL;
-
 /*
  * Files are fetched max CHUNKSIZE bytes at a time.
  *
@@ -34,30 +27,73 @@ PGconn       *conn = NULL;
  */
 #define CHUNKSIZE 1000000
 
-static void receiveFileChunks(const char *sql);
-static void execute_pagemap(datapagemap_t *pagemap, const char *path);
-static char *run_simple_query(const char *sql);
-static void run_simple_command(const char *sql);
+typedef struct
+{
+    rewind_source common;    /* common interface functions */
+
+    PGconn       *conn;
+} libpq_source;
+
+static void init_libpq_conn(PGconn *conn);
+static char *run_simple_query(PGconn *conn, const char *sql);
+static void run_simple_command(PGconn *conn, const char *sql);
+
+/* public interface functions */
+static void libpq_traverse_files(rewind_source *source,
+                                 process_file_callback_t callback);
+static char *libpq_fetch_file(rewind_source *source, const char *path,
+                              size_t *filesize);
+static void libpq_queue_fetch_range(rewind_source *source, const char *path,
+                                    uint64 off, size_t len);
+static void libpq_finish_fetch(rewind_source *source);
+static void libpq_destroy(rewind_source *source);
+static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
+
+/*
+ * Create a new libpq source.
+ *
+ * The caller has already established the connection, but should not try
+ * to use it while the source is active.
+ */
+rewind_source *
+init_libpq_source(PGconn *conn)
+{
+    libpq_source *src;
+
+    init_libpq_conn(conn);
+
+    src = pg_malloc0(sizeof(libpq_source));
+
+    src->common.traverse_files = libpq_traverse_files;
+    src->common.fetch_file = libpq_fetch_file;
+    src->common.queue_fetch_range = libpq_queue_fetch_range;
+    src->common.finish_fetch = libpq_finish_fetch;
+    src->common.get_current_wal_insert_lsn = libpq_get_current_wal_insert_lsn;
+    src->common.destroy = libpq_destroy;
+
+    src->conn = conn;
+
+    return &src->common;
+}
 
-void
-libpqConnect(const char *connstr)
+/*
+ * Initialize a libpq connection for use.
+ */
+static void
+init_libpq_conn(PGconn *conn)
 {
+    PGresult   *res;
     char       *str;
-    PGresult   *res;
-
-    conn = PQconnectdb(connstr);
-    if (PQstatus(conn) == CONNECTION_BAD)
-        pg_fatal("could not connect to server: %s",
-                 PQerrorMessage(conn));
-
-    if (showprogress)
-        pg_log_info("connected to server");
 
     /* disable all types of timeouts */
-    run_simple_command("SET statement_timeout = 0");
-    run_simple_command("SET lock_timeout = 0");
-    run_simple_command("SET idle_in_transaction_session_timeout = 0");
+    run_simple_command(conn, "SET statement_timeout = 0");
+    run_simple_command(conn, "SET lock_timeout = 0");
+    run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
 
+    /* we don't intend do any updates. Put the connection in read-only mode to keep us honest */
+    run_simple_command(conn, "SET default_transaction_read_only = off");
+
+    /* secure search_path */
     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     if (PQresultStatus(res) != PGRES_TUPLES_OK)
         pg_fatal("could not clear search_path: %s",
@@ -70,7 +106,7 @@ libpqConnect(const char *connstr)
      * currently because we use a temporary table. Better to check for it
      * explicitly than error out, for a better error message.
      */
-    str = run_simple_query("SELECT pg_is_in_recovery()");
+    str = run_simple_query(conn, "SELECT pg_is_in_recovery()");
     if (strcmp(str, "f") != 0)
         pg_fatal("source server must not be in recovery mode");
     pg_free(str);
@@ -80,27 +116,31 @@ libpqConnect(const char *connstr)
      * a page is modified while we read it with pg_read_binary_file(), and we
      * rely on full page images to fix them.
      */
-    str = run_simple_query("SHOW full_page_writes");
+    str = run_simple_query(conn, "SHOW full_page_writes");
     if (strcmp(str, "on") != 0)
         pg_fatal("full_page_writes must be enabled in the source server");
     pg_free(str);
 
     /*
-     * Although we don't do any "real" updates, we do work with a temporary
-     * table. We don't care about synchronous commit for that. It doesn't
-     * otherwise matter much, but if the server is using synchronous
-     * replication, and replication isn't working for some reason, we don't
-     * want to get stuck, waiting for it to start working again.
+     * First create a temporary table, and COPY to load it with the list of
+     * blocks that we need to fetch.
      */
-    run_simple_command("SET synchronous_commit = off");
+    run_simple_command(conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)");
+
+    res = PQexec(conn, "COPY fetchchunks FROM STDIN");
+    if (PQresultStatus(res) != PGRES_COPY_IN)
+        pg_fatal("could not send file list: %s",
+                 PQresultErrorMessage(res));
+    PQclear(res);
 }
 
 /*
- * Runs a query that returns a single value.
+ * Run a query that returns a single value.
+ *
  * The result should be pg_free'd after use.
  */
 static char *
-run_simple_query(const char *sql)
+run_simple_query(PGconn *conn, const char *sql)
 {
     PGresult   *res;
     char       *result;
@@ -123,11 +163,12 @@ run_simple_query(const char *sql)
 }
 
 /*
- * Runs a command.
+ * Run a command.
+ *
  * In the event of a failure, exit immediately.
  */
 static void
-run_simple_command(const char *sql)
+run_simple_command(PGconn *conn, const char *sql)
 {
     PGresult   *res;
 
@@ -141,17 +182,18 @@ run_simple_command(const char *sql)
 }
 
 /*
- * Calls pg_current_wal_insert_lsn() function
+ * Call the pg_current_wal_insert_lsn() function in the remote system.
  */
-XLogRecPtr
-libpqGetCurrentXlogInsertLocation(void)
+static XLogRecPtr
+libpq_get_current_wal_insert_lsn(rewind_source *source)
 {
+    PGconn       *conn = ((libpq_source *) source)->conn;
     XLogRecPtr    result;
     uint32        hi;
     uint32        lo;
     char       *val;
 
-    val = run_simple_query("SELECT pg_current_wal_insert_lsn()");
+    val = run_simple_query(conn, "SELECT pg_current_wal_insert_lsn()");
 
     if (sscanf(val, "%X/%X", &hi, &lo) != 2)
         pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
@@ -166,9 +208,10 @@ libpqGetCurrentXlogInsertLocation(void)
 /*
  * Get a list of all files in the data directory.
  */
-void
-libpqProcessFileList(void)
+static void
+libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
 {
+    PGconn       *conn = ((libpq_source *) source)->conn;
     PGresult   *res;
     const char *sql;
     int            i;
@@ -246,6 +289,48 @@ libpqProcessFileList(void)
     PQclear(res);
 }
 
+/*
+ * Queue up a request to fetch a piece of a file from the remote system.
+ */
+static void
+libpq_queue_fetch_range(rewind_source *source, const char *path, uint64 off,
+                        size_t len)
+{
+    libpq_source *src = (libpq_source *) source;
+    uint64        begin = off;
+    uint64        end = off + len;
+
+    /*
+     * Write the file range to a temporary table in the server.
+     *
+     * The range is sent to the server as a COPY formatted line, to be inserted
+     * into the 'fetchchunks' temporary table. The libpq_finish_fetch() uses
+     * the temporary table to actually fetch the data.
+     */
+
+    /* Split the range into CHUNKSIZE chunks */
+    while (end - begin > 0)
+    {
+        char        linebuf[MAXPGPATH + 23];
+        unsigned int len;
+
+        /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
+        if (end - begin > CHUNKSIZE)
+            len = CHUNKSIZE;
+        else
+            len = (unsigned int) (end - begin);
+
+        snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
+
+        if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1)
+            pg_fatal("could not send COPY data: %s",
+                     PQerrorMessage(src->conn));
+
+        begin += len;
+    }
+}
+
+
 /*----
  * Runs a query, which returns pieces of files from the remote source data
  * directory, and overwrites the corresponding parts of target files with
@@ -256,20 +341,46 @@ libpqProcessFileList(void)
  * chunk    bytea    -- file content
  *----
  */
+/*
+ * Receive all the queued chunks and write them to the target data directory.
+ */
 static void
-receiveFileChunks(const char *sql)
+libpq_finish_fetch(rewind_source *source)
 {
+    libpq_source *src = (libpq_source *) source;
     PGresult   *res;
+    const char *sql;
 
-    if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
-        pg_fatal("could not send query: %s", PQerrorMessage(conn));
+    if (PQputCopyEnd(src->conn, NULL) != 1)
+        pg_fatal("could not send end-of-COPY: %s",
+                 PQerrorMessage(src->conn));
+
+    while ((res = PQgetResult(src->conn)) != NULL)
+    {
+        if (PQresultStatus(res) != PGRES_COMMAND_OK)
+            pg_fatal("unexpected result while sending file list: %s",
+                     PQresultErrorMessage(res));
+        PQclear(res);
+    }
+
+    /*
+     * We've now copied the list of file ranges that we need to fetch to the
+     * temporary table. Now, actually fetch all of those ranges.
+     */
+    sql =
+        "SELECT path, begin,\n"
+        "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
+        "FROM fetchchunks\n";
+
+    if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
+        pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
 
     pg_log_debug("getting file chunks");
 
-    if (PQsetSingleRowMode(conn) != 1)
+    if (PQsetSingleRowMode(src->conn) != 1)
         pg_fatal("could not set libpq connection to single row mode");
 
-    while ((res = PQgetResult(conn)) != NULL)
+    while ((res = PQgetResult(src->conn)) != NULL)
     {
         char       *filename;
         int            filenamelen;
@@ -363,28 +474,29 @@ receiveFileChunks(const char *sql)
 }
 
 /*
- * Receive a single file as a malloc'd buffer.
+ * Fetch a single file as a malloc'd buffer.
  */
-char *
-libpqGetFile(const char *filename, size_t *filesize)
+static char *
+libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
 {
+    PGconn       *conn = ((libpq_source *) source)->conn;
     PGresult   *res;
     char       *result;
     int            len;
     const char *paramValues[1];
 
-    paramValues[0] = filename;
+    paramValues[0] = path;
     res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
                        1, NULL, paramValues, NULL, NULL, 1);
 
     if (PQresultStatus(res) != PGRES_TUPLES_OK)
         pg_fatal("could not fetch remote file \"%s\": %s",
-                 filename, PQresultErrorMessage(res));
+                 path, PQresultErrorMessage(res));
 
     /* sanity check the result set */
     if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
         pg_fatal("unexpected result set while fetching remote file \"%s\"",
-                 filename);
+                 path);
 
     /* Read result to local variables */
     len = PQgetlength(res, 0, 0);
@@ -394,7 +506,7 @@ libpqGetFile(const char *filename, size_t *filesize)
 
     PQclear(res);
 
-    pg_log_debug("fetched file \"%s\", length %d", filename, len);
+    pg_log_debug("fetched file \"%s\", length %d", path, len);
 
     if (filesize)
         *filesize = len;
@@ -402,142 +514,11 @@ libpqGetFile(const char *filename, size_t *filesize)
 }
 
 /*
- * Write a file range to a temporary table in the server.
- *
- * The range is sent to the server as a COPY formatted line, to be inserted
- * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
- * function to actually fetch the data.
+ * Close a libpq source.
  */
 static void
-fetch_file_range(const char *path, uint64 begin, uint64 end)
+libpq_destroy(rewind_source *source)
 {
-    char        linebuf[MAXPGPATH + 23];
-
-    /* Split the range into CHUNKSIZE chunks */
-    while (end - begin > 0)
-    {
-        unsigned int len;
-
-        /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
-        if (end - begin > CHUNKSIZE)
-            len = CHUNKSIZE;
-        else
-            len = (unsigned int) (end - begin);
-
-        snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
-
-        if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
-            pg_fatal("could not send COPY data: %s",
-                     PQerrorMessage(conn));
-
-        begin += len;
-    }
-}
-
-/*
- * Fetch all changed blocks from remote source data directory.
- */
-void
-libpq_executeFileMap(filemap_t *map)
-{
-    file_entry_t *entry;
-    const char *sql;
-    PGresult   *res;
-    int            i;
-
-    /*
-     * First create a temporary table, and load it with the blocks that we
-     * need to fetch.
-     */
-    sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
-    run_simple_command(sql);
-
-    sql = "COPY fetchchunks FROM STDIN";
-    res = PQexec(conn, sql);
-
-    if (PQresultStatus(res) != PGRES_COPY_IN)
-        pg_fatal("could not send file list: %s",
-                 PQresultErrorMessage(res));
-    PQclear(res);
-
-    for (i = 0; i < map->nactions; i++)
-    {
-        entry = map->actions[i];
-
-        /* If this is a relation file, copy the modified blocks */
-        execute_pagemap(&entry->target_modified_pages, entry->path);
-
-        switch (entry->action)
-        {
-            case FILE_ACTION_NONE:
-                /* nothing else to do */
-                break;
-
-            case FILE_ACTION_COPY:
-                /* Truncate the old file out of the way, if any */
-                open_target_file(entry->path, true);
-                fetch_file_range(entry->path, 0, entry->source_size);
-                break;
-
-            case FILE_ACTION_TRUNCATE:
-                truncate_target_file(entry->path, entry->source_size);
-                break;
-
-            case FILE_ACTION_COPY_TAIL:
-                fetch_file_range(entry->path, entry->target_size, entry->source_size);
-                break;
-
-            case FILE_ACTION_REMOVE:
-                remove_target(entry);
-                break;
-
-            case FILE_ACTION_CREATE:
-                create_target(entry);
-                break;
-
-            case FILE_ACTION_UNDECIDED:
-                pg_fatal("no action decided for \"%s\"", entry->path);
-                break;
-        }
-    }
-
-    if (PQputCopyEnd(conn, NULL) != 1)
-        pg_fatal("could not send end-of-COPY: %s",
-                 PQerrorMessage(conn));
-
-    while ((res = PQgetResult(conn)) != NULL)
-    {
-        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-            pg_fatal("unexpected result while sending file list: %s",
-                     PQresultErrorMessage(res));
-        PQclear(res);
-    }
-
-    /*
-     * We've now copied the list of file ranges that we need to fetch to the
-     * temporary table. Now, actually fetch all of those ranges.
-     */
-    sql =
-        "SELECT path, begin,\n"
-        "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
-        "FROM fetchchunks\n";
-
-    receiveFileChunks(sql);
-}
-
-static void
-execute_pagemap(datapagemap_t *pagemap, const char *path)
-{
-    datapagemap_iterator_t *iter;
-    BlockNumber blkno;
-    off_t        offset;
-
-    iter = datapagemap_iterate(pagemap);
-    while (datapagemap_next(iter, &blkno))
-    {
-        offset = blkno * BLCKSZ;
-
-        fetch_file_range(path, offset, offset + BLCKSZ);
-    }
-    pg_free(iter);
+    pfree(source);
+    /* NOTE: we don't close the connection here, as it was not opened by us. */
 }
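libpq_queue_fetch_range() above splits a requested byte range into pieces of at most CHUNKSIZE bytes. It sends each piece as a COPY text line of the form path<TAB>offset<TAB>length. The offset written for a piece must be that piece's starting offset, so the cursor can only advance after the line has been formatted. Below is a standalone sketch of that loop. The names are hypothetical, and a callback stands in for PQputCopyData().

```c
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define SKETCH_CHUNKSIZE 1000000    /* same value as CHUNKSIZE in the patch */

/* Receives one COPY-formatted line per piece; stands in for PQputCopyData(). */
typedef void (*sketch_emit_fn) (const char *line, void *arg);

/* Splits [off, off + len) into pieces; returns the number of pieces. */
static int
sketch_split_range(const char *path, uint64_t off, size_t len,
                   sketch_emit_fn emit, void *arg)
{
    uint64_t    begin = off;
    uint64_t    end = off + len;
    int         npieces = 0;

    while (begin < end)
    {
        char        linebuf[1024 + 23];
        unsigned int chunklen;

        if (end - begin > SKETCH_CHUNKSIZE)
            chunklen = SKETCH_CHUNKSIZE;
        else
            chunklen = (unsigned int) (end - begin);

        /* Format the line with the piece's starting offset... */
        snprintf(linebuf, sizeof(linebuf), "%s\t%" PRIu64 "\t%u\n",
                 path, begin, chunklen);
        emit(linebuf, arg);
        npieces++;

        /* ...and only then advance past the piece. */
        begin += chunklen;
    }
    return npieces;
}

/* Example sink that concatenates the lines into a fixed buffer. */
struct sketch_buf
{
    char        data[4096];
    size_t      used;
};

static void
sketch_collect(const char *line, void *arg)
{
    struct sketch_buf *b = arg;
    size_t      n = strlen(line);

    if (b->used + n < sizeof(b->data))
    {
        memcpy(b->data + b->used, line, n + 1);
        b->used += n;
    }
}
```

With a CHUNKSIZE of 1000000, a 2,500,000-byte range starting at offset 0 yields three lines. They are at offsets 0, 1000000 and 2000000, and the last one is 500000 bytes long.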
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 13b4eab52b..c5ee70272a 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -35,8 +35,8 @@ static void usage(const char *progname);
 static void createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli,
                               XLogRecPtr checkpointloc);
 
-static void digestControlFile(ControlFileData *ControlFile, char *source,
-                              size_t size);
+static void digestControlFile(ControlFileData *ControlFile,
+                              const char *content, size_t size);
 static void getRestoreCommand(const char *argv0);
 static void sanityChecks(void);
 static void findCommonAncestorTimeline(XLogRecPtr *recptr, int *tliIndex);
@@ -69,6 +69,8 @@ int            targetNentries;
 uint64        fetch_size;
 uint64        fetch_done;
 
+static PGconn *conn;
+static rewind_source *source;
 
 static void
 usage(const char *progname)
@@ -269,19 +271,29 @@ main(int argc, char **argv)
 
     atexit(disconnect_atexit);
 
-    /* Connect to remote server */
-    if (connstr_source)
-        libpqConnect(connstr_source);
-
     /*
-     * Ok, we have all the options and we're ready to start. Read in all the
-     * information we need from both clusters.
+     * Ok, we have all the options and we're ready to start. First, connect
+     * to remote server.
      */
-    buffer = slurpFile(datadir_target, "global/pg_control", &size);
-    digestControlFile(&ControlFile_target, buffer, size);
-    pg_free(buffer);
+    if (connstr_source)
+    {
+        conn = PQconnectdb(connstr_source);
+
+        if (PQstatus(conn) == CONNECTION_BAD)
+            pg_fatal("could not connect to server: %s",
+                     PQerrorMessage(conn));
+
+        if (showprogress)
+            pg_log_info("connected to server");
+
+        source = init_libpq_source(conn);
+    }
+    else
+        source = init_local_source(datadir_source);
 
     /*
+     * Check the status of the target instance.
+     *
      * If the target instance was not cleanly shut down, start and stop the
      * target cluster once in single-user mode to enforce recovery to finish,
      * ensuring that the cluster can be used by pg_rewind.  Note that if
@@ -289,6 +301,10 @@ main(int argc, char **argv)
      * need to make sure by themselves that the target cluster is in a clean
      * state.
      */
+    buffer = slurpFile(datadir_target, "global/pg_control", &size);
+    digestControlFile(&ControlFile_target, buffer, size);
+    pg_free(buffer);
+
     if (!no_ensure_shutdown &&
         ControlFile_target.state != DB_SHUTDOWNED &&
         ControlFile_target.state != DB_SHUTDOWNED_IN_RECOVERY)
@@ -300,17 +316,20 @@ main(int argc, char **argv)
         pg_free(buffer);
     }
 
-    buffer = fetchFile("global/pg_control", &size);
+    buffer = source->fetch_file(source, "global/pg_control", &size);
     digestControlFile(&ControlFile_source, buffer, size);
     pg_free(buffer);
 
     sanityChecks();
 
     /*
+     * Find the common ancestor timeline between the clusters.
+     *
      * If both clusters are already on the same timeline, there's nothing to
      * do.
      */
-    if (ControlFile_target.checkPointCopy.ThisTimeLineID == ControlFile_source.checkPointCopy.ThisTimeLineID)
+    if (ControlFile_target.checkPointCopy.ThisTimeLineID ==
+        ControlFile_source.checkPointCopy.ThisTimeLineID)
     {
         pg_log_info("source and target cluster are on the same timeline");
         rewind_needed = false;
@@ -370,12 +389,12 @@ main(int argc, char **argv)
                 chkpttli);
 
     /*
-     * Collect information about all files in the target and source systems.
+     * Collect information about all files in both data directories.
      */
     if (showprogress)
         pg_log_info("reading source file list");
     filemap_init();
-    fetchSourceFileList();
+    source->traverse_files(source, &process_source_file);
 
     if (showprogress)
         pg_log_info("reading target file list");
@@ -423,7 +442,7 @@ main(int argc, char **argv)
      * modified the target directory and there is no turning back!
      */
 
-    execute_file_actions(filemap);
+    execute_file_actions(filemap, source);
 
     progress_report(true);
 
@@ -443,7 +462,7 @@ main(int argc, char **argv)
 
     if (connstr_source)
     {
-        endrec = libpqGetCurrentXlogInsertLocation();
+        endrec = source->get_current_wal_insert_lsn(source);
         endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
     }
     else
@@ -465,6 +484,14 @@ main(int argc, char **argv)
         WriteRecoveryConfig(conn, datadir_target,
                             GenerateRecoveryConfig(conn, NULL));
 
+    /* don't need the source connection anymore */
+    source->destroy(source);
+    if (conn)
+    {
+        PQfinish(conn);
+        conn = NULL;
+    }
+
     pg_log_info("Done!");
 
     return 0;
@@ -627,7 +654,7 @@ getTimelineHistory(ControlFileData *controlFile, int *nentries)
 
         /* Get history file from appropriate source */
         if (controlFile == &ControlFile_source)
-            histfile = fetchFile(path, NULL);
+            histfile = source->fetch_file(source, path, NULL);
         else if (controlFile == &ControlFile_target)
             histfile = slurpFile(datadir_target, path, NULL);
         else
@@ -783,16 +810,17 @@ checkControlFile(ControlFileData *ControlFile)
 }
 
 /*
- * Verify control file contents in the buffer src, and copy it to *ControlFile.
+ * Verify control file contents in the buffer 'content', and copy it to *ControlFile.
  */
 static void
-digestControlFile(ControlFileData *ControlFile, char *src, size_t size)
+digestControlFile(ControlFileData *ControlFile,
+                  const char *content, size_t size)
 {
     if (size != PG_CONTROL_FILE_SIZE)
         pg_fatal("unexpected control file size %d, expected %d",
                  (int) size, PG_CONTROL_FILE_SIZE);
 
-    memcpy(ControlFile, src, sizeof(ControlFileData));
+    memcpy(ControlFile, content, sizeof(ControlFileData));
 
     /* set and validate WalSegSz */
     WalSegSz = ControlFile->xlog_seg_size;
diff --git a/src/bin/pg_rewind/pg_rewind.h b/src/bin/pg_rewind/pg_rewind.h
index 67f90c2a38..0dc3dbd525 100644
--- a/src/bin/pg_rewind/pg_rewind.h
+++ b/src/bin/pg_rewind/pg_rewind.h
@@ -20,8 +20,6 @@
 
 /* Configuration options */
 extern char *datadir_target;
-extern char *datadir_source;
-extern char *connstr_source;
 extern bool showprogress;
 extern bool dry_run;
 extern bool do_sync;
@@ -31,9 +29,6 @@ extern int    WalSegSz;
 extern TimeLineHistoryEntry *targetHistory;
 extern int    targetNentries;
 
-/* general state */
-extern PGconn *conn;
-
 /* Progress counters */
 extern uint64 fetch_size;
 extern uint64 fetch_done;
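After this patch, main() no longer calls libpq functions directly. Instead it goes through source->fetch_file(), source->traverse_files() and the other rewind_source methods. libpq_fetch.c implements them by embedding rewind_source as the first member of libpq_source, returning &src->common, and casting back inside each method. The following is a minimal standalone sketch of that pattern, with hypothetical names (demo_source, memory_source) and only two methods. Unlike the pg_fatal()-based original, the sketch returns NULL on errors.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

typedef struct demo_source demo_source;

/* The "interface": a struct of function pointers, like rewind_source. */
struct demo_source
{
    /* Returns a malloc'd copy of the file, or NULL; sets *filesize if given. */
    char       *(*fetch_file) (demo_source *source, const char *path,
                               size_t *filesize);
    void        (*destroy) (demo_source *source);
};

/* One implementation; the interface must be the first member. */
typedef struct
{
    demo_source common;
    const char *path;           /* the only file this source can serve */
    const char *contents;       /* borrowed, not owned */
} memory_source;

static char *
memory_fetch_file(demo_source *source, const char *path, size_t *filesize)
{
    memory_source *src = (memory_source *) source;  /* downcast */
    size_t      len;
    char       *buf;

    assert(src != NULL);
    if (strcmp(path, src->path) != 0)
        return NULL;
    len = strlen(src->contents);
    if ((buf = malloc(len + 1)) == NULL)
        return NULL;
    memcpy(buf, src->contents, len + 1);
    if (filesize)
        *filesize = len;
    return buf;
}

static void
memory_destroy(demo_source *source)
{
    /* Free the wrapper only; 'contents' stays with the caller. */
    free((memory_source *) source);
}

static demo_source *
init_memory_source(const char *path, const char *contents)
{
    memory_source *src = calloc(1, sizeof(memory_source));

    if (src == NULL)
        return NULL;
    src->common.fetch_file = memory_fetch_file;
    src->common.destroy = memory_destroy;
    src->path = path;
    src->contents = contents;
    return &src->common;
}

/* Generic code sees only the interface. */
static size_t
fetched_size(demo_source *source, const char *path)
{
    size_t      size = 0;
    char       *buf = source->fetch_file(source, path, &size);

    if (buf == NULL)
        return 0;
    free(buf);
    return size;
}
```

Leaving the borrowed data out of the source's ownership mirrors the patch. libpq_destroy() frees only the libpq_source wrapper, and main() calls PQfinish() on the connection itself.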
-- 
2.18.4

From 9575133ca5da2b2f6827828cb0c26c6122b328f9 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 19 Aug 2020 15:34:43 +0300
Subject: [PATCH v3 5/5] Allow pg_rewind to use a standby server as the source
 system.

Using a hot standby server as the source has not been possible because
pg_rewind creates a temporary table in the source system to hold the
list of file ranges that need to be fetched. Refactor it to queue up the
file fetch requests in pg_rewind's memory, so that the temporary table
is no longer needed.

Also update the logic to compute 'minRecoveryPoint' correctly when the
source is a standby server.
---
 src/bin/pg_rewind/libpq_fetch.c | 281 ++++++++++++++++++++++----------
 src/bin/pg_rewind/pg_rewind.c   |  67 ++++++--
 2 files changed, 253 insertions(+), 95 deletions(-)
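The commit message above replaces the temporary table with an in-memory queue of fetch requests. In the libpq_queue_fetch_range() hunk of this patch, a new request is merged into the previous queued one under three conditions: it continues the same file (compared by pointer), it starts where the previous piece ends, and the previous piece is still shorter than CHUNK_SIZE. The excerpt ends before it shows how the remainder is queued or when the queue is sent. This sketch therefore assumes that leftovers are queued in CHUNK_SIZE pieces and that a full queue is flushed first. sk_flush() is a hypothetical stand-in for process_queued_fetch_requests(), and all sk_* names are illustrative.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SK_CHUNK_SIZE (1024 * 1024)
#define SK_MAX_CHUNKS 4         /* the patch uses 1000; tiny here for the demo */

typedef struct
{
    const char *path;           /* compared by pointer, as in the patch */
    uint64_t    offset;
    uint32_t    length;
} sk_request;

typedef struct
{
    int         num_requests;
    sk_request  queue[SK_MAX_CHUNKS];
    int         nflushes;       /* how many times the queue was drained */
} sk_queue;

/* Hypothetical stand-in for sending one query over all queued requests. */
static void
sk_flush(sk_queue *q)
{
    q->num_requests = 0;
    q->nflushes++;
}

static void
sk_queue_range(sk_queue *q, const char *path, uint64_t off, size_t len)
{
    /* Extend the previous request if this one continues it. */
    if (q->num_requests > 0)
    {
        sk_request *prev = &q->queue[q->num_requests - 1];

        if (prev->offset + prev->length == off &&
            prev->length < SK_CHUNK_SIZE &&
            prev->path == path)
        {
            uint32_t    room = SK_CHUNK_SIZE - prev->length;
            uint32_t    thislen = (len < room) ? (uint32_t) len : room;

            prev->length += thislen;
            off += thislen;
            len -= thislen;
        }
    }

    /* Queue whatever is left, in pieces of at most SK_CHUNK_SIZE bytes. */
    while (len > 0)
    {
        uint32_t    thislen = (len < SK_CHUNK_SIZE) ? (uint32_t) len : SK_CHUNK_SIZE;
        sk_request *req;

        if (q->num_requests == SK_MAX_CHUNKS)
            sk_flush(q);
        req = &q->queue[q->num_requests++];
        req->path = path;
        req->offset = off;
        req->length = thislen;
        off += thislen;
        len -= thislen;
    }
}
```

Merging matters because pg_rewind typically queues modified relation blocks one BLCKSZ at a time. With merging, adjacent blocks of the same file collapse into a single unnest() row instead of one row per block.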

diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_fetch.c
index 52c4e147e1..f61a424299 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_fetch.c
@@ -15,39 +15,61 @@
 #include "fetch.h"
 #include "file_ops.h"
 #include "filemap.h"
+#include "lib/stringinfo.h"
 #include "pg_rewind.h"
 #include "port/pg_bswap.h"
 
 /*
- * Files are fetched max CHUNKSIZE bytes at a time.
- *
- * (This only applies to files that are copied in whole, or for truncated
- * files where we copy the tail. Relation files, where we know the individual
- * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
+ * Files are fetched at most CHUNK_SIZE bytes at a time, and at most
+ * MAX_CHUNKS_PER_QUERY chunks are fetched in a single query.
  */
-#define CHUNKSIZE 1000000
+#define CHUNK_SIZE (1024 * 1024)
+#define MAX_CHUNKS_PER_QUERY 1000
+
+/* represents the request to fetch a piece of a file from the source */
+typedef struct
+{
+    const char *path;        /* path relative to data directory root */
+    uint64        offset;
+    uint32        length;
+} fetch_range_request;
 
 typedef struct
 {
     rewind_source common;    /* common interface functions */
 
     PGconn       *conn;
+
+    /*
+     * Queue of chunks that have been requested with the queue_fetch_range()
+     * function, but have not been fetched from the remote server yet.
+     */
+    int            num_requests;
+    fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY];
+
+    /* temporary space for process_queued_fetch_requests() */
+    StringInfoData paths;
+    StringInfoData offsets;
+    StringInfoData lengths;
 } libpq_source;
 
 static void init_libpq_conn(PGconn *conn);
 static char *run_simple_query(PGconn *conn, const char *sql);
 static void run_simple_command(PGconn *conn, const char *sql);
+static void appendArrayEscapedString(StringInfo buf, const char *str);
+
+static void process_queued_fetch_requests(libpq_source *src);
 
 /* public interface functions */
 static void libpq_traverse_files(rewind_source *source,
                                  process_file_callback_t callback);
-static char *libpq_fetch_file(rewind_source *source, const char *path,
-                              size_t *filesize);
 static void libpq_queue_fetch_range(rewind_source *source, const char *path,
                                     uint64 off, size_t len);
 static void libpq_finish_fetch(rewind_source *source);
-static void libpq_destroy(rewind_source *source);
+static char *libpq_fetch_file(rewind_source *source, const char *path,
+                              size_t *filesize);
 static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
+static void libpq_destroy(rewind_source *source);
 
 /*
  * Create a new libpq source.
@@ -73,6 +95,10 @@ init_libpq_source(PGconn *conn)
 
     src->conn = conn;
 
+    initStringInfo(&src->paths);
+    initStringInfo(&src->offsets);
+    initStringInfo(&src->lengths);
+
     return &src->common;
 }
 
@@ -90,7 +116,10 @@ init_libpq_conn(PGconn *conn)
     run_simple_command(conn, "SET lock_timeout = 0");
     run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
 
-    /* we don't intend do any updates. Put the connection in read-only mode to keep us honest */
+    /*
+     * We don't intend to do any updates.  Put the connection in read-only mode
+     * to keep us honest.
+     */
-    run_simple_command(conn, "SET default_transaction_read_only = off");
+    run_simple_command(conn, "SET default_transaction_read_only = on");
 
     /* secure search_path */
@@ -100,17 +129,6 @@ init_libpq_conn(PGconn *conn)
                  PQresultErrorMessage(res));
     PQclear(res);
 
-    /*
-     * Check that the server is not in hot standby mode. There is no
-     * fundamental reason that couldn't be made to work, but it doesn't
-     * currently because we use a temporary table. Better to check for it
-     * explicitly than error out, for a better error message.
-     */
-    str = run_simple_query(conn, "SELECT pg_is_in_recovery()");
-    if (strcmp(str, "f") != 0)
-        pg_fatal("source server must not be in recovery mode");
-    pg_free(str);
-
     /*
      * Also check that full_page_writes is enabled.  We can get torn pages if
      * a page is modified while we read it with pg_read_binary_file(), and we
@@ -121,15 +139,15 @@ init_libpq_conn(PGconn *conn)
         pg_fatal("full_page_writes must be enabled in the source server");
     pg_free(str);
 
-    /*
-     * First create a temporary table, and COPY to load it with the list of
-     * blocks that we need to fetch.
-     */
-    run_simple_command(conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)");
+    /* Prepare a statement we'll use to fetch files */
+    res = PQprepare(conn, "fetch_chunks_stmt",
+                    "SELECT path, begin,\n"
+                    "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
+                    "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
+                    3, NULL);
 
-    res = PQexec(conn, "COPY fetchchunks FROM STDIN");
-    if (PQresultStatus(res) != PGRES_COPY_IN)
-        pg_fatal("could not send file list: %s",
+    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+        pg_fatal("could not prepare statement to fetch file contents: %s",
                  PQresultErrorMessage(res));
     PQclear(res);
 }
@@ -297,91 +315,143 @@ libpq_queue_fetch_range(rewind_source *source, const char *path, uint64 off,
                         size_t len)
 {
     libpq_source *src = (libpq_source *) source;
-    uint64        begin = off;
-    uint64        end = off + len;
 
     /*
-     * Write the file range to a temporary table in the server.
+     * Does this request happen to be a continuation of the previous chunk?
+     * If so, merge it with the previous one.
      *
-     * The range is sent to the server as a COPY formatted line, to be inserted
-     * into the 'fetchchunks' temporary table. The libpq_finish_fetch() uses
-     * the temporary table to actually fetch the data.
+     * XXX: We use pointer equality to compare the path. That's good enough
+     * for our purposes; the caller always passes the same pointer for the
+     * same filename. If it didn't, we would fail to merge requests, but it
+     * wouldn't affect correctness.
      */
-
-    /* Split the range into CHUNKSIZE chunks */
-    while (end - begin > 0)
+    if (src->num_requests > 0)
     {
-        char        linebuf[MAXPGPATH + 23];
-        unsigned int len;
+        fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
+
+        if (prev->offset + prev->length == off &&
+            prev->length < CHUNK_SIZE &&
+            prev->path == path)
+        {
+            /*
+             * Extend the previous request to cover as much of this new request
+             * as possible, without exceeding CHUNK_SIZE.
+             */
+            int32        thislen;
+
+            thislen = Min(len, CHUNK_SIZE - prev->length);
+            src->request_queue[src->num_requests - 1].length += thislen;
 
-        /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
-        if (end - begin > CHUNKSIZE)
-            len = CHUNKSIZE;
-        else
-            len = (unsigned int) (end - begin);
+            off += thislen;
+            len -= thislen;
+
+            /*
+             * Fall through to create new requests for any remaining 'len' that
+             * didn't fit in the previous chunk.
+             */
+        }
+    }
+
+    /* Divide the request into pieces of CHUNK_SIZE bytes each */
+    while (len > 0)
+    {
+        int32        thislen;
 
-        begin += len;
+        /* if the queue is full, perform all the work queued up so far */
+        if (src->num_requests == MAX_CHUNKS_PER_QUERY)
+            process_queued_fetch_requests(src);
 
-        snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
+        thislen = Min(len, CHUNK_SIZE);
+        src->request_queue[src->num_requests].path = path;
+        src->request_queue[src->num_requests].offset = off;
+        src->request_queue[src->num_requests].length = thislen;
+        src->num_requests++;
 
-        if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1)
-            pg_fatal("could not send COPY data: %s",
-                     PQerrorMessage(src->conn));
+        off += thislen;
+        len -= thislen;
     }
 }
 
-
-/*----
- * Runs a query, which returns pieces of files from the remote source data
- * directory, and overwrites the corresponding parts of target files with
- * the received parts. The result set is expected to be of format:
- *
- * path        text    -- path in the data directory, e.g "base/1/123"
- * begin    int8    -- offset within the file
- * chunk    bytea    -- file content
- *----
- */
 /*
- * Receive all the queued chunks and write them to the target data directory.
+ * Fetche all the queued chunks and writes them to the target data directory.
  */
 static void
 libpq_finish_fetch(rewind_source *source)
 {
-    libpq_source *src = (libpq_source *) source;
+    process_queued_fetch_requests((libpq_source *) source);
+}
+
+/*
+ * Receive all the queued chunks and write them to the target data directory.
+ */
+static void
+process_queued_fetch_requests(libpq_source *src)
+{
+    const char *params[3];
     PGresult   *res;
-    const char *sql;
+    int            chunkno;
 
-    if (PQputCopyEnd(src->conn, NULL) != 1)
-        pg_fatal("could not send end-of-COPY: %s",
-                 PQerrorMessage(src->conn));
+    if (src->num_requests == 0)
+        return;
 
-    while ((res = PQgetResult(src->conn)) != NULL)
+    pg_log_debug("getting %d file chunks", src->num_requests);
+
+    /*
+     * The prepared statement, 'fetch_chunks_stmt', takes three arrays
+     * with the same length as parameters: paths, offsets and lengths.
+     * Construct the string representations of the parameter arrays.
+     */
+    resetStringInfo(&src->paths);
+    resetStringInfo(&src->offsets);
+    resetStringInfo(&src->lengths);
+
+    appendStringInfoChar(&src->paths, '{');
+    appendStringInfoChar(&src->offsets, '{');
+    appendStringInfoChar(&src->lengths, '{');
+    for (int i = 0; i < src->num_requests; i++)
     {
-        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-            pg_fatal("unexpected result while sending file list: %s",
-                     PQresultErrorMessage(res));
-        PQclear(res);
+        fetch_range_request *rq = &src->request_queue[i];
+
+        if (i > 0)
+        {
+            appendStringInfoChar(&src->paths, ',');
+            appendStringInfoChar(&src->offsets, ',');
+            appendStringInfoChar(&src->lengths, ',');
+        }
+
+        appendArrayEscapedString(&src->paths, rq->path);
+        appendStringInfo(&src->offsets, INT64_FORMAT, rq->offset);
+        appendStringInfo(&src->lengths, "%d", rq->length);
     }
+    appendStringInfoChar(&src->paths, '}');
+    appendStringInfoChar(&src->offsets, '}');
+    appendStringInfoChar(&src->lengths, '}');
 
     /*
-     * We've now copied the list of file ranges that we need to fetch to the
-     * temporary table. Now, actually fetch all of those ranges.
+     * Execute the prepared statement.
      */
-    sql =
-        "SELECT path, begin,\n"
-        "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
-        "FROM fetchchunks\n";
+    params[0] = src->paths.data;
+    params[1] = src->offsets.data;
+    params[2] = src->lengths.data;
 
-    if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
+    if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
         pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
 
-    pg_log_debug("getting file chunks");
-
     if (PQsetSingleRowMode(src->conn) != 1)
         pg_fatal("could not set libpq connection to single row mode");
 
+    /*----
+     * The result set is of format:
+     *
+     * path        text    -- path in the data directory, e.g "base/1/123"
+     * begin    int8    -- offset within the file
+     * chunk    bytea    -- file content
+     *----
+     */
+    chunkno = 0;
     while ((res = PQgetResult(src->conn)) != NULL)
     {
+        fetch_range_request *rq = &src->request_queue[chunkno];
         char       *filename;
         int            filenamelen;
         int64        chunkoff;
@@ -402,6 +472,9 @@ libpq_finish_fetch(rewind_source *source)
                          PQresultErrorMessage(res));
         }
 
+        if (chunkno >= src->num_requests)
+            pg_fatal("received more data chunks than requested");
+
         /* sanity check the result set */
         if (PQnfields(res) != 3 || PQntuples(res) != 1)
             pg_fatal("unexpected result set size while fetching remote files");
@@ -446,9 +519,7 @@ libpq_finish_fetch(rewind_source *source)
          * If a file has been deleted on the source, remove it on the target
          * as well.  Note that multiple unlink() calls may happen on the same
          * file if multiple data chunks are associated with it, hence ignore
-         * unconditionally anything missing.  If this file is not a relation
-         * data file, then it has been already truncated when creating the
-         * file chunk list at the previous execution of the filemap.
+         * unconditionally anything missing.
          */
         if (PQgetisnull(res, 0, 2))
         {
@@ -463,14 +534,54 @@ libpq_finish_fetch(rewind_source *source)
         pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
                      filename, (long long int) chunkoff, chunksize);
 
+        if (strcmp(filename, rq->path) != 0)
+        {
+            pg_fatal("received data for file \"%s\", when requested for \"%s\"",
+                     filename, rq->path);
+        }
+        if (chunkoff != rq->offset)
+            pg_fatal("received data at offset " UINT64_FORMAT " of file \"%s\", when requested for offset " UINT64_FORMAT,
+                     chunkoff, rq->path, rq->offset);
+        if (chunksize > rq->length)
+        {
+            pg_fatal("received more than requested for file \"%s\"",
+                     rq->path);
+            /* receiving less is OK, though */
+        }
+
         open_target_file(filename, false);
-
         write_target_range(chunk, chunkoff, chunksize);
 
         pg_free(filename);
 
         PQclear(res);
+        chunkno++;
     }
+    if (chunkno != src->num_requests)
+        pg_fatal("unexpected number of data chunks received");
+
+    src->num_requests = 0;
+}
+
+/*
+ * Escape a string to be used as element in a text array constant
+ */
+static void
+appendArrayEscapedString(StringInfo buf, const char *str)
+{
+    appendStringInfoCharMacro(buf, '\"');
+    while (*str)
+    {
+        char        ch = *str;
+
+        if (ch == '"' || ch == '\\')
+            appendStringInfoCharMacro(buf, '\\');
+
+        appendStringInfoCharMacro(buf, ch);
+
+        str++;
+    }
+    appendStringInfoCharMacro(buf, '\"');
 }
 
 /*
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index c5ee70272a..e4e773deeb 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -45,6 +45,7 @@ static void disconnect_atexit(void);
 
 static ControlFileData ControlFile_target;
 static ControlFileData ControlFile_source;
+static ControlFileData ControlFile_source_after;
 
 const char *progname;
 int            WalSegSz;
@@ -446,30 +447,76 @@ main(int argc, char **argv)
 
     progress_report(true);
 
+    /*
+     * Fetch the control file from the source last. This ensures that the
+     * minRecoveryPoint is up-to-date.
+     */
+    buffer = source->fetch_file(source, "global/pg_control", &size);
+    digestControlFile(&ControlFile_source_after, buffer, size);
+    pg_free(buffer);
+
+    /*
+     * Sanity check: If the source is a local system, the control file should
+     * not have changed since we started.
+     *
+     * XXX: We assume it hasn't been modified, but actually, what could go
+     * wrong? The logic handles a libpq source that's modified concurrently,
+     * why not a local datadir?
+     */
+    if (datadir_source &&
+        memcmp(&ControlFile_source, &ControlFile_source_after,
+               sizeof(ControlFileData)) != 0)
+    {
+        pg_fatal("source system was modified while pg_rewind was running");
+    }
+
     if (showprogress)
         pg_log_info("creating backup label and updating control file");
     createBackupLabel(chkptredo, chkpttli, chkptrec);
 
     /*
      * Update control file of target. Make it ready to perform archive
-     * recovery when restarting.
+     * recovery when restarting, starting from the last common checkpoint.
      *
-     * minRecoveryPoint is set to the current WAL insert location in the
-     * source server. Like in an online backup, it's important that we recover
-     * all the WAL that was generated while we copied the files over.
+     * Like in an online backup, it's important that we replay all the WAL
+     * that was generated while we copied the files over. To enforce that,
+     * set 'minRecoveryPoint' in the control file.
      */
-    memcpy(&ControlFile_new, &ControlFile_source, sizeof(ControlFileData));
-
     if (connstr_source)
     {
-        endrec = source->get_current_wal_insert_lsn(source);
-        endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+        if (ControlFile_source_after.state == DB_IN_ARCHIVE_RECOVERY)
+        {
+            /*
+             * Source is a standby server. We must replay to its
+             * minRecoveryPoint.
+             */
+            endrec = ControlFile_source_after.minRecoveryPoint;
+            endtli = ControlFile_source_after.minRecoveryPointTLI;
+        }
+        else
+        {
+            /*
+             * Source is a production, non-standby, server. We must recover up
+             * to the last WAL insert location.
+             */
+            if (ControlFile_source_after.state != DB_IN_PRODUCTION)
+                pg_fatal("source system was in unexpected state at end of rewind");
+
+            endrec = source->get_current_wal_insert_lsn(source);
+            endtli = ControlFile_source_after.checkPointCopy.ThisTimeLineID;
+        }
     }
     else
     {
-        endrec = ControlFile_source.checkPoint;
-        endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+        /*
+         * Source is a local data directory. It should've shut down cleanly,
+         * and we must replay up to the latest shutdown checkpoint.
+         */
+        endrec = ControlFile_source_after.checkPoint;
+        endtli = ControlFile_source_after.checkPointCopy.ThisTimeLineID;
     }
+
+    memcpy(&ControlFile_new, &ControlFile_source_after, sizeof(ControlFileData));
     ControlFile_new.minRecoveryPoint = endrec;
     ControlFile_new.minRecoveryPointTLI = endtli;
     ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY;
-- 
2.18.4


Re: Refactor pg_rewind code and make it work against a standby

From
Soumyadeep Chakraborty
Date:
Hey Heikki,

Thanks for refactoring and making the code much easier to read!

Before getting into the code review for the patch, I wanted to know why
we don't use a Bitmapset for target_modified_pages?

Code review:


1. We need to update the comments for process_source_file and
process_target_file. We don't decide the action on the file until later.


2. Rename target_modified_pages to target_pages_to_overwrite?
target_modified_pages can lead to confusion as to whether it includes pages
that were modified on the target but not even present in the source and
the other exclusionary cases. target_pages_to_overwrite is much clearer.


3.

> /*
>  * If this is a relation file, copy the modified blocks.
>  *
>  * This is in addition to any other changes.
>  */
> iter = datapagemap_iterate(&entry->target_modified_pages);
> while (datapagemap_next(iter, &blkno))
> {
>     offset = blkno * BLCKSZ;
>
>     source->queue_fetch_range(source, entry->path, offset, BLCKSZ);
> }
> pg_free(iter);

Can we put this hunk into a static function overwrite_pages()?


4.

> * block that have changed in the target system.  It makes note of all the
> * changed blocks in the pagemap of the file.

Can we replace the above with:

> * block that has changed in the target system.  It decides if the given
> * blkno in the target relfile needs to be overwritten from the source.


5.

> /*
>  * Doesn't exist in either server. Why does it have an entry in the
>  * first place??
>  */
> return FILE_ACTION_NONE;

Can we delete the above hunk and add the following assert to the very
top of decide_file_action():

Assert(entry->target_exists || entry->source_exists);


6.

> pg_fatal("unexpected page modification for directory or symbolic link \"%s\"",
> entry->path);

Can we replace above with:

pg_fatal("unexpected page modification for non-regular file \"%s\"",
entry->path);

This way we can address the undefined file type.


7. Please address the FIXME for the symlink case:
/* FIXME: Check if it points to the same target? */


8.

* it anyway. But there's no harm in copying it now.)

and

* copy them here. But we don't know which scenario we're
* dealing with, and there's no harm in copying the missing
* blocks now, so do it now.

Could you add a line or two explaining why there is "no harm" in these
two hunks above?


9. Can we add pg_control, /pgsql_tmp/... and .../pgsql_tmp.* and PG_VERSION
files to check_file_excluded()?


10.

- * block that have changed in the target system. It makes note of all the
+ * block that have changed in the target system.  It makes note of all the

Whitespace typo


11.

> * If the block is beyond the EOF in the source system, or the file doesn't
> * doesn'exist

Typo: "doesn't" appears twice.


12.

> /*
>  * This represents the final decisions on what to do with each file.
>  * 'actions' array contains an entry for each file, sorted in the order
>  * that their actions should executed.
>  */
> typedef struct filemap_t
> {
>     /* Summary information, filled by calculate_totals() */
>     uint64      total_size;     /* total size of the source cluster */
>     uint64      fetch_size;     /* number of bytes that needs to be copied */
>
>     int         nactions;       /* size of 'actions' array */
>     file_entry_t *actions[FLEXIBLE_ARRAY_MEMBER];
> } filemap_t;

Replace nactions/actions with nentries/entries. That is clearer in intent,
as it is easier to relate the modified pages information to an entry than
to an action. It could be:

/*
 * This contains the final decisions on what to do with each file.
 * 'entries' array contains an entry for each file, sorted in the order
 * that their actions should be executed.
 */
typedef struct filemap_t
{
    /* Summary information, filled by calculate_totals() */
    uint64      total_size;     /* total size of the source cluster */
    uint64      fetch_size;     /* number of bytes that needs to be copied */

    int         nentries;       /* size of 'entries' array */
    file_entry_t *entries[FLEXIBLE_ARRAY_MEMBER];
} filemap_t;


13.

> filehash = filehash_create(1000, NULL);

Use a named constant, e.g. FILEMAP_INITIAL_SIZE, instead of the literal 1000.


14. queue_overwrite_range(), finish_overwrite() instead of
queue_fetch_range(), finish_fetch()? Similarly update
*_fetch_file_range() and *_finish_fetch()


15. Let's have local_source.c and libpq_source.c instead of *_fetch.c


16.

> conn = PQconnectdb(connstr_source);
>
> if (PQstatus(conn) == CONNECTION_BAD)
>     pg_fatal("could not connect to server: %s",
>              PQerrorMessage(conn));
>
> if (showprogress)
>     pg_log_info("connected to server");


The above hunk should be part of init_libpq_source(). Consequently,
init_libpq_source() should take a connection string instead of a conn.


17.

> if (conn)
> {
>     PQfinish(conn);
>     conn = NULL;
> }

The hunk above should be part of libpq_destroy()


18.

> /*
>  * Files are fetched max CHUNK_SIZE bytes at a time, and with a
>  * maximum of MAX_CHUNKS_PER_QUERY chunks in a single query.
>  */
> #define CHUNK_SIZE (1024 * 1024)

Can we rename CHUNK_SIZE to MAX_CHUNK_SIZE and update the comment?


19.

> typedef struct
> {
>     const char *path;       /* path relative to data directory root */
>     uint64      offset;
>     uint32      length;
> } fetch_range_request;

offset should be of type off_t

20.

> * Request to fetch (part of) a file in the source system, and write it
> * the corresponding file in the target system.

Can we change the above hunk to?

> * Request to fetch (part of) a file in the source system, specified
> * by an offset and length, and write it to the same offset in the
> * corresponding target file.


21.

> * Fetche all the queued chunks and writes them to the target data directory.

Typo: "Fetche" should be "Fetch" (and "writes" should be "write").
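For points 18 and 19, it may help to see the queueing in libpq_queue_fetch_range() in isolation. The following is a minimal standalone sketch of that merge-and-split logic, not the patch itself: MAX_CHUNK_SIZE (the suggested rename) and MAX_CHUNKS_PER_QUERY are set to hypothetical tiny values (8 bytes, 4 chunks) so the behaviour is easy to trace, and the hypothetical flush_queue() merely counts flushes where the patch would run the fetch query.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, deliberately tiny limits so the behaviour is easy to trace. */
#define MAX_CHUNK_SIZE 8
#define MAX_CHUNKS_PER_QUERY 4

typedef struct
{
    const char *path;           /* compared by pointer, as in the patch */
    uint64_t    offset;
    uint32_t    length;
} fetch_range_request;

typedef struct
{
    fetch_range_request queue[MAX_CHUNKS_PER_QUERY];
    int         num_requests;
    int         num_flushes;    /* stands in for running the fetch query */
} fetch_queue;

/* Stand-in for process_queued_fetch_requests(): just empty the queue. */
static void
flush_queue(fetch_queue *q)
{
    if (q->num_requests > 0)
        q->num_flushes++;
    q->num_requests = 0;
}

static void
queue_fetch_range(fetch_queue *q, const char *path, uint64_t off, size_t len)
{
    assert(q != NULL);

    /* If this request continues the previous one, extend that one first. */
    if (q->num_requests > 0)
    {
        fetch_range_request *prev = &q->queue[q->num_requests - 1];

        if (prev->path == path &&
            prev->offset + prev->length == off &&
            prev->length < MAX_CHUNK_SIZE)
        {
            size_t      room = MAX_CHUNK_SIZE - prev->length;
            uint32_t    thislen = (uint32_t) (len < room ? len : room);

            prev->length += thislen;
            off += thislen;
            len -= thislen;
        }
    }

    /* Split whatever is left into pieces of at most MAX_CHUNK_SIZE bytes. */
    while (len > 0)
    {
        uint32_t    thislen = (uint32_t) (len < MAX_CHUNK_SIZE ? len : MAX_CHUNK_SIZE);

        /* Queue full: process everything queued so far before continuing. */
        if (q->num_requests == MAX_CHUNKS_PER_QUERY)
            flush_queue(q);

        assert(q->num_requests < MAX_CHUNKS_PER_QUERY);
        q->queue[q->num_requests].path = path;
        q->queue[q->num_requests].offset = off;
        q->queue[q->num_requests].length = thislen;
        q->num_requests++;

        off += thislen;
        len -= thislen;
    }
}
```

With these limits, queueing bytes 0-3 and then 4-9 of the same file yields two requests (offset 0 with length 8, and offset 8 with length 2), and a subsequent 20-byte range of another file fills the queue and forces one flush.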


Regards,
Soumyadeep



Re: Refactor pg_rewind code and make it work against a standby

From
Heikki Linnakangas
Date:
Thanks for the review! I'll post a new version shortly, with your 
comments incorporated. More detailed response to a few of them below:

On 18/09/2020 10:41, Kyotaro Horiguchi wrote:
> I don't think filemap_finalize needs to iterate over filemap twice.

True, but I thought it's more clear that way, doing one thing at a time.

> hash_string_pointer is a copy of that of pg_verifybackup.c. Is it
> worth having in hashfn.h or .c?

I think it's fine for now. Maybe in the future if more copies crop up.

>> --- a/src/bin/pg_rewind/pg_rewind.c
>> +++ b/src/bin/pg_rewind/pg_rewind.c
>> ...
>> +    filemap_t  *filemap;
>> ..
>> +    filemap_init();
>> ...
>> +    filemap = filemap_finalize();
> 
> I'm a bit confused by this, and realized that what filemap_init
> initializes is *not* the filemap, but the filehash. So, for example,
> shouldn't the functions be named something like this?
>
> filehash_init()
> filemap = filehash_finalize()/create_filemap()

My thinking was that filemap_* is the prefix for the operations in 
filemap.c, hence filemap_init(). I can see the confusion, though, and I 
think you're right that renaming would be good. I renamed them to 
filehash_init(), and decide_file_actions(). I think those names make the 
calling code in pg_rewind.c quite clear.

>> /*
>>   * Also check that full_page_writes is enabled.  We can get torn pages if
>>   * a page is modified while we read it with pg_read_binary_file(), and we
>>   * rely on full page images to fix them.
>>   */
>> str = run_simple_query(conn, "SHOW full_page_writes");
>> if (strcmp(str, "on") != 0)
>>     pg_fatal("full_page_writes must be enabled in the source server");
>> pg_free(str);
> 
> This part is not changed by this patch set. But if we allow
> connecting to a standby, this check can be tricked by setting it "off" on
> the primary and "on" on the standby (FWIW, though). Some protective
> measure might be necessary.

Good point, the value in the primary is what matters. In fact, even when 
connected to the primary, the value might change while pg_rewind is 
running. I'm not sure how we could tighten that up.

> +        if (chunksize > rq->length)
> +        {
> +            pg_fatal("received more than requested for file \"%s\"",
> +                     rq->path);
> +            /* receiving less is OK, though */
> 
> Don't we need to truncate the target file, though?

If a file is truncated in the source while pg_rewind is running, there 
should be a WAL record about the truncation that gets replayed when you 
start the server after pg_rewind has finished. We could truncate the 
file if we wanted to, but it's not necessary. I'll add a comment.
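A minimal sketch of the acceptance rule described here, assuming hypothetical names (check_received_chunk() and its result codes): the patch does these checks inline in process_queued_fetch_requests() and calls pg_fatal() instead of returning a code. A short chunk is accepted, while a chunk for the wrong file, at the wrong offset, or longer than requested is rejected.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

typedef enum
{
    CHUNK_OK,                   /* full or short read: write it as-is */
    CHUNK_WRONG_FILE,
    CHUNK_WRONG_OFFSET,
    CHUNK_TOO_LONG
} chunk_check_result;

/*
 * Hypothetical helper: compare a received chunk against the request it is
 * supposed to answer.  A chunk shorter than requested is accepted, because
 * the file may have been truncated on the source in the meantime, and the
 * WAL record for that truncation is replayed when the rewound server starts.
 */
static chunk_check_result
check_received_chunk(const char *req_path, uint64_t req_offset,
                     uint32_t req_length, const char *got_path,
                     uint64_t got_offset, uint32_t got_length)
{
    assert(req_path != NULL && got_path != NULL);

    if (strcmp(req_path, got_path) != 0)
        return CHUNK_WRONG_FILE;
    if (req_offset != got_offset)
        return CHUNK_WRONG_OFFSET;
    if (got_length > req_length)
        return CHUNK_TOO_LONG;
    return CHUNK_OK;
}
```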

- Heikki



Re: Refactor pg_rewind code and make it work against a standby

From
Heikki Linnakangas
Date:
On 20/09/2020 23:44, Soumyadeep Chakraborty wrote:
> Before getting into the code review for the patch, I wanted to know why
> we don't use a Bitmapset for target_modified_pages?

Bitmapset is not available in client code. Perhaps it could be moved to 
src/common with some changes, but it doesn't seem worth it until there's 
more client code that would need it.

I'm not sure that a bitmap is the best data structure for tracking the 
changed blocks in the first place. A hash table might be better if there 
are only a few changed blocks, or something like 
src/backend/lib/integerset.c if there are many. But as long as the 
simple bitmap gets the job done, let's keep it simple.
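To make the "simple bitmap" concrete: one bit per block number in a byte array that grows on demand, iterated in ascending block order, with each set bit turned into a BLCKSZ-sized range to fetch. This is a minimal sketch of that idea under stated assumptions, not the datapagemap.c code; page_bitmap and its functions are hypothetical, and BLCKSZ is assumed to be the default 8192.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define BLCKSZ 8192             /* assumed default block size */

typedef uint32_t BlockNumber;

typedef struct
{
    unsigned char *bits;
    size_t      nbytes;
} page_bitmap;

/* Mark a block as needing to be overwritten, growing the bitmap as needed. */
static void
page_bitmap_add(page_bitmap *map, BlockNumber blkno)
{
    size_t      byteno = blkno / 8;

    assert(map != NULL);
    if (byteno >= map->nbytes)
    {
        size_t      newsize = map->nbytes ? map->nbytes : 8;
        unsigned char *newbits;

        while (newsize <= byteno)
            newsize *= 2;
        newbits = realloc(map->bits, newsize);
        if (newbits == NULL)
            abort();            /* out of memory */
        /* Zero only the newly added tail. */
        memset(newbits + map->nbytes, 0, newsize - map->nbytes);
        map->bits = newbits;
        map->nbytes = newsize;
    }
    map->bits[byteno] |= (unsigned char) (1 << (blkno % 8));
}

/*
 * Find the first set block at or after *blkno.  Returns 0 when there are no
 * more; otherwise stores the block number in *blkno and returns 1.
 */
static int
page_bitmap_next(const page_bitmap *map, BlockNumber *blkno)
{
    for (uint64_t b = *blkno; b < (uint64_t) map->nbytes * 8; b++)
    {
        if (map->bits[b / 8] & (1 << (b % 8)))
        {
            *blkno = (BlockNumber) b;
            return 1;
        }
    }
    return 0;
}
```

A caller would loop with `for (blk = 0; page_bitmap_next(&map, &blk); blk++)` and queue a fetch of BLCKSZ bytes at offset `(uint64_t) blk * BLCKSZ`. Memory use is proportional to the highest block number touched, which is the trade-off against a hash table or integerset mentioned above.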

> 2. Rename target_modified_pages to target_pages_to_overwrite?
> target_modified_pages can lead to confusion as to whether it includes pages
> that were modified on the target but not even present in the source and
> the other exclusionary cases. target_pages_to_overwrite is much clearer.

Agreed, I'll rename it.

Conceptually, while we're scanning source WAL, we're just making note of 
the modified blocks. The decision on what to do with them happens only 
later, in decide_file_action(). The difference is purely theoretical, 
though. There is no real decision to be made, all the modified blocks 
will be overwritten. So on the whole, I agree 'target_pages_to_overwrite' 
is better.

>> /*
>>   * If this is a relation file, copy the modified blocks.
>>   *
>>   * This is in addition to any other changes.
>>   */
>> iter = datapagemap_iterate(&entry->target_modified_pages);
>> while (datapagemap_next(iter, &blkno))
>> {
>> offset = blkno * BLCKSZ;
>>
>> source->queue_fetch_range(source, entry->path, offset, BLCKSZ);
>> }
>> pg_free(iter);
> 
> Can we put this hunk into a static function overwrite_pages()?

Meh, it's only about 10 lines of code, and one caller.

> 4.
> 
>> * block that have changed in the target system.  It makes note of all the
>> * changed blocks in the pagemap of the file.
> 
> Can we replace the above with:
> 
>> * block that has changed in the target system.  It decides if the given
>> * blkno in the target relfile needs to be overwritten from the source.

Ok. Again conceptually though, process_target_wal_block_change() just 
collects information, and the decisions are made later. But you're right 
that we do leave out truncated-away blocks here, so we are doing more 
than just noting all the changed blocks.

>> /*
>>   * Doesn't exist in either server. Why does it have an entry in the
>>   * first place??
>>   */
>> return FILE_ACTION_NONE;
> 
> Can we delete the above hunk and add the following assert to the very
> top of decide_file_action():
> 
> Assert(entry->target_exists || entry->source_exists);

I would like to keep the check even when assertions are not enabled. 
I'll add an Assert(false) there.
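Keeping a runtime fallback while still flagging the impossible case in assert-enabled builds could look like this. It is a simplified sketch: the enum values and struct are hypothetical, only the existence checks are modelled (not the patch's full decide_file_action()), and plain assert() stands in for PostgreSQL's Assert().

```c
#include <assert.h>
#include <stdbool.h>

typedef enum
{
    ACTION_NONE,
    ACTION_COPY,                /* only in source: copy it over */
    ACTION_REMOVE,              /* only in target: remove it */
    ACTION_EXAMINE              /* in both: needs a closer look */
} sketch_action;

typedef struct
{
    bool        source_exists;
    bool        target_exists;
} file_entry_sketch;

static sketch_action
decide_existence_action(const file_entry_sketch *entry)
{
    if (entry->source_exists && entry->target_exists)
        return ACTION_EXAMINE;
    if (entry->source_exists)
        return ACTION_COPY;
    if (entry->target_exists)
        return ACTION_REMOVE;

    /*
     * Exists in neither server, so it should never have been entered in
     * the file map.  Fail in assert-enabled builds; otherwise fall back to
     * doing nothing, which is harmless.
     */
    assert(false);
    return ACTION_NONE;
}
```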

> 7. Please address the FIXME for the symlink case:
> /* FIXME: Check if it points to the same target? */

It's not a new issue. Would be nice to fix, of course. I'm not sure what 
the right thing to do would be. If you have e.g. replaced 
postgresql.conf with a symlink that points outside the data directory, 
would it be appropriate to overwrite it? Or perhaps we should throw an 
error? We also throw an error if a file is a symlink in the source but a 
regular file in the target, or vice versa.

> 8.
> 
> * it anyway. But there's no harm in copying it now.)
> 
> and
> 
> * copy them here. But we don't know which scenario we're
> * dealing with, and there's no harm in copying the missing
> * blocks now, so do it now.
> 
> Could you add a line or two explaining why there is "no harm" in these
> two hunks above?

The previous sentences explain that there's a WAL record covering them. 
So they will be overwritten by WAL replay anyway. Does it need more 
explanation?

> 14. queue_overwrite_range(), finish_overwrite() instead of
> queue_fetch_range(), finish_fetch()? Similarly update
> *_fetch_file_range() and *_finish_fetch()
> 
> 
> 15. Let's have local_source.c and libpq_source.c instead of *_fetch.c

Good idea! And fetch.h -> rewind_source.h.

I also moved the code in execute_file_actions() function to pg_rewind.c, 
into a new function: perform_rewind(). It felt a bit silly to have just 
execute_file_actions() in a file of its own. perform_rewind() now does 
all the modifications to the data directory, including writing the backup file, 
except for writing the recovery config: that also needs to be done when 
there's no rewind to do, so it makes sense to keep it separate. What do 
you think?

> 16.
> 
>> conn = PQconnectdb(connstr_source);
>>
>> if (PQstatus(conn) == CONNECTION_BAD)
>> pg_fatal("could not connect to server: %s",
>> PQerrorMessage(conn));
>>
>> if (showprogress)
>> pg_log_info("connected to server");
> 
> 
> The above hunk should be part of init_libpq_source(). Consequently,
> init_libpq_source() should take a connection string instead of a conn.

The libpq connection is also needed by WriteRecoveryConfig(), that's why 
it's not fully encapsulated in libpq_source.

> 19.
> 
>> typedef struct
>> {
>> const char *path; /* path relative to data directory root */
>> uint64 offset;
>> uint32 length;
>> } fetch_range_request;
> 
> offset should be of type off_t

The 'offset' argument to the queue_fetch_range function is uint64, and 
the argument to the SQL-callable pg_read_binary_file() is int8, so it's 
consistent with them. Then again, the 'len' argument to 
queue_fetch_range() is a size_t, and to pg_read_binary_file() int8, so 
it's not fully consistent with that either. I'll try to make it more 
consistent.
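For reference, fetch_chunks_stmt receives its three parameters as text-format array literals, e.g. {"base/1/123"} and {0}. Here is a standalone sketch of building them with the offsets held as int64_t, so they are formatted exactly as the SQL int8 parameter expects. It writes into a caller-supplied buffer instead of a StringInfo, and the function names are hypothetical; the element escaping follows the same rule as the patch's appendArrayEscapedString() (a backslash before '"' and '\').

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Append one character, keeping buf NUL-terminated; false if it is full. */
static bool
put_char(char *buf, size_t bufsize, size_t *len, char ch)
{
    if (*len + 1 >= bufsize)
        return false;
    buf[(*len)++] = ch;
    buf[*len] = '\0';
    return true;
}

/* Build a text[] literal like {"a","b\"c"}; returns false on overflow. */
static bool
build_text_array(char *buf, size_t bufsize, const char *const *elems, int n)
{
    size_t      len = 0;

    assert(bufsize > 0);
    buf[0] = '\0';
    if (!put_char(buf, bufsize, &len, '{'))
        return false;
    for (int i = 0; i < n; i++)
    {
        if (i > 0 && !put_char(buf, bufsize, &len, ','))
            return false;
        if (!put_char(buf, bufsize, &len, '"'))
            return false;
        for (const char *p = elems[i]; *p; p++)
        {
            /* Escape the two characters that are special inside quotes. */
            if ((*p == '"' || *p == '\\') && !put_char(buf, bufsize, &len, '\\'))
                return false;
            if (!put_char(buf, bufsize, &len, *p))
                return false;
        }
        if (!put_char(buf, bufsize, &len, '"'))
            return false;
    }
    return put_char(buf, bufsize, &len, '}');
}

/* Build an int8[] literal like {0,8192}; returns false on overflow. */
static bool
build_int8_array(char *buf, size_t bufsize, const int64_t *vals, int n)
{
    size_t      len = 0;

    assert(bufsize > 0);
    buf[0] = '\0';
    if (!put_char(buf, bufsize, &len, '{'))
        return false;
    for (int i = 0; i < n; i++)
    {
        int         w = snprintf(buf + len, bufsize - len, "%s%" PRId64,
                                 i > 0 ? "," : "", vals[i]);

        if (w < 0 || (size_t) w >= bufsize - len)
            return false;
        len += (size_t) w;
    }
    return put_char(buf, bufsize, &len, '}');
}
```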

Thanks for the review! Attached is a new version of the patch set.

- Heikki

Attachment

Re: Refactor pg_rewind code and make it work against a standby

From
Soumyadeep Chakraborty
Date:
On Thu, Sep 24, 2020 at 10:27 AM Heikki Linnakangas <hlinnaka@iki.fi> wrote:
> >> /*
> >>   * If this is a relation file, copy the modified blocks.
> >>   *
> >>   * This is in addition to any other changes.
> >>   */
> >> iter = datapagemap_iterate(&entry->target_modified_pages);
> >> while (datapagemap_next(iter, &blkno))
> >> {
> >> offset = blkno * BLCKSZ;
> >>
> >> source->queue_fetch_range(source, entry->path, offset, BLCKSZ);
> >> }
> >> pg_free(iter);
> >
> > Can we put this hunk into a static function overwrite_pages()?
>
> Meh, it's only about 10 lines of code, and one caller.

Fair.

>
> > 7. Please address the FIXME for the symlink case:
> > /* FIXME: Check if it points to the same target? */
>
> It's not a new issue. Would be nice to fix, of course. I'm not sure what
> the right thing to do would be. If you have e.g. replaced
> postgresql.conf with a symlink that points outside the data directory,
> would it be appropriate to overwrite it? Or perhaps we should throw an
> error? We also throw an error if a file is a symlink in the source but a
> regular file in the target, or vice versa.
>

Hmm, I can imagine a use case for two different symlink targets on the
source and target clusters. For example, the primary's pg_wal directory
can have a different symlink target than a standby's (different mount
points on the same network, maybe?). An end user might not want
pg_rewind to meddle with that setup, or might want pg_rewind to treat
the source as the source of truth here as well and overwrite the
target's symlink. Maybe the prudent thing is to check, emit a warning
with a hint/detail, and take no action.
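A check-and-warn approach could start from POSIX readlink(2): read both link targets and report whether they differ, so the caller can emit a warning instead of touching the link. This is a sketch under assumptions: symlinks_match() and LINK_TARGET_MAX are hypothetical, it compares the link text rather than the resolved directories, and it only works where both links are readable locally; for a libpq source, the source-side target would have to come from the server instead.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <string.h>
#include <unistd.h>

#define LINK_TARGET_MAX 4096    /* hypothetical buffer size */

/*
 * Hypothetical helper: returns 1 if both symlinks contain the same target
 * text, 0 if they differ, and -1 if either cannot be read (or its target
 * might have been truncated).
 */
static int
symlinks_match(const char *link_a, const char *link_b)
{
    char        target_a[LINK_TARGET_MAX];
    char        target_b[LINK_TARGET_MAX];
    ssize_t     len_a;
    ssize_t     len_b;

    assert(link_a != NULL && link_b != NULL);

    len_a = readlink(link_a, target_a, sizeof(target_a) - 1);
    len_b = readlink(link_b, target_b, sizeof(target_b) - 1);
    if (len_a < 0 || len_b < 0)
        return -1;
    /* A completely full buffer means the target may have been truncated. */
    if (len_a == (ssize_t) sizeof(target_a) - 1 ||
        len_b == (ssize_t) sizeof(target_b) - 1)
        return -1;

    /* readlink() does not NUL-terminate its result. */
    target_a[len_a] = '\0';
    target_b[len_b] = '\0';
    return strcmp(target_a, target_b) == 0;
}
```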


> > 8.
> >
> > * it anyway. But there's no harm in copying it now.)
> >
> > and
> >
> > * copy them here. But we don't know which scenario we're
> > * dealing with, and there's no harm in copying the missing
> > * blocks now, so do it now.
> >
> > Could you add a line or two explaining why there is "no harm" in these
> > two hunks above?
>
> The previous sentences explain that there's a WAL record covering them.
> So they will be overwritten by WAL replay anyway. Does it need more
> explanation?

Yeah you are right, that is reason enough.

> > 14. queue_overwrite_range(), finish_overwrite() instead of
> > queue_fetch_range(), finish_fetch()? Similarly update
> > *_fetch_file_range() and *_finish_fetch()
> >
> >
> > 15. Let's have local_source.c and libpq_source.c instead of *_fetch.c
>
> Good idea! And fetch.h -> rewind_source.h.

+1. You might have missed the changes to rename "fetch" -> "overwrite"
as was mentioned in 14.

>
> I also moved the code in execute_file_actions() function to pg_rewind.c,
> into a new function: perform_rewind(). It felt a bit silly to have just
> execute_file_actions() in a file of its own. perform_rewind() now does
> all the modifications to the data directory, including writing the backup file,
> except for writing the recovery config: that also needs to be done when
> there's no rewind to do, so it makes sense to keep it separate. What do
> you think?

I don't mind inlining that functionality into perform_rewind(). +1.
Heads up: The function declaration for execute_file_actions() is still
there in rewind_source.h.

> > 16.
> >
> >> conn = PQconnectdb(connstr_source);
> >>
> >> if (PQstatus(conn) == CONNECTION_BAD)
> >> pg_fatal("could not connect to server: %s",
> >> PQerrorMessage(conn));
> >>
> >> if (showprogress)
> >> pg_log_info("connected to server");
> >
> >
> > The above hunk should be part of init_libpq_source(). Consequently,
> > init_libpq_source() should take a connection string instead of a conn.
>
> The libpq connection is also needed by WriteRecoveryConfig(), that's why
> it's not fully encapsulated in libpq_source.

Ah. I find it pretty weird that we need to specify --source-server to
make --write-recovery-conf work. From the code, we only need the conn
for calling PQserverVersion(), something we can easily get by slurping
pg_controldata on the source side? Maybe we can remove this limitation?

Regards,
Soumyadeep (VMware)



Re: Refactor pg_rewind code and make it work against a standby

From
Heikki Linnakangas
Date:
On 25/09/2020 02:56, Soumyadeep Chakraborty wrote:
> On Thu, Sep 24, 2020 at 10:27 AM Heikki Linnakangas <hlinnaka@iki.fi> wrote:
>>> 7. Please address the FIXME for the symlink case:
>>> /* FIXME: Check if it points to the same target? */
>>
>> It's not a new issue. Would be nice to fix, of course. I'm not sure what
>> the right thing to do would be. If you have e.g. replaced
>> postgresql.conf with a symlink that points outside the data directory,
>> would it be appropriate to overwrite it? Or perhaps we should throw an
>> error? We also throw an error if a file is a symlink in the source but a
>> regular file in the target, or vice versa.
> 
> Hmm, I can imagine a use case for 2 different symlink targets on the
> source and target clusters. For example, the primary's pg_wal directory
> can have a different symlink target than a standby's
> (different mount points on the same network, maybe?). An end user might
> not want pg_rewind meddling with that setup, or might want pg_rewind to
> treat the source as the source of truth here as well and overwrite the
> target's symlink. Maybe doing a check and emitting a warning with
> hint/detail is prudent here, while taking no action.

We have special handling for 'pg_wal' to pretend that it's a regular 
directory (see process_source_file()), so that's taken care of. But if 
you did something similar with some other subdirectory, that would be 
a problem.
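A minimal sketch of the "check and warn, take no action" idea from the quoted message, assuming the source's symlink text is already available as a string and the target's link can be read locally with POSIX readlink(). The function names and the buffer size are hypothetical, not pg_rewind's. It compares raw link text, so two links that reach the same directory through different spellings (relative vs. absolute) would still be reported as different.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#define LINK_BUF_SIZE 4096      /* hypothetical buffer size for link text */

/*
 * Hypothetical helper: does the symlink at target_path store exactly the
 * same link text as the source's symlink (source_link_target)?
 * Returns false if target_path cannot be read as a symlink, or if its
 * link text might have been truncated.
 */
static bool
target_symlink_matches(const char *target_path, const char *source_link_target)
{
    char        target_link_target[LINK_BUF_SIZE];
    ssize_t     len;

    assert(target_path != NULL && source_link_target != NULL);

    /* readlink() does not NUL-terminate, so leave room and do it here. */
    len = readlink(target_path, target_link_target,
                   sizeof(target_link_target) - 1);
    if (len < 0 || len >= (ssize_t) sizeof(target_link_target) - 1)
        return false;
    target_link_target[len] = '\0';

    return strcmp(target_link_target, source_link_target) == 0;
}

/*
 * Hypothetical "check and warn" policy: report a mismatch, but leave the
 * target's symlink untouched.
 */
static void
warn_if_symlink_differs(const char *relpath, const char *target_path,
                        const char *source_link_target)
{
    if (!target_symlink_matches(target_path, source_link_target))
        fprintf(stderr,
                "warning: symbolic link \"%s\" points to a different location "
                "in the source and target; leaving it unchanged\n",
                relpath);
}
```

The alternative policy mentioned above (treating the source as the source of truth) would replace the warning with removing and recreating the target's link.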

>>> 14. queue_overwrite_range(), finish_overwrite() instead of
>>> queue_fetch_range(), finish_fetch()? Similarly, update
>>> *_fetch_file_range() and *_finish_fetch()
>>>
>>>
>>> 15. Let's have local_source.c and libpq_source.c instead of *_fetch.c
>>
>> Good idea! And fetch.h -> rewind_source.h.
> 
> +1. You might have missed the changes to rename "fetch" -> "overwrite"
> as was mentioned in 14.

I preferred the "fetch" nomenclature in those function names. They fetch 
and overwrite the file ranges, so 'fetch' still seems appropriate. 
"fetch" -> "overwrite" would make sense if you wanted to emphasize the 
"overwrite" part more. Or we could rename it to "fetch_and_overwrite". 
But overall I think "fetch" is fine.

>>> 16.
>>>
>>>> conn = PQconnectdb(connstr_source);
>>>>
>>>> if (PQstatus(conn) == CONNECTION_BAD)
>>>>     pg_fatal("could not connect to server: %s",
>>>>              PQerrorMessage(conn));
>>>>
>>>> if (showprogress)
>>>>     pg_log_info("connected to server");
>>>
>>>
>>> The above hunk should be part of init_libpq_source(). Consequently,
>>> init_libpq_source() should take a connection string instead of a conn.
>>
>> The libpq connection is also needed by WriteRecoveryConfig(), that's why
>> it's not fully encapsulated in libpq_source.
> 
> Ah. I find it pretty weird that we need to specify --source-server to
> have --write-recovery-conf work. From the code, we only need the conn
> for calling PQserverVersion(), something we can easily get by slurping
> pg_controldata on the source side? Maybe we can remove this limitation?

Yeah, perhaps. In another patch :-).

I read through the patches one more time, fixed a bunch of typos and 
such, and pushed patches 1-4. I'm going to spend some more time on 
testing the last patch. It allows using a standby server as the source, 
and we don't have any tests for that yet. Thanks for the review!

- Heikki



Re: Refactor pg_rewind code and make it work against a standby

From
Heikki Linnakangas
Date:
On 04/11/2020 11:23, Heikki Linnakangas wrote:
> I read through the patches one more time, fixed a bunch of typos and
> such, and pushed patches 1-4. I'm going to spend some more time on
> testing the last patch. It allows using a standby server as the source,
> and we don't have any tests for that yet. Thanks for the review!

Did some more testing, fixed one bug, and pushed.

To test this, I set up a cluster with one primary, a standby, and a 
cascaded standby. I launched a test workload against the primary that 
creates tables, inserts rows, and drops tables continuously. In another 
shell, I promoted the cascaded standby, ran some updates on the promoted 
server, and finally ran pg_rewind pointed at the standby and started it 
again as a cascaded standby. Repeat.

Attached are the scripts I used. I edited them between test runs to test 
slightly different scenarios. I don't expect them to be very useful to 
anyone else, but the Internet is my backup.

I did find one bug in the patch with that, so the time was well spent: 
the code in process_queued_fetch_requests() got confused and errored 
out if a file was removed in the source system while pg_rewind was 
running. There was code to deal with that, but it was broken. Fixed that.
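A minimal sketch of the behaviour described here, assuming each fetched chunk comes back as a result row whose data is NULL when the source no longer has the file (pg_read_binary_file() called with missing_ok set to true returns NULL for a missing file). The types and functions are hypothetical and the write/remove steps are stubs; this is not the code from the patch. It also assumes all chunks of one file arrive consecutively, so remembering only the most recently removed file is enough.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical result row: chunk == NULL means the file is gone in the source. */
typedef struct ChunkResult
{
    const char *path;
    size_t      offset;
    const char *chunk;
    size_t      chunk_len;
} ChunkResult;

typedef struct ApplyStats
{
    int         chunks_written;
    int         files_removed;
} ApplyStats;

/* Stub: a real implementation would write into the target data directory. */
static void
write_chunk_to_target(const char *path, size_t offset,
                      const char *data, size_t len)
{
    (void) data;
    printf("write %zu bytes to %s at offset %zu\n", len, path, offset);
}

/* Stub: a real implementation would unlink the file in the target. */
static void
drop_target_file(const char *path)
{
    printf("remove %s (deleted in source)\n", path);
}

static ApplyStats
apply_chunk_results(const ChunkResult *rows, int nrows)
{
    ApplyStats  stats = {0, 0};
    const char *last_removed = NULL;    /* most recently removed file */

    assert(rows != NULL || nrows == 0);

    for (int i = 0; i < nrows; i++)
    {
        const ChunkResult *row = &rows[i];

        /* Already removed: skip the remaining chunks of that file. */
        if (last_removed != NULL && strcmp(last_removed, row->path) == 0)
            continue;

        if (row->chunk == NULL)
        {
            /* Deleted in the source while we ran: not an error. */
            drop_target_file(row->path);
            stats.files_removed++;
            last_removed = row->path;
            continue;
        }

        write_chunk_to_target(row->path, row->offset,
                              row->chunk, row->chunk_len);
        stats.chunks_written++;
    }
    return stats;
}
```

The design choice being illustrated is that a file which disappeared from the source should simply not exist in the rewound target either, rather than aborting the whole rewind.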

- Heikki

Attachment

Re: Refactor pg_rewind code and make it work against a standby

From
Tom Lane
Date:
Not sure if you noticed, but piculet has twice failed the
007_standby_source.pl test that was added by 9c4f5192f:

https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=piculet&dt=2020-11-15%2006%3A00%3A11
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=piculet&dt=2020-11-13%2011%3A20%3A10

and francolin failed once:

https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=francolin&dt=2020-11-12%2018%3A57%3A33

These failures look the same:

#   Failed test 'table content after rewind and insert: query result matches'
#   at t/007_standby_source.pl line 160.
#          got: 'in A
# in A, before promotion
# in A, after C was promoted
# '
#     expected: 'in A
# in A, before promotion
# in A, after C was promoted
# in A, after rewind
# '
# Looks like you failed 1 test of 3.
[11:27:01] t/007_standby_source.pl ...
Dubious, test returned 1 (wstat 256, 0x100)
Failed 1/3 subtests

Now, I'm not sure what to make of that, but I can't help noticing that
piculet uses --disable-atomics while francolin uses --disable-spinlocks.
That leads the mind towards some kind of low-level synchronization
bug ...

            regards, tom lane



Re: Refactor pg_rewind code and make it work against a standby

From
Tom Lane
Date:
I wrote:
> Not sure if you noticed, but piculet has twice failed the
> 007_standby_source.pl test that was added by 9c4f5192f:
> ...
> Now, I'm not sure what to make of that, but I can't help noticing that
> piculet uses --disable-atomics while francolin uses --disable-spinlocks.
> That leads the mind towards some kind of low-level synchronization
> bug ...

Or, maybe it's less mysterious than that.  The failure looks like we
have not waited long enough for the just-inserted row to get replicated
to node C.  That wait is implemented as

    $lsn = $node_a->lsn('insert');
    $node_b->wait_for_catchup('node_c', 'write', $lsn);

which looks fishy ... shouldn't wait_for_catchup be told to
wait for replay of that LSN, not just write-the-WAL?

            regards, tom lane



Re: Refactor pg_rewind code and make it work against a standby

From
Heikki Linnakangas
Date:
On 15/11/2020 09:07, Tom Lane wrote:
> I wrote:
>> Not sure if you noticed, but piculet has twice failed the
>> 007_standby_source.pl test that was added by 9c4f5192f:
>> ...
>> Now, I'm not sure what to make of that, but I can't help noticing that
>> piculet uses --disable-atomics while francolin uses --disable-spinlocks.
>> That leads the mind towards some kind of low-level synchronization
>> bug ...
> 
> Or, maybe it's less mysterious than that.  The failure looks like we
> have not waited long enough for the just-inserted row to get replicated
> to node C.  That wait is implemented as
> 
>     $lsn = $node_a->lsn('insert');
>     $node_b->wait_for_catchup('node_c', 'write', $lsn);
> 
> which looks fishy ... shouldn't wait_for_catchup be told to
> wait for replay of that LSN, not just write-the-WAL?

Yep, quite right. Fixed that way, thanks for the debugging!

- Heikki



Re: Refactor pg_rewind code and make it work against a standby

From
Andres Freund
Date:
Hi,

On 2020-11-15 17:10:53 +0200, Heikki Linnakangas wrote:
> Yep, quite right. Fixed that way, thanks for the debugging!

Locally, on a heavily modified branch (AIO support), I started to get
consistent failures in this test. I *suspect*, but am not sure, that
it's the test's fault, not the fault of my modifications.

As far as I can tell, after the pg_rewind call, there's no guarantee
that node_c has fully caught up to the 'in A, after C was promoted'
insertion on node a. Thus at the check_query() I sometimes get just 'in
A, before promotion' back.

After adding a wait, that problem seems to be fixed. Here's what I did:

diff --git i/src/bin/pg_rewind/t/007_standby_source.pl w/src/bin/pg_rewind/t/007_standby_source.pl
index f6abcc2d987..48898bef2f5 100644
--- i/src/bin/pg_rewind/t/007_standby_source.pl
+++ w/src/bin/pg_rewind/t/007_standby_source.pl
@@ -88,6 +88,7 @@ $node_c->safe_psql('postgres', "checkpoint");
 # - you need to rewind.
 $node_a->safe_psql('postgres',
     "INSERT INTO tbl1 VALUES ('in A, after C was promoted')");
+$lsn = $node_a->lsn('insert');
 
 # Also insert a new row in the standby, which won't be present in the
 # old primary.
@@ -142,6 +143,8 @@ $node_primary = $node_c;
 # Run some checks to verify that C has been successfully rewound,
 # and connected back to follow B.
 
+$node_b->wait_for_catchup('node_c', 'replay', $lsn);
+
 check_query(
     'SELECT * FROM tbl1',
     qq(in A


- Andres



Re: Refactor pg_rewind code and make it work against a standby

From
Heikki Linnakangas
Date:
On 20/11/2020 02:38, Andres Freund wrote:
> Locally, on a heavily modified branch (AIO support), I started to get
> consistent failures in this test. I *suspect*, but am not sure, that
> it's the test's fault, not the fault of my modifications.
> 
> As far as I can tell, after the pg_rewind call, there's no guarantee
> that node_c has fully caught up to the 'in A, after C was promoted'
> insertion on node a. Thus at the check_query() I sometimes get just 'in
> A, before promotion' back.
> 
> After adding a wait, that problem seems to be fixed. Here's what I did:
> 
> diff --git i/src/bin/pg_rewind/t/007_standby_source.pl w/src/bin/pg_rewind/t/007_standby_source.pl
> index f6abcc2d987..48898bef2f5 100644
> --- i/src/bin/pg_rewind/t/007_standby_source.pl
> +++ w/src/bin/pg_rewind/t/007_standby_source.pl
> @@ -88,6 +88,7 @@ $node_c->safe_psql('postgres', "checkpoint");
>   # - you need to rewind.
>   $node_a->safe_psql('postgres',
>       "INSERT INTO tbl1 VALUES ('in A, after C was promoted')");
> +$lsn = $node_a->lsn('insert');
>   
>   # Also insert a new row in the standby, which won't be present in the
>   # old primary.
> @@ -142,6 +143,8 @@ $node_primary = $node_c;
>   # Run some checks to verify that C has been successfully rewound,
>   # and connected back to follow B.
>   
> +$node_b->wait_for_catchup('node_c', 'replay', $lsn);
> +
>   check_query(
>       'SELECT * FROM tbl1',
>       qq(in A

Yes, I was able to reproduce that by inserting a strategic sleep in the 
test and pausing replication by attaching gdb to the walsender process.

Pushed a fix similar to your patch, but I put the wait_for_catchup() 
before running pg_rewind. The point of inserting the 'in A, after C was 
promoted' row is that it's present in B when pg_rewind runs.

Thanks!

- Heikki



Re: Refactor pg_rewind code and make it work against a standby

From
Andres Freund
Date:
Hi,

On 2020-11-20 16:19:03 +0200, Heikki Linnakangas wrote:
> Pushed a fix similar to your patch, but I put the wait_for_catchup() before
> running pg_rewind. The point of inserting the 'in A, after C was promoted'
> row is that it's present in B when pg_rewind runs.

Hm - don't we possibly need *both*? Since post pg_rewind recovery starts
at the previous checkpoint, it's quite possible for C to get ready to
answer queries before that record has been replayed?

Thanks,

Andres Freund



Re: Refactor pg_rewind code and make it work against a standby

From
Heikki Linnakangas
Date:
On 20/11/2020 19:14, Andres Freund wrote:
> Hi,
> 
> On 2020-11-20 16:19:03 +0200, Heikki Linnakangas wrote:
>> Pushed a fix similar to your patch, but I put the wait_for_catchup() before
>> running pg_rewind. The point of inserting the 'in A, after C was promoted'
>> row is that it's present in B when pg_rewind runs.
> 
> Hm - don't we possibly need *both*? Since post pg_rewind recovery starts
> at the previous checkpoint, it's quite possible for C to get ready to
> answer queries before that record has been replayed?

No, C will not reach a consistent state until all the WAL in the source 
system has been replayed. pg_rewind will set minRecoveryPoint to the 
minRecoveryPoint of the source system, after copying all the files (or 
its insert point, if it's not a standby server; in this case it is). 
Same as when taking an online backup.
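A small model of the rule described above, as a sketch only: the names are hypothetical and it encodes just what the message states (minRecoveryPoint comes from the source's minRecoveryPoint for a standby, or its insert point otherwise, and no queries run before replay reaches it). Under that rule, replay restarting at an earlier checkpoint does not matter for any WAL record that ends at or before minRecoveryPoint.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for PostgreSQL's LSN type */

/* Hypothetical: the minRecoveryPoint recorded for the rewound target. */
static XLogRecPtr
rewound_min_recovery_point(bool source_is_standby,
                           XLogRecPtr source_min_recovery_point,
                           XLogRecPtr source_insert_lsn)
{
    return source_is_standby ? source_min_recovery_point : source_insert_lsn;
}

/* In this model, queries are only allowed once replay reaches this point. */
static bool
target_is_consistent(XLogRecPtr replayed_lsn, XLogRecPtr min_recovery_point)
{
    assert(min_recovery_point != 0);    /* 0 is InvalidXLogRecPtr */
    return replayed_lsn >= min_recovery_point;
}

/*
 * Is a row whose WAL record ends at row_lsn guaranteed to be replayed by
 * the time the first query runs?  Only if it precedes minRecoveryPoint.
 */
static bool
row_visible_at_first_query(XLogRecPtr row_lsn, XLogRecPtr min_recovery_point)
{
    return row_lsn <= min_recovery_point;
}
```

For example, with a standby source whose minRecoveryPoint is 200 and replay restarting at a checkpoint at 100, the target is not consistent at 100, and a row written at 150 has been replayed before the first query can see the data.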

- Heikki