From 0f003ee7bc369728fb83c1e5d4aa2ba1304fa451 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Sun, 22 Mar 2026 00:04:22 +0800 Subject: [PATCH v2 2/2] Support COPY FROM with FORMAT JSON Bytes are read into raw_buf and optionally transcoded into input_buf. Instead of CopyReadLine, JSON mode uses CopyReadNextJson to fetch the next row object via a small state machine: - BEFORE_ARRAY: expect '[' or '{' - BEFORE_OBJECT: in concat mode, expect next object or EOF - IN_ARRAY: expect object, comma, or ']' - IN_OBJECT: track brace depth to find the matching '}' - IN_STRING / IN_STRING_ESC: skip braces inside strings - ARRAY_END: after ']', no more rows When a row object closes, copy_json_finalize_linebuf_for_row reshapes line_buf to [row text][unparsed tail] and sets parse_pos to the row length so jsonb_in sees only the current row. NextCopyFromJsonRawFieldsInternal calls jsonb_in on that slice, verifies a JSON object at the root, then looks up each target column by name. Values are converted to C strings via JsonbValueToCstring and stored in attribute_buf, following the same pattern as text/CSV. CopyFromJsonOneRow then runs the standard per-column input functions, so type coercion matches ordinary textual input and the existing COPY machinery for defaults and soft errors applies unchanged. --- doc/src/sgml/ref/copy.sgml | 84 ++- src/backend/commands/copy.c | 14 +- src/backend/commands/copyfrom.c | 69 +++ src/backend/commands/copyfromparse.c | 714 ++++++++++++++++++++++- src/backend/commands/tablecmds.c | 2 +- src/include/commands/copyfrom_internal.h | 32 + src/test/regress/expected/copy.out | 82 ++- src/test/regress/sql/copy.sql | 56 +- src/tools/pgindent/typedefs.list | 1 + 9 files changed, 1030 insertions(+), 24 deletions(-) diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index 4706c9a4410..0d7d843e316 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -234,10 +234,6 @@ COPY { table_name [ ( text. See below for details. 
- - The json option is allowed only in - COPY TO. - In JSON format, SQL NULL values are output as @@ -390,7 +386,8 @@ COPY (SELECT j FROM (VALUES ('null'::json), (NULL::json)) v(j)) Force output of square brackets as array decorations at the beginning and end of output, and commas between the rows. It is allowed only in COPY TO, and only when using - json format. The default is + json format. It is not supported for + COPY FROM. The default is false. @@ -1120,6 +1117,73 @@ versions of PostgreSQL. + + + JSON Format + + + <command>COPY TO</command> + + + By default, COPY TO writes one JSON object per line + (record), in column order, using the column names as object keys. + With FORCE_ARRAY, the whole output is wrapped as a + single JSON array: an opening [, objects separated + by commas (with a line break after the opening bracket, matching + COPY TO row boundaries), and a closing + ]. + + + + + <command>COPY FROM</command> + + + COPY FROM with FORMAT JSON accepts + either of these shapes. + + + + + + A JSON array whose elements are objects, one + object per table row, e.g. + [{row1},{row2}]. + Standard JSON rules apply: a comma is required between array elements, + the array must be closed with ], and a trailing + comma after the last element is not allowed. + + + + + A stream of JSON objects concatenated back-to-back + (optionally separated by white space), with no + comma between objects, e.g. + {row1}{row2}. + This form is recognized when the document begins with + {. + + + + + + For each row object, keys are matched to the target columns by name. + Keys that do not correspond to a column in the COPY + column list are ignored. Missing keys are treated as + NULL for the corresponding column. When no column + list is given, column names are those of the table (excluding generated + columns). + + + + Values are parsed with the same input semantics as for JSON/JSONB: + each field is converted from its JSON representation to the column's + data type. 
Options that only apply to text or CSV line parsing + (such as DELIMITER, NULL, + HEADER, ON_ERROR row skipping) are not used with JSON format. + + + @@ -1148,6 +1212,16 @@ The output is as follows: + + To load rows from a JSON file (for example one produced by + COPY TO with FORMAT JSON): + +COPY mytable FROM '/path/to/rows.json' (FORMAT JSON); + + The file may be either a JSON array of row objects or a concatenation + of row objects; see . + + To copy data from a file into the country table: diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index df3d1191151..86772cd1fb0 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -969,17 +969,19 @@ ProcessCopyOptions(ParseState *pstate, errmsg("COPY %s cannot be used with %s", "FREEZE", "COPY TO"))); - /* Check json format */ - if (opts_out->format == COPY_FORMAT_JSON && is_from) - ereport(ERROR, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY %s is not supported for %s", "FORMAT JSON", "COPY FROM")); - if (opts_out->format != COPY_FORMAT_JSON && opts_out->force_array) ereport(ERROR, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY %s can only be used with JSON mode", "FORCE_ARRAY")); + if (is_from && opts_out->format == COPY_FORMAT_JSON && opts_out->force_array) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + /*- translator: first %s is the name of a COPY option, e.g. ON_ERROR, + second %s is a COPY with direction, e.g. 
COPY TO */ + errmsg("COPY %s cannot be used with %s", "FORCE_ARRAY", + "COPY FROM"))); + if (opts_out->default_print) { if (!is_from) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index aa253b587aa..42d6eaa38ce 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -52,6 +52,7 @@ #include "utils/rel.h" #include "utils/snapmgr.h" #include "utils/typcache.h" +#include "utils/fmgrprotos.h" /* * No more than this many tuples per CopyMultiInsertBuffer @@ -153,6 +154,19 @@ static const CopyFromRoutine CopyFromRoutineBinary = { .CopyFromEnd = CopyFromBinaryEnd, }; +/* JSON format */ +static void CopyFromJsonInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, + Oid *typioparam); +static void CopyFromJsonStart(CopyFromState cstate, TupleDesc tupDesc); +static void CopyFromJsonEnd(CopyFromState cstate); + +static const CopyFromRoutine CopyFromRoutineJson = { + .CopyFromInFunc = CopyFromJsonInFunc, + .CopyFromStart = CopyFromJsonStart, + .CopyFromOneRow = CopyFromJsonOneRow, + .CopyFromEnd = CopyFromJsonEnd, +}; + /* Return a COPY FROM routine for the given options */ static const CopyFromRoutine * CopyFromGetRoutine(const CopyFormatOptions *opts) @@ -161,6 +175,8 @@ CopyFromGetRoutine(const CopyFormatOptions *opts) return &CopyFromRoutineCSV; else if (opts->format == COPY_FORMAT_BINARY) return &CopyFromRoutineBinary; + else if (opts->format == COPY_FORMAT_JSON) + return &CopyFromRoutineJson; /* default is text */ return &CopyFromRoutineText; @@ -247,6 +263,59 @@ CopyFromBinaryEnd(CopyFromState cstate) /* nothing to do */ } +/* Implementation of the infunc callback for JSON format */ +static void +CopyFromJsonInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, + Oid *typioparam) +{ + Oid func_oid; + + getTypeInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* Implementation of the start callback for JSON format */ +static void +CopyFromJsonStart(CopyFromState cstate, 
TupleDesc tupDesc) +{ + CopyFromJsonState *json_state; + + /* + * Set up input_buf for encoding conversion, same as text format. + */ + if (cstate->need_transcoding) + { + cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); + cstate->input_buf_index = cstate->input_buf_len = 0; + } + else + cstate->input_buf = cstate->raw_buf; + cstate->input_reached_eof = false; + + initStringInfo(&cstate->line_buf); + + /* Store state for CopyFromJsonOneRow (JSON scan uses line_buf) */ + json_state = palloc0_object(CopyFromJsonState); + /* Accept [...] or auto-detect concatenated objects {...}{...} */ + json_state->parse_state = JSON_PARSE_BEFORE_ARRAY; + json_state->row_text_start = -1; + json_state->row_text_end = -1; + cstate->format_private = json_state; + + /* + * Create workspace for raw_fields (used by error reporting). + */ + cstate->max_fields = list_length(cstate->attnumlist); + cstate->raw_fields = (char **) palloc(cstate->max_fields * sizeof(char *)); +} + +/* Implementation of the end callback for JSON format */ +static void +CopyFromJsonEnd(CopyFromState cstate) +{ + /* format_private (CopyFromJsonState) is freed with copycontext */ +} + /* * error context callback for COPY FROM * diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 65fd5a0ab4f..0e22ab77004 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -42,6 +42,13 @@ * but 'attribute_buf' is used as a temporary buffer to hold one attribute's * data when it's passed the receive function. * + * In JSON mode, steps 1--2 are the same as text mode. CopyReadNextJson() then + * scans line_buf (refilled via CopyLoadInputBuf, then drained into line_buf) + * for the next top-level JSON + * object, like CopyReadLine() gathers one text line. After each successful + * read, line_buf is [row object text][not-yet-parsed suffix]; jsonb_in must + * see only the row prefix (length in parse_pos). 
+ * * 'raw_buf' is always 64 kB in size (RAW_BUF_SIZE). 'input_buf' is also * 64 kB (INPUT_BUF_SIZE), if encoding conversion is required. 'line_buf' * and 'attribute_buf' are expanded on demand, to hold the longest line @@ -75,8 +82,10 @@ #include "port/pg_bswap.h" #include "port/simd.h" #include "utils/builtins.h" +#include "utils/fmgrprotos.h" #include "utils/rel.h" #include "utils/wait_event.h" +#include "utils/jsonb.h" #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7')) #define OCTVALUE(c) ((c) - '0') @@ -160,6 +169,9 @@ static pg_attribute_always_inline bool NextCopyFromRawFieldsInternal(CopyFromSta char ***fields, int *nfields, bool is_csv); +static bool NextCopyFromJsonRawFieldsInternal(CopyFromState cstate, + char ***fields, int *nfields); +static bool CopyReadNextJson(CopyFromState cstate); /* Low-level communications functions */ @@ -643,6 +655,410 @@ CopyLoadRawBuf(CopyFromState cstate) cstate->raw_reached_eof = true; } +/* + * COPY FROM JSON: incremental scanner over line_buf. + * Each completed row is left at the start of line_buf, followed by the + * not-yet-parsed suffix (like one text line plus read-ahead in CopyReadLine). + * + * We support: + * - A single JSON array of objects: [ {...}, {...} ] + * - Concatenated objects (auto-detect when input starts with '{'): {...}{...} + * + * States: + * BEFORE_ARRAY — start of input; expect '[' or '{'. + * BEFORE_OBJECT — after an object in concatenated-object mode; expect '{'. + * IN_ARRAY — inside [...]; expect object, separators, or ']'. + * Another '{' after an object requires a preceding ',' (JSON). + * IN_OBJECT — brace depth count to find the matching '}' for one row; + * strings switch to IN_STRING so braces inside strings are ignored. + * IN_STRING / IN_STRING_ESC — minimal string lexer for the above. + * ARRAY_END — saw closing ']'; no more rows from this document. 
+ * + * obj_start (byte offset in line_buf) is meaningful only in IN_OBJECT; + * we avoid compacting line_buf while in IN_OBJECT/IN_STRING* so it stays valid. + * + * The state machine is implemented in CopyReadNextJson(). + */ + +/* + * If the scan cursor is past buffered text, compact prefix (when safe) and + * read more via CopyLoadInputBuf (then append new input_buf bytes into + * line_buf). When line_buf is still empty afterward, + * distinguish clean EOF from truncation errors. + * + * Returns true if there is more input to scan in line_buf; false if there is + * no next JSON row (CopyReadNextJson should report EOF / end of array). + */ +static bool +copy_json_refill_if_exhausted(CopyFromState cstate, int *obj_start) +{ + CopyFromJsonState *json_state = (CopyFromJsonState *) cstate->format_private; + StringInfo line_buf = &cstate->line_buf; + + if (json_state->parse_pos < line_buf->len) + return true; + + /* + * Drop a fully consumed prefix to bound line_buf growth. Not while + * inside an object/string: obj_start and parse_pos must stay consistent. + */ + if (json_state->parse_pos > 0 && + json_state->parse_state != JSON_PARSE_IN_OBJECT && + json_state->parse_state != JSON_PARSE_IN_STRING && + json_state->parse_state != JSON_PARSE_IN_STRING_ESC) + { + memmove(line_buf->data, line_buf->data + json_state->parse_pos, + line_buf->len - json_state->parse_pos); + line_buf->len -= json_state->parse_pos; + line_buf->data[line_buf->len] = '\0'; + *obj_start = (*obj_start >= 0) ? (*obj_start - json_state->parse_pos) : -1; + json_state->parse_pos = 0; + } + + /* + * Same refill primitive as text COPY: fill input_buf, then move decoded + * bytes into line_buf. Always drain the full input_buf chunk (unlike + * CopyReadLine, which may leave a prefix in input_buf). 
+ */ + { + int nbytes = INPUT_BUF_BYTES(cstate); + + CopyLoadInputBuf(cstate); + + if (INPUT_BUF_BYTES(cstate) > nbytes) + { + appendBinaryStringInfo(line_buf, + cstate->input_buf + cstate->input_buf_index, + INPUT_BUF_BYTES(cstate)); + appendStringInfoChar(line_buf, '\0'); + line_buf->len--; /* don't count NUL */ + cstate->input_buf_index = cstate->input_buf_len; + if (cstate->raw_buf == cstate->input_buf) + cstate->raw_buf_index = cstate->input_buf_index; + } + } + + if (line_buf->len > 0) + return true; + + if (json_state->parse_state == JSON_PARSE_IN_OBJECT || + json_state->parse_state == JSON_PARSE_IN_STRING || + json_state->parse_state == JSON_PARSE_IN_STRING_ESC) + { + if (cstate->cur_lineno == 0) + cstate->cur_lineno = 1; + cstate->line_buf_valid = false; + ereport(ERROR, + errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected end of input in COPY JSON")); + } + + /* + * Still inside [...] (e.g. missing closing "]", or a trailing comma after + * the last element with no "]" following). + */ + if (json_state->array_mode && + json_state->parse_state == JSON_PARSE_IN_ARRAY) + { + if (cstate->cur_lineno == 0) + cstate->cur_lineno = 1; + cstate->line_buf_valid = false; + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid input format for COPY JSON"), + errdetail("JSON array input was not closed with \"]\"."))); + } + + return false; +} + +/* + * After a row object completes in CopyReadNextJson(), reshape line_buf so + * jsonb_in sees only the current row while the scanner keeps read-ahead bytes + * for the next call. + * + * Before finalize, line_buf is one UTF-8 chunk; row_text_start (rs) and + * row_text_end (re) mark the completed object text as [rs, re). parse_pos + * has been advanced past the closing '}' and any following comma/whitespace + * (see CopyReadNextJson, JSON_PARSE_IN_OBJECT); call that index tail_from. + * Unparsed bytes are [tail_from, len). + * + * ... already scanned ... [ rs ... object JSON ... 
re )[ tail_from ... len ) + * ^-- row for jsonb_in --^ ^-- tail (next row) --^ + * + * After finalize, the same bytes are contiguous at the start of line_buf: + * + * [ 0 ... row byte count )[ row byte count ... len ) + * ^-- row (length = re - rs) ^-- tail + * + * We set parse_pos = re - rs (row length). NextCopyFromJsonRawFieldsInternal + * passes pnstrdup(data, parse_pos) to jsonb_in; CopyReadNextJson continues + * scanning from data[parse_pos]. + * + * We append a trailing NUL then decrement len so StringInfo holds row+tail + * bytes only; data[len] is '\0'. + * + * line_buf_valid is not set here: the buffer still holds scan tail, so + * NextCopyFromJsonRawFieldsInternal clears it before errors use Copy context. + */ +static void +copy_json_finalize_linebuf_for_row(CopyFromState cstate, StringInfo line_buf) +{ + CopyFromJsonState *st = (CopyFromJsonState *) cstate->format_private; + int rs = st->row_text_start; + int re = st->row_text_end; + int tail_from = st->parse_pos; + int tail_len; + char *rowdup; + char *taildup = NULL; + + Assert(rs >= 0 && re >= rs && re <= line_buf->len); + Assert(tail_from >= re && tail_from <= line_buf->len); + + tail_len = line_buf->len - tail_from; + rowdup = pnstrdup(line_buf->data + rs, re - rs); + if (tail_len > 0) + taildup = pnstrdup(line_buf->data + tail_from, tail_len); + + resetStringInfo(line_buf); + appendBinaryStringInfo(line_buf, rowdup, re - rs); + pfree(rowdup); + if (tail_len > 0) + { + appendBinaryStringInfo(line_buf, taildup, tail_len); + pfree(taildup); + } + + st->parse_pos = re - rs; + appendStringInfoChar(line_buf, '\0'); + line_buf->len--; + + st->row_text_start = -1; + st->row_text_end = -1; +} + +/* + * Read the next JSON row from the input pipeline, analogous to CopyReadLine() + * for text format: on success, line_buf is [row object][unparsed tail]. + * Does not call jsonb_in(). + * + * Returns true if there is no next object (EOF / end of array). 
+ * + * Each iteration consumes one byte (c) from line_buf, then runs the state + * machine. + */ +static bool +CopyReadNextJson(CopyFromState cstate) +{ + CopyFromJsonState *json_state = (CopyFromJsonState *) cstate->format_private; + StringInfo line_buf = &cstate->line_buf; + int obj_start = -1; + + for (;;) + { + if (!copy_json_refill_if_exhausted(cstate, &obj_start)) + { + cstate->line_buf_valid = false; + return true; + } + + while (json_state->parse_pos < line_buf->len) + { + const char *p = line_buf->data + json_state->parse_pos; + unsigned char c = (unsigned char) *p++; + + json_state->parse_pos = p - line_buf->data; + + switch (json_state->parse_state) + { + case JSON_PARSE_BEFORE_ARRAY: + if (c == '[') + { + json_state->parse_state = JSON_PARSE_IN_ARRAY; + json_state->array_mode = true; + continue; + } + if (c == '{') + { + /* Auto-detect concatenated objects {...}{...}. */ + json_state->parse_state = JSON_PARSE_IN_OBJECT; + json_state->object_depth = 1; + obj_start = (p - 1) - line_buf->data; + continue; + } + if (isspace(c)) + continue; + if (cstate->cur_lineno == 0) + cstate->cur_lineno = 1; + cstate->line_buf_valid = false; + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid input format for COPY JSON"), + errdetail("Document must begin with \"[\" or \"{\"."))); + pg_unreachable(); + + case JSON_PARSE_BEFORE_OBJECT: + if (c == '{') + { + json_state->parse_state = JSON_PARSE_IN_OBJECT; + json_state->object_depth = 1; + obj_start = (p - 1) - line_buf->data; + continue; + } + if (isspace(c)) + continue; + if (cstate->cur_lineno == 0) + cstate->cur_lineno = 1; + cstate->line_buf_valid = false; + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("COPY JSON, line %" PRIu64 ": invalid input format", + cstate->cur_lineno), + errdetail("Expected \"{\" to start the next row object."))); + pg_unreachable(); + + case JSON_PARSE_IN_ARRAY: + if (c == ']') + { + json_state->parse_state = JSON_PARSE_ARRAY_END; + 
cstate->line_buf_valid = false; + return true; + } + if (c == '{') + { + json_state->parse_state = JSON_PARSE_IN_OBJECT; + json_state->object_depth = 1; + obj_start = (p - 1) - line_buf->data; + continue; + } + if (isspace(c)) + continue; + if (cstate->cur_lineno == 0) + cstate->cur_lineno = 1; + cstate->line_buf_valid = false; + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("COPY JSON, line %" PRIu64 ": each array element must be a JSON object", + cstate->cur_lineno))); + pg_unreachable(); + + case JSON_PARSE_IN_OBJECT: + switch (c) + { + case '{': + json_state->object_depth++; + break; + case '}': + json_state->object_depth--; + if (json_state->object_depth == 0) + { + int obj_end = json_state->parse_pos; + + json_state->row_text_start = obj_start; + json_state->row_text_end = obj_end; + + json_state->parse_state = (json_state->array_mode) + ? JSON_PARSE_IN_ARRAY : JSON_PARSE_BEFORE_OBJECT; + + /* + * Eat spaces + */ + while (obj_end < line_buf->len && + isspace(line_buf->data[obj_end])) + obj_end++; + + /* + * In array mode, JSON requires a comma + * between elements. The next '{' is normally + * consumed in JSON_PARSE_IN_ARRAY; reject + * "}{" with only optional whitespace between. 
+ */ + if (!json_state->array_mode) + { + if (obj_end < line_buf->len && + line_buf->data[obj_end] == ',') + { + if (cstate->cur_lineno == 0) + cstate->cur_lineno = 1; + cstate->line_buf_valid = false; + ereport(ERROR, + errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("COPY JSON, line %" PRIu64 ": invalid input format", + cstate->cur_lineno), + errdetail("Cannot use a comma between concatenated JSON objects; use a JSON array.")); + pg_unreachable(); + } + } + else + { + if (obj_end < line_buf->len && + line_buf->data[obj_end] == '{') + { + if (cstate->cur_lineno == 0) + cstate->cur_lineno = 1; + cstate->line_buf_valid = false; + ereport(ERROR, + errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("COPY JSON, line %" PRIu64 ": invalid input format", + cstate->cur_lineno), + errdetail("Expected \",\" between array elements.")); + pg_unreachable(); + } + + if (obj_end < line_buf->len && + line_buf->data[obj_end] == ',') + { + obj_end++; + while (obj_end < line_buf->len && + isspace(line_buf->data[obj_end])) + obj_end++; + } + } + json_state->parse_pos = obj_end; + + copy_json_finalize_linebuf_for_row(cstate, line_buf); + return false; + } + break; + case '[': + json_state->object_depth++; + break; + case ']': + json_state->object_depth--; + break; + case '"': + json_state->parse_state = JSON_PARSE_IN_STRING; + break; + case '\\': + cstate->line_buf_valid = false; + ereport(ERROR, + errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid input syntax for type json")); + break; + default: + break; + } + break; + + case JSON_PARSE_IN_STRING: + if (c == '\\') + json_state->parse_state = JSON_PARSE_IN_STRING_ESC; + else if (c == '"') + json_state->parse_state = JSON_PARSE_IN_OBJECT; + break; + + case JSON_PARSE_IN_STRING_ESC: + json_state->parse_state = JSON_PARSE_IN_STRING; + break; + + case JSON_PARSE_ARRAY_END: + cstate->line_buf_valid = false; + return true; + } + } + } +} + /* * CopyLoadInputBuf loads some more data into input_buf * @@ -786,7 +1202,6 @@ 
NextCopyFromRawFieldsInternal(CopyFromState cstate, char ***fields, int *nfields /* on input check that the header line is correct if needed */ if (cstate->cur_lineno == 0 && cstate->opts.header_line != COPY_HEADER_FALSE) { - ListCell *cur; TupleDesc tupDesc; int lines_to_skip = cstate->opts.header_line; @@ -819,9 +1234,8 @@ NextCopyFromRawFieldsInternal(CopyFromState cstate, char ***fields, int *nfields fldct, list_length(cstate->attnumlist)))); fldnum = 0; - foreach(cur, cstate->attnumlist) + foreach_int(attnum, cstate->attnumlist) { - int attnum = lfirst_int(cur); char *colName; Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); @@ -959,7 +1373,6 @@ CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext, Oid *typioparams = cstate->typioparams; ExprState **defexprs = cstate->defexprs; char **field_strings; - ListCell *cur; int fldct; int fieldno; char *string; @@ -981,9 +1394,8 @@ CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext, fieldno = 0; /* Loop to read the user attributes on the line. 
*/ - foreach(cur, cstate->attnumlist) + foreach_int(attnum, cstate->attnumlist) { - int attnum = lfirst_int(cur); int m = attnum - 1; Form_pg_attribute att = TupleDescAttr(tupDesc, m); @@ -1169,7 +1581,6 @@ CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, FmgrInfo *in_functions = cstate->in_functions; Oid *typioparams = cstate->typioparams; int16 fld_count; - ListCell *cur; tupDesc = RelationGetDescr(cstate->rel); attr_count = list_length(cstate->attnumlist); @@ -1207,9 +1618,8 @@ CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, errmsg("row field count is %d, expected %d", fld_count, attr_count))); - foreach(cur, cstate->attnumlist) + foreach_int(attnum, cstate->attnumlist) { - int attnum = lfirst_int(cur); int m = attnum - 1; Form_pg_attribute att = TupleDescAttr(tupDesc, m); @@ -1225,6 +1635,292 @@ CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, return true; } +/* + * Convert JsonbValue to cstring for input function. Returns NULL for jbvNull. + * Caller must pfree the result (when not NULL). + */ +static char * +JsonbValueToCstring(JsonbValue *v) +{ + switch (v->type) + { + case jbvNull: + return NULL; + case jbvString: + return pnstrdup(v->val.string.val, v->val.string.len); + case jbvNumeric: + return DatumGetCString(DirectFunctionCall1(numeric_out, + PointerGetDatum(v->val.numeric))); + case jbvBool: + return pstrdup(v->val.boolean ? "t" : "f"); + case jbvBinary: + return JsonbToCString(NULL, v->val.binary.data, v->val.binary.len); + default: + elog(ERROR, "unrecognized jsonb type: %d", (int) v->type); + return NULL; + } +} + +/* + * Read the next JSON object (CopyReadNextJson, then jsonb_in on the row prefix + * of line_buf; parse_pos is row length, tail must not be passed to jsonb_in) + * and extract raw field strings by column name. 
+ * + * Mimics NextCopyFromRawFieldsInternal for JSON: populates raw_fields with + * string values for each column in attnumlist order (by matching object keys + * to attname). Values are stored in attribute_buf. NULL for missing keys + * or null values. + * + * Returns false if no more objects. On success, *fields and *nfields are set. + */ +static bool +NextCopyFromJsonRawFieldsInternal(CopyFromState cstate, char ***fields, int *nfields) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + CopyFromJsonState *json_state = (CopyFromJsonState *) cstate->format_private; + Jsonb *jb; + JsonbValue vbuf; + int fieldno; + char *rowtxt; + + Assert(cstate->opts.format == COPY_FORMAT_JSON); + + if (CopyReadNextJson(cstate)) + return false; + + /* + * line_buf is [row][scan tail]; only the row prefix is meaningful as a + * "line" for errors. Clear before jsonb_in so constraint/input errors + * report COPY context without the tail (e.g. closing ']'). + */ + cstate->line_buf_valid = false; + + rowtxt = pnstrdup(cstate->line_buf.data, json_state->parse_pos); + jb = DatumGetJsonbP(DirectFunctionCall1(jsonb_in, CStringGetDatum(rowtxt))); + pfree(rowtxt); + + if (!JB_ROOT_IS_OBJECT(jb)) + { + if (cstate->cur_lineno == 0) + cstate->cur_lineno = 1; + ereport(ERROR, + errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("COPY JSON, line %" PRIu64 ": each array element must be a JSON object", + cstate->cur_lineno)); + } + + cstate->cur_lineno++; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + /* Ensure we have enough raw_fields slots (JSON has exactly attr_count) */ + while (attr_count > cstate->max_fields) + { + cstate->max_fields *= 2; + cstate->raw_fields = + repalloc_array(cstate->raw_fields, char *, cstate->max_fields); + } + + resetStringInfo(&cstate->attribute_buf); + + /* + * Extract values for each column by name, store in attribute_buf / + * raw_fields + */ + fieldno = 0; + foreach_int(attnum, cstate->attnumlist) + { + 
Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); + const char *attname = NameStr(att->attname); + JsonbValue *val; + char *string; + + val = getKeyJsonValueFromContainer(&jb->root, + attname, strlen(attname), + &vbuf); + + if (val == NULL || val->type == jbvNull) + { + cstate->raw_fields[fieldno] = NULL; + } + else + { + string = JsonbValueToCstring(val); + if (string != NULL) + { + size_t len = strlen(string) + 1; + + /* + * Ensure space so append won't realloc and invalidate + * pointers + */ + if (cstate->attribute_buf.maxlen - cstate->attribute_buf.len < (int) len) + enlargeStringInfo(&cstate->attribute_buf, (int) len); + + cstate->raw_fields[fieldno] = cstate->attribute_buf.data + cstate->attribute_buf.len; + appendBinaryStringInfo(&cstate->attribute_buf, string, len); + pfree(string); + } + else + cstate->raw_fields[fieldno] = NULL; + } + fieldno++; + } + + pfree(jb); + + *fields = cstate->raw_fields; + *nfields = attr_count; + return true; +} + +/* Implementation of the per-row callback for JSON format */ +bool +CopyFromJsonOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, + bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + ExprState **defexprs = cstate->defexprs; + char **field_strings; + int fldct; + int fieldno; + char *string; + bool current_row_erroneous = false; + + /* read raw fields from the next JSON object (by column name) */ + if (!NextCopyFromJsonRawFieldsInternal(cstate, &field_strings, &fldct)) + return false; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + fieldno = 0; + + /* Loop to convert field strings to Datums for each column */ + foreach_int(attnum, cstate->attnumlist) + { + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + Assert(fieldno < fldct); + string = field_strings[fieldno++]; + + if (cstate->convert_select_flags && + 
!cstate->convert_select_flags[m]) + { + /* ignore input field, leaving column as NULL */ + continue; + } + + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; + + if (string != NULL) + nulls[m] = false; + + if (cstate->defaults[m]) + { + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); + + values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); + } + else if (!InputFunctionCallSafe(&in_functions[m], + string, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); + + if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE) + cstate->num_errors++; + else if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL) + { + cstate->escontext->error_occurred = false; + + Assert(cstate->domain_with_constraint != NULL); + + if (!cstate->domain_with_constraint[m] || + InputFunctionCallSafe(&in_functions[m], + NULL, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + nulls[m] = true; + values[m] = (Datum) 0; + } + else + ereport(ERROR, + errcode(ERRCODE_NOT_NULL_VIOLATION), + errmsg("domain %s does not allow null values", + format_type_be(typioparams[m])), + errdetail("ON_ERROR SET_NULL cannot be applied because column \"%s\" (domain %s) does not accept null values.", + cstate->cur_attname, + format_type_be(typioparams[m])), + errdatatype(typioparams[m])); + + if (!current_row_erroneous) + { + current_row_erroneous = true; + cstate->num_errors++; + } + } + + if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) + { + Assert(!cstate->relname_only); + cstate->relname_only = true; + + if (cstate->cur_attval) + { + char *attval = CopyLimitPrintoutLength(cstate->cur_attval); + + if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE) + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %" PRIu64 " for column \"%s\": \"%s\"", + cstate->cur_lineno, + cstate->cur_attname, + 
attval)); + else if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL) + ereport(NOTICE, + errmsg("setting to null due to data type incompatibility at line %" PRIu64 " for column \"%s\": \"%s\"", + cstate->cur_lineno, + cstate->cur_attname, + attval)); + pfree(attval); + } + else if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE) + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %" PRIu64 " for column \"%s\": null input", + cstate->cur_lineno, + cstate->cur_attname)); + cstate->relname_only = false; + } + + if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE) + return true; + else if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL) + continue; + } + + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } + + Assert(fieldno == attr_count); + + return true; +} + /* * Read the next input line and stash it in line_buf. * diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 219f604df7b..76447a1236a 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -20148,7 +20148,7 @@ ComputePartitionAttrs(ParseState *pstate, Relation rel, List *partParams, AttrNu * SET EXPRESSION would need to check whether the column is * used in partition keys). Seems safer to prohibit for now. */ - if (TupleDescAttr(RelationGetDescr(rel), attno - 1)->attgenerated) + if (TupleDescCompactAttr(RelationGetDescr(rel), attno - 1)->attgenerated) ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), errmsg("cannot use generated column in partition key"), diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 9d3e244ee55..651735ee55a 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -17,6 +17,33 @@ #include "commands/copy.h" #include "commands/trigger.h" #include "nodes/miscnodes.h" +#include "utils/jsonb.h" + +/* + * State for COPY FROM JSON format. 
line_buf holds decoded UTF-8: while
+ * scanning, parse_pos advances; after each row, line_buf is [row object][tail]
+ * and parse_pos is the row length (tail starts there) for jsonb_in.
+ */
+typedef enum JsonParseState
+{
+	JSON_PARSE_BEFORE_ARRAY,	/* skip whitespace, expect '[' (array mode) or
+								 * '{' (concatenated-objects mode) */
+	JSON_PARSE_BEFORE_OBJECT,	/* after {...} row when input is {...}{...} form */
+	JSON_PARSE_IN_ARRAY,		/* skip whitespace/comma, expect '{' or ']' */
+	JSON_PARSE_IN_OBJECT,		/* inside {...}, track depth to find matching '}' */
+	JSON_PARSE_IN_STRING,		/* inside "...", skip until unescaped '"' */
+	JSON_PARSE_IN_STRING_ESC,	/* saw '\' in string, consume escape sequence */
+	JSON_PARSE_ARRAY_END		/* saw ']', no more rows */
+} JsonParseState;
+
+typedef struct CopyFromJsonState
+{
+	JsonParseState parse_state;
+	int			object_depth;	/* brace/bracket depth: 1 = in target object */
+	bool		array_mode;		/* have we seen the opening '[' */
+	int			parse_pos;		/* scan cursor in line_buf; after row = row len */
+	int			row_text_start; /* set while completing a row; else -1 */
+	int			row_text_end;	/* byte offset just past row's closing '}' */
+} CopyFromJsonState;
 
 /*
  * Represents the different source cases we need to worry about at
@@ -189,6 +216,9 @@ typedef struct CopyFromStateData
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 
 	uint64		bytes_processed;	/* number of bytes processed so far */
