Re: Parallel copy - Mailing list pgsql-hackers

From Amit Kapila
Subject Re: Parallel copy
Date
Msg-id CAA4eK1JEieFMtvyshMQivO1MEG3J7v0mc4GmsfpoiCEmnpvBiw@mail.gmail.com
In response to Re: Parallel copy  (vignesh C <vignesh21@gmail.com>)
Responses Re: Parallel copy  (vignesh C <vignesh21@gmail.com>)
List pgsql-hackers
On Fri, Jul 17, 2020 at 2:09 PM vignesh C <vignesh21@gmail.com> wrote:
>
> >
> > Please find the updated patch with the fixes included.
> >
>
> Patch 0003-Allow-copy-from-command-to-process-data-from-file-ST.patch
> had few indentation issues, I have fixed and attached the patch for
> the same.
>

Please ensure you include a version number with each patch-series, as
that makes it easier for the reviewer to verify the changes done in the
latest version of the patch.  One way is to use a command like "git
format-patch -6 -v <version_of_patch_series>", which names the files
v<N>-0001-*.patch and so on; alternatively, you can add the version
number manually.

Review comments:
===================

0001-Copy-code-readjustment-to-support-parallel-copy
1.
@@ -807,8 +835,11 @@ CopyLoadRawBuf(CopyState cstate)
  else
  nbytes = 0; /* no data need be saved */

+ if (cstate->copy_dest == COPY_NEW_FE)
+ minread = RAW_BUF_SIZE - nbytes;
+
  inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
-   1, RAW_BUF_SIZE - nbytes);
+   minread, RAW_BUF_SIZE - nbytes);

There is no comment explaining why this change is done.

0002-Framework-for-leader-worker-in-parallel-copy
2.
+ * ParallelCopyLineBoundary is common data structure between leader & worker,
+ * Leader process will be populating data block, data block offset & the size of
+ * the record in DSM for the workers to copy the data into the relation.
+ * This is protected by the following sequence in the leader & worker. If they
+ * don't follow this order the worker might process wrong line_size and leader
+ * might populate the information which worker has not yet processed or in the
+ * process of processing.
+ * Leader should operate in the following order:
+ * 1) check if line_size is -1, if not wait, it means worker is still
+ * processing.
+ * 2) set line_state to LINE_LEADER_POPULATING.
+ * 3) update first_block, start_offset & cur_lineno in any order.
+ * 4) update line_size.
+ * 5) update line_state to LINE_LEADER_POPULATED.
+ * Worker should operate in the following order:
+ * 1) check line_state is LINE_LEADER_POPULATED, if not it means leader is still
+ * populating the data.
+ * 2) read line_size.
+ * 3) only one worker should choose one line for processing, this is handled by
+ *    using pg_atomic_compare_exchange_u32, worker will change the sate to
+ *    LINE_WORKER_PROCESSING only if line_state is LINE_LEADER_POPULATED.
+ * 4) read first_block, start_offset & cur_lineno in any order.
+ * 5) process line_size data.
+ * 6) update line_size to -1.
+ */
+typedef struct ParallelCopyLineBoundary

Are we doing all this state management to avoid using locks while
processing lines?  If so, I think we can use either a spinlock or an
LWLock to keep the main patch simple and then provide a later patch to
make it lock-less.  That will allow us to first focus on the main
design of the patch rather than on trying to make this data structure
processing lock-less in the best possible way.
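
To make the suggestion concrete, here is a minimal sketch (mine, not
taken from the patch; the slock_t member, the field types, and the
function name ClaimLineForProcessing are assumptions, only the field
and constant names come from the quoted comment) of how a worker could
claim a line under a spinlock:

typedef struct ParallelCopyLineBoundary
{
    slock_t     mutex;          /* protects all fields below */
    uint32      first_block;
    uint32      start_offset;
    uint64      cur_lineno;
    int32       line_size;      /* -1 means the slot is free */
    uint32      line_state;     /* LINE_LEADER_POPULATED, ... */
} ParallelCopyLineBoundary;

/* Worker side: try to claim one populated line for processing. */
static bool
ClaimLineForProcessing(ParallelCopyLineBoundary *lineInfo, int32 *line_size)
{
    bool        claimed = false;

    SpinLockAcquire(&lineInfo->mutex);
    if (lineInfo->line_state == LINE_LEADER_POPULATED)
    {
        lineInfo->line_state = LINE_WORKER_PROCESSING;
        *line_size = lineInfo->line_size;
        claimed = true;
    }
    SpinLockRelease(&lineInfo->mutex);

    return claimed;
}

With the state transitions serialized by the spinlock, most of the
ordering rules documented in the above comment become unnecessary, and
the lock-less variant can still be layered on top in a follow-up patch.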

3.
+ /*
+ * Actual lines inserted by worker (some records will be filtered based on
+ * where condition).
+ */
+ pg_atomic_uint64 processed;
+ pg_atomic_uint64 total_worker_processed; /* total processed records by the workers */

The difference between processed and total_worker_processed is not
clear.  Can we expand the comments a bit?

4.
+ * SerializeList - Insert a list into shared memory.
+ */
+static void
+SerializeList(ParallelContext *pcxt, int key, List *inputlist,
+   Size est_list_size)
+{
+ if (inputlist != NIL)
+ {
+ ParallelCopyKeyListInfo *sharedlistinfo = (ParallelCopyKeyListInfo *) shm_toc_allocate(pcxt->toc,
+ est_list_size);
+ CopyListSharedMemory(inputlist, est_list_size, sharedlistinfo);
+ shm_toc_insert(pcxt->toc, key, sharedlistinfo);
+ }
+}

Why do we need to write a special mechanism (CopyListSharedMemory) to
serialize a list?  Why can't we use nodeToString?  It should be able
to take care of the List datatype, see outNode which is called from
nodeToString.  Once you do that, I think you won't even need
EstimateLineKeysList; strlen should work instead.  See the sketch
below for what I mean.

Also, please check whether you have any similar special handling for
other types that can be dealt with via nodeToString.
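
A minimal sketch of the nodeToString approach (my own illustration,
not the patch's code; the SerializeList/RestoreList signatures shown
here are assumptions):

static void
SerializeList(ParallelContext *pcxt, int key, List *inputlist)
{
    if (inputlist != NIL)
    {
        char    *liststr = nodeToString(inputlist);
        char    *listspace;

        listspace = (char *) shm_toc_allocate(pcxt->toc, strlen(liststr) + 1);
        strcpy(listspace, liststr);
        shm_toc_insert(pcxt->toc, key, listspace);
        pfree(liststr);
    }
}

/* Worker side: restore the list with stringToNode(). */
static List *
RestoreList(shm_toc *toc, int key)
{
    char    *liststr = (char *) shm_toc_lookup(toc, key, true);

    return liststr ? (List *) stringToNode(liststr) : NIL;
}

The shared-memory estimate for such a list then simply becomes
strlen(nodeToString(list)) + 1, so no separate EstimateLineKeysList is
required.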

5.
+ MemSet(shared_info_ptr, 0, est_shared_info);
+ shared_info_ptr->is_read_in_progress = true;
+ shared_info_ptr->cur_block_pos = -1;
+ shared_info_ptr->full_transaction_id = full_transaction_id;
+ shared_info_ptr->mycid = GetCurrentCommandId(true);
+ for (count = 0; count < RINGSIZE; count++)
+ {
+ ParallelCopyLineBoundary *lineInfo = &shared_info_ptr->line_boundaries.ring[count];
+ pg_atomic_init_u32(&(lineInfo->line_size), -1);
+ }
+

You can move this initialization into a separate function, along the
lines of the sketch below.
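
(A rough sketch only; the struct name ParallelCopyShmInfo, the function
name, and the parameter list are my assumptions, the body is just the
quoted initialization moved as-is.)

static void
InitializeParallelCopyShmInfo(ParallelCopyShmInfo *shared_info_ptr,
                              Size est_shared_info,
                              FullTransactionId full_transaction_id)
{
    int         count;

    MemSet(shared_info_ptr, 0, est_shared_info);
    shared_info_ptr->is_read_in_progress = true;
    shared_info_ptr->cur_block_pos = -1;
    shared_info_ptr->full_transaction_id = full_transaction_id;
    shared_info_ptr->mycid = GetCurrentCommandId(true);

    /* Mark every slot in the line-boundary ring as free. */
    for (count = 0; count < RINGSIZE; count++)
    {
        ParallelCopyLineBoundary *lineInfo = &shared_info_ptr->line_boundaries.ring[count];

        pg_atomic_init_u32(&(lineInfo->line_size), -1);
    }
}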

6.
In function BeginParallelCopy(), you need to keep a provision to
collect wal_usage and buf_usage stats.  See _bt_begin_parallel for
reference.  Those will be required for pg_stat_statements.
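
For reference, the pattern used by _bt_begin_parallel and
_bt_parallel_build_main looks roughly like the following (a sketch
only; the PARALLEL_COPY_KEY_* macros and the copy-specific parts are
placeholders, not names from the patch):

/* In BeginParallelCopy(): estimate and allocate per-worker usage arrays. */
shm_toc_estimate_chunk(&pcxt->estimator, mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
shm_toc_estimate_chunk(&pcxt->estimator, mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
...
walusage = shm_toc_allocate(pcxt->toc, mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_WAL_USAGE, walusage);
bufferusage = shm_toc_allocate(pcxt->toc, mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_BUFFER_USAGE, bufferusage);

/* In the worker entry point: bracket the copy work with instrumentation. */
InstrStartParallelQuery();
/* ... perform the actual copy ... */
walusage = shm_toc_lookup(toc, PARALLEL_COPY_KEY_WAL_USAGE, false);
bufferusage = shm_toc_lookup(toc, PARALLEL_COPY_KEY_BUFFER_USAGE, false);
InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
                      &walusage[ParallelWorkerNumber]);

/* In the leader, after WaitForParallelWorkersToFinish(). */
for (i = 0; i < pcxt->nworkers_launched; i++)
    InstrAccumParallelQuery(&bufferusage[i], &walusage[i]);

Without accumulating these in the leader, pg_stat_statements would
under-count the buffer and WAL usage of a parallel COPY.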

7.
DeserializeString() -- it is better to name this function as RestoreString.
ParallelWorkerInitialization() -- it is better to name this function
as InitializeParallelCopyInfo or something like that, the current name
is quite confusing.
ParallelCopyLeader() -- how about ParallelCopyFrom? ParallelCopyLeader
doesn't sound good to me.  You can suggest something else if you don't
like ParallelCopyFrom

8.
 /*
- * PopulateGlobalsForCopyFrom - Populates the common variables required for copy
- * from operation. This is a helper function for BeginCopy function.
+ * PopulateCatalogInformation - Populates the common variables required for copy
+ * from operation. This is a helper function for BeginCopy &
+ * ParallelWorkerInitialization function.
  */
 static void
 PopulateGlobalsForCopyFrom(CopyState cstate, TupleDesc tupDesc,
- List *attnamelist)
+    List *attnamelist)

The actual function name and the name in the function header don't
match.  I also don't like this function name; how about
PopulateCommonCstateInfo?  Similarly, how about changing
PopulateCatalogInformation to PopulateCstateCatalogInfo?

9.
+static const struct
+{
+ char *fn_name;
+ copy_data_source_cb fn_addr;
+} InternalParallelCopyFuncPtrs[] =
+
+{
+ {
+ "copy_read_data", copy_read_data
+ },
+};

The function copy_read_data is present in
src/backend/replication/logical/tablesync.c and seems to be used
during logical replication.  Why do we want to expose this function as
part of this patch?

0003-Allow-copy-from-command-to-process-data-from-file-ST
10.
In the commit message, you have written "The leader does not
participate in the insertion of data, leaders only responsibility will
be to identify the lines as fast as possible for the workers to do the
actual copy operation. The leader waits till all the lines populated
are processed by the workers and exits."

I think you should also mention that we have chosen this design based
on the reason "that everything stalls if the leader doesn't accept
further input data, as well as when there are no available split
chunks, so it doesn't seem like a good idea to have the leader do
other work.  This is backed by the performance data where we have seen
that with 1 worker there is just a 5-10% (or whatever percentage
difference you have seen) performance difference".

-- 
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com


