diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 4dfedf8..ae07cfb 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1065,6 +1065,21 @@ LANGUAGE INTERNAL STRICT IMMUTABLE PARALLEL SAFE AS 'jsonb_insert'; +CREATE OR REPLACE FUNCTION copy_srf( + IN filename text DEFAULT null, + IN is_program boolean DEFAULT false, + IN format text DEFAULT 'text', + IN delimiter text DEFAULT null, + IN null_string text DEFAULT E'\\N', + IN header boolean DEFAULT false, + IN quote text DEFAULT null, + IN escape text DEFAULT null, + IN encoding text DEFAULT null) +RETURNS SETOF RECORD +LANGUAGE INTERNAL +VOLATILE ROWS 1000 COST 1000 CALLED ON NULL INPUT +AS 'copy_srf'; + -- The default permissions for functions mean that anyone can execute them. -- A number of functions shouldn't be executable by just anyone, but rather -- than use explicit 'superuser()' checks in those functions, we use the GRANT diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index f9362be..8e1bd39 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -30,6 +30,7 @@ #include "commands/defrem.h" #include "commands/trigger.h" #include "executor/executor.h" +#include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "mb/pg_wchar.h" @@ -562,7 +563,6 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) errmsg("could not read from COPY file: %m"))); break; case COPY_OLD_FE: - /* * We cannot read more than minread bytes (which in practice is 1) * because old protocol doesn't have any clear way of separating @@ -4740,3 +4740,377 @@ CreateCopyDestReceiver(void) return (DestReceiver *) self; } + +Datum +copy_srf(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore = NULL; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + FmgrInfo *in_functions; + Oid *typioparams; + Oid in_func_oid; + + CopyStateData copy_state; + int col; + + Datum *values; + bool *nulls; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + } + + if (!(rsinfo->allowedModes & SFRM_Materialize) || rsinfo->expectedDesc == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + } + + tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc); + values = (Datum *) palloc(tupdesc->natts * sizeof(Datum)); + nulls = (bool *) palloc(tupdesc->natts * sizeof(bool)); + in_functions = (FmgrInfo *) palloc(tupdesc->natts * sizeof(FmgrInfo)); + typioparams = (Oid *) palloc(tupdesc->natts * sizeof(Oid)); + + for (col = 0; col < tupdesc->natts; col++) + { + getTypeInputInfo(tupdesc->attrs[col]->atttypid,&in_func_oid,&typioparams[col]); + fmgr_info(in_func_oid,&in_functions[col]); + } + + /* + * Function signature is: + * copy_srf( filename text default null, + * is_program boolean default false, + * format text default 'text', + * delimiter text default E'\t' in text mode, ',' in csv mode, + * null_string text default '\N', + * header boolean default false, + * quote text default '"' in csv mode only, + * escape text default null, -- defaults to whatever quote is + * encoding text default null + */ + + /* Mock up a copy state */ + initStringInfo(©_state.line_buf); + initStringInfo(©_state.attribute_buf); + copy_state.fe_msgbuf = makeStringInfo(); + copy_state.oids = false; + copy_state.freeze = false; + + copy_state.need_transcoding = false; + copy_state.encoding_embeds_ascii = false; + copy_state.rel = NULL; + copy_state.queryDesc = NULL; + + /* param 0: filename */ + if (PG_ARGISNULL(0)) + { + copy_state.copy_dest = COPY_NEW_FE; + copy_state.filename = NULL; + } + else + { + copy_state.copy_dest = COPY_FILE; + copy_state.filename = TextDatumGetCString(PG_GETARG_TEXT_P(0)); + } + + /* param 1: is_program */ + if (PG_ARGISNULL(1)) + { + copy_state.is_program = false; + } + else + { + copy_state.is_program = PG_GETARG_BOOL(1); + } + + /* param 2: format - text, csv, binary */ + if (PG_ARGISNULL(2)) + { + copy_state.binary = false; + copy_state.csv_mode = false; + } + else + { + char* format_str = TextDatumGetCString(PG_GETARG_TEXT_P(2)); + if (strcmp(format_str,"text") == 0) + { + copy_state.binary = false; + copy_state.csv_mode = false; + } + else if (strcmp(format_str,"csv") == 0) + { + copy_state.binary = false; + copy_state.csv_mode = true; + } + else if (strcmp(format_str,"binary") == 0) + { + copy_state.binary = true; + copy_state.csv_mode = false; + } + else + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Format must be one of: text csv binary"))); + } + } + + /* param 3: delimiter text default E'\t', */ + if (PG_ARGISNULL(3)) + { + copy_state.delim = copy_state.csv_mode ? "," : "\t"; + } + else + { + if (copy_state.binary) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot specify DELIMITER in BINARY mode"))); + } + copy_state.delim = TextDatumGetCString(PG_GETARG_TEXT_P(3)); + + if (strlen(copy_state.delim) != 1) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY delimiter must be a single one-byte character"))); + } + + /* Disallow end-of-line characters */ + if (strchr(copy_state.delim, '\r') != NULL || + strchr(copy_state.delim, '\n') != NULL) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY delimiter cannot be newline or carriage return"))); + } + } + + /* param 4: null_string text default '\N', */ + if (PG_ARGISNULL(4)) + { + copy_state.null_print = copy_state.csv_mode ? "" : "\\N"; + } + else + { + copy_state.null_print = TextDatumGetCString(PG_GETARG_TEXT_P(4)); + } + copy_state.null_print_len = strlen(copy_state.null_print); + /* NOT SET char *null_print_client; */ + + /* param 5: header boolean default false, */ + if (PG_ARGISNULL(5)) + { + copy_state.header_line = false; + } + else + { + copy_state.header_line = PG_GETARG_BOOL(5); + } + + /* param 6: quote text default '"', */ + if (PG_ARGISNULL(6)) + { + copy_state.quote = "\""; + } + else + { + if (copy_state.csv_mode) + { + if (strlen(copy_state.quote) != 1) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY quote must be a single one-byte character"))); + } + + if (copy_state.delim[0] == copy_state.quote[0]) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY delimiter and quote must be different"))); + } + } + else + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY quote available only in CSV mode"))); + } + copy_state.quote = TextDatumGetCString(PG_GETARG_TEXT_P(6)); + } + + /* param 7: escape text default null, -- defaults to whatever quote is */ + if (PG_ARGISNULL(7)) + { + copy_state.escape = copy_state.quote; + } + else + { + if (copy_state.csv_mode) + { + copy_state.escape = TextDatumGetCString(PG_GETARG_TEXT_P(7)); + if (strlen(copy_state.escape) != 1) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY escape must be a single one-byte character"))); + } + } + else + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY escape available only in CSV mode"))); + } + } + + /* param 8: encoding text default null */ + if (PG_ARGISNULL(8)) + { + copy_state.file_encoding = pg_get_client_encoding(); + } + else + { + char* encoding = TextDatumGetCString(PG_GETARG_TEXT_P(8)); + copy_state.file_encoding = pg_char_to_encoding(encoding); + if (copy_state.file_encoding < 0) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("argument to option \"%s\" must be a valid encoding name", + encoding))); + } + } + + copy_state.max_fields = tupdesc->natts; + copy_state.raw_fields = (char **) palloc(tupdesc->natts * sizeof(char *)); + + /* let the caller know we're sending back a tuplestore */ + rsinfo->returnMode = SFRM_Materialize; + per_query_ctx = fcinfo->flinfo->fn_mcxt; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true,false,work_mem); + + /* open "file" */ + if (copy_state.is_program) + { + copy_state.copy_file = OpenPipeStream(copy_state.filename, PG_BINARY_R); + + if (copy_state.copy_file == NULL) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not execute command \"%s\": %m", + copy_state.filename))); + } + } + else + { + struct stat st; + + copy_state.copy_file = AllocateFile(copy_state.filename, PG_BINARY_R); + if (copy_state.copy_file == NULL) + { + /* copy errno because ereport subfunctions might change it */ + int save_errno = errno; + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for reading: %m", + copy_state.filename), + (save_errno == ENOENT || save_errno == EACCES) ? + errhint("copy_srf instructs the PostgreSQL server process to read a file. " + "You may want a client-side facility such as psql's \\copy.") : 0)); + } + + if (fstat(fileno(copy_state.copy_file), &st)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + copy_state.filename))); + + if (S_ISDIR(st.st_mode)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a directory", copy_state.filename))); + } + + while(1) + { + char **field_strings; + int field_strings_count; + int col; + HeapTuple tuple; + + if (! NextCopyFromRawFields(©_state,&field_strings,&field_strings_count)) + { + break; + } + if (field_strings_count != tupdesc->natts) + { + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("found %d fields but expected %d on line %d", + field_strings_count, tupdesc->natts, copy_state.cur_lineno))); + } + + for (col = 0; col < tupdesc->natts; col++) + { + values[col] = InputFunctionCall(&in_functions[col], + field_strings[col], + typioparams[col], + tupdesc->attrs[col]->atttypmod); + nulls[col] = (field_strings[col] == NULL); + } + + tuple = heap_form_tuple(tupdesc,values,nulls); + //tuple = BuildTupleFromCStrings(attinmeta, field_strings); + tuplestore_puttuple(tupstore, tuple); + } + + /* close "file" */ + if (copy_state.is_program) + { + int pclose_rc; + + pclose_rc = ClosePipeStream(copy_state.copy_file); + if (pclose_rc == -1) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close pipe to external command: %m"))); + else if (pclose_rc != 0) + ereport(ERROR, + (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("program \"%s\" failed", + copy_state.filename), + errdetail_internal("%s", wait_result_to_str(pclose_rc)))); + } + else + { + if (copy_state.filename != NULL && FreeFile(copy_state.copy_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + copy_state.filename))); + } + + tuplestore_donestoring(tupstore); + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); + + return (Datum) 0; +} diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index c1f492b..9fb2ff7 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5359,6 +5359,10 @@ DESCR("pg_controldata init state information as a function"); DATA(insert OID = 3445 ( pg_import_system_collations PGNSP PGUID 12 100 0 0 0 f f f f t f v r 2 0 2278 "16 4089" _null_ _null_ "{if_not_exists,schema}" _null_ _null_ pg_import_system_collations _null_ _null_ _null_ )); DESCR("import collations from operating system"); +DATA(insert OID = 3353 ( copy_srf PGNSP PGUID 12 1 0 0 0 f f f f f t v u 9 0 2249 "25 16 25 25 25 16 25 25 25" _null_ _null_ _null_ _null_ _null_ copy_srf _null_ _null_ _null_ )); +DESCR("set-returning COPY proof of concept"); + + /* * Symbolic values for provolatile column: these indicate whether the result * of a function is dependent *only* on the values of its explicit arguments, diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index d63ca0f..de09841 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -38,4 +38,7 @@ extern void CopyFromErrorCallback(void *arg); extern DestReceiver *CreateCopyDestReceiver(void); +extern Datum copy_srf(PG_FUNCTION_ARGS); + + #endif /* COPY_H */