Re: Make COPY format extendable: Extract COPY TO format implementations - Mailing list pgsql-hackers
From | Sutou Kouhei |
---|---|
Subject | Re: Make COPY format extendable: Extract COPY TO format implementations |
Date | |
Msg-id | 20240929.005645.1104697064625237361.kou@clear-code.com |
In response to | Re: Make COPY format extendable: Extract COPY TO format implementations (Masahiko Sawada <sawada.mshk@gmail.com>) |
List | pgsql-hackers |
Hi,

In <CAD21AoCwMmwLJ8PQLnZu0MbB4gDJiMvWrHREQD4xRp3-F2RU2Q@mail.gmail.com>
  "Re: Make COPY format extendable: Extract COPY TO format implementations" on Fri, 27 Sep 2024 16:33:13 -0700,
  Masahiko Sawada <sawada.mshk@gmail.com> wrote:

>> * 0005 (that add "void *opaque" to Copy{From,To}StateData)
>>   has a bit negative impact for FROM and a bit positive
>>   impact for TO
>>   * But I don't know why. This doesn't change per row
>>     related codes. Increasing Copy{From,To}StateData size
>>     ("void *opaque" is added) may be related.
>
> I was surprised that the 0005 patch made COPY FROM slower (with fewer
> rows) and COPY TO faster overall in spite of just adding one struct
> field and some functions.

Me too...

> I'm interested in why the performance trends of COPY FROM are
> different between fewer than 6M rows and more than 6M rows.

My hypothesis:

With this patch set:
  1. Per-row processing is faster than master.
  2. Non-row-related processing is slower than master.

If we have many rows, the impact of 1. is greater than the
impact of 2.

> Separating the patches into two parts (one is for COPY TO and another
> one is for COPY FROM) could be a good idea. It would help reviews and
> investigate performance regression in COPY FROM cases. And I think we
> can commit them separately.
>
> Also, could you please rebase the patches as they conflict with the
> current HEAD?

OK. I've prepared 2 patch sets:

v20: It's just rebased on master. It still mixes COPY TO
and COPY FROM implementations.

v21: It's based on v20 but splits COPY TO implementations
and COPY FROM implementations.
0001-0005 include only COPY TO related changes.
0006-0010 include only COPY FROM related changes.

(v21 0001 + 0006) == (v20 v0001),
(v21 0002 + 0007) == (v20 v0002) and so on.

> I'll run some benchmarks on my environment as well.

Thanks. It's very helpful.

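The hypothesis above (cheaper per-row processing, more expensive non-row processing) amounts to a linear cost model with a break-even row count. The following is a minimal sketch of that model in C. The function names `total_time_ns()` and `break_even_rows()` and every number fed to them are hypothetical and only illustrate the reasoning; none of them are measurements from this thread.

```c
#include <assert.h>

/*
 * Hypothetical linear cost model for one COPY run:
 *   total = fixed (non-row) cost + per-row cost * number of rows
 */
double
total_time_ns(double fixed_ns, double per_row_ns, double nrows)
{
	assert(nrows >= 0.0);
	return fixed_ns + per_row_ns * nrows;
}

/*
 * Row count at which a patched build with a higher fixed cost but a lower
 * per-row cost takes the same time as master.  Above this count the patched
 * build is faster in this model.
 *
 * Returns -1.0 when the patched per-row cost is not lower, because then
 * there is no break-even point to report.
 */
double
break_even_rows(double fixed_master_ns, double fixed_patched_ns,
				double per_row_master_ns, double per_row_patched_ns)
{
	double		per_row_gain = per_row_master_ns - per_row_patched_ns;

	if (per_row_gain <= 0.0)
		return -1.0;

	return (fixed_patched_ns - fixed_master_ns) / per_row_gain;
}
```

For example, with made-up inputs of 60 ms of extra fixed cost and a 10 ns gain per row, `break_even_rows(0.0, 60e6, 100.0, 90.0)` gives 6,000,000 rows. Those inputs were picked only so the crossover lands near the 6M rows mentioned above; whether the real costs behave linearly is an assumption that the benchmarks would have to confirm.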
Thanks,
-- 
kou

From 51779387b107e80ba598fe26f4ecec71c54f916b Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 4 Mar 2024 13:52:34 +0900
Subject: [PATCH v20 1/5] Add CopyToRoutine/CopyFromRoutine

They are for implementing custom COPY TO/FROM format. But this is not
enough to implement custom COPY TO/FROM format yet. We'll export some
APIs to receive/send data and add "format" option to COPY TO/FROM
later.

Existing text/csv/binary format implementations don't use
CopyToRoutine/CopyFromRoutine for now. We have a patch for it but we
defer it because there are some mysterious profile results even though
we get faster runtimes. See [1] for details.

[1] https://www.postgresql.org/message-id/ZdbtQJ-p5H1_EDwE%40paquier.xyz

Note that this doesn't change existing text/csv/binary format
implementations.
---
 src/backend/commands/copyfrom.c          |  24 +++++-
 src/backend/commands/copyfromparse.c     |   5 ++
 src/backend/commands/copyto.c            |  31 ++++++-
 src/include/commands/copyapi.h           | 101 +++++++++++++++++++++++
 src/include/commands/copyfrom_internal.h |   4 +
 src/tools/pgindent/typedefs.list         |   2 +
 6 files changed, 159 insertions(+), 8 deletions(-)
 create mode 100644 src/include/commands/copyapi.h

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 2d3462913e1..c42485bd9cb 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1632,12 +1632,22 @@ BeginCopyFrom(ParseState *pstate,
 
 		/* Fetch the input function and typioparam info */
 		if (cstate->opts.binary)
+		{
 			getTypeBinaryInputInfo(att->atttypid,
 								   &in_func_oid, &typioparams[attnum - 1]);
+			fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+		}
+		else if (cstate->routine)
+			cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+											&in_functions[attnum - 1],
+											&typioparams[attnum - 1]);
+
 		else
+		{
 			getTypeInputInfo(att->atttypid,
 							 &in_func_oid, &typioparams[attnum - 1]);
-		fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+			fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+		}
 
 		/* Get default info if available */
 		defexprs[attnum - 1] = NULL;
@@ -1777,10 +1787,13 @@ BeginCopyFrom(ParseState *pstate,
 		/* Read and verify binary header */
 		ReceiveCopyBinaryHeader(cstate);
 	}
-
-	/* create workspace for CopyReadAttributes results */
-	if (!cstate->opts.binary)
+	else if (cstate->routine)
 	{
+		cstate->routine->CopyFromStart(cstate, tupDesc);
+	}
+	else
+	{
+		/* create workspace for CopyReadAttributes results */
 		AttrNumber	attr_count = list_length(cstate->attnumlist);
 
 		cstate->max_fields = attr_count;
@@ -1798,6 +1811,9 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+	if (cstate->routine)
+		cstate->routine->CopyFromEnd(cstate);
+
 	/* No COPY FROM related resources except memory. */
 	if (cstate->is_program)
 	{
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 97a4c387a30..2e126448019 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -1012,6 +1012,11 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 
 		Assert(fieldno == attr_count);
 	}
+	else if (cstate->routine)
+	{
+		if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
+			return false;
+	}
 	else
 	{
 		/* binary */
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 91de442f434..3c5a97679aa 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -20,6 +20,7 @@
 
 #include "access/tableam.h"
 #include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
@@ -64,6 +65,9 @@ typedef enum CopyDest
  */
 typedef struct CopyToStateData
 {
+	/* format routine */
+	const CopyToRoutine *routine;
+
 	/* low-level state data */
 	CopyDest	copy_dest;		/* type of copy source/destination */
 	FILE	   *copy_file;		/* used if copy_dest == COPY_FILE */
@@ -772,14 +776,22 @@ DoCopyTo(CopyToState cstate)
 		Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
 
 		if (cstate->opts.binary)
+		{
 			getTypeBinaryOutputInfo(attr->atttypid,
 									&out_func_oid,
 									&isvarlena);
+			fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+		}
+		else if (cstate->routine)
+			cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
+										   &cstate->out_functions[attnum - 1]);
 		else
+		{
 			getTypeOutputInfo(attr->atttypid,
 							  &out_func_oid,
 							  &isvarlena);
-		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+			fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+		}
 	}
 
 	/*
@@ -806,6 +818,8 @@ DoCopyTo(CopyToState cstate)
 		tmp = 0;
 		CopySendInt32(cstate, tmp);
 	}
+	else if (cstate->routine)
+		cstate->routine->CopyToStart(cstate, tupDesc);
 	else
 	{
 		/*
@@ -887,6 +901,8 @@ DoCopyTo(CopyToState cstate)
 		/* Need to flush out the trailer */
 		CopySendEndOfRow(cstate);
 	}
+	else if (cstate->routine)
+		cstate->routine->CopyToEnd(cstate);
 
 	MemoryContextDelete(cstate->rowcontext);
 
@@ -908,15 +924,22 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 	MemoryContextReset(cstate->rowcontext);
 	oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
+	/* Make sure the tuple is fully deconstructed */
+	slot_getallattrs(slot);
+
+	if (cstate->routine)
+	{
+		cstate->routine->CopyToOneRow(cstate, slot);
+		MemoryContextSwitchTo(oldcontext);
+		return;
+	}
+
 	if (cstate->opts.binary)
 	{
 		/* Binary per-tuple header */
 		CopySendInt16(cstate, list_length(cstate->attnumlist));
 	}
 
-	/* Make sure the tuple is fully deconstructed */
-	slot_getallattrs(slot);
-
 	if (!cstate->opts.binary)
 	{
 		bool		need_delim = false;
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
new file mode 100644
index 00000000000..d1289424c67
--- /dev/null
+++ b/src/include/commands/copyapi.h
@@ -0,0 +1,101 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyapi.h
+ *	  API for COPY TO/FROM handlers
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/copyapi.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COPYAPI_H
+#define COPYAPI_H
+
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
+
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/*
+ * API structure for a COPY FROM format implementation. Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyFromRoutine
+{
+	/*
+	 * Called when COPY FROM is started to set up the input functions
+	 * associated with the relation's attributes writing to. `finfo` can be
+	 * optionally filled to provide the catalog information of the input
+	 * function. `typioparam` can be optionally filled to define the OID of
+	 * the type to pass to the input function. `atttypid` is the OID of data
+	 * type used by the relation's attribute.
+	 */
+	void		(*CopyFromInFunc) (CopyFromState cstate, Oid atttypid,
+								   FmgrInfo *finfo, Oid *typioparam);
+
+	/*
+	 * Called when COPY FROM is started.
+	 *
+	 * `tupDesc` is the tuple descriptor of the relation where the data needs
+	 * to be copied. This can be used for any initialization steps required
+	 * by a format.
+	 */
+	void		(*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc);
+
+	/*
+	 * Copy one row to a set of `values` and `nulls` of size tupDesc->natts.
+	 *
+	 * 'econtext' is used to evaluate default expression for each column that
+	 * is either not read from the file or is using the DEFAULT option of COPY
+	 * FROM. It is NULL if no default values are used.
+	 *
+	 * Returns false if there are no more tuples to copy.
+	 */
+	bool		(*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext,
+								   Datum *values, bool *nulls);
+
+	/* Called when COPY FROM has ended. */
+	void		(*CopyFromEnd) (CopyFromState cstate);
+} CopyFromRoutine;
+
+/*
+ * API structure for a COPY TO format implementation. Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyToRoutine
+{
+	/*
+	 * Called when COPY TO is started to set up the output functions
+	 * associated with the relation's attributes reading from. `finfo` can be
+	 * optionally filled to provide the catalog information of the output
+	 * function. `atttypid` is the OID of data type used by the relation's
+	 * attribute.
+	 */
+	void		(*CopyToOutFunc) (CopyToState cstate, Oid atttypid,
+								  FmgrInfo *finfo);
+
+	/*
+	 * Called when COPY TO is started.
+	 *
+	 * `tupDesc` is the tuple descriptor of the relation from where the data
+	 * is read.
+	 */
+	void		(*CopyToStart) (CopyToState cstate, TupleDesc tupDesc);
+
+	/*
+	 * Copy one row for COPY TO.
+	 *
+	 * `slot` is the tuple slot where the data is emitted.
+	 */
+	void		(*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot);
+
+	/* Called when COPY TO has ended */
+	void		(*CopyToEnd) (CopyToState cstate);
+} CopyToRoutine;
+
+#endif							/* COPYAPI_H */
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc783..509b9e92a18 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -15,6 +15,7 @@
 #define COPYFROM_INTERNAL_H
 
 #include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/trigger.h"
 #include "nodes/miscnodes.h"
 
@@ -58,6 +59,9 @@ typedef enum CopyInsertMethod
  */
 typedef struct CopyFromStateData
 {
+	/* format routine */
+	const CopyFromRoutine *routine;
+
 	/* low-level state data */
 	CopySource	copy_src;		/* type of copy source */
 	FILE	   *copy_file;		/* used if copy_src == COPY_FILE */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5fabb127d7e..4c4bf60d9e5 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -492,6 +492,7 @@ ConvertRowtypeExpr
 CookedConstraint
 CopyDest
 CopyFormatOptions
+CopyFromRoutine
 CopyFromState
 CopyFromStateData
 CopyHeaderChoice
@@ -503,6 +504,7 @@ CopyMultiInsertInfo
 CopyOnErrorChoice
 CopySource
 CopyStmt
+CopyToRoutine
 CopyToState
 CopyToStateData
 Cost
-- 
2.45.2

From daca92389cfea80c22d95ae64b9bc1c11751eb52 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jul 2024 16:44:44 +0900
Subject: [PATCH v20 2/5] Use CopyToRoutine/CopyFromRoutine for the existing formats

The existing formats are text, csv and binary. If we find any
performance regression by this, we will not merge this to master.

This will increase indirect function call costs but this will reduce
runtime "if (cstate->opts.binary)" and "if (cstate->opts.csv_mode)"
branch costs.

This uses an optimization based on a static inline function and a
constant argument call for cstate->opts.csv_mode. For example,
CopyFromTextLikeOneRow() uses this optimization. It accepts the "bool
is_csv" argument instead of using cstate->opts.csv_mode in it.
CopyFromTextOneRow() calls CopyFromTextLikeOneRow() with false
(constant) for "bool is_csv". The compiler will remove the
"if (is_csv)" branch in it by this optimization.

This doesn't change existing logic. This just moves existing code.
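
The constant-argument inlining described in this commit message can be shown in isolation. The sketch below is not code from the patch: `count_fields_like()`, `FieldCounterRoutine` and `field_counter_get_routine()` are hypothetical stand-ins for CopyFromTextLikeOneRow(), CopyFromRoutine and CopyFromGetRoutine(), and the field-counting logic is only a toy. It assumes the compiler inlines the static inline workhorse into each wrapper, which is typical at normal optimization levels but not guaranteed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical workhorse shared by both formats.  It counts
 * comma-separated fields; in CSV mode, commas inside double quotes do not
 * separate fields.  The format is chosen by the `is_csv` argument, not by
 * a runtime option field.
 */
static inline size_t
count_fields_like(const char *line, bool is_csv)
{
	size_t		nfields = 1;
	bool		in_quote = false;

	assert(line != NULL);

	for (const char *p = line; *p != '\0'; p++)
	{
		/*
		 * Each wrapper below passes a constant for is_csv, so once this
		 * function is inlined the compiler can drop this test (and the
		 * quote tracking) from the text variant.
		 */
		if (is_csv && *p == '"')
			in_quote = !in_quote;
		else if (*p == ',' && !in_quote)
			nfields++;
	}

	return nfields;
}

/* Per-format wrappers: the only difference is the constant argument. */
size_t
count_fields_text(const char *line)
{
	return count_fields_like(line, false);
}

size_t
count_fields_csv(const char *line)
{
	return count_fields_like(line, true);
}

/* Hypothetical callback table, allocated as a static const struct. */
typedef struct FieldCounterRoutine
{
	size_t		(*count_fields) (const char *line);
} FieldCounterRoutine;

static const FieldCounterRoutine FieldCounterRoutineText = {
	.count_fields = count_fields_text,
};

static const FieldCounterRoutine FieldCounterRoutineCSV = {
	.count_fields = count_fields_csv,
};

/* The format is resolved once at start-up; rows then go through the pointer. */
const FieldCounterRoutine *
field_counter_get_routine(bool csv_mode)
{
	return csv_mode ? &FieldCounterRoutineCSV : &FieldCounterRoutineText;
}
```

A caller picks the routine once, for example `field_counter_get_routine(true)`, and then calls `routine->count_fields(line)` for every row. The trade-off is the one the commit message names: one indirect call per row in exchange for removing the per-character format checks from the inner loop.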
--- src/backend/commands/copyfrom.c | 215 ++++++--- src/backend/commands/copyfromparse.c | 550 +++++++++++++---------- src/backend/commands/copyto.c | 477 +++++++++++++------- src/include/commands/copy.h | 2 - src/include/commands/copyfrom_internal.h | 8 + 5 files changed, 806 insertions(+), 446 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index c42485bd9cb..14f95f17124 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -106,6 +106,157 @@ typedef struct CopyMultiInsertInfo /* non-export function prototypes */ static void ClosePipeFromProgram(CopyFromState cstate); + +/* + * CopyFromRoutine implementations for text and CSV. + */ + +/* + * CopyFromTextLikeInFunc + * + * Assign input function data for a relation's attribute in text/CSV format. + */ +static void +CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + Oid func_oid; + + getTypeInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* + * CopyFromTextLikeStart + * + * Start of COPY FROM for text/CSV format. + */ +static void +CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc) +{ + AttrNumber attr_count; + + /* + * If encoding conversion is needed, we need another buffer to hold the + * converted input data. Otherwise, we can just point input_buf to the + * same buffer as raw_buf. + */ + if (cstate->need_transcoding) + { + cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); + cstate->input_buf_index = cstate->input_buf_len = 0; + } + else + cstate->input_buf = cstate->raw_buf; + cstate->input_reached_eof = false; + + initStringInfo(&cstate->line_buf); + + /* + * Create workspace for CopyReadAttributes results; used by CSV and text + * format. 
+ */ + attr_count = list_length(cstate->attnumlist); + cstate->max_fields = attr_count; + cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); +} + +/* + * CopyFromTextLikeEnd + * + * End of COPY FROM for text/CSV format. + */ +static void +CopyFromTextLikeEnd(CopyFromState cstate) +{ + /* nothing to do */ +} + +/* + * CopyFromRoutine implementation for "binary". + */ + +/* + * CopyFromBinaryInFunc + * + * Assign input function data for a relation's attribute in binary format. + */ +static void +CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + Oid func_oid; + + getTypeBinaryInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* + * CopyFromBinaryStart + * + * Start of COPY FROM for binary format. + */ +static void +CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc) +{ + /* Read and verify binary header */ + ReceiveCopyBinaryHeader(cstate); +} + +/* + * CopyFromBinaryEnd + * + * End of COPY FROM for binary format. + */ +static void +CopyFromBinaryEnd(CopyFromState cstate) +{ + /* nothing to do */ +} + +/* + * Routines assigned to each format. ++ + * CSV and text share the same implementation, at the exception of the + * per-row callback. + */ +static const CopyFromRoutine CopyFromRoutineText = { + .CopyFromInFunc = CopyFromTextLikeInFunc, + .CopyFromStart = CopyFromTextLikeStart, + .CopyFromOneRow = CopyFromTextOneRow, + .CopyFromEnd = CopyFromTextLikeEnd, +}; + +static const CopyFromRoutine CopyFromRoutineCSV = { + .CopyFromInFunc = CopyFromTextLikeInFunc, + .CopyFromStart = CopyFromTextLikeStart, + .CopyFromOneRow = CopyFromCSVOneRow, + .CopyFromEnd = CopyFromTextLikeEnd, +}; + +static const CopyFromRoutine CopyFromRoutineBinary = { + .CopyFromInFunc = CopyFromBinaryInFunc, + .CopyFromStart = CopyFromBinaryStart, + .CopyFromOneRow = CopyFromBinaryOneRow, + .CopyFromEnd = CopyFromBinaryEnd, +}; + +/* + * Define the COPY FROM routines to use for a format. 
+ */ +static const CopyFromRoutine * +CopyFromGetRoutine(CopyFormatOptions opts) +{ + if (opts.csv_mode) + return &CopyFromRoutineCSV; + else if (opts.binary) + return &CopyFromRoutineBinary; + + /* default is text */ + return &CopyFromRoutineText; +} + + /* * error context callback for COPY FROM * @@ -1393,7 +1544,6 @@ BeginCopyFrom(ParseState *pstate, num_defaults; FmgrInfo *in_functions; Oid *typioparams; - Oid in_func_oid; int *defmap; ExprState **defexprs; MemoryContext oldcontext; @@ -1425,6 +1575,9 @@ BeginCopyFrom(ParseState *pstate, /* Extract options from the statement node tree */ ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); + /* Set format routine */ + cstate->routine = CopyFromGetRoutine(cstate->opts); + /* Process the target relation */ cstate->rel = rel; @@ -1580,25 +1733,6 @@ BeginCopyFrom(ParseState *pstate, cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->raw_reached_eof = false; - if (!cstate->opts.binary) - { - /* - * If encoding conversion is needed, we need another buffer to hold - * the converted input data. Otherwise, we can just point input_buf - * to the same buffer as raw_buf. - */ - if (cstate->need_transcoding) - { - cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); - cstate->input_buf_index = cstate->input_buf_len = 0; - } - else - cstate->input_buf = cstate->raw_buf; - cstate->input_reached_eof = false; - - initStringInfo(&cstate->line_buf); - } - initStringInfo(&cstate->attribute_buf); /* Assign range table and rteperminfos, we'll need them in CopyFrom. 
*/ @@ -1631,23 +1765,9 @@ BeginCopyFrom(ParseState *pstate, continue; /* Fetch the input function and typioparam info */ - if (cstate->opts.binary) - { - getTypeBinaryInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - } - else if (cstate->routine) - cstate->routine->CopyFromInFunc(cstate, att->atttypid, - &in_functions[attnum - 1], - &typioparams[attnum - 1]); - - else - { - getTypeInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - } + cstate->routine->CopyFromInFunc(cstate, att->atttypid, + &in_functions[attnum - 1], + &typioparams[attnum - 1]); /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1782,23 +1902,7 @@ BeginCopyFrom(ParseState *pstate, pgstat_progress_update_multi_param(3, progress_cols, progress_vals); - if (cstate->opts.binary) - { - /* Read and verify binary header */ - ReceiveCopyBinaryHeader(cstate); - } - else if (cstate->routine) - { - cstate->routine->CopyFromStart(cstate, tupDesc); - } - else - { - /* create workspace for CopyReadAttributes results */ - AttrNumber attr_count = list_length(cstate->attnumlist); - - cstate->max_fields = attr_count; - cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); - } + cstate->routine->CopyFromStart(cstate, tupDesc); MemoryContextSwitchTo(oldcontext); @@ -1811,8 +1915,7 @@ BeginCopyFrom(ParseState *pstate, void EndCopyFrom(CopyFromState cstate) { - if (cstate->routine) - cstate->routine->CopyFromEnd(cstate); + cstate->routine->CopyFromEnd(cstate); /* No COPY FROM related resources except memory. 
*/ if (cstate->is_program) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 2e126448019..5f63b683d17 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -149,8 +149,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ -static bool CopyReadLine(CopyFromState cstate); -static bool CopyReadLineText(CopyFromState cstate); +static bool CopyReadLine(CopyFromState cstate, bool is_csv); +static inline bool CopyReadLineText(CopyFromState cstate, bool is_csv); static int CopyReadAttributesText(CopyFromState cstate); static int CopyReadAttributesCSV(CopyFromState cstate); static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, @@ -750,8 +750,8 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) * * NOTE: force_not_null option are not applied to the returned fields. */ -bool -NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) +static inline bool +NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv) { int fldct; bool done; @@ -768,13 +768,17 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) tupDesc = RelationGetDescr(cstate->rel); cstate->cur_lineno++; - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); if (cstate->opts.header_line == COPY_HEADER_MATCH) { int fldnum; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is + * constant at caller. + */ + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -818,7 +822,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) cstate->cur_lineno++; /* Actually read the line into memory here */ - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); /* * EOF at start of line means we're done. 
If we see EOF after some @@ -828,8 +832,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) if (done && cstate->line_buf.len == 0) return false; - /* Parse the line into de-escaped field values */ - if (cstate->opts.csv_mode) + /* + * Parse the line into de-escaped field values + * + * is_csv will be optimized away by compiler, as argument is constant at + * caller. + */ + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -839,6 +848,267 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) return true; } +/* + * CopyFromTextLikeOneRow + * + * Copy one row to a set of `values` and `nulls` for the text and CSV + * formats. + * + * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow(). + */ +static inline bool +CopyFromTextLikeOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls, + bool is_csv) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + ExprState **defexprs = cstate->defexprs; + char **field_strings; + ListCell *cur; + int fldct; + int fieldno; + char *string; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv)) + return false; + + /* check for overflowing fields */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + + fieldno = 0; + + /* Loop to read the user attributes on the line. 
*/ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + if (fieldno >= fldct) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; + + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) + { + /* ignore input field, leaving column as NULL */ + continue; + } + + if (is_csv) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) + { + /* + * FORCE_NOT_NULL option is set and column is NULL - convert + * it to the NULL string. + */ + string = cstate->opts.null_print; + } + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) + { + /* + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the string + * would already have been set to NULL. Convert it to NULL as + * specified. + */ + string = NULL; + } + } + + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; + + if (string != NULL) + nulls[m] = false; + + if (cstate->defaults[m]) + { + /* + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. 
+ */ + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); + + values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); + } + + /* + * If ON_ERROR is specified with IGNORE, skip rows with soft errors + */ + else if (!InputFunctionCallSafe(&in_functions[m], + string, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); + + cstate->num_errors++; + + if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) + { + /* + * Since we emit line number and column info in the below + * notice message, we suppress error context information other + * than the relation name. + */ + Assert(!cstate->relname_only); + cstate->relname_only = true; + + if (cstate->cur_attval) + { + char *attval; + + attval = CopyLimitPrintoutLength(cstate->cur_attval); + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname, + attval)); + pfree(attval); + } + else + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname)); + + /* reset relname_only */ + cstate->relname_only = false; + } + + return true; + } + + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } + + Assert(fieldno == attr_count); + + return true; +} + + +/* + * CopyFromTextOneRow + * + * Per-row callback for COPY FROM with text format. + */ +bool +CopyFromTextOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false); +} + +/* + * CopyFromCSVOneRow + * + * Per-row callback for COPY FROM with CSV format. 
+ */ +bool +CopyFromCSVOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true); +} + +/* + * CopyFromBinaryOneRow + * + * Copy one row to a set of `values` and `nulls` for the binary format. + */ +bool +CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + int16 fld_count; + ListCell *cur; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + cstate->cur_lineno++; + + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } + + if (fld_count == -1) + { + /* + * Received EOF marker. Wait for the protocol-level EOF, and complain + * if it doesn't come immediately. In COPY FROM STDIN, this ensures + * that we correctly handle CopyFail, if client chooses to send that + * now. When copying from file, we could ignore the rest of the file + * like in text mode, but we choose to be consistent with the COPY + * FROM STDIN case. 
+ */ + char dummy; + + if (CopyReadBinaryData(cstate, &dummy, 1) > 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("received copy data after EOF marker"))); + return false; + } + + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + + return true; +} + /* * Read next tuple from file for COPY FROM. Return false if no more tuples. * @@ -856,221 +1126,21 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, { TupleDesc tupDesc; AttrNumber num_phys_attrs, - attr_count, num_defaults = cstate->num_defaults; - FmgrInfo *in_functions = cstate->in_functions; - Oid *typioparams = cstate->typioparams; int i; int *defmap = cstate->defmap; ExprState **defexprs = cstate->defexprs; tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); - if (!cstate->opts.binary) - { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; - - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; - - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - - fieldno = 0; - - /* Loop to read the user attributes on 
the line. */ - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; - - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) - { - /* ignore input field, leaving column as NULL */ - continue; - } - - if (cstate->opts.csv_mode) - { - if (string == NULL && - cstate->opts.force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * convert it to the NULL string. - */ - string = cstate->opts.null_print; - } - else if (string != NULL && cstate->opts.force_null_flags[m] - && strcmp(string, cstate->opts.null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } - } - - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; - - if (string != NULL) - nulls[m] = false; - - if (cstate->defaults[m]) - { - /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. 
- */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - - values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); - } - - /* - * If ON_ERROR is specified with IGNORE, skip rows with soft - * errors - */ - else if (!InputFunctionCallSafe(&in_functions[m], - string, - typioparams[m], - att->atttypmod, - (Node *) cstate->escontext, - &values[m])) - { - Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); - - cstate->num_errors++; - - if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) - { - /* - * Since we emit line number and column info in the below - * notice message, we suppress error context information - * other than the relation name. - */ - Assert(!cstate->relname_only); - cstate->relname_only = true; - - if (cstate->cur_attval) - { - char *attval; - - attval = CopyLimitPrintoutLength(cstate->cur_attval); - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname, - attval)); - pfree(attval); - } - else - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": nullinput", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname)); - - /* reset relname_only */ - cstate->relname_only = false; - } - - return true; - } - - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - } - - Assert(fieldno == attr_count); - } - else if (cstate->routine) - { - if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) - return false; - } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; - - cstate->cur_lineno++; - - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } - - if (fld_count == -1) - { - /* - * Received EOF marker. Wait for the protocol-level EOF, and - * complain if it doesn't come immediately. 
In COPY FROM STDIN, - * this ensures that we correctly handle CopyFail, if client - * chooses to send that now. When copying from file, we could - * ignore the rest of the file like in text mode, but we choose to - * be consistent with the COPY FROM STDIN case. - */ - char dummy; - - if (CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } - - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } - } + if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) + return false; /* * Now compute and insert any defaults available for the columns not @@ -1101,7 +1171,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, * in the final value of line_buf. 
*/ static bool -CopyReadLine(CopyFromState cstate) +CopyReadLine(CopyFromState cstate, bool is_csv) { bool result; @@ -1109,7 +1179,7 @@ CopyReadLine(CopyFromState cstate) cstate->line_buf_valid = false; /* Parse data and transfer into line_buf */ - result = CopyReadLineText(cstate); + result = CopyReadLineText(cstate, is_csv); if (result) { @@ -1176,8 +1246,8 @@ CopyReadLine(CopyFromState cstate) /* * CopyReadLineText - inner loop of CopyReadLine for text mode */ -static bool -CopyReadLineText(CopyFromState cstate) +static inline bool +CopyReadLineText(CopyFromState cstate, bool is_csv) { char *copy_input_buf; int input_buf_ptr; @@ -1193,7 +1263,11 @@ CopyReadLineText(CopyFromState cstate) char quotec = '\0'; char escapec = '\0'; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is constant at + * caller. + */ + if (is_csv) { quotec = cstate->opts.quote[0]; escapec = cstate->opts.escape[0]; @@ -1270,7 +1344,11 @@ CopyReadLineText(CopyFromState cstate) prev_raw_ptr = input_buf_ptr; c = copy_input_buf[input_buf_ptr++]; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is constant + * at caller. + */ + if (is_csv) { /* * If character is '\\' or '\r', we may need to look ahead below. @@ -1309,7 +1387,7 @@ CopyReadLineText(CopyFromState cstate) } /* Process \r */ - if (c == '\r' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\r' && (!is_csv || !in_quote)) { /* Check for \r\n on first line, _and_ handle \r\n. */ if (cstate->eol_type == EOL_UNKNOWN || @@ -1337,10 +1415,10 @@ CopyReadLineText(CopyFromState cstate) if (cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? 
errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); @@ -1354,10 +1432,10 @@ CopyReadLineText(CopyFromState cstate) else if (cstate->eol_type == EOL_NL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); /* If reach here, we have found the line terminator */ @@ -1365,15 +1443,15 @@ CopyReadLineText(CopyFromState cstate) } /* Process \n */ - if (c == '\n' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\n' && (!is_csv || !in_quote)) { if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal newline found in data") : errmsg("unquoted newline found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\n\" to represent newline.") : errhint("Use quoted CSV field to represent newline."))); cstate->eol_type = EOL_NL; /* in case not set yet */ @@ -1385,7 +1463,7 @@ CopyReadLineText(CopyFromState cstate) * In CSV mode, we only recognize \. alone on a line. This is because * \. is a valid CSV data value. */ - if (c == '\\' && (!cstate->opts.csv_mode || first_char_in_line)) + if (c == '\\' && (!is_csv || first_char_in_line)) { char c2; @@ -1418,7 +1496,11 @@ CopyReadLineText(CopyFromState cstate) if (c2 == '\n') { - if (!cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as + * argument is constant at caller. 
+ */ + if (!is_csv) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("end-of-copy marker does not match previous newline style"))); @@ -1427,7 +1509,11 @@ CopyReadLineText(CopyFromState cstate) } else if (c2 != '\r') { - if (!cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as + * argument is constant at caller. + */ + if (!is_csv) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("end-of-copy marker corrupt"))); @@ -1443,7 +1529,11 @@ CopyReadLineText(CopyFromState cstate) if (c2 != '\r' && c2 != '\n') { - if (!cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument + * is constant at caller. + */ + if (!is_csv) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("end-of-copy marker corrupt"))); @@ -1472,7 +1562,7 @@ CopyReadLineText(CopyFromState cstate) result = true; /* report EOF */ break; } - else if (!cstate->opts.csv_mode) + else if (!is_csv) { /* * If we are here, it means we found a backslash followed by diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 3c5a97679aa..86dc1b742cc 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -128,6 +128,317 @@ static void CopySendEndOfRow(CopyToState cstate); static void CopySendInt32(CopyToState cstate, int32 val); static void CopySendInt16(CopyToState cstate, int16 val); +/* + * CopyToRoutine implementations. + */ + +/* + * CopyToTextLikeSendEndOfRow + * + * Apply line terminations for a line sent in text or CSV format depending + * on the destination, then send the end of a row. 
+ */ +static inline void +CopyToTextLikeSendEndOfRow(CopyToState cstate) +{ + switch (cstate->copy_dest) + { + case COPY_FILE: + /* Default line termination depends on platform */ +#ifndef WIN32 + CopySendChar(cstate, '\n'); +#else + CopySendString(cstate, "\r\n"); +#endif + break; + case COPY_FRONTEND: + /* The FE/BE protocol uses \n as newline for all platforms */ + CopySendChar(cstate, '\n'); + break; + default: + break; + } + + /* Now take the actions related to the end of a row */ + CopySendEndOfRow(cstate); +} + +/* + * CopyToTextLikeStart + * + * Start of COPY TO for text and CSV format. + */ +static void +CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc) +{ + /* + * For non-binary copy, we need to convert null_print to file encoding, + * because it will be sent directly with CopySendString. + */ + if (cstate->need_transcoding) + cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, + cstate->opts.null_print_len, + cstate->file_encoding); + + /* if a header has been requested send the line */ + if (cstate->opts.header_line) + { + ListCell *cur; + bool hdr_delim = false; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + char *colname; + + if (hdr_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + hdr_delim = true; + + colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); + + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, colname, false); + else + CopyAttributeOutText(cstate, colname); + } + + CopyToTextLikeSendEndOfRow(cstate); + } +} + +/* + * CopyToTextLikeOutFunc + * + * Assign output function data for a relation's attribute in text/CSV format. 
+ */ +static void +CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) +{ + Oid func_oid; + bool is_varlena; + + /* Set output function for an attribute */ + getTypeOutputInfo(atttypid, &func_oid, &is_varlena); + fmgr_info(func_oid, finfo); +} + + +/* + * CopyToTextLikeOneRow + * + * Process one row for text/CSV format. + * + * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow(). + */ +static inline void +CopyToTextLikeOneRow(CopyToState cstate, + TupleTableSlot *slot, + bool is_csv) +{ + bool need_delim = false; + FmgrInfo *out_functions = cstate->out_functions; + + foreach_int(attnum, cstate->attnumlist) + { + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (need_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + need_delim = true; + + if (isnull) + { + CopySendString(cstate, cstate->opts.null_print_client); + } + else + { + char *string; + + string = OutputFunctionCall(&out_functions[attnum - 1], + value); + + /* + * is_csv will be optimized away by compiler, as argument is + * constant at caller. + */ + if (is_csv) + CopyAttributeOutCSV(cstate, string, + cstate->opts.force_quote_flags[attnum - 1]); + else + CopyAttributeOutText(cstate, string); + } + } + + CopyToTextLikeSendEndOfRow(cstate); +} + +/* + * CopyToTextOneRow + * + * Per-row callback for COPY TO with text format. + */ +static void +CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + CopyToTextLikeOneRow(cstate, slot, false); +} + +/* + * CopyToCSVOneRow + * + * Per-row callback for COPY TO with CSV format. + */ +static void +CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + CopyToTextLikeOneRow(cstate, slot, true); +} + +/* + * CopyToTextLikeEnd + * + * End of COPY TO for text/CSV format. + */ +static void +CopyToTextLikeEnd(CopyToState cstate) +{ + /* Nothing to do here */ +} + +/* + * CopyToRoutine implementation for "binary". 
+ */ + +/* + * CopyToBinaryStart + * + * Start of COPY TO for binary format. + */ +static void +CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc) +{ + /* Generate header for a binary copy */ + int32 tmp; + + /* Signature */ + CopySendData(cstate, BinarySignature, 11); + /* Flags field */ + tmp = 0; + CopySendInt32(cstate, tmp); + /* No header extension */ + tmp = 0; + CopySendInt32(cstate, tmp); +} + +/* + * CopyToBinaryOutFunc + * + * Assign output function data for a relation's attribute in binary format. + */ +static void +CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) +{ + Oid func_oid; + bool is_varlena; + + /* Set output function for an attribute */ + getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena); + fmgr_info(func_oid, finfo); +} + +/* + * CopyToBinaryOneRow + * + * Process one row for binary format. + */ +static void +CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + FmgrInfo *out_functions = cstate->out_functions; + + /* Binary per-tuple header */ + CopySendInt16(cstate, list_length(cstate->attnumlist)); + + foreach_int(attnum, cstate->attnumlist) + { + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (isnull) + { + CopySendInt32(cstate, -1); + } + else + { + bytea *outputbytes; + + outputbytes = SendFunctionCall(&out_functions[attnum - 1], + value); + CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); + CopySendData(cstate, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + CopySendEndOfRow(cstate); +} + +/* + * CopyToBinaryEnd + * + * End of COPY TO for binary format. + */ +static void +CopyToBinaryEnd(CopyToState cstate) +{ + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopySendEndOfRow(cstate); +} + +/* + * CSV and text share the same implementation, at the exception of the + * output representation and per-row callbacks. 
+ */ +static const CopyToRoutine CopyToRoutineText = { + .CopyToStart = CopyToTextLikeStart, + .CopyToOutFunc = CopyToTextLikeOutFunc, + .CopyToOneRow = CopyToTextOneRow, + .CopyToEnd = CopyToTextLikeEnd, +}; + +static const CopyToRoutine CopyToRoutineCSV = { + .CopyToStart = CopyToTextLikeStart, + .CopyToOutFunc = CopyToTextLikeOutFunc, + .CopyToOneRow = CopyToCSVOneRow, + .CopyToEnd = CopyToTextLikeEnd, +}; + +static const CopyToRoutine CopyToRoutineBinary = { + .CopyToStart = CopyToBinaryStart, + .CopyToOutFunc = CopyToBinaryOutFunc, + .CopyToOneRow = CopyToBinaryOneRow, + .CopyToEnd = CopyToBinaryEnd, +}; + +/* + * Define the COPY TO routines to use for a format. This should be called + * after options are parsed. + */ +static const CopyToRoutine * +CopyToGetRoutine(CopyFormatOptions opts) +{ + if (opts.csv_mode) + return &CopyToRoutineCSV; + else if (opts.binary) + return &CopyToRoutineBinary; + + /* default is text */ + return &CopyToRoutineText; +} /* * Send copy start/stop messages for frontend copies. 
These have changed @@ -195,16 +506,6 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { case COPY_FILE: - if (!cstate->opts.binary) - { - /* Default line termination depends on platform */ -#ifndef WIN32 - CopySendChar(cstate, '\n'); -#else - CopySendString(cstate, "\r\n"); -#endif - } - if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -239,10 +540,6 @@ CopySendEndOfRow(CopyToState cstate) } break; case COPY_FRONTEND: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->opts.binary) - CopySendChar(cstate, '\n'); - /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; @@ -430,6 +727,9 @@ BeginCopyTo(ParseState *pstate, /* Extract options from the statement node tree */ ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options); + /* Set format routine */ + cstate->routine = CopyToGetRoutine(cstate->opts); + /* Process the source/target relation or query */ if (rel) { @@ -771,27 +1071,10 @@ DoCopyTo(CopyToState cstate) foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); - Oid out_func_oid; - bool isvarlena; Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - if (cstate->opts.binary) - { - getTypeBinaryOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } - else if (cstate->routine) - cstate->routine->CopyToOutFunc(cstate, attr->atttypid, - &cstate->out_functions[attnum - 1]); - else - { - getTypeOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } + cstate->routine->CopyToOutFunc(cstate, attr->atttypid, + &cstate->out_functions[attnum - 1]); } /* @@ -804,58 +1087,7 @@ DoCopyTo(CopyToState cstate) "COPY TO", ALLOCSET_DEFAULT_SIZES); - if (cstate->opts.binary) - { - /* Generate header for a binary copy */ - int32 
tmp; - - /* Signature */ - CopySendData(cstate, BinarySignature, 11); - /* Flags field */ - tmp = 0; - CopySendInt32(cstate, tmp); - /* No header extension */ - tmp = 0; - CopySendInt32(cstate, tmp); - } - else if (cstate->routine) - cstate->routine->CopyToStart(cstate, tupDesc); - else - { - /* - * For non-binary copy, we need to convert null_print to file - * encoding, because it will be sent directly with CopySendString. - */ - if (cstate->need_transcoding) - cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, - cstate->opts.null_print_len, - cstate->file_encoding); - - /* if a header has been requested send the line */ - if (cstate->opts.header_line) - { - bool hdr_delim = false; - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - char *colname; - - if (hdr_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - hdr_delim = true; - - colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); - - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, colname, false); - else - CopyAttributeOutText(cstate, colname); - } - - CopySendEndOfRow(cstate); - } - } + cstate->routine->CopyToStart(cstate, tupDesc); if (cstate->rel) { @@ -894,15 +1126,7 @@ DoCopyTo(CopyToState cstate) processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - if (cstate->opts.binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } - else if (cstate->routine) - cstate->routine->CopyToEnd(cstate); + cstate->routine->CopyToEnd(cstate); MemoryContextDelete(cstate->rowcontext); @@ -918,7 +1142,6 @@ DoCopyTo(CopyToState cstate) static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) { - FmgrInfo *out_functions = cstate->out_functions; MemoryContext oldcontext; MemoryContextReset(cstate->rowcontext); @@ -927,69 +1150,7 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) /* Make sure the tuple is fully deconstructed */ 
slot_getallattrs(slot); - if (cstate->routine) - { - cstate->routine->CopyToOneRow(cstate, slot); - MemoryContextSwitchTo(oldcontext); - return; - } - - if (cstate->opts.binary) - { - /* Binary per-tuple header */ - CopySendInt16(cstate, list_length(cstate->attnumlist)); - } - - if (!cstate->opts.binary) - { - bool need_delim = false; - - foreach_int(attnum, cstate->attnumlist) - { - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - char *string; - - if (need_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - need_delim = true; - - if (isnull) - CopySendString(cstate, cstate->opts.null_print_client); - else - { - string = OutputFunctionCall(&out_functions[attnum - 1], - value); - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, string, - cstate->opts.force_quote_flags[attnum - 1]); - else - CopyAttributeOutText(cstate, string); - } - } - } - else - { - foreach_int(attnum, cstate->attnumlist) - { - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - bytea *outputbytes; - - if (isnull) - CopySendInt32(cstate, -1); - else - { - outputbytes = SendFunctionCall(&out_functions[attnum - 1], - value); - CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); - CopySendData(cstate, VARDATA(outputbytes), - VARSIZE(outputbytes) - VARHDRSZ); - } - } - } - - CopySendEndOfRow(cstate); + cstate->routine->CopyToOneRow(cstate, slot); MemoryContextSwitchTo(oldcontext); } diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 141fd48dc10..ccfbdf0ee01 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -104,8 +104,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where extern void EndCopyFrom(CopyFromState cstate); extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); -extern bool NextCopyFromRawFields(CopyFromState cstate, - char ***fields, int *nfields); extern void 
CopyFromErrorCallback(void *arg); extern char *CopyLimitPrintoutLength(const char *str); diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 509b9e92a18..c11b5ff3cc0 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -187,4 +187,12 @@ typedef struct CopyFromStateData extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); +/* Callbacks for CopyFromRoutine->CopyFromOneRow */ +extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + #endif /* COPYFROM_INTERNAL_H */ -- 2.45.2
From 5ba7136a527aca0861a74d0a7e1287ceef74f222 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jul 2024 17:39:41 +0900
Subject: [PATCH v20 3/5] Add support for adding custom COPY TO/FROM format

This uses the handler approach like tablesample. The approach creates an internal function that returns an internal struct. In this case, a COPY TO handler returns a CopyToRoutine and a COPY FROM handler returns a CopyFromRoutine.

This uses the same handler for COPY TO and COPY FROM. PostgreSQL calls a COPY TO/FROM handler with an "is_from" argument. It's true for COPY FROM and false for COPY TO:

    copy_handler(true) returns CopyFromRoutine
    copy_handler(false) returns CopyToRoutine

This also adds a test module for custom COPY TO/FROM handlers.
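To illustrate the "is_from" dispatch described above, here is a minimal standalone sketch. It is not code from this patch: every Sketch*/sketch_* name is a hypothetical stand-in, and the tag enum only imitates the role that the "NodeTag type" member plays in CopyToRoutine and CopyFromRoutine. It assumes only that one handler serves both directions and that the caller verifies the returned struct's tag before casting it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the NodeTag values of the two routine structs. */
typedef enum SketchTag
{
    SKETCH_COPY_TO_ROUTINE = 1,
    SKETCH_COPY_FROM_ROUTINE
} SketchTag;

/* Both structs begin with a tag so the caller can tell them apart. */
typedef struct SketchCopyToRoutine
{
    SketchTag   type;
    const char *name;
} SketchCopyToRoutine;

typedef struct SketchCopyFromRoutine
{
    SketchTag   type;
    const char *name;
} SketchCopyFromRoutine;

static const SketchCopyToRoutine sketch_to = {SKETCH_COPY_TO_ROUTINE, "to"};
static const SketchCopyFromRoutine sketch_from = {SKETCH_COPY_FROM_ROUTINE, "from"};

/* One handler for both directions: true means COPY FROM, false means COPY TO. */
static const void *
sketch_copy_handler(bool is_from)
{
    if (is_from)
        return &sketch_from;
    return &sketch_to;
}

/* Caller-side check: does the returned struct carry the tag we asked for? */
static bool
sketch_routine_matches(const void *routine, bool is_from)
{
    SketchTag   expected = is_from ? SKETCH_COPY_FROM_ROUTINE
                                   : SKETCH_COPY_TO_ROUTINE;

    if (routine == NULL)
        return false;

    /* The tag is the first member of both structs, so this read is valid. */
    return *(const SketchTag *) routine == expected;
}

/* Cast to the COPY TO struct only after the tag has been verified. */
static const SketchCopyToRoutine *
sketch_as_to_routine(const void *routine)
{
    assert(sketch_routine_matches(routine, false));
    return (const SketchCopyToRoutine *) routine;
}
```

In this sketch, sketch_as_to_routine(sketch_copy_handler(false)) yields the COPY TO struct, while passing it the result of sketch_copy_handler(true) trips the assertion. That is the kind of mismatch the routine-type check in the patch is meant to catch.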
--- src/backend/commands/copy.c | 96 ++++++++++++++--- src/backend/commands/copyfrom.c | 4 +- src/backend/commands/copyto.c | 4 +- src/backend/nodes/Makefile | 1 + src/backend/nodes/gen_node_support.pl | 2 + src/backend/utils/adt/pseudotypes.c | 1 + src/include/catalog/pg_proc.dat | 6 ++ src/include/catalog/pg_type.dat | 6 ++ src/include/commands/copy.h | 2 + src/include/commands/copyapi.h | 4 + src/include/nodes/meson.build | 1 + src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + src/test/modules/test_copy_format/.gitignore | 4 + src/test/modules/test_copy_format/Makefile | 23 ++++ .../expected/test_copy_format.out | 21 ++++ src/test/modules/test_copy_format/meson.build | 33 ++++++ .../test_copy_format/sql/test_copy_format.sql | 6 ++ .../test_copy_format--1.0.sql | 8 ++ .../test_copy_format/test_copy_format.c | 100 ++++++++++++++++++ .../test_copy_format/test_copy_format.control | 4 + 21 files changed, 313 insertions(+), 15 deletions(-) create mode 100644 src/test/modules/test_copy_format/.gitignore create mode 100644 src/test/modules/test_copy_format/Makefile create mode 100644 src/test/modules/test_copy_format/expected/test_copy_format.out create mode 100644 src/test/modules/test_copy_format/meson.build create mode 100644 src/test/modules/test_copy_format/sql/test_copy_format.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format--1.0.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format.c create mode 100644 src/test/modules/test_copy_format/test_copy_format.control diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 3bb579a3a44..d7d409379d1 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -32,6 +32,7 @@ #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" +#include "parser/parse_func.h" #include "parser/parse_relation.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -443,6 +444,87 @@ 
defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate) return COPY_LOG_VERBOSITY_DEFAULT; /* keep compiler quiet */ } +/* + * Process the "format" option. + * + * This function checks whether the option value is a built-in format such as + * "text" and "csv" or not. If the option value isn't a built-in format, this + * function finds a COPY format handler that returns a CopyToRoutine (for + * is_from == false) or CopyFromRoutine (for is_from == true). If no COPY + * format handler is found, this function reports an error. + */ +static void +ProcessCopyOptionFormat(ParseState *pstate, + CopyFormatOptions *opts_out, + bool is_from, + DefElem *defel) +{ + char *format; + Oid funcargtypes[1]; + Oid handlerOid = InvalidOid; + Datum datum; + Node *routine; + + format = defGetString(defel); + + /* built-in formats */ + if (strcmp(format, "text") == 0) + /* default format */ return; + else if (strcmp(format, "csv") == 0) + { + opts_out->csv_mode = true; + return; + } + else if (strcmp(format, "binary") == 0) + { + opts_out->binary = true; + return; + } + + /* custom format */ + funcargtypes[0] = INTERNALOID; + handlerOid = LookupFuncName(list_make1(makeString(format)), 1, + funcargtypes, true); + if (!OidIsValid(handlerOid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY format \"%s\" not recognized", format), + parser_errposition(pstate, defel->location))); + + datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from)); + routine = (Node *) DatumGetPointer(datum); + if (is_from) + { + if (routine == NULL || !IsA(routine, CopyFromRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyFromRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + } + else + { + if (routine == NULL || !IsA(routine, CopyToRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler 
function " + "%s(%u) did not return a " + "CopyToRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + } + + opts_out->routine = routine; +} + /* * Process the statement option list for COPY. * @@ -485,22 +567,10 @@ ProcessCopyOptions(ParseState *pstate, if (strcmp(defel->defname, "format") == 0) { - char *fmt = defGetString(defel); - if (format_specified) errorConflictingDefElem(defel, pstate); format_specified = true; - if (strcmp(fmt, "text") == 0) - /* default format */ ; - else if (strcmp(fmt, "csv") == 0) - opts_out->csv_mode = true; - else if (strcmp(fmt, "binary") == 0) - opts_out->binary = true; - else - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY format \"%s\" not recognized", fmt), - parser_errposition(pstate, defel->location))); + ProcessCopyOptionFormat(pstate, opts_out, is_from, defel); } else if (strcmp(defel->defname, "freeze") == 0) { diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 14f95f17124..7ecd9a1ad2c 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -247,7 +247,9 @@ static const CopyFromRoutine CopyFromRoutineBinary = { static const CopyFromRoutine * CopyFromGetRoutine(CopyFormatOptions opts) { - if (opts.csv_mode) + if (opts.routine) + return (const CopyFromRoutine *) opts.routine; + else if (opts.csv_mode) return &CopyFromRoutineCSV; else if (opts.binary) return &CopyFromRoutineBinary; diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 86dc1b742cc..8ddbddb119d 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -431,7 +431,9 @@ static const CopyToRoutine CopyToRoutineBinary = { static const CopyToRoutine * CopyToGetRoutine(CopyFormatOptions opts) { - if (opts.csv_mode) + if (opts.routine) + return (const CopyToRoutine *) opts.routine; + else if (opts.csv_mode) return &CopyToRoutineCSV; else if (opts.binary) return &CopyToRoutineBinary; diff 
--git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile index 66bbad8e6e0..173ee11811c 100644 --- a/src/backend/nodes/Makefile +++ b/src/backend/nodes/Makefile @@ -49,6 +49,7 @@ node_headers = \ access/sdir.h \ access/tableam.h \ access/tsmapi.h \ + commands/copyapi.h \ commands/event_trigger.h \ commands/trigger.h \ executor/tuptable.h \ diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl index 81df3bdf95f..428ab4f0d93 100644 --- a/src/backend/nodes/gen_node_support.pl +++ b/src/backend/nodes/gen_node_support.pl @@ -61,6 +61,7 @@ my @all_input_files = qw( access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h @@ -85,6 +86,7 @@ my @nodetag_only_files = qw( access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c index e189e9b79d2..25f24ab95d2 100644 --- a/src/backend/utils/adt/pseudotypes.c +++ b/src/backend/utils/adt/pseudotypes.c @@ -370,6 +370,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler); +PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(internal); PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement); PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 322114d72a7..f108780e8b6 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -7741,6 +7741,12 @@ { oid => '3312', descr => 'I/O', proname => 'tsm_handler_out', prorettype => 'cstring', proargtypes => 'tsm_handler', prosrc => 'tsm_handler_out' }, +{ oid => '8753', descr => 'I/O', + proname => 'copy_handler_in', proisstrict => 'f', prorettype => 'copy_handler', + proargtypes => 'cstring', prosrc => 
'copy_handler_in' }, +{ oid => '8754', descr => 'I/O', + proname => 'copy_handler_out', prorettype => 'cstring', + proargtypes => 'copy_handler', prosrc => 'copy_handler_out' }, { oid => '267', descr => 'I/O', proname => 'table_am_handler_in', proisstrict => 'f', prorettype => 'table_am_handler', proargtypes => 'cstring', diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index ceff66ccde1..37ebfa0908f 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -633,6 +633,12 @@ typcategory => 'P', typinput => 'tsm_handler_in', typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-', typalign => 'i' }, +{ oid => '8752', + descr => 'pseudo-type for the result of a copy to/from method function', + typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p', + typcategory => 'P', typinput => 'copy_handler_in', + typoutput => 'copy_handler_out', typreceive => '-', typsend => '-', + typalign => 'i' }, { oid => '269', descr => 'pseudo-type for the result of a table AM handler function', typname => 'table_am_handler', typlen => '4', typbyval => 't', typtype => 'p', diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index ccfbdf0ee01..79bd4fb9151 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -84,6 +84,8 @@ typedef struct CopyFormatOptions CopyOnErrorChoice on_error; /* what to do when error happened */ CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ List *convert_select; /* list of column names (can be NIL) */ + Node *routine; /* CopyToRoutine or CopyFromRoutine (can be + * NULL) */ } CopyFormatOptions; /* These are private in commands/copy[from|to].c */ diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index d1289424c67..e049a45a4b1 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -27,6 +27,8 @@ typedef struct CopyToStateData *CopyToState; */ typedef struct 
CopyFromRoutine { + NodeTag type; + /* * Called when COPY FROM is started to set up the input functions * associated with the relation's attributes writing to. `finfo` can be @@ -69,6 +71,8 @@ typedef struct CopyFromRoutine */ typedef struct CopyToRoutine { + NodeTag type; + /* * Called when COPY TO is started to set up the output functions * associated with the relation's attributes reading from. `finfo` can be diff --git a/src/include/nodes/meson.build b/src/include/nodes/meson.build index b665e55b657..103df1a7873 100644 --- a/src/include/nodes/meson.build +++ b/src/include/nodes/meson.build @@ -11,6 +11,7 @@ node_support_input_i = [ 'access/sdir.h', 'access/tableam.h', 'access/tsmapi.h', + 'commands/copyapi.h', 'commands/event_trigger.h', 'commands/trigger.h', 'executor/tuptable.h', diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 256799f520a..b7b46928a19 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -15,6 +15,7 @@ SUBDIRS = \ spgist_name_ops \ test_bloomfilter \ test_copy_callbacks \ + test_copy_format \ test_custom_rmgrs \ test_ddl_deparse \ test_dsa \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index d8fe059d236..c42b4b2b31f 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -14,6 +14,7 @@ subdir('spgist_name_ops') subdir('ssl_passphrase_callback') subdir('test_bloomfilter') subdir('test_copy_callbacks') +subdir('test_copy_format') subdir('test_custom_rmgrs') subdir('test_ddl_deparse') subdir('test_dsa') diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore new file mode 100644 index 00000000000..5dcb3ff9723 --- /dev/null +++ b/src/test/modules/test_copy_format/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile new file mode 100644 index 00000000000..8497f91624d --- 
/dev/null +++ b/src/test/modules/test_copy_format/Makefile @@ -0,0 +1,23 @@ +# src/test/modules/test_copy_format/Makefile + +MODULE_big = test_copy_format +OBJS = \ + $(WIN32RES) \ + test_copy_format.o +PGFILEDESC = "test_copy_format - test custom COPY FORMAT" + +EXTENSION = test_copy_format +DATA = test_copy_format--1.0.sql + +REGRESS = test_copy_format + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_copy_format +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out new file mode 100644 index 00000000000..4ed7c0b12db --- /dev/null +++ b/src/test/modules/test_copy_format/expected/test_copy_format.out @@ -0,0 +1,21 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a smallint, b integer, c bigint); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH (format 'test_copy_format'); +NOTICE: test_copy_format: is_from=true +NOTICE: CopyFromInFunc: atttypid=21 +NOTICE: CopyFromInFunc: atttypid=23 +NOTICE: CopyFromInFunc: atttypid=20 +NOTICE: CopyFromStart: natts=3 +NOTICE: CopyFromOneRow +NOTICE: CopyFromEnd +COPY public.test TO stdout WITH (format 'test_copy_format'); +NOTICE: test_copy_format: is_from=false +NOTICE: CopyToOutFunc: atttypid=21 +NOTICE: CopyToOutFunc: atttypid=23 +NOTICE: CopyToOutFunc: atttypid=20 +NOTICE: CopyToStart: natts=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToEnd diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build new file mode 100644 index 00000000000..4cefe7b709a --- /dev/null +++ b/src/test/modules/test_copy_format/meson.build @@ -0,0 +1,33 @@ 
+# Copyright (c) 2024, PostgreSQL Global Development Group + +test_copy_format_sources = files( + 'test_copy_format.c', +) + +if host_system == 'windows' + test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'test_copy_format', + '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',]) +endif + +test_copy_format = shared_module('test_copy_format', + test_copy_format_sources, + kwargs: pg_test_mod_args, +) +test_install_libs += test_copy_format + +test_install_data += files( + 'test_copy_format.control', + 'test_copy_format--1.0.sql', +) + +tests += { + 'name': 'test_copy_format', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'test_copy_format', + ], + }, +} diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql new file mode 100644 index 00000000000..e805f7cb011 --- /dev/null +++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql @@ -0,0 +1,6 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a smallint, b integer, c bigint); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH (format 'test_copy_format'); +\. +COPY public.test TO stdout WITH (format 'test_copy_format'); diff --git a/src/test/modules/test_copy_format/test_copy_format--1.0.sql b/src/test/modules/test_copy_format/test_copy_format--1.0.sql new file mode 100644 index 00000000000..d24ea03ce99 --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format--1.0.sql @@ -0,0 +1,8 @@ +/* src/test/modules/test_copy_format/test_copy_format--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_copy_format" to load this file. 
\quit + +CREATE FUNCTION test_copy_format(internal) + RETURNS copy_handler + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c new file mode 100644 index 00000000000..f6b105659ab --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format.c @@ -0,0 +1,100 @@ +/*-------------------------------------------------------------------------- + * + * test_copy_format.c + * Code for testing custom COPY format. + * + * Portions Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_copy_format/test_copy_format.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "commands/copyapi.h" +#include "commands/defrem.h" + +PG_MODULE_MAGIC; + +static void +CopyFromInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + ereport(NOTICE, (errmsg("CopyFromInFunc: atttypid=%d", atttypid))); +} + +static void +CopyFromStart(CopyFromState cstate, TupleDesc tupDesc) +{ + ereport(NOTICE, (errmsg("CopyFromStart: natts=%d", tupDesc->natts))); +} + +static bool +CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls) +{ + ereport(NOTICE, (errmsg("CopyFromOneRow"))); + return false; +} + +static void +CopyFromEnd(CopyFromState cstate) +{ + ereport(NOTICE, (errmsg("CopyFromEnd"))); +} + +static const CopyFromRoutine CopyFromRoutineTestCopyFormat = { + .type = T_CopyFromRoutine, + .CopyFromInFunc = CopyFromInFunc, + .CopyFromStart = CopyFromStart, + .CopyFromOneRow = CopyFromOneRow, + .CopyFromEnd = CopyFromEnd, +}; + +static void +CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) +{ + ereport(NOTICE, (errmsg("CopyToOutFunc: atttypid=%d", atttypid))); +} + +static void +CopyToStart(CopyToState cstate, TupleDesc tupDesc) +{ + ereport(NOTICE, (errmsg("CopyToStart: natts=%d", tupDesc->natts))); 
+} + +static void +CopyToOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + ereport(NOTICE, (errmsg("CopyToOneRow: tts_nvalid=%u", slot->tts_nvalid))); +} + +static void +CopyToEnd(CopyToState cstate) +{ + ereport(NOTICE, (errmsg("CopyToEnd"))); +} + +static const CopyToRoutine CopyToRoutineTestCopyFormat = { + .type = T_CopyToRoutine, + .CopyToOutFunc = CopyToOutFunc, + .CopyToStart = CopyToStart, + .CopyToOneRow = CopyToOneRow, + .CopyToEnd = CopyToEnd, +}; + +PG_FUNCTION_INFO_V1(test_copy_format); +Datum +test_copy_format(PG_FUNCTION_ARGS) +{ + bool is_from = PG_GETARG_BOOL(0); + + ereport(NOTICE, + (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false"))); + + if (is_from) + PG_RETURN_POINTER(&CopyFromRoutineTestCopyFormat); + else + PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); +} diff --git a/src/test/modules/test_copy_format/test_copy_format.control b/src/test/modules/test_copy_format/test_copy_format.control new file mode 100644 index 00000000000..f05a6362358 --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format.control @@ -0,0 +1,4 @@ +comment = 'Test code for custom COPY format' +default_version = '1.0' +module_pathname = '$libdir/test_copy_format' +relocatable = true
--
2.45.2

From 95213545ae4754c3a435ba2d737fd0b2677e9241 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jan 2024 14:54:10 +0900
Subject: [PATCH v20 4/5] Export CopyToStateData and CopyFromStateData

This is for custom COPY TO/FROM format handlers implemented as extensions.

This just moves code. Nothing changes except the CopyDest/CopySource enum values. CopyDest and CopySource enum values such as COPY_FILE conflict with each other once both enums are declared in the same header. So the COPY_DEST_ prefix is used instead of the COPY_ prefix for CopyDest enum values, and the COPY_SOURCE_ prefix is used instead of the COPY_ prefix for CopySource enum values.

For example, COPY_FILE in CopyDest is renamed to COPY_DEST_FILE and COPY_FILE in CopySource is renamed to COPY_SOURCE_FILE.
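
To illustrate the rename described above, here is a minimal standalone sketch. It assumes only plain C: enumerators share a single namespace, so two enums that both declare COPY_FILE cannot be declared in the same header. The enum definitions mirror the ones the patch places in copyapi.h. The helper functions copy_dest_name() and copy_source_name() are hypothetical and exist only for this illustration; they are not part of the patch.

```c
/*
 * Sketch only: in C, enumerators live in the ordinary identifier namespace,
 * so CopyDest and CopySource cannot both declare COPY_FILE once they are
 * declared in the same header. Distinct prefixes avoid the clash.
 */
typedef enum CopyDest
{
	COPY_DEST_FILE,			/* to file (or a piped program) */
	COPY_DEST_FRONTEND,		/* to frontend */
	COPY_DEST_CALLBACK,		/* to callback function */
} CopyDest;

typedef enum CopySource
{
	COPY_SOURCE_FILE,		/* from file (or a piped program) */
	COPY_SOURCE_FRONTEND,	/* from frontend */
	COPY_SOURCE_CALLBACK,	/* from callback function */
} CopySource;

/* Hypothetical helper (not in the patch): name of a COPY TO destination. */
const char *
copy_dest_name(CopyDest dest)
{
	switch (dest)
	{
		case COPY_DEST_FILE:
			return "file";
		case COPY_DEST_FRONTEND:
			return "frontend";
		case COPY_DEST_CALLBACK:
			return "callback";
	}
	return "unknown";
}

/* Hypothetical helper (not in the patch): name of a COPY FROM source. */
const char *
copy_source_name(CopySource src)
{
	switch (src)
	{
		case COPY_SOURCE_FILE:
			return "file";
		case COPY_SOURCE_FRONTEND:
			return "frontend";
		case COPY_SOURCE_CALLBACK:
			return "callback";
	}
	return "unknown";
}
```

With the old names, declaring both enums in one translation unit would fail to compile because COPY_FILE, COPY_FRONTEND and COPY_CALLBACK would each be declared twice. With the prefixed names, code that handles both directions can refer to COPY_DEST_FILE and COPY_SOURCE_FILE without ambiguity.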

Note that this isn't enough to implement custom COPY TO/FROM format handlers as extensions. We'll do the following in a subsequent commit:

For custom COPY TO format handlers:

1. Add an opaque space for the custom COPY TO format handler
2. Export CopySendEndOfRow() to flush the buffer

For custom COPY FROM format handlers:

1. Add an opaque space for the custom COPY FROM format handler
2. Export CopyReadBinaryData() to read the next data
---
src/backend/commands/copyfrom.c | 4 +- src/backend/commands/copyfromparse.c | 10 +- src/backend/commands/copyto.c | 77 +----- src/include/commands/copy.h | 78 +----- src/include/commands/copyapi.h | 306 ++++++++++++++++++++++- src/include/commands/copyfrom_internal.h | 165 ------------ 6 files changed, 320 insertions(+), 320 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 7ecd9a1ad2c..dd4342f8b9c 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1713,7 +1713,7 @@ BeginCopyFrom(ParseState *pstate, pg_encoding_to_char(GetDatabaseEncoding())))); } - cstate->copy_src = COPY_FILE; /* default */ + cstate->copy_src = COPY_SOURCE_FILE; /* default */ cstate->whereClause = whereClause; @@ -1841,7 +1841,7 @@ BeginCopyFrom(ParseState *pstate, if (data_source_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_src = COPY_CALLBACK; + cstate->copy_src = COPY_SOURCE_CALLBACK; cstate->data_source_cb = data_source_cb; } else if (pipe) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 5f63b683d17..ec86a17b3b3 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -180,7 +180,7 @@ ReceiveCopyBegin(CopyFromState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_src = COPY_FRONTEND; + cstate->copy_src = COPY_SOURCE_FRONTEND; cstate->fe_msgbuf = makeStringInfo(); /* We *must* flush here to ensure FE knows
it can send. */ pq_flush(); @@ -248,7 +248,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) switch (cstate->copy_src) { - case COPY_FILE: + case COPY_SOURCE_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, @@ -257,7 +257,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) if (bytesread == 0) cstate->raw_reached_eof = true; break; - case COPY_FRONTEND: + case COPY_SOURCE_FRONTEND: while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof) { int avail; @@ -340,7 +340,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; - case COPY_CALLBACK: + case COPY_SOURCE_CALLBACK: bytesread = cstate->data_source_cb(databuf, minread, maxread); break; } @@ -1188,7 +1188,7 @@ CopyReadLine(CopyFromState cstate, bool is_csv) * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ - if (cstate->copy_src == COPY_FRONTEND) + if (cstate->copy_src == COPY_SOURCE_FRONTEND) { int inbytes; diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 8ddbddb119d..37b150b44ba 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -37,67 +37,6 @@ #include "utils/rel.h" #include "utils/snapmgr.h" -/* - * Represents the different dest cases we need to worry about at - * the bottom level - */ -typedef enum CopyDest -{ - COPY_FILE, /* to file (or a piped program) */ - COPY_FRONTEND, /* to frontend */ - COPY_CALLBACK, /* to callback function */ -} CopyDest; - -/* - * This struct contains all the state variables used throughout a COPY TO - * operation. - * - * Multi-byte encodings: all supported client-side encodings encode multi-byte - * characters by having the first byte's high bit set. Subsequent bytes of the - * character can have the high bit not set. 
When scanning data in such an - * encoding to look for a match to a single-byte (ie ASCII) character, we must - * use the full pg_encoding_mblen() machinery to skip over multibyte - * characters, else we might find a false match to a trailing byte. In - * supported server encodings, there is no possibility of a false match, and - * it's faster to make useless comparisons to trailing bytes than it is to - * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true - * when we have to do it the hard way. - */ -typedef struct CopyToStateData -{ - /* format routine */ - const CopyToRoutine *routine; - - /* low-level state data */ - CopyDest copy_dest; /* type of copy source/destination */ - FILE *copy_file; /* used if copy_dest == COPY_FILE */ - StringInfo fe_msgbuf; /* used for all dests during COPY TO */ - - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy to */ - QueryDesc *queryDesc; /* executable query to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDOUT */ - bool is_program; /* is 'filename' a program to popen? 
*/ - copy_data_dest_cb data_dest_cb; /* function for writing data */ - - CopyFormatOptions opts; - Node *whereClause; /* WHERE condition (or NULL) */ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - FmgrInfo *out_functions; /* lookup info for output functions */ - MemoryContext rowcontext; /* per-row evaluation context */ - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyToStateData; - /* DestReceiver for COPY (query) TO */ typedef struct { @@ -143,7 +82,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate) { switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: /* Default line termination depends on platform */ #ifndef WIN32 CopySendChar(cstate, '\n'); @@ -151,7 +90,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate) CopySendString(cstate, "\r\n"); #endif break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* The FE/BE protocol uses \n as newline for all platforms */ CopySendChar(cstate, '\n'); break; @@ -460,7 +399,7 @@ SendCopyBegin(CopyToState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_dest = COPY_FRONTEND; + cstate->copy_dest = COPY_DEST_FRONTEND; } static void @@ -507,7 +446,7 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -541,11 +480,11 @@ CopySendEndOfRow(CopyToState cstate) errmsg("could not write to COPY file: %m"))); } break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; - case COPY_CALLBACK: + case COPY_DEST_CALLBACK: cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -925,12 +864,12 @@ BeginCopyTo(ParseState *pstate, /* See Multibyte encoding comment above */ 
cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); - cstate->copy_dest = COPY_FILE; /* default */ + cstate->copy_dest = COPY_DEST_FILE; /* default */ if (data_dest_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_dest = COPY_CALLBACK; + cstate->copy_dest = COPY_DEST_CALLBACK; cstate->data_dest_cb = data_dest_cb; } else if (pipe) diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 79bd4fb9151..e2411848e9f 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -14,87 +14,11 @@ #ifndef COPY_H #define COPY_H -#include "nodes/execnodes.h" +#include "commands/copyapi.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" #include "tcop/dest.h" -/* - * Represents whether a header line should be present, and whether it must - * match the actual names (which implies "true"). - */ -typedef enum CopyHeaderChoice -{ - COPY_HEADER_FALSE = 0, - COPY_HEADER_TRUE, - COPY_HEADER_MATCH, -} CopyHeaderChoice; - -/* - * Represents where to save input processing errors. More values to be added - * in the future. - */ -typedef enum CopyOnErrorChoice -{ - COPY_ON_ERROR_STOP = 0, /* immediately throw errors, default */ - COPY_ON_ERROR_IGNORE, /* ignore errors */ -} CopyOnErrorChoice; - -/* - * Represents verbosity of logged messages by COPY command. - */ -typedef enum CopyLogVerbosityChoice -{ - COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages, default */ - COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */ -} CopyLogVerbosityChoice; - -/* - * A struct to hold COPY options, in a parsed form. All of these are related - * to formatting, except for 'freeze', which doesn't really belong here, but - * it's expedient to parse it along with all the other options. - */ -typedef struct CopyFormatOptions -{ - /* parameters from the COPY command */ - int file_encoding; /* file or remote side's character encoding, - * -1 if not specified */ - bool binary; /* binary format? 
*/ - bool freeze; /* freeze rows on loading? */ - bool csv_mode; /* Comma Separated Value format? */ - CopyHeaderChoice header_line; /* header line? */ - char *null_print; /* NULL marker string (server encoding!) */ - int null_print_len; /* length of same */ - char *null_print_client; /* same converted to file encoding */ - char *default_print; /* DEFAULT marker string */ - int default_print_len; /* length of same */ - char *delim; /* column delimiter (must be 1 byte) */ - char *quote; /* CSV quote char (must be 1 byte) */ - char *escape; /* CSV escape char (must be 1 byte) */ - List *force_quote; /* list of column names */ - bool force_quote_all; /* FORCE_QUOTE *? */ - bool *force_quote_flags; /* per-column CSV FQ flags */ - List *force_notnull; /* list of column names */ - bool force_notnull_all; /* FORCE_NOT_NULL *? */ - bool *force_notnull_flags; /* per-column CSV FNN flags */ - List *force_null; /* list of column names */ - bool force_null_all; /* FORCE_NULL *? */ - bool *force_null_flags; /* per-column CSV FN flags */ - bool convert_selectively; /* do selective binary conversion? 
*/ - CopyOnErrorChoice on_error; /* what to do when error happened */ - CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ - List *convert_select; /* list of column names (can be NIL) */ - Node *routine; /* CopyToRoutine or CopyFromRoutine (can be - * NULL) */ -} CopyFormatOptions; - -/* These are private in commands/copy[from|to].c */ -typedef struct CopyFromStateData *CopyFromState; -typedef struct CopyToStateData *CopyToState; - -typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); -typedef void (*copy_data_dest_cb) (void *data, int len); - extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed); diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index e049a45a4b1..8a560903ede 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -14,12 +14,81 @@ #ifndef COPYAPI_H #define COPYAPI_H +#include "commands/trigger.h" +#include "executor/execdesc.h" #include "executor/tuptable.h" #include "nodes/execnodes.h" -/* These are private in commands/copy[from|to].c */ +/* + * Represents whether a header line should be present, and whether it must + * match the actual names (which implies "true"). + */ +typedef enum CopyHeaderChoice +{ + COPY_HEADER_FALSE = 0, + COPY_HEADER_TRUE, + COPY_HEADER_MATCH, +} CopyHeaderChoice; + +/* + * Represents where to save input processing errors. More values to be added + * in the future. + */ +typedef enum CopyOnErrorChoice +{ + COPY_ON_ERROR_STOP = 0, /* immediately throw errors, default */ + COPY_ON_ERROR_IGNORE, /* ignore errors */ +} CopyOnErrorChoice; + +/* + * Represents verbosity of logged messages by COPY command. + */ +typedef enum CopyLogVerbosityChoice +{ + COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages, default */ + COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */ +} CopyLogVerbosityChoice; + +/* + * A struct to hold COPY options, in a parsed form. 
All of these are related + * to formatting, except for 'freeze', which doesn't really belong here, but + * it's expedient to parse it along with all the other options. + */ +typedef struct CopyFormatOptions +{ + /* parameters from the COPY command */ + int file_encoding; /* file or remote side's character encoding, + * -1 if not specified */ + bool binary; /* binary format? */ + bool freeze; /* freeze rows on loading? */ + bool csv_mode; /* Comma Separated Value format? */ + CopyHeaderChoice header_line; /* header line? */ + char *null_print; /* NULL marker string (server encoding!) */ + int null_print_len; /* length of same */ + char *null_print_client; /* same converted to file encoding */ + char *default_print; /* DEFAULT marker string */ + int default_print_len; /* length of same */ + char *delim; /* column delimiter (must be 1 byte) */ + char *quote; /* CSV quote char (must be 1 byte) */ + char *escape; /* CSV escape char (must be 1 byte) */ + List *force_quote; /* list of column names */ + bool force_quote_all; /* FORCE_QUOTE *? */ + bool *force_quote_flags; /* per-column CSV FQ flags */ + List *force_notnull; /* list of column names */ + bool force_notnull_all; /* FORCE_NOT_NULL *? */ + bool *force_notnull_flags; /* per-column CSV FNN flags */ + List *force_null; /* list of column names */ + bool force_null_all; /* FORCE_NULL *? */ + bool *force_null_flags; /* per-column CSV FN flags */ + bool convert_selectively; /* do selective binary conversion? */ + CopyOnErrorChoice on_error; /* what to do when error happened */ + CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ + List *convert_select; /* list of column names (can be NIL) */ + Node *routine; /* CopyToRoutine or CopyFromRoutine (can be + * NULL) */ +} CopyFormatOptions; + typedef struct CopyFromStateData *CopyFromState; -typedef struct CopyToStateData *CopyToState; /* * API structure for a COPY FROM format implementation. 
Note this must be @@ -65,6 +134,176 @@ typedef struct CopyFromRoutine void (*CopyFromEnd) (CopyFromState cstate); } CopyFromRoutine; +/* + * Represents the different source cases we need to worry about at + * the bottom level + */ +typedef enum CopySource +{ + COPY_SOURCE_FILE, /* from file (or a piped program) */ + COPY_SOURCE_FRONTEND, /* from frontend */ + COPY_SOURCE_CALLBACK, /* from callback function */ +} CopySource; + +/* + * Represents the end-of-line terminator type of the input + */ +typedef enum EolType +{ + EOL_UNKNOWN, + EOL_NL, + EOL_CR, + EOL_CRNL, +} EolType; + +/* + * Represents the insert method to be used during COPY FROM. + */ +typedef enum CopyInsertMethod +{ + CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */ + CIM_MULTI, /* always use table_multi_insert or + * ExecForeignBatchInsert */ + CIM_MULTI_CONDITIONAL, /* use table_multi_insert or + * ExecForeignBatchInsert only if valid */ +} CopyInsertMethod; + +typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); + +/* + * This struct contains all the state variables used throughout a COPY FROM + * operation. + */ +typedef struct CopyFromStateData +{ + /* format routine */ + const CopyFromRoutine *routine; + + /* low-level state data */ + CopySource copy_src; /* type of copy source */ + FILE *copy_file; /* used if copy_src == COPY_SOURCE_FILE */ + StringInfo fe_msgbuf; /* used if copy_src == COPY_SOURCE_FRONTEND */ + + EolType eol_type; /* EOL type of input */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + Oid conversion_proc; /* encoding conversion function */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDIN */ + bool is_program; /* is 'filename' a program to popen? 
*/ + copy_data_source_cb data_source_cb; /* function for reading data */ + + CopyFormatOptions opts; + bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + Node *whereClause; /* WHERE condition (or NULL) */ + + /* these are just for error messages, see CopyFromErrorCallback */ + const char *cur_relname; /* table name for error messages */ + uint64 cur_lineno; /* line number for error messages */ + const char *cur_attname; /* current att for error messages */ + const char *cur_attval; /* current att value for error messages */ + bool relname_only; /* don't output line number, att, etc. */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + AttrNumber num_defaults; /* count of att that are missing and have + * default value */ + FmgrInfo *in_functions; /* array of input functions for each attrs */ + Oid *typioparams; /* array of element types for in_functions */ + ErrorSaveContext *escontext; /* soft error trapper during in_functions + * execution */ + uint64 num_errors; /* total number of rows which contained soft + * errors */ + int *defmap; /* array of default att numbers related to + * missing att */ + ExprState **defexprs; /* array of default att expressions for all + * att */ + bool *defaults; /* if DEFAULT marker was found for + * corresponding att */ + bool volatile_defexprs; /* is any of defexprs volatile? */ + List *range_table; /* single element list of RangeTblEntry */ + List *rteperminfos; /* single element list of RTEPermissionInfo */ + ExprState *qualexpr; + + TransitionCaptureState *transition_capture; + + /* + * These variables are used to reduce overhead in COPY FROM. + * + * attribute_buf holds the separated, de-escaped text for each field of + * the current line. The CopyReadAttributes functions return arrays of + * pointers into this buffer. We avoid palloc/pfree overhead by re-using + * the buffer on each cycle. 
+ * + * In binary COPY FROM, attribute_buf holds the binary data for the + * current field, but the usage is otherwise similar. + */ + StringInfoData attribute_buf; + + /* field raw data pointers found by COPY FROM */ + + int max_fields; + char **raw_fields; + + /* + * Similarly, line_buf holds the whole input line being processed. The + * input cycle is first to read the whole line into line_buf, and then + * extract the individual attribute fields into attribute_buf. line_buf + * is preserved unmodified so that we can display it in error messages if + * appropriate. (In binary mode, line_buf is not used.) + */ + StringInfoData line_buf; + bool line_buf_valid; /* contains the row being processed? */ + + /* + * input_buf holds input data, already converted to database encoding. + * + * In text mode, CopyReadLine parses this data sufficiently to locate line + * boundaries, then transfers the data to line_buf. We guarantee that + * there is a \0 at input_buf[input_buf_len] at all times. (In binary + * mode, input_buf is not used.) + * + * If encoding conversion is not required, input_buf is not a separate + * buffer but points directly to raw_buf. In that case, input_buf_len + * tracks the number of bytes that have been verified as valid in the + * database encoding, and raw_buf_len is the total number of bytes stored + * in the buffer. + */ +#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ + char *input_buf; + int input_buf_index; /* next byte to process */ + int input_buf_len; /* total # of bytes stored */ + bool input_reached_eof; /* true if we reached EOF */ + bool input_reached_error; /* true if a conversion error happened */ + /* Shorthand for number of unconsumed bytes available in input_buf */ +#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) + + /* + * raw_buf holds raw input data read from the data source (file or client + * connection), not yet converted to the database encoding. 
Like with + * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. + */ +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + char *raw_buf; + int raw_buf_index; /* next byte to process */ + int raw_buf_len; /* total # of bytes stored */ + bool raw_reached_eof; /* true if we reached EOF */ + + /* Shorthand for number of unconsumed bytes available in raw_buf */ +#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) + + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyFromStateData; + + +typedef struct CopyToStateData *CopyToState; + /* * API structure for a COPY TO format implementation. Note this must be * allocated in a server-lifetime manner, typically as a static const struct. @@ -102,4 +341,67 @@ typedef struct CopyToRoutine void (*CopyToEnd) (CopyToState cstate); } CopyToRoutine; +/* + * Represents the different dest cases we need to worry about at + * the bottom level + */ +typedef enum CopyDest +{ + COPY_DEST_FILE, /* to file (or a piped program) */ + COPY_DEST_FRONTEND, /* to frontend */ + COPY_DEST_CALLBACK, /* to callback function */ +} CopyDest; + +typedef void (*copy_data_dest_cb) (void *data, int len); + +/* + * This struct contains all the state variables used throughout a COPY TO + * operation. + * + * Multi-byte encodings: all supported client-side encodings encode multi-byte + * characters by having the first byte's high bit set. Subsequent bytes of the + * character can have the high bit not set. When scanning data in such an + * encoding to look for a match to a single-byte (ie ASCII) character, we must + * use the full pg_encoding_mblen() machinery to skip over multibyte + * characters, else we might find a false match to a trailing byte. In + * supported server encodings, there is no possibility of a false match, and + * it's faster to make useless comparisons to trailing bytes than it is to + * invoke pg_encoding_mblen() to skip over them. 
encoding_embeds_ascii is true + * when we have to do it the hard way. + */ +typedef struct CopyToStateData +{ + /* format routine */ + const CopyToRoutine *routine; + + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + FILE *copy_file; /* used if copy_dest == COPY_DEST_FILE */ + StringInfo fe_msgbuf; /* used for all dests during COPY TO */ + + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy to */ + QueryDesc *queryDesc; /* executable query to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDOUT */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_dest_cb data_dest_cb; /* function for writing data */ + + CopyFormatOptions opts; + Node *whereClause; /* WHERE condition (or NULL) */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + FmgrInfo *out_functions; /* lookup info for output functions */ + MemoryContext rowcontext; /* per-row evaluation context */ + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyToStateData; + #endif /* COPYAPI_H */ diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index c11b5ff3cc0..3863d26d5b7 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -19,171 +19,6 @@ #include "commands/trigger.h" #include "nodes/miscnodes.h" -/* - * Represents the different source cases we need to worry about at - * the bottom level - */ -typedef enum CopySource -{ - COPY_FILE, /* from file (or a piped program) */ - COPY_FRONTEND, /* from frontend */ - COPY_CALLBACK, /* from callback function */ -} CopySource; - -/* - * Represents the end-of-line terminator 
type of the input - */ -typedef enum EolType -{ - EOL_UNKNOWN, - EOL_NL, - EOL_CR, - EOL_CRNL, -} EolType; - -/* - * Represents the insert method to be used during COPY FROM. - */ -typedef enum CopyInsertMethod -{ - CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */ - CIM_MULTI, /* always use table_multi_insert or - * ExecForeignBatchInsert */ - CIM_MULTI_CONDITIONAL, /* use table_multi_insert or - * ExecForeignBatchInsert only if valid */ -} CopyInsertMethod; - -/* - * This struct contains all the state variables used throughout a COPY FROM - * operation. - */ -typedef struct CopyFromStateData -{ - /* format routine */ - const CopyFromRoutine *routine; - - /* low-level state data */ - CopySource copy_src; /* type of copy source */ - FILE *copy_file; /* used if copy_src == COPY_FILE */ - StringInfo fe_msgbuf; /* used if copy_src == COPY_FRONTEND */ - - EolType eol_type; /* EOL type of input */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - Oid conversion_proc; /* encoding conversion function */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_source_cb data_source_cb; /* function for reading data */ - - CopyFormatOptions opts; - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ - Node *whereClause; /* WHERE condition (or NULL) */ - - /* these are just for error messages, see CopyFromErrorCallback */ - const char *cur_relname; /* table name for error messages */ - uint64 cur_lineno; /* line number for error messages */ - const char *cur_attname; /* current att for error messages */ - const char *cur_attval; /* current att value for error messages */ - bool relname_only; /* don't output line number, att, etc. 
*/ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - AttrNumber num_defaults; /* count of att that are missing and have - * default value */ - FmgrInfo *in_functions; /* array of input functions for each attrs */ - Oid *typioparams; /* array of element types for in_functions */ - ErrorSaveContext *escontext; /* soft error trapper during in_functions - * execution */ - uint64 num_errors; /* total number of rows which contained soft - * errors */ - int *defmap; /* array of default att numbers related to - * missing att */ - ExprState **defexprs; /* array of default att expressions for all - * att */ - bool *defaults; /* if DEFAULT marker was found for - * corresponding att */ - bool volatile_defexprs; /* is any of defexprs volatile? */ - List *range_table; /* single element list of RangeTblEntry */ - List *rteperminfos; /* single element list of RTEPermissionInfo */ - ExprState *qualexpr; - - TransitionCaptureState *transition_capture; - - /* - * These variables are used to reduce overhead in COPY FROM. - * - * attribute_buf holds the separated, de-escaped text for each field of - * the current line. The CopyReadAttributes functions return arrays of - * pointers into this buffer. We avoid palloc/pfree overhead by re-using - * the buffer on each cycle. - * - * In binary COPY FROM, attribute_buf holds the binary data for the - * current field, but the usage is otherwise similar. - */ - StringInfoData attribute_buf; - - /* field raw data pointers found by COPY FROM */ - - int max_fields; - char **raw_fields; - - /* - * Similarly, line_buf holds the whole input line being processed. The - * input cycle is first to read the whole line into line_buf, and then - * extract the individual attribute fields into attribute_buf. line_buf - * is preserved unmodified so that we can display it in error messages if - * appropriate. (In binary mode, line_buf is not used.) 
- */ - StringInfoData line_buf; - bool line_buf_valid; /* contains the row being processed? */ - - /* - * input_buf holds input data, already converted to database encoding. - * - * In text mode, CopyReadLine parses this data sufficiently to locate line - * boundaries, then transfers the data to line_buf. We guarantee that - * there is a \0 at input_buf[input_buf_len] at all times. (In binary - * mode, input_buf is not used.) - * - * If encoding conversion is not required, input_buf is not a separate - * buffer but points directly to raw_buf. In that case, input_buf_len - * tracks the number of bytes that have been verified as valid in the - * database encoding, and raw_buf_len is the total number of bytes stored - * in the buffer. - */ -#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ - char *input_buf; - int input_buf_index; /* next byte to process */ - int input_buf_len; /* total # of bytes stored */ - bool input_reached_eof; /* true if we reached EOF */ - bool input_reached_error; /* true if a conversion error happened */ - /* Shorthand for number of unconsumed bytes available in input_buf */ -#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) - - /* - * raw_buf holds raw input data read from the data source (file or client - * connection), not yet converted to the database encoding. Like with - * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. 
- */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ - char *raw_buf; - int raw_buf_index; /* next byte to process */ - int raw_buf_len; /* total # of bytes stored */ - bool raw_reached_eof; /* true if we reached EOF */ - - /* Shorthand for number of unconsumed bytes available in raw_buf */ -#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) - - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyFromStateData; - extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); -- 2.45.2 From 3f19b3ea56f3a234846b4078edcf85eb229df8ee Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Tue, 23 Jan 2024 15:12:43 +0900 Subject: [PATCH v20 5/5] Add support for implementing custom COPY TO/FROM format as extension For custom COPY TO format implementation: * Add CopyToStateData::opaque that can be used to keep data for custom COPY TO format implementation * Export CopySendEndOfRow() to flush data in CopyToStateData::fe_msgbuf as CopyToStateFlush() For custom COPY FROM format implementation: * Add CopyFromStateData::opaque that can be used to keep data for custom COPY FROM format implementation * Export CopyReadBinaryData() to read the next data as CopyFromStateRead() --- src/backend/commands/copyfromparse.c | 14 ++++++++++++++ src/backend/commands/copyto.c | 14 ++++++++++++++ src/include/commands/copyapi.h | 10 ++++++++++ 3 files changed, 38 insertions(+) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index ec86a17b3b3..64772877b0f 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -739,6 +739,20 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) return copied_bytes; } +/* + * CopyFromStateRead + * + * Export CopyReadBinaryData() for extensions. We want to keep + * CopyReadBinaryData() as a static function for + * optimization.
CopyReadBinaryData() calls in this file may be optimized by + * a compiler. + */ +int +CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes) +{ + return CopyReadBinaryData(cstate, dest, nbytes); +} + /* * Read raw fields in the next line for COPY FROM in text or csv mode. * Return false if no more lines. diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 37b150b44ba..c99edae575b 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -496,6 +496,20 @@ CopySendEndOfRow(CopyToState cstate) resetStringInfo(fe_msgbuf); } +/* + * CopyToStateFlush + * + * Export CopySendEndOfRow() for extensions. We want to keep + * CopySendEndOfRow() as a static function for + * optimization. CopySendEndOfRow() calls in this file may be optimized by a + * compiler. + */ +void +CopyToStateFlush(CopyToState cstate) +{ + CopySendEndOfRow(cstate); +} + /* * These functions do apply some data conversion */ diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 8a560903ede..c1e9fe366f3 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -299,8 +299,13 @@ typedef struct CopyFromStateData #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ } CopyFromStateData; +extern int CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes); + typedef struct CopyToStateData *CopyToState; @@ -402,6 +407,11 @@ typedef struct CopyToStateData FmgrInfo *out_functions; /* lookup info for output functions */ MemoryContext rowcontext; /* per-row evaluation context */ uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ } CopyToStateData; +extern void CopyToStateFlush(CopyToState cstate); + #endif /* COPYAPI_H */ -- 2.45.2 From 
e33abbfdf1bdf0c68d77a8ab367ae5a9800b613e Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sat, 28 Sep 2024 23:24:49 +0900 Subject: [PATCH v21 01/10] Add CopyToRoutine This is for implementing a custom COPY TO format, but it is not yet enough to implement one. We'll export some APIs to send data and add a "format" option to COPY TO later. Existing text/csv/binary format implementations don't use CopyToRoutine for now. We have a patch for that but we defer it, because there are some mysterious profile results even though we get faster runtimes. See [1] for details. [1] https://www.postgresql.org/message-id/ZdbtQJ-p5H1_EDwE%40paquier.xyz Note that this doesn't change existing text/csv/binary format implementations. --- src/backend/commands/copyto.c | 31 ++++++++++++++--- src/include/commands/copyapi.h | 58 ++++++++++++++++++++++++++++++++ src/tools/pgindent/typedefs.list | 1 + 3 files changed, 86 insertions(+), 4 deletions(-) create mode 100644 src/include/commands/copyapi.h diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 91de442f434..3c5a97679aa 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -20,6 +20,7 @@ #include "access/tableam.h" #include "commands/copy.h" +#include "commands/copyapi.h" #include "commands/progress.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -64,6 +65,9 @@ typedef enum CopyDest */ typedef struct CopyToStateData { + /* format routine */ + const CopyToRoutine *routine; + /* low-level state data */ CopyDest copy_dest; /* type of copy source/destination */ FILE *copy_file; /* used if copy_dest == COPY_FILE */ @@ -772,14 +776,22 @@ DoCopyTo(CopyToState cstate) Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); if (cstate->opts.binary) + { getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + else if (cstate->routine) +
cstate->routine->CopyToOutFunc(cstate, attr->atttypid, + &cstate->out_functions[attnum - 1]); else + { getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } } /* @@ -806,6 +818,8 @@ DoCopyTo(CopyToState cstate) tmp = 0; CopySendInt32(cstate, tmp); } + else if (cstate->routine) + cstate->routine->CopyToStart(cstate, tupDesc); else { /* @@ -887,6 +901,8 @@ DoCopyTo(CopyToState cstate) /* Need to flush out the trailer */ CopySendEndOfRow(cstate); } + else if (cstate->routine) + cstate->routine->CopyToEnd(cstate); MemoryContextDelete(cstate->rowcontext); @@ -908,15 +924,22 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) MemoryContextReset(cstate->rowcontext); oldcontext = MemoryContextSwitchTo(cstate->rowcontext); + /* Make sure the tuple is fully deconstructed */ + slot_getallattrs(slot); + + if (cstate->routine) + { + cstate->routine->CopyToOneRow(cstate, slot); + MemoryContextSwitchTo(oldcontext); + return; + } + if (cstate->opts.binary) { /* Binary per-tuple header */ CopySendInt16(cstate, list_length(cstate->attnumlist)); } - /* Make sure the tuple is fully deconstructed */ - slot_getallattrs(slot); - if (!cstate->opts.binary) { bool need_delim = false; diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h new file mode 100644 index 00000000000..5ce24f195dc --- /dev/null +++ b/src/include/commands/copyapi.h @@ -0,0 +1,58 @@ +/*------------------------------------------------------------------------- + * + * copyapi.h + * API for COPY TO handlers + * + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/commands/copyapi.h + * + *------------------------------------------------------------------------- + */ +#ifndef COPYAPI_H +#define COPYAPI_H + +#include "executor/tuptable.h" 
+#include "nodes/execnodes.h" + +/* This is private in commands/copyto.c */ +typedef struct CopyToStateData *CopyToState; + +/* + * API structure for a COPY TO format implementation. Note this must be + * allocated in a server-lifetime manner, typically as a static const struct. + */ +typedef struct CopyToRoutine +{ + /* + * Called when COPY TO is started to set up the output functions + * associated with the relation's attributes reading from. `finfo` can be + * optionally filled to provide the catalog information of the output + * function. `atttypid` is the OID of data type used by the relation's + * attribute. + */ + void (*CopyToOutFunc) (CopyToState cstate, Oid atttypid, + FmgrInfo *finfo); + + /* + * Called when COPY TO is started. + * + * `tupDesc` is the tuple descriptor of the relation from where the data + * is read. + */ + void (*CopyToStart) (CopyToState cstate, TupleDesc tupDesc); + + /* + * Copy one row for COPY TO. + * + * `slot` is the tuple slot where the data is emitted. + */ + void (*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot); + + /* Called when COPY TO has ended */ + void (*CopyToEnd) (CopyToState cstate); +} CopyToRoutine; + +#endif /* COPYAPI_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 5fabb127d7e..8eb537bfa77 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -503,6 +503,7 @@ CopyMultiInsertInfo CopyOnErrorChoice CopySource CopyStmt +CopyToRoutine CopyToState CopyToStateData Cost -- 2.45.2 From be1264ed79cdc5a7e529b05866f8b6fe589a705f Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sat, 28 Sep 2024 23:26:29 +0900 Subject: [PATCH v21 02/10] Use CopyToRoutine for the existing formats The existing formats are text, csv and binary. If we find any performance regression by this, we will not merge this to master.
This will increase indirect function call costs but will reduce the runtime "if (cstate->opts.binary)" and "if (cstate->opts.csv_mode)" branch costs. This uses an optimization based on a static inline function called with a constant argument in place of cstate->opts.csv_mode. For example, CopyToTextLikeOneRow() uses this optimization. It accepts the "bool is_csv" argument instead of using cstate->opts.csv_mode in it. CopyToTextOneRow() calls CopyToTextLikeOneRow() with false (constant) for "bool is_csv". The compiler will remove the "if (is_csv)" branch in it by this optimization. This doesn't change existing logic. This just moves existing code. --- src/backend/commands/copyto.c | 477 +++++++++++++++++++++++----------- 1 file changed, 319 insertions(+), 158 deletions(-) diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 3c5a97679aa..86dc1b742cc 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -128,6 +128,317 @@ static void CopySendEndOfRow(CopyToState cstate); static void CopySendInt32(CopyToState cstate, int32 val); static void CopySendInt16(CopyToState cstate, int16 val); +/* + * CopyToRoutine implementations. + */ + +/* + * CopyToTextLikeSendEndOfRow + * + * Apply line terminations for a line sent in text or CSV format depending + * on the destination, then send the end of a row. + */ +static inline void +CopyToTextLikeSendEndOfRow(CopyToState cstate) +{ + switch (cstate->copy_dest) + { + case COPY_FILE: + /* Default line termination depends on platform */ +#ifndef WIN32 + CopySendChar(cstate, '\n'); +#else + CopySendString(cstate, "\r\n"); +#endif + break; + case COPY_FRONTEND: + /* The FE/BE protocol uses \n as newline for all platforms */ + CopySendChar(cstate, '\n'); + break; + default: + break; + } + + /* Now take the actions related to the end of a row */ + CopySendEndOfRow(cstate); +} + +/* + * CopyToTextLikeStart + * + * Start of COPY TO for text and CSV format.
+ */ +static void +CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc) +{ + /* + * For non-binary copy, we need to convert null_print to file encoding, + * because it will be sent directly with CopySendString. + */ + if (cstate->need_transcoding) + cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, + cstate->opts.null_print_len, + cstate->file_encoding); + + /* if a header has been requested send the line */ + if (cstate->opts.header_line) + { + ListCell *cur; + bool hdr_delim = false; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + char *colname; + + if (hdr_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + hdr_delim = true; + + colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); + + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, colname, false); + else + CopyAttributeOutText(cstate, colname); + } + + CopyToTextLikeSendEndOfRow(cstate); + } +} + +/* + * CopyToTextLikeOutFunc + * + * Assign output function data for a relation's attribute in text/CSV format. + */ +static void +CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) +{ + Oid func_oid; + bool is_varlena; + + /* Set output function for an attribute */ + getTypeOutputInfo(atttypid, &func_oid, &is_varlena); + fmgr_info(func_oid, finfo); +} + + +/* + * CopyToTextLikeOneRow + * + * Process one row for text/CSV format. + * + * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow(). 
+ */ +static inline void +CopyToTextLikeOneRow(CopyToState cstate, + TupleTableSlot *slot, + bool is_csv) +{ + bool need_delim = false; + FmgrInfo *out_functions = cstate->out_functions; + + foreach_int(attnum, cstate->attnumlist) + { + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (need_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + need_delim = true; + + if (isnull) + { + CopySendString(cstate, cstate->opts.null_print_client); + } + else + { + char *string; + + string = OutputFunctionCall(&out_functions[attnum - 1], + value); + + /* + * is_csv will be optimized away by compiler, as argument is + * constant at caller. + */ + if (is_csv) + CopyAttributeOutCSV(cstate, string, + cstate->opts.force_quote_flags[attnum - 1]); + else + CopyAttributeOutText(cstate, string); + } + } + + CopyToTextLikeSendEndOfRow(cstate); +} + +/* + * CopyToTextOneRow + * + * Per-row callback for COPY TO with text format. + */ +static void +CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + CopyToTextLikeOneRow(cstate, slot, false); +} + +/* + * CopyToCSVOneRow + * + * Per-row callback for COPY TO with CSV format. + */ +static void +CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + CopyToTextLikeOneRow(cstate, slot, true); +} + +/* + * CopyToTextLikeEnd + * + * End of COPY TO for text/CSV format. + */ +static void +CopyToTextLikeEnd(CopyToState cstate) +{ + /* Nothing to do here */ +} + +/* + * CopyToRoutine implementation for "binary". + */ + +/* + * CopyToBinaryStart + * + * Start of COPY TO for binary format.
+ */ +static void +CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc) +{ + /* Generate header for a binary copy */ + int32 tmp; + + /* Signature */ + CopySendData(cstate, BinarySignature, 11); + /* Flags field */ + tmp = 0; + CopySendInt32(cstate, tmp); + /* No header extension */ + tmp = 0; + CopySendInt32(cstate, tmp); +} + +/* + * CopyToBinaryOutFunc + * + * Assign output function data for a relation's attribute in binary format. + */ +static void +CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) +{ + Oid func_oid; + bool is_varlena; + + /* Set output function for an attribute */ + getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena); + fmgr_info(func_oid, finfo); +} + +/* + * CopyToBinaryOneRow + * + * Process one row for binary format. + */ +static void +CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + FmgrInfo *out_functions = cstate->out_functions; + + /* Binary per-tuple header */ + CopySendInt16(cstate, list_length(cstate->attnumlist)); + + foreach_int(attnum, cstate->attnumlist) + { + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (isnull) + { + CopySendInt32(cstate, -1); + } + else + { + bytea *outputbytes; + + outputbytes = SendFunctionCall(&out_functions[attnum - 1], + value); + CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); + CopySendData(cstate, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + CopySendEndOfRow(cstate); +} + +/* + * CopyToBinaryEnd + * + * End of COPY TO for binary format. + */ +static void +CopyToBinaryEnd(CopyToState cstate) +{ + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopySendEndOfRow(cstate); +} + +/* + * CSV and text share the same implementation, with the exception of the + * output representation and per-row callbacks.
+ */ +static const CopyToRoutine CopyToRoutineText = { + .CopyToStart = CopyToTextLikeStart, + .CopyToOutFunc = CopyToTextLikeOutFunc, + .CopyToOneRow = CopyToTextOneRow, + .CopyToEnd = CopyToTextLikeEnd, +}; + +static const CopyToRoutine CopyToRoutineCSV = { + .CopyToStart = CopyToTextLikeStart, + .CopyToOutFunc = CopyToTextLikeOutFunc, + .CopyToOneRow = CopyToCSVOneRow, + .CopyToEnd = CopyToTextLikeEnd, +}; + +static const CopyToRoutine CopyToRoutineBinary = { + .CopyToStart = CopyToBinaryStart, + .CopyToOutFunc = CopyToBinaryOutFunc, + .CopyToOneRow = CopyToBinaryOneRow, + .CopyToEnd = CopyToBinaryEnd, +}; + +/* + * Define the COPY TO routines to use for a format. This should be called + * after options are parsed. + */ +static const CopyToRoutine * +CopyToGetRoutine(CopyFormatOptions opts) +{ + if (opts.csv_mode) + return &CopyToRoutineCSV; + else if (opts.binary) + return &CopyToRoutineBinary; + + /* default is text */ + return &CopyToRoutineText; +} /* * Send copy start/stop messages for frontend copies. 
These have changed @@ -195,16 +506,6 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { case COPY_FILE: - if (!cstate->opts.binary) - { - /* Default line termination depends on platform */ -#ifndef WIN32 - CopySendChar(cstate, '\n'); -#else - CopySendString(cstate, "\r\n"); -#endif - } - if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -239,10 +540,6 @@ CopySendEndOfRow(CopyToState cstate) } break; case COPY_FRONTEND: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->opts.binary) - CopySendChar(cstate, '\n'); - /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; @@ -430,6 +727,9 @@ BeginCopyTo(ParseState *pstate, /* Extract options from the statement node tree */ ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options); + /* Set format routine */ + cstate->routine = CopyToGetRoutine(cstate->opts); + /* Process the source/target relation or query */ if (rel) { @@ -771,27 +1071,10 @@ DoCopyTo(CopyToState cstate) foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); - Oid out_func_oid; - bool isvarlena; Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - if (cstate->opts.binary) - { - getTypeBinaryOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } - else if (cstate->routine) - cstate->routine->CopyToOutFunc(cstate, attr->atttypid, - &cstate->out_functions[attnum - 1]); - else - { - getTypeOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } + cstate->routine->CopyToOutFunc(cstate, attr->atttypid, + &cstate->out_functions[attnum - 1]); } /* @@ -804,58 +1087,7 @@ DoCopyTo(CopyToState cstate) "COPY TO", ALLOCSET_DEFAULT_SIZES); - if (cstate->opts.binary) - { - /* Generate header for a binary copy */ - int32 
tmp; - - /* Signature */ - CopySendData(cstate, BinarySignature, 11); - /* Flags field */ - tmp = 0; - CopySendInt32(cstate, tmp); - /* No header extension */ - tmp = 0; - CopySendInt32(cstate, tmp); - } - else if (cstate->routine) - cstate->routine->CopyToStart(cstate, tupDesc); - else - { - /* - * For non-binary copy, we need to convert null_print to file - * encoding, because it will be sent directly with CopySendString. - */ - if (cstate->need_transcoding) - cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, - cstate->opts.null_print_len, - cstate->file_encoding); - - /* if a header has been requested send the line */ - if (cstate->opts.header_line) - { - bool hdr_delim = false; - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - char *colname; - - if (hdr_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - hdr_delim = true; - - colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); - - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, colname, false); - else - CopyAttributeOutText(cstate, colname); - } - - CopySendEndOfRow(cstate); - } - } + cstate->routine->CopyToStart(cstate, tupDesc); if (cstate->rel) { @@ -894,15 +1126,7 @@ DoCopyTo(CopyToState cstate) processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - if (cstate->opts.binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } - else if (cstate->routine) - cstate->routine->CopyToEnd(cstate); + cstate->routine->CopyToEnd(cstate); MemoryContextDelete(cstate->rowcontext); @@ -918,7 +1142,6 @@ DoCopyTo(CopyToState cstate) static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) { - FmgrInfo *out_functions = cstate->out_functions; MemoryContext oldcontext; MemoryContextReset(cstate->rowcontext); @@ -927,69 +1150,7 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) /* Make sure the tuple is fully deconstructed */ 
slot_getallattrs(slot); - if (cstate->routine) - { - cstate->routine->CopyToOneRow(cstate, slot); - MemoryContextSwitchTo(oldcontext); - return; - } - - if (cstate->opts.binary) - { - /* Binary per-tuple header */ - CopySendInt16(cstate, list_length(cstate->attnumlist)); - } - - if (!cstate->opts.binary) - { - bool need_delim = false; - - foreach_int(attnum, cstate->attnumlist) - { - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - char *string; - - if (need_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - need_delim = true; - - if (isnull) - CopySendString(cstate, cstate->opts.null_print_client); - else - { - string = OutputFunctionCall(&out_functions[attnum - 1], - value); - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, string, - cstate->opts.force_quote_flags[attnum - 1]); - else - CopyAttributeOutText(cstate, string); - } - } - } - else - { - foreach_int(attnum, cstate->attnumlist) - { - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - bytea *outputbytes; - - if (isnull) - CopySendInt32(cstate, -1); - else - { - outputbytes = SendFunctionCall(&out_functions[attnum - 1], - value); - CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); - CopySendData(cstate, VARDATA(outputbytes), - VARSIZE(outputbytes) - VARHDRSZ); - } - } - } - - CopySendEndOfRow(cstate); + cstate->routine->CopyToOneRow(cstate, slot); MemoryContextSwitchTo(oldcontext); } -- 2.45.2 From 9c6123756023e7515133c649a14be6d294a31f29 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sat, 28 Sep 2024 23:40:54 +0900 Subject: [PATCH v21 03/10] Add support for adding custom COPY TO format This uses the handler approach like tablesample. The approach creates an internal function that returns an internal struct. In this case, a COPY TO handler returns a CopyToRoutine. This also adds a test module for a custom COPY TO handler.
--- src/backend/commands/copy.c | 82 ++++++++++++++++--- src/backend/commands/copyto.c | 4 +- src/backend/nodes/Makefile | 1 + src/backend/nodes/gen_node_support.pl | 2 + src/backend/utils/adt/pseudotypes.c | 1 + src/include/catalog/pg_proc.dat | 6 ++ src/include/catalog/pg_type.dat | 6 ++ src/include/commands/copy.h | 1 + src/include/commands/copyapi.h | 2 + src/include/nodes/meson.build | 1 + src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + src/test/modules/test_copy_format/.gitignore | 4 + src/test/modules/test_copy_format/Makefile | 23 ++++++ .../expected/test_copy_format.out | 17 ++++ src/test/modules/test_copy_format/meson.build | 33 ++++++++ .../test_copy_format/sql/test_copy_format.sql | 5 ++ .../test_copy_format--1.0.sql | 8 ++ .../test_copy_format/test_copy_format.c | 63 ++++++++++++++ .../test_copy_format/test_copy_format.control | 4 + 20 files changed, 251 insertions(+), 14 deletions(-) create mode 100644 src/test/modules/test_copy_format/.gitignore create mode 100644 src/test/modules/test_copy_format/Makefile create mode 100644 src/test/modules/test_copy_format/expected/test_copy_format.out create mode 100644 src/test/modules/test_copy_format/meson.build create mode 100644 src/test/modules/test_copy_format/sql/test_copy_format.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format--1.0.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format.c create mode 100644 src/test/modules/test_copy_format/test_copy_format.control diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 3bb579a3a44..3aea654ab8a 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -32,6 +32,7 @@ #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" +#include "parser/parse_func.h" #include "parser/parse_relation.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -443,6 +444,73 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState 
*pstate) return COPY_LOG_VERBOSITY_DEFAULT; /* keep compiler quiet */ } +/* + * Process the "format" option. + * + * This function checks whether the option value is a built-in format such as + * "text" and "csv" or not. If the option value isn't a built-in format, this + * function finds a COPY format handler that returns a CopyToRoutine (for + * is_from == false). If no COPY format handler is found, this function + * reports an error. + */ +static void +ProcessCopyOptionFormat(ParseState *pstate, + CopyFormatOptions *opts_out, + bool is_from, + DefElem *defel) +{ + char *format; + Oid funcargtypes[1]; + Oid handlerOid = InvalidOid; + Datum datum; + Node *routine; + + format = defGetString(defel); + + /* built-in formats */ + if (strcmp(format, "text") == 0) + /* default format */ return; + else if (strcmp(format, "csv") == 0) + { + opts_out->csv_mode = true; + return; + } + else if (strcmp(format, "binary") == 0) + { + opts_out->binary = true; + return; + } + + /* custom format */ + if (!is_from) + { + funcargtypes[0] = INTERNALOID; + handlerOid = LookupFuncName(list_make1(makeString(format)), 1, + funcargtypes, true); + } + if (!OidIsValid(handlerOid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY format \"%s\" not recognized", format), + parser_errposition(pstate, defel->location))); + + datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from)); + routine = (Node *) DatumGetPointer(datum); + if (routine == NULL || !IsA(routine, CopyToRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyToRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + + opts_out->routine = routine; +} + /* * Process the statement option list for COPY. 
* @@ -485,22 +553,10 @@ ProcessCopyOptions(ParseState *pstate, if (strcmp(defel->defname, "format") == 0) { - char *fmt = defGetString(defel); - if (format_specified) errorConflictingDefElem(defel, pstate); format_specified = true; - if (strcmp(fmt, "text") == 0) - /* default format */ ; - else if (strcmp(fmt, "csv") == 0) - opts_out->csv_mode = true; - else if (strcmp(fmt, "binary") == 0) - opts_out->binary = true; - else - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY format \"%s\" not recognized", fmt), - parser_errposition(pstate, defel->location))); + ProcessCopyOptionFormat(pstate, opts_out, is_from, defel); } else if (strcmp(defel->defname, "freeze") == 0) { diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 86dc1b742cc..8ddbddb119d 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -431,7 +431,9 @@ static const CopyToRoutine CopyToRoutineBinary = { static const CopyToRoutine * CopyToGetRoutine(CopyFormatOptions opts) { - if (opts.csv_mode) + if (opts.routine) + return (const CopyToRoutine *) opts.routine; + else if (opts.csv_mode) return &CopyToRoutineCSV; else if (opts.binary) return &CopyToRoutineBinary; diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile index 66bbad8e6e0..173ee11811c 100644 --- a/src/backend/nodes/Makefile +++ b/src/backend/nodes/Makefile @@ -49,6 +49,7 @@ node_headers = \ access/sdir.h \ access/tableam.h \ access/tsmapi.h \ + commands/copyapi.h \ commands/event_trigger.h \ commands/trigger.h \ executor/tuptable.h \ diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl index 81df3bdf95f..428ab4f0d93 100644 --- a/src/backend/nodes/gen_node_support.pl +++ b/src/backend/nodes/gen_node_support.pl @@ -61,6 +61,7 @@ my @all_input_files = qw( access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h @@ -85,6 +86,7 @@ my 
@nodetag_only_files = qw( access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c index e189e9b79d2..25f24ab95d2 100644 --- a/src/backend/utils/adt/pseudotypes.c +++ b/src/backend/utils/adt/pseudotypes.c @@ -370,6 +370,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler); +PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(internal); PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement); PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 322114d72a7..f108780e8b6 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -7741,6 +7741,12 @@ { oid => '3312', descr => 'I/O', proname => 'tsm_handler_out', prorettype => 'cstring', proargtypes => 'tsm_handler', prosrc => 'tsm_handler_out' }, +{ oid => '8753', descr => 'I/O', + proname => 'copy_handler_in', proisstrict => 'f', prorettype => 'copy_handler', + proargtypes => 'cstring', prosrc => 'copy_handler_in' }, +{ oid => '8754', descr => 'I/O', + proname => 'copy_handler_out', prorettype => 'cstring', + proargtypes => 'copy_handler', prosrc => 'copy_handler_out' }, { oid => '267', descr => 'I/O', proname => 'table_am_handler_in', proisstrict => 'f', prorettype => 'table_am_handler', proargtypes => 'cstring', diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index ceff66ccde1..793dd671935 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -633,6 +633,12 @@ typcategory => 'P', typinput => 'tsm_handler_in', typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-', typalign => 'i' }, +{ oid => '8752', + descr => 'pseudo-type for the result of a copy to method function', + 
typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p', + typcategory => 'P', typinput => 'copy_handler_in', + typoutput => 'copy_handler_out', typreceive => '-', typsend => '-', + typalign => 'i' }, { oid => '269', descr => 'pseudo-type for the result of a table AM handler function', typname => 'table_am_handler', typlen => '4', typbyval => 't', typtype => 'p', diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 141fd48dc10..e9ed8443210 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -84,6 +84,7 @@ typedef struct CopyFormatOptions CopyOnErrorChoice on_error; /* what to do when error happened */ CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ List *convert_select; /* list of column names (can be NIL) */ + Node *routine; /* CopyToRoutine (can be NULL) */ } CopyFormatOptions; /* These are private in commands/copy[from|to].c */ diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 5ce24f195dc..05b7d92ddba 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -26,6 +26,8 @@ typedef struct CopyToStateData *CopyToState; */ typedef struct CopyToRoutine { + NodeTag type; + /* * Called when COPY TO is started to set up the output functions * associated with the relation's attributes reading from. 
`finfo` can be diff --git a/src/include/nodes/meson.build b/src/include/nodes/meson.build index b665e55b657..103df1a7873 100644 --- a/src/include/nodes/meson.build +++ b/src/include/nodes/meson.build @@ -11,6 +11,7 @@ node_support_input_i = [ 'access/sdir.h', 'access/tableam.h', 'access/tsmapi.h', + 'commands/copyapi.h', 'commands/event_trigger.h', 'commands/trigger.h', 'executor/tuptable.h', diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 256799f520a..b7b46928a19 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -15,6 +15,7 @@ SUBDIRS = \ spgist_name_ops \ test_bloomfilter \ test_copy_callbacks \ + test_copy_format \ test_custom_rmgrs \ test_ddl_deparse \ test_dsa \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index d8fe059d236..c42b4b2b31f 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -14,6 +14,7 @@ subdir('spgist_name_ops') subdir('ssl_passphrase_callback') subdir('test_bloomfilter') subdir('test_copy_callbacks') +subdir('test_copy_format') subdir('test_custom_rmgrs') subdir('test_ddl_deparse') subdir('test_dsa') diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore new file mode 100644 index 00000000000..5dcb3ff9723 --- /dev/null +++ b/src/test/modules/test_copy_format/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile new file mode 100644 index 00000000000..8497f91624d --- /dev/null +++ b/src/test/modules/test_copy_format/Makefile @@ -0,0 +1,23 @@ +# src/test/modules/test_copy_format/Makefile + +MODULE_big = test_copy_format +OBJS = \ + $(WIN32RES) \ + test_copy_format.o +PGFILEDESC = "test_copy_format - test custom COPY FORMAT" + +EXTENSION = test_copy_format +DATA = test_copy_format--1.0.sql + +REGRESS = test_copy_format + +ifdef USE_PGXS +PG_CONFIG = pg_config 
+PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_copy_format +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out new file mode 100644 index 00000000000..606c78f6878 --- /dev/null +++ b/src/test/modules/test_copy_format/expected/test_copy_format.out @@ -0,0 +1,17 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a smallint, b integer, c bigint); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH (format 'test_copy_format'); +ERROR: COPY format "test_copy_format" not recognized +LINE 1: COPY public.test FROM stdin WITH (format 'test_copy_format')... + ^ +COPY public.test TO stdout WITH (format 'test_copy_format'); +NOTICE: test_copy_format: is_from=false +NOTICE: CopyToOutFunc: atttypid=21 +NOTICE: CopyToOutFunc: atttypid=23 +NOTICE: CopyToOutFunc: atttypid=20 +NOTICE: CopyToStart: natts=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToEnd diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build new file mode 100644 index 00000000000..4cefe7b709a --- /dev/null +++ b/src/test/modules/test_copy_format/meson.build @@ -0,0 +1,33 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +test_copy_format_sources = files( + 'test_copy_format.c', +) + +if host_system == 'windows' + test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'test_copy_format', + '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',]) +endif + +test_copy_format = shared_module('test_copy_format', + test_copy_format_sources, + kwargs: pg_test_mod_args, +) +test_install_libs += test_copy_format + 
+test_install_data += files( + 'test_copy_format.control', + 'test_copy_format--1.0.sql', +) + +tests += { + 'name': 'test_copy_format', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'test_copy_format', + ], + }, +} diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql new file mode 100644 index 00000000000..9406b3be3d4 --- /dev/null +++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql @@ -0,0 +1,5 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a smallint, b integer, c bigint); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH (format 'test_copy_format'); +COPY public.test TO stdout WITH (format 'test_copy_format'); diff --git a/src/test/modules/test_copy_format/test_copy_format--1.0.sql b/src/test/modules/test_copy_format/test_copy_format--1.0.sql new file mode 100644 index 00000000000..d24ea03ce99 --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format--1.0.sql @@ -0,0 +1,8 @@ +/* src/test/modules/test_copy_format/test_copy_format--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_copy_format" to load this file. \quit + +CREATE FUNCTION test_copy_format(internal) + RETURNS copy_handler + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c new file mode 100644 index 00000000000..e064f40473b --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format.c @@ -0,0 +1,63 @@ +/*-------------------------------------------------------------------------- + * + * test_copy_format.c + * Code for testing custom COPY format. 
+ * + * Portions Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_copy_format/test_copy_format.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "commands/copyapi.h" +#include "commands/defrem.h" + +PG_MODULE_MAGIC; + +static void +CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) +{ + ereport(NOTICE, (errmsg("CopyToOutFunc: atttypid=%u", atttypid))); +} + +static void +CopyToStart(CopyToState cstate, TupleDesc tupDesc) +{ + ereport(NOTICE, (errmsg("CopyToStart: natts=%d", tupDesc->natts))); +} + +static void +CopyToOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + ereport(NOTICE, (errmsg("CopyToOneRow: tts_nvalid=%d", slot->tts_nvalid))); +} + +static void +CopyToEnd(CopyToState cstate) +{ + ereport(NOTICE, (errmsg("CopyToEnd"))); +} + +static const CopyToRoutine CopyToRoutineTestCopyFormat = { + .type = T_CopyToRoutine, + .CopyToOutFunc = CopyToOutFunc, + .CopyToStart = CopyToStart, + .CopyToOneRow = CopyToOneRow, + .CopyToEnd = CopyToEnd, +}; + +PG_FUNCTION_INFO_V1(test_copy_format); +Datum +test_copy_format(PG_FUNCTION_ARGS) +{ + bool is_from = PG_GETARG_BOOL(0); + + ereport(NOTICE, + (errmsg("test_copy_format: is_from=%s", is_from ? 
"true" : "false"))); + + PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); +} diff --git a/src/test/modules/test_copy_format/test_copy_format.control b/src/test/modules/test_copy_format/test_copy_format.control new file mode 100644 index 00000000000..f05a6362358 --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format.control @@ -0,0 +1,4 @@ +comment = 'Test code for custom COPY format' +default_version = '1.0' +module_pathname = '$libdir/test_copy_format' +relocatable = true -- 2.45.2 From c0c504a899861da2d9d66fd9bd3c31c56735ffe0 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sat, 28 Sep 2024 23:56:36 +0900 Subject: [PATCH v21 04/10] Export CopyToStateData This is for custom COPY TO format handlers implemented as extensions. This only moves code; nothing changes except the CopyDest enum values. CopyDest and CopySource enum values such as COPY_FILE conflict with each other, so the CopyDest values now use a COPY_DEST_ prefix instead of COPY_. For example, COPY_FILE in CopyDest is renamed to COPY_DEST_FILE. Note that this isn't enough to implement custom COPY TO format handlers as extensions. We'll do the following in a subsequent commit: 1. Add an opaque space for custom COPY TO format handler 2. 
Export CopySendEndOfRow() to flush buffer --- src/backend/commands/copyto.c | 77 ++----------------- src/include/commands/copy.h | 74 +----------------- src/include/commands/copyapi.h | 134 ++++++++++++++++++++++++++++++++- 3 files changed, 143 insertions(+), 142 deletions(-) diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 8ddbddb119d..37b150b44ba 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -37,67 +37,6 @@ #include "utils/rel.h" #include "utils/snapmgr.h" -/* - * Represents the different dest cases we need to worry about at - * the bottom level - */ -typedef enum CopyDest -{ - COPY_FILE, /* to file (or a piped program) */ - COPY_FRONTEND, /* to frontend */ - COPY_CALLBACK, /* to callback function */ -} CopyDest; - -/* - * This struct contains all the state variables used throughout a COPY TO - * operation. - * - * Multi-byte encodings: all supported client-side encodings encode multi-byte - * characters by having the first byte's high bit set. Subsequent bytes of the - * character can have the high bit not set. When scanning data in such an - * encoding to look for a match to a single-byte (ie ASCII) character, we must - * use the full pg_encoding_mblen() machinery to skip over multibyte - * characters, else we might find a false match to a trailing byte. In - * supported server encodings, there is no possibility of a false match, and - * it's faster to make useless comparisons to trailing bytes than it is to - * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true - * when we have to do it the hard way. 
- */ -typedef struct CopyToStateData -{ - /* format routine */ - const CopyToRoutine *routine; - - /* low-level state data */ - CopyDest copy_dest; /* type of copy source/destination */ - FILE *copy_file; /* used if copy_dest == COPY_FILE */ - StringInfo fe_msgbuf; /* used for all dests during COPY TO */ - - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy to */ - QueryDesc *queryDesc; /* executable query to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDOUT */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_dest_cb data_dest_cb; /* function for writing data */ - - CopyFormatOptions opts; - Node *whereClause; /* WHERE condition (or NULL) */ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - FmgrInfo *out_functions; /* lookup info for output functions */ - MemoryContext rowcontext; /* per-row evaluation context */ - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyToStateData; - /* DestReceiver for COPY (query) TO */ typedef struct { @@ -143,7 +82,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate) { switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: /* Default line termination depends on platform */ #ifndef WIN32 CopySendChar(cstate, '\n'); @@ -151,7 +90,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate) CopySendString(cstate, "\r\n"); #endif break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* The FE/BE protocol uses \n as newline for all platforms */ CopySendChar(cstate, '\n'); break; @@ -460,7 +399,7 @@ SendCopyBegin(CopyToState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_dest = 
COPY_FRONTEND; + cstate->copy_dest = COPY_DEST_FRONTEND; } static void @@ -507,7 +446,7 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -541,11 +480,11 @@ CopySendEndOfRow(CopyToState cstate) errmsg("could not write to COPY file: %m"))); } break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; - case COPY_CALLBACK: + case COPY_DEST_CALLBACK: cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -925,12 +864,12 @@ BeginCopyTo(ParseState *pstate, /* See Multibyte encoding comment above */ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); - cstate->copy_dest = COPY_FILE; /* default */ + cstate->copy_dest = COPY_DEST_FILE; /* default */ if (data_dest_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_dest = COPY_CALLBACK; + cstate->copy_dest = COPY_DEST_CALLBACK; cstate->data_dest_cb = data_dest_cb; } else if (pipe) diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index e9ed8443210..dd645eaa030 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -14,85 +14,15 @@ #ifndef COPY_H #define COPY_H -#include "nodes/execnodes.h" +#include "commands/copyapi.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" #include "tcop/dest.h" -/* - * Represents whether a header line should be present, and whether it must - * match the actual names (which implies "true"). - */ -typedef enum CopyHeaderChoice -{ - COPY_HEADER_FALSE = 0, - COPY_HEADER_TRUE, - COPY_HEADER_MATCH, -} CopyHeaderChoice; - -/* - * Represents where to save input processing errors. More values to be added - * in the future. 
- */ -typedef enum CopyOnErrorChoice -{ - COPY_ON_ERROR_STOP = 0, /* immediately throw errors, default */ - COPY_ON_ERROR_IGNORE, /* ignore errors */ -} CopyOnErrorChoice; - -/* - * Represents verbosity of logged messages by COPY command. - */ -typedef enum CopyLogVerbosityChoice -{ - COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages, default */ - COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */ -} CopyLogVerbosityChoice; - -/* - * A struct to hold COPY options, in a parsed form. All of these are related - * to formatting, except for 'freeze', which doesn't really belong here, but - * it's expedient to parse it along with all the other options. - */ -typedef struct CopyFormatOptions -{ - /* parameters from the COPY command */ - int file_encoding; /* file or remote side's character encoding, - * -1 if not specified */ - bool binary; /* binary format? */ - bool freeze; /* freeze rows on loading? */ - bool csv_mode; /* Comma Separated Value format? */ - CopyHeaderChoice header_line; /* header line? */ - char *null_print; /* NULL marker string (server encoding!) */ - int null_print_len; /* length of same */ - char *null_print_client; /* same converted to file encoding */ - char *default_print; /* DEFAULT marker string */ - int default_print_len; /* length of same */ - char *delim; /* column delimiter (must be 1 byte) */ - char *quote; /* CSV quote char (must be 1 byte) */ - char *escape; /* CSV escape char (must be 1 byte) */ - List *force_quote; /* list of column names */ - bool force_quote_all; /* FORCE_QUOTE *? */ - bool *force_quote_flags; /* per-column CSV FQ flags */ - List *force_notnull; /* list of column names */ - bool force_notnull_all; /* FORCE_NOT_NULL *? */ - bool *force_notnull_flags; /* per-column CSV FNN flags */ - List *force_null; /* list of column names */ - bool force_null_all; /* FORCE_NULL *? */ - bool *force_null_flags; /* per-column CSV FN flags */ - bool convert_selectively; /* do selective binary conversion? 
*/ - CopyOnErrorChoice on_error; /* what to do when error happened */ - CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ - List *convert_select; /* list of column names (can be NIL) */ - Node *routine; /* CopyToRoutine (can be NULL) */ -} CopyFormatOptions; - -/* These are private in commands/copy[from|to].c */ +/* This is private in commands/copyfrom.c */ typedef struct CopyFromStateData *CopyFromState; -typedef struct CopyToStateData *CopyToState; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); -typedef void (*copy_data_dest_cb) (void *data, int len); extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 05b7d92ddba..03779c15f43 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -14,10 +14,79 @@ #ifndef COPYAPI_H #define COPYAPI_H +#include "commands/trigger.h" +#include "executor/execdesc.h" #include "executor/tuptable.h" #include "nodes/execnodes.h" -/* This is private in commands/copyto.c */ +/* + * Represents whether a header line should be present, and whether it must + * match the actual names (which implies "true"). + */ +typedef enum CopyHeaderChoice +{ + COPY_HEADER_FALSE = 0, + COPY_HEADER_TRUE, + COPY_HEADER_MATCH, +} CopyHeaderChoice; + +/* + * Represents where to save input processing errors. More values to be added + * in the future. + */ +typedef enum CopyOnErrorChoice +{ + COPY_ON_ERROR_STOP = 0, /* immediately throw errors, default */ + COPY_ON_ERROR_IGNORE, /* ignore errors */ +} CopyOnErrorChoice; + +/* + * Represents verbosity of logged messages by COPY command. + */ +typedef enum CopyLogVerbosityChoice +{ + COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages, default */ + COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */ +} CopyLogVerbosityChoice; + +/* + * A struct to hold COPY options, in a parsed form. 
All of these are related + * to formatting, except for 'freeze', which doesn't really belong here, but + * it's expedient to parse it along with all the other options. + */ +typedef struct CopyFormatOptions +{ + /* parameters from the COPY command */ + int file_encoding; /* file or remote side's character encoding, + * -1 if not specified */ + bool binary; /* binary format? */ + bool freeze; /* freeze rows on loading? */ + bool csv_mode; /* Comma Separated Value format? */ + CopyHeaderChoice header_line; /* header line? */ + char *null_print; /* NULL marker string (server encoding!) */ + int null_print_len; /* length of same */ + char *null_print_client; /* same converted to file encoding */ + char *default_print; /* DEFAULT marker string */ + int default_print_len; /* length of same */ + char *delim; /* column delimiter (must be 1 byte) */ + char *quote; /* CSV quote char (must be 1 byte) */ + char *escape; /* CSV escape char (must be 1 byte) */ + List *force_quote; /* list of column names */ + bool force_quote_all; /* FORCE_QUOTE *? */ + bool *force_quote_flags; /* per-column CSV FQ flags */ + List *force_notnull; /* list of column names */ + bool force_notnull_all; /* FORCE_NOT_NULL *? */ + bool *force_notnull_flags; /* per-column CSV FNN flags */ + List *force_null; /* list of column names */ + bool force_null_all; /* FORCE_NULL *? */ + bool *force_null_flags; /* per-column CSV FN flags */ + bool convert_selectively; /* do selective binary conversion? 
*/ + CopyOnErrorChoice on_error; /* what to do when error happened */ + CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ + List *convert_select; /* list of column names (can be NIL) */ + Node *routine; /* CopyToRoutine (can be NULL) */ +} CopyFormatOptions; + typedef struct CopyToStateData *CopyToState; /* @@ -57,4 +126,67 @@ typedef struct CopyToRoutine void (*CopyToEnd) (CopyToState cstate); } CopyToRoutine; +/* + * Represents the different dest cases we need to worry about at + * the bottom level + */ +typedef enum CopyDest +{ + COPY_DEST_FILE, /* to file (or a piped program) */ + COPY_DEST_FRONTEND, /* to frontend */ + COPY_DEST_CALLBACK, /* to callback function */ +} CopyDest; + +typedef void (*copy_data_dest_cb) (void *data, int len); + +/* + * This struct contains all the state variables used throughout a COPY TO + * operation. + * + * Multi-byte encodings: all supported client-side encodings encode multi-byte + * characters by having the first byte's high bit set. Subsequent bytes of the + * character can have the high bit not set. When scanning data in such an + * encoding to look for a match to a single-byte (ie ASCII) character, we must + * use the full pg_encoding_mblen() machinery to skip over multibyte + * characters, else we might find a false match to a trailing byte. In + * supported server encodings, there is no possibility of a false match, and + * it's faster to make useless comparisons to trailing bytes than it is to + * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true + * when we have to do it the hard way. 
+ */ +typedef struct CopyToStateData +{ + /* format routine */ + const CopyToRoutine *routine; + + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + FILE *copy_file; /* used if copy_dest == COPY_DEST_FILE */ + StringInfo fe_msgbuf; /* used for all dests during COPY TO */ + + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy to */ + QueryDesc *queryDesc; /* executable query to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDOUT */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_dest_cb data_dest_cb; /* function for writing data */ + + CopyFormatOptions opts; + Node *whereClause; /* WHERE condition (or NULL) */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + FmgrInfo *out_functions; /* lookup info for output functions */ + MemoryContext rowcontext; /* per-row evaluation context */ + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyToStateData; + #endif /* COPYAPI_H */ -- 2.45.2 From 9a2c88f860f29d0eae0d2a486516451d4a288e2f Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sat, 28 Sep 2024 23:59:34 +0900 Subject: [PATCH v21 05/10] Add support for implementing custom COPY TO format as extension * Add CopyToStateData::opaque that can be used to keep data for custom COPY TO format implementation * Export CopySendEndOfRow() to flush data in CopyToStateData::fe_msgbuf as CopyToStateFlush() --- src/backend/commands/copyto.c | 14 ++++++++++++++ src/include/commands/copyapi.h | 5 +++++ 2 files changed, 19 insertions(+) diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 37b150b44ba..c99edae575b 100644 --- 
a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -496,6 +496,20 @@ CopySendEndOfRow(CopyToState cstate) resetStringInfo(fe_msgbuf); } +/* + * CopyToStateFlush + * + * Export CopySendEndOfRow() for extensions. We want to keep + * CopySendEndOfRow() as a static function for + * optimization. CopySendEndOfRow() calls in this file may be optimized by a + * compiler. + */ +void +CopyToStateFlush(CopyToState cstate) +{ + CopySendEndOfRow(cstate); +} + /* * These functions do apply some data conversion */ diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 03779c15f43..30765951e2e 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -187,6 +187,11 @@ typedef struct CopyToStateData FmgrInfo *out_functions; /* lookup info for output functions */ MemoryContext rowcontext; /* per-row evaluation context */ uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ } CopyToStateData; +extern void CopyToStateFlush(CopyToState cstate); + #endif /* COPYAPI_H */ -- 2.45.2 From a75c8076668416cc6112c4166c728cf2552c7e4a Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sun, 29 Sep 2024 00:06:20 +0900 Subject: [PATCH v21 06/10] Add CopyFromRoutine This is for implementing custom COPY FROM formats, but it is not yet enough to implement one. We'll export some APIs to receive data and add a "format" option to COPY FROM later. Existing text/csv/binary format implementations don't use CopyFromRoutine for now. We have a patch for that but defer it, because there are some puzzling profile results even though runtimes are faster. See [1] for details. [1] https://www.postgresql.org/message-id/ZdbtQJ-p5H1_EDwE%40paquier.xyz Note that this doesn't change existing text/csv/binary format implementations. 
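As a rough illustration of the dispatch this commit adds to BeginCopyFrom(), here is a standalone C model. It is not PostgreSQL code: MockCopyFromState, MockCopyFromRoutine, MockPath, mock_begin_copy_from(), mock_copy_all() and the demo_* callbacks are hypothetical names invented for this sketch, and the real CopyFromRoutine callbacks take more arguments than shown here. It only mirrors the order of the checks in the diff: binary first, then a custom routine if one is set, then the built-in text path.

```c
/* Standalone model of the BeginCopyFrom() dispatch; NOT PostgreSQL code. */
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct MockCopyFromState MockCopyFromState;

/* Hypothetical stand-in for CopyFromRoutine, reduced to two callbacks. */
typedef struct MockCopyFromRoutine
{
	void		(*start) (MockCopyFromState *cstate);
	bool		(*one_row) (MockCopyFromState *cstate, int *value);
} MockCopyFromRoutine;

/* Hypothetical stand-in for CopyFromStateData. */
struct MockCopyFromState
{
	const MockCopyFromRoutine *routine; /* NULL for built-in formats */
	bool		binary;			/* models opts.binary */
	int			rows_left;		/* fake input: rows still to read */
	int			started;		/* how many times start() ran */
};

typedef enum MockPath
{
	PATH_BINARY,
	PATH_CUSTOM,
	PATH_TEXT
} MockPath;

/* Same order of checks as the patched BeginCopyFrom(). */
static MockPath
mock_begin_copy_from(MockCopyFromState *cstate)
{
	assert(cstate != NULL);
	if (cstate->binary)
		return PATH_BINARY;
	else if (cstate->routine)
	{
		cstate->routine->start(cstate);
		return PATH_CUSTOM;
	}
	return PATH_TEXT;
}

/* Demo callbacks playing the role of an extension's format handler. */
static void
demo_start(MockCopyFromState *cstate)
{
	cstate->started++;
}

static bool
demo_one_row(MockCopyFromState *cstate, int *value)
{
	if (cstate->rows_left <= 0)
		return false;			/* no more rows, like CopyFromOneRow */
	*value = cstate->rows_left--;
	return true;
}

static const MockCopyFromRoutine demo_routine = {
	.start = demo_start,
	.one_row = demo_one_row,
};

/* Drain all rows through the custom routine and count them. */
static int
mock_copy_all(MockCopyFromState *cstate)
{
	int			count = 0;
	int			value;

	while (cstate->routine->one_row(cstate, &value))
		count++;
	return count;
}
```

In this model, a state whose routine points at demo_routine takes the custom path and runs demo_start() once, while a state with routine left NULL falls through to the text path, which matches the commit message's point that the built-in formats are left as they are.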
--- src/backend/commands/copyfrom.c | 24 ++++++++++-- src/backend/commands/copyfromparse.c | 5 +++ src/include/commands/copyapi.h | 47 +++++++++++++++++++++++- src/include/commands/copyfrom_internal.h | 4 ++ src/tools/pgindent/typedefs.list | 1 + 5 files changed, 76 insertions(+), 5 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 2d3462913e1..c42485bd9cb 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1632,12 +1632,22 @@ BeginCopyFrom(ParseState *pstate, /* Fetch the input function and typioparam info */ if (cstate->opts.binary) + { getTypeBinaryInputInfo(att->atttypid, &in_func_oid, &typioparams[attnum - 1]); + fmgr_info(in_func_oid, &in_functions[attnum - 1]); + } + else if (cstate->routine) + cstate->routine->CopyFromInFunc(cstate, att->atttypid, + &in_functions[attnum - 1], + &typioparams[attnum - 1]); + else + { getTypeInputInfo(att->atttypid, &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); + fmgr_info(in_func_oid, &in_functions[attnum - 1]); + } /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1777,10 +1787,13 @@ BeginCopyFrom(ParseState *pstate, /* Read and verify binary header */ ReceiveCopyBinaryHeader(cstate); } - - /* create workspace for CopyReadAttributes results */ - if (!cstate->opts.binary) + else if (cstate->routine) { + cstate->routine->CopyFromStart(cstate, tupDesc); + } + else + { + /* create workspace for CopyReadAttributes results */ AttrNumber attr_count = list_length(cstate->attnumlist); cstate->max_fields = attr_count; @@ -1798,6 +1811,9 @@ BeginCopyFrom(ParseState *pstate, void EndCopyFrom(CopyFromState cstate) { + if (cstate->routine) + cstate->routine->CopyFromEnd(cstate); + /* No COPY FROM related resources except memory. 
*/ if (cstate->is_program) { diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 97a4c387a30..2e126448019 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -1012,6 +1012,11 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Assert(fieldno == attr_count); } + else if (cstate->routine) + { + if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) + return false; + } else { /* binary */ diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 30765951e2e..7421241de83 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * copyapi.h - * API for COPY TO handlers + * API for COPY TO/FROM handlers * * * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group @@ -87,6 +87,51 @@ typedef struct CopyFormatOptions Node *routine; /* CopyToRoutine (can be NULL) */ } CopyFormatOptions; +/* This is private in commands/copyfrom.c */ +typedef struct CopyFromStateData *CopyFromState; + +/* + * API structure for a COPY FROM format implementation. Note this must be + * allocated in a server-lifetime manner, typically as a static const struct. + */ +typedef struct CopyFromRoutine +{ + /* + * Called when COPY FROM is started to set up the input functions + * associated with the relation's attributes writing to. `finfo` can be + * optionally filled to provide the catalog information of the input + * function. `typioparam` can be optionally filled to define the OID of + * the type to pass to the input function. `atttypid` is the OID of data + * type used by the relation's attribute. + */ + void (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam); + + /* + * Called when COPY FROM is started. 
+ * + * `tupDesc` is the tuple descriptor of the relation where the data needs + * to be copied. This can be used for any initialization steps required + * by a format. + */ + void (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc); + + /* + * Copy one row to a set of `values` and `nulls` of size tupDesc->natts. + * + * 'econtext' is used to evaluate default expression for each column that + * is either not read from the file or is using the DEFAULT option of COPY + * FROM. It is NULL if no default values are used. + * + * Returns false if there are no more tuples to copy. + */ + bool (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + + /* Called when COPY FROM has ended. */ + void (*CopyFromEnd) (CopyFromState cstate); +} CopyFromRoutine; + typedef struct CopyToStateData *CopyToState; /* diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index cad52fcc783..509b9e92a18 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -15,6 +15,7 @@ #define COPYFROM_INTERNAL_H #include "commands/copy.h" +#include "commands/copyapi.h" #include "commands/trigger.h" #include "nodes/miscnodes.h" @@ -58,6 +59,9 @@ typedef enum CopyInsertMethod */ typedef struct CopyFromStateData { + /* format routine */ + const CopyFromRoutine *routine; + /* low-level state data */ CopySource copy_src; /* type of copy source */ FILE *copy_file; /* used if copy_src == COPY_FILE */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8eb537bfa77..4c4bf60d9e5 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -492,6 +492,7 @@ ConvertRowtypeExpr CookedConstraint CopyDest CopyFormatOptions +CopyFromRoutine CopyFromState CopyFromStateData CopyHeaderChoice -- 2.45.2 From 5790fb08795adcc43c72186e662b125e32b5986f Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sun, 29 
Sep 2024 00:09:29 +0900 Subject: [PATCH v21 07/10] Use CopyFromRoutine for the existing formats The existing formats are text, csv and binary. If this causes any performance regression, we will not merge it to master. This will increase indirect function call costs but will reduce the runtime "if (cstate->opts.binary)" and "if (cstate->opts.csv_mode)" branch costs. This uses an optimization based on a static inline function called with a constant argument in place of cstate->opts.csv_mode. For example, CopyFromTextLikeOneRow() uses this optimization. It accepts a "bool is_csv" argument instead of using cstate->opts.csv_mode in it. CopyFromTextOneRow() calls CopyFromTextLikeOneRow() with false (constant) for "bool is_csv". The compiler can then remove the "if (is_csv)" branch in it. This doesn't change existing logic. This just moves existing code. --- src/backend/commands/copyfrom.c | 215 ++++++--- src/backend/commands/copyfromparse.c | 550 +++++++++++++---------- src/include/commands/copy.h | 2 - src/include/commands/copyfrom_internal.h | 8 + 4 files changed, 487 insertions(+), 288 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index c42485bd9cb..14f95f17124 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -106,6 +106,157 @@ typedef struct CopyMultiInsertInfo /* non-export function prototypes */ static void ClosePipeFromProgram(CopyFromState cstate); + +/* + * CopyFromRoutine implementations for text and CSV. + */ + +/* + * CopyFromTextLikeInFunc + * + * Assign input function data for a relation's attribute in text/CSV format. + */ +static void +CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + Oid func_oid; + + getTypeInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* + * CopyFromTextLikeStart + * + * Start of COPY FROM for text/CSV format.
+ */ +static void +CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc) +{ + AttrNumber attr_count; + + /* + * If encoding conversion is needed, we need another buffer to hold the + * converted input data. Otherwise, we can just point input_buf to the + * same buffer as raw_buf. + */ + if (cstate->need_transcoding) + { + cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); + cstate->input_buf_index = cstate->input_buf_len = 0; + } + else + cstate->input_buf = cstate->raw_buf; + cstate->input_reached_eof = false; + + initStringInfo(&cstate->line_buf); + + /* + * Create workspace for CopyReadAttributes results; used by CSV and text + * format. + */ + attr_count = list_length(cstate->attnumlist); + cstate->max_fields = attr_count; + cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); +} + +/* + * CopyFromTextLikeEnd + * + * End of COPY FROM for text/CSV format. + */ +static void +CopyFromTextLikeEnd(CopyFromState cstate) +{ + /* nothing to do */ +} + +/* + * CopyFromRoutine implementation for "binary". + */ + +/* + * CopyFromBinaryInFunc + * + * Assign input function data for a relation's attribute in binary format. + */ +static void +CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + Oid func_oid; + + getTypeBinaryInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* + * CopyFromBinaryStart + * + * Start of COPY FROM for binary format. + */ +static void +CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc) +{ + /* Read and verify binary header */ + ReceiveCopyBinaryHeader(cstate); +} + +/* + * CopyFromBinaryEnd + * + * End of COPY FROM for binary format. + */ +static void +CopyFromBinaryEnd(CopyFromState cstate) +{ + /* nothing to do */ +} + +/* + * Routines assigned to each format. ++ + * CSV and text share the same implementation, at the exception of the + * per-row callback. 
+ */ +static const CopyFromRoutine CopyFromRoutineText = { + .CopyFromInFunc = CopyFromTextLikeInFunc, + .CopyFromStart = CopyFromTextLikeStart, + .CopyFromOneRow = CopyFromTextOneRow, + .CopyFromEnd = CopyFromTextLikeEnd, +}; + +static const CopyFromRoutine CopyFromRoutineCSV = { + .CopyFromInFunc = CopyFromTextLikeInFunc, + .CopyFromStart = CopyFromTextLikeStart, + .CopyFromOneRow = CopyFromCSVOneRow, + .CopyFromEnd = CopyFromTextLikeEnd, +}; + +static const CopyFromRoutine CopyFromRoutineBinary = { + .CopyFromInFunc = CopyFromBinaryInFunc, + .CopyFromStart = CopyFromBinaryStart, + .CopyFromOneRow = CopyFromBinaryOneRow, + .CopyFromEnd = CopyFromBinaryEnd, +}; + +/* + * Define the COPY FROM routines to use for a format. + */ +static const CopyFromRoutine * +CopyFromGetRoutine(CopyFormatOptions opts) +{ + if (opts.csv_mode) + return &CopyFromRoutineCSV; + else if (opts.binary) + return &CopyFromRoutineBinary; + + /* default is text */ + return &CopyFromRoutineText; +} + + /* * error context callback for COPY FROM * @@ -1393,7 +1544,6 @@ BeginCopyFrom(ParseState *pstate, num_defaults; FmgrInfo *in_functions; Oid *typioparams; - Oid in_func_oid; int *defmap; ExprState **defexprs; MemoryContext oldcontext; @@ -1425,6 +1575,9 @@ BeginCopyFrom(ParseState *pstate, /* Extract options from the statement node tree */ ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); + /* Set format routine */ + cstate->routine = CopyFromGetRoutine(cstate->opts); + /* Process the target relation */ cstate->rel = rel; @@ -1580,25 +1733,6 @@ BeginCopyFrom(ParseState *pstate, cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->raw_reached_eof = false; - if (!cstate->opts.binary) - { - /* - * If encoding conversion is needed, we need another buffer to hold - * the converted input data. Otherwise, we can just point input_buf - * to the same buffer as raw_buf. 
- */ - if (cstate->need_transcoding) - { - cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); - cstate->input_buf_index = cstate->input_buf_len = 0; - } - else - cstate->input_buf = cstate->raw_buf; - cstate->input_reached_eof = false; - - initStringInfo(&cstate->line_buf); - } - initStringInfo(&cstate->attribute_buf); /* Assign range table and rteperminfos, we'll need them in CopyFrom. */ @@ -1631,23 +1765,9 @@ BeginCopyFrom(ParseState *pstate, continue; /* Fetch the input function and typioparam info */ - if (cstate->opts.binary) - { - getTypeBinaryInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - } - else if (cstate->routine) - cstate->routine->CopyFromInFunc(cstate, att->atttypid, - &in_functions[attnum - 1], - &typioparams[attnum - 1]); - - else - { - getTypeInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - } + cstate->routine->CopyFromInFunc(cstate, att->atttypid, + &in_functions[attnum - 1], + &typioparams[attnum - 1]); /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1782,23 +1902,7 @@ BeginCopyFrom(ParseState *pstate, pgstat_progress_update_multi_param(3, progress_cols, progress_vals); - if (cstate->opts.binary) - { - /* Read and verify binary header */ - ReceiveCopyBinaryHeader(cstate); - } - else if (cstate->routine) - { - cstate->routine->CopyFromStart(cstate, tupDesc); - } - else - { - /* create workspace for CopyReadAttributes results */ - AttrNumber attr_count = list_length(cstate->attnumlist); - - cstate->max_fields = attr_count; - cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); - } + cstate->routine->CopyFromStart(cstate, tupDesc); MemoryContextSwitchTo(oldcontext); @@ -1811,8 +1915,7 @@ BeginCopyFrom(ParseState *pstate, void EndCopyFrom(CopyFromState cstate) { - if (cstate->routine) - cstate->routine->CopyFromEnd(cstate); + 
cstate->routine->CopyFromEnd(cstate); /* No COPY FROM related resources except memory. */ if (cstate->is_program) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 2e126448019..5f63b683d17 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -149,8 +149,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ -static bool CopyReadLine(CopyFromState cstate); -static bool CopyReadLineText(CopyFromState cstate); +static bool CopyReadLine(CopyFromState cstate, bool is_csv); +static inline bool CopyReadLineText(CopyFromState cstate, bool is_csv); static int CopyReadAttributesText(CopyFromState cstate); static int CopyReadAttributesCSV(CopyFromState cstate); static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, @@ -750,8 +750,8 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) * * NOTE: force_not_null option are not applied to the returned fields. */ -bool -NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) +static inline bool +NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv) { int fldct; bool done; @@ -768,13 +768,17 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) tupDesc = RelationGetDescr(cstate->rel); cstate->cur_lineno++; - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); if (cstate->opts.header_line == COPY_HEADER_MATCH) { int fldnum; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is + * constant at caller. 
+ */ + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -818,7 +822,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) cstate->cur_lineno++; /* Actually read the line into memory here */ - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); /* * EOF at start of line means we're done. If we see EOF after some @@ -828,8 +832,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) if (done && cstate->line_buf.len == 0) return false; - /* Parse the line into de-escaped field values */ - if (cstate->opts.csv_mode) + /* + * Parse the line into de-escaped field values + * + * is_csv will be optimized away by compiler, as argument is constant at + * caller. + */ + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -839,6 +848,267 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) return true; } +/* + * CopyFromTextLikeOneRow + * + * Copy one row to a set of `values` and `nulls` for the text and CSV + * formats. + * + * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow(). 
+ */ +static inline bool +CopyFromTextLikeOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls, + bool is_csv) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + ExprState **defexprs = cstate->defexprs; + char **field_strings; + ListCell *cur; + int fldct; + int fieldno; + char *string; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv)) + return false; + + /* check for overflowing fields */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + + fieldno = 0; + + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + if (fieldno >= fldct) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; + + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) + { + /* ignore input field, leaving column as NULL */ + continue; + } + + if (is_csv) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) + { + /* + * FORCE_NOT_NULL option is set and column is NULL - convert + * it to the NULL string. + */ + string = cstate->opts.null_print; + } + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) + { + /* + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the string + * would already have been set to NULL. Convert it to NULL as + * specified. 
+ */ + string = NULL; + } + } + + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; + + if (string != NULL) + nulls[m] = false; + + if (cstate->defaults[m]) + { + /* + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. + */ + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); + + values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); + } + + /* + * If ON_ERROR is specified with IGNORE, skip rows with soft errors + */ + else if (!InputFunctionCallSafe(&in_functions[m], + string, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); + + cstate->num_errors++; + + if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) + { + /* + * Since we emit line number and column info in the below + * notice message, we suppress error context information other + * than the relation name. + */ + Assert(!cstate->relname_only); + cstate->relname_only = true; + + if (cstate->cur_attval) + { + char *attval; + + attval = CopyLimitPrintoutLength(cstate->cur_attval); + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname, + attval)); + pfree(attval); + } + else + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname)); + + /* reset relname_only */ + cstate->relname_only = false; + } + + return true; + } + + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } + + Assert(fieldno == attr_count); + + return true; +} + + +/* + * CopyFromTextOneRow + * + * Per-row callback for COPY FROM with text format. 
+ */ +bool +CopyFromTextOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false); +} + +/* + * CopyFromCSVOneRow + * + * Per-row callback for COPY FROM with CSV format. + */ +bool +CopyFromCSVOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true); +} + +/* + * CopyFromBinaryOneRow + * + * Copy one row to a set of `values` and `nulls` for the binary format. + */ +bool +CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + int16 fld_count; + ListCell *cur; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + cstate->cur_lineno++; + + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } + + if (fld_count == -1) + { + /* + * Received EOF marker. Wait for the protocol-level EOF, and complain + * if it doesn't come immediately. In COPY FROM STDIN, this ensures + * that we correctly handle CopyFail, if client chooses to send that + * now. When copying from file, we could ignore the rest of the file + * like in text mode, but we choose to be consistent with the COPY + * FROM STDIN case. 
+ */ + char dummy; + + if (CopyReadBinaryData(cstate, &dummy, 1) > 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("received copy data after EOF marker"))); + return false; + } + + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + + return true; +} + /* * Read next tuple from file for COPY FROM. Return false if no more tuples. * @@ -856,221 +1126,21 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, { TupleDesc tupDesc; AttrNumber num_phys_attrs, - attr_count, num_defaults = cstate->num_defaults; - FmgrInfo *in_functions = cstate->in_functions; - Oid *typioparams = cstate->typioparams; int i; int *defmap = cstate->defmap; ExprState **defexprs = cstate->defexprs; tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); - if (!cstate->opts.binary) - { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; - - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; - - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - - fieldno = 0; - - /* Loop to read the user attributes on 
the line. */ - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; - - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) - { - /* ignore input field, leaving column as NULL */ - continue; - } - - if (cstate->opts.csv_mode) - { - if (string == NULL && - cstate->opts.force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * convert it to the NULL string. - */ - string = cstate->opts.null_print; - } - else if (string != NULL && cstate->opts.force_null_flags[m] - && strcmp(string, cstate->opts.null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } - } - - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; - - if (string != NULL) - nulls[m] = false; - - if (cstate->defaults[m]) - { - /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. 
- */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - - values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); - } - - /* - * If ON_ERROR is specified with IGNORE, skip rows with soft - * errors - */ - else if (!InputFunctionCallSafe(&in_functions[m], - string, - typioparams[m], - att->atttypmod, - (Node *) cstate->escontext, - &values[m])) - { - Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); - - cstate->num_errors++; - - if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) - { - /* - * Since we emit line number and column info in the below - * notice message, we suppress error context information - * other than the relation name. - */ - Assert(!cstate->relname_only); - cstate->relname_only = true; - - if (cstate->cur_attval) - { - char *attval; - - attval = CopyLimitPrintoutLength(cstate->cur_attval); - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname, - attval)); - pfree(attval); - } - else - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": nullinput", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname)); - - /* reset relname_only */ - cstate->relname_only = false; - } - - return true; - } - - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - } - - Assert(fieldno == attr_count); - } - else if (cstate->routine) - { - if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) - return false; - } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; - - cstate->cur_lineno++; - - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } - - if (fld_count == -1) - { - /* - * Received EOF marker. Wait for the protocol-level EOF, and - * complain if it doesn't come immediately. 
In COPY FROM STDIN, - * this ensures that we correctly handle CopyFail, if client - * chooses to send that now. When copying from file, we could - * ignore the rest of the file like in text mode, but we choose to - * be consistent with the COPY FROM STDIN case. - */ - char dummy; - - if (CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } - - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } - } + if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) + return false; /* * Now compute and insert any defaults available for the columns not @@ -1101,7 +1171,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, * in the final value of line_buf. 
*/ static bool -CopyReadLine(CopyFromState cstate) +CopyReadLine(CopyFromState cstate, bool is_csv) { bool result; @@ -1109,7 +1179,7 @@ CopyReadLine(CopyFromState cstate) cstate->line_buf_valid = false; /* Parse data and transfer into line_buf */ - result = CopyReadLineText(cstate); + result = CopyReadLineText(cstate, is_csv); if (result) { @@ -1176,8 +1246,8 @@ CopyReadLine(CopyFromState cstate) /* * CopyReadLineText - inner loop of CopyReadLine for text mode */ -static bool -CopyReadLineText(CopyFromState cstate) +static inline bool +CopyReadLineText(CopyFromState cstate, bool is_csv) { char *copy_input_buf; int input_buf_ptr; @@ -1193,7 +1263,11 @@ CopyReadLineText(CopyFromState cstate) char quotec = '\0'; char escapec = '\0'; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is constant at + * caller. + */ + if (is_csv) { quotec = cstate->opts.quote[0]; escapec = cstate->opts.escape[0]; @@ -1270,7 +1344,11 @@ CopyReadLineText(CopyFromState cstate) prev_raw_ptr = input_buf_ptr; c = copy_input_buf[input_buf_ptr++]; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is constant + * at caller. + */ + if (is_csv) { /* * If character is '\\' or '\r', we may need to look ahead below. @@ -1309,7 +1387,7 @@ CopyReadLineText(CopyFromState cstate) } /* Process \r */ - if (c == '\r' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\r' && (!is_csv || !in_quote)) { /* Check for \r\n on first line, _and_ handle \r\n. */ if (cstate->eol_type == EOL_UNKNOWN || @@ -1337,10 +1415,10 @@ CopyReadLineText(CopyFromState cstate) if (cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? 
errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); @@ -1354,10 +1432,10 @@ CopyReadLineText(CopyFromState cstate) else if (cstate->eol_type == EOL_NL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); /* If reach here, we have found the line terminator */ @@ -1365,15 +1443,15 @@ CopyReadLineText(CopyFromState cstate) } /* Process \n */ - if (c == '\n' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\n' && (!is_csv || !in_quote)) { if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal newline found in data") : errmsg("unquoted newline found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\n\" to represent newline.") : errhint("Use quoted CSV field to represent newline."))); cstate->eol_type = EOL_NL; /* in case not set yet */ @@ -1385,7 +1463,7 @@ CopyReadLineText(CopyFromState cstate) * In CSV mode, we only recognize \. alone on a line. This is because * \. is a valid CSV data value. */ - if (c == '\\' && (!cstate->opts.csv_mode || first_char_in_line)) + if (c == '\\' && (!is_csv || first_char_in_line)) { char c2; @@ -1418,7 +1496,11 @@ CopyReadLineText(CopyFromState cstate) if (c2 == '\n') { - if (!cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as + * argument is constant at caller. 
+ */ + if (!is_csv) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("end-of-copy marker does not match previous newline style"))); @@ -1427,7 +1509,11 @@ CopyReadLineText(CopyFromState cstate) } else if (c2 != '\r') { - if (!cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as + * argument is constant at caller. + */ + if (!is_csv) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("end-of-copy marker corrupt"))); @@ -1443,7 +1529,11 @@ CopyReadLineText(CopyFromState cstate) if (c2 != '\r' && c2 != '\n') { - if (!cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument + * is constant at caller. + */ + if (!is_csv) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("end-of-copy marker corrupt"))); @@ -1472,7 +1562,7 @@ CopyReadLineText(CopyFromState cstate) result = true; /* report EOF */ break; } - else if (!cstate->opts.csv_mode) + else if (!is_csv) { /* * If we are here, it means we found a backslash followed by diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index dd645eaa030..e5696839637 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -35,8 +35,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where extern void EndCopyFrom(CopyFromState cstate); extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); -extern bool NextCopyFromRawFields(CopyFromState cstate, - char ***fields, int *nfields); extern void CopyFromErrorCallback(void *arg); extern char *CopyLimitPrintoutLength(const char *str); diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 509b9e92a18..c11b5ff3cc0 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -187,4 +187,12 @@ typedef struct CopyFromStateData extern void ReceiveCopyBegin(CopyFromState cstate); extern void 
ReceiveCopyBinaryHeader(CopyFromState cstate); +/* Callbacks for CopyFromRoutine->CopyFromOneRow */ +extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + #endif /* COPYFROM_INTERNAL_H */ -- 2.45.2 From be33c9cf092ae2593391260d1ce99848cee1eef4 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sun, 29 Sep 2024 00:17:53 +0900 Subject: [PATCH v21 08/10] Add support for adding custom COPY FROM format This uses the same handler for COPY TO and COPY FROM but uses different routines. This uses CopyToRoutine for COPY TO and CopyFromRoutine for COPY FROM. PostgreSQL calls a COPY TO/FROM handler with an "is_from" argument. It's true for COPY FROM and false for COPY TO: copy_handler(true) returns CopyFromRoutine copy_handler(false) returns CopyToRoutine This also adds a test module for a custom COPY FROM handler. --- src/backend/commands/copy.c | 52 ++++++++++++------- src/backend/commands/copyfrom.c | 4 +- src/include/catalog/pg_type.dat | 2 +- src/include/commands/copyapi.h | 5 +- .../expected/test_copy_format.out | 10 ++-- .../test_copy_format/sql/test_copy_format.sql | 1 + .../test_copy_format/test_copy_format.c | 39 +++++++++++++- 7 files changed, 87 insertions(+), 26 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 3aea654ab8a..d7d409379d1 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -450,8 +450,8 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate) * This function checks whether the option value is a built-in format such as * "text" and "csv" or not. If the option value isn't a built-in format, this * function finds a COPY format handler that returns a CopyToRoutine (for - * is_from == false).
If no COPY format handler is found, this function - * reports an error. + * is_from == false) or CopyFromRoutine (for is_from == true). If no COPY + * format handler is found, this function reports an error. */ static void ProcessCopyOptionFormat(ParseState *pstate, @@ -482,12 +482,9 @@ ProcessCopyOptionFormat(ParseState *pstate, } /* custom format */ - if (!is_from) - { - funcargtypes[0] = INTERNALOID; - handlerOid = LookupFuncName(list_make1(makeString(format)), 1, - funcargtypes, true); - } + funcargtypes[0] = INTERNALOID; + handlerOid = LookupFuncName(list_make1(makeString(format)), 1, + funcargtypes, true); if (!OidIsValid(handlerOid)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -496,17 +493,34 @@ ProcessCopyOptionFormat(ParseState *pstate, datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from)); routine = (Node *) DatumGetPointer(datum); - if (routine == NULL || !IsA(routine, CopyToRoutine)) - ereport( - ERROR, - (errcode( - ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY handler function " - "%s(%u) did not return a " - "CopyToRoutine struct", - format, handlerOid), - parser_errposition( - pstate, defel->location))); + if (is_from) + { + if (routine == NULL || !IsA(routine, CopyFromRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyFromRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + } + else + { + if (routine == NULL || !IsA(routine, CopyToRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyToRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + } opts_out->routine = routine; } diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 14f95f17124..7ecd9a1ad2c 100644 --- a/src/backend/commands/copyfrom.c +++ 
b/src/backend/commands/copyfrom.c @@ -247,7 +247,9 @@ static const CopyFromRoutine CopyFromRoutineBinary = { static const CopyFromRoutine * CopyFromGetRoutine(CopyFormatOptions opts) { - if (opts.csv_mode) + if (opts.routine) + return (const CopyFromRoutine *) opts.routine; + else if (opts.csv_mode) return &CopyFromRoutineCSV; else if (opts.binary) return &CopyFromRoutineBinary; diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index 793dd671935..37ebfa0908f 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -634,7 +634,7 @@ typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-', typalign => 'i' }, { oid => '8752', - descr => 'pseudo-type for the result of a copy to method function', + descr => 'pseudo-type for the result of a copy to/from method function', typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p', typcategory => 'P', typinput => 'copy_handler_in', typoutput => 'copy_handler_out', typreceive => '-', typsend => '-', diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 7421241de83..744d432f387 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -84,7 +84,8 @@ typedef struct CopyFormatOptions CopyOnErrorChoice on_error; /* what to do when error happened */ CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ List *convert_select; /* list of column names (can be NIL) */ - Node *routine; /* CopyToRoutine (can be NULL) */ + Node *routine; /* CopyToRoutine or CopyFromRoutine (can be + * NULL) */ } CopyFormatOptions; /* This is private in commands/copyfrom.c */ @@ -96,6 +97,8 @@ typedef struct CopyFromStateData *CopyFromState; */ typedef struct CopyFromRoutine { + NodeTag type; + /* * Called when COPY FROM is started to set up the input functions * associated with the relation's attributes writing to. 
`finfo` can be diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out index 606c78f6878..4ed7c0b12db 100644 --- a/src/test/modules/test_copy_format/expected/test_copy_format.out +++ b/src/test/modules/test_copy_format/expected/test_copy_format.out @@ -2,9 +2,13 @@ CREATE EXTENSION test_copy_format; CREATE TABLE public.test (a smallint, b integer, c bigint); INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); COPY public.test FROM stdin WITH (format 'test_copy_format'); -ERROR: COPY format "test_copy_format" not recognized -LINE 1: COPY public.test FROM stdin WITH (format 'test_copy_format')... - ^ +NOTICE: test_copy_format: is_from=true +NOTICE: CopyFromInFunc: atttypid=21 +NOTICE: CopyFromInFunc: atttypid=23 +NOTICE: CopyFromInFunc: atttypid=20 +NOTICE: CopyFromStart: natts=3 +NOTICE: CopyFromOneRow +NOTICE: CopyFromEnd COPY public.test TO stdout WITH (format 'test_copy_format'); NOTICE: test_copy_format: is_from=false NOTICE: CopyToOutFunc: atttypid=21 diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql index 9406b3be3d4..e805f7cb011 100644 --- a/src/test/modules/test_copy_format/sql/test_copy_format.sql +++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql @@ -2,4 +2,5 @@ CREATE EXTENSION test_copy_format; CREATE TABLE public.test (a smallint, b integer, c bigint); INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); COPY public.test FROM stdin WITH (format 'test_copy_format'); +\. 
COPY public.test TO stdout WITH (format 'test_copy_format'); diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c index e064f40473b..f6b105659ab 100644 --- a/src/test/modules/test_copy_format/test_copy_format.c +++ b/src/test/modules/test_copy_format/test_copy_format.c @@ -18,6 +18,40 @@ PG_MODULE_MAGIC; +static void +CopyFromInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + ereport(NOTICE, (errmsg("CopyFromInFunc: atttypid=%d", atttypid))); +} + +static void +CopyFromStart(CopyFromState cstate, TupleDesc tupDesc) +{ + ereport(NOTICE, (errmsg("CopyFromStart: natts=%d", tupDesc->natts))); +} + +static bool +CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls) +{ + ereport(NOTICE, (errmsg("CopyFromOneRow"))); + return false; +} + +static void +CopyFromEnd(CopyFromState cstate) +{ + ereport(NOTICE, (errmsg("CopyFromEnd"))); +} + +static const CopyFromRoutine CopyFromRoutineTestCopyFormat = { + .type = T_CopyFromRoutine, + .CopyFromInFunc = CopyFromInFunc, + .CopyFromStart = CopyFromStart, + .CopyFromOneRow = CopyFromOneRow, + .CopyFromEnd = CopyFromEnd, +}; + static void CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) { @@ -59,5 +93,8 @@ test_copy_format(PG_FUNCTION_ARGS) ereport(NOTICE, (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false"))); - PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); + if (is_from) + PG_RETURN_POINTER(&CopyFromRoutineTestCopyFormat); + else + PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); } -- 2.45.2 From 0c4550533ccd44febf576c144cfd5425002a9a41 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sun, 29 Sep 2024 00:28:02 +0900 Subject: [PATCH v21 09/10] Export CopyFromStateData It's for custom COPY FROM format handlers implemented as extensions. This just moves code. It doesn't change any code except the CopySource enum values. 
This changes the COPY_ prefix of the CopySource enum values to COPY_SOURCE_, like the CopyDest enum values prefix change. For example, COPY_FILE in CopySource is renamed to COPY_SOURCE_FILE. Note that this isn't enough to implement custom COPY FROM format handlers as extensions. We'll do the following in a subsequent commit: 1. Add an opaque space for custom COPY FROM format handler 2. Export CopyReadBinaryData() to read the next data --- src/backend/commands/copyfrom.c | 4 +- src/backend/commands/copyfromparse.c | 10 +- src/include/commands/copy.h | 5 - src/include/commands/copyapi.h | 168 ++++++++++++++++++++++- src/include/commands/copyfrom_internal.h | 165 ---------------------- 5 files changed, 174 insertions(+), 178 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 7ecd9a1ad2c..dd4342f8b9c 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1713,7 +1713,7 @@ BeginCopyFrom(ParseState *pstate, pg_encoding_to_char(GetDatabaseEncoding())))); } - cstate->copy_src = COPY_FILE; /* default */ + cstate->copy_src = COPY_SOURCE_FILE; /* default */ cstate->whereClause = whereClause; @@ -1841,7 +1841,7 @@ BeginCopyFrom(ParseState *pstate, if (data_source_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_src = COPY_CALLBACK; + cstate->copy_src = COPY_SOURCE_CALLBACK; cstate->data_source_cb = data_source_cb; } else if (pipe) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 5f63b683d17..ec86a17b3b3 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -180,7 +180,7 @@ ReceiveCopyBegin(CopyFromState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_src = COPY_FRONTEND; + cstate->copy_src = COPY_SOURCE_FRONTEND; cstate->fe_msgbuf = makeStringInfo(); /* We *must* flush here to ensure FE knows it can send. 
*/ pq_flush(); @@ -248,7 +248,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) switch (cstate->copy_src) { - case COPY_FILE: + case COPY_SOURCE_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, @@ -257,7 +257,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) if (bytesread == 0) cstate->raw_reached_eof = true; break; - case COPY_FRONTEND: + case COPY_SOURCE_FRONTEND: while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof) { int avail; @@ -340,7 +340,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; - case COPY_CALLBACK: + case COPY_SOURCE_CALLBACK: bytesread = cstate->data_source_cb(databuf, minread, maxread); break; } @@ -1188,7 +1188,7 @@ CopyReadLine(CopyFromState cstate, bool is_csv) * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ - if (cstate->copy_src == COPY_FRONTEND) + if (cstate->copy_src == COPY_SOURCE_FRONTEND) { int inbytes; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index e5696839637..e2411848e9f 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -19,11 +19,6 @@ #include "parser/parse_node.h" #include "tcop/dest.h" -/* This is private in commands/copyfrom.c */ -typedef struct CopyFromStateData *CopyFromState; - -typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); - extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed); diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 744d432f387..c118558ee71 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -88,7 +88,6 @@ typedef struct CopyFormatOptions * NULL) */ } CopyFormatOptions; -/* This is private in commands/copyfrom.c */ typedef struct CopyFromStateData 
*CopyFromState; /* @@ -135,6 +134,173 @@ typedef struct CopyFromRoutine void (*CopyFromEnd) (CopyFromState cstate); } CopyFromRoutine; +/* + * Represents the different source cases we need to worry about at + * the bottom level + */ +typedef enum CopySource +{ + COPY_SOURCE_FILE, /* from file (or a piped program) */ + COPY_SOURCE_FRONTEND, /* from frontend */ + COPY_SOURCE_CALLBACK, /* from callback function */ +} CopySource; + +/* + * Represents the end-of-line terminator type of the input + */ +typedef enum EolType +{ + EOL_UNKNOWN, + EOL_NL, + EOL_CR, + EOL_CRNL, +} EolType; + +/* + * Represents the insert method to be used during COPY FROM. + */ +typedef enum CopyInsertMethod +{ + CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */ + CIM_MULTI, /* always use table_multi_insert or + * ExecForeignBatchInsert */ + CIM_MULTI_CONDITIONAL, /* use table_multi_insert or + * ExecForeignBatchInsert only if valid */ +} CopyInsertMethod; + +typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); + +/* + * This struct contains all the state variables used throughout a COPY FROM + * operation. + */ +typedef struct CopyFromStateData +{ + /* format routine */ + const CopyFromRoutine *routine; + + /* low-level state data */ + CopySource copy_src; /* type of copy source */ + FILE *copy_file; /* used if copy_src == COPY_SOURCE_FILE */ + StringInfo fe_msgbuf; /* used if copy_src == COPY_SOURCE_FRONTEND */ + + EolType eol_type; /* EOL type of input */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + Oid conversion_proc; /* encoding conversion function */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDIN */ + bool is_program; /* is 'filename' a program to popen? 
*/ + copy_data_source_cb data_source_cb; /* function for reading data */ + + CopyFormatOptions opts; + bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + Node *whereClause; /* WHERE condition (or NULL) */ + + /* these are just for error messages, see CopyFromErrorCallback */ + const char *cur_relname; /* table name for error messages */ + uint64 cur_lineno; /* line number for error messages */ + const char *cur_attname; /* current att for error messages */ + const char *cur_attval; /* current att value for error messages */ + bool relname_only; /* don't output line number, att, etc. */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + AttrNumber num_defaults; /* count of att that are missing and have + * default value */ + FmgrInfo *in_functions; /* array of input functions for each attrs */ + Oid *typioparams; /* array of element types for in_functions */ + ErrorSaveContext *escontext; /* soft error trapper during in_functions + * execution */ + uint64 num_errors; /* total number of rows which contained soft + * errors */ + int *defmap; /* array of default att numbers related to + * missing att */ + ExprState **defexprs; /* array of default att expressions for all + * att */ + bool *defaults; /* if DEFAULT marker was found for + * corresponding att */ + bool volatile_defexprs; /* is any of defexprs volatile? */ + List *range_table; /* single element list of RangeTblEntry */ + List *rteperminfos; /* single element list of RTEPermissionInfo */ + ExprState *qualexpr; + + TransitionCaptureState *transition_capture; + + /* + * These variables are used to reduce overhead in COPY FROM. + * + * attribute_buf holds the separated, de-escaped text for each field of + * the current line. The CopyReadAttributes functions return arrays of + * pointers into this buffer. We avoid palloc/pfree overhead by re-using + * the buffer on each cycle. 
+ * + * In binary COPY FROM, attribute_buf holds the binary data for the + * current field, but the usage is otherwise similar. + */ + StringInfoData attribute_buf; + + /* field raw data pointers found by COPY FROM */ + + int max_fields; + char **raw_fields; + + /* + * Similarly, line_buf holds the whole input line being processed. The + * input cycle is first to read the whole line into line_buf, and then + * extract the individual attribute fields into attribute_buf. line_buf + * is preserved unmodified so that we can display it in error messages if + * appropriate. (In binary mode, line_buf is not used.) + */ + StringInfoData line_buf; + bool line_buf_valid; /* contains the row being processed? */ + + /* + * input_buf holds input data, already converted to database encoding. + * + * In text mode, CopyReadLine parses this data sufficiently to locate line + * boundaries, then transfers the data to line_buf. We guarantee that + * there is a \0 at input_buf[input_buf_len] at all times. (In binary + * mode, input_buf is not used.) + * + * If encoding conversion is not required, input_buf is not a separate + * buffer but points directly to raw_buf. In that case, input_buf_len + * tracks the number of bytes that have been verified as valid in the + * database encoding, and raw_buf_len is the total number of bytes stored + * in the buffer. + */ +#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ + char *input_buf; + int input_buf_index; /* next byte to process */ + int input_buf_len; /* total # of bytes stored */ + bool input_reached_eof; /* true if we reached EOF */ + bool input_reached_error; /* true if a conversion error happened */ + /* Shorthand for number of unconsumed bytes available in input_buf */ +#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) + + /* + * raw_buf holds raw input data read from the data source (file or client + * connection), not yet converted to the database encoding. 
Like with + * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. + */ +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + char *raw_buf; + int raw_buf_index; /* next byte to process */ + int raw_buf_len; /* total # of bytes stored */ + bool raw_reached_eof; /* true if we reached EOF */ + + /* Shorthand for number of unconsumed bytes available in raw_buf */ +#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) + + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyFromStateData; + typedef struct CopyToStateData *CopyToState; /* diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index c11b5ff3cc0..3863d26d5b7 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -19,171 +19,6 @@ #include "commands/trigger.h" #include "nodes/miscnodes.h" -/* - * Represents the different source cases we need to worry about at - * the bottom level - */ -typedef enum CopySource -{ - COPY_FILE, /* from file (or a piped program) */ - COPY_FRONTEND, /* from frontend */ - COPY_CALLBACK, /* from callback function */ -} CopySource; - -/* - * Represents the end-of-line terminator type of the input - */ -typedef enum EolType -{ - EOL_UNKNOWN, - EOL_NL, - EOL_CR, - EOL_CRNL, -} EolType; - -/* - * Represents the insert method to be used during COPY FROM. - */ -typedef enum CopyInsertMethod -{ - CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */ - CIM_MULTI, /* always use table_multi_insert or - * ExecForeignBatchInsert */ - CIM_MULTI_CONDITIONAL, /* use table_multi_insert or - * ExecForeignBatchInsert only if valid */ -} CopyInsertMethod; - -/* - * This struct contains all the state variables used throughout a COPY FROM - * operation. 
- */ -typedef struct CopyFromStateData -{ - /* format routine */ - const CopyFromRoutine *routine; - - /* low-level state data */ - CopySource copy_src; /* type of copy source */ - FILE *copy_file; /* used if copy_src == COPY_FILE */ - StringInfo fe_msgbuf; /* used if copy_src == COPY_FRONTEND */ - - EolType eol_type; /* EOL type of input */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - Oid conversion_proc; /* encoding conversion function */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_source_cb data_source_cb; /* function for reading data */ - - CopyFormatOptions opts; - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ - Node *whereClause; /* WHERE condition (or NULL) */ - - /* these are just for error messages, see CopyFromErrorCallback */ - const char *cur_relname; /* table name for error messages */ - uint64 cur_lineno; /* line number for error messages */ - const char *cur_attname; /* current att for error messages */ - const char *cur_attval; /* current att value for error messages */ - bool relname_only; /* don't output line number, att, etc. 
*/ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - AttrNumber num_defaults; /* count of att that are missing and have - * default value */ - FmgrInfo *in_functions; /* array of input functions for each attrs */ - Oid *typioparams; /* array of element types for in_functions */ - ErrorSaveContext *escontext; /* soft error trapper during in_functions - * execution */ - uint64 num_errors; /* total number of rows which contained soft - * errors */ - int *defmap; /* array of default att numbers related to - * missing att */ - ExprState **defexprs; /* array of default att expressions for all - * att */ - bool *defaults; /* if DEFAULT marker was found for - * corresponding att */ - bool volatile_defexprs; /* is any of defexprs volatile? */ - List *range_table; /* single element list of RangeTblEntry */ - List *rteperminfos; /* single element list of RTEPermissionInfo */ - ExprState *qualexpr; - - TransitionCaptureState *transition_capture; - - /* - * These variables are used to reduce overhead in COPY FROM. - * - * attribute_buf holds the separated, de-escaped text for each field of - * the current line. The CopyReadAttributes functions return arrays of - * pointers into this buffer. We avoid palloc/pfree overhead by re-using - * the buffer on each cycle. - * - * In binary COPY FROM, attribute_buf holds the binary data for the - * current field, but the usage is otherwise similar. - */ - StringInfoData attribute_buf; - - /* field raw data pointers found by COPY FROM */ - - int max_fields; - char **raw_fields; - - /* - * Similarly, line_buf holds the whole input line being processed. The - * input cycle is first to read the whole line into line_buf, and then - * extract the individual attribute fields into attribute_buf. line_buf - * is preserved unmodified so that we can display it in error messages if - * appropriate. (In binary mode, line_buf is not used.) 
- */ - StringInfoData line_buf; - bool line_buf_valid; /* contains the row being processed? */ - - /* - * input_buf holds input data, already converted to database encoding. - * - * In text mode, CopyReadLine parses this data sufficiently to locate line - * boundaries, then transfers the data to line_buf. We guarantee that - * there is a \0 at input_buf[input_buf_len] at all times. (In binary - * mode, input_buf is not used.) - * - * If encoding conversion is not required, input_buf is not a separate - * buffer but points directly to raw_buf. In that case, input_buf_len - * tracks the number of bytes that have been verified as valid in the - * database encoding, and raw_buf_len is the total number of bytes stored - * in the buffer. - */ -#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ - char *input_buf; - int input_buf_index; /* next byte to process */ - int input_buf_len; /* total # of bytes stored */ - bool input_reached_eof; /* true if we reached EOF */ - bool input_reached_error; /* true if a conversion error happened */ - /* Shorthand for number of unconsumed bytes available in input_buf */ -#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) - - /* - * raw_buf holds raw input data read from the data source (file or client - * connection), not yet converted to the database encoding. Like with - * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. 
- */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ - char *raw_buf; - int raw_buf_index; /* next byte to process */ - int raw_buf_len; /* total # of bytes stored */ - bool raw_reached_eof; /* true if we reached EOF */ - - /* Shorthand for number of unconsumed bytes available in raw_buf */ -#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) - - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyFromStateData; - extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); -- 2.45.2 From 0c31ea16da2fe11004443e3fc1646d4f0dca0247 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sun, 29 Sep 2024 00:32:31 +0900 Subject: [PATCH v21 10/10] Add support for implementing custom COPY FROM format as extension * Add CopyFromStateData::opaque that can be used to keep data for custom COPY From format implementation * Export CopyReadBinaryData() to read the next data as CopyFromStateRead() --- src/backend/commands/copyfromparse.c | 14 ++++++++++++++ src/include/commands/copyapi.h | 6 ++++++ 2 files changed, 20 insertions(+) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index ec86a17b3b3..64772877b0f 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -739,6 +739,20 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) return copied_bytes; } +/* + * CopyFromStateRead + * + * Export CopyReadBinaryData() for extensions. We want to keep + * CopyReadBinaryData() as a static function for + * optimization. CopyReadBinaryData() calls in this file may be optimized by + * a compiler. + */ +int +CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes) +{ + return CopyReadBinaryData(cstate, dest, nbytes); +} + /* * Read raw fields in the next line for COPY FROM in text or csv mode. * Return false if no more lines. 
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index c118558ee71..c1e9fe366f3 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -299,8 +299,14 @@ typedef struct CopyFromStateData #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ } CopyFromStateData; +extern int CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes); + + typedef struct CopyToStateData *CopyToState; /* -- 2.45.2
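To make the is_from dispatch in 0008 easier to review, here is a minimal standalone sketch of the idea. It does not use the PostgreSQL headers: MockNodeTag, MockCopyToRoutine, MockCopyFromRoutine, mock_copy_handler() and mock_routine_is_valid() are all hypothetical stand-ins for illustration only. The sketch assumes what the patch describes: one handler is called with is_from, returns a CopyFromRoutine for COPY FROM and a CopyToRoutine for COPY TO, and the caller rejects a NULL result or a result with the wrong node tag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical stand-ins for PostgreSQL's node tags. In the patch, the
 * real tags are T_CopyToRoutine and T_CopyFromRoutine.
 */
typedef enum MockNodeTag
{
	T_MockCopyToRoutine = 1,
	T_MockCopyFromRoutine = 2,
} MockNodeTag;

/* Like the routines in the patch, each struct starts with its tag. */
typedef struct MockCopyToRoutine
{
	MockNodeTag type;
} MockCopyToRoutine;

typedef struct MockCopyFromRoutine
{
	MockNodeTag type;
} MockCopyFromRoutine;

static const MockCopyToRoutine mock_to_routine = {
	.type = T_MockCopyToRoutine,
};

static const MockCopyFromRoutine mock_from_routine = {
	.type = T_MockCopyFromRoutine,
};

/*
 * One handler serves both directions, like test_copy_format() in the
 * test module: is_from == true selects the COPY FROM routine and
 * is_from == false selects the COPY TO routine.
 */
const void *
mock_copy_handler(bool is_from)
{
	if (is_from)
		return &mock_from_routine;
	return &mock_to_routine;
}

/*
 * Simplified version of the check in ProcessCopyOptionFormat(): the
 * result must be non-NULL and carry the tag matching the direction.
 * The tag is the first member, so it can be read through a pointer to
 * the tag type.
 */
bool
mock_routine_is_valid(const void *routine, bool is_from)
{
	MockNodeTag expected;

	if (routine == NULL)
		return false;

	expected = is_from ? T_MockCopyFromRoutine : T_MockCopyToRoutine;
	return *(const MockNodeTag *) routine == expected;
}
```

With this shape, a handler that ignores is_from and always returns its COPY TO routine is rejected for COPY FROM by the tag check. That is why 0008 adds the NodeTag field to CopyFromRoutine.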