diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 4dfedf8..26f81f3 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1065,6 +1065,21 @@ LANGUAGE INTERNAL STRICT IMMUTABLE PARALLEL SAFE AS 'jsonb_insert'; +CREATE OR REPLACE FUNCTION copy_srf( + IN filename text, + IN is_program boolean DEFAULT false, + IN format text DEFAULT null, + IN delimiter text DEFAULT null, + IN null_string text DEFAULT null, + IN header boolean DEFAULT null, + IN quote text DEFAULT null, + IN escape text DEFAULT null, + IN encoding text DEFAULT null) +RETURNS SETOF RECORD +LANGUAGE INTERNAL +VOLATILE ROWS 1000 COST 1000 CALLED ON NULL INPUT +AS 'copy_srf'; + -- The default permissions for functions mean that anyone can execute them. -- A number of functions shouldn't be executable by just anyone, but rather -- than use explicit 'superuser()' checks in those functions, we use the GRANT diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index f9362be..4e6a32c 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -30,6 +30,7 @@ #include "commands/defrem.h" #include "commands/trigger.h" #include "executor/executor.h" +#include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "mb/pg_wchar.h" @@ -286,7 +287,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ -static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, +static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, + TupleDesc rsTupDesc, RawStmt *raw_query, Oid queryRelId, List *attnamelist, List *options); static void EndCopy(CopyState cstate); @@ -562,7 +564,6 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) errmsg("could not read from COPY file: %m"))); break; case COPY_OLD_FE: - /* * We cannot read more than minread bytes (which in practice is 1) * because old 
protocol doesn't have any clear way of separating @@ -967,7 +968,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, PreventCommandIfReadOnly("COPY FROM"); PreventCommandIfParallelMode("COPY FROM"); - cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, + cstate = BeginCopyFrom(pstate, rel, NULL, stmt->filename, stmt->is_program, stmt->attlist, stmt->options); cstate->range_table = range_table; *processed = CopyFrom(cstate); /* copy from file to database */ @@ -1370,6 +1371,7 @@ static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, + TupleDesc rsTupDesc, RawStmt *raw_query, Oid queryRelId, List *attnamelist, @@ -1396,6 +1398,9 @@ BeginCopy(ParseState *pstate, /* Extract options from the statement node tree */ ProcessCopyOptions(pstate, cstate, is_from, options); + /* Cannot have both a relation and a result set */ + Assert(rel == NULL || rsTupDesc == NULL); + /* Process the source/target relation or query */ if (rel) { @@ -1436,6 +1441,14 @@ BeginCopy(ParseState *pstate, cstate->partition_tuple_slot = partition_tuple_slot; } } + else if (rsTupDesc) + { + /* COPY FROM file/program to result set */ + Assert(!raw_query); + Assert(is_from); + Assert(attnamelist == NIL); + tupDesc = rsTupDesc; + } else { List *rewritten; @@ -1802,7 +1815,7 @@ BeginCopyTo(ParseState *pstate, RelationGetRelationName(rel)))); } - cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist, + cstate = BeginCopy(pstate, false, rel, NULL, query, queryRelId, attnamelist, options); oldcontext = MemoryContextSwitchTo(cstate->copycontext); @@ -2875,6 +2888,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, CopyState BeginCopyFrom(ParseState *pstate, Relation rel, + TupleDesc rsTupDesc, const char *filename, bool is_program, List *attnamelist, @@ -2895,13 +2909,14 @@ BeginCopyFrom(ParseState *pstate, MemoryContext oldcontext; bool volatile_defexprs; - cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, 
options); + cstate = BeginCopy(pstate, true, rel, rsTupDesc, NULL, InvalidOid, attnamelist, options); oldcontext = MemoryContextSwitchTo(cstate->copycontext); /* Initialize state variables */ cstate->fe_eof = false; cstate->eol_type = EOL_UNKNOWN; - cstate->cur_relname = RelationGetRelationName(cstate->rel); + if (cstate->rel) + cstate->cur_relname = RelationGetRelationName(cstate->rel); cstate->cur_lineno = 0; cstate->cur_attname = NULL; cstate->cur_attval = NULL; @@ -2913,7 +2928,10 @@ BeginCopyFrom(ParseState *pstate, cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); cstate->raw_buf_index = cstate->raw_buf_len = 0; - tupDesc = RelationGetDescr(cstate->rel); + if (cstate->rel) + tupDesc = RelationGetDescr(cstate->rel); + else + tupDesc = rsTupDesc; attr = tupDesc->attrs; num_phys_attrs = tupDesc->natts; num_defaults = 0; @@ -2950,8 +2968,10 @@ BeginCopyFrom(ParseState *pstate, { /* attribute is NOT to be copied from input */ /* use default value if one exists */ - Expr *defexpr = (Expr *) build_column_default(cstate->rel, - attnum); + Expr *defexpr = NULL; + + if (rel) + defexpr = (Expr *) build_column_default(cstate->rel, attnum); if (defexpr != NULL) { @@ -4740,3 +4760,179 @@ CreateCopyDestReceiver(void) return (DestReceiver *) self; } + +/* + * copy_srf + * + * Set-returning counterpart of COPY FROM: reads rows from a file or program + * through the regular COPY machinery and returns them to the caller as a + * materialized tuplestore instead of loading them into a relation. + * + * The result columns are taken from the caller's expected descriptor + * (rsinfo->expectedDesc); each raw field is converted with the input + * function of the corresponding column type. Option arguments that are + * NULL are simply not passed to BeginCopyFrom(). 
+ */ +Datum +copy_srf(PG_FUNCTION_ARGS) +{ + ParseState *pstate = NULL; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore = NULL; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + FmgrInfo *in_functions; + Oid *typioparams; + Oid in_func_oid; + + CopyState cstate; + int col; + + Datum *values; + bool *nulls; + + char *filename = NULL; + bool is_program = false; + List *options = NIL; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + + if 
(!(rsinfo->allowedModes & SFRM_Materialize) || rsinfo->expectedDesc == NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc); + values = (Datum *) palloc(tupdesc->natts * sizeof(Datum)); + nulls = (bool *) palloc(tupdesc->natts * sizeof(bool)); + in_functions = (FmgrInfo *) palloc(tupdesc->natts * sizeof(FmgrInfo)); + typioparams = (Oid *) palloc(tupdesc->natts * sizeof(Oid)); + + for (col = 0; col < tupdesc->natts; col++) + { + getTypeInputInfo(tupdesc->attrs[col]->atttypid,&in_func_oid,&typioparams[col]); + fmgr_info(in_func_oid,&in_functions[col]); + } + + /*---------- + * Function signature is: + * copy_srf( filename text, + * is_program boolean default false, + * format text default null, + * delimiter text default null, + * null_string text default null, + * header boolean default null, + * quote text default null, + * escape text default null, + * encoding text default null + *---------- + */ + /* param 0: filename */ + if (! PG_ARGISNULL(0)) + filename = TextDatumGetCString(PG_GETARG_TEXT_P(0)); + + /* param 1: is_program */ + if (! PG_ARGISNULL(1)) + is_program = PG_GETARG_BOOL(1); + + if (filename == NULL && is_program) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("is_program cannot be true in pipe mode"))); + + + /* param 2: format - text, csv, binary */ + if (! PG_ARGISNULL(2)) + options = lcons(makeDefElem("format", + (Node *) makeString( TextDatumGetCString(PG_GETARG_TEXT_P(2))),-1), + options); + + /* param 3: delimiter text default E'\t', */ + if (! PG_ARGISNULL(3)) + options = lcons(makeDefElem("delimiter", + (Node *) makeString( TextDatumGetCString(PG_GETARG_TEXT_P(3))),-1), + options); + + /* param 4: null_string text default '\N', */ + if (! 
PG_ARGISNULL(4)) + options = lcons(makeDefElem("null", + (Node *) makeString( TextDatumGetCString(PG_GETARG_TEXT_P(4))),-1), + options); + + /* param 5: header boolean default false, */ + if (! PG_ARGISNULL(5)) + if (PG_GETARG_BOOL(5)) + options = lcons(makeDefElem("header", (Node *) makeString( "true"),-1),options); + + /* param 6: quote text default '"', */ + if (! PG_ARGISNULL(6)) + options = lcons(makeDefElem("quote", + (Node *) makeString( TextDatumGetCString(PG_GETARG_TEXT_P(6))),-1), + options); + + /* param 7: escape text default null, -- defaults to whatever quote is */ + if (! PG_ARGISNULL(7)) + options = lcons(makeDefElem("escape", + (Node *) makeString( TextDatumGetCString(PG_GETARG_TEXT_P(7))),-1), + options); + + /* param 8: encoding text default null */ + if (! PG_ARGISNULL(8)) + options = lcons(makeDefElem("encoding", + (Node *) makeString( TextDatumGetCString(PG_GETARG_TEXT_P(8))),-1), + options); + + /* let the caller know we're sending back a tuplestore */ + rsinfo->returnMode = SFRM_Materialize; + per_query_ctx = fcinfo->flinfo->fn_mcxt; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true,false,work_mem); + + cstate = BeginCopyFrom(pstate, NULL, tupdesc, filename, is_program, NIL, options); + + while(1) + { + char **field_strings; + int field_strings_count; + int col; + + if (! 
NextCopyFromRawFields(cstate,&field_strings,&field_strings_count)) + { + break; + } + if (field_strings_count != tupdesc->natts) + { + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("found %d fields but expected %d on line %d", + field_strings_count, tupdesc->natts, cstate->cur_lineno))); + } + + for (col = 0; col < tupdesc->natts; col++) + { + values[col] = InputFunctionCall(&in_functions[col], + field_strings[col], + typioparams[col], + tupdesc->attrs[col]->atttypmod); + nulls[col] = (field_strings[col] == NULL); + } + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + tuplestore_donestoring(tupstore); + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + EndCopy(cstate); + + MemoryContextSwitchTo(oldcontext); + + return (Datum) 0; +} diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index c1f492b..9fb2ff7 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5359,6 +5359,10 @@ DESCR("pg_controldata init state information as a function"); DATA(insert OID = 3445 ( pg_import_system_collations PGNSP PGUID 12 100 0 0 0 f f f f t f v r 2 0 2278 "16 4089" _null_ _null_ "{if_not_exists,schema}" _null_ _null_ pg_import_system_collations _null_ _null_ _null_ )); DESCR("import collations from operating system"); +DATA(insert OID = 3353 ( copy_srf PGNSP PGUID 12 1 0 0 0 f f f f f t v u 9 0 2249 "25 16 25 25 25 16 25 25 25" _null_ _null_ _null_ _null_ _null_ copy_srf _null_ _null_ _null_ )); +DESCR("set-returning COPY proof of concept"); + + /* * Symbolic values for provolatile column: these indicate whether the result * of a function is dependent *only* on the values of its explicit arguments, diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index d63ca0f..6364604 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -27,8 +27,8 @@ extern void DoCopy(ParseState *state, const CopyStmt *stmt, uint64 *processed); extern void 
ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options); -extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, - bool is_program, List *attnamelist, List *options); +extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, TupleDesc rsTupDesc, + const char *filename, bool is_program, List *attnamelist, List *options); extern void EndCopyFrom(CopyState cstate); extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext, Datum *values, bool *nulls, Oid *tupleOid); @@ -38,4 +38,7 @@ extern void CopyFromErrorCallback(void *arg); extern DestReceiver *CreateCopyDestReceiver(void); +extern Datum copy_srf(PG_FUNCTION_ARGS); + + #endif /* COPY_H */