Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions - Mailing list pgsql-hackers
From | Dilip Kumar |
---|---|
Subject | Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions |
Date | |
Msg-id | CAFiTN-v_ydjaCksAA3obA67LaC5imN4mbH4J+vr+NBb6YPvmrA@mail.gmail.com |
In response to | Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions (Amit Kapila <amit.kapila16@gmail.com>) |
Responses | Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions |
List | pgsql-hackers |
On Wed, May 13, 2020 at 4:50 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, May 13, 2020 at 11:35 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> >
> > On Tue, May 12, 2020 at 4:39 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > >
> > >
> > > v20-0003-Extend-the-output-plugin-API-with-stream-methods
> > > ----------------------------------------------------------------------------------------
> > > 1.
> > > +static void
> > > +pg_decode_stream_change(LogicalDecodingContext *ctx,
> > > +                        ReorderBufferTXN *txn,
> > > +                        Relation relation,
> > > +                        ReorderBufferChange *change)
> > > +{
> > > +    OutputPluginPrepareWrite(ctx, true);
> > > +    appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
> > > +    OutputPluginWrite(ctx, true);
> > > +}
> > > +
> > > +static void
> > > +pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> > > +                          int nrelations, Relation relations[],
> > > +                          ReorderBufferChange *change)
> > > +{
> > > +    OutputPluginPrepareWrite(ctx, true);
> > > +    appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
> > > +    OutputPluginWrite(ctx, true);
> > > +}
> > >
> > > In the above and similar APIs, there are parameters like relation
> > > which are not used. I think you should add some comments atop these
> > > APIs to explain why it is so? I guess it is because we want to keep
> > > them similar to non-stream version of APIs and we can't display
> > > relation or other information as the transaction is still in-progress.
> >
> > I think because the interfaces are designed that way because other
> > decoding plugins might need it e.g. in pgoutput we need change and
> > relation but not here. We have other similar examples also e.g.
> > pg_decode_message has the parameter txn but not used. Do you think we
> > still need to add comments?
> >
>
> In that case, we can leave but lets ensure that we are not exposing
> any parameter which is not used and if there is any due to some
> reason, we should document it. I will also look into this.

Ok

> > > 4.
> > > +static void
> > > +stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
> > > +{
> > > +    LogicalDecodingContext *ctx = cache->private_data;
> > > +    LogicalErrorCallbackState state;
> > > +    ErrorContextCallback errcallback;
> > > +
> > > +    Assert(!ctx->fast_forward);
> > > +
> > > +    /* We're only supposed to call this when streaming is supported. */
> > > +    Assert(ctx->streaming);
> > > +
> > > +    /* Push callback + info on the error context stack */
> > > +    state.ctx = ctx;
> > > +    state.callback_name = "stream_start";
> > > +    /* state.report_location = apply_lsn; */
> > >
> > > Why can't we supply the report_location here? I think here we need to
> > > report txn->first_lsn if this is the very first stream and
> > > txn->final_lsn if it is any consecutive one.
> >
> > I am not sure about this, Because for the very first stream we will
> > report the location of the first lsn of the stream and for the
> > consecutive stream we will report the last lsn in the stream.
> >
>
> Yeah, that doesn't seem to be consistent. How about if get it as an
> additional parameter? The caller can pass the lsn of the very first
> change it is trying to decode in this stream.

Done

> > > 11.
> > > - * HeapTupleSatisfiesHistoricMVCC.
> > > + * tqual.c's HeapTupleSatisfiesHistoricMVCC.
> > > + *
> > > + * We do build the hash table even if there are no CIDs. That's
> > > + * because when streaming in-progress transactions we may run into
> > > + * tuples with the CID before actually decoding them. Think e.g. about
> > > + * INSERT followed by TRUNCATE, where the TRUNCATE may not be decoded
> > > + * yet when applying the INSERT. So we build a hash table so that
> > > + * ResolveCminCmaxDuringDecoding does not segfault in this case.
> > > + *
> > > + * XXX We might limit this behavior to streaming mode, and just bail
> > > + * out when decoding transaction at commit time (at which point it's
> > > + * guaranteed to see all CIDs).
> > > */
> > > static void
> > > ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
> > > @@ -1350,9 +1498,6 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer
> > > *rb, ReorderBufferTXN *txn)
> > > dlist_iter iter;
> > > HASHCTL hash_ctl;
> > >
> > > - if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
> > > -     return;
> > > -
> > >
> > > I don't understand this change. Why would "INSERT followed by
> > > TRUNCATE" could lead to a tuple which can come for decode before its
> > > CID?
> >
> > Actually, even if we haven't decoded the DDL operation but in the
> > actual system table the tuple might have been deleted from the next
> > operation. e.g. while we are streaming the INSERT it is possible that
> > the truncate has already deleted that tuple and set the max for the
> > tuple. So before streaming patch, we were only streaming the INSERT
> > only on commit so by that time we had got all the operation which has
> > done DDL and we would have already prepared tuple CID hash.
> >
>
> Okay, but I think for that case how good is that we always allow CID
> hash table to be built even if there are no catalog changes in TXN
> (see changes in ReorderBufferBuildTupleCidHash). Can't we detect that
> while resolving the cmin/cmax?

Done

> Few more comments for v20-0005-Implement-streaming-mode-in-ReorderBuffer:
> ----------------------------------------------------------------------------------------------------------------
> 1.
> /*
> - * Binary heap comparison function.
> + * Binary heap comparison function (regular non-streaming iterator).
> */
> static int
> ReorderBufferIterCompare(Datum a, Datum b, void *arg)
>
> It seems to me the above comment change is not required as per the latest patch.

Done

> 2.
> * For subtransactions, we only mark them as streamed when there are
> + * any changes in them.
> + *
> + * We do it this way because of aborts - we don't want to send aborts
> + * for XIDs the downstream is not aware of. And of course, it always
> + * knows about the toplevel xact (we send the XID in all messages),
> + * but we never stream XIDs of empty subxacts.
> + */
> + if ((!txn->toptxn) || (txn->nentries_mem != 0))
> +     txn->txn_flags |= RBTXN_IS_STREAMED;
>
> /when there are any changes in them/when there are changes in them. I
> think we don't need 'any' in the above sentence.

Done

> 3.
> And, during catalog scan we can check the status of the xid and
> + * if it is aborted we will report a specific error that we can ignore. We
> + * might have already streamed some of the changes for the aborted
> + * (sub)transaction, but that is fine because when we decode the abort we will
> + * stream abort message to truncate the changes in the subscriber.
> + */
> +static inline void
> +SetupCheckXidLive(TransactionId xid)
>
> In the above comment, I don't think it is right to say that we ignore
> the error raised due to the aborted transaction. We need to say that
> we discard the already streamed changes on such an error.

Done.

> 4.
> +static inline void
> +SetupCheckXidLive(TransactionId xid)
> +{
> /*
> - * If this transaction has no snapshot, it didn't make any changes to the
> - * database, so there's nothing to decode. Note that
> - * ReorderBufferCommitChild will have transferred any snapshots from
> - * subtransactions if there were any.
> + * setup CheckXidAlive if it's not committed yet. We don't check if the xid
> + * aborted. That will happen during catalog access. Also reset the
> + * sysbegin_called flag.
> */
> - if (txn->base_snapshot == NULL)
> + if (!TransactionIdDidCommit(xid))
> {
> -     Assert(txn->ninvalidations == 0);
> -     ReorderBufferCleanupTXN(rb, txn);
> -     return;
> +     CheckXidAlive = xid;
> +     bsysscan = false;
> }
>
> I think this function is inline as it needs to be called for each
> change. If that is the case and otherwise also, isn't it better that
> we check if passed xid is the same as CheckXidAlive before checking
> TransactionIdDidCommit as TransactionIdDidCommit can be costly and
> calling it for each change might not be a good idea?

Done. Also, I think it is good to check TransactionIdIsInProgress
instead of !TransactionIdDidCommit. I have changed that as well.

> 5.
> setup CheckXidAlive if it's not committed yet. We don't check if the xid
> + * aborted. That will happen during catalog access. Also reset the
> + * sysbegin_called flag.
>
> /if the xid aborted/if the xid is aborted. missing comma after Also.

Done

> 6.
> ReorderBufferProcessTXN()
> {
> ..
> - /* build data to be able to lookup the CommandIds of catalog tuples */
> + /*
> + * build data to be able to lookup the CommandIds of catalog tuples
> + */
> ReorderBufferBuildTupleCidHash(rb, txn);
> ..
> }
>
> Is there a need to change the formatting of the comment?

No need; changed back.

>
> 7.
> ReorderBufferProcessTXN()
> {
> ..
> if (using_subtxn)
> - BeginInternalSubTransaction("replay");
> + BeginInternalSubTransaction("stream");
> else
> StartTransactionCommand();
> ..
> }
>
> I am not sure changing unconditionally "replay" to "stream" is a good
> idea. How about something like BeginInternalSubTransaction(streaming
> ? "stream" : "replay");?

Done

> 8.
> @@ -1588,8 +1766,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> * use as a normal record. It'll be cleaned up at the end
> * of INSERT processing.
> */
> - if (specinsert == NULL)
> -     elog(ERROR, "invalid ordering of speculative insertion changes");
>
> You have removed this check but all other handling of specinsert is
> same as far as this patch is concerned. Why so?

Seems like a merge issue, or a leftover from the old design of the
toast handling, where we were streaming with the partial tuple.
Fixed now.

> 9.
> @@ -1676,8 +1860,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> * freed/reused while restoring spooled data from
> * disk.
> */
> - Assert(change->data.tp.newtuple != NULL);
> -
> dlist_delete(&change->node);
>
> Why is this Assert removed?

Same cause as above, so fixed.

> 10.
> @@ -1753,7 +1935,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> relations[nrelations++] = relation;
> }
>
> - rb->apply_truncate(rb, txn, nrelations, relations, change);
> + if (streaming)
> + {
> +     rb->stream_truncate(rb, txn, nrelations, relations, change);
> +
> +     /* Remember that we have sent some data. */
> +     change->txn->any_data_sent = true;
> + }
> + else
> +     rb->apply_truncate(rb, txn, nrelations, relations, change);
>
> Can we encapsulate this in a separate function like
> ReorderBufferApplyTruncate or something like that? Basically, rather
> than having streaming check in this function, lets do it in some other
> internal function. And we can likewise do it for all the streaming
> checks in this function or at least whereever it is feasible. That
> will make this function look clean.

Done for truncate and change. I think we can create a few more such
functions for start/stop and cleanup handling on error. I will work
on that.

> 11.
> + * We currently can only decode a transaction's contents when its commit
> + * record is read because that's the only place where we know about cache
> + * invalidations. Thus, once a toplevel commit is read, we iterate over the top
> + * and subtransactions (using a k-way merge) and replay the changes in lsn
> + * order.
> + */
> +void
> +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> {
> ..
>
> I think the above comment needs to be updated after this patch. This
> API can now be used during the decode of both a in-progress and a
> committed transaction.

Done

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
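
For reference, the idea behind review comment 4 (compare the incoming xid with CheckXidAlive before doing the costly status lookup) can be sketched standalone, outside the backend. This is only an illustration under stated assumptions, not the code from the patch: MockTransactionIdIsInProgress and the status_lookups counter are hypothetical stand-ins for TransactionIdIsInProgress, the globals are local copies of the names quoted above, and resetting CheckXidAlive in the else branch is my assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

/* Local stand-ins for the backend globals named in the review. */
static TransactionId CheckXidAlive = InvalidTransactionId;
static bool bsysscan = false;

/* Hypothetical instrumentation: counts how often the costly lookup runs. */
static int status_lookups = 0;

/*
 * Hypothetical mock of TransactionIdIsInProgress(). For this sketch every
 * odd xid is treated as "in progress" and every even xid as finished.
 */
static bool
MockTransactionIdIsInProgress(TransactionId xid)
{
    status_lookups++;
    return (xid % 2) == 1;
}

static inline void
SetupCheckXidLive(TransactionId xid)
{
    assert(xid != InvalidTransactionId);

    /* Cheap comparison first: same xid as the previous change, nothing to do. */
    if (xid == CheckXidAlive)
        return;

    /* Only now pay for the status lookup. */
    if (MockTransactionIdIsInProgress(xid))
    {
        CheckXidAlive = xid;
        bsysscan = false;
    }
    else
        CheckXidAlive = InvalidTransactionId;   /* assumption: reset when not in progress */
}
```

With this shape, calling SetupCheckXidLive repeatedly for changes of the same in-progress transaction performs the status lookup once rather than once per change. For an xid that is not in progress, the lookup still runs on every call in this sketch.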