Re: Parallel copy - Mailing list pgsql-hackers

From vignesh C
Subject Re: Parallel copy
Date
Msg-id CALDaNm3AaXUZLLHjxcEeiFLtDZbrXgSUQMhz2oBeO5_4JCm7yQ@mail.gmail.com
Whole thread Raw
In response to Re: Parallel copy  (Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>)
Responses Re: Parallel copy
List pgsql-hackers
On Wed, Jun 24, 2020 at 1:41 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
>
> Along with the review comments addressed
> patch(0006-Parallel-Copy-For-Binary-Format-Files.patch) also attaching
> all other latest series of patches(0001 to 0005) from [1], the order
> of applying patches is from 0001 to 0006.
>
> [1] https://www.postgresql.org/message-id/CALDaNm0H3N9gK7CMheoaXkO99g%3DuAPA93nSZXu0xDarPyPY6sg%40mail.gmail.com
>

Some comments:

+       movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index;
+
+       cstate->pcdata->curr_data_block->skip_bytes = movebytes;
+
+       data_block = &pcshared_info->data_blocks[block_pos];
+
+       if (movebytes > 0)
+               memmove(&data_block->data[0],
&cstate->pcdata->curr_data_block->data[cstate->raw_buf_index],
+                       movebytes);
We can create a local variable and use it in place of
cstate->pcdata->curr_data_block.

+       if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
+               AdjustFieldInfo(cstate, 1);
+
+       memcpy(&fld_count,
&cstate->pcdata->curr_data_block->data[cstate->raw_buf_index],
sizeof(fld_count));
Shouldn't this be like below, since the remaining size can still fit in the current block?
       if (cstate->raw_buf_index + sizeof(fld_count) >= DATA_BLOCK_SIZE)

+       if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1))
+       {
+               AdjustFieldInfo(cstate, 2);
+               *new_block_pos = pcshared_info->cur_block_pos;
+       }
Same as above.

+       movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index;
+
+       cstate->pcdata->curr_data_block->skip_bytes = movebytes;
+
+       data_block = &pcshared_info->data_blocks[block_pos];
+
+       if (movebytes > 0)
Instead of the above check, we can have an assert check for movebytes.

+       if (mode == 1)
+       {
+               cstate->pcdata->curr_data_block = data_block;
+               cstate->raw_buf_index = 0;
+       }
+       else if(mode == 2)
+       {
+               ParallelCopyDataBlock *prev_data_block = NULL;
+               prev_data_block = cstate->pcdata->curr_data_block;
+               prev_data_block->following_block = block_pos;
+               cstate->pcdata->curr_data_block = data_block;
+
+               if (prev_data_block->curr_blk_completed  == false)
+                       prev_data_block->curr_blk_completed = true;
+
+               cstate->raw_buf_index = 0;
+       }

The code below is common to both branches; keep it in the common flow and remove the if (mode == 1) branch:
cstate->pcdata->curr_data_block = data_block;
cstate->raw_buf_index = 0;

+#define CHECK_FIELD_COUNT \
+{\
+       if (fld_count == -1) \
+       { \
+               if (IsParallelCopy() && \
+                       !IsLeader()) \
+                       return true; \
+               else if (IsParallelCopy() && \
+                       IsLeader()) \
+               { \
+                       if
(cstate->pcdata->curr_data_block->data[cstate->raw_buf_index +
sizeof(fld_count)] != 0) \
+                               ereport(ERROR, \
+
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \
+                                               errmsg("received copy
data after EOF marker"))); \
+                       return true; \
+               } \
We only copy sizeof(fld_count) bytes. Shouldn't we check fld_count !=
cstate->max_fields? Am I missing something here?

+       if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1))
+       {
+               AdjustFieldInfo(cstate, 2);
+               *new_block_pos = pcshared_info->cur_block_pos;
+       }
+
+       memcpy(&fld_size,
&cstate->pcdata->curr_data_block->data[cstate->raw_buf_index],
sizeof(fld_size));
+
+       cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_size);
+
+       fld_size = (int32) pg_ntoh32(fld_size);
+
+       if (fld_size == 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                errmsg("unexpected EOF in COPY data")));
+
+       if (fld_size < -1)
+               ereport(ERROR,
+                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                errmsg("invalid field size")));
+
+       if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size)
+       {
+               cstate->raw_buf_index = cstate->raw_buf_index + fld_size;
+       }
We can keep the check like cstate->raw_buf_index + fld_size < ..., for
better readability and consistency.

