Re: Streaming Replication patch for CommitFest 2009-09 - Mailing list pgsql-hackers

From Heikki Linnakangas
Subject Re: Streaming Replication patch for CommitFest 2009-09
Date
Msg-id 4AB31EEC.4000509@enterprisedb.com
Whole thread Raw
In response to Re: Streaming Replication patch for CommitFest 2009-09  (Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>)
Responses Re: Streaming Replication patch for CommitFest 2009-09
Re: Streaming Replication patch for CommitFest 2009-09
List pgsql-hackers
Heikki Linnakangas wrote:
> I'm thinking that walreceiver should be a stand-alone program that the
> startup process launches, similar to how it invokes restore_command in
> PITR recovery. Instead of using system(), though, it would use
> fork+exec, and a pipe to communicate.

Here's a WIP patch to do that, over your latest posted patch. I've also
pushed this to my git repository at
git://git.postgresql.org/git/users/heikki/postgres.git, "replication"
branch.

I'll continue reviewing...

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6804644..364d7e4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -41,7 +41,6 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
-#include "postmaster/walreceiver.h"
 #include "postmaster/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
@@ -54,6 +53,7 @@
 #include "utils/guc.h"
 #include "utils/ps_status.h"
 #include "pg_trace.h"
+#include "postmaster/fork_process.h"


 /* File path names (all relative to $PGDATA) */
@@ -185,7 +185,8 @@ static TimestampTz recoveryLastXTime = 0;

 /* options taken from recovery.conf for XLOG streaming */
 static bool StandbyMode = false;
-char *TriggerFile = NULL;
+static char *TriggerFile = NULL;
+static char *conninfo = NULL;

 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
 static TransactionId recoveryStopXid;
@@ -489,6 +490,8 @@ static volatile sig_atomic_t shutdown_requested = false;
 static volatile sig_atomic_t in_restore_command = false;


+static pid_t WalReceiverPid = 0;
+
 static void XLogArchiveNotify(const char *xlog);
 static bool XLogArchiveCheckDone(const char *xlog);
 static bool XLogArchiveIsBusy(const char *xlog);
@@ -541,6 +544,10 @@ static bool read_backup_label(XLogRecPtr *checkPointLoc,
 static void rm_redo_error_callback(void *arg);
 static int    get_sync_bit(int method);

+static void StartWalReceiver(TimeLineID tli, XLogRecPtr startlsn);
+static void WaitNextXLogAvailable(XLogRecPtr recptr);
+static void WaitForTrigger(void);
+

 /*
  * Insert an XLOG record having the specified RMID and info bytes,
@@ -1180,18 +1187,6 @@ XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
     return false;                /* buffer does not need to be backed up */
 }

-/* Report XLOG streaming progress in PS display */
-void
-ReportLogstreamResult(void)
-{
-    char    activitymsg[50];
-
-    snprintf(activitymsg, sizeof(activitymsg),
-             "streaming %X/%X",
-             LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
-    set_ps_display(activitymsg, false);
-}
-
 /*
  * XLogArchiveNotify
  *
@@ -3469,7 +3464,7 @@ FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)

         /* If there is no valid record available, request XLOG streaming */
         startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr;
-        RequestXLogStreaming(recoveryTargetTLI, startlsn);
+        StartWalReceiver(recoveryTargetTLI, startlsn);

         /* Needs to read the current page again if the next record is in it */
         needReread = haveNextRecord;
@@ -4934,7 +4929,6 @@ readRecoveryCommandFile(void)
 {
     FILE       *fd;
     char        cmdline[MAXPGPATH];
-    char       *conninfo = NULL;
     TimeLineID    rtli = 0;
     bool        rtliGiven = false;
     bool        syntaxError = false;
@@ -5113,14 +5107,6 @@ readRecoveryCommandFile(void)
                         cmdline),
               errhint("Lines should have the format parameter = 'value'.")));

-    /* Inform walreceiver of the connection information via file */
-    if (StandbyMode)
-    {
-        write_conninfo_file(conninfo);
-        if (conninfo)
-            pfree(conninfo);
-    }
-
     /* If not in standby mode, restore_command must be supplied */
     if (!StandbyMode && recoveryRestoreCommand == NULL)
         ereport(FATAL,
@@ -5282,7 +5268,13 @@ exitStreamingRecovery(void)
      * exited, and recovery checkpoint and subsequent records are
      * no longer overwritten unexpectedly.
      */
-    ShutdownWalRcv();
+    if (WalReceiverPid != 0)
+    {
+        int status = 0;
+        kill(WalReceiverPid, SIGTERM);
+        waitpid(WalReceiverPid, &status, 0);
+        WalReceiverPid = 0;
+    }

     /* We are no longer in streaming recovery state */
     InStreamingRecovery = false;
@@ -7185,7 +7177,7 @@ CreateRestartPoint(int flags)

     /* Are we doing recovery from XLOG stream? */
     if (!InStreamingRecovery)
-        InStreamingRecovery = WalRcvInProgress();
+        InStreamingRecovery = (WalReceiverPid != 0);

     /*
      * Delete old log files (those no longer needed even for previous
@@ -7203,8 +7195,9 @@ CreateRestartPoint(int flags)
     {
         XLogRecPtr    endptr;

-        /* Get the current (or recent) end of xlog */
-        endptr = GetWalRcvWriteRecPtr();
+        LWLockAcquire(ControlFileLock, LW_SHARED);
+        endptr = ControlFile->minRecoveryPoint;
+        LWLockRelease(ControlFileLock);

         PrevLogSeg(_logId, _logSeg);
         RemoveOldXlogFiles(_logId, _logSeg, endptr);
@@ -8443,3 +8436,316 @@ StartupProcessMain(void)
      */
     proc_exit(0);
 }
