Re: logical decoding and replication of sequences, take 2 - Mailing list pgsql-hackers

From Tomas Vondra
Subject Re: logical decoding and replication of sequences, take 2
Msg-id d295927b-fcab-cb9f-0a0d-a846041fcf04@enterprisedb.com
In response to Re: logical decoding and replication of sequences, take 2  (Andres Freund <andres@anarazel.de>)
Responses Re: logical decoding and replication of sequences, take 2
List pgsql-hackers

On 1/11/23 21:12, Andres Freund wrote:
> Hi,
> 
> 
> Heikki, CCed you due to the point about 2c03216d8311 below.
> 
> 
> On 2023-01-10 19:32:12 +0100, Tomas Vondra wrote:
>> 0001 is a fix for the pre-existing issue in logicalmsg_decode,
>> attempting to build a snapshot before getting into a consistent state.
>> AFAICS this only affects assert-enabled builds and is otherwise
>> harmless, because we are not actually using the snapshot (apply gets a
>> valid snapshot from the transaction).
> 
> LGTM.
> 
> 
>> 0002 is a rebased version of the original approach, committed as
>> 0da92dc530 (and then reverted in 2c7ea57e56). This includes the same fix
>> as 0001 (for the sequence messages), the primary reason for the revert.
>>
>> The rebase was not quite straightforward, due to extensive changes in
>> how publications deal with tables/schemas, and so on. So this adopts
>> them, but other than that it behaves just like the original patch.
> 
> This is a huge diff:
>>  72 files changed, 4715 insertions(+), 612 deletions(-)
> 
> It'd be nice to split it to make review easier. Perhaps the sequence decoding
> support could be split from the whole publication rigamarole?
> 
> 
>> This does not include any changes to test_decoding and/or the built-in
>> replication - those will be committed in separate patches.
> 
> Looks like that's not the case anymore?
> 

Ah, right! I now realize I originally committed this in chunks, but the
revert was a single commit, and I simply "reverted the revert" to create
this patch.

I'll definitely split this into smaller patches. This also explains the
obsolete commit message about test_decoding not being included, etc.

> 
>> +/*
>> + * Update the sequence state by modifying the existing sequence data row.
>> + *
>> + * This keeps the same relfilenode, so the behavior is non-transactional.
>> + */
>> +static void
>> +SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called)
>> +{
>> +    SeqTable    elm;
>> +    Relation    seqrel;
>> +    Buffer        buf;
>> +    HeapTupleData seqdatatuple;
>> +    Form_pg_sequence_data seq;
>> +
>> +    /* open and lock sequence */
>> +    init_sequence(seqrelid, &elm, &seqrel);
>> +
>> +    /* lock page' buffer and read tuple */
>> +    seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
>> +
>> +    /* check the comment above nextval_internal()'s equivalent call. */
>> +    if (RelationNeedsWAL(seqrel))
>> +    {
>> +        GetTopTransactionId();
>> +
>> +        if (XLogLogicalInfoActive())
>> +            GetCurrentTransactionId();
>> +    }
>> +
>> +    /* ready to change the on-disk (or really, in-buffer) tuple */
>> +    START_CRIT_SECTION();
>> +
>> +    seq->last_value = last_value;
>> +    seq->is_called = is_called;
>> +    seq->log_cnt = log_cnt;
>> +
>> +    MarkBufferDirty(buf);
>> +
>> +    /* XLOG stuff */
>> +    if (RelationNeedsWAL(seqrel))
>> +    {
>> +        xl_seq_rec    xlrec;
>> +        XLogRecPtr    recptr;
>> +        Page        page = BufferGetPage(buf);
>> +
>> +        XLogBeginInsert();
>> +        XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
>> +
>> +        xlrec.locator = seqrel->rd_locator;
>> +        xlrec.created = false;
>> +
>> +        XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
>> +        XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
>> +
>> +        recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
>> +
>> +        PageSetLSN(page, recptr);
>> +    }
>> +
>> +    END_CRIT_SECTION();
>> +
>> +    UnlockReleaseBuffer(buf);
>> +
>> +    /* Clear local cache so that we don't think we have cached numbers */
>> +    /* Note that we do not change the currval() state */
>> +    elm->cached = elm->last;
>> +
>> +    relation_close(seqrel, NoLock);
>> +}
>> +
>> +/*
>> + * Update the sequence state by creating a new relfilenode.
>> + *
>> + * This creates a new relfilenode, to allow transactional behavior.
>> + */
>> +static void
>> +SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
>> +{
>> +    SeqTable    elm;
>> +    Relation    seqrel;
>> +    Buffer        buf;
>> +    HeapTupleData seqdatatuple;
>> +    Form_pg_sequence_data seq;
>> +    HeapTuple    tuple;
>> +
>> +    /* open and lock sequence */
>> +    init_sequence(seq_relid, &elm, &seqrel);
>> +
>> +    /* lock page' buffer and read tuple */
>> +    seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
>> +
>> +    /* Copy the existing sequence tuple. */
>> +    tuple = heap_copytuple(&seqdatatuple);
>> +
>> +    /* Now we're done with the old page */
>> +    UnlockReleaseBuffer(buf);
>> +
>> +    /*
>> +     * Modify the copied tuple to update the sequence state (similar to what
>> +     * ResetSequence does).
>> +     */
>> +    seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
>> +    seq->last_value = last_value;
>> +    seq->is_called = is_called;
>> +    seq->log_cnt = log_cnt;
>> +
>> +    /*
>> +     * Create a new storage file for the sequence - this is needed for the
>> +     * transactional behavior.
>> +     */
>> +    RelationSetNewRelfilenumber(seqrel, seqrel->rd_rel->relpersistence);
>> +
>> +    /*
>> +     * Ensure sequence's relfrozenxid is at 0, since it won't contain any
>> +     * unfrozen XIDs.  Same with relminmxid, since a sequence will never
>> +     * contain multixacts.
>> +     */
>> +    Assert(seqrel->rd_rel->relfrozenxid == InvalidTransactionId);
>> +    Assert(seqrel->rd_rel->relminmxid == InvalidMultiXactId);
>> +
>> +    /*
>> +     * Insert the modified tuple into the new storage file. This does all the
>> +     * necessary WAL-logging etc.
>> +     */
>> +    fill_seq_with_data(seqrel, tuple);
>> +
>> +    /* Clear local cache so that we don't think we have cached numbers */
>> +    /* Note that we do not change the currval() state */
>> +    elm->cached = elm->last;
>> +
>> +    relation_close(seqrel, NoLock);
>> +}
>> +
>> +/*
>> + * Set a sequence to a specified internal state.
>> + *
>> + * The change is made transactionally, so that on failure of the current
>> + * transaction, the sequence will be restored to its previous state.
>> + * We do that by creating a whole new relfilenode for the sequence; so this
>> + * works much like the rewriting forms of ALTER TABLE.
>> + *
>> + * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
>> + * which must not be released until end of transaction.  Caller is also
>> + * responsible for permissions checking.
>> + */
>> +void
>> +SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called)
>> +{
>> +    if (transactional)
>> +        SetSequence_transactional(seq_relid, last_value, log_cnt, is_called);
>> +    else
>> +        SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called);
>> +}
> 
> That's a lot of duplication with existing code. There's no explanation why
> SetSequence() as well as do_setval() exists.
> 

Thanks, I'll look into this.

> 
>>  /*
>>   * Initialize a sequence's relation with the specified tuple as content
>>   *
>> @@ -406,8 +560,13 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum)
>>  
>>      /* check the comment above nextval_internal()'s equivalent call. */
>>      if (RelationNeedsWAL(rel))
>> +    {
>>          GetTopTransactionId();
>>  
>> +        if (XLogLogicalInfoActive())
>> +            GetCurrentTransactionId();
>> +    }
> 
> Is it actually possible to reach this without an xid already having been
> assigned for the current xact?
> 

I believe it is. That's probably how I found out this change is needed,
actually.

> 
> 
>> @@ -806,10 +966,28 @@ nextval_internal(Oid relid, bool check_permissions)
>>       * It's sufficient to ensure the toplevel transaction has an xid, no need
>>       * to assign xids subxacts, that'll already trigger an appropriate wait.
>>       * (Have to do that here, so we're outside the critical section)
>> +     *
>> +     * We have to ensure we have a proper XID, which will be included in
>> +     * the XLOG record by XLogRecordAssemble. Otherwise the first nextval()
>> +     * in a subxact (without any preceding changes) would get XID 0, and it
>> +     * would then be impossible to decide which top xact it belongs to.
>> +     * It'd also trigger assert in DecodeSequence. We only do that with
>> +     * wal_level=logical, though.
>> +     *
>> +     * XXX This might seem unnecessary, because if there's no XID the xact
>> +     * couldn't have done anything important yet, e.g. it could not have
>> +     * created a sequence. But that's incorrect, because of subxacts. The
>> +     * current subtransaction might not have done anything yet (thus no XID),
>> +     * but an earlier one might have created the sequence.
>>       */
> 
> What about restricting this to the case you're mentioning,
> i.e. subtransactions?
> 

That might work, but I need to think about it a bit.

I don't think it'd save us much, though. I mean, the vast majority of
transactions (and subtransactions) calling nextval() will then do
something else which requires an XID. This just moves the XID assignment
a bit earlier, that's all.

> 
>> @@ -845,6 +1023,7 @@ nextval_internal(Oid relid, bool check_permissions)
>>          seq->log_cnt = 0;
>>  
>>          xlrec.locator = seqrel->rd_locator;
> 
> I realize this isn't from this patch, but:
> 
> Why do we include the locator in the record? We already have it via
> XLogRegisterBuffer(), no? And afaict we don't even use it, as we read the page
> via XLogInitBufferForRedo() during recovery.
> 
> Kinda looks like an oversight in 2c03216d8311
> 

I don't know; that's what the code already did.

> 
> 
> 
>> +/*
>> + * Handle sequence decode
>> + *
>> + * Decoding sequences is a bit tricky, because while most sequence actions
>> + * are non-transactional (not subject to rollback), some need to be handled
>> + * as transactional.
>> + *
>> + * By default, a sequence increment is non-transactional - we must not queue
>> + * it in a transaction as other changes, because the transaction might get
>> + * rolled back and we'd discard the increment. The downstream would not be
>> + * notified about the increment, which is wrong.
>> + *
>> + * On the other hand, the sequence may be created in a transaction. In this
>> + * case we *should* queue the change as other changes in the transaction,
>> + * because we don't want to send the increments for unknown sequence to the
>> + * plugin - it might get confused about which sequence it's related to etc.
>> + */
>> +void
>> +sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>> +{
> 
>> +    /* extract the WAL record, with "created" flag */
>> +    xlrec = (xl_seq_rec *) XLogRecGetData(r);
>> +
>> +    /* XXX how could we have sequence change without data? */
>> +    if(!datalen || !tupledata)
>> +        return;
> 
> Yea, I think we should error out here instead, something has gone quite wrong
> if this happens.
> 

OK

> 
>> +    tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
>> +    DecodeSeqTuple(tupledata, datalen, tuplebuf);
>> +
>> +    /*
>> +     * Should we handle the sequence increment as transactional or not?
>> +     *
>> +     * If the sequence was created in a still-running transaction, treat
>> +     * it as transactional and queue the increments. Otherwise it needs
>> +     * to be treated as non-transactional, in which case we send it to
>> +     * the plugin right away.
>> +     */
>> +    transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
>> +                                                         target_locator,
>> +                                                         xlrec->created);
> 
> Why re-create this information during decoding, when we basically already have
> it available on the primary? I think we already pay the price for that
> tracking, which we e.g. use for doing a non-transactional truncate:
> 
>         /*
>          * Normally, we need a transaction-safe truncation here.  However, if
>          * the table was either created in the current (sub)transaction or has
>          * a new relfilenumber in the current (sub)transaction, then we can
>          * just truncate it in-place, because a rollback would cause the whole
>          * table or the current physical file to be thrown away anyway.
>          */
>         if (rel->rd_createSubid == mySubid ||
>             rel->rd_newRelfilelocatorSubid == mySubid)
>         {
>             /* Immediate, non-rollbackable truncation is OK */
>             heap_truncate_one_rel(rel);
>         }
> 
> Afaict we could do something similar for sequences, except that I think we
> would just check if the sequence was created in the current transaction
> (i.e. any of the fields are set).
> 

Hmm, good point.
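
A minimal sketch of how I read that suggestion, written as standalone C
with stand-in types rather than the real relcache structures. The struct
SeqRelModel and the function seq_change_is_transactional() are
hypothetical; only the two field names are borrowed from the truncate
code quoted above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for SubTransactionId / InvalidSubTransactionId. */
typedef uint32_t SubXactId;
#define INVALID_SUBXACT_ID ((SubXactId) 0)

/* Hypothetical, cut-down model of the relcache fields quoted above. */
typedef struct SeqRelModel
{
    SubXactId   createSubid;            /* set if created in this xact */
    SubXactId   newRelfilelocatorSubid; /* set if storage replaced in this xact */
} SeqRelModel;

/*
 * Decide on the primary whether a sequence change should be treated as
 * transactional: true if the sequence (or its current storage) was
 * created by the current transaction, i.e. if either field is set, no
 * matter which subtransaction set it.
 */
bool
seq_change_is_transactional(const SeqRelModel *rel)
{
    assert(rel != NULL);
    return rel->createSubid != INVALID_SUBXACT_ID ||
           rel->newRelfilelocatorSubid != INVALID_SUBXACT_ID;
}
```

Unlike the truncate case, this does not compare against the current
subtransaction ID; it only asks whether any of the fields is set. How
the result would then be passed to decoding is not settled here.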

> 
>> +/*
>> + * A transactional sequence increment is queued to be processed upon commit
>> + * and a non-transactional increment gets processed immediately.
>> + *
>> + * A sequence update may be both transactional and non-transactional. When
>> + * created in a running transaction, treat it as transactional and queue
>> + * the change in it. Otherwise treat it as non-transactional, so that we
>> + * don't forget the increment in case of a rollback.
>> + */
>> +void
>> +ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
>> +                           Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
>> +                           RelFileLocator rlocator, bool transactional, bool created,
>> +                           ReorderBufferTupleBuf *tuplebuf)
> 
> 
>> +        /*
>> +         * Decoding needs access to syscaches et al., which in turn use
>> +         * heavyweight locks and such. Thus we need to have enough state around to
>> +         * keep track of those.  The easiest way is to simply use a transaction
>> +         * internally.  That also allows us to easily enforce that nothing writes
>> +         * to the database by checking for xid assignments.
>> +         *
>> +         * When we're called via the SQL SRF there's already a transaction
>> +         * started, so start an explicit subtransaction there.
>> +         */
>> +        using_subtxn = IsTransactionOrTransactionBlock();
> 
> This duplicates a lot of the code from ReorderBufferProcessTXN(). But only
> does so partially. It's hard to tell whether some of the differences are
> intentional. Could we de-duplicate that code with ReorderBufferProcessTXN()?
> 
> Maybe something like
> 
> void
> ReorderBufferSetupXactEnv(ReorderBufferXactEnv *, bool process_invals);
> 
> void
> ReorderBufferTeardownXactEnv(ReorderBufferXactEnv *, bool is_error);
> 

Thanks for the suggestion, I'll definitely consider that in the next
version of the patch.
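
For the record, a rough standalone sketch of the shape such a
setup/teardown pair could take. This is a toy model, not reorderbuffer
code: BackendModel, XactEnvModel, xact_env_setup() and
xact_env_teardown() are hypothetical names, and the transaction state is
reduced to a nesting depth.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy backend state: 0 = idle, 1 = top-level xact, >1 = in a subxact. */
typedef struct BackendModel
{
    int         xact_depth;
} BackendModel;

/* Hypothetical environment struct, loosely following the suggestion. */
typedef struct XactEnvModel
{
    BackendModel *backend;
    bool        using_subtxn;       /* nested inside a caller's transaction? */
    bool        process_invals;     /* caller wants invalidations handled */
    bool        ended_in_error;     /* recorded at teardown */
    bool        is_set_up;
} XactEnvModel;

/* Enter the internal transaction used while decoding. */
void
xact_env_setup(XactEnvModel *env, BackendModel *backend, bool process_invals)
{
    env->backend = backend;
    env->process_invals = process_invals;
    env->ended_in_error = false;

    /* Already in a transaction (e.g. called via the SQL SRF)? Then nest. */
    env->using_subtxn = (backend->xact_depth > 0);

    /* Either begin a subtransaction or start a new top-level one. */
    backend->xact_depth += 1;
    env->is_set_up = true;
}

/* Leave it again; the internal (sub)transaction is always thrown away. */
void
xact_env_teardown(XactEnvModel *env, bool is_error)
{
    assert(env->is_set_up);
    env->ended_in_error = is_error;
    env->backend->xact_depth -= 1;
    env->is_set_up = false;
}
```

The point is only that both callers would share one place deciding
between a new transaction and a subtransaction, so the two code paths
cannot drift apart.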


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


