diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5937cbb..b4ae782 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -929,6 +929,8 @@ static void StartLogicalReplication(StartReplicationCmd *cmd) { StringInfoData buf; + XLogRecPtr FlushPtr; + List *timeLineHistory; /* make sure that our requirements are still fulfilled */ CheckLogicalDecodingRequirements(); @@ -941,6 +943,8 @@ StartLogicalReplication(StartReplicationCmd *cmd) * Force a disconnect, so that the decoding code doesn't need to care * about a eventual switch from running in recovery, to running in a * normal environment. Client code is expected to handle reconnects. + * This covers the race condition where we are promoted half way + * through starting up. */ if (am_cascading_walsender && !RecoveryInProgress()) { @@ -949,6 +953,14 @@ StartLogicalReplication(StartReplicationCmd *cmd) walsender_ready_to_stop = true; } + if (am_cascading_walsender) + { + /* this also updates ThisTimeLineID */ + FlushPtr = GetStandbyFlushRecPtr(); + } + else + FlushPtr = GetFlushRecPtr(); + WalSndSetState(WALSNDSTATE_CATCHUP); /* Send a CopyBothResponse message, and start streaming */ @@ -975,6 +987,24 @@ StartLogicalReplication(StartReplicationCmd *cmd) logical_startptr = MyReplicationSlot->data.restart_lsn; /* + * Find the timeline for the start location, or throw an error. + * + * Logical replication relies upon replication slots. Each slot has a + * single timeline history baked into it, so this should be easy. + */ + timeLineHistory = readTimeLineHistory(ThisTimeLineID); + sendTimeLine = tliOfPointInHistory(logical_startptr, timeLineHistory); + if (sendTimeLine != ThisTimeLineID) + { + sendTimeLineIsHistoric = true; + sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, timeLineHistory, + &sendTimeLineNextTLI); + } + list_free_deep(timeLineHistory); + + streamingDoneSending = streamingDoneReceiving = false; + + /* * Report the location after which we'll send out further commits as the * current sentPtr. */ @@ -2424,6 +2454,7 @@ XLogSendPhysical(void) static void XLogSendLogical(void) { + XLogRecPtr SendRqstPtr; XLogRecord *record; char *errm; @@ -2458,6 +2489,105 @@ XLogSendLogical(void) WalSndCaughtUp = true; } + if (am_cascading_walsender && !sendTimeLineIsHistoric) + { + /* + * Streaming the latest timeline on a standby. + * + * The timeline we're recovering from can change, or we can be + * promoted. In either case, the current timeline becomes historic. We + * need to detect that so that we don't try to stream past the point + * where we switched to another timeline. We check for promotion or + * timeline switch after calculating FlushPtr, to avoid a race + * condition: if the timeline becomes historic just after we checked + * that it was still current, it's still be OK to stream it up to the + * FlushPtr that was calculated before it became historic. + */ + bool becameHistoric = false; + + SendRqstPtr = GetStandbyFlushRecPtr(); + + if (!RecoveryInProgress()) + { + /* + * We have been promoted. RecoveryInProgress() updated + * ThisTimeLineID to the new current timeline. + */ + am_cascading_walsender = false; + becameHistoric = true; + } + else + { + /* + * Still a cascading standby. But is the timeline we're sending + * still the one recovery is recovering from? ThisTimeLineID was + * updated by the GetStandbyFlushRecPtr() call above. + */ + if (sendTimeLine != ThisTimeLineID) + becameHistoric = true; + } + + if (becameHistoric) + { + /* + * The timeline we were sending has become historic. Read the + * timeline history file of the new timeline to see where exactly + * we forked off from the timeline we were sending. + */ + List *history; + + history = readTimeLineHistory(ThisTimeLineID); + sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, + &sendTimeLineNextTLI); + + Assert(sendTimeLine < sendTimeLineNextTLI); + list_free_deep(history); + + sendTimeLineIsHistoric = true; + + SendRqstPtr = sendTimeLineValidUpto; + } + } + + /* + * If this is a historic timeline and we've reached the point where we + * forked to the next timeline, switch to new timeline. + */ + if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) + { + List *history; + + /* close the current file. */ + if (sendFile >= 0) + close(sendFile); + sendFile = -1; + + elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)", + (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto, + (uint32) (sentPtr >> 32), (uint32) sentPtr); + + /* + * Did we reach the current timeline yet? If not, switch to the + * next one and follow that to its endpoint. + */ + if (sendTimeLineNextTLI == ThisTimeLineID) + sendTimeLineIsHistoric = false; + else + { + List *history; + + sendTimeLine = sendTimeLineNextTLI; + history = readTimeLineHistory(sendTimeLine); + sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, + &sendTimeLineNextTLI); + + Assert(sendTimeLine < sendTimeLineNextTLI); + list_free_deep(history); + + SendRqstPtr = sendTimeLineValidUpto; + } + } + /* Update shared memory status */ { /* use volatile pointer to prevent code rearrangement */ @@ -2467,6 +2597,8 @@ XLogSendLogical(void) walsnd->sentPtr = sentPtr; SpinLockRelease(&walsnd->mutex); } + + /* ps display updated by plugin, if desired */ } /*