+
+/*
+ * WAL receiver support.
+ *
+ * The walreceiver is a stand-alone program that the startup process launches
+ * with fork+exec.  Its stdout is connected to a pipe, on which
+ * WaitNextXLogAvailable() expects NUL-terminated "%X/%X" messages reporting
+ * the XLOG position the walreceiver has written up to.
+ */
+
+/* Read end of the walreceiver pipe, or -1 if there is none */
+static int walreceiver_readfd = -1;
+
+/* Set once the trigger file has been found (and removed) */
+static bool foundTrigger = false;
+
+/*
+ * Launch the walreceiver program to stream XLOG from the primary.
+ *
+ * The program gets "<tli> <xlogid>/<xrecoff>" as its first argument and the
+ * conninfo string from recovery.conf as its second.  On success, sets
+ * WalReceiverPid and walreceiver_readfd.  If the pipe or the child process
+ * cannot be created, a LOG message is emitted and WalReceiverPid is not set.
+ */
+static void
+StartWalReceiver(TimeLineID tli, XLogRecPtr startlsn)
+{
+    pid_t       pid;
+    char       *av[4];
+    char        startptr[64];    /* "%u %X/%X" needs at most 29 bytes */
+    int         pfildes[2];
+
+    /* Don't leak the pipe of a previously launched walreceiver */
+    if (walreceiver_readfd >= 0)
+    {
+        close(walreceiver_readfd);
+        walreceiver_readfd = -1;
+    }
+
+    snprintf(startptr, sizeof(startptr), "%u %X/%X",
+             tli, startlsn.xlogid, startlsn.xrecoff);
+
+    av[0] = "walreceiver";
+    av[1] = startptr;
+    av[2] = conninfo;
+    av[3] = NULL;
+
+    if (pipe(pfildes) < 0)
+    {
+        ereport(LOG,
+                (errcode_for_file_access(),
+                 errmsg("could not create pipe for walreceiver: %m")));
+        return;
+    }
+
+    if ((pid = fork_process()) == 0)
+    {
+        /* In the child process: exec the walreceiver program */
+        char        walreceiverpath[MAXPGPATH];
+
+        /* The child only writes to the pipe */
+        close(pfildes[0]);
+
+        if (find_other_exec(my_exec_path, "walreceiver",
+                            "walreceiver " PG_VERSION_STR,
+                            walreceiverpath) < 0)
+        {
+            ereport(LOG,
+                    (errmsg("could not find a suitable \"walreceiver\" executable")));
+            exit(1);
+        }
+
+        /* Route the walreceiver's stdout into the pipe */
+        if (dup2(pfildes[1], STDOUT_FILENO) < 0)
+        {
+            ereport(LOG,
+                    (errmsg("could not redirect walreceiver output: %m")));
+            exit(1);
+        }
+        if (pfildes[1] != STDOUT_FILENO)
+            close(pfildes[1]);
+
+        execv(walreceiverpath, av);
+
+        /* execv() returns only on failure; we're the child, can't return */
+        ereport(LOG,
+                (errmsg("could not execute walreceiver process \"%s\": %m",
+                        walreceiverpath)));
+        exit(1);
+    }
+    else if (pid < 0)
+    {
+        /* Report before close() gets a chance to clobber errno */
+        ereport(LOG,
+                (errmsg("could not fork walreceiver process: %m")));
+        close(pfildes[0]);
+        close(pfildes[1]);
+        return;
+    }
+
+    /*
+     * In the parent process: close our copy of the write end, so that reads
+     * report EOF once the walreceiver exits, and keep the read end.
+     */
+    close(pfildes[1]);
+    walreceiver_readfd = pfildes[0];
+    WalReceiverPid = pid;
+}
+
+/*
+ * Read one NUL-terminated message from the walreceiver pipe into buf.
+ *
+ * At most buflen - 1 characters are stored and buf is always NUL-terminated;
+ * a longer message is returned in pieces.  Returns true if a message was
+ * read, false on EOF (the walreceiver closed its stdout) or a read error.
+ */
+static bool
+ReadWalReceiverMessage(char *buf, int buflen)
+{
+    int         len = 0;
+
+    while (len < buflen - 1)
+    {
+        char        c;
+        ssize_t     nread = read(walreceiver_readfd, &c, 1);
+
+        if (nread < 0)
+        {
+            if (errno == EINTR)
+            {
+                HandleStartupProcInterrupts();
+                continue;
+            }
+            ereport(LOG,
+                    (errmsg("could not read from walreceiver pipe: %m")));
+            return false;
+        }
+        if (nread == 0)
+            return false;
+        if (c == '\0')
+            break;
+        buf[len++] = c;
+    }
+    buf[len] = '\0';
+    return true;
+}
+
+/*
+ * Close the walreceiver pipe and reap the walreceiver process, so that no
+ * zombie is left behind.  Afterwards no walreceiver is considered running.
+ */
+static void
+ReapWalReceiver(void)
+{
+    if (walreceiver_readfd >= 0)
+    {
+        close(walreceiver_readfd);
+        walreceiver_readfd = -1;
+    }
+
+    if (WalReceiverPid != 0)
+    {
+        int         status;
+
+        while (waitpid(WalReceiverPid, &status, 0) < 0 && errno == EINTR)
+            HandleStartupProcInterrupts();
+        WalReceiverPid = 0;
+    }
+}
+
+/*
+ * Wait for the XLOG records at given position to become available.
+ *
+ * The XLOG records already written by walreceiver are regarded as
+ * available.
+ *
+ * recptr: indicates the byte position which caller wants to read the
+ * XLOG record up to.
+ *
+ * Returns as soon as recptr precedes the write position reported by the
+ * walreceiver, or when no walreceiver is running (none was started, or it
+ * has exited, e.g. after being told to stop by the trigger file); in the
+ * latter case the caller should treat XLOG streaming as ended.
+ *
+ * Called by the startup process in streaming recovery.
+ *
+ * NOTE(review): the pipe read blocks, so the trigger file and interrupts
+ * are only checked between walreceiver messages.
+ */
+static void
+WaitNextXLogAvailable(XLogRecPtr recptr)
+{
+    struct stat stat_buf;
+    bool        signaled = false;
+
+#ifdef REPLICATION_DEBUG
+    if (REPLICATION_DEBUG_ENABLED)
+        elog(LOG, "xlog wait request %X/%X; write %X/%X",
+             recptr.xlogid, recptr.xrecoff,
+             LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
+#endif
+
+    /* Loop until recptr is known available; quick exit if it already is */
+    while (!XLByteLT(recptr, LogstreamResult.Write))
+    {
+        char        buf[101];
+        uint32      xlogid;
+        uint32      xrecoff;
+
+        /* Nothing to wait for if no walreceiver was launched */
+        if (walreceiver_readfd < 0)
+            return;
+
+        if (!ReadWalReceiverMessage(buf, sizeof(buf)))
+        {
+            /* The walreceiver has gone away; streaming is over */
+            ReapWalReceiver();
+            return;
+        }
+
+        /* Update local status, ignoring anything that isn't a position */
+        if (sscanf(buf, "%X/%X", &xlogid, &xrecoff) == 2)
+        {
+            LogstreamResult.Write.xlogid = xlogid;
+            LogstreamResult.Write.xrecoff = xrecoff;
+        }
+
+        /* If available already, leave here */
+        if (XLByteLT(recptr, LogstreamResult.Write))
+        {
+            /* XXX
+            XLogArchiveNotifySeg(recvId, recvSeg);
+            */
+            return;
+        }
+
+        /* Check to see if the trigger file exists */
+        if (TriggerFile != NULL && !foundTrigger &&
+            stat(TriggerFile, &stat_buf) == 0)
+        {
+            ereport(LOG,
+                    (errmsg("trigger file found: %s", TriggerFile)));
+            foundTrigger = true;
+            unlink(TriggerFile);
+        }
+
+        /*
+         * The presence of a trigger file shuts down walreceiver.  Signal it
+         * only once; its exit is then noticed as EOF on the pipe.
+         */
+        if (foundTrigger && !signaled && WalReceiverPid != 0)
+        {
+            kill(WalReceiverPid, SIGTERM);
+            signaled = true;
+        }
+
+        /*
+         * This possibly-long loop needs to handle interrupts of startup
+         * process.
+         */
+        HandleStartupProcInterrupts();
+
+        pg_usleep(100000L);        /* 100ms */
+    }
+}
+
+/*
+ * Wait until the trigger file appears, then log that and remove the file.
+ *
+ * Returns at once if no trigger file was specified or it was already found.
+ * While waiting, a WARNING is emitted after 15 seconds, then at 30, 60, ...
+ */
+static void
+WaitForTrigger(void)
+{
+    int         seconds_before_warning = 15;
+    int         elapsed = 0;
+    int         count = 0;
+    struct stat stat_buf;
+
+    /* Quick exit if a trigger file was not specified or was already found */
+    if (TriggerFile == NULL || foundTrigger)
+        return;
+
+    while (stat(TriggerFile, &stat_buf) != 0)
+    {
+        /*
+         * This possibly-long loop needs to handle interrupts of startup
+         * process.
+         */
+        HandleStartupProcInterrupts();
+
+        pg_usleep(100000L);        /* 100ms */
+
+        if (++count >= 10)        /* 1s passed */
+        {
+            count = 0;
+
+            if (++elapsed >= seconds_before_warning)
+            {
+                seconds_before_warning *= 2;     /* This wraps in >10 years... */
+                ereport(WARNING,
+                        (errmsg("still waiting for the trigger file \"%s\" (%d seconds elapsed)",
+                                TriggerFile, elapsed)));
+            }
+        }
+    }
+
+    ereport(LOG,
+            (errmsg("trigger file found: %s", TriggerFile)));
+    foundTrigger = true;
+    unlink(TriggerFile);
+}
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 824a93f..06e9d33 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -31,7 +31,6 @@
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "postmaster/bgwriter.h"
-#include "postmaster/walreceiver.h"
 #include "postmaster/walwriter.h"
 #include "storage/bufmgr.h"
 #include "storage/ipc.h"
@@ -340,9 +339,6 @@ AuxiliaryProcessMain(int argc, char *argv[])
             case WalWriterProcess:
                 statmsg = "wal writer process";
                 break;
-            case WalReceiverProcess:
-                statmsg = "wal receiver process";
-                break;
             default:
                 statmsg = "??? process";
                 break;
@@ -448,11 +444,6 @@ AuxiliaryProcessMain(int argc, char *argv[])
             WalWriterMain();
             proc_exit(1);        /* should never return */

-        case WalReceiverProcess:
-            /* don't set signals, walreceiver has its own agenda */
-            WalReceiverMain();
-            proc_exit(1);        /* should never return */
-
         default:
             elog(PANIC, "unrecognized process type: %d", auxType);
             proc_exit(1);
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 616cd2c..b73fdf4 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -15,7 +15,5 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)

 OBJS = autovacuum.o bgwriter.o fork_process.o pgarch.o pgstat.o postmaster.o \
-    syslogger.o walwriter.o walsender.o walreceiver.o
+    syslogger.o walwriter.o walsender.o

-walreceiver.o: submake-libpq
-
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 3ad82ef..b91ca88 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -108,7 +108,6 @@
 #include "postmaster/pgarch.h"
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
-#include "postmaster/walreceiver.h"
 #include "postmaster/walsender.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -125,7 +124,6 @@
 #include "storage/spin.h"
 #endif

-
 /*
  * List of active backends (or child processes anyway; we don't actually
  * know whether a given child has become a backend or is still in the
@@ -217,7 +215,6 @@ char       *bonjour_name;
 static pid_t StartupPID = 0,
             BgWriterPID = 0,
             WalWriterPID = 0,
-            WalReceiverPID = 0,
             AutoVacPID = 0,
             PgArchPID = 0,
             PgStatPID = 0,
@@ -289,7 +286,6 @@ typedef enum
     PM_WAIT_BACKENDS,            /* waiting for live backends to exit */
     PM_SHUTDOWN,                /* waiting for bgwriter to do shutdown ckpt */
     PM_SHUTDOWN_2,                /* waiting for archiver to finish */
-    PM_SHUTDOWN_3,                /* waiting for walsenders to finish */
     PM_WAIT_DEAD_END,            /* waiting for dead_end children to exit */
     PM_NO_CHILDREN                /* all important children have exited */
 } PMState;
@@ -468,7 +464,6 @@ static void ShmemBackendArrayRemove(Backend *bn);
 #define StartupDataBase()        StartChildProcess(StartupProcess)
 #define StartBackgroundWriter() StartChildProcess(BgWriterProcess)
 #define StartWalWriter()        StartChildProcess(WalWriterProcess)
-#define StartWalReceiver()        StartChildProcess(WalReceiverProcess)

 /* Macros to check exit status of a child process */
 #define EXIT_STATUS_0(st)  ((st) == 0)
@@ -1465,9 +1460,10 @@ ServerLoop(void)

         /* If we have lost the archiver, try to start a new one */
         if (XLogArchivingActive() && PgArchPID == 0 &&
-            (pmState == PM_RUN ||
+            (pmState == PM_RUN /* XXX postmaster doesn't know if walreceiver is active
+                                 ||
              ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
-              WalRcvInProgress())))
+             WalRcvInProgress()) */))
             PgArchPID = pgarch_start();

         /* If we have lost the stats collector, try to start a new one */
@@ -1640,7 +1636,7 @@ retry1:
     if (proto == XLOG_STREAMING_CODE && !am_walsender)
     {
         am_walsender = true;
-        /* No packets other than regular one should not follow */
+        /* No packets other than regular one should follow */
         return ProcessStartupPacket(port, SSLdone);
     }

@@ -2097,8 +2093,6 @@ SIGHUP_handler(SIGNAL_ARGS)
             signal_child(BgWriterPID, SIGHUP);
         if (WalWriterPID != 0)
             signal_child(WalWriterPID, SIGHUP);
-        if (WalReceiverPID != 0)
-            signal_child(WalReceiverPID, SIGHUP);
         if (AutoVacPID != 0)
             signal_child(AutoVacPID, SIGHUP);
         if (PgArchPID != 0)
@@ -2194,8 +2188,6 @@ pmdie(SIGNAL_ARGS)

             if (StartupPID != 0)
                 signal_child(StartupPID, SIGTERM);
-            if (WalReceiverPID != 0)
-                signal_child(WalReceiverPID, SIGTERM);
             if (pmState == PM_RECOVERY)
             {
                 /* only bgwriter is active in this state */
@@ -2243,8 +2235,6 @@ pmdie(SIGNAL_ARGS)
                 signal_child(BgWriterPID, SIGQUIT);
             if (WalWriterPID != 0)
                 signal_child(WalWriterPID, SIGQUIT);
-            if (WalReceiverPID != 0)
-                signal_child(WalReceiverPID, SIGQUIT);
             if (AutoVacPID != 0)
                 signal_child(AutoVacPID, SIGQUIT);
             if (PgArchPID != 0)
@@ -2404,17 +2394,16 @@ reaper(SIGNAL_ARGS)
                  */
                 Assert(Shutdown > NoShutdown);

-                if (PgArchPID != 0)
+                if (PgArchPID != 0 || WalSndInProgress())
                 {
                     /* Waken archiver for the last time */
-                    signal_child(PgArchPID, SIGUSR2);
-                    pmState = PM_SHUTDOWN_2;
-                }
-                else if (WalSndInProgress())
-                {
+                    if (PgArchPID != 0)
+                        signal_child(PgArchPID, SIGUSR2);
+
                     /* Waken walsenders for the last time */
                     SignalWalSenders(SIGUSR2);
-                    pmState = PM_SHUTDOWN_3;
+
+                    pmState = PM_SHUTDOWN_2;
                 }
                 else
                     pmState = PM_WAIT_DEAD_END;
@@ -2454,20 +2443,6 @@ reaper(SIGNAL_ARGS)
         }

         /*
-         * Was it the wal receiver?  If exit status is zero (normal) or one
-         * (FATAL exit), we assume everything is all right just like normal
-         * backends.
-         */
-        if (pid == WalReceiverPID)
-        {
-            WalReceiverPID = 0;
-            if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus))
-                HandleChildCrash(pid, exitstatus,
-                                 _("WAL receiver process"));
-            continue;
-        }
-
-        /*
          * Was it the autovacuum launcher?    Normal exit can be ignored; we'll
          * start a new one at the next iteration of the postmaster's main
          * loop, if necessary.    Any other exit condition is treated as a
@@ -2495,16 +2470,12 @@ reaper(SIGNAL_ARGS)
                 LogChildExit(LOG, _("archiver process"),
                              pid, exitstatus);
             if (XLogArchivingActive() &&
-                (pmState == PM_RUN ||
+                (pmState == PM_RUN/*  XXX postmaster doesn't know if walreceiver is active
+                                      ||
                  ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
-                  WalRcvInProgress())))
+                 WalRcvInProgress())*/))
                 PgArchPID = pgarch_start();
