Re: Parallel copy - Mailing list pgsql-hackers

| From           | Andres Freund                                       |
|----------------|-----------------------------------------------------|
| Subject        | Re: Parallel copy                                   |
| Date           |                                                     |
| Msg-id         | 20200603191448.ee6ypzbz2zvpvizi@alap3.anarazel.de   |
| In response to | Re: Parallel copy (vignesh C <vignesh21@gmail.com>) |
| Responses      | Re: Parallel copy                                   |
| List           | pgsql-hackers                                       |
Hi,

On 2020-06-03 15:53:24 +0530, vignesh C wrote:
> Exec time (seconds) for:
>   (a) copy from file,  2 indexes on integer columns, 1 index on text column
>   (b) copy from stdin, 2 indexes on integer columns, 1 index on text column
>   (c) copy from file,  1 gist index on text column
>   (d) copy from file,  3 indexes on integer columns
>   (e) copy from stdin, 3 indexes on integer columns
>
> Workers   (a)                 (b)                (c)               (d)               (e)
> 0         1162.772 (1X)       1176.035 (1X)      827.669 (1X)      216.171 (1X)      217.376 (1X)
> 1         1110.288 (1.05X)    1120.556 (1.05X)   747.384 (1.11X)   174.242 (1.24X)   163.492 (1.33X)
> 2         635.249 (1.83X)     668.18 (1.76X)     435.673 (1.9X)    133.829 (1.61X)   126.516 (1.72X)
> 4         336.835 (3.45X)     346.768 (3.39X)    236.406 (3.5X)    105.767 (2.04X)   107.382 (2.02X)
> 8         188.577 (6.17X)     194.491 (6.04X)    148.962 (5.56X)   100.708 (2.15X)   107.72 (2.01X)
> 16        126.819 (9.17X)     146.402 (8.03X)    119.923 (6.9X)    97.996 (2.2X)     106.531 (2.04X)
> 20        *117.845 (9.87X)*   149.203 (7.88X)    138.741 (5.96X)   97.94 (2.21X)     107.5 (2.02)
> 30        127.554 (9.11X)     161.218 (7.29X)    172.443 (4.8X)    98.232 (2.2X)     108.778 (1.99X)

Hm. You don't explicitly mention it in your design, but given how small the
benefit of going from 0 to 1 workers is, I assume the leader doesn't do any
"chunk processing" on its own?

> Design of the Parallel Copy: The backend to which the "COPY FROM" query is
> submitted acts as leader, with the responsibility of reading data from the
> file/stdin and launching at most n workers, as specified with the PARALLEL
> 'n' option in the "COPY FROM" query. The leader populates the common data
> required for the workers' execution in the DSM and shares it with the
> workers. The leader then executes before-statement triggers, if any exist.
> The leader populates the DSM chunks, which include the start offset and
> chunk size; while populating the chunks it reads as many blocks as required
> from the file into the DSM data blocks. Each block is 64K in size. The
> leader parses the data to identify a chunk; the existing logic from
> CopyReadLineText, which identifies the chunks, was used for this with some
> changes. The leader checks if a free chunk is available to copy the
> information; if there is no free chunk it waits till the required chunk is
> freed up by a worker, and then copies the identified chunk's information
> (offset & chunk size) into the DSM chunks. This process is repeated till
> the complete file is processed. Simultaneously, the workers cache the
> chunks (50) locally in local memory and release the chunks to the leader
> for further populating. Each worker processes the chunks it cached and
> inserts them into the table. The leader waits till all the populated chunks
> are processed by the workers and then exits.

Why do we need the local copy of 50 chunks? Copying memory around is far
from free. I don't see why it'd be better to add per-process caching,
rather than making the DSM bigger. I can see some benefit in marking
multiple chunks as being processed with one lock acquisition, but I don't
think adding a memory copy is a good idea.
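To illustrate the kind of batching I have in mind, here's a rough, untested
sketch. The type and function names are invented for the example and are not
the patch's actual structures: a worker takes the lock once, claims a batch
of populated ring entries, and then works on them directly in shared memory,
without copying chunk metadata into backend-local buffers.

#include "postgres.h"

#include "storage/lwlock.h"

/*
 * Hypothetical chunk states, playing the role of the patch's
 * CHUNK_LEADER_POPULATED / CHUNK_WORKER_PROCESSING.
 */
typedef enum
{
	CHUNK_POPULATED,			/* leader finished filling this entry */
	CHUNK_PROCESSING			/* a worker has claimed this entry */
} SketchChunkState;

/* One entry of the shared ring; lives entirely in the DSM. */
typedef struct SketchChunkEntry
{
	uint32		start_offset;	/* where the chunk starts in the data blocks */
	uint32		chunk_size;		/* chunk length in bytes */
	uint32		state;			/* SketchChunkState */
} SketchChunkEntry;

typedef struct SketchChunkRing
{
	LWLock		lock;
	uint32		next_to_claim;	/* next entry to hand out to a worker */
	uint32		nentries;		/* ring size */
	SketchChunkEntry entries[FLEXIBLE_ARRAY_MEMBER];
} SketchChunkRing;

/*
 * Claim up to 'max' consecutive populated entries with a single lock
 * acquisition.  Stores the claimed ring positions in 'claimed' and returns
 * how many were claimed; the worker then processes those entries in place.
 */
static int
claim_chunk_batch(SketchChunkRing *ring, uint32 *claimed, int max)
{
	int			nclaimed = 0;

	LWLockAcquire(&ring->lock, LW_EXCLUSIVE);
	while (nclaimed < max)
	{
		uint32		pos = ring->next_to_claim % ring->nentries;
		SketchChunkEntry *entry = &ring->entries[pos];

		if (entry->state != CHUNK_POPULATED)
			break;				/* leader hasn't filled this slot yet */

		entry->state = CHUNK_PROCESSING;
		claimed[nclaimed++] = pos;
		ring->next_to_claim++;
	}
	LWLockRelease(&ring->lock);

	return nclaimed;
}

With something along those lines the batch size only affects lock traffic;
the chunk data itself never leaves the DSM.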
This patch *desperately* needs to be split up. It imo is close to
unreviewable, because a large amount of changes that just move code around,
without any functional change, is mixed in with the actual new stuff.

> /*
> + * State of the chunk.
> + */
> +typedef enum ChunkState
> +{
> +	CHUNK_INIT,					/* initial state of chunk */
> +	CHUNK_LEADER_POPULATING,	/* leader processing chunk */
> +	CHUNK_LEADER_POPULATED,		/* leader completed populating chunk */
> +	CHUNK_WORKER_PROCESSING,	/* worker processing chunk */
> +	CHUNK_WORKER_PROCESSED		/* worker completed processing chunk */
> +}ChunkState;
> +
> +#define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
> +
> +#define DATA_BLOCK_SIZE RAW_BUF_SIZE
> +#define RINGSIZE (10 * 1000)
> +#define MAX_BLOCKS_COUNT 1000
> +#define WORKER_CHUNK_COUNT 50	/* should be mod of RINGSIZE */
> +
> +#define IsParallelCopy()	(cstate->is_parallel)
> +#define IsLeader()			(cstate->pcdata->is_leader)
> +#define IsHeaderLine()		(cstate->header_line && cstate->cur_lineno == 1)
> +
> +/*
> + * Copy data block information.
> + */
> +typedef struct CopyDataBlock
> +{
> +	/* The number of unprocessed chunks in the current block. */
> +	pg_atomic_uint32 unprocessed_chunk_parts;
> +
> +	/*
> +	 * If the current chunk data is continued into another block,
> +	 * following_block will have the position where the remaining data need
> +	 * to be read.
> +	 */
> +	uint32		following_block;
> +
> +	/*
> +	 * This flag will be set, when the leader finds out this block can be
> +	 * read safely by the worker. This helps the worker to start processing
> +	 * the chunk early where the chunk will be spread across many blocks and
> +	 * the worker need not wait for the complete chunk to be processed.
> +	 */
> +	bool		curr_blk_completed;
> +	char		data[DATA_BLOCK_SIZE + 1];	/* data read from file */
> +}CopyDataBlock;

What's the + 1 here about?

> +/*
> + * Parallel copy line buffer information.
> + */
> +typedef struct ParallelCopyLineBuf
> +{
> +	StringInfoData	line_buf;
> +	uint64			cur_lineno;	/* line number for error messages */
> +}ParallelCopyLineBuf;

Why do we need separate infrastructure for this? We shouldn't duplicate
infrastructure unnecessarily.

> +/*
> + * Common information that need to be copied to shared memory.
> + */
> +typedef struct CopyWorkerCommonData
> +{

Why is parallel-specific stuff here suddenly not named ParallelCopy*
anymore? If you introduce a naming scheme like that it imo should be used
consistently.

> +	/* low-level state data */
> +	CopyDest	copy_dest;		/* type of copy source/destination */
> +	int			file_encoding;	/* file or remote side's character encoding */
> +	bool		need_transcoding;	/* file encoding diff from server? */
> +	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
> +
> +	/* parameters from the COPY command */
> +	bool		csv_mode;		/* Comma Separated Value format? */
> +	bool		header_line;	/* CSV header line? */
> +	int			null_print_len; /* length of same */
> +	bool		force_quote_all;	/* FORCE_QUOTE *? */
> +	bool		convert_selectively;	/* do selective binary conversion? */
> +
> +	/* Working state for COPY FROM */
> +	AttrNumber	num_defaults;
> +	Oid			relid;
> +}CopyWorkerCommonData;

But I actually think we shouldn't have this information in two different
structs. This should exist once, independent of using parallel /
non-parallel copy.
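To be concrete about what "exist once" could look like, here's a sketch of a
possible layout inside copy.c, not the actual definitions: the option and
format fields that both serial and parallel copy need live in one embedded
struct, and the leader publishes exactly that struct to shared memory, so
there is no second field list to keep in sync.

/*
 * Hypothetical layout, for illustration only.
 */
typedef struct CopyCommonData
{
	/* low-level state data */
	CopyDest	copy_dest;		/* type of copy source/destination */
	int			file_encoding;	/* file or remote side's character encoding */
	bool		need_transcoding;	/* file encoding diff from server? */
	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */

	/* parameters from the COPY command */
	bool		csv_mode;		/* Comma Separated Value format? */
	bool		header_line;	/* CSV header line? */
	int			null_print_len; /* length of same */
	bool		force_quote_all;	/* FORCE_QUOTE *? */
	bool		convert_selectively;	/* do selective binary conversion? */

	/* Working state for COPY FROM */
	AttrNumber	num_defaults;
	Oid			relid;
} CopyCommonData;

typedef struct CopyStateData
{
	CopyCommonData common;		/* used by serial and parallel copy alike */

	/* ... purely backend-local state (fe_msgbuf, raw_buf, line_buf, ...) ... */
} CopyStateData;

/* Leader side: publishing the common state is then a single struct copy. */
static void
SerializeCopyCommonData(CopyCommonData *shared, CopyStateData *cstate)
{
	memcpy(shared, &cstate->common, sizeof(CopyCommonData));
}

Variable-length options (null string, force-quote column list, ...) would
still need separate serialization, but none of the scalar fields would be
duplicated anymore.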
> +/* List information */
> +typedef struct ListInfo
> +{
> +	int		count;		/* count of attributes */
> +
> +	/* string info in the form info followed by info1, info2... infon */
> +	char	info[1];
> +} ListInfo;

Based on these comments I have no idea what this could be for.

> /*
> - * This keeps the character read at the top of the loop in the buffer
> - * even if there is more than one read-ahead.
> + * This keeps the character read at the top of the loop in the buffer
> + * even if there is more than one read-ahead.
> + */
> +#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
> +if (1) \
> +{ \
> +	if (copy_buff_state.raw_buf_ptr + (extralen) >= copy_buff_state.copy_buf_len && !hit_eof) \
> +	{ \
> +		if (IsParallelCopy()) \
> +		{ \
> +			copy_buff_state.chunk_size = prev_chunk_size; /* update previous chunk size */ \
> +			if (copy_buff_state.block_switched) \
> +			{ \
> +				pg_atomic_sub_fetch_u32(&copy_buff_state.data_blk_ptr->unprocessed_chunk_parts, 1); \
> +				copy_buff_state.copy_buf_len = prev_copy_buf_len; \
> +			} \
> +		} \
> +		copy_buff_state.raw_buf_ptr = prev_raw_ptr;	/* undo fetch */ \
> +		need_data = true; \
> +		continue; \
> +	} \
> +} else ((void) 0)

I think it's an absolutely clear no-go to add new branches to these. They're
*really* hot already, and this is going to sprinkle a significant amount of
new instructions over a lot of places.
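One way to avoid that - a rough sketch with invented function names, not
something the patch currently does - would be to specialize the line-reading
routine per mode, similar to what nodeHashjoin.c does for parallel vs.
non-parallel hash join, so the mode check becomes a compile-time constant in
each copy and the serial hot path doesn't gain a single instruction:

/*
 * Illustration only.  The existing per-character loop would live in the
 * always-inlined Impl function; 'is_parallel' is constant in each caller,
 * so the compiler drops the branch from the serial specialization.
 */
static pg_attribute_always_inline bool
CopyReadLineTextImpl(CopyState cstate, const bool is_parallel)
{
	for (;;)
	{
		/* ... existing hot per-character scanning of cstate->raw_buf ... */

		if (is_parallel)
		{
			/* parallel-only bookkeeping, folded away in the serial copy */
		}

		/* ... */
		break;
	}
	return false;
}

static bool
CopyReadLineTextSerial(CopyState cstate)
{
	return CopyReadLineTextImpl(cstate, false);
}

static bool
CopyReadLineTextParallel(CopyState cstate)
{
	return CopyReadLineTextImpl(cstate, true);
}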
> +/*
> + * SET_RAWBUF_FOR_LOAD - Set raw_buf to the shared memory where the file
> + * data must be read.
> + */
> +#define SET_RAWBUF_FOR_LOAD() \
> +{ \
> +	ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \
> +	uint32		cur_block_pos; \
> +	/* \
> +	 * Mark the previous block as completed, worker can start copying this data. \
> +	 */ \
> +	if (copy_buff_state.data_blk_ptr != copy_buff_state.curr_data_blk_ptr && \
> +		copy_buff_state.data_blk_ptr->curr_blk_completed == false) \
> +		copy_buff_state.data_blk_ptr->curr_blk_completed = true; \
> +	\
> +	copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> +	cur_block_pos = WaitGetFreeCopyBlock(pcshared_info); \
> +	copy_buff_state.curr_data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos]; \
> +	\
> +	if (!copy_buff_state.data_blk_ptr) \
> +	{ \
> +		copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> +		chunk_first_block = cur_block_pos; \
> +	} \
> +	else if (need_data == false) \
> +		copy_buff_state.data_blk_ptr->following_block = cur_block_pos; \
> +	\
> +	cstate->raw_buf = copy_buff_state.curr_data_blk_ptr->data; \
> +	copy_buff_state.copy_raw_buf = cstate->raw_buf; \
> +}
> +
> +/*
> + * END_CHUNK_PARALLEL_COPY - Update the chunk information in shared memory.
> + */
> +#define END_CHUNK_PARALLEL_COPY() \
> +{ \
> +	if (!IsHeaderLine()) \
> +	{ \
> +		ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \
> +		ChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries; \
> +		if (copy_buff_state.chunk_size) \
> +		{ \
> +			ChunkBoundary *chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> +			/* \
> +			 * If raw_buf_ptr is zero, unprocessed_chunk_parts would have been \
> +			 * incremented in SEEK_COPY_BUFF_POS. This will happen if the whole \
> +			 * chunk finishes at the end of the current block. If the \
> +			 * new_line_size > raw_buf_ptr, then the new block has only new line \
> +			 * char content. The unprocessed count should not be increased in \
> +			 * this case. \
> +			 */ \
> +			if (copy_buff_state.raw_buf_ptr != 0 && \
> +				copy_buff_state.raw_buf_ptr > new_line_size) \
> +				pg_atomic_add_fetch_u32(&copy_buff_state.curr_data_blk_ptr->unprocessed_chunk_parts, 1); \
> +			\
> +			/* Update chunk size. */ \
> +			pg_atomic_write_u32(&chunkInfo->chunk_size, copy_buff_state.chunk_size); \
> +			pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_LEADER_POPULATED); \
> +			elog(DEBUG1, "[Leader] After adding - chunk position:%d, chunk_size:%d", \
> +				 chunk_pos, copy_buff_state.chunk_size); \
> +			pcshared_info->populated++; \
> +		} \
> +		else if (new_line_size) \
> +		{ \
> +			/* \
> +			 * This means only new line char, empty record should be \
> +			 * inserted. \
> +			 */ \
> +			ChunkBoundary *chunkInfo; \
> +			chunk_pos = UpdateBlockInChunkInfo(cstate, -1, -1, 0, \
> +											   CHUNK_LEADER_POPULATED); \
> +			chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> +			elog(DEBUG1, "[Leader] Added empty chunk with offset:%d, chunk position:%d, chunk size:%d", \
> +				 chunkInfo->start_offset, chunk_pos, \
> +				 pg_atomic_read_u32(&chunkInfo->chunk_size)); \
> +			pcshared_info->populated++; \
> +		} \
> +	} \
> +	\
> +	/* \
> +	 * All of the read data is processed, reset index & len. In the \
> +	 * subsequent read, we will get a new block and copy data in to the \
> +	 * new block. \
> +	 */ \
> +	if (copy_buff_state.raw_buf_ptr == copy_buff_state.copy_buf_len) \
> +	{ \
> +		cstate->raw_buf_index = 0; \
> +		cstate->raw_buf_len = 0; \
> +	} \
> +	else \
> +		cstate->raw_buf_len = copy_buff_state.copy_buf_len; \
> +}

Why are these macros? They are way way way above a length where that makes
any sort of sense.

Greetings,

Andres Freund