Index: src/backend/commands/trigger.c
===================================================================
RCS file: /home/manu/cvsrepo/pgsql/src/backend/commands/trigger.c,v
retrieving revision 1.257
diff -c -r1.257 trigger.c
*** src/backend/commands/trigger.c 20 Nov 2009 20:38:10 -0000 1.257
--- src/backend/commands/trigger.c 22 Nov 2009 16:52:29 -0000
***************
*** 1921,1926 ****
--- 1921,1968 ----
return newtuple;
}
+ HeapTuple
+ ExecARInsertTriggersNow(EState *estate, ResultRelInfo *relinfo,
+ HeapTuple trigtuple)
+ {
+ TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
+ int ntrigs = trigdesc->n_after_row[TRIGGER_EVENT_INSERT];
+ int *tgindx = trigdesc->tg_after_row[TRIGGER_EVENT_INSERT];
+ HeapTuple newtuple = trigtuple;
+ HeapTuple oldtuple;
+ TriggerData LocTriggerData;
+ int i;
+
+ LocTriggerData.type = T_TriggerData;
+ LocTriggerData.tg_event = TRIGGER_EVENT_INSERT |
+ TRIGGER_EVENT_ROW;
+ LocTriggerData.tg_relation = relinfo->ri_RelationDesc;
+ LocTriggerData.tg_newtuple = NULL;
+ LocTriggerData.tg_newtuplebuf = InvalidBuffer;
+ for (i = 0; i < ntrigs; i++)
+ {
+ Trigger *trigger = &trigdesc->triggers[tgindx[i]];
+
+ if (!TriggerEnabled(estate, relinfo, trigger, LocTriggerData.tg_event,
+ NULL, NULL, newtuple))
+ continue;
+
+ LocTriggerData.tg_trigtuple = oldtuple = newtuple;
+ LocTriggerData.tg_trigtuplebuf = InvalidBuffer;
+ LocTriggerData.tg_trigger = trigger;
+ newtuple = ExecCallTriggerFunc(&LocTriggerData,
+ tgindx[i],
+ relinfo->ri_TrigFunctions,
+ relinfo->ri_TrigInstrument,
+ GetPerTupleMemoryContext(estate));
+ if (oldtuple != newtuple && oldtuple != trigtuple)
+ heap_freetuple(oldtuple);
+ if (newtuple == NULL)
+ break;
+ }
+ return newtuple;
+ }
+
void
ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo,
HeapTuple trigtuple, List *recheckIndexes)
Index: src/backend/commands/copy.c
===================================================================
RCS file: /home/manu/cvsrepo/pgsql/src/backend/commands/copy.c,v
retrieving revision 1.318
diff -c -r1.318 copy.c
*** src/backend/commands/copy.c 20 Nov 2009 20:38:10 -0000 1.318
--- src/backend/commands/copy.c 22 Nov 2009 16:52:29 -0000
***************
*** 43,48 ****
--- 43,56 ----
#include "utils/memutils.h"
#include "utils/snapmgr.h"
+ /* For tuple routing */
+ #include "catalog/pg_inherits.h"
+ #include "catalog/pg_inherits_fn.h"
+ #include "nodes/makefuncs.h"
+ #include "nodes/pg_list.h"
+ #include "utils/fmgroids.h"
+ #include "utils/relcache.h"
+ #include "utils/tqual.h"
#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')
***************
*** 117,122 ****
--- 125,131 ----
char *escape; /* CSV escape char (must be 1 byte) */
bool *force_quote_flags; /* per-column CSV FQ flags */
bool *force_notnull_flags; /* per-column CSV FNN flags */
+ bool partitioning; /* tuple routing in table hierarchy */
/* these are just for error messages, see copy_in_error_callback */
const char *cur_relname; /* table name for error messages */
***************
*** 173,178 ****
--- 182,190 ----
} DR_copy;
+ /* List of child tables where tuples were routed (for partitioning option) */
+ List *child_table_lru = NULL;
+
/*
* These macros centralize code used to process line_buf and raw_buf buffers.
* They are macros because they often do continue/break control and to avoid
***************
*** 839,844 ****
--- 851,864 ----
errmsg("argument to option \"%s\" must be a list of column names",
defel->defname)));
}
+ else if (strcmp(defel->defname, "partitioning") == 0)
+ {
+ if (cstate->partitioning)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ cstate->partitioning = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
***************
*** 1662,1667 ****
--- 1682,1949 ----
return res;
}
+ /**
+ * Check that the given tuple matches the constraints of the given child table
+ * and perform an insert if the constraints are matched. insert_tuple specifies
+ * whether the tuple must be inserted into the table when the constraint is satisfied.
+ * The method returns true if the constraint is satisfied (and insert was
+ * performed if insert_tuple is true), false otherwise (constraints not
+ * satisfied for this tuple on this child table).
+ */
+ static bool
+ check_tuple_constraints(Relation child_table_relation, HeapTuple tuple,
+ bool insert_tuple, int hi_options, ResultRelInfo *parentResultRelInfo)
+ {
+ /* Check the constraints */
+ ResultRelInfo *resultRelInfo;
+ TupleTableSlot *slot;
+ EState *estate = CreateExecutorState();
+ bool result = false;
+
+ resultRelInfo = makeNode(ResultRelInfo);
+ resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
+ resultRelInfo->ri_RelationDesc = child_table_relation;
+ resultRelInfo->ri_TrigDesc = CopyTriggerDesc(child_table_relation->trigdesc);
+ if (resultRelInfo->ri_TrigDesc)
+ resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
+ palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
+ resultRelInfo->ri_TrigInstrument = NULL;
+
+ ExecOpenIndices(resultRelInfo);
+
+ estate->es_result_relations = resultRelInfo;
+ estate->es_num_result_relations = 1;
+ estate->es_result_relation_info = resultRelInfo;
+
+ /* Set up a tuple slot too */
+ slot = MakeSingleTupleTableSlot(child_table_relation->rd_att);
+ ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+
+ if (ExecRelCheck(resultRelInfo, slot, estate) == NULL)
+ {
+ /* Constraints satisfied */
+ if (insert_tuple)
+ {
+ /* Insert the row in the child table */
+ List *recheckIndexes = NIL;
+
+ /* BEFORE ROW INSERT Triggers */
+ if (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->n_before_row[TRIGGER_EVENT_INSERT] > 0)
+ {
+ HeapTuple newtuple;
+ newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
+
+ if (newtuple != tuple)
+ {
+ /* tuple modified by Trigger(s), check that the constraint is still valid */
+ heap_freetuple(tuple);
+ tuple = newtuple;
+ ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+ if (ExecRelCheck(resultRelInfo, slot, estate) != NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("Before row insert trigger on table \"%s\" modified partitioning routing decision. Aborting insert.",
+ RelationGetRelationName(child_table_relation))));
+ }
+ }
+ }
+
+ /* OK, store the tuple and create index entries for it */
+ heap_insert(child_table_relation, tuple, GetCurrentCommandId(true),
+ hi_options, NULL);
+
+ /* Update indices */
+ if (resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+ estate, false);
+
+ /* AFTER ROW INSERT Triggers */
+ if (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->n_after_row[TRIGGER_EVENT_INSERT] > 0)
+ {
+ HeapTuple newtuple;
+ newtuple = ExecARInsertTriggersNow(estate, resultRelInfo, tuple);
+ if (newtuple != tuple)
+ {
+ /* tuple modified by Trigger(s), check that the constraint is still valid */
+ heap_freetuple(tuple);
+ tuple = newtuple;
+ ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+ if (ExecRelCheck(resultRelInfo, slot, estate) != NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("After row insert trigger on table \"%s\" modified partitioning routing decision. Aborting insert.",
+ RelationGetRelationName(child_table_relation))));
+ }
+ }
+ }
+ }
+ result = true;
+ }
+
+ /* Free resources */
+ FreeExecutorState(estate);
+ ExecDropSingleTupleTableSlot(slot);
+ ExecCloseIndices(resultRelInfo);
+
+ return result;
+ }
+
+
+ /**
+ * Route a tuple into a child table that matches the constraints of the tuple
+ * to be inserted.
+ * @param parent_relation_id Oid of the parent relation
+ * @param tuple the tuple to be routed
+ */
+ static bool route_tuple_to_child(Relation parent_relation, HeapTuple tuple, int hi_options, ResultRelInfo *parentResultRelInfo)
+ {
+ Relation child_table_relation;
+ bool result = false;
+ Relation catalog_relation;
+ HeapTuple inherits_tuple;
+ HeapScanDesc scan;
+ ScanKeyData key[1];
+
+ /* Try to exploit locality for bulk inserts
+ * We expect consecutive inserts to go to the same child table */
+ if (child_table_lru != NULL)
+ {
+ /* Try the child table LRU */
+ ListCell *child_oid_cell;
+ Oid child_relation_id;
+
+ foreach(child_oid_cell, child_table_lru)
+ {
+ child_relation_id = lfirst_oid(child_oid_cell);
+ child_table_relation = try_relation_open(child_relation_id,
+ RowExclusiveLock);
+
+ if (child_table_relation == NULL)
+ {
+ /* Child table does not exist anymore, purge cache entry */
+ child_table_lru = list_delete_oid(child_table_lru, child_relation_id);
+ if (list_length(child_table_lru) == 0)
+ break; /* Cache is now empty */
+ else
+ { /* Restart scanning */
+ child_oid_cell = list_head(child_table_lru);
+ continue;
+ }
+ }
+
+ if (check_tuple_constraints(child_table_relation, tuple, true, hi_options, parentResultRelInfo))
+ {
+ /* Hit, move in front if not already the head */
+ if (lfirst_oid(list_head(child_table_lru)) != child_relation_id)
+ {
+ { /* The partitioning cache is in the CurTransactionContext */
+ MemoryContext currentContext = MemoryContextSwitchTo(CurTransactionContext);
+ child_table_lru = list_delete_oid(child_table_lru, child_relation_id);
+ child_table_lru = lcons_oid(child_relation_id, child_table_lru);
+ MemoryContextSwitchTo(currentContext);
+ }
+
+ /* Close the relation but keep the lock until the end of
+ * the transaction */
+ relation_close(child_table_relation, NoLock);
+
+ return true;
+ }
+ relation_close(child_table_relation, RowExclusiveLock);
+ }
+ /* We got a miss */
+ }
+
+ /* Looking up child tables */
+ ScanKeyInit(&key[0],
+ Anum_pg_inherits_inhparent,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(parent_relation->rd_id));
+ catalog_relation = heap_open(InheritsRelationId, AccessShareLock);
+ scan = heap_beginscan(catalog_relation, SnapshotNow, 1, key);
+ while ((inherits_tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ TupleConstr *constr;
+ Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inherits_tuple);
+ Oid child_relation_id = inh->inhrelid;
+
+ /* Check if the child table satisfies the constraints; if the relation
+ * cannot be opened this throws an exception */
+ child_table_relation = relation_open(child_relation_id, RowExclusiveLock);
+
+ constr = child_table_relation->rd_att->constr;
+ if (constr->num_check == 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("partition routing found no constraint for relation \"%s\"",
+ RelationGetRelationName(child_table_relation))));
+ }
+
+ if (has_subclass(child_table_relation->rd_id))
+ {
+ /* This is a parent table, check its constraints first */
+ if (check_tuple_constraints(child_table_relation, tuple, false, hi_options, parentResultRelInfo))
+ {
+ /* Constraint satisfied, explore the child tables */
+ result = route_tuple_to_child(child_table_relation, tuple, hi_options, parentResultRelInfo);
+ if (result)
+ {
+ /* Success, one of our child tables matched.
+ * Release the lock on this parent relation, we did not use it */
+ relation_close(child_table_relation, RowExclusiveLock);
+ break;
+ }
+ else
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("tuple matched constraints of relation \"%s\" but none of "
+ "its children",
+ RelationGetRelationName(child_table_relation))));
+ }
+ }
+ }
+ else
+ {
+ /* Child table, try it */
+ result = check_tuple_constraints(child_table_relation, tuple, true, hi_options, parentResultRelInfo);
+ }
+
+ if (result)
+ {
+ MemoryContext currentContext;
+ /* We found the one, update the LRU and exit the loop!
+ *
+ * Close the relation but keep the lock until the end of
+ * the transaction */
+ relation_close(child_table_relation, NoLock);
+
+ /* The partitioning cache is in the CurTransactionContext */
+ currentContext = MemoryContextSwitchTo(CurTransactionContext);
+
+ /* Add the new entry in head of the list (also builds the list if needed) */
+ child_table_lru = lcons_oid(child_relation_id, child_table_lru);
+
+ /* Restore memory context */
+ MemoryContextSwitchTo(currentContext);
+ break;
+ }
+ else
+ {
+ /* Release the lock on that relation, we did not use it */
+ relation_close(child_table_relation, RowExclusiveLock);
+ }
+ }
+ heap_endscan(scan);
+ heap_close(catalog_relation, AccessShareLock);
+ return result;
+ }
+
/*
* Copy FROM file to relation.
*/
***************
*** 2154,2189 ****
{
List *recheckIndexes = NIL;
! /* Place tuple in tuple slot */
! ExecStoreTuple(tuple, slot, InvalidBuffer, false);
!
! /* Check the constraints of the tuple */
! if (cstate->rel->rd_att->constr)
! ExecConstraints(resultRelInfo, slot, estate);
!
! /* OK, store the tuple and create index entries for it */
! heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
! if (resultRelInfo->ri_NumIndices > 0)
! recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
! estate, false);
! /* AFTER ROW INSERT Triggers */
! ExecARInsertTriggers(estate, resultRelInfo, tuple,
! recheckIndexes);
! /*
! * We count only tuples not suppressed by a BEFORE INSERT trigger;
! * this is the same definition used by execMain.c for counting
! * tuples inserted by an INSERT command.
! */
! cstate->processed++;
}
}
/* Done, clean up */
error_context_stack = errcontext.previous;
FreeBulkInsertState(bistate);
MemoryContextSwitchTo(oldcontext);
--- 2436,2503 ----
{
List *recheckIndexes = NIL;
! /* If routing is enabled and table has child tables, let's try routing */
! if (cstate->partitioning && has_subclass(cstate->rel->rd_id))
! {
! if (route_tuple_to_child(cstate->rel, tuple, hi_options, resultRelInfo))
! {
! /* increase the counter so that we return how many
! * tuples got copied into all tables in total */
! cstate->processed++;
! }
! else
! {
! ereport(ERROR, (
! errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
! errmsg("tuple does not satisfy any child table constraint")
! ));
! }
! }
! else
! {
! /* No partitioning, prepare the tuple and
! * check the constraints */
! /* Place tuple in tuple slot */
! ExecStoreTuple(tuple, slot, InvalidBuffer, false);
! /* Check the constraints of the tuple */
! if (cstate->rel->rd_att->constr)
! ExecConstraints(resultRelInfo, slot, estate);
!
! /* OK, store the tuple and create index entries for it */
! heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
!
! if (resultRelInfo->ri_NumIndices > 0)
! recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
! estate, false);
!
! /* AFTER ROW INSERT Triggers */
! ExecARInsertTriggers(estate, resultRelInfo, tuple,
! recheckIndexes);
! /*
! * We count only tuples not suppressed by a BEFORE INSERT trigger;
! * this is the same definition used by execMain.c for counting
! * tuples inserted by an INSERT command.
! */
! cstate->processed++;
! }
}
}
/* Done, clean up */
error_context_stack = errcontext.previous;
+ /* Free the partitioning LRU list if any */
+ if (child_table_lru != NULL)
+ {
+ MemoryContext currentContext = MemoryContextSwitchTo(CurTransactionContext);
+ list_free(child_table_lru);
+ child_table_lru = NULL;
+ MemoryContextSwitchTo(currentContext);
+ }
+
FreeBulkInsertState(bistate);
MemoryContextSwitchTo(oldcontext);
Index: src/include/commands/trigger.h
===================================================================
RCS file: /home/manu/cvsrepo/pgsql/src/include/commands/trigger.h,v
retrieving revision 1.78
diff -c -r1.78 trigger.h
*** src/include/commands/trigger.h 20 Nov 2009 20:38:11 -0000 1.78
--- src/include/commands/trigger.h 22 Nov 2009 16:52:29 -0000
***************
*** 130,135 ****
--- 130,138 ----
extern HeapTuple ExecBRInsertTriggers(EState *estate,
ResultRelInfo *relinfo,
HeapTuple trigtuple);
+ extern HeapTuple ExecARInsertTriggersNow(EState *estate,
+ ResultRelInfo *relinfo,
+ HeapTuple trigtuple);
extern void ExecARInsertTriggers(EState *estate,
ResultRelInfo *relinfo,
HeapTuple trigtuple,
Index: src/include/executor/executor.h
===================================================================
RCS file: /home/manu/cvsrepo/pgsql/src/include/executor/executor.h,v
retrieving revision 1.163
diff -c -r1.163 executor.h
*** src/include/executor/executor.h 26 Oct 2009 02:26:41 -0000 1.163
--- src/include/executor/executor.h 22 Nov 2009 16:52:29 -0000
***************
*** 166,171 ****
--- 166,173 ----
extern bool ExecContextForcesOids(PlanState *planstate, bool *hasoids);
extern void ExecConstraints(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate);
+ extern const char *ExecRelCheck(ResultRelInfo *resultRelInfo,
+ TupleTableSlot *slot, EState *estate);
extern TupleTableSlot *EvalPlanQual(EState *estate, EPQState *epqstate,
Relation relation, Index rti,
ItemPointer tid, TransactionId priorXmax);
Index: src/backend/executor/execMain.c
===================================================================
RCS file: /home/manu/cvsrepo/pgsql/src/backend/executor/execMain.c,v
retrieving revision 1.335
diff -c -r1.335 execMain.c
*** src/backend/executor/execMain.c 20 Nov 2009 20:38:10 -0000 1.335
--- src/backend/executor/execMain.c 22 Nov 2009 16:52:29 -0000
***************
*** 1239,1245 ****
/*
* ExecRelCheck --- check that tuple meets constraints for result relation
*/
! static const char *
ExecRelCheck(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate)
{
--- 1239,1245 ----
/*
* ExecRelCheck --- check that tuple meets constraints for result relation
*/
! const char *
ExecRelCheck(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate)
{
Index: src/test/regress/parallel_schedule
===================================================================
RCS file: /home/manu/cvsrepo/pgsql/src/test/regress/parallel_schedule,v
retrieving revision 1.57
diff -c -r1.57 parallel_schedule
*** src/test/regress/parallel_schedule 24 Aug 2009 03:10:16 -0000 1.57
--- src/test/regress/parallel_schedule 22 Nov 2009 16:52:29 -0000
***************
*** 47,53 ****
# execute two copy tests parallel, to check that copy itself
# is concurrent safe.
# ----------
! test: copy copyselect
# ----------
# Another group of parallel tests
--- 47,55 ----
# execute two copy tests parallel, to check that copy itself
# is concurrent safe.
# ----------
! test: copy copyselect
! test: copy_partitioning
! test: copy_partitioning_trigger
# ----------
# Another group of parallel tests
Index: doc/src/sgml/ref/copy.sgml
===================================================================
RCS file: /home/manu/cvsrepo/pgsql/doc/src/sgml/ref/copy.sgml,v
retrieving revision 1.92
diff -c -r1.92 copy.sgml
*** doc/src/sgml/ref/copy.sgml 21 Sep 2009 20:10:21 -0000 1.92
--- doc/src/sgml/ref/copy.sgml 22 Nov 2009 16:52:29 -0000
***************
*** 41,46 ****
--- 41,47 ----
ESCAPE 'escape_character'
FORCE_QUOTE { ( column [, ...] ) | * }
FORCE_NOT_NULL ( column [, ...] )
+ PARTITIONING [ boolean ]
***************
*** 282,287 ****
--- 283,298 ----
+
+ PARTITIONING>
+
+
+ In PARTITIONING> mode, COPY FROM> a parent
+ table will automatically move each row to the child table that
+ has the matching constraints.
+
+
+
***************
*** 384,389 ****
--- 395,411 ----
VACUUM to recover the wasted space.
+
+ PARTITIONING> mode scans each child table's constraints in the
+ hierarchy to find a match. PARTITIONING> assumes that every child
+ table has at least one constraint defined, otherwise an error is raised. If child
+ tables have overlapping constraints, the row is inserted in the first child table
+ found (be it a cached table or the first table to appear in the lookup).
+ Before or after ROW triggers will generate an error and abort the COPY operation
+ if they modify the tuple value in a way that violates the constraints of the child
+ table where the tuple has been routed.
+
+
***************
*** 828,833 ****
--- 850,993 ----
0000200 M B A B W E 377 377 377 377 377 377
+
+
+ Multiple options are separated by a comma like:
+
+ COPY (SELECT t FROM foo WHERE id = 1) TO STDOUT (FORMAT CSV, HEADER, FORCE_QUOTE (t));
+
+
+
+
+ Partitioning examples
+
+ Here is an example on how to use partitioning. Let's first create a parent
+ table and 3 child tables as follows:
+
+ CREATE TABLE y2008 (
+ id int not null,
+ date date not null,
+ value int
+ );
+
+ CREATE TABLE jan2008 (
+ CHECK ( date >= DATE '2008-01-01' AND date < DATE '2008-02-01' )
+ ) INHERITS (y2008);
+
+ CREATE TABLE feb2008 (
+ CHECK ( date >= DATE '2008-02-01' AND date < DATE '2008-03-01' )
+ ) INHERITS (y2008);
+
+ CREATE TABLE mar2008 (
+ CHECK ( date >= DATE '2008-03-01' AND date < DATE '2008-04-01' )
+ ) INHERITS (y2008);
+
+ We prepare the following data file (1 row for each child table):
+ copy_input.data content:
+
+ 11 '2008-01-10' 11
+ 12 '2008-02-15' 12
+ 13 '2008-03-15' 13
+ 21 '2008-01-10' 11
+ 31 '2008-01-10' 11
+ 41 '2008-01-10' 11
+ 22 '2008-02-15' 12
+ 23 '2008-03-15' 13
+ 32 '2008-02-15' 12
+ 33 '2008-03-15' 13
+ 42 '2008-02-15' 12
+ 43 '2008-03-15' 13
+
+ If we COPY the data in the parent table without partitioning enabled, all
+ rows are inserted in the master table as in this example:
+
+ COPY y2008 FROM 'copy_input.data';
+
+ SELECT COUNT(*) FROM y2008;
+ count
+ -------
+ 12
+ (1 row)
+
+ SELECT COUNT(*) FROM jan2008;
+ count
+ -------
+ 0
+ (1 row)
+
+ SELECT COUNT(*) FROM feb2008;
+ count
+ -------
+ 0
+ (1 row)
+
+ SELECT COUNT(*) FROM mar2008;
+ count
+ -------
+ 0
+ (1 row)
+
+ DELETE FROM y2008;
+
+ If we execute COPY with partitioning enabled, rows are loaded in the
+ appropriate child table automatically as in this example:
+
+ COPY y2008 FROM 'copy_input.data' (PARTITIONING);
+
+ SELECT * FROM y2008;
+ id | date | value
+ ----+------------+-------
+ 11 | 01-10-2008 | 11
+ 21 | 01-10-2008 | 11
+ 31 | 01-10-2008 | 11
+ 41 | 01-10-2008 | 11
+ 12 | 02-15-2008 | 12
+ 22 | 02-15-2008 | 12
+ 32 | 02-15-2008 | 12
+ 42 | 02-15-2008 | 12
+ 13 | 03-15-2008 | 13
+ 23 | 03-15-2008 | 13
+ 33 | 03-15-2008 | 13
+ 43 | 03-15-2008 | 13
+ (12 rows)
+
+ SELECT * FROM jan2008;
+ id | date | value
+ ----+------------+-------
+ 11 | 01-10-2008 | 11
+ 21 | 01-10-2008 | 11
+ 31 | 01-10-2008 | 11
+ 41 | 01-10-2008 | 11
+ (4 rows)
+
+ SELECT * FROM feb2008;
+ id | date | value
+ ----+------------+-------
+ 12 | 02-15-2008 | 12
+ 22 | 02-15-2008 | 12
+ 32 | 02-15-2008 | 12
+ 42 | 02-15-2008 | 12
+ (4 rows)
+
+ SELECT * FROM mar2008;
+ id | date | value
+ ----+------------+-------
+ 13 | 03-15-2008 | 13
+ 23 | 03-15-2008 | 13
+ 33 | 03-15-2008 | 13
+ 43 | 03-15-2008 | 13
+ (4 rows)
+
+ The cache size can be tuned using:
+
+ set copy_partitioning_cache_size = 3;
+
+ Repeating the COPY command will now be faster:
+
+ COPY y2008 FROM 'copy_input.data' (PARTITIONING);
+
+
+
Index: src/test/regress/input/copy_partitioning_trigger.source
===================================================================
RCS file: src/test/regress/input/copy_partitioning_trigger.source
diff -N src/test/regress/input/copy_partitioning_trigger.source
*** /dev/null 1 Jan 1970 00:00:00 -0000
--- src/test/regress/input/copy_partitioning_trigger.source 1 Jan 1970 00:00:00 -0000
***************
*** 0 ****
--- 1,60 ----
+ -- Test triggers with partitioning
+ create table t(i int);
+ create table t1 (check (i > 0 and i <= 1)) inherits (t);
+ create table t2 (check (i > 1 and i <= 2)) inherits (t);
+ create table t3 (check (i > 2 and i <= 3)) inherits (t);
+
+ create table audit(i int);
+
+ create function audit() returns trigger as $$ begin insert into audit(i) values (new.i); return new; end; $$ language plpgsql;
+
+ create trigger t_a after insert on t for each row execute procedure audit();
+ -- the before trigger on the t would get fired
+ -- create trigger t_a2 before insert on t for each row execute procedure audit();
+ create trigger t1_a before insert on t1 for each row execute procedure audit();
+ create trigger t1_a2 after insert on t1 for each row execute procedure audit();
+
+ copy t from stdin with (partitioning);
+ 1
+ 2
+ 3
+ \.
+
+ -- no rows if trigger does not work
+ select * from audit;
+
+ drop table t cascade;
+ drop table audit cascade;
+ drop function audit();
+
+ -- Test bad before row trigger
+ create table t(i int);
+ create table t1 (check (i > 0 and i <= 1)) inherits (t);
+ create table t2 (check (i > 1 and i <= 2)) inherits (t);
+
+ create function i2() returns trigger as $$ begin NEW.i := 2; return NEW; end; $$ language plpgsql;
+ create trigger t1_before before insert on t1 for each row execute procedure i2();
+
+ -- COPY should fail
+ copy t from stdin with (partitioning);
+ 1
+ \.
+
+ drop table t cascade;
+ drop function i2();
+
+ -- Test bad after row trigger
+ create table t(i int);
+ create table t1 (check (i > 0 and i <= 1)) inherits (t);
+ create table t2 (check (i > 1 and i <= 2)) inherits (t);
+
+ create function i2() returns trigger as $$ begin NEW.i := 2; return NEW; end; $$ language plpgsql;
+ create trigger t1_after after insert on t1 for each row execute procedure i2();
+
+ -- COPY should fail
+ copy t from stdin with (partitioning);
+ 1
+ \.
+
+ drop table t cascade;
+ drop function i2();
Index: src/test/regress/data/copy_input.data
===================================================================
RCS file: src/test/regress/data/copy_input.data
diff -N src/test/regress/data/copy_input.data
*** /dev/null 1 Jan 1970 00:00:00 -0000
--- src/test/regress/data/copy_input.data 1 Jan 1970 00:00:00 -0000
***************
*** 0 ****
--- 1,12 ----
+ 11 '2008-01-19' 11
+ 12 '2008-02-15' 12
+ 13 '2008-03-15' 13
+ 21 '2008-01-10' 11
+ 31 '2008-01-10' 11
+ 41 '2008-01-10' 11
+ 22 '2008-02-15' 12
+ 23 '2008-03-15' 13
+ 32 '2008-02-15' 12
+ 33 '2008-03-15' 13
+ 42 '2008-02-15' 12
+ 43 '2008-03-15' 13
Index: src/test/regress/input/copy_partitioning.source
===================================================================
RCS file: src/test/regress/input/copy_partitioning.source
diff -N src/test/regress/input/copy_partitioning.source
*** /dev/null 1 Jan 1970 00:00:00 -0000
--- src/test/regress/input/copy_partitioning.source 1 Jan 1970 00:00:00 -0000
***************
*** 0 ****
--- 1,105 ----
+ -- test 1
+ create table parent(i int);
+ create table c1 (check (i > 0 and i <= 1)) inherits (parent);
+ copy parent from stdin with (partitioning);
+ 1
+ \.
+
+ drop table parent cascade;
+
+ create table parent(i int);
+ create table c1 (check (i > 0 and i <= 1)) inherits (parent);
+ copy parent from stdin with (partitioning);
+ 1
+ \.
+
+ drop table parent cascade;
+
+ -- test 2 (index update check)
+ create table parent(i int, j int);
+ create table c1 (check (i > 0 and i <= 1)) inherits (parent);
+ create table c2 (check (i > 1 and i <= 2)) inherits (parent);
+ create table c3 (check (i > 2 and i <= 3)) inherits (parent);
+
+ create index c1_idx on c1(j);
+ copy (select i % 3 + 1, i from generate_series(1, 1000) s(i)) to '/tmp/parent';
+ copy parent from '/tmp/parent' with (partitioning);
+ analyse;
+
+ set enable_seqscan to false;
+ -- no rows if index was not updated
+ select * from c1 where j = 3;
+
+ set enable_seqscan to true;
+ set enable_indexscan to false;
+ -- 1 row
+ select * from c1 where j = 3;
+ drop table parent cascade;
+
+ -- test 3
+ CREATE TABLE y2008 (
+ id int not null,
+ date date not null,
+ value int,
+ primary key(id)
+ );
+
+ CREATE TABLE jan2008 (
+ CHECK ( date >= DATE '2008-01-01' AND date < DATE '2008-02-01' )
+ ) INHERITS (y2008);
+
+ CREATE TABLE jan2008half1 (
+ CHECK ( date >= DATE '2008-01-01' AND date < DATE '2008-01-15' )
+ ) INHERITS (jan2008);
+
+ CREATE TABLE jan2008half2 (
+ CHECK ( date >= DATE '2008-01-16' AND date < DATE '2008-01-31' )
+ ) INHERITS (jan2008);
+
+ CREATE TABLE feb2008 (
+ CHECK ( date >= DATE '2008-02-01' AND date < DATE '2008-03-01' )
+ ) INHERITS (y2008);
+
+ CREATE TABLE mar2008 (
+ CHECK ( date >= DATE '2008-03-01' AND date < DATE '2008-04-01' )
+ ) INHERITS (y2008);
+
+ COPY y2008 FROM '@abs_srcdir@/data/copy_input.data';
+
+ SELECT COUNT(*) FROM y2008;
+ SELECT COUNT(*) FROM jan2008;
+ SELECT COUNT(*) FROM jan2008half1;
+ SELECT COUNT(*) FROM jan2008half2;
+ SELECT COUNT(*) FROM feb2008;
+ SELECT COUNT(*) FROM mar2008;
+
+ DELETE FROM y2008;
+ COPY y2008 FROM '@abs_srcdir@/data/copy_input.data' (PARTITIONING);
+ SELECT * FROM y2008 ORDER BY id;
+ SELECT * FROM jan2008 ORDER BY id;
+ SELECT * FROM jan2008half1 ORDER BY id;
+ SELECT * FROM jan2008half2 ORDER BY id;
+ SELECT * FROM feb2008 ORDER BY id;
+ SELECT * FROM mar2008 ORDER BY id;
+ DELETE FROM y2008;
+
+ DROP TABLE y2008 CASCADE;
+
+ -- test 4 (cache testing)
+ create table parent1(i int);
+ create table c1 (check (i > 0 and i <= 1)) inherits (parent1);
+ create table parent2(i int);
+ create table c2 (check (i > 0 and i <= 1)) inherits (parent2);
+ copy parent1 from stdin with (partitioning);
+ 1
+ \.
+
+ copy parent2 from stdin with (partitioning);
+ 1
+ \.
+
+ -- If the caching does not work all tuples will go to parent1
+ select * from parent1;
+ select * from parent2;
+ drop table parent1 cascade;
+ drop table parent2 cascade;
Index: src/test/regress/output/copy_partitioning.source
===================================================================
RCS file: src/test/regress/output/copy_partitioning.source
diff -N src/test/regress/output/copy_partitioning.source
*** /dev/null 1 Jan 1970 00:00:00 -0000
--- src/test/regress/output/copy_partitioning.source 1 Jan 1970 00:00:00 -0000
***************
*** 0 ****
--- 1,194 ----
+ -- test 1
+ create table parent(i int);
+ create table c1 (check (i > 0 and i <= 1)) inherits (parent);
+ copy parent from stdin with (partitioning);
+ drop table parent cascade;
+ NOTICE: drop cascades to table c1
+ create table parent(i int);
+ create table c1 (check (i > 0 and i <= 1)) inherits (parent);
+ copy parent from stdin with (partitioning);
+ drop table parent cascade;
+ NOTICE: drop cascades to table c1
+ -- test 2 (index update check)
+ create table parent(i int, j int);
+ create table c1 (check (i > 0 and i <= 1)) inherits (parent);
+ create table c2 (check (i > 1 and i <= 2)) inherits (parent);
+ create table c3 (check (i > 2 and i <= 3)) inherits (parent);
+ create index c1_idx on c1(j);
+ copy (select i % 3 + 1, i from generate_series(1, 1000) s(i)) to '/tmp/parent';
+ copy parent from '/tmp/parent' with (partitioning);
+ analyse;
+ set enable_seqscan to false;
+ -- no rows if index was not updated
+ select * from c1 where j = 3;
+ i | j
+ ---+---
+ 1 | 3
+ (1 row)
+
+ set enable_seqscan to true;
+ set enable_indexscan to false;
+ -- 1 row
+ select * from c1 where j = 3;
+ i | j
+ ---+---
+ 1 | 3
+ (1 row)
+
+ drop table parent cascade;
+ NOTICE: drop cascades to 3 other objects
+ DETAIL: drop cascades to table c1
+ drop cascades to table c2
+ drop cascades to table c3
+ -- test 3
+ CREATE TABLE y2008 (
+ id int not null,
+ date date not null,
+ value int,
+ primary key(id)
+ );
+ NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "y2008_pkey" for table "y2008"
+ CREATE TABLE jan2008 (
+ CHECK ( date >= DATE '2008-01-01' AND date < DATE '2008-02-01' )
+ ) INHERITS (y2008);
+ CREATE TABLE jan2008half1 (
+ CHECK ( date >= DATE '2008-01-01' AND date < DATE '2008-01-15' )
+ ) INHERITS (jan2008);
+ CREATE TABLE jan2008half2 (
+ CHECK ( date >= DATE '2008-01-16' AND date < DATE '2008-01-31' )
+ ) INHERITS (jan2008);
+ CREATE TABLE feb2008 (
+ CHECK ( date >= DATE '2008-02-01' AND date < DATE '2008-03-01' )
+ ) INHERITS (y2008);
+ CREATE TABLE mar2008 (
+ CHECK ( date >= DATE '2008-03-01' AND date < DATE '2008-04-01' )
+ ) INHERITS (y2008);
+ COPY y2008 FROM '@abs_srcdir@/data/copy_input.data';
+ SELECT COUNT(*) FROM y2008;
+ count
+ -------
+ 12
+ (1 row)
+
+ SELECT COUNT(*) FROM jan2008;
+ count
+ -------
+ 0
+ (1 row)
+
+ SELECT COUNT(*) FROM jan2008half1;
+ count
+ -------
+ 0
+ (1 row)
+
+ SELECT COUNT(*) FROM jan2008half2;
+ count
+ -------
+ 0
+ (1 row)
+
+ SELECT COUNT(*) FROM feb2008;
+ count
+ -------
+ 0
+ (1 row)
+
+ SELECT COUNT(*) FROM mar2008;
+ count
+ -------
+ 0
+ (1 row)
+
+ DELETE FROM y2008;
+ COPY y2008 FROM '@abs_srcdir@/data/copy_input.data' (PARTITIONING);
+ SELECT * FROM y2008 ORDER BY id;
+ id | date | value
+ ----+------------+-------
+ 11 | 01-19-2008 | 11
+ 12 | 02-15-2008 | 12
+ 13 | 03-15-2008 | 13
+ 21 | 01-10-2008 | 11
+ 22 | 02-15-2008 | 12
+ 23 | 03-15-2008 | 13
+ 31 | 01-10-2008 | 11
+ 32 | 02-15-2008 | 12
+ 33 | 03-15-2008 | 13
+ 41 | 01-10-2008 | 11
+ 42 | 02-15-2008 | 12
+ 43 | 03-15-2008 | 13
+ (12 rows)
+
+ SELECT * FROM jan2008 ORDER BY id;
+ id | date | value
+ ----+------------+-------
+ 11 | 01-19-2008 | 11
+ 21 | 01-10-2008 | 11
+ 31 | 01-10-2008 | 11
+ 41 | 01-10-2008 | 11
+ (4 rows)
+
+ SELECT * FROM jan2008half1 ORDER BY id;
+ id | date | value
+ ----+------------+-------
+ 21 | 01-10-2008 | 11
+ 31 | 01-10-2008 | 11
+ 41 | 01-10-2008 | 11
+ (3 rows)
+
+ SELECT * FROM jan2008half2 ORDER BY id;
+ id | date | value
+ ----+------------+-------
+ 11 | 01-19-2008 | 11
+ (1 row)
+
+ SELECT * FROM feb2008 ORDER BY id;
+ id | date | value
+ ----+------------+-------
+ 12 | 02-15-2008 | 12
+ 22 | 02-15-2008 | 12
+ 32 | 02-15-2008 | 12
+ 42 | 02-15-2008 | 12
+ (4 rows)
+
+ SELECT * FROM mar2008 ORDER BY id;
+ id | date | value
+ ----+------------+-------
+ 13 | 03-15-2008 | 13
+ 23 | 03-15-2008 | 13
+ 33 | 03-15-2008 | 13
+ 43 | 03-15-2008 | 13
+ (4 rows)
+
+ DELETE FROM y2008;
+ DROP TABLE y2008 CASCADE;
+ NOTICE: drop cascades to 5 other objects
+ DETAIL: drop cascades to table jan2008
+ drop cascades to table jan2008half1
+ drop cascades to table jan2008half2
+ drop cascades to table feb2008
+ drop cascades to table mar2008
+ -- test 4 (cache testing)
+ create table parent1(i int);
+ create table c1 (check (i > 0 and i <= 1)) inherits (parent1);
+ create table parent2(i int);
+ create table c2 (check (i > 0 and i <= 1)) inherits (parent2);
+ copy parent1 from stdin with (partitioning);
+ copy parent2 from stdin with (partitioning);
+ -- If the caching does not work all tuples will go to parent1
+ select * from parent1;
+ i
+ ---
+ 1
+ (1 row)
+
+ select * from parent2;
+ i
+ ---
+ 1
+ (1 row)
+
+ drop table parent1 cascade;
+ NOTICE: drop cascades to table c1
+ drop table parent2 cascade;
+ NOTICE: drop cascades to table c2
Index: src/test/regress/output/copy_partitioning_trigger.source
===================================================================
RCS file: src/test/regress/output/copy_partitioning_trigger.source
diff -N src/test/regress/output/copy_partitioning_trigger.source
*** /dev/null 1 Jan 1970 00:00:00 -0000
--- src/test/regress/output/copy_partitioning_trigger.source 1 Jan 1970 00:00:00 -0000
***************
*** 0 ****
--- 1,58 ----
+ -- Test triggers with partitioning
+ create table t(i int);
+ create table t1 (check (i > 0 and i <= 1)) inherits (t);
+ create table t2 (check (i > 1 and i <= 2)) inherits (t);
+ create table t3 (check (i > 2 and i <= 3)) inherits (t);
+ create table audit(i int);
+ create function audit() returns trigger as $$ begin insert into audit(i) values (new.i); return new; end; $$ language plpgsql;
+ create trigger t_a after insert on t for each row execute procedure audit();
+ -- a before row trigger on the parent table t would also get fired
+ -- create trigger t_a2 before insert on t for each row execute procedure audit();
+ create trigger t1_a before insert on t1 for each row execute procedure audit();
+ create trigger t1_a2 after insert on t1 for each row execute procedure audit();
+ copy t from stdin with (partitioning);
+ -- no rows if trigger does not work
+ select * from audit;
+ i
+ ---
+ 1
+ 1
+ (2 rows)
+
+ drop table t cascade;
+ NOTICE: drop cascades to 3 other objects
+ DETAIL: drop cascades to table t1
+ drop cascades to table t2
+ drop cascades to table t3
+ drop table audit cascade;
+ drop function audit();
+ -- Test bad before row trigger
+ create table t(i int);
+ create table t1 (check (i > 0 and i <= 1)) inherits (t);
+ create table t2 (check (i > 1 and i <= 2)) inherits (t);
+ create function i2() returns trigger as $$ begin NEW.i := 2; return NEW; end; $$ language plpgsql;
+ create trigger t1_before before insert on t1 for each row execute procedure i2();
+ -- COPY should fail
+ copy t from stdin with (partitioning);
+ ERROR: Before row insert trigger on table "t1" modified partitioning routing decision. Aborting insert.
+ CONTEXT: COPY t, line 1: "1"
+ drop table t cascade;
+ NOTICE: drop cascades to 2 other objects
+ DETAIL: drop cascades to table t1
+ drop cascades to table t2
+ drop function i2();
+ -- Test bad after row trigger
+ create table t(i int);
+ create table t1 (check (i > 0 and i <= 1)) inherits (t);
+ create table t2 (check (i > 1 and i <= 2)) inherits (t);
+ create function i2() returns trigger as $$ begin NEW.i := 2; return NEW; end; $$ language plpgsql;
+ create trigger t1_after after insert on t1 for each row execute procedure i2();
+ -- COPY should fail
+ copy t from stdin with (partitioning);
+ ERROR: After row insert trigger on table "t1" modified partitioning routing decision. Aborting insert.
+ CONTEXT: COPY t, line 1: "1"
+ drop table t cascade;
+ NOTICE: drop cascades to 2 other objects
+ DETAIL: drop cascades to table t1
+ drop cascades to table t2
+ drop function i2();