Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions - Mailing list pgsql-hackers

From Dilip Kumar
Subject Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions
Date
Msg-id CAFiTN-t1YNmoBf7k1kUrUue4q1Tf3GXjGwZTseNLivKmr9sz1Q@mail.gmail.com
Whole thread Raw
In response to Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions  (Amit Kapila <amit.kapila16@gmail.com>)
Responses Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions
List pgsql-hackers
On Wed, Feb 5, 2020 at 4:05 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, Feb 5, 2020 at 9:46 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> I think we don't need to maintain
> v8-0007-Support-logical_decoding_work_mem-set-from-create as per
> discussion in one of the above emails [1] as its usage is not clear.

Done

> v8-0008-Add-support-for-streaming-to-built-in-replication
> 1.
> -      information.  The allowed options are <literal>slot_name</literal> and
> -      <literal>synchronous_commit</literal>
> +      information.  The allowed options are <literal>slot_name</literal>,
> +      <literal>synchronous_commit</literal>, <literal>work_mem</literal>
> +      and <literal>streaming</literal>.
>
> As per the discussion above [1], I don't think we need work_mem here.
> You might want to remove the other usage from the patch as well.

Done

> 2.
> @@ -59,7 +59,8 @@ parse_subscription_options(List *options, bool
> *connect, bool *enabled_given,
>      bool *slot_name_given, char **slot_name,
>      bool *copy_data, char **synchronous_commit,
>      bool *refresh, int *logical_wm,
> -    bool *logical_wm_given)
> +    bool *logical_wm_given, bool *streaming,
> +    bool *streaming_given)
>
> It is not clear to me why we need two parameters 'streaming' and
> 'streaming_given' in this API.  Can't we handle similar to parameter
> 'refresh'?

The streaming option we need to update in the system table, so if we
don't remember whether the user has given its value or not then how we
will know whether to update this column or not?  Or you are suggesting
that we should always mark this as updated but IMHO that is not a good
idea.

> 3.
> diff --git a/src/backend/replication/logical/launcher.c
> b/src/backend/replication/logical/launcher.c
> index aec885e..e80d00c 100644
> --- a/src/backend/replication/logical/launcher.c
> +++ b/src/backend/replication/logical/launcher.c
> @@ -14,6 +14,8 @@
>   *
>   *-------------------------------------------------------------------------
>   */
> +#include <sys/types.h>
> +#include <unistd.h>
>
>  #include "postgres.h"
>
> I see only the above change in launcher.c.  Why we need to include
> these if there is no other change (at least not in this patch).

Removed

> 4.
> stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
>   /* Push callback + info on the error context stack */
>   state.ctx = ctx;
>   state.callback_name = "stream_start";
> - /* state.report_location = apply_lsn; */
> + state.report_location = InvalidXLogRecPtr;
>   errcallback.callback = output_plugin_error_callback;
>   errcallback.arg = (void *) &state;
>   errcallback.previous = error_context_stack;
> @@ -1194,7 +1194,7 @@ stream_stop_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn)
>   /* Push callback + info on the error context stack */
>   state.ctx = ctx;
>   state.callback_name = "stream_stop";
> - /* state.report_location = apply_lsn; */
> + state.report_location = InvalidXLogRecPtr;
>   errcallback.callback = output_plugin_error_callback;
>   errcallback.arg = (void *) &state;
>   errcallback.previous = error_context_stack;
>
> Don't we want to set txn->final_lsn in report location as we do at few
> other places?

Fixed

