Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions - Mailing list pgsql-hackers
From | Kuntal Ghosh |
---|---|
Subject | Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions |
Date | |
Msg-id | CAGz5QCLJK0QxV3jBcnEWd+cjz9en22=adQNFvmqpkCmR37wvZg@mail.gmail.com |
In response to | Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions (Dilip Kumar <dilipbalaut@gmail.com>) |
Responses | Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions |
List | pgsql-hackers |
On Mon, Apr 13, 2020 at 6:34 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>

Skipping 0003 for now.

Review comments from 0004-Gracefully-handle-*.patch

@@ -5490,6 +5523,14 @@ heap_finish_speculative(Relation relation, ItemPointer tid)
  ItemId lp = NULL;
  HeapTupleHeader htup;

+ /*
+  * We don't expect direct calls to heap_hot_search with
+  * valid CheckXidAlive for regular tables. Track that below.
+  */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
+     !(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
+     elog(ERROR, "unexpected heap_hot_search call during logical decoding");

The call is to heap_finish_speculative.

@@ -481,6 +482,19 @@ systable_getnext(SysScanDesc sysscan)
  }
  }

+ if (TransactionIdIsValid(CheckXidAlive) &&
+     !TransactionIdIsInProgress(CheckXidAlive) &&
+     !TransactionIdDidCommit(CheckXidAlive))
+     ereport(ERROR,
+             (errcode(ERRCODE_TRANSACTION_ROLLBACK),
+              errmsg("transaction aborted during system catalog scan")));

s/transaction aborted/transaction aborted concurrently perhaps? Also, can we move this check to the beginning of the function? If the condition fails, we can skip the sys scan.

Some of the checks look repetitive in the same file. Should we declare them as inline functions?

Review comments from 0005-Implement-streaming*.patch

+static void
+AssertChangeLsnOrder(ReorderBufferTXN *txn)
+{
+#ifdef USE_ASSERT_CHECKING
+ dlist_iter iter;
...
+#endif
+}

We can implement the same as follows:

#ifdef USE_ASSERT_CHECKING
static void
AssertChangeLsnOrder(ReorderBufferTXN *txn)
{
    dlist_iter iter;
    ...
}
#else
#define AssertChangeLsnOrder(txn) ((void)true)
#endif

+ * if it is aborted we will report an specific error which we can ignore. We

s/an specific/a specific

+ * Set the last last of the stream as the final lsn before calling
+ * stream stop.
s/last last/last

  PG_CATCH();
  {
+     MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
+     ErrorData *errdata = CopyErrorData();

When we don't re-throw, the errdata should be freed by calling FreeErrorData(errdata), right?

+ /*
+  * Set the last last of the stream as the final lsn before
+  * calling stream stop.
+  */
+ txn->final_lsn = prev_lsn;
+ rb->stream_stop(rb, txn);
+
+ FlushErrorState();
+ }

stream_stop() can still throw some error, right? In that case, we should flush the error state before calling stream_stop().

+ /*
+  * Remember the command ID and snapshot if transaction is streaming
+  * otherwise free the snapshot if we have copied it.
+  */
+ if (streaming)
+ {
+     txn->command_id = command_id;
+
+     /* Avoid copying if it's already copied. */
+     if (snapshot_now->copied)
+         txn->snapshot_now = snapshot_now;
+     else
+         txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
+                                                   txn, command_id);
+ }
+ else if (snapshot_now->copied)
+     ReorderBufferFreeSnap(rb, snapshot_now);

Hmm, it seems this part needs an assumption that after copying the snapshot, no subsequent step can throw any error. If one does, then we can again create a copy of the snapshot in the catch block, which will leak some memory. Is my understanding correct?

+ }
+ else
+ {
+     ReorderBufferCleanupTXN(rb, txn);
+     PG_RE_THROW();
+ }

Shouldn't we switch back to the previously created error memory context before re-throwing?

+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+                    XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+                    TimestampTz commit_time,
+                    RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+ volatile Snapshot snapshot_now;
+ volatile CommandId command_id = FirstCommandId;

In the modified ReorderBufferCommit(), why is it necessary to declare the above two variables as volatile? There is no try-catch block here.
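
For background on the volatile question: PG_TRY()/PG_CATCH() are built on sigsetjmp()/siglongjmp(), and in C a non-volatile local that is modified after setjmp() has an indeterminate value once longjmp() returns control there. The standalone sketch below illustrates that rule with plain setjmp()/longjmp() as a stand-in for PG_TRY. It is not PostgreSQL code; recovery_point, failing_step and both function names are hypothetical.

```c
#include <assert.h>
#include <setjmp.h>

/* Hypothetical stand-in for the jump buffer that PG_TRY() sets up. */
static jmp_buf recovery_point;

/* Simulates a callback that fails, much as elog(ERROR) would. */
static void
failing_step(void)
{
    longjmp(recovery_point, 1);
}

/*
 * A local modified inside the "try" section and read in the "catch"
 * section. Without volatile its value after longjmp() is indeterminate.
 */
int
command_id_seen_in_catch(void)
{
    volatile int command_id = 0;

    if (setjmp(recovery_point) == 0)
    {
        /* "try" section */
        command_id = 1;
        command_id = 2;
        failing_step();     /* jumps back to setjmp(), which returns 1 */
        command_id = 99;    /* never reached */
    }
    else
    {
        /* "catch" section: volatile guarantees the last store is visible */
        assert(command_id == 2);
    }

    return command_id;
}

/*
 * No setjmp() in this function, so a plain local is enough; this mirrors
 * the question about ReorderBufferCommit() having no try-catch block.
 */
int
command_id_without_try(void)
{
    int command_id = 0;

    command_id = 2;
    return command_id;
}
```

If this reading is right, the qualifier only matters in a function that both sets up the jump target and reads the variable after the jump, which is the case the review asks the patch author to confirm.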
@@ -1946,6 +2284,13 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
  if (txn == NULL)
      return;

+ /*
+  * When the (sub)transaction was streamed, notify the remote node
+  * about the abort only if we have sent any data for this transaction.
+  */
+ if (rbtxn_is_streamed(txn) && txn->any_data_sent)
+     rb->stream_abort(rb, txn, lsn);
+

s/When/If

+ /*
+  * When the (sub)transaction was streamed, notify the remote node
+  * about the abort.
+  */
+ if (rbtxn_is_streamed(txn))
+     rb->stream_abort(rb, txn, lsn);

s/When/If. And, in this case, if we've not sent any data, why should we send the abort message (similar to the previous one)?

+ * Note: We never do both stream and serialize a transaction (we only spill
+ * to disk when streaming is not supported by the plugin), so only one of
+ * those two flags may be set at any given time.
+ */
+#define rbtxn_is_streamed(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
+)

Should we put any assert (not necessarily here) to validate the above comment?

+ txn = ReorderBufferLargestTopTXN(rb);
+
+ /* we know there has to be one, because the size is not zero */
+ Assert(txn && !txn->toptxn);
+ Assert(txn->size > 0);
+ Assert(rb->size >= txn->size);

The same three assertions are already there in ReorderBufferLargestTopTXN().

+static bool
+ReorderBufferCanStream(ReorderBuffer *rb)
+{
+ LogicalDecodingContext *ctx = rb->private_data;
+
+ return ctx->streaming;
+}

Potential inline function.

+static void
+ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+ volatile Snapshot snapshot_now;
+ volatile CommandId command_id;

Here also, do we need to declare these two variables as volatile?

--
Thanks & Regards,
Kuntal Ghosh
EnterpriseDB: http://www.enterprisedb.com