Re: Parallel copy

From: vignesh C <vignesh21(at)gmail(dot)com>
To: Greg Nancarrow <gregn4422(at)gmail(dot)com>
Cc: PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>
Subject: Re: Parallel copy
Date: 2020-08-14 15:48:13
Message-ID: CALDaNm3GaZyYPpGu-PpF0SEkJg-eaW3TboHxpxJ-2criv2j_eA@mail.gmail.com
Lists: pgsql-hackers

Thanks Greg for reviewing the patch. Please find my thoughts on your comments below.

On Wed, Aug 12, 2020 at 9:10 AM Greg Nancarrow <gregn4422(at)gmail(dot)com> wrote:
> I have done some ad-hoc testing of the patch using parallel copies from text/csv/binary files and have not yet struck any execution problems other than some option validation and associated error messages on boundary cases.
>
> One general question that I have: is there a user benefit (over the normal non-parallel COPY) to allowing "COPY ... FROM ... WITH (PARALLEL 1)"?
>

There will be a marginal improvement, as the worker only needs to
process the data and need not do the file reading; the file reading
will have been done by the main process. The real improvement can be
seen from 2 workers onwards.
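
A rough sketch of that division of labour (the function names here are
illustrative only, not the patch's actual identifiers):

	if (IsLeader())
	{
		/* File I/O stays with the main process. */
		while (!ReachedEndOfFile())
			ReadIntoSharedDataBlocks();
	}
	else
	{
		/* Parsing and insertion move to the worker(s). */
		while (GetNextPublishedLine(&line))
			ParseAndInsertLine(line);
	}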

>
> My following comments are broken down by patch:
>
> (1) v2-0001-Copy-code-readjustment-to-support-parallel-copy.patch
>
> (i) Whilst I can't entirely blame these patches for it (as they are following what is already there), I can't help noticing the use of numerous macros in src/backend/commands/copy.c which paste in multiple lines of code in various places.
> It's getting a little out-of-hand. Surely the majority of these would be best inline functions instead?
> Perhaps hasn't been done because too many parameters need to be passed - thoughts?
>

I felt macros were used mainly because this is a tight loop, and
having macros gives better performance. I have added the macros
CLEAR_EOL_LINE, INCREMENTPROCESSED & GETPROCESSED as there will be a
slight difference between parallel copy & non-parallel copy for these.
In the remaining patches these macros will be extended to include the
parallel copy logic. Instead of having the checks in the core logic, I
thought of keeping them as macros so that readability is good.
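
For example, INCREMENTPROCESSED could end up looking roughly like the
following (a sketch modelled on the existing GETPROCESSED macro, not
the exact code in the patch):

#define INCREMENTPROCESSED(processed) \
if (!IsParallelCopy()) \
	processed++; \
else \
	pg_atomic_add_fetch_u64(&cstate->pcdata->pcshared_info->processed, 1);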

>
> (2) v2-0002-Framework-for-leader-worker-in-parallel-copy.patch
>
> (i) minor point: there are some tabbing/spacing issues in this patch (and the other patches), affecting alignment.
> e.g. mixed tabs/spaces and misalignment in PARALLEL_COPY_KEY_xxx definitions
>

Fixed

> (ii)
>
> +/*
> + * Each worker will be allocated WORKER_CHUNK_COUNT of records from DSM data
> + * block to process to avoid lock contention. This value should be mode of
> + * RINGSIZE, as wrap around cases is currently not handled while selecting the
> + * WORKER_CHUNK_COUNT by the worker.
> + */
> +#define WORKER_CHUNK_COUNT 50
>
>
> "This value should be mode of RINGSIZE ..."
>
> -> typo: mode (mod? should evenly divide into RINGSIZE?)

Fixed, changed it to divisible by.
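
The reworded comment reads along these lines (paraphrased, not the
exact text of the v3 patch):

/*
 * Each worker will be allocated WORKER_CHUNK_COUNT records from the DSM data
 * block to process, to avoid lock contention. RINGSIZE should be divisible
 * by WORKER_CHUNK_COUNT, as wrap-around cases are currently not handled
 * while the worker selects its WORKER_CHUNK_COUNT.
 */
#define WORKER_CHUNK_COUNT 50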

> (iii)
> + * using pg_atomic_compare_exchange_u32, worker will change the sate to
>
> ->typo: sate (should be "state")

Fixed

> (iv)
>
> + errmsg("parallel option supported only for copy from"),
>
> -> suggest change to: errmsg("parallel option is supported only for COPY FROM"),
>

Fixed

> (v)
>
> + errno = 0; /* To distinguish success/failure after call */
> + val = strtol(str, &endptr, 10);
> +
> + /* Check for various possible errors */
> + if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN))
> + || (errno != 0 && val == 0) ||
> + *endptr)
> + ereport(ERROR,
> + (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> + errmsg("improper use of argument to option \"%s\"",
> + defel->defname),
> + parser_errposition(pstate, defel->location)));
> +
> + if (endptr == str)
> + ereport(ERROR,
> + (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> + errmsg("no digits were found in argument to option \"%s\"",
> + defel->defname),
> + parser_errposition(pstate, defel->location)));
> +
> + cstate->nworkers = (int) val;
> +
> + if (cstate->nworkers <= 0)
> + ereport(ERROR,
> + (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> + errmsg("argument to option \"%s\" must be a positive integer greater than zero",
> + defel->defname),
> + parser_errposition(pstate, defel->location)));
>
>
> I think this validation code needs to be improved, including the error messages (e.g. when can a "positive integer" NOT be greater than zero?)
>
> There is some overlap in the "no digits were found" case between the two conditions above, depending, for example, if the argument is quoted.
> Also, "improper use of argument to option" sounds a bit odd and vague to me.
> Finally, not range checking before casting long to int can lead to allowing out-of-range int values like in the following case:
>
> test=# copy mytable from '/myspace/test_pcopy/tmp.dat' (parallel '-2147483648');
> ERROR: argument to option "parallel" must be a positive integer greater than zero
> LINE 1: copy mytable from '/myspace/test_pcopy/tmp.dat' (parallel '-2...
> ^
> BUT the following is allowed...
>
> test=# copy mytable from '/myspace/test_pcopy/tmp.dat' (parallel '-2147483649');
> COPY 1000000
>
>
> I'd suggest to change the above validation code to do similar validation to that for the CREATE TABLE parallel_workers storage parameter (case RELOPT_TYPE_INT in reloptions.c). Like that code, wouldn't it be best to range-check the integer option value to be within a reasonable range, say 1 to 1024, with a corresponding errdetail message if possible?
>

Fixed, changed as suggested.
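
A sketch of the reworked validation, modelled on the RELOPT_TYPE_INT
handling in reloptions.c (the exact range and message text in the
patch may differ slightly):

errno = 0;
val = strtol(str, &endptr, 10);

/* Reject parse failures, trailing garbage and out-of-range values. */
if (errno != 0 || endptr == str || *endptr != '\0' ||
	val < 1 || val > 1024)
	ereport(ERROR,
			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
			 errmsg("value %s out of bounds for option \"%s\"",
					str, defel->defname),
			 errdetail("Valid values are between \"%d\" and \"%d\".",
					   1, 1024),
			 parser_errposition(pstate, defel->location)));

cstate->nworkers = (int) val;

Range-checking the long value before casting to int also closes the
'-2147483649' hole shown above.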

> (3) v2-0003-Allow-copy-from-command-to-process-data-from-file.patch
>
> (i)
>
> Patch comment says:
>
> "This feature allows the copy from to leverage multiple CPUs in order to copy
> data from file/STDIN to a table. This adds a PARALLEL option to COPY FROM
> command where the user can specify the number of workers that can be used
> to perform the COPY FROM command. Specifying zero as number of workers will
> disable parallelism."
>
> BUT - the changes to ProcessCopyOptions() specified in "v2-0002-Framework-for-leader-worker-in-parallel-copy.patch" do not allow zero workers to be specified - you get an error in that case. Patch comment should be updated accordingly.
>

Removed "Specifying zero as number of workers will disable
parallelism". As the new value is range from 1 to 1024.

> (ii)
>
> #define GETPROCESSED(processed) \
> -return processed;
> +if (!IsParallelCopy()) \
> + return processed; \
> +else \
> + return pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed);
> +
>
> I think GETPROCESSED would be better named "RETURNPROCESSED".
>

Fixed.

> (iii)
>
> The below comment seems out-of-date with the current code - is it referring to the loop embedded at the bottom of the current loop that the comment is within?
>
> + /*
> + * There is a possibility that the above loop has come out because
> + * data_blk_ptr->curr_blk_completed is set, but dataSize read might
> + * be an old value, if data_blk_ptr->curr_blk_completed and the line is
> + * completed, line_size will be set. Read the line_size again to be
> + * sure if it is complete or partial block.
> + */
>

Updated; it is referring to the embedded loop at the bottom of the current loop.

> (iv)
>
> I may be wrong here, but in the following block of code, isn't there a window of opportunity (however small) in which the line_state might be updated (LINE_WORKER_PROCESSED) by another worker just AFTER pg_atomic_read_u32() returns the current line_state which is put into curr_line_state, such that a write_pos update might be missed? And then a race-condition exists for reading/setting line_size (since line_size gets atomically set after line_state is set)?
> If I am wrong in thinking this synchronization might not be correct, maybe the comments could be improved here to explain how this code is safe in that respect.
>
>
> + /* Get the current line information. */
> + lineInfo = &pcshared_info->line_boundaries.ring[write_pos];
> + curr_line_state = pg_atomic_read_u32(&lineInfo->line_state);
> + if ((write_pos % WORKER_CHUNK_COUNT == 0) &&
> + (curr_line_state == LINE_WORKER_PROCESSED ||
> + curr_line_state == LINE_WORKER_PROCESSING))
> + {
> + pcdata->worker_processed_pos = write_pos;
> + write_pos = (write_pos + WORKER_CHUNK_COUNT) % RINGSIZE;
> + continue;
> + }
> +
> + /* Get the size of this line. */
> + dataSize = pg_atomic_read_u32(&lineInfo->line_size);
> +
> + if (dataSize != 0) /* If not an empty line. */
> + {
> + /* Get the block information. */
> + data_blk_ptr = &pcshared_info->data_blocks[lineInfo->first_block];
> +
> + if (!data_blk_ptr->curr_blk_completed && (dataSize == -1))
> + {
> + /* Wait till the current line or block is added. */
> + COPY_WAIT_TO_PROCESS()
> + continue;
> + }
> + }
> +
> + /* Make sure that no worker has consumed this element. */
> + if (pg_atomic_compare_exchange_u32(&lineInfo->line_state,
> + &line_state, LINE_WORKER_PROCESSING))
> + break;
>

This is not possible because of pg_atomic_compare_exchange_u32: it
will succeed only for the one worker that finds line_state set to
LINE_LEADER_POPULATED; for the other workers it will fail. This is
explained in detail in the comment above ParallelCopyLineBoundary.
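
In other words, the claim step reduces to something like the following
(a simplified sketch; see the comment above ParallelCopyLineBoundary
for the full protocol):

uint32	expected = LINE_LEADER_POPULATED;

/*
 * The CAS installs LINE_WORKER_PROCESSING only if line_state still equals
 * LINE_LEADER_POPULATED. Even if several workers race on the same element,
 * only one CAS can succeed; the others fail and move on.
 */
if (pg_atomic_compare_exchange_u32(&lineInfo->line_state,
								   &expected, LINE_WORKER_PROCESSING))
	break;		/* this worker now owns the element */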

>
> (4) v2-0004-Documentation-for-parallel-copy.patch
>
> (i) I think that it is necessary to mention the "max_worker_processes" option in the description of the COPY statement PARALLEL option.
>
> For example, something like:
>
> + Perform <command>COPY FROM</command> in parallel using <replaceable
> + class="parameter"> integer</replaceable> background workers. Please
> + note that it is not guaranteed that the number of parallel workers
> + specified in <replaceable class="parameter">integer</replaceable> will
> + be used during execution. It is possible for a copy to run with fewer
> + workers than specified, or even with no workers at all (for example,
> + due to the setting of max_worker_processes). This option is allowed
> + only in <command>COPY FROM</command>.
>

Fixed.

> (5) v2-0005-Tests-for-parallel-copy.patch
>
> (i) None of the provided tests seem to test beyond "PARALLEL 2"
>

I intentionally ran the tests with 1 parallel worker, because when
more than 1 parallel worker is specified the order of record insertion
can vary and there may be random failures.

>
> (6) v2-0006-Parallel-Copy-For-Binary-Format-Files.patch
>
> (i) In the ParallelCopyFrom() function, "cstate->raw_buf" is pfree()d:
>
> + /* raw_buf is not used in parallel copy, instead data blocks are used.*/
> + pfree(cstate->raw_buf);
>

raw_buf is not used in parallel copy; instead, raw_buf will point to
the shared memory data blocks. This memory was allocated as part of
BeginCopyFrom. Up to this point we cannot be 100% sure that the copy
will run in parallel, as it can still be performed sequentially, for
example when max_worker_processes is not available; if it switches to
sequential mode, raw_buf will be used while performing the copy
operation. At this place we can safely free the memory that was
allocated.
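
The ordering being described is roughly the following (a sketch, not
the literal patch code; IsParallelCopy() here stands for "workers were
actually launched"):

cstate = BeginCopyFrom(...);	/* raw_buf is allocated here; it is still
								 * needed if we fall back to sequential copy */
...
if (IsParallelCopy())
{
	/*
	 * The leader now reads into shared-memory data blocks, so the locally
	 * allocated buffer can be released (and, per the suggestion below,
	 * cleared so later checks see that it is gone).
	 */
	pfree(cstate->raw_buf);
	cstate->raw_buf = NULL;
}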

> This comment doesn't seem to be entirely true.
> At least for text/csv file COPY FROM, cstate->raw_buf is subsequently referenced in the SetRawBufForLoad() function, which is called by CopyReadLineText():
>
> cur_data_blk_ptr = (cstate->raw_buf) ? &pcshared_info->data_blocks[cur_block_pos] : NULL;
>
> So I think cstate->raw_buf should be set to NULL after being pfree()d, and the comment fixed/adjusted.
>
>
> (ii) This patch adds some macros (involving parallel copy checks) AFTER the comment:
>
> /* End parallel copy Macros */

Fixed, moved the macros above the comment.

I have attached a new set of patches with the fixes.
Thoughts?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com

Attachment Content-Type Size
v3-0001-Copy-code-readjustment-to-support-parallel-copy.patch text/x-patch 16.7 KB
v3-0002-Framework-for-leader-worker-in-parallel-copy.patch text/x-patch 31.0 KB
v3-0003-Allow-copy-from-command-to-process-data-from-file.patch text/x-patch 43.0 KB
v3-0004-Documentation-for-parallel-copy.patch text/x-patch 2.0 KB
v3-0005-Tests-for-parallel-copy.patch text/x-patch 19.7 KB
v3-0006-Parallel-Copy-For-Binary-Format-Files.patch text/x-patch 27.3 KB
