Thread: Streaming replication, retrying from archive

Streaming replication, retrying from archive

From
Heikki Linnakangas
Date:
Imagine this scenario:

1. Master is up and running, standby is connected and streaming happily
2. Network goes down, connection is broken.
3. Standby falls behind a lot. Old WAL files that the standby needs are
archived, and deleted from master.
4. Network is restored. Standby reconnects
5. Standby will get an error because the WAL file it needs is not in the
master anymore.

What will currently happen is:

6, Standby retries connecting and failing indefinitely, until the admin
restarts it.

What we would *like* to happen is:

6. Standby fetches the missing WAL files from archive, then reconnects
and continues streaming.

Can we fix that?

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming replication, retrying from archive

From
Robert Haas
Date:
On Thu, Jan 14, 2010 at 9:15 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> Imagine this scenario:
>
> 1. Master is up and running, standby is connected and streaming happily
> 2. Network goes down, connection is broken.
> 3. Standby falls behind a lot. Old WAL files that the standby needs are
> archived, and deleted from master.
> 4. Network is restored. Standby reconnects
> 5. Standby will get an error because the WAL file it needs is not in the
> master anymore.
>
> What will currently happen is:
>
> 6, Standby retries connecting and failing indefinitely, until the admin
> restarts it.
>
> What we would *like* to happen is:
>
> 6. Standby fetches the missing WAL files from archive, then reconnects
> and continues streaming.
>
> Can we fix that?

Just MHO here, but this seems like a bigger project than we should be
starting at this stage of the game.

...Robert


Re: Streaming replication, retrying from archive

From
Magnus Hagander
Date:
On Thu, Jan 14, 2010 at 15:36, Robert Haas <robertmhaas@gmail.com> wrote:
> On Thu, Jan 14, 2010 at 9:15 AM, Heikki Linnakangas
> <heikki.linnakangas@enterprisedb.com> wrote:
>> Imagine this scenario:
>>
>> 1. Master is up and running, standby is connected and streaming happily
>> 2. Network goes down, connection is broken.
>> 3. Standby falls behind a lot. Old WAL files that the standby needs are
>> archived, and deleted from master.
>> 4. Network is restored. Standby reconnects
>> 5. Standby will get an error because the WAL file it needs is not in the
>> master anymore.
>>
>> What will currently happen is:
>>
>> 6, Standby retries connecting and failing indefinitely, until the admin
>> restarts it.
>>
>> What we would *like* to happen is:
>>
>> 6. Standby fetches the missing WAL files from archive, then reconnects
>> and continues streaming.
>>
>> Can we fix that?
>
> Just MHO here, but this seems like a bigger project than we should be
> starting at this stage of the game.

+1.

