*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2018,2023 **** SET ENABLE_SEQSCAN TO OFF;
--- 2018,2109 ----
+
+ Synchronous Replication
+
+
+ These settings control the behavior of the built-in
+ synchronous replication feature.
+ These parameters are set on the primary server that is
+ to send replication data to one or more standby servers.
+
+
+
+
+ synchronous_replication (boolean)
+
+ synchronous_replication configuration parameter
+
+
+
+ Specifies whether transaction commit will wait for WAL records
+ to be replicated before the command returns a success
+ indication to the client. The default setting is off.
+ When on, there will be a delay while the client waits
+ for confirmation of successful replication. That delay will
+ increase depending upon the physical distance and network activity
+ between primary and standby. The commit wait will last until a
+ reply from the current synchronous standby indicates it has received
+ the commit record of the transaction. Synchronous standbys must
+ already have been defined (see synchronous_standby_names).
+
+
+ This parameter can be changed at any time; the
+ behavior for any one transaction is determined by the setting in
+ effect when it commits. It is therefore possible, and useful, to have
+ some transactions replicate synchronously and others asynchronously.
+ For example, to make a single multistatement transaction commit
+ asynchronously when the default is synchronous replication, issue
+ SET LOCAL synchronous_replication TO OFF within the
+ transaction.
+
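+ For example, a minimal sketch (the table here is hypothetical), committing
+ one low-value change asynchronously while the default remains synchronous:
+
+ BEGIN;
+ SET LOCAL synchronous_replication TO OFF;
+ INSERT INTO chat_messages(msg) VALUES ('hello');  -- hypothetical table
+ COMMIT;  -- returns without waiting for the standby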
+
+
+
+
+ synchronous_standby_names (string)
+
+ synchronous_standby_names configuration parameter
+
+
+
+ Specifies a priority-ordered list of standby names that can offer
+ synchronous replication. At any one time there will be just one
+ synchronous standby that will wake sleeping users following commit.
+ The synchronous standby will be the first named standby that is
+ both currently connected and streaming WAL in real-time
+ (as shown by a state of STREAMING). Standby servers
+ listed later will become potential synchronous standbys.
+ If the current synchronous standby disconnects for whatever reason
+ it will be replaced immediately by the next-highest-priority standby.
+ Specifying more than one standby name can allow very high availability.
+
+
+ The standby name is currently taken as the application_name of the
+ standby, as set in the primary_conninfo on the standby. Names are
+ not enforced for uniqueness. In case of duplicates one of the standbys
+ will be chosen to be the synchronous standby, though exactly which
+ one is indeterminate.
+
+
+ The special entry * matches any
+ application_name, including the default application name of
+ walsender. Using it is not recommended; a more carefully
+ considered configuration is desirable.
+
+
+ If a standby is removed from the list of servers then it will stop
+ being the synchronous standby, allowing another to take its place.
+ If the list is empty, synchronous replication will not be
+ possible, whatever the setting of synchronous_replication.
+ Standbys may also be added to the list without restarting the server.
+
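+ For example, a sketch with illustrative standby names:
+
+ synchronous_standby_names = 'london, nyc, sydney'
+
+ Here london is the synchronous standby while it is connected and
+ streaming; nyc and sydney are potential synchronous standbys that
+ take over, in that order, if london disconnects.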
+
+
+
+
+
+
Standby Servers
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 875,880 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
--- 875,1083 ----
+
+ Synchronous Replication
+
+
+ Synchronous Replication
+
+
+
+ PostgreSQL streaming replication is asynchronous by
+ default. If the primary server
+ crashes then some transactions that were committed may not have been
+ replicated to the standby server, causing data loss. The amount
+ of data loss is proportional to the replication delay at the time of
+ failover.
+
+
+
+ Synchronous replication offers the ability to confirm that all changes
+ made by a transaction have been transferred to one synchronous standby
+ server. This extends the standard level of durability
+ offered by a transaction commit. This level of protection is referred
+ to as 2-safe replication in computer science theory.
+
+
+
+ When requesting synchronous replication, each commit of a
+ write transaction will wait until confirmation is
+ received that the commit has been written to the transaction log on disk
+ of both the primary and standby server. The only way that data
+ can be lost is if both the primary and the standby suffer crashes at the
+ same time. This can provide a much higher level of durability, though only
+ if the sysadmin is cautious about the placement and management of the two
+ servers. Waiting for confirmation increases the user's confidence that the
+ changes will not be lost in the event of server crashes, but it also
+ necessarily increases the response time for the requesting transaction.
+ The minimum wait time is the round-trip time between primary and standby.
+
+
+
+ Read-only transactions and transaction rollbacks need not wait for
+ replies from standby servers. Subtransaction commits do not wait for
+ responses from standby servers; only top-level commits do. Long
+ running actions such as data loading or index building do not wait
+ until the very final commit message. All two-phase commit actions
+ require commit waits, including both prepare and commit.
+
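+ For example, in a two-phase commit both steps below wait for the
+ synchronous standby (the transaction identifier is illustrative):
+
+ PREPARE TRANSACTION 'tx1';    -- waits for the standby
+ COMMIT PREPARED 'tx1';        -- waits for the standby again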
+
+
+ Basic Configuration
+
+
+ All parameters have useful default values, so we can enable
+ synchronous replication easily just by setting this on the primary:
+
+
+ synchronous_replication = on
+
+
+ When synchronous_replication is set, a commit will wait
+ for confirmation that the standby has received the commit record,
+ even if that takes a very long time.
+ synchronous_replication can be set by individual
+ users, so it can be configured in the configuration file, for particular
+ users or databases, or dynamically by application programs.
+
+
+
+ After a commit record has been written to disk on the primary, the
+ WAL record is then sent to the standby. The standby sends reply
+ messages each time a new batch of WAL data is received, unless
+ wal_receiver_status_interval is set to zero on the standby.
+ If the standby is the first matching standby, as specified in
+ synchronous_standby_names on the primary, the reply
+ messages from that standby will be used to wake users waiting for
+ confirmation that the commit record has been received. These parameters
+ allow the administrator to specify which standby servers should be
+ synchronous standbys. Note that the configuration of synchronous
+ replication is mainly on the primary.
+
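+ As a minimal sketch of both sides (host name and standby name are
+ illustrative), the primary's postgresql.conf would contain
+
+ synchronous_standby_names = 'standby1'
+
+ and the standby's recovery.conf would contain
+
+ primary_conninfo = 'host=primary application_name=standby1'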
+
+
+ Users will stop waiting if a fast shutdown is requested, though the
+ server does not fully shut down until all outstanding WAL records are
+ transferred to standby servers.
+
+
+
+ Note also that synchronous_commit is used when the user
+ specifies synchronous_replication, overriding even an
+ explicit setting of synchronous_commit to off.
+ This is because we must write WAL to disk on the primary before we
+ replicate, to ensure the standby never gets ahead of the primary.
+
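+ For example, a sketch of a session that disables synchronous_commit
+ but still flushes WAL locally because it requests replication:
+
+ SET synchronous_commit = off;
+ SET synchronous_replication = on;
+ -- subsequent commits flush WAL on the primary, then wait for the standby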
+
+
+
+
+ Planning for Performance
+
+
+ Synchronous replication usually requires carefully planned and placed
+ standby servers to ensure applications perform acceptably. Waiting
+ doesn't utilise system resources, but transaction locks continue to be
+ held until the transfer is confirmed. As a result, incautious use of
+ synchronous replication will reduce performance for database
+ applications because of increased response times and higher contention.
+
+
+
+ PostgreSQL allows the application developer
+ to specify the durability level required via replication. This can be
+ specified for the system overall, though it can also be specified for
+ specific users or connections, or even individual transactions.
+
+
+
+ For example, an application workload might consist of
+ 10% of changes that are important customer details, while
+ 90% of changes are less important data that the business could more
+ easily survive losing, such as chat messages between users.
+
+
+
+ With synchronous replication options specified at the application level
+ (on the primary) we can offer sync rep for the most important changes,
+ without slowing down the bulk of the total workload. Application level
+ options are an important and practical tool for allowing the benefits of
+ synchronous replication for high performance applications.
+
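+ One way to express such a policy, as a sketch with hypothetical role
+ names, is through per-role defaults:
+
+ ALTER ROLE billing SET synchronous_replication = on;
+ ALTER ROLE chat SET synchronous_replication = off;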
+
+
+ You should consider that the network bandwidth must be higher than
+ the rate of generation of WAL data.
+
+
+
+
+
+ Planning for High Availability
+
+
+ Commits made when synchronous_replication is set will wait until
+ the sync standby responds. The response may never occur if the last,
+ or only, standby should crash.
+
+
+
+ The best solution for avoiding data loss is to ensure you don't lose
+ your last remaining sync standby. This can be achieved by naming multiple
+ potential synchronous standbys using synchronous_standby_names.
+ The first named standby will be used as the synchronous standby. Standbys
+ listed after this will take over the role of synchronous standby if the
+ first one should fail.
+
+
+
+ When a standby first attaches to the primary, it will not yet be properly
+ synchronized. This is described as CATCHUP mode. Once
+ the lag between standby and primary reaches zero for the first time,
+ we move to the real-time STREAMING state.
+ The catch-up duration may be long immediately after the standby has
+ been created. If the standby is shut down, then the catch-up period
+ will increase according to the length of time the standby has been down.
+ The standby is only able to become a synchronous standby
+ once it has reached STREAMING state.
+
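+ The state and priority of each connected standby can be checked from
+ the primary, for example:
+
+ SELECT application_name, state, sync_priority, sync_state
+ FROM pg_stat_replication;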
+
+
+ If the primary restarts while commits are waiting for acknowledgement,
+ those waiting transactions will be marked fully committed once the primary
+ database recovers.
+ There is no way to be certain that all standbys have received all
+ outstanding WAL data at the time of the crash of the primary. Some
+ transactions may not show as committed on the standby, even though
+ they show as committed on the primary. The guarantee we offer is that
+ the application will not receive explicit acknowledgement of the
+ successful commit of a transaction until the WAL data is known to be
+ safely received by the standby.
+
+
+
+ If you really do lose your last standby server then you should disable
+ synchronous_standby_names and restart the primary server.
+
+
+
+ If the primary is isolated from the remaining standby servers, you should
+ fail over to the best candidate of those other remaining standby servers.
+
+
+
+ If you need to re-create a standby server while transactions are
+ waiting, make sure that pg_start_backup() and
+ pg_stop_backup() are run in a session with
+ synchronous_replication = off; otherwise those requests will wait
+ forever for the standby to appear.
+
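+ For example, a sketch of such a backup session (the label is
+ illustrative):
+
+ SET synchronous_replication = off;
+ SELECT pg_start_backup('rebuild-standby');
+ -- ... copy the data directory to the new standby ...
+ SELECT pg_stop_backup();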
+
+
+
*** a/doc/src/sgml/monitoring.sgml
--- b/doc/src/sgml/monitoring.sgml
***************
*** 306,313 **** postgres: user> database> host> is set or if the user's hostname
needed to be looked up during pg_hba.conf
--- 306,316 ----
location. In addition, the standby reports the last transaction log
position it received and wrote, the last position it flushed to disk,
and the last position it replayed, and this information is also
! displayed here. If the standby's application_name matches one of the
! settings in synchronous_standby_names then sync_priority
! is also shown here; that is, the order in which standbys will become
! the synchronous standby. The columns detailing what exactly the connection
! is doing are only visible if the user examining the view is a superuser.
The client's hostname will be available only if
is set or if the user's hostname
needed to be looked up during pg_hba.conf
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 56,61 ****
--- 56,62 ----
#include "pg_trace.h"
#include "pgstat.h"
#include "replication/walsender.h"
+ #include "replication/syncrep.h"
#include "storage/fd.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
***************
*** 1071,1076 **** EndPrepare(GlobalTransaction gxact)
--- 1072,1085 ----
END_CRIT_SECTION();
+ /*
+ * Wait for synchronous replication, if required.
+ *
+ * Note that at this stage we have marked the prepare, but still show as
+ * running in the procarray (twice!) and continue to hold locks.
+ */
+ SyncRepWaitForLSN(gxact->prepare_lsn);
+
records.tail = records.head = NULL;
}
***************
*** 2030,2035 **** RecordTransactionCommitPrepared(TransactionId xid,
--- 2039,2052 ----
MyProc->inCommit = false;
END_CRIT_SECTION();
+
+ /*
+ * Wait for synchronous replication, if required.
+ *
+ * Note that at this stage we have marked clog, but still show as
+ * running in the procarray and continue to hold locks.
+ */
+ SyncRepWaitForLSN(recptr);
}
/*
***************
*** 2109,2112 **** RecordTransactionAbortPrepared(TransactionId xid,
--- 2126,2137 ----
TransactionIdAbortTree(xid, nchildren, children);
END_CRIT_SECTION();
+
+ /*
+ * Wait for synchronous replication, if required.
+ *
+ * Note that at this stage we have marked clog, but still show as
+ * running in the procarray and continue to hold locks.
+ */
+ SyncRepWaitForLSN(recptr);
}
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 37,42 ****
--- 37,43 ----
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walsender.h"
+ #include "replication/syncrep.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
***************
*** 1055,1061 **** RecordTransactionCommit(void)
* if all to-be-deleted tables are temporary though, since they are lost
* anyway if we crash.)
*/
! if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0)
{
/*
* Synchronous commit case:
--- 1056,1062 ----
* if all to-be-deleted tables are temporary though, since they are lost
* anyway if we crash.)
*/
! if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0 || SyncRepRequested())
{
/*
* Synchronous commit case:
***************
*** 1125,1130 **** RecordTransactionCommit(void)
--- 1126,1139 ----
/* Compute latestXid while we have the child XIDs handy */
latestXid = TransactionIdLatest(xid, nchildren, children);
+ /*
+ * Wait for synchronous replication, if required.
+ *
+ * Note that at this stage we have marked clog, but still show as
+ * running in the procarray and continue to hold locks.
+ */
+ SyncRepWaitForLSN(XactLastRecEnd);
+
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd.xrecoff = 0;
*** a/src/backend/catalog/system_views.sql
--- b/src/backend/catalog/system_views.sql
***************
*** 520,526 **** CREATE VIEW pg_stat_replication AS
W.sent_location,
W.write_location,
W.flush_location,
! W.replay_location
FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
pg_stat_get_wal_senders() AS W
WHERE S.usesysid = U.oid AND
--- 520,528 ----
W.sent_location,
W.write_location,
W.flush_location,
! W.replay_location,
! W.sync_priority,
! W.sync_state
FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
pg_stat_get_wal_senders() AS W
WHERE S.usesysid = U.oid AND
*** a/src/backend/postmaster/autovacuum.c
--- b/src/backend/postmaster/autovacuum.c
***************
*** 1527,1532 **** AutoVacWorkerMain(int argc, char *argv[])
--- 1527,1539 ----
SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
/*
+ * Force synchronous replication off to allow regular maintenance even
+ * if we are waiting for standbys to connect. This is important to
+ * ensure we aren't blocked from performing anti-wraparound tasks.
+ */
+ SetConfigOption("synchronous_replication", "off", PGC_SUSET, PGC_S_OVERRIDE);
+
+ /*
* Get the info about the database we're going to work on.
*/
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
*** a/src/backend/replication/Makefile
--- b/src/backend/replication/Makefile
***************
*** 13,19 **** top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
! repl_gram.o
include $(top_srcdir)/src/backend/common.mk
--- 13,19 ----
include $(top_builddir)/src/Makefile.global
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
! repl_gram.o syncrep.o
include $(top_srcdir)/src/backend/common.mk
*** /dev/null
--- b/src/backend/replication/syncrep.c
***************
*** 0 ****
--- 1,460 ----
+ /*-------------------------------------------------------------------------
+ *
+ * syncrep.c
+ *
+ * Synchronous replication is new as of PostgreSQL 9.1.
+ *
+ * If requested, transaction commits wait until their commit LSN is
+ * acknowledged by the sync standby.
+ *
+ * This module contains the code for waiting and release of backends.
+ * All code in this module executes on the primary. The core streaming
+ * replication transport remains within WALreceiver/WALsender modules.
+ *
+ * The essence of this design is that it isolates all logic about
+ * waiting/releasing onto the primary. The primary defines which standbys
+ * it wishes to wait for. The standby is completely unaware of the
+ * durability requirements of transactions on the primary, reducing the
+ * complexity of the code and streamlining both standby operations and
+ * network bandwidth because there is no requirement to ship
+ * per-transaction state information.
+ *
+ * The bookkeeping approach we take is that a commit is either synchronous
+ * or not synchronous (async). If it is async, we just fastpath out of
+ * here. If it is sync, then in 9.1 we wait for the flush location on the
+ * standby before releasing the waiting backend. Further complexity
+ * in that interaction is expected in later releases.
+ *
+ * The best performing way to manage the waiting backends is to have a
+ * single ordered queue of waiting backends, so that we can avoid
+ * searching through all waiters each time we receive a reply.
+ *
+ * In 9.1 we support only a single synchronous standby, chosen from a
+ * priority list of synchronous_standby_names. Before it can become the
+ * synchronous standby it must have caught up with the primary; that may
+ * take some time. Once caught up, the current highest priority standby
+ * will release waiters from the queue.
+ *
+ * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+ #include "postgres.h"
+
+ #include <unistd.h>
+
+ #include "access/xact.h"
+ #include "access/xlog_internal.h"
+ #include "miscadmin.h"
+ #include "postmaster/autovacuum.h"
+ #include "replication/syncrep.h"
+ #include "replication/walsender.h"
+ #include "storage/latch.h"
+ #include "storage/ipc.h"
+ #include "storage/pmsignal.h"
+ #include "storage/proc.h"
+ #include "utils/builtins.h"
+ #include "utils/guc.h"
+ #include "utils/guc_tables.h"
+ #include "utils/memutils.h"
+ #include "utils/ps_status.h"
+
+ /* User-settable parameters for sync rep */
+ bool sync_rep_mode = false; /* Only set in user backends */
+ char *SyncRepStandbyNames;
+
+ static bool announce_next_takeover = true;
+
+ static void SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN);
+ static void SyncRepQueueInsert(void);
+
+ static int SyncRepGetStandbyPriority(void);
+ static int SyncRepWakeQueue(void);
+
+ /*
+ * ===========================================================
+ * Synchronous Replication functions for normal user backends
+ * ===========================================================
+ */
+
+ /*
+ * Wait for synchronous replication, if requested by user.
+ */
+ void
+ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
+ {
+ /*
+ * Fast exit if user has not requested sync replication, or
+ * streaming replication is inactive in this server.
+ */
+ if (!SyncRepRequested() || max_wal_senders == 0)
+ return;
+
+ /*
+ * Wait on queue. We check for a fast exit once we have the lock.
+ */
+ SyncRepWaitOnQueue(XactCommitLSN);
+ }
+
+ void
+ SyncRepCleanupAtProcExit(int code, Datum arg)
+ {
+ if (!SHMQueueIsDetached(&(MyProc->syncrep_links)))
+ {
+ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ SHMQueueDelete(&(MyProc->syncrep_links));
+ LWLockRelease(SyncRepLock);
+ }
+
+ if (MyProc != NULL)
+ DisownLatch(&MyProc->waitLatch);
+ }
+
+ /*
+ * Wait for specified LSN to be confirmed at the requested level
+ * of durability. Each proc has its own wait latch, so we perform
+ * a normal latch check/wait loop here.
+ */
+ static void
+ SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN)
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ char *new_status = NULL;
+ const char *old_status;
+ int len;
+
+ Assert(SHMQueueIsDetached(&(MyProc->syncrep_links)));
+
+ for (;;)
+ {
+ ResetLatch(&MyProc->waitLatch);
+
+ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+
+ /*
+ * First time through, add ourselves to the queue.
+ */
+ if (SHMQueueIsDetached(&(MyProc->syncrep_links)))
+ {
+ /*
+ * Wait no longer if we have already reached our LSN
+ */
+ if (XLByteLE(XactCommitLSN, walsndctl->lsn))
+ {
+ /* No need to wait */
+ LWLockRelease(SyncRepLock);
+ return;
+ }
+
+ /*
+ * Set our waitLSN so WALSender will know when to wake us.
+ * We set this before we add ourselves to queue, so that
+ * any proc on the queue can be examined freely without
+ * taking a lock on each process in the queue.
+ */
+ MyProc->waitLSN = XactCommitLSN;
+ SyncRepQueueInsert();
+ LWLockRelease(SyncRepLock);
+
+ /*
+ * Alter ps display to show waiting for sync rep.
+ */
+ if (update_process_title)
+ {
+ old_status = get_ps_display(&len);
+ new_status = (char *) palloc(len + 32 + 1);
+ memcpy(new_status, old_status, len);
+ sprintf(new_status + len, " waiting for %X/%X",
+ XactCommitLSN.xlogid, XactCommitLSN.xrecoff);
+ set_ps_display(new_status, false);
+ new_status[len] = '\0'; /* truncate off " waiting ..." */
+ }
+ }
+ else
+ {
+ /*
+ * Check the LSN on our queue and if it's moved far enough then
+ * remove us from the queue. First time through this is
+ * unlikely to be far enough, yet is possible. Next time we are
+ * woken we should be more lucky.
+ */
+ if (XLByteLE(XactCommitLSN, walsndctl->lsn))
+ {
+ SHMQueueDelete(&(MyProc->syncrep_links));
+ LWLockRelease(SyncRepLock);
+
+ /*
+ * Reset our waitLSN.
+ */
+ MyProc->waitLSN.xlogid = 0;
+ MyProc->waitLSN.xrecoff = 0;
+
+ if (new_status)
+ {
+ /* Reset ps display */
+ set_ps_display(new_status, false);
+ pfree(new_status);
+ }
+
+ ereport(DEBUG3,
+ (errmsg("synchronous replication wait for %X/%X complete at %s",
+ XactCommitLSN.xlogid,
+ XactCommitLSN.xrecoff,
+ timestamptz_to_str(GetCurrentTimestamp()))));
+ return;
+ }
+
+ LWLockRelease(SyncRepLock);
+ }
+
+ WaitLatch(&MyProc->waitLatch, -1);
+ }
+ }
+
+ /*
+ * Insert MyProc into SyncRepQueue, maintaining sorted invariant.
+ *
+ * Usually we will go at the tail of the queue, though it's possible that we arrive
+ * here out of order, so start at tail and work back to insertion point.
+ */
+ static void
+ SyncRepQueueInsert(void)
+ {
+ PGPROC *proc;
+
+ proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
+ &(WalSndCtl->SyncRepQueue),
+ offsetof(PGPROC, syncrep_links));
+
+ while (proc)
+ {
+ /*
+ * Stop at the queue element that we should insert after, to
+ * ensure the queue is ordered by LSN.
+ */
+ if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
+ break;
+
+ proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
+ &(proc->syncrep_links),
+ offsetof(PGPROC, syncrep_links));
+ }
+
+ if (proc)
+ SHMQueueInsertAfter(&(proc->syncrep_links), &(MyProc->syncrep_links));
+ else
+ SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncrep_links));
+ }
+
+ /*
+ * ===========================================================
+ * Synchronous Replication functions for wal sender processes
+ * ===========================================================
+ */
+
+ /*
+ * Take any action required to initialise sync rep state from config
+ * data. Called at WALSender startup and after each SIGHUP.
+ */
+ void
+ SyncRepInitConfig(void)
+ {
+ int priority;
+
+ /*
+ * Determine if we are a potential sync standby and remember the result
+ * for handling replies from standby.
+ */
+ priority = SyncRepGetStandbyPriority();
+ if (MyWalSnd->sync_standby_priority != priority)
+ {
+ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ MyWalSnd->sync_standby_priority = priority;
+ LWLockRelease(SyncRepLock);
+ ereport(DEBUG1,
+ (errmsg("standby \"%s\" now has synchronous standby priority %u",
+ application_name, priority)));
+ }
+ }
+
+ /*
+ * Update the LSNs on each queue based upon our latest state. This
+ * implements a simple policy of first-valid-standby-releases-waiter.
+ *
+ * Other policies are possible, which would change what we do here and
+ * perhaps also which information we store.
+ */
+ void
+ SyncRepReleaseWaiters(void)
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ volatile WalSnd *syncWalSnd = NULL;
+ int numprocs = 0;
+ int priority = 0;
+ int i;
+
+ /*
+ * If this WALSender is serving a standby that is not on the list of
+ * potential standbys then we have nothing to do. If we are still
+ * starting up or still running base backup, then leave quickly as well.
+ */
+ if (MyWalSnd->sync_standby_priority == 0 ||
+ MyWalSnd->state < WALSNDSTATE_STREAMING)
+ return;
+
+ /*
+ * We're a potential sync standby. Release waiters if we are the
+ * highest priority standby.
+ */
+ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &walsndctl->walsnds[i];
+
+ if (walsnd->pid != 0 &&
+ walsnd->sync_standby_priority > 0 &&
+ (priority == 0 ||
+ priority > walsnd->sync_standby_priority))
+ {
+ priority = walsnd->sync_standby_priority;
+ syncWalSnd = walsnd;
+ }
+ }
+
+ /*
+ * We should have found ourselves at least.
+ */
+ Assert(syncWalSnd);
+
+ /*
+ * If we aren't managing the highest priority standby then just leave.
+ */
+ if (syncWalSnd != MyWalSnd)
+ {
+ LWLockRelease(SyncRepLock);
+ announce_next_takeover = true;
+ return;
+ }
+
+ if (XLByteLT(walsndctl->lsn, MyWalSnd->flush))
+ {
+ /*
+ * Set the lsn first so that when we wake backends they will
+ * release up to this location.
+ */
+ walsndctl->lsn = MyWalSnd->flush;
+ numprocs = SyncRepWakeQueue();
+ }
+
+ LWLockRelease(SyncRepLock);
+
+ elog(DEBUG3, "released %d procs up to %X/%X",
+ numprocs,
+ MyWalSnd->flush.xlogid,
+ MyWalSnd->flush.xrecoff);
+
+ /*
+ * If we are managing the highest priority standby, though we weren't
+ * prior to this, then announce we are now the sync standby.
+ */
+ if (announce_next_takeover)
+ {
+ announce_next_takeover = false;
+ ereport(LOG,
+ (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
+ application_name, MyWalSnd->sync_standby_priority)));
+ }
+ }
+
+ /*
+ * Check if we are in the list of sync standbys, and if so, determine
+ * priority sequence. Return priority if set, or zero to indicate that
+ * we are not a potential sync standby.
+ *
+ * Compare the parameter SyncRepStandbyNames against the application_name
+ * for this WALSender, or allow any name if we find a wildcard "*".
+ */
+ static int
+ SyncRepGetStandbyPriority(void)
+ {
+ char *rawstring;
+ List *elemlist;
+ ListCell *l;
+ int priority = 0;
+ bool found = false;
+
+ /* Need a modifiable copy of string */
+ rawstring = pstrdup(SyncRepStandbyNames);
+
+ /* Parse string into list of identifiers */
+ if (!SplitIdentifierString(rawstring, ',', &elemlist))
+ {
+ /* syntax error in list */
+ pfree(rawstring);
+ list_free(elemlist);
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for parameter \"synchronous_standby_names\"")));
+ return 0;
+ }
+
+ foreach(l, elemlist)
+ {
+ char *standby_name = (char *) lfirst(l);
+
+ priority++;
+
+ if (pg_strcasecmp(standby_name, application_name) == 0 ||
+ pg_strcasecmp(standby_name, "*") == 0)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ pfree(rawstring);
+ list_free(elemlist);
+
+ return (found ? priority : 0);
+ }
+
+ /*
+ * Walk queue from head setting the latches of any procs that need
+ * to be woken. We don't modify the queue, we leave that for individual
+ * procs to release themselves.
+ *
+ * Must hold SyncRepLock
+ */
+ static int
+ SyncRepWakeQueue(void)
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ PGPROC *proc;
+ int numprocs = 0;
+
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
+ &(WalSndCtl->SyncRepQueue),
+ offsetof(PGPROC, syncrep_links));
+
+ while (proc)
+ {
+ /*
+ * Assume the queue is ordered by LSN
+ */
+ if (XLByteLT(walsndctl->lsn, proc->waitLSN))
+ return numprocs;
+
+ numprocs++;
+ SetLatch(&(proc->waitLatch));
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
+ &(proc->syncrep_links),
+ offsetof(PGPROC, syncrep_links));
+ }
+
+ return numprocs;
+ }
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 66,72 ****
WalSndCtlData *WalSndCtl = NULL;
/* My slot in the shared memory array */
! static WalSnd *MyWalSnd = NULL;
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
--- 66,72 ----
WalSndCtlData *WalSndCtl = NULL;
/* My slot in the shared memory array */
! WalSnd *MyWalSnd = NULL;
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
***************
*** 174,179 **** WalSenderMain(void)
--- 174,181 ----
SpinLockRelease(&walsnd->mutex);
}
+ SyncRepInitConfig();
+
/* Main loop of walsender */
return WalSndLoop();
}
***************
*** 584,589 **** ProcessStandbyReplyMessage(void)
--- 586,593 ----
walsnd->apply = reply.apply;
SpinLockRelease(&walsnd->mutex);
}
+
+ SyncRepReleaseWaiters();
}
/*
***************
*** 700,705 **** WalSndLoop(void)
--- 704,710 ----
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
+ SyncRepInitConfig();
}
/*
***************
*** 771,777 **** WalSndLoop(void)
--- 776,787 ----
* that point might wait for some time.
*/
if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup)
+ {
+ ereport(DEBUG1,
+ (errmsg("standby \"%s\" has now caught up with primary",
+ application_name)));
WalSndSetState(WALSNDSTATE_STREAMING);
+ }
ProcessRepliesIfAny();
}
***************
*** 1238,1243 **** WalSndShmemInit(void)
--- 1248,1255 ----
/* First time through, so initialize */
MemSet(WalSndCtl, 0, WalSndShmemSize());
+ SHMQueueInit(&(WalSndCtl->SyncRepQueue));
+
for (i = 0; i < max_wal_senders; i++)
{
WalSnd *walsnd = &WalSndCtl->walsnds[i];
***************
*** 1304,1315 **** WalSndGetStateString(WalSndState state)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
! #define PG_STAT_GET_WAL_SENDERS_COLS 6
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
int i;
/* check to see if caller supports us returning a tuplestore */
--- 1316,1330 ----
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
! #define PG_STAT_GET_WAL_SENDERS_COLS 8
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
+ int sync_priority[max_wal_senders];
+ int priority = 0;
+ int sync_standby = -1;
int i;
/* check to see if caller supports us returning a tuplestore */
***************
*** 1337,1342 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
--- 1352,1384 ----
MemoryContextSwitchTo(oldcontext);
+ /*
+ * Get the priorities of sync standbys all in one go, to minimise
+ * lock acquisitions and to allow us to evaluate who is the current
+ * sync standby.
+ */
+ LWLockAcquire(SyncRepLock, LW_SHARED);
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ if (walsnd->pid != 0)
+ {
+ sync_priority[i] = walsnd->sync_standby_priority;
+
+ if (walsnd->state == WALSNDSTATE_STREAMING &&
+ walsnd->sync_standby_priority > 0 &&
+ (priority == 0 ||
+ priority > walsnd->sync_standby_priority))
+ {
+ priority = walsnd->sync_standby_priority;
+ sync_standby = i;
+ }
+ }
+ }
+ LWLockRelease(SyncRepLock);
+
for (i = 0; i < max_wal_senders; i++)
{
/* use volatile pointer to prevent code rearrangement */
***************
*** 1370,1380 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
* Only superusers can see details. Other users only get
* the pid value to know it's a walsender, but no details.
*/
! nulls[1] = true;
! nulls[2] = true;
! nulls[3] = true;
! nulls[4] = true;
! nulls[5] = true;
}
else
{
--- 1412,1418 ----
* Only superusers can see details. Other users only get
* the pid value to know it's a walsender, but no details.
*/
! MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
}
else
{
***************
*** 1401,1406 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
--- 1439,1457 ----
snprintf(location, sizeof(location), "%X/%X",
apply.xlogid, apply.xrecoff);
values[5] = CStringGetTextDatum(location);
+
+ values[6] = Int32GetDatum(sync_priority[i]);
+
+ /*
+ * More easily understood version of standby state.
+ * This is purely informational; it can be derived from the priority.
+ */
+ if (sync_priority[i] == 0)
+ values[7] = CStringGetTextDatum("ASYNC");
+ else if (i == sync_standby)
+ values[7] = CStringGetTextDatum("SYNC");
+ else
+ values[7] = CStringGetTextDatum("POTENTIAL");
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
*** a/src/backend/storage/ipc/shmqueue.c
--- b/src/backend/storage/ipc/shmqueue.c
***************
*** 104,110 **** SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem)
* element. Inserting "after" the queue head puts the elem
* at the head of the queue.
*/
- #ifdef NOT_USED
void
SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem)
{
--- 104,109 ----
***************
*** 118,124 **** SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem)
queue->next = elem;
nextPtr->prev = elem;
}
- #endif /* NOT_USED */
/*--------------------
* SHMQueueNext -- Get the next element from a queue
--- 117,122 ----
***************
*** 156,161 **** SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset)
--- 154,178 ----
return (Pointer) (((char *) elemPtr) - linkOffset);
}
+ /*--------------------
+ * SHMQueuePrev -- Get the previous element from a queue
+ *
+ * Same as SHMQueueNext, just starting at tail and moving towards head
+ * All other comments and usage applies.
+ */
+ Pointer
+ SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset)
+ {
+ SHM_QUEUE *elemPtr = curElem->prev;
+
+ Assert(ShmemAddrIsValid(curElem));
+
+ if (elemPtr == queue) /* back to the queue head? */
+ return NULL;
+
+ return (Pointer) (((char *) elemPtr) - linkOffset);
+ }
+
/*
* SHMQueueEmpty -- TRUE if queue head is only element, FALSE otherwise
*/
*** a/src/backend/storage/lmgr/proc.c
--- b/src/backend/storage/lmgr/proc.c
***************
*** 39,44 ****
--- 39,45 ----
#include "access/xact.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
+ #include "replication/syncrep.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/pmsignal.h"
***************
*** 196,201 **** InitProcGlobal(void)
--- 197,203 ----
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
ProcGlobal->freeProcs = &procs[i];
+ InitSharedLatch(&procs[i].waitLatch);
}
/*
***************
*** 214,219 **** InitProcGlobal(void)
--- 216,222 ----
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
ProcGlobal->autovacFreeProcs = &procs[i];
+ InitSharedLatch(&procs[i].waitLatch);
}
/*
***************
*** 224,229 **** InitProcGlobal(void)
--- 227,233 ----
{
AuxiliaryProcs[i].pid = 0; /* marks auxiliary proc as not in use */
PGSemaphoreCreate(&(AuxiliaryProcs[i].sem));
+ InitSharedLatch(&AuxiliaryProcs[i].waitLatch);
}
/* Create ProcStructLock spinlock, too */
***************
*** 326,331 **** InitProcess(void)
--- 330,341 ----
SHMQueueInit(&(MyProc->myProcLocks[i]));
MyProc->recoveryConflictPending = false;
+ /* Initialise the waitLSN for sync rep */
+ MyProc->waitLSN.xlogid = 0;
+ MyProc->waitLSN.xrecoff = 0;
+
+ OwnLatch((Latch *) &MyProc->waitLatch);
+
/*
* We might be reusing a semaphore that belonged to a failed process. So
* be careful and reinitialize its value here. (This is not strictly
***************
*** 365,370 **** InitProcessPhase2(void)
--- 375,381 ----
/*
* Arrange to clean that up at backend exit.
*/
+ on_shmem_exit(SyncRepCleanupAtProcExit, 0);
on_shmem_exit(RemoveProcFromArray, 0);
}
*** a/src/backend/tcop/postgres.c
--- b/src/backend/tcop/postgres.c
***************
*** 2628,2633 **** die(SIGNAL_ARGS)
--- 2628,2638 ----
ProcDiePending = true;
/*
+ * Set this proc's wait latch to stop waiting
+ */
+ SetLatch(&(MyProc->waitLatch));
+
+ /*
* If it's safe to interrupt, and we're waiting for input or a lock,
* service the interrupt immediately
*/
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 55,60 ****
--- 55,61 ----
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
#include "postmaster/walwriter.h"
+ #include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
***************
*** 754,759 **** static struct config_bool ConfigureNamesBool[] =
--- 755,768 ----
true, NULL, NULL
},
{
+ {"synchronous_replication", PGC_USERSET, WAL_REPLICATION,
+ gettext_noop("Requests synchronous replication."),
+ NULL
+ },
+ &sync_rep_mode,
+ false, NULL, NULL
+ },
+ {
{"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS,
gettext_noop("Continues processing past damaged page headers."),
gettext_noop("Detection of a damaged page header normally causes PostgreSQL to "
***************
*** 2717,2722 **** static struct config_string ConfigureNamesString[] =
--- 2726,2741 ----
},
{
+ {"synchronous_standby_names", PGC_SIGHUP, WAL_REPLICATION,
+ gettext_noop("List of potential standby names to synchronise with."),
+ NULL,
+ GUC_LIST_INPUT
+ },
+ &SyncRepStandbyNames,
+ "", NULL, NULL
+ },
+
+ {
{"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE,
gettext_noop("Sets default text search configuration."),
NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 184,190 ****
#archive_timeout = 0 # force a logfile segment switch after this
# number of seconds; 0 disables
! # - Streaming Replication -
#max_wal_senders = 0 # max number of walsender processes
# (change requires restart)
--- 184,199 ----
#archive_timeout = 0 # force a logfile segment switch after this
# number of seconds; 0 disables
! # - Replication - User Settings
!
! #synchronous_replication = off # does commit wait for reply from standby
!
! # - Streaming Replication - Server Settings
!
! #synchronous_standby_names = '' # standby servers that provide sync rep
! # comma-separated list of application_name from standby(s);
! # '*' = all
!
#max_wal_senders = 0 # max number of walsender processes
# (change requires restart)
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
***************
*** 2542,2548 **** DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 f f
DESCR("statistics: currently active backend IDs");
DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,25,23}" "{i,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_hostname,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
DESCR("statistics: information about currently active backends");
! DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25}" "{o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,replay_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
DESCR("statistics: current backend PID");
--- 2542,2548 ----
DESCR("statistics: currently active backend IDs");
DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,25,23}" "{i,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_hostname,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
DESCR("statistics: information about currently active backends");
! DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25,23,25}" "{o,o,o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
DESCR("statistics: current backend PID");
*** /dev/null
--- b/src/include/replication/syncrep.h
***************
*** 0 ****
--- 1,37 ----
+ /*-------------------------------------------------------------------------
+ *
+ * syncrep.h
+ * Exports from replication/syncrep.c.
+ *
+ * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+ #ifndef _SYNCREP_H
+ #define _SYNCREP_H
+
+ #include "access/xlog.h"
+ #include "storage/proc.h"
+ #include "storage/shmem.h"
+ #include "storage/spin.h"
+
+ #define SyncRepRequested() (sync_rep_mode)
+
+ /* user-settable parameters for synchronous replication */
+ extern bool sync_rep_mode;
+ extern int sync_rep_timeout;
+ extern char *SyncRepStandbyNames;
+
+ /* called by user backend */
+ extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
+
+ /* callback at backend exit */
+ extern void SyncRepCleanupAtProcExit(int code, Datum arg);
+
+ /* called by wal sender */
+ extern void SyncRepInitConfig(void);
+ extern void SyncRepReleaseWaiters(void);
+
+ #endif /* _SYNCREP_H */
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 15,20 ****
--- 15,21 ----
#include "access/xlog.h"
#include "nodes/nodes.h"
#include "storage/latch.h"
+ #include "replication/syncrep.h"
#include "storage/spin.h"
***************
*** 52,62 **** typedef struct WalSnd
--- 53,84 ----
* to do.
*/
Latch latch;
+
+ /*
+ * The priority order of the standby managed by this WALSender, as
+ * listed in synchronous_standby_names, or 0 if not-listed.
+ * Protected by SyncRepLock.
+ */
+ int sync_standby_priority;
} WalSnd;
+ extern WalSnd *MyWalSnd;
+
/* There is one WalSndCtl struct for the whole database cluster */
typedef struct
{
+ /*
+ * Synchronous replication queue. Protected by SyncRepLock.
+ */
+ SHM_QUEUE SyncRepQueue;
+
+ /*
+ * Current location of the head of the queue. All waiters should have
+ * a waitLSN that follows this value, or they are currently being woken
+ * to remove themselves from the queue. Protected by SyncRepLock.
+ */
+ XLogRecPtr lsn;
+
WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */
} WalSndCtlData;
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 78,83 **** typedef enum LWLockId
--- 78,84 ----
SerializableFinishedListLock,
SerializablePredicateLockListLock,
OldSerXidLock,
+ SyncRepLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
*** a/src/include/storage/proc.h
--- b/src/include/storage/proc.h
***************
*** 14,19 ****
--- 14,21 ----
#ifndef _PROC_H_
#define _PROC_H_
+ #include "access/xlog.h"
+ #include "storage/latch.h"
#include "storage/lock.h"
#include "storage/pg_sema.h"
#include "utils/timestamp.h"
***************
*** 115,120 **** struct PGPROC
--- 117,128 ----
LOCKMASK heldLocks; /* bitmask for lock types already held on this
* lock object by this backend */
+ /* Info to allow us to wait for synchronous replication, if needed. */
+ Latch waitLatch;
+ XLogRecPtr waitLSN; /* waiting for this LSN or higher */
+
+ SHM_QUEUE syncrep_links; /* list link if process is in syncrep list */
+
/*
* All PROCLOCK objects for locks held or awaited by this backend are
* linked into one of these lists, according to the partition number of
*** a/src/include/storage/shmem.h
--- b/src/include/storage/shmem.h
***************
*** 67,74 **** extern void SHMQueueInit(SHM_QUEUE *queue);
--- 67,77 ----
extern void SHMQueueElemInit(SHM_QUEUE *queue);
extern void SHMQueueDelete(SHM_QUEUE *queue);
extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem);
+ extern void SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem);
extern Pointer SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem,
Size linkOffset);
+ extern Pointer SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem,
+ Size linkOffset);
extern bool SHMQueueEmpty(const SHM_QUEUE *queue);
extern bool SHMQueueIsDetached(const SHM_QUEUE *queue);
*** a/src/test/regress/expected/rules.out
--- b/src/test/regress/expected/rules.out
***************
*** 1298,1304 **** SELECT viewname, definition FROM pg_views WHERE schemaname <> 'information_schem
pg_stat_bgwriter | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset;
pg_stat_database | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d;
pg_stat_database_conflicts | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d;
! pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
pg_stat_sys_indexes | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text));
pg_stat_sys_tables | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text));
pg_stat_user_functions | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL));
--- 1298,1304 ----
pg_stat_bgwriter | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset;
pg_stat_database | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d;
pg_stat_database_conflicts | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d;
! pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location, w.sync_priority, w.sync_state FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
pg_stat_sys_indexes | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text));
pg_stat_sys_tables | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text));
pg_stat_user_functions | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL));