Re: Parallel copy - Mailing list pgsql-hackers
From | vignesh C |
---|---|
Subject | Re: Parallel copy |
Date | |
Msg-id | CALDaNm3GaZyYPpGu-PpF0SEkJg-eaW3TboHxpxJ-2criv2j_eA@mail.gmail.com |
In response to | Re: Parallel copy (Greg Nancarrow <gregn4422@gmail.com>) |
Responses | Re: Parallel copy |
List | pgsql-hackers |
Thanks Greg for reviewing the patch. Please find my thoughts on your comments below.

On Wed, Aug 12, 2020 at 9:10 AM Greg Nancarrow <gregn4422@gmail.com> wrote:
> I have done some ad-hoc testing of the patch using parallel copies from text/csv/binary files and have not yet struck any execution problems other than some option validation and associated error messages on boundary cases.
>
> One general question that I have: is there a user benefit (over the normal non-parallel COPY) to allowing "COPY ... FROM ... WITH (PARALLEL 1)"?
>

There will be a marginal improvement, because the worker only needs to process the data and need not read the file; the file reading is done by the main process. The real improvement can be seen from 2 workers onwards.

>
> My following comments are broken down by patch:
>
> (1) v2-0001-Copy-code-readjustment-to-support-parallel-copy.patch
>
> (i) Whilst I can't entirely blame these patches for it (as they are following what is already there), I can't help noticing the use of numerous macros in src/backend/commands/copy.c which paste in multiple lines of code in various places.
> It's getting a little out-of-hand. Surely the majority of these would be best inline functions instead?
> Perhaps hasn't been done because too many parameters need to be passed - thoughts?
>

I felt macros were used mainly because this is a tight loop and macros give better performance. I have added the macros CLEAR_EOL_LINE, INCREMENTPROCESSED & GETPROCESSED because these differ slightly between parallel copy and non-parallel copy. In the remaining patches the macros will be extended to include the parallel copy logic. Instead of adding checks to the core logic, I thought of keeping them as macros so that readability stays good.

>
> (2) v2-0002-Framework-for-leader-worker-in-parallel-copy.patch
>
> (i) minor point: there are some tabbing/spacing issues in this patch (and the other patches), affecting alignment.
> e.g. mixed tabs/spaces and misalignment in PARALLEL_COPY_KEY_xxx definitions
>

Fixed

> (ii)
>
> +/*
> + * Each worker will be allocated WORKER_CHUNK_COUNT of records from DSM data
> + * block to process to avoid lock contention. This value should be mode of
> + * RINGSIZE, as wrap around cases is currently not handled while selecting the
> + * WORKER_CHUNK_COUNT by the worker.
> + */
> +#define WORKER_CHUNK_COUNT 50
>
>
> "This value should be mode of RINGSIZE ..."
>
> -> typo: mode (mod? should evenly divide into RINGSIZE?)

Fixed, changed it to divisible by.

> (iii)
> + * using pg_atomic_compare_exchange_u32, worker will change the sate to
>
> ->typo: sate (should be "state")

Fixed

> (iv)
>
> + errmsg("parallel option supported only for copy from"),
>
> -> suggest change to: errmsg("parallel option is supported only for COPY FROM"),
>

Fixed

> (v)
>
> +    errno = 0; /* To distinguish success/failure after call */
> +    val = strtol(str, &endptr, 10);
> +
> +    /* Check for various possible errors */
> +    if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN))
> +        || (errno != 0 && val == 0) ||
> +        *endptr)
> +        ereport(ERROR,
> +                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +                 errmsg("improper use of argument to option \"%s\"",
> +                        defel->defname),
> +                 parser_errposition(pstate, defel->location)));
> +
> +    if (endptr == str)
> +        ereport(ERROR,
> +                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +                 errmsg("no digits were found in argument to option \"%s\"",
> +                        defel->defname),
> +                 parser_errposition(pstate, defel->location)));
> +
> +    cstate->nworkers = (int) val;
> +
> +    if (cstate->nworkers <= 0)
> +        ereport(ERROR,
> +                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +                 errmsg("argument to option \"%s\" must be a positive integer greater than zero",
> +                        defel->defname),
> +                 parser_errposition(pstate, defel->location)));
>
>
> I think this validation code needs to be improved, including the error messages (e.g. when can a "positive integer" NOT be greater than zero?)
>
> There is some overlap in the "no digits were found" case between the two conditions above, depending, for example, if the argument is quoted.
> Also, "improper use of argument to option" sounds a bit odd and vague to me.
> Finally, not range checking before casting long to int can lead to allowing out-of-range int values like in the following case:
>
> test=# copy mytable from '/myspace/test_pcopy/tmp.dat' (parallel '-2147483648');
> ERROR: argument to option "parallel" must be a positive integer greater than zero
> LINE 1: copy mytable from '/myspace/test_pcopy/tmp.dat' (parallel '-2...
>                                                          ^
> BUT the following is allowed...
>
> test=# copy mytable from '/myspace/test_pcopy/tmp.dat' (parallel '-2147483649');
> COPY 1000000
>
>
> I'd suggest to change the above validation code to do similar validation to that for the CREATE TABLE parallel_workers storage parameter (case RELOPT_TYPE_INT in reloptions.c). Like that code, wouldn't it be best to range-check the integer option value to be within a reasonable range, say 1 to 1024, with a corresponding errdetail message if possible?
>

Fixed, changed as suggested.

> (3) v2-0003-Allow-copy-from-command-to-process-data-from-file.patch
>
> (i)
>
> Patch comment says:
>
> "This feature allows the copy from to leverage multiple CPUs in order to copy
> data from file/STDIN to a table. This adds a PARALLEL option to COPY FROM
> command where the user can specify the number of workers that can be used
> to perform the COPY FROM command. Specifying zero as number of workers will
> disable parallelism."
>
> BUT - the changes to ProcessCopyOptions() specified in "v2-0002-Framework-for-leader-worker-in-parallel-copy.patch" do not allow zero workers to be specified - you get an error in that case. Patch comment should be updated accordingly.
>

Removed "Specifying zero as number of workers will disable parallelism", as the value now ranges from 1 to 1024.
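To illustrate the range-checking point from (2)(v), here is a minimal standalone sketch, not the code from the v3 patch. The function name parse_parallel_workers, the ParseResult enum and the two bound macros are hypothetical; only the 1 to 1024 range comes from the discussion above. The key step is comparing the value while it is still a long, before narrowing it to int. Without that check, on platforms where long is 64 bits an input such as '-2147483649' parses without ERANGE and the cast to int typically wraps to a large positive number, which is consistent with the COPY being accepted in the example above.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

/* Bounds taken from the 1..1024 range suggested in the review. */
#define MIN_PARALLEL_WORKERS 1
#define MAX_PARALLEL_WORKERS 1024

/* Hypothetical result codes, used here instead of ereport(). */
typedef enum
{
    PARSE_OK,
    PARSE_NOT_A_NUMBER,
    PARSE_OUT_OF_RANGE
} ParseResult;

/*
 * Parse the PARALLEL option argument. The range check is done on the
 * long value, so nothing out of range ever reaches the int cast.
 */
static ParseResult
parse_parallel_workers(const char *str, int *nworkers)
{
    char *endptr;
    long  val;

    assert(str != NULL && nworkers != NULL);

    errno = 0;              /* to distinguish success/failure after call */
    val = strtol(str, &endptr, 10);

    /* No digits consumed, or trailing characters after the number. */
    if (endptr == str || *endptr != '\0')
        return PARSE_NOT_A_NUMBER;

    /* Overflow of long, or outside the allowed worker range. */
    if (errno == ERANGE ||
        val < MIN_PARALLEL_WORKERS || val > MAX_PARALLEL_WORKERS)
        return PARSE_OUT_OF_RANGE;

    *nworkers = (int) val;  /* safe: val is known to be in 1..1024 */
    return PARSE_OK;
}
```

With this shape, "4" and "1024" are accepted, "0", "1025", "-2147483648" and "-2147483649" are all reported as out of range, and "" or "12abc" are reported as not a number, so the "no digits" and "trailing garbage" cases no longer overlap with the range error.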
> (ii)
>
> #define GETPROCESSED(processed) \
> -return processed;
> +if (!IsParallelCopy()) \
> +    return processed; \
> +else \
> +    return pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed);
> +
>
> I think GETPROCESSED would be better named "RETURNPROCESSED".
>

Fixed.

> (iii)
>
> The below comment seems out-of-date with the current code - is it referring to the loop embedded at the bottom of the current loop that the comment is within?
>
> + /*
> +  * There is a possibility that the above loop has come out because
> +  * data_blk_ptr->curr_blk_completed is set, but dataSize read might
> +  * be an old value, if data_blk_ptr->curr_blk_completed and the line is
> +  * completed, line_size will be set. Read the line_size again to be
> +  * sure if it is complete or partial block.
> +  */
>

Updated; it is referring to the embedded loop at the bottom of the current loop.

> (iv)
>
> I may be wrong here, but in the following block of code, isn't there a window of opportunity (however small) in which the line_state might be updated (LINE_WORKER_PROCESSED) by another worker just AFTER pg_atomic_read_u32() returns the current line_state which is put into curr_line_state, such that a write_pos update might be missed? And then a race-condition exists for reading/setting line_size (since line_size gets atomically set after line_state is set)?
> If I am wrong in thinking this synchronization might not be correct, maybe the comments could be improved here to explain how this code is safe in that respect.
>
>
> +    /* Get the current line information. */
> +    lineInfo = &pcshared_info->line_boundaries.ring[write_pos];
> +    curr_line_state = pg_atomic_read_u32(&lineInfo->line_state);
> +    if ((write_pos % WORKER_CHUNK_COUNT == 0) &&
> +        (curr_line_state == LINE_WORKER_PROCESSED ||
> +         curr_line_state == LINE_WORKER_PROCESSING))
> +    {
> +        pcdata->worker_processed_pos = write_pos;
> +        write_pos = (write_pos + WORKER_CHUNK_COUNT) % RINGSIZE;
> +        continue;
> +    }
> +
> +    /* Get the size of this line. */
> +    dataSize = pg_atomic_read_u32(&lineInfo->line_size);
> +
> +    if (dataSize != 0) /* If not an empty line. */
> +    {
> +        /* Get the block information. */
> +        data_blk_ptr = &pcshared_info->data_blocks[lineInfo->first_block];
> +
> +        if (!data_blk_ptr->curr_blk_completed && (dataSize == -1))
> +        {
> +            /* Wait till the current line or block is added. */
> +            COPY_WAIT_TO_PROCESS()
> +            continue;
> +        }
> +    }
> +
> +    /* Make sure that no worker has consumed this element. */
> +    if (pg_atomic_compare_exchange_u32(&lineInfo->line_state,
> +                                       &line_state, LINE_WORKER_PROCESSING))
> +        break;
>

This is not possible because of pg_atomic_compare_exchange_u32: it will succeed for only one of the workers, the one whose line_state is LINE_LEADER_POPULATED; for the other workers it will fail. This is explained in detail above ParallelCopyLineBoundary.

>
> (4) v2-0004-Documentation-for-parallel-copy.patch
>
> (i) I think that it is necessary to mention the "max_worker_processes" option in the description of the COPY statement PARALLEL option.
>
> For example, something like:
>
> + Perform <command>COPY FROM</command> in parallel using <replaceable
> + class="parameter"> integer</replaceable> background workers. Please
> + note that it is not guaranteed that the number of parallel workers
> + specified in <replaceable class="parameter">integer</replaceable> will
> + be used during execution. It is possible for a copy to run with fewer
> + workers than specified, or even with no workers at all (for example,
> + due to the setting of max_worker_processes). This option is allowed
> + only in <command>COPY FROM</command>.
>

Fixed.

> (5) v2-0005-Tests-for-parallel-copy.patch
>
> (i) None of the provided tests seem to test beyond "PARALLEL 2"
>

I intentionally ran with 1 parallel worker, because when you specify more than 1 parallel worker the order of record insertion can vary and there may be random failures.

>
> (6) v2-0006-Parallel-Copy-For-Binary-Format-Files.patch
>
> (i) In the ParallelCopyFrom() function, "cstate->raw_buf" is pfree()d:
>
> + /* raw_buf is not used in parallel copy, instead data blocks are used.*/
> + pfree(cstate->raw_buf);
>

raw_buf is not used in parallel copy; instead raw_buf will point to the shared memory data blocks. This memory was allocated as part of BeginCopyFrom. Up until this point we cannot be 100% sure that the copy will run in parallel, because it can fall back to sequential mode (for example, when max_worker_processes is not available), and in sequential mode raw_buf is used while performing the copy operation. At this place we can safely free the memory that was allocated.

> This comment doesn't seem to be entirely true.
> At least for text/csv file COPY FROM, cstate->raw_buf is subsequently referenced in the SetRawBufForLoad() function, which is called by CopyReadLineText():
>
> cur_data_blk_ptr = (cstate->raw_buf) ? &pcshared_info->data_blocks[cur_block_pos] : NULL;
>
> So I think cstate->raw_buf should be set to NULL after being pfree()d, and the comment fixed/adjusted.
>
>
> (ii) This patch adds some macros (involving parallel copy checks) AFTER the comment:
>
> /* End parallel copy Macros */

Fixed, moved the macros above the comment.

I have attached a new set of patches with the fixes.
Thoughts?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
Attachment
- v3-0001-Copy-code-readjustment-to-support-parallel-copy.patch
- v3-0002-Framework-for-leader-worker-in-parallel-copy.patch
- v3-0003-Allow-copy-from-command-to-process-data-from-file.patch
- v3-0004-Documentation-for-parallel-copy.patch
- v3-0005-Tests-for-parallel-copy.patch
- v3-0006-Parallel-Copy-For-Binary-Format-Files.patch