-            else if (pmState == PM_SHUTDOWN_2 && WalSndInProgress())
-            {
-                SignalWalSenders(SIGUSR2);
-                pmState = PM_SHUTDOWN_3;
-            }
-            else
+            else if (pmState == PM_SHUTDOWN_2 && !WalSndInProgress())
                 pmState = PM_WAIT_DEAD_END;
             continue;
         }
@@ -2611,8 +2582,8 @@ CleanupBackend(int pid,
                  * advance to the next shutdown step.
                  */
                 if (bp->child_type == BACKEND_TYPE_WALSND &&
-                    pmState == PM_SHUTDOWN_3 &&
-                    !WalSndInProgress())
+                    pmState == PM_SHUTDOWN_2 &&
+                    !WalSndInProgress() && PgArchPID == 0)
                     pmState = PM_WAIT_DEAD_END;
             }
             DLRemove(curr);
@@ -2729,18 +2700,6 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
         signal_child(WalWriterPID, (SendStop ? SIGSTOP : SIGQUIT));
     }

-    /* Take care of the walreceiver too */
-    if (pid == WalReceiverPID)
-        WalReceiverPID = 0;
-    else if (WalReceiverPID != 0 && !FatalError)
-    {
-        ereport(DEBUG2,
-                (errmsg_internal("sending %s to process %d",
-                                 (SendStop ? "SIGSTOP" : "SIGQUIT"),
-                                 (int) WalReceiverPID)));
-        signal_child(WalReceiverPID, (SendStop ? SIGSTOP : SIGQUIT));
-    }
-
     /* Take care of the autovacuum launcher too */
     if (pid == AutoVacPID)
         AutoVacPID = 0;
@@ -2884,7 +2843,6 @@ PostmasterStateMachine(void)
          */
         if (CountChildren(true) == 0 &&
             StartupPID == 0 &&
-            WalReceiverPID == 0 &&
             (BgWriterPID == 0 || !FatalError) &&
             WalWriterPID == 0 &&
             AutoVacPID == 0)
