Re: Parallel copy - Mailing list pgsql-hackers
From | vignesh C
---|---
Subject | Re: Parallel copy
Date |
Msg-id | CALDaNm2Y4ZBqyobWR4-Tw=tN9N7PovE1pH7jMRJD9QNbx9A7vQ@mail.gmail.com
In response to | Re: Parallel copy (vignesh C <vignesh21@gmail.com>)
Responses | Re: Parallel copy
List | pgsql-hackers
On Thu, Mar 12, 2020 at 6:39 PM vignesh C <vignesh21@gmail.com> wrote:

Existing copy code flow:

COPY supports copying data from files in csv, text & binary formats. For the csv and text formats, the server reads a 64KB chunk, or less if less data remains in the input file. It then reads one tuple of data from the chunk and processes it. If that tuple consumed less than the 64KB of data read, the server tries to generate another tuple from the remaining unprocessed data. If it cannot generate a complete tuple from the unprocessed data, it reads a further 64KB, or whatever remains in the file, and sends the tuple for processing. This is repeated until the complete file has been processed.

For the binary format the flow is slightly different. The server reads the number of columns present, then reads each column's size followed by its actual contents, repeating this for all the columns. The server then processes the generated tuple. This is repeated for all the remaining tuples in the binary file.

The tuple processing flow is the same for all formats. Currently all of these operations happen sequentially. This project will parallelize the copy operation. I'm planning to do a POC of parallel copy with the below design.

Proposed syntax:

COPY table_name FROM 'copy_file' WITH (FORMAT 'format', PARALLEL 'workers');

Users can specify the number of workers that must be used for copying the data in parallel. Here 'workers' is the number of workers to use for the parallel copy operation, apart from the leader. The leader is responsible for reading the data from the input file and generating work for the workers.

The leader will start a transaction and share it with the workers; all workers will use the same transaction to insert the records.

The leader will create a circular queue and share it across the workers. The circular queue will be present in DSM. The leader will use a fixed-size queue to share the contents between the leader and the workers; currently we will have 100 elements in the queue. The queue will be created before the workers are started and shared with them. The data structures required by the parallel workers will be initialized by the leader, the size required in DSM will be calculated, and the necessary keys will be loaded into the DSM. The specified number of workers will then be launched.

The leader will read the table data from the file and copy the contents into the queue element by element. Each element in the queue will have a 64KB DSA chunk, which will be used to store the tuple contents from the file. The leader will try to copy as much content as possible into one 64KB DSA queue element; we intend to store at least one tuple in each queue element. There are some cases where 64KB may not be enough to store a single tuple, mostly where the table has toast data and a single tuple can be larger than 64KB. In those scenarios we will extend the DSA space accordingly. We cannot change the size of the DSM once the workers are launched, whereas with DSA we can free the DSA pointer and reallocate it based on the memory size required. This is the very reason for choosing DSA over DSM for storing the data that must be inserted into the relation. The leader will keep loading data into the queue until the queue becomes full.
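To make the shared layout concrete, here is a minimal C sketch of the queue described above; all names and fields (ParallelCopyQueue, ParallelCopyElement, PCOPY_QUEUE_SIZE and so on) are hypothetical illustrations, not taken from an actual patch:

    #include "postgres.h"
    #include "port/atomics.h"
    #include "utils/dsa.h"

    #define PCOPY_QUEUE_SIZE   100           /* fixed element count */
    #define PCOPY_CHUNK_SIZE   (64 * 1024)   /* initial DSA chunk per element */

    typedef struct ParallelCopyElement
    {
        dsa_pointer data;       /* DSA chunk holding raw tuple data */
        uint32      chunk_size; /* current chunk size; grown beyond 64KB
                                 * when a single (e.g. toasted) tuple
                                 * does not fit */
        uint32      nbytes;     /* bytes of tuple data currently stored */
    } ParallelCopyElement;

    typedef struct ParallelCopyQueue
    {
        pg_atomic_uint32 head;  /* next element the leader fills */
        pg_atomic_uint32 tail;  /* next element a worker consumes */
        ParallelCopyElement elements[PCOPY_QUEUE_SIZE];
    } ParallelCopyQueue;

The point of the split is that the element array is fixed once the DSM segment is created, but each data chunk is only a dsa_pointer, so an oversized chunk can be freed and reallocated larger without resizing the DSM segment.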
The leader will switch its role to become a worker either when the queue is full or when the complete file has been processed. Once the queue is full, the leader will act as a worker until 25% of the elements in the queue have been consumed by the workers; once at least 25% of the space in the queue is available again, it will switch back to being the leader. The leader will continue this process of filling the queue until the whole file is processed, and will then wait until the workers finish processing the remaining queue elements.

The COPY FROM functionality is also used during initdb operations, where the copy is intended to be performed in single (non-parallel) mode, and users can of course still run in non-parallel mode. In non-parallel mode, memory allocation will happen using palloc instead of DSM/DSA; most of the flow will be the same in both the parallel and non-parallel cases.

We had a couple of options for the way in which queue elements can be stored:

Option 1: Each element (DSA chunk) contains tuples such that each tuple is preceded by its length, so the tuples are arranged as (length of tuple-1, tuple-1), (length of tuple-2, tuple-2), ....

Option 2: Each element (DSA chunk) contains only tuples, (tuple-1), (tuple-2), ...., and we keep a second ring buffer which contains the start offset or length of each tuple.

The old design generated one tuple of data at a time and processed it tuple by tuple. In the new design, the server will generate multiple tuples of data per queue element, and the worker will then process the data tuple by tuple. Since we are processing the data tuple by tuple either way, I felt both options are almost the same. However, Option 1 was chosen over Option 2 because it saves the space that another variable would otherwise require in each element of the queue (see the sketch at the end of this mail).

The parallel workers will read the tuples from the queue and perform all of the following operations:
a) WHERE clause handling
b) converting the tuple to columns
c) adding default or null values for the columns that are not present in that record
d) finding the partition if it is a partitioned table
e) before-row insert triggers and constraints
f) insertion of the data
The rest of the flow is the same as the existing code.

Enhancements after the POC is done:
- Initially we plan to use the number of workers the user has specified; later we will do some experiments and think of an approach to choose the worker count automatically after processing sample contents from the file.
- Initially we plan to use 100 elements in the queue; later we will experiment to find the right queue size once the basic patch is ready.
- Initially we plan to have the leader start the transaction and share it with the workers; later we will change this so that the first process that performs an insert starts the transaction and shares it with the rest.

Thoughts?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
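PS: To illustrate the Option 1 layout chosen above, here is a hedged C sketch of how the leader could pack length-prefixed tuples into an element's chunk and how a worker could walk them; pcopy_append_tuple, pcopy_process_chunk and process_one_tuple are illustrative names only, not from any actual patch:

    #include "postgres.h"

    /* hypothetical per-tuple processing hook, standing in for steps a) to f) */
    extern void process_one_tuple(const char *tuple, uint32 len);

    /*
     * Leader side: append (uint32 length, tuple bytes) to the chunk.
     * Returns false when the tuple does not fit, in which case the caller
     * grows the DSA chunk or moves on to the next queue element.
     */
    static bool
    pcopy_append_tuple(char *chunk, uint32 chunk_size, uint32 *used,
                       const char *tuple, uint32 len)
    {
        if (*used + sizeof(uint32) + len > chunk_size)
            return false;
        memcpy(chunk + *used, &len, sizeof(uint32));
        memcpy(chunk + *used + sizeof(uint32), tuple, len);
        *used += sizeof(uint32) + len;
        return true;
    }

    /* Worker side: walk the chunk and hand off one tuple at a time. */
    static void
    pcopy_process_chunk(const char *chunk, uint32 nbytes)
    {
        uint32      off = 0;

        while (off < nbytes)
        {
            uint32      len;

            memcpy(&len, chunk + off, sizeof(uint32));
            off += sizeof(uint32);
            process_one_tuple(chunk + off, len);
            off += len;
        }
    }

Because the length travels with the tuple inside the same chunk, no second ring buffer of offsets is needed, which is the space saving that motivated choosing Option 1.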