Re: Parallel copy - Mailing list pgsql-hackers
| From | vignesh C |
|---|---|
| Subject | Re: Parallel copy |
| Date | |
| Msg-id | CALDaNm3uyHpD9sKoFtB0EnMO8DLuD6H9pReFm=tm=9ccEWuUVQ@mail.gmail.com |
| In response to | Re: Parallel copy (Andres Freund <andres@anarazel.de>) |
| Responses | Re: Parallel copy, Re: Parallel copy |
| List | pgsql-hackers |
On Thu, Jun 4, 2020 at 12:44 AM Andres Freund <andres@anarazel.de> wrote:
>
> Hm. you don't explicitly mention that in your design, but given how
> small the benefits going from 0-1 workers is, I assume the leader
> doesn't do any "chunk processing" on its own?
>

Yes, you are right: the leader does not do any processing. The leader's
work is mainly to populate the shared memory with the offset information
for each record.

> > Design of the Parallel Copy: The backend, to which the "COPY FROM" query is
> > submitted acts as leader with the responsibility of reading data from the
> > file/stdin, launching at most n number of workers as specified with
> > PARALLEL 'n' option in the "COPY FROM" query. The leader populates the
> > common data required for the workers execution in the DSM and shares it
> > with the workers. The leader then executes before statement triggers if
> > there exists any. Leader populates DSM chunks which includes the start
> > offset and chunk size, while populating the chunks it reads as many blocks
> > as required into the DSM data blocks from the file. Each block is of 64K
> > size. The leader parses the data to identify a chunk, the existing logic
> > from CopyReadLineText which identifies the chunks with some changes was
> > used for this. Leader checks if a free chunk is available to copy the
> > information, if there is no free chunk it waits till the required chunk is
> > freed up by the worker and then copies the identified chunks information
> > (offset & chunk size) into the DSM chunks. This process is repeated till
> > the complete file is processed. Simultaneously, the workers cache the
> > chunks(50) locally into the local memory and release the chunks to the
> > leader for further populating. Each worker processes the chunk that it
> > cached and inserts it into the table. The leader waits till all the chunks
> > populated are processed by the workers and exits.
>
> Why do we need the local copy of 50 chunks? Copying memory around is far
> from free. I don't see why it'd be better to add per-process caching,
> rather than making the DSM bigger? I can see some benefit in marking
> multiple chunks as being processed with one lock acquisition, but I
> don't think adding a memory copy is a good idea.

We ran a performance test with a 5.1GB csv data file, 10 million tuples,
and 2 indexes on integer columns; the results are given below. We noticed
that in some cases the performance is better if we copy 50 records locally
and release the shared memory, and the benefit grows as the number of
workers increases. Thoughts?

---------------------------------------------------------------------------------
Workers | Exec time (with local copying of | Exec time (without local copying,
        | 50 records & releasing the       | processing record by record)
        | shared memory)                   |
---------------------------------------------------------------------------------
   0    | 1162.772 (1X)                    | 1152.684 (1X)
   2    |  635.249 (1.83X)                 |  647.894 (1.78X)
   4    |  336.835 (3.45X)                 |  335.534 (3.43X)
   8    |  188.577 (6.17X)                 |  189.461 (6.08X)
  16    |  126.819 (9.17X)                 |  142.730 (8.07X)
  20    |  117.845 (9.87X)                 |  146.533 (7.87X)
  30    |  127.554 (9.11X)                 |  160.307 (7.19X)
---------------------------------------------------------------------------------
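To make the comparison concrete, the batching variant works roughly like
the sketch below. This is illustrative only and not the code from the
attached patches: NextPopulatedChunk() and the first_block field are
placeholder names, each line_buf is assumed to have been initialized with
initStringInfo() earlier, and a chunk spanning multiple data blocks is
ignored for brevity.

/*
 * Sketch of the worker-side batching: pull up to WORKER_CHUNK_COUNT
 * populated entries from the shared ring, copy each line into
 * backend-local memory, and release the ring slots so the leader can
 * keep populating.
 */
static int
WorkerCacheChunks(ShmCopyInfo *pcshared_info, ParallelCopyLineBuf *local_lines)
{
	int			ncached = 0;

	for (int i = 0; i < WORKER_CHUNK_COUNT; i++)
	{
		/* NextPopulatedChunk() stands in for the ring lookup */
		ChunkBoundary *chunkInfo = NextPopulatedChunk(pcshared_info);
		CopyDataBlock *blk;
		uint32		size;

		if (chunkInfo == NULL)
			break;				/* nothing populated right now */

		blk = &pcshared_info->data_blocks[chunkInfo->first_block];
		size = pg_atomic_read_u32(&chunkInfo->chunk_size);

		/* copy the raw line out of the shared data block */
		resetStringInfo(&local_lines[ncached].line_buf);
		appendBinaryStringInfo(&local_lines[ncached].line_buf,
							   &blk->data[chunkInfo->start_offset], size);
		ncached++;

		/* give the ring slot back to the leader immediately */
		pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_WORKER_PROCESSED);
	}

	return ncached;				/* caller parses and inserts from local_lines[] */
}

In the record-by-record variant the same slot is instead held until the
row has been parsed and inserted, which keeps the ring entries occupied
for longer as the worker count grows.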
> This patch *desperately* needs to be split up. It imo is close to
> unreviewable, due to a large amount of changes that just move code
> around without other functional changes being mixed in with the actual
> new stuff.

I have split the patch; the new split patches are attached.

> >  /*
> > + * State of the chunk.
> > + */
> > +typedef enum ChunkState
> > +{
> > +	CHUNK_INIT,					/* initial state of chunk */
> > +	CHUNK_LEADER_POPULATING,	/* leader processing chunk */
> > +	CHUNK_LEADER_POPULATED,		/* leader completed populating chunk */
> > +	CHUNK_WORKER_PROCESSING,	/* worker processing chunk */
> > +	CHUNK_WORKER_PROCESSED		/* worker completed processing chunk */
> > +}ChunkState;
> > +
> > +#define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
> > +
> > +#define DATA_BLOCK_SIZE RAW_BUF_SIZE
> > +#define RINGSIZE (10 * 1000)
> > +#define MAX_BLOCKS_COUNT 1000
> > +#define WORKER_CHUNK_COUNT 50	/* should be mod of RINGSIZE */
> > +
> > +#define IsParallelCopy()	(cstate->is_parallel)
> > +#define IsLeader()			(cstate->pcdata->is_leader)
> > +#define IsHeaderLine()		(cstate->header_line && cstate->cur_lineno == 1)
> > +
> > +/*
> > + * Copy data block information.
> > + */
> > +typedef struct CopyDataBlock
> > +{
> > +	/* The number of unprocessed chunks in the current block. */
> > +	pg_atomic_uint32 unprocessed_chunk_parts;
> > +
> > +	/*
> > +	 * If the current chunk data is continued into another block,
> > +	 * following_block will have the position where the remaining data need to
> > +	 * be read.
> > +	 */
> > +	uint32	following_block;
> > +
> > +	/*
> > +	 * This flag will be set, when the leader finds out this block can be read
> > +	 * safely by the worker. This helps the worker to start processing the chunk
> > +	 * early where the chunk will be spread across many blocks and the worker
> > +	 * need not wait for the complete chunk to be processed.
> > +	 */
> > +	bool	curr_blk_completed;
> > +	char	data[DATA_BLOCK_SIZE + 1];	/* data read from file */
> > +}CopyDataBlock;
>
> What's the + 1 here about?

Fixed; removed the +1, it is not needed.

> > +/*
> > + * Parallel copy line buffer information.
> > + */
> > +typedef struct ParallelCopyLineBuf
> > +{
> > +	StringInfoData	line_buf;
> > +	uint64			cur_lineno;	/* line number for error messages */
> > +}ParallelCopyLineBuf;
>
> Why do we need separate infrastructure for this? We shouldn't duplicate
> infrastructure unnecessarily.

This was required for copying multiple records locally and releasing the
shared memory. I have not changed this yet; I will decide on it based on
the conclusion reached for the earlier comment on local caching.

> > +/*
> > + * Common information that need to be copied to shared memory.
> > + */
> > +typedef struct CopyWorkerCommonData
> > +{
>
> Why is parallel specific stuff here suddenly not named ParallelCopy*
> anymore? If you introduce a naming like that it imo should be used
> consistently.

Fixed; changed the structs to use the ParallelCopy prefix consistently.

> > +	/* low-level state data */
> > +	CopyDest	copy_dest;			/* type of copy source/destination */
> > +	int			file_encoding;		/* file or remote side's character encoding */
> > +	bool		need_transcoding;	/* file encoding diff from server? */
> > +	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
> > +
> > +	/* parameters from the COPY command */
> > +	bool		csv_mode;			/* Comma Separated Value format? */
> > +	bool		header_line;		/* CSV header line? */
> > +	int			null_print_len;		/* length of same */
> > +	bool		force_quote_all;	/* FORCE_QUOTE *? */
> > +	bool		convert_selectively;	/* do selective binary conversion? */
> > +
> > +	/* Working state for COPY FROM */
> > +	AttrNumber	num_defaults;
> > +	Oid			relid;
> > +}CopyWorkerCommonData;
>
> But I actually think we shouldn't have this information in two different
> structs. This should exist once, independent of using parallel /
> non-parallel copy.

This structure holds the common data from CopyStateData that the workers
require. This information is allocated and stored in the DSM, and each
worker retrieves it and copies it back into its own CopyStateData.
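As a rough sketch of that flow (illustrative only; the function names are
made up, and the field list simply mirrors whatever the shared struct ends
up containing):

/*
 * The leader copies the fields the workers need out of its CopyStateData
 * into the shared struct placed in the DSM segment; each worker copies
 * them back into its own CopyStateData before processing chunks.
 */
static void
SerializeParallelCopyCommonData(CopyState cstate, CopyWorkerCommonData *shared)
{
	shared->copy_dest = cstate->copy_dest;
	shared->file_encoding = cstate->file_encoding;
	shared->need_transcoding = cstate->need_transcoding;
	shared->encoding_embeds_ascii = cstate->encoding_embeds_ascii;
	shared->csv_mode = cstate->csv_mode;
	shared->header_line = cstate->header_line;
	shared->null_print_len = cstate->null_print_len;
	shared->force_quote_all = cstate->force_quote_all;
	shared->convert_selectively = cstate->convert_selectively;
	shared->num_defaults = cstate->num_defaults;
	shared->relid = RelationGetRelid(cstate->rel);
}

static void
RestoreParallelCopyCommonData(CopyState cstate, const CopyWorkerCommonData *shared)
{
	cstate->copy_dest = shared->copy_dest;
	cstate->file_encoding = shared->file_encoding;
	/* ... remaining fields mirrored the same way ... */
	/* relid is used by the worker to open the target relation */
}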
> > +/* List information */
> > +typedef struct ListInfo
> > +{
> > +	int		count;		/* count of attributes */
> > +
> > +	/* string info in the form info followed by info1, info2... infon */
> > +	char	info[1];
> > +} ListInfo;
>
> Based on these comments I have no idea what this could be for.

I have added a better comment for this. The following was added: this
structure helps convert a List data type into the ListInfo format, with
count holding the number of elements in the list and info holding the
List elements appended contiguously. The converted structure is allocated
in shared memory and stored in the DSM for the worker to retrieve and
later convert back into a List data type.
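For illustration, assuming for simplicity a list of plain C strings, the
conversion could look roughly like this (names and details are
illustrative, not the patch code):

static ListInfo *
ListToListInfo(List *list, Size *size)
{
	Size		len = offsetof(ListInfo, info);
	ListCell   *lc;
	ListInfo   *result;
	char	   *ptr;

	/* total size: header plus each element as a NUL-terminated string */
	foreach(lc, list)
		len += strlen((char *) lfirst(lc)) + 1;

	result = (ListInfo *) palloc0(len);
	result->count = list_length(list);

	/* lay the elements out back to back after the count */
	ptr = result->info;
	foreach(lc, list)
	{
		char	   *item = (char *) lfirst(lc);

		strcpy(ptr, item);
		ptr += strlen(item) + 1;
	}

	*size = len;				/* caller memcpy()s the result into the DSM */
	return result;
}

The worker walks the info buffer count times, advancing past each
NUL terminator, to rebuild the List on its side.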
> >  /*
> > - * This keeps the character read at the top of the loop in the buffer
> > - * even if there is more than one read-ahead.
> > + * This keeps the character read at the top of the loop in the buffer
> > + * even if there is more than one read-ahead.
> > + */
> > +#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
> > +if (1) \
> > +{ \
> > +	if (copy_buff_state.raw_buf_ptr + (extralen) >= copy_buff_state.copy_buf_len && !hit_eof) \
> > +	{ \
> > +		if (IsParallelCopy()) \
> > +		{ \
> > +			copy_buff_state.chunk_size = prev_chunk_size; /* update previous chunk size */ \
> > +			if (copy_buff_state.block_switched) \
> > +			{ \
> > +				pg_atomic_sub_fetch_u32(&copy_buff_state.data_blk_ptr->unprocessed_chunk_parts, 1); \
> > +				copy_buff_state.copy_buf_len = prev_copy_buf_len; \
> > +			} \
> > +		} \
> > +		copy_buff_state.raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
> > +		need_data = true; \
> > +		continue; \
> > +	} \
> > +} else ((void) 0)
>
> I think it's an absolutely clear no-go to add new branches to
> these. They're *really* hot already, and this is going to sprinkle a
> significant amount of new instructions over a lot of places.

Fixed, removed this.

> > +/*
> > + * SET_RAWBUF_FOR_LOAD - Set raw_buf to the shared memory where the file data must
> > + * be read.
> > + */
> > +#define SET_RAWBUF_FOR_LOAD() \
> > +{ \
> > +	ShmCopyInfo	*pcshared_info = cstate->pcdata->pcshared_info; \
> > +	uint32 cur_block_pos; \
> > +	/* \
> > +	 * Mark the previous block as completed, worker can start copying this data. \
> > +	 */ \
> > +	if (copy_buff_state.data_blk_ptr != copy_buff_state.curr_data_blk_ptr && \
> > +		copy_buff_state.data_blk_ptr->curr_blk_completed == false) \
> > +		copy_buff_state.data_blk_ptr->curr_blk_completed = true; \
> > +	\
> > +	copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> > +	cur_block_pos = WaitGetFreeCopyBlock(pcshared_info); \
> > +	copy_buff_state.curr_data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos]; \
> > +	\
> > +	if (!copy_buff_state.data_blk_ptr) \
> > +	{ \
> > +		copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> > +		chunk_first_block = cur_block_pos; \
> > +	} \
> > +	else if (need_data == false) \
> > +		copy_buff_state.data_blk_ptr->following_block = cur_block_pos; \
> > +	\
> > +	cstate->raw_buf = copy_buff_state.curr_data_blk_ptr->data; \
> > +	copy_buff_state.copy_raw_buf = cstate->raw_buf; \
> > +}
> > +
> > +/*
> > + * END_CHUNK_PARALLEL_COPY - Update the chunk information in shared memory.
> > + */
> > +#define END_CHUNK_PARALLEL_COPY() \
> > +{ \
> > +	if (!IsHeaderLine()) \
> > +	{ \
> > +		ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \
> > +		ChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries; \
> > +		if (copy_buff_state.chunk_size) \
> > +		{ \
> > +			ChunkBoundary *chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> > +			/* \
> > +			 * If raw_buf_ptr is zero, unprocessed_chunk_parts would have been \
> > +			 * incremented in SEEK_COPY_BUFF_POS. This will happen if the whole \
> > +			 * chunk finishes at the end of the current block. If the \
> > +			 * new_line_size > raw_buf_ptr, then the new block has only new line \
> > +			 * char content. The unprocessed count should not be increased in \
> > +			 * this case. \
> > +			 */ \
> > +			if (copy_buff_state.raw_buf_ptr != 0 && \
> > +				copy_buff_state.raw_buf_ptr > new_line_size) \
> > +				pg_atomic_add_fetch_u32(&copy_buff_state.curr_data_blk_ptr->unprocessed_chunk_parts, 1); \
> > +			\
> > +			/* Update chunk size. */ \
> > +			pg_atomic_write_u32(&chunkInfo->chunk_size, copy_buff_state.chunk_size); \
> > +			pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_LEADER_POPULATED); \
> > +			elog(DEBUG1, "[Leader] After adding - chunk position:%d, chunk_size:%d", \
> > +				 chunk_pos, copy_buff_state.chunk_size); \
> > +			pcshared_info->populated++; \
> > +		} \
> > +		else if (new_line_size) \
> > +		{ \
> > +			/* \
> > +			 * This means only new line char, empty record should be \
> > +			 * inserted. \
> > +			 */ \
> > +			ChunkBoundary *chunkInfo; \
> > +			chunk_pos = UpdateBlockInChunkInfo(cstate, -1, -1, 0, \
> > +											   CHUNK_LEADER_POPULATED); \
> > +			chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> > +			elog(DEBUG1, "[Leader] Added empty chunk with offset:%d, chunk position:%d, chunk size:%d", \
> > +				 chunkInfo->start_offset, chunk_pos, \
> > +				 pg_atomic_read_u32(&chunkInfo->chunk_size)); \
> > +			pcshared_info->populated++; \
> > +		} \
> > +	} \
> > +	\
> > +	/* \
> > +	 * All of the read data is processed, reset index & len. In the \
> > +	 * subsequent read, we will get a new block and copy data in to the \
> > +	 * new block. \
> > +	 */ \
> > +	if (copy_buff_state.raw_buf_ptr == copy_buff_state.copy_buf_len) \
> > +	{ \
> > +		cstate->raw_buf_index = 0; \
> > +		cstate->raw_buf_len = 0; \
> > +	} \
> > +	else \
> > +		cstate->raw_buf_len = copy_buff_state.copy_buf_len; \
> > +}
>
> Why are these macros? They are way way way above a length where that
> makes any sort of sense.

Converted these macros to functions.

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com