Re: Streaming Replication patch for CommitFest 2009-09 - Mailing list pgsql-hackers
From | Heikki Linnakangas |
---|---|
Subject | Re: Streaming Replication patch for CommitFest 2009-09 |
Date | |
Msg-id | 4AB31EEC.4000509@enterprisedb.com |
In response to | Re: Streaming Replication patch for CommitFest 2009-09 (Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>) |
Responses | Re: Streaming Replication patch for CommitFest 2009-09; Re: Streaming Replication patch for CommitFest 2009-09 |
List | pgsql-hackers |
Heikki Linnakangas wrote: > I'm thinking that walreceiver should be a stand-alone program that the > startup process launches, similar to how it invokes restore_command in > PITR recovery. Instead of using system(), though, it would use > fork+exec, and a pipe to communicate. Here's a WIP patch to do that, over your latest posted patch. I've also pushed this to my git repository at git://git.postgresql.org/git/users/heikki/postgres.git, "replication" branch. I'll continue reviewing... -- Heikki Linnakangas EnterpriseDB http://www.enterprisedb.com diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6804644..364d7e4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -41,7 +41,6 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/bgwriter.h" -#include "postmaster/walreceiver.h" #include "postmaster/walsender.h" #include "storage/bufmgr.h" #include "storage/fd.h" @@ -54,6 +53,7 @@ #include "utils/guc.h" #include "utils/ps_status.h" #include "pg_trace.h" +#include "postmaster/fork_process.h" /* File path names (all relative to $PGDATA) */ @@ -185,7 +185,8 @@ static TimestampTz recoveryLastXTime = 0; /* options taken from recovery.conf for XLOG streaming */ static bool StandbyMode = false; -char *TriggerFile = NULL; +static char *TriggerFile = NULL; +static char *conninfo = NULL; /* if recoveryStopsHere returns true, it saves actual stop xid/time here */ static TransactionId recoveryStopXid; @@ -489,6 +490,8 @@ static volatile sig_atomic_t shutdown_requested = false; static volatile sig_atomic_t in_restore_command = false; +static pid_t WalReceiverPid = 0; + static void XLogArchiveNotify(const char *xlog); static bool XLogArchiveCheckDone(const char *xlog); static bool XLogArchiveIsBusy(const char *xlog); @@ -541,6 +544,10 @@ static bool read_backup_label(XLogRecPtr *checkPointLoc, static void rm_redo_error_callback(void *arg); static int get_sync_bit(int method); +static void 
StartWalReceiver(TimeLineID tli, XLogRecPtr startlsn); +static void WaitNextXLogAvailable(XLogRecPtr recptr); +static void WaitForTrigger(void); + /* * Insert an XLOG record having the specified RMID and info bytes, @@ -1180,18 +1187,6 @@ XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites, return false; /* buffer does not need to be backed up */ } -/* Report XLOG streaming progress in PS display */ -void -ReportLogstreamResult(void) -{ - char activitymsg[50]; - - snprintf(activitymsg, sizeof(activitymsg), - "streaming %X/%X", - LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff); - set_ps_display(activitymsg, false); -} - /* * XLogArchiveNotify * @@ -3469,7 +3464,7 @@ FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) /* If there is no valid record available, request XLOG streaming */ startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr; - RequestXLogStreaming(recoveryTargetTLI, startlsn); + StartWalReceiver(recoveryTargetTLI, startlsn); /* Needs to read the current page again if the next record is in it */ needReread = haveNextRecord; @@ -4934,7 +4929,6 @@ readRecoveryCommandFile(void) { FILE *fd; char cmdline[MAXPGPATH]; - char *conninfo = NULL; TimeLineID rtli = 0; bool rtliGiven = false; bool syntaxError = false; @@ -5113,14 +5107,6 @@ readRecoveryCommandFile(void) cmdline), errhint("Lines should have the format parameter = 'value'."))); - /* Inform walreceiver of the connection information via file */ - if (StandbyMode) - { - write_conninfo_file(conninfo); - if (conninfo) - pfree(conninfo); - } - /* If not in standby mode, restore_command must be supplied */ if (!StandbyMode && recoveryRestoreCommand == NULL) ereport(FATAL, @@ -5282,7 +5268,13 @@ exitStreamingRecovery(void) * exited, and recovery checkpoint and subsequent records are * no longer overwritten unexpectedly. 
*/ - ShutdownWalRcv(); + if (WalReceiverPid != 0) + { + int status = 0; + kill(WalReceiverPid, SIGTERM); + waitpid(WalReceiverPid, &status, 0); + WalReceiverPid = 0; + } /* We are no longer in streaming recovery state */ InStreamingRecovery = false; @@ -7185,7 +7177,7 @@ CreateRestartPoint(int flags) /* Are we doing recovery from XLOG stream? */ if (!InStreamingRecovery) - InStreamingRecovery = WalRcvInProgress(); + InStreamingRecovery = (WalReceiverPid != 0); /* * Delete old log files (those no longer needed even for previous @@ -7203,8 +7195,9 @@ CreateRestartPoint(int flags) { XLogRecPtr endptr; - /* Get the current (or recent) end of xlog */ - endptr = GetWalRcvWriteRecPtr(); + LWLockAcquire(ControlFileLock, LW_SHARED); + endptr = ControlFile->minRecoveryPoint; + LWLockRelease(ControlFileLock); PrevLogSeg(_logId, _logSeg); RemoveOldXlogFiles(_logId, _logSeg, endptr); @@ -8443,3 +8436,193 @@ StartupProcessMain(void) */ proc_exit(0); } + + + + +/** WAL receiver stuff **/ + +static int walreceiver_readfd; + +static void +StartWalReceiver(TimeLineID tli, XLogRecPtr startlsn) +{ + pid_t pid; + char *av[4]; + char startptr[22]; + int pfildes[2]; + + sprintf(startptr, "%u %X/%X", tli, startlsn.xlogid, startlsn.xrecoff); + + av[0] = "walreceiver"; + av[1] = startptr; + av[2] = conninfo; + av[3] = 0; + + pipe(pfildes); + + /* Fire off execv in child */ + if ((pid = fork_process()) == 0) + { + char walreceiverpath[MAXPGPATH]; + + find_other_exec(my_exec_path, "walreceiver", + "walreceiver " PG_VERSION_STR, + walreceiverpath); + + dup2(pfildes[1], 1); /* stdout */ + if (execv(walreceiverpath, av) < 0) + { + ereport(LOG, + (errmsg("could not execute walreceiver process \"%s\": %m", + walreceiverpath))); + /* We're already in the child process here, can't return */ + exit(1); + } + } + else + { + walreceiver_readfd = pfildes[0]; + WalReceiverPid = pid; + } +} + +static bool foundTrigger = false; + +/* + * Wait for the XLOG records at given position available. 
+ * + * The XLOG records already written by walreceiver are regarded as + * available. + * + * recptr: indicates the byte position which caller wants to read the + * XLOG record up to. + * + * Called by the startup process in streaming recovery. + */ +static void +WaitNextXLogAvailable(XLogRecPtr recptr) +{ + struct stat stat_buf; + bool signaled = false; + +#ifdef REPLICATION_DEBUG + if (REPLICATION_DEBUG_ENABLED) + elog(LOG, "xlog wait request %X/%X; write %X/%X", + recptr.xlogid, recptr.xrecoff, + LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff); +#endif + + /* Quick exit if already known available */ + while(XLByteLT(recptr, LogstreamResult.Write)) + { + char buf[101]; + int i; + + for(i = 0; i < 100; i++) + { + read(walreceiver_readfd, &buf[i], 1); + if (buf[i] == '\0') + break; + } + buf[i] = '\0'; + + /* Update local status */ + sscanf(buf, "%X/%X", &LogstreamResult.Write.xlogid, &LogstreamResult.Write.xrecoff); + + /* If available already, leave here */ + if (XLByteLT(recptr, LogstreamResult.Write)) + { + /* XXX + XLogArchiveNotifySeg(recvId, recvSeg); + */ + + return; + } + + /* Check to see if the trigger file exists */ + if (TriggerFile != NULL && !foundTrigger && + stat(TriggerFile, &stat_buf) == 0) + { + ereport(LOG, + (errmsg("trigger file found: %s", TriggerFile))); + foundTrigger = true; + unlink(TriggerFile); + } + + /* + * The presence of a trigger file shuts down walreceiver if it's + * in progress. + */ + if (WalReceiverPid != 0) + { + if (foundTrigger && !signaled) + { + kill(WalReceiverPid, SIGTERM); + signaled = true; /* prevents signal from being repeated */ + } + } + /* + * If walreceiver is not in progress and has been retried more than + * MAX_WALRCV_RETRIES times, give up on the wait for the next record, + * which would cause a streaming recovery to end. If the former + * condition is met and the retry-count has not reached the maximum + * number yet, request XLOG streaming again. 
+ */ + else + { + return; + } + + /* + * This possibly-long loop needs to handle interrupts of startup + * process. + */ + HandleStartupProcInterrupts(); + + pg_usleep(100000L); /* 100ms */ + } +} + + +/* Wait until a trigger file is found */ +static void +WaitForTrigger(void) +{ + int seconds_before_warning = 15; + int elapsed = 0; + int count = 0; + struct stat stat_buf; + + /* Quick exit if a trigger file was not specified or was already found */ + if (TriggerFile == NULL || foundTrigger) + return; + + while (stat(TriggerFile, &stat_buf) != 0) + { + /* + * This possibly-long loop needs to handle interrupts of startup + * process. + */ + HandleStartupProcInterrupts(); + + pg_usleep(100000L); /* 100ms */ + + if (++count >= 10) /* 1s passed */ + { + count = 0; + + if (++elapsed >= seconds_before_warning) + { + seconds_before_warning *= 2; /* This wraps in >10 years... */ + ereport(WARNING, + (errmsg("still waiting for the trigger file \"%s\" (%d seconds elapsed)", + TriggerFile, elapsed))); + } + } + } + + ereport(LOG, + (errmsg("trigger file found: %s", TriggerFile))); + unlink(TriggerFile); +} diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 824a93f..06e9d33 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -31,7 +31,6 @@ #include "miscadmin.h" #include "nodes/makefuncs.h" #include "postmaster/bgwriter.h" -#include "postmaster/walreceiver.h" #include "postmaster/walwriter.h" #include "storage/bufmgr.h" #include "storage/ipc.h" @@ -340,9 +339,6 @@ AuxiliaryProcessMain(int argc, char *argv[]) case WalWriterProcess: statmsg = "wal writer process"; break; - case WalReceiverProcess: - statmsg = "wal receiver process"; - break; default: statmsg = "??? 
process"; break; @@ -448,11 +444,6 @@ AuxiliaryProcessMain(int argc, char *argv[]) WalWriterMain(); proc_exit(1); /* should never return */ - case WalReceiverProcess: - /* don't set signals, walreceiver has its own agenda */ - WalReceiverMain(); - proc_exit(1); /* should never return */ - default: elog(PANIC, "unrecognized process type: %d", auxType); proc_exit(1); diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 616cd2c..b73fdf4 100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) OBJS = autovacuum.o bgwriter.o fork_process.o pgarch.o pgstat.o postmaster.o \ - syslogger.o walwriter.o walsender.o walreceiver.o + syslogger.o walwriter.o walsender.o walreceiver.o: submake-libpq diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 3ad82ef..b91ca88 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -108,7 +108,6 @@ #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" -#include "postmaster/walreceiver.h" #include "postmaster/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -125,7 +124,6 @@ #include "storage/spin.h" #endif - /* * List of active backends (or child processes anyway; we don't actually * know whether a given child has become a backend or is still in the @@ -217,7 +215,6 @@ char *bonjour_name; static pid_t StartupPID = 0, BgWriterPID = 0, WalWriterPID = 0, - WalReceiverPID = 0, AutoVacPID = 0, PgArchPID = 0, PgStatPID = 0, @@ -289,7 +286,6 @@ typedef enum PM_WAIT_BACKENDS, /* waiting for live backends to exit */ PM_SHUTDOWN, /* waiting for bgwriter to do shutdown ckpt */ PM_SHUTDOWN_2, /* waiting for archiver to finish */ - PM_SHUTDOWN_3, /* waiting for walsenders to finish */ PM_WAIT_DEAD_END, /* waiting for dead_end children 
to exit */ PM_NO_CHILDREN /* all important children have exited */ } PMState; @@ -468,7 +464,6 @@ static void ShmemBackendArrayRemove(Backend *bn); #define StartupDataBase() StartChildProcess(StartupProcess) #define StartBackgroundWriter() StartChildProcess(BgWriterProcess) #define StartWalWriter() StartChildProcess(WalWriterProcess) -#define StartWalReceiver() StartChildProcess(WalReceiverProcess) /* Macros to check exit status of a child process */ #define EXIT_STATUS_0(st) ((st) == 0) @@ -1465,9 +1460,10 @@ ServerLoop(void) /* If we have lost the archiver, try to start a new one */ if (XLogArchivingActive() && PgArchPID == 0 && - (pmState == PM_RUN || + (pmState == PM_RUN /* XXX postmaster doesn't know if walreceiver is active + || ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) && - WalRcvInProgress()))) + WalRcvInProgress()) */)) PgArchPID = pgarch_start(); /* If we have lost the stats collector, try to start a new one */ @@ -1640,7 +1636,7 @@ retry1: if (proto == XLOG_STREAMING_CODE && !am_walsender) { am_walsender = true; - /* No packets other than regular one should not follow */ + /* No packets other than regular one should follow */ return ProcessStartupPacket(port, SSLdone); } @@ -2097,8 +2093,6 @@ SIGHUP_handler(SIGNAL_ARGS) signal_child(BgWriterPID, SIGHUP); if (WalWriterPID != 0) signal_child(WalWriterPID, SIGHUP); - if (WalReceiverPID != 0) - signal_child(WalReceiverPID, SIGHUP); if (AutoVacPID != 0) signal_child(AutoVacPID, SIGHUP); if (PgArchPID != 0) @@ -2194,8 +2188,6 @@ pmdie(SIGNAL_ARGS) if (StartupPID != 0) signal_child(StartupPID, SIGTERM); - if (WalReceiverPID != 0) - signal_child(WalReceiverPID, SIGTERM); if (pmState == PM_RECOVERY) { /* only bgwriter is active in this state */ @@ -2243,8 +2235,6 @@ pmdie(SIGNAL_ARGS) signal_child(BgWriterPID, SIGQUIT); if (WalWriterPID != 0) signal_child(WalWriterPID, SIGQUIT); - if (WalReceiverPID != 0) - signal_child(WalReceiverPID, SIGQUIT); if (AutoVacPID != 0) signal_child(AutoVacPID, 
SIGQUIT); if (PgArchPID != 0) @@ -2404,17 +2394,16 @@ reaper(SIGNAL_ARGS) */ Assert(Shutdown > NoShutdown); - if (PgArchPID != 0) + if (PgArchPID != 0 || WalSndInProgress()) { /* Waken archiver for the last time */ - signal_child(PgArchPID, SIGUSR2); - pmState = PM_SHUTDOWN_2; - } - else if (WalSndInProgress()) - { + if (PgArchPID != 0) + signal_child(PgArchPID, SIGUSR2); + /* Waken walsenders for the last time */ SignalWalSenders(SIGUSR2); - pmState = PM_SHUTDOWN_3; + + pmState = PM_SHUTDOWN_2; } else pmState = PM_WAIT_DEAD_END; @@ -2454,20 +2443,6 @@ reaper(SIGNAL_ARGS) } /* - * Was it the wal receiver? If exit status is zero (normal) or one - * (FATAL exit), we assume everything is all right just like normal - * backends. - */ - if (pid == WalReceiverPID) - { - WalReceiverPID = 0; - if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus)) - HandleChildCrash(pid, exitstatus, - _("WAL receiver process")); - continue; - } - - /* * Was it the autovacuum launcher? Normal exit can be ignored; we'll * start a new one at the next iteration of the postmaster's main * loop, if necessary. Any other exit condition is treated as a @@ -2495,16 +2470,12 @@ reaper(SIGNAL_ARGS) LogChildExit(LOG, _("archiver process"), pid, exitstatus); if (XLogArchivingActive() && - (pmState == PM_RUN || + (pmState == PM_RUN/* XXX postmaster doesn't know if walreceiver is active + || ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) && - WalRcvInProgress()))) + WalRcvInProgress())*/)) PgArchPID = pgarch_start(); - else if (pmState == PM_SHUTDOWN_2 && WalSndInProgress()) - { - SignalWalSenders(SIGUSR2); - pmState = PM_SHUTDOWN_3; - } - else + else if (pmState == PM_SHUTDOWN_2 && !WalSndInProgress()) pmState = PM_WAIT_DEAD_END; continue; } @@ -2611,8 +2582,8 @@ CleanupBackend(int pid, * advance to the next shutdown step. 
*/ if (bp->child_type == BACKEND_TYPE_WALSND && - pmState == PM_SHUTDOWN_3 && - !WalSndInProgress()) + pmState == PM_SHUTDOWN_2 && + !WalSndInProgress() && PgArchPID == 0) pmState = PM_WAIT_DEAD_END; } DLRemove(curr); @@ -2729,18 +2700,6 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) signal_child(WalWriterPID, (SendStop ? SIGSTOP : SIGQUIT)); } - /* Take care of the walreceiver too */ - if (pid == WalReceiverPID) - WalReceiverPID = 0; - else if (WalReceiverPID != 0 && !FatalError) - { - ereport(DEBUG2, - (errmsg_internal("sending %s to process %d", - (SendStop ? "SIGSTOP" : "SIGQUIT"), - (int) WalReceiverPID))); - signal_child(WalReceiverPID, (SendStop ? SIGSTOP : SIGQUIT)); - } - /* Take care of the autovacuum launcher too */ if (pid == AutoVacPID) AutoVacPID = 0; @@ -2884,7 +2843,6 @@ PostmasterStateMachine(void) */ if (CountChildren(true) == 0 && StartupPID == 0 && - WalReceiverPID == 0 && (BgWriterPID == 0 || !FatalError) && WalWriterPID == 0 && AutoVacPID == 0) @@ -2961,7 +2919,6 @@ PostmasterStateMachine(void) { /* These other guys should be dead already */ Assert(StartupPID == 0); - Assert(WalReceiverPID == 0); Assert(BgWriterPID == 0); Assert(WalWriterPID == 0); Assert(AutoVacPID == 0); @@ -4119,9 +4076,10 @@ sigusr1_handler(SIGNAL_ARGS) if (CheckPostmasterSignal(PMSIGNAL_START_ARCHIVER) && XLogArchivingActive() && PgArchPID == 0 && - (pmState == PM_RUN || + (pmState == PM_RUN /* XXX postmaster doesn't know if walreceiver is active + || ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) && - WalRcvInProgress()))) + WalRcvInProgress()) */)) { /* * Start archiver process. 
This is mainly called for archiving during @@ -4173,12 +4131,6 @@ sigusr1_handler(SIGNAL_ARGS) RegisterWalSender(); } - if (CheckPostmasterSignal(PMSIGNAL_START_WALRECEIVER)) - { - /* The startup process wants us to start a walreceiver */ - WalReceiverPID = StartWalReceiver(); - } - PG_SETMASK(&UnBlockSig); errno = save_errno; @@ -4372,10 +4324,6 @@ StartChildProcess(AuxProcType type) ereport(LOG, (errmsg("could not fork WAL writer process: %m"))); break; - case WalReceiverProcess: - ereport(LOG, - (errmsg("could not fork WAL receiver process: %m"))); - break; default: ereport(LOG, (errmsg("could not fork process: %m"))); diff --git a/src/backend/postmaster/walreceiver.c b/src/backend/postmaster/walreceiver.c deleted file mode 100644 index a07b1f2..0000000 --- a/src/backend/postmaster/walreceiver.c +++ /dev/null @@ -1,980 +0,0 @@ -/*------------------------------------------------------------------------- - * - * walreceiver.c - * - * The WAL receiver process (walreceiver) is new as of Postgres 8.5. It - * takes charge of XLOG streaming receiver in the standby server. At first, - * it is started by the postmaster and connects to the primary server, - * when the startup process in the standby mode requests XLOG streaming - * replication. It attempts to keep receiving XLOG records from the primary - * server and writing them to the disk, as long as the connection is alive - * (i.e., like any backend, there is an one to one relationship between - * a connection and the walreceiver process). Also, it notifies the startup - * process of the location of XLOG records available. This enables - * the startup process to read XLOG records from XLOG stream and apply them - * to make a replica of the primary database. - * - * Normal termination is by SIGTERM or an end-of-streaming message from the - * primary server, which instructs the walreceiver to exit(0). Emergency - * termination is by SIGQUIT; like any backend, the walreceiver will simply - * abort and exit on SIGQUIT. 
A close of the connection and a FATAL error - * are treated as not a crash but approximately normal termination. - * - * Portions Copyright (c) 2009-2009, PostgreSQL Global Development Group - * - * - * IDENTIFICATION - * $PostgreSQL$ - * - *------------------------------------------------------------------------- - */ -#include "postgres.h" - -#include <unistd.h> - -#include "access/xlog_internal.h" -#include "libpq-fe.h" -#include "libpq/pqsignal.h" -#include "miscadmin.h" -#include "postmaster/walreceiver.h" -#include "storage/fd.h" -#include "storage/ipc.h" -#include "storage/pmsignal.h" -#include "storage/shmem.h" -#include "utils/guc.h" -#include "utils/memutils.h" -#include "utils/resowner.h" - -static WalRcvData *WalRcv = NULL; - -/* streamConn is a PGconn object of a connection to walsender from walreceiver */ -static PGconn *streamConn; - -/* Path for the connection information file (relative to $PGDATA) */ -#define CONNINFO_FILENAME "global/conninfo" - -/* - * These variables are used similarly to openLogFile/Id/Seg/Off, - * but for walreceiver to write the XLOG. - */ -static int recvFile = -1; -static uint32 recvId = 0; -static uint32 recvSeg = 0; -static uint32 recvOff = 0; - -/* - * ZeroedRecPtr indicates the byte position that we have already zeroed. It is - * updated when walreceiver writes a half-filled page that needs to be zeroed. - * ZeroedBuffer points a zeroed buffer used for zeroing. - */ -static XLogRecPtr ZeroedRecPtr = {0, 0}; -static char *ZeroedBuffer; - -/* Recovery has been already triggered? */ -static bool foundTrigger = false; - -/* - * Max number of times to retry walreceiver - * - * XXX: Should this number be user-configurable? - */ -#define MAX_WALRCV_RETRIES 0 - -/* - * Advances when startup process retries to request walreceiver. - * When walreceiver is not in progress, if this counter is smaller - * than MAX_WALRCV_RETRIES, we retry to start walreceiver. 
- */ -static int NumWalRcvRetries = 0; - -/* Flags set by interrupt handlers of walreceiver for later service in the main loop */ -static volatile sig_atomic_t got_SIGHUP = false; -static volatile sig_atomic_t shutdown_requested = false; - -/* Signal handlers */ -static void WalRcvSigHupHandler(SIGNAL_ARGS); -static void WalRcvShutdownHandler(SIGNAL_ARGS); -static void WalRcvQuickDieHandler(SIGNAL_ARGS); - -/* Prototypes for private functions */ -static void WalRcvLoop(void); -static void InitWalRcv(void); -static void WalRcvKill(int code, Datum arg); -static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced); -static void XLogWalRcvFlush(XLogRecPtr recptr); -static void WritePhysicalXLog(char *from, Size nbytes, int startoff); -static char *read_conninfo_file(void); - -/* Main entry point for walreceiver process */ -void -WalReceiverMain(void) -{ - MemoryContext walrcv_context; - char *conninfo; - - /* Mark walreceiver in progress */ - InitWalRcv(); - - /* - * If possible, make this process a group leader, so that the postmaster - * can signal any child processes too. (walreceiver probably never has - * any child processes, but for consistency we make all postmaster child - * processes do this.) 
- */ -#ifdef HAVE_SETSID - if (setsid() < 0) - elog(FATAL, "setsid() failed: %m"); -#endif - - /* Properly accept or ignore signals the postmaster might send us */ - pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */ - pqsignal(SIGINT, SIG_IGN); - pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */ - pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */ - pqsignal(SIGALRM, SIG_IGN); - pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, SIG_IGN); - pqsignal(SIGUSR2, SIG_IGN); - - /* Reset some signals that are accepted by postmaster but not here */ - pqsignal(SIGCHLD, SIG_DFL); - pqsignal(SIGTTIN, SIG_DFL); - pqsignal(SIGTTOU, SIG_DFL); - pqsignal(SIGCONT, SIG_DFL); - pqsignal(SIGWINCH, SIG_DFL); - - /* We allow SIGQUIT (quickdie) at all times */ -#ifdef HAVE_SIGPROCMASK - sigdelset(&BlockSig, SIGQUIT); -#else - BlockSig &= ~(sigmask(SIGQUIT)); -#endif - - /* - * Create a resource owner to keep track of our resources (not clear that - * we need this, but may as well have one). - */ - CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver"); - - /* - * Create a memory context that we will do all our work in. We do this so - * that we can reset the context during error recovery and thereby avoid - * possible memory leaks. Formerly this code just ran in - * TopMemoryContext, but resetting that would be a really bad idea. 
- */ - walrcv_context = AllocSetContextCreate(TopMemoryContext, - "Wal Receiver", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContextSwitchTo(walrcv_context); - - /* Unblock signals (they were blocked when the postmaster forked us) */ - PG_SETMASK(&UnBlockSig); - - /* Get the starting XLOG location of XLOG streaming */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - SpinLockAcquire(&walrcv->mutex); - LogstreamResult = walrcv->LogstreamResult; - SpinLockRelease(&walrcv->mutex); - - /* Report XLOG streaming progress in PS display */ - ReportLogstreamResult(); - } - - /* Read the connection information used to connect with the primary */ - conninfo = read_conninfo_file(); - - /* Set up a connection for XLOG streaming */ - streamConn = PQstartXLogStreaming(conninfo, - LogstreamResult.Write.xlogid, - LogstreamResult.Write.xrecoff); - if (PQstatus(streamConn) != CONNECTION_OK) - ereport(FATAL, - (errmsg("could not connect to the primary server : %s", - PQerrorMessage(streamConn)))); - pfree(conninfo); - - /* - * Confirm that the current timeline of the primary is the same - * as the recovery target timeline. - */ - ThisTimeLineID = PQtimeline(streamConn); - if (ThisTimeLineID != WalRcv->RecoveryTargetTLI) - ereport(FATAL, - (errmsg("timeline %u of the primary does not match " - "recovery target timeline %u", - ThisTimeLineID, WalRcv->RecoveryTargetTLI))); - - ZeroedBuffer = (char *) palloc0(XLOG_BLCKSZ); - - /* Main loop of walreceiver */ - WalRcvLoop(); -} - -/* Main loop of walreceiver process */ -static void -WalRcvLoop(void) -{ - char *buf; - bool finishing_seg; - bool fsync_requested; - int len; - XLogRecPtr recptr; - - /* Loop until end-of-streaming or error */ - for (;;) - { - bool fsynced = false; - - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. 
- */ - if (!PostmasterIsAlive(true)) - exit(1); - - /* - * Exit walreceiver if we're not in recovery. This should not happen, - * but cross-check the status here. - */ - if (!RecoveryInProgress()) - ereport(FATAL, - (errmsg("cannot continue XLOG streaming, recovery has already ended"))); - - /* Process any requests or signals received recently */ - if (got_SIGHUP) - { - got_SIGHUP = false; - ProcessConfigFile(PGC_SIGHUP); - } - - /* Normal exit from the walreceiver is here */ - if (shutdown_requested) - proc_exit(0); - - /* Receive XLogData message (wait for new message to arrive) */ - len = PQgetXLogData(streamConn, &buf, - (int *) &recptr.xlogid, (int *) &recptr.xrecoff, - (char *) &finishing_seg, (char *) &fsync_requested, 0); - - if (len < 0) /* end-of-streaming or error */ - break; - - if (buf == NULL) /* should not happen */ - continue; - -#ifdef REPLICATION_DEBUG - if (REPLICATION_DEBUG_ENABLED) - elog(LOG, "xlog recv result %X/%X:%s%s; write %X/%X; flush %X/%X", - recptr.xlogid, recptr.xrecoff, - finishing_seg ? " finishing_seg" : "", - fsync_requested ? " fsync_requested" : "", - LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff, - LogstreamResult.Flush.xlogid, LogstreamResult.Flush.xrecoff); -#endif - - /* - * A level of synchronization between both servers depends on when - * the standby returns a "success" of XLOG streaming to the primary. - * For example, the following timings can be considered: - * - * A "success" is returned after - * #1 receiving the logs and locating them on a memory - * #2 writing them to the disk - * #3 fsyncing them to the disk - * #4 replaying them - * ...etc - * - * We can choose only #2 now. - * - * Note: In #1 and #2, the logs might disappear if the standby fails - * before writing them to certainly the disk sector. But, since such - * missing logs are guaranteed to exist in the primary side, - * the transaction is not lost in the whole system (i.e., the standby - * can recover all transactions from the primary). 
- */ - - XLogWalRcvWrite(buf, len, recptr, &fsynced); - - /* - * The logs in the XLogData message were written successfully, - * so we mark the message already consumed. - */ - PQmarkConsumed(streamConn); - - /* - * If fsync is not requested or was already done, we send a "success" - * to the primary before issuing fsync for end-of-segment. - */ - if (fsynced || !fsync_requested) - { - if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff, - (int) fsynced) == -1) - ereport(FATAL, - (errmsg("could not send a message to the primary: %s", - PQerrorMessage(streamConn)))); - } - - /* - * If we just wrote the whole last page of a logfile segment but - * had not fsynced it yet, fsync the segment immediately. This - * avoids having to go back and re-open prior segments when an - * fsync request comes along later. - * - * Of course, if asked to fsync but not, do so. - */ - if (!fsynced && (fsync_requested || finishing_seg)) - { - XLogWalRcvFlush(recptr); - - if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff, - 1) == -1) - ereport(FATAL, - (errmsg("could not send a message to the primary: %s", - PQerrorMessage(streamConn)))); - - /* - * If the segment is ready to copy to archival storage, - * notify the archiver so. - */ - if (finishing_seg && XLogArchivingActive()) - XLogArchiveNotifySeg(recvId, recvSeg); - - /* - * XXX: Should we signal bgwriter to start a restartpoint - * if we've consumed too much xlog since the last one, like - * in normal processing? But this is not worth doing unless - * a restartpoint can be created independently from a - * checkpoint record. 
- */ - } - } - - if (len == -1) /* end-of-streaming */ - { - PGresult *res; - - res = PQgetResult(streamConn); - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - PQclear(res); - proc_exit(0); - } - PQclear(res); - } - - /* error */ - ereport(FATAL, - (errmsg("could not read xlog records: %s", - PQerrorMessage(streamConn)))); -} - -/* Mark this walreceiver in progress */ -static void -InitWalRcv(void) -{ - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* - * WalRcv should be set up already (if we are a backend, we inherit - * this by fork() or EXEC_BACKEND mechanism from the postmaster). - */ - if (walrcv == NULL) - elog(PANIC, "walreceiver control data uninitialized"); - - /* Make sure WalRcv is not in use */ - if (walrcv->pid != 0) - elog(FATAL, "WalRcv is in use"); - - /* Arrange to clean up at walreceiver exit */ - on_shmem_exit(WalRcvKill, 0); - - /* Mark walreceiver in progress */ - walrcv->pid = MyProcPid; -} - -/* - * Close a connection for XLOG streaming and mark this walreceiver - * no longer in progress - */ -static void -WalRcvKill(int code, Datum arg) -{ - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - PQfinish(streamConn); - walrcv->pid = 0; - walrcv->in_progress = false; -} - -/* SIGHUP: set flag to re-read config file at next convenient time */ -static void -WalRcvSigHupHandler(SIGNAL_ARGS) -{ - got_SIGHUP = true; -} - -/* SIGTERM: set flag to exit normally */ -static void -WalRcvShutdownHandler(SIGNAL_ARGS) -{ - if (CritSectionCount == 0) - proc_exit(0); - - /* Delay shutdown if we are inside a critical section */ - shutdown_requested = true; -} - -/* - * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster. - * - * Some backend has bought the farm, - * so we need to stop what we're doing and exit. 
- */ -static void -WalRcvQuickDieHandler(SIGNAL_ARGS) -{ - PG_SETMASK(&BlockSig); - - /* - * We DO NOT want to run proc_exit() callbacks -- we're here because - * shared memory may be corrupted, so we don't want to try to clean up our - * transaction. Just nail the windows shut and get out of town. Now that - * there's an atexit callback to prevent third-party code from breaking - * things by calling exit() directly, we have to reset the callbacks - * explicitly to make this work as intended. - */ - on_exit_reset(); - - /* - * Note we do exit(2) not exit(0). This is to force the postmaster into a - * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random - * backend. This is necessary precisely because we don't clean up our - * shared memory state. (The "dead man switch" mechanism in pmsignal.c - * should ensure the postmaster sees this as a crash, too, but no harm - * in being doubly sure.) - */ - exit(2); -} - -/* Report shared-memory space needed by WalRcvShmemInit */ -Size -WalRcvShmemSize(void) -{ - Size size = 0; - - size = add_size(size, sizeof(WalRcvData)); - - return size; -} - -/* Allocate and initialize walreceiver-related shared memory */ -void -WalRcvShmemInit(void) -{ - bool found; - - WalRcv = (WalRcvData *) - ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found); - - if (WalRcv == NULL) - ereport(FATAL, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("not enough shared memory for walreceiver"))); - if (found) - return; /* already initialized */ - - /* Initialize the data structures */ - MemSet(WalRcv, 0, WalRcvShmemSize()); - SpinLockInit(&WalRcv->mutex); -} - -/* Is walreceiver in progress (or just starting up)? */ -bool -WalRcvInProgress(void) -{ - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - return walrcv->in_progress; -} - -/* - * Write the log to disk. - * - * fsynced is set to true if the log was fsyned by O_DIRECT. 
- */ -static void -XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced) -{ - int startoff; - int endoff; - - START_CRIT_SECTION(); - - if (!XLByteInPrevSeg(recptr, recvId, recvSeg)) - { - bool use_existent; - - /* - * XLOG segment files will be re-read in recovery operation soon, - * so we don't need to advise the OS to release any cache page. - */ - if (recvFile >= 0 && close(recvFile)) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not close log file %u, segment %u: %m", - recvId, recvSeg))); - recvFile = -1; - - /* Create/use new log file */ - XLByteToPrevSeg(recptr, recvId, recvSeg); - use_existent = true; - recvFile = XLogFileInit(recvId, recvSeg, - &use_existent, true); - recvOff = 0; - } - - /* Make sure we have the current logfile open */ - if (recvFile < 0) - { - XLByteToPrevSeg(recptr, recvId, recvSeg); - recvFile = XLogFileOpen(recvId, recvSeg); - recvOff = 0; - } - - /* Calculate the start/end file offset of the received logs */ - endoff = recptr.xrecoff % XLogSegSize; - startoff = ((endoff == 0) ? XLogSegSize : endoff) - len; - - /* - * Re-zero the page so that bytes beyond what we've written will look - * like zeroes and not valid XLOG records. Only end page which we are - * writing need to be zeroed. Of course, we can skip zeroing the pages - * full of the XLOG records. Save the end position of the already zeroed - * area at the variable ZeroedRecPtr, and avoid zeroing the same page - * two or more times. - * - * This must precede the writing of the actual logs. Otherwise, a crash - * before re-zeroing would cause a corrupted page. 
- */ - if (XLByteLT(ZeroedRecPtr, recptr) && endoff % XLOG_BLCKSZ != 0) - { - int zlen; - - zlen = XLOG_BLCKSZ - endoff % XLOG_BLCKSZ; - WritePhysicalXLog(ZeroedBuffer, zlen, endoff); - ZeroedRecPtr = recptr; - ZeroedRecPtr.xrecoff += zlen; - } - - /* Write out the logs */ - WritePhysicalXLog(buf, len, startoff); - LogstreamResult.Send = recptr; - LogstreamResult.Write = recptr; - - if (sync_method == SYNC_METHOD_OPEN || - sync_method == SYNC_METHOD_OPEN_DSYNC) - { - LogstreamResult.Flush = recptr; - *fsynced = true; /* logs were already fsynced */ - } - - /* Update shared-memory status */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - SpinLockAcquire(&walrcv->mutex); - XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send); - XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write); - if (*fsynced) - XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush); - SpinLockRelease(&walrcv->mutex); - } - - /* Report XLOG streaming progress in PS display */ - ReportLogstreamResult(); - - END_CRIT_SECTION(); -} - -/* Flush the log to disk */ -static void -XLogWalRcvFlush(XLogRecPtr recptr) -{ - START_CRIT_SECTION(); - - issue_xlog_fsync(recvFile, recvId, recvSeg); - - LogstreamResult.Flush = recptr; - - /* Update shared-memory status */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - SpinLockAcquire(&walrcv->mutex); - XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush); - SpinLockRelease(&walrcv->mutex); - } - - END_CRIT_SECTION(); -} - -/* Physical write to the given logs */ -static void -WritePhysicalXLog(char *from, Size nbytes, int startoff) -{ - /* Need to seek in the file? 
*/ - if (recvOff != startoff) - { - if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not seek in log file %u, " - "segment %u to offset %u: %m", - recvId, recvSeg, startoff))); - recvOff = startoff; - } - - /* OK to write the logs */ - errno = 0; - if (write(recvFile, from, nbytes) != nbytes) - { - /* if write didn't set errno, assume no disk space */ - if (errno == 0) - errno = ENOSPC; - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not write to log file %u, segment %u " - "at offset %u, length %lu: %m", - recvId, recvSeg, - recvOff, (unsigned long) nbytes))); - } - - /* Update state for write */ - recvOff += nbytes; -} - -/* - * Wait for the XLOG records at given position available. - * - * The XLOG records already written by walreceiver are regarded as - * available. - * - * recptr: indicates the byte position which caller wants to read the - * XLOG record up to. - * - * Called by the startup process in streaming recovery. 
- */ -void -WaitNextXLogAvailable(XLogRecPtr recptr) -{ - struct stat stat_buf; - bool signaled = false; - -#ifdef REPLICATION_DEBUG - if (REPLICATION_DEBUG_ENABLED) - elog(LOG, "xlog wait request %X/%X; write %X/%X", - recptr.xlogid, recptr.xrecoff, - LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff); -#endif - - /* Quick exit if already known available */ - if (XLByteLT(recptr, LogstreamResult.Write)) - return; - - for (;;) - { - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* Update local status */ - SpinLockAcquire(&walrcv->mutex); - LogstreamResult = walrcv->LogstreamResult; - SpinLockRelease(&walrcv->mutex); - - /* If available already, leave here */ - if (XLByteLT(recptr, LogstreamResult.Write)) - return; - - /* Check to see if the trigger file exists */ - if (TriggerFile != NULL && !foundTrigger && - stat(TriggerFile, &stat_buf) == 0) - { - ereport(LOG, - (errmsg("trigger file found: %s", TriggerFile))); - foundTrigger = true; - unlink(TriggerFile); - } - - /* - * The presence of a trigger file shuts down walreceiver if it's - * in progress. - */ - if (WalRcvInProgress()) - { - pid_t pid = walrcv->pid; - - if (foundTrigger && !signaled && pid != 0) - { - kill(pid, SIGTERM); - signaled = true; /* prevents signal from being repeated */ - } - } - /* - * If walreceiver is not in progress and has been retried more than - * MAX_WALRCV_RETRIES times, give up on the wait for the next record, - * which would cause a streaming recovery to end. If the former - * condition is met and the retry-count has not reached the maximum - * number yet, request XLOG streaming again. - */ - else - { - if (NumWalRcvRetries < MAX_WALRCV_RETRIES && !foundTrigger) - { - /* - * Since recovery target timeline has already been shared with - * upcoming walreceiver, we pass 0 to RequestXLogStreaming() - * as timeline (i.e., shared timeline variable is not updated). 
- */ - RequestXLogStreaming(0, recptr); - NumWalRcvRetries++; - } - else - return; - } - - /* - * This possibly-long loop needs to handle interrupts of startup - * process. - */ - HandleStartupProcInterrupts(); - - pg_usleep(100000L); /* 100ms */ - } -} - -/* Ensure that walreceiver has already exited */ -void -ShutdownWalRcv(void) -{ - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - pid_t pid = walrcv->pid; - - if (pid != 0) - kill(pid, SIGTERM); - - while (WalRcvInProgress()) - { - /* - * This possibly-long loop needs to handle interrupts of startup - * process. - */ - HandleStartupProcInterrupts(); - - pg_usleep(100000); /* 100ms */ - } -} - -/* Wait until a trigger file is found */ -void -WaitForTrigger(void) -{ - int seconds_before_warning = 15; - int elapsed = 0; - int count = 0; - struct stat stat_buf; - - /* Quick exit if a trigger file was not specified or was already found */ - if (TriggerFile == NULL || foundTrigger) - return; - - while (stat(TriggerFile, &stat_buf) != 0) - { - /* - * This possibly-long loop needs to handle interrupts of startup - * process. - */ - HandleStartupProcInterrupts(); - - pg_usleep(100000L); /* 100ms */ - - if (++count >= 10) /* 1s passed */ - { - count = 0; - - if (++elapsed >= seconds_before_warning) - { - seconds_before_warning *= 2; /* This wraps in >10 years... */ - ereport(WARNING, - (errmsg("still waiting for the trigger file \"%s\" (%d seconds elapsed)", - TriggerFile, elapsed))); - } - } - } - - ereport(LOG, - (errmsg("trigger file found: %s", TriggerFile))); - unlink(TriggerFile); -} - -/* - * Request postmaster to start the processes required for XLOG streaming. - * - * tli: recovery target timeline. If it's not 0, share it with upcoming - * walreceiver. - * - * recptr: indicates the position where we failed in reading a record. 
- */ -void -RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr) -{ - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* - * Calculate the start position of XLOG streaming. If we need to read - * a record in the middle of a segment which doesn't exist in pg_xlog, - * the start position has to be the head of the segment which that - * record belongs to. Which is necessary for preventing an immature - * segment (i.e., there is no record in the first half of a segment) - * from being created by XLOG streaming. - */ - if (recptr.xrecoff % XLogSegSize != 0) - { - char xlogpath[MAXPGPATH]; - struct stat stat_buf; - uint32 log; - uint32 seg; - - XLByteToSeg(recptr, log, seg); - XLogFilePath(xlogpath, tli, log, seg); - - if (stat(xlogpath, &stat_buf) != 0) - recptr.xrecoff -= recptr.xrecoff % XLogSegSize; - } - - LogstreamResult.Send = recptr; - LogstreamResult.Write = recptr; - LogstreamResult.Flush = recptr; - - SpinLockAcquire(&walrcv->mutex); - walrcv->LogstreamResult = LogstreamResult; - if (tli != 0) - walrcv->RecoveryTargetTLI = tli; - walrcv->in_progress = true; /* Mark that walreceiver is in progress */ - SpinLockRelease(&walrcv->mutex); - - SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); - - /* Start archiver to archive xlog segments written by walreceiver */ - if (XLogArchivingActive()) - SendPostmasterSignal(PMSIGNAL_START_ARCHIVER); -} - -/* - * Returns the byte position that walreceiver has written - */ -XLogRecPtr -GetWalRcvWriteRecPtr(void) -{ - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - XLogRecPtr recptr; - - SpinLockAcquire(&walrcv->mutex); - recptr = walrcv->LogstreamResult.Write; - SpinLockRelease(&walrcv->mutex); - - return recptr; -} - -/* Write the connection information to the file */ -void -write_conninfo_file(char *conninfo) -{ - FILE *fp; - - fp = AllocateFile(CONNINFO_FILENAME, "w"); - if (!fp) - { - ereport(FATAL, - 
(errcode_for_file_access(), - errmsg("could not write to file \"%s\": %m", - CONNINFO_FILENAME))); - } - - /* - * The format is: - * - * conninfo string, null terminated - * - * If a connection information was not supplied (e.g., recovery.conf did not - * specify primary_conninfo parameter), an empty string is written, which - * means that the default values that are available from the environment etc - * are used for connection of XLOG streaming. - * - * Add 'replication' as the database name to connect to, into the tail of - * conninfo. Since libpq prefers a posteriorly-located setting, the database - * name specified by an user is always ignored. - */ - if (conninfo != NULL) - fprintf(fp, "%s", conninfo); - fputs(" dbname=replication", fp); - fputc(0, fp); - - if (FreeFile(fp)) - { - ereport(FATAL, - (errcode_for_file_access(), - errmsg("could not write to file \"%s\": %m", - CONNINFO_FILENAME))); - } -} - -/* Return a malloc'd connection information read from the file */ -static char * -read_conninfo_file(void) -{ - FILE *fp; - StringInfoData buf; - int ch; - char *conninfo; - - initStringInfo(&buf); - - fp = AllocateFile(CONNINFO_FILENAME, "r"); - if (!fp) - { - ereport(FATAL, - (errcode_for_file_access(), - errmsg("could not read from file \"%s\": %m", - CONNINFO_FILENAME))); - } - - /* Read a string to a null-termination or the end of the file */ - for (;;) - { - ch = fgetc(fp); - if (ch == 0 || ch == EOF) - break; - - appendStringInfoChar(&buf, (char) ch); - } - - FreeFile(fp); - - conninfo = pstrdup(buf.data); - pfree(buf.data); - - return conninfo; -} diff --git a/src/backend/postmaster/walsender.c b/src/backend/postmaster/walsender.c index 28566de..2c46511 100644 --- a/src/backend/postmaster/walsender.c +++ b/src/backend/postmaster/walsender.c @@ -50,8 +50,13 @@ #include "tcop/tcopprot.h" #include "utils/guc.h" #include "utils/memutils.h" +#include "utils/ps_status.h" #include "utils/resowner.h" +/* Private, possibly out-of-date copy of shared 
LogstreamResult */ +extern XLogstreamResult LogstreamResult; + + WalSndCtlData *WalSndCtl = NULL; static WalSnd *MyWalSnd = NULL; @@ -481,8 +486,9 @@ XLogSend(PendingMessage inMsg, PendingMessage outMsg) XLogRecPtr SendRqstPtr; /* - * Invalid position means that XLOG streaming is not started yet, - * so we do nothing here. + * Invalid position means that we have not yet received the initial + * XLogRecPtr message from the slave that indicates where to start the + * streaming. */ if (XLogRecPtrIsInvalid(LogstreamResult.Send)) return true; @@ -491,7 +497,7 @@ XLogSend(PendingMessage inMsg, PendingMessage outMsg) SendRqstPtr = GetWriteRecPtr(); #ifdef REPLICATION_DEBUG - if (REPLICATION_DEBUG_ENABLED) + if (REPLICATION_DEBUG_ENABLED && XLByteLT(LogstreamResult.Send, SendRqstPtr)) elog(LOG, "xlog send request %X/%X; send %X/%X; write %X/%X", SendRqstPtr.xlogid, SendRqstPtr.xrecoff, LogstreamResult.Send.xlogid, LogstreamResult.Send.xrecoff, @@ -911,3 +917,16 @@ UpdateOldestLogstreamResult(void) LogstreamResult = oldest; return found; } + + +/* Report XLOG streaming progress in PS display */ +void +ReportLogstreamResult(void) +{ + char activitymsg[50]; + + snprintf(activitymsg, sizeof(activitymsg), + "streaming %X/%X", + LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff); + set_ps_display(activitymsg, false); +} diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index b5f7260..ff3e659 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -25,7 +25,6 @@ #include "postmaster/autovacuum.h" #include "postmaster/bgwriter.h" #include "postmaster/postmaster.h" -#include "postmaster/walreceiver.h" #include "postmaster/walsender.h" #include "storage/bufmgr.h" #include "storage/ipc.h" @@ -119,7 +118,6 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, BgWriterShmemSize()); size = add_size(size, AutoVacuumShmemSize()); size = add_size(size, WalSndShmemSize()); - size = add_size(size, 
WalRcvShmemSize()); size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); #ifdef EXEC_BACKEND @@ -218,7 +216,6 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) BgWriterShmemInit(); AutoVacuumShmemInit(); WalSndShmemInit(); - WalRcvShmemInit(); /* * Set up other modules that need some shared memory space diff --git a/src/bin/walreceiver/Makefile b/src/bin/walreceiver/Makefile new file mode 100644 index 0000000..28932fb --- /dev/null +++ b/src/bin/walreceiver/Makefile @@ -0,0 +1,38 @@ +#------------------------------------------------------------------------- +# +# Makefile for src/bin/walreceiver +# +# Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group +# Portions Copyright (c) 1994, Regents of the University of California +# +# $PostgreSQL$ +# +#------------------------------------------------------------------------- + +PGFILEDESC = "PostgreSQL WAL receiver utility" +subdir = src/bin/walreceiver +top_builddir = ../../.. +include $(top_builddir)/src/Makefile.global + +override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) + +OBJS = walreceiver.o $(WIN32RES) + +all: submake-libpq walreceiver + +%: %.o $(WIN32RES) + $(CC) $(CFLAGS) $^ $(libpq_pgport) $(LDFLAGS) $(LIBS) -o $@$(X) + +walreceiver: $(OBJS) + +install: all installdirs + $(INSTALL_PROGRAM) walreceiver$(X) '$(DESTDIR)$(bindir)'/walreceiver$(X) + +installdirs: + $(MKDIR_P) '$(DESTDIR)$(bindir)' + +uninstall: + rm -f $(addprefix '$(DESTDIR)$(bindir)'/, $(addsuffix $(X), $(PROGRAMS))) + +clean distclean maintainer-clean: + rm -f $(addsuffix $(X), $(PROGRAMS)) $(addsuffix .o, $(PROGRAMS)) diff --git a/src/bin/walreceiver/walreceiver.c b/src/bin/walreceiver/walreceiver.c new file mode 100644 index 0000000..01e6f07 --- /dev/null +++ b/src/bin/walreceiver/walreceiver.c @@ -0,0 +1,505 @@ +/*------------------------------------------------------------------------- + * + * walreceiver.c + * + * The WAL receiver process (walreceiver) is new as of Postgres 
8.5. It + * takes charge of XLOG streaming receiver in the standby server. It is + * launched by the startup process, and connects to the primary server, + * It attempts to keep receiving XLOG records from the primary + * server and writing them to the disk, as long as the connection is alive + * Also, it notifies the startup + * process of the location of XLOG records available. This enables + * the startup process to read XLOG records from XLOG stream and apply them + * to make a replica of the primary database. + * + * Normal termination is by SIGTERM or an end-of-streaming message from the + * primary server, which instructs the walreceiver to exit(0). Emergency + * termination is by SIGQUIT; like backends, walreceiver will simply + * abort and exit on SIGQUIT. A close of the connection and a FATAL error + * are treated as not a crash but approximately normal termination. + * + * Portions Copyright (c) 2009-2009, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <unistd.h> + +#include "access/xlog_internal.h" +#include "libpq-fe.h" +#include "libpq/pqsignal.h" +#include "miscadmin.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/pmsignal.h" +#include "storage/shmem.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + +/* libpq connection to the primary server. */ +static PGconn *streamConn = NULL; + +/* + * These variables are used similarly to openLogFile/Id/Seg/Off in xlog.c + */ +static int recvFile = -1; +static uint32 recvId = 0; +static uint32 recvSeg = 0; +static uint32 recvOff = 0; + +/* + * ZeroedRecPtr indicates the byte position that we have already zeroed. It is + * updated when walreceiver writes a half-filled page that needs to be zeroed. + * ZeroedBuffer points a zeroed buffer used for zeroing. 
+ */ +static XLogRecPtr ZeroedRecPtr = {0, 0}; +static char *ZeroedBuffer; + +/* Signal handlers */ +static void WalRcvQuickDieHandler(SIGNAL_ARGS); + +/* Prototypes for private functions */ +static void WalRcvLoop(void); +static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced); +static void XLogWalRcvFlush(XLogRecPtr recptr); +static void WritePhysicalXLog(char *from, Size nbytes, int startoff); +static int OpenPhysicalXLog(uint32 log, uint32 seg); + + +static XLogRecPtr writtenPtr; +static XLogRecPtr flushedPtr; + +TimeLineID ThisTimeLineID; + +static void +usage(const char *progname) +{ + printf(_("%s is an internal utility to receive WAL from another PostgreSQL instance.\n\n"), progname); + printf(_("Usage:\n %s <target TLI> <starting XLOG location> <connection string>\n"), progname); + printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n")); +} + + +/* Main entry point for walreceiver process */ +int +main(int argc, char *argv[]) +{ + char *conninfo; + TimeLineID RecoveryTargetTLI; + char *s; + + if (argc > 3) + { + if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) + { + usage(argv[0]); + exit(0); + } + if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0) + { + puts("walreceiver (PostgreSQL) " PG_VERSION); + exit(0); + } + } + else + { + usage(argv[0]); + exit(1); + } + + /* Properly accept or ignore signals the postmaster might send us */ + pqsignal(SIGHUP, SIG_IGN); + pqsignal(SIGINT, SIG_IGN); + pqsignal(SIGTERM, SIG_DFL); + pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */ + pqsignal(SIGALRM, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR1, SIG_IGN); + pqsignal(SIGUSR2, SIG_IGN); + + /* Reset some signals that are accepted by postmaster but not here */ + pqsignal(SIGCHLD, SIG_DFL); + pqsignal(SIGTTIN, SIG_DFL); + pqsignal(SIGTTOU, SIG_DFL); + pqsignal(SIGCONT, SIG_DFL); + pqsignal(SIGWINCH, SIG_DFL); + + /* We allow SIGQUIT (quickdie) at all times */ +#ifdef BROKEN 
+#ifdef HAVE_SIGPROCMASK + sigdelset(&BlockSig, SIGQUIT); +#else + BlockSig &= ~(sigmask(SIGQUIT)); +#endif + + /* Unblock signals (they were blocked when the postmaster forked us) */ + PG_SETMASK(&UnBlockSig); +#endif + + /* Get the starting XLOG location from command line */ + RecoveryTargetTLI = strtoul(argv[1], &s, 10); + if (*s != '\0') + { + fprintf(stderr, "invalid TLI: %s\n", argv[1]); + exit(1); + } + if (sscanf(argv[2], "%X/%X", &writtenPtr.xlogid, &writtenPtr.xrecoff) != 2) + { + fprintf(stderr, "invalid recptr: %s\n", argv[2]); + exit(1); + } + + /* Read the connection information used to connect with the primary */ + conninfo = malloc(strlen(argv[3]) + strlen(" dbname=replication") + 1); + sprintf(conninfo, "%s dbname=replication", argv[3]); + + /* Set up a connection for XLOG streaming */ + streamConn = PQstartXLogStreaming(conninfo, + writtenPtr.xlogid, + writtenPtr.xrecoff); + if (PQstatus(streamConn) != CONNECTION_OK) + { + fprintf(stderr, "could not connect to the primary server: %s\n", + PQerrorMessage(streamConn)); + exit(1); + } + + /* + * Confirm that the current timeline of the primary is the same + * as the recovery target timeline. + */ + ThisTimeLineID = PQtimeline(streamConn); + if (ThisTimeLineID != RecoveryTargetTLI) + { + fprintf(stderr, "timeline %u of the primary does not match recovery target timeline %u", + ThisTimeLineID, RecoveryTargetTLI); + exit(1); + } + + ZeroedBuffer = (char *) malloc(XLOG_BLCKSZ); + memset(ZeroedBuffer, 0, XLOG_BLCKSZ); + + /* Main loop of walreceiver */ + WalRcvLoop(); + + return 0; +} + +/* Main loop of walreceiver process */ +static void +WalRcvLoop(void) +{ + char *buf; + bool finishing_seg; + bool fsync_requested; + int len; + XLogRecPtr recptr; + + /* Loop until end-of-streaming or error */ + for (;;) + { + bool fsynced = false; + +#ifdef NOT_USED + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. 
+ */ + if (!PostmasterIsAlive(true)) + exit(1); + + /* + * Exit walreceiver if we're not in recovery. This should not happen, + * but cross-check the status here. + */ + if (!RecoveryInProgress()) + { + fprintf(stderr, "cannot continue XLOG streaming, recovery has already ended\n"); + exit(1); + } +#endif + + /* Receive XLogData message (wait for new message to arrive) */ + len = PQgetXLogData(streamConn, &buf, + (int *) &recptr.xlogid, (int *) &recptr.xrecoff, + (char *) &finishing_seg, (char *) &fsync_requested, 0); + + if (len < 0) /* end-of-streaming or error */ + break; + + if (buf == NULL) /* should not happen */ + continue; + +#ifdef REPLICATION_DEBUG + fprintf(stderr, "xlog recv result %X/%X:%s%s; write %X/%X; flush %X/%X\n", + recptr.xlogid, recptr.xrecoff, + finishing_seg ? " finishing_seg" : "", + fsync_requested ? " fsync_requested" : "", + writtenPtr.xlogid, writtenPtr.xrecoff, + flushedPtr.xlogid, flushedPtr.xrecoff); +#endif + + /* + * A level of synchronization between both servers depends on when + * the standby returns a "success" of XLOG streaming to the primary. + * For example, the following timings can be considered: + * + * A "success" is returned after + * #1 receiving the logs and locating them on a memory + * #2 writing them to the disk + * #3 fsyncing them to the disk + * #4 replaying them + * ...etc + * + * We can choose only #2 now. + * + * Note: In #1 and #2, the logs might disappear if the standby fails + * before writing them to certainly the disk sector. But, since such + * missing logs are guaranteed to exist in the primary side, + * the transaction is not lost in the whole system (i.e., the standby + * can recover all transactions from the primary). + */ + + XLogWalRcvWrite(buf, len, recptr, &fsynced); + + /* + * The logs in the XLogData message were written successfully, + * so we mark the message already consumed. 
+ */ + PQmarkConsumed(streamConn); + + /* + * If we just wrote the whole last page of a logfile segment but + * had not fsynced it yet, fsync the segment immediately. This + * avoids having to go back and re-open prior segments when an + * fsync request comes along later. + * + * Of course, if asked to fsync but not, do so. + */ + if (!fsynced && (fsync_requested || finishing_seg)) + { + XLogWalRcvFlush(recptr); + + /* + * XXX: Should we signal bgwriter to start a restartpoint + * if we've consumed too much xlog since the last one, like + * in normal processing? But this is not worth doing unless + * a restartpoint can be created independently from a + * checkpoint record. + * + * Heikki: + * No. The startup process is responsible for that when it + * replays the WAL. We're just storing the WAL to disk, the + * checkpoint won't do anything before it's been replayed as well. + */ + } + /* + * If fsync is not requested or was already done, we send a "success" + * to the primary before issuing fsync for end-of-segment. + */ + if (finishing_seg || (fsynced && fsync_requested)) + { + if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff, + (int) fsynced) == -1) + { + fprintf(stderr, "could not send a message to the primary: %s\n", + PQerrorMessage(streamConn)); + exit(1); + } + } + } + + if (len == -1) /* end-of-streaming */ + { + PGresult *res; + + res = PQgetResult(streamConn); + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + exit(0); + } + PQclear(res); + } + + /* error */ + fprintf(stderr, "could not read xlog records: %s", + PQerrorMessage(streamConn)); + exit(1); +} + +/* + * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster. + * + * Some backend has bought the farm, + * so we need to stop what we're doing and exit. + */ +static void +WalRcvQuickDieHandler(SIGNAL_ARGS) +{ +#ifdef BROKEN + PG_SETMASK(&BlockSig); +#endif + + exit(2); +} + +/* + * Write the log to disk. 
+ * + * fsynced is set to true if the log was fsyned by O_DIRECT. + */ +static void +XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced) +{ + int startoff; + int endoff; + + if (!XLByteInPrevSeg(recptr, recvId, recvSeg)) + { + /* + * XLOG segment files will be re-read in recovery operation soon, + * so we don't need to advise the OS to release any cache page. + */ + if (recvFile >= 0 && close(recvFile)) + { + fprintf(stderr, "could not close log file %u, segment %u: %m", + recvId, recvSeg); + exit(3); + } + recvFile = -1; + + /* Create/use new log file */ + XLByteToPrevSeg(recptr, recvId, recvSeg); + recvFile = OpenPhysicalXLog(recvId, recvSeg); + recvOff = 0; + } + + /* Make sure we have the current logfile open */ + if (recvFile < 0) + { + XLByteToPrevSeg(recptr, recvId, recvSeg); + recvFile = OpenPhysicalXLog(recvId, recvSeg); + recvOff = 0; + } + + /* Calculate the start/end file offset of the received logs */ + endoff = recptr.xrecoff % XLogSegSize; + startoff = ((endoff == 0) ? XLogSegSize : endoff) - len; + + /* + * Re-zero the page so that bytes beyond what we've written will look + * like zeroes and not valid XLOG records. Only end of the page which we + * wrote to need to be zeroed. Of course, we can skip zeroing the pages + * full of the XLOG records. Save the end position of the already zeroed + * area at the variable ZeroedRecPtr, and avoid zeroing the same page + * two or more times. + * + * This must precede the writing of the actual logs. Otherwise, a crash + * before re-zeroing would cause a corrupted page. 
+ */ + if (XLByteLT(ZeroedRecPtr, recptr) && endoff % XLOG_BLCKSZ != 0) + { + int zlen; + + zlen = XLOG_BLCKSZ - endoff % XLOG_BLCKSZ; + WritePhysicalXLog(ZeroedBuffer, zlen, endoff); + ZeroedRecPtr = recptr; + ZeroedRecPtr.xrecoff += zlen; + } + + /* Write out the logs */ + WritePhysicalXLog(buf, len, startoff); + writtenPtr = recptr; + + /* Let the startup process know how far we've advanced */ + printf("%X/%X\n", writtenPtr.xlogid, writtenPtr.xrecoff); + + /* Report XLOG streaming progress in PS display */ + ReportLogstreamResult(); +} + +/* Flush the log to disk */ +static void +XLogWalRcvFlush(XLogRecPtr recptr) +{ + fsync(recvFile); + + flushedPtr = recptr; +} + +/* Physical write to the given logs */ +static void +WritePhysicalXLog(char *from, Size nbytes, int startoff) +{ + /* Need to seek in the file? */ + if (recvOff != startoff) + { + if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) + { + fprintf(stderr, "could not seek in log file %u, segment %u to offset %u: %s\n", + recvId, recvSeg, startoff, strerror(errno)); + exit(3); + } + recvOff = startoff; + } + + /* OK to write the logs */ + errno = 0; + if (write(recvFile, from, nbytes) != nbytes) + { + /* if write didn't set errno, assume no disk space */ + if (errno == 0) + errno = ENOSPC; + fprintf(stderr, "could not write to log file %u, segment %u " + "at offset %u, length %lu: %s", + recvId, recvSeg, + recvOff, (unsigned long) nbytes, strerror(errno)); + exit(3); + } + + /* Update state for write */ + recvOff += nbytes; +} + +static int +OpenPhysicalXLog(uint32 log, uint32 seg) +{ + char path[MAXPGPATH]; + int fd; + + XLogFilePath(path, ThisTimeLineID, log, seg); + + /* + * Try to use existent file (checkpoint maker may have created it already) + */ + fd = open(path, O_RDWR | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); + if (fd < 0) + { + fprintf(stderr, "could not open file \"%s\" (log file %u, segment %u): %s", + path, log, seg, strerror(errno)); + exit(2); + } + return fd; +} + +/* Report XLOG 
streaming progress in PS display */ +void +ReportLogstreamResult(void) +{ +#ifdef BROKEN + char activitymsg[50]; + + snprintf(activitymsg, sizeof(activitymsg), + "streaming %X/%X", + writtenPtr.xlogid, writtenPtr.xrecoff); + set_ps_display(activitymsg, false); +#endif +} + diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 0e32f04..8ae62fe 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -191,22 +191,6 @@ typedef struct CheckpointStatsData extern CheckpointStatsData CheckpointStats; -/* - * LogstreamResult indicates the byte positions that we have already - * sent/written/fsynced. This is used for management of XLOG streaming. - */ -typedef struct -{ - XLogRecPtr Send; /* last byte + 1 sent to the standby */ - XLogRecPtr Write; /* last byte + 1 written out in the standby */ - XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ -} XLogstreamResult; - -extern XLogstreamResult LogstreamResult; - -extern char *TriggerFile; - - extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata); extern void XLogFlush(XLogRecPtr RecPtr); extern void XLogBackgroundFlush(void); diff --git a/src/include/postmaster/walreceiver.h b/src/include/postmaster/walreceiver.h deleted file mode 100644 index 8e34172..0000000 --- a/src/include/postmaster/walreceiver.h +++ /dev/null @@ -1,58 +0,0 @@ -/*------------------------------------------------------------------------- - * - * walreceiver.h - * Exports from postmaster/walreceiver.c. - * - * Portions Copyright (c) 2009-2009, PostgreSQL Global Development Group - * - * $PostgreSQL$ - * - *------------------------------------------------------------------------- - */ -#ifndef _WALRECEIVER_H -#define _WALRECEIVER_H - -#include "storage/spin.h" - -/* Shared memory area for management of walreceiver process */ -typedef struct -{ - pid_t pid; /* walreceiver's process id, or 0 */ - - /* - * in_progress indicates whether walreceiver is in progress - * (or just starting up). 
This flag is set to TRUE when - * startup process requests walreceiver to start XLOG streaming, - * and FALSE when walreceiver exits. - */ - bool in_progress; - - /* - * LogstreamResult indicates the byte positions that have been - * already streamed. This is shared by walreceiver and startup - * process, and used to advance XLOG streaming and recovery - * cooperatively. - */ - XLogstreamResult LogstreamResult; - - /* - * recovery target timeline; must be the same as the current - * timeline of the primary. - */ - TimeLineID RecoveryTargetTLI; - - slock_t mutex; /* locks shared variables shown above */ -} WalRcvData; - -extern void WalReceiverMain(void); -extern Size WalRcvShmemSize(void); -extern void WalRcvShmemInit(void); -extern bool WalRcvInProgress(void); -extern void WaitNextXLogAvailable(XLogRecPtr recptr); -extern void ShutdownWalRcv(void); -extern void WaitForTrigger(void); -extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr); -extern XLogRecPtr GetWalRcvWriteRecPtr(void); -extern void write_conninfo_file(char *conninfo); - -#endif /* _WALRECEIVER_H */ diff --git a/src/include/postmaster/walsender.h b/src/include/postmaster/walsender.h index e547cb3..bd669e1 100644 --- a/src/include/postmaster/walsender.h +++ b/src/include/postmaster/walsender.h @@ -16,6 +16,17 @@ #include "storage/spin.h" /* + * LogstreamResult indicates the byte positions that we have already + * sent/written/fsynced. This is used for management of XLOG streaming. + */ +typedef struct +{ + XLogRecPtr Send; /* last byte + 1 sent to the standby */ + XLogRecPtr Write; /* last byte + 1 written out in the standby */ + XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ +} XLogstreamResult; + +/* * Each walsender has a WalSnd struct in shared memory. * * links: list link for any list the WalSnd struct is in. A recycled WalSnd
pgsql-hackers by date: