diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 325239d..ae84959 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -59,9 +59,16 @@ /* User-settable parameters for sync rep */ char *SyncRepStandbyNames; +int synchronous_replication_method; +int synchronous_standby_num; #define SyncStandbysDefined() \ - (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0') + (SyncRepStandbyNames != NULL && \ + SyncRepStandbyNames[0] != '\0' && \ + synchronous_standby_num > 0) + +#define SyncRepMethodIsQuorum() \ + synchronous_replication_method == SYNC_REP_METHOD_QUORUM static bool announce_next_takeover = true; @@ -73,6 +80,8 @@ static int SyncRepWakeQueue(bool all, int mode); static int SyncRepGetStandbyPriority(void); +static int comp_lsn(const void *a, const void *b); + #ifdef USE_ASSERT_CHECKING static bool SyncRepQueueIsOrderedByLSN(int mode); #endif @@ -349,16 +358,18 @@ SyncRepInitConfig(void) } /* - * Find the WAL sender servicing the synchronous standby with the lowest - * priority value, or NULL if no synchronous standby is connected. If there - * are multiple standbys with the same lowest priority value, the first one - * found is selected. The caller must hold SyncRepLock. + * Obtain three palloc'd arrays containing position of standbys currently + * considered as synchronous, write/flush LSN position of each sync node. */ -WalSnd * -SyncRepGetSynchronousStandby(void) + +int +SyncRepGetSynchronousStandbys(int *sync_standbys, XLogRecPtr *write_pos_list, + XLogRecPtr *flush_pos_list) { WalSnd *result = NULL; int result_priority = 0; + int priority = 0; + int num_sync = 0; int i; for (i = 0; i < max_wal_senders; i++) @@ -366,6 +377,7 @@ SyncRepGetSynchronousStandby(void) /* Use volatile pointer to prevent code rearrangement */ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; int this_priority; + int j; /* Must be active */ if (walsnd->pid == 0) @@ -388,18 +400,50 @@ SyncRepGetSynchronousStandby(void) if (XLogRecPtrIsInvalid(walsnd->flush)) continue; - result = (WalSnd *) walsnd; - result_priority = this_priority; - - /* - * If priority is equal to 1, there cannot be any other WAL senders - * with a lower priority, so we're done. - */ - if (this_priority == 1) - return result; + if (SyncRepMethodIsQuorum()) + { + /* Just add LSN to array */ + sync_standbys[num_sync] = i; + SpinLockAcquire(&walsnd->mutex); + write_pos_list[num_sync] = walsnd->write; + flush_pos_list[num_sync] = walsnd->flush; + SpinLockRelease(&walsnd->mutex); + num_sync++; + } + else /* PRIORITY method */ + { + if (num_sync == synchronous_standby_num) + { + int new_priority = 0; + + for (j = 0; j < num_sync; j++) + { + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[j]]; + + if (walsndloc->sync_standby_priority == priority && + walsnd->sync_standby_priority < priority) + sync_standbys[j] = i; + + /* Update highest priority standby */ + if (new_priority < walsndloc->sync_standby_priority) + new_priority = walsndloc->sync_standby_priority; + } + + priority = new_priority; + } + else + { + sync_standbys[num_sync] = i; + num_sync++; + + /* Keep track highest priority standby */ + if (priority < walsnd->sync_standby_priority) + priority = walsnd->sync_standby_priority; + } + } } - return result; + return num_sync; } /* @@ -413,9 +457,15 @@ void SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; - WalSnd *syncWalSnd; + int *sync_standbys; + XLogRecPtr *write_pos_list, *flush_pos_list; + XLogRecPtr write_pos; + XLogRecPtr flush_pos; + int num_sync = 0; int numwrite = 0; int numflush = 0; + int i; + bool found = false; /* * If this WALSender is serving a standby that is not on the list of @@ -428,38 +478,116 @@ SyncRepReleaseWaiters(void) XLogRecPtrIsInvalid(MyWalSnd->flush)) return; + sync_standbys = (int *) palloc(sizeof(int) * synchronous_standby_num); + + /* The write/flush_pos_list is needed only by quorum method */ + if (SyncRepMethodIsQuorum()) + { + write_pos_list = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * max_wal_senders); + flush_pos_list = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * max_wal_senders); + } + /* * We're a potential sync standby. Release waiters if we are the highest * priority standby. */ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - syncWalSnd = SyncRepGetSynchronousStandby(); + num_sync = SyncRepGetSynchronousStandbys(sync_standbys, write_pos_list, flush_pos_list); /* We should have found ourselves at least */ - Assert(syncWalSnd != NULL); + Assert(num_sync > 0); /* * If we aren't managing the highest priority standby then just leave. */ - if (syncWalSnd != MyWalSnd) + for (i = 0; i < num_sync; i++) + { + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]]; + if (walsndloc == MyWalSnd) + { + found = true; + break; + } + } + + /* + * We are definitely not one of the chosen... But we could by + * taking the next take over + */ + if (!found) { LWLockRelease(SyncRepLock); + pfree(sync_standbys); + if (write_pos_list) + pfree(write_pos_list); + if (flush_pos_list) + pfree(flush_pos_list); announce_next_takeover = true; return; } /* + * Even if we are one of the chosen standbys, leave if there + * are less synchronous standbys in waiting state than what is + * expected by the user. + */ + if (num_sync < synchronous_standby_num) + { + LWLockRelease(SyncRepLock); + pfree(sync_standbys); + if (write_pos_list) + pfree(write_pos_list); + if (flush_pos_list) + pfree(flush_pos_list); + return; + } + + + if (SyncRepMethodIsQuorum()) + { + + /* + * In quorum commit method, we sort LSN of each wal senders + * in desc order, in order to decide LSN. + */ + qsort((void *) write_pos_list, num_sync, sizeof(XLogRecPtr), comp_lsn); + qsort((void *) flush_pos_list, num_sync, sizeof(XLogRecPtr), comp_lsn); + + write_pos = write_pos_list[synchronous_standby_num - 1]; + flush_pos = flush_pos_list[synchronous_standby_num - 1]; + } + else /* PRIORITY Method */ + { + write_pos = MyWalSnd->write; + flush_pos = MyWalSnd->flush; + + for (i = 0; i < num_sync; i++) + { + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsndloc->mutex); + + /* Find lowest XLogRecPtr of both write and flush from sync_nodes */ + if (write_pos > walsndloc->write) + write_pos = walsndloc->write; + if (flush_pos > walsndloc->flush) + flush_pos = walsndloc->flush; + + SpinLockRelease(&walsndloc->mutex); + } + } + + /* * Set the lsn first so that when we wake backends they will release up to * this location. */ - if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) + if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < write_pos) { - walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; + walsndctl->lsn[SYNC_REP_WAIT_WRITE] = write_pos; numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); } - if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) + if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flush_pos) { - walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; + walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flush_pos; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } @@ -480,6 +608,14 @@ SyncRepReleaseWaiters(void) (errmsg("standby \"%s\" is now the synchronous standby with priority %u", application_name, MyWalSnd->sync_standby_priority))); } + + /* Clean up */ + if (sync_standbys) + pfree(sync_standbys); + if (write_pos_list) + pfree(write_pos_list); + if (flush_pos_list) + pfree(flush_pos_list); } /* @@ -506,6 +642,10 @@ SyncRepGetStandbyPriority(void) if (am_cascading_walsender) return 0; + /* If no synchronous standby allowed, no cake for this WAL sender */ + if (synchronous_standby_num == 0) + return 0; + /* Need a modifiable copy of string */ rawstring = pstrdup(SyncRepStandbyNames); @@ -529,6 +669,11 @@ SyncRepGetStandbyPriority(void) pg_strcasecmp(standby_name, "*") == 0) { found = true; + + /* All standby's priority is 1, if quorum mothod */ + if (SyncRepMethodIsQuorum()) + priority = 1; + break; } } @@ -643,6 +788,21 @@ SyncRepUpdateSyncStandbysDefined(void) } } +static int +comp_lsn(const void *a, const void *b) +{ + XLogRecPtr *lsn1 = (XLogRecPtr *) a; + XLogRecPtr *lsn2 = (XLogRecPtr *) b; + int res; + + if (*lsn1 <= *lsn2) + res = 1; + else + res = -1; + + return res; +} + #ifdef USE_ASSERT_CHECKING static bool SyncRepQueueIsOrderedByLSN(int mode) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c6043cd..9d4cccd 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2729,9 +2729,13 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - WalSnd *sync_standby; + int *sync_standbys; + XLogRecPtr *write_pos_list; + XLogRecPtr *flush_pos_list; + int num_sync; int i; + /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) ereport(ERROR, @@ -2757,11 +2761,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); + sync_standbys = (int *) palloc(sizeof(int) * synchronous_standby_num); + write_pos_list = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * max_wal_senders); + flush_pos_list = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * max_wal_senders); + /* - * Get the currently active synchronous standby. + * Get the currently active synchronous standbys. */ LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standby = SyncRepGetSynchronousStandby(); + num_sync = SyncRepGetSynchronousStandbys(sync_standbys, write_pos_list, flush_pos_list); LWLockRelease(SyncRepLock); for (i = 0; i < max_wal_senders; i++) @@ -2831,18 +2839,42 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) values[7] = CStringGetTextDatum("async"); - else if (walsnd == sync_standby) - values[7] = CStringGetTextDatum("sync"); else - values[7] = CStringGetTextDatum("potential"); + { + int j; + bool found = false; + + for (j = 0; j < num_sync; j++) + { + /* Found sync standby */ + if (i == sync_standbys[j]) + { + values[7] = CStringGetTextDatum("sync"); + found = true; + break; + } + } + if (!found) + values[7] = CStringGetTextDatum("potential"); + } } tuplestore_putvalues(tupstore, tupdesc, values, nulls); } + /* Clean up */ + if (sync_standbys) + pfree(sync_standbys); + if (write_pos_list) + pfree(write_pos_list); + if (flush_pos_list) + pfree(flush_pos_list); + + /* clean up and return the tuplestore */ tuplestore_donestoring(tupstore); + return (Datum) 0; } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 71090f2..0277190 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -310,6 +310,12 @@ static const struct config_enum_entry xmloption_options[] = { {NULL, 0, false} }; +static const struct config_enum_entry synchronous_replication_method_options[] = { + {"priority", SYNC_REP_METHOD_PRIORITY, false}, + {"quorum", SYNC_REP_METHOD_QUORUM, false}, + {NULL, 0, false} +}; + /* * Although only "on", "off", and "safe_encoding" are documented, we * accept all the likely variants of "on" and "off". @@ -2664,6 +2670,16 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER, + gettext_noop("Number of potential synchronous standbys."), + NULL + }, + &synchronous_standby_num, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL @@ -3653,6 +3669,16 @@ static struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"synchronous_replication_method", PGC_SIGHUP, REPLICATION_MASTER, + gettext_noop("Method for multiple synchronous replication."), + NULL + }, + &synchronous_replication_method, + SYNC_REP_METHOD_PRIORITY, synchronous_replication_method_options, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index dcf929f..e03f38d 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -241,6 +241,8 @@ #synchronous_standby_names = '' # standby servers that provide sync rep # comma-separated list of application_name # from standby(s); '*' = all +#synchronous_standby_num = 0 # number of standby servers using sync rep +#synchronous_replication_method = quorum # quorum, priority #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed # - Standby Servers - diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 71e2857..c71e30d 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -31,8 +31,14 @@ #define SYNC_REP_WAITING 1 #define SYNC_REP_WAIT_COMPLETE 2 +/* SyncRepMethod */ +#define SYNC_REP_METHOD_PRIORITY 0 +#define SYNC_REP_METHOD_QUORUM 1 + /* user-settable parameters for synchronous replication */ extern char *SyncRepStandbyNames; +extern int synchronous_replication_method; +extern int synchronous_standby_num; /* called by user backend */ extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); @@ -49,7 +55,8 @@ extern void SyncRepUpdateSyncStandbysDefined(void); /* forward declaration to avoid pulling in walsender_private.h */ struct WalSnd; -extern struct WalSnd *SyncRepGetSynchronousStandby(void); +extern int SyncRepGetSynchronousStandbys(int *sync_standbys, XLogRecPtr *write_pos_list, + XLogRecPtr *flush_pos_list); extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); extern void assign_synchronous_commit(int newval, void *extra);