*** a/doc/src/sgml/config.sgml --- b/doc/src/sgml/config.sgml *************** *** 2018,2023 **** SET ENABLE_SEQSCAN TO OFF; --- 2018,2109 ---- + + Synchronous Replication + + + These settings control the behavior of the built-in + synchronous replication feature. + These parameters would be set on the primary server that is + to send replication data to one or more standby servers. + + + + + synchronous_replication (boolean) + + synchronous_replication configuration parameter + + + + Specifies whether transaction commit will wait for WAL records + to be replicated before the command returns a success + indication to the client. The default setting is off. + When on, there will be a delay while the client waits + for confirmation of successful replication. That delay will + increase depending upon the physical distance and network activity + between primary and standby. The commit wait will last until a + reply from the current synchronous standby indicates it has received + the commit record of the transaction. Synchronous standbys must + already have been defined (see ). + + + This parameter can be changed at any time; the + behavior for any one transaction is determined by the setting in + effect when it commits. It is therefore possible, and useful, to have + some transactions replicate synchronously and others asynchronously. + For example, to make a single multistatement transaction commit + asynchronously when the default is synchronous replication, issue + SET LOCAL synchronous_replication TO OFF within the + transaction. + + + + + + synchronous_standby_names (integer) + + synchronous_standby_names configuration parameter + + + + Specifies a priority ordered list of standby names that can offer + synchronous replication. At any one time there will be just one + synchronous standby that will wake sleeping users following commit. + The synchronous standby will be the first named standby that is + both currently connected and streaming in real-time to the standby + (as shown by a state of "STREAMING"). Other standby servers + with listed later will become potential synchronous standbys. + If the current synchronous standby disconnects for whatever reason + it will be replaced immediately with the next highest priority standby. + Specifying more than one standby name can allow very high availability. + + + The standby name is currently taken as the application_name of the + standby, as set in the primary_conninfo on the standby. Names are + not enforced for uniqueness. In case of duplicates one of the standbys + will be chosen to be the synchronous standby, though exactly which + one is indeterminate. + + + The default is the special entry * which matches any + application_name, including the default application name of + walsender. This is not recommended and a more carefully + thought through configuration will be desirable. + + + If a standby is removed from the list of servers then it will stop + being the synchronous standby, allowing another to take it's place. + If the list is empty, synchronous replication will not be + possible, whatever the setting of synchronous_replication. + Standbys may also be added to the list without restarting the server. + + + + + + + Standby Servers *** a/doc/src/sgml/high-availability.sgml --- b/doc/src/sgml/high-availability.sgml *************** *** 875,880 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass' --- 875,1083 ---- + + Synchronous Replication + + + Synchronous Replication + + + + PostgreSQL streaming replication is asynchronous by + default. If the primary server + crashes then some transactions that were committed may not have been + replicated to the standby server, causing data loss. The amount + of data loss is proportional to the replication delay at the time of + failover. + + + + Synchronous replication offers the ability to confirm that all changes + made by a transaction have been transferred to one synchronous standby + server. This extends the standard level of durability + offered by a transaction commit. This level of protection is referred + to as 2-safe replication in computer science theory. + + + + When requesting synchronous replication, each commit of a + write transaction will wait until confirmation is + received that the commit has been written to the transaction log on disk + of both the primary and standby server. The only possibility that data + can be lost is if both the primary and the standby suffer crashes at the + same time. This can provide a much higher level of durability, though only + if the sysadmin is cautious about the placement and management of the two + servers. Waiting for confirmation increases the user's confidence that the + changes will not be lost in the event of server crashes but it also + necessarily increases the response time for the requesting transaction. + The minimum wait time is the roundtrip time between primary to standby. + + + + Read only transactions and transaction rollbacks need not wait for + replies from standby servers. Subtransaction commits do not wait for + responses from standby servers, only top-level commits. Long + running actions such as data loading or index building do not wait + until the very final commit message. All two-phase commit actions + require commit waits, including both prepare and commit. + + + + Basic Configuration + + + All parameters have useful default values, so we can enable + synchronous replication easily just by setting this on the primary + + + synchronous_replication = on + + + When synchronous_replication is set, a commit will wait + for confirmation that the standby has received the commit record, + even if that takes a very long time. + synchronous_replication can be set by individual + users, so can be configured in the configuration file, for particular + users or databases, or dynamically by applications programs. + + + + After a commit record has been written to disk on the primary the + WAL record is then sent to the standby. The standby sends reply + messages each time a new batch of WAL data is received, unless + wal_receiver_status_interval is set to zero on the standby. + If the standby is the first matching standby, as specified in + synchronous_standby_names on the primary, the reply + messages from that standby will be used to wake users waiting for + confirmation the commit record has been received. These parameters + allow the administrator to specify which standby servers should be + synchronous standbys. Note that the configuration of synchronous + replication is mainly on the master. + + + + Users will stop waiting if a fast shutdown is requested, though the + server does not fully shutdown until all outstanding WAL records are + transferred to standby servers. + + + + Note also that synchronous_commit is used when the user + specifies synchronous_replication, overriding even an + explicit setting of synchronous_commit to off. + This is because we must write WAL to disk on primary before we replicate + to ensure the standby never gets ahead of the primary. + + + + + + Planning for Performance + + + Synchronous replication usually requires carefully planned and placed + standby servers to ensure applications perform acceptably. Waiting + doesn't utilise system resources, but transaction locks continue to be + held until the transfer is confirmed. As a result, incautious use of + synchronous replication will reduce performance for database + applications because of increased response times and higher contention. + + + + PostgreSQL allows the application developer + to specify the durability level required via replication. This can be + specified for the system overall, though it can also be specified for + specific users or connections, or even individual transactions. + + + + For example, an application workload might consist of: + 10% of changes are important customer details, while + 90% of changes are less important data that the business can more + easily survive if it is lost, such as chat messages between users. + + + + With synchronous replication options specified at the application level + (on the primary) we can offer sync rep for the most important changes, + without slowing down the bulk of the total workload. Application level + options are an important and practical tool for allowing the benefits of + synchronous replication for high performance applications. + + + + You should consider that the network bandwidth must be higher than + the rate of generation of WAL data. + 10% of changes are important customer details, while + 90% of changes are less important data that the business can more + easily survive if it is lost, such as chat messages between users. + + + + + + Planning for High Availability + + + Commits made when synchronous_replication is set will wait until at + the sync standby responds. The response may never occur if the last, + or only, standby should crash. + + + + The best solution for avoiding data loss is to ensure you don't lose + your last remaining sync standby. This can be achieved by naming multiple + potential synchronous standbys using synchronous_standby_names. + The first named standby will be used as the synchronous standby. Standbys + listed after this will takeover the role of synchronous standby if the + first one should fail. + + + + When a standby first attaches to the primary, it will not yet be properly + synchronized. This is described as CATCHUP mode. Once + the lag between standby and primary reaches zero for the first time + we move to real-time STREAMING state. + The catch-up duration may be long immediately after the standby has + been created. If the standby is shutdown, then the catch-up period + will increase according to the length of time the standby has been down. + The standby is only able to become a synchronous standby + once it has reached STREAMING state. + + + + If primary restarts while commits are waiting for acknowledgement, those + waiting transactions will be marked fully committed once the primary + database recovers. + There is no way to be certain that all standbys have received all + outstanding WAL data at time of the crash of the primary. Some + transactions may not show as committed on the standby, even though + they show as committed on the primary. The guarantee we offer is that + the application will not receive explicit acknowledgement of the + successful commit of a transaction until the WAL data is known to be + safely received by the standby. + + + + If you really do lose your last standby server then you should disable + synchronous_standby_names and restart the primary server. + + + + If the primary is isolated from remaining standby severs you should + failover to the best candidate of those other remaining standby servers. + + + + If you need to re-create a standby server while transactions are + waiting, make sure that the commands to run pg_start_backup() and + pg_stop_backup() are run in a session with + synchronous_replication = off, otherwise those requests will wait + forever for the standby to appear. + + + + *** a/doc/src/sgml/monitoring.sgml --- b/doc/src/sgml/monitoring.sgml *************** *** 306,313 **** postgres: user database host is set or if the user's hostname needed to be looked up during pg_hba.conf --- 306,316 ---- location. In addition, the standby reports the last transaction log position it received and wrote, the last position it flushed to disk, and the last position it replayed, and this information is also ! displayed here. If the standby's application names matches one of the ! settings in synchronous_standby_names then the sync_priority ! is shown here also, that is the order in which standbys will become ! the synchronous standby. The columns detailing what exactly the connection ! is doing are only visible if the user examining the view is a superuser. The client's hostname will be available only if is set or if the user's hostname needed to be looked up during pg_hba.conf *** a/src/backend/access/transam/twophase.c --- b/src/backend/access/transam/twophase.c *************** *** 56,61 **** --- 56,62 ---- #include "pg_trace.h" #include "pgstat.h" #include "replication/walsender.h" + #include "replication/syncrep.h" #include "storage/fd.h" #include "storage/predicate.h" #include "storage/procarray.h" *************** *** 1071,1076 **** EndPrepare(GlobalTransaction gxact) --- 1072,1085 ---- END_CRIT_SECTION(); + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked the prepare, but still show as + * running in the procarray (twice!) and continue to hold locks. + */ + SyncRepWaitForLSN(gxact->prepare_lsn); + records.tail = records.head = NULL; } *************** *** 2030,2035 **** RecordTransactionCommitPrepared(TransactionId xid, --- 2039,2052 ---- MyProc->inCommit = false; END_CRIT_SECTION(); + + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked clog, but still show as + * running in the procarray and continue to hold locks. + */ + SyncRepWaitForLSN(recptr); } /* *************** *** 2109,2112 **** RecordTransactionAbortPrepared(TransactionId xid, --- 2126,2137 ---- TransactionIdAbortTree(xid, nchildren, children); END_CRIT_SECTION(); + + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked clog, but still show as + * running in the procarray and continue to hold locks. + */ + SyncRepWaitForLSN(recptr); } *** a/src/backend/access/transam/xact.c --- b/src/backend/access/transam/xact.c *************** *** 37,42 **** --- 37,43 ---- #include "miscadmin.h" #include "pgstat.h" #include "replication/walsender.h" + #include "replication/syncrep.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/lmgr.h" *************** *** 1055,1061 **** RecordTransactionCommit(void) * if all to-be-deleted tables are temporary though, since they are lost * anyway if we crash.) */ ! if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0) { /* * Synchronous commit case: --- 1056,1062 ---- * if all to-be-deleted tables are temporary though, since they are lost * anyway if we crash.) */ ! if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0 || SyncRepRequested()) { /* * Synchronous commit case: *************** *** 1125,1130 **** RecordTransactionCommit(void) --- 1126,1139 ---- /* Compute latestXid while we have the child XIDs handy */ latestXid = TransactionIdLatest(xid, nchildren, children); + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked clog, but still show as + * running in the procarray and continue to hold locks. + */ + SyncRepWaitForLSN(XactLastRecEnd); + /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd.xrecoff = 0; *** a/src/backend/catalog/system_views.sql --- b/src/backend/catalog/system_views.sql *************** *** 520,526 **** CREATE VIEW pg_stat_replication AS W.sent_location, W.write_location, W.flush_location, ! W.replay_location FROM pg_stat_get_activity(NULL) AS S, pg_authid U, pg_stat_get_wal_senders() AS W WHERE S.usesysid = U.oid AND --- 520,528 ---- W.sent_location, W.write_location, W.flush_location, ! W.replay_location, ! W.sync_priority, ! W.sync_state FROM pg_stat_get_activity(NULL) AS S, pg_authid U, pg_stat_get_wal_senders() AS W WHERE S.usesysid = U.oid AND *** a/src/backend/postmaster/autovacuum.c --- b/src/backend/postmaster/autovacuum.c *************** *** 1527,1532 **** AutoVacWorkerMain(int argc, char *argv[]) --- 1527,1539 ---- SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE); /* + * Force synchronous replication off to allow regular maintenance even + * if we are waiting for standbys to connect. This is important to + * ensure we aren't blocked from performing anti-wraparound tasks. + */ + SetConfigOption("synchronous_replication", "off", PGC_SUSET, PGC_S_OVERRIDE); + + /* * Get the info about the database we're going to work on. */ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); *** a/src/backend/replication/Makefile --- b/src/backend/replication/Makefile *************** *** 13,19 **** top_builddir = ../../.. include $(top_builddir)/src/Makefile.global OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \ ! repl_gram.o include $(top_srcdir)/src/backend/common.mk --- 13,19 ---- include $(top_builddir)/src/Makefile.global OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \ ! repl_gram.o syncrep.o include $(top_srcdir)/src/backend/common.mk *** /dev/null --- b/src/backend/replication/syncrep.c *************** *** 0 **** --- 1,460 ---- + /*------------------------------------------------------------------------- + * + * syncrep.c + * + * Synchronous replication is new as of PostgreSQL 9.1. + * + * If requested, transaction commits wait until their commit LSN is + * acknowledged by the sync standby. + * + * This module contains the code for waiting and release of backends. + * All code in this module executes on the primary. The core streaming + * replication transport remains within WALreceiver/WALsender modules. + * + * The essence of this design is that it isolates all logic about + * waiting/releasing onto the primary. The primary defines which standbys + * it wishes to wait for. The standby is completely unaware of the + * durability requirements of transactions on the primary, reducing the + * complexity of the code and streamlining both standby operations and + * network bandwidth because there is no requirement to ship + * per-transaction state information. + * + * The bookeeping approach we take is that a commit is either synchronous + * or not synchronous (async). If it is async, we just fastpath out of + * here. If it is sync, then in 9.1 we wait for the flush location on the + * standby before releasing the waiting backend. Further complexity + * in that interaction is expected in later releases. + * + * The best performing way to manage the waiting backends is to have a + * single ordered queue of waiting backends, so that we can avoid + * searching the through all waiters each time we receive a reply. + * + * In 9.1 we support only a single synchronous standby, chosen from a + * priority list of synchronous_standby_names. Before it can become the + * synchronous standby it must have caught up with the primary; that may + * take some time. Once caught up, the current highest priority standby + * will release waiters from the queue. + * + * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group + * + * IDENTIFICATION + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ + #include "postgres.h" + + #include + + #include "access/xact.h" + #include "access/xlog_internal.h" + #include "miscadmin.h" + #include "postmaster/autovacuum.h" + #include "replication/syncrep.h" + #include "replication/walsender.h" + #include "storage/latch.h" + #include "storage/ipc.h" + #include "storage/pmsignal.h" + #include "storage/proc.h" + #include "utils/builtins.h" + #include "utils/guc.h" + #include "utils/guc_tables.h" + #include "utils/memutils.h" + #include "utils/ps_status.h" + + /* User-settable parameters for sync rep */ + bool sync_rep_mode = false; /* Only set in user backends */ + char *SyncRepStandbyNames; + + static bool announce_next_takeover = true; + + static void SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN); + static void SyncRepQueueInsert(void); + + static int SyncRepGetStandbyPriority(void); + static int SyncRepWakeQueue(void); + + /* + * =========================================================== + * Synchronous Replication functions for normal user backends + * =========================================================== + */ + + /* + * Wait for synchronous replication, if requested by user. + */ + void + SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) + { + /* + * Fast exit if user has not requested sync replication, or + * streaming replication is inactive in this server. + */ + if (!SyncRepRequested() || max_wal_senders == 0) + return; + + /* + * Wait on queue. We check for a fast exit once we have the lock. + */ + SyncRepWaitOnQueue(XactCommitLSN); + } + + void + SyncRepCleanupAtProcExit(int code, Datum arg) + { + if (!SHMQueueIsDetached(&(MyProc->syncrep_links))) + { + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + SHMQueueDelete(&(MyProc->syncrep_links)); + LWLockRelease(SyncRepLock); + } + + if (MyProc != NULL) + DisownLatch(&MyProc->waitLatch); + } + + /* + * Wait for specified LSN to be confirmed at the requested level + * of durability. Each proc has its own wait latch, so we perform + * a normal latch check/wait loop here. + */ + static void + SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN) + { + volatile WalSndCtlData *walsndctl = WalSndCtl; + char *new_status = NULL; + const char *old_status; + int len; + + Assert(SHMQueueIsDetached(&(MyProc->syncrep_links))); + + for (;;) + { + ResetLatch(&MyProc->waitLatch); + + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + + /* + * First time through, add ourselves to the queue. + */ + if (SHMQueueIsDetached(&(MyProc->syncrep_links))) + { + /* + * Wait no longer if we have already reached our LSN + */ + if (XLByteLE(XactCommitLSN, walsndctl->lsn)) + { + /* No need to wait */ + LWLockRelease(SyncRepLock); + return; + } + + /* + * Set our waitLSN so WALSender will know when to wake us. + * We set this before we add ourselves to queue, so that + * any proc on the queue can be examined freely without + * taking a lock on each process in the queue. + */ + MyProc->waitLSN = XactCommitLSN; + SyncRepQueueInsert(); + LWLockRelease(SyncRepLock); + + /* + * Alter ps display to show waiting for sync rep. + */ + if (update_process_title) + { + old_status = get_ps_display(&len); + new_status = (char *) palloc(len + 32 + 1); + memcpy(new_status, old_status, len); + sprintf(new_status + len, " waiting for %X/%X", + XactCommitLSN.xlogid, XactCommitLSN.xrecoff); + set_ps_display(new_status, false); + new_status[len] = '\0'; /* truncate off " waiting ..." */ + } + } + else + { + /* + * Check the LSN on our queue and if it's moved far enough then + * remove us from the queue. First time through this is + * unlikely to be far enough, yet is possible. Next time we are + * woken we should be more lucky. + */ + if (XLByteLE(XactCommitLSN, walsndctl->lsn)) + { + SHMQueueDelete(&(MyProc->syncrep_links)); + LWLockRelease(SyncRepLock); + + /* + * Reset our waitLSN. + */ + MyProc->waitLSN.xlogid = 0; + MyProc->waitLSN.xrecoff = 0; + + if (new_status) + { + /* Reset ps display */ + set_ps_display(new_status, false); + pfree(new_status); + } + + ereport(DEBUG3, + (errmsg("synchronous replication wait for %X/%X complete at %s", + XactCommitLSN.xlogid, + XactCommitLSN.xrecoff, + timestamptz_to_str(GetCurrentTimestamp())))); + return; + } + + LWLockRelease(SyncRepLock); + } + + WaitLatch(&MyProc->waitLatch, -1); + } + } + + /* + * Insert MyProc into SyncRepQueue, maintaining sorted invariant. + * + * Usually we will go at tail of queue, though its possible that we arrive + * here out of order, so start at tail and work back to insertion point. + */ + static void + SyncRepQueueInsert(void) + { + PGPROC *proc; + + proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue), + &(WalSndCtl->SyncRepQueue), + offsetof(PGPROC, syncrep_links)); + + while (proc) + { + /* + * Stop at the queue element that we should after to + * ensure the queue is ordered by LSN. + */ + if (XLByteLT(proc->waitLSN, MyProc->waitLSN)) + break; + + proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue), + &(proc->syncrep_links), + offsetof(PGPROC, syncrep_links)); + } + + if (proc) + SHMQueueInsertAfter(&(proc->syncrep_links), &(MyProc->syncrep_links)); + else + SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncrep_links)); + } + + /* + * =========================================================== + * Synchronous Replication functions for wal sender processes + * =========================================================== + */ + + /* + * Take any action required to initialise sync rep state from config + * data. Called at WALSender startup and after each SIGHUP. + */ + void + SyncRepInitConfig(void) + { + int priority; + + /* + * Determine if we are a potential sync standby and remember the result + * for handling replies from standby. + */ + priority = SyncRepGetStandbyPriority(); + if (MyWalSnd->sync_standby_priority != priority) + { + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + MyWalSnd->sync_standby_priority = priority; + LWLockRelease(SyncRepLock); + ereport(DEBUG1, + (errmsg("standby \"%s\" now has synchronous standby priority %u", + application_name, priority))); + } + } + + /* + * Update the LSNs on each queue based upon our latest state. This + * implements a simple policy of first-valid-standby-releases-waiter. + * + * Other policies are possible, which would change what we do here and what + * perhaps also which information we store as well. + */ + void + SyncRepReleaseWaiters(void) + { + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile WalSnd *syncWalSnd = NULL; + int numprocs = 0; + int priority = 0; + int i; + + /* + * If this WALSender is serving a standby that is not on the list of + * potential standbys then we have nothing to do. If we are still + * starting up or still running base backup, then leave quicly also. + */ + if (MyWalSnd->sync_standby_priority == 0 || + MyWalSnd->state < WALSNDSTATE_STREAMING) + return; + + /* + * We're a potential sync standby. Release waiters if we are the + * highest priority standby. + */ + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + + for (i = 0; i < max_wal_senders; i++) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &walsndctl->walsnds[i]; + + if (walsnd->pid != 0 && + walsnd->sync_standby_priority > 0 && + (priority == 0 || + priority > walsnd->sync_standby_priority)) + { + priority = walsnd->sync_standby_priority; + syncWalSnd = walsnd; + } + } + + /* + * We should have found ourselves at least. + */ + Assert(syncWalSnd); + + /* + * If we aren't managing the highest priority standby then just leave. + */ + if (syncWalSnd != MyWalSnd) + { + LWLockRelease(SyncRepLock); + announce_next_takeover = true; + return; + } + + if (XLByteLT(walsndctl->lsn, MyWalSnd->flush)) + { + /* + * Set the lsn first so that when we wake backends they will + * release up to this location. + */ + walsndctl->lsn = MyWalSnd->flush; + numprocs = SyncRepWakeQueue(); + } + + LWLockRelease(SyncRepLock); + + elog(DEBUG3, "released %d procs up to %X/%X", + numprocs, + MyWalSnd->flush.xlogid, + MyWalSnd->flush.xrecoff); + + /* + * If we are managing the highest priority standby, though we weren't + * prior to this, then announce we are now the sync standby. + */ + if (announce_next_takeover) + { + announce_next_takeover = false; + ereport(LOG, + (errmsg("standby \"%s\" is now the synchronous standby with priority %u", + application_name, MyWalSnd->sync_standby_priority))); + } + } + + /* + * Check if we are in the list of sync standbys, and if so, determine + * priority sequence. Return priority if set, or zero to indicate that + * we are not a potential sync standby. + * + * Compare the parameter SyncRepStandbyNames against the application_name + * for this WALSender, or allow any name if we find a wildcard "*". + */ + static int + SyncRepGetStandbyPriority(void) + { + char *rawstring; + List *elemlist; + ListCell *l; + int priority = 0; + bool found = false; + + /* Need a modifiable copy of string */ + rawstring = pstrdup(SyncRepStandbyNames); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawstring, ',', &elemlist)) + { + /* syntax error in list */ + pfree(rawstring); + list_free(elemlist); + ereport(FATAL, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid list syntax for parameter \"synchronous_standby_names\""))); + return 0; + } + + foreach(l, elemlist) + { + char *standby_name = (char *) lfirst(l); + + priority++; + + if (pg_strcasecmp(standby_name, application_name) == 0 || + pg_strcasecmp(standby_name, "*") == 0) + { + found = true; + break; + } + } + + pfree(rawstring); + list_free(elemlist); + + return (found ? priority : 0); + } + + /* + * Walk queue from head setting the latches of any procs that need + * to be woken. We don't modify the queue, we leave that for individual + * procs to release themselves. + * + * Must hold SyncRepLock + */ + static int + SyncRepWakeQueue(void) + { + volatile WalSndCtlData *walsndctl = WalSndCtl; + PGPROC *proc; + int numprocs = 0; + + proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), + &(WalSndCtl->SyncRepQueue), + offsetof(PGPROC, syncrep_links)); + + while (proc) + { + /* + * Assume the queue is ordered by LSN + */ + if (XLByteLT(walsndctl->lsn, proc->waitLSN)) + return numprocs; + + numprocs++; + SetLatch(&(proc->waitLatch)); + proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue), + &(proc->syncrep_links), + offsetof(PGPROC, syncrep_links)); + } + + return numprocs; + } *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 66,72 **** WalSndCtlData *WalSndCtl = NULL; /* My slot in the shared memory array */ ! static WalSnd *MyWalSnd = NULL; /* Global state */ bool am_walsender = false; /* Am I a walsender process ? */ --- 66,72 ---- WalSndCtlData *WalSndCtl = NULL; /* My slot in the shared memory array */ ! WalSnd *MyWalSnd = NULL; /* Global state */ bool am_walsender = false; /* Am I a walsender process ? */ *************** *** 174,179 **** WalSenderMain(void) --- 174,181 ---- SpinLockRelease(&walsnd->mutex); } + SyncRepInitConfig(); + /* Main loop of walsender */ return WalSndLoop(); } *************** *** 584,589 **** ProcessStandbyReplyMessage(void) --- 586,593 ---- walsnd->apply = reply.apply; SpinLockRelease(&walsnd->mutex); } + + SyncRepReleaseWaiters(); } /* *************** *** 700,705 **** WalSndLoop(void) --- 704,710 ---- { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); } /* *************** *** 771,777 **** WalSndLoop(void) --- 776,787 ---- * that point might wait for some time. */ if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup) + { + ereport(DEBUG1, + (errmsg("standby \"%s\" has now caught up with primary", + application_name))); WalSndSetState(WALSNDSTATE_STREAMING); + } ProcessRepliesIfAny(); } *************** *** 1238,1243 **** WalSndShmemInit(void) --- 1248,1255 ---- /* First time through, so initialize */ MemSet(WalSndCtl, 0, WalSndShmemSize()); + SHMQueueInit(&(WalSndCtl->SyncRepQueue)); + for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; *************** *** 1304,1315 **** WalSndGetStateString(WalSndState state) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { ! #define PG_STAT_GET_WAL_SENDERS_COLS 6 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; int i; /* check to see if caller supports us returning a tuplestore */ --- 1316,1330 ---- Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { ! #define PG_STAT_GET_WAL_SENDERS_COLS 8 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; + int sync_priority[max_wal_senders]; + int priority = 0; + int sync_standby = -1; int i; /* check to see if caller supports us returning a tuplestore */ *************** *** 1337,1342 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS) --- 1352,1384 ---- MemoryContextSwitchTo(oldcontext); + /* + * Get the priorities of sync standbys all in one go, to minimise + * lock acquisitions and to allow us to evaluate who is the current + * sync standby. + */ + LWLockAcquire(SyncRepLock, LW_SHARED); + for (i = 0; i < max_wal_senders; i++) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + if (walsnd->pid != 0) + { + sync_priority[i] = walsnd->sync_standby_priority; + + if (walsnd->state == WALSNDSTATE_STREAMING && + walsnd->sync_standby_priority > 0 && + (priority == 0 || + priority > walsnd->sync_standby_priority)) + { + priority = walsnd->sync_standby_priority; + sync_standby = i; + } + } + } + LWLockRelease(SyncRepLock); + for (i = 0; i < max_wal_senders; i++) { /* use volatile pointer to prevent code rearrangement */ *************** *** 1370,1380 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS) * Only superusers can see details. Other users only get * the pid value to know it's a walsender, but no details. */ ! nulls[1] = true; ! nulls[2] = true; ! nulls[3] = true; ! nulls[4] = true; ! nulls[5] = true; } else { --- 1412,1418 ---- * Only superusers can see details. Other users only get * the pid value to know it's a walsender, but no details. */ ! MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1); } else { *************** *** 1401,1406 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS) --- 1439,1457 ---- snprintf(location, sizeof(location), "%X/%X", apply.xlogid, apply.xrecoff); values[5] = CStringGetTextDatum(location); + + values[6] = Int32GetDatum(sync_priority[i]); + + /* + * More easily understood version of standby state. + * This is purely informational, not different from priority. + */ + if (sync_priority[i] == 0) + values[7] = CStringGetTextDatum("ASYNC"); + else if (i == sync_standby) + values[7] = CStringGetTextDatum("SYNC"); + else + values[7] = CStringGetTextDatum("POTENTIAL"); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); *** a/src/backend/storage/ipc/shmqueue.c --- b/src/backend/storage/ipc/shmqueue.c *************** *** 104,110 **** SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem) * element. Inserting "after" the queue head puts the elem * at the head of the queue. */ - #ifdef NOT_USED void SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem) { --- 104,109 ---- *************** *** 118,124 **** SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem) queue->next = elem; nextPtr->prev = elem; } - #endif /* NOT_USED */ /*-------------------- * SHMQueueNext -- Get the next element from a queue --- 117,122 ---- *************** *** 156,161 **** SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset) --- 154,178 ---- return (Pointer) (((char *) elemPtr) - linkOffset); } + /*-------------------- + * SHMQueuePrev -- Get the previous element from a queue + * + * Same as SHMQueueNext, just starting at tail and moving towards head + * All other comments and usage applies. + */ + Pointer + SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset) + { + SHM_QUEUE *elemPtr = curElem->prev; + + Assert(ShmemAddrIsValid(curElem)); + + if (elemPtr == queue) /* back to the queue head? */ + return NULL; + + return (Pointer) (((char *) elemPtr) - linkOffset); + } + /* * SHMQueueEmpty -- TRUE if queue head is only element, FALSE otherwise */ *** a/src/backend/storage/lmgr/proc.c --- b/src/backend/storage/lmgr/proc.c *************** *** 39,44 **** --- 39,45 ---- #include "access/xact.h" #include "miscadmin.h" #include "postmaster/autovacuum.h" + #include "replication/syncrep.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/pmsignal.h" *************** *** 196,201 **** InitProcGlobal(void) --- 197,203 ---- PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; ProcGlobal->freeProcs = &procs[i]; + InitSharedLatch(&procs[i].waitLatch); } /* *************** *** 214,219 **** InitProcGlobal(void) --- 216,222 ---- PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; ProcGlobal->autovacFreeProcs = &procs[i]; + InitSharedLatch(&procs[i].waitLatch); } /* *************** *** 224,229 **** InitProcGlobal(void) --- 227,233 ---- { AuxiliaryProcs[i].pid = 0; /* marks auxiliary proc as not in use */ PGSemaphoreCreate(&(AuxiliaryProcs[i].sem)); + InitSharedLatch(&procs[i].waitLatch); } /* Create ProcStructLock spinlock, too */ *************** *** 326,331 **** InitProcess(void) --- 330,341 ---- SHMQueueInit(&(MyProc->myProcLocks[i])); MyProc->recoveryConflictPending = false; + /* Initialise the waitLSN for sync rep */ + MyProc->waitLSN.xlogid = 0; + MyProc->waitLSN.xrecoff = 0; + + OwnLatch((Latch *) &MyProc->waitLatch); + /* * We might be reusing a semaphore that belonged to a failed process. So * be careful and reinitialize its value here. (This is not strictly *************** *** 365,370 **** InitProcessPhase2(void) --- 375,381 ---- /* * Arrange to clean that up at backend exit. */ + on_shmem_exit(SyncRepCleanupAtProcExit, 0); on_shmem_exit(RemoveProcFromArray, 0); } *** a/src/backend/tcop/postgres.c --- b/src/backend/tcop/postgres.c *************** *** 2628,2633 **** die(SIGNAL_ARGS) --- 2628,2638 ---- ProcDiePending = true; /* + * Set this proc's wait latch to stop waiting + */ + SetLatch(&(MyProc->waitLatch)); + + /* * If it's safe to interrupt, and we're waiting for input or a lock, * service the interrupt immediately */ *** a/src/backend/utils/misc/guc.c --- b/src/backend/utils/misc/guc.c *************** *** 55,60 **** --- 55,61 ---- #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" + #include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/bufmgr.h" *************** *** 754,759 **** static struct config_bool ConfigureNamesBool[] = --- 755,768 ---- true, NULL, NULL }, { + {"synchronous_replication", PGC_USERSET, WAL_REPLICATION, + gettext_noop("Requests synchronous replication."), + NULL + }, + &sync_rep_mode, + false, NULL, NULL + }, + { {"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS, gettext_noop("Continues processing past damaged page headers."), gettext_noop("Detection of a damaged page header normally causes PostgreSQL to " *************** *** 2717,2722 **** static struct config_string ConfigureNamesString[] = --- 2726,2741 ---- }, { + {"synchronous_standby_names", PGC_SIGHUP, WAL_REPLICATION, + gettext_noop("List of potential standby names to synchronise with."), + NULL, + GUC_LIST_INPUT + }, + &SyncRepStandbyNames, + "", NULL, NULL + }, + + { {"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE, gettext_noop("Sets default text search configuration."), NULL *** a/src/backend/utils/misc/postgresql.conf.sample --- b/src/backend/utils/misc/postgresql.conf.sample *************** *** 184,190 **** #archive_timeout = 0 # force a logfile segment switch after this # number of seconds; 0 disables ! # - Streaming Replication - #max_wal_senders = 0 # max number of walsender processes # (change requires restart) --- 184,199 ---- #archive_timeout = 0 # force a logfile segment switch after this # number of seconds; 0 disables ! # - Replication - User Settings ! ! #synchronous_replication = off # does commit wait for reply from standby ! ! # - Streaming Replication - Server Settings ! ! #synchronous_standby_names = '' # standby servers that provide sync rep ! # comma-separated list of application_name from standby(s); ! # '*' = all ! #max_wal_senders = 0 # max number of walsender processes # (change requires restart) *** a/src/include/catalog/pg_proc.h --- b/src/include/catalog/pg_proc.h *************** *** 2542,2548 **** DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 f f DESCR("statistics: currently active backend IDs"); DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,25,23}" "{i,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_hostname,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ )); DESCR("statistics: information about currently active backends"); ! DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25}" "{o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,replay_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ )); DESCR("statistics: current backend PID"); --- 2542,2548 ---- DESCR("statistics: currently active backend IDs"); DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,25,23}" "{i,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_hostname,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ )); DESCR("statistics: information about currently active backends"); ! DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25,23,25}" "{o,o,o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ )); DESCR("statistics: current backend PID"); *** /dev/null --- b/src/include/replication/syncrep.h *************** *** 0 **** --- 1,37 ---- + /*------------------------------------------------------------------------- + * + * syncrep.h + * Exports from replication/syncrep.c. + * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ + #ifndef _SYNCREP_H + #define _SYNCREP_H + + #include "access/xlog.h" + #include "storage/proc.h" + #include "storage/shmem.h" + #include "storage/spin.h" + + #define SyncRepRequested() (sync_rep_mode) + + /* user-settable parameters for synchronous replication */ + extern bool sync_rep_mode; + extern int sync_rep_timeout; + extern char *SyncRepStandbyNames; + + /* called by user backend */ + extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); + + /* callback at backend exit */ + extern void SyncRepCleanupAtProcExit(int code, Datum arg); + + /* called by wal sender */ + extern void SyncRepInitConfig(void); + extern void SyncRepReleaseWaiters(void); + + #endif /* _SYNCREP_H */ *** a/src/include/replication/walsender.h --- b/src/include/replication/walsender.h *************** *** 15,20 **** --- 15,21 ---- #include "access/xlog.h" #include "nodes/nodes.h" #include "storage/latch.h" + #include "replication/syncrep.h" #include "storage/spin.h" *************** *** 52,62 **** typedef struct WalSnd --- 53,84 ---- * to do. */ Latch latch; + + /* + * The priority order of the standby managed by this WALSender, as + * listed in synchronous_standby_names, or 0 if not-listed. + * Protected by SyncRepLock. + */ + int sync_standby_priority; } WalSnd; + extern WalSnd *MyWalSnd; + /* There is one WalSndCtl struct for the whole database cluster */ typedef struct { + /* + * Synchronous replication queue. Protected by SyncRepLock. + */ + SHM_QUEUE SyncRepQueue; + + /* + * Current location of the head of the queue. All waiters should have + * a waitLSN that follows this value, or they are currently being woken + * to remove themselves from the queue. Protected by SyncRepLock. + */ + XLogRecPtr lsn; + WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */ } WalSndCtlData; *** a/src/include/storage/lwlock.h --- b/src/include/storage/lwlock.h *************** *** 78,83 **** typedef enum LWLockId --- 78,84 ---- SerializableFinishedListLock, SerializablePredicateLockListLock, OldSerXidLock, + SyncRepLock, /* Individual lock IDs end here */ FirstBufMappingLock, FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS, *** a/src/include/storage/proc.h --- b/src/include/storage/proc.h *************** *** 14,19 **** --- 14,21 ---- #ifndef _PROC_H_ #define _PROC_H_ + #include "access/xlog.h" + #include "storage/latch.h" #include "storage/lock.h" #include "storage/pg_sema.h" #include "utils/timestamp.h" *************** *** 115,120 **** struct PGPROC --- 117,128 ---- LOCKMASK heldLocks; /* bitmask for lock types already held on this * lock object by this backend */ + /* Info to allow us to wait for synchronous replication, if needed. */ + Latch waitLatch; + XLogRecPtr waitLSN; /* waiting for this LSN or higher */ + + SHM_QUEUE syncrep_links; /* list link if process is in syncrep list */ + /* * All PROCLOCK objects for locks held or awaited by this backend are * linked into one of these lists, according to the partition number of *** a/src/include/storage/shmem.h --- b/src/include/storage/shmem.h *************** *** 67,74 **** extern void SHMQueueInit(SHM_QUEUE *queue); --- 67,77 ---- extern void SHMQueueElemInit(SHM_QUEUE *queue); extern void SHMQueueDelete(SHM_QUEUE *queue); extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem); + extern void SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem); extern Pointer SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset); + extern Pointer SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, + Size linkOffset); extern bool SHMQueueEmpty(const SHM_QUEUE *queue); extern bool SHMQueueIsDetached(const SHM_QUEUE *queue); *** a/src/test/regress/expected/rules.out --- b/src/test/regress/expected/rules.out *************** *** 1298,1304 **** SELECT viewname, definition FROM pg_views WHERE schemaname <> 'information_schem pg_stat_bgwriter | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset; pg_stat_database | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d; pg_stat_database_conflicts | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d; ! pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid)); pg_stat_sys_indexes | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text)); pg_stat_sys_tables | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text)); pg_stat_user_functions | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL)); --- 1298,1304 ---- pg_stat_bgwriter | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset; pg_stat_database | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d; pg_stat_database_conflicts | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d; ! pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location, w.sync_priority, w.sync_state FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid)); pg_stat_sys_indexes | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text)); pg_stat_sys_tables | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text)); pg_stat_user_functions | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL));