Re: Parallel copy

From: Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>
To: vignesh C <vignesh21(at)gmail(dot)com>
Cc: Andres Freund <andres(at)anarazel(dot)de>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Robert Haas <robertmhaas(at)gmail(dot)com>, Ants Aasma <ants(at)cybertec(dot)at>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Alastair Turner <minion(at)decodable(dot)me>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Parallel copy
Date: 2020-06-19 12:11:12
Message-ID: CAE9k0PnNLAFkT2CwvyNjW6aqt=tQU3yL8O-HTQVaMcA9jEpWiA@mail.gmail.com
Lists: pgsql-hackers

Hi,

I just got some time to review the first patch in the list, i.e.
0001-Copy-code-readjustment-to-support-parallel-copy.patch. As the patch
name suggests, it just reshuffles the existing code for the COPY command.
There are no extra changes in the patch as such, but I still have some
review comments; please have a look:

1) Can you please add some comments atop the new function
PopulateAttributes() describing its functionality in detail? Further, this
new function contains the code from BeginCopy() that sets attribute-level
options used with COPY FROM, such as FORCE_QUOTE, FORCE_NOT_NULL, FORCE_NULL
etc., in cstate, and it also copies the code from BeginCopy() that sets
other information such as the client encoding type, encoding conversion etc.
Hence, I think it would be good to give it a better name, basically
something that matches what it is actually doing.

2) Again, the name for the new function CheckCopyFromValidity() doesn't
look good to me. From the function name it appears as if it does the sanity
check of the entire COPY FROM command, but actually it is just doing the
sanity check for the target relation specified with COPY FROM. So, probably
something like CheckTargetRelValidity would look more sensible, I think?
TBH, I am not good at naming the functions so you can always ignore my
suggestions about function and variable names :)

3) Any reason for making CheckCopyFromValidity a new function instead of a
macro? It is just doing the sanity check for the target relation.

4) Earlier, in the CopyReadLine() function, while clearing the EOL marker
from cstate->line_buf.data (copied data), we were not checking whether the
line read by the CopyReadLineText() function is a header line, but I can see
that your patch checks that before clearing the EOL marker. Any reason for
this extra check?

5) I noticed the below spurious line removal in the patch.

@@ -3839,7 +3953,6 @@ static bool
CopyReadLine(CopyState cstate)
{
bool result;
-
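
To make points 2) and 3) a bit more concrete, here is a minimal sketch of a
target-relation check written as a static inline function. Everything in it
(RelKind, TargetRelInfo, check_target_rel_validity and the message strings)
is a hypothetical stand-in for illustration, not the code from the patch. It
only shows that a small function keeps type-checked arguments while still
letting the compiler inline it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins; the real check operates on a Relation. */
typedef enum RelKind
{
    REL_TABLE,
    REL_VIEW,
    REL_SEQUENCE
} RelKind;

typedef struct TargetRelInfo
{
    RelKind kind;
    bool    has_insert_trigger; /* assumed flag: view is insertable via trigger */
} TargetRelInfo;

/*
 * Returns NULL when the target is acceptable, otherwise a reason string.
 * The argument is type-checked and evaluated exactly once, which a
 * function-like macro would not guarantee.
 */
static inline const char *
check_target_rel_validity(const TargetRelInfo *rel)
{
    assert(rel != NULL);

    if (rel->kind == REL_VIEW && !rel->has_insert_trigger)
        return "cannot copy to view";
    if (rel->kind == REL_SEQUENCE)
        return "cannot copy to sequence";
    return NULL;
}
```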

Please note that I haven't got a chance to look into other patches as of
now. I will do that whenever possible. Thank you.

--
With Regards,
Ashutosh Sharma
EnterpriseDB:http://www.enterprisedb.com

On Fri, Jun 12, 2020 at 11:01 AM vignesh C <vignesh21(at)gmail(dot)com> wrote:

> On Thu, Jun 4, 2020 at 12:44 AM Andres Freund <andres(at)anarazel(dot)de> wrote:
> >
> >
> > Hm. you don't explicitly mention that in your design, but given how
> > small the benefits going from 0-1 workers is, I assume the leader
> > doesn't do any "chunk processing" on its own?
> >
>
> Yes, you are right, the leader does not do any processing. The leader's
> work is mainly to populate the shared memory with the offset
> information for each record.
>
> >
> >
> > > Design of the Parallel Copy: The backend, to which the "COPY FROM"
> > > query is submitted acts as leader with the responsibility of reading
> > > data from the file/stdin, launching at most n number of workers as
> > > specified with PARALLEL 'n' option in the "COPY FROM" query. The leader
> > > populates the common data required for the workers execution in the DSM
> > > and shares it with the workers. The leader then executes before
> > > statement triggers if there exists any. Leader populates DSM chunks
> > > which includes the start offset and chunk size, while populating the
> > > chunks it reads as many blocks as required into the DSM data blocks
> > > from the file. Each block is of 64K size. The leader parses the data to
> > > identify a chunk, the existing logic from CopyReadLineText which
> > > identifies the chunks with some changes was used for this. Leader
> > > checks if a free chunk is available to copy the information, if there
> > > is no free chunk it waits till the required chunk is freed up by the
> > > worker and then copies the identified chunks information (offset &
> > > chunk size) into the DSM chunks. This process is repeated till the
> > > complete file is processed. Simultaneously, the workers cache the
> > > chunks(50) locally into the local memory and release the chunks to the
> > > leader for further populating. Each worker processes the chunk that it
> > > cached and inserts it into the table. The leader waits till all the
> > > chunks populated are processed by the workers and exits.
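
The leader/worker hand-off described in the design above can be sketched
roughly as below. This is only an illustration under stated assumptions: the
names ChunkSlot, ChunkRing, leader_publish, worker_claim and worker_release
are hypothetical, it uses three slot states rather than the five in the
quoted ChunkState enum, it uses C11 <stdatomic.h> instead of the pg_atomic_*
API, and it runs in a single process with no DSM. A real leader would wait
and retry where this one simply returns false.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define RING_SIZE 4             /* tiny for illustration only */

typedef enum { SLOT_FREE, SLOT_POPULATED, SLOT_PROCESSING } SlotState;

typedef struct ChunkSlot
{
    uint32_t    start_offset;   /* where the record starts */
    uint32_t    chunk_size;     /* length of the record */
    _Atomic int state;          /* one of SlotState */
} ChunkSlot;

typedef struct ChunkRing
{
    ChunkSlot   slots[RING_SIZE];
    uint32_t    next_fill;      /* touched only by the leader */
} ChunkRing;

static void
ring_init(ChunkRing *ring)
{
    ring->next_fill = 0;
    for (int i = 0; i < RING_SIZE; i++)
        atomic_init(&ring->slots[i].state, SLOT_FREE);
}

/* Leader: publish one record; false means the slot is not freed yet. */
static bool
leader_publish(ChunkRing *ring, uint32_t offset, uint32_t size)
{
    ChunkSlot  *slot = &ring->slots[ring->next_fill % RING_SIZE];

    if (atomic_load(&slot->state) != SLOT_FREE)
        return false;
    slot->start_offset = offset;
    slot->chunk_size = size;
    atomic_store(&slot->state, SLOT_POPULATED); /* make it visible to workers */
    ring->next_fill++;
    return true;
}

/*
 * Worker: claim the first populated slot found; returns its index or -1.
 * Scanning from index 0 is a simplification and does not preserve order.
 */
static int
worker_claim(ChunkRing *ring)
{
    for (int i = 0; i < RING_SIZE; i++)
    {
        int expected = SLOT_POPULATED;

        if (atomic_compare_exchange_strong(&ring->slots[i].state,
                                           &expected, SLOT_PROCESSING))
            return i;
    }
    return -1;
}

/* Worker: hand the slot back to the leader for reuse. */
static void
worker_release(ChunkRing *ring, int idx)
{
    atomic_store(&ring->slots[idx].state, SLOT_FREE);
}
```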
> >
> > Why do we need the local copy of 50 chunks? Copying memory around is far
> > from free. I don't see why it'd be better to add per-process caching,
> > rather than making the DSM bigger? I can see some benefit in marking
> > multiple chunks as being processed with one lock acquisition, but I
> > don't think adding a memory copy is a good idea.
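
The idea of marking several chunks as taken in one step, without copying
them, could look something like the sketch below. To be clear about the
assumptions: it swaps the lock for a single C11 atomic fetch-and-add,
next_unclaimed and claim_chunk_batch are hypothetical names, and it ignores
whether the leader has already populated the claimed positions. The static
assertion is presumably what the "should be mod of RINGSIZE" comment in the
quoted patch is getting at: if RINGSIZE is a multiple of the batch size, a
batch never wraps around the end of the ring.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

#define RINGSIZE            (10 * 1000)
#define WORKER_CHUNK_COUNT  50

_Static_assert(RINGSIZE % WORKER_CHUNK_COUNT == 0,
               "RINGSIZE must be a multiple of WORKER_CHUNK_COUNT");

typedef struct ChunkRange
{
    uint32_t    first;          /* first ring position in the batch */
    uint32_t    count;          /* number of positions claimed */
} ChunkRange;

/*
 * Claim WORKER_CHUNK_COUNT consecutive ring positions with one atomic
 * operation. next_unclaimed is a (hypothetical) shared, ever-increasing
 * chunk counter; the ring position is the counter modulo RINGSIZE.
 */
static ChunkRange
claim_chunk_batch(atomic_uint_fast64_t *next_unclaimed)
{
    assert(next_unclaimed != NULL);

    uint64_t    start = atomic_fetch_add(next_unclaimed, WORKER_CHUNK_COUNT);
    ChunkRange  range = {(uint32_t) (start % RINGSIZE), WORKER_CHUNK_COUNT};

    return range;
}
```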
>
> We ran a performance test with a 5.1GB CSV data file containing 10 million
> tuples and 2 indexes on integer columns; the results are given below. We
> noticed that in some cases performance is better if we copy the 50
> records locally and release the shared memory. The benefit grows as the
> number of workers increases. Thoughts?
>
> ---------------------------------------------------------------------------
> Workers | Exec time (With local copying | Exec time (Without copying,
>         | 50 records & release the      | processing record by record)
>         | shared memory)                |
> ---------------------------------------------------------------------------
> 0       | 1162.772(1X)                  | 1152.684(1X)
> 2       | 635.249(1.83X)                | 647.894(1.78X)
> 4       | 336.835(3.45X)                | 335.534(3.43X)
> 8       | 188.577(6.17X)                | 189.461(6.08X)
> 16      | 126.819(9.17X)                | 142.730(8.07X)
> 20      | 117.845(9.87X)                | 146.533(7.87X)
> 30      | 127.554(9.11X)                | 160.307(7.19X)
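
For reference, the "X" figures in the table appear to be the 0-worker time
divided by the time at each worker count. A minimal sketch of that
arithmetic, with speedup() being a hypothetical helper name:

```c
#include <assert.h>

/* Speedup relative to the 0-worker run: baseline time / parallel time. */
static double
speedup(double baseline_time, double parallel_time)
{
    assert(parallel_time > 0.0);
    return baseline_time / parallel_time;
}
```

For example, speedup(1162.772, 117.845) is roughly 9.87, matching the
20-worker entry in the local-copying column.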
>
> > This patch *desperately* needs to be split up. It imo is close to
> > unreviewable, due to a large amount of changes that just move code
> > around without other functional changes being mixed in with the actual
> > new stuff.
>
> I have split the patch, the new split patches are attached.
>
> >
> >
> >
> > > /*
> > > + * State of the chunk.
> > > + */
> > > +typedef enum ChunkState
> > > +{
> > > > +    CHUNK_INIT, /* initial state of chunk */
> > > > +    CHUNK_LEADER_POPULATING, /* leader processing chunk */
> > > > +    CHUNK_LEADER_POPULATED, /* leader completed populating chunk */
> > > > +    CHUNK_WORKER_PROCESSING, /* worker processing chunk */
> > > > +    CHUNK_WORKER_PROCESSED /* worker completed processing chunk */
> > > > +}ChunkState;
> > > > +
> > > > +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
> > > > +
> > > > +#define DATA_BLOCK_SIZE RAW_BUF_SIZE
> > > > +#define RINGSIZE (10 * 1000)
> > > > +#define MAX_BLOCKS_COUNT 1000
> > > > +#define WORKER_CHUNK_COUNT 50 /* should be mod of RINGSIZE */
> > > > +
> > > > +#define IsParallelCopy() (cstate->is_parallel)
> > > > +#define IsLeader() (cstate->pcdata->is_leader)
> > > > +#define IsHeaderLine() (cstate->header_line && cstate->cur_lineno == 1)
> > > +
> > > +/*
> > > + * Copy data block information.
> > > + */
> > > +typedef struct CopyDataBlock
> > > +{
> > > > +    /* The number of unprocessed chunks in the current block. */
> > > > +    pg_atomic_uint32 unprocessed_chunk_parts;
> > > > +
> > > > +    /*
> > > > +     * If the current chunk data is continued into another block,
> > > > +     * following_block will have the position where the remaining data need to
> > > > +     * be read.
> > > > +     */
> > > > +    uint32 following_block;
> > > > +
> > > > +    /*
> > > > +     * This flag will be set, when the leader finds out this block can be read
> > > > +     * safely by the worker. This helps the worker to start processing the chunk
> > > > +     * early where the chunk will be spread across many blocks and the worker
> > > > +     * need not wait for the complete chunk to be processed.
> > > > +     */
> > > > +    bool curr_blk_completed;
> > > > +    char data[DATA_BLOCK_SIZE + 1]; /* data read from file */
> > > +}CopyDataBlock;
> >
> > What's the + 1 here about?
>
> Fixed this, removed +1. That is not needed.
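
Regarding the following_block field in the CopyDataBlock struct quoted
above, a worker reading a record that spills over a block boundary might
proceed roughly as in this sketch. It is an illustration only: DataBlock,
NO_BLOCK and copy_chunk are hypothetical names, the block size is shrunk to
8 bytes so the example is easy to follow, and the atomic counters and the
curr_blk_completed flag are left out.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define DATA_BLOCK_SIZE 8           /* tiny for illustration; the patch uses 64K */
#define NO_BLOCK        UINT32_MAX  /* assumed marker for "no following block" */

typedef struct DataBlock
{
    uint32_t    following_block;    /* index of the continuation block */
    char        data[DATA_BLOCK_SIZE];
} DataBlock;

/*
 * Copy chunk_size bytes that start at 'offset' in blocks[first] into 'out',
 * following the following_block links when the record crosses a block
 * boundary. 'out' must have room for chunk_size + 1 bytes. Returns the
 * number of bytes copied.
 */
static uint32_t
copy_chunk(const DataBlock *blocks, uint32_t first, uint32_t offset,
           uint32_t chunk_size, char *out)
{
    uint32_t    copied = 0;
    uint32_t    blk = first;

    assert(offset < DATA_BLOCK_SIZE);

    while (copied < chunk_size && blk != NO_BLOCK)
    {
        uint32_t    avail = DATA_BLOCK_SIZE - offset;
        uint32_t    want = chunk_size - copied;
        uint32_t    n = want < avail ? want : avail;

        memcpy(out + copied, blocks[blk].data + offset, n);
        copied += n;
        offset = 0;                 /* continuation blocks start at byte 0 */
        blk = blocks[blk].following_block;
    }
    out[copied] = '\0';
    return copied;
}
```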
>
> >
> >
> > > +/*
> > > + * Parallel copy line buffer information.
> > > + */
> > > +typedef struct ParallelCopyLineBuf
> > > +{
> > > > +    StringInfoData line_buf;
> > > > +    uint64 cur_lineno; /* line number for error messages */
> > > +}ParallelCopyLineBuf;
> >
> > Why do we need separate infrastructure for this? We shouldn't duplicate
> > infrastructure unnecessarily.
> >
>
> This was required for copying the multiple records locally and
> releasing the shared memory. I have not changed this, will decide on
> this based on the decision taken for one of the previous comments.
>
> >
> >
> >
> > > +/*
> > > + * Common information that need to be copied to shared memory.
> > > + */
> > > +typedef struct CopyWorkerCommonData
> > > +{
> >
> > Why is parallel specific stuff here suddenly not named ParallelCopy*
> > anymore? If you introduce a naming like that it imo should be used
> > consistently.
>
> Fixed, changed to maintain ParallelCopy in all structs.
>
> >
> > > > +    /* low-level state data */
> > > > +    CopyDest copy_dest; /* type of copy source/destination */
> > > > +    int file_encoding; /* file or remote side's character encoding */
> > > > +    bool need_transcoding; /* file encoding diff from server? */
> > > > +    bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
> > > > +
> > > > +    /* parameters from the COPY command */
> > > > +    bool csv_mode; /* Comma Separated Value format? */
> > > > +    bool header_line; /* CSV header line? */
> > > > +    int null_print_len; /* length of same */
> > > > +    bool force_quote_all; /* FORCE_QUOTE *? */
> > > > +    bool convert_selectively; /* do selective binary conversion? */
> > > > +
> > > > +    /* Working state for COPY FROM */
> > > > +    AttrNumber num_defaults;
> > > > +    Oid relid;
> > > +}CopyWorkerCommonData;
> >
> > But I actually think we shouldn't have this information in two different
> > structs. This should exist once, independent of using parallel /
> > non-parallel copy.
> >
>
> This structure helps in storing the common data from CopyStateData
> that are required by the workers. This information will then be
> allocated and stored into the DSM for the worker to retrieve and copy
> it to CopyStateData.
>
> >
> > > +/* List information */
> > > +typedef struct ListInfo
> > > +{
> > > > +    int count; /* count of attributes */
> > > > +
> > > > +    /* string info in the form info followed by info1, info2... infon */
> > > > +    char info[1];
> > > +} ListInfo;
> >
> > Based on these comments I have no idea what this could be for.
> >
>
> Have added better comments for this. The following is added: This
> structure will help in converting a List data type into the below
> structure format with the count having the number of elements in the
> list and the info having the List elements appended contiguously. This
> converted structure will be allocated in shared memory and stored in
> DSM for the worker to retrieve and later convert it back to List data
> type.
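
As I read the explanation above, the conversion is a flatten/unflatten of a
list into one contiguous allocation. A rough sketch for a list of strings is
below. The assumptions are mine: FlatList, flatten_strings and flat_list_nth
are hypothetical names, it uses a C99 flexible array member where the patch
has info[1], the elements are stored as NUL-terminated strings, and malloc()
stands in for the shared memory allocation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

typedef struct FlatList
{
    int     count;      /* number of elements */
    char    info[];     /* elements stored back to back, each NUL-terminated */
} FlatList;

/* Pack 'count' strings into one contiguous block; caller frees the result. */
static FlatList *
flatten_strings(const char *const *items, int count)
{
    size_t      total = 0;

    for (int i = 0; i < count; i++)
        total += strlen(items[i]) + 1;

    FlatList   *flat = malloc(offsetof(FlatList, info) + total);

    if (flat == NULL)
        return NULL;

    flat->count = count;
    char       *p = flat->info;

    for (int i = 0; i < count; i++)
    {
        size_t  len = strlen(items[i]) + 1;     /* include the terminator */

        memcpy(p, items[i], len);
        p += len;
    }
    return flat;
}

/* Return a pointer to the n-th element by walking over the terminators. */
static const char *
flat_list_nth(const FlatList *flat, int n)
{
    assert(n >= 0 && n < flat->count);

    const char *p = flat->info;

    while (n-- > 0)
        p += strlen(p) + 1;
    return p;
}
```

Because the block contains no pointers, it can be copied byte for byte into
another address space and walked there with flat_list_nth().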
>
> >
> > > /*
> > > - * This keeps the character read at the top of the loop in the buffer
> > > - * even if there is more than one read-ahead.
> > > + * This keeps the character read at the top of the loop in the buffer
> > > + * even if there is more than one read-ahead.
> > > + */
> > > > +#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
> > > > +if (1) \
> > > > +{ \
> > > > +    if (copy_buff_state.raw_buf_ptr + (extralen) >= copy_buff_state.copy_buf_len && !hit_eof) \
> > > > +    { \
> > > > +        if (IsParallelCopy()) \
> > > > +        { \
> > > > +            copy_buff_state.chunk_size = prev_chunk_size; /* update previous chunk size */ \
> > > > +            if (copy_buff_state.block_switched) \
> > > > +            { \
> > > > +                pg_atomic_sub_fetch_u32(&copy_buff_state.data_blk_ptr->unprocessed_chunk_parts, 1); \
> > > > +                copy_buff_state.copy_buf_len = prev_copy_buf_len; \
> > > > +            } \
> > > > +        } \
> > > > +        copy_buff_state.raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
> > > > +        need_data = true; \
> > > > +        continue; \
> > > > +    } \
> > > > +} else ((void) 0)
> >
> > I think it's an absolutely clear no-go to add new branches to
> > these. They're *really* hot already, and this is going to sprinkle a
> > significant amount of new instructions over a lot of places.
> >
>
> Fixed, removed this.
>
> >
> >
> > > +/*
> > > + * SET_RAWBUF_FOR_LOAD - Set raw_buf to the shared memory where the
> file data must
> > > + * be read.
> > > + */
> > > +#define SET_RAWBUF_FOR_LOAD() \
> > > +{ \
> > > + ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \
> > > + uint32 cur_block_pos; \
> > > + /* \
> > > + * Mark the previous block as completed, worker can start
> copying this data. \
> > > + */ \
> > > + if (copy_buff_state.data_blk_ptr !=
> copy_buff_state.curr_data_blk_ptr && \
> > > + copy_buff_state.data_blk_ptr->curr_blk_completed ==
> false) \
> > > + copy_buff_state.data_blk_ptr->curr_blk_completed = true;
> \
> > > + \
> > > + copy_buff_state.data_blk_ptr =
> copy_buff_state.curr_data_blk_ptr; \
> > > + cur_block_pos = WaitGetFreeCopyBlock(pcshared_info); \
> > > + copy_buff_state.curr_data_blk_ptr =
> &pcshared_info->data_blocks[cur_block_pos]; \
> > > + \
> > > + if (!copy_buff_state.data_blk_ptr) \
> > > + { \
> > > + copy_buff_state.data_blk_ptr =
> copy_buff_state.curr_data_blk_ptr; \
> > > + chunk_first_block = cur_block_pos; \
> > > + } \
> > > + else if (need_data == false) \
> > > + copy_buff_state.data_blk_ptr->following_block =
> cur_block_pos; \
> > > + \
> > > + cstate->raw_buf = copy_buff_state.curr_data_blk_ptr->data; \
> > > + copy_buff_state.copy_raw_buf = cstate->raw_buf; \
> > > +}
> > > +
> > > +/*
> > > + * END_CHUNK_PARALLEL_COPY - Update the chunk information in shared
> memory.
> > > + */
> > > +#define END_CHUNK_PARALLEL_COPY() \
> > > +{ \
> > > + if (!IsHeaderLine()) \
> > > + { \
> > > + ShmCopyInfo *pcshared_info =
> cstate->pcdata->pcshared_info; \
> > > + ChunkBoundaries *chunkBoundaryPtr =
> &pcshared_info->chunk_boundaries; \
> > > + if (copy_buff_state.chunk_size) \
> > > + { \
> > > + ChunkBoundary *chunkInfo =
> &chunkBoundaryPtr->ring[chunk_pos]; \
> > > + /* \
> > > + * If raw_buf_ptr is zero,
> unprocessed_chunk_parts would have been \
> > > + * incremented in SEEK_COPY_BUFF_POS. This will
> happen if the whole \
> > > + * chunk finishes at the end of the current
> block. If the \
> > > + * new_line_size > raw_buf_ptr, then the new
> block has only new line \
> > > + * char content. The unprocessed count should
> not be increased in \
> > > + * this case. \
> > > + */ \
> > > + if (copy_buff_state.raw_buf_ptr != 0 && \
> > > + copy_buff_state.raw_buf_ptr >
> new_line_size) \
> > > +
> pg_atomic_add_fetch_u32(&copy_buff_state.curr_data_blk_ptr->unprocessed_chunk_parts,
> 1); \
> > > + \
> > > + /* Update chunk size. */ \
> > > + pg_atomic_write_u32(&chunkInfo->chunk_size,
> copy_buff_state.chunk_size); \
> > > + pg_atomic_write_u32(&chunkInfo->chunk_state,
> CHUNK_LEADER_POPULATED); \
> > > + elog(DEBUG1, "[Leader] After adding - chunk
> position:%d, chunk_size:%d", \
> > > + chunk_pos,
> copy_buff_state.chunk_size); \
> > > + pcshared_info->populated++; \
> > > + } \
> > > + else if (new_line_size) \
> > > + { \
> > > + /* \
> > > + * This means only new line char, empty record
> should be \
> > > + * inserted. \
> > > + */ \
> > > + ChunkBoundary *chunkInfo; \
> > > + chunk_pos = UpdateBlockInChunkInfo(cstate, -1,
> -1, 0, \
> > > +
> CHUNK_LEADER_POPULATED); \
> > > + chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> > > + elog(DEBUG1, "[Leader] Added empty chunk with
> offset:%d, chunk position:%d, chunk size:%d", \
> > > +
> chunkInfo->start_offset, chunk_pos, \
> > > +
> pg_atomic_read_u32(&chunkInfo->chunk_size)); \
> > > + pcshared_info->populated++; \
> > > + } \
> > > + }\
> > > + \
> > > + /*\
> > > + * All of the read data is processed, reset index & len. In the\
> > > + * subsequent read, we will get a new block and copy data in to
> the\
> > > + * new block.\
> > > + */\
> > > + if (copy_buff_state.raw_buf_ptr == copy_buff_state.copy_buf_len)\
> > > + {\
> > > + cstate->raw_buf_index = 0;\
> > > + cstate->raw_buf_len = 0;\
> > > + }\
> > > + else\
> > > + cstate->raw_buf_len = copy_buff_state.copy_buf_len;\
> > > +}
> >
> > Why are these macros? They are way way way above a length where that
> > makes any sort of sense.
> >
>
> Converted these macros to functions.
>
>
> Regards,
> Vignesh
> EnterpriseDB: http://www.enterprisedb.com
>