+
+	/* Format-specific private data */
+	void	   *format_private;
 } CopyFromStateData;
 
 extern void ReceiveCopyBegin(CopyFromState cstate);
@@ -201,5 +231,7 @@ extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext,
 							  Datum *values, bool *nulls);
 extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
 								 Datum *values, bool *nulls);
+extern bool CopyFromJsonOneRow(CopyFromState cstate, ExprContext *econtext,
+							   Datum *values, bool *nulls);
 
 #endif							/* COPYFROM_INTERNAL_H */
diff --git a/src/test/regress/expected/copy.out 
b/src/test/regress/expected/copy.out index 1714faab39c..aae2f0ffa44 100644 --- a/src/test/regress/expected/copy.out +++ b/src/test/regress/expected/copy.out @@ -135,9 +135,87 @@ LINE 1: copy copytest to stdout (format json, on_error ignore); ^ copy copytest to stdout (format json, reject_limit 1); ERROR: COPY REJECT_LIMIT requires ON_ERROR to be set to IGNORE -copy copytest from stdin(format json); -ERROR: COPY FORMAT JSON is not supported for COPY FROM -- all of the above should yield error +-- COPY FROM JSON: each array element is a row, object keys match column names +create temp table copytest_from_json (like copytest); +copy copytest_from_json (style, test) from stdin (format json); +select * from copytest_from_json order by style; + style | test | filler +---------+----------+-------- + DOS | abc\r +| + | def | + Mac | abc\rdef | + Unix | abc +| + | def | + esc\ape | a\r\\r\ +| + | \nb | +(4 rows) + +-- Round trip: COPY TO JSON file, then COPY FROM JSON file +\set copy_json_rt :abs_builddir '/results/copytest_roundtrip.json' +truncate copytest2; +copy copytest to :'copy_json_rt' (format json); +copy copytest2 from :'copy_json_rt' (format json); +select * from copytest except select * from copytest2; + style | test | filler +-------+------+-------- +(0 rows) + +truncate copytest2; +copy copytest to :'copy_json_rt' (format json, force_array true); +copy copytest2 from :'copy_json_rt' (format json); +select * from copytest except select * from copytest2; + style | test | filler +-------+------+-------- +(0 rows) + +-- COPY FROM JSON edge cases: invalid input, non-array, missing required field +copy copytest_from_json from stdin (format json); +ERROR: invalid input format for COPY JSON +DETAIL: Document must begin with "[" or "{". 
+CONTEXT: COPY copytest_from_json, line 1 +copy copytest_from_json from stdin (format json); +ERROR: COPY JSON, line 1: each array element must be a JSON object +CONTEXT: COPY copytest_from_json, line 1 +copy copytest_from_json from stdin (format json); +ERROR: COPY JSON, line 1: each array element must be a JSON object +CONTEXT: COPY copytest_from_json, line 1 +copy copytest_from_json from stdin (format json); +ERROR: COPY JSON, line 1: invalid input format +DETAIL: Expected "," between array elements. +CONTEXT: COPY copytest_from_json, line 1 +copy copytest_from_json from stdin (format json); +ERROR: COPY JSON, line 1: invalid input format +DETAIL: Cannot use a comma between concatenated JSON objects; use a JSON array. +CONTEXT: COPY copytest_from_json, line 1 +copy copytest_from_json from stdin (format json); +ERROR: invalid input format for COPY JSON +DETAIL: JSON array input was not closed with "]". +CONTEXT: COPY copytest_from_json, line 2 +copy copytest_from_json from stdin (format json); +ERROR: invalid input format for COPY JSON +DETAIL: JSON array input was not closed with "]". +CONTEXT: COPY copytest_from_json, line 2 +create temp table copyjson_req (style text NOT NULL, test text); +copy copyjson_req from stdin (format json); +ERROR: null value in column "style" of relation "copyjson_req" violates not-null constraint +DETAIL: Failing row contains (null, only test). 
+CONTEXT: COPY copyjson_req, line 1 +copy copyjson_req from stdin (format json); +select * from copyjson_req; + style | test +-------+------ + ok | both +(1 row) + +truncate copytest_from_json; +copy copytest_from_json (style, test) from stdin (format json); +select style, test, filler from copytest_from_json; + style | test | filler +-------+------+-------- + a | b | +(1 row) + -- column list with json format copy copytest (style, test, filler) to stdout (format json); {"style":"DOS","test":"abc\r\ndef","filler":1} diff --git a/src/test/regress/sql/copy.sql b/src/test/regress/sql/copy.sql index eaad290b257..1a23063edc4 100644 --- a/src/test/regress/sql/copy.sql +++ b/src/test/regress/sql/copy.sql @@ -105,9 +105,63 @@ copy copytest to stdout (format json, force_not_null *); copy copytest to stdout (format json, force_null *); copy copytest to stdout (format json, on_error ignore); copy copytest to stdout (format json, reject_limit 1); -copy copytest from stdin(format json); -- all of the above should yield error +-- COPY FROM JSON: each array element is a row, object keys match column names +create temp table copytest_from_json (like copytest); +copy copytest_from_json (style, test) from stdin (format json); +[ {"style":"DOS","test":"abc\r\ndef"} ,{"style":"Unix","test":"abc\ndef"} ,{"style":"Mac","test":"abc\rdef"} ,{"style":"esc\\ape","test":"a\\r\\\r\\\n\\nb"} ] +\. 
+select * from copytest_from_json order by style; + +-- Round trip: COPY TO JSON file, then COPY FROM JSON file +\set copy_json_rt :abs_builddir '/results/copytest_roundtrip.json' +truncate copytest2; +copy copytest to :'copy_json_rt' (format json); +copy copytest2 from :'copy_json_rt' (format json); +select * from copytest except select * from copytest2; + +truncate copytest2; +copy copytest to :'copy_json_rt' (format json, force_array true); +copy copytest2 from :'copy_json_rt' (format json); +select * from copytest except select * from copytest2; + +-- COPY FROM JSON edge cases: invalid input, non-array, missing required field +copy copytest_from_json from stdin (format json); +not valid json +\. +copy copytest_from_json from stdin (format json); +[1, 2, 3] +\. +copy copytest_from_json from stdin (format json); +[null, true, "string"] +\. +copy copytest_from_json from stdin (format json); +[{"style":"a","test":"b"} {"style":"c","test":"d"}] +\. +copy copytest_from_json from stdin (format json); +{"style":"a","test":"b"}, {"style":"c","test":"d"} +\. +copy copytest_from_json from stdin (format json); +[{"style":"a","test":"b"},{"style":"c","test":"d"} +\. +copy copytest_from_json from stdin (format json); +[{"style":"a","test":"b"},{"style":"c","test":"d"}, +\. +create temp table copyjson_req (style text NOT NULL, test text); +copy copyjson_req from stdin (format json); +[{"test":"only test"}] +\. +copy copyjson_req from stdin (format json); +[{"style":"ok","test":"both"}] +\. +select * from copyjson_req; +truncate copytest_from_json; +copy copytest_from_json (style, test) from stdin (format json); +[{"style":"a","test":"b","extra":"ignored","filler":999}] +\. 
+select style, test, filler from copytest_from_json; + -- column list with json format copy copytest (style, test, filler) to stdout (format json); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e1565329f1c..595148b76bf 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -532,6 +532,7 @@ CookedConstraint CopyDest CopyFormat CopyFormatOptions +CopyFromJsonState CopyFromRoutine CopyFromState CopyFromStateData -- 2.41.0