Re: Parallel copy - Mailing list pgsql-hackers

From: vignesh C
Subject: Re: Parallel copy
Date:
Msg-id: CALDaNm2RAhNnT=dDRokH+1aJ-kGM4RX3EfADqGTnKRKAcj+SEw@mail.gmail.com
In response to: Re: Parallel copy (Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>)
Responses: Re: Parallel copy
List: pgsql-hackers
On Mon, Jun 15, 2020 at 4:39 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
>
> The above tests were run with the configuration attached config.txt, which
> is the same used for performance tests of csv/text files posted earlier in
> this mail chain.
>
> Request the community to take this patch up for review along with the
> parallel copy for csv/text file patches and provide feedback.
>

I have reviewed the patch; a few comments:

+
+	/*
+	 * Parallel copy for binary formatted files
+	 */
+	ParallelCopyDataBlock *curr_data_block;
+	ParallelCopyDataBlock *prev_data_block;
+	uint32      curr_data_offset;
+	uint32      curr_block_pos;
+	ParallelCopyTupleInfo curr_tuple_start_info;
+	ParallelCopyTupleInfo curr_tuple_end_info;
 } CopyStateData;

The new members added should be present in ParallelCopyData.

+	if (cstate->curr_tuple_start_info.block_id == cstate->curr_tuple_end_info.block_id)
+	{
+		elog(DEBUG1, "LEADER - tuple lies in a single data block");
+
+		line_size = cstate->curr_tuple_end_info.offset - cstate->curr_tuple_start_info.offset + 1;
+		pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].unprocessed_line_parts, 1);
+	}
+	else
+	{
+		uint32 following_block_id = pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].following_block;
+
+		elog(DEBUG1, "LEADER - tuple is spread across data blocks");
+
+		line_size = DATA_BLOCK_SIZE - cstate->curr_tuple_start_info.offset -
+					pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].skip_bytes;
+
+		pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].unprocessed_line_parts, 1);
+
+		while (following_block_id != cstate->curr_tuple_end_info.block_id)
+		{
+			line_size = line_size + DATA_BLOCK_SIZE - pcshared_info->data_blocks[following_block_id].skip_bytes;
+
+			pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, 1);
+
+			following_block_id = pcshared_info->data_blocks[following_block_id].following_block;
+
+			if (following_block_id == -1)
+				break;
+		}
+
+		if (following_block_id != -1)
+			pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, 1);
+
+		line_size = line_size + cstate->curr_tuple_end_info.offset + 1;
+	}

line_size can be accumulated as and when we process the tuple in
CopyReadBinaryTupleLeader, and set once at the end. That way the above
code can be removed.

+
+	/*
+	 * Parallel copy for binary formatted files
+	 */
+	ParallelCopyDataBlock *curr_data_block;
+	ParallelCopyDataBlock *prev_data_block;
+	uint32      curr_data_offset;
+	uint32      curr_block_pos;
+	ParallelCopyTupleInfo curr_tuple_start_info;
+	ParallelCopyTupleInfo curr_tuple_end_info;
 } CopyStateData;

The curr_block_pos variable is already present in ParallelCopyShmInfo; we
could use that and remove it from here. For curr_data_offset, a similar
variable raw_buf_index is present in CopyStateData; we could use that and
remove it from here.

+	if (cstate->curr_data_offset + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
+	{
+		ParallelCopyDataBlock *data_block = NULL;
+		uint8 movebytes = 0;
+
+		block_pos = WaitGetFreeCopyBlock(pcshared_info);
+
+		movebytes = DATA_BLOCK_SIZE - cstate->curr_data_offset;
+
+		cstate->curr_data_block->skip_bytes = movebytes;
+
+		data_block = &pcshared_info->data_blocks[block_pos];
+
+		if (movebytes > 0)
+			memmove(&data_block->data[0], &cstate->curr_data_block->data[cstate->curr_data_offset],
+					movebytes);
+
+		elog(DEBUG1, "LEADER - field count is spread across data blocks - moved %d bytes from current block %u to %u block",
+			 movebytes, cstate->curr_block_pos, block_pos);
+
+		readbytes = CopyGetData(cstate, &data_block->data[movebytes], 1, (DATA_BLOCK_SIZE - movebytes));
+
+		elog(DEBUG1, "LEADER - bytes read from file after field count is moved to next data block %d", readbytes);
+
+		if (cstate->reached_eof)
+			ereport(ERROR,
+					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+					 errmsg("unexpected EOF in COPY data")));
+
+		cstate->curr_data_block = data_block;
+		cstate->curr_data_offset = 0;
+		cstate->curr_block_pos = block_pos;
+	}

This code is duplicated in CopyReadBinaryTupleLeader &
CopyReadBinaryAttributeLeader. We could make a function and re-use it.

+/*
+ * CopyReadBinaryAttributeWorker - leader identifies boundaries/offsets
+ * for each attribute/column, it moves on to the next data block if the
+ * attribute/column is spread across data blocks.
+ */
+static pg_attribute_always_inline Datum
+CopyReadBinaryAttributeWorker(CopyState cstate, int column_no,
+							  FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull)
+{
+	int32		fld_size;
+	Datum		result;

column_no is not used; it can be removed.

+	if (fld_count == -1)
+	{
+		/*
+		 * Received EOF marker. In a V3-protocol copy, wait for the
+		 * protocol-level EOF, and complain if it doesn't come
+		 * immediately. This ensures that we correctly handle CopyFail,
+		 * if client chooses to send that now.
+		 *
+		 * Note that we MUST NOT try to read more data in an old-protocol
+		 * copy, since there is no protocol-level EOF marker then. We
+		 * could go either way for copy from file, but choose to throw
+		 * error if there's data after the EOF marker, for consistency
+		 * with the new-protocol case.
+		 */
+		char		dummy;
+
+		if (cstate->copy_dest != COPY_OLD_FE &&
+			CopyGetData(cstate, &dummy, 1, 1) > 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+					 errmsg("received copy data after EOF marker")));
+		return true;
+	}
+
+	if (fld_count != attr_count)
+		ereport(ERROR,
+				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+				 errmsg("row field count is %d, expected %d",
+						(int) fld_count, attr_count)));
+
+	cstate->curr_tuple_start_info.block_id = cstate->curr_block_pos;
+	cstate->curr_tuple_start_info.offset = cstate->curr_data_offset;
+	cstate->curr_data_offset = cstate->curr_data_offset + sizeof(fld_count);
+	new_block_pos = cstate->curr_block_pos;
+
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		int			m = attnum - 1;
+		Form_pg_attribute att = TupleDescAttr(tupDesc, m);

The above code is present in NextCopyFrom & CopyReadBinaryTupleLeader;
check if we can make a common function, or we could use NextCopyFrom as
it is.

+	memcpy(&fld_count, &cstate->curr_data_block->data[cstate->curr_data_offset], sizeof(fld_count));
+	fld_count = (int16) pg_ntoh16(fld_count);
+
+	if (fld_count == -1)
+	{
+		return true;
+	}

Should this be an assert in the CopyReadBinaryTupleWorker function, as
this check is already done in the leader?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com