*** a/doc/src/sgml/config.sgml --- b/doc/src/sgml/config.sgml *************** *** 2034,2040 **** SET ENABLE_SEQSCAN TO OFF; ! synchronous_replication (boolean) synchronous_replication configuration parameter --- 2034,2040 ---- ! synchronous_replication (enum) synchronous_replication configuration parameter *************** *** 2063,2068 **** SET ENABLE_SEQSCAN TO OFF; --- 2063,2081 ---- SET LOCAL synchronous_replication TO OFF within the transaction. + + In addition to on and off it is possible + to specify the exact level of durability required with corresponding + duration of wait. + The allowed values of synchronous_replication are + off do not wait for a response from standby + on commit waits for sync to disk on standby, + recv commit waits for standby to receive into memory (no sync), + sync commit waits for sync to disk on standby, + apply commit waits for standby to apply changes so that they + will be visible to queries on standby server. + + *** a/src/backend/replication/syncrep.c --- b/src/backend/replication/syncrep.c *************** *** 63,69 **** #include "utils/ps_status.h" /* User-settable parameters for sync rep */ ! bool synchronous_replication = false; /* Only set in user backends */ char *SyncRepStandbyNames; #define SyncStandbysDefined() \ --- 63,69 ---- #include "utils/ps_status.h" /* User-settable parameters for sync rep */ ! SyncRepWaitType synchronous_replication = SYNC_REP_NO_WAIT; /* Only set in user backends */ char *SyncRepStandbyNames; #define SyncStandbysDefined() \ *************** *** 71,77 **** char *SyncRepStandbyNames; static bool announce_next_takeover = true; ! static void SyncRepQueueInsert(void); static void SyncRepCancelWait(void); static int SyncRepGetStandbyPriority(void); --- 71,77 ---- static bool announce_next_takeover = true; ! 
static void SyncRepQueueInsert(int queue); static void SyncRepCancelWait(void); static int SyncRepGetStandbyPriority(void); *************** *** 99,113 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) { char *new_status = NULL; const char *old_status; /* * Fast exit if user has not requested sync replication, or * there are no sync replication standby names defined. * Note that those standbys don't need to be connected. */ ! if (!SyncRepRequested() || !SyncStandbysDefined()) return; Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); Assert(WalSndCtl != NULL); --- 99,134 ---- { char *new_status = NULL; const char *old_status; + int queue = 1; /* * Fast exit if user has not requested sync replication, or * there are no sync replication standby names defined. * Note that those standbys don't need to be connected. */ ! if (!SyncStandbysDefined()) return; + /* Choose the wait queue matching the requested durability level. */ + switch (synchronous_replication) + { + case SYNC_REP_NO_WAIT: + return; + + case SYNC_REP_WAIT_FOR_WRITE: + queue = 0; + break; + + case SYNC_REP_WAIT_FOR_APPLY: + queue = 2; + break; + + default: + case SYNC_REP_WAIT_FOR_FLUSH: + queue = 1; + break; + } + Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); Assert(WalSndCtl != NULL); *************** *** 126,132 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * so its likely to be a low cost check. */ if (!WalSndCtl->sync_standbys_defined || ! XLByteLE(XactCommitLSN, WalSndCtl->lsn)) { LWLockRelease(SyncRepLock); return; --- 143,149 ---- * so its likely to be a low cost check. */ if (!WalSndCtl->sync_standbys_defined || ! XLByteLE(XactCommitLSN, WalSndCtl->lsn[queue])) { LWLockRelease(SyncRepLock); return; *************** *** 138,144 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) */ MyProc->waitLSN = XactCommitLSN; MyProc->syncRepState = SYNC_REP_WAITING; ! SyncRepQueueInsert(); Assert(SyncRepQueueIsOrderedByLSN()); LWLockRelease(SyncRepLock); --- 155,161 ---- */ MyProc->waitLSN = XactCommitLSN; MyProc->syncRepState = SYNC_REP_WAITING; ! 
SyncRepQueueInsert(queue); Assert(SyncRepQueueIsOrderedByLSN()); LWLockRelease(SyncRepLock); *************** *** 273,284 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * here out of order, so start at tail and work back to insertion point. */ static void ! SyncRepQueueInsert(void) { PGPROC *proc; ! proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue), ! &(WalSndCtl->SyncRepQueue), offsetof(PGPROC, syncRepLinks)); while (proc) --- 290,301 ---- * here out of order, so start at tail and work back to insertion point. */ static void ! SyncRepQueueInsert(int queue) { PGPROC *proc; ! proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[queue]), ! &(WalSndCtl->SyncRepQueue[queue]), offsetof(PGPROC, syncRepLinks)); while (proc) *************** *** 290,296 **** SyncRepQueueInsert(void) if (XLByteLT(proc->waitLSN, MyProc->waitLSN)) break; ! proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue), &(proc->syncRepLinks), offsetof(PGPROC, syncRepLinks)); } --- 307,313 ---- if (XLByteLT(proc->waitLSN, MyProc->waitLSN)) break; ! proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[queue]), &(proc->syncRepLinks), offsetof(PGPROC, syncRepLinks)); } *************** *** 298,304 **** SyncRepQueueInsert(void) if (proc) SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks)); else ! SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks)); } /* --- 315,321 ---- if (proc) SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks)); else ! SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[queue]), &(MyProc->syncRepLinks)); } /* *************** *** 421,442 **** SyncRepReleaseWaiters(void) return; } ! if (XLByteLT(walsndctl->lsn, MyWalSnd->flush)) ! { ! /* ! * Set the lsn first so that when we wake backends they will ! * release up to this location. ! */ ! walsndctl->lsn = MyWalSnd->flush; ! numprocs = SyncRepWakeQueue(false); ! } ! LWLockRelease(SyncRepLock); ! elog(DEBUG3, "released %d procs up to %X/%X", ! numprocs, ! 
MyWalSnd->flush.xlogid, ! MyWalSnd->flush.xrecoff); /* * If we are managing the highest priority standby, though we weren't --- 438,457 ---- return; } ! /* ! * Set the lsn first so that when we wake backends they will ! * release up to this location. ! */ ! if (XLByteLT(walsndctl->lsn[0], MyWalSnd->write)) ! walsndctl->lsn[0] = MyWalSnd->write; ! if (XLByteLT(walsndctl->lsn[1], MyWalSnd->flush)) ! walsndctl->lsn[1] = MyWalSnd->flush; ! if (XLByteLT(walsndctl->lsn[2], MyWalSnd->apply)) ! walsndctl->lsn[2] = MyWalSnd->apply; ! numprocs = SyncRepWakeQueue(false); ! LWLockRelease(SyncRepLock); /* * If we are managing the highest priority standby, though we weren't *************** *** 515,563 **** SyncRepWakeQueue(bool all) PGPROC *proc = NULL; PGPROC *thisproc = NULL; int numprocs = 0; Assert(SyncRepQueueIsOrderedByLSN()); ! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), ! &(WalSndCtl->SyncRepQueue), ! offsetof(PGPROC, syncRepLinks)); ! ! while (proc) { ! /* ! * Assume the queue is ordered by LSN ! */ ! if (!all && XLByteLT(walsndctl->lsn, proc->waitLSN)) ! return numprocs; ! ! /* ! * Move to next proc, so we can delete thisproc from the queue. ! * thisproc is valid, proc may be NULL after this. ! */ ! thisproc = proc; ! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), ! &(proc->syncRepLinks), offsetof(PGPROC, syncRepLinks)); ! /* ! * Set state to complete; see SyncRepWaitForLSN() for discussion ! * of the various states. ! */ ! thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE; ! ! /* ! * Remove thisproc from queue. ! */ ! SHMQueueDelete(&(thisproc->syncRepLinks)); ! ! /* ! * Wake only when we have set state and removed from queue. ! */ ! Assert(SHMQueueIsDetached(&(thisproc->syncRepLinks))); ! Assert(thisproc->syncRepState == SYNC_REP_WAIT_COMPLETE); ! SetLatch(&(thisproc->waitLatch)); ! ! numprocs++; } return numprocs; --- 530,582 ---- PGPROC *proc = NULL; PGPROC *thisproc = NULL; int numprocs = 0; + int i; Assert(SyncRepQueueIsOrderedByLSN()); ! 
for (i = 0; i < MAX_SYNC_REP_QUEUES; i++) { ! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[i]), ! &(WalSndCtl->SyncRepQueue[i]), offsetof(PGPROC, syncRepLinks)); ! while (proc) ! { ! /* ! * Queue is ordered by LSN: stop here, but still check later queues ! */ ! if (!all && XLByteLT(walsndctl->lsn[i], proc->waitLSN)) ! break; ! ! /* ! * Move to next proc, so we can delete thisproc from the queue. ! * thisproc is valid, proc may be NULL after this. ! */ ! thisproc = proc; ! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[i]), ! &(proc->syncRepLinks), ! offsetof(PGPROC, syncRepLinks)); ! ! /* ! * Set state to complete; see SyncRepWaitForLSN() for discussion ! * of the various states. ! */ ! thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE; ! ! /* ! * Remove thisproc from queue. ! */ ! SHMQueueDelete(&(thisproc->syncRepLinks)); ! ! /* ! * Wake only when we have set state and removed from queue. ! */ ! Assert(SHMQueueIsDetached(&(thisproc->syncRepLinks))); ! Assert(thisproc->syncRepState == SYNC_REP_WAIT_COMPLETE); ! SetLatch(&(thisproc->waitLatch)); ! ! numprocs++; ! } } return numprocs; *************** *** 606,633 **** SyncRepQueueIsOrderedByLSN(void) { PGPROC *proc = NULL; XLogRecPtr lastLSN; ! lastLSN.xlogid = 0; ! lastLSN.xrecoff = 0; ! ! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), ! &(WalSndCtl->SyncRepQueue), ! offsetof(PGPROC, syncRepLinks)); ! ! while (proc) { ! /* ! * Check the queue is ordered by LSN and that multiple ! * procs don't have matching LSNs ! */ ! if (XLByteLE(proc->waitLSN, lastLSN)) ! return false; ! lastLSN = proc->waitLSN; ! ! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), ! &(proc->syncRepLinks), offsetof(PGPROC, syncRepLinks)); } return true; --- 625,656 ---- { PGPROC *proc = NULL; XLogRecPtr lastLSN; + int i; ! for (i = 0; i < MAX_SYNC_REP_QUEUES; i++) { ! lastLSN.xlogid = 0; ! lastLSN.xrecoff = 0; ! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[i]), ! 
&(WalSndCtl->SyncRepQueue[i]), offsetof(PGPROC, syncRepLinks)); + + while (proc) + { + /* + * Check the queue is ordered by LSN and that multiple + * procs don't have matching LSNs + */ + if (XLByteLE(proc->waitLSN, lastLSN)) + return false; + + lastLSN = proc->waitLSN; + + proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[i]), + &(proc->syncRepLinks), + offsetof(PGPROC, syncRepLinks)); + } } return true; *** a/src/backend/replication/walreceiver.c --- b/src/backend/replication/walreceiver.c *************** *** 610,623 **** XLogWalRcvSendReply(void) /* * We can compare the write and flush positions to the last message we * sent without taking any lock, but the apply position requires a spin ! * lock, so we don't check that unless something else has changed or 10 ! * seconds have passed. This means that the apply log position will ! * appear, from the master's point of view, to lag slightly, but since ! * this is only for reporting purposes and only on idle systems, that's ! * probably OK. */ if (XLByteEQ(reply_message.write, LogstreamResult.Write) && XLByteEQ(reply_message.flush, LogstreamResult.Flush) && !TimestampDifferenceExceeds(reply_message.sendTime, now, wal_receiver_status_interval * 1000)) return; --- 610,621 ---- /* * We can compare the write and flush positions to the last message we * sent without taking any lock, but the apply position requires a spin ! * lock, so we just check whether the last message showed that all ! * flushed WAL data has been applied. 
*/ if (XLByteEQ(reply_message.write, LogstreamResult.Write) && XLByteEQ(reply_message.flush, LogstreamResult.Flush) + && XLByteEQ(reply_message.flush, reply_message.apply) && !TimestampDifferenceExceeds(reply_message.sendTime, now, wal_receiver_status_interval * 1000)) return; *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 1247,1253 **** WalSndShmemInit(void) /* First time through, so initialize */ MemSet(WalSndCtl, 0, WalSndShmemSize()); ! SHMQueueInit(&(WalSndCtl->SyncRepQueue)); for (i = 0; i < max_wal_senders; i++) { --- 1247,1256 ---- /* First time through, so initialize */ MemSet(WalSndCtl, 0, WalSndShmemSize()); ! for (i = 0; i < MAX_SYNC_REP_QUEUES; i++) ! { ! SHMQueueInit(&(WalSndCtl->SyncRepQueue[i])); ! } for (i = 0; i < max_wal_senders; i++) { *** a/src/backend/utils/misc/guc.c --- b/src/backend/utils/misc/guc.c *************** *** 755,768 **** static struct config_bool ConfigureNamesBool[] = true, NULL, NULL }, { - {"synchronous_replication", PGC_USERSET, WAL_REPLICATION, - gettext_noop("Requests synchronous replication."), - NULL - }, - &synchronous_replication, - false, NULL, NULL - }, - { {"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS, gettext_noop("Continues processing past damaged page headers."), gettext_noop("Detection of a damaged page header normally causes PostgreSQL to " --- 755,760 ---- *************** *** 2818,2823 **** static struct config_enum ConfigureNamesEnum[] = --- 2810,2825 ---- }, { + {"synchronous_replication", PGC_USERSET, WAL_REPLICATION, + gettext_noop("Requests synchronous replication."), + NULL + }, + &synchronous_replication, + SYNC_REP_NO_WAIT, synchronous_replication_options, + NULL, NULL + }, + + { {"default_transaction_isolation", PGC_USERSET, CLIENT_CONN_STATEMENT, gettext_noop("Sets the transaction isolation level of each new transaction."), NULL *** a/src/backend/utils/misc/postgresql.conf.sample --- b/src/backend/utils/misc/postgresql.conf.sample 
*************** *** 186,192 **** # - Replication - User Settings ! #synchronous_replication = off # does commit wait for reply from standby # - Streaming Replication - Server Settings --- 186,196 ---- # - Replication - User Settings ! #synchronous_replication = off # commit waits for reply from standby ! # valid values: on, off, recv, sync, apply ! # recv = in memory on standby ! # sync = flushed to disk on standby (= on) ! # apply = change applied and ready for query # - Streaming Replication - Server Settings *** a/src/include/replication/syncrep.h --- b/src/include/replication/syncrep.h *************** *** 20,34 **** #include "utils/guc.h" #define SyncRepRequested() \ ! (synchronous_replication && max_wal_senders > 0) /* syncRepState */ #define SYNC_REP_NOT_WAITING 0 #define SYNC_REP_WAITING 1 #define SYNC_REP_WAIT_COMPLETE 2 /* user-settable parameters for synchronous replication */ ! extern bool synchronous_replication; extern char *SyncRepStandbyNames; /* called by user backend */ --- 20,65 ---- #include "utils/guc.h" #define SyncRepRequested() \ ! (synchronous_replication >= 1 && max_wal_senders > 0) /* syncRepState */ #define SYNC_REP_NOT_WAITING 0 #define SYNC_REP_WAITING 1 #define SYNC_REP_WAIT_COMPLETE 2 + typedef enum SyncRepWaitType + { + SYNC_REP_NO_WAIT, /* async replication */ + SYNC_REP_WAIT_FOR_WRITE, /* sync rep, wait for write of data on standby */ + SYNC_REP_WAIT_FOR_FLUSH, /* sync rep, wait for fsync of data on standby */ + SYNC_REP_WAIT_FOR_APPLY, /* sync rep, wait for apply of data on standby */ + } SyncRepWaitType; + + /* + * Although only "on", "off", "recv", "sync" and "apply" are documented, we + * also accept "async" and all the likely variants of "on" and "off". 
+ */ + static const struct config_enum_entry synchronous_replication_options[] = { + {"async", SYNC_REP_NO_WAIT, false}, + {"recv", SYNC_REP_WAIT_FOR_WRITE, false}, + {"sync", SYNC_REP_WAIT_FOR_FLUSH, false}, + {"apply", SYNC_REP_WAIT_FOR_APPLY, false}, + {"on", SYNC_REP_WAIT_FOR_FLUSH, false}, + {"off", SYNC_REP_NO_WAIT, false}, + {"true", SYNC_REP_WAIT_FOR_FLUSH, true}, + {"false", SYNC_REP_NO_WAIT, true}, + {"yes", SYNC_REP_WAIT_FOR_FLUSH, true}, + {"no", SYNC_REP_NO_WAIT, true}, + {"1", SYNC_REP_WAIT_FOR_FLUSH, true}, + {"0", SYNC_REP_NO_WAIT, true}, + {NULL, 0, false} + }; + + /* sync replication has separate queues for write, fsync and apply */ + #define MAX_SYNC_REP_QUEUES 3 + /* user-settable parameters for synchronous replication */ ! extern SyncRepWaitType synchronous_replication; extern char *SyncRepStandbyNames; /* called by user backend */ *** a/src/include/replication/walsender.h --- b/src/include/replication/walsender.h *************** *** 68,82 **** extern WalSnd *MyWalSnd; typedef struct { /* ! * Synchronous replication queue. Protected by SyncRepLock. */ ! SHM_QUEUE SyncRepQueue; /* * Current location of the head of the queue. All waiters should have * a waitLSN that follows this value. Protected by SyncRepLock. */ ! XLogRecPtr lsn; /* * Are any sync standbys defined? Waiting backends can't reload the --- 68,82 ---- typedef struct { /* ! * Synchronous replication queues. Protected by SyncRepLock. */ ! SHM_QUEUE SyncRepQueue[MAX_SYNC_REP_QUEUES]; /* * Current location of the head of the queue. All waiters should have * a waitLSN that follows this value. Protected by SyncRepLock. */ ! XLogRecPtr lsn[MAX_SYNC_REP_QUEUES]; /* * Are any sync standbys defined? Waiting backends can't reload the