From 32dbc891cc829f9dbbfa69690743d97699862453 Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Mon, 3 Oct 2022 19:39:44 -0700 Subject: [PATCH v3 1/2] Move WAL receivers' non-shared state to a new struct. This is preparatory work for a follow-up change that will revamp the wakeup mechanism for periodic tasks that WAL receivers must perform. --- src/backend/replication/walreceiver.c | 46 +++++++++++++++------------ 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 3767466ef3..83e333e89c 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -116,6 +116,14 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; +/* + * A struct to keep track of non-shared state. + */ +typedef struct WalRcvInfo +{ + TimeLineID startpointTLI; +} WalRcvInfo; + static StringInfoData reply_message; static StringInfoData incoming_message; @@ -175,7 +183,6 @@ WalReceiverMain(void) char slotname[NAMEDATALEN]; bool is_temp_slot; XLogRecPtr startpoint; - TimeLineID startpointTLI; TimeLineID primaryTLI; bool first_stream; WalRcvData *walrcv = WalRcv; @@ -185,6 +192,7 @@ WalReceiverMain(void) char *err; char *sender_host = NULL; int sender_port = 0; + WalRcvInfo state = {0}; /* * WalRcv should be set up already (if we are a backend, we inherit this @@ -238,7 +246,7 @@ WalReceiverMain(void) strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN); is_temp_slot = walrcv->is_temp_slot; startpoint = walrcv->receiveStart; - startpointTLI = walrcv->receiveStartTLI; + state.startpointTLI = walrcv->receiveStartTLI; /* * At most one of is_temp_slot and slotname can be set; otherwise, @@ -258,7 +266,7 @@ WalReceiverMain(void) pg_atomic_write_u64(&WalRcv->writtenUpto, 0); /* Arrange to clean up at walreceiver exit */ - on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI)); + on_shmem_exit(WalRcvDie, PointerGetDatum(&state)); /* Properly accept or ignore signals the postmaster might send us */ pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config @@ -345,11 +353,11 @@ WalReceiverMain(void) * Confirm that the current timeline of the primary is the same or * ahead of ours. */ - if (primaryTLI < startpointTLI) + if (primaryTLI < state.startpointTLI) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("highest timeline %u of the primary is behind recovery timeline %u", - primaryTLI, startpointTLI))); + primaryTLI, state.startpointTLI))); /* * Get any missing history files. We do this always, even when we're @@ -361,7 +369,7 @@ WalReceiverMain(void) * but let's avoid the confusion of timeline id collisions where we * can. */ - WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); + WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI); /* * Create temporary replication slot if requested, and update slot @@ -396,17 +404,17 @@ WalReceiverMain(void) options.logical = false; options.startpoint = startpoint; options.slotname = slotname[0] != '\0' ? slotname : NULL; - options.proto.physical.startpointTLI = startpointTLI; + options.proto.physical.startpointTLI = state.startpointTLI; if (walrcv_startstreaming(wrconn, &options)) { if (first_stream) ereport(LOG, (errmsg("started streaming WAL from primary at %X/%X on timeline %u", - LSN_FORMAT_ARGS(startpoint), startpointTLI))); + LSN_FORMAT_ARGS(startpoint), state.startpointTLI))); else ereport(LOG, (errmsg("restarted WAL streaming at %X/%X on timeline %u", - LSN_FORMAT_ARGS(startpoint), startpointTLI))); + LSN_FORMAT_ARGS(startpoint), state.startpointTLI))); first_stream = false; /* Initialize LogstreamResult and buffers for processing messages */ @@ -465,7 +473,7 @@ WalReceiverMain(void) last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1, - startpointTLI); + state.startpointTLI); } else if (len == 0) break; @@ -474,7 +482,7 @@ WalReceiverMain(void) ereport(LOG, (errmsg("replication terminated by primary server"), errdetail("End of WAL reached on timeline %u at %X/%X.", - startpointTLI, + state.startpointTLI, LSN_FORMAT_ARGS(LogstreamResult.Write)))); endofwal = true; break; @@ -490,7 +498,7 @@ WalReceiverMain(void) * let the startup process and primary server know about * them. */ - XLogWalRcvFlush(false, startpointTLI); + XLogWalRcvFlush(false, state.startpointTLI); } /* Check if we need to exit the streaming loop. */ @@ -596,12 +604,12 @@ WalReceiverMain(void) * know about when we began streaming, fetch its timeline history * file now. */ - WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); + WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI); } else ereport(LOG, (errmsg("primary server contains no more WAL on requested timeline %u", - startpointTLI))); + state.startpointTLI))); /* * End of WAL reached on the requested timeline. Close the last @@ -611,7 +619,7 @@ WalReceiverMain(void) { char xlogfname[MAXFNAMELEN]; - XLogWalRcvFlush(false, startpointTLI); + XLogWalRcvFlush(false, state.startpointTLI); XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); if (close(recvFile) != 0) ereport(PANIC, @@ -631,7 +639,7 @@ WalReceiverMain(void) recvFile = -1; elog(DEBUG1, "walreceiver ended streaming and awaits new instructions"); - WalRcvWaitForStartPosition(&startpoint, &startpointTLI); + WalRcvWaitForStartPosition(&startpoint, &state.startpointTLI); } /* not reached */ } @@ -779,12 +787,10 @@ static void WalRcvDie(int code, Datum arg) { WalRcvData *walrcv = WalRcv; - TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg); - - Assert(*startpointTLI_p != 0); + WalRcvInfo *state = (WalRcvInfo *) DatumGetPointer(arg); /* Ensure that all WAL records received are flushed to disk */ - XLogWalRcvFlush(true, *startpointTLI_p); + XLogWalRcvFlush(true, state->startpointTLI); /* Mark ourselves inactive in shared memory */ SpinLockAcquire(&walrcv->mutex); -- 2.25.1