[PATCH 1/4] Add "COPY ... TO FUNCTION ..." support

From: Daniel Farina <dfarina(at)truviso(dot)com>
To: pgsql-hackers(at)postgresql(dot)org
Cc: Daniel Farina <dfarina(at)truviso(dot)com>
Subject: [PATCH 1/4] Add "COPY ... TO FUNCTION ..." support
Date: 2009-11-23 21:34:39
Message-ID: 1259012082-6196-2-git-send-email-dfarina@truviso.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

This relatively small change enables all sorts of new and shiny evil by
allowing COPY to be given a function that receives the content of each
produced row in turn. The function must accept a single INTERNAL-type
argument, which is actually a StringInfo.

Patch highlights:

- Grammar production changes to enable "TO FUNCTION <qualified name>"

- A new enumeration value in CopyDest to indicate this new mode
called COPY_FN.

- CopyStateData's "filename" field has been renamed "destination"
and is now a Node type. Before it was either a string or NULL;
now it may be a RangeVar, a string, or NULL. Some code now has to
go through some minor strVal() unboxing for the regular TO '/file'
behavior.

- Due to the relatively restricted way this function can be called
it was possible to reduce per-row overhead by computing the
FunctionCallInfo once and then reusing it, as opposed to simply
using one of the convenience functions in the fmgr.

- Add and expose the makeNameListFromRangeVar procedure in
src/backend/catalog/namespace.c, the inverse of makeRangeVarFromNameList.

Signed-off-by: Daniel Farina <dfarina(at)truviso(dot)com>
---
src/backend/catalog/namespace.c | 21 +++++
src/backend/commands/copy.c | 190 +++++++++++++++++++++++++++++++++-----
src/backend/executor/spi.c | 2 +-
src/backend/nodes/copyfuncs.c | 2 +-
src/backend/nodes/equalfuncs.c | 2 +-
src/backend/parser/gram.y | 30 ++++--
src/include/catalog/namespace.h | 1 +
src/include/nodes/parsenodes.h | 3 +-
8 files changed, 212 insertions(+), 39 deletions(-)

diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 99c9140..8911e29 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -2467,6 +2467,27 @@ QualifiedNameGetCreationNamespace(List *names, char **objname_p)
}

/*
+ * makeNameListFromRangeVar
+ * Utility routine to convert a RangeVar into a qualified-name list.
+ */
+List *
+makeNameListFromRangeVar(RangeVar *rangevar)
+{
+ List *names = NIL;
+
+ Assert(rangevar->relname != NULL);
+ names = lcons(makeString(rangevar->relname), names);
+
+ if (rangevar->schemaname != NULL)
+ names = lcons(makeString(rangevar->schemaname), names);
+
+ if (rangevar->catalogname != NULL)
+ names = lcons(makeString(rangevar->catalogname), names);
+
+ return names;
+}
+
+/*
* makeRangeVarFromNameList
* Utility routine to convert a qualified-name list into RangeVar form.
*/
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 5e95fd7..985505a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -33,6 +33,7 @@
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/planner.h"
+#include "parser/parse_func.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
@@ -55,7 +56,8 @@ typedef enum CopyDest
{
COPY_FILE, /* to/from file */
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
- COPY_NEW_FE /* to/from frontend (3.0 protocol) */
+ COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
+ COPY_FN /* to function */
} CopyDest;

/*
@@ -104,7 +106,8 @@ typedef struct CopyStateData
Relation rel; /* relation to copy to or from */
QueryDesc *queryDesc; /* executable query to copy from */
List *attnumlist; /* integer list of attnums to copy */
- char *filename; /* filename, or NULL for STDIN/STDOUT */
+ Node *destination; /* filename, or NULL for STDIN/STDOUT, or a
+ * function */
bool binary; /* binary format? */
bool oids; /* include OIDs? */
bool csv_mode; /* Comma Separated Value format? */
@@ -131,6 +134,13 @@ typedef struct CopyStateData
MemoryContext rowcontext; /* per-row evaluation context */

