Re: Parallel copy - Mailing list pgsql-hackers

From vignesh C
Subject Re: Parallel copy
Msg-id CALDaNm2yhZQX57PTqOoWN2bgkXzK=OgAyJAB=bN6HZMJVXUMjg@mail.gmail.com
In response to Re: Parallel copy  (Amit Kapila <amit.kapila16@gmail.com>)
Responses Re: Parallel copy
List pgsql-hackers
Thanks for the comments, Amit.
On Wed, Jul 15, 2020 at 10:34 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> Few comments:
> ====================
> 0001-Copy-code-readjustment-to-support-parallel-copy
>
> I am not sure converting the code to macros is a good idea, it makes
> this code harder to read.  Also, there are a few changes which I am
> not sure are necessary.
> 1.
> +/*
> + * CLEAR_EOL_FROM_COPIED_DATA - Clear EOL from the copied data.
> + */
> +#define CLEAR_EOL_FROM_COPIED_DATA(copy_line_data, copy_line_pos, copy_line_size) \
> +{ \
> + /* \
> + * If we didn't hit EOF, then we must have transferred the EOL marker \
> + * to line_buf along with the data.  Get rid of it. \
> + */ \
> +   switch (cstate->eol_type) \
> +   { \
> +    case EOL_NL: \
> +    Assert(copy_line_size >= 1); \
> +    Assert(copy_line_data[copy_line_pos - 1] == '\n'); \
> +    copy_line_data[copy_line_pos - 1] = '\0'; \
> +    copy_line_size--; \
> +    break; \
> +    case EOL_CR: \
> +    Assert(copy_line_size >= 1); \
> +    Assert(copy_line_data[copy_line_pos - 1] == '\r'); \
> +    copy_line_data[copy_line_pos - 1] = '\0'; \
> +    copy_line_size--; \
> +    break; \
> +    case EOL_CRNL: \
> +    Assert(copy_line_size >= 2); \
> +    Assert(copy_line_data[copy_line_pos - 2] == '\r'); \
> +    Assert(copy_line_data[copy_line_pos - 1] == '\n'); \
> +    copy_line_data[copy_line_pos - 2] = '\0'; \
> +    copy_line_size -= 2; \
> +    break; \
> +    case EOL_UNKNOWN: \
> +    /* shouldn't get here */ \
> +    Assert(false); \
> +    break; \
> +   } \
> +}
>
> In the original code, we are using only len and buffer, here we are
> using position, length/size and buffer.  Is it really required or can
> we do with just len and buffer?
>

Position is required so that parallel and non-parallel copy can share
common code. In parallel copy, position and length will differ because
a line can be spread across multiple data blocks. I have retained the
variables as they are and changed the macro to a function.
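
For illustration only, here is a minimal sketch of what the function
form could look like. The function name, the parameter order, and the
local EolType enum are assumptions made for this example, not the
code in the updated patch; plain assert() stands in for Assert().

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the EolType enum used by COPY. */
typedef enum { EOL_UNKNOWN, EOL_NL, EOL_CR, EOL_CRNL } EolType;

/*
 * Strip the trailing EOL marker from a copied line.
 *
 * 'pos' is the offset just past the last byte written into 'data'.
 * '*size' is the logical line length, which can differ from 'pos'
 * when a line is spread across several data blocks.
 */
static void
clear_eol_from_copied_data(EolType eol_type, char *data, int pos, int *size)
{
    assert(data != NULL && size != NULL);

    switch (eol_type)
    {
        case EOL_NL:
            assert(*size >= 1);
            assert(data[pos - 1] == '\n');
            data[pos - 1] = '\0';
            (*size)--;
            break;
        case EOL_CR:
            assert(*size >= 1);
            assert(data[pos - 1] == '\r');
            data[pos - 1] = '\0';
            (*size)--;
            break;
        case EOL_CRNL:
            assert(*size >= 2);
            assert(data[pos - 2] == '\r');
            assert(data[pos - 1] == '\n');
            data[pos - 2] = '\0';
            *size -= 2;
            break;
        case EOL_UNKNOWN:
            /* shouldn't get here */
            assert(0);
            break;
    }
}
```

Passing the size by pointer keeps the decrement visible to the caller,
which the macro achieved by textual substitution.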

> 2.
> +/*
> + * INCREMENTPROCESSED - Increment the lines processed.
> + */
> +#define INCREMENTPROCESSED(processed)  \
> +processed++;
> +
> +/*
> + * GETPROCESSED - Get the lines processed.
> + */
> +#define GETPROCESSED(processed) \
> +return processed;
> +
>
> I don't like converting above to macros.  I don't think converting
> such things to macros will buy us much.
>

This macro will be extended in
0003-Allow-copy-from-command-to-process-data-from-file.patch:
+#define INCREMENTPROCESSED(processed) \
+{ \
+       if (!IsParallelCopy()) \
+               processed++; \
+       else \
+               pg_atomic_add_fetch_u64(&cstate->pcdata->pcshared_info->processed, 1); \
+}

This needs to be a macro so that it can handle both parallel and
non-parallel copy. I am retaining it as a macro; if you insist, I can
move the change to the
0003-Allow-copy-from-command-to-process-data-from-file.patch patch.
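
For comparison, the same branching can be sketched as a pair of inline
functions instead of macros. This is only an illustration under stated
assumptions: the CopyCounter and SharedInfo structs and the is_parallel
flag are hypothetical simplifications of cstate and IsParallelCopy(),
and C11 atomic_fetch_add() stands in for pg_atomic_add_fetch_u64().

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the shared-memory state seen by all workers. */
typedef struct SharedInfo
{
    _Atomic uint64_t processed;
} SharedInfo;

/* Hypothetical, simplified stand-in for the relevant parts of cstate. */
typedef struct CopyCounter
{
    bool        is_parallel;    /* stand-in for IsParallelCopy() */
    uint64_t    processed;      /* backend-local counter */
    SharedInfo *shared;         /* only used in parallel mode */
} CopyCounter;

/* Count one processed line, locally or in the shared counter. */
static inline void
increment_processed(CopyCounter *c)
{
    if (!c->is_parallel)
        c->processed++;
    else
    {
        assert(c->shared != NULL);
        atomic_fetch_add(&c->shared->processed, 1);
    }
}

/* Return the number of lines processed so far. */
static inline uint64_t
get_processed(const CopyCounter *c)
{
    if (!c->is_parallel)
        return c->processed;
    assert(c->shared != NULL);
    return atomic_load(&c->shared->processed);
}
```

Whether a function or a macro reads better here is the open question
in this review item; the sketch only shows that both modes can be
handled without a macro.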


> 0002-Framework-for-leader-worker-in-parallel-copy
> 3.
>  /*
> + * Copy data block information.
> + */
> +typedef struct ParallelCopyDataBlock
>
> It is better to add a few comments atop this data structure to explain
> how it is used?
>

Fixed.

> 4.
> + * ParallelCopyLineBoundary is common data structure between leader & worker,
> + * this is protected by the following sequence in the leader & worker.
> + * Leader should operate in the following order:
> + * 1) update first_block, start_offset & cur_lineno in any order.
> + * 2) update line_size.
> + * 3) update line_state.
> + * Worker should operate in the following order:
> + * 1) read line_size.
> + * 2) only one worker should choose one line for processing, this is handled by
> + *    using pg_atomic_compare_exchange_u32, worker will change the sate to
> + *    LINE_WORKER_PROCESSING only if line_state is LINE_LEADER_POPULATED.
> + * 3) read first_block, start_offset & cur_lineno in any order.
> + */
> +typedef struct ParallelCopyLineBoundary
>
> Here, you have mentioned how workers and leader should operate to make
> sure access to the data is sane.  However, you have not explained what
> is the problem if they don't do so and it is not apparent to me.
> Also, it is not very clear what is the purpose of this data structure
> from comments.
>

Fixed
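
To make the ordering in the quoted comment concrete, here is a rough
sketch of the leader and worker steps. It is not the patch code: the
struct layout, the LINE_INIT state, and the function names are
assumptions for this example, and C11 sequentially consistent atomics
stand in for the pg_atomic_* primitives.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* LINE_INIT is a hypothetical "not yet populated" state. */
enum { LINE_INIT, LINE_LEADER_POPULATED, LINE_WORKER_PROCESSING };

/* Hypothetical, simplified version of ParallelCopyLineBoundary. */
typedef struct LineBoundary
{
    uint32_t         first_block;
    uint32_t         start_offset;
    uint64_t         cur_lineno;
    _Atomic uint32_t line_size;
    _Atomic uint32_t line_state;
} LineBoundary;

/* Leader: fill the plain fields, then line_size, then publish line_state. */
static void
leader_publish(LineBoundary *lb, uint32_t block, uint32_t offset,
               uint64_t lineno, uint32_t size)
{
    assert(lb != NULL);
    lb->first_block = block;        /* step 1: any order */
    lb->start_offset = offset;
    lb->cur_lineno = lineno;
    atomic_store(&lb->line_size, size);                     /* step 2 */
    atomic_store(&lb->line_state, LINE_LEADER_POPULATED);   /* step 3 */
}

/*
 * Worker: read line_size, then try to claim the line.  Only the worker
 * whose compare-and-exchange succeeds may go on to read first_block,
 * start_offset and cur_lineno.
 */
static bool
worker_claim(LineBoundary *lb, uint32_t *size)
{
    uint32_t expected = LINE_LEADER_POPULATED;

    assert(lb != NULL && size != NULL);
    *size = atomic_load(&lb->line_size);                    /* step 1 */
    return atomic_compare_exchange_strong(&lb->line_state, &expected,
                                          LINE_WORKER_PROCESSING); /* step 2 */
}
```

Because line_state is written last by the leader and claimed with a
compare-and-exchange by the worker, a given line is handed to at most
one worker, and that worker sees fully populated fields.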

> 5.
> +/*
> + * Circular queue used to store the line information.
> + */
> +typedef struct ParallelCopyLineBoundaries
> +{
> + /* Position for the leader to populate a line. */
> + uint32 leader_pos;
>
> I don't think the variable needs to be named as leader_pos, it is okay
> to name it is as 'pos' as the comment above it explains its usage.
>

Fixed

> 7.
> +#define DATA_BLOCK_SIZE RAW_BUF_SIZE
> +#define RINGSIZE (10 * 1000)
> +#define MAX_BLOCKS_COUNT 1000
> +#define WORKER_CHUNK_COUNT 50 /* should be mod of RINGSIZE */
>
> It would be good if you can write a few comments to explain why you
> have chosen these default values.
>

Fixed
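
One way to document the relationship between these constants is a
compile-time check. The sketch below assumes that "should be mod of
RINGSIZE" means WORKER_CHUNK_COUNT must divide RINGSIZE evenly, and
that positions advance through the ring one chunk at a time; the
helper next_chunk_pos() is hypothetical and not part of the patch.

```c
#include <assert.h>
#include <stdint.h>

#define RINGSIZE (10 * 1000)
#define WORKER_CHUNK_COUNT 50

/* Assumption: a chunk must never straddle the end of the ring. */
static_assert(RINGSIZE % WORKER_CHUNK_COUNT == 0,
              "WORKER_CHUNK_COUNT must divide RINGSIZE evenly");

/* Hypothetical helper: advance a ring position by one chunk, wrapping. */
static uint32_t
next_chunk_pos(uint32_t pos)
{
    assert(pos < RINGSIZE);
    return (pos + WORKER_CHUNK_COUNT) % RINGSIZE;
}
```

With the divisibility check in place, a chunk that starts on a chunk
boundary always ends at or before the end of the ring.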

> 8.
> ParallelCopyCommonKeyData, shall we name this as
> SerializedParallelCopyState or something like that?  For example, see
> SerializedSnapshotData which has been used to pass snapshot
> information to passed to workers.
>

Renamed as suggested

> 9.
> +CopyCommonInfoForWorker(CopyState cstate, ParallelCopyCommonKeyData *shared_cstate)
>
> If you agree with point-8, then let's name this as
> SerializeParallelCopyState.  See, if there is more usage of similar
> types in the patch then lets change those as well.
>

Fixed

> 10.
> + * in the DSM. The specified number of workers will then be launched.
> + *
> + */
> +static ParallelContext*
> +BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid)
>
> No need of an extra line with only '*' in the above multi-line comment.
>

Fixed

> 11.
> BeginParallelCopy(..)
> {
> ..
> + EstimateLineKeysStr(pcxt, cstate->null_print);
> + EstimateLineKeysStr(pcxt, cstate->null_print_client);
> + EstimateLineKeysStr(pcxt, cstate->delim);
> + EstimateLineKeysStr(pcxt, cstate->quote);
> + EstimateLineKeysStr(pcxt, cstate->escape);
> ..
> }
>
> Why do we need to do this separately for each variable of cstate?
> Can't we serialize it along with other members of
> SerializeParallelCopyState (a new name for ParallelCopyCommonKeyData)?
>

These are variable-length strings. I felt we would not be able to
serialize them along with the other members, so they need to be
serialized separately.
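
As a rough illustration of why each string needs its own estimate,
here is a sketch that stores a length word followed by the bytes. The
function names and the layout are assumptions for this example and do
not reflect EstimateLineKeysStr() or the shm_toc calls in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Bytes needed for one string: a length word plus the bytes and terminator. */
static size_t
estimate_str(const char *s)
{
    return sizeof(size_t) + (s != NULL ? strlen(s) + 1 : 0);
}

/* Write one string at 'dest'; returns the address just past what was written. */
static char *
serialize_str(char *dest, const char *s)
{
    size_t len = (s != NULL) ? strlen(s) + 1 : 0;

    assert(dest != NULL);
    memcpy(dest, &len, sizeof(len));
    dest += sizeof(len);
    if (len > 0)
        memcpy(dest, s, len);
    return dest + len;
}

/* Read one string at 'src' into '*out' (NULL if absent); returns next address. */
static const char *
restore_str(const char *src, const char **out)
{
    size_t len;

    assert(src != NULL && out != NULL);
    memcpy(&len, src, sizeof(len));
    src += sizeof(len);
    *out = (len > 0) ? src : NULL;
    return src + len;
}
```

A fixed-size struct can be copied with one memcpy, but each of these
strings has a length known only at run time, so its size has to be
estimated and written individually.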

> 12.
> BeginParallelCopy(..)
> {
> ..
> + LaunchParallelWorkers(pcxt);
> + if (pcxt->nworkers_launched == 0)
> + {
> + EndParallelCopy(pcxt);
> + elog(WARNING,
> + "No workers available, copy will be run in non-parallel mode");
> ..
> }
>
> I don't see the need to issue a WARNING if we are not able to launch
> workers.  We don't do that for other cases where we fail to launch
> workers.
>

Fixed

> 13.
> +}
> +/*
> + * ParallelCopyMain -
> ..
>
> +}
> +/*
> + * ParallelCopyLeader
>
> One line space is required before starting a new function.
>

Fixed

Please find the updated patch with the fixes included.


Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
