Re: Parallel copy - Mailing list pgsql-hackers
From | vignesh C |
---|---|
Subject | Re: Parallel copy |
Date | |
Msg-id | CALDaNm2yhZQX57PTqOoWN2bgkXzK=OgAyJAB=bN6HZMJVXUMjg@mail.gmail.com |
In response to | Re: Parallel copy (Amit Kapila <amit.kapila16@gmail.com>) |
Responses | Re: Parallel copy |
List | pgsql-hackers |
Thanks for the comments, Amit.

On Wed, Jul 15, 2020 at 10:34 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> Few comments:
> ====================
> 0001-Copy-code-readjustment-to-support-parallel-copy
>
> I am not sure converting the code to macros is a good idea, it makes
> this code harder to read. Also, there are a few changes which I am
> not sure are necessary.
> 1.
> +/*
> + * CLEAR_EOL_FROM_COPIED_DATA - Clear EOL from the copied data.
> + */
> +#define CLEAR_EOL_FROM_COPIED_DATA(copy_line_data, copy_line_pos,
> copy_line_size) \
> +{ \
> +    /* \
> +     * If we didn't hit EOF, then we must have transferred the EOL marker \
> +     * to line_buf along with the data. Get rid of it. \
> +     */ \
> +    switch (cstate->eol_type) \
> +    { \
> +        case EOL_NL: \
> +            Assert(copy_line_size >= 1); \
> +            Assert(copy_line_data[copy_line_pos - 1] == '\n'); \
> +            copy_line_data[copy_line_pos - 1] = '\0'; \
> +            copy_line_size--; \
> +            break; \
> +        case EOL_CR: \
> +            Assert(copy_line_size >= 1); \
> +            Assert(copy_line_data[copy_line_pos - 1] == '\r'); \
> +            copy_line_data[copy_line_pos - 1] = '\0'; \
> +            copy_line_size--; \
> +            break; \
> +        case EOL_CRNL: \
> +            Assert(copy_line_size >= 2); \
> +            Assert(copy_line_data[copy_line_pos - 2] == '\r'); \
> +            Assert(copy_line_data[copy_line_pos - 1] == '\n'); \
> +            copy_line_data[copy_line_pos - 2] = '\0'; \
> +            copy_line_size -= 2; \
> +            break; \
> +        case EOL_UNKNOWN: \
> +            /* shouldn't get here */ \
> +            Assert(false); \
> +            break; \
> +    } \
> +}
>
> In the original code, we are using only len and buffer, here we are
> using position, length/size and buffer. Is it really required or can
> we do with just len and buffer?
>

Position is required so that parallel and non-parallel copy can share common code: in parallel copy, the position and the length can differ because a line can be spread across multiple data blocks. I have retained the variables as they are, but changed the macro to a function.

> 2.
> +/*
> + * INCREMENTPROCESSED - Increment the lines processed.
> + */
> +#define INCREMENTPROCESSED(processed) \
> +processed++;
> +
> +/*
> + * GETPROCESSED - Get the lines processed.
> + */
> +#define GETPROCESSED(processed) \
> +return processed;
> +
>
> I don't like converting above to macros. I don't think converting
> such things to macros will buy us much.
>

This macro is extended in 0003-Allow-copy-from-command-to-process-data-from-file.patch:

+#define INCREMENTPROCESSED(processed) \
+{ \
+    if (!IsParallelCopy()) \
+        processed++; \
+    else \
+        pg_atomic_add_fetch_u64(&cstate->pcdata->pcshared_info->processed, 1); \
+}

It needs to be a macro so that it can handle both parallel and non-parallel copy. I have retained it as a macro; if you insist, I can move this change into the 0003-Allow-copy-from-command-to-process-data-from-file.patch patch.

> 0002-Framework-for-leader-worker-in-parallel-copy
> 3.
> /*
> + * Copy data block information.
> + */
> +typedef struct ParallelCopyDataBlock
>
> It is better to add a few comments atop this data structure to explain
> how it is used?
>

Fixed.

> 4.
> + * ParallelCopyLineBoundary is common data structure between leader & worker,
> + * this is protected by the following sequence in the leader & worker.
> + * Leader should operate in the following order:
> + * 1) update first_block, start_offset & cur_lineno in any order.
> + * 2) update line_size.
> + * 3) update line_state.
> + * Worker should operate in the following order:
> + * 1) read line_size.
> + * 2) only one worker should choose one line for processing, this is handled by
> + * using pg_atomic_compare_exchange_u32, worker will change the sate to
> + * LINE_WORKER_PROCESSING only if line_state is LINE_LEADER_POPULATED.
> + * 3) read first_block, start_offset & cur_lineno in any order.
> + */
> +typedef struct ParallelCopyLineBoundary
>
> Here, you have mentioned how workers and leader should operate to make
> sure access to the data is sane. However, you have not explained what
However, you have not explained what > is the problem if they don't do so and it is not apparent to me. > Also, it is not very clear what is the purpose of this data structure > from comments. > Fixed > 5. > +/* > + * Circular queue used to store the line information. > + */ > +typedef struct ParallelCopyLineBoundaries > +{ > + /* Position for the leader to populate a line. */ > + uint32 leader_pos; > > I don't think the variable needs to be named as leader_pos, it is okay > to name it is as 'pos' as the comment above it explains its usage. > Fixed > 7. > +#define DATA_BLOCK_SIZE RAW_BUF_SIZE > +#define RINGSIZE (10 * 1000) > +#define MAX_BLOCKS_COUNT 1000 > +#define WORKER_CHUNK_COUNT 50 /* should be mod of RINGSIZE */ > > It would be good if you can write a few comments to explain why you > have chosen these default values. > Fixed > 8. > ParallelCopyCommonKeyData, shall we name this as > SerializedParallelCopyState or something like that? For example, see > SerializedSnapshotData which has been used to pass snapshot > information to passed to workers. > Renamed as suggested > 9. > +CopyCommonInfoForWorker(CopyState cstate, ParallelCopyCommonKeyData > *shared_cstate) > > If you agree with point-8, then let's name this as > SerializeParallelCopyState. See, if there is more usage of similar > types in the patch then lets change those as well. > Fixed > 10. > + * in the DSM. The specified number of workers will then be launched. > + * > + */ > +static ParallelContext* > +BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid) > > No need of an extra line with only '*' in the above multi-line comment. > Fixed > 11. > BeginParallelCopy(..) > { > .. > + EstimateLineKeysStr(pcxt, cstate->null_print); > + EstimateLineKeysStr(pcxt, cstate->null_print_client); > + EstimateLineKeysStr(pcxt, cstate->delim); > + EstimateLineKeysStr(pcxt, cstate->quote); > + EstimateLineKeysStr(pcxt, cstate->escape); > .. 
> }
>
> Why do we need to do this separately for each variable of cstate?
> Can't we serialize it along with other members of
> SerializeParallelCopyState (a new name for ParallelCopyCommonKeyData)?
>

These are variable-length strings, so I felt they could not be serialized along with the other members and need to be serialized separately.

> 12.
> BeginParallelCopy(..)
> {
> ..
> +    LaunchParallelWorkers(pcxt);
> +    if (pcxt->nworkers_launched == 0)
> +    {
> +        EndParallelCopy(pcxt);
> +        elog(WARNING,
> +             "No workers available, copy will be run in non-parallel mode");
> ..
> }
>
> I don't see the need to issue a WARNING if we are not able to launch
> workers. We don't do that for other cases where we fail to launch
> workers.
>

Fixed.

> 13.
> +}
> +/*
> + * ParallelCopyMain -
> ..
>
> +}
> +/*
> + * ParallelCopyLeader
>
> One line space is required before starting a new function.
>

Fixed.

Please find attached the updated patches with these fixes included.

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
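In reply to point 1, the CLEAR_EOL_FROM_COPIED_DATA macro was turned into a function that still takes a separate position and size. The following is only a minimal sketch of how such a function could look, not the code in the attached 0001 patch: the function name clear_eol_from_copied_data is hypothetical, the EolType enum is a stand-alone copy modelled on the one COPY uses, and plain assert() stands in for PostgreSQL's Assert(). It shows why position and size are separate parameters: when a line does not start at offset 0 of its buffer, the offset where it ends and its length are different numbers.

```c
#include <assert.h>

/* Stand-alone end-of-line types, modelled on COPY's EolType (sketch only). */
typedef enum EolType
{
	EOL_UNKNOWN,
	EOL_NL,
	EOL_CR,
	EOL_CRNL
} EolType;

/*
 * Hypothetical function: remove the EOL marker that ends just before
 * data[pos] and shorten *size to match.  pos is where the line ends inside
 * data; *size is the line's length.  They differ whenever the line does
 * not begin at data[0].
 */
static void
clear_eol_from_copied_data(EolType eol_type, char *data, int pos, int *size)
{
	switch (eol_type)
	{
		case EOL_NL:
			assert(*size >= 1);
			assert(data[pos - 1] == '\n');
			data[pos - 1] = '\0';
			(*size)--;
			break;
		case EOL_CR:
			assert(*size >= 1);
			assert(data[pos - 1] == '\r');
			data[pos - 1] = '\0';
			(*size)--;
			break;
		case EOL_CRNL:
			assert(*size >= 2);
			assert(data[pos - 2] == '\r');
			assert(data[pos - 1] == '\n');
			data[pos - 2] = '\0';
			*size -= 2;
			break;
		case EOL_UNKNOWN:
			/* shouldn't get here */
			assert(0);
			break;
	}
}
```

For example, in the buffer "abc\r\nxyz\r\n" the second line starts at offset 5, so it ends at position 10 with a size of 5; calling the function with EOL_CRNL, position 10 and size 5 leaves "xyz" at offset 5 and reduces the size to 3. Passing the size by pointer lets the caller keep the shortened length, which is what the macro did by modifying copy_line_size in place.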
Attachment
- 0001-Copy-code-readjustment-to-support-parallel-copy.patch
- 0002-Framework-for-leader-worker-in-parallel-copy.patch
- 0003-Allow-copy-from-command-to-process-data-from-file-ST.patch
- 0004-Documentation-for-parallel-copy.patch
- 0005-Tests-for-parallel-copy.patch
- 0006-Parallel-Copy-For-Binary-Format-Files.patch
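Point 4 of the review asks what goes wrong if the leader and the workers do not follow the documented ordering for ParallelCopyLineBoundary. The sketch below illustrates the general publish/claim pattern that ordering implements, using C11 atomics instead of PostgreSQL's pg_atomic_* functions and barriers; the struct, the function names and the LINE_EMPTY state are simplified, hypothetical stand-ins, not the patch's code. If the leader set line_state before writing first_block, start_offset, cur_lineno and line_size, a worker could see LINE_LEADER_POPULATED and read stale values; if workers claimed a line with a plain read followed by a write instead of a compare-and-exchange, two workers could both process the same line. For simplicity, this sketch reads line_size after a successful claim rather than before it, as the quoted comment does.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical states; the last two names appear in the quoted comment. */
enum
{
	LINE_EMPTY = 0,
	LINE_LEADER_POPULATED,
	LINE_WORKER_PROCESSING
};

/* Simplified, hypothetical stand-in for ParallelCopyLineBoundary. */
typedef struct LineBoundary
{
	uint32_t	first_block;
	uint32_t	start_offset;
	uint64_t	cur_lineno;
	_Atomic uint32_t line_size;
	_Atomic uint32_t line_state;
} LineBoundary;

static void
line_boundary_init(LineBoundary *lb)
{
	lb->first_block = 0;
	lb->start_offset = 0;
	lb->cur_lineno = 0;
	atomic_init(&lb->line_size, 0);
	atomic_init(&lb->line_state, LINE_EMPTY);
}

/* Leader: fill in the line, then publish it by writing line_state last. */
static void
leader_publish(LineBoundary *lb, uint32_t block, uint32_t offset,
			   uint64_t lineno, uint32_t size)
{
	/* Sketch assumption: the leader only fills a slot that is empty. */
	assert(atomic_load_explicit(&lb->line_state, memory_order_relaxed) == LINE_EMPTY);

	/* 1) Plain fields, in any order. */
	lb->first_block = block;
	lb->start_offset = offset;
	lb->cur_lineno = lineno;
	/* 2) Line size. */
	atomic_store_explicit(&lb->line_size, size, memory_order_relaxed);
	/*
	 * 3) Release store: a worker whose acquire operation reads
	 * LINE_LEADER_POPULATED is guaranteed to see all the writes above.
	 */
	atomic_store_explicit(&lb->line_state, LINE_LEADER_POPULATED,
						  memory_order_release);
}

/*
 * Worker: claim the line with compare-and-exchange so that exactly one
 * worker moves it from LINE_LEADER_POPULATED to LINE_WORKER_PROCESSING.
 * Returns false if the line is not published yet or was already claimed.
 */
static bool
worker_try_claim(LineBoundary *lb, uint32_t *block, uint32_t *offset,
				 uint64_t *lineno, uint32_t *size)
{
	uint32_t	expected = LINE_LEADER_POPULATED;

	if (!atomic_compare_exchange_strong_explicit(&lb->line_state, &expected,
												 LINE_WORKER_PROCESSING,
												 memory_order_acq_rel,
												 memory_order_acquire))
		return false;

	/* Safe: the successful CAS synchronized with the leader's release store. */
	*size = atomic_load_explicit(&lb->line_size, memory_order_relaxed);
	*block = lb->first_block;
	*offset = lb->start_offset;
	*lineno = lb->cur_lineno;
	return true;
}
```

The release store of line_state pairs with the acquire half of the worker's compare-and-exchange, which is what makes the "update line_state last" rule sufficient; the compare-and-exchange itself is what guarantees that only one worker wins a given line.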