Re: Parallel copy - Mailing list pgsql-hackers

From Bharath Rupireddy
Subject Re: Parallel copy
Date
Msg-id CALj2ACWsbCGENq7JV9Sa717FHRjrWeEJTpcdnysWh0Kih=S8xw@mail.gmail.com
In response to Re: Parallel copy  (vignesh C <vignesh21@gmail.com>)
Responses Re: Parallel copy
List pgsql-hackers
Thanks Vignesh for the review. I addressed the comments in the 0006 patch.

>
> we can create a local variable and use in place of
> cstate->pcdata->curr_data_block.

Done.

> +       if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
> +               AdjustFieldInfo(cstate, 1);
> +
> +       memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count));
> Should this be like below, as the remaining size can fit in current block:
>        if (cstate->raw_buf_index + sizeof(fld_count) >= DATA_BLOCK_SIZE)
>
> +       if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1))
> +       {
> +               AdjustFieldInfo(cstate, 2);
> +               *new_block_pos = pcshared_info->cur_block_pos;
> +       }
> Same like above.

Yes, you are right. Changed.

>
> +       movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index;
> +
> +       cstate->pcdata->curr_data_block->skip_bytes = movebytes;
> +
> +       data_block = &pcshared_info->data_blocks[block_pos];
> +
> +       if (movebytes > 0)
> Instead of the above check, we can have an assert check for movebytes.

No, we can't use an assert here. In the edge case where the current data
block is exactly full (raw_buf_index equals DATA_BLOCK_SIZE), movebytes
is 0, but we still need to get a new data block. The movebytes > 0 check
lets us skip the memmove in that case.

> +       if (mode == 1)
> +       {
> +               cstate->pcdata->curr_data_block = data_block;
> +               cstate->raw_buf_index = 0;
> +       }
> +       else if(mode == 2)
> +       {
> +               ParallelCopyDataBlock *prev_data_block = NULL;
> +               prev_data_block = cstate->pcdata->curr_data_block;
> +               prev_data_block->following_block = block_pos;
> +               cstate->pcdata->curr_data_block = data_block;
> +
> +               if (prev_data_block->curr_blk_completed  == false)
> +                       prev_data_block->curr_blk_completed = true;
> +
> +               cstate->raw_buf_index = 0;
> +       }
>
> This code is common for both, keep in common flow and remove if (mode == 1)
> cstate->pcdata->curr_data_block = data_block;
> cstate->raw_buf_index = 0;
>

Done.

> +#define CHECK_FIELD_COUNT \
> +{\
> +       if (fld_count == -1) \
> +       { \
> +               if (IsParallelCopy() && \
> +                       !IsLeader()) \
> +                       return true; \
> +               else if (IsParallelCopy() && \
> +                       IsLeader()) \
> +               { \
> +                       if (cstate->pcdata->curr_data_block->data[cstate->raw_buf_index + sizeof(fld_count)] != 0) \
> +                               ereport(ERROR, \
> +                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \
> +                                        errmsg("received copy data after EOF marker"))); \
> +                       return true; \
> +               } \
> We only copy sizeof(fld_count), Shouldn't we check fld_count !=
> cstate->max_fields? Am I missing something here?

No, you are not missing anything: the fld_count != cstate->max_fields check is done after the above checks.

> +       if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size)
> +       {
> +               cstate->raw_buf_index = cstate->raw_buf_index + fld_size;
> +       }
> We can keep the check like cstate->raw_buf_index + fld_size < ..., for
> better readability and consistency.
>

I think this is okay as it is. It reads naturally: if the number of bytes
available in the current data block is greater than or equal to fld_size,
then the tuple lies in the current data block.

> +static pg_attribute_always_inline void
> +CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
> +       Oid typioparam, int32 typmod, uint32 *new_block_pos,
> +       int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
> +       ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)
> flinfo, typioparam & typmod is not used, we can remove the parameter.
>

Done.

> +static pg_attribute_always_inline void
> +CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
> +       Oid typioparam, int32 typmod, uint32 *new_block_pos,
> +       int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
> +       ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)
> I felt this function need not be an inline function.

Yes. Changed.

>
> +               /* binary format */
> +               /* for paralle copy leader, fill in the error
> There are some typos, run spell check

Done.

>
> +               /* raw_buf_index should never cross data block size,
> +                * as the required number of data blocks would have
> +                * been obtained in the above while loop.
> +                */
> There are few places, commenting style should be changed to postgres style

Changed.

>
> +       if (cstate->pcdata->curr_data_block == NULL)
> +       {
> +               block_pos = WaitGetFreeCopyBlock(pcshared_info);
> +
> +               cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[block_pos];
> +
> +               cstate->raw_buf_index = 0;
> +
> +               readbytes = CopyGetData(cstate, &cstate->pcdata->curr_data_block->data, 1, DATA_BLOCK_SIZE);
> +
> +               elog(DEBUG1, "LEADER - bytes read from file %d", readbytes);
> +
> +               if (cstate->reached_eof)
> +                       return true;
> +       }
> There are many empty lines, these are not required.
>

Removed.

>
> +
> +       fld_count = (int16) pg_ntoh16(fld_count);
> +
> +       CHECK_FIELD_COUNT;
> +
> +       cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_count);
> +       new_block_pos = pcshared_info->cur_block_pos;
> You can run pg_indent once for the changes.
>

I ran pg_indent and observed that it modifies many places in copy.c. If
we want to run pg_indent for the parallel copy changes alone, we would
first have to run it on the plain copy.c, take those changes, and only
then run it on all the parallel copy files. I think we had better run
pg_indent on all the parallel copy patches once and for all, perhaps
just before we finish up the code reviews.

> +       if (mode == 1)
> +       {
> +               cstate->pcdata->curr_data_block = data_block;
> +               cstate->raw_buf_index = 0;
> +       }
> +       else if(mode == 2)
> +       {
> Could use macros for 1 & 2 for better readability.

Done.

>
> +
> +                               if (following_block_id == -1)
> +                                       break;
> +                       }
> +
> +                       if (following_block_id != -1)
> +                               pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, 1);
> +
> +                       *line_size = *line_size + tuple_end_info_ptr->offset + 1;
> +               }
> We could calculate the size as we parse and identify one record, if we
> do that way this can be removed.
>

Done.

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com

