[PATCH 06/16] Add support for a generic wal reading facility dubbed XLogReader - Mailing list pgsql-hackers

From Andres Freund
Subject [PATCH 06/16] Add support for a generic wal reading facility dubbed XLogReader
Date
Msg-id 1339586927-13156-6-git-send-email-andres@2ndquadrant.com
Whole thread Raw
In response to [RFC][PATCH] Logical Replication/BDR prototype and architecture  (Andres Freund <andres@2ndquadrant.com>)
Responses Re: [PATCH 06/16] Add support for a generic wal reading facility dubbed XLogReader
List pgsql-hackers
From: Andres Freund <andres@anarazel.de>

Features:
- streaming reading/writing
- filtering
- reassembly of records

Reusing the ReadRecord infrastructure in situations where the code that wants
to do so is not tightly integrated into xlog.c is rather hard and would require
changes to rather integral parts of the recovery code which doesn't seem to be
a good idea.

Missing:
- "compressing" the stream when removing uninteresting records
- writing out correct CRCs
- validating CRCs
- separating reader/writer
---
 src/backend/access/transam/Makefile     |    2 +-
 src/backend/access/transam/xlogreader.c |  914 +++++++++++++++++++++++++++++++
 src/include/access/xlogreader.h        |  173 ++++++
 3 files changed, 1088 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/access/transam/xlogreader.c
 create mode 100644 src/include/access/xlogreader.h

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index f82f10e..660b5fc 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
-    twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogutils.o
+    twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogreader.o xlogutils.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
new file mode 100644
index 0000000..6f15d66
--- /dev/null
+++ b/src/backend/access/transam/xlogreader.c
@@ -0,0 +1,914 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.c
+ *
+ * A somewhat generic xlog read interface
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *      src/backend/access/transam/xlogreader.c
+ *
+ *-------------------------------------------------------------------------
+ *
+ * FIXME:
+ * * CRC computation
+ * * separation of reader/writer
+ */
+
+#include "postgres.h"
+
+#include "access/xlog_internal.h"
+#include "access/transam.h"
+#include "catalog/pg_control.h"
+#include "access/xlogreader.h"
+
+/* FIXME */
+#include "replication/walsender_private.h"
+#include "replication/walprotocol.h"
+
+//#define VERBOSE_DEBUG
+
+/*
+ * XLogReaderAllocate
+ *
+ * Allocate a new XLogReaderState, its record data buffer (initially
+ * XLOG_BLCKSZ*8 bytes) and one BLCKSZ sized buffer for each of the
+ * XLR_MAX_BKP_BLOCKS backup block slots. The state is run through
+ * XLogReaderReset() before it is returned.
+ *
+ * Everything is allocated with plain malloc(). If any allocation fails,
+ * whatever was allocated so far is freed again and elog(ERROR) is raised.
+ */
+XLogReaderState* XLogReaderAllocate(void)
+{
+    XLogReaderState* state = (XLogReaderState*)malloc(sizeof(XLogReaderState));
+    int i;
+
+    if (!state)
+        goto oom;
+
+    memset(&state->buf.record, 0, sizeof(XLogRecord));
+
+    /*
+     * Clear all buffer pointers before allocating any of them, so the oom
+     * path below can free() every slot without looking at garbage.
+     */
+    state->buf.record_data = NULL;
+    for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+        state->buf.bkp_block_data[i] = NULL;
+
+    state->buf.record_data_size = XLOG_BLCKSZ*8;
+    state->buf.record_data =
+            malloc(state->buf.record_data_size);
+
+    if (!state->buf.record_data)
+        goto oom;
+
+    for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+    {
+        state->buf.bkp_block_data[i] =
+            malloc(BLCKSZ);
+
+        if (!state->buf.bkp_block_data[i])
+            goto oom;
+    }
+    XLogReaderReset(state);
+    return state;
+
+oom:
+    /* release partial allocations; free(NULL) is a no-op */
+    if (state)
+    {
+        for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+            free(state->buf.bkp_block_data[i]);
+        free(state->buf.record_data);
+        free(state);
+    }
+    elog(ERROR, "could not allocate memory for XLogReaderState");
+    /* NOTE(review): presumably not reached after elog(ERROR) - confirm */
+    return NULL;
+}
+
+/*
+ * XLogReaderReset
+ *
+ * Clear the parsing flags and counters of a reader. Only the fields
+ * assigned below are changed; positions, callbacks and the allocated
+ * buffers are left as they are.
+ */
+void XLogReaderReset(XLogReaderState* state)
+{
+    /* XLogReaderRead() redoes its initial setup while this is false */
+    state->initialized = false;
+    state->incomplete = false;
+
+    /* not inside a record, a skip, or any backup block */
+    state->in_record = false;
+    state->in_skip = false;
+    state->in_bkp_blocks = 0;
+    state->in_bkp_block_header = false;
+
+    /* nothing pending and nothing accounted for yet */
+    state->remaining_size = 0;
+    state->nbytes = 0;
+
+    /* no outstanding request for more input or more output room */
+    state->needs_input = false;
+    state->needs_output = false;
+}
+
+/*
+ * Check whether `size` more bytes can be consumed starting at state->curptr.
+ *
+ * Returns true only if curptr advanced by `size` is still strictly before
+ * state->endptr, false otherwise.
+ *
+ * NOTE(review): curptr + size == endptr is reported as "no input"; confirm
+ * that the strict comparison is intended.
+ */
+static inline bool
+XLogReaderHasInput(XLogReaderState* state, Size size)
+{
+    XLogRecPtr probe = state->curptr;
+
+    XLByteAdvance(probe, size);
+
+    return !XLByteLE(state->endptr, probe);
+}
+
+/*
+ * Check whether `size` more bytes may be written out: true as long as
+ * state->nbytes plus `size` does not exceed MAX_SEND_SIZE.
+ */
+static inline bool
+XLogReaderHasOutput(XLogReaderState* state, Size size)
+{
+    return state->nbytes + size <= MAX_SEND_SIZE;
+}
+
+/*
+ * Check both constraints for `size` bytes at once: there must be enough
+ * input before state->endptr and enough room below MAX_SEND_SIZE. The input
+ * side is tested first.
+ */
+static inline bool
+XLogReaderHasSpace(XLogReaderState* state, Size size)
+{
+    return XLogReaderHasInput(state, size)
+        && XLogReaderHasOutput(state, size);
+}
+
+void
+XLogReaderRead(XLogReaderState* state)
+{
+    XLogRecord* temp_record;
+
+    state->needs_input = false;
+    state->needs_output = false;
+
+    /*
+     * Do some basic sanity checking and setup if were starting anew.
+     */
+    if (!state->initialized)
+    {
+        state->initialized = true;
+        /*
+         * we need to start reading at the beginning of the page to understand
+         * what we are currently reading. We will skip over that because we
+         * check curptr < startptr later.
+         */
+        state->curptr.xrecoff = state->curptr.xrecoff - state->curptr.xrecoff % XLOG_BLCKSZ;
+        Assert(state->curptr.xrecoff % XLOG_BLCKSZ == 0);
+        elog(LOG, "start reading from %X/%X, scrolled back to %X/%X",
+             state->startptr.xlogid, state->startptr.xrecoff,
+             state->curptr.xlogid, state->curptr.xrecoff);
+
+    }
+    else
+    {
+        /*
+         * We didn't finish reading the last time round. Since then new data
+         * could have been appended to the current page. So we need to update
+         * our copy of that.
+         *
+         * XXX: We could tie that to state->needs_input but that doesn't seem
+         * worth the complication atm.
+         */
+        XLogRecPtr rereadptr = state->curptr;
+        rereadptr.xrecoff -= rereadptr.xrecoff % XLOG_BLCKSZ;
+
+        XLByteAdvance(rereadptr, SizeOfXLogShortPHD);
+
+        if(!XLByteLE(rereadptr, state->endptr))
+            goto not_enough_input;
+
+        rereadptr.xrecoff -= rereadptr.xrecoff % XLOG_BLCKSZ;
+
+        state->read_page(state, state->cur_page, rereadptr);
+
+        state->page_header = (XLogPageHeader)state->cur_page;
+        state->page_header_size = XLogPageHeaderSize(state->page_header);
+
+    }
+
+#ifdef VERBOSE_DEBUG
+    elog(LOG, "starting reading for %X from %X",
+         state->startptr.xrecoff, state->curptr.xrecoff);
+#endif
+    while (XLByteLT(state->curptr, state->endptr))
+    {
+        uint32 len_in_block;
+        /* did we read a partial xlog record due to input/output constraints */
+        bool partial_read = false;
+        bool partial_write = false;
+
+#ifdef VERBOSE_DEBUG
+        elog(LOG, "one loop start: record: %u skip: %u bkb_block: %d in_bkp_header: %u xrecoff: %X/%X remaining: %u,
off:%u",
 
+             state->in_record, state->in_skip,
+             state->in_bkp_blocks, state->in_bkp_block_header,
+             state->curptr.xlogid, state->curptr.xrecoff,
+             state->remaining_size,
+             state->curptr.xrecoff % XLOG_BLCKSZ);
+#endif
+
+        if (state->curptr.xrecoff % XLOG_BLCKSZ == 0)
+        {
+#ifdef VERBOSE_DEBUG
+            elog(LOG, "reading page header, at %X/%X",
+                 state->curptr.xlogid, state->curptr.xrecoff);
+#endif
+            /* check whether we can read enough to see the short header */
+            if (!XLogReaderHasInput(state, SizeOfXLogShortPHD))
+                goto not_enough_input;
+
+            state->read_page(state, state->cur_page, state->curptr);
+            state->page_header = (XLogPageHeader)state->cur_page;
+            state->page_header_size = XLogPageHeaderSize(state->page_header);
+
+            /* check wether we have enough to read/write the full header */
+            if (!XLogReaderHasInput(state, state->page_header_size))
+                goto not_enough_input;
+
+            /* writeout page header only if were somewhere interesting */
+            if (!XLByteLT(state->curptr, state->startptr))
+            {
+                if (!XLogReaderHasOutput(state, state->page_header_size))
+                    goto not_enough_output;
+
+                state->writeout_data(state, state->cur_page, state->page_header_size);
+            }
+
+            XLByteAdvance(state->curptr, state->page_header_size);
+
+            if (XLByteLT(state->curptr, state->startptr))
+            {
+                /* don't intepret anything if were before startpoint */
+            }
+            else if (state->page_header->xlp_info & XLP_FIRST_IS_CONTRECORD)
+            {
+                XLogContRecord* temp_contrecord;
+
+                if(!XLogReaderHasInput(state, SizeOfXLogContRecord))
+                    goto not_enough_input;
+
+                if(!XLogReaderHasOutput(state, SizeOfXLogContRecord))
+                    goto not_enough_output;
+
+                temp_contrecord =
+                    (XLogContRecord*)(state->cur_page
+                                      + state->curptr.xrecoff % XLOG_BLCKSZ);
+
+
+                state->writeout_data(state, (char*)temp_contrecord, SizeOfXLogContRecord);
+
+                XLByteAdvance(state->curptr, SizeOfXLogContRecord);
+
+                if (!state->in_record)
+                {
+                    /* we need to support this case for initializing a cluster... */
+                    elog(WARNING, "contrecord although were not in a record at %X/%X, starting at %X/%X",
+                         state->curptr.xlogid, state->curptr.xrecoff,
+                         state->startptr.xlogid, state->startptr.xrecoff);
+                    state->in_record = true;
+                    state->in_skip = true;
+                    state->remaining_size = temp_contrecord->xl_rem_len;
+                    continue;
+                }
+
+
+                if(temp_contrecord->xl_rem_len < state->remaining_size)
+                    elog(PANIC, "remaining length is smaller than to be read data: %u %u",
+                         temp_contrecord->xl_rem_len, state->remaining_size
+                        );
+
+            }
+            else
+            {
+                if (state->in_record)
+                {
+                    elog(PANIC, "no contrecord although were in a record");
+                }
+            }
+        }
+
+        if (!state->in_record)
+        {
+            /*
+             * a record must be stored aligned. So skip as far we need to
+             * comply with that.
+             */
+            Size skiplen;
+            skiplen = MAXALIGN(state->curptr.xrecoff)
+                - state->curptr.xrecoff;
+
+            if (skiplen)
+            {
+                if (!XLogReaderHasSpace(state, skiplen))
+                {
+#ifdef VERBOSE_DEBUG
+                    elog(LOG, "not aligning bc of space");
+#endif
+                    /*
+                     * We don't have enough space to read/write the alignment
+                     * bytes, so fake up a skip-state
+                     */
+                    state->in_record = true;
+                    state->in_skip = true;
+                    state->remaining_size = skiplen;
+
+                    if (!XLogReaderHasInput(state, skiplen))
+                        goto not_enough_input;
+                    goto not_enough_output;
+                }
+#ifdef VERBOSE_DEBUG
+                elog(LOG, "aligning from %X/%X to %X/%X",
+                     state->curptr.xlogid, state->curptr.xrecoff,
+                     state->curptr.xlogid, state->curptr.xrecoff + (uint32)skiplen);
+#endif
+                if (!XLByteLT(state->curptr, state->startptr))
+                    state->writeout_data(state, NULL, skiplen);
+                XLByteAdvance(state->curptr, skiplen);
+            }
+        }
+
+        /* skip until we reach the part of the page were interested in */
+        if (XLByteLT(state->curptr, state->startptr))
+        {
+
+            if (state->in_skip)
+            {
+                /* the code already handles that, we expect a contrecord */
+            }
+            else if ((state->curptr.xrecoff % XLOG_BLCKSZ) == state->page_header_size &&
+                     state->page_header->xlp_info & XLP_FIRST_IS_CONTRECORD)
+            {
+
+                XLogContRecord* temp_contrecord = (XLogContRecord*)
+                    (state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ);
+
+                /*
+                 * we know we have enough space here because we didn't start
+                 * writing out data yet because were < startptr
+                 */
+                Assert(XLogReaderHasSpace(state, SizeOfXLogContRecord));
+
+                XLByteAdvance(state->curptr, SizeOfXLogContRecord);
+
+#ifdef VERBOSE_DEBUG
+                elog(LOG, "skipping contrecord before start");
+#endif
+                state->in_skip = true;
+                state->in_record = true;
+                state->in_bkp_blocks = 0;
+                state->remaining_size = temp_contrecord->xl_rem_len;
+            }
+            else
+            {
+                Assert(!state->in_record);
+
+                /* read how much space we have left on the current page */
+                if(state->curptr.xrecoff % XLOG_BLCKSZ == 0)
+                    len_in_block = 0;
+                else
+                    len_in_block = XLOG_BLCKSZ - state->curptr.xrecoff % XLOG_BLCKSZ;
+
+                if(len_in_block < SizeOfXLogRecord)
+                {
+                    XLByteAdvance(state->curptr, len_in_block);
+                    continue;
+                }
+
+                /*
+                 * now read the record information and start skipping till the
+                 * record is over
+                 */
+                temp_record = (XLogRecord*)(state->cur_page + (state->curptr.xrecoff % XLOG_BLCKSZ));
+
+#ifdef VERBOSE_DEBUG
+                elog(LOG, "skipping record before start %lu, tot %u at %X/%X off %d ",
+                     temp_record->xl_tot_len - SizeOfXLogRecord,
+                     temp_record->xl_tot_len,
+                     state->curptr.xlogid, state->curptr.xrecoff,
+                     state->curptr.xrecoff % XLOG_BLCKSZ);
+#endif
+
+                Assert(XLogReaderHasSpace(state, SizeOfXLogRecord));
+
+                XLByteAdvance(state->curptr, SizeOfXLogRecord);
+
+                state->in_skip = true;
+                state->in_record = true;
+                state->in_bkp_blocks = 0;
+                state->remaining_size = temp_record->xl_tot_len
+                    - SizeOfXLogRecord;
+            }
+        }
+
+        /*
+         * ----------------------------------------
+         * start to read a record
+         *
+         * This will only happen if were already behind state->startptr
+         * ----------------------------------------
+         */
+        if (!state->in_record)
+        {
+            /*
+             * if were at the beginning of a page (after the page header) it
+             * could be that were starting in a continuation of an earlier
+             * record. Its debatable wether thats a valid use-case. Support it
+             * for now but cry loudly.
+             */
+            if ((state->curptr.xrecoff % XLOG_BLCKSZ) == state->page_header_size &&
+               state->page_header->xlp_info & XLP_FIRST_IS_CONTRECORD)
+            {
+                XLogContRecord* temp_contrecord = (XLogContRecord*)
+                    (state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ);
+
+                if (!XLogReaderHasInput(state, SizeOfXLogContRecord))
+                    goto not_enough_input;
+
+                if (!XLogReaderHasOutput(state, SizeOfXLogContRecord))
+                    goto not_enough_output;
+
+                state->writeout_data(state,
+                                     (char*)temp_contrecord,
+                                     SizeOfXLogContRecord);
+                XLByteAdvance(state->curptr, SizeOfXLogContRecord);
+
+                elog(PANIC, "hum, ho, first is contrecord, but trying to read the record afterwards %X/%X",
+                     state->curptr.xlogid, state->curptr.xrecoff);
+
+                state->in_skip = true;
+                state->in_record = true;
+                state->in_bkp_blocks = 0;
+                state->remaining_size = temp_contrecord->xl_rem_len;
+                continue;
+            }
+
+            /* read how much space we have left on the current page */
+            if (state->curptr.xrecoff % XLOG_BLCKSZ == 0)
+                len_in_block = 0;
+            else
+                len_in_block = XLOG_BLCKSZ - state->curptr.xrecoff % XLOG_BLCKSZ;
+
+            /* if there is not enough space for the xlog header, skip to next page */
+            if (len_in_block < SizeOfXLogRecord)
+            {
+
+                if (!XLogReaderHasOutput(state, len_in_block))
+                    goto not_enough_input;
+
+                if (!XLogReaderHasOutput(state, len_in_block))
+                    goto not_enough_output;
+
+                state->writeout_data(state,
+                                     NULL,
+                                     len_in_block);
+
+                XLByteAdvance(state->curptr, len_in_block);
+                continue;
+            }
+
+            temp_record = (XLogRecord*)(state->cur_page + (state->curptr.xrecoff % XLOG_BLCKSZ));
+
+            /*
+             * we quickly loose the original address of a record as we can skip
+             * records and such, so keep the original addresses.
+             */
+            state->buf.origptr = state->curptr;
+
+            /* we writeout data as soon as we know whether were writing out something sensible */
+            XLByteAdvance(state->curptr, SizeOfXLogRecord);
+
+            /* ----------------------------------------
+             * normally we don't look at the content of xlog records here,
+             * XLOG_SWITCH is a special case though, as everything left in that
+             * segment won't be sensbible content.
+             * So skip to the next segment. For that we currently simply leave
+             * the loop as we don't have any mechanism to communicate that
+             * behaviour otherwise.
+             * ----------------------------------------
+             */
+            if (temp_record->xl_rmid == RM_XLOG_ID
+                && (temp_record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
+            {
+
+                /*
+                 * writeout data so that this gap makes sense in the written
+                 * out data
+                 */
+                state->writeout_data(state,
+                                     (char*)temp_record,
+                                     SizeOfXLogRecord);
+
+                /*
+                 * Pretend the current data extends to end of segment
+                 *
+                 * FIXME: This logic is copied from xlog.c but seems to
+                 *    disregard xrecoff wrapping around to the next xlogid?
+                 */
+                state->curptr.xrecoff += XLogSegSize - 1;
+                state->curptr.xrecoff -= state->curptr.xrecoff % XLogSegSize;
+
+                state->in_record = false;
+                state->in_bkp_blocks = 0;
+                state->in_skip = false;
+                goto out;
+            }
+            /* ----------------------------------------
+             * Ok, we found interesting data. That means we need to do the full
+             * deal, reading the record, reading the BKP blocks afterward and
+             * then hand of the record to be processed.
+             * ----------------------------------------
+             */
+            else if (state->is_record_interesting(state, temp_record))
+            {
+                /*
+                 * the rest of the record might be on another page so we need a
+                 * copy instead just pointing into the current page.
+                 */
+                memcpy(&state->buf.record,
+                       temp_record,
+                       sizeof(XLogRecord));/* really needs sizeof(XLogRecord) */
+
+                state->writeout_data(state,
+                                     (char*)temp_record,
+                                     SizeOfXLogRecord);
+                /*
+                 * read till the record itself finished, after that we will
+                 * continue with the bkp blocks et al
+                 */
+                state->remaining_size = temp_record->xl_len;
+
+                state->in_record = true;
+                state->in_bkp_blocks = 0;
+                state->in_skip = false;
+
+#ifdef VERBOSE_DEBUG
+                elog(LOG, "found record at %X/%X, tx %u, rmid %hhu, len %u tot %u",
+                     state->buf.origptr.xlogid, state->buf.origptr.xrecoff,
+                     temp_record->xl_xid, temp_record->xl_rmid, temp_record->xl_len,
+                     temp_record->xl_tot_len);
+#endif
+            }
+            /* ----------------------------------------
+             * ok, everybody aggrees, the content of the current record are
+             * just plain boring. So fake-up a record that replaces it by a
+             * NOOP record.
+             *
+             * FIXME: we should allow "compressing" the output here. That is
+             * write something that shows how long the record should be if
+             * everything is decompressed again. This can radically reduce
+             * space-usage over the wire.
+             * It could also be very useful for traditional SR by removing
+             * unneded BKP blocks from being transferred.
+             * For that we would need to recompute CRCs though, which we
+             * currently don't support.
+             * ----------------------------------------
+             */
+            else
+            {
+                /*
+                 * we need to fix up a fake record with correct length that can
+                 * be written out.
+                 */
+                /* needs space for padding to SizeOfXLogRecord */
+                XLogRecord spacer;
+
+                /*
+                 * xl_tot_len contains the size of the XLogRecord itself, we
+                 * read that already though.
+                 */
+                state->remaining_size = temp_record->xl_tot_len
+                    - SizeOfXLogRecord;
+
+                state->in_record = true;
+                state->in_bkp_blocks = 0;
+                state->in_skip = true;
+
+                /* FIXME: fixup the xl_prev of the next record */
+                spacer.xl_prev = state->buf.origptr;
+                spacer.xl_xid = InvalidTransactionId;
+                spacer.xl_tot_len = temp_record->xl_tot_len;
+                spacer.xl_len = temp_record->xl_tot_len - SizeOfXLogRecord;
+                spacer.xl_rmid = RM_XLOG_ID;
+                spacer.xl_info = XLOG_NOOP;
+
+                state->writeout_data(state,
+                                     (char*)&spacer,
+                                     SizeOfXLogRecord);
+            }
+        }
+        /*
+         * We read an interesting page and now want the BKP
+         * blocks. Unfortunately a bkp header is stored unaligned and can be
+         * split across pages. So we copy it to a bit more permanent location.
+         */
+        else if (state->in_bkp_blocks > 0
+                && state->remaining_size == 0)
+        {
+            Assert(!state->in_bkp_block_header);
+            Assert(state->buf.record.xl_info &
+                   XLR_SET_BKP_BLOCK(XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks));
+
+            state->in_bkp_block_header = true;
+            state->remaining_size = sizeof(BkpBlock);
+            /* in_bkp_blocks will be changed uppon completion */
+            state->in_skip = false;
+        }
+
+        Assert(state->in_record);
+
+        /* compute how much space on the current page is left */
+        if (state->curptr.xrecoff % XLOG_BLCKSZ == 0)
+            len_in_block = 0;
+        else
+            len_in_block = XLOG_BLCKSZ - state->curptr.xrecoff % XLOG_BLCKSZ;
+
+        /* we have more data available than we need, so read only as much as needed */
+        if(len_in_block > state->remaining_size)
+            len_in_block = state->remaining_size;
+
+        /*
+         * Handle constraints set by endptr and the size of the output buffer.
+         *
+         * Normally we use XLogReaderHasSpace for that, but thats not
+         * convenient because we want to read data in parts. So, open-code the
+         * logic for that here.
+         */
+        if (state->curptr.xlogid == state->endptr.xlogid &&
+           state->curptr.xrecoff + len_in_block > state->endptr.xrecoff)
+        {
+            Size cur_len = len_in_block;
+            len_in_block = state->endptr.xrecoff - state->curptr.xrecoff;
+            partial_read = true;
+            elog(LOG, "truncating len_in_block due to endptr %X/%X %lu to %i at %X/%X",
+                 state->startptr.xlogid, state->startptr.xrecoff,
+                 cur_len, len_in_block,
+                 state->curptr.xlogid, state->curptr.xrecoff);
+        }
+        else if (len_in_block > (MAX_SEND_SIZE - state->nbytes))
+        {
+            Size cur_len = len_in_block;
+            len_in_block = MAX_SEND_SIZE - state->nbytes;
+            partial_write = true;
+            elog(LOG, "truncating len_in_block due to nbytes %lu to %i",
+                 cur_len, len_in_block);
+        }
+
+        /* ----------------------------------------
+         * copy data to whatever were currently reading.
+         * ----------------------------------------
+         */
+
+        /* nothing to do if were skipping */
+        if (state->in_skip)
+        {
+            /* writeout zero data */
+            if (!XLByteLT(state->curptr, state->startptr))
+                state->writeout_data(state, NULL, len_in_block);
+        }
+        /* copy data into the current bkp block */
+        else if (state->in_bkp_block_header)
+        {
+            int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+            BkpBlock* bkpb = &state->buf.bkp_block[blockno];
+            Assert(state->in_bkp_blocks);
+
+            memcpy((char*)bkpb + sizeof(BkpBlock) - state->remaining_size,
+                   state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ,
+                   len_in_block);
+
+            state->writeout_data(state,
+                                 state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ,
+                                 len_in_block);
+
+#ifdef VERBOSE_DEBUG
+            elog(LOG, "copying bkp header for %d of %u complete %lu at %X/%X rem %u",
+                 blockno, len_in_block, sizeof(BkpBlock),
+                 state->curptr.xlogid, state->curptr.xrecoff,
+                 state->remaining_size);
+            if (state->remaining_size == len_in_block)
+            {
+                elog(LOG, "block off %u len %u", bkpb->hole_offset, bkpb->hole_length);
+            }
+#endif
+        }
+        else if (state->in_bkp_blocks)
+        {
+            int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+            BkpBlock* bkpb = &state->buf.bkp_block[blockno];
+            char* data = state->buf.bkp_block_data[blockno];
+
+            memcpy(data + BLCKSZ - bkpb->hole_length - state->remaining_size,
+                   state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ,
+                   len_in_block);
+
+            state->writeout_data(state,
+                                 state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ,
+                                 len_in_block);
+#ifdef VERBOSE_DEBUG
+            elog(LOG, "copying data for %d of %u complete %u",
+                 blockno, len_in_block, state->remaining_size);
+#endif
+        }
+        /* read the (rest) of the XLogRecord's data */
+        else if (state->in_record)
+        {
+            if(state->buf.record_data_size < state->buf.record.xl_len){
+                state->buf.record_data_size = state->buf.record.xl_len;
+                state->buf.record_data =
+                    realloc(state->buf.record_data, state->buf.record_data_size);
+            }
+
+            memcpy(state->buf.record_data
+                   + state->buf.record.xl_len
+                   - state->remaining_size,
+                   state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ,
+                   len_in_block);
+
+            state->writeout_data(state,
+                                 state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ,
+                                 len_in_block);
+        }
+
+        /* should handle wrapping around to next page */
+        XLByteAdvance(state->curptr, len_in_block);
+
+        state->remaining_size -= len_in_block;
+
+        /*
+         * ----------------------------------------
+         * we completed whatever we were reading. So, handle going to the next
+         * state.
+         * ----------------------------------------
+         */
+
+        if (state->remaining_size == 0)
+        {
+            /*
+             * in the in_skip case we already read backup blocks, so everything
+             * is finished.
+             */
+            if (state->in_skip)
+            {
+                state->in_record = false;
+                state->in_bkp_blocks = 0;
+                state->in_skip = false;
+                /* alignment is handled when starting to read a record */
+            }
+            /*
+             * We read the header of the current block. Start reading the
+             * content of that now.
+             */
+            else if (state->in_bkp_block_header)
+            {
+                BkpBlock* bkpb;
+                int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+
+                Assert(state->in_bkp_blocks);
+
+                bkpb = &state->buf.bkp_block[blockno];
+                state->remaining_size = BLCKSZ - bkpb->hole_length;
+                state->in_bkp_block_header = false;
+#ifdef VERBOSE_DEBUG
+                elog(LOG, "completed reading of header for %d, reading data now %u hole %u, off %u",
+                     blockno, state->remaining_size, bkpb->hole_length,
+                     bkpb->hole_offset);
+#endif
+            }
+            /*
+             * The current backup block is finished, more maybe comming
+             */
+            else if (state->in_bkp_blocks)
+            {
+                int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+                BkpBlock* bkpb;
+                char* bkpb_data;
+
+                Assert(!state->in_bkp_block_header);
+
+                bkpb = &state->buf.bkp_block[blockno];
+                bkpb_data = state->buf.bkp_block_data[blockno];
+
+                /*
+                 * reassemble block to its entirety by removing the bkp_hole
+                 * "compression"
+                 */
+                if(bkpb->hole_length){
+                    memmove(bkpb_data + bkpb->hole_offset,
+                            bkpb_data + bkpb->hole_offset + bkpb->hole_length,
+                            BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
+                    memset(bkpb_data + bkpb->hole_offset,
+                           0,
+                           bkpb->hole_length);
+                }
+#if 0
+                elog(LOG, "finished with bkp block %d", blockno);
+#endif
+                state->in_bkp_blocks--;
+
+                state->in_skip = false;
+
+                /*
+                 * only continue with in_record=true if we have bkp block
+                 */
+                while (state->in_bkp_blocks)
+                {
+                    if (state->buf.record.xl_info &
+                       XLR_SET_BKP_BLOCK(XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks))
+                    {
+                        elog(LOG, "reading record %u", XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks);
+                        break;
+                    }
+                    state->in_bkp_blocks--;
+                }
+
+                if (!state->in_bkp_blocks)
+                {
+                    goto all_bkp_finished;
+                }
+                /* bkp blocks are stored without regard for alignment */
+            }
+            /*
+             * read a non-skipped record, start reading bkp blocks afterwards
+             */
+            else if (state->in_record)
+            {
+                state->in_record = true;
+                state->in_skip = false;
+                state->in_bkp_blocks = XLR_MAX_BKP_BLOCKS;
+
+                /*
+                 * only continue with in_record=true if we have bkp block
+                 */
+                while (state->in_bkp_blocks)
+                {
+                    if (state->buf.record.xl_info &
+                        XLR_SET_BKP_BLOCK(XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks))
+                    {
+#ifdef VERBOSE_DEBUG
+                        elog(LOG, "reading bkp block %u", XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks);
+#endif
+                        break;
+                    }
+                    state->in_bkp_blocks--;
+                }
+
+                if (!state->in_bkp_blocks)
+                {
+                    goto all_bkp_finished;
+                }
+                /* bkp blocks are stored without regard for alignment */
+            }
+
+#ifdef VERBOSE_DEBUG
+            elog(LOG, "finish with record at %X/%X",
+                 state->curptr.xlogid, state->curptr.xrecoff);
+#endif
+        }
+        /*
+         * Something could only be partially read inside a single block because
+         * of input or output space constraints. This case needs to be
+         * separate because otherwise we would treat it as a continuation which
+         * would obviously be wrong (we don't have a continuation record).
+         */
+        else if (partial_read)
+        {
+            partial_read = false;
+            goto not_enough_input;
+        }
+        else if (partial_write)
+        {
+            partial_write = false;
+            goto not_enough_output;
+        }
+        /*
+         * Data continues into the next block. Read the continuation record
+         * there and then continue.
+         */
+        else
+        {
+        }
+#ifdef VERBOSE_DEBUG
+        elog(LOG, "one loop: record: %u skip: %u bkb_block: %d in_bkp_header: %u xrecoff: %X/%X remaining: %u, off: %u",
+             state->in_record, state->in_skip,
+             state->in_bkp_blocks, state->in_bkp_block_header,
+             state->curptr.xlogid, state->curptr.xrecoff,
+             state->remaining_size,
+             state->curptr.xrecoff % XLOG_BLCKSZ);
+#endif
+        continue;
+
+    all_bkp_finished:
+        {
+            Assert(!state->in_skip);
+            Assert(!state->in_bkp_block_header);
+            Assert(!state->in_bkp_blocks);
+
+            state->finished_record(state, &state->buf);
+
+            state->in_record = false;
+
+            /* alignment is handled when starting to read a record */
+#ifdef VERBOSE_DEBUG
+            elog(LOG, "currently at %X/%X to %X/%X, wrote nbytes: %lu",
+                 state->curptr.xlogid, state->curptr.xrecoff,
+                 state->endptr.xlogid, state->endptr.xrecoff, state->nbytes);
+#endif
+        }
+    }
+
+out:
+    if (state->in_skip)
+    {
+        state->incomplete = true;
+    }
+    else if (state->in_record)
+    {
+        state->incomplete = true;
+    }
+    else
+    {
+        state->incomplete = false;
+    }
+    return;
+
+not_enough_input:
+    state->needs_input = true;
+    goto out;
+
+not_enough_output:
+    state->needs_output = true;
+    goto out;
+}
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
new file mode 100644
index 0000000..7df98cf
--- /dev/null
+++ b/src/include/access/xlogreader.h
@@ -0,0 +1,173 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.h
+ *
+ * Generic xlog reading facility.
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ * src/include/access/xlogreader.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _READXLOG_H
+#define _READXLOG_H
+
+#include "access/xlog_internal.h"
+
+typedef struct XLogRecordBuffer
+{
+    /*
+     * The XLogRecord of the record being assembled. The reader tests its
+     * xl_info bits (XLR_SET_BKP_BLOCK) to find out which backup blocks belong
+     * to the record.
+     */
+    XLogRecord record;
+
+    /* the LSN at which this record was found */
+    XLogRecPtr origptr;
+
+    /* the data of the xlog record and its size (presumably in bytes) */
+    char* record_data;
+    uint32 record_data_size;
+
+    BkpBlock bkp_block[XLR_MAX_BKP_BLOCKS];    /* header of each backup block,
+                                                * indexed by block number */
+    char* bkp_block_data[XLR_MAX_BKP_BLOCKS];  /* contents of each backup block;
+                                                * once a block has been read
+                                                * completely the reader moves
+                                                * the data behind the hole back
+                                                * into place and zero-fills the
+                                                * hole, giving a BLCKSZ block */
+} XLogRecordBuffer;
+
+
+struct XLogReaderState;
+
+typedef bool (*XLogReaderStateInterestingCB)(struct XLogReaderState*, XLogRecord* r);
+typedef void (*XLogReaderStateWriteoutCB)(struct XLogReaderState*, char* data, Size len);
+typedef void (*XLogReaderStateFinishedRecordCB)(struct XLogReaderState*, XLogRecordBuffer* buf);
+typedef void (*XLogReaderStateReadPageCB)(struct XLogReaderState*, char* cur_page, XLogRecPtr at);
+
+typedef struct XLogReaderState
+{
+    /* ----------------------------------------
+     * Public parameters
+     *
+     * Filled in by the caller. The callbacks and private_data are not
+     * touched by XLogReaderReset().
+     * ----------------------------------------
+     */
+
+    /* callbacks */
+
+    /*
+     * Called to decide whether a xlog record is interesting and should be
+     * assembled, analyzed (finished_record) and written out or skipped.
+     */
+    XLogReaderStateInterestingCB is_record_interesting;
+
+    /*
+     * Write out data. This doesn't have to do anything if the data isn't
+     * needed later on.
+     */
+    XLogReaderStateWriteoutCB writeout_data;
+
+    /*
+     * Gets called after a record, including the backup blocks, has been fully
+     * reassembled. It receives the reader state and a pointer to the reader's
+     * own record buffer (buf below).
+     */
+    XLogReaderStateFinishedRecordCB finished_record;
+
+    /*
+     * Data input function. Has to read XLOG_BLCKSZ blocks into cur_page
+     * although everything behind endptr does not have to be valid.
+     */
+    XLogReaderStateReadPageCB read_page;
+
+    /*
+     * This can be used by the caller to pass state to the callbacks without
+     * using global variables or such ugliness.
+     */
+    void* private_data;
+
+
+    /* from where to where are we reading */
+
+    /*
+     * So we know where interesting data starts after scrolling back to the
+     * beginning of a page.
+     */
+    XLogRecPtr startptr;
+
+    /* continue up to here in this run */
+    XLogRecPtr endptr;
+
+
+    /* ----------------------------------------
+     * output parameters
+     *
+     * Written by XLogReaderRead() for the caller to inspect after it
+     * returns.
+     * ----------------------------------------
+     */
+
+    /*
+     * We need new input data - a later endptr - to continue reading. Set
+     * when XLogReaderRead() returns because it ran out of input.
+     */
+    bool needs_input;
+
+    /*
+     * We need new output space to continue reading. Set when
+     * XLogReaderRead() returns because it ran out of output space.
+     */
+    bool needs_output;
+
+    /* track our progress */
+    XLogRecPtr curptr;
+
+    /*
+     * Are we in the middle of something? This is useful for the outside to
+     * know whether to start reading anew. XLogReaderRead() sets it on return:
+     * true while a record is being read or skipped (in_record or in_skip),
+     * false otherwise.
+     */
+    bool incomplete;
+
+    /* ----------------------------------------
+     * private parameters
+     * ----------------------------------------
+     */
+
+    char cur_page[XLOG_BLCKSZ];     /* the page currently being processed;
+                                     * presumably the buffer handed to
+                                     * read_page - confirm in xlogreader.c */
+    XLogPageHeader page_header;     /* presumably the header of cur_page -
+                                     * confirm in xlogreader.c */
+    uint32 page_header_size;        /* presumably the size of that header -
+                                     * confirm in xlogreader.c */
+    XLogRecordBuffer buf;           /* the record being assembled; passed to
+                                     * finished_record once complete */
+
+
+    /* ----------------------------------------
+     * state machine variables
+     * ----------------------------------------
+     */
+
+    bool initialized;               /* NOTE(review): presumably whether the
+                                     * reader has been set up for reading -
+                                     * confirm in xlogreader.c */
+
+    /*
+     * Are we currently reading a record? Cleared again after finished_record
+     * has been called for it.
+     */
+    bool in_record;
+
+    /*
+     * How many bkp blocks remain to be read. Counts down from
+     * XLR_MAX_BKP_BLOCKS, so the block currently being read has the number
+     * XLR_MAX_BKP_BLOCKS - in_bkp_blocks; blocks whose XLR_SET_BKP_BLOCK bit
+     * is not set in the record's xl_info are stepped over. Zero means all
+     * backup blocks of the record are done.
+     */
+    int in_bkp_blocks;
+
+    /*
+     * The header of a bkp block can be split across pages, so we need to
+     * support reading that incrementally. Nonzero while the header of the
+     * current bkp block is being read; cleared when the header is complete
+     * and reading of the block's data starts.
+     *
+     * NOTE(review): declared int but only used as a boolean in the part of
+     * XLogReaderRead() reviewed here - confirm whether bool would do.
+     */
+    int in_bkp_block_header;
+
+    /* we don't want to read this block, so keep track of that */
+    bool in_skip;
+
+    /*
+     * How much more to read in the current state, e.g. BLCKSZ minus the hole
+     * length while the data of a bkp block is being read.
+     */
+    uint32 remaining_size;
+
+    Size nbytes; /* size of sent data */
+
+} XLogReaderState;
+
+/*
+ * Get a new XLogReader
+ *
+ * The 4 callbacks, startptr and endptr have to be set before the reader can be
+ * used.
+ */
+extern XLogReaderState* XLogReaderAllocate(void);
+
+/*
+ * Reset internal state so it can be used without continuing from the last
+ * state.
+ *
+ * The callbacks and private_data won't be reset
+ */
+extern void XLogReaderReset(XLogReaderState* state);
+
+/*
+ * Read the xlog and call the appropriate callbacks as far as possible within
+ * the constraints of input data (startptr, endptr) and output space.
+ */
+extern void XLogReaderRead(XLogReaderState* state);
+
+#endif
-- 
1.7.10.rc3.3.g19a6c.dirty



pgsql-hackers by date:

Previous
From: Andres Freund
Date:
Subject: [PATCH 10/16] Introduce the concept that wal has a 'origin' node
Next
From: Andres Freund
Date:
Subject: [PATCH 16/16] current version of the design document