Re: logical decoding and replication of sequences, take 2 - Mailing list pgsql-hackers
From | Tomas Vondra |
---|---|
Subject | Re: logical decoding and replication of sequences, take 2 |
Date | |
Msg-id | d295927b-fcab-cb9f-0a0d-a846041fcf04@enterprisedb.com Whole thread Raw |
In response to | Re: logical decoding and replication of sequences, take 2 (Andres Freund <andres@anarazel.de>) |
Responses |
Re: logical decoding and replication of sequences, take 2
|
List | pgsql-hackers |
On 1/11/23 21:12, Andres Freund wrote: > Hi, > > > Heikki, CCed you due to the point about 2c03216d8311 below. > > > On 2023-01-10 19:32:12 +0100, Tomas Vondra wrote: >> 0001 is a fix for the pre-existing issue in logicalmsg_decode, >> attempting to build a snapshot before getting into a consistent state. >> AFAICS this only affects assert-enabled builds and is otherwise >> harmless, because we are not actually using the snapshot (apply gets a >> valid snapshot from the transaction). > > LGTM. > > >> 0002 is a rebased version of the original approach, committed as >> 0da92dc530 (and then reverted in 2c7ea57e56). This includes the same fix >> as 0001 (for the sequence messages), the primary reason for the revert. >> >> The rebase was not quite straightforward, due to extensive changes in >> how publications deal with tables/schemas, and so on. So this adopts >> them, but other than that it behaves just like the original patch. > > This is a huge diff: >> 72 files changed, 4715 insertions(+), 612 deletions(-) > > It'd be nice to split it to make review easier. Perhaps the sequence decoding > support could be split from the whole publication rigamarole? > > >> This does not include any changes to test_decoding and/or the built-in >> replication - those will be committed in separate patches. > > Looks like that's not the case anymore? > Ah, right! Now I realized I originally committed this in chunks, but the revert was a single commit. And I just "reverted the revert" to create this patch. I'll definitely split this into smaller patches. This also explains the obsolete commit message about test_decoding not being included, etc. > >> +/* >> + * Update the sequence state by modifying the existing sequence data row. >> + * >> + * This keeps the same relfilenode, so the behavior is non-transactional. >> + */ >> +static void >> +SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called) >> +{ >> + SeqTable elm; >> + Relation seqrel; >> + Buffer buf; >> + HeapTupleData seqdatatuple; >> + Form_pg_sequence_data seq; >> + >> + /* open and lock sequence */ >> + init_sequence(seqrelid, &elm, &seqrel); >> + >> + /* lock page' buffer and read tuple */ >> + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); >> + >> + /* check the comment above nextval_internal()'s equivalent call. */ >> + if (RelationNeedsWAL(seqrel)) >> + { >> + GetTopTransactionId(); >> + >> + if (XLogLogicalInfoActive()) >> + GetCurrentTransactionId(); >> + } >> + >> + /* ready to change the on-disk (or really, in-buffer) tuple */ >> + START_CRIT_SECTION(); >> + >> + seq->last_value = last_value; >> + seq->is_called = is_called; >> + seq->log_cnt = log_cnt; >> + >> + MarkBufferDirty(buf); >> + >> + /* XLOG stuff */ >> + if (RelationNeedsWAL(seqrel)) >> + { >> + xl_seq_rec xlrec; >> + XLogRecPtr recptr; >> + Page page = BufferGetPage(buf); >> + >> + XLogBeginInsert(); >> + XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); >> + >> + xlrec.locator = seqrel->rd_locator; >> + xlrec.created = false; >> + >> + XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); >> + XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); >> + >> + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG); >> + >> + PageSetLSN(page, recptr); >> + } >> + >> + END_CRIT_SECTION(); >> + >> + UnlockReleaseBuffer(buf); >> + >> + /* Clear local cache so that we don't think we have cached numbers */ >> + /* Note that we do not change the currval() state */ >> + elm->cached = elm->last; >> + >> + relation_close(seqrel, NoLock); >> +} >> + >> +/* >> + * Update the sequence state by creating a new relfilenode. >> + * >> + * This creates a new relfilenode, to allow transactional behavior. >> + */ >> +static void >> +SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called) >> +{ >> + SeqTable elm; >> + Relation seqrel; >> + Buffer buf; >> + HeapTupleData seqdatatuple; >> + Form_pg_sequence_data seq; >> + HeapTuple tuple; >> + >> + /* open and lock sequence */ >> + init_sequence(seq_relid, &elm, &seqrel); >> + >> + /* lock page' buffer and read tuple */ >> + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); >> + >> + /* Copy the existing sequence tuple. */ >> + tuple = heap_copytuple(&seqdatatuple); >> + >> + /* Now we're done with the old page */ >> + UnlockReleaseBuffer(buf); >> + >> + /* >> + * Modify the copied tuple to update the sequence state (similar to what >> + * ResetSequence does). >> + */ >> + seq = (Form_pg_sequence_data) GETSTRUCT(tuple); >> + seq->last_value = last_value; >> + seq->is_called = is_called; >> + seq->log_cnt = log_cnt; >> + >> + /* >> + * Create a new storage file for the sequence - this is needed for the >> + * transactional behavior. >> + */ >> + RelationSetNewRelfilenumber(seqrel, seqrel->rd_rel->relpersistence); >> + >> + /* >> + * Ensure sequence's relfrozenxid is at 0, since it won't contain any >> + * unfrozen XIDs. Same with relminmxid, since a sequence will never >> + * contain multixacts. >> + */ >> + Assert(seqrel->rd_rel->relfrozenxid == InvalidTransactionId); >> + Assert(seqrel->rd_rel->relminmxid == InvalidMultiXactId); >> + >> + /* >> + * Insert the modified tuple into the new storage file. This does all the >> + * necessary WAL-logging etc. >> + */ >> + fill_seq_with_data(seqrel, tuple); >> + >> + /* Clear local cache so that we don't think we have cached numbers */ >> + /* Note that we do not change the currval() state */ >> + elm->cached = elm->last; >> + >> + relation_close(seqrel, NoLock); >> +} >> + >> +/* >> + * Set a sequence to a specified internal state. >> + * >> + * The change is made transactionally, so that on failure of the current >> + * transaction, the sequence will be restored to its previous state. >> + * We do that by creating a whole new relfilenode for the sequence; so this >> + * works much like the rewriting forms of ALTER TABLE. >> + * >> + * Caller is assumed to have acquired AccessExclusiveLock on the sequence, >> + * which must not be released until end of transaction. Caller is also >> + * responsible for permissions checking. >> + */ >> +void >> +SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called) >> +{ >> + if (transactional) >> + SetSequence_transactional(seq_relid, last_value, log_cnt, is_called); >> + else >> + SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called); >> +} > > That's a lot of duplication with existing code. There's no explanation why > SetSequence() as well as do_setval() exists. > Thanks, I'll look into this. > >> /* >> * Initialize a sequence's relation with the specified tuple as content >> * >> @@ -406,8 +560,13 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum) >> >> /* check the comment above nextval_internal()'s equivalent call. */ >> if (RelationNeedsWAL(rel)) >> + { >> GetTopTransactionId(); >> >> + if (XLogLogicalInfoActive()) >> + GetCurrentTransactionId(); >> + } > > Is it actually possible to reach this without an xid already having been > assigned for the current xact? > I believe it is. That's probably how I found this change is needed, actually. > > >> @@ -806,10 +966,28 @@ nextval_internal(Oid relid, bool check_permissions) >> * It's sufficient to ensure the toplevel transaction has an xid, no need >> * to assign xids subxacts, that'll already trigger an appropriate wait. >> * (Have to do that here, so we're outside the critical section) >> + * >> + * We have to ensure we have a proper XID, which will be included in >> + * the XLOG record by XLogRecordAssemble. Otherwise the first nextval() >> + * in a subxact (without any preceding changes) would get XID 0, and it >> + * would then be impossible to decide which top xact it belongs to. >> + * It'd also trigger assert in DecodeSequence. We only do that with >> + * wal_level=logical, though. >> + * >> + * XXX This might seem unnecessary, because if there's no XID the xact >> + * couldn't have done anything important yet, e.g. it could not have >> + * created a sequence. But that's incorrect, because of subxacts. The >> + * current subtransaction might not have done anything yet (thus no XID), >> + * but an earlier one might have created the sequence. >> */ > > What about restricting this to the case you're mentioning, > i.e. subtransactions? > That might work, but I need to think about it a bit. I don't think it'd save us much, though. I mean, vast majority of transactions (and subtransactions) calling nextval() will then do something else which requires a XID. This just moves the XID a bit, that's all. > >> @@ -845,6 +1023,7 @@ nextval_internal(Oid relid, bool check_permissions) >> seq->log_cnt = 0; >> >> xlrec.locator = seqrel->rd_locator; > > I realize this isn't from this patch, but: > > Why do we include the locator in the record? We already have it via > XLogRegisterBuffer(), no? And afaict we don't even use it, as we read the page > via XLogInitBufferForRedo() during recovery. > > Kinda looks like an oversight in 2c03216d8311 > I don't know, it's what the code did. > > > >> +/* >> + * Handle sequence decode >> + * >> + * Decoding sequences is a bit tricky, because while most sequence actions >> + * are non-transactional (not subject to rollback), some need to be handled >> + * as transactional. >> + * >> + * By default, a sequence increment is non-transactional - we must not queue >> + * it in a transaction as other changes, because the transaction might get >> + * rolled back and we'd discard the increment. The downstream would not be >> + * notified about the increment, which is wrong. >> + * >> + * On the other hand, the sequence may be created in a transaction. In this >> + * case we *should* queue the change as other changes in the transaction, >> + * because we don't want to send the increments for unknown sequence to the >> + * plugin - it might get confused about which sequence it's related to etc. >> + */ >> +void >> +sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) >> +{ > >> + /* extract the WAL record, with "created" flag */ >> + xlrec = (xl_seq_rec *) XLogRecGetData(r); >> + >> + /* XXX how could we have sequence change without data? */ >> + if(!datalen || !tupledata) >> + return; > > Yea, I think we should error out here instead, something has gone quite wrong > if this happens. > OK > >> + tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); >> + DecodeSeqTuple(tupledata, datalen, tuplebuf); >> + >> + /* >> + * Should we handle the sequence increment as transactional or not? >> + * >> + * If the sequence was created in a still-running transaction, treat >> + * it as transactional and queue the increments. Otherwise it needs >> + * to be treated as non-transactional, in which case we send it to >> + * the plugin right away. >> + */ >> + transactional = ReorderBufferSequenceIsTransactional(ctx->reorder, >> + target_locator, >> + xlrec->created); > > Why re-create this information during decoding, when we basically already have > it available on the primary? I think we already pay the price for that > tracking, which we e.g. use for doing a non-transactional truncate: > > /* > * Normally, we need a transaction-safe truncation here. However, if > * the table was either created in the current (sub)transaction or has > * a new relfilenumber in the current (sub)transaction, then we can > * just truncate it in-place, because a rollback would cause the whole > * table or the current physical file to be thrown away anyway. > */ > if (rel->rd_createSubid == mySubid || > rel->rd_newRelfilelocatorSubid == mySubid) > { > /* Immediate, non-rollbackable truncation is OK */ > heap_truncate_one_rel(rel); > } > > Afaict we could do something similar for sequences, except that I think we > would just check if the sequence was created in the current transaction > (i.e. any of the fields are set). > Hmm, good point. > >> +/* >> + * A transactional sequence increment is queued to be processed upon commit >> + * and a non-transactional increment gets processed immediately. >> + * >> + * A sequence update may be both transactional and non-transactional. When >> + * created in a running transaction, treat it as transactional and queue >> + * the change in it. Otherwise treat it as non-transactional, so that we >> + * don't forget the increment in case of a rollback. >> + */ >> +void >> +ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, >> + Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, >> + RelFileLocator rlocator, bool transactional, bool created, >> + ReorderBufferTupleBuf *tuplebuf) > > >> + /* >> + * Decoding needs access to syscaches et al., which in turn use >> + * heavyweight locks and such. Thus we need to have enough state around to >> + * keep track of those. The easiest way is to simply use a transaction >> + * internally. That also allows us to easily enforce that nothing writes >> + * to the database by checking for xid assignments. >> + * >> + * When we're called via the SQL SRF there's already a transaction >> + * started, so start an explicit subtransaction there. >> + */ >> + using_subtxn = IsTransactionOrTransactionBlock(); > > This duplicates a lot of the code from ReorderBufferProcessTXN(). But only > does so partially. It's hard to tell whether some of the differences are > intentional. Could we de-duplicate that code with ReorderBufferProcessTXN()? > > Maybe something like > > void > ReorderBufferSetupXactEnv(ReorderBufferXactEnv *, bool process_invals); > > void > ReorderBufferTeardownXactEnv(ReorderBufferXactEnv *, bool is_error); > Thanks for the suggestion, I'll definitely consider that in the next version of the patch. regards -- Tomas Vondra EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
pgsql-hackers by date: