Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions - Mailing list pgsql-hackers
From: Dilip Kumar
Subject: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Msg-id: CAFiTN-vy3_KA-+kTff1=UvKasp0r1BDScDuJv5hN2SEw1PHn_A@mail.gmail.com
In response to: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions (Kuntal Ghosh <kuntalghosh.2007@gmail.com>)
List: pgsql-hackers
On Mon, Apr 13, 2020 at 11:43 PM Kuntal Ghosh <kuntalghosh.2007@gmail.com> wrote:
>
> On Mon, Apr 13, 2020 at 6:34 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> >
> Skipping 0003 for now. Review comments from 0004-Gracefully-handle-*.patch
>
> @@ -5490,6 +5523,14 @@ heap_finish_speculative(Relation relation,
>                          ItemPointer tid)
>     ItemId      lp = NULL;
>     HeapTupleHeader htup;
>
> +   /*
> +    * We don't expect direct calls to heap_hot_search with
> +    * valid CheckXidAlive for regular tables. Track that below.
> +    */
> +   if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
> +                !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
> +       elog(ERROR, "unexpected heap_hot_search call during logical decoding");
>
> The call is to heap_finish_speculative.

Fixed.

> @@ -481,6 +482,19 @@ systable_getnext(SysScanDesc sysscan)
>         }
>     }
>
> +   if (TransactionIdIsValid(CheckXidAlive) &&
> +       !TransactionIdIsInProgress(CheckXidAlive) &&
> +       !TransactionIdDidCommit(CheckXidAlive))
> +       ereport(ERROR,
> +               (errcode(ERRCODE_TRANSACTION_ROLLBACK),
> +                errmsg("transaction aborted during system catalog scan")));
>
> s/transaction aborted/transaction aborted concurrently perhaps? Also,
> can we move this check to the beginning of the function? If the
> condition fails, we can skip the sys scan.

We must check this after we get the tuple, because our goal is not to
decode based on a wrong tuple. If we moved the check to the beginning,
the transaction could abort right after the check. Once we have the
tuple, and the transaction was alive at that time, it doesn't matter
even if it aborts afterwards, because we have already got the right
tuple.

> Some of the checks look repetitive in the same file. Should we
> declare them as inline functions?
>
> Review comments from 0005-Implement-streaming*.patch
>
> +static void
> +AssertChangeLsnOrder(ReorderBufferTXN *txn)
> +{
> +#ifdef USE_ASSERT_CHECKING
> +   dlist_iter iter;
> ...
> +#endif
> +}
>
> We can implement the same as following:
>
> #ifdef USE_ASSERT_CHECKING
> static void
> AssertChangeLsnOrder(ReorderBufferTXN *txn)
> {
>     dlist_iter iter;
>     ...
> }
> #else
> #define AssertChangeLsnOrder(txn) ((void) true)
> #endif

I am not sure; that doesn't look clean to me. Moreover, other similar
functions are defined the same way, e.g. AssertTXNLsnOrder.

> + * if it is aborted we will report an specific error which we can ignore. We
> s/an specific/a specific

Done.

> + * Set the last last of the stream as the final lsn before calling
> + * stream stop.
> s/last last/last
>
> PG_CATCH();
> {
> +   MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
> +   ErrorData  *errdata = CopyErrorData();
>
> When we don't re-throw, the errdata should be freed by calling
> FreeErrorData(errdata), right?

Done.

> +   /*
> +    * Set the last last of the stream as the final lsn before
> +    * calling stream stop.
> +    */
> +   txn->final_lsn = prev_lsn;
> +   rb->stream_stop(rb, txn);
> +
> +   FlushErrorState();
> + }
>
> stream_stop() can still throw some error, right? In that case, we
> should flush the error state before calling stream_stop().

Done.

> +   /*
> +    * Remember the command ID and snapshot if transaction is streaming
> +    * otherwise free the snapshot if we have copied it.
> +    */
> +   if (streaming)
> +   {
> +       txn->command_id = command_id;
> +
> +       /* Avoid copying if it's already copied. */
> +       if (snapshot_now->copied)
> +           txn->snapshot_now = snapshot_now;
> +       else
> +           txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
> +                                                     txn, command_id);
> +   }
> +   else if (snapshot_now->copied)
> +       ReorderBufferFreeSnap(rb, snapshot_now);
>
> Hmm, it seems this part needs an assumption that after copying the
> snapshot, no subsequent step can throw any error. If they do, then we
> can again create a copy of the snapshot in catch block, which will
> leak some memory. Is my understanding correct?

Actually, in CATCH we copy only if the error is
ERRCODE_TRANSACTION_ROLLBACK, and that can occur only during a systable
scan. Basically, in the TRY block we copy the snapshot after we have
streamed all the changes, i.e. the systable scan is done; any error
raised after that point will not be ERRCODE_TRANSACTION_ROLLBACK, so we
will not copy again.

> + }
> + else
> + {
> +   ReorderBufferCleanupTXN(rb, txn);
> +   PG_RE_THROW();
> + }
>
> Shouldn't we switch back to the previously created error memory context
> before re-throwing?

Fixed.

> +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> +                   XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
> +                   TimestampTz commit_time,
> +                   RepOriginId origin_id, XLogRecPtr origin_lsn)
> +{
> +   ReorderBufferTXN *txn;
> +   volatile Snapshot snapshot_now;
> +   volatile CommandId command_id = FirstCommandId;
>
> In the modified ReorderBufferCommit(), why is it necessary to declare
> the above two variables as volatile? There is no try-catch block here.

Fixed.

> @@ -1946,6 +2284,13 @@ ReorderBufferAbort(ReorderBuffer *rb,
>                    TransactionId xid, XLogRecPtr lsn)
>     if (txn == NULL)
>         return;
>
> +   /*
> +    * When the (sub)transaction was streamed, notify the remote node
> +    * about the abort only if we have sent any data for this transaction.
> +    */
> +   if (rbtxn_is_streamed(txn) && txn->any_data_sent)
> +       rb->stream_abort(rb, txn, lsn);
> +
> s/When/If
>
> +   /*
> +    * When the (sub)transaction was streamed, notify the remote node
> +    * about the abort.
> +    */
> +   if (rbtxn_is_streamed(txn))
> +       rb->stream_abort(rb, txn, lsn);
>
> s/When/If. And, in this case, if we've not sent any data, why should
> we send the abort message (similar to the previous one)?

Fixed.

> + * Note: We never do both stream and serialize a transaction (we only spill
> + * to disk when streaming is not supported by the plugin), so only one of
> + * those two flags may be set at any given time.
> + */
> +#define rbtxn_is_streamed(txn) \
> +( \
> +   ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
> +)
>
> Should we put an assert (not necessarily here) to validate the above comment?

Because of toast handling, this assumption has changed, so I will remove
this note in that patch (0010).

> +   txn = ReorderBufferLargestTopTXN(rb);
> +
> +   /* we know there has to be one, because the size is not zero */
> +   Assert(txn && !txn->toptxn);
> +   Assert(txn->size > 0);
> +   Assert(rb->size >= txn->size);
>
> The same three assertions are already there in ReorderBufferLargestTopTXN().
>
> +static bool
> +ReorderBufferCanStream(ReorderBuffer *rb)
> +{
> +   LogicalDecodingContext *ctx = rb->private_data;
> +
> +   return ctx->streaming;
> +}
>
> Potential inline function.

Done.

> +static void
> +ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
> +{
> +   volatile Snapshot snapshot_now;
> +   volatile CommandId command_id;
>
> Here also, do we need to declare these two variables as volatile?

Done.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
Attachment
- v14-0001-Immediately-WAL-log-assignments.patch
- v14-0003-Extend-the-output-plugin-API-with-stream-methods.patch
- v14-0002-Issue-individual-invalidations-with.patch
- v14-0005-Implement-streaming-mode-in-ReorderBuffer.patch
- v14-0004-Gracefully-handle-concurrent-aborts-of-uncommitt.patch
- v14-0006-Add-support-for-streaming-to-built-in-replicatio.patch
- v14-0008-Enable-streaming-for-all-subscription-TAP-tests.patch
- v14-0009-Add-TAP-test-for-streaming-vs.-DDL.patch
- v14-0007-Track-statistics-for-streaming.patch
- v14-0010-Bugfix-handling-of-incomplete-toast-tuple.patch