Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions - Mailing list pgsql-hackers

From Dilip Kumar
Subject Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Date
Msg-id CAFiTN-twgco+2XcmVz=T-ZNtgOwc5vKEtBo_KLGHZnh3CqwJzA@mail.gmail.com
In response to Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions  (Amit Kapila <amit.kapila16@gmail.com>)
Responses Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
List pgsql-hackers
On Wed, Dec 11, 2019 at 5:22 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Mon, Dec 9, 2019 at 1:27 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> >
> > I have reviewed the patch set and here are a few comments/questions
> >
> > 1.
> > +static void
> > +pg_decode_stream_change(LogicalDecodingContext *ctx,
> > + ReorderBufferTXN *txn,
> > + Relation relation,
> > + ReorderBufferChange *change)
> > +{
> > + OutputPluginPrepareWrite(ctx, true);
> > + appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
> > + OutputPluginWrite(ctx, true);
> > +}
> >
> > Should we show the tuple in the streamed change like we do for the
> > pg_decode_change?
> >
>
> I think so.  The patch shows the message in
> pg_decode_stream_message(), so why prohibit showing the tuple here?
>
> > 2. pg_logical_slot_get_changes_guts
> > It recreates the decoding context [ctx =
> > CreateDecodingContext(InvalidXLogRecPtr] but doesn't set streaming
> > to false.  Should we pass a parameter to
> > pg_logical_slot_get_changes_guts saying whether we want streamed results or not?
> >
>
> CreateDecodingContext internally calls StartupDecodingContext which
> sets the value of streaming based on if the plugin has provided
> callbacks for streaming functions. Isn't that sufficient?  Why do we
> need additional parameters here?

I don't think we should stream merely because the plugin provides
streaming callbacks.  For example, the pgoutput plugin provides streaming
callbacks, but we only stream if streaming is turned on in the create
subscription command.  So I feel the same should hold for any plugin.
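
To make the point concrete, here is a minimal sketch of the decision I have
in mind.  The struct and function names below are hypothetical stand-ins
for illustration only, not the actual structures or code from the patch:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the streaming part of the plugin callbacks. */
typedef void (*stream_cb) (void);

typedef struct PluginCallbacks
{
	stream_cb	stream_start_cb;
	stream_cb	stream_stop_cb;
	stream_cb	stream_change_cb;
} PluginCallbacks;

/* The plugin is *capable* of streaming only if it supplies the callbacks. */
static bool
plugin_supports_streaming(const PluginCallbacks *cb)
{
	assert(cb != NULL);
	return cb->stream_start_cb != NULL &&
		cb->stream_stop_cb != NULL &&
		cb->stream_change_cb != NULL;
}

/*
 * Stream only when the plugin can do it AND the caller asked for it.
 * streaming_requested models a user-level option (assumed here), in the
 * same spirit as the streaming option of the create subscription command.
 */
static bool
should_stream(const PluginCallbacks *cb, bool streaming_requested)
{
	return streaming_requested && plugin_supports_streaming(cb);
}

/* Dummy callback so a "fully capable" plugin can be constructed. */
static void
dummy_cb(void)
{
}
```

With this shape, a plugin that provides all the callbacks still does not
stream unless the caller explicitly requests it.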

>
> > 3.
> > + XLogRecPtr prev_lsn = InvalidXLogRecPtr;
> >   ReorderBufferChange *change;
> >   ReorderBufferChange *specinsert = NULL;
> >
> > @@ -1565,6 +1965,16 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> >   Relation relation = NULL;
> >   Oid reloid;
> >
> > + /*
> > + * Enforce correct ordering of changes, merged from multiple
> > + * subtransactions. The changes may have the same LSN due to
> > + * MULTI_INSERT xlog records.
> > + */
> > + if (prev_lsn != InvalidXLogRecPtr)
> > + Assert(prev_lsn <= change->lsn);
> > +
> > + prev_lsn = change->lsn;
> > I did not understand how this change is relevant to this patch.
> >
>
> This is just to ensure that changes are in LSN order.  I think as we
> are merging the changes before commit for streaming, it is good to
> have such an Assertion for ReorderBufferStreamTXN.   And, if we want
> to have it in ReorderBufferStreamTXN, then there is no harm in keeping
> it in ReorderBufferCommit() at least to keep the code consistent.  Do
> you see any problem with this?
I am fine with this.
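
Just to restate my understanding of what the assertion guarantees, here is
a standalone sketch.  The typedef and the helper name are simplified
assumptions for illustration, not the reorderbuffer code itself:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins for the real definitions. */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/*
 * Return true if the LSNs are in non-decreasing order.  Equal LSNs are
 * allowed, because several changes can come from one MULTI_INSERT record.
 */
static bool
changes_in_lsn_order(const XLogRecPtr *lsns, size_t nchanges)
{
	XLogRecPtr	prev_lsn = InvalidXLogRecPtr;

	assert(lsns != NULL || nchanges == 0);

	for (size_t i = 0; i < nchanges; i++)
	{
		/* same condition as the Assert in the quoted hunk, negated */
		if (prev_lsn != InvalidXLogRecPtr && prev_lsn > lsns[i])
			return false;

		prev_lsn = lsns[i];
	}

	return true;
}
```

The same check would apply unchanged to the merged change list in either
the commit path or the streaming path.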
>
> > 4.
> > + /*
> > + * TOCHECK: We have to rebuild historic snapshot to be sure it includes all
> > + * information about subtransactions, which could arrive after streaming start.
> > + */
> > + if (!txn->is_schema_sent)
> > + snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
> > + txn, command_id);
> >
> > In which case will txn->is_schema_sent be true?  At the end of
> > each stream, ReorderBufferExecuteInvalidations always sets it to
> > false, so it will always be false when we send the next stream.
> > That means we never need the snapshot_now variable in
> > ReorderBufferTXN.
> >
>
> You are probably right, but as discussed we need to change this part
> of design/code (when to send schema changes) due to the issues
> discovered.  So, I think this part will anyway change when we fix that
> problem.
Makes sense.
>
> > 5.
> > @@ -2299,6 +2746,23 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
> >   txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
> >
> >   txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
> > +
> > + /*
> > + * We read catalog changes from WAL, which are not yet sent, so
> > + * invalidate current schema in order output plugin can resend
> > + * schema again.
> > + */
> > + txn->is_schema_sent = false;
> >
> > Same as point 4, during decode time it will never be true.
> >
>
> Sure, my previous point's reply applies here as well.
ok
>
> > 6.
> > + /* send fields */
> > + pq_sendint64(out, commit_lsn);
> > + pq_sendint64(out, txn->end_lsn);
> > + pq_sendint64(out, txn->commit_time);
> >
> > commit_time and end_lsn are used in standby_feedback
> >
>
> I don't understand what you mean by this.  Can you be a bit more clear?
I think I pasted it here by mistake.  Just ignore it.
>
> >
> > 7.
> > + /* FIXME optimize the search by bsearch on sorted data */
> > + for (i = nsubxacts; i > 0; i--)
> > + {
> > + if (subxacts[i - 1].xid == subxid)
> > + {
> > + subidx = (i - 1);
> > + found = true;
> > + break;
> > + }
> > + }
> > We cannot roll back an intermediate subtransaction without rolling
> > back the latest subtransaction, so why do we need
> > to search the array?  It will always be the last subxact, no?
> >
>
> The same thing is already mentioned in the comments above this code
> ("XXX Or perhaps we can rely on the aborts to arrive in the reverse
> order, i.e. from the inner-most subxact (when nested)? In which case
> we could simply check the last element.").  I think what you are
> saying is probably right, but we can leave this as it is for now
> because this is a minor optimization which can be done later as well
> if required.  However, if you see any correctness issue, then we can
> discuss.
I think that, more than an optimization, the question here is whether
this loop is required at all.  By optimizing we are not adding
complexity; in fact, the code becomes simpler.  I think we need more
analysis of whether we need to traverse the array at all.  So maybe
for the time being we can leave this as it is.
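
For reference, the two alternatives look roughly like this.  This is only
a sketch: the struct and function names are hypothetical, and the second
variant is correct only under the still unverified assumption that aborts
always arrive from the innermost subxact first:

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical, reduced version of the per-subxact bookkeeping entry. */
typedef struct SubXactInfo
{
	TransactionId xid;
} SubXactInfo;

/* Backward scan over the whole array, as in the quoted patch hunk. */
static int
find_subxact_scan(const SubXactInfo *subxacts, int nsubxacts,
				  TransactionId subxid)
{
	assert(nsubxacts >= 0);

	for (int i = nsubxacts; i > 0; i--)
	{
		if (subxacts[i - 1].xid == subxid)
			return i - 1;
	}

	return -1;					/* not found */
}

/*
 * Look only at the last element.  Valid ONLY IF the aborted subxact is
 * always the most recently added one (assumption, needs analysis).
 */
static int
find_subxact_last(const SubXactInfo *subxacts, int nsubxacts,
				  TransactionId subxid)
{
	assert(nsubxacts >= 0);

	if (nsubxacts > 0 && subxacts[nsubxacts - 1].xid == subxid)
		return nsubxacts - 1;

	return -1;					/* not the last one */
}
```

If the assumption holds, both functions return the same index for every
abort we can receive, and the loop can go away entirely.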
>
> > 8.
> > + /*
> > + * send feedback to upstream
> > + *
> > + * XXX Probably should send a valid LSN. But which one?
> > + */
> > + send_feedback(InvalidXLogRecPtr, false, false);
> >
> > Why is feedback sent for every change?
> >
>
> I will study this part of the patch and let you know my opinion.
Sure.
>
> Few comments on this patch series:
>
> 0001-Immediately-WAL-log-assignments:
> ------------------------------------------------------------
>
> The commit message still refers to the old design for this patch.  I
> think you need to modify the commit message as per the latest patch.
>
> 0002-Issue-individual-invalidations-with-wal_level-log
> ----------------------------------------------------------------------------
> 1.
> xact_desc_invalidations(StringInfo buf,
> {
> ..
> + else if (msg->id == SHAREDINVALSNAPSHOT_ID)
> + appendStringInfo(buf, " snapshot %u", msg->sn.relId);
>
> You have removed logging for the above cache but forgot to remove its
> reference from one of the places.  Also, I think you need to add a
> comment somewhere in inval.c to say why you are writing WAL for
> some types of invalidations and not for others.
>
> 0003-Extend-the-output-plugin-API-with-stream-methods
> --------------------------------------------------------------------------------
> 1.
> +     are required, while <function>stream_message_cb</function> and
> +     <function>stream_message_cb</function> are optional.
>
> stream_message_cb is mentioned twice.  It seems the second one is for truncate.
>
> 2.
> size of the transaction size and network bandwidth, the transfer time
> +    may significantly increase the apply lag.
>
> /size of the transaction size/size of the transaction
>
> no need to mention size twice.
>
> 3.
> +    Similarly to spill-to-disk behavior, streaming is triggered when the total
> +    amount of changes decoded from the WAL (for all in-progress transactions)
> +    exceeds limit defined by <varname>logical_work_mem</varname> setting.
>
> The guc name used is wrong.  /Similarly to/Similar to/
>
> 4.
> stream_start_cb_wrapper()
> {
> ..
> + /* state.report_location = apply_lsn; */
> ..
> + /* FIXME ctx->write_location = apply_lsn; */
> ..
> }
>
> See if we can fix these and the similar ones in the stop callback.  I
> think we don't have final_lsn until we commit/abort.  Can we compute
> it before calling these APIs?
>
>
> 0005-Gracefully-handle-concurrent-aborts-of-uncommitte
> ----------------------------------------------------------------------------------
> 1.
> @@ -1877,6 +1877,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
>   PG_CATCH();
>   {
>   /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
> +
>   if (iterstate)
>   ReorderBufferIterTXNFinish(rb, iterstate);
>
> Spurious line change.
>
> 2. The commit message of this patch refers to Prepared transactions.
> I think that needs to be changed.
>
> 0006-Implement-streaming-mode-in-ReorderBuffer
> -------------------------------------------------------------------------
> 1.
> +
> +/* iterator for streaming (only get data from memory) */
> +static ReorderBufferStreamIterTXNState * ReorderBufferStreamIterTXNInit(
> +    ReorderBuffer *rb,
> +    ReorderBufferTXN *txn);
> +
> +static ReorderBufferChange *ReorderBufferStreamIterTXNNext(
> +    ReorderBuffer *rb,
> +    ReorderBufferStreamIterTXNState * state);
> +
> +static void ReorderBufferStreamIterTXNFinish(
> +    ReorderBuffer *rb,
> +    ReorderBufferStreamIterTXNState * state);
>
> Do we really need to introduce new APIs for iterating over changes
> from streamed transactions?  Why can't we reuse the same APIs as we
> use for committed xacts?
>
> 2.
> +static void
> +ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
>
> Please write some comments atop ReorderBufferStreamCommit.
>
> 3.
> +ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
> {
> ..
> ..
> + if (txn->snapshot_now == NULL)
> + {
> + dlist_iter subxact_i;
> +
> + /* make sure this transaction is streamed for the first time */
> + Assert(!rbtxn_is_streamed(txn));
> +
> + /* at the beginning we should have invalid command ID */
> + Assert(txn->command_id == InvalidCommandId);
> +
> + dlist_foreach(subxact_i, &txn->subtxns)
> + {
> + ReorderBufferTXN *subtxn;
> +
> + subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
> +
> + if (subtxn->base_snapshot != NULL &&
> + (txn->base_snapshot == NULL ||
> + txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
> + {
> + txn->base_snapshot = subtxn->base_snapshot;
>
> The logic here seems to be correct, but I am not sure why it is not
> considered to purge the base snapshot before assigning the subtxn's
> snapshot and similarly, we have not purged snapshot for subtxn once we
> are done with it.  I think we can use
> ReorderBufferTransferSnapToParent to replace part of the logic here.
> Do you see any reason for doing things differently here?
>
> 4. In ReorderBufferStreamTXN, why do you need to use
> ReorderBufferCopySnap to assign txn->base_snapshot to snapshot_now?
>
> 5. I see a lot of code similarity between ReorderBufferStreamTXN and
> the existing ReorderBufferCommit.  I understand that there are some
> subtle differences due to which we need to write this new function,
> but can't we encapsulate the specific parts of the code in functions
> and then call them from both places?  I am talking about the code in
> the different cases for change->action.
>
> 6. + * Note: We never stream and serialize a transaction at the same time (e
> /(e/(we
>
I will look into these comments and reply separately.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com


