Re: Parallel Inserts in CREATE TABLE AS

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>
Cc: Luc Vlaming <luc(at)swarm64(dot)com>, "Hou, Zhijie" <houzj(dot)fnst(at)cn(dot)fujitsu(dot)com>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Parallel Inserts in CREATE TABLE AS
Date: 2020-12-05 10:59:07
Message-ID: CAA4eK1+zpf_C8nk00JF5Rs93Rn6L2Qk8moeWjWOsgW6_rz3fTA@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On Mon, Nov 30, 2020 at 10:43 AM Bharath Rupireddy
<bharath(dot)rupireddyforpostgres(at)gmail(dot)com> wrote:
>
> On Fri, Nov 27, 2020 at 1:07 PM Luc Vlaming <luc(at)swarm64(dot)com> wrote:
> >
> > Disclaimer: I have by no means throughly reviewed all the involved parts
> > and am probably missing quite a bit of context so if I understood parts
> > wrong or they have been discussed before then I'm sorry. Most notably
> > the whole situation about the command-id is still elusive for me and I
> > can really not judge yet anything related to that.
> >
> > IMHO The patch makes that we now have the gather do most of the CTAS
> > work, which seems unwanted. For the non-ctas insert/update case it seems
> > that a modifytable node exists to actually do the work. What I'm
> > wondering is if it is maybe not better to introduce a CreateTable node
> > as well?
> > This would have several merits:
> > - the rowcount of that node would be 0 for the parallel case, and
> > non-zero for the serial case. Then the gather ndoe and the Query struct
> > don't have to know about CTAS for the most part, removing e.g. the case
> > distinctions in cost_gather.
> > - the inserted rows can now be accounted in this new node instead of the
> > parallel executor state, and this node can also do its own DSM
> > intializations
> > - the generation of a partial variants of the CreateTable node can now
> > be done in the optimizer instead of the ExecCreateTableAs which IMHO is
> > a more logical place to make these kind of decisions. which then also
> > makes it potentially play nicer with costs and the like.
> > - the explain code can now be in its own place instead of part of the
> > gather node
> > - IIUC it would allow the removal of the code to only launch parallel
> > workers if its not CTAS, which IMHO would be quite a big benefit.
> >
> > Thoughts?
> >
>
> If I'm not wrong, I think currently we have no exec nodes for DDLs.
> I'm not sure whether we would like to introduce one for this.
>

Yeah, I am also not in favor of having an executor node for CTAS but
OTOH, I also don't like the way you have jammed the relevant
information in generic PlanState. How about keeping it in GatherState
and initializing it in ExecCreateTableAs() after the executor start.
You are already doing special treatment for the Gather node in
ExecCreateTableAs (via IsParallelInsertInCTASAllowed) so we can as
well initialize the required information in GatherState in
ExecCreateTableAs. I think that might help in reducing the special
treatment for intoclause at different places.

Few other assorted comments:
=========================
1.
+/*
+ * IsParallelInsertInCTASAllowed --- determine whether or not parallel
+ * insertion is possible.
+ */
+bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc)
+{
..
..
if (ps && IsA(ps, GatherState) && !ps->ps_ProjInfo &&
+ plannedstmt->parallelModeNeeded &&
+ plannedstmt->planTree &&
+ IsA(plannedstmt->planTree, Gather) &&
+ plannedstmt->planTree->lefttree &&
+ plannedstmt->planTree->lefttree->parallel_aware &&
+ plannedstmt->planTree->lefttree->parallel_safe)
+ {
+ /*
+ * Since there are no rows that are transferred from workers to
+ * Gather node, so we set it to 0 to be visible in explain
+ * plans. Note that we would have already accounted this for
+ * cost calculations in cost_gather().
+ */
+ plannedstmt->planTree->plan_rows = 0;

This looks a bit odd. The function name
'IsParallelInsertInCTASAllowed' suggests that it just checks whether
parallelism is allowed but it is internally changing the plan_rows. It
might be better to do this separately if the parallelism is allowed.

2.
static void ExecShutdownGatherWorkers(GatherState *node);
-
+static void ExecParallelInsertInCTAS(GatherState *node);

Spurious line removal.

3.
/* Wait for the parallel workers to finish. */
+ if (node->nworkers_launched > 0)
+ {
+ ExecShutdownGatherWorkers(node);
+
+ /*
+ * Add up the total tuples inserted by all workers, to the tuples
+ * inserted by the leader(if any). This will be shared to client.
+ */
+ node->ps.state->es_processed += pg_atomic_read_u64(node->pei->processed);
+ }

The comment and code appear a bit misleading as the function seems to
shutdown the workers rather than waiting for them to finish. How about
using something like below:

/*
* Next, accumulate buffer and WAL usage. (This must wait for the workers
* to finish, or we might get incomplete data.)
*/
if (nworkers > 0)
{
int i;

/* Wait for all vacuum workers to finish */
WaitForParallelWorkersToFinish(lps->pcxt);

for (i = 0; i < lps->pcxt->nworkers_launched; i++)
InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
}

This is how it works for parallel vacuum.

4.
+
+ /*
+ * Make the number of tuples that are transferred from workers to gather
+ * node zero as each worker parallelly insert the tuples that are resulted
+ * from its chunk of plan execution. This change may make the parallel
+ * plan cheap among all other plans, and influence the planner to consider
+ * this parallel plan.
+ */
+ if (!(root->parse->isForCTAS &&
+ root->query_level == 1))
+ run_cost += parallel_tuple_cost * path->path.rows;

The above comment doesn't seem to convey what it intends to convey.
How about changing it slightly as: "We don't compute the
parallel_tuple_cost for CTAS because the number of tuples that are
transferred from workers to the gather node is zero as each worker
parallelly inserts the tuples that are resulted from its chunk of plan
execution. This change may make the parallel plan cheap among all
other plans, and influence the planner to consider this parallel
plan."

Then, we can also have an Assert for path->path.rows to zero for the CTAS case.

5.
+ /* Prallel inserts in CTAS related info is specified below. */
+ IntoClause *intoclause;
+ Oid objectid;
+ DestReceiver *dest;
} PlanState;

Typo. /Prallel/Parallel

6.
Currently, it seems the plan look like:
Gather (actual time=970.524..972.913 rows=0 loops=1)
-> Create t1_test
Workers Planned: 2
Workers Launched: 2
-> Parallel Seq Scan on t1 (actual time=0.028..86.623 rows=333333 loops=3)

I would prefer it to be:
Gather (actual time=970.524..972.913 rows=0 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Create t1_test
-> Parallel Seq Scan on t1 (actual time=0.028..86.623 rows=333333 loops=3)

This way it looks like the writing part is done below the Gather node
and also it will match the Parallel Insert patch of Greg.

--
With Regards,
Amit Kapila.

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Laurenz Albe 2020-12-05 12:04:13 Re: Add session statistics to pg_stat_database
Previous Message Joel Jacobson 2020-12-05 07:22:13 Re: [PATCH] Add support for leading/trailing bytea trim()ing