Re: logical decoding and replication of sequences, take 2 - Mailing list pgsql-hackers
From | Andres Freund |
---|---|
Subject | Re: logical decoding and replication of sequences, take 2 |
Date | |
Msg-id | 20230111201216.xyc7rptmw23fdem4@awork3.anarazel.de Whole thread Raw |
In response to | Re: logical decoding and replication of sequences, take 2 (Tomas Vondra <tomas.vondra@enterprisedb.com>) |
Responses |
Re: logical decoding and replication of sequences, take 2
|
List | pgsql-hackers |
Hi, Heikki, CCed you due to the point about 2c03216d8311 below. On 2023-01-10 19:32:12 +0100, Tomas Vondra wrote: > 0001 is a fix for the pre-existing issue in logicalmsg_decode, > attempting to build a snapshot before getting into a consistent state. > AFAICS this only affects assert-enabled builds and is otherwise > harmless, because we are not actually using the snapshot (apply gets a > valid snapshot from the transaction). LGTM. > 0002 is a rebased version of the original approach, committed as > 0da92dc530 (and then reverted in 2c7ea57e56). This includes the same fix > as 0001 (for the sequence messages), the primary reason for the revert. > > The rebase was not quite straightforward, due to extensive changes in > how publications deal with tables/schemas, and so on. So this adopts > them, but other than that it behaves just like the original patch. This is a huge diff: > 72 files changed, 4715 insertions(+), 612 deletions(-) It'd be nice to split it to make review easier. Perhaps the sequence decoding support could be split from the whole publication rigamarole? > This does not include any changes to test_decoding and/or the built-in > replication - those will be committed in separate patches. Looks like that's not the case anymore? > +/* > + * Update the sequence state by modifying the existing sequence data row. > + * > + * This keeps the same relfilenode, so the behavior is non-transactional. > + */ > +static void > +SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called) > +{ > + SeqTable elm; > + Relation seqrel; > + Buffer buf; > + HeapTupleData seqdatatuple; > + Form_pg_sequence_data seq; > + > + /* open and lock sequence */ > + init_sequence(seqrelid, &elm, &seqrel); > + > + /* lock page' buffer and read tuple */ > + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); > + > + /* check the comment above nextval_internal()'s equivalent call. */ > + if (RelationNeedsWAL(seqrel)) > + { > + GetTopTransactionId(); > + > + if (XLogLogicalInfoActive()) > + GetCurrentTransactionId(); > + } > + > + /* ready to change the on-disk (or really, in-buffer) tuple */ > + START_CRIT_SECTION(); > + > + seq->last_value = last_value; > + seq->is_called = is_called; > + seq->log_cnt = log_cnt; > + > + MarkBufferDirty(buf); > + > + /* XLOG stuff */ > + if (RelationNeedsWAL(seqrel)) > + { > + xl_seq_rec xlrec; > + XLogRecPtr recptr; > + Page page = BufferGetPage(buf); > + > + XLogBeginInsert(); > + XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); > + > + xlrec.locator = seqrel->rd_locator; > + xlrec.created = false; > + > + XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); > + XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); > + > + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG); > + > + PageSetLSN(page, recptr); > + } > + > + END_CRIT_SECTION(); > + > + UnlockReleaseBuffer(buf); > + > + /* Clear local cache so that we don't think we have cached numbers */ > + /* Note that we do not change the currval() state */ > + elm->cached = elm->last; > + > + relation_close(seqrel, NoLock); > +} > + > +/* > + * Update the sequence state by creating a new relfilenode. > + * > + * This creates a new relfilenode, to allow transactional behavior. > + */ > +static void > +SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called) > +{ > + SeqTable elm; > + Relation seqrel; > + Buffer buf; > + HeapTupleData seqdatatuple; > + Form_pg_sequence_data seq; > + HeapTuple tuple; > + > + /* open and lock sequence */ > + init_sequence(seq_relid, &elm, &seqrel); > + > + /* lock page' buffer and read tuple */ > + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); > + > + /* Copy the existing sequence tuple. */ > + tuple = heap_copytuple(&seqdatatuple); > + > + /* Now we're done with the old page */ > + UnlockReleaseBuffer(buf); > + > + /* > + * Modify the copied tuple to update the sequence state (similar to what > + * ResetSequence does). > + */ > + seq = (Form_pg_sequence_data) GETSTRUCT(tuple); > + seq->last_value = last_value; > + seq->is_called = is_called; > + seq->log_cnt = log_cnt; > + > + /* > + * Create a new storage file for the sequence - this is needed for the > + * transactional behavior. > + */ > + RelationSetNewRelfilenumber(seqrel, seqrel->rd_rel->relpersistence); > + > + /* > + * Ensure sequence's relfrozenxid is at 0, since it won't contain any > + * unfrozen XIDs. Same with relminmxid, since a sequence will never > + * contain multixacts. > + */ > + Assert(seqrel->rd_rel->relfrozenxid == InvalidTransactionId); > + Assert(seqrel->rd_rel->relminmxid == InvalidMultiXactId); > + > + /* > + * Insert the modified tuple into the new storage file. This does all the > + * necessary WAL-logging etc. > + */ > + fill_seq_with_data(seqrel, tuple); > + > + /* Clear local cache so that we don't think we have cached numbers */ > + /* Note that we do not change the currval() state */ > + elm->cached = elm->last; > + > + relation_close(seqrel, NoLock); > +} > + > +/* > + * Set a sequence to a specified internal state. > + * > + * The change is made transactionally, so that on failure of the current > + * transaction, the sequence will be restored to its previous state. > + * We do that by creating a whole new relfilenode for the sequence; so this > + * works much like the rewriting forms of ALTER TABLE. > + * > + * Caller is assumed to have acquired AccessExclusiveLock on the sequence, > + * which must not be released until end of transaction. Caller is also > + * responsible for permissions checking. > + */ > +void > +SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called) > +{ > + if (transactional) > + SetSequence_transactional(seq_relid, last_value, log_cnt, is_called); > + else > + SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called); > +} That's a lot of duplication with existing code. There's no explanation why SetSequence() as well as do_setval() exists. > /* > * Initialize a sequence's relation with the specified tuple as content > * > @@ -406,8 +560,13 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum) > > /* check the comment above nextval_internal()'s equivalent call. */ > if (RelationNeedsWAL(rel)) > + { > GetTopTransactionId(); > > + if (XLogLogicalInfoActive()) > + GetCurrentTransactionId(); > + } Is it actually possible to reach this without an xid already having been assigned for the current xact? > @@ -806,10 +966,28 @@ nextval_internal(Oid relid, bool check_permissions) > * It's sufficient to ensure the toplevel transaction has an xid, no need > * to assign xids subxacts, that'll already trigger an appropriate wait. > * (Have to do that here, so we're outside the critical section) > + * > + * We have to ensure we have a proper XID, which will be included in > + * the XLOG record by XLogRecordAssemble. Otherwise the first nextval() > + * in a subxact (without any preceding changes) would get XID 0, and it > + * would then be impossible to decide which top xact it belongs to. > + * It'd also trigger assert in DecodeSequence. We only do that with > + * wal_level=logical, though. > + * > + * XXX This might seem unnecessary, because if there's no XID the xact > + * couldn't have done anything important yet, e.g. it could not have > + * created a sequence. But that's incorrect, because of subxacts. The > + * current subtransaction might not have done anything yet (thus no XID), > + * but an earlier one might have created the sequence. > */ What about restricting this to the case you're mentioning, i.e. subtransactions? > @@ -845,6 +1023,7 @@ nextval_internal(Oid relid, bool check_permissions) > seq->log_cnt = 0; > > xlrec.locator = seqrel->rd_locator; I realize this isn't from this patch, but: Why do we include the locator in the record? We already have it via XLogRegisterBuffer(), no? And afaict we don't even use it, as we read the page via XLogInitBufferForRedo() during recovery. Kinda looks like an oversight in 2c03216d8311 > +/* > + * Handle sequence decode > + * > + * Decoding sequences is a bit tricky, because while most sequence actions > + * are non-transactional (not subject to rollback), some need to be handled > + * as transactional. > + * > + * By default, a sequence increment is non-transactional - we must not queue > + * it in a transaction as other changes, because the transaction might get > + * rolled back and we'd discard the increment. The downstream would not be > + * notified about the increment, which is wrong. > + * > + * On the other hand, the sequence may be created in a transaction. In this > + * case we *should* queue the change as other changes in the transaction, > + * because we don't want to send the increments for unknown sequence to the > + * plugin - it might get confused about which sequence it's related to etc. > + */ > +void > +sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) > +{ > + /* extract the WAL record, with "created" flag */ > + xlrec = (xl_seq_rec *) XLogRecGetData(r); > + > + /* XXX how could we have sequence change without data? */ > + if(!datalen || !tupledata) > + return; Yea, I think we should error out here instead, something has gone quite wrong if this happens. > + tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); > + DecodeSeqTuple(tupledata, datalen, tuplebuf); > + > + /* > + * Should we handle the sequence increment as transactional or not? > + * > + * If the sequence was created in a still-running transaction, treat > + * it as transactional and queue the increments. Otherwise it needs > + * to be treated as non-transactional, in which case we send it to > + * the plugin right away. > + */ > + transactional = ReorderBufferSequenceIsTransactional(ctx->reorder, > + target_locator, > + xlrec->created); Why re-create this information during decoding, when we basically already have it available on the primary? I think we already pay the price for that tracking, which we e.g. use for doing a non-transactional truncate: /* * Normally, we need a transaction-safe truncation here. However, if * the table was either created in the current (sub)transaction or has * a new relfilenumber in the current (sub)transaction, then we can * just truncate it in-place, because a rollback would cause the whole * table or the current physical file to be thrown away anyway. */ if (rel->rd_createSubid == mySubid || rel->rd_newRelfilelocatorSubid == mySubid) { /* Immediate, non-rollbackable truncation is OK */ heap_truncate_one_rel(rel); } Afaict we could do something similar for sequences, except that I think we would just check if the sequence was created in the current transaction (i.e. any of the fields are set). > +/* > + * A transactional sequence increment is queued to be processed upon commit > + * and a non-transactional increment gets processed immediately. > + * > + * A sequence update may be both transactional and non-transactional. When > + * created in a running transaction, treat it as transactional and queue > + * the change in it. Otherwise treat it as non-transactional, so that we > + * don't forget the increment in case of a rollback. > + */ > +void > +ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, > + Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, > + RelFileLocator rlocator, bool transactional, bool created, > + ReorderBufferTupleBuf *tuplebuf) > + /* > + * Decoding needs access to syscaches et al., which in turn use > + * heavyweight locks and such. Thus we need to have enough state around to > + * keep track of those. The easiest way is to simply use a transaction > + * internally. That also allows us to easily enforce that nothing writes > + * to the database by checking for xid assignments. > + * > + * When we're called via the SQL SRF there's already a transaction > + * started, so start an explicit subtransaction there. > + */ > + using_subtxn = IsTransactionOrTransactionBlock(); This duplicates a lot of the code from ReorderBufferProcessTXN(). But only does so partially. It's hard to tell whether some of the differences are intentional. Could we de-duplicate that code with ReorderBufferProcessTXN()? Maybe something like void ReorderBufferSetupXactEnv(ReorderBufferXactEnv *, bool process_invals); void ReorderBufferTeardownXactEnv(ReorderBufferXactEnv *, bool is_error); Greetings, Andres Freund
pgsql-hackers by date: