Re: Parallel copy

From: vignesh C <vignesh21(at)gmail(dot)com>
To: Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, Robert Haas <robertmhaas(at)gmail(dot)com>, Ants Aasma <ants(at)cybertec(dot)at>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Alastair Turner <minion(at)decodable(dot)me>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Parallel copy
Date: 2020-06-18 13:11:57
Message-ID: CALDaNm2RAhNnT=dDRokH+1aJ-kGM4RX3EfADqGTnKRKAcj+SEw@mail.gmail.com

On Mon, Jun 15, 2020 at 4:39 PM Bharath Rupireddy
<bharath(dot)rupireddyforpostgres(at)gmail(dot)com> wrote:
>
> The above tests were run with the configuration attached config.txt, which is the same used for performance tests of csv/text files posted earlier in this mail chain.
>
> Request the community to take this patch up for review along with the parallel copy for csv/text file patches and provide feedback.
>

I have reviewed the patch; a few comments:
+
+    /*
+     * Parallel copy for binary formatted files
+     */
+    ParallelCopyDataBlock *curr_data_block;
+    ParallelCopyDataBlock *prev_data_block;
+    uint32 curr_data_offset;
+    uint32 curr_block_pos;
+    ParallelCopyTupleInfo curr_tuple_start_info;
+    ParallelCopyTupleInfo curr_tuple_end_info;
} CopyStateData;

The newly added members should be placed in ParallelCopyData instead.
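Roughly like this (a sketch only; the existing members of ParallelCopyData are elided):

typedef struct ParallelCopyData
{
    /* ... existing members ... */

    /* Parallel copy state for binary formatted files. */
    ParallelCopyDataBlock *curr_data_block;
    ParallelCopyDataBlock *prev_data_block;
    uint32 curr_data_offset;
    uint32 curr_block_pos;
    ParallelCopyTupleInfo curr_tuple_start_info;
    ParallelCopyTupleInfo curr_tuple_end_info;
} ParallelCopyData;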

+ if (cstate->curr_tuple_start_info.block_id == cstate->curr_tuple_end_info.block_id)
+ {
+     elog(DEBUG1, "LEADER - tuple lies in a single data block");
+
+     line_size = cstate->curr_tuple_end_info.offset -
+                 cstate->curr_tuple_start_info.offset + 1;
+     pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].unprocessed_line_parts, 1);
+ }
+ else
+ {
+     uint32 following_block_id =
+         pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].following_block;
+
+     elog(DEBUG1, "LEADER - tuple is spread across data blocks");
+
+     line_size = DATA_BLOCK_SIZE - cstate->curr_tuple_start_info.offset -
+                 pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].skip_bytes;
+     pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].unprocessed_line_parts, 1);
+
+     while (following_block_id != cstate->curr_tuple_end_info.block_id)
+     {
+         line_size = line_size + DATA_BLOCK_SIZE -
+                     pcshared_info->data_blocks[following_block_id].skip_bytes;
+         pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, 1);
+
+         following_block_id = pcshared_info->data_blocks[following_block_id].following_block;
+
+         if (following_block_id == -1)
+             break;
+     }
+
+     if (following_block_id != -1)
+         pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, 1);
+
+     line_size = line_size + cstate->curr_tuple_end_info.offset + 1;
+ }

line_size can be accumulated as and when we process the tuple in
CopyReadBinaryTupleLeader, and set once at the end. That way the
above code can be removed.
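For example, something along these lines (a sketch only; bytes_consumed is a
hypothetical running count of whatever the leader just consumed from the
current block):

/* Reset before reading a new tuple in CopyReadBinaryTupleLeader. */
line_size = 0;

/* ... each time bytes are consumed from a data block ... */
line_size += bytes_consumed;    /* hypothetical helper variable */

/*
 * By the time the tuple end is reached, line_size is already final, and
 * the block-walking recomputation above becomes unnecessary.
 */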

+
+    /*
+     * Parallel copy for binary formatted files
+     */
+    ParallelCopyDataBlock *curr_data_block;
+    ParallelCopyDataBlock *prev_data_block;
+    uint32 curr_data_offset;
+    uint32 curr_block_pos;
+    ParallelCopyTupleInfo curr_tuple_start_info;
+    ParallelCopyTupleInfo curr_tuple_end_info;
} CopyStateData;

A curr_block_pos variable is already present in ParallelCopyShmInfo; we
could use that one and remove it from here. For curr_data_offset, the
similar variable raw_buf_index is already present in CopyStateData; we
could use that and remove this one as well.
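That is, roughly (a sketch; assuming the existing field names stay as they
are today):

/* Use the existing shared-memory position instead of cstate->curr_block_pos. */
pcshared_info->curr_block_pos = block_pos;

/* Use the existing CopyStateData offset instead of curr_data_offset. */
cstate->raw_buf_index = 0;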

+ if (cstate->curr_data_offset + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
+ {
+     ParallelCopyDataBlock *data_block = NULL;
+     uint8 movebytes = 0;
+
+     block_pos = WaitGetFreeCopyBlock(pcshared_info);
+
+     movebytes = DATA_BLOCK_SIZE - cstate->curr_data_offset;
+
+     cstate->curr_data_block->skip_bytes = movebytes;
+
+     data_block = &pcshared_info->data_blocks[block_pos];
+
+     if (movebytes > 0)
+         memmove(&data_block->data[0],
+                 &cstate->curr_data_block->data[cstate->curr_data_offset],
+                 movebytes);
+
+     elog(DEBUG1, "LEADER - field count is spread across data blocks - moved %d bytes from current block %u to %u block",
+          movebytes, cstate->curr_block_pos, block_pos);
+
+     readbytes = CopyGetData(cstate, &data_block->data[movebytes], 1,
+                             (DATA_BLOCK_SIZE - movebytes));
+
+     elog(DEBUG1, "LEADER - bytes read from file after field count is moved to next data block %d", readbytes);
+
+     if (cstate->reached_eof)
+         ereport(ERROR,
+                 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                  errmsg("unexpected EOF in COPY data")));
+
+     cstate->curr_data_block = data_block;
+     cstate->curr_data_offset = 0;
+     cstate->curr_block_pos = block_pos;
+ }

This code is duplicated in CopyReadBinaryTupleLeader and
CopyReadBinaryAttributeLeader. We could extract it into a function and
reuse it.
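For instance, something like this (AdjustDataBlockForContinuation is a name
I made up just for illustration):

/*
 * If the next 'required' bytes do not fit in the current data block, move
 * the trailing bytes to a fresh block and continue filling from the file.
 */
static void
AdjustDataBlockForContinuation(CopyState cstate,
                               ParallelCopyShmInfo *pcshared_info,
                               uint32 required)
{
    ParallelCopyDataBlock *data_block;
    uint32 block_pos;
    uint8 movebytes;
    int readbytes;

    if (cstate->curr_data_offset + required < (DATA_BLOCK_SIZE - 1))
        return;                 /* fits in the current block, nothing to do */

    block_pos = WaitGetFreeCopyBlock(pcshared_info);
    movebytes = DATA_BLOCK_SIZE - cstate->curr_data_offset;
    cstate->curr_data_block->skip_bytes = movebytes;
    data_block = &pcshared_info->data_blocks[block_pos];

    if (movebytes > 0)
        memmove(&data_block->data[0],
                &cstate->curr_data_block->data[cstate->curr_data_offset],
                movebytes);

    readbytes = CopyGetData(cstate, &data_block->data[movebytes], 1,
                            (DATA_BLOCK_SIZE - movebytes));
    elog(DEBUG1, "LEADER - moved %d bytes from block %u to block %u, read %d more",
         movebytes, cstate->curr_block_pos, block_pos, readbytes);

    if (cstate->reached_eof)
        ereport(ERROR,
                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                 errmsg("unexpected EOF in COPY data")));

    cstate->curr_data_block = data_block;
    cstate->curr_data_offset = 0;
    cstate->curr_block_pos = block_pos;
}

Both callers would then just call AdjustDataBlockForContinuation(cstate,
pcshared_info, sizeof(fld_count)) (or the attribute size) before reading.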

+/*
+ * CopyReadBinaryAttributeWorker - leader identifies boundaries/offsets
+ * for each attribute/column, it moves on to next data block if the
+ * attribute/column is spread across data blocks.
+ */
+static pg_attribute_always_inline Datum
+CopyReadBinaryAttributeWorker(CopyState cstate, int column_no,
+                              FmgrInfo *flinfo, Oid typioparam,
+                              int32 typmod, bool *isnull)
+{
+    int32 fld_size;
+    Datum result;

column_no is not used; it can be removed.

+ if (fld_count == -1)
+ {
+     /*
+      * Received EOF marker.  In a V3-protocol copy, wait for the
+      * protocol-level EOF, and complain if it doesn't come immediately.
+      * This ensures that we correctly handle CopyFail, if client chooses
+      * to send that now.
+      *
+      * Note that we MUST NOT try to read more data in an old-protocol
+      * copy, since there is no protocol-level EOF marker then.  We could
+      * go either way for copy from file, but choose to throw error if
+      * there's data after the EOF marker, for consistency with the
+      * new-protocol case.
+      */
+     char dummy;
+
+     if (cstate->copy_dest != COPY_OLD_FE &&
+         CopyGetData(cstate, &dummy, 1, 1) > 0)
+         ereport(ERROR,
+                 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                  errmsg("received copy data after EOF marker")));
+     return true;
+ }
+
+ if (fld_count != attr_count)
+     ereport(ERROR,
+             (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+              errmsg("row field count is %d, expected %d",
+                     (int) fld_count, attr_count)));
+
+ cstate->curr_tuple_start_info.block_id = cstate->curr_block_pos;
+ cstate->curr_tuple_start_info.offset = cstate->curr_data_offset;
+ cstate->curr_data_offset = cstate->curr_data_offset + sizeof(fld_count);
+ new_block_pos = cstate->curr_block_pos;
+
+ foreach(cur, cstate->attnumlist)
+ {
+     int attnum = lfirst_int(cur);
+     int m = attnum - 1;
+     Form_pg_attribute att = TupleDescAttr(tupDesc, m);

The above code is present in both NextCopyFrom and
CopyReadBinaryTupleLeader; check whether we can extract a common
function, or use NextCopyFrom as it is.
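For example, the shared part could be pulled out like this
(CheckBinaryFieldCount is a name I made up for illustration):

/*
 * Validate the per-row field count, shared by NextCopyFrom and
 * CopyReadBinaryTupleLeader.  Returns true if the EOF marker (-1) was
 * read; the caller then performs its own EOF handling.
 */
static bool
CheckBinaryFieldCount(int16 fld_count, int attr_count)
{
    if (fld_count == -1)
        return true;

    if (fld_count != attr_count)
        ereport(ERROR,
                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                 errmsg("row field count is %d, expected %d",
                        (int) fld_count, attr_count)));
    return false;
}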

+ memcpy(&fld_count,
+        &cstate->curr_data_block->data[cstate->curr_data_offset],
+        sizeof(fld_count));
+ fld_count = (int16) pg_ntoh16(fld_count);
+
+ if (fld_count == -1)
+ {
+     return true;
+ }

Should this be an Assert in the CopyReadBinaryTupleWorker function,
since this check is already done in the leader?
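That is, something like (a sketch):

memcpy(&fld_count,
       &cstate->curr_data_block->data[cstate->curr_data_offset],
       sizeof(fld_count));
fld_count = (int16) pg_ntoh16(fld_count);

/*
 * The leader has already consumed the EOF marker and validated the field
 * count, so the worker should never see -1 here.
 */
Assert(fld_count != -1);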

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