/*
+ * For writing rows out to a function. Used if copy_dest == COPY_FN
+ *
+ * Avoids repeated use of DirectFunctionCall for efficiency.
+ */
+ FunctionCallInfoData output_fcinfo;
+
+ /*
* These variables are used to reduce overhead in textual COPY FROM.
*
* attribute_buf holds the separated, de-escaped text for each field of
@@ -425,9 +435,11 @@ CopySendEndOfRow(CopyState cstate)
{
StringInfo fe_msgbuf = cstate->fe_msgbuf;

+ /* Take care adding row delimiters*/
switch (cstate->copy_dest)
{
case COPY_FILE:
+ case COPY_FN:
if (!cstate->binary)
{
/* Default line termination depends on platform */
@@ -437,6 +449,18 @@ CopySendEndOfRow(CopyState cstate)
CopySendString(cstate, "\r\n");
#endif
}
+ break;
+ case COPY_NEW_FE:
+ case COPY_OLD_FE:
+ /* The FE/BE protocol uses \n as newline for all platforms */
+ if (!cstate->binary)
+ CopySendChar(cstate, '\n');
+ break;
+ }
+
+ switch (cstate->copy_dest)
+ {
+ case COPY_FILE:

(void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
1, cstate->copy_file);
@@ -446,10 +470,6 @@ CopySendEndOfRow(CopyState cstate)
errmsg("could not write to COPY file: %m")));
break;
case COPY_OLD_FE:
- /* The FE/BE protocol uses \n as newline for all platforms */
- if (!cstate->binary)
- CopySendChar(cstate, '\n');
-
if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
{
/* no hope of recovering connection sync, so FATAL */
@@ -459,13 +479,19 @@ CopySendEndOfRow(CopyState cstate)
}
break;
case COPY_NEW_FE:
- /* The FE/BE protocol uses \n as newline for all platforms */
- if (!cstate->binary)
- CopySendChar(cstate, '\n');
-
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
+ case COPY_FN:
+ FunctionCallInvoke(&cstate->output_fcinfo);
+
+ /*
+ * These are set earlier and are not supposed to change row to row.
+ */
+ Assert(cstate->output_fcinfo.arg[0] ==
+ PointerGetDatum(cstate->fe_msgbuf));
+ Assert(!cstate->output_fcinfo.argnull[0]);
+ break;
}

resetStringInfo(fe_msgbuf);
@@ -577,6 +603,12 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
bytesread += avail;
}
break;
+ case COPY_FN:
+ /*
+ * Should be disallowed by some prior step
+ */
+ Assert(false);
+ break;
}

return bytesread;
@@ -719,7 +751,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
{
CopyState cstate;
bool is_from = stmt->is_from;
- bool pipe = (stmt->filename == NULL);
+ bool pipe = (stmt->destination == NULL);
List *attnamelist = stmt->attlist;
List *force_quote = NIL;
List *force_notnull = NIL;
@@ -986,6 +1018,14 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
errhint("Anyone can COPY to stdout or from stdin. "
"psql's \\copy command also works for anyone.")));

+ /* Disallow COPY ... FROM FUNCTION (only TO FUNCTION supported) */
+ if (is_from && cstate->destination != NULL &&
+ IsA(cstate->destination, RangeVar))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY FROM does not support functions as sources")));
+
+
if (stmt->relation)
{
Assert(!stmt->query);
@@ -1183,7 +1223,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);

cstate->copy_dest = COPY_FILE; /* default */
- cstate->filename = stmt->filename;
+ cstate->destination = stmt->destination;

if (is_from)
CopyFrom(cstate); /* copy from file to database */
@@ -1225,7 +1265,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
static void
DoCopyTo(CopyState cstate)
{
- bool pipe = (cstate->filename == NULL);
+ bool pipe = (cstate->destination == NULL);

if (cstate->rel)
{
@@ -1257,37 +1297,128 @@ DoCopyTo(CopyState cstate)
else
cstate->copy_file = stdout;
}
- else
+ else if (IsA(cstate->destination, String))
{
mode_t oumask; /* Pre-existing umask value */
struct stat st;
+ char *dest_filename = strVal(cstate->destination);

/*
* Prevent write to relative path ... too easy to shoot oneself in the
* foot by overwriting a database file ...
*/
- if (!is_absolute_path(cstate->filename))
+ if (!is_absolute_path(dest_filename))
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("relative path not allowed for COPY to file")));

oumask = umask((mode_t) 022);
- cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+ cstate->copy_file = AllocateFile(dest_filename, PG_BINARY_W);
umask(oumask);

if (cstate->copy_file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" for writing: %m",
- cstate->filename)));
+ dest_filename)));

fstat(fileno(cstate->copy_file), &st);
if (S_ISDIR(st.st_mode))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a directory", cstate->filename)));
+ errmsg("\"%s\" is a directory", dest_filename)));
}

