Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions - Mailing list pgsql-hackers

From Amit Kapila
Subject Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions
Date
Msg-id CAA4eK1LT2UArE9FyPqZcmV2kALhEmcXnZGdreYWCdCJ9vRUd8Q@mail.gmail.com
Whole thread Raw
In response to Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions  (Dilip Kumar <dilipbalaut@gmail.com>)
Responses Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions
Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions
List pgsql-hackers
On Wed, Feb 5, 2020 at 9:46 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> Fixed in the latest version sent upthread.
>

Okay, thanks.  I haven't looked at the latest version of patch series
as I was reviewing the previous version and I think all of these
comments are in the patch which is not modified.  Here are my
comments:

I think we don't need to maintain
v8-0007-Support-logical_decoding_work_mem-set-from-create as per
discussion in one of the above emails [1] as its usage is not clear.

v8-0008-Add-support-for-streaming-to-built-in-replication
1.
-      information.  The allowed options are <literal>slot_name</literal> and
-      <literal>synchronous_commit</literal>
+      information.  The allowed options are <literal>slot_name</literal>,
+      <literal>synchronous_commit</literal>, <literal>work_mem</literal>
+      and <literal>streaming</literal>.

As per the discussion above [1], I don't think we need work_mem here.
You might want to remove the other usage from the patch as well.

2.
@@ -59,7 +59,8 @@ parse_subscription_options(List *options, bool
*connect, bool *enabled_given,
     bool *slot_name_given, char **slot_name,
     bool *copy_data, char **synchronous_commit,
     bool *refresh, int *logical_wm,
-    bool *logical_wm_given)
+    bool *logical_wm_given, bool *streaming,
+    bool *streaming_given)

It is not clear to me why we need two parameters 'streaming' and
'streaming_given' in this API.  Can't we handle similar to parameter
'refresh'?

3.
diff --git a/src/backend/replication/logical/launcher.c
b/src/backend/replication/logical/launcher.c
index aec885e..e80d00c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -14,6 +14,8 @@
  *
  *-------------------------------------------------------------------------
  */
+#include <sys/types.h>
+#include <unistd.h>

 #include "postgres.h"

I see only the above change in launcher.c.  Why we need to include
these if there is no other change (at least not in this patch).

4.
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
  /* Push callback + info on the error context stack */
  state.ctx = ctx;
  state.callback_name = "stream_start";
- /* state.report_location = apply_lsn; */
+ state.report_location = InvalidXLogRecPtr;
  errcallback.callback = output_plugin_error_callback;
  errcallback.arg = (void *) &state;
  errcallback.previous = error_context_stack;
@@ -1194,7 +1194,7 @@ stream_stop_cb_wrapper(ReorderBuffer *cache,
ReorderBufferTXN *txn)
  /* Push callback + info on the error context stack */
  state.ctx = ctx;
  state.callback_name = "stream_stop";
- /* state.report_location = apply_lsn; */
+ state.report_location = InvalidXLogRecPtr;
  errcallback.callback = output_plugin_error_callback;
  errcallback.arg = (void *) &state;
  errcallback.previous = error_context_stack;

Don't we want to set txn->final_lsn in report location as we do at few
other places?

5.
-logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
+logicalrep_write_delete(StringInfo out, TransactionId xid,
+ Relation rel, HeapTuple oldtuple)
 {
+ pq_sendbyte(out, 'D'); /* action DELETE */
+
  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);

- pq_sendbyte(out, 'D'); /* action DELETE */

Why this patch need to change the above code?

6.
+void
+logicalrep_write_stream_start(StringInfo out,
+   TransactionId xid, bool first_segment)
+{
+ pq_sendbyte(out, 'S'); /* action STREAM START */
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* transaction ID (we're starting to stream, so must be valid) */
+ pq_sendint32(out, xid);
+
+ /* 1 if this is the first streaming segment for this xid */
+ pq_sendint32(out, first_segment ? 1 : 0);
+}
+
+TransactionId
+logicalrep_read_stream_start(StringInfo in, bool *first_segment)
+{
+ TransactionId xid;
+
+ Assert(first_segment);
+
+ xid = pq_getmsgint(in, 4);
+ *first_segment = (pq_getmsgint(in, 4) == 1);
+
+ return xid;
+}

In these functions for sending bool, pq_sendint32 is used.  Can't we
use pq_sendbyte similar to what we do in boolsend?

7.
+void
+logicalrep_write_stream_stop(StringInfo out, TransactionId xid)
+{
+ pq_sendbyte(out, 'E'); /* action STREAM END */
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* transaction ID (we're starting to stream, so must be valid) */
+ pq_sendint32(out, xid);
+}

In comments, 'starting to stream' is mentioned whereas this function
is to stop it.

8.
+void
+logicalrep_write_stream_stop(StringInfo out, TransactionId xid)
+{
+ pq_sendbyte(out, 'E'); /* action STREAM END */
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* transaction ID (we're starting to stream, so must be valid) */
+ pq_sendint32(out, xid);
+}
+
+TransactionId
+logicalrep_read_stream_stop(StringInfo in)
+{
+ TransactionId xid;
+
+ xid = pq_getmsgint(in, 4);
+
+ return xid;
+}

Is there a reason to send xid on stopping stream?  I don't see any use
of function logicalrep_read_stream_stop.

9.
+ * XXX Add calls to pgstat_report_wait_start/pgstat_report_wait_end.
+ */
+static void
+subxact_info_write(Oid subid, TransactionId xid)
{
..
+ pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_WRITE);
..
+ pgstat_report_wait_end();
..
}

I see the calls to pgstat_report_wait_start/pgstat_report_wait_end in
this function, so not sure if the above comment makes sense.

10.
+ * The files are placed in /tmp by default, and the filenames include both
+ * the XID of the toplevel transaction and OID of the subscription.

Are we keeping files in /tmp or pg's temp tablespace dir.  Seeing
below code, it doesn't seem that we place them in /tmp.  If I am
correct, then can you update the comment.
+static void
+subxact_filename(char *path, Oid subid, TransactionId xid)
+{
+ char tempdirpath[MAXPGPATH];
+
+ TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID);

11.
+ * The change is serialied in a simple format, with length (not including
+ * the length), action code (identifying the message type) and message
+ * contents (without the subxact TransactionId value).
+ *
..
+ */
+static void
+stream_write_change(char action, StringInfo s)

The part of the comment which says "with length (not including the
length) .." is not clear to me.  What does "not including the length"
mean?

12.
+ * TODO: Add missing_ok flag to specify in which cases it's OK not to
+ * find the files, and when it's an error.
+ */
+static void
+stream_cleanup_files(Oid subid, TransactionId xid)

I think we can implement this TODO.  It is clear when this function is
called from apply_handle_stream_commit, the file must exist.  We can
similarly analyze other callers of this API.

13.
+apply_handle_stream_abort(StringInfo s)
{
..
+ /* FIXME optimize the search by bsearch on sorted data */
+ for (i = nsubxacts; i > 0; i--)
..

I am not sure how important this optimization is, so instead of FIXME,
it is better to keep it as a XXX comment.  In the future, if we hit
any performance issue due to this, we can revisit our decision.

[1] - https://www.postgresql.org/message-id/CAA4eK1LH7xzF%2B-qHRv9EDXQTFYjPUYZw5B7FSK9QLEg7F603OQ%40mail.gmail.com


-- 
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



pgsql-hackers by date:

Previous
From: Masahiko Sawada
Date:
Subject: Re: Complete data erasure
Next
From: Masahiko Sawada
Date:
Subject: Identifying user-created objects