Re: COPY FROM WHEN condition

From: Andres Freund <andres(at)anarazel(dot)de>
To: David Rowley <david(dot)rowley(at)2ndquadrant(dot)com>
Cc: Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Surafel Temesgen <surafel3000(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)2ndquadrant(dot)com>, Adam Berlin <berlin(dot)ab(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: COPY FROM WHEN condition
Date: 2019-04-02 17:52:16
Message-ID: 87ef6kl2gf.fsf@alap4.lan
Lists: pgsql-hackers

On 2019-04-03 06:41:49 +1300, David Rowley wrote:
> However, I've ended up not doing it that way as the patch requires
> more than just an array of TupleTableSlots to be stored in the
> ResultRelInfo, it'll pretty much need all of what I have in
> CopyMultiInsertBuffer, which includes line numbers for error
> reporting, a BulkInsertState and a counter to track how many of the
> slots are used. I had thoughts that we could just tag the whole
> CopyMultiInsertBuffer in ResultRelInfo, but that requires moving it
> somewhere a bit more generic. Another thought would be that we have
> something like "void *extra;" in ResultRelInfo that we can just borrow
> for copy.c. It may be interesting to try this out to see if it saves
> much in the way of performance.

Hm, we could just forward declare struct CopyMultiInsertBuffer in a
header, and only define it in copy.c. That doesn't sound insane to me.
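
Roughly (untested sketch; CopyMultiInsertBufferFlush is a made-up
example function, not from the patch):

/* in a header, e.g. commands/copy.h: declare, but don't define */
struct CopyMultiInsertBuffer;

extern void CopyMultiInsertBufferFlush(struct CopyMultiInsertBuffer *buffer);

/* in copy.c: the definition stays file-private */
struct CopyMultiInsertBuffer
{
	/* ... members as in the patch ... */
};

That way ResultRelInfo (or anything else outside copy.c) could hold a
struct CopyMultiInsertBuffer * without knowing the layout.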

> I've got the patch into a working state now and wrote a bunch of
> comments. I've not really done a thorough read back of the code again.
> I normally wait until the light is coming from a different angle
> before doing that, but there does not seem to be much time to wait for
> that in this case, so here's v2. Performance seems to be about the
> same as what I reported yesterday.

Cool.

> +/*
> + * Multi-Insert handling code for COPY FROM. Inserts are performed in
> + * batches of up to MAX_BUFFERED_TUPLES.

This should probably be moved to the top of the file.

> + * When COPY FROM is used on a partitioned table, we attempt to maintain
> + * only MAX_PARTITION_BUFFERS buffers at once. Buffers that are completely
> + * unused in each batch are removed so that we end up not keeping buffers for
> + * partitions that we might not insert into again.
> + */
> +#define MAX_BUFFERED_TUPLES 1000
> +#define MAX_BUFFERED_BYTES 65535
> +#define MAX_PARTITION_BUFFERS 15

> +/*
> + * CopyMultiInsertBuffer
> + * Stores multi-insert data related to a single relation in CopyFrom.
> + */
> +typedef struct

Please don't create anonymous structs - they can't be forward declared,
and some debugging tools handle them worse than named structs. Naming
the struct would also make spelling out CopyMultiInsertBuffer in the
comment less necessary.
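
I.e. name the struct as well as the typedef:

typedef struct CopyMultiInsertBuffer
{
	Oid			relid;		/* Relation ID this insert data is for */
	/* ... rest unchanged ... */
} CopyMultiInsertBuffer;

Then "struct CopyMultiInsertBuffer" can be forward declared where
needed, and the name shows up properly in debuggers.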

> +{
> + Oid relid; /* Relation ID this insert data is for */
> + TupleTableSlot **slots; /* Array of MAX_BUFFERED_TUPLES to store
> + * tuples */
> + uint64 *linenos; /* Line # of tuple in copy stream */

It could make sense to allocate those two together, or even as part of
the CopyMultiInsertBuffer itself, to reduce allocator overhead.
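
E.g. (sketch; MAX_BUFFERED_TUPLES is a compile-time constant, so the
arrays can live inside the struct and the whole buffer becomes a single
palloc()):

typedef struct CopyMultiInsertBuffer
{
	Oid			relid;		/* Relation ID this insert data is for */
	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* buffered tuples */
	uint64		linenos[MAX_BUFFERED_TUPLES];	/* line # of each tuple */
	/* ... rest as before ... */
} CopyMultiInsertBuffer;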

> + else
> + {
> + CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
> +
> + /* Non-partitioned table. Just setup the buffer for the table. */
> + buffer->relid = RelationGetRelid(rri->ri_RelationDesc);
> + buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
> + buffer->linenos = palloc(MAX_BUFFERED_TUPLES * sizeof(uint64));
> + buffer->resultRelInfo = rri;
> + buffer->bistate = GetBulkInsertState();
> + buffer->nused = 0;
> + miinfo->multiInsertBufferTab = NULL;
> + miinfo->buffer = buffer;
> + miinfo->nbuffers = 1;
> + }

Can this be moved into a helper function?
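
Something like the below (just the quoted code reshuffled, with the
miinfo assignments left at the caller):

static CopyMultiInsertBuffer *
CopyMultiInsertBufferInit(ResultRelInfo *rri)
{
	CopyMultiInsertBuffer *buffer;

	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
	buffer->relid = RelationGetRelid(rri->ri_RelationDesc);
	buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
	buffer->linenos = palloc(MAX_BUFFERED_TUPLES * sizeof(uint64));
	buffer->resultRelInfo = rri;
	buffer->bistate = GetBulkInsertState();
	buffer->nused = 0;
	return buffer;
}

The partitioned path could then presumably use the same helper.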

> + /*
> + * heap_multi_insert leaks memory, so switch to short-lived memory context
> + * before calling it.
> + */

s/heap_multi_insert/table_multi_insert/

> + /*
> + * If there are any indexes, update them for all the inserted tuples, and
> + * run AFTER ROW INSERT triggers.
> + */
> + if (resultRelInfo->ri_NumIndices > 0)
> + {
> + for (i = 0; i < nBufferedTuples; i++)
> + {
> + List *recheckIndexes;
> +
> + cstate->cur_lineno = buffer->linenos[i];
> + recheckIndexes =
> + ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
> + NIL);
> + ExecARInsertTriggers(estate, resultRelInfo,
> + buffer->slots[i],
> + recheckIndexes, cstate->transition_capture);
> + list_free(recheckIndexes);
> + }
> + }
> +
> + /*
> + * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
> + * anyway.
> + */
> + else if (resultRelInfo->ri_TrigDesc != NULL &&
> + (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
> + resultRelInfo->ri_TrigDesc->trig_insert_new_table))
> + {
> + for (i = 0; i < nBufferedTuples; i++)
> + {
> + cstate->cur_lineno = buffer->linenos[i];
> + ExecARInsertTriggers(estate, resultRelInfo,
> + bufferedSlots[i],
> + NIL, cstate->transition_capture);
> + }
> + }

> + for (i = 0; i < nBufferedTuples; i++)
> + ExecClearTuple(bufferedSlots[i]);

I wonder about combining these loops somehow. But it's probably ok.
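
A single loop could look roughly like this (untested; whether the
unconditional ExecARInsertTriggers() call is acceptable in the
no-trigger case would need checking - it returns early when there's
nothing to do, but it's still a function call per tuple):

for (i = 0; i < nBufferedTuples; i++)
{
	List	   *recheckIndexes = NIL;

	cstate->cur_lineno = buffer->linenos[i];
	if (resultRelInfo->ri_NumIndices > 0)
		recheckIndexes = ExecInsertIndexTuples(buffer->slots[i], estate,
											   false, NULL, NIL);
	ExecARInsertTriggers(estate, resultRelInfo, buffer->slots[i],
						 recheckIndexes, cstate->transition_capture);
	list_free(recheckIndexes);
	ExecClearTuple(buffer->slots[i]);
}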

> +/*
> + * CopyMultiInsertBuffer_Cleanup
> + * Drop used slots and free member for this buffer. The buffer
> + * must be flushed before cleanup.
> + */
> +static inline void
> +CopyMultiInsertBuffer_Cleanup(CopyMultiInsertBuffer *buffer)
> +{
> + int i;
> +
> + ReleaseBulkInsertStatePin(buffer->bistate);

Shouldn't this be FreeBulkInsertState() rather than
ReleaseBulkInsertStatePin()?
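
I.e.:

	FreeBulkInsertState(buffer->bistate);

which drops the pin and also frees the BulkInsertState itself
(including its buffer access strategy), rather than leaving it
allocated.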

> +
> +/*
> + * CopyMultiInsertBuffer_RemoveBuffer
> + * Remove a buffer from being tracked by miinfo
> + */
> +static inline void
> +CopyMultiInsertBuffer_RemoveBuffer(CopyMultiInsertInfo *miinfo,
> + CopyMultiInsertBuffer *buffer)
> +{
> + Oid relid = buffer->relid;
> +
> + CopyMultiInsertBuffer_Cleanup(buffer);
> +
> + hash_search(miinfo->multiInsertBufferTab, (void *) &relid, HASH_REMOVE,
> + NULL);
> + miinfo->nbuffers--;

Aren't we leaking the CopyMultiInsertBuffer itself here?
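
If the hash table only stores pointers to the separately palloc'd
buffers (an assumption - the non-partitioned path above allocates them
that way), this would presumably also need something like:

	hash_search(miinfo->multiInsertBufferTab, (void *) &relid, HASH_REMOVE,
				NULL);
	/* HASH_REMOVE only frees the hash entry, not the buffer it points to */
	pfree(buffer);
	miinfo->nbuffers--;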

> +}
> +
> +/*
> + * CopyMultiInsertInfo_Flush
> + * Write out all stored tuples in all buffers out to the tables.
> + *
> + * To save us from ending up with buffers for 1000s of partitions we remove
> + * buffers belonging to partitions that we've seen no tuples for in this batch
> + */
> +static inline void
> +CopyMultiInsertInfo_Flush(CopyMultiInsertInfo *miinfo, CopyState cstate,
> + EState *estate, CommandId mycid, int ti_options)
> +{
> + CopyMultiInsertBuffer *buffer;
> +
> + /*
> + * If just storing buffers for a non-partitioned table, then just flush
> + * that buffer.
> + */
> + if (miinfo->multiInsertBufferTab == NULL)
> + {
> + buffer = miinfo->buffer;
> +
> + CopyMultiInsertInfo_FlushSingleBuffer(buffer, cstate, estate, mycid,
> + ti_options);
> + }
> + else
> + {
> + HASH_SEQ_STATUS status;
> +
> + /*
> + * Otherwise make a pass over the hash table and flush all buffers
> + * that have any tuples stored in them.
> + */
> + hash_seq_init(&status, miinfo->multiInsertBufferTab);
> +
> + while ((buffer = (CopyMultiInsertBuffer *) hash_seq_search(&status)) != NULL)
> + {
> + if (buffer->nused > 0)
> + {
> + /* Flush the buffer if it was used */
> + CopyMultiInsertInfo_FlushSingleBuffer(buffer, cstate, estate,
> + mycid, ti_options);
> + }
> + else
> + {
> + /*
> + * Otherwise just remove it. If we saw no tuples for it this
> + * batch, then likely its best to make way for buffers for
> + * other partitions.
> + */
> + CopyMultiInsertBuffer_RemoveBuffer(miinfo, buffer);
> + }
> + }
> + }
> +
> + miinfo->bufferedTuples = 0;
> + miinfo->bufferedBytes = 0;
> +}
> +
> +/*
> + * CopyMultiInsertInfo_Cleanup
> + * Cleanup allocated buffers and free memory
> + */
> +static inline void
> +CopyMultiInsertInfo_Cleanup(CopyMultiInsertInfo *miinfo)
> +{
> + if (miinfo->multiInsertBufferTab == NULL)
> + CopyMultiInsertBuffer_Cleanup(miinfo->buffer);
> + else
> + {
> + HASH_SEQ_STATUS status;
> + CopyMultiInsertBuffer *buffer;
> +
> + hash_seq_init(&status, miinfo->multiInsertBufferTab);
> +
> + while ((buffer = (CopyMultiInsertBuffer *) hash_seq_search(&status)) != NULL)
> + {
> + Assert(buffer->nused == 0);
> + CopyMultiInsertBuffer_Cleanup(buffer);
> + }
> +
> + hash_destroy(miinfo->multiInsertBufferTab);
> + }
> +}
> +
> +/*
> + * CopyMultiInsertInfo_NextFreeSlot
> + * Get the next TupleTableSlot that the next tuple should be stored in.
> + *
> + * Callers must ensure that the buffer is not full.
> + */
> +static inline TupleTableSlot *
> +CopyMultiInsertInfo_NextFreeSlot(CopyMultiInsertInfo *miinfo,
> + ResultRelInfo *rri)
> +{
> + CopyMultiInsertBuffer *buffer = miinfo->buffer;
> + int nused = buffer->nused;
> +
> + Assert(nused < MAX_BUFFERED_TUPLES);
> +
> + if (buffer->slots[nused] == NULL)
> + buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
> + return buffer->slots[nused];
> +}
> +
> +/*
> + * CopyMultiInsertInfo_Store
> + * Consume the previously reserved TupleTableSlot that was reserved by
> + * CopyMultiInsertInfo_NextFreeSlot.
> + */
> +static inline void
> +CopyMultiInsertInfo_Store(CopyMultiInsertInfo *miinfo, TupleTableSlot *slot,
> + int tuplen, uint64 lineno)
> +{
> + CopyMultiInsertBuffer *buffer = miinfo->buffer;
> +
> + Assert(slot == buffer->slots[buffer->nused]);
> +
> + /* Store the line number so we can properly report any errors later */
> + buffer->linenos[buffer->nused] = lineno;
> +
> + /* Record this slot as being used */
> + buffer->nused++;
> +
> + /* Update how many tuples are stored and their size */
> + miinfo->bufferedTuples++;
> + miinfo->bufferedBytes += tuplen;
> +}
> +
> /*
> * Copy FROM file to relation.
> */
> uint64
> CopyFrom(CopyState cstate)
> {
> - HeapTuple tuple;
> - TupleDesc tupDesc;
> - Datum *values;
> - bool *nulls;
> ResultRelInfo *resultRelInfo;
> ResultRelInfo *target_resultRelInfo;
> ResultRelInfo *prevResultRelInfo = NULL;
> EState *estate = CreateExecutorState(); /* for ExecConstraints() */
> ModifyTableState *mtstate;
> ExprContext *econtext;
> - TupleTableSlot *myslot;
> + TupleTableSlot *singleslot = NULL;
> MemoryContext oldcontext = CurrentMemoryContext;
> - MemoryContext batchcontext;
>
> PartitionTupleRouting *proute = NULL;
> ErrorContextCallback errcallback;
> CommandId mycid = GetCurrentCommandId(true);
> int ti_options = 0; /* start with default table_insert options */
> - BulkInsertState bistate;
> + BulkInsertState bistate = NULL;
> CopyInsertMethod insertMethod;
> + CopyMultiInsertInfo multiInsertInfo;
> uint64 processed = 0;
> - int nBufferedTuples = 0;
> bool has_before_insert_row_trig;
> bool has_instead_insert_row_trig;
> bool leafpart_use_multi_insert = false;
>
> -#define MAX_BUFFERED_TUPLES 1000
> -#define RECHECK_MULTI_INSERT_THRESHOLD 1000
> - HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
> - Size bufferedTuplesSize = 0;
> - uint64 firstBufferedLineNo = 0;
> - uint64 lastPartitionSampleLineNo = 0;
> - uint64 nPartitionChanges = 0;
> - double avgTuplesPerPartChange = 0;
> -
> Assert(cstate->rel);
>
> + memset(&multiInsertInfo, 0, sizeof(CopyMultiInsertInfo));
> +
> /*
> * The target must be a plain, foreign, or partitioned relation, or have
> * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
> @@ -2382,8 +2767,6 @@ CopyFrom(CopyState cstate)
> RelationGetRelationName(cstate->rel))));
> }
>
> - tupDesc = RelationGetDescr(cstate->rel);
> -
> /*----------
> * Check to see if we can avoid writing WAL
> *
> @@ -2467,8 +2850,8 @@ CopyFrom(CopyState cstate)
> if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
> {
> ereport(ERROR,
> - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> - errmsg("cannot perform FREEZE on a partitioned table")));
> + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> + errmsg("cannot perform FREEZE on a partitioned table")));
> }
>
> /*
> @@ -2518,10 +2901,6 @@ CopyFrom(CopyState cstate)
>
> ExecInitRangeTable(estate, cstate->range_table);
>
> - /* Set up a tuple slot too */
> - myslot = ExecInitExtraTupleSlot(estate, tupDesc,
> - &TTSOpsHeapTuple);
> -
> /*
> * Set up a ModifyTableState so we can let FDW(s) init themselves for
> * foreign-table result relation(s).
> @@ -2565,10 +2944,11 @@ CopyFrom(CopyState cstate)
> &mtstate->ps);
>
> /*
> - * It's more efficient to prepare a bunch of tuples for insertion, and
> - * insert them in one heap_multi_insert() call, than call heap_insert()
> - * separately for every tuple. However, there are a number of reasons why
> - * we might not be able to do this. These are explained below.
> + * It's generally more efficient to prepare a bunch of tuples for
> + * insertion, and insert them in one table_multi_insert() call, than call
> + * table_insert() separately for every tuple. However, there are a number
> + * of reasons why we might not be able to do this. These are explained
> + * below.
> */
> if (resultRelInfo->ri_TrigDesc != NULL &&
> (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
> @@ -2589,8 +2969,8 @@ CopyFrom(CopyState cstate)
> * For partitioned tables we can't support multi-inserts when there
> * are any statement level insert triggers. It might be possible to
> * allow partitioned tables with such triggers in the future, but for
> - * now, CopyFromInsertBatch expects that any before row insert and
> - * statement level insert triggers are on the same relation.
> + * now, CopyMultiInsertInfo_Flush expects that any before row insert
> + * and statement level insert triggers are on the same relation.
> */
> insertMethod = CIM_SINGLE;
> }
> @@ -2622,8 +3002,7 @@ CopyFrom(CopyState cstate)
> {
> /*
> * For partitioned tables, we may still be able to perform bulk
> - * inserts for sets of consecutive tuples which belong to the same
> - * partition. However, the possibility of this depends on which types
> + * inserts. However, the possibility of this depends on which types
> * of triggers exist on the partition. We must disable bulk inserts
> * if the partition is a foreign table or it has any before row insert
> * or insert instead triggers (same as we checked above for the parent
> @@ -2632,18 +3011,27 @@ CopyFrom(CopyState cstate)
> * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
> * flag that we must later determine if we can use bulk-inserts for
> * the partition being inserted into.
> - *
> - * Normally, when performing bulk inserts we just flush the insert
> - * buffer whenever it becomes full, but for the partitioned table
> - * case, we flush it whenever the current tuple does not belong to the
> - * same partition as the previous tuple.
> */
> if (proute)
> insertMethod = CIM_MULTI_CONDITIONAL;
> else
> insertMethod = CIM_MULTI;
>
> - bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
> + CopyMultiInsertInfo_Init(&multiInsertInfo, resultRelInfo,
> + proute != NULL);
> + }
> +
> + /*
> + * If not using batch mode (which allocates slots as needed) set up a
> + * tuple slot too. When inserting into a partitioned table, we also need
> + * one, even if we might batch insert, to read the tuple in the root
> + * partition's form.
> + */
> + if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
> + {
> + singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
> + &estate->es_tupleTable);
> + bistate = GetBulkInsertState();
> }
>
> has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
> @@ -2660,10 +3048,6 @@ CopyFrom(CopyState cstate)
> */
> ExecBSInsertTriggers(estate, resultRelInfo);
>
> - values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
> - nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
> -
> - bistate = GetBulkInsertState();
> econtext = GetPerTupleExprContext(estate);
>
> /* Set up callback to identify error line number */
> @@ -2672,17 +3056,9 @@ CopyFrom(CopyState cstate)
> errcallback.previous = error_context_stack;
> error_context_stack = &errcallback;
>
> - /*
> - * Set up memory context for batches. For cases without batching we could
> - * use the per-tuple context, but it's simpler to just use it every time.
> - */
> - batchcontext = AllocSetContextCreate(CurrentMemoryContext,
> - "batch context",
> - ALLOCSET_DEFAULT_SIZES);
> -
> for (;;)
> {
> - TupleTableSlot *slot;
> + TupleTableSlot *myslot;
> bool skip_tuple;
>
> CHECK_FOR_INTERRUPTS();
> @@ -2693,20 +3069,33 @@ CopyFrom(CopyState cstate)
> */
> ResetPerTupleExprContext(estate);
>
> + if (insertMethod == CIM_SINGLE || proute)
> + {
> + myslot = singleslot;
> + Assert(myslot != NULL);
> + }
> + else
> + {
> + Assert(resultRelInfo == target_resultRelInfo);
> + Assert(insertMethod == CIM_MULTI);
> +
> + myslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
> + resultRelInfo);
> + }
> +
> /*
> * Switch to per-tuple context before calling NextCopyFrom, which does
> * evaluate default expressions etc. and requires per-tuple context.
> */
> MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
>
> - if (!NextCopyFrom(cstate, econtext, values, nulls))
> - break;
> + ExecClearTuple(myslot);
>
> - /* Switch into per-batch memory context before forming the tuple. */
> - MemoryContextSwitchTo(batchcontext);
> + /* Directly store the values/nulls array in the slot */
> + if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
> + break;
>
> - /* And now we can form the input tuple. */
> - tuple = heap_form_tuple(tupDesc, values, nulls);
> + ExecStoreVirtualTuple(myslot);
>
> /*
> * Constraints might reference the tableoid column, so (re-)initialize
> @@ -2717,18 +3106,15 @@ CopyFrom(CopyState cstate)
> /* Triggers and stuff need to be invoked in query context. */
> MemoryContextSwitchTo(oldcontext);
>
> - /* Place tuple in tuple slot --- but slot shouldn't free it */
> - slot = myslot;
> - ExecStoreHeapTuple(tuple, slot, false);
> -
> if (cstate->whereClause)
> {
> econtext->ecxt_scantuple = myslot;
> + /* Skip items that don't match the COPY's WHERE clause */
> if (!ExecQual(cstate->qualexpr, econtext))
> continue;
> }
>
> - /* Determine the partition to heap_insert the tuple into */
> + /* Determine the partition to table_insert the tuple into */
> if (proute)
> {
> TupleConversionMap *map;
> @@ -2739,80 +3125,10 @@ CopyFrom(CopyState cstate)
> * if the found partition is not suitable for INSERTs.
> */
> resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
> - proute, slot, estate);
> + proute, myslot, estate);
>
> if (prevResultRelInfo != resultRelInfo)
> {
> - /* Check if we can multi-insert into this partition */
> - if (insertMethod == CIM_MULTI_CONDITIONAL)
> - {
> - /*
> - * When performing bulk-inserts into partitioned tables we
> - * must insert the tuples seen so far to the heap whenever
> - * the partition changes.
> - */
> - if (nBufferedTuples > 0)
> - {
> - MemoryContext oldcontext;
> -
> - CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> - prevResultRelInfo, myslot, bistate,
> - nBufferedTuples, bufferedTuples,
> - firstBufferedLineNo);
> - nBufferedTuples = 0;
> - bufferedTuplesSize = 0;
> -
> - /*
> - * The tuple is already allocated in the batch context, which
> - * we want to reset. So to keep the tuple we copy it into the
> - * short-lived (per-tuple) context, reset the batch context
> - * and then copy it back into the per-batch one.
> - */
> - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
> - tuple = heap_copytuple(tuple);
> - MemoryContextSwitchTo(oldcontext);
> -
> - /* cleanup the old batch */
> - MemoryContextReset(batchcontext);
> -
> - /* copy the tuple back to the per-batch context */
> - oldcontext = MemoryContextSwitchTo(batchcontext);
> - tuple = heap_copytuple(tuple);
> - MemoryContextSwitchTo(oldcontext);
> -
> - /*
> - * Also push the tuple copy to the slot (resetting the context
> - * invalidated the slot contents).
> - */
> - ExecStoreHeapTuple(tuple, slot, false);
> - }
> -
> - nPartitionChanges++;
> -
> - /*
> - * Here we adaptively enable multi-inserts based on the
> - * average number of tuples from recent multi-insert
> - * batches. We recalculate the average every
> - * RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking
> - * the average over the whole copy. This allows us to
> - * enable multi-inserts when we get periods in the copy
> - * stream that have tuples commonly belonging to the same
> - * partition, and disable when the partition is changing
> - * too often.
> - */
> - if (unlikely(lastPartitionSampleLineNo <= (cstate->cur_lineno -
> - RECHECK_MULTI_INSERT_THRESHOLD)
> - && cstate->cur_lineno >= RECHECK_MULTI_INSERT_THRESHOLD))
> - {
> - avgTuplesPerPartChange =
> - (cstate->cur_lineno - lastPartitionSampleLineNo) /
> - (double) nPartitionChanges;
> -
> - lastPartitionSampleLineNo = cstate->cur_lineno;
> - nPartitionChanges = 0;
> - }
> - }
> -
> /* Determine which triggers exist on this partition */
> has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
> resultRelInfo->ri_TrigDesc->trig_insert_before_row);
> @@ -2821,22 +3137,19 @@ CopyFrom(CopyState cstate)
> resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
>
> /*
> - * Tests have shown that using multi-inserts when the
> - * partition changes on every tuple slightly decreases the
> - * performance, however, there are benefits even when only
> - * some batches have just 2 tuples, so let's enable
> - * multi-inserts even when the average is quite low.
> + * Enable multi-inserts when the partition has BEFORE/INSTEAD
> + * OF triggers, or if the partition is a foreign partition.
> */
> leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
> - avgTuplesPerPartChange >= 1.3 &&
> !has_before_insert_row_trig &&
> !has_instead_insert_row_trig &&
> resultRelInfo->ri_FdwRoutine == NULL;
>
> - /*
> - * We'd better make the bulk insert mechanism gets a new
> - * buffer when the partition being inserted into changes.
> - */
> + /* Set the multi-insert buffer to use for this partition. */
> + if (leafpart_use_multi_insert)
> + CopyMultiInsertInfo_SetCurrentBuffer(&multiInsertInfo,
> + resultRelInfo);
> +
> ReleaseBulkInsertStatePin(bistate);
> prevResultRelInfo = resultRelInfo;
> }
> @@ -2879,26 +3192,48 @@ CopyFrom(CopyState cstate)
> * rowtype.
> */
> map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
> - if (map != NULL)
> + if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
> {
> - TupleTableSlot *new_slot;
> - MemoryContext oldcontext;
> -
> - new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
> - Assert(new_slot != NULL);
> -
> - slot = execute_attr_map_slot(map->attrMap, slot, new_slot);
> + /* non batch insert */
> + if (map != NULL)
> + {
> + TupleTableSlot *new_slot;
>
> + new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
> + myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
> + }
> + }
> + else
> + {
> /*
> - * Get the tuple in the per-batch context, so that it will be
> - * freed after each batch insert.
> + * Batch insert into partitioned table.
> */
> - oldcontext = MemoryContextSwitchTo(batchcontext);
> - tuple = ExecCopySlotHeapTuple(slot);
> - MemoryContextSwitchTo(oldcontext);
> + TupleTableSlot *nextslot;
> +
> + /* no other path available for partitioned table */
> + Assert(insertMethod == CIM_MULTI_CONDITIONAL);
> +
> + nextslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
> + resultRelInfo);
> +
> + if (map != NULL)
> + myslot = execute_attr_map_slot(map->attrMap, myslot, nextslot);
> + else
> + {
> + /*
> + * This looks more expensive than it is (Believe me, I
> + * optimized it away. Twice). The input is in virtual
> + * form, and we'll materialize the slot below - for most
> + * slot types the copy performs the work materialization
> + * would later require anyway.
> + */
> + ExecCopySlot(nextslot, myslot);
> + myslot = nextslot;
> + }
> }
>
> - slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
> + /* ensure that triggers etc see the right relation */
> + myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
> }
>
> skip_tuple = false;
> @@ -2906,7 +3241,7 @@ CopyFrom(CopyState cstate)
> /* BEFORE ROW INSERT Triggers */
> if (has_before_insert_row_trig)
> {
> - if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
> + if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
> skip_tuple = true; /* "do nothing" */
> }
>
> @@ -2919,7 +3254,7 @@ CopyFrom(CopyState cstate)
> */
> if (has_instead_insert_row_trig)
> {
> - ExecIRInsertTriggers(estate, resultRelInfo, slot);
> + ExecIRInsertTriggers(estate, resultRelInfo, myslot);
> }
> else
> {
> @@ -2931,12 +3266,7 @@ CopyFrom(CopyState cstate)
> */
> if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
> resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
> - {
> - ExecComputeStoredGenerated(estate, slot);
> - MemoryContextSwitchTo(batchcontext);
> - tuple = ExecCopySlotHeapTuple(slot);
> - MemoryContextSwitchTo(oldcontext);
> - }
> + ExecComputeStoredGenerated(estate, myslot);
>
> /*
> * If the target is a plain table, check the constraints of
> @@ -2944,7 +3274,7 @@ CopyFrom(CopyState cstate)
> */
> if (resultRelInfo->ri_FdwRoutine == NULL &&
> resultRelInfo->ri_RelationDesc->rd_att->constr)
> - ExecConstraints(resultRelInfo, slot, estate);
> + ExecConstraints(resultRelInfo, myslot, estate);
>
> /*
> * Also check the tuple against the partition constraint, if
> @@ -2954,7 +3284,7 @@ CopyFrom(CopyState cstate)
> */
> if (resultRelInfo->ri_PartitionCheck &&
> (proute == NULL || has_before_insert_row_trig))
> - ExecPartitionCheck(resultRelInfo, slot, estate, true);
> + ExecPartitionCheck(resultRelInfo, myslot, estate, true);
>
> /*
> * Perform multi-inserts when enabled, or when loading a
> @@ -2963,31 +3293,21 @@ CopyFrom(CopyState cstate)
> */
> if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
> {
> - /* Add this tuple to the tuple buffer */
> - if (nBufferedTuples == 0)
> - firstBufferedLineNo = cstate->cur_lineno;
> - bufferedTuples[nBufferedTuples++] = tuple;
> - bufferedTuplesSize += tuple->t_len;
> -
> /*
> - * If the buffer filled up, flush it. Also flush if the
> - * total size of all the tuples in the buffer becomes
> - * large, to avoid using large amounts of memory for the
> - * buffer when the tuples are exceptionally wide.
> + * The slot previously might point into the per-tuple
> + * context. For batching it needs to be longer lived.
> */
> - if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
> - bufferedTuplesSize > 65535)
> - {
> - CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> - resultRelInfo, myslot, bistate,
> - nBufferedTuples, bufferedTuples,
> - firstBufferedLineNo);
> - nBufferedTuples = 0;
> - bufferedTuplesSize = 0;
> -
> - /* free memory occupied by tuples from the batch */
> - MemoryContextReset(batchcontext);
> - }
> + ExecMaterializeSlot(myslot);
> +
> + /* Add this tuple to the tuple buffer */
> + CopyMultiInsertInfo_Store(&multiInsertInfo, myslot,
> + cstate->line_buf.len,
> + cstate->cur_lineno);
> +
> + /* If the buffer filled up, flush it. */
> + if (CopyMultiInsertInfo_IsFull(&multiInsertInfo))
> + CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate,
> + estate, mycid, ti_options);
> }
> else
> {
> @@ -2996,12 +3316,12 @@ CopyFrom(CopyState cstate)
> /* OK, store the tuple */
> if (resultRelInfo->ri_FdwRoutine != NULL)
> {
> - slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
> - resultRelInfo,
> - slot,
> - NULL);
> + myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
> + resultRelInfo,
> + myslot,
> + NULL);
>
> - if (slot == NULL) /* "do nothing" */
> + if (myslot == NULL) /* "do nothing" */
> continue; /* next tuple please */
>
> /*
> @@ -3009,27 +3329,26 @@ CopyFrom(CopyState cstate)
> * column, so (re-)initialize tts_tableOid before
> * evaluating them.
> */
> - slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
> + myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
> }
> else
> {
> - tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
> - heap_insert(resultRelInfo->ri_RelationDesc, tuple,
> - mycid, ti_options, bistate);
> - ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
> - slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
> + /* OK, store the tuple and create index entries for it */
> + table_insert(resultRelInfo->ri_RelationDesc, myslot,
> + mycid, ti_options, bistate);
> }
>
> +
> /* And create index entries for it */
> if (resultRelInfo->ri_NumIndices > 0)
> - recheckIndexes = ExecInsertIndexTuples(slot,
> + recheckIndexes = ExecInsertIndexTuples(myslot,
> estate,
> false,
> NULL,
> NIL);
>
> /* AFTER ROW INSERT Triggers */
> - ExecARInsertTriggers(estate, resultRelInfo, slot,
> + ExecARInsertTriggers(estate, resultRelInfo, myslot,
> recheckIndexes, cstate->transition_capture);
>
> list_free(recheckIndexes);
> @@ -3045,32 +3364,25 @@ CopyFrom(CopyState cstate)
> }
> }
>
> - /* Flush any remaining buffered tuples */
> - if (nBufferedTuples > 0)
> + if (insertMethod != CIM_SINGLE)
> {
> - if (insertMethod == CIM_MULTI_CONDITIONAL)
> - {
> - CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> - prevResultRelInfo, myslot, bistate,
> - nBufferedTuples, bufferedTuples,
> - firstBufferedLineNo);
> - }
> - else
> - CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> - resultRelInfo, myslot, bistate,
> - nBufferedTuples, bufferedTuples,
> - firstBufferedLineNo);
> + /* Flush any remaining buffered tuples */
> + if (!CopyMultiInsertInfo_IsEmpty(&multiInsertInfo))
> + CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate, estate, mycid,
> + ti_options);
> +
> + /* Tear down the multi-insert data */
> + CopyMultiInsertInfo_Cleanup(&multiInsertInfo);
> }
>
> /* Done, clean up */
> error_context_stack = errcallback.previous;
>
> - FreeBulkInsertState(bistate);
> + if (bistate != NULL)
> + ReleaseBulkInsertStatePin(bistate);
>
> MemoryContextSwitchTo(oldcontext);
>
> - MemoryContextDelete(batchcontext);
> -
> /*
> * In the old protocol, tell pqcomm that we can process normal protocol
> * messages again.
> @@ -3084,9 +3396,6 @@ CopyFrom(CopyState cstate)
> /* Handle queued AFTER triggers */
> AfterTriggerEndQuery(estate);
>
> - pfree(values);
> - pfree(nulls);
> -
> ExecResetTupleTable(estate->es_tupleTable, false);
>
> /* Allow the FDW to shut down */
> @@ -3111,88 +3420,6 @@ CopyFrom(CopyState cstate)
> return processed;
> }
>
> -/*
> - * A subroutine of CopyFrom, to write the current batch of buffered heap
> - * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
> - * triggers.
> - */
> -static void
> -CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
> - int ti_options, ResultRelInfo *resultRelInfo,
> - TupleTableSlot *myslot, BulkInsertState bistate,
> - int nBufferedTuples, HeapTuple *bufferedTuples,
> - uint64 firstBufferedLineNo)
> -{
> - MemoryContext oldcontext;
> - int i;
> - uint64 save_cur_lineno;
> - bool line_buf_valid = cstate->line_buf_valid;
> -
> - /*
> - * Print error context information correctly, if one of the operations
> - * below fail.
> - */
> - cstate->line_buf_valid = false;
> - save_cur_lineno = cstate->cur_lineno;
> -
> - /*
> - * heap_multi_insert leaks memory, so switch to short-lived memory context
> - * before calling it.
> - */
> - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
> - heap_multi_insert(resultRelInfo->ri_RelationDesc,
> - bufferedTuples,
> - nBufferedTuples,
> - mycid,
> - ti_options,
> - bistate);
> - MemoryContextSwitchTo(oldcontext);
> -
> - /*
> - * If there are any indexes, update them for all the inserted tuples, and
> - * run AFTER ROW INSERT triggers.
> - */
> - if (resultRelInfo->ri_NumIndices > 0)
> - {
> - for (i = 0; i < nBufferedTuples; i++)
> - {
> - List *recheckIndexes;
> -
> - cstate->cur_lineno = firstBufferedLineNo + i;
> - ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
> - recheckIndexes =
> - ExecInsertIndexTuples(myslot,
> - estate, false, NULL, NIL);
> - ExecARInsertTriggers(estate, resultRelInfo,
> - myslot,
> - recheckIndexes, cstate->transition_capture);
> - list_free(recheckIndexes);
> - }
> - }
> -
> - /*
> - * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
> - * anyway.
> - */
> - else if (resultRelInfo->ri_TrigDesc != NULL &&
> - (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
> - resultRelInfo->ri_TrigDesc->trig_insert_new_table))
> - {
> - for (i = 0; i < nBufferedTuples; i++)
> - {
> - cstate->cur_lineno = firstBufferedLineNo + i;
> - ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
> - ExecARInsertTriggers(estate, resultRelInfo,
> - myslot,
> - NIL, cstate->transition_capture);
> - }
> - }
> -
> - /* reset cur_lineno and line_buf_valid to what they were */
> - cstate->line_buf_valid = line_buf_valid;
> - cstate->cur_lineno = save_cur_lineno;
> -}
> -
> /*
> * Setup to read tuples from a file for COPY FROM.
> *
> @@ -4990,11 +5217,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
> DR_copy *myState = (DR_copy *) self;
> CopyState cstate = myState->cstate;
>
> - /* Make sure the tuple is fully deconstructed */
> - slot_getallattrs(slot);
> -
> - /* And send the data */
> - CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull);
> + /* Send the data */
> + CopyOneRowTo(cstate, slot);
> myState->processed++;
>
> return true;
> diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
> index 4c077755d5..ed0e2de144 100644
> --- a/src/include/access/heapam.h
> +++ b/src/include/access/heapam.h
> @@ -36,6 +36,7 @@
> #define HEAP_INSERT_SPECULATIVE 0x0010
>
> typedef struct BulkInsertStateData *BulkInsertState;
> +struct TupleTableSlot;
>
> #define MaxLockTupleMode LockTupleExclusive
>
> @@ -143,7 +144,7 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
>
> extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
> int options, BulkInsertState bistate);
> -extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
> +extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples,
> CommandId cid, int options, BulkInsertState bistate);
> extern TM_Result heap_delete(Relation relation, ItemPointer tid,
> CommandId cid, Snapshot crosscheck, bool wait,
> diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
> index 4efe178ed1..c2fdedc551 100644
> --- a/src/include/access/tableam.h
> +++ b/src/include/access/tableam.h
> @@ -328,6 +328,9 @@ typedef struct TableAmRoutine
> * ------------------------------------------------------------------------
> */
>
> + void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
> + CommandId cid, int options, struct BulkInsertStateData *bistate);
> +
> /* see table_insert() for reference about parameters */
> void (*tuple_insert) (Relation rel, TupleTableSlot *slot,
> CommandId cid, int options,
> @@ -1157,6 +1160,17 @@ table_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
> lockmode, update_indexes);
> }
>
> +/*
> + * table_multi_insert - insert multiple tuple into a table
> + */
> +static inline void
> +table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
> + CommandId cid, int options, struct BulkInsertStateData *bistate)
> +{
> + rel->rd_tableam->multi_insert(rel, slots, nslots,
> + cid, options, bistate);
> +}
> +
> /*
> * Lock a tuple in the specified mode.
> *

Greetings,

Andres Freund
