diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index cca28f4..6682b71 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -868,7 +868,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, static void LocalSetXLogInsertAllowed(void); static void CreateEndOfRecoveryRecord(void); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); -static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr, uint64 *restBytes); +static XLogSegNo GetOldestKeepSegment(XLogSegNo *slotSegNo); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); @@ -9529,18 +9529,21 @@ GetOldestXLogFileSegNo(void) * 1 means that WAL record at tagetLSN is availble. * 2 means that WAL record at tagetLSN is availble but about to be removed by * the next checkpoint. + * + * Also return restSegs_p which means how far the targetLSN away from the critical + * point where it lost WAL segment file. If the targetLSN already lost WALs we + * set it to 0. */ int -IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes) +IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restSegs_p) { XLogRecPtr currpos; - XLogRecPtr slotPtr; XLogSegNo targetSeg; XLogSegNo tailSeg; XLogSegNo oldestSeg; Assert(!XLogRecPtrIsInvalid(targetLSN)); - Assert(restBytes); + Assert(restSegs_p); currpos = GetXLogWriteRecPtr(); @@ -9566,9 +9569,10 @@ IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes) oldestSeg++; XLByteToSeg(targetLSN, targetSeg, wal_segment_size); + tailSeg = GetOldestKeepSegment(NULL); - slotPtr = XLogGetReplicationSlotMinimumLSN(); - tailSeg = GetOldestKeepSegment(currpos, slotPtr, restBytes); + /* Calculate how far this slot away from the tailSeg */ + *restSegs_p = (tailSeg > targetSeg) ? 0 : targetSeg - tailSeg; /* targetSeg is being reserved by slots */ if (tailSeg <= targetSeg) @@ -9586,71 +9590,58 @@ IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes) * Returns minimum segment number the next checktpoint must leave considering * wal_keep_segments, replication slots and max_slot_wal_keep_size. * - * If resetBytes is not NULL, returns remaining LSN bytes to advance until any - * slot loses reserving a WAL record. + * If slotSegNo_p is given, returns the XLogSegNo of the minimum LSN of + * replication slots. */ static XLogSegNo -GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN, uint64 *restBytes) +GetOldestKeepSegment(XLogSegNo *slotSegNo_p) { + XLogRecPtr currptr; + XLogSegNo currSegNo; + XLogSegNo keepSegNo; uint64 keepSegs = 0; - XLogSegNo currSeg; - XLogSegNo slotSeg; - XLByteToSeg(currLSN, currSeg, wal_segment_size); - XLByteToSeg(minSlotLSN, slotSeg, wal_segment_size); - - /* - * Calculate keep segments by slots first. The second term of the - * condition is just a sanity check. - */ - if (minSlotLSN != InvalidXLogRecPtr && slotSeg <= currSeg) - keepSegs = currSeg - slotSeg; - - if (restBytes) - *restBytes = 0; + currptr = GetXLogWriteRecPtr(); + XLByteToSeg(currptr, currSegNo, wal_segment_size); + keepSegNo = currSegNo; /* * Calculate number of segments to keep ignoring segment fragment. If - * requested, return remaining LSN bytes to advance until the slot gives + * requested, return remaining XLogSegNo to advance until the slot gives * up to reserve WAL records. */ - if (max_slot_wal_keep_size_mb > 0) + if (max_replication_slots > 0) { - uint64 limitSegs; + XLogRecPtr slotminptr; + XLogSegNo slotSegNo; - limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size); + slotminptr = XLogGetReplicationSlotMinimumLSN(); + XLByteToSeg(slotminptr, slotSegNo, wal_segment_size); - if (limitSegs < keepSegs) - { - /* This slot gave up to retain reserved WAL records. */ - keepSegs = limitSegs; - } - else if (restBytes) - { - /* calculate return rest bytes until this slot loses WAL */ - uint64 fragbytes; + Assert(slotSegNo <= currSegNo); + keepSegs = currSegNo - slotSegNo; - /* If wal_keep_segments may be larger than slot limit. However - * it's a rather useless configuration, we should consider the - * case anyway. - */ - if (limitSegs < wal_keep_segments) - limitSegs = wal_keep_segments; + if (slotSegNo_p) + *slotSegNo_p = slotSegNo; + + if (max_slot_wal_keep_size_mb > 0) + { + XLogSegNo slotlimitSegs; - fragbytes = wal_segment_size - (currLSN % wal_segment_size); - *restBytes = (limitSegs - keepSegs) * wal_segment_size + fragbytes; + slotlimitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size); + keepSegs = slotlimitSegs; } } /* but, keep at least wal_keep_segments segments if any */ - if (wal_keep_segments > 0 && keepSegs < wal_keep_segments) - keepSegs = wal_keep_segments; + if (wal_keep_segments > 0 && wal_keep_segments > keepSegs) + keepSegNo = wal_keep_segments; /* avoid underflow, don't go below 1 */ - if (currSeg <= keepSegs) + if (currSegNo <= keepSegs) return 1; - return currSeg - keepSegs; + return currSegNo - keepSegs; } /* @@ -9665,43 +9656,33 @@ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) { XLogSegNo segno; - XLogRecPtr slotminptr = InvalidXLogRecPtr; XLogSegNo minSegNo; XLogSegNo slotSegNo; + static XLogSegNo prev_lost_segs = 0; /* avoid duplicate messages */ XLByteToSeg(recptr, segno, wal_segment_size); - if (max_replication_slots > 0) - slotminptr = XLogGetReplicationSlotMinimumLSN(); - /* * We should keep certain number of WAL segments after this checktpoint. */ - minSegNo = GetOldestKeepSegment(recptr, slotminptr, NULL); + minSegNo = GetOldestKeepSegment(&slotSegNo); /* * warn if the checkpoint flushes the segments required by replication * slots. */ - if (!XLogRecPtrIsInvalid(slotminptr)) + if (slotSegNo < minSegNo) { - static XLogSegNo prev_lost_segs = 0; /* avoid duplicate messages */ - - XLByteToSeg(slotminptr, slotSegNo, wal_segment_size); - - if (slotSegNo < minSegNo) - { - XLogSegNo lost_segs = minSegNo - slotSegNo; - if (prev_lost_segs != lost_segs) - ereport(WARNING, - (errmsg ("some replication slots have lost required WAL segments"), - errdetail("The mostly affected slot has lost %ld segments.", - lost_segs))); - prev_lost_segs = lost_segs; - } - else - prev_lost_segs = 0; + XLogSegNo lost_segs = minSegNo - slotSegNo; + if (prev_lost_segs != lost_segs) + ereport(WARNING, + (errmsg ("some replication slots have lost required WAL segments"), + errdetail("The mostly affected slot has lost %ld segments.", + lost_segs))); + prev_lost_segs = lost_segs; } + else + prev_lost_segs = 0; if (minSegNo < segno) segno = minSegNo; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index d28896d..cf9e795 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -798,8 +798,8 @@ CREATE VIEW pg_replication_slots AS L.catalog_xmin, L.restart_lsn, L.confirmed_flush_lsn, - L.wal_status, - L.remain + L.wal_status, + L.remain FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 0b410ac..1d133a5 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -314,10 +314,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) } else { - uint64 remaining_bytes; + uint64 remainingSegs; char *status; - switch (IsLsnStillAvaiable(restart_lsn, &remaining_bytes)) + switch (IsLsnStillAvaiable(restart_lsn, &remainingSegs)) { case 0: status = "lost"; @@ -334,7 +334,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) } values[i++] = CStringGetTextDatum(status); - values[i++] = LSNGetDatum(remaining_bytes); + values[i++] = UInt64GetDatum(remainingSegs); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index ad9d1de..a0f18af 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -269,7 +269,7 @@ extern void ShutdownXLOG(int code, Datum arg); extern void InitXLOGAccess(void); extern void CreateCheckPoint(int flags); extern bool CreateRestartPoint(int flags); -extern int IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes); +extern int IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes_p); extern void XLogPutNextOid(Oid nextOid); extern XLogRecPtr XLogRestorePoint(const char *rpName); extern void UpdateFullPageWrites(void); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 18acf1f..4a096c9 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -9796,7 +9796,7 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,pg_lsn}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}', proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}', proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}', prosrc => 'pg_get_replication_slots' },