@@ -2961,7 +2919,6 @@ PostmasterStateMachine(void)
         {
             /* These other guys should be dead already */
             Assert(StartupPID == 0);
-            Assert(WalReceiverPID == 0);
             Assert(BgWriterPID == 0);
             Assert(WalWriterPID == 0);
             Assert(AutoVacPID == 0);
@@ -4119,9 +4076,10 @@ sigusr1_handler(SIGNAL_ARGS)

     if (CheckPostmasterSignal(PMSIGNAL_START_ARCHIVER) &&
         XLogArchivingActive() && PgArchPID == 0 &&
-        (pmState == PM_RUN ||
+        (pmState == PM_RUN /*  XXX postmaster doesn't know if walreceiver is active
+                               ||
          ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
-          WalRcvInProgress())))
+         WalRcvInProgress()) */))
     {
         /*
          * Start archiver process. This is mainly called for archiving during
@@ -4173,12 +4131,6 @@ sigusr1_handler(SIGNAL_ARGS)
         RegisterWalSender();
     }

-    if (CheckPostmasterSignal(PMSIGNAL_START_WALRECEIVER))
-    {
-        /* The startup process wants us to start a walreceiver */
-        WalReceiverPID = StartWalReceiver();
-    }
-
     PG_SETMASK(&UnBlockSig);

     errno = save_errno;
@@ -4372,10 +4324,6 @@ StartChildProcess(AuxProcType type)
                 ereport(LOG,
                         (errmsg("could not fork WAL writer process: %m")));
                 break;
-            case WalReceiverProcess:
-                ereport(LOG,
-                        (errmsg("could not fork WAL receiver process: %m")));
-                break;
             default:
                 ereport(LOG,
                         (errmsg("could not fork process: %m")));
diff --git a/src/backend/postmaster/walreceiver.c b/src/backend/postmaster/walreceiver.c
deleted file mode 100644
index a07b1f2..0000000
--- a/src/backend/postmaster/walreceiver.c
+++ /dev/null
@@ -1,980 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * walreceiver.c
- *
- * The WAL receiver process (walreceiver) is new as of Postgres 8.5. It
- * takes charge of XLOG streaming receiver in the standby server. At first,
- * it is started by the postmaster and connects to the primary server,
- * when the startup process in the standby mode requests XLOG streaming
- * replication. It attempts to keep receiving XLOG records from the primary
- * server and writing them to the disk, as long as the connection is alive
- * (i.e., like any backend, there is an one to one relationship between
- * a connection and the walreceiver process). Also, it notifies the startup
- * process of the location of XLOG records available. This enables
- * the startup process to read XLOG records from XLOG stream and apply them
- * to make a replica of the primary database.
- *
- * Normal termination is by SIGTERM or an end-of-streaming message from the
- * primary server, which instructs the walreceiver to exit(0). Emergency
- * termination is by SIGQUIT; like any backend, the walreceiver will simply
- * abort and exit on SIGQUIT. A close of the connection and a FATAL error
- * are treated as not a crash but approximately normal termination.
- *
- * Portions Copyright (c) 2009-2009, PostgreSQL Global Development Group
- *
- *
- * IDENTIFICATION
- *      $PostgreSQL$
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include <unistd.h>
-
-#include "access/xlog_internal.h"
-#include "libpq-fe.h"
-#include "libpq/pqsignal.h"
-#include "miscadmin.h"
-#include "postmaster/walreceiver.h"
-#include "storage/fd.h"
-#include "storage/ipc.h"
-#include "storage/pmsignal.h"
-#include "storage/shmem.h"
-#include "utils/guc.h"
-#include "utils/memutils.h"
-#include "utils/resowner.h"
-
-static WalRcvData *WalRcv = NULL;
-
-/* streamConn is a PGconn object of a connection to walsender from walreceiver */
-static PGconn *streamConn;
-
-/* Path for the connection information file (relative to $PGDATA) */
-#define CONNINFO_FILENAME    "global/conninfo"
-
-/*
- * These variables are used similarly to openLogFile/Id/Seg/Off,
- * but for walreceiver to write the XLOG.
- */
-static int    recvFile = -1;
-static uint32 recvId = 0;
-static uint32 recvSeg = 0;
-static uint32 recvOff = 0;
-
-/*
- * ZeroedRecPtr indicates the byte position that we have already zeroed. It is
- * updated when walreceiver writes a half-filled page that needs to be zeroed.
- * ZeroedBuffer points a zeroed buffer used for zeroing.
- */
-static XLogRecPtr    ZeroedRecPtr = {0, 0};
-static char           *ZeroedBuffer;
-
-/* Recovery has been already triggered? */
-static bool foundTrigger = false;
-
-/*
- * Max number of times to retry walreceiver
- *
- * XXX: Should this number be user-configurable?
- */
-#define MAX_WALRCV_RETRIES 0
-
-/*
- * Advances when startup process retries to request walreceiver.
- * When walreceiver is not in progress, if this counter is smaller
- * than MAX_WALRCV_RETRIES, we retry to start walreceiver.
- */
-static int NumWalRcvRetries = 0;
-
-/* Flags set by interrupt handlers of walreceiver for later service in the main loop */
-static volatile sig_atomic_t got_SIGHUP = false;
-static volatile sig_atomic_t shutdown_requested = false;
-
-/* Signal handlers */
-static void WalRcvSigHupHandler(SIGNAL_ARGS);
-static void WalRcvShutdownHandler(SIGNAL_ARGS);
-static void WalRcvQuickDieHandler(SIGNAL_ARGS);
-
-/* Prototypes for private functions */
-static void WalRcvLoop(void);
-static void    InitWalRcv(void);
-static void    WalRcvKill(int code, Datum arg);
-static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced);
-static void XLogWalRcvFlush(XLogRecPtr recptr);
-static void WritePhysicalXLog(char *from, Size nbytes, int startoff);
-static char *read_conninfo_file(void);
-
-/* Main entry point for walreceiver process */
-void
-WalReceiverMain(void)
-{
-    MemoryContext walrcv_context;
-    char   *conninfo;
-
-    /* Mark walreceiver in progress */
-    InitWalRcv();
-
-    /*
-     * If possible, make this process a group leader, so that the postmaster
-     * can signal any child processes too.    (walreceiver probably never has
-     * any child processes, but for consistency we make all postmaster child
-     * processes do this.)
-     */
-#ifdef HAVE_SETSID
-    if (setsid() < 0)
-        elog(FATAL, "setsid() failed: %m");
-#endif
-
-    /* Properly accept or ignore signals the postmaster might send us */
-    pqsignal(SIGHUP, WalRcvSigHupHandler);        /* set flag to read config file */
-    pqsignal(SIGINT, SIG_IGN);
-    pqsignal(SIGTERM, WalRcvShutdownHandler);    /* request shutdown */
-    pqsignal(SIGQUIT, WalRcvQuickDieHandler);    /* hard crash time */
-    pqsignal(SIGALRM, SIG_IGN);
-    pqsignal(SIGPIPE, SIG_IGN);
-    pqsignal(SIGUSR1, SIG_IGN);
-    pqsignal(SIGUSR2, SIG_IGN);
-
-    /* Reset some signals that are accepted by postmaster but not here */
-    pqsignal(SIGCHLD, SIG_DFL);
-    pqsignal(SIGTTIN, SIG_DFL);
-    pqsignal(SIGTTOU, SIG_DFL);
-    pqsignal(SIGCONT, SIG_DFL);
-    pqsignal(SIGWINCH, SIG_DFL);
-
-    /* We allow SIGQUIT (quickdie) at all times */
-#ifdef HAVE_SIGPROCMASK
-    sigdelset(&BlockSig, SIGQUIT);
-#else
-    BlockSig &= ~(sigmask(SIGQUIT));
-#endif
-
-    /*
-     * Create a resource owner to keep track of our resources (not clear that
-     * we need this, but may as well have one).
-     */
-    CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
-
-    /*
-     * Create a memory context that we will do all our work in.  We do this so
-     * that we can reset the context during error recovery and thereby avoid
-     * possible memory leaks.  Formerly this code just ran in
-     * TopMemoryContext, but resetting that would be a really bad idea.
-     */
-    walrcv_context = AllocSetContextCreate(TopMemoryContext,
-                                              "Wal Receiver",
-                                              ALLOCSET_DEFAULT_MINSIZE,
-                                              ALLOCSET_DEFAULT_INITSIZE,
-                                              ALLOCSET_DEFAULT_MAXSIZE);
-    MemoryContextSwitchTo(walrcv_context);
-
-    /* Unblock signals (they were blocked when the postmaster forked us) */
-    PG_SETMASK(&UnBlockSig);
-
-    /* Get the starting XLOG location of XLOG streaming */
-    {
-        /* use volatile pointer to prevent code rearrangement */
-        volatile WalRcvData *walrcv = WalRcv;
-
-        SpinLockAcquire(&walrcv->mutex);
-        LogstreamResult = walrcv->LogstreamResult;
-        SpinLockRelease(&walrcv->mutex);
-
-        /* Report XLOG streaming progress in PS display */
-        ReportLogstreamResult();
-    }
-
-    /* Read the connection information used to connect with the primary */
-    conninfo = read_conninfo_file();
-
-    /* Set up a connection for XLOG streaming */
-    streamConn = PQstartXLogStreaming(conninfo,
-                                      LogstreamResult.Write.xlogid,
-                                      LogstreamResult.Write.xrecoff);
-    if (PQstatus(streamConn) != CONNECTION_OK)
-        ereport(FATAL,
-                (errmsg("could not connect to the primary server : %s",
-                        PQerrorMessage(streamConn))));
-    pfree(conninfo);
-
-    /*
-     * Confirm that the current timeline of the primary is the same
-     * as the recovery target timeline.
-     */
-    ThisTimeLineID = PQtimeline(streamConn);
-    if (ThisTimeLineID != WalRcv->RecoveryTargetTLI)
-        ereport(FATAL,
-                (errmsg("timeline %u of the primary does not match "
-                        "recovery target timeline %u",
-                        ThisTimeLineID, WalRcv->RecoveryTargetTLI)));
-
-    ZeroedBuffer = (char *) palloc0(XLOG_BLCKSZ);
-
-    /* Main loop of walreceiver */
-    WalRcvLoop();
-}
-
-/* Main loop of walreceiver process */
-static void
-WalRcvLoop(void)
-{
-    char       *buf;
-    bool        finishing_seg;
-    bool        fsync_requested;
-    int            len;
-    XLogRecPtr    recptr;
-
-    /* Loop until end-of-streaming or error */
-    for (;;)
-    {
-        bool    fsynced = false;
-
-        /*
-         * Emergency bailout if postmaster has died.  This is to avoid the
-         * necessity for manual cleanup of all postmaster children.
-         */
-        if (!PostmasterIsAlive(true))
-            exit(1);
-
-        /*
-         * Exit walreceiver if we're not in recovery. This should not happen,
-         * but cross-check the status here.
-         */
-        if (!RecoveryInProgress())
-            ereport(FATAL,
-                    (errmsg("cannot continue XLOG streaming, recovery has already ended")));
-
-        /* Process any requests or signals received recently */
-        if (got_SIGHUP)
-        {
-            got_SIGHUP = false;
-            ProcessConfigFile(PGC_SIGHUP);
-        }
-
-        /* Normal exit from the walreceiver is here */
-        if (shutdown_requested)
-            proc_exit(0);
-
-        /* Receive XLogData message (wait for new message to arrive) */
-        len = PQgetXLogData(streamConn, &buf,
-                            (int *) &recptr.xlogid, (int *) &recptr.xrecoff,
-                            (char *) &finishing_seg, (char *) &fsync_requested, 0);
-
-        if (len < 0)    /* end-of-streaming or error */
-            break;
-
-        if (buf == NULL)    /* should not happen */
-            continue;
-
-#ifdef REPLICATION_DEBUG
-        if (REPLICATION_DEBUG_ENABLED)
-            elog(LOG, "xlog recv result %X/%X:%s%s; write %X/%X; flush %X/%X",
-                 recptr.xlogid, recptr.xrecoff,
-                 finishing_seg ? " finishing_seg" : "",
-                 fsync_requested ? " fsync_requested" : "",
-                 LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff,
-                 LogstreamResult.Flush.xlogid, LogstreamResult.Flush.xrecoff);
-#endif
-
-        /*
-         * A level of synchronization between both servers depends on when
-         * the standby returns a "success" of XLOG streaming to the primary.
-         * For example, the following timings can be considered:
-         *
-         *     A "success" is returned after
-         *         #1 receiving the logs and locating them on a memory
-         *         #2 writing them to the disk
-         *         #3 fsyncing them to the disk
-         *         #4 replaying them
-         *         ...etc
-         *
-         * We can choose only #2 now.
-         *
-         * Note: In #1 and #2, the logs might disappear if the standby fails
-         * before writing them to certainly the disk sector. But, since such
-         * missing logs are guaranteed to exist in the primary side,
-         * the transaction is not lost in the whole system (i.e., the standby
-         * can recover all transactions from the primary).
-         */
-
-        XLogWalRcvWrite(buf, len, recptr, &fsynced);
-
-        /*
-         * The logs in the XLogData message were written successfully,
-         * so we mark the message already consumed.
-         */
-        PQmarkConsumed(streamConn);
-
-        /*
-         * If fsync is not requested or was already done, we send a "success"
-         * to the primary before issuing fsync for end-of-segment.
-         */
-        if (fsynced || !fsync_requested)
-        {
-            if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
-                                (int) fsynced) == -1)
-                ereport(FATAL,
-                        (errmsg("could not send a message to the primary: %s",
-                                PQerrorMessage(streamConn))));
-        }
-
-        /*
-         * If we just wrote the whole last page of a logfile segment but
-         * had not fsynced it yet, fsync the segment immediately.  This
-         * avoids having to go back and re-open prior segments when an
-         * fsync request comes along later.
-         *
-         * Of course, if asked to fsync but not, do so.
-         */
-        if (!fsynced && (fsync_requested || finishing_seg))
-        {
-            XLogWalRcvFlush(recptr);
-
-            if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
-                                1) == -1)
-                ereport(FATAL,
-                        (errmsg("could not send a message to the primary: %s",
-                                PQerrorMessage(streamConn))));
-
-            /*
-             * If the segment is ready to copy to archival storage,
-             * notify the archiver so.
-             */
-            if (finishing_seg && XLogArchivingActive())
-                XLogArchiveNotifySeg(recvId, recvSeg);
-
-            /*
-             * XXX: Should we signal bgwriter to start a restartpoint
-             * if we've consumed too much xlog since the last one, like
-             * in normal processing? But this is not worth doing unless
-             * a restartpoint can be created independently from a
-             * checkpoint record.
-             */
-        }
-    }
-
-    if (len == -1)    /* end-of-streaming */
-    {
-        PGresult *res;
-
-        res = PQgetResult(streamConn);
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            PQclear(res);
-            proc_exit(0);
-        }
-        PQclear(res);
-    }
-
-    /* error */
-    ereport(FATAL,
-            (errmsg("could not read xlog records: %s",
-                    PQerrorMessage(streamConn))));
-}
-
-/* Mark this walreceiver in progress */
-static void
-InitWalRcv(void)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-
-    /*
-     * WalRcv should be set up already (if we are a backend, we inherit
-     * this by fork() or EXEC_BACKEND mechanism from the postmaster).
-     */
-    if (walrcv == NULL)
-        elog(PANIC, "walreceiver control data uninitialized");
-
-    /* Make sure WalRcv is not in use */
-    if (walrcv->pid != 0)
-        elog(FATAL, "WalRcv is in use");
-
-    /* Arrange to clean up at walreceiver exit */
-    on_shmem_exit(WalRcvKill, 0);
-
-    /* Mark walreceiver in progress */
-    walrcv->pid = MyProcPid;
-}
-
-/*
- * Close a connection for XLOG streaming and mark this walreceiver
- * no longer in progress
- */
-static void
-WalRcvKill(int code, Datum arg)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-
-    PQfinish(streamConn);
-    walrcv->pid = 0;
-    walrcv->in_progress = false;
-}
-
-/* SIGHUP: set flag to re-read config file at next convenient time */
-static void
-WalRcvSigHupHandler(SIGNAL_ARGS)
-{
-    got_SIGHUP = true;
-}
-
-/* SIGTERM: set flag to exit normally */
-static void
-WalRcvShutdownHandler(SIGNAL_ARGS)
-{
-    if (CritSectionCount == 0)
-        proc_exit(0);
-
-    /* Delay shutdown if we are inside a critical section */
-    shutdown_requested = true;
-}
-
-/*
- * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
- *
- * Some backend has bought the farm,
- * so we need to stop what we're doing and exit.
- */
-static void
-WalRcvQuickDieHandler(SIGNAL_ARGS)
-{
-    PG_SETMASK(&BlockSig);
-
-    /*
-     * We DO NOT want to run proc_exit() callbacks -- we're here because
-     * shared memory may be corrupted, so we don't want to try to clean up our
-     * transaction.  Just nail the windows shut and get out of town.  Now that
-     * there's an atexit callback to prevent third-party code from breaking
-     * things by calling exit() directly, we have to reset the callbacks
-     * explicitly to make this work as intended.
-     */
-    on_exit_reset();
-
-    /*
-     * Note we do exit(2) not exit(0).    This is to force the postmaster into a
-     * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
-     * backend.  This is necessary precisely because we don't clean up our
-     * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
-     * should ensure the postmaster sees this as a crash, too, but no harm
-     * in being doubly sure.)
-     */
-    exit(2);
-}
-
-/* Report shared-memory space needed by WalRcvShmemInit */
-Size
-WalRcvShmemSize(void)
-{
-    Size size = 0;
-
-    size = add_size(size, sizeof(WalRcvData));
-
-    return size;
-}
-
-/* Allocate and initialize walreceiver-related shared memory */
-void
-WalRcvShmemInit(void)
-{
-    bool    found;
-
-    WalRcv = (WalRcvData *)
-        ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
-
-    if (WalRcv == NULL)
-        ereport(FATAL,
-                (errcode(ERRCODE_OUT_OF_MEMORY),
-                 errmsg("not enough shared memory for walreceiver")));
-    if (found)
-        return;                    /* already initialized */
-
-    /* Initialize the data structures */
-    MemSet(WalRcv, 0, WalRcvShmemSize());
-    SpinLockInit(&WalRcv->mutex);
-}
-
-/* Is walreceiver in progress (or just starting up)? */
-bool
-WalRcvInProgress(void)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-
-    return walrcv->in_progress;
-}
-
-/*
- * Write the log to disk.
- *
- * fsynced is set to true if the log was fsyned by O_DIRECT.
- */
-static void
-XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
-{
-    int        startoff;
-    int        endoff;
-
-    START_CRIT_SECTION();
-
-    if (!XLByteInPrevSeg(recptr, recvId, recvSeg))
-    {
-        bool    use_existent;
-
-        /*
-         * XLOG segment files will be re-read in recovery operation soon,
-         * so we don't need to advise the OS to release any cache page.
-         */
-        if (recvFile >= 0 && close(recvFile))
-            ereport(PANIC,
-                    (errcode_for_file_access(),
-                     errmsg("could not close log file %u, segment %u: %m",
-                            recvId, recvSeg)));
-        recvFile = -1;
-
-        /* Create/use new log file */
-        XLByteToPrevSeg(recptr, recvId, recvSeg);
-        use_existent = true;
-        recvFile = XLogFileInit(recvId, recvSeg,
-                                  &use_existent, true);
-        recvOff = 0;
-    }
-
-    /* Make sure we have the current logfile open */
-    if (recvFile < 0)
-    {
-        XLByteToPrevSeg(recptr, recvId, recvSeg);
-        recvFile = XLogFileOpen(recvId, recvSeg);
-        recvOff = 0;
-    }
-
-    /* Calculate the start/end file offset of the received logs */
-    endoff = recptr.xrecoff % XLogSegSize;
-    startoff = ((endoff == 0) ? XLogSegSize : endoff) - len;
-
-    /*
-     * Re-zero the page so that bytes beyond what we've written will look
-     * like zeroes and not valid XLOG records. Only end page which we are
-     * writing need to be zeroed. Of course, we can skip zeroing the pages
-     * full of the XLOG records. Save the end position of the already zeroed
-     * area at the variable ZeroedRecPtr, and avoid zeroing the same page
-     * two or more times.
-     *
-     * This must precede the writing of the actual logs. Otherwise, a crash
-     * before re-zeroing would cause a corrupted page.
-     */
-    if (XLByteLT(ZeroedRecPtr, recptr) && endoff % XLOG_BLCKSZ != 0)
-    {
-        int        zlen;
-
-        zlen = XLOG_BLCKSZ - endoff % XLOG_BLCKSZ;
-        WritePhysicalXLog(ZeroedBuffer, zlen, endoff);
-        ZeroedRecPtr = recptr;
-        ZeroedRecPtr.xrecoff += zlen;
-    }
-
-    /* Write out the logs */
-    WritePhysicalXLog(buf, len, startoff);
-    LogstreamResult.Send    = recptr;
-    LogstreamResult.Write    = recptr;
-
-    if (sync_method == SYNC_METHOD_OPEN ||
-        sync_method == SYNC_METHOD_OPEN_DSYNC)
-    {
-        LogstreamResult.Flush = recptr;
-        *fsynced = true;        /* logs were already fsynced */
-    }
-
-    /* Update shared-memory status */
-    {
-        /* use volatile pointer to prevent code rearrangement */
-        volatile WalRcvData *walrcv = WalRcv;
-
-        SpinLockAcquire(&walrcv->mutex);
-        XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send);
-        XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write);
-        if (*fsynced)
-            XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
-        SpinLockRelease(&walrcv->mutex);
-    }
-
-    /* Report XLOG streaming progress in PS display */
-    ReportLogstreamResult();
-
-    END_CRIT_SECTION();
-}
-
-/* Flush the log to disk */
-static void
-XLogWalRcvFlush(XLogRecPtr recptr)
-{
-    START_CRIT_SECTION();
-
-    issue_xlog_fsync(recvFile, recvId, recvSeg);
-
-    LogstreamResult.Flush = recptr;
-
-    /* Update shared-memory status */
-    {
-        /* use volatile pointer to prevent code rearrangement */
-        volatile WalRcvData *walrcv = WalRcv;
-
-        SpinLockAcquire(&walrcv->mutex);
-        XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
-        SpinLockRelease(&walrcv->mutex);
-    }
-
-    END_CRIT_SECTION();
-}
-
-/* Physical write to the given logs */
-static void
-WritePhysicalXLog(char *from, Size nbytes, int startoff)
-{
-    /* Need to seek in the file? */
-    if (recvOff != startoff)
-    {
-        if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
-            ereport(PANIC,
-                    (errcode_for_file_access(),
-                     errmsg("could not seek in log file %u, "
-                            "segment %u to offset %u: %m",
-                            recvId, recvSeg, startoff)));
-        recvOff = startoff;
-    }
-
-    /* OK to write the logs */
-    errno = 0;
-    if (write(recvFile, from, nbytes) != nbytes)
-    {
-        /* if write didn't set errno, assume no disk space */
-        if (errno == 0)
-            errno = ENOSPC;
-        ereport(PANIC,
-                (errcode_for_file_access(),
-                 errmsg("could not write to log file %u, segment %u "
-                        "at offset %u, length %lu: %m",
-                        recvId, recvSeg,
-                        recvOff, (unsigned long) nbytes)));
-    }
-
-    /* Update state for write */
-    recvOff += nbytes;
-}
-
-/*
- * Wait for the XLOG records at given position available.
- *
- * The XLOG records already written by walreceiver are regarded as
- * available.
- *
- * recptr: indicates the byte position which caller wants to read the
- * XLOG record up to.
- *
- * Called by the startup process in streaming recovery.
- */
-void
-WaitNextXLogAvailable(XLogRecPtr recptr)
-{
-    struct stat stat_buf;
-    bool signaled = false;
-
-#ifdef REPLICATION_DEBUG
-    if (REPLICATION_DEBUG_ENABLED)
-        elog(LOG, "xlog wait request %X/%X; write %X/%X",
-             recptr.xlogid, recptr.xrecoff,
-             LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
-#endif
-
-    /* Quick exit if already known available */
-    if (XLByteLT(recptr, LogstreamResult.Write))
-        return;
-
-    for (;;)
-    {
-        /* use volatile pointer to prevent code rearrangement */
-        volatile WalRcvData *walrcv = WalRcv;
-
-        /* Update local status */
-        SpinLockAcquire(&walrcv->mutex);
-        LogstreamResult = walrcv->LogstreamResult;
-        SpinLockRelease(&walrcv->mutex);
-
-        /* If available already, leave here */
-        if (XLByteLT(recptr, LogstreamResult.Write))
-            return;
-
-        /* Check to see if the trigger file exists */
-        if (TriggerFile != NULL && !foundTrigger &&
-            stat(TriggerFile, &stat_buf) == 0)
-        {
-            ereport(LOG,
-                    (errmsg("trigger file found: %s", TriggerFile)));
-            foundTrigger = true;
-            unlink(TriggerFile);
-        }
-
-        /*
-         * The presence of a trigger file shuts down walreceiver if it's
-         * in progress.
-         */
-        if (WalRcvInProgress())
-        {
-            pid_t    pid = walrcv->pid;
-
-            if (foundTrigger && !signaled && pid != 0)
-            {
-                kill(pid, SIGTERM);
-                signaled = true;    /* prevents signal from being repeated */
-            }
-        }
-        /*
-         * If walreceiver is not in progress and has been retried more than
-         * MAX_WALRCV_RETRIES times, give up on the wait for the next record,
-         * which would cause a streaming recovery to end. If the former
-         * condition is met and the retry-count has not reached the maximum
-         * number yet, request XLOG streaming again.
-         */
-        else
-        {
-            if (NumWalRcvRetries < MAX_WALRCV_RETRIES && !foundTrigger)
-            {
-                /*
-                 * Since recovery target timeline has already been shared with
-                 * upcoming walreceiver, we pass 0 to RequestXLogStreaming()
-                 * as timeline (i.e., shared timeline variable is not updated).
-                 */
-                RequestXLogStreaming(0, recptr);
-                NumWalRcvRetries++;
-            }
-            else
-                return;
-        }
-
-        /*
-         * This possibly-long loop needs to handle interrupts of startup
-         * process.
-         */
-        HandleStartupProcInterrupts();
-
-        pg_usleep(100000L); /* 100ms */
-    }
-}
-
-/* Ensure that walreceiver has already exited */
-void
-ShutdownWalRcv(void)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-    pid_t    pid = walrcv->pid;
-
-    if (pid != 0)
-        kill(pid, SIGTERM);
-
-    while (WalRcvInProgress())
-    {
-        /*
-         * This possibly-long loop needs to handle interrupts of startup
-         * process.
-         */
-        HandleStartupProcInterrupts();
-
-        pg_usleep(100000);        /* 100ms */
-    }
-}
-
-/* Wait until a trigger file is found */
-void
-WaitForTrigger(void)
-{
-    int    seconds_before_warning = 15;
-    int    elapsed    = 0;
-    int count    = 0;
-    struct stat stat_buf;
-
-    /* Quick exit if a trigger file was not specified or was already found */
-    if (TriggerFile == NULL || foundTrigger)
-        return;
-
-    while (stat(TriggerFile, &stat_buf) != 0)
-    {
-        /*
-         * This possibly-long loop needs to handle interrupts of startup
-         * process.
-         */
-        HandleStartupProcInterrupts();
-
-        pg_usleep(100000L);        /* 100ms */
-
-        if (++count >= 10)        /* 1s passed */
-        {
-            count = 0;
-
-            if (++elapsed >= seconds_before_warning)
-            {
-                seconds_before_warning *= 2;     /* This wraps in >10 years... */
-                ereport(WARNING,
-                        (errmsg("still waiting for the trigger file \"%s\" (%d seconds elapsed)",
-                                TriggerFile, elapsed)));
-            }
-        }
-    }
-
-    ereport(LOG,
-            (errmsg("trigger file found: %s", TriggerFile)));
-    unlink(TriggerFile);
-}
-
-/*
- * Request postmaster to start the processes required for XLOG streaming.
- *
- * tli: recovery target timeline. If it's not 0, share it with upcoming
- * walreceiver.
- *
- * recptr: indicates the position where we failed in reading a record.
- */
-void
-RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-
-    /*
-     * Calculate the start position of XLOG streaming. If we need to read
-     * a record in the middle of a segment which doesn't exist in pg_xlog,
-     * the start position has to be the head of the segment which that
-     * record belongs to. Which is necessary for preventing an immature
-     * segment (i.e., there is no record in the first half of a segment)
-     * from being created by XLOG streaming.
-     */
-    if (recptr.xrecoff % XLogSegSize != 0)
-    {
-        char        xlogpath[MAXPGPATH];
-        struct stat    stat_buf;
-        uint32        log;
-        uint32        seg;
-
-        XLByteToSeg(recptr, log, seg);
-        XLogFilePath(xlogpath, tli, log, seg);
-
-        if (stat(xlogpath, &stat_buf) != 0)
-            recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
-    }
-
-    LogstreamResult.Send    = recptr;
-    LogstreamResult.Write    = recptr;
-    LogstreamResult.Flush    = recptr;
-
-    SpinLockAcquire(&walrcv->mutex);
-    walrcv->LogstreamResult    = LogstreamResult;
-    if (tli != 0)
-        walrcv->RecoveryTargetTLI = tli;
-    walrcv->in_progress = true;        /* Mark that walreceiver is in progress */
-    SpinLockRelease(&walrcv->mutex);
-
-    SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
-
-    /* Start archiver to archive xlog segments written by walreceiver */
-    if (XLogArchivingActive())
-        SendPostmasterSignal(PMSIGNAL_START_ARCHIVER);
-}
-
-/*
- * Returns the byte position that walreceiver has written
- */
-XLogRecPtr
-GetWalRcvWriteRecPtr(void)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-    XLogRecPtr    recptr;
-
-    SpinLockAcquire(&walrcv->mutex);
-    recptr = walrcv->LogstreamResult.Write;
-    SpinLockRelease(&walrcv->mutex);
-
-    return recptr;
-}
-
-/* Write the connection information to the file */
-void
-write_conninfo_file(char *conninfo)
-{
-    FILE   *fp;
-
-    fp = AllocateFile(CONNINFO_FILENAME, "w");
-    if (!fp)
-    {
-        ereport(FATAL,
-                (errcode_for_file_access(),
-                 errmsg("could not write to file \"%s\": %m",
-                        CONNINFO_FILENAME)));
-    }
-
-    /*
-     * The format is:
-     *
-     *     conninfo string, null terminated
-     *
-     * If a connection information was not supplied (e.g., recovery.conf did not
-     * specify primary_conninfo parameter), an empty string is written, which
-     * means that the default values that are available from the environment etc
-     * are used for connection of XLOG streaming.
-     *
-     * Add 'replication' as the database name to connect to, into the tail of
-     * conninfo. Since libpq prefers a posteriorly-located setting, the database
-     * name specified by an user is always ignored.
-     */
-    if (conninfo != NULL)
-        fprintf(fp, "%s", conninfo);
-    fputs(" dbname=replication", fp);
-    fputc(0, fp);
-
-    if (FreeFile(fp))
-    {
-        ereport(FATAL,
-                (errcode_for_file_access(),
-                 errmsg("could not write to file \"%s\": %m",
-                        CONNINFO_FILENAME)));
-    }
-}
-
-/* Return a malloc'd connection information read from the file */
-static char *
-read_conninfo_file(void)
-{
-    FILE           *fp;
-    StringInfoData    buf;
-    int                ch;
-    char           *conninfo;
-
-    initStringInfo(&buf);
-
-    fp = AllocateFile(CONNINFO_FILENAME, "r");
-    if (!fp)
-    {
-        ereport(FATAL,
-                (errcode_for_file_access(),
-                 errmsg("could not read from file \"%s\": %m",
-                        CONNINFO_FILENAME)));
-    }
-
-    /* Read a string to a null-termination or the end of the file */
-    for (;;)
-    {
-        ch = fgetc(fp);
-        if (ch == 0 || ch == EOF)
-            break;
-
-        appendStringInfoChar(&buf, (char) ch);
-    }
-
-    FreeFile(fp);
-
-    conninfo = pstrdup(buf.data);
-    pfree(buf.data);
-
-    return conninfo;
-}
diff --git a/src/backend/postmaster/walsender.c b/src/backend/postmaster/walsender.c
index 28566de..2c46511 100644
--- a/src/backend/postmaster/walsender.c
+++ b/src/backend/postmaster/walsender.c
@@ -50,8 +50,13 @@
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/ps_status.h"
 #include "utils/resowner.h"

