Re: Parallel copy - Mailing list pgsql-hackers

From vignesh C
Subject Re: Parallel copy
Msg-id CALDaNm2Y4ZBqyobWR4-Tw=tN9N7PovE1pH7jMRJD9QNbx9A7vQ@mail.gmail.com
In response to Re: Parallel copy  (vignesh C <vignesh21@gmail.com>)
Responses Re: Parallel copy  (Ants Aasma <ants@cybertec.at>)
List pgsql-hackers
On Thu, Mar 12, 2020 at 6:39 PM vignesh C <vignesh21@gmail.com> wrote:
>

Existing copy code flow: COPY supports copying from csv, text and
binary format files. For csv and text formats, the server reads a 64KB
chunk (or less if the remaining contents of the input file are
smaller). It then extracts one tuple of data from that chunk and
processes it. If the chunk still holds unprocessed data after that
tuple is generated, the server tries to form the next tuple from the
remaining data. If a complete tuple cannot be formed from the leftover
data, it reads another 64KB (or whatever remains in the file) and
continues processing tuples. This is repeated until the complete file
has been processed. For binary format files the flow is slightly
different: the server reads the number of columns present, then for
each column reads the column size followed by the actual column
contents, and processes the tuple so generated; this is repeated for
all the remaining tuples in the binary file. Tuple processing itself
is the same for all formats. Currently all of these operations happen
sequentially; this project aims to parallelize the copy operation.
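
To make the chunking a bit more concrete, here is a rough standalone
sketch of that text/csv loop. None of this is the actual copy.c code;
the buffer handling and names are simplified for illustration, and
corner cases (quoted newlines in csv, tuples larger than 64KB, a final
line without a newline) are ignored.

#include <stdio.h>
#include <string.h>

#define RAW_BUF_SIZE (64 * 1024)    /* read the input in 64KB pieces */

int
main(int argc, char **argv)
{
    FILE   *fp;
    char    raw_buf[RAW_BUF_SIZE];
    size_t  nbytes = 0;     /* valid bytes currently in raw_buf */
    size_t  used = 0;       /* bytes already consumed as tuples */

    if (argc < 2 || (fp = fopen(argv[1], "r")) == NULL)
        return 1;

    for (;;)
    {
        char   *nl = memchr(raw_buf + used, '\n', nbytes - used);

        if (nl == NULL)
        {
            size_t  got;

            /* no complete tuple left: keep the leftover, read more data */
            memmove(raw_buf, raw_buf + used, nbytes - used);
            nbytes -= used;
            used = 0;
            got = fread(raw_buf + nbytes, 1, RAW_BUF_SIZE - nbytes, fp);
            if (got == 0)
                break;      /* whole file processed */
            nbytes += got;
            continue;
        }

        /* one tuple's worth of data: here it would be parsed and inserted */
        printf("tuple: %.*s\n", (int) (nl - (raw_buf + used)), raw_buf + used);
        used = (nl - raw_buf) + 1;
    }

    fclose(fp);
    return 0;
}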

I'm planning to do the POC of parallel copy with the below design:
Proposed Syntax:
COPY table_name FROM 'copy_file' WITH (FORMAT 'format', PARALLEL 'workers');
Users can specify the number of workers to be used for copying the
data in parallel; 'workers' is the number of workers used for the
parallel copy operation, apart from the leader. The leader is
responsible for reading the data from the input file and generating
work for the workers. The leader will start a transaction and share it
with the workers, and all workers will use that same transaction to
insert the records.

The leader will create a circular queue and share it with the workers.
The circular queue will be present in DSM. The leader will use a
fixed-size queue to share the contents between the leader and the
workers; currently we plan to have 100 elements in the queue. The
queue will be created before the workers are started and shared with
them. The data structures required by the parallel workers will be
initialized by the leader, the size required in the DSM will be
calculated, and the necessary keys will be loaded into the DSM. The
specified number of workers will then be launched. The leader will
read the table data from the file and copy the contents into the queue
element by element. Each element in the queue will have a 64KB DSA
chunk, which is used to store tuple contents from the file. The leader
will try to copy as much content as possible into one 64KB DSA queue
element, and we intend to store at least one tuple in each queue
element. There are some cases where 64KB may not be enough to store a
single tuple, mostly where the table has toast data present and a
single tuple can exceed 64KB; in these scenarios we will extend the
DSA space accordingly. We cannot change the size of the DSM once the
workers are launched, whereas with DSA we can free the dsa pointer and
reallocate it based on the memory size required. That is the reason
for choosing DSA over DSM for storing the data that must be inserted
into the relation.
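
To make the shared structures a bit more concrete, something along
these lines is what I have in mind. All names, fields and sizes below
are tentative and only meant to illustrate the layout: the ring itself
sits in the DSM segment created before the workers are launched, while
the variable-sized tuple data hangs off each element as a DSA
allocation so it can be grown for the over-64KB case.

#include "postgres.h"
#include "utils/dsa.h"

#define PARALLEL_COPY_QUEUE_SIZE    100             /* elements in the ring */
#define PARALLEL_COPY_CHUNK_SIZE    (64 * 1024)     /* initial DSA size per element */

/* One queue element: a DSA chunk holding one or more raw tuples. */
typedef struct ParallelCopyQueueElement
{
    dsa_pointer data;       /* chunk with the tuple contents */
    uint32      size;       /* currently allocated size of the chunk */
    uint32      nbytes;     /* bytes of tuple data filled in by the leader */
} ParallelCopyQueueElement;

/* The circular queue itself, placed in the DSM segment. */
typedef struct ParallelCopyQueue
{
    uint32      head;       /* next element the leader will fill */
    uint32      tail;       /* next element a worker will pick up */
    ParallelCopyQueueElement elements[PARALLEL_COPY_QUEUE_SIZE];
} ParallelCopyQueue;
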
The leader will keep loading data into the queue until the queue
becomes full. When the queue is full (or the complete file has been
processed), the leader will switch its role and act as a worker; it
will continue acting as a worker until at least 25% of the elements in
the queue have been consumed by the workers. Once at least 25% of the
queue space is available again, the leader will switch back to its
leader role. The leader keeps filling the queue this way until the
whole file is processed, and then waits until the workers have
finished processing the remaining queue elements.
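
In rough pseudo-code, the leader's loop with that role switch would
look something like the following. fill_one_element() and
process_one_element() are just placeholders for the leader-side
filling code and the worker-side consuming code.

/* Pseudo-code only; helper functions are placeholders. */
while (!file_done)
{
    if (queue_free_slots(queue) == 0)
    {
        /*
         * Queue is full: act as a worker until at least 25% of the
         * elements (25 of the 100 slots) have been consumed, then go
         * back to being the leader.
         */
        while (queue_free_slots(queue) < PARALLEL_COPY_QUEUE_SIZE / 4)
            process_one_element(queue);
        continue;
    }

    /* fill the next free slot from the input file */
    file_done = !fill_one_element(queue, copy_file);
}

/* Whole file loaded: wait for workers to drain the remaining elements. */
wait_for_workers(queue);
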
The COPY FROM functionality is also used during initdb, where the copy
is intended to run in single (non-parallel) mode, and users can of
course still run COPY in non-parallel mode. In the non-parallel case,
memory allocation will happen using palloc instead of DSM/DSA, and
most of the flow will be the same in both the parallel and
non-parallel cases.
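
One way to keep the two modes on a common code path is to hide the
allocation choice behind a small helper, roughly like the sketch
below. ParallelCopyState and its fields are hypothetical;
dsa_allocate()/dsa_get_address()/palloc() are the existing APIs. In
the parallel case the real code would also have to remember the
dsa_pointer in the queue element so the workers can map the chunk.

/* Hypothetical helper: allocate a raw-data buffer in the right place. */
static void *
copy_alloc_raw_buf(ParallelCopyState *pcstate, Size size)
{
    if (pcstate->nworkers > 0)
    {
        /* parallel copy: allocate from the DSA area shared with workers */
        dsa_pointer dp = dsa_allocate(pcstate->area, size);

        return dsa_get_address(pcstate->area, dp);
    }

    /* non-parallel copy (including initdb): plain backend-local memory */
    return palloc(size);
}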

We had a couple of options for the way in which queue elements can be
stored. Option 1: each element (DSA chunk) contains tuples such that
each tuple is preceded by its length, so the tuples are arranged like
(length of tuple-1, tuple-1), (length of tuple-2, tuple-2), .... Or
Option 2: each element (DSA chunk) contains only the tuples,
(tuple-1), (tuple-2), ..., and we keep a second ring buffer that holds
the start offset or length of each tuple. The old design generated one
tuple of data and processed tuple by tuple. In the new design, the
server will generate multiple tuples of data per queue element, and
the worker will then process the data tuple by tuple. Since the data
is still processed tuple by tuple, I felt both options are almost the
same. However, Option 1 was chosen over Option 2 because it saves the
space that the additional per-tuple variable in each queue element
would have required.
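
For reference, this is how a worker would walk one DSA chunk under
Option 1, in illustrative pseudo-code (the layout is the point here,
not the exact code; process_one_tuple() is a placeholder):

/*
 * Option 1 layout inside one DSA chunk:
 *
 *   +------+---------------+------+---------------+----
 *   | len1 | tuple-1 bytes | len2 | tuple-2 bytes | ...
 *   +------+---------------+------+---------------+----
 */
char   *pos = chunk;                        /* start of the DSA chunk */

while (pos < chunk + nbytes)                /* nbytes filled in by the leader */
{
    uint32  len;

    memcpy(&len, pos, sizeof(len));         /* length prefix of the next tuple */
    pos += sizeof(len);

    process_one_tuple(pos, len);            /* worker parses/inserts this tuple */
    pos += len;
}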

The parallel workers will read the tuples from the queue and perform
the following operations: a) WHERE clause handling, b) converting the
tuple into columns, c) adding default/null values for the columns that
are not present in the record, d) finding the partition if it is a
partitioned table, e) before-row insert triggers and constraints, and
f) insertion of the data. The rest of the flow is the same as in the
existing code.
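
Put together, a worker's loop would roughly mirror those steps
(pseudo-code again; every helper below is a placeholder for the
corresponding existing COPY code path):

for (;;)
{
    elem = dequeue_element(queue);              /* block until data or end of file */
    if (elem == NULL)
        break;

    while ((tuple = next_tuple_from_element(elem)) != NULL)
    {
        if (!eval_where_clause(tuple))          /* (a) WHERE clause handling */
            continue;
        slot = convert_to_columns(tuple);       /* (b) split tuple into columns */
        fill_missing_defaults(slot);            /* (c) defaults / null values */
        rel = route_to_partition(slot);         /* (d) partitioned tables */
        run_before_row_triggers(rel, slot);     /* (e) BR triggers, constraints */
        insert_tuple(rel, slot);                /* (f) insert the data */
    }
}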

Enhancements after the POC is done:
Initially we plan to use the number of workers the user has specified;
later we will run some experiments and think of an approach to choose
the worker count automatically after processing sample contents from
the file.
Initially we plan to use 100 elements in the queue; later we will
experiment to find the right queue size once the basic patch is ready.
Initially we plan to start the transaction from the leader and share
it with the workers; later we will change this so that the first
process that performs an insert operation starts the transaction and
shares it with the rest.

Thoughts?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com


