[PATCH 1/4] Add "COPY ... TO FUNCTION ..." support - Mailing list pgsql-hackers
| From | Daniel Farina |
|---|---|
| Subject | [PATCH 1/4] Add "COPY ... TO FUNCTION ..." support |
| Date | |
| Msg-id | 1259012082-6196-2-git-send-email-dfarina@truviso.com Whole thread Raw |
| In response to | [PATCH 0/4] COPY to a UDF: "COPY ... TO FUNCTION ..." (Daniel Farina <dfarina@truviso.com>) |
| List | pgsql-hackers |
This relatively small change enables all sorts of new and shiny evil by
allowing specification of a function to COPY that accepts each
produced row's content in turn. The function must accept a single
INTERNAL-type argument, which is actually of the type StringInfo.
Patch highlights:
- Grammar production changes to enable "TO FUNCTION <qualified name>"
- A new enumeration value in CopyDest to indicate this new mode called COPY_FN.
- CopyStateData's "filename" field has been renamed "destination" and is now a Node type. Before it was either a
string or NULL; now it may be a RangeVar, a string, or NULL. Some code now has to go through some minor strVal()
unboxing for the regular TO '/file' behavior.
- Due to the relatively restricted way this function can be called it was possible to reduce per-row overhead by
computing the FunctionCallInfo once and then reusing it, as opposed to simply using one of the convenience functions
in the fmgr.
- Add and expose the makeNameListFromRangeVar procedure to src/backend/catalog/namespace.c, the inverse of
makeRangeVarFromNameList.
Signed-off-by: Daniel Farina <dfarina@truviso.com>
---
 src/backend/catalog/namespace.c |   21 +++++
 src/backend/commands/copy.c     |  190 +++++++++++++++++++++++++++++++++-----
 src/backend/executor/spi.c      |    2 +-
 src/backend/nodes/copyfuncs.c   |    2 +-
 src/backend/nodes/equalfuncs.c  |    2 +-
 src/backend/parser/gram.y       |   30 ++++--
 src/include/catalog/namespace.h |    1 +
 src/include/nodes/parsenodes.h  |    3 +-
 8 files changed, 212 insertions(+), 39 deletions(-)
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 99c9140..8911e29 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -2467,6 +2467,27 @@ QualifiedNameGetCreationNamespace(List *names, char **objname_p)}/*
+ * makeNameListFromRangeVar
+ * Utility routine to convert a RangeVar into a qualified-name list.
+ */
+List *
+makeNameListFromRangeVar(RangeVar *rangevar)
+{
+ List *names = NIL;
+
+ Assert(rangevar->relname != NULL);
+ names = lcons(makeString(rangevar->relname), names);
+
+ if (rangevar->schemaname != NULL)
+ names = lcons(makeString(rangevar->schemaname), names);
+
+ if (rangevar->catalogname != NULL)
+ names = lcons(makeString(rangevar->catalogname), names);
+
+ return names;
+}
+
+/* * makeRangeVarFromNameList * Utility routine to convert a qualified-name list into RangeVar form. */
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 5e95fd7..985505a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -33,6 +33,7 @@#include "mb/pg_wchar.h"#include "miscadmin.h"#include "optimizer/planner.h"
+#include "parser/parse_func.h"#include "parser/parse_relation.h"#include "rewrite/rewriteHandler.h"#include
"storage/fd.h"
@@ -55,7 +56,8 @@ typedef enum CopyDest{ COPY_FILE, /* to/from file */ COPY_OLD_FE,
/* to/from frontend (2.0 protocol) */
- COPY_NEW_FE /* to/from frontend (3.0 protocol) */
+ COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
+ COPY_FN /* to function */} CopyDest;/*
@@ -104,7 +106,8 @@ typedef struct CopyStateData Relation rel; /* relation to copy to or from */
QueryDesc *queryDesc; /* executable query to copy from */ List *attnumlist; /* integer list of
attnumsto copy */
- char *filename; /* filename, or NULL for STDIN/STDOUT */
+ Node *destination; /* filename, or NULL for STDIN/STDOUT, or a
+ * function */ bool binary; /* binary format? */ bool
oids; /* include OIDs? */ bool csv_mode; /* Comma Separated Value format? */
@@ -131,6 +134,13 @@ typedef struct CopyStateData MemoryContext rowcontext; /* per-row evaluation context */
/*
+ * For writing rows out to a function. Used if copy_dest == COPY_FN
+ *
+ * Avoids repeated use of DirectFunctionCall for efficiency.
+ */
+ FunctionCallInfoData output_fcinfo;
+
+ /* * These variables are used to reduce overhead in textual COPY FROM. * * attribute_buf holds the
separated,de-escaped text for each field of
@@ -425,9 +435,11 @@ CopySendEndOfRow(CopyState cstate){ StringInfo fe_msgbuf = cstate->fe_msgbuf;
+ /* Take care adding row delimiters*/ switch (cstate->copy_dest) { case COPY_FILE:
+ case COPY_FN: if (!cstate->binary) { /* Default line termination depends
onplatform */
@@ -437,6 +449,18 @@ CopySendEndOfRow(CopyState cstate) CopySendString(cstate, "\r\n");#endif
}
+ break;
+ case COPY_NEW_FE:
+ case COPY_OLD_FE:
+ /* The FE/BE protocol uses \n as newline for all platforms */
+ if (!cstate->binary)
+ CopySendChar(cstate, '\n');
+ break;
+ }
+
+ switch (cstate->copy_dest)
+ {
+ case COPY_FILE: (void) fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
cstate->copy_file);
@@ -446,10 +470,6 @@ CopySendEndOfRow(CopyState cstate) errmsg("could not write to COPY file:
%m"))); break; case COPY_OLD_FE:
- /* The FE/BE protocol uses \n as newline for all platforms */
- if (!cstate->binary)
- CopySendChar(cstate, '\n');
- if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len)) { /* no hope of recovering
connectionsync, so FATAL */
@@ -459,13 +479,19 @@ CopySendEndOfRow(CopyState cstate) } break; case COPY_NEW_FE:
- /* The FE/BE protocol uses \n as newline for all platforms */
- if (!cstate->binary)
- CopySendChar(cstate, '\n');
- /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d',
fe_msgbuf->data,fe_msgbuf->len); break;
+ case COPY_FN:
+ FunctionCallInvoke(&cstate->output_fcinfo);
+
+ /*
+ * These are set earlier and are not supposed to change row to row.
+ */
+ Assert(cstate->output_fcinfo.arg[0] ==
+ PointerGetDatum(cstate->fe_msgbuf));
+ Assert(!cstate->output_fcinfo.argnull[0]);
+ break; } resetStringInfo(fe_msgbuf);
@@ -577,6 +603,12 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) bytesread +=
avail; } break;
+ case COPY_FN:
+ /*
+ * Should be disallowed by some prior step
+ */
+ Assert(false);
+ break; } return bytesread;
@@ -719,7 +751,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString){ CopyState cstate; bool
is_from= stmt->is_from;
- bool pipe = (stmt->filename == NULL);
+ bool pipe = (stmt->destination == NULL); List *attnamelist = stmt->attlist; List
*force_quote= NIL; List *force_notnull = NIL;
@@ -986,6 +1018,14 @@ DoCopy(const CopyStmt *stmt, const char *queryString) errhint("Anyone can COPY to
stdoutor from stdin. " "psql's \\copy command also works for anyone.")));
+ /* Disallow COPY ... FROM FUNCTION (only TO FUNCTION supported) */
+ if (is_from && cstate->destination != NULL &&
+ IsA(cstate->destination, RangeVar))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY FROM does not support functions as sources")));
+
+ if (stmt->relation) { Assert(!stmt->query);
@@ -1183,7 +1223,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString) cstate->encoding_embeds_ascii =
PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding); cstate->copy_dest = COPY_FILE; /* default */
- cstate->filename = stmt->filename;
+ cstate->destination = stmt->destination; if (is_from) CopyFrom(cstate); /* copy from file to
database*/
@@ -1225,7 +1265,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)static voidDoCopyTo(CopyState cstate){
- bool pipe = (cstate->filename == NULL);
+ bool pipe = (cstate->destination == NULL); if (cstate->rel) {
@@ -1257,37 +1297,128 @@ DoCopyTo(CopyState cstate) else cstate->copy_file = stdout; }
- else
+ else if (IsA(cstate->destination, String)) { mode_t oumask; /* Pre-existing umask value */
struct stat st;
+ char *dest_filename = strVal(cstate->destination); /* * Prevent write to relative path
...too easy to shoot oneself in the * foot by overwriting a database file ... */
- if (!is_absolute_path(cstate->filename))
+ if (!is_absolute_path(dest_filename)) ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME), errmsg("relative path not allowed for COPY to file")));
oumask= umask((mode_t) 022);
- cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+ cstate->copy_file = AllocateFile(dest_filename, PG_BINARY_W); umask(oumask); if
(cstate->copy_file== NULL) ereport(ERROR, (errcode_for_file_access(),
errmsg("could not open file \"%s\" for writing: %m",
- cstate->filename)));
+ dest_filename))); fstat(fileno(cstate->copy_file), &st); if
(S_ISDIR(st.st_mode)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a directory", cstate->filename)));
+ errmsg("\"%s\" is a directory", dest_filename))); }
+ /* Branch taken in the "COPY ... TO FUNCTION funcname" situation */
+ else if (IsA(cstate->destination, RangeVar))
+ {
+ List *names;
+ FmgrInfo *flinfo;
+ FuncDetailCode fdresult;
+ Oid funcid;
+ Oid rettype;
+ bool retset;
+ int nvargs;
+ Oid *true_typeids;
+ const int nargs = 1;
+ Oid argtypes[] = { INTERNALOID };
+
+ /* Flip copy-action dispatch flag */
+ cstate->copy_dest = COPY_FN;
+
+ /* Make an fcinfo that can be reused and is stored on the cstate. */
+ names = makeNameListFromRangeVar((RangeVar *) cstate->destination);
+ flinfo = palloc0(sizeof *flinfo);
+
+
+ fdresult = func_get_detail(names, NIL, NIL, nargs, argtypes, false,
+ false,
+
+ /* Begin out-arguments */
+ &funcid, &rettype, &retset, &nvargs,
+ &true_typeids, NULL);
+
+ /*
+ * Check to ensure that this is a "normal" function when looked up,
+ * otherwise error.
+ */
+ switch (fdresult)
+ {
+ /* Normal function found; do nothing */
+ case FUNCDETAIL_NORMAL:
+ break;
+
+ case FUNCDETAIL_NOTFOUND:
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("function %s does not exist",
+ func_signature_string(names, nargs, NIL,
+ argtypes))));
+ break;
+
+ case FUNCDETAIL_AGGREGATE:
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("function %s must not be an aggregate",
+ func_signature_string(names, nargs, NIL,
+ argtypes))));
+ break;
+
+ case FUNCDETAIL_WINDOWFUNC:
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("function %s must not be a window function",
+ func_signature_string(names, nargs, NIL,
+ argtypes))));
+ break;
+
+ case FUNCDETAIL_COERCION:
+ /*
+ * Should never be yielded from func_get_detail if it is passed
+ * fargs == NIL, as it is previously.
+ */
+ Assert(false);
+ break;
+
+ case FUNCDETAIL_MULTIPLE:
+ /*
+ * Only support one signature, thus overloading of a name with
+ * different types should never occur.
+ */
+ Assert(false);
+ break;
+
+ }
+
+ fmgr_info(funcid, flinfo);
+ InitFunctionCallInfoData(cstate->output_fcinfo, flinfo,
+ 1, NULL, NULL);
+ }
+ else
+ /* Unexpected type was found for cstate->destination. */
+ Assert(false);
+
+ PG_TRY(); { if (cstate->fe_copy)
@@ -1310,13 +1441,13 @@ DoCopyTo(CopyState cstate) } PG_END_TRY();
- if (!pipe)
+ if (!pipe && cstate->copy_dest != COPY_FN) { if (FreeFile(cstate->copy_file)) ereport(ERROR,
(errcode_for_file_access(), errmsg("could not write to file \"%s\": %m",
- cstate->filename)));
+ strVal(cstate->destination)))); }}
@@ -1342,6 +1473,13 @@ CopyTo(CopyState cstate) /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
cstate->fe_msgbuf = makeStringInfo();
+ /*
+ * fe_msgbuf is never rebound, so there is only a need to set up the
+ * output_fcinfo once.
+ */
+ cstate->output_fcinfo.arg[0] = PointerGetDatum(cstate->fe_msgbuf);
+ cstate->output_fcinfo.argnull[0] = false;
+ /* Get info about the columns we need to process. */ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs
*sizeof(FmgrInfo)); foreach(cur, cstate->attnumlist)
@@ -1668,7 +1806,7 @@ limit_printout_length(const char *str)static voidCopyFrom(CopyState cstate){
- bool pipe = (cstate->filename == NULL);
+ bool pipe = (cstate->destination == NULL); HeapTuple tuple; TupleDesc tupDesc;
Form_pg_attribute*attr;
@@ -1768,19 +1906,21 @@ CopyFrom(CopyState cstate) { struct stat st;
- cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+ cstate->copy_file = AllocateFile(strVal(cstate->destination),
+ PG_BINARY_R); if (cstate->copy_file == NULL) ereport(ERROR,
(errcode_for_file_access(), errmsg("could not open file \"%s\" for reading: %m",
- cstate->filename)));
+ strVal(cstate->destination)))); fstat(fileno(cstate->copy_file), &st); if
(S_ISDIR(st.st_mode)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a directory", cstate->filename)));
+ errmsg("\"%s\" is a directory",
+ strVal(cstate->destination)))); } tupDesc = RelationGetDescr(cstate->rel);
@@ -2215,7 +2355,7 @@ CopyFrom(CopyState cstate) ereport(ERROR,
(errcode_for_file_access(), errmsg("could not read from file \"%s\": %m",
- cstate->filename)));
+ strVal(cstate->destination)))); } /*
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index f045f9c..0914dc9 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -1829,7 +1829,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, {
CopyStmt *cstmt = (CopyStmt *) stmt;
- if (cstmt->filename == NULL)
+ if (cstmt->destination == NULL) { my_res =
SPI_ERROR_COPY; goto fail;
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 8bc72d1..9b39abe 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2485,7 +2485,7 @@ _copyCopyStmt(CopyStmt *from) COPY_NODE_FIELD(query); COPY_NODE_FIELD(attlist);
COPY_SCALAR_FIELD(is_from);
- COPY_STRING_FIELD(filename);
+ COPY_NODE_FIELD(destination); COPY_NODE_FIELD(options); return newnode;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3d65d8b..6ddf226 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1085,7 +1085,7 @@ _equalCopyStmt(CopyStmt *a, CopyStmt *b) COMPARE_NODE_FIELD(query);
COMPARE_NODE_FIELD(attlist); COMPARE_SCALAR_FIELD(is_from);
- COMPARE_STRING_FIELD(filename);
+ COMPARE_NODE_FIELD(destination); COMPARE_NODE_FIELD(options); return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 130e6f4..23331ee 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -251,8 +251,7 @@ static TypeName *TableFuncTypeName(List *columns);%type <value> TriggerFuncArg%type <node>
TriggerWhen
-%type <str> copy_file_name
- database_name access_method_clause access_method attr_name
+%type <str> database_name access_method_clause access_method attr_name index_name name
cursor_namefile_name cluster_index_specification%type <list> func_name handler_name qual_Op qual_all_Op subquery_Op
@@ -433,6 +432,8 @@ static TypeName *TableFuncTypeName(List *columns);%type <ival> opt_frame_clause frame_extent
frame_bound
+%type <node> copy_file_or_function_name
+/* * Non-keyword token types. These are hard-wired into the "flex" lexer. * They must be listed first so that their
numericcodes do not depend on
@@ -1977,14 +1978,15 @@ ClosePortalStmt:
*****************************************************************************/CopyStmt: COPY opt_binary
qualified_nameopt_column_list opt_oids
- copy_from copy_file_name copy_delimiter opt_with copy_options
+ copy_from copy_file_or_function_name copy_delimiter opt_with
+ copy_options { CopyStmt *n = makeNode(CopyStmt);
n->relation= $3; n->query = NULL; n->attlist = $4; n->is_from
=$6;
- n->filename = $7;
+ n->destination = $7; n->options = NIL; /* Concatenate
user-suppliedflags */
@@ -1998,14 +2000,15 @@ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids
n->options= list_concat(n->options, $10); $$ = (Node *)n; }
- | COPY select_with_parens TO copy_file_name opt_with copy_options
+ | COPY select_with_parens TO copy_file_or_function_name
+ opt_with copy_options { CopyStmt *n = makeNode(CopyStmt);
n->relation = NULL; n->query = $2; n->attlist = NIL;
n->is_from= false;
- n->filename = $4;
+ n->destination = $4; n->options = $6; $$ = (Node *)n;
}
@@ -2021,10 +2024,17 @@ copy_from: * used depends on the direction. (It really doesn't make sense to copy from *
stdout.We silently correct the "typo".) - AY 9/94 */
-copy_file_name:
- Sconst { $$ = $1; }
- | STDIN { $$ = NULL; }
- | STDOUT { $$ = NULL; }
+copy_file_or_function_name:
+ Sconst { $$ = (Node *) makeString($1); }
+
+ /*
+ * Note that func_name is not used here because there is no need to
+ * accept the "funcname(TYPES)" construction, as there is only one
+ * valid signature.
+ */
+ | FUNCTION qualified_name { $$ = (Node *) $2; }
+ | STDIN { $$ = NULL; }
+ | STDOUT { $$ = NULL; } ;copy_options: copy_opt_list
{ $$ = $1; }
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index d356635..1d801cd 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -94,6 +94,7 @@ extern Oid LookupExplicitNamespace(const char *nspname);extern Oid
LookupCreationNamespace(constchar *nspname);extern Oid QualifiedNameGetCreationNamespace(List *names, char
**objname_p);
+extern List *makeNameListFromRangeVar(RangeVar *rangevar);extern RangeVar *makeRangeVarFromNameList(List
*names);externchar *NameListToString(List *names);extern char *NameListToQuotedString(List *names);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b34300f..203088c 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1293,7 +1293,8 @@ typedef struct CopyStmt List *attlist; /* List of column names (as Strings), or
NIL * for all columns */ bool is_from; /* TO or FROM */
- char *filename; /* filename, or NULL for STDIN/STDOUT */
+ Node *destination; /* filename, or NULL for STDIN/STDOUT, or a
+ * function */ List *options; /* List of DefElem nodes */} CopyStmt;
--
1.6.5.3
pgsql-hackers by date: