From 8e10b07b43231b44085a73fcbe1a89d36913862c Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 3 Nov 2020 09:31:11 +0200 Subject: [PATCH v4 2/2] Split copyfrom.c further into copyfrom.c and copyfromparse.c. COPY FROM processing has two main parts: 1. Parse the input text/CSV/binary file into rows, as Datums. 2. Feed the rows into a table. These parts are both fairly complicated, and fairly independent of each other. Extract the code for the first part into copyfromparse.c. --- src/backend/commands/Makefile | 1 + src/backend/commands/copyfrom.c | 1658 +--------------------- src/backend/commands/copyfromparse.c | 1646 +++++++++++++++++++++ src/include/commands/copyfrom_internal.h | 170 +++ 4 files changed, 1819 insertions(+), 1656 deletions(-) create mode 100644 src/backend/commands/copyfromparse.c create mode 100644 src/include/commands/copyfrom_internal.h diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index 0e1b9247e76..e8504f0ae41 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -25,6 +25,7 @@ OBJS = \ conversioncmds.o \ copy.o \ copyfrom.o \ + copyfromparse.o \ copyto.o \ createas.o \ dbcommands.o \ diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 03721681e1f..8bd49831642 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -28,6 +28,7 @@ #include "catalog/pg_authid.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/copyfrom_internal.h" #include "commands/defrem.h" #include "commands/trigger.h" #include "executor/execPartition.h" @@ -62,152 +63,6 @@ #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7')) #define OCTVALUE(c) ((c) - '0') -/* - * Represents the different source cases we need to worry about at - * the bottom level - */ -typedef enum CopySource -{ - COPY_FILE, /* from file (or a piped program) */ - COPY_OLD_FE, /* from frontend (2.0 protocol) */ - COPY_NEW_FE, /* from frontend (3.0 
protocol) */ - COPY_CALLBACK /* from callback function */ -} CopySource; - -/* - * Represents the end-of-line terminator type of the input - */ -typedef enum EolType -{ - EOL_UNKNOWN, - EOL_NL, - EOL_CR, - EOL_CRNL -} EolType; - -/* - * Represents the heap insert method to be used during COPY FROM. - */ -typedef enum CopyInsertMethod -{ - CIM_SINGLE, /* use table_tuple_insert or fdw routine */ - CIM_MULTI, /* always use table_multi_insert */ - CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ -} CopyInsertMethod; - -/* - * This struct contains all the state variables used throughout a COPY - * operation. For simplicity, we use the same struct for all variants of COPY, - * even though some fields are used in only some cases. - * - * Multi-byte encodings: all supported client-side encodings encode multi-byte - * characters by having the first byte's high bit set. Subsequent bytes of the - * character can have the high bit not set. When scanning data in such an - * encoding to look for a match to a single-byte (ie ASCII) character, we must - * use the full pg_encoding_mblen() machinery to skip over multibyte - * characters, else we might find a false match to a trailing byte. In - * supported server encodings, there is no possibility of a false match, and - * it's faster to make useless comparisons to trailing bytes than it is to - * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true - * when we have to do it the hard way. 
- */ -typedef struct CopyFromStateData -{ - /* low-level state data */ - CopySource copy_src; /* type of copy source */ - FILE *copy_file; /* used if copy_src == COPY_FILE */ - StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for - * dest == COPY_NEW_FE in COPY FROM */ - bool reached_eof; /* true if we read to end of copy data (not - * all copy_src types maintain this) */ - EolType eol_type; /* EOL type of input */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy from */ - QueryDesc *queryDesc; /* executable query to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN/STDOUT */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_source_cb data_source_cb; /* function for reading data */ - - CopyFormatOptions opts; - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ - Node *whereClause; /* WHERE condition (or NULL) */ - - /* these are just for error messages, see CopyFromErrorCallback */ - const char *cur_relname; /* table name for error messages */ - uint64 cur_lineno; /* line number for error messages */ - const char *cur_attname; /* current att for error messages */ - const char *cur_attval; /* current att value for error messages */ - - /* - * Working state for COPY TO/FROM - */ - MemoryContext copycontext; /* per-copy execution context */ - - /* - * Working state for COPY FROM - */ - AttrNumber num_defaults; - FmgrInfo *in_functions; /* array of input functions for each attrs */ - Oid *typioparams; /* array of element types for in_functions */ - int *defmap; /* array of default att numbers */ - ExprState **defexprs; /* array of default att expressions */ - bool volatile_defexprs; /* is any of defexprs volatile? 
*/ - List *range_table; - ExprState *qualexpr; - - TransitionCaptureState *transition_capture; - - /* - * These variables are used to reduce overhead in COPY FROM. - * - * attribute_buf holds the separated, de-escaped text for each field of - * the current line. The CopyReadAttributes functions return arrays of - * pointers into this buffer. We avoid palloc/pfree overhead by re-using - * the buffer on each cycle. - * - * In binary COPY FROM, attribute_buf holds the binary data for the - * current field, but the usage is otherwise similar. - */ - StringInfoData attribute_buf; - - /* field raw data pointers found by COPY FROM */ - - int max_fields; - char **raw_fields; - - /* - * Similarly, line_buf holds the whole input line being processed. The - * input cycle is first to read the whole line into line_buf, convert it - * to server encoding there, and then extract the individual attribute - * fields into attribute_buf. line_buf is preserved unmodified so that we - * can display it in error messages if appropriate. (In binary mode, - * line_buf is not used.) - */ - StringInfoData line_buf; - bool line_buf_converted; /* converted to server encoding? */ - bool line_buf_valid; /* contains the row being processed? */ - - /* - * Finally, raw_buf holds raw data read from the data source (file or - * client connection). In text mode, CopyReadLine parses this data - * sufficiently to locate line boundaries, then transfers the data to - * line_buf and converts it. In binary mode, CopyReadBinaryData fetches - * appropriate amounts of data from this buffer. In both modes, we - * guarantee that there is a \0 at raw_buf[raw_buf_len]. 
- */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ - char *raw_buf; - int raw_buf_index; /* next byte to process */ - int raw_buf_len; /* total # of bytes stored */ - /* Shorthand for number of unconsumed bytes available in raw_buf */ -#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) -} CopyFromStateData; - /* * No more than this many tuples per CopyMultiInsertBuffer * @@ -255,7 +110,6 @@ typedef struct CopyMultiInsertInfo int ti_options; /* table insert options */ } CopyMultiInsertInfo; - /* * These macros centralize code used to process line_buf and raw_buf buffers. * They are macros because they often do continue/break control and to avoid @@ -321,304 +175,12 @@ if (1) \ goto not_end_of_copy; \ } else ((void) 0) -static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; - /* non-export function prototypes */ -static bool CopyReadLine(CopyFromState cstate); -static bool CopyReadLineText(CopyFromState cstate); -static int CopyReadAttributesText(CopyFromState cstate); -static int CopyReadAttributesCSV(CopyFromState cstate); -static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, - Oid typioparam, int32 typmod, - bool *isnull); static char *limit_printout_length(const char *str); static void ClosePipeToProgram(CopyFromState cstate); -/* Low-level communications functions */ -static void ReceiveCopyBegin(CopyFromState cstate); -static int CopyGetData(CopyFromState cstate, void *databuf, - int minread, int maxread); -static bool CopyGetInt32(CopyFromState cstate, int32 *val); -static bool CopyGetInt16(CopyFromState cstate, int16 *val); -static bool CopyLoadRawBuf(CopyFromState cstate); -static int CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes); - -static void -ReceiveCopyBegin(CopyFromState cstate) -{ - if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) - { - /* new way */ - StringInfoData buf; - int natts = list_length(cstate->attnumlist); - int16 format = (cstate->opts.binary ? 
1 : 0); - int i; - - pq_beginmessage(&buf, 'G'); - pq_sendbyte(&buf, format); /* overall format */ - pq_sendint16(&buf, natts); - for (i = 0; i < natts; i++) - pq_sendint16(&buf, format); /* per-column formats */ - pq_endmessage(&buf); - cstate->copy_src = COPY_NEW_FE; - cstate->fe_msgbuf = makeStringInfo(); - } - else - { - /* old way */ - if (cstate->opts.binary) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY BINARY is not supported to stdout or from stdin"))); - pq_putemptymessage('G'); - /* any error in old protocol will make us lose sync */ - pq_startmsgread(); - cstate->copy_src = COPY_OLD_FE; - } - /* We *must* flush here to ensure FE knows it can send. */ - pq_flush(); -} - -/* - * CopyGetData reads data from the source (file or frontend) - * - * We attempt to read at least minread, and at most maxread, bytes from - * the source. The actual number of bytes read is returned; if this is - * less than minread, EOF was detected. - * - * Note: when copying from the frontend, we expect a proper EOF mark per - * protocol; if the frontend simply drops the connection, we raise error. - * It seems unwise to allow the COPY IN to complete normally in that case. - * - * NB: no data conversion is applied here. - */ -static int -CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) -{ - int bytesread = 0; - - switch (cstate->copy_src) - { - case COPY_FILE: - bytesread = fread(databuf, 1, maxread, cstate->copy_file); - if (ferror(cstate->copy_file)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from COPY file: %m"))); - if (bytesread == 0) - cstate->reached_eof = true; - break; - case COPY_OLD_FE: - - /* - * We cannot read more than minread bytes (which in practice is 1) - * because old protocol doesn't have any clear way of separating - * the COPY stream from following data. 
This is slow, but not any - * slower than the code path was originally, and we don't care - * much anymore about the performance of old protocol. - */ - if (pq_getbytes((char *) databuf, minread)) - { - /* Only a \. terminator is legal EOF in old protocol */ - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unexpected EOF on client connection with an open transaction"))); - } - bytesread = minread; - break; - case COPY_NEW_FE: - while (maxread > 0 && bytesread < minread && !cstate->reached_eof) - { - int avail; - - while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len) - { - /* Try to receive another message */ - int mtype; - - readmessage: - HOLD_CANCEL_INTERRUPTS(); - pq_startmsgread(); - mtype = pq_getbyte(); - if (mtype == EOF) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unexpected EOF on client connection with an open transaction"))); - if (pq_getmessage(cstate->fe_msgbuf, 0)) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unexpected EOF on client connection with an open transaction"))); - RESUME_CANCEL_INTERRUPTS(); - switch (mtype) - { - case 'd': /* CopyData */ - break; - case 'c': /* CopyDone */ - /* COPY IN correctly terminated by frontend */ - cstate->reached_eof = true; - return bytesread; - case 'f': /* CopyFail */ - ereport(ERROR, - (errcode(ERRCODE_QUERY_CANCELED), - errmsg("COPY from stdin failed: %s", - pq_getmsgstring(cstate->fe_msgbuf)))); - break; - case 'H': /* Flush */ - case 'S': /* Sync */ - - /* - * Ignore Flush/Sync for the convenience of client - * libraries (such as libpq) that may send those - * without noticing that the command they just - * sent was COPY. 
- */ - goto readmessage; - default: - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("unexpected message type 0x%02X during COPY from stdin", - mtype))); - break; - } - } - avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor; - if (avail > maxread) - avail = maxread; - pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail); - databuf = (void *) ((char *) databuf + avail); - maxread -= avail; - bytesread += avail; - } - break; - case COPY_CALLBACK: - bytesread = cstate->data_source_cb(databuf, minread, maxread); - break; - } - - return bytesread; -} - - -/* - * These functions do apply some data conversion - */ - -/* - * CopyGetInt32 reads an int32 that appears in network byte order - * - * Returns true if OK, false if EOF - */ -static inline bool -CopyGetInt32(CopyFromState cstate, int32 *val) -{ - uint32 buf; - - if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) - { - *val = 0; /* suppress compiler warning */ - return false; - } - *val = (int32) pg_ntoh32(buf); - return true; -} - -/* - * CopyGetInt16 reads an int16 that appears in network byte order - */ -static inline bool -CopyGetInt16(CopyFromState cstate, int16 *val) -{ - uint16 buf; - - if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) - { - *val = 0; /* suppress compiler warning */ - return false; - } - *val = (int16) pg_ntoh16(buf); - return true; -} - - -/* - * CopyLoadRawBuf loads some more data into raw_buf - * - * Returns true if able to obtain at least one more byte, else false. - * - * If RAW_BUF_BYTES(cstate) > 0, the unprocessed bytes are moved to the start - * of the buffer and then we load more data after that. This case occurs only - * when a multibyte character crosses a bufferload boundary. - */ -static bool -CopyLoadRawBuf(CopyFromState cstate) -{ - int nbytes = RAW_BUF_BYTES(cstate); - int inbytes; - - /* Copy down the unprocessed data if any. 
*/ - if (nbytes > 0) - memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index, - nbytes); - - inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes, - 1, RAW_BUF_SIZE - nbytes); - nbytes += inbytes; - cstate->raw_buf[nbytes] = '\0'; - cstate->raw_buf_index = 0; - cstate->raw_buf_len = nbytes; - return (inbytes > 0); -} - -/* - * CopyReadBinaryData - * - * Reads up to 'nbytes' bytes from cstate->copy_file via cstate->raw_buf - * and writes them to 'dest'. Returns the number of bytes read (which - * would be less than 'nbytes' only if we reach EOF). - */ -static int -CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) -{ - int copied_bytes = 0; - - if (RAW_BUF_BYTES(cstate) >= nbytes) - { - /* Enough bytes are present in the buffer. */ - memcpy(dest, cstate->raw_buf + cstate->raw_buf_index, nbytes); - cstate->raw_buf_index += nbytes; - copied_bytes = nbytes; - } - else - { - /* - * Not enough bytes in the buffer, so must read from the file. Need - * to loop since 'nbytes' could be larger than the buffer size. - */ - do - { - int copy_bytes; - - /* Load more data if buffer is empty. */ - if (RAW_BUF_BYTES(cstate) == 0) - { - if (!CopyLoadRawBuf(cstate)) - break; /* EOF */ - } - - /* Transfer some bytes. 
*/ - copy_bytes = Min(nbytes - copied_bytes, RAW_BUF_BYTES(cstate)); - memcpy(dest, cstate->raw_buf + cstate->raw_buf_index, copy_bytes); - cstate->raw_buf_index += copy_bytes; - dest += copy_bytes; - copied_bytes += copy_bytes; - } while (copied_bytes < nbytes); - } - - return copied_bytes; -} - /* * error context callback for COPY FROM * @@ -2002,43 +1564,7 @@ BeginCopyFrom(ParseState *pstate, if (cstate->opts.binary) { /* Read and verify binary header */ - char readSig[11]; - int32 tmp; - - /* Signature */ - if (CopyReadBinaryData(cstate, readSig, 11) != 11 || - memcmp(readSig, BinarySignature, 11) != 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("COPY file signature not recognized"))); - /* Flags field */ - if (!CopyGetInt32(cstate, &tmp)) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid COPY file header (missing flags)"))); - if ((tmp & (1 << 16)) != 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid COPY file header (WITH OIDS)"))); - tmp &= ~(1 << 16); - if ((tmp >> 16) != 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unrecognized critical flags in COPY file header"))); - /* Header extension length */ - if (!CopyGetInt32(cstate, &tmp) || - tmp < 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid COPY file header (missing length)"))); - /* Skip extension header, if present */ - while (tmp-- > 0) - { - if (CopyReadBinaryData(cstate, readSig, 1) != 1) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid COPY file header (wrong length)"))); - } + ReceiveCopyBinaryHeader(cstate); } /* create workspace for CopyReadAttributes results */ @@ -2055,251 +1581,6 @@ BeginCopyFrom(ParseState *pstate, return cstate; } -/* - * Read raw fields in the next line for COPY FROM in text or csv mode. - * Return false if no more lines. - * - * An internal temporary buffer is returned via 'fields'. 
It is valid until - * the next call of the function. Since the function returns all raw fields - * in the input file, 'nfields' could be different from the number of columns - * in the relation. - * - * NOTE: force_not_null option are not applied to the returned fields. - */ -bool -NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) -{ - int fldct; - bool done; - - /* only available for text or csv input */ - Assert(!cstate->opts.binary); - - /* on input just throw the header line away */ - if (cstate->cur_lineno == 0 && cstate->opts.header_line) - { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) - return false; /* done */ - } - - cstate->cur_lineno++; - - /* Actually read the line into memory here */ - done = CopyReadLine(cstate); - - /* - * EOF at start of line means we're done. If we see EOF after some - * characters, we act as though it was newline followed by EOF, ie, - * process the line and then exit loop on next iteration. - */ - if (done && cstate->line_buf.len == 0) - return false; - - /* Parse the line into de-escaped field values */ - if (cstate->opts.csv_mode) - fldct = CopyReadAttributesCSV(cstate); - else - fldct = CopyReadAttributesText(cstate); - - *fields = cstate->raw_fields; - *nfields = fldct; - return true; -} - -/* - * Read next tuple from file for COPY FROM. Return false if no more tuples. - * - * 'econtext' is used to evaluate default expression for each columns not - * read from the file. It can be NULL when no default values are used, i.e. - * when all columns are read from the file. - * - * 'values' and 'nulls' arrays must be the same length as columns of the - * relation passed to BeginCopyFrom. This function fills the arrays. - * Oid of the tuple is returned with 'tupleOid' separately. 
- */ -bool -NextCopyFrom(CopyFromState cstate, ExprContext *econtext, - Datum *values, bool *nulls) -{ - TupleDesc tupDesc; - AttrNumber num_phys_attrs, - attr_count, - num_defaults = cstate->num_defaults; - FmgrInfo *in_functions = cstate->in_functions; - Oid *typioparams = cstate->typioparams; - int i; - int *defmap = cstate->defmap; - ExprState **defexprs = cstate->defexprs; - - tupDesc = RelationGetDescr(cstate->rel); - num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); - - /* Initialize all values for row to NULL */ - MemSet(values, 0, num_phys_attrs * sizeof(Datum)); - MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - - if (!cstate->opts.binary) - { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; - - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; - - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - - fieldno = 0; - - /* Loop to read the user attributes on the line. */ - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; - - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) - { - /* ignore input field, leaving column as NULL */ - continue; - } - - if (cstate->opts.csv_mode) - { - if (string == NULL && - cstate->opts.force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * convert it to the NULL string. 
- */ - string = cstate->opts.null_print; - } - else if (string != NULL && cstate->opts.force_null_flags[m] - && strcmp(string, cstate->opts.null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } - } - - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; - values[m] = InputFunctionCall(&in_functions[m], - string, - typioparams[m], - att->atttypmod); - if (string != NULL) - nulls[m] = false; - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - } - - Assert(fieldno == attr_count); - } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; - - cstate->cur_lineno++; - - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } - - if (fld_count == -1) - { - /* - * Received EOF marker. In a V3-protocol copy, wait for the - * protocol-level EOF, and complain if it doesn't come - * immediately. This ensures that we correctly handle CopyFail, - * if client chooses to send that now. - * - * Note that we MUST NOT try to read more data in an old-protocol - * copy, since there is no protocol-level EOF marker then. We - * could go either way for copy from file, but choose to throw - * error if there's data after the EOF marker, for consistency - * with the new-protocol case. 
- */ - char dummy; - - if (cstate->copy_src != COPY_OLD_FE && - CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } - - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } - } - - /* - * Now compute and insert any defaults available for the columns not - * provided by the input data. Anything not processed here or above will - * remain NULL. - */ - for (i = 0; i < num_defaults; i++) - { - /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. - */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - - values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, - &nulls[defmap[i]]); - } - - return true; -} - /* * Clean up storage and release resources for COPY FROM. */ @@ -2358,938 +1639,3 @@ ClosePipeToProgram(CopyFromState cstate) errdetail_internal("%s", wait_result_to_str(pclose_rc)))); } } - -/* - * Read the next input line and stash it in line_buf, with conversion to - * server encoding. - * - * Result is true if read was terminated by EOF, false if terminated - * by newline. The terminating newline or EOF marker is not included - * in the final value of line_buf. 
- */ -static bool -CopyReadLine(CopyFromState cstate) -{ - bool result; - - resetStringInfo(&cstate->line_buf); - cstate->line_buf_valid = true; - - /* Mark that encoding conversion hasn't occurred yet */ - cstate->line_buf_converted = false; - - /* Parse data and transfer into line_buf */ - result = CopyReadLineText(cstate); - - if (result) - { - /* - * Reached EOF. In protocol version 3, we should ignore anything - * after \. up to the protocol end of copy data. (XXX maybe better - * not to treat \. as special?) - */ - if (cstate->copy_src == COPY_NEW_FE) - { - do - { - cstate->raw_buf_index = cstate->raw_buf_len; - } while (CopyLoadRawBuf(cstate)); - } - } - else - { - /* - * If we didn't hit EOF, then we must have transferred the EOL marker - * to line_buf along with the data. Get rid of it. - */ - switch (cstate->eol_type) - { - case EOL_NL: - Assert(cstate->line_buf.len >= 1); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); - cstate->line_buf.len--; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_CR: - Assert(cstate->line_buf.len >= 1); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r'); - cstate->line_buf.len--; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_CRNL: - Assert(cstate->line_buf.len >= 2); - Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r'); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); - cstate->line_buf.len -= 2; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_UNKNOWN: - /* shouldn't get here */ - Assert(false); - break; - } - } - - /* Done reading the line. Convert it to server encoding. 
*/ - if (cstate->need_transcoding) - { - char *cvt; - - cvt = pg_any_to_server(cstate->line_buf.data, - cstate->line_buf.len, - cstate->file_encoding); - if (cvt != cstate->line_buf.data) - { - /* transfer converted data back to line_buf */ - resetStringInfo(&cstate->line_buf); - appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt)); - pfree(cvt); - } - } - - /* Now it's safe to use the buffer in error messages */ - cstate->line_buf_converted = true; - - return result; -} - -/* - * CopyReadLineText - inner loop of CopyReadLine for text mode - */ -static bool -CopyReadLineText(CopyFromState cstate) -{ - char *copy_raw_buf; - int raw_buf_ptr; - int copy_buf_len; - bool need_data = false; - bool hit_eof = false; - bool result = false; - char mblen_str[2]; - - /* CSV variables */ - bool first_char_in_line = true; - bool in_quote = false, - last_was_esc = false; - char quotec = '\0'; - char escapec = '\0'; - - if (cstate->opts.csv_mode) - { - quotec = cstate->opts.quote[0]; - escapec = cstate->opts.escape[0]; - /* ignore special escape processing if it's the same as quotec */ - if (quotec == escapec) - escapec = '\0'; - } - - mblen_str[1] = '\0'; - - /* - * The objective of this loop is to transfer the entire next input line - * into line_buf. Hence, we only care for detecting newlines (\r and/or - * \n) and the end-of-copy marker (\.). - * - * In CSV mode, \r and \n inside a quoted field are just part of the data - * value and are put in line_buf. We keep just enough state to know if we - * are currently in a quoted field or not. - * - * These four characters, and the CSV escape and quote characters, are - * assumed the same in frontend and backend encodings. - * - * For speed, we try to move data from raw_buf to line_buf in chunks - * rather than one character at a time. 
raw_buf_ptr points to the next - * character to examine; any characters from raw_buf_index to raw_buf_ptr - * have been determined to be part of the line, but not yet transferred to - * line_buf. - * - * For a little extra speed within the loop, we copy raw_buf and - * raw_buf_len into local variables. - */ - copy_raw_buf = cstate->raw_buf; - raw_buf_ptr = cstate->raw_buf_index; - copy_buf_len = cstate->raw_buf_len; - - for (;;) - { - int prev_raw_ptr; - char c; - - /* - * Load more data if needed. Ideally we would just force four bytes - * of read-ahead and avoid the many calls to - * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol - * does not allow us to read too far ahead or we might read into the - * next data, so we read-ahead only as far we know we can. One - * optimization would be to read-ahead four byte here if - * cstate->copy_src != COPY_OLD_FE, but it hardly seems worth it, - * considering the size of the buffer. - */ - if (raw_buf_ptr >= copy_buf_len || need_data) - { - REFILL_LINEBUF; - - /* - * Try to read some more data. This will certainly reset - * raw_buf_index to zero, and raw_buf_ptr must go with it. - */ - if (!CopyLoadRawBuf(cstate)) - hit_eof = true; - raw_buf_ptr = 0; - copy_buf_len = cstate->raw_buf_len; - - /* - * If we are completely out of data, break out of the loop, - * reporting EOF. - */ - if (copy_buf_len <= 0) - { - result = true; - break; - } - need_data = false; - } - - /* OK to fetch a character */ - prev_raw_ptr = raw_buf_ptr; - c = copy_raw_buf[raw_buf_ptr++]; - - if (cstate->opts.csv_mode) - { - /* - * If character is '\\' or '\r', we may need to look ahead below. - * Force fetch of the next character if we don't already have it. - * We need to do this before changing CSV state, in case one of - * these characters is also the quote or escape character. - * - * Note: old-protocol does not like forced prefetch, but it's OK - * here since we cannot validly be at EOF. 
- */ - if (c == '\\' || c == '\r') - { - IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); - } - - /* - * Dealing with quotes and escapes here is mildly tricky. If the - * quote char is also the escape char, there's no problem - we - * just use the char as a toggle. If they are different, we need - * to ensure that we only take account of an escape inside a - * quoted field and immediately preceding a quote char, and not - * the second in an escape-escape sequence. - */ - if (in_quote && c == escapec) - last_was_esc = !last_was_esc; - if (c == quotec && !last_was_esc) - in_quote = !in_quote; - if (c != escapec) - last_was_esc = false; - - /* - * Updating the line count for embedded CR and/or LF chars is - * necessarily a little fragile - this test is probably about the - * best we can do. (XXX it's arguable whether we should do this - * at all --- is cur_lineno a physical or logical count?) - */ - if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r')) - cstate->cur_lineno++; - } - - /* Process \r */ - if (c == '\r' && (!cstate->opts.csv_mode || !in_quote)) - { - /* Check for \r\n on first line, _and_ handle \r\n. */ - if (cstate->eol_type == EOL_UNKNOWN || - cstate->eol_type == EOL_CRNL) - { - /* - * If need more data, go back to loop top to load it. - * - * Note that if we are at EOF, c will wind up as '\0' because - * of the guaranteed pad of raw_buf. - */ - IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); - - /* get next char */ - c = copy_raw_buf[raw_buf_ptr]; - - if (c == '\n') - { - raw_buf_ptr++; /* eat newline */ - cstate->eol_type = EOL_CRNL; /* in case not set yet */ - } - else - { - /* found \r, but no \n */ - if (cstate->eol_type == EOL_CRNL) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? - errmsg("literal carriage return found in data") : - errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? 
- errhint("Use \"\\r\" to represent carriage return.") : - errhint("Use quoted CSV field to represent carriage return."))); - - /* - * if we got here, it is the first line and we didn't find - * \n, so don't consume the peeked character - */ - cstate->eol_type = EOL_CR; - } - } - else if (cstate->eol_type == EOL_NL) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? - errmsg("literal carriage return found in data") : - errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? - errhint("Use \"\\r\" to represent carriage return.") : - errhint("Use quoted CSV field to represent carriage return."))); - /* If reach here, we have found the line terminator */ - break; - } - - /* Process \n */ - if (c == '\n' && (!cstate->opts.csv_mode || !in_quote)) - { - if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? - errmsg("literal newline found in data") : - errmsg("unquoted newline found in data"), - !cstate->opts.csv_mode ? - errhint("Use \"\\n\" to represent newline.") : - errhint("Use quoted CSV field to represent newline."))); - cstate->eol_type = EOL_NL; /* in case not set yet */ - /* If reach here, we have found the line terminator */ - break; - } - - /* - * In CSV mode, we only recognize \. alone on a line. This is because - * \. is a valid CSV data value. - */ - if (c == '\\' && (!cstate->opts.csv_mode || first_char_in_line)) - { - char c2; - - IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); - IF_NEED_REFILL_AND_EOF_BREAK(0); - - /* ----- - * get next character - * Note: we do not change c so if it isn't \., we can fall - * through and continue processing for file encoding. - * ----- - */ - c2 = copy_raw_buf[raw_buf_ptr]; - - if (c2 == '.') - { - raw_buf_ptr++; /* consume the '.' 
*/ - - /* - * Note: if we loop back for more data here, it does not - * matter that the CSV state change checks are re-executed; we - * will come back here with no important state changed. - */ - if (cstate->eol_type == EOL_CRNL) - { - /* Get the next character */ - IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); - /* if hit_eof, c2 will become '\0' */ - c2 = copy_raw_buf[raw_buf_ptr++]; - - if (c2 == '\n') - { - if (!cstate->opts.csv_mode) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("end-of-copy marker does not match previous newline style"))); - else - NO_END_OF_COPY_GOTO; - } - else if (c2 != '\r') - { - if (!cstate->opts.csv_mode) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("end-of-copy marker corrupt"))); - else - NO_END_OF_COPY_GOTO; - } - } - - /* Get the next character */ - IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); - /* if hit_eof, c2 will become '\0' */ - c2 = copy_raw_buf[raw_buf_ptr++]; - - if (c2 != '\r' && c2 != '\n') - { - if (!cstate->opts.csv_mode) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("end-of-copy marker corrupt"))); - else - NO_END_OF_COPY_GOTO; - } - - if ((cstate->eol_type == EOL_NL && c2 != '\n') || - (cstate->eol_type == EOL_CRNL && c2 != '\n') || - (cstate->eol_type == EOL_CR && c2 != '\r')) - { - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("end-of-copy marker does not match previous newline style"))); - } - - /* - * Transfer only the data before the \. into line_buf, then - * discard the data and the \. sequence. - */ - if (prev_raw_ptr > cstate->raw_buf_index) - appendBinaryStringInfo(&cstate->line_buf, - cstate->raw_buf + cstate->raw_buf_index, - prev_raw_ptr - cstate->raw_buf_index); - cstate->raw_buf_index = raw_buf_ptr; - result = true; /* report EOF */ - break; - } - else if (!cstate->opts.csv_mode) - - /* - * If we are here, it means we found a backslash followed by - * something other than a period. 
In non-CSV mode, anything - * after a backslash is special, so we skip over that second - * character too. If we didn't do that \\. would be - * considered an eof-of copy, while in non-CSV mode it is a - * literal backslash followed by a period. In CSV mode, - * backslashes are not special, so we want to process the - * character after the backslash just like a normal character, - * so we don't increment in those cases. - */ - raw_buf_ptr++; - } - - /* - * This label is for CSV cases where \. appears at the start of a - * line, but there is more text after it, meaning it was a data value. - * We are more strict for \. in CSV mode because \. could be a data - * value, while in non-CSV mode, \. cannot be a data value. - */ -not_end_of_copy: - - /* - * Process all bytes of a multi-byte character as a group. - * - * We only support multi-byte sequences where the first byte has the - * high-bit set, so as an optimization we can avoid this block - * entirely if it is not set. - */ - if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c)) - { - int mblen; - - /* - * It is enough to look at the first byte in all our encodings, to - * get the length. (GB18030 is a bit special, but still works for - * our purposes; see comment in pg_gb18030_mblen()) - */ - mblen_str[0] = c; - mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str); - - IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1); - IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1); - raw_buf_ptr += mblen - 1; - } - first_char_in_line = false; - } /* end of outer loop */ - - /* - * Transfer any still-uncopied data to line_buf. - */ - REFILL_LINEBUF; - - return result; -} - -/* - * Return decimal value for a hexadecimal digit - */ -static int -GetDecimalFromHex(char hex) -{ - if (isdigit((unsigned char) hex)) - return hex - '0'; - else - return tolower((unsigned char) hex) - 'a' + 10; -} - -/* - * Parse the current line into separate attributes (fields), - * performing de-escaping as needed. - * - * The input is in line_buf. 
We use attribute_buf to hold the result - * strings. cstate->raw_fields[k] is set to point to the k'th attribute - * string, or NULL when the input matches the null marker string. - * This array is expanded as necessary. - * - * (Note that the caller cannot check for nulls since the returned - * string would be the post-de-escaping equivalent, which may look - * the same as some valid data string.) - * - * delim is the column delimiter string (must be just one byte for now). - * null_print is the null marker string. Note that this is compared to - * the pre-de-escaped input string. - * - * The return value is the number of fields actually read. - */ -static int -CopyReadAttributesText(CopyFromState cstate) -{ - char delimc = cstate->opts.delim[0]; - int fieldno; - char *output_ptr; - char *cur_ptr; - char *line_end_ptr; - - /* - * We need a special case for zero-column tables: check that the input - * line is empty, and return. - */ - if (cstate->max_fields <= 0) - { - if (cstate->line_buf.len != 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - return 0; - } - - resetStringInfo(&cstate->attribute_buf); - - /* - * The de-escaped attributes will certainly not be longer than the input - * data line, so we can just force attribute_buf to be large enough and - * then transfer data without any checks for enough space. We need to do - * it this way because enlarging attribute_buf mid-stream would invalidate - * pointers already stored into cstate->raw_fields[]. 
- */ - if (cstate->attribute_buf.maxlen <= cstate->line_buf.len) - enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len); - output_ptr = cstate->attribute_buf.data; - - /* set pointer variables for loop */ - cur_ptr = cstate->line_buf.data; - line_end_ptr = cstate->line_buf.data + cstate->line_buf.len; - - /* Outer loop iterates over fields */ - fieldno = 0; - for (;;) - { - bool found_delim = false; - char *start_ptr; - char *end_ptr; - int input_len; - bool saw_non_ascii = false; - - /* Make sure there is enough space for the next value */ - if (fieldno >= cstate->max_fields) - { - cstate->max_fields *= 2; - cstate->raw_fields = - repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *)); - } - - /* Remember start of field on both input and output sides */ - start_ptr = cur_ptr; - cstate->raw_fields[fieldno] = output_ptr; - - /* - * Scan data for field. - * - * Note that in this loop, we are scanning to locate the end of field - * and also speculatively performing de-escaping. Once we find the - * end-of-field, we can match the raw field contents against the null - * marker string. Only after that comparison fails do we know that - * de-escaping is actually the right thing to do; therefore we *must - * not* throw any syntax errors before we've done the null-marker - * check. 
- */ - for (;;) - { - char c; - - end_ptr = cur_ptr; - if (cur_ptr >= line_end_ptr) - break; - c = *cur_ptr++; - if (c == delimc) - { - found_delim = true; - break; - } - if (c == '\\') - { - if (cur_ptr >= line_end_ptr) - break; - c = *cur_ptr++; - switch (c) - { - case '0': - case '1': - case '2': - case '3': - case '4': - case '5': - case '6': - case '7': - { - /* handle \013 */ - int val; - - val = OCTVALUE(c); - if (cur_ptr < line_end_ptr) - { - c = *cur_ptr; - if (ISOCTAL(c)) - { - cur_ptr++; - val = (val << 3) + OCTVALUE(c); - if (cur_ptr < line_end_ptr) - { - c = *cur_ptr; - if (ISOCTAL(c)) - { - cur_ptr++; - val = (val << 3) + OCTVALUE(c); - } - } - } - } - c = val & 0377; - if (c == '\0' || IS_HIGHBIT_SET(c)) - saw_non_ascii = true; - } - break; - case 'x': - /* Handle \x3F */ - if (cur_ptr < line_end_ptr) - { - char hexchar = *cur_ptr; - - if (isxdigit((unsigned char) hexchar)) - { - int val = GetDecimalFromHex(hexchar); - - cur_ptr++; - if (cur_ptr < line_end_ptr) - { - hexchar = *cur_ptr; - if (isxdigit((unsigned char) hexchar)) - { - cur_ptr++; - val = (val << 4) + GetDecimalFromHex(hexchar); - } - } - c = val & 0xff; - if (c == '\0' || IS_HIGHBIT_SET(c)) - saw_non_ascii = true; - } - } - break; - case 'b': - c = '\b'; - break; - case 'f': - c = '\f'; - break; - case 'n': - c = '\n'; - break; - case 'r': - c = '\r'; - break; - case 't': - c = '\t'; - break; - case 'v': - c = '\v'; - break; - - /* - * in all other cases, take the char after '\' - * literally - */ - } - } - - /* Add c to output string */ - *output_ptr++ = c; - } - - /* Check whether raw input matched null marker */ - input_len = end_ptr - start_ptr; - if (input_len == cstate->opts.null_print_len && - strncmp(start_ptr, cstate->opts.null_print, input_len) == 0) - cstate->raw_fields[fieldno] = NULL; - else - { - /* - * At this point we know the field is supposed to contain data. 
- * - * If we de-escaped any non-7-bit-ASCII chars, make sure the - * resulting string is valid data for the db encoding. - */ - if (saw_non_ascii) - { - char *fld = cstate->raw_fields[fieldno]; - - pg_verifymbstr(fld, output_ptr - fld, false); - } - } - - /* Terminate attribute value in output area */ - *output_ptr++ = '\0'; - - fieldno++; - /* Done if we hit EOL instead of a delim */ - if (!found_delim) - break; - } - - /* Clean up state of attribute_buf */ - output_ptr--; - Assert(*output_ptr == '\0'); - cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data); - - return fieldno; -} - -/* - * Parse the current line into separate attributes (fields), - * performing de-escaping as needed. This has exactly the same API as - * CopyReadAttributesText, except we parse the fields according to - * "standard" (i.e. common) CSV usage. - */ -static int -CopyReadAttributesCSV(CopyFromState cstate) -{ - char delimc = cstate->opts.delim[0]; - char quotec = cstate->opts.quote[0]; - char escapec = cstate->opts.escape[0]; - int fieldno; - char *output_ptr; - char *cur_ptr; - char *line_end_ptr; - - /* - * We need a special case for zero-column tables: check that the input - * line is empty, and return. - */ - if (cstate->max_fields <= 0) - { - if (cstate->line_buf.len != 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - return 0; - } - - resetStringInfo(&cstate->attribute_buf); - - /* - * The de-escaped attributes will certainly not be longer than the input - * data line, so we can just force attribute_buf to be large enough and - * then transfer data without any checks for enough space. We need to do - * it this way because enlarging attribute_buf mid-stream would invalidate - * pointers already stored into cstate->raw_fields[]. 
- */ - if (cstate->attribute_buf.maxlen <= cstate->line_buf.len) - enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len); - output_ptr = cstate->attribute_buf.data; - - /* set pointer variables for loop */ - cur_ptr = cstate->line_buf.data; - line_end_ptr = cstate->line_buf.data + cstate->line_buf.len; - - /* Outer loop iterates over fields */ - fieldno = 0; - for (;;) - { - bool found_delim = false; - bool saw_quote = false; - char *start_ptr; - char *end_ptr; - int input_len; - - /* Make sure there is enough space for the next value */ - if (fieldno >= cstate->max_fields) - { - cstate->max_fields *= 2; - cstate->raw_fields = - repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *)); - } - - /* Remember start of field on both input and output sides */ - start_ptr = cur_ptr; - cstate->raw_fields[fieldno] = output_ptr; - - /* - * Scan data for field, - * - * The loop starts in "not quote" mode and then toggles between that - * and "in quote" mode. The loop exits normally if it is in "not - * quote" mode and a delimiter or line end is seen. 
- */ - for (;;) - { - char c; - - /* Not in quote */ - for (;;) - { - end_ptr = cur_ptr; - if (cur_ptr >= line_end_ptr) - goto endfield; - c = *cur_ptr++; - /* unquoted field delimiter */ - if (c == delimc) - { - found_delim = true; - goto endfield; - } - /* start of quoted field (or part of field) */ - if (c == quotec) - { - saw_quote = true; - break; - } - /* Add c to output string */ - *output_ptr++ = c; - } - - /* In quote */ - for (;;) - { - end_ptr = cur_ptr; - if (cur_ptr >= line_end_ptr) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unterminated CSV quoted field"))); - - c = *cur_ptr++; - - /* escape within a quoted field */ - if (c == escapec) - { - /* - * peek at the next char if available, and escape it if it - * is an escape char or a quote char - */ - if (cur_ptr < line_end_ptr) - { - char nextc = *cur_ptr; - - if (nextc == escapec || nextc == quotec) - { - *output_ptr++ = nextc; - cur_ptr++; - continue; - } - } - } - - /* - * end of quoted field. Must do this test after testing for - * escape in case quote char and escape char are the same - * (which is the common case). 
- */ - if (c == quotec) - break; - - /* Add c to output string */ - *output_ptr++ = c; - } - } -endfield: - - /* Terminate attribute value in output area */ - *output_ptr++ = '\0'; - - /* Check whether raw input matched null marker */ - input_len = end_ptr - start_ptr; - if (!saw_quote && input_len == cstate->opts.null_print_len && - strncmp(start_ptr, cstate->opts.null_print, input_len) == 0) - cstate->raw_fields[fieldno] = NULL; - - fieldno++; - /* Done if we hit EOL instead of a delim */ - if (!found_delim) - break; - } - - /* Clean up state of attribute_buf */ - output_ptr--; - Assert(*output_ptr == '\0'); - cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data); - - return fieldno; -} - - -/* - * Read a binary attribute - */ -static Datum -CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, - Oid typioparam, int32 typmod, - bool *isnull) -{ - int32 fld_size; - Datum result; - - if (!CopyGetInt32(cstate, &fld_size)) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unexpected EOF in COPY data"))); - if (fld_size == -1) - { - *isnull = true; - return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod); - } - if (fld_size < 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid field size"))); - - /* reset attribute_buf to empty, and load raw data in it */ - resetStringInfo(&cstate->attribute_buf); - - enlargeStringInfo(&cstate->attribute_buf, fld_size); - if (CopyReadBinaryData(cstate, cstate->attribute_buf.data, - fld_size) != fld_size) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unexpected EOF in COPY data"))); - - cstate->attribute_buf.len = fld_size; - cstate->attribute_buf.data[fld_size] = '\0'; - - /* Call the column type's binary input converter */ - result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf, - typioparam, typmod); - - /* Trouble if it didn't eat the whole buffer */ - if (cstate->attribute_buf.cursor != cstate->attribute_buf.len) - 
ereport(ERROR, - (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), - errmsg("incorrect binary data format"))); - - *isnull = false; - return result; -} diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c new file mode 100644 index 00000000000..4b7583392dd --- /dev/null +++ b/src/backend/commands/copyfromparse.c @@ -0,0 +1,1646 @@ +/*------------------------------------------------------------------------- + * + * copyfromparse.c + * Parse CSV/text/binary format for COPY FROM. + * + * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/commands/copyfromparse.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <ctype.h> +#include <unistd.h> +#include <sys/stat.h> + +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/sysattr.h" +#include "access/tableam.h" +#include "access/xact.h" +#include "access/xlog.h" +#include "catalog/dependency.h" +#include "catalog/pg_authid.h" +#include "catalog/pg_type.h" +#include "commands/copy.h" +#include "commands/copyfrom_internal.h" +#include "commands/defrem.h" +#include "commands/trigger.h" +#include "executor/execPartition.h" +#include "executor/executor.h" +#include "executor/nodeModifyTable.h" +#include "executor/tuptable.h" +#include "foreign/fdwapi.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "optimizer/optimizer.h" +#include "parser/parse_coerce.h" +#include "parser/parse_collate.h" +#include "parser/parse_expr.h" +#include "parser/parse_relation.h" +#include "port/pg_bswap.h" +#include "rewrite/rewriteHandler.h" +#include "storage/fd.h" +#include "tcop/tcopprot.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include
"utils/partcache.h" +#include "utils/portal.h" +#include "utils/rel.h" +#include "utils/rls.h" +#include "utils/snapmgr.h" + +#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7')) +#define OCTVALUE(c) ((c) - '0') + +/* + * These macros centralize code used to process line_buf and raw_buf buffers. + * They are macros because they often do continue/break control and to avoid + * function call overhead in tight COPY loops. + * + * We must use "if (1)" because the usual "do {...} while(0)" wrapper would + * prevent the continue/break processing from working. We end the "if (1)" + * with "else ((void) 0)" to ensure the "if" does not unintentionally match + * any "else" in the calling code, and to avoid any compiler warnings about + * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros. + */ + +/* + * This keeps the character read at the top of the loop in the buffer + * even if there is more than one read-ahead. + */ +#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \ +if (1) \ +{ \ + if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \ + { \ + raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \ + need_data = true; \ + continue; \ + } \ +} else ((void) 0) + +/* This consumes the remainder of the buffer and breaks */ +#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \ +if (1) \ +{ \ + if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \ + { \ + if (extralen) \ + raw_buf_ptr = copy_buf_len; /* consume the partial character */ \ + /* backslash just before EOF, treat as data char */ \ + result = true; \ + break; \ + } \ +} else ((void) 0) + +/* + * Transfer any approved data to line_buf; must do this to be sure + * there is some room in raw_buf. 
+ */ +#define REFILL_LINEBUF \ +if (1) \ +{ \ + if (raw_buf_ptr > cstate->raw_buf_index) \ + { \ + appendBinaryStringInfo(&cstate->line_buf, \ + cstate->raw_buf + cstate->raw_buf_index, \ + raw_buf_ptr - cstate->raw_buf_index); \ + cstate->raw_buf_index = raw_buf_ptr; \ + } \ +} else ((void) 0) + +/* Undo any read-ahead and jump out of the block. */ +#define NO_END_OF_COPY_GOTO \ +if (1) \ +{ \ + raw_buf_ptr = prev_raw_ptr + 1; \ + goto not_end_of_copy; \ +} else ((void) 0) + +static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; + + +/* non-export function prototypes */ +static bool CopyReadLine(CopyFromState cstate); +static bool CopyReadLineText(CopyFromState cstate); +static int CopyReadAttributesText(CopyFromState cstate); +static int CopyReadAttributesCSV(CopyFromState cstate); +static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, + bool *isnull); + + +/* Low-level communications functions */ +static int CopyGetData(CopyFromState cstate, void *databuf, + int minread, int maxread); +static bool CopyGetInt32(CopyFromState cstate, int32 *val); +static bool CopyGetInt16(CopyFromState cstate, int16 *val); +static bool CopyLoadRawBuf(CopyFromState cstate); +static int CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes); + +void +ReceiveCopyBegin(CopyFromState cstate) +{ + if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) + { + /* new way */ + StringInfoData buf; + int natts = list_length(cstate->attnumlist); + int16 format = (cstate->opts.binary ? 
1 : 0); + int i; + + pq_beginmessage(&buf, 'G'); + pq_sendbyte(&buf, format); /* overall format */ + pq_sendint16(&buf, natts); + for (i = 0; i < natts; i++) + pq_sendint16(&buf, format); /* per-column formats */ + pq_endmessage(&buf); + cstate->copy_src = COPY_NEW_FE; + cstate->fe_msgbuf = makeStringInfo(); + } + else + { + /* old way */ + if (cstate->opts.binary) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY BINARY is not supported to stdout or from stdin"))); + pq_putemptymessage('G'); + /* any error in old protocol will make us lose sync */ + pq_startmsgread(); + cstate->copy_src = COPY_OLD_FE; + } + /* We *must* flush here to ensure FE knows it can send. */ + pq_flush(); +} + +void +ReceiveCopyBinaryHeader(CopyFromState cstate) +{ + char readSig[11]; + int32 tmp; + + /* Signature */ + if (CopyReadBinaryData(cstate, readSig, 11) != 11 || + memcmp(readSig, BinarySignature, 11) != 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("COPY file signature not recognized"))); + /* Flags field */ + if (!CopyGetInt32(cstate, &tmp)) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid COPY file header (missing flags)"))); + if ((tmp & (1 << 16)) != 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid COPY file header (WITH OIDS)"))); + tmp &= ~(1 << 16); + if ((tmp >> 16) != 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unrecognized critical flags in COPY file header"))); + /* Header extension length */ + if (!CopyGetInt32(cstate, &tmp) || + tmp < 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid COPY file header (missing length)"))); + /* Skip extension header, if present */ + while (tmp-- > 0) + { + if (CopyReadBinaryData(cstate, readSig, 1) != 1) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid COPY file header (wrong length)"))); + } +} + +/* + * CopyGetData reads data from the source 
(file or frontend) + * + * We attempt to read at least minread, and at most maxread, bytes from + * the source. The actual number of bytes read is returned; if this is + * less than minread, EOF was detected. + * + * Note: when copying from the frontend, we expect a proper EOF mark per + * protocol; if the frontend simply drops the connection, we raise error. + * It seems unwise to allow the COPY IN to complete normally in that case. + * + * NB: no data conversion is applied here. + */ +static int +CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) +{ + int bytesread = 0; + + switch (cstate->copy_src) + { + case COPY_FILE: + bytesread = fread(databuf, 1, maxread, cstate->copy_file); + if (ferror(cstate->copy_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from COPY file: %m"))); + if (bytesread == 0) + cstate->reached_eof = true; + break; + case COPY_OLD_FE: + + /* + * We cannot read more than minread bytes (which in practice is 1) + * because old protocol doesn't have any clear way of separating + * the COPY stream from following data. This is slow, but not any + * slower than the code path was originally, and we don't care + * much anymore about the performance of old protocol. + */ + if (pq_getbytes((char *) databuf, minread)) + { + /* Only a \. 
terminator is legal EOF in old protocol */ + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("unexpected EOF on client connection with an open transaction"))); + } + bytesread = minread; + break; + case COPY_NEW_FE: + while (maxread > 0 && bytesread < minread && !cstate->reached_eof) + { + int avail; + + while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len) + { + /* Try to receive another message */ + int mtype; + + readmessage: + HOLD_CANCEL_INTERRUPTS(); + pq_startmsgread(); + mtype = pq_getbyte(); + if (mtype == EOF) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("unexpected EOF on client connection with an open transaction"))); + if (pq_getmessage(cstate->fe_msgbuf, 0)) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("unexpected EOF on client connection with an open transaction"))); + RESUME_CANCEL_INTERRUPTS(); + switch (mtype) + { + case 'd': /* CopyData */ + break; + case 'c': /* CopyDone */ + /* COPY IN correctly terminated by frontend */ + cstate->reached_eof = true; + return bytesread; + case 'f': /* CopyFail */ + ereport(ERROR, + (errcode(ERRCODE_QUERY_CANCELED), + errmsg("COPY from stdin failed: %s", + pq_getmsgstring(cstate->fe_msgbuf)))); + break; + case 'H': /* Flush */ + case 'S': /* Sync */ + + /* + * Ignore Flush/Sync for the convenience of client + * libraries (such as libpq) that may send those + * without noticing that the command they just + * sent was COPY. 
+ */ + goto readmessage; + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected message type 0x%02X during COPY from stdin", + mtype))); + break; + } + } + avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor; + if (avail > maxread) + avail = maxread; + pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail); + databuf = (void *) ((char *) databuf + avail); + maxread -= avail; + bytesread += avail; + } + break; + case COPY_CALLBACK: + bytesread = cstate->data_source_cb(databuf, minread, maxread); + break; + } + + return bytesread; +} + + +/* + * These functions do apply some data conversion + */ + +/* + * CopyGetInt32 reads an int32 that appears in network byte order + * + * Returns true if OK, false if EOF + */ +static inline bool +CopyGetInt32(CopyFromState cstate, int32 *val) +{ + uint32 buf; + + if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + { + *val = 0; /* suppress compiler warning */ + return false; + } + *val = (int32) pg_ntoh32(buf); + return true; +} + +/* + * CopyGetInt16 reads an int16 that appears in network byte order + */ +static inline bool +CopyGetInt16(CopyFromState cstate, int16 *val) +{ + uint16 buf; + + if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + { + *val = 0; /* suppress compiler warning */ + return false; + } + *val = (int16) pg_ntoh16(buf); + return true; +} + + +/* + * CopyLoadRawBuf loads some more data into raw_buf + * + * Returns true if able to obtain at least one more byte, else false. + * + * If RAW_BUF_BYTES(cstate) > 0, the unprocessed bytes are moved to the start + * of the buffer and then we load more data after that. This case occurs only + * when a multibyte character crosses a bufferload boundary. + */ +static bool +CopyLoadRawBuf(CopyFromState cstate) +{ + int nbytes = RAW_BUF_BYTES(cstate); + int inbytes; + + /* Copy down the unprocessed data if any. 
*/ + if (nbytes > 0) + memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index, + nbytes); + + inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes, + 1, RAW_BUF_SIZE - nbytes); + nbytes += inbytes; + cstate->raw_buf[nbytes] = '\0'; + cstate->raw_buf_index = 0; + cstate->raw_buf_len = nbytes; + return (inbytes > 0); +} + +/* + * CopyReadBinaryData + * + * Reads up to 'nbytes' bytes from cstate->copy_file via cstate->raw_buf + * and writes them to 'dest'. Returns the number of bytes read (which + * would be less than 'nbytes' only if we reach EOF). + */ +static int +CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) +{ + int copied_bytes = 0; + + if (RAW_BUF_BYTES(cstate) >= nbytes) + { + /* Enough bytes are present in the buffer. */ + memcpy(dest, cstate->raw_buf + cstate->raw_buf_index, nbytes); + cstate->raw_buf_index += nbytes; + copied_bytes = nbytes; + } + else + { + /* + * Not enough bytes in the buffer, so must read from the file. Need + * to loop since 'nbytes' could be larger than the buffer size. + */ + do + { + int copy_bytes; + + /* Load more data if buffer is empty. */ + if (RAW_BUF_BYTES(cstate) == 0) + { + if (!CopyLoadRawBuf(cstate)) + break; /* EOF */ + } + + /* Transfer some bytes. */ + copy_bytes = Min(nbytes - copied_bytes, RAW_BUF_BYTES(cstate)); + memcpy(dest, cstate->raw_buf + cstate->raw_buf_index, copy_bytes); + cstate->raw_buf_index += copy_bytes; + dest += copy_bytes; + copied_bytes += copy_bytes; + } while (copied_bytes < nbytes); + } + + return copied_bytes; +} + +/* + * Read raw fields in the next line for COPY FROM in text or csv mode. + * Return false if no more lines. + * + * An internal temporary buffer is returned via 'fields'. It is valid until + * the next call of the function. Since the function returns all raw fields + * in the input file, 'nfields' could be different from the number of columns + * in the relation. + * + * NOTE: the force_not_null option is not applied to the returned fields.
+ */ +bool +NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) +{ + int fldct; + bool done; + + /* only available for text or csv input */ + Assert(!cstate->opts.binary); + + /* on input just throw the header line away */ + if (cstate->cur_lineno == 0 && cstate->opts.header_line) + { + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + return false; /* done */ + } + + cstate->cur_lineno++; + + /* Actually read the line into memory here */ + done = CopyReadLine(cstate); + + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + return false; + + /* Parse the line into de-escaped field values */ + if (cstate->opts.csv_mode) + fldct = CopyReadAttributesCSV(cstate); + else + fldct = CopyReadAttributesText(cstate); + + *fields = cstate->raw_fields; + *nfields = fldct; + return true; +} + +/* + * Read next tuple from file for COPY FROM. Return false if no more tuples. + * + * 'econtext' is used to evaluate default expressions for each column not + * read from the file. It can be NULL when no default values are used, i.e. + * when all columns are read from the file. + * + * 'values' and 'nulls' arrays must be the same length as columns of the + * relation passed to BeginCopyFrom. This function fills the arrays. + * Oid of the tuple is returned with 'tupleOid' separately.
+ */ +bool +NextCopyFrom(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber num_phys_attrs, + attr_count, + num_defaults = cstate->num_defaults; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + int i; + int *defmap = cstate->defmap; + ExprState **defexprs = cstate->defexprs; + + tupDesc = RelationGetDescr(cstate->rel); + num_phys_attrs = tupDesc->natts; + attr_count = list_length(cstate->attnumlist); + + /* Initialize all values for row to NULL */ + MemSet(values, 0, num_phys_attrs * sizeof(Datum)); + MemSet(nulls, true, num_phys_attrs * sizeof(bool)); + + if (!cstate->opts.binary) + { + char **field_strings; + ListCell *cur; + int fldct; + int fieldno; + char *string; + + /* read raw fields of the next line; false means no more input */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) + return false; + + /* check for more fields on the line than columns being copied */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + + fieldno = 0; + + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; /* zero-based column index */ + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + if (fieldno >= fldct) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; + + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) + { + /* ignore input field, leaving column as NULL */ + continue; + } + + if (cstate->opts.csv_mode) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) + { + /* + * FORCE_NOT_NULL option is set and column is NULL - + * convert it to the NULL string.
+ */ + string = cstate->opts.null_print; + } + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) + { + /* + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the + * string would already have been set to NULL. Convert it + * to NULL as specified. + */ + string = NULL; + } + } + + cstate->cur_attname = NameStr(att->attname); /* for error messages */ + cstate->cur_attval = string; + values[m] = InputFunctionCall(&in_functions[m], + string, + typioparams[m], + att->atttypmod); + if (string != NULL) + nulls[m] = false; + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } + + Assert(fieldno == attr_count); + } + else + { + /* binary: each row starts with an int16 field count */ + int16 fld_count; + ListCell *cur; + + cstate->cur_lineno++; + + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } + + if (fld_count == -1) + { + /* + * Received EOF marker. In a V3-protocol copy, wait for the + * protocol-level EOF, and complain if it doesn't come + * immediately. This ensures that we correctly handle CopyFail, + * if client chooses to send that now. + * + * Note that we MUST NOT try to read more data in an old-protocol + * copy, since there is no protocol-level EOF marker then. We + * could go either way for copy from file, but choose to throw + * error if there's data after the EOF marker, for consistency + * with the new-protocol case.
+ */ + char dummy; /* receives the probe byte, if any */ + + if (cstate->copy_src != COPY_OLD_FE && + CopyReadBinaryData(cstate, &dummy, 1) > 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("received copy data after EOF marker"))); + return false; + } + + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; /* zero-based column index */ + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); /* for error messages */ + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + } + + /* + * Now compute and insert any defaults available for the columns not + * provided by the input data. Anything not processed here or above will + * remain NULL. + */ + for (i = 0; i < num_defaults; i++) + { + /* + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. + */ + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); + + values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, + &nulls[defmap[i]]); + } + + return true; +} + +/* + * Read the next input line and stash it in line_buf, with conversion to + * server encoding. + * + * Result is true if read was terminated by EOF, false if terminated + * by newline. The terminating newline or EOF marker is not included + * in the final value of line_buf. + */ +static bool +CopyReadLine(CopyFromState cstate) +{ + bool result; + + resetStringInfo(&cstate->line_buf); + cstate->line_buf_valid = true; + + /* Mark that encoding conversion hasn't occurred yet */ + cstate->line_buf_converted = false; + + /* Parse data and transfer into line_buf; true result means EOF */ + result = CopyReadLineText(cstate); + + if (result) + { + /* + * Reached EOF. In protocol version 3, we should ignore anything + * after \.
up to the protocol end of copy data. (XXX maybe better + * not to treat \. as special?) + */ + if (cstate->copy_src == COPY_NEW_FE) + { + do + { + cstate->raw_buf_index = cstate->raw_buf_len; /* discard all buffered data */ + } while (CopyLoadRawBuf(cstate)); + } + } + else + { + /* + * If we didn't hit EOF, then we must have transferred the EOL marker + * to line_buf along with the data. Get rid of it. + */ + switch (cstate->eol_type) + { + case EOL_NL: + Assert(cstate->line_buf.len >= 1); + Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); + cstate->line_buf.len--; + cstate->line_buf.data[cstate->line_buf.len] = '\0'; + break; + case EOL_CR: + Assert(cstate->line_buf.len >= 1); + Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r'); + cstate->line_buf.len--; + cstate->line_buf.data[cstate->line_buf.len] = '\0'; + break; + case EOL_CRNL: + Assert(cstate->line_buf.len >= 2); + Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r'); + Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); + cstate->line_buf.len -= 2; + cstate->line_buf.data[cstate->line_buf.len] = '\0'; + break; + case EOL_UNKNOWN: + /* shouldn't get here: eol_type is set once a terminator is seen */ + Assert(false); + break; + } + } + + /* Done reading the line. Convert it to server encoding.
*/ + if (cstate->need_transcoding) + { + char *cvt; + + cvt = pg_any_to_server(cstate->line_buf.data, + cstate->line_buf.len, + cstate->file_encoding); + if (cvt != cstate->line_buf.data) + { + /* transfer converted data back to line_buf */ + resetStringInfo(&cstate->line_buf); + appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt)); + pfree(cvt); + } + } + + /* Now it's safe to use the buffer in error messages */ + cstate->line_buf_converted = true; + + return result; +} + +/* + * CopyReadLineText - inner loop of CopyReadLine (text/CSV); true means EOF + */ +static bool +CopyReadLineText(CopyFromState cstate) +{ + char *copy_raw_buf; + int raw_buf_ptr; + int copy_buf_len; + bool need_data = false; + bool hit_eof = false; + bool result = false; + char mblen_str[2]; + + /* CSV variables */ + bool first_char_in_line = true; + bool in_quote = false, + last_was_esc = false; + char quotec = '\0'; + char escapec = '\0'; + + if (cstate->opts.csv_mode) + { + quotec = cstate->opts.quote[0]; + escapec = cstate->opts.escape[0]; + /* ignore special escape processing if it's the same as quotec */ + if (quotec == escapec) + escapec = '\0'; + } + + mblen_str[1] = '\0'; + + /* + * The objective of this loop is to transfer the entire next input line + * into line_buf. Hence, we only care for detecting newlines (\r and/or + * \n) and the end-of-copy marker (\.). + * + * In CSV mode, \r and \n inside a quoted field are just part of the data + * value and are put in line_buf. We keep just enough state to know if we + * are currently in a quoted field or not. + * + * These four characters, and the CSV escape and quote characters, are + * assumed the same in frontend and backend encodings. + * + * For speed, we try to move data from raw_buf to line_buf in chunks + * rather than one character at a time.
raw_buf_ptr points to the next + * character to examine; any characters from raw_buf_index to raw_buf_ptr + * have been determined to be part of the line, but not yet transferred to + * line_buf. + * + * For a little extra speed within the loop, we copy raw_buf and + * raw_buf_len into local variables. + */ + copy_raw_buf = cstate->raw_buf; + raw_buf_ptr = cstate->raw_buf_index; + copy_buf_len = cstate->raw_buf_len; + + for (;;) + { + int prev_raw_ptr; + char c; + + /* + * Load more data if needed. Ideally we would just force four bytes + * of read-ahead and avoid the many calls to + * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol + * does not allow us to read too far ahead or we might read into the + * next data, so we read-ahead only as far as we know we can. One + * optimization would be to read-ahead four bytes here if + * cstate->copy_src != COPY_OLD_FE, but it hardly seems worth it, + * considering the size of the buffer. + */ + if (raw_buf_ptr >= copy_buf_len || need_data) + { + REFILL_LINEBUF; + + /* + * Try to read some more data. This will certainly reset + * raw_buf_index to zero, and raw_buf_ptr must go with it. + */ + if (!CopyLoadRawBuf(cstate)) + hit_eof = true; + raw_buf_ptr = 0; + copy_buf_len = cstate->raw_buf_len; + + /* + * If we are completely out of data, break out of the loop, + * reporting EOF. + */ + if (copy_buf_len <= 0) + { + result = true; + break; + } + need_data = false; + } + + /* OK to fetch a character */ + prev_raw_ptr = raw_buf_ptr; + c = copy_raw_buf[raw_buf_ptr++]; + + if (cstate->opts.csv_mode) + { + /* + * If character is '\\' or '\r', we may need to look ahead below. + * Force fetch of the next character if we don't already have it. + * We need to do this before changing CSV state, in case one of + * these characters is also the quote or escape character. + * + * Note: old-protocol does not like forced prefetch, but it's OK + * here since we cannot validly be at EOF.
+ */ + if (c == '\\' || c == '\r') + { + IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); + } + + /* + * Dealing with quotes and escapes here is mildly tricky. If the + * quote char is also the escape char, there's no problem - we + * just use the char as a toggle. If they are different, we need + * to ensure that we only take account of an escape inside a + * quoted field and immediately preceding a quote char, and not + * the second in an escape-escape sequence. + */ + if (in_quote && c == escapec) + last_was_esc = !last_was_esc; + if (c == quotec && !last_was_esc) + in_quote = !in_quote; + if (c != escapec) + last_was_esc = false; + + /* + * Updating the line count for embedded CR and/or LF chars is + * necessarily a little fragile - this test is probably about the + * best we can do. (XXX it's arguable whether we should do this + * at all --- is cur_lineno a physical or logical count?) + */ + if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r')) + cstate->cur_lineno++; + } + + /* Process \r, unless it is inside a CSV quoted field */ + if (c == '\r' && (!cstate->opts.csv_mode || !in_quote)) + { + /* Check for \r\n on first line, _and_ handle \r\n. */ + if (cstate->eol_type == EOL_UNKNOWN || + cstate->eol_type == EOL_CRNL) + { + /* + * If need more data, go back to loop top to load it. + * + * Note that if we are at EOF, c will wind up as '\0' because + * of the guaranteed pad of raw_buf. + */ + IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); + + /* peek at next char; it is not consumed yet */ + c = copy_raw_buf[raw_buf_ptr]; + + if (c == '\n') + { + raw_buf_ptr++; /* eat newline */ + cstate->eol_type = EOL_CRNL; /* in case not set yet */ + } + else + { + /* found \r, but no \n */ + if (cstate->eol_type == EOL_CRNL) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + !cstate->opts.csv_mode ? + errmsg("literal carriage return found in data") : + errmsg("unquoted carriage return found in data"), + !cstate->opts.csv_mode ?
+ errhint("Use \"\\r\" to represent carriage return.") : + errhint("Use quoted CSV field to represent carriage return."))); + + /* + * If we got here, it is the first line and we didn't find + * \n, so don't consume the peeked character + */ + cstate->eol_type = EOL_CR; + } + } + else if (cstate->eol_type == EOL_NL) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + !cstate->opts.csv_mode ? + errmsg("literal carriage return found in data") : + errmsg("unquoted carriage return found in data"), + !cstate->opts.csv_mode ? + errhint("Use \"\\r\" to represent carriage return.") : + errhint("Use quoted CSV field to represent carriage return."))); + /* If we reach here, we have found the line terminator */ + break; + } + + /* Process \n, unless it is inside a CSV quoted field */ + if (c == '\n' && (!cstate->opts.csv_mode || !in_quote)) + { + if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + !cstate->opts.csv_mode ? + errmsg("literal newline found in data") : + errmsg("unquoted newline found in data"), + !cstate->opts.csv_mode ? + errhint("Use \"\\n\" to represent newline.") : + errhint("Use quoted CSV field to represent newline."))); + cstate->eol_type = EOL_NL; /* in case not set yet */ + /* If we reach here, we have found the line terminator */ + break; + } + + /* + * In CSV mode, we only recognize \. alone on a line. This is because + * \. is a valid CSV data value. + */ + if (c == '\\' && (!cstate->opts.csv_mode || first_char_in_line)) + { + char c2; + + IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); + IF_NEED_REFILL_AND_EOF_BREAK(0); + + /* ----- + * get next character + * Note: we do not change c so if it isn't \., we can fall + * through and continue processing for file encoding. + * ----- + */ + c2 = copy_raw_buf[raw_buf_ptr]; + + if (c2 == '.') + { + raw_buf_ptr++; /* consume the '.'
*/ + + /* + * Note: if we loop back for more data here, it does not + * matter that the CSV state change checks are re-executed; we + * will come back here with no important state changed. + */ + if (cstate->eol_type == EOL_CRNL) + { + /* With \r\n line endings, the '.' must be followed by \r */ + IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); + /* if hit_eof, c2 will become '\0' */ + c2 = copy_raw_buf[raw_buf_ptr++]; + + if (c2 == '\n') + { + if (!cstate->opts.csv_mode) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("end-of-copy marker does not match previous newline style"))); + else + NO_END_OF_COPY_GOTO; + } + else if (c2 != '\r') + { + if (!cstate->opts.csv_mode) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("end-of-copy marker corrupt"))); + else + NO_END_OF_COPY_GOTO; + } + } + + /* Get the character that should end the marker line */ + IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); + /* if hit_eof, c2 will become '\0' */ + c2 = copy_raw_buf[raw_buf_ptr++]; + + if (c2 != '\r' && c2 != '\n') + { + if (!cstate->opts.csv_mode) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("end-of-copy marker corrupt"))); + else + NO_END_OF_COPY_GOTO; + } + + if ((cstate->eol_type == EOL_NL && c2 != '\n') || + (cstate->eol_type == EOL_CRNL && c2 != '\n') || + (cstate->eol_type == EOL_CR && c2 != '\r')) + { + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("end-of-copy marker does not match previous newline style"))); + } + + /* + * Transfer only the data before the \. into line_buf, then + * discard the data and the \. sequence. + */ + if (prev_raw_ptr > cstate->raw_buf_index) + appendBinaryStringInfo(&cstate->line_buf, + cstate->raw_buf + cstate->raw_buf_index, + prev_raw_ptr - cstate->raw_buf_index); + cstate->raw_buf_index = raw_buf_ptr; + result = true; /* report EOF */ + break; + } + else if (!cstate->opts.csv_mode) + + /* + * If we are here, it means we found a backslash followed by + * something other than a period.
In non-CSV mode, anything + * after a backslash is special, so we skip over that second + * character too. If we didn't do that, \\. would be + * considered an end-of-copy marker, while in non-CSV mode it is a + * literal backslash followed by a period. In CSV mode, + * backslashes are not special, so we want to process the + * character after the backslash just like a normal character, + * so we don't increment in those cases. + */ + raw_buf_ptr++; + } + + /* + * This label is for CSV cases where \. appears at the start of a + * line, but there is more text after it, meaning it was a data value. + * We are more strict for \. in CSV mode because \. could be a data + * value, while in non-CSV mode, \. cannot be a data value. + */ +not_end_of_copy: + + /* + * Process all bytes of a multi-byte character as a group. + * + * We only support multi-byte sequences where the first byte has the + * high-bit set, so as an optimization we can avoid this block + * entirely if it is not set. + */ + if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c)) + { + int mblen; + + /* + * It is enough to look at the first byte in all our encodings, to + * get the length. (GB18030 is a bit special, but still works for + * our purposes; see comment in pg_gb18030_mblen()) + */ + mblen_str[0] = c; + mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str); + + IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1); + IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1); + raw_buf_ptr += mblen - 1; + } + first_char_in_line = false; + } /* end of outer loop */ + + /* + * Transfer any still-uncopied data to line_buf. + */ + REFILL_LINEBUF; + + return result; +} + +/* + * Return numeric value (0-15) of a hex digit; callers check isxdigit() first + */ +static int +GetDecimalFromHex(char hex) +{ + if (isdigit((unsigned char) hex)) + return hex - '0'; + else + return tolower((unsigned char) hex) - 'a' + 10; +} + +/* + * Parse the current line into separate attributes (fields), + * performing de-escaping as needed. + * + * The input is in line_buf.
We use attribute_buf to hold the result + * strings. cstate->raw_fields[k] is set to point to the k'th attribute + * string, or NULL when the input matches the null marker string. + * This array is expanded as necessary. + * + * (Note that the caller cannot check for nulls since the returned + * string would be the post-de-escaping equivalent, which may look + * the same as some valid data string.) + * + * delim is the column delimiter string (must be just one byte for now). + * null_print is the null marker string. Note that this is compared to + * the pre-de-escaped input string. + * + * The return value is the number of fields actually read. + */ +static int +CopyReadAttributesText(CopyFromState cstate) +{ + char delimc = cstate->opts.delim[0]; + int fieldno; + char *output_ptr; + char *cur_ptr; + char *line_end_ptr; + + /* + * We need a special case for zero-column tables: check that the input + * line is empty, and return. + */ + if (cstate->max_fields <= 0) + { + if (cstate->line_buf.len != 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + return 0; + } + + resetStringInfo(&cstate->attribute_buf); + + /* + * The de-escaped attributes will certainly not be longer than the input + * data line, so we can just force attribute_buf to be large enough and + * then transfer data without any checks for enough space. We need to do + * it this way because enlarging attribute_buf mid-stream would invalidate + * pointers already stored into cstate->raw_fields[].
+ */ + if (cstate->attribute_buf.maxlen <= cstate->line_buf.len) + enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len); + output_ptr = cstate->attribute_buf.data; + + /* Set input cursor and end-of-line pointer for the loop */ + cur_ptr = cstate->line_buf.data; + line_end_ptr = cstate->line_buf.data + cstate->line_buf.len; + + /* Outer loop iterates over fields */ + fieldno = 0; + for (;;) + { + bool found_delim = false; + char *start_ptr; + char *end_ptr; + int input_len; + bool saw_non_ascii = false; + + /* Make sure raw_fields[] has room for the next field pointer */ + if (fieldno >= cstate->max_fields) + { + cstate->max_fields *= 2; + cstate->raw_fields = + repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *)); + } + + /* Remember start of field on both input and output sides */ + start_ptr = cur_ptr; + cstate->raw_fields[fieldno] = output_ptr; + + /* + * Scan data for field. + * + * Note that in this loop, we are scanning to locate the end of field + * and also speculatively performing de-escaping. Once we find the + * end-of-field, we can match the raw field contents against the null + * marker string. Only after that comparison fails do we know that + * de-escaping is actually the right thing to do; therefore we *must + * not* throw any syntax errors before we've done the null-marker + * check.
+ */ + for (;;) + { + char c; + + end_ptr = cur_ptr; + if (cur_ptr >= line_end_ptr) + break; + c = *cur_ptr++; + if (c == delimc) + { + found_delim = true; + break; + } + if (c == '\\') + { + if (cur_ptr >= line_end_ptr) + break; + c = *cur_ptr++; + switch (c) + { + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + { + /* Handle \013: one to three octal digits */ + int val; + + val = OCTVALUE(c); + if (cur_ptr < line_end_ptr) + { + c = *cur_ptr; + if (ISOCTAL(c)) + { + cur_ptr++; + val = (val << 3) + OCTVALUE(c); + if (cur_ptr < line_end_ptr) + { + c = *cur_ptr; + if (ISOCTAL(c)) + { + cur_ptr++; + val = (val << 3) + OCTVALUE(c); + } + } + } + } + c = val & 0377; + if (c == '\0' || IS_HIGHBIT_SET(c)) + saw_non_ascii = true; + } + break; + case 'x': + /* Handle \x3F: one or two hex digits */ + if (cur_ptr < line_end_ptr) + { + char hexchar = *cur_ptr; + + if (isxdigit((unsigned char) hexchar)) + { + int val = GetDecimalFromHex(hexchar); + + cur_ptr++; + if (cur_ptr < line_end_ptr) + { + hexchar = *cur_ptr; + if (isxdigit((unsigned char) hexchar)) + { + cur_ptr++; + val = (val << 4) + GetDecimalFromHex(hexchar); + } + } + c = val & 0xff; + if (c == '\0' || IS_HIGHBIT_SET(c)) + saw_non_ascii = true; + } + } + break; + case 'b': + c = '\b'; + break; + case 'f': + c = '\f'; + break; + case 'n': + c = '\n'; + break; + case 'r': + c = '\r'; + break; + case 't': + c = '\t'; + break; + case 'v': + c = '\v'; + break; + + /* + * In all other cases, take the char after '\' + * literally + */ + } + } + + /* Add c to output string */ + *output_ptr++ = c; + } + + /* Check whether raw (pre-de-escaping) input matched null marker */ + input_len = end_ptr - start_ptr; + if (input_len == cstate->opts.null_print_len && + strncmp(start_ptr, cstate->opts.null_print, input_len) == 0) + cstate->raw_fields[fieldno] = NULL; + else + { + /* + * At this point we know the field is supposed to contain data.
+ * + * If we de-escaped any non-7-bit-ASCII chars, make sure the + * resulting string is valid data for the db encoding. + */ + if (saw_non_ascii) + { + char *fld = cstate->raw_fields[fieldno]; + + pg_verifymbstr(fld, output_ptr - fld, false); + } + } + + /* Terminate attribute value in output area */ + *output_ptr++ = '\0'; + + fieldno++; + /* Done if we hit EOL instead of a delim */ + if (!found_delim) + break; + } + + /* Clean up state of attribute_buf: len excludes the final terminator */ + output_ptr--; + Assert(*output_ptr == '\0'); + cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data); + + return fieldno; +} + +/* + * Parse the current line into separate attributes (fields), + * performing de-escaping as needed. This has exactly the same API as + * CopyReadAttributesText, except we parse the fields according to + * "standard" (i.e. common) CSV usage. + */ +static int +CopyReadAttributesCSV(CopyFromState cstate) +{ + char delimc = cstate->opts.delim[0]; + char quotec = cstate->opts.quote[0]; + char escapec = cstate->opts.escape[0]; + int fieldno; + char *output_ptr; + char *cur_ptr; + char *line_end_ptr; + + /* + * We need a special case for zero-column tables: check that the input + * line is empty, and return. + */ + if (cstate->max_fields <= 0) + { + if (cstate->line_buf.len != 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + return 0; + } + + resetStringInfo(&cstate->attribute_buf); + + /* + * The de-escaped attributes will certainly not be longer than the input + * data line, so we can just force attribute_buf to be large enough and + * then transfer data without any checks for enough space. We need to do + * it this way because enlarging attribute_buf mid-stream would invalidate + * pointers already stored into cstate->raw_fields[].
+ */ + if (cstate->attribute_buf.maxlen <= cstate->line_buf.len) + enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len); + output_ptr = cstate->attribute_buf.data; + + /* Set input cursor and end-of-line pointer for the loop */ + cur_ptr = cstate->line_buf.data; + line_end_ptr = cstate->line_buf.data + cstate->line_buf.len; + + /* Outer loop iterates over fields */ + fieldno = 0; + for (;;) + { + bool found_delim = false; + bool saw_quote = false; + char *start_ptr; + char *end_ptr; + int input_len; + + /* Make sure raw_fields[] has room for the next field pointer */ + if (fieldno >= cstate->max_fields) + { + cstate->max_fields *= 2; + cstate->raw_fields = + repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *)); + } + + /* Remember start of field on both input and output sides */ + start_ptr = cur_ptr; + cstate->raw_fields[fieldno] = output_ptr; + + /* + * Scan data for field. + * + * The loop starts in "not quote" mode and then toggles between that + * and "in quote" mode. The loop exits normally if it is in "not + * quote" mode and a delimiter or line end is seen.
+ */ + for (;;) + { + char c; + + /* Not in quote */ + for (;;) + { + end_ptr = cur_ptr; + if (cur_ptr >= line_end_ptr) + goto endfield; + c = *cur_ptr++; + /* unquoted field delimiter */ + if (c == delimc) + { + found_delim = true; + goto endfield; + } + /* start of quoted field (or part of field) */ + if (c == quotec) + { + saw_quote = true; + break; + } + /* Add c to output string */ + *output_ptr++ = c; + } + + /* In quote */ + for (;;) + { + end_ptr = cur_ptr; + if (cur_ptr >= line_end_ptr) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unterminated CSV quoted field"))); + + c = *cur_ptr++; + + /* escape within a quoted field */ + if (c == escapec) + { + /* + * Peek at the next char if available, and escape it if it + * is an escape char or a quote char + */ + if (cur_ptr < line_end_ptr) + { + char nextc = *cur_ptr; + + if (nextc == escapec || nextc == quotec) + { + *output_ptr++ = nextc; + cur_ptr++; + continue; + } + } + } + + /* + * End of quoted field. Must do this test after testing for + * escape in case quote char and escape char are the same + * (which is the common case).
+ */ + if (c == quotec) + break; + + /* Add c to output string */ + *output_ptr++ = c; + } + } +endfield: + + /* Terminate attribute value in output area */ + *output_ptr++ = '\0'; + + /* An entirely unquoted field matching the null marker means NULL */ + input_len = end_ptr - start_ptr; + if (!saw_quote && input_len == cstate->opts.null_print_len && + strncmp(start_ptr, cstate->opts.null_print, input_len) == 0) + cstate->raw_fields[fieldno] = NULL; + + fieldno++; + /* Done if we hit EOL instead of a delim */ + if (!found_delim) + break; + } + + /* Clean up state of attribute_buf: len excludes the final terminator */ + output_ptr--; + Assert(*output_ptr == '\0'); + cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data); + + return fieldno; +} + + +/* + * Read a binary attribute: an int32 length word (-1 means NULL) + data + */ +static Datum +CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, + bool *isnull) +{ + int32 fld_size; + Datum result; + + if (!CopyGetInt32(cstate, &fld_size)) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + if (fld_size == -1) + { + *isnull = true; + return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod); + } + if (fld_size < 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid field size"))); + + /* reset attribute_buf to empty, and load raw data in it */ + resetStringInfo(&cstate->attribute_buf); + + enlargeStringInfo(&cstate->attribute_buf, fld_size); + if (CopyReadBinaryData(cstate, cstate->attribute_buf.data, + fld_size) != fld_size) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + + cstate->attribute_buf.len = fld_size; + cstate->attribute_buf.data[fld_size] = '\0'; + + /* Call the column type's binary input converter */ + result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf, + typioparam, typmod); + + /* Trouble if it didn't eat the whole buffer */ + if (cstate->attribute_buf.cursor != cstate->attribute_buf.len) +
ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format"))); + + *isnull = false; + return result; +} diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h new file mode 100644 index 00000000000..cbaeee8c0d4 --- /dev/null +++ b/src/include/commands/copyfrom_internal.h @@ -0,0 +1,170 @@ +/*------------------------------------------------------------------------- + * + * copyfrom_internal.h + * Internal definitions for COPY FROM command. + * + * + * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/commands/copyfrom_internal.h + * + *------------------------------------------------------------------------- + */ +#ifndef COPYFROM_INTERNAL_H +#define COPYFROM_INTERNAL_H + +#include "commands/copy.h" +#include "commands/trigger.h" +#include "executor/execdesc.h" + +/* + * Represents the different source cases we need to worry about at + * the bottom level + */ +typedef enum CopySource +{ + COPY_FILE, /* from file (or a piped program) */ + COPY_OLD_FE, /* from frontend (2.0 protocol) */ + COPY_NEW_FE, /* from frontend (3.0 protocol) */ + COPY_CALLBACK /* from callback function */ +} CopySource; + +/* + * Represents the end-of-line terminator type of the input + */ +typedef enum EolType +{ + EOL_UNKNOWN, /* not determined yet */ + EOL_NL, /* lines end with \n */ + EOL_CR, /* lines end with \r */ + EOL_CRNL /* lines end with \r\n */ +} EolType; + +/* + * Represents the heap insert method to be used during COPY FROM. + */ +typedef enum CopyInsertMethod +{ + CIM_SINGLE, /* use table_tuple_insert or fdw routine */ + CIM_MULTI, /* always use table_multi_insert */ + CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ +} CopyInsertMethod; + +/* + * This struct contains all the state variables used throughout a COPY FROM + * operation. For simplicity, we use the same struct for all variants of COPY, + * even though some fields are used in only some cases.
+ * + * Multi-byte encodings: all supported client-side encodings encode multi-byte + * characters by having the first byte's high bit set. Subsequent bytes of the + * character can have the high bit not set. When scanning data in such an + * encoding to look for a match to a single-byte (ie ASCII) character, we must + * use the full pg_encoding_mblen() machinery to skip over multibyte + * characters, else we might find a false match to a trailing byte. In + * supported server encodings, there is no possibility of a false match, and + * it's faster to make useless comparisons to trailing bytes than it is to + * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true + * when we have to do it the hard way. + */ +typedef struct CopyFromStateData +{ + /* low-level state data */ + CopySource copy_src; /* type of copy source */ + FILE *copy_file; /* used if copy_src == COPY_FILE */ + StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for + * copy_src == COPY_NEW_FE in COPY FROM */ + bool reached_eof; /* true if we read to end of copy data (not + * all copy_src types maintain this) */ + EolType eol_type; /* EOL type of input */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy to */ + QueryDesc *queryDesc; /* executable query to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDIN/STDOUT */ + bool is_program; /* is 'filename' a program to popen?
+ */ + copy_data_source_cb data_source_cb; /* function for reading data */ + + CopyFormatOptions opts; + bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + Node *whereClause; /* WHERE condition (or NULL) */ + + /* these are just for error messages, see CopyFromErrorCallback */ + const char *cur_relname; /* table name for error messages */ + uint64 cur_lineno; /* line number for error messages */ + const char *cur_attname; /* current att for error messages */ + const char *cur_attval; /* current att value for error messages */ + + /* + * Working state for COPY TO/FROM + */ + MemoryContext copycontext; /* per-copy execution context */ + + /* + * Working state for COPY FROM + */ + AttrNumber num_defaults; /* number of entries in defmap/defexprs */ + FmgrInfo *in_functions; /* array of input functions for each attribute */ + Oid *typioparams; /* array of element types for in_functions */ + int *defmap; /* array of default att numbers (0-based) */ + ExprState **defexprs; /* array of default att expressions */ + bool volatile_defexprs; /* is any of defexprs volatile? */ + List *range_table; + ExprState *qualexpr; + + TransitionCaptureState *transition_capture; + + /* + * These variables are used to reduce overhead in COPY FROM. + * + * attribute_buf holds the separated, de-escaped text for each field of + * the current line. The CopyReadAttributes functions return arrays of + * pointers into this buffer. We avoid palloc/pfree overhead by re-using + * the buffer on each cycle. + * + * In binary COPY FROM, attribute_buf holds the binary data for the + * current field, but the usage is otherwise similar. + */ + StringInfoData attribute_buf; + + /* field raw data pointers found by COPY FROM */ + + int max_fields; /* allocated length of raw_fields[] */ + char **raw_fields; /* per-field pointers into attribute_buf, or NULL */ + + /* + * Similarly, line_buf holds the whole input line being processed. The + * input cycle is first to read the whole line into line_buf, convert it + * to server encoding there, and then extract the individual attribute + * fields into attribute_buf.
line_buf is preserved unmodified so that we + * can display it in error messages if appropriate. (In binary mode, + * line_buf is not used.) + */ + StringInfoData line_buf; + bool line_buf_converted; /* converted to server encoding? */ + bool line_buf_valid; /* contains the row being processed? */ + + /* + * Finally, raw_buf holds raw data read from the data source (file or + * client connection). In text mode, CopyReadLine parses this data + * sufficiently to locate line boundaries, then transfers the data to + * line_buf and converts it. In binary mode, CopyReadBinaryData fetches + * appropriate amounts of data from this buffer. In both modes, we + * guarantee that there is a \0 at raw_buf[raw_buf_len]. + */ +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + char *raw_buf; + int raw_buf_index; /* index of next byte to process */ + int raw_buf_len; /* total # of bytes stored */ + /* Shorthand for number of unconsumed bytes available in raw_buf */ +#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) +} CopyFromStateData; + +extern void ReceiveCopyBegin(CopyFromState cstate); +extern void ReceiveCopyBinaryHeader(CopyFromState cstate); + +#endif /* COPYFROM_INTERNAL_H */ -- 2.20.1