We want this eventually (heck, it'd be awesome!), but let's get what
we have now stable first.

-- Magnus HaganderMe: http://www.hagander.net/Work: http://www.redpill-linpro.com/


Re: Streaming replication, retrying from archive

From
Heikki Linnakangas
Date:
Magnus Hagander wrote:
> On Thu, Jan 14, 2010 at 15:36, Robert Haas <robertmhaas@gmail.com> wrote:
>> On Thu, Jan 14, 2010 at 9:15 AM, Heikki Linnakangas
>> <heikki.linnakangas@enterprisedb.com> wrote:
>>> Imagine this scenario:
>>>
>>> 1. Master is up and running, standby is connected and streaming happily
>>> 2. Network goes down, connection is broken.
>>> 3. Standby falls behind a lot. Old WAL files that the standby needs are
>>> archived, and deleted from master.
>>> 4. Network is restored. Standby reconnects
>>> 5. Standby will get an error because the WAL file it needs is not in the
>>> master anymore.
>>>
>>> What will currently happen is:
>>>
>>> 6, Standby retries connecting and failing indefinitely, until the admin
>>> restarts it.
>>>
>>> What we would *like* to happen is:
>>>
>>> 6. Standby fetches the missing WAL files from archive, then reconnects
>>> and continues streaming.
>>>
>>> Can we fix that?
>> Just MHO here, but this seems like a bigger project than we should be
>> starting at this stage of the game.
> 
> +1.
> 
> We want this eventually (heck, it'd be awesome!), but let's get what
> we have now stable first.

If we don't fix that within the server, we will need to document that
caveat and every installation will need to work around that one way or
another. Maybe with some monitoring software and an automatic restart. Ugh.

I wasn't really asking if it's possible to fix, I meant "Let's think
about *how* to fix that".

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming replication, retrying from archive

From
Dimitri Fontaine
Date:
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
> If we don't fix that within the server, we will need to document that
> caveat and every installation will need to work around that one way or
> another. Maybe with some monitoring software and an automatic restart. Ugh.
>
> I wasn't really asking if it's possible to fix, I meant "Let's think
> about *how* to fix that".

Did I mention my viewpoint on that already? http://archives.postgresql.org/pgsql-hackers/2009-07/msg00943.php

It could well be I'm talking about things that have no relation at all
to what is in the patch currently, and that make no sense for where we
want the patch to go. But I'd like to know about that so that I'm not
banging my head on the nearest wall each time the topic surfaces.

Regards,
-- 
dim


Re: Streaming replication, retrying from archive

From
Fujii Masao
Date:
On Fri, Jan 15, 2010 at 12:23 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> If we don't fix that within the server, we will need to document that
> caveat and every installation will need to work around that one way or
> another. Maybe with some monitoring software and an automatic restart. Ugh.
>
> I wasn't really asking if it's possible to fix, I meant "Let's think
> about *how* to fix that".

OK. How about the following (though it's a rough design)?

(1)    If walsender cannot read the WAL file because of ENOENT, it sends the        special message indicating that
errorto walreceiver. This message is        shipped on the COPY protocol.
 

(2-a) If the message arrives, walreceiver exits by using proc_exit().
(3-a) If the startup process detects the exit of walreceiver in
WaitNextXLogAvailable(),         it switches back to a normal archive recovery mode, closes
the currently opened         WAL file, resets some variables (readId, readSeg, etc), and
calls FetchRecord()         again. Then it tries to restore the WAL file from the
archive if the restore_command         is supplied, and switches to a streaming recovery mode again
if invalid WAL is         found.

Or

(2-b) If the message arrives, walreceiver executes restore_command,
and then sets         the receivedUpto to the end location of the restored WAL
file. The restored file is         expected to be filled because it doesn't exist in the
primary's pg_xlog. So that         update of the receivedUpto is OK.
(3-b) After one WAL file is restored, walreceiver tries to connect to
the primary, and         starts replication again. If the ENOENT error occurs again,
we go back to the (1).

I like the latter approach since it's simpler. Thought?

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming replication, retrying from archive

From
Fujii Masao
Date:
On Fri, Jan 15, 2010 at 1:06 AM, Dimitri Fontaine
<dfontaine@hi-media.com> wrote:
> Did I mention my viewpoint on that already?
>  http://archives.postgresql.org/pgsql-hackers/2009-07/msg00943.php

>  0. base: slave asks the master for a base-backup, at the end of this it
>     reaches the base-lsn

What if the WAL file including the archive recovery starting location has
been removed from the primary's pg_xlog before the end of online-backup
(i.e., the procedure 0)? Where should the standby get such a WAL file from?
How?

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming replication, retrying from archive

From
Dimitri Fontaine
Date:
Fujii Masao <masao.fujii@gmail.com> writes:
> On Fri, Jan 15, 2010 at 1:06 AM, Dimitri Fontaine <dfontaine@hi-media.com> wrote:
>>  0. base: slave asks the master for a base-backup, at the end of this it
>>     reaches the base-lsn
>
> What if the WAL file including the archive recovery starting location has
> been removed from the primary's pg_xlog before the end of online-backup
> (i.e., the procedure 0)? Where should the standby get such a WAL file from?
> How?

I guess it would be perfectly sensible for 8.5, given the timeframe, to
not implement this as part of SR, but tell our users they need to make a
base backup themselves.

If after that the first WAL we need from the master ain't available, 8.5
SR should maybe only issue an ERROR with a HINT explaining how to ensure
not running in the problem when trying again.

But how we handle failures when transitioning from one state to the
other should be a lot easier to discuss and decide as soon as we have
the possible states and the transitions we want to allow and support. I
think.

My guess is that those states and transitions are in the code, but not
explicit, so that each time we talk about how to handle the error cases
we have to be extra verbose and we risk not talking about exactly the
same thing. Naming the states should make those arrangements easier, I
should think. Not sure if it would help follow the time constraint now
though.

Regards,
-- 
dim


Re: Streaming replication, retrying from archive

From
Robert Haas
Date:
On Thu, Jan 14, 2010 at 10:23 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> I wasn't really asking if it's possible to fix, I meant "Let's think
> about *how* to fix that".

Well...  maybe if it doesn't require too MUCH thought.

I'm thinking that HS+SR are going to be a bit like the Windows port -
they're going to require a few releases before they really work as
well as we'd like them too.

...Robert


Re: Streaming replication, retrying from archive

From
Tom Lane
Date:
Robert Haas <robertmhaas@gmail.com> writes:
> I'm thinking that HS+SR are going to be a bit like the Windows port -
> they're going to require a few releases before they really work as
> well as we'd like them too.

I've assumed that from the get-go ;-).  It's one of the reasons that
we ought to label this release 9.0 if those features get in.  Such a
number would help clue folks that there might be some less than
entirely stable things about it.
        regards, tom lane


Re: Streaming replication, retrying from archive

From
Heikki Linnakangas
Date:
Fujii Masao wrote:
> On Fri, Jan 15, 2010 at 12:23 AM, Heikki Linnakangas
> <heikki.linnakangas@enterprisedb.com> wrote:
>> If we don't fix that within the server, we will need to document that
>> caveat and every installation will need to work around that one way or
>> another. Maybe with some monitoring software and an automatic restart. Ugh.
>>
>> I wasn't really asking if it's possible to fix, I meant "Let's think
>> about *how* to fix that".
> 
> OK. How about the following (though it's a rough design)?
> 
> (1)    If walsender cannot read the WAL file because of ENOENT, it sends the
>          special message indicating that error to walreceiver. This message is
>          shipped on the COPY protocol.
> 
> (2-a) If the message arrives, walreceiver exits by using proc_exit().
> (3-a) If the startup process detects the exit of walreceiver in
> WaitNextXLogAvailable(),
>           it switches back to a normal archive recovery mode, closes
> the currently opened
>           WAL file, resets some variables (readId, readSeg, etc), and
> calls FetchRecord()
>           again. Then it tries to restore the WAL file from the
> archive if the restore_command
>           is supplied, and switches to a streaming recovery mode again
> if invalid WAL is
>           found.
> 
> Or
> 
> (2-b) If the message arrives, walreceiver executes restore_command,
> and then sets
>           the receivedUpto to the end location of the restored WAL
> file. The restored file is
>           expected to be filled because it doesn't exist in the
> primary's pg_xlog. So that
>           update of the receivedUpto is OK.
> (3-b) After one WAL file is restored, walreceiver tries to connect to
> the primary, and
>           starts replication again. If the ENOENT error occurs again,
> we go back to the (1).
> 
> I like the latter approach since it's simpler. Thought?

Hmm. Executing restore_command in walreceiver process doesn't feel right
somehow. I'm thinking of:

Let's introduce a new boolean variable in shared memory that the
walreceiver can set to tell startup process if it's connected or
streaming, or disconnected. When startup process sees that walreceiver
is connected, it waits for receivedUpto to advance. Otherwise, it polls
the archive using restore_command.

To actually implement that requires some refactoring of the
ReadRecord/FetchRecord logic in xlog.c. However, it always felt a bit
hacky to me anyway, so that's not necessary a bad thing.

Now, one problem with this is that under the right conditions,
walreceiver might just succeed to reconnect, while the startup process
starts to restore the file from archive. That's OK, the streamed file
will be simply ignored, and the file restored from archive uses a
temporary filename that won't clash with the streamed file, but it feels
a bit strange to have the same file copied to the server via both
mechanisms.

See the "replication-xlogrefactor" branch in my git repository for a
prototype of that. We could also combine that with your 1st design, and
add the special message to indicate "WAL already deleted", and change
the walreceiver restart logic as you suggested. Some restructuring of
Read/FetchRecord is probably required for that anyway.

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming replication, retrying from archive

From
Fujii Masao
Date:
On Fri, Jan 15, 2010 at 7:19 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> Let's introduce a new boolean variable in shared memory that the
> walreceiver can set to tell startup process if it's connected or
> streaming, or disconnected. When startup process sees that walreceiver
> is connected, it waits for receivedUpto to advance. Otherwise, it polls
> the archive using restore_command.

Seems OK.

> See the "replication-xlogrefactor" branch in my git repository for a
> prototype of that. We could also combine that with your 1st design, and
> add the special message to indicate "WAL already deleted", and change
> the walreceiver restart logic as you suggested. Some restructuring of
> Read/FetchRecord is probably required for that anyway.

Though I haven't read your branch much yet, there seems to be a corner
case which a partially-filled WAL file might be restored wrongly, which
would cause a PANIC error. So the primary should tell the last WAL file
which has been filled completely. And when that file has been restored
in the standby, the startup process should stop restoring any more files,
and try to wait for streaming again.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming replication, retrying from archive

From
Heikki Linnakangas
Date:
Dimitri Fontaine wrote:
> But how we handle failures when transitioning from one state to the
> other should be a lot easier to discuss and decide as soon as we have
> the possible states and the transitions we want to allow and support. I
> think.
>
> My guess is that those states and transitions are in the code, but not
> explicit, so that each time we talk about how to handle the error cases
> we have to be extra verbose and we risk not talking about exactly the
> same thing. Naming the states should make those arrangements easier, I
> should think. Not sure if it would help follow the time constraint now
> though.

I agree, a state machine is a useful way of thinking about this. I
recall that mail of yours from last summer :-).

The states we have at the moment in standby are:

1. Archive recovery. Standby fetches WAL files from archive using
restore_command. When a file is not found in archive, we switch to state 2

2. Streaming replication. Standby connects (and reconnects if the
connection is lost for any reason) to the primary, starts streaming, and
applies WAL as it arrives. We stay in this state until trigger file is
found or server is shut down.

The states with my suggested ReadRecord/FetchRecord refactoring, the
code I have in the replication-xlogrefactor branch in my git repo, are:

1. Initial archive recovery. Standby fetches WAL files from archive
using restore_command. When a file is not found in archive, we start
walreceiver and switch to state 2

2. Retrying to restore from archive. When the connection to primary is
established and replication is started, we switch to state 3

3. Streaming replication. Connection to primary is established, and WAL
is applied as it arrives. When the connection is dropped, we go back to
state 2

Although the the state transitions between 2 and 3 are a bit fuzzy in
that version; walreceiver runs concurrently, trying to reconnect, while
startup process retries restoring from archive. Fujii-san's suggestion
to have walreceiver stop while startup process retries restoring from
archive (or have walreceiver run restore_command in approach #2) would
make that clearer.

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming replication, retrying from archive

From
Simon Riggs
Date:
On Fri, 2010-01-15 at 20:11 +0200, Heikki Linnakangas wrote:

> The states we have at the moment in standby are:
> 
> 1. Archive recovery. Standby fetches WAL files from archive using
> restore_command. When a file is not found in archive, we switch to state 2
> 
> 2. Streaming replication. Standby connects (and reconnects if the
> connection is lost for any reason) to the primary, starts streaming, and
> applies WAL as it arrives. We stay in this state until trigger file is
> found or server is shut down.

> The states with my suggested ReadRecord/FetchRecord refactoring, the
> code I have in the replication-xlogrefactor branch in my git repo, are:
> 
> 1. Initial archive recovery. Standby fetches WAL files from archive
> using restore_command. When a file is not found in archive, we start
> walreceiver and switch to state 2
> 
> 2. Retrying to restore from archive. When the connection to primary is
> established and replication is started, we switch to state 3
> 
> 3. Streaming replication. Connection to primary is established, and WAL
> is applied as it arrives. When the connection is dropped, we go back to
> state 2
> 
> Although the the state transitions between 2 and 3 are a bit fuzzy in
> that version; walreceiver runs concurrently, trying to reconnect, while
> startup process retries restoring from archive. Fujii-san's suggestion
> to have walreceiver stop while startup process retries restoring from
> archive (or have walreceiver run restore_command in approach #2) would
> make that clearer.

The one-way state transitions between 1->2 in both cases seem to make
this a little more complex, rather than more simple. 

If the connection did drop then WAL will be in the archive, so the path
for data is archive->primary->standby. There already needs to be a
network path between archive and standby, so why not drop back from
state 3 -> 1 rather than from 3 -> 2? That way we could have just 2
states on each side, rather than 3.

-- Simon Riggs           www.2ndQuadrant.com



Re: Streaming replication, retrying from archive

From
Dimitri Fontaine
Date:
Thanks for stating it this way, it really helps figuring out what is it
we're talking about!

Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
> The states with my suggested ReadRecord/FetchRecord refactoring, the
> code I have in the replication-xlogrefactor branch in my git repo,
> are:

They look like you're trying to solve a specific issue that is a
consequence of another one, without fixing the cause. I hope I'm wrong,
once more :)

> 1. Initial archive recovery. Standby fetches WAL files from archive
> using restore_command. When a file is not found in archive, we start
> walreceiver and switch to state 2
>
> 2. Retrying to restore from archive. When the connection to primary is
> established and replication is started, we switch to state 3

When do the master know about this new slave being there? I'd say not
until 3 is ok, and then, the actual details between 1 and 2 look
strange, partly because it's more about processes than states.

I'd propose to have 1 and 2 started in parallel from the beginning, and
as Simon proposes, being able to get back to 1. at any time:

0. start from a base backup, determine the first WAL / LSN we need to  start streaming, call it SR_LSN. That means
askingthe master its  current xlog location. The LSN we're at now, after replaying the base  backup and maybe the
initialrecovery from local WAL files, let's  call it BASE_LSN.
 

1. Get the missing WAL to get from BASE_LSN to SR_LSN from the archive,  with restore_command, apply them as we receive
them,and start  2. possibly in parallel
 

2. Streaming replication: we connect to the primary and walreceiver gets  the WALs from the connection. It either
storesthem if current  standby's position < SR_LSN or apply them directly if we were already  streaming.
 
  Local storage would be either standby's archiving or a specific  temporary location. I guess it's more or less what
youwant to do  with retrying from the master's archives, but I'm not sure your line  of though makes it simpler.
 

But that's more a process view, not a state view. As 1 and 2 run in
parallel, we're missing some state names. Let's name the states now that
we have the processes.

base: start from a base backup, which we don't know how we got it

catch-up: getting the WALs [from archive] to get from base to being able         to apply the streaming

wanna-sync: receiving primary's wal while not being able to replay them

do-sync: applying the wals we got in wanna-sync state

sync: replaying what's being sent as it arrives

So the current problem is what happens when we're not able to start
streaming from the primary, yet, or again. And your question is how will
it get simpler with all those details.

What I propose is to always have a walreceiver running and getting WALs
from the master. Depending on current state it's applying them (sync) or
keeping them for later (wanna-sync). We need some more code for it to
apply WALs it's been keeping for later (do-sync), that depends on how we
keep the WALs.

Your problem is getting out of catch-up up to sync, and which process is
doing what in between. I hope to make it clear to think about with my
proposal, and would go as far as to say that the startup process does
only care about getting the WALs from BASE_LSN to SR_LSN, that's called
catch-up.

Having another process to handle wanna-sync is neat, but can be
sequential too.

When you lose the connection, you get out of sync back to another state
depending on missing wals, so to know that you need to contact the
primary again.

The master only considers any standby's in sync if its walsender process
is up-to-date or lagging only the last emitted WAL. If lagging more,
that means the standby's is catching up, or replaying more than the
current WAL, so in wanna-sync or do-sync state. Not in sync.

The details about when a slave is in sync will get more important as
soon as we have synchronous streaming.

Regards,
-- 
dim


Re: Streaming replication, retrying from archive

From
Heikki Linnakangas
Date:
Dimitri Fontaine wrote:
> Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
>> 1. Initial archive recovery. Standby fetches WAL files from archive
>> using restore_command. When a file is not found in archive, we start
>> walreceiver and switch to state 2
>>
>> 2. Retrying to restore from archive. When the connection to primary is
>> established and replication is started, we switch to state 3
>
> When do the master know about this new slave being there? I'd say not
> until 3 is ok, and then, the actual details between 1 and 2 look
> strange, partly because it's more about processes than states.

Right. The master doesn't need to know about the slave.

> I'd propose to have 1 and 2 started in parallel from the beginning, and
> as Simon proposes, being able to get back to 1. at any time:
>
> 0. start from a base backup, determine the first WAL / LSN we need to
>    start streaming, call it SR_LSN. That means asking the master its
>    current xlog location.

What if the master can't be contacted?

> The LSN we're at now, after replaying the base
>    backup and maybe the initial recovery from local WAL files, let's
>    call it BASE_LSN.
>
> 1. Get the missing WAL to get from BASE_LSN to SR_LSN from the archive,
>    with restore_command, apply them as we receive them, and start
>    2. possibly in parallel
>
> 2. Streaming replication: we connect to the primary and walreceiver gets
>    the WALs from the connection. It either stores them if current
>    standby's position < SR_LSN or apply them directly if we were already
>    streaming.
>
>    Local storage would be either standby's archiving or a specific
>    temporary location. I guess it's more or less what you want to do
>    with retrying from the master's archives, but I'm not sure your line
>    of though makes it simpler.

Seems complicated...

> <snip>
> The details about when a slave is in sync will get more important as
> soon as we have synchronous streaming.

Yeah, a lot of that logic and states is completely unnecessary until we
have a synchronous mode. Even then, it seems complex.

Here's what I've been hacking:

First of all, walreceiver no longer tries to retry the connection on
error, and postmaster no longer tries to relaunch it if it dies. So when
Walreceiver is launched, it tries to connect once, and if successful,
streams until an error occurs or it's killed.

When startup process needs more WAL to continue replay, the logic is in
pseudocode:

while (<need more wal>)
{
  if(<walreceiver is alive>)
  {
     wait for WAL to arrive, or for walreceiver to die.
  }
  else
  {
     Run restore_command
     If (restore_command succeeded)
       break;
     else
     {
       Sleep 5 seconds
       Start walreceiver
     }
  }
}

So there's just two states:

1. Recovering from archive
2. Streaming

We start from 1, and switch state at error.

This gives nice behavior from a user point of view. Standby tries to
make progress using either the archive or streaming, whichever becomes
available first.

Attached is a WIP patch implementing that, also available in the
'replication-xlogrefactor' branch in my git repository. It includes the
Read/FetchRecord refactoring I mentioned earlier; that's a pre-requisite
for this.

The code implementing the above retry logic in XLogReadPage(), in xlog.c.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 690dbb6..6cb6bf0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -144,16 +144,6 @@ HotStandbyState        standbyState = STANDBY_DISABLED;
 static     XLogRecPtr    LastRec;

 /*
- * Are we doing recovery from XLOG stream? If so, we recover without using
- * offline XLOG archives even though InArchiveRecovery==true. This flag is
- * used only in standby mode.
- */
-static bool InStreamingRecovery = false;
-
-/* The current log page is partially-filled, and so needs to be read again? */
-static bool needReread = false;
-
-/*
  * Local copy of SharedRecoveryInProgress variable. True actually means "not
  * known, need to check the shared state".
  */
@@ -457,12 +447,16 @@ static uint32 openLogOff = 0;
  * These variables are used similarly to the ones above, but for reading
  * the XLOG.  Note, however, that readOff generally represents the offset
  * of the page just read, not the seek position of the FD itself, which
- * will be just past that page.
+ * will be just past that page. readLen indicates how much of the current
+ * page has been read into readBuf.
  */
 static int    readFile = -1;
 static uint32 readId = 0;
 static uint32 readSeg = 0;
 static uint32 readOff = 0;
+static uint32 readLen = 0;
+/* Is the currently open segment being streamed from primary? */
+static bool readStreamed = false;

 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
 static char *readBuf = NULL;
@@ -474,7 +468,6 @@ static uint32 readRecordBufSize = 0;
 /* State information for XLOG reading */
 static XLogRecPtr ReadRecPtr;    /* start of last record read */
 static XLogRecPtr EndRecPtr;    /* end+1 of last record read */
-static XLogRecord *nextRecord = NULL;
 static TimeLineID lastPageTLI = 0;

 static XLogRecPtr minRecoveryPoint;        /* local copy of
@@ -516,7 +509,12 @@ static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
                        bool find_free, int *max_advance,
                        bool use_lock);
-static int    XLogFileRead(uint32 log, uint32 seg, int emode);
+static int    XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
+             bool fromArchive, bool notexistOk);
+static int    XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
+                   bool fromArchive);
+static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
+             bool randAccess);
 static void XLogFileClose(void);
 static bool RestoreArchivedFile(char *path, const char *xlogfname,
                     const char *recovername, off_t expectedSize);
@@ -526,8 +524,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
 static void ValidateXLOGDirectoryStructure(void);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
+static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
 static List *readTimeLineHistory(TimeLineID targetTLI);
@@ -539,6 +536,7 @@ static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
 static void WriteControlFile(void);
 static void ReadControlFile(void);
 static char *str_time(pg_time_t tnow);
+static bool CheckForStandbyTrigger(void);

 #ifdef WAL_DEBUG
 static void xlog_outrec(StringInfo buf, XLogRecord *record);
@@ -2586,36 +2584,22 @@ XLogFileOpen(uint32 log, uint32 seg)

 /*
  * Open a logfile segment for reading (during recovery).
+ *
+ * If fromArchive is true, the segment is retrieved from archive, otherwise
+ * it's read from pg_xlog.
  */
 static int
-XLogFileRead(uint32 log, uint32 seg, int emode)
+XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
+             bool fromArchive, bool notfoundOk)
 {
-    char        path[MAXPGPATH];
     char        xlogfname[MAXFNAMELEN];
     char        activitymsg[MAXFNAMELEN + 16];
-    ListCell   *cell;
+    char        path[MAXPGPATH];
     int            fd;

-    /*
-     * Loop looking for a suitable timeline ID: we might need to read any of
-     * the timelines listed in expectedTLIs.
-     *
-     * We expect curFileTLI on entry to be the TLI of the preceding file in
-     * sequence, or 0 if there was no predecessor.    We do not allow curFileTLI
-     * to go backwards; this prevents us from picking up the wrong file when a
-     * parent timeline extends to higher segment numbers than the child we
-     * want to read.
-     */
-    foreach(cell, expectedTLIs)
-    {
-        TimeLineID    tli = (TimeLineID) lfirst_int(cell);
-
-        if (tli < curFileTLI)
-            break;                /* don't bother looking at too-old TLIs */
-
         XLogFileName(xlogfname, tli, log, seg);

-        if (InArchiveRecovery && !InStreamingRecovery)
+        if (fromArchive)
         {
             /* Report recovery progress in PS display */
             snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
@@ -2625,9 +2609,14 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
             restoredFromArchive = RestoreArchivedFile(path, xlogfname,
                                                       "RECOVERYXLOG",
                                                       XLogSegSize);
+            if (!restoredFromArchive)
+                return -1;
         }
         else
+        {
             XLogFilePath(path, tli, log, seg);
+            restoredFromArchive = false;
+        }

         fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
         if (fd >= 0)
@@ -2642,11 +2631,62 @@ XLogFileRead(uint32 log, uint32 seg, int emode)

             return fd;
         }
-        if (errno != ENOENT)    /* unexpected failure? */
+        if (errno != ENOENT || !notfoundOk)    /* unexpected failure? */
             ereport(PANIC,
                     (errcode_for_file_access(),
             errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
                    path, log, seg)));
+        return -1;
+}
+
+/*
+ * Open a logfile segment for reading (during recovery).
+ *
+ * This version searches for the segment with any TLI listed in expectedTLIs.
+ * Also, if not in StandbyMode and fromArchive is true, the segment is
+ * also searched in pg_xlog if not found in archive.
+ */
+static int
+XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
+{
+    char        path[MAXPGPATH];
+    ListCell   *cell;
+    int            fd;
+
+    /*
+     * Loop looking for a suitable timeline ID: we might need to read any of
+     * the timelines listed in expectedTLIs.
+     *
+     * We expect curFileTLI on entry to be the TLI of the preceding file in
+     * sequence, or 0 if there was no predecessor.    We do not allow curFileTLI
+     * to go backwards; this prevents us from picking up the wrong file when a
+     * parent timeline extends to higher segment numbers than the child we
+     * want to read.
+     */
+    foreach(cell, expectedTLIs)
+    {
+        TimeLineID    tli = (TimeLineID) lfirst_int(cell);
+
+        if (tli < curFileTLI)
+            break;                /* don't bother looking at too-old TLIs */
+
+        fd = XLogFileRead(log, seg, emode, tli, fromArchive, true);
+        if (fd != -1)
+            return fd;
+
+        /*
+         * If not in StandbyMode, fall back to searching pg_xlog. In
+         * StandbyMode we're streaming segments from the primary to pg_xlog,
+         * and we mustn't confuse the (possibly partial) segments in pg_xlog
+         * with complete segments ready to be applied. We rather wait for
+         * the records to arrive through streaming.
+         */
+        if (!StandbyMode && fromArchive)
+        {
+            fd = XLogFileRead(log, seg, emode, tli, false, true);
+            if (fd != -1)
+                return fd;
+        }
     }

     /* Couldn't find it.  For simplicity, complain about front timeline */
@@ -3163,7 +3203,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
              * different filename that can't be confused with regular XLOG
              * files.
              */
-            if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name))
+            if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name))
             {
                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);