+ /* Branch taken in the "COPY ... TO FUNCTION funcname" situation */
+ else if (IsA(cstate->destination, RangeVar))
+ {
+ List *names;
+ FmgrInfo *flinfo;
+ FuncDetailCode fdresult;
+ Oid funcid;
+ Oid rettype;
+ bool retset;
+ int nvargs;
+ Oid *true_typeids;
+ const int nargs = 1;
+ Oid argtypes[] = { INTERNALOID };
+
+ /* Flip copy-action dispatch flag */
+ cstate->copy_dest = COPY_FN;
+
+ /* Make an fcinfo that can be reused and is stored on the cstate. */
+ names = makeNameListFromRangeVar((RangeVar *) cstate->destination);
+ flinfo = palloc0(sizeof *flinfo);
+
+
+ fdresult = func_get_detail(names, NIL, NIL, nargs, argtypes, false,
+ false,
+
+ /* Begin out-arguments */
+ &funcid, &rettype, &retset, &nvargs,
+ &true_typeids, NULL);
+
+ /*
+ * Check to ensure that this is a "normal" function when looked up,
+ * otherwise error.
+ */
+ switch (fdresult)
+ {
+ /* Normal function found; do nothing */
+ case FUNCDETAIL_NORMAL:
+ break;
+
+ case FUNCDETAIL_NOTFOUND:
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("function %s does not exist",
+ func_signature_string(names, nargs, NIL,
+ argtypes))));
+ break;
+
+ case FUNCDETAIL_AGGREGATE:
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("function %s must not be an aggregate",
+ func_signature_string(names, nargs, NIL,
+ argtypes))));
+ break;
+
+ case FUNCDETAIL_WINDOWFUNC:
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("function %s must not be a window function",
+ func_signature_string(names, nargs, NIL,
+ argtypes))));
+ break;
+
+ case FUNCDETAIL_COERCION:
+ /*
+ * Should never be yielded from func_get_detail if it is passed
+ * fargs == NIL, as it is previously.
+ */
+ Assert(false);
+ break;
+
+ case FUNCDETAIL_MULTIPLE:
+ /*
+ * Only support one signature, thus overloading of a name with
+ * different types should never occur.
+ */
+ Assert(false);
+ break;
+
+ }
+
+ fmgr_info(funcid, flinfo);
+ InitFunctionCallInfoData(cstate->output_fcinfo, flinfo,
+ 1, NULL, NULL);
+ }
+ else
+ /* Unexpected type was found for cstate->destination. */
+ Assert(false);
+
+
PG_TRY();
{
if (cstate->fe_copy)
@@ -1310,13 +1441,13 @@ DoCopyTo(CopyState cstate)
}
PG_END_TRY();

- if (!pipe)
+ if (!pipe && cstate->copy_dest != COPY_FN)
{
if (FreeFile(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to file \"%s\": %m",
- cstate->filename)));
+ strVal(cstate->destination))));
}
}

@@ -1342,6 +1473,13 @@ CopyTo(CopyState cstate)
/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
cstate->fe_msgbuf = makeStringInfo();

+ /*
+ * fe_msgbuf is never rebound, so there is only a need to set up the
+ * output_fcinfo once.
+ */
+ cstate->output_fcinfo.arg[0] = PointerGetDatum(cstate->fe_msgbuf);
+ cstate->output_fcinfo.argnull[0] = false;
+
/* Get info about the columns we need to process. */
cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
foreach(cur, cstate->attnumlist)
@@ -1668,7 +1806,7 @@ limit_printout_length(const char *str)
static void
CopyFrom(CopyState cstate)
{
- bool pipe = (cstate->filename == NULL);
+ bool pipe = (cstate->destination == NULL);
HeapTuple tuple;
TupleDesc tupDesc;
Form_pg_attribute *attr;
@@ -1768,19 +1906,21 @@ CopyFrom(CopyState cstate)
{
struct stat st;

- cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+ cstate->copy_file = AllocateFile(strVal(cstate->destination),
+ PG_BINARY_R);

if (cstate->copy_file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" for reading: %m",
- cstate->filename)));
+ strVal(cstate->destination))));

fstat(fileno(cstate->copy_file), &st);
if (S_ISDIR(st.st_mode))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a directory", cstate->filename)));
+ errmsg("\"%s\" is a directory",
+ strVal(cstate->destination))));
}

tupDesc = RelationGetDescr(cstate->rel);
@@ -2215,7 +2355,7 @@ CopyFrom(CopyState cstate)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from file \"%s\": %m",
- cstate->filename)));
+ strVal(cstate->destination))));
}