+/* Private, possibly out-of-date copy of shared LogstreamResult */
+extern XLogstreamResult LogstreamResult;
+
+
 WalSndCtlData *WalSndCtl = NULL;
 static WalSnd *MyWalSnd = NULL;

@@ -481,8 +486,9 @@ XLogSend(PendingMessage inMsg, PendingMessage outMsg)
     XLogRecPtr    SendRqstPtr;

     /*
-     * Invalid position means that XLOG streaming is not started yet,
-     * so we do nothing here.
+     * Invalid position means that we have not yet received the initial
+     * XLogRecPtr message from the standby that indicates where to start
+     * streaming.
      */
     if (XLogRecPtrIsInvalid(LogstreamResult.Send))
         return true;
@@ -491,7 +497,7 @@ XLogSend(PendingMessage inMsg, PendingMessage outMsg)
     SendRqstPtr = GetWriteRecPtr();

 #ifdef REPLICATION_DEBUG
-    if (REPLICATION_DEBUG_ENABLED)
+    if (REPLICATION_DEBUG_ENABLED && XLByteLT(LogstreamResult.Send, SendRqstPtr))
         elog(LOG, "xlog send request %X/%X; send %X/%X; write %X/%X",
              SendRqstPtr.xlogid, SendRqstPtr.xrecoff,
              LogstreamResult.Send.xlogid, LogstreamResult.Send.xrecoff,
@@ -911,3 +917,16 @@ UpdateOldestLogstreamResult(void)
     LogstreamResult = oldest;
     return found;
 }