@@ -3474,92 +3514,19 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
 }

 /*
- * Attempt to fetch an XLOG record.
- *
- * If RecPtr is not NULL, try to fetch a record at that position.  Otherwise
- * try to fetch a record just after the last one previously read.
- *
- * In standby mode, if we failed in reading a valid record and are not doing
- * recovery from XLOG stream yet, we ignore the failure and start walreceiver
- * process to fetch the record from the primary. Otherwise, returns NULL,
- * or fails if emode is PANIC. (emode must be either PANIC or LOG.)
- *
- * If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In
- * this case, if we have to start XLOG streaming, we use RedoStartLSN as the
- * streaming start position instead of RecPtr.
- *
- * The record is copied into readRecordBuf, so that on successful return,
- * the returned record pointer always points there.
- */
-static XLogRecord *
-FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
-{
-    if (StandbyMode && !InStreamingRecovery)
-    {
-        XLogRecord *record;
-        XLogRecPtr    startlsn;
-        bool        haveNextRecord = (nextRecord != NULL);
-
-        /* An invalid record is OK here, so we set emode to DEBUG2 */
-        record = ReadRecord(RecPtr, DEBUG2);
-        if (record != NULL)
-            return record;
-
-        /*
-         * Start XLOG streaming if there is no more valid records available
-         * in the archive.
-         *
-         * We need to calculate the start position of XLOG streaming. If we
-         * read a record in the middle of a segment which doesn't exist in
-         * pg_xlog, we use the start of the segment as the start position.
-         * That prevents a broken segment (i.e., with no records in the
-         * first half of a segment) from being created by XLOG streaming,
-         * which might cause trouble later on if the segment is e.g
-         * archived.
-         */
-        startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr;
-        if (startlsn.xrecoff % XLogSegSize != 0)
-        {
-            char        xlogpath[MAXPGPATH];
-            struct stat    stat_buf;
-            uint32        log;
-            uint32        seg;
-
-            XLByteToSeg(startlsn, log, seg);
-            XLogFilePath(xlogpath, recoveryTargetTLI, log, seg);
-
-            if (stat(xlogpath, &stat_buf) != 0)
-                startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize;
-        }
-        RequestXLogStreaming(startlsn, PrimaryConnInfo);
-
-        /* Needs to read the current page again if the next record is in it */
-        needReread = haveNextRecord;
-        nextRecord = NULL;
-
-        InStreamingRecovery = true;
-        ereport(LOG,
-                (errmsg("starting streaming recovery at %X/%X",
-                        startlsn.xlogid, startlsn.xrecoff)));
-    }
-
-    return ReadRecord(RecPtr, emode);
-}
-
-/*
  * Attempt to read an XLOG record.
  *
  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
  * try to read a record just after the last one previously read.
  *
  * If no valid record is available, returns NULL, or fails if emode is PANIC.
- * (emode must be either PANIC, LOG or DEBUG2.)
+ * (emode must be either PANIC, LOG)
  *
  * The record is copied into readRecordBuf, so that on successful return,
  * the returned record pointer always points there.
  */
 static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
+ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
 {
     XLogRecord *record;
     char       *buffer;
@@ -3567,11 +3534,8 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
     bool        randAccess = false;
     uint32        len,
                 total_len;
-    uint32        targetPageOff;
     uint32        targetRecOff;
     uint32        pageHeaderSize;
-    XLogRecPtr    receivedUpto = {0,0};
-    bool        finished;
     int            emode;

     /*
@@ -3579,7 +3543,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
      * should never hit the end of WAL because we wait for it to be streamed.
      * Therefore treat any broken WAL as PANIC, instead of failing over.
      */
-    if (InStreamingRecovery)
+    if (StandbyMode)
         emode = PANIC;
     else
         emode = emode_arg;
@@ -3600,20 +3564,16 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
     if (RecPtr == NULL)
     {
         RecPtr = &tmpRecPtr;
-        /* fast case if next record is on same page */
-        if (nextRecord != NULL)
-        {
-            record = nextRecord;
-            goto got_record;
-        }

         /*
-         * Align old recptr to next page if the current page is filled and
-         * doesn't need to be read again.
+         * Align recptr to next page if no more records can fit on the
+         * current page.
          */
-        if (!needReread)
+        if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord)
+        {
             NextLogPage(tmpRecPtr);
-        /* We will account for page header size below */
+            /* We will account for page header size below */
+        }
     }
     else
     {
@@ -3633,81 +3593,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
         randAccess = true;        /* allow curFileTLI to go backwards too */
     }

-    if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
-    {
-        close(readFile);
-        readFile = -1;
-    }
-
-    /* Is the target record ready yet? */
-    if (InStreamingRecovery)
-    {
-        receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished);
-        if (finished)
-        {
-            if (emode_arg == PANIC)
-                ereport(PANIC,
-                        (errmsg("streaming recovery ended")));
-            else
-                return NULL;
-        }
-    }
-
-    XLByteToSeg(*RecPtr, readId, readSeg);
-    if (readFile < 0)
-    {
-        /* Now it's okay to reset curFileTLI if random fetch */
-        if (randAccess)
-            curFileTLI = 0;
-
-        readFile = XLogFileRead(readId, readSeg, emode);
-        if (readFile < 0)
-            goto next_record_is_invalid;
-
-        /*
-         * Whenever switching to a new WAL segment, we read the first page of
-         * the file and validate its header, even if that's not where the
-         * target record is.  This is so that we can check the additional
-         * identification info that is present in the first page's "long"
-         * header.
-         */
-        readOff = 0;
-        if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
-        {
-            ereport(emode,
-                    (errcode_for_file_access(),
-                     errmsg("could not read from log file %u, segment %u, offset %u: %m",
-                            readId, readSeg, readOff)));
-            goto next_record_is_invalid;
-        }
-        if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
-            goto next_record_is_invalid;
-    }
+    /* Read the page containing the record */
+    if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
+        return NULL;

-    targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-    if (readOff != targetPageOff || needReread)
-    {
-        readOff = targetPageOff;
-        needReread = false;
-        if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
-        {
-            ereport(emode,
-                    (errcode_for_file_access(),
-                     errmsg("could not seek in log file %u, segment %u to offset %u: %m",
-                            readId, readSeg, readOff)));
-            goto next_record_is_invalid;
-        }
-        if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
-        {
-            ereport(emode,
-                    (errcode_for_file_access(),
-                     errmsg("could not read from log file %u, segment %u, offset %u: %m",
-                            readId, readSeg, readOff)));
-            goto next_record_is_invalid;
-        }
-        if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
-            goto next_record_is_invalid;
-    }
     pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
     targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
     if (targetRecOff == 0)
@@ -3737,8 +3626,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
     }
     record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);

-got_record:;
-
     /*
      * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
      * required.
@@ -3838,58 +3725,35 @@ got_record:;
     }

     buffer = readRecordBuf;
-    nextRecord = NULL;
     len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
     if (total_len > len)
     {
         /* Need to reassemble record */
         XLogContRecord *contrecord;
-        XLogRecPtr    nextpagelsn = *RecPtr;
+        XLogRecPtr    pagelsn;
         uint32        gotlen = len;