> 5.
> -logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
> +logicalrep_write_delete(StringInfo out, TransactionId xid,
> + Relation rel, HeapTuple oldtuple)
>  {
> + pq_sendbyte(out, 'D'); /* action DELETE */
> +
>   Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
>      rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
>      rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
>
> - pq_sendbyte(out, 'D'); /* action DELETE */
>
> Why this patch need to change the above code?

Fixed

> 6.
> +void
> +logicalrep_write_stream_start(StringInfo out,
> +   TransactionId xid, bool first_segment)
> +{
> + pq_sendbyte(out, 'S'); /* action STREAM START */
> +
> + Assert(TransactionIdIsValid(xid));
> +
> + /* transaction ID (we're starting to stream, so must be valid) */
> + pq_sendint32(out, xid);
> +
> + /* 1 if this is the first streaming segment for this xid */
> + pq_sendint32(out, first_segment ? 1 : 0);
> +}
> +
> +TransactionId
> +logicalrep_read_stream_start(StringInfo in, bool *first_segment)
> +{
> + TransactionId xid;
> +
> + Assert(first_segment);
> +
> + xid = pq_getmsgint(in, 4);
> + *first_segment = (pq_getmsgint(in, 4) == 1);
> +
> + return xid;
> +}
>
> In these functions for sending bool, pq_sendint32 is used.  Can't we
> use pq_sendbyte similar to what we do in boolsend?

Done

> 7.
> +void
> +logicalrep_write_stream_stop(StringInfo out, TransactionId xid)
> +{
> + pq_sendbyte(out, 'E'); /* action STREAM END */
> +
> + Assert(TransactionIdIsValid(xid));
> +
> + /* transaction ID (we're starting to stream, so must be valid) */
> + pq_sendint32(out, xid);
> +}
>
> In comments, 'starting to stream' is mentioned whereas this function
> is to stop it.

Fixed

> 8.
> +void
> +logicalrep_write_stream_stop(StringInfo out, TransactionId xid)
> +{
> + pq_sendbyte(out, 'E'); /* action STREAM END */
> +
> + Assert(TransactionIdIsValid(xid));
> +
> + /* transaction ID (we're starting to stream, so must be valid) */
> + pq_sendint32(out, xid);
> +}
> +
> +TransactionId
> +logicalrep_read_stream_stop(StringInfo in)
> +{
> + TransactionId xid;
> +
> + xid = pq_getmsgint(in, 4);
> +
> + return xid;
> +}
>
> Is there a reason to send xid on stopping stream?  I don't see any use
> of function logicalrep_read_stream_stop.

Removed

> 9.
> + * XXX Add calls to pgstat_report_wait_start/pgstat_report_wait_end.
> + */
> +static void
> +subxact_info_write(Oid subid, TransactionId xid)
> {
> ..
> + pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_WRITE);
> ..
> + pgstat_report_wait_end();
> ..
> }
>
> I see the calls to pgstat_report_wait_start/pgstat_report_wait_end in
> this function, so not sure if the above comment makes sense.

Fixed

> 10.
> + * The files are placed in /tmp by default, and the filenames include both
> + * the XID of the toplevel transaction and OID of the subscription.
>
> Are we keeping files in /tmp or pg's temp tablespace dir.  Seeing
> below code, it doesn't seem that we place them in /tmp.  If I am
> correct, then can you update the comment.
> +static void
> +subxact_filename(char *path, Oid subid, TransactionId xid)
> +{
> + char tempdirpath[MAXPGPATH];
> +
> + TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID);

Done

> 11.
> + * The change is serialied in a simple format, with length (not including
> + * the length), action code (identifying the message type) and message
> + * contents (without the subxact TransactionId value).
> + *
> ..
> + */
> +static void
> +stream_write_change(char action, StringInfo s)
>
> The part of the comment which says "with length (not including the
> length) .." is not clear to me.  What does "not including the length"
> mean?

Basically, it says that the 4 bytes which are used for storing then
the length of total data doesn't include the 4 bytes.

> 12.
> + * TODO: Add missing_ok flag to specify in which cases it's OK not to
> + * find the files, and when it's an error.
> + */
> +static void
> +stream_cleanup_files(Oid subid, TransactionId xid)
>
> I think we can implement this TODO.  It is clear when this function is
> called from apply_handle_stream_commit, the file must exist.  We can
> similarly analyze other callers of this API.

Done

> 13.
> +apply_handle_stream_abort(StringInfo s)
> {
> ..
> + /* FIXME optimize the search by bsearch on sorted data */
> + for (i = nsubxacts; i > 0; i--)
> ..
>
> I am not sure how important this optimization is, so instead of FIXME,
> it is better to keep it as a XXX comment.  In the future, if we hit
> any performance issue due to this, we can revisit our decision.

Done

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachment

pgsql-hackers by date:

Previous
From: Andy Fan
Date:
Subject: Re: [PATCH] Erase the distinctClause if the result is unique by definition
Next
From: Craig Ringer
Date:
Subject: Re: POC: GUC option for skipping shared buffers in core dumps