Re: Parallel copy - Mailing list pgsql-hackers
| | |
|---|---|
| From | vignesh C |
| Subject | Re: Parallel copy |
| Msg-id | CALDaNm3AaXUZLLHjxcEeiFLtDZbrXgSUQMhz2oBeO5_4JCm7yQ@mail.gmail.com |
| In response to | Re: Parallel copy (Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>) |
| Responses | Re: Parallel copy |
| List | pgsql-hackers |
On Wed, Jun 24, 2020 at 1:41 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
>
> Along with the review comments addressed
> patch(0006-Parallel-Copy-For-Binary-Format-Files.patch) also attaching
> all other latest series of patches(0001 to 0005) from [1], the order
> of applying patches is from 0001 to 0006.
>
> [1] https://www.postgresql.org/message-id/CALDaNm0H3N9gK7CMheoaXkO99g%3DuAPA93nSZXu0xDarPyPY6sg%40mail.gmail.com
>

Some comments:

+       movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index;
+
+       cstate->pcdata->curr_data_block->skip_bytes = movebytes;
+
+       data_block = &pcshared_info->data_blocks[block_pos];
+
+       if (movebytes > 0)
+               memmove(&data_block->data[0], &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index],
+                               movebytes);

We can create a local variable and use it in place of
cstate->pcdata->curr_data_block.

+       if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
+               AdjustFieldInfo(cstate, 1);
+
+       memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count));

Should this be like below, as the remaining size can fit in the current block?
if (cstate->raw_buf_index + sizeof(fld_count) >= DATA_BLOCK_SIZE)

+       if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1))
+       {
+               AdjustFieldInfo(cstate, 2);
+               *new_block_pos = pcshared_info->cur_block_pos;
+       }

Same as above.

+       movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index;
+
+       cstate->pcdata->curr_data_block->skip_bytes = movebytes;
+
+       data_block = &pcshared_info->data_blocks[block_pos];
+
+       if (movebytes > 0)

Instead of the above check, we can have an assert check for movebytes.

+       if (mode == 1)
+       {
+               cstate->pcdata->curr_data_block = data_block;
+               cstate->raw_buf_index = 0;
+       }
+       else if(mode == 2)
+       {
+               ParallelCopyDataBlock *prev_data_block = NULL;
+               prev_data_block = cstate->pcdata->curr_data_block;
+               prev_data_block->following_block = block_pos;
+               cstate->pcdata->curr_data_block = data_block;
+
+               if (prev_data_block->curr_blk_completed == false)
+                       prev_data_block->curr_blk_completed = true;
+
+               cstate->raw_buf_index = 0;
+       }

This code is common to both branches; keep it in the common flow and remove
the duplication (see the sketch further down, after the comment on using
macros for the two modes):
        cstate->pcdata->curr_data_block = data_block;
        cstate->raw_buf_index = 0;

+#define CHECK_FIELD_COUNT \
+{\
+       if (fld_count == -1) \
+       { \
+               if (IsParallelCopy() && \
+                       !IsLeader()) \
+                       return true; \
+               else if (IsParallelCopy() && \
+                       IsLeader()) \
+               { \
+                       if (cstate->pcdata->curr_data_block->data[cstate->raw_buf_index + sizeof(fld_count)] != 0) \
+                               ereport(ERROR, \
+                                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \
+                                                errmsg("received copy data after EOF marker"))); \
+                       return true; \
+               } \

We only copy sizeof(fld_count); shouldn't we also check fld_count !=
cstate->max_fields? Am I missing something here?
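If that check is indeed missing, a minimal, untested sketch of what could be
added to the macro, modelled on the check in the existing serial binary COPY
path (the backslash continuations would of course need to stay inside
CHECK_FIELD_COUNT, and the exact field name may differ in the patch):

        /*
         * Untested sketch: reject rows whose field count does not match the
         * expected number of fields, as the serial binary path does.
         */
        else if (fld_count != cstate->max_fields)
                ereport(ERROR,
                                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                                 errmsg("row field count is %d, expected %d",
                                                (int) fld_count, cstate->max_fields)));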
+       if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1))
+       {
+               AdjustFieldInfo(cstate, 2);
+               *new_block_pos = pcshared_info->cur_block_pos;
+       }
+
+       memcpy(&fld_size, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_size));
+
+       cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_size);
+
+       fld_size = (int32) pg_ntoh32(fld_size);
+
+       if (fld_size == 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                errmsg("unexpected EOF in COPY data")));
+
+       if (fld_size < -1)
+               ereport(ERROR,
+                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                errmsg("invalid field size")));
+
+       if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size)
+       {
+               cstate->raw_buf_index = cstate->raw_buf_index + fld_size;
+       }

We can keep the check like cstate->raw_buf_index + fld_size < ..., for better
readability and consistency.

+static pg_attribute_always_inline void
+CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
+       Oid typioparam, int32 typmod, uint32 *new_block_pos,
+       int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
+       ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)

flinfo, typioparam & typmod are not used; we can remove these parameters.

+static pg_attribute_always_inline void
+CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
+       Oid typioparam, int32 typmod, uint32 *new_block_pos,
+       int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
+       ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)

I felt this function need not be an inline function.

+       /* binary format */
+       /* for paralle copy leader, fill in the error

There are some typos (e.g. "paralle"); please run a spell check.

+       /* raw_buf_index should never cross data block size,
+        * as the required number of data blocks would have
+        * been obtained in the above while loop.
+        */

There are a few places where the commenting style should be changed to the
postgres style.

+       if (cstate->pcdata->curr_data_block == NULL)
+       {
+               block_pos = WaitGetFreeCopyBlock(pcshared_info);
+
+               cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[block_pos];
+
+               cstate->raw_buf_index = 0;
+
+               readbytes = CopyGetData(cstate, &cstate->pcdata->curr_data_block->data, 1, DATA_BLOCK_SIZE);
+
+               elog(DEBUG1, "LEADER - bytes read from file %d", readbytes);
+
+               if (cstate->reached_eof)
+                       return true;
+       }

There are many empty lines; these are not required.

+       if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
+               AdjustFieldInfo(cstate, 1);
+
+       memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count));
+
+       fld_count = (int16) pg_ntoh16(fld_count);
+
+       CHECK_FIELD_COUNT;
+
+       cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_count);
+       new_block_pos = pcshared_info->cur_block_pos;

You can run pgindent once on the changes.

+       if (mode == 1)
+       {
+               cstate->pcdata->curr_data_block = data_block;
+               cstate->raw_buf_index = 0;
+       }
+       else if(mode == 2)
+       {

Could use macros for 1 & 2 for better readability (a sketch follows below).
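For instance, something along these lines (untested; the macro names are just
placeholders) would also take care of the duplicated assignments mentioned
earlier:

        /* Placeholder names for the two AdjustFieldInfo() modes. */
        #define ADJUST_FIELD_INFO_FIELD_COUNT   1       /* adjusting while reading fld_count */
        #define ADJUST_FIELD_INFO_FIELD_SIZE    2       /* adjusting while reading fld_size */

        if (mode == ADJUST_FIELD_INFO_FIELD_SIZE)
        {
                ParallelCopyDataBlock *prev_data_block = cstate->pcdata->curr_data_block;

                /* Link the previous block to the new one and mark it complete. */
                prev_data_block->following_block = block_pos;
                prev_data_block->curr_blk_completed = true;
        }

        /* Common to both modes. */
        cstate->pcdata->curr_data_block = data_block;
        cstate->raw_buf_index = 0;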
+       if (tuple_start_info_ptr->block_id == tuple_end_info_ptr->block_id)
+       {
+               elog(DEBUG1, "LEADER - tuple lies in a single data block");
+
+               *line_size = tuple_end_info_ptr->offset - tuple_start_info_ptr->offset + 1;
+               pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[tuple_start_info_ptr->block_id].unprocessed_line_parts, 1);
+       }
+       else
+       {
+               uint32 following_block_id = pcshared_info->data_blocks[tuple_start_info_ptr->block_id].following_block;
+
+               elog(DEBUG1, "LEADER - tuple is spread across data blocks");
+
+               *line_size = DATA_BLOCK_SIZE - tuple_start_info_ptr->offset -
+                               pcshared_info->data_blocks[tuple_start_info_ptr->block_id].skip_bytes;
+
+               pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[tuple_start_info_ptr->block_id].unprocessed_line_parts, 1);
+
+               while (following_block_id != tuple_end_info_ptr->block_id)
+               {
+                       *line_size = *line_size + DATA_BLOCK_SIZE - pcshared_info->data_blocks[following_block_id].skip_bytes;
+
+                       pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, 1);
+
+                       following_block_id = pcshared_info->data_blocks[following_block_id].following_block;
+
+                       if (following_block_id == -1)
+                               break;
+               }
+
+               if (following_block_id != -1)
+                       pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, 1);
+
+               *line_size = *line_size + tuple_end_info_ptr->offset + 1;
+       }

We could calculate the size as we parse and identify one record; if we do it
that way, this whole block can be removed. A rough sketch of the idea follows.
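An untested sketch of the accumulate-while-parsing idea; only line_size is
shown, and anything not in the quoted hunks is a placeholder:

        /* When the leader starts a new tuple. */
        *line_size = 0;

        /* After consuming the field count. */
        *line_size += sizeof(fld_count);

        /* After consuming each field size, and its data when fld_size > 0. */
        *line_size += sizeof(fld_size);
        if (fld_size > 0)
                *line_size += fld_size;

        /*
         * unprocessed_line_parts could similarly be incremented at the point
         * where the leader moves on to a new data block while still inside
         * the current tuple, instead of walking the block chain again here.
         */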
Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com