+        /* Initialize pagelsn to the beginning of the page this record is on */
+        pagelsn = *RecPtr;
+        pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+
         memcpy(buffer, record, len);
         record = (XLogRecord *) buffer;
         buffer += len;
         for (;;)
         {
-            /* Is the next page ready yet? */
-            if (InStreamingRecovery)
+            /* Calculate pointer to beginning of next page */
+            pagelsn.xrecoff += XLOG_BLCKSZ;
+            if (pagelsn.xrecoff >= XLogFileSize)
             {
-                if (gotlen != len)
-                    nextpagelsn.xrecoff += XLOG_BLCKSZ;
-                NextLogPage(nextpagelsn);
-                receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished);
-                if (finished)
-                {
-                    if (emode_arg == PANIC)
-                        ereport(PANIC,
-                                (errmsg("streaming recovery ended")));
-                    else
-                        return NULL;
-                }
+                (pagelsn.xlogid)++;
+                pagelsn.xrecoff = 0;
             }
+            /* Wait for the next page to become available */
+            if (!XLogPageRead(&pagelsn, emode, false, false))
+                return NULL;

-            readOff += XLOG_BLCKSZ;
-            if (readOff >= XLogSegSize)
-            {
-                close(readFile);
-                readFile = -1;
-                NextLogSeg(readId, readSeg);
-                readFile = XLogFileRead(readId, readSeg, emode);
-                if (readFile < 0)
-                    goto next_record_is_invalid;
-                readOff = 0;
-            }
-            if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
-            {
-                ereport(emode,
-                        (errcode_for_file_access(),
-                         errmsg("could not read from log file %u, segment %u, offset %u: %m",
-                                readId, readSeg, readOff)));
-                goto next_record_is_invalid;
-            }
-            if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
-                goto next_record_is_invalid;
+            /* Check that the continuation record looks valid */
             if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
             {
                 ereport(emode,
@@ -3923,31 +3787,11 @@ got_record:;
         if (!RecordIsValid(record, *RecPtr, emode))
             goto next_record_is_invalid;
         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
-        if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
-            MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
-        {
-            nextRecord = (XLogRecord *) ((char *) contrecord +
-                    MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
-        }
         EndRecPtr.xlogid = readId;
         EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
             pageHeaderSize +
             MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);

-        /*
-         * Check whether the current page needs to be read again. If there is no
-         * unread record in the current page (nextRecord == NULL), obviously we
-         * don't need to reread it. If we're not in streaming recovery mode yet,
-         * partially-filled page doesn't need to be reread because it is the
-         * last valid page.
-         */
-        if (nextRecord != NULL && InStreamingRecovery &&
-            XLByteLE(receivedUpto, EndRecPtr))
-        {
-            nextRecord    = NULL;
-            needReread    = true;
-        }
-
         ReadRecPtr = *RecPtr;
         /* needn't worry about XLOG SWITCH, it can't cross page boundaries */
         return record;
@@ -3956,26 +3800,9 @@ got_record:;
     /* Record does not cross a page boundary */
     if (!RecordIsValid(record, *RecPtr, emode))
         goto next_record_is_invalid;
-    if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
-        MAXALIGN(total_len))
-        nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
     EndRecPtr.xlogid = RecPtr->xlogid;
     EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);

-    /*
-     * Check whether the current page needs to be read again. If there is no
-     * unread record in the current page (nextRecord == NULL), obviously we
-     * don't need to reread it. If we're not in streaming recovery mode yet,
-     * partially-filled page doesn't need to be reread because it is the last
-     * valid page.
-     */
-    if (nextRecord != NULL && InStreamingRecovery &&
-        XLByteLE(receivedUpto, EndRecPtr))
-    {
-        nextRecord    = NULL;
-        needReread    = true;
-    }
-
     ReadRecPtr = *RecPtr;
     memcpy(buffer, record, total_len);

@@ -3987,8 +3814,6 @@ got_record:;
         /* Pretend it extends to end of segment */
         EndRecPtr.xrecoff += XLogSegSize - 1;
         EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
-        nextRecord = NULL;        /* definitely not on same page */
-        needReread = false;

         /*
          * Pretend that readBuf contains the last page of the segment. This is
@@ -4005,7 +3830,6 @@ next_record_is_invalid:;
         close(readFile);
         readFile = -1;
     }
-    nextRecord = NULL;
     return NULL;
 }

@@ -5722,7 +5546,7 @@ StartupXLOG(void)
                     (errmsg("checkpoint record is at %X/%X",
                             checkPointLoc.xlogid, checkPointLoc.xrecoff)));
         }
-        else if (InStreamingRecovery)
+        else if (StandbyMode)
         {
             /*
              * The last valid checkpoint record required for a streaming
@@ -5930,12 +5754,12 @@ StartupXLOG(void)
         if (XLByteLT(checkPoint.redo, RecPtr))
         {
             /* back up to find the record */
-            record = FetchRecord(&(checkPoint.redo), PANIC, false);
+            record = ReadRecord(&(checkPoint.redo), PANIC, false);
         }
         else
         {
             /* just have to read next record after CheckPoint */
-            record = FetchRecord(NULL, LOG, false);
+            record = ReadRecord(NULL, LOG, false);
         }

         if (record != NULL)
@@ -6088,7 +5912,7 @@ StartupXLOG(void)

                 LastRec = ReadRecPtr;

-                record = FetchRecord(NULL, LOG, false);
+                record = ReadRecord(NULL, LOG, false);
             } while (record != NULL && recoveryContinue);

             /*
@@ -6122,22 +5946,17 @@ StartupXLOG(void)

     /*
      * We are now done reading the xlog from stream. Turn off streaming
-     * recovery, and restart fetching the files (which would be required
-     * at end of recovery, e.g., timeline history file) from archive.
+     * recovery to force fetching the files (which would be required
+     * at end of recovery, e.g., timeline history file) from archive or
+     * pg_xlog.
      */
-    if (InStreamingRecovery)
-    {
-        /* We are no longer in streaming recovery state */
-        InStreamingRecovery = false;
-        ereport(LOG,
-                (errmsg("streaming recovery complete")));
-    }
+    StandbyMode = false;

     /*
      * Re-fetch the last valid or last applied record, so we can identify the
      * exact endpoint of what we consider the valid portion of WAL.
      */
-    record = ReadRecord(&LastRec, PANIC);
+    record = ReadRecord(&LastRec, PANIC, false);
     EndOfLog = EndRecPtr;
     XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);

@@ -6507,7 +6326,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
         return NULL;
     }

-    record = FetchRecord(&RecPtr, LOG, true);
+    record = ReadRecord(&RecPtr, LOG, true);

     if (record == NULL)
     {
@@ -7453,10 +7272,6 @@ CreateRestartPoint(int flags)
     }
     LWLockRelease(ControlFileLock);

-    /* Are we doing recovery from XLOG stream? */
-    if (!InStreamingRecovery)
-        InStreamingRecovery = WalRcvInProgress();
-
     /*
      * Delete old log files (those no longer needed even for previous
      * checkpoint/restartpoint) to prevent the disk holding the xlog from
@@ -7464,7 +7279,7 @@ CreateRestartPoint(int flags)
      * streaming recovery we have to or the disk will eventually fill up from
      * old log files streamed from master.
      */
-    if (InStreamingRecovery && (_logId || _logSeg))
+    if (WalRcvInProgress() && (_logId || _logSeg))
     {
         XLogRecPtr    endptr;

@@ -8739,6 +8554,13 @@ HandleStartupProcInterrupts(void)
      */
     if (shutdown_requested)
         proc_exit(1);
+
+    /*
+     * Emergency bailout if postmaster has died.  This is to avoid the
+     * necessity for manual cleanup of all postmaster children.
+     */
+    if (IsUnderPostmaster && !PostmasterIsAlive(true))
+        exit(1);
 }

 /* Main entry point for startup process */
@@ -8788,3 +8610,281 @@ StartupProcessMain(void)
      */
     proc_exit(0);
 }