+static pg_attribute_always_inline void
+CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
+       Oid typioparam, int32 typmod, uint32 *new_block_pos,
+       int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
+       ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)
flinfo, typioparam and typmod are not used; we can remove these parameters.

+static pg_attribute_always_inline void
+CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
+       Oid typioparam, int32 typmod, uint32 *new_block_pos,
+       int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
+       ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)
I felt this function need not be an inline function.

+               /* binary format */
+               /* for paralle copy leader, fill in the error
There are some typos (e.g. "paralle"); please run a spell check.

+               /* raw_buf_index should never cross data block size,
+                * as the required number of data blocks would have
+                * been obtained in the above while loop.
+                */
There are a few places where the commenting style should be changed to PostgreSQL style.

+       if (cstate->pcdata->curr_data_block == NULL)
+       {
+               block_pos = WaitGetFreeCopyBlock(pcshared_info);
+
+               cstate->pcdata->curr_data_block =
&pcshared_info->data_blocks[block_pos];
+
+               cstate->raw_buf_index = 0;
+
+               readbytes = CopyGetData(cstate,
&cstate->pcdata->curr_data_block->data, 1, DATA_BLOCK_SIZE);
+
+               elog(DEBUG1, "LEADER - bytes read from file %d", readbytes);
+
+               if (cstate->reached_eof)
+                       return true;
+       }
There are many empty lines; these are not required.


+       if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
+               AdjustFieldInfo(cstate, 1);
+
+       memcpy(&fld_count,
&cstate->pcdata->curr_data_block->data[cstate->raw_buf_index],
sizeof(fld_count));
+
+       fld_count = (int16) pg_ntoh16(fld_count);
+
+       CHECK_FIELD_COUNT;
+
+       cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_count);
+       new_block_pos = pcshared_info->cur_block_pos;
You can run pgindent once for the changes.

+       if (mode == 1)
+       {
+               cstate->pcdata->curr_data_block = data_block;
+               cstate->raw_buf_index = 0;
+       }
+       else if(mode == 2)
+       {
Could use macros for 1 & 2 for better readability.

+               if (tuple_start_info_ptr->block_id ==
tuple_end_info_ptr->block_id)
+               {
+                       elog(DEBUG1,"LEADER - tuple lies in a single
data block");
+
+                       *line_size = tuple_end_info_ptr->offset -
tuple_start_info_ptr->offset + 1;
+
pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[tuple_start_info_ptr->block_id].unprocessed_line_parts,
1);
+               }
+               else
+               {
+                       uint32 following_block_id =
pcshared_info->data_blocks[tuple_start_info_ptr->block_id].following_block;
+
+                       elog(DEBUG1,"LEADER - tuple is spread across
data blocks");
+
+                       *line_size = DATA_BLOCK_SIZE -
tuple_start_info_ptr->offset -
+
pcshared_info->data_blocks[tuple_start_info_ptr->block_id].skip_bytes;
+
+
pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[tuple_start_info_ptr->block_id].unprocessed_line_parts,
1);
+
+                       while (following_block_id !=
tuple_end_info_ptr->block_id)
+                       {
+                               *line_size = *line_size +
DATA_BLOCK_SIZE -
pcshared_info->data_blocks[following_block_id].skip_bytes;
+
+
pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts,
1);
+
+                               following_block_id =
pcshared_info->data_blocks[following_block_id].following_block;
+
+                               if (following_block_id == -1)
+                                       break;
+                       }
+
+                       if (following_block_id != -1)
+
pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts,
1);
+
+                       *line_size = *line_size +
tuple_end_info_ptr->offset + 1;
+               }
We could calculate the size as we parse and identify each record; if we
do it that way, this can be removed.

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com



pgsql-hackers by date:

Previous
From: Michael Paquier
Date:
Subject: Re: Cache lookup errors with functions manipulation object addresses
Next
From: Bharath Rupireddy
Date:
Subject: Issue with cancel_before_shmem_exit while searching to remove a particular registered exit callbacks