+
+
+/* Show the current XLOG streaming write position in the PS display */
+void
+ReportLogstreamResult(void)
+{
+    XLogRecPtr  written = LogstreamResult.Write;
+    char        psmsg[50];
+
+    snprintf(psmsg, sizeof(psmsg), "streaming %X/%X",
+             written.xlogid, written.xrecoff);
+    set_ps_display(psmsg, false);
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index b5f7260..ff3e659 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,7 +25,6 @@
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
-#include "postmaster/walreceiver.h"
 #include "postmaster/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/ipc.h"
@@ -119,7 +118,6 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
         size = add_size(size, BgWriterShmemSize());
         size = add_size(size, AutoVacuumShmemSize());
         size = add_size(size, WalSndShmemSize());
-        size = add_size(size, WalRcvShmemSize());
         size = add_size(size, BTreeShmemSize());
         size = add_size(size, SyncScanShmemSize());
 #ifdef EXEC_BACKEND
@@ -218,7 +216,6 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
     BgWriterShmemInit();
     AutoVacuumShmemInit();
     WalSndShmemInit();
-    WalRcvShmemInit();

     /*
      * Set up other modules that need some shared memory space
diff --git a/src/bin/walreceiver/Makefile b/src/bin/walreceiver/Makefile
new file mode 100644
index 0000000..28932fb
--- /dev/null
+++ b/src/bin/walreceiver/Makefile
@@ -0,0 +1,38 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/bin/walreceiver
+#
+# Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# $PostgreSQL$
+#
+#-------------------------------------------------------------------------
+
+PGFILEDESC = "PostgreSQL WAL receiver utility"
+subdir = src/bin/walreceiver
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS = walreceiver.o $(WIN32RES)
+
+all: submake-libpq walreceiver
+
+%: %.o $(WIN32RES)
+    $(CC) $(CFLAGS) $^ $(libpq_pgport) $(LDFLAGS) $(LIBS) -o $@$(X)
+
+walreceiver: $(OBJS)
+
+install: all installdirs
+    $(INSTALL_PROGRAM) walreceiver$(X)   '$(DESTDIR)$(bindir)'/walreceiver$(X)
+
+installdirs:
+    $(MKDIR_P) '$(DESTDIR)$(bindir)'
+
+uninstall:
+    rm -f '$(DESTDIR)$(bindir)/walreceiver$(X)'
+
+clean distclean maintainer-clean:
+    rm -f walreceiver$(X) $(OBJS)
diff --git a/src/bin/walreceiver/walreceiver.c b/src/bin/walreceiver/walreceiver.c
new file mode 100644
index 0000000..01e6f07
--- /dev/null
+++ b/src/bin/walreceiver/walreceiver.c
@@ -0,0 +1,611 @@
+/*-------------------------------------------------------------------------
+ *
+ * walreceiver.c
+ *
+ * The WAL receiver process (walreceiver) is new as of Postgres 8.5. It
+ * takes charge of receiving the XLOG stream in the standby server. It is
+ * launched by the startup process and connects to the primary server.
+ * It keeps receiving XLOG records from the primary server and writing
+ * them to disk for as long as the connection is alive. It also notifies
+ * the startup process of the location up to which XLOG records are
+ * available, by printing that location to stdout, which is the pipe the
+ * startup process reads from. This enables the startup process to read
+ * XLOG records from the stream and apply them to make a replica of the
+ * primary database.
+ *
+ * Normal termination is by SIGTERM or an end-of-streaming message from the
+ * primary server, which instructs the walreceiver to exit(0). Emergency
+ * termination is by SIGQUIT; like backends, walreceiver will simply
+ * abort and exit on SIGQUIT. A close of the connection and a FATAL error
+ * are treated as not a crash but approximately normal termination.
+ *
+ * Exit status: 0 at end of streaming; 1 for bad arguments, connection,
+ * timeline, streaming, out-of-memory and reporting errors; 2 on SIGQUIT
+ * or if a segment file cannot be opened; 3 for other segment file I/O errors.
+ *
+ * Portions Copyright (c) 2009-2009, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *      $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <unistd.h>
+
+#include "access/xlog_internal.h"
+#include "libpq-fe.h"
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/pmsignal.h"
+#include "storage/shmem.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+/* libpq connection to the primary server. */
+static PGconn *streamConn = NULL;
+
+/*
+ * These variables are used similarly to openLogFile/Id/Seg/Off in xlog.c
+ */
+static int    recvFile = -1;
+static uint32 recvId = 0;
+static uint32 recvSeg = 0;
+static uint32 recvOff = 0;
+
+/*
+ * ZeroedRecPtr indicates the byte position that we have already zeroed. It is
+ * updated when walreceiver writes a half-filled page that needs to be zeroed.
+ * ZeroedBuffer points to an XLOG_BLCKSZ-byte zeroed buffer used for zeroing.
+ */
+static XLogRecPtr    ZeroedRecPtr = {0, 0};
+static char           *ZeroedBuffer;
+
+/* Signal handlers */
+static void WalRcvQuickDieHandler(SIGNAL_ARGS);
+
+/* Prototypes for private functions */
+static void WalRcvLoop(void);
+static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced);
+static void XLogWalRcvFlush(XLogRecPtr recptr);
+static void WritePhysicalXLog(char *from, Size nbytes, int startoff);
+static int OpenPhysicalXLog(uint32 log, uint32 seg);
+
+/*
+ * Defined at the bottom of this file; declared here so that the call in
+ * XLogWalRcvWrite() does not rely on an implicit function declaration.
+ */
+extern void ReportLogstreamResult(void);
+
+/*
+ * writtenPtr: XLOG position up to which data has been written (initialized
+ * to the starting location given on the command line).
+ * flushedPtr: XLOG position up to which data has been fsynced.
+ */
+static XLogRecPtr writtenPtr;
+static XLogRecPtr flushedPtr;
+
+/* Timeline of the primary; used to build segment file names */
+TimeLineID ThisTimeLineID;
+
+/* Print usage information to stdout (only used for --help) */
+static void
+usage(const char *progname)
+{
+    printf(_("%s is an internal utility to receive WAL from another PostgreSQL instance.\n\n"), progname);
+    printf(_("Usage:\n  %s <target TLI> <starting XLOG location> <connection string>\n"), progname);
+    printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+}
+
+
+/*
+ * Main entry point for walreceiver process.
+ *
+ * Usage: walreceiver <target TLI> <starting XLOG location> <conninfo>
+ * The TLI is decimal; the location is written as "%X/%X" (hex xlogid and
+ * xrecoff). Connects to the primary, checks that its timeline matches the
+ * target TLI and then streams XLOG until end-of-streaming or an error.
+ */
+int
+main(int argc, char *argv[])
+{
+    const char *progname = (argc > 0 && argv[0] != NULL) ? argv[0] : "walreceiver";
+    char       *conninfo;
+    size_t      conninfo_len;
+    unsigned long tli;
+    TimeLineID  RecoveryTargetTLI;
+    char       *s;
+
+    /* Recognize --help and --version even when no other arguments are given */
+    if (argc > 1)
+    {
+        if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+        {
+            usage(progname);
+            exit(0);
+        }
+        if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
+        {
+            puts("walreceiver (PostgreSQL) " PG_VERSION);
+            exit(0);
+        }
+    }
+
+    if (argc < 4)
+    {
+        /*
+         * Complain on stderr: stdout is the pipe the startup process reads
+         * XLOG locations from, so it must not receive usage text.
+         */
+        fprintf(stderr, _("%s: too few arguments\n"), progname);
+        fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+        exit(1);
+    }
+
+    /* Properly accept or ignore signals the postmaster might send us */
+    pqsignal(SIGHUP, SIG_IGN);
+    pqsignal(SIGINT, SIG_IGN);
+    pqsignal(SIGTERM, SIG_DFL);
+    pqsignal(SIGQUIT, WalRcvQuickDieHandler);    /* hard crash time */
+    pqsignal(SIGALRM, SIG_IGN);
+    pqsignal(SIGPIPE, SIG_IGN);
+    pqsignal(SIGUSR1, SIG_IGN);
+    pqsignal(SIGUSR2, SIG_IGN);
+
+    /* Reset some signals that are accepted by postmaster but not here */
+    pqsignal(SIGCHLD, SIG_DFL);
+    pqsignal(SIGTTIN, SIG_DFL);
+    pqsignal(SIGTTOU, SIG_DFL);
+    pqsignal(SIGCONT, SIG_DFL);
+    pqsignal(SIGWINCH, SIG_DFL);
+
+    /* We allow SIGQUIT (quickdie) at all times */
+#ifdef BROKEN
+#ifdef HAVE_SIGPROCMASK
+    sigdelset(&BlockSig, SIGQUIT);
+#else
+    BlockSig &= ~(sigmask(SIGQUIT));
+#endif
+
+    /* Unblock signals (they were blocked when the postmaster forked us) */
+    PG_SETMASK(&UnBlockSig);
+#endif
+
+    /* Get the recovery target timeline from the command line */
+    errno = 0;
+    tli = strtoul(argv[1], &s, 10);
+    if (s == argv[1] || *s != '\0' || errno == ERANGE ||
+        (unsigned long) (TimeLineID) tli != tli)
+    {
+        fprintf(stderr, "invalid TLI: %s\n", argv[1]);
+        exit(1);
+    }
+    RecoveryTargetTLI = (TimeLineID) tli;
+
+    /* Get the starting XLOG location from the command line */
+    if (sscanf(argv[2], "%X/%X", &writtenPtr.xlogid, &writtenPtr.xrecoff) != 2)
+    {
+        fprintf(stderr, "invalid recptr: %s\n", argv[2]);
+        exit(1);
+    }
+
+    /*
+     * Build the connection string used to connect with the primary by
+     * appending dbname=replication to the given connection information.
+     */
+    conninfo_len = strlen(argv[3]) + strlen(" dbname=replication") + 1;
+    conninfo = malloc(conninfo_len);
+    if (conninfo == NULL)
+    {
+        fprintf(stderr, "out of memory\n");
+        exit(1);
+    }
+    snprintf(conninfo, conninfo_len, "%s dbname=replication", argv[3]);
+
+    /* Set up a connection for XLOG streaming */
+    streamConn = PQstartXLogStreaming(conninfo,
+                                      writtenPtr.xlogid,
+                                      writtenPtr.xrecoff);
+    if (PQstatus(streamConn) != CONNECTION_OK)
+    {
+        fprintf(stderr, "could not connect to the primary server: %s\n",
+                PQerrorMessage(streamConn));
+        exit(1);
+    }
+
+    /*
+     * Confirm that the current timeline of the primary is the same
+     * as the recovery target timeline.
+     */
+    ThisTimeLineID = PQtimeline(streamConn);
+    if (ThisTimeLineID != RecoveryTargetTLI)
+    {
+        fprintf(stderr, "timeline %u of the primary does not match recovery target timeline %u\n",
+                ThisTimeLineID, RecoveryTargetTLI);
+        exit(1);
+    }
+
+    /* calloc() returns the buffer already zero-filled */
+    ZeroedBuffer = (char *) calloc(1, XLOG_BLCKSZ);
+    if (ZeroedBuffer == NULL)
+    {
+        fprintf(stderr, "out of memory\n");
+        exit(1);
+    }
+
+    /* Main loop of walreceiver; does not return */
+    WalRcvLoop();
+
+    return 0;
+}
+
+/*
+ * Main loop of walreceiver process.
+ *
+ * Receives XLogData messages, writes them to disk, fsyncs when needed and
+ * acknowledges to the primary. Never returns: exits with status 0 on a
+ * clean end-of-streaming and 1 on a streaming error.
+ */
+static void
+WalRcvLoop(void)
+{
+    char       *buf;
+    bool        finishing_seg;
+    bool        fsync_requested;
+    int            len;
+    XLogRecPtr    recptr;
+
+    /* Loop until end-of-streaming or error */
+    for (;;)
+    {
+        bool    fsynced = false;
+
+#ifdef NOT_USED
+        /*
+         * Emergency bailout if postmaster has died.  This is to avoid the
+         * necessity for manual cleanup of all postmaster children.
+         */
+        if (!PostmasterIsAlive(true))
+            exit(1);
+
+        /*
+         * Exit walreceiver if we're not in recovery. This should not happen,
+         * but cross-check the status here.
+         */
+        if (!RecoveryInProgress())
+        {
+            fprintf(stderr, "cannot continue XLOG streaming, recovery has already ended\n");
+            exit(1);
+        }
+#endif
+
+        /* Receive XLogData message (wait for new message to arrive) */
+        len = PQgetXLogData(streamConn, &buf,
+                            (int *) &recptr.xlogid, (int *) &recptr.xrecoff,
+                            (char *) &finishing_seg, (char *) &fsync_requested, 0);
+
+        if (len < 0)    /* end-of-streaming or error */
+            break;
+
+        if (buf == NULL)    /* should not happen */
+            continue;
+
+#ifdef REPLICATION_DEBUG
+        fprintf(stderr, "xlog recv result %X/%X:%s%s; write %X/%X; flush %X/%X\n",
+                recptr.xlogid, recptr.xrecoff,
+                finishing_seg ? " finishing_seg" : "",
+                fsync_requested ? " fsync_requested" : "",
+                writtenPtr.xlogid, writtenPtr.xrecoff,
+                flushedPtr.xlogid, flushedPtr.xrecoff);
+#endif
+
+        /*
+         * A level of synchronization between both servers depends on when
+         * the standby returns a "success" of XLOG streaming to the primary.
+         * For example, the following timings can be considered:
+         *
+         *     A "success" is returned after
+         *         #1 receiving the logs and keeping them in memory
+         *         #2 writing them to the disk
+         *         #3 fsyncing them to the disk
+         *         #4 replaying them
+         *         ...etc
+         *
+         * We can choose only #2 now.
+         *
+         * Note: In #1 and #2, the logs might disappear if the standby fails
+         * before they have certainly reached the disk. But, since such
+         * missing logs are guaranteed to exist in the primary side,
+         * the transaction is not lost in the whole system (i.e., the standby
+         * can recover all transactions from the primary).
+         */
+
+        XLogWalRcvWrite(buf, len, recptr, &fsynced);
+
+        /*
+         * The logs in the XLogData message were written successfully,
+         * so we mark the message already consumed.
+         */
+        PQmarkConsumed(streamConn);
+
+        /*
+         * If we just wrote the whole last page of a logfile segment but
+         * had not fsynced it yet, fsync the segment immediately.  This
+         * avoids having to go back and re-open prior segments when an
+         * fsync request comes along later.
+         *
+         * Likewise, if the primary asked for an fsync and the write did
+         * not already make the data durable, do it now.
+         */
+        if (!fsynced && (fsync_requested || finishing_seg))
+        {
+            XLogWalRcvFlush(recptr);
+
+            /*
+             * XXX: Should we signal bgwriter to start a restartpoint
+             * if we've consumed too much xlog since the last one, like
+             * in normal processing? But this is not worth doing unless
+             * a restartpoint can be created independently from a
+             * checkpoint record.
+             *
+             * Heikki:
+             * No. The startup process is responsible for that when it
+             * replays the WAL. We're just storing the WAL to disk, the
+             * checkpoint won't do anything before it's been replayed as well.
+             */
+        }
+
+        /*
+         * Acknowledge to the primary at the end of every segment, and
+         * otherwise only when an fsync was requested and the write itself
+         * already made the data durable; the fsynced flag is passed along.
+         *
+         * NOTE(review): nothing in this file ever sets fsynced to true
+         * (XLogWalRcvWrite does not assign it and XLogWalRcvFlush does not
+         * report back), so acks are currently sent only at segment ends,
+         * always with fsynced = 0, even right after XLogWalRcvFlush().
+         * Confirm this is what the primary expects for fsync requests.
+         */
+        if (finishing_seg || (fsynced && fsync_requested))
+        {
+            if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
+                                (int) fsynced) == -1)
+            {
+                fprintf(stderr, "could not send a message to the primary: %s\n",
+                        PQerrorMessage(streamConn));
+                exit(1);
+            }
+        }
+    }
+
+    if (len == -1)    /* end-of-streaming */
+    {
+        PGresult *res;
+
+        res = PQgetResult(streamConn);
+        if (PQresultStatus(res) == PGRES_COMMAND_OK)
+        {
+            PQclear(res);
+            exit(0);
+        }
+        PQclear(res);
+    }
+
+    /* error */
+    fprintf(stderr, "could not read xlog records: %s",
+            PQerrorMessage(streamConn));
+    exit(1);
+}
+
+/*
+ * WalRcvQuickDieHandler() runs when we receive SIGQUIT (e.g. from the
+ * postmaster).
+ *
+ * Some backend has bought the farm,
+ * so we need to stop what we're doing and exit.
+ */
+static void
+WalRcvQuickDieHandler(SIGNAL_ARGS)
+{
+#ifdef BROKEN
+    PG_SETMASK(&BlockSig);
+#endif
+
+    /*
+     * Use _exit(), not exit(): exit() is not async-signal-safe, and running
+     * atexit handlers or flushing stdio buffers from a signal handler that
+     * may have interrupted printf() could deadlock or emit a torn location.
+     */
+    _exit(2);
+}
+
+/*
+ * Write the received logs (buf/len, ending at XLOG position recptr) to disk,
+ * switching to another segment file when recptr is outside the open one,
+ * then report the new write position to the startup process.
+ *
+ * fsynced is meant to be set to true if the write itself made the log
+ * durable (e.g. by O_DIRECT); this function currently never sets it.
+ */
+static void
+XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
+{
+    int        startoff;
+    int        endoff;
+
+    /* Close the open segment file if recptr does not belong to it */
+    if (recvFile >= 0 && !XLByteInPrevSeg(recptr, recvId, recvSeg))
+    {
+        /*
+         * XLOG segment files will be re-read in recovery operation soon,
+         * so we don't need to advise the OS to release any cache page.
+         */
+        if (close(recvFile) != 0)
+        {
+            fprintf(stderr, "could not close log file %u, segment %u: %s\n",
+                    recvId, recvSeg, strerror(errno));
+            exit(3);
+        }
+        recvFile = -1;
+    }
+
+    /* Create/use the segment file that the logs ending at recptr go to */
+    if (recvFile < 0)
+    {
+        XLByteToPrevSeg(recptr, recvId, recvSeg);
+        recvFile = OpenPhysicalXLog(recvId, recvSeg);
+        recvOff = 0;
+    }
+
+    /* Calculate the start/end file offset of the received logs */
+    endoff = recptr.xrecoff % XLogSegSize;
+    startoff = ((endoff == 0) ? XLogSegSize : endoff) - len;
+
+    /*
+     * Re-zero the page so that bytes beyond what we've written will look
+     * like zeroes and not valid XLOG records. Only the end of the page that
+     * we wrote to needs to be zeroed. Of course, we can skip zeroing pages
+     * full of XLOG records. Save the end position of the already zeroed
+     * area in the variable ZeroedRecPtr, and avoid zeroing the same page
+     * two or more times.
+     *
+     * This must precede the writing of the actual logs. Otherwise, a crash
+     * before re-zeroing would cause a corrupted page.
+     */
+    if (XLByteLT(ZeroedRecPtr, recptr) && endoff % XLOG_BLCKSZ != 0)
+    {
+        int        zlen;
+
+        zlen = XLOG_BLCKSZ - endoff % XLOG_BLCKSZ;
+        WritePhysicalXLog(ZeroedBuffer, zlen, endoff);
+        ZeroedRecPtr = recptr;
+        ZeroedRecPtr.xrecoff += zlen;
+    }
+
+    /* Write out the logs */
+    WritePhysicalXLog(buf, len, startoff);
+    writtenPtr = recptr;
+
+    /*
+     * Let the startup process know how far we've advanced. stdout is a pipe
+     * and hence fully buffered, so flush explicitly; otherwise the location
+     * would only reach the startup process once the stdio buffer filled up.
+     * If it cannot be delivered (e.g. the startup process has gone away),
+     * there is no point in continuing.
+     */
+    printf("%X/%X\n", writtenPtr.xlogid, writtenPtr.xrecoff);
+    if (fflush(stdout) != 0)
+    {
+        fprintf(stderr, "could not report XLOG location to the startup process: %s\n",
+                strerror(errno));
+        exit(1);
+    }
+
+    /* Report XLOG streaming progress in PS display */
+    ReportLogstreamResult();
+}
+
+/*
+ * Flush the open segment file to disk and record recptr as flushed.
+ *
+ * An fsync() failure means the log may not be durable; it must not be
+ * ignored, so treat it like other I/O errors on segment files.
+ */
+static void
+XLogWalRcvFlush(XLogRecPtr recptr)
+{
+    if (fsync(recvFile) != 0)
+    {
+        fprintf(stderr, "could not fsync log file %u, segment %u: %s\n",
+                recvId, recvSeg, strerror(errno));
+        exit(3);
+    }
+
+    flushedPtr = recptr;
+}
+
+/*
+ * Write nbytes from "from" to the open segment file at offset startoff,
+ * seeking first if needed. Exits with status 3 on any error; a short
+ * write that does not set errno is reported as ENOSPC.
+ */
+static void
+WritePhysicalXLog(char *from, Size nbytes, int startoff)
+{
+    /* Need to seek in the file? */
+    if (recvOff != startoff)
+    {
+        if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
+        {
+            fprintf(stderr, "could not seek in log file %u, segment %u to offset %u: %s\n",
+                    recvId, recvSeg, startoff, strerror(errno));
+            exit(3);
+        }
+        recvOff = startoff;
+    }
+
+    /* OK to write the logs */
+    errno = 0;
+    if (write(recvFile, from, nbytes) != nbytes)
+    {
+        /* if write didn't set errno, assume no disk space */
+        if (errno == 0)
+            errno = ENOSPC;
+        fprintf(stderr, "could not write to log file %u, segment %u "
+                        "at offset %u, length %lu: %s\n",
+                recvId, recvSeg,
+                recvOff, (unsigned long) nbytes, strerror(errno));
+        exit(3);
+    }
+
+    /* Update state for write */
+    recvOff += nbytes;
+}
+
+/*
+ * Open (creating it if necessary) the segment file for log/seg on
+ * ThisTimeLineID for reading and writing. Exits with status 2 on failure.
+ */
+static int
+OpenPhysicalXLog(uint32 log, uint32 seg)
+{
+    char        path[MAXPGPATH];
+    int            fd;
+
+    XLogFilePath(path, ThisTimeLineID, log, seg);
+
+    /*
+     * Try to use existent file (checkpoint maker may have created it already)
+     */
+    fd = open(path, O_RDWR | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+    if (fd < 0)
+    {
+        fprintf(stderr, "could not open file \"%s\" (log file %u, segment %u): %s\n",
+                path, log, seg, strerror(errno));
+        exit(2);
+    }
+    return fd;
+}
+
+/*
+ * Report XLOG streaming progress in PS display.
+ *
+ * Currently a no-op: the body is compiled only when BROKEN is defined.
+ */
+void
+ReportLogstreamResult(void)
+{
+#ifdef BROKEN
+    char    activitymsg[50];
+
+    snprintf(activitymsg, sizeof(activitymsg),
+             "streaming %X/%X",
+             writtenPtr.xlogid, writtenPtr.xrecoff);
+    set_ps_display(activitymsg, false);
+#endif
+}
+
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0e32f04..8ae62fe 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -191,22 +191,6 @@ typedef struct CheckpointStatsData

 extern CheckpointStatsData CheckpointStats;

-/*
- * LogstreamResult indicates the byte positions that we have already
- * sent/written/fsynced. This is used for management of XLOG streaming.
- */
-typedef struct
-{
-    XLogRecPtr    Send;    /* last byte + 1 sent to the standby */
-    XLogRecPtr    Write;    /* last byte + 1 written out in the standby */
-    XLogRecPtr    Flush;    /* last byte + 1 flushed in the standby */
-} XLogstreamResult;
-
-extern XLogstreamResult LogstreamResult;
-
-extern char *TriggerFile;
-
-
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
 extern void XLogFlush(XLogRecPtr RecPtr);
 extern void XLogBackgroundFlush(void);
diff --git a/src/include/postmaster/walreceiver.h b/src/include/postmaster/walreceiver.h
deleted file mode 100644
index 8e34172..0000000
--- a/src/include/postmaster/walreceiver.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * walreceiver.h
- *      Exports from postmaster/walreceiver.c.
- *
- * Portions Copyright (c) 2009-2009, PostgreSQL Global Development Group
- *
- * $PostgreSQL$
- *
- *-------------------------------------------------------------------------
- */
-#ifndef _WALRECEIVER_H
-#define _WALRECEIVER_H
-
-#include "storage/spin.h"
-
-/* Shared memory area for management of walreceiver process */
-typedef struct
-{
-    pid_t    pid;            /* walreceiver's process id, or 0 */
-
-    /*
-     * in_progress indicates whether walreceiver is in progress
-     * (or just starting up). This flag is set to TRUE when
-     * startup process requests walreceiver to start XLOG streaming,
-     * and FALSE when walreceiver exits.
-     */
-    bool    in_progress;
-
-    /*
-     * LogstreamResult indicates the byte positions that have been
-     * already streamed. This is shared by walreceiver and startup
-     * process, and used to advance XLOG streaming and recovery
-     * cooperatively.
-     */
-    XLogstreamResult LogstreamResult;
-
-    /*
-     * recovery target timeline; must be the same as the current
-     * timeline of the primary.
-     */
-    TimeLineID    RecoveryTargetTLI;
-
-    slock_t    mutex;        /* locks shared variables shown above */
-} WalRcvData;
-
-extern void    WalReceiverMain(void);
-extern Size WalRcvShmemSize(void);
-extern void WalRcvShmemInit(void);
-extern bool WalRcvInProgress(void);
-extern void WaitNextXLogAvailable(XLogRecPtr recptr);
-extern void ShutdownWalRcv(void);
-extern void WaitForTrigger(void);
-extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr);
-extern XLogRecPtr GetWalRcvWriteRecPtr(void);
-extern void write_conninfo_file(char *conninfo);
-
-#endif   /* _WALRECEIVER_H */
diff --git a/src/include/postmaster/walsender.h b/src/include/postmaster/walsender.h
index e547cb3..bd669e1 100644
--- a/src/include/postmaster/walsender.h
+++ b/src/include/postmaster/walsender.h
@@ -16,6 +16,17 @@
 #include "storage/spin.h"

 /*
+ * XLogstreamResult holds the XLOG positions (last byte + 1) sent to, written
+ * out in, and flushed in the standby. Used to manage XLOG streaming.
+ */
+typedef struct
+{
+    XLogRecPtr    Send;    /* last byte + 1 sent to the standby */
+    XLogRecPtr    Write;    /* last byte + 1 written out in the standby */
+    XLogRecPtr    Flush;    /* last byte + 1 flushed in the standby */
+} XLogstreamResult;
+
+/*
  * Each walsender has a WalSnd struct in shared memory.
  *
  * links: list link for any list the WalSnd struct is in. A recycled WalSnd

pgsql-hackers by date:

Previous
From: Pavel Stehule
Date:
Subject: Re: patch: Review handling of MOVE and FETCH (ToDo)
Next
From: Fujii Masao
Date:
Subject: Re: Streaming Replication patch for CommitFest 2009-09