+
+/*
+ * Read the XLOG page containing RecPtr into readBuf. Returns true
+ * if successful, false otherwise.
+ *
+ * This is responsible for restoring files from archive as needed, as well
+ * as for waiting for new WAL to arrive in standby mode.
+ */
+static bool
+XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
+             bool randAccess)
+{
+    static XLogRecPtr receivedUpto = {0, 0};
+    bool switched_segment = false;
+    uint32 targetPageOff;
+    uint32 targetRecOff;
+    uint32 targetId;
+    uint32 targetSeg;
+
+    XLByteToSeg(*RecPtr, targetId, targetSeg);
+    targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+    targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
+
+    /* Fast exit if we have read the record in the current buffer already */
+    if (targetId == readId && targetSeg == readSeg &&
+        targetPageOff == readOff && targetRecOff < readLen)
+        return true;
+
+    /*
+     * See if we need to switch to a new segment because the requested record
+     * is not in the currently open one.
+     */
+    if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
+    {
+        close(readFile);
+        readFile = -1;
+    }
+
+    XLByteToSeg(*RecPtr, readId, readSeg);
+
+    /* See if we need to retrieve more data */
+    if (readFile < 0 ||
+        (readStreamed && !XLByteLT(*RecPtr, receivedUpto)))
+    {
+        if (StandbyMode)
+        {
+            bool last_restore_failed = false;
+
+            /*
+             * In standby mode, wait for the requested record to become
+             * available, either via restore_command succeeding to restore
+             * the segment, or via walreceiver having streamed the record.
+             */
+            for (;;)
+            {
+                if (WalRcvInProgress())
+                {
+                    /*
+                     * While walreceiver is active, wait for new WAL to
+                     * arrive from primary.
+                     */
+                    receivedUpto = GetWalRcvWriteRecPtr();
+                    if (XLByteLT(*RecPtr, receivedUpto))
+                    {
+                        /*
+                         * Great, streamed far enough. Open the file if it's
+                         * not open already.
+                         */
+                        if (readFile < 0)
+                        {
+                            readFile =
+                                XLogFileRead(readId, readSeg, PANIC,
+                                             recoveryTargetTLI, false, false);
+                            switched_segment = true;
+                            readStreamed = true;
+                        }
+                        break;
+                    }
+
+                    if (CheckForStandbyTrigger())
+                        goto next_record_is_invalid;
+
+                    /*
+                     * When streaming is active, we want to react quickly when
+                     * the next WAL record arrives, so sleep only a bit.
+                     */
+                    pg_usleep(100000L); /* 100ms */
+                }
+                else
+                {
+                    /*
+                     * Until walreceiver manages to reconnect, poll the
+                     * archive.
+                     */
+                    if (readFile >= 0)
+                    {
+                        close(readFile);
+                        readFile = -1;
+                    }
+                    /* Reset curFileTLI if random fetch. */
+                    if (randAccess)
+                        curFileTLI = 0;
+                    readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true);
+                    switched_segment = true;
+                    readStreamed = false;
+                    if (readFile != -1)
+                    {
+                        elog(DEBUG1, "got WAL segment from archive");
+                        break;
+                    }
+
+                    /*
+                     * If we succeeded restoring some segments from archive
+                     * since the last connection attempt (or we haven't
+                     * tried streaming yet, retry immediately. But if we
+                     * haven't, assume the problem is persistent, so be
+                     * less aggressive.
+                     */
+                    if (last_restore_failed)
+                    {
+                        /*
+                         * Check to see if the trigger file exists. Note that
+                         * we do this only after failure, so when you create
+                         * the trigger file, we still finish replaying as much
+                         * as we can before failover.
+                         */
+                        if (CheckForStandbyTrigger())
+                            goto next_record_is_invalid;
+                        pg_usleep(5000000L); /* 5 seconds */
+                    }
+                    last_restore_failed = true;
+
+                    /*
+                     * Nope, not found in archive. Try to stream it.
+                     *
+                     * If fetching_ckpt is TRUE, RecPtr points to the initial
+                     * checkpoint location. In that case, we use RedoStartLSN
+                     * as the streaming start position instead of RecPtr, so
+                     * that when we later jump backwards to start redo at
+                     * RedoStartLSN, we will have the logs streamed already.
+                     */
+                    RequestXLogStreaming(fetching_ckpt ? RedoStartLSN : *RecPtr,
+                                         PrimaryConnInfo);
+                }
+
+                /*
+                 * This possibly-long loop needs to handle interrupts of startup
+                 * process.
+                 */
+                HandleStartupProcInterrupts();
+            }
+        }
+        else
+        {
+            /* In archive or crash recovery. */
+            if (readFile < 0)
+            {
+                /* Reset curFileTLI if random fetch. */
+                if (randAccess)
+                    curFileTLI = 0;
+                readFile = XLogFileReadAnyTLI(readId, readSeg, emode,
+                                              InArchiveRecovery);
+                switched_segment = true;
+                readStreamed = false;
+                if (readFile < 0)
+                    return false;
+            }
+        }
+    }
+
+    /*
+     * At this point, we have the right segment open and we know the
+     * requested record is in it.
+     */
+    Assert(readFile != -1);
+
+    /*
+     * If the current segment is being streamed from master, calculate
+     * how much of the current page we have received already. We know the
+     * requested record has been received, but this is for the benefit
+     * of future calls, to allow quick exit at the top of this function.
+     */
+    if (readStreamed)
+    {
+        if (RecPtr->xlogid != receivedUpto.xlogid ||
+            (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
+        {
+            readLen = XLOG_BLCKSZ;
+        }
+        else
+            readLen = receivedUpto.xrecoff % XLogSegSize - targetPageOff;
+    }
+    else
+        readLen = XLOG_BLCKSZ;
+
+    if (switched_segment && targetPageOff != 0)
+    {
+        /*
+         * Whenever switching to a new WAL segment, we read the first page of
+         * the file and validate its header, even if that's not where the
+         * target record is.  This is so that we can check the additional
+         * identification info that is present in the first page's "long"
+         * header.
+         */
+        readOff = 0;
+        if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+        {
+            ereport(emode,
+                    (errcode_for_file_access(),
+                     errmsg("could not read from log file %u, segment %u, offset %u: %m",
+                            readId, readSeg, readOff)));
+            goto next_record_is_invalid;
+        }
+        if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
+            goto next_record_is_invalid;
+    }
+
+    /* Read the requested page */
+    readOff = targetPageOff;
+    if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
+    {
+        ereport(emode,
+                (errcode_for_file_access(),
+                 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
+                        readId, readSeg, readOff)));
+        goto next_record_is_invalid;
+    }
+    if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+    {
+        ereport(emode,
+                (errcode_for_file_access(),
+                 errmsg("could not read from log file %u, segment %u, offset %u: %m",
+                        readId, readSeg, readOff)));
+        goto next_record_is_invalid;
+    }
+    if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
+        goto next_record_is_invalid;
+
+    Assert(targetId == readId);
+    Assert(targetSeg == readSeg);
+    Assert(targetPageOff == readOff);
+    Assert(targetRecOff < readLen);
+
+    return true;
+
+next_record_is_invalid:
+    if (readFile >= 0)
+        close(readFile);
+    readFile = -1;
+    readStreamed = false;
+    readLen = 0;
+
+    return false;
+}
+
+/*
+ * Check to see if the trigger file exists. If it does, request postmaster
+ * to shut down walreceiver and wait for it to exit, and remove the trigger
+ * file.
+ */
+static bool
+CheckForStandbyTrigger(void)
+{
+    struct stat stat_buf;
+
+    if (TriggerFile == NULL)
+        return false;
+
+    if (stat(TriggerFile, &stat_buf) == 0)
+    {
+        ereport(LOG,
+                (errmsg("trigger file found: %s", TriggerFile)));
+        ShutdownWalRcv();
+        unlink(TriggerFile);
+        return true;
+    }
+    return false;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index e8ddfc1..5281fa2 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -224,9 +224,6 @@ static int    Shutdown = NoShutdown;
 static bool FatalError = false; /* T if recovering from backend crash */
 static bool RecoveryError = false;        /* T if WAL recovery failed */

-/* If WalReceiverActive is true, restart walreceiver if it dies */
-static bool WalReceiverActive = false;
-
 /*
  * We use a simple state machine to control startup, shutdown, and
  * crash recovery (which is rather like shutdown followed by startup).
@@ -1469,11 +1466,6 @@ ServerLoop(void)
         if (PgStatPID == 0 && pmState == PM_RUN)
             PgStatPID = pgstat_start();

-        /* If we have lost walreceiver, try to start a new one */
-        if (WalReceiverPID == 0 && WalReceiverActive &&
-            (pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT))
-            WalReceiverPID = StartWalReceiver();
-
         /* If we need to signal the autovacuum launcher, do so now */
         if (avlauncher_needs_signal)
         {
@@ -4167,16 +4159,9 @@ sigusr1_handler(SIGNAL_ARGS)
         WalReceiverPID == 0)
     {
         /* Startup Process wants us to start the walreceiver process. */
-        WalReceiverActive = true;
         WalReceiverPID = StartWalReceiver();
     }

-    if (CheckPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER))
-    {
-        /* The walreceiver process doesn't want to be restarted anymore */
-        WalReceiverActive = false;
-    }
-
     PG_SETMASK(&UnBlockSig);

     errno = save_errno;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b049baa..f0fcb7c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -134,7 +134,6 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
 static void WalRcvQuickDieHandler(SIGNAL_ARGS);

 /* Prototypes for private functions */
-static void InitWalRcv(void);
 static void WalRcvKill(int code, Datum arg);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(void);
@@ -153,21 +152,57 @@ static struct
 void
 WalReceiverMain(void)
 {
-    sigjmp_buf    local_sigjmp_buf;
-    MemoryContext walrcv_context;
     char conninfo[MAXCONNINFO];
     XLogRecPtr startpoint;
     /* use volatile pointer to prevent code rearrangement */
     volatile WalRcvData *walrcv = WalRcv;

-    /* Load the libpq-specific functions */
-    load_file("libpqwalreceiver", false);
-    if (walrcv_connect == NULL || walrcv_receive == NULL ||
-        walrcv_disconnect == NULL)
-        elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+    /*
+     * WalRcv should be set up already (if we are a backend, we inherit
+     * this by fork() or EXEC_BACKEND mechanism from the postmaster).
+     */
+    Assert(walrcv != NULL);

-    /* Mark walreceiver in progress */
-    InitWalRcv();
+    /*
+     * Mark walreceiver as running in shared memory.
+     *
+     * Do this as early as possible, so that if we fail later on, we'll
+     * set state to STOPPED. If we die before this, the startup process
+     * will keep waiting for us to startup, until it times out.
+     */
+    SpinLockAcquire(&walrcv->mutex);
+    Assert(walrcv->pid == 0);
+    switch(walrcv->walRcvState)
+    {
+        case WALRCV_STOPPING:
+            /* If we've already been requested to stop, don't start up. */
+            walrcv->walRcvState = WALRCV_STOPPED;
+            /* fall through */
+
+        case WALRCV_STOPPED:
+            SpinLockRelease(&walrcv->mutex);
+            proc_exit(1);
+            break;
+
+        case WALRCV_STARTING:
+            /* The usual case */
+            break;
+
+        case WALRCV_RUNNING:
+            /* Shouldn't happen */
+            elog(PANIC, "walreceiver still running according to shared memory state");
+    }
+    /* Advertise our PID so that the startup process can kill us */
+    walrcv->pid = MyProcPid;
+    walrcv->walRcvState = WALRCV_RUNNING;
+
+    /* Fetch information required to start streaming */
+    strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+    startpoint = walrcv->receivedUpto;
+    SpinLockRelease(&walrcv->mutex);
+
+    /* Arrange to clean up at walreceiver exit */
+    on_shmem_exit(WalRcvKill, 0);

     /*
      * If possible, make this process a group leader, so that the postmaster
@@ -200,81 +235,21 @@ WalReceiverMain(void)
     /* We allow SIGQUIT (quickdie) at all times */
     sigdelset(&BlockSig, SIGQUIT);

+    /* Load the libpq-specific functions */
+    load_file("libpqwalreceiver", false);
+    if (walrcv_connect == NULL || walrcv_receive == NULL ||
+        walrcv_disconnect == NULL)
+        elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+
     /*
      * Create a resource owner to keep track of our resources (not clear that
      * we need this, but may as well have one).
      */
     CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");

-    /*
-     * Create a memory context that we will do all our work in.  We do this so
-     * that we can reset the context during error recovery and thereby avoid
-     * possible memory leaks.
-     */
-    walrcv_context = AllocSetContextCreate(TopMemoryContext,
-                                              "Wal Receiver",
-                                              ALLOCSET_DEFAULT_MINSIZE,
-                                              ALLOCSET_DEFAULT_INITSIZE,
-                                              ALLOCSET_DEFAULT_MAXSIZE);
-    MemoryContextSwitchTo(walrcv_context);
-
-    /*
-     * If an exception is encountered, processing resumes here.
-     *
-     * This code is heavily based on bgwriter.c, q.v.
-     */
-    if (sigsetjmp(local_sigjmp_buf, 1) != 0)
-    {
-        /* Since not using PG_TRY, must reset error stack by hand */
-        error_context_stack = NULL;
-
-        /* Reset WalRcvImmediateInterruptOK */
-        DisableWalRcvImmediateExit();
-
-        /* Prevent interrupts while cleaning up */
-        HOLD_INTERRUPTS();
-
-        /* Report the error to the server log */
-        EmitErrorReport();
-
-        /* Disconnect any previous connection. */
-        EnableWalRcvImmediateExit();
-        walrcv_disconnect();
-        DisableWalRcvImmediateExit();
-
-        /*
-         * Now return to normal top-level context and clear ErrorContext for
-         * next time.
-         */
-        MemoryContextSwitchTo(walrcv_context);
-        FlushErrorState();
-
-        /* Flush any leaked data in the top-level context */
-        MemoryContextResetAndDeleteChildren(walrcv_context);
-
-        /* Now we can allow interrupts again */
-        RESUME_INTERRUPTS();
-
-        /*
-         * Sleep at least 1 second after any error.  A write error is likely
-         * to be repeated, and we don't want to be filling the error logs as
-         * fast as we can.
-         */
-        pg_usleep(1000000L);
-    }
-
-    /* We can now handle ereport(ERROR) */
-    PG_exception_stack = &local_sigjmp_buf;
-
     /* Unblock signals (they were blocked when the postmaster forked us) */
     PG_SETMASK(&UnBlockSig);

-    /* Fetch connection information from shared memory */
-    SpinLockAcquire(&walrcv->mutex);
-    strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
-    startpoint = walrcv->receivedUpto;
-    SpinLockRelease(&walrcv->mutex);
-
     /* Establish the connection to the primary for XLOG streaming */
     EnableWalRcvImmediateExit();
     walrcv_connect(conninfo, startpoint);
@@ -330,63 +305,24 @@ WalReceiverMain(void)
     }
 }

-/* Advertise our pid in shared memory, so that startup process can kill us. */
-static void
-InitWalRcv(void)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-
-    /*
-     * WalRcv should be set up already (if we are a backend, we inherit
-     * this by fork() or EXEC_BACKEND mechanism from the postmaster).
-     */
-    if (walrcv == NULL)
-        elog(PANIC, "walreceiver control data uninitialized");
-
-    /* If we've already been requested to stop, don't start up */
-    SpinLockAcquire(&walrcv->mutex);
-    Assert(walrcv->pid == 0);
-    if (walrcv->walRcvState == WALRCV_STOPPED ||
-        walrcv->walRcvState == WALRCV_STOPPING)
-    {
-        walrcv->walRcvState = WALRCV_STOPPED;
-        SpinLockRelease(&walrcv->mutex);
-        proc_exit(1);
-    }
-    walrcv->pid = MyProcPid;
-    SpinLockRelease(&walrcv->mutex);
-
-    /* Arrange to clean up at walreceiver exit */
-    on_shmem_exit(WalRcvKill, 0);
-}
-
 /*
- * Clear our pid from shared memory at exit.
+ * Mark us as STOPPED in shared memory at exit.
  */
 static void
 WalRcvKill(int code, Datum arg)
 {
     /* use volatile pointer to prevent code rearrangement */
     volatile WalRcvData *walrcv = WalRcv;
-    bool stopped = false;

     SpinLockAcquire(&walrcv->mutex);
-    if (walrcv->walRcvState == WALRCV_STOPPING ||
-        walrcv->walRcvState == WALRCV_STOPPED)
-    {
-        walrcv->walRcvState = WALRCV_STOPPED;
-        stopped = true;
-        elog(LOG, "walreceiver stopped");
-    }
+    Assert(walrcv->walRcvState == WALRCV_RUNNING ||
+           walrcv->walRcvState == WALRCV_STOPPING);
+    walrcv->walRcvState = WALRCV_STOPPED;
     walrcv->pid = 0;
     SpinLockRelease(&walrcv->mutex);

+    /* Terminate the connection gracefully. */
     walrcv_disconnect();
-
-    /* If requested to stop, tell postmaster to not restart us. */
-    if (stopped)
-        SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER);
 }

 /* SIGHUP: set flag to re-read config file at next convenient time */
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 24cf789..763c02d 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -18,6 +18,8 @@

 #include <sys/types.h>
 #include <sys/stat.h>
+#include <sys/time.h>
+#include <time.h>
 #include <unistd.h>
 #include <signal.h>

@@ -30,8 +32,11 @@

 WalRcvData *WalRcv = NULL;

-static bool CheckForStandbyTrigger(void);
-static void ShutdownWalRcv(void);
+/*
+ * How long to wait for walreceiver to start up after requesting
+ * postmaster to launch it. In seconds.
+ */
+#define WALRCV_STARTUP_TIMEOUT 10

 /* Report shared memory space needed by WalRcvShmemInit */
 Size
@@ -62,7 +67,7 @@ WalRcvShmemInit(void)

     /* Initialize the data structures */
     MemSet(WalRcv, 0, WalRcvShmemSize());
-    WalRcv->walRcvState = WALRCV_NOT_STARTED;
+    WalRcv->walRcvState = WALRCV_STOPPED;
     SpinLockInit(&WalRcv->mutex);
 }

@@ -73,90 +78,39 @@ WalRcvInProgress(void)
     /* use volatile pointer to prevent code rearrangement */
     volatile WalRcvData *walrcv = WalRcv;
     WalRcvState state;
+    pg_time_t now = (pg_time_t) time(NULL);

     SpinLockAcquire(&walrcv->mutex);
+
+    /*
+     * If it has taken too long for walreceiver to start up, give up.
+     * Setting the state to STOPPED ensures that if walreceiver later
+     * does start up after all, it will see that it's not supposed to be
+     * running and dies before doing anything.
+     */
+    if (walrcv->walRcvState == WALRCV_STARTING &&
+        (now - walrcv->startTime) > WALRCV_STARTUP_TIMEOUT)
+        walrcv->walRcvState = WALRCV_STOPPED;
+
     state = walrcv->walRcvState;
+
     SpinLockRelease(&walrcv->mutex);

-    if (state == WALRCV_RUNNING || state == WALRCV_STOPPING)
+    if (state != WALRCV_STOPPED)
         return true;
     else
         return false;
 }

 /*
- * Wait for the XLOG record at given position to become available.
- *
- * 'recptr' indicates the byte position which caller wants to read the
- * XLOG record up to. The byte position actually written and flushed
- * by walreceiver is returned. It can be higher than the requested
- * location, and the caller can safely read up to that point without
- * calling WaitNextXLogAvailable() again.
- *
- * If WAL streaming is ended (because a trigger file is found), *finished
- * is set to true and function returns immediately. The returned position
- * can be lower than requested in that case.
- *
- * Called by the startup process during streaming recovery.
+ * Stop walreceiver (if running) and wait for it to die.
  */
