Re: Parallel copy - Mailing list pgsql-hackers

From: Andres Freund
Subject: Re: Parallel copy
Date: 2020-06-03
Msg-id: 20200603191448.ee6ypzbz2zvpvizi@alap3.anarazel.de
In response to: Re: Parallel copy  (vignesh C <vignesh21@gmail.com>)
List: pgsql-hackers
Hi,

On 2020-06-03 15:53:24 +0530, vignesh C wrote:
> Exec time (seconds) by worker count, for five scenarios:
>   (a) copy from file,  2 indexes on integer columns, 1 index on text column
>   (b) copy from stdin, 2 indexes on integer columns, 1 index on text column
>   (c) copy from file,  1 gist index on text column
>   (d) copy from file,  3 indexes on integer columns
>   (e) copy from stdin, 3 indexes on integer columns
>
> Workers  (a)               (b)              (c)              (d)              (e)
> 0        1162.772(1X)      1176.035(1X)     827.669(1X)      216.171(1X)      217.376(1X)
> 1        1110.288(1.05X)   1120.556(1.05X)  747.384(1.11X)   174.242(1.24X)   163.492(1.33X)
> 2        635.249(1.83X)    668.18(1.76X)    435.673(1.9X)    133.829(1.61X)   126.516(1.72X)
> 4        336.835(3.45X)    346.768(3.39X)   236.406(3.5X)    105.767(2.04X)   107.382(2.02X)
> 8        188.577(6.17X)    194.491(6.04X)   148.962(5.56X)   100.708(2.15X)   107.72(2.01X)
> 16       126.819(9.17X)    146.402(8.03X)   119.923(6.9X)    97.996(2.2X)     106.531(2.04X)
> 20       *117.845(9.87X)*  149.203(7.88X)   138.741(5.96X)   97.94(2.21X)     107.5(2.02X)
> 30       127.554(9.11X)    161.218(7.29X)   172.443(4.8X)    98.232(2.2X)     108.778(1.99X)

Hm, you don't explicitly mention that in your design, but given how
small the benefit of going from 0 to 1 workers is (e.g. in scenario (a),
1162.772s vs 1110.288s, only 1.05X), I assume the leader doesn't do any
"chunk processing" on its own?


> Design of the Parallel Copy: The backend to which the "COPY FROM" query
> is submitted acts as the leader, with the responsibility of reading
> data from the file/stdin and launching at most n workers, as specified
> with the PARALLEL 'n' option in the "COPY FROM" query. The leader
> populates the common data required for the workers' execution in the
> DSM and shares it with the workers. The leader then executes BEFORE
> STATEMENT triggers, if any exist.
>
> The leader populates the DSM chunks, each of which holds a start offset
> and a chunk size; while populating the chunks, it reads as many blocks
> as required from the file into the DSM data blocks. Each block is 64K
> in size. The leader parses the data to identify a chunk; the existing
> logic from CopyReadLineText, which identifies the chunks, was used for
> this with some changes. The leader checks if a free chunk is available
> to copy the information into; if there is no free chunk, it waits till
> the required chunk is freed up by a worker and then copies the
> identified chunk's information (offset & chunk size) into the DSM
> chunks. This process is repeated till the complete file is processed.
>
> Simultaneously, the workers cache chunks (50 at a time) locally in
> local memory and release the chunks to the leader for further
> populating. Each worker processes the chunks it has cached and inserts
> them into the table. The leader waits till all the populated chunks
> have been processed by the workers, and then exits.
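
For reference, the shared state this describes appears to be roughly the
following (type and field names follow the patch excerpts further down;
the exact layout is my reconstruction, not the patch's):

/*
 * Reconstruction for illustration: names (ShmCopyInfo, ChunkBoundary,
 * CopyDataBlock, RINGSIZE, MAX_BLOCKS_COUNT) are from the patch; the
 * first_block field and the exact field order are assumed.
 */
typedef struct ChunkBoundary
{
	pg_atomic_uint32	chunk_state;	/* ChunkState, see excerpt below */
	pg_atomic_uint32	chunk_size;		/* bytes in this chunk */
	int32				start_offset;	/* offset within the first block */
	uint32				first_block;	/* index into data_blocks[] (assumed) */
} ChunkBoundary;

typedef struct ChunkBoundaries
{
	ChunkBoundary	ring[RINGSIZE];		/* leader fills, workers drain */
} ChunkBoundaries;

typedef struct ShmCopyInfo
{
	uint64			populated;			/* chunks published by the leader */
	ChunkBoundaries	chunk_boundaries;
	CopyDataBlock	data_blocks[MAX_BLOCKS_COUNT];	/* 64K raw-input blocks */
} ShmCopyInfo;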

Why do we need the local copy of 50 chunks? Copying memory around is far
from free. I don't see why it'd be better to add per-process caching,
rather than making the DSM bigger? I can see some benefit in marking
multiple chunks as being processed with one lock acquisition, but I
don't think adding a memory copy is a good idea.
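
If the concern is lock traffic, something like the following would give
the batched claim without the extra copy (a sketch only: ring_lock and
the ChunkAvailable()/ClaimNextChunk()/ProcessChunkInPlace() helpers are
invented, while WORKER_CHUNK_COUNT, ChunkBoundary and the chunk states
are from the patch):

/*
 * Sketch: claim up to WORKER_CHUNK_COUNT ring entries under one lock
 * acquisition, then parse and insert directly from shared memory
 * instead of memcpy'ing into worker-local buffers.
 */
static void
WorkerProcessChunkBatch(ShmCopyInfo *pcshared_info)
{
	uint32		pos[WORKER_CHUNK_COUNT];
	uint32		nclaimed = 0;

	SpinLockAcquire(&pcshared_info->ring_lock);
	while (nclaimed < WORKER_CHUNK_COUNT && ChunkAvailable(pcshared_info))
		pos[nclaimed++] = ClaimNextChunk(pcshared_info);	/* marks
						 * CHUNK_WORKER_PROCESSING */
	SpinLockRelease(&pcshared_info->ring_lock);

	for (uint32 i = 0; i < nclaimed; i++)
	{
		ChunkBoundary *chunk = &pcshared_info->chunk_boundaries.ring[pos[i]];

		/* process the chunk in place in the DSM data blocks */
		ProcessChunkInPlace(pcshared_info, chunk);
		pg_atomic_write_u32(&chunk->chunk_state, CHUNK_WORKER_PROCESSED);
	}
}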


This patch *desperately* needs to be split up. It imo is close to
unreviewable, because a large amount of changes that just move code
around, without any functional change, is mixed in with the actual new
stuff.


>  /*
> + * State of the chunk.
> + */
> +typedef enum ChunkState
> +{
> +    CHUNK_INIT,                    /* initial state of chunk */
> +    CHUNK_LEADER_POPULATING,    /* leader processing chunk */
> +    CHUNK_LEADER_POPULATED,        /* leader completed populating chunk */
> +    CHUNK_WORKER_PROCESSING,    /* worker processing chunk */
> +    CHUNK_WORKER_PROCESSED        /* worker completed processing chunk */
> +}ChunkState;
> +
> +#define RAW_BUF_SIZE 65536        /* we palloc RAW_BUF_SIZE+1 bytes */
> +
> +#define DATA_BLOCK_SIZE RAW_BUF_SIZE
> +#define RINGSIZE (10 * 1000)
> +#define MAX_BLOCKS_COUNT 1000
> +#define WORKER_CHUNK_COUNT 50    /* should be mod of RINGSIZE */
> +
> +#define    IsParallelCopy()        (cstate->is_parallel)
> +#define IsLeader()                (cstate->pcdata->is_leader)
> +#define IsHeaderLine()            (cstate->header_line && cstate->cur_lineno == 1)
> +
> +/*
> + * Copy data block information.
> + */
> +typedef struct CopyDataBlock
> +{
> +    /* The number of unprocessed chunks in the current block. */
> +    pg_atomic_uint32 unprocessed_chunk_parts;
> +
> +    /*
> +     * If the current chunk data is continued into another block,
> +     * following_block will have the position where the remaining data need to
> +     * be read.
> +     */
> +    uint32    following_block;
> +
> +    /*
> +     * This flag will be set, when the leader finds out this block can be read
> +     * safely by the worker. This helps the worker to start processing the chunk
> +     * early where the chunk will be spread across many blocks and the worker
> +     * need not wait for the complete chunk to be processed.
> +     */
> +    bool   curr_blk_completed;
> +    char   data[DATA_BLOCK_SIZE + 1]; /* data read from file */
> +}CopyDataBlock;

What's the + 1 here about?


> +/*
> + * Parallel copy line buffer information.
> + */
> +typedef struct ParallelCopyLineBuf
> +{
> +    StringInfoData        line_buf;
> +    uint64                cur_lineno;    /* line number for error messages */
> +}ParallelCopyLineBuf;

Why do we need separate infrastructure for this? We shouldn't duplicate
infrastructure unnecessarily.



> +/*
> + * Common information that need to be copied to shared memory.
> + */
> +typedef struct CopyWorkerCommonData
> +{

Why is parallel-specific stuff here suddenly not named ParallelCopy*
anymore? If you introduce a naming scheme like that, it imo should be
used consistently.

> +    /* low-level state data */
> +    CopyDest            copy_dest;        /* type of copy source/destination */
> +    int                 file_encoding;    /* file or remote side's character encoding */
> +    bool                need_transcoding;    /* file encoding diff from server? */
> +    bool                encoding_embeds_ascii;    /* ASCII can be non-first byte? */
> +
> +    /* parameters from the COPY command */
> +    bool                csv_mode;        /* Comma Separated Value format? */
> +    bool                header_line;    /* CSV header line? */
> +    int                 null_print_len; /* length of same */
> +    bool                force_quote_all;    /* FORCE_QUOTE *? */
> +    bool                convert_selectively;    /* do selective binary conversion? */
> +
> +    /* Working state for COPY FROM */
> +    AttrNumber          num_defaults;
> +    Oid                 relid;
> +}CopyWorkerCommonData;

But I actually think we shouldn't have this information in two different
structs. This should exist once, independent of using parallel /
non-parallel copy.
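
I.e. roughly (a sketch; CopyCommonData and the embedding are invented,
the fields are the ones quoted above):

/*
 * Sketch: one struct holding this state for both serial and parallel
 * COPY; for a parallel copy the leader copies it into the DSM once,
 * rather than maintaining a parallel-only duplicate.
 */
typedef struct CopyCommonData
{
	CopyDest	copy_dest;
	int			file_encoding;
	bool		need_transcoding;
	bool		encoding_embeds_ascii;
	bool		csv_mode;
	bool		header_line;
	int			null_print_len;
	bool		force_quote_all;
	bool		convert_selectively;
	AttrNumber	num_defaults;
	Oid			relid;
} CopyCommonData;

typedef struct CopyStateData
{
	CopyCommonData	common;		/* used directly by serial COPY */
	/* ... rest of the per-backend state ... */
} CopyStateData;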


> +/* List information */
> +typedef struct ListInfo
> +{
> +    int    count;        /* count of attributes */
> +
> +    /* string info in the form info followed by info1, info2... infon  */
> +    char    info[1];
> +} ListInfo;

Based on these comments I have no idea what this could be for.


>  /*
> - * This keeps the character read at the top of the loop in the buffer
> - * even if there is more than one read-ahead.
> + * This keeps the character read at the top of the loop in the buffer
> + * even if there is more than one read-ahead.
> + */
> +#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
> +if (1) \
> +{ \
> +    if (copy_buff_state.raw_buf_ptr + (extralen) >= copy_buff_state.copy_buf_len && !hit_eof) \
> +    { \
> +        if (IsParallelCopy()) \
> +        { \
> +            copy_buff_state.chunk_size = prev_chunk_size; /* update previous chunk size */ \
> +            if (copy_buff_state.block_switched) \
> +            { \
> +            pg_atomic_sub_fetch_u32(&copy_buff_state.data_blk_ptr->unprocessed_chunk_parts, 1); \
> +                copy_buff_state.copy_buf_len = prev_copy_buf_len; \
> +            } \
> +        } \
> +        copy_buff_state.raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
> +        need_data = true; \
> +        continue; \
> +    } \
> +} else ((void) 0)

I think it's an absolutely clear no-go to add new branches to
these. They're *really* hot already, and this is going to sprinkle a
significant amount of new instructions over a lot of places.
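
If the parallel path really needs different behaviour at these points,
the branch can be hoisted out of the scan loop entirely by specializing
the whole function (a sketch; the Serial/Parallel variant names are
invented, cstate->is_parallel is from the patch):

/*
 * Sketch: choose a specialized variant once per call, instead of
 * evaluating IsParallelCopy() inside the hot character-scanning loop.
 */
static bool CopyReadLineTextSerial(CopyState cstate);
static bool CopyReadLineTextParallel(CopyState cstate);

static bool
CopyReadLineText(CopyState cstate)
{
	if (cstate->is_parallel)
		return CopyReadLineTextParallel(cstate);
	else
		return CopyReadLineTextSerial(cstate);
}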


> +/*
> + * SET_RAWBUF_FOR_LOAD - Set raw_buf to the shared memory where the file data must
> + * be read.
> + */
> +#define SET_RAWBUF_FOR_LOAD() \
> +{ \
> +    ShmCopyInfo    *pcshared_info = cstate->pcdata->pcshared_info; \
> +    uint32 cur_block_pos; \
> +    /* \
> +     * Mark the previous block as completed, worker can start copying this data. \
> +     */ \
> +    if (copy_buff_state.data_blk_ptr != copy_buff_state.curr_data_blk_ptr && \
> +        copy_buff_state.data_blk_ptr->curr_blk_completed == false) \
> +        copy_buff_state.data_blk_ptr->curr_blk_completed = true; \
> +    \
> +    copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> +    cur_block_pos = WaitGetFreeCopyBlock(pcshared_info); \
> +    copy_buff_state.curr_data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos]; \
> +    \
> +    if (!copy_buff_state.data_blk_ptr) \
> +    { \
> +        copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> +        chunk_first_block = cur_block_pos; \
> +    } \
> +    else if (need_data == false) \
> +        copy_buff_state.data_blk_ptr->following_block = cur_block_pos; \
> +    \
> +    cstate->raw_buf = copy_buff_state.curr_data_blk_ptr->data; \
> +    copy_buff_state.copy_raw_buf = cstate->raw_buf; \
> +}
> +
> +/*
> + * END_CHUNK_PARALLEL_COPY - Update the chunk information in shared memory.
> + */
> +#define END_CHUNK_PARALLEL_COPY() \
> +{ \
> +    if (!IsHeaderLine()) \
> +    { \
> +        ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \
> +        ChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries; \
> +        if (copy_buff_state.chunk_size) \
> +        { \
> +            ChunkBoundary *chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> +            /* \
> +             * If raw_buf_ptr is zero, unprocessed_chunk_parts would have been \
> +             * incremented in SEEK_COPY_BUFF_POS. This will happen if the whole \
> +             * chunk finishes at the end of the current block. If the \
> +             * new_line_size > raw_buf_ptr, then the new block has only new line \
> +             * char content. The unprocessed count should not be increased in \
> +             * this case. \
> +             */ \
> +            if (copy_buff_state.raw_buf_ptr != 0 && \
> +                copy_buff_state.raw_buf_ptr > new_line_size) \
> +                pg_atomic_add_fetch_u32(&copy_buff_state.curr_data_blk_ptr->unprocessed_chunk_parts, 1); \
> +            \
> +            /* Update chunk size. */ \
> +            pg_atomic_write_u32(&chunkInfo->chunk_size, copy_buff_state.chunk_size); \
> +            pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_LEADER_POPULATED); \
> +            elog(DEBUG1, "[Leader] After adding - chunk position:%d, chunk_size:%d", \
> +                        chunk_pos, copy_buff_state.chunk_size); \
> +            pcshared_info->populated++; \
> +        } \
> +        else if (new_line_size) \
> +        { \
> +            /* \
> +             * This means only new line char, empty record should be \
> +             * inserted. \
> +             */ \
> +            ChunkBoundary *chunkInfo; \
> +            chunk_pos = UpdateBlockInChunkInfo(cstate, -1, -1, 0, \
> +                                               CHUNK_LEADER_POPULATED); \
> +            chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> +            elog(DEBUG1, "[Leader] Added empty chunk with offset:%d, chunk position:%d, chunk size:%d", \
> +                         chunkInfo->start_offset, chunk_pos, \
> +                         pg_atomic_read_u32(&chunkInfo->chunk_size)); \
> +            pcshared_info->populated++; \
> +        } \
> +    }\
> +    \
> +    /*\
> +     * All of the read data is processed, reset index & len. In the\
> +     * subsequent read, we will get a new block and copy data in to the\
> +     * new block.\
> +     */\
> +    if (copy_buff_state.raw_buf_ptr == copy_buff_state.copy_buf_len)\
> +    {\
> +        cstate->raw_buf_index = 0;\
> +        cstate->raw_buf_len = 0;\
> +    }\
> +    else\
> +        cstate->raw_buf_len = copy_buff_state.copy_buf_len;\
> +}

Why are these macros? They are way way way above a length where that
makes any sort of sense.
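
For comparison, plain functions inline just as well and stay debuggable;
e.g. the tail of END_CHUNK_PARALLEL_COPY as a function (the signature
and the CopyBuffState name are invented, the body matches the macro):

/*
 * Sketch: reset the raw buffer bookkeeping after a chunk has been
 * published, as the end of END_CHUNK_PARALLEL_COPY does above.
 */
static inline void
ResetRawBufAfterChunk(CopyState cstate, CopyBuffState *buf)
{
	/*
	 * All of the read data is processed; reset index & len.  The next
	 * read will get a new block and copy data into it.
	 */
	if (buf->raw_buf_ptr == buf->copy_buf_len)
	{
		cstate->raw_buf_index = 0;
		cstate->raw_buf_len = 0;
	}
	else
		cstate->raw_buf_len = buf->copy_buf_len;
}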


Greetings,

Andres Freund


