Re: Parallel copy

From: Rafia Sabih <rafia(dot)pghackers(at)gmail(dot)com>
To: Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>
Cc: vignesh C <vignesh21(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, Robert Haas <robertmhaas(at)gmail(dot)com>, Ants Aasma <ants(at)cybertec(dot)at>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Alastair Turner <minion(at)decodable(dot)me>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Parallel copy
Date: 2020-07-12 09:57:29
Message-ID: CA+FpmFcJ8ZxGg7rZr74F9BM0XVG8LvFFyM0zgALZOmF-Ox8Y=g@mail.gmail.com
Lists: pgsql-hackers

On Sat, 11 Jul 2020 at 08:55, Bharath Rupireddy
<bharath(dot)rupireddyforpostgres(at)gmail(dot)com> wrote:
>
> Thanks Vignesh for the review. Addressed the comments in 0006 patch.
>
> >
> > we can create a local variable and use in place of
> > cstate->pcdata->curr_data_block.
>
> Done.
>
> > + if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
> > + AdjustFieldInfo(cstate, 1);
> > +
> > + memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count));
> > Should this be like below, as the remaining size can fit in current block:
> > if (cstate->raw_buf_index + sizeof(fld_count) >= DATA_BLOCK_SIZE)
> >
> > + if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1))
> > + {
> > + AdjustFieldInfo(cstate, 2);
> > + *new_block_pos = pcshared_info->cur_block_pos;
> > + }
> > Same like above.
>
> Yes you are right. Changed.
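
To make the boundary question above concrete, here is a minimal sketch comparing the two checks. The block size of 64 and the helper names are assumptions made for illustration; they are not taken from the patch.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical block size, kept small for illustration only. */
#define DATA_BLOCK_SIZE 64

/* Check as originally posted: compares against DATA_BLOCK_SIZE - 1. */
static bool
needs_new_block_old(size_t raw_buf_index, size_t fld_len)
{
	return raw_buf_index + fld_len >= (DATA_BLOCK_SIZE - 1);
}

/* Check as suggested in the review: compares against DATA_BLOCK_SIZE. */
static bool
needs_new_block_new(size_t raw_buf_index, size_t fld_len)
{
	return raw_buf_index + fld_len >= DATA_BLOCK_SIZE;
}
```

With a 2-byte field starting at index 61, the field occupies bytes 61 and 62 of the 64-byte block. The original check asks for a new block in that case, while the suggested check keeps reading from the current one.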
>
> >
> > + movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index;
> > +
> > + cstate->pcdata->curr_data_block->skip_bytes = movebytes;
> > +
> > + data_block = &pcshared_info->data_blocks[block_pos];
> > +
> > + if (movebytes > 0)
> > Instead of the above check, we can have an assert check for movebytes.
>
> No, we can't use an assert here. In the edge case where the current data
> block is filled exactly to DATA_BLOCK_SIZE, movebytes will be 0, but we
> still need to get a new data block. The movebytes > 0 check only lets us
> skip the memmove in that case.
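
A minimal sketch of that edge case, assuming a simplified stand-in for the block structure. DemoBlock, carry_over_tail and the block size of 64 are hypothetical names and values used only for illustration, not code from the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical block size, kept small for illustration only. */
#define DATA_BLOCK_SIZE 64

/* Hypothetical, simplified stand-in for ParallelCopyDataBlock. */
typedef struct DemoBlock
{
	char		data[DATA_BLOCK_SIZE];
	size_t		skip_bytes;
} DemoBlock;

/*
 * Carry the unread tail of 'cur' over to the start of 'next' and return
 * the number of bytes moved.  A result of 0 is legitimate: it means the
 * current block was consumed exactly, so asserting movebytes > 0 would
 * fire on valid input.
 */
static size_t
carry_over_tail(DemoBlock *cur, DemoBlock *next, size_t raw_buf_index)
{
	size_t		movebytes;

	/* The read position may reach the end of the block but not pass it. */
	assert(raw_buf_index <= DATA_BLOCK_SIZE);

	movebytes = DATA_BLOCK_SIZE - raw_buf_index;
	cur->skip_bytes = movebytes;

	/* Nothing to copy when the block is exactly full; skip the memmove. */
	if (movebytes > 0)
		memmove(next->data, &cur->data[raw_buf_index], movebytes);

	return movebytes;
}
```

In this sketch the caller switches to the next block regardless of the return value; the conditional only avoids a pointless copy.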
>
> > + if (mode == 1)
> > + {
> > + cstate->pcdata->curr_data_block = data_block;
> > + cstate->raw_buf_index = 0;
> > + }
> > + else if(mode == 2)
> > + {
> > + ParallelCopyDataBlock *prev_data_block = NULL;
> > + prev_data_block = cstate->pcdata->curr_data_block;
> > + prev_data_block->following_block = block_pos;
> > + cstate->pcdata->curr_data_block = data_block;
> > +
> > + if (prev_data_block->curr_blk_completed == false)
> > + prev_data_block->curr_blk_completed = true;
> > +
> > + cstate->raw_buf_index = 0;
> > + }
> >
> > This code is common for both, keep in common flow and remove if (mode == 1)
> > cstate->pcdata->curr_data_block = data_block;
> > cstate->raw_buf_index = 0;
> >
>
> Done.
>
> > +#define CHECK_FIELD_COUNT \
> > +{\
> > + if (fld_count == -1) \
> > + { \
> > + if (IsParallelCopy() && \
> > + !IsLeader()) \
> > + return true; \
> > + else if (IsParallelCopy() && \
> > + IsLeader()) \
> > + { \
> > + if (cstate->pcdata->curr_data_block->data[cstate->raw_buf_index + sizeof(fld_count)] != 0) \
> > + ereport(ERROR, \
> > + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \
> > + errmsg("received copy data after EOF marker"))); \
> > + return true; \
> > + } \
> > We only copy sizeof(fld_count), Shouldn't we check fld_count !=
> > cstate->max_fields? Am I missing something here?
>
> fld_count != cstate->max_fields check is done after the above checks.
>
> > + if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size)
> > + {
> > + cstate->raw_buf_index = cstate->raw_buf_index + fld_size;
> > + }
> > We can keep the check like cstate->raw_buf_index + fld_size < ..., for
> > better readability and consistency.
> >
>
> I think this is okay as it is. It reads naturally: if the bytes available
> in the current data block are greater than or equal to fld_size, then the
> tuple lies in the current data block.
>
> > +static pg_attribute_always_inline void
> > +CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
> > + Oid typioparam, int32 typmod, uint32 *new_block_pos,
> > + int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
> > + ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)
> > flinfo, typioparam & typmod is not used, we can remove the parameter.
> >
>
> Done.
>
> > +static pg_attribute_always_inline void
> > +CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
> > + Oid typioparam, int32 typmod, uint32 *new_block_pos,
> > + int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
> > + ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)
> > I felt this function need not be an inline function.
>
> Yes. Changed.
>
> >
> > + /* binary format */
> > + /* for paralle copy leader, fill in the error
> > There are some typos, run spell check
>
> Done.
>
> >
> > + /* raw_buf_index should never cross data block size,
> > + * as the required number of data blocks would have
> > + * been obtained in the above while loop.
> > + */
> > There are few places, commenting style should be changed to postgres style
>
> Changed.
>
> >
> > + if (cstate->pcdata->curr_data_block == NULL)
> > + {
> > + block_pos = WaitGetFreeCopyBlock(pcshared_info);
> > +
> > + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[block_pos];
> > +
> > + cstate->raw_buf_index = 0;
> > +
> > + readbytes = CopyGetData(cstate, &cstate->pcdata->curr_data_block->data, 1, DATA_BLOCK_SIZE);
> > +
> > + elog(DEBUG1, "LEADER - bytes read from file %d", readbytes);
> > +
> > + if (cstate->reached_eof)
> > + return true;
> > + }
> > There are many empty lines, these are not required.
> >
>
> Removed.
>
> >
> > +
> > + fld_count = (int16) pg_ntoh16(fld_count);
> > +
> > + CHECK_FIELD_COUNT;
> > +
> > + cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_count);
> > + new_block_pos = pcshared_info->cur_block_pos;
> > You can run pg_indent once for the changes.
> >
>
> I ran pg_indent and observed that it modifies many places. If we want to
> run pg_indent on copy.c for parallel copy alone, we first need to run it
> on plain copy.c, take those changes, and then run it on all the parallel
> copy files. I think it is better to run pg_indent on all the parallel copy
> patches once and for all, maybe just before we finish up all the code
> reviews.
>
> > + if (mode == 1)
> > + {
> > + cstate->pcdata->curr_data_block = data_block;
> > + cstate->raw_buf_index = 0;
> > + }
> > + else if(mode == 2)
> > + {
> > Could use macros for 1 & 2 for better readability.
>
> Done.
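
The two review points about the mode branches (named constants instead of 1 and 2, and moving the shared assignments into the common flow) could be combined roughly as follows. The macro names and the Demo structures are hypothetical; the thread does not show what the updated patch actually uses.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical names for the two modes; the patch may name them differently. */
#define ADJUST_MODE_SWITCH_ONLY	1
#define ADJUST_MODE_LINK_BLOCKS	2

/* Hypothetical, simplified stand-ins for the patch's structures. */
typedef struct DemoBlock
{
	uint32_t	following_block;
	bool		curr_blk_completed;
} DemoBlock;

typedef struct DemoState
{
	DemoBlock  *curr_data_block;
	int			raw_buf_index;
} DemoState;

/* Switch the state to 'data_block', which lives at position 'block_pos'. */
static void
switch_to_block(DemoState *st, DemoBlock *data_block, uint32_t block_pos, int mode)
{
	/* Only the linking mode needs to touch the previous block. */
	if (mode == ADJUST_MODE_LINK_BLOCKS)
	{
		DemoBlock  *prev_data_block = st->curr_data_block;

		prev_data_block->following_block = block_pos;
		prev_data_block->curr_blk_completed = true;
	}

	/* Common to both modes, so it appears once instead of in each branch. */
	st->curr_data_block = data_block;
	st->raw_buf_index = 0;
}
```

The unconditional assignment to curr_blk_completed has the same effect as the original "if false then set true" test.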
>
> >
> > +
> > + if (following_block_id == -1)
> > + break;
> > + }
> > +
> > + if (following_block_id != -1)
> > + pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, 1);
> > +
> > + *line_size = *line_size + tuple_end_info_ptr->offset + 1;
> > + }
> > We could calculate the size as we parse and identify one record, if we
> > do that way this can be removed.
> >
>
> Done.

Hi Bharath,

I was looking forward to reviewing this patch set, but unfortunately it
does not apply cleanly: there is a reject in copy.c, so it might need a
rebase. I was applying it on master at commit
cd22d3cdb9bd9963c694c01a8c0232bbae3ddcfb.

--
Regards,
Rafia Sabih