-XLogRecPtr
-WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished)
-{
-    static XLogRecPtr receivedUpto = {0, 0};
-
-    *finished = false;
-
-    /* Quick exit if already known available */
-    if (XLByteLT(recptr, receivedUpto))
-        return receivedUpto;
-
-    for (;;)
-    {
-        /* use volatile pointer to prevent code rearrangement */
-        volatile WalRcvData *walrcv = WalRcv;
-
-        /* Update local status */
-        SpinLockAcquire(&walrcv->mutex);
-        receivedUpto = walrcv->receivedUpto;
-        SpinLockRelease(&walrcv->mutex);
-
-        /* If available already, leave here */
-        if (XLByteLT(recptr, receivedUpto))
-            return receivedUpto;
-
-        /* Check to see if the trigger file exists */
-        if (CheckForStandbyTrigger())
-        {
-            *finished = true;
-            return receivedUpto;
-        }
-
-        pg_usleep(100000L); /* 100ms */
-
-        /*
-         * This possibly-long loop needs to handle interrupts of startup
-         * process.
-         */
-        HandleStartupProcInterrupts();
-
-        /*
-         * Emergency bailout if postmaster has died.  This is to avoid the
-         * necessity for manual cleanup of all postmaster children.
-         */
-        if (!PostmasterIsAlive(true))
-            exit(1);
-    }
-}
-
-/*
- * Stop walreceiver and wait for it to die.
- */
-static void
+void
 ShutdownWalRcv(void)
 {
     /* use volatile pointer to prevent code rearrangement */
     volatile WalRcvData *walrcv = WalRcv;
-    pid_t walrcvpid;
+    pid_t walrcvpid = 0;

     /*
      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
@@ -164,9 +118,20 @@ ShutdownWalRcv(void)
      * restart itself.
      */
     SpinLockAcquire(&walrcv->mutex);
-    Assert(walrcv->walRcvState == WALRCV_RUNNING);
-    walrcv->walRcvState = WALRCV_STOPPING;
-    walrcvpid = walrcv->pid;
+    switch(walrcv->walRcvState)
+    {
+        case WALRCV_STOPPED:
+            break;
+        case WALRCV_STARTING:
+            walrcv->walRcvState = WALRCV_STOPPED;
+            break;
+
+        case WALRCV_RUNNING:
+        case WALRCV_STOPPING:
+            walrcv->walRcvState = WALRCV_STOPPING;
+            walrcvpid = walrcv->pid;
+            break;
+    }
     SpinLockRelease(&walrcv->mutex);

     /*
@@ -194,31 +159,8 @@ ShutdownWalRcv(void)
 }

 /*
- * Check to see if the trigger file exists. If it does, request postmaster
- * to shut down walreceiver and wait for it to exit, and remove the trigger
- * file.
- */
-static bool
-CheckForStandbyTrigger(void)
-{
-    struct stat stat_buf;
-
-    if (TriggerFile == NULL)
-        return false;
-
-    if (stat(TriggerFile, &stat_buf) == 0)
-    {
-        ereport(LOG,
-                (errmsg("trigger file found: %s", TriggerFile)));
-        ShutdownWalRcv();
-        unlink(TriggerFile);
-        return true;
-    }
-    return false;
-}
-
-/*
- * Request postmaster to start walreceiver.
+ * Request postmaster to start walreceiver, or update the starting point
+ * if already running.
  *
  * recptr indicates the position where streaming should begin, and conninfo
  * is a libpq connection string to use.
@@ -228,17 +170,29 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
 {
     /* use volatile pointer to prevent code rearrangement */
     volatile WalRcvData *walrcv = WalRcv;
+    pg_time_t now = (pg_time_t) time(NULL);

-    Assert(walrcv->walRcvState == WALRCV_NOT_STARTED);
+    /*
+     * We always start at the beginning of the segment.
+     * That prevents a broken segment (i.e., with no records in the
+     * first half of a segment) from being created by XLOG streaming,
+     * which might cause trouble later on if the segment is e.g
+     * archived.
+     */
+    if (recptr.xrecoff % XLogSegSize != 0)
+        recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
+
+    Assert(walrcv->walRcvState == WALRCV_STOPPED);

-    /* locking is just pro forma here; walreceiver isn't started yet */
     SpinLockAcquire(&walrcv->mutex);
-    walrcv->receivedUpto = recptr;
     if (conninfo != NULL)
         strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
     else
         walrcv->conninfo[0] = '\0';
-    walrcv->walRcvState = WALRCV_RUNNING;
+    walrcv->walRcvState = WALRCV_STARTING;
+    walrcv->startTime = now;
+
+    walrcv->receivedUpto = recptr;
     SpinLockRelease(&walrcv->mutex);

     SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
@@ -260,3 +214,4 @@ GetWalRcvWriteRecPtr(void)

     return recptr;
 }
+
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index f492975..477431f 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -27,10 +27,10 @@
  */
 typedef enum
 {
-    WALRCV_NOT_STARTED,
-    WALRCV_RUNNING,        /* walreceiver has been started */
-    WALRCV_STOPPING,    /* requested to stop, but still running */
-    WALRCV_STOPPED        /* stopped and mustn't start up again */
+    WALRCV_STOPPED,        /* stopped and mustn't start up again */
+    WALRCV_STARTING,    /* launched, but the process hasn't initialized yet */
+    WALRCV_RUNNING,        /* walreceiver is running */
+    WALRCV_STOPPING    /* requested to stop, but still running */
 } WalRcvState;

 /* Shared memory area for management of walreceiver process */
@@ -47,6 +47,7 @@ typedef struct
      */
     pid_t    pid;
     WalRcvState walRcvState;
+    pg_time_t startTime;

     /*
      * receivedUpto-1 is the last byte position that has been already
@@ -74,6 +75,7 @@ extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
 extern void WalReceiverMain(void);
 extern Size WalRcvShmemSize(void);
 extern void WalRcvShmemInit(void);
+extern void ShutdownWalRcv(void);
 extern bool WalRcvInProgress(void);
 extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
 extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h
index f0e4329..4e305f3 100644
--- a/src/include/storage/pmsignal.h
+++ b/src/include/storage/pmsignal.h
@@ -30,7 +30,6 @@ typedef enum
     PMSIGNAL_START_AUTOVAC_LAUNCHER,    /* start an autovacuum launcher */
     PMSIGNAL_START_AUTOVAC_WORKER,        /* start an autovacuum worker */
     PMSIGNAL_START_WALRECEIVER,            /* start a walreceiver */
-    PMSIGNAL_SHUTDOWN_WALRECEIVER,        /* shut down a walreceiver */

     NUM_PMSIGNALS                /* Must be last value of enum! */
 } PMSignalReason;

Re: Streaming replication, retrying from archive

From
Simon Riggs
Date:
On Wed, 2010-01-20 at 21:26 +0200, Heikki Linnakangas wrote:

> So there's just two states:
> 
> 1. Recovering from archive
> 2. Streaming
> 
> We start from 1, and switch state at error.
> 
> This gives nice behavior from a user point of view. Standby tries to
> make progress using either the archive or streaming, whichever becomes
> available first.

Sounds good. Easier to drive if we have two gears.

-- Simon Riggs           www.2ndQuadrant.com



Re: Streaming replication, retrying from archive

From
Dimitri Fontaine
Date:
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
> Yeah, a lot of that logic and states is completely unnecessary until we
> have a synchronous mode. Even then, it seems complex.

I hope we'll find something less complex, what I proposed is heavily
inspired from londiste (Skytools) table addition to a replication set
(parallel COPY), which works fine.

> Here's what I've been hacking:
[...]
> So there's just two states:
>
> 1. Recovering from archive
> 2. Streaming
>
> We start from 1, and switch state at error.

Oh yes that's even more simple!

> This gives nice behavior from a user point of view. Standby tries to
> make progress using either the archive or streaming, whichever becomes
> available first.

So tools like pitrtools or walmgr.py will certainly continue being
necessary to use in 9.0, right?

-- 
dim


Re: Streaming replication, retrying from archive

From
Mark Kirkwood
Date:
Dimitri Fontaine wrote:
> Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
>   
>> Yeah, a lot of that logic and states is completely unnecessary until we
>> have a synchronous mode. Even then, it seems complex.
>>     
>
> I hope we'll find something less complex, what I proposed is heavily
> inspired from londiste (Skytools) table addition to a replication set
> (parallel COPY), which works fine.
>
>   
>> Here's what I've been hacking:
>>     
> [...]
>   
>> So there's just two states:
>>
>> 1. Recovering from archive
>> 2. Streaming
>>
>> We start from 1, and switch state at error.
>>     
>
> Oh yes that's even more simple!
>
>   
>> This gives nice behavior from a user point of view. Standby tries to
>> make progress using either the archive or streaming, whichever becomes
>> available first.
>>     
>
> So tools like pitrtools or walmgr.py will certainly continue being
> necessary to use in 9.0, right?
>
>   
Right now Streaming Replication does not do your backup for you, which 
some of the tools (e.g walmgr.py) do... Thinking about walmgr.py for a 
moment - it should be pretty easy (or possible anyway) to make it use 
streaming replication for server versions >= 9.0.

Cheers

Mark