From dd9b6be19573b5391d01373b53e64a5c1dc305fd Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Mon, 28 Dec 2020 15:00:48 +0530 Subject: [PATCH v12 7/7] Parallel copy based on workers identifying line boundary. Parallel copy based on workers identifying line boundary. --- src/backend/commands/copyfromparse.c | 93 +++---- src/backend/commands/copyparallel.c | 441 +++++++++++++++++-------------- src/include/commands/copyfrom_internal.h | 38 ++- src/test/regress/expected/copy2.out | 2 +- 4 files changed, 318 insertions(+), 256 deletions(-) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index a767bae..1d79da9 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -82,13 +82,15 @@ if (1) \ { \ if (raw_buf_ptr > cstate->raw_buf_index) \ { \ - if (!IsParallelCopy()) \ - appendBinaryStringInfo(&cstate->line_buf, \ - cstate->raw_buf + cstate->raw_buf_index, \ - raw_buf_ptr - cstate->raw_buf_index); \ - else \ - line_size += raw_buf_ptr - cstate->raw_buf_index; \ - \ + appendBinaryStringInfo(&cstate->line_buf, \ + cstate->raw_buf + cstate->raw_buf_index, \ + raw_buf_ptr - cstate->raw_buf_index); \ + if (IsParallelCopy()) \ + { \ + ParallelCopyDataBlock *curr_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[cstate->pcdata->first_blk_info.current_block]; \ + curr_data_blk_ptr->skip_bytes += raw_buf_ptr - cstate->raw_buf_index; \ + cstate->pcdata->first_blk_info.start_offset += raw_buf_ptr - cstate->raw_buf_index; \ + } \ cstate->raw_buf_index = raw_buf_ptr; \ } \ } else ((void) 0) @@ -120,7 +122,6 @@ int CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread); static inline bool CopyGetInt32(CopyFromState cstate, int32 *val); static inline bool CopyGetInt16(CopyFromState cstate, int16 *val); -static bool CopyLoadRawBuf(CopyFromState cstate); int CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes); static void ClearEOLFromCopiedData(CopyFromState cstate, char *copy_line_data, int copy_line_pos, int *copy_line_size); @@ -389,7 +390,7 @@ CopyGetInt16(CopyFromState cstate, int16 *val) * of the buffer and then we load more data after that. This case occurs only * when a multibyte character crosses a bufferload boundary. */ -static bool +bool CopyLoadRawBuf(CopyFromState cstate) { int nbytes = (!IsParallelCopy()) ? RAW_BUF_BYTES(cstate) : cstate->raw_buf_len; @@ -725,7 +726,7 @@ CopyReadLine(CopyFromState cstate) * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ - if (cstate->copy_src == COPY_NEW_FE) + if (cstate->copy_src == COPY_NEW_FE && !IsParallelCopy()) { bool bIsFirst = true; @@ -809,7 +810,7 @@ void ConvertToServerEncoding(CopyFromState cstate) { /* Done reading the line. Convert it to server encoding. */ - if (cstate->need_transcoding && (!IsParallelCopy() || IsWorker())) + if (cstate->need_transcoding) { char *cvt; @@ -850,10 +851,6 @@ CopyReadLineText(CopyFromState cstate) char quotec = '\0'; char escapec = '\0'; - /* For parallel copy */ - int line_size = 0; - uint32 line_pos = 0; - cstate->eol_type = EOL_UNKNOWN; if (cstate->opts.csv_mode) @@ -910,18 +907,23 @@ CopyReadLineText(CopyFromState cstate) if (raw_buf_ptr >= copy_buf_len || need_data) { REFILL_LINEBUF; - if ((copy_buf_len == DATA_BLOCK_SIZE || copy_buf_len == 0) && - IsParallelCopy()) - SetRawBufForLoad(cstate, line_size, copy_buf_len, raw_buf_ptr, - ©_raw_buf); - /* - * Try to read some more data. This will certainly reset - * raw_buf_index to zero, and raw_buf_ptr must go with it. - */ - if (!CopyLoadRawBuf(cstate)) - hit_eof = true; - raw_buf_ptr = (IsParallelCopy()) ? cstate->raw_buf_index : 0; + if (IsParallelCopy()) + { + if (!GetWorkerBlockPos(cstate, ©_raw_buf)) + hit_eof = true; + } + else + { + /* + * Try to read some more data. This will certainly reset + * raw_buf_index to zero, and raw_buf_ptr must go with it. + */ + if (!CopyLoadRawBuf(cstate)) + hit_eof = true; + } + + raw_buf_ptr = 0; copy_buf_len = cstate->raw_buf_len; /* @@ -1139,12 +1141,15 @@ CopyReadLineText(CopyFromState cstate) */ if (prev_raw_ptr > cstate->raw_buf_index) { - if (!IsParallelCopy()) - appendBinaryStringInfo(&cstate->line_buf, - cstate->raw_buf + cstate->raw_buf_index, - prev_raw_ptr - cstate->raw_buf_index); - else - line_size += prev_raw_ptr - cstate->raw_buf_index; + appendBinaryStringInfo(&cstate->line_buf, + cstate->raw_buf + cstate->raw_buf_index, + prev_raw_ptr - cstate->raw_buf_index); + if (IsParallelCopy()) + { + ParallelCopyDataBlock *curr_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[cstate->pcdata->first_blk_info.current_block]; + curr_data_blk_ptr->skip_bytes += raw_buf_ptr - cstate->raw_buf_index; + cstate->pcdata->first_blk_info.start_offset += prev_raw_ptr - cstate->raw_buf_index; + } } cstate->raw_buf_index = raw_buf_ptr; @@ -1199,20 +1204,6 @@ not_end_of_copy: raw_buf_ptr += mblen - 1; } - /* - * Skip the header line. Update the line here, this cannot be done at - * the beginning, as there is a possibility that file contains empty - * lines. - */ - if (IsParallelCopy() && first_char_in_line && !IsHeaderLine()) - { - ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; - - line_pos = UpdateSharedLineInfo(cstate, - pcshared_info->cur_block_pos, - cstate->raw_buf_index, -1, - LINE_LEADER_POPULATING, -1); - } first_char_in_line = false; } /* end of outer loop */ @@ -1223,15 +1214,15 @@ not_end_of_copy: REFILL_LINEBUF; if (!result && !IsHeaderLine()) { - if (IsParallelCopy()) - ClearEOLFromCopiedData(cstate, cstate->raw_buf, raw_buf_ptr, - &line_size); - else + //if (IsParallelCopy()) + // ClearEOLFromCopiedData(cstate, cstate->raw_buf, raw_buf_ptr, + // &line_size); + //else ClearEOLFromCopiedData(cstate, cstate->line_buf.data, cstate->line_buf.len, &cstate->line_buf.len); } - EndLineParallelCopy(cstate, line_pos, line_size, raw_buf_ptr); + //EndLineParallelCopy(cstate, line_pos, line_size, raw_buf_ptr); return result; } diff --git a/src/backend/commands/copyparallel.c b/src/backend/commands/copyparallel.c index 5149798..d9704a1 100644 --- a/src/backend/commands/copyparallel.c +++ b/src/backend/commands/copyparallel.c @@ -83,6 +83,9 @@ ResetLatch(MyLatch); \ } +static void SetWorkerBlockPos(CopyFromState cstate, uint32 first_block, + uint32 start_offset, uint64 cur_lineno); + /* * Estimate1ByteStrSize * @@ -383,7 +386,15 @@ PopulateParallelCopyShmInfo(ParallelCopyShmInfo *shared_info_ptr) ParallelCopyLineBoundary *lineInfo = &shared_info_ptr->line_boundaries.ring[count]; pg_atomic_init_u32(&(lineInfo->line_size), -1); + lineInfo->first_block = 0; + lineInfo->start_offset = 0; } + + shared_info_ptr->line_boundaries.ring[0].first_block = 0; + shared_info_ptr->line_boundaries.ring[0].current_block = 0; + shared_info_ptr->line_boundaries.ring[0].start_offset = COPY_BLOCK_RESERVE_BYTES; + shared_info_ptr->line_boundaries.ring[0].cur_lineno = 1; + shared_info_ptr->line_boundaries.pos_filled = true; } /* @@ -445,6 +456,8 @@ CheckExprParallelSafety(CopyFromState cstate) bool IsParallelCopyAllowed(CopyFromState cstate, Oid relid) { + if (cstate->opts.binary) + return false; /* * Check if parallel operation can be performed based on local * table/foreign table/index/check constraints/triggers present for the @@ -612,10 +625,14 @@ InitializeParallelCopyInfo(CopyFromState cstate, List *attnamelist) cstate->reached_eof = false; cstate->eol_type = EOL_UNKNOWN; cstate->cur_relname = RelationGetRelationName(cstate->rel); - cstate->cur_lineno = 0; + cstate->cur_lineno = 1; cstate->cur_attname = NULL; cstate->cur_attval = NULL; - cstate->pcdata->curr_data_block = NULL; + pcdata->curr_data_block = NULL; + pcdata->first_blk_info.first_block = -1; + pcdata->first_blk_info.current_block = -1; + pcdata->first_blk_info.start_offset = 0; + pcdata->first_blk_info.cur_lineno = 1; /* Set up variables to avoid per-attribute overhead. */ initStringInfo(&cstate->attribute_buf); @@ -641,167 +658,32 @@ InitializeParallelCopyInfo(CopyFromState cstate, List *attnamelist) } /* - * UpdateLineInfo - * - * Update line information & return. - */ -static bool -UpdateLineInfo(ParallelCopyShmInfo *pcshared_info, - ParallelCopyLineBoundary *lineInfo, uint32 write_pos) -{ - elog(DEBUG1, "[Worker] Completed processing line:%u", write_pos); - pg_atomic_write_u32(&lineInfo->line_state, LINE_WORKER_PROCESSED); - pg_atomic_write_u32(&lineInfo->line_size, -1); - pg_atomic_add_fetch_u64(&pcshared_info->total_worker_processed, 1); - return false; -} - -/* * CacheLineInfo * * Cache the line information to local memory. */ static bool -CacheLineInfo(CopyFromState cstate, uint32 buff_count, uint32 line_pos) +CacheLineInfo(CopyFromState cstate, uint32 line_pos) { - ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + bool done; ParallelCopyData *pcdata = cstate->pcdata; - uint32 write_pos; - ParallelCopyDataBlock *data_blk_ptr; - ParallelCopyLineBoundary *lineInfo; - uint32 offset; - uint32 dataSize; - uint32 copiedSize = 0; - - resetStringInfo(&pcdata->worker_line_buf[buff_count].line_buf); - write_pos = GetLinePosition(cstate, line_pos); - if (-1 == write_pos) - return true; - - /* Get the current line information. */ - lineInfo = &pcshared_info->line_boundaries.ring[write_pos]; - if (pg_atomic_read_u32(&lineInfo->line_size) == 0) - return UpdateLineInfo(pcshared_info, lineInfo, write_pos); - - /* Get the block information. */ - data_blk_ptr = &pcshared_info->data_blocks[lineInfo->first_block]; - - /* Get the offset information from where the data must be copied. */ - offset = lineInfo->start_offset; - pcdata->worker_line_buf[buff_count].cur_lineno = lineInfo->cur_lineno; - - elog(DEBUG1, "[Worker] Processing - line position:%u, block:%u, unprocessed lines:%u, offset:%u, line size:%u", - write_pos, lineInfo->first_block, - pg_atomic_read_u32(&data_blk_ptr->unprocessed_line_parts), - offset, pg_atomic_read_u32(&lineInfo->line_size)); - - for (;;) - { - uint8 skip_bytes = data_blk_ptr->skip_bytes; - - /* - * There is a possibility that the loop embedded at the bottom of the - * current loop has come out because data_blk_ptr->curr_blk_completed - * is set, but dataSize read might be an old value, if - * data_blk_ptr->curr_blk_completed and the line is completed, - * line_size will be set. Read the line_size again to be sure if it is - * completed or partial block. - */ - dataSize = pg_atomic_read_u32(&lineInfo->line_size); - if (dataSize != -1) - { - uint32 remainingSize = dataSize - copiedSize; - - if (!remainingSize) - break; - - /* Whole line is in current block. */ - if (remainingSize + offset + skip_bytes < DATA_BLOCK_SIZE) - { - appendBinaryStringInfo(&pcdata->worker_line_buf[buff_count].line_buf, - &data_blk_ptr->data[offset], - remainingSize); - pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_line_parts, - 1); - break; - } - else - { - /* Line is spread across the blocks. */ - uint32 lineInCurrentBlock = (DATA_BLOCK_SIZE - skip_bytes) - offset; - - appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, - &data_blk_ptr->data[offset], - lineInCurrentBlock); - pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_line_parts, 1); - copiedSize += lineInCurrentBlock; - while (copiedSize < dataSize) - { - uint32 currentBlockCopySize; - ParallelCopyDataBlock *currBlkPtr = &pcshared_info->data_blocks[data_blk_ptr->following_block]; - - skip_bytes = currBlkPtr->skip_bytes; - - /* - * If complete data is present in current block use - * dataSize - copiedSize, or copy the whole block from - * current block. - */ - currentBlockCopySize = Min(dataSize - copiedSize, DATA_BLOCK_SIZE - skip_bytes); - appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, - &currBlkPtr->data[0], - currentBlockCopySize); - pg_atomic_sub_fetch_u32(&currBlkPtr->unprocessed_line_parts, 1); - copiedSize += currentBlockCopySize; - data_blk_ptr = currBlkPtr; - } + cstate->line_buf = pcdata->worker_line_buf[line_pos].line_buf; - break; - } - } - else if (data_blk_ptr->curr_blk_completed) - { - /* Copy this complete block from the current offset. */ - uint32 lineInCurrentBlock = (DATA_BLOCK_SIZE - skip_bytes) - offset; - - appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, - &data_blk_ptr->data[offset], - lineInCurrentBlock); - pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_line_parts, 1); - copiedSize += lineInCurrentBlock; - - /* - * Reset the offset. For the first copy, copy from the offset. For - * the subsequent copy the complete block. - */ - offset = 0; - - /* Set data_blk_ptr to the following block. */ - data_blk_ptr = &pcshared_info->data_blocks[data_blk_ptr->following_block]; - } + /* Actually read the line into memory here */ + done = CopyReadLine(cstate); - for (;;) - { - /* Get the size of this line */ - dataSize = pg_atomic_read_u32(&lineInfo->line_size); + pcdata->worker_line_buf[line_pos].line_buf = cstate->line_buf; + pcdata->worker_line_buf[line_pos].cur_lineno = cstate->cur_lineno; - /* - * If the data is present in current block lineInfo->line_size - * will be updated. If the data is spread across the blocks either - * of lineInfo->line_size or data_blk_ptr->curr_blk_completed can - * be updated. lineInfo->line_size will be updated if the complete - * read is finished. data_blk_ptr->curr_blk_completed will be - * updated if processing of current block is finished and data - * processing is not finished. - */ - if (data_blk_ptr->curr_blk_completed || (dataSize != -1)) - break; - - COPY_WAIT_TO_PROCESS() - } - } + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + return false; - return UpdateLineInfo(pcshared_info, lineInfo, write_pos); + return true; } /* @@ -835,10 +717,9 @@ GetCachedLine(CopyFromState cstate, ParallelCopyData *pcdata) bool GetWorkerLine(CopyFromState cstate) { - uint32 buff_count; ParallelCopyData *pcdata = cstate->pcdata; - ParallelCopyLineBoundaries *line_boundaries = &pcdata->pcshared_info->line_boundaries; - uint32 worker_pos; + uint32 first_block = pcdata->first_blk_info.first_block; + uint32 current_block = pcdata->first_blk_info.current_block; /* * Copy the line data to line_buf and release the line position so that @@ -850,22 +731,32 @@ GetWorkerLine(CopyFromState cstate) pcdata->worker_line_buf_pos = 0; pcdata->worker_line_buf_count = 0; - SpinLockAcquire(&line_boundaries->worker_pos_lock); - worker_pos = line_boundaries->worker_pos; - line_boundaries->worker_pos = (line_boundaries->worker_pos + - WORKER_CHUNK_COUNT) % RINGSIZE; - SpinLockRelease(&line_boundaries->worker_pos_lock); - - for (buff_count = 0; buff_count < WORKER_CHUNK_COUNT; buff_count++) + /* While worker has not switched to next block or if the buffer lines are not filled */ + while (first_block == current_block && + pcdata->worker_line_buf_count < WORKER_CHUNK_COUNT) { - bool result = CacheLineInfo(cstate, buff_count, - worker_pos + buff_count); - if (result) + bool result = CacheLineInfo(cstate, pcdata->worker_line_buf_count); + if (!result) break; + if (cstate->cur_lineno == 1 && cstate->opts.header_line) + { + /* on input just throw the header line away */ + continue; + } + + cstate->cur_lineno++; pcdata->worker_line_buf_count++; } + if (pcdata->first_blk_info.current_block != -1) + SetWorkerBlockPos(cstate, pcdata->first_blk_info.current_block, + cstate->pcdata->first_blk_info.start_offset, + cstate->cur_lineno); + cstate->raw_buf = 0; + cstate->raw_buf_index = 0; + cstate->raw_buf_len = 0; + if (pcdata->worker_line_buf_count > 0) return GetCachedLine(cstate, pcdata); else @@ -1053,17 +944,7 @@ ParallelCopyFrom(CopyFromState cstate) if (!cstate->opts.binary) { - /* On input just throw the header line away. */ - if (cstate->cur_lineno == 0 && cstate->opts.header_line) - { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) - { - pcshared_info->is_read_in_progress = false; - return; /* done */ - } - } - + /* Get data block by block until read is completed. */ for (;;) { bool done; @@ -1073,14 +954,14 @@ ParallelCopyFrom(CopyFromState cstate) cstate->cur_lineno++; /* Actually read the line into memory here. */ - done = CopyReadLine(cstate); + done = ParallelCopyGetData(cstate); /* * EOF at start of line means we're done. If we see EOF after * some characters, we act as though it was newline followed by * EOF, ie, process the line and then exit loop on next iteration. */ - if (done && cstate->line_buf.len == 0) + if (done) break; } } @@ -1489,11 +1370,8 @@ GetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) while (count < (MAX_BLOCKS_COUNT - 1)) { ParallelCopyDataBlock *dataBlkPtr = &pcshared_info->data_blocks[block_pos]; - uint32 unprocessed_line_parts = pg_atomic_read_u32(&dataBlkPtr->unprocessed_line_parts); - - if (unprocessed_line_parts == 0) + if (pg_atomic_read_u32(&dataBlkPtr->data_blk_state) == FREE) { - dataBlkPtr->curr_blk_completed = false; dataBlkPtr->skip_bytes = 0; dataBlkPtr->following_block = -1; pcshared_info->cur_block_pos = block_pos; @@ -1536,8 +1414,7 @@ WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) * Set raw_buf to the shared memory where the file data must be read. */ void -SetRawBufForLoad(CopyFromState cstate, uint32 line_size, uint32 copy_buf_len, - uint32 raw_buf_ptr, char **copy_raw_buf) +SetRawBufForLoad(CopyFromState cstate) { ParallelCopyShmInfo *pcshared_info; uint32 cur_block_pos; @@ -1554,33 +1431,199 @@ SetRawBufForLoad(CopyFromState cstate, uint32 line_size, uint32 copy_buf_len, next_data_blk_ptr = &pcshared_info->data_blocks[next_block_pos]; /* set raw_buf to the data block in shared memory */ - cstate->raw_buf = next_data_blk_ptr->data; - *copy_raw_buf = cstate->raw_buf; + cstate->raw_buf = &next_data_blk_ptr->data[COPY_BLOCK_RESERVE_BYTES]; if (cur_data_blk_ptr) { - if (line_size) + cur_data_blk_ptr->following_block = next_block_pos; + pg_atomic_write_u32(&cur_data_blk_ptr->data_blk_state, FILLED); + } + + cstate->raw_buf_len = 0; + cstate->raw_buf_index = 0; +} + +/* + * Read the next input line and stash it in line_buf, with conversion to + * server encoding. + * + * Result is true if read was terminated by EOF, false if terminated + * by newline. The terminating newline or EOF marker is not included + * in the final value of line_buf. + */ +bool +ParallelCopyGetData(CopyFromState cstate) +{ + bool hit_eof = false; + uint32 cur_block_pos; + ParallelCopyDataBlock *cur_data_blk_ptr; + + SetRawBufForLoad(cstate); + cur_block_pos = cstate->pcdata->pcshared_info->cur_block_pos; + cur_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[cur_block_pos]; + + /* + * Try to read some more data. This will certainly reset + * raw_buf_index to zero, and raw_buf_ptr must go with it. + */ + if (!CopyLoadRawBuf(cstate)) + hit_eof = true; + + /* + * If we are completely out of data, break out of the loop, + * reporting EOF. + */ + if (RAW_BUF_BYTES(cstate) <= 0) + return true; + + cur_data_blk_ptr->skip_bytes = DATA_BLOCK_SIZE - cstate->raw_buf_len; + pg_atomic_add_fetch_u32(&cstate->pcdata->pcshared_info->total_populated, 1); + + if (hit_eof) + { + if (cur_data_blk_ptr) + { + cur_data_blk_ptr->following_block = -1; + pg_atomic_write_u32(&cur_data_blk_ptr->data_blk_state, FILLED); + } + } + + return hit_eof; +} + +bool +GetWorkerBlockPos(CopyFromState cstate, char **copy_raw_buf) +{ + ParallelCopyDataBlock *cur_data_blk_ptr; + ParallelCopyLineBoundary *first_blk_info = &cstate->pcdata->first_blk_info; + uint32 start_offset; + int nbytes = RAW_BUF_BYTES(cstate); + bool bret = true; + + /* Worker should move to the next block, first data block information is already available */ + if (first_blk_info->first_block != -1) + { + ParallelCopyDataBlock *prev_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[first_blk_info->current_block]; + pg_atomic_add_fetch_u32(&cstate->pcdata->pcshared_info->total_processed, 1); + pg_atomic_write_u32(&prev_data_blk_ptr->data_blk_state, FREE); + start_offset = first_blk_info->start_offset = COPY_BLOCK_RESERVE_BYTES; + + if (prev_data_blk_ptr->following_block != -1) + { + /* The next block that should be processed will be set in the + * following_block. + */ + cur_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[prev_data_blk_ptr->following_block]; + first_blk_info->current_block = prev_data_blk_ptr->following_block; + } + else { /* - * Mark the previous block as completed, worker can start copying - * this data. + * Last block was the last block, this will happen if worker could + * not decide if it was EOL from the last few bytes of last block. + * We can get a free block as worker will not be doing any work at + * this time. */ - cur_data_blk_ptr->following_block = next_block_pos; - pg_atomic_add_fetch_u32(&cur_data_blk_ptr->unprocessed_line_parts, 1); - cur_data_blk_ptr->curr_blk_completed = true; + uint32 next_block; + next_block = WaitGetFreeCopyBlock(cstate->pcdata->pcshared_info); + cur_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[next_block]; + pg_atomic_write_u32(&cur_data_blk_ptr->data_blk_state, FILLED); + + first_blk_info->current_block = next_block; + cur_data_blk_ptr->skip_bytes = DATA_BLOCK_SIZE; + } + + if (!cstate->pcdata->pcshared_info->is_read_in_progress && + (pg_atomic_read_u32(&cstate->pcdata->pcshared_info->total_processed) == + pg_atomic_read_u32(&cstate->pcdata->pcshared_info->total_populated))) + { + if (nbytes) + bret = false; + else + return true; } + } + else + { + /* Get the block for the worker to process. */ + for(;;) + { + COPY_WAIT_TO_PROCESS(); + + if (!cstate->pcdata->pcshared_info->is_read_in_progress && + (pg_atomic_read_u32(&cstate->pcdata->pcshared_info->total_processed) == + pg_atomic_read_u32(&cstate->pcdata->pcshared_info->total_populated))) + return true; - cur_data_blk_ptr->skip_bytes = copy_buf_len - raw_buf_ptr; - cstate->raw_buf_len = cur_data_blk_ptr->skip_bytes; + SpinLockAcquire(&cstate->pcdata->pcshared_info->line_boundaries.worker_pos_lock); + if (cstate->pcdata->pcshared_info->line_boundaries.pos_filled == false) + { + SpinLockRelease(&cstate->pcdata->pcshared_info->line_boundaries.worker_pos_lock); + continue; + } - /* Copy the skip bytes to the next block to be processed. */ - if (cur_data_blk_ptr->skip_bytes) - memcpy(cstate->raw_buf, cur_data_blk_ptr->data + raw_buf_ptr, - cur_data_blk_ptr->skip_bytes); + first_blk_info->first_block = cstate->pcdata->pcshared_info->line_boundaries.ring[0].first_block; + first_blk_info->current_block = cstate->pcdata->pcshared_info->line_boundaries.ring[0].first_block; + start_offset = first_blk_info->start_offset = cstate->pcdata->pcshared_info->line_boundaries.ring[0].start_offset; + cstate->cur_lineno = first_blk_info->cur_lineno = cstate->pcdata->pcshared_info->line_boundaries.ring[0].cur_lineno; + cstate->pcdata->pcshared_info->line_boundaries.pos_filled = false; + SpinLockRelease(&cstate->pcdata->pcshared_info->line_boundaries.worker_pos_lock); + break; + } + + cur_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[first_blk_info->first_block]; + } + + /* Wait till the block is populated by leader */ + while (pg_atomic_read_u32(&cur_data_blk_ptr->data_blk_state) != FILLED) + COPY_WAIT_TO_PROCESS(); + + pg_atomic_write_u32(&cur_data_blk_ptr->data_blk_state, PROCESSING); + + /* set cstate variables */ + if (nbytes > 0) + { + char *raw_buf = &cur_data_blk_ptr->data[start_offset]; + + /* Copy down the unprocessed data if any. */ + memmove(raw_buf, cstate->raw_buf + cstate->raw_buf_index, nbytes); + + cstate->raw_buf = raw_buf; + cstate->raw_buf_len = DATA_BLOCK_SIZE - cur_data_blk_ptr->skip_bytes + nbytes; } else - cstate->raw_buf_len = 0; + { + cstate->raw_buf = &cur_data_blk_ptr->data[start_offset]; + cstate->raw_buf_len = DATA_BLOCK_SIZE - cur_data_blk_ptr->skip_bytes; + } cstate->raw_buf_index = 0; + *copy_raw_buf = cstate->raw_buf; + return bret; +} + +static void +SetWorkerBlockPos(CopyFromState cstate, uint32 first_block, + uint32 start_offset, uint64 cur_lineno) +{ + ParallelCopyDataBlock *cur_data_blk_ptr; + cur_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[first_block]; + pg_atomic_write_u32(&cur_data_blk_ptr->data_blk_state, FILLED); + + SpinLockAcquire(&cstate->pcdata->pcshared_info->line_boundaries.worker_pos_lock); + cstate->pcdata->pcshared_info->line_boundaries.ring[0].first_block = first_block; + cstate->pcdata->pcshared_info->line_boundaries.ring[0].current_block = first_block; + cstate->pcdata->pcshared_info->line_boundaries.ring[0].start_offset = start_offset; + cstate->pcdata->pcshared_info->line_boundaries.ring[0].cur_lineno = cur_lineno; + cstate->pcdata->pcshared_info->line_boundaries.pos_filled = true; + SpinLockRelease(&cstate->pcdata->pcshared_info->line_boundaries.worker_pos_lock); + + cstate->pcdata->first_blk_info.first_block = -1; + cstate->pcdata->first_blk_info.current_block = -1; + cstate->pcdata->first_blk_info.start_offset = 0; + cstate->pcdata->first_blk_info.cur_lineno = 1; + + //elog(LOG, "Setting first_block to %d", first_block); + //elog(LOG, "Setting offset to %d", start_offset); } /* diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 67cf775..40105b0 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -43,7 +43,7 @@ * should be a multiple of WORKER_CHUNK_COUNT, as wrap around cases is currently * not handled while selecting the WORKER_CHUNK_COUNT by the worker. */ -#define RINGSIZE (10 * 1024) +#define RINGSIZE 1 /* * While accessing DSM, each worker will pick the WORKER_CHUNK_COUNT records @@ -205,6 +205,17 @@ typedef enum CopyInsertMethod CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ } CopyInsertMethod; +#define COPY_BLOCK_RESERVE_BYTES 8 + +typedef enum BlockState +{ + FREE, /* buffer is empty */ + FILLED, /* leader has filled the buffer with raw data */ + READY, /* start pos has been filled in, but no worker process has claimed the block yet */ + PROCESSING, /* worker has claimed the block, and is processing it */ + +} BlockState; + /* * Copy data block information. * @@ -234,6 +245,9 @@ typedef struct ParallelCopyDataBlock */ bool curr_blk_completed; + /* Current state of the block */ + pg_atomic_uint32 data_blk_state; + /* * Few bytes need to be skipped from this block, this will be set when a * sequence of characters like \r\n is expected, but end of our block @@ -242,8 +256,8 @@ typedef struct ParallelCopyDataBlock * Worker will use skip_bytes to know that this data must be skipped from * this data block. */ - uint8 skip_bytes; - char data[DATA_BLOCK_SIZE]; /* data read from file */ + uint32 skip_bytes; + char data[COPY_BLOCK_RESERVE_BYTES + DATA_BLOCK_SIZE]; /* data read from file */ } ParallelCopyDataBlock; /* @@ -283,6 +297,7 @@ typedef struct ParallelCopyLineBoundary /* Position of the first block in data_blocks array. */ uint32 first_block; uint32 start_offset; /* start offset of the line */ + uint32 current_block; /* * Size of the current line -1 means line is yet to be filled completely, @@ -303,6 +318,12 @@ typedef struct ParallelCopyLineBoundaries uint32 worker_pos; /* Worker's last blocked Position. */ slock_t worker_pos_lock; /* locks worker_pos shared variable. */ + /* + * If one of the worker has filled ring variable, to indicate if any of the + * worker should pick or wait till it is filled. + */ + bool pos_filled; + /* Data read from the file/stdin by the leader process. */ ParallelCopyLineBoundary ring[RINGSIZE]; } ParallelCopyLineBoundaries; @@ -328,6 +349,9 @@ typedef struct ParallelCopyShmInfo * clause. */ pg_atomic_uint64 total_worker_processed; + + pg_atomic_uint32 total_populated; + pg_atomic_uint32 total_processed; uint64 populated; /* lines populated by leader */ uint32 cur_block_pos; /* current data block */ ParallelCopyDataBlock data_blocks[MAX_BLOCKS_COUNT]; /* data block array */ @@ -394,6 +418,8 @@ typedef struct ParallelCopyData /* Current position in worker_line_buf */ uint32 worker_line_buf_pos; + ParallelCopyLineBoundary first_blk_info; + /* For binary formatted files */ ParallelCopyDataBlock *curr_data_block; } ParallelCopyData; @@ -528,12 +554,14 @@ extern uint32 GetLinePosition(CopyFromState cstate, uint32 line_pos); extern bool GetWorkerLine(CopyFromState cstate); extern bool CopyReadLine(CopyFromState cstate); extern uint32 WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info); -extern void SetRawBufForLoad(CopyFromState cstate, uint32 line_size, uint32 copy_buf_len, - uint32 raw_buf_ptr, char **copy_raw_buf); +extern void SetRawBufForLoad(CopyFromState cstate); extern uint32 UpdateSharedLineInfo(CopyFromState cstate, uint32 blk_pos, uint32 offset, uint32 line_size, uint32 line_state, uint32 blk_line_pos); extern void EndLineParallelCopy(CopyFromState cstate, uint32 line_pos, uint32 line_size, uint32 raw_buf_ptr); +extern bool ParallelCopyGetData(CopyFromState cstate); +extern bool GetWorkerBlockPos(CopyFromState cstate, char **copy_raw_buf); +extern bool CopyLoadRawBuf(CopyFromState cstate); extern int CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread); extern int CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes); diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out index 22274cb..d87432a 100644 --- a/src/test/regress/expected/copy2.out +++ b/src/test/regress/expected/copy2.out @@ -767,7 +767,7 @@ ERROR: column "d" specified more than once -- missing data: should fail COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); ERROR: invalid input syntax for type integer: "" -CONTEXT: COPY test_parallel_copy, line 0, column a: "" +CONTEXT: COPY test_parallel_copy, line 1, column a: "" parallel worker COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); ERROR: missing data for column "e" -- 1.8.3.1