/*
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index f045f9c..0914dc9 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -1829,7 +1829,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
CopyStmt *cstmt = (CopyStmt *) stmt;

- if (cstmt->filename == NULL)
+ if (cstmt->destination == NULL)
{
my_res = SPI_ERROR_COPY;
goto fail;
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 8bc72d1..9b39abe 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2485,7 +2485,7 @@ _copyCopyStmt(CopyStmt *from)
COPY_NODE_FIELD(query);
COPY_NODE_FIELD(attlist);
COPY_SCALAR_FIELD(is_from);
- COPY_STRING_FIELD(filename);
+ COPY_NODE_FIELD(destination);
COPY_NODE_FIELD(options);

return newnode;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3d65d8b..6ddf226 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1085,7 +1085,7 @@ _equalCopyStmt(CopyStmt *a, CopyStmt *b)
COMPARE_NODE_FIELD(query);
COMPARE_NODE_FIELD(attlist);
COMPARE_SCALAR_FIELD(is_from);
- COMPARE_STRING_FIELD(filename);
+ COMPARE_NODE_FIELD(destination);
COMPARE_NODE_FIELD(options);

return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 130e6f4..23331ee 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -251,8 +251,7 @@ static TypeName *TableFuncTypeName(List *columns);
%type <value> TriggerFuncArg
%type <node> TriggerWhen

-%type <str> copy_file_name
- database_name access_method_clause access_method attr_name
+%type <str> database_name access_method_clause access_method attr_name
index_name name cursor_name file_name cluster_index_specification

%type <list> func_name handler_name qual_Op qual_all_Op subquery_Op
@@ -433,6 +432,8 @@ static TypeName *TableFuncTypeName(List *columns);
%type <ival> opt_frame_clause frame_extent frame_bound


+%type <node> copy_file_or_function_name
+
/*
* Non-keyword token types. These are hard-wired into the "flex" lexer.
* They must be listed first so that their numeric codes do not depend on
@@ -1977,14 +1978,15 @@ ClosePortalStmt:
*****************************************************************************/

CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids
- copy_from copy_file_name copy_delimiter opt_with copy_options
+ copy_from copy_file_or_function_name copy_delimiter opt_with
+ copy_options
{
CopyStmt *n = makeNode(CopyStmt);
n->relation = $3;
n->query = NULL;
n->attlist = $4;
n->is_from = $6;
- n->filename = $7;
+ n->destination = $7;

n->options = NIL;
/* Concatenate user-supplied flags */
@@ -1998,14 +2000,15 @@ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids
n->options = list_concat(n->options, $10);
$$ = (Node *)n;
}
- | COPY select_with_parens TO copy_file_name opt_with copy_options
+ | COPY select_with_parens TO copy_file_or_function_name
+ opt_with copy_options
{
CopyStmt *n = makeNode(CopyStmt);
n->relation = NULL;
n->query = $2;
n->attlist = NIL;
n->is_from = false;
- n->filename = $4;
+ n->destination = $4;
n->options = $6;
$$ = (Node *)n;
}
@@ -2021,10 +2024,17 @@ copy_from:
* used depends on the direction. (It really doesn't make sense to copy from
* stdout. We silently correct the "typo".) - AY 9/94
*/
-copy_file_name:
- Sconst { $$ = $1; }
- | STDIN { $$ = NULL; }
- | STDOUT { $$ = NULL; }
+copy_file_or_function_name:
+ Sconst { $$ = (Node *) makeString($1); }
+
+ /*
+ * Note that func_name is not used here because there is no need to
+ * accept the "funcname(TYPES)" construction, as there is only one
+ * valid signature.
+ */
+ | FUNCTION qualified_name { $$ = (Node *) $2; }
+ | STDIN { $$ = NULL; }
+ | STDOUT { $$ = NULL; }
;

copy_options: copy_opt_list { $$ = $1; }
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index d356635..1d801cd 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -94,6 +94,7 @@ extern Oid LookupExplicitNamespace(const char *nspname);

extern Oid LookupCreationNamespace(const char *nspname);
extern Oid QualifiedNameGetCreationNamespace(List *names, char **objname_p);
+extern List *makeNameListFromRangeVar(RangeVar *rangevar);
extern RangeVar *makeRangeVarFromNameList(List *names);
extern char *NameListToString(List *names);
extern char *NameListToQuotedString(List *names);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b34300f..203088c 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1293,7 +1293,8 @@ typedef struct CopyStmt
List *attlist; /* List of column names (as Strings), or NIL
* for all columns */
bool is_from; /* TO or FROM */
- char *filename; /* filename, or NULL for STDIN/STDOUT */
+ Node *destination; /* filename, or NULL for STDIN/STDOUT, or a
+ * function */
List *options; /* List of DefElem nodes */
} CopyStmt;

--
1.6.5.3

In response to

Browse pgsql-hackers by date

  From Date Subject
Next Message Daniel Farina 2009-11-23 21:34:40 [PATCH 2/4] Add tests for "COPY ... TO FUNCTION ..."
Previous Message Daniel Farina 2009-11-23 21:34:38 [PATCH 0/4] COPY to a UDF: "COPY ... TO FUNCTION ..."