Re: Parallel copy - Mailing list pgsql-hackers
From | Bharath Rupireddy |
---|---|
Subject | Re: Parallel copy |
Date | |
Msg-id | CALj2ACWsbCGENq7JV9Sa717FHRjrWeEJTpcdnysWh0Kih=S8xw@mail.gmail.com Whole thread Raw |
In response to | Re: Parallel copy (vignesh C <vignesh21@gmail.com>) |
Responses |
Re: Parallel copy
|
List | pgsql-hackers |
Thanks Vignesh for the review. Addressed the comments in 0006 patch. > > we can create a local variable and use in place of > cstate->pcdata->curr_data_block. Done. > + if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1)) > + AdjustFieldInfo(cstate, 1); > + > + memcpy(&fld_count, > &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], > sizeof(fld_count)); > Should this be like below, as the remaining size can fit in current block: > if (cstate->raw_buf_index + sizeof(fld_count) >= DATA_BLOCK_SIZE) > > + if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1)) > + { > + AdjustFieldInfo(cstate, 2); > + *new_block_pos = pcshared_info->cur_block_pos; > + } > Same like above. Yes you are right. Changed. > > + movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index; > + > + cstate->pcdata->curr_data_block->skip_bytes = movebytes; > + > + data_block = &pcshared_info->data_blocks[block_pos]; > + > + if (movebytes > 0) > Instead of the above check, we can have an assert check for movebytes. No, we can't use assert here. For the edge case where the current data block is full to the size DATA_BLOCK_SIZE, then movebytes will be 0, but we need to get a new data block. We avoid memmove by having movebytes>0 check. > + if (mode == 1) > + { > + cstate->pcdata->curr_data_block = data_block; > + cstate->raw_buf_index = 0; > + } > + else if(mode == 2) > + { > + ParallelCopyDataBlock *prev_data_block = NULL; > + prev_data_block = cstate->pcdata->curr_data_block; > + prev_data_block->following_block = block_pos; > + cstate->pcdata->curr_data_block = data_block; > + > + if (prev_data_block->curr_blk_completed == false) > + prev_data_block->curr_blk_completed = true; > + > + cstate->raw_buf_index = 0; > + } > > This code is common for both, keep in common flow and remove if (mode == 1) > cstate->pcdata->curr_data_block = data_block; > cstate->raw_buf_index = 0; > Done. > +#define CHECK_FIELD_COUNT \ > +{\ > + if (fld_count == -1) \ > + { \ > + if (IsParallelCopy() && \ > + !IsLeader()) \ > + return true; \ > + else if (IsParallelCopy() && \ > + IsLeader()) \ > + { \ > + if > (cstate->pcdata->curr_data_block->data[cstate->raw_buf_index + > sizeof(fld_count)] != 0) \ > + ereport(ERROR, \ > + > (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ > + errmsg("received copy > data after EOF marker"))); \ > + return true; \ > + } \ > We only copy sizeof(fld_count), Shouldn't we check fld_count != > cstate->max_fields? Am I missing something here? fld_count != cstate->max_fields check is done after the above checks. > + if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size) > + { > + cstate->raw_buf_index = cstate->raw_buf_index + fld_size; > + } > We can keep the check like cstate->raw_buf_index + fld_size < ..., for > better readability and consistency. > I think this is okay. It gives a good meaning that available bytes in the current data block is greater or equal to fld_size then, the tuple lies in the current data block. > +static pg_attribute_always_inline void > +CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo, > + Oid typioparam, int32 typmod, uint32 *new_block_pos, > + int m, ParallelCopyTupleInfo *tuple_start_info_ptr, > + ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size) > flinfo, typioparam & typmod is not used, we can remove the parameter. > Done. > +static pg_attribute_always_inline void > +CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo, > + Oid typioparam, int32 typmod, uint32 *new_block_pos, > + int m, ParallelCopyTupleInfo *tuple_start_info_ptr, > + ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size) > I felt this function need not be an inline function. Yes. Changed. > > + /* binary format */ > + /* for paralle copy leader, fill in the error > There are some typos, run spell check Done. > > + /* raw_buf_index should never cross data block size, > + * as the required number of data blocks would have > + * been obtained in the above while loop. > + */ > There are few places, commenting style should be changed to postgres style Changed. > > + if (cstate->pcdata->curr_data_block == NULL) > + { > + block_pos = WaitGetFreeCopyBlock(pcshared_info); > + > + cstate->pcdata->curr_data_block = > &pcshared_info->data_blocks[block_pos]; > + > + cstate->raw_buf_index = 0; > + > + readbytes = CopyGetData(cstate, > &cstate->pcdata->curr_data_block->data, 1, DATA_BLOCK_SIZE); > + > + elog(DEBUG1, "LEADER - bytes read from file %d", readbytes); > + > + if (cstate->reached_eof) > + return true; > + } > There are many empty lines, these are not required. > Removed. > > + > + fld_count = (int16) pg_ntoh16(fld_count); > + > + CHECK_FIELD_COUNT; > + > + cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_count); > + new_block_pos = pcshared_info->cur_block_pos; > You can run pg_indent once for the changes. > I ran pg_indent and observed that there are many places getting modified by pg_indent. If we need to run pg_indet on copy.c for parallel copy alone, then first, we need to run on plane copy.c and take those changes and then run for all parallel copy files. I think we better run pg_indent, for all the parallel copy patches once and for all, maybe just before we kind of finish up all the code reviews. > + if (mode == 1) > + { > + cstate->pcdata->curr_data_block = data_block; > + cstate->raw_buf_index = 0; > + } > + else if(mode == 2) > + { > Could use macros for 1 & 2 for better readability. Done. > > + > + if (following_block_id == -1) > + break; > + } > + > + if (following_block_id != -1) > + > pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, > 1); > + > + *line_size = *line_size + > tuple_end_info_ptr->offset + 1; > + } > We could calculate the size as we parse and identify one record, if we > do that way this can be removed. > Done. With Regards, Bharath Rupireddy. EnterpriseDB: http://www.enterprisedb.com
Attachment
- 0001-Copy-code-readjustment-to-support-parallel-copy.patch
- 0002-Framework-for-leader-worker-in-parallel-copy.patch
- 0003-Allow-copy-from-command-to-process-data-from-file-ST.patch
- 0004-Documentation-for-parallel-copy.patch
- 0005-Tests-for-parallel-copy.patch
- 0006-Parallel-Copy-For-Binary-Format-Files.patch
pgsql-hackers by date: