Re: Make COPY format extendable: Extract COPY TO format implementations - Mailing list pgsql-hackers
From | Sutou Kouhei |
---|---|
Subject | Re: Make COPY format extendable: Extract COPY TO format implementations |
Date | |
Msg-id | 20241127.165344.1671821093365528062.kou@clear-code.com |
In response to | Re: Make COPY format extendable: Extract COPY TO format implementations (Masahiko Sawada <sawada.mshk@gmail.com>) |
List | pgsql-hackers |
Hi,

In <CAD21AoBW5dEv=Gd2iF_BYNZGEsF=3KTG7fpq=vP5qwpC1CAOeA@mail.gmail.com>
  "Re: Make COPY format extendable: Extract COPY TO format implementations" on Mon, 25 Nov 2024 23:10:50 -0800,
  Masahiko Sawada <sawada.mshk@gmail.com> wrote:

>> Custom COPY format extensions need to use
>> CopyToStateData/CopyFromStateData. For example,
>> CopyToStateData::rel is used to retrieve table schema. If we
>> move CopyToStateData to copyto_internal.h not copyapi.h,
>> custom COPY format extensions need to include
>> copyto_internal.h. I feel that it's strange that extensions
>> need to use internal headers.
>>
>> What is your real concern? If you don't want to export
>> CopyToStateData/CopyFromStateData entirely, we can provide
>> accessors only for some members of them.
>
> I'm not against exposing CopyToStateData and CopyFromStateData. My
> concern is that if we move all copy-related structs to copyapi.h,
> other copy-related files would need to include copyapi.h even if the
> file is not related to copy format APIs. IMO copyapi.h should have
> only copy-format-API-related variables structs such as CopyFromRoutine
> and CopyToRoutine and functions that custom COPY format extension can
> utilize to access data source and destination, such as CopyGetData().
>
> When it comes to CopyToStateData and CopyFromStateData, I feel that
> they have mixed fields of common fields (e.g., rel, num_errors, and
> transition_capture) and format-specific fields (e.g., input_buf,
> line_buf, and eol_type). While it makes sense to me that custom copy
> format extensions can access the common fields, it seems odd to me
> that they can access text-and-csv-format-specific fields such as
> input_buf. We might want to sort out these fields but it could be a
> huge task.

I understand your concern. How about using
Copy{To,From}StateData::opaque to store
text-and-csv-format-specific data? I feel that this
refactoring doesn't block the 0001/0002 patches.
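
To make the opaque idea a little more concrete, here is a rough standalone sketch. It is only an illustration under assumptions and is not code from the patch set: SketchCopyFromState, TextLikeFormatState, SKETCH_INPUT_BUF_SIZE and the sketch_* functions are hypothetical names, and plain malloc()/free() stand in for palloc() and memory contexts. It only shows how fields such as input_buf could hang off an opaque pointer owned by the format implementation instead of living in the common state struct.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for the common part of CopyFromStateData. */
typedef struct SketchCopyFromState
{
    const char *relname;        /* common field, useful to every format */
    unsigned long num_errors;   /* common field */
    void       *opaque;         /* private to the format implementation */
} SketchCopyFromState;

/* Hypothetical private state that only the text/CSV formats need. */
typedef struct TextLikeFormatState
{
    char       *input_buf;
    size_t      input_buf_len;
    bool        input_reached_eof;
} TextLikeFormatState;

#define SKETCH_INPUT_BUF_SIZE 65536

/* Start callback: allocate the private state and hang it off opaque. */
static bool
sketch_text_like_start(SketchCopyFromState *cstate)
{
    TextLikeFormatState *fstate;

    assert(cstate != NULL && cstate->opaque == NULL);

    fstate = calloc(1, sizeof(*fstate));
    if (fstate == NULL)
        return false;

    fstate->input_buf = malloc(SKETCH_INPUT_BUF_SIZE + 1);
    if (fstate->input_buf == NULL)
    {
        free(fstate);
        return false;
    }
    fstate->input_buf[0] = '\0';

    cstate->opaque = fstate;
    return true;
}

/* End callback: release the private state again. */
static void
sketch_text_like_end(SketchCopyFromState *cstate)
{
    TextLikeFormatState *fstate = cstate->opaque;

    if (fstate == NULL)
        return;
    free(fstate->input_buf);
    free(fstate);
    cstate->opaque = NULL;
}
```

With a layout like this, a custom format would only see the common fields and its own opaque data, which is roughly the separation discussed in the quoted text.
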
Do you think that this is a blocker of the 0001/0002
patches?

I think that this may block the 0004/0007 patches that
export Copy{To,From}StateData. But we can work on it after
we merge the 0004/0007 patches. Which is preferred?

> Also, I realized that CopyFromTextLikeOneRow() does input function
> calls and handle soft errors based on ON_ERROR and LOG_VERBOSITY
> options. I think these should be done in the core side.

How about extracting the following part in NextCopyFrom() as
a function and providing it to extensions?

----
Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);

cstate->num_errors++;

if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
{
    /*
     * Since we emit line number and column info in the below
     * notice message, we suppress error context information
     * other than the relation name.
     */
    Assert(!cstate->relname_only);
    cstate->relname_only = true;

    if (cstate->cur_attval)
    {
        char *attval;

        attval = CopyLimitPrintoutLength(cstate->cur_attval);
        ereport(NOTICE,
                errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
                       (unsigned long long) cstate->cur_lineno,
                       cstate->cur_attname,
                       attval));
        pfree(attval);
    }
    else
        ereport(NOTICE,
                errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
                       (unsigned long long) cstate->cur_lineno,
                       cstate->cur_attname));

    /* reset relname_only */
    cstate->relname_only = false;
}
----

See the attached v27 patch set for this idea.

0001-0008 are almost the same as the v26 patch
set. ("format" -> "FORMAT" in COPY test changes are
included.)

0009 exports the above code as
CopyFromSkipErrorRow(). Extensions should call it when they
use errsave() for a soft error in the CopyFromOneRow callback.

Thanks,
-- 
kou

From b95060713e5cfccc8b3db5acb34d352f18a8b1e2 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sat, 28 Sep 2024 23:24:49 +0900
Subject: [PATCH v27 1/9] Refactor COPY TO to use format callback functions.

This commit introduces a new CopyToRoutine struct, which is a set of
callback routines to copy tuples in a specific format. It also makes
the existing formats (text, CSV, and binary) utilize these format
callbacks.

This change is a preliminary step towards making the COPY TO command
extensible in terms of output formats.

Additionally, this refactoring contributes to a performance
improvement by reducing the number of "if" branches that need to be
checked on a per-row basis when sending field representations in text
or CSV mode. The performance benchmark results showed ~5% performance
gain in text or CSV mode.

Author: Sutou Kouhei
Reviewed-by: Michael Paquier, Tomas Vondra, Masahiko Sawada
Reviewed-by: Junwang Zhao
Discussion: https://postgr.es/m/20231204.153548.2126325458835528809.kou@clear-code.com
---
 src/backend/commands/copyto.c    | 441 +++++++++++++++++++++----------
 src/include/commands/copyapi.h   |  57 ++++
 src/tools/pgindent/typedefs.list |   1 +
 3 files changed, 358 insertions(+), 141 deletions(-)
 create mode 100644 src/include/commands/copyapi.h

diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index f55e6d96751..f81dadcc12b 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -19,7 +19,7 @@
 #include <sys/stat.h>
 
 #include "access/tableam.h"
-#include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
@@ -64,6 +64,9 @@ typedef enum CopyDest
  */
 typedef struct CopyToStateData
 {
+    /* format-specific routines */
+    const CopyToRoutine *routine;
+
     /* low-level state data */
     CopyDest copy_dest; /* type of copy source/destination */
     FILE *copy_file; /* used if copy_dest == COPY_FILE */
@@ -114,6 +117,19 @@ static void CopyAttributeOutText(CopyToState cstate, const char *string);
 static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
                                 bool use_quote);
 
+/* built-in format-specific routines */
+static void CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc);
+static void CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
+static void CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot);
+static void CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot);
+static void CopyToTextLikeOneRow(CopyToState cstate, TupleTableSlot *slot,
+                                 bool is_csv);
+static void CopyToTextLikeEnd(CopyToState cstate);
+static void CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc);
+static void CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
+static void CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
+static void CopyToBinaryEnd(CopyToState cstate);
+
 /* Low-level communications functions */
 static void SendCopyBegin(CopyToState cstate);
 static void SendCopyEnd(CopyToState cstate);
@@ -121,9 +137,254 @@ static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
 static void CopySendString(CopyToState cstate, const char *str);
 static void CopySendChar(CopyToState cstate, char c);
 static void CopySendEndOfRow(CopyToState cstate);
+static void CopySendTextLikeEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+/*
+ * COPY TO routines for built-in formats.
+ *
+ * CSV and text formats share the same TextLike routines except for the
+ * one-row callback.
+ */
+
+/* text format */
+static const CopyToRoutine CopyToRoutineText = {
+    .CopyToStart = CopyToTextLikeStart,
+    .CopyToOutFunc = CopyToTextLikeOutFunc,
+    .CopyToOneRow = CopyToTextOneRow,
+    .CopyToEnd = CopyToTextLikeEnd,
+};
+
+/* CSV format */
+static const CopyToRoutine CopyToRoutineCSV = {
+    .CopyToStart = CopyToTextLikeStart,
+    .CopyToOutFunc = CopyToTextLikeOutFunc,
+    .CopyToOneRow = CopyToCSVOneRow,
+    .CopyToEnd = CopyToTextLikeEnd,
+};
+
+/* binary format */
+static const CopyToRoutine CopyToRoutineBinary = {
+    .CopyToStart = CopyToBinaryStart,
+    .CopyToOutFunc = CopyToBinaryOutFunc,
+    .CopyToOneRow = CopyToBinaryOneRow,
+    .CopyToEnd = CopyToBinaryEnd,
+};
+
+/* Return COPY TO routines for the given options */
+static const CopyToRoutine *
+CopyToGetRoutine(CopyFormatOptions opts)
+{
+    if (opts.csv_mode)
+        return &CopyToRoutineCSV;
+    else if (opts.binary)
+        return &CopyToRoutineBinary;
+
+    /* default is text */
+    return &CopyToRoutineText;
+}
+
+/* Implementation of the start callback for text and CSV formats */
+static void
+CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    /*
+     * For non-binary copy, we need to convert null_print to file encoding,
+     * because it will be sent directly with CopySendString.
+     */
+    if (cstate->need_transcoding)
+        cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+                                                          cstate->opts.null_print_len,
+                                                          cstate->file_encoding);
+
+    /* if a header has been requested send the line */
+    if (cstate->opts.header_line)
+    {
+        ListCell *cur;
+        bool hdr_delim = false;
+
+        foreach(cur, cstate->attnumlist)
+        {
+            int attnum = lfirst_int(cur);
+            char *colname;
+
+            if (hdr_delim)
+                CopySendChar(cstate, cstate->opts.delim[0]);
+            hdr_delim = true;
+
+            colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+            if (cstate->opts.csv_mode)
+                CopyAttributeOutCSV(cstate, colname, false);
+            else
+                CopyAttributeOutText(cstate, colname);
+        }
+
+        CopySendTextLikeEndOfRow(cstate);
+    }
+}
+
+/*
+ * Implementation of the outfunc callback for text and CSV formats. Assign
+ * the output function data to the given *finfo.
+ */
+static void
+CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    Oid func_oid;
+    bool is_varlena;
+
+    /* Set output function for an attribute */
+    getTypeOutputInfo(atttypid, &func_oid, &is_varlena);
+    fmgr_info(func_oid, finfo);
+}
+
+/* Implementation of the per-row callback for text format */
+static void
+CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextLikeOneRow(cstate, slot, false);
+}
+
+/* Implementation of the per-row callback for CSV format */
+static void
+CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextLikeOneRow(cstate, slot, true);
+}
+
+/*
+ * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow().
+ *
+ * We use pg_attribute_always_inline to reduce function call overheads.
+ */
+static pg_attribute_always_inline void
+CopyToTextLikeOneRow(CopyToState cstate,
+                     TupleTableSlot *slot,
+                     bool is_csv)
+{
+    bool need_delim = false;
+    FmgrInfo *out_functions = cstate->out_functions;
+
+    foreach_int(attnum, cstate->attnumlist)
+    {
+        Datum value = slot->tts_values[attnum - 1];
+        bool isnull = slot->tts_isnull[attnum - 1];
+
+        if (need_delim)
+            CopySendChar(cstate, cstate->opts.delim[0]);
+        need_delim = true;
+
+        if (isnull)
+        {
+            CopySendString(cstate, cstate->opts.null_print_client);
+        }
+        else
+        {
+            char *string;
+
+            string = OutputFunctionCall(&out_functions[attnum - 1],
+                                        value);
+
+            /*
+             * is_csv will be optimized away by compiler, as argument is
+             * constant at caller.
+             */
+            if (is_csv)
+                CopyAttributeOutCSV(cstate, string,
+                                    cstate->opts.force_quote_flags[attnum - 1]);
+            else
+                CopyAttributeOutText(cstate, string);
+        }
+    }
+
+    CopySendTextLikeEndOfRow(cstate);
+}
+
+/* Implementation of the end callback for text and CSV formats */
+static void
+CopyToTextLikeEnd(CopyToState cstate)
+{
+    /* Nothing to do here */
+}
+
+/*
+ * Implementation of the start callback for binary format. Send a header
+ * for a binary copy.
+ */
+static void
+CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    int32 tmp;
+
+    /* Signature */
+    CopySendData(cstate, BinarySignature, 11);
+    /* Flags field */
+    tmp = 0;
+    CopySendInt32(cstate, tmp);
+    /* No header extension */
+    tmp = 0;
+    CopySendInt32(cstate, tmp);
+}
+
+/*
+ * Implementation of the outfunc callback for binary format. Assign
+ * the binary output function to the given *finfo.
+ */
+static void
+CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    Oid func_oid;
+    bool is_varlena;
+
+    /* Set output function for an attribute */
+    getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena);
+    fmgr_info(func_oid, finfo);
+}
+
+/* Implementation of the per-row callback for binary format */
+static void
+CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    FmgrInfo *out_functions = cstate->out_functions;
+
+    /* Binary per-tuple header */
+    CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+    foreach_int(attnum, cstate->attnumlist)
+    {
+        Datum value = slot->tts_values[attnum - 1];
+        bool isnull = slot->tts_isnull[attnum - 1];
+
+        if (isnull)
+        {
+            CopySendInt32(cstate, -1);
+        }
+        else
+        {
+            bytea *outputbytes;
+
+            outputbytes = SendFunctionCall(&out_functions[attnum - 1],
+                                           value);
+            CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+            CopySendData(cstate, VARDATA(outputbytes),
+                         VARSIZE(outputbytes) - VARHDRSZ);
+        }
+    }
+
+    CopySendEndOfRow(cstate);
+}
+
+/* Implementation of the end callback for binary format */
+static void
+CopyToBinaryEnd(CopyToState cstate)
+{
+    /* Generate trailer for a binary copy */
+    CopySendInt16(cstate, -1);
+    /* Need to flush out the trailer */
+    CopySendEndOfRow(cstate);
+}
 
 /*
  * Send copy start/stop messages for frontend copies. These have changed
@@ -191,16 +452,6 @@ CopySendEndOfRow(CopyToState cstate)
     switch (cstate->copy_dest)
     {
         case COPY_FILE:
-            if (!cstate->opts.binary)
-            {
-                /* Default line termination depends on platform */
-#ifndef WIN32
-                CopySendChar(cstate, '\n');
-#else
-                CopySendString(cstate, "\r\n");
-#endif
-            }
-
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -235,10 +486,6 @@ CopySendEndOfRow(CopyToState cstate)
             }
             break;
         case COPY_FRONTEND:
-            /* The FE/BE protocol uses \n as newline for all platforms */
-            if (!cstate->opts.binary)
-                CopySendChar(cstate, '\n');
-
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
@@ -254,6 +501,35 @@ CopySendEndOfRow(CopyToState cstate)
     resetStringInfo(fe_msgbuf);
 }
 
+/*
+ * Wrapper function of CopySendEndOfRow for text and CSV formats. Sends the
+ * the line termination and do common appropriate things for the end of row.
+ */
+static inline void
+CopySendTextLikeEndOfRow(CopyToState cstate)
+{
+    switch (cstate->copy_dest)
+    {
+        case COPY_FILE:
+            /* Default line termination depends on platform */
+#ifndef WIN32
+            CopySendChar(cstate, '\n');
+#else
+            CopySendString(cstate, "\r\n");
+#endif
+            break;
+        case COPY_FRONTEND:
+            /* The FE/BE protocol uses \n as newline for all platforms */
+            CopySendChar(cstate, '\n');
+            break;
+        default:
+            break;
+    }
+
+    /* Now take the actions related to the end of a row */
+    CopySendEndOfRow(cstate);
+}
+
 /*
  * These functions do apply some data conversion
  */
@@ -426,6 +702,9 @@ BeginCopyTo(ParseState *pstate,
     /* Extract options from the statement node tree */
     ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
 
+    /* Set format routine */
+    cstate->routine = CopyToGetRoutine(cstate->opts);
+
     /* Process the source/target relation or query */
     if (rel)
     {
@@ -771,19 +1050,10 @@ DoCopyTo(CopyToState cstate)
     foreach(cur, cstate->attnumlist)
     {
         int attnum = lfirst_int(cur);
-        Oid out_func_oid;
-        bool isvarlena;
         Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
 
-        if (cstate->opts.binary)
-            getTypeBinaryOutputInfo(attr->atttypid,
-                                    &out_func_oid,
-                                    &isvarlena);
-        else
-            getTypeOutputInfo(attr->atttypid,
-                              &out_func_oid,
-                              &isvarlena);
-        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+        cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
+                                       &cstate->out_functions[attnum - 1]);
     }
 
     /*
@@ -796,56 +1066,7 @@ DoCopyTo(CopyToState cstate)
                                                "COPY TO",
                                                ALLOCSET_DEFAULT_SIZES);
 
-    if (cstate->opts.binary)
-    {
-        /* Generate header for a binary copy */
-        int32 tmp;
-
-        /* Signature */
-        CopySendData(cstate, BinarySignature, 11);
-        /* Flags field */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-        /* No header extension */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-    }
-    else
-    {
-        /*
-         * For non-binary copy, we need to convert null_print to file
-         * encoding, because it will be sent directly with CopySendString.
-         */
-        if (cstate->need_transcoding)
-            cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
-                                                              cstate->opts.null_print_len,
-                                                              cstate->file_encoding);
-
-        /* if a header has been requested send the line */
-        if (cstate->opts.header_line)
-        {
-            bool hdr_delim = false;
-
-            foreach(cur, cstate->attnumlist)
-            {
-                int attnum = lfirst_int(cur);
-                char *colname;
-
-                if (hdr_delim)
-                    CopySendChar(cstate, cstate->opts.delim[0]);
-                hdr_delim = true;
-
-                colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, colname, false);
-                else
-                    CopyAttributeOutText(cstate, colname);
-            }
-
-            CopySendEndOfRow(cstate);
-        }
-    }
+    cstate->routine->CopyToStart(cstate, tupDesc);
 
     if (cstate->rel)
     {
@@ -884,13 +1105,7 @@ DoCopyTo(CopyToState cstate)
         processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
     }
 
-    if (cstate->opts.binary)
-    {
-        /* Generate trailer for a binary copy */
-        CopySendInt16(cstate, -1);
-        /* Need to flush out the trailer */
-        CopySendEndOfRow(cstate);
-    }
+    cstate->routine->CopyToEnd(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -903,74 +1118,18 @@ DoCopyTo(CopyToState cstate)
 /*
  * Emit one row during DoCopyTo().
  */
-static void
+static inline void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
-    FmgrInfo *out_functions = cstate->out_functions;
     MemoryContext oldcontext;
 
     MemoryContextReset(cstate->rowcontext);
     oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
-    if (cstate->opts.binary)
-    {
-        /* Binary per-tuple header */
-        CopySendInt16(cstate, list_length(cstate->attnumlist));
-    }
-
     /* Make sure the tuple is fully deconstructed */
     slot_getallattrs(slot);
 
-    if (!cstate->opts.binary)
-    {
-        bool need_delim = false;
-
-        foreach_int(attnum, cstate->attnumlist)
-        {
-            Datum value = slot->tts_values[attnum - 1];
-            bool isnull = slot->tts_isnull[attnum - 1];
-            char *string;
-
-            if (need_delim)
-                CopySendChar(cstate, cstate->opts.delim[0]);
-            need_delim = true;
-
-            if (isnull)
-                CopySendString(cstate, cstate->opts.null_print_client);
-            else
-            {
-                string = OutputFunctionCall(&out_functions[attnum - 1],
-                                            value);
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, string,
-                                        cstate->opts.force_quote_flags[attnum - 1]);
-                else
-                    CopyAttributeOutText(cstate, string);
-            }
-        }
-    }
-    else
-    {
-        foreach_int(attnum, cstate->attnumlist)
-        {
-            Datum value = slot->tts_values[attnum - 1];
-            bool isnull = slot->tts_isnull[attnum - 1];
-            bytea *outputbytes;
-
-            if (isnull)
-                CopySendInt32(cstate, -1);
-            else
-            {
-                outputbytes = SendFunctionCall(&out_functions[attnum - 1],
-                                               value);
-                CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
-                CopySendData(cstate, VARDATA(outputbytes),
-                             VARSIZE(outputbytes) - VARHDRSZ);
-            }
-        }
-    }
-
-    CopySendEndOfRow(cstate);
+    cstate->routine->CopyToOneRow(cstate, slot);
 
     MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
new file mode 100644
index 00000000000..eccc875d0e8
--- /dev/null
+++ b/src/include/commands/copyapi.h
@@ -0,0 +1,57 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyapi.h
+ *    API for COPY TO handlers
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/copyapi.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COPYAPI_H
+#define COPYAPI_H
+
+#include "commands/copy.h"
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
+
+/*
+ * API structure for a COPY TO format implementation. Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyToRoutine
+{
+    /*
+     * Set output function information. This callback is called once at the
+     * beginning of COPY TO.
+     *
+     * 'finfo' can be optionally filled to provide the catalog information of
+     * the output function.
+     *
+     * 'atttypid' is the OID of data type used by the relation's attribute.
+     */
+    void (*CopyToOutFunc) (CopyToState cstate, Oid atttypid,
+                           FmgrInfo *finfo);
+
+    /*
+     * Start a COPY TO. This callback is called once at the beginning of COPY
+     * FROM.
+     *
+     * 'tupDesc' is the tuple descriptor of the relation from where the data
+     * is read.
+     */
+    void (*CopyToStart) (CopyToState cstate, TupleDesc tupDesc);
+
+    /*
+     * Write one row to the 'slot'.
+     */
+    void (*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot);
+
+    /* End a COPY TO. This callback is called once at the end of COPY FROM */
+    void (*CopyToEnd) (CopyToState cstate);
+} CopyToRoutine;
+
+#endif /* COPYAPI_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b54428b38cd..8edb41cce2e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -503,6 +503,7 @@ CopyMultiInsertInfo
 CopyOnErrorChoice
 CopySource
 CopyStmt
+CopyToRoutine
 CopyToState
 CopyToStateData
 Cost
-- 
2.45.2


From c7eba0bf7bf4c42933b71d98aa6d519af0ce0121 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 18 Nov 2024 16:32:43 -0800
Subject: [PATCH v27 2/9] Refactor COPY FROM to use format callback functions.

This commit introduces a new CopyFromRoutine struct, which is a set of
callback routines to read tuples in a specific format. It also makes
COPY FROM with the existing formats (text, CSV, and binary) utilize
these format callbacks.

This change is a preliminary step towards making the COPY TO command
extensible in terms of output formats.

Similar to XXXX, this refactoring contributes to a performance
improvement by reducing the number of "if" branches that need to be
checked on a per-row basis when sending field representations in text
or CSV mode. The performance benchmark results showed ~5% performance
gain in text or CSV mode.

Author: Sutou Kouhei
Reviewed-by: Michael Paquier, Tomas Vondra, Masahiko Sawada
Reviewed-by: Junwang Zhao
Discussion: https://postgr.es/m/20231204.153548.2126325458835528809.kou@clear-code.com
---
 contrib/file_fdw/file_fdw.c              |   1 -
 src/backend/commands/copyfrom.c          | 190 +++++++--
 src/backend/commands/copyfromparse.c     | 504 +++++++++++++----------
 src/include/commands/copy.h              |   2 -
 src/include/commands/copyapi.h           |  48 ++-
 src/include/commands/copyfrom_internal.h |  13 +-
 src/tools/pgindent/typedefs.list         |   1 +
 7 files changed, 492 insertions(+), 267 deletions(-)

diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 9e2896f32ae..bac31315fcb 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -21,7 +21,6 @@
 #include "access/table.h"
 #include "catalog/pg_authid.h"
 #include "catalog/pg_foreign_table.h"
-#include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 754cb496169..c84081c3ba3 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -106,6 +106,145 @@ typedef struct CopyMultiInsertInfo
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+/*
+ * Built-in format-specific routines. One-row callbacks are defined in
+ * copyfromparse.c
+ */
+static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
+                                   Oid *typioparam);
+static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
+static void CopyFromTextLikeEnd(CopyFromState cstate);
+static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+                                 FmgrInfo *finfo, Oid *typioparam);
+static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
+static void CopyFromBinaryEnd(CopyFromState cstate);
+
+
+/*
+ * COPY FROM routines for built-in formats.
+ *
+ * CSV and text formats share the same TextLike routines except for the
+ * one-row callback.
+ */
+
+/* text format */
+static const CopyFromRoutine CopyFromRoutineText = {
+    .CopyFromInFunc = CopyFromTextLikeInFunc,
+    .CopyFromStart = CopyFromTextLikeStart,
+    .CopyFromOneRow = CopyFromTextOneRow,
+    .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+/* CSV format */
+static const CopyFromRoutine CopyFromRoutineCSV = {
+    .CopyFromInFunc = CopyFromTextLikeInFunc,
+    .CopyFromStart = CopyFromTextLikeStart,
+    .CopyFromOneRow = CopyFromCSVOneRow,
+    .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+/* binary format */
+static const CopyFromRoutine CopyFromRoutineBinary = {
+    .CopyFromInFunc = CopyFromBinaryInFunc,
+    .CopyFromStart = CopyFromBinaryStart,
+    .CopyFromOneRow = CopyFromBinaryOneRow,
+    .CopyFromEnd = CopyFromBinaryEnd,
+};
+
+/* Return COPY FROM routines for the given options */
+static const CopyFromRoutine *
+CopyFromGetRoutine(CopyFormatOptions opts)
+{
+    if (opts.csv_mode)
+        return &CopyFromRoutineCSV;
+    else if (opts.binary)
+        return &CopyFromRoutineBinary;
+
+    /* default is text */
+    return &CopyFromRoutineText;
+}
+
+/* Implementation of the start callback for text and CSV formats */
+static void
+CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    AttrNumber attr_count;
+
+    /*
+     * If encoding conversion is needed, we need another buffer to hold the
+     * converted input data. Otherwise, we can just point input_buf to the
+     * same buffer as raw_buf.
+     */
+    if (cstate->need_transcoding)
+    {
+        cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+        cstate->input_buf_index = cstate->input_buf_len = 0;
+    }
+    else
+        cstate->input_buf = cstate->raw_buf;
+    cstate->input_reached_eof = false;
+
+    initStringInfo(&cstate->line_buf);
+
+    /*
+     * Create workspace for CopyReadAttributes results; used by CSV and text
+     * format.
+     */
+    attr_count = list_length(cstate->attnumlist);
+    cstate->max_fields = attr_count;
+    cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+}
+
+/*
+ * Implementation of the infunc callback for text and CSV formats. Assign
+ * the input function data to the given *finfo.
+ */
+static void
+CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
+                       Oid *typioparam)
+{
+    Oid func_oid;
+
+    getTypeInputInfo(atttypid, &func_oid, typioparam);
+    fmgr_info(func_oid, finfo);
+}
+
+/* Implementation of the end callback for text and CSV formats */
+static void
+CopyFromTextLikeEnd(CopyFromState cstate)
+{
+    /* nothing to do */
+}
+
+/* Implementation of the start callback for binary format */
+static void
+CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    /* Read and verify binary header */
+    ReceiveCopyBinaryHeader(cstate);
+}
+
+/*
+ * Implementation of the infunc callback for binary format. Assign
+ * the binary input function to the given *finfo.
+ */
+static void
+CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+                     FmgrInfo *finfo, Oid *typioparam)
+{
+    Oid func_oid;
+
+    getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
+    fmgr_info(func_oid, finfo);
+}
+
+/* Implementation of the end callback for binary format */
+static void
+CopyFromBinaryEnd(CopyFromState cstate)
+{
+    /* nothing to do */
+}
+
 /*
  * error context callback for COPY FROM
  *
@@ -1396,7 +1535,6 @@ BeginCopyFrom(ParseState *pstate,
                 num_defaults;
     FmgrInfo *in_functions;
     Oid *typioparams;
-    Oid in_func_oid;
     int *defmap;
     ExprState **defexprs;
     MemoryContext oldcontext;
@@ -1428,6 +1566,9 @@ BeginCopyFrom(ParseState *pstate,
     /* Extract options from the statement node tree */
     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
 
+    /* Set the format routine */
+    cstate->routine = CopyFromGetRoutine(cstate->opts);
+
     /* Process the target relation */
     cstate->rel = rel;
 
@@ -1583,25 +1724,6 @@ BeginCopyFrom(ParseState *pstate,
     cstate->raw_buf_index = cstate->raw_buf_len = 0;
     cstate->raw_reached_eof = false;
 
-    if (!cstate->opts.binary)
-    {
-        /*
-         * If encoding conversion is needed, we need another buffer to hold
-         * the converted input data. Otherwise, we can just point input_buf
-         * to the same buffer as raw_buf.
-         */
-        if (cstate->need_transcoding)
-        {
-            cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-            cstate->input_buf_index = cstate->input_buf_len = 0;
-        }
-        else
-            cstate->input_buf = cstate->raw_buf;
-        cstate->input_reached_eof = false;
-
-        initStringInfo(&cstate->line_buf);
-    }
-
     initStringInfo(&cstate->attribute_buf);
 
     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1634,13 +1756,9 @@ BeginCopyFrom(ParseState *pstate,
             continue;
 
         /* Fetch the input function and typioparam info */
-        if (cstate->opts.binary)
-            getTypeBinaryInputInfo(att->atttypid,
-                                   &in_func_oid, &typioparams[attnum - 1]);
-        else
-            getTypeInputInfo(att->atttypid,
-                             &in_func_oid, &typioparams[attnum - 1]);
-        fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+        cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+                                        &in_functions[attnum - 1],
+                                        &typioparams[attnum - 1]);
 
         /* Get default info if available */
         defexprs[attnum - 1] = NULL;
@@ -1775,20 +1893,7 @@ BeginCopyFrom(ParseState *pstate,
 
     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
 
-    if (cstate->opts.binary)
-    {
-        /* Read and verify binary header */
-        ReceiveCopyBinaryHeader(cstate);
-    }
-
-    /* create workspace for CopyReadAttributes results */
-    if (!cstate->opts.binary)
-    {
-        AttrNumber attr_count = list_length(cstate->attnumlist);
-
-        cstate->max_fields = attr_count;
-        cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-    }
+    cstate->routine->CopyFromStart(cstate, tupDesc);
 
     MemoryContextSwitchTo(oldcontext);
 
@@ -1801,6 +1906,9 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+    /* Invoke the end callback */
+    cstate->routine->CopyFromEnd(cstate);
+
     /* No COPY FROM related resources except memory. */
     if (cstate->is_program)
     {
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index d1d43b53d83..fdb506c58be 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -62,7 +62,6 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
-#include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
 #include "executor/executor.h"
@@ -140,8 +139,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static bool CopyReadLine(CopyFromState cstate);
-static bool CopyReadLineText(CopyFromState cstate);
+static bool CopyReadLine(CopyFromState cstate, bool is_csv);
+static bool CopyReadLineText(CopyFromState cstate, bool is_csv);
 static int CopyReadAttributesText(CopyFromState cstate);
 static int CopyReadAttributesCSV(CopyFromState cstate);
 static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
@@ -740,9 +739,11 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
  * in the relation.
  *
  * NOTE: force_not_null option are not applied to the returned fields.
+ *
+ * We use pg_attribute_always_inline to reduce function call overheads.
  */
-bool
-NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
+static pg_attribute_always_inline bool
+NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv)
 {
     int fldct;
     bool done;
@@ -759,13 +760,17 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
         tupDesc = RelationGetDescr(cstate->rel);
 
         cstate->cur_lineno++;
-        done = CopyReadLine(cstate);
+        done = CopyReadLine(cstate, is_csv);
 
         if (cstate->opts.header_line == COPY_HEADER_MATCH)
         {
             int fldnum;
 
-            if (cstate->opts.csv_mode)
+            /*
+             * is_csv will be optimized away by compiler, as argument is
+             * constant at caller.
+ */ + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -809,7 +814,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) cstate->cur_lineno++; /* Actually read the line into memory here */ - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); /* * EOF at start of line means we're done. If we see EOF after some @@ -819,8 +824,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) if (done && cstate->line_buf.len == 0) return false; - /* Parse the line into de-escaped field values */ - if (cstate->opts.csv_mode) + /* + * Parse the line into de-escaped field values + * + * is_csv will be optimized away by compiler, as argument is constant at + * caller. + */ + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -830,6 +840,244 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) return true; } +/* + * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow(). + * + * We use pg_attribute_always_inline to reduce function call overheads. 
+ */ +static pg_attribute_always_inline bool +CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls, bool is_csv) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + ExprState **defexprs = cstate->defexprs; + char **field_strings; + ListCell *cur; + int fldct; + int fieldno; + char *string; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv)) + return false; + + /* check for overflowing fields */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + + fieldno = 0; + + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + if (fieldno >= fldct) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; + + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) + { + /* ignore input field, leaving column as NULL */ + continue; + } + + if (is_csv) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) + { + /* + * FORCE_NOT_NULL option is set and column is NULL - convert + * it to the NULL string. + */ + string = cstate->opts.null_print; + } + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) + { + /* + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the string + * would already have been set to NULL. Convert it to NULL as + * specified. 
+ */ + string = NULL; + } + } + + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; + + if (string != NULL) + nulls[m] = false; + + if (cstate->defaults[m]) + { + /* + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. + */ + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); + + values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); + } + + /* + * If ON_ERROR is specified with IGNORE, skip rows with soft errors + */ + else if (!InputFunctionCallSafe(&in_functions[m], + string, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); + + cstate->num_errors++; + + if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) + { + /* + * Since we emit line number and column info in the below + * notice message, we suppress error context information other + * than the relation name. + */ + Assert(!cstate->relname_only); + cstate->relname_only = true; + + if (cstate->cur_attval) + { + char *attval; + + attval = CopyLimitPrintoutLength(cstate->cur_attval); + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname, + attval)); + pfree(attval); + } + else + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname)); + + /* reset relname_only */ + cstate->relname_only = false; + } + + return true; + } + + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } + + Assert(fieldno == attr_count); + + return true; +} + +/* Implementation of the per-row callback for text format */ +bool +CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, 
econtext, values, nulls, false); +} + +/* Implementation of the per-row callback for CSV format */ +bool +CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true); +} + +/* Implementation of the per-row callback for binary format */ +bool +CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, + bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + int16 fld_count; + ListCell *cur; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + cstate->cur_lineno++; + + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } + + if (fld_count == -1) + { + /* + * Received EOF marker. Wait for the protocol-level EOF, and complain + * if it doesn't come immediately. In COPY FROM STDIN, this ensures + * that we correctly handle CopyFail, if client chooses to send that + * now. When copying from file, we could ignore the rest of the file + * like in text mode, but we choose to be consistent with the COPY + * FROM STDIN case. 
+ */ + char dummy; + + if (CopyReadBinaryData(cstate, &dummy, 1) > 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("received copy data after EOF marker"))); + return false; + } + + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + + return true; +} + /* * Read next tuple from file for COPY FROM. Return false if no more tuples. * @@ -847,216 +1095,22 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, { TupleDesc tupDesc; AttrNumber num_phys_attrs, - attr_count, num_defaults = cstate->num_defaults; - FmgrInfo *in_functions = cstate->in_functions; - Oid *typioparams = cstate->typioparams; int i; int *defmap = cstate->defmap; ExprState **defexprs = cstate->defexprs; tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); - if (!cstate->opts.binary) - { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; - - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; - - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - - fieldno = 0; - - /* Loop to read the user attributes on 
the line. */ - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; - - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) - { - /* ignore input field, leaving column as NULL */ - continue; - } - - if (cstate->opts.csv_mode) - { - if (string == NULL && - cstate->opts.force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * convert it to the NULL string. - */ - string = cstate->opts.null_print; - } - else if (string != NULL && cstate->opts.force_null_flags[m] - && strcmp(string, cstate->opts.null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } - } - - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; - - if (string != NULL) - nulls[m] = false; - - if (cstate->defaults[m]) - { - /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. 
- */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - - values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); - } - - /* - * If ON_ERROR is specified with IGNORE, skip rows with soft - * errors - */ - else if (!InputFunctionCallSafe(&in_functions[m], - string, - typioparams[m], - att->atttypmod, - (Node *) cstate->escontext, - &values[m])) - { - Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); - - cstate->num_errors++; - - if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) - { - /* - * Since we emit line number and column info in the below - * notice message, we suppress error context information - * other than the relation name. - */ - Assert(!cstate->relname_only); - cstate->relname_only = true; - - if (cstate->cur_attval) - { - char *attval; - - attval = CopyLimitPrintoutLength(cstate->cur_attval); - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname, - attval)); - pfree(attval); - } - else - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname)); - - /* reset relname_only */ - cstate->relname_only = false; - } - - return true; - } - - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - } - - Assert(fieldno == attr_count); - } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; - - cstate->cur_lineno++; - - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } - - if (fld_count == -1) - { - /* - * Received EOF marker. Wait for the protocol-level EOF, and - * complain if it doesn't come immediately. In COPY FROM STDIN, - * this ensures that we correctly handle CopyFail, if client - * chooses to send that now. 
When copying from file, we could - * ignore the rest of the file like in text mode, but we choose to - * be consistent with the COPY FROM STDIN case. - */ - char dummy; - - if (CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } - - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } - } + /* Get one row from source */ + if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) + return false; /* * Now compute and insert any defaults available for the columns not @@ -1087,7 +1141,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, * in the final value of line_buf. 
*/ static bool -CopyReadLine(CopyFromState cstate) +CopyReadLine(CopyFromState cstate, bool is_csv) { bool result; @@ -1095,7 +1149,7 @@ CopyReadLine(CopyFromState cstate) cstate->line_buf_valid = false; /* Parse data and transfer into line_buf */ - result = CopyReadLineText(cstate); + result = CopyReadLineText(cstate, is_csv); if (result) { @@ -1163,7 +1217,7 @@ CopyReadLine(CopyFromState cstate) * CopyReadLineText - inner loop of CopyReadLine for text mode */ static bool -CopyReadLineText(CopyFromState cstate) +CopyReadLineText(CopyFromState cstate, bool is_csv) { char *copy_input_buf; int input_buf_ptr; @@ -1178,7 +1232,11 @@ CopyReadLineText(CopyFromState cstate) char quotec = '\0'; char escapec = '\0'; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is constant at + * caller. + */ + if (is_csv) { quotec = cstate->opts.quote[0]; escapec = cstate->opts.escape[0]; @@ -1255,7 +1313,11 @@ CopyReadLineText(CopyFromState cstate) prev_raw_ptr = input_buf_ptr; c = copy_input_buf[input_buf_ptr++]; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is constant + * at caller. + */ + if (is_csv) { /* * If character is '\r', we may need to look ahead below. Force @@ -1294,7 +1356,7 @@ CopyReadLineText(CopyFromState cstate) } /* Process \r */ - if (c == '\r' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\r' && (!is_csv || !in_quote)) { /* Check for \r\n on first line, _and_ handle \r\n. */ if (cstate->eol_type == EOL_UNKNOWN || @@ -1322,10 +1384,10 @@ CopyReadLineText(CopyFromState cstate) if (cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? 
errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); @@ -1339,10 +1401,10 @@ CopyReadLineText(CopyFromState cstate) else if (cstate->eol_type == EOL_NL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); /* If reach here, we have found the line terminator */ @@ -1350,15 +1412,15 @@ CopyReadLineText(CopyFromState cstate) } /* Process \n */ - if (c == '\n' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\n' && (!is_csv || !in_quote)) { if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal newline found in data") : errmsg("unquoted newline found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\n\" to represent newline.") : errhint("Use quoted CSV field to represent newline."))); cstate->eol_type = EOL_NL; /* in case not set yet */ @@ -1370,7 +1432,7 @@ CopyReadLineText(CopyFromState cstate) * Process backslash, except in CSV mode where backslash is a normal * character. 
*/ - if (c == '\\' && !cstate->opts.csv_mode) + if (c == '\\' && !is_csv) { char c2; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 4002a7f5382..f2409013fba 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -107,8 +107,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where extern void EndCopyFrom(CopyFromState cstate); extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); -extern bool NextCopyFromRawFields(CopyFromState cstate, - char ***fields, int *nfields); extern void CopyFromErrorCallback(void *arg); extern char *CopyLimitPrintoutLength(const char *str); diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index eccc875d0e8..19aacc8ddd3 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * copyapi.h - * API for COPY TO handlers + * API for COPY TO/FROM handlers * * * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group @@ -54,4 +54,50 @@ typedef struct CopyToRoutine void (*CopyToEnd) (CopyToState cstate); } CopyToRoutine; +/* + * API structure for a COPY FROM format implementation. Note this must be + * allocated in a server-lifetime manner, typically as a static const struct. + */ +typedef struct CopyFromRoutine +{ + /* + * Set input function information. This callback is called once at the + * beginning of COPY FROM. + * + * 'finfo' can be optionally filled to provide the catalog information of + * the input function. + * + * 'typioparam' can be optionally filled to define the OID of the type to + * pass to the input function. 'atttypid' is the OID of the data type used by + * the relation's attribute. + */ + void (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam); + + /* + * Start a COPY FROM. 
This callback is called once at the beginning of + * COPY FROM. + * + * 'tupDesc' is the tuple descriptor of the relation where the data needs + * to be copied. This can be used for any initialization steps required + * by a format. + */ + void (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc); + + /* + * Read one row from the source and fill *values and *nulls. + * + * 'econtext' is used to evaluate default expression for each column that + * is either not read from the file or is using the DEFAULT option of COPY + * FROM. It is NULL if no default values are used. + * + * Returns false if there are no more tuples to read. + */ + bool (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + + /* End a COPY FROM. This callback is called once at the end of COPY FROM */ + void (*CopyFromEnd) (CopyFromState cstate); +} CopyFromRoutine; + #endif /* COPYAPI_H */ diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index cad52fcc783..1ca058c6add 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -14,7 +14,7 @@ #ifndef COPYFROM_INTERNAL_H #define COPYFROM_INTERNAL_H -#include "commands/copy.h" +#include "commands/copyapi.h" #include "commands/trigger.h" #include "nodes/miscnodes.h" @@ -58,6 +58,9 @@ typedef enum CopyInsertMethod */ typedef struct CopyFromStateData { + /* format routine */ + const CopyFromRoutine *routine; + /* low-level state data */ CopySource copy_src; /* type of copy source */ FILE *copy_file; /* used if copy_src == COPY_FILE */ @@ -183,4 +186,12 @@ typedef struct CopyFromStateData extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); +/* One-row callbacks for built-in formats defined in copyfromparse.c */ +extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern bool 
CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + #endif /* COPYFROM_INTERNAL_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8edb41cce2e..e09407c7463 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -492,6 +492,7 @@ ConvertRowtypeExpr CookedConstraint CopyDest CopyFormatOptions +CopyFromRoutine CopyFromState CopyFromStateData CopyHeaderChoice -- 2.45.2 From 6cb25f8b8f76ff40a667485b09f5886a63f6d9bd Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Mon, 25 Nov 2024 12:19:15 +0900 Subject: [PATCH v27 3/9] Add support for adding custom COPY TO format This uses the handler approach like tablesample. The approach creates an internal function that returns an internal struct. In this case, a COPY TO handler returns a CopyToRoutine. This also adds a test module for the custom COPY TO handler. 
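The handler approach described in this commit message can be sketched in plain C, outside the server. This is only an illustration under stated assumptions: every `Demo*`/`demo_*` name is hypothetical, the static table stands in for the catalog lookup that the patch does with LookupFuncName(), and returning NULL stands in for the ereport(ERROR) path. It shows a format name being resolved to a handler, the handler returning a struct of callbacks, and the COPY TO side invoking start, one call per row, and end.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for CopyToRoutine: a table of per-format callbacks. */
typedef struct DemoCopyToRoutine
{
	void		(*start) (int natts, int *calls);
	void		(*one_row) (int *calls);
	void		(*end) (int *calls);
} DemoCopyToRoutine;

/* Hypothetical handler signature: returns the routine for a format. */
typedef const DemoCopyToRoutine *(*DemoHandler) (int is_from);

/* Callbacks only count invocations: calls[0]=start, [1]=row, [2]=end. */
static void demo_start(int natts, int *calls) { (void) natts; calls[0]++; }
static void demo_one_row(int *calls) { calls[1]++; }
static void demo_end(int *calls) { calls[2]++; }

/* Allocated for the whole program lifetime, like a static const routine. */
static const DemoCopyToRoutine demo_routine = {demo_start, demo_one_row, demo_end};

static const DemoCopyToRoutine *
demo_format_handler(int is_from)
{
	(void) is_from;
	return &demo_routine;
}

/* Stand-in for the function catalog: format name -> handler function. */
typedef struct DemoFormatEntry
{
	const char *name;
	DemoHandler handler;
} DemoFormatEntry;

static const DemoFormatEntry demo_formats[] = {
	{"test_copy_format", demo_format_handler},
};

/* Resolve a custom format; NULL means "COPY format not recognized". */
static const DemoCopyToRoutine *
demo_lookup_routine(const char *format, int is_from)
{
	size_t		i;

	/* As in this patch, only the COPY TO direction is looked up. */
	if (is_from)
		return NULL;
	for (i = 0; i < sizeof(demo_formats) / sizeof(demo_formats[0]); i++)
	{
		if (strcmp(demo_formats[i].name, format) == 0)
			return demo_formats[i].handler(is_from);
	}
	return NULL;
}

/* Drive the callbacks in order: start, one call per row, end. */
static int
demo_copy_to(const DemoCopyToRoutine *routine, int natts, int nrows, int *calls)
{
	int			row;

	routine->start(natts, calls);
	for (row = 0; row < nrows; row++)
		routine->one_row(calls);
	routine->end(calls);
	return nrows;
}
```

With a three-row table, this sketch calls start once, the per-row callback three times, and end once. An unknown name, or the FROM direction, yields no routine.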
--- src/backend/commands/copy.c | 82 ++++++++++++++++--- src/backend/commands/copyto.c | 4 +- src/backend/nodes/Makefile | 1 + src/backend/nodes/gen_node_support.pl | 2 + src/backend/utils/adt/pseudotypes.c | 1 + src/include/catalog/pg_proc.dat | 6 ++ src/include/catalog/pg_type.dat | 6 ++ src/include/commands/copy.h | 1 + src/include/commands/copyapi.h | 2 + src/include/nodes/meson.build | 1 + src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + src/test/modules/test_copy_format/.gitignore | 4 + src/test/modules/test_copy_format/Makefile | 23 ++++++ .../expected/test_copy_format.out | 17 ++++ src/test/modules/test_copy_format/meson.build | 33 ++++++++ .../test_copy_format/sql/test_copy_format.sql | 5 ++ .../test_copy_format--1.0.sql | 8 ++ .../test_copy_format/test_copy_format.c | 63 ++++++++++++++ .../test_copy_format/test_copy_format.control | 4 + 20 files changed, 251 insertions(+), 14 deletions(-) mode change 100644 => 100755 src/backend/nodes/gen_node_support.pl create mode 100644 src/test/modules/test_copy_format/.gitignore create mode 100644 src/test/modules/test_copy_format/Makefile create mode 100644 src/test/modules/test_copy_format/expected/test_copy_format.out create mode 100644 src/test/modules/test_copy_format/meson.build create mode 100644 src/test/modules/test_copy_format/sql/test_copy_format.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format--1.0.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format.c create mode 100644 src/test/modules/test_copy_format/test_copy_format.control diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 2d98ecf3f4e..d4906b44751 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -32,6 +32,7 @@ #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" +#include "parser/parse_func.h" #include "parser/parse_relation.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -476,6 
+477,73 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate) return COPY_LOG_VERBOSITY_DEFAULT; /* keep compiler quiet */ } +/* + * Process the "format" option. + * + * This function checks whether the option value is a built-in format such as + * "text" and "csv" or not. If the option value isn't a built-in format, this + * function finds a COPY format handler that returns a CopyToRoutine (for + * is_from == false). If no COPY format handler is found, this function + * reports an error. + */ +static void +ProcessCopyOptionFormat(ParseState *pstate, + CopyFormatOptions *opts_out, + bool is_from, + DefElem *defel) +{ + char *format; + Oid funcargtypes[1]; + Oid handlerOid = InvalidOid; + Datum datum; + Node *routine; + + format = defGetString(defel); + + /* built-in formats */ + if (strcmp(format, "text") == 0) + /* default format */ return; + else if (strcmp(format, "csv") == 0) + { + opts_out->csv_mode = true; + return; + } + else if (strcmp(format, "binary") == 0) + { + opts_out->binary = true; + return; + } + + /* custom format */ + if (!is_from) + { + funcargtypes[0] = INTERNALOID; + handlerOid = LookupFuncName(list_make1(makeString(format)), 1, + funcargtypes, true); + } + if (!OidIsValid(handlerOid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY format \"%s\" not recognized", format), + parser_errposition(pstate, defel->location))); + + datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from)); + routine = (Node *) DatumGetPointer(datum); + if (routine == NULL || !IsA(routine, CopyToRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyToRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + + opts_out->routine = routine; +} + /* * Process the statement option list for COPY. 
* @@ -519,22 +587,10 @@ ProcessCopyOptions(ParseState *pstate, if (strcmp(defel->defname, "format") == 0) { - char *fmt = defGetString(defel); - if (format_specified) errorConflictingDefElem(defel, pstate); format_specified = true; - if (strcmp(fmt, "text") == 0) - /* default format */ ; - else if (strcmp(fmt, "csv") == 0) - opts_out->csv_mode = true; - else if (strcmp(fmt, "binary") == 0) - opts_out->binary = true; - else - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY format \"%s\" not recognized", fmt), - parser_errposition(pstate, defel->location))); + ProcessCopyOptionFormat(pstate, opts_out, is_from, defel); } else if (strcmp(defel->defname, "freeze") == 0) { diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index f81dadcc12b..ce3dd252c32 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -176,7 +176,9 @@ static const CopyToRoutine CopyToRoutineBinary = { static const CopyToRoutine * CopyToGetRoutine(CopyFormatOptions opts) { - if (opts.csv_mode) + if (opts.routine) + return (const CopyToRoutine *) opts.routine; + else if (opts.csv_mode) return &CopyToRoutineCSV; else if (opts.binary) return &CopyToRoutineBinary; diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile index 66bbad8e6e0..173ee11811c 100644 --- a/src/backend/nodes/Makefile +++ b/src/backend/nodes/Makefile @@ -49,6 +49,7 @@ node_headers = \ access/sdir.h \ access/tableam.h \ access/tsmapi.h \ + commands/copyapi.h \ commands/event_trigger.h \ commands/trigger.h \ executor/tuptable.h \ diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl old mode 100644 new mode 100755 index 81df3bdf95f..428ab4f0d93 --- a/src/backend/nodes/gen_node_support.pl +++ b/src/backend/nodes/gen_node_support.pl @@ -61,6 +61,7 @@ my @all_input_files = qw( access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h @@ 
-85,6 +86,7 @@ my @nodetag_only_files = qw( access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c index e189e9b79d2..25f24ab95d2 100644 --- a/src/backend/utils/adt/pseudotypes.c +++ b/src/backend/utils/adt/pseudotypes.c @@ -370,6 +370,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler); +PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(internal); PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement); PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index cbbe8acd382..959d0301c20 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -7771,6 +7771,12 @@ { oid => '3312', descr => 'I/O', proname => 'tsm_handler_out', prorettype => 'cstring', proargtypes => 'tsm_handler', prosrc => 'tsm_handler_out' }, +{ oid => '8753', descr => 'I/O', + proname => 'copy_handler_in', proisstrict => 'f', prorettype => 'copy_handler', + proargtypes => 'cstring', prosrc => 'copy_handler_in' }, +{ oid => '8754', descr => 'I/O', + proname => 'copy_handler_out', prorettype => 'cstring', + proargtypes => 'copy_handler', prosrc => 'copy_handler_out' }, { oid => '267', descr => 'I/O', proname => 'table_am_handler_in', proisstrict => 'f', prorettype => 'table_am_handler', proargtypes => 'cstring', diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index ceff66ccde1..793dd671935 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -633,6 +633,12 @@ typcategory => 'P', typinput => 'tsm_handler_in', typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-', typalign => 'i' }, +{ oid => '8752', + descr => 'pseudo-type for the result of a copy to 
method function', + typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p', + typcategory => 'P', typinput => 'copy_handler_in', + typoutput => 'copy_handler_out', typreceive => '-', typsend => '-', + typalign => 'i' }, { oid => '269', descr => 'pseudo-type for the result of a table AM handler function', typname => 'table_am_handler', typlen => '4', typbyval => 't', typtype => 'p', diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index f2409013fba..6b740d5b917 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -87,6 +87,7 @@ typedef struct CopyFormatOptions CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ int64 reject_limit; /* maximum tolerable number of errors */ List *convert_select; /* list of column names (can be NIL) */ + Node *routine; /* CopyToRoutine (can be NULL) */ } CopyFormatOptions; /* These are private in commands/copy[from|to].c */ diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 19aacc8ddd3..36057b92417 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -24,6 +24,8 @@ */ typedef struct CopyToRoutine { + NodeTag type; + /* * Set output function information. This callback is called once at the * beginning of COPY TO. 
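The NodeTag added as the first member of CopyToRoutine is what lets the IsA() check in ProcessCopyOptionFormat() reject a handler that returns the wrong kind of struct. Below is a minimal standalone sketch of that tag-first pattern. All `Demo*`/`demo_*` names are hypothetical, and `demo_is_a()` only imitates the idea behind IsA(); it is not the server macro.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical tag values; the server uses NodeTag for this purpose. */
typedef enum DemoTag
{
	DEMO_T_Invalid = 0,
	DEMO_T_CopyToRoutine,
	DEMO_T_TsmRoutine
} DemoTag;

/* Every tagged struct keeps its tag as the FIRST member. */
typedef struct DemoTaggedCopyToRoutine
{
	DemoTag		type;
	void		(*end) (void);
} DemoTaggedCopyToRoutine;

typedef struct DemoTaggedTsmRoutine
{
	DemoTag		type;
	int			dummy;
} DemoTaggedTsmRoutine;

/*
 * A pointer to a struct, suitably converted, points at its first member,
 * so the tag can be read without knowing the concrete struct type.
 */
static int
demo_is_a(const void *ptr, DemoTag tag)
{
	return ptr != NULL && *(const DemoTag *) ptr == tag;
}

/*
 * Validate what a handler returned.  The patch reports an error at this
 * point; the sketch returns NULL instead.
 */
static const DemoTaggedCopyToRoutine *
demo_check_handler_result(const void *result)
{
	if (!demo_is_a(result, DEMO_T_CopyToRoutine))
		return NULL;
	return (const DemoTaggedCopyToRoutine *) result;
}
```

The design choice here is that the caller receives an untyped pointer from the handler and can still verify it before casting, at the cost of one extra field per routine struct.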
diff --git a/src/include/nodes/meson.build b/src/include/nodes/meson.build index b665e55b657..103df1a7873 100644 --- a/src/include/nodes/meson.build +++ b/src/include/nodes/meson.build @@ -11,6 +11,7 @@ node_support_input_i = [ 'access/sdir.h', 'access/tableam.h', 'access/tsmapi.h', + 'commands/copyapi.h', 'commands/event_trigger.h', 'commands/trigger.h', 'executor/tuptable.h', diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index c0d3cf0e14b..33e3a49a4fb 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -15,6 +15,7 @@ SUBDIRS = \ spgist_name_ops \ test_bloomfilter \ test_copy_callbacks \ + test_copy_format \ test_custom_rmgrs \ test_ddl_deparse \ test_dsa \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index c829b619530..75b6ab1b6a9 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -14,6 +14,7 @@ subdir('spgist_name_ops') subdir('ssl_passphrase_callback') subdir('test_bloomfilter') subdir('test_copy_callbacks') +subdir('test_copy_format') subdir('test_custom_rmgrs') subdir('test_ddl_deparse') subdir('test_dsa') diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore new file mode 100644 index 00000000000..5dcb3ff9723 --- /dev/null +++ b/src/test/modules/test_copy_format/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile new file mode 100644 index 00000000000..8497f91624d --- /dev/null +++ b/src/test/modules/test_copy_format/Makefile @@ -0,0 +1,23 @@ +# src/test/modules/test_copy_format/Makefile + +MODULE_big = test_copy_format +OBJS = \ + $(WIN32RES) \ + test_copy_format.o +PGFILEDESC = "test_copy_format - test custom COPY FORMAT" + +EXTENSION = test_copy_format +DATA = test_copy_format--1.0.sql + +REGRESS = test_copy_format + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell 
$(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_copy_format +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out new file mode 100644 index 00000000000..adfe7d1572a --- /dev/null +++ b/src/test/modules/test_copy_format/expected/test_copy_format.out @@ -0,0 +1,17 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a smallint, b integer, c bigint); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH (FORMAT 'test_copy_format'); +ERROR: COPY format "test_copy_format" not recognized +LINE 1: COPY public.test FROM stdin WITH (FORMAT 'test_copy_format')... + ^ +COPY public.test TO stdout WITH (FORMAT 'test_copy_format'); +NOTICE: test_copy_format: is_from=false +NOTICE: CopyToOutFunc: atttypid=21 +NOTICE: CopyToOutFunc: atttypid=23 +NOTICE: CopyToOutFunc: atttypid=20 +NOTICE: CopyToStart: natts=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToEnd diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build new file mode 100644 index 00000000000..4cefe7b709a --- /dev/null +++ b/src/test/modules/test_copy_format/meson.build @@ -0,0 +1,33 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +test_copy_format_sources = files( + 'test_copy_format.c', +) + +if host_system == 'windows' + test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'test_copy_format', + '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',]) +endif + +test_copy_format = shared_module('test_copy_format', + test_copy_format_sources, + kwargs: pg_test_mod_args, +) +test_install_libs += test_copy_format + +test_install_data += 
files( + 'test_copy_format.control', + 'test_copy_format--1.0.sql', +) + +tests += { + 'name': 'test_copy_format', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'test_copy_format', + ], + }, +} diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql new file mode 100644 index 00000000000..810b3d8cedc --- /dev/null +++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql @@ -0,0 +1,5 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a smallint, b integer, c bigint); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH (FORMAT 'test_copy_format'); +COPY public.test TO stdout WITH (FORMAT 'test_copy_format'); diff --git a/src/test/modules/test_copy_format/test_copy_format--1.0.sql b/src/test/modules/test_copy_format/test_copy_format--1.0.sql new file mode 100644 index 00000000000..d24ea03ce99 --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format--1.0.sql @@ -0,0 +1,8 @@ +/* src/test/modules/test_copy_format/test_copy_format--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_copy_format" to load this file. \quit + +CREATE FUNCTION test_copy_format(internal) + RETURNS copy_handler + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c new file mode 100644 index 00000000000..e064f40473b --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format.c @@ -0,0 +1,63 @@ +/*-------------------------------------------------------------------------- + * + * test_copy_format.c + * Code for testing custom COPY format. 
+ * + * Portions Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_copy_format/test_copy_format.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "commands/copyapi.h" +#include "commands/defrem.h" + +PG_MODULE_MAGIC; + +static void +CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) +{ + ereport(NOTICE, (errmsg("CopyToOutFunc: atttypid=%d", atttypid))); +} + +static void +CopyToStart(CopyToState cstate, TupleDesc tupDesc) +{ + ereport(NOTICE, (errmsg("CopyToStart: natts=%d", tupDesc->natts))); +} + +static void +CopyToOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + ereport(NOTICE, (errmsg("CopyToOneRow: tts_nvalid=%u", slot->tts_nvalid))); +} + +static void +CopyToEnd(CopyToState cstate) +{ + ereport(NOTICE, (errmsg("CopyToEnd"))); +} + +static const CopyToRoutine CopyToRoutineTestCopyFormat = { + .type = T_CopyToRoutine, + .CopyToOutFunc = CopyToOutFunc, + .CopyToStart = CopyToStart, + .CopyToOneRow = CopyToOneRow, + .CopyToEnd = CopyToEnd, +}; + +PG_FUNCTION_INFO_V1(test_copy_format); +Datum +test_copy_format(PG_FUNCTION_ARGS) +{ + bool is_from = PG_GETARG_BOOL(0); + + ereport(NOTICE, + (errmsg("test_copy_format: is_from=%s", is_from ? 
"true" : "false"))); + + PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); +} diff --git a/src/test/modules/test_copy_format/test_copy_format.control b/src/test/modules/test_copy_format/test_copy_format.control new file mode 100644 index 00000000000..f05a6362358 --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format.control @@ -0,0 +1,4 @@ +comment = 'Test code for custom COPY format' +default_version = '1.0' +module_pathname = '$libdir/test_copy_format' +relocatable = true -- 2.45.2 From d0d842e37c7ec59bf7c77df3e6518ce80ec59575 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Mon, 25 Nov 2024 13:58:33 +0900 Subject: [PATCH v27 4/9] Export CopyToStateData It's for custom COPY TO format handlers implemented as extension. This just moves codes. This doesn't change codes except CopyDest enum values. CopyDest/CopyFrom enum values such as COPY_FILE are conflicted each other. So COPY_DEST_ prefix instead of COPY_ prefix is used for CopyDest enum values. For example, COPY_FILE in CopyDest is renamed to COPY_DEST_FILE. Note that this isn't enough to implement custom COPY TO format handlers as extension. We'll do the followings in a subsequent commit: 1. Add an opaque space for custom COPY TO format handler 2. 
Export CopySendEndOfRow() to flush buffer --- src/backend/commands/copyto.c | 77 ++++------------------------------ src/include/commands/copy.h | 2 +- src/include/commands/copyapi.h | 62 +++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 70 deletions(-) diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index ce3dd252c32..96b5e144a1d 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -36,67 +36,6 @@ #include "utils/rel.h" #include "utils/snapmgr.h" -/* - * Represents the different dest cases we need to worry about at - * the bottom level - */ -typedef enum CopyDest -{ - COPY_FILE, /* to file (or a piped program) */ - COPY_FRONTEND, /* to frontend */ - COPY_CALLBACK, /* to callback function */ -} CopyDest; - -/* - * This struct contains all the state variables used throughout a COPY TO - * operation. - * - * Multi-byte encodings: all supported client-side encodings encode multi-byte - * characters by having the first byte's high bit set. Subsequent bytes of the - * character can have the high bit not set. When scanning data in such an - * encoding to look for a match to a single-byte (ie ASCII) character, we must - * use the full pg_encoding_mblen() machinery to skip over multibyte - * characters, else we might find a false match to a trailing byte. In - * supported server encodings, there is no possibility of a false match, and - * it's faster to make useless comparisons to trailing bytes than it is to - * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true - * when we have to do it the hard way. 
- */ -typedef struct CopyToStateData -{ - /* format-specific routines */ - const CopyToRoutine *routine; - - /* low-level state data */ - CopyDest copy_dest; /* type of copy source/destination */ - FILE *copy_file; /* used if copy_dest == COPY_FILE */ - StringInfo fe_msgbuf; /* used for all dests during COPY TO */ - - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy to */ - QueryDesc *queryDesc; /* executable query to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDOUT */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_dest_cb data_dest_cb; /* function for writing data */ - - CopyFormatOptions opts; - Node *whereClause; /* WHERE condition (or NULL) */ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - FmgrInfo *out_functions; /* lookup info for output functions */ - MemoryContext rowcontext; /* per-row evaluation context */ - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyToStateData; - /* DestReceiver for COPY (query) TO */ typedef struct { @@ -406,7 +345,7 @@ SendCopyBegin(CopyToState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_dest = COPY_FRONTEND; + cstate->copy_dest = COPY_DEST_FRONTEND; } static void @@ -453,7 +392,7 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -487,11 +426,11 @@ CopySendEndOfRow(CopyToState cstate) errmsg("could not write to COPY file: %m"))); } break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* Dump the 
accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; - case COPY_CALLBACK: + case COPY_DEST_CALLBACK: cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -512,7 +451,7 @@ CopySendTextLikeEndOfRow(CopyToState cstate) { switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: /* Default line termination depends on platform */ #ifndef WIN32 CopySendChar(cstate, '\n'); @@ -520,7 +459,7 @@ CopySendTextLikeEndOfRow(CopyToState cstate) CopySendString(cstate, "\r\n"); #endif break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* The FE/BE protocol uses \n as newline for all platforms */ CopySendChar(cstate, '\n'); break; @@ -904,12 +843,12 @@ BeginCopyTo(ParseState *pstate, /* See Multibyte encoding comment above */ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); - cstate->copy_dest = COPY_FILE; /* default */ + cstate->copy_dest = COPY_DEST_FILE; /* default */ if (data_dest_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_dest = COPY_CALLBACK; + cstate->copy_dest = COPY_DEST_CALLBACK; cstate->data_dest_cb = data_dest_cb; } else if (pipe) diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 6b740d5b917..98aa5707102 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -90,7 +90,7 @@ typedef struct CopyFormatOptions Node *routine; /* CopyToRoutine (can be NULL) */ } CopyFormatOptions; -/* These are private in commands/copy[from|to].c */ +/* This is private in commands/copyfrom.c */ typedef struct CopyFromStateData *CopyFromState; typedef struct CopyToStateData *CopyToState; diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 36057b92417..1cb2815deab 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -15,6 +15,7 @@ #define COPYAPI_H #include "commands/copy.h" +#include "executor/execdesc.h" #include 
"executor/tuptable.h" #include "nodes/execnodes.h" @@ -56,6 +57,67 @@ typedef struct CopyToRoutine void (*CopyToEnd) (CopyToState cstate); } CopyToRoutine; +/* + * Represents the different dest cases we need to worry about at + * the bottom level + */ +typedef enum CopyDest +{ + COPY_DEST_FILE, /* to file (or a piped program) */ + COPY_DEST_FRONTEND, /* to frontend */ + COPY_DEST_CALLBACK, /* to callback function */ +} CopyDest; + +/* + * This struct contains all the state variables used throughout a COPY TO + * operation. + * + * Multi-byte encodings: all supported client-side encodings encode multi-byte + * characters by having the first byte's high bit set. Subsequent bytes of the + * character can have the high bit not set. When scanning data in such an + * encoding to look for a match to a single-byte (ie ASCII) character, we must + * use the full pg_encoding_mblen() machinery to skip over multibyte + * characters, else we might find a false match to a trailing byte. In + * supported server encodings, there is no possibility of a false match, and + * it's faster to make useless comparisons to trailing bytes than it is to + * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true + * when we have to do it the hard way. + */ +typedef struct CopyToStateData +{ + /* format-specific routines */ + const CopyToRoutine *routine; + + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + FILE *copy_file; /* used if copy_dest == COPY_FILE */ + StringInfo fe_msgbuf; /* used for all dests during COPY TO */ + + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? 
*/ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy to */ + QueryDesc *queryDesc; /* executable query to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDOUT */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_dest_cb data_dest_cb; /* function for writing data */ + + CopyFormatOptions opts; + Node *whereClause; /* WHERE condition (or NULL) */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + FmgrInfo *out_functions; /* lookup info for output functions */ + MemoryContext rowcontext; /* per-row evaluation context */ + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyToStateData; + /* * API structure for a COPY FROM format implementation. Note this must be * allocated in a server-lifetime manner, typically as a static const struct. -- 2.45.2 From ac79ec655da666e4f03ac865d15821f4ac883cc4 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Mon, 25 Nov 2024 14:01:18 +0900 Subject: [PATCH v27 5/9] Add support for implementing custom COPY TO format as extension * Add CopyToStateData::opaque that can be used to keep data for custom COPY TO format implementation * Export CopySendEndOfRow() to flush data in CopyToStateData::fe_msgbuf as CopyToStateFlush() --- src/backend/commands/copyto.c | 12 ++++++++++++ src/include/commands/copyapi.h | 5 +++++ 2 files changed, 17 insertions(+) diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 96b5e144a1d..cb9bfa0053f 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -442,6 +442,18 @@ CopySendEndOfRow(CopyToState cstate) resetStringInfo(fe_msgbuf); } +/* + * Export CopySendEndOfRow() for extensions. We want to keep + * CopySendEndOfRow() as a static function for + * optimization. CopySendEndOfRow() calls in this file may be optimized by a + * compiler. 
+ */ +void +CopyToStateFlush(CopyToState cstate) +{ + CopySendEndOfRow(cstate); +} + /* * Wrapper function of CopySendEndOfRow for text and CSV formats. Sends the * the line termination and do common appropriate things for the end of row. diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 1cb2815deab..030a82aca7f 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -116,8 +116,13 @@ typedef struct CopyToStateData FmgrInfo *out_functions; /* lookup info for output functions */ MemoryContext rowcontext; /* per-row evaluation context */ uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ } CopyToStateData; +extern void CopyToStateFlush(CopyToState cstate); + /* * API structure for a COPY FROM format implementation. Note this must be * allocated in a server-lifetime manner, typically as a static const struct. -- 2.45.2 From 859faaa4d19e3c659afec43267eae6b7788bf964 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Mon, 25 Nov 2024 14:11:55 +0900 Subject: [PATCH v27 6/9] Add support for adding custom COPY FROM format This uses the same handler for COPY TO and COPY FROM but uses different routines. This uses CopyToRoutine for COPY TO and CopyFromRoutine for COPY FROM. PostgreSQL calls a COPY TO/FROM handler with an "is_from" argument. It's true for COPY FROM and false for COPY TO: copy_handler(true) returns CopyFromRoutine copy_handler(false) returns CopyToRoutine This also adds a test module for a custom COPY FROM handler. 
--- src/backend/commands/copy.c | 52 ++++++++++++------- src/backend/commands/copyfrom.c | 4 +- src/include/catalog/pg_type.dat | 2 +- src/include/commands/copy.h | 3 +- src/include/commands/copyapi.h | 2 + .../expected/test_copy_format.out | 10 ++-- .../test_copy_format/sql/test_copy_format.sql | 1 + .../test_copy_format/test_copy_format.c | 39 +++++++++++++- 8 files changed, 87 insertions(+), 26 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index d4906b44751..5be649c9c89 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -483,8 +483,8 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate) * This function checks whether the option value is a built-in format such as * "text" and "csv" or not. If the option value isn't a built-in format, this * function finds a COPY format handler that returns a CopyToRoutine (for - * is_from == false). If no COPY format handler is found, this function - * reports an error. + * is_from == false) or CopyFromRoutine (for is_from == true). If no COPY + * format handler is found, this function reports an error. 
*/ static void ProcessCopyOptionFormat(ParseState *pstate, @@ -515,12 +515,9 @@ ProcessCopyOptionFormat(ParseState *pstate, } /* custom format */ - if (!is_from) - { - funcargtypes[0] = INTERNALOID; - handlerOid = LookupFuncName(list_make1(makeString(format)), 1, - funcargtypes, true); - } + funcargtypes[0] = INTERNALOID; + handlerOid = LookupFuncName(list_make1(makeString(format)), 1, + funcargtypes, true); if (!OidIsValid(handlerOid)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -529,17 +526,34 @@ ProcessCopyOptionFormat(ParseState *pstate, datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from)); routine = (Node *) DatumGetPointer(datum); - if (routine == NULL || !IsA(routine, CopyToRoutine)) - ereport( - ERROR, - (errcode( - ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY handler function " - "%s(%u) did not return a " - "CopyToRoutine struct", - format, handlerOid), - parser_errposition( - pstate, defel->location))); + if (is_from) + { + if (routine == NULL || !IsA(routine, CopyFromRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyFromRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + } + else + { + if (routine == NULL || !IsA(routine, CopyToRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyToRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + } opts_out->routine = routine; } diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index c84081c3ba3..a4cdab75879 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -155,7 +155,9 @@ static const CopyFromRoutine CopyFromRoutineBinary = { static const CopyFromRoutine * CopyFromGetRoutine(CopyFormatOptions opts) { - if (opts.csv_mode) + if (opts.routine) + return 
(const CopyFromRoutine *) opts.routine; + else if (opts.csv_mode) return &CopyFromRoutineCSV; else if (opts.binary) return &CopyFromRoutineBinary; diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index 793dd671935..37ebfa0908f 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -634,7 +634,7 @@ typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-', typalign => 'i' }, { oid => '8752', - descr => 'pseudo-type for the result of a copy to method function', + descr => 'pseudo-type for the result of a copy to/from method function', typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p', typcategory => 'P', typinput => 'copy_handler_in', typoutput => 'copy_handler_out', typreceive => '-', typsend => '-', diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 98aa5707102..e07988a0c74 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -87,7 +87,8 @@ typedef struct CopyFormatOptions CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ int64 reject_limit; /* maximum tolerable number of errors */ List *convert_select; /* list of column names (can be NIL) */ - Node *routine; /* CopyToRoutine (can be NULL) */ + Node *routine; /* CopyToRoutine or CopyFromRoutine (can be + * NULL) */ } CopyFormatOptions; /* This is private in commands/copyfrom.c */ diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 030a82aca7f..fa3d8d87760 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -129,6 +129,8 @@ extern void CopyToStateFlush(CopyToState cstate); */ typedef struct CopyFromRoutine { + NodeTag type; + /* * Set input function information. This callback is called once at the * beginning of COPY FROM. 
diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out index adfe7d1572a..016893e7026 100644 --- a/src/test/modules/test_copy_format/expected/test_copy_format.out +++ b/src/test/modules/test_copy_format/expected/test_copy_format.out @@ -2,9 +2,13 @@ CREATE EXTENSION test_copy_format; CREATE TABLE public.test (a smallint, b integer, c bigint); INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); COPY public.test FROM stdin WITH (FORMAT 'test_copy_format'); -ERROR: COPY format "test_copy_format" not recognized -LINE 1: COPY public.test FROM stdin WITH (FORMAT 'test_copy_format')... - ^ +NOTICE: test_copy_format: is_from=true +NOTICE: CopyFromInFunc: atttypid=21 +NOTICE: CopyFromInFunc: atttypid=23 +NOTICE: CopyFromInFunc: atttypid=20 +NOTICE: CopyFromStart: natts=3 +NOTICE: CopyFromOneRow +NOTICE: CopyFromEnd COPY public.test TO stdout WITH (FORMAT 'test_copy_format'); NOTICE: test_copy_format: is_from=false NOTICE: CopyToOutFunc: atttypid=21 diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql index 810b3d8cedc..0dfdfa00080 100644 --- a/src/test/modules/test_copy_format/sql/test_copy_format.sql +++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql @@ -2,4 +2,5 @@ CREATE EXTENSION test_copy_format; CREATE TABLE public.test (a smallint, b integer, c bigint); INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); COPY public.test FROM stdin WITH (FORMAT 'test_copy_format'); +\. 
COPY public.test TO stdout WITH (FORMAT 'test_copy_format'); diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c index e064f40473b..f6b105659ab 100644 --- a/src/test/modules/test_copy_format/test_copy_format.c +++ b/src/test/modules/test_copy_format/test_copy_format.c @@ -18,6 +18,40 @@ PG_MODULE_MAGIC; +static void +CopyFromInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + ereport(NOTICE, (errmsg("CopyFromInFunc: atttypid=%d", atttypid))); +} + +static void +CopyFromStart(CopyFromState cstate, TupleDesc tupDesc) +{ + ereport(NOTICE, (errmsg("CopyFromStart: natts=%d", tupDesc->natts))); +} + +static bool +CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls) +{ + ereport(NOTICE, (errmsg("CopyFromOneRow"))); + return false; +} + +static void +CopyFromEnd(CopyFromState cstate) +{ + ereport(NOTICE, (errmsg("CopyFromEnd"))); +} + +static const CopyFromRoutine CopyFromRoutineTestCopyFormat = { + .type = T_CopyFromRoutine, + .CopyFromInFunc = CopyFromInFunc, + .CopyFromStart = CopyFromStart, + .CopyFromOneRow = CopyFromOneRow, + .CopyFromEnd = CopyFromEnd, +}; + static void CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) { @@ -59,5 +93,8 @@ test_copy_format(PG_FUNCTION_ARGS) ereport(NOTICE, (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false"))); - PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); + if (is_from) + PG_RETURN_POINTER(&CopyFromRoutineTestCopyFormat); + else + PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); } -- 2.45.2 From 81cee5244c5f7bdd52745cea09c98991d47207f5 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Mon, 25 Nov 2024 14:19:34 +0900 Subject: [PATCH v27 7/9] Export CopyFromStateData It's for custom COPY FROM format handlers implemented as extension. This just moves codes. This doesn't change codes except CopySource enum values. 
This changes COPY_ prefix of CopySource enum values to COPY_SOURCE_ prefix like the CopyDest enum values prefix change. For example, COPY_FILE in CopySource is renamed to COPY_SOURCE_FILE. Note that this isn't enough to implement custom COPY FROM format handlers as extension. We'll do the followings in a subsequent commit: 1. Add an opaque space for custom COPY FROM format handler 2. Export CopyReadBinaryData() to read the next data --- src/backend/commands/copyfrom.c | 4 +- src/backend/commands/copyfromparse.c | 10 +- src/include/commands/copy.h | 1 - src/include/commands/copyapi.h | 166 +++++++++++++++++++++++ src/include/commands/copyfrom_internal.h | 166 ----------------------- 5 files changed, 173 insertions(+), 174 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index a4cdab75879..e1fef1b95a5 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1704,7 +1704,7 @@ BeginCopyFrom(ParseState *pstate, pg_encoding_to_char(GetDatabaseEncoding())))); } - cstate->copy_src = COPY_FILE; /* default */ + cstate->copy_src = COPY_SOURCE_FILE; /* default */ cstate->whereClause = whereClause; @@ -1832,7 +1832,7 @@ BeginCopyFrom(ParseState *pstate, if (data_source_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_src = COPY_CALLBACK; + cstate->copy_src = COPY_SOURCE_CALLBACK; cstate->data_source_cb = data_source_cb; } else if (pipe) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index fdb506c58be..1c68b0d2952 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -170,7 +170,7 @@ ReceiveCopyBegin(CopyFromState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_src = COPY_FRONTEND; + cstate->copy_src = COPY_SOURCE_FRONTEND; cstate->fe_msgbuf = makeStringInfo(); /* We *must* flush here to ensure FE knows it can send. 
*/ pq_flush(); @@ -238,7 +238,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) switch (cstate->copy_src) { - case COPY_FILE: + case COPY_SOURCE_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, @@ -247,7 +247,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) if (bytesread == 0) cstate->raw_reached_eof = true; break; - case COPY_FRONTEND: + case COPY_SOURCE_FRONTEND: while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof) { int avail; @@ -330,7 +330,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; - case COPY_CALLBACK: + case COPY_SOURCE_CALLBACK: bytesread = cstate->data_source_cb(databuf, minread, maxread); break; } @@ -1158,7 +1158,7 @@ CopyReadLine(CopyFromState cstate, bool is_csv) * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ - if (cstate->copy_src == COPY_FRONTEND) + if (cstate->copy_src == COPY_SOURCE_FRONTEND) { int inbytes; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index e07988a0c74..50af4b99258 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -91,7 +91,6 @@ typedef struct CopyFormatOptions * NULL) */ } CopyFormatOptions; -/* This is private in commands/copyfrom.c */ typedef struct CopyFromStateData *CopyFromState; typedef struct CopyToStateData *CopyToState; diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index fa3d8d87760..335584f8877 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -15,6 +15,7 @@ #define COPYAPI_H #include "commands/copy.h" +#include "commands/trigger.h" #include "executor/execdesc.h" #include "executor/tuptable.h" #include "nodes/execnodes.h" @@ -171,4 +172,169 @@ typedef struct CopyFromRoutine void (*CopyFromEnd) (CopyFromState cstate); } CopyFromRoutine; +/* + * 
Represents the different source cases we need to worry about at + * the bottom level + */ +typedef enum CopySource +{ + COPY_SOURCE_FILE, /* from file (or a piped program) */ + COPY_SOURCE_FRONTEND, /* from frontend */ + COPY_SOURCE_CALLBACK, /* from callback function */ +} CopySource; + +/* + * Represents the end-of-line terminator type of the input + */ +typedef enum EolType +{ + EOL_UNKNOWN, + EOL_NL, + EOL_CR, + EOL_CRNL, +} EolType; + +/* + * Represents the insert method to be used during COPY FROM. + */ +typedef enum CopyInsertMethod +{ + CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */ + CIM_MULTI, /* always use table_multi_insert or + * ExecForeignBatchInsert */ + CIM_MULTI_CONDITIONAL, /* use table_multi_insert or + * ExecForeignBatchInsert only if valid */ +} CopyInsertMethod; + +/* + * This struct contains all the state variables used throughout a COPY FROM + * operation. + */ +typedef struct CopyFromStateData +{ + /* format routine */ + const CopyFromRoutine *routine; + + /* low-level state data */ + CopySource copy_src; /* type of copy source */ + FILE *copy_file; /* used if copy_src == COPY_SOURCE_FILE */ + StringInfo fe_msgbuf; /* used if copy_src == COPY_SOURCE_FRONTEND */ + + EolType eol_type; /* EOL type of input */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + Oid conversion_proc; /* encoding conversion function */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDIN */ + bool is_program; /* is 'filename' a program to popen? 
*/ + copy_data_source_cb data_source_cb; /* function for reading data */ + + CopyFormatOptions opts; + bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + Node *whereClause; /* WHERE condition (or NULL) */ + + /* these are just for error messages, see CopyFromErrorCallback */ + const char *cur_relname; /* table name for error messages */ + uint64 cur_lineno; /* line number for error messages */ + const char *cur_attname; /* current att for error messages */ + const char *cur_attval; /* current att value for error messages */ + bool relname_only; /* don't output line number, att, etc. */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + AttrNumber num_defaults; /* count of att that are missing and have + * default value */ + FmgrInfo *in_functions; /* array of input functions for each attrs */ + Oid *typioparams; /* array of element types for in_functions */ + ErrorSaveContext *escontext; /* soft error trapper during in_functions + * execution */ + uint64 num_errors; /* total number of rows which contained soft + * errors */ + int *defmap; /* array of default att numbers related to + * missing att */ + ExprState **defexprs; /* array of default att expressions for all + * att */ + bool *defaults; /* if DEFAULT marker was found for + * corresponding att */ + bool volatile_defexprs; /* is any of defexprs volatile? */ + List *range_table; /* single element list of RangeTblEntry */ + List *rteperminfos; /* single element list of RTEPermissionInfo */ + ExprState *qualexpr; + + TransitionCaptureState *transition_capture; + + /* + * These variables are used to reduce overhead in COPY FROM. + * + * attribute_buf holds the separated, de-escaped text for each field of + * the current line. The CopyReadAttributes functions return arrays of + * pointers into this buffer. We avoid palloc/pfree overhead by re-using + * the buffer on each cycle. 
+ * + * In binary COPY FROM, attribute_buf holds the binary data for the + * current field, but the usage is otherwise similar. + */ + StringInfoData attribute_buf; + + /* field raw data pointers found by COPY FROM */ + + int max_fields; + char **raw_fields; + + /* + * Similarly, line_buf holds the whole input line being processed. The + * input cycle is first to read the whole line into line_buf, and then + * extract the individual attribute fields into attribute_buf. line_buf + * is preserved unmodified so that we can display it in error messages if + * appropriate. (In binary mode, line_buf is not used.) + */ + StringInfoData line_buf; + bool line_buf_valid; /* contains the row being processed? */ + + /* + * input_buf holds input data, already converted to database encoding. + * + * In text mode, CopyReadLine parses this data sufficiently to locate line + * boundaries, then transfers the data to line_buf. We guarantee that + * there is a \0 at input_buf[input_buf_len] at all times. (In binary + * mode, input_buf is not used.) + * + * If encoding conversion is not required, input_buf is not a separate + * buffer but points directly to raw_buf. In that case, input_buf_len + * tracks the number of bytes that have been verified as valid in the + * database encoding, and raw_buf_len is the total number of bytes stored + * in the buffer. + */ +#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ + char *input_buf; + int input_buf_index; /* next byte to process */ + int input_buf_len; /* total # of bytes stored */ + bool input_reached_eof; /* true if we reached EOF */ + bool input_reached_error; /* true if a conversion error happened */ + /* Shorthand for number of unconsumed bytes available in input_buf */ +#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) + + /* + * raw_buf holds raw input data read from the data source (file or client + * connection), not yet converted to the database encoding. 
Like with + * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. + */ +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + char *raw_buf; + int raw_buf_index; /* next byte to process */ + int raw_buf_len; /* total # of bytes stored */ + bool raw_reached_eof; /* true if we reached EOF */ + + /* Shorthand for number of unconsumed bytes available in raw_buf */ +#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) + + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyFromStateData; + #endif /* COPYAPI_H */ diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 1ca058c6add..23760eb0e02 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -15,174 +15,8 @@ #define COPYFROM_INTERNAL_H #include "commands/copyapi.h" -#include "commands/trigger.h" #include "nodes/miscnodes.h" -/* - * Represents the different source cases we need to worry about at - * the bottom level - */ -typedef enum CopySource -{ - COPY_FILE, /* from file (or a piped program) */ - COPY_FRONTEND, /* from frontend */ - COPY_CALLBACK, /* from callback function */ -} CopySource; - -/* - * Represents the end-of-line terminator type of the input - */ -typedef enum EolType -{ - EOL_UNKNOWN, - EOL_NL, - EOL_CR, - EOL_CRNL, -} EolType; - -/* - * Represents the insert method to be used during COPY FROM. - */ -typedef enum CopyInsertMethod -{ - CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */ - CIM_MULTI, /* always use table_multi_insert or - * ExecForeignBatchInsert */ - CIM_MULTI_CONDITIONAL, /* use table_multi_insert or - * ExecForeignBatchInsert only if valid */ -} CopyInsertMethod; - -/* - * This struct contains all the state variables used throughout a COPY FROM - * operation. 
- */ -typedef struct CopyFromStateData -{ - /* format routine */ - const CopyFromRoutine *routine; - - /* low-level state data */ - CopySource copy_src; /* type of copy source */ - FILE *copy_file; /* used if copy_src == COPY_FILE */ - StringInfo fe_msgbuf; /* used if copy_src == COPY_FRONTEND */ - - EolType eol_type; /* EOL type of input */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - Oid conversion_proc; /* encoding conversion function */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_source_cb data_source_cb; /* function for reading data */ - - CopyFormatOptions opts; - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ - Node *whereClause; /* WHERE condition (or NULL) */ - - /* these are just for error messages, see CopyFromErrorCallback */ - const char *cur_relname; /* table name for error messages */ - uint64 cur_lineno; /* line number for error messages */ - const char *cur_attname; /* current att for error messages */ - const char *cur_attval; /* current att value for error messages */ - bool relname_only; /* don't output line number, att, etc. 
*/ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - AttrNumber num_defaults; /* count of att that are missing and have - * default value */ - FmgrInfo *in_functions; /* array of input functions for each attrs */ - Oid *typioparams; /* array of element types for in_functions */ - ErrorSaveContext *escontext; /* soft error trapper during in_functions - * execution */ - uint64 num_errors; /* total number of rows which contained soft - * errors */ - int *defmap; /* array of default att numbers related to - * missing att */ - ExprState **defexprs; /* array of default att expressions for all - * att */ - bool *defaults; /* if DEFAULT marker was found for - * corresponding att */ - bool volatile_defexprs; /* is any of defexprs volatile? */ - List *range_table; /* single element list of RangeTblEntry */ - List *rteperminfos; /* single element list of RTEPermissionInfo */ - ExprState *qualexpr; - - TransitionCaptureState *transition_capture; - - /* - * These variables are used to reduce overhead in COPY FROM. - * - * attribute_buf holds the separated, de-escaped text for each field of - * the current line. The CopyReadAttributes functions return arrays of - * pointers into this buffer. We avoid palloc/pfree overhead by re-using - * the buffer on each cycle. - * - * In binary COPY FROM, attribute_buf holds the binary data for the - * current field, but the usage is otherwise similar. - */ - StringInfoData attribute_buf; - - /* field raw data pointers found by COPY FROM */ - - int max_fields; - char **raw_fields; - - /* - * Similarly, line_buf holds the whole input line being processed. The - * input cycle is first to read the whole line into line_buf, and then - * extract the individual attribute fields into attribute_buf. line_buf - * is preserved unmodified so that we can display it in error messages if - * appropriate. (In binary mode, line_buf is not used.) 
- */ - StringInfoData line_buf; - bool line_buf_valid; /* contains the row being processed? */ - - /* - * input_buf holds input data, already converted to database encoding. - * - * In text mode, CopyReadLine parses this data sufficiently to locate line - * boundaries, then transfers the data to line_buf. We guarantee that - * there is a \0 at input_buf[input_buf_len] at all times. (In binary - * mode, input_buf is not used.) - * - * If encoding conversion is not required, input_buf is not a separate - * buffer but points directly to raw_buf. In that case, input_buf_len - * tracks the number of bytes that have been verified as valid in the - * database encoding, and raw_buf_len is the total number of bytes stored - * in the buffer. - */ -#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ - char *input_buf; - int input_buf_index; /* next byte to process */ - int input_buf_len; /* total # of bytes stored */ - bool input_reached_eof; /* true if we reached EOF */ - bool input_reached_error; /* true if a conversion error happened */ - /* Shorthand for number of unconsumed bytes available in input_buf */ -#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) - - /* - * raw_buf holds raw input data read from the data source (file or client - * connection), not yet converted to the database encoding. Like with - * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. 
- */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ - char *raw_buf; - int raw_buf_index; /* next byte to process */ - int raw_buf_len; /* total # of bytes stored */ - bool raw_reached_eof; /* true if we reached EOF */ - - /* Shorthand for number of unconsumed bytes available in raw_buf */ -#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) - - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyFromStateData; - extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); -- 2.45.2 From 1500fa121c6d3028db4673ab595d2ba79c55cbf7 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Mon, 25 Nov 2024 14:21:39 +0900 Subject: [PATCH v27 8/9] Add support for implementing custom COPY FROM format as extension * Add CopyFromStateData::opaque that can be used to keep data for custom COPY From format implementation * Export CopyReadBinaryData() to read the next data as CopyFromStateRead() --- src/backend/commands/copyfromparse.c | 12 ++++++++++++ src/include/commands/copyapi.h | 5 +++++ 2 files changed, 17 insertions(+) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 1c68b0d2952..0a7e7255b7d 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -729,6 +729,18 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) return copied_bytes; } +/* + * Export CopyReadBinaryData() for extensions. We want to keep + * CopyReadBinaryData() as a static function for + * optimization. CopyReadBinaryData() calls in this file may be optimized by + * a compiler. + */ +int +CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes) +{ + return CopyReadBinaryData(cstate, dest, nbytes); +} + /* * Read raw fields in the next line for COPY FROM in text or csv mode. * Return false if no more lines. 
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 335584f8877..caba308533d 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -335,6 +335,11 @@ typedef struct CopyFromStateData #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ } CopyFromStateData; +extern int CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes); + #endif /* COPYAPI_H */ -- 2.45.2 From 407dc990b071b7938606b2f8082c8e9a09652d6b Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Wed, 27 Nov 2024 16:23:55 +0900 Subject: [PATCH v27 9/9] Add CopyFromSkipErrorRow() for custom COPY format extension Extensions must call CopyFromSkipErrorRow() when CopyFromOneRow callback reports an error by errsave(). CopyFromSkipErrorRow() handles "ON_ERROR stop" and "LOG_VERBOSITY verbose" cases. --- src/backend/commands/copyfromparse.c | 82 +++++++++++-------- src/include/commands/copyapi.h | 2 + .../expected/test_copy_format.out | 47 +++++++++++ .../test_copy_format/sql/test_copy_format.sql | 24 ++++++ .../test_copy_format/test_copy_format.c | 82 ++++++++++++++++++- 5 files changed, 199 insertions(+), 38 deletions(-) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 0a7e7255b7d..713a000fe5f 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -852,6 +852,51 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool i return true; } +/* + * Call this when you report an error by errsave() in your CopyFromOneRow + * callback. This handles "ON_ERROR stop" and "LOG_VERBOSITY verbose" cases + * for you. 
+ */ +void +CopyFromSkipErrorRow(CopyFromState cstate) +{ + Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); + + cstate->num_errors++; + + if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) + { + /* + * Since we emit line number and column info in the below notice + * message, we suppress error context information other than the + * relation name. + */ + Assert(!cstate->relname_only); + cstate->relname_only = true; + + if (cstate->cur_attval) + { + char *attval; + + attval = CopyLimitPrintoutLength(cstate->cur_attval); + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname, + attval)); + pfree(attval); + } + else + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname)); + + /* reset relname_only */ + cstate->relname_only = false; + } +} + /* * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow(). * @@ -960,42 +1005,7 @@ CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext, (Node *) cstate->escontext, &values[m])) { - Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); - - cstate->num_errors++; - - if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) - { - /* - * Since we emit line number and column info in the below - * notice message, we suppress error context information other - * than the relation name. 
- */ - Assert(!cstate->relname_only); - cstate->relname_only = true; - - if (cstate->cur_attval) - { - char *attval; - - attval = CopyLimitPrintoutLength(cstate->cur_attval); - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname, - attval)); - pfree(attval); - } - else - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname)); - - /* reset relname_only */ - cstate->relname_only = false; - } - + CopyFromSkipErrorRow(cstate); return true; } diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index caba308533d..09585163e0d 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -342,4 +342,6 @@ typedef struct CopyFromStateData extern int CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes); +extern void CopyFromSkipErrorRow(CopyFromState cstate); + #endif /* COPYAPI_H */ diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out index 016893e7026..b9a6baa85c0 100644 --- a/src/test/modules/test_copy_format/expected/test_copy_format.out +++ b/src/test/modules/test_copy_format/expected/test_copy_format.out @@ -1,6 +1,8 @@ CREATE EXTENSION test_copy_format; CREATE TABLE public.test (a smallint, b integer, c bigint); INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +-- 987 is accepted. +-- 654 is a hard error because ON_ERROR is stop by default. 
COPY public.test FROM stdin WITH (FORMAT 'test_copy_format'); NOTICE: test_copy_format: is_from=true NOTICE: CopyFromInFunc: atttypid=21 @@ -8,7 +10,50 @@ NOTICE: CopyFromInFunc: atttypid=23 NOTICE: CopyFromInFunc: atttypid=20 NOTICE: CopyFromStart: natts=3 NOTICE: CopyFromOneRow +NOTICE: CopyFromOneRow +ERROR: invalid value: "6" +CONTEXT: COPY test, line 2, column a: "6" +-- 987 is accepted. +-- 654 is a soft error because ON_ERROR is ignore. +COPY public.test FROM stdin WITH (FORMAT 'test_copy_format', ON_ERROR ignore); +NOTICE: test_copy_format: is_from=true +NOTICE: CopyFromInFunc: atttypid=21 +NOTICE: CopyFromInFunc: atttypid=23 +NOTICE: CopyFromInFunc: atttypid=20 +NOTICE: CopyFromStart: natts=3 +NOTICE: CopyFromOneRow +NOTICE: CopyFromOneRow +NOTICE: CopyFromOneRow +NOTICE: 1 row was skipped due to data type incompatibility NOTICE: CopyFromEnd +-- 987 is accepted. +-- 654 is a soft error because ON_ERROR is ignore. +COPY public.test FROM stdin WITH (FORMAT 'test_copy_format', ON_ERROR ignore, LOG_VERBOSITY verbose); +NOTICE: test_copy_format: is_from=true +NOTICE: CopyFromInFunc: atttypid=21 +NOTICE: CopyFromInFunc: atttypid=23 +NOTICE: CopyFromInFunc: atttypid=20 +NOTICE: CopyFromStart: natts=3 +NOTICE: CopyFromOneRow +NOTICE: CopyFromOneRow +NOTICE: skipping row due to data type incompatibility at line 2 for column "a": "6" +NOTICE: CopyFromOneRow +NOTICE: 1 row was skipped due to data type incompatibility +NOTICE: CopyFromEnd +-- 987 is accepted. +-- 654 is a soft error because ON_ERROR is ignore. +-- 321 is a hard error. 
+COPY public.test FROM stdin WITH (FORMAT 'test_copy_format', ON_ERROR ignore); +NOTICE: test_copy_format: is_from=true +NOTICE: CopyFromInFunc: atttypid=21 +NOTICE: CopyFromInFunc: atttypid=23 +NOTICE: CopyFromInFunc: atttypid=20 +NOTICE: CopyFromStart: natts=3 +NOTICE: CopyFromOneRow +NOTICE: CopyFromOneRow +NOTICE: CopyFromOneRow +ERROR: too much lines: 3 +CONTEXT: COPY test, line 3 COPY public.test TO stdout WITH (FORMAT 'test_copy_format'); NOTICE: test_copy_format: is_from=false NOTICE: CopyToOutFunc: atttypid=21 @@ -18,4 +63,6 @@ NOTICE: CopyToStart: natts=3 NOTICE: CopyToOneRow: tts_nvalid=3 NOTICE: CopyToOneRow: tts_nvalid=3 NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 NOTICE: CopyToEnd diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql index 0dfdfa00080..86db71bce7f 100644 --- a/src/test/modules/test_copy_format/sql/test_copy_format.sql +++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql @@ -1,6 +1,30 @@ CREATE EXTENSION test_copy_format; CREATE TABLE public.test (a smallint, b integer, c bigint); INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +-- 987 is accepted. +-- 654 is a hard error because ON_ERROR is stop by default. COPY public.test FROM stdin WITH (FORMAT 'test_copy_format'); +987 +654 +\. +-- 987 is accepted. +-- 654 is a soft error because ON_ERROR is ignore. +COPY public.test FROM stdin WITH (FORMAT 'test_copy_format', ON_ERROR ignore); +987 +654 +\. +-- 987 is accepted. +-- 654 is a soft error because ON_ERROR is ignore. +COPY public.test FROM stdin WITH (FORMAT 'test_copy_format', ON_ERROR ignore, LOG_VERBOSITY verbose); +987 +654 +\. +-- 987 is accepted. +-- 654 is a soft error because ON_ERROR is ignore. +-- 321 is a hard error. +COPY public.test FROM stdin WITH (FORMAT 'test_copy_format', ON_ERROR ignore); +987 +654 +321 \. 
COPY public.test TO stdout WITH (FORMAT 'test_copy_format'); diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c index f6b105659ab..f0f53838aef 100644 --- a/src/test/modules/test_copy_format/test_copy_format.c +++ b/src/test/modules/test_copy_format/test_copy_format.c @@ -32,10 +32,88 @@ CopyFromStart(CopyFromState cstate, TupleDesc tupDesc) } static bool -CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls) +CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) { + int n_attributes = list_length(cstate->attnumlist); + char *line; + int line_size = n_attributes + 1; /* +1 is for new line */ + int read_bytes; + ereport(NOTICE, (errmsg("CopyFromOneRow"))); - return false; + + cstate->cur_lineno++; + line = palloc(line_size); + read_bytes = CopyFromStateRead(cstate, line, line_size); + if (read_bytes == 0) + return false; + if (read_bytes != line_size) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("one line must be %d bytes: %d", + line_size, read_bytes))); + + if (cstate->cur_lineno == 1) + { + /* Success */ + TupleDesc tupDesc = RelationGetDescr(cstate->rel); + ListCell *cur; + int i = 0; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + if (att->atttypid == INT2OID) + { + values[i] = Int16GetDatum(line[i] - '0'); + } + else if (att->atttypid == INT4OID) + { + values[i] = Int32GetDatum(line[i] - '0'); + } + else if (att->atttypid == INT8OID) + { + values[i] = Int64GetDatum(line[i] - '0'); + } + nulls[i] = false; + i++; + } + } + else if (cstate->cur_lineno == 2) + { + /* Soft error */ + TupleDesc tupDesc = RelationGetDescr(cstate->rel); + int attnum = lfirst_int(list_head(cstate->attnumlist)); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + char value[2]; + + cstate->cur_attname 
= NameStr(att->attname); + value[0] = line[0]; + value[1] = '\0'; + cstate->cur_attval = value; + errsave((Node *) cstate->escontext, + ( + errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid value: \"%c\"", line[0]))); + CopyFromSkipErrorRow(cstate); + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + return true; + } + else + { + /* Hard error */ + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("too much lines: %llu", + (unsigned long long) cstate->cur_lineno))); + } + + return true; } static void -- 2.45.2
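The control flow that the test module's CopyFromOneRow callback follows (read a fixed-size record, report a soft error, count the skipped row, keep going) can be sketched without the PostgreSQL headers. The following is only an illustrative model under stated assumptions: every `Mock…`/`mock_…` name is hypothetical and stands in for the real `CopyFromStateData`, `CopyFromStateRead()` and `CopyFromSkipErrorRow()`. The record format is simplified to one digit plus a newline. The real callback returns `true` and signals the soft error through `cstate->escontext`, whereas this sketch uses a return enum.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for PostgreSQL's types; not the real definitions. */
typedef enum { MOCK_ON_ERROR_STOP, MOCK_ON_ERROR_IGNORE } MockOnError;

typedef struct MockCopyFromState
{
    const char   *data;        /* input bytes (stand-in for the data source) */
    size_t        len;         /* total number of bytes in data */
    size_t        pos;         /* next unread byte */
    MockOnError   on_error;    /* models cstate->opts.on_error */
    unsigned long cur_lineno;  /* models cstate->cur_lineno */
    unsigned long num_errors;  /* models cstate->num_errors */
} MockCopyFromState;

typedef enum { ROW_EOF, ROW_OK, ROW_SKIPPED, ROW_HARD_ERROR } MockRowResult;

/* Models CopyFromStateRead(): copy up to nbytes, return the bytes copied. */
static int
mock_state_read(MockCopyFromState *st, char *dest, int nbytes)
{
    size_t avail = st->len - st->pos;
    size_t n = ((size_t) nbytes < avail) ? (size_t) nbytes : avail;

    memcpy(dest, st->data + st->pos, n);
    st->pos += n;
    return (int) n;
}

/* Models only the bookkeeping part of CopyFromSkipErrorRow(). */
static void
mock_skip_error_row(MockCopyFromState *st)
{
    /* Soft errors are only reachable when ON_ERROR is not "stop". */
    assert(st->on_error != MOCK_ON_ERROR_STOP);
    st->num_errors++;
}

/* Read one "<digit>\n" record; store the digit's value in *value. */
static MockRowResult
mock_one_row(MockCopyFromState *st, int *value)
{
    char line[2];
    int  read_bytes;

    st->cur_lineno++;
    read_bytes = mock_state_read(st, line, (int) sizeof(line));
    if (read_bytes == 0)
        return ROW_EOF;
    if (read_bytes != (int) sizeof(line))
        return ROW_HARD_ERROR;      /* truncated record */

    if (line[0] < '0' || line[0] > '9')
    {
        /* With ON_ERROR stop the error is raised instead of saved. */
        if (st->on_error == MOCK_ON_ERROR_STOP)
            return ROW_HARD_ERROR;
        mock_skip_error_row(st);
        return ROW_SKIPPED;
    }

    *value = line[0] - '0';
    return ROW_OK;
}
```

Feeding the three records `9`, `x`, `3` through `mock_one_row()` with `MOCK_ON_ERROR_IGNORE` yields one accepted row, one skipped row counted in `num_errors`, and another accepted row, which mirrors the "ON_ERROR ignore" cases in the regression test above.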