Re: Parallel copy

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>
Cc: Rafia Sabih <rafia(dot)pghackers(at)gmail(dot)com>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, Robert Haas <robertmhaas(at)gmail(dot)com>, Ants Aasma <ants(at)cybertec(dot)at>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Alastair Turner <minion(at)decodable(dot)me>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Parallel copy
Date: 2020-07-15 05:04:28
Message-ID: CAA4eK1KOvegJzV7DmX+DiWmqbqTGjSmJ4QPGwY2qjkEHPdD6mw@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On Sun, Jul 12, 2020 at 5:48 PM Bharath Rupireddy
<bharath(dot)rupireddyforpostgres(at)gmail(dot)com> wrote:
>
> >
> > Hi Bharath,
> >
> > I was looking forward to review this patch-set but unfortunately it is
> > showing a reject in copy.c, and might need a rebase.
> > I was applying on master over the commit-
> > cd22d3cdb9bd9963c694c01a8c0232bbae3ddcfb.
> >
>
> Thanks for showing interest. Please find the patch set rebased to
> latest commit b1e48bbe64a411666bb1928b9741e112e267836d.
>

Few comments:
====================
0001-Copy-code-readjustment-to-support-parallel-copy

I am not sure converting the code to macros is a good idea, it makes
this code harder to read. Also, there are a few changes which I am
not sure are necessary.
1.
+/*
+ * CLEAR_EOL_FROM_COPIED_DATA - Clear EOL from the copied data.
+ */
+#define CLEAR_EOL_FROM_COPIED_DATA(copy_line_data, copy_line_pos,
copy_line_size) \
+{ \
+ /* \
+ * If we didn't hit EOF, then we must have transferred the EOL marker \
+ * to line_buf along with the data. Get rid of it. \
+ */ \
+ switch (cstate->eol_type) \
+ { \
+ case EOL_NL: \
+ Assert(copy_line_size >= 1); \
+ Assert(copy_line_data[copy_line_pos - 1] == '\n'); \
+ copy_line_data[copy_line_pos - 1] = '\0'; \
+ copy_line_size--; \
+ break; \
+ case EOL_CR: \
+ Assert(copy_line_size >= 1); \
+ Assert(copy_line_data[copy_line_pos - 1] == '\r'); \
+ copy_line_data[copy_line_pos - 1] = '\0'; \
+ copy_line_size--; \
+ break; \
+ case EOL_CRNL: \
+ Assert(copy_line_size >= 2); \
+ Assert(copy_line_data[copy_line_pos - 2] == '\r'); \
+ Assert(copy_line_data[copy_line_pos - 1] == '\n'); \
+ copy_line_data[copy_line_pos - 2] = '\0'; \
+ copy_line_size -= 2; \
+ break; \
+ case EOL_UNKNOWN: \
+ /* shouldn't get here */ \
+ Assert(false); \
+ break; \
+ } \
+}

In the original code, we are using only len and buffer, here we are
using position, length/size and buffer. Is it really required or can
we do with just len and buffer?

2.
+/*
+ * INCREMENTPROCESSED - Increment the lines processed.
+ */
+#define INCREMENTPROCESSED(processed) \
+processed++;
+
+/*
+ * GETPROCESSED - Get the lines processed.
+ */
+#define GETPROCESSED(processed) \
+return processed;
+

I don't like converting above to macros. I don't think converting
such things to macros will buy us much.

0002-Framework-for-leader-worker-in-parallel-copy
3.
/*
+ * Copy data block information.
+ */
+typedef struct ParallelCopyDataBlock

It is better to add a few comments atop this data structure to explain
how it is used?

4.
+ * ParallelCopyLineBoundary is common data structure between leader & worker,
+ * this is protected by the following sequence in the leader & worker.
+ * Leader should operate in the following order:
+ * 1) update first_block, start_offset & cur_lineno in any order.
+ * 2) update line_size.
+ * 3) update line_state.
+ * Worker should operate in the following order:
+ * 1) read line_size.
+ * 2) only one worker should choose one line for processing, this is handled by
+ * using pg_atomic_compare_exchange_u32, worker will change the sate to
+ * LINE_WORKER_PROCESSING only if line_state is LINE_LEADER_POPULATED.
+ * 3) read first_block, start_offset & cur_lineno in any order.
+ */
+typedef struct ParallelCopyLineBoundary

Here, you have mentioned how workers and leader should operate to make
sure access to the data is sane. However, you have not explained what
is the problem if they don't do so and it is not apparent to me.
Also, it is not very clear what is the purpose of this data structure
from comments.

5.
+/*
+ * Circular queue used to store the line information.
+ */
+typedef struct ParallelCopyLineBoundaries
+{
+ /* Position for the leader to populate a line. */
+ uint32 leader_pos;

I don't think the variable needs to be named as leader_pos, it is okay
to name it is as 'pos' as the comment above it explains its usage.

7.
+#define DATA_BLOCK_SIZE RAW_BUF_SIZE
+#define RINGSIZE (10 * 1000)
+#define MAX_BLOCKS_COUNT 1000
+#define WORKER_CHUNK_COUNT 50 /* should be mod of RINGSIZE */

It would be good if you can write a few comments to explain why you
have chosen these default values.

8.
ParallelCopyCommonKeyData, shall we name this as
SerializedParallelCopyState or something like that? For example, see
SerializedSnapshotData which has been used to pass snapshot
information to passed to workers.

9.
+CopyCommonInfoForWorker(CopyState cstate, ParallelCopyCommonKeyData
*shared_cstate)

If you agree with point-8, then let's name this as
SerializeParallelCopyState. See, if there is more usage of similar
types in the patch then lets change those as well.

10.
+ * in the DSM. The specified number of workers will then be launched.
+ *
+ */
+static ParallelContext*
+BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid)

No need of an extra line with only '*' in the above multi-line comment.

11.
BeginParallelCopy(..)
{
..
+ EstimateLineKeysStr(pcxt, cstate->null_print);
+ EstimateLineKeysStr(pcxt, cstate->null_print_client);
+ EstimateLineKeysStr(pcxt, cstate->delim);
+ EstimateLineKeysStr(pcxt, cstate->quote);
+ EstimateLineKeysStr(pcxt, cstate->escape);
..
}

Why do we need to do this separately for each variable of cstate?
Can't we serialize it along with other members of
SerializeParallelCopyState (a new name for ParallelCopyCommonKeyData)?

12.
BeginParallelCopy(..)
{
..
+ LaunchParallelWorkers(pcxt);
+ if (pcxt->nworkers_launched == 0)
+ {
+ EndParallelCopy(pcxt);
+ elog(WARNING,
+ "No workers available, copy will be run in non-parallel mode");
..
}

I don't see the need to issue a WARNING if we are not able to launch
workers. We don't do that for other cases where we fail to launch
workers.

13.
+}
+/*
+ * ParallelCopyMain -
..

+}
+/*
+ * ParallelCopyLeader

One line space is required before starting a new function.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Masahiko Sawada 2020-07-15 06:06:15 Re: Transactions involving multiple postgres foreign servers, take 2
Previous Message Peter Geoghegan 2020-07-15 04:12:35 Re: Default setting for enable_hashagg_disk