Re: [HACKERS] [BUGS] Bug in Physical Replication Slots (at least 9.5)? - Mailing list pgsql-hackers
| From | Kyotaro HORIGUCHI |
|---|---|
| Subject | Re: [HACKERS] [BUGS] Bug in Physical Replication Slots (at least 9.5)? |
| Date | |
| Msg-id | 20170120.110729.107284864.horiguchi.kyotaro@lab.ntt.co.jp Whole thread Raw |
| In response to | Re: [HACKERS] [BUGS] Bug in Physical Replication Slots (at least 9.5)? (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>) |
| Responses |
Re: [HACKERS] [BUGS] Bug in Physical Replication Slots (at least 9.5)?
|
| List | pgsql-hackers |
Hello,
At Thu, 19 Jan 2017 18:37:31 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20170119.183731.223893446.horiguchi.kyotaro@lab.ntt.co.jp>
> > > - Delaying recycling a segment until the last partial record on it
> > > completes. This seems doable in page-wise (coarse resolution)
> > > but would cost additional reading of past xlog files (page
> > > header of past pages is required).
> >
> > Hm, yes. That looks like the least invasive way to go. At least that
> > looks more correct than the others.
>
> The attached patch does that. Usually it reads page headers only
> on segment boundaries, but once a continuation record is found (or
> reading the next page header fails, that is, the first record on
> the first page in the next segment has not been replicated), the
> check happens on every page boundary until a non-continuation
> page comes.
>
> I left debug output (at LOG level) in the attached file; it is shown on
> every state change of the keep pointer. At least for pgbench, the
> cost seems negligible.
I revised it. It became neater and less invasive.
- Removed the added keep field from struct WalSnd. It is never referenced from other processes, so it is a static variable now.
- Restore keepPtr from the replication slot on startup.
- Moved the main part to a more appropriate position.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f3082c3..0270474 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -185,6 +185,12 @@ static volatile sig_atomic_t replication_active = false;static LogicalDecodingContext
*logical_decoding_ctx= NULL;static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
+/*
+ * Segment keep pointer for physical slots. Has a valid value only when it
+ * differs from the current flush pointer.
+ */
+static XLogRecPtr keepPtr = InvalidXLogRecPtr;
+/* Signal handlers */static void WalSndSigHupHandler(SIGNAL_ARGS);static void WalSndXLogSendHandler(SIGNAL_ARGS);
@@ -217,7 +223,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Transtatic void
WalSndWriteData(LogicalDecodingContext*ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);static XLogRecPtr
WalSndWaitForWal(XLogRecPtrloc);
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static bool XLogRead(char *buf, XLogRecPtr startptr, Size count, bool noutfoundok);/* Initialize walsender process
beforeentering the main command loop */
@@ -538,6 +544,9 @@ StartReplication(StartReplicationCmd *cmd) ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errmsg("cannot use a logical replication slot
forphysical replication"))));
+
+ /* Restore keepPtr from replication slot */
+ keepPtr = MyReplicationSlot->data.restart_lsn; } /*
@@ -553,6 +562,10 @@ StartReplication(StartReplicationCmd *cmd) else FlushPtr = GetFlushRecPtr();
+ /* Set InvalidXLogRecPtr if catching up */
+ if (keepPtr == FlushPtr)
+ keepPtr = InvalidXLogRecPtr;
+ if (cmd->timeline != 0) { XLogRecPtr switchpoint;
@@ -774,7 +787,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count =
flushptr- targetPagePtr; /* now actually read the data, we know it's there */
- XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+ XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, false); return count;}
@@ -1551,7 +1564,7 @@ static voidProcessStandbyReplyMessage(void){ XLogRecPtr writePtr,
- flushPtr,
+ flushPtr, oldFlushPtr, applyPtr; bool replyRequested;
@@ -1580,6 +1593,7 @@ ProcessStandbyReplyMessage(void) WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
+ oldFlushPtr = walsnd->flush; walsnd->write = writePtr; walsnd->flush = flushPtr;
walsnd->apply= applyPtr;
@@ -1597,7 +1611,78 @@ ProcessStandbyReplyMessage(void) if (SlotIsLogical(MyReplicationSlot))
LogicalConfirmReceivedLocation(flushPtr); else
- PhysicalConfirmReceivedLocation(flushPtr);
+ {
+ /*
+ * On recovery, a continuation record must be available from a
+ * single WAL source. So a physical replication slot should stay in
+ * the first segment of a continuation record spanning multiple
+ * segments. Since this doesn't look into individual records,
+ * keepPtr may stay a bit too far behind.
+ *
+ * Since the objective is to avoid removing required segments,
+ * checking every segment is enough. But once keepPtr falls behind,
+ * check every page for quick restoration.
+ *
+ * keepPtr has a valid value only when it is behind flushPtr.
+ */
+ if (oldFlushPtr != InvalidXLogRecPtr &&
+ (keepPtr == InvalidXLogRecPtr ?
+ oldFlushPtr / XLOG_SEG_SIZE != flushPtr / XLOG_SEG_SIZE :
+ keepPtr / XLOG_BLCKSZ != flushPtr / XLOG_BLCKSZ))
+ {
+ XLogRecPtr rp;
+ XLogRecPtr oldKeepPtr = keepPtr; /* for debug */
+
+ if (keepPtr == InvalidXLogRecPtr)
+ keepPtr = oldFlushPtr;
+
+ rp = keepPtr - (keepPtr % XLOG_BLCKSZ);
+
+ /*
+ * We may have let the record at flushPtr sent, so it's worth
+ * looking
+ */
+ while (rp <= flushPtr)
+ {
+ XLogPageHeaderData header;
+
+ /*
+ * If the page header is not available for now, don't move
+ * keepPtr forward. We can read it by the next chance.
+ */
+ if(sentPtr - rp >= sizeof(XLogPageHeaderData))
+ {
+ bool found;
+ /*
+ * Fetch the page header of the next page. Move
+ * keepPtr forward only when it is not a
+ * continuation page.
+ */
+ found = XLogRead((char *)&header, rp,
+ sizeof(XLogPageHeaderData), true);
+ if (found &&
+ (header.xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+ keepPtr = rp;
+ }
+ rp += XLOG_BLCKSZ;
+ }
+
+ /*
+ * If keepPtr is on the same page with flushPtr, it means that
+ * we are catching up
+ */
+ if (keepPtr / XLOG_BLCKSZ == flushPtr / XLOG_BLCKSZ)
+ keepPtr = InvalidXLogRecPtr;
+
+ if (oldKeepPtr != keepPtr)
+ elog(LOG, "%lX => %lX / %lX",
+ oldKeepPtr, keepPtr, flushPtr);
+ }
+
+ /* keepPtr == InvalidXLogRecPtr means catching up */
+ PhysicalConfirmReceivedLocation(keepPtr != InvalidXLogRecPtr ?
+ keepPtr : flushPtr);
+ } }}
@@ -2019,6 +2104,7 @@ WalSndKill(int code, Datum arg)/* * Read 'count' bytes from WAL into 'buf', starting at location
'startptr'
+ * Returns false if the segment file is not found when notfoundok is true. * * XXX probably this should be improved to
suckdata directly from the * WAL buffers when possible.
@@ -2028,8 +2114,8 @@ WalSndKill(int code, Datum arg) * always be one descriptor left open until the process ends, but
never* more than one. */
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+static bool
+XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok){ char *p; XLogRecPtr recptr;
@@ -2106,10 +2192,15 @@ retry: * removed or recycled. */ if (errno ==
ENOENT)
+ {
+ if (notfoundok)
+ return false;
+ ereport(ERROR, (errcode_for_file_access(),
errmsg("requestedWAL segment %s has already been removed",
XLogFileNameP(curFileTimeLine,sendSegNo))));
+ } else ereport(ERROR,
(errcode_for_file_access(),
@@ -2189,6 +2280,8 @@ retry: goto retry; } }
+
+ return true;}/*
@@ -2393,7 +2486,7 @@ XLogSendPhysical(void) * calls. */ enlargeStringInfo(&output_message, nbytes);
- XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+ XLogRead(&output_message.data[output_message.len], startptr, nbytes, false); output_message.len += nbytes;
output_message.data[output_message.len]= '\0';
pgsql-hackers by date: