Re: Parallel copy

From: Greg Nancarrow <gregn4422(at)gmail(dot)com>
To: pgsql-hackers(at)lists(dot)postgresql(dot)org
Cc: vignesh C <vignesh21(at)gmail(dot)com>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>
Subject: Re: Parallel copy
Date: 2020-08-12 03:39:12
Message-ID: 159720355283.790.16098219619754504834.pgcf@coridan.postgresql.org
Lists: pgsql-hackers

The following review has been posted through the commitfest application:
make installcheck-world: tested, passed
Implements feature: tested, passed
Spec compliant: tested, passed
Documentation: tested, failed

Hi,

I don't claim to yet understand all of the Postgres internals that this patch is updating and interacting with, so I'm still testing and debugging portions of this patch, but would like to give feedback on what I've noticed so far.
I have done some ad-hoc testing of the patch using parallel copies from text/csv/binary files and have not yet struck any execution problems, other than some issues with option validation and the associated error messages in boundary cases.

One general question that I have: is there a user benefit (over the normal non-parallel COPY) to allowing "COPY ... FROM ... WITH (PARALLEL 1)"?

My following comments are broken down by patch:

(1) v2-0001-Copy-code-readjustment-to-support-parallel-copy.patch

(i) Whilst I can't entirely blame these patches for it (as they are following what is already there), I can't help noticing the use of numerous macros in src/backend/commands/copy.c which paste in multiple lines of code in various places.
It's getting a little out of hand. Surely the majority of these would be better as inline functions?
Perhaps this hasn't been done because too many parameters would need to be passed. Thoughts?

(2) v2-0002-Framework-for-leader-worker-in-parallel-copy.patch

(i) minor point: there are some tabbing/spacing issues in this patch (and the other patches), affecting alignment.
e.g. mixed tabs/spaces and misalignment in PARALLEL_COPY_KEY_xxx definitions

(ii)

+/*
+ * Each worker will be allocated WORKER_CHUNK_COUNT of records from DSM data
+ * block to process to avoid lock contention. This value should be mode of
+ * RINGSIZE, as wrap around cases is currently not handled while selecting the
+ * WORKER_CHUNK_COUNT by the worker.
+ */
+#define WORKER_CHUNK_COUNT 50

"This value should be mode of RINGSIZE ..."

-> typo: "mode" (mod? i.e. should this value evenly divide RINGSIZE?)
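If that is the intended constraint, it could be enforced at compile time rather than only stated in a comment. A minimal sketch, assuming for illustration only that RINGSIZE is (10 * 1000), and using C11 static_assert; within PostgreSQL, StaticAssertStmt() from c.h serves the same purpose. The helper function name is hypothetical:

```c
#include <assert.h>

/* Assumed values for illustration only; the patch's actual RINGSIZE may differ. */
#define RINGSIZE (10 * 1000)
#define WORKER_CHUNK_COUNT 50

/*
 * Fail the build if a chunk could straddle the end of the ring, since the
 * chunk-selection code does not handle wrap-around within a chunk.
 */
static_assert(RINGSIZE % WORKER_CHUNK_COUNT == 0,
              "WORKER_CHUNK_COUNT must evenly divide RINGSIZE");

/* Hypothetical helper: start position of the chunk after the one at 'pos'. */
static unsigned
next_chunk_start(unsigned pos)
{
    assert(pos % WORKER_CHUNK_COUNT == 0);   /* 'pos' must be a chunk start */
    return (pos + WORKER_CHUNK_COUNT) % RINGSIZE;
}
```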

(iii)
+ * using pg_atomic_compare_exchange_u32, worker will change the sate to

-> typo: sate (should be "state")

(iv)

+ errmsg("parallel option supported only for copy from"),

-> suggest change to: errmsg("parallel option is supported only for COPY FROM"),

(v)

+ errno = 0; /* To distinguish success/failure after call */
+ val = strtol(str, &endptr, 10);
+
+ /* Check for various possible errors */
+ if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN))
+ || (errno != 0 && val == 0) ||
+ *endptr)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("improper use of argument to option \"%s\"",
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
+
+ if (endptr == str)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("no digits were found in argument to option \"%s\"",
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
+
+ cstate->nworkers = (int) val;
+
+ if (cstate->nworkers <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("argument to option \"%s\" must be a positive integer greater than zero",
+ defel->defname),
+ parser_errposition(pstate, defel->location)));

I think this validation code needs to be improved, including the error messages (e.g. when can a "positive integer" NOT be greater than zero?)

There is some overlap in the "no digits were found" case between the two conditions above, depending, for example, on whether the argument is quoted.
Also, "improper use of argument to option" sounds a bit odd and vague to me.
Finally, not range-checking before casting long to int lets values outside the int range slip through, as in the following case:

test=# copy mytable from '/myspace/test_pcopy/tmp.dat' (parallel '-2147483648');
ERROR: argument to option "parallel" must be a positive integer greater than zero
LINE 1: copy mytable from '/myspace/test_pcopy/tmp.dat' (parallel '-2...
^
BUT the following is allowed...

test=# copy mytable from '/myspace/test_pcopy/tmp.dat' (parallel '-2147483649');
COPY 1000000

I'd suggest changing the above validation code to do validation similar to that for the CREATE TABLE parallel_workers storage parameter (case RELOPT_TYPE_INT in reloptions.c). Like that code, wouldn't it be best to range-check the integer option value to be within a reasonable range, say 1 to 1024, with a corresponding errdetail message if possible?
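As a rough sketch of the kind of checking I have in mind (standalone C with hypothetical names, result codes in place of the ereport() calls, and the 1 to 1024 bounds suggested above), the key points are to range-check the long value before narrowing it to int, and to treat "no digits" and "trailing garbage" as a single "not an integer" error:

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

/* Hypothetical limits for illustration, per the "say 1 to 1024" suggestion. */
#define PARALLEL_WORKERS_MIN 1
#define PARALLEL_WORKERS_MAX 1024

typedef enum
{
    PARSE_OK,
    PARSE_NOT_INTEGER,    /* empty, no digits, or trailing garbage */
    PARSE_OUT_OF_RANGE    /* an integer, but outside [MIN, MAX] */
} ParseResult;

/* Hypothetical parser for the PARALLEL option's argument. */
static ParseResult
parse_parallel_workers(const char *str, int *nworkers)
{
    char *endptr;
    long  val;

    assert(str != NULL && nworkers != NULL);

    errno = 0;
    val = strtol(str, &endptr, 10);

    /* No digits at all ("", "-", "abc"), or trailing garbage ("3x"). */
    if (endptr == str || *endptr != '\0')
        return PARSE_NOT_INTEGER;

    /* Range-check the long BEFORE narrowing to int, so nothing can wrap. */
    if (errno == ERANGE ||
        val < PARALLEL_WORKERS_MIN || val > PARALLEL_WORKERS_MAX)
        return PARSE_OUT_OF_RANGE;

    *nworkers = (int) val;
    return PARSE_OK;
}
```

With this ordering, '-2147483649' is rejected as out of range instead of being converted to an int that can pass a "> 0" check, and each result code maps onto one error message (the out-of-range case could carry an errdetail giving the valid range).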

(3) v2-0003-Allow-copy-from-command-to-process-data-from-file.patch

(i)

Patch comment says:

"This feature allows the copy from to leverage multiple CPUs in order to copy
data from file/STDIN to a table. This adds a PARALLEL option to COPY FROM
command where the user can specify the number of workers that can be used
to perform the COPY FROM command. Specifying zero as number of workers will
disable parallelism."

BUT - the changes to ProcessCopyOptions() specified in "v2-0002-Framework-for-leader-worker-in-parallel-copy.patch" do not allow zero workers to be specified - you get an error in that case. Patch comment should be updated accordingly.

(ii)

#define GETPROCESSED(processed) \
-return processed;
+if (!IsParallelCopy()) \
+ return processed; \
+else \
+ return pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed);
+

I think GETPROCESSED would be better named "RETURNPROCESSED".

(iii)

The comment below seems out-of-date with respect to the current code. Is it referring to the loop embedded at the bottom of the loop that contains the comment?

+ /*
+ * There is a possibility that the above loop has come out because
+ * data_blk_ptr->curr_blk_completed is set, but dataSize read might
+ * be an old value, if data_blk_ptr->curr_blk_completed and the line is
+ * completed, line_size will be set. Read the line_size again to be
+ * sure if it is complete or partial block.
+ */

(iv)

I may be wrong here, but in the following block of code, isn't there a window of opportunity (however small) in which the line_state might be updated (to LINE_WORKER_PROCESSED) by another worker just AFTER pg_atomic_read_u32() returns the current line_state into curr_line_state, such that a write_pos update might be missed? And then doesn't a race condition exist for reading/setting line_size (since line_size gets atomically set after line_state is set)?
If I'm wrong and this synchronization is in fact correct, maybe the comments here could be improved to explain why the code is safe in that respect.

+ /* Get the current line information. */
+ lineInfo = &pcshared_info->line_boundaries.ring[write_pos];
+ curr_line_state = pg_atomic_read_u32(&lineInfo->line_state);
+ if ((write_pos % WORKER_CHUNK_COUNT == 0) &&
+ (curr_line_state == LINE_WORKER_PROCESSED ||
+ curr_line_state == LINE_WORKER_PROCESSING))
+ {
+ pcdata->worker_processed_pos = write_pos;
+ write_pos = (write_pos + WORKER_CHUNK_COUNT) % RINGSIZE;
+ continue;
+ }
+
+ /* Get the size of this line. */
+ dataSize = pg_atomic_read_u32(&lineInfo->line_size);
+
+ if (dataSize != 0) /* If not an empty line. */
+ {
+ /* Get the block information. */
+ data_blk_ptr = &pcshared_info->data_blocks[lineInfo->first_block];
+
+ if (!data_blk_ptr->curr_blk_completed && (dataSize == -1))
+ {
+ /* Wait till the current line or block is added. */
+ COPY_WAIT_TO_PROCESS()
+ continue;
+ }
+ }
+
+ /* Make sure that no worker has consumed this element. */
+ if (pg_atomic_compare_exchange_u32(&lineInfo->line_state,
+ &line_state, LINE_WORKER_PROCESSING))
+ break;

(4) v2-0004-Documentation-for-parallel-copy.patch

(i) I think it is necessary to mention the max_worker_processes configuration parameter in the description of the COPY statement's PARALLEL option.

For example, something like:

+ Perform <command>COPY FROM</command> in parallel using <replaceable
+ class="parameter">integer</replaceable> background workers. Please
+ note that it is not guaranteed that the number of parallel workers
+ specified in <replaceable class="parameter">integer</replaceable> will
+ be used during execution. It is possible for a copy to run with fewer
+ workers than specified, or even with no workers at all (for example,
+ due to the setting of max_worker_processes). This option is allowed
+ only in <command>COPY FROM</command>.

(5) v2-0005-Tests-for-parallel-copy.patch

(i) None of the provided tests seem to test beyond "PARALLEL 2"

(6) v2-0006-Parallel-Copy-For-Binary-Format-Files.patch

(i) In the ParallelCopyFrom() function, "cstate->raw_buf" is pfree()d:

+ /* raw_buf is not used in parallel copy, instead data blocks are used.*/
+ pfree(cstate->raw_buf);

This comment doesn't seem to be entirely true.
At least for text/csv file COPY FROM, cstate->raw_buf is subsequently referenced in the SetRawBufForLoad() function, which is called by CopyReadLineText():

cur_data_blk_ptr = (cstate->raw_buf) ? &pcshared_info->data_blocks[cur_block_pos] : NULL;

So I think cstate->raw_buf should be set to NULL after being pfree()d, and the comment fixed/adjusted.
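The pattern is simply to clear the pointer at the point of release, so that any later test of cstate->raw_buf sees NULL rather than a dangling pointer. A minimal standalone sketch, with hypothetical stand-in types and function names, and free() in place of pfree():

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins; not the real CopyStateData / data-block types. */
typedef struct DataBlockSketch { int dummy; } DataBlockSketch;

typedef struct CopyBufSketch
{
    char *raw_buf;              /* set to NULL once released */
} CopyBufSketch;

/* Release raw_buf and clear the pointer so later NULL tests stay meaningful. */
static void
release_raw_buf(CopyBufSketch *cs)
{
    assert(cs != NULL);
    free(cs->raw_buf);          /* pfree() in the backend */
    cs->raw_buf = NULL;
}

/*
 * Same shape as the quoted SetRawBufForLoad() test: without the NULL
 * assignment above, a freed buffer would still look non-NULL here.
 */
static DataBlockSketch *
current_block_or_null(const CopyBufSketch *cs, DataBlockSketch *blocks, int pos)
{
    return cs->raw_buf ? &blocks[pos] : NULL;
}
```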

(ii) This patch adds some macros (involving parallel copy checks) AFTER the comment:

/* End parallel copy Macros */

Regards,
Greg Nancarrow
Fujitsu Australia
