Parallel copy

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Parallel copy
Date: 2020-02-14 08:11:54
Message-ID: CAA4eK1+kpddvvLxWm4BuG_AhVvYz8mKAEa7osxp_X0d4ZEiV=g@mail.gmail.com
Lists: pgsql-hackers

This work is to parallelize the COPY command, in particular "COPY
<table_name> FROM 'filename' WHERE <condition>;".

Before going into how and what portion of the COPY command's processing
we can parallelize, let us briefly look at the top-level operations we
perform while copying from a file into a table. We read the file in
64KB chunks, find the line endings, and process the data line by line,
where each line corresponds to one tuple. We first form the tuple (as a
values/nulls array) from that line, check whether it satisfies the
WHERE condition and, if it does, perform the constraint checks and a
few other checks, and finally store it in a local tuple array. Once we
have accumulated 1000 tuples or consumed 64KB (whichever happens
first), we insert them together via the table_multi_insert API and
then, for each tuple, insert into the index(es) and execute after-row
triggers.
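
To make the flow concrete, here is a minimal standalone C sketch of the
above (not the actual copy.c code; the helpers are simplified and the
flush step merely stands in for table_multi_insert plus the per-tuple
index/trigger work):

#include <stdio.h>
#include <string.h>

#define CHUNK_SIZE   (64 * 1024)
#define BATCH_TUPLES 1000

static int    batch_lines = 0;
static size_t batch_bytes = 0;

static void
flush_batch(void)
{
    /* stands in for table_multi_insert + per-tuple index/trigger work */
    if (batch_lines > 0)
        printf("flush: %d tuples, %zu bytes\n", batch_lines, batch_bytes);
    batch_lines = 0;
    batch_bytes = 0;
}

int
main(int argc, char **argv)
{
    /*
     * Buffer has room for one chunk plus a carried-over partial line;
     * this sketch assumes no single line exceeds CHUNK_SIZE.
     */
    char    buf[2 * CHUNK_SIZE];
    size_t  carry = 0;
    size_t  nread;
    FILE   *fp;

    if (argc < 2 || (fp = fopen(argv[1], "r")) == NULL)
        return 1;

    while ((nread = fread(buf + carry, 1, CHUNK_SIZE, fp)) > 0)
    {
        size_t  len = carry + nread;
        char   *start = buf;
        char   *nl;

        /* each complete line corresponds to one tuple */
        while ((nl = memchr(start, '\n', len - (size_t) (start - buf))) != NULL)
        {
            /* real code: form values/nulls array, WHERE check, constraints */
            batch_lines++;
            batch_bytes += (size_t) (nl - start) + 1;
            if (batch_lines == BATCH_TUPLES || batch_bytes >= CHUNK_SIZE)
                flush_batch();
            start = nl + 1;
        }

        /* carry any trailing partial line over to the next read */
        carry = len - (size_t) (start - buf);
        memmove(buf, start, carry);
    }

    flush_batch();
    fclose(fp);
    return 0;
}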

So, as we can see, we do a lot of work after reading each 64KB chunk,
and we can read the next chunk only after all the tuples in the
previously read chunk have been processed. This gives us an opportunity
to parallelize the processing of each 64KB chunk. I think we can do
this in more than one way.

The first idea is that we allocate each chunk to a worker, and once the
worker has finished processing its current chunk, it can start with the
next unprocessed chunk. Here, we need to see how to handle the partial
tuples at the end or beginning of each chunk. We can read the chunks
into dsa/dsm instead of into a local buffer for processing.
Alternatively, if we think that accessing shared memory can be costly,
we can read the entire chunk into local memory but copy the partial
tuple at the beginning of a chunk (if any) to a dsa; we mainly need the
partial tuple in the shared memory area. The worker that has found the
initial part of a partial tuple will be responsible for processing the
entire tuple. Now, to detect whether there is a partial tuple at the
beginning of a chunk, we always start reading one byte prior to the
start of the current chunk, and if that byte is not a line-terminating
byte, we know the chunk starts with a partial tuple. While processing
the chunk, we then ignore this first line and start after the first
terminating line, as sketched below.
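
As a rough sketch of that detection (hypothetical function and names,
not the actual implementation), a worker could compute where to start
processing in its chunk like this:

#include <stddef.h>

/*
 * "file" is the input viewed as one buffer; the worker assigned the
 * chunk [chunk_start, chunk_end) peeks at the byte just before its
 * chunk.  If that byte is not a newline, the chunk begins with the tail
 * of a partial tuple owned by the previous chunk's worker, so this
 * worker skips up to and past the first terminating newline.
 */
static size_t
chunk_processing_start(const char *file, size_t chunk_start, size_t chunk_end)
{
    size_t  pos = chunk_start;

    if (chunk_start > 0 && file[chunk_start - 1] != '\n')
    {
        while (pos < chunk_end && file[pos] != '\n')
            pos++;
        if (pos < chunk_end)
            pos++;              /* step over the terminating newline */
    }

    return pos;                 /* worker starts processing here */
}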

To connect the partial tuple across two consecutive chunks, we need
another data structure (for ease of reference in this email, I call it
CTM, for chunk-tuple-map) in shared memory, where we store some
per-chunk information like the chunk number, the dsa location of that
chunk, and a flag indicating whether the entry can be freed/reused.
Whenever we encounter a partial tuple at the beginning of a chunk, we
note down its chunk number and dsa location in the CTM. Then, whenever
we encounter a partial tuple at the end of a chunk, we search the CTM
for the next chunk number and read from the corresponding dsa location
until we encounter the terminating line byte. Once we have read and
processed this partial tuple, we can mark the entry as available for
reuse. There are some loose ends here, like how many entries we should
allocate in this data structure; it depends on whether we want to allow
a worker to start reading the next chunk before the partial tuple of
its previous chunk has been processed. To keep it simple, we can allow
a worker to process the next chunk only once the partial tuple in its
previous chunk has been processed. This allows us to keep the number of
entries in the CTM equal to the number of workers. I think we can
easily improve this if we want, but I don't think it will matter too
much, as in most cases, by the time we have processed the tuples in a
chunk, its partial tuple will have been consumed by the other worker.
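
A possible shape for the CTM, written as a plain standalone C sketch
(in a real patch the chunk location would be a dsa_pointer and the flag
would need proper synchronization; all names are illustrative):

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Illustrative CTM entry.  With the simple rule that a worker may not
 * move on to its next chunk until the partial tuple of its previous
 * chunk has been consumed, nentries == number of workers is enough.
 */
typedef struct ChunkTupleMapEntry
{
    uint64_t    chunk_number;   /* which chunk this entry describes */
    uintptr_t   chunk_location; /* where the chunk's data lives (dsa offset) */
    bool        in_use;         /* false: the entry can be freed/reused */
} ChunkTupleMapEntry;

typedef struct ChunkTupleMap
{
    int                 nentries;   /* == number of workers */
    ChunkTupleMapEntry  entries[];  /* lives in shared memory */
} ChunkTupleMap;

/*
 * Look up the entry for a given chunk number, e.g. to read the
 * continuation of a partial tuple that ends the previous chunk.
 * Returns NULL if the next chunk has not been registered yet, in which
 * case the caller waits/retries.
 */
static ChunkTupleMapEntry *
ctm_lookup(ChunkTupleMap *ctm, uint64_t chunk_number)
{
    for (int i = 0; i < ctm->nentries; i++)
    {
        if (ctm->entries[i].in_use &&
            ctm->entries[i].chunk_number == chunk_number)
            return &ctm->entries[i];
    }
    return NULL;
}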

Another approach that came up during an offlist discussion with Robert
is that we have one dedicated worker for reading the chunks from the
file; it copies the complete tuples of one chunk into shared memory
and, once that is done, hands that chunk over to another worker which
can process the tuples in that area. We can imagine that the reader
worker is responsible for forming some sort of work queue that can be
processed by the other workers. In this idea, we won't get the benefit
of doing the initial tokenization (forming tuple boundaries) via
parallel workers and might need some additional memory processing, as
after the reader worker has handed over the initial shared memory
segment, we need to somehow identify the tuple boundaries and then
process them.
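
Sketched as a shared ring of chunk slots (illustrative types and names
only; a real patch would presumably build this on a DSM segment with
proper synchronization):

#include <stdint.h>

/* State of one shared chunk slot, as seen by the reader and the workers. */
typedef enum ChunkState
{
    CHUNK_EMPTY,        /* slot free, the reader worker may fill it */
    CHUNK_READY,        /* reader copied complete tuples in; ready to claim */
    CHUNK_IN_PROGRESS,  /* claimed by a copy worker */
    CHUNK_DONE          /* worker finished; reader may reuse the slot */
} ChunkState;

typedef struct ChunkSlot
{
    ChunkState  state;          /* transitions need atomics/locks in reality */
    uint64_t    chunk_number;   /* ordinal of the chunk in the input file */
    uintptr_t   data;           /* location of the chunk's tuple data */
    uint32_t    len;            /* number of valid bytes in the chunk */
} ChunkSlot;

/*
 * Protocol sketch: the reader loops over the slots; on finding an EMPTY
 * or DONE slot it reads the next 64KB from the file, trims it back to
 * the last complete tuple (carrying the remainder forward), copies the
 * data into shared memory, and marks the slot READY.  Each copy worker
 * scans for a READY slot, marks it IN_PROGRESS, re-finds the tuple
 * boundaries inside it (the extra tokenization cost mentioned above),
 * forms and inserts the tuples, and finally marks the slot DONE.
 */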

Another thing we need to figure out is how many workers to use for the
copy command. I think we can decide this based on the file size, which
will need some experiments, or maybe based on user input.
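
For example, one crude heuristic (purely illustrative numbers, to be
replaced by whatever the experiments suggest) could scale the number of
workers with the file size up to a cap:

#include <sys/stat.h>

/*
 * Purely illustrative heuristic: roughly one worker per 64MB of input,
 * capped by a configured maximum (whatever the user asked for or the
 * parallel-worker limits allow).  The 64MB figure is a placeholder.
 */
static int
choose_copy_workers(const char *filename, int max_workers)
{
    struct stat st;
    int         nworkers;

    if (stat(filename, &st) != 0)
        return 0;               /* cannot size the file: fall back to serial */

    nworkers = (int) (st.st_size / (64 * 1024 * 1024));
    if (nworkers > max_workers)
        nworkers = max_workers;
    return nworkers;
}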

I think we have two related problems to solve for this: (a) the
relation extension lock (required for extending the relation), which
won't conflict among workers due to group locking; we are working on a
solution for this in another thread [1]; and (b) the use of page locks
in GIN indexes; we can probably disallow parallelism if the table has a
GIN index, which is not a great thing but not bad either.

To be clear, this work is for PG14.

Thoughts?

[1] - https://www.postgresql.org/message-id/CAD21AoCmT3cFQUN4aVvzy5chw7DuzXrJCbrjTU05B%2BSs%3DGn1LA%40mail.gmail.com

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
