Re: Parallel copy - Mailing list pgsql-hackers

From vignesh C
Subject Re: Parallel copy
Date
Msg-id CALDaNm3uyHpD9sKoFtB0EnMO8DLuD6H9pReFm=tm=9ccEWuUVQ@mail.gmail.com
In response to Re: Parallel copy  (Andres Freund <andres@anarazel.de>)
List pgsql-hackers
On Thu, Jun 4, 2020 at 12:44 AM Andres Freund <andres@anarazel.de> wrote:
>
>
> Hm. you don't explicitly mention that in your design, but given how
> small the benefits going from 0-1 workers is, I assume the leader
> doesn't do any "chunk processing" on its own?
>

Yes, you are right; the leader does not do any chunk processing. The
leader's work is mainly to populate the shared memory with the offset
information for each record.
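
To state that division of work concretely, here is a minimal sketch (not
the patch code itself; LeaderPublishChunk and the ring-position arithmetic
are illustrative assumptions) of what the leader does for each record it
identifies, using the structure and state names quoted later in this mail:

/*
 * Illustrative only: publish one identified record (its start offset and
 * size) into the shared ring and mark the slot as populated, waiting if
 * the slot has not yet been consumed by a worker.
 */
static void
LeaderPublishChunk(ShmCopyInfo *pcshared_info, uint32 start_offset,
                   uint32 chunk_size)
{
    uint32      pos = pcshared_info->populated % RINGSIZE; /* assumed ring position */
    ChunkBoundary *chunk = &pcshared_info->chunk_boundaries.ring[pos];

    /* Wait until the slot is free, i.e. unused or already processed. */
    while (pg_atomic_read_u32(&chunk->chunk_state) != CHUNK_INIT &&
           pg_atomic_read_u32(&chunk->chunk_state) != CHUNK_WORKER_PROCESSED)
        CHECK_FOR_INTERRUPTS();

    chunk->start_offset = start_offset;
    pg_atomic_write_u32(&chunk->chunk_size, chunk_size);
    pg_atomic_write_u32(&chunk->chunk_state, CHUNK_LEADER_POPULATED);
    pcshared_info->populated++;
}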

>
>
> > Design of the Parallel Copy: The backend, to which the "COPY FROM" query is
> > submitted acts as leader with the responsibility of reading data from the
> > file/stdin, launching at most n number of workers as specified with
> > PARALLEL 'n' option in the "COPY FROM" query. The leader populates the
> > common data required for the workers execution in the DSM and shares it
> > with the workers. The leader then executes before statement triggers if
> > there exists any. Leader populates DSM chunks which includes the start
> > offset and chunk size, while populating the chunks it reads as many blocks
> > as required into the DSM data blocks from the file. Each block is of 64K
> > size. The leader parses the data to identify a chunk, the existing logic
> > from CopyReadLineText which identifies the chunks with some changes was
> > used for this. Leader checks if a free chunk is available to copy the
> > information, if there is no free chunk it waits till the required chunk is
> > freed up by the worker and then copies the identified chunks information
> > (offset & chunk size) into the DSM chunks. This process is repeated till
> > the complete file is processed. Simultaneously, the workers cache the
> > chunks(50) locally into the local memory and release the chunks to the
> > leader for further populating. Each worker processes the chunk that it
> > cached and inserts it into the table. The leader waits till all the chunks
> > populated are processed by the workers and exits.
>
> Why do we need the local copy of 50 chunks? Copying memory around is far
> from free. I don't see why it'd be better to add per-process caching,
> rather than making the DSM bigger? I can see some benefit in marking
> multiple chunks as being processed with one lock acquisition, but I
> don't think adding a memory copy is a good idea.

We ran a performance test with a 5.1GB csv data file of 10 million
tuples and 2 indexes on integer columns; the results are given below.
We noticed that in some cases the performance is better if we copy 50
records locally and release the shared memory, and the benefit grows
as the number of workers increases. A sketch of the worker-side
caching follows the table. Thoughts?
--------------------------------------------------------------------------------
Workers | Exec time (with local copying of 50    | Exec time (without copying,
        | records & releasing the shared memory) | processing record by record)
--------------------------------------------------------------------------------
0       | 1162.772 (1X)                          | 1152.684 (1X)
2       |  635.249 (1.83X)                       |  647.894 (1.78X)
4       |  336.835 (3.45X)                       |  335.534 (3.43X)
8       |  188.577 (6.17X)                       |  189.461 (6.08X)
16      |  126.819 (9.17X)                       |  142.730 (8.07X)
20      |  117.845 (9.87X)                       |  146.533 (7.87X)
30      |  127.554 (9.11X)                       |  160.307 (7.19X)
--------------------------------------------------------------------------------
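
For reference, the "local copying" numbers in the left column correspond
to a worker loop along these lines. This is only a sketch of the approach,
not the patch code; WorkerGetPopulatedChunk and ChunkDataPointer are
hypothetical helpers standing in for the shared-memory lookups:

/* Cache up to WORKER_CHUNK_COUNT chunks locally, freeing shared slots early. */
static int
WorkerCacheChunks(CopyState cstate, ParallelCopyLineBuf *local_buf)
{
    int         cached = 0;

    while (cached < WORKER_CHUNK_COUNT)
    {
        ChunkBoundary *chunk = WorkerGetPopulatedChunk(cstate); /* hypothetical */

        if (chunk == NULL)
            break;              /* ring is empty right now */

        /* Copy the chunk's bytes out of the shared data block... */
        resetStringInfo(&local_buf[cached].line_buf);
        appendBinaryStringInfo(&local_buf[cached].line_buf,
                               ChunkDataPointer(cstate, chunk), /* hypothetical */
                               pg_atomic_read_u32(&chunk->chunk_size));

        /* ...so the shared slot can be handed back to the leader at once. */
        pg_atomic_write_u32(&chunk->chunk_state, CHUNK_WORKER_PROCESSED);
        cached++;
    }

    /* The cached lines are then parsed and inserted without holding shared slots. */
    return cached;
}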

> This patch *desperately* needs to be split up. It imo is close to
> unreviewable, due to a large amount of changes that just move code
> around without other functional changes being mixed in with the actual
> new stuff.

I have split the patch; the new split patches are attached.

>
>
>
> >  /*
> > + * State of the chunk.
> > + */
> > +typedef enum ChunkState
> > +{
> > +     CHUNK_INIT,                                     /* initial state of chunk */
> > +     CHUNK_LEADER_POPULATING,        /* leader processing chunk */
> > +     CHUNK_LEADER_POPULATED,         /* leader completed populating chunk */
> > +     CHUNK_WORKER_PROCESSING,        /* worker processing chunk */
> > +     CHUNK_WORKER_PROCESSED          /* worker completed processing chunk */
> > +}ChunkState;
> > +
> > +#define RAW_BUF_SIZE 65536           /* we palloc RAW_BUF_SIZE+1 bytes */
> > +
> > +#define DATA_BLOCK_SIZE RAW_BUF_SIZE
> > +#define RINGSIZE (10 * 1000)
> > +#define MAX_BLOCKS_COUNT 1000
> > +#define WORKER_CHUNK_COUNT 50        /* should be mod of RINGSIZE */
> > +
> > +#define      IsParallelCopy()                (cstate->is_parallel)
> > +#define IsLeader()                           (cstate->pcdata->is_leader)
> > +#define IsHeaderLine()                       (cstate->header_line && cstate->cur_lineno == 1)
> > +
> > +/*
> > + * Copy data block information.
> > + */
> > +typedef struct CopyDataBlock
> > +{
> > +     /* The number of unprocessed chunks in the current block. */
> > +     pg_atomic_uint32 unprocessed_chunk_parts;
> > +
> > +     /*
> > +      * If the current chunk data is continued into another block,
> > +      * following_block will have the position where the remaining data need to
> > +      * be read.
> > +      */
> > +     uint32  following_block;
> > +
> > +     /*
> > +      * This flag will be set, when the leader finds out this block can be read
> > +      * safely by the worker. This helps the worker to start processing the chunk
> > +      * early where the chunk will be spread across many blocks and the worker
> > +      * need not wait for the complete chunk to be processed.
> > +      */
> > +     bool   curr_blk_completed;
> > +     char   data[DATA_BLOCK_SIZE + 1]; /* data read from file */
> > +}CopyDataBlock;
>
> What's the + 1 here about?

Fixed this; removed the +1, as it is not needed.

>
>
> > +/*
> > + * Parallel copy line buffer information.
> > + */
> > +typedef struct ParallelCopyLineBuf
> > +{
> > +     StringInfoData          line_buf;
> > +     uint64                          cur_lineno;     /* line number for error messages */
> > +}ParallelCopyLineBuf;
>
> Why do we need separate infrastructure for this? We shouldn't duplicate
> infrastructure unnecessarily.
>

This was required for copying multiple records locally and releasing
the shared memory. I have not changed this yet; I will decide on it
based on the conclusion reached for one of the previous comments.

>
>
>
> > +/*
> > + * Common information that need to be copied to shared memory.
> > + */
> > +typedef struct CopyWorkerCommonData
> > +{
>
> Why is parallel specific stuff here suddenly not named ParallelCopy*
> anymore? If you introduce a naming like that it imo should be used
> consistently.

Fixed; changed the naming so that all the structs use the ParallelCopy prefix consistently.

>
> > +     /* low-level state data */
> > +     CopyDest            copy_dest;          /* type of copy source/destination */
> > +     int                 file_encoding;      /* file or remote side's character encoding */
> > +     bool                need_transcoding;   /* file encoding diff from server? */
> > +     bool                encoding_embeds_ascii;      /* ASCII can be non-first byte? */
> > +
> > +     /* parameters from the COPY command */
> > +     bool                csv_mode;           /* Comma Separated Value format? */
> > +     bool                header_line;        /* CSV header line? */
> > +     int                 null_print_len; /* length of same */
> > +     bool                force_quote_all;    /* FORCE_QUOTE *? */
> > +     bool                convert_selectively;        /* do selective binary conversion? */
> > +
> > +     /* Working state for COPY FROM */
> > +     AttrNumber          num_defaults;
> > +     Oid                 relid;
> > +}CopyWorkerCommonData;
>
> But I actually think we shouldn't have this information in two different
> structs. This should exist once, independent of using parallel /
> non-parallel copy.
>

This structure stores the common data from CopyStateData that is
required by the workers. This information is allocated and stored in
the DSM, from where each worker retrieves it and copies it back into
its own CopyStateData.
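
As a minimal sketch of that flow (not the actual patch; the toc key and
the two helper names are made up for illustration), the leader can place
the fields into the DSM with the shm_toc API and the worker copies them
back into its own CopyStateData:

#define PARALLEL_COPY_KEY_COMMON_DATA   UINT64CONST(0xFF01) /* made-up key */

/* Leader side: place the common fields into the DSM segment's toc. */
static void
LeaderStoreCommonData(shm_toc *toc, CopyState cstate)
{
    CopyWorkerCommonData *shared;

    shared = (CopyWorkerCommonData *) shm_toc_allocate(toc,
                                                       sizeof(CopyWorkerCommonData));
    shared->copy_dest = cstate->copy_dest;
    shared->file_encoding = cstate->file_encoding;
    shared->need_transcoding = cstate->need_transcoding;
    shared->encoding_embeds_ascii = cstate->encoding_embeds_ascii;
    shared->csv_mode = cstate->csv_mode;
    shared->header_line = cstate->header_line;
    shared->num_defaults = cstate->num_defaults;
    shared->relid = RelationGetRelid(cstate->rel);
    /* ... remaining fields elided ... */
    shm_toc_insert(toc, PARALLEL_COPY_KEY_COMMON_DATA, shared);
}

/* Worker side: look the struct up and copy it back into CopyStateData. */
static void
WorkerRestoreCommonData(shm_toc *toc, CopyState cstate)
{
    CopyWorkerCommonData *shared;

    shared = (CopyWorkerCommonData *) shm_toc_lookup(toc,
                                                     PARALLEL_COPY_KEY_COMMON_DATA,
                                                     false);
    cstate->copy_dest = shared->copy_dest;
    cstate->file_encoding = shared->file_encoding;
    cstate->need_transcoding = shared->need_transcoding;
    cstate->encoding_embeds_ascii = shared->encoding_embeds_ascii;
    cstate->csv_mode = shared->csv_mode;
    cstate->header_line = shared->header_line;
    cstate->num_defaults = shared->num_defaults;
    /* The relation itself is reopened by the worker using shared->relid. */
}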

>
> > +/* List information */
> > +typedef struct ListInfo
> > +{
> > +     int     count;          /* count of attributes */
> > +
> > +     /* string info in the form info followed by info1, info2... infon  */
> > +     char    info[1];
> > +} ListInfo;
>
> Based on these comments I have no idea what this could be for.
>

I have added a better comment for this. The following was added: This
structure helps in converting a List data type into the below structure
format, with count holding the number of elements in the list and info
holding the List elements appended contiguously. The converted structure
is allocated in shared memory and stored in the DSM, from where the
worker retrieves it and later converts it back into a List.
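
To illustrate what that comment describes, here is a minimal sketch
(SerializeStringList is a made-up helper; it assumes a List of String
nodes, e.g. attribute names):

/* Flatten a List of String nodes into the contiguous ListInfo format. */
static ListInfo *
SerializeStringList(List *namelist)
{
    ListCell   *lc;
    Size        len = 0;
    char       *ptr;
    ListInfo   *result;

    /* First pass: total space needed for the NUL-terminated strings. */
    foreach(lc, namelist)
        len += strlen(strVal(lfirst(lc))) + 1;

    result = (ListInfo *) palloc(offsetof(ListInfo, info) + len);
    result->count = list_length(namelist);

    /* Second pass: append each element contiguously into info[]. */
    ptr = result->info;
    foreach(lc, namelist)
    {
        char       *name = strVal(lfirst(lc));

        strcpy(ptr, name);
        ptr += strlen(name) + 1;
    }

    return result;
}

The worker walks info[] count times, advancing past each terminator, to
rebuild the List on its side.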

>
> >  /*
> > - * This keeps the character read at the top of the loop in the buffer
> > - * even if there is more than one read-ahead.
> > + * This keeps the character read at the top of the loop in the buffer
> > + * even if there is more than one read-ahead.
> > + */
> > +#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
> > +if (1) \
> > +{ \
> > +     if (copy_buff_state.raw_buf_ptr + (extralen) >= copy_buff_state.copy_buf_len && !hit_eof) \
> > +     { \
> > +             if (IsParallelCopy()) \
> > +             { \
> > +                     copy_buff_state.chunk_size = prev_chunk_size; /* update previous chunk size */ \
> > +                     if (copy_buff_state.block_switched) \
> > +                     { \
> > +                             pg_atomic_sub_fetch_u32(&copy_buff_state.data_blk_ptr->unprocessed_chunk_parts, 1); \
> > +                             copy_buff_state.copy_buf_len = prev_copy_buf_len; \
> > +                     } \
> > +             } \
> > +             copy_buff_state.raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
> > +             need_data = true; \
> > +             continue; \
> > +     } \
> > +} else ((void) 0)
>
> I think it's an absolutely clear no-go to add new branches to
> these. They're *really* hot already, and this is going to sprinkle a
> significant amount of new instructions over a lot of places.
>

Fixed, removed this.

>
>
> > +/*
> > + * SET_RAWBUF_FOR_LOAD - Set raw_buf to the shared memory where the file data must
> > + * be read.
> > + */
> > +#define SET_RAWBUF_FOR_LOAD() \
> > +{ \
> > +     ShmCopyInfo     *pcshared_info = cstate->pcdata->pcshared_info; \
> > +     uint32 cur_block_pos; \
> > +     /* \
> > +      * Mark the previous block as completed, worker can start copying this data. \
> > +      */ \
> > +     if (copy_buff_state.data_blk_ptr != copy_buff_state.curr_data_blk_ptr && \
> > +             copy_buff_state.data_blk_ptr->curr_blk_completed == false) \
> > +             copy_buff_state.data_blk_ptr->curr_blk_completed = true; \
> > +     \
> > +     copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> > +     cur_block_pos = WaitGetFreeCopyBlock(pcshared_info); \
> > +     copy_buff_state.curr_data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos]; \
> > +     \
> > +     if (!copy_buff_state.data_blk_ptr) \
> > +     { \
> > +             copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> > +             chunk_first_block = cur_block_pos; \
> > +     } \
> > +     else if (need_data == false) \
> > +             copy_buff_state.data_blk_ptr->following_block = cur_block_pos; \
> > +     \
> > +     cstate->raw_buf = copy_buff_state.curr_data_blk_ptr->data; \
> > +     copy_buff_state.copy_raw_buf = cstate->raw_buf; \
> > +}
> > +
> > +/*
> > + * END_CHUNK_PARALLEL_COPY - Update the chunk information in shared memory.
> > + */
> > +#define END_CHUNK_PARALLEL_COPY() \
> > +{ \
> > +     if (!IsHeaderLine()) \
> > +     { \
> > +             ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \
> > +             ChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries; \
> > +             if (copy_buff_state.chunk_size) \
> > +             { \
> > +                     ChunkBoundary *chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> > +                     /* \
> > +                      * If raw_buf_ptr is zero, unprocessed_chunk_parts would have been \
> > +                      * incremented in SEEK_COPY_BUFF_POS. This will happen if the whole \
> > +                      * chunk finishes at the end of the current block. If the \
> > +                      * new_line_size > raw_buf_ptr, then the new block has only new line \
> > +                      * char content. The unprocessed count should not be increased in \
> > +                      * this case. \
> > +                      */ \
> > +                     if (copy_buff_state.raw_buf_ptr != 0 && \
> > +                             copy_buff_state.raw_buf_ptr > new_line_size) \
> > +                             pg_atomic_add_fetch_u32(&copy_buff_state.curr_data_blk_ptr->unprocessed_chunk_parts, 1); \
> > +                     \
> > +                     /* Update chunk size. */ \
> > +                     pg_atomic_write_u32(&chunkInfo->chunk_size, copy_buff_state.chunk_size); \
> > +                     pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_LEADER_POPULATED); \
> > +                     elog(DEBUG1, "[Leader] After adding - chunk position:%d, chunk_size:%d", \
> > +                                             chunk_pos, copy_buff_state.chunk_size); \
> > +                     pcshared_info->populated++; \
> > +             } \
> > +             else if (new_line_size) \
> > +             { \
> > +                     /* \
> > +                      * This means only new line char, empty record should be \
> > +                      * inserted. \
> > +                      */ \
> > +                     ChunkBoundary *chunkInfo; \
> > +                     chunk_pos = UpdateBlockInChunkInfo(cstate, -1, -1, 0, \
> > +                                                                                        CHUNK_LEADER_POPULATED); \
> > +                     chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> > +                     elog(DEBUG1, "[Leader] Added empty chunk with offset:%d, chunk position:%d, chunk size:%d", \
> > +                                              chunkInfo->start_offset, chunk_pos, \
> > +                                              pg_atomic_read_u32(&chunkInfo->chunk_size)); \
> > +                     pcshared_info->populated++; \
> > +             } \
> > +     }\
> > +     \
> > +     /*\
> > +      * All of the read data is processed, reset index & len. In the\
> > +      * subsequent read, we will get a new block and copy data in to the\
> > +      * new block.\
> > +      */\
> > +     if (copy_buff_state.raw_buf_ptr == copy_buff_state.copy_buf_len)\
> > +     {\
> > +             cstate->raw_buf_index = 0;\
> > +             cstate->raw_buf_len = 0;\
> > +     }\
> > +     else\
> > +             cstate->raw_buf_len = copy_buff_state.copy_buf_len;\
> > +}
>
> Why are these macros? They are way way way above a length where that
> makes any sort of sense.
>

Converted these macros to functions.
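
As an example of the direction taken (a sketch only; the attached patches
have the authoritative versions), the "only a newline, insert an empty
record" branch of END_CHUNK_PARALLEL_COPY expressed as a static function
using the v1 names quoted above:

/* Sketch: add an empty-record chunk entry to the shared ring. */
static void
EndChunkAddEmptyChunk(CopyState cstate)
{
    ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info;
    ChunkBoundary *chunkInfo;
    uint32      chunk_pos;

    chunk_pos = UpdateBlockInChunkInfo(cstate, -1, -1, 0,
                                       CHUNK_LEADER_POPULATED);
    chunkInfo = &pcshared_info->chunk_boundaries.ring[chunk_pos];
    elog(DEBUG1, "[Leader] Added empty chunk with offset:%d, chunk position:%d, chunk size:%d",
         chunkInfo->start_offset, chunk_pos,
         pg_atomic_read_u32(&chunkInfo->chunk_size));
    pcshared_info->populated++;
}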


Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com

