From 9d274eecef8e66e60b34d14c24459e70e782cf86 Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Wed, 5 Oct 2022 10:31:35 -0700 Subject: [PATCH v5 1/2] Move WAL receivers' non-shared state to a new struct. This is preparatory work for a follow-up change that will revamp the wakeup mechanism for periodic tasks that WAL receivers must perform. --- src/backend/replication/walreceiver.c | 90 ++++++++++++++------------- 1 file changed, 48 insertions(+), 42 deletions(-) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 6cbb67c92a..89985c54cf 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -116,6 +116,14 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; +/* + * A struct to keep track of non-shared state. + */ +typedef struct WalRcvInfo +{ + TimeLineID startpointTLI; +} WalRcvInfo; + static StringInfoData reply_message; static StringInfoData incoming_message; @@ -123,12 +131,12 @@ static StringInfoData incoming_message; static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); static void WalRcvDie(int code, Datum arg); -static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, - TimeLineID tli); -static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, - TimeLineID tli); -static void XLogWalRcvFlush(bool dying, TimeLineID tli); -static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli); +static void XLogWalRcvProcessMsg(WalRcvInfo *state, unsigned char type, + char *buf, Size len); +static void XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes, + XLogRecPtr recptr); +static void XLogWalRcvFlush(WalRcvInfo *state, bool dying); +static void XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); @@ -175,7 +183,6 @@ WalReceiverMain(void) char slotname[NAMEDATALEN]; bool is_temp_slot; XLogRecPtr startpoint; - TimeLineID startpointTLI; TimeLineID primaryTLI; bool first_stream; WalRcvData *walrcv = WalRcv; @@ -185,6 +192,7 @@ WalReceiverMain(void) char *err; char *sender_host = NULL; int sender_port = 0; + WalRcvInfo state = {0}; /* * WalRcv should be set up already (if we are a backend, we inherit this @@ -238,7 +246,7 @@ WalReceiverMain(void) strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN); is_temp_slot = walrcv->is_temp_slot; startpoint = walrcv->receiveStart; - startpointTLI = walrcv->receiveStartTLI; + state.startpointTLI = walrcv->receiveStartTLI; /* * At most one of is_temp_slot and slotname can be set; otherwise, @@ -258,7 +266,7 @@ WalReceiverMain(void) pg_atomic_write_u64(&WalRcv->writtenUpto, 0); /* Arrange to clean up at walreceiver exit */ - on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI)); + on_shmem_exit(WalRcvDie, PointerGetDatum(&state)); /* Properly accept or ignore signals the postmaster might send us */ pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config @@ -345,11 +353,11 @@ WalReceiverMain(void) * Confirm that the current timeline of the primary is the same or * ahead of ours. */ - if (primaryTLI < startpointTLI) + if (primaryTLI < state.startpointTLI) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("highest timeline %u of the primary is behind recovery timeline %u", - primaryTLI, startpointTLI))); + primaryTLI, state.startpointTLI))); /* * Get any missing history files. We do this always, even when we're @@ -361,7 +369,7 @@ WalReceiverMain(void) * but let's avoid the confusion of timeline id collisions where we * can. */ - WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); + WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI); /* * Create temporary replication slot if requested, and update slot @@ -396,17 +404,17 @@ WalReceiverMain(void) options.logical = false; options.startpoint = startpoint; options.slotname = slotname[0] != '\0' ? slotname : NULL; - options.proto.physical.startpointTLI = startpointTLI; + options.proto.physical.startpointTLI = state.startpointTLI; if (walrcv_startstreaming(wrconn, &options)) { if (first_stream) ereport(LOG, (errmsg("started streaming WAL from primary at %X/%X on timeline %u", - LSN_FORMAT_ARGS(startpoint), startpointTLI))); + LSN_FORMAT_ARGS(startpoint), state.startpointTLI))); else ereport(LOG, (errmsg("restarted WAL streaming at %X/%X on timeline %u", - LSN_FORMAT_ARGS(startpoint), startpointTLI))); + LSN_FORMAT_ARGS(startpoint), state.startpointTLI))); first_stream = false; /* Initialize LogstreamResult and buffers for processing messages */ @@ -464,8 +472,8 @@ WalReceiverMain(void) */ last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; - XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1, - startpointTLI); + XLogWalRcvProcessMsg(&state, buf[0], &buf[1], + len - 1); } else if (len == 0) break; @@ -474,7 +482,7 @@ WalReceiverMain(void) ereport(LOG, (errmsg("replication terminated by primary server"), errdetail("End of WAL reached on timeline %u at %X/%X.", - startpointTLI, + state.startpointTLI, LSN_FORMAT_ARGS(LogstreamResult.Write)))); endofwal = true; break; @@ -490,7 +498,7 @@ WalReceiverMain(void) * let the startup process and primary server know about * them. */ - XLogWalRcvFlush(false, startpointTLI); + XLogWalRcvFlush(&state, false); } /* Check if we need to exit the streaming loop. */ @@ -596,12 +604,12 @@ WalReceiverMain(void) * know about when we began streaming, fetch its timeline history * file now. */ - WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); + WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI); } else ereport(LOG, (errmsg("primary server contains no more WAL on requested timeline %u", - startpointTLI))); + state.startpointTLI))); /* * End of WAL reached on the requested timeline. Close the last @@ -611,7 +619,7 @@ WalReceiverMain(void) { char xlogfname[MAXFNAMELEN]; - XLogWalRcvFlush(false, startpointTLI); + XLogWalRcvFlush(&state, false); XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); if (close(recvFile) != 0) ereport(PANIC, @@ -631,7 +639,7 @@ WalReceiverMain(void) recvFile = -1; elog(DEBUG1, "walreceiver ended streaming and awaits new instructions"); - WalRcvWaitForStartPosition(&startpoint, &startpointTLI); + WalRcvWaitForStartPosition(&startpoint, &state.startpointTLI); } /* not reached */ } @@ -779,12 +787,10 @@ static void WalRcvDie(int code, Datum arg) { WalRcvData *walrcv = WalRcv; - TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg); - - Assert(*startpointTLI_p != 0); + WalRcvInfo *state = (WalRcvInfo *) DatumGetPointer(arg); /* Ensure that all WAL records received are flushed to disk */ - XLogWalRcvFlush(true, *startpointTLI_p); + XLogWalRcvFlush(state, true); /* Mark ourselves inactive in shared memory */ SpinLockAcquire(&walrcv->mutex); @@ -814,7 +820,7 @@ WalRcvDie(int code, Datum arg) * Accept the message from XLOG stream, and process it. */ static void -XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) +XLogWalRcvProcessMsg(WalRcvInfo *state, unsigned char type, char *buf, Size len) { int hdrlen; XLogRecPtr dataStart; @@ -844,7 +850,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) buf += hdrlen; len -= hdrlen; - XLogWalRcvWrite(buf, len, dataStart, tli); + XLogWalRcvWrite(state, buf, len, dataStart); break; } case 'k': /* Keepalive */ @@ -881,12 +887,12 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) * Write XLOG data to disk. */ static void -XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) +XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes, XLogRecPtr recptr) { int startoff; int byteswritten; - Assert(tli != 0); + Assert(state->startpointTLI != 0); while (nbytes > 0) { @@ -894,14 +900,14 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) /* Close the current segment if it's completed */ if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) - XLogWalRcvClose(recptr, tli); + XLogWalRcvClose(state, recptr); if (recvFile < 0) { /* Create/use new log file */ XLByteToSeg(recptr, recvSegNo, wal_segment_size); - recvFile = XLogFileInit(recvSegNo, tli); - recvFileTLI = tli; + recvFile = XLogFileInit(recvSegNo, state->startpointTLI); + recvFileTLI = state->startpointTLI; } /* Calculate the start offset of the received logs */ @@ -954,7 +960,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) * segment is received and written. */ if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) - XLogWalRcvClose(recptr, tli); + XLogWalRcvClose(state, recptr); } /* @@ -964,15 +970,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) * an error, so we skip sending a reply in that case. */ static void -XLogWalRcvFlush(bool dying, TimeLineID tli) +XLogWalRcvFlush(WalRcvInfo *state, bool dying) { - Assert(tli != 0); + Assert(state->startpointTLI != 0); if (LogstreamResult.Flush < LogstreamResult.Write) { WalRcvData *walrcv = WalRcv; - issue_xlog_fsync(recvFile, recvSegNo, tli); + issue_xlog_fsync(recvFile, recvSegNo, state->startpointTLI); LogstreamResult.Flush = LogstreamResult.Write; @@ -982,7 +988,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) { walrcv->latestChunkStart = walrcv->flushedUpto; walrcv->flushedUpto = LogstreamResult.Flush; - walrcv->receivedTLI = tli; + walrcv->receivedTLI = state->startpointTLI; } SpinLockRelease(&walrcv->mutex); @@ -1019,18 +1025,18 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) * Create an archive notification file since the segment is known completed. */ static void -XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli) +XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr) { char xlogfname[MAXFNAMELEN]; Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)); - Assert(tli != 0); + Assert(state->startpointTLI != 0); /* * fsync() and close current file before we switch to next one. We would * otherwise have to reopen this file to fsync it later */ - XLogWalRcvFlush(false, tli); + XLogWalRcvFlush(state, false); XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); -- 2.25.1