diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ad213fc454..e4ddbf2c44 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -711,14 +711,24 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) for (i = 0; i < max_wal_senders; i++) { + XLogRecPtr flush; + WalSndState state; + int pid; + walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + flush = walsnd->flush; + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + /* Must be active */ - if (walsnd->pid == 0) + if (pid == 0) continue; /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) + if (state != WALSNDSTATE_STREAMING) continue; /* Must be synchronous */ @@ -726,7 +736,7 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) + if (XLogRecPtrIsInvalid(flush)) continue; /* @@ -780,14 +790,24 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) */ for (i = 0; i < max_wal_senders; i++) { + XLogRecPtr flush; + WalSndState state; + int pid; + walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + flush = walsnd->flush; + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + /* Must be active */ - if (walsnd->pid == 0) + if (pid == 0) continue; /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) + if (state != WALSNDSTATE_STREAMING) continue; /* Must be synchronous */ @@ -796,7 +816,7 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) + if (XLogRecPtrIsInvalid(flush)) continue; /* diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 2723612718..25f12e0706 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1392,23 +1392,13 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) TimestampTz latest_end_time; char *slotname; char *conninfo; - - /* - * No WAL receiver (or not ready yet), just return a tuple with NULL - * values - */ - if (walrcv->pid == 0 || !walrcv->ready_to_display) - PG_RETURN_NULL(); - - /* determine result type */ - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); - - values = palloc0(sizeof(Datum) * tupdesc->natts); - nulls = palloc0(sizeof(bool) * tupdesc->natts); + int pid; + bool ready_to_display; /* Take a lock to ensure value consistency */ SpinLockAcquire(&walrcv->mutex); + pid = walrcv->pid; + ready_to_display = walrcv->ready_to_display; state = walrcv->walRcvState; receive_start_lsn = walrcv->receiveStart; receive_start_tli = walrcv->receiveStartTLI; @@ -1422,8 +1412,22 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) conninfo = pstrdup(walrcv->conninfo); SpinLockRelease(&walrcv->mutex); + /* + * No WAL receiver (or not ready yet), just return a tuple with NULL + * values + */ + if (pid == 0 || !ready_to_display) + PG_RETURN_NULL(); + + /* determine result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + values = palloc0(sizeof(Datum) * tupdesc->natts); + nulls = palloc0(sizeof(bool) * tupdesc->natts); + /* Fetch values */ - values[0] = Int32GetDatum(walrcv->pid); + values[0] = Int32GetDatum(pid); if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS)) { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 976a42f86d..13d55826f9 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2877,10 +2877,12 @@ WalSndRqstFileReload(void) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); continue; - - SpinLockAcquire(&walsnd->mutex); + } walsnd->needreload = true; SpinLockRelease(&walsnd->mutex); } @@ -3195,14 +3197,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) TimeOffset flushLag; TimeOffset applyLag; int priority; + int pid; WalSndState state; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); continue; - - SpinLockAcquire(&walsnd->mutex); + } + pid = walsnd->pid; sentPtr = walsnd->sentPtr; state = walsnd->state; write = walsnd->write; @@ -3215,7 +3221,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); - values[0] = Int32GetDatum(walsnd->pid); + values[0] = Int32GetDatum(pid); if (!superuser()) { diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 31d090c99d..44199670b1 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -114,6 +114,9 @@ typedef struct */ char slotname[NAMEDATALEN]; + /* set true once conninfo is ready to display (obfuscated pwds etc) */ + bool ready_to_display; + slock_t mutex; /* locks shared variables shown above */ /* @@ -122,9 +125,6 @@ typedef struct */ bool force_reply; - /* set true once conninfo is ready to display (obfuscated pwds etc) */ - bool ready_to_display; - /* * Latch used by startup process to wake up walreceiver after telling it * where to start streaming (after setting receiveStart and