Re: logical decoding and replication of sequences, take 2 - Mailing list pgsql-hackers

From Andres Freund
Subject Re: logical decoding and replication of sequences, take 2
Date
Msg-id 20230111201216.xyc7rptmw23fdem4@awork3.anarazel.de
Whole thread Raw
In response to Re: logical decoding and replication of sequences, take 2  (Tomas Vondra <tomas.vondra@enterprisedb.com>)
Responses Re: logical decoding and replication of sequences, take 2
List pgsql-hackers
Hi,


Heikki, CCed you due to the point about 2c03216d8311 below.


On 2023-01-10 19:32:12 +0100, Tomas Vondra wrote:
> 0001 is a fix for the pre-existing issue in logicalmsg_decode,
> attempting to build a snapshot before getting into a consistent state.
> AFAICS this only affects assert-enabled builds and is otherwise
> harmless, because we are not actually using the snapshot (apply gets a
> valid snapshot from the transaction).

LGTM.


> 0002 is a rebased version of the original approach, committed as
> 0da92dc530 (and then reverted in 2c7ea57e56). This includes the same fix
> as 0001 (for the sequence messages), the primary reason for the revert.
> 
> The rebase was not quite straightforward, due to extensive changes in
> how publications deal with tables/schemas, and so on. So this adopts
> them, but other than that it behaves just like the original patch.

This is a huge diff:
>  72 files changed, 4715 insertions(+), 612 deletions(-)

It'd be nice to split it to make review easier. Perhaps the sequence decoding
support could be split from the whole publication rigamarole?


> This does not include any changes to test_decoding and/or the built-in
> replication - those will be committed in separate patches.

Looks like that's not the case anymore?


> +/*
> + * Update the sequence state by modifying the existing sequence data row.
> + *
> + * This keeps the same relfilenode, so the behavior is non-transactional.
> + */
> +static void
> +SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called)
> +{
> +    SeqTable    elm;
> +    Relation    seqrel;
> +    Buffer        buf;
> +    HeapTupleData seqdatatuple;
> +    Form_pg_sequence_data seq;
> +
> +    /* open and lock sequence */
> +    init_sequence(seqrelid, &elm, &seqrel);
> +
> +    /* lock page' buffer and read tuple */
> +    seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
> +
> +    /* check the comment above nextval_internal()'s equivalent call. */
> +    if (RelationNeedsWAL(seqrel))
> +    {
> +        GetTopTransactionId();
> +
> +        if (XLogLogicalInfoActive())
> +            GetCurrentTransactionId();
> +    }
> +
> +    /* ready to change the on-disk (or really, in-buffer) tuple */
> +    START_CRIT_SECTION();
> +
> +    seq->last_value = last_value;
> +    seq->is_called = is_called;
> +    seq->log_cnt = log_cnt;
> +
> +    MarkBufferDirty(buf);
> +
> +    /* XLOG stuff */
> +    if (RelationNeedsWAL(seqrel))
> +    {
> +        xl_seq_rec    xlrec;
> +        XLogRecPtr    recptr;
> +        Page        page = BufferGetPage(buf);
> +
> +        XLogBeginInsert();
> +        XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
> +
> +        xlrec.locator = seqrel->rd_locator;
> +        xlrec.created = false;
> +
> +        XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
> +        XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
> +
> +        recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
> +
> +        PageSetLSN(page, recptr);
> +    }
> +
> +    END_CRIT_SECTION();
> +
> +    UnlockReleaseBuffer(buf);
> +
> +    /* Clear local cache so that we don't think we have cached numbers */
> +    /* Note that we do not change the currval() state */
> +    elm->cached = elm->last;
> +
> +    relation_close(seqrel, NoLock);
> +}
> +
> +/*
> + * Update the sequence state by creating a new relfilenode.
> + *
> + * This creates a new relfilenode, to allow transactional behavior.
> + */
> +static void
> +SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
> +{
> +    SeqTable    elm;
> +    Relation    seqrel;
> +    Buffer        buf;
> +    HeapTupleData seqdatatuple;
> +    Form_pg_sequence_data seq;
> +    HeapTuple    tuple;
> +
> +    /* open and lock sequence */
> +    init_sequence(seq_relid, &elm, &seqrel);
> +
> +    /* lock page' buffer and read tuple */
> +    seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
> +
> +    /* Copy the existing sequence tuple. */
> +    tuple = heap_copytuple(&seqdatatuple);
> +
> +    /* Now we're done with the old page */
> +    UnlockReleaseBuffer(buf);
> +
> +    /*
> +     * Modify the copied tuple to update the sequence state (similar to what
> +     * ResetSequence does).
> +     */
> +    seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
> +    seq->last_value = last_value;
> +    seq->is_called = is_called;
> +    seq->log_cnt = log_cnt;
> +
> +    /*
> +     * Create a new storage file for the sequence - this is needed for the
> +     * transactional behavior.
> +     */
> +    RelationSetNewRelfilenumber(seqrel, seqrel->rd_rel->relpersistence);
> +
> +    /*
> +     * Ensure sequence's relfrozenxid is at 0, since it won't contain any
> +     * unfrozen XIDs.  Same with relminmxid, since a sequence will never
> +     * contain multixacts.
> +     */
> +    Assert(seqrel->rd_rel->relfrozenxid == InvalidTransactionId);
> +    Assert(seqrel->rd_rel->relminmxid == InvalidMultiXactId);
> +
> +    /*
> +     * Insert the modified tuple into the new storage file. This does all the
> +     * necessary WAL-logging etc.
> +     */
> +    fill_seq_with_data(seqrel, tuple);
> +
> +    /* Clear local cache so that we don't think we have cached numbers */
> +    /* Note that we do not change the currval() state */
> +    elm->cached = elm->last;
> +
> +    relation_close(seqrel, NoLock);
> +}
> +
> +/*
> + * Set a sequence to a specified internal state.
> + *
> + * The change is made transactionally, so that on failure of the current
> + * transaction, the sequence will be restored to its previous state.
> + * We do that by creating a whole new relfilenode for the sequence; so this
> + * works much like the rewriting forms of ALTER TABLE.
> + *
> + * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
> + * which must not be released until end of transaction.  Caller is also
> + * responsible for permissions checking.
> + */
> +void
> +SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called)
> +{
> +    if (transactional)
> +        SetSequence_transactional(seq_relid, last_value, log_cnt, is_called);
> +    else
> +        SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called);
> +}

That's a lot of duplication with existing code. There's no explanation why
SetSequence() as well as do_setval() exists.


>  /*
>   * Initialize a sequence's relation with the specified tuple as content
>   *
> @@ -406,8 +560,13 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum)
>  
>      /* check the comment above nextval_internal()'s equivalent call. */
>      if (RelationNeedsWAL(rel))
> +    {
>          GetTopTransactionId();
>  
> +        if (XLogLogicalInfoActive())
> +            GetCurrentTransactionId();
> +    }

Is it actually possible to reach this without an xid already having been
assigned for the current xact?



> @@ -806,10 +966,28 @@ nextval_internal(Oid relid, bool check_permissions)
>       * It's sufficient to ensure the toplevel transaction has an xid, no need
>       * to assign xids subxacts, that'll already trigger an appropriate wait.
>       * (Have to do that here, so we're outside the critical section)
> +     *
> +     * We have to ensure we have a proper XID, which will be included in
> +     * the XLOG record by XLogRecordAssemble. Otherwise the first nextval()
> +     * in a subxact (without any preceding changes) would get XID 0, and it
> +     * would then be impossible to decide which top xact it belongs to.
> +     * It'd also trigger assert in DecodeSequence. We only do that with
> +     * wal_level=logical, though.
> +     *
> +     * XXX This might seem unnecessary, because if there's no XID the xact
> +     * couldn't have done anything important yet, e.g. it could not have
> +     * created a sequence. But that's incorrect, because of subxacts. The
> +     * current subtransaction might not have done anything yet (thus no XID),
> +     * but an earlier one might have created the sequence.
>       */

What about restricting this to the case you're mentioning,
i.e. subtransactions?


> @@ -845,6 +1023,7 @@ nextval_internal(Oid relid, bool check_permissions)
>          seq->log_cnt = 0;
>  
>          xlrec.locator = seqrel->rd_locator;

I realize this isn't from this patch, but:

Why do we include the locator in the record? We already have it via
XLogRegisterBuffer(), no? And afaict we don't even use it, as we read the page
via XLogInitBufferForRedo() during recovery.

Kinda looks like an oversight in 2c03216d8311




> +/*
> + * Handle sequence decode
> + *
> + * Decoding sequences is a bit tricky, because while most sequence actions
> + * are non-transactional (not subject to rollback), some need to be handled
> + * as transactional.
> + *
> + * By default, a sequence increment is non-transactional - we must not queue
> + * it in a transaction as other changes, because the transaction might get
> + * rolled back and we'd discard the increment. The downstream would not be
> + * notified about the increment, which is wrong.
> + *
> + * On the other hand, the sequence may be created in a transaction. In this
> + * case we *should* queue the change as other changes in the transaction,
> + * because we don't want to send the increments for unknown sequence to the
> + * plugin - it might get confused about which sequence it's related to etc.
> + */
> +void
> +sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> +{

> +    /* extract the WAL record, with "created" flag */
> +    xlrec = (xl_seq_rec *) XLogRecGetData(r);
> +
> +    /* XXX how could we have sequence change without data? */
> +    if(!datalen || !tupledata)
> +        return;

Yea, I think we should error out here instead, something has gone quite wrong
if this happens.


> +    tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
> +    DecodeSeqTuple(tupledata, datalen, tuplebuf);
> +
> +    /*
> +     * Should we handle the sequence increment as transactional or not?
> +     *
> +     * If the sequence was created in a still-running transaction, treat
> +     * it as transactional and queue the increments. Otherwise it needs
> +     * to be treated as non-transactional, in which case we send it to
> +     * the plugin right away.
> +     */
> +    transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
> +                                                         target_locator,
> +                                                         xlrec->created);

Why re-create this information during decoding, when we basically already have
it available on the primary? I think we already pay the price for that
tracking, which we e.g. use for doing a non-transactional truncate:

        /*
         * Normally, we need a transaction-safe truncation here.  However, if
         * the table was either created in the current (sub)transaction or has
         * a new relfilenumber in the current (sub)transaction, then we can
         * just truncate it in-place, because a rollback would cause the whole
         * table or the current physical file to be thrown away anyway.
         */
        if (rel->rd_createSubid == mySubid ||
            rel->rd_newRelfilelocatorSubid == mySubid)
        {
            /* Immediate, non-rollbackable truncation is OK */
            heap_truncate_one_rel(rel);
        }

Afaict we could do something similar for sequences, except that I think we
would just check if the sequence was created in the current transaction
(i.e. any of the fields are set).


> +/*
> + * A transactional sequence increment is queued to be processed upon commit
> + * and a non-transactional increment gets processed immediately.
> + *
> + * A sequence update may be both transactional and non-transactional. When
> + * created in a running transaction, treat it as transactional and queue
> + * the change in it. Otherwise treat it as non-transactional, so that we
> + * don't forget the increment in case of a rollback.
> + */
> +void
> +ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
> +                           Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
> +                           RelFileLocator rlocator, bool transactional, bool created,
> +                           ReorderBufferTupleBuf *tuplebuf)


> +        /*
> +         * Decoding needs access to syscaches et al., which in turn use
> +         * heavyweight locks and such. Thus we need to have enough state around to
> +         * keep track of those.  The easiest way is to simply use a transaction
> +         * internally.  That also allows us to easily enforce that nothing writes
> +         * to the database by checking for xid assignments.
> +         *
> +         * When we're called via the SQL SRF there's already a transaction
> +         * started, so start an explicit subtransaction there.
> +         */
> +        using_subtxn = IsTransactionOrTransactionBlock();

This duplicates a lot of the code from ReorderBufferProcessTXN(). But only
does so partially. It's hard to tell whether some of the differences are
intentional. Could we de-duplicate that code with ReorderBufferProcessTXN()?

Maybe something like

void
ReorderBufferSetupXactEnv(ReorderBufferXactEnv *, bool process_invals);

void
ReorderBufferTeardownXactEnv(ReorderBufferXactEnv *, bool is_error);




Greetings,

Andres Freund



pgsql-hackers by date:

Previous
From: Nathan Bossart
Date:
Subject: Re: recovery modules
Next
From: Robert Haas
Date:
Subject: Re: allowing for control over SET ROLE